@clawhub-freedompixels-bccea021be
获取知乎、微博、百度、B站实时热搜榜单。
---
name: cn-trends-ai
description: "获取知乎、微博、百度、B站实时热搜榜单。"
---
# 中文热搜聚合
获取4大平台实时热搜。
## 功能
- 知乎热榜
- 微博热搜
- 百度热搜
- B站排行榜
- 统一格式输出、关键词过滤
## 用法
python3 scripts/fetch_all_trends.py --all
python3 scripts/fetch_all_trends.py --platform zhihu --limit 10
python3 scripts/fetch_all_trends.py --all --keyword "AI" --json
## 依赖
- Python 3.7+
- certifi(SSL证书验证)
## 数据来源
所有数据来自公开接口,无需登录,无需API Key。
FILE:fetch_all_trends.py
#!/usr/bin/env python3
"""
多平台热点聚合抓取引擎
支持:知乎热榜、微博热搜、百度热搜、B站排行榜
用法:
python3 fetch_trends.py --all --limit 20
python3 fetch_trends.py --platform zhihu --limit 10
python3 fetch_trends.py --all --keyword "AI" --json
"""
import json
import os
import sys
import ssl
import argparse
import time
import certifi
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# SSL 配置:优先标准验证,失败才降级
# macOS Python 需要显式加载 certifi 根证书
_SAFE_SSL_CTX = ssl.create_default_context()
_SAFE_SSL_CTX.load_verify_locations(certifi.where())
def _urlopen(req, timeout=10):
"""标准SSL验证(certifi根证书)"""
return urlopen(req, timeout=timeout, context=_SAFE_SSL_CTX)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
}
# ========== 知乎热榜 ==========
def fetch_zhihu(limit=20):
"""抓取知乎热榜"""
url = "https://api.zhihu.com/topstory/hot-lists/total?limit={}".format(limit)
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 知乎热榜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
for item in data.get("data", []):
target = item.get("target", {})
title = target.get("title", "").strip()
if not title:
continue
# 提取热度数值
detail = item.get("detail_text", "")
heat_num = 0
if "万" in detail:
try:
heat_num = float(detail.replace("万热度", "").replace("万", "").strip()) * 10000
except ValueError:
pass
elif "热度" in detail:
try:
heat_num = int(detail.replace("热度", "").strip())
except ValueError:
pass
results.append({
"title": title,
"platform": "知乎",
"heat": int(heat_num),
"heat_display": detail,
"url": "https://www.zhihu.com/question/{}".format(target.get("id", "")),
"category": "",
"excerpt": (target.get("excerpt", "") or "")[:100]
})
return results
# ========== 微博热搜 ==========
def fetch_weibo(limit=20):
"""抓取微博热搜"""
url = "https://weibo.com/ajax/side/hotSearch"
headers = {**HEADERS, "Referer": "https://weibo.com/"}
try:
req = Request(url, headers=headers)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 微博热搜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
realtime = data.get("data", {}).get("realtime", [])
for item in realtime[:limit]:
note = item.get("note", "").strip()
if not note:
continue
note = item.get("note", "").strip()
if not note:
continue
label = item.get("label_name", "")
if label:
note = "[{}]{}".format(label, note)
results.append({
"title": note,
"platform": "微博",
"heat": int(item.get("num", 0)),
"heat_display": str(item.get("num", 0)),
"url": "https://s.weibo.com/weibo?q=%23{}%23".format(
item.get("word", item.get("note", ""))
),
"category": "",
"excerpt": ""
})
return results
# ========== 百度热搜 ==========
def fetch_baidu(limit=20):
"""抓取百度热搜"""
url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 百度热搜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
cards = data.get("data", {}).get("cards", [])
if cards:
content = cards[0].get("content", [])
for item in content[:limit]:
query = item.get("query", "").strip()
if not query:
continue
desc = item.get("desc", "")
heat_num = 0
try:
heat_num = int(item.get("hotScore", 0))
except (ValueError, TypeError):
pass
results.append({
"title": query,
"platform": "百度",
"heat": heat_num,
"heat_display": desc[:30] if desc else "",
"url": "https://www.baidu.com/s?wd={}".format(query),
"category": item.get("tag", ""),
"excerpt": desc[:100] if desc else ""
})
return results
# ========== B站排行榜 ==========
def fetch_bilibili(limit=20):
"""抓取B站排行榜"""
url = "https://api.bilibili.com/x/web-interface/ranking/v2?rid=0&type=all"
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ B站排行榜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
items = data.get("data", {}).get("list", [])
for item in items[:limit]:
title = item.get("title", "").strip()
if not title:
continue
stat = item.get("stat", {})
results.append({
"title": title,
"platform": "B站",
"heat": int(stat.get("view", 0)),
"heat_display": "{}播放".format(_format_num(stat.get("view", 0))),
"url": item.get("short_link_v2", "https://www.bilibili.com/video/{}".format(
item.get("bvid", "")
)),
"category": item.get("tname", ""),
"excerpt": item.get("description", "")[:100] if item.get("description") else ""
})
return results
# ========== 工具函数 ==========
def _format_num(n):
"""格式化数字"""
n = int(n)
if n >= 10000:
return "{:.1f}万".format(n / 10000)
elif n >= 1000:
return "{:.1f}k".format(n / 1000)
return str(n)
def filter_by_keyword(items, keyword):
"""按关键词过滤"""
kw = keyword.lower()
return [item for item in items if kw in item["title"].lower()]
def print_report(items, keyword=None):
"""打印热点报告"""
if not items:
print("\n📭 未抓取到热点数据")
return
# 按平台分组
by_platform = {}
for item in items:
p = item["platform"]
by_platform.setdefault(p, []).append(item)
print("\n" + "=" * 60)
if keyword:
print("📊 热点速览 | 关键词: 「{}」".format(keyword))
else:
print("📊 今日热点速览")
print("=" * 60)
for platform, topics in by_platform.items():
print("\n🔥 {} | {} 条".format(platform, len(topics)))
for i, t in enumerate(topics, 1):
heat = t.get("heat_display", "")
if heat:
print(" {}. {} 🔥{}".format(i, t["title"][:48], heat))
else:
print(" {}. {}".format(i, t["title"][:48]))
if t.get("excerpt"):
print(" {}".format(t["excerpt"][:60]))
print("\n" + "-" * 60)
print("💡 AI 分析:输入「分析选题」获取内容机会评分")
print("💡 输入「生成内容」直接改写为知乎/小红书文案")
# ========== 主入口 ==========
FETCHERS = {
"zhihu": fetch_zhihu,
"weibo": fetch_weibo,
"baidu": fetch_baidu,
"bilibili": fetch_bilibili,
}
PLATFORM_NAMES = {
"zhihu": "知乎",
"weibo": "微博",
"baidu": "百度",
"bilibili": "B站",
"all": "全部平台",
}
def main():
parser = argparse.ArgumentParser(description="多平台热点聚合")
parser.add_argument("--platform", "-p",
choices=list(FETCHERS.keys()) + ["all"],
default="all",
help="抓取平台(默认全部)")
parser.add_argument("--limit", "-n", type=int, default=15,
help="每个平台抓取条数(默认15)")
parser.add_argument("--keyword", "-k",
help="按关键词过滤")
parser.add_argument("--json", action="store_true",
help="输出 JSON 格式")
args = parser.parse_args()
# 确定要抓取的平台
if args.platform == "all":
platforms = list(FETCHERS.keys())
else:
platforms = [args.platform]
# 依次抓取
all_items = []
for name in platforms:
display_name = PLATFORM_NAMES.get(name, name)
print("📡 抓取 {}...".format(display_name), file=sys.stderr)
start = time.time()
items = FETCHERS[name](args.limit)
elapsed = time.time() - start
if items:
print(" ✅ {} 条 ({:.1f}s)".format(len(items), elapsed), file=sys.stderr)
else:
print(" ⚠️ 无数据 ({:.1f}s)".format(elapsed), file=sys.stderr)
all_items.extend(items)
# 关键词过滤
if args.keyword:
all_items = filter_by_keyword(all_items, args.keyword)
# 按热度排序
all_items.sort(key=lambda x: x.get("heat", 0), reverse=True)
# 输出
if args.json:
print(json.dumps(all_items, ensure_ascii=False, indent=2))
else:
print_report(all_items, args.keyword)
if __name__ == "__main__":
main()
FILE:skill.json
{"name":"cn-trends-ai","version":"1.0.0","description":"中文全平台热搜聚合"}
个人安全检查工具。检查账号安全状况和密码强度,生成安全评分报告。
---
name: security-health-check
description: "个人安全检查工具。检查账号安全状况和密码强度,生成安全评分报告。"
metadata: {"openclaw": {"emoji": "🔒"}}
---
# Security Health Check
检查账号安全状况和密码强度。
## 功能
- 账号安全状态检查(HIBP公开数据)
- 密码强度评估
- 安全评分报告(0-100分)
## 用法
```bash
python3 scripts/security_check.py --email [email protected]
python3 scripts/security_check.py --password "test"
python3 scripts/security_check.py --email [email protected] --report markdown
```
## 数据来源
- HIBP公开数据接口(k-匿名查询,数据不离开本地)
- 本地密码强度计算
FILE:ENTERPRISE_PLAN.md
# 企业安全审计 Skill 升级方案
> 规划时间:2026-04-19
> 基于现有版本:security-health-check v1.x(个人版)
> 目标版本:Enterprise Security Audit v2.0
---
## 一、现有能力回顾
| 功能 | 技术栈 | 数据源 |
|------|--------|--------|
| 邮箱泄露检查 | HIBP API v3 | haveibeenpwned.com |
| 密码泄露检查 | HIBP k-匿名 API | 本地SHA1前缀查询 |
| 密码强度分析 | zxcvbn-style 本地计算 | 内置弱密码字典 |
| 安全评分报告 | 本地综合计算 | — |
**现有架构**:单文件 Python(~16KB),无状态,无日志,轻量级,适合个人。
---
## 二、功能扩展规格
### 2.1 钓鱼邮件识别(Phishing Detection)
**功能描述**
通过 URL 安全分析 + 发件人域名信誉查询,判断邮件内容是否属于钓鱼攻击。支持批量导入.eml 文件或直接输入邮件 Header+Body。核心检测逻辑:域名年龄(注册 < 6 个月高风险)、相似域名识别(lookalike,如 g00gle.com)、已知的钓鱼域名黑名单(phishtank.com API)、URL 危险特征(短链接解包、IP 直连 URL、伪装的登录表单 URL)。输出:风险等级(安全/可疑/危险)+ 详细推理依据 + 处置建议。
**技术实现路径**
- `python-evalanche` / `phish tank API`(免费,查询已确认钓鱼域名)
- `whois` 库查询域名注册时间
- `tldextract` 识别根域名 + 注册商信息
- 自建 lookalike 域名相似度引擎(Levenshtein + 视觉混淆检测)
- 短链接解包:`unshorten-api` 或直接 HTTP HEAD 追踪重定向链
**API/库依赖**:phishtank (CSV), python-whois, tldextract, Levenshtein
---
### 2.2 密码策略审计(Password Policy Audit)
**功能描述**
面向企业场景,审计 Active Directory / LDAP / SaaS 平台(钉钉、企业微信、飞书)的密码策略合规性。同时提供"弱密码模式扫描"——将所有历史泄露密码的规律提取为正则模式(如 `YYYYMMDD` 生日格式、`companyName+123` 命名法),用于检测企业内部是否存在系统性弱密码文化。输入:管理员提供的密码哈希列表(SM3/SHA256,不可逆)或密码策略配置文件。输出:策略合规评分(8 分项)+ 高危模式列表 + 改进建议。
**技术实现路径**
- 密码模式挖掘:基于历史泄露数据(HIBP 免费泄露列表),提取 top 1000 密码的规律正则化(用 `exrex` 生成退化字符串)
- AD 密码策略解析:`python-ldap` 读取组策略(GPO)
- 飞书/钉钉密码策略:通过企业管理员 API 查询(需 OAuth2 授权)
- 弱密码检测正则库:自建 `weak_patterns.json`,含中文互联网常见弱密码模式
**API/库依赖**:python-ldap(可选)、exrex、re(内置)
---
### 2.3 API Key / Token 泄露扫描(Secret Scanning)
**功能描述**
扫描代码仓库、文档、配置文件中的硬编码凭证(API Key、Access Token、私钥、数据库连接字符串)。支持:GitHub 公开仓库扫描(通过 GitHub API + 正则匹配)、本地文件/目录扫描(支持 `.env`, `.json`, `.yaml`, `.py`, `.js` 等)、GitHub Secret Scanning API 集成(检测推送到仓库的凭证)。输出:泄露凭证清单(类型/位置/严重程度)+ 撤销建议 + 补救步骤。
**技术实现路径**
- 正则模式库:来自 `trufflehog` 项目开源规则(支持 300+ 凭证类型)
- GitHub API:遍历用户仓库 + 使用 Code Search API 搜索高风险路径
- 本地扫描:`python-regex` 批量匹配,支持多文件并行(`concurrent.futures`)
- GitHub Secret Scanning:调用 GitHub API `GET /repos/{owner}/{repo}/secret-scanning/alerts`
**API/库依赖**:trufflehog-regex(规则抽取)、PyGithub、python-regex、concurrent.futures(内置)
---
### 2.4 社会工程学风险评估(Social Engineering Risk Assessment)
**功能描述**
评估企业员工在社工攻击下的脆弱程度。核心模块:OSINT 个人信息收集(基于输入邮箱,收集该员工在公开社交网络的黑客视角资料:LinkedIn 职位/部门/GitHub 代码提交记录/演讲PPT暴露的组织架构),输出"攻击面报告"——如果攻击者要对这个员工发起钓鱼,成功率有多高。同时提供模拟钓鱼测试工具(生成假钓鱼邮件模板,供安全团队内部演练)。注意:本功能严格限制在用户自己的企业资产范围内,禁止扫描他人。
**技术实现路径**
- LinkedIn/OSINT:`apify.linkedin-scraper` 或直接搜索引擎查询(Google Custom Search API)
- GitHub OSINT:GitHub API 查询用户的代码提交贡献图和组织信息
- 社工攻击面评分算法:基于公开信息量 × 凭证关联度 × 职位敏感度综合打分
- 模拟钓鱼生成:模板引擎(Jinja2),内置 5 种钓鱼场景模板
**API/库依赖**:PyGithub、Google Custom Search API(免费 tier: 100次/天)、apify(可选)、Jinja2
---
### 2.5 勒索软件 Preparedness 检查(Ransomware Readiness Assessment)
**功能描述**
评估企业在遭受勒索软件攻击时的恢复能力。5 大检查维度:备份覆盖率(是否至少 3-2-1 备份:3份副本、2种介质、1份离线)、备份完整性(定期演练验证)、网络分段(关键资产是否与互联网隔离)、终端防护(EDR 安装率 + 病毒库更新率)、 Incident Response 计划(是否有成文的 IR 流程 + 联系人清单)。输出:Ransomware Readiness Score(RRS 0-100)+ 分维度雷达图 + Top 5 紧急修复项。
**技术实现路径**
- 问卷驱动评估:自动化问卷收集 20+ 关键安全控制项状态
- 云存储备份检查:调用 AWS S3 / 阿里云 OSS API 检查版本控制设置
- 网络分段检查:Nmap 扫描(需内网授权)或用户提供网络拓扑图
- EDR 覆盖率:SentinelOne/CrowdStrike API 查询(需企业授权)
- 报告生成:matplotlib 雷达图 + MarkDown 报告
**API/库依赖**:boto3(AWS)、aliyun-openapi-python-sdk(阿里云)、python-nmap(需安装nmap)、matplotlib
---
## 三、技术实现路径汇总
```
┌─────────────────────────────────────────────────────┐
│ Enterprise Skill v2.0 │
├──────────┬──────────┬──────────┬──────────┬────────┤
│ Phase 1 │ Phase 2 │ Phase 2 │ Phase 3 │ Phase3 │
│ 钓鱼检测 │ 密码审计 │ API泄露 │ 社工评估 │ 勒索防御│
├──────────┴──────────┴──────────┴──────────┴────────┤
│ 共用基础设施 │
│ • HIBP API(已有) • 正则凭证库 • 报告生成器 │
│ • 企业上下文管理(多用户Session) │
│ • 飞书/企微 Webhook 通知 │
└─────────────────────────────────────────────────────┘
```
---
## 四、定价建议
### 4.1 套餐设计
| 套餐 | 价格 | 目标用户 | 功能范围 |
|------|------|----------|----------|
| **Free** | ¥0 | 个人用户 | 邮箱泄露检查 + 密码强度分析(现有功能) |
| **Starter** | ¥999/年 | 小微企业(≤50人) | Free + 钓鱼邮件识别(每月50次)+ 密码策略基础审计 |
| **Professional** | ¥2,999/年 | 成长期企业(50-500人) | Starter + API Key泄露扫描(GitHub公开仓库)+ 社工风险评估(5人份)+ 勒索 Preparedness 基础版 |
| **Enterprise** | ¥4,999/年 | 大型企业(500人+) | Professional + 无限次扫描 + 多账号管理 + 全员社工评估 + 备份覆盖率API集成 + 专属报告 + Slack/企微告警推送 |
### 4.2 定价逻辑
- **Free 层**:留住用户,形成漏斗入口
- **¥999 Starter**:解决"我的员工会不会点钓鱼邮件"这个老板最痛的点
- **¥2,999 Professional**:安全合规需求(等保/ISO 27001 配套工具)
- **¥4,999 Enterprise**:CISO 采购清单,B2B 销售主力SKU
---
## 五、开发优先级(Phase 1/2/3)
### Phase 1(1-2个月,快速上线)
**目标**:在现有架构上最小化扩展,快速产生商业价值
| 功能 | 优先级 | 理由 |
|------|--------|------|
| 钓鱼邮件识别 | P0 | 用户感知最强,解决"这封邮件安不安全"的直接需求 |
| 密码策略审计 | P0 | 企业刚需,可直接用于现有员工密码整改 |
### Phase 2(3-4个月,能力深化)
**目标**:构建差异化竞争壁垒
| 功能 | 优先级 | 理由 |
|------|--------|------|
| API Key泄露扫描 | P1 | 开发者安全意识崛起,GitHub泄露事件频发 |
| 社会工程学风险评估 | P2 | 依赖OSINT,数据质量不稳定,推迟至Phase 2完善 |
### Phase 3(5-6个月,企业级完善)
**目标**:完整的企业安全运营体系
| 功能 | 优先级 | 理由 |
|------|--------|------|
| 勒索软件 Preparedness | P1 | 监管合规(等保2.0)驱动需求明确 |
| 完整社工评估 | P1 | Phase 2数据积累成熟后,评分模型更准确 |
| 企业级管理后台 | P0 | 多租户、报告导出、SSO集成 |
---
## 六、关键里程碑
```
Month 1: Phase 1 MVP
- 钓鱼邮件识别(URL分析 + 黑名单)上线
- 基础密码策略问卷审计上线
Month 3: Phase 2 扩展
- API Key泄露扫描上线(GitHub集成)
- 飞书文档 + 定价页上线
Month 6: Phase 3 企业版
- 勒索 Preparedness 检查上线
- 企业管理后台 + 多租户
- 完整报告 + Webhook告警
```
---
## 七、风险与依赖
| 风险 | 影响 | 缓解措施 |
|------|------|----------|
| HIBP API 速率限制 | 免费版限制严格 | 提供 HIBP API Key 配置入口(用户自备) |
| OSINT 数据合规性 | 社工评估可能触碰隐私法规 | 明确仅扫描用户授权的自身资产,增加同意书 |
| GitHub API 配额 | 公开仓库扫描受限 | 企业GitHub Token可提升配额至5000次/小时 |
| 钓鱼检测误报 | 影响用户体验 | 提供"加入白名单"功能,允许人工复核 |
FILE:README.md
# 🔒 Security Health Check — 个人数字安全体检
一键检查你的数字安全状况:邮箱泄露、密码强度、安全评分。
## 功能
| 检查项 | 说明 | 数据来源 |
|--------|------|----------|
| 📧 邮箱泄露 | 检查邮箱是否出现在数据泄露中 | HIBP API |
| 🔑 密码泄露 | 检查密码是否已被泄露(k-匿名,密码不离开本地) | HIBP Password API |
| 💪 密码强度 | 分析密码复杂度和暴力破解时间 | 本地计算 |
| 📊 安全评分 | 综合评分 0-100 分 | 综合分析 |
| 💡 安全建议 | 基于 HIBP 数据提供修复建议 | 智能推荐 |
## 安装
```bash
npx clawhub install security-health-check
```
## 使用
```bash
# 完整体检
python3 scripts/security_check.py --email [email protected]
# 仅检查密码
python3 scripts/security_check.py --password "your_password"
# JSON 格式输出
python3 scripts/security_check.py --email [email protected] --json
```
## 隐私
- **密码永远不离开本地**:使用 HIBP k-匿名前缀查询,只发送 SHA1 前5位
- **邮箱检查结果仅展示摘要**:不存储泄露原始数据
- **报告可随时删除**:所有生成的报告可手动清理
## License
MIT-0
FILE:scripts/security_check.py
import os
#!/usr/bin/env python3
"""
个人数字安全体检工具 - security-health-check Skill
检查邮箱泄露、密码强度、生成安全评分报告
"""
import hashlib
import json
import math
import re
import ssl
import sys
import certifi
import urllib.request
import urllib.error
from datetime import datetime
# SSL 上下文:优先验证,失败时回退
def _get_ssl_context():
"""创建SSL上下文,兼容证书不完整的环境"""
ctx = ssl.create_default_context()
ctx.load_verify_locations(certifi.where())
return ctx
def check_email_breach(email):
"""通过 HIBP API 检查邮箱是否出现在已知数据泄露中"""
url = f"https://haveibeenpwned.com/api/v3/breachedaccount/{urllib.parse.quote(email)}?truncateResponse=false"
headers = {
"User-Agent": "SecurityHealthCheck-Skill/1.0",
"hibp-api-key": os.environ.get("HIBP_API_KEY", "") # Optional: set HIBP_API_KEY env var for higher rate limits
}
try:
req = urllib.request.Request(url, headers=headers)
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=_get_ssl_context()))
resp = opener.open(req, timeout=15)
if resp.status == 200:
breaches = json.loads(resp.read().decode("utf-8"))
return {"breached": True, "breaches": breaches, "count": len(breaches)}
except urllib.error.HTTPError as e:
if e.code == 404:
return {"breached": False, "breaches": [], "count": 0}
elif e.code == 429:
return {"breached": None, "error": "API速率限制,请稍后再试", "breaches": [], "count": 0}
else:
return {"breached": None, "error": f"API错误: {e.code}", "breaches": [], "count": 0}
except Exception as e:
return {"breached": None, "error": f"网络错误: {str(e)}", "breaches": [], "count": 0}
return {"breached": False, "breaches": [], "count": 0}
# ============================================================
# 密码泄露检查 (HIBP k-匿名 API)
# ============================================================
def check_password_pwned(password):
"""使用 HIBP k-匿名前缀查询,密码不离开本地"""
sha1 = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
prefix, suffix = sha1[:5], sha1[5:]
url = f"https://api.pwnedpasswords.com/range/{prefix}"
headers = {"User-Agent": "SecurityHealthCheck-Skill/1.0", "Add-Padding": "true"}
try:
req = urllib.request.Request(url, headers=headers)
try:
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=_get_ssl_context()))
resp = opener.open(req, timeout=15)
except (ssl.SSLError, urllib.error.URLError) as ssl_err:
if "CERTIFICATE" in str(ssl_err) or "SSL" in str(ssl_err):
opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=ssl.create_default_context()))
resp = opener.open(req, timeout=15)
else:
raise
text = resp.read().decode("utf-8")
for line in text.splitlines():
parts = line.strip().split(":")
if len(parts) == 2 and parts[0] == suffix:
count = int(parts[1])
return {"pwned": True, "count": count}
return {"pwned": False, "count": 0}
except Exception as e:
return {"pwned": None, "error": f"查询失败: {str(e)}", "count": 0}
# ============================================================
# 密码强度分析 (本地计算)
# ============================================================
COMMON_PASSWORDS = {
"password", "123456", "12345678", "qwerty", "abc123", "monkey", "master",
"dragon", "login", "princess", "football", "shadow", "sunshine", "trustno1",
"iloveyou", "batman", "access", "hello", "charlie", "donald", "password1",
"qwerty123", "letmein", "welcome", "admin", "passw0rd", "1234567890",
"000000", "123123", "111111", "1q2w3e4r", "password123", "mima123456",
"woaini", "woaini1314", "abcd1234", "123321", "666666", "888888",
}
COMMON_WORDS = {
"password", "admin", "login", "welcome", "hello", "love", "secret",
"sunshine", "dragon", "master", "monkey", "shadow", "qwerty",
"abcdef", "baseball", "football", "soccer", "hockey",
}
def analyze_password_strength(password):
"""本地分析密码强度"""
result = {
"length": len(password),
"has_upper": bool(re.search(r"[A-Z]", password)),
"has_lower": bool(re.search(r"[a-z]", password)),
"has_digit": bool(re.search(r"\d", password)),
"has_special": bool(re.search(r"[^A-Za-z0-9]", password)),
"is_common": password.lower() in COMMON_PASSWORDS,
"has_common_word": any(w in password.lower() for w in COMMON_WORDS),
"is_sequential": _has_sequential(password),
"is_repeated": _has_repeated(password),
}
# 计算熵
charset_size = 0
if result["has_upper"]: charset_size += 26
if result["has_lower"]: charset_size += 26
if result["has_digit"]: charset_size += 10
if result["has_special"]: charset_size += 32
if charset_size == 0: charset_size = 26 # 默认小写
entropy = len(password) * math.log2(charset_size) if charset_size > 0 else 0
result["entropy_bits"] = round(entropy, 1)
# 估算暴力破解时间(假设10亿次/秒)
combinations = charset_size ** len(password) if charset_size > 0 else 0
seconds = combinations / 1e9 if combinations > 0 else 0
result["crack_time"] = _format_time(seconds)
result["crack_time_seconds"] = seconds
# 评分 (0-100)
score = 0
score += min(len(password) * 4, 40) # 长度加分,最多40
if result["has_upper"]: score += 10
if result["has_lower"]: score += 5
if result["has_digit"]: score += 10
if result["has_special"]: score += 15
if entropy > 60: score += 10
if entropy > 80: score += 10
# 扣分
if result["is_common"]: score = max(score - 40, 5)
if result["has_common_word"]: score = max(score - 20, 5)
if result["is_sequential"]: score = max(score - 15, 5)
if result["is_repeated"]: score = max(score - 10, 5)
if len(password) < 6: score = max(score - 20, 5)
result["score"] = min(max(score, 0), 100)
# 评级
if result["score"] >= 80:
result["grade"] = "强"
result["emoji"] = "🟢"
elif result["score"] >= 60:
result["grade"] = "中等"
result["emoji"] = "🟡"
elif result["score"] >= 40:
result["grade"] = "弱"
result["emoji"] = "🟠"
else:
result["grade"] = "极弱"
result["emoji"] = "🔴"
return result
def _has_sequential(s):
"""检测连续字符序列"""
sequences = [
"abcdefghijklmnopqrstuvwxyz",
"zyxwvutsrqponmlkjihgfedcba",
"01234567890",
"09876543210",
"qwertyuiop",
"asdfghjkl",
"zxcvbnm",
]
s_lower = s.lower()
for seq in sequences:
for i in range(len(seq) - 2):
if seq[i:i+3] in s_lower:
return True
return False
def _has_repeated(s):
"""检测重复字符"""
for i in range(len(s) - 2):
if s[i] == s[i+1] == s[i+2]:
return True
return False
def _format_time(seconds):
"""将秒数格式化为可读时间"""
if seconds < 1:
return "瞬间"
elif seconds < 60:
return f"{seconds:.0f}秒"
elif seconds < 3600:
return f"{seconds/60:.0f}分钟"
elif seconds < 86400:
return f"{seconds/3600:.1f}小时"
elif seconds < 86400 * 365:
return f"{seconds/86400:.0f}天"
elif seconds < 86400 * 365 * 1000:
return f"{seconds/(86400*365):.0f}年"
elif seconds < 86400 * 365 * 1e6:
return f"{seconds/(86400*365*1000):.0f}千年"
elif seconds < 86400 * 365 * 1e9:
return f"{seconds/(86400*365*1e6):.0f}百万年"
else:
return "宇宙级别"
# ============================================================
# 安全评分计算
# ============================================================
def calculate_security_score(breach_result, password_result=None):
"""综合安全评分 0-100"""
score = 100
# 邮箱泄露扣分
if breach_result.get("breached"):
count = breach_result.get("count", 0)
score -= min(count * 8, 40) # 每次泄露扣8分,最多扣40
elif breach_result.get("breached") is None:
score -= 5 # 检查失败,略扣
# 密码安全扣分
if password_result:
if password_result.get("pwned"):
score -= 30 # 密码已泄露,重扣
count = password_result.get("count", 0)
if count > 1000:
score -= 10
pw_strength = password_result.get("score", 50)
if pw_strength < 60:
score -= 15
elif pw_strength < 80:
score -= 5
score = max(score, 0)
if score >= 80:
level = "安全"
emoji = "🟢"
elif score >= 60:
level = "基本安全"
emoji = "🟡"
elif score >= 40:
level = "存在风险"
emoji = "🟠"
else:
level = "高危"
emoji = "🔴"
return {"score": score, "level": level, "emoji": emoji}
# ============================================================
# 报告生成
# ============================================================
def generate_report(email, breach_result, password_result=None, password_strength=None, format="markdown"):
"""生成安全体检报告"""
now = datetime.now().strftime("%Y-%m-%d %H:%M")
# 遮蔽邮箱
if email and "@" in email:
local, domain = email.split("@", 1)
masked = f"{local[0]}***@{domain}"
elif email:
masked = f"{email[:2]}***"
else:
masked = "未提供"
# 安全评分
sec_score = calculate_security_score(breach_result, password_result)
lines = []
lines.append(f"🔒 个人数字安全体检报告")
lines.append(f"━━━━━━━━━━━━━━━━━━━━━")
lines.append(f"")
lines.append(f"📧 邮箱:{masked}")
lines.append(f"🕐 检查时间:{now}")
lines.append(f"📊 安全评分:{sec_score['emoji']} {sec_score['score']}/100({sec_score['level']})")
lines.append(f"")
# 邮箱泄露详情
lines.append(f"├─ 邮箱泄露检查:")
if breach_result.get("breached") is True:
lines.append(f"│ ⚠️ 发现{breach_result['count']}次泄露")
for b in breach_result.get("breaches", []):
name = b.get("Name", "未知")
date = b.get("BreachDate", "未知")
data = ", ".join(b.get("DataClasses", []))
lines.append(f"│ ├─ {name} ({date}) — {data}")
elif breach_result.get("breached") is False:
lines.append(f"│ ✅ 未发现泄露")
else:
err = breach_result.get("error", "未知错误")
lines.append(f"│ ❓ 检查失败:{err}")
lines.append(f"")
# 密码安全
if password_result or password_strength:
lines.append(f"├─ 密码安全检查:")
if password_result:
if password_result.get("pwned"):
lines.append(f"│ 🔴 密码已泄露!出现在{password_result['count']:,}次泄露中")
lines.append(f"│ → 立即更换此密码!")
elif password_result.get("pwned") is False:
lines.append(f"│ ✅ 密码未出现在已知泄露中")
else:
lines.append(f"│ ❓ 密码泄露检查失败:{password_result.get('error', '')}")
if password_strength:
lines.append(f"│ ")
lines.append(f"│ 密码强度:{password_strength['emoji']} {password_strength['grade']}({password_strength['score']}/100)")
lines.append(f"│ ├─ 长度:{password_strength['length']}字符")
chars = []
if password_strength["has_upper"]: chars.append("大写")
if password_strength["has_lower"]: chars.append("小写")
if password_strength["has_digit"]: chars.append("数字")
if password_strength["has_special"]: chars.append("特殊字符")
lines.append(f"│ ├─ 包含:{'、'.join(chars) if chars else '无'}")
lines.append(f"│ ├─ 熵:{password_strength['entropy_bits']} bits")
lines.append(f"│ └─ 暴力破解时间:{password_strength['crack_time']}")
if password_strength["is_common"]:
lines.append(f"│ ⚠️ 这是常见密码,极易被破解!")
if password_strength["has_common_word"]:
lines.append(f"│ ⚠️ 包含常见词汇,建议替换")
lines.append(f"")
# 安全建议
lines.append(f"├─ 💡 安全建议:")
suggestions = []
if breach_result.get("breached"):
suggestions.append(("🔴", "立即更换相关泄露网站的密码"))
suggestions.append(("🔴", "不同网站使用不同密码"))
suggestions.append(("🟡", "启用两步验证(2FA)— 推荐 Google Authenticator"))
suggestions.append(("🟡", "使用密码管理器(推荐 Bitwarden/1Password)"))
suggestions.append(("🟢", "定期进行安全体检(建议每30天一次)"))
suggestions.append(("🟢", "不要在公共WiFi下输入重要密码"))
if password_strength and password_strength["score"] < 60:
suggestions.insert(0, ("🔴", "当前密码过弱,请立即更换为12位以上的复杂密码"))
for emoji, text in suggestions:
lines.append(f"│ {emoji} {text}")
lines.append(f"")
lines.append(f"━━━━━━━━━━━━━━━━━━━━━")
lines.append(f"💡 下次检查建议:30天后")
return "\n".join(lines)
# ============================================================
# 主入口
# ============================================================
def main():
import argparse
parser = argparse.ArgumentParser(description="🔒 个人数字安全体检工具")
parser.add_argument("--email", help="要检查的邮箱地址")
parser.add_argument("--password", help="要检查强度的密码")
parser.add_argument("--check", choices=["breach", "password"], help="仅检查某一项")
parser.add_argument("--report", choices=["markdown", "json"], default="markdown", help="报告格式")
parser.add_argument("--json", action="store_true", help="以JSON格式输出")
args = parser.parse_args()
if not args.email and not args.password:
parser.print_help()
print("\n⚠️ 请至少提供 --email 或 --password 参数")
sys.exit(1)
breach_result = {"breached": None, "breaches": [], "count": 0}
password_result = None
password_strength = None
# 邮箱泄露检查
if args.email and (not args.check or args.check == "breach"):
print(f"🔍 正在检查邮箱泄露情况...", file=sys.stderr)
breach_result = check_email_breach(args.email)
# 密码检查
if args.password and (not args.check or args.check == "password"):
print(f"🔍 正在检查密码泄露情况...", file=sys.stderr)
password_result = check_password_pwned(args.password)
print(f"🔍 正在分析密码强度...", file=sys.stderr)
password_strength = analyze_password_strength(args.password)
# 输出
if args.json or args.report == "json":
output = {
"timestamp": datetime.now().isoformat(),
"email": args.email,
"breach_check": breach_result,
"password_check": password_result,
"password_strength": password_strength,
"security_score": calculate_security_score(breach_result, password_result),
}
print(json.dumps(output, ensure_ascii=False, indent=2))
else:
report = generate_report(args.email, breach_result, password_result, password_strength)
print(report)
if __name__ == "__main__":
import urllib.parse
main()
多平台热点聚合与选题分析。一站式抓取知乎热榜、微博热搜、百度热搜等中国主流平台实时热点, AI 分析热度趋势,自动匹配内容机会评分,帮助创作者精准选题。 触发场景:用户说"今天什么热点"、"帮我看看热搜"、"找选题"、"今天写什么"、 "知乎热榜"、"微博热搜"、"热搜分析"、"趋势分析"、"什么话题火"。 Ke...
---
name: multi-platform-trend
description: |
多平台热点聚合与选题分析。一站式抓取知乎热榜、微博热搜、百度热搜等中国主流平台实时热点,
AI 分析热度趋势,自动匹配内容机会评分,帮助创作者精准选题。
触发场景:用户说"今天什么热点"、"帮我看看热搜"、"找选题"、"今天写什么"、
"知乎热榜"、"微博热搜"、"热搜分析"、"趋势分析"、"什么话题火"。
Keywords: 热搜, 热榜, 热点, 选题, trending, hot topic, 知乎热榜, 微博热搜, 今日热点.
---
# 多平台热点聚合
一站式抓取中国主流平台实时热点,AI 分析并推荐最佳选题方向。
## 核心能力
1. **多平台抓取** — 知乎热榜、微博热搜、百度热搜、B站排行榜
2. **智能分类** — 自动按领域分类(科技、商业、社会、娱乐等)
3. **AI 选题评分** — 评估每个热点的「内容机会值」(热度 × 契合度 × 竞争窗口)
4. **选题推荐** — 根据创作者定位,推荐最适合写作的热点
## 快速开始
用户说"今天什么热点"时:
```
1. 运行 python3 scripts/fetch_trends.py --all
2. AI 分析热点,按领域分类
3. 评估每个热点的内容机会
4. 推荐 Top 5 选题 + 写作角度建议
```
## 支持平台
| 平台 | 数据源 | 频率 | 更新速度 |
|------|--------|------|----------|
| 知乎热榜 | api.zhihu.com | 实时 | ⚡ 最快 |
| 微博热搜 | weibo.com/ajax | 实时 | ⚡ 最快 |
| 百度热搜 | top.baidu.com | 实时 | ⚡ 最快 |
| B站排行榜 | api.bilibili.com | 每日 | 🕐 次日更新 |
| 36氪热榜 | 36kr.com/feed | 实时 | ⚡ 最快 |
## 使用方式
### 查看所有平台热点
```bash
python3 scripts/fetch_trends.py --all --limit 20
```
### 查看单平台
```bash
python3 scripts/fetch_trends.py --platform zhihu --limit 10
python3 scripts/fetch_trends.py --platform weibo --limit 10
python3 scripts/fetch_trends.py --platform baidu --limit 10
python3 scripts/fetch_trends.py --platform bilibili --limit 10
```
### 按关键词过滤
```bash
python3 scripts/fetch_trends.py --all --keyword "AI"
python3 scripts/fetch_trends.py --all --keyword "科技"
```
### JSON 输出(供程序处理)
```bash
python3 scripts/fetch_trends.py --all --json
```
## AI 选题分析
收到热点数据后,AI 自动执行:
```
1. 领域分类
将热点分为:科技/AI、商业/财经、社会/民生、娱乐/体育、教育/职场、其他
2. 内容机会评分(0-100)
评分维度:
- 🔥 热度分(30%):热度值越高分越高
- 🎯 契合度(30%):与用户定位的匹配度
- ⏰ 时效窗口(20%):热点越新窗口越大
- 📝 竞争度(20%):回答/文章越少,机会越大
3. 选题推荐
Top 5 推荐,每个包含:
- 热点标题
- 内容机会评分
- 推荐写作角度
- 建议发布平台
- 预估阅读量区间
```
## 输出格式
### 文字报告
```
📊 今日热点速览(2026-04-11 06:30)
🔥 科技/AI
1. [92分] 追觅科技砸2亿年薪招首席科学家
→ 角度:AI人才争夺战背后的行业真相
→ 建议:知乎长文 + 小红书短评
2. [85分] OpenAI发布新模型
→ 角度:实测对比+实用场景
→ 建议:知乎回答 + 公众号深度
📰 商业/财经
3. [78分] 某品牌被曝都是假洋牌
→ 角度:国货崛起的商业逻辑
→ 建议:知乎文章 + 小红书种草
---
💡 今日最佳选题:追觅科技AI人才争夺战
- 热度高 + AI相关 + 竞争少 = 优质窗口
- 建议在2小时内发布(时效窗口)
```
### JSON 输出
```json
[
{
"title": "...",
"platform": "zhihu",
"heat": 1520000,
"category": "科技/AI",
"opportunity_score": 92,
"suggested_angle": "AI人才争夺战",
"suggested_platform": ["知乎", "小红书"]
}
]
```
## 配置项
首次使用时向用户确认:
| 配置 | 说明 | 默认值 |
|------|------|--------|
| 监控平台 | 抓取哪些平台 | 知乎+微博+百度 |
| 过滤关键词 | 只看哪些领域 | 无(全部) |
| 创作者定位 | 内容方向定位 | AI/科技 |
配置确认后记录到 memory/YYYY-MM-DD.md。
## 与其他 Skill 的配合
- **rss-content-flow**:热点 + RSS = 完整内容来源
- **content-creator**:选题 → 生成内容 → 发布的完整链路
## 文件结构
```
multi-platform-trend/
├── SKILL.md # 本文件
├── README.md # 用户文档
├── scripts/
│ ├── fetch_trends.py # 热点抓取脚本
│ └── config.json # 用户配置(运行时生成)
└── references/
└── scoring_guide.md # 选题评分指南
```
## 注意事项
- API 请求设置合理的超时(10秒),失败时自动跳过
- 微博热搜需要 User-Agent 伪装,已在脚本中处理
- 热度数据为估算值,仅供参考
- AI 分析基于标题和热度,无法获取文章完整内容
- 建议结合 rss-content-flow 获取完整文章内容后再写作
FILE:README.md
# multi-platform-trend
多平台热点聚合与选题分析。一站式抓取知乎热榜、微博热搜、百度热搜等中国主流平台实时热点,AI 分析热度趋势并推荐最佳选题方向。
## 功能
- 🔥 **多平台抓取** — 知乎热榜、微博热搜、百度热搜、B站排行榜
- 🏷️ **智能分类** — AI 自动按领域分类(科技、商业、社会、娱乐等)
- 📊 **选题评分** — 评估每个热点的「内容机会值」(热度×契合度×竞争窗口)
- 💡 **选题推荐** — 根据创作者定位推荐最适合写作的热点
- ⏰ **定时监控** — 每日定时抓取,新热点即时提醒
## 使用方法
### 触发词
"今天什么热点"、"帮我看看热搜"、"找选题"、"知乎热榜"、"微博热搜"、"热搜分析"
### 查看所有平台热点
```bash
python3 scripts/fetch_trends.py -p all -n 20
```
### 查看单平台
```bash
python3 scripts/fetch_trends.py -p zhihu -n 10
python3 scripts/fetch_trends.py -p weibo -n 10
```
### 按关键词过滤
```bash
python3 scripts/fetch_trends.py -p all -k "AI"
python3 scripts/fetch_trends.py -p all -k "科技"
```
### 输出示例
```
📊 今日热点速览
🔥 知乎 | 30 条
1. 警方通报全红婵遭网暴案 🔥1065万热度
2. 小米食堂发布「小米」冰激凌 🔥207万热度
3. 追觅科技砸2亿年薪招首席科学家 🔥152万热度
🔥 微博 | 10 条
1. 男子微信群多次侮辱全红婵被拘 🔥413926
2. 这些品牌竟然都是假洋牌 🔥292902
```
## 支持平台
| 平台 | 状态 | 数据量 | 更新速度 |
|------|------|--------|----------|
| 知乎热榜 | ✅ 稳定 | Top 50 | 实时 |
| 微博热搜 | ✅ 稳定 | Top 50 | 实时 |
| 百度热搜 | ⚠️ 部分可用 | Top 50 | 实时 |
| B站排行榜 | ⚠️ 部分可用 | Top 100 | 每日 |
## AI 选题分析工作流
```
热点数据抓取
↓
AI 领域分类(科技/商业/社会/娱乐/教育)
↓
内容机会评分(0-100分)
- 🔥 热度分 30%
- 🎯 契合度 30%
- ⏰ 时效窗口 20%
- 📝 竞争度 20%
↓
Top 5 选题推荐
- 写作角度建议
- 建议发布平台
- 预估阅读量区间
```
## 与其他 Skill 配合
- **rss-content-flow**:热点(实时) + RSS(深度) = 完整内容来源
- **social-media-poster**:选题确定后直接发布
- **content-creator**:选题 → 生成 → 发布完整链路
- **feishu-daily-report**:热点分析纳入日报
## 系统要求
- Python 3.8+
- 可访问知乎/微博 API(部分环境需要代理)
## License
MIT
FILE:scripts/fetch_trends.py
#!/usr/bin/env python3
"""
多平台热点聚合抓取引擎
支持:知乎热榜、微博热搜、百度热搜、B站排行榜
用法:
python3 fetch_trends.py --all --limit 20
python3 fetch_trends.py --platform zhihu --limit 10
python3 fetch_trends.py --all --keyword "AI" --json
"""
import json
import os
import sys
import ssl
import argparse
import time
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# SSL 配置:优先标准验证,失败才降级
_SAFE_SSL_CTX = ssl.create_default_context() # 标准验证
def _urlopen(req, timeout=10):
"""优先标准SSL,失败自动降级"""
try:
return urlopen(req, timeout=timeout, context=_SAFE_SSL_CTX)
except URLError as e:
# SSL证书验证失败时降级
reason = getattr(e, 'reason', None)
if isinstance(reason, (ssl.SSLError, ssl.SSLCertVerificationError)):
return urlopen(req, timeout=timeout, context=ssl.create_default_context())
raise
except Exception as e:
# 其他SSL错误也尝试降级
if 'SSL' in str(type(e).__name__) or 'SSL' in str(e):
return urlopen(req, timeout=timeout, context=ssl.create_default_context())
raise
HEADERS = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
}
# ========== 知乎热榜 ==========
def fetch_zhihu(limit=20):
"""抓取知乎热榜"""
url = "https://api.zhihu.com/topstory/hot-lists/total?limit={}".format(limit)
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 知乎热榜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
for item in data.get("data", []):
target = item.get("target", {})
title = target.get("title", "").strip()
if not title:
continue
# 提取热度数值
detail = item.get("detail_text", "")
heat_num = 0
if "万" in detail:
try:
heat_num = float(detail.replace("万热度", "").replace("万", "").strip()) * 10000
except ValueError:
pass
elif "热度" in detail:
try:
heat_num = int(detail.replace("热度", "").strip())
except ValueError:
pass
results.append({
"title": title,
"platform": "知乎",
"heat": int(heat_num),
"heat_display": detail,
"url": "https://www.zhihu.com/question/{}".format(target.get("id", "")),
"category": "",
"excerpt": (target.get("excerpt", "") or "")[:100]
})
return results
# ========== 微博热搜 ==========
def fetch_weibo(limit=20):
"""抓取微博热搜"""
url = "https://weibo.com/ajax/side/hotSearch"
headers = {**HEADERS, "Referer": "https://weibo.com/"}
try:
req = Request(url, headers=headers)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 微博热搜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
realtime = data.get("data", {}).get("realtime", [])
for item in realtime[:limit]:
note = item.get("note", "").strip()
if not note:
continue
note = item.get("note", "").strip()
if not note:
continue
label = item.get("label_name", "")
if label:
note = "[{}]{}".format(label, note)
results.append({
"title": note,
"platform": "微博",
"heat": int(item.get("num", 0)),
"heat_display": str(item.get("num", 0)),
"url": "https://s.weibo.com/weibo?q=%23{}%23".format(
item.get("word", item.get("note", ""))
),
"category": "",
"excerpt": ""
})
return results
# ========== 百度热搜 ==========
def fetch_baidu(limit=20):
"""抓取百度热搜"""
url = "https://top.baidu.com/api/board?platform=wise&tab=realtime"
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ 百度热搜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
cards = data.get("data", {}).get("cards", [])
if cards:
content = cards[0].get("content", [])
for item in content[:limit]:
query = item.get("query", "").strip()
if not query:
continue
desc = item.get("desc", "")
heat_num = 0
try:
heat_num = int(item.get("hotScore", 0))
except (ValueError, TypeError):
pass
results.append({
"title": query,
"platform": "百度",
"heat": heat_num,
"heat_display": desc[:30] if desc else "",
"url": "https://www.baidu.com/s?wd={}".format(query),
"category": item.get("tag", ""),
"excerpt": desc[:100] if desc else ""
})
return results
# ========== B站排行榜 ==========
def fetch_bilibili(limit=20):
"""抓取B站排行榜"""
url = "https://api.bilibili.com/x/web-interface/ranking/v2?rid=0&type=all"
try:
req = Request(url, headers=HEADERS)
with _urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(" ⚠️ B站排行榜抓取失败: {}".format(e), file=sys.stderr)
return []
results = []
items = data.get("data", {}).get("list", [])
for item in items[:limit]:
title = item.get("title", "").strip()
if not title:
continue
stat = item.get("stat", {})
results.append({
"title": title,
"platform": "B站",
"heat": int(stat.get("view", 0)),
"heat_display": "{}播放".format(_format_num(stat.get("view", 0))),
"url": item.get("short_link_v2", "https://www.bilibili.com/video/{}".format(
item.get("bvid", "")
)),
"category": item.get("tname", ""),
"excerpt": item.get("description", "")[:100] if item.get("description") else ""
})
return results
# ========== 工具函数 ==========
def _format_num(n):
"""格式化数字"""
n = int(n)
if n >= 10000:
return "{:.1f}万".format(n / 10000)
elif n >= 1000:
return "{:.1f}k".format(n / 1000)
return str(n)
def filter_by_keyword(items, keyword):
"""按关键词过滤"""
kw = keyword.lower()
return [item for item in items if kw in item["title"].lower()]
def print_report(items, keyword=None):
"""打印热点报告"""
if not items:
print("\n📭 未抓取到热点数据")
return
# 按平台分组
by_platform = {}
for item in items:
p = item["platform"]
by_platform.setdefault(p, []).append(item)
print("\n" + "=" * 60)
if keyword:
print("📊 热点速览 | 关键词: 「{}」".format(keyword))
else:
print("📊 今日热点速览")
print("=" * 60)
for platform, topics in by_platform.items():
print("\n🔥 {} | {} 条".format(platform, len(topics)))
for i, t in enumerate(topics, 1):
heat = t.get("heat_display", "")
if heat:
print(" {}. {} 🔥{}".format(i, t["title"][:48], heat))
else:
print(" {}. {}".format(i, t["title"][:48]))
if t.get("excerpt"):
print(" {}".format(t["excerpt"][:60]))
print("\n" + "-" * 60)
print("💡 AI 分析:输入「分析选题」获取内容机会评分")
print("💡 输入「生成内容」直接改写为知乎/小红书文案")
# ========== 主入口 ==========
FETCHERS = {
"zhihu": fetch_zhihu,
"weibo": fetch_weibo,
"baidu": fetch_baidu,
"bilibili": fetch_bilibili,
}
PLATFORM_NAMES = {
"zhihu": "知乎",
"weibo": "微博",
"baidu": "百度",
"bilibili": "B站",
"all": "全部平台",
}
def main():
parser = argparse.ArgumentParser(description="多平台热点聚合")
parser.add_argument("--platform", "-p",
choices=list(FETCHERS.keys()) + ["all"],
default="all",
help="抓取平台(默认全部)")
parser.add_argument("--limit", "-n", type=int, default=15,
help="每个平台抓取条数(默认15)")
parser.add_argument("--keyword", "-k",
help="按关键词过滤")
parser.add_argument("--json", action="store_true",
help="输出 JSON 格式")
args = parser.parse_args()
# 确定要抓取的平台
if args.platform == "all":
platforms = list(FETCHERS.keys())
else:
platforms = [args.platform]
# 依次抓取
all_items = []
for name in platforms:
display_name = PLATFORM_NAMES.get(name, name)
print("📡 抓取 {}...".format(display_name), file=sys.stderr)
start = time.time()
items = FETCHERS[name](args.limit)
elapsed = time.time() - start
if items:
print(" ✅ {} 条 ({:.1f}s)".format(len(items), elapsed), file=sys.stderr)
else:
print(" ⚠️ 无数据 ({:.1f}s)".format(elapsed), file=sys.stderr)
all_items.extend(items)
# 关键词过滤
if args.keyword:
all_items = filter_by_keyword(all_items, args.keyword)
# 按热度排序
all_items.sort(key=lambda x: x.get("heat", 0), reverse=True)
# 输出
if args.json:
print(json.dumps(all_items, ensure_ascii=False, indent=2))
else:
print_report(all_items, args.keyword)
if __name__ == "__main__":
main()
RSS 订阅 + AI 内容流引擎。监控多个 RSS 源,自动抓取最新文章, AI 分析提取核心观点,一键生成适配知乎/小红书/公众号的原创内容。 触发场景:用户说"帮我找今天的选题"、"RSS订阅"、"监控XX的更新"、 "把XX的文章改写成小红书"、"我需要一些内容灵感"、"自动生成今日内容"。 Keywor...
---
name: rss-content-flow
description: |
RSS 订阅 + AI 内容流引擎。监控多个 RSS 源,自动抓取最新文章,
AI 分析提取核心观点,一键生成适配知乎/小红书/公众号的原创内容。
触发场景:用户说"帮我找今天的选题"、"RSS订阅"、"监控XX的更新"、
"把XX的文章改写成小红书"、"我需要一些内容灵感"、"自动生成今日内容"。
Keywords: RSS, 订阅, 内容选题, 今日内容, 内容灵感, 文章改写, feed, 监控.
---
# RSS 内容流
将 RSS 订阅转化为结构化内容素材,一站式生成多平台发布文案。
## 核心能力
1. **RSS 订阅管理** — 添加/删除/列出订阅源
2. **智能抓取** — 自动获取最新 N 篇文章
3. **AI 分析** — 提取核心观点、关键数据、金句
4. **内容生成** — 按平台风格改写(知乎/小红书/公众号)
5. **草稿保存** — 保存到本地文件或直接输出
## 快速开始
用户说"帮我找今天的选题"时:
```
1. 读取 ~/.qclaw/skills/rss-content-flow/scripts/config.json(订阅列表)
2. 用 fetch_feed() 获取各源最新条目
3. AI 分析后呈现 3-5 个可用选题
4. 用户确认后 → 生成完整内容
```
## 订阅管理
### 添加订阅源
```bash
python3 scripts/manage_feeds.py --add <name> <url>
# 示例:python3 scripts/manage_feeds.py --add 36氪 https://36kr.com/feed
```
### 列出订阅
```bash
python3 scripts/manage_feeds.py --list
```
### 删除订阅
```bash
python3 scripts/manage_feeds.py --remove <name>
```
### 默认订阅源(预配置)
| 名称 | URL | 内容方向 |
|------|-----|---------|
| 36氪 | https://36kr.com/feed | 科技/商业 |
| 虎嗅 | https://www.huxiu.com/rss/0.xml | 商业/创业 |
| 少数派 | https://sspai.com/feed | 效率/工具 |
| AI研习社 | https://ai.googleblog.com/feed/ | AI/技术 |
| OpenAI Blog | https://OpenAI.com/blog/rss.xml | AI产品 |
## 内容抓取
### 单源抓取
```bash
python3 scripts/fetch_feed.py --source 36氪 --limit 5
```
### 全源抓取
```bash
python3 scripts/fetch_feed.py --all --limit 3
```
输出示例:
```
📰 36氪 | 4篇新文章
① [AI创业] 标题:xxx | 浏览量预估:xxx
② [商业] 标题:xxx
...
📰 虎嗅 | 2篇新文章
① [汽车] 标题:xxx
...
```
### 抓取逻辑
1. 解析 RSS XML,获取 title/link/description/pubDate
2. 过滤:去除广告/软文/太旧的文章(>7天)
3. 按时间倒序排列
4. 返回结构化 JSON 供 AI 分析
## AI 内容分析
收到文章列表后,AI 自动执行:
```
对每篇文章:
1. 读取 description / 摘要
2. 判断:这篇文章的核心价值是什么?
3. 提取:
- 核心观点(1-2句)
- 关键数据(数字/排名/增长)
- 金句(适合引用的原话)
- 相关话题标签(3-5个)
4. 评估:适合哪些平台?
- 知乎:深度分析/观点争议类
- 小红书:实操教程/工具推荐/个人经验类
- 公众号:行业趋势/商业观察类
```
## 平台内容生成
### 知乎风格
- 字数:800-1500字
- 结构:背景→问题→分析→结论
- 开头:痛点/争议性观点
- 结尾:行动号召/评论区互动
- 关键词:#AI #副业 #效率工具
### 小红书风格
- 字数:300-600字
- 结构:钩子→干货→互动
- emoji 序号 + 话题标签(#AI副业 #效率神器)
- 每段不超过3行
- 结尾:评论区互动问题
### 公众号风格
- 字数:1000-2000字
- 结构:开篇引子→案例拆解→方法论→资源推荐
- 标题公式:数字+情绪+悬念
- 文末:往期回顾/互动话题
## 输出方式
### 保存为草稿(默认)
```
feishu_doc action=create title="内容草稿 | {日期}" → 获取 doc_token
feishu_doc action=write doc_token={token} content={内容}
```
### 预览后发布
生成内容 → 用户确认 → 调用 social-media-poster 发布
### 自动发布(需用户授权)
```
用户说"自动发布"时:
1. 将内容保存为本地Markdown文件
2. 调用 social-media-poster 发布
3. 返回发布结果
```
## 配置项
首次使用时向用户确认:
| 配置 | 说明 | 默认值 |
|------|------|--------|
| 订阅源 | 监控哪些 RSS | 36氪+虎嗅+少数派 |
| 抓取数量 | 每次取几条 | 每源3条 |
| 生成平台 | 生成哪个平台内容 | 用户指定 |
| 输出方式 | 草稿/直接发布 | 草稿 |
| 风格偏好 | 正式/轻松/专业 | 轻松 |
## 文件结构
```
rss-content-flow/
├── SKILL.md # 本文件
├── README.md # 用户文档
├── scripts/
│ ├── manage_feeds.py # 订阅管理(增删查)
│ ├── fetch_feed.py # RSS 抓取脚本
│ └── config.json # 订阅配置(运行时生成)
├── references/
│ └── platform_style.md # 各平台风格指南
└── assets/ # 封面图模板等
```
## 与其他 Skill 的配合
- **social-media-poster**:生成内容后直接发布到知乎/小红书
- **feishu-daily-report**:每日内容草稿自动归入日报数据源
- **content-repurposer**:同一篇文章改写到不同平台
## 注意事项
- RSS 源需确保可访问,过期/失效的源自动跳过
- 生成内容时必须注明原文来源(保留链接)
- 版权合规:RSS 文章只能分析观点+改写,不得直接复制原文
- 抓取间隔建议不低于 30 分钟,避免对源站造成压力
FILE:README.md
# RSS 内容流 (rss-content-flow)
将 RSS 订阅转化为多平台发布内容。自动抓取最新资讯,AI 分析核心观点,一键生成适配知乎/小红书/公众号的原创文案。
## 功能
- 📡 **RSS 订阅管理** — 添加/删除/列出订阅源
- 🔍 **智能抓取** — 自动获取最新文章,过滤广告和过期内容
- 🧠 **AI 分析** — 提取核心观点、关键数据、金句标签
- ✍️ **平台改写** — 生成知乎/小红书/公众号风格的原创内容
- 💾 **草稿保存** — 写入飞书文档或直接调用发布工具
## 使用方法
### 触发词
"帮我找今天的选题"、"RSS 订阅"、"监控 XX 更新"、"把 XX 改成小红书"、"我需要内容灵感"
### 快速开始
**1. 初始化订阅源**
```bash
python3 scripts/manage_feeds.py --init
```
**2. 查看今日内容选题**
```bash
python3 scripts/fetch_feed.py --all --limit 5
```
**3. 生成内容**
AI 根据选题列表,为选中的文章生成多平台文案。
### 预置订阅源
| 名称 | URL | 方向 |
|------|-----|------|
| 36氪 | https://36kr.com/feed | 科技/商业 |
| 少数派 | https://sspai.com/feed | 效率/工具 |
### 添加自定义订阅
```bash
python3 scripts/manage_feeds.py --add <名称> <RSS_URL>
python3 scripts/fetch_feed.py --source <名称> --limit 3
```
## 工作流程
```
RSS 订阅源
↓
fetch_feed.py 抓取最新文章
↓
AI 分析:提取观点+评估平台适配度
↓
生成内容草稿(知乎/小红书/公众号)
↓
保存到飞书 或 直接发布
```
## 系统要求
- Python 3.8+
- OpenClaw + feishu 工具(用于保存草稿)
- 可访问的 RSS 源(部分源需要代理)
## 文件结构
```
rss-content-flow/
├── SKILL.md # AI 技能定义
├── scripts/
│ ├── manage_feeds.py # 订阅管理
│ └── fetch_feed.py # RSS 抓取
└── references/
└── platform_style.md # 平台风格指南
```
## 与其他 Skill 配合
- **social-media-poster**:生成内容后直接发布
- **feishu-daily-report**:内容草稿归入日报数据源
- **content-repurposer**:同一篇文章改写到不同平台
## License
MIT
FILE:scripts/config.json
{
"36氪": "https://36kr.com/feed",
"少数派": "https://sspai.com/feed",
"机器之心": "https://ai.jiqizhixin.com/rss",
"量子位": "https://www.ijiandao.com/feed"
}
FILE:scripts/fetch_feed.py
#!/usr/bin/env python3
"""
RSS 抓取脚本:从多个订阅源获取最新文章
用法:
python3 fetch_feed.py --all --limit 5
python3 fetch_feed.py --source 36氪 --limit 5
python3 fetch_feed.py --source 36氪 --url "https://36kr.com/feed"
"""
import json
import os
import sys
import ssl
import socket
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
import argparse
import re
# 双层 SSL 上下文
# 第1层:标准验证(优先)
_SAFE_SSL_CTX = ssl.create_default_context()
# 第2层:降级验证(标准证书失败时使用)
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(SCRIPT_DIR, "config.json")
def load_feeds():
"""加载订阅配置"""
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def fetch_rss(url, source_name="未知", limit=5, max_age_days=7):
"""
获取单个 RSS 源的最新文章
返回: list of dict {title, link, description, pub_date, source}
"""
articles = []
xml_content = None
# 优先使用标准 SSL 证书验证,失败自动降级
try:
with urlopen(url, timeout=15, context=_SAFE_SSL_CTX) as response:
xml_content = response.read().decode("utf-8", errors="replace")
except URLError as e:
# SSL错误被包装在URLError.reason里
reason = getattr(e, 'reason', None)
if isinstance(reason, ssl.SSLError):
try:
with urlopen(url, timeout=15, context=ssl.create_default_context()) as response:
xml_content = response.read().decode("utf-8", errors="replace")
except Exception as e2:
print(f" ⚠️ 连接失败: {e2}", file=sys.stderr)
return []
else:
print(f" ⚠️ 连接失败: {e}", file=sys.stderr)
return []
except Exception as e:
print(f" ⚠️ 连接失败: {e}", file=sys.stderr)
return []
if xml_content is None:
return []
try:
root = ET.fromstring(xml_content)
except ET.ParseError:
print(f" ⚠️ XML 解析错误", file=sys.stderr)
return []
# 判断 RSS 格式 (RSS 2.0 vs Atom)
ns = {"dc": "http://purl.org/dc/elements/1.1/"}
if root.tag == "rss":
items = root.findall(".//item")
date_format = "%a, %d %b %Y %H:%M:%S %z"
elif root.tag.endswith("}feed") or root.tag == "feed":
# Atom 格式
items = root.findall(".//entry")
date_format = "%Y-%m-%dT%H:%M:%S"
else:
items = root.findall(".//item") or root.findall(".//entry")
date_format = "%a, %d %b %Y %H:%M:%S %z"
cutoff_date = datetime.now() - timedelta(days=max_age_days)
for item in items[:limit * 3]: # 先多取一些,过滤后再截取
title = ""
link = ""
description = ""
pub_date = ""
# 提取标题(兼容 RSS 和 Atom)
title = ""
for t in list(item):
if t.tag in ("title", "{http://purl.org/dc/elements/1.1/}title"):
if t.text:
title = t.text.strip()
break
# 提取链接
link = ""
for l in list(item):
if l.tag == "link":
if l.text and l.text.strip() and l.text.startswith("http"):
link = l.text.strip()
elif l.attrib.get("href"):
link = l.attrib["href"]
break
# Atom 格式的 link
if not link:
atom_links = item.findall("link")
for al in atom_links:
href = al.attrib.get("href") or (al.text if al.text else "")
if href and "http" in str(href):
link = href
break
# 提取描述
description = ""
for d in list(item):
if d.tag in ("description", "summary", "content"):
if d.text:
desc_text = d.text.strip()
# 去除 HTML 标签
desc_text = re.sub(r"<[^>]+>", "", desc_text)
desc_text = re.sub(r"&[^;]+;", "", desc_text)
description = desc_text[:300]
break
# 提取日期
pub_date = ""
for date_elem in list(item):
if date_elem.tag in ("pubDate", "published", "updated"):
if date_elem.text:
pub_date = date_elem.text.strip()
break
# 过滤:去除广告/太旧的
ad_keywords = ["广告", "推广", "sponsored", "sponsor", "promoted"]
is_ad = any(kw in title.lower() for kw in ad_keywords)
# 日期过滤
try:
if pub_date:
if "(" in pub_date:
pub_date = pub_date.split("(")[0].strip()
article_date = datetime.strptime(pub_date[:25].strip(), date_format)
if article_date < cutoff_date:
continue
except (ValueError, TypeError):
pass # 日期解析失败时不过滤
if not is_ad and title:
articles.append({
"title": title,
"link": link,
"description": description,
"pub_date": pub_date[:16] if pub_date else "",
"source": source_name
})
if len(articles) >= limit:
break
return articles
def fetch_all_feeds(limit=3, max_age_days=7):
"""获取所有订阅源的最新文章"""
feeds = load_feeds()
if not feeds:
print("❌ 暂无订阅源,请先运行 --init 或 --add 添加订阅", file=sys.stderr)
return []
all_articles = []
results = {}
for name, url in feeds.items():
print(f"📡 抓取 {name}...", file=sys.stderr)
articles = fetch_rss(url, name, limit, max_age_days)
results[name] = articles
all_articles.extend(articles)
if articles:
print(f" ✅ 获取 {len(articles)} 篇", file=sys.stderr)
else:
print(f" ⚠️ 无新文章或抓取失败", file=sys.stderr)
# 按日期排序
return all_articles
def print_summary(all_articles):
"""打印内容摘要"""
if not all_articles:
print("\n📭 所有订阅源均无新文章(7天内)")
return
print(f"\n{'='*50}")
print(f"📬 今日内容速览(共 {len(all_articles)} 篇)")
print(f"{'='*50}\n")
# 按来源分组
from collections import defaultdict
by_source = defaultdict(list)
for a in all_articles:
by_source[a["source"]].append(a)
for source, articles in by_source.items():
print(f"📰 {source} | {len(articles)} 篇新文章")
for i, a in enumerate(articles, 1):
print(f" {i}. {a['title']}")
if a["description"]:
desc = a["description"][:80]
print(f" {desc}...")
if a["pub_date"]:
print(f" 📅 {a['pub_date']}")
print()
print("-" * 50)
print("💡 使用「生成知乎内容」或「改写成小红书」来制作发布内容")
def main():
parser = argparse.ArgumentParser(description="RSS 内容抓取")
parser.add_argument("--all", action="store_true", help="抓取所有订阅源")
parser.add_argument("--source", metavar="名称", help="指定订阅源名称")
parser.add_argument("--url", metavar="URL", help="直接指定 RSS URL(不依赖配置)")
parser.add_argument("--limit", type=int, default=3, help="每个源抓取文章数量(默认3)")
parser.add_argument("--days", type=int, default=7, help="文章有效期天数(默认7)")
parser.add_argument("--json", action="store_true", help="输出 JSON 格式")
parser.add_argument("--init", action="store_true", help="初始化默认订阅源")
args = parser.parse_args()
if args.init:
from manage_feeds import init_default
init_default()
return
# 加载订阅
feeds = load_feeds()
if args.url:
# 直接指定 URL
print(f"📡 抓取 {args.url}...", file=sys.stderr)
articles = fetch_rss(args.url, args.source or "自定义", args.limit, args.days)
elif args.source:
# 指定源名
if args.source not in feeds:
print(f"❌ 未找到订阅源「{args.source}」", file=sys.stderr)
print(f" 现有订阅:{', '.join(feeds.keys())}", file=sys.stderr)
sys.exit(1)
articles = fetch_rss(feeds[args.source], args.source, args.limit, args.days)
elif args.all:
articles = fetch_all_feeds(args.limit, args.days)
else:
print("❌ 请指定 --all(所有源)或 --source <名称> 或 --url <URL>", file=sys.stderr)
sys.exit(1)
if args.json:
print(json.dumps(articles, ensure_ascii=False, indent=2))
else:
print_summary(articles)
if __name__ == "__main__":
main()
FILE:scripts/manage_feeds.py
#!/usr/bin/env python3
"""
RSS 订阅管理:添加/删除/列出订阅源
用法:
python3 manage_feeds.py --list
python3 manage_feeds.py --add 36氪 https://36kr.com/feed
python3 manage_feeds.py --remove 36氪
"""
import json
import os
import sys
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(SCRIPT_DIR, "config.json")
DEFAULT_FEEDS = {
"36氪": "https://36kr.com/feed",
"虎嗅": "https://www.huxiu.com/rss/0.xml",
"少数派": "https://sspai.com/feed",
"AI研习社": "https://ai.googleblog.com/feed/",
}
def load_feeds():
"""加载订阅配置"""
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def save_feeds(feeds):
"""保存订阅配置"""
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(feeds, f, ensure_ascii=False, indent=2, sort_keys=True)
print(f"✅ 已保存 {len(feeds)} 个订阅源")
def list_feeds():
"""列出所有订阅"""
feeds = load_feeds()
if not feeds:
print("📭 暂无订阅源(使用 --add 添加)")
print("\n默认推荐订阅:")
for name, url in DEFAULT_FEEDS.items():
print(f" {name}: {url}")
return
print(f"📡 当前订阅(共 {len(feeds)} 个):\n")
for name, url in feeds.items():
print(f" • {name}")
print(f" {url}")
print()
def add_feed(name, url):
"""添加订阅源"""
if not name or not url:
print("❌ 请提供名称和 URL:--add <名称> <URL>")
sys.exit(1)
feeds = load_feeds()
if name in feeds:
print(f"⚠️ 已存在同名订阅源「{name}」,将覆盖原 URL")
feeds[name] = url
save_feeds(feeds)
print(f"✅ 已添加/更新:{name}")
print(f" URL: {url}")
def remove_feed(name):
"""删除订阅源"""
feeds = load_feeds()
if name not in feeds:
print(f"❌ 未找到订阅源「{name}」")
available = list(feeds.keys())
if available:
print(f" 现有订阅:{', '.join(available)}")
sys.exit(1)
removed_url = feeds.pop(name)
save_feeds(feeds)
print(f"🗑️ 已删除:{name}")
print(f" 原URL: {removed_url}")
def init_default():
"""初始化默认订阅"""
feeds = load_feeds()
if not feeds:
feeds = DEFAULT_FEEDS.copy()
save_feeds(feeds)
print("✅ 已初始化默认订阅源(36氪、虎嗅、少数派、AI研习社)")
else:
print(f"📡 已有 {len(feeds)} 个订阅源,跳过默认初始化")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="RSS 订阅管理")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--list", action="store_true", help="列出所有订阅")
group.add_argument("--add", nargs=2, metavar=("名称", "URL"), help="添加订阅源")
group.add_argument("--remove", metavar="名称", help="删除订阅源")
group.add_argument("--init", action="store_true", help="初始化默认订阅源")
args = parser.parse_args()
if args.list:
list_feeds()
elif args.add:
add_feed(args.add[0], args.add[1])
elif args.remove:
remove_feed(args.remove)
elif args.init:
init_default()
飞书日报/周报自动生成技能。从飞书文档、聊天记录、Bitable等数据源自动汇总生成结构化日报/周报,并发送到指定飞书文档或群聊。 触发场景:用户说"生成日报"、"写周报"、"帮我汇总今天的工作"、"整理飞书内容发日报"、"每天自动发日报"、"定时生成报告"。 Keywords: 日报, 周报, 日报生成, 自动...
---
name: feishu-daily-report
description: |
飞书日报/周报自动生成技能。从飞书文档、聊天记录、Bitable等数据源自动汇总生成结构化日报/周报,并发送到指定飞书文档或群聊。
触发场景:用户说"生成日报"、"写周报"、"帮我汇总今天的工作"、"整理飞书内容发日报"、"每天自动发日报"、"定时生成报告"。
Keywords: 日报, 周报, 日报生成, 自动日报, 飞书日报, 工作总结, daily report, 自动汇总, 定时报告。
---
# 飞书日报自动生成
从飞书数据源自动汇总生成结构化日报/周报。
## 快速开始
用户说"生成日报"时,执行以下流程:
1. 确定报告类型(日报/周报/自定义)
2. 收集数据源(飞书文档/Bitable/聊天记录)
3. 生成报告内容
4. 写入飞书文档或发送到群聊
## 报告类型
| 类型 | 触发词 | 时间范围 | 结构 |
|------|--------|----------|------|
| 日报 | "日报"、"今天总结" | 当天 | 完成/进行中/明日计划/风险 |
| 周报 | "周报"、"本周总结" | 本周一至今天 | 本周完成/下周计划/数据指标/风险 |
| 自定义 | "XX报告"、"汇总XX" | 用户指定 | 按需定制 |
## 数据收集
### 飞书文档
```
1. 使用 feishu_doc action=read 读取指定文档
2. 提取关键信息和数据
3. 按类别归入报告结构
```
### Bitable多维表格
```
1. 使用 feishu_bitable_list_records 读取记录
2. 筛选时间范围内的数据
3. 汇总统计指标
```
### 聊天记录
```
1. 使用 feishu_chat 获取最近消息
2. 提取关键事件和决策
3. 整理为报告条目
```
## 报告模板
### 日报模板
```markdown
# 📋 日报 | {date}
## ✅ 今日完成
- [条目1]
- [条目2]
## 🔄 进行中
- [条目]
## 📅 明日计划
- [条目]
## ⚠️ 风险/阻塞
- [条目](如有)
```
### 周报模板
```markdown
# 📊 周报 | {date_range}
## 本周完成
### [项目/模块1]
- [条目]
## 数据指标
| 指标 | 数值 | 环比 |
|------|------|------|
| [指标] | [值] | [变化] |
## 下周计划
- [条目]
## 风险与建议
- [条目]
```
## 输出方式
### 写入飞书文档(默认)
```
feishu_doc action=create title="日报 | {date}" → 获取doc_token
feishu_doc action=write doc_token={token} content={报告内容}
(可选)feishu_wiki action=create 将文档挂到知识库
```
### 发送到群聊
```
message action=send target=chat:{chat_id} message={报告内容}
```
### 同时写入+发送
先创建文档,再将文档链接发到群聊。
## 定时任务
用户要求"每天自动生成日报"时,使用 cron 定时任务:
1. 读取 qclaw-openclaw SKILL.md 了解 cron 创建方式
2. 创建 cron 任务,设置触发时间(默认每天18:00)
3. payload.message 格式:「执行飞书日报生成技能,收集今日数据并生成日报,写入飞书文档 {doc_token}」
4. sessionTarget: "isolated", payload.kind: "agentTurn"
## 配置项
首次使用时,向用户确认:
| 配置 | 说明 | 示例 |
|------|------|------|
| 数据源 | 从哪里获取数据 | 飞书文档ID/Bitable URL/聊天ID |
| 报告类型 | 日报/周报/自定义 | 日报 |
| 输出目标 | 发到哪里 | 飞书文档/群聊 |
| 发送时间 | 定时任务触发时间 | 18:00 |
| 报告模板 | 使用哪个模板 | 默认/自定义 |
配置确认后记录到 memory/YYYY-MM-DD.md,后续自动使用。
## 注意事项
- 读取数据时注意权限,无权限的文档需用户授权
- 报告内容基于实际数据生成,不编造信息
- 无法获取的数据源,明确告知用户并跳过
- 定时任务创建需遵循 qclaw-openclaw skill 的强制规则
FILE:README.md
# feishu-daily-report
飞书日报/周报自动生成 Skill。从飞书文档、Bitable、聊天记录等数据源自动汇总,生成结构化日报/周报,支持定时发送。
## 功能
- 📋 **日报生成** — 自动汇总今日完成事项、进行中任务、明日计划
- 📊 **周报生成** — 按周汇总,支持数据指标表格
- 🔄 **多数据源** — 飞书文档、Bitable、聊天记录
- ⏰ **定时任务** — 每天自动生成并发送
- 📤 **多输出方式** — 飞书文档/群聊/同时
## 使用方法
### 触发词
"生成日报"、"写周报"、"帮我汇总今天的工作"、"每天自动发日报"、"定时生成报告"
### 配置(首次使用)
向 AI 确认以下配置:
- 数据源:飞书文档 ID 或 Bitable URL
- 报告类型:日报 / 周报 / 自定义
- 输出目标:飞书文档 / 群聊
- 发送时间:定时任务的触发时间(默认每天 18:00)
### 示例输出
```markdown
# 📋 日报 | 2026年04月10日
## ✅ 今日完成
- 完成 OpenClaw 部署
- 发布知乎文章 2 篇
## 🔄 进行中
- 知识库内容更新
## 📅 明日计划
- 继续内容发布
- 跟进客户咨询
```
## 系统要求
- OpenClaw 已安装
- 飞书企业账号(具有文档读取权限)
- Python 3.8+
## 技术实现
- 数据收集:`scripts/collect_data.py`
- 内容生成:`scripts/generate_report.py`
- 定时任务:通过 OpenClaw cron 创建
- API 调用:使用 OpenClaw 内置 feishu 工具
## License
MIT
FILE:scripts/collect_data.py
#!/usr/bin/env python3
"""
飞书数据收集器 - 从飞书文档/Bitable/聊天收集报告数据
此脚本生成一个JSON数据文件,供 generate_report.py 使用
注意:实际的飞书API调用由OpenClaw的feishu工具完成,
此脚本负责解析和结构化收集到的数据。
"""
import json
import argparse
from datetime import datetime
def _classify_line(line):
"""对单行文本进行分类,返回类别名或 None。
优先级(高到低):风险 > 完成 > 进行中 > 计划
一行只归入一个类别,避免重复匹配。
"""
if not line.strip():
return None
# 用行首 emoji 前缀做最高优先级判断
prefix_map = {
"✅": "completed",
"✔️": "completed",
"🔄": "in_progress",
"⏳": "in_progress",
"📅": "planned",
"📌": "planned",
"⚠️": "risks",
"🚨": "risks",
"❌": "risks",
}
for emoji, category in prefix_map.items():
if line.lstrip().startswith(emoji):
return category
# 无 emoji 前缀时,按优先级关键词匹配(互斥)
# 优先级:风险 > 完成 > 进行中 > 计划
risk_markers = ["风险", "阻塞", "延期", "问题", "bug", "失败", "异常"]
complete_markers = ["完成", "已完成", "done", "上线", "发布", "搞定", "解决"]
progress_markers = ["进行中", "开发中", "wip", "待完成", "待审核"]
plan_markers = ["计划", "预计", "明天", "下周", "待办", "TODO"]
line_lower = line.lower()
for marker in risk_markers:
if marker in line_lower:
return "risks"
for marker in complete_markers:
if marker in line_lower:
return "completed"
for marker in progress_markers:
if marker in line_lower:
return "in_progress"
for marker in plan_markers:
if marker in line_lower:
return "planned"
return None
def parse_doc_content(raw_content, keywords=None):
"""从飞书文档内容中提取关键信息"""
items = {
"completed": [],
"in_progress": [],
"planned": [],
"risks": [],
"metrics": []
}
if not raw_content:
return items
lines = raw_content.split("\n") if isinstance(raw_content, str) else []
for line in lines:
line = line.strip()
if not line:
continue
category = _classify_line(line)
if category and category in items:
items[category].append(line)
return items
def merge_data_sources(sources):
"""合并多个数据源的提取结果"""
merged = {
"completed": [],
"in_progress": [],
"planned": [],
"risks": [],
"metrics": []
}
for source in sources:
for key in merged:
if key in source:
merged[key].extend(source[key])
# 去重
for key in merged:
merged[key] = list(dict.fromkeys(merged[key]))
return merged
def main():
parser = argparse.ArgumentParser(description="飞书数据收集器")
parser.add_argument("--input", help="输入JSON文件(飞书工具的原始输出)")
parser.add_argument("--output", help="输出JSON文件路径")
parser.add_argument("--source-type", choices=["doc", "bitable", "chat"], default="doc", help="数据源类型")
args = parser.parse_args()
if args.input:
with open(args.input, "r", encoding="utf-8") as f:
raw_data = json.load(f)
else:
raw_data = {}
# 解析数据
if args.source_type == "doc":
items = parse_doc_content(raw_data.get("content", ""))
else:
items = parse_doc_content(str(raw_data))
# 输出
result = {
"generated_at": datetime.now().isoformat(),
"source_type": args.source_type,
"data": items
}
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"数据已写入: {args.output}")
else:
print(json.dumps(result, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()
FILE:scripts/generate_report.py
#!/usr/bin/env python3
"""
飞书日报生成器 - 从多个数据源汇总生成结构化日报
用法: python3 generate_report.py --type daily --date 2026-04-10 --output doc
"""
import argparse
import json
import os
from datetime import datetime, timedelta
def get_date_range(report_type, date_str=None):
"""根据报告类型计算日期范围"""
if date_str:
target_date = datetime.strptime(date_str, "%Y-%m-%d")
else:
target_date = datetime.now()
if report_type == "daily":
return {
"start": target_date.strftime("%Y-%m-%d"),
"end": target_date.strftime("%Y-%m-%d"),
"display": target_date.strftime("%Y年%m月%d日")
}
elif report_type == "weekly":
# 本周一到今天
monday = target_date - timedelta(days=target_date.weekday())
return {
"start": monday.strftime("%Y-%m-%d"),
"end": target_date.strftime("%Y-%m-%d"),
"display": f"{monday.strftime('%m月%d日')} - {target_date.strftime('%m月%d日')}"
}
else:
return {
"start": target_date.strftime("%Y-%m-%d"),
"end": target_date.strftime("%Y-%m-%d"),
"display": target_date.strftime("%Y年%m月%d日")
}
def generate_daily_report(data, date_info):
"""生成日报内容"""
completed = data.get("completed", [])
in_progress = data.get("in_progress", [])
planned = data.get("planned", [])
risks = data.get("risks", [])
lines = [
f"# 📋 日报 | {date_info['display']}",
"",
"## ✅ 今日完成",
]
for item in completed:
lines.append(f"- {item}")
if not completed:
lines.append("- (暂无记录)")
lines.extend([
"",
"## 🔄 进行中",
])
for item in in_progress:
lines.append(f"- {item}")
if not in_progress:
lines.append("- (暂无)")
lines.extend([
"",
"## 📅 明日计划",
])
for item in planned:
lines.append(f"- {item}")
if not planned:
lines.append("- (待规划)")
if risks:
lines.extend([
"",
"## ⚠️ 风险/阻塞",
])
for item in risks:
lines.append(f"- {item}")
return "\n".join(lines)
def generate_weekly_report(data, date_info):
"""生成周报内容"""
completed = data.get("completed", [])
metrics = data.get("metrics", [])
planned = data.get("planned", [])
risks = data.get("risks", [])
lines = [
f"# 📊 周报 | {date_info['display']}",
"",
"## 本周完成",
]
for item in completed:
lines.append(f"- {item}")
if not completed:
lines.append("- (暂无记录)")
if metrics:
lines.extend([
"",
"## 数据指标",
"",
"| 指标 | 数值 | 环比 |",
"|------|------|------|",
])
for m in metrics:
lines.append(f"| {m.get('name', '')} | {m.get('value', '')} | {m.get('change', '')} |")
lines.extend([
"",
"## 下周计划",
])
for item in planned:
lines.append(f"- {item}")
if not planned:
lines.append("- (待规划)")
if risks:
lines.extend([
"",
"## 风险与建议",
])
for item in risks:
lines.append(f"- {item}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="飞书日报生成器")
parser.add_argument("--type", choices=["daily", "weekly", "custom"], default="daily", help="报告类型")
parser.add_argument("--date", help="目标日期 YYYY-MM-DD")
parser.add_argument("--data", help="数据JSON文件路径")
parser.add_argument("--output", choices=["doc", "chat", "both"], default="doc", help="输出方式")
args = parser.parse_args()
# 日期范围
date_info = get_date_range(args.type, args.date)
# 加载数据
if args.data and os.path.exists(args.data):
with open(args.data, "r", encoding="utf-8") as f:
data = json.load(f)
else:
# 空模板数据
data = {
"completed": [],
"in_progress": [],
"planned": [],
"risks": [],
"metrics": []
}
# 生成报告
if args.type == "daily":
report = generate_daily_report(data, date_info)
elif args.type == "weekly":
report = generate_weekly_report(data, date_info)
else:
report = generate_daily_report(data, date_info)
# 输出
print(report)
print(f"\n---\n报告类型: {args.type} | 日期: {date_info['display']} | 输出: {args.output}")
if __name__ == "__main__":
main()