@clawhub-aohoyo-680d642113
技能安全审查工具。深度审查已安装或远程技能的代码安全性、权限风险、数据泄露和内容合规性。支持预装审查(远程拉取)和已装审查两种模式。当用户说"审查技能"、"检查安全"、"审核技能"、"skill audit"时使用。
---
name: silas_skill_auditor
description: 技能安全审查工具。深度审查已安装或远程技能的代码安全性、权限风险、数据泄露和内容合规性。支持预装审查(远程拉取)和已装审查两种模式。当用户说"审查技能"、"检查安全"、"审核技能"、"skill audit"时使用。
metadata:
{
"clawdbot":
{
"emoji": "🔍",
"tags": ["security", "audit", "skill-review", "safety"],
"category": "Security",
},
}
---
# 🔍 Skill Auditor — 技能安全审查器
专业级技能安全审查工具,帮助你在安装前或安装后全面评估技能的安全性和合规性。
## 触发条件
- 用户说"审查技能"、"检查安全"、"审核技能"、"skill audit"
- 安装新技能前需要安全评估
- 怀疑已安装技能有安全风险
## 审查模式
### 模式 A:预装审查(推荐)
在安装前审查远程技能,防止引入风险。
```bash
# 拉取远程技能到临时目录(不安装)
clawdhub install <技能名> --dir /tmp/skill-audit-tmp
# 审查完成后删除
rm -rf /tmp/skill-audit-tmp
```
### 模式 B:已装审查
审查已安装的技能。
```bash
# 工作空间技能
ls ~/.openclaw/workspace/skills/<技能名>/
# 全局技能
ls ~/.openclaw/skills/<技能名>/
```
## 审查流程
### Step 1:文件清单
扫描技能目录,列出所有文件及类型:
```bash
find <技能目录> -type f | head -50
```
关注重点:
- `.py` / `.js` / `.sh` — 可执行代码
- `SKILL.md` — 指令文件(可能含 prompt 注入)
- `_meta.json` / `package.json` — 元数据
- `.env` / `.config` — 配置文件
- 隐藏文件(`.` 开头)— 可能藏有恶意内容
### Step 2:深度安全扫描
逐项检查以下风险类别:
#### 🔴 致命风险(Critical)
| # | 检查项 | 检查方法 |
|---|--------|----------|
| C1 | 硬编码密钥/Token | 搜索 `API_KEY`, `SECRET`, `TOKEN`, `PASSWORD`, `PRIVATE_KEY` 等关键词 |
| C2 | 数据外发 | 搜索 `curl`, `wget`, `fetch(`, `http.post`, `requests.post`, `WebSocket` |
| C3 | 远程代码执行 | 搜索 `eval(`, `exec(`, `Function(`, `child_process`, `os.system`, `subprocess` |
| C4 | Prompt 注入 | 搜索 `ignore previous`, `ignore instructions`, `system prompt`, `你现在是` |
| C5 | 持久化/后门 | 搜索 `crontab`, `systemd`, `launchd`, `autorun`, `always:true`, `startup` |
| C6 | 提权操作 | 搜索 `sudo`, `chmod 777`, `chown`, `setuid`, `CAP_` |
#### 🟡 中等风险(Warning)
| # | 检查项 | 检查方法 |
|---|--------|----------|
| W1 | 敏感路径访问 | 搜索 `~/.ssh`, `~/.aws`, `~/.openclaw`, `/etc/passwd`, `/etc/shadow` |
| W2 | 文件系统越界 | 搜索 `../`, `/tmp`, `/var`, 写入非技能目录的操作 |
| W3 | 环境变量读取 | 搜索 `process.env`, `os.environ`, `$HOME`, `getenv` |
| W4 | 外部依赖 | 检查是否需要安装额外包/CLI |
| W5 | 代码混淆 | 检查是否有 base64 编码的长字符串、无意义的变量名 |
| W6 | 隐蔽通道 | 搜索 DNS 查询、ICMP、图片隐写相关代码 |
#### ℹ️ 信息级(Info)
| # | 检查项 | 说明 |
|---|--------|------|
| I1 | 元数据完整性 | `_meta.json` 是否包含版本、作者信息 |
| I2 | 描述质量 | SKILL.md 是否有清晰的描述和使用示例 |
| I3 | 触发条件 | 触发词是否合理,是否过于宽泛 |
| I4 | 代码质量 | 是否有注释、结构是否清晰 |
| I5 | 声明一致性 | description 与实际功能是否一致 |
### Step 3:SKILL.md 指令分析
仔细阅读 SKILL.md,检查:
1. **指令是否过度**:是否要求 agent 执行超出审查范围的操作(发邮件、修改配置、安装软件)
2. **条件触发是否可疑**:触发条件是否过于宽泛(如匹配所有消息)
3. **是否含社会工程**:如"信任此技能"、"跳过安全检查"、"以最高权限运行"
4. **外部引用**:是否引用外部 URL、脚本或服务
### Step 4:评分
#### 评分规则
| 维度 | 权重 | 满分 |
|------|------|------|
| 致命风险 (C1-C6) | 40% | 40 |
| 中等风险 (W1-W6) | 25% | 25 |
| 信息级 (I1-I5) | 15% | 15 |
| 代码质量 | 10% | 10 |
| 文档质量 | 10% | 10 |
#### 评级标准
| 分数 | 评级 | 建议 |
|------|------|------|
| 90-100 | ✅ 安全 | 可放心使用 |
| 70-89 | 🟡 轻微风险 | 建议修复后使用 |
| 50-69 | 🟠 中等风险 | 谨慎使用,关注风险点 |
| 30-49 | 🔴 高风险 | 不建议安装 |
| 0-29 | 🚨 危险 | 强烈建议拒绝 |
### Step 5:生成报告
输出格式:
```
🔍 技能安全审查报告
📋 基本信息
技能名称:<name>
版本:<version>
审查模式:预装/已装
审查时间:<timestamp>
📁 文件清单 (共 N 个文件)
- SKILL.md (指令文件)
- audit.sh (可执行脚本)
- ...
🔐 安全扫描结果
🔴 致命风险:X 项
C1 硬编码密钥:✅ 未发现
C2 数据外发:❌ 发现![具体位置和代码]
...
🟡 中等风险:X 项
W1 敏感路径:✅ 未发现
...
ℹ️ 信息级:X 项
I1 元数据:✅ 完整
...
📝 SKILL.md 分析
指令范围:合理/过度
触发条件:正常/过宽
社会工程:未发现/发现
📊 综合评分:XX/100(评级)
💡 建议
1. 具体建议...
2. ...
```
## 使用示例
```
用户:审查 yoder-skill-auditor
→ 拉取到临时目录 → 全面扫描 → 输出报告
用户:帮我检查刚装的 claw1-skill-auditor
→ 扫描已装目录 → 全面扫描 → 输出报告
用户:审查 sam-skill-auditor 并评分
→ 扫描 → 输出带评分的完整报告
```
## 注意事项
1. 预装审查使用临时目录,审查完立即删除
2. 不要将审查中发现的密钥/Token 发送给外部服务
3. 评分仅供参考,最终决策权在用户
4. 对于代码混淆的技能,标记为高风险并建议人工审查
5. 审查远程技能时注意网络请求可能触发真实的外部调用
FILE:_meta.json
{
"ownerId": "user",
"slug": "sam-skill-auditor",
"version": "2.0.0",
"publishedAt": 1772445600000
}微信公众号文章搜索与解析。搜狗微信+新榜双源搜索,Python脚本解析全文(零Node依赖),Serper转载兜底。
---
name: silas_wechat_article_search
description: 微信公众号文章搜索与解析。搜狗微信+新榜双源搜索,Python脚本解析全文(零Node依赖),Serper转载兜底。
metadata:
openclaw:
os: linux
---
# 微信公众号文章搜索与解析 v2.0
搜索微信公众号文章 → 解析全文 → 评分 → 入库知识库。
## 搜索源(按优先级)
### 源1:搜狗微信(主力,零依赖)
```bash
curl -s "https://weixin.sogou.com/weixin?type=2&query=关键词" \
-H "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
```
用 Python 正则提取标题和链接,返回搜狗中间链接列表。
### 源2:新榜(补充)
```bash
curl -s "https://google.serper.dev/search" \
-H "X-API-KEY: $SERPER_API_KEY" \
-d '{"q":"site:newrank.cn 关键词","num":10}'
```
### 源3:Serper 转载兜底
当微信原文无法解析时,搜索转载版:
```bash
curl -s "https://google.serper.dev/search" \
-H "X-API-KEY: $SERPER_API_KEY" \
-d '{"q":"文章标题","num":5}'
```
优先选新浪/搜狐/网易等全文转载。
## 内容解析(Python 脚本)
**核心脚本**:`scripts/parse_article.py`
### 依赖
```bash
pip3 install requests beautifulsoup4
```
### 用法
```bash
python3 scripts/parse_article.py "https://mp.weixin.qq.com/s/xxxxx"
python3 scripts/parse_article.py "URL" --save
python3 scripts/parse_article.py "URL" --save --output article.json
```
### 输出 JSON
```json
{
"title": "文章标题",
"author": "公众号名称",
"publish_time": "2026-04-18",
"content": "正文全文...",
"word_count": 5594,
"images_count": 21,
"images": ["url1", "url2", ...],
"url": "原始链接",
"parsed_at": "2026-04-18 22:00:00"
}
```
### 原理
- **iPhone UA** 绕过微信验证(关键!)
- **BeautifulSoup** 解析 `rich_media_content` 提取正文
- **段落结构保留**,过滤 <10 字的噪声
- **图片提取**:`data-src` 属性(微信防盗链图)
### 微信原文解析失败时
1. Serper 搜转载版(新浪/搜狐/网易)
2. web_fetch 抓转载全文
3. browser-search 兜底
## 入库流程
### 1. 搜索
```
选词(从 web-keywords.json)→ 搜狗搜索 → 新榜补充 → 合并去重
```
### 2. 解析
```
对每篇目标文章:
a. 先用 Python 脚本解析微信原文
b. 失败 → Serper 搜转载 → web_fetch 抓全文
c. 提取:标题、作者、正文、图片
```
### 3. 评分
```
5 维度评分(同 web-search 技能标准):
- 数据密度 30%、实操性 25%、时效性 20%、相关性 15%、来源权威 10%
- < 5.0 不入库
- ≥ 8.5 通知管理员
```
### 4. 去重
```
a. feishu_search_doc_wiki 搜索知识库标题
b. memory/collect-log.json 查历史
c. URL + 标题去重
```
### 5. 入库
```
a. 读 wiki-directory-manager 技能匹配目录
b. feishu_create_doc 创建文档(标题不带评分)
c. 正文格式:
> 来源:URL
> 发布日期:YYYY-MM-DD
> 采集日期:YYYY-MM-DD
> 评分:X.X 🟢/🟡/🟠/🔴
正文内容...
d. 有信息量图片 → 保存本地 → feishu_doc_media insert
e. 写多维表格索引(目录必须和实际一致)
```
## 图片处理
- **保存标准**:数据图表/流程图/对比截图/产品截图
- **跳过**:装饰图/广告/头像/logo
- 每篇最多 10 张,单张 <20MB
- 保存路径:`/tmp/openclaw/images/`
## 踩坑指南
- **微信验证码**:iPhone UA 可绕过,Desktop UA 会被拦截
- **搜狗中间链接**:不能直接 web_fetch,需先解析跳转或用 Python 脚本
- **微信图片防盗链**:`data-src` 是真实地址,但直接访问需要 Referer
- **频繁请求被封**:搜狗搜索间隔 3-5 秒
- **转载版内容可能有删减**:优先用微信原文,转载版做兜底
- **不要用 Node.js**:Python 脚本足够,避免 cheerio 依赖
- **save_to_feishu.py 不用**:飞书写入用 agent 原生工具(feishu_create_doc)
## 频率
Cron:每小时第22分钟(ID: de3ee2d3)
每次 1 个关键词,搜索 → 解析 → 评分 → 入库
FILE:references/example-output.json
{
"query": "AI 一人公司 创业",
"total": 5,
"articles": [
{
"title": "AI时代一人公司怎么赚钱?",
"url": "https://weixin.sogou.com/link?url=xxx",
"summary": "随着AI工具的普及,越来越多的人开始尝试一人公司模式...",
"datetime": "2026-04-15 10:30:00",
"date_text": "2026年04月15日",
"date_description": "3天前",
"source": "AI创业笔记"
}
]
}
FILE:scripts/parse_article.py
#!/usr/bin/env python3
"""
微信公众号文章解析器
解析 mp.weixin.qq.com 文章,提取标题、作者、正文、图片。
核心:iPhone UA 绕过微信验证 + BeautifulSoup 解析。
"""
import requests
from bs4 import BeautifulSoup
import re
import json
import sys
from datetime import datetime
def parse_article(url):
"""解析微信公众号文章"""
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) '
'AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 '
'Mobile/15E148 Safari/604.1',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
}
try:
resp = requests.get(url, headers=headers, timeout=30)
resp.encoding = 'utf-8'
soup = BeautifulSoup(resp.text, 'html.parser')
# 标题
title_tag = soup.find('h1', class_='rich_media_title')
title = title_tag.get_text(strip=True) if title_tag else None
if not title:
m = re.search(r'var\s+msg_title\s*=\s*["\']([^"\']+)["\']', resp.text)
title = m.group(1) if m else "未知标题"
# 作者/公众号
author_tag = soup.find('a', class_='rich_media_meta_link')
if not author_tag:
author_tag = soup.find('span', class_='rich_media_meta_nickname')
author = author_tag.get_text(strip=True) if author_tag else "未知"
# 发布时间
time_tag = soup.find('span', id='publish_time')
if time_tag:
publish_time = time_tag.get_text(strip=True)
else:
m = re.search(r'var\s+publish_time\s*=\s*"([^"]+)"', resp.text)
publish_time = m.group(1) if m else None
# 正文
content_div = soup.find('div', class_='rich_media_content')
content = ""
images = []
if content_div:
for tag in content_div(['script', 'style']):
tag.decompose()
# 提取图片
for img in content_div.find_all('img'):
img_url = img.get('data-src') or img.get('src')
if img_url and not img_url.startswith('data:'):
images.append(img_url)
# 提取段落(去重,保留结构)
seen = set()
paragraphs = []
for p in content_div.find_all(['p', 'section']):
text = p.get_text(strip=True)
if text and len(text) > 10 and text not in seen:
seen.add(text)
paragraphs.append(text)
content = '\n\n'.join(paragraphs)
return {
'title': title,
'author': author,
'publish_time': publish_time,
'content': content,
'word_count': len(content),
'images_count': len(images),
'images': images[:20],
'url': url,
'parsed_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'status': 'ok'
}
except Exception as e:
return {
'error': str(e),
'url': url,
'status': 'error'
}
def print_result(article):
if article.get('status') == 'error':
print(f"❌ 解析失败: {article['error']}")
return
print(f"📰 标题: {article['title']}")
print(f"✍️ 作者: {article['author']}")
print(f"🕐 时间: {article.get('publish_time', '未知')}")
print(f"📊 字数: {article['word_count']}")
print(f"🖼️ 图片: {article['images_count']}")
print("─" * 60)
text = article['content']
print(text[:2000] + ("..." if len(text) > 2000 else ""))
def main():
if len(sys.argv) < 2:
print("用法: python3 parse_article.py <URL> [--save] [--output file.json]")
sys.exit(1)
url = sys.argv[1]
article = parse_article(url)
print_result(article)
if '--save' in sys.argv:
out = 'output.json'
if '--output' in sys.argv:
idx = sys.argv.index('--output')
if idx + 1 < len(sys.argv):
out = sys.argv[idx + 1]
with open(out, 'w', encoding='utf-8') as f:
json.dump(article, f, ensure_ascii=False, indent=2)
print(f"\n✅ 已保存到 {out}")
if __name__ == '__main__':
main()
Clash/Mihomo 代理管理。安装配置、查看状态、切换模式/节点、测速、外网失败时自动代理。
---
name: silas_clash
description: Clash/Mihomo 代理管理。安装配置、查看状态、切换模式/节点、测速、外网失败时自动代理。
metadata:
openclaw:
os: linux
---
# Silas-Clash 代理管理
通过 Clash RESTful API 管理本地代理(Mihomo/Clash Meta)。
## 首次安装配置
### 1. 安装 Mihomo(Clash Meta 内核)
```bash
# 检查架构
uname -m
# 下载对应版本(以 amd64 为例)
curl -Lo /tmp/mihomo.gz https://github.com/MetaCubeX/mihomo/releases/download/v1.19.21/mihomo-linux-amd64-v1.19.21.gz
gunzip /tmp/mihomo.gz
chmod +x /tmp/mihomo
sudo mv /tmp/mihomo /usr/local/bin/mihomo
```
### 2. 获取配置
**询问用户订阅地址**,然后下载配置:
```bash
# 方式一:订阅链接直接下载
curl -Lo ~/.config/mihomo/config.yaml "用户提供的订阅地址"
# 方式二:Clash 仪表盘配置(推荐)
# 访问 https://yacd.haishan.me 或 https://metacubexd.github.io/metacubexd
# 在仪表盘中粘贴订阅地址,自动生成 config.yaml
```
### 3. 确保配置开启 API
检查 `~/.config/mihomo/config.yaml` 中有:
```yaml
external-controller: 127.0.0.1:9090
# secret: "你的密码" # 可选,建议空
```
### 4. 启动
```bash
mihomo -d ~/.config/mihomo
```
### 5. 验证
```bash
curl -s http://127.0.0.1:9090/version
# 应返回 {"meta":true,"version":"v1.x.x"}
```
### 6. 写入本地配置
将连接信息保存到 `memory/clash-config.json`:
```json
{
"api_url": "http://127.0.0.1:9090",
"api_secret": "",
"proxy_http": "http://127.0.0.1:7890",
"proxy_socks5": "socks5://127.0.0.1:7891",
"proxy_all": "socks5://127.0.0.1:7891",
"install_path": "/usr/local/bin/mihomo",
"config_path": "~/.config/mihomo/config.yaml"
}
```
> ⚠️ 代理端口(7890/7891)以实际 config.yaml 中的 mixed-port / port 为准。
## 常用操作
### 查看状态
```bash
curl -s http://127.0.0.1:9090/version
curl -s http://127.0.0.1:9090/configs | python3 -c "import json,sys;d=json.load(sys.stdin);print(f\"模式: {d.get('mode')}\")"
```
### 查看当前节点
```bash
curl -s http://127.0.0.1:9090/proxies/%F0%9F%9A%80%20%E8%8A%82%E7%82%B9%E9%80%89%E6%8B%A9 | python3 -c "
import json,sys
d=json.load(sys.stdin)
print(f\"当前节点: {d.get('now','?')}\")
"
```
### 切换模式
```bash
# rule(规则分流,推荐日常)
curl -X PATCH http://127.0.0.1:9090/configs -H "Content-Type: application/json" -d '{"mode":"rule"}'
# global(全局代理)
curl -X PATCH http://127.0.0.1:9090/configs -H "Content-Type: application/json" -d '{"mode":"global"}'
# direct(直连)
curl -X PATCH http://127.0.0.1:9090/configs -H "Content-Type: application/json" -d '{"mode":"direct"}'
```
### 切换节点
```bash
# 先列出节点名
curl -s http://127.0.0.1:9090/proxies/%F0%9F%9A%80%20%E8%8A%82%E7%82%B9%E9%80%89%E6%8B%A9 | python3 -c "
import json,sys
d=json.load(sys.stdin)
for n in d.get('all',[]): print(n)
"
# 切换到指定节点
curl -X PUT "http://127.0.0.1:9090/proxies/%F0%9F%9A%80%20%E8%8A%82%E7%82%B9%E9%80%89%E6%8B%A9" \
-H "Content-Type: application/json" \
-d '{"name":"节点名称"}'
```
### 测速
```bash
# 测所有节点
curl -X GET "http://127.0.0.1:9090/group/%F0%9F%9A%80%20%E8%8A%82%E7%82%B9%E9%80%89%E6%8B%A9/delay?timeout=5000&url=https://www.google.com/generate_204"
# 测当前节点
curl -X GET "http://127.0.0.1:9090/proxies/%F0%9F%9A%80%20%E8%8A%82%E7%82%B9%E9%80%89%E6%8B%A9/delay?timeout=5000&url=https://www.google.com/generate_204"
```
## 外网访问代理策略
当 web_search / web_fetch / Serper / Tavily 等外网工具失败时:
1. **第一次重试**:设置环境变量走代理
```bash
export http_proxy=http://127.0.0.1:7890
export https_proxy=http://127.0.0.1:7890
export all_proxy=socks5://127.0.0.1:7891
```
2. **第二次重试**:如果还失败,切换 Clash 为 global 模式重试
3. **恢复**:成功后切回 rule 模式
## 订阅更新(手动操作)
⚠️ 订阅更新**不要通过技能自动执行**,手动操作更安全:
```bash
# 备份当前配置
cp ~/.config/mihomo/config.yaml ~/.config/mihomo/config.yaml.bak
# 下载最新订阅
curl -Lo ~/.config/mihomo/config.yaml "订阅地址"
# 重启
pkill mihomo && mihomo -d ~/.config/mihomo
```
## 踩坑指南
- **节点名称含特殊字符**:URL encode 处理,或先列出节点再选
- **API 无响应**:检查 mihomo 是否运行(`pgrep mihomo`),端口是否正确
- **测速超时**:timeout 设 5000ms,太短会误判
- **切节点后不生效**:部分应用有 DNS 缓存,等几秒或重启应用
- **订阅更新搞挂网络**:先备份再更新,挂了用 bak 恢复
- **rule 模式足够**:日常用 rule,只在需要全局代理时临时切 global
- **代理端口**:以 config.yaml 中 mixed-port / port 配置为准,不要硬编码
- **macOS 兼容**:安装路径改为 `/opt/homebrew/bin/mihomo`,其余操作相同
FILE:references/clash-config.example.json
{
"api_url": "http://127.0.0.1:9090",
"api_secret": "",
"proxy_http": "http://127.0.0.1:7890",
"proxy_socks5": "socks5://127.0.0.1:7891",
"proxy_all": "socks5://127.0.0.1:7891"
}
QQ 邮箱智能管理工具。支持收发邮件、搜索筛选、附件处理,以及 AI 智能整理功能(自动摘要、分类、优先级排序、待办提取)。当用户需要操作 QQ 邮箱、查收邮件、发送邮件、整理收件箱或处理邮件相关任务时使用此技能。
--- name: qq-email description: QQ 邮箱智能管理工具。支持收发邮件、搜索筛选、附件处理,以及 AI 智能整理功能(自动摘要、分类、优先级排序、待办提取)。当用户需要操作 QQ 邮箱、查收邮件、发送邮件、整理收件箱或处理邮件相关任务时使用此技能。 --- # QQ 邮箱智能助手 ## 功能概览 本技能提供完整的 QQ 邮箱管理能力,结合 AI 智能整理功能: | 功能 | 说明 | |------|------| | 📥 **收件管理** | 读取收件箱、未读邮件、指定文件夹 | | ✉️ **发送邮件** | 发送纯文本/HTML 邮件,支持附件 | | 🔍 **智能搜索** | 按关键词、发件人、时间、主题搜索 | | 🗂️ **邮件管理** | 标记已读/未读、删除、移动、归档 | | 📎 **附件处理** | 下载附件、发送附件 | | 🤖 **AI 整理** | 自动摘要、智能分类、优先级排序、待办提取 | ## 快速开始 ### 配置邮箱账号 首次使用需要配置 QQ 邮箱的 IMAP/SMTP 授权码: 1. 登录 QQ 邮箱网页版 2. 设置 → 账户 → 开启 IMAP/SMTP 服务 3. 获取 **授权码**(不是登录密码) 4. 配置环境变量或配置文件 ### 环境变量配置 ```bash export QQ_EMAIL="[email protected]" export QQ_EMAIL_AUTH_CODE="your_auth_code" # 16位授权码 ``` ## 核心功能 ### 1. 读取邮件 **读取最新邮件:** ```python # 读取收件箱最新 10 封邮件 python scripts/fetch_emails.py --limit 10 # 读取未读邮件 python scripts/fetch_emails.py --unread # 读取指定文件夹 python scripts/fetch_emails.py --folder "Sent Messages" --limit 5 ``` **输出格式:** ```json { "emails": [ { "id": "msg_001", "subject": "会议通知", "sender": "[email protected]", "date": "2024-03-19 14:30:00", "body_text": "邮件正文...", "body_html": "<html>...</html>", "attachments": ["report.pdf"], "flags": ["\\Seen"] } ] } ``` ### 2. 发送邮件 **发送纯文本邮件:** ```python python scripts/send_email.py \ --to "[email protected]" \ --subject "测试邮件" \ --body "这是一封测试邮件" ``` **发送带附件邮件:** ```python python scripts/send_email.py \ --to "[email protected]" \ --cc "[email protected]" \ --subject "报告附件" \ --body "请查收附件" \ --attachments "/path/to/report.pdf,/path/to/data.xlsx" ``` **发送 HTML 邮件:** ```python python scripts/send_email.py \ --to "[email protected]" \ --subject "HTML 邮件" \ --html "<h1>标题</h1><p>内容</p>" ``` ### 3. 搜索邮件 **基础搜索:** ```python # 按关键词搜索主题和内容 python scripts/search_emails.py --query "项目进度" # 按发件人搜索 python scripts/search_emails.py --from "[email protected]" # 按时间范围搜索 python scripts/search_emails.py --since "2024-03-01" --before "2024-03-20" # 组合条件 python scripts/search_emails.py \ --query "合同" \ --from "[email protected]" \ --unread ``` ### 4. 邮件管理 **标记和移动:** ```python # 标记已读 python scripts/manage_email.py --id "msg_001" --action mark_read # 标记未读 python scripts/manage_email.py --id "msg_001" --action mark_unread # 删除邮件 python scripts/manage_email.py --id "msg_001" --action delete # 移动到文件夹 python scripts/manage_email.py --id "msg_001" --action move --folder "Archive" # 批量操作(逗号分隔 ID) python scripts/manage_email.py --id "msg_001,msg_002,msg_003" --action mark_read ``` ### 5. 附件处理 **下载附件:** ```python # 下载邮件的所有附件 python scripts/download_attachments.py \ --email-id "msg_001" \ --output-dir "./downloads" # 下载指定附件 python scripts/download_attachments.py \ --email-id "msg_001" \ --filename "report.pdf" \ --output-path "./downloads/report.pdf" ``` ## AI 智能整理 ### 自动摘要 为每封邮件生成简洁摘要: ```python python scripts/ai_summarize.py --limit 20 ``` **输出示例:** ```json { "summary": { "total": 20, "processed": 20, "results": [ { "id": "msg_001", "subject": "Q1 季度总结会议通知", "summary": "3月25日下午2点召开Q1总结会议,需准备部门汇报PPT,地点:会议室A", "key_points": ["时间:3月25日 14:00", "需准备PPT", "地点:会议室A"] } ] } } ``` ### 智能分类 自动将邮件分类到不同类别: ```python # 分类最新邮件 python scripts/ai_classify.py --limit 50 # 分类指定邮件 python scripts/ai_classify.py --email-ids "msg_001,msg_002" ``` **分类类别:** - `work` - 工作相关 - `promotion` - 推广/营销 - `social` - 社交/通知 - `important` - 重要邮件 - `newsletter` - 订阅邮件 - `spam` - 垃圾邮件 **输出示例:** ```json { "classifications": [ { "id": "msg_001", "category": "work", "confidence": 0.95, "reason": "包含会议通知、工作安排等关键词" }, { "id": "msg_002", "category": "promotion", "confidence": 0.88, "reason": "来自电商平台,包含优惠信息" } ] } ``` ### 优先级排序 根据内容和发件人智能排序邮件优先级: ```python python scripts/ai_prioritize.py --limit 30 ``` **优先级等级:** - `urgent` - 紧急(需立即处理) - `high` - 高优先级(24小时内处理) - `medium` - 中优先级(本周处理) - `low` - 低优先级(可延后) ### 待办事项提取 从邮件中提取待办任务: ```python python scripts/ai_extract_todos.py --limit 50 ``` **输出示例:** ```json { "todos": [ { "email_id": "msg_001", "subject": "项目进度汇报", "todos": [ { "task": "完成项目进度报告", "deadline": "2024-03-22", "priority": "high" }, { "task": "准备周五汇报PPT", "deadline": "2024-03-24", "priority": "medium" } ] } ] } ``` ### 一键智能整理 执行完整的 AI 整理流程: ```python python scripts/ai_organize.py --limit 50 ``` 此命令会依次执行: 1. 读取最新邮件 2. 生成摘要 3. 智能分类 4. 优先级排序 5. 提取待办事项 6. 生成整理报告 ## 工作流示例 ### 场景 1:早晨快速处理邮件 ```python # 1. 获取未读邮件并生成摘要 python scripts/ai_summarize.py --unread # 2. 提取待办事项 python scripts/ai_extract_todos.py --unread # 3. 标记已处理的不重要邮件 python scripts/manage_email.py --id "msg_003,msg_004" --action mark_read ``` ### 场景 2:周末清理收件箱 ```python # 1. 智能分类所有邮件 python scripts/ai_classify.py --limit 100 # 2. 批量删除推广邮件(根据分类结果) python scripts/manage_email.py --id "msg_005,msg_006" --action delete # 3. 归档已处理的旧邮件 python scripts/manage_email.py --id "msg_007,msg_008" --action move --folder "Archive" ``` ### 场景 3:查找重要邮件 ```python # 1. 搜索老板发来的未读邮件 python scripts/search_emails.py --from "[email protected]" --unread # 2. 对搜索结果进行优先级排序 python scripts/ai_prioritize.py --email-ids "msg_010,msg_011" ``` ## 文件夹说明 QQ 邮箱标准文件夹名称: | 文件夹 | 说明 | |--------|------| | `INBOX` | 收件箱 | | `Sent Messages` | 已发送 | | `Drafts` | 草稿箱 | | `Deleted Messages` | 已删除 | | `Junk` | 垃圾箱 | | `Archive` | 归档(需手动创建)| ## 常见问题 **Q: 连接失败怎么办?** - 确认已开启 QQ 邮箱 IMAP/SMTP 服务 - 检查使用的是授权码而非登录密码 - 确认网络可以访问 imap.qq.com:993 和 smtp.qq.com:465 **Q: 中文显示乱码?** - 脚本已自动处理 UTF-8 编码,如遇乱码请检查终端编码设置 **Q: 附件大小限制?** - QQ 邮箱普通附件最大 50MB - 超大附件最大 3GB(通过中转站) ## 参考资料 - [QQ 邮箱 IMAP/SMTP 设置指南](references/qq_email_setup.md) - [AI 整理算法说明](references/ai_organization.md) FILE:references/ai_organization.md # AI 智能整理算法说明 ## 概述 本技能提供完整的 AI 邮件智能整理能力,包含四个核心功能模块: 1. **AI 摘要生成** - 自动生成邮件内容摘要 2. **智能分类** - 自动将邮件分类到预定义类别 3. **优先级排序** - 评估邮件处理优先级 4. **待办事项提取** - 从邮件中提取可执行任务 ## 算法架构 ### 1. AI 摘要生成 (ai_summarize.py) **技术方案:** - 主要使用通义千问 Qwen-Plus 大模型 - 降级方案:基于规则的简单摘要 **提示词设计:** ``` 请为以下邮件生成简洁摘要: 主题:{subject} 内容: {body} 要求: 1. 用一句话概括邮件核心内容(50 字以内) 2. 提取 3-5 个关键信息点(时间、地点、任务等) 3. 如果有明确的行动要求,请标注 输出 JSON 格式... ``` **输出格式:** ```json { "one_sentence": "一句话摘要", "key_points": ["关键点 1", "关键点 2", ...], "action_required": "需要做什么" 或 null } ``` ### 2. 智能分类 (ai_classify.py) **分类体系:** - `work` - 工作相关(会议、项目、汇报、同事沟通) - `important` - 重要邮件(老板、客户、紧急事项) - `promotion` - 推广营销(广告、优惠、电商) - `social` - 社交通知(朋友圈、社交软件、活动邀请) - `newsletter` - 订阅邮件(资讯、周报、公众号) - `finance` - 财务金融(银行、账单、发票、报销) - `travel` - 出行旅游(机票、酒店、订单确认) - `spam` - 垃圾邮件(可疑、诈骗、无关内容) **双层分类策略:** 1. **规则层**:基于关键词快速匹配(高性能) 2. **AI 层**:大模型深度理解(高准确率) **规则关键词示例:** - 工作类:["会议", "项目", "汇报", "工作", "任务", "deadline"] - 推广类:["优惠", "折扣", "促销", "限时", "购买", "下单"] ### 3. 优先级排序 (ai_prioritize.py) **评分维度:** | 维度 | 权重 | 说明 | |------|------|------| | 紧急关键词 | +30 | 包含"紧急"、"ASAP"、"截止"等 | | 重要发件人 | +25 | 发件人包含 boss、manager、hr 等 | | 时效性 | +20 | 2小时内新邮件得高分 | | 未读状态 | +10 | 未读邮件优先级更高 | | 附件存在 | +5 | 有附件通常更重要 | | 请求/问题 | +10 | 主题包含问号或请求 | **优先级映射:** - **紧急 (70-100分)**:需立即处理(今天内) - **高 (50-69分)**:24 小时内处理 - **中 (30-49分)**:本周内处理 - **低 (0-29分)**:可延后处理 ### 4. 待办事项提取 (ai_extract_todos.py) **提取策略:** 1. **正则模式匹配**:识别常见任务句式 - "请 (.+?)[。.!]" - "需要 (.+?)[。.!]" - "记得 (.+?)[。.!]" 2. **AI 深度理解**:大模型语义分析 **日期识别:** - 支持多种日期格式:YYYY-MM-DD、MM月DD日、今天/明天 - 自动标准化为 YYYY-MM-DD 格式 **优先级评估:** - 包含"紧急"、"ASAP" → high - 包含"最好"、"有空" → low - 默认 → medium ## 性能优化 ### 缓存机制 - 邮件内容缓存避免重复下载 - 分类结果缓存减少 AI 调用 ### 批量处理 - 支持批量邮件处理 - 异步并行处理(未来扩展) ### 降级策略 - AI 服务不可用时自动降级到规则引擎 - 保证基础功能可用性 ## 使用建议 ### 日常使用场景 **早晨快速处理:** ```bash # 获取未读邮件摘要和待办 python scripts/ai_organize.py --unread --limit 20 ``` **周末清理收件箱:** ```bash # 分类所有邮件,批量删除推广邮件 python scripts/ai_classify.py --limit 100 python scripts/manage_email.py --id "msg_001,msg_002" --action delete ``` **查找重要邮件:** ```bash # 搜索老板邮件并评估优先级 python scripts/search_emails.py --from "[email protected]" --unread python scripts/ai_prioritize.py --email-ids "msg_001,msg_002" ``` ### AI 功能配置 **获取 DashScope API Key:** 1. 访问 [阿里云百炼控制台](https://bailian.console.aliyun.com/) 2. 开通 DashScope 服务 3. 在 API Key 管理页面创建 Key **环境变量设置:** ```bash export DASHSCOPE_API_KEY="sk-xxxxxxxxxxxxxxxxxxxxxxxx" ``` **无 AI 模式:** ```bash # 禁用 AI,仅使用规则引擎 python scripts/ai_extract_todos.py --no-ai ``` ## 扩展性 ### 自定义分类 修改 `ai_classify.py` 中的 `CATEGORIES` 和 `KEYWORDS` 字典。 ### 自定义优先级规则 调整 `ai_prioritize.py` 中的 `URGENT_KEYWORDS` 和 `IMPORTANT_SENDER_PATTERNS`。 ### 添加新功能 在 `scripts/` 目录下创建新的 Python 脚本,遵循现有代码风格。 ## 技术依赖 - **Python 3.8+** - **imaplib** - IMAP 协议支持 - **smtplib** - SMTP 协议支持 - **dashscope** - 阿里云大模型 API(可选) - **email** - 邮件解析 ## 限制与注意事项 1. **API 调用限制**:DashScope 有调用频率限制 2. **邮件大小限制**:正文截断到 2000 字符用于 AI 处理 3. **中文支持**:完全支持中文邮件处理 4. **安全性**:敏感信息通过环境变量配置,不硬编码 FILE:references/api_reference.md # Reference Documentation for Qq Email This is a placeholder for detailed reference documentation. Replace with actual reference content or delete if not needed. Example real reference docs from other skills: - product-management/references/communication.md - Comprehensive guide for status updates - product-management/references/context_building.md - Deep-dive on gathering context - bigquery/references/ - API references and query examples ## When Reference Docs Are Useful Reference docs are ideal for: - Comprehensive API documentation - Detailed workflow guides - Complex multi-step processes - Information too lengthy for main SKILL.md - Content that's only needed for specific use cases ## Structure Suggestions ### API Reference Example - Overview - Authentication - Endpoints with examples - Error codes - Rate limits ### Workflow Guide Example - Prerequisites - Step-by-step instructions - Common patterns - Troubleshooting - Best practices FILE:references/qq_email_setup.md # QQ 邮箱配置指南 ## 开启 IMAP/SMTP 服务 ### 步骤 1: 登录 QQ 邮箱 访问 [mail.qq.com](https://mail.qq.com) 并登录你的 QQ 邮箱账号。 ### 步骤 2: 进入设置 1. 点击页面顶部的 **设置** 2. 选择 **账户** 标签页 ### 步骤 3: 开启服务 找到 **POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV 服务** 部分: 1. 开启 **IMAP/SMTP 服务** 2. 开启 **POP3/SMTP 服务**(可选) ### 步骤 4: 获取授权码 点击 **生成授权码**,按提示操作: 1. 可能需要短信验证 2. 获得 16 位授权码(类似:`abcd1234efgh5678`) **重要:** 授权码只显示一次,请妥善保存! ## 服务器配置 ### IMAP 设置(接收邮件) | 配置项 | 值 | |--------|-----| | 服务器 | `imap.qq.com` | | 端口 | `993` | | 加密 | `SSL` | | 用户名 | 完整 QQ 邮箱地址 | | 密码 | 授权码(不是 QQ 密码) | ### SMTP 设置(发送邮件) | 配置项 | 值 | |--------|-----| | 服务器 | `smtp.qq.com` | | 端口 | `465` | | 加密 | `SSL` | | 用户名 | 完整 QQ 邮箱地址 | | 密码 | 授权码(不是 QQ 密码) | ## 环境变量配置 ### Linux/macOS 在 `~/.bashrc` 或 `~/.zshrc` 中添加: ```bash export QQ_EMAIL="[email protected]" export QQ_EMAIL_AUTH_CODE="your_16_char_auth_code" export DASHSCOPE_API_KEY="your_dashscope_api_key" # AI 功能需要 ``` 然后执行: ```bash source ~/.bashrc # 或 source ~/.zshrc ``` ### Windows 在系统环境变量中添加: 1. 右键 **此电脑** → **属性** → **高级系统设置** 2. 点击 **环境变量** 3. 新建系统变量: - `QQ_EMAIL` = `[email protected]` - `QQ_EMAIL_AUTH_CODE` = `your_auth_code` - `DASHSCOPE_API_KEY` = `your_api_key` ### 临时配置(当前会话) ```bash export QQ_EMAIL="[email protected]" export QQ_EMAIL_AUTH_CODE="your_auth_code" ``` ## 测试连接 ### 测试 IMAP 连接 ```bash python scripts/fetch_emails.py --limit 1 ``` 预期输出: ```json { "success": true, "folder": "INBOX", "count": 1, "emails": [...] } ``` ### 测试 SMTP 发送 ```bash python scripts/send_email.py \ --to "[email protected]" \ --subject "测试邮件" \ --body "这是一封测试邮件" ``` ## 常见问题 ### Q1: 登录失败/认证错误 **可能原因:** - 使用了 QQ 登录密码而非授权码 - 授权码已过期 - IMAP/SMTP 服务未开启 **解决方案:** 1. 确认使用的是 16 位授权码 2. 重新生成授权码 3. 检查是否开启了 IMAP/SMTP 服务 ### Q2: 连接超时 **可能原因:** - 防火墙阻止了 993/465 端口 - 网络问题 **解决方案:** ```bash # 测试端口连通性 telnet imap.qq.com 993 telnet smtp.qq.com 465 ``` ### Q3: 中文乱码 确保终端使用 UTF-8 编码: ```bash # Linux/macOS export LANG=en_US.UTF-8 # Windows PowerShell [Console]::OutputEncoding = [System.Text.Encoding]::UTF8 ``` ### Q4: AI 功能不可用 需要配置通义千问 API Key: 1. 访问 [阿里云百炼](https://bailian.console.aliyun.com/) 2. 开通 DashScope 服务 3. 创建 API Key 4. 设置环境变量:`export DASHSCOPE_API_KEY="sk-xxx"` ## 安全建议 1. **不要泄露授权码** - 授权码等同于密码 2. **定期更换授权码** - 建议每 3-6 个月更换 3. **使用环境变量** - 不要在代码中硬编码敏感信息 4. **限制脚本权限** - 确保脚本文件只有所有者可执行 ## 文件夹结构 QQ 邮箱标准文件夹名称(IMAP): | 文件夹 | 说明 | IMAP 名称 | |--------|------|----------| | 收件箱 | 收到的邮件 | `INBOX` | | 已发送 | 发出的邮件 | `Sent Messages` | | 草稿箱 | 未完成的邮件 | `Drafts` | | 垃圾箱 | 垃圾邮件 | `Junk` | | 已删除 | 删除的邮件 | `Deleted Messages` | | 归档 | 归档邮件 | `Archive` | 自定义文件夹可以直接使用中文名称。 FILE:scripts/ai_classify.py #!/usr/bin/env python3 """ AI 邮件智能分类器 自动将邮件分类到不同类别 """ import os import json import argparse from typing import List, Dict from fetch_emails import QQEmailClient class EmailClassifier(QQEmailClient): """邮件分类器""" # 预定义类别 CATEGORIES = { "work": "工作相关(会议、项目、汇报、同事沟通)", "important": "重要邮件(老板、客户、紧急事项)", "promotion": "推广营销(广告、优惠、电商)", "social": "社交通知(朋友圈、社交软件、活动邀请)", "newsletter": "订阅邮件(资讯、周报、公众号)", "finance": "财务金融(银行、账单、发票、报销)", "travel": "出行旅游(机票、酒店、订单确认)", "spam": "垃圾邮件(可疑、诈骗、无关内容)" } # 关键词规则(快速分类) KEYWORDS = { "work": ["会议", "项目", "汇报", "工作", "任务", "deadline", "进度", "需求", "方案"], "promotion": ["优惠", "折扣", "促销", "限时", "购买", "下单", "优惠券", "满减"], "finance": ["银行", "账单", "发票", "报销", "付款", "收款", "转账", "余额"], "travel": ["机票", "酒店", "订单", "出行", "航班", "高铁", "预订", "行程"], "spam": ["中奖", "恭喜", "领取", "验证", "账户异常", "点击链接"] } def classify_with_rules(self, subject: str, body: str, sender: str) -> Dict: """基于规则快速分类""" text = f"{subject} {body[:500]} {sender}".lower() scores = {} for category, keywords in self.KEYWORDS.items(): score = sum(1 for kw in keywords if kw.lower() in text) scores[category] = score if max(scores.values()) > 0: best_category = max(scores, key=scores.get) confidence = min(scores[best_category] / 5.0, 0.95) return { "category": best_category, "confidence": confidence, "method": "rules" } return None def classify_with_ai(self, subject: str, body: str, sender: str) -> Dict: """使用 AI 分类""" categories_desc = "\n".join([f"- {k}: {v}" for k, v in self.CATEGORIES.items()]) prompt = f"""请分析以下邮件并分类: 发件人:{sender} 主题:{subject} 内容摘要: {body[:1000]} 可选类别: {categories_desc} 要求: 1. 选择最匹配的一个类别 2. 给出置信度 (0-1) 3. 说明分类理由 输出 JSON 格式: {{ "category": "类别名", "confidence": 0.95, "reason": "分类理由" }} """ try: import dashscope from dashscope import Generation api_key = os.getenv("DASHSCOPE_API_KEY") response = Generation.call( model="qwen-plus", api_key=api_key, messages=[{"role": "user", "content": prompt}], result_format="message" ) if response.status_code == 200: content = response.output.choices[0].message.content import re json_match = re.search(r'\{.*\}', content, re.DOTALL) if json_match: result = json.loads(json_match.group()) if result.get("category") in self.CATEGORIES: result["method"] = "ai" return result # AI 失败,降级到规则 return self.classify_with_rules(subject, body, sender) or { "category": "social", "confidence": 0.5, "reason": "无法明确分类,默认为社交类", "method": "default" } except Exception as e: print(f"AI 分类失败:{e}") return self.classify_with_rules(subject, body, sender) or { "category": "social", "confidence": 0.5, "reason": "AI 不可用,使用默认分类", "method": "fallback" } def classify_emails( self, folder: str = "INBOX", limit: int = 50, email_ids: List[str] = None ) -> List[Dict]: """批量分类邮件""" # 获取邮件 if email_ids: emails = [] self.conn.select(folder) for eid in email_ids: status, msg_data = self.conn.fetch(eid.encode(), "(RFC822)") if status == "OK": email_info = self.parse_email(msg_data[0][1]) email_info["id"] = eid emails.append(email_info) else: emails = self.fetch_emails(folder, limit, False) # 分类 results = [] for email in emails: print(f"分类:{email.get('subject', '无主题')}...") # 先尝试规则分类 rule_result = self.classify_with_rules( email.get("subject", ""), email.get("body_text", ""), email.get("sender", "") ) # 规则不确定时用 AI if rule_result and rule_result["confidence"] > 0.7: classification = rule_result else: classification = self.classify_with_ai( email.get("subject", ""), email.get("body_text", ""), email.get("sender", "") ) results.append({ "id": email["id"], "subject": email["subject"], "sender": email["sender"], "date": email["date"], **classification }) return results def main(): parser = argparse.ArgumentParser(description="AI 邮件分类") parser.add_argument("--folder", default="INBOX", help="文件夹") parser.add_argument("--limit", type=int, default=50, help="处理数量") parser.add_argument("--email-ids", help="指定邮件 ID(逗号分隔)") parser.add_argument("--output", help="输出文件路径") parser.add_argument("--email", help="邮箱地址") parser.add_argument("--auth-code", help="授权码") args = parser.parse_args() email_ids = None if args.email_ids: email_ids = [eid.strip() for eid in args.email_ids.split(",")] try: classifier = EmailClassifier(args.email, args.auth_code) with classifier: results = classifier.classify_emails( folder=args.folder, limit=args.limit, email_ids=email_ids ) # 统计分类结果 stats = {} for r in results: cat = r["category"] stats[cat] = stats.get(cat, 0) + 1 result = { "success": True, "total": len(results), "statistics": stats, "classifications": results } output = json.dumps(result, ensure_ascii=False, indent=2) if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(output) print(f"结果已保存到:{args.output}") else: print(output) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/ai_extract_todos.py #!/usr/bin/env python3 """ AI 待办事项提取器 从邮件中自动提取待办任务和截止日期 """ import os import json import argparse import re from datetime import datetime from typing import List, Dict from fetch_emails import QQEmailClient class TodoExtractor(QQEmailClient): """待办事项提取器""" # 任务关键词 TASK_PATTERNS = [ r"请 (.+?)[。.!]", r"需要 (.+?)[。.!]", r"记得 (.+?)[。.!]", r"安排 (.+?)[。.!]", r"准备 (.+?)[。.!]", r"完成 (.+?)[。.!]", r"提交 (.+?)[。.!]", r"处理 (.+?)[。.!]", r"todo: (.+)", r"task: (.+)", r"action item: (.+)", ] # 日期时间模式 DATE_PATTERNS = [ r"(\d{4}[-/]\d{1,2}[-/]\d{1,2})", r"(\d{1,2}月\d{1,2}日)", r"(\d{1,2}:\d{2})", r"(今天 | 明天 | 后天 | 本周五 | 下周一)", r"(本周 | 下周 | 月底 | 月初)", ] def extract_date(self, text: str) -> str: """从文本中提取日期""" for pattern in self.DATE_PATTERNS: match = re.search(pattern, text) if match: date_str = match.group(1) if match.lastindex else match.group(0) return self._normalize_date(date_str) return None def _normalize_date(self, date_str: str) -> str: """标准化日期格式""" # 简单处理,实际可更复杂 if "月" in date_str and "日" in date_str: # "3 月 25 日" -> "2026-03-25" match = re.match(r"(\d{1,2}) 月 (\d{1,2}) 日", date_str) if match: month, day = match.groups() year = datetime.now().year return f"{year}-{month.zfill(2)}-{day.zfill(2)}" if "今天" in date_str: return datetime.now().strftime("%Y-%m-%d") elif "明天" in date_str: from datetime import timedelta return (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d") elif "后天" in date_str: from datetime import timedelta return (datetime.now() + timedelta(days=2)).strftime("%Y-%m-%d") # YYYY-MM-DD 格式 if re.match(r"\d{4}-\d{2}-\d{2}", date_str): return date_str return date_str def extract_todos_from_email(self, email: Dict) -> List[Dict]: """从单封邮件提取待办""" subject = email.get("subject", "") body = email.get("body_text", "") text = f"{subject}\n{body}" todos = [] # 提取任务 for pattern in self.TASK_PATTERNS: matches = re.findall(pattern, text, re.IGNORECASE) for match in matches: task_text = match.strip() if len(task_text) > 5 and len(task_text) < 200: # 过滤太短或太长的 # 尝试从上下文中提取截止日期 deadline = self.extract_date(text) # 评估优先级 priority = "medium" if any(kw in task_text.lower() for kw in ["紧急", "急", "asap", "today"]): priority = "high" elif any(kw in task_text.lower() for kw in ["最好", "有空", "方便"]): priority = "low" todos.append({ "task": task_text, "deadline": deadline, "priority": priority, "source": "pattern" }) # 去重 seen = set() unique_todos = [] for todo in todos: task_key = todo["task"][:50] if task_key not in seen: seen.add(task_key) unique_todos.append(todo) return unique_todos def extract_todos_with_ai(self, email: Dict) -> List[Dict]: """使用 AI 提取待办(更准确)""" subject = email.get("subject", "") body = email.get("body_text", "")[:2000] prompt = f"""请从以下邮件中提取所有待办事项和任务: 主题:{subject} 内容: {body} 要求: 1. 提取所有明确的任务/待办事项 2. 识别截止日期(如果有) 3. 评估优先级 (high/medium/low) 输出 JSON 数组格式: [ {{ "task": "具体任务描述", "deadline": "YYYY-MM-DD" 或 null, "priority": "high|medium|low" }} ] 如果没有明确任务,返回空数组 []。 """ try: import dashscope from dashscope import Generation api_key = os.getenv("DASHSCOPE_API_KEY") response = Generation.call( model="qwen-plus", api_key=api_key, messages=[{"role": "user", "content": prompt}], result_format="message" ) if response.status_code == 200: content = response.output.choices[0].message.content json_match = re.search(r'\[.*\]', content, re.DOTALL) if json_match: todos = json.loads(json_match.group()) for todo in todos: todo["source"] = "ai" return todos except Exception as e: print(f"AI 提取失败:{e}") # 降级到规则提取 return self.extract_todos_from_email(email) def extract_all( self, folder: str = "INBOX", limit: int = 50, email_ids: List[str] = None, use_ai: bool = True ) -> List[Dict]: """批量提取待办""" # 获取邮件 if email_ids: emails = [] self.conn.select(folder) for eid in email_ids: status, msg_data = self.conn.fetch(eid.encode(), "(RFC822)") if status == "OK": email_info = self.parse_email(msg_data[0][1]) email_info["id"] = eid emails.append(email_info) else: emails = self.fetch_emails(folder, limit, False) # 提取待办 results = [] for email in emails: print(f"提取:{email.get('subject', '无主题')}...") if use_ai: todos = self.extract_todos_with_ai(email) else: todos = self.extract_todos_from_email(email) if todos: results.append({ "email_id": email["id"], "subject": email["subject"], "sender": email["sender"], "date": email["date"], "todos": todos, "todo_count": len(todos) }) # 按待办数量排序 results.sort(key=lambda x: -x["todo_count"]) return results def main(): parser = argparse.ArgumentParser(description="AI 待办事项提取") parser.add_argument("--folder", default="INBOX", help="文件夹") parser.add_argument("--limit", type=int, default=50, help="处理数量") parser.add_argument("--email-ids", help="指定邮件 ID(逗号分隔)") parser.add_argument("--no-ai", action="store_true", help="不使用 AI,仅规则提取") parser.add_argument("--output", help="输出文件路径") parser.add_argument("--email", help="邮箱地址") parser.add_argument("--auth-code", help="授权码") args = parser.parse_args() email_ids = None if args.email_ids: email_ids = [eid.strip() for eid in args.email_ids.split(",")] try: extractor = TodoExtractor(args.email, args.auth_code) with extractor: results = extractor.extract_all( folder=args.folder, limit=args.limit, email_ids=email_ids, use_ai=not args.no_ai ) # 统计 total_todos = sum(r["todo_count"] for r in results) result = { "success": True, "emails_with_todos": len(results), "total_todos": total_todos, "todos_by_email": results } output = json.dumps(result, ensure_ascii=False, indent=2) if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(output) print(f"结果已保存到:{args.output}") else: print(output) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/ai_organize.py #!/usr/bin/env python3 """ AI 邮件一键智能整理 整合摘要、分类、优先级、待办提取的完整流程 """ import os import sys import json import argparse from datetime import datetime from typing import List, Dict # 导入其他模块 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from fetch_emails import QQEmailClient from ai_summarize import AISummarizer from ai_classify import EmailClassifier from ai_prioritize import EmailPrioritizer from ai_extract_todos import TodoExtractor class EmailOrganizer(QQEmailClient): """邮件智能整理器""" def organize( self, folder: str = "INBOX", limit: int = 30, unread_only: bool = False, email_ids: List[str] = None ) -> Dict: """ 一键智能整理邮件 执行流程: 1. 获取邮件列表 2. 生成摘要 3. 智能分类 4. 优先级排序 5. 提取待办 6. 生成整理报告 """ print(f"📧 开始整理邮箱...\n") # 1. 获取邮件 print("Step 1: 获取邮件列表...") if email_ids: emails = [] self.conn.select(folder) for eid in email_ids: status, msg_data = self.conn.fetch(eid.encode(), "(RFC822)") if status == "OK": email_info = self.parse_email(msg_data[0][1]) email_info["id"] = eid emails.append(email_info) else: emails = self.fetch_emails(folder, limit, unread_only) print(f" 获取到 {len(emails)} 封邮件\n") if not emails: return { "success": True, "message": "没有邮件需要整理", "timestamp": datetime.now().isoformat() } # 2. 生成摘要 print("Step 2: 生成 AI 摘要...") summarizer = AISummarizer(self.email, self.auth_code) summaries = {} for i, email in enumerate(emails, 1): print(f" [{i}/{len(emails)}] {email.get('subject', '无主题')[:30]}...") summary_result = summarizer.generate_summary( email.get("subject", ""), email.get("body_text", "") ) summaries[email["id"]] = summary_result.get("summary", {}) print(f" ✓ 完成 {len(summaries)} 封邮件摘要\n") # 3. 智能分类 print("Step 3: 智能分类...") classifier = EmailClassifier(self.email, self.auth_code) classifications = {} category_stats = {} for i, email in enumerate(emails, 1): classification = classifier.classify_with_ai( email.get("subject", ""), email.get("body_text", ""), email.get("sender", "") ) classifications[email["id"]] = classification cat = classification["category"] category_stats[cat] = category_stats.get(cat, 0) + 1 print(f" ✓ 完成分类:{category_stats}\n") # 4. 优先级排序 print("Step 4: 优先级评估...") prioritizer = EmailPrioritizer(self.email, self.auth_code) priorities = {} priority_stats = {"urgent": 0, "high": 0, "medium": 0, "low": 0} for email in emails: priority_info = prioritizer.calculate_priority(email) priorities[email["id"]] = priority_info priority_stats[priority_info["priority"]] += 1 print(f" ✓ 优先级分布:紧急{priority_stats['urgent']} 高{priority_stats['high']} " f"中{priority_stats['medium']} 低{priority_stats['low']}\n") # 5. 提取待办 print("Step 5: 提取待办事项...") extractor = TodoExtractor(self.email, self.auth_code) todos_by_email = {} total_todos = 0 for email in emails: todos = extractor.extract_todos_with_ai(email) if todos: todos_by_email[email["id"]] = todos total_todos += len(todos) print(f" ✓ 从 {len(todos_by_email)} 封邮件中提取 {total_todos} 个待办\n") # 6. 生成整理报告 print("Step 6: 生成整理报告...") organized_emails = [] for email in emails: eid = email["id"] organized_emails.append({ "id": eid, "subject": email["subject"], "sender": email["sender"], "date": email["date"], "is_unread": "\\Seen" not in email.get("flags", []), "summary": summaries.get(eid, {}), "classification": classifications.get(eid, {}), "priority": priorities.get(eid, {}), "todos": todos_by_email.get(eid, []) }) # 按优先级排序 priority_order = {"urgent": 0, "high": 1, "medium": 2, "low": 3} organized_emails.sort(key=lambda x: priority_order.get( x["priority"].get("priority", "low"), 3 )) report = { "success": True, "timestamp": datetime.now().isoformat(), "summary": { "total_emails": len(emails), "unread_count": sum(1 for e in emails if "\\Seen" not in e.get("flags", [])), "category_distribution": category_stats, "priority_distribution": priority_stats, "todos_count": total_todos, "emails_with_todos": len(todos_by_email) }, "emails": organized_emails } print("✓ 整理完成!\n") return report def main(): parser = argparse.ArgumentParser(description="AI 邮件一键智能整理") parser.add_argument("--folder", default="INBOX", help="文件夹") parser.add_argument("--limit", type=int, default=30, help="处理数量") parser.add_argument("--unread", action="store_true", help="仅未读邮件") parser.add_argument("--email-ids", help="指定邮件 ID(逗号分隔)") parser.add_argument("--output", help="输出报告文件路径") parser.add_argument("--email", help="邮箱地址") parser.add_argument("--auth-code", help="授权码") args = parser.parse_args() email_ids = None if args.email_ids: email_ids = [eid.strip() for eid in args.email_ids.split(",")] try: organizer = EmailOrganizer(args.email, args.auth_code) with organizer: report = organizer.organize( folder=args.folder, limit=args.limit, unread_only=args.unread, email_ids=email_ids ) output = json.dumps(report, ensure_ascii=False, indent=2) if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(output) print(f"📄 报告已保存到:{args.output}") else: print(output) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/ai_prioritize.py #!/usr/bin/env python3 """ AI 邮件优先级排序 根据内容、发件人、时间等智能评估邮件优先级 """ import os import json import argparse from datetime import datetime, timedelta from typing import List, Dict from fetch_emails import QQEmailClient class EmailPrioritizer(QQEmailClient): """邮件优先级评估器""" # 优先级定义 PRIORITIES = { "urgent": { "label": "紧急", "description": "需立即处理(今天内)", "color": "🔴" }, "high": { "label": "高", "description": "24 小时内处理", "color": "🟠" }, "medium": { "label": "中", "description": "本周内处理", "color": "🟡" }, "low": { "label": "低", "description": "可延后处理", "color": "🟢" } } # 紧急关键词 URGENT_KEYWORDS = [ "紧急", "急", "尽快", "立即", "马上", "today", "ASAP", "截止", "deadline", "超时", "过期", "最后" ] # 重要发件人模式(可配置) IMPORTANT_SENDER_PATTERNS = ["boss", "ceo", "manager", "hr", "finance"] def calculate_priority(self, email: Dict) -> Dict: """ 计算邮件优先级 考虑因素: 1. 关键词紧急程度 2. 发件人重要性 3. 邮件时效性 4. 是否未读 5. 是否有附件 """ score = 0 # 0-100 分 reasons = [] subject = email.get("subject", "").lower() body = email.get("body_text", "").lower()[:1000] sender = email.get("sender", "").lower() text = f"{subject} {body}" # 1. 紧急关键词 (+30 分) urgent_count = sum(1 for kw in self.URGENT_KEYWORDS if kw.lower() in text) if urgent_count >= 3: score += 30 reasons.append(f"包含多个紧急关键词 ({urgent_count}个)") elif urgent_count >= 1: score += 15 reasons.append(f"包含紧急关键词") # 2. 重要发件人 (+25 分) for pattern in self.IMPORTANT_SENDER_PATTERNS: if pattern in sender: score += 25 reasons.append(f"重要发件人 ({pattern})") break # 3. 时效性 (+20 分) try: email_date = datetime.strptime(email.get("date", ""), "%Y-%m-%d %H:%M:%S") hours_old = (datetime.now() - email_date).total_seconds() / 3600 if hours_old < 2: score += 20 reasons.append("2 小时内的新邮件") elif hours_old < 24: score += 10 reasons.append("24 小时内的邮件") except: pass # 4. 未读邮件 (+10 分) if "\\Seen" not in email.get("flags", []): score += 10 reasons.append("未读邮件") # 5. 有附件 (+5 分) if email.get("attachments"): score += 5 reasons.append("包含附件") # 6. 主题包含问号/请求 (+10 分) if "?" in subject or "请" in subject or "help" in subject.lower(): score += 10 reasons.append("包含请求/问题") # 确定优先级 if score >= 70: priority = "urgent" elif score >= 50: priority = "high" elif score >= 30: priority = "medium" else: priority = "low" return { "priority": priority, "priority_label": self.PRIORITIES[priority]["label"], "priority_emoji": self.PRIORITIES[priority]["color"], "score": min(score, 100), "reasons": reasons } def prioritize_emails( self, folder: str = "INBOX", limit: int = 50, email_ids: List[str] = None ) -> List[Dict]: """批量评估邮件优先级""" # 获取邮件 if email_ids: emails = [] self.conn.select(folder) for eid in email_ids: status, msg_data = self.conn.fetch(eid.encode(), "(RFC822)") if status == "OK": email_info = self.parse_email(msg_data[0][1]) email_info["id"] = eid emails.append(email_info) else: emails = self.fetch_emails(folder, limit, False) # 评估优先级 results = [] for email in emails: priority_info = self.calculate_priority(email) results.append({ "id": email["id"], "subject": email["subject"], "sender": email["sender"], "date": email["date"], "is_unread": "\\Seen" not in email.get("flags", []), "has_attachments": len(email.get("attachments", [])) > 0, **priority_info }) # 按优先级排序 priority_order = {"urgent": 0, "high": 1, "medium": 2, "low": 3} results.sort(key=lambda x: (priority_order[x["priority"]], -x["score"])) return results def main(): parser = argparse.ArgumentParser(description="AI 邮件优先级排序") parser.add_argument("--folder", default="INBOX", help="文件夹") parser.add_argument("--limit", type=int, default=50, help="处理数量") parser.add_argument("--email-ids", help="指定邮件 ID(逗号分隔)") parser.add_argument("--output", help="输出文件路径") parser.add_argument("--email", help="邮箱地址") parser.add_argument("--auth-code", help="授权码") args = parser.parse_args() email_ids = None if args.email_ids: email_ids = [eid.strip() for eid in args.email_ids.split(",")] try: prioritizer = EmailPrioritizer(args.email, args.auth_code) with prioritizer: results = prioritizer.prioritize_emails( folder=args.folder, limit=args.limit, email_ids=email_ids ) # 统计优先级分布 stats = {} for r in results: p = r["priority"] stats[p] = stats.get(p, 0) + 1 result = { "success": True, "total": len(results), "statistics": { p: { "count": stats.get(p, 0), "label": EmailPrioritizer.PRIORITIES[p]["label"], "emoji": EmailPrioritizer.PRIORITIES[p]["color"] } for p in ["urgent", "high", "medium", "low"] }, "emails": results } output = json.dumps(result, ensure_ascii=False, indent=2) if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(output) print(f"结果已保存到:{args.output}") else: print(output) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/ai_summarize.py #!/usr/bin/env python3 """ AI 邮件摘要生成器 使用大模型为邮件生成简洁摘要 """ import os import json import argparse from typing import List, Dict from fetch_emails import QQEmailClient class AISummarizer(QQEmailClient): """AI 邮件摘要器""" def __init__(self, email: str = None, auth_code: str = None, model: str = "qwen"): super().__init__(email, auth_code) self.model = model self.api_key = os.getenv("DASHSCOPE_API_KEY") def generate_summary(self, subject: str, body: str, max_length: int = 200) -> Dict: """ 生成邮件摘要 Args: subject: 邮件主题 body: 邮件正文 max_length: 摘要最大长度 """ # 构建提示词 prompt = f"""请为以下邮件生成简洁摘要: 主题:{subject} 内容: {body[:2000]} 要求: 1. 用一句话概括邮件核心内容(50 字以内) 2. 提取 3-5 个关键信息点(时间、地点、任务等) 3. 如果有明确的行动要求,请标注 输出 JSON 格式: {{ "one_sentence": "一句话摘要", "key_points": ["关键点 1", "关键点 2", ...], "action_required": "需要做什么" 或 null }} """ try: # 调用通义千问 API import dashscope from dashscope import Generation response = Generation.call( model="qwen-plus", api_key=self.api_key, messages=[{"role": "user", "content": prompt}], result_format="message" ) if response.status_code == 200: content = response.output.choices[0].message.content # 提取 JSON import re json_match = re.search(r'\{.*\}', content, re.DOTALL) if json_match: summary_data = json.loads(json_match.group()) return { "success": True, "summary": summary_data } # 备用方案:简单摘要 return self._simple_summary(subject, body, max_length) except Exception as e: # 降级到简单摘要 print(f"AI 摘要失败,使用简单摘要:{e}") return self._simple_summary(subject, body, max_length) def _simple_summary(self, subject: str, body: str, max_length: int = 200) -> Dict: """简单摘要(不使用 AI)""" # 提取前 200 字 clean_body = body.replace('\n', ' ').strip() summary = clean_body[:max_length] + "..." if len(clean_body) > max_length else clean_body # 提取可能的时间点 import re time_pattern = r'\d{4}[-/]\d{1,2}[-/]\d{1,2}[日天]?|\d{1,2}:\d{2}' times = re.findall(time_pattern, body[:500]) return { "success": True, "summary": { "one_sentence": summary, "key_points": times[:3] if times else ["无明确时间点"], "action_required": None } } def summarize_emails( self, folder: str = "INBOX", limit: int = 10, unread_only: bool = False, email_ids: List[str] = None ) -> List[Dict]: """ 批量摘要邮件 Args: folder: 文件夹 limit: 数量限制 unread_only: 仅未读 email_ids: 指定邮件 ID 列表 """ # 获取邮件 if email_ids: emails = [] self.conn.select(folder) for eid in email_ids: status, msg_data = self.conn.fetch(eid.encode(), "(RFC822)") if status == "OK": email_info = self.parse_email(msg_data[0][1]) email_info["id"] = eid emails.append(email_info) else: emails = self.fetch_emails(folder, limit, unread_only) # 生成摘要 results = [] for email in emails: print(f"处理:{email.get('subject', '无主题')}...") summary_result = self.generate_summary( email.get("subject", ""), email.get("body_text", "") ) results.append({ "id": email["id"], "subject": email["subject"], "sender": email["sender"], "date": email["date"], "summary": summary_result.get("summary", {}), "ai_success": summary_result.get("success", False) }) return results def main(): parser = argparse.ArgumentParser(description="AI 邮件摘要生成") parser.add_argument("--folder", default="INBOX", help="文件夹") parser.add_argument("--limit", type=int, default=10, help="处理数量") parser.add_argument("--unread", action="store_true", help="仅未读邮件") parser.add_argument("--email-ids", help="指定邮件 ID(逗号分隔)") parser.add_argument("--output", help="输出文件路径") parser.add_argument("--email", help="邮箱地址") parser.add_argument("--auth-code", help="授权码") args = parser.parse_args() email_ids = None if args.email_ids: email_ids = [eid.strip() for eid in args.email_ids.split(",")] try: summarizer = AISummarizer(args.email, args.auth_code) with summarizer: results = summarizer.summarize_emails( folder=args.folder, limit=args.limit, unread_only=args.unread, email_ids=email_ids ) result = { "success": True, "total": len(results), "summaries": results } output = json.dumps(result, ensure_ascii=False, indent=2) if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(output) print(f"结果已保存到:{args.output}") else: print(output) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/example.py #!/usr/bin/env python3 """ Example helper script for qq-email This is a placeholder script that can be executed directly. Replace with actual implementation or delete if not needed. Example real scripts from other skills: - pdf/scripts/fill_fillable_fields.py - Fills PDF form fields - pdf/scripts/convert_pdf_to_images.py - Converts PDF pages to images """ def main(): print("This is an example script for qq-email") # TODO: Add actual script logic here # This could be data processing, file conversion, API calls, etc. if __name__ == "__main__": main() FILE:scripts/fetch_emails.py #!/usr/bin/env python3 """ QQ 邮箱邮件获取工具 支持读取收件箱、未读邮件、指定文件夹 """ import os import json import argparse from datetime import datetime from typing import List, Dict, Optional import imaplib import email from email.header import decode_header from email.utils import parsedate_to_datetime class QQEmailClient: """QQ 邮箱 IMAP 客户端""" IMAP_SERVER = "imap.qq.com" IMAP_PORT = 993 def __init__(self, email: str = None, auth_code: str = None): self.email = email or os.getenv("QQ_EMAIL") self.auth_code = auth_code or os.getenv("QQ_EMAIL_AUTH_CODE") self.conn = None if not self.email or not self.auth_code: raise ValueError("请设置 QQ_EMAIL 和 QQ_EMAIL_AUTH_CODE 环境变量") def connect(self): """连接 IMAP 服务器""" self.conn = imaplib.IMAP4_SSL(self.IMAP_SERVER, self.IMAP_PORT) self.conn.login(self.email, self.auth_code) return self def disconnect(self): """断开连接""" if self.conn: self.conn.logout() def __enter__(self): return self.connect() def __exit__(self, exc_type, exc_val, exc_tb): self.disconnect() def decode_str(self, s: str) -> str: """解码邮件头""" if not s: return "" decoded = decode_header(s) result = [] for value, charset in decoded: if isinstance(value, bytes): result.append(value.decode(charset or 'utf-8', errors='ignore')) else: result.append(value) return ''.join(result) def parse_email(self, msg_data: bytes) -> Dict: """解析邮件内容""" msg = email.message_from_bytes(msg_data) # 基本信息 subject = self.decode_str(msg.get("Subject", "")) sender = self.decode_str(msg.get("From", "")) to = self.decode_str(msg.get("To", "")) date_str = msg.get("Date", "") # 解析日期 try: date = parsedate_to_datetime(date_str) date_formatted = date.strftime("%Y-%m-%d %H:%M:%S") except: date_formatted = date_str # 获取正文 body_text = "" body_html = "" attachments = [] if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition", "")) # 附件 if "attachment" in content_disposition: filename = part.get_filename() if filename: attachments.append(self.decode_str(filename)) # 正文 elif content_type == "text/plain" and not body_text: try: body_text = part.get_payload(decode=True).decode('utf-8', errors='ignore') except: pass elif content_type == "text/html" and not body_html: try: body_html = part.get_payload(decode=True).decode('utf-8', errors='ignore') except: pass else: content_type = msg.get_content_type() try: body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') if content_type == "text/html": body_html = body else: body_text = body except: pass return { "subject": subject, "sender": sender, "to": to, "date": date_formatted, "body_text": body_text[:5000] if body_text else "", # 限制长度 "body_html": body_html[:10000] if body_html else "", "attachments": attachments, "flags": [] } def fetch_emails( self, folder: str = "INBOX", limit: int = 10, unread_only: bool = False, since: str = None ) -> List[Dict]: """ 获取邮件列表 Args: folder: 文件夹名称 limit: 返回数量限制 unread_only: 仅未读邮件 since: 起始日期 (YYYY-MM-DD) """ # 选择文件夹 status, _ = self.conn.select(folder) if status != "OK": raise Exception(f"无法选择文件夹: {folder}") # 构建搜索条件 search_criteria = ["ALL"] if unread_only: search_criteria = ["UNSEEN"] if since: date_obj = datetime.strptime(since, "%Y-%m-%d") search_criteria.append("SINCE") search_criteria.append(date_obj.strftime("%d-%b-%Y")) # 搜索邮件 status, messages = self.conn.search(None, *search_criteria) if status != "OK": return [] email_ids = messages[0].split() # 获取最新的 limit 封 email_ids = email_ids[-limit:] if len(email_ids) > limit else email_ids email_ids.reverse() # 最新的在前 emails = [] for eid in email_ids: status, msg_data = self.conn.fetch(eid, "(RFC822)") if status == "OK": email_content = msg_data[0][1] email_info = self.parse_email(email_content) email_info["id"] = eid.decode('utf-8') # 获取邮件状态 status, flags_data = self.conn.fetch(eid, "(FLAGS)") if status == "OK": flags_str = flags_data[0].decode('utf-8') if "\\Seen" in flags_str: email_info["flags"].append("\\Seen") emails.append(email_info) return emails def main(): parser = argparse.ArgumentParser(description="获取 QQ 邮箱邮件") parser.add_argument("--folder", default="INBOX", help="文件夹名称 (默认: INBOX)") parser.add_argument("--limit", type=int, default=10, help="获取数量限制 (默认: 10)") parser.add_argument("--unread", action="store_true", help="仅获取未读邮件") parser.add_argument("--since", help="起始日期 (YYYY-MM-DD)") parser.add_argument("--output", help="输出文件路径 (JSON 格式)") parser.add_argument("--email", help="邮箱地址 (覆盖环境变量)") parser.add_argument("--auth-code", help="授权码 (覆盖环境变量)") args = parser.parse_args() try: client = QQEmailClient(args.email, args.auth_code) with client: emails = client.fetch_emails( folder=args.folder, limit=args.limit, unread_only=args.unread, since=args.since ) result = { "success": True, "folder": args.folder, "count": len(emails), "emails": emails } output = json.dumps(result, ensure_ascii=False, indent=2) if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(output) print(f"结果已保存到: {args.output}") else: print(output) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/manage_email.py #!/usr/bin/env python3 """ QQ 邮箱邮件管理工具 支持标记已读/未读、删除、移动邮件 """ import os import json import argparse from fetch_emails import QQEmailClient class EmailManager(QQEmailClient): """邮件管理器""" def mark_read(self, email_ids: list, folder: str = "INBOX"): """标记为已读""" self.conn.select(folder) for eid in email_ids: self.conn.store(eid.encode(), '+FLAGS', '\\Seen') return {"action": "mark_read", "count": len(email_ids)} def mark_unread(self, email_ids: list, folder: str = "INBOX"): """标记为未读""" self.conn.select(folder) for eid in email_ids: self.conn.store(eid.encode(), '-FLAGS', '\\Seen') return {"action": "mark_unread", "count": len(email_ids)} def delete(self, email_ids: list, folder: str = "INBOX"): """删除邮件(移动到已删除文件夹)""" self.conn.select(folder) for eid in email_ids: self.conn.store(eid.encode(), '+FLAGS', '\\Deleted') self.conn.expunge() return {"action": "delete", "count": len(email_ids)} def move(self, email_ids: list, target_folder: str, source_folder: str = "INBOX"): """移动邮件到指定文件夹""" self.conn.select(source_folder) results = [] for eid in email_ids: # 复制到目标文件夹 status, _ = self.conn.copy(eid.encode(), target_folder) if status == "OK": # 标记原邮件为删除 self.conn.store(eid.encode(), '+FLAGS', '\\Deleted') results.append(eid) self.conn.expunge() return {"action": "move", "count": len(results), "target": target_folder} def list_folders(self): """列出所有文件夹""" status, folders = self.conn.list() if status != "OK": return [] folder_list = [] for folder in folders: if folder: # 解析文件夹名称 parts = folder.decode().split(' "/" ') if len(parts) >= 2: folder_name = parts[-1].strip('"') folder_list.append(folder_name) return folder_list def main(): parser = argparse.ArgumentParser(description="管理 QQ 邮箱邮件") parser.add_argument("--id", required=True, help="邮件 ID(多个用逗号分隔)") parser.add_argument("--action", required=True, choices=["mark_read", "mark_unread", "delete", "move", "list_folders"], help="操作类型") parser.add_argument("--folder", default="INBOX", help="源文件夹") parser.add_argument("--target-folder", help="目标文件夹(move 操作需要)") parser.add_argument("--email", help="邮箱地址") parser.add_argument("--auth-code", help="授权码") args = parser.parse_args() try: manager = EmailManager(args.email, args.auth_code) with manager: if args.action == "list_folders": folders = manager.list_folders() result = { "success": True, "action": "list_folders", "folders": folders } else: email_ids = [eid.strip() for eid in args.id.split(",")] if args.action == "mark_read": action_result = manager.mark_read(email_ids, args.folder) elif args.action == "mark_unread": action_result = manager.mark_unread(email_ids, args.folder) elif args.action == "delete": action_result = manager.delete(email_ids, args.folder) elif args.action == "move": if not args.target_folder: raise ValueError("move 操作需要 --target-folder 参数") action_result = manager.move(email_ids, args.target_folder, args.folder) result = { "success": True, **action_result, "email_ids": email_ids } print(json.dumps(result, ensure_ascii=False, indent=2)) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/search_emails.py #!/usr/bin/env python3 """ QQ 邮箱搜索工具 支持按关键词、发件人、时间等条件搜索邮件 """ import os import json import argparse from datetime import datetime from fetch_emails import QQEmailClient class EmailSearcher(QQEmailClient): """邮件搜索器""" def search( self, folder: str = "INBOX", query: str = None, from_addr: str = None, to_addr: str = None, subject: str = None, since: str = None, before: str = None, unread_only: bool = False, limit: int = 50 ) -> list: """ 搜索邮件 Args: folder: 文件夹 query: 关键词(搜索主题和内容) from_addr: 发件人 to_addr: 收件人 subject: 主题关键词 since: 起始日期 (YYYY-MM-DD) before: 结束日期 (YYYY-MM-DD) unread_only: 仅未读 limit: 返回数量限制 """ # 选择文件夹 status, _ = self.conn.select(folder) if status != "OK": raise Exception(f"无法选择文件夹: {folder}") # 构建搜索条件 search_criteria = [] if unread_only: search_criteria.append("UNSEEN") if from_addr: search_criteria.extend(["FROM", from_addr]) if to_addr: search_criteria.extend(["TO", to_addr]) if subject: search_criteria.extend(["SUBJECT", subject]) if since: date_obj = datetime.strptime(since, "%Y-%m-%d") search_criteria.extend(["SINCE", date_obj.strftime("%d-%b-%Y")]) if before: date_obj = datetime.strptime(before, "%Y-%m-%d") search_criteria.extend(["BEFORE", date_obj.strftime("%d-%b-%Y")]) # 如果没有特定条件,搜索全部 if not search_criteria: search_criteria = ["ALL"] # 搜索邮件 status, messages = self.conn.search(None, *search_criteria) if status != "OK": return [] email_ids = messages[0].split() # 获取邮件详情 emails = [] for eid in email_ids: status, msg_data = self.conn.fetch(eid, "(RFC822)") if status == "OK": email_content = msg_data[0][1] email_info = self.parse_email(email_content) email_info["id"] = eid.decode('utf-8') # 获取邮件状态 status, flags_data = self.conn.fetch(eid, "(FLAGS)") if status == "OK": flags_str = flags_data[0].decode('utf-8') if "\\Seen" in flags_str: email_info["flags"].append("\\Seen") emails.append(email_info) # 如果有 query 关键词,在主题和内容中过滤 if query: query_lower = query.lower() filtered_emails = [] for email in emails: subject_match = query_lower in email.get("subject", "").lower() body_match = query_lower in email.get("body_text", "").lower() if subject_match or body_match: filtered_emails.append(email) emails = filtered_emails # 限制数量,最新的在前 emails.reverse() emails = emails[:limit] return emails def main(): parser = argparse.ArgumentParser(description="搜索 QQ 邮箱邮件") parser.add_argument("--folder", default="INBOX", help="文件夹名称") parser.add_argument("--query", help="关键词(搜索主题和内容)") parser.add_argument("--from", dest="from_addr", help="发件人邮箱") parser.add_argument("--to", dest="to_addr", help="收件人邮箱") parser.add_argument("--subject", help="主题关键词") parser.add_argument("--since", help="起始日期 (YYYY-MM-DD)") parser.add_argument("--before", help="结束日期 (YYYY-MM-DD)") parser.add_argument("--unread", action="store_true", help="仅未读邮件") parser.add_argument("--limit", type=int, default=50, help="返回数量限制") parser.add_argument("--output", help="输出文件路径") parser.add_argument("--email", help="邮箱地址") parser.add_argument("--auth-code", help="授权码") args = parser.parse_args() try: searcher = EmailSearcher(args.email, args.auth_code) with searcher: emails = searcher.search( folder=args.folder, query=args.query, from_addr=args.from_addr, to_addr=args.to_addr, subject=args.subject, since=args.since, before=args.before, unread_only=args.unread, limit=args.limit ) result = { "success": True, "folder": args.folder, "search_params": { "query": args.query, "from": args.from_addr, "subject": args.subject, "since": args.since, "before": args.before, "unread": args.unread }, "count": len(emails), "emails": emails } output = json.dumps(result, ensure_ascii=False, indent=2) if args.output: with open(args.output, 'w', encoding='utf-8') as f: f.write(output) print(f"结果已保存到: {args.output}") else: print(output) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main() FILE:scripts/send_email.py #!/usr/bin/env python3 """ QQ 邮箱发送工具 支持发送纯文本、HTML 邮件,支持附件 """ import os import json import argparse import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.header import Header from email import encoders from pathlib import Path class QQEmailSender: """QQ 邮箱 SMTP 发送器""" SMTP_SERVER = "smtp.qq.com" SMTP_PORT = 465 def __init__(self, email: str = None, auth_code: str = None): self.email = email or os.getenv("QQ_EMAIL") self.auth_code = auth_code or os.getenv("QQ_EMAIL_AUTH_CODE") if not self.email or not self.auth_code: raise ValueError("请设置 QQ_EMAIL 和 QQ_EMAIL_AUTH_CODE 环境变量") def send( self, to: str, subject: str, body: str = None, html: str = None, cc: str = None, attachments: list = None ) -> dict: """ 发送邮件 Args: to: 收件人(多个用逗号分隔) subject: 主题 body: 纯文本正文 html: HTML 正文(优先于 body) cc: 抄送(多个用逗号分隔) attachments: 附件路径列表 """ # 创建邮件 msg = MIMEMultipart() msg['From'] = self.email msg['To'] = to msg['Subject'] = Header(subject, 'utf-8') if cc: msg['Cc'] = cc # 添加正文 if html: msg.attach(MIMEText(html, 'html', 'utf-8')) elif body: msg.attach(MIMEText(body, 'plain', 'utf-8')) else: msg.attach(MIMEText("", 'plain', 'utf-8')) # 添加附件 if attachments: for filepath in attachments: filepath = filepath.strip() if not os.path.exists(filepath): print(f"警告:附件不存在: {filepath}") continue filename = os.path.basename(filepath) with open(filepath, 'rb') as f: attachment = MIMEBase('application', 'octet-stream') attachment.set_payload(f.read()) encoders.encode_base64(attachment) attachment.add_header( 'Content-Disposition', f'attachment; filename="{filename}"' ) msg.attach(attachment) # 构建收件人列表 recipients = to.split(',') if cc: recipients.extend(cc.split(',')) # 发送邮件 with smtplib.SMTP_SSL(self.SMTP_SERVER, self.SMTP_PORT) as server: server.login(self.email, self.auth_code) server.sendmail(self.email, recipients, msg.as_string()) return { "success": True, "to": to, "subject": subject, "attachments_count": len(attachments) if attachments else 0 } def main(): parser = argparse.ArgumentParser(description="发送 QQ 邮箱邮件") parser.add_argument("--to", required=True, help="收件人邮箱(多个用逗号分隔)") parser.add_argument("--subject", required=True, help="邮件主题") parser.add_argument("--body", help="纯文本正文") parser.add_argument("--html", help="HTML 正文(优先于 --body)") parser.add_argument("--cc", help="抄送邮箱(多个用逗号分隔)") parser.add_argument("--attachments", help="附件路径(多个用逗号分隔)") parser.add_argument("--email", help="发件人邮箱(覆盖环境变量)") parser.add_argument("--auth-code", help="授权码(覆盖环境变量)") args = parser.parse_args() # 检查参数 if not args.body and not args.html: print("错误:请提供 --body 或 --html 参数") exit(1) # 解析附件 attachments = None if args.attachments: attachments = [a.strip() for a in args.attachments.split(',')] try: sender = QQEmailSender(args.email, args.auth_code) result = sender.send( to=args.to, subject=args.subject, body=args.body, html=args.html, cc=args.cc, attachments=attachments ) print(json.dumps(result, ensure_ascii=False, indent=2)) except Exception as e: error_result = { "success": False, "error": str(e) } print(json.dumps(error_result, ensure_ascii=False, indent=2)) exit(1) if __name__ == "__main__": main()
阿里云 OSS 对象存储技能。支持文件上传、下载、列出、删除、获取 URL 等操作。 两层架构:Node.js SDK(优先)→ ossutil CLI。
---
name: aliyun-oss
description: |
阿里云 OSS 对象存储技能。支持文件上传、下载、列出、删除、获取 URL 等操作。
两层架构:Node.js SDK(优先)→ ossutil CLI。
metadata:
{
"openclaw":
{
"emoji": "☁️",
"requires": { "bins": ["node"], "packages": ["ali-oss"] },
"install":
[
{ "id": "ali-oss-sdk", "kind": "node", "package": "ali-oss", "label": "Install ali-oss Node.js SDK" },
],
},
}
---
# ☁️ 阿里云 OSS 技能
通过 **Node.js SDK** / **ossutil CLI** 管理阿里云对象存储。
---
## 🎯 执行策略(两层降级)
| 优先级 | 工具 | 使用场景 |
|--------|------|----------|
| **1** | Node.js SDK (`ali-oss`) | 优先使用 |
| **2** | ossutil CLI | 备选 |
---
## 🚀 快速开始
### 1. 安装依赖
```bash
# 进入技能目录
cd ~/.openclaw/workspace/skills/aliyun-oss-skill
# 运行自动安装
bash scripts/setup.sh
```
### 2. 配置凭证
```bash
bash scripts/setup.sh \
--access-key-id "YOUR_ACCESS_KEY_ID" \
--access-key-secret "YOUR_ACCESS_KEY_SECRET" \
--region "oss-cn-hangzhou" \
--bucket "mybucket"
```
### 3. 测试连接
```bash
node scripts/oss_node.mjs test-connection
```
---
## 📋 使用示例
### 上传文件
```bash
node scripts/oss_node.mjs upload \
--local "/path/to/file.txt" \
--key "uploads/file.txt"
```
### 列出文件
```bash
node scripts/oss_node.mjs list --prefix "uploads/" --limit 100
```
### 下载文件
```bash
node scripts/oss_node.mjs download \
--key "uploads/file.txt" \
--local "/path/to/save.txt"
```
### 删除文件
```bash
node scripts/oss_node.mjs delete --key "uploads/file.txt" --force
```
### 获取文件 URL
```bash
# 公开空间
node scripts/oss_node.mjs url --key "uploads/file.txt"
# 私有空间(1小时有效)
node scripts/oss_node.mjs url --key "uploads/file.txt" --private --expires 3600
```
---
## 🔧 Node.js SDK API
| 命令 | 说明 |
|------|------|
| `upload --local <path> --key <key>` | 上传文件 |
| `download --key <key> --local <path>` | 下载文件 |
| `list [--prefix <p>] [--limit <n>]` | 列出文件 |
| `delete --key <key> [--force]` | 删除文件 |
| `url --key <key> [--private] [--expires <s>]` | 获取 URL |
| `stat --key <key>` | 文件信息 |
| `move --src-key <a> --dest-key <b>` | 移动文件 |
| `copy --src-key <a> --dest-key <b>` | 复制文件 |
| `test-connection` | 测试连接 |
---
## ⚙️ 配置文件
**config/oss-config.json**
```json
{
"accessKeyId": "YOUR_ACCESS_KEY_ID",
"accessKeySecret": "YOUR_ACCESS_KEY_SECRET",
"bucket": "mybucket",
"region": "oss-cn-hangzhou",
"domain": "https://cdn.example.com"
}
```
**常用区域**:
- `oss-cn-hangzhou` - 华东1(杭州)
- `oss-cn-shanghai` - 华东2(上海)
- `oss-cn-beijing` - 华北2(北京)
- `oss-cn-shenzhen` - 华南1(深圳)
---
## 🐛 故障排查
| 问题 | 解决 |
|------|------|
| `Cannot find module 'ali-oss'` | `npm install ali-oss` |
| `403 Forbidden` | 检查 AccessKey 权限 |
| `连接超时` | 检查区域代码和网络 |
---
## 📚 相关链接
- [阿里云 OSS Node.js SDK](https://help.aliyun.com/document_detail/32068.html)
- [ossutil 工具](https://help.aliyun.com/document_detail/120075.html)
---
## 📄 许可证
MIT License
FILE:README.md
# ☁️ 阿里云 OSS 技能
OpenClaw 技能,用于管理阿里云对象存储(OSS)。
## ✨ 功能
- 📤 上传文件
- 📥 下载文件
- 📋 列出文件
- 🗑️ 删除文件
- 🔗 获取文件 URL(支持私有空间签名)
- 📊 查看文件信息
- 📁 移动/复制文件
## 🚀 快速开始
```bash
# 安装依赖
npm install
# 配置凭证
bash scripts/setup.sh --access-key-id "xxx" --access-key-secret "xxx" --region "oss-cn-hangzhou" --bucket "mybucket"
# 测试连接
node scripts/oss_node.mjs test-connection
```
## 📖 使用示例
```bash
# 上传
node scripts/oss_node.mjs upload --local file.txt --key uploads/file.txt
# 列出
node scripts/oss_node.mjs list --prefix uploads/
# 下载
node scripts/oss_node.mjs download --key uploads/file.txt --local file.txt
# 删除
node scripts/oss_node.mjs delete --key uploads/file.txt --force
# 获取 URL
node scripts/oss_node.mjs url --key uploads/file.txt
```
## 🔧 架构
两层降级策略:
1. **Node.js SDK** (`ali-oss`) - 优先使用
2. **ossutil CLI** - 备选
## 📄 许可证
MIT
FILE:_meta.json
{
"name": "aliyun-oss",
"version": "1.0.0",
"description": "阿里云 OSS 对象存储技能",
"author": "33",
"created": "2026-03-06",
"tags": ["aliyun", "oss", "cloud", "storage"],
"requirements": {
"bins": ["node", "npm"],
"packages": ["ali-oss"]
},
"config": {
"configFile": "config/oss-config.json"
}
}
FILE:config/oss-config.example.json
{
"accessKeyId": "你的AccessKey ID",
"accessKeySecret": "你的AccessKey Secret",
"bucket": "你的存储桶名称",
"region": "oss-cn-hangzhou",
"domain": "https://你的域名.com",
"options": {
"secure": true,
"timeout": 60000,
"upload_threshold": 1048576,
"chunk_size": 1048576,
"retry_times": 3
}
}
FILE:docs/EXAMPLES.md
# 阿里云 OSS 技能 - 使用示例
本文档包含阿里云 OSS 技能的各种使用示例。
---
## 📤 示例1:上传文件
### 在 OpenClaw 中
**用户说:**
```
帮我上传 /backups/daily-20260301.tar.gz 到阿里云 OSS
```
**AI 执行:**
```
使用 Node.js SDK 脚本上传
```
**返回:**
```
✅ 上传成功!
文件:daily-20260301.tar.gz
大小:15.2 MB
URL:https://bucket.oss-cn-hangzhou.aliyuncs.com/backups/daily-20260301.tar.gz
```
### 命令行方式
```bash
node scripts/oss_node.mjs upload \
--local /backups/daily-20260301.tar.gz \
--key backups/daily-20260301.tar.gz
```
---
## 📋 示例2:列出文件
### 在 OpenClaw 中
**用户说:**
```
列出阿里云 OSS backups 目录下的所有文件
```
**返回:**
```
📋 共找到 15 个文件:
backups/backup-20260301.tar.gz - 15.2 MB - 2026-03-01 20:00
backups/backup-20260228.tar.gz - 14.8 MB - 2026-02-28 20:00
backups/backup-20260227.tar.gz - 16.1 MB - 2026-02-27 20:00
...
```
### 命令行方式
```bash
# 列出所有文件
node scripts/oss_node.mjs list
# 列出指定前缀的文件
node scripts/oss_node.mjs list --prefix backups/ --limit 50
# JSON 格式输出
node scripts/oss_node.mjs list --prefix backups/ --format json
```
---
## 🔗 示例3:获取文件 URL
### 在 OpenClaw 中
**用户说:**
```
给我 images/photo.jpg 的访问链接,1小时有效
```
**返回:**
```
🔗 临时访问链接(1小时有效):
https://bucket.oss-cn-hangzhou.aliyuncs.com/images/photo.jpg?OSSAccessKeyId=...&Expires=...&Signature=...
```
### 命令行方式
```bash
# 公开 URL
node scripts/oss_node.mjs url --key images/photo.jpg
# 私有 URL(1小时有效)
node scripts/oss_node.mjs url --key documents/report.pdf --private --expires 3600
```
---
## 🗑️ 示例4:删除文件
### 在 OpenClaw 中
**用户说:**
```
删除阿里云 OSS 上的 old-backup.tar.gz
```
**AI 执行:**
```
1. 先确认:是否真的要删除?
2. 用户确认后执行删除
```
**返回:**
```
✅ 删除成功!
文件:old-backup.tar.gz
```
### 命令行方式
```bash
# 删除单个文件(需要确认)
node scripts/oss_node.mjs delete --key backups/old-backup.tar.gz
# 强制删除(不需要确认)
node scripts/oss_node.mjs delete --key backups/old-backup.tar.gz --force
# 批量删除
cat > delete-list.txt << EOF
backups/backup-20260101.tar.gz
backups/backup-20260102.tar.gz
EOF
node scripts/oss_node.mjs batch-delete --file delete-list.txt
```
---
## 📊 示例5:获取文件信息
### 在 OpenClaw 中
**用户说:**
```
查看 documents/report.pdf 的详细信息
```
**返回:**
```
📊 文件信息:
文件名:documents/report.pdf
大小:2.5 MB
类型:application/pdf
ETag:"abc123..."
修改时间:2026-03-01 15:30:00
```
### 命令行方式
```bash
node scripts/oss_node.mjs stat --key documents/report.pdf
```
---
## 📥 示例6:下载文件
### 在 OpenClaw 中
**用户说:**
```
从阿里云 OSS 下载 documents/report.pdf 到本地
```
**返回:**
```
✅ 下载成功!
文件:documents/report.pdf
大小:2.5 MB
保存到:/downloads/report.pdf
```
### 命令行方式
```bash
node scripts/oss_node.mjs download \
--key documents/report.pdf \
--local /downloads/report.pdf
```
---
## 🤖 示例7:自动化备份脚本
### 完整脚本
```bash
#!/bin/bash
# 每日备份脚本
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d)
BACKUP_FILE="backup-DATE.tar.gz"
# 1. 创建备份
echo "📦 创建备份..."
tar -czf "BACKUP_DIR/BACKUP_FILE" /data
# 2. 上传到阿里云 OSS
echo "📤 上传到阿里云 OSS..."
cd /home/node/.openclaw/workspace/skills/aliyun-oss-skill
node scripts/oss_node.mjs upload \
--local "BACKUP_DIR/BACKUP_FILE" \
--key "backups/BACKUP_FILE"
# 3. 清理本地旧备份(保留7天)
echo "🗑️ 清理旧备份..."
find BACKUP_DIR -name "backup-*.tar.gz" -mtime +7 -delete
# 4. 清理云端旧备份(保留30天)
echo "🗑️ 清理云端旧备份..."
node -e "
const { execSync } = require('child_process');
const cutoff = Date.now() - (30 * 24 * 60 * 60 * 1000);
const files = JSON.parse(execSync('node scripts/oss_node.mjs list --prefix \"backups/\" --format json'));
for (const file of files) {
if (file.mtime * 1000 < cutoff) {
execSync(\`node scripts/oss_node.mjs delete --key \file.key --force\`);
console.log(\`已删除: \file.key\`);
}
}
"
echo "✅ 备份完成!"
```
---
## 📝 示例8:批量操作
### 批量上传图片
```javascript
const { execSync } = require('child_process');
const fs = require('fs');
const path = require('path');
const imageDir = '/path/to/images';
let uploaded = 0;
fs.readdirSync(imageDir).forEach(filename => {
if (filename.endsWith('.jpg') || filename.endsWith('.png') || filename.endsWith('.gif')) {
const localPath = path.join(imageDir, filename);
const key = `images/filename`;
try {
execSync(`node scripts/oss_node.mjs upload --local "localPath" --key "key"`);
console.log(`✅ filename 上传成功`);
uploaded++;
} catch (error) {
console.error(`❌ filename 上传失败:`, error.message);
}
}
});
console.log(`\n✅ 共上传 uploaded 个文件`);
```
---
## 🎯 更多示例
查看 [SKILL.md](../SKILL.md) 获取更多详细信息和 API 文档。
FILE:package.json
{
"name": "aliyun-oss-skill",
"version": "1.0.0",
"description": "阿里云 OSS 对象存储技能",
"main": "scripts/oss_node.mjs",
"type": "module",
"scripts": {
"setup": "bash scripts/setup.sh",
"test": "node scripts/oss_node.mjs test-connection"
},
"dependencies": {
"ali-oss": "^6.21.0"
},
"keywords": ["aliyun", "oss", "cloud", "storage", "openclaw", "skill"],
"author": "33",
"license": "MIT"
}
FILE:scripts/oss_node.mjs
#!/usr/bin/env node
/**
* 阿里云 OSS Node.js SDK 脚本
* 功能:文件上传、下载、列出、删除、获取URL等操作
*/
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// 尝试加载 ali-oss SDK
import OSS from 'ali-oss';
// 配置文件路径
const SKILL_DIR = path.dirname(__dirname);
const CONFIG_DIR = path.join(SKILL_DIR, 'config');
const CONFIG_FILE = path.join(CONFIG_DIR, 'oss-config.json');
/**
* 加载配置文件
*/
function loadConfig() {
if (!fs.existsSync(CONFIG_FILE)) {
throw new Error(
`配置文件不存在: CONFIG_FILE\n` +
`请复制 config/oss-config.example.json 为 oss-config.json 并填写配置`
);
}
const config = JSON.parse(fs.readFileSync(CONFIG_FILE, 'utf-8'));
// 验证必填配置
const required = ['accessKeyId', 'accessKeySecret', 'bucket', 'region'];
for (const key of required) {
if (!config[key] || config[key].startsWith('你的')) {
throw new Error(`配置项 key 不能为空或使用示例值`);
}
}
return config;
}
/**
* 阿里云 OSS 操作类
*/
class AliyunOSS {
constructor(config) {
this.config = config;
this.client = new OSS({
region: config.region,
bucket: config.bucket,
accessKeyId: config.accessKeyId,
accessKeySecret: config.accessKeySecret,
secure: config.options?.secure !== false,
timeout: config.options?.timeout || 60000
});
this.bucket = config.bucket;
this.domain = config.domain || `https://config.bucket.config.region.aliyuncs.com`;
}
/**
* 上传文件
*/
async upload(localPath, key) {
if (!fs.existsSync(localPath)) {
throw new Error(`文件不存在: localPath`);
}
try {
const result = await this.client.put(key, localPath);
return {
success: true,
key: result.name,
url: this.getUrl(key),
size: fs.statSync(localPath).size,
etag: result.etag
};
} catch (error) {
throw new Error(`上传失败: error.message`);
}
}
/**
* 下载文件
*/
async download(key, localPath) {
try {
const result = await this.client.get(key, localPath);
return {
success: true,
key: key,
size: fs.statSync(localPath).size
};
} catch (error) {
throw new Error(`下载失败: error.message`);
}
}
/**
* 列出文件
*/
async listFiles(prefix = '', limit = 100) {
try {
const result = await this.client.list({
prefix: prefix,
'max-keys': limit
});
const files = (result.objects || []).map(item => ({
key: item.name,
size: item.size,
mtime: new Date(item.lastModified).getTime() / 1000,
etag: item.etag,
mimeType: item.type || 'application/octet-stream'
}));
return files;
} catch (error) {
throw new Error(`列出文件失败: error.message`);
}
}
/**
* 删除文件
*/
async delete(key) {
try {
await this.client.delete(key);
return {
success: true,
key: key
};
} catch (error) {
if (error.code === 'NoSuchKey') {
throw new Error(`文件不存在: key`);
}
throw new Error(`删除失败: error.message`);
}
}
/**
* 批量删除
*/
async batchDelete(keys) {
try {
const result = await this.client.deleteMulti(keys);
let success = 0;
let failed = 0;
if (result.deleted) {
success = result.deleted.length;
failed = keys.length - success;
}
return { success, failed };
} catch (error) {
throw new Error(`批量删除失败: error.message`);
}
}
/**
* 获取文件信息
*/
async stat(key) {
try {
const result = await this.client.head(key);
return {
key: key,
size: parseInt(result.res.headers['content-length']),
etag: result.res.headers.etag,
mimeType: result.res.headers['content-type'],
mtime: new Date(result.res.headers['last-modified']).getTime() / 1000
};
} catch (error) {
throw new Error(`获取文件信息失败: error.message`);
}
}
/**
* 获取文件 URL
*/
getUrl(key, isPrivate = false, expires = 3600) {
if (isPrivate) {
return this.client.signatureUrl(key, {
expires: expires
});
} else {
return `this.domain/key`;
}
}
/**
* 移动文件
*/
async move(srcKey, destKey) {
try {
await this.client.copy(destKey, srcKey);
await this.client.delete(srcKey);
return {
success: true,
srcKey: srcKey,
destKey: destKey
};
} catch (error) {
throw new Error(`移动失败: error.message`);
}
}
/**
* 复制文件
*/
async copy(srcKey, destKey) {
try {
await this.client.copy(destKey, srcKey);
return {
success: true,
srcKey: srcKey,
destKey: destKey
};
} catch (error) {
throw new Error(`复制失败: error.message`);
}
}
}
// 工具函数
function formatSize(bytes) {
const units = ['B', 'KB', 'MB', 'GB', 'TB'];
let size = bytes;
let unitIndex = 0;
while (size >= 1024 && unitIndex < units.length - 1) {
size /= 1024;
unitIndex++;
}
return `size.toFixed(2) units[unitIndex]`;
}
function formatTime(timestamp) {
const date = new Date(timestamp * 1000);
return date.toISOString().replace('T', ' ').substring(0, 19);
}
// 命令行接口
async function main() {
const args = process.argv.slice(2);
const command = args[0];
try {
const config = loadConfig();
const oss = new AliyunOSS(config);
switch (command) {
case 'upload': {
const localPath = args[args.indexOf('--local') + 1];
const key = args[args.indexOf('--key') + 1];
const result = await oss.upload(localPath, key);
console.log('✅ 上传成功!');
console.log(` 文件: result.key`);
console.log(` 大小: formatSize(result.size)`);
console.log(` URL: result.url`);
break;
}
case 'download': {
const key = args[args.indexOf('--key') + 1];
const localPath = args[args.indexOf('--local') + 1];
const result = await oss.download(key, localPath);
console.log('✅ 下载成功!');
console.log(` 文件: result.key`);
console.log(` 大小: formatSize(result.size)`);
console.log(` 保存到: localPath`);
break;
}
case 'list': {
const prefixIndex = args.indexOf('--prefix');
const prefix = prefixIndex !== -1 ? args[prefixIndex + 1] : '';
const limitIndex = args.indexOf('--limit');
const limit = limitIndex !== -1 ? parseInt(args[limitIndex + 1]) : 100;
const formatIndex = args.indexOf('--format');
const format = formatIndex !== -1 ? args[formatIndex + 1] : 'table';
const files = await oss.listFiles(prefix, limit);
if (files.length === 0) {
console.log('📭 没有找到文件');
return;
}
if (format === 'json') {
console.log(JSON.stringify(files, null, 2));
} else {
console.log(`📋 共 files.length 个文件:\n`);
console.log(`'文件名'.padEnd(50) '大小'.padStart(12) '修改时间'.padEnd(20)`);
console.log('-'.repeat(82));
files.forEach(file => {
console.log(`file.key.padEnd(50) formatSize(file.size).padStart(12) formatTime(file.mtime).padEnd(20)`);
});
}
break;
}
case 'delete': {
const key = args[args.indexOf('--key') + 1];
const forceIndex = args.indexOf('--force');
if (forceIndex === -1) {
console.log(`⚠️ 确定要删除 key 吗?(y/N):`);
// 注意:这里需要 readline,为简化暂时跳过确认
}
const result = await oss.delete(key);
console.log('✅ 删除成功!');
console.log(` 文件: result.key`);
break;
}
case 'batch-delete': {
const fileIndex = args.indexOf('--file');
const listFile = args[fileIndex + 1];
const keys = fs.readFileSync(listFile, 'utf-8')
.split('\n')
.map(line => line.trim())
.filter(line => line);
const result = await oss.batchDelete(keys);
console.log('✅ 批量删除完成!');
console.log(` 成功: result.success`);
console.log(` 失败: result.failed`);
break;
}
case 'url': {
const key = args[args.indexOf('--key') + 1];
const privateIndex = args.indexOf('--private');
const expiresIndex = args.indexOf('--expires');
const isPrivate = privateIndex !== -1;
const expires = expiresIndex !== -1 ? parseInt(args[expiresIndex + 1]) : 3600;
const url = oss.getUrl(key, isPrivate, expires);
console.log('🔗 文件 URL:');
console.log(` url`);
if (isPrivate) {
console.log(`\n ⏱️ 有效期: expires 秒`);
}
break;
}
case 'stat': {
const key = args[args.indexOf('--key') + 1];
const info = await oss.stat(key);
console.log('📊 文件信息:\n');
console.log(` 文件名: info.key`);
console.log(` 大小: formatSize(info.size)`);
console.log(` 类型: info.mimeType`);
console.log(` ETag: info.etag`);
console.log(` 修改时间: formatTime(info.mtime)`);
break;
}
case 'test-connection': {
await oss.listFiles('', 1);
console.log('✅ 阿里云 OSS 连接验证成功!');
break;
}
default:
console.log('使用方法:');
console.log(' node oss_node.mjs upload --local <LocalPath> --key <Key>');
console.log(' node oss_node.mjs download --key <Key> --local <LocalPath>');
console.log(' node oss_node.mjs list [--prefix <Prefix>] [--limit <Limit>]');
console.log(' node oss_node.mjs delete --key <Key> [--force]');
console.log(' node oss_node.mjs url --key <Key> [--private] [--expires <Seconds>]');
console.log(' node oss_node.mjs stat --key <Key>');
console.log(' node oss_node.mjs test-connection');
}
} catch (error) {
console.error(`❌ 错误: error.message`);
process.exit(1);
}
}
main();
FILE:scripts/setup.sh
#!/bin/bash
#######################################
# 阿里云 OSS 技能 - 自动安装配置脚本
# 参考:qiniu-kodo 技能
#######################################
set -e
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# 默认值
CHECK_ONLY=false
INSTALL_SDK=false
ACCESS_KEY_ID=""
ACCESS_KEY_SECRET=""
REGION=""
BUCKET=""
DOMAIN=""
# 脚本目录
SCRIPT_DIR="$(cd "$(dirname "BASH_SOURCE[0]")" && pwd)"
BASE_DIR="$(dirname "$SCRIPT_DIR")"
CONFIG_DIR="$BASE_DIR/config"
CONFIG_FILE="$CONFIG_DIR/oss-config.json"
# 日志函数
log_info() {
echo -e "GREEN[INFO]NC $1"
}
log_warn() {
echo -e "YELLOW[WARN]NC $1"
}
log_error() {
echo -e "RED[ERROR]NC $1"
}
# 解析参数
parse_args() {
while [[ $# -gt 0 ]]; do
case $1 in
--check-only)
CHECK_ONLY=true
shift
;;
--install-sdk)
INSTALL_SDK=true
shift
;;
--access-key-id)
ACCESS_KEY_ID="$2"
shift 2
;;
--access-key-secret)
ACCESS_KEY_SECRET="$2"
shift 2
;;
--region)
REGION="$2"
shift 2
;;
--bucket)
BUCKET="$2"
shift 2
;;
--domain)
DOMAIN="$2"
shift 2
;;
*)
log_error "未知参数: $1"
exit 1
;;
esac
done
}
# 检查命令是否存在
command_exists() {
command -v "$1" >/dev/null 2>&1
}
# 检查 Node.js 环境
check_nodejs() {
log_info "检查 Node.js 环境..."
if command_exists node; then
NODE_VERSION=$(node --version 2>&1)
log_info "✅ Node.js 已安装: $NODE_VERSION"
return 0
else
log_error "❌ Node.js 未安装"
return 1
fi
}
# 检查 npm
check_npm() {
log_info "检查 npm..."
if command_exists npm; then
NPM_VERSION=$(npm --version 2>&1)
log_info "✅ npm 已安装: $NPM_VERSION"
return 0
else
log_error "❌ npm 未安装"
return 1
fi
}
# 检查 ali-oss Node.js SDK
check_sdk() {
log_info "检查 ali-oss Node.js SDK..."
# 检查技能目录下的 node_modules
if [ -d "$BASE_DIR/node_modules/ali-oss" ]; then
SDK_VERSION=$(node -p "require('$BASE_DIR/node_modules/ali-oss/package.json').version" 2>/dev/null || echo "unknown")
log_info "✅ ali-oss Node.js SDK 已安装: $SDK_VERSION"
return 0
else
log_warn "⚠️ ali-oss Node.js SDK 未安装"
return 1
fi
}
# 检查 ossutil 命令行工具
check_ossutil() {
log_info "检查 ossutil..."
if command_exists ossutil; then
log_info "✅ ossutil 已安装"
return 0
else
log_warn "⚠️ ossutil 未安装"
return 1
fi
}
# 检查配置文件
check_config() {
log_info "检查配置文件..."
if [ -f "$CONFIG_FILE" ]; then
log_info "✅ 配置文件已存在: $CONFIG_FILE"
# 验证配置(使用 node 检查 JSON)
if node -e "require('$CONFIG_FILE')" 2>/dev/null; then
log_info "✅ 配置文件格式正确"
return 0
else
log_error "❌ 配置文件格式错误"
return 1
fi
else
log_warn "⚠️ 配置文件不存在"
return 1
fi
}
# 检查凭证是否已配置
check_credentials() {
log_info "检查凭证配置..."
if [ -f "$CONFIG_FILE" ]; then
ACCESS_KEY_ID_CHECK=$(node -p "require('$CONFIG_FILE').accessKeyId || ''" 2>/dev/null)
if [ -n "$ACCESS_KEY_ID_CHECK" ] && [ "$ACCESS_KEY_ID_CHECK" != "你的AccessKey ID" ]; then
log_info "✅ 凭证已配置"
return 0
else
log_warn "⚠️ 凭证未配置或使用示例值"
return 1
fi
else
log_warn "⚠️ 配置文件不存在"
return 1
fi
}
# 安装 ali-oss Node.js SDK
install_sdk() {
log_info "安装 ali-oss Node.js SDK..."
cd "$BASE_DIR"
# 初始化 package.json(如果不存在)
if [ ! -f "package.json" ]; then
npm init -y > /dev/null 2>&1
fi
# 安装 ali-oss SDK
if npm install ali-oss --save 2>/dev/null; then
log_info "✅ ali-oss Node.js SDK 安装成功"
return 0
else
log_error "❌ ali-oss Node.js SDK 安装失败"
return 1
fi
}
# 创建配置文件
create_config() {
log_info "创建配置文件..."
mkdir -p "$CONFIG_DIR"
cat > "$CONFIG_FILE" <<EOF
{
"accessKeyId": "$ACCESS_KEY_ID",
"accessKeySecret": "$ACCESS_KEY_SECRET",
"bucket": "$BUCKET",
"region": "$REGION",
"domain": "$DOMAIN",
"options": {
"secure": true,
"timeout": 60000,
"upload_threshold": 1048576,
"chunk_size": 1048576,
"retry_times": 3
}
}
EOF
# 设置权限
chmod 600 "$CONFIG_FILE"
log_info "✅ 配置文件已创建: $CONFIG_FILE"
}
# 配置 shell 环境
configure_shell() {
log_info "配置 shell 环境..."
local SHELL_RC=""
if [ -n "$ZSH_VERSION" ]; then
SHELL_RC="$HOME/.zshrc"
else
SHELL_RC="$HOME/.bashrc"
fi
# 添加环境变量
if ! grep -q "ALIYUN_ACCESS_KEY_ID" "$SHELL_RC" 2>/dev/null; then
cat >> "$SHELL_RC" <<EOF
# 阿里云 OSS 配置
export ALIYUN_ACCESS_KEY_ID="$ACCESS_KEY_ID"
export ALIYUN_ACCESS_KEY_SECRET="$ACCESS_KEY_SECRET"
export ALIYUN_BUCKET="$BUCKET"
export ALIYUN_REGION="$REGION"
EOF
log_info "✅ Shell 环境已配置: $SHELL_RC"
else
log_info "✅ Shell 环境已存在配置"
fi
}
# 验证连接
verify_connection() {
log_info "验证阿里云 OSS 连接..."
if node "$SCRIPT_DIR/oss_node.mjs" test-connection 2>/dev/null; then
log_info "✅ 阿里云 OSS 连接验证成功"
return 0
else
log_error "❌ 阿里云 OSS 连接验证失败"
return 1
fi
}
# 环境检查
check_environment() {
log_info "=========================================="
log_info " 阿里云 OSS 环境检查"
log_info "=========================================="
echo ""
local ALL_OK=true
check_nodejs || ALL_OK=false
check_npm || ALL_OK=false
check_sdk || ALL_OK=false
check_ossutil || true # 可选
check_config || true
check_credentials || true
echo ""
log_info "=========================================="
if $ALL_OK; then
log_info "✅ 环境检查通过"
return 0
else
log_warn "⚠️ 环境检查未完全通过"
return 1
fi
}
# 完整安装
full_setup() {
log_info "=========================================="
log_info " 阿里云 OSS 自动安装"
log_info "=========================================="
echo ""
# 检查必需项
if ! check_nodejs; then
log_error "Node.js 是必需的"
exit 1
fi
if ! check_npm; then
log_error "npm 是必需的"
exit 1
fi
# 安装 SDK
if ! check_sdk; then
install_sdk
fi
# 创建配置
if [ -n "$ACCESS_KEY_ID" ] && [ -n "$ACCESS_KEY_SECRET" ]; then
create_config
configure_shell
verify_connection
fi
echo ""
log_info "=========================================="
log_info "✅ 安装完成!"
log_info "=========================================="
}
# 主函数
main() {
parse_args "$@"
if $CHECK_ONLY; then
check_environment
elif $INSTALL_SDK; then
install_sdk
elif [ -n "$ACCESS_KEY_ID" ]; then
full_setup
else
check_environment
echo ""
log_info "使用方法:"
log_info " 检查环境: $0 --check-only"
log_info " 完整安装: $0 --access-key-id <ID> --access-key-secret <SECRET> --region <REGION> --bucket <BUCKET>"
log_info " 安装 SDK: $0 --install-sdk"
fi
}
main "$@"
七牛云 KODO 对象存储技能。支持文件上传、下载、列出、删除、获取 URL 等操作。 三层架构:MCP 工具(优先)→ Node.js SDK → qshell CLI。
---
name: qiniu-kodo
description: |
七牛云 KODO 对象存储技能。支持文件上传、下载、列出、删除、获取 URL 等操作。
三层架构:MCP 工具(优先)→ Node.js SDK → qshell CLI。
metadata:
{
"openclaw":
{
"emoji": "☁️",
"requires": { "bins": ["node", "python3", "pip3"], "packages": ["qiniu", "qiniu-mcp-server"] },
"install":
[
{ "id": "qiniu-sdk", "kind": "node", "package": "qiniu", "label": "Install qiniu Node.js SDK" },
{ "id": "qiniu-mcp", "kind": "python", "package": "qiniu-mcp-server", "label": "Install qiniu-mcp-server" },
],
},
}
---
# ☁️ 七牛云 KODO 技能
通过 **MCP 工具** / **Node.js SDK** / **qshell CLI** 管理七牛云对象存储。
---
## 🎯 执行策略(三层降级)
| 优先级 | 工具 | 使用场景 |
|--------|------|----------|
| **1** | `qiniu-mcp-server` | 功能最全,优先使用 |
| **2** | Node.js SDK | MCP 不可用时降级 |
| **3** | qshell CLI | 最后备选 |
---
## 🚀 快速开始
### 1. 安装依赖
```bash
# 进入技能目录
cd ~/.openclaw/workspace/skills/qiniu-kodo
# 运行自动安装
bash scripts/setup.sh
```
### 2. 配置凭证
```bash
bash scripts/setup.sh \
--access-key "YOUR_ACCESS_KEY" \
--secret-key "YOUR_SECRET_KEY" \
--region "z0" \
--bucket "mybucket"
```
### 3. 测试连接
```bash
node scripts/qiniu_node.mjs test-connection
```
---
## 📋 使用示例
### 上传文件
```bash
node scripts/qiniu_node.mjs upload \
--local "/path/to/file.txt" \
--key "uploads/file.txt"
```
### 列出文件
```bash
node scripts/qiniu_node.mjs list --prefix "uploads/" --limit 100
```
### 下载文件
```bash
node scripts/qiniu_node.mjs download \
--key "uploads/file.txt" \
--local "/path/to/save.txt"
```
### 删除文件
```bash
node scripts/qiniu_node.mjs delete --key "uploads/file.txt" --force
```
### 获取文件 URL
```bash
# 公开空间
node scripts/qiniu_node.mjs url --key "uploads/file.txt"
# 私有空间(1小时有效)
node scripts/qiniu_node.mjs url --key "uploads/file.txt" --private --expires 3600
```
---
## 🔧 Node.js SDK API
| 命令 | 说明 |
|------|------|
| `upload --local <path> --key <key>` | 上传文件 |
| `download --key <key> --local <path>` | 下载文件 |
| `list [--prefix <p>] [--limit <n>]` | 列出文件 |
| `delete --key <key> [--force]` | 删除文件 |
| `url --key <key> [--private] [--expires <s>]` | 获取 URL |
| `stat --key <key>` | 文件信息 |
| `move --src-key <a> --dest-key <b>` | 移动文件 |
| `copy --src-key <a> --dest-key <b>` | 复制文件 |
| `test-connection` | 测试连接 |
---
## ⚙️ 配置文件
**config/qiniu-config.json**
```json
{
"accessKey": "YOUR_ACCESS_KEY",
"secretKey": "YOUR_SECRET_KEY",
"bucket": "mybucket",
"region": "z0",
"domain": "https://cdn.example.com"
}
```
**区域代码**:
- `z0` - 华东(杭州)
- `z1` - 华北(河北)
- `z2` - 华南(广州)
- `na0` - 北美(洛杉矶)
- `as0` - 东南亚(新加坡)
---
## 🐛 故障排查
| 问题 | 解决 |
|------|------|
| `Cannot find module 'qiniu'` | `npm install qiniu` |
| `401 Unauthorized` | 检查 AccessKey/SecretKey |
| `连接超时` | 检查区域代码和网络 |
---
## 📚 相关链接
- [七牛云 MCP Server](https://github.com/qiniu/qiniu-mcp-server)
- [七牛云 Node.js SDK](https://developer.qiniu.com/kodo/sdk/1289/nodejs)
- [qshell 工具](https://developer.qiniu.com/kodo/tools/1302/qshell)
---
## 📄 许可证
MIT License
FILE:README.md
# ☁️ 七牛云 KODO 技能
OpenClaw 技能,用于管理七牛云对象存储(KODO)。
## ✨ 功能
- 📤 上传文件
- 📥 下载文件
- 📋 列出文件
- 🗑️ 删除文件
- 🔗 获取文件 URL(支持私有空间签名)
- 📊 查看文件信息
- 📁 移动/复制文件
## 🚀 快速开始
```bash
# 安装依赖
npm install
# 配置凭证
bash scripts/setup.sh --access-key "xxx" --secret-key "xxx" --region "z0" --bucket "mybucket"
# 测试连接
node scripts/qiniu_node.mjs test-connection
```
## 📖 使用示例
```bash
# 上传
node scripts/qiniu_node.mjs upload --local file.txt --key uploads/file.txt
# 列出
node scripts/qiniu_node.mjs list --prefix uploads/
# 下载
node scripts/qiniu_node.mjs download --key uploads/file.txt --local file.txt
# 删除
node scripts/qiniu_node.mjs delete --key uploads/file.txt --force
# 获取 URL
node scripts/qiniu_node.mjs url --key uploads/file.txt
```
## 🔧 架构
三层降级策略:
1. **MCP 工具** (`qiniu-mcp-server`) - 功能最全
2. **Node.js SDK** (`qiniu`) - 稳定可靠
3. **qshell CLI** - 官方命令行
## 📄 许可证
MIT
FILE:_meta.json
{
"name": "qiniu-kodo",
"version": "1.0.0",
"description": "七牛云 KODO 对象存储技能",
"author": "33",
"created": "2026-03-06",
"tags": ["qiniu", "kodo", "oss", "cloud", "storage"],
"requirements": {
"bins": ["node", "npm"],
"packages": ["qiniu"]
},
"config": {
"configFile": "config/qiniu-config.json"
}
}
FILE:config/qiniu-config.example.json
{
"accessKey": "你的AccessKey",
"secretKey": "你的SecretKey",
"bucket": "你的存储桶名称",
"region": "z0",
"domain": "http://你的域名.com",
"options": {
"use_https": true,
"use_cdn": true,
"timeout": 30,
"upload_threshold": 4194304,
"chunk_size": 4194304,
"retry_times": 3
}
}
FILE:docs/EXAMPLES.md
# 七牛云 KODO 技能 - 使用示例
本文档包含七牛云 KODO 技能的各种使用示例。
---
## 📤 示例1:上传文件
### 在 OpenClaw 中
**用户说:**
```
帮我上传 /backups/daily-20260301.tar.gz 到七牛云
```
**AI 执行:**
```
优先级 1:尝试使用 qiniu-mcp MCP 工具
优先级 2:如果 MCP 不可用,使用 Python SDK
优先级 3:如果 SDK 也不可用,使用 qshell
```
**返回:**
```
✅ 上传成功!
文件:daily-20260301.tar.gz
大小:15.2 MB
URL:http://your-domain.com/backups/daily-20260301.tar.gz
```
### 命令行方式
```bash
node scripts/qiniu_node.mjs upload \
--local /backups/daily-20260301.tar.gz \
--key backups/daily-20260301.tar.gz
```
---
## 📋 示例2:列出文件
### 在 OpenClaw 中
**用户说:**
```
列出七牛云 backups 目录下的所有文件
```
**返回:**
```
📋 共找到 15 个文件:
backups/backup-20260301.tar.gz - 15.2 MB - 2026-03-01 20:00
backups/backup-20260228.tar.gz - 14.8 MB - 2026-02-28 20:00
backups/backup-20260227.tar.gz - 16.1 MB - 2026-02-27 20:00
...
```
### 命令行方式
```bash
# 列出所有文件
node scripts/qiniu_node.mjs list
# 列出指定前缀的文件
node scripts/qiniu_node.mjs list --prefix backups/ --limit 50
# JSON 格式输出
node scripts/qiniu_node.mjs list --prefix backups/ --format json
```
---
## 🔗 示例3:获取文件 URL
### 在 OpenClaw 中
**用户说:**
```
给我 images/photo.jpg 的访问链接,1小时有效
```
**返回:**
```
🔗 临时访问链接(1小时有效):
http://your-domain.com/images/photo.jpg?e=1583097600&token=...
```
### 命令行方式
```bash
# 公开空间 URL
node scripts/qiniu_node.mjs url --key images/photo.jpg
# 私有空间 URL(1小时有效)
node scripts/qiniu_node.mjs url --key documents/report.pdf --private --expires 3600
```
---
## 🗑️ 示例4:删除文件
### 在 OpenClaw 中
**用户说:**
```
删除七牛云上的 old-backup.tar.gz
```
**AI 执行:**
```
1. 先确认:是否真的要删除?
2. 用户确认后执行删除
```
**返回:**
```
✅ 删除成功!
文件:old-backup.tar.gz
```
### 命令行方式
```bash
# 删除单个文件(需要确认)
node scripts/qiniu_node.mjs delete --key backups/old-backup.tar.gz
# 强制删除(不需要确认)
node scripts/qiniu_node.mjs delete --key backups/old-backup.tar.gz --force
# 批量删除
cat > delete-list.txt << EOF
backups/backup-20260101.tar.gz
backups/backup-20260102.tar.gz
EOF
node scripts/qiniu_node.mjs batch-delete --file delete-list.txt
```
---
## 📊 示例5:获取文件信息
### 在 OpenClaw 中
**用户说:**
```
查看 documents/report.pdf 的详细信息
```
**返回:**
```
📊 文件信息:
文件名:documents/report.pdf
大小:2.5 MB
类型:application/pdf
Hash:FhGxK...(文件哈希值)
修改时间:2026-03-01 15:30:00
```
### 命令行方式
```bash
node scripts/qiniu_node.mjs stat --key documents/report.pdf
```
---
## 📥 示例6:下载文件
### 在 OpenClaw 中
**用户说:**
```
从七牛云下载 documents/report.pdf 到本地
```
**返回:**
```
✅ 下载成功!
文件:documents/report.pdf
大小:2.5 MB
保存到:/downloads/report.pdf
```
### 命令行方式
```bash
node scripts/qiniu_node.mjs download \
--key documents/report.pdf \
--local /downloads/report.pdf
```
---
## 🔄 示例7:移动和复制文件
### 在 OpenClaw 中
**用户说:**
```
把 temp/file.txt 移动到 archive/file.txt
```
**返回:**
```
✅ 文件已移动
原路径:temp/file.txt
新路径:archive/file.txt
```
### 命令行方式(需要扩展)
```bash
# 移动文件(需要扩展 Python 脚本)
# 复制文件(需要扩展 Python 脚本)
```
---
## 🖼️ 示例8:图片处理(需要 MCP)
### 在 OpenClaw 中
**用户说:**
```
把 images/photo.jpg 缩小到 800x600,加水印
```
**AI 执行:**
```
使用 qiniu-mcp MCP 工具进行图片处理
(需要 MCP 工具支持)
```
**返回:**
```
✅ 图片处理完成!
原图:http://your-domain.com/images/photo.jpg
处理后:http://your-domain.com/images/photo.jpg?imageView2/1/w/800/h/600|watermark/...
```
---
## 🤖 示例9:自动化备份脚本
### 完整脚本
```bash
#!/bin/bash
# 每日备份脚本
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d)
BACKUP_FILE="backup-DATE.tar.gz"
# 1. 创建备份
echo "📦 创建备份..."
tar -czf "BACKUP_DIR/BACKUP_FILE" /data
# 2. 上传到七牛云
echo "📤 上传到七牛云..."
cd /home/node/.openclaw/workspace/skills/qiniu-kodo
node scripts/qiniu_node.mjs upload \
--local "BACKUP_DIR/BACKUP_FILE" \
--key "backups/BACKUP_FILE"
# 3. 清理本地旧备份(保留7天)
echo "🗑️ 清理旧备份..."
find BACKUP_DIR -name "backup-*.tar.gz" -mtime +7 -delete
# 4. 清理云端旧备份(保留30天)
echo "🗑️ 清理云端旧备份..."
python -c "
from scripts.qiniu_python import QiniuKodo
from datetime import datetime, timedelta
kodo = QiniuKodo()
cutoff = datetime.now() - timedelta(days=30)
for file in kodo.list_files(prefix='backups/'):
if datetime.fromtimestamp(file['mtime']) < cutoff:
kodo.delete(file['key'])
print(f'已删除: {file[\"key\"]}')
"
echo "✅ 备份完成!"
```
---
## 📝 示例10:批量操作
### 批量上传图片
```python
from scripts.qiniu_python import QiniuKodo
import os
kodo = QiniuKodo()
image_dir = "/path/to/images"
uploaded = 0
for filename in os.listdir(image_dir):
if filename.endswith(('.jpg', '.png', '.gif')):
local_path = os.path.join(image_dir, filename)
key = f"images/{filename}"
try:
result = kodo.upload(local_path, key)
print(f"✅ {filename} -> {result['url']}")
uploaded += 1
except Exception as e:
print(f"❌ {filename} 上传失败: {e}")
print(f"\n✅ 共上传 {uploaded} 个文件")
```
---
## 🔧 高级用法
### 使用配置文件
```python
from scripts.qiniu_python import QiniuKodo
# 使用自定义配置文件
kodo = QiniuKodo(config_path='/path/to/custom-config.json')
# 上传文件
result = kodo.upload('/path/to/file.txt', 'uploads/file.txt')
```
### 错误处理
```python
from scripts.qiniu_python import QiniuKodo
kodo = QiniuKodo()
try:
result = kodo.upload('/path/to/file.txt', 'uploads/file.txt')
print(f"上传成功: {result['url']}")
except FileNotFoundError as e:
print(f"文件不存在: {e}")
except Exception as e:
print(f"上传失败: {e}")
```
---
## 🎯 更多示例
查看 [SKILL.md](../SKILL.md) 获取更多详细信息和 API 文档。
FILE:package.json
{
"name": "qiniu-kodo-skill",
"version": "1.0.0",
"description": "七牛云 KODO 对象存储技能",
"main": "scripts/qiniu_node.mjs",
"type": "module",
"scripts": {
"setup": "bash scripts/setup.sh",
"test": "node scripts/qiniu_node.mjs test-connection"
},
"dependencies": {
"qiniu": "^7.14.0"
},
"keywords": ["qiniu", "kodo", "oss", "openclaw", "skill"],
"author": "33",
"license": "MIT"
}
FILE:scripts/qiniu_node.mjs
#!/usr/bin/env node
/**
* 七牛云 KODO Node.js SDK 脚本
* 功能:文件上传、下载、列出、删除、获取URL等操作
*/
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import https from 'https';
import http from 'http';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// 尝试加载 qiniu SDK
import qiniu from 'qiniu';
// 配置文件路径
const SKILL_DIR = path.dirname(__dirname);
const CONFIG_DIR = path.join(SKILL_DIR, 'config');
const CONFIG_FILE = path.join(CONFIG_DIR, 'qiniu-config.json');
/**
* 加载配置文件
*/
function loadConfig() {
if (!fs.existsSync(CONFIG_FILE)) {
throw new Error(
`配置文件不存在: CONFIG_FILE\n` +
`请复制 config/qiniu-config.example.json 为 qiniu-config.json 并填写配置`
);
}
const config = JSON.parse(fs.readFileSync(CONFIG_FILE, 'utf-8'));
// 验证必填配置
const required = ['accessKey', 'secretKey', 'bucket'];
for (const key of required) {
if (!config[key] || config[key].startsWith('你的')) {
throw new Error(`配置项 key 不能为空或使用示例值`);
}
}
return config;
}
/**
* 七牛云 KODO 操作类
*/
class QiniuKodo {
constructor(config) {
this.config = config;
this.mac = new qiniu.auth.digest.Mac(config.accessKey, config.secretKey);
this.bucketManager = new qiniu.rs.BucketManager(this.mac);
this.bucket = config.bucket;
this.domain = config.domain || '';
// 配置上传选项
this.uploadOptions = {
uphost: this.getUpHost(config.region)
};
}
/**
* 获取上传域名
*/
getUpHost(region) {
const hosts = {
'z0': 'https://upload.qiniup.com',
'z1': 'https://upload-z1.qiniup.com',
'z2': 'https://upload-z2.qiniup.com',
'na0': 'https://upload-na0.qiniup.com',
'as0': 'https://upload-as0.qiniup.com'
};
return hosts[region] || hosts['z0'];
}
/**
* 上传文件
*/
async upload(localPath, key) {
if (!fs.existsSync(localPath)) {
throw new Error(`文件不存在: localPath`);
}
return new Promise((resolve, reject) => {
const putPolicy = new qiniu.rs.PutPolicy({ scope: `this.bucket:key` });
const uploadToken = putPolicy.uploadToken(this.mac);
const config = new qiniu.conf.Config();
config.zone = qiniu.zone.Zone_z0; // 根据区域设置
const formUploader = new qiniu.form_up.FormUploader(config);
const putExtra = new qiniu.form_up.PutExtra();
formUploader.putFile(uploadToken, key, localPath, putExtra, (respErr, respBody, respInfo) => {
if (respErr) {
reject(respErr);
return;
}
if (respInfo.statusCode === 200) {
resolve({
success: true,
key: respBody.key,
hash: respBody.hash,
url: this.domain ? this.getUrl(key) : null,
size: fs.statSync(localPath).size
});
} else {
reject(new Error(`上传失败: respInfo.statusCode JSON.stringify(respBody)`));
}
});
});
}
/**
* 下载文件
*/
async download(key, localPath) {
const url = this.getUrl(key, true);
return new Promise((resolve, reject) => {
const protocol = url.startsWith('https') ? https : http;
protocol.get(url, (response) => {
if (response.statusCode !== 200) {
reject(new Error(`下载失败: response.statusCode`));
return;
}
// 创建目录
const dir = path.dirname(localPath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
const fileStream = fs.createWriteStream(localPath);
response.pipe(fileStream);
fileStream.on('finish', () => {
fileStream.close();
resolve({
success: true,
key: key,
size: fs.statSync(localPath).size
});
});
}).on('error', reject);
});
}
/**
* 列出文件
*/
async listFiles(prefix = '', limit = 100) {
return new Promise((resolve, reject) => {
this.bucketManager.listPrefix(this.bucket, {
prefix: prefix,
limit: limit
}, (err, respBody, respInfo) => {
if (err) {
reject(err);
return;
}
if (respInfo.statusCode === 200) {
const files = respBody.items.map(item => ({
key: item.key,
size: item.fsize,
mtime: item.putTime / 1000000,
hash: item.hash,
mimeType: item.mimeType
}));
resolve(files);
} else {
reject(new Error(`列出文件失败: respInfo.statusCode`));
}
});
});
}
/**
* 删除文件
*/
async delete(key) {
return new Promise((resolve, reject) => {
this.bucketManager.delete(this.bucket, key, (err, respBody, respInfo) => {
if (err) {
if (respInfo && respInfo.statusCode === 612) {
reject(new Error(`文件不存在: key`));
} else {
reject(err);
}
return;
}
if (respInfo.statusCode === 200) {
resolve({ success: true, key: key });
} else {
reject(new Error(`删除失败: respInfo.statusCode`));
}
});
});
}
/**
* 批量删除
*/
async batchDelete(keys) {
const deleteOperations = keys.map(key =>
qiniu.rs.deleteOp(this.bucket, key)
);
return new Promise((resolve, reject) => {
this.bucketManager.batch(deleteOperations, (err, respBody, respInfo) => {
if (err) {
reject(err);
return;
}
let success = 0;
let failed = 0;
respBody.forEach(item => {
if (item.code === 200) {
success++;
} else {
failed++;
}
});
resolve({ success, failed });
});
});
}
/**
* 获取文件信息
*/
async stat(key) {
return new Promise((resolve, reject) => {
this.bucketManager.stat(this.bucket, key, (err, respBody, respInfo) => {
if (err) {
reject(err);
return;
}
if (respInfo.statusCode === 200) {
resolve({
key: key,
size: respBody.fsize,
hash: respBody.hash,
mimeType: respBody.mimeType,
mtime: respBody.putTime / 1000000
});
} else {
reject(new Error(`获取文件信息失败: respInfo.statusCode`));
}
});
});
}
/**
* 获取文件 URL
*/
getUrl(key, isPrivate = false, expires = 3600) {
if (!this.domain) {
throw new Error('配置中缺少 domain 字段');
}
const baseUrl = `this.domain/key`;
if (isPrivate) {
const deadline = Math.floor(Date.now() / 1000) + expires;
const sign = qiniu.util.hmacSha1(baseUrl, this.mac.secretKey);
const encodedSign = qiniu.util.base64ToUrlSafe(sign);
const downloadToken = `this.mac.accessKey:encodedSign`;
return `baseUrl?e=deadline&token=downloadToken`;
} else {
return baseUrl;
}
}
/**
* 移动文件
*/
async move(srcKey, destKey, force = false) {
return new Promise((resolve, reject) => {
this.bucketManager.move(this.bucket, srcKey, this.bucket, destKey, { force }, (err, respBody, respInfo) => {
if (err) {
reject(err);
return;
}
if (respInfo.statusCode === 200) {
resolve({
success: true,
srcKey: srcKey,
destKey: destKey
});
} else {
reject(new Error(`移动失败: respInfo.statusCode`));
}
});
});
}
/**
* 复制文件
*/
async copy(srcKey, destKey, force = false) {
return new Promise((resolve, reject) => {
this.bucketManager.copy(this.bucket, srcKey, this.bucket, destKey, { force }, (err, respBody, respInfo) => {
if (err) {
reject(err);
return;
}
if (respInfo.statusCode === 200) {
resolve({
success: true,
srcKey: srcKey,
destKey: destKey
});
} else {
reject(new Error(`复制失败: respInfo.statusCode`));
}
});
});
}
}
// 工具函数
function formatSize(bytes) {
const units = ['B', 'KB', 'MB', 'GB', 'TB'];
let size = bytes;
let unitIndex = 0;
while (size >= 1024 && unitIndex < units.length - 1) {
size /= 1024;
unitIndex++;
}
return `size.toFixed(2) units[unitIndex]`;
}
function formatTime(timestamp) {
const date = new Date(timestamp * 1000);
return date.toISOString().replace('T', ' ').substring(0, 19);
}
// 命令行接口
async function main() {
const args = process.argv.slice(2);
const command = args[0];
try {
const config = loadConfig();
const kodo = new QiniuKodo(config);
switch (command) {
case 'upload': {
const localPath = args[args.indexOf('--local') + 1];
const key = args[args.indexOf('--key') + 1];
const result = await kodo.upload(localPath, key);
console.log('✅ 上传成功!');
console.log(` 文件: result.key`);
console.log(` 大小: formatSize(result.size)`);
console.log(` URL: result.url`);
break;
}
case 'download': {
const key = args[args.indexOf('--key') + 1];
const localPath = args[args.indexOf('--local') + 1];
const result = await kodo.download(key, localPath);
console.log('✅ 下载成功!');
console.log(` 文件: result.key`);
console.log(` 大小: formatSize(result.size)`);
console.log(` 保存到: localPath`);
break;
}
case 'list': {
const prefixIndex = args.indexOf('--prefix');
const prefix = prefixIndex !== -1 ? args[prefixIndex + 1] : '';
const limitIndex = args.indexOf('--limit');
const limit = limitIndex !== -1 ? parseInt(args[limitIndex + 1]) : 100;
const formatIndex = args.indexOf('--format');
const format = formatIndex !== -1 ? args[formatIndex + 1] : 'table';
const files = await kodo.listFiles(prefix, limit);
if (files.length === 0) {
console.log('📭 没有找到文件');
return;
}
if (format === 'json') {
console.log(JSON.stringify(files, null, 2));
} else {
console.log(`📋 共 files.length 个文件:\n`);
console.log(`'文件名'.padEnd(50) '大小'.padStart(12) '修改时间'.padEnd(20)`);
console.log('-'.repeat(82));
files.forEach(file => {
console.log(`file.key.padEnd(50) formatSize(file.size).padStart(12) formatTime(file.mtime).padEnd(20)`);
});
}
break;
}
case 'delete': {
const key = args[args.indexOf('--key') + 1];
const forceIndex = args.indexOf('--force');
if (forceIndex === -1) {
console.log(`⚠️ 确定要删除 key 吗?(y/N):`);
// 注意:这里需要 readline,为简化暂时跳过确认
}
const result = await kodo.delete(key);
console.log('✅ 删除成功!');
console.log(` 文件: result.key`);
break;
}
case 'batch-delete': {
const fileIndex = args.indexOf('--file');
const listFile = args[fileIndex + 1];
const keys = fs.readFileSync(listFile, 'utf-8')
.split('\n')
.map(line => line.trim())
.filter(line => line);
const result = await kodo.batchDelete(keys);
console.log('✅ 批量删除完成!');
console.log(` 成功: result.success`);
console.log(` 失败: result.failed`);
break;
}
case 'url': {
const key = args[args.indexOf('--key') + 1];
const privateIndex = args.indexOf('--private');
const expiresIndex = args.indexOf('--expires');
const isPrivate = privateIndex !== -1;
const expires = expiresIndex !== -1 ? parseInt(args[expiresIndex + 1]) : 3600;
const url = kodo.getUrl(key, isPrivate, expires);
console.log('🔗 文件 URL:');
console.log(` url`);
if (isPrivate) {
console.log(`\n ⏱️ 有效期: expires 秒`);
}
break;
}
case 'stat': {
const key = args[args.indexOf('--key') + 1];
const info = await kodo.stat(key);
console.log('📊 文件信息:\n');
console.log(` 文件名: info.key`);
console.log(` 大小: formatSize(info.size)`);
console.log(` 类型: info.mimeType`);
console.log(` Hash: info.hash`);
console.log(` 修改时间: formatTime(info.mtime)`);
break;
}
case 'test-connection': {
await kodo.listFiles('', 1);
console.log('✅ 七牛云连接验证成功!');
break;
}
default:
console.log('使用方法:');
console.log(' node qiniu_node.mjs upload --local <LocalPath> --key <Key>');
console.log(' node qiniu_node.mjs download --key <Key> --local <LocalPath>');
console.log(' node qiniu_node.mjs list [--prefix <Prefix>] [--limit <Limit>]');
console.log(' node qiniu_node.mjs delete --key <Key> [--force]');
console.log(' node qiniu_node.mjs url --key <Key> [--private] [--expires <Seconds>]');
console.log(' node qiniu_node.mjs stat --key <Key>');
console.log(' node qiniu_node.mjs test-connection');
}
} catch (error) {
console.error(`❌ 错误: error.message`);
process.exit(1);
}
}
main();
FILE:scripts/setup.sh
#!/bin/bash
#######################################
# 七牛云 KODO 技能 - 自动安装配置脚本
# 参考:Tencent Cloud COS 技能
#######################################
set -e
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# 默认值
CHECK_ONLY=false
INSTALL_MCP=false
INSTALL_SDK=false
ACCESS_KEY=""
SECRET_KEY=""
REGION=""
BUCKET=""
DOMAIN=""
# 脚本目录
SCRIPT_DIR="$(cd "$(dirname "BASH_SOURCE[0]")" && pwd)"
BASE_DIR="$(dirname "$SCRIPT_DIR")"
CONFIG_DIR="$BASE_DIR/config"
CONFIG_FILE="$CONFIG_DIR/qiniu-config.json"
# 日志函数
log_info() {
echo -e "GREEN[INFO]NC $1"
}
log_warn() {
echo -e "YELLOW[WARN]NC $1"
}
log_error() {
echo -e "RED[ERROR]NC $1"
}
# 解析参数
parse_args() {
while [[ $# -gt 0 ]]; do
case $1 in
--check-only)
CHECK_ONLY=true
shift
;;
--install-mcp)
INSTALL_MCP=true
shift
;;
--install-sdk)
INSTALL_SDK=true
shift
;;
--access-key)
ACCESS_KEY="$2"
shift 2
;;
--secret-key)
SECRET_KEY="$2"
shift 2
;;
--region)
REGION="$2"
shift 2
;;
--bucket)
BUCKET="$2"
shift 2
;;
--domain)
DOMAIN="$2"
shift 2
;;
*)
log_error "未知参数: $1"
exit 1
;;
esac
done
}
# 检查命令是否存在
command_exists() {
command -v "$1" >/dev/null 2>&1
}
# 检查 Node.js 环境
check_nodejs() {
log_info "检查 Node.js 环境..."
if command_exists node; then
NODE_VERSION=$(node --version 2>&1)
log_info "✅ Node.js 已安装: $NODE_VERSION"
return 0
else
log_error "❌ Node.js 未安装"
return 1
fi
}
# 检查 npm
check_npm() {
log_info "检查 npm..."
if command_exists npm; then
NPM_VERSION=$(npm --version 2>&1)
log_info "✅ npm 已安装: $NPM_VERSION"
return 0
else
log_error "❌ npm 未安装"
return 1
fi
}
# 检查 qiniu-mcp 是否已安装
check_mcp() {
log_info "检查 qiniu-mcp..."
if command_exists qiniu-mcp || [ -f "$HOME/.mcporter/servers/qiniu-mcp" ]; then
log_info "✅ qiniu-mcp 已安装"
return 0
else
log_warn "⚠️ qiniu-mcp 未安装"
return 1
fi
}
# 检查 qiniu Node.js SDK
check_sdk() {
log_info "检查 qiniu Node.js SDK..."
# 检查技能目录下的 node_modules
if [ -d "$BASE_DIR/node_modules/qiniu" ]; then
SDK_VERSION=$(node -p "require('$BASE_DIR/node_modules/qiniu/package.json').version" 2>/dev/null || echo "unknown")
log_info "✅ qiniu Node.js SDK 已安装: $SDK_VERSION"
return 0
else
log_warn "⚠️ qiniu Node.js SDK 未安装"
return 1
fi
}
# 检查 qshell 命令行工具
check_qshell() {
log_info "检查 qshell..."
if command_exists qshell; then
QSHELL_VERSION=$(qshell version 2>&1 | head -1)
log_info "✅ qshell 已安装: $QSHELL_VERSION"
return 0
else
log_warn "⚠️ qshell 未安装"
return 1
fi
}
# 检查配置文件
check_config() {
log_info "检查配置文件..."
if [ -f "$CONFIG_FILE" ]; then
log_info "✅ 配置文件已存在: $CONFIG_FILE"
# 验证配置(使用 node 检查 JSON)
if node -e "require('$CONFIG_FILE')" 2>/dev/null; then
log_info "✅ 配置文件格式正确"
return 0
else
log_error "❌ 配置文件格式错误"
return 1
fi
else
log_warn "⚠️ 配置文件不存在"
return 1
fi
}
# 检查凭证是否已配置
check_credentials() {
log_info "检查凭证配置..."
if [ -f "$CONFIG_FILE" ]; then
ACCESS_KEY_CHECK=$(node -p "require('$CONFIG_FILE').accessKey || ''" 2>/dev/null)
if [ -n "$ACCESS_KEY_CHECK" ] && [ "$ACCESS_KEY_CHECK" != "你的AccessKey" ]; then
log_info "✅ 凭证已配置"
return 0
else
log_warn "⚠️ 凭证未配置或使用示例值"
return 1
fi
else
log_warn "⚠️ 配置文件不存在"
return 1
fi
}
# 安装 qiniu Node.js SDK
install_sdk() {
log_info "安装 qiniu Node.js SDK..."
cd "$BASE_DIR"
# 初始化 package.json(如果不存在)
if [ ! -f "package.json" ]; then
npm init -y > /dev/null 2>&1
fi
# 安装 qiniu SDK
if npm install qiniu --save 2>/dev/null; then
log_info "✅ qiniu Node.js SDK 安装成功"
return 0
else
log_error "❌ qiniu Node.js SDK 安装失败"
return 1
fi
}
# 安装 qiniu-mcp
install_mcp() {
log_info "安装 qiniu-mcp..."
# 检查 Node.js
if ! check_nodejs; then
log_error "需要先安装 Node.js"
return 1
fi
# 检查 npm
if ! check_npm; then
log_error "需要先安装 npm"
return 1
fi
# 安装 mcporter
if ! command_exists mcporter; then
log_info "安装 mcporter..."
npm install -g @openclaw/mcporter
fi
# 安装 qiniu-mcp-server
if npm install -g @qiniu/qiniu-mcp-server 2>/dev/null; then
log_info "✅ qiniu-mcp 安装成功"
# 配置 mcporter
if [ -d "$HOME/.mcporter" ]; then
cat > "$HOME/.mcporter/mcporter.json" <<EOF
{
"servers": {
"qiniu-mcp": {
"command": "qiniu-mcp-server",
"env": {
"QINIU_ACCESS_KEY": "$ACCESS_KEY",
"QINIU_SECRET_KEY": "$SECRET_KEY"
}
}
}
}
EOF
log_info "✅ mcporter 配置已更新"
fi
return 0
else
log_error "❌ qiniu-mcp 安装失败"
return 1
fi
}
# 安装 qshell
install_qshell() {
log_info "安装 qshell..."
local OS=$(uname -s | tr '[:upper:]' '[:lower:]')
local ARCH=$(uname -m)
if [ "$ARCH" = "x86_64" ]; then
ARCH="x64"
elif [ "$ARCH" = "aarch64" ]; then
ARCH="arm64"
fi
local QSHELL_URL="https://devtools.qiniu.com/qshell-OS-ARCH-v2.6.2.zip"
log_info "下载 qshell: $QSHELL_URL"
local TEMP_DIR=$(mktemp -d)
cd "$TEMP_DIR"
if wget -q "$QSHELL_URL" -O qshell.zip && unzip -q qshell.zip && chmod +x qshell; then
sudo mv qshell /usr/local/bin/ 2>/dev/null || mv qshell "$HOME/.local/bin/"
log_info "✅ qshell 安装成功"
cd - > /dev/null
rm -rf "$TEMP_DIR"
return 0
else
log_error "❌ qshell 安装失败"
cd - > /dev/null
rm -rf "$TEMP_DIR"
return 1
fi
}
# 创建配置文件
create_config() {
log_info "创建配置文件..."
mkdir -p "$CONFIG_DIR"
cat > "$CONFIG_FILE" <<EOF
{
"accessKey": "$ACCESS_KEY",
"secretKey": "$SECRET_KEY",
"bucket": "$BUCKET",
"region": "$REGION",
"domain": "$DOMAIN",
"options": {
"use_https": true,
"use_cdn": true,
"timeout": 30,
"upload_threshold": 4194304,
"chunk_size": 4194304,
"retry_times": 3
}
}
EOF
# 设置权限
chmod 600 "$CONFIG_FILE"
log_info "✅ 配置文件已创建: $CONFIG_FILE"
}
# 配置 shell 环境
configure_shell() {
log_info "配置 shell 环境..."
local SHELL_RC=""
if [ -n "$ZSH_VERSION" ]; then
SHELL_RC="$HOME/.zshrc"
else
SHELL_RC="$HOME/.bashrc"
fi
# 添加环境变量
if ! grep -q "QINIU_ACCESS_KEY" "$SHELL_RC" 2>/dev/null; then
cat >> "$SHELL_RC" <<EOF
# 七牛云 KODO 配置
export QINIU_ACCESS_KEY="$ACCESS_KEY"
export QINIU_SECRET_KEY="$SECRET_KEY"
export QINIU_BUCKET="$BUCKET"
export QINIU_REGION="$REGION"
EOF
log_info "✅ Shell 环境已配置: $SHELL_RC"
else
log_info "✅ Shell 环境已存在配置"
fi
}
# 配置 qshell
configure_qshell() {
if command_exists qshell; then
log_info "配置 qshell 账号..."
if qshell account "$ACCESS_KEY" "$SECRET_KEY" "openclaw" 2>/dev/null; then
log_info "✅ qshell 账号配置成功"
else
log_warn "⚠️ qshell 账号配置失败"
fi
fi
}
# 验证连接
verify_connection() {
log_info "验证七牛云连接..."
if node "$SCRIPT_DIR/qiniu_node.mjs" test-connection 2>/dev/null; then
log_info "✅ 七牛云连接验证成功"
return 0
else
log_error "❌ 七牛云连接验证失败"
return 1
fi
}
# 环境检查
check_environment() {
log_info "=========================================="
log_info " 七牛云 KODO 环境检查"
log_info "=========================================="
echo ""
local ALL_OK=true
check_nodejs || ALL_OK=false
check_npm || ALL_OK=false
check_mcp || true # 可选
check_sdk || ALL_OK=false
check_qshell || true # 可选
check_config || true
check_credentials || true
echo ""
log_info "=========================================="
if $ALL_OK; then
log_info "✅ 环境检查通过"
return 0
else
log_warn "⚠️ 环境检查未完全通过"
return 1
fi
}
# 完整安装
full_setup() {
log_info "=========================================="
log_info " 七牛云 KODO 自动安装"
log_info "=========================================="
echo ""
# 检查必需项
if ! check_nodejs; then
log_error "Node.js 是必需的"
exit 1
fi
if ! check_npm; then
log_error "npm 是必需的"
exit 1
fi
# 安装 SDK
if ! check_sdk; then
install_sdk
fi
# 安装 MCP(可选)
if ! check_mcp; then
log_info "是否安装 qiniu-mcp? [y/N]:"
read -r INSTALL_MCP_CHOICE
if [ "$INSTALL_MCP_CHOICE" = "y" ] || [ "$INSTALL_MCP_CHOICE" = "Y" ]; then
install_mcp
fi
fi
# 安装 qshell(可选)
if ! check_qshell; then
log_info "是否安装 qshell? [y/N]:"
read -r INSTALL_QSHELL_CHOICE
if [ "$INSTALL_QSHELL_CHOICE" = "y" ] || [ "$INSTALL_QSHELL_CHOICE" = "Y" ]; then
install_qshell
fi
fi
# 创建配置
if [ -n "$ACCESS_KEY" ] && [ -n "$SECRET_KEY" ]; then
create_config
configure_shell
configure_qshell
verify_connection
fi
echo ""
log_info "=========================================="
log_info "✅ 安装完成!"
log_info "=========================================="
}
# 主函数
main() {
parse_args "$@"
if $CHECK_ONLY; then
check_environment
elif $INSTALL_MCP; then
install_mcp
elif $INSTALL_SDK; then
install_sdk
elif [ -n "$ACCESS_KEY" ]; then
full_setup
else
check_environment
echo ""
log_info "使用方法:"
log_info " 检查环境: $0 --check-only"
log_info " 完整安装: $0 --access-key <KEY> --secret-key <KEY> --region <REGION> --bucket <BUCKET>"
log_info " 安装 SDK: $0 --install-sdk"
log_info " 安装 MCP: $0 --install-mcp"
fi
}
main "$@"