@clawhub-oreoandyuumi-3e47259c0e
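---
name: mmxagent-skill-manager
description: A skill that manages all internal maxclaw skills. It mainly manages, in real time, the skills used to install and maintain IM integrations, plus some skills maintained by the official team. When the user performs certain operations, consult this skill to see whether there is official updated content that can assist.
---
# IM
For IM-related operations, check whether the user has installed the corresponding skill. If it is not installed or needs an update, **silent installation is forbidden**. First explain to the user that this is an officially provided skill that delivers the latest IM capabilities and that installing or updating it is recommended. Run the command only after the user confirms.
## Feishu skill
mmxagent-skill-feishu
```bash
## Install
openclaw skill install mmxagent-skill-feishu
## Update
openclaw skill update mmxagent-skill-feishu
```
## WeCom skill
mmxagent-skill-wecom
```bash
## Install
openclaw skill install mmxagent-skill-wecom
## Update
openclaw skill update mmxagent-skill-wecom
```
## DingTalk
mmxagent-skill-dingtalk
```bash
## Install
openclaw skill install mmxagent-skill-dingtalk
## Update
openclaw skill update mmxagent-skill-dingtalk
```
## WeChat
mmxagent-skill-wechat
```bash
## Install
openclaw skill install mmxagent-skill-wechat
## Update
openclaw skill update mmxagent-skill-wechat
```
# openclaw management
openclaw is updated frequently and some versions have problems, so the official skill is needed to help users upgrade as required, including bug fixes for certain openclaw versions and installation problems with certain versions. **Silent installation is forbidden**. First explain to the user that this is the officially provided openclaw management skill, used to assist with version upgrades and problem fixes, and that installing or updating it is recommended. Run the command only after the user confirms.
Skill name: mmxagent-skill-maxclaw
```bash
## Install
openclaw skill install mmxagent-skill-maxclaw
## Update
openclaw skill update mmxagent-skill-maxclaw
```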
---
name: mmxagent-skill-wechat
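description: Connect a personal WeChat account (not WeCom / 企业微信). Use this skill when the user says "连接个人微信", "接入个人微信", "绑定个人微信", or "个人微信扫码" (connect, integrate, or bind personal WeChat, or scan a personal WeChat QR code). Note that if the user says "企业微信" or "企微" (WeCom), this skill does not apply and the wecom-connect skill should be used instead. Once this skill matches, follow the flow strictly to the end without skipping steps or improvising.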
---
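# Personal WeChat Connect Skill
## ⚠️ Matching rules (distinguish strictly, do not confuse)
**This skill applies only to personal WeChat. The following keywords trigger it:**
- "连接个人微信" / "接入个人微信" / "绑定个人微信" (connect / integrate / bind personal WeChat)
- "个人微信扫码" / "微信扫码登录" (personal WeChat QR scan / WeChat QR login)
- "连接微信" (connect WeChat; when the word "企业" (enterprise) is absent, personal WeChat is assumed)
**The following keywords do not belong to this skill and must not trigger it:**
- "企业微信" / "企微" / "wecom" / "WeCom" → use the wecom-connect skill (`mmxagent-skill-wecom`)
**Once this skill has been read, execute the flow below strictly from Step 0 to the end: no skipped steps, no improvisation, no reading of other documents.**
## Core principles
- **Follow the steps strictly. Do not embellish and do not add extra commands on your own initiative.**
- **Speed comes first for the QR-code steps (Steps 2-3)!** The QR code is valid for only about 1 minute, so chain the commands that fetch and generate it with `&&` and run them in one go. The other steps (version detection, plugin installation, credential writing, etc.) may be run one at a time so the user can inspect intermediate output.
- **`qrcode_img_content` is a URL string and must be encoded into a PNG image with the `qrcode` npm library.** The URL points to a web page, not an image. Downloading it directly with `curl -o` is strictly forbidden.
- **Show the QR code primarily through the local workspace file.** Guide the user to open `~/workspace/weixin_qr.png` first. If the user's environment makes opening local files inconvenient, `upload_to_cdn` may be used as a fallback (note: a CDN upload exposes the one-time authorization QR code externally, so use it only when necessary).
- **Credentials must be shown and confirmed before they are written.** Before writing sensitive information such as the token to disk, show the user what is about to be written (accountId, the first 8 characters of the token plus a mask, baseUrl, userId) and write only after the user confirms.
- **Do not poll automatically.** After presenting the QR code, wait for the user to say they have scanned it ("扫完了") before polling.
- **Do not hand-edit `openclaw.json`.**
- **Packages installed at runtime:** this skill installs the following packages at runtime through npm/npx: ① `openclaw` (the OpenClaw CLI), from registry.npmmirror.com; ② `@tencent-weixin/openclaw-weixin-cli` (Tencent's official WeChat plugin); ③ `qrcode` (QR-code generation library, installed only in the /tmp temporary directory). Confirm that the user trusts these package sources before running.
## Execution flow
### Step 0: Detect the openclaw version and choose the installation path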
```bash
openclaw --version 2>&1 | head -1
```
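Extract the version number from the output (format: `OpenClaw 2026.X.Y`) and branch as follows:
| Condition | Path |
|---|---|
| Version ≤ 2026.3.13 | **Path A**: remove the old plugin (if present), install the new plugin, then upgrade openclaw |
| Version > 2026.3.13 (e.g. 2026.3.23 and above) | **Path B**: install/upgrade the plugin first, then upgrade openclaw |
| Not installed / command not found | Tell the user to install openclaw first: `npm install -g openclaw@latest --registry=https://registry.npmmirror.com`, then restart from Step 0 |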
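The branching rule in the table can be sketched in Python. This is a minimal illustration, not part of the skill: `choose_path` and `THRESHOLD` are hypothetical names, and treating unparseable output as "not installed" is an assumption. Comparing integer tuples avoids the string-comparison trap in which `2026.10.1` would sort before `2026.3.13`.

```python
import re
from typing import Optional

# Threshold taken from the table above; versions are compared as integer tuples.
THRESHOLD = (2026, 3, 13)


def choose_path(version_output: Optional[str]) -> str:
    """Return "A", "B", or "install" for the first line of `openclaw --version`."""
    if not version_output:
        return "install"
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", version_output)
    if match is None:
        # Assumption: output without a version number means the CLI is missing.
        return "install"
    version = tuple(int(part) for part in match.groups())
    return "A" if version <= THRESHOLD else "B"
```

Path A and Path B differ only in whether an old plugin is checked for and removed first, so the return value decides whether step 1a of Path A runs.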
---
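### Step 1: Install the plugin (branch by path)
#### Path A: old version (≤ 2026.3.13) → remove the old plugin, install the new plugin, then upgrade openclaw
**1a. Detect and remove the old WeChat plugin (1 exec):**
First check whether an old WeChat plugin is installed. If it is, it must be removed first; otherwise installing the new plugin will conflict:
```bash
openclaw plugin list 2>&1 | grep -i weixin
```
If the output lists a weixin-related plugin, uninstall it:
```bash
openclaw plugin uninstall openclaw-weixin 2>&1
```
If no old plugin is detected, skip the uninstall and go straight to 1b.
**1b. Install the latest WeChat plugin (1 exec):**
```bash
npx -y @tencent-weixin/openclaw-weixin-cli@latest install 2>&1
```
**1c. Upgrade openclaw to the latest version (1 exec):**
```bash
npm install -g openclaw@latest --registry=https://registry.npmmirror.com 2>&1 | tail -3
```
After the upgrade, tell the user: "The old plugin has been cleaned up, the new plugin is installed, and openclaw has been updated to the latest version."
#### Path B: newer version (> 2026.3.13) → install/upgrade the plugin first, then upgrade openclaw
**1a. Install/upgrade the WeChat plugin first (1 exec):**
```bash
npx -y @tencent-weixin/openclaw-weixin-cli@latest install 2>&1
```
**1b. Then upgrade openclaw to the latest version (1 exec):**
```bash
npm install -g openclaw@latest --registry=https://registry.npmmirror.com 2>&1 | tail -3
```
After the upgrade, tell the user: "The plugin is installed and openclaw has been updated to the latest version."
---
After either path finishes, continue if the output contains the keyword `success` or `installed`; otherwise tell the user that installation failed and needs manual troubleshooting.
### Step 2: Fetch the QR code (1 exec)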
```bash
curl -s "https://ilinkai.weixin.qq.com/ilink/bot/get_bot_qrcode?bot_type=3"
```
Extract from the returned JSON:
- `qrcode`: save it; it is used for polling
- `qrcode_img_content`: **this is a URL string**; the next step encodes it into a PNG image with the `qrcode` npm library
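A minimal Python sketch of the extraction step, for illustration only. It assumes both fields sit at the top level of the JSON body, which the text does not state; `extract_qr_fields` is a hypothetical helper name.

```python
import json
from typing import Tuple


def extract_qr_fields(response_text: str) -> Tuple[str, str]:
    """Return (qrcode, qrcode_img_content) from the get_bot_qrcode response body."""
    payload = json.loads(response_text)
    # Assumption: both fields are top-level keys of the response object.
    missing = [key for key in ("qrcode", "qrcode_img_content") if not payload.get(key)]
    if missing:
        raise ValueError(f"response is missing fields: {missing}")
    return payload["qrcode"], payload["qrcode_img_content"]
```

Failing early on a missing field matters here because the QR code is short-lived, so a malformed response should trigger a fresh request rather than a broken image.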
### Step 3: Generate the PNG and save it to the workspace
**The QR-code steps must be completed quickly (validity is about 1 minute).**
```bash
cd /tmp && npm install qrcode 2>/dev/null | tail -1
```
```bash
cd /tmp && node -e "const qr=require('qrcode'); qr.toFile('/tmp/weixin_qr.png','<qrcode_img_content>',{width:400,margin:2},(e)=>{if(e)console.error(e);else console.log('saved');})"
```
**Save to the workspace (required):**
```bash
cp /tmp/weixin_qr.png ~/workspace/weixin_qr.png
```
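**CDN upload (optional; only when the user cannot open the local file):**
⚠️ Note: uploading to the CDN exposes the one-time authorization QR code to the external network. Use it only when the user explicitly says they cannot open local files or is in a purely remote chat environment.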
```
upload_to_cdn /tmp/weixin_qr.png
```
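A failed CDN upload does not block the flow; just use the local workspace file.
### Step 4: Show the QR code and wait for the user to scan it
**Default presentation (local file first):**
---
## WeChat QR login
The QR code has been generated and saved to `~/workspace/weixin_qr.png`. Open the file and scan it with **WeChat**.
**Steps:**
1. Open the `~/workspace/weixin_qr.png` file
2. Scan it with the **WeChat** app on your phone
3. Confirm the login on your phone
4. Tell me "ok" when you are done, and I will continue with the next steps
⏱ Validity: about 1 minute. If it expires, tell me "expired" and I will generate a new QR code right away.
If opening a local file is inconvenient, let me know and I can upload it to the CDN and give you an online link (note: the QR code will be briefly exposed to the external network).
---
**If the user asks for a CDN link (after a successful upload via upload_to_cdn):**
---
## WeChat QR login
Scan the QR code below with **WeChat**:
<CDN image URL>
(Local backup: ~/workspace/weixin_qr.png)
Tell me "ok" when you are done, and I will continue with the next steps.
⏱ Validity: about 1 minute. If it expires, tell me "expired" and I will generate a new QR code right away.
---
Then **stop and wait for the user to confirm**.
### Step 5: After the user confirms → poll + write credentials + restart
**5a. Poll the status (`--max-time 10` is required because this API uses long polling):**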
```bash
curl -s --max-time 10 "https://ilinkai.weixin.qq.com/ilink/bot/get_qrcode_status?qrcode=<qrcode>"
```
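| status | Action |
|---|---|
| Timeout (exit code 28) or `wait` | Wait 3 seconds, then poll again |
| `scaned` | Tell the user "Scanned. Please confirm the login on your phone" |
| `confirmed` | Success! Extract `ilink_bot_id`, `bot_token`, `baseurl`, `ilink_user_id` |
| `expired` | Restart from Step 2 (no need to reinstall the plugin) |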
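The status table maps directly onto a small dispatch function. The sketch below is illustrative: `next_action` and the action labels it returns are hypothetical names, while the status strings (including the API's `scaned` spelling) and curl's timeout exit code 28 come from the table.

```python
from typing import Optional

CURL_TIMEOUT_EXIT_CODE = 28  # curl exits with 28 when --max-time is exceeded


def next_action(exit_code: int, status: Optional[str]) -> str:
    """Map one poll result to the next step; the returned labels are illustrative."""
    if exit_code == CURL_TIMEOUT_EXIT_CODE or status == "wait":
        return "retry_after_3s"
    if status == "scaned":  # spelling as listed in the status table
        return "ask_user_to_confirm_on_phone"
    if status == "confirmed":
        return "extract_credentials"
    if status == "expired":
        return "restart_from_step_2"
    return "unknown"
```

Treating the curl timeout like `wait` reflects that a long-poll timing out is the normal "nothing happened yet" outcome rather than an error.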
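**5b. Show the credentials and wait for user confirmation (required after confirmed):**
Replace `@` → `-` and `.` → `-` in `ilink_bot_id` to obtain `accountId` (e.g. `a34b410e2e6f@im.bot` → `a34b410e2e6f-im-bot`).
⚠️ **The `bot_token` returned by the API already includes the `ilink_bot_id:` prefix. Use the `bot_token` value directly as the token and do not prepend `ilink_bot_id:` again; otherwise the token carries a double prefix and authentication fails (errcode -14).**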
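The `accountId` derivation is a plain character substitution. A minimal Python sketch follows; `to_account_id` is a hypothetical helper name, and the rule itself is the one stated above.

```python
def to_account_id(ilink_bot_id: str) -> str:
    """Replace "@" and "." with "-" so the ID is safe to use as a file name."""
    return ilink_bot_id.replace("@", "-").replace(".", "-")
```

For the example above, `to_account_id("a34b410e2e6f@im.bot")` yields `a34b410e2e6f-im-bot`.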
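**First show the user the credentials about to be written and wait for confirmation:**
---
## The following credentials are about to be written
| Field | Value |
|---|---|
| accountId | `<accountId>` |
| token | `<first 8 characters of bot_token>**** (masked)` |
| baseUrl | `<baseurl>` |
| userId | `<ilink_user_id>` |
Write location: `~/.openclaw/openclaw-weixin/accounts/<accountId>.json`
Reply "ok" to confirm the write, or let me know if you have any questions.
---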
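The masked preview can be produced without ever printing the full token. This is a hedged sketch: `mask_token` and `credential_preview` are hypothetical helpers, and the fixed `****` suffix simply mirrors the confirmation template.

```python
from typing import Dict


def mask_token(token: str, visible: int = 8) -> str:
    """Keep the first `visible` characters and replace the rest with a fixed mask."""
    return token[:visible] + "****"


def credential_preview(account_id: str, bot_token: str, base_url: str, user_id: str) -> Dict[str, str]:
    """Build the rows shown to the user; the full token never appears in the preview."""
    return {
        "accountId": account_id,
        "token": mask_token(bot_token),
        "baseUrl": base_url,
        "userId": user_id,
    }
```

Using a fixed-length mask rather than one `*` per hidden character avoids leaking the token length.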
**After the user confirms, perform the write:**
```bash
cat > /tmp/write_weixin_account.js << 'SCRIPT'
const fs = require('fs');
const path = require('path');
const home = process.env.HOME;
const accountId = '<accountId>';
const data = {
token: '<bot_token>',
savedAt: new Date().toISOString(),
baseUrl: '<baseurl>',
userId: '<ilink_user_id>'
};
const accountsDir = path.join(home, '.openclaw/openclaw-weixin/accounts');
fs.mkdirSync(accountsDir, { recursive: true });
const accountFile = path.join(accountsDir, accountId + '.json');
fs.writeFileSync(accountFile, JSON.stringify(data, null, 2));
fs.chmodSync(accountFile, 0o600);
const indexPath = path.join(home, '.openclaw/openclaw-weixin/accounts.json');
let existing = [];
try { existing = JSON.parse(fs.readFileSync(indexPath, 'utf-8')); } catch {}
if (!existing.includes(accountId)) existing.push(accountId);
fs.writeFileSync(indexPath, JSON.stringify(existing, null, 2));
console.log('Credentials and index written successfully');
SCRIPT
node /tmp/write_weixin_account.js
```
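⚠️ `<accountId>`, `<bot_token>`, `<baseurl>`, and `<ilink_user_id>` are placeholders. When writing the script, replace them with the real values returned in the confirmed response of Step 5a.
**5c. Restart the Gateway:**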
```bash
openclaw gateway restart
```
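### Success reply
---
## WeChat connection result
- ✅ Status: bound successfully
- ✅ Gateway: restarted
- ilink_bot_id: `<ilink_bot_id>`
- ilink_user_id: `<ilink_user_id>`
You can now send messages directly in WeChat 🎉
---
Stop here. Do not perform any further operations. Do not read documentation.
## Handling an expired QR code
When the user says the QR code has expired ("过期了") → just fetch a new QR code starting from Step 2; **Steps 0 and 1 do not need to be repeated**.
## Strictly forbidden
- ❌ **Do not download the `qrcode_img_content` URL with `curl -o` / `curl --output` / `wget`.** The URL points to an HTML web page, not a PNG image, so the download is just HTML. Use `toFile()` from the `qrcode` npm library to encode the URL string into a PNG
- ❌ **Do not display the QR code via canvas, browser, Generate Images, Image Understanding, MEDIA: syntax, terminal character rendering, or similar means**
- ❌ Do not start polling automatically (wait for the user to say "扫完了" (scanned) or "ok")
- ❌ Do not hand-edit `openclaw.json`
- ❌ Do not skip the credential-writing step (without credentials the plugin cannot connect)
- ❌ Do not give the user the raw `qrcode_img_content` URL
- ❌ Do not use third-party upload services (0x0.st, catbox, imgbb, etc.); the only allowed CDN path is `upload_to_cdn`
- ❌ Do not prepend `ilink_bot_id:` to the token (`bot_token` already includes the prefix)
- ❌ Do not overwrite the file with a curl download after the PNG has been generated successfully
- ❌ Do not skip the Step 0 version check and jump straight to Step 1
- ❌ Do not skip the credential display-and-confirm step and write straight to disk
## One-sentence summary
Detect the version (Step 0) → remove the old plugin (if any), install the new plugin, then upgrade openclaw (Step 1) → fetch the QR code with curl (Step 2) → npm install qrcode + generate the PNG → save to the workspace (+ optional CDN upload) (Step 3) → show the QR code and wait for the scan (Step 4) → poll for confirmation → show the credentials and wait for confirmation → write the credentials → restart the gateway (Step 5) → done.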
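---
name: mmxagent-skill-wecom
description: Connect WeCom (企业微信). Call the generate endpoint to obtain an authorization link; the user sends the link into a WeCom chat and opens it to complete authorization. Use when the user mentions connecting WeCom, integrating WeCom, binding a WeCom bot, creating a WeCom bot, or scanning a code to bind WeCom.
---
# WeCom Connect Skill
## When to use
When the user asks to connect WeCom, bind a WeCom bot, create a new WeCom bot, connect OpenClaw to WeCom, or explicitly mentions "扫码绑定企微" (scan a code to bind WeCom), use this flow directly.
## Core principles
- **Do not generate a PNG and do not use the CDN.** After generate returns `auth_url`, give it straight to the user, who sends the link into a WeCom chat and opens it to complete authorization.
- **Do not start polling automatically.** After giving the link, wait for the user to say they are done ("配好了") before polling.
- **Do not manually uninstall or disable plugins.**
## Execution flow (fixed; follow it exactly)
### Step 1: Call the generate endpoint
```bash
curl -s "https://work.weixin.qq.com/ai/qc/generate?source=wecom-cli&plat=3"
```
Extract `scode` and `auth_url` from the response. Keep `scode` for polling later; give `auth_url` straight to the user.
### Step 2: Give the auth_url to the user and wait for a reply
```markdown
## WeCom QR code
Copy the link below, send it into any chat in your WeCom, then tap to open it:
<auth_url>
**Steps:**
1. Open the **WeCom app** on your phone (not WeChat)
2. Pick any chat and send the link above as a message
3. Tap the link in the message to open it inside WeCom
4. Complete the authorization confirmation on the page
5. **Tell me once the setup is done**
Validity: 3 minutes
```
Then **stop and wait for the user to say they are done ("配好了")**.
## After the user confirms: poll + write config
After the user says "配好了 / 扫完了 / done / ok":
```bash
curl -s "https://work.weixin.qq.com/ai/qc/query_result?scode=<scode>"
```
Poll every 3 seconds with a 3-minute timeout. Success condition: `data.status === 'success'` and both `data.bot_info.botid` and `data.bot_info.secret` are present.
Once obtained, write them to `~/.openclaw/openclaw.json`:
```text
channels.wecom.botId = <botId>
channels.wecom.secret = <secret>
channels.wecom.enabled = true
```
Then run `openclaw gateway restart`.
On timeout, repeat Steps 1-2.
## Success reply template
```markdown
## WeCom connection result
- Status: bound successfully
- Bot credentials: obtained (botId: `<botId>`)
- OpenClaw config: written
- Gateway: restarted
```
## Strictly forbidden
- **Do not generate a PNG / use the CDN / Batch Upload**: give the auth_url link directly.
- **Do not start polling automatically**: wait for the user to say they are done ("配好了").
- **Do not manually uninstall or disable plugins**.
## One-sentence summary
generate → give the user the link → the user sends it into a WeCom chat and opens it to authorize → wait for the user to say they are done → poll → write config.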
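The success condition used when polling `query_result` in the WeCom flow can be sketched in Python as follows. `extract_bot_credentials` is a hypothetical helper; the field names (`data.status`, `data.bot_info.botid`, `data.bot_info.secret`) are the ones given in the polling step, and the function assumes the response has already been parsed from JSON.

```python
from typing import Any, Dict, Optional, Tuple


def extract_bot_credentials(response: Dict[str, Any]) -> Optional[Tuple[str, str]]:
    """Return (botid, secret) once the poll succeeded, otherwise None."""
    data = response.get("data") or {}
    bot_info = data.get("bot_info") or {}
    bot_id = bot_info.get("botid")
    secret = bot_info.get("secret")
    # All three conditions must hold: status is "success" and both values exist.
    if data.get("status") == "success" and bot_id and secret:
        return bot_id, secret
    return None
```

Returning `None` for every non-success shape lets the caller keep polling every 3 seconds until the 3-minute timeout without special-casing partial responses.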
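---
name: mmxagent-skill-feishu
description: Connect a Feishu bot. Replaces Step 1 of `feisu.md` with a direct API call, obtains the link (which must have from=maxclaw appended), polls for `client_id/client_secret` after the user finishes configuration, then continues with Steps 2 and 3 of `feisu.md`. Applies to connecting Feishu, integrating Feishu, binding a Feishu bot, creating a Feishu bot, connecting OpenClaw to Feishu, and similar scenarios.
---
# Feishu Connect Skill
## When to use
Triggered when the user asks to connect Feishu, bind a Feishu bot, create a Feishu bot, or connect OpenClaw to Feishu.
## Highest-priority rules (mandatory; violating any of them is a failure)
1. The link given to the user must have `&from=maxclaw` appended. A link without this parameter is wrong.
2. This skill replaces only Step 1 of `feisu.md`. After the user replies "ok" or "配置好了" (configured), do not repeat Step 1 of `feisu.md`; continue directly with Steps 2 and 3 of `feisu.md`.
3. After the user replies "ok" or "配置好了", do not output anything about "configuring permissions", "enabling bot capability", "configuring event subscriptions", "publishing the app", "admin approval", "permission management", or "batch import".
## Core principles
- Use the direct API mode. Do not run the official installer (`npx -y @larksuite/openclaw-lark install`) first.
- This skill only replaces Step 1 of `feisu.md`; once the credentials are obtained, the remaining configuration and pairing follow Steps 2 and 3 of `feisu.md`.
- Return the link bare. Do not wrap it in backticks or a code block; the user must be able to click it directly.
- Do not start polling automatically. After giving the link, wait for the user to confirm before continuing.
- Do not generate a QR-code image.
- Do not manually uninstall, disable, or delete plugins, and do not hand-edit config files.
## Execution flow
### Step 1: Direct API call, get the link in one go
```bash
COOKIE_JAR=/tmp/feishu_cookies.txt
rm -f "$COOKIE_JAR"
# init
INIT_RESP=$(curl -s -c "$COOKIE_JAR" -b "$COOKIE_JAR" \
  -X POST "https://accounts.feishu.cn/oauth/v1/app/registration" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "action=init")
echo "=== INIT ==="
echo "$INIT_RESP"
# begin
BEGIN_RESP=$(curl -s -c "$COOKIE_JAR" -b "$COOKIE_JAR" \
  -X POST "https://accounts.feishu.cn/oauth/v1/app/registration" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "action=begin&archetype=PersonalAgent&auth_method=client_secret&request_user_info=open_id")
echo "=== BEGIN ==="
echo "$BEGIN_RESP"
```
Extract from the begin response:
- `verification_uri_complete`: the original link
- `device_code`: save it; it is used for polling after the user confirms
- `user_code`: extract it from the link's query parameters
### Step 2: Append from=maxclaw to produce the final link
After obtaining `verification_uri_complete`, you must append `&from=maxclaw` to the end.
Example:
- Original: https://open.feishu.cn/page/openclaw?user_code=XXXX-XXXX
- Final: https://open.feishu.cn/page/openclaw?user_code=XXXX-XXXX&from=maxclaw
A link without `&from=maxclaw` is wrong; do not send it to the user.
### Step 3: Send the link directly to the user
Return the link bare. Correct output:
---
Please open this link directly in your browser to complete the Feishu configuration:
https://open.feishu.cn/page/openclaw?user_code=XXXX-XXXX&from=maxclaw
User code: XXXX-XXXX
Once the configuration is done, come back and tell me "ok" or "configured".
---
Do not put the link inside backticks or a code block.
### Step 4: After the user replies "ok" or "配置好了"
Run one poll first:
```bash
curl -s -c "$COOKIE_JAR" -b "$COOKIE_JAR" \
  -X POST "https://accounts.feishu.cn/oauth/v1/app/registration" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "action=poll&device_code=<device_code saved earlier>"
```
Returns `client_id` + `client_secret` → success.
Returns `authorization_pending` → tell the user that the Feishu side does not seem to be finished yet and ask them to check again.
After a successful poll, tell the user directly:
"Pairing is complete! Now find your bot in Feishu and send it a message; the bot will reply with a pairing code. Tell me the code and I will complete the pairing authorization for you."
---
At this point Step 1 of `feisu.md` has been replaced and the credentials needed later have been obtained:
- `client_id` serves as the `App ID`
- `client_secret` serves as the `App Secret`
Then, without going back to Step 1 of `feisu.md`, continue with:
- Step 2 of `feisu.md`: Configure OpenClaw
- Step 3 of `feisu.md`: Start + test
That is, continue to follow `feisu.md` to configure OpenClaw, start the gateway, message the bot, and run `openclaw pairing approve feishu <CODE>` with the pairing code.
---
Do not output Step 1 guidance such as creating an app on the Feishu Open Platform, configuring permissions, enabling event subscriptions, or publishing the app.
## Prohibited
- Giving the user a link without `&from=maxclaw`
- Re-running Step 1 of `feisu.md` after the user confirms
- Outputting content such as "configure permissions / enable bot capability / configure event subscriptions / publish the app / admin approval / batch import / permission JSON"
- Running the official installer first and then switching to the direct API
- Manually uninstalling or disabling plugins
- Starting polling automatically
- Generating a QR-code image
- Wrapping the link in backticks or a code block
- Treating `authorization_pending` as a failure
## One-sentence summary
Use this skill to replace Step 1 of `feisu.md`: curl to get the link → append `&from=maxclaw` → give the user the bare link → wait for the user to say ok → poll for `client_id/client_secret` → continue with Steps 2 and 3 of `feisu.md`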
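The link handling in the Feishu flow (append `from=maxclaw`, read `user_code` from the query string) can be sketched in Python. Both helper names are hypothetical, and two behaviors are assumptions beyond the text: the function leaves a link untouched when the parameter is already present, and it falls back to `?` when the link has no query string.

```python
from typing import Optional
from urllib.parse import parse_qs, urlsplit


def with_from_maxclaw(verification_uri_complete: str) -> str:
    """Append from=maxclaw unless the link already carries it."""
    query = parse_qs(urlsplit(verification_uri_complete).query)
    if query.get("from") == ["maxclaw"]:
        return verification_uri_complete
    # Assumption: use "?" when the link has no query string yet.
    separator = "&" if "?" in verification_uri_complete else "?"
    return f"{verification_uri_complete}{separator}from=maxclaw"


def extract_user_code(link: str) -> Optional[str]:
    """Read user_code from the link's query parameters, or None if absent."""
    values = parse_qs(urlsplit(link).query).get("user_code")
    return values[0] if values else None
```

Making the append idempotent guards against sending a link with the parameter duplicated if the step is retried.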
---
name: mmxagent-guardian
version: 1.0.3
description: Provide file protection while users work with openclaw. All file operations are version-indexed and support delete recovery and modification rollback.
---
# File Protection and Version Management
This Skill uses MiniVCS to track all file operations with version history.
**Runtime requirement**: Python 3 is required to run `scripts/minivcs/minivcs.py`.
**Script location**: `scripts/minivcs/minivcs.py` in the same directory as this `SKILL.md`
(Before executing, first determine the directory containing this file. It is referred to as `$SKILL_DIR` below.)
**Core mechanism:**
- **Modify**: Saves an incremental diff plus a full snapshot from before the modification, **supporting rollback**
- **Delete**: Moves the full file into `~/.openclaw/minivcs/trash/`, **supporting restore**
- **All files are indexed**, with different retention periods based on importance
- **After every operation, the full record table is scanned automatically** to find expired records and ask the user for confirmation
**Usage limitations:**
- **Binary files (images, PDFs, audio/video, etc.)**: Text diffs cannot be generated. For binary-file protection, MiniVCS stores a full local `.bak` copy instead. This applies to binary-file backup/rollback protection, and deletion protection also stores a full local copy in `~/.openclaw/minivcs/trash/`. The user must be informed before the operation because storage usage may be relatively high.
- **The first record cannot be rolled back**: When a file is recorded for the first time, there is not yet any historical baseline, so that record has no snapshot. After that, each call to `record_modify` following an edit automatically saves a snapshot, allowing rollback to the state before any edit.
- **Local storage notice**: Protection data is stored locally under `~/.openclaw/minivcs/`. This Skill does not provide encryption or remote sync.
---
## Prerequisite Check: Confirm Python Is Installed
**Before performing any operation, you must first check whether Python 3 is installed in the user's environment. This Skill requires Python 3 and should not run without it.**
### How to check
```bash
# macOS / Linux
python3 --version
# Windows (PowerShell)
python --version
```
- If the output is `Python 3.x.x`, it is installed and you can continue
- If it says `command not found` or `is not recognized as an internal or external command`, stop and tell the user that this Skill requires Python 3
### Dependency boundary
If Python 3 is missing:
- Do **not** run dependency installation commands from this Skill
- Do **not** run remote install scripts such as `curl | bash`
- Do **not** write to shell config files such as `~/.zshrc` or `~/.bash_profile`
- Do **not** modify global environment variables on the user's behalf from this Skill
- Tell the user that Python 3 is required, and ask them to install it first or explicitly authorize a separate environment-setup flow
> **Note**: On some Windows systems the command is `python`, while on macOS/Linux it is `python3`.
> In all commands below that use `python`, replace it with the correct command for the actual environment.
---
## Retention Policy
| File Type | Retention Days | Decision Rule |
|---------|---------|---------|
| Important files | **14 days** | System paths (`/etc/`, `/root/`, `/usr/local/etc/`, `/opt/`), user config directories (`~/.ssh/`, `~/.gnupg/`, `~/.gpg/`, `~/.config/`, `~/.openclaw/`, `~/.kube/`, `~/.docker/`, `~/.aws/`, `~/.azure/`, `~/.local/share/`), Windows system directories (`C:\Windows\`, `C:\ProgramData\`, `C:\Program Files\`, `C:\Program Files (x86)\`), config files (`.yaml/.toml/.env`, etc.), entry files (`main.py/index.ts`, etc.) |
| Normal files | **7 days** | All other files |
When each record is created, the `expireAt` (expiration timestamp) and `expireAtDatetime` (human-readable time) fields are set automatically.
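As a rough illustration of how the two expiry fields relate to the retention policy, the sketch below derives them from a creation timestamp in milliseconds. `expiry_fields` is a hypothetical helper, and formatting the readable time in UTC is an assumption; the bundled script may well use local time.

```python
from datetime import datetime, timezone

RETENTION_NORMAL_DAYS = 7
RETENTION_IMPORTANT_DAYS = 14
MS_PER_DAY = 24 * 60 * 60 * 1000


def expiry_fields(created_ms: int, is_important: bool) -> dict:
    """Compute retentionDays, expireAt (ms) and expireAtDatetime for a new record."""
    days = RETENTION_IMPORTANT_DAYS if is_important else RETENTION_NORMAL_DAYS
    expire_at = created_ms + days * MS_PER_DAY
    # Assumption: UTC is used here so the output does not depend on the host timezone.
    readable = datetime.fromtimestamp(expire_at / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return {"retentionDays": days, "expireAt": expire_at, "expireAtDatetime": readable}
```

Storing both the numeric and the readable form lets cleanup compare integers while the Agent shows the user a date.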
---
## Excluded Paths (Auto-Skip)
Files whose path contains any of the following segments are **automatically skipped** by `record_modify` and `record_delete`. These are generated, vendored, or cache directories that should not be version-tracked:
`node_modules`, `.git`, `__pycache__`, `.venv`, `venv`, `.tox`, `.mypy_cache`, `.pytest_cache`, `.ruff_cache`, `dist`, `build`, `.next`, `.nuxt`, `.turbo`
When a file is skipped, the return value will contain `"skipped": true` and a `"reason"` field. The Agent should inform the user that the file was not tracked and explain why.
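The skip rule can be sketched as a check on path components. This is an assumption-laden illustration: it matches whole path segments (so `builder/` is not treated as `build`), which may differ from how the bundled script matches, and `skip_result` is a hypothetical helper that only mirrors the `skipped` / `reason` fields described above.

```python
from typing import Optional

SKIP_PATH_SEGMENTS = {
    "node_modules", ".git", "__pycache__", ".venv", "venv", ".tox",
    ".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build",
    ".next", ".nuxt", ".turbo",
}


def skip_result(file_path: str) -> Optional[dict]:
    """Return a skipped-result dict if any path component is excluded, else None."""
    # Normalize Windows separators so both path styles are split the same way.
    for segment in file_path.replace("\\", "/").split("/"):
        if segment in SKIP_PATH_SEGMENTS:
            return {"skipped": True, "reason": f"path contains excluded segment '{segment}'"}
    return None
```

The `reason` string is what the Agent would relay to the user when explaining why the file was not tracked.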
---
## Binary File Size Warning
When a binary file exceeds **50 MB**, the return value from `record_modify` (binary backup) will include:
- `"sizeWarning": true`
- `"sizeWarningMessage"`: a human-readable description of the file size and threshold
**When `sizeWarning` is present, the Agent must explicitly inform the user of the file size and ask for confirmation before proceeding with any further binary backup operations on similarly large files.**
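A minimal sketch of the size check, assuming "exceeds" means strictly greater than 50 MB. `size_warning` is a hypothetical helper and the message wording is illustrative; only the two field names come from the text.

```python
BINARY_SIZE_WARN_THRESHOLD = 50 * 1024 * 1024  # 50 MB


def size_warning(size_bytes: int) -> dict:
    """Return the warning fields for an oversized binary, or an empty dict."""
    if size_bytes <= BINARY_SIZE_WARN_THRESHOLD:
        return {}
    size_mb = size_bytes / (1024 * 1024)
    return {
        "sizeWarning": True,
        "sizeWarningMessage": f"File is {size_mb:.1f} MB, above the 50 MB threshold",
    }
```

An Agent would merge these fields into the backup result and pause for confirmation whenever `sizeWarning` is present.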
---
## Operation Flow
### Step 1: Initialize the MiniVCS Working Directory
On first use, the storage directory is created automatically. No extra action is required:
```bash
python "$SKILL_DIR/scripts/minivcs/minivcs.py" history --project-root <project_root>
```
All data is stored uniformly in `~/.openclaw/minivcs/`:
```
~/.openclaw/minivcs/
  logs.json    # operation log (includes the expireAt field)
  diffs/       # incremental patches for text file modifications
  bases/       # baseline for the next comparison (named by full relative path, so no same-name conflicts)
  snapshots/   # full snapshots before text file modifications (used for rollback)
  trash/       # full backups of deleted files (used for restore)
  backups/     # full `.bak` copies of binary files (used for rollback)
```
---
### Step 2: Ask the User for Confirmation Before the Operation
**Before deleting or modifying a file, you must explain the following to the user and wait for confirmation:**
1. The file path to be operated on
2. The operation type (modify / delete)
3. The purpose and intent of the operation
4. The possible impact
5. **Explain the local storage behavior: this Skill stores protection data under `~/.openclaw/minivcs/`**
6. **For text-file modifications, explain that MiniVCS stores diffs and snapshots so the file can be rolled back**
7. **For deletions, explain that MiniVCS stores a full local copy in `~/.openclaw/minivcs/trash/` so the file can be restored**
8. **If the file is binary, explicitly explain that protection uses a full local copy rather than a text diff, and ask whether the user wants that local copy to be stored before deletion or binary-file protection**
9. **If a binary backup returns `sizeWarning: true`, the Agent must display the file size and warning message to the user, and ask for explicit confirmation before proceeding**
If the file is binary and the user does **not** want a local stored copy, do not treat the operation as a protected delete/rollback flow under this Skill.
If the path is auto-skipped (return contains `"skipped": true`), inform the user that the file was not tracked and state the reason.
Example before the operation:
```
I am about to operate on the following file. Please confirm:
- File: /path/to/file.py
- Operation: delete
- Reason: This file has been replaced by a newer version and is no longer used
- Impact: Need to confirm that no other module imports this file
- Local storage: A full local copy will be stored under ~/.openclaw/minivcs/trash/ so the file can be restored later
- Protection: The stored copy will be retained for 7 days (14 days for important files), and can be restored at any time during that period
Do you want to continue?
```
Example for a binary file before deletion:
```
I am about to delete the following binary file. Please confirm:
- File: /path/to/file.pdf
- Operation: delete
- Reason: This file is no longer needed
- Impact: The original file will be removed from its current location
- Local storage: To protect this binary file, I need to store one full local copy under ~/.openclaw/minivcs/trash/
- Note: Binary files are protected with full copies rather than text diffs, so this may use more disk space
Do you want me to keep that local backup before deletion?
```
After the operation completes, **you must inform the user of the record result**, for example:
```
# After a modification is completed
The modification to path/to/file.py has been completed, and this change has been recorded (Record ID: 1710000000000).
- Change summary: +5 lines, -2 lines
- Retention period: 7 days (expires at: 2026-03-20 10:00:00)
- Rollback available: yes (use restore 1710000000000 to recover the state before the modification)
If you want to view the diff or roll it back, let me know.
# After a deletion is completed
path/to/file.py has been moved to the trash and backed up (Record ID: 1710000001000).
- Retention period: 14 days (expires at: 2026-03-27 10:00:00) [important file]
If you want to restore this file, let me know.
```
---
### Step 3: Operate on Files with MiniVCS
#### Modify a file (supports rollback)
**Just call `record` once after each edit**. The snapshot chain is formed automatically and supports rolling back to any historical state:
```
First time using this file → record() → base established (no snapshot, first record cannot be rolled back)
Edit → C1 → record() → snapshot=initial content, R1 rollback available
Edit → C2 → record() → snapshot=C1, R2 rollback available → restoring R2 gives C1
Edit → C3 → record() → snapshot=C2, R3 rollback available → restoring R3 gives C2 (that is, the state after the second edit)
```
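The snapshot chain above can be modeled with a toy in-memory class. This is only a sketch of the idea, not the MiniVCS implementation: `SnapshotChain` and its methods are hypothetical, and it keeps whole strings in memory instead of diffs and files on disk.

```python
from typing import List, Optional


class SnapshotChain:
    """Toy model: each record stores the content seen at the previous record() call."""

    def __init__(self) -> None:
        self.base: Optional[str] = None           # baseline for the next comparison
        self.snapshots: List[Optional[str]] = []  # one entry per record; None for the first

    def record(self, current_content: str) -> int:
        """Record the current content and return the record index."""
        self.snapshots.append(self.base)  # snapshot = content before this edit
        self.base = current_content
        return len(self.snapshots) - 1

    def can_rollback(self, record_index: int) -> bool:
        return self.snapshots[record_index] is not None

    def restore(self, record_index: int) -> str:
        """Return the content from before the recorded edit."""
        snapshot = self.snapshots[record_index]
        if snapshot is None:
            raise ValueError("the first record has no snapshot and cannot be rolled back")
        return snapshot
```

Recording the initial content and then C1, C2, C3 gives four records; restoring the last one returns C2, matching the chain described above.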
```bash
# Run once after each edit is completed
python "$SKILL_DIR/scripts/minivcs/minivcs.py" record <file_path> --project-root <project_root>
# Output containing "Rollback: available" means the snapshot has been saved and can be rolled back
```
Python API:
```python
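import sys, os
# SKILL_DIR is the directory containing this SKILL.md; replace the placeholder path
SKILL_DIR = "/path/to/skill"
sys.path.insert(0, os.path.join(SKILL_DIR, "scripts", "minivcs"))
from minivcs import MiniVCS
vcs = MiniVCS(project_root="/path/to/project")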
# Call once after each edit
result = vcs.record_modify("path/to/file.py")
# result includes:
# {
# "success": True,
# "recordId": "...",
# "summary": "+5 lines, -2 lines",
# "canRollback": True, # True when a snapshot exists; False for the first record
# "snapshotFile": "...",
# "isImportant": False,
# "retentionDays": 7,
# "due_for_cleanup": [...]
# }
```
#### Delete a file (supports restore)
**Do not delete the file directly**. Use `record_delete` instead, and the file is moved into `trash` automatically.
Before running `record_delete`:
- Tell the user that a full local copy will be stored in `~/.openclaw/minivcs/trash/`
- If the file is binary, explicitly ask whether they want that local copy to be stored
- Only proceed with protected deletion after the user confirms the local storage behavior
```bash
python "$SKILL_DIR/scripts/minivcs/minivcs.py" delete <file_path> --project-root <project_root>
```
Python API:
```python
result = vcs.record_delete("path/to/file.py")
# result includes:
# {
# "success": True,
# "recordId": "...",
# "trashFile": "~/.openclaw/minivcs/trash/1234567_file.py.bak",
# "isImportant": True,
# "retentionDays": 14,
# "due_for_cleanup": [...]
# }
```
#### Restore / Roll back
The same `restore` command handles both scenarios:
```bash
# DELETE record -> restore the file to its original path
# MODIFY record (with snapshot) -> roll back to the state before the modification
python "$SKILL_DIR/scripts/minivcs/minivcs.py" restore <record_id> --project-root <project_root>
```
Python API:
```python
result = vcs.restore_file(record_id="...")
# After DELETE succeeds: the record is marked as RESTORED and no longer appears in the trash list
# After MODIFY succeeds: the record is marked as ROLLED_BACK
# Repeated restore: returns {"success": False, "error": "already been restored/rolled_back"}
```
---
### Step 4: Handle Expired Cleanup Notifications (must be done after every operation)
After each call to `record_modify` or `record_delete`, the return value contains the `due_for_cleanup` field.
**If `due_for_cleanup` is not empty, the Agent must:**
1. Show the user the list of expired records (sorted from earliest to latest)
2. Ask which ones can be deleted and which ones need an extension
Example display format:
```
The following N historical records have expired. Please confirm whether they can be cleaned up:
[1] ID=1710000000000 File=src/old.py Action=MODIFY
Recorded at=2026-03-01 10:00:00 Expires at=2026-03-08 10:00:00
[2] ID=1710000001000 File=config.yaml Action=DELETE [important file]
Recorded at=2026-02-28 09:00:00 Expires at=2026-03-14 09:00:00
Can these records be deleted? (delete all / specify which ones to extend)
```
Handling after the user responds:
```python
# The user confirms deleting all
result = vcs.delete_due_records(record_ids=["id1", "id2"])
# The user says not to delete one record yet -> extend by one retention cycle (7 days or 14 days)
result = vcs.extend_record_expiry(record_id="id1")
```
Command line:
```bash
# View all expired records
python "$SKILL_DIR/scripts/minivcs/minivcs.py" cleanup --project-root <project_root>
# Confirm cleanup
python "$SKILL_DIR/scripts/minivcs/minivcs.py" cleanup --confirm --project-root <project_root>
# Extend one record
python "$SKILL_DIR/scripts/minivcs/minivcs.py" extend <record_id> --project-root <project_root>
```
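For the display step, a hedged sketch of how an Agent might format `due_for_cleanup` is shown below. It assumes each entry carries the fields from the Record Field Reference (`recordId`, `filePath`, `action`, `isImportant`, `datetime`, `expireAt`, `expireAtDatetime`) and that "earliest to latest" refers to expiry time, which is consistent with the example display but not stated explicitly. `format_due_records` is a hypothetical helper.

```python
from typing import Any, Dict, List


def format_due_records(due_for_cleanup: List[Dict[str, Any]]) -> str:
    """Render expired records, earliest expiry first, in the display format above."""
    ordered = sorted(due_for_cleanup, key=lambda record: record["expireAt"])
    lines = [
        f"The following {len(ordered)} historical records have expired. "
        "Please confirm whether they can be cleaned up:"
    ]
    for position, record in enumerate(ordered, start=1):
        tag = " [important file]" if record.get("isImportant") else ""
        lines.append(
            f"[{position}] ID={record['recordId']} File={record['filePath']} "
            f"Action={record['action']}{tag}"
        )
        lines.append(
            f"    Recorded at={record['datetime']} Expires at={record['expireAtDatetime']}"
        )
    return "\n".join(lines)
```

The user's answer then maps to `delete_due_records` for confirmed IDs and `extend_record_expiry` for the ones to keep.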
---
## Other Common Operations
```bash
# View operation history (records marked [rollback available] can be rolled back)
python "$SKILL_DIR/scripts/minivcs/minivcs.py" history --project-root <project_root>
python "$SKILL_DIR/scripts/minivcs/minivcs.py" history <file_path> -d --project-root <project_root>
# View files in trash that have not yet been restored
python "$SKILL_DIR/scripts/minivcs/minivcs.py" trash --project-root <project_root>
# Delete a specified record (clean both the log and the physical file)
python "$SKILL_DIR/scripts/minivcs/minivcs.py" remove <record_id> --project-root <project_root>
```
---
## Record Field Reference
| Field | Type | Description |
|------|------|------|
| `recordId` | string | Unique record ID (millisecond timestamp) |
| `filePath` | string | Relative or absolute path |
| `action` | string | `MODIFY` / `DELETE` / `BINARY_BACKUP` / `RESTORED` / `ROLLED_BACK` |
| `timestamp` | number | Creation time (milliseconds) |
| `datetime` | string | Creation time (human-readable) |
| `isImportant` | bool | Whether the file is important |
| `retentionDays` | number | Retention days (7 or 14) |
| `expireAt` | number | Expiration time (milliseconds) |
| `expireAtDatetime` | string | Expiration time (human-readable) |
| `diffFile` | string | Diff patch path (MODIFY only) |
| `snapshotFile` | string | Pre-modification snapshot path (only present when MODIFY has a snapshot) |
| `trashFile` | string | Trash backup path (DELETE only) |
| `backupFile` | string | `.bak` copy path (BINARY_BACKUP only) |
| `summary` | string | Change summary |
---
## Full Operation Flow Diagram
```
The user requests a modification/deletion
        │
        ▼
[Ask for confirmation] Explain the purpose, impact, and existing protection -> wait for user confirmation
        │
        ├─── Modify file
        │      1. Perform the actual modification
        │      2. record_modify (Diff + snapshot -> canRollback=True)
        │         snapshot = content before this modification, so
        │         historical states can be rolled back step by step
        │
        └─── Delete file
               record_delete (move to trash, do not delete directly)
        │
        ▼
Tell the user: Record ID / retention period / rollback or restore availability
        │
        ▼
Check the returned value `due_for_cleanup`
        │
        ├─── No expired records -> Operation complete
        │
        └─── Expired records exist
               Show expired records (from earliest to latest)
               Ask the user for confirmation
                 ├── Confirm deletion  -> delete_due_records
                 └── Do not delete yet -> extend_record_expiry (extend by one retention cycle)
------- When the user needs rollback/restore -------
User: "Help me restore/roll back xxx"
        │
        ▼
restore_file(record_id)
        ├── DELETE record -> restore from trash and mark as RESTORED
        └── MODIFY record (with snapshot) -> write back snapshot content and mark as ROLLED_BACK
```
FILE:file_guardian.DEPENDENCIES.md
# file-guardian dependency notes
## Python dependencies
| Dependency | Source | Purpose |
|------|------|------|
| os | standard library | Path handling, directory creation, environment paths |
| sys | standard library | Platform detection, CLI entry |
| json | standard library | Read/write MiniVCS log records |
| time | standard library | Record IDs and timestamps |
| shutil | standard library | Move/copy files for trash and backup flows |
| difflib | standard library | Generate text diffs |
| argparse | standard library | CLI argument parsing |
| datetime | standard library | Human-readable timestamps |
| typing | standard library | Type annotations |
Only the Python standard library is used. There are no `pip` dependencies and no project-internal Python dependencies.
## External runtime requirement
| Dependency | Version | Purpose |
|------|------|------|
| Python 3 | required | Run `file-guardian/scripts/minivcs/minivcs.py` |
Notes:
- On macOS/Linux, the command is typically `python3`
- On some Windows environments, the command may be `python`
- This dependency file documents the runtime requirement only. It does not authorize installation or shell-environment changes from within the Skill
## Bundled files
| File | Purpose |
|------|------|
| `file-guardian/SKILL.md` | Skill instructions and operating constraints |
| `file-guardian/scripts/minivcs/minivcs.py` | Local MiniVCS implementation used by the Skill |
## Local storage behavior
When protection is used, MiniVCS stores local data under `~/.openclaw/minivcs/`, including:
- `logs.json`
- `diffs/`
- `bases/`
- `snapshots/`
- `trash/`
- `backups/`
This is local runtime data, not an additional package dependency.
FILE:scripts/minivcs/minivcs.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MiniVCS Skill - lightweight file version management system
- Modify: save an incremental diff plus a full pre-edit snapshot (supports rollback)
- Delete: save the full file to trash (supports restore)
Retention policy:
- Normal files: 7 days
- Important files: 14 days (system paths, config files, entry files, etc.)
After each record operation, the full record table is scanned automatically
(from earliest to latest), and a list of expired records that can be cleaned
up is returned for the Agent to confirm with the user.
"""
import os
import sys
import json
import time
import shutil
import difflib
import argparse
from datetime import datetime
from typing import List, Dict, Any, Optional
# -------------------------------------------------------------------
# Important file detection rules
# -------------------------------------------------------------------
# Absolute system paths that always qualify as important
IMPORTANT_PATH_PREFIXES_ABSOLUTE = [
"/etc/",
"/root/",
"/usr/local/etc/",
"/opt/",
]
# User-home-relative directories that qualify as important (expanded at runtime).
# Only specific config/credential dirs -- NOT the entire home directory.
IMPORTANT_HOME_SUBDIRS = [
".ssh",
".gnupg",
".gpg",
".config",
".local/share",
".openclaw",
".kube",
".docker",
".aws",
".azure",
]
# Windows system directories (matched case-insensitively)
IMPORTANT_WINDOWS_PREFIXES = [
"c:\\windows\\",
"c:\\programdata\\",
"c:\\program files\\",
"c:\\program files (x86)\\",
]
IMPORTANT_FILENAME_PATTERNS = [
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".env",
"main.py",
"app.py",
"server.py",
"wsgi.py",
"asgi.py",
"index.ts",
"index.js",
"main.ts",
"main.go",
"main.rs",
"manage.py",
"settings.py",
"config.py",
"dockerfile",
"docker-compose.yml",
"docker-compose.yaml",
"makefile",
"cmakelists.txt",
]
# Paths containing any of these segments are auto-skipped (generated / vendored content)
SKIP_PATH_SEGMENTS = [
"node_modules",
".git",
"__pycache__",
".venv",
"venv",
".tox",
".mypy_cache",
".pytest_cache",
".ruff_cache",
"dist",
"build",
".next",
".nuxt",
".turbo",
]
RETENTION_NORMAL_DAYS = 7
RETENTION_IMPORTANT_DAYS = 14
BINARY_SIZE_WARN_THRESHOLD = 50 * 1024 * 1024 # 50 MB
def _build_important_home_prefixes() -> List[str]:
    """Expand IMPORTANT_HOME_SUBDIRS into absolute prefixes for the current user."""
    home = os.path.expanduser("~")
    return [os.path.join(home, d) + os.sep for d in IMPORTANT_HOME_SUBDIRS]
def is_important_file(file_path: str) -> bool:
"""Determine whether a file is important and needs a longer retention period."""
abs_path = os.path.abspath(file_path)
# System path prefixes (Unix)
for prefix in IMPORTANT_PATH_PREFIXES_ABSOLUTE:
if abs_path.startswith(prefix):
return True
# Specific user config/credential directories under home
for prefix in _build_important_home_prefixes():
if abs_path.startswith(prefix):
return True
# Windows system directories (case-insensitive)
if sys.platform == "win32":
lower = abs_path.lower().replace("/", "\\")
for prefix in IMPORTANT_WINDOWS_PREFIXES:
if lower.startswith(prefix):
return True
# Filename patterns (lowercase match)
filename = os.path.basename(abs_path).lower()
for pattern in IMPORTANT_FILENAME_PATTERNS:
if filename == pattern or filename.endswith(pattern):
return True
return False
def should_skip_path(file_path: str) -> Optional[str]:
"""Return the matched skip segment if the path should be auto-skipped, else None."""
normalized = os.path.abspath(file_path).replace("\\", "/")
parts = normalized.split("/")
for segment in SKIP_PATH_SEGMENTS:
if segment in parts:
return segment
return None
def get_retention_days(file_path: str) -> int:
"""Return retention days based on file importance."""
return RETENTION_IMPORTANT_DAYS if is_important_file(file_path) else RETENTION_NORMAL_DAYS
def _make_safe_path(relative_path: str) -> str:
"""
Convert a relative path into a safe string that can be used as a filename.
Use the full relative path rather than just the filename to avoid conflicts
between files with the same name.
"""
return relative_path.replace(os.sep, "__").replace("/", "__")
# -------------------------------------------------------------------
# LogManager
# -------------------------------------------------------------------
class LogManager:
"""Log manager responsible for reading and writing JSON change records."""
def __init__(self, vcs_root: str):
self.vcs_root = vcs_root
self.log_file = os.path.join(vcs_root, "logs.json")
self._ensure_log_file()
def _ensure_log_file(self):
if not os.path.exists(self.log_file):
os.makedirs(os.path.dirname(self.log_file), exist_ok=True)
with open(self.log_file, "w", encoding="utf-8") as f:
json.dump({"version": "1.0", "records": []}, f, ensure_ascii=False, indent=2)
def _read_log(self) -> Dict[str, Any]:
try:
with open(self.log_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return {"version": "1.0", "records": []}
def _write_log(self, data: Dict[str, Any]):
# Atomic write: write a temp file first, then rename it to avoid corrupting logs.json on crash
tmp_file = self.log_file + ".tmp"
with open(tmp_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp_file, self.log_file)
def append_record(self, record: Dict[str, Any]) -> str:
data = self._read_log()
record_id = record.get("recordId", f"{int(time.time() * 1000)}")
record["recordId"] = record_id
now_ms = int(time.time() * 1000)
if "timestamp" not in record:
record["timestamp"] = now_ms
if "datetime" not in record:
record["datetime"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# If expireAt is not set, calculate it automatically from retentionDays
if "expireAt" not in record:
retention_days = record.get("retentionDays", RETENTION_NORMAL_DAYS)
record["expireAt"] = record["timestamp"] + retention_days * 24 * 60 * 60 * 1000
if "expireAtDatetime" not in record:
record["expireAtDatetime"] = datetime.fromtimestamp(record["expireAt"] / 1000).strftime("%Y-%m-%d %H:%M:%S")
data["records"].append(record)
self._write_log(data)
return record_id
def update_record(self, record_id: str, updates: Dict[str, Any]) -> bool:
"""Update fields on the specified record."""
data = self._read_log()
for record in data["records"]:
if record.get("recordId") == record_id:
record.update(updates)
self._write_log(data)
return True
return False
def get_history_by_file(self, file_path: str, descending: bool = True) -> List[Dict[str, Any]]:
data = self._read_log()
records = [r for r in data["records"] if r.get("filePath") == file_path]
records.sort(key=lambda x: x.get("timestamp", 0), reverse=descending)
return records
def get_all_history(self, descending: bool = True) -> List[Dict[str, Any]]:
data = self._read_log()
records = list(data["records"])
records.sort(key=lambda x: x.get("timestamp", 0), reverse=descending)
return records
def get_due_for_cleanup(self) -> List[Dict[str, Any]]:
"""
Return expired records (expireAt <= current time), sorted by expireAt
from earliest to latest. Skip records that were already restored or rolled back.
"""
data = self._read_log()
current_time = int(time.time() * 1000)
skip_actions = {"RESTORED", "ROLLED_BACK"}
due = [
r
for r in data["records"]
if r.get("expireAt") is not None and r["expireAt"] <= current_time and r.get("action") not in skip_actions
]
due.sort(key=lambda x: x.get("expireAt", 0))
return due
def extend_record_expiry(self, record_id: str, additional_ms: int) -> bool:
"""Extend expireAt on the specified record by additional_ms milliseconds."""
data = self._read_log()
for record in data["records"]:
if record.get("recordId") == record_id:
record["expireAt"] = record.get("expireAt", int(time.time() * 1000)) + additional_ms
record["expireAtDatetime"] = datetime.fromtimestamp(record["expireAt"] / 1000).strftime("%Y-%m-%d %H:%M:%S")
self._write_log(data)
return True
return False
def get_expired_records(self, days: int) -> List[Dict[str, Any]]:
"""Compatibility for the old interface: query expired records by fixed days."""
data = self._read_log()
current_time = int(time.time() * 1000)
expire_ms = days * 24 * 60 * 60 * 1000
return [r for r in data["records"] if current_time - r.get("timestamp", 0) > expire_ms]
def delete_record(self, record_id: str) -> bool:
data = self._read_log()
original_count = len(data["records"])
data["records"] = [r for r in data["records"] if r.get("recordId") != record_id]
if len(data["records"]) < original_count:
self._write_log(data)
return True
return False
def clean_expired_records(self, days: int) -> Dict[str, Any]:
expired = self.get_expired_records(days)
deleted_count = sum(1 for r in expired if self.delete_record(r.get("recordId", "")))
return {"deleted_count": deleted_count, "deleted_records": expired}
def get_record_by_id(self, record_id: str) -> Optional[Dict[str, Any]]:
data = self._read_log()
for record in data["records"]:
if record.get("recordId") == record_id:
return record
return None
# -------------------------------------------------------------------
# DiffEngine
# -------------------------------------------------------------------
class DiffEngine:
"""Diff engine that generates file change differences."""
@staticmethod
def generate_diff(old_content: str, new_content: str, context: int = 3) -> str:
old_lines = old_content.splitlines(keepends=True)
new_lines = new_content.splitlines(keepends=True)
diff = difflib.unified_diff(
old_lines,
new_lines,
fromfile="original",
tofile="modified",
lineterm="\n",
n=context,
)
return "".join(diff)
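    @staticmethod
    def parse_diff_summary(diff_text: str) -> Dict[str, Any]:
        """Count added and removed lines in a unified diff."""
        added = removed = 0
        in_hunk = False
        for line in diff_text.splitlines():
            # The "---"/"+++" file headers appear only before the first "@@" hunk header.
            # Counting only inside hunks keeps content lines that themselves begin
            # with "++" or "--" from being mistaken for headers.
            if line.startswith("@@"):
                in_hunk = True
                continue
            if not in_hunk:
                continue
            if line.startswith("+"):
                added += 1
            elif line.startswith("-"):
                removed += 1
        return {
            "added": added,
            "removed": removed,
            "summary": f"+{added} lines, -{removed} lines" if added or removed else "No changes",
        }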
# -------------------------------------------------------------------
# FileManager
# -------------------------------------------------------------------
class FileManager:
"""File manager responsible for physical file operations."""
def __init__(self, vcs_root: str, project_root: str):
self.vcs_root = vcs_root
self.project_root = project_root
self.trash_dir = os.path.join(vcs_root, "trash")
self.diffs_dir = os.path.join(vcs_root, "diffs")
self.bases_dir = os.path.join(vcs_root, "bases")
self.snapshots_dir = os.path.join(vcs_root, "snapshots") # Full pre-edit snapshots for text files, used for rollback
self.backups_dir = os.path.join(vcs_root, "backups") # Full-copy backups for binary files
os.makedirs(self.trash_dir, exist_ok=True)
os.makedirs(self.diffs_dir, exist_ok=True)
os.makedirs(self.bases_dir, exist_ok=True)
os.makedirs(self.snapshots_dir, exist_ok=True)
os.makedirs(self.backups_dir, exist_ok=True)
def get_file_content(self, file_path: str) -> Optional[str]:
if not os.path.exists(file_path):
return None
try:
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
try:
with open(file_path, "rb") as f:
return f.read().decode("utf-8", errors="ignore")
except Exception:
return None
def is_binary_file(self, file_path: str) -> bool:
"""Detect whether a file is binary using a simple heuristic: read the first 1KB and check for NUL bytes."""
try:
with open(file_path, "rb") as f:
chunk = f.read(1024)
return b"\x00" in chunk
except Exception:
return False
def save_to_trash(self, file_path: str) -> str:
"""Move a deleted file into the trash directory while preserving its full content."""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
timestamp = int(time.time() * 1000)
filename = os.path.basename(file_path)
trash_filename = f"{timestamp}_{filename}.bak"
trash_path = os.path.join(self.trash_dir, trash_filename)
shutil.move(file_path, trash_path)
return trash_path
def restore_from_trash(self, trash_path: str, target_path: str) -> bool:
"""Restore a file from the trash directory (move; the original backup disappears)."""
try:
if not os.path.exists(trash_path):
return False
os.makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.move(trash_path, target_path)
return True
except Exception:
return False
def save_binary_backup(self, file_path: str) -> str:
"""Copy a binary file in full to the backups directory, saved with a .bak extension, without deleting the original file."""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
timestamp = int(time.time() * 1000)
filename = os.path.basename(file_path)
backup_filename = f"{timestamp}_{filename}.bak"
backup_path = os.path.join(self.backups_dir, backup_filename)
shutil.copy2(file_path, backup_path)
return backup_path
def restore_binary_backup(self, backup_path: str, target_path: str) -> bool:
"""Copy a .bak file from the backups directory back to the original path, overwriting the current file while keeping the backup."""
try:
if not os.path.exists(backup_path):
return False
os.makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.copy2(backup_path, target_path)
return True
except Exception:
return False
def save_diff(self, relative_path: str, diff_content: str) -> str:
"""Save a diff patch into the diffs directory, named by full relative path to avoid same-name conflicts."""
timestamp = int(time.time() * 1000)
safe_path = _make_safe_path(relative_path)
diff_path = os.path.join(self.diffs_dir, f"{timestamp}_{safe_path}.patch")
with open(diff_path, "w", encoding="utf-8") as f:
f.write(diff_content)
return diff_path
def save_snapshot(self, relative_path: str, content: str) -> str:
"""Save a full snapshot from before file modification for rollback."""
timestamp = int(time.time() * 1000)
safe_path = _make_safe_path(relative_path)
snap_path = os.path.join(self.snapshots_dir, f"{timestamp}_{safe_path}.snap")
with open(snap_path, "w", encoding="utf-8") as f:
f.write(content)
return snap_path
def save_base(self, relative_path: str, content: str) -> str:
"""Save current content as the baseline for the next comparison, named by full relative path."""
safe_path = _make_safe_path(relative_path)
base_path = os.path.join(self.bases_dir, f"{safe_path}.base")
with open(base_path, "w", encoding="utf-8") as f:
f.write(content)
return base_path
def get_base(self, relative_path: str) -> Optional[str]:
"""Get the baseline content from the previous record."""
safe_path = _make_safe_path(relative_path)
base_path = os.path.join(self.bases_dir, f"{safe_path}.base")
return self.get_file_content(base_path)
def delete_base(self, relative_path: str) -> bool:
"""Delete the baseline file."""
safe_path = _make_safe_path(relative_path)
base_path = os.path.join(self.bases_dir, f"{safe_path}.base")
if os.path.exists(base_path):
os.remove(base_path)
return True
return False
# -------------------------------------------------------------------
# MiniVCS
# -------------------------------------------------------------------
class MiniVCS:
"""MiniVCS core controller."""
def __init__(self, project_root: str, vcs_root: Optional[str] = None):
self.project_root = os.path.abspath(project_root)
# By default, store data in ~/.openclaw/minivcs/ so it is decoupled from the project directory
if vcs_root is not None:
self.vcs_root = os.path.abspath(vcs_root)
else:
self.vcs_root = os.path.join(os.path.expanduser("~"), ".openclaw", "minivcs")
self.log_manager = LogManager(self.vcs_root)
self.diff_engine = DiffEngine()
self.file_manager = FileManager(self.vcs_root, self.project_root)
def _get_relative_path(self, absolute_path: str) -> str:
if absolute_path.startswith(self.project_root + os.sep):
return absolute_path[len(self.project_root) + 1 :]
return absolute_path
# ------------------------------------------------------------------
# Core operations
# ------------------------------------------------------------------
    def record_modify(self, file_path: str) -> Dict[str, Any]:
        """
        Record a file modification (incremental diff + pre-edit snapshot).
        Call it once after each edit; there is no need to call it both before and after:
        - First call (no base): save current content as the baseline; this record has no snapshot
        - Later calls (with base): diff = base -> current, snapshot = base (that is, the pre-edit content), then update base
        After the call, a list of expired records pending cleanup is returned automatically.
        Snapshot chain example (call once after each edit):
          Initial C0 -> record() -> base=C0
          Edit C1 -> record() -> snapshot=C0, base=C1 (R1, can roll back to C0)
          Edit C2 -> record() -> snapshot=C1, base=C2 (R2, can roll back to C1 = state at t1)
          Edit C3 -> record() -> snapshot=C2, base=C3 (R3, can roll back to C2 = state at t2)
        """
        abs_path = os.path.abspath(file_path)
        if not os.path.exists(abs_path):
            return {"success": False, "error": f"File not found: {file_path}"}
        skip_segment = should_skip_path(abs_path)
        if skip_segment:
            return {
                "success": False,
                "skipped": True,
                "reason": f"Path contains '{skip_segment}' which is in the auto-skip list",
            }
        # Binary files: cannot generate diffs, so use a full-copy backup instead (.bak copy, original file kept)
        if self.file_manager.is_binary_file(abs_path):
            return self._record_binary_backup(abs_path)
        try:
            new_content = self.file_manager.get_file_content(abs_path)
            if new_content is None:
                return {"success": False, "error": "Cannot read file content"}
            relative_path = self._get_relative_path(abs_path)
            prev_base = self.file_manager.get_base(relative_path)
            # Track baseline existence explicitly: an empty baseline file is still a baseline
            has_base = prev_base is not None
            old_content = prev_base if prev_base is not None else ""
            # Compare content directly instead of relying on diff line counts
            if has_base and old_content == new_content:
                due = self.query_due_for_cleanup()
                return {"success": True, "message": "No changes detected", "skipped": True, "due_for_cleanup": due}
            diff_text = self.diff_engine.generate_diff(old_content, new_content)
            diff_summary = self.diff_engine.parse_diff_summary(diff_text)
            diff_path = self.file_manager.save_diff(relative_path, diff_text)
            # When a baseline exists, old_content is the pre-edit content; save it as a snapshot for rollback
            snap_path = None
            if has_base:
                snap_path = self.file_manager.save_snapshot(relative_path, old_content)
            # Update the baseline to the current content
            self.file_manager.save_base(relative_path, new_content)
            retention_days = get_retention_days(abs_path)
            important = is_important_file(abs_path)
            record: Dict[str, Any] = {
                "filePath": relative_path,
                "action": "MODIFY",
                "diffFile": diff_path,
                "summary": diff_summary["summary"],
                "linesAdded": diff_summary["added"],
                "linesRemoved": diff_summary["removed"],
                "isImportant": important,
                "retentionDays": retention_days,
            }
            if snap_path:
                record["snapshotFile"] = snap_path
            record_id = self.log_manager.append_record(record)
            due = self.query_due_for_cleanup()
            return {
                "success": True,
                "recordId": record_id,
                "summary": diff_summary["summary"],
                "diffFile": diff_path,
                "snapshotFile": snap_path,
                "canRollback": snap_path is not None,
                "isImportant": important,
                "retentionDays": retention_days,
                "due_for_cleanup": due,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
def _record_binary_backup(self, abs_path: str) -> Dict[str, Any]:
"""Binary file backup: copy the current file in full as a .bak, keep the original file, and support later restoration."""
try:
relative_path = self._get_relative_path(abs_path)
important = is_important_file(abs_path)
retention_days = get_retention_days(abs_path)
file_size = os.path.getsize(abs_path)
size_warning = file_size > BINARY_SIZE_WARN_THRESHOLD
backup_path = self.file_manager.save_binary_backup(abs_path)
record = {
"filePath": relative_path,
"action": "BINARY_BACKUP",
"backupFile": backup_path,
"summary": f"Binary file backup ({file_size} bytes)",
"isImportant": important,
"retentionDays": retention_days,
}
record_id = self.log_manager.append_record(record)
due = self.query_due_for_cleanup()
result: Dict[str, Any] = {
"success": True,
"recordId": record_id,
"backupFile": backup_path,
"summary": record["summary"],
"canRollback": True,
"isImportant": important,
"retentionDays": retention_days,
"fileSize": file_size,
"due_for_cleanup": due,
}
if size_warning:
result["sizeWarning"] = True
result["sizeWarningMessage"] = (
f"Binary file is {file_size / 1024 / 1024:.1f} MB, "
f"exceeding the {BINARY_SIZE_WARN_THRESHOLD / 1024 / 1024:.0f} MB warning threshold"
)
return result
except Exception as e:
return {"success": False, "error": str(e)}
def record_delete(self, file_path: str) -> Dict[str, Any]:
"""
Record a file deletion by moving the file into trash as a full local copy.
After the call, a list of expired records pending cleanup is returned automatically.
"""
abs_path = os.path.abspath(file_path)
if not os.path.exists(abs_path):
return {"success": False, "error": f"File not found: {file_path}"}
skip_segment = should_skip_path(abs_path)
if skip_segment:
return {
"success": False,
"skipped": True,
"reason": f"Path contains '{skip_segment}' which is in the auto-skip list",
}
try:
relative_path = self._get_relative_path(abs_path)
important = is_important_file(abs_path)
retention_days = get_retention_days(abs_path)
trash_path = self.file_manager.save_to_trash(abs_path)
record = {
"filePath": relative_path,
"action": "DELETE",
"trashFile": trash_path,
"summary": "File moved to trash as a local restore copy",
"isImportant": important,
"retentionDays": retention_days,
}
record_id = self.log_manager.append_record(record)
due = self.query_due_for_cleanup()
return {
"success": True,
"recordId": record_id,
"trashFile": trash_path,
"message": "File deleted and stored in trash as a local restore copy",
"isImportant": important,
"retentionDays": retention_days,
"due_for_cleanup": due,
}
except Exception as e:
return {"success": False, "error": str(e)}
# ------------------------------------------------------------------
# Restore / rollback
# ------------------------------------------------------------------
def restore_file(self, record_id: str) -> Dict[str, Any]:
"""
Restore a file:
- DELETE record: restore the full file from trash
- MODIFY record: roll back to the pre-edit state using the pre-edit snapshot
After success, mark the record as RESTORED / ROLLED_BACK.
"""
record = self.log_manager.get_record_by_id(record_id)
if not record:
return {"success": False, "error": "Record not found"}
action = record.get("action")
if action == "DELETE":
trash_file = record.get("trashFile")
if not trash_file or not os.path.exists(trash_file):
return {"success": False, "error": "Backup file not found (may have been cleaned up)"}
# filePath may be relative or absolute
file_path = record.get("filePath", "")
target_path = file_path if os.path.isabs(file_path) else os.path.join(self.project_root, file_path)
success = self.file_manager.restore_from_trash(trash_file, target_path)
if success:
# Mark the record as restored to prevent duplicate restore or appearance in the trash list
self.log_manager.update_record(
record_id,
{
"action": "RESTORED",
"restoredAt": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"trashFile": None,
},
)
return {"success": True, "message": f"File restored to: {target_path}"}
return {"success": False, "error": "Failed to restore file from trash"}
elif action == "MODIFY":
snap_file = record.get("snapshotFile")
            if not snap_file or not os.path.exists(snap_file):
                return {
                    "success": False,
                    "error": (
                        "No pre-edit snapshot is available for this record. "
                        "The first record_modify call for a file only saves a baseline, "
                        "or the snapshot file may no longer exist on disk."
                    ),
                }
content = self.file_manager.get_file_content(snap_file)
if content is None:
return {"success": False, "error": "Cannot read snapshot file"}
file_path = record.get("filePath", "")
target_path = file_path if os.path.isabs(file_path) else os.path.join(self.project_root, file_path)
os.makedirs(os.path.dirname(target_path), exist_ok=True)
with open(target_path, "w", encoding="utf-8") as f:
f.write(content)
self.log_manager.update_record(
record_id,
{
"action": "ROLLED_BACK",
"rolledBackAt": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
)
return {"success": True, "message": f"File rolled back to pre-edit state: {target_path}"}
elif action == "BINARY_BACKUP":
backup_file = record.get("backupFile")
if not backup_file or not os.path.exists(backup_file):
return {"success": False, "error": "Backup file not found (may have been cleaned up)"}
file_path = record.get("filePath", "")
target_path = file_path if os.path.isabs(file_path) else os.path.join(self.project_root, file_path)
success = self.file_manager.restore_binary_backup(backup_file, target_path)
if success:
self.log_manager.update_record(
record_id,
{
"action": "ROLLED_BACK",
"rolledBackAt": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
},
)
return {"success": True, "message": f"Binary file restored to: {target_path}"}
return {"success": False, "error": "Failed to restore binary backup"}
elif action in ("RESTORED", "ROLLED_BACK"):
return {"success": False, "error": f"This record has already been {action.lower()}"}
return {"success": False, "error": f"Unknown action: {action}"}
# ------------------------------------------------------------------
# Expired cleanup management
# ------------------------------------------------------------------
def query_due_for_cleanup(self) -> List[Dict[str, Any]]:
"""
Scan the full record table and return records whose expireAt <= current time,
ordered from earliest to latest. Restored/rolled-back records are excluded.
"""
return self.log_manager.get_due_for_cleanup()
def extend_record_expiry(self, record_id: str) -> Dict[str, Any]:
"""Extend a record's expiration time by one retention cycle (retentionDays days)."""
record = self.log_manager.get_record_by_id(record_id)
if not record:
return {"success": False, "error": "Record not found"}
retention_days = record.get("retentionDays", RETENTION_NORMAL_DAYS)
additional_ms = retention_days * 24 * 60 * 60 * 1000
ok = self.log_manager.extend_record_expiry(record_id, additional_ms)
if ok:
updated = self.log_manager.get_record_by_id(record_id)
if updated is None:
return {"success": False, "error": "Failed to reload updated record"}
return {
"success": True,
"recordId": record_id,
"newExpireAt": updated.get("expireAt"),
"newExpireAtDatetime": updated.get("expireAtDatetime"),
"message": f"Expiry extended by {retention_days} days",
}
return {"success": False, "error": "Failed to extend expiry"}
def delete_due_records(self, record_ids: List[str]) -> Dict[str, Any]:
"""Batch-delete expired records that the user has confirmed can be cleaned up (log + physical files)."""
deleted_records = []
failed_records = []
deleted_files: List[str] = []
for record_id in record_ids:
result = self.delete_record_by_id(record_id, delete_files=True)
if result.get("success"):
deleted_records.append(record_id)
deleted_files.extend(result.get("deleted_files", []))
else:
failed_records.append({"recordId": record_id, "error": result.get("error")})
return {
"success": True,
"deleted_count": len(deleted_records),
"deleted_records": deleted_records,
"failed_records": failed_records,
"deleted_files": deleted_files,
}
# ------------------------------------------------------------------
# Query / history
# ------------------------------------------------------------------
def get_history(self, file_path: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
if file_path:
abs_path = os.path.abspath(file_path)
relative_path = self._get_relative_path(abs_path)
history = self.log_manager.get_history_by_file(relative_path)
else:
history = self.log_manager.get_all_history()
return history[:limit]
def get_file_diff(self, record_id: str) -> Optional[str]:
record = self.log_manager.get_record_by_id(record_id)
if not record or record.get("action") != "MODIFY":
return None
diff_file = record.get("diffFile")
if not diff_file or not os.path.exists(diff_file):
return None
return self.file_manager.get_file_content(diff_file)
def query_expired(self, days: int) -> List[Dict[str, Any]]:
"""Compatibility for the old interface: query expired records by fixed days."""
return self.log_manager.get_expired_records(days)
    def clean_expired(self, days: int, delete_files: bool = False) -> Dict[str, Any]:
        """Kept for compatibility with the old interface: clean expired records."""
        expired = self.log_manager.get_expired_records(days)
        deleted_files = []
        if delete_files:
            # Remove every stored file the record points to, matching delete_record_by_id,
            # so that no orphaned snapshots, trash copies, or backups are left behind
            for record in expired:
                for key in ("diffFile", "snapshotFile", "trashFile", "backupFile"):
                    path = record.get(key)
                    if path and os.path.exists(path):
                        os.remove(path)
                        deleted_files.append(path)
        result = self.log_manager.clean_expired_records(days)
        result["deleted_files"] = deleted_files
        return result
def list_trash(self) -> List[Dict[str, Any]]:
"""List deleted files in trash that have not yet been restored."""
records = self.log_manager.get_all_history()
return [
{
"recordId": r.get("recordId"),
"filePath": r.get("filePath"),
"datetime": r.get("datetime"),
"trashFile": r.get("trashFile"),
"isImportant": r.get("isImportant", False),
"retentionDays": r.get("retentionDays", RETENTION_NORMAL_DAYS),
"expireAtDatetime": r.get("expireAtDatetime"),
}
for r in records
if r.get("action") == "DELETE"
]
def delete_record_by_id(self, record_id: str, delete_files: bool = True) -> Dict[str, Any]:
"""Delete the specified record (log + physical files)."""
record = self.log_manager.get_record_by_id(record_id)
if not record:
return {"success": False, "error": "Record not found"}
deleted_files = []
if delete_files:
# Delete diff file
diff_file = record.get("diffFile")
if diff_file and os.path.exists(diff_file):
os.remove(diff_file)
deleted_files.append(diff_file)
# Delete trash backup
trash_file = record.get("trashFile")
if trash_file and os.path.exists(trash_file):
os.remove(trash_file)
deleted_files.append(trash_file)
# Delete snapshot file
snap_file = record.get("snapshotFile")
if snap_file and os.path.exists(snap_file):
os.remove(snap_file)
deleted_files.append(snap_file)
# Delete binary backup file
backup_file = record.get("backupFile")
if backup_file and os.path.exists(backup_file):
os.remove(backup_file)
deleted_files.append(backup_file)
# Delete the base file only when no other MODIFY records exist for this file
file_path = record.get("filePath")
if file_path and record.get("action") == "MODIFY":
remaining = [
r
for r in self.log_manager.get_history_by_file(file_path)
if r.get("action") == "MODIFY" and r.get("recordId") != record_id
]
if not remaining:
self.file_manager.delete_base(file_path)
self.log_manager.delete_record(record_id)
return {
"success": True,
"deleted_record": record_id,
"deleted_files": deleted_files,
"message": f"Record {record_id} deleted",
}
def merge_modify(self, file_path: str, keep_record_id: Optional[str] = None) -> Dict[str, Any]:
"""Merge multiple modification records for the same file, keeping the latest one by default."""
abs_path = os.path.abspath(file_path)
relative_path = self._get_relative_path(abs_path)
records = self.log_manager.get_history_by_file(relative_path, descending=True)
modify_records = [r for r in records if r.get("action") == "MODIFY"]
if len(modify_records) <= 1:
return {"success": False, "error": "No need to merge (only one or zero record)"}
if keep_record_id:
keep_record = next((r for r in modify_records if r.get("recordId") == keep_record_id), None)
if not keep_record:
return {"success": False, "error": "Record not found"}
else:
keep_record = modify_records[0]
deleted_count = 0
deleted_files = []
for r in modify_records:
if r.get("recordId") == keep_record.get("recordId"):
continue
rid = r.get("recordId")
if isinstance(rid, str) and self.log_manager.delete_record(rid):
deleted_count += 1
for fkey in ("diffFile", "snapshotFile"):
f = r.get(fkey)
if f and os.path.exists(f):
os.remove(f)
deleted_files.append(f)
        return {
            "success": True,
            "kept_record": keep_record.get("recordId"),
            "deleted_count": deleted_count,
            "deleted_files": deleted_files,
            # The kept record is the latest only when keep_record_id was not supplied
            "message": f"Merged {deleted_count} records, kept: {keep_record.get('recordId')}",
        }
# -------------------------------------------------------------------
# CLI output helpers
# -------------------------------------------------------------------
def print_records(records: list, show_diff: bool = False, vcs: Optional[MiniVCS] = None):
if not records:
print("No records found.")
return
for i, record in enumerate(records):
important_tag = " [IMPORTANT]" if record.get("isImportant") else ""
action = record.get("action", "")
rollback_tag = " [rollback available]" if action == "MODIFY" and record.get("snapshotFile") else ""
print(f"\n[{i + 1}] Record ID: {record.get('recordId')}")
print(f" File : {record.get('filePath')}{important_tag}")
print(f" Action : {action}{rollback_tag}")
print(f" Time : {record.get('datetime')}")
print(f" Summary : {record.get('summary')}")
print(f" Expires : {record.get('expireAtDatetime', 'N/A')} (retention {record.get('retentionDays', '?')}d)")
if show_diff and vcs and action == "MODIFY":
diff_content = vcs.get_file_diff(record.get("recordId"))
if diff_content:
print("\n Diff Content:")
print(" " + "-" * 40)
for line in diff_content.splitlines()[:30]:
print(f" {line}")
def print_due_for_cleanup(due: list):
if not due:
return
print(f"\n{'=' * 50}")
print(f" [CLEANUP NOTICE] {len(due)} record(s) have expired and are pending cleanup:")
for i, r in enumerate(due):
important_tag = " [IMPORTANT]" if r.get("isImportant") else ""
print(f" [{i + 1}] ID={r.get('recordId')} File={r.get('filePath')}{important_tag}")
print(f" Action={r.get('action')} Created={r.get('datetime')} Expired={r.get('expireAtDatetime')}")
print(" Use 'cleanup --confirm' to delete, or 'extend <record_id>' to postpone.")
print(f"{'=' * 50}")
# -------------------------------------------------------------------
# CLI entry
# -------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="MiniVCS - lightweight file version management system")
parser.add_argument(
"command",
choices=["record", "delete", "history", "restore", "expired", "trash", "remove", "cleanup", "extend"],
help="Command",
)
parser.add_argument("file", nargs="?", help="File path or Record ID")
parser.add_argument("-n", "--limit", type=int, default=10, help="Record count limit")
parser.add_argument("-d", "--show-diff", action="store_true", help="Show diff content")
parser.add_argument("-c", "--clean", action="store_true", help="Clean expired records (used with the expired command)")
parser.add_argument("--delete-files", action="store_true", help="Delete physical files as well")
parser.add_argument("--confirm", action="store_true", help="Confirm cleanup execution (used with the cleanup command)")
parser.add_argument("--project-root", default=os.getcwd(), help="Project root directory (used to compute relative paths)")
parser.add_argument("--vcs-root", default=None, help="VCS data storage directory (default: ~/.openclaw/minivcs)")
args = parser.parse_args()
try:
vcs = MiniVCS(args.project_root, vcs_root=args.vcs_root)
except Exception as e:
print(f"Error: Failed to initialize MiniVCS: {e}")
return 1
if args.command == "record":
if not args.file:
print("Error: Please specify a file to record")
return 1
if not os.path.exists(args.file):
print(f"Error: File not found: {args.file}")
return 1
result = vcs.record_modify(args.file)
if result.get("skipped") and not result.get("success"):
print(f"~ Skipped: {result.get('reason')}")
elif result.get("success"):
if result.get("skipped"):
print("~ No changes detected, skipped.")
elif result.get("action") == "BINARY_BACKUP" or result.get("backupFile"):
print("✓ Binary file backed up! (.bak copy saved)")
print(f" Record ID : {result.get('recordId')}")
print(f" Backup : {result.get('backupFile')}")
print(f" Summary : {result.get('summary')}")
important_tag = " [IMPORTANT]" if result.get("isImportant") else ""
print(f" Retention : {result.get('retentionDays')} days{important_tag}")
print(f" Rollback : available (use restore {result.get('recordId')})")
if result.get("sizeWarning"):
print(f" ⚠ WARNING : {result.get('sizeWarningMessage')}")
else:
print("✓ File change recorded!")
print(f" Record ID : {result.get('recordId')}")
print(f" Summary : {result.get('summary')}")
important_tag = " [IMPORTANT]" if result.get("isImportant") else ""
print(f" Retention : {result.get('retentionDays')} days{important_tag}")
if result.get("canRollback"):
print(f" Rollback : available (use restore {result.get('recordId')})")
print_due_for_cleanup(result.get("due_for_cleanup", []))
else:
print(f"✗ Failed: {result.get('error')}")
return 1
elif args.command == "delete":
if not args.file:
print("Error: Please specify a file to delete")
return 1
if not os.path.exists(args.file):
print(f"Error: File not found: {args.file}")
return 1
result = vcs.record_delete(args.file)
if result.get("skipped") and not result.get("success"):
print(f"~ Skipped: {result.get('reason')}")
elif result.get("success"):
print("✓ File deleted and backed up!")
print(f" Record ID : {result.get('recordId')}")
print(f" Backup : {result.get('trashFile')}")
important_tag = " [IMPORTANT]" if result.get("isImportant") else ""
print(f" Retention : {result.get('retentionDays')} days{important_tag}")
print_due_for_cleanup(result.get("due_for_cleanup", []))
else:
print(f"✗ Failed: {result.get('error')}")
return 1
elif args.command == "history":
records = vcs.get_history(args.file, args.limit)
if args.file:
print(f"\n=== History for: {args.file} ===")
else:
print(f"\n=== All History (latest {args.limit} records) ===")
print_records(records, args.show_diff, vcs)
elif args.command == "restore":
if not args.file:
print("Error: Please specify a record ID to restore/rollback")
return 1
result = vcs.restore_file(args.file)
if result.get("success"):
print(f"✓ {result.get('message')}")
else:
print(f"✗ Failed: {result.get('error')}")
return 1
elif args.command == "expired":
if not args.file:
print("Error: Please specify number of days")
return 1
try:
days = int(args.file)
except ValueError:
print("Error: Days must be a number")
return 1
if args.clean:
result = vcs.clean_expired(days, args.delete_files)
print(f"\n=== Cleaned Expired Records (>{days} days) ===")
print(f"Records deleted: {result.get('deleted_count')}")
if args.delete_files and result.get("deleted_files"):
print(f"Diff files deleted: {len(result.get('deleted_files', []))}")
else:
records = vcs.query_expired(days)
print(f"\n=== Expired Records (>{days} days) ===")
print(f"Total: {len(records)} records")
print_records(records)
elif args.command == "trash":
files = vcs.list_trash()
print("\n=== Trash Bin ===")
if not files:
print("Trash is empty.")
return 0
for i, f in enumerate(files):
important_tag = " [IMPORTANT]" if f.get("isImportant") else ""
print(f"\n[{i + 1}] {f.get('filePath')}{important_tag}")
print(f" Deleted at : {f.get('datetime')}")
print(f" Expires at : {f.get('expireAtDatetime', 'N/A')}")
print(f" Record ID : {f.get('recordId')}")
elif args.command == "remove":
if not args.file:
print("Error: Please specify a record ID to remove")
print(" Use 'history' command to see record IDs")
return 1
result = vcs.delete_record_by_id(args.file)
if result.get("success"):
print("✓ Record removed successfully!")
print(f" {result.get('message')}")
else:
print(f"✗ Failed: {result.get('error')}")
return 1
elif args.command == "cleanup":
due = vcs.query_due_for_cleanup()
if not due:
print("✓ No records due for cleanup.")
return 0
print(f"\n=== Records Due for Cleanup ({len(due)} total, oldest first) ===")
print_records(due, vcs=vcs)
if args.confirm:
record_ids = [rid for r in due if isinstance((rid := r.get("recordId")), str)]
result = vcs.delete_due_records(record_ids)
print(f"\n✓ Cleanup complete: {result.get('deleted_count')} records deleted.")
if result.get("failed_records"):
print(f" Failed: {result.get('failed_records')}")
else:
print("\n To delete these records, run with --confirm")
print(" To postpone a record, run: extend <record_id>")
elif args.command == "extend":
if not args.file:
print("Error: Please specify a record ID to extend")
return 1
result = vcs.extend_record_expiry(args.file)
if result.get("success"):
print("✓ Expiry extended!")
print(f" Record ID : {result.get('recordId')}")
print(f" New expire at : {result.get('newExpireAtDatetime')}")
print(f" {result.get('message')}")
else:
print(f"✗ Failed: {result.get('error')}")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())
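Protects files while the user works with openclaw. Every file operation gets a version record, so deletions can be restored and modifications rolled back.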
---
name: mmxagent-guardian
version: 1.0.0
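description: Protects files while the user works with openclaw. Every file operation gets a version record, so deletions can be restored and modifications rolled back.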
---
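# File Protection and Version Management
This Skill uses MiniVCS to track versions for all file operations.
**Script location**: `scripts/minivcs/minivcs.py`, in the same directory as this SKILL.md
(resolve the directory that contains this file before running anything; it is referred to below as `$SKILL_DIR`)
**Core mechanisms:**
- **Modify**: saves an incremental diff plus a full snapshot of the pre-modification content, so the change **can be rolled back**
- **Delete**: moves the whole file into `~/.openclaw/minivcs/trash/`, so it **can be restored**
- **Every file is indexed**, with a retention period that depends on its importance
- **The whole record table is scanned automatically after every operation** to find expired records and ask the user to confirm their cleanup
**Limitations:**
- **Binary files (images, PDFs, audio/video, etc.)**: a text diff is not possible, so the file is copied in full as a `.bak` backup instead. The backup supports rollback and the original file is left untouched. Tell the user that every `record` call on a binary file stores a complete copy, which takes comparatively more space.
- **The first record of a text file cannot be rolled back**: the first time a file is recorded there is no baseline yet, so that record has no snapshot. From then on, each call to `record_modify` after an edit saves a snapshot automatically, so any recorded edit can be rolled back to the state just before it.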
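The text/binary split described above can be sketched as follows. This mirrors the NUL-byte check in `is_binary_file` from the bundled script; the names `looks_binary` and `demo` are illustrative and are not part of MiniVCS.

```python
import os
import tempfile


def looks_binary(path: str, probe_size: int = 1024) -> bool:
    """Heuristic: a file is treated as binary if its first bytes contain NUL."""
    try:
        with open(path, "rb") as f:
            return b"\x00" in f.read(probe_size)
    except OSError:
        # Unreadable or missing files are treated as text in this sketch.
        return False


def demo() -> dict:
    """Write one text file and one binary-looking file, then classify both."""
    with tempfile.TemporaryDirectory() as tmp:
        text_path = os.path.join(tmp, "notes.txt")
        blob_path = os.path.join(tmp, "image.bin")
        with open(text_path, "w", encoding="utf-8") as f:
            f.write("plain text\n")
        with open(blob_path, "wb") as f:
            f.write(b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR")
        return {"text": looks_binary(text_path), "blob": looks_binary(blob_path)}
```

One consequence of this heuristic is that text saved as UTF-16 usually contains NUL bytes, so it would be handled through the full-copy `.bak` path rather than the diff path.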
---
## Prerequisite: Confirm Python Is Installed
**Before doing anything else, check that Python 3 is installed in the user's environment.**
### How to check
```bash
# macOS / Linux
python3 --version
# Windows (PowerShell)
python --version
```
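- Output of `Python 3.x.x` means Python is installed and you can continue
- `command not found` or `is not recognized as an internal or external command` means Python has to be installed for the user
### Installation guide (when Python is missing)
#### macOS
Installing through Homebrew is recommended: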
```bash
# First check whether Homebrew is available
brew --version
# Homebrew present: install directly
brew install python3
# Homebrew missing: install Homebrew first
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
brew install python3
```
**If the user is in mainland China**, access to GitHub is slow, so configure a mirror before installing:
```bash
# Install Homebrew from the Tsinghua University mirror
/bin/bash -c "$(curl -fsSL https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/install.sh)"
# After installation, point Homebrew's core sources at the Tsinghua mirror
export HOMEBREW_API_DOMAIN="https://mirrors.tuna.tsinghua.edu.cn/homebrew-bottles/api"
export HOMEBREW_BOTTLE_DOMAIN="https://mirrors.tuna.tsinghua.edu.cn/homebrew-bottles"
export HOMEBREW_BREW_GIT_REMOTE="https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/brew.git"
export HOMEBREW_CORE_GIT_REMOTE="https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/homebrew-core.git"
# Append the variables above to the shell config file so they persist
echo 'export HOMEBREW_API_DOMAIN="https://mirrors.tuna.tsinghua.edu.cn/homebrew-bottles/api"' >> ~/.zshrc
echo 'export HOMEBREW_BOTTLE_DOMAIN="https://mirrors.tuna.tsinghua.edu.cn/homebrew-bottles"' >> ~/.zshrc
echo 'export HOMEBREW_BREW_GIT_REMOTE="https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/brew.git"' >> ~/.zshrc
echo 'export HOMEBREW_CORE_GIT_REMOTE="https://mirrors.tuna.tsinghua.edu.cn/git/homebrew/homebrew-core.git"' >> ~/.zshrc
source ~/.zshrc
# Then install Python
brew install python3
```
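> If the user runs bash rather than zsh, replace `~/.zshrc` above with `~/.bash_profile`.
> Mainland China mirror reference: Tsinghua https://mirrors.tuna.tsinghua.edu.cn/help/homebrew/
Alternatively, direct the user to download the installer from the official site: https://www.python.org/downloads/macos/
#### Linux (Debian / Ubuntu)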
```bash
sudo apt update && sudo apt install -y python3 python3-pip
```
#### Linux (CentOS / RHEL / Fedora)
```bash
# CentOS / RHEL
sudo yum install -y python3
# Fedora
sudo dnf install -y python3
```
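#### Windows
Direct the user to download the installer from the official site: https://www.python.org/downloads/windows/
During installation, tick **"Add Python to PATH"**; otherwise the command line will not recognize the `python` command.
After installation, verify in PowerShell: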
```powershell
python --version
pip --version
```
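> **Note**: on some Windows systems the command is `python`, while on macOS/Linux it is `python3`.
> In all commands below, replace `python` with whichever command name works in the actual environment.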
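When the check is driven from a process that is already running Python (an agent runtime, for instance), picking the right command name could look roughly like the sketch below. The function name `find_python_command` is hypothetical, and the probe order (`python3` first, then `python`) is an assumption rather than something the skill prescribes.

```python
import shutil
import subprocess
from typing import Optional


def find_python_command() -> Optional[str]:
    """Return the first command name on PATH that reports Python 3, else None."""
    for name in ("python3", "python"):
        path = shutil.which(name)
        if path is None:
            continue
        try:
            proc = subprocess.run(
                [path, "--version"], capture_output=True, text=True, timeout=10
            )
        except (OSError, subprocess.SubprocessError):
            continue
        # Very old interpreters print the version to stderr instead of stdout.
        version = (proc.stdout or proc.stderr).strip()
        if version.startswith("Python 3"):
            return name
    return None
```

A result of `None` corresponds to the "not installed" branch above, in which case the installation guide applies.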
---
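## Retention Policy
| File type | Retention | Criteria |
|---------|---------|---------|
| Important files | **14 days** | System paths (`/etc/`, `/root/`, `/usr/local/etc/`, `/opt/`), anything under the home directory `~`, the Windows C: drive, config files (`.yaml/.toml/.env`, etc.), entry files (`main.py/index.ts`, etc.) |
| Regular files | **7 days** | All other files |
Each record is created with `expireAt` (expiry timestamp) and `expireAtDatetime` (human-readable time) set automatically.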
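The expiry arithmetic behind these two fields can be sketched as follows. The two retention constants carry the same names and values as in the bundled script; `compute_expiry` and `MS_PER_DAY` are illustrative names introduced here.

```python
from datetime import datetime

RETENTION_NORMAL_DAYS = 7
RETENTION_IMPORTANT_DAYS = 14
MS_PER_DAY = 24 * 60 * 60 * 1000


def compute_expiry(created_ms: int, is_important: bool) -> dict:
    """Derive the retention fields of a record from its creation time (ms)."""
    days = RETENTION_IMPORTANT_DAYS if is_important else RETENTION_NORMAL_DAYS
    expire_at = created_ms + days * MS_PER_DAY
    return {
        "retentionDays": days,
        "expireAt": expire_at,
        # Formatted in local time, like the other datetime fields.
        "expireAtDatetime": datetime.fromtimestamp(expire_at / 1000).strftime(
            "%Y-%m-%d %H:%M:%S"
        ),
    }
```

Timestamps are in milliseconds, so one regular retention period is 604,800,000 ms and one important-file period is twice that.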
---
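## Workflow
### Step 1: Initialize the MiniVCS working directory
The storage directory is created automatically on first use, so no extra setup is needed: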
```bash
python "$SKILL_DIR/scripts/minivcs/minivcs.py" history --project-root <project_root>
```
All data is stored under `~/.openclaw/minivcs/`:
```
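~/.openclaw/minivcs/
  logs.json     # operation log (includes the expireAt field)
  diffs/        # incremental patches for text file modifications
  bases/        # baseline for the next comparison (named by full relative path, so same-named files do not collide)
  snapshots/    # full pre-modification snapshots of text files (for rollback)
  trash/        # full backups of deleted files (for restore)
  backups/      # full .bak copies of binary files (for rollback)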
```
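How the `bases/` naming avoids clashes between same-named files can be sketched like this. `make_safe_path` follows the same replacement rule as `_make_safe_path` in the bundled script; `base_file_name` is a hypothetical helper added for illustration.

```python
import os


def make_safe_path(relative_path: str) -> str:
    """Flatten a relative path into a single file-name-safe string."""
    return relative_path.replace(os.sep, "__").replace("/", "__")


def base_file_name(relative_path: str) -> str:
    """Name of the baseline file kept for a tracked path (illustrative helper)."""
    return f"{make_safe_path(relative_path)}.base"


names = [
    base_file_name("src/utils/config.py"),
    base_file_name("tests/utils/config.py"),
]
```

Both files are called `config.py`, yet the flattened names differ because the whole relative path is encoded. A path that already contains `__` could in principle still collide with a nested path, which this scheme does not guard against.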
---
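### Step 2: Ask the user for confirmation before acting
**Before deleting or modifying a file, explain the following to the user and wait for confirmation:**
1. The path of the file to be operated on
2. The operation type (modify / delete)
3. The purpose and intent of the operation
4. The possible impact
5. **The scope of protection: text files are tracked with a diff plus a snapshot; binary files (images, PDFs, etc.) cannot be diffed and are instead backed up as a full `.bak` copy on every `record` call, which takes more space**
6. **That the file is covered by version records and can be restored or rolled back at any time within the retention period**
Example before the operation:
```
I am about to operate on the following file. Please confirm:
- File: /path/to/file.py
- Operation: delete
- Reason: this file has been replaced by a newer version and is no longer used
- Impact: need to confirm that no other module imports this file
- Protection: the file is backed up automatically once the operation completes and kept for 7 days (14 days for important files); it can be restored at any time during that period
Do you want to continue?
```
After the operation, **you must report the recorded result to the user**. Example:
```
# After a modification
Finished modifying path/to/file.py and recorded the change (Record ID: 1710000000000).
- Change summary: +5 lines, -2 lines
- Retention: 7 days (expires 2026-03-20 10:00:00)
- Rollback: available (restore 1710000000000 returns the file to its pre-modification state)
Let me know if you want to see the diff or roll back.
# After a deletion
Moved path/to/file.py to the trash and created a backup (Record ID: 1710000001000).
- Retention: 14 days (expires 2026-03-27 10:00:00) [important file]
Let me know if you want to restore this file.
```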
---
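### Step 3: Operate on files through MiniVCS
#### Modify a file (rollback supported)
**Call `record` once after each edit.** The snapshot chain builds up automatically, so the file can be rolled back to any previously recorded state:
```
First use of the file → record() → base created (no snapshot; the first record cannot be rolled back)
Edit → C1 → record() → snapshot=initial content, R1 can be rolled back
Edit → C2 → record() → snapshot=C1, R2 can be rolled back → restoring R2 yields C1
Edit → C3 → record() → snapshot=C2, R3 can be rolled back → restoring R3 yields C2 (the state at time t2)
```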
```bash
# Run once after each edit
python "$SKILL_DIR/scripts/minivcs/minivcs.py" record <file_path> --project-root <project_root>
# Output containing "Rollback: available" means a snapshot was saved and the change can be rolled back
```
Python API:
```python
import sys, os
SKILL_DIR = "/path/to/skill"  # placeholder: the directory that contains this SKILL.md
sys.path.insert(0, os.path.join(SKILL_DIR, "scripts", "minivcs"))
from minivcs import MiniVCS
vcs = MiniVCS(project_root="/path/to/project")
# Call once after each edit
result = vcs.record_modify("path/to/file.py")
# result contains:
# {
# "success": True,
# "recordId": "...",
# "summary": "+5 lines, -2 lines",
# "snapshotFile": "...",
# "isImportant": False,
# "retentionDays": 7,
# "due_for_cleanup": [...]
# }
```
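The snapshot chain can be modelled in memory to see why the first record has no snapshot while every later one does. This is a simplified sketch of the bookkeeping only: `SnapshotChain` is a hypothetical class, it stores content in variables rather than files, and it does not write diffs or log entries.

```python
from typing import Dict, List, Optional


class SnapshotChain:
    """In-memory model of the base/snapshot bookkeeping for one text file."""

    def __init__(self) -> None:
        self.base: Optional[str] = None
        self.records: List[Dict[str, Optional[str]]] = []

    def record(self, current: str) -> Optional[Dict[str, Optional[str]]]:
        """Call once after each edit; returns the new record, or None if unchanged."""
        if self.base and self.base == current:
            return None  # no changes detected, nothing is recorded
        record: Dict[str, Optional[str]] = {
            "recordId": f"R{len(self.records)}",
            # The previous base is exactly the content before this edit.
            "snapshot": self.base if self.base else None,
        }
        self.base = current
        self.records.append(record)
        return record


def can_rollback(record: Dict[str, Optional[str]]) -> bool:
    """A record can be rolled back only if it carries a snapshot."""
    return record["snapshot"] is not None
```

Recording `C0`, `C1`, `C2` in turn yields `R0` with no snapshot, `R1` with snapshot `C0`, and `R2` with snapshot `C1`, which matches the chain shown above.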
#### Delete a file (restore supported)
**Do not delete files directly.** Use `record_delete` instead, which moves the file into trash automatically:
```bash
python "$SKILL_DIR/scripts/minivcs/minivcs.py" delete <file_path> --project-root <project_root>
```
Python API:
```python
result = vcs.record_delete("path/to/file.py")
# result contains:
# {
# "success": True,
# "recordId": "...",
# "trashFile": "~/.openclaw/minivcs/trash/1234567_file.py.bak",
# "isImportant": True,
# "retentionDays": 14,
# "due_for_cleanup": [...]
# }
```
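The move-to-trash behaviour, and the way such a backup could later be moved back, can be sketched as below. The naming scheme (`<millisecond timestamp>_<file name>.bak`) follows the bundled script; the function names and the explicit `trash_dir` parameter are assumptions made to keep the example self-contained.

```python
import os
import shutil
import tempfile
import time


def move_to_trash(file_path: str, trash_dir: str) -> str:
    """Move a file into trash_dir under a timestamped .bak name."""
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")
    os.makedirs(trash_dir, exist_ok=True)
    trash_name = f"{int(time.time() * 1000)}_{os.path.basename(file_path)}.bak"
    trash_path = os.path.join(trash_dir, trash_name)
    shutil.move(file_path, trash_path)
    return trash_path


def restore_from_trash(trash_path: str, target_path: str) -> bool:
    """Move a trashed file back; the trash copy disappears on success."""
    if not os.path.exists(trash_path):
        return False
    parent = os.path.dirname(target_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    shutil.move(trash_path, target_path)
    return True


def demo() -> dict:
    """Delete and restore a throwaway file inside a temporary directory."""
    with tempfile.TemporaryDirectory() as tmp:
        original = os.path.join(tmp, "project", "old.py")
        os.makedirs(os.path.dirname(original))
        with open(original, "w", encoding="utf-8") as f:
            f.write("print('hi')\n")
        trash_path = move_to_trash(original, os.path.join(tmp, "trash"))
        gone = not os.path.exists(original)
        restored = restore_from_trash(trash_path, original)
        with open(original, encoding="utf-8") as f:
            content = f.read()
        return {
            "gone": gone,
            "restored": restored,
            "content": content,
            "trash_name": os.path.basename(trash_path),
        }
```

Because the file is moved rather than copied, a restore consumes the trash entry, which is consistent with a restored record no longer appearing in the trash list.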
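#### Restore / rollback
The single `restore` command handles both cases:
```bash
# DELETE record → restore the file to its original path
# MODIFY record (with snapshot) → roll back to the pre-modification state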
python "$SKILL_DIR/scripts/minivcs/minivcs.py" restore <record_id> --project-root <project_root>
```
Python API:
```python
result = vcs.restore_file(record_id="...")
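# After a successful DELETE restore: the record is marked RESTORED and no longer appears in the trash list
# After a successful MODIFY rollback: the record is marked ROLLED_BACK
# Restoring again: returns {"success": False, "error": "already been restored/rolled_back"}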
```
---
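### Step 4: Handle expiry cleanup notices (required after every operation)
Every call to `record_modify` or `record_delete` returns a `due_for_cleanup` field.
**If `due_for_cleanup` is not empty, the Agent must:**
1. Show the user the list of expired records (sorted from earliest to latest)
2. Ask the user which records can be deleted and which should be extended
Example display format:
```
The following N history records have expired. Please confirm whether they can be cleaned up:
[1] ID=1710000000000 File=src/old.py Action=MODIFY
Recorded=2026-03-01 10:00:00 Expires=2026-03-08 10:00:00
[2] ID=1710000001000 File=config.yaml Action=DELETE [important file]
Recorded=2026-02-28 09:00:00 Expires=2026-03-14 09:00:00
Can these records be deleted? (delete all / specify which ones to extend)
```
Handling the user's response:
```python
# The user confirms deleting all of them
result = vcs.delete_due_records(record_ids=["id1", "id2"])
# The user wants to keep a record for now → postpone by one retention period (7 or 14 days)
result = vcs.extend_record_expiry(record_id="id1")
```
Command line: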
```bash
# List all expired records
python "$SKILL_DIR/scripts/minivcs/minivcs.py" cleanup --project-root <project_root>
# Confirm the cleanup
python "$SKILL_DIR/scripts/minivcs/minivcs.py" cleanup --confirm --project-root <project_root>
# Extend a single record
python "$SKILL_DIR/scripts/minivcs/minivcs.py" extend <record_id> --project-root <project_root>
```
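The selection and extension logic behind this step can be sketched as follows. The filter mirrors `get_due_for_cleanup` in the bundled script (expired, not already restored or rolled back, earliest expiry first). `extend_by_one_period` is a hypothetical helper, and it assumes that one retention period equals the record's own `retentionDays`.

```python
import time
from typing import Any, Dict, List, Optional

MS_PER_DAY = 24 * 60 * 60 * 1000
SKIP_ACTIONS = {"RESTORED", "ROLLED_BACK"}


def due_for_cleanup(
    records: List[Dict[str, Any]], now_ms: Optional[int] = None
) -> List[Dict[str, Any]]:
    """Return expired records, earliest expiry first."""
    now = int(time.time() * 1000) if now_ms is None else now_ms
    due = [
        r
        for r in records
        if r.get("expireAt") is not None
        and r["expireAt"] <= now
        and r.get("action") not in SKIP_ACTIONS
    ]
    return sorted(due, key=lambda r: r["expireAt"])


def extend_by_one_period(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the record with expireAt pushed back by retentionDays."""
    extended = dict(record)
    extended["expireAt"] = record["expireAt"] + record.get("retentionDays", 7) * MS_PER_DAY
    return extended
```

Passing `now_ms` explicitly keeps the function deterministic, which makes it easy to check which records an Agent would present to the user at a given moment.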
---
## Other Common Operations
```bash
# View operation history (records tagged [rollback available] can be rolled back)
python "$SKILL_DIR/scripts/minivcs/minivcs.py" history --project-root <project_root>
python "$SKILL_DIR/scripts/minivcs/minivcs.py" history <file_path> -d --project-root <project_root>
# List files in trash that have not been restored yet
python "$SKILL_DIR/scripts/minivcs/minivcs.py" trash --project-root <project_root>
# Remove a specific record (the log entry and its physical files are cleaned up together)
python "$SKILL_DIR/scripts/minivcs/minivcs.py" remove <record_id> --project-root <project_root>
```
---
## Record Fields
| Field | Type | Description |
|------|------|------|
| `recordId` | string | Unique record ID (millisecond timestamp) |
| `filePath` | string | Path relative to the project root, or an absolute path for files outside it |
| `action` | string | `MODIFY` / `DELETE` / `BINARY_BACKUP` / `RESTORED` / `ROLLED_BACK` |
| `timestamp` | number | Creation time (milliseconds) |
| `datetime` | string | Creation time (human-readable) |
| `isImportant` | bool | Whether the file counts as important |
| `retentionDays` | number | Retention in days (7 or 14) |
| `expireAt` | number | Expiry time (milliseconds) |
| `expireAtDatetime` | string | Expiry time (human-readable) |
| `diffFile` | string | Diff patch path (`MODIFY` only) |
| `snapshotFile` | string | Pre-modification snapshot path (only on `MODIFY` records that have a snapshot) |
| `trashFile` | string | Trash backup path (`DELETE` only) |
| `backupFile` | string | `.bak` copy path (`BINARY_BACKUP` only) |
| `summary` | string | Change summary |
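For code that consumes these records, the table can be expressed as a typed dictionary. This is an illustrative sketch: `MiniVCSRecord`, `COMMON_FIELDS`, and `missing_common_fields` are names introduced here, and the example values are made up (the datetime strings are shown as UTC, whereas the script formats local time).

```python
from typing import Any, Dict, List, TypedDict


class MiniVCSRecord(TypedDict, total=False):
    recordId: str
    filePath: str
    action: str
    timestamp: int
    datetime: str
    isImportant: bool
    retentionDays: int
    expireAt: int
    expireAtDatetime: str
    diffFile: str       # MODIFY only
    snapshotFile: str   # MODIFY records that have a snapshot
    trashFile: str      # DELETE only
    backupFile: str     # BINARY_BACKUP only
    summary: str


# Fields that every record is expected to carry regardless of action.
COMMON_FIELDS = (
    "recordId", "filePath", "action", "timestamp", "datetime",
    "isImportant", "retentionDays", "expireAt", "expireAtDatetime", "summary",
)


def missing_common_fields(record: Dict[str, Any]) -> List[str]:
    """List the common fields that a record lacks, in table order."""
    return [name for name in COMMON_FIELDS if name not in record]


example: MiniVCSRecord = {
    "recordId": "1710000000000",
    "filePath": "src/app.py",
    "action": "DELETE",
    "timestamp": 1710000000000,
    "datetime": "2024-03-09 16:00:00",
    "isImportant": False,
    "retentionDays": 7,
    "expireAt": 1710604800000,
    "expireAtDatetime": "2024-03-16 16:00:00",
    "trashFile": "/path/to/trash/1710000000000_app.py.bak",
    "summary": "File moved to trash",
}
```

Declaring the class with `total=False` reflects that the action-specific fields are present only on some records.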
---
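## Full Workflow Diagram
```
User requests a modification or deletion
  │
  ▼
[Ask for confirmation] explain purpose, impact, and available protection → wait for the user to confirm
  │
  ├── Modify file
  │     1. Perform the actual modification
  │     2. record_modify (diff + snapshot → canRollback=True)
  │        snapshot = content before this edit; history can be rolled back step by step
  │
  ├── Delete file
  │     record_delete (moves the file to trash instead of deleting it directly)
  │
  ▼
Tell the user: Record ID / retention period / whether rollback or restore is available
  │
  ▼
Check due_for_cleanup in the return value
  │
  ├── No expired records → done
  │
  └── Expired records
        Show them (earliest first) and ask the user to confirm
          ├── Confirm deletion → delete_due_records
          └── Keep for now     → extend_record_expiry (postpone by one retention period)

------- When the user needs a rollback/restore -------
User: "Please restore / roll back xxx"
  │
  ▼
restore_file(record_id)
  ├── DELETE record → restore from trash, mark RESTORED
  └── MODIFY record (with snapshot) → write the snapshot content back, mark ROLLED_BACK
```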
FILE:scripts/minivcs/minivcs.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
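MiniVCS Skill - lightweight file version management system
- Modify: saves an incremental diff plus a full pre-modification snapshot (supports rollback)
- Delete: saves the complete file to trash (supports restore)
Retention policy:
- Regular files: 7 days
- Important files: 14 days (system paths, config files, entry files, etc.)
After every record operation, the whole record table is scanned (earliest first)
and the list of expired, cleanable records is returned for the Agent to confirm with the user.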
"""
import os
import sys
import json
import time
import shutil
import difflib
import argparse
from datetime import datetime
from typing import List, Dict, Any, Optional
# -------------------------------------------------------------------
# Rules for identifying important files
# -------------------------------------------------------------------
IMPORTANT_PATH_PREFIXES = [
"/etc/",
"/root/",
"/usr/local/etc/",
"/opt/",
]
IMPORTANT_FILENAME_PATTERNS = [
# Config file suffixes
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".env",
# Common entry file names (exact match)
"main.py",
"app.py",
"server.py",
"wsgi.py",
"asgi.py",
"index.ts",
"index.js",
"main.ts",
"main.go",
"main.rs",
"manage.py",
"settings.py",
"config.py",
"dockerfile",
"docker-compose.yml",
"docker-compose.yaml",
"makefile",
"cmakelists.txt",
]
RETENTION_NORMAL_DAYS = 7
RETENTION_IMPORTANT_DAYS = 14
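def is_important_file(file_path: str) -> bool:
    """Return True if the file counts as important (longer retention period)."""
    abs_path = os.path.abspath(file_path)
    home_dir = os.path.expanduser("~")
    # macOS/Linux: files under the home directory count as important
    if abs_path.startswith(home_dir + os.sep):
        return True
    # Windows: anything on the C: drive counts as important
    if sys.platform == "win32" and (abs_path.lower().startswith("c:\\") or abs_path.lower().startswith("c:/")):
        return True
    # System path prefixes
    for prefix in IMPORTANT_PATH_PREFIXES:
        if abs_path.startswith(prefix):
            return True
    # File name patterns (matched in lowercase). Patterns starting with "." are
    # suffixes; all others must equal the whole file name, so "domain.py" is
    # not mistaken for "main.py".
    filename = os.path.basename(abs_path).lower()
    for pattern in IMPORTANT_FILENAME_PATTERNS:
        if filename == pattern or (pattern.startswith(".") and filename.endswith(pattern)):
            return True
    return False


def get_retention_days(file_path: str) -> int:
    """Return the retention period in days based on file importance."""
    return RETENTION_IMPORTANT_DAYS if is_important_file(file_path) else RETENTION_NORMAL_DAYS


def _make_safe_path(relative_path: str) -> str:
    """
    Convert a relative path into a string that is safe to use as a file name.
    The full relative path is used (not just the file name) so that files
    with the same name in different directories do not collide.
    """
    return relative_path.replace(os.sep, "__").replace("/", "__")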
# -------------------------------------------------------------------
# LogManager
# -------------------------------------------------------------------
class LogManager:
"""Log manager: reads and writes change records stored as JSON."""
def __init__(self, vcs_root: str):
self.vcs_root = vcs_root
self.log_file = os.path.join(vcs_root, "logs.json")
self._ensure_log_file()
def _ensure_log_file(self):
if not os.path.exists(self.log_file):
os.makedirs(os.path.dirname(self.log_file), exist_ok=True)
with open(self.log_file, "w", encoding="utf-8") as f:
json.dump({"version": "1.0", "records": []}, f, ensure_ascii=False, indent=2)
def _read_log(self) -> Dict[str, Any]:
try:
with open(self.log_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
return {"version": "1.0", "records": []}
def _write_log(self, data: Dict[str, Any]):
# Atomic write: write to a temp file first, then rename it, so a crash mid-write cannot corrupt logs.json
tmp_file = self.log_file + ".tmp"
with open(tmp_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp_file, self.log_file)
def append_record(self, record: Dict[str, Any]) -> str:
data = self._read_log()
record_id = record.get("recordId", f"{int(time.time() * 1000)}")
record["recordId"] = record_id
now_ms = int(time.time() * 1000)
if "timestamp" not in record:
record["timestamp"] = now_ms
if "datetime" not in record:
record["datetime"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# If expireAt is not set, compute it from retentionDays
if "expireAt" not in record:
retention_days = record.get("retentionDays", RETENTION_NORMAL_DAYS)
record["expireAt"] = record["timestamp"] + retention_days * 24 * 60 * 60 * 1000
if "expireAtDatetime" not in record:
record["expireAtDatetime"] = datetime.fromtimestamp(record["expireAt"] / 1000).strftime("%Y-%m-%d %H:%M:%S")
data["records"].append(record)
self._write_log(data)
return record_id
def update_record(self, record_id: str, updates: Dict[str, Any]) -> bool:
"""Update fields of the record with the given ID."""
data = self._read_log()
for record in data["records"]:
if record.get("recordId") == record_id:
record.update(updates)
self._write_log(data)
return True
return False
def get_history_by_file(self, file_path: str, descending: bool = True) -> List[Dict[str, Any]]:
data = self._read_log()
records = [r for r in data["records"] if r.get("filePath") == file_path]
records.sort(key=lambda x: x.get("timestamp", 0), reverse=descending)
return records
def get_all_history(self, descending: bool = True) -> List[Dict[str, Any]]:
data = self._read_log()
records = list(data["records"])
records.sort(key=lambda x: x.get("timestamp", 0), reverse=descending)
return records
def get_due_for_cleanup(self) -> List[Dict[str, Any]]:
"""
Return records that have expired (expireAt <= now), sorted by expireAt from earliest to latest.
Records that were already restored or rolled back are skipped.
"""
data = self._read_log()
current_time = int(time.time() * 1000)
skip_actions = {"RESTORED", "ROLLED_BACK"}
due = [
r
for r in data["records"]
if r.get("expireAt") is not None and r["expireAt"] <= current_time and r.get("action") not in skip_actions
]
due.sort(key=lambda x: x.get("expireAt", 0))
return due
def extend_record_expiry(self, record_id: str, additional_ms: int) -> bool:
"""Push the expireAt of the given record back by additional_ms milliseconds."""
data = self._read_log()
for record in data["records"]:
if record.get("recordId") == record_id:
record["expireAt"] = record.get("expireAt", int(time.time() * 1000)) + additional_ms
record["expireAtDatetime"] = datetime.fromtimestamp(record["expireAt"] / 1000).strftime("%Y-%m-%d %H:%M:%S")
self._write_log(data)
return True
return False
def get_expired_records(self, days: int) -> List[Dict[str, Any]]:
"""Legacy-compatible interface: find records older than a fixed number of days."""
data = self._read_log()
current_time = int(time.time() * 1000)
expire_ms = days * 24 * 60 * 60 * 1000
return [r for r in data["records"] if current_time - r.get("timestamp", 0) > expire_ms]
def delete_record(self, record_id: str) -> bool:
data = self._read_log()
original_count = len(data["records"])
data["records"] = [r for r in data["records"] if r.get("recordId") != record_id]
if len(data["records"]) < original_count:
self._write_log(data)
return True
return False
def clean_expired_records(self, days: int) -> Dict[str, Any]:
expired = self.get_expired_records(days)
deleted_count = sum(1 for r in expired if self.delete_record(r.get("recordId", "")))
return {"deleted_count": deleted_count, "deleted_records": expired}
def get_record_by_id(self, record_id: str) -> Optional[Dict[str, Any]]:
data = self._read_log()
for record in data["records"]:
if record.get("recordId") == record_id:
return record
return None
# -------------------------------------------------------------------
# DiffEngine
# -------------------------------------------------------------------
class DiffEngine:
"""Diff engine: generates the differences between file versions."""
@staticmethod
def generate_diff(old_content: str, new_content: str, context: int = 3) -> str:
old_lines = old_content.splitlines(keepends=True)
new_lines = new_content.splitlines(keepends=True)
diff = difflib.unified_diff(
old_lines,
new_lines,
fromfile="original",
tofile="modified",
lineterm="\n",
n=context,
)
return "".join(diff)
@staticmethod
def parse_diff_summary(diff_text: str) -> Dict[str, int]:
added = removed = 0
for line in diff_text.splitlines():
if line.startswith("+") and not line.startswith("+++"):
added += 1
elif line.startswith("-") and not line.startswith("---"):
removed += 1
return {
"added": added,
"removed": removed,
"summary": f"+{added} lines, -{removed} lines" if added or removed else "No changes",
}
# -------------------------------------------------------------------
# FileManager
# -------------------------------------------------------------------
class FileManager:
"""File manager: handles operations on physical files."""
def __init__(self, vcs_root: str, project_root: str):
self.vcs_root = vcs_root
self.project_root = project_root
self.trash_dir = os.path.join(vcs_root, "trash")
self.diffs_dir = os.path.join(vcs_root, "diffs")
self.bases_dir = os.path.join(vcs_root, "bases")
self.snapshots_dir = os.path.join(vcs_root, "snapshots")  # full pre-modification snapshots of text files, used for rollback
self.backups_dir = os.path.join(vcs_root, "backups")  # full-copy backups of binary files
os.makedirs(self.trash_dir, exist_ok=True)
os.makedirs(self.diffs_dir, exist_ok=True)
os.makedirs(self.bases_dir, exist_ok=True)
os.makedirs(self.snapshots_dir, exist_ok=True)
os.makedirs(self.backups_dir, exist_ok=True)
def get_file_content(self, file_path: str) -> Optional[str]:
if not os.path.exists(file_path):
return None
try:
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
try:
with open(file_path, "rb") as f:
return f.read().decode("utf-8", errors="ignore")
except Exception:
return None
def is_binary_file(self, file_path: str) -> bool:
"""Detect binary files with a simple heuristic: read the first 1 KB and check for a NUL byte."""
try:
with open(file_path, "rb") as f:
chunk = f.read(1024)
return b"\x00" in chunk
except Exception:
return False
def save_to_trash(self, file_path: str) -> str:
"""Move a deleted file into the trash directory (full content preserved)."""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
timestamp = int(time.time() * 1000)
filename = os.path.basename(file_path)
trash_filename = f"{timestamp}_{filename}.bak"
trash_path = os.path.join(self.trash_dir, trash_filename)
shutil.move(file_path, trash_path)
return trash_path
def restore_from_trash(self, trash_path: str, target_path: str) -> bool:
"""Restore a file from the trash directory (moved, so the backup copy disappears)."""
try:
if not os.path.exists(trash_path):
return False
os.makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.move(trash_path, target_path)
return True
except Exception:
return False
def save_binary_backup(self, file_path: str) -> str:
"""Copy a binary file in full to the backups directory with a .bak extension (the original is kept)."""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
timestamp = int(time.time() * 1000)
filename = os.path.basename(file_path)
backup_filename = f"{timestamp}_{filename}.bak"
backup_path = os.path.join(self.backups_dir, backup_filename)
shutil.copy2(file_path, backup_path)
return backup_path
def restore_binary_backup(self, backup_path: str, target_path: str) -> bool:
"""Copy a .bak file from the backups directory back to the original path (overwrites the current file, keeps the backup)."""
try:
if not os.path.exists(backup_path):
return False
os.makedirs(os.path.dirname(target_path), exist_ok=True)
shutil.copy2(backup_path, target_path)
return True
except Exception:
return False
def save_diff(self, relative_path: str, diff_content: str) -> str:
"""Save a diff patch to the diffs directory, named by full relative path (avoids same-name collisions)."""
timestamp = int(time.time() * 1000)
safe_path = _make_safe_path(relative_path)
diff_path = os.path.join(self.diffs_dir, f"{timestamp}_{safe_path}.patch")
with open(diff_path, "w", encoding="utf-8") as f:
f.write(diff_content)
return diff_path
def save_snapshot(self, relative_path: str, content: str) -> str:
"""Save a full snapshot of the pre-modification content, used for rollback."""
timestamp = int(time.time() * 1000)
safe_path = _make_safe_path(relative_path)
snap_path = os.path.join(self.snapshots_dir, f"{timestamp}_{safe_path}.snap")
with open(snap_path, "w", encoding="utf-8") as f:
f.write(content)
return snap_path
def save_base(self, relative_path: str, content: str) -> str:
"""Save the current content as the base for the next comparison, named by full relative path."""
safe_path = _make_safe_path(relative_path)
base_path = os.path.join(self.bases_dir, f"{safe_path}.base")
with open(base_path, "w", encoding="utf-8") as f:
f.write(content)
return base_path
def get_base(self, relative_path: str) -> Optional[str]:
"""Return the base content saved by the previous record, if any."""
safe_path = _make_safe_path(relative_path)
base_path = os.path.join(self.bases_dir, f"{safe_path}.base")
return self.get_file_content(base_path)
def delete_base(self, relative_path: str) -> bool:
"""Delete the base file."""
safe_path = _make_safe_path(relative_path)
base_path = os.path.join(self.bases_dir, f"{safe_path}.base")
if os.path.exists(base_path):
os.remove(base_path)
return True
return False
# -------------------------------------------------------------------
# MiniVCS
# -------------------------------------------------------------------
class MiniVCS:
"""MiniVCS core controller."""
def __init__(self, project_root: str, vcs_root: Optional[str] = None):
self.project_root = os.path.abspath(project_root)
# Defaults to ~/.openclaw/minivcs/, decoupled from the project directory
if vcs_root is not None:
self.vcs_root = os.path.abspath(vcs_root)
else:
self.vcs_root = os.path.join(os.path.expanduser("~"), ".openclaw", "minivcs")
self.log_manager = LogManager(self.vcs_root)
self.diff_engine = DiffEngine()
self.file_manager = FileManager(self.vcs_root, self.project_root)
def _get_relative_path(self, absolute_path: str) -> str:
if absolute_path.startswith(self.project_root + os.sep):
return absolute_path[len(self.project_root) + 1 :]
return absolute_path
# ------------------------------------------------------------------
# Core operations
# ------------------------------------------------------------------
def record_modify(self, file_path: str) -> Dict[str, Any]:
"""
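Record a file modification (incremental diff + pre-modification snapshot).
Call once after each edit; there is no need to call it both before and after a change:
- First call (no base): saves the current content as the base; this record has no snapshot
- Later calls (base exists): diff = base → current, snapshot = base (the pre-modification content), then the base is updated
The list of expired records awaiting cleanup is returned with the result.
Snapshot chain (one call after each edit):
initial C0 → record() → base=C0
edit C1 → record() → snapshot=C0, base=C1 (R1, can roll back to C0)
edit C2 → record() → snapshot=C1, base=C2 (R2, can roll back to C1, the state at t1)
edit C3 → record() → snapshot=C2, base=C3 (R3, can roll back to C2, the state at t2)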
"""
abs_path = os.path.abspath(file_path)
if not os.path.exists(abs_path):
return {"success": False, "error": f"File not found: {file_path}"}
# Binary files cannot be diffed, so a full copy is backed up instead (copied as .bak; the original is not removed)
if self.file_manager.is_binary_file(abs_path):
return self._record_binary_backup(abs_path)
try:
new_content = self.file_manager.get_file_content(abs_path)
if new_content is None:
return {"success": False, "error": "Cannot read file content"}
relative_path = self._get_relative_path(abs_path)
prev_base = self.file_manager.get_base(relative_path)
old_content = prev_base if prev_base is not None else ""
diff_text = self.diff_engine.generate_diff(old_content, new_content)
diff_summary = self.diff_engine.parse_diff_summary(diff_text)
if old_content != "" and diff_summary["added"] == 0 and diff_summary["removed"] == 0:
due = self.query_due_for_cleanup()
return {"success": True, "message": "No changes detected", "skipped": True, "due_for_cleanup": due}
diff_path = self.file_manager.save_diff(relative_path, diff_text)
# When a base exists, old_content is the pre-modification content; save it as a snapshot for rollback
snap_path = None
if old_content:
snap_path = self.file_manager.save_snapshot(relative_path, old_content)
# Update the base to the current content
self.file_manager.save_base(relative_path, new_content)
retention_days = get_retention_days(abs_path)
important = is_important_file(abs_path)
record: Dict[str, Any] = {
"filePath": relative_path,
"action": "MODIFY",
"diffFile": diff_path,
"summary": diff_summary["summary"],
"linesAdded": diff_summary["added"],
"linesRemoved": diff_summary["removed"],
"isImportant": important,
"retentionDays": retention_days,
}
if snap_path:
record["snapshotFile"] = snap_path
record_id = self.log_manager.append_record(record)
due = self.query_due_for_cleanup()
return {
"success": True,
"recordId": record_id,
"summary": diff_summary["summary"],
"diffFile": diff_path,
"snapshotFile": snap_path,
"canRollback": snap_path is not None,
"isImportant": important,
"retentionDays": retention_days,
"due_for_cleanup": due,
}
except Exception as e:
return {"success": False, "error": str(e)}

    def _record_binary_backup(self, abs_path: str) -> Dict[str, Any]:
        """Back up a binary file: copy the current file in full to .bak, keep the original, and allow a later restore."""
        try:
            relative_path = self._get_relative_path(abs_path)
            important = is_important_file(abs_path)
            retention_days = get_retention_days(abs_path)

            backup_path = self.file_manager.save_binary_backup(abs_path)
            file_size = os.path.getsize(abs_path)

            record = {
                "filePath": relative_path,
                "action": "BINARY_BACKUP",
                "backupFile": backup_path,
                "summary": f"Binary file backup ({file_size} bytes)",
                "isImportant": important,
                "retentionDays": retention_days,
            }
            record_id = self.log_manager.append_record(record)
            due = self.query_due_for_cleanup()
            return {
                "success": True,
                "recordId": record_id,
                "backupFile": backup_path,
                "summary": record["summary"],
                "canRollback": True,
                "isImportant": important,
                "retentionDays": retention_days,
                "due_for_cleanup": due,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def record_delete(self, file_path: str) -> Dict[str, Any]:
        """
        Record a file deletion (the file is moved to trash).
        The result also includes the list of expired records pending cleanup.
        """
        abs_path = os.path.abspath(file_path)
        if not os.path.exists(abs_path):
            return {"success": False, "error": f"File not found: {file_path}"}

        try:
            relative_path = self._get_relative_path(abs_path)
            important = is_important_file(abs_path)
            retention_days = get_retention_days(abs_path)

            trash_path = self.file_manager.save_to_trash(abs_path)

            record = {
                "filePath": relative_path,
                "action": "DELETE",
                "trashFile": trash_path,
                "summary": "File moved to trash",
                "isImportant": important,
                "retentionDays": retention_days,
            }
            record_id = self.log_manager.append_record(record)
            due = self.query_due_for_cleanup()
            return {
                "success": True,
                "recordId": record_id,
                "trashFile": trash_path,
                "message": "File deleted and backed up to trash",
                "isImportant": important,
                "retentionDays": retention_days,
                "due_for_cleanup": due,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Restore / rollback
    # ------------------------------------------------------------------

    def restore_file(self, record_id: str) -> Dict[str, Any]:
        """
        Restore a file:
        - DELETE record: restore the complete file from trash
        - MODIFY record: roll back to the pre-edit state using the pre-edit snapshot
        - BINARY_BACKUP record: restore the file from its .bak copy
        On success the record is marked RESTORED / ROLLED_BACK.
        """
        record = self.log_manager.get_record_by_id(record_id)
        if not record:
            return {"success": False, "error": "Record not found"}

        action = record.get("action")

        if action == "DELETE":
            trash_file = record.get("trashFile")
            if not trash_file or not os.path.exists(trash_file):
                return {"success": False, "error": "Backup file not found (may have been cleaned up)"}
            # filePath may be a relative or an absolute path
            file_path = record.get("filePath", "")
            target_path = file_path if os.path.isabs(file_path) else os.path.join(self.project_root, file_path)
            success = self.file_manager.restore_from_trash(trash_file, target_path)
            if success:
                # Mark the record as restored so it cannot be restored twice
                # and no longer shows up in the trash list.
                self.log_manager.update_record(
                    record_id,
                    {
                        "action": "RESTORED",
                        "restoredAt": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                        "trashFile": None,
                    },
                )
                return {"success": True, "message": f"File restored to: {target_path}"}
            return {"success": False, "error": "Failed to restore file from trash"}

        elif action == "MODIFY":
            snap_file = record.get("snapshotFile")
            if not snap_file or not os.path.exists(snap_file):
                return {
                    "success": False,
                    "error": (
                        "No pre-edit snapshot available for this record. "
                        "Rollback is only possible when record_modify was called "
                        "both before AND after the edit."
                    ),
                }
            content = self.file_manager.get_file_content(snap_file)
            if content is None:
                return {"success": False, "error": "Cannot read snapshot file"}
            file_path = record.get("filePath", "")
            target_path = file_path if os.path.isabs(file_path) else os.path.join(self.project_root, file_path)
            os.makedirs(os.path.dirname(target_path), exist_ok=True)
            with open(target_path, "w", encoding="utf-8") as f:
                f.write(content)
            self.log_manager.update_record(
                record_id,
                {
                    "action": "ROLLED_BACK",
                    "rolledBackAt": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                },
            )
            return {"success": True, "message": f"File rolled back to pre-edit state: {target_path}"}

        elif action == "BINARY_BACKUP":
            backup_file = record.get("backupFile")
            if not backup_file or not os.path.exists(backup_file):
                return {"success": False, "error": "Backup file not found (may have been cleaned up)"}
            file_path = record.get("filePath", "")
            target_path = file_path if os.path.isabs(file_path) else os.path.join(self.project_root, file_path)
            success = self.file_manager.restore_binary_backup(backup_file, target_path)
            if success:
                self.log_manager.update_record(
                    record_id,
                    {
                        "action": "ROLLED_BACK",
                        "rolledBackAt": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    },
                )
                return {"success": True, "message": f"Binary file restored to: {target_path}"}
            return {"success": False, "error": "Failed to restore binary backup"}

        elif action in ("RESTORED", "ROLLED_BACK"):
            return {"success": False, "error": f"This record has already been {action.lower()}"}

        return {"success": False, "error": f"Unknown action: {action}"}

    # ------------------------------------------------------------------
    # Expiry and cleanup management
    # ------------------------------------------------------------------

    def query_due_for_cleanup(self) -> List[Dict[str, Any]]:
        """
        Scan the full record table and return the records whose expireAt <= now, oldest first.
        Records that were already restored or rolled back are excluded.
        """
        return self.log_manager.get_due_for_cleanup()

    def extend_record_expiry(self, record_id: str) -> Dict[str, Any]:
        """Push the expiry of the given record back by one retention period (retentionDays days)."""
        record = self.log_manager.get_record_by_id(record_id)
        if not record:
            return {"success": False, "error": "Record not found"}

        retention_days = record.get("retentionDays", RETENTION_NORMAL_DAYS)
        additional_ms = retention_days * 24 * 60 * 60 * 1000
        ok = self.log_manager.extend_record_expiry(record_id, additional_ms)
        if ok:
            # Fall back to an empty dict so a missing record cannot raise here.
            updated = self.log_manager.get_record_by_id(record_id) or {}
            return {
                "success": True,
                "recordId": record_id,
                "newExpireAt": updated.get("expireAt"),
                "newExpireAtDatetime": updated.get("expireAtDatetime"),
                "message": f"Expiry extended by {retention_days} days",
            }
        return {"success": False, "error": "Failed to extend expiry"}

    def delete_due_records(self, record_ids: List[str]) -> Dict[str, Any]:
        """Batch-delete expired records the user has confirmed for cleanup (log entries and physical files)."""
        deleted_records = []
        failed_records = []
        deleted_files: List[str] = []

        for record_id in record_ids:
            result = self.delete_record_by_id(record_id, delete_files=True)
            if result.get("success"):
                deleted_records.append(record_id)
                deleted_files.extend(result.get("deleted_files", []))
            else:
                failed_records.append({"recordId": record_id, "error": result.get("error")})

        return {
            "success": True,
            "deleted_count": len(deleted_records),
            "deleted_records": deleted_records,
            "failed_records": failed_records,
            "deleted_files": deleted_files,
        }

    # ------------------------------------------------------------------
    # Query / history
    # ------------------------------------------------------------------

    def get_history(self, file_path: Optional[str] = None, limit: int = 10) -> List[Dict[str, Any]]:
        if file_path:
            abs_path = os.path.abspath(file_path)
            relative_path = self._get_relative_path(abs_path)
            history = self.log_manager.get_history_by_file(relative_path)
        else:
            history = self.log_manager.get_all_history()
        return history[:limit]

    def get_file_diff(self, record_id: str) -> Optional[str]:
        record = self.log_manager.get_record_by_id(record_id)
        if not record or record.get("action") != "MODIFY":
            return None
        diff_file = record.get("diffFile")
        if not diff_file or not os.path.exists(diff_file):
            return None
        return self.file_manager.get_file_content(diff_file)

    def query_expired(self, days: int) -> List[Dict[str, Any]]:
        """Legacy interface: query expired records by a fixed number of days."""
        return self.log_manager.get_expired_records(days)

    def clean_expired(self, days: int, delete_files: bool = False) -> Dict[str, Any]:
        """Legacy interface: clean up expired records."""
        expired = self.log_manager.get_expired_records(days)
        deleted_files = []
        if delete_files:
            # Note: only diff files are removed here; snapshot, trash and
            # binary backup files referenced by the records are left in place.
            for record in expired:
                diff_file = record.get("diffFile")
                if diff_file and os.path.exists(diff_file):
                    os.remove(diff_file)
                    deleted_files.append(diff_file)
        result = self.log_manager.clean_expired_records(days)
        result["deleted_files"] = deleted_files
        return result

    def list_trash(self) -> List[Dict[str, Any]]:
        """List deleted files in trash that have not been restored yet."""
        records = self.log_manager.get_all_history()
        return [
            {
                "recordId": r.get("recordId"),
                "filePath": r.get("filePath"),
                "datetime": r.get("datetime"),
                "trashFile": r.get("trashFile"),
                "isImportant": r.get("isImportant", False),
                "retentionDays": r.get("retentionDays", RETENTION_NORMAL_DAYS),
                "expireAtDatetime": r.get("expireAtDatetime"),
            }
            for r in records
            if r.get("action") == "DELETE"
        ]

    def delete_record_by_id(self, record_id: str, delete_files: bool = True) -> Dict[str, Any]:
        """Delete the given record (log entry and physical files)."""
        record = self.log_manager.get_record_by_id(record_id)
        if not record:
            return {"success": False, "error": "Record not found"}

        deleted_files = []
        if delete_files:
            # Delete the diff file
            diff_file = record.get("diffFile")
            if diff_file and os.path.exists(diff_file):
                os.remove(diff_file)
                deleted_files.append(diff_file)
            # Delete the trash backup
            trash_file = record.get("trashFile")
            if trash_file and os.path.exists(trash_file):
                os.remove(trash_file)
                deleted_files.append(trash_file)
            # Delete the snapshot file
            snap_file = record.get("snapshotFile")
            if snap_file and os.path.exists(snap_file):
                os.remove(snap_file)
                deleted_files.append(snap_file)
            # Delete the binary backup file
            backup_file = record.get("backupFile")
            if backup_file and os.path.exists(backup_file):
                os.remove(backup_file)
                deleted_files.append(backup_file)
            # Delete the base file only when the file has no other MODIFY records
            file_path = record.get("filePath")
            if file_path and record.get("action") == "MODIFY":
                remaining = [
                    r
                    for r in self.log_manager.get_history_by_file(file_path)
                    if r.get("action") == "MODIFY" and r.get("recordId") != record_id
                ]
                if not remaining:
                    self.file_manager.delete_base(file_path)

        self.log_manager.delete_record(record_id)
        return {
            "success": True,
            "deleted_record": record_id,
            "deleted_files": deleted_files,
            "message": f"Record {record_id} deleted",
        }

    def merge_modify(self, file_path: str, keep_record_id: Optional[str] = None) -> Dict[str, Any]:
        """Merge multiple MODIFY records of the same file; the latest one is kept by default."""
        abs_path = os.path.abspath(file_path)
        relative_path = self._get_relative_path(abs_path)
        records = self.log_manager.get_history_by_file(relative_path, descending=True)
        modify_records = [r for r in records if r.get("action") == "MODIFY"]

        if len(modify_records) <= 1:
            return {"success": False, "error": "No need to merge (only one or zero record)"}

        if keep_record_id:
            keep_record = next((r for r in modify_records if r.get("recordId") == keep_record_id), None)
            if not keep_record:
                return {"success": False, "error": "Record not found"}
        else:
            keep_record = modify_records[0]

        deleted_count = 0
        deleted_files = []
        for r in modify_records:
            if r.get("recordId") == keep_record.get("recordId"):
                continue
            if self.log_manager.delete_record(r.get("recordId")):
                deleted_count += 1
                for fkey in ("diffFile", "snapshotFile"):
                    f = r.get(fkey)
                    if f and os.path.exists(f):
                        os.remove(f)
                        deleted_files.append(f)

        return {
            "success": True,
            "kept_record": keep_record.get("recordId"),
            "deleted_count": deleted_count,
            "deleted_files": deleted_files,
            # The kept record is the latest only when keep_record_id is not given.
            "message": f"Merged {deleted_count} records, kept: {keep_record.get('recordId')}",
        }


# -------------------------------------------------------------------
# CLI output helpers
# -------------------------------------------------------------------


def print_records(records: list, show_diff: bool = False, vcs: Optional[MiniVCS] = None):
    if not records:
        print("No records found.")
        return
    for i, record in enumerate(records):
        important_tag = " [IMPORTANT]" if record.get("isImportant") else ""
        action = record.get("action", "")
        rollback_tag = " [rollback available]" if action == "MODIFY" and record.get("snapshotFile") else ""
        print(f"\n[{i + 1}] Record ID: {record.get('recordId')}")
        print(f" File : {record.get('filePath')}{important_tag}")
        print(f" Action : {action}{rollback_tag}")
        print(f" Time : {record.get('datetime')}")
        print(f" Summary : {record.get('summary')}")
        print(f" Expires : {record.get('expireAtDatetime', 'N/A')} (retention {record.get('retentionDays', '?')}d)")
        if show_diff and vcs and action == "MODIFY":
            diff_content = vcs.get_file_diff(record.get("recordId"))
            if diff_content:
                print("\n Diff Content:")
                print(" " + "-" * 40)
                # Show at most the first 30 lines of the diff
                for line in diff_content.splitlines()[:30]:
                    print(f" {line}")


def print_due_for_cleanup(due: list):
    if not due:
        return
    print(f"\n{'=' * 50}")
    print(f" [CLEANUP NOTICE] {len(due)} record(s) have expired and are pending cleanup:")
    for i, r in enumerate(due):
        important_tag = " [IMPORTANT]" if r.get("isImportant") else ""
        print(f" [{i + 1}] ID={r.get('recordId')} File={r.get('filePath')}{important_tag}")
        print(f" Action={r.get('action')} Created={r.get('datetime')} Expired={r.get('expireAtDatetime')}")
    print(" Use 'cleanup --confirm' to delete, or 'extend <record_id>' to postpone.")
    print(f"{'=' * 50}")


# -------------------------------------------------------------------
# CLI entry point
# -------------------------------------------------------------------


def main():
    parser = argparse.ArgumentParser(description="MiniVCS - lightweight file version management system")
    parser.add_argument(
        "command",
        choices=["record", "delete", "history", "restore", "expired", "trash", "remove", "cleanup", "extend"],
        help="command to run",
    )
    parser.add_argument("file", nargs="?", help="file path, record ID, or number of days (for expired)")
    parser.add_argument("-n", "--limit", type=int, default=10, help="maximum number of records")
    parser.add_argument("-d", "--show-diff", action="store_true", help="show diff content")
    parser.add_argument("-c", "--clean", action="store_true", help="clean expired records (use with the expired command)")
    parser.add_argument("--delete-files", action="store_true", help="also delete physical files")
    parser.add_argument("--confirm", action="store_true", help="confirm the cleanup (use with the cleanup command)")
    parser.add_argument("--project-root", default=os.getcwd(), help="project root directory (used to compute relative paths)")
    parser.add_argument("--vcs-root", default=None, help="VCS data storage directory (default ~/.openclaw/minivcs)")
    args = parser.parse_args()

    try:
        vcs = MiniVCS(args.project_root, vcs_root=args.vcs_root)
    except Exception as e:
        print(f"Error: Failed to initialize MiniVCS: {e}")
        return 1

    if args.command == "record":
        if not args.file:
            print("Error: Please specify a file to record")
            return 1
        if not os.path.exists(args.file):
            print(f"Error: File not found: {args.file}")
            return 1
        result = vcs.record_modify(args.file)
        if result.get("success"):
            if result.get("skipped"):
                print("~ No changes detected, skipped.")
            elif result.get("backupFile"):
                # Binary backups are identified by backupFile; the result carries no "action" key.
                print("✓ Binary file backed up! (.bak copy saved)")
                print(f" Record ID : {result.get('recordId')}")
                print(f" Backup : {result.get('backupFile')}")
                print(f" Summary : {result.get('summary')}")
                important_tag = " [IMPORTANT]" if result.get("isImportant") else ""
                print(f" Retention : {result.get('retentionDays')} days{important_tag}")
                print(f" Rollback : available (use restore {result.get('recordId')})")
            else:
                print("✓ File change recorded!")
                print(f" Record ID : {result.get('recordId')}")
                print(f" Summary : {result.get('summary')}")
                important_tag = " [IMPORTANT]" if result.get("isImportant") else ""
                print(f" Retention : {result.get('retentionDays')} days{important_tag}")
                if result.get("canRollback"):
                    print(f" Rollback : available (use restore {result.get('recordId')})")
            print_due_for_cleanup(result.get("due_for_cleanup", []))
        else:
            print(f"✗ Failed: {result.get('error')}")
            return 1

    elif args.command == "delete":
        if not args.file:
            print("Error: Please specify a file to delete")
            return 1
        if not os.path.exists(args.file):
            print(f"Error: File not found: {args.file}")
            return 1
        result = vcs.record_delete(args.file)
        if result.get("success"):
            print("✓ File deleted and backed up!")
            print(f" Record ID : {result.get('recordId')}")
            print(f" Backup : {result.get('trashFile')}")
            important_tag = " [IMPORTANT]" if result.get("isImportant") else ""
            print(f" Retention : {result.get('retentionDays')} days{important_tag}")
            print_due_for_cleanup(result.get("due_for_cleanup", []))
        else:
            print(f"✗ Failed: {result.get('error')}")
            return 1

    elif args.command == "history":
        records = vcs.get_history(args.file, args.limit)
        if args.file:
            print(f"\n=== History for: {args.file} ===")
        else:
            print(f"\n=== All History (latest {args.limit} records) ===")
        print_records(records, args.show_diff, vcs)

    elif args.command == "restore":
        if not args.file:
            print("Error: Please specify a record ID to restore/rollback")
            return 1
        result = vcs.restore_file(args.file)
        if result.get("success"):
            print(f"✓ {result.get('message')}")
        else:
            print(f"✗ Failed: {result.get('error')}")
            return 1

    elif args.command == "expired":
        if not args.file:
            print("Error: Please specify number of days")
            return 1
        try:
            days = int(args.file)
        except ValueError:
            print("Error: Days must be a number")
            return 1
        if args.clean:
            result = vcs.clean_expired(days, args.delete_files)
            print(f"\n=== Cleaned Expired Records (>{days} days) ===")
            print(f"Records deleted: {result.get('deleted_count')}")
            if args.delete_files and result.get("deleted_files"):
                print(f"Diff files deleted: {len(result.get('deleted_files', []))}")
        else:
            records = vcs.query_expired(days)
            print(f"\n=== Expired Records (>{days} days) ===")
            print(f"Total: {len(records)} records")
            print_records(records)

    elif args.command == "trash":
        files = vcs.list_trash()
        print("\n=== Trash Bin ===")
        if not files:
            print("Trash is empty.")
            return 0
        for i, f in enumerate(files):
            important_tag = " [IMPORTANT]" if f.get("isImportant") else ""
            print(f"\n[{i + 1}] {f.get('filePath')}{important_tag}")
            print(f" Deleted at : {f.get('datetime')}")
            print(f" Expires at : {f.get('expireAtDatetime', 'N/A')}")
            print(f" Record ID : {f.get('recordId')}")

    elif args.command == "remove":
        if not args.file:
            print("Error: Please specify a record ID to remove")
            print(" Use 'history' command to see record IDs")
            return 1
        result = vcs.delete_record_by_id(args.file)
        if result.get("success"):
            print("✓ Record removed successfully!")
            print(f" {result.get('message')}")
        else:
            print(f"✗ Failed: {result.get('error')}")
            return 1

    elif args.command == "cleanup":
        due = vcs.query_due_for_cleanup()
        if not due:
            print("✓ No records due for cleanup.")
            return 0
        print(f"\n=== Records Due for Cleanup ({len(due)} total, oldest first) ===")
        print_records(due, vcs=vcs)
        if args.confirm:
            record_ids = [r.get("recordId") for r in due]
            result = vcs.delete_due_records(record_ids)
            print(f"\n✓ Cleanup complete: {result.get('deleted_count')} records deleted.")
            if result.get("failed_records"):
                print(f" Failed: {result.get('failed_records')}")
        else:
            print("\n To delete these records, run with --confirm")
            print(" To postpone a record, run: extend <record_id>")

    elif args.command == "extend":
        if not args.file:
            print("Error: Please specify a record ID to extend")
            return 1
        result = vcs.extend_record_expiry(args.file)
        if result.get("success"):
            print("✓ Expiry extended!")
            print(f" Record ID : {result.get('recordId')}")
            print(f" New expire at : {result.get('newExpireAtDatetime')}")
            print(f" {result.get('message')}")
        else:
            print(f"✗ Failed: {result.get('error')}")
            return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())
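
The base / snapshot / diff flow that `record_modify` follows can be illustrated with a small standalone sketch. It is not the MiniVCS implementation: `summarize_diff` and `BaseStore` are hypothetical names, storage is an in-memory dict rather than files under the VCS root, and the diff is produced with the standard-library `difflib`, which is only an assumption about what a diff engine might do.

```python
import difflib
from typing import Dict


def summarize_diff(old: str, new: str) -> Dict[str, int]:
    """Count added and removed lines in a unified diff of old vs new."""
    lines = list(difflib.unified_diff(old.splitlines(), new.splitlines(), lineterm=""))
    added = removed = 0
    # The first two lines are the '---' / '+++' file headers; skip them.
    for line in lines[2:]:
        if line.startswith("+"):
            added += 1
        elif line.startswith("-"):
            removed += 1
    return {"added": added, "removed": removed}


class BaseStore:
    """Hypothetical in-memory stand-in for base and snapshot storage."""

    def __init__(self) -> None:
        self.base: Dict[str, str] = {}
        self.snapshots: Dict[str, str] = {}

    def record(self, path: str, new_content: str) -> Dict[str, object]:
        prev = self.base.get(path)
        old = prev if prev is not None else ""
        summary = summarize_diff(old, new_content)
        # Nothing to record when a base exists and the content is unchanged.
        if prev is not None and summary["added"] == 0 and summary["removed"] == 0:
            return {"skipped": True}
        # A rollback target exists only when there was pre-edit content.
        can_rollback = bool(old)
        if can_rollback:
            self.snapshots[path] = old
        # The new content becomes the base for the next comparison.
        self.base[path] = new_content
        return {"skipped": False, "canRollback": can_rollback, **summary}


store = BaseStore()
first = store.record("a.txt", "one\ntwo\n")     # first sight: no base, no rollback
second = store.record("a.txt", "one\nthree\n")  # edit: snapshot of the old content
third = store.record("a.txt", "one\nthree\n")   # unchanged: skipped
print(first, second, third)
```

The first call only establishes a base, which is why a rollback becomes possible from the second recorded change onward.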