@clawhub-sky-lv-f1d83a9aa1
Docker助手。容器管理、镜像构建、Dockerfile优化。使用场景:(1) 容器操作,(2) Dockerfile编写,(3) 镜像构建,(4) 网络配置。
--- name: "docker-helper" slug: skylv-docker-helper version: 1.0.2 description: Docker assistant. Container management, image building, and Dockerfile optimization. Triggers: docker, container, dockerfile, docker build. author: SKY-lv license: MIT-0 tags: [docker, openclaw, agent] keywords: docker, container, devops, deployment triggers: docker helper --- # Docker Helper — Docker助手 ## 功能说明 辅助Docker容器管理和配置。 ## 使用方法 ### 1. 容器操作 ``` 用户: 列出所有运行中的容器 ``` ### 2. Dockerfile ``` 用户: 写一个Node.js的Dockerfile ``` ### 3. 镜像构建 ``` 用户: 构建镜像并推送到registry ``` ### 4. 网络配置 ``` 用户: 配置容器网络 ``` ## Usage 1. Install the skill 2. Configure as needed 3. Run with OpenClaw
Agent持续学习助手。设计增量训练、反馈学习、自我优化。使用场景:(1) 增量训练方案,(2) 反馈收集机制,(3) 模型更新策略,(4) 效果持续监控。
--- name: "continuous-learning" slug: skylv-continuous-learning version: 1.0.0 description: "Agent持续学习助手。设计增量训练、反馈学习、自我优化。使用场景:(1) 增量训练方案,(2) 反馈收集机制,(3) 模型更新策略,(4) 效果持续监控。" author: SKY-lv license: MIT-0 tags: [continuous, openclaw, agent] --- # Continuous Learning — 持续学习助手 ## 功能说明 帮助Agent实现持续学习和自我优化。 ## 使用方法 ### 1. 增量训练 ``` 用户: 如何让Agent从交互中学习? ``` ### 2. 反馈机制 ``` 用户: 设计用户反馈收集机制 ``` ### 3. 模型更新 ``` 用户: 什么时候更新Agent模型? ``` ### 4. 效果监控 ``` 用户: 如何监控Agent效果变化? ```
Code Review AI Assistant. Automated code review, find potential issues and improvement suggestions. Triggers: code review, review code, code quality, code an...
---
name: code-reviewer
description: Code review assistant. Analyzes code quality, detects bugs and security issues, and suggests improvements. Triggers: code review, security audit, code quality, bug detection.
metadata: {"openclaw": {"emoji": "🔍"}}
---
# Code Reviewer — 代码审查助手
## 功能说明
对代码进行全面审查,发现问题和优化机会。
## 使用方法
### 1. 代码质量检查
```
用户: 审查这段代码的质量:
[粘贴代码]
```
审查维度:
- 代码结构
- 命名规范
- 注释完整性
- 函数复杂度
- 重复代码
### 2. 安全漏洞检查
```
用户: 检查这段代码是否有安全问题
```
检查项:
- SQL注入风险
- XSS漏洞
- 敏感信息硬编码
- 不安全的依赖
- 权限控制缺失
### 3. 性能优化建议
```
用户: 这段代码有什么性能问题?如何优化?
```
分析项:
- 时间复杂度
- 空间复杂度
- 不必要的循环
- 数据结构选择
- 缓存机会
### 4. 代码重构建议
```
用户: 这段代码如何重构使其更清晰?
```
重构方向:
- 提取函数
- 消除重复
- 简化条件
- 改善命名
- 添加类型注解
## 示例输出
```
代码审查报告
【质量问题】
1. 函数 `processData` 过长(85行),建议拆分
- 提取数据验证逻辑
- 提取数据转换逻辑
2. 变量命名不清晰
- `temp` → 建议改为 `processedResult`
- `flag` → 建议改为 `isValidUser`
【安全问题】
⚠️ 高危:SQL查询使用字符串拼接
第23行: `query = "SELECT * FROM users WHERE id=" + userId`
建议:使用参数化查询
【性能问题】
1. O(n²) 嵌套循环可优化为 O(n)
第45-52行:使用 Map 替代数组查找
2. 重复计算
第67行:`calculateTotal()` 在循环内重复调用
建议:提取到循环外
【优化建议】
- 添加输入验证
- 使用 TypeScript 类型注解
- 添加单元测试
```
## 支持语言
- JavaScript / TypeScript
- Python
- Go
- Java
- C / C++
- Rust
代码生成助手。生成高质量代码、代码补全、模板生成。使用场景:(1) 代码生成,(2) 代码补全,(3) 模板生成,(4) 代码重构。
--- name: "code-generation" slug: skylv-code-generation version: 1.0.0 description: "代码生成助手。生成高质量代码、代码补全、模板生成。使用场景:(1) 代码生成,(2) 代码补全,(3) 模板生成,(4) 代码重构。" author: SKY-lv license: MIT-0 tags: [code, openclaw, agent] --- # Code Generation — 代码生成助手 ## 功能说明 辅助代码生成和补全。 ## 使用方法 ### 1. 代码生成 ``` 用户: 用Python写一个排序算法 ``` ### 2. 代码补全 ``` 用户: 补全这个函数的实现 ``` ### 3. 模板生成 ``` 用户: 生成一个React组件模板 ``` ### 4. 代码重构 ``` 用户: 重构这段代码更简洁 ```
云架构设计助手。设计AWS/GCP/Azure架构、成本优化、高可用。使用场景:(1) 云架构设计,(2) 成本优化,(3) 高可用方案,(4) 安全配置。
--- name: "cloud-architect" slug: skylv-cloud-architect version: 1.0.0 description: "云架构设计助手。设计AWS/GCP/Azure架构、成本优化、高可用。使用场景:(1) 云架构设计,(2) 成本优化,(3) 高可用方案,(4) 安全配置。" author: SKY-lv license: MIT-0 tags: [cloud, openclaw, agent] --- # Cloud Architect — 云架构助手 ## 功能说明 设计和优化云架构。 ## 使用方法 ### 1. 架构设计 ``` 用户: 设计一个高可用的Web架构 ``` ### 2. 成本优化 ``` 用户: 如何降低AWS成本? ``` ### 3. 高可用 ``` 用户: 设计多区域容灾架构 ``` ### 4. 安全配置 ``` 用户: 云上安全最佳实践 ```
CI/CD助手。设计流水线、配置自动化、部署脚本。使用场景:(1) 流水线设计,(2) GitHub Actions配置,(3) 部署脚本,(4) 测试集成。
--- name: "ci-cd-helper" slug: skylv-ci-cd-helper version: 1.0.0 description: "CI/CD助手。设计流水线、配置自动化、部署脚本。使用场景:(1) 流水线设计,(2) GitHub Actions配置,(3) 部署脚本,(4) 测试集成。" author: SKY-lv license: MIT-0 tags: [ci, openclaw, agent] --- # CI/CD Helper — 持续集成助手 ## 功能说明 设计和配置CI/CD流水线。 ## 使用方法 ### 1. 流水线设计 ``` 用户: 设计一个前端CI/CD流水线 ``` ### 2. GitHub Actions ``` 用户: 写一个GitHub Actions workflow ``` ### 3. 部署脚本 ``` 用户: 写一个自动部署脚本 ``` ### 4. 测试配置 ``` 用户: 配置单元测试和集成测试 ```
浏览器自动化Agent。使用Playwright/Puppeteer进行网页自动化、数据采集、表单填写。触发词:browser、浏览器自动化、爬虫、playwright、puppeteer、网页抓取。
---
name: "browser-automation-agent"
slug: skylv-browser-automation-agent
version: 1.0.2
description: Browser automation Agent. Web scraping, form filling, and UI automation using Playwright or Puppeteer. Triggers: browser automation, web scraping, playwright, puppeteer.
author: SKY-lv
license: MIT-0
tags: [browser, openclaw, agent]
keywords: browser, automation, playwright, puppeteer, scraping
triggers: browser automation agent
---
# Browser Automation Agent
## 功能说明
AI驱动的浏览器自动化,执行复杂网页任务。
## 技术选型
| 库 | 特点 | 适用场景 |
|----|------|----------|
| Playwright | 跨浏览器、等待稳定 | 通用自动化 |
| Puppeteer | Chrome专用、Node原生 | Chrome深度控制 |
| Selenium | 多语言、老牌稳定 | 多浏览器兼容 |
| DrissionPage | Python、轻量 | 中国网站适配 |
## Playwright 完整实现
### 1. 基础框架
```python
from playwright.sync_api import sync_playwright, Page, Browser, BrowserContext
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class TaskResult:
success: bool
data: Optional[dict] = None
error: Optional[str] = None
screenshot: Optional[str] = None
class BrowserAgent:
def __init__(self, headless: bool = True, slow_mo: int = 100):
self.playwright = sync_playwright().start()
self.browser: Browser = self.playwright.chromium.launch(
headless=headless,
slow_mo=slow_mo
)
self.context: BrowserContext = self.browser.new_context(
viewport={"width": 1920, "height": 1080},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
)
def new_page(self) -> Page:
return self.context.new_page()
def execute(self, task: str) -> TaskResult:
"""执行AI指令"""
page = self.new_page()
try:
# AI解析任务
steps = self.plan_steps(task)
for step in steps:
self.execute_step(page, step)
return TaskResult(success=True, data={"steps_completed": len(steps)})
except Exception as e:
return TaskResult(success=False, error=str(e))
finally:
page.close()
def plan_steps(self, task: str) -> list[dict]:
"""AI规划步骤"""
# 这里集成LLM进行任务规划
# ...
pass
def execute_step(self, page: Page, step: dict):
action = step["action"]
if action == "goto":
page.goto(step["url"], wait_until="domcontentloaded")
elif action == "click":
page.click(step["selector"])
elif action == "type":
page.fill(step["selector"], step["text"])
elif action == "wait":
page.wait_for_timeout(step["ms"])
elif action == "screenshot":
page.screenshot(path=step["path"])
elif action == "extract":
return page.query_selector(step["selector"]).inner_text()
def close(self):
self.browser.close()
self.playwright.stop()
```
### 2. 智能元素定位
```python
class SmartLocator:
"""AI智能定位器"""
LOCATOR_STRATEGIES = [
"get_by_role", # 无障碍角色(最可靠)
"get_by_label", # 表单标签
"get_by_placeholder", # 占位符
"get_by_text", # 文本内容
"locator", # CSS选择器
"xpath", # XPath(最后备选)
]
def find(self, page: Page, description: str) -> Locator:
"""根据描述智能查找元素"""
strategies = [
# 按钮查找
lambda: page.get_by_role("button", name=description),
lambda: page.get_by_role("link", name=description),
# 输入框查找
lambda: page.get_by_label(description),
lambda: page.get_by_placeholder(description),
# 文本查找
lambda: page.get_by_text(description, exact=True),
lambda: page.get_by_text(description),
# 通配符(放在最后)
lambda: page.locator(f"text={description}").first,
]
for strategy in strategies:
try:
locator = strategy()
if locator.count() > 0:
return locator.first
except:
continue
raise ElementNotFound(f"找不到元素: {description}")
def find_many(self, page: Page, description: str) -> list:
"""批量查找元素"""
# 尝试多种策略
# ...
pass
```
### 3. 表单自动填写
```python
class FormFiller:
"""智能表单填写"""
def fill_form(self, page: Page, form_data: dict):
"""根据字段描述自动填写"""
for field_name, value in form_data.items():
try:
# 尝试找对应label的输入框
locator = page.get_by_label(field_name, exact=False)
# 获取输入类型
if locator.count() == 0:
# 尝试placeholder
locator = page.get_by_placeholder(field_name)
if locator.count() == 0:
# 尝试名称属性
locator = page.locator(f'[name="{field_name}"]')
if locator.count() > 0:
el = locator.first
tag = el.evaluate("el => el.tagName")
if tag == "SELECT":
el.select_option(value)
elif tag == "INPUT":
input_type = el.get_attribute("type")
if input_type in ["checkbox", "radio"]:
if value:
el.check()
else:
el.fill(str(value))
else:
el.fill(str(value))
except Exception as e:
print(f"填写字段 {field_name} 失败: {e}")
def extract_form(self, page: Page, form_selector: str = "form") -> dict:
"""提取表单数据"""
form = page.locator(form_selector).first
data = {}
inputs = form.locator("input, select, textarea")
for inp in inputs.all():
name = inp.get_attribute("name") or inp.get_attribute("id")
if not name: continue
input_type = inp.get_attribute("type") or "text"
if input_type in ["submit", "button", "reset", "image"]: continue
if input_type == "checkbox":
data[name] = inp.is_checked()
elif input_type == "radio":
checked = form.locator(f'input[name="{name}"]:checked')
data[name] = checked.get_attribute("value") if checked.count() > 0 else None
else:
data[name] = inp.input_value()
return data
```
### 4. 数据采集
```python
class WebScraper:
"""网页数据采集"""
def scrape_table(self, page: Page, table_selector: str) -> list[dict]:
"""采集表格数据"""
table = page.locator(table_selector).first
# 获取表头
headers = table.locator("thead th, th").all_text_contents()
# 采集每行
rows = []
for row in table.locator("tbody tr, tr").all():
cells = row.locator("td, th").all_text_contents()
if len(cells) == len(headers):
rows.append(dict(zip(headers, cells)))
return rows
def scrape_cards(self, page: Page, card_selector: str, fields: dict) -> list[dict]:
"""采集卡片式列表"""
cards = page.locator(card_selector).all()
results = []
for card in cards:
item = {}
for field_name, selector in fields.items():
try:
el = card.locator(selector).first
item[field_name] = el.inner_text().strip()
except:
item[field_name] = None
results.append(item)
return results
def scroll_scrape(self, page: Page, item_selector: str, max_items: int = 100) -> list:
"""滚动加载采集"""
items = []
last_count = 0
while len(items) < max_items:
# 滚动
page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
page.wait_for_timeout(1000)
# 采集新数据
new_items = page.locator(item_selector).all_text_contents()
items.extend(new_items[last_count:])
last_count = len(items)
# 检查是否到底
if len(items) == last_count:
break
return items[:max_items]
```
### 5. 反爬对抗
```python
class AntiDetection:
"""反检测"""
@staticmethod
def stealth(context: BrowserContext):
"""Stealth模式"""
# 随机User-Agent
ua_list = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/17.2",
]
context.set_extra_http_headers({"User-Agent": random.choice(ua_list)})
# 注入反检测脚本
context.add_init_script("""
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
window.chrome = { runtime: {} };
""")
@staticmethod
def human_delay(page: Page, min_ms=50, max_ms=200):
"""模拟人类延迟"""
import random
import time
time.sleep(random.uniform(min_ms, max_ms) / 1000)
```
## Puppeteer 实现
```javascript
const { chromium } = require('puppeteer');
class BrowserAutomation {
async init() {
this.browser = await chromium.launch({
headless: true,
args: ['--disable-blink-features=AutomationControlled']
});
// Stealth
const context = await this.browser.newContext({
viewport: { width: 1920, height: 1080 },
userAgent: 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
});
// Remove webdriver flag
await context.addInitScript(() => {
Object.defineProperty(navigator, 'webdriver', { get: () => false });
});
this.page = await context.newPage();
}
async goto(url) {
await this.page.goto(url, { waitUntil: 'networkidle2' });
await this.page.waitForTimeout(1000);
}
async click(selector) {
await this.page.locator(selector).first().click();
}
async type(selector, text) {
await this.page.locator(selector).fill(text);
}
async extract(selector) {
return await this.page.locator(selector).allTextContents();
}
async screenshot(path) {
await this.page.screenshot({ path, fullPage: true });
}
async close() {
await this.browser.close();
}
}
```
## 常见任务模板
### 登录 + 数据采集
```python
with BrowserAgent() as agent:
page = agent.new_page()
# 登录
page.goto("https://example.com/login")
page.fill("#username", "[email protected]")
page.fill("#password", "password123")
page.click("button[type='submit']")
page.wait_for_url("**/dashboard")
# 采集数据
page.goto("https://example.com/data")
data = WebScraper().scrape_table(page, "table.data")
print(data)
```
### 批量截图
```python
urls = ["https://site1.com", "https://site2.com", "https://site3.com"]
with BrowserAgent() as agent:
for i, url in enumerate(urls):
page = agent.new_page()
page.goto(url, wait_until="networkidle")
page.screenshot(path=f"screenshot_{i}.png")
page.close()
```
## 最佳实践
1. **等待策略**:用 `wait_for_selector` 而不是固定sleep
2. **重试机制**:网络不稳定时自动重试
3. **异常处理**:每个操作都要try-catch
4. **资源清理**:总是关闭page和context
5. **隐身模式**:每个任务用独立的context避免cookies污染
## Usage
1. Install the skill
2. Configure as needed
3. Run with OpenClaw
FILE:skill.json
{
"name": "browser-automation-agent",
"version": "1.0.0",
"description": "Browser Automation Agent - Playwright/Puppeteer intelligent scraping",
"author": "SKY-lv",
"license": "MIT",
"keywords": [
"browser-automation",
"playwright",
"scraping",
"openclaw",
"skill"
],
"repository": "https://github.com/SKY-lv/browser-automation-agent",
"main": "SKILL.md"
}API网关管理。配置和管理API网关,处理流量路由和限流。
---
name: "Api Gateway"
slug: skylv-api-gateway
version: 1.0.0
description: "API网关管理。配置和管理API网关,处理流量路由和限流。"
author: SKY-lv
license: MIT-0
tags: [openclaw, agent, api]
---
# API Gateway Agent Skill
> AI-powered API gateway and traffic management
## 功能
- **API代理** - 统一入口、请求转发
- **流量控制** - 限流、熔断、降级
- **认证授权** - JWT、OAuth2、API Key
- **监控统计** - 请求日志、性能指标
- **文档生成** - 自动生成API文档
## 使用场景
```
用户: 帮我配置一个API网关,限流1000/min
Agent: [调用api-gateway skill配置网关规则]
```
## 工具函数
### `create_gateway(config)`
创建API网关。
**参数:**
```python
{
"name": "main-gateway",
"routes": [
{
"path": "/api/v1/*",
"target": "http://backend:8080",
"methods": ["GET", "POST"]
}
],
"rate_limit": {
"requests": 1000,
"window": "1m"
},
"auth": {
"type": "jwt",
"secret": "JWT_SECRET"
}
}
```
### `add_route(gateway_id, route_config)`
添加路由。
### `set_rate_limit(gateway_id, limit)`
设置限流。
### `get_metrics(gateway_id)`
获取监控指标。
## 配置
```json
{
"backend": "envoy",
"persistence": "redis",
"monitoring": {
"prometheus": true,
"grafana": true
},
"security": {
"cors": true,
"https_redirect": true
}
}
```
## 安装
```bash
clawhub install SKY-lv/api-gateway
```
## License
MIT
FILE:skill.json
{
"name": "api-gateway",
"version": "1.0.0",
"description": "API Gateway Agent - Traffic management, authentication, monitoring",
"author": "SKY-lv",
"license": "MIT",
"keywords": ["api-gateway", "rate-limiting", "authentication", "ai-agent", "openclaw"],
"repository": "https://github.com/SKY-lv/api-gateway",
"main": "SKILL.md"
}
API文档助手。生成API文档、编写Swagger/OpenAPI、示例代码。使用场景:(1) 文档生成,(2) Swagger配置,(3) 示例代码,(4) 版本管理。
--- name: "api-documentation" slug: skylv-api-documentation version: 1.0.0 description: "API文档助手。生成API文档、编写Swagger/OpenAPI、示例代码。使用场景:(1) 文档生成,(2) Swagger配置,(3) 示例代码,(4) 版本管理。" author: SKY-lv license: MIT-0 tags: [api, openclaw, agent] --- # API Documentation — API文档助手 ## 功能说明 生成和管理API文档。 ## 使用方法 ### 1. 文档生成 ``` 用户: 根据这个代码生成API文档 ``` ### 2. Swagger配置 ``` 用户: 写一个Swagger/OpenAPI配置 ``` ### 3. 示例代码 ``` 用户: 生成各语言的API调用示例 ``` ### 4. 版本管理 ``` 用户: 如何管理API版本? ```
AI架构设计助手。设计AI系统架构、技术选型、性能优化。使用场景:(1) AI系统架构设计,(2) 技术选型建议,(3) 性能优化方案,(4) 成本优化策略。
--- name: "ai-architect" slug: skylv-ai-architect version: 1.0.0 description: "AI架构设计助手。设计AI系统架构、技术选型、性能优化。使用场景:(1) AI系统架构设计,(2) 技术选型建议,(3) 性能优化方案,(4) 成本优化策略。" author: SKY-lv license: MIT-0 tags: [ai, openclaw, agent] --- # AI Architect — AI架构设计助手 ## 功能说明 设计和优化AI系统架构。 ## 使用方法 ### 1. 系统架构 ``` 用户: 设计一个RAG系统架构 ``` ### 2. 技术选型 ``` 用户: 推荐什么向量数据库? ``` ### 3. 性能优化 ``` 用户: 如何优化LLM响应速度? ``` ### 4. 成本优化 ``` 用户: 如何降低AI API调用成本? ```
Agent 记忆系统设计助手。构建长期记忆、短期记忆、情景记忆架构。触发词:记忆、memory、上下文管理、上下文窗口。
---
name: "agent-memory-system"
slug: skylv-agent-memory-system
version: 1.0.0
description: "Agent 记忆系统设计助手。构建长期记忆、短期记忆、情景记忆架构。触发词:记忆、memory、上下文管理、上下文窗口。"
author: SKY-lv
license: MIT-0
tags: [agent, openclaw, agent]
---
# Agent Memory System
## 功能说明
设计 Agent 持久化记忆系统,优化上下文管理。
## 记忆分层架构
```
┌─────────────────────────────────────┐
│ Working Memory (上下文) │ ← 当前对话,LLM直接访问
├─────────────────────────────────────┤
│ Short-Term (会话记忆) │ ← 当前会话,SESSION
├─────────────────────────────────────┤
│ Long-Term (持久记忆) │ ← 跨会话,数据库/文件
├─────────────────────────────────────┤
│ Semantic (向量记忆) │ ← RAG,向量检索
├─────────────────────────────────────┤
│ Procedural (程序记忆) │ ← 工具/Skill定义
└─────────────────────────────────────┘
```
## 完整实现
### 1. 记忆核心类
```typescript
interface MemoryEntry {
id: string;
type: 'episodic' | 'semantic' | 'procedural';
content: string;
timestamp: number;
importance: number; // 0-10,重要程度
accessCount: number; // 访问次数
tags: string[];
metadata: Record<string, any>;
}
class AgentMemory {
private shortTerm: Map<string, MemoryEntry[]> = new Map();
private longTerm: SQLiteDatabase;
private vectorStore: ChromaClient;
private sessionId: string;
constructor(sessionId: string, dbPath: string) {
this.sessionId = sessionId;
this.longTerm = new SQLiteDatabase(dbPath);
this.vectorStore = new ChromaClient({ path: './chroma' });
this.initDatabase();
}
private initDatabase() {
this.longTerm.exec(`
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
content TEXT NOT NULL,
timestamp INTEGER NOT NULL,
importance INTEGER DEFAULT 5,
access_count INTEGER DEFAULT 0,
tags TEXT,
metadata TEXT
);
CREATE INDEX IF NOT EXISTS idx_timestamp ON memories(timestamp);
CREATE INDEX IF NOT EXISTS idx_importance ON memories(importance);
CREATE INDEX IF NOT EXISTS idx_type ON memories(type);
`);
}
// 添加记忆
async add(entry: Omit<MemoryEntry, 'id' | 'accessCount'>) {
const id = crypto.randomUUID();
const full: MemoryEntry = { ...entry, id, accessCount: 0 };
// 短期记忆
if (!this.shortTerm.has(this.sessionId)) {
this.shortTerm.set(this.sessionId, []);
}
this.shortTerm.get(this.sessionId)!.push(full);
// 持久化
this.longTerm.prepare(
`INSERT OR REPLACE INTO memories VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
).run(
full.id, full.type, full.content, full.timestamp,
full.importance, full.accessCount,
JSON.stringify(full.tags), JSON.stringify(full.metadata)
);
// 向量化(重要记忆)
if (full.importance >= 7) {
await this.vectorStore.add({
ids: [full.id],
embeddings: [await this.embed(full.content)],
documents: [full.content],
metadatas: [{ type: full.type, tags: full.tags.join(',') }]
});
}
return full;
}
// 检索记忆
async retrieve(query: string, limit = 10): Promise<MemoryEntry[]> {
// 1. 语义检索
const queryEmbedding = await this.embed(query);
const semantic = await this.vectorStore.search({
query_embeddings: [queryEmbedding],
n_results: limit
});
// 2. 关键词检索
const keywords = query.toLowerCase().split(/\s+/);
let sql = `SELECT * FROM memories WHERE `;
sql += keywords.map(k => `content LIKE '%k%'`).join(' OR ');
sql += ` ORDER BY importance DESC, timestamp DESC LIMIT limit`;
const keyword = this.longTerm.exec(sql).all() as MemoryEntry[];
// 3. 去重合并
const seen = new Set<string>();
const results: MemoryEntry[] = [];
[...semantic.results, ...keyword].forEach(m => {
if (!seen.has(m.id)) {
seen.add(m.id);
m.accessCount++;
results.push(m);
}
});
// 更新访问计数
results.forEach(m => {
this.longTerm.prepare(
`UPDATE memories SET access_count = ? WHERE id = ?`
).run(m.accessCount, m.id);
});
return results;
}
// 获取短期记忆(当前会话)
getShortTerm(): MemoryEntry[] {
return this.shortTerm.get(this.sessionId) || [];
}
// 压缩短期记忆到长期
async consolidate() {
const shortTerm = this.getShortTerm();
// 保留最重要的记忆
const important = shortTerm
.filter(m => m.importance >= 6)
.sort((a, b) => b.importance - a.importance)
.slice(0, 50);
// 合并重复
const merged = this.mergeSimilar(important);
// 更新短期记忆
this.shortTerm.set(this.sessionId, merged);
}
// 遗忘低价值记忆
async forget(threshold = 2) {
// 遗忘低重要度、低访问的记忆
this.longTerm.prepare(
`DELETE FROM memories WHERE importance < ? AND access_count < ?`
).run(threshold, threshold);
// 清理向量库
await this.vectorStore.delete({
where: { importance: { $lt: threshold } }
});
}
// 构建上下文
async buildContext(query: string, maxTokens = 6000): Promise<string> {
const memories = await this.retrieve(query, 20);
const shortTerm = this.getShortTerm();
const parts: string[] = [];
let totalTokens = 0;
// 短期记忆优先
for (const m of [...shortTerm.reverse(), ...memories]) {
const text = `[m.type] m.content`;
const tokens = Math.ceil(text.length / 4);
if (totalTokens + tokens > maxTokens) break;
parts.unshift(text);
totalTokens += tokens;
}
return `## 记忆上下文\n\nparts.join('\n')`;
}
private async embed(text: string): Promise<number[]> {
// 调用 embedding API
const response = await fetch('https://api.openai.com/v1/embeddings', {
method: 'POST',
headers: { 'Authorization': `Bearer process.env.OPENAI_API_KEY` },
body: JSON.stringify({ model: 'text-embedding-3-small', input: text })
});
const data = await response.json();
return data.data[0].embedding;
}
private mergeSimilar(memories: MemoryEntry[]): MemoryEntry[] {
const merged: MemoryEntry[] = [];
for (const m of memories) {
const similar = merged.find(g =>
g.type === m.type &&
this.similarity(g.content, m.content) > 0.8
);
if (similar) {
// 合并,保留最新的时间戳和最高的importance
similar.importance = Math.max(similar.importance, m.importance);
similar.timestamp = Math.max(similar.timestamp, m.timestamp);
} else {
merged.push(m);
}
}
return merged;
}
private similarity(a: string, b: string): number {
const setA = new Set(a.toLowerCase());
const setB = new Set(b.toLowerCase());
const intersection = new Set([...setA].filter(x => setB.has(x)));
const union = new Set([...setA, ...setB]);
return intersection.size / union.size;
}
}
```
### 2. 使用示例
```typescript
// 初始化
const memory = new AgentMemory(sessionId, './memory.db');
// 对话时注入记忆
async function chat(message: string) {
const context = await memory.buildContext(message);
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{ role: 'system', content: '你是助手的记忆系统使用指南。' },
{ role: 'system', content: context },
{ role: 'user', content: message }
]
});
const answer = response.choices[0].message.content;
// 自动存储重要信息
if (containsActionableInfo(message, answer)) {
await memory.add({
type: 'semantic',
content: extractKeyInfo(answer),
timestamp: Date.now(),
importance: 7,
tags: ['用户问答'],
metadata: { source: 'chat' }
});
}
return answer;
}
// 定期压缩
setInterval(() => memory.consolidate(), 30 * 60 * 1000); // 每30分钟
setInterval(() => memory.forget(), 24 * 60 * 60 * 1000); // 每天
```
### 3. 重要性评估
```typescript
function evaluateImportance(text: string, context: string): number {
let score = 5; // 基础分
// 明确的重要词
const importantKeywords = [
'记住', '重要', '必须', '关键', '别忘了', '优先',
'deadline', '截止', '紧急', '用户偏好', '账号', '密码'
];
for (const kw of importantKeywords) {
if (text.includes(kw)) score += 1;
}
// 用户明确要求记住
if (text.match(/记住|remember|save|存储/)) score += 3;
// 重复出现
if (context.includes(text)) score += 2;
// 限制范围
return Math.min(10, Math.max(0, score));
}
```
## 存储策略
| 类型 | 存储 | 容量 | TTL |
|------|------|------|-----|
| 工作记忆 | 上下文窗口 | 128K tokens | 会话内 |
| 短期记忆 | Redis/Memory | 100条 | 24小时 |
| 长期记忆 | SQLite | 无限制 | 永久 |
| 向量记忆 | Chroma/Milvus | 按需 | 定期清理 |
## 最佳实践
1. **自动摘要**:长记忆定期压缩成摘要
2. **分层索引**:按类型、时间、重要性多维索引
3. **增量更新**:避免重复存储相似内容
4. **隐私保护**:敏感信息加密存储
5. **容量管理**:设置token预算,定期清理
FILE:skill.json
{
"name": "agent-memory-system",
"version": "1.0.0",
"description": "Agent Memory System - Hierarchical memory architecture for AI agents",
"author": "SKY-lv",
"license": "MIT",
"keywords": [
"ai-agent",
"memory",
"llm",
"openclaw",
"openclaw",
"skill"
],
"repository": "https://github.com/SKY-lv/agent-memory-system",
"main": "SKILL.md"
}Agent发现探索助手。搜索、评估、集成公开Agent。使用场景:(1) 搜索可用Agent,(2) 评估Agent能力,(3) 集成方案设计,(4) 成本效益分析。
--- name: "agent-discovery" slug: skylv-agent-discovery version: 1.0.0 description: "Agent发现探索助手。搜索、评估、集成公开Agent。使用场景:(1) 搜索可用Agent,(2) 评估Agent能力,(3) 集成方案设计,(4) 成本效益分析。" author: SKY-lv license: MIT-0 tags: [agent, openclaw, agent] --- # Agent Discovery — Agent发现助手 ## 功能说明 帮助发现和评估可用的Agent系统。 ## 使用方法 ### 1. 能力搜索 ``` 用户: 找一下有哪些免费的代码审查Agent? ``` ### 2. 评估对比 ``` 用户: 比较一下Claude和GPT的Agent能力 ``` ### 3. 集成方案 ``` 用户: 如何把这个Agent集成到我们的系统? ``` ### 4. 成本分析 ``` 用户: 使用这个Agent的成本是多少? ```
AI Agent 架构设计器。提供 10+ 种 Agent 架构模板(客服/销售/开发/运营),包含完整的人设、工具集、工作流程。Triggers: build agent, design agent, agent architecture, agent template.
--- name: agent-builder slug: skylv-agent-builder version: 2.0.2 description: AI Agent architecture designer. Provides 10+ Agent templates (customer service/sales/dev/ops) with full personas, toolkits, and workflow patterns. Triggers: agent architecture, design agent, agent template. author: SKY-lv license: MIT tags: [agent, architecture, template, openclaw, design] keywords: openclaw, skill, automation, ai-agent triggers: agent builder --- # Agent Builder — AI Agent 架构设计器 ## 功能说明 提供 10+ 种生产级 AI Agent 架构模板,每个模板包含完整的人设定义、工具集配置、工作流程和交付物标准。不是泛泛而谈的"助手",而是可立即部署的 Agent 蓝图。 ## 10+ 种 Agent 架构模板 ### 1. 客服 Agent (Customer Support) ```yaml identity: role: 客服专家 experience: 5 年电商平台客服经验 tone: 专业、耐心、同理心 capabilities: - 订单查询(对接 ERP API) - 退换货处理(工作流引擎) - 投诉升级(人工转接规则) - 常见问题解答(RAG 知识库) tools: - order_lookup: 查询订单状态 - refund_process: 处理退款 - ticket_create: 创建工单 - knowledge_search: 知识库检索 workflow: 1. 识别用户意图(分类模型) 2. 简单问题 → 直接回答(知识库) 3. 复杂问题 → 调用工具(API) 4. 投诉/升级 → 创建工单 + 人工转接 metrics: - 首次响应时间 < 30 秒 - 解决率 > 85% - 满意度 > 4.5/5 ``` ### 2. 销售 Agent (Sales Development) ```yaml identity: role: 销售开发代表 (SDR) experience: 3 年 B2B 销售经验 tone: 热情、专业、结果导向 capabilities: - 潜在客户 qualification(BANT 框架) - 产品演示安排(日历集成) - 报价生成(CPQ 系统) - 跟进提醒(CRM 同步) tools: - lead_score: 潜在客户评分 - meeting_schedule: 安排演示 - quote_generate: 生成报价 - crm_update: 更新 CRM 记录 workflow: 1. 线索进入 → BANT 评分 2. 合格线索 → 安排产品演示 3. 演示后 → 发送报价 4. 跟进 → CRM 记录 + 提醒 qualification_framework: Budget: 预算范围? Authority: 决策权? Need: 需求匹配度? Timeline: 时间线? ``` ### 3. 开发工程师 Agent (Software Engineer) ```yaml identity: role: 全栈开发工程师 experience: 8 年经验,精通 React/Node.js/Python tone: 严谨、注重代码质量 capabilities: - 需求分析(用户故事拆解) - 代码生成(多语言支持) - 代码审查(OWASP/最佳实践) - 测试编写(单元测试/E2E) - 部署配置(Docker/K8s) tools: - code_generate: 生成代码 - code_review: 代码审查 - test_create: 编写测试 - docker_config: Docker 配置 - ci_cd_setup: CI/CD 流水线 workflow: 1. 需求分析 → 用户故事 + 验收标准 2. 技术方案 → 架构设计 + API 定义 3. 代码实现 → 生成 + 审查 + 测试 4. 部署 → Docker + CI/CD code_quality_checks: - ESLint/Prettier 规范 - 单元测试覆盖率 > 80% - 安全漏洞扫描(OWASP Top 10) - 性能优化建议 ``` ### 4. 内容运营 Agent (Content Operator) ```yaml identity: role: 内容运营专家 experience: 5 年新媒体运营,10w+ 爆款经验 tone: 创意、数据驱动、网感好 capabilities: - 选题策划(热点追踪 + 关键词分析) - 内容创作(多平台适配) - SEO 优化(关键词布局) - 数据分析(阅读量/转化率) - A/B 测试(标题/封面优化) tools: - trend_tracker: 热点追踪 - keyword_research: 关键词分析 - content_generate: 内容生成 - seo_optimize: SEO 优化 - analytics_report: 数据报告 workflow: 1. 选题 → 热点 + 关键词 + 竞品分析 2. 创作 → 初稿 + SEO 优化 + 多平台适配 3. 发布 → 定时 + 多渠道分发 4. 分析 → 数据报告 + 优化建议 platforms: - 公众号:深度文,2000-3000 字 - 小红书:种草笔记,500-800 字 + 图片 - 抖音:短视频脚本,30-60 秒 - B 站:长视频脚本,5-15 分钟 ``` ### 5. 数据分析师 Agent (Data Analyst) ```yaml identity: role: 数据分析师 experience: 6 年经验,精通 SQL/Python/Tableau tone: 理性、数据驱动、洞察深刻 capabilities: - 数据查询(SQL/NoSQL) - 数据清洗(Pandas/Spark) - 可视化(Tableau/PowerBI) - 统计分析(假设检验/回归) - 洞察报告(业务建议) tools: - sql_query: 数据查询 - data_clean: 数据清洗 - chart_create: 图表生成 - stat_analysis: 统计分析 - insight_report: 洞察报告 workflow: 1. 需求确认 → 指标定义 + 数据源 2. 数据提取 → SQL 查询 + 清洗 3. 分析建模 → 统计分析 + 可视化 4. 报告输出 → 洞察 + 业务建议 analysis_frameworks: - AARRR 模型(获客/激活/留存/变现/推荐) - RFM 模型(用户价值分层) - 漏斗分析(转化路径优化) ``` ### 6. 产品经理 Agent (Product Manager) ### 7. UX 设计师 Agent (UX Designer) ### 8. DevOps 工程师 Agent (DevOps Engineer) ### 9. 安全专家 Agent (Security Expert) ### 10. 增长黑客 Agent (Growth Hacker) ## 使用方法 ### 方式一:选择模板 ``` 用户:创建一个客服 Agent Agent: 调用 agent-builder,输出客服 Agent 完整架构(见上方模板) ``` ### 方式二:自定义设计 ``` 用户:设计一个跨境电商运营 Agent,需要选品、投放、客服能力 Agent: 1. 分析需求 → 识别核心能力 2. 组合模板 → 客服 + 运营 + 数据分析 3. 定制工具 → 选品工具、广告投放 API、多语言客服 4. 输出完整架构 ``` ### 方式三:多 Agent 协作 ``` 用户:创建一个电商项目,需要产品 + 设计 + 开发 + 运营协作 Agent: - 产品经理 Agent:需求分析 + PRD - UX 设计师 Agent:原型设计 + 交互流程 - 开发工程师 Agent:技术实现 + 代码审查 - 内容运营 Agent:上线推广 + 数据分析 ``` ## Agent 架构核心要素 每个 Agent 必须定义: | 要素 | 说明 | 示例 | |------|------|------| | **Identity** | 身份人设 | "5 年客服经验,专业耐心" | | **Capabilities** | 核心能力 | 订单查询、退换货处理 | | **Tools** | 工具集 | order_lookup, refund_process | | **Workflow** | 工作流程 | 识别意图 → 调用工具 → 输出 | | **Metrics** | 成功指标 | 响应时间<30s, 解决率>85% | | **Guardrails** | 安全边界 | 不承诺退款金额,不泄露用户数据 | ## 相关文件 - [agency-agents](./agency-agents.md) — 193 个 AI 专家角色库 - [multi-agent-orchestrator](./multi-agent-orchestrator.md) — 多 Agent 编排系统 - [Hermes Agent](./hermes-agent-integration.md) — 自改进 AI Agent ## 触发词 - 自动:检测 agent、架构、设计、模板相关关键词 - 手动:/agent-builder, /agent-template, /design-agent - 短语:创建 Agent、设计 Agent、Agent 架构、Agent 模板 ## Usage 1. Install the skill 2. Configure as needed 3. Run with OpenClaw
API文档助手。生成API文档、编写Swagger/OpenAPI、示例代码。使用场景:(1) 文档生成,(2) Swagger配置,(3) 示例代码,(4) 版本管理。
---
name: api-documentation
description: "API文档助手。生成API文档、编写Swagger/OpenAPI、示例代码。使用场景:(1) 文档生成,(2) Swagger配置,(3) 示例代码,(4) 版本管理。"
metadata: {"openclaw": {"emoji": "📖"}}
---
# API Documentation — API文档助手
## 功能说明
生成和管理API文档。
## 使用方法
### 1. 文档生成
```
用户: 根据这个代码生成API文档
```
### 2. Swagger配置
```
用户: 写一个Swagger/OpenAPI配置
```
### 3. 示例代码
```
用户: 生成各语言的API调用示例
```
### 4. 版本管理
```
用户: 如何管理API版本?
```AI架构设计助手。设计AI系统架构、技术选型、性能优化。使用场景:(1) AI系统架构设计,(2) 技术选型建议,(3) 性能优化方案,(4) 成本优化策略。
---
name: ai-architect
description: "AI架构设计助手。设计AI系统架构、技术选型、性能优化。使用场景:(1) AI系统架构设计,(2) 技术选型建议,(3) 性能优化方案,(4) 成本优化策略。"
metadata: {"openclaw": {"emoji": "🏗️"}}
---
# AI Architect — AI架构设计助手
## 功能说明
设计和优化AI系统架构。
## 使用方法
### 1. 系统架构
```
用户: 设计一个RAG系统架构
```
### 2. 技术选型
```
用户: 推荐什么向量数据库?
```
### 3. 性能优化
```
用户: 如何优化LLM响应速度?
```
### 4. 成本优化
```
用户: 如何降低AI API调用成本?
```
Agent 记忆系统设计助手。构建长期记忆、短期记忆、情景记忆架构。触发词:记忆、memory、上下文管理、上下文窗口。
---
name: agent-memory-system
description: "Agent 记忆系统设计助手。构建长期记忆、短期记忆、情景记忆架构。触发词:记忆、memory、上下文管理、上下文窗口。"
metadata: {"openclaw": {"emoji": "🧬"}}
---
# Agent Memory System
## 功能说明
设计 Agent 持久化记忆系统,优化上下文管理。
## 记忆分层架构
```
┌─────────────────────────────────────┐
│ Working Memory (上下文) │ ← 当前对话,LLM直接访问
├─────────────────────────────────────┤
│ Short-Term (会话记忆) │ ← 当前会话,SESSION
├─────────────────────────────────────┤
│ Long-Term (持久记忆) │ ← 跨会话,数据库/文件
├─────────────────────────────────────┤
│ Semantic (向量记忆) │ ← RAG,向量检索
├─────────────────────────────────────┤
│ Procedural (程序记忆) │ ← 工具/Skill定义
└─────────────────────────────────────┘
```
## 完整实现
### 1. 记忆核心类
```typescript
interface MemoryEntry {
id: string;
type: 'episodic' | 'semantic' | 'procedural';
content: string;
timestamp: number;
importance: number; // 0-10,重要程度
accessCount: number; // 访问次数
tags: string[];
metadata: Record<string, any>;
}
class AgentMemory {
private shortTerm: Map<string, MemoryEntry[]> = new Map();
private longTerm: SQLiteDatabase;
private vectorStore: ChromaClient;
private sessionId: string;
constructor(sessionId: string, dbPath: string) {
this.sessionId = sessionId;
this.longTerm = new SQLiteDatabase(dbPath);
this.vectorStore = new ChromaClient({ path: './chroma' });
this.initDatabase();
}
private initDatabase() {
this.longTerm.exec(`
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
content TEXT NOT NULL,
timestamp INTEGER NOT NULL,
importance INTEGER DEFAULT 5,
access_count INTEGER DEFAULT 0,
tags TEXT,
metadata TEXT
);
CREATE INDEX IF NOT EXISTS idx_timestamp ON memories(timestamp);
CREATE INDEX IF NOT EXISTS idx_importance ON memories(importance);
CREATE INDEX IF NOT EXISTS idx_type ON memories(type);
`);
}
// 添加记忆
async add(entry: Omit<MemoryEntry, 'id' | 'accessCount'>) {
const id = crypto.randomUUID();
const full: MemoryEntry = { ...entry, id, accessCount: 0 };
// 短期记忆
if (!this.shortTerm.has(this.sessionId)) {
this.shortTerm.set(this.sessionId, []);
}
this.shortTerm.get(this.sessionId)!.push(full);
// 持久化
this.longTerm.prepare(
`INSERT OR REPLACE INTO memories VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
).run(
full.id, full.type, full.content, full.timestamp,
full.importance, full.accessCount,
JSON.stringify(full.tags), JSON.stringify(full.metadata)
);
// 向量化(重要记忆)
if (full.importance >= 7) {
await this.vectorStore.add({
ids: [full.id],
embeddings: [await this.embed(full.content)],
documents: [full.content],
metadatas: [{ type: full.type, tags: full.tags.join(',') }]
});
}
return full;
}
// 检索记忆
async retrieve(query: string, limit = 10): Promise<MemoryEntry[]> {
// 1. 语义检索
const queryEmbedding = await this.embed(query);
const semantic = await this.vectorStore.search({
query_embeddings: [queryEmbedding],
n_results: limit
});
// 2. 关键词检索
const keywords = query.toLowerCase().split(/\s+/);
let sql = `SELECT * FROM memories WHERE `;
sql += keywords.map(k => `content LIKE '%k%'`).join(' OR ');
sql += ` ORDER BY importance DESC, timestamp DESC LIMIT limit`;
const keyword = this.longTerm.exec(sql).all() as MemoryEntry[];
// 3. 去重合并
const seen = new Set<string>();
const results: MemoryEntry[] = [];
[...semantic.results, ...keyword].forEach(m => {
if (!seen.has(m.id)) {
seen.add(m.id);
m.accessCount++;
results.push(m);
}
});
// 更新访问计数
results.forEach(m => {
this.longTerm.prepare(
`UPDATE memories SET access_count = ? WHERE id = ?`
).run(m.accessCount, m.id);
});
return results;
}
// 获取短期记忆(当前会话)
getShortTerm(): MemoryEntry[] {
return this.shortTerm.get(this.sessionId) || [];
}
// 压缩短期记忆到长期
async consolidate() {
const shortTerm = this.getShortTerm();
// 保留最重要的记忆
const important = shortTerm
.filter(m => m.importance >= 6)
.sort((a, b) => b.importance - a.importance)
.slice(0, 50);
// 合并重复
const merged = this.mergeSimilar(important);
// 更新短期记忆
this.shortTerm.set(this.sessionId, merged);
}
// 遗忘低价值记忆
async forget(threshold = 2) {
// 遗忘低重要度、低访问的记忆
this.longTerm.prepare(
`DELETE FROM memories WHERE importance < ? AND access_count < ?`
).run(threshold, threshold);
// 清理向量库
await this.vectorStore.delete({
where: { importance: { $lt: threshold } }
});
}
// 构建上下文
async buildContext(query: string, maxTokens = 6000): Promise<string> {
const memories = await this.retrieve(query, 20);
const shortTerm = this.getShortTerm();
const parts: string[] = [];
let totalTokens = 0;
// 短期记忆优先
for (const m of [...shortTerm.reverse(), ...memories]) {
const text = `[m.type] m.content`;
const tokens = Math.ceil(text.length / 4);
if (totalTokens + tokens > maxTokens) break;
parts.unshift(text);
totalTokens += tokens;
}
return `## 记忆上下文\n\nparts.join('\n')`;
}
private async embed(text: string): Promise<number[]> {
// 调用 embedding API
const response = await fetch('https://api.openai.com/v1/embeddings', {
method: 'POST',
headers: { 'Authorization': `Bearer process.env.OPENAI_API_KEY` },
body: JSON.stringify({ model: 'text-embedding-3-small', input: text })
});
const data = await response.json();
return data.data[0].embedding;
}
private mergeSimilar(memories: MemoryEntry[]): MemoryEntry[] {
const merged: MemoryEntry[] = [];
for (const m of memories) {
const similar = merged.find(g =>
g.type === m.type &&
this.similarity(g.content, m.content) > 0.8
);
if (similar) {
// 合并,保留最新的时间戳和最高的importance
similar.importance = Math.max(similar.importance, m.importance);
similar.timestamp = Math.max(similar.timestamp, m.timestamp);
} else {
merged.push(m);
}
}
return merged;
}
private similarity(a: string, b: string): number {
const setA = new Set(a.toLowerCase());
const setB = new Set(b.toLowerCase());
const intersection = new Set([...setA].filter(x => setB.has(x)));
const union = new Set([...setA, ...setB]);
return intersection.size / union.size;
}
}
```
### 2. 使用示例
```typescript
// 初始化
const memory = new AgentMemory(sessionId, './memory.db');
// 对话时注入记忆
async function chat(message: string) {
const context = await memory.buildContext(message);
const response = await openai.chat.completions.create({
model: 'gpt-4',
messages: [
{ role: 'system', content: '你是助手的记忆系统使用指南。' },
{ role: 'system', content: context },
{ role: 'user', content: message }
]
});
const answer = response.choices[0].message.content;
// 自动存储重要信息
if (containsActionableInfo(message, answer)) {
await memory.add({
type: 'semantic',
content: extractKeyInfo(answer),
timestamp: Date.now(),
importance: 7,
tags: ['用户问答'],
metadata: { source: 'chat' }
});
}
return answer;
}
// 定期压缩
setInterval(() => memory.consolidate(), 30 * 60 * 1000); // 每30分钟
setInterval(() => memory.forget(), 24 * 60 * 60 * 1000); // 每天
```
### 3. 重要性评估
```typescript
function evaluateImportance(text: string, context: string): number {
let score = 5; // 基础分
// 明确的重要词
const importantKeywords = [
'记住', '重要', '必须', '关键', '别忘了', '优先',
'deadline', '截止', '紧急', '用户偏好', '账号', '密码'
];
for (const kw of importantKeywords) {
if (text.includes(kw)) score += 1;
}
// 用户明确要求记住
if (text.match(/记住|remember|save|存储/)) score += 3;
// 重复出现
if (context.includes(text)) score += 2;
// 限制范围
return Math.min(10, Math.max(0, score));
}
```
## 存储策略
| 类型 | 存储 | 容量 | TTL |
|------|------|------|-----|
| 工作记忆 | 上下文窗口 | 128K tokens | 会话内 |
| 短期记忆 | Redis/Memory | 100条 | 24小时 |
| 长期记忆 | SQLite | 无限制 | 永久 |
| 向量记忆 | Chroma/Milvus | 按需 | 定期清理 |
## 最佳实践
1. **自动摘要**:长记忆定期压缩成摘要
2. **分层索引**:按类型、时间、重要性多维索引
3. **增量更新**:避免重复存储相似内容
4. **隐私保护**:敏感信息加密存储
5. **容量管理**:设置token预算,定期清理
FILE:skill.json
{
"name": "agent-memory-system",
"version": "1.0.0",
"description": "Agent Memory System - Hierarchical memory architecture for AI agents",
"author": "SKY-lv",
"license": "MIT",
"keywords": [
"ai-agent",
"memory",
"llm",
"openclaw",
"openclaw",
"skill"
],
"repository": "https://github.com/SKY-lv/agent-memory-system",
"main": "SKILL.md"
}Agent发现探索助手。搜索、评估、集成公开Agent。使用场景:(1) 搜索可用Agent,(2) 评估Agent能力,(3) 集成方案设计,(4) 成本效益分析。
---
name: agent-discovery
description: "Agent发现探索助手。搜索、评估、集成公开Agent。使用场景:(1) 搜索可用Agent,(2) 评估Agent能力,(3) 集成方案设计,(4) 成本效益分析。"
metadata: {"openclaw": {"emoji": "🔎"}}
---
# Agent Discovery — Agent发现助手
## 功能说明
帮助发现和评估可用的Agent系统。
## 使用方法
### 1. 能力搜索
```
用户: 找一下有哪些免费的代码审查Agent?
```
### 2. 评估对比
```
用户: 比较一下Claude和GPT的Agent能力
```
### 3. 集成方案
```
用户: 如何把这个Agent集成到我们的系统?
```
### 4. 成本分析
```
用户: 使用这个Agent的成本是多少?
```安全测试Agent。漏洞扫描、渗透测试、代码审计、安全加固。触发词:安全、渗透、漏洞、xss、sql注入、csrf、扫描、审计。
---
name: openclaw-security-testing-agent
description: "安全测试Agent。漏洞扫描、渗透测试、代码审计、安全加固。触发词:安全、渗透、漏洞、xss、sql注入、csrf、扫描、审计。"
metadata: {"openclaw": {"emoji": "🔒"}}
---
# Security Testing Agent
## 功能说明
AI驱动的安全测试和漏洞扫描助手。
## 安全测试类型
| 类型 | 工具 | 覆盖 |
|------|------|------|
| 静态扫描 | SonarQube, Semgrep, CodeQL | SAST |
| 动态扫描 | OWASP ZAP, BurpSuite | DAST |
| 依赖扫描 | Snyk, Dependabot, npm audit | SCA |
| 秘钥扫描 | TruffleHog, git-secrets | 秘钥泄露 |
| 容器扫描 | Trivy, Clair | 镜像安全 |
## 1. 漏洞扫描框架
```python
from dataclasses import dataclass
from typing import Optional
import requests
@dataclass
class Vulnerability:
id: str
severity: str # critical, high, medium, low
title: str
description: str
affected: str
remediation: str
cwe: Optional[str] = None
cvss: Optional[float] = None
class VulnerabilityScanner:
def __init__(self, target_url: str, api_key: Optional[str] = None):
self.target = target_url.rstrip('/')
self.session = requests.Session()
if api_key:
self.session.headers['Authorization'] = f'Bearer {api_key}'
# === SQL注入检测 ===
def test_sql_injection(self) -> list[Vulnerability]:
vulns = []
test_payloads = [
"' OR '1'='1",
"' OR 1=1--",
"'; DROP TABLE users--",
"1' AND '1'='1",
"admin'--",
"' UNION SELECT NULL--",
]
# 测试URL参数
params_to_test = ['id', 'user', 'page', 'q', 'search', 'query', 'id', 'cat', 'category']
for param in params_to_test:
url = f"{self.target}/?{param}=1"
for payload in test_payloads:
test_url = f"{self.target}/?{param}={payload}"
try:
resp = self.session.get(test_url, timeout=10)
# 检测SQL错误
sql_errors = [
'mysql_fetch', 'mysqli_', 'sqlite_', 'postgresql',
'ORA-', 'Microsoft SQL Server', 'ODBC',
'SQLite', 'Syntax error', 'Warning: pg_'
]
for error in sql_errors:
if error.lower() in resp.text.lower():
vulns.append(Vulnerability(
id='SQLI-001',
severity='critical',
title=f'SQL Injection - {param}',
description=f'Parameter {param} is vulnerable to SQL injection',
affected=f'{self.target}/?{param}={payload}',
remediation='Use parameterized queries, ORM, or prepared statements',
cwe='CWE-89'
))
break
# 时间盲注检测
if 'SLEEP' in payload or 'BENCHMARK' in payload:
import time
start = time.time()
self.session.get(test_url, timeout=30)
elapsed = time.time() - start
if elapsed > 10:
vulns.append(Vulnerability(
id='SQLI-002',
severity='critical',
title=f'Blind SQL Injection - {param}',
description='Time-based blind SQL injection detected',
affected=f'{self.target}/?{param}={payload}',
remediation='Same as SQL injection',
cwe='CWE-89'
))
except requests.RequestException:
continue
return vulns
# === XSS检测 ===
def test_xss(self) -> list[Vulnerability]:
vulns = []
xss_payloads = [
'<script>alert(1)</script>',
'<img src=x onerror=alert(1)>',
'<svg onload=alert(1)>',
'"><script>alert(1)</script>',
"javascript:alert(1)",
'<body onload=alert(1)>',
]
# 测试输入点
test_points = [
('GET', '/search?q=test'),
('POST', '/contact', {'name': 'test', 'email': '[email protected]', 'message': 'test'}),
]
for method, path, *params in test_points:
for payload in xss_payloads:
try:
if method == 'GET':
test_path = path.replace('test', payload)
resp = self.session.get(f"{self.target}{test_path}", timeout=10)
else:
data = params[0] if params else {}
resp = self.session.post(f"{self.target}{path}", data=data, timeout=10)
# 检查payload是否反射
if payload in resp.text:
vulns.append(Vulnerability(
id='XSS-001',
severity='high',
title=f'Cross-Site Scripting in {path}',
description=f'User input is reflected without sanitization',
affected=f'{self.target}{path}',
remediation='Escape HTML entities, use Content-Security-Policy',
cwe='CWE-79'
))
break
# 检查存储型XSS
if 'stored' in path or 'comment' in path or 'post' in path:
check_resp = self.session.get(f"{self.target}{path}")
if payload in check_resp.text:
vulns.append(Vulnerability(
id='XSS-002',
severity='critical',
title=f'Stored XSS in {path}',
description='Malicious script is stored and executed for all users',
affected=f'{self.target}{path}',
remediation='Strict input validation and output encoding',
cwe='CWE-79'
))
except requests.RequestException:
continue
return vulns
# === CSRF检测 ===
def test_csrf(self) -> list[Vulnerability]:
vulns = []
# 查找敏感表单
sensitive_forms = [
'/profile/update', '/password/change', '/admin/settings',
'/api/user/delete', '/payment/submit', '/settings/security'
]
for path in sensitive_forms:
try:
resp = self.session.get(f"{self.target}{path}", timeout=10)
# 检查是否有CSRF token
csrf_patterns = [
'csrf', 'xsrf', '_token', 'token', 'nonce', 'csrf_token'
]
has_csrf_protection = any(p in resp.text.lower() for p in csrf_patterns)
if not has_csrf_protection:
vulns.append(Vulnerability(
id='CSRF-001',
severity='medium',
title=f'CSRF Vulnerability in {path}',
description='Sensitive action lacks CSRF protection',
affected=f'{self.target}{path}',
remediation='Implement CSRF tokens or SameSite cookies',
cwe='CWE-352'
))
except requests.RequestException:
continue
return vulns
# === 头部安全检测 ===
def check_security_headers(self) -> list[Vulnerability]:
vulns = []
expected_headers = {
'Strict-Transport-Security': 'Enforce HTTPS',
'Content-Security-Policy': 'Prevent XSS/injection',
'X-Content-Type-Options': 'Prevent MIME sniffing',
'X-Frame-Options': 'Prevent clickjacking',
'X-XSS-Protection': 'Legacy XSS filter (deprecated)',
'Referrer-Policy': 'Control referrer information',
'Permissions-Policy': 'Control browser features',
}
resp = self.session.get(self.target, timeout=10)
for header, description in expected_headers.items():
if header not in resp.headers:
severity = 'medium' if 'Content-Security-Policy' in header else 'low'
vulns.append(Vulnerability(
id='HEADER-001',
severity=severity,
title=f'Missing {header}',
description=description,
affected=self.target,
remediation=f'Add {header} header',
cwe=None
))
return vulns
# === 完整扫描 ===
def full_scan(self) -> dict:
print(f"Scanning {self.target}...")
results = {
'target': self.target,
'vulnerabilities': [],
'summary': {'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
}
# 执行所有测试
all_vulns = []
all_vulns.extend(self.test_sql_injection())
all_vulns.extend(self.test_xss())
all_vulns.extend(self.test_csrf())
all_vulns.extend(self.check_security_headers())
# 去重
seen = set()
for v in all_vulns:
key = f"{v.id}-{v.title}"
if key not in seen:
seen.add(key)
results['vulnerabilities'].append(v)
results['summary'][v.severity] += 1
return results
```
## 2. 依赖安全扫描
```python
import json
import subprocess
from packaging import version
class DependencyScanner:
def __init__(self):
self.vulnerabilities_db = self.load_nvd()
def scan_npm(self, package_json_path: str = 'package.json') -> list:
"""扫描npm依赖"""
with open(package_json_path) as f:
pkg = json.load(f)
all_deps = {**pkg.get('dependencies', {}), **pkg.get('devDependencies', {})}
results = []
# npm audit
try:
result = subprocess.run(
['npm', 'audit', '--json'],
capture_output=True,
text=True,
cwd=os.path.dirname(package_json_path)
)
audit_data = json.loads(result.stdout)
for vuln in audit_data.get('vulnerabilities', {}).values():
results.append({
'package': vuln['name'],
'severity': vuln['severity'],
'url': vuln.get('via', [{}])[0].get('url', ''),
'fix': vuln.get('fixAvailable')
})
except Exception as e:
print(f"npm audit failed: {e}")
return results
def scan_python(self, requirements_path: str = 'requirements.txt') -> list:
"""扫描Python依赖"""
results = []
try:
result = subprocess.run(
['pip', 'list', '--format=freeze'],
capture_output=True,
text=True
)
packages = {}
for line in result.stdout.split('\n'):
if '==' in line:
pkg, ver = line.strip().split('==')
packages[pkg.lower()] = ver
# 使用safety检查已知漏洞
safety_result = subprocess.run(
['safety', 'check', '--json'],
capture_output=True,
text=True,
input=result.stdout
)
if safety_result.returncode != 0:
try:
for vuln in json.loads(safety_result.stdout):
results.append(vuln)
except:
pass
except Exception as e:
print(f"Python scan failed: {e}")
return results
```
## 3. 秘钥扫描
```python
class SecretScanner:
"""敏感信息扫描"""
PATTERNS = {
'AWS Access Key': r'AKIA[0-9A-Z]{16}',
'AWS Secret Key': r'[A-Za-z0-9/+=]{40}',
'GitHub Token': r'ghp_[a-zA-Z0-9]{36}',
'GitHub OAuth': r'gho_[a-zA-Z0-9]{36}',
'Private Key': r'-----BEGIN (RSA|DSA|EC|OPENSSH) PRIVATE KEY-----',
'Generic API Key': r'[a-zA-Z0-9]{32,64}',
'Slack Token': r'xox[baprs]-[0-9]{10,13}-[0-9]{10,13}[a-zA-Z0-9-]*',
'Stripe Key': r'sk_live_[0-9a-zA-Z]{24}',
'Database URL': r'(mysql|postgres|mongodb)://[^:]+:[^@]+@',
'JWT': r'eyJ[A-Za-z0-9-_]+\.eyJ[A-Za-z0-9-_]+\.[A-Za-z0-9-_]+',
'Slack Webhook': r'https://hooks\.slack\.com/services/T[a-zA-Z0-9]+/B[a-zA-Z0-9]+/',
'SendGrid Key': r'SG\.[a-zA-Z0-9_-]{22}\.[a-zA-Z0-9_-]{43}',
'Google API': r'AIza[0-9A-Za-z_-]{35}',
'OpenAI Key': r'sk-[a-zA-Z0-9]{48}',
'Generic Secret': r'(?i)(password|secret|api_key|apikey|auth_token|access_token)\s*[=:]\s*[\'"]?[\w-]{8,}',
}
def scan_file(self, filepath: str) -> list[dict]:
import re
matches = []
with open(filepath, 'r', errors='ignore') as f:
content = f.read()
lines = content.split('\n')
for pattern_name, pattern in self.PATTERNS.items():
for i, line in enumerate(lines, 1):
matches_found = re.finditer(pattern, line)
for match in matches_found:
matches.append({
'file': filepath,
'line': i,
'type': pattern_name,
'match': match.group(),
'context': line.strip()[:200]
})
return matches
def scan_directory(self, root_path: str, extensions=None) -> list:
"""扫描整个目录"""
import os
matches = []
extensions = extensions or ['.js', '.ts', '.py', '.java', '.go', '.rb', '.php', '.cs', '.sh', '.env', '.json', '.yaml', '.yml', '.toml', '.xml']
for dirpath, dirnames, filenames in os.walk(root_path):
# 跳过node_modules等
dirnames[:] = [d for d in dirnames if d not in ['node_modules', '.git', '__pycache__', 'venv', '.venv']]
for filename in filenames:
if any(filename.endswith(ext) for ext in extensions):
filepath = os.path.join(dirpath, filename)
matches.extend(self.scan_file(filepath))
return matches
```
## 4. 安全加固建议
```python
class SecurityHardening:
"""安全加固清单"""
@staticmethod
def generate_report(target: str, scan_results: dict) -> str:
"""生成安全报告"""
summary = scan_results['summary']
vulns = scan_results['vulnerabilities']
report = f"""
# 安全扫描报告
## 目标
{scan_results['target']}
## 概览
- 🔴 严重: {summary['critical']}
- 🟠 高危: {summary['high']}
- 🟡 中危: {summary['medium']}
- 🟢 低危: {summary['low']}
## 发现的问题
"""
for v in vulns:
icon = {'critical': '🔴', 'high': '🟠', 'medium': '🟡', 'low': '🟢'}.get(v.severity, '⚪')
report += f"""
### {icon} {v.title}
- **严重程度**: {v.severity.upper()}
- **描述**: {v.description}
- **影响**: {v.affected}
- **修复建议**: {v.remediation}
{f'- **CWE**: {v.cwe}' if v.cwe else ''}
"""
return report
# === 加固清单 ===
RECOMMENDATIONS = {
'sql_injection': [
'使用参数化查询或ORM',
'启用WAF',
'最小权限原则',
'定期代码审计'
],
'xss': [
'输出编码',
'内容安全策略 (CSP)',
'HTTPOnly Cookie',
'输入验证'
],
'csrf': [
'CSRF Token',
'SameSite Cookie',
'双重提交Cookie',
'验证Referer'
],
'authentication': [
'强密码策略',
'多因素认证 (MFA)',
'账户锁定策略',
'安全的会话管理'
],
'data_protection': [
'传输加密 (TLS 1.3)',
'静态加密',
'秘钥轮换',
'数据脱敏'
]
}
```
## 5. OWASP Top 10 检查清单
| 类别 | 检测项 | 工具 |
|------|--------|------|
| A01 Broken Access | 水平/垂直越权测试 | BurpSuite |
| A02 Cryptographic Failures | 弱加密/传输/存储 | custom |
| A03 Injection | SQLi/XSS/命令注入 | ZAP, sqlmap |
| A04 Insecure Design | 业务逻辑漏洞 | manual |
| A05 Security Misconfig | 默认口令/错误配置 | nmap, nikto |
| A06 Vulnerable Components | 过时依赖 | Snyk, Dependabot |
| A07 Auth Failures | 弱认证/会话劫持 | BurpSuite |
| A08 Data Integrity | CI/CD注入/篡改 | Semgrep |
| A09 Logging Failures | 审计日志缺失 | manual |
| A10 SSRF | 服务端请求伪造 | custom |
FILE:skill.json
{
"name": "openclaw-security-testing-agent",
"version": "1.0.0",
"description": "Security Testing Agent - Vulnerability scanning, penetration testing",
"author": "SKY-lv",
"license": "MIT",
"keywords": [
"security",
"penetration-testing",
"vulnerability",
"openclaw",
"skill"
],
"repository": "https://github.com/SKY-lv/security-testing-agent",
"main": "SKILL.md"
}Web search via self-hosted SearxNG. Aggregates Google, Bing, DuckDuckGo, Brave. Returns title/url/snippet. Zero API key required.
--- name: openclaw-ask-search description: "Web search via self-hosted SearxNG. Aggregates Google, Bing, DuckDuckGo, Brave. Returns title/url/snippet. Zero API key required." --- # ask-search Web search powered by [SearxNG](https://github.com/searxng/searxng). Aggregates multiple search engines, zero API key, full privacy. ## Usage ```bash ask-search "your query" # top 10 results ask-search "query" --num 5 # limit results ask-search "AI news" --categories news # news only ask-search "query" --lang zh-CN # Chinese results ask-search "query" --urls-only # URL list (pipe to web_fetch) ask-search "query" --json # raw JSON ``` ## Agent Workflow 1. Run `ask-search "topic"` to get candidates 2. Check snippet — if enough, answer directly 3. If snippet truncated, use `web_fetch` on the URL for full content ## Parameters | Flag | Short | Description | |------|-------|-------------| | `--num N` | `-n` | Max results (default 10) | | `--engines` | `-e` | google,bing,duckduckgo,brave | | `--lang` | `-l` | zh-CN, en, ja, ko | | `--categories` | `-c` | general,news,images,science | | `--json` | `-j` | Raw JSON output | | `--urls-only` | `-u` | URLs only | ## Setup Requires SearxNG running locally. Set `SEARXNG_URL` if not on default port 8080. FILE:install.sh #!/bin/bash # ask-search install script # Installs ask-search as a global command + optionally sets up SearxNG set -e SCRIPT_DIR="$(cd "$(dirname "BASH_SOURCE[0]")" && pwd)" INSTALL_BIN="-/usr/local/bin" echo "=== ask-search installer ===" # Check dependencies if ! command -v python3 &>/dev/null; then echo "ERROR: python3 is required" exit 1 fi if ! command -v curl &>/dev/null; then echo "ERROR: curl is required" exit 1 fi # Install global command echo "Installing ask-search to $INSTALL_BIN ..." # Create wrapper with hardcoded path (single-quotes prevent $@ expansion in heredoc) WRAPPER=$(cat << 'WRAPPEREOF' #!/bin/bash exec python3 "COREPATH" "$@" WRAPPEREOF ) # Replace placeholder with actual path WRAPPER="WRAPPER/COREPATH/$SCRIPT_DIR/scripts/core.py" TMPFILE=$(mktemp) echo "$WRAPPER" > "$TMPFILE" install -m 755 "$TMPFILE" "$INSTALL_BIN/ask-search" rm -f "$TMPFILE" echo "✓ ask-search installed to $INSTALL_BIN/ask-search" # Test echo "" echo "Testing connection to SearxNG at -http://localhost:8080 ..." if python3 "$SCRIPT_DIR/scripts/core.py" "test" --num 1 &>/dev/null; then echo "✓ SearxNG reachable — ask-search is ready!" else echo "⚠ SearxNG not reachable at -http://localhost:8080" echo " Set SEARXNG_URL env var or deploy SearxNG first." echo " See README for SearxNG setup instructions." fi echo "" echo "Usage: ask-search \"your query\"" FILE:mcp/server.py #!/usr/bin/env python3 """ ask-search MCP Server — for Antigravity / Claude Code MCP integration Install: pip install mcp Add to your MCP config: { "mcpServers": { "ask-search": { "command": "python3", "args": ["/path/to/ask-search/mcp/server.py"], "env": {"SEARXNG_URL": "http://localhost:8080"} } } } """ import os, sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts")) try: from mcp.server.fastmcp import FastMCP except ImportError: print("Error: mcp package not installed. Run: pip install mcp", file=sys.stderr) sys.exit(1) from core import searxng_search, search, fmt_results mcp = FastMCP("ask-search") @mcp.tool() def web_search(query: str, num_results: int = 10) -> str: """ Search the web via SearxNG. Returns JSON array of results with title, url, content. Aggregates Google, Bing, DuckDuckGo, Brave and more. Args: query: Search query string num_results: Number of results to return (default 10, max 20) """ return searxng_search(query, min(num_results, 20)) @mcp.tool() def web_search_news(query: str, num_results: int = 10) -> str: """ Search recent news via SearxNG news category. Args: query: News search query num_results: Number of results (default 10) """ try: results = search(query, num_results, categories="news") import json return json.dumps({"query": query, "category": "news", "results": results}, ensure_ascii=False, indent=2) except Exception as e: import json return json.dumps({"error": str(e)}) if __name__ == "__main__": mcp.run() FILE:README.md <div align="center"> # 🔍 ask-search **Self-hosted web search for AI agents — zero API key, full privacy** [](LICENSE) [](https://github.com/openclaw/openclaw) [](https://python.org) *Works in OpenClaw · Claude Code · Antigravity · Any CLI* </div> --- ## 😡 The Problem Your AI agent wants to search the web, but: - Brave Search API: $3/1000 queries, rate limited - Google Custom Search: $5/1000, daily caps - Bing API: paid, complex setup - Built-in web search: sends queries to third-party servers **You just want your local agent to search Google without paying or leaking queries.** ## ✅ The Solution `ask-search` wraps [SearxNG](https://github.com/searxng/searxng) — a self-hosted meta search engine that aggregates Google, Bing, DuckDuckGo, Brave and 70+ more sources. One command, all results, zero cost. ```bash ask-search "Claude Code vs Cursor 2026" ``` ``` [1] Claude Code Is Eating Cursor's Lunch https://techcrunch.com/2026/... After Anthropic launched Claude Code with agent mode... [google,brave] [2] Why I switched from Cursor to Claude Code https://reddit.com/r/LocalLLaMA/... ... ``` ## 📊 Compatibility | Environment | Integration | Status | |-------------|-------------|--------| | **OpenClaw** | CLI Skill (`SKILL.md`) | ✅ | | **Claude Code** | CLI command | ✅ | | **Antigravity** | MCP Server | ✅ | | Any shell | `ask-search` CLI | ✅ | ## 🚀 Quick Start ### 30-second version (if SearxNG already running) ```bash git clone https://github.com/ythx-101/ask-search cd ask-search bash install.sh ask-search "hello world" ``` ### Full setup with Docker Compose (Recommended) ```bash # Clone and navigate to the project git clone https://github.com/ythx-101/ask-search cd ask-search # Generate a secure secret key python3 -c "import secrets; print('SEARXNG_SECRET=' + secrets.token_hex(32))" # Copy .env.example to .env and set your secret key cp searxng/.env.example searxng/.env # Edit .env with your generated secret # Start SearxNG cd searxng && docker-compose up -d # Configure ask-search with your secret export SEARXNG_SECRET="your-generated-secret" ask-search "hello world" ``` ### Full setup (SearxNG + ask-search) **Step 1: Deploy SearxNG** ```bash # Docker (recommended) docker run -d --name searxng \ -p 127.0.0.1:8080:8080 \ -e SEARXNG_SECRET_KEY=your-secret-key \ searxng/searxng # Or Docker Compose — see searxng/docker-compose.yml in this repo ``` **Step 2: Enable JSON output** Edit SearxNG `settings.yml`: ```yaml search: formats: - html - json ``` **Step 3: Install ask-search** ```bash bash install.sh ``` **Step 4: Use it** ```bash ask-search "your query" ``` ## 🔧 Usage ```bash ask-search "query" # top 10 results ask-search "query" --num 5 # limit results ask-search "AI news" --categories news # news only ask-search "query" --lang zh-CN # language filter ask-search "query" --urls-only # URLs only (pipe to web_fetch) ask-search "query" --json # raw JSON ask-search "query" -e google,brave # specific engines ``` ## ⚙️ Configuration | Variable | Default | Description | |----------|---------|-------------| | `SEARXNG_URL` | `http://localhost:8080` | SearxNG endpoint | ```bash export SEARXNG_URL="http://localhost:8080" ask-search "query" ``` ## 🤖 MCP Integration (Antigravity / Claude Code) ```json { "mcpServers": { "ask-search": { "command": "python3", "args": ["/path/to/ask-search/mcp/server.py"], "env": { "SEARXNG_URL": "http://localhost:8080" } } } } ``` Requires: `pip install mcp` ## 🦞 OpenClaw Skill Copy `SKILL.md` to your OpenClaw skills directory, or: ```bash # OpenClaw skill loader skill-add https://github.com/ythx-101/ask-search ``` Then in OpenClaw: ``` ask-search "latest news about X" ``` ## 🤝 Agent Workflow ```bash # 1. Search ask-search "React Server Components performance 2026" --num 10 # 2. Got a promising URL? Deep-dive: # Pass the URL to web_fetch / curl for full content # 3. News mode ask-search "GPT-5 release" --categories news --lang en ``` ## 📁 Structure ``` ask-search/ ├── scripts/ │ └── core.py # Main logic, CLI entry point ├── mcp/ │ └── server.py # MCP server for AG/CC integration ├── install.sh # Installer ├── SKILL.md # OpenClaw skill descriptor └── README.md ``` ## ⚠️ SearxNG Setup Notes - Must enable `json` in `search.formats` in `settings.yml` - Bot detection may block requests from some IPs — add your server IP to `pass_ip` in `limiter.toml` - Bind to `127.0.0.1` for security unless you need remote access ## 🌐 Deep-Dive Limitations & Workarounds `ask-search` returns URLs + snippets from search engine indexes. When your agent needs the **full page content** (deep-dive via `curl` / `web_fetch`), some sites will block depending on your server's network environment: ### What works out of the box | Site | Search (SearxNG) | Deep-dive (curl/fetch) | Why | |------|:-:|:-:|-----| | Most sites | ✅ | ✅ | No aggressive anti-bot | | Reddit | ✅ | ❌ VPS IP blocked | Reddit blocks datacenter IPs | | Zhihu (知乎) | ✅ | ❌ Login wall + fingerprint | Requires browser JS + login | | Medium | ✅ | ⚠️ Paywall | Partial content only | **Key insight**: Search always works because SearxNG queries search engines (Google, Brave, etc.), not the target sites directly. The search engines have already indexed the content. The problem only appears when your agent tries to fetch the full page. ### Solution 1: SOCKS proxy via residential IP If you have a machine on a residential network (home server, laptop, etc.), create an SSH SOCKS tunnel: ```bash # On your VPS/server: ssh -f -N -D 127.0.0.1:1082 user@your-home-machine # Then fetch through the proxy: curl -x socks5h://127.0.0.1:1082 "https://reddit.com/r/example/comments/xxx.json" ``` For Reddit specifically, append `.json` to any post URL for structured data: ```bash # Returns full post + all comments as JSON curl -x socks5h://127.0.0.1:1082 \ "https://www.reddit.com/r/LocalLLaMA/comments/xxxxx/post_title.json" ``` To persist the tunnel as a systemd service: ```ini # /etc/systemd/system/socks-proxy.service [Unit] Description=SSH SOCKS Proxy for web scraping After=network.target [Service] Type=simple ExecStart=/usr/bin/ssh -N -D 127.0.0.1:1082 -o ServerAliveInterval=30 -o ServerAliveCountMax=3 -o ExitOnForwardFailure=yes user@your-home-machine Restart=always RestartSec=10 [Install] WantedBy=multi-user.target ``` ### Solution 2: Headless browser for JS-heavy sites Sites like Zhihu require full browser rendering. Use Playwright with the SOCKS proxy: ```python from playwright.sync_api import sync_playwright with sync_playwright() as p: browser = p.chromium.launch( headless=True, proxy={"server": "socks5://127.0.0.1:1082"} ) page = browser.new_page() page.goto("https://example.com/article") content = page.inner_text("article") ``` > **Note**: Some sites (Zhihu, etc.) detect headless browsers even through proxies. For these, you may need a real browser session with login cookies, or delegate the fetch to an agent running on a local machine (e.g., Claude Code on a Mac). ### Solution 3: Leverage archive caches When direct access fails, try cached versions: ```bash # Archive.org (works surprisingly often for Reddit) curl "https://web.archive.org/web/2026/https://reddit.com/r/example/comments/xxx" # Google Cache (may redirect — not always reliable) curl "https://webcache.googleusercontent.com/search?q=cache:example.com/page" ``` ### Solution 4: Multi-node agent architecture If you run agents on multiple machines (e.g., OpenClaw on VPS + Claude Code on local Mac): ``` VPS agent: ask-search "query" → gets URLs + snippets ↓ Local agent: web_fetch(url) → full content (residential IP, no blocks) ↓ VPS agent: receives full text, analyzes, responds ``` This is the most robust approach — search on your server, deep-dive from a local machine where anti-bot measures don't apply. ### TL;DR | Problem | Fix | |---------|-----| | Reddit blocks your IP | SSH SOCKS proxy + `.json` API | | Site needs JS rendering | Playwright + proxy | | Site needs login (Zhihu) | Delegate to local agent or use logged-in browser | | Everything blocked | Fall back to search snippets + archive caches | ## 🤝 Contributing Issues and PRs welcome. ## Acknowledgements Inspired by [Perplexica](https://github.com/ItzCrazyKns/Perplexica) — the idea of using self-hosted SearxNG as a private search backend comes from there. ask-search strips it down to a single CLI command for agent use. ## 📄 License [MIT](LICENSE) FILE:scripts/core.py #!/usr/bin/env python3 """ ask-search v1.0.0 — Cross-environment SearxNG search skill Works in: OpenClaw (CLI) | Claude Code (CLI) | Antigravity (MCP) Usage: ask-search "query" # top 10 results ask-search "query" --num 5 # limit results ask-search "AI news" --categories news ask-search "query" --lang zh-CN ask-search "query" --urls-only # URL list only (pipe to web_fetch) ask-search "query" --json # raw JSON output Environment: SEARXNG_URL SearxNG endpoint (default: http://localhost:8080) """ import sys, json, urllib.parse, argparse, os, subprocess VERSION = "1.0.0" def _search_url(): base = os.environ.get("SEARXNG_URL", "http://localhost:8080") return base.rstrip("/") + "/search" def search(query, num=10, engines=None, lang=None, categories=None): params = {"q": query, "format": "json", "pageno": 1} if engines: params["engines"] = engines if lang: params["language"] = lang if categories: params["categories"] = categories url = _search_url() + "?" + urllib.parse.urlencode(params) # Use subprocess curl to avoid urllib HTTP/1.0 compatibility issues result = subprocess.run( ["curl", "-s", "--max-time", "15", url], capture_output=True, text=True, timeout=20 ) if result.returncode != 0: raise RuntimeError(f"curl failed: {result.stderr[:200]}") data = json.loads(result.stdout) return data.get("results", [])[:num] def fmt_results(results, urls_only=False): if urls_only: return "\n".join(r.get("url", "") for r in results) lines = [] for i, r in enumerate(results, 1): title = r.get("title", "").strip() url = r.get("url", "") content = r.get("content", "").strip() engines = ",".join(r.get("engines", [])) lines.append(f"[{i}] {title}") lines.append(f" {url}") if content: lines.append(f" {content[:200]}") if engines: lines.append(f" [{engines}]") lines.append("") return "\n".join(lines).strip() def searxng_search(query, num=15): """MCP/legacy interface — returns JSON string""" try: results = search(query, num) return json.dumps({"query": query, "results": results}, ensure_ascii=False, indent=2) except Exception as e: return json.dumps({"error": str(e)}) def main(): p = argparse.ArgumentParser(description="SearxNG search (cross-environment)") p.add_argument("query") p.add_argument("--num", "-n", type=int, default=10) p.add_argument("--engines", "-e", help="google,bing,duckduckgo,brave") p.add_argument("--lang", "-l", help="zh-CN, en, ja, ko") p.add_argument("--categories", "-c", help="general,news,images,science") p.add_argument("--json", "-j", action="store_true") p.add_argument("--urls-only", "-u", action="store_true") p.add_argument("--version", "-V", action="version", version=f"ask-search {VERSION}") args = p.parse_args() try: results = search(args.query, args.num, args.engines, args.lang, args.categories) except Exception as e: print(json.dumps({"error": str(e)})); sys.exit(1) if not results: print(json.dumps({"error": "No results", "query": args.query})); sys.exit(1) if getattr(args, "json"): print(json.dumps({"query": args.query, "results": results}, ensure_ascii=False, indent=2)) else: print(fmt_results(results, args.urls_only)) if __name__ == "__main__": main() FILE:searxng/docker-compose.yml # Docker Compose for SearxNG # Usage: cd searxng && docker-compose up -d # Then configure ask-search: ask-search --searxng-url http://localhost:8080 version: '3.8' services: searxng: image: searxng/searxng:latest container_name: searxng ports: - "127.0.0.1:8080:8080" environment: # Generate a secure secret key: python3 -c "import secrets; print(secrets.token_hex(32))" - SEARXNG_SECRET=-your-secret-key-change-in-production - SEARXNG_BASE_URL=http://localhost:8080 # Enable JSON output for API access - SEARXNG_SETTINGS_SOURCE=docker volumes: - ./searxng.yml:/etc/searxng/settings.yml:ro restart: unless-stopped healthcheck: test: ["CMD", "curl", "-f", "http://localhost:8080"] interval: 30s timeout: 10s retries: 3 # Usage: # 1. Generate a secret key: python3 -c "import secrets; print(secrets.token_hex(32))" # 2. Copy .env.example to .env and set SEARXNG_SECRET # 3. Run: docker-compose up -d # 4. Access SearxNG UI at http://localhost:8080 # 5. Configure ask-search: ask-search --searxng-url http://localhost:8080 --searxng-secret your-secret FILE:searxng/searxng.yml # Minimal SearxNG settings for Docker # Enables JSON output for ask-search API access general: debug: false instance_name: "ask-search SearxNG" server: secret_key: "&{SEARXNG_SECRET}" bind_address: "0.0.0.0" port: 8080 ui: static_use_hash: true default_theme: simple engines: - name: google engine: google shortcut: google - name: duckduckgo engine: duckduckgo shortcut: ddg - name: brave engine: brave shortcut: brave
快速创建支持自定义路径、方法和响应的模拟 REST API,用于开发和测试环境。
# Mock API Skill
快速创建模拟 API 接口,用于开发和测试。
## 功能特性
- 支持 GET/POST/PUT/DELETE 方法
- 自定义响应状态码和延迟
- 模拟 JSON 响应数据
- 路径参数和查询参数支持
## 使用方法
### 1. 快速启动
```bash
# 启动默认端口 3000
python mock_api.py
# 指定端口
python mock_api.py --port 8080
# 指定自定义配置
python mock_api.py --config my_api.json
```
### 2. 配置示例 (api_config.json)
```json
{
"port": 3000,
"endpoints": [
{
"path": "/api/users",
"method": "GET",
"response": {
"users": [
{"id": 1, "name": "张三", "email": "[email protected]"},
{"id": 2, "name": "李四", "email": "[email protected]"}
]
},
"status": 200,
"delay": 500
},
{
"path": "/api/users/:id",
"method": "GET",
"response": {"id": 1, "name": "张三", "email": "[email protected]"},
"status": 200
},
{
"path": "/api/login",
"method": "POST",
"response": {"token": "abc123", "expiresIn": 3600},
"status": 200
}
]
}
```
### 3. 测试接口
```bash
# GET 请求
curl http://localhost:3000/api/users
# POST 请求
curl -X POST http://localhost:3000/api/login -H "Content-Type: application/json" -d '{"username":"test","password":"123456"}'
# 带延迟的响应
curl http://localhost:3000/api/slow
```
## 内置端点
| 端点 | 方法 | 说明 |
|------|------|------|
| `/mock/status` | GET | 返回服务状态 |
| `/mock/echo` | POST | 回显请求数据 |
| `/mock/delay/:ms` | GET | 延迟响应(毫秒) |
| `/mock/random` | GET | 返回随机数据 |
## 参数说明
- `--port`: 服务端口 (默认 3000)
- `--config`: 配置文件路径
- `--host`: 绑定地址 (默认 0.0.0.0)
- `--cors`: 启用 CORS (默认 true)
FILE:mock_api.py
#!/usr/bin/env python3
"""
Mock API Server - 快速创建模拟 API 接口
"""
import argparse
import json
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import random
import string
DEFAULT_CONFIG = {
"port": 3000,
"host": "0.0.0.0",
"cors": True,
"endpoints": [
{
"path": "/mock/status",
"method": "GET",
"response": {"status": "ok", "timestamp": time.time()},
"status": 200
},
{
"path": "/mock/echo",
"method": "POST",
"response": {"message": "echo", "data": {}},
"status": 200
},
{
"path": "/mock/random",
"method": "GET",
"response": {
"random_int": 0,
"random_string": "",
"random_list": []
},
"status": 200
},
{
"path": "/mock/delay/:ms",
"method": "GET",
"response": {"delayed": True},
"status": 200
},
{
"path": "/api/users",
"method": "GET",
"response": {
"users": [
{"id": 1, "name": "张三", "email": "[email protected]"},
{"id": 2, "name": "李四", "email": "[email protected]"}
]
},
"status": 200
},
{
"path": "/api/users/:id",
"method": "GET",
"response": {"id": 1, "name": "张三", "email": "[email protected]"},
"status": 200
},
{
"path": "/api/login",
"method": "POST",
"response": {"token": "mock_token_" + ''.join(random.choices(string.ascii_lowercase, k=10)), "expiresIn": 3600},
"status": 200
}
]
}
class MockAPIHandler(BaseHTTPRequestHandler):
config = DEFAULT_CONFIG
def log_message(self, format, *args):
print(f"[{self.log_date_time_string()}] {format % args}")
def send_cors_headers(self):
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization')
def do_OPTIONS(self):
self.send_response(200)
if self.config.get('cors', True):
self.send_cors_headers()
self.end_headers()
def find_endpoint(self, path, method):
for ep in self.config.get('endpoints', []):
ep_path = ep['path']
# 精确匹配
if ep_path == path and ep.get('method', 'GET').upper() == method.upper():
return ep
# 路径参数匹配
if ':' in ep_path:
ep_parts = ep_path.split('/')
req_parts = path.split('/')
if len(ep_parts) == len(req_parts) and ep.get('method', 'GET').upper() == method.upper():
params = {}
match = True
for i, part in enumerate(ep_parts):
if part.startswith(':'):
params[part[1:]] = req_parts[i]
elif part != req_parts[i]:
match = False
break
if match:
ep['_params'] = params
return ep
return None
def do_GET(self):
self.handle_request('GET')
def do_POST(self):
self.handle_request('POST')
def do_PUT(self):
self.handle_request('PUT')
def do_DELETE(self):
self.handle_request('DELETE')
def handle_request(self, method):
parsed = urlparse(self.path)
path = parsed.path
query = parse_qs(parsed.query)
ep = self.find_endpoint(path, method)
if not ep:
self.send_response(404)
self.send_header('Content-Type', 'application/json')
if self.config.get('cors', True):
self.send_cors_headers()
self.end_headers()
self.wfile.write(json.dumps({"error": "Not Found", "path": path}).encode())
return
# 处理延迟
delay = ep.get('delay', 0)
if delay > 0:
time.sleep(delay / 1000)
# 处理动态响应
response = ep.get('response', {}).copy()
# 替换路径参数
if '_params' in ep:
for k, v in ep['_params'].items():
if isinstance(response, dict):
for key in response:
if isinstance(response[key], str):
response[key] = response[key].replace(f':{k}', v)
# 特殊端点处理
if path == '/mock/random':
response['random_int'] = random.randint(1, 1000)
response['random_string'] = ''.join(random.choices(string.ascii_letters, k=10))
response['random_list'] = [random.randint(1, 100) for _ in range(5)]
if path.startswith('/mock/delay/'):
ms = int(path.split('/')[-1])
response = {"delayed_ms": ms, "timestamp": time.time()}
self.send_response(ep.get('status', 200))
self.send_header('Content-Type', 'application/json')
if self.config.get('cors', True):
self.send_cors_headers()
self.end_headers()
self.wfile.write(json.dumps(response, ensure_ascii=False).encode())
def main():
parser = argparse.ArgumentParser(description='Mock API Server')
parser.add_argument('--port', type=int, default=3000, help='服务端口')
parser.add_argument('--host', type=str, default='0.0.0.0', help='绑定地址')
parser.add_argument('--config', type=str, help='配置文件路径')
parser.add_argument('--cors', type=bool, default=True, help='启用CORS')
args = parser.parse_args()
config = DEFAULT_CONFIG.copy()
config['port'] = args.port
config['host'] = args.host
if args.config:
try:
with open(args.config, 'r', encoding='utf-8') as f:
user_config = json.load(f)
config.update(user_config)
except Exception as e:
print(f"加载配置文件失败: {e}")
MockAPIHandler.config = config
server = HTTPServer((config['host'], config['port']), MockAPIHandler)
print(f"🚀 Mock API Server 启动于 http://{config['host']}:{config['port']}")
print(f"📋 内置端点:")
for ep in config.get('endpoints', [])[:5]:
print(f" {ep.get('method', 'GET'):6} {ep['path']}")
print(f" ... 共 {len(config.get('endpoints', []))} 个端点")
print(f"🛑 按 Ctrl+C 停止服务")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n👋 服务已停止")
server.shutdown()
if __name__ == '__main__':
main()