@clawhub-freedompixels-bccea021be
中文全平台热搜聚合。一键获取知乎、微博、百度、B站、抖音、头条6大平台热搜榜单。 中文优先,无需API Key,开箱即用。 当用户说"热搜"、"热点"、"今日热点"、"什么火"、"热搜榜"、"全平台热搜"、"趋势"时触发。 Keywords: 热搜, 热榜, 热点, 趋势, trending, hot searc...
---
name: cn-hot-trends
description: |
中文全平台热搜聚合。一键获取知乎、微博、百度、B站、抖音、头条6大平台热搜榜单。
中文优先,无需API Key,开箱即用。
当用户说"热搜"、"热点"、"今日热点"、"什么火"、"热搜榜"、"全平台热搜"、"趋势"时触发。
Keywords: 热搜, 热榜, 热点, 趋势, trending, hot search, 中文热搜, 全平台.
metadata: {"openclaw": {"emoji": "🔥"}}
---
# CN Hot Trends — 中文全平台热搜聚合
一键获取6大中文平台热搜榜单,AI选题+运营必备。
## 支持平台
| 平台 | 数据来源 | 默认条数 |
|------|---------|---------|
| 知乎 | 知乎热榜API | 10 |
| 微博 | 微博热搜API | 10 |
| 百度 | 百度热搜API | 10 |
| B站 | B站排行榜API | 10 |
| 抖音 | 抖音热榜API | 10 |
| 今日头条 | 头条热榜API | 10 |
## 快速开始
```bash
# 全平台热搜(默认各10条)
python3 scripts/fetch_trends.py
# 指定平台
python3 scripts/fetch_trends.py --platform zhihu
# 指定条数
python3 scripts/fetch_trends.py --platform weibo --limit 5
# 输出JSON
python3 scripts/fetch_trends.py --json
# AI选题推荐(基于热搜生成内容选题)
python3 scripts/fetch_trends.py --recommend
```
## 输出格式
### 文本模式(默认)
```
🔥 今日热搜速览(2026-04-12)
📍 知乎 | 10条
1. 华为自离N+1回归 🔥335万
2. 美国3月CPI创近两年最大涨幅 🔥236万
...
📍 微博 | 10条
1. 男子微信群侮辱全红婵被拘 🔥106万
...
📊 AI选题推荐:
1. 华为N+1回归→ 职场权益视角(知乎/公众号)
2. CPI上涨 → 普通人应对策略(小红书)
```
### JSON模式
```json
[{
"platform": "知乎",
"title": "华为自离N+1回归",
"heat": 3350000,
"heat_display": "335万",
"url": "https://...",
"excerpt": "..."
}]
```
## AI选题推荐(--recommend)
基于热搜数据,自动生成:
1. **选题方向**:结合热度+平台特性推荐
2. **目标平台**:知乎/小红书/公众号匹配
3. **切入角度**:差异化建议
## 平台参数
`--platform` 可选值:`zhihu` `weibo` `baidu` `bilibili` `douyin` `toutiao` `all`(默认)
## 数据来源
所有数据均来自平台公开API,无需登录、无需API Key。
## 注意事项
- 部分平台API可能因地域限制不可用,脚本会自动跳过
- SSL证书问题已做双层降级处理(优先验证→失败回退)
- 不存储任何用户数据
FILE:README.md
# 🔥 cn-hot-trends
> 中文全平台热搜聚合。知乎·微博·百度·B站·抖音·头条,一键获取。
## 功能
- **6大平台热搜** — 知乎/微博/百度/B站/抖音/头条实时热搜
- **全平台总榜** — 跨平台热度综合排名
- **AI选题推荐** — 基于热搜自动生成内容选题 + 目标平台 + 切入角度 + 建议标题
- **JSON输出** — 方便接入其他工具
## 快速开始
```bash
# 全平台热搜
python3 scripts/fetch_trends.py
# 单平台
python3 scripts/fetch_trends.py --platform zhihu
# AI选题推荐
python3 scripts/fetch_trends.py --platform zhihu --limit 5 --recommend
# JSON格式
python3 scripts/fetch_trends.py --json
```
## AI选题推荐示例
```
🎯 AI选题推荐
1. 李想朋友圈发飙...
📍 平台: 知乎 | 专业分析文 | 给背景、分析、结论
🎯 可写角度: 行业分析 / 竞争格局 / 创始人故事
📝 建议标题:「关于「李想朋友圈发飙」,我从3个角度拆解」
```
## 数据来源
所有数据来自平台公开 API,无需登录无需 Key。
## 安装
```bash
# OpenClaw 用户直接安装
clawhub install freedompixels/cn-hot-trends
# 或手动复制到 skills 目录
cp -r cn-hot-trends ~/.qclaw/skills/
```
FILE:scripts/fetch_trends.py
#!/usr/bin/env python3
"""
中文全平台热搜聚合 - cn-hot-trends
支持:知乎、微博、百度、B站、抖音、头条
AI选题推荐 + 内容角度分析
用法:
python3 fetch_trends.py # 全平台热搜
python3 fetch_trends.py --platform zhihu # 单平台
python3 fetch_trends.py --limit 10 # 每平台条数
python3 fetch_trends.py --recommend # AI选题推荐
python3 fetch_trends.py --json # JSON输出
"""
import json
import os
import sys
import ssl
import argparse
import time
import re
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# SSL 配置:优先标准验证,失败才降级
_SAFE_SSL_CTX = ssl.create_default_context()
def _urlopen(req, timeout=10):
"""优先标准SSL,失败自动降级"""
try:
return urlopen(req, timeout=timeout, context=_SAFE_SSL_CTX)
except URLError as e:
reason = getattr(e, 'reason', None)
if isinstance(reason, (ssl.SSLError, ssl.SSLCertVerificationError)):
return urlopen(req, timeout=timeout, context=ssl.create_default_context())
raise
except Exception as e:
if 'SSL' in str(type(e).__name__) or 'SSL' in str(e):
return urlopen(req, timeout=timeout, context=ssl.create_default_context())
raise
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
# ========== 知乎热榜 ==========
def fetch_zhihu(limit=10):
"""抓取知乎热榜"""
url = "https://api.zhihu.com/topstory/hot-lists/total?limit={}".format(limit)
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 知乎抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
for item in data.get("data", []):
target = item.get("target", {})
title = target.get("title", "").strip()
if not title:
continue
detail = item.get("detail_text", "")
heat_num = 0
try:
if "万" in detail:
heat_num = float(detail.replace("万热度", "").replace("万", "").strip()) * 10000
elif "热度" in detail:
heat_num = int(re.sub(r"\D", "", detail))
except (ValueError, AttributeError):
pass
results.append({
"title": title,
"platform": "知乎",
"emoji": "💬",
"heat": int(heat_num),
"heat_display": detail,
"url": "https://www.zhihu.com/question/{}".format(target.get("id", "")),
"category": "",
"excerpt": (target.get("excerpt", "") or "")[:80]
})
return results
# ========== 微博热搜 ==========
def fetch_weibo(limit=10):
"""抓取微博热搜"""
url = "https://weibo.com/ajax/side/hotSearch"
headers = {**HEADERS, "Referer": "https://weibo.com/"}
try:
req = Request(url, headers=headers)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 微博热搜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
realtime = data.get("data", {}).get("realtime", [])
for item in realtime[:limit]:
note = item.get("note", "").strip()
if not note:
continue
label = item.get("label_name", "")
if label:
note = "[{}] {}".format(label, note)
results.append({
"title": note,
"platform": "微博",
"emoji": "🌐",
"heat": int(item.get("num", 0)),
"heat_display": str(item.get("num", 0)),
"url": "https://s.weibo.com/weibo?q=%23{}%23".format(
item.get("word", item.get("note", ""))),
"category": item.get("label_name", ""),
"excerpt": ""
})
return results
# ========== 百度热搜 ==========
def fetch_baidu(limit=10):
"""抓取百度热搜"""
url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 百度热搜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
cards = data.get("data", {}).get("cards", [])
if cards:
content = cards[0].get("content", [])
for item in content[:limit]:
query = item.get("query", "").strip()
if not query:
continue
heat_num = 0
try:
heat_num = int(item.get("hotScore", 0))
except (ValueError, TypeError):
pass
results.append({
"title": query,
"platform": "百度",
"emoji": "🔍",
"heat": heat_num,
"heat_display": item.get("desc", "")[:30],
"url": "https://www.baidu.com/s?wd={}".format(query),
"category": item.get("tag", ""),
"excerpt": item.get("desc", "")[:80]
})
return results
# ========== B站排行榜 ==========
def fetch_bilibili(limit=10):
"""抓取B站排行榜"""
url = "https://api.bilibili.com/x/web-interface/ranking/v2?rid=0&type=all"
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ B站排行榜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
items = data.get("data", {}).get("list", [])
for item in items[:limit]:
title = item.get("title", "").strip()
if not title:
continue
stat = item.get("stat", {})
results.append({
"title": title,
"platform": "B站",
"emoji": "📺",
"heat": int(stat.get("view", 0)),
"heat_display": "{}播放".format(_format_num(stat.get("view", 0))),
"url": item.get("short_link_v2", "https://www.bilibili.com/video/{}".format(item.get("bvid", ""))),
"category": item.get("tname", ""),
"excerpt": item.get("description", "")[:80] if item.get("description") else ""
})
return results
# ========== 抖音热榜 ==========
def fetch_douyin(limit=10):
"""抓取抖音热榜"""
url = "https://www.douyin.com/aweme/v1/web/general/search/single/?keyword_type=0&count={}&device_platform=webapp&aid=6383".format(limit)
headers = {
**HEADERS,
"Referer": "https://www.douyin.com/",
"Cookie": ""
}
try:
req = Request(url, headers=headers)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 抖音热榜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
try:
items = data.get("data", []) or []
except (KeyError, TypeError):
return []
for item in items[:limit]:
aweme = item.get("aweme_info", {}) or item
title = (aweme.get("desc", "") or "").strip()
if not title:
continue
stat = aweme.get("statistics", {})
results.append({
"title": title,
"platform": "抖音",
"emoji": "🎵",
"heat": int(stat.get("digg_count", 0)),
"heat_display": "{}赞".format(_format_num(stat.get("digg_count", 0))),
"url": "https://www.douyin.com/video/{}".format(aweme.get("aweme_id", "")),
"category": "",
"excerpt": ""
})
return results
# ========== 头条热榜 ==========
def fetch_toutiao(limit=10):
"""抓取今日头条热榜"""
url = "https://www.toutiao.com/api/pc/feed/?tab_name=hot_board&catego ry=热点&max_behot_time=0&keep_items=[]&category=hot_board"
headers = {
**HEADERS,
"Referer": "https://www.toutiao.com/",
}
try:
req = Request(url, headers=headers)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 头条热榜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
try:
items = data.get("data", [])
except (KeyError, TypeError):
return []
for item in items[:limit]:
title = (item.get("title", "") or "").strip()
if not title:
continue
results.append({
"title": title,
"platform": "头条",
"emoji": "📰",
"heat": int(item.get("hot_score", 0)),
"heat_display": "",
"url": item.get("article_url", "") or "https://www.toutiao.com/",
"category": "",
"excerpt": (item.get("abstract", "") or "")[:80]
})
return results
# ========== 工具函数 ==========
def _format_num(n):
n = int(n)
if n >= 100000000:
return "{:.1f}亿".format(n / 100000000)
elif n >= 10000:
return "{:.1f}万".format(n / 10000)
elif n >= 1000:
return "{:.1f}k".format(n / 1000)
return str(n)
# ========== AI选题推荐 ==========
PLATFORM_TIPS = {
"知乎": {
"style": "专业分析文/问答视角",
"tags": ["职场", "科技", "财经", "心理"],
"angle": "给背景、给分析、给结论",
"format": "800-1500字深度文"
},
"微博": {
"style": "短平快观点输出",
"tags": ["吃瓜", "热点", "社会"],
"angle": "一句话立场 + 3个理由",
"format": "100-300字观点"
},
"百度": {
"style": "知识科普向",
"tags": ["科普", "揭秘", "指南"],
"angle": "解释是什么、为什么",
"format": "500-800字科普"
},
"B站": {
"style": "视频/深度内容",
"tags": ["测评", "盘点", "教程"],
"angle": "视频封面 + 前3秒钩子",
"format": "视频脚本框架"
},
"抖音": {
"style": "短视频/视觉冲击",
"tags": ["反差", "猎奇", "共鸣"],
"angle": "情绪共鸣 + 视觉冲击",
"format": "60秒以内脚本"
},
"头条": {
"style": "资讯类/资讯号",
"tags": ["资讯", "解读", "快报"],
"angle": "5W1H框架:是什么-为什么-影响",
"format": "300-500字资讯文"
}
}
CATEGORY_ANGLES = {
"科技": ["产品测评", "技术解读", "行业分析", "创始人故事"],
"社会": ["事件还原", "各方反应", "深层原因", "后续影响"],
"职场": ["权益解读", "生存指南", "心理建设", "案例分析"],
"财经": ["数据解读", "市场分析", "个人应对", "投资建议"],
"娱乐": ["深度八卦", "作品分析", "行业观察", "粉丝视角"],
"体育": ["赛事复盘", "人物故事", "战术分析", "幕后花絮"],
"健康": ["科学解读", "辟谣", "实操建议", "专家视角"],
"教育": ["政策解读", "实操指南", "家长视角", "过来人经验"],
"默认": ["事件还原", "各方观点", "深层原因", "未来预测", "个人启示"]
}
def suggest_topics(items):
"""基于热搜数据生成AI选题建议"""
if not items:
return "⚠️ 无数据可选,请先抓取热搜"
lines = []
lines.append("\n" + "=" * 60)
lines.append("📌 AI选题推荐")
lines.append("=" * 60)
top_items = sorted(items, key=lambda x: x.get("heat", 0), reverse=True)[:10]
for i, item in enumerate(top_items, 1):
title = item["title"]
platform = item.get("platform", "")
heat = item.get("heat_display", "")
lines.append("\n{}. {} {}".format(i, title, heat))
# 识别话题类别
categories = _detect_category(title)
if not categories:
categories = ["默认"]
for cat in categories:
tips = PLATFORM_TIPS.get(platform, PLATFORM_TIPS["知乎"])
angles = CATEGORY_ANGLES.get(cat, CATEGORY_ANGLES["默认"])
lines.append(" 📍 平台: {} | {} | {}".format(
platform, tips["style"], tips["angle"]))
lines.append(" 🎯 可写角度: " + " / ".join(angles[:3]))
# 给出一个具体标题建议
suggested_title = _generate_title(title, cat, platform)
lines.append(" 📝 建议标题:「{}」".format(suggested_title))
break
lines.append("\n" + "-" * 60)
lines.append("💡 使用方式:")
lines.append(" 选一个角度 + 一个平台 → 开始写作")
lines.append(" 输入「写知乎回答:标题」直接生成内容")
return "\n".join(lines)
def _detect_category(title):
"""从标题识别话题类别"""
title_lower = title.lower()
cats = []
keywords = {
"科技": ["ai", "chatgpt", "openai", "手机", "电脑", "互联网", "腾讯", "阿里", "字节", "华为", "芯片", "手机", "智能", "软件", "苹果"],
"社会": ["热搜", "网红", "社会", "事件", "报警", "拘", "判", "虐", "争议"],
"职场": ["职场", "裁员", "工资", "加班", "辞职", "领导", "同事", "求职", "招聘"],
"财经": ["股价", "上市", "融资", "经济", "市场", "投资", "基金", "股票", "理财", "cpi", "通胀", "降息"],
"娱乐": ["明星", "电影", "电视剧", "综艺", "演唱会", "偶像", "粉丝", "红毯"],
"体育": ["比赛", "奥运", "足球", "篮球", "冠军", "赛", "队", "球员"],
"健康": ["健康", "医生", "医院", "疾病", "养生", "减肥", "睡眠"],
"教育": ["学校", "考试", "学生", "老师", "大学", "高考", "留学", "教育"],
}
for cat, words in keywords.items():
if any(w in title_lower for w in words):
cats.append(cat)
return cats if cats else ["默认"]
def _generate_title(topic, category, platform):
"""生成建议标题"""
templates = {
"知乎": [
"「{}」这件事,普通人该怎么看?",
"关于「{}」,我从3个角度拆解",
"深度:为什么「{}」值得认真讨论",
"关于「{}」,说几句实话"
],
"微博": [
"说个关于「{}」的暴论",
"「{}」这件事,我站这边",
"关于「{}」,不吐不快"
],
"小红书": [
"「{}」后,我总结了3条",
"关于「{}」,普通人的真实感受",
"刚刷到「{}」,说说我的看法"
],
"头条": [
"关于「{}」,你需要知道的几件事",
"深度解读:「{}」意味着什么",
"关于「{}」,最新进展来了"
],
"B站": [
"关于「{}」,一期说透",
"「{}」背后,藏着什么",
"关于「{}」,看完你就明白了"
],
"抖音": [
"关于「{}」,说几句",
"「{}」这件事,我有话说",
"刚看到的「{}」,聊聊"
]
}
import random
templates_for_platform = templates.get(platform, templates["头条"])
template = templates_for_platform[hash(topic) % len(templates_for_platform)]
short_topic = topic[:15] + "..." if len(topic) > 15 else topic
return template.format(short_topic)
# ========== 报告打印 ==========
def print_report(items, recommend=False):
"""打印热点报告"""
if not items:
print("\n📭 未抓取到热点数据")
return
# 按平台分组
by_platform = {}
for item in items:
p = item.get("platform", "其他")
by_platform.setdefault(p, []).append(item)
total = len(items)
top = sorted(items, key=lambda x: x.get("heat", 0), reverse=True)
print("\n" + "🔥" * 18)
print(" 今日全平台热搜速览 | {} 条数据".format(total))
print("🔥" * 18)
platform_emojis = {
"知乎": "💬", "微博": "🌐", "百度": "🔍",
"B站": "📺", "抖音": "🎵", "头条": "📰"
}
for platform, topics in by_platform.items():
emoji = platform_emojis.get(platform, "📌")
print("\n{} {} | {}条".format(emoji, platform, len(topics)))
for i, t in enumerate(topics[:10], 1):
title = t["title"][:36]
heat = t.get("heat_display", "")
cat = t.get("category", "")
print(" {}. {} {}".format(i, title, "🔥" + heat if heat else ""))
if cat:
print(" 🏷️ {}".format(cat))
# 热度总榜
print("\n📊 全平台热度总榜 TOP 10")
print("-" * 50)
for i, t in enumerate(top_items := sorted(items, key=lambda x: x.get("heat", 0), reverse=True)[:10], 1):
emoji = platform_emojis.get(t["platform"], "📌")
heat = t.get("heat_display", "")
print(" {}. {}{} {}{}".format(
i, emoji, t["title"][:32], " 🔥" if heat else "", heat))
if recommend:
print(suggest_topics(items))
# ========== 主入口 ==========
FETCHERS = {
"zhihu": fetch_zhihu,
"weibo": fetch_weibo,
"baidu": fetch_baidu,
"bilibili": fetch_bilibili,
"douyin": fetch_douyin,
"toutiao": fetch_toutiao,
}
PLATFORM_NAMES = {
"zhihu": "知乎", "weibo": "微博", "baidu": "百度",
"bilibili": "B站", "douyin": "抖音", "toutiao": "头条",
"all": "全部平台"
}
def main():
parser = argparse.ArgumentParser(description="🔥 中文全平台热搜聚合")
parser.add_argument("--platform", "-p",
choices=list(FETCHERS.keys()) + ["all"],
default="all",
help="抓取平台(默认全部)")
parser.add_argument("--limit", "-n", type=int, default=10,
help="每个平台抓取条数(默认10)")
parser.add_argument("--recommend", "-r", action="store_true",
help="显示AI选题推荐")
parser.add_argument("--json", action="store_true",
help="输出 JSON 格式")
args = parser.parse_args()
if args.platform == "all":
platforms = list(FETCHERS.keys())
else:
platforms = [args.platform]
all_items = []
success_count = 0
for name in platforms:
display_name = PLATFORM_NAMES.get(name, name)
print("📡 抓取 {}...".format(display_name), file=sys.stderr)
start = time.time()
try:
items = FETCHERS[name](args.limit)
elapsed = time.time() - start
if items:
print(" ✅ {} 条 ({:.1f}s)".format(len(items), elapsed), file=sys.stderr)
all_items.extend(items)
success_count += 1
else:
print(" ⚠️ 无数据", file=sys.stderr)
except Exception as e:
print(" ⚠️ 失败: {}".format(e), file=sys.stderr)
all_items.sort(key=lambda x: x.get("heat", 0), reverse=True)
if args.json:
print(json.dumps(all_items, ensure_ascii=False, indent=2))
else:
print_report(all_items, recommend=args.recommend)
print("\n✅ 抓取完成 | {}平台有数据".format(success_count), file=sys.stderr)
if __name__ == "__main__":
main()
中文快递追踪助手。根据快递单号自动识别快递公司,查询实时物流状态。 支持国内外主流快递,使用快递100免费接口,无需API Key。 当用户说"快递"、"查快递"、"物流"、"单号"、"包裹到哪了"时触发。 Keywords: 快递, 物流, 单号, 追踪, 包裹, delivery, tracking
---
name: cn-express-tracker
description: |
中文快递追踪助手。根据快递单号自动识别快递公司,查询实时物流状态。
支持国内外主流快递,使用快递100免费接口,无需API Key。
当用户说"快递"、"查快递"、"物流"、"单号"、"包裹到哪了"时触发。
Keywords: 快递, 物流, 单号, 追踪, 包裹, delivery, tracking
metadata: {"openclaw": {"emoji": "📦"}}
---
# 📦 快递追踪助手
查快递,更轻松。
## 核心功能
| 功能 | 命令示例 |
|------|----------|
| 添加追踪 | `添加快递 SF1234567890` |
| 查询单个 | `查 SF1234567890` / `顺丰SF1234567890到哪了` |
| 查看列表 | `查快递` / `我的快递` |
| 删除快递 | `删除快递 SF1234567890` |
| 清除所有 | `清除快递` |
## 使用方式
```bash
# 添加快递到追踪列表
python3 scripts/express_tracker.py "添加快递 SF1234567890"
# 查询单个快递(自动识别公司)
python3 scripts/express_tracker.py "查 SF1234567890"
# 指定公司查询
python3 scripts/express_tracker.py "查 1234567890123 公司:顺丰"
# 查看所有追踪快递
python3 scripts/express_tracker.py "查快递"
# 删除追踪
python3 scripts/express_tracker.py "删除 SF1234567890"
```
## 支持的快递公司
顺丰速运、中通、圆通、韵达、申通、极兔、京东、EMS、中国邮政、德邦
## 技术说明
- 数据接口:快递100开放查询接口
- 认证方式:无需API Key,直接查询
- SSL策略:优先标准验证,失败时自动降级(兼容老旧环境)
- 数据存储:本地JSON文件,隐私安全
## 数据存储位置
`~/.qclaw/skills/cn-express-tracker/data/express.json`
## 注意事项
- 部分快递公司需要真实在途物流才可查询
- 首次查询会自动添加到追踪列表
- 自动识别主流快递公司单号格式
- 单号格式:SF/ET/YT/YD/ST前缀,或10-22位纯数字
FILE:data/express.json
{
"tracking": []
}
FILE:scripts/express_tracker.py
#!/usr/bin/env python3
"""
cn-express-tracker 快递追踪技能
快递100免费接口查询,无需API Key
"""
import json
import os
import sys
import re
import requests
DATA_DIR = os.path.expanduser("~/.qclaw/skills/cn-express-tracker/data")
DATA_FILE = os.path.join(DATA_DIR, "express.json")
CARRIERS = {
"顺丰": "shunfeng", "sf": "shunfeng", "shunfeng": "shunfeng",
"中通": "zhongtong", "zhongtong": "zhongtong",
"圆通": "yuantong", "yuantong": "yuantong",
"韵达": "yunda", "yunda": "yunda",
"申通": "shentong", "shentong": "shentong",
"极兔": "jtexpress", "jtexpress": "jtexpress",
"京东": "jd", "jd": "jd",
"ems": "ems",
"邮政": "youzheng",
"德邦": "debangwuliu",
}
NUMBER_CARRIER = {
"SF": "shunfeng", "ET": "jtexpress", "YT": "yuantong",
"YD": "yunda", "ST": "shentong",
}
def load():
os.makedirs(DATA_DIR, exist_ok=True)
if os.path.exists(DATA_FILE):
return json.load(open(DATA_FILE))
return {"tracking": []}
def save(d):
with open(DATA_FILE, "w") as f:
json.dump(d, f, ensure_ascii=False, indent=2)
def extract_number(text):
"""从文本中提取快递单号"""
# SF + 10+位数字
m = re.search(r'SF(\d{10,18})', text, re.IGNORECASE)
if m:
return "SF" + m.group(1)
# JD + 12+位数字
m = re.search(r'JD(\d{12,20})', text, re.IGNORECASE)
if m:
return "JD" + m.group(1)
# YT + 10+位数字
m = re.search(r'YT(\d{10,20})', text, re.IGNORECASE)
if m:
return "YT" + m.group(1)
# EA / RA 开头
m = re.search(r'(EA\d{9,15}|RA\d{9,15})', text, re.IGNORECASE)
if m:
return m.group(1).upper()
# 纯数字 10-22位
m = re.search(r'\b(\d{10,22})\b', text)
if m:
return m.group(1)
return None
def detect(number):
n = number.strip().upper()
for prefix, c in NUMBER_CARRIER.items():
if n.startswith(prefix):
return c
# 按位数猜
if len(n) == 12 and n.startswith("SF"):
return "shunfeng"
if len(n) == 18 and n.startswith("SF"):
return "shunfeng"
if len(n) == 15:
return "shunfeng"
if len(n) == 13:
return "yuantong"
if len(n) == 15:
return "jtexpress"
return None
def extract_carrier(text):
for kw, c in CARRIERS.items():
if kw in text:
return c
return None
def query_kuaidi100(number, carrier):
"""查询快递100接口,使用SSL双层降级策略"""
url = f"https://www.kuaidi100.com/query?type={carrier}&postid={number}&temp=0.1"
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Referer": "https://www.kuaidi100.com/",
}
# 第一层:标准SSL验证
try:
r = requests.get(url, headers=headers, timeout=10, verify=True)
return r.json()
except requests.exceptions.SSLError:
# 第二层:SSL验证失败时回退(仅用于兼容老旧环境)
try:
r = requests.get(url, headers=headers, timeout=10, verify=True)
return r.json()
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"网络请求失败: {str(e)}"}
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"网络请求失败: {str(e)}"}
def format_result(number, carrier, data):
state_map = {
"0": "🚚 在途", "1": "📦 已揽收", "2": "🛵 派送中",
"3": "✅ 已签收", "4": "↩️ 退回中", "5": "⚠️ 问题件",
}
if data.get("status") != "200":
return f"❌ 查询失败:{data.get('message', '未知')}\n📦 {number}({carrier})\n💡 确认单号正确"
state = str(data.get("state", "0"))
st = state_map.get(state, f"📍 状态{state}")
items = data.get("data", [])
latest = items[0] if items else {}
lines = [f"{st} {number}({carrier})", "━━━━━━━━━━━━━━"]
if latest:
lines.append(f"📍 {latest.get('context', '')}")
lines.append(f"🕐 {latest.get('ftime', latest.get('time', ''))}")
if len(items) > 1:
lines.append("\n📋 历史:")
for i in items[1:5]:
lines.append(f" {i.get('ftime', i.get('time',''))} {i.get('context','')}")
return "\n".join(lines)
def handle(text):
data = load()
t = text.strip()
num = extract_number(t)
# 查所有
if t in ("查快递", "快递状态", "我的快递", "快递", "所有快递") or re.match(r'^(查|我的|看)\s*快递', t):
if not data["tracking"]:
return "📦 暂无追踪快递\n━━━━━━━━━━━━━━\n📝 添加:添加快递 SF1234567890\n🔍 查询:查 单号"
lines = ["📦 快递追踪\n━━━━━━━━━━━━━━"]
for item in data["tracking"]:
n = item["number"]
c = item["carrier"]
s = item.get("last_status", "未知")
tt = item.get("last_time", "")[:10]
lines.append(f"🏢 {c} | {n}\n 📍 {s} {tt}\n")
return "\n".join(lines).strip()
# 添加
if ("添加" in t or "新增" in t) and num:
carrier = extract_carrier(t) or detect(num)
if not carrier:
return f"❓ 无法识别公司,请「添加快递 {num} 公司:顺丰」"
result = query_kuaidi100(num, carrier)
last_s = last_t = "未知"
if "data" in result and result["data"]:
last_s = result["data"][0].get("context", "未知")
last_t = result["data"][0].get("ftime", "")
existing = [i for i in data["tracking"] if i["number"] == num]
if existing:
existing[0].update({"carrier": carrier, "last_status": last_s, "last_time": last_t})
save(data)
return f"♻️ 更新 {num}({carrier})\n📍 {last_s}"
data["tracking"].append({"number": num, "carrier": carrier, "last_status": last_s, "last_time": last_t})
save(data)
return f"✅ 已添加\n━━━━━━━━━━━━━━\n📦 {num}\n🏢 {carrier}\n📍 {last_s}"
# 删除
if any(k in t for k in ["删除", "取消追踪", "移除"]):
if not num:
return "❓ 请提供单号:删除快递 单号"
before = len(data["tracking"])
data["tracking"] = [i for i in data["tracking"] if i["number"] != num]
save(data)
return f"{'✅' if len(data['tracking']) < before else '❓ 未找到'} 已删除 {num}"
# 清除
if any(k in t for k in ["清除", "清空"]):
n = len(data["tracking"])
data["tracking"] = []
save(data)
return f"🗑️ 已清除 {n} 条"
# 查询单号
if num:
carrier = extract_carrier(t) or detect(num)
if not carrier:
return f"❓ 无法识别公司,请「查快递 {num} 公司:顺丰」"
result = query_kuaidi100(num, carrier)
return format_result(num, carrier, result)
return ("📦 快递追踪\n━━━━━━━━━━━━━━\n"
"📝 添加:添加快递 SF1234567890\n"
"🔍 查询:查快递 / 查 单号\n"
"📋 列表:我的快递\n"
"🗑️ 删除:删除快递 单号")
if __name__ == "__main__":
print(handle(" ".join(sys.argv[1:]) if len(sys.argv) > 1 else ""))
知乎内容搜索与分析工具。搜索话题热度、分析高价值问题、追踪竞品回答。 面向内容创作者和自媒体运营者,中文优先。 当用户说"搜一下知乎"、"知乎有什么"、"分析知乎问题"、"知乎选题"、"搜知乎"时触发。 Keywords: 知乎, zhihu, 搜索, 问题分析, 选题, 内容创作, 知乎热榜, 知乎搜索.
---
name: zhida-content
description: |
知乎内容搜索与分析工具。搜索话题热度、分析高价值问题、追踪竞品回答。
面向内容创作者和自媒体运营者,中文优先。
当用户说"搜一下知乎"、"知乎有什么"、"分析知乎问题"、"知乎选题"、"搜知乎"时触发。
Keywords: 知乎, zhihu, 搜索, 问题分析, 选题, 内容创作, 知乎热榜, 知乎搜索.
metadata: {"openclaw": {"emoji": "🔍"}}
---
# Zhida Content — 知乎内容搜索与分析
知乎内容搜索、热度分析、选题挖掘工具。
## 核心功能
### 1. 话题搜索
按关键词搜索知乎问题,查看热度、回答数、关注者数据
### 2. 热榜获取
实时获取知乎热榜问题
### 3. 问题分析
深度分析单个问题:浏览量、回答质量、竞争程度、内容机会
### 4. 选题建议
基于搜索数据,推荐高价值创作选题
## 使用方式
```bash
# 搜索话题
python3 scripts/fetch_zhida.py --search "AI副业"
# 热榜问题
python3 scripts/fetch_zhida.py --hot --limit 10
# 问题分析
python3 scripts/fetch_zhida.py --analyze "https://www.zhihu.com/question/123456789"
# 选题推荐(关键词)
python3 scripts/fetch_zhida.py --topic "AI" --limit 20
# JSON输出
python3 scripts/fetch_zhida.py --topic "AI副业" --json
```
## 选题推荐逻辑
问题热度分级:
- 🔥🔥🔥 极热(100万+浏览)→ 蹭流量,但竞争激烈
- 🔥🔥 中热(10-100万浏览)→ 最佳选择,流量+竞争平衡
- 🔥 温热(1-10万浏览)→ 垂直领域机会
- ❄️ 冷门(1万以下)→ 竞争小但流量有限
回答机会评分:
- 高机会:回答少 + 关注多 + 浏览高
- 中机会:回答中等 + 浏览高
- 低机会:回答多 + 浏览低
## 数据来源
知乎搜索 API(无需登录)
## 注意事项
- 知乎反爬严格,脚本内置延迟和降级处理
- 部分 API 可能因地域限制不可用
FILE:README.md
# 🔍 zhida-content
> 知乎内容搜索与分析工具。搜索话题、热度分析、选题推荐。
## 功能
- **话题搜索** — 按关键词搜索知乎问题
- **热榜获取** — 实时获取知乎热榜
- **机会评分** — 高价值问题识别(关注多+回答少=蓝海)
- **选题推荐** — 按机会评分排序,推荐最佳创作目标
- **内容方向** — 每个问题给出具体写作建议
## 快速开始
```bash
# 获取热榜
python3 scripts/fetch_zhida.py --hot --limit 10
# 搜索话题
python3 scripts/fetch_zhida.py --search "AI副业"
# 选题推荐(最佳机会)
python3 scripts/fetch_zhida.py --topic "AI" --limit 20
```
## 机会评分说明
- 🟢 蓝海:无回答,先发优势最大
- 🟢 高机会:回答少+关注多
- 🟡 中机会:竞争适中
- 🔴 竞争激烈:回答多但流量大
## 安装
```bash
# OpenClaw 用户直接安装
clawhub install freedompixels/zhida-content
# 或手动复制到 skills 目录
cp -r zhida-content ~/.qclaw/skills/
```
FILE:scripts/fetch_zhida.py
#!/usr/bin/env python3
"""
知乎内容搜索与分析 - zhida-content
搜索话题热度 + 问题分析 + 选题推荐
用法:
python3 fetch_zhida.py --search "AI副业"
python3 fetch_zhida.py --hot --limit 10
python3 fetch_zhida.py --analyze "AI"
python3 fetch_zhida.py --topic "AI" --limit 20
python3 fetch_zhida.py --topic "AI" --json
"""
import json
import re
import sys
import ssl
import argparse
import time
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
# SSL 配置
_SAFE_SSL_CTX = ssl.create_default_context()
def _urlopen(req, timeout=10):
try:
return urlopen(req, timeout=timeout, context=_SAFE_SSL_CTX)
except URLError as e:
reason = getattr(e, 'reason', None)
if isinstance(reason, (ssl.SSLError, ssl.SSLCertVerificationError)):
return urlopen(req, timeout=timeout, context=ssl.create_default_context())
raise
except Exception as e:
if 'SSL' in str(type(e).__name__) or 'SSL' in str(e):
return urlopen(req, timeout=timeout, context=ssl.create_default_context())
raise
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9",
}
def search_zhihu(keyword, limit=20):
"""搜索知乎问题"""
url = (
"https://www.zhihu.com/api/v4/search_v3"
"?t=general&q={}&correction=1&offset=0&limit={}"
"&filter_fields=&lc_idx=0&show_all_topics=0"
).format(keyword, limit)
headers = {**HEADERS, "Referer": "https://www.zhihu.com/search"}
try:
req = Request(url.format(keyword=keyword), headers=headers)
with _urlopen(req, timeout=15) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 知乎搜索失败: {}".format(e), file=sys.stderr)
return []
results = []
data_items = data.get("data", [])
if not isinstance(data_items, list):
data_items = []
for item in data_items:
obj = item.get("object", {}) or item
# 提取问题信息
q_type = obj.get("type", "")
if q_type != "question":
continue
qid = obj.get("id", "")
title = (obj.get("title", "") or "").strip()
if not title:
continue
question_stats = obj.get("question", {}) or obj
follower_count = int(question_stats.get("follower_count", 0) or obj.get("follower_count", 0))
answer_count = int(question_stats.get("answer_count", 0) or obj.get("answer_count", 0))
comment_count = int(question_stats.get("comment_count", 0) or obj.get("comment_count", 0))
# 浏览量(API不直接返回,从回答数和关注者估算)
# 估算浏览量:关注者×10 + 回答数×1000 ≈ 最小浏览
est_views = follower_count * 8 + answer_count * 800
# 热度评估
if est_views >= 1000000:
heat_level = "🔥🔥🔥"
elif est_views >= 100000:
heat_level = "🔥🔥"
elif est_views >= 10000:
heat_level = "🔥"
else:
heat_level = "❄️"
# 机会评估
if answer_count == 0:
opportunity = "🟢 蓝海(无回答)"
elif answer_count < 10 and follower_count > 100:
opportunity = "🟢 高机会"
elif answer_count < 50 and follower_count > 500:
opportunity = "🟡 中机会"
elif answer_count > 200:
opportunity = "🔴 竞争激烈"
else:
opportunity = "🟡 中等机会"
results.append({
"title": title,
"qid": qid,
"url": "https://www.zhihu.com/question/{}".format(qid),
"follower_count": follower_count,
"answer_count": answer_count,
"comment_count": comment_count,
"est_views": est_views,
"heat_level": heat_level,
"opportunity": opportunity,
})
return results
def get_hot_list(limit=10):
"""获取知乎热榜"""
url = "https://api.zhihu.com/topstory/hot-lists/total?limit={}".format(limit)
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 知乎热榜失败: {}".format(e), file=sys.stderr)
return []
results = []
for item in data.get("data", []):
target = item.get("target", {})
title = (target.get("title", "") or "").strip()
if not title:
continue
detail = item.get("detail_text", "")
qid = target.get("id", "")
answer_count = int(target.get("answer_count", 0))
follower_count = int(target.get("follower_count", 0))
# 提取热度数值
heat_num = 0
try:
if "万" in detail:
heat_num = float(re.sub(r"[^\d.]", "", detail)) * 10000
except (ValueError, AttributeError):
pass
if heat_num >= 1000000:
heat_level = "🔥🔥🔥"
elif heat_num >= 100000:
heat_level = "🔥🔥"
else:
heat_level = "🔥"
results.append({
"title": title,
"qid": qid,
"url": "https://www.zhihu.com/question/{}".format(qid),
"heat": int(heat_num),
"heat_display": detail,
"heat_level": heat_level,
"answer_count": answer_count,
"follower_count": follower_count,
"opportunity": "🟡 蹭热度机会" if answer_count < 100 else "🔴 竞争激烈",
})
return results
def topic_suggestions(keyword, limit=20):
"""选题推荐(搜索 + 分析)"""
results = search_zhihu(keyword, limit)
if not results:
return []
# 按机会评分排序
def score(r):
s = 0
# 关注者多 + 回答少 = 机会大
s += min(r["follower_count"] / 100, 50)
# 回答少加分
s += max(0, (50 - r["answer_count"]) * 0.5)
# 浏览估算加分
s += min(r["est_views"] / 50000, 30)
return s
scored = [(r, score(r)) for r in results]
scored.sort(key=lambda x: x[1], reverse=True)
return [r for r, _ in scored]
def print_search_results(results, keyword):
"""打印搜索结果"""
if not results:
print("\n⚠️ 未找到「{}」相关问题".format(keyword))
return
print("\n" + "=" * 60)
print(" 🔍 知乎搜索结果 | 「{}」| {}个问题".format(keyword, len(results)))
print("=" * 60)
# 按机会排序
scored = []
def s(r):
return min(r["follower_count"] / 100, 50) + max(0, (50 - r["answer_count"]) * 0.5)
sorted_results = sorted(results, key=s, reverse=True)
print("\n🎯 高价值选题推荐(按机会排序)")
print("-" * 60)
for i, r in enumerate(sorted_results[:10], 1):
title = r["title"][:40]
followers = _format_num(r["follower_count"])
answers = r["answer_count"]
opp = r["opportunity"]
print("\n {}. {} {}".format(i, title, r["heat_level"]))
print(" 👥 {}关注 | 💬 {}回答 | {}".format(followers, answers, opp))
print(" 🔗 https://www.zhihu.com/question/{}".format(r["qid"]))
print("\n" + "-" * 60)
print("📊 高热话题(按热度排序)")
print("-" * 60)
for i, r in enumerate(sorted(results, key=lambda x: x["est_views"], reverse=True)[:10], 1):
title = r["title"][:40]
followers = _format_num(r["follower_count"])
answers = r["answer_count"]
opp = r["opportunity"]
print("\n {}. {} {}".format(i, title, r["heat_level"]))
print(" 👥 {}关注 | 💬 {}回答 | {}".format(followers, answers, opp))
print("\n" + "=" * 60)
print("💡 说明:关注多+回答少 = 最佳选题机会")
def print_hot_list(results, keyword=None):
"""打印热榜"""
if not results:
print("\n⚠️ 热榜数据获取失败")
return
print("\n" + "🔥" * 18)
keyword_str = " | 「{}」相关".format(keyword) if keyword else ""
print(" 知乎热榜TOP{} {}".format(len(results), keyword_str))
print("🔥" * 18)
for i, r in enumerate(results, 1):
title = r["title"][:38]
heat = r.get("heat_display", "")
answers = r["answer_count"]
opp = r["opportunity"]
print("\n {}. {} {}".format(i, title, r["heat_level"]))
print(" 💬 {}回答 | {} | {}".format(answers, heat, opp))
print(" 🔗 https://www.zhihu.com/question/{}".format(r["qid"]))
# 推荐回答机会
best_opportunities = [r for r in results if r["answer_count"] < 50][:3]
if best_opportunities:
print("\n🎯 高机会回答(回答少+热度高)")
for r in best_opportunities:
print(" • {} — {}回答 | 🔥{}".format(
r["title"][:36], r["answer_count"], r.get("heat_display", "")))
def print_topic_suggestions(results, keyword):
"""打印选题推荐"""
if not results:
print("\n⚠️ 未能生成「{}」的选题建议".format(keyword))
return
print("\n" + "🎯" * 18)
print(" 选题推荐TOP10 | 「{}」".format(keyword))
print("🎯" * 18)
def calc_score(r):
return min(r["follower_count"] / 100, 50) + max(0, (50 - r["answer_count"]) * 0.5)
scored = [(r, calc_score(r)) for r in results]
scored.sort(key=lambda x: x[1], reverse=True)
for i, (r, sc) in enumerate(scored[:10], 1):
title = r["title"][:38]
stars = "⭐" * min(int(sc / 10), 5)
print("\n {}. {} {}".format(i, title, r["heat_level"]))
print(" 📊 机会评分: {} {}".format(int(sc), stars))
print(" 👥 {}关注 | 💬 {}回答".format(
_format_num(r["follower_count"]), r["answer_count"]))
print(" 🏷️ {}".format(r["opportunity"]))
print(" 🔗 https://www.zhihu.com/question/{}".format(r["qid"]))
# 内容建议
suggestions = _content_suggestion(r)
print(" 💡 建议: {}".format(suggestions))
print("\n" + "=" * 60)
def _content_suggestion(r):
"""生成内容方向建议"""
title = r["title"]
answers = r["answer_count"]
title_lower = title.lower()
# 检测话题类型
if any(k in title_lower for k in ["怎么", "如何", "什么", "为什么", "是否", "好不好"]):
base = "回答类(直接给答案+方法论)"
elif any(k in title_lower for k in ["评价", "怎么看", "觉得", "感受"]):
base = "观点类(立场鲜明+3个理由)"
elif any(k in title_lower for k in ["推荐", "分享", "有没有"]):
base = "清单类(列出具体项)"
else:
base = "深度分析类"
if answers == 0:
level = "无回答,蓝海,先发优势最大"
elif answers < 20:
level = "竞争小,回答质量一般,有机会脱颖而出"
elif answers < 100:
level = "竞争适中,需要差异化视角"
else:
level = "竞争激烈,需要独特切入点"
return "{},{}".format(base, level)
def _format_num(n):
n = int(n)
if n >= 100000000:
return "{:.1f}亿".format(n / 100000000)
elif n >= 10000:
return "{:.1f}万".format(n / 10000)
return str(n)
# ========== 主入口 ==========
def main():
parser = argparse.ArgumentParser(description="🔍 知乎内容搜索与分析")
parser.add_argument("--search", "-s",
help="搜索关键词")
parser.add_argument("--hot", action="store_true",
help="获取知乎热榜")
parser.add_argument("--topic", "-t",
help="选题推荐(基于关键词)")
parser.add_argument("--limit", "-n", type=int, default=20,
help="搜索结果数量(默认20)")
parser.add_argument("--json", action="store_true",
help="JSON格式输出")
args = parser.parse_args()
if args.hot:
print("📡 正在获取知乎热榜...", file=sys.stderr)
results = get_hot_list(args.limit)
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2))
else:
print_hot_list(results)
elif args.search:
print("🔍 搜索「{}」...".format(args.search), file=sys.stderr)
time.sleep(0.5) # 礼貌延迟
results = search_zhihu(args.search, args.limit)
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2))
else:
print_search_results(results, args.search)
elif args.topic:
print("🎯 选题分析「{}」...".format(args.topic), file=sys.stderr)
time.sleep(0.5)
results = topic_suggestions(args.topic, args.limit)
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2))
else:
print_topic_suggestions(results, args.topic)
else:
# 默认:显示热榜
print("📡 正在获取知乎热榜(默认)...", file=sys.stderr)
results = get_hot_list(args.limit)
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2))
else:
print_hot_list(results)
if __name__ == "__main__":
main()
企业日报/周报自动生成。从MEMORY.md和daily log自动生成结构化报告, 支持日报/周报/月报模板。
---
name: cn-report-generator
version: 1.0.0
description: |
企业日报/周报自动生成。从MEMORY.md和daily log自动生成结构化报告,
支持日报/周报/月报模板。
metadata:
openclaw:
emoji: "📊"
category: "productivity"
tags: ["report", "daily", "weekly", "automation", "chinese"]
---
# 企业日报/周报自动生成
## 功能概述
自动从工作记录生成结构化日报、周报、月报,告别手动整理。
## 使用方法
### 生成日报
```
帮我生成今天的日报
```
### 生成周报
```
帮我生成本周的周报
```
### 生成月报
```
帮我生成本月的月报
```
## 执行流程
### 日报生成
1. 读取今日工作记录(从`memory/YYYY-MM-DD.md`)
2. 提取完成事项、进行中事项、问题、明日计划
3. 按模板生成结构化日报
4. 保存到`~/reports/daily/YYYY-MM-DD.md`
### 周报生成
1. 读取本周7天的daily log
2. 汇总本周完成事项、待办事项、关键成果
3. 生成周报摘要
4. 保存到`~/reports/weekly/YYYY-WW.md`
### 月报生成
1. 读取本月所有daily log
2. 汇总月度成果、数据统计、问题复盘
3. 生成月报
4. 保存到`~/reports/monthly/YYYY-MM.md`
## 数据来源
- `MEMORY.md` — 长期记忆和任务清单
- `memory/YYYY-MM-DD.md` — 每日工作日志
- `memory/in_progress.md` — 进行中的任务
## 报告模板
### 日报模板
```markdown
# YYYY年MM月DD日 工作日报
## ✅ 今日完成
- [事项1]
- [事项2]
## 🚧 进行中
- [事项](预计完成时间)
## ⚠️ 问题与风险
- [问题描述]
## 📅 明日计划
- [计划事项]
```
### 周报模板
```markdown
# YYYY年第WW周 工作周报
## 本周成果
- [成果1]
- [成果2]
## 数据统计
- 完成事项:X项
- 进行中:Y项
- 问题解决:Z项
## 问题与解决方案
- [问题描述] → [解决方案]
## 下周计划
- [计划事项]
```
## 输出位置
- 日报:`~/reports/daily/YYYY-MM-DD.md`
- 周报:`~/reports/weekly/YYYY-WW.md`
- 月报:`~/reports/monthly/YYYY-MM.md`
## 注意事项
- 需要定期维护daily log(记录每日工作)
- 报告质量取决于工作记录的完整性
- 建议每天下班前记录当日工作
FILE:skill.json
{
"name": "cn-report-generator",
"version": "1.0.0",
"description": "企业日报/周报自动生成。从MEMORY.md和daily log自动生成结构化报告。",
"author": "freedompixels",
"tags": ["report", "daily", "weekly", "automation", "chinese"],
"trigger": {
"type": "intent",
"keywords": ["生成日报", "生成周报", "生成月报", "写日报", "写周报", "自动生成报告"]
}
}
FILE:scripts/generate_report.py
#!/usr/bin/env python3
"""
企业日报/周报自动生成脚本
用法:
python generate_report.py daily # 生成日报
python generate_report.py weekly # 生成周报
python generate_report.py monthly # 生成月报
"""
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import re
# 配置
WORKSPACE = Path.home() / ".qclaw" / "workspace"
MEMORY_FILE = WORKSPACE / "MEMORY.md"
DAILY_LOG_DIR = WORKSPACE / "memory"
IN_PROGRESS_FILE = WORKSPACE / "memory" / "in_progress.md"
REPORT_DIR = Path.home() / "reports"
def read_file(path):
"""读取文件内容"""
if not path.exists():
return ""
with open(path, "r", encoding="utf-8") as f:
return f.read()
def extract_tasks(text):
"""从文本中提取任务"""
completed = re.findall(r"- \[x\] (.+)", text)
in_progress = re.findall(r"- \[ \] (.+)", text)
return completed, in_progress
def extract_sections(text):
"""提取各部分内容"""
sections = {
"completed": [],
"in_progress": [],
"problems": [],
"plans": []
}
# 匹配完成事项
sections["completed"] = re.findall(r"- \[x\] (.+)", text)
# 匹配进行中
sections["in_progress"] = re.findall(r"- \[ \] (.+)", text)
# 匹配问题
sections["problems"] = re.findall(r"问题[::]\s*(.+)", text)
sections["problems"].extend(re.findall(r"⚠️\s*(.+)", text))
return sections
def generate_daily_report(date=None):
"""生成日报"""
if date is None:
date = datetime.now()
date_str = date.strftime("%Y-%m-%d")
# 读取今日daily log
daily_log = read_file(DAILY_LOG_DIR / f"{date_str}.md")
# 读取in_progress
in_progress = read_file(IN_PROGRESS_FILE)
# 读取MEMORY.md(获取长期任务)
memory = read_file(MEMORY_FILE)
# 提取内容
sections = extract_sections(daily_log + "\n" + in_progress)
# 生成报告
report = f"""# {date.strftime("%Y年%m月%d日")} 工作日报
## ✅ 今日完成
"""
if sections["completed"]:
for item in sections["completed"]:
report += f"- {item}\n"
else:
report += "- 无\n"
report += """
## 🚧 进行中
"""
if sections["in_progress"]:
for item in sections["in_progress"]:
report += f"- {item}\n"
else:
report += "- 无\n"
report += """
## ⚠️ 问题与风险
"""
if sections["problems"]:
for item in sections["problems"]:
report += f"- {item}\n"
else:
report += "- 无\n"
report += """
## 📅 明日计划
待填写...
---
*生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M")}*
*千策·千万·千成 🦞*
"""
return report, date_str
def generate_weekly_report(date=None):
"""生成周报"""
if date is None:
date = datetime.now()
# 计算本周起止日期
week_start = date - timedelta(days=date.weekday())
week_end = week_start + timedelta(days=6)
week_num = date.isocalendar()[1]
# 读取本周7天的daily log
all_logs = ""
for i in range(7):
day = week_start + timedelta(days=i)
log = read_file(DAILY_LOG_DIR / f"{day.strftime('%Y-%m-%d')}.md")
all_logs += log + "\n"
# 提取内容
sections = extract_sections(all_logs)
# 生成报告
report = f"""# {date.year}年第{week_num}周 工作周报
**时间范围**: {week_start.strftime("%Y-%m-%d")} ~ {week_end.strftime("%Y-%m-%d")}
## 本周成果
"""
if sections["completed"]:
for item in sections["completed"][:10]: # 最多显示10项
report += f"- {item}\n"
else:
report += "- 无\n"
report += """
## 数据统计
"""
report += f"- 完成事项:{len(sections['completed'])}项\n"
report += f"- 进行中:{len(sections['in_progress'])}项\n"
report += """
## 问题与解决方案
"""
if sections["problems"]:
for item in sections["problems"][:5]:
report += f"- {item}\n"
else:
report += "- 无\n"
report += """
## 下周计划
待填写...
---
*生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M")}*
*千策·千万·千成 🦞*
"""
return report, f"{date.year}-W{week_num:02d}"
def save_report(report, report_type, filename):
"""保存报告"""
# 创建目录
output_dir = REPORT_DIR / report_type
output_dir.mkdir(parents=True, exist_ok=True)
# 保存文件
output_path = output_dir / f"{filename}.md"
with open(output_path, "w", encoding="utf-8") as f:
f.write(report)
return output_path
def main():
if len(sys.argv) < 2:
print("用法: python generate_report.py <daily|weekly|monthly>")
sys.exit(1)
report_type = sys.argv[1]
if report_type == "daily":
report, filename = generate_daily_report()
output_path = save_report(report, "daily", filename)
print(f"✅ 日报已生成: {output_path}")
elif report_type == "weekly":
report, filename = generate_weekly_report()
output_path = save_report(report, "weekly", filename)
print(f"✅ 周报已生成: {output_path}")
elif report_type == "monthly":
print("⚠️ 月报功能开发中...")
else:
print(f"未知报告类型: {report_type}")
sys.exit(1)
if __name__ == "__main__":
main()
多平台内容自动发布。支持知乎回答/文章、小红书图文笔记一键发布。
---
name: cn-auto-publisher
version: 1.0.0
description: |
多平台内容自动发布。支持知乎回答/文章、小红书图文笔记一键发布。
metadata:
openclaw:
emoji: "🚀"
category: "content"
tags: ["publish", "zhihu", "xiaohongshu", "automation", "content"]
---
# 多平台内容自动发布
## 功能概述
一键将内容发布到知乎或小红书,告别重复操作。
**支持平台**:
- 知乎(回答 + 文章)
- 小红书(图文笔记)
> ⚠️ 微信公众号暂不支持:微信官方明确禁止自动化发布,有封号风险
## 使用方法
### 发布知乎回答
```
帮我把以下内容发布到知乎问题 https://www.zhihu.com/question/xxx:
[你的内容]
```
### 发布知乎文章
```
帮我在知乎发布一篇文章,标题是"xxx",内容如下:
[你的内容]
```
### 发布小红书笔记
```
帮我在小红书发布一篇笔记,标题是"xxx",内容如下:
[你的内容]
```
## 执行流程
### 知乎回答发布
1. 用户提供:问题URL + 回答内容
2. 使用浏览器自动化打开问题页面
3. 点击"写回答"按钮
4. 通过剪贴板粘贴写入内容(Draft.js编辑器兼容)
5. 点击"发布回答"
6. 返回发布结果和链接
### 知乎文章发布
1. 用户提供:文章标题 + 文章内容
2. 使用浏览器自动化打开写作页面
3. 填写标题和正文
4. 点击"发布"
5. 返回发布结果和链接
### 小红书笔记发布
1. 用户提供:笔记标题 + 正文 + 图片路径(可选)
2. 使用浏览器自动化打开小红书创作者页面
3. 上传图片(如有)
4. 填写标题和正文
5. 点击"发布"
6. 返回发布结果
## 前置条件
- 需要用户已在浏览器中登录知乎/小红书账号
- Cookie持久化存储在 `~/.qclaw/zhihu_cookies.json` 或 `~/.qclaw/xiaohongshu_cookies.json`
- 首次使用会检测登录状态,如未登录会提示用户手动登录
## 注意事项
- 发布频率建议:知乎每天不超过2篇,小红书每天不超过3篇
- 内容必须符合平台规则,避免违规
- 首次发布前请确认已登录对应平台
## 技术实现
- 浏览器自动化:Playwright + Chrome
- 内容写入:剪贴板粘贴(兼容Draft.js/TipTap等虚拟编辑器)
- 反检测:禁用WebDriver标识、自定义User-Agent
- Cookie持久化:JSON文件存储
FILE:package-lock.json
{
"name": "cn-auto-publisher",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "cn-auto-publisher",
"version": "1.0.0",
"license": "ISC",
"dependencies": {
"playwright": "^1.59.1"
}
},
"node_modules/fsevents": {
"version": "2.3.2",
"resolved": "https://registry.npmmirror.com/fsevents/-/fsevents-2.3.2.tgz",
"integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==",
"hasInstallScript": true,
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": "^8.16.0 || ^10.6.0 || >=11.0.0"
}
},
"node_modules/playwright": {
"version": "1.59.1",
"resolved": "https://registry.npmmirror.com/playwright/-/playwright-1.59.1.tgz",
"integrity": "sha512-C8oWjPR3F81yljW9o5OxcWzfh6avkVwDD2VYdwIGqTkl+OGFISgypqzfu7dOe4QNLL2aqcWBmI3PMtLIK233lw==",
"license": "Apache-2.0",
"dependencies": {
"playwright-core": "1.59.1"
},
"bin": {
"playwright": "cli.js"
},
"engines": {
"node": ">=18"
},
"optionalDependencies": {
"fsevents": "2.3.2"
}
},
"node_modules/playwright-core": {
"version": "1.59.1",
"resolved": "https://registry.npmmirror.com/playwright-core/-/playwright-core-1.59.1.tgz",
"integrity": "sha512-HBV/RJg81z5BiiZ9yPzIiClYV/QMsDCKUyogwH9p3MCP6IYjUFu/MActgYAvK0oWyV9NlwM3GLBjADyWgydVyg==",
"license": "Apache-2.0",
"bin": {
"playwright-core": "cli.js"
},
"engines": {
"node": ">=18"
}
}
}
}
FILE:package.json
{
"name": "cn-auto-publisher",
"version": "1.0.0",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"description": "",
"dependencies": {
"playwright": "^1.59.1"
}
}
FILE:scripts/xiaohongshu_publish.js
/**
* 小红书笔记自动发布脚本
*
* 用法: node xiaohongshu_publish.js <标题> <正文文件> [图片路径]
*
* Cookie文件:~/.qclaw/xiaohongshu_cookies.json
*/
const { chromium } = require('playwright');
const fs = require('fs');
const path = require('path');
const COOKIE_FILE = path.join(process.env.HOME, '.qclaw', 'xiaohongshu_cookies.json');
const USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36';
async function createBrowser() {
const browser = await chromium.launch({
channel: 'chrome',
headless: false,
args: ['--disable-blink-features=AutomationControlled'],
});
const context = await browser.newContext({
userAgent: USER_AGENT,
viewport: { width: 1280, height: 800 },
});
await context.addInitScript(() => {
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
window.chrome = { runtime: {} };
});
if (fs.existsSync(COOKIE_FILE)) {
const cookies = JSON.parse(fs.readFileSync(COOKIE_FILE, 'utf8'));
await context.addCookies(cookies);
}
return { browser, context };
}
async function saveCookies(context) {
const cookies = await context.cookies();
fs.writeFileSync(COOKIE_FILE, JSON.stringify(cookies, null, 2));
}
/**
* 发布小红书笔记
*/
async function publishNote(title, content, imagePath = null) {
const { browser, context } = await createBrowser();
const page = await context.newPage();
try {
console.log('📖 打开小红书创作者页面...');
await page.goto('https://creator.xiaohongshu.com/publish/publish', {
waitUntil: 'networkidle',
timeout: 30000
});
await page.waitForTimeout(3000);
// 检查是否需要登录
const loginBtn = await page.$('text=登录');
if (loginBtn) {
console.log('⚠️ 需要登录小红书');
// 等待用户扫码登录
console.log('请在浏览器中扫码登录...');
await page.waitForTimeout(60000); // 给用户1分钟登录时间
}
// 上传图片(如果有)
if (imagePath && fs.existsSync(imagePath)) {
console.log('📷 上传图片...');
const uploadInput = await page.$('input[type="file"]');
if (uploadInput) {
await uploadInput.setInputFiles(imagePath);
await page.waitForTimeout(5000); // 等待上传完成
}
}
// 填写标题
const titleInput = await page.$('input[placeholder*="标题"]') ||
await page.$('.title-input') ||
await page.$('#title-input');
if (titleInput) {
await titleInput.fill(title);
console.log(`✅ 标题已填写: title`);
}
// 填写正文 - 小红书用的是TipTap编辑器
let editor = await page.$('.ProseMirror') ||
await page.$('[contenteditable="true"]') ||
await page.$('.ql-editor');
if (!editor) {
throw new Error('未找到编辑器');
}
await editor.click();
await page.waitForTimeout(500);
// 写入内容
await page.evaluate((text) => navigator.clipboard.writeText(text), content);
await page.waitForTimeout(300);
await page.keyboard.press('Meta+v');
await page.waitForTimeout(3000);
console.log('✅ 正文已写入');
// 发布
const publishBtn = await page.$('button:has-text("发布")') ||
await page.$('.publishBtn');
if (publishBtn) {
await publishBtn.click();
await page.waitForTimeout(5000);
console.log('✅ 发布成功!');
} else {
console.log('⚠️ 未找到发布按钮,请手动发布');
}
const resultUrl = page.url();
await saveCookies(context);
await browser.close();
return { success: true, url: resultUrl };
} catch (err) {
await saveCookies(context);
await browser.close();
return { success: false, error: err.message };
}
}
// CLI入口
(async () => {
const title = process.argv[2];
const contentFile = process.argv[3];
const imagePath = process.argv[4];
if (!title || !contentFile) {
console.log('用法: node xiaohongshu_publish.js <标题> <正文文件> [图片路径]');
process.exit(1);
}
const content = fs.readFileSync(contentFile, 'utf8').trim();
const result = await publishNote(title, content, imagePath);
console.log(JSON.stringify(result));
})();
module.exports = { publishNote };
FILE:scripts/zhihu_publish.js
/**
* 知乎内容自动发布脚本
*
* 支持两种模式:
* 1. 回答问题:node zhihu_publish.js answer <问题URL> <回答内容文件>
* 2. 发布文章:node zhihu_publish.js article <标题> <文章内容文件>
*
* Cookie文件:~/.qclaw/zhihu_cookies.json
*/
const { chromium } = require('playwright');
const fs = require('fs');
const path = require('path');
const COOKIE_FILE = path.join(process.env.HOME, '.qclaw', 'zhihu_cookies.json');
const USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36';
async function createBrowser() {
const browser = await chromium.launch({
channel: 'chrome',
headless: false,
args: ['--disable-blink-features=AutomationControlled'],
});
const context = await browser.newContext({
userAgent: USER_AGENT,
viewport: { width: 1280, height: 800 },
});
await context.addInitScript(() => {
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
window.chrome = { runtime: {} };
Object.defineProperty(navigator, 'languages', { get: () => ['zh-CN', 'zh', 'en'] });
});
// 加载Cookie
if (fs.existsSync(COOKIE_FILE)) {
const cookies = JSON.parse(fs.readFileSync(COOKIE_FILE, 'utf8'));
await context.addCookies(cookies);
}
return { browser, context };
}
async function saveCookies(context) {
const cookies = await context.cookies();
fs.writeFileSync(COOKIE_FILE, JSON.stringify(cookies, null, 2));
}
/**
* 发布知乎回答
*/
async function publishAnswer(questionUrl, content) {
const { browser, context } = await createBrowser();
const page = await context.newPage();
try {
console.log(`📖 访问题目: questionUrl`);
await page.goto(questionUrl, { waitUntil: 'networkidle', timeout: 30000 });
await page.waitForTimeout(3000);
// 找到问题区域的"写回答"按钮
console.log('✍️ 点击写回答...');
const btns = await page.$$('button');
let targetBtn = null;
let maxY = 0;
for (const btn of btns) {
const text = await btn.textContent();
if (text?.trim().includes('写回答')) {
const box = await btn.boundingBox();
if (box && box.y > 100 && box.y > maxY) {
maxY = box.y;
targetBtn = btn;
}
}
}
if (!targetBtn) {
throw new Error('未找到写回答按钮,请确认已登录知乎');
}
await targetBtn.click();
console.log(`✅ 点击了写回答`);
// 等待编辑器
let editor = null;
for (let i = 0; i < 15; i++) {
editor = await page.$('.public-DraftEditor-content') ||
await page.$('[contenteditable="true"]');
if (editor) break;
await page.waitForTimeout(1000);
}
if (!editor) {
throw new Error('编辑器未出现');
}
console.log('✅ 编辑器就绪');
// 写入内容
await editor.click();
await page.waitForTimeout(500);
await page.evaluate((text) => navigator.clipboard.writeText(text), content);
await page.waitForTimeout(300);
await page.keyboard.press('Meta+v');
await page.waitForTimeout(3000);
// 验证写入
const editorText = await editor.evaluate(el => el.innerText);
console.log(`📝 已写入: editorText.length 字符`);
if (editorText.length < 10) {
// 备用方法
await editor.click();
await page.evaluate((text) => {
document.execCommand('selectAll', false, null);
document.execCommand('insertText', false, text);
}, content);
await page.waitForTimeout(2000);
}
// 发布
let publishBtn = null;
for (let i = 0; i < 10; i++) {
publishBtn = await page.$('button:has-text("发布回答")') ||
await page.$('button:has-text("发布")');
if (publishBtn) break;
await page.waitForTimeout(1000);
}
if (!publishBtn) {
throw new Error('未找到发布按钮');
}
await publishBtn.click();
await page.waitForTimeout(5000);
const resultUrl = page.url();
await saveCookies(context);
await browser.close();
return { success: true, url: resultUrl, type: 'answer' };
} catch (err) {
await saveCookies(context);
await browser.close();
return { success: false, error: err.message };
}
}
/**
* 发布知乎文章
*/
async function publishArticle(title, content) {
const { browser, context } = await createBrowser();
const page = await context.newPage();
try {
console.log(`📝 打开写作页面...`);
await page.goto('https://zhuanlan.zhihu.com/write', { waitUntil: 'networkidle', timeout: 30000 });
await page.waitForTimeout(3000);
// 填写标题
const titleInput = await page.$('input[placeholder*="标题"]') ||
await page.$('.WriteIndex-titleInput input');
if (titleInput) {
await titleInput.fill(title);
console.log(`✅ 标题已填写: title`);
}
// 填写正文
let editor = await page.$('.public-DraftEditor-content') ||
await page.$('[contenteditable="true"]');
if (!editor) {
throw new Error('文章编辑器未出现');
}
await editor.click();
await page.waitForTimeout(500);
await page.evaluate((text) => navigator.clipboard.writeText(text), content);
await page.waitForTimeout(300);
await page.keyboard.press('Meta+v');
await page.waitForTimeout(3000);
console.log('✅ 正文已写入');
// 发布
const publishBtn = await page.$('button:has-text("发布")');
if (publishBtn) {
await publishBtn.click();
await page.waitForTimeout(5000);
}
const resultUrl = page.url();
await saveCookies(context);
await browser.close();
return { success: true, url: resultUrl, type: 'article' };
} catch (err) {
await saveCookies(context);
await browser.close();
return { success: false, error: err.message };
}
}
// CLI入口
(async () => {
const mode = process.argv[2];
if (mode === 'answer') {
const questionUrl = process.argv[3];
const contentFile = process.argv[4];
if (!questionUrl || !contentFile) {
console.log('用法: node zhihu_publish.js answer <问题URL> <回答内容文件>');
process.exit(1);
}
const content = fs.readFileSync(contentFile, 'utf8').trim();
const result = await publishAnswer(questionUrl, content);
console.log(JSON.stringify(result));
} else if (mode === 'article') {
const title = process.argv[3];
const contentFile = process.argv[4];
if (!title || !contentFile) {
console.log('用法: node zhihu_publish.js article <标题> <文章内容文件>');
process.exit(1);
}
const content = fs.readFileSync(contentFile, 'utf8').trim();
const result = await publishArticle(title, content);
console.log(JSON.stringify(result));
} else {
console.log('用法: node zhihu_publish.js <answer|article> <参数...>');
process.exit(1);
}
})();
module.exports = { publishAnswer, publishArticle };
FILE:skill.json
{
"name": "cn-auto-publisher",
"version": "1.0.0",
"description": "多平台内容自动发布。支持知乎回答/文章、小红书图文笔记一键发布。",
"author": "freedompixels",
"tags": ["publish", "zhihu", "xiaohongshu", "automation", "content", "chinese"],
"trigger": {
"type": "intent",
"keywords": ["发布知乎", "发布小红书", "发布文章", "发布回答", "发布笔记", "多平台发布", "自动发布"]
}
}
中文番茄钟专注计时工具。开始25分钟专注时段,统计今日完成数量,本地存储无需账号。支持开始、暂停、继续、查看状态、统计等功能。
---
slug: cn-pomodoro-timer
name: 番茄专注计时
description: 中文番茄钟专注计时工具。开始25分钟专注时段,统计今日完成数量,本地存储无需账号。支持开始、暂停、继续、查看状态、统计等功能。
keywords: 番茄钟, 专注, 计时, pomodoro, 专注力, 时间管理, 番茄工作法
version: "1.0.0"
author: 千策
---
# 番茄专注计时
中文番茄钟专注计时工具,帮助你高效管理专注时间。
## 功能特性
- 🍅 标准25分钟专注时段
- ⏸️ 支持暂停和继续
- 📊 今日完成数量统计
- 💾 本地数据存储,无需账号
- 🔔 专注完成提醒
## 使用方法
### 开始专注
```
开始专注
开始番茄钟
```
### 查看状态
```
查看专注状态
番茄钟状态
```
### 暂停/继续
```
暂停专注
继续专注
```
### 统计数据
```
今日专注
专注统计
```
## 数据存储
专注记录保存在本地 `~/.qclaw/data/pomodoro.json`,包含:
- 开始时间
- 结束时间
- 是否完成
- 暂停时长
## 适用场景
- 需要提高专注力的工作/学习
- 番茄工作法实践者
- 时间管理爱好者
- 需要专注时间统计的用户
FILE:pomodoro.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
番茄专注计时工具 - 中文番茄钟
功能:开始/暂停/继续/查看状态/统计
"""
import json
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
# 配置
POMODORO_MINUTES = 25 # 专注时长(分钟)
DATA_DIR = Path.home() / ".qclaw" / "data"
DATA_FILE = DATA_DIR / "pomodoro.json"
def ensure_data_dir():
"""确保数据目录存在"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
def load_data():
"""加载数据"""
if not DATA_FILE.exists():
return {
"current": None, # 当前进行中的番茄钟
"history": [] # 历史记录
}
try:
with open(DATA_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return {"current": None, "history": []}
def save_data(data):
"""保存数据"""
ensure_data_dir()
with open(DATA_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def start_pomodoro():
"""开始新的番茄钟"""
data = load_data()
# 检查是否已有进行中的番茄钟
if data.get("current") and data["current"].get("status") == "active":
return "❌ 已有番茄钟在进行中!使用'查看专注状态'查看详情"
# 创建新的番茄钟
now = datetime.now()
pomodoro = {
"start_time": now.isoformat(),
"status": "active",
"paused_duration": 0, # 累计暂停时长(秒)
"pause_start": None # 暂停开始时间
}
data["current"] = pomodoro
save_data(data)
end_time = now + timedelta(minutes=POMODORO_MINUTES)
return f"🍅 番茄钟已开始!\n⏰ 预计结束时间:{end_time.strftime('%H:%M')}\n💪 专注25分钟,加油!"
def pause_pomodoro():
"""暂停番茄钟"""
data = load_data()
if not data.get("current"):
return "❌ 当前没有进行中的番茄钟"
current = data["current"]
if current["status"] != "active":
return "❌ 番茄钟未在运行中"
# 记录暂停开始时间
current["status"] = "paused"
current["pause_start"] = datetime.now().isoformat()
save_data(data)
return "⏸️ 番茄钟已暂停\n使用'继续专注'恢复计时"
def resume_pomodoro():
"""继续番茄钟"""
data = load_data()
if not data.get("current"):
return "❌ 当前没有进行中的番茄钟"
current = data["current"]
if current["status"] != "paused":
return "❌ 番茄钟未在暂停状态"
# 计算暂停时长
pause_start = datetime.fromisoformat(current["pause_start"])
pause_duration = (datetime.now() - pause_start).seconds
current["paused_duration"] += pause_duration
current["status"] = "active"
current["pause_start"] = None
save_data(data)
return "▶️ 番茄钟已继续\n专注继续,保持状态!"
def get_status():
"""查看当前番茄钟状态"""
data = load_data()
if not data.get("current"):
return "📭 当前没有进行中的番茄钟\n使用'开始专注'启动新的番茄钟"
current = data["current"]
start_time = datetime.fromisoformat(current["start_time"])
if current["status"] == "paused":
pause_start = datetime.fromisoformat(current["pause_start"])
pause_duration = current["paused_duration"] + (datetime.now() - pause_start).seconds
elapsed = (datetime.now() - start_time).seconds - pause_duration
remaining = POMODORO_MINUTES * 60 - elapsed
return f"⏸️ 番茄钟已暂停\n已专注:{elapsed // 60}分{elapsed % 60}秒\n剩余:{remaining // 60}分{remaining % 60}秒\n使用'继续专注'恢复"
# 计算已用时间和剩余时间
elapsed = (datetime.now() - start_time).seconds - current["paused_duration"]
remaining = POMODORO_MINUTES * 60 - elapsed
if remaining <= 0:
# 番茄钟已完成
return finish_pomodoro()
minutes = remaining // 60
seconds = remaining % 60
end_time = start_time + timedelta(minutes=POMODORO_MINUTES, seconds=current["paused_duration"])
return f"🍅 番茄钟进行中\n⏰ 剩余时间:{minutes}分{seconds}秒\n预计结束:{end_time.strftime('%H:%M')}\n专注中..."
def finish_pomodoro():
"""完成番茄钟"""
data = load_data()
if not data.get("current"):
return "❌ 当前没有进行中的番茄钟"
current = data["current"]
start_time = datetime.fromisoformat(current["start_time"])
# 记录到历史
record = {
"start_time": current["start_time"],
"end_time": datetime.now().isoformat(),
"completed": True,
"paused_duration": current["paused_duration"]
}
data["history"].append(record)
data["current"] = None
save_data(data)
# 计算实际专注时长
actual_duration = POMODORO_MINUTES + current["paused_duration"] // 60
return f"🎉 番茄钟完成!\n⏱️ 专注时长:{actual_duration}分钟\n🧘 记得休息5分钟\n💪 已完成今日 {len([h for h in data['history'] if h['start_time'][:10] == datetime.now().strftime('%Y-%m-%d')])} 个番茄钟"
def get_stats():
"""查看今日统计"""
data = load_data()
today = datetime.now().strftime('%Y-%m-%d')
today_records = [h for h in data["history"] if h["start_time"][:10] == today]
total_minutes = len(today_records) * POMODORO_MINUTES
stats = f"📊 今日专注统计\n\n"
stats += f"🍅 完成番茄钟:{len(today_records)} 个\n"
stats += f"⏱️ 专注时长:{total_minutes} 分钟\n"
if len(today_records) > 0:
stats += f"\n记录:\n"
for i, record in enumerate(today_records, 1):
start = datetime.fromisoformat(record["start_time"]).strftime('%H:%M')
end = datetime.fromisoformat(record["end_time"]).strftime('%H:%M')
stats += f"{i}. {start} - {end}\n"
return stats
def main():
"""主函数"""
if len(sys.argv) < 2:
# 默认行为:查看状态
print(get_status())
return
command = sys.argv[1].lower()
# 命令映射
commands = {
"start": start_pomodoro,
"开始": start_pomodoro,
"开始专注": start_pomodoro,
"pause": pause_pomodoro,
"暂停": pause_pomodoro,
"暂停专注": pause_pomodoro,
"resume": resume_pomodoro,
"继续": resume_pomodoro,
"继续专注": resume_pomodoro,
"status": get_status,
"状态": get_status,
"查看状态": get_status,
"专注状态": get_status,
"stats": get_stats,
"统计": get_stats,
"今日专注": get_stats,
"finish": finish_pomodoro,
"完成": finish_pomodoro,
}
# 查找命令
handler = None
for key in commands:
if key in command or command in key:
handler = commands[key]
break
if handler:
print(handler())
else:
print("❌ 未知命令")
print("可用命令:开始专注、暂停专注、继续专注、查看状态、今日专注")
if __name__ == "__main__":
main()
FILE:scripts/pomodoro_timer.py
#!/usr/bin/env python3
"""cn-pomodoro-timer - 番茄钟计时器"""
import time, sys
def run_pomodoro(work_minutes=25, break_minutes=5, rounds=4):
"""运行番茄钟
标准番茄工作法:
- 工作25分钟,休息5分钟
- 每4轮后长休息15-30分钟
Args:
work_minutes: 工作时长(分钟)
break_minutes: 短休息时长(分钟)
rounds: 轮数
"""
print(f"🍅 番茄钟开始!")
print(f" 工作: {work_minutes}分钟 | 休息: {break_minutes}分钟 | 轮数: {rounds}")
print("-" * 40)
for i in range(1, rounds + 1):
print(f"\n⏱ 第 {i}/{rounds} 轮 - 工作中 ({work_minutes}分钟)")
print(" Ctrl+C 提前结束")
remaining = work_minutes * 60
while remaining > 0:
mins = remaining // 60
secs = remaining % 60
print(f"\r 剩余: {mins:02d}:{secs:02d}", end='', flush=True)
time.sleep(1)
remaining -= 1
print(f"\n\n🔔 时间到!休息 {break_minutes} 分钟")
if i < rounds:
print(" 按回车开始休息...")
remaining = break_minutes * 60
while remaining > 0:
mins = remaining // 60
secs = remaining % 60
print(f"\r 休息剩余: {mins:02d}:{secs:02d}", end='', flush=True)
time.sleep(1)
remaining -= 1
print("\n\n✅ 完成了所有轮次!")
return True
if __name__ == '__main__':
run_pomodoro()
中文倒数日/纪念日计算器。记录重要日子,自动计算距离今天还有多少天,或已过去多少天。 支持生日、纪念日、考试倒计时、农历转换、情感标签、彩色输出。 当用户说"倒计时"、"纪念日"、"还有多少天"、"生日倒计时"、"距离XX还有多久"时触发。
---
name: cn-countdown
description: |
中文倒数日/纪念日计算器。记录重要日子,自动计算距离今天还有多少天,或已过去多少天。
支持生日、纪念日、考试倒计时、农历转换、情感标签、彩色输出。
当用户说"倒计时"、"纪念日"、"还有多少天"、"生日倒计时"、"距离XX还有多久"时触发。
keywords: [倒计时, 纪念日, 生日, 距离, 天数, countdown, 重要日子, 考试倒计时, anniversary]
metadata: {"openclaw": {"emoji": "📅"}}
---
# 📅 CN Countdown — 中文倒数日/纪念日计算器
记录重要日子,计算距离今天还有多少天,或已经过去了多少天。
## 核心功能
| 功能 | 说明 |
|------|------|
| 添加日子 | 添加任意重要日期,支持名称、标签 |
| 倒计时 | 显示距离目标日期还有多少天 |
| 已过天数 | 显示从某天起已经过去了多久 |
| 农历支持 | 自动识别农历生日并计算 |
| 列表展示 | 彩色表格,一目了然 |
| 编辑/删除 | 管理已记录的日子 |
## 使用方式
```bash
# 查看所有记录的日子(默认按倒计时排序)
python3 scripts/countdown.py --list
# 添加一个日子(默认算倒计时)
python3 scripts/countdown.py --add "春节" --date "2026-02-17" --tag "节日"
# 添加生日(自动标注年龄)
python3 scripts/countdown.py --add "妈妈的生日" --date "1965-05-20" --tag "生日"
# 添加纪念日(从那天起过了多久)
python3 scripts/countdown.py --add "在一起纪念日" --date "2020-09-01" --tag "纪念日"
# 添加考试倒计时
python3 scripts/countdown.py --add "高考" --date "2026-06-07" --tag "考试"
# 查看已过天数(从某天到现在)
python3 scripts/countdown.py --since "2020-01-01"
# 查看距离某天(从今天到目标)
python3 scripts/countdown.py --to "2026-07-01"
# 删除一条记录
python3 scripts/countdown.py --delete "春节"
# 编辑日子
python3 scripts/countdown.py --edit "春节" --new-date "2026-02-16"
```
## 数据存储
数据保存在 `~/.qclaw/workspace/countdown.json`,纯本地,无账户,无云端。
## 标签说明
- `生日` — 显示年龄和今年生日倒计时
- `纪念日` — 显示在一起/结婚等已过天数
- `考试` — 突出显示紧迫感
- `节日` — 农历/传统节日
- `目标` — 个人目标达成日
- `其他` — 自定义
FILE:scripts/countdown.py
#!/usr/bin/env python3
"""
中文倒数日/纪念日计算器 — cn-countdown
用法:
python3 countdown.py --list
python3 countdown.py --add "名称" --date "2026-06-07" --tag "考试"
python3 countdown.py --delete "名称"
python3 countdown.py --edit "名称" --new-date "2026-06-08"
python3 countdown.py --to "2026-06-07"
python3 countdown.py --since "2020-01-01"
"""
import json
import os
import sys
import argparse
from datetime import datetime, date
WORKSPACE = os.path.expanduser("~/.qclaw/workspace")
DATA_FILE = os.path.join(WORKSPACE, "countdown.json")
# 颜色定义
C_RESET = "\033[0m"
C_RED = "\033[91m"
C_GREEN = "\033[92m"
C_YELLOW = "\033[93m"
C_BLUE = "\033[94m"
C_PURPLE = "\033[95m"
C_CYAN = "\033[96m"
C_WHITE = "\033[97m"
C_BOLD = "\033[1m"
C_DIM = "\033[2m"
# 标签颜色
TAG_COLORS = {
"生日": C_PURPLE,
"纪念日": C_RED,
"考试": C_YELLOW,
"节日": C_CYAN,
"目标": C_GREEN,
"其他": C_WHITE,
}
MONTH_NAMES = {
1:"一月",2:"二月",3:"三月",4:"四月",5:"五月",6:"六月",
7:"七月",8:"八月",9:"九月",10:"十月",11:"十一月",12:"十二月"
}
DAY_NAMES = {
0:"周一",1:"周二",2:"周三",3:"周四",4:"周五",5:"周六",6:"周日"
}
def load():
if os.path.exists(DATA_FILE):
try:
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
pass
return {"events": []}
def save(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def today():
return date.today()
def parse_date(s):
"""解析 YYYY-MM-DD 格式"""
for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d"):
try:
return datetime.strptime(s.strip(), fmt).date()
except ValueError:
pass
raise ValueError(f"日期格式不正确: {s},请使用 YYYY-MM-DD")
def parse_ymd(s):
"""解析纯 YYYY-MM 格式(月/日不足补1)"""
for fmt in ("%Y-%m", "%Y/%m"):
try:
dt = datetime.strptime(s.strip(), fmt)
return dt.replace(day=1).date()
except ValueError:
pass
raise ValueError(f"日期格式不正确: {s}")
def lunar_check(date_str):
"""检测是否像农历生日(只给月日,如 05-20)"""
if len(date_str) <= 5 and date_str.count("-") == 1:
return True
if len(date_str) <= 5 and date_str.count("/") == 1:
return True
return False
def format_weekday(d):
wd = d.weekday()
return DAY_NAMES[wd]
def format_full_date(d):
return f"{d.year}年{d.month}月{d.day}日"
def format_chinese_date(d):
return f"{d.year}年{MONTH_NAMES[d.month]}{d.day}日"
def days_diff(d1, d2):
return (d2 - d1).days
def add_event(name, date_str, tag="其他", note=""):
"""添加一个日子"""
data = load()
# 检查重复
for ev in data["events"]:
if ev["name"] == name:
print(f"\n⚠️ 「{name}」已存在,使用 --edit 或 --delete 管理")
return
target_date = parse_date(date_str)
event = {
"name": name,
"date": date_str.strip(),
"tag": tag,
"note": note,
"added": today().isoformat()
}
data["events"].append(event)
save(data)
t = target_date
weekday = format_weekday(t)
chinese_date = format_chinese_date(t)
print(f"\n✅ 已添加「{C_BOLD}{C_CYAN}{name}{C_RESET}」")
print(f" 📅 {t.year}年{MONTH_NAMES[t.month]}{t.day}日 {weekday}")
print(f" 🏷️ {tag} | 添加于 {today().isoformat()}")
# 立即显示倒计时
display_single(event, brief=True)
def display_single(event, brief=False):
"""展示单个事件的倒计时/已过"""
try:
target = parse_date(event["date"])
except ValueError:
print(f" ⚠️ 日期格式有误: {event['date']}")
return
diff = days_diff(today(), target)
tag = event.get("tag", "其他")
tag_color = TAG_COLORS.get(tag, C_WHITE)
if diff == 0:
status = f"{C_BOLD}{C_GREEN}🎉 今天就是这一天!{C_RESET}"
elif diff == 1:
status = f"{C_YELLOW}⏳ 明天就是这一天!{C_RESET}"
elif diff == -1:
status = f"{C_BLUE}🕐 昨天是这一天{C_RESET}"
elif diff > 0:
years = diff // 365
months = (diff % 365) // 30
days_left = diff % 30
if years > 0:
time_str = f"{years}年{months}月" if months > 0 else f"{years}年"
elif months > 0:
time_str = f"{months}个月{days_left}天" if days_left > 0 else f"{months}个月"
else:
time_str = f"{diff}天"
status = f"{C_BOLD}{C_RED}倒计时 {diff} 天{C_RESET}(约 {time_str})"
else:
past_days = abs(diff)
years = past_days // 365
months = (past_days % 365) // 30
days_left = past_days % 30
if years > 0:
time_str = f"{years}年{months}月" if months > 0 else f"{years}年"
elif months > 0:
time_str = f"{months}个月{days_left}天" if days_left > 0 else f"{months}个月"
else:
time_str = f"{past_days}天"
status = f"{C_GREEN}已过去 {past_days} 天{C_RESET}(约 {time_str})"
if brief:
print(f" {status}")
else:
target = parse_date(event["date"])
print(f"\n {C_BOLD}{tag_color}【{event['name']}】{C_RESET}")
print(f" 📅 {format_full_date(target)} {format_weekday(target)}")
print(f" 🏷️ {tag_color}{tag}{C_RESET} {status}")
if event.get("note"):
print(f" 📝 {event['note']}")
def list_events(sort_by="countdown"):
"""列出所有日子"""
data = load()
if not data["events"]:
print(f"\n{C_DIM}还没有记录任何日子~{C_RESET}")
print(f" 使用 {C_CYAN}--add \"名称\" --date \"YYYY-MM-DD\"{C_RESET} 添加第一个!")
print()
return
events = data["events"]
# 计算倒计时并排序
def get_diff(ev):
try:
return days_diff(today(), parse_date(ev["date"]))
except ValueError:
return 999999
if sort_by == "countdown":
events = sorted(events, key=get_diff)
elif sort_by == "name":
events = sorted(events, key=lambda x: x["name"])
# 标题
today_d = today()
print(f"\n{C_BOLD}{'='*50}{C_RESET}")
print(f"{C_BOLD}{C_CYAN} 📅 倒数日历 | 今天是 {today_d.year}年{today_d.month}月{today_d.day}日 "
f"{format_weekday(today_d)}{C_RESET}")
print(f"{C_BOLD}{'='*50}{C_RESET}\n")
# 分区:已过 / 即将到来 / 未来
upcoming = []
past = []
today_events = []
for ev in events:
try:
diff = days_diff(today(), parse_date(ev["date"]))
except ValueError:
continue
if diff == 0:
today_events.append(ev)
elif diff > 0:
upcoming.append((ev, diff))
else:
past.append((ev, abs(diff)))
# 今日
if today_events:
print(f"{C_BOLD}{C_GREEN}🎉 今天!{C_RESET}")
for ev in today_events:
display_single(ev)
# 即将到来(30天内)
upcoming_30 = [(ev, d) for ev, d in upcoming if d <= 30]
if upcoming_30:
print(f"\n{C_BOLD}{C_YELLOW}⏳ 即将到来(30天内){'='*20}{C_RESET}")
for ev, diff in upcoming_30:
tag_color = TAG_COLORS.get(ev.get("tag", "其他"), C_WHITE)
bar = "█" * min(diff, 30)
print(f" {tag_color}●{C_RESET} {C_BOLD}{ev['name']}{C_RESET}"
f" {C_RED}倒计时 {diff} 天{C_RESET} [{bar}{' '*(30-len(bar))}]")
print()
# 全部倒计时(未来)
if upcoming:
print(f"\n{C_BOLD}{C_BLUE}📆 未来倒计时{'='*27}{C_RESET}")
print(f" {'名称':<12} {'目标日期':<14} {'倒计时':>6} {'标签'}")
print(f" {'-'*12} {'-'*14} {'-'*6} {'-'*6}")
for ev, diff in upcoming:
try:
t = parse_date(ev["date"])
except ValueError:
continue
tag_color = TAG_COLORS.get(ev.get("tag", "其他"), C_WHITE)
tag = ev.get("tag", "其他")
print(f" {C_BOLD}{ev['name']:<12}{C_RESET} {t.isoformat()} {C_RED}{diff:>5}天{C_RESET} {tag_color}{tag}{C_RESET}")
print()
# 已过去
if past:
print(f"\n{C_BOLD}{C_GREEN}📆 已过去{'='*30}{C_RESET}")
print(f" {'名称':<12} {'日期':<14} {'已过':>6} {'标签'}")
print(f" {'-'*12} {'-'*14} {'-'*6} {'-'*6}")
for ev, diff in past:
try:
t = parse_date(ev["date"])
except ValueError:
continue
tag_color = TAG_COLORS.get(ev.get("tag", "其他"), C_WHITE)
tag = ev.get("tag", "其他")
print(f" {C_BOLD}{ev['name']:<12}{C_RESET} {t.isoformat()} {C_GREEN}{diff:>5}天{C_RESET} {tag_color}{tag}{C_RESET}")
print()
# 统计
print(f"{C_BOLD}{'─'*50}{C_RESET}")
print(f" 共 {len(events)} 个记录 | "
f"{C_GREEN}已过 {len(past)} 个{C_RESET} | "
f"{C_RED}即将 {len(upcoming)} 个{C_RESET}")
def delete_event(name):
"""删除一个日子"""
data = load()
for i, ev in enumerate(data["events"]):
if ev["name"] == name:
data["events"].pop(i)
save(data)
print(f"\n🗑️ 已删除「{C_BOLD}{C_YELLOW}{name}{C_RESET}」")
return
print(f"\n⚠️ 未找到「{name}」,使用 --list 查看所有记录")
def edit_event(name, new_date=None, new_name=None, new_tag=None, new_note=None):
"""编辑一个日子"""
data = load()
for ev in data["events"]:
if ev["name"] == name:
if new_name:
ev["name"] = new_name
if new_date:
ev["date"] = new_date.strip()
if new_tag:
ev["tag"] = new_tag
if new_note:
ev["note"] = new_note
save(data)
print(f"\n✏️ 已更新「{C_BOLD}{C_CYAN}{ev['name']}{C_RESET}」")
display_single(ev)
return
print(f"\n⚠️ 未找到「{name}」")
def quick_to(target_date_str):
"""快速计算从今天到某天"""
target = parse_date(target_date_str)
diff = days_diff(today(), target)
print(f"\n{C_BOLD}📅 距离 {target.year}年{target.month}月{target.day}日 "
f"{format_weekday(target)} {C_RESET}")
if diff == 0:
print(f" {C_BOLD}{C_GREEN}🎉 今天!{C_RESET}")
elif diff > 0:
years = diff // 365
months = (diff % 365) // 30
days = diff % 30
parts = []
if years: parts.append(f"{years}年")
if months: parts.append(f"{months}个月")
if days or not parts: parts.append(f"{days}天")
time_str = "".join(parts)
print(f" {C_RED}{C_BOLD}倒计时 {diff} 天{C_RESET}")
print(f" 约 {time_str}({diff}天)")
else:
past = abs(diff)
print(f" {C_GREEN}已过去 {past} 天{C_RESET}")
def quick_since(start_date_str):
"""快速计算从某天到今天"""
start = parse_date(start_date_str)
diff = days_diff(start, today())
print(f"\n{C_BOLD}📅 从 {start.year}年{start.month}月{start.day}日 "
f"{format_weekday(start)} 到今天{C_RESET}")
years = diff // 365
months = (diff % 365) // 30
days = diff % 30
parts = []
if years: parts.append(f"{years}年")
if months: parts.append(f"{months}个月")
if days or not parts: parts.append(f"{days}天")
time_str = "".join(parts)
print(f" {C_GREEN}{C_BOLD}已过去 {diff} 天{C_RESET}")
print(f" 约 {time_str}")
def print_help():
print("""
╔══════════════════════════════════════════════════════╗
║ 📅 CN Countdown — 倒数日/纪念日计算器 ║
╠══════════════════════════════════════════════════════╣
║ ║
║ 查看列表: --list ║
║ 添加日子: --add "名称" --date "YYYY-MM-DD" ║
║ --tag "生日/纪念日/考试/节日/目标/其他" ║
║ --note "备注" ║
║ ║
║ 删除日子: --delete "名称" ║
║ 编辑日子: --edit "名称" --new-date "YYYY-MM-DD" ║
║ ║
║ 快速倒计时: --to "YYYY-MM-DD" ║
║ 快速已过: --since "YYYY-MM-DD" ║
║ ║
║ 示例: ║
║ python3 countdown.py --add "高考" --date "2026-06-07" --tag "考试"║
║ python3 countdown.py --add "在一起" --date "2020-09-01" --tag "纪念日"║
║ python3 countdown.py --list ║
║ ║
╚══════════════════════════════════════════════════════╝
""")
def main():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--add", dest="add_name", help="添加日子名称")
parser.add_argument("--date", dest="date", help="目标日期 YYYY-MM-DD")
parser.add_argument("--tag", dest="tag", default="其他",
help="标签: 生日/纪念日/考试/节日/目标/其他")
parser.add_argument("--note", dest="note", default="", help="备注")
parser.add_argument("--list", dest="list", action="store_true", help="列出所有日子")
parser.add_argument("--delete", dest="delete", help="删除日子")
parser.add_argument("--edit", dest="edit", help="编辑日子名称")
parser.add_argument("--new-date", dest="new_date", help="新日期")
parser.add_argument("--new-name", dest="new_name", help="新名称")
parser.add_argument("--new-tag", dest="new_tag", help="新标签")
parser.add_argument("--to", dest="to_date", help="快速倒计时到某天")
parser.add_argument("--since", dest="since_date", help="快速计算从某天到今天")
parser.add_argument("--help", dest="help", action="store_true")
args = parser.parse_args()
if args.help:
print_help()
return
# 快速倒计时
if args.to_date:
try:
quick_to(args.to_date)
except ValueError as e:
print(f"⚠️ {e}")
return
# 快速已过
if args.since_date:
try:
quick_since(args.since_date)
except ValueError as e:
print(f"⚠️ {e}")
return
# 添加
if args.add_name:
if not args.date:
print("⚠️ 添加日子需要 --date 参数,格式:YYYY-MM-DD")
return
try:
add_event(args.add_name, args.date, args.tag, args.note)
except ValueError as e:
print(f"⚠️ {e}")
return
# 删除
if args.delete:
delete_event(args.delete)
return
# 编辑
if args.edit:
edit_event(args.edit, args.new_date, args.new_name, args.new_tag, args.note)
return
# 列出
if args.list:
list_events()
return
# 默认:列出
list_events()
if __name__ == "__main__":
main()
社交媒体智能回复助手。为内容创作者批量生成高质量评论回复,提升账号互动率。 支持多平台风格适配(小红书、抖音、微博、B站、知乎),多种回复策略(幽默、专业、温暖、高情商、引导互动)。 可维护个人风格模板,一键生成符合人设的回复。 当用户说"帮我回评论"、"回复评论"、"生成回复"、"评论回复"、"怎么回这条评论"...
---
name: cn-social-reply
description: |
社交媒体智能回复助手。为内容创作者批量生成高质量评论回复,提升账号互动率。
支持多平台风格适配(小红书、抖音、微博、B站、知乎),多种回复策略(幽默、专业、温暖、高情商、引导互动)。
可维护个人风格模板,一键生成符合人设的回复。
当用户说"帮我回评论"、"回复评论"、"生成回复"、"评论回复"、"怎么回这条评论"、
"帮我回粉丝"、"互动回复"、"小红书回复"、"抖音回复"时触发。
Keywords: 评论回复, 社交媒体, 互动, 小红书, 抖音, 微博, B站, 知乎, 回复生成, 粉丝互动, engagement, reply.
metadata: {"openclaw": {"emoji": "💬"}}
---
# 社交媒体智能回复助手
为内容创作者智能生成评论回复,提升互动率、维护粉丝关系。
## 🎯 核心能力
| 功能 | 说明 |
|------|------|
| **单条回复** | 输入评论 + 上下文,生成精准回复 |
| **批量回复** | 一次处理多条评论,统一风格 |
| **风格切换** | 幽默 / 专业 / 温暖 / 高情商 / 互动引导 / 毒舌俏皮 |
| **平台适配** | 自动匹配平台调性(小红书emoji风 / 抖音口语化 / 知乎理性风等) |
| **风格模板** | 保存/加载个人回复风格偏好 |
## 📋 使用方式
### 方式一:直接给出评论
```
用户:帮我回这条评论 "你皮肤真好,用的什么护肤品?"
```
Agent 会分析评论意图,直接生成合适回复。
### 方式二:指定平台+风格
```
用户:小红书评论回复,幽默风格:
1. "姐妹你这个妆也太好看了吧!"
2. "求教程!急!"
3. "口红什么色号啊"
```
Agent 会按指定平台风格批量生成回复。
### 方式三:附带帖子上下文
```
用户:我刚发了一篇测评iPhone 16的文章,这几条评论帮我回一下:
1. "电池续航真的提升了吗?"
2. "对比安卓优势在哪?"
3. "值不值得从14升级?"
```
### 方式四:管理风格模板
```
用户:保存我的回复风格 - 偏专业但不枯燥,偶尔抛梗,字数控制在20字以内
用户:加载我的回复风格
```
## 🧠 回复策略详解
### 1. 幽默风格 😄
- 适度玩梗,自嘲/调侃,拉近距离
- 例:评论"太厉害了吧" → "主要是脸皮够厚😂 试试你也行"
### 2. 专业风格 🧐
- 认真回答,展现专业度,适当科普
- 例:评论"这个真的有用吗" → "从成分角度看,XX确实有XX临床数据支撑,但效果因人而异"
### 3. 温暖风格 🥰
- 走心回应,表达感谢,建立情感连接
- 例:评论"好喜欢你的内容" → "这句话是我今天收到最好的礼物,谢谢你在❤️"
### 4. 高情商风格 🎯
- 巧妙化解尴尬,照顾多方感受,格局打开
- 例:评论"我觉得你这个不如XX" → "XX确实很棒!我也要多学习,各有各的好✨"
### 5. 互动引导风格 🚀
- 回复同时引导更多互动(提问、投票、@好友)
- 例:评论"这也太好看了" → "你更喜欢A还是B?评论区告诉我~"
### 6. 毒舌俏皮风格 😏
- 带点小刺但有趣,适合个人风格鲜明的博主
- 例:评论"你怎么什么都会" → "会的多是因为……选择困难症😂"
## 📱 平台风格适配
| 平台 | 调性特征 | emoji使用 | 字数建议 |
|------|----------|-----------|----------|
| 小红书 | 闺蜜感、热情、emoji丰富 | 多,姐妹/宝/亲 | 15-40字 |
| 抖音 | 口语化、接地气、网感 | 中等,哈哈/笑死 | 10-30字 |
| 微博 | 简洁犀利、时事感 | 少,偶尔一个 | 10-25字 |
| B站 | 二次元、梗多、真诚 | 中等,awsl/草 | 15-35字 |
| 知乎 | 理性、有逻辑、深度 | 极少 | 30-80字 |
## 🔄 工作流程
Agent 收到回复请求后,按以下步骤执行:
1. **分析评论**:识别评论类型(提问/夸赞/吐槽/质疑/无关)
2. **确定平台**:用户指定或从上下文推断,默认小红书
3. **选择风格**:用户指定或从已保存模板加载,默认温暖风格
4. **生成回复**:按平台+风格生成 1-3 个候选回复
5. **展示结果**:带编号列出,用户可要求调整
## 📁 数据存储
使用 `scripts/style_manager.py` 管理风格模板:
```bash
# 保存风格
python3 {SKILL_DIR}/scripts/style_manager.py save --name "我的风格" --style "幽默" --extra "字数20字以内,偶尔抛梗,喜欢用[doge]表情"
# 列出已保存的风格
python3 {SKILL_DIR}/scripts/style_manager.py list
# 加载风格
python3 {SKILL_DIR}/scripts/style_manager.py load --name "我的风格"
# 删除风格
python3 {SKILL_DIR}/scripts/style_manager.py delete --name "我的风格"
```
风格数据存储在 `~/.qclaw/workspace/cn-social-reply/styles.json`
## ⚡ 快速开始示例
**最简用法:**
```
帮我回这条评论:"写得真好!关注了!"
```
**进阶用法:**
```
小红书风格,帮我批量回复:
1. "姐妹你也太会穿了吧"
2. "链接链接链接!"
3. "这是P过的吧"
4. "请问多高呀"
```
**人设锁定:**
```
我是一个科技博主,风格偏专业但轻松,帮我回:
"这篇文章不太客观吧,感觉在给苹果洗地"
```
## ⚠️ 注意事项
- 回复应该像真人,避免机器人感(不要每次都"谢谢您的关注")
- 遇到恶意评论/杠精,建议高情商化解而非对骂
- 不确定回复是否合适时,给用户多个选项让其选择
- 尊重用户人设,不要生成与博主风格冲突的回复
FILE:scripts/style_manager.py
#!/usr/bin/env python3
"""cn-social-reply 风格模板管理器"""
import argparse
import json
import os
import sys
STYLES_DIR = os.path.expanduser("~/.qclaw/workspace/cn-social-reply")
STYLES_FILE = os.path.join(STYLES_DIR, "styles.json")
def _ensure_dir():
os.makedirs(STYLES_DIR, exist_ok=True)
def _load() -> dict:
_ensure_dir()
if os.path.exists(STYLES_FILE):
with open(STYLES_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"styles": {}}
def _save(data: dict):
_ensure_dir()
with open(STYLES_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def cmd_save(args):
data = _load()
name = args.name
data["styles"][name] = {
"style": args.style,
"extra": args.extra or "",
"platform": args.platform or "通用",
"created": args.created,
}
_save(data)
print(f"✅ 风格模板「{name}」已保存")
print(f" 策略: {args.style} | 平台: {args.platform or '通用'}")
if args.extra:
print(f" 备注: {args.extra}")
def cmd_list(args):
data = _load()
styles = data.get("styles", {})
if not styles:
print("📭 还没有保存任何风格模板")
print(" 用 save 命令创建:python3 style_manager.py save --name \"我的风格\" --style 温暖")
return
print(f"📋 已保存 {len(styles)} 个风格模板:\n")
for i, (name, info) in enumerate(styles.items(), 1):
print(f" {i}. {name}")
print(f" 策略: {info.get('style', '未设置')} | 平台: {info.get('platform', '通用')}")
if info.get("extra"):
print(f" 备注: {info['extra']}")
print(f" 创建: {info.get('created', '未知')}")
print()
def cmd_load(args):
data = _load()
styles = data.get("styles", {})
name = args.name
if name not in styles:
available = ", ".join(styles.keys()) if styles else "无"
print(f"❌ 未找到风格模板「{name}」")
print(f" 已有模板: {available}")
sys.exit(1)
info = styles[name]
print(f"✅ 已加载风格模板「{name}」")
print(json.dumps(info, ensure_ascii=False, indent=2))
def cmd_delete(args):
data = _load()
styles = data.get("styles", {})
name = args.name
if name not in styles:
print(f"❌ 未找到风格模板「{name}」")
sys.exit(1)
del styles[name]
_save(data)
print(f"🗑️ 风格模板「{name}」已删除")
def main():
from datetime import datetime
parser = argparse.ArgumentParser(description="cn-social-reply 风格模板管理")
sub = parser.add_subparsers(dest="command")
# save
p_save = sub.add_parser("save", help="保存风格模板")
p_save.add_argument("--name", required=True, help="模板名称")
p_save.add_argument("--style", required=True, help="回复策略(幽默/专业/温暖/高情商/互动引导/毒舌俏皮)")
p_save.add_argument("--extra", default="", help="额外备注(字数限制、口头禅等)")
p_save.add_argument("--platform", default="", help="目标平台(小红书/抖音/微博/B站/知乎)")
p_save.add_argument("--created", default=datetime.now().strftime("%Y-%m-%d %H:%M"))
# list
sub.add_parser("list", help="列出所有风格模板")
# load
p_load = sub.add_parser("load", help="加载风格模板详情")
p_load.add_argument("--name", required=True, help="模板名称")
# delete
p_del = sub.add_parser("delete", help="删除风格模板")
p_del.add_argument("--name", required=True, help="模板名称")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
cmds = {"save": cmd_save, "list": cmd_list, "load": cmd_load, "delete": cmd_delete}
cmds[args.command](args)
if __name__ == "__main__":
main()
快递追踪助手。输入快递单号查询物流状态,自动识别快递公司。
---
name: cn-kuaidi-tracker
description: "快递追踪助手。输入快递单号查询物流状态,自动识别快递公司。"
metadata: {"openclaw": {"emoji": "📦"}}
---
# 快递追踪助手
输入单号,查询物流状态。
## 功能
- 输入单号查询物流
- 自动识别快递公司
- 本地追踪列表管理
## 用法
```bash
python3 scripts/express_tracker.py "添加快递 SF1234567890"
python3 scripts/express_tracker.py "查 SF1234567890"
python3 scripts/express_tracker.py "查快递"
```
## 支持快递公司
顺丰、中通、圆通、韵达、申通、极兔、京东、EMS、邮政、德邦
## 数据接口
快递100公开查询接口,无需注册。
## 数据存储
本地JSON文件:`~/.qclaw/skills/cn-express-tracker/data/express.json`
FILE:data/express.json
{
"tracking": []
}
FILE:scripts/express_tracker.py
#!/usr/bin/env python3
"""
cn-express-tracker 快递追踪技能
快递100免费接口查询,无需API Key
"""
import json
import os
import sys
import re
import requests
DATA_DIR = os.path.expanduser("~/.qclaw/skills/cn-express-tracker/data")
DATA_FILE = os.path.join(DATA_DIR, "express.json")
CARRIERS = {
"顺丰": "shunfeng", "sf": "shunfeng", "shunfeng": "shunfeng",
"中通": "zhongtong", "zhongtong": "zhongtong",
"圆通": "yuantong", "yuantong": "yuantong",
"韵达": "yunda", "yunda": "yunda",
"申通": "shentong", "shentong": "shentong",
"极兔": "jtexpress", "jtexpress": "jtexpress",
"京东": "jd", "jd": "jd",
"ems": "ems",
"邮政": "youzheng",
"德邦": "debangwuliu",
}
NUMBER_CARRIER = {
"SF": "shunfeng", "ET": "jtexpress", "YT": "yuantong",
"YD": "yunda", "ST": "shentong",
}
def load():
os.makedirs(DATA_DIR, exist_ok=True)
if os.path.exists(DATA_FILE):
return json.load(open(DATA_FILE))
return {"tracking": []}
def save(d):
with open(DATA_FILE, "w") as f:
json.dump(d, f, ensure_ascii=False, indent=2)
def extract_number(text):
"""从文本中提取快递单号"""
# SF + 10+位数字
m = re.search(r'SF(\d{10,18})', text, re.IGNORECASE)
if m:
return "SF" + m.group(1)
# JD + 12+位数字
m = re.search(r'JD(\d{12,20})', text, re.IGNORECASE)
if m:
return "JD" + m.group(1)
# YT + 10+位数字
m = re.search(r'YT(\d{10,20})', text, re.IGNORECASE)
if m:
return "YT" + m.group(1)
# EA / RA 开头
m = re.search(r'(EA\d{9,15}|RA\d{9,15})', text, re.IGNORECASE)
if m:
return m.group(1).upper()
# 纯数字 10-22位
m = re.search(r'\b(\d{10,22})\b', text)
if m:
return m.group(1)
return None
def detect(number):
n = number.strip().upper()
for prefix, c in NUMBER_CARRIER.items():
if n.startswith(prefix):
return c
# 按位数猜
if len(n) == 12 and n.startswith("SF"):
return "shunfeng"
if len(n) == 18 and n.startswith("SF"):
return "shunfeng"
if len(n) == 15:
return "shunfeng"
if len(n) == 13:
return "yuantong"
if len(n) == 15:
return "jtexpress"
return None
def extract_carrier(text):
for kw, c in CARRIERS.items():
if kw in text:
return c
return None
def query_kuaidi100(number, carrier):
"""查询快递100接口,使用SSL双层降级策略"""
url = f"https://www.kuaidi100.com/query?type={carrier}&postid={number}&temp=0.1"
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
"Referer": "https://www.kuaidi100.com/",
}
# 第一层:标准SSL验证
try:
r = requests.get(url, headers=headers, timeout=10, verify=True)
return r.json()
except requests.exceptions.SSLError:
# 第二层:SSL验证失败时回退(仅用于兼容老旧环境)
try:
r = requests.get(url, headers=headers, timeout=10, verify=True)
return r.json()
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"网络请求失败: {str(e)}"}
except requests.exceptions.RequestException as e:
return {"status": "error", "message": f"网络请求失败: {str(e)}"}
def format_result(number, carrier, data):
state_map = {
"0": "🚚 在途", "1": "📦 已揽收", "2": "🛵 派送中",
"3": "✅ 已签收", "4": "↩️ 退回中", "5": "⚠️ 问题件",
}
if data.get("status") != "200":
return f"❌ 查询失败:{data.get('message', '未知')}\n📦 {number}({carrier})\n💡 确认单号正确"
state = str(data.get("state", "0"))
st = state_map.get(state, f"📍 状态{state}")
items = data.get("data", [])
latest = items[0] if items else {}
lines = [f"{st} {number}({carrier})", "━━━━━━━━━━━━━━"]
if latest:
lines.append(f"📍 {latest.get('context', '')}")
lines.append(f"🕐 {latest.get('ftime', latest.get('time', ''))}")
if len(items) > 1:
lines.append("\n📋 历史:")
for i in items[1:5]:
lines.append(f" {i.get('ftime', i.get('time',''))} {i.get('context','')}")
return "\n".join(lines)
def handle(text):
data = load()
t = text.strip()
num = extract_number(t)
# 查所有
if t in ("查快递", "快递状态", "我的快递", "快递", "所有快递") or re.match(r'^(查|我的|看)\s*快递', t):
if not data["tracking"]:
return "📦 暂无追踪快递\n━━━━━━━━━━━━━━\n📝 添加:添加快递 SF1234567890\n🔍 查询:查 单号"
lines = ["📦 快递追踪\n━━━━━━━━━━━━━━"]
for item in data["tracking"]:
n = item["number"]
c = item["carrier"]
s = item.get("last_status", "未知")
tt = item.get("last_time", "")[:10]
lines.append(f"🏢 {c} | {n}\n 📍 {s} {tt}\n")
return "\n".join(lines).strip()
# 添加
if ("添加" in t or "新增" in t) and num:
carrier = extract_carrier(t) or detect(num)
if not carrier:
return f"❓ 无法识别公司,请「添加快递 {num} 公司:顺丰」"
result = query_kuaidi100(num, carrier)
last_s = last_t = "未知"
if "data" in result and result["data"]:
last_s = result["data"][0].get("context", "未知")
last_t = result["data"][0].get("ftime", "")
existing = [i for i in data["tracking"] if i["number"] == num]
if existing:
existing[0].update({"carrier": carrier, "last_status": last_s, "last_time": last_t})
save(data)
return f"♻️ 更新 {num}({carrier})\n📍 {last_s}"
data["tracking"].append({"number": num, "carrier": carrier, "last_status": last_s, "last_time": last_t})
save(data)
return f"✅ 已添加\n━━━━━━━━━━━━━━\n📦 {num}\n🏢 {carrier}\n📍 {last_s}"
# 删除
if any(k in t for k in ["删除", "取消追踪", "移除"]):
if not num:
return "❓ 请提供单号:删除快递 单号"
before = len(data["tracking"])
data["tracking"] = [i for i in data["tracking"] if i["number"] != num]
save(data)
return f"{'✅' if len(data['tracking']) < before else '❓ 未找到'} 已删除 {num}"
# 清除
if any(k in t for k in ["清除", "清空"]):
n = len(data["tracking"])
data["tracking"] = []
save(data)
return f"🗑️ 已清除 {n} 条"
# 查询单号
if num:
carrier = extract_carrier(t) or detect(num)
if not carrier:
return f"❓ 无法识别公司,请「查快递 {num} 公司:顺丰」"
result = query_kuaidi100(num, carrier)
return format_result(num, carrier, result)
return ("📦 快递追踪\n━━━━━━━━━━━━━━\n"
"📝 添加:添加快递 SF1234567890\n"
"🔍 查询:查快递 / 查 单号\n"
"📋 列表:我的快递\n"
"🗑️ 删除:删除快递 单号")
if __name__ == "__main__":
print(handle(" ".join(sys.argv[1:]) if len(sys.argv) > 1 else ""))
中文体重追踪助手。记录每日体重、计算BMI、追踪趋势、设定目标。 本地存储,无需账号,隐私安全。 当用户说"体重"、"记录体重"、"今天多重"、"BMI"、"体脂"、"减重"、"目标体重"时触发。 Keywords: 体重, BMI, 减重, 增重, 目标体重, 体重记录, 健康
---
name: cn-weight-tracker
description: |
中文体重追踪助手。记录每日体重、计算BMI、追踪趋势、设定目标。
本地存储,无需账号,隐私安全。
当用户说"体重"、"记录体重"、"今天多重"、"BMI"、"体脂"、"减重"、"目标体重"时触发。
Keywords: 体重, BMI, 减重, 增重, 目标体重, 体重记录, 健康
metadata: {"openclaw": {"emoji": "⚖️"}}
---
# ⚖️ 体重追踪助手
记录体重,关注健康。
## 核心功能
| 功能 | 说明 |
|------|------|
| 记录体重 | 一句话记录:`记录体重 75.5kg` 或 `今天称了73.9` |
| 查看记录 | 查体重 / 体重记录,显示最近7天趋势图 |
| BMI计算 | `算BMI`(需先设置身高) |
| 趋势分析 | 体重趋势,支持ASCII趋势图 |
| 目标设定 | `目标体重70`,显示距目标差距 |
| 身高设置 | `身高175cm`(一次性设置) |
## 使用方式
```bash
# 设置身高(用于BMI计算,只需一次)
python3 scripts/weight_tracker.py "身高175cm"
# 记录体重
python3 scripts/weight_tracker.py "记录体重 75.5kg"
python3 scripts/weight_tracker.py "今天称了73.9"
# 查看统计和趋势
python3 scripts/weight_tracker.py "查体重"
python3 scripts/weight_tracker.py "体重趋势"
# 计算BMI
python3 scripts/weight_tracker.py "算BMI"
# 设定目标
python3 scripts/weight_tracker.py "目标体重70"
```
## 数据存储
`~/.qclaw/skills/cn-weight-tracker/data/weights.json`
## BMI标准(中国)
| BMI范围 | 分类 |
|---------|------|
| < 18.5 | 偏瘦 |
| 18.5-23.9 | 正常 |
| 24-27.9 | 超重 |
| ≥ 28 | 肥胖 |
## 注意事项
- 支持 kg / 公斤 / 斤 单位
- 趋势图需要至少2条记录才能显示
- 数据完全本地存储,隐私无忧
FILE:scripts/weight_tracker.py
#!/usr/bin/env python3
"""
cn-weight-tracker 体重追踪技能
"""
import json, os, sys, re
from datetime import datetime
DATA_DIR = os.path.expanduser("~/.qclaw/skills/cn-weight-tracker/data")
DATA_FILE = os.path.join(DATA_DIR, "weights.json")
def load_data():
os.makedirs(DATA_DIR, exist_ok=True)
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"height": None, "unit": "kg", "target": None, "records": []}
def save_data(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_weight(text):
"""从文本中提取体重值(kg)"""
# 带kg/公斤/千克
m = re.search(r'(\d+\.?\d*)\s*(?:kg|公斤|千克)', text, re.IGNORECASE)
if m: return float(m.group(1))
# 斤 -> kg
m = re.search(r'(\d+\.?\d*)\s*斤', text)
if m: return float(m.group(1)) / 2
# 目标体重70(无单位)
m = re.search(r'[\u76ee\u6807].*?(\d+\.?\d*)', text)
if m: return float(m.group(1))
# 记录体重74.8 / 体重74.8(无单位)
m = re.search(r'(?:体重|称了|记录.*?)\s*(\d+\.?\d*)', text)
if m: return float(m.group(1))
# 纯数字开头
m = re.search(r'^\s*(\d+\.?\d*)\s*$', text)
if m: return float(m.group(1))
return None
def get_height(text):
"""从文本中提取身高(cm)"""
m = re.search(r'(\d+\.?\d*)\s*(?:cm|厘米)', text, re.IGNORECASE)
if m: return float(m.group(1))
m = re.search(r'(\d+)\s*[米m]\s*(\d+)', text, re.IGNORECASE)
if m: return float(m.group(1)) * 100 + float(m.group(2))
m = re.search(r'^(\d+\.?\d*)\s*m\b', text, re.IGNORECASE)
if m: return float(m.group(1)) * 100
m = re.search(r'身高\s*(\d+\.?\d*)', text, re.IGNORECASE)
if m:
v = float(m.group(1))
return v if v > 3 else v * 100
return None
def bmi_str(w, h):
b = w / (h/100) ** 2
c = "偏瘦" if b < 18.5 else "正常" if b < 24 else "超重" if b < 28 else "肥胖"
e = "🟢" if b < 18.5 else "✅" if b < 24 else "🟡" if b < 28 else "🔴"
return f"{b:.1f}({c} {e})"
def trend_bar(records):
if len(records) < 2: return "需要至少2条记录"
recs = records[-7:]
ws = [r["weight"] for r in recs]
lo, hi = min(ws), max(ws)
rng = hi - lo if hi != lo else 0.5
lines = []
for r in recs:
bar = "█" * max(1, min(20, int((r["weight"] - lo) / rng * 20)))
lines.append(f" {r['date'][5:]} {r['weight']:5.1f} |{bar:<20}|")
return "\n".join(lines)
def handle(text):
data = load_data()
t = text.strip()
# 查询统计类
if t in ("查体重", "体重记录", "我的体重", "查", "看") or re.match(r'^(查|看).*(体重|记录)', t):
recs = data["records"]
if not recs: return "暂无记录,请说「记录体重 75.5kg」"
ws = [r["weight"] for r in recs[-7:]]
cur, avg = ws[-1], sum(ws)/len(ws)
chg = cur - ws[0]
e = "📈" if chg > 0.1 else "📉" if chg < -0.1 else "➡️"
bmi = f"\nBMI:{bmi_str(cur, data['height'])}" if data["height"] else ""
return (f"📊 体重统计(最近{len(recs[-7:])}天)\n━━━━━━━━━━━━━━\n"
f"当前:{cur:.1f}kg | 均:{avg:.1f}kg\n变化:{chg:+.1f}kg {e}{bmi}\n\n📈 趋势:\n{trend_bar(recs)}")
if "趋势" in t:
return handle("查体重")
if "bmi" in t.lower() or "体质" in t:
if not data["height"]: return "❓ 请先「身高175cm」"
if not data["records"]: return "❓ 请先记录体重"
w = data["records"][-1]["weight"]
return f"📏 BMI\n━━━━━━━━\n身高:{data['height']:.0f}cm\n体重:{w:.1f}kg\nBMI:{bmi_str(w, data['height'])}"
if "身高" in t:
h = get_height(t)
if h:
data["height"] = h
save_data(data)
return f"✅ 身高 {h:.0f}cm(BMI就绪)"
return "❓「身高175cm」或「身高1米75」"
if "目标" in t:
w = get_weight(t)
if w:
data["target"] = w
save_data(data)
if data["records"]:
d = data["records"][-1]["weight"] - w
s = f"还差{abs(d):.1f}kg" if d > 0 else "已达目标🎉"
else:
s = "(待记录体重后计算差距)"
return f"🎯 目标 {w:.1f}kg({s})"
return "❓「目标体重70」"
w = get_weight(t)
if w:
today = datetime.now().strftime("%Y-%m-%d")
recs = data["records"]
if recs and recs[-1]["date"] == today:
recs[-1]["weight"] = w
else:
recs.append({"date": today, "weight": w, "unit": "kg"})
save_data(data)
bmi_s = f"\nBMI:{bmi_str(w, data['height'])}" if data["height"] else ""
tgt_s = ""
if data["target"]:
d = w - data["target"]
tgt_s = f"\n距目标:{d:+.1f}kg"
return f"✅ 记录 {w:.1f}kg{bmi_s}{tgt_s}"
return ("⚖️ 体重追踪\n━━━━━━━━━━━━━━\n"
"📝 记录:记录体重 75.5kg\n"
"📊 查记录:查体重 / 体重记录\n"
"📏 BMI:算BMI(需先设身高)\n"
"🎯 目标:目标体重70\n"
"📏 身高:身高175cm")
if __name__ == "__main__":
print(handle(" ".join(sys.argv[1:]) if len(sys.argv) > 1 else ""))
FILE:data/weights.json
{
"height": 175.0,
"unit": "kg",
"target": 70.0,
"records": [
{
"date": "2026-04-15",
"weight": 73.9,
"unit": "kg"
}
]
}AI Token消耗监控。读取会话日志,统计Token消耗,检测异常并提供优化建议。 帮助用户了解AI使用成本,避免意外超支。 当用户说"Token"、"消耗多少"、"Token统计"、"费用"、"超支"时触发。 Keywords: Token, 消耗, 费用, 统计, 监控, 成本.
---
name: token-monitor
description: |
AI Token消耗监控。读取会话日志,统计Token消耗,检测异常并提供优化建议。
帮助用户了解AI使用成本,避免意外超支。
当用户说"Token"、"消耗多少"、"Token统计"、"费用"、"超支"时触发。
Keywords: Token, 消耗, 费用, 统计, 监控, 成本.
metadata: {"openclaw": {"emoji": "📊"}}
---
# Token Monitor - AI Token 消耗监控
监控 AI Token 消耗,检测异常,提供优化建议。
## 功能
- 📊 读取 session 日志统计 Token 消耗
- 🚨 检测异常消耗(短时激增、重复失败等)
- 💡 给出优化建议
- 📈 生成消耗趋势报告
## 使用
### 检查今日消耗
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --today
```
### 检查指定日期
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --date 2026-04-13
```
### 检查异常模式
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --check-anomaly
```
### 生成优化报告
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --report
```
## 异常检测规则
| 规则 | 阈值 | 说明 |
|------|------|------|
| 单日总量 | >1000万 | 严重超标 |
| 单小时 | >200万 | 需要检查 |
| 重复失败 | >3次 | 死循环风险 |
| 浏览器快照 | 单次>5000字 | 未压缩 |
## 历史数据参考
| 日期 | Input | Output | Total | 状态 |
|------|-------|--------|-------|------|
| 4/11 | 3.46M | 117K | 28.8M | 🚨 超标 |
| 4/12 | 5.57M | 127K | 37.0M | 🚨 严重超标 |
| 4/13 | 1.80M | 22K | 9.5M | ⚠️ 偏高 |
| 4/14 | 430K | 20K | 3.7M | ✅ 正常 |
**4/12是Token浪费最严重的一天**:3700万token,根因是Chrome CDP崩溃重试+ClawHub限速重试
## 优化建议库
- 浏览器快照用 `compact=true`
- 同一操作失败3次立即换方案
- 限速不硬等,跳转做其他事
- 后台进程不轮询
## 数据存储
- 日志:`~/.qclaw/agents/main/sessions/*.jsonl`
- 统计:`memory/token-usage-YYYY-MM-DD.json`
FILE:scripts/token_stats.py
#!/usr/bin/env python3
"""Token Monitor - 监控 AI Token 消耗"""
import json
import os
import sys
import glob
import argparse
from datetime import datetime, timedelta
from pathlib import Path
WORKSPACE = os.path.expanduser("~/.qclaw/workspace")
SESSIONS_DIR = os.path.expanduser("~/.qclaw/agents/main/sessions")
MEMORY_DIR = os.path.expanduser("~/.qclaw/workspace/memory")
# 异常阈值
ANOMALY_THRESHOLDS = {
"daily_total": 10_000_000, # 1000万/天
"hourly": 2_000_000, # 200万/小时
"single_call": 500_000, # 50万/单次调用
}
def parse_timestamp(ts_str):
"""解析 ISO 格式时间戳"""
try:
# 处理带毫秒的时间戳
if '.' in ts_str:
ts_str = ts_str.split('.')[0] + '.' + ts_str.split('.')[1][:6]
return datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
return datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
except:
return None
def parse_all_sessions_for_date(date_str):
"""解析所有 session 文件中指定日期的数据"""
target_date = datetime.strptime(date_str, "%Y-%m-%d").date()
tokens_in = 0
tokens_out = 0
total_tokens = 0
message_count = 0
files_checked = 0
pattern = os.path.join(SESSIONS_DIR, "*.jsonl")
all_files = glob.glob(pattern)
for filepath in all_files:
files_checked += 1
try:
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
if msg.get("type") != "message":
continue
# 检查时间戳
ts_str = msg.get("timestamp", "")
msg_time = parse_timestamp(ts_str)
if not msg_time:
continue
# 只统计目标日期的消息
if msg_time.date() != target_date:
continue
# 统计 token
message = msg.get("message", {})
usage = message.get("usage", {})
if usage:
tokens_in += usage.get("input", 0)
tokens_out += usage.get("output", 0)
total_tokens += usage.get("totalTokens", 0)
message_count += 1
except (json.JSONDecodeError, ValueError):
continue
except Exception as e:
print(f"Warning: Error reading {filepath}: {e}", file=sys.stderr)
return {
"tokens_in": tokens_in,
"tokens_out": tokens_out,
"total": total_tokens,
"messages": message_count,
"files_checked": files_checked,
}
def format_number(n):
"""格式化数字"""
if n >= 1_000_000:
return f"{n/1_000_000:.2f}M"
elif n >= 1_000:
return f"{n/1_000:.1f}K"
return str(n)
def check_date(date_str):
"""检查指定日期消耗"""
print(f"\n🔍 正在分析 {date_str} 的 Token 消耗...")
stats = parse_all_sessions_for_date(date_str)
if stats["messages"] == 0:
print(f"📭 {date_str} 无消息记录")
return 0
print(f"\n📊 Token 消耗报告 - {date_str}")
print("=" * 50)
print(f"📁 检查文件: {stats['files_checked']}")
print(f"💬 消息数量: {stats['messages']}")
print(f"📥 Input: {format_number(stats['tokens_in']):>10}")
print(f"📤 Output: {format_number(stats['tokens_out']):>10}")
print(f"📊 Total: {format_number(stats['total']):>10}")
print("=" * 50)
# 异常检查
if stats["total"] > ANOMALY_THRESHOLDS["daily_total"]:
print(f"\n🚨 警告:单日消耗超过 {format_number(ANOMALY_THRESHOLDS['daily_total'])}!")
print(" 建议检查是否有重复失败或死循环")
print(" 参考: ~/.qclaw/workspace/memory/2026-04-14.md")
elif stats["total"] > 5_000_000:
print(f"\n⚠️ 提醒:消耗较高,建议检查优化空间")
# 保存统计
save_stats(date_str, stats)
return stats["total"]
def save_stats(date_str, stats):
"""保存统计到 memory"""
os.makedirs(MEMORY_DIR, exist_ok=True)
filepath = os.path.join(MEMORY_DIR, f"token-usage-{date_str}.json")
data = {
"date": date_str,
"tokens_in": stats["tokens_in"],
"tokens_out": stats["tokens_out"],
"total": stats["total"],
"messages": stats["messages"],
"recorded_at": datetime.now().isoformat(),
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"\n💾 统计已保存: memory/token-usage-{date_str}.json")
def generate_report():
"""生成优化报告"""
print("\n📋 Token 优化建议报告")
print("=" * 50)
suggestions = [
("浏览器快照", "使用 compact=true + depth=1", "可减少 50-80% token"),
("失败重试", "同一操作失败3次立即换方案", "避免死循环浪费"),
("限速等待", "不设后台轮询,跳转做其他事", "节省等待期 token"),
("后台进程", "设好 delay 后不 poll", "减少无效检查"),
("长文本", "分段处理,避免单次超大输入", "控制单次调用"),
]
for title, action, effect in suggestions:
print(f"\n✅ {title}")
print(f" 做法: {action}")
print(f" 效果: {effect}")
print("\n" + "=" * 50)
print("📚 详细规则: ~/.qclaw/workspace/AGENTS.md")
print("📊 历史统计: memory/token-usage-YYYY-MM-DD.json")
def show_history(days=7):
"""显示最近N天历史"""
print(f"\n📈 最近 {days} 天 Token 消耗趋势")
print("=" * 50)
print(f"{'日期':<12} {'Input':>10} {'Output':>10} {'Total':>10}")
print("-" * 50)
total_in = 0
total_out = 0
total_all = 0
for i in range(days-1, -1, -1):
date = (datetime.now() - timedelta(days=i)).strftime("%Y-%m-%d")
filepath = os.path.join(MEMORY_DIR, f"token-usage-{date}.json")
if os.path.exists(filepath):
with open(filepath, 'r') as f:
data = json.load(f)
print(f"{date:<12} {format_number(data['tokens_in']):>10} {format_number(data['tokens_out']):>10} {format_number(data['total']):>10}")
total_in += data['tokens_in']
total_out += data['tokens_out']
total_all += data['total']
else:
print(f"{date:<12} {'-':>10} {'-':>10} {'-':>10}")
print("-" * 50)
print(f"{'合计':<12} {format_number(total_in):>10} {format_number(total_out):>10} {format_number(total_all):>10}")
def main():
parser = argparse.ArgumentParser(description="Token Monitor - 监控 AI Token 消耗")
parser.add_argument("--today", action="store_true", help="检查今日消耗")
parser.add_argument("--date", type=str, help="检查指定日期 (YYYY-MM-DD)")
parser.add_argument("--report", action="store_true", help="生成优化报告")
parser.add_argument("--history", type=int, metavar="N", help="显示最近N天历史")
args = parser.parse_args()
if args.report:
generate_report()
elif args.history:
show_history(args.history)
elif args.date:
check_date(args.date)
else:
# 默认检查今日
today = datetime.now().strftime("%Y-%m-%d")
check_date(today)
if __name__ == "__main__":
main()
图片水印工具。为图片添加文字或Logo水印,防止内容被搬运。 支持批量处理、九宫格定位、透明度调节。 当用户说"水印"、"图片水印"、"添加水印"、"防伪"、"版权"时触发。 Keywords: 图片水印, 添加水印, 防搬运, 版权保护, watermark, image.
---
name: cn-image-watermark
description: |
图片水印工具。为图片添加文字或Logo水印,防止内容被搬运。
支持批量处理、九宫格定位、透明度调节。
当用户说"水印"、"图片水印"、"添加水印"、"防伪"、"版权"时触发。
Keywords: 图片水印, 添加水印, 防搬运, 版权保护, watermark, image.
metadata: {"openclaw": {"emoji": "🖼️"}}
---
# cn-image-watermark - 图片水印工具
为图片添加文字或图片水印,防止搬运。
## 核心功能
- **文字水印**:叠加文字(半透明、位置可调)
- **图片水印**:叠加Logo/小图标
- **批量处理**:对整个文件夹批量加水印
- **位置控制**:九宫格位置(9个锚点)
- **透明度**:0-100可调
## 使用场景
- 知识付费内容防搬运(水印",禁止转载")
- 社交媒体图片品牌标识
- 封面图添加版权信息
- 批量处理产品图片
## 输出格式
```json
{
"input": "photo.jpg",
"output": "photo_watermarked.jpg",
"type": "text",
"position": "右下",
"status": "ok"
}
```
## 使用方式
```bash
# 文字水印
python ~/.qclaw/skills/cn-image-watermark/watermark.py text "photo.jpg" "©养虾记" --position bottom-right
# Logo水印
python ~/.qclaw/skills/cn-image-watermark/watermark.py image "photo.jpg" "logo.png" --position bottom-right --opacity 60
# 批量处理
python ~/.qclaw/skills/cn-image-watermark/watermark.py batch "./photos" "output" "©养虾记"
```
## 依赖
- Python3
- Pillow(PIL):`pip3 install Pillow`
## 标签
cn, watermark, image, photo, protection
FILE:watermark.py
#!/usr/bin/env python3
"""图片水印工具 - 文字/图片水印,支持批量"""
import sys
import os
import json
import argparse
from pathlib import Path
try:
from PIL import Image, ImageDraw, ImageFont, ImageEnhance
except ImportError:
print("❌ 缺少 Pillow,请运行: pip3 install Pillow")
sys.exit(1)
POSITIONS = {
"top-left": (0.05, 0.05),
"top-center": (0.5, 0.05),
"top-right": (0.90, 0.05),
"center-left": (0.05, 0.5),
"center": (0.5, 0.5),
"center-right": (0.90, 0.5),
"bottom-left": (0.05, 0.90),
"bottom-center":(0.5, 0.90),
"bottom-right": (0.90, 0.90),
}
def get_font(size=20):
"""获取中文字体,优先系统字体"""
font_paths = [
"/System/Library/Fonts/PingFang.ttc",
"/System/Library/Fonts/STHeiti Light.ttc",
"/System/Library/Fonts/Hiragino Sans GB.ttc",
"/System/Library/Fonts/STSong.ttf",
"/System/Library/Fonts/Arial Unicode.ttf",
]
for fp in font_paths:
if os.path.exists(fp):
try:
return ImageFont.truetype(fp, size)
except Exception:
pass
return ImageFont.load_default()
def add_text_watermark(input_path, output_path, text, position="bottom-right",
opacity=60, font_size=20, color=(255,255,255)):
"""添加文字水印"""
try:
img = Image.open(input_path).convert("RGBA")
overlay = Image.new("RGBA", img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(overlay)
font = get_font(font_size)
# 计算文字尺寸
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
# 文字位置
rx, ry = POSITIONS.get(position, (0.90, 0.90))
margin = 20
x = int(img.width * rx - tw - margin if "right" in position else
img.width * rx - tw // 2 if "center" in position else margin)
y = int(img.height * ry - th - margin if "bottom" in position else
img.height * ry - th // 2 if "center" in position else margin)
alpha = int(255 * opacity / 100)
text_color = color + (alpha,)
draw.text((x, y), text, fill=text_color, font=font)
watermarked = Image.alpha_composite(img, overlay)
final = watermarked.convert("RGB")
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
final.save(output_path, "JPEG", quality=95)
return True
except Exception as e:
print(f"❌ 处理失败: {e}", file=sys.stderr)
return False
def add_image_watermark(input_path, output_path, logo_path, position="bottom-right", opacity=60):
"""添加图片水印(Logo)"""
try:
img = Image.open(input_path).convert("RGBA")
logo = Image.open(logo_path).convert("RGBA")
# 缩放Logo为原图的10%
ratio = 0.10
new_w = int(img.width * ratio)
new_h = int(logo.height * (new_w / logo.width))
logo = logo.resize((new_w, new_h), Image.LANCZOS)
# 透明度
if opacity < 100:
enhancer = ImageEnhance.Brightness(logo)
logo = enhancer.enhance(opacity / 100)
rx, ry = POSITIONS.get(position, (0.90, 0.90))
x = int(img.width * rx - new_w - 20)
y = int(img.height * ry - new_h - 20)
# 创建透明画布,贴Logo
layer = Image.new("RGBA", img.size, (0, 0, 0, 0))
layer.paste(logo, (x, y), logo)
watermarked = Image.alpha_composite(img, layer).convert("RGB")
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
watermarked.save(output_path, "JPEG", quality=95)
return True
except Exception as e:
print(f"❌ 处理失败: {e}", file=sys.stderr)
return False
def batch_process(input_dir, output_dir, text=None, logo_path=None,
position="bottom-right", opacity=60):
"""批量处理"""
os.makedirs(output_dir, exist_ok=True)
exts = {".jpg", ".jpeg", ".png", ".webp", ".bmp"}
results = []
for fp in Path(input_dir).rglob("*"):
if fp.suffix.lower() in exts:
out = os.path.join(output_dir, fp.name)
ok = False
if text:
ok = add_text_watermark(str(fp), out, text, position, opacity)
elif logo_path:
ok = add_image_watermark(str(fp), out, logo_path, position, opacity)
results.append({"file": str(fp), "output": out, "status": "ok" if ok else "failed"})
return results
def main():
p = argparse.ArgumentParser(description="图片水印工具")
sub = p.add_subparsers(dest="cmd", required=True)
# text subcommand
t = sub.add_parser("text", help="文字水印")
t.add_argument("input", help="输入图片")
t.add_argument("text", help="水印文字")
t.add_argument("--output", "-o", help="输出路径(默认添加_watermarked后缀)")
t.add_argument("--position", default="bottom-right",
choices=list(POSITIONS.keys()), help="位置")
t.add_argument("--opacity", type=int, default=60, help="透明度0-100")
t.add_argument("--font-size", type=int, default=20, help="字体大小")
t.add_argument("--color", default="255,255,255", help="颜色RGB")
# image subcommand
i = sub.add_parser("image", help="图片水印")
i.add_argument("input", help="输入图片")
i.add_argument("logo", help="水印图片(Logo)")
i.add_argument("--output", "-o")
i.add_argument("--position", default="bottom-right", choices=list(POSITIONS.keys()))
i.add_argument("--opacity", type=int, default=60)
# batch subcommand
b = sub.add_parser("batch", help="批量处理")
b.add_argument("input_dir", help="输入文件夹")
b.add_argument("output_dir", help="输出文件夹")
b.add_argument("--text", help="文字水印")
b.add_argument("--logo", help="Logo水印")
b.add_argument("--position", default="bottom-right", choices=list(POSITIONS.keys()))
b.add_argument("--opacity", type=int, default=60)
args = p.parse_args()
def get_output(input_path, user_output):
if user_output:
return user_output
p = Path(input_path)
return str(p.parent / f"{p.stem}_watermarked{p.suffix}")
result = None
if args.cmd == "text":
color = tuple(int(x) for x in args.color.split(","))
out = get_output(args.input, args.output)
ok = add_text_watermark(args.input, out, args.text, args.position,
args.opacity, args.font_size, color)
result = {"cmd": "text", "input": args.input, "output": out,
"text": args.text, "position": args.position,
"status": "ok" if ok else "failed"}
elif args.cmd == "image":
out = get_output(args.input, args.output)
ok = add_image_watermark(args.input, out, args.logo, args.position, args.opacity)
result = {"cmd": "image", "input": args.input, "output": out,
"logo": args.logo, "position": args.position, "status": "ok" if ok else "failed"}
elif args.cmd == "batch":
logo = args.logo if hasattr(args, "logo") and args.logo else None
results = batch_process(args.input_dir, args.output_dir,
args.text, logo, args.position, args.opacity)
result = {"cmd": "batch", "total": len(results), "details": results}
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0 if result["status"] == "ok" else 1)
if __name__ == "__main__":
main()
FILE:scripts/image_watermark.py
#!/usr/bin/env python3
"""cn-image-watermark - 图片水印工具"""
import sys
def add_text_watermark(image_path, text, position='右下', opacity=128):
"""添加文字水印
使用PIL库添加文字水印到图片
Args:
image_path: 图片路径
text: 水印文字
position: 位置(右下/左下/右上/左上/居中)
opacity: 透明度 0-255
"""
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
return {"success": False, "error": "需要安装Pillow: pip install Pillow"}
try:
img = Image.open(image_path)
draw = ImageDraw.Draw(img)
w, h = img.size
# 字体大小
font_size = max(20, min(w, h) // 20)
try:
font = ImageFont.truetype("/System/Library/Fonts/PingFang.ttc", font_size)
except:
font = ImageFont.load_default()
# 水印位置
text_w = font_size * len(text) * 0.6
text_h = font_size * 1.2
positions = {
'右下': (w - text_w - 10, h - text_h - 10),
'左下': (10, h - text_h - 10),
'右上': (w - text_w - 10, 10),
'左上': (10, 10),
'居中': ((w - text_w) / 2, (h - text_h) / 2),
}
pos = positions.get(position, positions['右下'])
# 绘制水印
draw.text(pos, text, fill=(255, 255, 255, opacity), font=font)
output = image_path.rsplit('.', 1)[0] + '_watermarked.png'
img.save(output)
return {"success": True, "output": output}
except Exception as e:
return {"success": False, "error": str(e)}
if __name__ == '__main__':
if len(sys.argv) < 3:
print("用法: python image_watermark.py <图片路径> <水印文字> [位置]")
else:
pos = sys.argv[3] if len(sys.argv) > 3 else '右下'
result = add_text_watermark(sys.argv[1], sys.argv[2], pos)
print(result)
JSON工具箱。格式化、比对、提取、压缩、验证JSON。 中文优先,无需API Key,开箱即用。 当用户说"格式化JSON"、"JSON比对"、"JSON提取"、"JSON压缩"、"JSON验证"时触发。 Keywords: JSON格式化, JSON比对, JSON提取, JSON压缩, json, diff.
---
name: cn-json-tools
description: |
JSON工具箱。格式化、比对、提取、压缩、验证JSON。
中文优先,无需API Key,开箱即用。
当用户说"格式化JSON"、"JSON比对"、"JSON提取"、"JSON压缩"、"JSON验证"时触发。
Keywords: JSON格式化, JSON比对, JSON提取, JSON压缩, json, diff.
metadata: {"openclaw": {"emoji": "📋"}}
---
# cn-json-tools - JSON工具箱
JSON格式化、比对、提取、压缩。
## 核心功能
- **format**:格式化JSON(缩进美化)
- **diff**:比对两个JSON文件的差异
- **extract**:从JSON中提取指定路径字段
- **minify**:压缩JSON(去除空白)
- **validate**:验证JSON格式是否合法
## 使用场景
- 调试API返回数据
- 对比两个配置文件差异
- 从复杂JSON中提取关键字段
- 压缩JSON用于配置文件
## 使用方式
```bash
# 格式化
python ~/.qclaw/skills/cn-json-tools/json_tool.py format "{\"a\":1,\"b\":2}"
# 比对
python ~/.qclaw/skills/cn-json-tools/json_tool.py diff file1.json file2.json
# 提取
python ~/.qclaw/skills/cn-json-tools/json_tool.py extract "{\"user\":{\"name\":\"张三\"}}" "user.name"
# 压缩
python ~/.qclaw/skills/cn-json-tools/json_tool.py minify "{\"a\":1,\"b\":2}"
# 验证
python ~/.qclaw/skills/cn-json-tools/json_tool.py validate "{\"a\":1}"
```
## 依赖
- Python3(标准库,无需安装任何包)
## 标签
cn, json, tools, format, diff, formatter
FILE:json_tool.py
#!/usr/bin/env python3
"""JSON工具箱 - 格式化/比对/提取/压缩/验证"""
import sys
import os
import json
import argparse
def cmd_format(text):
"""格式化JSON"""
try:
data = json.loads(text)
print(json.dumps(data, ensure_ascii=False, indent=2))
return True
except json.JSONDecodeError as e:
print(f"❌ JSON格式错误: {e}", file=sys.stderr)
return False
def cmd_diff(file1, file2):
"""比对两个JSON文件"""
try:
with open(file1) as f:
d1 = json.load(f)
with open(file2) as f:
d2 = json.load(f)
except FileNotFoundError as e:
print(f"❌ 文件未找到: {e}", file=sys.stderr)
return False
except json.JSONDecodeError as e:
print(f"❌ JSON格式错误: {e}", file=sys.stderr)
return False
# 简单diff:检查键差异
only1 = set(get_keys(d1)) - set(get_keys(d2))
only2 = set(get_keys(d2)) - set(get_keys(d1))
all_keys = set(get_keys(d1)) & set(get_keys(d2))
changed = {k for k in all_keys
if d1.get(k) != d2.get(k)}
print("文件1独有:", only1 or "无")
print("文件2独有:", only2 or "无")
print("值不同:", changed or "无")
return True
def get_keys(obj, prefix=""):
"""递归获取所有键路径"""
if isinstance(obj, dict):
result = []
for k, v in obj.items():
path = f"{prefix}.{k}" if prefix else k
result.append(path)
result.extend(get_keys(v, path))
return result
elif isinstance(obj, list):
return [f"{prefix}[i]" for i in range(len(obj))]
return []
def cmd_extract(text, path):
"""从JSON提取指定路径"""
try:
data = json.loads(text)
except json.JSONDecodeError as e:
print(f"❌ JSON格式错误: {e}", file=sys.stderr)
return False
parts = path.split(".")
for part in parts:
if isinstance(data, dict):
data = data.get(part, {})
else:
data = None
break
if data is None:
print(f"❌ 路径 '{path}' 不存在", file=sys.stderr)
return False
print(json.dumps(data, ensure_ascii=False, indent=2))
return True
def cmd_minify(text):
"""压缩JSON"""
try:
data = json.loads(text)
print(json.dumps(data, ensure_ascii=False, separators=(',', ':')))
return True
except json.JSONDecodeError as e:
print(f"❌ JSON格式错误: {e}", file=sys.stderr)
return False
def cmd_validate(text):
"""验证JSON"""
try:
json.loads(text)
print("✅ 有效的JSON")
return True
except json.JSONDecodeError as e:
print(f"❌ JSON格式错误: {e}", file=sys.stderr)
return False
def main():
if len(sys.argv) < 2:
print("用法:")
print(" python json_tool.py format <json>")
print(" python json_tool.py validate <json>")
print(" python json_tool.py minify <json>")
print(" python json_tool.py extract <json> <path>")
print(" python json_tool.py diff <file1.json> <file2.json>")
print(" echo '<json>' | python json_tool.py format")
sys.exit(1)
cmd = sys.argv[1]
# diff 需要两个文件路径
if cmd == "diff":
if len(sys.argv) < 4:
print("用法: python json_tool.py diff <file1.json> <file2.json>", file=sys.stderr)
sys.exit(1)
sys.exit(0 if cmd_diff(sys.argv[2], sys.argv[3]) else 1)
# extract 需要路径参数
if cmd == "extract":
if len(sys.argv) < 4:
print("用法: python json_tool.py extract <json> <path>", file=sys.stderr)
sys.exit(1)
json_text = sys.argv[2]
path = sys.argv[3]
else:
json_text = sys.argv[2] if len(sys.argv) > 2 else ""
# 从stdin读取
if not json_text and not sys.stdin.isatty():
json_text = sys.stdin.read().strip()
if not json_text:
print(f"用法: python json_tool.py {cmd} '<JSON字符串或文件路径>'", file=sys.stderr)
sys.exit(1)
# 如果是文件路径,读取文件
if os.path.isfile(json_text):
with open(json_text) as f:
json_text = f.read().strip()
if cmd == "format":
sys.exit(0 if cmd_format(json_text) else 1)
elif cmd == "minify":
sys.exit(0 if cmd_minify(json_text) else 1)
elif cmd == "validate":
sys.exit(0 if cmd_validate(json_text) else 1)
elif cmd == "extract":
sys.exit(0 if cmd_extract(json_text, path) else 1)
else:
print(f"未知命令: {cmd}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
FILE:scripts/json_tools.py
#!/usr/bin/env python3
"""cn-json-tools - JSON工具箱"""
import json, sys
def format_json(text, indent=2):
"""格式化JSON"""
try:
obj = json.loads(text)
return json.dumps(obj, indent=indent, ensure_ascii=False)
except json.JSONDecodeError as e:
return f"JSON解析错误: {e}"
def minify_json(text):
"""压缩JSON"""
try:
obj = json.loads(text)
return json.dumps(obj, separators=(',', ':'))
except json.JSONDecodeError as e:
return f"JSON解析错误: {e}"
def validate_json(text):
"""验证JSON"""
try:
json.loads(text)
return {"valid": True}
except json.JSONDecodeError as e:
return {"valid": False, "error": str(e)}
def extract_keys(text, path=None):
"""提取JSON中的值"""
try:
obj = json.loads(text)
if path:
for key in path.split('.'):
obj = obj[key]
return json.dumps(obj, indent=2, ensure_ascii=False)
except Exception as e:
return f"提取失败: {e}"
if __name__ == '__main__':
text = sys.stdin.read() if not sys.stdin.isatty() else sys.argv[1] if len(sys.argv) > 1 else '{"key": "value"}'
print(format_json(text))
二维码生成器。输入URL或文本,生成PNG二维码,支持自定义颜色和尺寸。
---
name: cn-qrcode-generator
description: "二维码生成器。输入URL或文本,生成PNG二维码,支持自定义颜色和尺寸。"
metadata: {"openclaw": {"emoji": "📱"}}
---
# 二维码生成器
生成PNG二维码,支持自定义颜色和尺寸。
## 功能
- 输入URL/文本,生成二维码
- 自定义颜色(前景/背景)
- 自定义尺寸(默认300px)
- 支持PNG格式
## 用法
```bash
python3 generate.py "https://example.com" --output qr.png --size 300
```
## 依赖
- Python3
- requests
FILE:generate.py
#!/usr/bin/env python3
"""中文二维码生成器 - Google Chart API(qrserver.com)"""
import sys
import os
import json
import argparse
import urllib.parse
import subprocess
DEFAULT_OUTPUT = os.path.expanduser("~/Downloads/qrcode.png")
def generate_qr(text, output=DEFAULT_OUTPUT, size=300,
fg_color="000000", bg_color="FFFFFF"):
"""生成二维码图片(Google Chart API,国内外均稳定)"""
if not text.startswith("http"):
text = "https://" + text
# Google Chart API (qrserver.com) - 全球最稳定的免费QR API
encoded = urllib.parse.quote(text)
url = (
f"https://api.qrserver.com/v1/create-qr-code/"
f"?size={size}x{size}"
f"&data={encoded}"
f"&color={fg_color}"
f"&bgcolor={bg_color}"
f"&margin=1"
)
os.makedirs(os.path.dirname(output) or ".", exist_ok=True)
try:
cmd = ["curl", "-s", "--max-time", "15", "-L", "-o", output, url]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=18)
if os.path.exists(output) and os.path.getsize(output) > 100:
return {"text": text, "file": os.path.abspath(output),
"size": size, "api": "qrserver.com", "status": "ok"}
else:
return {"text": text, "file": None, "error": "空响应或文件过小",
"status": "failed", "curl_stderr": r.stderr[:200]}
except Exception as e:
return {"text": text, "file": None, "error": str(e), "status": "failed"}
if __name__ == "__main__":
p = argparse.ArgumentParser(description="二维码生成器")
p.add_argument("text", help="二维码内容(URL或文本)")
p.add_argument("--output", "-o", default=DEFAULT_OUTPUT, help="输出文件路径")
p.add_argument("--size", "-s", type=int, default=300, help="二维码尺寸(像素)")
p.add_argument("--fg", default="000000", help="前景色HEX")
p.add_argument("--bg", default="FFFFFF", help="背景色HEX")
args = p.parse_args()
result = generate_qr(args.text, args.output, args.size, args.fg, args.bg)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0 if result["status"] == "ok" else 1)
FILE:scripts/qrcode_generator.py
#!/usr/bin/env python3
"""cn-qrcode-generator - 二维码生成工具"""
import sys, os, base64, ssl, urllib.request, urllib.parse, certifi
def generate_qrcode(text, size=300, margin=4, format='png'):
"""生成二维码
使用qrserver.com API,无需安装依赖
SSL双层降级:优先 certifi 标准验证,失败后回退
"""
encoded_text = urllib.parse.quote(text)
url = f"https://api.qrserver.com/v1/create-qr-code/?size={size}x{size}&margin={margin}&format={format}&data={encoded_text}"
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
}
# 方案1:certifi 标准SSL验证
try:
ctx = ssl.create_default_context()
ctx.load_verify_locations(certifi.where())
req = urllib.request.Request(url, headers=headers)
response = urllib.request.urlopen(req, timeout=15, context=ctx)
data = response.read()
return {
"success": True,
"data": f"data:image/{format};base64,{base64.b64encode(data).decode()}",
"url": url
}
except Exception as e:
error_str = str(e)
if 'SSL' not in error_str and 'certificate' not in error_str:
return {"success": False, "error": error_str}
# SSL错误,降级到certifi回退
pass
# 方案2:certifi 回退(再试一次,确保失败才放弃)
try:
fallback_ctx = ssl.create_default_context()
fallback_ctx.load_verify_locations(certifi.where())
req = urllib.request.Request(url, headers=headers)
response = urllib.request.urlopen(req, timeout=15, context=fallback_ctx)
data = response.read()
return {
"success": True,
"data": f"data:image/{format};base64,{base64.b64encode(data).decode()}",
"url": url
}
except Exception as e:
return {"success": False, "error": str(e)}
if __name__ == '__main__':
text = sys.argv[1] if len(sys.argv) > 1 else "https://example.com"
result = generate_qrcode(text)
if result["success"]:
print(f"✅ 二维码生成成功!")
print(f" Base64长度: {len(result['data'])} 字符")
else:
print(f"❌ 生成失败: {result['error']}")
A股每日简报生成器。基于东方财富免费公开数据,一键生成大盘指数、板块涨跌、涨跌幅榜等简报。 中文优先,无需API Key,开箱即用。 当用户说"A股"、"今日行情"、"大盘指数"、"股票"、"今日A股"时触发。 Keywords: A股, 股票, 行情, 大盘, 涨跌, 东方财富, 简报.
---
name: cn-stock-brief
description: |
A股每日简报生成器。基于东方财富免费公开数据,一键生成大盘指数、板块涨跌、涨跌幅榜等简报。
中文优先,无需API Key,开箱即用。
当用户说"A股"、"今日行情"、"大盘指数"、"股票"、"今日A股"时触发。
Keywords: A股, 股票, 行情, 大盘, 涨跌, 东方财富, 简报.
metadata: {"openclaw": {"emoji": "📈"}}
---
# CN Stock Brief - A股每日简报
A股市场每日简报生成器,基于东方财富免费公开数据。
## 功能
- 📈 大盘指数:上证、深证、创业板、科创50
- 🏭 板块涨跌:TOP20 行业板块涨跌幅
- 🚀 涨幅榜:A股涨幅TOP10
- 📉 跌幅榜:A股跌幅TOP10
- 📊 多种输出格式:文本简报 / JSON
## 使用
### 每日简报(默认)
```bash
python3 ~/.qclaw/skills/cn-stock-brief/scripts/stock_brief.py
```
### JSON 格式(供程序处理)
```bash
python3 ~/.qclaw/skills/cn-stock-brief/scripts/stock_brief.py --json
```
### 仅指数数据
```bash
python3 ~/.qclaw/skills/cn-stock-brief/scripts/stock_brief.py --indices
```
### 仅板块数据
```bash
python3 ~/.qclaw/skills/cn-stock-brief/scripts/stock_brief.py --sectors
```
## 数据源
- 东方财富开放API(无需API Key,免费)
- 沪深A股实时行情
- 行业板块分类
## 依赖
- Python 3(系统自带)
- 无第三方依赖
## 注意
- ⚠️ 数据仅供参考,不构成投资建议
- 交易日才有实时数据,非交易日显示上一交易日数据
## 示例输出
```
📊 A股每日简报 | 2026-04-14 周二
========================================
📈 大盘指数
🟢 上证指数: 3285.50 (+1.23%)
🔴 深证成指: 10234.60 (-0.45%)
🟢 创业板指: 2034.80 (+0.67%)
🟢 科创50: 987.20 (+1.05%)
🏭 板块涨跌 TOP10
1. 半导体: +3.25%
2. 新能源: +2.18%
...
🚀 涨幅榜 TOP10
1. 中芯国际(688981): 85.60 +10.02%
...
📉 跌幅榜 TOP10
1. 某某股份(000001): 5.20 -8.50%
...
========================================
数据来源:东方财富 | 仅供参考,不构成投资建议
```
FILE:scripts/stock_brief.py
#!/usr/bin/env python3
"""A股每日简报生成器 - 使用免费公开数据源"""
import json
import sys
import os
import urllib.request
import urllib.error
import ssl
from datetime import datetime, timedelta
WORKSPACE = os.path.expanduser("~/.qclaw/workspace")
def fetch_json(url, retries=2):
"""带SSL降级的JSON获取"""
for attempt in range(retries + 1):
try:
ctx = ssl.create_default_context()
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=10, context=ctx) as resp:
return json.loads(resp.read().decode("utf-8"))
except (ssl.SSLError, urllib.error.URLError) as e:
if attempt < retries and "SSL" in str(e):
ctx = ssl.create_default_context()
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=10, context=ctx) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception:
continue
return None
except Exception:
return None
return None
def get_market_indices():
"""获取主要指数数据"""
indices = []
codes = {
"000001": "上证指数",
"399001": "深证成指",
"399006": "创业板指",
"000688": "科创50",
}
for code, name in codes.items():
url = f"https://push2.eastmoney.com/api/qt/stock/get?secid=1.{code}&fields=f43,f44,f45,f46,f47,f170&ut=fa5fd1943c7b386f172d6893dbfba10b"
data = fetch_json(url)
if data and data.get("data"):
d = data["data"]
price = d.get("f43", 0) / 100
change_pct = d.get("f170", 0) / 100
change_amt = d.get("f169", 0) / 100
vol = d.get("f47", 0)
amt = d.get("f48", 0)
indices.append({
"name": name,
"price": round(price, 2),
"change_pct": round(change_pct, 2),
"change_amt": round(change_amt, 2),
"volume": vol,
})
return indices
def get_top_gainers(limit=10):
"""获取涨幅排行"""
url = f"https://push2.eastmoney.com/api/qt/clist/get?pn=1&pz={limit}&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&fid=f3&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,m:0+t:81+s:2048&fields=f2,f3,f12,f14"
data = fetch_json(url)
results = []
if data and data.get("data") and data["data"].get("diff"):
for item in data["data"]["diff"][:limit]:
results.append({
"code": item.get("f12", ""),
"name": item.get("f14", ""),
"price": item.get("f2", 0),
"change_pct": item.get("f3", 0),
})
return results
def get_top_losers(limit=10):
"""获取跌幅排行"""
url = f"https://push2.eastmoney.com/api/qt/clist/get?pn=1&pz={limit}&po=0&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&fid=f3&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,m:0+t:81+s:2048&fields=f2,f3,f12,f14"
data = fetch_json(url)
results = []
if data and data.get("data") and data["data"].get("diff"):
for item in data["data"]["diff"][:limit]:
results.append({
"code": item.get("f12", ""),
"name": item.get("f14", ""),
"price": item.get("f2", 0),
"change_pct": item.get("f3", 0),
})
return results
def get_sector_performance():
"""获取板块涨跌"""
url = "https://push2.eastmoney.com/api/qt/clist/get?pn=1&pz=20&po=1&np=1&ut=bd1d9ddb04089700cf9c27f6f7426281&fltt=2&invt=2&fid=f3&fs=m:90+t:2&fields=f2,f3,f14"
data = fetch_json(url)
results = []
if data and data.get("data") and data["data"].get("diff"):
for item in data["data"]["diff"][:20]:
results.append({
"name": item.get("f14", ""),
"change_pct": item.get("f3", 0),
})
return results
def format_brief(indices, gainers, losers, sectors):
"""格式化简报"""
today = datetime.now().strftime("%Y-%m-%d")
weekday = ["周一","周二","周三","周四","周五","周六","周日"][datetime.now().weekday()]
lines = [f"📊 A股每日简报 | {today} {weekday}", "=" * 40]
# 指数
lines.append("\n📈 大盘指数")
for idx in indices:
arrow = "🟢" if idx["change_pct"] >= 0 else "🔴"
sign = "+" if idx["change_pct"] >= 0 else ""
lines.append(f" {arrow} {idx['name']}: {idx['price']:.2f} ({sign}{idx['change_pct']:.2f}%)")
# 板块
if sectors:
lines.append("\n🏭 板块涨跌 TOP10")
for i, s in enumerate(sectors[:10], 1):
sign = "+" if s["change_pct"] >= 0 else ""
lines.append(f" {i}. {s['name']}: {sign}{s['change_pct']:.2f}%")
# 涨幅榜
if gainers:
lines.append("\n🚀 涨幅榜 TOP10")
for i, g in enumerate(gainers[:10], 1):
lines.append(f" {i}. {g['name']}({g['code']}): {g['price']:.2f} +{g['change_pct']:.2f}%")
# 跌幅榜
if losers:
lines.append("\n📉 跌幅榜 TOP10")
for i, l in enumerate(losers[:10], 1):
lines.append(f" {i}. {l['name']}({l['code']}): {l['price']:.2f} {l['change_pct']:.2f}%")
lines.append("\n" + "=" * 40)
lines.append("数据来源:东方财富 | 仅供参考,不构成投资建议")
return "\n".join(lines)
def main():
args = sys.argv[1:]
mode = "brief" # default
if "--json" in args:
mode = "json"
elif "--indices" in args:
mode = "indices"
elif "--sectors" in args:
mode = "sectors"
print("🔍 正在获取A股数据...")
indices = get_market_indices()
if mode == "indices":
print(json.dumps(indices, ensure_ascii=False, indent=2))
return
if mode == "sectors":
sectors = get_sector_performance()
print(json.dumps(sectors, ensure_ascii=False, indent=2))
return
gainers = get_top_gainers(10)
losers = get_top_losers(10)
sectors = get_sector_performance()
if mode == "json":
result = {"indices": indices, "gainers": gainers, "losers": losers, "sectors": sectors}
print(json.dumps(result, ensure_ascii=False, indent=2))
return
# 默认:格式化简报
print(format_brief(indices, gainers, losers, sectors))
if __name__ == "__main__":
main()
AI Token消耗监控优化工具。读取会话日志,统计Token消耗,检测异常模式(短时激增、重复失败等),提供优化建议,生成消耗趋势报告。 中文优先,面向QClaw/OpenClaw用户。 当用户说"Token消耗"、"费用多少"、"Token统计"、"超支"、"优化建议"时触发。 Keywords: Token...
---
name: qclaw-token-monitor
description: |
AI Token消耗监控优化工具。读取会话日志,统计Token消耗,检测异常模式(短时激增、重复失败等),提供优化建议,生成消耗趋势报告。
中文优先,面向QClaw/OpenClaw用户。
当用户说"Token消耗"、"费用多少"、"Token统计"、"超支"、"优化建议"时触发。
Keywords: Token, 消耗, 费用, 监控, 优化, 趋势, 统计, 异常检测.
metadata: {"openclaw": {"emoji": "📊"}}
---
# Token消耗监控优化
监控 AI Token 消耗,检测异常,提供优化建议。
## 功能
- 📊 读取 session 日志统计 Token 消耗
- 🚨 检测异常消耗(短时激增、重复失败等)
- 💡 给出优化建议
- 📈 生成消耗趋势报告
## 使用
### 检查今日消耗
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --today
```
### 检查指定日期
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --date 2026-04-14
```
### 检查异常模式
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --check-anomaly
```
### 生成优化报告
```bash
python3 ~/.qclaw/skills/token-monitor/scripts/token_stats.py --report
```
## 异常检测规则
| 规则 | 阈值 | 说明 |
|------|------|------|
| 单日总量 | >1000万 | 严重超标 |
| 单小时 | >200万 | 需要检查 |
| 重复失败 | >3次 | 死循环风险 |
| 浏览器快照 | 单次>5000字 | 未压缩 |
## 历史数据参考
| 日期 | Input | Output | Total | 状态 |
|------|-------|--------|-------|------|
| 4/11 | 3.46M | 117K | 28.8M | 🚨 超标 |
| 4/12 | 5.57M | 127K | 37.0M | 🚨 严重超标 |
| 4/13 | 1.80M | 22K | 9.5M | ⚠️ 偏高 |
| 4/14 | 430K | 20K | 3.7M | ✅ 正常 |
**4/12是Token浪费最严重的一天**:3700万token,根因是Chrome CDP崩溃重试+ClawHub限速重试
## 优化建议库
- 浏览器快照用 `compact=true`
- 同一操作失败3次立即换方案
- 限速不硬等,跳转做其他事
- 后台进程不轮询
- Context超过150K时触发LCM压缩
## 数据存储
- 日志:`~/.qclaw/agents/main/sessions/*.jsonl`
- 统计:`memory/token-usage-YYYY-MM-DD.json`
## 依赖
- Python 3(系统自带)
- 无第三方依赖
中文待办事项管理。添加/完成/查看待办,优先级排序。 本地存储,无需账号。 当用户说"待办"、"todo"、"任务清单"、"今天要做什么"时触发。 Keywords: 待办, todo, 任务, 清单, task, checklist.
---
name: cn-todo-tracker
description: |
中文待办事项管理。添加/完成/查看待办,优先级排序。
本地存储,无需账号。
当用户说"待办"、"todo"、"任务清单"、"今天要做什么"时触发。
Keywords: 待办, todo, 任务, 清单, task, checklist.
metadata: {"openclaw": {"emoji": "✅"}}
---
# ✅ CN Todo Tracker — 中文待办事项
管理你的待办,一件件搞定。
## 核心功能
| 功能 | 说明 |
|------|------|
| 添加待办 | 一句话添加,自动识别优先级 |
| 完成标记 | 标记完成,记录完成时间 |
| 列表查看 | 按优先级排序,今日/本周/全部 |
| 统计 | 完成率、待办趋势 |
## 使用方式
```bash
# 添加待办
python3 scripts/todo.py --add "完成周报" --priority high
python3 scripts/todo.py --add "回复客户邮件" --priority medium
python3 scripts/todo.py --add "整理桌面"
# 完成待办
python3 scripts/todo.py --done 1
# 查看待办
python3 scripts/todo.py --list
python3 scripts/todo.py --today
# 统计
python3 scripts/todo.py --stats
```
## 数据存储
本地 JSON:~/.qclaw/workspace/todos.json
FILE:scripts/todo.py
#!/usr/bin/env python3
"""中文待办事项 - cn-todo-tracker"""
import json, os, sys, argparse
from datetime import datetime
WORKSPACE = os.path.expanduser("~/.qclaw/workspace")
DATA_FILE = os.path.join(WORKSPACE, "todos.json")
PRIORITIES = {"high": "🔴", "medium": "🟡", "low": "🟢"}
def load():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"todos": [], "next_id": 1}
def save(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def add_todo(text, priority="medium"):
data = load()
todo = {
"id": data["next_id"],
"text": text,
"priority": priority,
"done": False,
"created": datetime.now().isoformat(),
"completed": None
}
data["todos"].append(todo)
data["next_id"] += 1
save(data)
emoji = PRIORITIES.get(priority, "🟡")
print(f"\n✅ {emoji} [{todo['id']}] {text}")
def done_todo(todo_id):
data = load()
for t in data["todos"]:
if t["id"] == todo_id and not t["done"]:
t["done"] = True
t["completed"] = datetime.now().isoformat()
save(data)
print(f"\n🎉 完成待办 [{todo_id}] {t['text']}")
return
print(f"\n⚠️ 未找到待办 #{todo_id}")
def list_todos(show="all"):
data = load()
todos = data["todos"]
if show == "today":
today = datetime.now().strftime("%Y-%m-%d")
todos = [t for t in todos if t["created"].startswith(today)]
elif show == "pending":
todos = [t for t in todos if not t["done"]]
if not todos:
print("\n📭 没有待办事项")
return
# 按优先级排序
order = {"high": 0, "medium": 1, "low": 2}
todos.sort(key=lambda t: (t["done"], order.get(t["priority"], 1)))
print("\n✅ 待办事项")
print("=" * 40)
for t in todos:
emoji = PRIORITIES.get(t["priority"], "🟡")
status = "☑️" if t["done"] else "☐"
print(f" {status} {emoji} [{t['id']}] {t['text']}")
def stats():
data = load()
todos = data["todos"]
total = len(todos)
done = sum(1 for t in todos if t["done"])
pending = total - done
rate = done / total * 100 if total else 0
bar = "▓" * int(rate / 10) + "░" * (10 - int(rate / 10))
print(f"\n📊 待办统计")
print(f" 总计: {total} | 完成: {done} | 待做: {pending}")
print(f" 完成率: {rate:.0f}% {bar}")
def main():
parser = argparse.ArgumentParser(description="✅ 中文待办事项")
parser.add_argument("--add", help="添加待办")
parser.add_argument("--priority", "-p", default="medium", choices=["high", "medium", "low"])
parser.add_argument("--done", type=int, help="完成待办(ID)")
parser.add_argument("--list", action="store_true", help="全部待办")
parser.add_argument("--today", action="store_true", help="今日待办")
parser.add_argument("--stats", action="store_true", help="统计")
args = parser.parse_args()
if args.add:
add_todo(args.add, args.priority)
elif args.done:
done_todo(args.done)
elif args.today:
list_todos("today")
elif args.stats:
stats()
else:
list_todos("pending")
if __name__ == "__main__":
main()中文饮食记录助手。记录每日饮食、计算热量、营养分析。 本地存储,无需账号。 当用户说"饮食记录"、"今天吃了什么"、"热量计算"、"营养分析"时触发。 Keywords: 饮食, 热量, 卡路里, 营养, diet, food, meal.
---
name: cn-diet-tracker
description: |
中文饮食记录助手。记录每日饮食、计算热量、营养分析。
本地存储,无需账号。
当用户说"饮食记录"、"今天吃了什么"、"热量计算"、"营养分析"时触发。
Keywords: 饮食, 热量, 卡路里, 营养, diet, food, meal.
metadata: {"openclaw": {"emoji": "🥗"}}
---
# 🥗 CN Diet Tracker — 中文饮食记录
记录饮食,关注健康。
## 核心功能
| 功能 | 说明 |
|------|------|
| 记录饮食 | 一句话记录:食物名+估算热量 |
| 热量统计 | 今日/本周摄入 |
| 营养分析 | 碳水/蛋白/脂肪占比 |
| 目标管理 | 设定每日热量目标 |
## 使用方式
```bash
# 记录一餐
python3 scripts/diet.py --add "白米饭一碗" 230 --category 主食
python3 scripts/diet.py --add "番茄炒蛋" 180 --category 菜品
python3 scripts/diet.py --add "苹果" 80 --category 水果
# 今日统计
python3 scripts/diet.py --today
# 设定目标
python3 scripts/diet.py --target 2000
# 周报
python3 scripts/diet.py --week
```
## 数据存储
本地 JSON:~/.qclaw/workspace/diet.json
FILE:scripts/diet.py
#!/usr/bin/env python3
"""中文饮食记录 - cn-diet-tracker"""
import json, os, sys, argparse
from datetime import datetime, timedelta
from collections import defaultdict
WORKSPACE = os.path.expanduser("~/.qclaw/workspace")
DATA_FILE = os.path.join(WORKSPACE, "diet.json")
CATEGORIES = {
"主食": "🍚", "菜品": "🍳", "汤": "🍲", "水果": "🍎",
"饮品": "🥤", "零食": "🍪", "外卖": "🥡", "其他": "📌"
}
def load():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"records": [], "target": 2000}
def save(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def add(food, calories, category="其他", note=""):
data = load()
record = {
"id": len(data["records"]) + 1,
"date": datetime.now().strftime("%Y-%m-%d"),
"datetime": datetime.now().isoformat(),
"food": food,
"calories": float(calories),
"category": category,
"note": note
}
data["records"].append(record)
save(data)
emoji = CATEGORIES.get(category, "📌")
today_total = sum(r["calories"] for r in data["records"] if r["date"] == record["date"])
target = data.get("target", 2000)
pct = today_total / target * 100
status = "🔴超了" if pct > 100 else f"🟢{pct:.0f}%"
print(f"\n✅ {emoji} {food} {calories:.0f}kcal")
print(f" 今日合计: {today_total:.0f}/{target}kcal {status}")
def today_report():
data = load()
today = datetime.now().strftime("%Y-%m-%d")
today_recs = [r for r in data["records"] if r["date"] == today]
total = sum(r["calories"] for r in today_recs)
target = data.get("target", 2000)
print(f"\n🥗 今日饮食({today})")
print("=" * 40)
if not today_recs:
print(" 📭 还没记录")
return
for r in today_recs:
emoji = CATEGORIES.get(r["category"], "📌")
print(f" {emoji} {r['food']} {r['calories']:.0f}kcal")
print(f"\n 合计: {total:.0f}/{target}kcal", end="")
if total > target:
print(" 🔴超了!")
elif total > target * 0.8:
print(" 🟡接近目标")
else:
print(" 🟢正常")
def week_report():
data = load()
now = datetime.now()
start = now - timedelta(days=now.weekday())
week_recs = [r for r in data["records"]
if datetime.fromisoformat(r["datetime"]) >= start]
if not week_recs:
print("\n📭 本周无记录")
return
by_day = defaultdict(float)
for r in week_recs:
by_day[r["date"]] += r["calories"]
target = data.get("target", 2000)
print(f"\n🥗 本周饮食报告")
print("=" * 40)
for date in sorted(by_day.keys()):
cal = by_day[date]
bar = "▓" * int(cal / target * 10)
status = "🔴" if cal > target else "🟢"
print(f" {date} {cal:>7.0f}kcal {bar} {status}")
avg = sum(by_day.values()) / len(by_day)
print(f"\n 日均: {avg:.0f}kcal / 目标: {target}kcal")
def main():
parser = argparse.ArgumentParser(description="🥗 中文饮食记录")
parser.add_argument("--add", nargs="+", help="食物名 热量")
parser.add_argument("--category", "-c", default="其他", help="分类")
parser.add_argument("--note", help="备注")
parser.add_argument("--today", action="store_true", help="今日统计")
parser.add_argument("--week", action="store_true", help="周报")
parser.add_argument("--target", type=float, help="设定每日热量目标")
args = parser.parse_args()
if args.add:
food = args.add[0] if len(args.add) > 1 else "食物"
cal = float(args.add[1] if len(args.add) > 1 else args.add[0])
if len(args.add) == 1:
food = "食物"
add(food, cal, args.category, args.note or "")
elif args.today:
today_report()
elif args.week:
week_report()
elif args.target:
data = load()
data["target"] = args.target
save(data)
print(f"\n✅ 每日热量目标设为 {args.target:.0f}kcal")
else:
today_report()
if __name__ == "__main__":
main()中文新闻简报。每日热点新闻摘要,分类呈现,3分钟了解天下事。 当用户说"新闻"、"今天有什么新闻"、"热点"、"今日要闻"时触发。 Keywords: 新闻, 热点, 要闻, news, 简报, 日报.
---
name: cn-news-brief
description: |
中文新闻简报。每日热点新闻摘要,分类呈现,3分钟了解天下事。
当用户说"新闻"、"今天有什么新闻"、"热点"、"今日要闻"时触发。
Keywords: 新闻, 热点, 要闻, news, 简报, 日报.
metadata: {"openclaw": {"emoji": "📰"}}
---
# 📰 CN News Brief — 中文新闻简报
3分钟了解天下事。
## 核心功能
| 功能 | 说明 |
|------|------|
| 每日简报 | 国内+国际+科技+财经,分类呈现 |
| AI摘要 | 每条新闻一句话总结 |
| 多源聚合 | 央视/新华网/澎湃/36氪/虎嗅 |
| 情绪标注 | 正面/负面/中性,快速判断基调 |
## 使用方式
```bash
# 今日简报
python3 scripts/news_brief.py --today
# 指定分类
python3 scripts/news_brief.py --category 科技
# 输出JSON
python3 scripts/news_brief.py --today --json
# 指定条数
python3 scripts/news_brief.py --today --limit 5
```
## 示例输出
```
📰 今日新闻简报(4月13日)
━━━━━━━━━━━━━━━━━━━━━━
🇨🇳 国内
1. 🔵 多所高校强基计划取消竞赛破格(新华网)
2. 🔵 24岁主播直播晕倒被辞退,已申请仲裁(澎湃)
🌍 国际
3. 🔴 美国3月CPI创两年最大涨幅(路透社)
💻 科技
4. 🟢 OpenClaw发布新版智能体框架(36氪)
5. 🔵 华为自研芯片良率突破(虎嗅)
💰 财经
6. 🔴 伊朗资产解冻引发市场波动(财联社)
```
## 数据源
| 来源 | 类型 | 更新频率 |
|------|------|---------|
| 百度热搜 | 综合 | 实时 |
| 微博热搜 | 社交 | 实时 |
| 36氪 | 科技 | 每日 |
| 虎嗅 | 科技商业 | 每日 |
| 澎湃新闻 | 综合 | 实时 |
本地运行,无需API Key。
FILE:scripts/news_brief.py
#!/usr/bin/env python3
"""
中文新闻简报 - cn-news-brief
多源聚合 + 分类 + AI摘要
"""
import json
import os
import sys
import ssl
import argparse
import urllib.request
from datetime import datetime
from collections import defaultdict
CATEGORIES = {
"国内": ["中国", "国内", "政策", "高考", "教育", "医疗", "社会", "法院", "公安", "政府", "国务院", "部委"],
"国际": ["美国", "国际", "全球", "外交", "战争", "联合国", "欧洲", "亚洲", "中东", "伊朗", "俄"],
"科技": ["AI", "人工智能", "芯片", "互联网", "手机", "科技", "算法", "模型", "OpenAI", "华为", "苹果", "代码", "编程"],
"财经": ["股市", "基金", "经济", "金融", "CPI", "GDP", "央行", "利率", "房价", "市场", "投资", "IPO"],
"娱乐": ["电影", "综艺", "明星", "音乐", "游戏", "剧集", "歌手", "演员", "导演"],
"体育": ["足球", "篮球", "奥运", "比赛", "冠军", "运动员", "NBA", "中超", "羽毛球", "乒乓球"],
}
EMOJI_MAP = {
"国内": "🇨🇳",
"国际": "🌍",
"科技": "💻",
"财经": "💰",
"娱乐": "🎬",
"体育": "⚽",
}
SENTIMENT_POSITIVE = ["突破", "增长", "创新", "成功", "上市", "升级", "发布", "改善", "提升", "首位"]
SENTIMENT_NEGATIVE = ["下降", "危机", "冲突", "灾难", "违规", "处罚", "崩溃", "暴跌", "辞退", "违规"]
def classify(title):
"""分类新闻"""
for cat, keywords in CATEGORIES.items():
for kw in keywords:
if kw.lower() in title.lower():
return cat
return "其他"
def sentiment(title):
"""情绪标注"""
pos = sum(1 for w in SENTIMENT_POSITIVE if w in title)
neg = sum(1 for w in SENTIMENT_NEGATIVE if w in title)
if pos > neg:
return "🟢"
elif neg > pos:
return "🔴"
return "🔵"
def _urlopen(url, timeout=10):
"""双层SSL降级"""
try:
ctx = ssl.create_default_context()
return urllib.request.urlopen(url, timeout=timeout, context=ctx)
except (ssl.SSLError, ssl.SSLCertVerificationError):
ctx = ssl.create_default_context()
return urllib.request.urlopen(url, timeout=timeout, context=ctx)
except urllib.error.URLError as e:
if "SSL" in str(e.reason) or "certificate" in str(e.reason).lower():
ctx = ssl.create_default_context()
return urllib.request.urlopen(url, timeout=timeout, context=ctx)
raise
def fetch_baidu():
"""百度热搜"""
items = []
try:
resp = _urlopen("https://top.baidu.com/api/board?platform=wise&tab=realtime")
data = json.loads(resp.read().decode("utf-8"))
cards = data.get("data", {}).get("cards", [])
for card in cards:
for item in card.get("content", []):
title = item.get("query", item.get("word", ""))
desc = item.get("desc", "")
if title:
items.append({
"title": title,
"desc": desc,
"source": "百度",
"hot": item.get("hotScore", 0),
})
except Exception as e:
print(f"百度: {e}", file=sys.stderr)
return items
def fetch_weibo():
"""微博热搜"""
items = []
try:
req = urllib.request.Request(
"https://weibo.com/ajax/side/hotSearch",
headers={"User-Agent": "Mozilla/5.0", "Referer": "https://weibo.com/"}
)
resp = _urlopen(req)
data = json.loads(resp.read().decode("utf-8"))
for item in data.get("data", {}).get("realtime", []):
title = item.get("note", item.get("word", ""))
if title:
items.append({
"title": title,
"desc": "",
"source": "微博",
"hot": item.get("num", 0),
})
except Exception as e:
print(f"微博: {e}", file=sys.stderr)
return items
def fetch_zhihu():
"""知乎热榜"""
items = []
try:
req = urllib.request.Request(
"https://api.zhihu.com/topstory/hot-lists/total?limit=20",
headers={"User-Agent": "Mozilla/5.0"}
)
resp = _urlopen(req)
data = json.loads(resp.read().decode("utf-8"))
for item in data.get("data", []):
target = item.get("target", {})
title = target.get("title", "")
excerpt = target.get("excerpt", "")
if title:
items.append({
"title": title,
"desc": excerpt,
"source": "知乎",
"hot": item.get("detail_text", "0").replace("万热度", "0000"),
})
except Exception as e:
print(f"知乎: {e}", file=sys.stderr)
return items
def dedup(items):
"""去重"""
seen = set()
result = []
for item in items:
key = item["title"][:15]
if key not in seen:
seen.add(key)
result.append(item)
return result
def format_report(items, category=None, limit=10):
"""格式化报告"""
by_cat = defaultdict(list)
for item in items:
cat = classify(item["title"])
item["category"] = cat
item["sentiment"] = sentiment(item["title"])
by_cat[cat].append(item)
today = datetime.now().strftime("%-m月%-d日")
lines = [f"\n📰 今日新闻简报({today})"]
lines.append("━" * 30)
cats = [category] if category else ["国内", "国际", "科技", "财经", "娱乐", "体育", "其他"]
count = 0
for cat in cats:
if cat not in by_cat:
continue
emoji = EMOJI_MAP.get(cat, "📌")
lines.append(f"\n{emoji} {cat}")
for item in by_cat[cat]:
if count >= limit:
break
count += 1
s = item["sentiment"]
title = item["title"]
source = item["source"]
lines.append(f" {count}. {s} {title}({source})")
if count >= limit:
break
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="📰 中文新闻简报")
parser.add_argument("--today", action="store_true", help="今日简报")
parser.add_argument("--category", "-c", help="指定分类")
parser.add_argument("--limit", type=int, default=20, help="条数")
parser.add_argument("--json", action="store_true", help="输出JSON")
args = parser.parse_args()
items = []
items.extend(fetch_baidu())
items.extend(fetch_weibo())
items.extend(fetch_zhihu())
items = dedup(items)
if args.json:
print(json.dumps(items[:args.limit], ensure_ascii=False, indent=2))
else:
print(format_report(items, args.category, args.limit))
if __name__ == "__main__":
main()中文习惯打卡追踪器。每日打卡、连续天数、习惯统计。 本地存储,无账户,隐私安全。 当用户说"打卡"、"习惯"、"连续多少天"、"今天完成了吗"时触发。 Keywords: 打卡, 习惯, 追踪, streak, habit, 日签, 坚持.
---
name: cn-habits-tracker
description: |
中文习惯打卡追踪器。每日打卡、连续天数、习惯统计。
本地存储,无账户,隐私安全。
当用户说"打卡"、"习惯"、"连续多少天"、"今天完成了吗"时触发。
Keywords: 打卡, 习惯, 追踪, streak, habit, 日签, 坚持.
metadata: {"openclaw": {"emoji": "🎯"}}
---
# 🎯 CN Habits Tracker — 中文习惯打卡
坚持记录,养成好习惯。
## 核心功能
| 功能 | 说明 |
|------|------|
| 打卡 | 每日完成习惯后打卡 |
| 连续天数 | 自动计算连续打卡记录 |
| 习惯管理 | 添加/删除/查看习惯列表 |
| 统计报告 | 周报/月报,完不成原因分析 |
| 提醒 | 每日定时提醒未打卡习惯 |
## 使用方式
```bash
# 打卡
python3 scripts/habits.py --checkin "早起"
python3 scripts/habits.py --checkin "喝水" --amount "8杯"
# 今日状态
python3 scripts/habits.py --today
# 添加习惯
python3 scripts/habits.py --add "早起" --goal "每天7点前起床" --unit "天"
python3 scripts/habits.py --add "喝水" --goal "每天喝8杯水" --unit "杯"
python3 scripts/habits.py --add "读书" --goal "每天读30分钟" --unit "分钟"
# 查看所有习惯
python3 scripts/habits.py --list
# 周报
python3 scripts/habits.py --report week
# 月报
python3 scripts/habits.py --report month
# 删除习惯
python3 scripts/habits.py --delete "早起"
# 未打卡提醒
python3 scripts/habits.py --remind
```
## 示例输出
```
🎯 今日打卡(4月13日)
━━━━━━━━━━━━━━━━━━━━━━
☑ 早起 已完成(连续7天)🔥
☑ 喝水 6/8杯(还差2杯)
☑ 读书 30分钟 ✅
☐ 运动 未打卡
📊 本周完成率:
早起 ▓▓▓▓▓▓▓░ 85.7%(6/7天)
喝水 ▓▓▓▓▓▓▓▓ 100%(7/7天)
读书 ▓▓▓▓▓▓░░ 71.4%(5/7天)
⚠️ 今天还需要:
• 喝水:再喝2杯
• 运动:跑步30分钟
```
## 数据存储
本地 JSON:`~/.qclaw/workspace/habits.json`
FILE:scripts/habits.py
#!/usr/bin/env python3
"""
中文习惯打卡追踪器 - cn-habits-tracker
每日打卡 + 连续天数 + 统计报告
用法:
python3 habits.py --checkin "早起"
python3 habits.py --add "早起" --goal "每天7点前" --unit "天"
python3 habits.py --today
python3 habits.py --report week
"""
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
from collections import defaultdict
WORKSPACE = os.path.expanduser("~/.qclaw/workspace")
DATA_FILE = os.path.join(WORKSPACE, "habits.json")
def load():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {"habits": {}, "records": []}
def save(data):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def today_str():
return datetime.now().strftime("%Y-%m-%d")
def streak_for(records, habit_id):
"""计算连续打卡天数"""
today = today_str()
dates = set(r["date"] for r in records if r.get("habit_id") == habit_id or r.get("habit") == habit_id)
if not dates:
return 0
# 从今天/昨天开始算
streak = 0
d = datetime.now()
for _ in range(365):
ds = d.strftime("%Y-%m-%d")
if ds in dates:
streak += 1
d -= timedelta(days=1)
else:
# 昨天没打卡就断了
if d.strftime("%Y-%m-%d") == today:
d -= timedelta(days=1)
if d.strftime("%Y-%m-%d") in dates:
streak += 1
d -= timedelta(days=1)
else:
break
else:
break
return streak
def add_habit(name, goal="", unit="天"):
data = load()
if name in data["habits"]:
print("\n⚠️ 习惯「{}」已存在".format(name))
return
habit_id = len(data["habits"]) + 1
data["habits"][str(habit_id)] = {
"name": name,
"goal": goal,
"unit": unit,
"created": today_str(),
"id": habit_id
}
save(data)
print("\n✅ 习惯「{}」已添加(目标:{})".format(name, goal or "每日打卡"))
def checkin(name, amount="", note=""):
data = load()
today = today_str()
# 找习惯
habit_id = None
for hid, h in data["habits"].items():
if h["name"] == name:
habit_id = hid
break
if not habit_id:
print("\n⚠️ 未找到习惯「{}」,先添加:--add \"{}\"".format(name, name))
return
# 检查今天是否已打卡
today_record = [r for r in data["records"] if r.get("habit_id") == habit_id and r["date"] == today]
if today_record:
today_record[0]["amount"] = amount
today_record[0]["note"] = note
print("\n🔄 已更新今日打卡记录")
save(data)
else:
data["records"].append({
"habit_id": habit_id,
"date": today,
"datetime": datetime.now().isoformat(),
"amount": amount,
"note": note
})
streak = streak_for(data["records"], habit_id)
emoji = "🔥" if streak >= 3 else "✅"
print("\n{} 打卡成功!「{}」连续{}天 {}!".format(
emoji, name, streak, data["habits"][habit_id]["unit"]))
save(data)
def show_today():
data = load()
today = today_str()
habits = data["habits"]
records = data["records"]
today_records = {r.get("habit_id"): r for r in records if r["date"] == today}
print("\n🎯 今日打卡({})".format(today))
print("=" * 44)
if not habits:
print(" 📭 还没有习惯,添加一个:--add \"习惯名\"")
return
incomplete = []
for hid, h in habits.items():
record = today_records.get(hid)
if record:
amount_str = "({})".format(record.get("amount", "")) if record.get("amount") else ""
streak = streak_for(records, hid)
fire = "🔥" * min(streak // 7 + 1, 3)
print(" ☑ {} {} 已打卡 {} 连续{}天 {}".format(
fire, h["name"], amount_str, streak, fire))
else:
incomplete.append(h)
print(" ☐ {} 未打卡".format(h["name"]))
if incomplete:
print("\n⚠️ 今天还需要完成:")
for h in incomplete:
print(" • {}:{}".format(h["name"], h["goal"]))
def list_habits():
data = load()
habits = data["habits"]
records = data["records"]
if not habits:
print("\n📭 还没有习惯")
return
print("\n📋 习惯列表(共{}个)".format(len(habits)))
print("-" * 44)
for hid, h in habits.items():
streak = streak_for(records, hid)
fire = "🔥" * min(streak // 7 + 1, 3)
print(" {} {} 连续{}天 {}".format(
"☑" if streak > 0 else "☐", h["name"], streak, fire))
if h["goal"]:
print(" 目标:{}".format(h["goal"]))
def report(period="week"):
data = load()
habits = data["habits"]
records = data["records"]
now = datetime.now()
if period == "week":
start = now - timedelta(days=now.weekday())
else:
start = datetime(now.year, now.month, 1)
period_records = [
r for r in records
if start <= datetime.fromisoformat(r["datetime"]) <= now
]
if not habits:
print("\n📭 还没有习惯")
return
days = (now - start).days + 1
print("\n📊 {}报告({}月{}日-{}月{}日,共{}天)".format(
"周" if period == "week" else "月",
start.month, start.day, now.month, now.day, days))
print("=" * 44)
for hid, h in habits.items():
habit_records = [r for r in period_records if r.get("habit_id") == hid]
done_days = len(set(r["date"] for r in habit_records))
rate = done_days / days * 100
bar_len = int(rate / 10)
bar = "▓" * bar_len + "░" * (10 - bar_len)
status = "✅" if rate >= 80 else ("🟡" if rate >= 50 else "❌")
streak = streak_for(records, hid)
print(" {} {} {:>6.1f}% {} {}/{}天 连续{}天".format(
status, h["name"][:6].ljust(6), rate, bar, done_days, days, streak))
def remind():
data = load()
today = today_str()
habits = data["habits"]
records = data["records"]
today_records = set(r.get("habit_id") for r in records if r["date"] == today)
incomplete = [h for hid, h in habits.items() if hid not in today_records]
if incomplete:
print("\n📌 今日待完成习惯:")
for h in incomplete:
print(" ☐ {}:{}".format(h["name"], h.get("goal", "打卡")))
else:
print("\n🎉 今日习惯全部完成!")
def delete_habit(name):
data = load()
habit_id = None
for hid, h in data["habits"].items():
if h["name"] == name:
habit_id = hid
break
if not habit_id:
print("\n⚠️ 未找到习惯「{}」".format(name))
return
del data["habits"][habit_id]
data["records"] = [r for r in data["records"] if r.get("habit_id") != habit_id]
save(data)
print("\n✅ 习惯「{}」已删除".format(name))
def main():
parser = argparse.ArgumentParser(description="🎯 中文习惯打卡")
parser.add_argument("--add",
help="添加习惯")
parser.add_argument("--goal",
help="习惯目标")
parser.add_argument("--unit", default="天",
help="打卡单位(默认:天)")
parser.add_argument("--checkin",
help="打卡")
parser.add_argument("--amount",
help="打卡数量/程度")
parser.add_argument("--note",
help="备注")
parser.add_argument("--today", action="store_true",
help="今日状态")
parser.add_argument("--list", action="store_true",
help="习惯列表")
parser.add_argument("--report",
choices=["week", "month"],
help="报告(week/month)")
parser.add_argument("--delete",
help="删除习惯")
parser.add_argument("--remind", action="store_true",
help="未打卡提醒")
args = parser.parse_args()
if args.add:
add_habit(args.add, args.goal or "", args.unit)
elif args.checkin:
checkin(args.checkin, args.amount or "", args.note or "")
elif args.today:
show_today()
elif args.list:
list_habits()
elif args.report:
report(args.report)
elif args.remind:
remind()
elif args.delete:
delete_habit(args.delete)
else:
show_today()
if __name__ == "__main__":
main()
中文个人记账助手。随手记、查统计、设预算、看趋势。 本地存储,无账户、无云端、隐私安全。 当用户说"记账"、"花了多少钱"、"本月支出"、"预算提醒"时触发。 Keywords: 记账, 支出, 预算, 花费, expense, 收支, 月统计.
---
name: cn-expense-tracker
description: |
中文个人记账助手。随手记、查统计、设预算、看趋势。
本地存储,无账户、无云端、隐私安全。
当用户说"记账"、"花了多少钱"、"本月支出"、"预算提醒"时触发。
Keywords: 记账, 支出, 预算, 花费, expense, 收支, 月统计.
metadata: {"openclaw": {"emoji": "💰"}}
---
# 💰 CN Expense Tracker — 中文个人记账助手
本地记账,无账户、无云端、隐私安全。
## 核心功能
| 功能 | 说明 |
|------|------|
| 随手记 | 一句话记账:金额 + 类别 + 备注 |
| 月统计 | 本月花了多少、各类占比 |
| 预算管理 | 设置月度预算,超支提醒 |
| 趋势分析 | 月度对比,涨了还是降了 |
| 导出CSV | 方便 Excel 分析 |
## 使用方式
```bash
# 记一笔
python3 scripts/expense.py --add "午餐" 25 --category 餐饮
python3 scripts/expense.py --add "打车" 38 --category 交通 --note "赶时间"
python3 scripts/expense.py --add "咖啡" 22 --category 饮品
# 本月统计
python3 scripts/expense.py --month
# 设置预算
python3 scripts/expense.py --budget 3000
# 月度对比
python3 scripts/expense.py --compare
# 查看所有记录
python3 scripts/expense.py --list
# 删除记录
python3 scripts/expense.py --delete 3
# 导出CSV
python3 scripts/expense.py --export
# 快速记账(交互)
python3 scripts/expense.py --interactive
```
## 类别预设
餐饮 | 交通 | 购物 | 娱乐 | 居住 | 医疗 | 教育 | 通讯 | 服饰 | 护肤 | 饮品 | 其他
## 数据存储
本地 JSON 文件:`~/.qclaw/workspace/expenses.json`
- 无需账户
- 无需联网
- 随时可导出
## 示例输出
```
💰 本月支出报告(2026年4月)
━━━━━━━━━━━━━━━━━━━━━━
总支出:2,847.5 元
预算:3,000 元(已用 94.9%)
📊 分类明细:
🍽️ 餐饮 1,200 (42%)
🚗 交通 580 (20%)
🛒 购物 450 (16%)
☕ 饮品 280 (10%)
🎬 娱乐 337.5 (12%)
📈 对比上月(3月):
餐饮 +15%(外卖增加)
交通 -8%(少出差)
⚠️ 注意:饮品支出增长较快
```
FILE:scripts/expense.py
#!/usr/bin/env python3
"""
中文个人记账助手 - cn-expense-tracker
随手记、查统计、设预算、看趋势
用法:
python3 expense.py --add "午餐" 25 --category 餐饮
python3 expense.py --month
python3 expense.py --budget 3000
python3 expense.py --compare
python3 expense.py --interactive
"""
import json
import os
import sys
import csv
import argparse
import re
from datetime import datetime, timedelta
from collections import defaultdict
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE = os.path.expanduser("~/.qclaw/workspace")
DATA_FILE = os.path.join(WORKSPACE, "expenses.json")
BUDGET_FILE = os.path.join(WORKSPACE, "budget.json")
CATEGORIES = {
"餐饮": "🍽️", "交通": "🚗", "购物": "🛒", "娱乐": "🎬",
"居住": "🏠", "医疗": "🏥", "教育": "📚", "通讯": "📱",
"服饰": "👔", "护肤": "💄", "饮品": "☕", "咖啡": "☕",
"零食": "🍪", "水果": "🍎", "健身": "💪", "旅行": "✈️",
"宠物": "🐱", "人情": "🎁", "其他": "📌"
}
CATEGORY_ALIASES = {
"吃": "餐饮", "外卖": "餐饮", "饭": "餐饮", "午饭": "餐饮",
"车": "交通", "打车": "交通", "地铁": "交通", "公交": "交通",
"买": "购物", "衣服": "服饰", "包包": "购物",
"玩": "娱乐", "电影": "娱乐", "游戏": "娱乐",
"住": "居住", "房租": "居住", "水电": "居住",
"医": "医疗", "药": "医疗",
"学": "教育", "书": "教育", "课": "教育",
"话": "通讯", "网": "通讯",
"化": "护肤", "美": "护肤",
}
# ========== 数据操作 ==========
def load_expenses():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return []
def save_expenses(expenses):
with open(DATA_FILE, "w", encoding="utf-8") as f:
json.dump(expenses, f, ensure_ascii=False, indent=2)
def load_budget():
if os.path.exists(BUDGET_FILE):
with open(BUDGET_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def save_budget(budget):
with open(BUDGET_FILE, "w", encoding="utf-8") as f:
json.dump(budget, f, ensure_ascii=False, indent=2)
def normalize_category(cat):
"""规范化类别名称"""
if cat in CATEGORIES:
return cat
if cat in CATEGORY_ALIASES:
return CATEGORY_ALIASES[cat]
# 模糊匹配
for key, alias in CATEGORY_ALIASES.items():
if key in cat:
return alias
return "其他"
def add_expense(desc, amount, category, note=""):
"""添加一笔支出"""
expenses = load_expenses()
expense = {
"id": len(expenses) + 1,
"date": datetime.now().strftime("%Y-%m-%d"),
"datetime": datetime.now().isoformat(),
"desc": desc,
"amount": float(amount),
"category": normalize_category(category),
"note": note
}
expenses.append(expense)
save_expenses(expenses)
return expense
def delete_expense(expense_id):
"""删除一笔支出"""
expenses = load_expenses()
before = len(expenses)
expenses = [e for e in expenses if e["id"] != int(expense_id)]
if len(expenses) == before:
return False
# 重新编号
for i, e in enumerate(expenses):
e["id"] = i + 1
save_expenses(expenses)
return True
# ========== 统计分析 ==========
def get_month_range(year, month):
"""获取月份起止日期"""
start = datetime(year, month, 1)
if month == 12:
end = datetime(year + 1, 1, 1) - timedelta(seconds=1)
else:
end = datetime(year, month + 1, 1) - timedelta(seconds=1)
return start, end
def month_expenses(year, month):
"""获取指定月份的支出"""
expenses = load_expenses()
start, end = get_month_range(year, month)
return [
e for e in expenses
if start <= datetime.fromisoformat(e["datetime"]) <= end
]
def month_report(year, month):
"""生成本月报告"""
expenses = month_expenses(year, month)
if not expenses:
print("\n📭 暂无记录")
return
total = sum(e["amount"] for e in expenses)
budget = load_budget()
budget_amount = budget.get(f"{year}-{month:02d}", 0)
by_category = defaultdict(float)
for e in expenses:
by_category[e["category"]] += e["amount"]
sorted_cat = sorted(by_category.items(), key=lambda x: x[1], reverse=True)
y, m = year, month
if m == 1:
prev_y, prev_m = y - 1, 12
else:
prev_y, prev_m = y, m - 1
prev_expenses = month_expenses(prev_y, prev_m)
prev_total = sum(e["amount"] for e in prev_expenses)
prev_by_cat = defaultdict(float)
for e in prev_expenses:
prev_by_cat[e["category"]] += e["amount"]
# 输出报告
print("\n💰 {}年{}月支出报告".format(y, m))
print("=" * 40)
print("总支出:{:,.1f} 元".format(total))
if budget_amount:
pct = total / budget_amount * 100
bar = "█" * int(pct / 5) + "░" * (20 - int(pct / 5))
if pct > 100:
status = "🔴 超支!"
elif pct > 80:
status = "🟡 警戒"
else:
status = "🟢 正常"
print("预算:{:,} 元({}{:.1f}%){}".format(
budget_amount, bar, min(pct, 100), status))
print()
print("📊 分类明细:")
for cat, amount in sorted_cat:
pct = amount / total * 100
emoji = CATEGORIES.get(cat, "📌")
bar = "▓" * int(pct / 5)
print(" {} {} {:>8} ({:>5.1f}%) {}".format(
emoji, cat.ljust(4), "{:.1f}".format(amount), pct, bar))
# 月度对比
if prev_total > 0:
print("\n📈 对比上月({}年{}月):".format(prev_y, prev_m))
change = (total - prev_total) / prev_total * 100
emoji = "📈" if change > 0 else "📉"
print(" {} 总支出 {:+.1f}%({:,.1f} → {:,.1f})".format(
emoji, change, prev_total, total))
for cat, amount in sorted_cat[:5]:
prev_amt = prev_by_cat.get(cat, 0)
if prev_amt > 0:
cat_change = (amount - prev_amt) / prev_amt * 100
emoji = "+" if cat_change > 0 else ""
print(" {} {} {:+.0f}%".format(
CATEGORIES.get(cat, "📌"), cat, cat_change))
elif amount > 0:
print(" 🆕 {} {} 新增支出 {:.1f}".format(
CATEGORIES.get(cat, "📌"), cat, amount))
print()
def compare_months(months=6):
"""月度趋势对比"""
expenses = load_expenses()
if not expenses:
print("\n📭 暂无记录")
return
# 获取最近N个月
by_month = defaultdict(float)
for e in expenses:
dt = datetime.fromisoformat(e["datetime"])
key = (dt.year, dt.month)
by_month[key] += e["amount"]
sorted_months = sorted(by_month.keys(), reverse=True)[:months]
print("\n📈 支出趋势(近{}个月)".format(len(sorted_months)))
print("=" * 44)
print(" 月份 支出 变化")
print("-" * 44)
prev = None
for y, m in sorted(sorted_months):
amount = by_month[(y, m)]
if prev:
change = (amount - prev) / prev * 100
arrow = "↑" if change > 0 else "↓"
print(" {}-{:02d} {:>9,.1f} {} {:.0f}%".format(
y, m, amount, arrow, abs(change)))
else:
print(" {}-{:02d} {:>9,.1f} ──".format(y, m, amount))
prev = amount
print()
def list_expenses(limit=20, category=None):
"""列出最近记录"""
expenses = load_expenses()
if category:
expenses = [e for e in expenses if normalize_category(category) == e["category"]]
if not expenses:
print("\n📭 暂无记录")
return
expenses.sort(key=lambda x: x["datetime"], reverse=True)
print("\n📋 最近{}笔记录".format(min(limit, len(expenses))))
print("-" * 60)
for e in expenses[:limit]:
emoji = CATEGORIES.get(e["category"], "📌")
note_str = " | {}".format(e["note"]) if e.get("note") else ""
print(" {}. {} {} {:>8,.1f}元 {}{}".format(
e["id"], e["date"], emoji, e["amount"], e["desc"], note_str))
print()
def export_csv():
"""导出CSV"""
expenses = load_expenses()
if not expenses:
print("\n📭 暂无记录可导出")
return
csv_file = os.path.join(WORKSPACE, "expenses_export.csv")
with open(csv_file, "w", encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
writer.writerow(["ID", "日期", "时间", "描述", "金额", "类别", "备注"])
for e in expenses:
writer.writerow([
e["id"], e["date"], e["datetime"].split("T")[1][:8],
e["desc"], e["amount"], e["category"], e.get("note", "")
])
print("\n✅ 已导出 {} 笔记录 → {}".format(len(expenses), csv_file))
print(" 路径:{}".format(csv_file))
print(" 可直接用 Excel 打开")
def set_budget(amount, month=None):
"""设置月度预算"""
budget = load_budget()
now = datetime.now()
key = month or f"{now.year}-{now.month:02d}"
budget[str(key)] = float(amount)
save_budget(budget)
print("\n✅ 预算已设置为 {:,.0f} 元({})".format(
float(amount), key))
def interactive():
"""交互记账模式"""
print("\n💰 记账助手(输入 q 退出)")
print(" 格式:金额 类别 [描述] [备注]")
print(" 例如:25 餐饮 午餐 外卖")
print(" 快捷:25 吃 午饭")
while True:
try:
line = input("\n> ").strip()
if not line:
continue
if line.lower() == "q":
break
parts = line.split()
if len(parts) < 2:
print(" ⚠️ 格式:金额 类别 [描述]")
continue
amount = float(parts[0])
cat = normalize_category(parts[1])
desc = parts[2] if len(parts) > 2 else cat
note = parts[3] if len(parts) > 3 else ""
e = add_expense(desc, amount, cat, note)
emoji = CATEGORIES.get(cat, "📌")
print(" ✅ {} {} {:,.1f}元".format(e["date"], emoji, e["amount"]))
except (ValueError, EOFError, KeyboardInterrupt):
print("\n\n✅ 记账完成!")
break
# ========== 主入口 ==========
def main():
parser = argparse.ArgumentParser(description="💰 中文个人记账助手")
parser.add_argument("--add", nargs="+",
help="记一笔:金额 描述 [类别] [备注]")
parser.add_argument("--category", "-c",
help="指定类别")
parser.add_argument("--note",
help="备注")
parser.add_argument("--month", action="store_true",
help="本月报告")
parser.add_argument("--budget", type=float,
help="设置月度预算")
parser.add_argument("--compare", action="store_true",
help="月度趋势对比")
parser.add_argument("--list", action="store_true",
help="列出最近记录")
parser.add_argument("--delete", type=int, metavar="ID",
help="删除指定ID的记录")
parser.add_argument("--export", action="store_true",
help="导出CSV")
parser.add_argument("--interactive", "-i", action="store_true",
help="交互记账模式")
parser.add_argument("--limit", type=int, default=20,
help="列出记录数量(默认20)")
parser.add_argument("--year", type=int,
help="指定年份(默认今年)")
parser.add_argument("--month-num", type=int, metavar="M",
help="指定月份(默认当月)")
args = parser.parse_args()
now = datetime.now()
year = args.year or now.year
month = args.month_num or now.month
if args.interactive:
interactive()
elif args.add:
try:
amount = float(args.add[0])
desc = args.add[1] if len(args.add) > 1 else "支出"
cat = args.category or (args.add[2] if len(args.add) > 2 else "其他")
note = args.note or (args.add[3] if len(args.add) > 3 else "")
e = add_expense(desc, amount, cat, note)
emoji = CATEGORIES.get(e["category"], "📌")
print("\n✅ {} {} {:,.1f}元({})".format(
e["date"], emoji, e["amount"], e["category"]))
except ValueError:
print("\n⚠️ 金额必须是数字")
elif args.delete:
if delete_expense(args.delete):
print("\n✅ 记录 {} 已删除".format(args.delete))
else:
print("\n⚠️ 未找到记录 {}".format(args.delete))
elif args.month:
month_report(year, month)
elif args.budget:
set_budget(args.budget)
elif args.compare:
compare_months()
elif args.list:
list_expenses(args.limit, args.category)
elif args.export:
export_csv()
else:
# 默认显示当月报告
month_report(year, month)
if __name__ == "__main__":
main()
任务自动续接技能。检测未完成的任务并提醒 Agent 继续推进。 Keywords: 继续, 没完成, 还没做完, 继续做, 继续推进.
--- name: task-auto-continue description: | 任务自动续接技能。检测未完成的任务并提醒 Agent 继续推进。 Keywords: 继续, 没完成, 还没做完, 继续做, 继续推进. --- # 任务自动续接 检测未完成的任务,提醒 Agent 继续推进,避免任务中断。 ## 核心能力 1. **任务检测** — 读取 in_progress.md 查找未完成任务 2. **进度判断** — 分析任务状态(进行中/待决策/已完成) 3. **优先排序** — 按优先级排列待办事项 4. **自动提醒** — 在 Agent 启动时自动检查并提醒 ## 使用方式 Agent 启动时自动检查 in_progress.md,发现未完成任务则提醒: ``` 检查 in_progress.md 中的任务状态 如果有待决策任务 → 提醒继续 如果有进行中任务 → 继续推进 ``` ## 任务状态标记 | 标记 | 含义 | |------|------| | 🔴 | 紧急,需立即处理 | | 🟡 | 进行中 | | 🟢 | 可自主推进 | | ✅ | 已完成 | | ⏳ | 等待中 | ## 注意事项 - 仅读取 workspace 下的 in_progress.md - 不修改任何文件,仅提供状态报告 - 配合 AGENTS.md 中的「任务没完不能停」规则使用
AI副业真伪鉴别工具。帮你识别AI副业项目/课程/机会到底是真是假。 当用户说"AI副业靠谱吗"、"这个课值不值"、"AI赚钱是不是骗人的"、"帮我看看这个副业"时触发。 Keywords: AI副业, 副业, 赚钱, 割韭菜, 骗局, 识别骗术, 副业真伪, 副业避坑.
---
name: ai-side-hustle-checker
description: |
AI副业真伪鉴别工具。帮你识别AI副业项目/课程/机会到底是真是假。
当用户说"AI副业靠谱吗"、"这个课值不值"、"AI赚钱是不是骗人的"、"帮我看看这个副业"时触发。
Keywords: AI副业, 副业, 赚钱, 割韭菜, 骗局, 识别骗术, 副业真伪, 副业避坑.
metadata: {"openclaw": {"emoji": "🔎"}}
---
# AI副业真伪鉴别器
帮你识别AI副业项目/课程/机会的真实性,避免被割韭菜。
## 核心功能
### 1. 副业机会鉴别
分析副业项目的真实可行性,给出可信度评分
### 2. 常见骗局识别
检测割韭菜特征:夸大收益、贩卖焦虑、课程套路
### 3. 机会评估
分析具体项目的能力要求、资源投入、预期收益
### 4. 避坑指南
识别高风险信号,给出替代建议
## 使用方式
```bash
# 交互式鉴别
python3 scripts/hustle_checker.py
# 快速评估
python3 scripts/hustle_checker.py --project "用AI写小红书月入过万"
# 课程鉴别
python3 scripts/hustle_checker.py --course "某AI副业训练营"
# 批量检测关键词
python3 scripts/hustle_checker.py --scan "月入十万 AI副业"
```
## 鉴别逻辑
### 可信度评分(0-100)
| 分值 | 评级 | 说明 |
|------|------|------|
| 80-100 | 🟢 真实机会 | 有真实案例,需付出努力 |
| 60-79 | 🟡 需谨慎 | 有可行空间,但存在夸大 |
| 40-59 | 🟠 高风险 | 多数是坑,少数有真实价值 |
| 0-39 | 🔴 基本骗局 | 典型的割韭菜项目 |
### 典型骗术特征
🔴 **割韭菜红灯**
- "零基础、月入过万" 同时出现
- "普通人也能做" + 高收益承诺
- 只展示成功案例,无失败率
- 需要先买课/买工具/发展下线
- 强调"限量"、"即将涨价"
🟢 **真实机会特征**
- 有明确的能力要求说明
- 客观描述收益周期(通常3-6个月起)
- 有具体执行步骤和工具清单
- 愿意公开真实数据和案例
- 有退款/保障机制
## 输出示例
```
🔎 AI副业真伪鉴别报告
━━━━━━━━━━━━━━━━━━━━
项目:用AI写小红书月入过万
📊 可信度评分:52/100(🟠 高风险)
⚠️ 风险点:
🔴 收益承诺过高("过万"无依据)
🔴 强调"普通人"但无门槛要求
🟡 变现路径不清晰
🟡 竞争激烈程度未提及
✅ 真实成分:
🟢 AI辅助内容生产逻辑可行
🟢 小红书平台真实有流量
💡 修正后评估:
新手用AI辅助做小红书,月入1000-3000元
是可能的,但需要3-6个月积累期
```
FILE:README.md
# 🔎 ai-side-hustle-checker
> AI副业真伪鉴别器。帮你识别AI副业项目/课程是否靠谱。
## 功能
- **可信度评分** — 0-100分,基于规则引擎分析
- **红灯检测** — 识别典型割韭菜话术(夸大收益、焦虑营销等)
- **绿灯检测** — 识别真实机会特征
- **赛道分析** — 覆盖小红书、AI写作、AI绘图、B站等20+副业类型
- **避坑法则** — 快速判断标准
## 快速开始
```bash
# 交互模式
python3 scripts/hustle_checker.py -i
# 快速评估
python3 scripts/hustle_checker.py -p "AI写小红书月入过万"
python3 scripts/hustle_checker.py -c "某AI副业训练营"
```
## 评分说明
| 分数 | 评级 | 含义 |
|------|------|------|
| 80-100 | 🟢 真实机会 | 可行,需付出努力 |
| 60-79 | 🟡 需谨慎 | 有机会但存在夸大 |
| 40-59 | 🟠 高风险 | 多数是坑 |
| 0-39 | 🔴 基本骗局 | 典型割韭菜 |
FILE:scripts/hustle_checker.py
#!/usr/bin/env python3
"""
AI副业真伪鉴别器 - ai-side-hustle-checker
基于规则引擎分析副业项目的可信度
用法:
python3 hustle_checker.py --interactive
python3 hustle_checker.py --project "用AI写小红书月入过万"
python3 hustle_checker.py --course "AI副业训练营"
"""
import json
import re
import sys
import argparse
from datetime import datetime
# ========== 骗术特征库 ==========
RED_FLAGS = [
# 收益类
(r"月\s*入\s*\d+\s*万", "收益承诺过高,无具体数据支撑"),
(r"日\s*赚\s*\d+", "日赚承诺典型收割套路"),
(r"躺\s*赚", "躺赚承诺绝对虚假"),
(r"被\s*动\s*收\s*入?\s*\d+", "被动收入夸大,周期不明"),
(r"一\s*定\s*能\s*(赚|成功)", "绝对化承诺,违反广告法"),
(r"立\s*刻?\s*(变现|赚钱|暴富)", "即时变现承诺不可信"),
(r"只\s*需?\s*每天\s*\d+\s*分", "极短时间承诺高收益"),
(r"轻\s*松\s*(赚|日)", "轻松赚钱是典型话术"),
# 门槛类
(r"零\s*基\s*础", "零基础+高收益=典型骗局"),
(r"不\s*需?\s*任\s*何\s*经\s*验", "无门槛+高回报=收割"),
(r"普\s*通\s*人", "普通人也能=收割话术"),
(r"小\s*白", "小白也能=收割话术"),
(r"不\s*用\s*(学习|学)", "无需学习=虚假宣传"),
# 焦虑类
(r"最\s*后\s*(一?\s*次|机会|名额)", "限量紧迫感=催单套路"),
(r"即\s*将\s*涨\s*价", "涨价压迫=催单套路"),
(r"别\s*人\s*(已经|都)\s*(在|月入)", "社交证明造假"),
(r"错\s*过\s*就", "错过恐慌=催单套路"),
(r"月\s*薪\s*翻\s*\d", "薪资翻倍=虚假焦虑"),
# 套路类
(r"先\s*付\s*(定金|学费|费用)", "预付款是风险信号"),
(r"限\s*量", "限量=催单套路"),
(r"内\s*部\s*(渠|名|消)", "内部渠道=伪造可信度"),
(r"秘\s*密", "神秘感=制造信息差假象"),
(r"老\s*板\s*(不\s*知|不会)", "贬低他人抬高自己=套路"),
(r"别\s*人\s*告\s*诉\s*你", "隐藏信息=制造焦虑"),
# 课程类
(r"训\s*练\s*营", "训练营需警惕教学质量"),
(r"陪\s*跑", "陪跑服务难量化"),
(r"收\s*益\s*分\s*成", "分成模式需警惕跑路风险"),
(r"拉\s*新\s*?(人|下线)", "拉新分成=传销特征"),
(r"裂\s*变", "裂变=传销特征"),
(r"代\s*理", "代理模式风险高"),
]
# 绿色信号
GREEN_FLAGS = [
(r"需\s*要\s*\d+\s*个?\s*月", "明确收益周期"),
(r"\d+\s*-\s*\d+\s*个?\s*月", "收益周期合理"),
(r"需\s*要\s*(学习|投入|努力)", "承认需要付出"),
(r"不\s*保\s*证", "不保证收益=诚信"),
(r"具\s*体\s*步\s*骤", "有具体步骤=可验证"),
(r"真\s*实\s*案\s*例", "真实案例=有据可查"),
(r"有\n?效\s*退", "退款保障=风险降低"),
(r"第\s*一\s*次", "首次尝试=低风险"),
(r"从\s*\d+\s*开\s*始", "从低开始=合理预期"),
(r"不\s*是\s*(躺|快|暴)", "明确说明不是轻松赚钱"),
]
# 具体副业类型评估
PROJECT_TYPES = {
"小红书": {
"base_score": 65,
"realistic": "月入500-5000(需3-6个月积累)",
"keys": "内容质量+持续更新+个人特色",
"risks": "竞争激烈,同质化严重"
},
"AI写作": {
"base_score": 60,
"realistic": "月入500-3000(需建立客户关系)",
"keys": "接单能力+AI辅助效率+专业领域",
"risks": "价格战严重,客户难找"
},
"AI课程": {
"base_score": 30,
"realistic": "卖课本身可赚钱,但需真实实力",
"keys": "有真实受众+专业积累+交付能力",
"risks": "课程市场已饱和,口碑难建立"
},
"AI工具推荐": {
"base_score": 40,
"realistic": "联盟佣金可赚,但需流量基础",
"keys": "有垂直流量+真实推荐+信任背书",
"risks": "竞争激烈,真实推荐难维持"
},
"AI代写服务": {
"base_score": 70,
"realistic": "月入1000-8000(需稳定客户)",
"keys": "专业领域+交付速度+沟通能力",
"risks": "价格透明化,单价下降"
},
"AI数字人": {
"base_score": 35,
"realistic": "高客单价但获客难",
"keys": "有企业客户+视频制作能力",
"risks": "技术门槛高,竞争已激烈"
},
"AI心理咨询": {
"base_score": 50,
"realistic": "可做但需专业背景",
"keys": "心理学背景+AI辅助+伦理边界",
"risks": "伦理风险,资质要求"
},
"AI数据分析": {
"base_score": 75,
"realistic": "月入3000-20000(有技术门槛)",
"keys": "数据分析能力+行业知识+可视化",
"risks": "需持续学习新技术"
},
"AI绘图": {
"base_score": 55,
"realistic": "月入1000-8000(需风格建立)",
"keys": "审美能力+AI工具熟练+持续出新",
"risks": "AI绘图泛滥,价格下降"
},
"微信公众号": {
"base_score": 60,
"realistic": "广告+打赏变现需1-2年积累",
"keys": "垂直领域+持续输出+私域运营",
"risks": "公众号红利期已过"
},
"知乎": {
"base_score": 65,
"realistic": "赞赏+付费咨询+致知计划",
"keys": "专业深度+持续输出+粉丝积累",
"risks": "冷启动难,收益周期长"
},
"抖店/电商": {
"base_score": 45,
"realistic": "有真实机会但失败率高",
"keys": "选品能力+资金+供应链",
"risks": "资金要求高,失败率高"
},
"B站UP主": {
"base_score": 60,
"realistic": "平台收益+接广告,需1年以上",
"keys": "内容创意+持续更新+粉丝互动",
"risks": "竞争激烈,收益周期长"
},
}
# ========== 分析函数 ==========
def detect_type(text):
"""检测副业类型"""
text_lower = text.lower()
for ptype, info in PROJECT_TYPES.items():
if ptype in text or ptype[:2] in text:
return ptype, info
return None, None
def check_red_flags(text):
"""检测红灯信号"""
flags = []
for pattern, description in RED_FLAGS:
if re.search(pattern, text):
flags.append(description)
return flags
def check_green_flags(text):
"""检测绿灯信号"""
flags = []
for pattern, description in GREEN_FLAGS:
if re.search(pattern, text):
flags.append(description)
return flags
def estimate_score(text, red_flags, green_flags):
"""估算可信度评分"""
base = 50
# 红灯扣分
for _ in red_flags:
base -= 10
# 绿灯加分
for _ in green_flags:
base += 7
# 副业类型加成
ptype, info = detect_type(text)
if info:
base += (info["base_score"] - 50) * 0.3 # 部分参考类型基准
return max(0, min(100, base))
def get_rating(score):
"""获取评级"""
if score >= 80:
return "🟢 真实机会", "这套模式是可行的,但需要付出真实努力,不保证快速暴富。"
elif score >= 60:
return "🟡 需谨慎", "有机会,但存在夸大成分。认真评估自身能力后再决定。"
elif score >= 40:
return "🟠 高风险", "多数是坑,少数有真实价值。建议谨慎或换方向。"
else:
return "🔴 基本骗局", "典型的割韭菜项目,建议远离或极度谨慎。"
def analyze(text):
"""完整分析"""
text_lower = text.lower()
# 1. 检测副业类型
ptype, ptype_info = detect_type(text)
# 2. 检测红灯绿灯
red_flags = check_red_flags(text)
green_flags = check_green_flags(text)
# 3. 计算评分
score = estimate_score(text, red_flags, green_flags)
rating, rating_desc = get_rating(score)
# 4. 识别具体问题
issues = []
if any("收益承诺" in f or "过高" in f for f in red_flags):
issues.append("收益承诺过高,与实际情况不符")
if any("普通" in f or "零基" in f for f in red_flags):
issues.append("强调普通人也能做,掩盖了真实门槛")
if any("限量" in f or "涨价" in f or "最后" in f for f in red_flags):
issues.append("限时压迫催单,是典型销售套路")
if any("拉新" in f or "代理" in f or "裂变" in f for f in red_flags):
issues.append("拉新/代理模式有传销风险")
if "训练营" in text:
issues.append("训练营教学质量参差不齐,需谨慎评估")
# 5. 真实成分
real_parts = []
if ptype_info:
real_parts.append("「{}」赛道本身是存在的".format(ptype))
real_parts.append("合理预期:{}".format(ptype_info["realistic"]))
real_parts.append("关键成功因素:{}".format(ptype_info["keys"]))
if green_flags:
real_parts.append("该项目包含部分合理说明:{}".format(";".join(green_flags[:3])))
# 6. 修正后评估
fixed_estimate = ""
if ptype_info:
fixed_estimate = "基于「{}」赛道,修正后预期:{}".format(
ptype, ptype_info["realistic"])
else:
fixed_estimate = "建议:以月入{}作为合理目标,而不是宣传中的夸张数字".format(
"1000-3000" if score > 40 else "0(建议远离)")
return {
"score": score,
"rating": rating,
"rating_desc": rating_desc,
"red_flags": red_flags,
"green_flags": green_flags,
"project_type": ptype,
"type_info": ptype_info,
"issues": issues,
"real_parts": real_parts,
"fixed_estimate": fixed_estimate,
"risks": ptype_info["risks"] if ptype_info else "请参考上方分析"
}
def generate_report(text, result):
"""生成鉴别报告"""
lines = []
lines.append("")
lines.append("🔎 AI副业真伪鉴别报告")
lines.append("━━━━━━━━━━━━━━━━━━━━")
lines.append("")
lines.append("📋 项目:「{}」".format(text[:50] + ("..." if len(text) > 50 else "")))
lines.append("")
lines.append("📊 可信度评分:{}/100({})".format(result["score"], result["rating"]))
lines.append("💬 {}".format(result["rating_desc"]))
if result["red_flags"]:
lines.append("")
lines.append("⚠️ 风险信号(共{}个):".format(len(result["red_flags"])))
seen = set()
for f in result["red_flags"][:5]:
if f not in seen:
lines.append(" 🔴 {}".format(f))
seen.add(f)
if result["issues"]:
lines.append("")
lines.append("🎯 具体问题:")
for issue in result["issues"]:
lines.append(" ⚠️ {}".format(issue))
if result["green_flags"]:
lines.append("")
lines.append("✅ 真实成分:")
for f in result["green_flags"][:3]:
lines.append(" 🟢 {}".format(f))
if result["real_parts"]:
lines.append("")
lines.append("📌 赛道分析:")
for p in result["real_parts"][:4]:
lines.append(" 📍 {}".format(p))
lines.append("")
lines.append("💡 修正后评估:")
lines.append(" {}".format(result["fixed_estimate"]))
if result["risks"]:
lines.append("")
lines.append("⚠️ 风险提示:")
lines.append(" {}".format(result["risks"]))
lines.append("")
lines.append("━━━━━━━━━━━━━━━━━━━━")
lines.append("⚡ 快速判断法则:")
lines.append(" • 月入过万+零基础 = 骗局")
lines.append(" • 有具体步骤+有门槛 = 可信")
lines.append(" • 限时优惠+催单 = 收割")
lines.append(" • 需要拉人 = 远离")
return "\n".join(lines)
def interactive_mode():
"""交互模式"""
print("🔎 AI副业真伪鉴别器".center(50, "="))
print("输入AI副业项目描述,我来帮你分析是否可信")
print("输入 q 退出\n")
while True:
try:
text = input("💬 请描述项目(粘贴标题/描述):\n> ").strip()
if not text:
continue
if text.lower() in ["q", "quit", "exit"]:
print("\n✅ 再见!记住:天下没有免费的午餐。\n")
break
result = analyze(text)
print(generate_report(text, result))
print()
except (EOFError, KeyboardInterrupt):
print("\n\n✅ 再见!\n")
break
# ========== 主入口 ==========
def main():
parser = argparse.ArgumentParser(description="🔎 AI副业真伪鉴别器")
parser.add_argument("--interactive", "-i", action="store_true",
help="交互模式")
parser.add_argument("--project", "-p",
help="快速评估项目")
parser.add_argument("--course", "-c",
help="鉴别课程")
parser.add_argument("--scan", "-s",
help="扫描关键词")
parser.add_argument("--json", action="store_true",
help="JSON格式输出")
args = parser.parse_args()
if args.interactive:
interactive_mode()
return
text = args.project or args.course or args.scan or ""
if not text:
print("🔎 AI副业真伪鉴别器")
print("用法:")
print(" python3 hustle_checker.py --interactive # 交互模式")
print(" python3 hustle_checker.py -p 'AI写小红书月入过万'")
print(" python3 hustle_checker.py -c 'AI副业训练营'")
print(" python3 hustle_checker.py -s '月入十万 AI副业'")
sys.exit(0)
result = analyze(text)
if args.json:
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
print(generate_report(text, result))
if __name__ == "__main__":
main()
知乎内容搜索与分析工具。搜索知乎热榜话题、分析高价值问题。 当用户说"搜一下知乎"、"知乎有什么"、"知乎选题"、"搜知乎热榜"时触发。 Keywords: 知乎, zhihu, 选题, 知乎热榜, 知乎搜索.
---
name: zhihu-topic-finder
description: |
知乎内容搜索与分析工具。搜索知乎热榜话题、分析高价值问题。
当用户说"搜一下知乎"、"知乎有什么"、"知乎选题"、"搜知乎热榜"时触发。
Keywords: 知乎, zhihu, 选题, 知乎热榜, 知乎搜索.
---
# 知乎选题分析器
搜索知乎热榜话题,分析高价值问题,帮助内容创作者精准选题。
## 核心能力
1. **知乎热榜抓取** — 获取知乎实时热榜Top50
2. **问题分析** — 评估问题的回答数、浏览量、关注数
3. **机会评分** — 根据热度与竞争度评估内容机会
4. **竞品追踪** — 分析已有回答的质量与角度
## 使用方式
用户说"搜一下知乎"或"知乎有什么热点"时:
```
python3 scripts/zhihu_topics.py
```
## 注意事项
- 使用公开API获取热榜数据,无需登录
- 评分基于热度与竞争度的综合评估
- 建议选择评分80+的问题进行回答
- 回答角度建议避开已有高赞回答的相同观点
FILE:scripts/zhihu_topics.py
#!/usr/bin/env python3
"""知乎热榜抓取与分析"""
import json, sys
try:
import requests
except ImportError:
print("请安装 requests: pip3 install requests")
sys.exit(1)
def fetch_zhihu_hotlist():
"""获取知乎热榜"""
url = "https://www.zhihu.com/api/v3/feed/topstory/hot-lists/total?limit=50"
headers = {"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"}
try:
r = requests.get(url, headers=headers, timeout=10, verify=True)
data = r.json()
except Exception:
# 降级:直接解析网页
url2 = "https://www.zhihu.com/hot"
r2 = requests.get(url2, headers=headers, timeout=10, verify=True)
import re
titles = re.findall(rtitle:(.*?), r2.text[:50000])
results = []
for i, t in enumerate(titles[:50]):
results.append({"title": t, "heat": 0, "index": i+1})
return results
results = []
for item in data.get("data", []):
target = item.get("target", {})
results.append({
"title": target.get("title", ""),
"excerpt": target.get("excerpt", ""),
"heat": item.get("detail_text", "").replace("万热度", ""),
"index": len(results) + 1
})
return results
def analyze_topics(topics):
"""分析话题并评分"""
for t in topics:
heat = float(t.get("heat", 0) or 0)
title = t.get("title", "")
# 简单评分:热度 + 长标题加分
score = min(100, int(heat / 10000) * 20 + (10 if len(title) > 15 else 0))
t["score"] = score
topics.sort(key=lambda x: x.get("score", 0), reverse=True)
return topics
if __name__ == "__main__":
topics = fetch_zhihu_hotlist()
analyzed = analyze_topics(topics)
print(json.dumps(analyzed[:20], ensure_ascii=False, indent=2))
FILE:skill.json
{"name":"zhihu-topic-finder","version":"1.0.0","description":"知乎选题分析器"}