@clawhub-534422530-d42106a7f7
Analyzes market trends and competitor products to provide data-driven recommendations for product selection and optimization.
name: laosi-product-analysis-skill
version: 1.0.0
description: AI-powered product analysis skill for OpenClaw agents - analyzes market trends, competitor products, and provides recommendations for product selection and optimization
author: laosi
homepage: https://github.com/laosi/product-analysis-skill
tags: [product, analysis, market-research, competitor-analysis, recommendations, e-commerce]
FILE:batch_product_analyzer.py
#!/usr/bin/env python3
"""
Batch Product Analysis Manager - Uses SubAgent system for parallel product processing
"""
import json
import os
import sys
import time
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
# Import SubAgent system
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from subagent import SubAgentManager, AgentStatus
# Import our product analysis engine
sys.path.insert(0, str(Path(__file__).parent))
from product_analyzer import ProductAnalyzer
class BatchProductAnalysisManager:
def __init__(self, max_workers: int = 4):
self.manager = SubAgentManager()
self.max_workers = max_workers
self.results_dir = Path(__file__).parent / "batch_results"
self.results_dir.mkdir(exist_ok=True)
def analyze_products_parallel(self, products: List[Dict[str, str]]) -> Dict[str, Any]:
"""
Analyze multiple products in parallel using SubAgent system
"""
start_time = time.time()
# Create SubAgents for each product
agent_ids = []
for i, product in enumerate(products):
name = product.get('name', f'Product_{i+1}')
category = product.get('category', 'general')
agent_name = f"ProductAnalyzer_{i+1}"
task = f"Analyze product: {name}"
context = {
"product_name": name,
"category": category,
"index": i
}
agent_id = self.manager.create_agent(agent_name, task, context)
agent_ids.append(agent_id)
# Process agents in batches (respecting max_workers)
results = {}
completed_agents = set()
while len(completed_agents) < len(agent_ids):
# Check status of running agents
for agent_id in agent_ids:
if agent_id in completed_agents:
continue
agent = self.manager.get_agent(agent_id)
if not agent:
continue
# If agent is not yet started, start it
if agent.status == AgentStatus.PENDING:
self._process_product_agent(agent_id)
# If agent completed, collect result
if agent.status in [AgentStatus.COMPLETED, AgentStatus.FAILED]:
if agent_id not in completed_agents:
results[agent_id] = {
"agent_id": agent_id,
"name": agent.name,
"task": agent.task,
"status": agent.status.value,
"result": agent.result,
"error": agent.error,
"created_at": agent.created_at,
"completed_at": agent.completed_at
}
completed_agents.add(agent_id)
# Small delay to prevent busy waiting
time.sleep(0.1)
# Timeout check (5 minutes max)
if time.time() - start_time > 300:
break
end_time = time.time()
processing_time = end_time - start_time
# Format final results
batch_results = {
"batch_info": {
"total_products": len(products),
"processed_products": len([r for r in results.values() if r["status"] == "completed"]),
"failed_products": len([r for r in results.values() if r["status"] == "failed"]),
"processing_time_seconds": round(processing_time, 2),
"timestamp": datetime.now().isoformat(),
"max_workers": self.max_workers
},
"results": [],
"summary": {
"average_market_trend_score": 0,
"average_competitor_score": 0,
"average_category_fit_score": 0,
"average_risk_score": 0,
"trend_distribution": {"high": 0, "medium": 0, "low": 0},
"competition_distribution": {"high": 0, "medium": 0, "low": 0}
}
}
# Process results
market_trend_scores = []
competitor_scores = []
category_fit_scores = []
risk_scores = []
for result in results.values():
if result["status"] == "completed" and result["result"]:
analysis_result = result["result"]
batch_results["results"].append({
"product_name": analysis_result.get("product_name", "unknown"),
"category": analysis_result.get("category", "general"),
"market_trend_score": analysis_result.get("market_trends", {}).get("score", 0),
"competitor_score": analysis_result.get("competitor_analysis", {}).get("score", 0),
"category_fit_score": analysis_result.get("category_fit", {}).get("score", 0),
"risk_score": analysis_result.get("risk_assessment", {}).get("score", 0),
"recommendations_count": len(analysis_result.get("recommendations", [])),
"optimization_suggestions_count": len(analysis_result.get("optimization_suggestions", []))
})
# Accumulate for summary
market_trend_scores.append(analysis_result.get("market_trends", {}).get("score", 0))
competitor_scores.append(analysis_result.get("competitor_analysis", {}).get("score", 0))
category_fit_scores.append(analysis_result.get("category_fit", {}).get("score", 0))
risk_scores.append(analysis_result.get("risk_assessment", {}).get("score", 0))
# Trend distribution
trend_score = analysis_result.get("market_trends", {}).get("score", 0)
if trend_score >= 70:
batch_results["summary"]["trend_distribution"]["high"] += 1
elif trend_score >= 40:
batch_results["summary"]["trend_distribution"]["medium"] += 1
else:
batch_results["summary"]["trend_distribution"]["low"] += 1
# Competition distribution (inverted: lower score means higher competition)
comp_score = analysis_result.get("competitor_analysis", {}).get("score", 0)
if comp_score >= 70:
batch_results["summary"]["competition_distribution"]["low"] += 1 # Low competition
elif comp_score >= 40:
batch_results["summary"]["competition_distribution"]["medium"] += 1
else:
batch_results["summary"]["competition_distribution"]["high"] += 1 # High competition
# Calculate average scores
if market_trend_scores:
batch_results["summary"]["average_market_trend_score"] = round(sum(market_trend_scores) / len(market_trend_scores), 2)
if competitor_scores:
batch_results["summary"]["average_competitor_score"] = round(sum(competitor_scores) / len(competitor_scores), 2)
if category_fit_scores:
batch_results["summary"]["average_category_fit_score"] = round(sum(category_fit_scores) / len(category_fit_scores), 2)
if risk_scores:
batch_results["summary"]["average_risk_score"] = round(sum(risk_scores) / len(risk_scores), 2)
return batch_results
def _process_product_agent(self, agent_id: str):
"""Process a single product analysis agent"""
agent = self.manager.get_agent(agent_id)
if not agent:
return
# Update status to running
self.manager.update_status(agent_id, AgentStatus.RUNNING)
try:
# Get context
product_name = agent.context.get("product_name")
category = agent.context.get("category", "general")
if not product_name:
raise ValueError("Product name is required")
# Analyze product
analyzer = ProductAnalyzer()
result = analyzer.analyze_product(product_name, category)
# Update agent with result
self.manager.update_status(agent_id, AgentStatus.COMPLETED, result=result)
except Exception as e:
# Update agent with error
self.manager.update_status(agent_id, AgentStatus.FAILED, error=str(e))
def save_batch_report(self, batch_results: Dict[str, Any], output_file: str = None) -> str:
"""Save batch analysis results to file"""
if not output_file:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = self.results_dir / f"batch_analysis_report_{timestamp}.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(batch_results, f, indent=2, ensure_ascii=False)
return str(output_file)
def print_batch_summary(self, batch_results: Dict[str, Any]):
"""Print human-readable batch summary"""
info = batch_results["batch_info"]
summary = batch_results["summary"]
print("=" * 70)
print("BATCH PRODUCT ANALYSIS REPORT")
print("=" * 70)
print(f"Total Products: {info['total_products']}")
print(f"Processed: {info['processed_products']}")
print(f"Failed: {info['failed_products']}")
print(f"Processing Time: {info['processing_time_seconds']} seconds")
print(f"Timestamp: {info['timestamp']}")
print(f"Max Workers: {info['max_workers']}")
print("-" * 70)
print("SUMMARY:")
print(f" Average Market Trend Score: {summary['average_market_trend_score']}/100")
print(f" Average Competitor Score: {summary['average_competitor_score']}/100")
print(f" Average Category Fit Score: {summary['average_category_fit_score']}/100")
print(f" Average Risk Score: {summary['average_risk_score']}/100 (lower is better)")
print(f" Trend Distribution: High:{summary['trend_distribution']['high']} Medium:{summary['trend_distribution']['medium']} Low:{summary['trend_distribution']['low']}")
print(f" Competition Distribution: High:{summary['competition_distribution']['high']} Medium:{summary['competition_distribution']['medium']} Low:{summary['competition_distribution']['low']}")
print("-" * 70)
print("INDIVIDUAL RESULTS:")
for i, result in enumerate(batch_results["results"], 1):
print(f"{i}. {result['product_name']} ({result['category']})")
print(f" Market Trend: {result['market_trend_score']}/100")
print(f" Competitor Analysis: {result['competitor_score']}/100")
print(f" Category Fit: {result['category_fit_score']}/100")
print(f" Risk Score: {result['risk_score']}/100 (lower is better)")
print(f" Recommendations: {result['recommendations_count']}")
print(f" Optimization Suggestions: {result['optimization_suggestions_count']}")
print()
def main():
if len(sys.argv) < 2:
print("Usage: python batch_product_analyzer.py <product_file> [--workers <n>]")
print("Example: python batch_product_analyzer.py products.json --workers 4")
print("Product file format: JSON array of objects with 'name' and 'category' fields")
sys.exit(1)
# Parse arguments
args = sys.argv[1:]
product_file = None
max_workers = 4
i = 0
while i < len(args):
if args[i] == "--workers" and i + 1 < len(args):
try:
max_workers = int(args[i + 1])
except ValueError:
pass
i += 2
else:
product_file = args[i]
i += 1
if not product_file:
print("Error: No product file specified")
sys.exit(1)
# Validate file exists
if not os.path.exists(product_file):
print(f"Error: File not found - {product_file}")
sys.exit(1)
# Load products from file
try:
with open(product_file, 'r', encoding='utf-8') as f:
products = json.load(f)
if not isinstance(products, list):
print("Error: Product file must contain a JSON array")
sys.exit(1)
# Validate each product has required fields
valid_products = []
for i, product in enumerate(products):
if not isinstance(product, dict):
print(f"Warning: Product at index {i} is not an object, skipping")
continue
name = product.get('name')
if not name:
print(f"Warning: Product at index {i} missing 'name' field, skipping")
continue
valid_products.append(product)
if not valid_products:
print("Error: No valid products found in file")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in product file: {e}")
sys.exit(1)
# Process batch
manager = BatchProductAnalysisManager(max_workers=max_workers)
print(f"Starting batch analysis of {len(valid_products)} products with {max_workers} workers...")
results = manager.analyze_products_parallel(valid_products)
# Display results
manager.print_batch_summary(results)
# Save results
output_file = manager.save_batch_report(results)
print(f"Detailed results saved to: {output_file}")
if __name__ == "__main__":
main()
FILE:batch_results/batch_analysis_report_20260427_121739.json
{
"batch_info": {
"total_products": 5,
"processed_products": 5,
"failed_products": 0,
"processing_time_seconds": 0.1,
"timestamp": "2026-04-27T12:17:39.450621",
"max_workers": 2
},
"results": [
{
"product_name": "Wireless Earbuds",
"category": "electronics",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Smart Watch",
"category": "electronics",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Yoga Mat",
"category": "sports",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Running Shoes",
"category": "sports",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Coffee Maker",
"category": "home_goods",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
}
],
"summary": {
"average_market_trend_score": 75.0,
"average_competitor_score": 60.0,
"average_category_fit_score": 80.0,
"average_risk_score": 30.0,
"trend_distribution": {
"high": 5,
"medium": 0,
"low": 0
},
"competition_distribution": {
"high": 0,
"medium": 5,
"low": 0
}
}
}
FILE:batch_results/batch_analysis_report_20260427_123328.json
{
"batch_info": {
"total_products": 5,
"processed_products": 5,
"failed_products": 0,
"processing_time_seconds": 0.1,
"timestamp": "2026-04-27T12:33:28.534957",
"max_workers": 3
},
"results": [
{
"product_name": "Wireless Earbuds",
"category": "electronics",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Smart Watch",
"category": "electronics",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Yoga Mat",
"category": "sports",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Running Shoes",
"category": "sports",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
},
{
"product_name": "Coffee Maker",
"category": "home_goods",
"market_trend_score": 75,
"competitor_score": 60,
"category_fit_score": 80,
"risk_score": 30,
"recommendations_count": 3,
"optimization_suggestions_count": 2
}
],
"summary": {
"average_market_trend_score": 75.0,
"average_competitor_score": 60.0,
"average_category_fit_score": 80.0,
"average_risk_score": 30.0,
"trend_distribution": {
"high": 5,
"medium": 0,
"low": 0
},
"competition_distribution": {
"high": 0,
"medium": 5,
"low": 0
}
}
}
FILE:claw.json
{
"name": "laosi-product-analysis-skill",
"displayName": "LAOSI Product Analysis Skill",
"description": "AI-powered product analysis skill for OpenClaw agents - analyzes market trends, competitor products, and provides recommendations for product selection and optimization",
"version": "1.0.0",
"author": "laosi",
"homepage": "https://github.com/laosi/product-analysis-skill",
"license": "MIT",
"keywords": ["product", "analysis", "market-research", "competitor-analysis", "recommendations", "e-commerce"],
"category": "productivity",
"engines": {
"openclaw": ">=1.0.0"
},
"scripts": {
"analyze": "python product_analysis.py"
},
"files": [
"SKILL.md",
"README.md",
"claw.json",
"product_analyzer.py",
"product_analysis.py",
"market_patterns/",
"templates/"
]
}
FILE:examples/sample_products.json
[
{
"name": "Wireless Earbuds",
"category": "electronics"
},
{
"name": "Smart Watch",
"category": "electronics"
},
{
"name": "Yoga Mat",
"category": "sports"
}
]
FILE:market_patterns/default_patterns.json
{
"trend_indicators": [
"rising demand",
"increasing search volume",
"positive sentiment",
"growing market size"
],
"risk_factors": [
"high competition",
"declining interest",
"negative reviews",
"regulatory issues"
],
"optimization_areas": [
"pricing strategy",
"product features",
"marketing channels",
"customer service"
],
"category_weights": {
"electronics": {
"innovation": 0.3,
"price": 0.25,
"brand": 0.2,
"features": 0.15,
"support": 0.1
},
"fashion": {
"style": 0.3,
"price": 0.2,
"brand": 0.2,
"quality": 0.2,
"trend": 0.1
},
"home_goods": {
"durability": 0.25,
"price": 0.2,
"design": 0.2,
"functionality": 0.2,
"reviews": 0.15
}
}
}
FILE:product_analysis.py
#!/usr/bin/env python3
"""
Main entry point for the product analysis skill.
This script is called by the claw.json script.
"""
import sys
import os
import json
from product_analyzer import ProductAnalyzer
def main():
# Initialize the analyzer
analyzer = ProductAnalyzer()
# For now, we just print a message and example usage.
# In a real implementation, we would parse command line arguments and perform analysis.
print("LAOSI Product Analysis Skill")
print("============================")
print("This skill analyzes market trends, competitor products, and provides recommendations.")
print()
print("Usage:")
print(" product_analysis.py --product \"Product Name\" --category \"Electronics\"")
print(" product_analysis.py --file products.json --output results.json")
print()
print("Example:")
print(" product_analysis.py --product \"Wireless Earbuds\" --category \"electronics\"")
print()
# Example analysis
example = analyzer.analyze_product("Wireless Earbuds", "electronics")
print("Example Analysis for 'Wireless Earbuds' (electronics):")
print(json.dumps(example, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
FILE:product_analyzer.py
import json
import os
from typing import Dict, List, Any
from datetime import datetime
class ProductAnalyzer:
def __init__(self, patterns_file: str = None):
"""
Initialize the ProductAnalyzer with optional patterns file.
"""
if patterns_file is None:
# Default to the patterns in the same directory
patterns_file = os.path.join(os.path.dirname(__file__), 'market_patterns', 'default_patterns.json')
with open(patterns_file, 'r', encoding='utf-8') as f:
self.patterns = json.load(f)
def analyze_product(self, product_name: str, category: str = "general") -> Dict[str, Any]:
"""
Analyze a single product and return insights.
This is a simplified version - in a real implementation, this would use web search, APIs, etc.
"""
# In a real implementation, we would fetch data from the web, APIs, etc.
# For now, we return a mock analysis based on the product name and category.
# Determine category-specific weights
category_weights = self.patterns.get('category_weights', {}).get(category, {
"importance": 0.4,
"demand": 0.3,
"competition": 0.2,
"trend": 0.1
})
# Mock analysis - replace with actual data collection and analysis
analysis = {
"product_name": product_name,
"category": category,
"timestamp": datetime.now().isoformat(),
"market_trends": {
"score": 75, # out of 100
"indicators": self.patterns['trend_indicators'][:2], # Just for example
"summary": "Moderate growth with stable demand"
},
"competitor_analysis": {
"score": 60,
"summary": "Moderate competition with differentiation opportunities"
},
"recommendations": [
"Focus on unique features to differentiate",
"Consider competitive pricing strategy",
"Leverage online marketing channels"
],
"optimization_suggestions": self.patterns['optimization_areas'][:2],
"risk_assessment": {
"score": 30, # Lower is better
"factors": self.patterns['risk_factors'][:2]
},
"category_fit": {
"score": 80,
"weights": category_weights
}
}
return analysis
def analyze_multiple_products(self, products: List[Dict[str, str]]) -> List[Dict[str, Any]]:
"""
Analyze multiple products.
"""
results = []
for product in products:
name = product.get('name', 'Unknown Product')
category = product.get('category', 'general')
analysis = self.analyze_product(name, category)
results.append(analysis)
return results
def main():
"""
Main function for command-line usage.
"""
import argparse
parser = argparse.ArgumentParser(description='Analyze products for market trends and recommendations.')
parser.add_argument('--product', type=str, help='Product name to analyze')
parser.add_argument('--category', type=str, default='general', help='Product category')
parser.add_argument('--file', type=str, help='File containing products to analyze (JSON format)')
parser.add_argument('--output', type=str, help='Output file for results (JSON format)')
args = parser.parse_args()
analyzer = ProductAnalyzer()
if args.file:
# Load products from file
with open(args.file, 'r', encoding='utf-8') as f:
products = json.load(f)
results = analyzer.analyze_multiple_products(products)
output = {
"analysis_count": len(results),
"results": results
}
elif args.product:
# Analyze single product
result = analyzer.analyze_product(args.product, args.category)
output = result
else:
parser.error("Either --product or --file must be specified")
# Output results
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(output, f, indent=2, ensure_ascii=False)
print(f"Analysis results saved to {args.output}")
else:
print(json.dumps(output, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
FILE:README.md
# LAOSI Product Analysis Skill
AI-powered product analysis skill for OpenClaw agents that analyzes market trends, competitor products, and provides recommendations for product selection and optimization.
## Features
- Market trend analysis
- Competitor product comparison
- Product selection recommendations
- Optimization suggestions
- Batch processing capabilities
- Integration with OpenClaw's SubAgent system for parallel analysis
## Installation
Install via ClawHub:
```bash
clawhub install laosi-product-analysis-skill
```
## Usage
### Basic Usage
```bash
laosi-product-analysis-skill analyze --product "Product Name" --category "Electronics"
```
### Batch Analysis
```bash
laosi-product-analysis-skill analyze --file products.txt --output results.json
```
### Activation Words
In the laosi system, you can activate this skill with:
- 选品分析
- product analysis
- 市场分析
- market analysis
## Configuration
The skill can be configured through environment variables or a config file. See `config_template.json` in the templates directory.
## Output Format
The skill returns analysis results in JSON format including:
- Market trends score
- Competitor analysis
- Recommendations
- Optimization suggestions
- Risk assessment
## Examples
See the `examples` directory for sample usage.
## Requirements
- Python 3.8+
- OpenClaw framework
## License
MIT
FILE:test_products.json
[
{
"name": "Wireless Earbuds",
"category": "electronics"
},
{
"name": "Smart Watch",
"category": "electronics"
},
{
"name": "Yoga Mat",
"category": "sports"
},
{
"name": "Running Shoes",
"category": "sports"
},
{
"name": "Coffee Maker",
"category": "home_goods"
}
]AI-powered contract review that identifies risky clauses, missing provisions, and compliance issues in legal documents for informed decision-making.
name: laosi-contract-review-skill
version: 1.0.0
description: AI-powered contract review skill for OpenClaw agents - identifies risky clauses, missing provisions, and compliance issues in legal documents
author: laosi
homepage: https://github.com/laosi/contract-review-skill
tags: [legal, contract, review, compliance, risk-assessment, document-analysis]
FILE:batch_results/batch_review_report_20260427_095105.json
{
"batch_info": {
"total_contracts": 2,
"processed_contracts": 2,
"failed_contracts": 0,
"processing_time_seconds": 0.19,
"timestamp": "2026-04-27T09:51:05.149558",
"industry": "general",
"max_workers": 2
},
"results": [
{
"contract_file": "C:\\Users\\pc\\.laosi\\sample_contract.txt",
"contract_name": "sample_contract.txt",
"risk_score": 40,
"risk_level": "HIGH",
"findings_count": 5,
"high_risk_count": 0,
"medium_risk_count": 0,
"low_risk_count": 0,
"critical_risk_count": 0
},
{
"contract_file": "C:\\Users\\pc\\.laosi\\sample_contract_2.txt",
"contract_name": "sample_contract_2.txt",
"risk_score": 48,
"risk_level": "HIGH",
"findings_count": 6,
"high_risk_count": 0,
"medium_risk_count": 0,
"low_risk_count": 0,
"critical_risk_count": 0
}
],
"summary": {
"total_high_risk": 0,
"total_medium_risk": 0,
"total_low_risk": 0,
"average_risk_score": 44.0,
"risk_distribution": {
"critical": 0,
"high": 0,
"medium": 0,
"low": 0
}
}
}
FILE:batch_results/batch_review_report_20260427_123317.json
{
"batch_info": {
"total_contracts": 1,
"processed_contracts": 1,
"failed_contracts": 0,
"processing_time_seconds": 0.13,
"timestamp": "2026-04-27T12:33:17.569710",
"industry": "general",
"max_workers": 4
},
"results": [
{
"contract_file": "C:\\Users\\pc\\test_contract.txt",
"contract_name": "test_contract.txt",
"risk_score": 40,
"risk_level": "HIGH",
"findings_count": 5,
"high_risk_count": 0,
"medium_risk_count": 3,
"low_risk_count": 2,
"critical_risk_count": 0
}
],
"summary": {
"total_high_risk": 0,
"total_medium_risk": 3,
"total_low_risk": 2,
"average_risk_score": 40.0,
"risk_distribution": {
"critical": 0,
"high": 0,
"medium": 3,
"low": 2
}
}
}
FILE:batch_reviewer.py
#!/usr/bin/env python3
"""
Batch Contract Review Manager - Uses SubAgent system for parallel contract processing
"""
import json
import os
import sys
import time
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
# Import SubAgent system
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from subagent import SubAgentManager, AgentStatus
# Import our contract review engine
sys.path.insert(0, str(Path(__file__).parent))
from review_engine import ContractReviewEngine
class BatchContractReviewManager:
def __init__(self, max_workers: int = 4):
self.manager = SubAgentManager()
self.max_workers = max_workers
self.engine = ContractReviewEngine(Path(__file__).parent / "legal_patterns")
self.results_dir = Path(__file__).parent / "batch_results"
self.results_dir.mkdir(exist_ok=True)
def review_contracts_parallel(self, contract_files: List[str], industry: str = None) -> Dict[str, Any]:
"""
Review multiple contracts in parallel using SubAgent system
"""
start_time = time.time()
# Create SubAgents for each contract
agent_ids = []
for i, contract_file in enumerate(contract_files):
if not os.path.exists(contract_file):
continue
agent_name = f"ContractReviewer_{i+1}"
task = f"Review contract: {os.path.basename(contract_file)}"
context = {
"contract_file": contract_file,
"industry": industry,
"index": i
}
agent_id = self.manager.create_agent(agent_name, task, context)
agent_ids.append(agent_id)
# Process agents in batches (respecting max_workers)
results = {}
completed_agents = set()
while len(completed_agents) < len(agent_ids):
# Check status of running agents
for agent_id in agent_ids:
if agent_id in completed_agents:
continue
agent = self.manager.get_agent(agent_id)
if not agent:
continue
# If agent is not yet started, start it
if agent.status == AgentStatus.PENDING:
self._process_contract_agent(agent_id)
# If agent completed, collect result
if agent.status in [AgentStatus.COMPLETED, AgentStatus.FAILED]:
if agent_id not in completed_agents:
results[agent_id] = {
"agent_id": agent_id,
"name": agent.name,
"task": agent.task,
"status": agent.status.value,
"result": agent.result,
"error": agent.error,
"created_at": agent.created_at,
"completed_at": agent.completed_at
}
completed_agents.add(agent_id)
# Small delay to prevent busy waiting
time.sleep(0.1)
# Timeout check (5 minutes max)
if time.time() - start_time > 300:
break
end_time = time.time()
processing_time = end_time - start_time
# Format final results
batch_results = {
"batch_info": {
"total_contracts": len(contract_files),
"processed_contracts": len([r for r in results.values() if r["status"] == "completed"]),
"failed_contracts": len([r for r in results.values() if r["status"] == "failed"]),
"processing_time_seconds": round(processing_time, 2),
"timestamp": datetime.now().isoformat(),
"industry": industry or "general",
"max_workers": self.max_workers
},
"results": [],
"summary": {
"total_high_risk": 0,
"total_medium_risk": 0,
"total_low_risk": 0,
"average_risk_score": 0,
"risk_distribution": {"critical": 0, "high": 0, "medium": 0, "low": 0}
}
}
# Process results
risk_scores = []
for result in results.values():
if result["status"] == "completed" and result["result"]:
contract_result = result["result"]
batch_results["results"].append({
"contract_file": contract_result.get("metadata", {}).get("contract_file", "unknown"),
"contract_name": os.path.basename(contract_result.get("metadata", {}).get("contract_file", "unknown")),
"risk_score": contract_result.get("risk_score", 0),
"risk_level": self._score_to_level(contract_result.get("risk_score", 0)),
"findings_count": len(contract_result.get("findings", [])),
"high_risk_count": len([f for f in contract_result.get("findings", []) if f.get("risk_level") == "high"]),
"medium_risk_count": len([f for f in contract_result.get("findings", []) if f.get("risk_level") == "medium"]),
"low_risk_count": len([f for f in contract_result.get("findings", []) if f.get("risk_level") == "low"]),
"critical_risk_count": len([f for f in contract_result.get("findings", []) if f.get("risk_level") == "critical"])
})
# Accumulate for summary
risk_scores.append(contract_result.get("risk_score", 0))
batch_results["summary"]["total_high_risk"] += len([f for f in contract_result.get("findings", []) if f.get("risk_level") == "high"])
batch_results["summary"]["total_medium_risk"] += len([f for f in contract_result.get("findings", []) if f.get("risk_level") == "medium"])
batch_results["summary"]["total_low_risk"] += len([f for f in contract_result.get("findings", []) if f.get("risk_level") == "low"])
# Risk distribution
for finding in contract_result.get("findings", []):
risk_level = finding.get("risk_level", "low")
if risk_level == "critical":
batch_results["summary"]["risk_distribution"]["critical"] += 1
elif risk_level == "high":
batch_results["summary"]["risk_distribution"]["high"] += 1
elif risk_level == "medium":
batch_results["summary"]["risk_distribution"]["medium"] += 1
elif risk_level == "low":
batch_results["summary"]["risk_distribution"]["low"] += 1
# Calculate average risk score
if risk_scores:
batch_results["summary"]["average_risk_score"] = round(sum(risk_scores) / len(risk_scores), 2)
return batch_results
def _process_contract_agent(self, agent_id: str):
"""Process a single contract review agent"""
agent = self.manager.get_agent(agent_id)
if not agent:
return
# Update status to running
self.manager.update_status(agent_id, AgentStatus.RUNNING)
try:
# Get context
contract_file = agent.context.get("contract_file")
industry = agent.context.get("industry")
if not contract_file or not os.path.exists(contract_file):
raise FileNotFoundError(f"Contract file not found: {contract_file}")
# Read contract
contract_text = Path(contract_file).read_text(encoding='utf-8')
# Review contract
result = self.engine.review_contract(contract_text, industry)
# Add file info to result
result["metadata"]["contract_file"] = contract_file
result["metadata"]["file_size"] = os.path.getsize(contract_file)
# Update agent with result
self.manager.update_status(agent_id, AgentStatus.COMPLETED, result=result)
except Exception as e:
# Update agent with error
self.manager.update_status(agent_id, AgentStatus.FAILED, error=str(e))
def _score_to_level(self, score: int) -> str:
"""Convert risk score to level label"""
if score >= 80:
return "LOW"
elif score >= 60:
return "MEDIUM"
elif score >= 40:
return "HIGH"
else:
return "CRITICAL"
def save_batch_report(self, batch_results: Dict[str, Any], output_file: str = None) -> str:
"""Save batch review results to file"""
if not output_file:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = self.results_dir / f"batch_review_report_{timestamp}.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(batch_results, f, indent=2, ensure_ascii=False)
return str(output_file)
def print_batch_summary(self, batch_results: Dict[str, Any]):
"""Print human-readable batch summary"""
info = batch_results["batch_info"]
summary = batch_results["summary"]
print("=" * 70)
print("BATCH CONTRACT REVIEW REPORT")
print("=" * 70)
print(f"Total Contracts: {info['total_contracts']}")
print(f"Processed: {info['processed_contracts']}")
print(f"Failed: {info['failed_contracts']}")
print(f"Processing Time: {info['processing_time_seconds']} seconds")
print(f"Timestamp: {info['timestamp']}")
print(f"Industry Focus: {info['industry']}")
print(f"Max Workers: {info['max_workers']}")
print("-" * 70)
print("SUMMARY:")
print(f" Average Risk Score: {summary['average_risk_score']}/100")
print(f" Total High Risk Findings: {summary['total_high_risk']}")
print(f" Total Medium Risk Findings: {summary['total_medium_risk']}")
print(f" Total Low Risk Findings: {summary['total_low_risk']}")
print(f" Risk Distribution:")
print(f" Critical: {summary['risk_distribution']['critical']}")
print(f" High: {summary['risk_distribution']['high']}")
print(f" Medium: {summary['risk_distribution']['medium']}")
print(f" Low: {summary['risk_distribution']['low']}")
print("-" * 70)
print("INDIVIDUAL RESULTS:")
for i, result in enumerate(batch_results["results"], 1):
print(f"{i}. {result['contract_name']}")
print(f" Risk Score: {result['risk_score']}/100 ({result['risk_level']})")
print(f" Findings: {result['findings_count']} "
f"(H:{result['high_risk_count']} M:{result['medium_risk_count']} "
f"L:{result['low_risk_count']} C:{result['critical_risk_count']})")
print()
def main():
if len(sys.argv) < 2:
print("Usage: python batch_reviewer.py <contract_file1> [contract_file2] ... [--industry <industry>] [--workers <n>]")
print("Example: python batch_reviewer.py contract1.txt contract2.txt --industry tech --workers 4")
sys.exit(1)
# Parse arguments
args = sys.argv[1:]
contract_files = []
industry = None
max_workers = 4
i = 0
while i < len(args):
if args[i] == "--industry" and i + 1 < len(args):
industry = args[i + 1]
i += 2
elif args[i] == "--workers" and i + 1 < len(args):
try:
max_workers = int(args[i + 1])
except ValueError:
pass
i += 2
else:
contract_files.append(args[i])
i += 1
if not contract_files:
print("Error: No contract files specified")
sys.exit(1)
# Validate files exist
valid_files = []
for f in contract_files:
if os.path.exists(f):
valid_files.append(f)
else:
print(f"Warning: File not found - {f}")
if not valid_files:
print("Error: No valid contract files found")
sys.exit(1)
# Process batch
manager = BatchContractReviewManager(max_workers=max_workers)
print(f"Starting batch review of {len(valid_files)} contracts with {max_workers} workers...")
results = manager.review_contracts_parallel(valid_files, industry)
# Display results
manager.print_batch_summary(results)
# Save results
output_file = manager.save_batch_report(results)
print(f"Detailed results saved to: {output_file}")
if __name__ == "__main__":
main()
FILE:claw.json
{
"name": "laosi-contract-review-skill",
"displayName": "LAOSI Contract Review Skill",
"description": "AI-powered contract review skill for OpenClaw agents - identifies risky clauses, missing provisions, and compliance issues in legal documents",
"version": "1.0.1",
"author": "laosi",
"homepage": "https://github.com/laosi/contract-review-skill",
"license": "MIT",
"keywords": ["legal", "contract", "review", "compliance", "risk-assessment", "document-analysis"],
"category": "productivity",
"engines": {
"openclaw": ">=1.0.0"
},
"scripts": {
"review": "python contract_reviewer.py"
},
"files": [
"SKILL.md",
"README.md",
"claw.json",
"review_engine.py",
"contract_reviewer.py",
"legal_patterns/",
"templates/"
]
}
FILE:contract_reviewer.py
#!/usr/bin/env python3
"""
Contract Review Skill - CLI Wrapper for OpenClaw Skill Chain
Provides easy-to-use interface for the contract reviewing functionality
"""
import json
import sys
import os
from pathlib import Path
def main():
if len(sys.argv) < 2:
print('{"error": "Usage: contract-review-skill <path_to_contract> [--industry <industry>] [--format <json|text>]"}')
sys.exit(1)
target_path = sys.argv[1]
industry = None
output_format = "text" # default
# Parse optional arguments
i = 2
while i < len(sys.argv):
if sys.argv[i] == "--industry" and i + 1 < len(sys.argv):
industry = sys.argv[i + 1]
i += 2
elif sys.argv[i] == "--format" and i + 1 < len(sys.argv):
output_format = sys.argv[i + 1]
i += 2
else:
i += 1
# Read contract content
try:
contract_text = Path(target_path).read_text(encoding='utf-8')
except Exception as e:
print(json.dumps({"error": f"Failed to read contract file: {str(e)}"}, ensure_ascii=False))
sys.exit(1)
# Import and run the review engine
sys.path.insert(0, str(Path(__file__).parent))
from review_engine import ContractReviewEngine
engine = ContractReviewEngine(Path(__file__).parent / "legal_patterns")
result = engine.review_contract(contract_text, industry)
# Output as requested format
if output_format == "json":
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
# Format as human-readable report
print(format_report(result, Path(target_path).name))
def format_report(report: dict, contract_name: str) -> str:
"""Format review results as human-readable report."""
findings = report.get("findings", [])
risk_score = report.get("risk_score", 0)
summary = report.get("summary", {})
# Determine risk level label (lower score means higher risk)
if risk_score >= 80:
risk_label = "LOW"
elif risk_score >= 60:
risk_label = "MEDIUM"
elif risk_score >= 40:
risk_label = "HIGH"
else:
risk_label = "CRITICAL"
# Count by risk level
by_level = summary.get("by_risk_level", {"critical": 0, "high": 0, "medium": 0, "low": 0})
critical_count = by_level.get("critical", 0)
high_count = by_level.get("high", 0)
medium_count = by_level.get("medium", 0)
low_count = by_level.get("low", 0)
lines = [
"=" * 60,
"CONTRACT REVIEW REPORT",
"=" * 60,
f"Contract: {contract_name}",
f"Overall Risk Score: {risk_score}/100 ({risk_label})",
"Risk Distribution:",
f" - Critical: {critical_count} clauses",
f" - High: {high_count} clauses",
f" - Medium: {medium_count} clauses",
f" - Low: {low_count} clauses",
""
]
if findings:
lines.append("Top Risk Findings:")
# Sort findings by risk level (critical first)
risk_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
sorted_findings = sorted(findings, key=lambda f: risk_order.get(f["risk_level"], 4))
for finding in sorted_findings[:5]: # Show top 5
risk_level = finding['risk_level']
if hasattr(risk_level, 'value'):
risk_level_str = risk_level.value.upper()
else:
risk_level_str = str(risk_level).upper()
lines.append(f" [{risk_level_str}] {finding['title']}")
lines.append(f" Location: {finding['location']}")
lines.append(f" Issue: {finding['description']}")
lines.append(f" Suggestion: {finding['suggestion']}")
if finding.get("reference"):
lines.append(f" Reference: {finding['reference']}")
lines.append("")
if len(findings) > 5:
lines.append(f" ... and {len(findings) - 5} more findings")
lines.append("")
lines.append("Recommendations:")
if critical_count > 0:
lines.append(" 1. [CRITICAL] Address all critical risk items immediately - do not sign without legal review")
if high_count > 0:
lines.append(" 2. [HIGH] Address high risk items before signing or seek legal counsel")
if medium_count > 0:
lines.append(" 3. [MEDIUM] Consider negotiating medium risk clauses based on your leverage position")
if low_count > 0:
lines.append(" 4. [LOW] Minor items - standard business practice")
if critical_count == 0 and high_count == 0 and medium_count == 0:
lines.append(" [LOW] No significant risks identified - contract appears reasonably balanced")
lines.extend([
"",
"=" * 60
])
return "\n".join(lines)
if __name__ == "__main__":
main()
FILE:fix_json.py
import json
import os
data = {
"confidentiality": {
"title": "Missing Confidentiality Clause",
"description": "Contract lacks explicit confidentiality obligations for protecting proprietary information",
"pattern": "(confidential|confidentiality|non-disclosure|nda).*?(obligation|duty|responsibility|requirement)|(proprietary|confidential).*?(information|data|knowledge).*?(shall be|must be|is required to be).*?(kept confidential|maintained in confidence)",
"risk_level": "MEDIUM",
"suggestion": "Add a standard confidentiality clause defining what information is confidential and obligations to protect it",
"reference": "Standard Business Practice and Trade Secret Protection Laws"
},
"intellectual_property": {
"title": "Missing Intellectual Property Clause",
"description": "Contract does not address ownership, licensing, or use of intellectual property",
"pattern": "(intellectual property|ip|copyright|patent|trademark|trade secret|proprietary).*?(ownership|title|interest|rights|license|usage).*?(retains|retain|remains with|belongs to)|(work product|deliverables|materials).*?(created|developed|produced).*?(shall be|is|are).*?(the exclusive property of|owned by)",
"risk_level": "MEDIUM",
"suggestion": "Add IP clause specifying ownership of pre-existing IP, ownership of work product, and license grants",
"reference": "Copyright Law and Patent Act Principles"
},
"payment_terms": {
"title": "Unclear Payment Terms",
"description": "Payment amount, schedule, or method is not clearly defined",
"pattern": "(payment|pay|fee|charge|compensation|remuneration).*?(amount|sum|fee|rate).*?(shall be|is|will be|equals|totals).*?\\$?\\d+(\\.\\d+)?(?:\s*(USD|dollars|RMB|yuan|EUR|euros|GBP|pounds))?|(invoice|bill).*?(submit|send|issue).*?(monthly|quarterly|upon completion|net \\d+)",
"risk_level": "MEDIUM",
"suggestion": "Specify exact payment amounts, schedule (e.g., monthly, milestone-based), method, and late payment penalties",
"reference": "Standard Contract Payment Practices"
},
"term_and_termination": {
"title": "Unclear Term and Termination",
"description": "Contract duration, renewal terms, or termination conditions are not clearly specified",
"pattern": "(term|duration).*?(shall be|is|will be|equals).*?\\d+.*?(day|days|week|weeks|month|months|year|years)|(termination|terminate|end|cancel).*?(for cause|without cause|with notice).*?(notice|notification).*?\\d+.*?(day|days|week|weeks|month|months)",
"risk_level": "MEDIUM",
"suggestion": "Clearly define initial term, renewal options, termination for cause and without cause procedures, and required notice periods",
"reference": "Standard Contract Duration and Termination Practices"
},
"dispute_resolution": {
"title": "Missing Dispute Resolution Clause",
"description": "Contract does not specify how disputes will be resolved (negotiation, mediation, arbitration, litigation)",
"pattern": "(dispute|disagreement|controversy|claim).*?(resolution|settled|resolved).*?(through|via|by means of).*?(negotiation|mediation|arbitration|litigation|court)|(arbitration).*?(shall be|is|will be).*?(final|binding)|(governing law).*?(shall be|is|will be).*?[A-Z][a-z]+",
"risk_level": "MEDIUM",
"suggestion": "Add dispute resolution clause specifying negotiation first, then mediation, then binding arbitration or litigation with specified jurisdiction",
"reference": "Standard Dispute Resolution Practices and Arbitration Law"
},
"force_majeure": {
"title": "Missing Force Majeure Clause",
"description": "Contract lacks provision for unforeseen circumstances preventing performance",
"pattern": "(force majeure|act of god|unforeseeable circumstances|beyond reasonable control).*?(shall excuse|excuses|releases from liability)|(neither party).*?(shall be liable|is responsible).*?(for any failure|for any delay).*?(caused by|due to|resulting from).*?(force majeure|act of god|natural disaster|war|terrorism|government action)",
"risk_level": "LOW",
"suggestion": "Add standard force majeure clause covering natural disasters, war, terrorism, government actions, and other uncontrollable events",
"reference": "Standard Force Majeure Principles and Civil Code Articles"
},
"governing_law": {
"title": "Missing Governing Law Clause",
"description": "Contract does not specify which jurisdiction's laws will govern interpretation",
"pattern": "(governing law|this agreement shall be governed by|interpreted in accordance with).*?the laws of.*?[A-Z][a-z]+.*?(state|province|country)|(this agreement).*?(shall be|is|will be).*?(subject to|governed by).*?the laws.*?of",
"risk_level": "LOW",
"suggestion": "Specify governing law (e.g., laws of the State of California, Peoples Republic of China) and optionally jurisdiction for disputes",
"reference": "Standard Choice of Law Principles"
},
"entire_agreement": {
"title": "Missing Entire Agreement Clause",
"description": "Contract lacks clause stating that the written document constitutes the complete agreement between parties",
"pattern": "(entire agreement|whole agreement|complete agreement).*?(this agreement|this document|this instrument).*?(constitutes|is|represents).*?(the|the full|the complete).*?(agreement|understanding|arrangement).*?(between|among).*?(the parties|party a and party b)|(supersedes|replaces|cancels).*?(all|any).*?(prior|previous|preceding).*?(agreements|understandings|arrangements|representations).*?(whether|whether oral|whether written)",
"risk_level": "LOW",
"suggestion": "Add entire agreement clause to prevent reliance on prior oral or written statements not included in the written contract",
"reference": "Standard Integration Clause Principles and Parol Evidence Rule"
}
}
with open('C:/Users/pc/.laosi/skills/contract-review-skill/legal_patterns/required_clauses.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print("JSON file written successfully.")
FILE:legal_patterns/high_risk.json
{
"unilateral_modification": {
"title": "Unilateral Modification Clause",
"description": "One party can modify contract terms without the other's consent",
"pattern": "(either party|either of the parties|[A-Z][a-z]+ [A-Z][a-z]+).*?(may|can|shall have the right to|is permitted to).*?(amend|modify|alter|change|vary|revise).*?(this agreement|the agreement|these terms).*?(without|without requiring|without obtaining|without needing).*?(consent|approval|agreement).*?(of the other party|from the other party)",
"risk_level": "HIGH",
"suggestion": "Require mutual agreement for any modifications to ensure fairness",
"reference": "Contract Law Principle of Mutuality and Good Faith"
},
"automatic_renewal": {
"title": "Automatic Renewal Without Notice",
"description": "Contract renews automatically without requiring advance notice for non-renewal",
"pattern": "(automatically|automatic).*?(renew|extend|continue).*?(unless|until).*?(notice|notification).*?(given|provided|delivered).*?(\\d+).*?(day|days|month|months).*?(prior|before|in advance)",
"risk_level": "HIGH",
"suggestion": "Add required notice period (typically 30-60 days) for non-renewal",
"reference": "Consumer Protection Laws and Contract Fairness Principles"
},
"excessive_liability": {
"title": "Excessive Liability Exposure",
"description": "Unlimited or excessive liability exposure without reasonable caps",
"pattern": "(liability|responsible|liable).*?(for|to).*?(all|any|every|any and all|direct|indirect|consequential|special|punitive|exemplary).*?(damages|losses|expenses|costs).*?(without limit|unlimited|uncapped|no cap)",
"risk_level": "HIGH",
"suggestion": "Add reasonable liability caps (e.g., fees paid, insurance coverage amounts)",
"reference": "Standard Contract Risk Allocation Practices"
},
"unilateral_termination": {
"title": "Unilateral Termination Rights",
"description": "One party can terminate without cause or with minimal notice",
"pattern": "(either party|either of the parties|[A-Z][a-z]+ [A-Z][a-z]+).*?(may|can|shall have the right to|is permitted to).*?(terminate|end|cancel).*?(this agreement|the agreement).*?(at any time|without cause|without reason).*?(with|upon).*?(\\d+).*?(day|days|week|weeks|month|months).*?(notice|notification)",
"risk_level": "MEDIUM",
"suggestion": "Balance termination rights with reasonable notice periods or mutual agreement requirements",
"reference": "Fair Contract Termination Principles"
},
"broad_indemnification": {
"title": "Overly Broad Indemnification",
"description": "One party required to indemnify for overly broad scope including their own negligence",
"pattern": "(indemnify|indemnification|hold harmless).*?(and|&).*?(defend).*?(against|for).*?(all|any|every).*?(claims|demands|actions|suits|proceedings).*?(arising out of|related to|in connection with).*?(negligence|willful misconduct|breach)",
"risk_level": "HIGH",
"suggestion": "Limit indemnification to third-party claims arising from indemnifying party's gross negligence or willful misconduct",
"reference": "Standard Indemnification Principles and Insurance Requirements"
},
"jurisdiction_shopping": {
"title": "Unfair Jurisdiction and Venue",
"description": "Requires litigation in inconvenient or unfavorable jurisdiction",
"pattern": "(governing law|jurisdiction|venue).*?(shall be|is|will be).*?(the courts of|the state|the province|the country).*?[A-Z][a-z]+.*?(exclusive|sole|only).*?(jurisdiction|venue|forum)",
"risk_level": "MEDIUM",
"suggestion": "Consider neutral jurisdiction or mutual agreement on dispute resolution forum",
"reference": "Forum Non Conveniens and Fair Forum Selection Principles"
}
}
FILE:legal_patterns/required_clauses.json
{
"confidentiality": {
"title": "Missing Confidentiality Clause",
"description": "Contract lacks explicit confidentiality obligations for protecting proprietary information",
"pattern": "(confidential|confidentiality|non-disclosure|nda).*?(obligation|duty|responsibility|requirement)|(proprietary|confidential).*?(information|data|knowledge).*?(shall be|must be|is required to be).*?(kept confidential|maintained in confidence)",
"risk_level": "MEDIUM",
"suggestion": "Add a standard confidentiality clause defining what information is confidential and obligations to protect it",
"reference": "Standard Business Practice and Trade Secret Protection Laws"
},
"intellectual_property": {
"title": "Missing Intellectual Property Clause",
"description": "Contract does not address ownership, licensing, or use of intellectual property",
"pattern": "(intellectual property|ip|copyright|patent|trademark|trade secret|proprietary).*?(ownership|title|interest|rights|license|usage).*?(retains|retain|remains with|belongs to)|(work product|deliverables|materials).*?(created|developed|produced).*?(shall be|is|are).*?(the exclusive property of|owned by)",
"risk_level": "MEDIUM",
"suggestion": "Add IP clause specifying ownership of pre-existing IP, ownership of work product, and license grants",
"reference": "Copyright Law and Patent Act Principles"
},
"payment_terms": {
"title": "Unclear Payment Terms",
"description": "Payment amount, schedule, or method is not clearly defined",
"pattern": "(payment|pay|fee|charge|compensation|remuneration).*?(amount|sum|fee|rate).*?(shall be|is|will be|equals|totals).*?\\$?\\d+(\\.\\d+)?(?:\\s*(USD|dollars|RMB|yuan|EUR|euros|GBP|pounds))?|(invoice|bill).*?(submit|send|issue).*?(monthly|quarterly|upon completion|net \\d+)",
"risk_level": "MEDIUM",
"suggestion": "Specify exact payment amounts, schedule (e.g., monthly, milestone-based), method, and late payment penalties",
"reference": "Standard Contract Payment Practices"
},
"term_and_termination": {
"title": "Unclear Term and Termination",
"description": "Contract duration, renewal terms, or termination conditions are not clearly specified",
"pattern": "(term|duration).*?(shall be|is|will be|equals).*?\\d+.*?(day|days|week|weeks|month|months|year|years)|(termination|terminate|end|cancel).*?(for cause|without cause|with notice).*?(notice|notification).*?\\d+.*?(day|days|week|weeks|month|months)",
"risk_level": "MEDIUM",
"suggestion": "Clearly define initial term, renewal options, termination for cause and without cause procedures, and required notice periods",
"reference": "Standard Contract Duration and Termination Practices"
},
"dispute_resolution": {
"title": "Missing Dispute Resolution Clause",
"description": "Contract does not specify how disputes will be resolved (negotiation, mediation, arbitration, litigation)",
"pattern": "(dispute|disagreement|controversy|claim).*?(resolution|settled|resolved).*?(through|via|by means of).*?(negotiation|mediation|arbitration|litigation|court)|(arbitration).*?(shall be|is|will be).*?(final|binding)|(governing law).*?(shall be|is|will be).*?[A-Z][a-z]+",
"risk_level": "MEDIUM",
"suggestion": "Add dispute resolution clause specifying negotiation first, then mediation, then binding arbitration or litigation with specified jurisdiction",
"reference": "Standard Dispute Resolution Practices and Arbitration Law"
},
"force_majeure": {
"title": "Missing Force Majeure Clause",
"description": "Contract lacks provision for unforeseen circumstances preventing performance",
"pattern": "(force majeure|act of god|unforeseeable circumstances|beyond reasonable control).*?(shall excuse|excuses|releases from liability)|(neither party).*?(shall be liable|is responsible).*?(for any failure|for any delay).*?(caused by|due to|resulting from).*?(force majeure|act of god|natural disaster|war|terrorism|government action)",
"risk_level": "LOW",
"suggestion": "Add standard force majeure clause covering natural disasters, war, terrorism, government actions, and other uncontrollable events",
"reference": "Standard Force Majeure Principles and Civil Code Articles"
},
"governing_law": {
"title": "Missing Governing Law Clause",
"description": "Contract does not specify which jurisdiction's laws will govern interpretation",
"pattern": "(governing law|this agreement shall be governed by|interpreted in accordance with).*?the laws of.*?[A-Z][a-z]+.*?(state|province|country)|(this agreement).*?(shall be|is|will be).*?(subject to|governed by).*?the laws.*?of",
"risk_level": "LOW",
"suggestion": "Specify governing law (e.g., laws of the State of California, Peoples Republic of China) and optionally jurisdiction for disputes",
"reference": "Standard Choice of Law Principles"
},
"entire_agreement": {
"title": "Missing Entire Agreement Clause",
"description": "Contract lacks clause stating that the written document constitutes the complete agreement between parties",
"pattern": "(entire agreement|whole agreement|complete agreement).*?(this agreement|this document|this instrument).*?(constitutes|is|represents).*?(the|the full|the complete).*?(agreement|understanding|arrangement).*?(between|among).*?(the parties|party a and party b)|(supersedes|replaces|cancels).*?(all|any).*?(prior|previous|preceding).*?(agreements|understandings|arrangements|representations).*?(whether|whether oral|whether written)",
"risk_level": "LOW",
"suggestion": "Add entire agreement clause to prevent reliance on prior oral or written statements not included in the written contract",
"reference": "Standard Integration Clause Principles and Parol Evidence Rule"
}
}
FILE:legal_patterns/required_clauses_new.json
{
"confidentiality": {
"title": "Missing Confidentiality Clause",
"description": "Contract lacks explicit confidentiality obligations for protecting proprietary information",
"pattern": "(confidential|confidentiality|non-disclosure|nda).*?(obligation|duty|responsibility|requirement)|(proprietary|confidential).*?(information|data|knowledge).*?(shall be|must be|is required to be).*?(kept confidential|maintained in confidence)",
"risk_level": "MEDIUM",
"suggestion": "Add a standard confidentiality clause defining what information is confidential and obligations to protect it",
"reference": "Standard Business Practice and Trade Secret Protection Laws"
},
"intellectual_property": {
"title": "Missing Intellectual Property Clause",
"description": "Contract does not address ownership, licensing, or use of intellectual property",
"pattern": "(intellectual property|ip|copyright|patent|trademark|trade secret|proprietary).*?(ownership|title|interest|rights|license|usage).*?(retains|retain|remains with|belongs to)|(work product|deliverables|materials).*?(created|developed|produced).*?(shall be|is|are).*?(the exclusive property of|owned by)",
"risk_level": "MEDIUM",
"suggestion": "Add IP clause specifying ownership of pre-existing IP, ownership of work product, and license grants",
"reference": "Copyright Law and Patent Act Principles"
},
"payment_terms": {
"title": "Unclear Payment Terms",
"description": "Payment amount, schedule, or method is not clearly defined",
"pattern": "(payment|pay|fee|charge|compensation|remuneration).*?(amount|sum|fee|rate).*?(shall be|is|will be|equals|totals).*?\\$?\\d+(\\.\\d+)?(?:\s*(USD|dollars|RMB|yuan|EUR|euros|GBP|pounds))?|(invoice|bill).*?(submit|send|issue).*?(monthly|quarterly|upon completion|net \\d+)",
"risk_level": "MEDIUM",
"suggestion": "Specify exact payment amounts, schedule (e.g., monthly, milestone-based), method, and late payment penalties",
"reference": "Standard Contract Payment Practices"
},
"term_and_termination": {
"title": "Unclear Term and Termination",
"description": "Contract duration, renewal terms, or termination conditions are not clearly specified",
"pattern": "(term|duration).*?(shall be|is|will be|equals).*?\\d+.*?(day|days|week|weeks|month|months|year|years)|(termination|terminate|end|cancel).*?(for cause|without cause|with notice).*?(notice|notification).*?\\d+.*?(day|days|week|weeks|month|months)",
"risk_level": "MEDIUM",
"suggestion": "Clearly define initial term, renewal options, termination for cause and without cause procedures, and required notice periods",
"reference": "Standard Contract Duration and Termination Practices"
},
"dispute_resolution": {
"title": "Missing Dispute Resolution Clause",
"description": "Contract does not specify how disputes will be resolved (negotiation, mediation, arbitration, litigation)",
"pattern": "(dispute|disagreement|controversy|claim).*?(resolution|settled|resolved).*?(through|via|by means of).*?(negotiation|mediation|arbitration|litigation|court)|(arbitration).*?(shall be|is|will be).*?(final|binding)|(governing law).*?(shall be|is|will be).*?[A-Z][a-z]+",
"risk_level": "MEDIUM",
"suggestion": "Add dispute resolution clause specifying negotiation first, then mediation, then binding arbitration or litigation with specified jurisdiction",
"reference": "Standard Dispute Resolution Practices and Arbitration Law"
},
"force_majeure": {
"title": "Missing Force Majeure Clause",
"description": "Contract lacks provision for unforeseen circumstances preventing performance",
"pattern": "(force majeure|act of god|unforeseeable circumstances|beyond reasonable control).*?(shall excuse|excuses|releases from liability)|(neither party).*?(shall be liable|is responsible).*?(for any failure|for any delay).*?(caused by|due to|resulting from).*?(force majeure|act of god|natural disaster|war|terrorism|government action)",
"risk_level": "LOW",
"suggestion": "Add standard force majeure clause covering natural disasters, war, terrorism, government actions, and other uncontrollable events",
"reference": "Standard Force Majeure Principles and Civil Code Articles"
},
"governing_law": {
"title": "Missing Governing Law Clause",
"description": "Contract does not specify which jurisdiction's laws will govern interpretation",
"pattern": "(governing law|this agreement shall be governed by|interpreted in accordance with).*?the laws of.*?[A-Z][a-z]+.*?(state|province|country)|(this agreement).*?(shall be|is|will be).*?(subject to|governed by).*?the laws.*?of",
"risk_level": "LOW",
"suggestion": "Specify governing law (e.g., laws of the State of California, Peoples Republic of China) and optionally jurisdiction for disputes",
"reference": "Standard Choice of Law Principles"
},
"entire_agreement": {
"title": "Missing Entire Agreement Clause",
"description": "Contract lacks clause stating that the written document constitutes the complete agreement between parties",
"pattern": "(entire agreement|whole agreement|complete agreement).*?(this agreement|this document|this instrument).*?(constitutes|is|represents).*?(the|the full|the complete).*?(agreement|understanding|arrangement).*?(between|among).*?(the parties|party a and party b)|(supersedes|replaces|cancels).*?(all|any).*?(prior|previous|preceding).*?(agreements|understandings|arrangements|representations).*?(whether|whether oral|whether written)",
"risk_level": "LOW",
"suggestion": "Add entire agreement clause to prevent reliance on prior oral or written statements not included in the written contract",
"reference": "Standard Integration Clause Principles and Parol Evidence Rule"
}
}
FILE:README.md
# Contract Review Skill
An OpenClaw skill for AI-powered contract analysis and review. Identifies risky clauses, missing provisions, and compliance issues in legal documents.
## Features
- 📄 **Contract Analysis**: Parses and analyzes various contract formats (text, markdown, PDF via external tools)
- ⚠️ **Risk Clause Detection**: Identifies high-risk clauses such as unilateral modification, automatic renewal, excessive liability, etc.
- ✅ **Required Clause Checking**: Verifies presence of essential clauses like confidentiality, IP, termination, dispute resolution
- 🏢 **Industry-Specific Rules**: Customizable checks for different industries (tech, healthcare, finance, construction, etc.)
- 📊 **Risk Scoring**: Provides overall contract risk score with detailed breakdown
- 📝 **Remediation Suggestions**: Offers specific language improvements and negotiation points
- 📋 **Report Generation**: Creates structured reports in JSON and human-readable formats
## Installation
```bash
clawhub install contract-review-skill
```
## Usage
### Basic Contract Review
```bash
# Review a contract text file
contract-review-skill ./contract.txt
# Review a markdown contract
contract-review-skill ./agreement.md
```
### With Custom Options
```bash
# Specify industry focus
contract-review-skill ./contract.txt --industry tech
# Output JSON format for integration
contract-review-skill ./contract.txt --format json
```
## Output Example
```
============================================================
CONTRACT REVIEW REPORT
============================================================
Contract: service_agreement.txt
Overall Risk Score: 68/100 (MEDIUM-HIGH)
Risk Distribution:
- High Risk: 3 clauses
- Medium Risk: 5 clauses
- Low Risk: 8 clauses
- Compliant: 12 clauses
Top Risk Findings:
[HIGH] Unilateral Modification Clause
Location: Section 8.2 (Page 4)
Issue: Party A may modify terms without Party B's consent
Suggestion: Require mutual agreement for any modifications
Reference: Contract Law Principle of Mutuality
[HIGH] Automatic Renewal without Notice
Location: Section 12.1 (Page 6)
Issue: Contract renews annually without prior notice requirement
Suggestion: Add 30-60 day notice period for non-renewal
Reference: Consumer Protection Regulations
[MEDIUM] Missing Confidentiality Clause
Location: Not found in contract
Issue: No explicit confidentiality obligations defined
Suggestion: Add standard NDA clause covering proprietary information
Reference: Standard Business Practice
Recommendations:
1. Address all HIGH risk items before signing
2. Consider negotiating MEDIUM risk clauses based on leverage
3. Have legal counsel review final revised version
============================================================
```
## Configuration
The skill can be customized by:
1. **Modifying Pattern Files**: Update JSON files in `legal_patterns/` directory
2. **Adding Industry Templates**: Create new industry-specific rule sets
3. **Adjusting Risk Weights**: Modify scoring algorithm in `review_engine.py`
## Requirements
- Python 3.7+
- No external dependencies for core functionality (uses standard library)
- Optional: `pypdf` or `pdfplumber` for PDF contract support (install via pip if needed)
## Security Notes
- This skill processes contract text locally - no data leaves your machine
- For highly sensitive contracts, consider running in air-gapped environment
- The skill does not provide legal advice - consult qualified attorney for binding decisions
## License
MIT
## Author
laosi (did:soul:laosi)
FILE:review_engine.py
#!/usr/bin/env python3
"""
Contract Review Engine - Core logic for analyzing contracts and identifying risks, missing clauses, and compliance issues.
"""
import json
import os
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class ReviewFinding:
id: str
title: str
description: str
risk_level: RiskLevel
category: str # e.g., "Risk Clause", "Missing Clause", "Compliance"
location: str # e.g., "Section 5.2", "Line 45"
suggestion: str
reference: Optional[str] = None # e.g., "Civil Code Article 496"
class ContractReviewEngine:
def __init__(self, patterns_dir: Path):
self.patterns_dir = patterns_dir
self.high_risk_patterns = self._load_patterns("high_risk.json")
self.required_clauses = self._load_patterns("required_clauses.json")
self.industry_patterns = {} # Can be extended for industry-specific rules
def _load_patterns(self, filename: str) -> Dict[str, Any]:
"""Load pattern JSON file from patterns directory."""
file_path = self.patterns_dir / filename
if file_path.exists():
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def review_contract(self, contract_text: str, industry: str = None) -> Dict[str, Any]:
"""
Main review function.
Returns a dictionary with review results.
"""
findings = []
# 1. Check for high-risk clauses
findings.extend(self._check_high_risk_clauses(contract_text))
# 2. Check for missing required clauses
findings.extend(self._check_required_clauses(contract_text))
# 3. Industry-specific checks (if industry specified)
if industry:
findings.extend(self._check_industry_specific(contract_text, industry))
# 4. Calculate overall risk score
risk_score = self._calculate_risk_score(findings)
# 5. Generate summary
summary = self._generate_summary(findings)
findings_list = []
for f in findings:
f_dict = asdict(f)
# Convert Enum to string for JSON serialization
if isinstance(f_dict['risk_level'], RiskLevel):
f_dict['risk_level'] = f_dict['risk_level'].value
findings_list.append(f_dict)
return {
"findings": findings_list,
"risk_score": risk_score,
"summary": summary,
"metadata": {
"total_clauses_checked": len(self.high_risk_patterns) + len(self.required_clauses),
"industry": industry or "general",
"engine_version": "1.0.0"
}
}
def _check_high_risk_clauses(self, text: str) -> List[ReviewFinding]:
"""Scan for high-risk clause patterns."""
findings = []
for pattern_id, pattern_data in self.high_risk_patterns.items():
pattern = pattern_data.get("pattern", "")
if not pattern:
continue
# Simple regex search - in production, might use more sophisticated NLP
matches = list(re.finditer(pattern, text, re.IGNORECASE))
for match in matches:
# Extract context around match
start = max(0, match.start() - 50)
end = min(len(text), match.end() + 50)
context = text[start:end]
# Approximate location (line number)
line_number = text[:match.start()].count('\n') + 1
findings.append(ReviewFinding(
id=f"HR-{pattern_id}-{len(findings)+1:03d}",
title=pattern_data.get("title", "High Risk Clause Detected"),
description=pattern_data.get("description", "Potentially problematic clause detected"),
risk_level=RiskLevel[pattern_data.get("risk_level", "HIGH").upper()],
category="Risk Clause",
location=f"Line {line_number}",
suggestion=pattern_data.get("suggestion", "Review this clause with legal counsel"),
reference=pattern_data.get("reference")
))
return findings
def _check_required_clauses(self, text: str) -> List[ReviewFinding]:
"""Check for presence of required clauses."""
findings = []
for clause_id, clause_data in self.required_clauses.items():
pattern = clause_data.get("pattern", "")
if not pattern:
continue
# Check if pattern exists in text
if not re.search(pattern, text, re.IGNORECASE):
findings.append(ReviewFinding(
id=f"RC-{clause_id}-{len(findings)+1:03d}",
title=clause_data.get("title", "Missing Required Clause"),
description=clause_data.get("description", "Important clause may be missing"),
risk_level=RiskLevel[clause_data.get("risk_level", "MEDIUM").upper()],
category="Missing Clause",
location="Not found in contract",
suggestion=clause_data.get("suggestion", "Consider adding this clause"),
reference=clause_data.get("reference")
))
return findings
def _check_industry_specific(self, text: str, industry: str) -> List[ReviewFinding]:
"""Apply industry-specific rules."""
# This would be extended with industry-specific pattern files
# For now, return empty list as placeholder
return []
def _calculate_risk_score(self, findings: List[ReviewFinding]) -> int:
"""Calculate overall risk score (0-100, lower is better)."""
if not findings:
return 10 # Low risk if no issues found
# Weight by risk level
weights = {
RiskLevel.CRITICAL: 25,
RiskLevel.HIGH: 15,
RiskLevel.MEDIUM: 8,
RiskLevel.LOW: 3
}
total_penalty = sum(weights[f.risk_level] for f in findings)
# Cap the penalty at 90 to keep minimum score at 10
penalty = min(total_penalty, 90)
return 10 + penalty # Score range 10-100
def _generate_summary(self, findings: List[ReviewFinding]) -> Dict[str, Any]:
"""Generate summary statistics."""
counts = {
"critical": len([f for f in findings if f.risk_level == RiskLevel.CRITICAL]),
"high": len([f for f in findings if f.risk_level == RiskLevel.HIGH]),
"medium": len([f for f in findings if f.risk_level == RiskLevel.MEDIUM]),
"low": len([f for f in findings if f.risk_level == RiskLevel.LOW])
}
categories = {}
for f in findings:
cat = f.category
categories[cat] = categories.get(cat, 0) + 1
return {
"by_risk_level": counts,
"by_category": categories,
"total_findings": len(findings)
}
def main():
"""Simple test function."""
# This would be replaced by the CLI wrapper
sample_contract = """
SERVICE AGREEMENT
This Agreement is made between Party A and Party B.
Section 1: Services
Party A shall provide services as described in Exhibit A.
Section 2: Payment
Party B shall pay Party A $1000 per month.
Section 8: Modification
Either party may amend this agreement at any time by providing written notice to the other party.
Section 12: Term and Termination
This agreement shall remain in effect until terminated by either party.
"""
engine = ContractReviewEngine(Path("legal_patterns"))
result = engine.review_contract(sample_contract)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()
FILE:test_simple.py
import json
import re
from pathlib import Path
# Test loading the JSON
patterns_path = Path("C:/Users/pc/.laosi/skills/contract-review-skill/legal_patterns/required_clauses.json")
print(f"Loading from: {patterns_path}")
print(f"File exists: {patterns_path.exists()}")
try:
with open(patterns_path, 'r', encoding='utf-8') as f:
data = json.load(f)
print("JSON loaded successfully")
print(f"Keys: {list(data.keys())}")
# Test one pattern
if "confidentiality" in data:
pattern = data["confidentiality"]["pattern"]
print(f"Confidentiality pattern: {pattern}")
# Test if it's a valid regex
try:
compiled = re.compile(pattern, re.IGNORECASE)
print("Pattern compiled successfully")
except Exception as e:
print(f"Pattern compilation failed: {e}")
except Exception as e:
print(f"Error loading JSON: {e}")
import traceback
traceback.print_exc()Performs comprehensive security audits on MCP servers including vulnerability scans, malware detection, compliance checks, and detailed remediation reports.
name: laosi-mcp-security-audit
version: 1.0.0
description: Enterprise-grade MCP server security audit skill for OpenClaw agents - performs comprehensive vulnerability scanning, malware detection, and compliance checking on MCP servers and skills with detailed reporting and remediation guidance
author: laosi
homepage: https://github.com/laosi/mcp-security-audit-skill
tags: [security, mcp, audit, enterprise, compliance, vulnerability-scanning, malware-detection]
FILE:audit.py
#!/usr/bin/env python3
"""
MCP Security Auditor - Enterprise-grade security scanning for OpenClaw MCP servers
Detects vulnerabilities, malware patterns, and compliance issues in MCP configurations
"""
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
class Severity(Enum):
INFO = "info"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class SecurityFinding:
id: str
title: str
description: str
severity: Severity
category: str
location: str
remediation: str
cve_id: Optional[str] = None
class MCPSecurityAuditor:
def __init__(self, mcp_path: str):
self.mcp_path = Path(mcp_path)
self.findings: List[SecurityFinding] = []
# Security patterns to check
self.malware_patterns = [
r'eval\s*\(',
r'exec\s*\(',
r'__import__\s*\(',
r'subprocess\.call\s*\([^)]*shell\s*=\s*True',
r'os\.system\s*\(',
r'pickle\.loads\s*\(',
r'marshal\.loads\s*\(',
r'shelve\.open\s*\(',
r'yaml\.load\s*\([^)]*Loader\s*=\s*yaml\.Loader',
r'jwt\.decode\s*\([^)]*verify\s*=\s*False',
]
self.vulnerability_patterns = [
(r'password\s*=\s*["\'][^"\']*["\']', "Hardcoded password detected"),
(r'api_key\s*=\s*["\'][^"\']*["\']', "Hardcoded API key detected"),
(r'secret\s*=\s*["\'][^"\']*["\']', "Hardcoded secret detected"),
(r'token\s*=\s*["\'][^"\']*["\']', "Hardcoded token detected"),
(r'bind\s*[:\s]*0\.0\.0\.0', "Binding to all interfaces (0.0.0.0)"),
(r'bind\s*[:\s]*::', "Binding to all IPv6 interfaces"),
(r'--disable-web-security', "Web security disabled flag"),
(r'--allow-running-insecure-content', "Insecure content allowed"),
]
self.compliance_patterns = [
(r'logging\.basicConfig\s*\([^)]*level\s*=\s*logging\.DEBUG', "Debug logging may leak sensitive info"),
(r'print\s*\([^)]*password', "Password may be logged to console"),
(r'print\s*\([^)]*token', "Token may be logged to console"),
(r'print\s*\([^)]*key', "Key may be logged to console"),
]
def audit_file(self, file_path: Path) -> List[SecurityFinding]:
"""Audit a single file for security issues"""
findings = []
try:
content = file_path.read_text(encoding='utf-8')
lines = content.split('\n')
# Check for malware patterns
for pattern in self.malware_patterns:
for i, line in enumerate(lines, 1):
if re.search(pattern, line, re.IGNORECASE):
findings.append(SecurityFinding(
id=f"MAL-{len(findings)+1:03d}",
title="Potential Malware Pattern Detected",
description=f"Suspicious code pattern found: {pattern}",
severity=Severity.HIGH,
category="Malware",
location=f"{file_path.name}:{i}",
remediation="Review this code carefully - it may contain malicious functionality",
cve_id=None
))
# Check for vulnerabilities
for pattern, description in self.vulnerability_patterns:
for i, line in enumerate(lines, 1):
if re.search(pattern, line, re.IGNORECASE):
severity = Severity.HIGH if any(x in pattern.lower() for x in ['password', 'api_key', 'secret', 'token']) else Severity.MEDIUM
findings.append(SecurityFinding(
id=f"VULN-{len(findings)+1:03d}",
title="Security Vulnerability Detected",
description=description,
severity=severity,
category="Vulnerability",
location=f"{file_path.name}:{i}",
remediation="Remove hardcoded credentials or use secure vault/environment variables",
cve_id=None
))
# Check for compliance issues
for pattern, description in self.compliance_patterns:
for i, line in enumerate(lines, 1):
if re.search(pattern, line, re.IGNORECASE):
findings.append(SecurityFinding(
id=f"COMP-{len(findings)+1:03d}",
title="Compliance Issue Detected",
description=description,
severity=Severity.LOW,
category="Compliance",
location=f"{file_path.name}:{i}",
remediation="Review logging and output to prevent sensitive data exposure",
cve_id=None
))
except Exception as e:
findings.append(SecurityFinding(
id=f"ERR-{len(findings)+1:03d}",
title="File Read Error",
description=f"Could not read file: {str(e)}",
severity=Severity.MEDIUM,
category="Error",
location=str(file_path),
remediation="Check file permissions and encoding",
cve_id=None
))
return findings
def audit_directory(self) -> Dict[str, Any]:
"""Audit entire MCP directory"""
if not self.mcp_path.exists():
return {
"error": f"MCP path does not exist: {self.mcp_path}",
"findings": [],
"score": 0,
"grade": "F"
}
# Scan all relevant files
extensions = ['.py', '.js', '.ts', '.json', '.yaml', '.yml', '.txt', '.md', '.env', '.config', '.conf']
files_to_scan = []
for ext in extensions:
files_to_scan.extend(self.mcp_path.rglob(f"*{ext}"))
# Also check for common config files
common_files = ['.env', '.env.local', '.env.production', 'docker-compose.yml', 'Dockerfile']
for cf in common_files:
files_to_scan.extend(self.mcp_path.rglob(cf))
# Remove duplicates and audit
unique_files = list(set(files_to_scan))
all_findings = []
for file_path in unique_files:
if file_path.is_file():
findings = self.audit_file(file_path)
all_findings.extend(findings)
self.findings = all_findings
return self.generate_report()
def generate_report(self) -> Dict[str, Any]:
"""Generate security audit report"""
if not self.findings:
return {
"score": 100,
"grade": "A+",
"findings": [],
"summary": {
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"info": 0
},
"recommendations": ["No security issues found - excellent work!"]
}
# Count by severity
severity_counts = {
"critical": len([f for f in self.findings if f.severity == Severity.CRITICAL]),
"high": len([f for f in self.findings if f.severity == Severity.HIGH]),
"medium": len([f for f in self.findings if f.severity == Severity.MEDIUM]),
"low": len([f for f in self.findings if f.severity == Severity.LOW]),
"info": len([f for f in self.findings if f.severity == Severity.INFO])
}
# Calculate score (0-100)
penalty = (
severity_counts["critical"] * 25 +
severity_counts["high"] * 15 +
severity_counts["medium"] * 8 +
severity_counts["low"] * 3 +
severity_counts["info"] * 1
)
score = max(0, 100 - penalty)
# Determine grade
if score >= 95:
grade = "A+"
elif score >= 90:
grade = "A"
elif score >= 80:
grade = "B"
elif score >= 70:
grade = "C"
elif score >= 60:
grade = "D"
else:
grade = "F"
# Generate recommendations
recommendations = []
if severity_counts["critical"] > 0:
recommendations.append("[CRITICAL] Address critical security findings immediately")
if severity_counts["high"] > 0:
recommendations.append("[HIGH] Fix high severity vulnerabilities soon")
if any(f.category == "Malware" for f in self.findings):
recommendations.append("[MALWARE] Potential malicious code detected - investigate immediately")
if any("password" in f.description.lower() or "api_key" in f.description.lower() for f in self.findings):
recommendations.append("[CREDENTIALS] Remove hardcoded credentials, use environment variables or vault")
if any("0.0.0.0" in f.description or "::" in f.description for f in self.findings):
recommendations.append("[NETWORK] Restrict binding to specific interfaces only")
if not recommendations:
recommendations.append("[OK] Review low/medium severity findings for improvement")
# Convert findings to JSON-serializable format
findings_serializable = []
for f in self.findings:
f_dict = asdict(f)
# Convert enum to string
f_dict['severity'] = f.severity.value
findings_serializable.append(f_dict)
return {
"score": score,
"grade": grade,
"findings": findings_serializable,
"summary": severity_counts,
"recommendations": recommendations,
"metadata": {
"auditor": "MCP Security Auditor v1.0.0",
"timestamp": str(Path().cwd()),
"files_scanned": len(list(self.mcp_path.rglob("*"))) if self.mcp_path.exists() else 0
}
}
def main():
if len(sys.argv) < 2:
print("Usage: python audit.py <mcp_directory_path>")
print("Example: python audit.py ./mcp_server")
sys.exit(1)
mcp_path = sys.argv[1]
auditor = MCPSecurityAuditor(mcp_path)
report = auditor.audit_directory()
# Print formatted report
print("=" * 60)
print("MCP SECURITY AUDIT REPORT")
print("=" * 60)
print(f"Path: {mcp_path}")
print(f"Score: {report['score']}/100")
print(f"Grade: {report['grade']}")
print("-" * 60)
print("Summary:")
print(f" Critical: {report['summary']['critical']}")
print(f" High: {report['summary']['high']}")
print(f" Medium: {report['summary']['medium']}")
print(f" Low: {report['summary']['low']}")
print(f" Info: {report['summary']['info']}")
print("-" * 60)
print("Recommendations:")
for rec in report['recommendations']:
print(f" {rec}")
print("-" * 60)
if report['findings']:
print("Detailed Findings:")
for finding in report['findings'][:10]: # Show first 10
severity = finding['severity']
if hasattr(severity, 'value'):
severity_str = severity.value.upper()
else:
severity_str = str(severity).upper()
print(f" [{severity_str}] {finding['title']}")
print(f" {finding['description']}")
print(f" Location: {finding['location']}")
print(f" Fix: {finding['remediation']}")
print()
if len(report['findings']) > 10:
print(f" ... and {len(report['findings']) - 10} more findings")
else:
print("✅ No security issues found!")
print("=" * 60)
# Return appropriate exit code
if report['score'] < 70:
sys.exit(1) # Fail if score too low
elif report['score'] < 90:
sys.exit(0) # Warn but don't fail
else:
sys.exit(0) # Success
if __name__ == "__main__":
main()
FILE:claw.json
{
"name": "laosi-mcp-security-audit",
"displayName": "LAOSI MCP Security Audit",
"description": "Enterprise-grade MCP server security audit for OpenClaw agents - scans for vulnerabilities, malware, and compliance issues",
"version": "1.0.0",
"author": "laosi",
"homepage": "https://github.com/laosi/mcp-security-audit-skill",
"license": "MIT",
"keywords": ["security", "mcp", "audit", "enterprise", "compliance", "vulnerability", "malware"],
"category": "security",
"engines": {
"openclaw": ">=1.0.0"
},
"scripts": {
"audit": "python mcp_security_audit.py"
},
"files": [
"SKILL.md",
"README.md",
"claw.json",
"audit.py",
"mcp_security_audit.py"
]
}
FILE:mcp_security_audit.py
#!/usr/bin/env python3
"""
MCP Security Audit Skill - CLI Wrapper for OpenClaw Skill Chain
Provides easy-to-use interface for the security auditing functionality
"""
import json
import sys
import os
from pathlib import Path
def main():
if len(sys.argv) < 2:
print('{"error": "Usage: mcp-security-audit <path_to_audit>"}')
sys.exit(1)
target_path = sys.argv[1]
# Import and run the audit
sys.path.insert(0, str(Path(__file__).parent))
from audit import MCPSecurityAuditor
auditor = MCPSecurityAuditor(target_path)
report = auditor.audit_directory()
# Output as JSON for skill chain consumption
print(json.dumps(report, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
FILE:README.md
# MCP Security Audit Skill
An OpenClaw skill for performing enterprise-grade security audits on MCP (Model Context Protocol) servers and skills.
## Features
- 🔍 **Vulnerability Scanning**: Detects hardcoded credentials, insecure bindings, and common vulnerabilities
- 🛡️ **Malware Detection**: Identifies suspicious patterns like eval/exec, shell injection, and potential backdoors
- 📋 **Compliance Checking**: Flags logging and output issues that could lead to data exposure
- 📊 **Security Scoring**: Provides a 0-100 score with letter grade (A+ to F)
- 📝 **Detailed Reports**: Line-by-line findings with remediation guidance
- 🚨 **Severity Levels**: Critical, High, Medium, Low, Info classifications
- 🎯 **Actionable Recommendations**: Prioritized fixes based on risk level
## Installation
```bash
clawhub install mcp-security-audit
```
## Usage
### Basic Audit
```bash
# Audit an MCP server directory
mcp-security-audit ./mcp_server
# Audit a skill directory
mcp-security-audit ./my-skill
```
### With Custom Path
```bash
python audit.py /path/to/mcp/server
```
## Output Example
```
============================================================
MCP SECURITY AUDIT REPORT
============================================================
Path: ./mcp_server
Score: 85/100
Grade: B
------------------------------------------------------------
Summary:
Critical: 0
High: 2
Medium: 5
Low: 3
Info: 0
------------------------------------------------------------
Recommendations:
⚠️ HIGH: Fix high severity vulnerabilities soon
🔑 CREDENTIALS: Remove hardcoded credentials, use environment variables or vault
🌐 NETWORK: Restrict binding to specific interfaces only
------------------------------------------------------------
Detailed Findings:
[HIGH] Hardcoded API key detected
Hardcoded API key detected
Location: config.json:12
Fix: Remove hardcoded credentials or use secure vault/environment variables
[HIGH] Binding to all interfaces (0.0.0.0)
Binding to all interfaces (0.0.0.0)
Location: server.py:45
Fix: Restrict binding to specific interfaces only
[MEDIUM] Debug logging may leak sensitive info
Debug logging may leak sensitive info
Location: main.py:8
Fix: Review logging and output to prevent sensitive data exposure
...
============================================================
```
## Configuration
The skill can be customized by modifying the patterns in `audit.py`:
- `malware_patterns`: Regex patterns for detecting malicious code
- `vulnerability_patterns`: Patterns for security vulnerabilities (credentials, bindings, etc.)
- `compliance_patterns`: Patterns for compliance and data exposure issues
## Requirements
- Python 3.7+
- No external dependencies (uses only standard library)
## Security Notes
- This skill is designed to be run in trusted environments
- Always review findings carefully before making changes
- Consider using in conjunction with other security tools (VirusTotal, Snyk, etc.)
- For enterprise use, integrate with your CI/CD pipeline for continuous security monitoring
## License
MIT
## Author
laosi (did:soul:laosi)