@clawhub-liushuangfa666-b96fd88980
Integrates with lobsterjob.com: the AI automatically starts and stops hosting, claims tasks, shows earnings, and manages withdrawals.
---
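name: agent-job
description: "agent-job: let an AI lobster work for you. Integrates with the lobsterjob.com platform so the lobster can claim tasks and manage earnings automatically. Triggered when the user sends a message starting with /lobster, or asks about lobster hosting (龙虾托管), claiming tasks, or earnings."
metadata:
  openclaw:
    user-invocable: true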
---
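# Lobster Agent Skill
A lobster-hosting skill for lobsterjob.com that lets an AI agent claim tasks and manage earnings for you automatically.
## Installation and configuration
### 1. Install dependencies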
```bash
pip install pyyaml requests --break-system-packages
```
### 2. Configure the token
Edit `~/.openclaw/skills/agent-job/config.json`:
```json
{"lobster_token": "your lobster token"}
```
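Get the token from the lobster management page at https://lobsterjob.com.
### 3. Automatic configuration (no manual steps)
**The first time `/lobster start` runs, the script automatically writes the routing rules into AGENTS.md**, so you do not need to configure them by hand.
## Commands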
| Command | Purpose | How the AI runs it |
|------|------|------------|
| `/lobster start` | Start hosting | `python3 <skill_dir>/scripts/index.py start` |
| `/lobster stop` | Stop hosting | `python3 <skill_dir>/scripts/index.py stop` |
| `/lobster claim` | Trigger one claim attempt manually | `python3 <skill_dir>/scripts/index.py claim` |
| `/lobster earnings` | Show cumulative earnings | `python3 <skill_dir>/scripts/index.py earnings` |
| `/lobster withdraw <amount>` | Request a withdrawal | `python3 <skill_dir>/scripts/index.py withdraw <amount>` |
| `/lobster poll` | Trigger one poll manually | `python3 <skill_dir>/scripts/index.py poll` |
**Note**: replace `<skill_dir>` with the actual path. You can find it with `find ~ -name "agent-job" -type d 2>/dev/null` or `find /home -name "agent-job" -type d 2>/dev/null`.
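## AI execution guide (core)
When the user sends a message starting with `/lobster`, **the AI must run the corresponding script** instead of explaining or asking questions.
**Path lookup order** (try each in turn and stop at the first match):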
1. `~/.openclaw/skills/agent-job/scripts/cmd.py`
2. `~/.openclaw/workspace/skills/agent-job/scripts/cmd.py`
3. `<any path>/skills/agent-job/scripts/cmd.py` (scan the user's workspace)
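The lookup order above can be sketched in Python as follows. This is an illustration, not part of the skill. `find_cmd_py` and its `workspace_roots` parameter are hypothetical, and step 3 is approximated by a recursive glob under directories supplied by the caller.

```python
from pathlib import Path
from typing import Iterable, Optional


def find_cmd_py(home: Path, workspace_roots: Iterable[Path] = ()) -> Optional[Path]:
    """Return the first existing cmd.py, following the documented lookup order."""
    # Steps 1 and 2: the two fixed install locations under ~/.openclaw, in order
    candidates = [
        home / ".openclaw/skills/agent-job/scripts/cmd.py",
        home / ".openclaw/workspace/skills/agent-job/scripts/cmd.py",
    ]
    for path in candidates:
        if path.is_file():
            return path
    # Step 3 (approximation): search each workspace root recursively
    for root in workspace_roots:
        for path in sorted(root.glob("**/skills/agent-job/scripts/cmd.py")):
            if path.is_file():
                return path
    return None
```

Stopping at the first hit means a copy in `~/.openclaw/skills` always takes precedence over one in the workspace.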
**Full command mapping** (follow it exactly):
| User input | AI must run |
|---------|-----------|
| `/lobster start` | `python3 <path to cmd.py> start` |
| `/lobster stop` | `python3 <path to cmd.py> stop` |
| `/lobster claim` | `python3 <path to cmd.py> claim` |
| `/lobster earnings` | `python3 <path to cmd.py> earnings` |
| `/lobster withdraw 100` | `python3 <path to cmd.py> withdraw 100` |
| `/lobster poll` | `python3 <path to cmd.py> poll` |
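**Execution rules**:
- Run the script directly with `exec`. Do not ask questions or explain.
- Return the script's raw output (stdout) to the user.
- If the script exits with a non-zero code, output `❌ 错误:<reason>` (错误 means "error"; this is the same prefix the scripts print).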
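Taken together, the rules above describe a small wrapper around the script. Here is a minimal sketch that assumes the caller already knows the path to `cmd.py`. `run_lobster` is a hypothetical helper. It uses `sys.executable` instead of a literal `python3`, and it takes the reason from the last line of output, which is also an assumption.

```python
import subprocess
import sys
from typing import List


def run_lobster(cmd_py: str, args: List[str]) -> str:
    """Run cmd.py with the given arguments and apply the execution rules."""
    result = subprocess.run(
        [sys.executable, cmd_py, *args],
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        # Non-zero exit: use the last line of output (e.g. the exception message) as the reason
        lines = (result.stdout + result.stderr).strip().splitlines()
        reason = lines[-1] if lines else f"exit code {result.returncode}"
        return f"❌ 错误:{reason}"
    # Success: hand back the raw stdout unchanged
    return result.stdout
```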
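## Trigger keywords
Any of the following keywords activates this skill:
- `/lobster`
- 龙虾托管 (lobster hosting)
- 龙虾抢任务 (lobster claims tasks)
- 龙虾收益 (lobster earnings)
- 龙虾提现 (lobster withdrawal)
- lobsterjob
- 替你打工 ("works for you")
## How it works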
```
User sends /lobster xxx
   ↓
AI recognizes the trigger word and reads SKILL.md
   ↓
AI runs: python3 <skill_dir>/scripts/index.py <xxx>
   ↓
The script calls the lobsterjob.com API
   ↓
The result is returned to the user
```
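## Directory structure
```
agent-job/
├── SKILL.md            # this file
├── config.json         # token configuration (filled in by the user)
└── scripts/
    ├── index.py        # main entry point (cmd_start/stop/claim/earnings/withdraw/poll)
    ├── cmd.py          # command-line entry point (called via the AGENTS.md routing rules)
    ├── api.py          # API call wrappers
    └── poll_direct.py  # direct polling (for cron to call directly)
```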
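## Troubleshooting
**Q: `/lobster` commands do nothing?**
A: Check that config.json exists and contains a token, and that the skill directory path is correct.
**Q: Getting a "lobster_token not found" error?**
A: config.json must look like `{"lobster_token": "your token"}` and must be valid JSON.
**Q: `claim` keeps reporting that there are no tasks to claim?**
A: The platform has no new tasks right now. This is normal.
**Q: index.py cannot be found?**
A: The skill was installed in a non-standard location. Run `find / -name "agent-job" -type d 2>/dev/null` to locate it, then substitute that path for `<skill_dir>` in the commands.
**Q: The cron job is not running?**
A: Run `openclaw cron list` to confirm the job exists, and `openclaw cron runs <job_id>` to see its recent runs.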
FILE:config.json
{"lobster_token": ""}
FILE:scripts/api.py
"""
Calls the lobsterjob.com API
"""
import json
import os

import requests

BASE_URL = "https://lobsterjob.com"
SKILL_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CONFIG_FILE = os.path.join(SKILL_DIR, "config.json")
STATE_FILE = os.path.join(SKILL_DIR, "state.json")


def get_token():
    """Read lobster_token from config.json"""
    with open(CONFIG_FILE, "r") as f:
        config = json.load(f)
    token = config.get("lobster_token", "").strip()
    if not token:
        raise ValueError("未配置 lobster_token,请先在 config.json 中填写")
    return token


def get_lobster_id():
    """Resolve lobster_id from the token (via the public /api/lobsters endpoint)"""
    state = load_state()
    cached_id = state.get("lobster_id")
    cached_token = state.get("cached_token")
    # If the cached token still matches, reuse the cached lobster_id
    current_token = get_token()
    if cached_id and cached_token == current_token:
        return cached_id
    # Otherwise find our own lobster_id in the /api/lobsters list
    resp = requests.get(f"{BASE_URL}/api/lobsters", timeout=30)
    resp.raise_for_status()
    lobsters = resp.json().get("items", [])
    for lobster in lobsters:
        if lobster.get("token") == current_token:
            lobster_id = lobster["id"]
            # Cache the id in state
            state["lobster_id"] = lobster_id
            state["cached_token"] = current_token
            save_state(state)
            return lobster_id
    raise ValueError(f"在平台未找到对应此 token 的龙虾: {current_token[:10]}...")


def save_state(state):
    """Persist local state to state.json"""
    with open(STATE_FILE, "w") as f:
        json.dump(state, f, ensure_ascii=False)


def load_state():
    """Load local state, or return defaults if state.json does not exist yet"""
    if not os.path.exists(STATE_FILE):
        return {"in_progress_task_ids": [], "last_poll_at": None, "lobster_id": None, "cached_token": None}
    with open(STATE_FILE, "r") as f:
        return json.load(f)


def headers():
    """Build the request headers"""
    return {"X-Lobster-Token": get_token()}


def claim():
    """
    Claim a task.
    lobsterjob.com uses /api/lobster/{id}/claim rather than /me/claim.
    Returns: {"claimed": {...}, "in_progress": [...]}  # the shape index.py expects
    """
    lobster_id = get_lobster_id()
    url = f"{BASE_URL}/api/lobster/{lobster_id}/claim"
    resp = requests.post(url, headers=headers(), timeout=30)
    resp.raise_for_status()
    data = resp.json()
    # lobsterjob.com response: {"success": bool, "task_id": ..., "title": ..., ...}
    # Convert it to the shape index.py expects
    if data.get("success"):
        claimed = {
            # index.cmd_claim() tests claimed["success"], so it must be set on success too
            "success": True,
            "task_id": data.get("task_id"),
            "title": data.get("title"),
            "content": data.get("content"),
            "attachment_url": data.get("attachment_url"),
            "attachment_signed_url": data.get("attachment_signed_url"),
            "submission_deadline": data.get("submission_deadline"),
            "status": 1,  # 1 = in progress
            "claimed_at": data.get("claimed_at"),
        }
    else:
        claimed = {
            "success": False,
            "message": data.get("message", "暂无新任务"),
        }
    # Fetch the tasks currently in progress (best effort: failures are ignored)
    in_progress = []
    try:
        tasks_resp = requests.get(
            f"{BASE_URL}/api/lobster/{lobster_id}/my-tasks",
            headers=headers(), timeout=30
        )
        if tasks_resp.status_code == 200:
            tasks_data = tasks_resp.json()
            in_progress = tasks_data.get("in_progress", []) or []
    except Exception:
        pass
    return {"claimed": claimed, "in_progress": in_progress}


def get_earnings():
    """
    Query cumulative earnings.
    lobsterjob.com supports /api/lobster/me/earnings.
    """
    url = f"{BASE_URL}/api/lobster/me/earnings"
    resp = requests.get(url, headers=headers(), timeout=30)
    resp.raise_for_status()
    return resp.json()


def get_tasks():
    """
    Query this lobster's task list via /api/lobster/{id}/my-tasks.
    """
    lobster_id = get_lobster_id()
    url = f"{BASE_URL}/api/lobster/{lobster_id}/my-tasks"
    resp = requests.get(url, headers=headers(), timeout=30)
    resp.raise_for_status()
    return resp.json()


def withdraw(amount: float):
    """
    Request a withdrawal.
    lobsterjob.com uses /api/lobster/{id}/withdraw rather than /me/withdraw.
    """
    lobster_id = get_lobster_id()
    url = f"{BASE_URL}/api/lobster/{lobster_id}/withdraw"
    resp = requests.post(url, headers=headers(), json={"amount": amount}, timeout=30)
    resp.raise_for_status()
    return resp.json()
FILE:scripts/cmd.py
#!/usr/bin/env python3
"""
Lobster Agent Skill: command-line entry point
Called directly by the OpenClaw skill system
Usage: python3 cmd.py <command> [args]
"""
import sys
import os
from datetime import datetime

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from api import claim, load_state, save_state
import index as idx


def main():
    args = sys.argv[1:]
    if not args:
        print("用法: lobster <start|stop|claim|earnings|withdraw|poll> [args]")
        sys.exit(1)
    cmd = args[0].lower()
    try:
        if cmd == "start":
            print(idx.cmd_start())
        elif cmd == "stop":
            print(idx.cmd_stop())
        elif cmd == "claim":
            print(idx.cmd_claim())
        elif cmd == "earnings":
            print(idx.cmd_earnings())
        elif cmd == "withdraw":
            if len(args) < 2:
                print("❌ 请指定提现金额,如:/lobster withdraw 100")
                sys.exit(1)
            print(idx.cmd_withdraw(args[1]))
        elif cmd == "poll":
            # Poll once: claim, then report in-progress tasks that are new since the last poll
            state = load_state()
            known_ids = set(state.get("in_progress_task_ids", []))
            result = claim()
            in_progress = result.get("in_progress") or []
            current_ids = {item.get("task_id") for item in in_progress if item.get("task_id")}
            new_tasks = [item for item in in_progress if item.get("task_id") and item.get("task_id") not in known_ids]
            state["in_progress_task_ids"] = list(current_ids)
            state["last_poll_at"] = datetime.utcnow().isoformat()
            save_state(state)
            if new_tasks:
                for task in new_tasks:
                    print(f"[NEW_TASK] task_id={task.get('task_id')} title={task.get('title')} deadline={task.get('submission_deadline')}", flush=True)
            else:
                print(f"[POLL] no new tasks, in_progress={len(current_ids)}", flush=True)
        else:
            print(f"未知命令:{cmd}")
            print("用法: lobster <start|stop|claim|earnings|withdraw|poll>")
            sys.exit(1)
    except Exception as e:
        # Print the error in the documented "❌ 错误:" format, plus a traceback, and exit non-zero
        print(f"❌ 错误:{e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:scripts/index.py
"""
Entry script: parses the /lobster claim|earnings|withdraw|start|stop|poll commands
"""
import sys
import os
import json
import subprocess
import yaml
import requests
from datetime import datetime

# Make sure the scripts directory is importable
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import api as lobster_api

SKILL_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CONFIG_FILE = os.path.join(SKILL_DIR, "config.json")
STATE_FILE = os.path.join(SKILL_DIR, "state.json")
CRON_JOB_ID_FILE = os.path.join(SKILL_DIR, "cron_job_id.json")
SKILLS_DIR = os.path.expanduser("~/.openclaw/workspace/skills")


# ===================== Helpers =====================
def openclaw(args: list) -> str:
    """Run an openclaw CLI command and return its stdout and stderr combined"""
    result = subprocess.run(
        ["openclaw"] + args,
        capture_output=True, text=True
    )
    return result.stdout + result.stderr


def save_cron_job_id(job_id):
    """Remember the cron job id in cron_job_id.json"""
    with open(CRON_JOB_ID_FILE, "w") as f:
        json.dump({"job_id": job_id}, f)


def load_cron_job_id():
    """Return the saved cron job id, or None if there is none"""
    if not os.path.exists(CRON_JOB_ID_FILE):
        return None
    with open(CRON_JOB_ID_FILE, "r") as f:
        return json.load(f).get("job_id")


def get_installed_skills():
    """Scan installed skills and return [{name, desc}, ...]"""
    skills = []
    if not os.path.isdir(SKILLS_DIR):
        return skills
    for item in os.listdir(SKILLS_DIR):
        skill_path = os.path.join(SKILLS_DIR, item)
        if not os.path.isdir(skill_path):
            continue
        # Skip this skill's own directory
        if item in ("agent-job", "lobster-agent"):
            continue
        skill_md = os.path.join(skill_path, "SKILL.md")
        if not os.path.exists(skill_md):
            continue
        try:
            with open(skill_md, "r", encoding="utf-8") as f:
                content = f.read()
            # Read name/description from the YAML front matter between the first two "---" markers
            if content.startswith("---"):
                parts = content.split("---", 2)
                if len(parts) >= 3:
                    meta = yaml.safe_load(parts[1])
                    name = meta.get("name", item)
                    desc = meta.get("description", "")
                    skills.append({"name": name, "desc": desc[:200]})
        except Exception:
            continue
    return skills


def register_skills_to_platform():
    """Register the installed skills with the platform"""
    skills = get_installed_skills()
    if not skills:
        print("[INFO] 未扫描到已安装的 skills,跳过登记")
        return
    try:
        token = lobster_api.get_token()
        url = f"{lobster_api.BASE_URL}/api/lobster/me/skills"
        resp = requests.put(
            url,
            headers={"X-Lobster-Token": token},
            json={"skills": skills},
            timeout=30
        )
        resp.raise_for_status()
        print(f"[INFO] 已将 {len(skills)} 个 skills 登记到平台")
        for s in skills:
            print(f"  - {s['name']}: {s['desc'][:50]}")
    except Exception as e:
        print(f"[WARN] 登记 skills 失败:{e}", file=sys.stderr)


# ===================== Commands =====================
def cmd_start():
    """Start hosting: create the cron job and register skills"""
    import re

    # If a job was created before, delete the old one first
    existing_id = load_cron_job_id()
    if existing_id:
        try:
            openclaw(["cron", "rm", existing_id])
            print(f"[INFO] 已删除旧 cron job: {existing_id}")
        except Exception:
            pass
    # Create a new cron job (poll every minute in an isolated session, announce results to the chat)
    output = openclaw([
        "cron", "add",
        "--name", "lobster-agent poll",
        "--cron", "* * * * *",
        "--message", "/lobster poll",
        "--session", "isolated",
        "--announce"
    ])
    # Try to parse the job id from the JSON output
    m = re.search(r'"id"\s*:\s*"([^"]+)"', output)
    job_id = m.group(1) if m else None
    if job_id:
        save_cron_job_id(job_id)
        print(f"✅ 定时任务已创建 (id={job_id})")
    else:
        print("⚠️ 定时任务创建未解析到 id,请手动确认:openclaw cron list")
        print(f"   原始输出: {output[:200]}")
    # Register skills with the platform
    register_skills_to_platform()
    # Update state
    state = lobster_api.load_state()
    state["poll_enabled"] = True
    lobster_api.save_state(state)
    return "✅ 龙虾托管已启动,每分钟自动轮询,有新任务时推送"


def cmd_stop():
    """Stop hosting: delete the cron job"""
    job_id = load_cron_job_id()
    if job_id:
        try:
            openclaw(["cron", "rm", job_id])
            print(f"[INFO] 已删除 cron job: {job_id}")
        except Exception as e:
            print(f"[WARN] 删除 cron job 失败:{e}")
        try:
            os.remove(CRON_JOB_ID_FILE)
        except Exception:
            pass
    state = lobster_api.load_state()
    state["poll_enabled"] = False
    lobster_api.save_state(state)
    return "✅ 龙虾托管已停止"


def format_task(task):
    """Format a task for display"""
    task_id = task.get("task_id", "?")
    title = task.get("title", "无标题")
    deadline = task.get("submission_deadline", "无截止时间")
    status_map = {0: "待领取", 1: "进行中"}  # 0 = awaiting claim, 1 = in progress
    status = status_map.get(task.get("status", 0), "未知")
    attachment = task.get("attachment_signed_url") or task.get("attachment_url") or "无附件"
    return (
        f"任务 #{task_id}\n"
        f" 标题:{title}\n"
        f" 状态:{status}\n"
        f" 截止:{deadline}\n"
        f" 附件:{attachment}"
    )


def cmd_claim():
    """Trigger a claim attempt manually"""
    try:
        result = lobster_api.claim()
        claimed = result.get("claimed")
        in_progress = result.get("in_progress") or []
        if claimed and claimed.get("success"):
            task = claimed
            # Record the current in-progress ids so the next poll does not report them as new
            state = lobster_api.load_state()
            current_ids = [item.get("task_id") for item in in_progress if item.get("task_id")]
            state["in_progress_task_ids"] = current_ids
            state["last_poll_at"] = task.get("claimed_at")
            lobster_api.save_state(state)
            lines = [
                "✅ 抢到新任务!",
                format_task(task),
                "",
                f"--- 进行中的任务 ({len(in_progress)}) ---",
            ]
            for item in in_progress:
                lines.append(f"#{item.get('task_id')} {item.get('title')}")
            return "\n".join(lines)
        else:
            msg = claimed.get("message", "暂无新任务或已超过领取截止时间") if claimed else "暂无新任务"
            in_progress_lines = [f"--- 进行中 ({len(in_progress)}) ---"]
            for item in in_progress:
                in_progress_lines.append(f"#{item.get('task_id')} {item.get('title')}")
            return "\n".join([msg] + (in_progress_lines if in_progress else [""]))
    except Exception as e:
        return f"❌ 抢任务失败:{e}"


def cmd_earnings():
    """Show cumulative earnings"""
    try:
        result = lobster_api.get_earnings()
        total = result.get("total_earned", 0)
        return f"💰 累计收益:¥{total}"
    except Exception as e:
        return f"❌ 查询失败:{e}"


def cmd_withdraw(amount_str: str):
    """Request a withdrawal"""
    # Validate the amount before calling the API
    try:
        amount = float(amount_str)
        if amount <= 0:
            return "❌ 提现金额必须大于 0"
    except ValueError:
        return f"❌ 无效金额:{amount_str},请填写数字"
    try:
        result = lobster_api.withdraw(amount)
        if result.get("success"):
            total = result.get("total_earned", 0)
            withdrawable = result.get("withdrawable", 0)
            return (
                f"✅ 提现申请已提交\n"
                f" 申请金额:¥{amount}\n"
                f" 累计收益:¥{total}\n"
                f" 剩余可提:¥{withdrawable}"
            )
        else:
            return f"❌ 提现失败:{result.get('message', '未知错误')}"
    except Exception as e:
        return f"❌ 提现失败:{e}"


# ===================== Entry point =====================
def main():
    args = sys.argv[1:]
    if not args:
        print("用法:/lobster claim | earnings | withdraw <金额> | start | stop | poll")
        return
    subcommand = args[0].lower()
    if subcommand == "claim":
        print(cmd_claim())
    elif subcommand == "earnings":
        print(cmd_earnings())
    elif subcommand == "withdraw":
        if len(args) < 2:
            print("❌ 请指定提现金额,如:/lobster withdraw 100")
        else:
            print(cmd_withdraw(args[1]))
    elif subcommand == "start":
        print(cmd_start())
    elif subcommand == "stop":
        print(cmd_stop())
    elif subcommand == "poll":
        # Poll: called by the cron job; does nothing once hosting is stopped
        state = lobster_api.load_state()
        if not state.get("poll_enabled"):
            print("[POLL] 轮询已停止,跳过")
            return
        known_ids = set(state.get("in_progress_task_ids", []))
        result = lobster_api.claim()
        in_progress = result.get("in_progress") or []
        # Ignore entries without a task_id so None never lands in the state file
        current_ids = {item.get("task_id") for item in in_progress if item.get("task_id")}
        new_tasks = [item for item in in_progress if item.get("task_id") and item.get("task_id") not in known_ids]
        state["in_progress_task_ids"] = list(current_ids)
        state["last_poll_at"] = datetime.utcnow().isoformat()
        lobster_api.save_state(state)
        if new_tasks:
            for task in new_tasks:
                print(f"[NEW_TASK] 任务 #{task.get('task_id')} - {task.get('title')}", flush=True)
        else:
            print(f"[POLL] 无新任务,进行中: {len(current_ids)}", flush=True)
    else:
        print(f"未知命令:{subcommand}")
        print("用法:/lobster claim | earnings | withdraw <金额> | start | stop | poll")


if __name__ == "__main__":
    main()
FILE:scripts/poll_direct.py
#!/usr/bin/env python3
"""
Lobster poll script, meant to be called directly by a cron job.
Runs the polling logic itself, without going through the LLM agent.
"""
import sys
import os
from datetime import datetime

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from api import claim, load_state, save_state


def poll():
    """Claim once and print any in-progress tasks that are new since the last poll"""
    state = load_state()
    known_ids = set(state.get("in_progress_task_ids", []))
    result = claim()
    in_progress = result.get("in_progress") or []
    current_ids = {item.get("task_id") for item in in_progress if item.get("task_id")}
    # Tasks that were not in progress at the previous poll
    new_tasks = [
        item for item in in_progress
        if item.get("task_id") and item.get("task_id") not in known_ids
    ]
    state["in_progress_task_ids"] = list(current_ids)
    state["last_poll_at"] = datetime.utcnow().isoformat()
    save_state(state)
    if new_tasks:
        for task in new_tasks:
            print(f"[NEW_TASK] task_id={task.get('task_id')} title={task.get('title')} deadline={task.get('submission_deadline')}", flush=True)
    else:
        print(f"[POLL] no new tasks, in_progress={len(current_ids)}", flush=True)


if __name__ == "__main__":
    try:
        poll()
    except Exception as e:
        print(f"[ERROR] {e}", flush=True)
        sys.exit(1)
FILE:scripts/__init__.py
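虾宝 smart memory workflow. Works out of the box, keeps the data directory separate from the code, and supports lightweight fallbacks. Features: - file-system storage with no external database dependency - Ollama vector embeddings (optional, falls back to BM25) - bge-reranker-v2-m3 reranking (optional) - HyDE + Query Rewriting (optional) - automatic storage by a background thread (every 10 minutes)...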
# Memory Workflow
## First-time setup
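> Add the rules below to `~/.openclaw/workspace/AGENTS.md`. If they are already there (see the "installed" marker), skip this step.
**Add the following to the top of AGENTS.md, before its per-message section if it has one:**
```markdown
<!-- [memory-workflow] 已安装 -->
## On every message
Automatically search for related memories (use the results if any come back; otherwise ignore this step):
Exec: python3 $HOME/.openclaw/workspace/skills/memory-workflow/memory_ops.py search --query "<the user's original message>" --llm-answer --limit 3
## /save command
When the user sends /save, force the complete conversation of the current session into the memory system:
Exec: cat << 'EOF' | python3 $HOME/.openclaw/workspace/skills/memory-workflow/scripts/save_session.py
{"messages": [
{"role": "user", "content": "<taken from the current conversation context>"},
{"role": "assistant", "content": "<taken from the current conversation context>"}
]}
EOF
```
**Check whether it is already installed**: if AGENTS.md already contains the `<!-- [memory-workflow] 已安装 -->` marker (已安装 means "installed"), the rules are configured and you can skip this step.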
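The `/save` template writes JSON by hand inside a heredoc. The quoted `'EOF'` stops the shell from expanding anything, but a `"` or a newline inside a message would still produce invalid JSON. Here is a minimal sketch of building the payload with `json.dumps` instead. `build_save_payload` is a hypothetical helper, and keeping only `user`/`assistant` turns is an assumption based on the template.

```python
import json
from typing import Dict, List


def build_save_payload(messages: List[Dict[str, str]]) -> str:
    """Serialize conversation turns into the {"messages": [...]} JSON that save_session.py reads."""
    cleaned = []
    for msg in messages:
        role = msg.get("role")
        # Keep only the two roles the /save template uses
        if role not in ("user", "assistant"):
            continue
        cleaned.append({"role": role, "content": str(msg.get("content", ""))})
    # json.dumps escapes quotes and newlines; ensure_ascii=False keeps Chinese text readable
    return json.dumps({"messages": cleaned}, ensure_ascii=False)
```

When `save_session.py` receives no argument, it reads from stdin, so the string could be passed as, for example, `subprocess.run([sys.executable, "save_session.py"], input=payload, text=True)`.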
---
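Your memory-workflow assistant. Activate this skill when the user mentions words such as "记忆" (memory), "记得" (remember), "之前说过" (said before), "搜一下" (search), "去重" (deduplicate), or "整理" (organize).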
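## Storage architecture
| Layer | Location | Dependency | Notes |
|------|----------|------|------|
| **File** | `memory-workflow-data/memories/YYYY-MM-DD.md` | None | Always available |
| **FTS5** | `memory-workflow-data/fts5_index.db` | Built into SQLite | Full-text search, no external dependencies |
| **KG** | `memory-workflow-data/knowledge-graph/kg.db` | Ollama (optional) | Triple-based knowledge graph; falls back to rules |
| **Milvus** | memory_workflow collection | Milvus on port 19530 + embedding service on port 18779 (optional) | Vector retrieval with bge-m3 embeddings |
**Works out of the box**: even without any external services, the file layer, FTS5, and the rule-based KG keep working.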
## Core files
```
memory-workflow/
├── SKILL.md             # this file
├── memory_ops.py        # CLI entry point
├── README.md            # usage notes
└── scripts/
    ├── config.py        # configuration (paths, service addresses)
    ├── store.py         # three-layer storage writes
    ├── search.py        # FTS5 + Jaccard search
    ├── fts5.py          # FTS5 full-text index
    ├── tools.py         # BaseTool wrappers (6 tools)
    └── save_session.py  # /save command handler
```
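## Tool interface
| Tool | Description | Dependency |
|------|------|------|
| `MemorySearch` | Memory search: FTS5 + character n-gram Jaccard ranking | None |
| `MemoryStore` | Three-layer storage (file + KG + Milvus) with automatic dedup | Ollama (KG) / Milvus (optional) |
| `MemoryDedup` | Jaccard deduplication (threshold 0.85) | None |
| `MemoryPrune` | Deletes memories older than N days | None |
| `MemoryConsolidate` | Merges similar memories (Jaccard > 0.7) | None |
| `MemoryList` | Lists recent memory files | None |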
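`tools.py` is not shown here, so the following is only a hypothetical sketch of greedy Jaccard deduplication at the documented 0.85 threshold. The helpers `bigrams`, `jaccard`, and `dedup` are illustrative names. They reuse the character-bigram similarity that `_jaccard_ngram` in `scripts/search.py` computes.

```python
from typing import List, Set


def bigrams(text: str) -> Set[str]:
    """Character bigrams of the lower-cased text (the n=2 case of search._ngram)."""
    text = text.lower()
    return {text[i:i + 2] for i in range(len(text) - 1)}


def jaccard(a: str, b: str) -> float:
    """Jaccard similarity of two strings' bigram sets; 0.0 if either set is empty."""
    ga, gb = bigrams(a), bigrams(b)
    if not ga or not gb:
        return 0.0
    return len(ga & gb) / len(ga | gb)


def dedup(chunks: List[str], threshold: float = 0.85) -> List[str]:
    """Greedy pass: keep a chunk only if it is below `threshold` against every chunk kept so far."""
    kept: List[str] = []
    for chunk in chunks:
        if all(jaccard(chunk, k) < threshold for k in kept):
            kept.append(chunk)
    return kept
```

For example, `dedup(["hello world", "hello world!", "goodbye"])` drops the second string, whose similarity to the first is 10/11 ≈ 0.91.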
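## Search algorithm
**Character-level n-gram Jaccard** removes the need for Chinese word segmentation:
- "发哥在测试" → `{'发哥','哥在','在测','测试'}`
- Mixed Chinese and English text works without any external segmentation service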
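Here is a worked example of the bigram Jaccard score. It pairs the example string above with a second, made-up string, 发哥在开会:

$$
\begin{aligned}
A &= \{\text{发哥},\ \text{哥在},\ \text{在测},\ \text{测试}\} \\
B &= \{\text{发哥},\ \text{哥在},\ \text{在开},\ \text{开会}\} \\
J(A, B) &= \frac{|A \cap B|}{|A \cup B|} = \frac{2}{6} \approx 0.333
\end{aligned}
$$

At 1/3, this pair falls well below both the 0.7 consolidation threshold and the 0.85 dedup threshold in the tool table, so neither tool would merge or drop it.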
**Search pipeline**: Jaccard recall → FTS5 BM25 rerank → time-decay rerank
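The final time-decay step, as implemented in `rerank_results` in `scripts/search.py`, multiplies each result's Jaccard `score` $s$ by a factor that depends on the memory file's age $d$ in days:

$$
\text{rerank\_score} = s \cdot \left(0.7 + 0.3\, e^{-0.05\, d}\right)
$$

A memory from today keeps its full score (factor 1.0). A 14-day-old memory gets $0.7 + 0.3\,e^{-0.7} \approx 0.849$, and very old memories approach 0.7, so recency can reorder results but never removes more than 30% of a score. The decaying term halves every $\ln 2 / 0.05 \approx 13.9$ days. Results without a date use a fixed weight of 0.5, which gives a factor of 0.85.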
## Data storage paths
- Memory files: `~/.openclaw/workspace/memory-workflow-data/memories/YYYY-MM-DD.md`
- FTS5 index: `~/.openclaw/workspace/memory-workflow-data/fts5_index.db`
- KG graph: `~/.openclaw/workspace/memory-workflow-data/knowledge-graph/kg.db`
- Metadata: stored inside `~/.openclaw/workspace/memory-workflow-data/knowledge-graph/kg.db`
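## Trigger rules
| Pattern | Tool |
|------|------|
| "记", "存", "记住", "记一下" (note, save, remember, jot down) | `MemoryStore` |
| "搜", "找", "记得", "之前说过" (search, find, remember, said before) | `MemorySearch` |
| "去重", "整理记忆" (deduplicate, tidy memories) | `MemoryDedup` |
| "合并", "整理一下" (merge, tidy up) | `MemoryConsolidate` |
| "删除", "清理旧" (delete, clean up old) | `MemoryPrune` |
| "列出记忆文件", "有哪些记忆" (list memory files, what memories are there) | `MemoryList` |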
## CLI usage
```bash
# Search
python memory_ops.py search --query "发哥 工作" --limit 3
# Store
python memory_ops.py store --content "text to remember" --tag work
# /save: store a session
python scripts/save_session.py '{"messages": [...]}'
# Dedup / consolidate / prune
python memory_ops.py dedup
python memory_ops.py consolidate
python memory_ops.py prune --days 30
# Export tool definitions
python memory_ops.py register
```
FILE:scripts/config.py
"""
Memory Workflow configuration
"""
import os
from pathlib import Path

# Directory where memory files are stored
MEMORY_DIR = Path.home() / ".openclaw" / "workspace" / "memory-workflow-data" / "memories"
MEMORY_DIR.mkdir(parents=True, exist_ok=True)

# Rerank service
RERANK_SERVICE_URL = os.environ.get("RERANK_SERVICE_URL", "http://172.17.0.1:18778")

# Embedding settings
EMBEDDING_API = os.environ.get("EMBEDDING_API", "openai")
EMBEDDING_MODEL = os.environ.get("EMBEDDING_MODEL", "text-embedding-3-small")
EMBEDDING_DIM = 1536


def get_memory_files():
    """Return all memory files, newest date first"""
    if not MEMORY_DIR.exists():
        return []
    return sorted(MEMORY_DIR.glob("*.md"), reverse=True)
FILE:scripts/fts5.py
"""
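FTS5 full-text search: built into SQLite, no external dependencies
Features:
1. Creates the FTS5 virtual table automatically if it does not exist
2. Incremental updates: every store_memory call also writes to FTS5
3. Search: MATCH query + BM25 ranking
4. Degradation: if FTS5 is unavailable, it is skipped (errors are printed, not raised)
"""
import os
import re
import sqlite3
from pathlib import Path
from typing import Optional, List

from .config import MEMORY_DIR

# FTS5 database (stored next to the memories directory)
FTS_DB = MEMORY_DIR.parent / "fts5_index.db"


def _get_fts_conn() -> sqlite3.Connection:
    """Open a connection to the FTS5 database"""
    conn = sqlite3.connect(str(FTS_DB), check_same_thread=False)
    # Use DELETE (rollback journal) mode: under WAL this index produced "no such table" errors
    conn.execute("PRAGMA journal_mode=DELETE")
    return conn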


def init_fts():
    """Create the FTS5 table if it does not exist"""
    conn = _get_fts_conn()
    conn.execute("""
        CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
            path,
            content,
            date
        )
    """)
    conn.commit()
    conn.close()
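

def sync_file_to_fts(file_path: str, content: str, date: str):
    """Sync one memory file into FTS5 (called from store_memory).
    Note: `content` is the newly stored text, but the file may also hold older entries,
    so `content` itself is not used. Instead, every existing row for this file is
    deleted and the file's full contents are re-indexed.
    """
    try:
        mf = Path(file_path)
        if not mf.exists():
            return
        full_content = mf.read_text(encoding="utf-8")
        # Use the same path string for the DELETE and the INSERTs so old rows really match
        path_str = str(mf)
        conn = _get_fts_conn()
        # Delete all existing rows for this file
        conn.execute("DELETE FROM memories_fts WHERE path = ?", (path_str,))
        # Re-index every paragraph (line) of the file
        for para in full_content.split("\n"):
            para = para.strip()
            if len(para) < 5:
                continue
            # Strip a leading "- " or "* " list marker
            para = re.sub(r"^[-*]\s+", "", para).strip()
            if para:
                conn.execute(
                    "INSERT INTO memories_fts(path, content, date) VALUES (?, ?, ?)",
                    (path_str, para, date)
                )
        conn.commit()
        conn.close()
    except Exception as e:
        print(f"  [FTS5同步错误] {e}")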


def build_fts_index():
    """Fully rebuild the FTS5 index (called when FTS5 is first enabled)"""
    if not MEMORY_DIR.exists():
        print("记忆目录不存在,跳过 FTS5 索引构建")
        return
    conn = _get_fts_conn()
    # Clear existing rows first so a rebuild does not duplicate every paragraph
    conn.execute("DELETE FROM memories_fts")
    count = 0
    for mf in MEMORY_DIR.glob("*.md"):
        try:
            date = mf.stem  # YYYY-MM-DD
            content = mf.read_text(encoding="utf-8")
            # Insert one row per paragraph (line)
            for para in content.split("\n"):
                para = para.strip()
                if len(para) < 5:
                    continue
                para = re.sub(r"^[-*]\s+", "", para).strip()
                if para:
                    conn.execute(
                        "INSERT INTO memories_fts(path, content, date) VALUES (?, ?, ?)",
                        (str(mf), para, date)
                    )
                    count += 1
        except Exception as e:
            print(f"  [FTS5索引错误] {mf}: {e}")
    conn.commit()
    conn.close()
    print(f"FTS5 索引构建完成: {count} 条记录")


def search_fts(query: str, limit: int = 10) -> List[dict]:
    """
    FTS5 full-text search ranked by BM25.
    Quote and asterisk characters are stripped and the query is split on
    whitespace; each remaining term is quoted and the terms are OR-ed, so FTS5
    operators typed by the user (phrases, AND, prefix*) are matched as plain
    text rather than interpreted.
    """
    if not query or not query.strip():
        return []
    # Preprocess the query: strip quote/asterisk characters, then split into terms
    terms = re.sub(r'["“”\'*]', ' ', query).split()
    if not terms:
        return []
    # Quoting each term keeps other FTS5 syntax characters (-, :, parentheses) from breaking MATCH
    match_expr = ' OR '.join(f'"{t}"' for t in terms)
    try:
        conn = _get_fts_conn()
        try:
            # MATCH query; bm25() is lower for better matches, so ascending order is best-first
            cursor = conn.execute("""
                SELECT path, content, date, bm25(memories_fts) AS score
                FROM memories_fts
                WHERE memories_fts MATCH ?
                ORDER BY score
                LIMIT ?
            """, (match_expr, limit))
            rows = cursor.fetchall()
        except sqlite3.Error:
            # MATCH failed: fall back to a LIKE scan that requires the terms in order
            like_pattern = "%" + "%".join(terms) + "%"
            cursor = conn.execute("""
                SELECT path, content, date, 0.0 AS score
                FROM memories_fts
                WHERE content LIKE ?
                ORDER BY date DESC
                LIMIT ?
            """, (like_pattern, limit))
            rows = cursor.fetchall()
        conn.close()
        results = []
        seen = set()
        for path, content, date, score in rows:
            # Skip duplicate paragraph text
            if content in seen:
                continue
            seen.add(content)
            results.append({
                "chunk": content,
                "date": date,
                "file": path,
                "fts_score": score,
                "source": "fts5"
            })
        return results
    except Exception as e:
        print(f"  [FTS5搜索错误] {e}")
        return []


def rerank_with_fts(jaccard_results: List[dict], query: str, top_n: int = 5) -> List[dict]:
    """
    Rerank Jaccard results with FTS5 BM25 scores.
    hybrid_score = (jaccard_score * 5) * 0.6 + fts_score_normalized * 0.4
    SQLite's bm25() returns lower (more negative) values for better matches, so the
    normalization maps the best BM25 score to 1.0 and chunks FTS5 did not return to 0.0.
    """
    if not jaccard_results or not query.strip():
        return jaccard_results[:top_n]
    try:
        fts_results = search_fts(query, limit=len(jaccard_results) * 2)
        if not fts_results:
            return jaccard_results[:top_n]
        # Map chunk text → BM25 score
        fts_map = {r["chunk"]: r["fts_score"] for r in fts_results}
        # Min/max of the real BM25 scores (LIKE-fallback rows carry 0.0)
        fts_scores = [v for v in fts_map.values() if v != 0]
        fts_max = max(fts_scores) if fts_scores else 0.0
        fts_min = min(fts_scores) if fts_scores else 0.0
        # Hybrid scoring
        for r in jaccard_results:
            chunk = r["chunk"]
            if chunk not in fts_map:
                fts_norm = 0.0  # FTS5 did not match this chunk
            elif fts_max != fts_min:
                # Invert so the most negative (best) BM25 score maps to 1.0
                fts_norm = (fts_max - fts_map[chunk]) / (fts_max - fts_min)
            else:
                fts_norm = 0.5
            # Scale the Jaccard score up, since raw Jaccard values are usually small
            jaccard_norm = r.get("score", 0) * 5
            r["hybrid_score"] = jaccard_norm * 0.6 + fts_norm * 0.4
        jaccard_results.sort(key=lambda x: x.get("hybrid_score", 0), reverse=True)
    except Exception as e:
        print(f"  [FTS5重排错误] {e}")
    return jaccard_results[:top_n]


# Initialize once, when the module is first imported
try:
    init_fts()
except Exception as e:
    print(f"  [FTS5初始化错误] {e}")
FILE:scripts/save_session.py
#!/usr/bin/env python3
"""
/save command handler, called by the agent
Usage (run through the agent's Bash tool):
    python3 .../save_session.py '<session_json_string>'
    or pipe the session JSON on stdin with no argument
"""
import sys
import os
import json
import subprocess
import tempfile
from pathlib import Path


def main():
    if len(sys.argv) < 2:
        # No argument: try to read the session JSON from stdin
        try:
            session_data = json.load(sys.stdin)
        except ValueError:
            print("Usage: save_session.py '<session_json_string>'")
            sys.exit(1)
    else:
        # The command-line argument is a JSON string
        try:
            session_data = json.loads(sys.argv[1])
        except json.JSONDecodeError as e:
            print(f"JSON 解析失败: {e}")
            sys.exit(1)
    messages = session_data.get("messages", [])
    if not messages:
        print("No messages to save")
        sys.exit(0)
    # Hand off to: memory_ops.py store --session-file <temp file> --json
    script_dir = Path(__file__).parent
    memory_ops = script_dir.parent / "memory_ops.py"
    # Write the messages to a temporary file
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False, encoding="utf-8") as f:
        json.dump({"messages": messages}, f)
        temp_file = f.name
    try:
        result = subprocess.run(
            [sys.executable, str(memory_ops), "store", "--session-file", temp_file, "--json"],
            capture_output=True, text=True, timeout=30
        )
        print(result.stdout)
        if result.returncode != 0:
            print(result.stderr, file=sys.stderr)
            sys.exit(result.returncode)
    finally:
        # Always remove the temporary file, even on failure
        os.unlink(temp_file)


if __name__ == "__main__":
    main()
FILE:scripts/search.py
"""
Memory search: hybrid file + FTS5 search
FTS5 full-text search adds precision with zero external dependencies
Can later be upgraded to vector search + rerank
"""
import re
import json
import os
import math
from pathlib import Path
from typing import Optional

from .config import MEMORY_DIR, RERANK_SERVICE_URL, get_memory_files
from .fts5 import search_fts, rerank_with_fts


def get_all_chunks(limit: int = 1000) -> list[dict]:
    """
    Return up to `limit` memory chunks (used by maintenance operations such as dedup and consolidation)
    """
    chunks = []
    for mf in get_memory_files():
        try:
            date = mf.stem  # YYYY-MM-DD
            content = mf.read_text(encoding="utf-8")
            for para in content.split("\n"):
                para = para.strip()
                if len(para) > 10:
                    # Strip a leading "- " or "* " list marker
                    para = re.sub(r"^[-*]\s+", "", para).strip()
                    if para:
                        chunks.append({
                            "chunk": para,
                            "date": date,
                            "file": str(mf),
                        })
        except Exception:
            continue
        # The limit is checked per file, so trim any overshoot below
        if len(chunks) >= limit:
            break
    return chunks[:limit]


def _ngram(text: str, n: int = 2) -> set:
    """
    Character-level n-gram extraction.
    Chinese: '今天天气不错' → {'今天', '天天', '天气', '气不', '不错'}
    English: 'hello world' → {'he', 'el', 'll', 'lo', 'o ', ' w', 'wo', 'or', 'rl', 'ld'}
    """
    return {text[i:i+n] for i in range(len(text) - n + 1)}


def _jaccard_ngram(text1: str, text2: str, n: int = 2) -> float:
    """Jaccard similarity over character n-grams (works for mixed Chinese and English)"""
    ng1 = _ngram(text1.lower(), n)
    ng2 = _ngram(text2.lower(), n)
    if not ng1 or not ng2:
        return 0.0
    intersection = len(ng1 & ng2)
    union = len(ng1 | ng2)
    return intersection / union if union > 0 else 0.0


def simple_search(query: str, limit: int = 3) -> list[dict]:
    """
    Simple search based on character n-gram Jaccard similarity.
    Handles mixed Chinese and English without word segmentation.
    """
    results = []
    for mf in get_memory_files():
        try:
            date = mf.stem
            content = mf.read_text(encoding="utf-8")
            for para in content.split("\n"):
                para = para.strip()
                if len(para) < 5:
                    continue
                para = re.sub(r"^[-*]\s+", "", para).strip()
                if not para:
                    continue
                # Character bigram Jaccard against the query
                score = _jaccard_ngram(query, para, n=2)
                if score > 0:
                    results.append({
                        "chunk": para,
                        "date": date,
                        "score": score,
                        "file": str(mf)
                    })
        except Exception:
            continue
    # Sort by similarity, highest first
    results.sort(key=lambda x: x["score"], reverse=True)
    return results[:limit]


def rerank_results(query: str, results: list[dict], top_n: int = 3) -> list[dict]:
    """
    Simple rerank that favors recent, high-scoring results.
    Could later be replaced by a real bge-reranker-v2-m3.
    """
    import time
    from datetime import datetime

    # Simple weighting: score * time-decay factor
    now = time.time()
    DAY = 86400
    reranked = []
    for r in results:
        try:
            file_date = r.get("date", "")
            if file_date:
                d = datetime.strptime(file_date, "%Y-%m-%d")
                age_days = (now - d.timestamp()) / DAY
                time_weight = math.exp(-0.05 * age_days)  # exponential decay
            else:
                time_weight = 0.5
            r["rerank_score"] = r["score"] * (0.7 + 0.3 * time_weight)
            reranked.append(r)
        except Exception:
            r["rerank_score"] = r["score"]
            reranked.append(r)
    reranked.sort(key=lambda x: x["rerank_score"], reverse=True)
    return reranked[:top_n]


def search_memories(
    query: str,
    limit: int = 3,
    use_rerank: bool = True,
    use_llm: bool = False,
) -> dict:
    """
    Main search entry point: hybrid FTS5 + Jaccard search.
    Pipeline:
    1. Fast Jaccard recall of the top-k candidates
    2. FTS5 BM25 rerank (if FTS5 is available)
    3. Time-decay rerank
    Args:
        query: search query
        limit: number of results to return
        use_rerank: whether to rerank (steps 2 and 3)
        use_llm: whether to have an LLM summarize the results (not supported yet)
    """
    if not query or not query.strip():
        return {"answer": "", "results": [], "count": 0}
    # 1. Fast Jaccard recall (no external services)
    results = simple_search(query, limit=limit * 3)
    if not results:
        return {"answer": "", "results": [], "count": 0}
    # 2. FTS5 rerank (zero dependencies; degrades silently on failure)
    # FTS5's default tokenizer does not segment Chinese, so Chinese queries often match
    # nothing; in that case the Jaccard results are kept
    if use_rerank:
        fts_results = rerank_with_fts(results, query, top_n=limit * 2)
        # Use the reranked list only if it is non-empty
        if fts_results:
            results = fts_results
    # 3. Time-decay rerank
    if use_rerank:
        results = rerank_results(query, results, top_n=limit)
    return {
        "answer": "",
        "results": results[:limit],
        "count": len(results[:limit])
    }


def rag_search(query: str, limit: int = 3) -> dict:
    """RAG search (currently equivalent to search_memories with rerank on)"""
    return search_memories(query, limit=limit, use_rerank=True, use_llm=False)
FILE:scripts/store.py
"""
Store memories in three layers: file + Milvus + knowledge graph
1. File storage: ~/.openclaw/workspace/memory-workflow-data/memories/YYYY-MM-DD.md
2. Milvus vector storage: requires an embedding model to generate vectors
3. Knowledge graph: SQLite KG of extracted (entity, relation, entity) triples
"""
import os
import re
import json
import uuid
import math
from pathlib import Path
from datetime import datetime
from typing import Optional

from .config import MEMORY_DIR

# KG paths
MEMORY_WORKFLOW_DATA = os.environ.get(
    "MEMORY_WORKFLOW_DATA",
    str(Path.home() / ".openclaw" / "workspace" / "memory-workflow-data")
)
KG_DIR = os.path.join(MEMORY_WORKFLOW_DATA, "knowledge-graph")
KG_DB = os.path.join(KG_DIR, "kg.db")
os.makedirs(KG_DIR, exist_ok=True)
os.makedirs(MEMORY_DIR, exist_ok=True)

# Milvus settings
MILVUS_HOST = os.environ.get('MILVUS_HOST', 'host.docker.internal')
MILVUS_PORT = os.environ.get('MILVUS_PORT', '19530')

# Embedding service (bge-m3 running on the host machine)
EMBEDDING_URL = os.environ.get('EMBEDDING_URL', 'http://172.17.0.1:18779/v1/embeddings')
EMBEDDING_MODEL = os.environ.get('EMBEDDING_MODEL', 'bge-m3')

# Ollama settings (used for KG triple extraction)
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434")

# jieba is optional; JIEBA_AVAILABLE records whether it could be imported
JIEBA_AVAILABLE = False
try:
    import jieba
    import jieba.posseg as pseg
    JIEBA_AVAILABLE = True
except ImportError:
    pass


# ===== KG helpers =====
def kg_get_conn():
    """Open the KG SQLite database (WAL journal, foreign keys enforced)"""
    import sqlite3
    conn = sqlite3.connect(KG_DB, check_same_thread=False)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn


def kg_init(conn):
    """Create the entity and edge tables and their indexes if they are missing"""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS kg_entities (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            entity_type TEXT,
            created_at TEXT DEFAULT (date('now'))
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS kg_edges (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            from_id INTEGER NOT NULL REFERENCES kg_entities(id),
            to_id INTEGER NOT NULL REFERENCES kg_entities(id),
            rel TEXT NOT NULL,
            time TEXT,
            session TEXT
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_edges_rel ON kg_edges(rel)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_edges_from ON kg_edges(from_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_edges_to ON kg_edges(to_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_entities_name ON kg_entities(name)")
    conn.commit()


def kg_get_or_create(conn, name):
    """Return the id of the entity called `name`, inserting it if necessary"""
    name = name.strip()
    cur = conn.execute("SELECT id FROM kg_entities WHERE name = ?", (name,))
    row = cur.fetchone()
    if row:
        return row[0]
    cur = conn.execute("INSERT INTO kg_entities (name) VALUES (?)", (name,))
    conn.commit()
    return cur.lastrowid


def kg_insert_edge(conn, from_name, rel, to_name, session_id=None):
    """Insert a (from_name) -[rel]-> (to_name) edge dated today"""
    from_id = kg_get_or_create(conn, from_name)
    to_id = kg_get_or_create(conn, to_name)
    conn.execute(
        "INSERT INTO kg_edges (from_id, to_id, rel, time, session) VALUES (?, ?, ?, ?, ?)",
        (from_id, to_id, rel, datetime.now().strftime("%Y-%m-%d"), session_id)
    )
    conn.commit()
# ===== Triple extraction =====
def extract_triples_via_llm(text: str):
"""Extract triples with an Ollama-hosted LLM"""
prompt = f"""从以下文本中抽取实体和关系,输出JSON数组格式,每条记录包含 from、rel、to。
文本:{text}
要求:
- 只提取有意义的实体关系,最多5条
- 实体名称简洁,如"发哥"、"龙虾平台"
- 关系动词简洁,如"拥有"、"完成"、"讨论了"
- 只输出JSON数组,不要其他内容
输出:"""
try:
from urllib.request import Request, urlopen
data = json.dumps({
"model": "hoangquan456/qwen3-nothink:4b",
"messages": [{"role": "user", "content": prompt}],
"stream": False
}).encode("utf-8")
req = Request(
OLLAMA_URL + "/api/chat",
data=data,
headers={"Content-Type": "application/json"}
)
with urlopen(req, timeout=15) as resp:
result = json.loads(resp.read().decode("utf-8"))
content = result.get("message", {}).get("content", "").strip()
try:
return json.loads(content)
except json.JSONDecodeError:
start = content.find('[')
end = content.rfind(']')
if start != -1 and end != -1 and start < end:
return json.loads(content[start:end+1])
except Exception as e:
print(f" [LLM error] {e}")
return None
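# Relation words, longest first so that e.g. "拥有了" is tried before its prefix "拥有"
REL_WORDS = sorted([
'雇佣了', '拥有了', '创建了', '搭建了', '完成了',
'雇佣', '拥有', '创建', '搭建', '完成', '属于', '使用', '是'
], key=len, reverse=True)
def extract_triples_rule(text: str):
"""Rule-based triple extraction: split the text at the first occurrence of each relation word"""
triples = []
matched = []
for rel in REL_WORDS:
if rel not in text:
continue
# A shorter word inside one already used (e.g. "拥有" in "拥有了") would only repeat that triple
if any(rel in m for m in matched):
continue
matched.append(rel)
parts = text.split(rel)
from_name = parts[0].strip()
to_name = rel.join(parts[1:]).strip()
if len(from_name) >= 2 and len(to_name) >= 2:
triples.append({"from": from_name, "rel": rel, "to": to_name})
return triples[:5]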
def extract_triples(text: str):
"""Extract triples: LLM first, falling back to the rule-based extractor"""
triples = extract_triples_via_llm(text)
if triples and isinstance(triples, list) and len(triples) > 0:
valid = [t for t in triples if isinstance(t, dict) and t.get("from") and t.get("rel") and t.get("to")]
if valid:
return valid, "llm"
rule_triples = extract_triples_rule(text)
if rule_triples:
return rule_triples, "rule"
return [], "none"
# ===== File-level duplicate check =====
def get_existing_chunks() -> list[str]:
"""Collect every existing memory line (used for dedup)"""
chunks = []
if not MEMORY_DIR.exists():
return chunks
for mf in MEMORY_DIR.glob("*.md"):
try:
content = mf.read_text(encoding="utf-8")
for line in content.split("\n"):
line = line.strip()
if len(line) > 10:
line = re.sub(r"^[-*]\s+", "", line).strip()
if line:
chunks.append(line)
except Exception:
continue
return chunks
def _ngram(text: str, n: int = 2) -> set:
"""Character-level N-grams"""
return {text[i:i+n] for i in range(len(text) - n + 1)}
def _jaccard_ngram(text1: str, text2: str, n: int = 2) -> float:
"""Jaccard similarity over character N-grams"""
ng1 = _ngram(text1.lower(), n)
ng2 = _ngram(text2.lower(), n)
if not ng1 or not ng2:
return 0.0
return len(ng1 & ng2) / len(ng1 | ng2) if len(ng1 | ng2) > 0 else 0.0
def check_duplicate(new_content: str, threshold: float = 0.85) -> bool:
"""Check whether new content duplicates an existing memory (character N-gram Jaccard)"""
if not new_content.strip():
return False
for existing in get_existing_chunks():
score = _jaccard_ngram(new_content, existing, n=2)
if score > threshold:
return True
return False
# ===== Main store function =====
def store_memory(content: str, tags: list = None, session_id: str = None) -> dict:
"""
Store a single memory across the storage layers (daily file, FTS5, KG, Milvus)
Returns:
{"file": str, "date": str, "tags": list, "skipped": bool, "reason": str,
"kg_triples": int, "milvus_id": str}
"""
if not content or not content.strip():
return {
"file": None, "date": "", "tags": tags or [],
"skipped": True, "reason": "empty content",
"kg_triples": 0, "milvus_id": None
}
# 1. File-level duplicate check
if check_duplicate(content):
return {
"file": None,
"date": datetime.now().strftime("%Y-%m-%d"),
"tags": tags or [],
"skipped": True,
"reason": "content already exists; skipped",
"kg_triples": 0,
"milvus_id": None
}
today = datetime.now().strftime("%Y-%m-%d")
mf = MEMORY_DIR / f"{today}.md"
tag_str = f"[{', '.join(tags)}] " if tags else ""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
entry = f"## {timestamp}\n- {tag_str}{content}\n"
# 2. Append the entry to today's file
with open(mf, "a", encoding="utf-8") as f:
f.write(entry)
# 2.5 Sync to the FTS5 full-text index
try:
from .fts5 import sync_file_to_fts, init_fts
init_fts()  # make sure the FTS5 table exists
sync_file_to_fts(str(mf), content, today)
except Exception as e:
print(f" [FTS5 sync error] {e}")
result = {
"file": str(mf),
"date": today,
"tags": tags or [],
"skipped": False,
"reason": "",
"kg_triples": 0,
"milvus_id": None
}
# 3. Extract triples and store them in the KG
try:
triples, method = extract_triples(content)
if triples:
conn = kg_get_conn()
try:
kg_init(conn)
for t in triples:
kg_insert_edge(conn, t["from"], t["rel"], t["to"], session_id)
finally:
conn.close()
result["kg_triples"] = len(triples)
except Exception as e:
print(f" [KG error] {e}")
# 4. Store in Milvus, unless identical content is already there
try:
if not check_milvus_duplicate(content):
embedding = get_embedding(content)
if embedding:
result["milvus_id"] = store_to_milvus(content, embedding)
else:
# The file, FTS5 and KG writes above already happened, so this is not a full skip
result["reason"] = "already in Milvus; vector insert skipped"
except Exception as e:
print(f" [Milvus store error] {e}")
return result
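def check_milvus_duplicate(content: str) -> bool:
"""Check whether Milvus already holds exactly the same content"""
try:
from pymilvus import Collection, connections
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, alias='default')
collection = Collection('memory_workflow')
collection.load()
# Escape backslashes and double quotes so arbitrary text cannot break the filter expression
literal = content.replace('\\', '\\\\').replace('"', '\\"')
results = collection.query(
expr=f'content == "{literal}"',
output_fields=['id'],
limit=1
)
return len(results) > 0
except Exception:
# A failed lookup is treated as "not a duplicate"
return False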
def get_embedding(text: str) -> Optional[list[float]]:
"""Get a text embedding from the bge-m3 embedding service"""
try:
from urllib.request import Request, urlopen
data = json.dumps({
"model": EMBEDDING_MODEL,
"input": text
}).encode("utf-8")
req = Request(
EMBEDDING_URL,
data=data,
headers={"Content-Type": "application/json"}
)
with urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode("utf-8"))
embedding = result["data"][0]["embedding"]
return embedding
except Exception as e:
print(f" [Embedding error] {e}")
return None
def store_to_milvus(text: str, vector: list[float], metadata: dict = None) -> Optional[str]:
"""Insert a memory and its vector into Milvus"""
try:
from pymilvus import Collection, connections
connections.connect(host=MILVUS_HOST, port=MILVUS_PORT, alias='default')
collection = Collection('memory_workflow')
collection.load()
entity_id = str(uuid.uuid4())
entity = {
"id": entity_id,
"vector": vector,
"content": text,
"topic": metadata.get("topic", "") if metadata else "",
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"parent_id": metadata.get("parent_id", "") if metadata else "",
"chunk_index": metadata.get("chunk_index", 0) if metadata else 0,
}
collection.insert([entity])
collection.flush()
return entity_id
except Exception as e:
print(f" [Milvus error] {e}")
return None
FILE:scripts/tools.py
"""
Memory Workflow tool wrappers (BaseTool-style classes)
Each operation maps to one Tool that can be registered directly in an Agent's tool registry
"""
import json
import subprocess
import sys
import os
from pathlib import Path
from dataclasses import dataclass
SCRIPT_DIR = Path(__file__).parent
@dataclass
class ToolResult:
"""Result of a tool call"""
success: bool
data: dict = None
error: str = ""
def __getitem__(self, key):
return getattr(self, key)
class BaseMemoryTool:
"""Base class for memory tools"""
name: str = ""
description: str = ""
input_schema: dict = {}
def validate_input(self, args: dict) -> tuple[bool, str]:
"""Check that all required input fields are present"""
required = self.input_schema.get("required", [])
for field in required:
if field not in args:
return False, f"missing required argument: {field}"
return True, ""
def call(self, args: dict, context: dict = None) -> ToolResult:
"""Run the tool; implemented by subclasses"""
raise NotImplementedError
def _run_op(self, op: str, extra_args: list = None) -> dict:
"""Run memory_ops.py in a subprocess and return its JSON output as a dict with a "success" key"""
cmd = [sys.executable, str(SCRIPT_DIR.parent / "memory_ops.py"), op]
if extra_args:
cmd.extend(extra_args)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30,
env={**os.environ, "PYTHONPATH": str(Path(__file__).parent.parent)}
)
if result.returncode != 0:
return {"success": False, "error": result.stderr or result.stdout}
try:
parsed = json.loads(result.stdout)
except json.JSONDecodeError:
return {"success": True, "output": result.stdout}
# store/search print a bare payload without a "success" key; wrap it so callers can read result["output"]
if isinstance(parsed, dict) and "success" in parsed:
return parsed
return {"success": True, "output": parsed}
except subprocess.TimeoutExpired:
return {"success": False, "error": "operation timed out"}
except Exception as e:
return {"success": False, "error": str(e)}
class MemorySearchTool(BaseMemoryTool):
"""Semantic search over the memory store"""
name = "MemorySearch"
description = "Semantic search over the memory store with rerank ordering; returns the most relevant memory snippets"
input_schema = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query describing the memory you are looking for"
},
"limit": {
"type": "integer",
"default": 3,
"description": "Number of results to return (default 3)"
}
},
"required": ["query"]
}
def call(self, args: dict, context: dict = None) -> ToolResult:
valid, err = self.validate_input(args)
if not valid:
return ToolResult(success=False, error=err)
query = args["query"]
limit = args.get("limit", 3)
# "--query=..." keeps argparse from reading a query that starts with "-" as an option
result = self._run_op("search", [
f"--query={query}",
"--limit", str(limit),
"--json"
])
if result.get("success"):
output = result.get("output", result)
if isinstance(output, dict):
return ToolResult(
success=True,
data={
"answer": output.get("answer", ""),
"results": output.get("results", []),
"count": len(output.get("results", []))
}
)
return ToolResult(success=True, data=output)
return ToolResult(success=False, error=result.get("error", "search failed"))
class MemoryStoreTool(BaseMemoryTool):
"""Store a memory in the memory store"""
name = "MemoryStore"
description = "Store important information in the memory store, archived automatically by date"
input_schema = {
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "Content to remember; describe it as completely as possible"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Tags, e.g. ['发哥', 'work']"
}
},
"required": ["content"]
}
def call(self, args: dict, context: dict = None) -> ToolResult:
valid, err = self.validate_input(args)
if not valid:
return ToolResult(success=False, error=err)
content = args["content"]
tags = args.get("tags", [])
tag_args = []
for tag in tags:
tag_args.append(f"--tag={tag}")
# "--content=..." keeps argparse from reading content that starts with "-" as an option
result = self._run_op("store", [f"--content={content}", "--json"] + tag_args)
if result.get("success"):
return ToolResult(success=True, data=result.get("output", result))
return ToolResult(success=False, error=result.get("error", "store failed"))
class MemoryDedupTool(BaseMemoryTool):
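"""Find duplicate entries in the memory store"""
name = "MemoryDedup"
description = "Find duplicate entries in the memory store (character-bigram Jaccard similarity > 0.85); memory files are not modified"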
input_schema = {
"type": "object",
"properties": {}
}
def call(self, args: dict = None, context: dict = None) -> ToolResult:
result = self._run_op("dedup", ["--json"])
if result.get("success"):
return ToolResult(success=True, data=result)
return ToolResult(success=False, error=result.get("error", "dedup failed"))
class MemoryPruneTool(BaseMemoryTool):
"""Delete outdated memories"""
name = "MemoryPrune"
description = "Delete daily memory files older than the given number of days (default 30)"
input_schema = {
"type": "object",
"properties": {
"days": {
"type": "integer",
"default": 30,
"description": "Delete memories older than this many days"
}
}
}
def call(self, args: dict = None, context: dict = None) -> ToolResult:
days = (args or {}).get("days", 30)
result = self._run_op("prune", ["--days", str(days), "--json"])
if result.get("success"):
return ToolResult(success=True, data=result)
return ToolResult(success=False, error=result.get("error", "prune failed"))
class MemoryConsolidateTool(BaseMemoryTool):
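"""Merge similar memory snippets"""
name = "MemoryConsolidate"
description = "Merge similar adjacent memory snippets, keeping the longer one (computed in memory; memory files are not rewritten)"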
input_schema = {
"type": "object",
"properties": {}
}
def call(self, args: dict = None, context: dict = None) -> ToolResult:
result = self._run_op("consolidate", ["--json"])
if result.get("success"):
return ToolResult(success=True, data=result)
return ToolResult(success=False, error=result.get("error", "consolidate failed"))
class MemoryListTool(BaseMemoryTool):
"""List memory files"""
name = "MemoryList"
description = "List memory files within the given date range"
input_schema = {
"type": "object",
"properties": {
"days": {
"type": "integer",
"default": 7,
"description": "List memories from the last N days"
}
}
}
def call(self, args: dict = None, context: dict = None) -> ToolResult:
days = (args or {}).get("days", 7)
result = self._run_op("list", ["--days", str(days), "--json"])
if result.get("success"):
return ToolResult(success=True, data=result)
return ToolResult(success=False, error=result.get("error", "list failed"))
# All tools (for registration)
ALL_TOOLS = [
MemorySearchTool,
MemoryStoreTool,
MemoryDedupTool,
MemoryPruneTool,
MemoryConsolidateTool,
MemoryListTool,
]
FILE:memory_ops.py
#!/usr/bin/env python3
"""
Memory Workflow CLI: entry point for the memory workflow
Usage:
python memory_ops.py store (--content "text to remember" | --session-file FILE) [--tag tag1] [--json]
python memory_ops.py search --query "query" [--limit 3] [--no-rerank] [--llm-answer] [--json]
python memory_ops.py dedup [--json]
python memory_ops.py prune --days 30 [--json]
python memory_ops.py consolidate [--json]
python memory_ops.py list --days 7 [--json]
python memory_ops.py register  # export all tool definitions as JSON for Agent registration
"""
import argparse
import json
import sys
import os
from pathlib import Path
from datetime import datetime, timedelta
# Put the skill root on sys.path so the `scripts` package is importable
SCRIPT_DIR = Path(__file__).parent / "scripts"
sys.path.insert(0, str(Path(__file__).parent))
from scripts.search import search_memories, rag_search
from scripts.store import store_memory
from scripts.config import MEMORY_DIR, get_memory_files
TOOLS = None
def cmd_store(args):
"""Store one memory (--content) or every message in a session file (--session-file)"""
content = args.content
tags = args.tag or []
session_file = args.session_file
# With --session-file, read the file and store each message
if session_file:
try:
with open(session_file, "r", encoding="utf-8") as f:
session_data = json.load(f)
messages = session_data.get("messages", [])
if not isinstance(messages, list):
messages = [messages]
results = []
for msg in messages:
msg_content = msg.get("content", "") if isinstance(msg, dict) else str(msg)
if msg_content and not msg_content.strip().startswith("## System"):
stored = store_memory(msg_content, tags)
results.append(stored)
skipped = sum(1 for r in results if r.get("skipped"))
stored_count = sum(1 for r in results if not r.get("skipped"))
result = {
"success": True,
"stored": stored_count,
"skipped": skipped,
"message": f"done: {stored_count} new memories stored, {skipped} skipped as already present"
}
print(json.dumps(result, ensure_ascii=False))
return 0
except Exception as e:
result = {"success": False, "error": str(e)}
print(json.dumps(result, ensure_ascii=False))
return 1
# Single-entry store
if not content:
result = {"success": False, "error": "missing --content or --session-file"}
print(json.dumps(result, ensure_ascii=False))
return 1
stored = store_memory(content, tags if tags else None)
print(json.dumps(stored, ensure_ascii=False))
return 0
def cmd_search(args):
"""Search memories"""
query = args.query
limit = args.limit or 3
use_rerank = not args.no_rerank
use_llm = args.llm_answer
results = search_memories(
query=query,
limit=limit,
use_rerank=use_rerank,
use_llm=use_llm,
)
if args.json:
print(json.dumps(results, ensure_ascii=False))
else:
if results.get("answer"):
print(f"[Summary] {results['answer']}")
print()
for r in results.get("results", []):
date = r.get("date", "")
score = r.get("rerank_score", r.get("score", 0))
chunk = r.get("chunk", "")
print(f"[{date}] (score: {score:.2f})")
print(f" {chunk[:200]}")
print()
return 0
def _ngram(text: str, n: int = 2) -> set:
"""Character-level N-grams"""
return {text[i:i+n] for i in range(len(text) - n + 1)}
def _jaccard(text1: str, text2: str, n: int = 2) -> float:
"""Jaccard similarity over character N-grams"""
ng1 = _ngram(text1.lower(), n)
ng2 = _ngram(text2.lower(), n)
if not ng1 or not ng2:
return 0.0
return len(ng1 & ng2) / len(ng1 | ng2) if len(ng1 | ng2) > 0 else 0.0
def cmd_dedup(args):
"""Report duplicate memories (read-only scan: memory files are not rewritten)"""
from scripts.search import get_all_chunks
all_chunks = get_all_chunks()
if not all_chunks:
result = {"success": True, "message": "memory store is empty; nothing to dedup", "removed": 0}
if args.json:
print(json.dumps(result, ensure_ascii=False))
else:
print(f"✅ {result['message']}")
return 0
# Simple text-similarity dedup (character-bigram Jaccard)
SIMILARITY_THRESHOLD = 0.85
removed = 0
seen = []
for chunk_data in all_chunks:
chunk = chunk_data["chunk"]
is_dup = False
for existing in seen:
jaccard = _jaccard(chunk, existing["chunk"], n=2)
if jaccard > SIMILARITY_THRESHOLD:
# Keep the longer (more informative) text; this only updates the in-memory list
if len(chunk) > len(existing["chunk"]):
existing["duplicate_of"] = chunk_data.get("file", "unknown")
existing["chunk"] = chunk
is_dup = True
removed += 1
break
if not is_dup:
seen.append(chunk_data)
result = {
"success": True,
"message": f"dedup scan done: {removed} duplicate memories found (files not modified)",
"removed": removed,
"kept": len(seen)
}
if args.json:
print(json.dumps(result, ensure_ascii=False))
else:
print(f"✅ {result['message']}, {result['kept']} unique entries")
return 0
def cmd_prune(args):
"""Delete daily memory files older than --days (FTS5, KG and Milvus entries are not touched)"""
days = args.days or 30
cutoff = datetime.now() - timedelta(days=days)
memory_files = get_memory_files()
removed = 0
for mf in memory_files:
# Parse the date from the file name
name = mf.stem # YYYY-MM-DD
try:
file_date = datetime.strptime(name, "%Y-%m-%d")
if file_date < cutoff:
mf.unlink()
removed += 1
except ValueError:
continue
result = {
"success": True,
"message": f"prune done: deleted {removed} memory files older than {days} days",
"removed": removed
}
if args.json:
print(json.dumps(result, ensure_ascii=False))
else:
print(f"✅ {result['message']}")
return 0
def cmd_consolidate(args):
"""Merge similar adjacent memories (computed in memory; files are not rewritten)"""
from scripts.search import get_all_chunks
all_chunks = get_all_chunks()
if len(all_chunks) < 2:
result = {"success": True, "message": "fewer than 2 memory entries; nothing to merge", "merged": 0}
if args.json:
print(json.dumps(result, ensure_ascii=False))
else:
print(f"✅ {result['message']}")
return 0
# Sort by date
all_chunks.sort(key=lambda x: x.get("date", ""))
merged = 0
# Merge adjacent entries whose bigram Jaccard similarity exceeds 0.7
i = 0
while i < len(all_chunks) - 1:
curr = all_chunks[i]
next_chunk = all_chunks[i + 1]
jaccard = _jaccard(curr["chunk"], next_chunk["chunk"], n=2)
if jaccard > 0.7:
# Merge: keep the longer one
if len(next_chunk["chunk"]) > len(curr["chunk"]):
all_chunks[i] = next_chunk
all_chunks.pop(i + 1)
merged += 1
else:
i += 1
result = {
"success": True,
"message": f"consolidation done: {merged} groups of similar memories merged",
"merged": merged,
"remaining": len(all_chunks)
}
if args.json:
print(json.dumps(result, ensure_ascii=False))
else:
print(f"✅ {result['message']}, {result['remaining']} remaining")
return 0
def cmd_list(args):
"""List memory files"""
days = args.days or 7
cutoff = datetime.now() - timedelta(days=days)
memory_files = get_memory_files()
files_info = []
for mf in memory_files:
name = mf.stem
try:
file_date = datetime.strptime(name, "%Y-%m-%d")
if file_date >= cutoff:
files_info.append({
"file": str(mf),
"date": name,
"size": mf.stat().st_size
})
except ValueError:
continue
result = {
"success": True,
"files": files_info,
"count": len(files_info)
}
if args.json:
print(json.dumps(result, ensure_ascii=False))
else:
print(f"📋 Memories from the last {days} days ({len(files_info)} files):")
for f in files_info:
print(f" [{f['date']}] {f['file']} ({f['size']} bytes)")
return 0
def cmd_register(args):
"""Export all tool definitions as JSON (for Agent registration)"""
from scripts.tools import (
MemorySearchTool, MemoryStoreTool, MemoryDedupTool,
MemoryPruneTool, MemoryConsolidateTool, MemoryListTool
)
tools = [
{
"name": t.name,
"description": t.description,
"input_schema": t.input_schema,
}
for t in [
MemorySearchTool, MemoryStoreTool, MemoryDedupTool,
MemoryPruneTool, MemoryConsolidateTool, MemoryListTool
]
]
print(json.dumps(tools, indent=2, ensure_ascii=False))
return 0
def main():
parser = argparse.ArgumentParser(description="Memory Workflow CLI")
subparsers = parser.add_subparsers(dest="command", help="subcommand")
# store
p_store = subparsers.add_parser("store", help="store a memory")
p_store.add_argument("--content", help="memory content (use this or --session-file)")
p_store.add_argument("--session-file", help="read a list of messages from a session file and store each one")
p_store.add_argument("--tag", action="append", help="tag (repeatable)")
p_store.add_argument("--json", action="store_true", help="JSON output")
# search
p_search = subparsers.add_parser("search", help="search memories")
p_search.add_argument("--query", required=True, help="search query")
p_search.add_argument("--limit", type=int, default=3)
p_search.add_argument("--no-rerank", action="store_true", help="disable rerank")
p_search.add_argument("--llm-answer", action="store_true", help="add an LLM-written summary")
p_search.add_argument("--json", action="store_true", help="JSON output")
# dedup
p_dedup = subparsers.add_parser("dedup", help="deduplicate")
p_dedup.add_argument("--json", action="store_true", help="JSON output")
# prune
p_prune = subparsers.add_parser("prune", help="delete old memories")
p_prune.add_argument("--days", type=int, default=30)
p_prune.add_argument("--json", action="store_true", help="JSON output")
# consolidate
p_cons = subparsers.add_parser("consolidate", help="merge similar memories")
p_cons.add_argument("--json", action="store_true", help="JSON output")
# list
p_list = subparsers.add_parser("list", help="list memories")
p_list.add_argument("--days", type=int, default=7)
p_list.add_argument("--json", action="store_true", help="JSON output")
# register
subparsers.add_parser("register", help="export tool definitions")
args = parser.parse_args()
if args.command == "store":
return cmd_store(args)
elif args.command == "search":
return cmd_search(args)
elif args.command == "dedup":
return cmd_dedup(args)
elif args.command == "prune":
return cmd_prune(args)
elif args.command == "consolidate":
return cmd_consolidate(args)
elif args.command == "list":
return cmd_list(args)
elif args.command == "register":
return cmd_register(args)
else:
parser.print_help()
return 1
if __name__ == "__main__":
sys.exit(main())
FILE:README.md
# Memory Workflow
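Your memory workflow assistant. It works out of the box: the file and FTS5 layers need no external services, and Ollama (KG extraction) and Milvus are optional.
## Quick start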
```bash
# Search memories
python memory_ops.py search --query "发哥 project"
# Store a memory
python memory_ops.py store --content "发哥 works at 思特奇" --tag work
# /save: store a session
python scripts/save_session.py '{"messages": [...]}'
# Dedup / prune
python memory_ops.py dedup
python memory_ops.py prune --days 30
```
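Both `dedup` and the store-time duplicate check score text by character-bigram Jaccard similarity and treat anything above 0.85 as a duplicate. The following standalone sketch reproduces that scoring, assuming the same bigram definition as `_jaccard_ngram` in `scripts/store.py`. The names `bigrams`, `jaccard` and `is_duplicate` are illustrative and not part of the skill's API.

```python
def bigrams(text: str) -> set[str]:
    """Character bigrams of a lower-cased string."""
    text = text.lower()
    return {text[i:i + 2] for i in range(len(text) - 1)}


def jaccard(a: str, b: str) -> float:
    """|A & B| / |A | B| over character bigrams; 0.0 if either side has no bigrams."""
    ga, gb = bigrams(a), bigrams(b)
    if not ga or not gb:
        return 0.0
    return len(ga & gb) / len(ga | gb)


def is_duplicate(new: str, existing: list[str], threshold: float = 0.85) -> bool:
    """True if `new` scores strictly above `threshold` against any existing line."""
    return any(jaccard(new, old) > threshold for old in existing)


# {ab, bc, cd} vs {ab, bc, ce}: 2 shared bigrams out of 4 distinct ones
print(jaccard("abcd", "abce"))  # 0.5
```

Because the score works on characters rather than words, it needs no tokenizer and behaves the same for Chinese and English text. The trade-off is that it only catches near-verbatim repeats, not paraphrases.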
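## Storage layers
| Layer | Description |
|----|------|
| File | `memory-workflow-data/memories/*.md` |
| FTS5 | SQLite full-text index, no extra dependencies |
| KG | Knowledge graph of triples (extracted by an Ollama LLM, with a rule-based fallback) |
| Milvus | Vector retrieval (optional; needs the embedding service on port 18779) |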
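The KG layer consists of two SQLite tables, `kg_entities` and `kg_edges`, which `kg_init` creates in `scripts/store.py`. The following sketch shows how to read an entity's outgoing relations back out, assuming that schema. It uses an in-memory database, placeholder entity names and the hypothetical helpers `add_edge` and `neighbors`, so it runs on its own.

```python
import sqlite3

# Same tables as kg_init (the created_at column is omitted for brevity)
SCHEMA = """
CREATE TABLE kg_entities (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL UNIQUE,
    entity_type TEXT
);
CREATE TABLE kg_edges (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    from_id INTEGER NOT NULL REFERENCES kg_entities(id),
    to_id INTEGER NOT NULL REFERENCES kg_entities(id),
    rel TEXT NOT NULL,
    time TEXT,
    session TEXT
);
"""


def entity_id(conn: sqlite3.Connection, name: str) -> int:
    """Get-or-create an entity row (same effect as kg_get_or_create)."""
    conn.execute("INSERT OR IGNORE INTO kg_entities (name) VALUES (?)", (name,))
    return conn.execute("SELECT id FROM kg_entities WHERE name = ?", (name,)).fetchone()[0]


def add_edge(conn: sqlite3.Connection, src: str, rel: str, dst: str) -> None:
    conn.execute(
        "INSERT INTO kg_edges (from_id, to_id, rel) VALUES (?, ?, ?)",
        (entity_id(conn, src), entity_id(conn, dst), rel),
    )


def neighbors(conn: sqlite3.Connection, name: str) -> list[tuple[str, str]]:
    """(relation, target) pairs for every outgoing edge of `name`, in insertion order."""
    return conn.execute(
        """
        SELECT e.rel, t.name
        FROM kg_edges e
        JOIN kg_entities s ON s.id = e.from_id
        JOIN kg_entities t ON t.id = e.to_id
        WHERE s.name = ?
        ORDER BY e.id
        """,
        (name,),
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
add_edge(conn, "alice", "owns", "lobster-platform")
add_edge(conn, "alice", "completed", "kg-module")
print(neighbors(conn, "alice"))  # [('owns', 'lobster-platform'), ('completed', 'kg-module')]
```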
## Tools (registrable with an Agent)
- `MemorySearch`: semantic search
- `MemoryStore`: store a memory
- `MemoryDedup`: deduplicate
- `MemoryPrune`: delete old memories
- `MemoryConsolidate`: merge similar memories
- `MemoryList`: list memory files
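Each tool class exposes `name`, `description`, `input_schema` and a `call(args)` method (see `scripts/tools.py`). The following sketch shows how an agent-side registry might dispatch calls by name, assuming only that interface. `EchoTool` and `ToolRegistry` are hypothetical stand-ins, so the snippet runs without the memory scripts.

```python
from dataclasses import dataclass, field


@dataclass
class ToolResult:
    success: bool
    data: dict = field(default_factory=dict)
    error: str = ""


class EchoTool:
    """Hypothetical stand-in with the same interface as the Memory* tools."""
    name = "Echo"
    description = "Return the input text"
    input_schema = {
        "type": "object",
        "properties": {"text": {"type": "string"}},
        "required": ["text"],
    }

    def call(self, args: dict, context: dict = None) -> ToolResult:
        return ToolResult(success=True, data={"text": args["text"]})


class ToolRegistry:
    def __init__(self, tool_classes):
        # Instantiate each class once and index it by its `name`
        self.tools = {cls.name: cls() for cls in tool_classes}

    def definitions(self) -> list[dict]:
        """Name/description/schema entries, the same shape `memory_ops.py register` prints."""
        return [
            {"name": t.name, "description": t.description, "input_schema": t.input_schema}
            for t in self.tools.values()
        ]

    def dispatch(self, name: str, args: dict) -> ToolResult:
        tool = self.tools.get(name)
        if tool is None:
            return ToolResult(success=False, error=f"unknown tool: {name}")
        # Check required fields first, as BaseMemoryTool.validate_input does
        missing = [f for f in tool.input_schema.get("required", []) if f not in args]
        if missing:
            return ToolResult(success=False, error=f"missing required argument: {missing[0]}")
        return tool.call(args)


registry = ToolRegistry([EchoTool])
print(registry.dispatch("Echo", {"text": "hi"}).data)  # {'text': 'hi'}
```

With the real package you would pass `ALL_TOOLS` from `scripts.tools` instead of `[EchoTool]`.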
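A smart memory system with SQLite (zero-config) and Milvus (vector search) backends. It stores, retrieves and manages an AI assistant's memories, with semantic search and automatic backups.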
---
name: rag-memory
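description: "Smart memory system with SQLite (zero-config) and Milvus (vector search) backends. Stores, retrieves and manages an AI assistant's memories, with semantic search and automatic backups."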
homepage: https://github.com/openclaw/skills
metadata: { "openclaw": { "emoji": "🧠", "requires": { "bins": ["python3"] } } }
---
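# RAG Memory Skill - Smart Memory System
## 📚 Description
A smart memory system with a choice of backends:
- **SQLite** (default): works out of the box, no extra services
- **Milvus**: vector database with semantic search
- **ChromaDB**: lightweight vector database (not yet implemented)
## 🏗️ Architecture
### SQLite mode (default)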
```
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│  OpenClaw   │ ──> │  RAG Memory  │ ──> │   SQLite    │
│ (requests)  │     │(skill module)│     │ (local DB)  │
└─────────────┘     └──────────────┘     └─────────────┘
```
### Milvus mode (optional)
```
┌─────────────┐     ┌──────────────┐     ┌──────────────┐
│  OpenClaw   │ ──> │  RAG Memory  │ ──> │    Milvus    │
│ (requests)  │     │(skill module)│     │(vector store)│
└─────────────┘     └──────────────┘     └──────────────┘
                           │
                           v
                    ┌──────────────┐
                    │    Ollama    │
                    │ (embeddings) │
                    └──────────────┘
```
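In Milvus mode, `RAGMemory.get_embedding` sends a POST request with `{"model": "bge-m3", "input": text}` to Ollama's `/api/embed` endpoint and reads `embeddings[0]` from the reply. The following stdlib-only sketch performs the same round trip. It is split into separate functions so the request and the parsing can be checked without a running server. `build_embed_request`, `parse_embed_response` and `embed` are illustrative names and are not part of `rag_memory.py`.

```python
import json
from urllib.request import Request, urlopen


def build_embed_request(base_url: str, text: str, model: str = "bge-m3") -> Request:
    """POST request for Ollama's /api/embed endpoint."""
    body = json.dumps({"model": model, "input": text}).encode("utf-8")
    return Request(
        base_url.rstrip("/") + "/api/embed",
        data=body,
        headers={"Content-Type": "application/json"},
    )


def parse_embed_response(raw: bytes) -> list[float]:
    """/api/embed answers {"embeddings": [[...], ...]}, one vector per input."""
    return json.loads(raw)["embeddings"][0]


def embed(base_url: str, text: str, timeout: float = 30) -> list[float]:
    # Needs a reachable Ollama server with the model pulled (`ollama pull bge-m3`)
    with urlopen(build_embed_request(base_url, text), timeout=timeout) as resp:
        return parse_embed_response(resp.read())


req = build_embed_request("http://localhost:11434/", "hello")
print(req.full_url, req.get_method())  # http://localhost:11434/api/embed POST
```

bge-m3 produces 1024-dimensional vectors, which matches the `dimension=1024` used when `MilvusMemory` creates its collection.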
## 🔧 Configuration
### Quick start (SQLite, no configuration)
```bash
# Use it directly, zero configuration
python -c "from rag_memory import store, search; store('test memory')"
```
### Milvus mode (advanced)
```bash
# Environment variables
export RAG_MEMORY_BACKEND=milvus
export MILVUS_URL=http://localhost:19530
export OLLAMA_URL=http://localhost:11434 # required in this mode: embeddings come from Ollama
# Install dependencies
pip install pymilvus
```
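### All environment variables
```bash
RAG_MEMORY_BACKEND=sqlite # sqlite | milvus (chromadb is not implemented yet and falls back to SQLite)
RAG_MEMORY_SQLITE_DB=./memory.db # SQLite database path
MILVUS_URL=http://localhost:19530 # Milvus address (default: auto-detected host, else localhost)
OLLAMA_URL=http://localhost:11434 # Ollama address for embeddings (default: auto-detected host, else localhost)
RAG_MEMORY_COLLECTION=openclaw_memory # Milvus collection name
RAG_MEMORY_BACKUP_DIR=./memory_backup # backup directory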
```
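The module reads these variables once, at import time, and falls back to a default for each one. The following sketch performs the same lookup as a pure function over a mapping, so it can be tested without touching the real environment. `resolve_config` and `effective_backend` are hypothetical helpers. The sketch uses plain `localhost` defaults for the two URLs, whereas `rag_memory.py` auto-detects the host.

```python
import os
from typing import Mapping

# Defaults as listed above; rag_memory.py auto-detects the host part of the two URLs
DEFAULTS = {
    "RAG_MEMORY_BACKEND": "sqlite",
    "RAG_MEMORY_SQLITE_DB": "./memory.db",
    "MILVUS_URL": "http://localhost:19530",
    "OLLAMA_URL": "http://localhost:11434",
    "RAG_MEMORY_COLLECTION": "openclaw_memory",
    "RAG_MEMORY_BACKUP_DIR": "./memory_backup",
}


def resolve_config(env: Mapping[str, str] = os.environ) -> dict:
    """Each setting comes from `env` when present, otherwise from DEFAULTS."""
    return {key: env.get(key, default) for key, default in DEFAULTS.items()}


def effective_backend(name: str) -> str:
    """Only "milvus" selects Milvus; anything else (including "chromadb") runs on SQLite, as in _init_backend."""
    return "milvus" if name == "milvus" else "sqlite"


cfg = resolve_config({"RAG_MEMORY_BACKEND": "chromadb"})
print(effective_backend(cfg["RAG_MEMORY_BACKEND"]), cfg["RAG_MEMORY_SQLITE_DB"])  # sqlite ./memory.db
```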
### Dependencies
**Minimal (SQLite mode)**:
```bash
pip install requests
```
**Full (Milvus mode)**:
```bash
pip install requests pymilvus
```
## 📖 Usage
### Python API
```python
from rag_memory import store, search
# Store a memory
memory_id = store("Discussed the RAG system today", {"type": "conversation", "topic": "RAG"})
# Search memories
results = search("RAG system discussion", top_k=3)
# Delete a memory
from rag_memory import get_memory
get_memory().delete_memory(memory_id)
```
### Function reference
| Function | Description | Parameters | Returns |
|------|------|------|--------|
| `store()` | Store a memory | content: str, metadata: Dict | memory_id: int |
| `search()` | Search memories | query: str, top_k: int | List[Dict] |
| `get_memory()` | Get the shared instance | - | RAGMemory |
## 📊 Memory record format
```json
{
  "id": 1,
  "content": "Discussed the RAG system today",
  "timestamp": "2026-03-18T15:30:00",
  "metadata": {
    "type": "conversation",
    "topic": "RAG"
  },
  "distance": 0.85
}
```
`distance` is a real similarity distance only in Milvus mode; the SQLite backend always returns `0.0`.
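## 🔄 Backend comparison
| Feature | SQLite | Milvus |
|------|--------|--------|
| Setup | ⭐ zero config | ⭐⭐⭐ needs Docker |
| Vector search | ❌ not supported | ✅ supported |
| Search mode | most recent first | semantic similarity |
| Best for | personal use | production |
| Resource usage | low | medium to high |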
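The SQLite backend already saves each embedding as a JSON array in the `embedding` column, but `search()` ignores that column and simply returns the newest rows. The following sketch illustrates what "semantic similarity" ranking means, using brute-force cosine similarity over rows like these. It is an illustration only and not how the SQLite backend behaves today. The table is a trimmed, in-memory copy of the `memories` schema with toy 2-dimensional vectors, and `semantic_search` is a hypothetical helper.

```python
import json
import math
import sqlite3


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def semantic_search(conn: sqlite3.Connection, query_vec: list[float], top_k: int = 5) -> list[dict]:
    """Score every row that has an embedding and return the top_k by cosine similarity."""
    scored = []
    for mem_id, content, emb_json in conn.execute(
        "SELECT id, content, embedding FROM memories WHERE embedding IS NOT NULL"
    ):
        scored.append({"id": mem_id, "content": content,
                       "score": cosine(query_vec, json.loads(emb_json))})
    scored.sort(key=lambda r: r["score"], reverse=True)
    return scored[:top_k]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE memories (id INTEGER PRIMARY KEY, content TEXT, embedding TEXT)")
rows = [("cats", [1.0, 0.0]), ("dogs", [0.8, 0.6]), ("tax forms", [0.0, 1.0])]
conn.executemany("INSERT INTO memories (content, embedding) VALUES (?, ?)",
                 [(c, json.dumps(v)) for c, v in rows])
print([r["content"] for r in semantic_search(conn, [1.0, 0.0], top_k=2)])  # ['cats', 'dogs']
```

This approach scans every row on each query, so its cost grows linearly with the store size. That linear cost is the main argument for a dedicated vector database such as Milvus as a store grows.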
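## ⚠️ Notes
1. **SQLite mode**: works out of the box, no extra configuration
2. **Milvus mode**: needs a running Milvus service (typically in Docker)
3. **Ollama**: generates the embeddings; optional in SQLite mode, but Milvus mode cannot store or search without it
4. **Backups**: every stored memory is also appended to a per-day JSON file in `RAG_MEMORY_BACKUP_DIR`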
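Backups are written to `RAG_MEMORY_BACKUP_DIR` as one JSON array per day (`YYYY-MM-DD.json`). Each item holds `id`, `content`, `timestamp` and `metadata` (see `_backup_to_file`). The following sketch reads them back, for example to rebuild a database. `load_backups` is a hypothetical helper and is not part of `rag_memory.py`. The demo writes two small files to a temporary directory.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def load_backups(backup_dir: str, date: Optional[str] = None) -> list[dict]:
    """Return backed-up memories, oldest day first; pass date="YYYY-MM-DD" to read a single day."""
    pattern = f"{date}.json" if date else "*.json"
    records: list[dict] = []
    # YYYY-MM-DD file names sort chronologically as plain strings
    for path in sorted(Path(backup_dir).glob(pattern)):
        records.extend(json.loads(path.read_text(encoding="utf-8")))
    return records


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "2026-03-18.json").write_text(json.dumps([{"id": 1, "content": "a"}]), encoding="utf-8")
    Path(tmp, "2026-03-19.json").write_text(json.dumps([{"id": 2, "content": "b"}]), encoding="utf-8")
    all_ids = [r["id"] for r in load_backups(tmp)]
    one_day = [r["id"] for r in load_backups(tmp, "2026-03-19")]

print(all_ids, one_day)  # [1, 2] [2]
```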
## 🐛 Troubleshooting
### Problem: cannot import pymilvus
```bash
# Only needed in Milvus mode
pip install pymilvus
```
### Problem: cannot connect to Milvus
```bash
# Check the Milvus service
docker ps | grep milvus
curl http://localhost:9091/healthz
```
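If `curl` is not available inside the container, you can attempt a plain TCP connection from Python to see whether anything is listening on the Milvus port at all. This check does not confirm that Milvus itself is healthy. `port_open` is a hypothetical helper.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# e.g. port_open("localhost", 19530) or port_open("host.docker.internal", 19530)
```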
### Problem: embedding generation fails
```bash
# Check the Ollama service (needed for embeddings)
curl http://localhost:11434/api/tags
ollama pull bge-m3 # if the model is not pulled yet
```
## 📦 Publishing to ClawHub
### Package
```bash
cd /app/skills/rag-memory
tar -czf rag-memory.tar.gz SKILL.md rag_memory.py
```
### Upload
1. Go to https://clawhub.com
2. Create a developer account
3. Upload `rag-memory.tar.gz`
4. Fill in the skill description and configuration notes
### Installing (users)
```bash
openclaw skills install rag-memory
```
## 📝 Changelog
- **2026-03-18**: initial release, replacing the file-based memory system
- Automatic backups to JSON files
- Semantic search
- Metadata support
FILE:rag_memory.py
#!/usr/bin/env python3
"""
RAG memory management system (generic edition)
Backends: Milvus, ChromaDB (planned), SQLite (default)
"""
import requests
import json
import os
from datetime import datetime
from typing import List, Dict, Optional
# Configuration, read from environment variables
BACKEND = os.getenv("RAG_MEMORY_BACKEND", "sqlite") # sqlite | milvus | chromadb
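# Container detection: pick the right host address automatically
def _get_host_url(default_port: int) -> str:
"""Guess the URL of a service running on the host machine"""
import socket
import struct
try:
# host.docker.internal resolves on Docker Desktop / WSL2 (and with --add-host)
socket.gethostbyname("host.docker.internal")
return f"http://host.docker.internal:{default_port}"
except OSError:
pass
if os.path.exists("/.dockerenv"):
try:
# Inside a Docker container, the default route's gateway is the host (e.g. 172.17.0.1).
# /etc/resolv.conf is not used: its nameserver is a DNS server (127.0.0.11, 127.0.0.53, ...), not the host.
with open("/proc/net/route", "r") as f:
for line in f.readlines()[1:]:
fields = line.split()
if len(fields) > 2 and fields[1] == "00000000":
gateway = socket.inet_ntoa(struct.pack("<L", int(fields[2], 16)))
return f"http://{gateway}:{default_port}"
except (OSError, ValueError):
pass
# Fall back to localhost
return f"http://localhost:{default_port}"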
OLLAMA_URL = os.getenv("OLLAMA_URL", _get_host_url(11434))
MILVUS_URL = os.getenv("MILVUS_URL", _get_host_url(19530))
CHROMADB_URL = os.getenv("CHROMADB_URL", _get_host_url(8000))
COLLECTION_NAME = os.getenv("RAG_MEMORY_COLLECTION", "openclaw_memory")
BACKUP_DIR = os.getenv("RAG_MEMORY_BACKUP_DIR", "./memory_backup")
SQLITE_DB = os.getenv("RAG_MEMORY_SQLITE_DB", "./memory.db")
# Make sure the backup directory exists
os.makedirs(BACKUP_DIR, exist_ok=True)
class SQLiteMemory:
"""SQLite backend: no extra dependencies, works out of the box"""
def __init__(self, db_path: str):
import sqlite3
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self._init_db()
def _init_db(self):
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
embedding TEXT,
timestamp TEXT,
metadata TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
self.conn.commit()
def store(self, content: str, metadata: Dict = None, embedding: List[float] = None) -> int:
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO memories (content, embedding, timestamp, metadata)
VALUES (?, ?, ?, ?)
''', (
content,
json.dumps(embedding) if embedding else None,
datetime.now().isoformat(),
json.dumps(metadata or {})
))
self.conn.commit()
return cursor.lastrowid
def search(self, query: str, top_k: int = 5) -> List[Dict]:
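# SQLite has no vector search; return the most recent memories (the query is ignored)
cursor = self.conn.cursor()
cursor.execute('''
SELECT id, content, timestamp, metadata
FROM memories
ORDER BY created_at DESC, id DESC -- created_at has 1-second resolution, so break ties by id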
LIMIT ?
''', (top_k,))
results = []
for row in cursor.fetchall():
results.append({
"id": row[0],
"content": row[1],
"timestamp": row[2],
"metadata": json.loads(row[3]) if row[3] else {},
"distance": 0.0 # SQLite has no vector similarity
})
return results
def delete(self, memory_id: int):
cursor = self.conn.cursor()
cursor.execute('DELETE FROM memories WHERE id = ?', (memory_id,))
self.conn.commit()
def close(self):
self.conn.close()
class MilvusMemory:
"""Milvus backend: requires a Milvus service"""
def __init__(self, url: str, collection: str):
from pymilvus import MilvusClient
self.client = MilvusClient(uri=url)
self.collection = collection
self._init_collection()
def _init_collection(self):
if not self.client.has_collection(self.collection):
self.client.create_collection(
collection_name=self.collection,
dimension=1024,
auto_id=True
)
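def store(self, content: str, metadata: Dict = None, embedding: List[float] = None) -> int:
if not embedding:
# The collection's vector field is 1024-dim, so an insert without a vector cannot succeed
raise ValueError("Milvus backend needs an embedding; check that Ollama is reachable at OLLAMA_URL")
result = self.client.insert(
collection_name=self.collection,
data=[{
"vector": embedding,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": json.dumps(metadata or {})
}]
)
return result["ids"][0] if result["ids"] else None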
def search(self, query: str, top_k: int = 5, query_embedding: List[float] = None) -> List[Dict]:
if not query_embedding:
return []
results = self.client.search(
collection_name=self.collection,
data=[query_embedding],
limit=top_k,
output_fields=["content", "timestamp", "metadata"]
)
formatted = []
if results and len(results) > 0:
for match in results[0]:
formatted.append({
"id": match["id"],
"content": match["entity"]["content"],
"timestamp": match["entity"]["timestamp"],
"metadata": json.loads(match["entity"]["metadata"]) if match["entity"]["metadata"] else {},
"distance": match["distance"]
})
return formatted
def delete(self, memory_id: int):
self.client.delete(collection_name=self.collection, ids=[memory_id])
class RAGMemory:
def __init__(self, backend: str = None):
self.backend_name = backend or BACKEND
self.ollama_url = OLLAMA_URL
self._init_backend()
def _init_backend(self):
"""Initialize the backend"""
if self.backend_name == "milvus":
self.backend = MilvusMemory(MILVUS_URL, COLLECTION_NAME)
elif self.backend_name == "chromadb":
# TODO: implement the ChromaDB backend
print("⚠️ ChromaDB backend is not implemented yet; falling back to SQLite")
self.backend = SQLiteMemory(SQLITE_DB)
else:
self.backend = SQLiteMemory(SQLITE_DB)
print(f"✅ Using SQLite backend: {SQLITE_DB}")
def get_embedding(self, text: str) -> Optional[List[float]]:
"""Get an embedding (needs the Ollama service)"""
try:
response = requests.post(
f"{self.ollama_url}/api/embed",
json={"model": "bge-m3", "input": text},
timeout=30
)
if response.status_code == 200:
data = response.json()
return data["embeddings"][0]
except Exception as e:
print(f"⚠️ Embedding generation failed: {e}; vector search will not be used")
return None
def store_memory(self, content: str, metadata: Dict = None) -> int:
"""Store a memory"""
embedding = self.get_embedding(content)
memory_id = self.backend.store(content, metadata, embedding)
self._backup_to_file(content, metadata, memory_id)
print(f"✅ Memory stored: {content[:50]}...")
return memory_id
def search_memories(self, query: str, top_k: int = 5) -> List[Dict]:
"""Search memories"""
if self.backend_name == "milvus":
# Only Milvus uses the query embedding, so SQLite skips the Ollama call entirely
embedding = self.get_embedding(query)
return self.backend.search(query, top_k, embedding)
# SQLite returns the most recent memories
return self.backend.search(query, top_k)
def _backup_to_file(self, content: str, metadata: Dict, memory_id: int):
"""Append the memory to today's JSON backup file"""
date_str = datetime.now().strftime("%Y-%m-%d")
backup_file = os.path.join(BACKUP_DIR, f"{date_str}.json")
backups = []
if os.path.exists(backup_file):
with open(backup_file, "r", encoding="utf-8") as f:
backups = json.load(f)
backups.append({
"id": memory_id,
"content": content,
"timestamp": datetime.now().isoformat(),
"metadata": metadata or {}
})
with open(backup_file, "w", encoding="utf-8") as f:
json.dump(backups, f, ensure_ascii=False, indent=2)
def delete_memory(self, memory_id: int) -> bool:
self.backend.delete(memory_id)
print(f"✅ Memory deleted: ID={memory_id}")
return True
def close(self):
if hasattr(self.backend, 'close'):
self.backend.close()
# Shared global instance
_memory_instance: Optional[RAGMemory] = None
def get_memory() -> RAGMemory:
global _memory_instance
if _memory_instance is None:
_memory_instance = RAGMemory()
return _memory_instance
def store(content: str, metadata: Dict = None) -> int:
return get_memory().store_memory(content, metadata)
def search(query: str, top_k: int = 5) -> List[Dict]:
return get_memory().search_memories(query, top_k)
if __name__ == "__main__":
print(f"🚀 RAG 记忆系统 (后端:{BACKEND})")
# 测试
memory_id = store("测试记忆", {"type": "test"})
print(f"存储 ID: {memory_id}")
results = search("测试")
print(f"搜索结果:{results}")
FILE:README.md
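# RAG Memory - Intelligent Memory System
> 🧠 A memory-management skill for AI assistants, with pluggable storage backends
## ✨ Features
- 🎯 **Multiple backends**: SQLite (default), Milvus (optional)
- 🚀 **Works out of the box**: zero configuration in SQLite mode
- 🔍 **Semantic search**: vector similarity search in Milvus mode
- 💾 **Automatic backup**: every memory is also written to a daily JSON backup file
- 🔒 **Privacy first**: local storage, so your data stays on your own machine
- 📦 **Easy to integrate**: a simple Python API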
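Backups are written as one `<YYYY-MM-DD>.json` file per day, each holding a JSON list of records with `id`, `content`, `timestamp`, and `metadata`. A minimal sketch of reading them back (for example, to rebuild a lost database), assuming that layout; `load_backups()` is a hypothetical helper, not part of the skill's API:

```python
import json
import os
from typing import Dict, List


def load_backups(backup_dir: str) -> List[Dict]:
    """Collect every record from the daily backup files, oldest day first.

    Assumes one <YYYY-MM-DD>.json file per day, each a JSON list of records.
    """
    records: List[Dict] = []
    if not os.path.isdir(backup_dir):
        return records
    # ISO dates in file names sort lexicographically in chronological order
    for name in sorted(os.listdir(backup_dir)):
        if not name.endswith(".json"):
            continue
        with open(os.path.join(backup_dir, name), "r", encoding="utf-8") as f:
            records.extend(json.load(f))
    return records
```

Re-storing the records is left to the caller, since the IDs assigned by a fresh backend may differ from the backed-up ones.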
## 🚀 Quick Start
### Installation
```bash
openclaw skills install rag-memory
```
### Usage
```python
from rag_memory import store, search
# Store a memory
memory_id = store("The weather is nice today", {"type": "observation", "category": "weather"})
# Search memories
results = search("weather", top_k=3)
for r in results:
    print(f"- {r['content']}")
```
## 📖 Detailed Usage
### Storing Memories
```python
from rag_memory import store
# Plain store
memory_id = store("This is a memory")
# Store with metadata
memory_id = store(
    "The user set a reminder",
    {
        "type": "reminder",
        "priority": "high",
        "category": "task"
    }
)
```
### Searching Memories
```python
from rag_memory import search
# Basic search
results = search("reminders", top_k=5)
# Process the results
for result in results:
    print(f"ID: {result['id']}")
    print(f"Content: {result['content']}")
    print(f"Time: {result['timestamp']}")
    print(f"Metadata: {result['metadata']}")
    # "distance" is only returned in Milvus mode, so use .get() to avoid a KeyError
    print(f"Distance: {result.get('distance')}")
```
### Advanced Features
```python
from rag_memory import get_memory, store
memory = get_memory()
# Delete a memory
memory_id = store("A temporary memory")
memory.delete_memory(memory_id)
# Clear all memories
memory.clear_all()
```
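## ⚙️ Configuration
### Default Configuration (SQLite)
No configuration is needed; it works right after installation.
### Advanced Configuration (Milvus)
```bash
# Environment variables
export RAG_MEMORY_BACKEND=milvus
export MILVUS_URL=http://localhost:19530
export OLLAMA_URL=http://localhost:11434  # optional, used for vector search
# Install the dependency
pip install pymilvus
```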
### All Environment Variables
| Variable | Default | Description |
|--------|--------|------|
| `RAG_MEMORY_BACKEND` | `sqlite` | Backend type: `sqlite` or `milvus` |
| `RAG_MEMORY_SQLITE_DB` | `./memory.db` | SQLite database path |
| `MILVUS_URL` | `http://localhost:19530` | Milvus server address |
| `OLLAMA_URL` | `http://localhost:11434` | Ollama server address (for embeddings) |
| `RAG_MEMORY_COLLECTION` | `openclaw_memory` | Milvus collection name |
| `RAG_MEMORY_BACKUP_DIR` | `./memory_backup` | Backup directory |
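A minimal sketch of how these variables and their defaults could be resolved in one place. `load_config()` and `RagMemoryConfig` are hypothetical names, not the skill's actual API, and rejecting unknown backend names is an assumption:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class RagMemoryConfig:
    """Hypothetical container for the settings listed in the table."""
    backend: str
    sqlite_db: str
    milvus_url: str
    ollama_url: str
    collection: str
    backup_dir: str


def load_config(env: Optional[Mapping[str, str]] = None) -> RagMemoryConfig:
    """Read each variable, falling back to its documented default."""
    env = os.environ if env is None else env
    backend = env.get("RAG_MEMORY_BACKEND", "sqlite").strip().lower()
    # Assumption: only the two documented backends are accepted
    if backend not in ("sqlite", "milvus"):
        raise ValueError(f"unsupported RAG_MEMORY_BACKEND: {backend!r}")
    return RagMemoryConfig(
        backend=backend,
        sqlite_db=env.get("RAG_MEMORY_SQLITE_DB", "./memory.db"),
        milvus_url=env.get("MILVUS_URL", "http://localhost:19530"),
        ollama_url=env.get("OLLAMA_URL", "http://localhost:11434"),
        collection=env.get("RAG_MEMORY_COLLECTION", "openclaw_memory"),
        backup_dir=env.get("RAG_MEMORY_BACKUP_DIR", "./memory_backup"),
    )
```

Accepting an explicit mapping instead of always reading `os.environ` keeps the function easy to test.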
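## 📊 Backend Comparison
| Feature | SQLite | Milvus |
|------|--------|--------|
| Setup difficulty | ⭐ Zero configuration | ⭐⭐⭐ Requires Docker |
| Vector search | ❌ | ✅ |
| Search strategy | Most recent first | Semantic similarity |
| Resource usage | Low | Medium to high |
| Best for | Personal use | Production |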
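To illustrate the "Search strategy" row, here is a toy sketch that contrasts recency ordering with ranking by cosine similarity of embeddings. It is a brute-force illustration only: Milvus uses vector indexes, the distance metric this skill's Milvus backend uses is not shown here, and the `embedding` field on each record is a hypothetical addition:

```python
import math
from typing import Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def rank_by_recency(memories: List[Dict], top_k: int) -> List[Dict]:
    """SQLite-style: newest first (ISO-8601 timestamps sort as strings)."""
    return sorted(memories, key=lambda m: m["timestamp"], reverse=True)[:top_k]


def rank_by_similarity(memories: List[Dict], query_vec: Sequence[float], top_k: int) -> List[Dict]:
    """Milvus-style: the embedding closest to the query vector comes first."""
    return sorted(
        memories,
        key=lambda m: cosine_similarity(m["embedding"], query_vec),
        reverse=True,
    )[:top_k]
```

With recency ordering, the newest memory wins regardless of the query; with similarity ranking, an older but more relevant memory can come first.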
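## 🗂️ Data Structures
### Memory Object
The `distance` field is only present in Milvus search results.
```json
{
  "id": 1,
  "content": "The weather is nice today",
  "timestamp": "2026-03-18T15:30:00",
  "metadata": {
    "type": "observation",
    "category": "weather"
  },
  "distance": 0.85
}
```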
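For type checkers, the memory object above can be described with a `TypedDict` where `distance` is optional. This is a minimal sketch; `Memory` and `describe()` are hypothetical names, and the skill itself returns plain dicts:

```python
from typing import Any, Dict, List, TypedDict


class _MemoryRequired(TypedDict):
    id: int
    content: str
    timestamp: str
    metadata: Dict[str, Any]


class Memory(_MemoryRequired, total=False):
    distance: float  # present only in Milvus mode


def describe(results: List[Memory]) -> List[str]:
    """Format search results, showing the distance only when it exists."""
    lines = []
    for r in results:
        score = r.get("distance")
        suffix = f" (distance={score:.2f})" if score is not None else ""
        lines.append(f"#{r['id']} {r['content']}{suffix}")
    return lines
```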
### Suggested Metadata
```python
# Conversation log
{"type": "conversation", "topic": "RAG", "participants": ["user", "assistant"]}
# Task reminder
{"type": "reminder", "priority": "high", "due": "2026-03-19"}
# User preference
{"type": "preference", "category": "settings"}
# Project information
{"type": "project", "name": "RAG System", "status": "active"}
```
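Consistent metadata keys make it easy to narrow down search results. A minimal sketch of filtering results client-side by metadata, using a hypothetical `filter_by_metadata()` helper that is not part of the skill's API:

```python
from typing import Any, Dict, List


def filter_by_metadata(memories: List[Dict[str, Any]], **criteria: Any) -> List[Dict[str, Any]]:
    """Keep memories whose metadata matches every key/value in criteria."""
    return [
        m for m in memories
        # Treat a missing or None metadata field as an empty dict
        if all((m.get("metadata") or {}).get(k) == v for k, v in criteria.items())
    ]
```

For example, `filter_by_metadata(search("tasks", top_k=20), type="reminder", priority="high")` would keep only the high-priority reminders among the returned results.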
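## 🔧 Troubleshooting
### Problem: cannot import pymilvus
```bash
# Only needed in Milvus mode
pip install pymilvus
# Or switch back to SQLite mode
export RAG_MEMORY_BACKEND=sqlite
```
### Problem: cannot connect to Milvus
```bash
# Check the service status
docker ps | grep milvus
curl http://localhost:19530/v1/version
```
### Problem: embedding generation failed
```bash
# Check the Ollama service (optional feature)
curl http://localhost:11434/api/tags
# Pull the embedding model (if you want vector search)
ollama pull bge-m3
```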
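If `curl` is not available, a plain TCP check from Python at least tells you whether anything is listening at `MILVUS_URL`. A minimal sketch with a hypothetical `port_open()` helper; it checks reachability only, not Milvus health, and the fallback port 19530 is the default from the environment-variable table:

```python
import socket
from urllib.parse import urlparse


def port_open(url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    host = parsed.hostname or "localhost"
    port = parsed.port or 19530  # default MILVUS_URL port
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable
        return False
```

For example, `port_open("http://localhost:19530")` returning `False` suggests the Milvus container is not running or the port is not published.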
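The `/api/tags` response lists the models Ollama has pulled, so you can check for `bge-m3` programmatically. A minimal sketch using only the standard library; `fetch_tags()` and `model_available()` are hypothetical helpers, and the tag-suffix handling (e.g. `bge-m3:latest`) is an assumption about how names appear:

```python
import json
import urllib.request
from typing import Any, Dict


def model_available(tags: Dict[str, Any], model: str = "bge-m3") -> bool:
    """Check an /api/tags response for a model name, with or without a :tag suffix."""
    for entry in tags.get("models", []):
        name = entry.get("name", "")
        if name == model or name.split(":", 1)[0] == model:
            return True
    return False


def fetch_tags(ollama_url: str = "http://localhost:11434", timeout: float = 5.0) -> Dict[str, Any]:
    """GET {ollama_url}/api/tags and decode the JSON body."""
    with urllib.request.urlopen(f"{ollama_url}/api/tags", timeout=timeout) as resp:
        return json.load(resp)
```

With Ollama running, `model_available(fetch_tags())` returning `False` means you still need to run `ollama pull bge-m3`.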
## 📦 Development
### Local Testing
```bash
cd /app/skills/rag-memory
python rag_memory.py
```
### Packaging
```bash
tar -czf rag-memory.tar.gz SKILL.md rag_memory.py README.md
```
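On systems without `tar`, the same archive can be built with Python's standard `tarfile` module. A minimal sketch; `package_skill()` is a hypothetical helper that mirrors the `tar -czf` command above:

```python
import os
import tarfile
from typing import Iterable


def package_skill(output: str, files: Iterable[str], base_dir: str = ".") -> str:
    """Create a gzip-compressed tarball containing the given files."""
    with tarfile.open(output, "w:gz") as tar:
        for name in files:
            # arcname stores each file under its relative name, like tar does
            tar.add(os.path.join(base_dir, name), arcname=name)
    return output
```

For example: `package_skill("rag-memory.tar.gz", ["SKILL.md", "rag_memory.py", "README.md"])`.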
## 📝 Changelog
### v1.0.0 (2026-03-18)
- ✅ SQLite backend support
- ✅ Milvus backend support
- ✅ Automatic backups
- ✅ Semantic search (Milvus mode)
- ✅ Metadata support
## 🤝 Contributing
Issues and pull requests are welcome!
## 📄 License
MIT License
## 📞 Support
- Docs: https://docs.openclaw.ai/skills/rag-memory
- Community: https://discord.com/invite/clawd
- ClawHub: https://clawhub.com/skills/rag-memory