@clawhub-mguozhen-8f4d31fbc1
Reddit & X/Twitter auto-reply bot for ecommerce/SaaS growth. Finds relevant posts about AI customer service, Amazon FBA, Shopify — posts genuine AI-generated...
---
name: social-reply-bot
description: "Reddit & X/Twitter auto-reply bot for ecommerce/SaaS growth. Finds relevant posts about AI customer service, Amazon FBA, Shopify — posts genuine AI-generated replies mentioning your product. Includes Reddit account warmup (karma building) and lead tracking. Triggers: social reply bot, reddit auto reply, twitter auto reply, x auto reply, social media bot, amazon seller engagement, ecommerce social engagement, reddit warmup, karma building, warmup reddit, social leads, potential customers"
allowed-tools: Bash
metadata:
openclaw:
homepage: https://github.com/mguozhen/social-bot
---
# Social Reply Bot
Automatically finds and replies to relevant Reddit and X/Twitter posts about ecommerce, Amazon FBA, and AI customer service. Also builds Reddit account karma and tracks potential customer leads.
## Commands
```
social reply bot # run both platforms
social reply bot x only # X/Twitter only
social reply bot reddit only # Reddit only
social reply bot warmup # build Reddit karma (8 comments)
social reply bot warmup 15 # warmup with custom target
social reply bot leads # show potential customers found
social reply bot stats # today's stats
social reply bot dashboard # open web dashboard
```
## Setup
```bash
curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
```
## Requirements
- `browse` CLI: `npm install -g @browserbasehq/browse-cli`
- Log in to Reddit and X in the browse-controlled Chrome window
- `ANTHROPIC_API_KEY` in `.env`
## Features
### Daily Reply Bot
- Searches subreddits and X for posts matching your product keywords
- Claude generates genuine, on-topic replies (not spam)
- Browser automation — no Reddit/X API key needed
- SQLite deduplication — never replies to the same post twice
### Reddit Warmup (karma building)
- Visits low-moderation subreddits (r/karma, r/CasualConversation, r/self)
- Claude Haiku generates authentic short comments (no product mentions)
- Natural delays between posts (90–180s)
- Builds Comment Karma to unlock restricted subreddits
### Lead Tracking
- Every replied post analyzed by Claude for customer potential
- Scored 1–10 with urgency level
- Extracts business type and pain points
- View with: `social reply bot leads`
## Configuration
Edit `~/social-bot/config.json` to set your subreddits, X search queries, product descriptions, and daily targets.
FILE:README.md
# 我用 AI 让账号每天自动在 Reddit 和 X 上发 30 条高质量回复,完全不用人工干预
> 作为跨境电商卖家/工具创业者,我们都知道社媒曝光有多重要——但每天手动刷帖子回复,成本太高。这篇文章讲我怎么用 Claude + 浏览器自动化,搭了一套 Social Reply Bot,现在每天自动在 X 和 Reddit 帮 Solvea 和 VOC.ai 做精准曝光。
---
## 🤔 我在解决什么问题
做 AI 工具的,最难的不是产品,是**让目标用户知道你存在**。
我们的两个产品:
- **Solvea** — Amazon/Shopify 卖家的 AI 客服 Agent,自动回答买家问题
- **VOC.ai** — Amazon 差评分析工具,把 1-star 变成优化 listing 的情报
用户在哪?就在 Reddit 的 r/FulfillmentByAmazon、r/ecommerce,在 X 上搜 "AI customer service ecommerce"。
他们每天都在发帖问问题:
- "我的客服自动化怎么配置?"
- "怎么分析哪些差评影响 BSR 最大?"
- "有没有 AI 工具能处理买家消息?"
**这些帖子,每一条都是精准获客机会。** 但手动回复太累,外包给人又很难保证质量。
---
## ⚙️ 系统架构:三层设计
```
搜索引擎(关键词匹配)
↓
浏览器自动化(browse CLI)
↓
Claude AI(生成真实、有价值的回复)
↓
发帖 → 记录 → Dashboard 监控
```
**核心理念:不是垃圾广告,而是真实的行业洞见**
AI 会先判断这个帖子是否和我们的产品相关。如果相关,它会以"有 5 年经验的亚马逊卖家"身份,分享真实经验,自然带出产品。如果不相关,直接 skip,绝不强行植入。
---
## 📸 实际运行效果
### 1. Dashboard — 实时监控每日进度

系统全自动运行,Dashboard 显示:
- 今日 X 发帖:1条(目标 20条)
- 今日 Reddit 评论:1条(目标 10条)
- Solvea 被提及:2次
- 历史总回复:2条(刚启动第一天)
---
### 2. X (Twitter) — 精准找到目标帖子,秒速回复
系统搜索关键词 `"ecommerce customer support automation"`,找到竞品 WukongChat 的推文:
> **@wukongchat** 发帖:Customer support made simple. WukongChat AI automates replies, categorizes inquiries...
**AI 生成的回复(以 @VocAiSage 身份发出):**

> "Centralizing convos is step 1 — but the real win is automating *resolution*, not just visibility. We use Solvea to actually *answer* buyers using order data, not just route tickets. Dashboard unification means nothing if agents still do manual lookups."
**分析这条回复为什么有效:**
- ✅ 不是广告语气,是行业从业者的真实观点
- ✅ 指出了对方产品的局限(路由 ≠ 解决)
- ✅ 自然带出 Solvea,以"我们用的工具"的方式
- ✅ 260 字以内,符合 X 的最佳互动长度
---
### 3. Reddit — 深度评论,带来真实讨论
r/ecommerce 上有人发帖讨论 WooCommerce chatbot 的库存同步坑:
**AI 生成并自动发出的评论(账号:mguozhen):**

> "That last point hits hard — customers absolutely hold the *store* accountable for what the bot says. No separation in their mind.
>
> We burned inventory sync trust early on too. Daily sync sounds safe until you're running flash sales or dealing with supplier delays. The only real fix was event-triggered sync, not scheduled. Bad review from a bot mistake is genuinely worse than a human CS mistake because it feels systemic to the customer."
**为什么这条评论质量高:**
- ✅ 先共鸣,再输出洞见("That last point hits hard")
- ✅ 分享了具体的技术方案(event-triggered vs. scheduled sync)
- ✅ 提供了额外价值(bot 失误比人工失误评论影响更大)
- ✅ 获得了其他用户的正向互动
---
## 🔧 技术实现细节
### 关键词匹配 → AI 过滤的双重机制
```python
# 第一层:关键词快速过滤(不消耗 API)
def detect_product(text: str) -> Optional[str]:
keywords = {
"Solvea": ["customer service", "ai agent", "chatbot", "support automation"],
"VOC.ai": ["amazon review", "vine review", "review analysis", "1-star"]
}
# 匹配得分最高的产品,无匹配返回 None
# 第二层:Claude 判断是否值得回复
# 提示词让 Claude 以"有 5 年经验的 Amazon 卖家"身份
# 不相关时返回 SKIP,不强行植入
```
### 浏览器自动化:不需要任何平台 API
用 `browse` CLI 控制本地 Chrome:
- Reddit:账号用 Google OAuth 登录,直接操作 old.reddit.com
- X:账号 @VocAiSage 已登录,search → 找相关推文 → 发回复
**完全不需要 Reddit API Key 或 Twitter API Key**,用的是浏览器会话,成本为零。
### 防重复 + 速率控制
```python
# 每条帖子只回复一次(无论成功失败)
def already_replied(post_url: str) -> bool:
# 查 SQLite,posted 和 failed 都不重试
# Reddit 每条间隔 10 分钟
# X 每条间隔 5 分钟
# 每天 X 最多 20 条,Reddit 最多 10 条
```
---
## 📊 已知局限和解决方案
| 问题 | 原因 | 解决方案 |
|------|------|----------|
| Reddit 新账号评论被自动删除 | 账号 karma < 10 或注册 < 10 天 | 先手动养号 2 周,刷满基础 karma |
| X 某些帖子回复失败 | 页面结构变化/rate limit | 已加重试逻辑,失败后跳过不死循环 |
| FBA 子版块匹配率低 | 今天的帖子话题不对口 | 多配几个 subreddit,r/ecommerce 效果更好 |
---
## 🚀 如何自己搭一套
### 方案一:一键安装到新 Mac
```bash
curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
```
安装脚本会自动完成:
1. Clone 代码
2. 安装 Python 依赖(anthropic, flask)
3. 检查 browse CLI
4. 引导填写 `.env`(只需要 ANTHROPIC_API_KEY)
5. 初始化 SQLite 数据库
6. 注册 macOS LaunchAgent(每天 10:05 自动运行)
### 方案二:通过 openclaw 一行调用
如果你已经安装了 openclaw:
```bash
clawhub install social-reply-bot
```
然后直接说:
- `"social reply bot"` — 运行两个平台
- `"social reply bot x only"` — 只跑 X
- `"social reply bot stats"` — 查看今日数据
- `"social reply bot dashboard"` — 打开可视化面板
### 配置文件说明
编辑 `~/social-bot/config.json`,自定义你的场景:
```json
{
"x": {
"daily_target": 20,
"search_queries": [
"你的关键词1",
"你的关键词2"
]
},
"reddit": {
"daily_target": 10,
"subreddits": ["你的目标社区"]
},
"products": {
"你的产品名": {
"description": "产品描述,AI 会用这个决定怎么提及",
"trigger_keywords": ["相关关键词"]
}
}
}
```
---
## 💡 适合哪些场景
✅ **跨境电商工具/SaaS** — 目标用户在 Reddit/X 上非常活跃,且乐于分享经验
✅ **B2B 产品冷启动** — 社区回复是最自然的 PLG 方式,比广告信任度高 10 倍
✅ **个人品牌建设** — 以专家身份持续输出,积累行业影响力
✅ **竞品监控+截流** — 搜竞品关键词,在竞品帖子下自然推荐自己
❌ **不适合**:纯 C 端消费品(用户不在专业论坛)、需要图片/视频的场景
---
## 📈 预期效果
按目前配置(X 20条/天,Reddit 10条/天):
- 每月覆盖 **600+ 条精准帖子**
- 触达 **600+ 位正在讨论相关问题的用户**
- 其中约 10-15% 会点击主页/链接了解更多
这不是一夜暴富的流量,而是**持续的、精准的品牌曝光**,在目标用户最需要你的时刻出现。
---
## 🔗 资源链接
- GitHub 仓库:https://github.com/mguozhen/social-bot
- clawhub skill:`clawhub install social-reply-bot`
- 所需费用:约 $0.01/条回复(Claude API),其余零成本
---
*用 Claude Code + browse CLI 构建,完整代码开源。有问题欢迎在评论区讨论。*
FILE:article_social_bot.md
# 我用 AI 让账号每天自动在 Reddit 和 X 上发 30 条高质量回复,完全不用人工干预
> 作为跨境电商卖家/工具创业者,我们都知道社媒曝光有多重要——但每天手动刷帖子回复,成本太高。这篇文章讲我怎么用 Claude + 浏览器自动化,搭了一套 Social Reply Bot,现在每天自动在 X 和 Reddit 帮 Solvea 和 VOC.ai 做精准曝光。
---
## 🤔 我在解决什么问题
做 AI 工具的,最难的不是产品,是**让目标用户知道你存在**。
我们的两个产品:
- **Solvea** — Amazon/Shopify 卖家的 AI 客服 Agent,自动回答买家问题
- **VOC.ai** — Amazon 差评分析工具,把 1-star 变成优化 listing 的情报
用户在哪?就在 Reddit 的 r/FulfillmentByAmazon、r/ecommerce,在 X 上搜 "AI customer service ecommerce"。
他们每天都在发帖问问题:
- "我的客服自动化怎么配置?"
- "怎么分析哪些差评影响 BSR 最大?"
- "有没有 AI 工具能处理买家消息?"
**这些帖子,每一条都是精准获客机会。** 但手动回复太累,外包给人又很难保证质量。
---
## ⚙️ 系统架构:三层设计
```
搜索引擎(关键词匹配)
↓
浏览器自动化(browse CLI)
↓
Claude AI(生成真实、有价值的回复)
↓
发帖 → 记录 → Dashboard 监控
```
**核心理念:不是垃圾广告,而是真实的行业洞见**
AI 会先判断这个帖子是否和我们的产品相关。如果相关,它会以"有 5 年经验的亚马逊卖家"身份,分享真实经验,自然带出产品。如果不相关,直接 skip,绝不强行植入。
---
## 📸 实际运行效果
### 1. Dashboard — 实时监控每日进度

系统全自动运行,Dashboard 显示:
- 今日 X 发帖:1条(目标 20条)
- 今日 Reddit 评论:1条(目标 10条)
- Solvea 被提及:2次
- 历史总回复:2条(刚启动第一天)
---
### 2. X (Twitter) — 精准找到目标帖子,秒速回复
系统搜索关键词 `"ecommerce customer support automation"`,找到竞品 WukongChat 的推文:
> **@wukongchat** 发帖:Customer support made simple. WukongChat AI automates replies, categorizes inquiries...
**AI 生成的回复(以 @VocAiSage 身份发出):**

> "Centralizing convos is step 1 — but the real win is automating *resolution*, not just visibility. We use Solvea to actually *answer* buyers using order data, not just route tickets. Dashboard unification means nothing if agents still do manual lookups."
**分析这条回复为什么有效:**
- ✅ 不是广告语气,是行业从业者的真实观点
- ✅ 指出了对方产品的局限(路由 ≠ 解决)
- ✅ 自然带出 Solvea,以"我们用的工具"的方式
- ✅ 260 字以内,符合 X 的最佳互动长度
---
### 3. Reddit — 深度评论,带来真实讨论
r/ecommerce 上有人发帖讨论 WooCommerce chatbot 的库存同步坑:
**AI 生成并自动发出的评论(账号:mguozhen):**

> "That last point hits hard — customers absolutely hold the *store* accountable for what the bot says. No separation in their mind.
>
> We burned inventory sync trust early on too. Daily sync sounds safe until you're running flash sales or dealing with supplier delays. The only real fix was event-triggered sync, not scheduled. Bad review from a bot mistake is genuinely worse than a human CS mistake because it feels systemic to the customer."
**为什么这条评论质量高:**
- ✅ 先共鸣,再输出洞见("That last point hits hard")
- ✅ 分享了具体的技术方案(event-triggered vs. scheduled sync)
- ✅ 提供了额外价值(bot 失误比人工失误评论影响更大)
- ✅ 获得了其他用户的正向互动
---
## 🔧 技术实现细节
### 关键词匹配 → AI 过滤的双重机制
```python
# 第一层:关键词快速过滤(不消耗 API)
def detect_product(text: str) -> Optional[str]:
keywords = {
"Solvea": ["customer service", "ai agent", "chatbot", "support automation"],
"VOC.ai": ["amazon review", "vine review", "review analysis", "1-star"]
}
# 匹配得分最高的产品,无匹配返回 None
# 第二层:Claude 判断是否值得回复
# 提示词让 Claude 以"有 5 年经验的 Amazon 卖家"身份
# 不相关时返回 SKIP,不强行植入
```
### 浏览器自动化:不需要任何平台 API
用 `browse` CLI 控制本地 Chrome:
- Reddit:账号用 Google OAuth 登录,直接操作 old.reddit.com
- X:账号 @VocAiSage 已登录,search → 找相关推文 → 发回复
**完全不需要 Reddit API Key 或 Twitter API Key**,用的是浏览器会话,成本为零。
### 防重复 + 速率控制
```python
# 每条帖子只回复一次(无论成功失败)
def already_replied(post_url: str) -> bool:
# 查 SQLite,posted 和 failed 都不重试
# Reddit 每条间隔 10 分钟
# X 每条间隔 5 分钟
# 每天 X 最多 20 条,Reddit 最多 10 条
```
---
## 📊 已知局限和解决方案
| 问题 | 原因 | 解决方案 |
|------|------|----------|
| Reddit 新账号评论被自动删除 | 账号 karma < 10 或注册 < 10 天 | 先手动养号 2 周,刷满基础 karma |
| X 某些帖子回复失败 | 页面结构变化/rate limit | 已加重试逻辑,失败后跳过不死循环 |
| FBA 子版块匹配率低 | 今天的帖子话题不对口 | 多配几个 subreddit,r/ecommerce 效果更好 |
---
## 🚀 如何自己搭一套
### 方案一:一键安装到新 Mac
```bash
curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
```
安装脚本会自动完成:
1. Clone 代码
2. 安装 Python 依赖(anthropic, flask)
3. 检查 browse CLI
4. 引导填写 `.env`(只需要 ANTHROPIC_API_KEY)
5. 初始化 SQLite 数据库
6. 注册 macOS LaunchAgent(每天 10:05 自动运行)
### 方案二:通过 openclaw 一行调用
如果你已经安装了 openclaw:
```bash
clawhub install social-reply-bot
```
然后直接说:
- `"social reply bot"` — 运行两个平台
- `"social reply bot x only"` — 只跑 X
- `"social reply bot stats"` — 查看今日数据
- `"social reply bot dashboard"` — 打开可视化面板
### 配置文件说明
编辑 `~/social-bot/config.json`,自定义你的场景:
```json
{
"x": {
"daily_target": 20,
"search_queries": [
"你的关键词1",
"你的关键词2"
]
},
"reddit": {
"daily_target": 10,
"subreddits": ["你的目标社区"]
},
"products": {
"你的产品名": {
"description": "产品描述,AI 会用这个决定怎么提及",
"trigger_keywords": ["相关关键词"]
}
}
}
```
---
## 💡 适合哪些场景
✅ **跨境电商工具/SaaS** — 目标用户在 Reddit/X 上非常活跃,且乐于分享经验
✅ **B2B 产品冷启动** — 社区回复是最自然的 PLG 方式,比广告信任度高 10 倍
✅ **个人品牌建设** — 以专家身份持续输出,积累行业影响力
✅ **竞品监控+截流** — 搜竞品关键词,在竞品帖子下自然推荐自己
❌ **不适合**:纯 C 端消费品(用户不在专业论坛)、需要图片/视频的场景
---
## 📈 预期效果
按目前配置(X 20条/天,Reddit 10条/天):
- 每月覆盖 **600+ 条精准帖子**
- 触达 **600+ 位正在讨论相关问题的用户**
- 其中约 10-15% 会点击主页/链接了解更多
这不是一夜暴富的流量,而是**持续的、精准的品牌曝光**,在目标用户最需要你的时刻出现。
---
## 🔗 资源链接
- GitHub 仓库:https://github.com/mguozhen/social-bot
- clawhub skill:`clawhub install social-reply-bot`
- 所需费用:约 $0.01/条回复(Claude API),其余零成本
---
*用 Claude Code + browse CLI 构建,完整代码开源。有问题欢迎在评论区讨论。*
FILE:bot/__init__.py
FILE:bot/ai_engine.py
import os
import anthropic
import json
from pathlib import Path
from typing import Optional, Tuple
CONFIG = json.loads((Path(__file__).parent.parent / "config.json").read_text())
def detect_product(text: str) -> Optional[str]:
"""Return 'Solvea', 'VOC.ai', or None based on keyword match."""
text_lower = text.lower()
scores = {}
for product, data in CONFIG["products"].items():
score = sum(1 for kw in data["trigger_keywords"] if kw in text_lower)
if score > 0:
scores[product] = score
if not scores:
return None
return max(scores, key=scores.get)
def generate_reply(post_title: str, post_content: str, platform: str) -> Tuple[Optional[str], Optional[str]]:
"""
Returns (reply_text, product_mentioned) or (None, None) if should skip.
"""
product = detect_product(f"{post_title} {post_content}")
if not product:
return None, None
product_desc = CONFIG["products"][product]["description"]
style_rules = "\n".join(f"- {r}" for r in CONFIG["reply_style"]["rules"])
max_len = (CONFIG["reply_style"]["max_length_x"] if platform == "x"
else CONFIG["reply_style"]["max_length_reddit"])
system_prompt = f"""You are a hands-on Amazon/Shopify seller and builder who has been selling for 5+ years.
You reply to social media posts with genuine insights from your own experience.
You sometimes mention {product} ({product_desc}) as a tool you personally use — but only when it's directly relevant.
Reply style rules:
{style_rules}
Max length: {max_len} characters for {platform}. Be concise."""
user_prompt = f"""Post title: {post_title}
Post content:
{post_content[:800]}
Write a reply that adds real value. Mention {product} only if it fits naturally.
If it doesn't fit, reply with just: SKIP
Output only the reply text, nothing else."""
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
message = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=400,
messages=[{"role": "user", "content": user_prompt}],
system=system_prompt,
)
reply = message.content[0].text.strip()
if reply.upper().startswith("SKIP") or len(reply) < 20:
return None, None
# Trim to platform max
if len(reply) > max_len + 50:
reply = reply[:max_len].rsplit(" ", 1)[0] + "..."
return reply, product
def analyze_lead(post_title: str, post_content: str, post_url: str, platform: str) -> Optional[dict]:
"""
判断发帖人是否是 Solvea 的潜在客户,并提取关键信息。
返回 dict 或 None(不是潜在客户)。
"""
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
user_prompt = f"""Analyze this social media post and determine if the author is a potential customer for Solvea.
Solvea is an AI customer support agent for Shopify/ecommerce stores that:
- Autonomously handles support tickets (tracking, returns, product questions)
- Integrates directly with Shopify to take actions (process returns, update shipping)
- Provides a unified inbox for human handoff
Post URL: {post_url}
Platform: {platform}
Title: {post_title}
Content: {post_content[:600]}
Respond in JSON only:
{{
"is_lead": true/false,
"lead_score": 1-10,
"pain_points": ["list of pain points mentioned"],
"business_type": "shopify store / amazon seller / saas / other / unknown",
"urgency": "high / medium / low",
"reason": "one sentence why they are or aren't a lead"
}}
Only return JSON, nothing else."""
try:
msg = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=300,
messages=[{"role": "user", "content": user_prompt}],
)
text = msg.content[0].text.strip()
# Extract JSON
import re
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if not json_match:
return None
data = json.loads(json_match.group())
if not data.get("is_lead"):
return None
data["post_url"] = post_url
data["platform"] = platform
data["post_title"] = post_title
return data
except Exception:
return None
FILE:bot/browser.py
"""
Thin wrapper around the `browse` CLI.
All commands return parsed JSON or raise BrowseError.
"""
import subprocess
import json
import time
import re
class BrowseError(Exception):
pass
def _run(args: str, timeout: int = 30) -> dict:
result = subprocess.run(
f"browse {args}",
shell=True,
capture_output=True,
text=True,
timeout=timeout,
)
out = result.stdout.strip()
if not out:
return {}
try:
return json.loads(out)
except json.JSONDecodeError:
return {"raw": out}
def open_url(url: str) -> dict:
return _run(f'open "{url}"', timeout=20)
def snapshot() -> str:
"""Returns the accessibility tree as a raw string."""
data = _run("snapshot", timeout=15)
return data.get("tree", "")
def screenshot(path: str) -> str:
_run(f'screenshot "{path}"', timeout=15)
return path
def click(ref: str) -> bool:
data = _run(f"click @{ref}", timeout=10)
return data.get("clicked", False)
def click_xy(x: int, y: int) -> bool:
data = _run(f"click {x} {y}", timeout=10)
return data.get("clicked", False)
def type_text(text: str) -> bool:
# Escape single quotes in text
safe = text.replace("'", "\\'")
data = _run(f"type '{safe}'", timeout=15)
return data.get("typed", False)
def press(key: str) -> bool:
data = _run(f"press '{key}'", timeout=10)
return bool(data)
def scroll(x: int, y: int, dx: int, dy: int) -> bool:
data = _run(f"scroll {x} {y} {dx} {dy}", timeout=10)
return data.get("scrolled", False)
def get_url() -> str:
data = _run("get url", timeout=10)
return data.get("url", "")
def find_refs(tree: str, pattern: str) -> list[str]:
"""Find element refs matching pattern in the accessibility tree."""
return re.findall(rf'\[(\d+-\d+)\] {pattern}', tree)
def find_text_refs(tree: str, text: str) -> list[str]:
"""Find refs of elements containing given text."""
escaped = re.escape(text)
return re.findall(rf'\[(\d+-\d+)\][^\n]*{escaped}', tree)
def wait_seconds(n: float):
time.sleep(n)
FILE:bot/db.py
import sqlite3
import os
from datetime import datetime, date
from pathlib import Path
DB_PATH = Path(__file__).parent.parent / "logs" / "social_bot.db"
def get_conn():
DB_PATH.parent.mkdir(exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def init_db():
with get_conn() as conn:
conn.executescript("""
CREATE TABLE IF NOT EXISTS replies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL, -- 'x' or 'reddit'
post_url TEXT NOT NULL,
post_title TEXT,
post_snippet TEXT,
reply_text TEXT NOT NULL,
product TEXT, -- 'Solvea' | 'VOC.ai' | None
status TEXT DEFAULT 'posted', -- 'posted' | 'failed' | 'skipped'
error_msg TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS daily_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_date TEXT NOT NULL,
platform TEXT NOT NULL,
target INTEGER,
posted INTEGER DEFAULT 0,
failed INTEGER DEFAULT 0,
skipped INTEGER DEFAULT 0,
duration_secs INTEGER,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_replies_platform_date
ON replies(platform, created_at);
CREATE INDEX IF NOT EXISTS idx_replies_post_url
ON replies(post_url);
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL,
post_url TEXT NOT NULL UNIQUE,
post_title TEXT,
business_type TEXT,
pain_points TEXT, -- JSON array
lead_score INTEGER,
urgency TEXT,
reason TEXT,
replied INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
);
""")
def log_reply(platform, post_url, post_title, post_snippet, reply_text,
product=None, status="posted", error_msg=None):
with get_conn() as conn:
conn.execute("""
INSERT INTO replies
(platform, post_url, post_title, post_snippet, reply_text, product, status, error_msg)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (platform, post_url, post_title, post_snippet, reply_text,
product, status, error_msg))
def already_replied(post_url: str) -> bool:
with get_conn() as conn:
row = conn.execute(
"SELECT id FROM replies WHERE post_url = ? AND status IN ('posted', 'failed')",
(post_url,)
).fetchone()
return row is not None
def get_today_count(platform: str) -> int:
today = date.today().isoformat()
with get_conn() as conn:
row = conn.execute("""
SELECT COUNT(*) as cnt FROM replies
WHERE platform = ? AND status = 'posted'
AND date(created_at) = ?
""", (platform, today)).fetchone()
return row["cnt"] if row else 0
def get_stats(days=30):
with get_conn() as conn:
rows = conn.execute("""
SELECT
date(created_at) as day,
platform,
COUNT(*) FILTER (WHERE status='posted') as posted,
COUNT(*) FILTER (WHERE status='failed') as failed,
COUNT(*) FILTER (WHERE status='skipped') as skipped,
COUNT(*) as total
FROM replies
WHERE date(created_at) >= date('now', ? || ' days')
GROUP BY day, platform
ORDER BY day DESC
""", (f"-{days}",)).fetchall()
return [dict(r) for r in rows]
def save_lead(lead: dict):
import json as _json
with get_conn() as conn:
conn.execute("""
INSERT OR IGNORE INTO leads
(platform, post_url, post_title, business_type, pain_points, lead_score, urgency, reason, replied)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
lead.get("platform"),
lead.get("post_url"),
lead.get("post_title"),
lead.get("business_type"),
_json.dumps(lead.get("pain_points", [])),
lead.get("lead_score"),
lead.get("urgency"),
lead.get("reason"),
1, # we replied
))
def get_leads(days=7):
with get_conn() as conn:
rows = conn.execute("""
SELECT * FROM leads
WHERE date(created_at) >= date('now', ? || ' days')
ORDER BY lead_score DESC, created_at DESC
""", (f"-{days}",)).fetchall()
return [dict(r) for r in rows]
def get_recent_replies(limit=50):
with get_conn() as conn:
rows = conn.execute("""
SELECT id, platform, post_url, post_title, reply_text,
product, status, created_at
FROM replies
ORDER BY created_at DESC
LIMIT ?
""", (limit,)).fetchall()
return [dict(r) for r in rows]
FILE:bot/reddit_bot.py
"""
Reddit reply automation via browse CLI (old.reddit.com).
No API key needed — uses the browser session logged in via Google OAuth.
"""
import re
import time
import logging
from typing import List, Tuple
from . import browser as B
from .ai_engine import generate_reply, analyze_lead
from .db import log_reply, already_replied, get_today_count, save_lead
logger = logging.getLogger(__name__)
LOGIN_URL = "https://old.reddit.com/login"
BASE_URL = "https://old.reddit.com"
# ── Login ─────────────────────────────────────────────────────────────────────
def _is_logged_in() -> bool:
tree = B.snapshot()
return "mguozhen" in tree or "logout" in tree.lower()
def _login_google():
"""Trigger Google OAuth login on old.reddit.com."""
B.open_url(LOGIN_URL)
B.wait_seconds(3)
if _is_logged_in():
return True
# Click "Continue with Google" button
tree = B.snapshot()
google_refs = B.find_text_refs(tree, "Google")
if not google_refs:
# Try navigating directly to Google OAuth via new Reddit then come back
B.open_url("https://www.reddit.com/login")
B.wait_seconds(3)
tree = B.snapshot()
google_refs = B.find_text_refs(tree, "Continue as Hunter")
if not google_refs:
google_refs = B.find_text_refs(tree, "mguozhen")
if google_refs:
B.click(google_refs[0])
B.wait_seconds(5)
# May hit Google account chooser
tree = B.snapshot()
hunter_refs = B.find_text_refs(tree, "Hunter G")
if not hunter_refs:
hunter_refs = B.find_text_refs(tree, "mguozhen")
if hunter_refs:
B.click(hunter_refs[0])
B.wait_seconds(5)
# Verify login
B.open_url(BASE_URL)
B.wait_seconds(3)
return _is_logged_in()
def _ensure_logged_in() -> bool:
B.open_url(BASE_URL)
B.wait_seconds(3)
if _is_logged_in():
logger.info("Reddit: already logged in")
return True
logger.info("Reddit: not logged in, attempting Google OAuth...")
result = _login_google()
if result:
logger.info("Reddit: login successful")
else:
logger.error("Reddit: login failed")
return result
# ── Post scraping ──────────────────────────────────────────────────────────────
def _get_subreddit_posts(subreddit: str) -> List[dict]:
"""
Browse /r/subreddit/new/ and collect (title, comment_count) for each post.
Returns list of dicts with {title, comment_count}. No navigation needed.
"""
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(3)
tree = B.snapshot()
posts = []
# In old Reddit each post row has: title link followed by "submitted X ago" then "N comments"
# Pair title links with comment links by scanning "submitted" anchors
submitted_positions = [m.start() for m in re.finditer(r'\bsubmitted\b', tree)]
comment_positions = [(m.start(), m.group(1)) for m in
re.finditer(r'\[(\d+-\d+)\] link: (\d+ comments?)', tree)]
seen_titles = set()
for pos in submitted_positions[:30]:
# Title link: last link before 'submitted'
chunk_before = tree[max(0, pos - 1500):pos]
links_before = re.findall(r'\[(\d+-\d+)\] link: ([^\n]{15,200})', chunk_before)
if not links_before:
continue
_ref, title = links_before[-1]
title = title.strip()
skip_words = ["Submit a new", "Welcome to", "How To Get", "Contacting Amazon",
"About /r/", "wiki", "Discord", "/r/", "http", "View Poll"]
if any(s.lower() in title.lower() for s in skip_words):
continue
if title in seen_titles:
continue
# Comment count: first comment link after 'submitted'
comment_count = 0
for cpos, cref in comment_positions:
if cpos > pos:
m = re.search(r'(\d+)', tree[cpos:cpos+100])
comment_count = int(m.group(1)) if m else 0
break
seen_titles.add(title)
posts.append({"title": title, "comment_count": comment_count})
return posts[:20]
def _navigate_and_get_content(post_index: int, subreddit: str) -> Tuple[str, str]:
"""
From the subreddit listing, click the Nth comment link to open post.
Returns (url, snippet).
"""
tree = B.snapshot()
comment_links = re.findall(r'\[(\d+-\d+)\] link: \d+ comments?', tree)
if post_index >= len(comment_links):
return "", ""
B.click(comment_links[post_index])
B.wait_seconds(3)
url = B.get_url()
tree2 = B.snapshot()
text_blocks = re.findall(r'StaticText: ([^\n]{20,})', tree2)
snippet = " ".join(text_blocks[:10])[:700]
return url, snippet
# ── Commenting ─────────────────────────────────────────────────────────────────
def _post_comment(reply_text: str) -> bool:
"""
Assumes we're already on an old.reddit.com post page.
Finds the comment textarea, types the reply, clicks save.
"""
tree = B.snapshot()
# Find comment textarea ref
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox(?!\: search)', tree)
if not textarea_refs:
# Fallback: look for any textarea near "save" button
save_refs = re.findall(r'\[(\d+-\d+)\] button: save', tree, re.I)
if not save_refs:
logger.warning("Reddit: no comment form found")
return False
# Find textarea before save button in tree
save_idx = tree.find(save_refs[0])
chunk = tree[max(0, save_idx - 2000):save_idx]
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox', chunk)
if not textarea_refs:
return False
B.click(textarea_refs[-1])
B.wait_seconds(1)
# Type reply paragraph by paragraph
paragraphs = reply_text.split("\n\n")
for i, para in enumerate(paragraphs):
# Remove dollar signs to avoid shell variable expansion
safe = para.replace("$", "").strip()
if safe:
B.type_text(safe)
if i < len(paragraphs) - 1:
B.press("Enter")
B.press("Enter")
B.wait_seconds(1)
# Find and click save button
tree = B.snapshot()
save_refs = re.findall(r'\[(\d+-\d+)\] button: save', tree, re.I)
if not save_refs:
return False
B.click(save_refs[0])
B.wait_seconds(4)
# Verify: comment should appear with "mguozhen" and "just now"
confirm_tree = B.snapshot()
return "mguozhen" in confirm_tree and (
"just now" in confirm_tree or "1 minute ago" in confirm_tree
)
# ── Main ──────────────────────────────────────────────────────────────────────
def run(config: dict) -> dict:
"""
Main entry point. Returns summary dict.
config: from config.json["reddit"]
"""
target = config["daily_target"]
subreddits = config["subreddits"]
delay = config["min_delay_seconds"]
summary = {"posted": 0, "failed": 0, "skipped": 0, "target": target}
if not _ensure_logged_in():
logger.error("Reddit: cannot proceed without login")
return summary
today_count = get_today_count("reddit")
if today_count >= target:
logger.info(f"Reddit: already hit target ({today_count}/{target})")
summary["posted"] = today_count
return summary
for subreddit in subreddits:
if get_today_count("reddit") >= target:
break
logger.info(f"Reddit: scanning r/{subreddit}")
posts = _get_subreddit_posts(subreddit)
# We're now on the subreddit listing page
visited = 0
for post in posts:
if get_today_count("reddit") >= target:
break
# From listing, click Nth comment link (fresh snapshot each time)
try:
tree = B.snapshot()
comment_links = re.findall(r'\[(\d+-\d+)\] link: \d+ comments?', tree)
if visited >= len(comment_links):
break
B.click(comment_links[visited])
B.wait_seconds(3)
post_url = B.get_url()
except Exception as e:
logger.warning(f"Reddit: nav failed — {e}")
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(3)
visited += 1
summary["skipped"] += 1
continue
visited += 1
if already_replied(post_url):
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(2)
summary["skipped"] += 1
continue
# Get post content — skip sidebar, anchor on post title
tree = B.snapshot()
title_idx = tree.find(post["title"][:40])
if title_idx > 0:
# Extract StaticText after the title (post body + comments)
chunk = tree[title_idx:title_idx + 3000]
else:
chunk = tree[len(tree) // 2:] # fallback: second half of tree
text_blocks = re.findall(r'StaticText: ([^\n]{15,})', chunk)
# Skip meta lines (submitted, by, share, save, etc.)
meta = {"submitted", "by", "share", "save", "hide", "report",
"crosspost", "sorted by:", "best", "formatting help"}
clean = [t for t in text_blocks if t.strip().lower() not in meta]
snippet = " ".join(clean[:12])[:700]
reply_text, product = generate_reply(
post_title=post["title"],
post_content=snippet,
platform="reddit"
)
if not reply_text:
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(2)
summary["skipped"] += 1
continue
success = _post_comment(reply_text)
if success:
log_reply("reddit", post_url, post["title"],
snippet[:200], reply_text, product, "posted")
summary["posted"] += 1
logger.info(f"Reddit: posted #{summary['posted']} — {post['title'][:60]}")
# Lead analysis
lead = analyze_lead(post["title"], snippet, post_url, "reddit")
if lead:
save_lead(lead)
logger.info(f"Reddit: 🎯 lead saved score={lead.get('lead_score')} urgency={lead.get('urgency')}")
else:
log_reply("reddit", post_url, post["title"],
snippet[:200], reply_text, product, "failed",
"comment not confirmed")
summary["failed"] += 1
logger.warning(f"Reddit: comment failed — {post['title'][:60]}")
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(delay if success else 10)
return summary
FILE:bot/x_bot.py
"""
X/Twitter reply automation via browse CLI.
Searches for relevant posts and posts replies as @VocAiSage.
"""
import re
import time
import logging
from typing import List
from . import browser as B
from .ai_engine import generate_reply, analyze_lead
from .db import log_reply, already_replied, get_today_count, save_lead
logger = logging.getLogger(__name__)
LOGIN_URL = "https://x.com/login"
SEARCH_URL = "https://x.com/search?q={query}&src=typed_query&f=live"
def _is_logged_in() -> bool:
tree = B.snapshot()
return "VocAiSage" in tree or "Hunter Guo" in tree
def _login_if_needed():
"""Open X and check login. If not logged in, open login page for manual auth."""
B.open_url("https://x.com")
B.wait_seconds(3)
if _is_logged_in():
logger.info("X: already logged in")
return True
logger.warning("X: not logged in — opening login page")
B.open_url(LOGIN_URL)
# Wait up to 60s for user to log in (for first-run setup)
for _ in range(12):
B.wait_seconds(5)
if _is_logged_in():
logger.info("X: login confirmed")
return True
logger.error("X: login timeout")
return False
def _search_posts(query: str) -> List[dict]:
"""Search X and return list of {snippet, time_ref} dicts."""
url = SEARCH_URL.format(query=query.replace(" ", "+"))
B.open_url(url)
B.wait_seconds(3)
posts = []
tree = B.snapshot()
# Find article start positions in the full tree
article_positions = [(m.start(), m.group(1)) for m in
re.finditer(r'\[(\d+-\d+)\] article:', tree)]
for i, (pos, article_ref) in enumerate(article_positions[:15]):
# Article block ends where next top-level sibling starts
next_pos = article_positions[i + 1][0] if i + 1 < len(article_positions) else len(tree)
block = tree[pos:next_pos]
# Time link: [ref] link: Mar 16 OR link: 5h OR link: just now
time_match = re.search(
r'\[(\d+-\d+)\] link: (?:(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d+|\d+[hm]|just now)',
block
)
if not time_match:
continue
time_ref = time_match.group(1)
# Snippet from StaticText nodes (skip very short ones like emojis/counts)
texts = re.findall(r'StaticText: ([^\n]{10,})', block)
snippet = " ".join(texts[:6])[:350]
posts.append({
"article_ref": article_ref,
"time_ref": time_ref,
"snippet": snippet,
})
return posts
def _reply_current_page(reply_text: str) -> bool:
"""Type and submit a reply on the currently open tweet page."""
tree = B.snapshot()
# Find reply textbox
boxes = re.findall(r'\[(\d+-\d+)\] textbox: Post text', tree)
if not boxes:
return False
B.click(boxes[0])
B.wait_seconds(1)
# Type reply paragraph by paragraph
paragraphs = reply_text.split("\n\n")
for i, para in enumerate(paragraphs):
safe_para = para.replace("$", "")
B.type_text(safe_para)
if i < len(paragraphs) - 1:
B.press("Enter")
B.press("Enter")
B.wait_seconds(1)
# Find and click reply button
tree = B.snapshot()
reply_btns = re.findall(r'\[(\d+-\d+)\] button: Reply', tree)
if len(reply_btns) >= 2:
B.click(reply_btns[-1])
B.wait_seconds(3)
confirm_tree = B.snapshot()
return "Your post was sent" in confirm_tree or "post was sent" in confirm_tree.lower()
return False
def run(config: dict) -> dict:
"""
Main entry point. Returns summary dict.
config: from config.json["x"]
"""
target = config["daily_target"]
queries = config["search_queries"]
delay = config["min_delay_seconds"]
summary = {"posted": 0, "failed": 0, "skipped": 0, "target": target}
if not _login_if_needed():
logger.error("X: cannot proceed without login")
return summary
today_count = get_today_count("x")
if today_count >= target:
logger.info(f"X: already hit target ({today_count}/{target})")
summary["posted"] = today_count
return summary
for query in queries:
if get_today_count("x") >= target:
break
logger.info(f"X: searching '{query}'")
posts = _search_posts(query)
for post in posts:
if get_today_count("x") >= target:
break
snippet = post.get("snippet", "")
if not snippet or len(snippet) < 30:
continue
# Build a fake URL key for dedup (we don't have URL yet)
# We'll update after opening
reply_text, product = generate_reply(
post_title=query,
post_content=snippet,
platform="x"
)
if not reply_text:
summary["skipped"] += 1
continue
# Open tweet, get real URL, dedup check, then reply (single open)
if not post.get("time_ref"):
summary["skipped"] += 1
continue
B.click(post["time_ref"])
B.wait_seconds(3)
real_url = B.get_url()
if already_replied(real_url):
B.press("Alt+Left")
B.wait_seconds(2)
summary["skipped"] += 1
continue
# Already on tweet page — reply directly
success = _reply_current_page(reply_text)
if success:
log_reply("x", real_url, query, snippet, reply_text, product, "posted")
summary["posted"] += 1
logger.info(f"X: posted reply #{summary['posted']} — {real_url}")
# Lead analysis
lead = analyze_lead(query, snippet, real_url, "x")
if lead:
save_lead(lead)
logger.info(f"X: 🎯 lead saved score={lead.get('lead_score')} urgency={lead.get('urgency')}")
time.sleep(delay)
else:
log_reply("x", real_url, query, snippet, reply_text, product, "failed")
summary["failed"] += 1
logger.warning(f"X: failed to post — {real_url}")
time.sleep(10)
return summary
FILE:config.json
{
"x": {
"username": "@VocAiSage",
"daily_target": 20,
"min_delay_seconds": 300,
"search_queries": [
"customer service AI agent ecommerce",
"AI support agent Amazon seller",
"ecommerce customer support automation",
"Amazon FBA AI tools",
"AI ecommerce chatbot",
"Amazon seller review analysis",
"product review AI analysis",
"Amazon review management tools",
"customer service agent AI startup",
"ecommerce AI automation"
]
},
"reddit": {
"username": "mguozhen",
"daily_target": 10,
"min_delay_seconds": 600,
"subreddits": [
"FulfillmentByAmazon",
"AmazonSeller",
"ecommerce",
"Entrepreneur",
"AmazonFBA"
]
},
"products": {
"Solvea": {
"description": "AI customer service agent for Amazon/Shopify sellers — handles buyer queries autonomously using product and order data",
"trigger_keywords": [
"customer service", "support agent", "ai agent", "chatbot",
"customer support", "buyer queries", "customer queries",
"support automation", "helpdesk", "ticket", "response time"
]
},
"VOC.ai": {
"description": "Amazon review intelligence tool — clusters review pain points by product attribute, maps to BSR impact, surfaces optimization suggestions",
"trigger_keywords": [
"amazon review", "review analysis", "product review", "customer feedback",
"review management", "1-star", "negative review", "vine review",
"review sentiment", "listing optimization", "bsr", "browse node",
"review response", "review insight"
]
}
},
"reply_style": {
"tone": "knowledgeable seller/builder sharing experience",
"max_length_x": 260,
"max_length_reddit": 400,
"rules": [
"Lead with a genuine insight or question about the post",
"Mention product naturally as 'what we use/built' — not as an ad",
"Never start with 'Great post!' or 'I agree!'",
"Sound like a real seller who has faced this problem",
"If post is not directly relevant, skip — do not force a mention"
]
}
}
FILE:dashboard/app.py
#!/usr/bin/env python3
"""
Social Bot Dashboard — Flask web app.
Run: python dashboard/app.py
Open: http://localhost:5050
"""
import sys
import json
from pathlib import Path
from datetime import date, timedelta
from flask import Flask, render_template, jsonify
from flask_cors import CORS
sys.path.insert(0, str(Path(__file__).parent.parent))
from bot.db import get_stats, get_recent_replies, get_today_count, init_db
app = Flask(__name__)
CORS(app)
CONFIG = json.loads((Path(__file__).parent.parent / "config.json").read_text())
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/overview")
def api_overview():
today = date.today().isoformat()
yesterday = (date.today() - timedelta(days=1)).isoformat()
stats = get_stats(days=30)
today_stats = [s for s in stats if s["day"] == today]
ytd_stats = [s for s in stats if s["day"] == yesterday]
def sum_field(rows, field):
return sum(r.get(field, 0) for r in rows)
x_today = next((s for s in today_stats if s["platform"] == "x"), {})
r_today = next((s for s in today_stats if s["platform"] == "reddit"), {})
x_ytd = next((s for s in ytd_stats if s["platform"] == "x"), {})
r_ytd = next((s for s in ytd_stats if s["platform"] == "reddit"), {})
return jsonify({
"today": {
"x_posted": x_today.get("posted", 0),
"x_target": CONFIG["x"]["daily_target"],
"reddit_posted": r_today.get("posted", 0),
"reddit_target": CONFIG["reddit"]["daily_target"],
"total_posted": x_today.get("posted", 0) + r_today.get("posted", 0),
},
"yesterday": {
"x_posted": x_ytd.get("posted", 0),
"reddit_posted": r_ytd.get("posted", 0),
},
"total_all_time": sum_field(stats, "posted"),
"solvea_mentions": _count_product("Solvea"),
"voc_mentions": _count_product("VOC.ai"),
})
@app.route("/api/chart/daily")
def api_chart_daily():
stats = get_stats(days=30)
# Build per-day totals
days = {}
for s in stats:
d = s["day"]
if d not in days:
days[d] = {"x": 0, "reddit": 0}
days[d][s["platform"]] = s.get("posted", 0)
labels = sorted(days.keys())
return jsonify({
"labels": labels,
"x": [days[d]["x"] for d in labels],
"reddit": [days[d]["reddit"] for d in labels],
})
@app.route("/api/replies")
def api_replies():
replies = get_recent_replies(limit=100)
return jsonify(replies)
def _count_product(name: str) -> int:
from bot.db import get_conn
with get_conn() as conn:
row = conn.execute(
"SELECT COUNT(*) as cnt FROM replies WHERE product=? AND status='posted'",
(name,)
).fetchone()
return row["cnt"] if row else 0
if __name__ == "__main__":
init_db()
print("Dashboard: http://localhost:5050")
app.run(host="0.0.0.0", port=5050, debug=False)
FILE:dashboard/templates/index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Social Bot Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/chart.umd.min.js"></script>
<style>
:root {
--bg: #0f1117; --surface: #1a1d27; --border: #2d3148;
--text: #e2e8f0; --muted: #8892a4; --accent: #6366f1;
--green: #10b981; --yellow: #f59e0b; --red: #ef4444;
--x-color: #1d9bf0; --reddit-color: #ff4500;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body { background: var(--bg); color: var(--text); font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; }
header { background: var(--surface); border-bottom: 1px solid var(--border); padding: 16px 32px; display: flex; align-items: center; gap: 12px; }
header h1 { font-size: 18px; font-weight: 600; }
.badge { background: var(--accent); color: #fff; font-size: 11px; padding: 2px 8px; border-radius: 99px; }
.live-dot { width: 8px; height: 8px; border-radius: 50%; background: var(--green); animation: pulse 2s infinite; }
@keyframes pulse { 0%,100%{opacity:1} 50%{opacity:.3} }
main { padding: 24px 32px; max-width: 1400px; margin: 0 auto; }
/* Stats grid */
.stats { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 16px; margin-bottom: 28px; }
.card { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; padding: 20px; }
.card .label { font-size: 12px; color: var(--muted); text-transform: uppercase; letter-spacing: .5px; margin-bottom: 8px; }
.card .value { font-size: 32px; font-weight: 700; }
.card .sub { font-size: 12px; color: var(--muted); margin-top: 4px; }
.card.accent { border-color: var(--accent); }
.progress-bar { height: 6px; background: var(--border); border-radius: 3px; margin-top: 12px; overflow: hidden; }
.progress-fill { height: 100%; border-radius: 3px; transition: width .6s ease; }
.x-fill { background: var(--x-color); }
.reddit-fill { background: var(--reddit-color); }
/* Chart */
.chart-section { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; padding: 24px; margin-bottom: 28px; }
.chart-section h2 { font-size: 14px; font-weight: 600; margin-bottom: 20px; color: var(--muted); text-transform: uppercase; letter-spacing: .5px; }
canvas { max-height: 240px; }
/* Table */
.table-section { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; padding: 24px; }
.table-section h2 { font-size: 14px; font-weight: 600; margin-bottom: 16px; color: var(--muted); text-transform: uppercase; letter-spacing: .5px; }
.table-wrap { overflow-x: auto; }
table { width: 100%; border-collapse: collapse; font-size: 13px; }
th { text-align: left; color: var(--muted); font-weight: 500; font-size: 11px; text-transform: uppercase; letter-spacing: .5px; padding: 8px 12px; border-bottom: 1px solid var(--border); }
td { padding: 10px 12px; border-bottom: 1px solid rgba(45,49,72,.5); vertical-align: top; }
tr:last-child td { border-bottom: none; }
tr:hover td { background: rgba(99,102,241,.05); }
.platform-badge { display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 11px; font-weight: 600; }
.p-x { background: rgba(29,155,240,.15); color: var(--x-color); }
.p-reddit { background: rgba(255,69,0,.15); color: var(--reddit-color); }
.product-badge { display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 11px; }
.p-solvea { background: rgba(99,102,241,.2); color: #a5b4fc; }
.p-voc { background: rgba(16,185,129,.2); color: #6ee7b7; }
.status-dot { width: 7px; height: 7px; border-radius: 50%; display: inline-block; margin-right: 5px; }
.s-posted { background: var(--green); }
.s-failed { background: var(--red); }
.s-skipped { background: var(--yellow); }
.reply-text { max-width: 360px; color: var(--muted); line-height: 1.5; }
a { color: var(--accent); text-decoration: none; }
a:hover { text-decoration: underline; }
.refresh-btn { margin-left: auto; background: var(--border); border: none; color: var(--text); padding: 6px 14px; border-radius: 6px; cursor: pointer; font-size: 13px; }
.refresh-btn:hover { background: var(--accent); }
</style>
</head>
<body>
<header>
<div class="live-dot"></div>
<h1>Social Bot Dashboard</h1>
<span class="badge">Solvea + VOC.ai</span>
<button class="refresh-btn" onclick="loadAll()">↻ Refresh</button>
</header>
<main>
<!-- Stats -->
<div class="stats">
<div class="card accent">
<div class="label">Total Today</div>
<div class="value" id="total-today">—</div>
<div class="sub" id="total-sub">Loading...</div>
</div>
<div class="card">
<div class="label">X / Twitter</div>
<div class="value" id="x-today">—</div>
<div class="sub" id="x-sub">target: 20</div>
<div class="progress-bar"><div class="progress-fill x-fill" id="x-prog" style="width:0%"></div></div>
</div>
<div class="card">
<div class="label">Reddit</div>
<div class="value" id="reddit-today">—</div>
<div class="sub" id="reddit-sub">target: 10</div>
<div class="progress-bar"><div class="progress-fill reddit-fill" id="reddit-prog" style="width:0%"></div></div>
</div>
<div class="card">
<div class="label">Solvea Mentions</div>
<div class="value" id="solvea-total">—</div>
<div class="sub">all time</div>
</div>
<div class="card">
<div class="label">VOC.ai Mentions</div>
<div class="value" id="voc-total">—</div>
<div class="sub">all time</div>
</div>
<div class="card">
<div class="label">All Time Replies</div>
<div class="value" id="all-time">—</div>
<div class="sub">posted</div>
</div>
</div>
<!-- Chart -->
<div class="chart-section">
<h2>Daily Replies — Last 30 Days</h2>
<canvas id="chart"></canvas>
</div>
<!-- Table -->
<div class="table-section">
<h2>Recent Replies</h2>
<div class="table-wrap">
<table>
<thead>
<tr>
<th>Time</th>
<th>Platform</th>
<th>Post</th>
<th>Reply</th>
<th>Product</th>
<th>Status</th>
</tr>
</thead>
<tbody id="replies-body">
<tr><td colspan="6" style="text-align:center;color:var(--muted);padding:32px">Loading...</td></tr>
</tbody>
</table>
</div>
</div>
</main>
<script>
let chartInstance = null;
async function loadOverview() {
const d = await fetch('/api/overview').then(r => r.json());
document.getElementById('total-today').textContent = d.today.total_posted;
document.getElementById('total-sub').textContent = `yesterday: d.yesterday.x_posted + d.yesterday.reddit_posted`;
document.getElementById('x-today').textContent = d.today.x_posted;
document.getElementById('x-sub').textContent = `target: d.today.x_target`;
document.getElementById('reddit-today').textContent = d.today.reddit_posted;
document.getElementById('reddit-sub').textContent = `target: d.today.reddit_target`;
document.getElementById('solvea-total').textContent = d.solvea_mentions;
document.getElementById('voc-total').textContent = d.voc_mentions;
document.getElementById('all-time').textContent = d.total_all_time;
document.getElementById('x-prog').style.width = Math.min(100, (d.today.x_posted / d.today.x_target) * 100) + '%';
document.getElementById('reddit-prog').style.width = Math.min(100, (d.today.reddit_posted / d.today.reddit_target) * 100) + '%';
}
async function loadChart() {
const d = await fetch('/api/chart/daily').then(r => r.json());
const labels = d.labels.map(l => l.slice(5)); // MM-DD
const ctx = document.getElementById('chart').getContext('2d');
if (chartInstance) chartInstance.destroy();
chartInstance = new Chart(ctx, {
type: 'bar',
data: {
labels,
datasets: [
{ label: 'X/Twitter', data: d.x, backgroundColor: 'rgba(29,155,240,.7)', stack: 'a' },
{ label: 'Reddit', data: d.reddit, backgroundColor: 'rgba(255,69,0,.7)', stack: 'a' },
]
},
options: {
responsive: true, maintainAspectRatio: true,
plugins: { legend: { labels: { color: '#8892a4', font: { size: 12 } } } },
scales: {
x: { stacked: true, grid: { color: '#2d3148' }, ticks: { color: '#8892a4' } },
y: { stacked: true, grid: { color: '#2d3148' }, ticks: { color: '#8892a4', stepSize: 5 } }
}
}
});
}
async function loadReplies() {
const rows = await fetch('/api/replies').then(r => r.json());
const tbody = document.getElementById('replies-body');
if (!rows.length) {
tbody.innerHTML = '<tr><td colspan="6" style="text-align:center;color:var(--muted);padding:32px">No replies yet today</td></tr>';
return;
}
tbody.innerHTML = rows.map(r => {
const time = r.created_at ? r.created_at.slice(0, 16).replace('T', ' ') : '—';
const plat = r.platform === 'x'
? '<span class="platform-badge p-x">𝕏</span>'
: '<span class="platform-badge p-reddit">Reddit</span>';
const title = r.post_url
? `<a href="r.post_url" target="_blank">(r.post_title || r.post_url).slice(0, 60)''</a>`
: '—';
const reply = `<div class="reply-text">(r.reply_text || '').slice(0, 120)''</div>`;
const prod = r.product
? `<span class="product-badge 'p-voc'">r.product</span>`
: '<span style="color:var(--muted)">—</span>';
const statusClass = { posted: 's-posted', failed: 's-failed', skipped: 's-skipped' }[r.status] || '';
const status = `<span class="status-dot statusClass"></span>r.status`;
return `<tr><td style="white-space:nowrap;color:var(--muted)">time</td><td>plat</td><td>title</td><td>reply</td><td>prod</td><td>status</td></tr>`;
}).join('');
}
async function loadAll() {
await Promise.all([loadOverview(), loadChart(), loadReplies()]);
}
loadAll();
setInterval(loadOverview, 30000); // refresh stats every 30s
setInterval(loadReplies, 60000); // refresh table every 60s
</script>
</body>
</html>
FILE:install.sh
#!/bin/bash
# ============================================================
# Social Reply Bot — One-command installer
# Works on any macOS with Homebrew + Python 3
#
# Usage:
# curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
# Or after cloning:
# bash install.sh
# ============================================================
set -e
REPO_URL="https://github.com/mguozhen/social-bot.git"
INSTALL_DIR="$HOME/social-bot"
PYTHON="python3"
echo ""
echo "╔══════════════════════════════════════╗"
echo "║ Social Reply Bot Installer ║"
echo "╚══════════════════════════════════════╝"
echo ""
# ── 1. Clone or update repo ──────────────────────────────────
echo "[1/6] Getting latest code..."
if [ -d "$INSTALL_DIR/.git" ]; then
cd "$INSTALL_DIR"
git pull --ff-only
echo " Updated existing repo at $INSTALL_DIR"
else
git clone "$REPO_URL" "$INSTALL_DIR"
echo " Cloned to $INSTALL_DIR"
fi
cd "$INSTALL_DIR"
# ── 2. Python dependencies ───────────────────────────────────
echo "[2/6] Installing Python packages..."
$PYTHON -m pip install -r requirements.txt --quiet
echo " Done: anthropic, flask, flask-cors, python-dotenv"
# ── 3. browse CLI ────────────────────────────────────────────
echo "[3/6] Checking browse CLI..."
if ! command -v browse &>/dev/null; then
if command -v npm &>/dev/null; then
npm install -g @browserbasehq/browse-cli --silent
echo " Installed browse CLI"
else
echo " WARNING: npm not found. Install Node.js then run:"
echo " npm install -g @browserbasehq/browse-cli"
fi
else
echo " browse CLI already installed ($(browse --version 2>/dev/null || echo 'ok'))"
fi
# ── 4. .env file ─────────────────────────────────────────────
echo "[4/6] Setting up credentials..."
if [ ! -f "$INSTALL_DIR/.env" ]; then
cp "$INSTALL_DIR/.env.template" "$INSTALL_DIR/.env"
echo ""
echo " ┌─────────────────────────────────────────────┐"
echo " │ ACTION REQUIRED: Add your API key │"
echo " │ │"
echo " │ nano $INSTALL_DIR/.env │"
echo " │ Set: ANTHROPIC_API_KEY=sk-ant-... │"
echo " └─────────────────────────────────────────────┘"
echo ""
read -p " Press Enter after editing .env to continue..." _
else
echo " .env already configured"
fi
# ── 5. Database init ─────────────────────────────────────────
echo "[5/6] Initializing database..."
$PYTHON -c "
import sys; sys.path.insert(0, '$INSTALL_DIR')
from bot.db import init_db; init_db()
print(' DB ready: $INSTALL_DIR/logs/social_bot.db')
"
# ── 6. LaunchAgent ───────────────────────────────────────────
echo "[6/6] Installing LaunchAgent (daily 10:05 AM)..."
PLIST_SRC="$INSTALL_DIR/launchd/com.socialbot.daily.plist"
PLIST_DST="$HOME/Library/LaunchAgents/com.socialbot.daily.plist"
mkdir -p "$HOME/Library/LaunchAgents"
# Inject real python3 path and API key
PYTHON_PATH="$(which $PYTHON)"
API_KEY="$(grep ANTHROPIC_API_KEY "$INSTALL_DIR/.env" | cut -d= -f2 | tr -d ' ')"
sed \
-e "s|/usr/bin/python3|$PYTHON_PATH|g" \
-e "s|FILL_IN|$API_KEY|g" \
-e "s|/Users/guozhen|$HOME|g" \
"$PLIST_SRC" > "$PLIST_DST"
launchctl unload "$PLIST_DST" 2>/dev/null || true
launchctl load "$PLIST_DST"
echo " LaunchAgent loaded — will run daily at 10:05 AM"
# ── Done ─────────────────────────────────────────────────────
echo ""
echo "╔══════════════════════════════════════════════════════╗"
echo "║ Installation complete! ║"
echo "║ ║"
echo "║ Next steps: ║"
echo "║ 1. Open browser and log in to Reddit + X manually ║"
echo "║ (just visit the sites — sessions are shared) ║"
echo "║ 2. Test run: ║"
echo "║ cd $HOME/social-bot ║"
echo "║ python3 run_daily.py --x-only ║"
echo "║ 3. Dashboard: python3 dashboard/app.py ║"
echo "║ Open: http://localhost:5050 ║"
echo "╚══════════════════════════════════════════════════════╝"
echo ""
FILE:post_from_pdf.py
#!/usr/bin/env python3
"""
直接按 PDF 里的 URL + 回复文案逐一发帖,不走关键词过滤。
"""
import sys, time, re, logging
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
sys.path.insert(0, str(Path(__file__).parent))
from bot import browser as B
from bot.db import init_db, log_reply, already_replied
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("pdf_poster")
# ── 从 PDF 提取的帖子列表 ────────────────────────────────────────────
POSTS = [
# ── Reddit ──────────────────────────────────────────────────────
{
"platform": "reddit",
"url": "https://old.reddit.com/r/SaaS/comments/1rvswsh/at_what_stage_should_a_startup_actually_add_ai/",
"title": "At what stage should a startup actually add AI customer support?",
"product": "Solvea",
"reply": (
"Love this take! So many founders wait too long and just drown in the inbox. "
"Connecting to Shopify/Stripe is definitely the game-changer here.\n\n"
"If you're ever looking to upgrade from a basic knowledge-base bot to something more 'agentic', "
"check out Solvea (solvea.cx). It doesn't just read your docs — it actually takes actions on Shopify "
"(processing returns, updating shipping addresses) so you don't even have to touch those 20-30% edge cases."
),
},
{
"platform": "reddit",
"url": "https://old.reddit.com/r/SaaS/comments/1rwu5eg/i_built_an_ai_customer_support_agent_for_shopify/",
"title": "I built an AI customer support agent for Shopify stores as a solo dev",
"product": "Solvea",
"reply": (
"Spot on! The LLM part is easy; the orchestration, edge cases, and actual reliability are the real nightmare.\n\n"
"We went through this exact same learning curve when building Solvea (solvea.cx). "
"We ended up focusing entirely on making it a true autonomous agent for Shopify that handles all those "
"messy edge cases (refunds, angry customers, complex order tracking) reliably out of the box. "
"Gorgias is definitely too bloated for most!"
),
},
{
"platform": "reddit",
"url": "https://old.reddit.com/r/SaaS/comments/1rwnow1/saas_idea_an_ai_sales_agent_for_shopify_that_only/",
"title": "SaaS idea: An AI Sales Agent for Shopify that only charges per successful sale",
"product": "Solvea",
"reply": (
"The performance-based pricing is an interesting hook! The core idea — that AI should be actively "
"closing sales and not just acting as a glorified FAQ bot — is exactly where the market is heading.\n\n"
"We built Solvea (solvea.cx) with this exact philosophy: it's an AI agent that handles support but "
"also actively guides purchasing decisions. Store owners definitely want ROI, not just another subscription."
),
},
{
"platform": "reddit",
"url": "https://old.reddit.com/r/FulfillmentByAmazon/comments/1rsfdwq/are_you_worried_ai_shopping_agents_are_now/",
"title": "Are you worried AI shopping agents are now deciding what gets bought on Amazon?",
"product": "VOC.ai",
"reply": (
"This is a huge shift. AI agents don't just read your keywords — they synthesize your actual customer "
"reviews to decide if your product is worth recommending. If your reviews mention specific flaws, the AI knows.\n\n"
"That's why tools like VOC.ai are becoming essential right now. It uses AI to deeply analyze your "
"(and your competitors') reviews to extract the exact sentiment and pain points. "
"If you optimize your product and listing based on what customers actually care about, "
"the AI agents will naturally favor you."
),
},
{
"platform": "reddit",
"url": "https://old.reddit.com/r/smallbusiness/comments/1rvc62f/heads_up_ai_assistants_are_now_recommending_your/",
"title": "Heads up: AI assistants are now recommending your competitors to your potential customers",
"product": "Solvea",
"reply": (
"This is the new reality of 'Agentic SEO.' If your business isn't easily understandable by an AI, "
"you don't exist.\n\n"
"Beyond making sure your site is crawlable, having your own AI agent on your site dramatically "
"improves how customers interact with your brand. We built Solvea (solvea.cx) to be that front-line "
"agent for ecommerce — it understands your entire business context and makes sure customers get the "
"exact right answers, rather than bouncing to Google or Perplexity to find a competitor."
),
},
{
"platform": "reddit",
"url": "https://old.reddit.com/r/FulfillmentByAmazon/comments/1rssrcu/software_stacks_for_678figure_sellers/",
"title": "Software Stacks for 6/7/8-Figure Sellers",
"product": "VOC.ai",
"reply": (
"Solid stack! One area that seems to be missing is deep Review Analysis / Voice of Customer.\n\n"
"For 7/8-figure brands, manually reading reviews to find product iteration opportunities is impossible. "
"A lot of top sellers are using VOC.ai right now — it uses AI to instantly break down thousands of reviews "
"into actionable insights (pain points, feature requests, sentiment trends). "
"Definitely worth adding to the stack for product development and listing optimization!"
),
},
{
"platform": "reddit",
"url": "https://old.reddit.com/r/Figsscrubs/comments/1rssb19/does_this_company_still_have_cs_representatives/",
"title": "Does this company still have CS representatives?",
"product": "Solvea",
"reply": (
"This is exactly what happens when brands just slap a cheap AI chatbot on their site to cut costs "
"without actually integrating it properly. It destroys the premium brand experience.\n\n"
"A real AI agent shouldn't just be a wall to block you from humans. Platforms like Solvea (solvea.cx) "
"are built to actually resolve the issue (pulling real-time tracking, processing a return autonomously) "
"and seamlessly hand off to a human with full context when it can't. "
"It's wild that billion-dollar brands are still using broken bots."
),
},
# ── X / Twitter ─────────────────────────────────────────────────
{
"platform": "x",
"url": "https://x.com/HamptonAc_/status/2032086324770193614",
"title": "The Laziest 2026 Shopify Blueprint",
"product": "Solvea",
"reply": (
"100% this. The leverage you get from AI agents in 2026 is insane. "
"When it comes to the customer support piece of this blueprint, Solvea (solvea.cx) is the cheat code. "
"It's an autonomous agent that plugs right into Shopify and handles the entire support inbox "
"(tracking, returns, product questions) so you can actually focus on scaling ads and finding winners "
"instead of answering emails."
),
},
{
"platform": "x",
"url": "https://x.com/jasonlk/status/2032991149322072517",
"title": "20+ AI Agents in production — unification not orchestration (Jason Lemkin)",
"product": "Solvea",
"reply": (
"Spot on, Jason. We saw this exact friction in the ecommerce support space. "
"Having multiple agents handling different parts of the customer journey creates a fragmented mess.\n\n"
"That's why with Solvea (solvea.cx), we built the agent to not only act autonomously on Shopify tickets, "
"but to have a unified inbox where the human can seamlessly step in, review the agent's context, "
"and take over without the customer ever feeling the handoff. Unification is the only way to scale this."
),
},
]
# ── Reddit 发评论 ────────────────────────────────────────────────────
def post_reddit_comment(reply_text: str) -> bool:
tree = B.snapshot()
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox(?!: search)', tree)
if not textarea_refs:
logger.warning("Reddit: no comment textarea found")
return False
B.click(textarea_refs[-1])
B.wait_seconds(1)
for i, para in enumerate(reply_text.split("\n\n")):
safe = para.replace("$", "").strip()
if safe:
B.type_text(safe)
if i < len(reply_text.split("\n\n")) - 1:
B.press("Enter")
B.press("Enter")
B.wait_seconds(1)
tree = B.snapshot()
save_refs = re.findall(r'\[(\d+-\d+)\] button: save', tree, re.I)
if not save_refs:
return False
B.click(save_refs[0])
B.wait_seconds(4)
confirm = B.snapshot()
return "mguozhen" in confirm and ("just now" in confirm or "1 minute ago" in confirm)
# ── X 发回复 ─────────────────────────────────────────────────────────
def post_x_reply(reply_text: str) -> bool:
tree = B.snapshot()
boxes = re.findall(r'\[(\d+-\d+)\] textbox: Post text', tree)
if not boxes:
logger.warning("X: no reply textbox found")
return False
B.click(boxes[0])
B.wait_seconds(1)
for i, para in enumerate(reply_text.split("\n\n")):
safe = para.replace("$", "").strip()
if safe:
B.type_text(safe)
if i < len(reply_text.split("\n\n")) - 1:
B.press("Enter")
B.press("Enter")
B.wait_seconds(1)
tree = B.snapshot()
reply_btns = re.findall(r'\[(\d+-\d+)\] button: Reply', tree)
if len(reply_btns) >= 2:
B.click(reply_btns[-1])
B.wait_seconds(3)
confirm = B.snapshot()
return "Your post was sent" in confirm or "post was sent" in confirm.lower()
return False
# ── 主流程 ────────────────────────────────────────────────────────────
def main():
init_db()
results = {"posted": 0, "skipped": 0, "failed": 0}
for i, post in enumerate(POSTS):
platform = post["platform"]
url = post["url"]
title = post["title"]
reply = post["reply"]
product = post["product"]
logger.info(f"[{i+1}/{len(POSTS)}] {platform.upper()} — {title[:55]}")
if already_replied(url):
logger.info(" → already replied, skip")
results["skipped"] += 1
continue
B.open_url(url)
B.wait_seconds(4)
if platform == "reddit":
success = post_reddit_comment(reply)
else:
success = post_x_reply(reply)
if success:
log_reply(platform, url, title, "", reply, product, "posted")
results["posted"] += 1
logger.info(f" ✓ posted ({product})")
time.sleep(8)
else:
log_reply(platform, url, title, "", reply, product, "failed")
results["failed"] += 1
logger.warning(f" ✗ failed")
time.sleep(5)
logger.info(f"\n=== 完成 === posted={results['posted']} failed={results['failed']} skipped={results['skipped']}")
return results
if __name__ == "__main__":
main()
FILE:requirements.txt
anthropic>=0.40.0
flask>=3.0.0
flask-cors>=4.0.0
python-dotenv>=1.0.0
FILE:run_daily.py
#!/usr/bin/env python3
"""
Daily social media reply bot.
Scheduled via macOS LaunchAgent at 10:05 AM.
Runs standalone — no Claude Code, no confirmation prompts.
Usage:
python run_daily.py # run both platforms
python run_daily.py --x-only
python run_daily.py --reddit-only
"""
import sys
import json
import logging
import time
import argparse
from pathlib import Path
from datetime import datetime
# Load env vars from .env if present
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
except ImportError:
pass
# Setup logging
LOG_DIR = Path(__file__).parent / "logs"
LOG_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler(LOG_DIR / f"run_{datetime.now():%Y-%m-%d}.log"),
logging.StreamHandler(sys.stdout),
]
)
logger = logging.getLogger("main")
# Import bot modules
sys.path.insert(0, str(Path(__file__).parent))
from bot.db import init_db
from bot import x_bot, reddit_bot
CONFIG = json.loads((Path(__file__).parent / "config.json").read_text())
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--x-only", action="store_true")
parser.add_argument("--reddit-only", action="store_true")
args = parser.parse_args()
run_x = not args.reddit_only
run_reddit = not args.x_only
init_db()
start = time.time()
results = {}
logger.info("=" * 60)
logger.info(f"Daily bot started — {datetime.now():%Y-%m-%d %H:%M}")
logger.info("=" * 60)
# ── X/Twitter ──────────────────────────────────────────────
if run_x:
logger.info("Starting X/Twitter run...")
try:
results["x"] = x_bot.run(CONFIG["x"])
logger.info(f"X done: {results['x']}")
except Exception as e:
logger.error(f"X run failed: {e}", exc_info=True)
results["x"] = {"error": str(e)}
# ── Reddit ─────────────────────────────────────────────────
if run_reddit:
logger.info("Starting Reddit run...")
try:
results["reddit"] = reddit_bot.run(CONFIG["reddit"])
logger.info(f"Reddit done: {results['reddit']}")
except Exception as e:
logger.error(f"Reddit run failed: {e}", exc_info=True)
results["reddit"] = {"error": str(e)}
elapsed = int(time.time() - start)
logger.info(f"All done in {elapsed}s — {results}")
# Write today's summary for dashboard
summary_file = LOG_DIR / f"summary_{datetime.now():%Y-%m-%d}.json"
import json as _json
summary_file.write_text(_json.dumps({
"date": datetime.now().isoformat(),
"elapsed_secs": elapsed,
"results": results,
}, indent=2))
if __name__ == "__main__":
main()
FILE:setup.sh
#!/bin/bash
set -e
echo "========================================"
echo " Social Bot Setup"
echo "========================================"
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
# 1. Python deps
echo "[1/5] Installing Python packages..."
pip3 install -r "$SCRIPT_DIR/requirements.txt" --quiet
# 2. browse CLI
echo "[2/5] Checking browse CLI..."
if ! command -v browse &>/dev/null; then
echo " Installing @browserbasehq/browse-cli..."
npm install -g @browserbasehq/browse-cli
else
echo " browse CLI already installed: $(browse --version 2>/dev/null || echo 'ok')"
fi
# 3. .env file
echo "[3/5] Setting up .env..."
if [ ! -f "$SCRIPT_DIR/.env" ]; then
cp "$SCRIPT_DIR/.env.template" "$SCRIPT_DIR/.env"
echo " ⚠️ .env created — fill in your credentials:"
echo " nano $SCRIPT_DIR/.env"
else
echo " .env already exists"
fi
# 4. Init database
echo "[4/5] Initializing database..."
python3 -c "
import sys; sys.path.insert(0, '$SCRIPT_DIR')
from bot.db import init_db; init_db()
print(' DB initialized: $SCRIPT_DIR/logs/social_bot.db')
"
# 5. LaunchAgent install
echo "[5/5] Installing LaunchAgent (daily 10:05 AM)..."
PLIST_SRC="$SCRIPT_DIR/launchd/com.socialbot.daily.plist"
PLIST_DST="$HOME/Library/LaunchAgents/com.socialbot.daily.plist"
# Update Python path in plist to current python3
PYTHON_PATH="$(which python3)"
sed "s|/usr/bin/python3|$PYTHON_PATH|g" "$PLIST_SRC" > "$PLIST_DST"
# Unload if already loaded
launchctl unload "$PLIST_DST" 2>/dev/null || true
launchctl load "$PLIST_DST"
echo " LaunchAgent loaded — next run: tomorrow 10:05 AM"
echo ""
echo "========================================"
echo " Setup complete!"
echo ""
echo " Next steps:"
echo " 1. Fill in Anthropic key: nano $SCRIPT_DIR/.env"
echo " 2. Test run: python3 $SCRIPT_DIR/run_daily.py"
echo " (first run: browser opens for X/Reddit login — log in once)"
echo " 3. Start dashboard: python3 $SCRIPT_DIR/dashboard/app.py"
echo " 4. Open dashboard: http://localhost:5050"
echo ""
echo " For another Mac: scp -r $SCRIPT_DIR/ user@newmac:~/social-bot/"
echo " then run setup.sh + log in once on that machine"
echo "========================================"
FILE:skill.sh
#!/bin/bash
# social-reply-bot skill entry point
# Called by openclaw with user's prompt as $*
set -e
BOT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROMPT="$(echo "$*" | tr '[:upper:]' '[:lower:]')"
# Load env
if [ -f "$BOT_DIR/.env" ]; then
set -a
source "$BOT_DIR/.env"
set +a
fi
# Check ANTHROPIC_API_KEY
if [ -z "$ANTHROPIC_API_KEY" ]; then
echo "ERROR: ANTHROPIC_API_KEY not set. Edit $BOT_DIR/.env"
exit 1
fi
# Check browse CLI
if ! command -v browse &>/dev/null; then
echo "ERROR: browse CLI not found. Run: npm install -g @browserbasehq/browse-cli"
exit 1
fi
cd "$BOT_DIR"
# Route by prompt keywords
if echo "$PROMPT" | grep -qE "warmup|warm up|karma|养号"; then
COUNT=8
if echo "$PROMPT" | grep -qE "[0-9]+"; then
COUNT=$(echo "$PROMPT" | grep -oE "[0-9]+" | head -1)
fi
echo "Running Reddit warmup — target COUNT comments..."
python3 "$BOT_DIR/warmup_reddit.py" "$COUNT"
elif echo "$PROMPT" | grep -qE "leads|客户|potential customer"; then
echo "=== Solvea Leads ==="
python3 -c "
import sys, json; sys.path.insert(0, '.')
from bot.db import get_leads
leads = get_leads(7)
if not leads:
print('No leads found in last 7 days.')
else:
print(f'Found {len(leads)} leads in last 7 days:\n')
for l in leads:
pain = json.loads(l.get('pain_points','[]'))
print(f\"[{l['lead_score']}/10] [{l['urgency'].upper()}] {l['platform'].upper()}\")
print(f\" Title: {l['post_title']}\")
print(f\" Type: {l['business_type']}\")
print(f\" Pain: {', '.join(pain[:2])}\")
print(f\" Why: {l['reason']}\")
print(f\" URL: {l['post_url']}\")
print()
"
elif echo "$PROMPT" | grep -qE "stat|count|how many|report"; then
echo "=== Social Bot Stats ==="
python3 -c "
import sys; sys.path.insert(0, '.')
from bot.db import get_stats, get_today_count
print('Today: X=%d Reddit=%d' % (get_today_count('x'), get_today_count('reddit')))
stats = get_stats(7)
if stats:
print('\nLast 7 days:')
for s in stats[:14]:
print(' %s [%s] posted=%d failed=%d' % (s['day'], s['platform'], s['posted'], s['failed']))
else:
print('No data yet.')
"
elif echo "$PROMPT" | grep -qE "dashboard|ui|open|browser"; then
echo "Starting dashboard at http://localhost:5050 ..."
python3 "$BOT_DIR/dashboard/app.py" &
sleep 2
open http://localhost:5050
echo "Dashboard opened."
elif echo "$PROMPT" | grep -qE "x only|twitter only|tweet only"; then
echo "Running X/Twitter bot..."
python3 "$BOT_DIR/run_daily.py" --x-only
elif echo "$PROMPT" | grep -qE "reddit only"; then
echo "Running Reddit bot..."
python3 "$BOT_DIR/run_daily.py" --reddit-only
else
echo "Running both X and Reddit bots..."
python3 "$BOT_DIR/run_daily.py"
fi
FILE:warmup_reddit.py
#!/usr/bin/env python3
"""
Reddit 账号养号脚本 — 在通用版块发真实评论积累 Karma
"""
import sys, time, re, logging, os
from pathlib import Path
from typing import Optional, List
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
sys.path.insert(0, str(Path(__file__).parent))
import anthropic
from bot import browser as B
from bot.reddit_bot import _ensure_logged_in, _post_comment
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("warmup")
BASE_URL = "https://old.reddit.com"
WARMUP_SUBREDDITS = [
"FreeKarma4U", # 专为新账号互赞设计
"karma", # 同上
"newreddits", # 新版块,门槛低
"CasualConversation", # 轻松聊天,对新账号友好
"self", # 自我分享,极少 automod
"NoStupidQuestions", # 问答,友好社区
"mildlyinteresting", # 轻度过滤
"Showerthoughts", # 脑洞,低门槛
]
MAX_COMMENTS = int(sys.argv[1]) if len(sys.argv) > 1 else 8
DELAY_SUCCESS = 90 # 两条评论之间间隔
DELAY_FAIL = 15
def generate_comment(post_title: str, post_content: str, subreddit: str) -> Optional[str]:
"""用 Claude Haiku 生成一条真实评论(不带任何产品推广)"""
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
system = """You are a genuine Reddit user. Write short, authentic comments.
Rules:
- Never mention products, companies, or services
- Sound like a real person, not AI
- Be conversational, curious, or share a relatable experience
- 1-3 sentences is ideal
- Casual tone, can use lowercase
- No hashtags, no bullet points
- Never start with "I" as the first word"""
user = f"""Subreddit: r/{subreddit}
Post: {post_title}
Content: {post_content[:500]}
Write one genuine comment. If too political/controversial/personal, reply: SKIP"""
try:
msg = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=120,
messages=[{"role": "user", "content": user}],
system=system,
)
text = msg.content[0].text.strip()
# Remove any leading SKIP explanation
if text.upper().startswith("SKIP"):
return None
return text
except Exception as e:
logger.error(f"Claude error: {e}")
return None
def get_post_urls(subreddit: str, count: int = 8) -> List[dict]:
"""
访问版块 /hot/,收集帖子 URL 列表。
策略:依次点击 comment 链接获取 URL,再返回列表页。
"""
B.open_url(f"{BASE_URL}/r/{subreddit}/hot/")
time.sleep(3)
# 先收集所有帖子标题 + comment 数量
tree = B.snapshot()
submitted_positions = [m.start() for m in re.finditer(r'\bsubmitted\b', tree)]
candidates = []
seen_titles = set()
skip_words = ["Submit a new", "Welcome to", "About /r/", "wiki",
"Discord", "/r/", "http", "View Poll", "rules", "moderator"]
for pos in submitted_positions[:30]:
before = tree[max(0, pos - 1500):pos]
links = re.findall(r'\[(\d+-\d+)\] link: ([^\n]{15,200})', before)
if not links:
continue
_, title = links[-1]
title = title.strip()
if any(s.lower() in title.lower() for s in skip_words):
continue
if title in seen_titles:
continue
# Comment count
after = tree[pos:pos + 400]
cm = re.search(r'\[(\d+-\d+)\] link: (\d+) comments?', after)
if not cm:
continue
n = int(cm.group(2))
if n < 1 or n > 500:
continue
seen_titles.add(title)
candidates.append({"title": title, "comment_count": n})
if len(candidates) >= count:
break
if not candidates:
return []
# 逐一点击 comment 链接获取真实 URL
posts = []
for idx in range(min(len(candidates), count)):
# 每次都要重新获取 snapshot(因为导航后 refs 变了)
B.open_url(f"{BASE_URL}/r/{subreddit}/hot/")
time.sleep(3)
tree = B.snapshot()
comment_links = re.findall(r'\[(\d+-\d+)\] link: \d+ comments?', tree)
if idx >= len(comment_links):
break
B.click(comment_links[idx])
time.sleep(4)
url = B.get_url()
if not url or subreddit.lower() not in url.lower():
continue
# 确保是 old.reddit.com 格式
url = url.replace("www.reddit.com", "old.reddit.com")
candidates[idx]["url"] = url
posts.append(candidates[idx])
logger.info(f" [{idx+1}] {candidates[idx]['title'][:55]}")
return posts
def warmup_post(post: dict, subreddit: str) -> bool:
"""
访问帖子页面,生成评论,发出。
假设我们已经在热帖列表页。
"""
url = post.get("url", "")
if not url:
return False
B.open_url(url)
time.sleep(5) # 多等一点,让评论框加载
tree = B.snapshot()
# 提取帖子内容(跳过 sidebar,从标题后开始)
title_idx = tree.find(post["title"][:35])
if title_idx > 0:
chunk = tree[title_idx: title_idx + 3000]
else:
chunk = tree[3000:6000] # fallback
text_blocks = re.findall(r'StaticText: ([^\n]{20,})', chunk)
meta = {"submitted", "by", "share", "save", "hide", "report",
"crosspost", "sorted by:", "best", "formatting help"}
clean = [t for t in text_blocks if t.strip().lower() not in meta]
snippet = " ".join(clean[:10])[:600]
# 生成评论
comment = generate_comment(post["title"], snippet, subreddit)
if not comment:
logger.info(" → SKIP")
return False
logger.info(f" Comment: {comment[:90]}")
# 检查是否有评论框(说明已登录且帖子未锁定)
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox(?!: search)', tree)
if not textarea_refs:
# 尝试等待更长时间再次检查
logger.info(" → no textarea, waiting 3s and retrying...")
time.sleep(3)
tree = B.snapshot()
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox(?!: search)', tree)
if not textarea_refs:
logger.warning(" → textarea still not found (locked/not logged in?)")
# Debug: show what's in tree
logger.debug(f"Tree sample: {tree[3000:4500]}")
return False
return _post_comment(comment)
def main():
logger.info(f"=== Reddit 养号开始,目标 {MAX_COMMENTS} 条评论 ===")
if not _ensure_logged_in():
logger.error("Reddit 未登录,退出")
return
posted = 0
failed = 0
skipped = 0
for subreddit in WARMUP_SUBREDDITS:
if posted >= MAX_COMMENTS:
break
logger.info(f"\n[r/{subreddit}] 获取帖子列表...")
posts = get_post_urls(subreddit, count=4)
logger.info(f" 找到 {len(posts)} 个帖子")
for post in posts:
if posted >= MAX_COMMENTS:
break
logger.info(f"\n » {post['title'][:60]} ({post['comment_count']} 评论)")
success = warmup_post(post, subreddit)
if success:
posted += 1
logger.info(f" ✓ 评论发出!累计 {posted}/{MAX_COMMENTS}")
wait = DELAY_SUCCESS + posted * 15
logger.info(f" 等待 {wait}s...")
time.sleep(wait)
else:
skipped_or_failed = "skipped" if not post.get("url") else "failed"
if skipped_or_failed == "failed":
failed += 1
else:
skipped += 1
time.sleep(DELAY_FAIL)
logger.info(f"\n=== 完成 === ✓posted={posted} ✗failed={failed} skip={skipped}")
logger.info("Karma 查看: https://www.reddit.com/user/mguozhen/")
if __name__ == "__main__":
main()
深度思考与多维分析 AI Skill。通过发散、收敛、批判、创造四种思维模态对复杂问题进行结构化拆解,输出 ASCII 思维导图和 Markdown 分析报告。Triggers: 深度思考, 深度分析, 帮我分析, 帮我想清楚, 多维分析, 结构化思考, 决策分析, 帮我做决策, 我要做决定, 创意分析, 头脑风...
---
name: mirothinker1.7
description: "深度思考与多维分析 AI Skill。通过发散、收敛、批判、创造四种思维模态对复杂问题进行结构化拆解,输出 ASCII 思维导图和 Markdown 分析报告。Triggers: 深度思考, 深度分析, 帮我分析, 帮我想清楚, 多维分析, 结构化思考, 决策分析, 帮我做决策, 我要做决定, 创意分析, 头脑风暴, 脑暴, 思维导图, 问题拆解, deep think, brainstorm, decision analysis, mind map, should I, how should I, think deeply"
allowed-tools: Bash
metadata:
openclaw:
homepage: https://github.com/mguozhen/mirothinker1.7
---
# MiroThinker 1.7
AI 深度思考技能 —— 四维思维模态引擎。
输入任何复杂问题,通过发散 → 收敛 → 批判 → 创造四个阶段,输出结构化思维导图和行动方案。
## 使用方式
```bash
# 直接输入问题
深度分析:我要不要辞职创业?
# 快速模式(跳过批判模态,更快)
帮我快速分析:如何提升用户留存率
# 保存 Markdown 报告
帮我深度思考并保存报告:产品定价策略
```
## 思维模态
| 模态 | 职责 |
|------|------|
| P0 问题解构 | 识别问题类型、约束、成功指标 |
| M1 发散 | 无限展开可能性,生成 6-8 个维度 |
| M2 收敛 | 提炼核心洞察,构建优先级矩阵 |
| M3 批判 | 挑战假设,识别盲点和风险 |
| M4 创造 | 综合输出突破性方案 + 一句话答案 |
FILE:README.md
# MiroThinker 1.7
> AI 深度思考技能 —— 四维思维模态引擎
输入任何复杂问题,通过 **发散 → 收敛 → 批判 → 创造** 四个阶段,输出结构化 ASCII 思维导图和行动方案。
## 安装
```bash
openclaw install mirothinker1.7
```
## 使用方式
```bash
# 标准模式(推荐)
openclaw run mirothinker1.7 "我要不要辞职创业?"
# 快速模式(跳过 M3 批判,约 1-2 分钟)
openclaw run mirothinker1.7 --mode=quick "如何提升用户留存率"
# 深度模式(M2 迭代修订,约 4-6 分钟)
openclaw run mirothinker1.7 --mode=deep "公司未来三年战略方向"
# 保存 Markdown 报告
openclaw run mirothinker1.7 --save "产品定价策略分析"
# 从文件读取长问题
openclaw run mirothinker1.7 --file=problem.txt
# 管道输入
echo "如何设计好的 API" | openclaw run mirothinker1.7
```
## 思维模态说明
| 模态 | 代号 | 职责 | 模式 |
|------|------|------|------|
| 问题解构 | P0 | 识别类型/约束/成功指标 | 全部 |
| 发散思维 | M1 | 展开 6-8 个差异显著的维度 | 全部 |
| 收敛提炼 | M2 | 提炼 TOP3 洞察 + 优先级矩阵 | 全部 |
| 批判审查 | M3 | 挑战假设/找盲点/风险矩阵 | full/deep |
| 创造综合 | M4 | 突破性方案 + 一句话答案 | 全部 |
## 适用场景
- 重大职业/人生决策
- 产品/商业策略分析
- 技术选型评估
- 创意方案头脑风暴
- 复杂问题结构化拆解
## 输出示例
```
╔══════════════════════════════════════════════════════════════╗
║ MIROTHINKER 1.7 • 深度思考引擎
║ 问题:我要不要辞职创业?
╚══════════════════════════════════════════════════════════════╝
我要不要辞职创业?
├── [M1 发散] 可能性空间
│ ├── 财务维度
│ │ ├── 当前储蓄可支撑几个月?
│ │ └── ★ 野卡:假设已失败过一次,还会创业吗?
│ └── 机会成本维度
│ └── 错过创业窗口期的代价是什么?
├── [M2 收敛] 核心洞察
│ ├── [HIGH] 资金储备是决策的刚性约束
│ └── [HIGH] 合伙人结构比创意本身更关键
├── [M3 批判] 风险审查
│ ├── ⚠ [HIGH] 创业 ≠ 自由(认知偏差)
│ └── 裁定:NEEDS_REVISION
└── [M4 创造] 综合方案
├── 方案A:Side-Project 先验证 [置信度:HIGH]
│ 核心:降低沉没成本,验证后再辞职
│ 1. 3个月内 MVP 上线(本月内)
│ └── 预期:MRR $1000 再辞职,风险降低 80%
└── 一句话答案:"先验证,再离职,别两头摇摆。"
◆ 元认知洞察
"你问的是'要不要',真实问题是'怎样降低失败成本'。"
```
## 依赖
- `openclaw` CLI
- `python3`(macOS 预装)
## 版本历史
- **1.7.0** 初始发布,四维思维模态 + 三增强层
FILE:analyze.sh
#!/usr/bin/env bash
# analyze.sh — MiroThinker 1.7 核心分析引擎
# 通过 source 加载,共享 think.sh 的变量空间
# bash 3.2 兼容
# ── LLM 调用封装 ─────────────────────────────────────────────
# 用法:mt_call_llm "阶段名" "prompt内容" "输出变量名"
mt_call_llm() {
local stage="$1"
local prompt="$2"
local out_var="$3"
local tmpfile="/tmp/mt_stage_$$.txt"
local resp_file="/tmp/mt_stage_resp_$$.json"
local retry=0
# 把 prompt 写入临时文件,避免命令行引号问题
printf '%s' "$prompt" > "$tmpfile"
while [ $retry -lt 3 ]; do
if openclaw agent --local --json \
--message "$(cat "$tmpfile")" \
> "$resp_file" 2>/dev/null; then
break
fi
retry=$((retry + 1))
if [ $retry -lt 3 ]; then
echo -e "YELLOW 重试 ($retry/2)...RESET"
fi
done
local raw=""
if [ -f "$resp_file" ]; then
raw=$(python3 -c "
import json, sys
try:
data = json.load(open('$resp_file'))
payloads = data.get('payloads', [])
print(payloads[0].get('text', '') if payloads else '')
except:
sys.stdout.write('')
")
fi
rm -f "$tmpfile" "$resp_file"
# 导出到指定变量名
export "out_var"="$raw"
}
# ── JSON 安全提取 ─────────────────────────────────────────────
# 用法:mt_json_field "变量名" "字段" "默认值"
mt_json_field() {
local var_name="$1"
local field="$2"
local default="-"
local raw
raw=$(eval "echo \"\${var_name:-}\"")
python3 -c "
import json, re, sys
raw = '''$raw'''
field = '$field'
default = '$default'
# 从 LLM 输出中提取 JSON(可能有前缀文字)
match = re.search(r'\{.*\}', raw, re.DOTALL)
if not match:
print(default)
sys.exit(0)
try:
data = json.loads(match.group())
val = data.get(field, default)
if isinstance(val, (dict, list)):
print(json.dumps(val, ensure_ascii=False))
else:
print(val if val is not None else default)
except Exception:
print(default)
"
}
# ── JSON 数组转可读摘要 ───────────────────────────────────────
mt_json_array_summary() {
local json_str="$1"
local key="-name"
python3 -c "
import json, sys
try:
data = json.loads('''$json_str''')
if isinstance(data, list):
items = [str(x.get('$key', x) if isinstance(x, dict) else x) for x in data[:6]]
print(', '.join(items))
else:
print('')
except:
print('')
"
}
# ════════════════════════════════════════════════════════════════
# P0 — 问题解构
# ════════════════════════════════════════════════════════════════
run_p0() {
show_stage "🔍" "P0 问题解构" 1 "$MT_TOTAL_STAGES"
local prompt
prompt="你是专业的问题分析师。请对以下问题进行元分析,不要直接回答。
问题:MT_QUERY
以 JSON 格式输出(只输出 JSON,无任何前缀后缀):
{
\"topic\": \"问题核心主题(6字以内)\",
\"type\": \"DECISION或CREATIVE或ANALYSIS或PLANNING之一\",
\"domain\": \"所属领域(如:职业发展/产品设计/商业战略)\",
\"entities\": [\"关键实体1\", \"关键实体2\"],
\"constraints\": [\"约束条件1\", \"约束条件2\"],
\"success_metrics\": [\"成功标准1\", \"成功标准2\"],
\"complexity\": \"LOW或MEDIUM或HIGH之一\"
}
注意:只输出 JSON,不要有任何前缀、后缀或解释。"
mt_call_llm "p0" "$prompt" "MT_P0_RAW"
export MT_TOPIC
MT_TOPIC=$(mt_json_field "MT_P0_RAW" "topic" "未知主题")
export MT_TYPE
MT_TYPE=$(mt_json_field "MT_P0_RAW" "type" "ANALYSIS")
export MT_DOMAIN
MT_DOMAIN=$(mt_json_field "MT_P0_RAW" "domain" "通用")
export MT_COMPLEXITY
MT_COMPLEXITY=$(mt_json_field "MT_P0_RAW" "complexity" "MEDIUM")
export MT_ENTITIES
MT_ENTITIES=$(mt_json_field "MT_P0_RAW" "entities" "[]")
export MT_CONSTRAINTS
MT_CONSTRAINTS=$(mt_json_field "MT_P0_RAW" "constraints" "[]")
export MT_SUCCESS_METRICS
MT_SUCCESS_METRICS=$(mt_json_field "MT_P0_RAW" "success_metrics" "[]")
echo -e " DIM类型:RESET $MT_TYPE DIM复杂度:RESET $MT_COMPLEXITY DIM领域:RESET $MT_DOMAIN"
}
# ════════════════════════════════════════════════════════════════
# M1 — 发散模态
# ════════════════════════════════════════════════════════════════
run_m1() {
show_stage "🌊" "M1 发散思维" 2 "$MT_TOTAL_STAGES"
local prompt
prompt="你是发散思维专家。对以下问题,用最宽广视角展开思考,不评判,不收敛。
问题:MT_QUERY
问题类型:MT_TYPE
核心领域:MT_DOMAIN
生成 6 个完全不同的思维维度,维度间差异显著,覆盖理性/感性、短期/长期、个人/系统等对立面。
以 JSON 格式输出(只输出 JSON):
{
\"dimensions\": [
{
\"name\": \"维度名称(4字以内)\",
\"angle\": \"切入角度(20字以内)\",
\"perspectives\": [\"视角1(30字以内)\", \"视角2(30字以内)\"],
\"wild_card\": \"大胆假设(30字以内)\"
}
]
}
注意:只输出 JSON,不要有任何前缀、后缀或解释。"
mt_call_llm "m1" "$prompt" "MT_DIVERGE"
export MT_DIVERGE_SUMMARY
MT_DIVERGE_SUMMARY=$(python3 -c "
import json, re
raw = '''MT_DIVERGE'''
try:
match = re.search(r'\{.*\}', raw, re.DOTALL)
data = json.loads(match.group()) if match else {}
dims = data.get('dimensions', [])
names = [d.get('name','') for d in dims if d.get('name')]
print(', '.join(names[:6]))
except:
print('发散维度')
")
echo -e " DIM维度:RESET $MT_DIVERGE_SUMMARY"
}
# ════════════════════════════════════════════════════════════════
# M2 — 收敛模态
# ════════════════════════════════════════════════════════════════
run_m2() {
show_stage "🎯" "M2 收敛提炼" 3 "$MT_TOTAL_STAGES"
local prompt
prompt="你是结构化思维专家。基于发散思维产物,进行聚焦提炼。
原始问题:MT_QUERY
已识别维度:MT_DIVERGE_SUMMARY
从以上维度中提炼最有价值的洞察,构建优先级矩阵。
以 JSON 格式输出(只输出 JSON):
{
\"core_insights\": [
{
\"insight\": \"核心洞察(40字以内)\",
\"impact\": \"HIGH或MEDIUM或LOW之一\",
\"confidence\": \"HIGH或MEDIUM或LOW之一\",
\"rationale\": \"理由(30字以内)\"
}
],
\"priority_matrix\": {
\"do_first\": [\"立即行动项1\", \"立即行动项2\"],
\"plan_next\": [\"计划推进项1\"],
\"watch_later\": [\"观察等待项1\"],
\"drop\": [\"可忽略项1\"]
},
\"synthesis\": \"综合性结论(80字以内)\"
}
注意:只输出 JSON,不要有任何前缀、后缀或解释。"
mt_call_llm "m2" "$prompt" "MT_CONVERGE"
export MT_CONVERGE_SYNTHESIS
MT_CONVERGE_SYNTHESIS=$(mt_json_field "MT_CONVERGE" "synthesis" "综合分析完成")
export MT_PRIORITY_MATRIX
MT_PRIORITY_MATRIX=$(mt_json_field "MT_CONVERGE" "priority_matrix" "{}")
echo -e " DIM结论:RESET $(echo "$MT_CONVERGE_SYNTHESIS" | head -c 60)..."
}
# ════════════════════════════════════════════════════════════════
# M3 — 批判模态
# ════════════════════════════════════════════════════════════════
run_m3() {
show_stage "⚡" "M3 批判审查" 4 "$MT_TOTAL_STAGES"
local prompt
prompt="你是批判性思维专家,专门寻找漏洞、盲点和风险。直接挑战,不要客气。
原始问题:MT_QUERY
当前结论:MT_CONVERGE_SYNTHESIS
对以上结论进行深度批判性审查。
以 JSON 格式输出(只输出 JSON):
{
\"assumptions_challenged\": [
{
\"assumption\": \"被挑战的假设(30字以内)\",
\"challenge\": \"挑战方式(40字以内)\",
\"severity\": \"HIGH或MEDIUM或LOW之一\"
}
],
\"blind_spots\": [\"盲点1(30字以内)\", \"盲点2(30字以内)\"],
\"risks\": [
{
\"risk\": \"风险描述(30字以内)\",
\"probability\": \"HIGH或MEDIUM或LOW之一\",
\"impact\": \"HIGH或MEDIUM或LOW之一\",
\"mitigation\": \"缓解策略(30字以内)\"
}
],
\"steelman\": \"对当前结论最强力的辩护(50字以内)\",
\"verdict\": \"SOLID或NEEDS_REVISION或FUNDAMENTALLY_FLAWED之一\"
}
注意:只输出 JSON,不要有任何前缀、后缀或解释。"
mt_call_llm "m3" "$prompt" "MT_CRITIQUE"
export MT_CRITIQUE_VERDICT
MT_CRITIQUE_VERDICT=$(mt_json_field "MT_CRITIQUE" "verdict" "NEEDS_REVISION")
export MT_CRITIQUE_STEELMAN
MT_CRITIQUE_STEELMAN=$(mt_json_field "MT_CRITIQUE" "steelman" "")
echo -e " DIM裁定:RESET $MT_CRITIQUE_VERDICT"
}
# ════════════════════════════════════════════════════════════════
# M4 — 创造模态
# ════════════════════════════════════════════════════════════════
run_m4() {
local step_num="$1"
show_stage "✨" "M4 创造综合" "$step_num" "$MT_TOTAL_STAGES"
local critique_context=""
if [ -n "-" ]; then
critique_context="批判裁定:MT_CRITIQUE_VERDICT
最强辩护:MT_CRITIQUE_STEELMAN"
fi
local prompt
prompt="你是创造性综合专家。在发散和批判的基础上,提出突破性方案。
原始问题:MT_QUERY
发散维度:MT_DIVERGE_SUMMARY
收敛结论:MT_CONVERGE_SYNTHESIS
critique_context
综合以上思维过程,提出 1-2 个创新性最终方案。
以 JSON 格式输出(只输出 JSON):
{
\"final_solutions\": [
{
\"title\": \"方案标题(10字以内)\",
\"core_idea\": \"核心理念(30字以内)\",
\"action_steps\": [
{\"step\": 1, \"action\": \"具体行动(30字以内)\", \"timeline\": \"时间范围\"}
],
\"expected_outcome\": \"预期结果(40字以内)\",
\"confidence\": \"HIGH或MEDIUM或LOW之一\"
}
],
\"meta_insight\": \"这次思考过程带来的元认知洞察(60字以内)\",
\"one_sentence_answer\": \"如果只能给一句话答案(30字以内)\"
}
注意:只输出 JSON,不要有任何前缀、后缀或解释。"
mt_call_llm "m4" "$prompt" "MT_CREATE"
export MT_ONE_SENTENCE
MT_ONE_SENTENCE=$(mt_json_field "MT_CREATE" "one_sentence_answer" "请结合具体情况做决定")
export MT_META_INSIGHT
MT_META_INSIGHT=$(mt_json_field "MT_CREATE" "meta_insight" "")
echo -e " DIM一句话:RESET $MT_ONE_SENTENCE"
}
# ════════════════════════════════════════════════════════════════
# 主分析流程调度
# ════════════════════════════════════════════════════════════════
export MT_PARSE_ERROR=0
run_p0
run_m1
run_m2
case "-full" in
quick)
run_m4 4
;;
deep)
run_m3
# deep 模式:M3 后再跑一次 M2 修订
show_stage "🔄" "M2 修订(深度模式)" 5 "$MT_TOTAL_STAGES"
# 将 M3 的批判纳入 M2 重新收敛
MT_CONVERGE_SYNTHESIS="MT_CONVERGE_SYNTHESIS [经批判修订:MT_CRITIQUE_VERDICT]"
export MT_CONVERGE_SYNTHESIS
run_m4 6
;;
*) # full
run_m3
run_m4 5
;;
esac
FILE:render.sh
#!/usr/bin/env bash
# render.sh — MiroThinker 1.7 输出渲染
# 通过 source 加载,共享所有 MT_* 变量
# bash 3.2 兼容
# ── JSON 解析工具函数 ─────────────────────────────────────────
# 从 MT_DIVERGE 渲染发散维度树
render_diverge_tree() {
python3 -c "
import json, re, os
raw = os.environ.get('MT_DIVERGE', '')
try:
match = re.search(r'\{.*\}', raw, re.DOTALL)
data = json.loads(match.group()) if match else {}
dims = data.get('dimensions', [])
except:
dims = []
if not dims:
print('│ └── (发散数据解析失败,原始内容已记录)')
exit()
total = len(dims)
for i, d in enumerate(dims):
is_last_dim = (i == total - 1)
prefix = '└──' if is_last_dim else '├──'
cont = ' ' if is_last_dim else '│ '
name = d.get('name', '未知维度')
angle = d.get('angle', '')
print(f'│ {prefix} {name}')
if angle:
print(f'│ {cont} \033[2m{angle}\033[0m')
perspectives = d.get('perspectives', [])
wild = d.get('wild_card', '')
for j, p in enumerate(perspectives):
is_last_p = (j == len(perspectives)-1) and not wild
pp = '└──' if is_last_p else '├──'
print(f'│ {cont} {pp} {p}')
if wild:
print(f'│ {cont} └── \033[33m★ 野卡:{wild}\033[0m')
"
}
# 从 MT_CONVERGE 渲染收敛洞察
render_converge_tree() {
python3 -c "
import json, re, os
raw = os.environ.get('MT_CONVERGE', '')
try:
match = re.search(r'\{.*\}', raw, re.DOTALL)
data = json.loads(match.group()) if match else {}
insights = data.get('core_insights', [])
except:
insights = []
if not insights:
synth = os.environ.get('MT_CONVERGE_SYNTHESIS', '收敛分析完成')
print(f'│ └── {synth}')
exit()
colors = {'HIGH': '\033[32m', 'MEDIUM': '\033[33m', 'LOW': '\033[2m'}
reset = '\033[0m'
total = len(insights)
for i, ins in enumerate(insights):
is_last = (i == total - 1)
prefix = '└──' if is_last else '├──'
cont = ' ' if is_last else '│ '
impact = ins.get('impact', 'MEDIUM')
color = colors.get(impact, '')
text = ins.get('insight', '')
rat = ins.get('rationale', '')
print(f'│ {prefix} {color}[{impact}]{reset} {text}')
if rat:
print(f'│ {cont} \033[2m→ {rat}\033[0m')
"
}
# 从 MT_CRITIQUE 渲染批判树
render_critique_tree() {
python3 -c "
import json, re, os
raw = os.environ.get('MT_CRITIQUE', '')
try:
match = re.search(r'\{.*\}', raw, re.DOTALL)
data = json.loads(match.group()) if match else {}
except:
data = {}
verdict = os.environ.get('MT_CRITIQUE_VERDICT', 'NEEDS_REVISION')
steelman = os.environ.get('MT_CRITIQUE_STEELMAN', '')
v_colors = {'SOLID': '\033[32m', 'NEEDS_REVISION': '\033[33m', 'FUNDAMENTALLY_FLAWED': '\033[31m'}
reset = '\033[0m'
yellow = '\033[33m'
red = '\033[31m'
dim = '\033[2m'
# 假设被挑战
assumptions = data.get('assumptions_challenged', [])
if assumptions:
print(f'│ ├── 假设挑战')
for a in assumptions[:3]:
sev = a.get('severity','MEDIUM')
sc = red if sev=='HIGH' else yellow
print(f'│ │ ├── {sc}[{sev}]{reset} {a.get(\"assumption\",\"\")}')
ch = a.get('challenge','')
if ch:
print(f'│ │ │ {dim}└── {ch}{reset}')
# 盲点
blind = data.get('blind_spots', [])
if blind:
print(f'│ ├── 盲点清单')
for b in blind[:3]:
print(f'│ │ ├── {yellow}⚠{reset} {b}')
# 风险
risks = data.get('risks', [])
if risks:
print(f'│ ├── 风险矩阵')
for r in risks[:3]:
prob = r.get('probability','MEDIUM')
impact = r.get('impact','MEDIUM')
mit = r.get('mitigation','')
print(f'│ │ ├── {red}[{prob}×{impact}]{reset} {r.get(\"risk\",\"\")}')
if mit:
print(f'│ │ │ {dim}└── 缓解:{mit}{reset}')
# 裁定
vc = v_colors.get(verdict, yellow)
print(f'│ └── 裁定:{vc}{verdict}{reset}')
if steelman:
print(f'│ {dim}最强辩护:{steelman}{reset}')
"
}
# 从 MT_CREATE 渲染创造方案树
render_create_tree() {
python3 -c "
import json, re, os
raw = os.environ.get('MT_CREATE', '')
try:
match = re.search(r'\{.*\}', raw, re.DOTALL)
data = json.loads(match.group()) if match else {}
solutions = data.get('final_solutions', [])
except:
solutions = []
one_sentence = os.environ.get('MT_ONE_SENTENCE', '')
meta = os.environ.get('MT_META_INSIGHT', '')
green = '\033[32m'
cyan = '\033[36m'
bold = '\033[1m'
dim = '\033[2m'
reset = '\033[0m'
c_colors = {'HIGH': green, 'MEDIUM': '\033[33m', 'LOW': dim}
if not solutions:
if one_sentence:
print(f'│ └── {bold}{one_sentence}{reset}')
exit()
total = len(solutions)
for i, sol in enumerate(solutions):
is_last = (i == total - 1) and not one_sentence
prefix = '└──' if is_last else '├──'
cont = ' ' if is_last else '│ '
conf = sol.get('confidence','MEDIUM')
cc = c_colors.get(conf, '')
title = sol.get('title','方案')
idea = sol.get('core_idea','')
outcome= sol.get('expected_outcome','')
steps = sol.get('action_steps', [])
print(f'│ {prefix} {bold}方案{chr(65+i)}:{title}{reset} {cc}[置信度:{conf}]{reset}')
if idea:
print(f'│ {cont} {dim}核心:{idea}{reset}')
for s in steps[:3]:
act = s.get('action','')
tl = s.get('timeline','')
step_n = s.get('step', '')
print(f'│ {cont} ├── {step_n}. {act} {dim}({tl}){reset}')
if outcome:
print(f'│ {cont} └── {cyan}预期:{outcome}{reset}')
if one_sentence:
print(f'│ └── {bold}一句话答案:\"{one_sentence}\"{reset}')
"
}
# ── 置信度进度条 ──────────────────────────────────────────────
render_confidence_bar() {
python3 -c "
import json, re, os
# 计算各阶段完整度
def bar(pct):
filled = int(pct / 5)
empty = 20 - filled
return '█' * filled + '░' * empty
# M1 维度数
raw1 = os.environ.get('MT_DIVERGE','')
try:
m = re.search(r'\{.*\}', raw1, re.DOTALL)
d1 = json.loads(m.group()) if m else {}
div_count = len(d1.get('dimensions', []))
div_pct = min(100, div_count * 16)
except:
div_pct = 50
# M2 洞察数
raw2 = os.environ.get('MT_CONVERGE','')
try:
m = re.search(r'\{.*\}', raw2, re.DOTALL)
d2 = json.loads(m.group()) if m else {}
ins_count = len(d2.get('core_insights', []))
conv_pct = min(100, ins_count * 33)
except:
conv_pct = 60
# M3
verdict = os.environ.get('MT_CRITIQUE_VERDICT','')
crit_pct = 100 if verdict else 0
# M4
one_s = os.environ.get('MT_ONE_SENTENCE','')
crea_pct = 90 if one_s else 0
print(f' 发散完整度 {bar(div_pct)} {div_pct}%')
print(f' 收敛准确度 {bar(conv_pct)} {conv_pct}%')
if verdict:
print(f' 批判严格度 {bar(crit_pct)} {crit_pct}%')
print(f' 方案可行度 {bar(crea_pct)} {crea_pct}%')
"
}
# ── 生成 Markdown 报告 ────────────────────────────────────────
generate_markdown() {
local report_file="/tmp/mirothinker_MT_TIMESTAMP.md"
python3 -c "
import json, re, os
from datetime import datetime
query = os.environ.get('MT_QUERY','')
topic = os.environ.get('MT_TOPIC','')
mt_type = os.environ.get('MT_TYPE','')
domain = os.environ.get('MT_DOMAIN','')
complexity = os.environ.get('MT_COMPLEXITY','')
mode = os.environ.get('MT_MODE','full')
elapsed = os.environ.get('MT_ELAPSED','0')
timestamp = os.environ.get('MT_TIMESTAMP','')
entities = os.environ.get('MT_ENTITIES','[]')
constraints = os.environ.get('MT_CONSTRAINTS','[]')
metrics = os.environ.get('MT_SUCCESS_METRICS','[]')
div_summary = os.environ.get('MT_DIVERGE_SUMMARY','')
conv_synth = os.environ.get('MT_CONVERGE_SYNTHESIS','')
priority = os.environ.get('MT_PRIORITY_MATRIX','{}')
critique_raw = os.environ.get('MT_CRITIQUE','{}')
verdict = os.environ.get('MT_CRITIQUE_VERDICT','')
steelman = os.environ.get('MT_CRITIQUE_STEELMAN','')
create_raw = os.environ.get('MT_CREATE','{}')
one_sentence = os.environ.get('MT_ONE_SENTENCE','')
meta_insight = os.environ.get('MT_META_INSIGHT','')
diverge_raw = os.environ.get('MT_DIVERGE','{}')
converge_raw = os.environ.get('MT_CONVERGE','{}')
def parse_json(raw):
try:
m = re.search(r'\{.*\}', raw, re.DOTALL)
return json.loads(m.group()) if m else {}
except:
return {}
div_data = parse_json(diverge_raw)
conv_data = parse_json(converge_raw)
crit_data = parse_json(critique_raw)
crea_data = parse_json(create_raw)
prio_data = parse_json(priority)
try:
ents = json.loads(entities)
cons = json.loads(constraints)
mets = json.loads(metrics)
except:
ents = cons = mets = []
lines = []
lines.append(f'# MiroThinker 1.7 深度思考报告')
lines.append(f'')
lines.append(f'**问题:** {query}')
lines.append(f'**分析时间:** {timestamp} **耗时:** {elapsed}s **模式:** {mode}')
lines.append(f'**类型:** {mt_type} **复杂度:** {complexity} **领域:** {domain}')
lines.append(f'')
lines.append(f'---')
lines.append(f'')
lines.append(f'## P0 问题解构')
lines.append(f'')
lines.append(f'- **核心主题:** {topic}')
if ents:
lines.append(f'- **关键实体:** {\"、\".join(ents)}')
if cons:
lines.append(f'- **约束条件:** {\"、\".join(cons)}')
if mets:
lines.append(f'- **成功指标:** {\"、\".join(mets)}')
lines.append(f'')
lines.append(f'---')
lines.append(f'')
lines.append(f'## M1 发散模态')
lines.append(f'')
for d in div_data.get('dimensions', []):
lines.append(f'### {d.get(\"name\",\"\")}')
lines.append(f'> {d.get(\"angle\",\"\")}')
lines.append(f'')
for p in d.get('perspectives', []):
lines.append(f'- {p}')
wc = d.get('wild_card','')
if wc:
lines.append(f'- ★ **野卡:** {wc}')
lines.append(f'')
lines.append(f'---')
lines.append(f'')
lines.append(f'## M2 收敛模态')
lines.append(f'')
lines.append(f'### 核心洞察')
for ins in conv_data.get('core_insights', []):
lines.append(f'- **[{ins.get(\"impact\",\"\")}]** {ins.get(\"insight\",\"\")} — {ins.get(\"rationale\",\"\")}')
lines.append(f'')
if prio_data:
lines.append(f'### 优先级矩阵')
lines.append(f'')
lines.append(f'| 立即行动 | 计划推进 | 观察等待 | 可以忽略 |')
lines.append(f'|----------|----------|----------|----------|')
do_f = '<br>'.join(prio_data.get('do_first',[]))
plan = '<br>'.join(prio_data.get('plan_next',[]))
watch = '<br>'.join(prio_data.get('watch_later',[]))
drop = '<br>'.join(prio_data.get('drop',[]))
lines.append(f'| {do_f} | {plan} | {watch} | {drop} |')
lines.append(f'')
lines.append(f'**综合结论:** {conv_synth}')
lines.append(f'')
lines.append(f'---')
lines.append(f'')
if verdict:
lines.append(f'## M3 批判模态')
lines.append(f'')
for a in crit_data.get('assumptions_challenged', []):
lines.append(f'- **[{a.get(\"severity\",\"\")}]** 假设:{a.get(\"assumption\",\"\")} → {a.get(\"challenge\",\"\")}')
lines.append(f'')
bls = crit_data.get('blind_spots', [])
if bls:
lines.append(f'**盲点:** {\"、\".join(bls)}')
lines.append(f'')
risks = crit_data.get('risks', [])
if risks:
lines.append(f'| 风险 | 概率 | 影响 | 缓解策略 |')
lines.append(f'|------|------|------|----------|')
for r in risks:
lines.append(f'| {r.get(\"risk\",\"\")} | {r.get(\"probability\",\"\")} | {r.get(\"impact\",\"\")} | {r.get(\"mitigation\",\"\")} |')
lines.append(f'')
lines.append(f'**裁定:** {verdict}')
if steelman:
lines.append(f'**最强辩护:** {steelman}')
lines.append(f'')
lines.append(f'---')
lines.append(f'')
lines.append(f'## M4 创造模态')
lines.append(f'')
for i, sol in enumerate(crea_data.get('final_solutions', [])):
lines.append(f'### 方案 {chr(65+i)}:{sol.get(\"title\",\"\")}')
lines.append(f'')
lines.append(f'**核心理念:** {sol.get(\"core_idea\",\"\")}')
lines.append(f'')
for s in sol.get('action_steps', []):
lines.append(f'{s.get(\"step\",\"\")}. {s.get(\"action\",\"\")}({s.get(\"timeline\",\"\")})')
lines.append(f'')
lines.append(f'**预期结果:** {sol.get(\"expected_outcome\",\"\")}')
lines.append(f'')
lines.append(f'---')
lines.append(f'')
lines.append(f'## 一句话答案')
lines.append(f'')
lines.append(f'> {one_sentence}')
lines.append(f'')
if meta_insight:
lines.append(f'## 元认知洞察')
lines.append(f'')
lines.append(f'{meta_insight}')
lines.append(f'')
lines.append(f'---')
lines.append(f'*由 MiroThinker 1.7 生成 | openclaw skill*')
print('\n'.join(lines))
" > "$report_file"
echo "$report_file"
}
# ════════════════════════════════════════════════════════════════
# 主渲染逻辑
# ════════════════════════════════════════════════════════════════
echo ""
echo -e "BOLD╔══════════════════════════════════════════════════════════════╗RESET"
echo -e "BOLD║ 思维导图RESET"
echo -e "BOLD╚══════════════════════════════════════════════════════════════╝RESET"
echo ""
# 问题根节点
echo -e "BOLDMT_QUERYRESET"
# M1 发散分支
echo -e "├── CYAN[M1 发散]RESET 可能性空间"
render_diverge_tree
# M2 收敛分支
echo -e "├── GREEN[M2 收敛]RESET 核心洞察"
render_converge_tree
# M3 批判分支(quick 模式跳过)
if [ "-full" != "quick" ] && [ -n "-" ]; then
echo -e "├── YELLOW[M3 批判]RESET 风险审查"
render_critique_tree
fi
# M4 创造分支
echo -e "└── BOLD[M4 创造]RESET 综合方案"
render_create_tree
# 置信度仪表
echo ""
echo -e "BOLD◆ 置信度分布RESET"
render_confidence_bar
# 元认知洞察
if [ -n "-" ]; then
echo ""
echo -e "BOLD◆ 元认知洞察RESET"
echo -e " DIM\"MT_META_INSIGHT\"RESET"
fi
# 统计信息
echo ""
echo -e "DIM─────────────────────────────────────────────────────────────RESET"
echo -e "DIM耗时:-0s 模式:-full 复杂度:-MEDIUMRESET"
# 保存报告
if [ "-0" = "1" ]; then
report_file=$(generate_markdown)
echo -e "GREEN✓ 报告已保存:report_fileRESET"
fi
echo ""
FILE:think.sh
#!/usr/bin/env bash
# think.sh — MiroThinker 1.7 入口脚本
# bash 3.2 兼容
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
# ── 颜色定义 ─────────────────────────────────────────────────
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
BOLD='\033[1m'
DIM='\033[2m'
RESET='\033[0m'
# ── 依赖检查 ─────────────────────────────────────────────────
check_deps() {
if ! command -v openclaw >/dev/null 2>&1; then
echo -e "RED错误:未找到 openclaw 命令RESET"
echo "请先安装 openclaw CLI: https://openclaw.ai"
exit 1
fi
if ! command -v python3 >/dev/null 2>&1; then
echo -e "RED错误:需要 python3RESET"
exit 1
fi
}
# ── 使用说明 ─────────────────────────────────────────────────
show_usage() {
cat <<'EOF'
用法:
openclaw run mirothinker1.7 "你的问题"
openclaw run mirothinker1.7 --mode=quick "快速分析问题"
openclaw run mirothinker1.7 --mode=deep "深度分析重大决策"
openclaw run mirothinker1.7 --save "分析问题并保存报告"
openclaw run mirothinker1.7 --file=problem.txt
选项:
--mode=quick 快速模式,跳过 M3 批判(约 1-2 分钟)
--mode=full 标准模式,默认(约 2-4 分钟)
--mode=deep 深度模式,M2 迭代修订(约 4-6 分钟)
--save 保存 Markdown 报告到 /tmp/
--file=FILE 从文件读取问题
EOF
}
# ── 参数解析 ─────────────────────────────────────────────────
parse_args() {
export MT_MODE="full"
export MT_SAVE_REPORT=0
export MT_QUERY=""
export MT_TIMESTAMP
MT_TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
export MT_REPORT_PATH="/tmp"
while [ $# -gt 0 ]; do
case "$1" in
--mode=*)
MT_MODE="1#--mode="
;;
--file=*)
local f="1#--file="
if [ -f "$f" ]; then
MT_QUERY=$(cat "$f")
else
echo -e "RED错误:文件不存在: $fRESET"
exit 1
fi
;;
--save)
MT_SAVE_REPORT=1
;;
--help|-h)
show_usage
exit 0
;;
-*)
echo -e "YELLOW未知参数: $1,已忽略RESET"
;;
*)
if [ -z "$MT_QUERY" ]; then
MT_QUERY="$1"
fi
;;
esac
shift
done
# 从 stdin 读取(管道输入)
if [ -z "$MT_QUERY" ] && [ ! -t 0 ]; then
MT_QUERY=$(cat)
fi
if [ -z "$MT_QUERY" ]; then
show_usage
exit 1
fi
export MT_QUERY MT_MODE MT_SAVE_REPORT MT_REPORT_PATH MT_TIMESTAMP
}
# ── 进度提示 ─────────────────────────────────────────────────
show_stage() {
local icon="$1"
local name="$2"
local current="$3"
local total="$4"
printf "\nCYAN[%d/%d]RESET %s BOLD%sRESET...\n" \
"$current" "$total" "$icon" "$name"
}
# ── 主流程 ───────────────────────────────────────────────────
main() {
check_deps
parse_args "$@"
local start_time=$SECONDS
# 确定执行阶段数
case "$MT_MODE" in
quick) export MT_TOTAL_STAGES=4 ;;
deep) export MT_TOTAL_STAGES=6 ;;
*) export MT_TOTAL_STAGES=5 ;;
esac
echo ""
echo -e "BOLD╔══════════════════════════════════════════════════════════════╗RESET"
echo -e "BOLD║ MIROTHINKER 1.7 • 深度思考引擎RESET"
printf "BOLD║ 问题:%-54s║RESET\n" "$(echo "$MT_QUERY" | head -c 50)"
echo -e "BOLD║ 模式:%-10sRESET" "$MT_MODE"
echo -e "BOLD╚══════════════════════════════════════════════════════════════╝RESET"
# source 共享变量空间
# shellcheck source=analyze.sh
source "$SCRIPT_DIR/analyze.sh"
export MT_ELAPSED=$((SECONDS - start_time))
# shellcheck source=render.sh
source "$SCRIPT_DIR/render.sh"
}
main "$@"
Reddit & X/Twitter auto-reply bot for ecommerce/SaaS growth. Finds relevant posts about AI customer service, Amazon FBA, Shopify — posts genuine AI-generated...
---
name: social-reply-bot
description: "Reddit & X/Twitter auto-reply bot for ecommerce/SaaS growth. Finds relevant posts about AI customer service, Amazon FBA, Shopify — posts genuine AI-generated replies mentioning your product. Includes Reddit account warmup (karma building) and lead tracking. Triggers: social reply bot, reddit auto reply, twitter auto reply, x auto reply, social media bot, amazon seller engagement, ecommerce social engagement, reddit warmup, karma building, warmup reddit, social leads, potential customers"
allowed-tools: Bash
metadata:
openclaw:
homepage: https://github.com/mguozhen/social-bot
---
# Social Reply Bot
Automatically finds and replies to relevant Reddit and X/Twitter posts about ecommerce, Amazon FBA, and AI customer service. Also builds Reddit account karma and tracks potential customer leads.
## Commands
```
social reply bot # run both platforms
social reply bot x only # X/Twitter only
social reply bot reddit only # Reddit only
social reply bot warmup # build Reddit karma (8 comments)
social reply bot warmup 15 # warmup with custom target
social reply bot leads # show potential customers found
social reply bot stats # today's stats
social reply bot dashboard # open web dashboard
```
## Setup
```bash
curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
```
## Requirements
- `browse` CLI: `npm install -g @browserbasehq/browse-cli`
- Log in to Reddit and X in the browse-controlled Chrome window
- `ANTHROPIC_API_KEY` in `.env`
## Features
### Daily Reply Bot
- Searches subreddits and X for posts matching your product keywords
- Claude generates genuine, on-topic replies (not spam)
- Browser automation — no Reddit/X API key needed
- SQLite deduplication — never replies to the same post twice
### Reddit Warmup (karma building)
- Visits low-moderation subreddits (r/karma, r/CasualConversation, r/self)
- Claude Haiku generates authentic short comments (no product mentions)
- Natural delays between posts (90–180s)
- Builds Comment Karma to unlock restricted subreddits
### Lead Tracking
- Every replied post analyzed by Claude for customer potential
- Scored 1–10 with urgency level
- Extracts business type and pain points
- View with: `social reply bot leads`
## Configuration
Edit `~/social-bot/config.json` to set your subreddits, X search queries, product descriptions, and daily targets.
FILE:README.md
<p align="center">
<img src="https://cdn-icons-png.flaticon.com/512/4712/4712027.png" width="80" alt="Social Bot">
</p>
<h1 align="center">Social Reply Bot</h1>
<p align="center">
<strong>AI-powered Reddit & X auto-reply bot that finds your customers and joins the conversation.</strong>
</p>
<p align="center">
<a href="#quick-start"><img src="https://img.shields.io/badge/setup-5min-brightgreen?style=flat-square" alt="5min Setup"></a>
<a href="https://agentskills.io"><img src="https://img.shields.io/badge/Agent%20Skills-compatible-blue?style=flat-square" alt="Agent Skills"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-MIT-green?style=flat-square" alt="MIT License"></a>
<img src="https://img.shields.io/badge/python-3.9+-3776AB?style=flat-square&logo=python&logoColor=white" alt="Python 3.9+">
<img src="https://img.shields.io/badge/platforms-Reddit%20%2B%20X-FF4500?style=flat-square" alt="Reddit + X">
</p>
<p align="center">
<a href="#quick-start">Quick Start</a> •
<a href="#how-it-works">How It Works</a> •
<a href="#features">Features</a> •
<a href="#screenshots">Screenshots</a> •
<a href="#configuration">Configuration</a> •
<a href="#architecture">Architecture</a>
</p>
---
## What is this?
**Social Reply Bot** searches Reddit and X/Twitter for posts relevant to your product, then uses Claude AI to generate genuine, helpful replies that naturally mention your brand. No API keys for Reddit or X required — it uses browser automation to act like a real user.
**Not spam.** The AI evaluates every post for relevance first. If a post isn't a good fit, it skips it. When it does reply, it sounds like an experienced practitioner sharing real insight — because that's what the prompt engineers it to be.
Currently configured for two products:
- **[Solvea](https://solvea.cx)** — AI customer service agent for Amazon/Shopify sellers
- **[VOC.ai](https://voc.ai)** — Amazon review intelligence and sentiment analysis
But it's fully configurable for any product/niche via `config.json`.
## Features
| Feature | Description |
|---------|-------------|
| **Dual Platform** | Reddit (subreddit search) + X/Twitter (keyword search) |
| **AI-Generated Replies** | Claude crafts genuine, on-topic responses — not templates |
| **Smart Filtering** | Two-layer filter: keyword match first, then AI relevance check |
| **Browser Automation** | No platform API keys needed — uses real browser sessions |
| **Reddit Warmup** | Karma-building mode for new accounts (safe subreddits, no product mentions) |
| **Lead Tracking** | Every replied post scored 1-10 for customer potential |
| **Web Dashboard** | Real-time Flask dashboard showing daily progress |
| **Deduplication** | SQLite-backed — never replies to the same post twice |
| **Rate Limiting** | Configurable delays (Reddit: 10min, X: 5min between replies) |
| **Daily Scheduling** | macOS LaunchAgent runs at 10:05 AM automatically |
## How It Works
```
Keyword Search (Reddit subreddits + X queries)
↓
Two-Layer Relevance Filter
├─ Layer 1: Keyword match (zero API cost)
└─ Layer 2: Claude AI judges if post is worth replying to
↓
Claude generates reply as "experienced seller sharing real insight"
↓
Browser automation posts the reply
↓
SQLite logs it → Dashboard displays it → Lead scored
```
**The core principle:** Every reply must provide genuine value. The AI is prompted as a "seller with 5 years of experience" who naturally mentions tools they use — not as a marketer pushing a product. Posts that don't fit get skipped, not force-fitted.
## Screenshots
### Dashboard — Real-time Daily Progress
<p align="center">
<img src="docs/screenshot_dashboard.png" width="700" alt="Dashboard showing daily reply targets and progress">
</p>
The web dashboard tracks daily X posts, Reddit comments, product mentions, and historical totals.
### X/Twitter — Targeted Competitive Replies
<p align="center">
<img src="docs/screenshot_x_reply.png" width="700" alt="AI-generated reply on X to a competitor's post">
</p>
The bot found a competitor's tweet about customer support automation and replied with a genuine insight that naturally positioned Solvea as an alternative. No "Great product!" fluff — real technical substance.
### Reddit — High-Quality Community Comments
<p align="center">
<img src="docs/screenshot_reddit_reply.png" width="700" alt="AI-generated Reddit comment about WooCommerce chatbot issues">
</p>
On r/ecommerce, a user discussed WooCommerce chatbot inventory sync issues. The bot shared specific technical experience (event-triggered vs. scheduled sync), earning genuine upvotes and discussion.
## Quick Start
### Option 1: One-Line Install
```bash
curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
```
This will:
1. Clone the repo to `~/social-bot`
2. Install Python dependencies (`anthropic`, `flask`)
3. Check for `browse` CLI
4. Guide you through `.env` setup
5. Initialize SQLite database
6. Register macOS LaunchAgent (daily 10:05 AM)
### Option 2: Manual Setup
```bash
git clone https://github.com/mguozhen/social-bot.git ~/social-bot
cd ~/social-bot
pip3 install -r requirements.txt
cp .env.template .env
# Edit .env with your ANTHROPIC_API_KEY
```
### Prerequisites
| Requirement | How to Get It |
|------------|---------------|
| Python 3.9+ | Pre-installed on macOS |
| `browse` CLI | `npm install -g @anthropic-ai/browse-cli` |
| Anthropic API Key | [console.anthropic.com](https://console.anthropic.com) |
| Reddit account | Log in once in the browse-controlled Chrome |
| X/Twitter account | Log in once in the browse-controlled Chrome |
> **No Reddit API key or Twitter API key required.** The bot uses browser sessions, not platform APIs.
## Commands
### As Claude Code Skill
Once installed as a skill, just tell Claude:
```
social reply bot # run both platforms
social reply bot x only # X/Twitter only
social reply bot reddit only # Reddit only
social reply bot warmup # build Reddit karma (8 comments)
social reply bot warmup 15 # warmup with custom target
social reply bot leads # show potential customers found
social reply bot stats # today's stats
social reply bot dashboard # open web dashboard
```
### As Standalone Script
```bash
python3 run_daily.py # run both platforms
python3 run_daily.py --x-only # X only
python3 run_daily.py --reddit-only # Reddit only
python3 warmup_reddit.py # karma building mode
```
## Configuration
Edit `config.json` to customize for your product:
```json
{
"x": {
"username": "@YourAccount",
"daily_target": 20,
"min_delay_seconds": 300,
"search_queries": [
"your product keyword 1",
"your product keyword 2"
]
},
"reddit": {
"username": "your_reddit_user",
"daily_target": 10,
"min_delay_seconds": 600,
"subreddits": ["YourTargetSubreddit", "AnotherOne"]
},
"products": {
"YourProduct": {
"description": "What your product does (AI uses this to craft mentions)",
"trigger_keywords": ["keyword1", "keyword2"]
}
},
"reply_style": {
"tone": "knowledgeable practitioner sharing experience",
"max_length_x": 260,
"max_length_reddit": 400,
"rules": [
"Lead with genuine insight about the post",
"Mention product as 'what we use/built' — not as an ad",
"Skip if not relevant — never force a mention"
]
}
}
```
### Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `ANTHROPIC_API_KEY` | Yes | Claude API key for reply generation |
| `BROWSERBASE_API_KEY` | No | Optional: persistent browser sessions across reboots |
| `BROWSERBASE_PROJECT_ID` | No | Optional: pairs with Browserbase key |
## Architecture
```
social-bot/
├── bot/
│ ├── ai_engine.py # Claude AI: relevance filter + reply generation
│ ├── browser.py # Browser automation via browse CLI
│ ├── db.py # SQLite: dedup, history, lead scoring
│ ├── reddit_bot.py # Reddit: search subreddits, post comments
│ └── x_bot.py # X/Twitter: search queries, post replies
├── dashboard/
│ ├── app.py # Flask web dashboard
│ └── templates/ # Dashboard HTML
├── docs/
│ ├── screenshot_dashboard.png
│ ├── screenshot_reddit_reply.png
│ └── screenshot_x_reply.png
├── launchd/ # macOS LaunchAgent plist
├── config.json # Platform targets, keywords, products
├── run_daily.py # Main entry point (scheduled daily)
├── warmup_reddit.py # Reddit karma builder
├── install.sh # One-line installer
├── setup.sh # Manual setup helper
├── SKILL.md # Claude Code / Agent Skills definition
└── .env.template # Credential template
```
### Two-Layer Filtering
```python
# Layer 1: Fast keyword match (zero API cost)
def detect_product(text: str) -> Optional[str]:
# Returns matching product name or None
# Layer 2: Claude AI relevance check
# If Layer 1 matches, Claude decides:
# - Is this post genuinely relevant?
# - Can we add real value with a reply?
# - SKIP if forcing a mention would feel unnatural
```
### Rate Limiting & Safety
| Platform | Delay Between Replies | Daily Cap | Deduplication |
|----------|----------------------|-----------|---------------|
| Reddit | 10 minutes | 10 posts | SQLite (URL-based) |
| X/Twitter | 5 minutes | 20 posts | SQLite (URL-based) |
## Reddit Warmup
New Reddit accounts need karma before posting in restricted subreddits. The warmup mode:
- Visits safe, low-moderation subreddits (r/karma, r/CasualConversation, r/self)
- Claude Haiku generates authentic short comments (no product mentions)
- 90-180 second natural delays between posts
- Default: 8 comments per session, configurable
```bash
python3 warmup_reddit.py # default 8 comments
python3 warmup_reddit.py --target 15 # custom target
```
## Expected Results
With default settings (X: 20/day, Reddit: 10/day):
| Metric | Monthly |
|--------|---------|
| Posts covered | 600+ targeted posts |
| Users reached | 600+ people actively discussing your niche |
| Click-through | ~10-15% visit your profile/link |
| Cost | ~$0.30/day (Claude API at ~$0.01/reply) |
This isn't viral growth — it's **consistent, targeted brand presence** in conversations where your product is genuinely relevant.
## Known Limitations
| Issue | Cause | Workaround |
|-------|-------|------------|
| Reddit comments auto-removed | Account karma < 10 or age < 10 days | Run `warmup` mode for 1-2 weeks first |
| Some X replies fail | Page structure changes or rate limits | Auto-retry with backoff, skips on persistent failure |
| Low match rate on some days | Today's posts don't match your keywords | Add more subreddits and search queries |
## FAQ
<details>
<summary><strong>Is this against Reddit/X terms of service?</strong></summary>
This tool uses browser automation (not API abuse) and generates unique, contextually relevant responses (not spam). However, automated posting always carries platform risk. Use responsibly, keep daily volumes reasonable, and ensure your replies genuinely add value to conversations.
</details>
<details>
<summary><strong>How do I add my own product?</strong></summary>
Edit the `products` section in `config.json`. Add your product name, description (used by Claude to craft natural mentions), and trigger keywords. Then update `search_queries` and `subreddits` to target where your audience hangs out.
</details>
<details>
<summary><strong>Can I use this without Claude Code?</strong></summary>
Yes. `run_daily.py` is a standalone script. You just need Python 3.9+, the `browse` CLI, and an Anthropic API key. The Claude Code skill integration is optional.
</details>
<details>
<summary><strong>How much does it cost to run?</strong></summary>
About $0.01 per reply (Claude API). At 30 replies/day, that's roughly $9/month. Browser automation is free — no Reddit or X API fees.
</details>
## Contributing
PRs welcome. Please test your changes with both platforms before submitting.
## License
[MIT](LICENSE)
---
<p align="center">
Built with <a href="https://claude.ai">Claude AI</a> • Browser automation via <a href="https://www.npmjs.com/package/@anthropic-ai/browse-cli">browse CLI</a>
</p>
FILE:article_social_bot.md
# 我用 AI 让账号每天自动在 Reddit 和 X 上发 30 条高质量回复,完全不用人工干预
> 作为跨境电商卖家/工具创业者,我们都知道社媒曝光有多重要——但每天手动刷帖子回复,成本太高。这篇文章讲我怎么用 Claude + 浏览器自动化,搭了一套 Social Reply Bot,现在每天自动在 X 和 Reddit 帮 Solvea 和 VOC.ai 做精准曝光。
---
## 🤔 我在解决什么问题
做 AI 工具的,最难的不是产品,是**让目标用户知道你存在**。
我们的两个产品:
- **Solvea** — Amazon/Shopify 卖家的 AI 客服 Agent,自动回答买家问题
- **VOC.ai** — Amazon 差评分析工具,把 1-star 变成优化 listing 的情报
用户在哪?就在 Reddit 的 r/FulfillmentByAmazon、r/ecommerce,在 X 上搜 "AI customer service ecommerce"。
他们每天都在发帖问问题:
- "我的客服自动化怎么配置?"
- "怎么分析哪些差评影响 BSR 最大?"
- "有没有 AI 工具能处理买家消息?"
**这些帖子,每一条都是精准获客机会。** 但手动回复太累,外包给人又很难保证质量。
---
## ⚙️ 系统架构:三层设计
```
搜索引擎(关键词匹配)
↓
浏览器自动化(browse CLI)
↓
Claude AI(生成真实、有价值的回复)
↓
发帖 → 记录 → Dashboard 监控
```
**核心理念:不是垃圾广告,而是真实的行业洞见**
AI 会先判断这个帖子是否和我们的产品相关。如果相关,它会以"有 5 年经验的亚马逊卖家"身份,分享真实经验,自然带出产品。如果不相关,直接 skip,绝不强行植入。
---
## 📸 实际运行效果
### 1. Dashboard — 实时监控每日进度

系统全自动运行,Dashboard 显示:
- 今日 X 发帖:1条(目标 20条)
- 今日 Reddit 评论:1条(目标 10条)
- Solvea 被提及:2次
- 历史总回复:2条(刚启动第一天)
---
### 2. X (Twitter) — 精准找到目标帖子,秒速回复
系统搜索关键词 `"ecommerce customer support automation"`,找到竞品 WukongChat 的推文:
> **@wukongchat** 发帖:Customer support made simple. WukongChat AI automates replies, categorizes inquiries...
**AI 生成的回复(以 @VocAiSage 身份发出):**

> "Centralizing convos is step 1 — but the real win is automating *resolution*, not just visibility. We use Solvea to actually *answer* buyers using order data, not just route tickets. Dashboard unification means nothing if agents still do manual lookups."
**分析这条回复为什么有效:**
- ✅ 不是广告语气,是行业从业者的真实观点
- ✅ 指出了对方产品的局限(路由 ≠ 解决)
- ✅ 自然带出 Solvea,以"我们用的工具"的方式
- ✅ 260 字以内,符合 X 的最佳互动长度
---
### 3. Reddit — 深度评论,带来真实讨论
r/ecommerce 上有人发帖讨论 WooCommerce chatbot 的库存同步坑:
**AI 生成并自动发出的评论(账号:mguozhen):**

> "That last point hits hard — customers absolutely hold the *store* accountable for what the bot says. No separation in their mind.
>
> We burned inventory sync trust early on too. Daily sync sounds safe until you're running flash sales or dealing with supplier delays. The only real fix was event-triggered sync, not scheduled. Bad review from a bot mistake is genuinely worse than a human CS mistake because it feels systemic to the customer."
**为什么这条评论质量高:**
- ✅ 先共鸣,再输出洞见("That last point hits hard")
- ✅ 分享了具体的技术方案(event-triggered vs. scheduled sync)
- ✅ 提供了额外价值(bot 失误比人工失误评论影响更大)
- ✅ 获得了其他用户的正向互动
---
## 🔧 技术实现细节
### 关键词匹配 → AI 过滤的双重机制
```python
# 第一层:关键词快速过滤(不消耗 API)
def detect_product(text: str) -> Optional[str]:
keywords = {
"Solvea": ["customer service", "ai agent", "chatbot", "support automation"],
"VOC.ai": ["amazon review", "vine review", "review analysis", "1-star"]
}
# 匹配得分最高的产品,无匹配返回 None
# 第二层:Claude 判断是否值得回复
# 提示词让 Claude 以"有 5 年经验的 Amazon 卖家"身份
# 不相关时返回 SKIP,不强行植入
```
### 浏览器自动化:不需要任何平台 API
用 `browse` CLI 控制本地 Chrome:
- Reddit:账号用 Google OAuth 登录,直接操作 old.reddit.com
- X:账号 @VocAiSage 已登录,search → 找相关推文 → 发回复
**完全不需要 Reddit API Key 或 Twitter API Key**,用的是浏览器会话,成本为零。
### 防重复 + 速率控制
```python
# 每条帖子只回复一次(无论成功失败)
def already_replied(post_url: str) -> bool:
# 查 SQLite,posted 和 failed 都不重试
# Reddit 每条间隔 10 分钟
# X 每条间隔 5 分钟
# 每天 X 最多 20 条,Reddit 最多 10 条
```
---
## 📊 已知局限和解决方案
| 问题 | 原因 | 解决方案 |
|------|------|----------|
| Reddit 新账号评论被自动删除 | 账号 karma < 10 或注册 < 10 天 | 先手动养号 2 周,刷满基础 karma |
| X 某些帖子回复失败 | 页面结构变化/rate limit | 已加重试逻辑,失败后跳过不死循环 |
| FBA 子版块匹配率低 | 今天的帖子话题不对口 | 多配几个 subreddit,r/ecommerce 效果更好 |
---
## 🚀 如何自己搭一套
### 方案一:一键安装到新 Mac
```bash
curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
```
安装脚本会自动完成:
1. Clone 代码
2. 安装 Python 依赖(anthropic, flask)
3. 检查 browse CLI
4. 引导填写 `.env`(只需要 ANTHROPIC_API_KEY)
5. 初始化 SQLite 数据库
6. 注册 macOS LaunchAgent(每天 10:05 自动运行)
### 方案二:通过 openclaw 一行调用
如果你已经安装了 openclaw:
```bash
clawhub install social-reply-bot
```
然后直接说:
- `"social reply bot"` — 运行两个平台
- `"social reply bot x only"` — 只跑 X
- `"social reply bot stats"` — 查看今日数据
- `"social reply bot dashboard"` — 打开可视化面板
### 配置文件说明
编辑 `~/social-bot/config.json`,自定义你的场景:
```json
{
"x": {
"daily_target": 20,
"search_queries": [
"你的关键词1",
"你的关键词2"
]
},
"reddit": {
"daily_target": 10,
"subreddits": ["你的目标社区"]
},
"products": {
"你的产品名": {
"description": "产品描述,AI 会用这个决定怎么提及",
"trigger_keywords": ["相关关键词"]
}
}
}
```
---
## 💡 适合哪些场景
✅ **跨境电商工具/SaaS** — 目标用户在 Reddit/X 上非常活跃,且乐于分享经验
✅ **B2B 产品冷启动** — 社区回复是最自然的 PLG 方式,比广告信任度高 10 倍
✅ **个人品牌建设** — 以专家身份持续输出,积累行业影响力
✅ **竞品监控+截流** — 搜竞品关键词,在竞品帖子下自然推荐自己
❌ **不适合**:纯 C 端消费品(用户不在专业论坛)、需要图片/视频的场景
---
## 📈 预期效果
按目前配置(X 20条/天,Reddit 10条/天):
- 每月覆盖 **600+ 条精准帖子**
- 触达 **600+ 位正在讨论相关问题的用户**
- 其中约 10-15% 会点击主页/链接了解更多
这不是一夜暴富的流量,而是**持续的、精准的品牌曝光**,在目标用户最需要你的时刻出现。
---
## 🔗 资源链接
- GitHub 仓库:https://github.com/mguozhen/social-bot
- clawhub skill:`clawhub install social-reply-bot`
- 所需费用:约 $0.01/条回复(Claude API),其余零成本
---
*用 Claude Code + browse CLI 构建,完整代码开源。有问题欢迎在评论区讨论。*
FILE:bot/__init__.py
FILE:bot/ai_engine.py
import os
import anthropic
import json
from pathlib import Path
from typing import Optional, Tuple
CONFIG = json.loads((Path(__file__).parent.parent / "config.json").read_text())
def detect_product(text: str) -> Optional[str]:
"""Return 'Solvea', 'VOC.ai', or None based on keyword match."""
text_lower = text.lower()
scores = {}
for product, data in CONFIG["products"].items():
score = sum(1 for kw in data["trigger_keywords"] if kw in text_lower)
if score > 0:
scores[product] = score
if not scores:
return None
return max(scores, key=scores.get)
def generate_reply(post_title: str, post_content: str, platform: str) -> Tuple[Optional[str], Optional[str]]:
"""
Returns (reply_text, product_mentioned) or (None, None) if should skip.
"""
product = detect_product(f"{post_title} {post_content}")
if not product:
return None, None
product_desc = CONFIG["products"][product]["description"]
style_rules = "\n".join(f"- {r}" for r in CONFIG["reply_style"]["rules"])
max_len = (CONFIG["reply_style"]["max_length_x"] if platform == "x"
else CONFIG["reply_style"]["max_length_reddit"])
system_prompt = f"""You are a hands-on Amazon/Shopify seller and builder who has been selling for 5+ years.
You reply to social media posts with genuine insights from your own experience.
You sometimes mention {product} ({product_desc}) as a tool you personally use — but only when it's directly relevant.
Reply style rules:
{style_rules}
Max length: {max_len} characters for {platform}. Be concise."""
user_prompt = f"""Post title: {post_title}
Post content:
{post_content[:800]}
Write a reply that adds real value. Mention {product} only if it fits naturally.
If it doesn't fit, reply with just: SKIP
Output only the reply text, nothing else."""
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
message = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=400,
messages=[{"role": "user", "content": user_prompt}],
system=system_prompt,
)
reply = message.content[0].text.strip()
if reply.upper().startswith("SKIP") or len(reply) < 20:
return None, None
# Trim to platform max
if len(reply) > max_len + 50:
reply = reply[:max_len].rsplit(" ", 1)[0] + "..."
return reply, product
def analyze_lead(post_title: str, post_content: str, post_url: str, platform: str) -> Optional[dict]:
"""
判断发帖人是否是 Solvea 的潜在客户,并提取关键信息。
返回 dict 或 None(不是潜在客户)。
"""
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
user_prompt = f"""Analyze this social media post and determine if the author is a potential customer for Solvea.
Solvea is an AI customer support agent for Shopify/ecommerce stores that:
- Autonomously handles support tickets (tracking, returns, product questions)
- Integrates directly with Shopify to take actions (process returns, update shipping)
- Provides a unified inbox for human handoff
Post URL: {post_url}
Platform: {platform}
Title: {post_title}
Content: {post_content[:600]}
Respond in JSON only:
{{
"is_lead": true/false,
"lead_score": 1-10,
"pain_points": ["list of pain points mentioned"],
"business_type": "shopify store / amazon seller / saas / other / unknown",
"urgency": "high / medium / low",
"reason": "one sentence why they are or aren't a lead"
}}
Only return JSON, nothing else."""
try:
msg = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=300,
messages=[{"role": "user", "content": user_prompt}],
)
text = msg.content[0].text.strip()
# Extract JSON
import re
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if not json_match:
return None
data = json.loads(json_match.group())
if not data.get("is_lead"):
return None
data["post_url"] = post_url
data["platform"] = platform
data["post_title"] = post_title
return data
except Exception:
return None
FILE:bot/browser.py
"""
Thin wrapper around the `browse` CLI.
All commands return parsed JSON or raise BrowseError.
"""
import subprocess
import json
import time
import re
class BrowseError(Exception):
pass
def _run(args: str, timeout: int = 30) -> dict:
result = subprocess.run(
f"browse {args}",
shell=True,
capture_output=True,
text=True,
timeout=timeout,
)
out = result.stdout.strip()
if not out:
return {}
try:
return json.loads(out)
except json.JSONDecodeError:
return {"raw": out}
def open_url(url: str) -> dict:
return _run(f'open "{url}"', timeout=20)
def snapshot() -> str:
"""Returns the accessibility tree as a raw string."""
data = _run("snapshot", timeout=15)
return data.get("tree", "")
def screenshot(path: str) -> str:
_run(f'screenshot "{path}"', timeout=15)
return path
def click(ref: str) -> bool:
data = _run(f"click @{ref}", timeout=10)
return data.get("clicked", False)
def click_xy(x: int, y: int) -> bool:
data = _run(f"click {x} {y}", timeout=10)
return data.get("clicked", False)
def type_text(text: str) -> bool:
# Escape single quotes in text
safe = text.replace("'", "\\'")
data = _run(f"type '{safe}'", timeout=15)
return data.get("typed", False)
def press(key: str) -> bool:
data = _run(f"press '{key}'", timeout=10)
return bool(data)
def scroll(x: int, y: int, dx: int, dy: int) -> bool:
data = _run(f"scroll {x} {y} {dx} {dy}", timeout=10)
return data.get("scrolled", False)
def get_url() -> str:
data = _run("get url", timeout=10)
return data.get("url", "")
def find_refs(tree: str, pattern: str) -> list[str]:
"""Find element refs matching pattern in the accessibility tree."""
return re.findall(rf'\[(\d+-\d+)\] {pattern}', tree)
def find_text_refs(tree: str, text: str) -> list[str]:
"""Find refs of elements containing given text."""
escaped = re.escape(text)
return re.findall(rf'\[(\d+-\d+)\][^\n]*{escaped}', tree)
def wait_seconds(n: float):
time.sleep(n)
FILE:bot/db.py
import sqlite3
import os
from datetime import datetime, date
from pathlib import Path
DB_PATH = Path(__file__).parent.parent / "logs" / "social_bot.db"
def get_conn():
DB_PATH.parent.mkdir(exist_ok=True)
conn = sqlite3.connect(str(DB_PATH))
conn.row_factory = sqlite3.Row
return conn
def init_db():
with get_conn() as conn:
conn.executescript("""
CREATE TABLE IF NOT EXISTS replies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL, -- 'x' or 'reddit'
post_url TEXT NOT NULL,
post_title TEXT,
post_snippet TEXT,
reply_text TEXT NOT NULL,
product TEXT, -- 'Solvea' | 'VOC.ai' | None
status TEXT DEFAULT 'posted', -- 'posted' | 'failed' | 'skipped'
error_msg TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS daily_stats (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_date TEXT NOT NULL,
platform TEXT NOT NULL,
target INTEGER,
posted INTEGER DEFAULT 0,
failed INTEGER DEFAULT 0,
skipped INTEGER DEFAULT 0,
duration_secs INTEGER,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_replies_platform_date
ON replies(platform, created_at);
CREATE INDEX IF NOT EXISTS idx_replies_post_url
ON replies(post_url);
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
platform TEXT NOT NULL,
post_url TEXT NOT NULL UNIQUE,
post_title TEXT,
business_type TEXT,
pain_points TEXT, -- JSON array
lead_score INTEGER,
urgency TEXT,
reason TEXT,
replied INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now'))
);
""")
def log_reply(platform, post_url, post_title, post_snippet, reply_text,
product=None, status="posted", error_msg=None):
with get_conn() as conn:
conn.execute("""
INSERT INTO replies
(platform, post_url, post_title, post_snippet, reply_text, product, status, error_msg)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (platform, post_url, post_title, post_snippet, reply_text,
product, status, error_msg))
def already_replied(post_url: str) -> bool:
with get_conn() as conn:
row = conn.execute(
"SELECT id FROM replies WHERE post_url = ? AND status IN ('posted', 'failed')",
(post_url,)
).fetchone()
return row is not None
def get_today_count(platform: str) -> int:
today = date.today().isoformat()
with get_conn() as conn:
row = conn.execute("""
SELECT COUNT(*) as cnt FROM replies
WHERE platform = ? AND status = 'posted'
AND date(created_at) = ?
""", (platform, today)).fetchone()
return row["cnt"] if row else 0
def get_stats(days=30):
with get_conn() as conn:
rows = conn.execute("""
SELECT
date(created_at) as day,
platform,
COUNT(*) FILTER (WHERE status='posted') as posted,
COUNT(*) FILTER (WHERE status='failed') as failed,
COUNT(*) FILTER (WHERE status='skipped') as skipped,
COUNT(*) as total
FROM replies
WHERE date(created_at) >= date('now', ? || ' days')
GROUP BY day, platform
ORDER BY day DESC
""", (f"-{days}",)).fetchall()
return [dict(r) for r in rows]
def save_lead(lead: dict):
import json as _json
with get_conn() as conn:
conn.execute("""
INSERT OR IGNORE INTO leads
(platform, post_url, post_title, business_type, pain_points, lead_score, urgency, reason, replied)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
lead.get("platform"),
lead.get("post_url"),
lead.get("post_title"),
lead.get("business_type"),
_json.dumps(lead.get("pain_points", [])),
lead.get("lead_score"),
lead.get("urgency"),
lead.get("reason"),
1, # we replied
))
def get_leads(days=7):
with get_conn() as conn:
rows = conn.execute("""
SELECT * FROM leads
WHERE date(created_at) >= date('now', ? || ' days')
ORDER BY lead_score DESC, created_at DESC
""", (f"-{days}",)).fetchall()
return [dict(r) for r in rows]
def get_recent_replies(limit=50):
with get_conn() as conn:
rows = conn.execute("""
SELECT id, platform, post_url, post_title, reply_text,
product, status, created_at
FROM replies
ORDER BY created_at DESC
LIMIT ?
""", (limit,)).fetchall()
return [dict(r) for r in rows]
FILE:bot/reddit_bot.py
"""
Reddit reply automation via browse CLI (old.reddit.com).
No API key needed — uses the browser session logged in via Google OAuth.
"""
import re
import time
import logging
from typing import List, Tuple
from . import browser as B
from .ai_engine import generate_reply, analyze_lead
from .db import log_reply, already_replied, get_today_count, save_lead
logger = logging.getLogger(__name__)
LOGIN_URL = "https://old.reddit.com/login"
BASE_URL = "https://old.reddit.com"
# ── Login ─────────────────────────────────────────────────────────────────────
def _is_logged_in() -> bool:
tree = B.snapshot()
return "mguozhen" in tree or "logout" in tree.lower()
def _login_google():
"""Trigger Google OAuth login on old.reddit.com."""
B.open_url(LOGIN_URL)
B.wait_seconds(3)
if _is_logged_in():
return True
# Click "Continue with Google" button
tree = B.snapshot()
google_refs = B.find_text_refs(tree, "Google")
if not google_refs:
# Try navigating directly to Google OAuth via new Reddit then come back
B.open_url("https://www.reddit.com/login")
B.wait_seconds(3)
tree = B.snapshot()
google_refs = B.find_text_refs(tree, "Continue as Hunter")
if not google_refs:
google_refs = B.find_text_refs(tree, "mguozhen")
if google_refs:
B.click(google_refs[0])
B.wait_seconds(5)
# May hit Google account chooser
tree = B.snapshot()
hunter_refs = B.find_text_refs(tree, "Hunter G")
if not hunter_refs:
hunter_refs = B.find_text_refs(tree, "mguozhen")
if hunter_refs:
B.click(hunter_refs[0])
B.wait_seconds(5)
# Verify login
B.open_url(BASE_URL)
B.wait_seconds(3)
return _is_logged_in()
def _ensure_logged_in() -> bool:
B.open_url(BASE_URL)
B.wait_seconds(3)
if _is_logged_in():
logger.info("Reddit: already logged in")
return True
logger.info("Reddit: not logged in, attempting Google OAuth...")
result = _login_google()
if result:
logger.info("Reddit: login successful")
else:
logger.error("Reddit: login failed")
return result
# ── Post scraping ──────────────────────────────────────────────────────────────
def _get_subreddit_posts(subreddit: str) -> List[dict]:
"""
Browse /r/subreddit/new/ and collect (title, comment_count) for each post.
Returns list of dicts with {title, comment_count}. No navigation needed.
"""
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(3)
tree = B.snapshot()
posts = []
# In old Reddit each post row has: title link followed by "submitted X ago" then "N comments"
# Pair title links with comment links by scanning "submitted" anchors
submitted_positions = [m.start() for m in re.finditer(r'\bsubmitted\b', tree)]
comment_positions = [(m.start(), m.group(1)) for m in
re.finditer(r'\[(\d+-\d+)\] link: (\d+ comments?)', tree)]
seen_titles = set()
for pos in submitted_positions[:30]:
# Title link: last link before 'submitted'
chunk_before = tree[max(0, pos - 1500):pos]
links_before = re.findall(r'\[(\d+-\d+)\] link: ([^\n]{15,200})', chunk_before)
if not links_before:
continue
_ref, title = links_before[-1]
title = title.strip()
skip_words = ["Submit a new", "Welcome to", "How To Get", "Contacting Amazon",
"About /r/", "wiki", "Discord", "/r/", "http", "View Poll"]
if any(s.lower() in title.lower() for s in skip_words):
continue
if title in seen_titles:
continue
# Comment count: first comment link after 'submitted'
comment_count = 0
for cpos, cref in comment_positions:
if cpos > pos:
m = re.search(r'(\d+)', tree[cpos:cpos+100])
comment_count = int(m.group(1)) if m else 0
break
seen_titles.add(title)
posts.append({"title": title, "comment_count": comment_count})
return posts[:20]
def _navigate_and_get_content(post_index: int, subreddit: str) -> Tuple[str, str]:
"""
From the subreddit listing, click the Nth comment link to open post.
Returns (url, snippet).
"""
tree = B.snapshot()
comment_links = re.findall(r'\[(\d+-\d+)\] link: \d+ comments?', tree)
if post_index >= len(comment_links):
return "", ""
B.click(comment_links[post_index])
B.wait_seconds(3)
url = B.get_url()
tree2 = B.snapshot()
text_blocks = re.findall(r'StaticText: ([^\n]{20,})', tree2)
snippet = " ".join(text_blocks[:10])[:700]
return url, snippet
# ── Commenting ─────────────────────────────────────────────────────────────────
def _post_comment(reply_text: str) -> bool:
"""
Assumes we're already on an old.reddit.com post page.
Finds the comment textarea, types the reply, clicks save.
"""
tree = B.snapshot()
# Find comment textarea ref
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox(?!\: search)', tree)
if not textarea_refs:
# Fallback: look for any textarea near "save" button
save_refs = re.findall(r'\[(\d+-\d+)\] button: save', tree, re.I)
if not save_refs:
logger.warning("Reddit: no comment form found")
return False
# Find textarea before save button in tree
save_idx = tree.find(save_refs[0])
chunk = tree[max(0, save_idx - 2000):save_idx]
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox', chunk)
if not textarea_refs:
return False
B.click(textarea_refs[-1])
B.wait_seconds(1)
# Type reply paragraph by paragraph
paragraphs = reply_text.split("\n\n")
for i, para in enumerate(paragraphs):
# Remove dollar signs to avoid shell variable expansion
safe = para.replace("$", "").strip()
if safe:
B.type_text(safe)
if i < len(paragraphs) - 1:
B.press("Enter")
B.press("Enter")
B.wait_seconds(1)
# Find and click save button
tree = B.snapshot()
save_refs = re.findall(r'\[(\d+-\d+)\] button: save', tree, re.I)
if not save_refs:
return False
B.click(save_refs[0])
B.wait_seconds(4)
# Verify: comment should appear with "mguozhen" and "just now"
confirm_tree = B.snapshot()
return "mguozhen" in confirm_tree and (
"just now" in confirm_tree or "1 minute ago" in confirm_tree
)
# ── Main ──────────────────────────────────────────────────────────────────────
def run(config: dict) -> dict:
"""
Main entry point. Returns summary dict.
config: from config.json["reddit"]
"""
target = config["daily_target"]
subreddits = config["subreddits"]
delay = config["min_delay_seconds"]
summary = {"posted": 0, "failed": 0, "skipped": 0, "target": target}
if not _ensure_logged_in():
logger.error("Reddit: cannot proceed without login")
return summary
today_count = get_today_count("reddit")
if today_count >= target:
logger.info(f"Reddit: already hit target ({today_count}/{target})")
summary["posted"] = today_count
return summary
for subreddit in subreddits:
if get_today_count("reddit") >= target:
break
logger.info(f"Reddit: scanning r/{subreddit}")
posts = _get_subreddit_posts(subreddit)
# We're now on the subreddit listing page
visited = 0
for post in posts:
if get_today_count("reddit") >= target:
break
# From listing, click Nth comment link (fresh snapshot each time)
try:
tree = B.snapshot()
comment_links = re.findall(r'\[(\d+-\d+)\] link: \d+ comments?', tree)
if visited >= len(comment_links):
break
B.click(comment_links[visited])
B.wait_seconds(3)
post_url = B.get_url()
except Exception as e:
logger.warning(f"Reddit: nav failed — {e}")
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(3)
visited += 1
summary["skipped"] += 1
continue
visited += 1
if already_replied(post_url):
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(2)
summary["skipped"] += 1
continue
# Get post content — skip sidebar, anchor on post title
tree = B.snapshot()
title_idx = tree.find(post["title"][:40])
if title_idx > 0:
# Extract StaticText after the title (post body + comments)
chunk = tree[title_idx:title_idx + 3000]
else:
chunk = tree[len(tree) // 2:] # fallback: second half of tree
text_blocks = re.findall(r'StaticText: ([^\n]{15,})', chunk)
# Skip meta lines (submitted, by, share, save, etc.)
meta = {"submitted", "by", "share", "save", "hide", "report",
"crosspost", "sorted by:", "best", "formatting help"}
clean = [t for t in text_blocks if t.strip().lower() not in meta]
snippet = " ".join(clean[:12])[:700]
reply_text, product = generate_reply(
post_title=post["title"],
post_content=snippet,
platform="reddit"
)
if not reply_text:
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(2)
summary["skipped"] += 1
continue
success = _post_comment(reply_text)
if success:
log_reply("reddit", post_url, post["title"],
snippet[:200], reply_text, product, "posted")
summary["posted"] += 1
logger.info(f"Reddit: posted #{summary['posted']} — {post['title'][:60]}")
# Lead analysis
lead = analyze_lead(post["title"], snippet, post_url, "reddit")
if lead:
save_lead(lead)
logger.info(f"Reddit: 🎯 lead saved score={lead.get('lead_score')} urgency={lead.get('urgency')}")
else:
log_reply("reddit", post_url, post["title"],
snippet[:200], reply_text, product, "failed",
"comment not confirmed")
summary["failed"] += 1
logger.warning(f"Reddit: comment failed — {post['title'][:60]}")
B.open_url(f"{BASE_URL}/r/{subreddit}/new/")
B.wait_seconds(delay if success else 10)
return summary
FILE:bot/x_bot.py
"""
X/Twitter reply automation via browse CLI.
Searches for relevant posts and posts replies as @VocAiSage.
"""
import re
import time
import logging
from typing import List
from . import browser as B
from .ai_engine import generate_reply, analyze_lead
from .db import log_reply, already_replied, get_today_count, save_lead
logger = logging.getLogger(__name__)
LOGIN_URL = "https://x.com/login"
SEARCH_URL = "https://x.com/search?q={query}&src=typed_query&f=live"
def _is_logged_in() -> bool:
tree = B.snapshot()
return "VocAiSage" in tree or "Hunter Guo" in tree
def _login_if_needed():
"""Open X and check login. If not logged in, open login page for manual auth."""
B.open_url("https://x.com")
B.wait_seconds(3)
if _is_logged_in():
logger.info("X: already logged in")
return True
logger.warning("X: not logged in — opening login page")
B.open_url(LOGIN_URL)
# Wait up to 60s for user to log in (for first-run setup)
for _ in range(12):
B.wait_seconds(5)
if _is_logged_in():
logger.info("X: login confirmed")
return True
logger.error("X: login timeout")
return False
def _search_posts(query: str) -> List[dict]:
"""Search X and return list of {snippet, time_ref} dicts."""
url = SEARCH_URL.format(query=query.replace(" ", "+"))
B.open_url(url)
B.wait_seconds(3)
posts = []
tree = B.snapshot()
# Find article start positions in the full tree
article_positions = [(m.start(), m.group(1)) for m in
re.finditer(r'\[(\d+-\d+)\] article:', tree)]
for i, (pos, article_ref) in enumerate(article_positions[:15]):
# Article block ends where next top-level sibling starts
next_pos = article_positions[i + 1][0] if i + 1 < len(article_positions) else len(tree)
block = tree[pos:next_pos]
# Time link: [ref] link: Mar 16 OR link: 5h OR link: just now
time_match = re.search(
r'\[(\d+-\d+)\] link: (?:(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\s+\d+|\d+[hm]|just now)',
block
)
if not time_match:
continue
time_ref = time_match.group(1)
# Snippet from StaticText nodes (skip very short ones like emojis/counts)
texts = re.findall(r'StaticText: ([^\n]{10,})', block)
snippet = " ".join(texts[:6])[:350]
posts.append({
"article_ref": article_ref,
"time_ref": time_ref,
"snippet": snippet,
})
return posts
def _reply_current_page(reply_text: str) -> bool:
"""Type and submit a reply on the currently open tweet page."""
tree = B.snapshot()
# Find reply textbox
boxes = re.findall(r'\[(\d+-\d+)\] textbox: Post text', tree)
if not boxes:
return False
B.click(boxes[0])
B.wait_seconds(1)
# Type reply paragraph by paragraph
paragraphs = reply_text.split("\n\n")
for i, para in enumerate(paragraphs):
safe_para = para.replace("$", "")
B.type_text(safe_para)
if i < len(paragraphs) - 1:
B.press("Enter")
B.press("Enter")
B.wait_seconds(1)
# Find and click reply button
tree = B.snapshot()
reply_btns = re.findall(r'\[(\d+-\d+)\] button: Reply', tree)
if len(reply_btns) >= 2:
B.click(reply_btns[-1])
B.wait_seconds(3)
confirm_tree = B.snapshot()
return "Your post was sent" in confirm_tree or "post was sent" in confirm_tree.lower()
return False
def run(config: dict) -> dict:
"""
Main entry point. Returns summary dict.
config: from config.json["x"]
"""
target = config["daily_target"]
queries = config["search_queries"]
delay = config["min_delay_seconds"]
summary = {"posted": 0, "failed": 0, "skipped": 0, "target": target}
if not _login_if_needed():
logger.error("X: cannot proceed without login")
return summary
today_count = get_today_count("x")
if today_count >= target:
logger.info(f"X: already hit target ({today_count}/{target})")
summary["posted"] = today_count
return summary
for query in queries:
if get_today_count("x") >= target:
break
logger.info(f"X: searching '{query}'")
posts = _search_posts(query)
for post in posts:
if get_today_count("x") >= target:
break
snippet = post.get("snippet", "")
if not snippet or len(snippet) < 30:
continue
# Build a fake URL key for dedup (we don't have URL yet)
# We'll update after opening
reply_text, product = generate_reply(
post_title=query,
post_content=snippet,
platform="x"
)
if not reply_text:
summary["skipped"] += 1
continue
# Open tweet, get real URL, dedup check, then reply (single open)
if not post.get("time_ref"):
summary["skipped"] += 1
continue
B.click(post["time_ref"])
B.wait_seconds(3)
real_url = B.get_url()
if already_replied(real_url):
B.press("Alt+Left")
B.wait_seconds(2)
summary["skipped"] += 1
continue
# Already on tweet page — reply directly
success = _reply_current_page(reply_text)
if success:
log_reply("x", real_url, query, snippet, reply_text, product, "posted")
summary["posted"] += 1
logger.info(f"X: posted reply #{summary['posted']} — {real_url}")
# Lead analysis
lead = analyze_lead(query, snippet, real_url, "x")
if lead:
save_lead(lead)
logger.info(f"X: 🎯 lead saved score={lead.get('lead_score')} urgency={lead.get('urgency')}")
time.sleep(delay)
else:
log_reply("x", real_url, query, snippet, reply_text, product, "failed")
summary["failed"] += 1
logger.warning(f"X: failed to post — {real_url}")
time.sleep(10)
return summary
FILE:config.json
{
"x": {
"username": "@VocAiSage",
"daily_target": 20,
"min_delay_seconds": 300,
"search_queries": [
"customer service AI agent ecommerce",
"AI support agent Amazon seller",
"ecommerce customer support automation",
"Amazon FBA AI tools",
"AI ecommerce chatbot",
"Amazon seller review analysis",
"product review AI analysis",
"Amazon review management tools",
"customer service agent AI startup",
"ecommerce AI automation"
]
},
"reddit": {
"username": "mguozhen",
"daily_target": 10,
"min_delay_seconds": 600,
"subreddits": [
"FulfillmentByAmazon",
"AmazonSeller",
"ecommerce",
"Entrepreneur",
"AmazonFBA"
]
},
"products": {
"Solvea": {
"description": "AI customer service agent for Amazon/Shopify sellers — handles buyer queries autonomously using product and order data",
"trigger_keywords": [
"customer service", "support agent", "ai agent", "chatbot",
"customer support", "buyer queries", "customer queries",
"support automation", "helpdesk", "ticket", "response time"
]
},
"VOC.ai": {
"description": "Amazon review intelligence tool — clusters review pain points by product attribute, maps to BSR impact, surfaces optimization suggestions",
"trigger_keywords": [
"amazon review", "review analysis", "product review", "customer feedback",
"review management", "1-star", "negative review", "vine review",
"review sentiment", "listing optimization", "bsr", "browse node",
"review response", "review insight"
]
}
},
"reply_style": {
"tone": "knowledgeable seller/builder sharing experience",
"max_length_x": 260,
"max_length_reddit": 400,
"rules": [
"Lead with a genuine insight or question about the post",
"Mention product naturally as 'what we use/built' — not as an ad",
"Never start with 'Great post!' or 'I agree!'",
"Sound like a real seller who has faced this problem",
"If post is not directly relevant, skip — do not force a mention"
]
}
}
FILE:dashboard/app.py
#!/usr/bin/env python3
"""
Social Bot Dashboard — Flask web app.
Run: python dashboard/app.py
Open: http://localhost:5050
"""
import sys
import json
from pathlib import Path
from datetime import date, timedelta
from flask import Flask, render_template, jsonify
from flask_cors import CORS
sys.path.insert(0, str(Path(__file__).parent.parent))
from bot.db import get_stats, get_recent_replies, get_today_count, init_db
app = Flask(__name__)
CORS(app)
CONFIG = json.loads((Path(__file__).parent.parent / "config.json").read_text())
@app.route("/")
def index():
return render_template("index.html")
@app.route("/api/overview")
def api_overview():
today = date.today().isoformat()
yesterday = (date.today() - timedelta(days=1)).isoformat()
stats = get_stats(days=30)
today_stats = [s for s in stats if s["day"] == today]
ytd_stats = [s for s in stats if s["day"] == yesterday]
def sum_field(rows, field):
return sum(r.get(field, 0) for r in rows)
x_today = next((s for s in today_stats if s["platform"] == "x"), {})
r_today = next((s for s in today_stats if s["platform"] == "reddit"), {})
x_ytd = next((s for s in ytd_stats if s["platform"] == "x"), {})
r_ytd = next((s for s in ytd_stats if s["platform"] == "reddit"), {})
return jsonify({
"today": {
"x_posted": x_today.get("posted", 0),
"x_target": CONFIG["x"]["daily_target"],
"reddit_posted": r_today.get("posted", 0),
"reddit_target": CONFIG["reddit"]["daily_target"],
"total_posted": x_today.get("posted", 0) + r_today.get("posted", 0),
},
"yesterday": {
"x_posted": x_ytd.get("posted", 0),
"reddit_posted": r_ytd.get("posted", 0),
},
"total_all_time": sum_field(stats, "posted"),
"solvea_mentions": _count_product("Solvea"),
"voc_mentions": _count_product("VOC.ai"),
})
@app.route("/api/chart/daily")
def api_chart_daily():
stats = get_stats(days=30)
# Build per-day totals
days = {}
for s in stats:
d = s["day"]
if d not in days:
days[d] = {"x": 0, "reddit": 0}
days[d][s["platform"]] = s.get("posted", 0)
labels = sorted(days.keys())
return jsonify({
"labels": labels,
"x": [days[d]["x"] for d in labels],
"reddit": [days[d]["reddit"] for d in labels],
})
@app.route("/api/replies")
def api_replies():
replies = get_recent_replies(limit=100)
return jsonify(replies)
def _count_product(name: str) -> int:
from bot.db import get_conn
with get_conn() as conn:
row = conn.execute(
"SELECT COUNT(*) as cnt FROM replies WHERE product=? AND status='posted'",
(name,)
).fetchone()
return row["cnt"] if row else 0
if __name__ == "__main__":
init_db()
print("Dashboard: http://localhost:5050")
app.run(host="0.0.0.0", port=5050, debug=False)
FILE:dashboard/templates/index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Social Bot Dashboard</title>
<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/chart.umd.min.js"></script>
<style>
:root {
--bg: #0f1117; --surface: #1a1d27; --border: #2d3148;
--text: #e2e8f0; --muted: #8892a4; --accent: #6366f1;
--green: #10b981; --yellow: #f59e0b; --red: #ef4444;
--x-color: #1d9bf0; --reddit-color: #ff4500;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body { background: var(--bg); color: var(--text); font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif; }
header { background: var(--surface); border-bottom: 1px solid var(--border); padding: 16px 32px; display: flex; align-items: center; gap: 12px; }
header h1 { font-size: 18px; font-weight: 600; }
.badge { background: var(--accent); color: #fff; font-size: 11px; padding: 2px 8px; border-radius: 99px; }
.live-dot { width: 8px; height: 8px; border-radius: 50%; background: var(--green); animation: pulse 2s infinite; }
@keyframes pulse { 0%,100%{opacity:1} 50%{opacity:.3} }
main { padding: 24px 32px; max-width: 1400px; margin: 0 auto; }
/* Stats grid */
.stats { display: grid; grid-template-columns: repeat(auto-fit, minmax(180px, 1fr)); gap: 16px; margin-bottom: 28px; }
.card { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; padding: 20px; }
.card .label { font-size: 12px; color: var(--muted); text-transform: uppercase; letter-spacing: .5px; margin-bottom: 8px; }
.card .value { font-size: 32px; font-weight: 700; }
.card .sub { font-size: 12px; color: var(--muted); margin-top: 4px; }
.card.accent { border-color: var(--accent); }
.progress-bar { height: 6px; background: var(--border); border-radius: 3px; margin-top: 12px; overflow: hidden; }
.progress-fill { height: 100%; border-radius: 3px; transition: width .6s ease; }
.x-fill { background: var(--x-color); }
.reddit-fill { background: var(--reddit-color); }
/* Chart */
.chart-section { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; padding: 24px; margin-bottom: 28px; }
.chart-section h2 { font-size: 14px; font-weight: 600; margin-bottom: 20px; color: var(--muted); text-transform: uppercase; letter-spacing: .5px; }
canvas { max-height: 240px; }
/* Table */
.table-section { background: var(--surface); border: 1px solid var(--border); border-radius: 12px; padding: 24px; }
.table-section h2 { font-size: 14px; font-weight: 600; margin-bottom: 16px; color: var(--muted); text-transform: uppercase; letter-spacing: .5px; }
.table-wrap { overflow-x: auto; }
table { width: 100%; border-collapse: collapse; font-size: 13px; }
th { text-align: left; color: var(--muted); font-weight: 500; font-size: 11px; text-transform: uppercase; letter-spacing: .5px; padding: 8px 12px; border-bottom: 1px solid var(--border); }
td { padding: 10px 12px; border-bottom: 1px solid rgba(45,49,72,.5); vertical-align: top; }
tr:last-child td { border-bottom: none; }
tr:hover td { background: rgba(99,102,241,.05); }
.platform-badge { display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 11px; font-weight: 600; }
.p-x { background: rgba(29,155,240,.15); color: var(--x-color); }
.p-reddit { background: rgba(255,69,0,.15); color: var(--reddit-color); }
.product-badge { display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 11px; }
.p-solvea { background: rgba(99,102,241,.2); color: #a5b4fc; }
.p-voc { background: rgba(16,185,129,.2); color: #6ee7b7; }
.status-dot { width: 7px; height: 7px; border-radius: 50%; display: inline-block; margin-right: 5px; }
.s-posted { background: var(--green); }
.s-failed { background: var(--red); }
.s-skipped { background: var(--yellow); }
.reply-text { max-width: 360px; color: var(--muted); line-height: 1.5; }
a { color: var(--accent); text-decoration: none; }
a:hover { text-decoration: underline; }
.refresh-btn { margin-left: auto; background: var(--border); border: none; color: var(--text); padding: 6px 14px; border-radius: 6px; cursor: pointer; font-size: 13px; }
.refresh-btn:hover { background: var(--accent); }
</style>
</head>
<body>
<header>
<div class="live-dot"></div>
<h1>Social Bot Dashboard</h1>
<span class="badge">Solvea + VOC.ai</span>
<button class="refresh-btn" onclick="loadAll()">↻ Refresh</button>
</header>
<main>
<!-- Stats -->
<div class="stats">
<div class="card accent">
<div class="label">Total Today</div>
<div class="value" id="total-today">—</div>
<div class="sub" id="total-sub">Loading...</div>
</div>
<div class="card">
<div class="label">X / Twitter</div>
<div class="value" id="x-today">—</div>
<div class="sub" id="x-sub">target: 20</div>
<div class="progress-bar"><div class="progress-fill x-fill" id="x-prog" style="width:0%"></div></div>
</div>
<div class="card">
<div class="label">Reddit</div>
<div class="value" id="reddit-today">—</div>
<div class="sub" id="reddit-sub">target: 10</div>
<div class="progress-bar"><div class="progress-fill reddit-fill" id="reddit-prog" style="width:0%"></div></div>
</div>
<div class="card">
<div class="label">Solvea Mentions</div>
<div class="value" id="solvea-total">—</div>
<div class="sub">all time</div>
</div>
<div class="card">
<div class="label">VOC.ai Mentions</div>
<div class="value" id="voc-total">—</div>
<div class="sub">all time</div>
</div>
<div class="card">
<div class="label">All Time Replies</div>
<div class="value" id="all-time">—</div>
<div class="sub">posted</div>
</div>
</div>
<!-- Chart -->
<div class="chart-section">
<h2>Daily Replies — Last 30 Days</h2>
<canvas id="chart"></canvas>
</div>
<!-- Table -->
<div class="table-section">
<h2>Recent Replies</h2>
<div class="table-wrap">
<table>
<thead>
<tr>
<th>Time</th>
<th>Platform</th>
<th>Post</th>
<th>Reply</th>
<th>Product</th>
<th>Status</th>
</tr>
</thead>
<tbody id="replies-body">
<tr><td colspan="6" style="text-align:center;color:var(--muted);padding:32px">Loading...</td></tr>
</tbody>
</table>
</div>
</div>
</main>
<script>
let chartInstance = null;
async function loadOverview() {
const d = await fetch('/api/overview').then(r => r.json());
document.getElementById('total-today').textContent = d.today.total_posted;
document.getElementById('total-sub').textContent = `yesterday: d.yesterday.x_posted + d.yesterday.reddit_posted`;
document.getElementById('x-today').textContent = d.today.x_posted;
document.getElementById('x-sub').textContent = `target: d.today.x_target`;
document.getElementById('reddit-today').textContent = d.today.reddit_posted;
document.getElementById('reddit-sub').textContent = `target: d.today.reddit_target`;
document.getElementById('solvea-total').textContent = d.solvea_mentions;
document.getElementById('voc-total').textContent = d.voc_mentions;
document.getElementById('all-time').textContent = d.total_all_time;
document.getElementById('x-prog').style.width = Math.min(100, (d.today.x_posted / d.today.x_target) * 100) + '%';
document.getElementById('reddit-prog').style.width = Math.min(100, (d.today.reddit_posted / d.today.reddit_target) * 100) + '%';
}
async function loadChart() {
const d = await fetch('/api/chart/daily').then(r => r.json());
const labels = d.labels.map(l => l.slice(5)); // MM-DD
const ctx = document.getElementById('chart').getContext('2d');
if (chartInstance) chartInstance.destroy();
chartInstance = new Chart(ctx, {
type: 'bar',
data: {
labels,
datasets: [
{ label: 'X/Twitter', data: d.x, backgroundColor: 'rgba(29,155,240,.7)', stack: 'a' },
{ label: 'Reddit', data: d.reddit, backgroundColor: 'rgba(255,69,0,.7)', stack: 'a' },
]
},
options: {
responsive: true, maintainAspectRatio: true,
plugins: { legend: { labels: { color: '#8892a4', font: { size: 12 } } } },
scales: {
x: { stacked: true, grid: { color: '#2d3148' }, ticks: { color: '#8892a4' } },
y: { stacked: true, grid: { color: '#2d3148' }, ticks: { color: '#8892a4', stepSize: 5 } }
}
}
});
}
async function loadReplies() {
const rows = await fetch('/api/replies').then(r => r.json());
const tbody = document.getElementById('replies-body');
if (!rows.length) {
tbody.innerHTML = '<tr><td colspan="6" style="text-align:center;color:var(--muted);padding:32px">No replies yet today</td></tr>';
return;
}
tbody.innerHTML = rows.map(r => {
const time = r.created_at ? r.created_at.slice(0, 16).replace('T', ' ') : '—';
const plat = r.platform === 'x'
? '<span class="platform-badge p-x">𝕏</span>'
: '<span class="platform-badge p-reddit">Reddit</span>';
const title = r.post_url
? `<a href="r.post_url" target="_blank">(r.post_title || r.post_url).slice(0, 60)''</a>`
: '—';
const reply = `<div class="reply-text">(r.reply_text || '').slice(0, 120)''</div>`;
const prod = r.product
? `<span class="product-badge 'p-voc'">r.product</span>`
: '<span style="color:var(--muted)">—</span>';
const statusClass = { posted: 's-posted', failed: 's-failed', skipped: 's-skipped' }[r.status] || '';
const status = `<span class="status-dot statusClass"></span>r.status`;
return `<tr><td style="white-space:nowrap;color:var(--muted)">time</td><td>plat</td><td>title</td><td>reply</td><td>prod</td><td>status</td></tr>`;
}).join('');
}
async function loadAll() {
await Promise.all([loadOverview(), loadChart(), loadReplies()]);
}
loadAll();
setInterval(loadOverview, 30000); // refresh stats every 30s
setInterval(loadReplies, 60000); // refresh table every 60s
</script>
</body>
</html>
FILE:install.sh
#!/bin/bash
# ============================================================
# Social Reply Bot — One-command installer
# Works on any macOS with Homebrew + Python 3
#
# Usage:
# curl -fsSL https://raw.githubusercontent.com/mguozhen/social-bot/main/install.sh | bash
# Or after cloning:
# bash install.sh
# ============================================================
set -e
REPO_URL="https://github.com/mguozhen/social-bot.git"
INSTALL_DIR="$HOME/social-bot"
PYTHON="python3"
echo ""
echo "╔══════════════════════════════════════╗"
echo "║ Social Reply Bot Installer ║"
echo "╚══════════════════════════════════════╝"
echo ""
# ── 1. Clone or update repo ──────────────────────────────────
echo "[1/6] Getting latest code..."
if [ -d "$INSTALL_DIR/.git" ]; then
cd "$INSTALL_DIR"
git pull --ff-only
echo " Updated existing repo at $INSTALL_DIR"
else
git clone "$REPO_URL" "$INSTALL_DIR"
echo " Cloned to $INSTALL_DIR"
fi
cd "$INSTALL_DIR"
# ── 2. Python dependencies ───────────────────────────────────
echo "[2/6] Installing Python packages..."
$PYTHON -m pip install -r requirements.txt --quiet
echo " Done: anthropic, flask, flask-cors, python-dotenv"
# ── 3. browse CLI ────────────────────────────────────────────
echo "[3/6] Checking browse CLI..."
if ! command -v browse &>/dev/null; then
if command -v npm &>/dev/null; then
npm install -g @browserbasehq/browse-cli --silent
echo " Installed browse CLI"
else
echo " WARNING: npm not found. Install Node.js then run:"
echo " npm install -g @browserbasehq/browse-cli"
fi
else
echo " browse CLI already installed ($(browse --version 2>/dev/null || echo 'ok'))"
fi
# ── 4. .env file ─────────────────────────────────────────────
echo "[4/6] Setting up credentials..."
if [ ! -f "$INSTALL_DIR/.env" ]; then
cp "$INSTALL_DIR/.env.template" "$INSTALL_DIR/.env"
echo ""
echo " ┌─────────────────────────────────────────────┐"
echo " │ ACTION REQUIRED: Add your API key │"
echo " │ │"
echo " │ nano $INSTALL_DIR/.env │"
echo " │ Set: ANTHROPIC_API_KEY=sk-ant-... │"
echo " └─────────────────────────────────────────────┘"
echo ""
read -p " Press Enter after editing .env to continue..." _
else
echo " .env already configured"
fi
# ── 5. Database init ─────────────────────────────────────────
echo "[5/6] Initializing database..."
$PYTHON -c "
import sys; sys.path.insert(0, '$INSTALL_DIR')
from bot.db import init_db; init_db()
print(' DB ready: $INSTALL_DIR/logs/social_bot.db')
"
# ── 6. LaunchAgent ───────────────────────────────────────────
echo "[6/6] Installing LaunchAgent (daily 10:05 AM)..."
PLIST_SRC="$INSTALL_DIR/launchd/com.socialbot.daily.plist"
PLIST_DST="$HOME/Library/LaunchAgents/com.socialbot.daily.plist"
mkdir -p "$HOME/Library/LaunchAgents"
# Inject real python3 path and API key
PYTHON_PATH="$(which $PYTHON)"
API_KEY="$(grep ANTHROPIC_API_KEY "$INSTALL_DIR/.env" | cut -d= -f2 | tr -d ' ')"
sed \
-e "s|/usr/bin/python3|$PYTHON_PATH|g" \
-e "s|FILL_IN|$API_KEY|g" \
-e "s|/Users/guozhen|$HOME|g" \
"$PLIST_SRC" > "$PLIST_DST"
launchctl unload "$PLIST_DST" 2>/dev/null || true
launchctl load "$PLIST_DST"
echo " LaunchAgent loaded — will run daily at 10:05 AM"
# ── Done ─────────────────────────────────────────────────────
echo ""
echo "╔══════════════════════════════════════════════════════╗"
echo "║ Installation complete! ║"
echo "║ ║"
echo "║ Next steps: ║"
echo "║ 1. Open browser and log in to Reddit + X manually ║"
echo "║ (just visit the sites — sessions are shared) ║"
echo "║ 2. Test run: ║"
echo "║ cd $HOME/social-bot ║"
echo "║ python3 run_daily.py --x-only ║"
echo "║ 3. Dashboard: python3 dashboard/app.py ║"
echo "║ Open: http://localhost:5050 ║"
echo "╚══════════════════════════════════════════════════════╝"
echo ""
FILE:requirements.txt
anthropic>=0.40.0
flask>=3.0.0
flask-cors>=4.0.0
python-dotenv>=1.0.0
FILE:run_daily.py
#!/usr/bin/env python3
"""
Daily social media reply bot.
Scheduled via macOS LaunchAgent at 10:05 AM.
Runs standalone — no Claude Code, no confirmation prompts.
Usage:
python run_daily.py # run both platforms
python run_daily.py --x-only
python run_daily.py --reddit-only
"""
import sys
import json
import logging
import time
import argparse
from pathlib import Path
from datetime import datetime
# Load env vars from .env if present
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
except ImportError:
pass
# Setup logging
LOG_DIR = Path(__file__).parent / "logs"
LOG_DIR.mkdir(exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler(LOG_DIR / f"run_{datetime.now():%Y-%m-%d}.log"),
logging.StreamHandler(sys.stdout),
]
)
logger = logging.getLogger("main")
# Import bot modules
sys.path.insert(0, str(Path(__file__).parent))
from bot.db import init_db
from bot import x_bot, reddit_bot
CONFIG = json.loads((Path(__file__).parent / "config.json").read_text())
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--x-only", action="store_true")
parser.add_argument("--reddit-only", action="store_true")
args = parser.parse_args()
run_x = not args.reddit_only
run_reddit = not args.x_only
init_db()
start = time.time()
results = {}
logger.info("=" * 60)
logger.info(f"Daily bot started — {datetime.now():%Y-%m-%d %H:%M}")
logger.info("=" * 60)
# ── X/Twitter ──────────────────────────────────────────────
if run_x:
logger.info("Starting X/Twitter run...")
try:
results["x"] = x_bot.run(CONFIG["x"])
logger.info(f"X done: {results['x']}")
except Exception as e:
logger.error(f"X run failed: {e}", exc_info=True)
results["x"] = {"error": str(e)}
# ── Reddit ─────────────────────────────────────────────────
if run_reddit:
logger.info("Starting Reddit run...")
try:
results["reddit"] = reddit_bot.run(CONFIG["reddit"])
logger.info(f"Reddit done: {results['reddit']}")
except Exception as e:
logger.error(f"Reddit run failed: {e}", exc_info=True)
results["reddit"] = {"error": str(e)}
elapsed = int(time.time() - start)
logger.info(f"All done in {elapsed}s — {results}")
# Write today's summary for dashboard
summary_file = LOG_DIR / f"summary_{datetime.now():%Y-%m-%d}.json"
import json as _json
summary_file.write_text(_json.dumps({
"date": datetime.now().isoformat(),
"elapsed_secs": elapsed,
"results": results,
}, indent=2))
if __name__ == "__main__":
main()
FILE:setup.sh
#!/bin/bash
set -e
echo "========================================"
echo " Social Bot Setup"
echo "========================================"
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
# 1. Python deps
echo "[1/5] Installing Python packages..."
pip3 install -r "$SCRIPT_DIR/requirements.txt" --quiet
# 2. browse CLI
echo "[2/5] Checking browse CLI..."
if ! command -v browse &>/dev/null; then
echo " Installing @browserbasehq/browse-cli..."
npm install -g @browserbasehq/browse-cli
else
echo " browse CLI already installed: $(browse --version 2>/dev/null || echo 'ok')"
fi
# 3. .env file
echo "[3/5] Setting up .env..."
if [ ! -f "$SCRIPT_DIR/.env" ]; then
cp "$SCRIPT_DIR/.env.template" "$SCRIPT_DIR/.env"
echo " ⚠️ .env created — fill in your credentials:"
echo " nano $SCRIPT_DIR/.env"
else
echo " .env already exists"
fi
# 4. Init database
echo "[4/5] Initializing database..."
python3 -c "
import sys; sys.path.insert(0, '$SCRIPT_DIR')
from bot.db import init_db; init_db()
print(' DB initialized: $SCRIPT_DIR/logs/social_bot.db')
"
# 5. LaunchAgent install
echo "[5/5] Installing LaunchAgent (daily 10:05 AM)..."
PLIST_SRC="$SCRIPT_DIR/launchd/com.socialbot.daily.plist"
PLIST_DST="$HOME/Library/LaunchAgents/com.socialbot.daily.plist"
# Update Python path in plist to current python3
PYTHON_PATH="$(which python3)"
sed "s|/usr/bin/python3|$PYTHON_PATH|g" "$PLIST_SRC" > "$PLIST_DST"
# Unload if already loaded
launchctl unload "$PLIST_DST" 2>/dev/null || true
launchctl load "$PLIST_DST"
echo " LaunchAgent loaded — next run: tomorrow 10:05 AM"
echo ""
echo "========================================"
echo " Setup complete!"
echo ""
echo " Next steps:"
echo " 1. Fill in Anthropic key: nano $SCRIPT_DIR/.env"
echo " 2. Test run: python3 $SCRIPT_DIR/run_daily.py"
echo " (first run: browser opens for X/Reddit login — log in once)"
echo " 3. Start dashboard: python3 $SCRIPT_DIR/dashboard/app.py"
echo " 4. Open dashboard: http://localhost:5050"
echo ""
echo " For another Mac: scp -r $SCRIPT_DIR/ user@newmac:~/social-bot/"
echo " then run setup.sh + log in once on that machine"
echo "========================================"
FILE:skill.sh
#!/bin/bash
# social-reply-bot skill entry point
# Called by openclaw with user's prompt as $*
set -e
BOT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROMPT="$(echo "$*" | tr '[:upper:]' '[:lower:]')"
# Load env
if [ -f "$BOT_DIR/.env" ]; then
set -a
source "$BOT_DIR/.env"
set +a
fi
# Check ANTHROPIC_API_KEY
if [ -z "$ANTHROPIC_API_KEY" ]; then
echo "ERROR: ANTHROPIC_API_KEY not set. Edit $BOT_DIR/.env"
exit 1
fi
# Check browse CLI
if ! command -v browse &>/dev/null; then
echo "ERROR: browse CLI not found. Run: npm install -g @browserbasehq/browse-cli"
exit 1
fi
cd "$BOT_DIR"
# Route by prompt keywords
if echo "$PROMPT" | grep -qE "warmup|warm up|karma|养号"; then
COUNT=8
if echo "$PROMPT" | grep -qE "[0-9]+"; then
COUNT=$(echo "$PROMPT" | grep -oE "[0-9]+" | head -1)
fi
echo "Running Reddit warmup — target COUNT comments..."
python3 "$BOT_DIR/warmup_reddit.py" "$COUNT"
elif echo "$PROMPT" | grep -qE "leads|客户|potential customer"; then
echo "=== Solvea Leads ==="
python3 -c "
import sys, json; sys.path.insert(0, '.')
from bot.db import get_leads
leads = get_leads(7)
if not leads:
print('No leads found in last 7 days.')
else:
print(f'Found {len(leads)} leads in last 7 days:\n')
for l in leads:
pain = json.loads(l.get('pain_points','[]'))
print(f\"[{l['lead_score']}/10] [{l['urgency'].upper()}] {l['platform'].upper()}\")
print(f\" Title: {l['post_title']}\")
print(f\" Type: {l['business_type']}\")
print(f\" Pain: {', '.join(pain[:2])}\")
print(f\" Why: {l['reason']}\")
print(f\" URL: {l['post_url']}\")
print()
"
elif echo "$PROMPT" | grep -qE "stat|count|how many|report"; then
echo "=== Social Bot Stats ==="
python3 -c "
import sys; sys.path.insert(0, '.')
from bot.db import get_stats, get_today_count
print('Today: X=%d Reddit=%d' % (get_today_count('x'), get_today_count('reddit')))
stats = get_stats(7)
if stats:
print('\nLast 7 days:')
for s in stats[:14]:
print(' %s [%s] posted=%d failed=%d' % (s['day'], s['platform'], s['posted'], s['failed']))
else:
print('No data yet.')
"
elif echo "$PROMPT" | grep -qE "dashboard|ui|open|browser"; then
echo "Starting dashboard at http://localhost:5050 ..."
python3 "$BOT_DIR/dashboard/app.py" &
sleep 2
open http://localhost:5050
echo "Dashboard opened."
elif echo "$PROMPT" | grep -qE "x only|twitter only|tweet only"; then
echo "Running X/Twitter bot..."
python3 "$BOT_DIR/run_daily.py" --x-only
elif echo "$PROMPT" | grep -qE "reddit only"; then
echo "Running Reddit bot..."
python3 "$BOT_DIR/run_daily.py" --reddit-only
else
echo "Running both X and Reddit bots..."
python3 "$BOT_DIR/run_daily.py"
fi
FILE:warmup_reddit.py
#!/usr/bin/env python3
"""
Reddit 账号养号脚本 — 在通用版块发真实评论积累 Karma
"""
import sys, time, re, logging, os
from pathlib import Path
from typing import Optional, List
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent / ".env")
sys.path.insert(0, str(Path(__file__).parent))
import anthropic
from bot import browser as B
from bot.reddit_bot import _ensure_logged_in, _post_comment
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("warmup")
BASE_URL = "https://old.reddit.com"
WARMUP_SUBREDDITS = [
"FreeKarma4U", # 专为新账号互赞设计
"karma", # 同上
"newreddits", # 新版块,门槛低
"CasualConversation", # 轻松聊天,对新账号友好
"self", # 自我分享,极少 automod
"NoStupidQuestions", # 问答,友好社区
"mildlyinteresting", # 轻度过滤
"Showerthoughts", # 脑洞,低门槛
]
MAX_COMMENTS = int(sys.argv[1]) if len(sys.argv) > 1 else 8
DELAY_SUCCESS = 90 # 两条评论之间间隔
DELAY_FAIL = 15
def generate_comment(post_title: str, post_content: str, subreddit: str) -> Optional[str]:
"""用 Claude Haiku 生成一条真实评论(不带任何产品推广)"""
client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
system = """You are a genuine Reddit user. Write short, authentic comments.
Rules:
- Never mention products, companies, or services
- Sound like a real person, not AI
- Be conversational, curious, or share a relatable experience
- 1-3 sentences is ideal
- Casual tone, can use lowercase
- No hashtags, no bullet points
- Never start with "I" as the first word"""
user = f"""Subreddit: r/{subreddit}
Post: {post_title}
Content: {post_content[:500]}
Write one genuine comment. If too political/controversial/personal, reply: SKIP"""
try:
msg = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=120,
messages=[{"role": "user", "content": user}],
system=system,
)
text = msg.content[0].text.strip()
# Remove any leading SKIP explanation
if text.upper().startswith("SKIP"):
return None
return text
except Exception as e:
logger.error(f"Claude error: {e}")
return None
def get_post_urls(subreddit: str, count: int = 8) -> List[dict]:
"""
访问版块 /hot/,收集帖子 URL 列表。
策略:依次点击 comment 链接获取 URL,再返回列表页。
"""
B.open_url(f"{BASE_URL}/r/{subreddit}/hot/")
time.sleep(3)
# 先收集所有帖子标题 + comment 数量
tree = B.snapshot()
submitted_positions = [m.start() for m in re.finditer(r'\bsubmitted\b', tree)]
candidates = []
seen_titles = set()
skip_words = ["Submit a new", "Welcome to", "About /r/", "wiki",
"Discord", "/r/", "http", "View Poll", "rules", "moderator"]
for pos in submitted_positions[:30]:
before = tree[max(0, pos - 1500):pos]
links = re.findall(r'\[(\d+-\d+)\] link: ([^\n]{15,200})', before)
if not links:
continue
_, title = links[-1]
title = title.strip()
if any(s.lower() in title.lower() for s in skip_words):
continue
if title in seen_titles:
continue
# Comment count
after = tree[pos:pos + 400]
cm = re.search(r'\[(\d+-\d+)\] link: (\d+) comments?', after)
if not cm:
continue
n = int(cm.group(2))
if n < 1 or n > 500:
continue
seen_titles.add(title)
candidates.append({"title": title, "comment_count": n})
if len(candidates) >= count:
break
if not candidates:
return []
# 逐一点击 comment 链接获取真实 URL
posts = []
for idx in range(min(len(candidates), count)):
# 每次都要重新获取 snapshot(因为导航后 refs 变了)
B.open_url(f"{BASE_URL}/r/{subreddit}/hot/")
time.sleep(3)
tree = B.snapshot()
comment_links = re.findall(r'\[(\d+-\d+)\] link: \d+ comments?', tree)
if idx >= len(comment_links):
break
B.click(comment_links[idx])
time.sleep(4)
url = B.get_url()
if not url or subreddit.lower() not in url.lower():
continue
# 确保是 old.reddit.com 格式
url = url.replace("www.reddit.com", "old.reddit.com")
candidates[idx]["url"] = url
posts.append(candidates[idx])
logger.info(f" [{idx+1}] {candidates[idx]['title'][:55]}")
return posts
def warmup_post(post: dict, subreddit: str) -> bool:
"""
访问帖子页面,生成评论,发出。
假设我们已经在热帖列表页。
"""
url = post.get("url", "")
if not url:
return False
B.open_url(url)
time.sleep(5) # 多等一点,让评论框加载
tree = B.snapshot()
# 提取帖子内容(跳过 sidebar,从标题后开始)
title_idx = tree.find(post["title"][:35])
if title_idx > 0:
chunk = tree[title_idx: title_idx + 3000]
else:
chunk = tree[3000:6000] # fallback
text_blocks = re.findall(r'StaticText: ([^\n]{20,})', chunk)
meta = {"submitted", "by", "share", "save", "hide", "report",
"crosspost", "sorted by:", "best", "formatting help"}
clean = [t for t in text_blocks if t.strip().lower() not in meta]
snippet = " ".join(clean[:10])[:600]
# 生成评论
comment = generate_comment(post["title"], snippet, subreddit)
if not comment:
logger.info(" → SKIP")
return False
logger.info(f" Comment: {comment[:90]}")
# 检查是否有评论框(说明已登录且帖子未锁定)
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox(?!: search)', tree)
if not textarea_refs:
# 尝试等待更长时间再次检查
logger.info(" → no textarea, waiting 3s and retrying...")
time.sleep(3)
tree = B.snapshot()
textarea_refs = re.findall(r'\[(\d+-\d+)\] textbox(?!: search)', tree)
if not textarea_refs:
logger.warning(" → textarea still not found (locked/not logged in?)")
# Debug: show what's in tree
logger.debug(f"Tree sample: {tree[3000:4500]}")
return False
return _post_comment(comment)
def main():
logger.info(f"=== Reddit 养号开始,目标 {MAX_COMMENTS} 条评论 ===")
if not _ensure_logged_in():
logger.error("Reddit 未登录,退出")
return
posted = 0
failed = 0
skipped = 0
for subreddit in WARMUP_SUBREDDITS:
if posted >= MAX_COMMENTS:
break
logger.info(f"\n[r/{subreddit}] 获取帖子列表...")
posts = get_post_urls(subreddit, count=4)
logger.info(f" 找到 {len(posts)} 个帖子")
for post in posts:
if posted >= MAX_COMMENTS:
break
logger.info(f"\n » {post['title'][:60]} ({post['comment_count']} 评论)")
success = warmup_post(post, subreddit)
if success:
posted += 1
logger.info(f" ✓ 评论发出!累计 {posted}/{MAX_COMMENTS}")
wait = DELAY_SUCCESS + posted * 15
logger.info(f" 等待 {wait}s...")
time.sleep(wait)
else:
skipped_or_failed = "skipped" if not post.get("url") else "failed"
if skipped_or_failed == "failed":
failed += 1
else:
skipped += 1
time.sleep(DELAY_FAIL)
logger.info(f"\n=== 完成 === ✓posted={posted} ✗failed={failed} skip={skipped}")
logger.info("Karma 查看: https://www.reddit.com/user/mguozhen/")
if __name__ == "__main__":
main()
Publish content to X/Twitter, LinkedIn, WeChat Official Account, and Xiaohongshu with one command. Automatically adapts content format for each platform — th...
---
name: multi-platform-publisher
description: "Publish content to X/Twitter, LinkedIn, WeChat Official Account, and Xiaohongshu with one command. Automatically adapts content format for each platform — threads for Twitter, professional articles for LinkedIn, HTML drafts for WeChat, emoji notes for Xiaohongshu. Triggers: publish content, social media, twitter post, linkedin post, wechat article, xiaohongshu note, cross-platform publish, multi-platform"
allowed-tools: Bash
metadata:
openclaw:
homepage: https://github.com/mguozhen/multi-platform-publisher
---
# Multi-Platform Social Media Publisher
Publish content to multiple social media platforms with a single command. Supports X/Twitter, LinkedIn, WeChat Official Account (微信公众号), and Xiaohongshu (小红书).
## Prerequisites
Install dependencies:
```bash
pip3 install requests tweepy Pillow
```
Set credentials via environment variables:
```bash
# Twitter / X
export TWITTER_API_KEY="your-api-key"
export TWITTER_API_SECRET="your-api-secret"
export TWITTER_ACCESS_TOKEN="your-access-token"
export TWITTER_ACCESS_TOKEN_SECRET="your-access-token-secret"
# LinkedIn
export LINKEDIN_ACCESS_TOKEN="your-linkedin-token"
export LINKEDIN_PERSON_URN="urn:li:person:abc123" # optional
# WeChat Official Account
export WECHAT_APPID="your-appid"
export WECHAT_APPSECRET="your-appsecret"
# Xiaohongshu
export XHS_COOKIE="your-cookie-string"
```
Or configure via `~/.openclaw/openclaw.json`:
```json
{
"skills": {
"entries": {
"multi-platform-publisher": {
"env": {
"TWITTER_API_KEY": "...",
"LINKEDIN_ACCESS_TOKEN": "...",
"WECHAT_APPID": "...",
"XHS_COOKIE": "..."
}
}
}
}
}
```
## Quick Start
```bash
# Publish to all configured platforms
python3 {baseDir}/main.py publish --content "Your post content here" --platforms all
# Specific platforms
python3 {baseDir}/main.py publish --content "Your content" --platforms twitter,linkedin
# Publish from a Markdown file
python3 {baseDir}/main.py publish --file /path/to/article.md --platforms all
# Twitter thread for long content
python3 {baseDir}/main.py publish --content "Long article..." --platforms twitter --thread
# Attach images
python3 {baseDir}/main.py publish --content "Check this out!" --platforms all --images photo.jpg
# Preview without publishing (dry run)
python3 {baseDir}/main.py publish --content "Test" --platforms all --dry-run
# Validate credentials
python3 {baseDir}/main.py validate
# List configured platforms
python3 {baseDir}/main.py list-platforms
```
## Supported Platforms
| Platform | Auth Method | Features |
|---|---|---|
| X/Twitter | OAuth 1.0a | Tweets, Threads, Images |
| LinkedIn | OAuth 2.0 | Posts, Articles, Images |
| WeChat Official Account | API Token | Draft creation, Images |
| Xiaohongshu (小红书) | MCP / Cookie | Notes, Images |
## Content Adaptation
Content is automatically adapted for each platform:
- **X/Twitter** — Strips Markdown, splits into 280-char tweets, creates numbered threads for long content
- **LinkedIn** — Formats as professional post, up to 3,000 chars, clean paragraphs
- **WeChat** — Converts to styled HTML article (draft, requires manual publish in dashboard)
- **Xiaohongshu** — Casual tone, emoji injection, topic tags, 1,000 char limit
## Project Structure
```
main.py # CLI entry point
adapters/
base_adapter.py # Abstract base class
twitter_adapter.py # X / Twitter (OAuth 1.0a)
linkedin_adapter.py # LinkedIn (OAuth 2.0)
wechat_adapter.py # WeChat Official Account
xiaohongshu_adapter.py # Xiaohongshu (MCP/Cookie)
utils/
config_loader.py # Credential loading (env > openclaw.json > config.json)
content_adapter.py # Platform-specific content transformation
image_handler.py # Image resize/validate before upload
logger.py # Stderr logger
```
FILE:main.py
#!/usr/bin/env python3
"""
Multi-Platform Social Media Publisher
======================================
CLI entry point. Publishes content to X/Twitter, LinkedIn,
WeChat Official Account, and Xiaohongshu with a single command.
Usage examples
--------------
python3 main.py publish --content "Hello world" --platforms all
python3 main.py publish --file article.md --platforms twitter,linkedin
python3 main.py publish --content "Long post…" --platforms twitter --thread
python3 main.py validate
python3 main.py list-platforms
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
# ---------------------------------------------------------------------------
# Local imports (adapters + utils)
# ---------------------------------------------------------------------------
from adapters.twitter_adapter import TwitterAdapter
from adapters.linkedin_adapter import LinkedInAdapter
from adapters.wechat_adapter import WeChatAdapter
from adapters.xiaohongshu_adapter import XiaohongshuAdapter
from utils.config_loader import ConfigLoader
from utils.content_adapter import ContentAdapter
from utils.image_handler import ImageHandler
from utils.logger import Logger
log = Logger(__name__)
# ---------------------------------------------------------------------------
# Platform registry
# ---------------------------------------------------------------------------
PLATFORM_REGISTRY: dict[str, type] = {
"twitter": TwitterAdapter,
"linkedin": LinkedInAdapter,
"wechat": WeChatAdapter,
"xiaohongshu": XiaohongshuAdapter,
}
PLATFORM_ALIASES: dict[str, str] = {
"x": "twitter",
"xhs": "xiaohongshu",
"wechat-oa": "wechat",
}
def resolve_platforms(platforms_arg: str) -> list[str]:
"""Expand 'all' and resolve aliases."""
if platforms_arg.lower() == "all":
return list(PLATFORM_REGISTRY.keys())
result = []
for p in platforms_arg.split(","):
p = p.strip().lower()
p = PLATFORM_ALIASES.get(p, p)
if p in PLATFORM_REGISTRY:
result.append(p)
else:
log.warning(f"Unknown platform '{p}', skipping.")
return result
def build_adapter(platform: str, config: dict):
"""Instantiate an adapter, returning None if credentials are missing."""
cls = PLATFORM_REGISTRY[platform]
try:
return cls(config.get(platform, {}))
except ValueError as exc:
log.warning(f"[{platform}] Skipping – {exc}")
return None
# ---------------------------------------------------------------------------
# Sub-commands
# ---------------------------------------------------------------------------
def cmd_publish(args: argparse.Namespace) -> int:
config = ConfigLoader.load()
content_adapter = ContentAdapter()
image_handler = ImageHandler()
# Load raw content
if args.file:
path = Path(args.file)
if not path.exists():
print(f"❌ File not found: {args.file}", file=sys.stderr)
return 1
raw_content = path.read_text(encoding="utf-8")
else:
raw_content = args.content or ""
if not raw_content.strip():
print("❌ No content provided. Use --content or --file.", file=sys.stderr)
return 1
# Process images
processed_images: list[str] = []
if args.images:
for img_path in args.images:
processed = image_handler.process_image(img_path)
if processed:
processed_images.append(processed)
else:
log.warning(f"Image skipped: {img_path}")
platforms = resolve_platforms(args.platforms)
if not platforms:
print("❌ No valid platforms specified.", file=sys.stderr)
return 1
dry_run = args.dry_run or config.get("settings", {}).get("dry_run", False)
results: dict[str, dict] = {}
exit_code = 0
for platform in platforms:
print(f"\n📤 [{platform.upper()}] Publishing…")
adapter = build_adapter(platform, config)
if adapter is None:
results[platform] = {"success": False, "error": "Missing credentials"}
exit_code = 1
continue
# Adapt content for this platform
adapted = content_adapter.adapt(
raw_content,
platform,
as_thread=(args.thread and platform == "twitter"),
)
if dry_run:
print(f" [DRY RUN] Would publish to {platform}:")
if isinstance(adapted, list):
for i, t in enumerate(adapted, 1):
print(f" [{i}] {t}")
elif isinstance(adapted, dict):
print(f" {json.dumps(adapted, ensure_ascii=False, indent=4)}")
else:
print(f" {adapted}")
results[platform] = {"success": True, "dry_run": True}
continue
try:
result = adapter.publish(adapted, processed_images or None)
results[platform] = result
if result.get("success"):
url = result.get("url") or result.get("media_id") or "✓"
print(f" ✅ Success → {url}")
else:
print(f" ❌ Failed: {result.get('error', 'Unknown error')}")
exit_code = 1
except Exception as exc:
results[platform] = {"success": False, "error": str(exc)}
print(f" ❌ Exception: {exc}")
exit_code = 1
# Summary
print("\n" + "─" * 50)
success = sum(1 for r in results.values() if r.get("success"))
total = len(results)
print(f"📊 Results: {success}/{total} platforms succeeded")
if args.json:
print(json.dumps(results, ensure_ascii=False, indent=2))
return exit_code
def cmd_validate(args: argparse.Namespace) -> int:
config = ConfigLoader.load()
platforms = resolve_platforms(args.platforms if hasattr(args, "platforms") else "all")
exit_code = 0
print("🔑 Validating credentials…\n")
for platform in platforms:
adapter = build_adapter(platform, config)
if adapter is None:
print(f" {platform:<15} ⚠️ No credentials configured")
continue
try:
ok = adapter.validate()
status = "✅ Valid" if ok else "❌ Invalid"
print(f" {platform:<15} {status}")
if not ok:
exit_code = 1
except Exception as exc:
print(f" {platform:<15} ❌ Error: {exc}")
exit_code = 1
return exit_code
def cmd_list_platforms(_args: argparse.Namespace) -> int:
config = ConfigLoader.load()
print("\n📋 Supported Platforms\n")
print(f" {'Name':<15} {'Auth':<15} {'Configured':<12} {'Features'}")
print(" " + "─" * 70)
for key, cls in PLATFORM_REGISTRY.items():
platform_cfg = config.get(key, {})
configured = "✅ Yes" if platform_cfg else "❌ No"
features = ", ".join(cls.FEATURES)
print(f" {cls.DISPLAY_NAME:<15} {cls.AUTH_METHOD:<15} {configured:<12} {features}")
print()
return 0
# ---------------------------------------------------------------------------
# Argument parser
# ---------------------------------------------------------------------------
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="main.py",
description="Multi-Platform Social Media Publisher",
)
sub = parser.add_subparsers(dest="command")
# publish
p_pub = sub.add_parser("publish", help="Publish content to social platforms")
p_pub.add_argument("--content", "-c", help="Content string to publish")
p_pub.add_argument("--file", "-f", help="Path to a Markdown/text file to publish")
p_pub.add_argument(
"--platforms",
"-p",
default="all",
help="Comma-separated platforms or 'all' (default: all)",
)
p_pub.add_argument(
"--images",
nargs="+",
metavar="IMAGE",
help="Image paths to attach",
)
p_pub.add_argument(
"--thread",
action="store_true",
help="Publish Twitter content as a thread",
)
p_pub.add_argument(
"--dry-run",
action="store_true",
help="Preview adapted content without actually publishing",
)
p_pub.add_argument(
"--json",
action="store_true",
help="Output results as JSON",
)
# validate
p_val = sub.add_parser("validate", help="Validate credentials for all platforms")
p_val.add_argument("--platforms", "-p", default="all")
# list-platforms
sub.add_parser("list-platforms", help="List supported platforms and their status")
return parser
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.command == "publish":
return cmd_publish(args)
if args.command == "validate":
return cmd_validate(args)
if args.command == "list-platforms":
return cmd_list_platforms(args)
parser.print_help()
return 0
if __name__ == "__main__":
sys.exit(main())
FILE:requirements.txt
requests>=2.28.0
tweepy>=4.14.0
Pillow>=9.0.0
FILE:adapters/__init__.py
"""Platform adapters for multi-platform-publisher."""
from .twitter_adapter import TwitterAdapter
from .linkedin_adapter import LinkedInAdapter
from .wechat_adapter import WeChatAdapter
from .xiaohongshu_adapter import XiaohongshuAdapter
__all__ = [
"TwitterAdapter",
"LinkedInAdapter",
"WeChatAdapter",
"XiaohongshuAdapter",
]
FILE:adapters/base_adapter.py
"""
Base adapter interface for social media platforms.
All platform adapters must inherit from ``BaseAdapter`` and implement
the abstract methods defined here.
"""
from abc import ABC, abstractmethod
from typing import Any
class BaseAdapter(ABC):
"""Abstract base class for every social-media platform adapter."""
# Subclasses MUST override these class-level attributes.
DISPLAY_NAME: str = "Unknown"
AUTH_METHOD: str = "Unknown"
FEATURES: list[str] = []
MAX_TEXT_LENGTH: int = 0
MAX_IMAGES: int = 0
def __init__(self, config: dict):
self.config = config
# ------------------------------------------------------------------
# Abstract interface
# ------------------------------------------------------------------
@abstractmethod
def publish(self, content: Any, images: list[str] | None = None) -> dict:
"""Publish *content* (already adapted) to the platform.
Parameters
----------
content : Any
Platform-specific content payload (string, list of strings for
threads, HTML string, etc.).
images : list[str] | None
Paths to processed image files.
Returns
-------
dict
``{"success": bool, "message": str, "url": str | None, "error": str | None}``
"""
@abstractmethod
def validate(self) -> bool:
"""Return ``True`` if the stored credentials are valid."""
@abstractmethod
def upload_image(self, image_path: str) -> str | None:
"""Upload an image and return a platform-specific media identifier
(media_id, asset URN, media_id on WeChat, etc.). Return ``None``
on failure."""
# ------------------------------------------------------------------
# Helpers available to all adapters
# ------------------------------------------------------------------
@staticmethod
def _env(key: str, default: str = "") -> str:
"""Read an environment variable with a fallback."""
return os.environ.get(key, default)
import os # noqa: E402 – placed after class body for readability
FILE:adapters/twitter_adapter.py
"""
Twitter / X Adapter
===================
Publishes tweets and threads via the X API v2 using OAuth 1.0a
authentication (User Context).
Required environment variables
------------------------------
- ``TWITTER_API_KEY`` – Consumer / API key
- ``TWITTER_API_SECRET`` – Consumer / API secret
- ``TWITTER_ACCESS_TOKEN`` – User access token
- ``TWITTER_ACCESS_TOKEN_SECRET`` – User access-token secret
References
----------
- X API v2 Tweets endpoint: https://developer.x.com/en/docs/x-api/tweets/manage-tweets/api-reference/post-tweets
- Media Upload v1.1: https://developer.x.com/en/docs/x-api/media/upload-media/api-reference/post-media-upload
"""
from __future__ import annotations
import os
import time
import json
import hashlib
import hmac
import base64
import urllib.parse
from pathlib import Path
from typing import Any
import requests
from .base_adapter import BaseAdapter
class TwitterAdapter(BaseAdapter):
"""Adapter for X / Twitter using OAuth 1.0a."""
DISPLAY_NAME = "X / Twitter"
AUTH_METHOD = "OAuth 1.0a"
FEATURES = ["tweet", "thread", "image_upload"]
MAX_TEXT_LENGTH = 280
MAX_IMAGES = 4
# API endpoints
TWEET_URL = "https://api.x.com/2/tweets"
MEDIA_UPLOAD_URL = "https://upload.twitter.com/1.1/media/upload.json"
VERIFY_URL = "https://api.x.com/2/users/me"
def __init__(self, config: dict | None = None):
super().__init__(config or {})
self.api_key = self.config.get("api_key") or os.environ.get("TWITTER_API_KEY", "")
self.api_secret = self.config.get("api_secret") or os.environ.get("TWITTER_API_SECRET", "")
self.access_token = self.config.get("access_token") or os.environ.get("TWITTER_ACCESS_TOKEN", "")
self.access_token_secret = self.config.get("access_token_secret") or os.environ.get(
"TWITTER_ACCESS_TOKEN_SECRET", ""
)
if not all([self.api_key, self.api_secret, self.access_token, self.access_token_secret]):
raise ValueError(
"Twitter credentials incomplete. "
"Set TWITTER_API_KEY, TWITTER_API_SECRET, TWITTER_ACCESS_TOKEN, "
"and TWITTER_ACCESS_TOKEN_SECRET."
)
# ------------------------------------------------------------------
# OAuth 1.0a signing
# ------------------------------------------------------------------
def _oauth_signature(
self,
method: str,
url: str,
params: dict,
) -> str:
"""Generate an OAuth 1.0a signature."""
sorted_params = "&".join(
f"{urllib.parse.quote(k, safe='')}={urllib.parse.quote(str(v), safe='')}"
for k, v in sorted(params.items())
)
base_string = (
f"{method.upper()}&"
f"{urllib.parse.quote(url, safe='')}&"
f"{urllib.parse.quote(sorted_params, safe='')}"
)
signing_key = (
f"{urllib.parse.quote(self.api_secret, safe='')}&"
f"{urllib.parse.quote(self.access_token_secret, safe='')}"
)
signature = hmac.new(
signing_key.encode("utf-8"),
base_string.encode("utf-8"),
hashlib.sha1,
).digest()
return base64.b64encode(signature).decode("utf-8")
def _oauth_header(self, method: str, url: str, extra_params: dict | None = None) -> str:
"""Build the ``Authorization: OAuth …`` header value."""
oauth_params = {
"oauth_consumer_key": self.api_key,
"oauth_nonce": base64.b64encode(os.urandom(32)).decode("utf-8").rstrip("="),
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": str(int(time.time())),
"oauth_token": self.access_token,
"oauth_version": "1.0",
}
all_params = {**oauth_params, **(extra_params or {})}
oauth_params["oauth_signature"] = self._oauth_signature(method, url, all_params)
header_parts = ", ".join(
f'{urllib.parse.quote(k, safe="")}="{urllib.parse.quote(v, safe="")}"'
for k, v in sorted(oauth_params.items())
)
return f"OAuth {header_parts}"
def _request(
self,
method: str,
url: str,
json_body: dict | None = None,
data: dict | None = None,
files: dict | None = None,
) -> requests.Response:
"""Send an OAuth 1.0a-signed request."""
extra = {}
if data:
extra = dict(data)
headers = {"Authorization": self._oauth_header(method, url, extra)}
kwargs: dict[str, Any] = {"headers": headers, "timeout": 60}
if json_body is not None:
headers["Content-Type"] = "application/json"
kwargs["data"] = json.dumps(json_body)
elif data is not None:
kwargs["data"] = data
if files is not None:
kwargs["files"] = files
# Remove Content-Type so requests can set multipart boundary
headers.pop("Content-Type", None)
return requests.request(method, url, **kwargs)
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
def publish(self, content: Any, images: list[str] | None = None) -> dict:
"""Publish a tweet or thread.
*content* may be a ``str`` (single tweet) or ``list[str]`` (thread).
"""
media_ids = []
if images:
for img in images[: self.MAX_IMAGES]:
mid = self.upload_image(img)
if mid:
media_ids.append(mid)
if isinstance(content, list):
return self._publish_thread(content, media_ids)
return self._publish_single(content, media_ids)
def validate(self) -> bool:
resp = self._request("GET", self.VERIFY_URL)
return resp.status_code == 200
def upload_image(self, image_path: str) -> str | None:
path = Path(image_path)
if not path.exists():
return None
with open(path, "rb") as f:
resp = self._request(
"POST",
self.MEDIA_UPLOAD_URL,
files={"media": (path.name, f, "application/octet-stream")},
)
if resp.status_code in (200, 201):
return resp.json().get("media_id_string")
return None
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _publish_single(self, text: str, media_ids: list[str]) -> dict:
payload: dict[str, Any] = {"text": text}
if media_ids:
payload["media"] = {"media_ids": media_ids}
resp = self._request("POST", self.TWEET_URL, json_body=payload)
if resp.status_code in (200, 201):
data = resp.json().get("data", {})
tweet_id = data.get("id", "")
return {
"success": True,
"message": "Tweet published successfully.",
"url": f"https://x.com/i/status/{tweet_id}",
"tweet_id": tweet_id,
}
return {
"success": False,
"error": f"Twitter API error {resp.status_code}: {resp.text}",
}
def _publish_thread(self, tweets: list[str], media_ids: list[str]) -> dict:
"""Post a thread by chaining ``reply_to`` tweet IDs."""
published: list[dict] = []
previous_id: str | None = None
for idx, text in enumerate(tweets):
payload: dict[str, Any] = {"text": text}
# Attach images only to the first tweet of the thread
if idx == 0 and media_ids:
payload["media"] = {"media_ids": media_ids}
if previous_id:
payload["reply"] = {"in_reply_to_tweet_id": previous_id}
resp = self._request("POST", self.TWEET_URL, json_body=payload)
if resp.status_code not in (200, 201):
return {
"success": False,
"error": (
f"Thread failed at tweet #{idx + 1}: "
f"API {resp.status_code} – {resp.text}"
),
"published_tweets": published,
}
data = resp.json().get("data", {})
tweet_id = data.get("id", "")
previous_id = tweet_id
published.append({"index": idx, "tweet_id": tweet_id})
first_id = published[0]["tweet_id"] if published else ""
return {
"success": True,
"message": f"Thread published ({len(published)} tweets).",
"url": f"https://x.com/i/status/{first_id}",
"tweets": published,
}
FILE:adapters/linkedin_adapter.py
"""
LinkedIn Adapter
================
Publishes posts and articles to LinkedIn via the Community Management API
using OAuth 2.0 Bearer tokens.
Required environment variables
------------------------------
- ``LINKEDIN_ACCESS_TOKEN`` – OAuth 2.0 access token with ``w_member_social`` scope
- ``LINKEDIN_PERSON_URN`` – (Optional) Author URN, e.g. ``urn:li:person:abc123``.
If omitted the adapter fetches it via ``/v2/userinfo``.
References
----------
- Posts API: https://learn.microsoft.com/en-us/linkedin/marketing/community-management/shares/posts-api
- Images API: https://learn.microsoft.com/en-us/linkedin/marketing/community-management/shares/images-api
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
import requests
from .base_adapter import BaseAdapter
class LinkedInAdapter(BaseAdapter):
"""Adapter for LinkedIn using OAuth 2.0."""
DISPLAY_NAME = "LinkedIn"
AUTH_METHOD = "OAuth 2.0"
FEATURES = ["article", "post", "image_upload"]
MAX_TEXT_LENGTH = 3000
MAX_IMAGES = 9
BASE_URL = "https://api.linkedin.com"
POSTS_URL = f"{BASE_URL}/rest/posts"
IMAGES_URL = f"{BASE_URL}/rest/images"
USERINFO_URL = f"{BASE_URL}/v2/userinfo"
def __init__(self, config: dict | None = None):
super().__init__(config or {})
self.access_token = self.config.get("access_token") or os.environ.get(
"LINKEDIN_ACCESS_TOKEN", ""
)
self.person_urn = self.config.get("person_urn") or os.environ.get(
"LINKEDIN_PERSON_URN", ""
)
if not self.access_token:
raise ValueError(
"LinkedIn credentials incomplete. Set LINKEDIN_ACCESS_TOKEN."
)
# ------------------------------------------------------------------
# HTTP helpers
# ------------------------------------------------------------------
def _headers(self, extra: dict | None = None) -> dict:
h = {
"Authorization": f"Bearer {self.access_token}",
"X-Restli-Protocol-Version": "2.0.0",
"LinkedIn-Version": "202402",
"Content-Type": "application/json",
}
if extra:
h.update(extra)
return h
def _get_person_urn(self) -> str:
"""Fetch the authenticated user's person URN if not configured."""
if self.person_urn:
return self.person_urn
resp = requests.get(self.USERINFO_URL, headers=self._headers(), timeout=30)
resp.raise_for_status()
sub = resp.json().get("sub", "")
self.person_urn = f"urn:li:person:{sub}"
return self.person_urn
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
def publish(self, content: Any, images: list[str] | None = None) -> dict:
"""Publish a post (or article) to LinkedIn.
*content* is expected to be a ``str`` – the adapted post body.
"""
author = self._get_person_urn()
# Upload images if provided
image_urns: list[str] = []
if images:
for img in images[: self.MAX_IMAGES]:
urn = self.upload_image(img)
if urn:
image_urns.append(urn)
payload: dict[str, Any] = {
"author": author,
"commentary": content[: self.MAX_TEXT_LENGTH],
"visibility": "PUBLIC",
"distribution": {
"feedDistribution": "MAIN_FEED",
"targetEntities": [],
"thirdPartyDistributionChannels": [],
},
"lifecycleState": "PUBLISHED",
}
# Attach images
if image_urns:
payload["content"] = {
"multiImage": {
"images": [{"id": urn, "altText": ""} for urn in image_urns],
}
}
resp = requests.post(
self.POSTS_URL,
headers=self._headers(),
data=json.dumps(payload),
timeout=60,
)
if resp.status_code in (200, 201):
post_id = resp.headers.get("x-restli-id", "")
return {
"success": True,
"message": "LinkedIn post published successfully.",
"url": f"https://www.linkedin.com/feed/update/{post_id}/" if post_id else None,
"post_id": post_id,
}
return {
"success": False,
"error": f"LinkedIn API error {resp.status_code}: {resp.text}",
}
def validate(self) -> bool:
resp = requests.get(self.USERINFO_URL, headers=self._headers(), timeout=30)
return resp.status_code == 200
def upload_image(self, image_path: str) -> str | None:
"""Upload an image to LinkedIn and return the image URN.
The flow is:
1. ``POST /rest/images?action=initializeUpload`` → get upload URL + image URN.
2. ``PUT <uploadUrl>`` with binary data.
"""
path = Path(image_path)
if not path.exists():
return None
author = self._get_person_urn()
# Step 1 – initialise upload
init_payload = {
"initializeUploadRequest": {
"owner": author,
}
}
resp = requests.post(
f"{self.IMAGES_URL}?action=initializeUpload",
headers=self._headers(),
data=json.dumps(init_payload),
timeout=30,
)
if resp.status_code not in (200, 201):
return None
data = resp.json().get("value", {})
upload_url = data.get("uploadUrl", "")
image_urn = data.get("image", "")
if not upload_url or not image_urn:
return None
# Step 2 – upload binary
with open(path, "rb") as f:
put_resp = requests.put(
upload_url,
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/octet-stream",
},
data=f,
timeout=120,
)
if put_resp.status_code in (200, 201):
return image_urn
return None
FILE:adapters/wechat_adapter.py
"""
WeChat Official Account Adapter (微信公众号)
=============================================
Creates article drafts via the WeChat Official Account Platform API.
Required environment variables
------------------------------
- ``WECHAT_APPID`` – Official Account App ID
- ``WECHAT_APPSECRET`` – Official Account App Secret
References
----------
- Access Token: https://developers.weixin.qq.com/doc/offiaccount/Basic_Information/Get_access_token.html
- Draft API: https://developers.weixin.qq.com/doc/offiaccount/Draft_Box/Add_draft.html
- Media Upload: https://developers.weixin.qq.com/doc/offiaccount/Asset_Management/New_temporary_material.html
"""
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Any
import requests
from .base_adapter import BaseAdapter
class WeChatAdapter(BaseAdapter):
"""Adapter for WeChat Official Account (微信公众号)."""
DISPLAY_NAME = "WeChat Official Account (微信公众号)"
AUTH_METHOD = "API Token"
FEATURES = ["draft_creation", "image_upload", "html_article"]
MAX_IMAGES = 10
BASE_URL = "https://api.weixin.qq.com/cgi-bin"
def __init__(self, config: dict | None = None):
super().__init__(config or {})
self.appid = self.config.get("appid") or os.environ.get("WECHAT_APPID", "")
self.appsecret = self.config.get("appsecret") or os.environ.get("WECHAT_APPSECRET", "")
if not self.appid or not self.appsecret:
raise ValueError(
"WeChat credentials incomplete. Set WECHAT_APPID and WECHAT_APPSECRET."
)
self._access_token: str = ""
self._token_expires: float = 0.0
# ------------------------------------------------------------------
# Access-token management
# ------------------------------------------------------------------
def _get_access_token(self) -> str:
"""Obtain or refresh the access token."""
if self._access_token and time.time() < self._token_expires:
return self._access_token
resp = requests.get(
f"{self.BASE_URL}/token",
params={
"grant_type": "client_credential",
"appid": self.appid,
"secret": self.appsecret,
},
timeout=30,
)
resp.raise_for_status()
data = resp.json()
if "access_token" not in data:
raise RuntimeError(
f"WeChat token error: {data.get('errcode')} – {data.get('errmsg')}"
)
self._access_token = data["access_token"]
# Token is valid for 7200 s; refresh 5 min early.
self._token_expires = time.time() + data.get("expires_in", 7200) - 300
return self._access_token
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
def publish(self, content: Any, images: list[str] | None = None) -> dict:
"""Create a draft article on the WeChat Official Account.
*content* is expected to be a ``dict`` with keys:
- ``title`` – article title
- ``content`` – HTML body
- ``digest`` – (optional) short summary
- ``author`` – (optional) author name
If *content* is a plain ``str``, it is wrapped automatically.
"""
token = self._get_access_token()
# Normalise content
if isinstance(content, str):
content = {"title": "Untitled", "content": content}
# Upload a cover image if provided
thumb_media_id = ""
if images:
thumb_media_id = self.upload_image(images[0]) or ""
article = {
"title": content.get("title", "Untitled"),
"author": content.get("author", ""),
"digest": content.get("digest", ""),
"content": content.get("content", ""),
"content_source_url": content.get("source_url", ""),
"thumb_media_id": thumb_media_id,
"need_open_comment": 0,
"only_fans_can_comment": 0,
}
payload = {"articles": [article]}
resp = requests.post(
f"{self.BASE_URL}/draft/add",
params={"access_token": token},
data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
headers={"Content-Type": "application/json; charset=utf-8"},
timeout=60,
)
data = resp.json()
if data.get("errcode", 0) == 0:
media_id = data.get("media_id", "")
return {
"success": True,
"message": "WeChat draft created successfully.",
"media_id": media_id,
"url": None, # Drafts have no public URL until published
}
return {
"success": False,
"error": f"WeChat API error {data.get('errcode')}: {data.get('errmsg')}",
}
def validate(self) -> bool:
try:
self._get_access_token()
return True
except Exception:
return False
def upload_image(self, image_path: str) -> str | None:
"""Upload a temporary image material and return its ``media_id``.
The material is valid for 3 days on WeChat's servers.
"""
path = Path(image_path)
if not path.exists():
return None
token = self._get_access_token()
with open(path, "rb") as f:
resp = requests.post(
f"{self.BASE_URL}/media/upload",
params={"access_token": token, "type": "image"},
files={"media": (path.name, f, "image/jpeg")},
timeout=120,
)
data = resp.json()
if "media_id" in data:
return data["media_id"]
return None
# ------------------------------------------------------------------
# Extended helpers
# ------------------------------------------------------------------
def upload_permanent_image(self, image_path: str) -> str | None:
"""Upload a *permanent* image material (for article body ``<img>`` tags).
Returns the image URL on WeChat CDN.
"""
path = Path(image_path)
if not path.exists():
return None
token = self._get_access_token()
with open(path, "rb") as f:
resp = requests.post(
f"{self.BASE_URL}/media/uploadimg",
params={"access_token": token},
files={"media": (path.name, f, "image/jpeg")},
timeout=120,
)
data = resp.json()
return data.get("url")
FILE:adapters/xiaohongshu_adapter.py
"""
Xiaohongshu (小红书) Adapter
=============================
Publishes notes to Xiaohongshu via the MCP (Model Context Protocol)
bridge or direct web API.
Required environment variables
------------------------------
- ``XHS_COOKIE`` – Browser cookie string for authentication
- ``XHS_MCP_ENDPOINT`` – (Optional) MCP server endpoint for Xiaohongshu
Defaults to ``http://localhost:3001``
Notes
-----
Xiaohongshu does not offer an official public API. This adapter
supports two strategies:
1. **MCP mode** (preferred) – delegates to a running MCP server that
wraps Xiaohongshu's internal endpoints. The MCP server handles
signing, anti-bot headers, and cookie refresh.
2. **Direct mode** – calls Xiaohongshu's web API directly using the
cookie. This is more fragile and may break when XHS updates its
anti-bot measures.
The adapter tries MCP first and falls back to direct mode.
"""
from __future__ import annotations
import json
import os
import time
import hashlib
from pathlib import Path
from typing import Any
import requests
from .base_adapter import BaseAdapter
class XiaohongshuAdapter(BaseAdapter):
"""Adapter for Xiaohongshu (小红书)."""
DISPLAY_NAME = "Xiaohongshu (小红书)"
AUTH_METHOD = "MCP / Cookie"
FEATURES = ["note", "image_upload"]
MAX_TEXT_LENGTH = 1000
MAX_IMAGES = 18
# XHS web-API base
WEB_BASE = "https://edith.xiaohongshu.com"
# Default MCP endpoint
DEFAULT_MCP_ENDPOINT = "http://localhost:3001"
def __init__(self, config: dict | None = None):
super().__init__(config or {})
self.cookie = self.config.get("cookie") or os.environ.get("XHS_COOKIE", "")
self.mcp_endpoint = (
self.config.get("mcp_endpoint")
or os.environ.get("XHS_MCP_ENDPOINT", self.DEFAULT_MCP_ENDPOINT)
)
if not self.cookie:
raise ValueError(
"Xiaohongshu credentials incomplete. Set XHS_COOKIE."
)
self._use_mcp: bool | None = None # determined lazily
# ------------------------------------------------------------------
# Strategy detection
# ------------------------------------------------------------------
def _mcp_available(self) -> bool:
"""Check whether the MCP server is reachable."""
if self._use_mcp is not None:
return self._use_mcp
try:
resp = requests.get(f"{self.mcp_endpoint}/health", timeout=5)
self._use_mcp = resp.status_code == 200
except Exception:
self._use_mcp = False
return self._use_mcp
# ------------------------------------------------------------------
# MCP helpers
# ------------------------------------------------------------------
def _mcp_call(self, tool: str, arguments: dict) -> dict:
"""Invoke a tool on the MCP server."""
payload = {
"jsonrpc": "2.0",
"id": int(time.time() * 1000),
"method": "tools/call",
"params": {
"name": tool,
"arguments": arguments,
},
}
resp = requests.post(
self.mcp_endpoint,
json=payload,
headers={"Content-Type": "application/json"},
timeout=60,
)
resp.raise_for_status()
return resp.json().get("result", {})
# ------------------------------------------------------------------
# Direct-mode helpers
# ------------------------------------------------------------------
def _web_headers(self) -> dict:
"""Build headers that mimic the XHS web client."""
return {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/122.0.0.0 Safari/537.36"
),
"Cookie": self.cookie,
"Origin": "https://www.xiaohongshu.com",
"Referer": "https://www.xiaohongshu.com/",
"Content-Type": "application/json;charset=UTF-8",
}
def _generate_xhs_sign(self, api_path: str, payload: str = "") -> dict:
"""Generate a minimal X-S / X-T signing pair.
.. warning::
Xiaohongshu's signing algorithm changes frequently. In
production, prefer the MCP approach which keeps the signing
logic up to date.
"""
timestamp = str(int(time.time() * 1000))
raw = f"{api_path}{payload}{timestamp}"
x_s = hashlib.md5(raw.encode()).hexdigest()
return {"X-S": x_s, "X-T": timestamp}
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
def publish(self, content: Any, images: list[str] | None = None) -> dict:
"""Publish a note to Xiaohongshu.
*content* is expected to be a ``dict`` with keys:
- ``title`` – note title
- ``desc`` – note body text
If *content* is a plain ``str``, it is wrapped automatically.
"""
if isinstance(content, str):
# Try to split first line as title
lines = content.strip().split("\n", 1)
title = lines[0].strip()[:20]
desc = content
content = {"title": title, "desc": desc}
# Upload images
image_ids: list[str] = []
if images:
for img in images[: self.MAX_IMAGES]:
iid = self.upload_image(img)
if iid:
image_ids.append(iid)
if self._mcp_available():
return self._publish_via_mcp(content, image_ids)
return self._publish_direct(content, image_ids)
def validate(self) -> bool:
if self._mcp_available():
try:
result = self._mcp_call("xhs_get_user_info", {})
return result.get("success", False)
except Exception:
return False
# Direct mode – try fetching user info
try:
resp = requests.get(
f"{self.WEB_BASE}/api/sns/web/v1/user/selfinfo",
headers=self._web_headers(),
timeout=15,
)
return resp.status_code == 200 and resp.json().get("success", False)
except Exception:
return False
def upload_image(self, image_path: str) -> str | None:
path = Path(image_path)
if not path.exists():
return None
if self._mcp_available():
return self._upload_image_mcp(path)
return self._upload_image_direct(path)
# ------------------------------------------------------------------
# MCP publish / upload
# ------------------------------------------------------------------
def _publish_via_mcp(self, content: dict, image_ids: list[str]) -> dict:
try:
result = self._mcp_call(
"xhs_create_note",
{
"title": content.get("title", ""),
"desc": content.get("desc", ""),
"image_ids": image_ids,
"post_type": "normal",
},
)
if result.get("success"):
note_id = result.get("note_id", "")
return {
"success": True,
"message": "Xiaohongshu note published via MCP.",
"url": f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else None,
"note_id": note_id,
}
return {
"success": False,
"error": result.get("error", "MCP publish failed"),
}
except Exception as exc:
return {"success": False, "error": str(exc)}
def _upload_image_mcp(self, path: Path) -> str | None:
try:
import base64
with open(path, "rb") as f:
b64 = base64.b64encode(f.read()).decode()
result = self._mcp_call(
"xhs_upload_image",
{"image_base64": b64, "filename": path.name},
)
return result.get("image_id")
except Exception:
return None
# ------------------------------------------------------------------
# Direct publish / upload
# ------------------------------------------------------------------
def _publish_direct(self, content: dict, image_ids: list[str]) -> dict:
api_path = "/api/sns/web/v1/feed"
payload_dict: dict[str, Any] = {
"title": content.get("title", ""),
"desc": content.get("desc", ""),
"note_type": "normal",
"image_info": {"images": [{"file_id": fid} for fid in image_ids]},
"post_time": int(time.time()),
}
payload_str = json.dumps(payload_dict, ensure_ascii=False)
headers = {**self._web_headers(), **self._generate_xhs_sign(api_path, payload_str)}
try:
resp = requests.post(
f"{self.WEB_BASE}{api_path}",
headers=headers,
data=payload_str.encode("utf-8"),
timeout=60,
)
data = resp.json()
if data.get("success"):
note_id = data.get("data", {}).get("note_id", "")
return {
"success": True,
"message": "Xiaohongshu note published (direct mode).",
"url": f"https://www.xiaohongshu.com/explore/{note_id}" if note_id else None,
"note_id": note_id,
}
return {
"success": False,
"error": f"XHS API: {data.get('msg', 'Unknown error')}",
}
except Exception as exc:
return {"success": False, "error": str(exc)}
def _upload_image_direct(self, path: Path) -> str | None:
api_path = "/api/sns/web/v1/upload/image"
headers = {**self._web_headers(), **self._generate_xhs_sign(api_path)}
# Remove JSON content-type for multipart upload
headers.pop("Content-Type", None)
try:
with open(path, "rb") as f:
resp = requests.post(
f"{self.WEB_BASE}{api_path}",
headers=headers,
files={"file": (path.name, f, "image/jpeg")},
timeout=120,
)
data = resp.json()
if data.get("success"):
return data.get("data", {}).get("file_id")
return None
except Exception:
return None
FILE:utils/__init__.py
"""Utility modules for multi-platform-publisher."""
FILE:utils/config_loader.py
"""
Config Loader
=============
Loads platform credentials and settings from multiple sources with the
following precedence (highest → lowest):
1. Environment variables
2. ``~/.openclaw/openclaw.json`` (``skills.entries.multi-platform-publisher``)
3. ``config.json`` in the skill directory
4. Built-in defaults
"""
from __future__ import annotations
import json
import os
from pathlib import Path
from typing import Any
class ConfigLoader:
"""Load and merge configuration for multi-platform-publisher."""
SKILL_NAME = "multi-platform-publisher"
# Mapping: config key → environment variable(s)
ENV_MAP: dict[str, dict[str, str]] = {
"twitter": {
"api_key": "TWITTER_API_KEY",
"api_secret": "TWITTER_API_SECRET",
"access_token": "TWITTER_ACCESS_TOKEN",
"access_token_secret": "TWITTER_ACCESS_TOKEN_SECRET",
},
"linkedin": {
"access_token": "LINKEDIN_ACCESS_TOKEN",
"person_urn": "LINKEDIN_PERSON_URN",
},
"wechat": {
"appid": "WECHAT_APPID",
"appsecret": "WECHAT_APPSECRET",
},
"xiaohongshu": {
"cookie": "XHS_COOKIE",
"mcp_endpoint": "XHS_MCP_ENDPOINT",
},
}
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
@classmethod
def load(cls) -> dict[str, Any]:
"""Return the merged configuration dictionary."""
config: dict[str, Any] = {}
# Layer 1 – built-in defaults
config = cls._defaults()
# Layer 2 – skill-local config.json
cls._merge(config, cls._load_local_config())
# Layer 3 – OpenClaw global config
cls._merge(config, cls._load_openclaw_config())
# Layer 4 – environment variables (highest priority)
cls._merge(config, cls._load_env())
return config
# ------------------------------------------------------------------
# Loaders
# ------------------------------------------------------------------
@staticmethod
def _defaults() -> dict[str, Any]:
return {
"twitter": {},
"linkedin": {},
"wechat": {},
"xiaohongshu": {},
"settings": {
"default_platforms": "all",
"content_adaptation": True,
"auto_hashtags": True,
"dry_run": False,
},
}
@classmethod
def _load_env(cls) -> dict[str, Any]:
"""Read credentials from environment variables."""
result: dict[str, Any] = {}
for platform, mapping in cls.ENV_MAP.items():
section: dict[str, str] = {}
for key, env_var in mapping.items():
val = os.environ.get(env_var, "")
if val:
section[key] = val
if section:
result[platform] = section
return result
@classmethod
def _load_openclaw_config(cls) -> dict[str, Any]:
"""Read from ``~/.openclaw/openclaw.json``."""
path = Path.home() / ".openclaw" / "openclaw.json"
if not path.exists():
return {}
try:
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
entry = (
data.get("skills", {})
.get("entries", {})
.get(cls.SKILL_NAME, {})
)
env_section = entry.get("env", {})
config_section = entry.get("config", {})
result: dict[str, Any] = {}
# Map env values back to platform sections
for platform, mapping in cls.ENV_MAP.items():
section: dict[str, str] = {}
for key, env_var in mapping.items():
if env_var in env_section:
section[key] = env_section[env_var]
if section:
result[platform] = section
if config_section:
result["settings"] = config_section
return result
except (json.JSONDecodeError, KeyError):
return {}
@classmethod
def _load_local_config(cls) -> dict[str, Any]:
"""Read from ``config.json`` next to this file."""
path = Path(__file__).resolve().parent.parent / "config.json"
if not path.exists():
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, KeyError):
return {}
# ------------------------------------------------------------------
# Merge helper
# ------------------------------------------------------------------
@staticmethod
def _merge(base: dict, override: dict) -> None:
"""Recursively merge *override* into *base* in place."""
for key, value in override.items():
if (
key in base
and isinstance(base[key], dict)
and isinstance(value, dict)
):
ConfigLoader._merge(base[key], value)
else:
base[key] = value
FILE:utils/content_adapter.py
"""
Content Adapter
===============
Transforms raw article / post content into platform-specific formats.
Each platform has different constraints (character limits, formatting
rules, tone expectations). ``ContentAdapter`` applies the right
transformation based on the target platform key.
"""
from __future__ import annotations
import re
import textwrap
from typing import Any
class ContentAdapter:
"""Adapt a single piece of content for multiple social-media platforms."""
# ------------------------------------------------------------------
# Public entry point
# ------------------------------------------------------------------
def adapt(self, content: str, platform: str, *, as_thread: bool = False) -> Any:
"""Return *content* transformed for *platform*.
Parameters
----------
content : str
Raw Markdown / plain-text content.
platform : str
Target platform key (``twitter``, ``linkedin``, ``wechat``,
``xiaohongshu``).
as_thread : bool
For Twitter – split into a thread instead of truncating.
Returns
-------
Any
Platform-specific payload. May be ``str``, ``list[str]``, or
``dict`` depending on the platform.
"""
method = getattr(self, f"_adapt_{platform}", None)
if method is None:
return content
return method(content, as_thread=as_thread)
# ==================================================================
# Twitter / X
# ==================================================================
def _adapt_twitter(self, content: str, *, as_thread: bool = False) -> str | list[str]:
"""Adapt content for X / Twitter (max 280 chars per tweet)."""
clean = self._strip_markdown(content).strip()
hashtags = self._extract_hashtags(content)
suffix = (" " + " ".join(hashtags)) if hashtags else ""
if as_thread or len(clean + suffix) > 280:
return self._split_thread(clean, hashtags)
return (clean + suffix)[:280]
def _split_thread(self, text: str, hashtags: list[str], max_len: int = 270) -> list[str]:
"""Split *text* into a list of tweets suitable for a thread.
Strategy: split on sentence boundaries, then pack sentences into
tweets without exceeding *max_len* (leaving room for numbering).
"""
sentences = re.split(r"(?<=[.!?。!?])\s+", text)
tweets: list[str] = []
current = ""
for sentence in sentences:
if len(current) + len(sentence) + 1 <= max_len:
current = f"{current} {sentence}".strip() if current else sentence
else:
if current:
tweets.append(current)
# If a single sentence exceeds max_len, hard-wrap it.
if len(sentence) > max_len:
for chunk in textwrap.wrap(sentence, max_len):
tweets.append(chunk)
else:
current = sentence
continue
current = ""
if current:
tweets.append(current)
# Add numbering and hashtags to the last tweet
total = len(tweets)
if total > 1:
tweets = [f"{i + 1}/{total} {t}" for i, t in enumerate(tweets)]
if hashtags:
tag_str = " ".join(hashtags)
last = tweets[-1]
if len(last) + len(tag_str) + 1 <= 280:
tweets[-1] = f"{last} {tag_str}"
else:
tweets.append(tag_str)
return tweets
# ==================================================================
# LinkedIn
# ==================================================================
def _adapt_linkedin(self, content: str, **_: Any) -> str:
"""Adapt content for LinkedIn (professional tone, up to 3 000 chars)."""
clean = self._strip_markdown(content)
# Add line breaks for readability
paragraphs = [p.strip() for p in clean.split("\n\n") if p.strip()]
formatted = "\n\n".join(paragraphs)
hashtags = self._extract_hashtags(content)
if hashtags:
formatted += "\n\n" + " ".join(hashtags)
return formatted[:3000]
# ==================================================================
# WeChat Official Account
# ==================================================================
def _adapt_wechat(self, content: str, **_: Any) -> dict:
"""Adapt content for WeChat Official Account.
Returns a dict with ``title``, ``content`` (HTML), ``digest``,
and ``author``.
"""
title = self._extract_title(content)
html_body = self._markdown_to_html(content)
digest = self._generate_digest(content, max_len=120)
return {
"title": title,
"content": html_body,
"digest": digest,
"author": "",
}
# ==================================================================
# Xiaohongshu (小红书)
# ==================================================================
def _adapt_xiaohongshu(self, content: str, **_: Any) -> dict:
"""Adapt content for Xiaohongshu (casual, emoji-rich, ≤1 000 chars)."""
clean = self._strip_markdown(content)
title = self._extract_title(content)[:20]
# Add emoji flair
desc = self._add_xhs_emoji(clean)
# Add topic tags
tags = self._extract_hashtags(content)
if tags:
tag_line = " ".join(f"#{t.lstrip('#')}" for t in tags)
desc = f"{desc}\n\n{tag_line}"
return {
"title": title,
"desc": desc[:1000],
}
# ==================================================================
# Shared helpers
# ==================================================================
@staticmethod
def _strip_markdown(text: str) -> str:
"""Remove common Markdown syntax, keeping plain text."""
text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE) # headings
text = re.sub(r"\*\*(.+?)\*\*", r"\1", text) # bold
text = re.sub(r"\*(.+?)\*", r"\1", text) # italic
text = re.sub(r"`(.+?)`", r"\1", text) # inline code
text = re.sub(r"!\[.*?\]\(.*?\)", "", text) # images
text = re.sub(r"\[(.+?)\]\(.*?\)", r"\1", text) # links → text
text = re.sub(r"^[-*+]\s+", "• ", text, flags=re.MULTILINE) # lists
text = re.sub(r"^>\s+", "", text, flags=re.MULTILINE) # blockquotes
text = re.sub(r"---+", "", text) # horizontal rules
return text.strip()
@staticmethod
def _extract_title(content: str) -> str:
"""Extract the first heading or first line as a title."""
match = re.search(r"^#{1,6}\s+(.+)$", content, re.MULTILINE)
if match:
return match.group(1).strip()
first_line = content.strip().split("\n")[0]
return first_line[:60]
@staticmethod
def _extract_hashtags(content: str) -> list[str]:
"""Extract hashtags from content (``#tag`` patterns)."""
tags = re.findall(r"(?:^|\s)(#\w+)", content)
return list(dict.fromkeys(tags))[:5] # deduplicate, max 5
@staticmethod
def _generate_digest(content: str, max_len: int = 120) -> str:
"""Generate a short digest / summary from the first paragraph."""
clean = re.sub(r"^#{1,6}\s+.+$", "", content, flags=re.MULTILINE).strip()
first_para = clean.split("\n\n")[0] if "\n\n" in clean else clean
first_para = re.sub(r"\s+", " ", first_para).strip()
if len(first_para) > max_len:
return first_para[: max_len - 3] + "..."
return first_para
@staticmethod
def _markdown_to_html(content: str) -> str:
"""Minimal Markdown → HTML conversion for WeChat articles.
For production use, consider a full Markdown parser such as
``markdown`` or ``mistune``.
"""
html = content
# Headings
for level in range(6, 0, -1):
pattern = re.compile(rf"^{'#' * level}\s+(.+)$", re.MULTILINE)
html = pattern.sub(rf"<h{level}>\1</h{level}>", html)
# Bold / italic
html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
# Inline code
html = re.sub(r"`(.+?)`", r"<code>\1</code>", html)
# Links
html = re.sub(r"\[(.+?)\]\((.+?)\)", r'<a href="\2">\1</a>', html)
# Images
html = re.sub(r"!\[(.+?)\]\((.+?)\)", r'<img src="\2" alt="\1" />', html)
# Paragraphs (double newline)
paragraphs = html.split("\n\n")
wrapped = []
for p in paragraphs:
p = p.strip()
if p and not p.startswith("<h") and not p.startswith("<img"):
p = f"<p>{p}</p>"
wrapped.append(p)
html = "\n".join(wrapped)
# Line breaks
html = html.replace("\n", "<br/>\n")
# Wrap in a styled section for WeChat
return (
'<section style="font-size:16px;line-height:1.8;color:#333;">'
f"{html}"
"</section>"
)
@staticmethod
def _add_xhs_emoji(text: str) -> str:
"""Sprinkle emoji into text for Xiaohongshu's casual style."""
emoji_map = {
"推荐": "💯 推荐",
"分享": "📢 分享",
"总结": "📝 总结",
"注意": "⚠️ 注意",
"技巧": "💡 技巧",
"经验": "✨ 经验",
"recommend": "💯 recommend",
"share": "📢 share",
"summary": "📝 summary",
"tips": "💡 tips",
}
for word, replacement in emoji_map.items():
text = text.replace(word, replacement)
return text
FILE:utils/image_handler.py
"""
Image Handler
=============
Validates, resizes, and converts images before uploading to social
media platforms. Each platform has its own size / format constraints;
this module normalises images to a common baseline.
"""
from __future__ import annotations
import os
import shutil
import tempfile
from pathlib import Path
from typing import Any
# Pillow is an optional dependency – degrade gracefully.
try:
from PIL import Image
HAS_PIL = True
except ImportError:
HAS_PIL = False
class ImageHandler:
"""Pre-process images for multi-platform publishing."""
# Sensible defaults that satisfy most platforms.
MAX_WIDTH = 4096
MAX_HEIGHT = 4096
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5 MB
ALLOWED_FORMATS = {"JPEG", "PNG", "GIF", "WEBP"}
def __init__(self, temp_dir: str | None = None):
self.temp_dir = temp_dir or tempfile.mkdtemp(prefix="mpp_images_")
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def process_images(self, paths: list[str]) -> list[str]:
"""Validate and process a list of image paths.
Returns a list of paths to processed images (may be the originals
if no transformation was needed).
"""
processed: list[str] = []
for p in paths:
result = self.process_image(p)
if result:
processed.append(result)
return processed
def process_image(self, image_path: str) -> str | None:
"""Process a single image. Returns the (possibly new) path or
``None`` if the file is invalid."""
path = Path(image_path)
if not path.exists():
return None
if not HAS_PIL:
# Without Pillow we can only do basic checks.
if path.stat().st_size > self.MAX_FILE_SIZE:
return None
return str(path)
try:
img = Image.open(path)
except Exception:
return None
if img.format and img.format.upper() not in self.ALLOWED_FORMATS:
img = img.convert("RGB")
# Resize if necessary
if img.width > self.MAX_WIDTH or img.height > self.MAX_HEIGHT:
img.thumbnail((self.MAX_WIDTH, self.MAX_HEIGHT), Image.LANCZOS)
# Check file size – re-encode with quality reduction if needed
out_path = Path(self.temp_dir) / f"{path.stem}_processed.jpg"
quality = 95
while quality >= 30:
img.save(out_path, format="JPEG", quality=quality, optimize=True)
if out_path.stat().st_size <= self.MAX_FILE_SIZE:
return str(out_path)
quality -= 10
return None
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@staticmethod
def get_image_info(image_path: str) -> dict[str, Any] | None:
"""Return basic metadata about an image."""
if not HAS_PIL:
return {"path": image_path, "size": os.path.getsize(image_path)}
try:
img = Image.open(image_path)
return {
"path": image_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size": os.path.getsize(image_path),
}
except Exception:
return None
def cleanup(self) -> None:
"""Remove the temporary directory and all processed images."""
shutil.rmtree(self.temp_dir, ignore_errors=True)
FILE:utils/logger.py
"""
Logger
======
Lightweight logger for multi-platform-publisher. Writes to stderr so
that JSON output on stdout remains clean.
"""
from __future__ import annotations
import sys
from datetime import datetime, timezone
class Logger:
"""Simple levelled logger that writes to stderr."""
LEVELS = {"DEBUG": 0, "INFO": 1, "WARNING": 2, "ERROR": 3}
def __init__(self, name: str, level: str = "INFO"):
self.name = name
self.level = self.LEVELS.get(level.upper(), 1)
def _log(self, level: str, message: str) -> None:
if self.LEVELS.get(level, 0) >= self.level:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
print(f"[{ts}] [{level}] {self.name}: {message}", file=sys.stderr)
def debug(self, msg: str) -> None:
self._log("DEBUG", msg)
def info(self, msg: str) -> None:
self._log("INFO", msg)
def warning(self, msg: str) -> None:
self._log("WARNING", msg)
def error(self, msg: str) -> None:
self._log("ERROR", msg)
SellerSprite Product Research Skill — Input a keyword or category, automatically fetch market data via SellerSprite API and run AI analysis, output a structu...
---
name: sellersprite-product-research
description: "SellerSprite Product Research Skill — Input a keyword or category, automatically fetch market data via SellerSprite API and run AI analysis, output a structured product research report: Blue Ocean Index, competition landscape, entry opportunities, and risk signals. Triggers: product research, sellersprite, amazon product research, blue ocean, keyword research, competitor analysis, market analysis, asin research, amazon fba"
allowed-tools: Bash
metadata:
openclaw:
homepage: https://github.com/mguozhen/sellersprite-product-research
---
# SellerSprite Product Research
> Input a keyword or ASIN, AI automatically calls the SellerSprite API to fetch market data and outputs a structured product research report.
## Prerequisites
Set your SellerSprite API Key (get one at [open.sellersprite.com](https://open.sellersprite.com)):
```bash
export SELLERSPRITE_SECRET_KEY="your-secret-key"
```
Ensure `openclaw` CLI is installed (used for AI analysis):
```bash
which openclaw
```
## Quick Start
```bash
# Basic product research (keyword)
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "wireless earbuds"
# Specify marketplace (default: US)
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "yoga mat" --marketplace UK
# Use a specific AI model
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "phone case" --model claude
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "phone case" --model gemini
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "phone case" --model gpt-5
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "phone case" --model grok
# Use full model ID
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "yoga mat" --model anthropic/claude-opus-4-6
# Analyze a competitor by ASIN
bash ~/.claude/skills/sellersprite-product-research/selection.sh --asin B08N5WRWNW --marketplace US
# Save report to file
bash ~/.claude/skills/sellersprite-product-research/selection.sh --keyword "LED strip" --output report.md
```
## Supported Models
Uses `openclaw` for AI inference. Pass `--model` with a shorthand alias or full model ID:
| Alias | Model |
|---|---|
| `claude` | anthropic/claude-sonnet-4-6 |
| `claude-opus` | anthropic/claude-opus-4-6 |
| `claude-haiku` | anthropic/claude-haiku-4-5 |
| `gemini` | google/gemini-2.5-pro |
| `gemini-flash` | google/gemini-2.5-flash |
| `gpt-4o` | openai/gpt-4o |
| `gpt-5` | openai/gpt-5 |
| `grok` | xai/grok-4 |
| `deepseek` | groq/deepseek-r1-distill-llama-70b |
| `qwen` | groq/qwen/qwen3-32b |
| `mistral` | mistral/mistral-large-latest |
Any full model ID from `openclaw models list --all` is also accepted.
## Sample Output
```
╔══════════════════════════════════════════════════════════════╗
║ SellerSprite Product Research Report ║
║ Keyword: wireless earbuds | Market: US | 2026-03-09 ║
╚══════════════════════════════════════════════════════════════╝
📊 Market Overview
──────────────────────────────────────
Products 1,284
Avg Monthly Units 456/mo
Avg Price $28.50
Avg Ratings 3,210
Blue Ocean Index ████░░░░░░░░░ 3.2 / 10
🔴 Risk Signals
══════════════════════════════════════════════
1. High Competition — TOP10 monthly sales concentration 78%
2. Brand Barrier — Anker/JBL hold 35% market share
...
🟢 Opportunity Windows
══════════════════════════════════════════════
1. [Price Gap] $15-$20 range has few competitors but strong search volume
2. [New Product Bonus] New listings grew 42% in last 90 days
...
🎯 Recommended Entry Strategy
══════════════════════════════════════════════
1. Target $15-18 price range, differentiate on sport/waterproof features...
📌 Top Reference Products
──────────────────────────────────────────────
B0XXXXXXXX 2,340/mo $19.9 4.3★ FBA ⭐ Best Seller
...
```
## How It Works
```
① Input keyword / ASIN / category
↓
② Call SellerSprite API (product research + keyword research)
↓
③ Parse market data (sales, price, competition, trends)
↓
④ Claude AI deep analysis (Blue Ocean scoring + strategy)
↓
⑤ Output structured product research report
```
## Scripts
| File | Description |
|---|---|
| `selection.sh` | Main entry point |
| `fetch.sh` | SellerSprite API data fetching |
| `analyze.sh` | AI analysis and report rendering |
## Marketplace Codes
| Code | Market |
|---|---|
| US | United States |
| UK | United Kingdom |
| DE | Germany |
| JP | Japan |
| CA | Canada |
| FR | France |
| IT | Italy |
| ES | Spain |
| MX | Mexico |
| AU | Australia |
## Notes
- API rate limit: 40 requests/min, max 100 items per request
- Max 2,000 products per query
- Each analysis uses approx 3,000–8,000 tokens ($0.02–$0.05)
- Recommend querying data from the last 1–3 months for freshness
VOC AI — Amazon Review Intelligence. Input an ASIN, fetch real Amazon reviews via Shulex VOC API and run AI analysis. Outputs a structured bilingual report:...
---
name: voc-amazon-reviews
description: "VOC AI — Amazon Review Intelligence. Input an ASIN, fetch real Amazon reviews via Shulex VOC API and run AI analysis. Outputs a structured bilingual report: sentiment breakdown, top pain points, key selling points, and Listing optimization suggestions. Triggers: voc, amazon review analysis, asin analysis, voice of customer, listing optimization, pain points, selling points, review insights, amazon fba, product research"
allowed-tools: Bash
metadata:
openclaw:
homepage: https://github.com/mguozhen/voc-amazon-reviews
---
# VOC AI — Amazon Review Intelligence
> Input an ASIN, fetch real Amazon reviews via Shulex VOC API, and get a structured bilingual insight report powered by AI.
## Quick Setup (30 seconds)
1. **Get your free API key** at [apps.voc.ai/openapi](https://apps.voc.ai/openapi?utm_source=github&utm_medium=readme&utm_campaign=launch_apr)
2. **Create a key** at [API Keys page](https://apps.voc.ai/openapi/api/keys?utm_source=github&utm_medium=readme&utm_campaign=launch_apr)
3. **Set it**:
```bash
export VOC_API_KEY="your-api-key"
```
New accounts include starter credits — enough for multiple analyses.
## Usage
```bash
# Quick analysis — 8 reviews (5 credits)
bash ~/.agents/skills/voc-amazon-reviews/voc.sh B08N5WRWNW
# Deep analysis — 100 reviews (50 credits)
bash ~/.agents/skills/voc-amazon-reviews/voc.sh B08N5WRWNW --limit 100
# Japan marketplace
bash ~/.agents/skills/voc-amazon-reviews/voc.sh B08N5WRWNW --market JP
# Save report to file
bash ~/.agents/skills/voc-amazon-reviews/voc.sh B08N5WRWNW --limit 100 --output report.md
```
### Supported Marketplaces
| Code | Marketplace |
|------|-------------|
| US | amazon.com |
| CA | amazon.ca |
| MX | amazon.com.mx |
| GB | amazon.co.uk |
| DE | amazon.de |
| FR | amazon.fr |
| IT | amazon.it |
| ES | amazon.es |
| JP | amazon.co.jp |
| AU | amazon.com.au |
## Sample Output
```
╔══════════════════════════════════════════════════════════╗
║ VOC AI Analysis Report ║
║ ASIN: B08N5WRWNW | Reviews Analyzed: 100 ║
║ Market: US | Generated: 2026-04-18 ║
╚══════════════════════════════════════════════════════════╝
📊 Sentiment Distribution
Positive ████████████████░░░░ 74%
Neutral ███░░░░░░░░░░░░░░░░░ 16%
Negative ██░░░░░░░░░░░░░░░░░░ 10%
🔴 Top 5 Pain Points
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1. Short battery life (28 mentions)
"Battery drained in 2 days, very disappointed"
🟢 Top 5 Selling Points
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1. Excellent sound quality (52 mentions)
"Amazing bass and crystal clear highs for the price"
💡 Listing Optimization Suggestions
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1. Add battery capacity and playtime hours to title
```
## How It Works
```
① Input ASIN
↓
② Shulex VOC API fetches real-time Amazon reviews
↓
③ Structured review data (rating, body, date, verified, variant)
↓
④ AI deep semantic analysis (sentiment, pain points, selling points)
↓
⑤ Bilingual report (Chinese + English)
```
## Cost Guide
| Reviews | API Credits | Use Case |
|---------|------------|----------|
| 8 (default) | 5 credits | Quick competitor check |
| 50 | 25 credits | Product validation |
| 100 | 50 credits | Deep analysis |
| 200 | 100 credits | Comprehensive audit |
## Scripts
| File | Description |
|---|---|
| `voc.sh` | Main entry point — orchestrates fetch + analyze |
| `fetch.sh` | Shulex VOC API client (submit task → poll → get reviews) |
| `analyze.sh` | AI analysis engine (sentiment, pain points, selling points) |
| `scraper.sh` | Legacy browser-based scraper (deprecated) |
## Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `VOC_API_KEY` | Yes | Shulex VOC API key ([get one free](https://apps.voc.ai/openapi/api/keys?utm_source=github&utm_medium=readme&utm_campaign=launch_apr)) |
## Resources
- [Shulex VOC API Docs](https://apps.voc.ai/openapi?utm_source=github&utm_medium=readme&utm_campaign=launch_apr)
- [API Keys](https://apps.voc.ai/openapi/api/keys?utm_source=github&utm_medium=readme&utm_campaign=launch_apr)
- [Buy Credits](https://apps.voc.ai/openapi/api/billing?utm_source=github&utm_medium=readme&utm_campaign=launch_apr)
FILE:README.md
<p align="center">
<img src="https://cdn-icons-png.flaticon.com/512/1041/1041916.png" width="80" alt="VOC AI">
</p>
<h1 align="center">VOC Amazon Reviews</h1>
<p align="center">
<strong>Analyze any Amazon product's reviews in 5 seconds — real API data, AI-powered insights, 10 marketplaces.</strong>
</p>
<p align="center">
<a href="#quick-start"><img src="https://img.shields.io/badge/setup-30sec-brightgreen?style=flat-square" alt="30s Setup"></a>
<a href="https://openclaw.ai"><img src="https://img.shields.io/badge/OpenClaw-compatible-blue?style=flat-square" alt="OpenClaw"></a>
<a href="https://claude.ai/code"><img src="https://img.shields.io/badge/Claude%20Code-skill-8A2BE2?style=flat-square" alt="Claude Code"></a>
<a href="LICENSE"><img src="https://img.shields.io/badge/license-MIT-green?style=flat-square" alt="MIT License"></a>
<img src="https://img.shields.io/badge/markets-10%20Amazon%20regions-FF9900?style=flat-square&logo=amazon&logoColor=white" alt="10 Marketplaces">
</p>
<p align="center">
<a href="#quick-start">Quick Start</a> •
<a href="#demo">Demo</a> •
<a href="#usage">Usage</a> •
<a href="#how-it-works">How It Works</a> •
<a href="docs/ROADMAP.md">Roadmap</a>
</p>
---
## Demo
<p align="center">
<img src="demo/voc-demo.gif" width="700" alt="VOC AI Demo — ASIN to report in 5 seconds">
</p>
> **Input an ASIN. Get deep bilingual insights in 5 seconds.** Fetches real Amazon reviews via Shulex VOC API, then runs AI semantic analysis — sentiment, pain points, selling points, and listing optimization tips. Not keyword counting. Actual language understanding.
## Features
| Feature | Description |
|---------|-------------|
| **Sentiment Analysis** | Positive / neutral / negative breakdown with percentages |
| **Pain Points** | Top 5 customer complaints with real quotes and mention counts |
| **Selling Points** | Top 5 things buyers love with real quotes and mention counts |
| **Listing Optimization** | Actionable copy suggestions backed by review data |
| **Bilingual Output** | Every insight in both English and Chinese |
| **10 Marketplaces** | US, CA, MX, GB, DE, FR, IT, ES, JP, AU |
| **Zero Dependencies** | Only needs `curl` + `python3` (no browser, no npm) |
| **Free to Start** | 8 reviews = 5 credits. New accounts include starter credits |
## Quick Start
**Step 1** — Get your free API key (30 seconds):
👉 [**apps.voc.ai/openapi**](https://apps.voc.ai/openapi?utm_source=github&utm_medium=readme&utm_campaign=launch_apr)
**Step 2** — Clone and run:
```bash
git clone https://github.com/mguozhen/voc-amazon-reviews.git
cd voc-amazon-reviews
export VOC_API_KEY="your-key"
bash voc.sh B08N5WRWNW
```
That's it. No Docker. No npm install. No config files.
## Usage
```bash
# Quick analysis — 8 reviews (5 credits)
bash voc.sh B08N5WRWNW
# Deep analysis — 100 reviews (50 credits)
bash voc.sh B08N5WRWNW --limit 100
# Japan marketplace
bash voc.sh B08N5WRWNW --market JP
# Save report to file
bash voc.sh B08N5WRWNW --limit 100 --output report.md
```
### Options
| Flag | Default | Description |
|------|---------|-------------|
| `--limit N` | 8 | Number of reviews to fetch |
| `--market CODE` | US | Amazon marketplace (US CA MX GB DE FR IT ES JP AU) |
| `--output FILE` | stdout | Save report to markdown file |
| `--help` | — | Show help |
### Cost Guide
| Reviews | Credits | Use Case |
|---------|---------|----------|
| 8 (default) | 5 | Quick competitor check |
| 50 | 25 | Product validation |
| 100 | 50 | Deep analysis |
| 200 | 100 | Comprehensive audit |
## Sample Output
```
╔══════════════════════════════════════════════════════════════╗
║ VOC AI 分析报告 / VOC AI Analysis Report ║
║ ASIN: B099Z93WD9 | analyzed: 8 reviews ║
║ Market: US | Generated: 2026-04-19 ║
╚══════════════════════════════════════════════════════════════╝
📊 情感分布 / Sentiment Distribution
─────────────────────────────────────────
正面 Positive ████████░░░░░░░░░░░░ 37%
中性 Neutral ██░░░░░░░░░░░░░░░░░░ 13%
负面 Negative ██████████░░░░░░░░░░ 50%
🔴 Top 5 痛点 / Pain Points
═══════════════════════════════════════════════════════════════
1. 充电口故障 / Charging port moisture glitch(2 条提及)
「充电口提示有水分,已知bug,一周都没恢复」
"Moisture in charging port — known glitch, can't charge"
2. 视频卡顿 / Video stalling and weak connection(2 条提及)
「看视频经常卡顿暂停,给小孩看的时候很烦」
"Stalls out, pausing videos, really annoying"
🟢 Top 5 卖点 / Selling Points
═══════════════════════════════════════════════════════════════
1. 性价比高 / Great value for money(3 条提及)
「价格实惠,功能齐全,看电影看书玩游戏都行」
"Budget friendly, entertainment on the go"
2. 便携尺寸 / Perfect portable size(2 条提及)
「尺寸刚好,放包里轻松带上飞机看电影」
"Perfect size, light and easy to fit in my purse"
💡 Listing 优化建议 / Optimization Suggestions
═══════════════════════════════════════════════════════════════
1. Highlight budget-friendly and portability in title
2. Add charging port care instructions in A+ Content
3. Guide users to sideload popular apps
```
## How It Works
```
┌─────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Input ASIN │────▶│ Shulex VOC API │────▶│ AI Analysis │
│ │ │ │ │ │
│ B099Z93WD9 │ │ • Real reviews │ │ • Sentiment │
│ --market US│ │ • 10 markets │ │ • Pain points │
│ --limit 8 │ │ • 5s response │ │ • Sell points │
│ │ │ • No scraping │ │ • Optimization │
└─────────────┘ └──────────────────┘ └────────┬────────┘
│
▼
┌─────────────────┐
│ Bilingual Report │
│ (EN + ZH) │
└─────────────────┘
```
## File Structure
```
voc-amazon-reviews/
├── SKILL.md # Skill definition (Claude/OpenClaw)
├── voc.sh # Main entry point
├── fetch.sh # Shulex VOC API client
├── analyze.sh # AI analysis + report renderer
├── scraper.sh # Legacy browser scraper (deprecated)
├── tests/
│ ├── test_unit.sh # 50 unit tests
│ └── test_regression.sh # 17 regression tests (live API)
├── demo/
│ ├── voc-demo.gif # Demo recording
│ └── demo.sh # Demo script
└── docs/
├── GTM.md # Go-to-market strategy
├── ROADMAP.md # Product roadmap
└── STORY.md # Project narrative
```
## FAQ
<details>
<summary><strong>Where does the review data come from?</strong></summary>
Reviews are fetched via the [Shulex VOC API](https://apps.voc.ai/openapi?utm_source=github&utm_medium=readme&utm_campaign=launch_apr) — a legitimate data provider with proper Amazon data licensing. No scraping. No browser automation. Just a clean API call.
</details>
<details>
<summary><strong>How much does it cost?</strong></summary>
| Component | Cost |
|-----------|------|
| VOC API (8 reviews) | 5 credits |
| VOC API (100 reviews) | 50 credits |
| AI analysis | Depends on your OpenClaw model |
New accounts include starter credits — enough for multiple analyses. [Get your free API key](https://apps.voc.ai/openapi?utm_source=github&utm_medium=readme&utm_campaign=launch_apr).
</details>
<details>
<summary><strong>Is this against Amazon's Terms of Service?</strong></summary>
No. This tool uses the Shulex VOC API, which is a licensed data provider. It does not scrape Amazon directly.
</details>
<details>
<summary><strong>What about API keys and security?</strong></summary>
`VOC_API_KEY` is read from environment variables — never written to disk or printed to stdout.
</details>
## Related
- [VOC AI](https://www.voc.ai) — Full-featured Amazon review analytics platform
- [VOC Open API](https://apps.voc.ai/openapi?utm_source=github&utm_medium=readme&utm_campaign=launch_apr) — Get your free API key
- [Social Reply Bot](https://github.com/mguozhen/social-bot) — AI-powered social media bot
- [Solvea](https://solvea.cx) — AI receptionist for small businesses
## License
MIT
FILE:analyze.sh
#!/usr/bin/env bash
# Amazon 评论分析脚本 - 使用 OpenClaw 默认模型
# Usage: analyze.sh <reviews_json_file> <ASIN> [--output file.md]
set -euo pipefail
REVIEWS_FILE="-"
ASIN="-unknown"
OUTPUT_FILE=""
shift 2 || true
while [[ $# -gt 0 ]]; do
case "$1" in
--output) OUTPUT_FILE="$2"; shift 2 ;;
*) shift ;;
esac
done
if [[ -z "$REVIEWS_FILE" || ! -f "$REVIEWS_FILE" ]]; then
echo "❌ 需要提供评论数据文件" >&2
echo "Usage: analyze.sh <reviews_json_file> <ASIN> [--output file.md]" >&2
exit 1
fi
if ! command -v openclaw &>/dev/null; then
echo "openclaw not found. Please install OpenClaw first." >&2
exit 1
fi
VOC_MODEL=$(openclaw models status --plain 2>/dev/null || echo "unknown")
echo "Analyzing reviews with model: $VOC_MODEL ..." >&2
# 读取评论数据
REVIEWS_JSON=$(cat "$REVIEWS_FILE")
TOTAL=$(echo "$REVIEWS_JSON" | python3 -c "import sys,json; print(len(json.load(sys.stdin)))" 2>/dev/null || echo "0")
TODAY=$(date +%Y-%m-%d)
# 构建分析 Prompt
PROMPT=$(cat <<PROMPT
你是一位专业的亚马逊电商分析师,请对以下评论数据进行深度 VOC(Voice of Customer)分析。
## 分析任务
评论数量:TOTAL 条
ASIN:ASIN
## 评论数据
\`\`\`json
$(echo "$REVIEWS_JSON" | python3 -c "
import sys, json
reviews = json.load(sys.stdin)
# 截取最多150条,避免超出token限制
sample = reviews[:150]
# 精简字段 (compatible with both scraper and API formats)
simplified = [{
'rating': r.get('rating'),
'title': r.get('title',''),
'body': str(r.get('body','') or r.get('content',''))[:500],
'date': r.get('date','') or r.get('reviewDate',''),
'verified': bool(r.get('verified') or r.get('verifiedPurchase', False)),
'variant': r.get('variant',''),
'helpful': r.get('helpful', r.get('helpfulVotes', 0)),
} for r in sample]
print(json.dumps(simplified, ensure_ascii=False))
" 2>/dev/null || echo "$REVIEWS_JSON" | head -c 15000)
\`\`\`
## 输出格式要求
请严格按以下格式输出,中英文双语,不要添加额外说明:
---
SENTIMENT_POSITIVE: [正面评论数量占比,如 74]
SENTIMENT_NEUTRAL: [中性评论数量占比,如 16]
SENTIMENT_NEGATIVE: [负面评论数量占比,如 10]
---
PAIN_POINT_1_ZH: [痛点1中文描述,15字以内]
PAIN_POINT_1_EN: [Pain point 1 in English, under 15 words]
PAIN_POINT_1_COUNT: [提及次数]
PAIN_POINT_1_QUOTE_ZH: [最典型的中文用户原话或翻译,30字以内]
PAIN_POINT_1_QUOTE_EN: [Most representative English user quote, under 30 words]
PAIN_POINT_2_ZH: ...
PAIN_POINT_2_EN: ...
PAIN_POINT_2_COUNT: ...
PAIN_POINT_2_QUOTE_ZH: ...
PAIN_POINT_2_QUOTE_EN: ...
PAIN_POINT_3_ZH: ...
PAIN_POINT_3_EN: ...
PAIN_POINT_3_COUNT: ...
PAIN_POINT_3_QUOTE_ZH: ...
PAIN_POINT_3_QUOTE_EN: ...
PAIN_POINT_4_ZH: ...
PAIN_POINT_4_EN: ...
PAIN_POINT_4_COUNT: ...
PAIN_POINT_4_QUOTE_ZH: ...
PAIN_POINT_4_QUOTE_EN: ...
PAIN_POINT_5_ZH: ...
PAIN_POINT_5_EN: ...
PAIN_POINT_5_COUNT: ...
PAIN_POINT_5_QUOTE_ZH: ...
PAIN_POINT_5_QUOTE_EN: ...
---
SELLING_POINT_1_ZH: [卖点1中文描述,15字以内]
SELLING_POINT_1_EN: [Selling point 1 in English, under 15 words]
SELLING_POINT_1_COUNT: [提及次数]
SELLING_POINT_1_QUOTE_ZH: [最典型的中文用户原话或翻译,30字以内]
SELLING_POINT_1_QUOTE_EN: [Most representative English user quote, under 30 words]
SELLING_POINT_2_ZH: ...
SELLING_POINT_2_EN: ...
SELLING_POINT_2_COUNT: ...
SELLING_POINT_2_QUOTE_ZH: ...
SELLING_POINT_2_QUOTE_EN: ...
SELLING_POINT_3_ZH: ...
SELLING_POINT_3_EN: ...
SELLING_POINT_3_COUNT: ...
SELLING_POINT_3_QUOTE_ZH: ...
SELLING_POINT_3_QUOTE_EN: ...
SELLING_POINT_4_ZH: ...
SELLING_POINT_4_EN: ...
SELLING_POINT_4_COUNT: ...
SELLING_POINT_4_QUOTE_ZH: ...
SELLING_POINT_4_QUOTE_EN: ...
SELLING_POINT_5_ZH: ...
SELLING_POINT_5_EN: ...
SELLING_POINT_5_COUNT: ...
SELLING_POINT_5_QUOTE_ZH: ...
SELLING_POINT_5_QUOTE_EN: ...
---
TIP_1_ZH: [Listing 优化建议1,中文,50字以内]
TIP_1_EN: [Listing optimization tip 1, English, under 50 words]
TIP_2_ZH: ...
TIP_2_EN: ...
TIP_3_ZH: ...
TIP_3_EN: ...
---
SUMMARY_ZH: [整体一句话总结,30字以内]
SUMMARY_EN: [One-sentence overall summary in English, under 30 words]
PROMPT
)
# 调用 OpenClaw 默认模型
SESSION_ID="voc-$(date +%s)"
RESPONSE=$(openclaw agent --local --session-id "$SESSION_ID" -m "$PROMPT" --json 2>/dev/null)
ANALYSIS=$(echo "$RESPONSE" | python3 -c "
import sys, json
r = json.load(sys.stdin)
payloads = r.get('payloads', [])
if payloads:
print(payloads[0].get('text', ''))
else:
print('ERROR: empty response')
" 2>/dev/null)
if [[ -z "$ANALYSIS" ]] || echo "$ANALYSIS" | grep -q "^ERROR:"; then
echo "❌ OpenClaw 调用失败: $ANALYSIS" >&2
exit 1
fi
# 解析结构化输出,渲染为漂亮报告
REPORT=$(python3 - <<PYEOF
import re, sys
raw = """$ANALYSIS"""
def get(key):
m = re.search(rf'^{key}:\s*(.+)$', raw, re.MULTILINE)
return m.group(1).strip() if m else '—'
def bar(pct):
try:
n = int(pct)
filled = round(n / 5)
return '█' * filled + '░' * (20 - filled)
except:
return '░' * 20
pos = get('SENTIMENT_POSITIVE')
neu = get('SENTIMENT_NEUTRAL')
neg = get('SENTIMENT_NEGATIVE')
report = f"""
╔══════════════════════════════════════════════════════════════╗
║ VOC AI 分析报告 / VOC AI Analysis Report ║
║ ASIN: $ASIN | analyzed: $TOTAL reviews ║
║ Market: amazon.com | Generated: $TODAY ║
╚══════════════════════════════════════════════════════════════╝
📊 情感分布 / Sentiment Distribution
─────────────────────────────────────────
正面 Positive {bar(pos)} {pos}%
中性 Neutral {bar(neu)} {neu}%
负面 Negative {bar(neg)} {neg}%
🔴 Top 5 痛点 / Pain Points
═══════════════════════════════════════════════════════════════
"""
for i in range(1, 6):
zh = get(f'PAIN_POINT_{i}_ZH')
en = get(f'PAIN_POINT_{i}_EN')
cnt = get(f'PAIN_POINT_{i}_COUNT')
qzh = get(f'PAIN_POINT_{i}_QUOTE_ZH')
qen = get(f'PAIN_POINT_{i}_QUOTE_EN')
if zh == '—':
break
report += f"""{i}. {zh} / {en}({cnt} 条提及)
「{qzh}」
"{qen}"
"""
report += """🟢 Top 5 卖点 / Selling Points
═══════════════════════════════════════════════════════════════
"""
for i in range(1, 6):
zh = get(f'SELLING_POINT_{i}_ZH')
en = get(f'SELLING_POINT_{i}_EN')
cnt = get(f'SELLING_POINT_{i}_COUNT')
qzh = get(f'SELLING_POINT_{i}_QUOTE_ZH')
qen = get(f'SELLING_POINT_{i}_QUOTE_EN')
if zh == '—':
break
report += f"""{i}. {zh} / {en}({cnt} 条提及)
「{qzh}」
"{qen}"
"""
report += """💡 Listing 优化建议 / Optimization Suggestions
═══════════════════════════════════════════════════════════════
"""
for i in range(1, 4):
zh = get(f'TIP_{i}_ZH')
en = get(f'TIP_{i}_EN')
if zh == '—':
break
report += f"""{i}. {zh}
{en}
"""
summary_zh = get('SUMMARY_ZH')
summary_en = get('SUMMARY_EN')
report += f"""📌 总结 / Summary
─────────────────────────────────────────
{summary_zh}
{summary_en}
══════════════════════════════════════════════════════════════
由 VOC AI Skill 生成 | Generated by VOC AI Skill
https://github.com/mguozhen/voc-amazon-reviews
══════════════════════════════════════════════════════════════
"""
print(report)
PYEOF
)
echo "$REPORT"
if [[ -n "$OUTPUT_FILE" ]]; then
echo "$REPORT" > "$OUTPUT_FILE"
echo "" >&2
echo "💾 报告已保存到: $OUTPUT_FILE" >&2
fi
FILE:fetch.sh
#!/usr/bin/env bash
# Fetch Amazon reviews via Shulex VOC OpenAPI (Realtime Task)
# Usage: fetch.sh <ASIN> [--limit N] [--market US] [--output file.json]
set -euo pipefail
ASIN="-"
LIMIT=8
MARKET="US"
OUTPUT_FILE=""
shift || true
while [[ $# -gt 0 ]]; do
case "$1" in
--limit) LIMIT="$2"; shift 2 ;;
--market) MARKET="$2"; shift 2 ;;
--output) OUTPUT_FILE="$2"; shift 2 ;;
*) shift ;;
esac
done
if [[ -z "$ASIN" ]]; then
echo "Usage: fetch.sh <ASIN> [--limit N] [--market US]" >&2
exit 1
fi
# ── Dependencies ──────────────────────────────────────────────
if ! command -v curl &>/dev/null; then
echo "curl is required" >&2; exit 1
fi
if ! command -v python3 &>/dev/null; then
echo "python3 is required" >&2; exit 1
fi
# ── API Key ───────────────────────────────────────────────────
API_BASE="https://openapi.shulex.com"
API_KEY="-"
if [[ -z "$API_KEY" ]]; then
cat >&2 <<'MSG'
╔══════════════════════════════════════════════════════════╗
║ VOC_API_KEY not set — free registration to get started ║
╚══════════════════════════════════════════════════════════╝
1. Register (free): https://apps.voc.ai/openapi?utm_source=skill&utm_medium=onboarding&utm_campaign=launch_apr
2. Create API key: https://apps.voc.ai/openapi/keys?utm_source=skill&utm_medium=onboarding&utm_campaign=launch_apr
3. Buy credits: https://apps.voc.ai/openapi/billing?utm_source=skill&utm_medium=onboarding&utm_campaign=launch_apr
4. Set your key: export VOC_API_KEY=your-key
New accounts include starter credits — enough for 8+ reviews.
MSG
exit 1
fi
# ── Market code mapping (domain → code) ──────────────────────
case "$MARKET" in
amazon.com|us|Us) MARKET="US" ;;
amazon.ca|ca|Ca) MARKET="CA" ;;
amazon.com.mx|mx|Mx) MARKET="MX" ;;
amazon.co.uk|gb|Gb|uk) MARKET="GB" ;;
amazon.de|de|De) MARKET="DE" ;;
amazon.fr|fr|Fr) MARKET="FR" ;;
amazon.it|it|It) MARKET="IT" ;;
amazon.es|es|Es) MARKET="ES" ;;
amazon.co.jp|jp|Jp) MARKET="JP" ;;
amazon.com.au|au|Au) MARKET="AU" ;;
US|CA|MX|GB|DE|FR|IT|ES|JP|AU) ;; # already valid
*)
echo "Unsupported market: $MARKET" >&2
echo "Supported: US CA MX GB DE FR IT ES JP AU" >&2
exit 1
;;
esac
# ── Calculate maxPage ─────────────────────────────────────────
# Amazon shows ~10 reviews per page; cost = 5 credits x maxPage
MAX_PAGE=$(( (LIMIT + 9) / 10 ))
if [[ $MAX_PAGE -lt 1 ]]; then MAX_PAGE=1; fi
if [[ $MAX_PAGE -gt 100 ]]; then MAX_PAGE=100; fi
echo "Fetching reviews for ASIN: $ASIN (market: $MARKET, limit: $LIMIT)" >&2
# ── Step 1: Submit realtime review task ───────────────────────
echo " Submitting review task..." >&2
SUBMIT_RESP=$(curl -s -X POST "$API_BASE/v1/api/RtTask01" \
-H "Content-Type: application/json" \
-H "X-API-Key: $API_KEY" \
-d "{\"asin\":\"$ASIN\",\"market\":\"$MARKET\",\"maxPage\":$MAX_PAGE,\"platform\":\"AMAZON\"}")
TASK_ID=$(echo "$SUBMIT_RESP" | python3 -c "
import sys, json
try:
d = json.load(sys.stdin)
print(d.get('data',{}).get('taskId',''))
except: print('')
" 2>/dev/null)
CODE=$(echo "$SUBMIT_RESP" | python3 -c "
import sys, json
try:
print(json.load(sys.stdin).get('code',''))
except: print('')
" 2>/dev/null)
if [[ -z "$TASK_ID" || "$CODE" != "0" ]]; then
echo "Failed to submit task:" >&2
echo " $SUBMIT_RESP" >&2
exit 1
fi
echo " Task submitted: $TASK_ID" >&2
# ── Step 2: Poll until SUCCESS or FAILED ──────────────────────
echo " Waiting for reviews..." >&2
MAX_WAIT=120
WAITED=0
STATUS="PENDING"
POLL_RESP=""
while [[ "$STATUS" != "SUCCESS" && "$STATUS" != "FAILED" && $WAITED -lt $MAX_WAIT ]]; do
sleep 5
WAITED=$((WAITED + 5))
POLL_RESP=$(curl -s "$API_BASE/v1/api/RtQry01?taskId=$TASK_ID&pageNo=1&pageSize=$LIMIT" \
-H "X-API-Key: $API_KEY")
STATUS=$(echo "$POLL_RESP" | python3 -c "
import sys, json
try:
print(json.load(sys.stdin).get('data',{}).get('status','UNKNOWN'))
except: print('UNKNOWN')
" 2>/dev/null)
echo " ... status: $STATUS (WAITEDs)" >&2
done
if [[ "$STATUS" != "SUCCESS" ]]; then
ERR_MSG=$(echo "$POLL_RESP" | python3 -c "
import sys, json
try:
d = json.load(sys.stdin).get('data',{})
print(d.get('errorMsg','') or d.get('message','unknown error'))
except: print('Task timed out')
" 2>/dev/null)
echo "Task failed: $ERR_MSG" >&2
exit 1
fi
# ── Step 3: Extract reviews ───────────────────────────────────
RESULT=$(echo "$POLL_RESP" | python3 -c "
import sys, json
resp = json.load(sys.stdin)
data = resp.get('data', {})
reviews = data.get('reviews', [])
total = data.get('total', 0)
limit = $LIMIT
# Normalize review format
normalized = []
for r in reviews[:limit]:
normalized.append({
'rating': r.get('rating'),
'title': r.get('title', ''),
'body': r.get('body', '') or r.get('content', ''),
'date': r.get('reviewDate', ''),
'verified': bool(r.get('verified') or r.get('verifiedPurchase')),
'variant': r.get('variant', ''),
'author': r.get('author', '') or r.get('reviewerName', ''),
'helpful': r.get('helpfulVotes', 0),
'reviewId': r.get('reviewId', ''),
'vineVoice': bool(r.get('isVineVoice')),
})
output = {
'reviews': normalized,
'meta': {
'asin': data.get('asin', ''),
'market': data.get('market', ''),
'total_available': total,
'fetched': len(normalized),
}
}
print(json.dumps(output, ensure_ascii=False, indent=2))
" 2>/dev/null)
REVIEW_COUNT=$(echo "$RESULT" | python3 -c "import sys,json; print(len(json.load(sys.stdin).get('reviews',[])))" 2>/dev/null || echo "0")
TOTAL_AVAIL=$(echo "$RESULT" | python3 -c "import sys,json; print(json.load(sys.stdin).get('meta',{}).get('total_available',0))" 2>/dev/null || echo "0")
echo " Retrieved $REVIEW_COUNT reviews (total available: $TOTAL_AVAIL)" >&2
if [[ -n "$OUTPUT_FILE" ]]; then
echo "$RESULT" > "$OUTPUT_FILE"
echo " Saved to: $OUTPUT_FILE" >&2
else
echo "$RESULT"
fi
FILE:voc.sh
#!/usr/bin/env bash
# VOC AI - Amazon Review Intelligence
# Usage: voc.sh <ASIN> [--limit N] [--market US] [--output file.md]
set -euo pipefail
SKILL_DIR="$(cd "$(dirname "BASH_SOURCE[0]")" && pwd)"
# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
CYAN='\033[0;36m'
NC='\033[0m'
print_banner() {
echo -e "BLUE"
echo " ██╗ ██╗ ██████╗ ██████╗ █████╗ ██╗"
echo " ██║ ██║██╔═══██╗██╔════╝ ██╔══██╗██║"
echo " ██║ ██║██║ ██║██║ ███████║██║"
echo " ╚██╗ ██╔╝██║ ██║██║ ██╔══██║██║"
echo " ╚████╔╝ ╚██████╔╝╚██████╗ ██║ ██║██║"
echo " ╚═══╝ ╚═════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝"
echo -e "NC"
echo " Amazon Review Intelligence | Powered by Shulex VOC"
echo " ─────────────────────────────────────────────────────"
echo ""
}
usage() {
echo "Usage: voc.sh <ASIN> [options]"
echo ""
echo "Options:"
echo " --limit N Number of reviews to fetch (default: 8, max with API key)"
echo " --market CODE Amazon marketplace: US CA MX GB DE FR IT ES JP AU (default: US)"
echo " --output FILE Save report to file"
echo " --help Show this help"
echo ""
echo "Examples:"
echo " voc.sh B08N5WRWNW # Quick analysis (8 reviews)"
echo " voc.sh B08N5WRWNW --limit 100 # Deep analysis (100 reviews)"
echo " voc.sh B08N5WRWNW --market JP # Japan marketplace"
echo " voc.sh B08N5WRWNW --limit 200 --output report.md"
echo ""
echo "Environment:"
echo " VOC_API_KEY Shulex VOC API key (required)"
echo " Get yours free: https://apps.voc.ai/openapi?utm_source=skill&utm_medium=onboarding&utm_campaign=launch_apr"
exit 0
}
# Parse args
ASIN=""
LIMIT=8
MARKET="US"
OUTPUT_FILE=""
while [[ $# -gt 0 ]]; do
case "$1" in
--help|-h) usage ;;
--limit) LIMIT="$2"; shift 2 ;;
--market) MARKET="$2"; shift 2 ;;
--output) OUTPUT_FILE="$2"; shift 2 ;;
-*) echo "Unknown option: $1" >&2; usage ;;
*) ASIN="$1"; shift ;;
esac
done
if [[ -z "$ASIN" ]]; then
echo -e "REDPlease provide an ASINNC" >&2
echo "Run 'voc.sh --help' for usage." >&2
exit 1
fi
# Validate ASIN format (10 alphanumeric chars)
if ! echo "$ASIN" | grep -qE '^[A-Z0-9]{10}$'; then
echo -e "YELLOWWarning: ASIN format may be incorrect (expected 10 alphanumeric chars, e.g. B08N5WRWNW)NC" >&2
fi
# Check dependencies
check_deps() {
local missing=0
if ! command -v curl &>/dev/null; then
echo " - curl" >&2
missing=1
fi
if ! command -v python3 &>/dev/null; then
echo " - python3" >&2
missing=1
fi
if [[ -z "-" ]]; then
echo "" >&2
echo -e "YELLOW VOC_API_KEY not set.NC" >&2
echo "" >&2
echo -e " Get your GREENfreeNC API key in 30 seconds:" >&2
echo -e " CYAN1.NC Register: https://apps.voc.ai/openapi?utm_source=skill&utm_medium=onboarding&utm_campaign=launch_apr" >&2
echo -e " CYAN2.NC Create key: https://apps.voc.ai/openapi/keys?utm_source=skill&utm_medium=onboarding&utm_campaign=launch_apr" >&2
echo -e " CYAN3.NC Set it: export VOC_API_KEY=your-key" >&2
echo "" >&2
echo " New accounts include starter credits (8 reviews = 5 credits)." >&2
echo "" >&2
missing=1
fi
if [[ $missing -eq 1 ]]; then
exit 1
fi
}
print_banner
check_deps
# Map market display name
MARKET_UPPER=$(echo "$MARKET" | tr '[:lower:]' '[:upper:]')
echo -e "GREENAnalyzing ASIN: YELLOW$ASINNC"
echo -e " Market: $MARKET_UPPER | Reviews: $LIMIT"
echo ""
# ── Step 1: Fetch reviews via Shulex API ─────────────────────
TEMP_DATA=$(mktemp /tmp/voc_data_XXXXXX.json)
TEMP_REVIEWS=$(mktemp /tmp/voc_reviews_XXXXXX.json)
trap "rm -f $TEMP_DATA $TEMP_REVIEWS" EXIT
echo -e "BLUE[1/2] Fetching review data via Shulex VOC API...NC"
bash "$SKILL_DIR/fetch.sh" "$ASIN" \
--limit "$LIMIT" \
--market "$MARKET" \
--output "$TEMP_DATA"
# Extract just the reviews array for analyze.sh
python3 -c "
import json, sys
with open('$TEMP_DATA') as f:
data = json.load(f)
reviews = data.get('reviews', data if isinstance(data, list) else [])
json.dump(reviews, sys.stdout, ensure_ascii=False, indent=2)
" > "$TEMP_REVIEWS" 2>/dev/null
REVIEW_COUNT=$(python3 -c "import json; print(len(json.load(open('$TEMP_REVIEWS'))))" 2>/dev/null || echo "0")
TOTAL_AVAIL=$(python3 -c "import json; print(json.load(open('$TEMP_DATA')).get('meta',{}).get('total_available',0))" 2>/dev/null || echo "?")
if [[ "$REVIEW_COUNT" -eq 0 ]]; then
echo -e "REDNo reviews retrieved. Check:NC" >&2
echo " - Is the ASIN correct?" >&2
echo " - Does this product have reviews?" >&2
echo " - Is your API key valid? Check: https://apps.voc.ai/openapi/keys?utm_source=skill&utm_medium=onboarding&utm_campaign=launch_apr" >&2
exit 1
fi
echo -e "GREENRetrieved $REVIEW_COUNT reviews (total available: $TOTAL_AVAIL)NC"
echo ""
# ── Step 2: AI analysis ──────────────────────────────────────
echo -e "BLUE[2/2] AI deep analysis...NC"
if [[ -n "$OUTPUT_FILE" ]]; then
bash "$SKILL_DIR/analyze.sh" "$TEMP_REVIEWS" "$ASIN" --output "$OUTPUT_FILE"
else
bash "$SKILL_DIR/analyze.sh" "$TEMP_REVIEWS" "$ASIN"
fi
echo ""
echo -e "GREENAnalysis complete!NC"
# ── Upgrade prompt (show when using default 8 reviews) ────────
if [[ "$LIMIT" -le 8 && "$TOTAL_AVAIL" != "?" ]]; then
TOTAL_NUM=$(echo "$TOTAL_AVAIL" | tr -d '[:space:]')
if [[ "$TOTAL_NUM" -gt 8 ]] 2>/dev/null; then
echo ""
echo -e "CYAN━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━NC"
echo -e " This report analyzed YELLOW$REVIEW_COUNTNC of YELLOW$TOTAL_AVAILNC available reviews."
echo -e " For deeper insights, increase your limit:"
echo ""
echo -e " GREENvoc.sh $ASIN --limit 100NC"
echo ""
echo -e " Need more credits? CYANhttps://apps.voc.ai/openapi/billing?utm_source=skill&utm_medium=report_cta&utm_campaign=launch_aprNC"
echo -e "CYAN━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━NC"
fi
fi