@clawhub-sougannkyou-b10c5be8c3
Polls Elasticsearch for new data and pushes it to Feishu automatically. Supports custom ES queries, incremental polling, and a deduplicating cursor.
---
name: "es-poll-feishu"
description: "Polls Elasticsearch for new data and pushes it to Feishu automatically. Supports custom ES queries, incremental polling, and a deduplicating cursor."
---
# ES-Poll-Feishu
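Poll Elasticsearch → detect new data incrementally → push to Feishu.
## Features
- 🔍 Polls an ES index on a schedule to detect new data
- 📌 search_after cursor: pages precisely by sort values, with no gaps and no duplicates
- 📱 Pushes messages to Feishu automatically
- 🔧 Supports custom ES queries (term / bool / match or any other query)
- 📊 Persists statistics (poll count, hit count, push count)
- 🛑 Graceful shutdown with cursor persistence
- 🔄 Automatic pagination: a single poll can walk multiple pages to fetch all new data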
## Installation
```bash
cd skills/es-poll-feishu
npm install
chmod +x scripts/es-poll-feishu
```
## Configuration (required)
Before first use, create the config file `~/.openclaw/es-poll-feishu.json`:
```json
{
  "es_url": "http://your-es-host:9200",
  "es_index": "your_index_pattern*",
  "es_auth": "your_base64_auth_string",
  "es_params": {
    "app_customer_name": "your_customer",
    "app_user_name": "your_user"
  },
  "es_query": {
    "bool": {
      "must": [
        { "term": { "your_field": "your_value" } },
        { "range": { "analysis.sentiment": { "lt": 0 } } }
      ]
    }
  },
  "es_time_field": "ctime",
  "es_sort_field": "ctime",
  "es_size": 50,
  "poll_interval": 60,
  "feishu_app_id": "your_feishu_app_id",
  "feishu_app_secret": "your_feishu_app_secret",
  "feishu_user_id": "your_feishu_open_id",
  "title_field": "title",
  "content_field": "content",
  "url_field": "url"
}
```
Alternatively, run `es-poll-feishu config` to create a template.
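### Configuration options
| Option | Required | Default | Description |
|--------|----------|---------|-------------|
| `es_url` | ✅ | none | ES service URL |
| `es_index` | ✅ | none | ES index name (supports wildcards and date templates; see below) |
| `es_auth` | ✅ | none | Base64-encoded Basic Auth string |
| `es_params` | ❌ | `{}` | Extra URL query parameters |
| `es_query` | ❌ | `null` | Custom ES query (the `query` object only); if `null`, every document matches and only the incremental cursor applies |
| `es_time_field` | ❌ | `ctime` | Time field used for the incremental cursor |
| `es_sort_field` | ❌ | `ctime` | Sort field |
| `es_size` | ❌ | `50` | Documents fetched per page in each poll |
| `es_tiebreaker_field` | ❌ | `_doc` | search_after tiebreaker field; breaks ties between documents with the same sort value so that records in the same second are not skipped |
| `es_max_pages` | ❌ | `20` | Maximum pages per poll (guards against endless loops); `0` = unlimited |
| `poll_interval` | ❌ | `60` | Polling interval (seconds) |
| `flood_threshold` | ❌ | `5000` | If a single poll hits more than this many documents, flood protection sends one alert instead of pushing each document |
| `flood_cooldown` | ❌ | `300` | Cooldown between flood alerts (seconds), to avoid repeated alerts |
| `push_rate_limit` | ❌ | `3` | Push rate limit (messages per second) |
| `feishu_app_id` | ✅ | none | Feishu app App ID |
| `feishu_app_secret` | ✅ | none | Feishu app App Secret |
| `feishu_user_id` | ✅ | none | open_id of the Feishu user who receives the messages |
| `title_field` | ❌ | `title` | Path of the title field in ES documents (nested paths such as `retweeted.title` are supported) |
| `content_field` | ❌ | `content` | Path of the content field in ES documents |
| `url_field` | ❌ | `url` | Path of the link field in ES documents |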
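Only the six ✅ options are checked at startup. A minimal sketch of that check, assuming the same rule as `loadConfig()` in `scripts/poller.js` (a key counts as missing when its value is falsy, so an empty string fails); `findMissingConfig` is a hypothetical helper name, not part of the package:

```javascript
// Hypothetical helper: mirrors the required-key check in scripts/poller.js
const REQUIRED_KEYS = [
  "es_url", "es_index", "es_auth",
  "feishu_app_id", "feishu_app_secret", "feishu_user_id",
];

// Returns the required keys whose values are missing or empty (falsy)
function findMissingConfig(cfg) {
  return REQUIRED_KEYS.filter((key) => !cfg[key]);
}

// Example: a config that still lacks the Feishu credentials
const partial = { es_url: "http://localhost:9200", es_index: "xgks_*", es_auth: "dXNlcjpwYXNz" };
console.log(findMissingConfig(partial));
```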
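### Index name time template
`es_index` supports date placeholders such as `{yyyyMM}`, which are replaced with the current year and month at run time, so the config does not need to be edited every month.
| Placeholder | Meaning | Example value |
|-------------|---------|---------------|
| `{yyyyMM}` | Current year and month (6 digits) | `202603` |
| `{yyyy}` | Current year (4 digits) | `2026` |
| `{MM}` | Current month (2 digits, zero-padded) | `03` |
For example, with `"es_index": "xgks_{yyyyMM}*"`, a run in March 2026 resolves to `xgks_202603*`, and in April to `xgks_202604*`. The year and month are taken from the host's local time.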
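The substitution can be sketched as a pure function. This mirrors `resolveIndex()` in `scripts/cli.js`, but the date is passed in as a parameter so it can be tested; the name `resolveIndexAt` is hypothetical:

```javascript
// Hypothetical, testable variant of resolveIndex(): the date is passed in explicitly
function resolveIndexAt(pattern, date = new Date()) {
  const yyyy = String(date.getFullYear());                  // local-time year
  const MM = String(date.getMonth() + 1).padStart(2, "0"); // getMonth() is 0-based
  return pattern
    .replace(/\{yyyyMM\}/g, `${yyyy}${MM}`)
    .replace(/\{yyyy\}/g, yyyy)
    .replace(/\{MM\}/g, MM);
}

console.log(resolveIndexAt("xgks_{yyyyMM}*", new Date(2026, 2, 15))); // xgks_202603*
console.log(resolveIndexAt("logs-{yyyy}.{MM}", new Date(2026, 3, 1))); // logs-2026.04
```

Because `new Date()` and `getMonth()` use local time, a server whose time zone differs from the one the indices are named in may switch to the new month's index a few hours early or late.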
## CLI tool
Following the CLI pattern of istarshine-search-skill, a standalone command-line tool is provided. JSON arguments can be passed on the command line or via stdin:
```bash
node scripts/cli.js <tool_name> '<json_args>'
echo '<json>' | node scripts/cli.js <tool_name>
```
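Both invocation styles end up in the same parsing step. A sketch of that step, assuming the behavior of `main()` in `scripts/cli.js`: a leading UTF-8 byte-order mark is stripped, empty stdin falls back to `{}`, and the text must parse as JSON. `parseToolArgs` is a hypothetical name:

```javascript
// Hypothetical helper mirroring how scripts/cli.js turns argv/stdin text into tool arguments
function parseToolArgs(rawText) {
  let text = rawText || "";
  if (text.charCodeAt(0) === 0xfeff) text = text.slice(1); // drop a UTF-8 BOM if present
  text = text.trim() || "{}"; // empty stdin behaves like '{}'
  try {
    return { ok: true, args: JSON.parse(text) };
  } catch (e) {
    return { ok: false, error: `Failed to parse JSON arguments: ${e.message}` };
  }
}

console.log(parseToolArgs('{"dry_run":true}')); // { ok: true, args: { dry_run: true } }
console.log(parseToolArgs("\uFEFF{}"));        // { ok: true, args: {} }
console.log(parseToolArgs("not json").ok);     // false
```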
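### Available tools
| Tool | Purpose | Parameters |
|------|---------|------------|
| `search` | Query ES directly and return the raw results | `query`, `index`, `size`, `sort`, `_source` |
| `poll_once` | Run one incremental poll and push to Feishu | `dry_run` (when true, query only; nothing is pushed) |
| `test_es` | Test the ES connection | `index` |
| `test_feishu` | Send a test message to Feishu | `text` |
| `status` | Show the poller service status and statistics | none |
| `cursor` | Show the current cursor | none |
| `reset_cursor` | Reset the cursor | none |
| `set_cursor` | Set the cursor manually | `lastTimestamp` and/or `searchAfter` |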
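`set_cursor` accepts either a full `searchAfter` pair or just `lastTimestamp`. A sketch of the resulting `cursor.json` shape, assuming the logic of `set_cursor` in `scripts/cli.js` (`buildCursor` is a hypothetical name). With only a timestamp, the tiebreaker slot is `null`; `poll_once` treats that like a v1.0 cursor and filters with a `range` on `es_time_field` instead of sending `search_after`:

```javascript
// Hypothetical helper reproducing the cursor object written by set_cursor
function buildCursor({ lastTimestamp, searchAfter } = {}) {
  if (!searchAfter && (lastTimestamp === undefined || lastTimestamp === null)) {
    throw new Error("Provide lastTimestamp or searchAfter");
  }
  return {
    // [sortValue, tiebreakerValue]; a null tiebreaker marks a timestamp-only cursor
    searchAfter: searchAfter || [lastTimestamp, null],
    lastTimestamp: lastTimestamp ?? searchAfter[0],
  };
}

console.log(buildCursor({ lastTimestamp: 1711234567 }));
// { searchAfter: [ 1711234567, null ], lastTimestamp: 1711234567 }
console.log(buildCursor({ searchAfter: [1711234567, 42] }));
// { searchAfter: [ 1711234567, 42 ], lastTimestamp: 1711234567 }
```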
### CLI examples
```bash
# Test the ES connection
node scripts/cli.js test_es '{}'
# Custom ES query
node scripts/cli.js search '{"query":{"match_all":{}},"size":5}'
# Run one incremental poll (dry_run: query only, nothing is pushed)
node scripts/cli.js poll_once '{"dry_run":true}'
# Run one incremental poll and push to Feishu
node scripts/cli.js poll_once '{}'
# Test Feishu push
node scripts/cli.js test_feishu '{"text":"Hello from ES-Poll-Feishu"}'
# Show service status
node scripts/cli.js status '{}'
# Show / reset / set the cursor
node scripts/cli.js cursor '{}'
node scripts/cli.js reset_cursor '{}'
node scripts/cli.js set_cursor '{"lastTimestamp":1711234567}'
```
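`lastTimestamp` is compared against `es_time_field`, so it must use the same unit as that field. The push code formats times with `new Date(value * 1000)`, which implies epoch seconds. Assuming your `ctime` is in seconds, a value such as `1711234567` can be derived from a date like this (`toEpochSeconds` is hypothetical):

```javascript
// Hypothetical helper: converts an ISO-8601 date string to epoch seconds
function toEpochSeconds(isoString) {
  const ms = Date.parse(isoString);
  if (Number.isNaN(ms)) throw new Error(`Unparseable date: ${isoString}`);
  return Math.floor(ms / 1000);
}

const ts = toEpochSeconds("2024-03-23T22:56:07Z");
console.log(ts); // 1711234567
console.log(`node scripts/cli.js set_cursor '${JSON.stringify({ lastTimestamp: ts })}'`);
```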
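## Service management
```bash
# Start the service
es-poll-feishu start
# Show status
es-poll-feishu status
# Show logs
es-poll-feishu logs
# Stop the service
es-poll-feishu stop
# Restart
es-poll-feishu restart
# Show / create the config
es-poll-feishu config
# Reset the cursor (the next poll re-initializes it at the newest document)
es-poll-feishu reset-cursor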
```
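Whether the service is running comes down to a liveness probe on the PID stored in `poller.pid`. A sketch, assuming the two PID-file formats that `isAlreadyRunning()` in `scripts/poller.js` accepts (a bare number from older versions, or JSON `{ pid, startedAt }`); `readPid` and `isPidAlive` are hypothetical names. `process.kill(pid, 0)` delivers no signal; it only checks that the process exists:

```javascript
// Hypothetical helpers for the poller.pid formats handled by scripts/poller.js
function readPid(raw) {
  const text = String(raw).trim();
  // New format: {"pid":1234,"startedAt":...}; old format: "1234"
  const pid = text.startsWith("{") ? JSON.parse(text).pid : parseInt(text, 10);
  return Number.isInteger(pid) && pid > 0 ? pid : null;
}

function isPidAlive(pid) {
  if (!pid) return false;
  try {
    process.kill(pid, 0); // signal 0: existence check only
    return true;
  } catch (err) {
    // EPERM means the process exists but belongs to another user
    return err.code === "EPERM";
  }
}

console.log(readPid('{"pid":4321,"startedAt":1711234567000}')); // 4321
console.log(readPid("4321\n"));                                  // 4321
console.log(isPidAlive(process.pid));                            // true
```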
## How it works
```
┌─────────────┐  search_after   ┌──────────────┐     push      ┌──────────┐
│  ES index   │ ◄────────────── │  poller.js   │ ────────────► │  Feishu  │
│  xgks_*     │ paged queries   │ cursor mgmt  │  one by one   │  bot     │
└─────────────┘                 └──────────────┘               └──────────┘
```
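1. Every `poll_interval` seconds, query ES, paging precisely with `search_after` plus a tiebreaker
2. **On the first run (no cursor yet), fetch only the newest document to initialize the cursor; historical data is not pushed**, so monitoring starts from "now"
3. Later polls use the stored `search_after` values to resume where the previous poll stopped, paging automatically until all new data is fetched
4. Each new document is pushed to Feishu; the cursor advances only after a successful push
5. The cursor is persisted to `cursor.json` (including the `searchAfter` sort values), so a restart resumes exactly where it left off
6. The v1.0 cursor format is still accepted and migrated automatically after an upgrade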
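Steps 1 to 3 differ only in the request body. A sketch of the two shapes, assuming the `sortDef` logic of `poll_once` in `scripts/cli.js` (`buildPollBody` is a hypothetical name): the first run sorts descending and asks for a single document; later polls sort ascending and resume with `search_after` set to the last hit's `sort` array.

```javascript
// Hypothetical builder for the ES _search bodies used by poll_once
function buildPollBody({ query, sortField, tiebreakerField, size, searchAfter }) {
  const isFirstRun = !searchAfter;
  const order = isFirstRun ? "desc" : "asc"; // newest first only when initializing
  const body = {
    size: isFirstRun ? 1 : size,
    query: query || { match_all: {} },
    sort: [{ [sortField]: { order } }, { [tiebreakerField]: { order } }],
  };
  // A null tiebreaker marks a timestamp-only (v1.0) cursor, which is not sent as search_after
  if (!isFirstRun && searchAfter[1] !== null) body.search_after = searchAfter;
  return body;
}

const opts = { sortField: "ctime", tiebreakerField: "_doc", size: 50 };
console.log(JSON.stringify(buildPollBody(opts)));
console.log(JSON.stringify(buildPollBody({ ...opts, searchAfter: [1711234567, 8812] })));
```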
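> ⚠️ **Note**: On first start, only data created after startup is pushed. If the service ran before and then stopped because of a failure, a restart automatically pushes the backlog (the cursor records the last processed position). `es-poll-feishu reset-cursor` does not replay history: with no cursor, the next poll re-initializes at the newest document, as on a first start. To push historical data, set an earlier position instead, for example `node scripts/cli.js set_cursor '{"lastTimestamp":<value of es_time_field>}'`.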
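The loop behind steps 3 to 5 can be sketched against an in-memory stand-in for ES. This is an illustration, not the poller's code: `searchPage` and `push` are hypothetical callbacks (an ES `_search` call and a Feishu send in practice). The cursor only moves past a hit after its push succeeds, so a failed push is retried on the next poll.

```javascript
// Hypothetical search_after drain loop; searchPage(after, size) returns hits sorted ascending
async function drainNewHits({ searchPage, push, cursor, size, maxPages = 20 }) {
  let after = cursor;
  let pushed = 0;
  for (let page = 1; maxPages === 0 || page <= maxPages; page++) {
    const hits = await searchPage(after, size);
    for (const hit of hits) {
      if (!(await push(hit))) return { cursor: after, pushed, stopped: true }; // keep cursor before the failed hit
      after = hit.sort; // advance only after a successful push
      pushed++;
    }
    if (hits.length < size) break; // a short page means all new data has been read
  }
  return { cursor: after, pushed, stopped: false };
}

// In-memory stand-in for ES: sort = [timestamp, tiebreaker], compared lexicographically
const docs = [[100, 1], [100, 2], [101, 1], [102, 5], [102, 6]].map((sort) => ({ sort }));
const isAfter = (a, b) => a[0] > b[0] || (a[0] === b[0] && a[1] > b[1]);
const searchPage = async (after, size) =>
  docs.filter((d) => !after || isAfter(d.sort, after)).slice(0, size);

drainNewHits({ searchPage, push: async () => true, cursor: [100, 1], size: 2 })
  .then((r) => console.log(r)); // { cursor: [ 102, 6 ], pushed: 4, stopped: false }
```

The `[100, 2]` document shares its timestamp with the cursor `[100, 1]` but is still picked up, which is exactly what the tiebreaker is for.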
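## Custom query examples
`es_query` in the config accepts any ES query DSL (it is the `query` object only, not the full request body):
```jsonc
// term: exact match
"es_query": {
  "term": { "your_field": "your_value" }
}
// bool: combined query
"es_query": {
  "bool": {
    "must": [
      { "term": { "gather.site_domain": "iesdouyin.com" } },
      { "match": { "content": "新能源" } }  // "new energy"
    ]
  }
}
// match: full-text search
"es_query": {
  "match": { "title": "人工智能" }  // "artificial intelligence"
}
```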
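When the cursor has no tiebreaker value (a v1.0 cursor, or one written by `set_cursor` with only `lastTimestamp`), `poll_once` wraps your `es_query` in a `bool` query with a `range` filter on `es_time_field`. A sketch of that wrapping (`withTimeFloor` is a hypothetical name):

```javascript
// Hypothetical helper: combine the configured query with a lower time bound
function withTimeFloor(query, timeField, gte) {
  const base = query || { match_all: {} };
  return { bool: { must: [base, { range: { [timeField]: { gte } } }] } };
}

const q = withTimeFloor({ match: { title: "人工智能" } }, "ctime", 1711234567);
console.log(JSON.stringify(q));
// {"bool":{"must":[{"match":{"title":"人工智能"}},{"range":{"ctime":{"gte":1711234567}}}]}}
```

Because the bound is inclusive (`gte`), documents carrying exactly `lastTimestamp` can be fetched again on that first poll.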
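## Data directory
```
~/clawd/data/es-poll-feishu/
├── poller.pid      # PID file
├── poller.log      # runtime log (rotated at 10 MB)
├── poller.log.1    # previous log after rotation
├── stats.json      # statistics
└── cursor.json     # polling cursor (searchAfter: [timestamp, tiebreaker])
```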
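`stats.json` is written atomically by the service (the comment in `saveStats()` says `saveCursor` does the same): the JSON goes to a temporary file that is then renamed over the target, so a crash mid-write cannot leave a truncated file. A minimal sketch of that pattern (`writeJsonAtomic` is hypothetical); the rename is atomic on POSIX file systems when both paths are on the same file system:

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical helper: write JSON to a temp file, then rename it over the target
function writeJsonAtomic(filePath, data) {
  const tmpFile = `${filePath}.tmp`; // same directory, so the rename stays on one file system
  fs.writeFileSync(tmpFile, JSON.stringify(data, null, 2));
  fs.renameSync(tmpFile, filePath); // replaces the target in one step
}

const dir = fs.mkdtempSync(path.join(os.tmpdir(), "es-poll-feishu-"));
const target = path.join(dir, "cursor.json");
writeJsonAtomic(target, { searchAfter: [1711234567, 42], lastTimestamp: 1711234567 });
console.log(JSON.parse(fs.readFileSync(target, "utf-8")).searchAfter); // [ 1711234567, 42 ]
```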
## License
MIT
FILE:metadata.json
{
"name": "es-poll-feishu",
"description": "Polls Elasticsearch for new data and pushes it to Feishu automatically. Supports search_after pagination, custom queries, deduplication, and flood protection.",
"author": "sougannkyou",
"license": "MIT",
"requires": ["axios"],
"node": ">=16"
}
FILE:package-lock.json
{
"name": "es-poll-feishu",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "es-poll-feishu",
"version": "1.0.0",
"dependencies": {
"axios": "^1.6.0"
}
},
"node_modules/asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmmirror.com/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==",
"license": "MIT"
},
"node_modules/axios": {
"version": "1.13.6",
"resolved": "https://registry.npmmirror.com/axios/-/axios-1.13.6.tgz",
"integrity": "sha512-ChTCHMouEe2kn713WHbQGcuYrr6fXTBiu460OTwWrWob16g1bXn4vtz07Ope7ewMozJAnEquLk5lWQWtBig9DQ==",
"license": "MIT",
"dependencies": {
"follow-redirects": "^1.15.11",
"form-data": "^4.0.5",
"proxy-from-env": "^1.1.0"
}
},
"node_modules/call-bind-apply-helpers": {
"version": "1.0.2",
"resolved": "https://registry.npmmirror.com/call-bind-apply-helpers/-/call-bind-apply-helpers-1.0.2.tgz",
"integrity": "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0",
"function-bind": "^1.1.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/combined-stream": {
"version": "1.0.8",
"resolved": "https://registry.npmmirror.com/combined-stream/-/combined-stream-1.0.8.tgz",
"integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==",
"license": "MIT",
"dependencies": {
"delayed-stream": "~1.0.0"
},
"engines": {
"node": ">= 0.8"
}
},
"node_modules/delayed-stream": {
"version": "1.0.0",
"resolved": "https://registry.npmmirror.com/delayed-stream/-/delayed-stream-1.0.0.tgz",
"integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==",
"license": "MIT",
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/dunder-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/dunder-proto/-/dunder-proto-1.0.1.tgz",
"integrity": "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A==",
"license": "MIT",
"dependencies": {
"call-bind-apply-helpers": "^1.0.1",
"es-errors": "^1.3.0",
"gopd": "^1.2.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-define-property": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/es-define-property/-/es-define-property-1.0.1.tgz",
"integrity": "sha512-e3nRfgfUZ4rNGL232gUgX06QNyyez04KdjFrF+LTRoOXmrOgFKDg4BCdsjW8EnT69eqdYGmRpJwiPVYNrCaW3g==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-errors": {
"version": "1.3.0",
"resolved": "https://registry.npmmirror.com/es-errors/-/es-errors-1.3.0.tgz",
"integrity": "sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-object-atoms": {
"version": "1.1.1",
"resolved": "https://registry.npmmirror.com/es-object-atoms/-/es-object-atoms-1.1.1.tgz",
"integrity": "sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-set-tostringtag": {
"version": "2.1.0",
"resolved": "https://registry.npmmirror.com/es-set-tostringtag/-/es-set-tostringtag-2.1.0.tgz",
"integrity": "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0",
"get-intrinsic": "^1.2.6",
"has-tostringtag": "^1.0.2",
"hasown": "^2.0.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/follow-redirects": {
"version": "1.15.11",
"resolved": "https://registry.npmmirror.com/follow-redirects/-/follow-redirects-1.15.11.tgz",
"integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==",
"funding": [
{
"type": "individual",
"url": "https://github.com/sponsors/RubenVerborgh"
}
],
"license": "MIT",
"engines": {
"node": ">=4.0"
},
"peerDependenciesMeta": {
"debug": {
"optional": true
}
}
},
"node_modules/form-data": {
"version": "4.0.5",
"resolved": "https://registry.npmmirror.com/form-data/-/form-data-4.0.5.tgz",
"integrity": "sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w==",
"license": "MIT",
"dependencies": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
"es-set-tostringtag": "^2.1.0",
"hasown": "^2.0.2",
"mime-types": "^2.1.12"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/function-bind": {
"version": "1.1.2",
"resolved": "https://registry.npmmirror.com/function-bind/-/function-bind-1.1.2.tgz",
"integrity": "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/get-intrinsic": {
"version": "1.3.0",
"resolved": "https://registry.npmmirror.com/get-intrinsic/-/get-intrinsic-1.3.0.tgz",
"integrity": "sha512-9fSjSaos/fRIVIp+xSJlE6lfwhES7LNtKaCBIamHsjr2na1BiABJPo0mOjjz8GJDURarmCPGqaiVg5mfjb98CQ==",
"license": "MIT",
"dependencies": {
"call-bind-apply-helpers": "^1.0.2",
"es-define-property": "^1.0.1",
"es-errors": "^1.3.0",
"es-object-atoms": "^1.1.1",
"function-bind": "^1.1.2",
"get-proto": "^1.0.1",
"gopd": "^1.2.0",
"has-symbols": "^1.1.0",
"hasown": "^2.0.2",
"math-intrinsics": "^1.1.0"
},
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/get-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/get-proto/-/get-proto-1.0.1.tgz",
"integrity": "sha512-sTSfBjoXBp89JvIKIefqw7U2CCebsc74kiY6awiGogKtoSGbgjYE/G/+l9sF3MWFPNc9IcoOC4ODfKHfxFmp0g==",
"license": "MIT",
"dependencies": {
"dunder-proto": "^1.0.1",
"es-object-atoms": "^1.0.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/gopd": {
"version": "1.2.0",
"resolved": "https://registry.npmmirror.com/gopd/-/gopd-1.2.0.tgz",
"integrity": "sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/has-symbols": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/has-symbols/-/has-symbols-1.1.0.tgz",
"integrity": "sha512-1cDNdwJ2Jaohmb3sg4OmKaMBwuC48sYni5HUw2DvsC8LjGTLK9h+eb1X6RyuOHe4hT0ULCW68iomhjUoKUqlPQ==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/has-tostringtag": {
"version": "1.0.2",
"resolved": "https://registry.npmmirror.com/has-tostringtag/-/has-tostringtag-1.0.2.tgz",
"integrity": "sha512-NqADB8VjPFLM2V0VvHUewwwsw0ZWBaIdgo+ieHtK3hasLz4qeCRjYcqfB6AQrBggRKppKF8L52/VqdVsO47Dlw==",
"license": "MIT",
"dependencies": {
"has-symbols": "^1.0.3"
},
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/hasown": {
"version": "2.0.2",
"resolved": "https://registry.npmmirror.com/hasown/-/hasown-2.0.2.tgz",
"integrity": "sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ==",
"license": "MIT",
"dependencies": {
"function-bind": "^1.1.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/math-intrinsics": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/math-intrinsics/-/math-intrinsics-1.1.0.tgz",
"integrity": "sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/mime-db": {
"version": "1.52.0",
"resolved": "https://registry.npmmirror.com/mime-db/-/mime-db-1.52.0.tgz",
"integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
"license": "MIT",
"engines": {
"node": ">= 0.6"
}
},
"node_modules/mime-types": {
"version": "2.1.35",
"resolved": "https://registry.npmmirror.com/mime-types/-/mime-types-2.1.35.tgz",
"integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
"license": "MIT",
"dependencies": {
"mime-db": "1.52.0"
},
"engines": {
"node": ">= 0.6"
}
},
"node_modules/proxy-from-env": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==",
"license": "MIT"
}
}
}
FILE:package.json
{
"name": "es-poll-feishu",
"version": "1.2.5",
"description": "Poll ES for new data → push to Feishu",
"main": "scripts/poller.js",
"scripts": {
"start": "node scripts/poller.js"
},
"dependencies": {
"axios": "^1.6.0"
}
}
FILE:scripts/cli.js
#!/usr/bin/env node
/**
 * ES-Poll-Feishu CLI entry point
 *
 * Usage: node cli.js <tool_name> '<json_args>'
 *        echo '<json>' | node cli.js <tool_name>
 *
 * Examples:
 *   node cli.js search '{"query":{"match_all":{}},"size":5}'
 *   node cli.js poll_once '{}'
 *   node cli.js test_feishu '{"text":"Test message"}'
 *   node cli.js status '{}'
 *   node cli.js cursor '{}'
 *   node cli.js reset_cursor '{}'
 *   node cli.js set_cursor '{"lastTimestamp":1711234567}'
 */
"use strict";
const fs = require("fs");
const path = require("path");
const http = require("http");
const https = require("https");
const os = require("os");
// ---------- Paths & config ----------
const CONFIG_PATH = path.join(os.homedir(), ".openclaw", "es-poll-feishu.json");
const DATA_DIR = path.join(os.homedir(), "clawd", "data", "es-poll-feishu");
const PID_FILE = path.join(DATA_DIR, "poller.pid");
const STATS_FILE = path.join(DATA_DIR, "stats.json");
const CURSOR_FILE = path.join(DATA_DIR, "cursor.json");
const DEFAULT_CONFIG = {
es_url: "",
es_index: "",
es_auth: "",
es_params: {},
es_query: null,
es_time_field: "ctime",
es_sort_field: "ctime",
es_size: 50,
es_tiebreaker_field: "_doc",
es_max_pages: 20,
poll_interval: 60,
flood_threshold: 500,
flood_cooldown: 300,
push_rate_limit: 3,
feishu_app_id: "",
feishu_app_secret: "",
feishu_user_id: "",
title_field: "title",
content_field: "content",
url_field: "url",
};
let config = { ...DEFAULT_CONFIG };
function loadConfig() {
  if (!fs.existsSync(CONFIG_PATH)) {
    return { ok: false, error: `Config file not found: ${CONFIG_PATH}. Run es-poll-feishu config first` };
  }
  try {
    const fileConfig = JSON.parse(fs.readFileSync(CONFIG_PATH, "utf-8"));
    config = { ...DEFAULT_CONFIG, ...fileConfig };
    return { ok: true };
  } catch (e) {
    return { ok: false, error: `Failed to parse config file: ${e.message}` };
  }
}
// ---------- HTTP request (zero dependencies) ----------
function request(method, urlStr, body, headers = {}, timeoutMs = 30000) {
return new Promise((resolve) => {
const url = new URL(urlStr);
const isHttps = url.protocol === "https:";
const mod = isHttps ? https : http;
const reqHeaders = { ...headers };
let payload;
if (body !== undefined) {
payload = Buffer.from(JSON.stringify(body), "utf-8");
reqHeaders["Content-Type"] = "application/json";
reqHeaders["Content-Length"] = payload.length;
}
const opts = {
hostname: url.hostname,
port: url.port || (isHttps ? 443 : 80),
path: url.pathname + url.search,
method,
headers: reqHeaders,
timeout: timeoutMs,
};
const req = mod.request(opts, (res) => {
const chunks = [];
res.on("data", (c) => chunks.push(c));
res.on("end", () => {
const raw = Buffer.concat(chunks).toString("utf-8");
try {
resolve({ status: res.statusCode, data: JSON.parse(raw) });
} catch (_) {
resolve({ status: res.statusCode, data: raw });
}
});
});
req.on("timeout", () => {
req.destroy();
      resolve({ status: 504, data: { error: "Request timed out" } });
});
req.on("error", (err) => {
resolve({ status: 0, data: { error: err.message, code: err.code } });
});
if (payload) req.write(payload);
req.end();
});
}
// ---------- ES request helpers ----------
function buildEsUrl(indexOverride) {
  const idx = resolveIndex(indexOverride);
  const params = new URLSearchParams(config.es_params || {});
  return `${config.es_url.replace(/\/+$/, "")}/${idx}/_search?${params.toString()}`;
}
function esHeaders() {
  return {
    "Content-Type": "application/json",
    Authorization: `Basic ${config.es_auth}`,
  };
}
// ---------- Feishu helpers ----------
let feishuAccessToken = "";
let feishuTokenExpiry = 0;
async function getFeishuToken() {
  // Reuse the cached token until 5 minutes before it expires
  if (feishuAccessToken && Date.now() < feishuTokenExpiry - 300000) {
    return feishuAccessToken;
  }
  const res = await request("POST", "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal", {
    app_id: config.feishu_app_id,
    app_secret: config.feishu_app_secret,
  });
  if (res.data?.app_access_token) {
    feishuAccessToken = res.data.app_access_token;
    feishuTokenExpiry = Date.now() + (res.data.expire || 7200) * 1000;
    return feishuAccessToken;
  }
  throw new Error(`Failed to obtain Feishu token: ${JSON.stringify(res.data)}`);
}
// ---------- Helpers ----------
function getNestedValue(obj, fieldPath) {
return fieldPath.split(".").reduce((o, k) => (o && o[k] !== undefined ? o[k] : null), obj);
}
function readJsonFile(filePath) {
try {
if (fs.existsSync(filePath)) return JSON.parse(fs.readFileSync(filePath, "utf-8"));
} catch (_) {}
return null;
}
function ensureDataDir() {
if (!fs.existsSync(DATA_DIR)) fs.mkdirSync(DATA_DIR, { recursive: true });
}
// ---------- Dynamic index-name resolution ----------
// Replaces {yyyyMM}, {yyyy} and {MM} with the current (local-time) year and month
function resolveIndex(override) {
  const raw = override || config.es_index;
  const now = new Date();
  const yyyy = now.getFullYear().toString();
  const MM = (now.getMonth() + 1).toString().padStart(2, "0");
  return raw
    .replace(/\{yyyyMM\}/g, `${yyyy}${MM}`)
    .replace(/\{yyyy\}/g, yyyy)
    .replace(/\{MM\}/g, MM);
}
// ========== Tool implementations ==========
/**
 * search: query ES directly and return the raw results
 * Params: { query?, index?, size?, sort?, _source? }
 */
async function search({ query, index, size, sort, _source } = {}) {
const body = {};
body.query = query || config.es_query || { match_all: {} };
body.size = size || config.es_size;
if (sort) {
body.sort = sort;
} else {
body.sort = [{ [config.es_sort_field]: { order: "desc" } }];
}
if (_source) body._source = _source;
const url = buildEsUrl(index);
const res = await request("POST", url, body, esHeaders());
if (res.status === 0 || res.status >= 400) {
    return JSON.stringify({ code: res.status || 500, message: "ES query failed", data: res.data });
}
const hits = res.data?.hits || {};
return JSON.stringify({
code: 200,
total: hits.total?.value ?? hits.total ?? 0,
count: (hits.hits || []).length,
items: (hits.hits || []).map((h) => ({
_index: h._index,
_id: h._id,
...h._source,
})),
});
}
/**
 * poll_once: run one poll (search_after pagination + Feishu push) and update the cursor
 * Params: { dry_run? }; with dry_run=true, only query: nothing is pushed and the cursor file is not written
 */
async function poll_once({ dry_run = false } = {}) {
  ensureDataDir();
  const cursorData = readJsonFile(CURSOR_FILE) || { searchAfter: null, lastTimestamp: null };
  // Backward compatibility with the v1.0 cursor format ({ lastTimestamp } only)
  if (!cursorData.searchAfter && cursorData.lastTimestamp != null) {
    cursorData.searchAfter = [cursorData.lastTimestamp, null];
  }
  // Same fallback as the documented default (_doc needs no fielddata)
  const tiebreakerField = config.es_tiebreaker_field || "_doc";
  const isFirstRun = !cursorData.searchAfter;
  // Base query
  let baseQuery = config.es_query || { match_all: {} };
  if (!isFirstRun && cursorData.searchAfter[1] === null && cursorData.lastTimestamp) {
    // v1.0 migration fallback: there is no tiebreaker value, so bound the query by time instead of search_after
    baseQuery = {
      bool: { must: [baseQuery, { range: { [config.es_time_field]: { gte: cursorData.lastTimestamp } } }] },
    };
  }
  const sortDef = isFirstRun
    ? [{ [config.es_sort_field]: { order: "desc" } }, { [tiebreakerField]: { order: "desc" } }]
    : [{ [config.es_sort_field]: { order: "asc" } }, { [tiebreakerField]: { order: "asc" } }];
  const url = buildEsUrl();
  // First run: fetch only the newest document to initialize the cursor (history is not pushed)
  if (isFirstRun) {
    const body = { size: 1, sort: sortDef, query: baseQuery };
    const res = await request("POST", url, body, esHeaders());
    if (res.status === 0 || res.status >= 400) {
      return JSON.stringify({ code: res.status || 500, message: "ES query failed", data: res.data });
    }
    const hits = res.data?.hits?.hits || [];
    if (hits.length > 0 && hits[0].sort) {
      cursorData.searchAfter = hits[0].sort;
      cursorData.lastTimestamp = getNestedValue(hits[0]._source || {}, config.es_time_field);
      if (!dry_run) fs.writeFileSync(CURSOR_FILE, JSON.stringify(cursorData, null, 2));
      return JSON.stringify({ code: 200, message: "First run: cursor initialized", dry_run, cursor: cursorData });
    }
    return JSON.stringify({ code: 200, message: "No data in index" });
  }
  // Pagination loop
  let currentSearchAfter = cursorData.searchAfter;
  let totalHits = 0;
  let pushed = 0;
  let pageCount = 0;
  let pushFailed = false;
  // ?? (not ||) so that es_max_pages: 0 really means "unlimited"
  const maxPages = config.es_max_pages ?? 20;
  const cursorBefore = cursorData.searchAfter ? [...cursorData.searchAfter] : null;
  pageLoop: while (true) {
    pageCount++;
    if (maxPages > 0 && pageCount > maxPages) break;
    const body = { size: config.es_size, sort: sortDef, query: baseQuery };
    if (currentSearchAfter && currentSearchAfter[1] !== null) {
      body.search_after = currentSearchAfter;
    }
    const res = await request("POST", url, body, esHeaders());
    if (res.status === 0 || res.status >= 400) {
      // Query failed: save the cursor progress made so far, then return
      if (!dry_run) fs.writeFileSync(CURSOR_FILE, JSON.stringify(cursorData, null, 2));
      return JSON.stringify({ code: res.status || 500, message: "ES query failed", data: res.data, pushed, totalHits });
    }
    const hits = res.data?.hits?.hits || [];
    if (hits.length === 0) break;
    totalHits += hits.length;
    for (const hit of hits) {
      const source = hit._source || {};
      if (!dry_run) {
        const token = await getFeishuToken();
        const title = getNestedValue(source, config.title_field) || "(untitled)";
        const content = String(getNestedValue(source, config.content_field) || "");
        const hitUrl = getNestedValue(source, config.url_field) || "";
        const timeVal = getNestedValue(source, config.es_time_field);
        // Assumes the time field holds epoch seconds
        const timeStr = timeVal ? new Date(timeVal * 1000).toLocaleString("zh-CN") : "";
        const text =
          `📢 New data\n\n【${title}】\n\n${content.slice(0, 50)}${content.length > 50 ? "..." : ""}\n\n` +
          `${timeStr ? `Time: ${timeStr}\n` : ""}${hitUrl ? `Link: ${hitUrl}` : ""}`;
        const pushRes = await request(
          "POST",
          "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id",
          { receive_id: config.feishu_user_id, msg_type: "text", content: JSON.stringify({ text }) },
          { Authorization: `Bearer ${token}`, "Content-Type": "application/json" }
        );
        // Feishu reports success with code === 0; on failure stop, so the cursor does not move past this hit
        if (pushRes.data?.code !== 0) {
          pushFailed = true;
          break pageLoop;
        }
        pushed++;
      }
      // Advance the cursor (reached only after a successful push, or in dry-run mode)
      if (hit.sort && hit.sort.length >= 2) {
        currentSearchAfter = hit.sort;
        cursorData.searchAfter = hit.sort;
        const ts = getNestedValue(source, config.es_time_field);
        if (ts) cursorData.lastTimestamp = ts;
      }
    }
    if (hits.length < config.es_size) break;
  }
  // Persist the cursor (skipped in dry-run mode)
  if (!dry_run) fs.writeFileSync(CURSOR_FILE, JSON.stringify(cursorData, null, 2));
  return JSON.stringify({
    code: 200,
    total_hits: totalHits,
    pages: pageCount,
    pushed,
    push_failed: pushFailed,
    dry_run,
    cursor_before: cursorBefore,
    cursor_after: cursorData.searchAfter,
  });
}
/**
 * test_feishu: send a test message to Feishu
 * Params: { text? }
 */
async function test_feishu({ text } = {}) {
  const msg = text || "🔔 ES-Poll-Feishu test message: Feishu push is working!";
  const token = await getFeishuToken();
  const res = await request(
    "POST",
    "https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id",
    { receive_id: config.feishu_user_id, msg_type: "text", content: JSON.stringify({ text: msg }) },
    { Authorization: `Bearer ${token}`, "Content-Type": "application/json" }
  );
  // Feishu signals success with code === 0 in the response body
  if (res.data?.code === 0) {
    return JSON.stringify({ code: 200, message: "Feishu push succeeded" });
  }
  return JSON.stringify({ code: res.status || 500, message: "Feishu push failed", data: res.data });
}
/**
 * test_es: test the ES connection (runs a size-0 query against the index)
 * Params: { index? }
 */
async function test_es({ index } = {}) {
  // Resolve {yyyyMM}-style placeholders the same way search and poll_once do
  const idx = resolveIndex(index);
  const url = buildEsUrl(index);
  const body = { size: 0, query: config.es_query || { match_all: {} } };
  const res = await request("POST", url, body, esHeaders());
  if (res.status === 0 || res.status >= 400) {
    return JSON.stringify({ code: res.status || 500, message: "ES connection failed", data: res.data });
  }
  const total = res.data?.hits?.total?.value ?? res.data?.hits?.total ?? 0;
  return JSON.stringify({ code: 200, message: "ES connection OK", index: idx, total_docs: total });
}
/**
 * status: show the poller service status and statistics
 */
async function status() {
  const statsData = readJsonFile(STATS_FILE);
  const cursorData = readJsonFile(CURSOR_FILE);
  let running = false;
  let pid = null;
  if (fs.existsSync(PID_FILE)) {
    try {
      const raw = fs.readFileSync(PID_FILE, "utf-8").trim();
      // poller.js writes JSON ({ pid, startedAt }); older versions wrote a bare number
      pid = raw.startsWith("{") ? JSON.parse(raw).pid : parseInt(raw, 10);
      process.kill(pid, 0); // signal 0 only checks that the process exists; throws if it does not
      running = true;
    } catch (_) {
      pid = null;
    }
  }
  return JSON.stringify({
    code: 200,
    running,
    pid,
    stats: statsData || { totalPolled: 0, totalPushed: 0, totalHits: 0 },
    cursor: cursorData || { lastTimestamp: null },
    config_path: CONFIG_PATH,
    data_dir: DATA_DIR,
  });
}
/**
 * cursor: show the current cursor
 */
async function cursor() {
const cursorData = readJsonFile(CURSOR_FILE) || { lastTimestamp: null };
return JSON.stringify({ code: 200, cursor: cursorData });
}
/**
 * reset_cursor: delete the cursor file
 */
async function reset_cursor() {
  ensureDataDir();
  if (fs.existsSync(CURSOR_FILE)) fs.unlinkSync(CURSOR_FILE);
  return JSON.stringify({
    code: 200,
    message: "Cursor reset; the next poll re-initializes it at the newest document (history is not pushed)",
  });
}
/**
 * set_cursor: set the cursor manually
 * Params: { lastTimestamp, searchAfter? }
 *   searchAfter is a [timestampValue, tiebreakerValue] array
 *   If only lastTimestamp is given, a v1.0-compatible cursor ([lastTimestamp, null]) is written
 */
async function set_cursor({ lastTimestamp, searchAfter } = {}) {
  if (!searchAfter && (lastTimestamp === undefined || lastTimestamp === null)) {
    return JSON.stringify({ code: 400, message: "Provide lastTimestamp or searchAfter" });
  }
  ensureDataDir();
  const cursorData = {
    searchAfter: searchAfter || [lastTimestamp, null],
    // ?? (not ||) so that a timestamp of 0 is kept
    lastTimestamp: lastTimestamp ?? (searchAfter ? searchAfter[0] : null),
  };
  fs.writeFileSync(CURSOR_FILE, JSON.stringify(cursorData, null, 2));
  return JSON.stringify({ code: 200, message: "Cursor set", cursor: cursorData });
}
// ---------- CLI dispatch ----------
const TOOLS = {
search,
poll_once,
test_feishu,
test_es,
status,
cursor,
reset_cursor,
set_cursor,
};
function readStdin() {
return new Promise((resolve) => {
if (process.stdin.isTTY) {
resolve("{}");
return;
}
const chunks = [];
process.stdin.setEncoding("utf-8");
process.stdin.on("data", (c) => chunks.push(c));
process.stdin.on("end", () => resolve(chunks.join("").trim() || "{}"));
});
}
async function main() {
  const args = process.argv.slice(2);
  if (args.length < 1) {
    console.log(JSON.stringify({
      code: 400,
      message: [
        "Usage: node cli.js <tool_name> '<json_args>'",
        "JSON can also be passed via stdin: echo '<json>' | node cli.js <tool_name>",
        "",
        `Available tools: ${Object.keys(TOOLS).join(", ")}`,
        "",
        "Tools:",
        "  search:       query ES directly (custom query/size/sort)",
        "  poll_once:    run one incremental poll + Feishu push (dry_run=true: query only)",
        "  test_feishu:  send a test message to Feishu",
        "  test_es:      test the ES connection",
        "  status:       show the poller service status and statistics",
        "  cursor:       show the current cursor",
        "  reset_cursor: reset the cursor",
        "  set_cursor:   set the cursor manually (requires lastTimestamp or searchAfter)",
      ].join("\n"),
      data: null,
    }));
    process.exit(1);
  }
  const toolName = args[0];
  if (!TOOLS[toolName]) {
    console.log(JSON.stringify({
      code: 400,
      message: `Unknown tool: ${toolName}. Available tools: ${Object.keys(TOOLS).join(", ")}`,
      data: null,
    }));
    process.exit(1);
  }
  // status / cursor / reset_cursor do not need the ES config
  const noConfigTools = ["status", "cursor", "reset_cursor"];
  if (!noConfigTools.includes(toolName)) {
    const configResult = loadConfig();
    if (!configResult.ok) {
      console.log(JSON.stringify({ code: 500, message: configResult.error, data: null }));
      process.exit(1);
    }
  }
  // JSON arguments come from the second CLI argument if present, otherwise from stdin
  let rawArgs = args.length > 1 ? args[1] : await readStdin();
  let parsed;
  try {
    // Strip a leading UTF-8 BOM, if any, before parsing
    if (rawArgs.charCodeAt(0) === 0xfeff) rawArgs = rawArgs.slice(1);
    parsed = JSON.parse(rawArgs);
  } catch (e) {
    console.log(JSON.stringify({
      code: 400,
      message: `Failed to parse JSON arguments: ${e.message}`,
      data: null,
    }));
    process.exit(1);
  }
  const result = await TOOLS[toolName](parsed);
  console.log(result);
}
// Report unexpected errors (e.g. a failed Feishu token request) as JSON instead of an unhandled rejection
main().catch((e) => {
  console.log(JSON.stringify({ code: 500, message: e.message, data: null }));
  process.exit(1);
});
FILE:scripts/poller.js
#!/usr/bin/env node
/**
 * ES Poll Feishu
 * Polls Elasticsearch for new data → pushes it to Feishu
 *
 * The version number is read from package.json
 */
const axios = require('axios');
const fs = require('fs');
const path = require('path');
const os = require('os');
const pkg = require('../package.json');
const VERSION = pkg.version;
// ============ Paths ============
const CONFIG_PATH = path.join(os.homedir(), '.openclaw', 'es-poll-feishu.json');
const DATA_DIR = path.join(os.homedir(), 'clawd', 'data', 'es-poll-feishu');
const PID_FILE = path.join(DATA_DIR, 'poller.pid');
const LOG_FILE = path.join(DATA_DIR, 'poller.log');
const STATS_FILE = path.join(DATA_DIR, 'stats.json');
const CURSOR_FILE = path.join(DATA_DIR, 'cursor.json');
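// ============ Default config ============
const DEFAULT_CONFIG = {
  // ES connection
  es_url: '',                  // required, e.g. http://your-es-host:9200
  es_index: '',                // required, e.g. xgks_* (supports the {yyyyMM} placeholder, replaced with the current year/month, e.g. xgks_{yyyyMM}*)
  es_auth: '',                 // required, Basic auth string (base64)
  es_params: {},               // extra URL query parameters, e.g. { "app_customer_name": "11", "app_user_name": "22" }
  // ES query
  es_query: null,              // custom query (the full query object); null = use the default time-range query
  es_time_field: 'ctime',      // time field used for incremental polling
  es_sort_field: 'ctime',      // sort field
  es_size: 50,                 // documents fetched per request
  es_tiebreaker_field: '_doc', // search_after tiebreaker field (_doc is ES's internal document order and needs no fielddata)
  es_max_pages: 20,            // max pages per poll (guards against endless loops); 0 = unlimited
  // Polling
  poll_interval: 60,           // polling interval (seconds)
  // Feishu
  feishu_app_id: '',           // required
  feishu_app_secret: '',       // required
  feishu_user_id: '',          // required, open_id
  // Flood protection
  flood_threshold: 5000,       // if one poll hits more than this, trigger flood protection (send one alert instead of pushing each item)
  flood_cooldown: 300,         // cooldown between flood alerts (seconds), to avoid flooding the chat with repeated alerts
  push_rate_limit: 3,          // push rate limit (messages/second); Feishu allows 5 QPS per user, so leave some headroom
  // Message format
  title_field: 'title',        // path of the title field
  content_field: 'content',    // path of the content field
  url_field: 'url',            // path of the link field
};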
let config = { ...DEFAULT_CONFIG };
// ============ Config loading ============
function loadConfig() {
  if (!fs.existsSync(CONFIG_PATH)) {
    console.error('❌ Config file not found:', CONFIG_PATH);
    console.error('Run this first: es-poll-feishu config');
    process.exit(1);
  }
  // Check the config file permissions: only the owner may read/write (keeps credentials away from other users on the host)
  try {
    const stat = fs.statSync(CONFIG_PATH);
    const mode = stat.mode & 0o777;
    if (mode & 0o077) {
      console.error(`⚠️ Config file permissions are too open (${mode.toString(8)}), changing them to 600...`);
      fs.chmodSync(CONFIG_PATH, 0o600);
    }
  } catch (_) {}
  try {
    const fileConfig = JSON.parse(fs.readFileSync(CONFIG_PATH, 'utf-8'));
    config = { ...DEFAULT_CONFIG, ...fileConfig };
  } catch (e) {
    console.error('❌ Failed to parse config file:', e.message);
    process.exit(1);
  }
  const required = ['es_url', 'es_index', 'es_auth', 'feishu_app_id', 'feishu_app_secret', 'feishu_user_id'];
  const missing = required.filter(k => !config[k]);
  if (missing.length > 0) {
    console.error('❌ Missing required config:', missing.join(', '));
    process.exit(1);
  }
}
// ============ Logging ============
function ensureDataDir() {
  if (!fs.existsSync(DATA_DIR)) fs.mkdirSync(DATA_DIR, { recursive: true });
}
const MAX_LOG_SIZE = 10 * 1024 * 1024; // 10 MB
let _logLineCount = 0; // the file size is checked every 100 lines to limit statSync overhead
function rotateLogIfNeeded() {
  try {
    if (!fs.existsSync(LOG_FILE)) return;
    const stat = fs.statSync(LOG_FILE);
    if (stat.size > MAX_LOG_SIZE) {
      // Keep a single backup: poller.log → poller.log.1
      const backupPath = LOG_FILE + '.1';
      if (fs.existsSync(backupPath)) fs.unlinkSync(backupPath);
      fs.renameSync(LOG_FILE, backupPath);
    }
  } catch (_) {}
}
function log(msg) {
  const line = `${new Date().toISOString()} - ${msg}`;
  try {
    // Check the log size every 100 lines instead of calling statSync on every write
    if (++_logLineCount % 100 === 0) rotateLogIfNeeded();
    fs.appendFileSync(LOG_FILE, line + '\n');
  } catch (_) {}
}
// ============ PID management ============
function writePid() {
// Record the PID together with a start timestamp, to avoid misjudging a reused PID
const data = { pid: process.pid, startedAt: Date.now() };
fs.writeFileSync(PID_FILE, JSON.stringify(data));
log(`PID ${process.pid} written`);
}
function removePid() {
try { if (fs.existsSync(PID_FILE)) fs.unlinkSync(PID_FILE); } catch (_) {}
}
function isAlreadyRunning() {
if (!fs.existsSync(PID_FILE)) return false;
try {
const raw = fs.readFileSync(PID_FILE, 'utf-8').trim();
let pid, startedAt;
// Accept both the old format (a bare number) and the new format (JSON)
if (raw.startsWith('{')) {
const data = JSON.parse(raw);
pid = data.pid;
startedAt = data.startedAt;
} else {
pid = parseInt(raw);
startedAt = null;
}
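// Check whether the process exists
process.kill(pid, 0);
// The process exists; also check its start time (if recorded)
if (startedAt) {
// Reading a process's start time via /proc or ps is complicated, so use a simplified heuristic:
// if the PID file is more than 30 days old, assume the PID was reused and clean it up
const ageMs = Date.now() - startedAt;
if (ageMs > 30 * 24 * 60 * 60 * 1000) {
log(`⚠️ PID 文件过旧 (${Math.round(ageMs / 86400000)} 天),可能是 PID 复用,清理`);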
removePid();
return false;
}
}
return true;
} catch (_) {
removePid();
return false;
}
}
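// Sketch, not part of the original script and not called anywhere: parsePidFileSketch()
// is a hypothetical pure helper that isolates the PID-file parsing done inline in
// isAlreadyRunning(), so both formats (a bare number, or JSON { pid, startedAt }) can be
// unit-tested without touching the filesystem or sending signals.
function parsePidFileSketch(raw) {
  const text = String(raw).trim();
  if (text.startsWith('{')) {
    const data = JSON.parse(text);
    return { pid: Number(data.pid), startedAt: data.startedAt ?? null };
  }
  // Old format: the file holds only the PID, so there is no start time to check
  return { pid: parseInt(text, 10), startedAt: null };
}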
// ============ Stats & cursor ============
// Cursor format v1.1: { searchAfter: [timestampValue, tiebreakerValue], lastTimestamp: number }
// Backward compatible with v1.0: { lastTimestamp: number }, migrated automatically on load
let stats = { totalPolled: 0, totalPushed: 0, totalHits: 0, lastPollTime: null };
let cursor = { searchAfter: null, lastTimestamp: null };
function loadStats() {
try { if (fs.existsSync(STATS_FILE)) stats = JSON.parse(fs.readFileSync(STATS_FILE, 'utf-8')); } catch (_) {}
}
function saveStats() {
stats.lastUpdate = new Date().toISOString();
// Atomic write, same as saveCursor, so a crash mid-write cannot corrupt the file
const tmpFile = STATS_FILE + '.tmp';
try {
fs.writeFileSync(tmpFile, JSON.stringify(stats, null, 2));
fs.renameSync(tmpFile, STATS_FILE);
} catch (_) {}
}
function loadCursor() {
try {
if (fs.existsSync(CURSOR_FILE)) {
const raw = JSON.parse(fs.readFileSync(CURSOR_FILE, 'utf-8'));
if (raw.searchAfter) {
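// v1.1 format
cursor = raw;
} else if (raw.lastTimestamp != null) {
// Migrate the v1.0 format: it has only lastTimestamp and no tiebreaker, so null is a placeholder
// The first query after migration falls back to a gte time range instead of search_after, so no data is lost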
log('📌 检测到 v1.0 游标格式,自动迁移为 v1.1 (search_after)');
cursor = { searchAfter: [raw.lastTimestamp, null], lastTimestamp: raw.lastTimestamp };
}
}
} catch (_) {}
}
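// Sketch, not part of the original script and not called by the poller:
// migrateCursorSketch() is a hypothetical pure version of the migration in loadCursor(),
// useful for unit tests. A v1.1 cursor is returned unchanged; a v1.0 cursor
// { lastTimestamp } becomes { searchAfter: [lastTimestamp, null], lastTimestamp }, and the
// null tiebreaker is what makes pollES() use a gte time range instead of search_after.
function migrateCursorSketch(raw) {
  if (raw && raw.searchAfter) return raw;
  if (raw && raw.lastTimestamp != null) {
    return { searchAfter: [raw.lastTimestamp, null], lastTimestamp: raw.lastTimestamp };
  }
  // Nothing usable on disk: same as the initial in-memory cursor
  return { searchAfter: null, lastTimestamp: null };
}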
function saveCursor() {
// Atomic write: write a temp file, then rename it, so a crash mid-write cannot corrupt the cursor file
const tmpFile = CURSOR_FILE + '.tmp';
try {
fs.writeFileSync(tmpFile, JSON.stringify(cursor, null, 2));
fs.renameSync(tmpFile, CURSOR_FILE);
} catch (_) {}
}
// ============ Nested field access ============
function getNestedValue(obj, fieldPath) {
return fieldPath.split('.').reduce((o, k) => (o && o[k] !== undefined ? o[k] : null), obj);
}
// ============ Dynamic index name resolution ============
function resolveIndex() {
const now = new Date();
const yyyy = now.getFullYear().toString();
const MM = (now.getMonth() + 1).toString().padStart(2, '0');
return config.es_index
.replace(/\{yyyyMM\}/g, `${yyyy}${MM}`)
.replace(/\{yyyy\}/g, yyyy)
.replace(/\{MM\}/g, MM);
}
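// Sketch, not part of the original script and not called by the poller:
// resolveIndexTemplateSketch() is a hypothetical variant of resolveIndex() that takes the
// template and the date as arguments so the placeholder expansion can be tested.
// Like resolveIndex(), it uses the host's local time (getFullYear/getMonth), so near
// midnight at the start of a month the selected index depends on the host time zone.
function resolveIndexTemplateSketch(template, date) {
  const yyyy = date.getFullYear().toString();
  const MM = (date.getMonth() + 1).toString().padStart(2, '0');
  return template
    .replace(/\{yyyyMM\}/g, `${yyyy}${MM}`)
    .replace(/\{yyyy\}/g, yyyy)
    .replace(/\{MM\}/g, MM);
}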
// ============ Feishu token ============
let feishuAccessToken = '';
let feishuTokenExpiry = 0;
async function getFeishuToken() {
if (feishuAccessToken && Date.now() < feishuTokenExpiry - 300000) {
return feishuAccessToken;
}
try {
const res = await axios.post('https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal', {
app_id: config.feishu_app_id,
app_secret: config.feishu_app_secret,
});
feishuAccessToken = res.data.app_access_token;
feishuTokenExpiry = Date.now() + res.data.expire * 1000;
log('✅ 飞书 Token 获取成功');
return feishuAccessToken;
} catch (err) {
log('❌ 飞书 Token 获取失败: ' + err.message);
return null;
}
}
// ============ Send Feishu message (with retries) ============
const FEISHU_MAX_RETRIES = 3;
const FEISHU_RETRY_DELAY_MS = 2000;
async function sendToFeishu(hit) {
const source = hit._source || {};
const title = getNestedValue(source, config.title_field) || '(无标题)';
const content = getNestedValue(source, config.content_field) || '';
const url = getNestedValue(source, config.url_field) || '';
const timeVal = getNestedValue(source, config.es_time_field);
const timeStr = timeVal ? new Date(timeVal * 1000).toLocaleString('zh-CN') : '';
const text = `📢 新数据推送
【${title}】
${content.slice(0, 50)}${content.length > 50 ? '...' : ''}
${url ? `🔗 ${url}` : ''}
${timeStr ? `时间:${timeStr}` : ''}
📋 索引: ${hit._index || ''}`;
for (let attempt = 1; attempt <= FEISHU_MAX_RETRIES; attempt++) {
const token = await getFeishuToken();
if (!token) {
log(`❌ 推送失败: 无法获取飞书 Token (尝试 ${attempt}/${FEISHU_MAX_RETRIES})`);
if (attempt < FEISHU_MAX_RETRIES) await sleep(FEISHU_RETRY_DELAY_MS);
continue;
}
try {
await axios.post(
'https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id',
{
receive_id: config.feishu_user_id,
msg_type: 'text',
content: JSON.stringify({ text }),
},
{ headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' } }
);
log(`✅ 已推送: ${String(title).slice(0, 40)}`);
return true;
} catch (err) {
log(`❌ 推送失败 (尝试 ${attempt}/${FEISHU_MAX_RETRIES}): ${err.message}`);
if (err.response?.status === 401) {
// Token expired: clear the cache so the next attempt fetches a new one
feishuAccessToken = '';
feishuTokenExpiry = 0;
}
if (attempt < FEISHU_MAX_RETRIES) await sleep(FEISHU_RETRY_DELAY_MS);
}
}
return false;
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
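// ============ Flood protection ============
let lastFloodAlertTime = 0; // timestamp (ms) of the last flood alert
/**
* Probe how much data is pending with a size:0 count query that fetches no documents.
* Filters with a range on the cursor timestamp, so it approximates the incremental volume rather than the whole index.
* Returns -1 if the query fails.
*/
async function countPending(url, baseQuery) {
// Add the cursor time range to baseQuery so historical data is not counted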
let countQuery = baseQuery;
if (cursor.lastTimestamp != null) {
countQuery = {
bool: {
must: [baseQuery],
filter: [{ range: { [config.es_time_field]: { gt: cursor.lastTimestamp } } }],
},
};
}
const body = { size: 0, query: countQuery, track_total_hits: true };
try {
const res = await axios.post(url, body, {
headers: { 'Content-Type': 'application/json', Authorization: `Basic ${config.es_auth}` },
timeout: 15000,
});
return res.data?.hits?.total?.value ?? res.data?.hits?.total ?? -1;
} catch (_) {
return -1;
}
}
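// Sketch, not part of the original script and not called by the poller:
// buildCountBodySketch() is a hypothetical pure function that builds the same request
// body countPending() sends. With a cursor timestamp, the base query goes under bool.must
// and the time window under bool.filter. The count is approximate: `gt` on the timestamp
// ignores the tiebreaker, so unpushed documents sharing the cursor's exact timestamp are
// not counted.
function buildCountBodySketch(baseQuery, timeField, lastTimestamp) {
  let query = baseQuery;
  if (lastTimestamp != null) {
    query = {
      bool: {
        must: [baseQuery],
        filter: [{ range: { [timeField]: { gt: lastTimestamp } } }],
      },
    };
  }
  // size: 0 returns no hits; track_total_hits: true asks ES for an exact total
  // instead of the default lower bound capped at 10,000
  return { size: 0, query, track_total_hits: true };
}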
/**
* Send a flood alert to Feishu (a single summary message)
*/
async function sendFloodAlert(pendingCount) {
const now = Date.now();
const cooldown = (config.flood_cooldown || 300) * 1000;
if (now - lastFloodAlertTime < cooldown) {
log(`🔇 雪崩告警冷却中 (${Math.round((cooldown - (now - lastFloodAlertTime)) / 1000)}s 后可再次告警)`);
return;
}
const text = `🚨 数据雪崩告警
检测到待处理数据量异常:${pendingCount} 条
阈值:${config.flood_threshold} 条
时间:${new Date().toLocaleString('zh-CN')}
⚠️ 已自动暂停逐条推送,游标快进至最新位置。
疑似上游系统异常导致大量写入,请排查数据源。
恢复正常后,轮询将自动继续。`;
const token = await getFeishuToken();
if (!token) { log('❌ 雪崩告警发送失败: 无法获取飞书 Token'); return; }
try {
await axios.post(
'https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id',
{ receive_id: config.feishu_user_id, msg_type: 'text', content: JSON.stringify({ text }) },
{ headers: { Authorization: `Bearer ${token}`, 'Content-Type': 'application/json' } }
);
lastFloodAlertTime = now;
log(`🚨 雪崩告警已发送 (待处理: ${pendingCount})`);
} catch (err) {
log(`❌ 雪崩告警发送失败: ${err.message}`);
}
}
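/**
* Fast-forward the cursor during a flood: jump to the newest document without pushing anything
*/
async function fastForwardCursor(url, baseQuery, tiebreakerField) {
// Fetch the newest document (descending order) and use its sort values as the new cursor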
const sortDesc = [
{ [config.es_sort_field]: { order: 'desc' } },
{ [tiebreakerField]: { order: 'desc' } },
];
try {
const res = await axios.post(url, { size: 1, sort: sortDesc, query: baseQuery }, {
headers: { 'Content-Type': 'application/json', Authorization: `Basic ${config.es_auth}` },
timeout: 15000,
});
const hits = res.data?.hits?.hits || [];
if (hits.length > 0 && hits[0].sort && hits[0].sort.length >= 2) {
const oldCursor = cursor.searchAfter ? JSON.stringify(cursor.searchAfter) : 'null';
cursor.searchAfter = hits[0].sort;
const ts = getNestedValue(hits[0]._source || {}, config.es_time_field);
if (ts) cursor.lastTimestamp = ts;
saveCursor();
log(`⏩ 游标快进: ${oldCursor} → ${JSON.stringify(cursor.searchAfter)}`);
return true;
}
} catch (err) {
log(`❌ 游标快进失败: ${err.message}`);
}
return false;
}
// ============ ES query (search_after pagination) ============
const MAX_ES_SIZE = 200; // upper bound for es_size, prevents oversized pages
const DEFAULT_MAX_PAGES = 20; // default maximum number of pages per poll
async function pollES() {
stats.totalPolled++;
stats.lastPollTime = new Date().toISOString();
const effectiveSize = Math.min(config.es_size, MAX_ES_SIZE);
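const maxPages = config.es_max_pages ?? DEFAULT_MAX_PAGES; // ?? keeps an explicit 0 (= unlimited); || would turn 0 into 20
const tiebreakerField = config.es_tiebreaker_field || '_id';
// First-run flag: with no cursor yet, only initialize the cursor and push nothing
const isFirstRun = !cursor.searchAfter;
// Build the base query (no time range; the search_after sort values filter out old data implicitly)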
let baseQuery;
if (isFirstRun) {
// First run: fetch the newest document to initialize the cursor
baseQuery = config.es_query || { match_all: {} };
} else if (config.es_query) {
// Custom query: use it as-is; search_after skips already-processed documents
// After a v1.0 migration (tiebreaker is null), add a time range as a fallback
if (cursor.searchAfter[1] === null) {
const must = [config.es_query];
must.push({ range: { [config.es_time_field]: { gte: cursor.lastTimestamp } } });
baseQuery = { bool: { must } };
} else {
baseQuery = config.es_query;
}
} else {
// No custom query: search_after already filters out old data implicitly
if (cursor.searchAfter[1] === null) {
// v1.0 migration fallback
baseQuery = { range: { [config.es_time_field]: { gte: cursor.lastTimestamp } } };
} else {
baseQuery = { match_all: {} };
}
}
// Sort definition: primary sort field + tiebreaker
const sortDef = isFirstRun
? [{ [config.es_sort_field]: { order: 'desc' } }, { [tiebreakerField]: { order: 'desc' } }]
: [{ [config.es_sort_field]: { order: 'asc' } }, { [tiebreakerField]: { order: 'asc' } }];
// Build the URL
const params = new URLSearchParams(config.es_params || {});
const index = resolveIndex();
const url = `${config.es_url.replace(/\/+$/, '')}/${index}/_search?${params.toString()}`;
if (isFirstRun) {
log('📌 首次运行,仅拉取最新数据初始化游标');
await pollFirstRun(url, baseQuery, sortDef, tiebreakerField);
return;
}
// ---- Flood detection: probe the pending volume first ----
const floodThreshold = config.flood_threshold || 5000;
const pendingCount = await countPending(url, baseQuery);
if (pendingCount > floodThreshold) {
log(`🚨 雪崩检测触发: 待处理 ${pendingCount} 条 > 阈值 ${floodThreshold}`);
await sendFloodAlert(pendingCount);
await fastForwardCursor(url, baseQuery, tiebreakerField);
stats.totalHits += pendingCount;
saveStats();
return;
} else if (pendingCount >= 0) {
log(`📊 待处理数据量: ${pendingCount} (阈值: ${floodThreshold})`);
}
// ---- Push rate-limit parameters ----
const pushRateLimit = config.push_rate_limit || 3;
const pushIntervalMs = Math.ceil(1000 / pushRateLimit);
// ---- Pagination loop: keep fetching until there is no more data ----
let currentSearchAfter = cursor.searchAfter;
let totalHitsThisRound = 0;
let totalPushedThisRound = 0;
let pageCount = 0;
let pushFailed = false;
while (true) {
pageCount++;
if (maxPages > 0 && pageCount > maxPages) {
log(`⚠️ 达到单次轮询最大翻页数 (${maxPages}),剩余数据下次轮询继续`);
break;
}
const body = { size: effectiveSize, sort: sortDef, query: baseQuery };
// search_after skips already-processed data (right after a v1.0 migration the tiebreaker is null, so search_after is omitted and the time range acts as the fallback)
if (currentSearchAfter && currentSearchAfter[1] !== null) {
body.search_after = currentSearchAfter;
}
try {
const res = await axios.post(url, body, {
headers: {
'Content-Type': 'application/json',
Authorization: `Basic ${config.es_auth}`,
},
timeout: 30000,
});
const hits = res.data?.hits?.hits || [];
if (hits.length === 0) {
break; // no more data
}
totalHitsThisRound += hits.length;
stats.totalHits += hits.length;
log(`🔍 轮询 #${stats.totalPolled} 第${pageCount}页: ${hits.length} 条`);
// Push one document at a time (rate-limited)
for (const hit of hits) {
const ok = await sendToFeishu(hit);
if (ok) {
stats.totalPushed++;
totalPushedThisRound++;
// Advance the cursor only after a successful push
const sortValues = hit.sort;
if (sortValues && sortValues.length >= 2) {
currentSearchAfter = sortValues;
cursor.searchAfter = sortValues;
// Also keep lastTimestamp for logging and backward compatibility
const ts = getNestedValue(hit._source || {}, config.es_time_field);
if (ts) cursor.lastTimestamp = ts;
}
// Rate limit: pace pushes to stay under Feishu's QPS limit
await sleep(pushIntervalMs);
} else {
pushFailed = true;
const remaining = hits.length - hits.indexOf(hit);
log(`⚠️ 推送失败,停止本轮处理,本页剩余 ${remaining} 条将在下次轮询重试`);
break;
}
}
// A push failed: stop paginating
if (pushFailed) break;
// A short page (fewer hits than the page size) means everything has been fetched
if (hits.length < effectiveSize) break;
} catch (err) {
if (err.response) {
log(`❌ ES 查询失败 [${err.response.status}]: ${JSON.stringify(err.response.data).slice(0, 200)}`);
} else {
log('❌ ES 查询失败: ' + err.message);
}
break; // query failed: stop paginating and keep the current cursor
}
}
if (totalHitsThisRound === 0) {
log(`🔍 轮询 #${stats.totalPolled}: 无新数据`);
} else {
log(`📊 轮询 #${stats.totalPolled} 完成: ${pageCount}页, 命中 ${totalHitsThisRound}, 推送 ${totalPushedThisRound}`);
}
saveCursor();
saveStats();
}
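// Sketch, not part of the original script and not called by the poller: an in-memory
// simulation of the search_after paging that pollES() performs against ES. Documents are
// ordered by the tuple [ts, id] (primary sort field + tiebreaker; the field names ts and id
// are assumptions for the example), and each page returns only documents whose tuple is
// strictly greater than the last tuple seen. Because the tiebreaker makes every tuple
// unique, documents sharing a timestamp are neither skipped nor repeated at a page
// boundary, which a timestamp-only cursor using gt (skips) or gte (repeats) cannot guarantee.
function compareSortTuplesSketch(a, b) {
  for (let i = 0; i < a.length; i++) {
    if (a[i] < b[i]) return -1;
    if (a[i] > b[i]) return 1;
  }
  return 0;
}

function searchAfterPageSketch(docs, after, size) {
  const sorted = docs
    .map(d => ({ ...d, sort: [d.ts, d.id] }))
    .sort((x, y) => compareSortTuplesSketch(x.sort, y.sort));
  // Like search_after: keep only documents strictly after the last seen sort tuple
  const remaining = after
    ? sorted.filter(d => compareSortTuplesSketch(d.sort, after) > 0)
    : sorted;
  return remaining.slice(0, size);
}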
/**
* First run: fetch the newest document to initialize the cursor; push nothing
*/
async function pollFirstRun(url, query, sortDef, tiebreakerField) {
try {
const body = { size: 1, sort: sortDef, query };
const res = await axios.post(url, body, {
headers: {
'Content-Type': 'application/json',
Authorization: `Basic ${config.es_auth}`,
},
timeout: 30000,
});
const hits = res.data?.hits?.hits || [];
if (hits.length > 0) {
const hit = hits[0];
const sortValues = hit.sort;
const source = hit._source || {};
const ts = getNestedValue(source, config.es_time_field);
if (sortValues && sortValues.length >= 2) {
cursor.searchAfter = sortValues;
cursor.lastTimestamp = ts;
saveCursor();
log(`📌 游标已初始化: searchAfter=${JSON.stringify(sortValues)}, ${config.es_time_field}=${ts}`);
} else if (ts) {
// Fallback: ES did not return complete sort values (e.g. the tiebreaker field does not exist), so use the timestamp alone
cursor.searchAfter = [ts, null];
cursor.lastTimestamp = ts;
saveCursor();
log(`⚠️ ES 未返回 tiebreaker sort 值,游标降级为时间戳模式: ${ts}`);
log(`⚠️ 请检查 es_tiebreaker_field (${tiebreakerField}) 是否存在于索引中`);
}
} else {
log('📌 索引中无数据,等待下次轮询');
}
} catch (err) {
if (err.response) {
log(`❌ 首次查询失败 [${err.response.status}]: ${JSON.stringify(err.response.data).slice(0, 200)}`);
} else {
log('❌ 首次查询失败: ' + err.message);
}
}
saveStats();
}
// ============ Poll loop (recursive setTimeout, so polls never overlap) ============
let pollTimer = null;
let isShuttingDown = false;
async function schedulePoll() {
if (isShuttingDown) return;
try {
await pollES();
} catch (err) {
log(`❌ pollES 异常: ${err.message}`);
}
if (!isShuttingDown) {
pollTimer = setTimeout(schedulePoll, config.poll_interval * 1000);
}
}
function startPolling() {
log(`⏱️ 轮询间隔: ${config.poll_interval}s`);
// Run once immediately; schedule each later poll only after the previous one has finished
schedulePoll();
}
// ============ Graceful shutdown ============
function shutdown() {
if (isShuttingDown) return; // guard against being triggered twice
isShuttingDown = true;
log('🛑 Shutting down...');
log(`📊 总计轮询: ${stats.totalPolled}, 命中: ${stats.totalHits}, 推送: ${stats.totalPushed}`);
if (pollTimer) clearTimeout(pollTimer);
saveStats();
saveCursor();
removePid();
process.exit(0);
}
// ============ Last-resort exception handling ============
// Re-entrancy guard: stops infinite recursion when the handler itself throws (the cause of a 14 GB log file)
let _crashHandling = false;
function crashExit(label, detail) {
if (_crashHandling) {
// Already handling a crash: exit immediately without any I/O
process.exit(2);
}
_crashHandling = true;
try { fs.appendFileSync(LOG_FILE, `${new Date().toISOString()} - 💥 ${label}: ${detail}\n`); } catch (_) {}
try { saveCursor(); } catch (_) {}
try { saveStats(); } catch (_) {}
try { removePid(); } catch (_) {}
process.exit(1);
}
process.on('uncaughtException', (err) => {
crashExit('未捕获异常', err.stack || err.message);
});
process.on('unhandledRejection', (reason) => {
crashExit('未处理的 Promise 拒绝', String(reason));
});
// ============ Main entry ============
async function main() {
ensureDataDir();
loadConfig();
if (isAlreadyRunning()) {
console.error('❌ 已有实例在运行,请先停止');
process.exit(1);
}
writePid();
loadStats();
loadCursor();
log('========================================================');
log('🚀 ES-Poll-Feishu v' + VERSION + ' Started (search_after + 雪崩保护)');
log(`📡 ES: ${config.es_url}/${resolveIndex()} (模板: ${config.es_index})`);
log(`⏱️ 间隔: ${config.poll_interval}s, 每页: ${config.es_size}, tiebreaker: ${config.es_tiebreaker_field || '_doc'}`);
log(`🛡️ 雪崩保护: 阈值 ${config.flood_threshold} 条, 冷却 ${config.flood_cooldown}s, 推送限速 ${config.push_rate_limit}/s`);
log(`📊 历史: 轮询 ${stats.totalPolled}, 命中 ${stats.totalHits}, 推送 ${stats.totalPushed}`);
if (cursor.searchAfter) log(`📌 游标: searchAfter=${JSON.stringify(cursor.searchAfter)}`);
else if (cursor.lastTimestamp) log(`📌 游标(旧): ${config.es_time_field} > ${cursor.lastTimestamp}`);
log('========================================================');
await getFeishuToken();
startPolling();
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
}
main();
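Subscribe, filter, push to Feishu. Subscribes to a data stream over WebSocket, filters it with an LLM, and pushes the matches to Feishu automatically.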
---
name: "subscribe-filter-feishu"
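description: "Subscribe, filter, push to Feishu. Subscribes to a data stream over WebSocket, filters it with an LLM, and pushes the matches to Feishu automatically."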
---
# Subscribe-Filter-Feishu v1.0.2
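Subscribe to a data stream → filter with an LLM → push to Feishu.
## Features
- 🔌 Real-time data stream subscription over WebSocket
- 🤖 LLM-based filtering
- 📱 Feishu message push
- 📊 Persistent statistics
- 🔄 Reconnection with exponential backoff
- 🛑 Graceful shutdown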
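Reconnection backs off exponentially: the delay is `reconnect_delay × 2^n` seconds, capped at `reconnect_max_delay` (2 and 60 by default in `scripts/receiver.js`), where n is the reconnect counter that the receiver resets to 0 once a connection opens. The sketch below mirrors `calculateBackoff()` but takes the counter as an argument instead of reading a global; `backoffMs` is a hypothetical name.

```javascript
// Mirrors calculateBackoff() in scripts/receiver.js; backoffMs is a hypothetical name.
// Returns the delay in milliseconds before reconnect attempt number `attempt` (0-based).
function backoffMs(attempt, baseSeconds = 2, maxSeconds = 60) {
  const seconds = Math.min(baseSeconds * Math.pow(2, attempt), maxSeconds);
  return seconds * 1000;
}

// Delays for the first seven attempts, in seconds: 2, 4, 8, 16, 32, then capped at 60.
console.log([0, 1, 2, 3, 4, 5, 6].map(n => backoffMs(n) / 1000));
// → [ 2, 4, 8, 16, 32, 60, 60 ]
```

The cap keeps a long outage from pushing the retry interval beyond a minute, while the doubling avoids hammering a server that has just gone down.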
## Installation
```bash
cd skills/subscribe-filter-feishu
npm install
chmod +x scripts/subscribe-filter-feishu
```
## Configuration (required)
Before first use, create the config file `~/.openclaw/subscribe-filter-feishu.json`:
```json
{
"ws_url": "ws://your-server:port/ws",
"feishu_app_id": "your_feishu_app_id",
"feishu_app_secret": "your_feishu_app_secret",
"feishu_user_id": "your_feishu_open_id",
"model_api_key": "your_ark_api_key",
"model_base_url": "https://ark.cn-beijing.volces.com/api/v3",
"model_name": "your_endpoint_id"
}
```
Or run `subscribe-filter-feishu config` to create a template.
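### Configuration options
| Option | Required | Description |
|--------|----------|-------------|
| `ws_url` | ✅ | WebSocket data source URL |
| `feishu_app_id` | ✅ | Feishu app App ID |
| `feishu_app_secret` | ✅ | Feishu app App Secret |
| `feishu_user_id` | ✅ | open_id of the Feishu user who receives the messages |
| `model_api_key` | ✅ | Volcano Engine ARK API key |
| `model_base_url` | ❌ | LLM API base URL (default `https://ark.cn-beijing.volces.com/api/v3`, Doubao 2.0) |
| `model_name` | ✅ | Volcano Engine endpoint ID |
| `reconnect_delay` | ❌ | Initial reconnect delay in seconds (default `2`) |
| `reconnect_max_delay` | ❌ | Maximum reconnect delay in seconds (default `60`) |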
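On startup, `loadConfig()` in `scripts/receiver.js` spreads the file's values over built-in defaults and exits if any required key is missing or empty. A minimal sketch of that merge-and-validate step as a pure function (`mergeAndValidate`, `DEFAULTS`, and `REQUIRED` are hypothetical names; the key lists are taken from the script):

```javascript
// Defaults and required keys as listed in scripts/receiver.js.
const DEFAULTS = {
  model_base_url: 'https://ark.cn-beijing.volces.com/api/v3',
  reconnect_delay: 2,
  reconnect_max_delay: 60,
};
const REQUIRED = ['ws_url', 'feishu_app_id', 'feishu_app_secret', 'feishu_user_id', 'model_api_key', 'model_name'];

// File values override defaults; a key counts as missing when its merged value is falsy
// (absent or an empty string), which is the same test the script applies.
function mergeAndValidate(fileConfig) {
  const config = { ...DEFAULTS, ...fileConfig };
  const missing = REQUIRED.filter(key => !config[key]);
  return { config, missing };
}

const { missing } = mergeAndValidate({ ws_url: 'ws://localhost:8080/ws', model_api_key: '' });
console.log(missing);
// → [ 'feishu_app_id', 'feishu_app_secret', 'feishu_user_id', 'model_api_key', 'model_name' ]
```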
## Usage
```bash
# Start the service
subscribe-filter-feishu start
# Check status
subscribe-filter-feishu status
# View logs
subscribe-filter-feishu logs
# Stop the service
subscribe-filter-feishu stop
# Restart
subscribe-filter-feishu restart
# View or create the config
subscribe-filter-feishu config
```
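## Filter rules (example: AI news)
By default, only news that clearly involves core AI technology is pushed:
- Machine learning / deep learning / neural networks
- Large language models (LLMs), NLP, computer vision
- AI-generated content (AIGC)
- Transformer, GPT, BERT, and similar models
Not pushed:
- Robots / drones / automated machinery
- Synthetic biology / gene editing
- Batteries / energy storage / new energy
- Materials science
To customize the filter rules, edit the prompt in `isAIRelated()` in `receiver.js`.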
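Each item costs one `chat/completions` request whose reply is a single word, 是 (yes) or 否 (no). The sketch below shows the request body shape that `isAIRelated()` sends and one way to read the reply; `buildChatBody` and `parseVerdict` are hypothetical names. It checks only the start of the reply, because a plain substring test for 是 would also match 不是 ("is not").

```javascript
// Request body in the shape isAIRelated() posts to `${model_base_url}/chat/completions`.
function buildChatBody(modelName, prompt) {
  return {
    model: modelName,
    messages: [{ role: 'user', content: prompt }],
    max_tokens: 10,  // the expected answer is one word, so a tiny budget suffices
    temperature: 0,  // greedy decoding, so the same item gets a stable answer
  };
}

// Treat the reply as "yes" only if it starts with 是 or "yes" (case-insensitive).
function parseVerdict(answer) {
  const text = String(answer).trim().toLowerCase();
  return text.startsWith('是') || text.startsWith('yes');
}

console.log(parseVerdict('是'), parseVerdict('否'), parseVerdict('不是'), parseVerdict('Yes.'));
// → true false false true
```

Changing the filter therefore means changing only the prompt text; the parsing stays the same as long as the prompt still asks for 是 or 否.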
## Data directory
```
~/clawd/data/subscribe-filter-feishu/
├── receiver.pid # PID file
├── receiver.log # runtime log
└── stats.json # statistics
```
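`receiver.js` rewrites `stats.json` in place with `fs.writeFileSync`, so a crash in the middle of a write can leave a truncated file. A minimal sketch of the temp-file-plus-rename pattern that the es-poll-feishu poller uses for its stats and cursor files (`writeJsonAtomic` is a hypothetical name, and the example writes to a throwaway directory, not the real data directory):

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Write JSON to a temp file next to the target, then rename it over the target.
// On POSIX file systems rename() replaces the target atomically, so a reader sees
// either the old file or the new one, never a half-written one.
function writeJsonAtomic(filePath, value) {
  const tmpFile = filePath + '.tmp';
  fs.writeFileSync(tmpFile, JSON.stringify(value, null, 2));
  fs.renameSync(tmpFile, filePath);
}

const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'sff-'));
const statsFile = path.join(dir, 'stats.json');
writeJsonAtomic(statsFile, { totalReceived: 10, totalAI: 3, lastUpdate: new Date().toISOString() });
console.log(JSON.parse(fs.readFileSync(statsFile, 'utf-8')).totalAI);
// → 3
```

The temp file must live in the same directory (hence the same file system) as the target; a rename across file systems is not atomic and may fail.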
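## Version history
### v1.0.2
- Last-resort exception handlers (uncaughtException / unhandledRejection no longer exit the process)
### v1.0.0
- Config file management (no hard-coded secrets)
- PID management (prevents duplicate starts)
- Management script (start/stop/status)
- Reconnection with exponential backoff
- Persistent statistics
- Automatic Feishu token refresh
- Doubao 2.0 LLM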
## License
MIT
FILE:metadata.json
{
"name": "subscribe-filter-feishu",
"version": "1.0.3",
"description": "订阅-过滤-飞书推送。通过WebSocket订阅数据流,大模型智能过滤,自动推送到飞书。",
"author": "sougannkyou",
"license": "MIT",
"requires": ["ws", "axios"],
"node": ">=16"
}
FILE:package-lock.json
{
"name": "ai-news-feishu",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "ai-news-feishu",
"version": "1.0.0",
"dependencies": {
"axios": "^1.6.0",
"ws": "^8.14.2"
}
},
"node_modules/asynckit": {
"version": "0.4.0",
"resolved": "https://registry.npmmirror.com/asynckit/-/asynckit-0.4.0.tgz",
"integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==",
"license": "MIT"
},
"node_modules/axios": {
"version": "1.13.6",
"resolved": "https://registry.npmmirror.com/axios/-/axios-1.13.6.tgz",
"integrity": "sha512-ChTCHMouEe2kn713WHbQGcuYrr6fXTBiu460OTwWrWob16g1bXn4vtz07Ope7ewMozJAnEquLk5lWQWtBig9DQ==",
"license": "MIT",
"dependencies": {
"follow-redirects": "^1.15.11",
"form-data": "^4.0.5",
"proxy-from-env": "^1.1.0"
}
},
"node_modules/call-bind-apply-helpers": {
"version": "1.0.2",
"resolved": "https://registry.npmmirror.com/call-bind-apply-helpers/-/call-bind-apply-helpers-1.0.2.tgz",
"integrity": "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0",
"function-bind": "^1.1.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/combined-stream": {
"version": "1.0.8",
"resolved": "https://registry.npmmirror.com/combined-stream/-/combined-stream-1.0.8.tgz",
"integrity": "sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==",
"license": "MIT",
"dependencies": {
"delayed-stream": "~1.0.0"
},
"engines": {
"node": ">= 0.8"
}
},
"node_modules/delayed-stream": {
"version": "1.0.0",
"resolved": "https://registry.npmmirror.com/delayed-stream/-/delayed-stream-1.0.0.tgz",
"integrity": "sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==",
"license": "MIT",
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/dunder-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/dunder-proto/-/dunder-proto-1.0.1.tgz",
"integrity": "sha512-KIN/nDJBQRcXw0MLVhZE9iQHmG68qAVIBg9CqmUYjmQIhgij9U5MFvrqkUL5FbtyyzZuOeOt0zdeRe4UY7ct+A==",
"license": "MIT",
"dependencies": {
"call-bind-apply-helpers": "^1.0.1",
"es-errors": "^1.3.0",
"gopd": "^1.2.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-define-property": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/es-define-property/-/es-define-property-1.0.1.tgz",
"integrity": "sha512-e3nRfgfUZ4rNGL232gUgX06QNyyez04KdjFrF+LTRoOXmrOgFKDg4BCdsjW8EnT69eqdYGmRpJwiPVYNrCaW3g==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-errors": {
"version": "1.3.0",
"resolved": "https://registry.npmmirror.com/es-errors/-/es-errors-1.3.0.tgz",
"integrity": "sha512-Zf5H2Kxt2xjTvbJvP2ZWLEICxA6j+hAmMzIlypy4xcBg1vKVnx89Wy0GbS+kf5cwCVFFzdCFh2XSCFNULS6csw==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-object-atoms": {
"version": "1.1.1",
"resolved": "https://registry.npmmirror.com/es-object-atoms/-/es-object-atoms-1.1.1.tgz",
"integrity": "sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/es-set-tostringtag": {
"version": "2.1.0",
"resolved": "https://registry.npmmirror.com/es-set-tostringtag/-/es-set-tostringtag-2.1.0.tgz",
"integrity": "sha512-j6vWzfrGVfyXxge+O0x5sh6cvxAog0a/4Rdd2K36zCMV5eJ+/+tOAngRO8cODMNWbVRdVlmGZQL2YS3yR8bIUA==",
"license": "MIT",
"dependencies": {
"es-errors": "^1.3.0",
"get-intrinsic": "^1.2.6",
"has-tostringtag": "^1.0.2",
"hasown": "^2.0.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/follow-redirects": {
"version": "1.15.11",
"resolved": "https://registry.npmmirror.com/follow-redirects/-/follow-redirects-1.15.11.tgz",
"integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==",
"funding": [
{
"type": "individual",
"url": "https://github.com/sponsors/RubenVerborgh"
}
],
"license": "MIT",
"engines": {
"node": ">=4.0"
},
"peerDependenciesMeta": {
"debug": {
"optional": true
}
}
},
"node_modules/form-data": {
"version": "4.0.5",
"resolved": "https://registry.npmmirror.com/form-data/-/form-data-4.0.5.tgz",
"integrity": "sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w==",
"license": "MIT",
"dependencies": {
"asynckit": "^0.4.0",
"combined-stream": "^1.0.8",
"es-set-tostringtag": "^2.1.0",
"hasown": "^2.0.2",
"mime-types": "^2.1.12"
},
"engines": {
"node": ">= 6"
}
},
"node_modules/function-bind": {
"version": "1.1.2",
"resolved": "https://registry.npmmirror.com/function-bind/-/function-bind-1.1.2.tgz",
"integrity": "sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==",
"license": "MIT",
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/get-intrinsic": {
"version": "1.3.0",
"resolved": "https://registry.npmmirror.com/get-intrinsic/-/get-intrinsic-1.3.0.tgz",
"integrity": "sha512-9fSjSaos/fRIVIp+xSJlE6lfwhES7LNtKaCBIamHsjr2na1BiABJPo0mOjjz8GJDURarmCPGqaiVg5mfjb98CQ==",
"license": "MIT",
"dependencies": {
"call-bind-apply-helpers": "^1.0.2",
"es-define-property": "^1.0.1",
"es-errors": "^1.3.0",
"es-object-atoms": "^1.1.1",
"function-bind": "^1.1.2",
"get-proto": "^1.0.1",
"gopd": "^1.2.0",
"has-symbols": "^1.1.0",
"hasown": "^2.0.2",
"math-intrinsics": "^1.1.0"
},
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/get-proto": {
"version": "1.0.1",
"resolved": "https://registry.npmmirror.com/get-proto/-/get-proto-1.0.1.tgz",
"integrity": "sha512-sTSfBjoXBp89JvIKIefqw7U2CCebsc74kiY6awiGogKtoSGbgjYE/G/+l9sF3MWFPNc9IcoOC4ODfKHfxFmp0g==",
"license": "MIT",
"dependencies": {
"dunder-proto": "^1.0.1",
"es-object-atoms": "^1.0.0"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/gopd": {
"version": "1.2.0",
"resolved": "https://registry.npmmirror.com/gopd/-/gopd-1.2.0.tgz",
"integrity": "sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/has-symbols": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/has-symbols/-/has-symbols-1.1.0.tgz",
"integrity": "sha512-1cDNdwJ2Jaohmb3sg4OmKaMBwuC48sYni5HUw2DvsC8LjGTLK9h+eb1X6RyuOHe4hT0ULCW68iomhjUoKUqlPQ==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/has-tostringtag": {
"version": "1.0.2",
"resolved": "https://registry.npmmirror.com/has-tostringtag/-/has-tostringtag-1.0.2.tgz",
"integrity": "sha512-NqADB8VjPFLM2V0VvHUewwwsw0ZWBaIdgo+ieHtK3hasLz4qeCRjYcqfB6AQrBggRKppKF8L52/VqdVsO47Dlw==",
"license": "MIT",
"dependencies": {
"has-symbols": "^1.0.3"
},
"engines": {
"node": ">= 0.4"
},
"funding": {
"url": "https://github.com/sponsors/ljharb"
}
},
"node_modules/hasown": {
"version": "2.0.2",
"resolved": "https://registry.npmmirror.com/hasown/-/hasown-2.0.2.tgz",
"integrity": "sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ==",
"license": "MIT",
"dependencies": {
"function-bind": "^1.1.2"
},
"engines": {
"node": ">= 0.4"
}
},
"node_modules/math-intrinsics": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/math-intrinsics/-/math-intrinsics-1.1.0.tgz",
"integrity": "sha512-/IXtbwEk5HTPyEwyKX6hGkYXxM9nbj64B+ilVJnC/R6B0pH5G4V3b0pVbL7DBj4tkhBAppbQUlf6F6Xl9LHu1g==",
"license": "MIT",
"engines": {
"node": ">= 0.4"
}
},
"node_modules/mime-db": {
"version": "1.52.0",
"resolved": "https://registry.npmmirror.com/mime-db/-/mime-db-1.52.0.tgz",
"integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==",
"license": "MIT",
"engines": {
"node": ">= 0.6"
}
},
"node_modules/mime-types": {
"version": "2.1.35",
"resolved": "https://registry.npmmirror.com/mime-types/-/mime-types-2.1.35.tgz",
"integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==",
"license": "MIT",
"dependencies": {
"mime-db": "1.52.0"
},
"engines": {
"node": ">= 0.6"
}
},
"node_modules/proxy-from-env": {
"version": "1.1.0",
"resolved": "https://registry.npmmirror.com/proxy-from-env/-/proxy-from-env-1.1.0.tgz",
"integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==",
"license": "MIT"
},
"node_modules/ws": {
"version": "8.19.0",
"resolved": "https://registry.npmmirror.com/ws/-/ws-8.19.0.tgz",
"integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==",
"license": "MIT",
"engines": {
"node": ">=10.0.0"
},
"peerDependencies": {
"bufferutil": "^4.0.1",
"utf-8-validate": ">=5.0.2"
},
"peerDependenciesMeta": {
"bufferutil": {
"optional": true
},
"utf-8-validate": {
"optional": true
}
}
}
}
}
FILE:package.json
{
"name": "ai-news-feishu",
"version": "1.0.0",
"description": "AI News Feishu - WebSocket receiver with LLM filtering",
"main": "scripts/receiver.js",
"scripts": {
"start": "node scripts/receiver.js"
},
"dependencies": {
"ws": "^8.14.2",
"axios": "^1.6.0"
}
}
FILE:scripts/receiver.js
#!/usr/bin/env node
/**
* AI News Feishu v2.0
* WebSocket data intake + LLM filtering + Feishu push
*/
const WebSocket = require('ws');
const axios = require('axios');
const fs = require('fs');
const path = require('path');
const os = require('os');
// ============ Config loading ============
const CONFIG_PATH = path.join(os.homedir(), '.openclaw', 'subscribe-filter-feishu.json');
const DATA_DIR = path.join(os.homedir(), 'clawd', 'data', 'subscribe-filter-feishu');
const PID_FILE = path.join(DATA_DIR, 'receiver.pid');
const LOG_FILE = path.join(DATA_DIR, 'receiver.log');
const STATS_FILE = path.join(DATA_DIR, 'stats.json');
// Default config (secrets must come from the config file)
const DEFAULT_CONFIG = {
ws_url: '', // required
feishu_app_id: '', // required
feishu_app_secret: '', // required
feishu_user_id: '', // required
model_api_key: '', // required
model_base_url: 'https://ark.cn-beijing.volces.com/api/v3',
model_name: '',
reconnect_delay: 2,
reconnect_max_delay: 60,
};
let config = { ...DEFAULT_CONFIG };
function loadConfig() {
if (!fs.existsSync(CONFIG_PATH)) {
console.error('❌ 配置文件不存在:', CONFIG_PATH);
console.error('');
console.error('请创建配置文件:');
console.error(`mkdir -p ~/.openclaw && cat > ${CONFIG_PATH} << 'EOF'`);
console.error(JSON.stringify({
ws_url: 'ws://your-server:port/ws',
feishu_app_id: 'your_app_id',
feishu_app_secret: 'your_app_secret',
feishu_user_id: 'your_open_id',
model_api_key: 'your_api_key',
}, null, 2));
console.error('EOF');
process.exit(1);
}
try {
const fileConfig = JSON.parse(fs.readFileSync(CONFIG_PATH, 'utf-8'));
config = { ...DEFAULT_CONFIG, ...fileConfig };
} catch (e) {
console.error('❌ 配置文件解析失败:', e.message);
process.exit(1);
}
// Validate required fields
const required = ['ws_url', 'feishu_app_id', 'feishu_app_secret', 'feishu_user_id', 'model_api_key', 'model_name'];
const missing = required.filter(k => !config[k]);
if (missing.length > 0) {
console.error('❌ 缺少必填配置:', missing.join(', '));
console.error('请编辑配置文件:', CONFIG_PATH);
process.exit(1);
}
}
// ============ Logging ============
function ensureDataDir() {
if (!fs.existsSync(DATA_DIR)) {
fs.mkdirSync(DATA_DIR, { recursive: true });
}
}
function log(msg) {
const line = `${new Date().toISOString()} - ${msg}`;
console.log(line);
try {
fs.appendFileSync(LOG_FILE, line + '\n');
} catch (e) {}
}
// ============ PID management ============
function writePid() {
fs.writeFileSync(PID_FILE, process.pid.toString());
log(`PID ${process.pid} written to ${PID_FILE}`);
}
function removePid() {
try {
if (fs.existsSync(PID_FILE)) {
fs.unlinkSync(PID_FILE);
}
} catch (e) {}
}
function isAlreadyRunning() {
if (!fs.existsSync(PID_FILE)) return false;
try {
const pid = parseInt(fs.readFileSync(PID_FILE, 'utf-8').trim());
// Check whether the process exists
process.kill(pid, 0);
return true;
} catch (e) {
// The process does not exist: remove the stale PID file
removePid();
return false;
}
}
// ============ Stats persistence ============
let stats = { totalReceived: 0, totalAI: 0, lastUpdate: null };
function loadStats() {
try {
if (fs.existsSync(STATS_FILE)) {
stats = JSON.parse(fs.readFileSync(STATS_FILE, 'utf-8'));
}
} catch (e) {}
}
function saveStats() {
stats.lastUpdate = new Date().toISOString();
try {
fs.writeFileSync(STATS_FILE, JSON.stringify(stats, null, 2));
} catch (e) {}
}
// ============ Global state ============
let feishuAccessToken = '';
let feishuTokenExpiry = 0;
let reconnectCount = 0;
let ws = null;
// ============ Feishu token ============
async function getFeishuToken() {
// Reuse the cached token while it is still valid (refresh 5 minutes before expiry)
if (feishuAccessToken && Date.now() < feishuTokenExpiry - 300000) {
return feishuAccessToken;
}
try {
const response = await axios.post(
'https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal',
{
app_id: config.feishu_app_id,
app_secret: config.feishu_app_secret,
}
);
feishuAccessToken = response.data.app_access_token;
// The token is valid for 2 hours; expire is given in seconds
feishuTokenExpiry = Date.now() + response.data.expire * 1000;
log('✅ 飞书 Token 获取成功');
return feishuAccessToken;
} catch (err) {
log('❌ 飞书 Token 获取失败: ' + err.message);
return null;
}
}
// ============ Send Feishu message ============
async function sendToFeishu(news, llmAnswer, sendDelay, processDelay) {
const token = await getFeishuToken();
if (!token) return false;
try {
let timing = '';
if (sendDelay !== null) {
const total = sendDelay + processDelay;
timing = `\n\n⏱️ 耗时: 服务端 ${sendDelay}ms + 处理 ${processDelay}ms = 总计 ${total}ms`;
}
const content = `【${news.title}】
${news.content}
🔗 ${news.url}${timing}
🤖 AI判断: ${llmAnswer}`;
await axios.post(
'https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id',
{
receive_id: config.feishu_user_id,
msg_type: 'text',
content: JSON.stringify({ text: content })
},
{
headers: {
'Authorization': `Bearer ${token}`,
'Content-Type': 'application/json'
}
}
);
log(`✅ 已推送: ${news.title}`);
return true;
} catch (err) {
log('❌ 推送失败: ' + err.message);
// Token expired: clear the cache
if (err.response?.status === 401) {
feishuAccessToken = '';
feishuTokenExpiry = 0;
}
return false;
}
}
// ============ LLM classification ============
async function isAIRelated(news) {
const prompt = `严格判断以下新闻是否与AI(人工智能)核心技术的应用相关。
AI核心技术包括:机器学习、深度学习、神经网络、大语言模型(LLM)、自然语言处理(NLP)、计算机视觉(CV)、强化学习、Transformer架构、GPT/BERT等模型、AIGC(AI生成内容)等。
只回答"是"或"否",不要解释:
新闻标题:${news.title}
新闻内容:${news.content}`;
try {
const response = await axios.post(
`${config.model_base_url}/chat/completions`,
{
model: config.model_name,
messages: [{ role: 'user', content: prompt }],
max_tokens: 10,
temperature: 0,
},
{
headers: {
'Authorization': `Bearer ${config.model_api_key}`,
'Content-Type': 'application/json'
},
timeout: 30000,
}
);
const answer = response.data.choices[0].message.content.trim();
log(` [LLM] "${news.title.slice(0, 30)}..." -> ${answer}`);
return { isAI: answer.includes('是') || answer.toLowerCase().includes('yes'), answer };
} catch (err) {
log('❌ LLM 调用失败: ' + err.message);
return { isAI: false, answer: '调用失败' };
}
}
// ============ Process news ============
async function processNews(news) {
const receiveTime = Date.now();
let sendDelay = null;
if (news.timestamp) {
let ts = news.timestamp;
// Values below 1e12 are treated as seconds and converted to milliseconds
if (ts < 1e12) ts = ts * 1000;
sendDelay = receiveTime - ts;
}
stats.totalReceived++;
const llmResult = await isAIRelated(news);
const processDelay = Date.now() - receiveTime;
if (llmResult.isAI) {
stats.totalAI++;
log(`🤖 [${stats.totalReceived}] AI新闻: ${news.title}`);
await sendToFeishu(news, llmResult.answer, sendDelay, processDelay);
} else {
log(`📰 [${stats.totalReceived}] 非AI: ${news.title}`);
}
// Persist stats every 10 messages
if (stats.totalReceived % 10 === 0) {
saveStats();
}
}
// ============ Exponential backoff ============
// Config values are in seconds; the returned delay is in milliseconds
function calculateBackoff() {
const delay = Math.min(
config.reconnect_delay * Math.pow(2, reconnectCount),
config.reconnect_max_delay
);
return delay * 1000;
}
// ============ WebSocket connection ============
function connect() {
log(`🔌 Connecting to ${config.ws_url}...`);
ws = new WebSocket(config.ws_url);
ws.on('open', () => {
log('✅ Connected!');
reconnectCount = 0; // Reset the backoff counter after a successful connect
});
ws.on('message', async (data) => {
try {
const json = JSON.parse(data.toString());
await processNews(json);
} catch (e) {
// Ignore messages that are not valid JSON
}
});
ws.on('close', () => {
const delay = calculateBackoff();
reconnectCount++;
log(`⚠️ Disconnected, reconnecting in ${delay / 1000}s... (attempt #${reconnectCount})`);
setTimeout(connect, delay);
});
ws.on('error', (e) => {
log('❌ WebSocket Error: ' + e.message);
});
}
// ============ Graceful shutdown ============
function shutdown() {
log('\n🛑 Shutting down...');
log(`📊 总计接收: ${stats.totalReceived}, AI新闻: ${stats.totalAI}`);
saveStats();
removePid();
if (ws) {
ws.close();
}
process.exit(0);
}
// ============ Last-resort error handlers ============
process.on('uncaughtException', (err) => {
log(`💥 未捕获异常: ${err.message}`);
log(err.stack || '');
// Keep running instead of exiting
});
process.on('unhandledRejection', (reason) => {
log(`💥 未处理的 Promise 拒绝: ${reason}`);
// Keep running instead of exiting
});
// ============ Entry point ============
async function main() {
ensureDataDir();
loadConfig();
// Refuse to start if another instance is already running
if (isAlreadyRunning()) {
console.error('❌ 已有实例在运行,请先停止');
console.error('使用: ai-news-feishu stop');
process.exit(1);
}
writePid();
loadStats();
log('='.repeat(60));
log('🚀 Subscribe-Filter-Feishu v1.0.0 Started');
log(`📡 WebSocket: ${config.ws_url}`);
log(`🤖 Model: ${config.model_name}`);
log(`📊 历史统计: 接收 ${stats.totalReceived}, AI ${stats.totalAI}`);
log('='.repeat(60));
// Fetch the Feishu token up front
await getFeishuToken();
connect();
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
}
main();
---
name: kse-dev
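description: "CLI development workflow based on kiro-spec-engine (kse). Uses spec-driven development: kse commands create spec documents, manage project structure, and automate the development flow. Use when the user needs to: (1) do spec-driven development with the kse CLI, (2) create and manage project specs, (3) improve document quality with enhance, (4) run doctor to check the system environment, (5) initialize a new project or adopt an existing one."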
---
# KSE CLI Development Workflow
## Overview
KSE (Kiro Spec Engine) is an npm package that provides a CLI for spec-driven development.
**Install:**
```bash
npm install -g kiro-spec-engine
```
**Command:** `kse`
## Quick Start
### 1. Initialize a new project
```bash
cd <your-project-folder>
kse init
```
### 2. Create a spec
```bash
kse spec create <spec-name>
```
### 3. Write the spec documents
Create the following under `.kiro/specs/<spec-name>/`:
- `requirements.md` - requirements and acceptance criteria
- `design.md` - technical design
- `tasks.md` - task list
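A quick way to see which of these documents a spec still lacks is a small script over that directory layout. The sketch below is hypothetical: `missing_spec_docs` is not a kse command or API, and it assumes only the `.kiro/specs/<spec-name>/` layout and the three file names listed above.

```python
import tempfile
from pathlib import Path
from typing import List

# Document names taken from the spec layout above
SPEC_DOCS = ("requirements.md", "design.md", "tasks.md")


def missing_spec_docs(project_root: str, spec_name: str) -> List[str]:
    """Return the spec documents not yet present in .kiro/specs/<spec_name>/ (hypothetical helper)."""
    spec_dir = Path(project_root) / ".kiro" / "specs" / spec_name
    return [name for name in SPEC_DOCS if not (spec_dir / name).is_file()]


if __name__ == "__main__":
    # Build a throwaway project where only requirements.md has been written
    with tempfile.TemporaryDirectory() as root:
        spec_dir = Path(root) / ".kiro" / "specs" / "user-login"
        spec_dir.mkdir(parents=True)
        (spec_dir / "requirements.md").write_text("# Requirements\n", encoding="utf-8")
        print(missing_spec_docs(root, "user-login"))  # ['design.md', 'tasks.md']
```

The check only looks at the file system, so it works the same whether the files were created by hand or by a kse command.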
### 4. Improve document quality
```bash
kse enhance requirements <file-path>
kse enhance design <file-path>
```
### 5. Check the system
```bash
kse doctor
```
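## Common Commands
| Command | Description |
|------|------|
| `kse init` | Initialize the Kiro project structure |
| `kse spec create <name>` | Create a new spec |
| `kse enhance <stage> <file>` | Improve a document |
| `kse doctor` | Check the system environment |
| `kse status` | Show project status |
## Development Flow
1. **Create a project folder** → a dedicated directory for each project
2. **Initialize** → `cd <project> && kse init`
3. **Create a spec** → `kse spec create <feature-name>`
4. **Write requirements** → edit `requirements.md`
5. **Implement** → write code against the requirements
6. **Verify** → run the code and confirm it meets the acceptance criteria
## Notes
- Keep each project in its own subfolder so projects do not interfere with each other
- No GUI; development is CLI-only
- Follow the spec-driven development flow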
FILE:_meta.json
{
"version": "1.0.0",
"author": "song",
"description": "CLI development workflow based on kiro-spec-engine (kse)"
}
---
name: "websocket-receiver"
description: "Practice skill for receiving WebSocket data. Supports auto-reconnect, batch processing, and AI analysis integration."
---
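# WebSocket Receiver v1.1.2
🎓 A practice skill for WebSocket integration, useful for learning how to:
- establish and maintain a long-lived WebSocket connection
- handle a real-time data stream
- implement automatic reconnection and error recovery
- batch-process data and integrate AI analysis
Use it as a template and adapt it to your own WebSocket data source.
## Features
- 🔌 Auto-reconnect (exponential backoff)
- 📦 Batch data processing
- 🤖 Optional AI analysis
- 📊 Log rotation
- 🛑 Graceful shutdown (drains the buffer before exiting)
- 💾 JSONL data persistence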
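## Installation
```bash
# Install the dependency
pip install websockets
# Or install into a virtual environment
~/clawd/venv/bin/pip install websockets
```
## Getting a WebSocket URL
⚠️ This skill is for learning only and ships without a real data source.
The companion test server pushes one mock message every 10 seconds, so you can watch the receive and batch-processing flow.
To test, you can:
1. Contact the author for the test server address
2. Run your own WebSocket server
3. Use a public WebSocket test endpoint
Configure the URL via:
- Environment variable: `WEBSOCKET_URL=ws://your-server:port/ws`
- Config file: `~/.openclaw/websocket-config.json`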
## Quick Start
```bash
# Foreground test (replace with your real URL)
WEBSOCKET_URL=ws://your-server:port/ws websocket-receiver test
# Run in the background
WEBSOCKET_URL=ws://your-server:port/ws websocket-receiver start
# Check status
websocket-receiver status
# View logs
websocket-receiver logs
# Stop
websocket-receiver stop
```
## Configuration
### Environment variables
| Variable | Description | Default |
|------|------|--------|
| `WEBSOCKET_URL` | WebSocket server URL | (required) |
| `WEBSOCKET_BATCH` | Batch size | `10` |
| `WEBSOCKET_DATA_DIR` | Data directory | `~/clawd/data/websocket` |
| `WEBSOCKET_CONFIG` | Config file path | `~/.openclaw/websocket-config.json` |
### Config file
Create `~/.openclaw/websocket-config.json`:
```json
{
"ws_url": "ws://your-server:port/ws",
"batch_size": 10,
"auto_analyze": true,
"data_dir": "~/clawd/data/websocket",
"reconnect_delay": 2,
"reconnect_max_delay": 60,
"reconnect_max_attempts": 0
}
```
### Options
| Option | Description | Default |
|--------|------|--------|
| `ws_url` | WebSocket URL | (required) |
| `batch_size` | Number of messages that triggers batch processing | `10` |
| `auto_analyze` | Run AI analysis automatically | `true` |
| `data_dir` | Data directory | `~/clawd/data/websocket` |
| `reconnect_delay` | Initial reconnect delay (seconds) | `2` |
| `reconnect_max_delay` | Maximum reconnect delay (seconds) | `60` |
| `reconnect_max_attempts` | Maximum reconnect attempts (0 = unlimited) | `0` |
| `connect_timeout` | Connection timeout (seconds) | `30` |
| `ping_interval` | Heartbeat interval (seconds) | `30` |
| `ping_timeout` | Heartbeat timeout (seconds) | `10` |
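`reconnect_delay` and `reconnect_max_delay` combine into an exponential backoff. As a sketch, the function below reproduces the formula from `_calculate_backoff` in `scripts/receiver.py` as a standalone function; the name `backoff_schedule` is hypothetical and exists only for illustration.

```python
from typing import List


def backoff_schedule(reconnect_delay: float, reconnect_max_delay: float, attempts: int) -> List[float]:
    """Delays in seconds before reconnect attempts 1..attempts (hypothetical helper).

    Same formula as WebSocketReceiver._calculate_backoff, evaluated after
    _connect_loop has already incremented reconnect_count to 1, 2, 3, ...
    """
    return [
        min(reconnect_delay * (2 ** count), reconnect_max_delay)
        for count in range(1, attempts + 1)
    ]


# Defaults from the table: reconnect_delay=2, reconnect_max_delay=60
print(backoff_schedule(2, 60, 6))  # [4, 8, 16, 32, 60, 60]
```

With the defaults the first retry waits 4 s rather than 2 s, because `_connect_loop` increments the reconnect counter before computing the delay. The counter is reset to 0 after each successful connection, and a non-zero `reconnect_max_attempts` stops the loop once the counter reaches it.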
## Commands
```bash
websocket-receiver start # start in the background
websocket-receiver stop # stop
websocket-receiver restart # restart
websocket-receiver status # show status
websocket-receiver logs # follow the log
websocket-receiver config # show or create the config
websocket-receiver test # run in the foreground for testing
```
## Data Format
Incoming messages are JSON objects like:
```json
{
"id": "unique-id",
"title": "Title",
"content": "Content",
"url": "link",
"timestamp": "2026-03-12T12:00:00Z"
}
```
Each message is stored as one JSONL line:
```json
{"received_at": "2026-03-12T12:00:00", "data": {...}}
```
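To read stored data back, parse each line as one JSON record. A minimal sketch, assuming the record shape shown above; `iter_records` is a hypothetical helper and the sample line is made up for illustration.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def iter_records(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield one {"received_at": ..., "data": {...}} record per non-empty JSONL line."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


# Made-up sample in the stored format; an open file handle works the same way
sample = [
    '{"received_at": "2026-03-12T12:00:00", "data": {"id": "unique-id", "title": "Title"}}\n',
    "\n",
]
for record in iter_records(sample):
    print(record["received_at"], record["data"]["title"])  # 2026-03-12T12:00:00 Title
```

For a real file, pass the handle directly, e.g. `with open(path, encoding="utf-8") as f: records = list(iter_records(f))`. Opening as UTF-8 matters because the receiver writes with `ensure_ascii=False`.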
## File Layout
```
~/clawd/data/websocket/
├── receiver.pid # process ID file
├── receiver.log # log file (rotated automatically)
├── data_20260312_14.jsonl # data file, one per hour
├── data_20260312_15.jsonl
└── analysis_20260312.md # AI analysis report, one per day
```
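The file names follow `strftime` patterns taken from `scripts/receiver.py`: `data_%Y%m%d_%H.jsonl` per hour and `analysis_%Y%m%d.md` per day. The helpers below are a hypothetical sketch for locating the files for a given time; note that the receiver names files using local time (`datetime.now()`).

```python
from datetime import datetime
from pathlib import Path


def data_file_for(data_dir: str, when: datetime) -> Path:
    """Hourly JSONL file, same pattern as _default_on_message (hypothetical helper)."""
    return Path(data_dir).expanduser() / f"data_{when.strftime('%Y%m%d_%H')}.jsonl"


def analysis_file_for(data_dir: str, when: datetime) -> Path:
    """Daily Markdown report, same pattern as _save_analysis (hypothetical helper)."""
    return Path(data_dir).expanduser() / f"analysis_{when.strftime('%Y%m%d')}.md"


when = datetime(2026, 3, 12, 14, 30)
print(data_file_for("/tmp/ws", when).name)      # data_20260312_14.jsonl
print(analysis_file_for("/tmp/ws", when).name)  # analysis_20260312.md
```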
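## Custom Processing
```python
# Assumes scripts/ is on sys.path (e.g. run from the scripts/ directory)
from receiver import WebSocketReceiver

# Keys left out here fall back to DEFAULT_CONFIG in receiver.py
config = {"ws_url": "ws://your-server:port/ws", "batch_size": 10}
receiver = WebSocketReceiver(config)

# Custom per-message handler (replaces the default JSONL writer)
def my_handler(data):
    print(f"Received: {data}")
    return True  # True = handled: the message is counted and added to the batch buffer

receiver.on_message = my_handler

# Custom batch handler (plain functions and async functions both work)
async def my_batch_handler(batch):
    # Put your own analysis logic here
    return "analysis result"

receiver.on_batch = my_batch_handler
receiver.run()
```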
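`on_batch` may be a plain function or a coroutine function: `_process_batch` checks `asyncio.iscoroutinefunction` and awaits only coroutines, and the buffer is flushed whenever it holds `batch_size` items (and once more on shutdown). The self-contained sketch below imitates that flow with a hypothetical `BatchBuffer` class so it runs without a WebSocket server; it is not the receiver's own class, and it uses `inspect.iscoroutinefunction`, which behaves the same for `async def` handlers.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List, Optional


class BatchBuffer:
    """Hypothetical stand-in for the buffering in _handle_message/_process_batch."""

    def __init__(self, batch_size: int, on_batch: Callable[[List[Dict]], Any]):
        self.batch_size = batch_size
        self.on_batch = on_batch
        self.buffer: List[Dict] = []
        self.results: List[Any] = []

    async def add(self, item: Dict) -> None:
        self.buffer.append(item)
        # Flush as soon as the buffer reaches the threshold
        if len(self.buffer) >= self.batch_size:
            await self.flush()

    async def flush(self) -> Optional[Any]:
        if not self.buffer:
            return None
        batch, self.buffer = self.buffer, []
        # Await coroutine handlers; call plain functions directly
        if inspect.iscoroutinefunction(self.on_batch):
            result = await self.on_batch(batch)
        else:
            result = self.on_batch(batch)
        if result:  # falsy results are ignored, as in the receiver
            self.results.append(result)
        return result


def summarize(batch: List[Dict]) -> str:  # a synchronous handler
    return f"{len(batch)} item(s)"


async def demo() -> List[Any]:
    buf = BatchBuffer(batch_size=3, on_batch=summarize)
    for i in range(7):
        await buf.add({"title": f"news {i}"})
    await buf.flush()  # drain the remainder, as _shutdown does
    return buf.results


print(asyncio.run(demo()))  # ['3 item(s)', '3 item(s)', '1 item(s)']
```

Because a falsy return value is ignored, a handler that returns `None` makes the real receiver skip both the analysis file and the `openclaw notify` call for that batch.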
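## Dependencies
- Python 3.8+
- websockets
## Changelog
### v1.1.2
- The WebSocket URL is now mandatory; startup fails with an error if it is not configured
### v1.1.1
- Rewrote the core logic for better stability
- Added exponential-backoff reconnection
- Async subprocess calls
- Graceful shutdown
- Log rotation
- Correct PID management
### v1.0.0
- Initial release
## License
MIT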
FILE:install.sh
#!/bin/bash
# WebSocket Receiver Skill - install script
echo "🚀 安装 WebSocket Receiver Skill..."
# Check for Python 3
if ! command -v python3 &> /dev/null; then
echo "❌ 需要 Python 3"
exit 1
fi
# Install dependencies
echo "📦 安装依赖..."
python3 -m pip install --user websockets 2>/dev/null || python3 -m pip install websockets
# Create the data directory
mkdir -p ~/clawd/data/websocket
# Add the skill's scripts directory to PATH via the shell rc file
SKILL_BIN="$HOME/clawd/skills/websocket-receiver/scripts"
SHELL_RC=""
if [ -f "$HOME/.zshrc" ]; then
SHELL_RC="$HOME/.zshrc"
elif [ -f "$HOME/.bashrc" ]; then
SHELL_RC="$HOME/.bashrc"
fi
if [ -n "$SHELL_RC" ]; then
if ! grep -q "websocket-receiver" "$SHELL_RC"; then
echo "export PATH=\"$SKILL_BIN:\$PATH\"" >> "$SHELL_RC"
echo "✅ 已添加到 PATH ($SHELL_RC)"
fi
fi
# Create a sample config if none exists
CONFIG_FILE="$HOME/.openclaw/websocket-config.json"
if [ ! -f "$CONFIG_FILE" ]; then
mkdir -p "$(dirname "$CONFIG_FILE")"
cat > "$CONFIG_FILE" << 'EOF'
{
"ws_url": "ws://59.110.46.1:6680/ws",
"batch_size": 10,
"auto_analyze": true,
"data_dir": "~/clawd/data/websocket"
}
EOF
echo "✅ 创建配置: $CONFIG_FILE"
fi
echo ""
echo "✅ 安装完成!"
echo ""
echo "使用方法:"
echo " websocket-receiver start # 启动"
echo " websocket-receiver stop # 停止"
echo " websocket-receiver status # 状态"
echo " websocket-receiver logs # 日志"
echo ""
echo "或者使用环境变量:"
echo " WEBSOCKET_URL=ws://example.com:8765 websocket-receiver start"
echo ""
FILE:metadata.json
{
"name": "websocket-receiver",
"version": "1.1.2",
"description": "WebSocket data receiver skill for learning and practice. Supports auto-reconnect, batch processing, and AI analysis.",
"author": "Song",
"license": "MIT",
"requires": ["websockets"],
"python": ">=3.8"
}
FILE:scripts/receiver.py
#!/usr/bin/env python3
"""
WebSocket Receiver Skill v2.0
A stable, reliable WebSocket data-receiving framework
"""
import asyncio
import atexit
import json
import logging
import os
import signal
import sys
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Callable, List, Dict, Any, Optional
try:
    import websockets
    from websockets.exceptions import ConnectionClosed, InvalidStatusCode
except ImportError:
    print("请安装 websockets: pip install websockets")
    sys.exit(1)
# ============ Default configuration ============
DEFAULT_CONFIG = {
    "ws_url": "",  # Required; the receiver refuses to start without it
    "batch_size": 10,
    "auto_analyze": True,
    "data_dir": "~/clawd/data/websocket",
    # Reconnect settings
    "reconnect_delay": 2,  # Initial reconnect delay (seconds)
    "reconnect_max_delay": 60,  # Maximum reconnect delay (seconds)
    "reconnect_max_attempts": 0,  # Maximum reconnect attempts, 0 = unlimited
    # Connection settings
    "connect_timeout": 30,  # Connection timeout (seconds)
    "ping_interval": 30,  # Heartbeat interval (seconds)
    "ping_timeout": 10,  # Heartbeat timeout (seconds)
    # Logging settings
    "log_max_bytes": 10 * 1024 * 1024,  # 10MB
    "log_backup_count": 3,
}
# ==================================
class WebSocketReceiver:
    """Main WebSocket receiver class"""
    def __init__(self, config: Dict[str, Any] = None):
        self.config = {**DEFAULT_CONFIG, **(config or {})}
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.running = False
        self.shutting_down = False
        self.reconnect_count = 0
        self.processed_count = 0
        self.analysis_count = 0
        self.batch_buffer: List[Dict] = []
        self.current_delay = self.config["reconnect_delay"]
        # Data directory
        self.data_dir = Path(self.config["data_dir"]).expanduser()
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # PID file
        self.pid_file = self.data_dir / "receiver.pid"
        # Logging
        self._setup_logging()
        # Pluggable handlers (replace these to customize processing)
        self.on_message: Callable = self._default_on_message
        self.on_batch: Callable = self._default_on_batch
    def _setup_logging(self):
        """Configure logging with file rotation"""
        log_file = self.data_dir / "receiver.log"
        # Create the logger
        self.logger = logging.getLogger("websocket-receiver")
        self.logger.setLevel(logging.INFO)
        self.logger.handlers.clear()
        # File handler (with rotation)
        file_handler = RotatingFileHandler(
            log_file,
            maxBytes=self.config["log_max_bytes"],
            backupCount=self.config["log_backup_count"],
            encoding='utf-8'
        )
        file_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        # Console handler
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
    def _write_pid(self):
        """Write the PID file"""
        try:
            self.pid_file.write_text(str(os.getpid()))
            self.logger.info(f"PID {os.getpid()} written to {self.pid_file}")
        except Exception as e:
            self.logger.error(f"Failed to write PID file: {e}")
    def _remove_pid(self):
        """Remove the PID file"""
        try:
            if self.pid_file.exists():
                self.pid_file.unlink()
                self.logger.info("PID file removed")
        except Exception as e:
            self.logger.error(f"Failed to remove PID file: {e}")
    def _default_on_message(self, data: Dict) -> bool:
        """Default message handler: append the message to an hourly JSONL file"""
        try:
            timestamp = datetime.now().strftime('%Y%m%d_%H')
            data_file = self.data_dir / f"data_{timestamp}.jsonl"
            with open(data_file, 'a', encoding='utf-8') as f:
                record = {
                    "received_at": datetime.now().isoformat(),
                    "data": data
                }
                f.write(json.dumps(record, ensure_ascii=False) + '\n')
            return True
        except Exception as e:
            self.logger.error(f"Failed to save message: {e}")
            return False
    async def _default_on_batch(self, batch: List[Dict]) -> Optional[str]:
        """Default batch handler: asynchronous AI analysis through the OpenClaw CLI"""
        if not self.config.get("auto_analyze"):
            return None
        try:
            # Build the analysis prompt
            news_list = []
            for i, news in enumerate(batch, 1):
                title = news.get('title', 'N/A')
                content = news.get('content', 'N/A')[:300]
                news_list.append(f"【{i}】{title}: {content}...")
            all_news = "\n".join(news_list)
            prompt = f"""请分析以下 {len(batch)} 条新闻,提供批量总结:
{all_news}
请用中文回复,格式:
## 批量总结({len(batch)}条)
### 📌 核心趋势
### 🔥 热点话题
### 💡 关键洞察
"""
            self.logger.info(f"🤖 Analyzing batch of {len(batch)} items...")
            # Call OpenClaw as a subprocess so the event loop is not blocked
            proc = await asyncio.create_subprocess_exec(
                'openclaw', 'agent', '--agent', 'main', '--message', prompt,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            try:
                stdout, stderr = await asyncio.wait_for(
                    proc.communicate(),
                    timeout=120
                )
                if proc.returncode == 0:
                    return stdout.decode('utf-8').strip()
                else:
                    self.logger.error(f"OpenClaw error: {stderr.decode('utf-8')}")
                    return None
            except asyncio.TimeoutError:
                proc.kill()
                await proc.wait()  # Reap the killed process
                self.logger.error("OpenClaw timeout")
                return None
        except Exception as e:
            self.logger.error(f"Failed to analyze batch: {e}")
            return None
    def _save_analysis(self, batch: List[Dict], analysis: str):
        """Append the analysis to the daily Markdown report"""
        try:
            analysis_file = self.data_dir / f"analysis_{datetime.now().strftime('%Y%m%d')}.md"
            with open(analysis_file, 'a', encoding='utf-8') as f:
                f.write(f"\n{'='*60}\n")
                f.write(f"## 批量分析 #{self.analysis_count} ({len(batch)}条)\n")
                f.write(f"**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
                f.write(analysis)
                f.write(f"\n{'='*60}\n")
        except Exception as e:
            self.logger.error(f"Failed to save analysis: {e}")
    async def _notify(self, message: str):
        """Send a notification asynchronously via `openclaw notify`"""
        try:
            proc = await asyncio.create_subprocess_exec(
                'openclaw', 'notify', message,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            await asyncio.wait_for(proc.wait(), timeout=10)
        except Exception as e:
            self.logger.error(f"Failed to notify: {e}")
    async def _process_batch(self):
        """Flush the batch buffer through the batch handler"""
        if not self.batch_buffer:
            return
        batch = self.batch_buffer.copy()
        self.batch_buffer.clear()
        # Call the batch handler (supports both sync and async callables)
        if asyncio.iscoroutinefunction(self.on_batch):
            analysis = await self.on_batch(batch)
        else:
            analysis = self.on_batch(batch)
        if analysis:
            self.analysis_count += 1
            self._save_analysis(batch, analysis)
            report = f"📊 已处理 {self.processed_count} 条,完成第 {self.analysis_count} 次批量分析({len(batch)}条)"
            self.logger.info(f"\n{'='*60}\n{report}\n{'='*60}")
            await self._notify(report)
    async def _handle_message(self, message: str):
        """Handle one incoming message"""
        try:
            data = json.loads(message)
            title = data.get('title', 'N/A')[:40]
            batch_size = self.config["batch_size"]
            self.logger.info(f"📩 [{len(self.batch_buffer)+1}/{batch_size}] {title}...")
            # Call the per-message handler
            if self.on_message(data):
                self.processed_count += 1
                self.batch_buffer.append(data)
            # Flush once the buffer reaches the batch threshold
            if len(self.batch_buffer) >= batch_size:
                await self._process_batch()
        except json.JSONDecodeError:
            self.logger.warning(f"Non-JSON message: {message[:200]}")
        except Exception as e:
            self.logger.error(f"Error handling message: {e}")
    def _calculate_backoff(self) -> float:
        """Compute the exponential backoff delay in seconds"""
        delay = min(
            self.config["reconnect_delay"] * (2 ** self.reconnect_count),
            self.config["reconnect_max_delay"]
        )
        return delay
    async def _connect_loop(self):
        """Main connection loop (reconnects with exponential backoff)"""
        ws_url = self.config["ws_url"]
        max_attempts = self.config["reconnect_max_attempts"]
        while self.running and not self.shutting_down:
            # Give up once the maximum number of reconnect attempts is reached
            if max_attempts > 0 and self.reconnect_count >= max_attempts:
                self.logger.error(f"❌ Max reconnect attempts ({max_attempts}) reached. Giving up.")
                break
            try:
                self.logger.info(f"🔌 Connecting to {ws_url}...")
                # Connect with a timeout
                self.ws = await asyncio.wait_for(
                    websockets.connect(
                        ws_url,
                        ping_interval=self.config["ping_interval"],
                        ping_timeout=self.config["ping_timeout"],
                        close_timeout=10
                    ),
                    timeout=self.config["connect_timeout"]
                )
                # Connected: reset the reconnect counter
                self.reconnect_count = 0
                self.logger.info("✅ Connected!")
                try:
                    async for message in self.ws:
                        if self.shutting_down:
                            break
                        await self._handle_message(message)
                except ConnectionClosed as e:
                    self.logger.warning(f"⚠️ Connection closed: code={e.code}, reason={e.reason}")
                finally:
                    self.ws = None
            except asyncio.TimeoutError:
                self.logger.error(f"❌ Connection timeout ({self.config['connect_timeout']}s)")
            except ConnectionRefusedError:
                self.logger.error("❌ Connection refused")
            except InvalidStatusCode as e:
                self.logger.error(f"❌ Invalid status code: {e.status_code}")
            except OSError as e:
                self.logger.error(f"❌ Network error: {e}")
            except Exception as e:
                self.logger.error(f"❌ Unexpected error: {type(e).__name__}: {e}")
            # Reconnect logic
            if self.running and not self.shutting_down:
                self.reconnect_count += 1
                delay = self._calculate_backoff()
                self.logger.info(f"🔄 Reconnecting in {delay:.1f}s... (attempt #{self.reconnect_count})")
                # Interruptible wait
                try:
                    await asyncio.sleep(delay)
                except asyncio.CancelledError:
                    break
    async def _shutdown(self):
        """Graceful shutdown"""
        if self.shutting_down:
            return
        self.shutting_down = True
        self.logger.info("\n🛑 Shutting down gracefully...")
        # Drain whatever is left in the batch buffer
        if self.batch_buffer:
            self.logger.info(f"Processing remaining {len(self.batch_buffer)} items...")
            await self._process_batch()
        # Close the WebSocket connection
        if self.ws:
            try:
                await self.ws.close()
            except Exception:
                pass
        # Final report
        final_report = f"📊 本次共处理 {self.processed_count} 条,完成 {self.analysis_count} 次批量分析"
        self.logger.info(final_report)
        await self._notify(final_report)
        self.running = False
    def _setup_signal_handlers(self, loop: asyncio.AbstractEventLoop):
        """Install SIGINT/SIGTERM handlers"""
        def handle_signal(sig):
            self.logger.info(f"Received signal {sig.name}")
            if not self.shutting_down:
                # Keep a reference so the task is not garbage-collected mid-run
                self._shutdown_task = asyncio.create_task(self._shutdown())
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, lambda s=sig: handle_signal(s))
            except NotImplementedError:
                # Windows does not support add_signal_handler; fall back to signal.signal
                signal.signal(sig, lambda s, f, sig=sig: handle_signal(sig))
    async def _run_async(self):
        """Async main entry point"""
        loop = asyncio.get_running_loop()
        self._setup_signal_handlers(loop)
        self.running = True
        self._write_pid()
        atexit.register(self._remove_pid)
        self.logger.info("=" * 60)
        self.logger.info("🚀 WebSocket Receiver v2.0 Started")
        self.logger.info(f"📁 Data: {self.data_dir}")
        self.logger.info(f"🔗 URL: {self.config['ws_url']}")
        self.logger.info(f"📦 Batch: {self.config['batch_size']}")
        self.logger.info(f"🔄 Reconnect: delay={self.config['reconnect_delay']}s, max={self.config['reconnect_max_delay']}s")
        self.logger.info("=" * 60)
        try:
            await self._connect_loop()
        finally:
            # Let a signal-triggered shutdown finish before asyncio.run() cancels it
            pending = getattr(self, "_shutdown_task", None)
            if pending is not None:
                await pending
            if not self.shutting_down:
                await self._shutdown()
            self._remove_pid()
    def run(self):
        """Start the receiver (synchronous entry point)"""
        # The WebSocket URL is mandatory
        if not self.config.get("ws_url"):
            print("❌ 错误: 未配置 WebSocket 地址")
            print("")
            print("请通过以下方式配置:")
            print(" 1. 环境变量: WEBSOCKET_URL=ws://your-server:port/ws")
            print(" 2. 配置文件: ~/.openclaw/websocket-config.json")
            print(" 3. 命令行: --url ws://your-server:port/ws")
            print("")
            print("联系作者获取测试服务器地址,或自行搭建 WebSocket 服务。")
            sys.exit(1)
        try:
            asyncio.run(self._run_async())
        except KeyboardInterrupt:
            pass  # Already handled by the signal handlers
def main():
    """Command-line entry point"""
    import argparse
    parser = argparse.ArgumentParser(description='WebSocket Receiver Skill v2.0')
    parser.add_argument('--config', '-c', help='Config JSON file')
    parser.add_argument('--url', '-u', help='WebSocket URL')
    parser.add_argument('--batch-size', '-b', type=int, help='Batch size')
    parser.add_argument('--data-dir', '-d', help='Data directory')
    parser.add_argument('--no-analyze', action='store_true', help='Disable auto analysis')
    args = parser.parse_args()
    # Load configuration: defaults first, then the optional config file
    config = DEFAULT_CONFIG.copy()
    if args.config:
        config_path = Path(args.config).expanduser()
        if config_path.exists():
            with open(config_path, encoding='utf-8') as f:
                config.update(json.load(f))
    # Command-line arguments override the config file
    if args.url:
        config['ws_url'] = args.url
    if args.batch_size:
        config['batch_size'] = args.batch_size
    if args.data_dir:
        config['data_dir'] = args.data_dir
    if args.no_analyze:
        config['auto_analyze'] = False
    # Start
    receiver = WebSocketReceiver(config)
    receiver.run()
if __name__ == "__main__":
    main()