@clawhub-wzhaojin-fdc58a698f
自动完成微信公众号文章的选题调研、Markdown写作、智能配图、内容审核和发布全流程管理。
# wechat-mp-auto - 微信公众号自动化 Skill
**版本**: v0.1.2
**描述**: 微信公众号文章从选题到发布的全流程自动化
---
## 架构理念
本 Skill 由 **AI 模型作为编排者**,Python 代码提供原子化工具能力。AI 读取本 SKILL.md 后自行决定调用哪些工具、完成全部流程。
Python 代码不包含任何 AI 调用逻辑,所有生成、推理、判断均由 AI 模型完成。
---
## 工具清单
AI 可调用的所有工具如下,调用时请传入完整参数:
### 1. 调研工具
**`research_topic(topic: str, keywords: Optional[List[str]] = None) -> dict`**
- 输入:文章主题(字符串),可选的关键词列表(用于精细化搜索)
- 输出:`{"search_results": [...], "summary": "..."}`
- 作用:对给定主题进行网络调研,返回搜索结果摘要
- 内部级联:Tavily → DuckDuckGo → 百度,任一成功即返回
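The cascade described above (try each source in order, return on the first success) can be sketched as follows. This is a minimal illustration, not the Skill's actual implementation: `cascade_search`, the provider callables, and the returned `source` / `errors` keys are all hypothetical names introduced for the example.

```python
from typing import Callable, Dict, List, Tuple

SearchFn = Callable[[str], List[dict]]


def cascade_search(query: str, providers: List[Tuple[str, SearchFn]]) -> Dict:
    """Try each provider in order and return the first non-empty result."""
    errors: Dict[str, str] = {}
    for name, search in providers:
        try:
            results = search(query)
        except Exception as exc:
            # Record why this source failed, then fall through to the next one
            errors[name] = str(exc)
            continue
        if results:
            return {"source": name, "search_results": results, "errors": errors}
        errors[name] = "empty result"
    return {"source": None, "search_results": [], "errors": errors}


# Stand-in providers (hypothetical); real ones would call Tavily, DuckDuckGo and Baidu
def failing_tavily(query: str) -> List[dict]:
    raise RuntimeError("429 rate limited")


def fake_duckduckgo(query: str) -> List[dict]:
    return [{"title": f"result for {query}", "url": "https://example.com"}]


def fake_baidu(query: str) -> List[dict]:
    return [{"title": "never reached", "url": "https://example.org"}]


outcome = cascade_search(
    "AI",
    [("tavily", failing_tavily), ("duckduckgo", fake_duckduckgo), ("baidu", fake_baidu)],
)
```

Keeping the per-source failure reasons in the result makes it easier to see why a later source was used.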
---
**`generate_outline(topic: str, research: dict) -> dict`**
- 输入:主题字符串 + research_topic 的返回结果
- 输出:
```json
{
"title": "深度解析:XXX",
"sections": [
{"name": "引言", "description": "...", "key_points": ["要点1", "要点2"]},
{"name": "核心内容", "description": "...", "key_points": [...]},
...
]
}
```
- 作用:根据调研结果生成文章大纲,包含 4 个标准章节(引言/核心内容/实践方法/结论)
---
### 2. 写作工具
**`convert_to_html(markdown: str, theme: str) -> str`**
- 输入:Markdown 格式文章内容 + 主题名称
- 输出:微信可用的 HTML 字符串
- 主题可选值:
- `default` — 默认蓝色
- `macaron` — 马卡龙粉紫色
- `shuimo` — 水墨深灰蓝
- `wenyan` — 文雁深蓝绿
- `houge` — 猴哥深蓝橙
- `cuiyu` — 翠玉青绿黄
### 3. 图片工具
**`search_image(query: str, count: int) -> list`**
- 输入:搜索关键词(字符串),请求图片数量(整数)
- 输出:图片信息列表,每个元素含 `url`(下载链接)和 `local_path`(本地缓存路径)
- 作用:通过 Pexels/Unsplash 图库搜索并下载图片,返回本地文件路径
- 注意:如未配置图库 API Key,此工具不可用
---
**`generate_image(prompt: str, size: str) -> dict`**
- 输入:图片描述提示词(字符串),图片尺寸(字符串,格式如 `"1024x1024"`)
- 输出:`{"local_path": "本地文件路径"}`
- 作用:调用 AI 生图模型生成图片,返回本地保存路径
---
**`upload_image(file_path: str) -> dict`**
- 输入:本地图片文件路径(字符串)
- 输出:`{"media_id": "...", "url": "微信图片URL"}`
- 作用:将本地图片上传至微信素材库,返回微信图片 URL 和 media_id
- 注意:上传前需确保图片文件存在,支持 JPG/PNG
---
### 4. 内容审核工具
**`review_article(article: dict) -> dict`**
- 输入:文章对象,格式如下:
```json
{
"markdown": "Markdown 内容(字符串)",
"content": "HTML 内容(字符串,可选)"
}
```
- 输出:
```json
{
"passed": true/false,
"plagiarism": {"similarity": 0, "is_duplicated": false},
"prohibited": {"violations": []}
}
```
- 作用:审核文章内容,返回是否通过及问题列表
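A small sketch of how an orchestrator might act on this return value, assuming only the `passed` and `prohibited.violations` fields shown above. The helper name `needs_rewrite` is hypothetical and not part of the Skill.

```python
def needs_rewrite(review: dict) -> bool:
    """Return True when the article should be regenerated or revised."""
    if not review.get("passed", False):
        return True
    # A non-empty violations list also blocks publishing
    return bool(review.get("prohibited", {}).get("violations"))


clean = {
    "passed": True,
    "plagiarism": {"similarity": 0, "is_duplicated": False},
    "prohibited": {"violations": []},
}
flagged = {"passed": True, "prohibited": {"violations": ["赌博"]}}
```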
---
### 5. 草稿工具
**`create_draft(articles: list, auto_upload_thumb: bool = False) -> dict`**
- 输入:文章列表,每篇格式如下:
```json
{
"title": "文章标题",
"author": "贾维斯",
"content": "HTML内容(字符串)",
"thumb_media_id": "封面缩略图的media_id",
"content_source_url": "原文链接,可填 https://openclaw.ai"
}
```
- 输出:`{"media_id": "草稿ID", "msg": "..."}`
- 作用:将文章推送到微信公众号草稿箱
---
## AI 编排流程
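After reading this SKILL.md, the AI performs the steps below. Every tool used in a step must be called explicitly:
```
Step 1: Research
→ Call research_topic(topic)
→ Obtain search_results and summary
Step 2: Generate the outline
→ Call generate_outline(topic, research result)
→ Obtain title and sections (section name, description, key points)
Step 3: Assemble the prompt and generate the article
→ Using the outline from Step 2, assemble the prompt yourself (see "提示词组装规范" below)
→ Have the LLM generate the complete Markdown article from the prompt
→ Note: the LLM outputs Markdown directly; no tool call is needed for the "writing" itself
Step 4: Content review
→ Call review_article({"markdown": Markdown generated in Step 3})
→ If passed=false or prohibited.violations is non-empty, regenerate or revise the content
→ Move on to Step 5 only after the review passes
Step 5: Generate and upload images
→ Generate the cover image (when generate_images=True):
    - Prefer search_image(keywords from the article title, count=5), which downloads to local disk
    - Or call generate_image(cover image description, size="900x500") to get a local path
→ Call upload_image(local cover path) → obtain the cover's WeChat URL
→ Generate one image for each section:
    - Prefer search_image(keywords from the section title, count=3)
    - Or call generate_image(section image description, size="900x500")
→ Call upload_image(local section image path) for each one → obtain each section image's WeChat URL
Step 6: Inject the image URLs into the Markdown
→ Insert the WeChat image URLs using Markdown image syntax:
    - Cover image: insert `![cover](cover WeChat URL)` after the first # heading
    - Section images: insert `![section title](section image WeChat URL)` after the matching ## heading line
→ Note: the image URLs go directly into the Markdown; they are not HTML placeholders
Step 7: Convert the format
→ Call convert_to_html(Markdown string, theme)
→ `![name](url)` in the Markdown is converted automatically to `<img src="url" alt="name" style="max-width:100%;..." />`
→ Obtain the HTML string
Step 8: Push the draft
→ Call upload_thumb(local cover path) → obtain thumb_media_id (the thumbnail material ID)
→ Call create_draft([{
    "title": article title,
    "author": "贾维斯",
    "content": complete HTML including the image URLs,
    "thumb_media_id": media_id of the cover thumbnail,
    "content_source_url": "https://openclaw.ai"
  }])
→ Obtain the draft ID; the workflow is complete
```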
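Step 6 of the workflow (placing image lines after headings) can be sketched as below. This is an illustration under stated assumptions rather than the Skill's own code: the function name `inject_image_urls`, the `cover` alt text, and matching sections by exact heading text are all hypothetical choices, and the sketch does not skip `#` lines inside fenced code blocks.

```python
from typing import Dict, List


def inject_image_urls(markdown: str, cover_url: str, section_urls: Dict[str, str]) -> str:
    """Insert image lines after the first '# ' heading and after matching '## ' headings."""
    out: List[str] = []
    cover_done = False
    for line in markdown.splitlines():
        out.append(line)
        if line.startswith("# ") and not cover_done:
            # Cover image goes right after the single top-level title
            out.append(f"![cover]({cover_url})")
            cover_done = True
        elif line.startswith("## "):
            title = line[3:].strip()
            if title in section_urls:
                out.append(f"![{title}]({section_urls[title]})")
    return "\n".join(out)


md = "# Title\nintro\n## A\ntext\n## B\nmore"
result = inject_image_urls(
    md,
    "https://mmbiz.example/cover.jpg",
    {"A": "https://mmbiz.example/a.jpg"},
)
```

Sections without an uploaded image (here `B`) are left untouched, so a failed upload does not leave a broken image tag in the article.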
---
## 批量生成多篇文章
单篇文章流程(第一步至第八步)为一轮。当需要生成多篇文章时,按以下方式循环执行:
```
对第 1 篇文章执行第一步至第八步
→ 等待第八步完成后,再开始第 2 篇
对第 2 篇文章执行第一步至第八步
→ 等待完成后,再开始第 3 篇
... 以此类推
```
每篇文章之间相互独立,主题、大纲、写作风格均可不同。
---
## 提示词组装规范
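In Step 3, the prompt the AI assembles should contain the following parts, presented in a structured way:
```
## Article topic
{topic}
## Article outline
{section_name_1}
Description: {section.description}
Key points: {section.key_points list}
{section_name_2}
...
## Research summary
{research.summary}
(include the 3-5 most relevant search result summaries)
## Writing style requirements
{choose or combine from the options below:}
- Conversational / formal / playful / professional / easy to understand
- Paragraphs follow one another logically and offer real insight
- No empty boilerplate phrases
## Output format requirements
- Main title: # Title (level 1, exactly one)
- Section headings: ## level 2
- Subsections: ### level 3
- Emphasis: **bold**
- Code blocks: ```bash code ``` format
- Lists: - format
- Total length: no more than 7200 characters
- No repeated sections or paragraphs
## Image placeholder instructions
- Cover image: at the start of the article, insert ![封面图](cover_image_url) after the title
- Section images: insert ![章节标题](章节标题_url) directly after each ## section heading
- **Important**: each section's placeholder must be **unique**; placeholder name = section title (without spaces or special characters) + "_url"
- Example: section "OpenClaw 简介" → `![OpenClaw简介](OpenClaw简介_url)`
- Example: section "环境准备" → `![环境准备](环境准备_url)`
- **Never** use the same placeholder for every section image (such as `section_image_url`)
- Placeholder note: image URLs are filled with the placeholders above for now; Steps 5 and 6 later upload the real WeChat images and replace them
## Important constraints
- The output ends with the article; do not output any checklist, scorecard, self-assessment, or extra commentary
- Each section appears exactly once; do not repeat any section or paragraph
```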
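The template above can be assembled programmatically. The sketch below is one possible way, assuming the `outline` and `research` shapes returned by `generate_outline` and `research_topic`; the function names, the English heading labels, and the "Image placeholder" line are hypothetical and only illustrate the unique-placeholder rule.

```python
import re
from typing import Dict, List


def placeholder_name(section_title: str) -> str:
    """Section title without spaces or special characters, plus the "_url" suffix."""
    return re.sub(r"\W+", "", section_title) + "_url"


def build_prompt(topic: str, outline: Dict, research: Dict, style: str) -> str:
    """Join the parts of the template into one prompt string."""
    parts: List[str] = ["## Article topic", topic, "## Article outline"]
    for section in outline["sections"]:
        parts.append(section["name"])
        parts.append(f"Description: {section['description']}")
        parts.append("Key points: " + "; ".join(section["key_points"]))
        # One unique placeholder per section, derived from its title
        parts.append(f"Image placeholder: {placeholder_name(section['name'])}")
    parts += ["## Research summary", research["summary"]]
    parts += ["## Writing style requirements", style]
    return "\n".join(parts)


outline = {
    "title": "Example",
    "sections": [
        {"name": "OpenClaw 简介", "description": "What it is", "key_points": ["a", "b"]},
    ],
}
prompt = build_prompt("OpenClaw", outline, {"summary": "short summary"}, "professional")
```

In Python 3, `\W` treats CJK characters as word characters, so Chinese section titles survive the cleanup while spaces and punctuation are dropped.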
---
## 配置要求
### 必需
- **微信公众号凭证**(二选一):
1. `~/.config/wechat-mp-auto/config.json` 中配置 `app_id` 和 `app_secret`
2. 或在 `~/.openclaw/.env` 中配置环境变量 `WECHAT_APP_ID` 和 `WECHAT_APP_SECRET`
- **IP 白名单**:确保运行环境的出口 IP 已加入微信公众号后台的白名单
### 可选
- **图片来源**(二选一):
- `PEXELS_API_KEY`:Pexels 图库(每月 200 请求,优先横图)
- `UNSPLASH_API_KEY`:Unsplash 图库(每月 50 请求,优先横图)
- 图片下载后自动压缩:封面最大 900×500,插图最大 900×400,统一转 JPEG 85% 质量
- 环境变量或 `~/.openclaw/.env` 中配置
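To make the size limits concrete, here is a sketch of shrinking an image's dimensions to fit a bounding box while keeping the aspect ratio. It assumes the limits are applied as a scale-down only; whether the actual tool scales, crops, or pads is not stated here, and `fit_within` is a hypothetical helper.

```python
from typing import Tuple

COVER_MAX = (900, 500)   # cover image limit from the list above
INLINE_MAX = (900, 400)  # in-article image limit from the list above


def fit_within(width: int, height: int, max_w: int, max_h: int) -> Tuple[int, int]:
    """Scale (width, height) down to fit inside max_w x max_h, keeping the aspect ratio."""
    scale = min(max_w / width, max_h / height, 1.0)  # 1.0 cap: never enlarge
    return max(1, round(width * scale)), max(1, round(height * scale))


cover_size = fit_within(1800, 1200, *COVER_MAX)
inline_size = fit_within(1800, 600, *INLINE_MAX)
```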
---
## 目录结构
```
wechat-mp-auto/
├── SKILL.md                 # This document (AI orchestration guide)
├── metadata.json            # Skill metadata
├── _meta.json               # ClawHub metadata
├── README.md                # Instructions for human readers
├── requirements.txt         # Python dependencies
├── src/
│   ├── config.py            # Configuration management
│   ├── token_manager.py     # WeChat access token management
│   ├── exceptions.py        # Exception definitions
│   └── skills/
│       ├── topic_research.py    # Research tools (research_topic / generate_outline)
│       ├── article_writer.py    # Format conversion tool (convert_to_html)
│       ├── image_generator.py   # Image tools (search_image / generate_image)
│       ├── material_skill.py    # Image upload tool (upload_image)
│       ├── draft_skill.py       # Draft push tool (create_draft)
│       ├── base_skill.py        # Base class
│       ├── content_reviewer.py  # Content review
│       └── ...                  # Other helper modules
└── themes/                  # HTML theme color schemes
    ├── default.yaml
    ├── macaron.yaml
    ├── shuimo.yaml
    ├── wenyan.yaml
    ├── houge.yaml
    └── cuiyu.yaml
```
---
## 安全规范
- Skill 代码中不存储任何凭证
- 日志中自动脱敏(密钥前 4 位 + ... + 后 4 位)
- 所有凭证从配置文件或环境变量读取,不硬编码
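The masking rule (first 4 characters + ... + last 4 characters) can be sketched as follows. The function name `mask_secret` and the handling of very short values are assumptions made for the example, not taken from the Skill's code.

```python
def mask_secret(secret: str, keep: int = 4) -> str:
    """Show only the first and last `keep` characters of a secret."""
    if len(secret) <= keep * 2:
        # Too short to reveal any part without exposing most of it
        return "*" * len(secret)
    return f"{secret[:keep]}...{secret[-keep:]}"


masked = mask_secret("abcd1234efgh5678")
```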
---
## 接口权限说明
以下是部分接口对公众号类型的要求:
| 接口/功能 | 权限要求 |
|-----------|---------|
| 素材管理(上传/下载图片) | 普通订阅号即可 |
| 草稿箱管理 | 普通订阅号即可 |
| 文章数据统计(阅读量/点赞/转发等) | 服务号或已认证的订阅号可查询,普通订阅号调用返回 404 |
| 用户管理(获取用户信息) | 普通订阅号即可(部分接口受限) |
| 群发/模板消息 | 需服务号或已认证订阅号 |
> **注意**:如需使用文章数据分析功能,请将公众号升级为服务号或完成认证。
FILE:CHANGELOG.md
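# Changelog
All notable changes to this project will be documented in this file.
## [0.1.2] - 2026-03-24
### Fixed
- `article_writer.py`: `convert_to_html` now handles the Markdown image syntax `![alt](url)`, fixing images that previously failed to convert to HTML
- `article_writer.py`: fixed `_remove_mixed_language_spaces` deleting `<img>` tags by mistake
- `article_writer.py`: `inject_section_images` adds Unicode punctuation normalization (U+3001/U+FF0C) and substring matching, fixing section image injection failures
- `article_writer.py`: unified the margin style of all `<img>` tags to 16px
### Added
- `image_generator.py`: a failed image search is retried automatically 3 times, improving the image fetch success rate
- `article_writer.py`: added Markdown image syntax support (`![alt](url)` is converted to `<img>` in `convert_to_html`)
### Changed
- `SKILL.md`: rewrote workflow Steps 5 to 8 so the description matches the actual code flow
- `article_writer.py`: img tags generated by `_convert_inline_formatting` in `convert_to_html` now all carry an inline style
### Removed
- `article_writer.py`: removed the redundant `inject_section_images` call in `convert_to_html` (images are now inserted through Markdown, so this path is no longer needed)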
## [0.1.0] - 2026-03-22
### Fixed
- `token_manager.py`:修复 `from .exceptions` 相对导入在 `-m` 模块方式运行时报错的问题,改为绝对导入
- `publish.py`:修复章节图永远 fallback 封面图的问题,新增 `img_gen.search_image()` 搜索并上传各章节独立图片
### Changed
- `draft_skill.py`:统一动态导入为绝对导入风格
## [0.0.8] - 2026-03-21
### Added
- `article_writer.py`:新增 Markdown 语法支持:删除线 `~~text~~`、上标 `^text^`、下标 `~text~`、高亮 `==text==`
- `article_writer.py`:新增任务列表 `- [ ]` / `- [x]`
- `article_writer.py`:新增缩进代码块(4空格或tab开头)
- `article_writer.py`:新增 `*text*` 斜体支持(此前只有 `_text_`)
- `article_writer.py`:新增 HTML 标签透传(`<br>`、`<div>`、`<details>` 等安全标签保留,`<script>` 等危险标签转义)
- `article_writer.py`:新增 `---` 分隔线转 `<hr>`
- `publish.py`:外部图片 URL 自动下载后上传到微信素材库,解决微信不显示外部图片的问题
### Changed
- h2 标题改为使用 `colors.secondary` 作为字体颜色和左侧装饰线,替代原来的 text/primary 色
- `colors.secondary` 正式投入使用,theme YAML 中 h2.color 字段移除(由 secondary 替代)
- `publish.py`:`--theme` 默认值从 `macaron` 改为 `default`
- 统一版本号:README.md、SKILL.md、pyproject.toml、metadata.json 均更新至 v0.0.8
### Fixed
- `article_writer.py`:修复 YAML 中 `h2.color` 显式定义导致 secondary 默认值不生效的问题
- `publish.py`:`find_cover_image` 改为优先用 Pexels 横向搜索封面图,解决封面竖图问题
- `publish.py`:封面图下载后自动压缩(900×500,JPEG 85%)
- `image_generator.py`:新增 Pillow 图片压缩,封面 900×500 / 插图 900×400,统一 JPEG 85%
- `image_generator.py`:Pexels/Unsplash 搜索加 `orientation=landscape` 优先横图
- `draft_skill.py`:`list_drafts` API 返回 key 为 `item`
- 多个 skills 文件:修复 `from src.xxx` import 路径,统一改为 `from xxx`
## [0.0.7] - 2026-03-21
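### Changed
- Refactored `convert_to_html`: the theme YAML is read in full; all colors, fonts, and spacing come from the YAML configuration and are no longer hard-coded
- The 5 theme YAML files (default/houge/macaron/shuimo/wenyan) share one complete field structure: colors/body/h1-h3/p/link/blockquote/ul/ol
- Lists now use the semantic `<ul>/<ol>/<li>` structure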
### Fixed
- `publish.py`:修复审核时 `html` 变量未定义的 bug
- `publish.py`:修复封面图占位符 `cover_image_url` 只替换第一处的问题,改为全局替换
- `publish.py`:新增章节图占位符 `*_url` 替换逻辑
- `publish.py`:新增草稿完整性检查中图片 URL 可访问性验证(HEAD 请求)
- `publish.py`:修复 `find_cover_image` 选中缩略图(_thumb)的问题
- `publish.py`:修复外部图片 URL 正则无法匹配 `data-src` 和 `https` 协议的问题
## [0.0.6] - 2026-03-21
### Added
- SKILL.md 新增「接口权限说明」表格,明确标注数据分析 API 仅服务号或已认证订阅号可用
- README.md 注意事项补充数据分析权限说明
### Fixed
- 统一 pyproject.toml 版本号(0.0.3 → 0.0.6),与 metadata.json 保持一致
- 删除临时文件 article_final.html、article_output.html
## [0.0.5] - 2026-03-20
### Added
- 生图模型探测机制重构
- 新增 `IMAGE_GEN_PROVIDER_MAP` 注册表,覆盖国内外 13 个生图 Provider
- 支持的 Provider:ali-bailian/wanx、minimax-cn/image-01、baidu、tencent、zhipu、sensetime、bytedance、openai/dall-e-3、google/imagen-3、stability-ai、replicate、aws-bedrock、azure-openai
- 初筛(input/api含image)→ 实测调用API → 缓存24小时
- 探测结果缓存机制(`~/.cache/wechat-mp-auto/image_models_cache.json`)
- 级联搜索机制(TopicResearchSkill)
- 支持 Tavily → DuckDuckGo → 百度 多源自动切换
- 任何一个源成功即返回,失败自动尝试下一个
- 失败原因详细记录,限流(429)自动重试
### Changed
- `get_ai_model_options()` 适配新探测逻辑,无可用模型时给出明确提示
- `TopicResearchSkill.research_topic()` 和内容审核网络检测均支持级联搜索
## [0.0.4] - 2026-03-19
### Added
- 智能配图:AI 生图 + Pexels/Unsplash 图库
- 图片来源选择引导机制
- 本地+网络重复度检测
- 三阶段完整性检查
- 5种主题切换(default, houge, shuimo, wenyan, macaron)
## [0.0.3] - 2026-03-18
### Added
- 文章写作 Skill
- Markdown 转微信 HTML
- 敏感词检测
## [0.0.2] - 2026-03-17
### Added
- 微信公众号认证和 Token 管理
- 草稿发布 Skill
## [0.0.1] - 2026-03-16
### Added
- 项目初始化
- 基础架构搭建
FILE:LICENSE.txt
MIT License
Copyright (c) 2026
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
FILE:README.md
# wechat-mp-auto
微信公众号自动化 Skill - 从选题到发布的全流程自动化
## 功能特性
本 Skill 由 **AI 模型作为编排者**,Python 代码提供原子化工具能力。AI 阅读 SKILL.md 后自行决定调用哪些工具,完成全部流程。
- **选题调研** - 级联搜索(Tavily → DuckDuckGo → 百度),多源自动切换
- **文章写作** - AI 根据提示词生成 Markdown,Python 负责格式转换
- **智能配图** - Pexels/Unsplash 图库搜索 + AI 生图(可选择)
- **图片处理** - 调整微信尺寸、压缩
- **内容审核** - 本地+网络重复度检测、敏感词扫描
- **模板管理** - 6 种本地主题切换(default/macaron/shuimo/wenyan/houge/cuiyu)
- **草稿发布** - 一键推送到公众号草稿箱
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 配置凭证
在 `~/.config/wechat-mp-auto/config.json` 中配置微信公众号凭证:
```json
{
"app_id": "your_app_id",
"app_secret": "your_app_secret"
}
```
或设置环境变量:
```bash
export WECHAT_APP_ID="your_app_id"
export WECHAT_APP_SECRET="your_app_secret"
```
### 3. 可选:配置图片生成 API
首次使用时系统会引导选择图片来源方式,支持以下两种:
**图片接口检索(推荐,无需信用卡):**
```bash
export PEXELS_API_KEY="your_pexels_key"
export UNSPLASH_API_KEY="your_unsplash_key"
```
注册地址:Pexels https://www.pexels.com/api/ | Unsplash https://unsplash.com/developers
### 4. 可选:配置网络重复度检测
**Tavily:**
```bash
export TAVILY_API_KEY="your_tavily_key"
```
## 使用方法
本 Skill 由 **AI 模型作为编排者**,所有流程通过 AI 读取 SKILL.md 后自行调用工具完成,无需手动编写 Python 脚本。
**详细工具说明和编排流程见 [SKILL.md](./SKILL.md)。**
以下代码仅供手动调试参考:
```python
from skills.topic_research import TopicResearchSkill
from skills.article_writer import ArticleWriterSkill
from skills.image_generator import ImageGeneratorSkill
from skills.material_skill import MaterialSkill
from skills.draft_skill import DraftSkill
from skills.content_reviewer import ContentReviewerSkill
# 调研
research = TopicResearchSkill().research_topic("AI大模型应用")
# 生成大纲
outline = TopicResearchSkill().generate_outline("AI大模型应用", research)
# 内容审核(Markdown 生成后)
review = ContentReviewerSkill().review_article({"markdown": markdown_text})
if not review["passed"]:
print("审核未通过:", review)
# 转换 HTML
html = ArticleWriterSkill().convert_to_html(markdown_text, theme="macaron")
# 上传图片
result = MaterialSkill().upload_image("/path/to/image.jpg")
# 创建草稿
DraftSkill().create_draft([{"title": "...", "content": html, ...}])
```
## 支持的主题
| 主题 | 说明 |
|------|------|
| macaron | 马卡龙风格(粉紫色 #FF6B9D) |
| shuimo | 水墨风格(深灰蓝 #2C3E50) |
| wenyan | 文雁风格(深蓝绿 #0066FF) |
| houge | 猴哥风格(深蓝橙 #6b5b8a) |
| cuiyu | 翠玉风格(青绿 #1EA089) |
| default | 默认风格(蓝色 #007AFF) |
## 内容审核详解
### 本地重复度检测
- **算法**:n-gram + Jaccard 相似度
- **阈值**:30% 以上判定为重复
- **存储**:自动保存到 `~/.cache/wechat-mp-auto/article_history.json`
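A minimal sketch of n-gram + Jaccard similarity with the 30% threshold. The n-gram size (3) and the character-level tokenization are assumptions; the actual values used by `content_reviewer.py` are not stated in this document, and the function names are hypothetical.

```python
def char_ngrams(text: str, n: int = 3) -> set:
    """Set of character n-grams, ignoring all whitespace."""
    cleaned = "".join(text.split())
    return {cleaned[i:i + n] for i in range(len(cleaned) - n + 1)}


def jaccard_similarity(a: str, b: str, n: int = 3) -> float:
    """|A ∩ B| / |A ∪ B| over the two n-gram sets; 0.0 if either set is empty."""
    grams_a, grams_b = char_ngrams(a, n), char_ngrams(b, n)
    if not grams_a or not grams_b:
        return 0.0
    return len(grams_a & grams_b) / len(grams_a | grams_b)


def is_duplicate(a: str, b: str, threshold: float = 0.30) -> bool:
    """Treat similarity at or above the threshold as a duplicate."""
    return jaccard_similarity(a, b) >= threshold


score = jaccard_similarity("abcd", "bcde")  # grams {abc, bcd} vs {bcd, cde}
```

Character n-grams avoid the need for a word segmenter, which is convenient for Chinese text.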
### 网络重复度检测
- **算法**:提取关键句 → Tavily 搜索 → 相似度比对
- **阈值**:10% 以上判定为匹配
- **特点**:异步执行,不阻塞主流程
- **缓存**:搜索结果缓存到 `~/.cache/wechat-mp-auto/search_cache.json`
### 敏感词检测
检测词汇:`反动、暴力、色情、赌博、毒品、诈骗、谣言`
## 项目结构
```
wechat-mp-auto/
├── SKILL.md                 # AI orchestration guide (the core document of this Skill)
├── metadata.json            # Skill metadata
├── README.md                # This file (for human readers)
├── requirements.txt         # Python dependencies
├── pyproject.toml           # Project configuration
├── src/
│   ├── config.py            # Configuration management
│   ├── token_manager.py     # WeChat access token management
│   ├── exceptions.py        # Exception definitions
│   └── skills/
│       ├── topic_research.py    # Research tools (research_topic / generate_outline)
│       ├── article_writer.py    # Format conversion tool (convert_to_html)
│       ├── image_generator.py   # Image tools (search_image / generate_image)
│       ├── material_skill.py    # Image upload tool (upload_image)
│       ├── draft_skill.py       # Draft push tool (create_draft)
│       └── ...                  # Other helper modules
└── themes/                  # HTML theme color schemes
    ├── default.yaml
    ├── macaron.yaml
    ├── shuimo.yaml
    ├── wenyan.yaml
    ├── houge.yaml
    └── cuiyu.yaml
```
## 注意事项
1. **图片尺寸**:封面 900x500px,插图建议 640-1080px 宽度
2. **图片格式**:支持 jpg/jpeg/png/gif/webp
3. **API 限制**:微信素材上传有频率限制,大批量操作注意节奏
4. **Token 缓存**:access_token 会自动缓存,无需手动处理
5. **网络检测**:需要配置 TAVILY_API_KEY 才能启用网络重复度检测
6. **数据分析权限**:文章统计数据(阅读量/点赞/转发)API 仅**服务号**或**已认证的订阅号**可用。普通订阅号调用会返回 404,如需此功能请升级公众号类型。
## 常见问题
**Q: 提示 "access_token 无效" 怎么办?**
> 删除 `~/.cache/wechat-mp-auto/token.json` 文件,重新运行
**Q: 图片上传失败?**
> 检查图片格式和大小,微信限制 20MB
**Q: 如何获取 app_id 和 app_secret?**
> 登录微信公众号后台 → 设置与开发 → 基本配置
**Q: 网络重复度检测需要配置什么?**
> 需要配置 TAVILY_API_KEY,可在 https://tavily.com/ 注册获取
**Q: 文章没有插图怎么办?**
> 检查 ~/.cache/wechat-mp-auto/images/ 目录是否有图片,或配置 PEXELS_API_KEY 自动生成
## 许可证
MIT License - see [LICENSE.txt](LICENSE.txt)
## 贡献
欢迎提交 Issue 和 Pull Request!
FILE:_meta.json
{
"ownerId": "kn727d9p0m1nhp3cbbj031hzw98361m5",
"slug": "wechat-mp-auto",
"version": "0.1.0",
"publishedAt": 1774151795845
}
FILE:metadata.json
{
"name": "wechat-mp-auto",
"description": "微信公众号自动化Skill,可直接在openclaw等支持skill的平台中使用。实现微信公众号文章从选题、写作,配图、审核到发布的全流程自动化。",
"version": "v0.1.2",
"author": "wzhaojin",
"homepage": "https://github.com/wzhaojin/wechat-mp-auto",
"tags": [
"wechat",
"automation",
"openclaw",
"weixin",
"article",
"publish"
],
"always": false,
"env": {
"required": [
"WECHAT_APP_ID",
"WECHAT_APP_SECRET"
],
"optional": [
"PEXELS_API_KEY",
"UNSPLASH_API_KEY",
"OPENAI_API_KEY",
"TAVILY_API_KEY"
],
"alsoReads": [
".env"
]
},
"paths": {
"config": "~/.config/wechat-mp-auto",
"cache": "~/.cache/wechat-mp-auto"
}
}
FILE:pyproject.toml
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "wechat-mp-auto"
version = "0.1.2"
description = "微信公众号自动化工具 - 选题、写作、配图、审核、草稿、发布全流程自动化"
readme = "README.md"
license = {text = "MIT"}
authors = [
{name = "wzhaojin", email = "[email protected]"}
]
keywords = ["wechat", "微信公众号", "automation", "openclaw", "ai"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"requests>=2.28.0",
"PyYAML>=6.0",
]
[project.optional-dependencies]
image = ["Pillow>=9.0.0"]
ai = ["openai>=1.0.0"]
dev = ["pytest>=7.0.0"]
[project.scripts]
wechat-mp-publish = "publish:main"
[tool.setuptools]
packages = ["src", "src.skills", "src.utils", "src.formatters"]
[tool.pytest.ini_options]
testpaths = ["test*.py"]
python_files = ["test_*.py"]
FILE:requirements.txt
# Core
requests>=2.28.0
PyYAML>=6.0
# Image Processing (optional but recommended)
Pillow>=9.0.0
# AI Image Generation (optional)
# openai>=1.0.0
# For better HTTP handling (optional)
urllib3>=1.26.0
FILE:scripts/bump_version.py
#!/usr/bin/env python3
"""
Version sync script - run it before every release to update the version number.

Usage:
    python scripts/bump_version.py 0.1.0 "Added xxx feature"
    python scripts/bump_version.py   # no arguments: print the current version and check consistency

Updates the following files together:
    src/_version.py - single source of truth
    pyproject.toml  - build metadata
    metadata.json   - ClawHub metadata
    SKILL.md        - AI orchestration document
    README.md       - badge
    CHANGELOG.md    - change log
"""
import sys
import os
import re
import datetime
from pathlib import Path

SCRIPT_DIR = Path(__file__).parent
REPO_DIR = SCRIPT_DIR.parent


def get_current_version():
    """Return the version stored in src/_version.py, or None if it cannot be read."""
    version_file = REPO_DIR / "src" / "_version.py"
    if version_file.exists():
        content = version_file.read_text(encoding="utf-8")
        m = re.search(r'VERSION\s*=\s*["\']([^"\']+)["\']', content)
        if m:
            return m.group(1)
    return None


def bump_version(new_version: str, changelog_entry: str = ""):
    """Update the version number in every tracked file; return the list of changed files."""
    changes = []

    # 1. src/_version.py
    vf = REPO_DIR / "src" / "_version.py"
    old_content = vf.read_text(encoding="utf-8")
    new_content = re.sub(r'VERSION\s*=\s*["\'][^"\']+["\']', f'VERSION = "{new_version}"', old_content)
    if old_content != new_content:
        vf.write_text(new_content, encoding="utf-8")
        changes.append("src/_version.py")

    # 2. pyproject.toml (only the first top-of-line `version = "..."` entry)
    pt = REPO_DIR / "pyproject.toml"
    old_content = pt.read_text(encoding="utf-8")
    new_content = re.sub(
        r'^version\s*=\s*"[^"]+"', f'version = "{new_version}"',
        old_content, count=1, flags=re.MULTILINE,
    )
    if old_content != new_content:
        pt.write_text(new_content, encoding="utf-8")
        changes.append("pyproject.toml")

    # 3. metadata.json
    mj = REPO_DIR / "metadata.json"
    old_content = mj.read_text(encoding="utf-8")
    new_content = re.sub(r'"version":\s*"[^"]+"', f'"version": "v{new_version}"', old_content)
    if old_content != new_content:
        mj.write_text(new_content, encoding="utf-8")
        changes.append("metadata.json")

    # 4. SKILL.md
    sm = REPO_DIR / "SKILL.md"
    old_content = sm.read_text(encoding="utf-8")
    new_content = re.sub(r'\*\*版本\*\*:\s*v[\d.]+', f'**版本**: v{new_version}', old_content)
    if old_content != new_content:
        sm.write_text(new_content, encoding="utf-8")
        changes.append("SKILL.md")

    # 5. README.md badge: keep the badge prefix and suffix, swap only the version number
    rm = REPO_DIR / "README.md"
    old_content = rm.read_text(encoding="utf-8")
    new_content = re.sub(
        r'(\[!\[Version\]\(https://img\.shields\.io/badge/version-v)[\d.]+(-blue[^)]*\)\]\(SKILL\.md\))',
        rf'\g<1>{new_version}\g<2>',
        old_content,
    )
    if old_content != new_content:
        rm.write_text(new_content, encoding="utf-8")
        changes.append("README.md")

    # 6. CHANGELOG.md - add an entry for the new version
    cl = REPO_DIR / "CHANGELOG.md"
    date_str = datetime.date.today().strftime("%Y-%m-%d")
    new_entry = f"## [{new_version}] - {date_str}\n{changelog_entry}\n"
    old_content = cl.read_text(encoding="utf-8")
    if changelog_entry:
        # Insert before the first "## [x.y.z]" heading; append at the end if there is none
        first_version_pos = re.search(r'^## \[', old_content, re.MULTILINE)
        if first_version_pos:
            insert_pos = first_version_pos.start()
            new_content = old_content[:insert_pos] + new_entry.strip() + "\n\n" + old_content[insert_pos:]
        else:
            new_content = old_content + "\n\n" + new_entry
        if old_content != new_content:
            cl.write_text(new_content, encoding="utf-8")
            changes.append("CHANGELOG.md (新增版本条目)")

    return changes


def check_version_consistency():
    """Check that every file carries the same version.

    Returns (consistent, current_version, inconsistent_files).
    """
    versions = {}

    # src/_version.py
    version_file = REPO_DIR / "src" / "_version.py"
    if version_file.exists():
        content = version_file.read_text(encoding="utf-8")
        m = re.search(r'VERSION\s*=\s*["\']([^"\']+)["\']', content)
        if m:
            versions["src/_version.py"] = m.group(1)

    # pyproject.toml
    pt = REPO_DIR / "pyproject.toml"
    if pt.exists():
        content = pt.read_text(encoding="utf-8")
        m = re.search(r'version\s*=\s*"([^"]+)"', content)
        if m:
            versions["pyproject.toml"] = m.group(1)

    # metadata.json (stored with a leading "v")
    mj = REPO_DIR / "metadata.json"
    if mj.exists():
        content = mj.read_text(encoding="utf-8")
        m = re.search(r'"version":\s*"([^"]+)"', content)
        if m:
            versions["metadata.json"] = m.group(1).lstrip("v")

    # SKILL.md
    sm = REPO_DIR / "SKILL.md"
    if sm.exists():
        content = sm.read_text(encoding="utf-8")
        m = re.search(r'\*\*版本\*\*:\s*v([\d.]+)', content)
        if m:
            versions["SKILL.md"] = m.group(1)

    # README.md
    rm = REPO_DIR / "README.md"
    if rm.exists():
        content = rm.read_text(encoding="utf-8")
        m = re.search(r'version-v([\d.]+)-blue', content)
        if m:
            versions["README.md"] = m.group(1)

    # src/_version.py is the single source of truth; fall back to the first version found
    reference = versions.get("src/_version.py")
    if reference is None and versions:
        reference = next(iter(versions.values()))
    # Report every file whose version differs from the reference
    inconsistent = [f for f, v in versions.items() if v != reference]
    return len(set(versions.values())) == 1, reference, inconsistent


def main():
    if len(sys.argv) < 2:
        # No arguments: check consistency only
        consistent, version, inconsistent = check_version_consistency()
        print(f"当前版本: {version}")
        if consistent:
            print("✅ 所有文件版本号一致")
            sys.exit(0)
        else:
            print(f"❌ 版本号不一致,涉事文件: {inconsistent}")
            sys.exit(1)

    new_version = sys.argv[1]
    changelog_entry = sys.argv[2] if len(sys.argv) > 2 else ""

    # Validate the version format (x.y.z)
    if not re.match(r'^\d+\.\d+\.\d+$', new_version):
        print(f"❌ 版本号格式错误,应为 x.y.z,例如 0.0.9,实际: {new_version}")
        sys.exit(1)

    current = get_current_version()
    if current == new_version:
        print(f"❌ 版本号未变化,当前已是 {new_version}")
        sys.exit(1)

    print(f"🔄 {current} → {new_version}")
    changes = bump_version(new_version, changelog_entry)
    for f in changes:
        print(f" ✅ 更新: {f}")

    # Verify that every file now agrees
    consistent, version, inconsistent = check_version_consistency()
    if consistent:
        print(f"\n✅ 版本号已全部更新为 {version},所有文件一致")
    else:
        print(f"\n❌ 更新后仍有文件版本不一致: {inconsistent}")
        sys.exit(1)


if __name__ == "__main__":
    main()
FILE:src/__init__.py
"""
微信公众号自动化 Skill - 核心模块初始化
"""
__version__ = "1.0.0"
FILE:src/_version.py
VERSION = "0.0.9"
FILE:src/config.py
"""
WeChat Official Account automation - configuration management
"""
import os
import json
import logging
from pathlib import Path
from typing import Optional, Tuple, Dict

# Module-level logger
logger = logging.getLogger(__name__)


class Config:
    """Configuration manager."""

    DEFAULT_CONFIG_DIR = Path.home() / ".config" / "wechat-mp-auto"
    DEFAULT_CACHE_DIR = Path.home() / ".cache" / "wechat-mp-auto"

    def __init__(self):
        self._config = {}
        self._ensure_directories()

    def _ensure_directories(self):
        """Make sure the config and cache directories exist."""
        try:
            self.DEFAULT_CONFIG_DIR.mkdir(parents=True, exist_ok=True)
            self.DEFAULT_CACHE_DIR.mkdir(parents=True, exist_ok=True)
            logger.debug(f"配置目录已创建/确认: {self.DEFAULT_CONFIG_DIR}")
        except PermissionError as e:
            logger.error(f"创建配置目录权限不足: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"创建配置目录失败: {str(e)}")
            raise

    def get_credentials(self) -> Tuple[Optional[str], Optional[str]]:
        """
        Look up the credentials. Priority: config file > environment variables > .env
        """
        app_id = None
        app_secret = None

        # 1. Config file first
        config_file = self.DEFAULT_CONFIG_DIR / "config.json"
        if config_file.exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                app_id = data.get("app_id")
                app_secret = data.get("app_secret")
                if app_id and app_secret:
                    logger.info("从配置文件读取凭证成功")
                    return app_id, app_secret
            except json.JSONDecodeError as e:
                logger.error(f"配置文件JSON解析失败: {str(e)}")
            except PermissionError as e:
                logger.error(f"读取配置文件权限不足: {str(e)}")
            except Exception as e:
                logger.error(f"读取配置文件失败: {str(e)}")

        # 2. Environment variables
        app_id = os.environ.get("WECHAT_APP_ID")
        app_secret = os.environ.get("WECHAT_APP_SECRET")
        if app_id and app_secret:
            logger.info("从环境变量读取凭证成功")
            return app_id, app_secret

        # 3. ~/.openclaw/.env (plain KEY=VALUE lines, "#" starts a comment)
        env_file = Path.home() / ".openclaw" / ".env"
        if env_file.exists():
            try:
                with open(env_file, 'r', encoding='utf-8') as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith("#"):
                            if "=" in line:
                                key, value = line.split("=", 1)
                                if key == "WECHAT_APP_ID":
                                    app_id = value.strip()
                                elif key == "WECHAT_APP_SECRET":
                                    app_secret = value.strip()
                if app_id and app_secret:
                    logger.info("从.env文件读取凭证成功")
                    return app_id, app_secret
            except PermissionError as e:
                logger.error(f"读取.env文件权限不足: {str(e)}")
            except Exception as e:
                logger.error(f"读取.env文件失败: {str(e)}")

        logger.error("未配置微信公众号凭证")
        raise Exception("未配置微信公众号凭证")

    def get_default_template(self) -> dict:
        """Return the default template, falling back to the local "default" theme."""
        config_file = self.DEFAULT_CONFIG_DIR / "config.json"
        if config_file.exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                template = data.get("default_template", {"type": "local", "id": "default"})
                logger.debug(f"获取默认模板: {template}")
                return template
            except json.JSONDecodeError as e:
                logger.error(f"配置文件JSON解析失败: {str(e)}")
            except PermissionError as e:
                logger.error(f"读取配置文件权限不足: {str(e)}")
            except Exception as e:
                logger.error(f"读取配置文件失败: {str(e)}")

        default_template = {"type": "local", "id": "default"}
        logger.debug(f"使用默认模板: {default_template}")
        return default_template

    def set_default_template(self, template_type: str, template_id: str):
        """Save the default template to config.json."""
        # Validate arguments
        if not template_type or not isinstance(template_type, str):
            logger.error("无效的template_type参数")
            raise ValueError("template_type不能为空且必须是字符串")
        if not template_id or not isinstance(template_id, str):
            logger.error("无效的template_id参数")
            raise ValueError("template_id不能为空且必须是字符串")

        try:
            config_file = self.DEFAULT_CONFIG_DIR / "config.json"
            config = {}

            # Load the existing configuration, if any
            if config_file.exists():
                try:
                    with open(config_file, 'r', encoding='utf-8') as f:
                        config = json.load(f)
                except json.JSONDecodeError:
                    logger.warning("配置文件JSON解析失败,将创建新配置")
                except Exception as e:
                    logger.warning(f"读取配置文件失败: {str(e)}")

            # Update the template setting
            config["default_template"] = {"type": template_type, "id": template_id}

            # Write the configuration back
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2)
            logger.info(f"默认模板设置成功: type={template_type}, id={template_id}")
        except PermissionError as e:
            logger.error(f"写入配置文件权限不足: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"设置默认模板失败: {str(e)}")
            raise

    # ========== Image source and model preferences ==========
    def get_image_preferences(self) -> Dict:
        """Return the saved image source and model preferences."""
        config_file = self.DEFAULT_CONFIG_DIR / "config.json"
        defaults = {
            "image_source": None,  # "ai" or "search"
            "ai_model": None,      # model ID such as "glm-5", "dall-e-3", "kimi"
        }
        if config_file.exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                defaults["image_source"] = data.get("image_source")
                defaults["ai_model"] = data.get("ai_model")
            except Exception as e:
                logger.warning(f"读取图片偏好失败: {str(e)}")
        return defaults

    def set_image_source_preference(self, source: str):
        """Save the preferred image source ("ai" or "search")."""
        if source not in ("ai", "search"):
            raise ValueError("图片来源只支持 'ai' 或 'search'")
        config_file = self.DEFAULT_CONFIG_DIR / "config.json"
        config = {}
        if config_file.exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
            except Exception:
                pass
        config["image_source"] = source
        # Switching to "search" clears the model preference (it must be chosen again later)
        if source == "search":
            config.pop("ai_model", None)
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        logger.info(f"图片来源偏好已保存: {source}")

    def set_ai_model_preference(self, model_id: str):
        """Save the preferred AI image generation model; also sets the source to "ai"."""
        if not model_id:
            raise ValueError("模型ID不能为空")
        config_file = self.DEFAULT_CONFIG_DIR / "config.json"
        config = {}
        if config_file.exists():
            try:
                with open(config_file, 'r', encoding='utf-8') as f:
                    config = json.load(f)
            except Exception:
                pass
        config["image_source"] = "ai"
        config["ai_model"] = model_id
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2)
        logger.info(f"AI模型偏好已保存: {model_id}")


# Global configuration instance
config = Config()
FILE:src/exceptions.py
"""
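WeChat Official Account automation - custom exceptions
"""


class WeChatAutoException(Exception):
    """Base class for all errors raised by this Skill."""
    pass


class CredentialError(WeChatAutoException):
    """Credentials are missing or invalid."""
    pass


class TokenError(WeChatAutoException):
    """The access token could not be obtained or refreshed."""
    pass


class APIError(WeChatAutoException):
    """The WeChat API returned a non-zero error code."""

    def __init__(self, errcode: int, errmsg: str):
        self.errcode = errcode
        self.errmsg = errmsg
        super().__init__(f"错误码 {errcode}: {errmsg}")


# Known WeChat error codes and their messages
ERROR_CODES = {
    -1: "系统繁忙",
    40001: "access_token 无效",
    40013: "appid 错误",
    40125: "appsecret 错误",
    40164: "IP 不在白名单",
    41002: "缺少 appid",
    41004: "缺少 secret",
    42001: "access_token 过期",
    45009: "频率限制",
}


def get_error_message(errcode: int) -> str:
    """Return the message for a known error code, or a generic fallback."""
    return ERROR_CODES.get(errcode, f"未知错误 ({errcode})")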
FILE:src/first_time_setup.py
"""
微信公众号自动化 - 首次使用引导
"""
import json
from pathlib import Path
from typing import List, Dict
SETUP_GUIDE = """
📋 首次使用配置指南
━━━━━━━━━━━━━━━━━━━━━━━━━━━━
1️⃣ 获取 AppID 和 AppSecret
网址: https://mp.weixin.qq.com/
路径: 登录 → 开发 → 基本配置
- AppID:应用ID
- AppSecret:应用密钥(点击"启用"获取)
2️⃣ 设置 IP 白名单
路径: 开发 → 基本配置 → IP白名单
- 添加服务器公网 IP
3️⃣ 配置文件
位置: ~/.config/wechat-mp-auto/config.json
━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""


class FirstTimeSetup:
    """First-run setup guide."""

    CONFIG_DIR = Path.home() / ".config" / "wechat-mp-auto"
    CONFIG_FILE = CONFIG_DIR / "config.json"

    @classmethod
    def check_and_prompt(cls) -> dict:
        """Report whether this is a first run and which setup step is still pending."""
        if not cls.CONFIG_FILE.exists():
            return {"is_first_time": True, "message": SETUP_GUIDE, "step": "credentials"}
        try:
            with open(cls.CONFIG_FILE, encoding="utf-8") as f:
                config = json.load(f)
        except Exception:
            return {"is_first_time": True, "message": SETUP_GUIDE, "step": "credentials"}
        if not config.get("app_id") or not config.get("app_secret"):
            return {"is_first_time": True, "message": SETUP_GUIDE, "step": "credentials"}
        if not config.get("default_template"):
            return {"is_first_time": True, "message": "请选择默认模板", "templates": cls._get_available_templates(), "step": "template"}
        return {"is_first_time": False, "message": "配置完成", "step": None}

    @classmethod
    def _get_available_templates(cls) -> List[dict]:
        """List the theme YAML files shipped with the Skill."""
        templates = []
        # themes/ sits at the repository root, one level above src/
        themes_dir = Path(__file__).parent.parent / "themes"
        if themes_dir.exists():
            for f in themes_dir.glob("*.yaml"):
                templates.append({"id": f.stem, "name": f.stem, "type": "local"})
        return templates if templates else [{"id": "default", "name": "默认主题", "type": "local"}]

    @classmethod
    def setup_credentials(cls, app_id: str, app_secret: str):
        """Write app_id and app_secret into config.json, keeping other settings."""
        cls.CONFIG_DIR.mkdir(parents=True, exist_ok=True)
        config = {}
        if cls.CONFIG_FILE.exists():
            with open(cls.CONFIG_FILE, encoding="utf-8") as f:
                config = json.load(f)
        config["app_id"] = app_id
        config["app_secret"] = app_secret
        with open(cls.CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)

    @classmethod
    def set_default_template(cls, template_type: str, template_id: str):
        """Write the default template into config.json, keeping other settings."""
        config = {}
        if cls.CONFIG_FILE.exists():
            with open(cls.CONFIG_FILE, encoding="utf-8") as f:
                config = json.load(f)
        config["default_template"] = {"type": template_type, "id": template_id}
        with open(cls.CONFIG_FILE, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)

    @classmethod
    def get_status(cls) -> dict:
        """Summarize the setup state: not_configured, error, need_credentials, need_template, or ready."""
        if not cls.CONFIG_FILE.exists():
            return {"status": "not_configured", "message": SETUP_GUIDE}
        try:
            with open(cls.CONFIG_FILE, encoding="utf-8") as f:
                config = json.load(f)
        except Exception:
            return {"status": "error", "message": SETUP_GUIDE}
        has_cred = bool(config.get("app_id") and config.get("app_secret"))
        has_template = bool(config.get("default_template"))
        if not has_cred:
            return {"status": "need_credentials", "message": SETUP_GUIDE}
        elif not has_template:
            return {"status": "need_template", "message": "请选择默认模板"}
        else:
            return {"status": "ready", "message": "配置完成"}
FILE:src/formatters/__init__.py
# formatters 模块初始化
FILE:src/publish.py
#!/usr/bin/env python3
"""
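WeChat Official Account article publishing script (optimized version)
Usage:
python3 publish.py --markdown <md file> --title <title> [options]
Options:
-m, --markdown Path to the Markdown file
-t, --title Article title
-a, --author Author (default: 贾维斯)
-c, --cover Path to the cover image
-s, --source-url Original article URL (default: https://openclaw.ai)
--theme Theme name (default: default)
--check-only Run the checks only, do not push
-v, --verbose Show detailed logs
Examples:
python3 publish.py -m article.md -t "My article" -a "Author"
python3 publish.py -m article.md --check-only # checks only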
"""
import sys
import os
import re
import argparse
import logging
from pathlib import Path
from typing import Optional, Tuple, List, Dict
from functools import wraps
# 配置路径
SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(SCRIPT_DIR))
from skills.article_writer import ArticleWriterSkill
from skills.material_skill import MaterialSkill
from skills.draft_skill import DraftSkill
from skills.image_generator import ImageGeneratorSkill
from skills.content_reviewer import ContentReviewerSkill
from config import Config
from token_manager import TokenManager
def setup_logging(verbose: bool = False):
"""配置日志"""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%H:%M:%S'
)
return logging.getLogger(__name__)
def timer(func):
"""性能计时装饰器"""
@wraps(func)
def wrapper(*args, **kwargs):
import time
start = time.time()
result = func(*args, **kwargs)
elapsed = time.time() - start
logging.debug(f"{func.__name__} 耗时: {elapsed:.2f}s")
return result
return wrapper
def parse_args():
parser = argparse.ArgumentParser(
description='发布公众号文章',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__
)
parser.add_argument('--markdown', '-m', type=str, help='Markdown 文件路径')
parser.add_argument('--title', '-t', type=str, help='文章标题')
parser.add_argument('--author', '-a', type=str, default='贾维斯', help='作者 (默认: 贾维斯)')
parser.add_argument('--cover', '-c', type=str, help='封面图路径 (可选)')
parser.add_argument('--source-url', '-s', type=str, default='https://openclaw.ai', help='原文链接')
parser.add_argument('--theme', type=str, default='default', help='主题名称')
parser.add_argument('--check-only', action='store_true', help='仅检查,不推送')
parser.add_argument('--verbose', '-v', action='store_true', help='显示详细日志')
return parser.parse_args()
@timer
def check_markdown_conversion(html: str, markdown: str) -> dict:
"""检查 markdown 转 HTML 的完整性"""
issues = []
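# Decide which checks to run from the original Markdown (more precise detection)
has_bold = "**" in markdown or "__" in markdown
# Italic: a single * or _ delimiter on one line, but not ** or __
has_italic = bool(re.search(r'(?<!\*)\*[^*\n]+\*(?!\*)', markdown) or re.search(r'(?<!\w)_[^_\n]+_(?!\w)', markdown))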
has_code_block = "```" in markdown
has_quote = re.search(r'^>', markdown, re.MULTILINE)
# Text link: [text](url), excluding images of the form ![alt](url)
has_link = bool(re.search(r'(?<!!)\[[^\]]+\]\([^)]+\)', markdown))
has_image = "![" in markdown
# 只检查原始文档中存在的语法
checks = [
(r'<strong>', "加粗", has_bold),
(r'<em>', "斜体", has_italic),
(r'<pre[^>]*>', "代码块", has_code_block),
(r'<blockquote[^>]*>', "引用", has_quote),
(r'<a[^>]*href=', "链接", has_link),
(r'<img[^>]*src=', "图片", has_image),
]
for pattern, name, needed in checks:
if needed and not re.search(pattern, html, re.IGNORECASE):
issues.append(f"{name}未正确转换")
return {"passed": len(issues) == 0, "issues": issues}
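# Illustrative sketch, not part of publish.py: one way to tell text links apart
# from images when deciding which conversion checks to run. The pattern names
# and detect_syntax are hypothetical; the negative lookbehind (?<!!) is what
# keeps ![alt](url) from being counted as a link.

```python
import re
from typing import Dict

# A link is [text](url) that is NOT preceded by "!".
LINK_RE = re.compile(r'(?<!!)\[[^\]]+\]\([^)]+\)')
# An image is ![alt](url); the alt text may be empty.
IMAGE_RE = re.compile(r'!\[[^\]]*\]\([^)]+\)')


def detect_syntax(markdown: str) -> Dict[str, bool]:
    """Report whether the Markdown contains text links and/or images."""
    return {
        "has_link": bool(LINK_RE.search(markdown)),
        "has_image": bool(IMAGE_RE.search(markdown)),
    }


if __name__ == "__main__":
    print(detect_syntax("see [docs](https://example.com) and ![cover](cover.jpg)"))
```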
def check_article_integrity(markdown: str = None, html: str = None,
draft_content: str = None, stage: str = "unknown") -> Dict:
"""
Shared article integrity check.
Called at three checkpoints:
1. check_article_integrity(markdown=md, stage="markdown") - after the Markdown is generated
2. check_article_integrity(html=html, markdown=md, stage="html") - after the HTML conversion
3. check_article_integrity(draft_content=content, stage="draft") - after the draft is uploaded
Returns:
{"passed": bool, "stage": str, "issues": [], "warnings": [], "stats": {}}
"""
issues = []
warnings = []
stats = {}
cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "images"
# ====== 阶段1: Markdown 检查 ======
if markdown is not None:
stats["markdown_length"] = len(markdown)
stats["markdown_lines"] = len(markdown.split('\n'))
# 检查标题
h1_count = len(re.findall(r'^#\s+', markdown, re.MULTILINE))
h2_count = len(re.findall(r'^##\s+', markdown, re.MULTILINE))
h3_count = len(re.findall(r'^###\s+', markdown, re.MULTILINE))
stats["h1_count"] = h1_count
stats["h2_count"] = h2_count
stats["h3_count"] = h3_count
if h1_count == 0:
issues.append("缺少一级标题 (# 标题)")
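# Check image syntax; the cover is the image whose alt text is 封面
all_images = re.findall(r'!\[([^\]]*)\]\(([^)]+)\)', markdown)
cover_imgs = [path for alt, path in all_images if alt == '封面']
# Exclude the cover here so it is not counted twice
section_imgs = [(alt, path) for alt, path in all_images if alt != '封面']
all_imgs = cover_imgs + [path for _, path in section_imgs]
stats["cover_images"] = len(cover_imgs)
stats["section_images"] = len(section_imgs)
stats["total_images"] = len(all_imgs)
if len(cover_imgs) == 0:
warnings.append("No cover image in the Markdown (expected ![封面](path))")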
# 检查图片文件是否存在(仅检查本地路径)
missing_images = []
for img_path in all_imgs:
if not os.path.isabs(img_path) and not img_path.startswith('http'):
# 本地路径,检查 cache 目录
if cache_dir.exists():
if not (cache_dir / img_path).exists():
missing_images.append(img_path)
if missing_images:
warnings.append(f"Markdown 中有 {len(missing_images)} 个图片文件不存在: {missing_images[:3]}...")
logging.info(f"[{stage}] Markdown 检查: 标题={h1_count}/{h2_count}/{h3_count}, 图片={len(all_imgs)}")
# ====== 阶段2: HTML 检查 ======
if html is not None:
stats["html_length"] = len(html)
# 检查 HTML 标签完整性
if not html.strip().startswith('<div'):
issues.append("HTML 内容不是以 <div> 开头")
# 检查图片标签
img_tags = re.findall(r'<img[^>]*>', html)
stats["html_images"] = len(img_tags)
# 检查图片 src 是否为空
empty_src = [img for img in img_tags if re.search(r'<img\s+[^>]*>', img) and 'src=' not in img]
broken_images = [img for img in img_tags if 'src=""' in img or "src=''" in img]
if empty_src:
issues.append(f"HTML 中有 {len(empty_src)} 个图片没有 src 属性 (如 <img />)")
if broken_images:
issues.append(f"HTML 中有 {len(broken_images)} 个图片 src 为空")
# Images that already point at an http(s) URL
external_images = re.findall(r'<img[^>]*\ssrc="(https?://[^"]+)"', html)
stats["external_images"] = len(external_images)
# Images with a local path, which still need uploading to WeChat
local_images = re.findall(r'<img[^>]*\ssrc="(?!https?://)([^"]+)"', html)
stats["local_images"] = len(local_images)
if len(local_images) > 0 and len(external_images) == 0:
warnings.append(f"HTML 中有 {len(local_images)} 个本地图片未上传到微信")
# 检查格式标签
has_strong = bool(re.search(r'<strong>', html))
has_em = bool(re.search(r'<em>', html))
has_pre = bool(re.search(r'<pre', html))
has_blockquote = bool(re.search(r'<blockquote', html))
stats["has_bold"] = has_strong
stats["has_italic"] = has_em
stats["has_code"] = has_pre
stats["has_quote"] = has_blockquote
logging.info(f"[{stage}] HTML 检查: 图片={len(img_tags)} (空src={len(empty_src)}, 本地={len(local_images)}, 外部={len(external_images)})")
# ====== 阶段3: 草稿内容检查 ======
if draft_content is not None:
stats["draft_length"] = len(draft_content)
# 检查图片标签
img_tags = re.findall(r'<img[^>]*>', draft_content)
stats["draft_images"] = len(img_tags)
# 检查空 src
empty_src = [img for img in img_tags if 'src=' not in img]
broken_images = [img for img in img_tags if 'src=""' in img or "src=''" in img or re.search(r'src="\s+"', img)]
if empty_src:
issues.append(f"草稿中有 {len(empty_src)} 个图片没有 src 属性 (如 <img />)")
if broken_images:
issues.append(f"草稿中有 {len(broken_images)} 个图片 src 为空")
# 检查外部 URL(微信使用 data-src 或 src)
data_src_images = re.findall(r'<img[^>]*data-src="(https?://[^"]+)"', draft_content)
external_images = re.findall(r'<img src="(https?://[^"]+)"', draft_content)
all_external = data_src_images + external_images
stats["draft_external_images"] = len(all_external)
stats["draft_src_images"] = len(external_images)
stats["draft_data_src_images"] = len(data_src_images)
if len(all_external) == 0 and len(img_tags) > 0:
warnings.append("草稿中没有外部 URL 图片,可能未上传到微信")
# 检查图片 URL 是否可访问(针对外部 URL)
if all_external:
import urllib.request
bad_urls = []
for url in all_external[:6]: # 最多检查6张
try:
req = urllib.request.Request(url, method='HEAD')
req.add_header('User-Agent', 'Mozilla/5.0')
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status != 200:
bad_urls.append(f"{url[:60]}... (HTTP {resp.status})")
except Exception as e:
bad_urls.append(f"{url[:60]}... ({type(e).__name__})")
if bad_urls:
issues.append(f"有 {len(bad_urls)} 个图片 URL 无效或无法访问: {bad_urls[:3]}{'...' if len(bad_urls) > 3 else ''}")
# 检查编码
try:
draft_content.encode('utf-8')
stats["encoding_ok"] = True
except UnicodeEncodeError:
issues.append("草稿内容编码异常")
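# Check for mojibake: U+FFFD is the Unicode replacement character
if '\ufffd' in draft_content:
warnings.append("Draft content contains replacement characters (possible mojibake)")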
logging.info(f"[{stage}] 草稿检查: 图片={len(img_tags)}, 空src={len(empty_src)}, 外部URL={len(external_images)}")
# 返回结果
passed = len(issues) == 0
result = {
"passed": passed,
"stage": stage,
"issues": issues,
"warnings": warnings,
"stats": stats
}
# 打印检查结果
if issues:
logging.error(f"❌ [{stage}] 检查未通过:")
for issue in issues:
logging.error(f" - {issue}")
if warnings:
logging.warning(f"⚠️ [{stage}] 检查警告:")
for warn in warnings:
logging.warning(f" - {warn}")
if passed and not warnings:
logging.info(f"✅ [{stage}] 检查通过")
return result
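# Illustrative sketch, not part of publish.py: the image checks above boil down
# to sorting every <img> tag into one of four buckets. classify_images and both
# patterns are hypothetical names. Note the assumption that data-src is NOT
# treated as src here, which differs from how the draft stage reads WeChat HTML.

```python
import re
from typing import Dict, List

IMG_TAG_RE = re.compile(r'<img\b[^>]*>', re.IGNORECASE)
# The lookbehind rejects "data-src" and similar prefixed attributes.
SRC_RE = re.compile(r'(?<![\w-])src\s*=\s*(["\'])(.*?)\1', re.IGNORECASE)


def classify_images(html: str) -> Dict[str, List[str]]:
    """Group <img> tags by the state of their src attribute."""
    buckets: Dict[str, List[str]] = {"missing": [], "empty": [], "local": [], "remote": []}
    for tag in IMG_TAG_RE.findall(html):
        match = SRC_RE.search(tag)
        if match is None:
            buckets["missing"].append(tag)          # no src attribute at all
        elif not match.group(2).strip():
            buckets["empty"].append(tag)            # src="" or only whitespace
        elif re.match(r'https?://', match.group(2)):
            buckets["remote"].append(match.group(2))
        else:
            buckets["local"].append(match.group(2))  # still needs uploading
    return buckets


if __name__ == "__main__":
    print(classify_images('<img src="images/a.jpg"><img src="https://x/z.png">'))
```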
@timer
def insert_images_to_content(html: str, material_skill, dry_run: bool = False) -> Tuple[str, List[str]]:
"""处理文章中的图片:上传到微信并替换 HTML 中的图片路径"""
import re
from pathlib import Path
img_pattern = r'<img src="([^"]+)"[^>]*>'
matches = re.findall(img_pattern, html)
if not matches:
return html, []
# 图片缓存目录
cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "images"
processed = []
for img_path in matches:
if img_path in processed:
continue
# 检查文件是否存在
actual_path = img_path
if not os.path.exists(img_path):
# 尝试在 cache 目录查找
if cache_dir.exists():
potential = cache_dir / img_path
if potential.exists():
actual_path = str(potential)
if not os.path.exists(actual_path):
# 外部 URL:下载后上传到微信
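if img_path.startswith(('http://', 'https://')):
# Honor dry-run for external images too, so --check-only never uploads anything
if dry_run:
logging.info(f"[dry-run] would download and upload external image: {img_path[:60]}")
processed.append(img_path)
continue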
try:
import tempfile
import urllib.request
ext = os.path.splitext(img_path.split('?')[0])[1] or '.jpg'
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp:
tmp_path = tmp.name
urllib.request.urlretrieve(img_path, tmp_path)
result = material_skill.upload_image(tmp_path)
wechat_url = result.get('url')
if wechat_url:
html = html.replace(f'<img src="{img_path}"', f'<img src="{wechat_url}"')
logging.info(f"✓ 外部图片上传成功: {img_path[:60]}...")
else:
logging.warning(f"✗ 外部图片上传失败: {img_path[:60]}")
os.unlink(tmp_path)
processed.append(img_path)
continue
except Exception as e:
logging.error(f"✗ 外部图片处理失败: {img_path[:60]}, 错误: {e}")
processed.append(img_path)
continue
logging.warning(f"图片文件不存在: {img_path}, 跳过")
continue
try:
if dry_run:
logging.info(f"[dry-run] 会上传图片: {actual_path}")
processed.append(img_path)
continue
result = material_skill.upload_image(actual_path)
wechat_url = result.get('url')
if wechat_url:
# 替换图片路径
html = html.replace(f'<img src="{img_path}"', f'<img src="{wechat_url}"')
logging.info(f"✓ 图片上传成功: {os.path.basename(actual_path)}")
processed.append(img_path)
else:
logging.warning(f"✗ 图片上传失败,无 URL: {img_path}")
except Exception as e:
logging.error(f"✗ 处理图片失败: {img_path}, 错误: {e}")
return html, processed
@timer
def find_cover_image(theme: str, title: str = "") -> Optional[str]:
"""自动查找或生成封面图"""
cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "images"
# 如果有标题,尝试从图库搜横向封面
if title:
try:
sys.path.insert(0, str(Path(__file__).parent))
from skills.image_generator import ImageGeneratorSkill
img_gen = ImageGeneratorSkill()
# 优先用标题搜横向封面
images = img_gen._search_all(title, count=5)
for img_info in images:
path = img_gen._download_image(img_info, "cover",
max_width=900, max_height=500)
if path:
logging.info(f"图库封面搜索成功: {path}")
return path
except Exception as e:
logging.warning(f"图库封面搜索失败: {e}")
# fallback:从缓存选大文件(>500KB),跳过缩略图
if cache_dir.exists():
covers = [p for p in cache_dir.glob("cover_*.jpg")
if "_thumb" not in p.name and p.stat().st_size > 500000]
if covers:
latest = max(covers, key=lambda p: p.stat().st_mtime)
return str(latest)
all_covers = [p for p in cache_dir.glob("cover_*.jpg")
if p.stat().st_size > 500000]
if all_covers:
latest = max(all_covers, key=lambda p: p.stat().st_mtime)
return str(latest)
return None
@timer
def generate_cover_image(title: str, img_generator: ImageGeneratorSkill) -> Optional[str]:
"""生成封面图"""
try:
logging.info(f"尝试自动生成封面图: {title[:30]}...")
cover_path = img_generator.generate_cover(title, [])
if cover_path:
logging.info(f"✓ 封面图生成成功: {cover_path}")
return cover_path
except Exception as e:
logging.error(f"✗ 封面图生成失败: {e}")
return None
def validate_args(args) -> bool:
"""验证命令行参数"""
if not args.markdown:
default_md = '/tmp/openclaw-final.md'
if os.path.exists(default_md):
args.markdown = default_md
logging.info(f"使用默认 Markdown 文件: {default_md}")
else:
logging.error("请指定 --markdown 参数")
return False
if not os.path.exists(args.markdown):
logging.error(f"Markdown 文件不存在: {args.markdown}")
return False
if not args.title:
args.title = os.path.splitext(os.path.basename(args.markdown))[0]
logging.info(f"使用文件名作为标题: {args.title}")
return True
@timer
def initialize_components() -> Dict:
"""初始化组件"""
config = Config()
app_id, app_secret = config.get_credentials()
token_mgr = TokenManager(app_id, app_secret)
return {
'writer': ArticleWriterSkill(),
'material': MaterialSkill(token_mgr),
'draft': DraftSkill(token_mgr),
'img_generator': ImageGeneratorSkill(),
}
@timer
def publish_article(args, components) -> bool:
"""发布文章主流程"""
# 1. 读取 Markdown
logging.info(f"读取 Markdown: {args.markdown}")
with open(args.markdown, 'r', encoding='utf-8') as f:
markdown = f.read()
logging.info(f"✓ 读取成功,共 {len(markdown)} 字符")
# ====== 检查点1: Markdown 生成/读取后 ======
logging.info("=" * 50)
logging.info("【检查点1】Markdown 完整性检查")
logging.info("=" * 50)
md_check = check_article_integrity(markdown=markdown, stage="markdown")
# ====== 内容审核检查 ======
logging.info("=" * 50)
logging.info("【内容审核】合规性 & 重复度检查")
logging.info("=" * 50)
# 初始化内容审核器
content_reviewer = ContentReviewerSkill()
article = {"markdown": markdown, "content": markdown}
review_result = content_reviewer.review_article(article)
if review_result.get("passed"):
logging.info("✅ 内容审核通过")
else:
logging.error("❌ 内容审核未通过:")
for issue in review_result.get("issues", []):
severity = issue.get("severity", "unknown")
msg = issue.get("message", "")
logging.error(f" [{severity}] {msg}")
# 检查违规内容
prohibited = review_result.get("prohibited", {})
if prohibited.get("has_violations"):
violations = prohibited.get("violations", [])
logging.error(f"❌ 发现 {len(violations)} 个违规词: {[v.get('word') for v in violations]}")
if not args.check_only:
logging.error("请修改内容后重试")
return False
# 检查重复度
plagiarism = review_result.get("plagiarism", {})
similarity = plagiarism.get("similarity", 0)
is_duplicated = plagiarism.get("is_duplicated", False)
if is_duplicated or similarity > 30:
logging.warning(f"⚠️ 文章重复度较高: {similarity}%")
logging.info(f" - 重复度: {similarity}%")
logging.info(f" - 敏感词检查: {'通过' if not prohibited.get('has_violations') else '未通过'}")
# 2. 转换为 HTML
logging.info("转换 Markdown -> HTML...")
html = components['writer'].convert_to_html(markdown, args.theme)
logging.info("✓ 转换完成")
# ====== 检查点2: HTML 转换后 ======
logging.info("=" * 50)
logging.info("【检查点2】HTML 完整性检查")
logging.info("=" * 50)
html_check = check_article_integrity(markdown=markdown, html=html, stage="html")
# 3. 检查转换质量(保留原有检查)
logging.info("检查转换质量...")
check_result = check_markdown_conversion(html, markdown)
if not check_result["passed"]:
logging.warning("⚠️ 转换检查发现问题:")
for issue in check_result["issues"]:
logging.warning(f" - {issue}")
if not args.check_only:
logging.warning("请修复后再推送,5秒后继续...")
import time
time.sleep(5)
else:
logging.info("✓ 转换检查通过")
# 4. 处理封面图
cover_path = args.cover or find_cover_image(args.theme, args.title)
if not cover_path:
logging.info("未找到封面图,尝试自动生成...")
cover_path = generate_cover_image(args.title, components['img_generator'])
if not cover_path or not os.path.exists(cover_path):
logging.error("✗ 封面图不可用,请使用 --cover 指定")
return False
logging.info(f"✓ 使用封面图: {os.path.basename(cover_path)}")
# 5. 处理文章插图
logging.info("处理文章插图...")
html, processed_images = insert_images_to_content(html, components['material'], args.check_only)
if processed_images:
logging.info(f"✓ 已处理 {len(processed_images)} 张插图")
else:
logging.info("○ 无需处理的插图")
if args.check_only:
logging.info("检查模式完成,未推送")
return True
# 6. 上传封面图
logging.info("上传封面图...")
cover_result = components['material'].upload_image(cover_path)
media_id = cover_result.get('media_id')
cover_wechat_url = cover_result.get('url', '')
logging.info(f"✓ 封面上传成功: {cover_wechat_url[:40]}...")
# 6.1 Replace the cover image placeholder (section images are handled separately in 6.2)
if cover_wechat_url:
count = html.count('src="cover_image_url"')
html = html.replace('src="cover_image_url"', f'src="{cover_wechat_url}"')
logging.info(f"✓ 封面图占位符已替换({count}处)")
else:
logging.warning("⚠️ 封面图无 URL,占位符未替换")
# 6.2 搜索并上传章节图
section_placeholder_pattern = re.compile(r'src="([^"]+_url)"')
section_placeholders = section_placeholder_pattern.findall(html)
if section_placeholders:
logging.info(f"处理 {len(section_placeholders)} 张章节图...")
img_gen = components.get('img_generator')
for ph in section_placeholders:
keyword = ph.replace('_url', '').strip()
wechat_url = None
# 优先用图库搜索(search_image 返回 local_path)
if img_gen:
try:
images = img_gen.search_image(keyword, count=3)
if images:
local_path = images[0].get('local_path')
if local_path and os.path.exists(local_path):
up_result = components['material'].upload_image(local_path)
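wechat_url = up_result.get('url')
# Only log success when the upload actually returned a URL; slicing None would raise
if wechat_url:
logging.info(f"✓ 章节图上传成功 [{keyword}]: {wechat_url[:40]}...")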
except Exception as e:
logging.warning(f" 章节图搜索失败 [{keyword}]: {e}")
# 上传失败则 fallback 到封面图
if not wechat_url:
wechat_url = cover_wechat_url
logging.warning(f" 章节图 fallback 封面 [{keyword}]")
html = html.replace(f'src="{ph}"', f'src="{wechat_url}"')
logging.info(f"✓ 章节图处理完成({len(section_placeholders)}张)")
# 7. 添加 meta 并保存
html = '<meta charset="utf-8">\n' + html
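# Swap only the file extension; str.replace would touch every '.md' in the path and could overwrite the source file
output_html = str(Path(args.markdown).with_suffix('.html'))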
with open(output_html, 'w', encoding='utf-8') as f:
f.write(html)
logging.info(f"✓ HTML 已保存: {output_html}")
# 8. 创建草稿
logging.info("创建草稿...")
article = {
'title': args.title,
'author': args.author,
'content': html,
'thumb_media_id': media_id,
'content_source_url': args.source_url
}
result = components['draft'].create_draft([article])
draft_id = result.get('media_id')
# ====== 检查点3: 草稿上传后 ======
logging.info("=" * 50)
logging.info("【检查点3】草稿完整性检查")
logging.info("=" * 50)
# 获取刚创建的草稿内容进行验证
try:
draft_detail = components['draft'].get_draft(draft_id)
draft_content = draft_detail.get('news_item', [{}])[0].get('content', '')
draft_check = check_article_integrity(draft_content=draft_content, stage="draft")
except Exception as e:
logging.warning(f"获取草稿详情失败,跳过检查: {e}")
logging.info("=" * 50)
logging.info("🎉 发布完成!")
logging.info(f" 草稿 ID: {draft_id}")
logging.info(f" 标题: {args.title}")
logging.info(f" 作者: {args.author}")
logging.info("=" * 50)
return True
def main():
args = parse_args()
logger = setup_logging(args.verbose)
logging.info("微信公众号文章发布工具")
logging.info("=" * 50)
# 验证参数
if not validate_args(args):
sys.exit(1)
# 初始化组件
logging.info("初始化组件...")
components = initialize_components()
# 发布文章
success = publish_article(args, components)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
FILE:src/skills/analytics_skill.py
"""
WeChat Official Account automation - analytics Skill
"""
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from .base_skill import BaseSkill
class AnalyticsSkill(BaseSkill):
"""数据分析"""
def get_article_stats(self, begin_date: str, end_date: Optional[str] = None) -> Dict:
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
data = {"begin_date": begin_date.replace("-", ""), "end_date": end_date.replace("-", "")}
result = self.post("/cgi-bin/analysis/get_article_summary", data)
articles = result.get("list", [])
stats = []
for a in articles:
stats.append({
"title": a.get("title", ""),
"read_count": a.get("read_count", 0),
"like_count": a.get("like_count", 0),
"share_count": a.get("share_count", 0)
})
return {"begin_date": begin_date, "end_date": end_date, "articles": stats}
def get_user_stats(self, begin_date: str, end_date: Optional[str] = None) -> Dict:
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
data = {"begin_date": begin_date.replace("-", ""), "end_date": end_date.replace("-", "")}
result = self.post("/cgi-bin/analysis/get_user_summary", data)
return {"begin_date": begin_date, "end_date": end_date, "users": result.get("list", [])}
def get_article_ranking(self, begin_date: str, end_date: Optional[str] = None, limit: int = 10) -> List[Dict]:
stats = self.get_article_stats(begin_date, end_date)
articles = stats.get("articles", [])
sorted_articles = sorted(articles, key=lambda x: x.get("read_count", 0), reverse=True)
return sorted_articles[:limit]
def generate_report(self, stats: Dict) -> str:
articles = stats.get("articles", [])
if not articles:
return "暂无数据"
total_read = sum(a.get("read_count", 0) for a in articles)
total_like = sum(a.get("like_count", 0) for a in articles)
return f"""
📊 数据报告
━━━━━━━━━━━━━━━
文章数: {len(articles)}
总阅读: {total_read:,}
总点赞: {total_like:,}
平均阅读: {total_read // len(articles) if articles else 0:,}
"""
def track_article(self, media_id: str, days: int = 7) -> Dict:
end_date = datetime.now()
begin_date = end_date - timedelta(days=days)
stats = self.get_article_stats(begin_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))
return {"media_id": media_id, "stats": stats, "report": self.generate_report(stats)}
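# Illustrative sketch, not part of AnalyticsSkill: the aggregation behind
# generate_report and get_article_ranking, as pure functions over the list of
# article dicts. summarize_articles and top_by_reads are hypothetical names;
# the field names match the dicts built in get_article_stats.

```python
from typing import Dict, List


def summarize_articles(articles: List[Dict]) -> Dict[str, int]:
    """Total and average read/like counts; missing fields count as 0."""
    count = len(articles)
    total_read = sum(a.get("read_count", 0) for a in articles)
    total_like = sum(a.get("like_count", 0) for a in articles)
    return {
        "count": count,
        "total_read": total_read,
        "total_like": total_like,
        # Integer average, guarded against an empty list.
        "avg_read": total_read // count if count else 0,
    }


def top_by_reads(articles: List[Dict], limit: int = 10) -> List[Dict]:
    """Articles sorted by read_count, highest first."""
    return sorted(articles, key=lambda a: a.get("read_count", 0), reverse=True)[:limit]


if __name__ == "__main__":
    sample = [{"title": "a", "read_count": 100, "like_count": 5}, {"title": "b", "read_count": 301}]
    print(summarize_articles(sample), top_by_reads(sample, 1))
```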
FILE:src/skills/article_writer.py
"""
WeChat Official Account automation - article writing Skill
Adds error handling and safety checks
"""
import re
import os
import uuid
import logging
from typing import Dict, Optional, List
from pathlib import Path
logger = logging.getLogger(__name__)
class ArticleWriterSkill:
"""文章写作"""
def __init__(self):
self._themes_dir = Path(__file__).parent.parent.parent / "themes"
self._image_generator = None
self._cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "images"
def _get_image_generator(self):
"""延迟加载图片生成器"""
if self._image_generator is None:
try:
from .image_generator import ImageGeneratorSkill
self._image_generator = ImageGeneratorSkill()
except Exception as e:
logger.warning(f"图片生成器初始化失败: {e}")
return self._image_generator
def write_article(self, topic: str, outline: Dict, template: Optional[Dict] = None,
generate_images: bool = True,
material_skill=None,
content: Optional[str] = None,
section_images: Optional[Dict[str, str]] = None,
cover_image: Optional[str] = None) -> Dict:
"""撰写文章
Args:
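topic: Article topic
outline: Article outline containing title and sections
template: Template configuration
generate_images: Whether to generate images automatically (default True)
material_skill: Material management skill instance used to upload images (optional)
content: Preset article content in Markdown; when set, it is used instead of building from the outline
section_images: Preset section image URLs, in the form {"section name": "image URL"}
cover_image: Preset cover image URL; when set, cover generation is skipped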
"""
try:
# 参数验证
if not topic:
raise ValueError("topic 不能为空")
if not isinstance(outline, dict):
raise ValueError("outline 必须是 dict")
# 如果没有传 template,自动读取配置中的默认模板
if template is None:
from config import Config
config = Config()
template = config.get_default_template()
theme = template.get("id", "default") if template else "default"
# 生成内容
sections = outline.get("sections", [])
title = outline.get('title', topic)
# 确保 cache 目录存在
if not self._cache_dir.exists():
self._cache_dir.mkdir(parents=True, exist_ok=True)
# 用于跟踪所有图片URL(local_path -> wechat_url)
image_url_map = {}
# ========== 1. 处理封面图 ==========
cover_wechat_url = cover_image # 预设的封面URL
cover_path = None
if not cover_wechat_url and generate_images:
img_gen = self._get_image_generator()
if img_gen and material_skill:
# 检查用户是否已完成图片来源选择
choice_check = img_gen._check_and_prompt_selection("cover")
if not choice_check.get("proceed"):
# 返回选择提示,终止生成流程
return {
"need_user_choice": True,
"choice_type": choice_check["choice_info"].get("choice_type"),
"choice_info": choice_check["choice_info"],
}
try:
logger.info(f"开始生成并上传封面图: {title[:30]}...")
kw = outline.get("cover_keywords", []) if isinstance(outline.get("cover_keywords"), list) else []
cover_result = img_gen.generate_and_upload(title, kw, material_skill, "cover")
if cover_result.get("wechat_url"):
cover_wechat_url = cover_result["wechat_url"]
cover_path = cover_result.get("local_path")
image_url_map[cover_path] = cover_wechat_url
logger.info(f"封面上传成功: {cover_wechat_url[:50]}...")
except Exception as e:
logger.warning(f"封面图生成上传失败: {e}")
# ========== 2. 处理章节插图 ==========
section_wechat_urls = {} # {section_name: wechat_url}
# 优先使用预设的章节图片URL(在 generate_images 条件之外)
if section_images:
for section_name, section_url in section_images.items():
section_wechat_urls[section_name] = section_url
logger.info(f"使用预设章节图: {section_name}")
# 只有需要自动生成时才进入图片生成逻辑
if generate_images and material_skill:
img_gen = self._get_image_generator()
if img_gen:
for i, section in enumerate(sections):
section_name = section.get("name", "")
if not section_name:
continue
# 跳过已有预设图片的章节(已在上面处理)
if section_name in section_wechat_urls:
continue
# 自动生成并上传
try:
logger.info(f"生成章节{i+1}插图: {section_name}...")
kw = section.get("keywords", []) if isinstance(section.get("keywords"), list) else []
illust_result = img_gen.generate_and_upload(section_name, kw, material_skill, "illustration")
if illust_result.get("wechat_url"):
section_wechat_urls[section_name] = illust_result["wechat_url"]
image_url_map[illust_result.get("local_path", "")] = illust_result["wechat_url"]
logger.info(f"章节{i+1}上传成功: {illust_result['wechat_url'][:50]}...")
except Exception as e:
logger.warning(f"生成章节插图失败: {section_name} - {e}")
# ========== 3. 构建文章内容 ==========
# 如果传入了 content(AI生成的完整内容),优先使用
if content and isinstance(content, str) and content.strip():
logger.info(f"使用预设内容,字数: {len(content)}")
markdown_content = content.strip()
# Insert the cover image right after the title
if cover_wechat_url:
# Find the first heading and place the cover image after it
title_match = re.search(r'^(#{1,6}\s+.+?\n)', markdown_content, re.MULTILINE)
if title_match:
pos = title_match.end()
markdown_content = markdown_content[:pos] + f"![封面]({cover_wechat_url})\n\n" + markdown_content[pos:]
else:
markdown_content = f"![封面]({cover_wechat_url})\n\n" + markdown_content
# 插入章节图(在对应的 ## 标题后面)
# AI 可能生成 "第X章: 引言" 格式的标题,需支持可选 "第X章:" 前缀
CHAPTER_PREFIX = r'(?:第\d+[章节篇][::\s]*)?' # 可选的 "第X章:" 前缀
for section_name, section_url in section_wechat_urls.items():
# 清理同一章节下可能残留的无效图片引用(本地路径或无效URL,非微信URL)
# 支持可选 "第X章:" 前缀
cleanup_pattern = r'^(\#{2,3}\s+' + CHAPTER_PREFIX + re.escape(section_name) + r'[^\n]*\n)\s*!\[([^\]]*)\]\((?!http)[^\)]+\)\n'
markdown_content = re.sub(cleanup_pattern, r'\1', markdown_content, flags=re.MULTILINE)
# 匹配 ## 章节名 或 ### 小节名,在标题行正后方插入图片(独占一行)
# 支持可选 "第X章:" 前缀
title_pattern = r'^(\#{2,3}\s+' + CHAPTER_PREFIX + re.escape(section_name) + r'[^\n]*\n)'
# 如果标题行后方已有一行有效的微信图片,则跳过(避免重复)
already_injected = re.search(
r'^(\#{2,3}\s+' + CHAPTER_PREFIX + re.escape(section_name) + r'[^\n]*\n)\s*!\[([^\]]*)\]\(http[^\)]+\)\n',
markdown_content, flags=re.MULTILINE
)
if already_injected:
logger.info(f"章节[{section_name}]已有有效图片,跳过注入")
continue
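# Otherwise inject the image on its own line; a callable replacement keeps any backslashes in the name or URL literal
image_line = f'![{section_name}]({section_url})\n'
markdown_content = re.sub(title_pattern, lambda m: m.group(1) + image_line, markdown_content, flags=re.MULTILINE)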
else:
# 没有预设内容,从大纲构建
content_parts = [f"# {title}\n"]
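# Cover image
if cover_wechat_url:
content_parts.insert(0, f"![封面]({cover_wechat_url})\n\n")
elif cover_path:
# A local file exists but the upload did not succeed; reference the local path as a marker
content_parts.insert(0, f"![封面]({cover_path})\n\n")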
# 章节内容
for i, section in enumerate(sections):
section_name = section.get("name", "")
key_points = section.get("key_points", [])
section_content = section.get("content", "")
content_parts.append(f"\n## {section_name}\n")
# Section illustration, using the WeChat URL
if section_name in section_wechat_urls:
content_parts.append(f"![{section_name}]({section_wechat_urls[section_name]})\n\n")
# 如果有真实内容,使用它;否则用 key_points 生成
if section_content:
content_parts.append(f"{section_content}\n")
else:
for point in key_points:
content_parts.append(f"### {point}\n")
content_parts.append(f"这是关于 {point} 的详细内容...\n\n")
markdown_content = "".join(content_parts)
# ========== 4. Process image URLs ==========
# Percent-encode the special characters (* and _) in WeChat image URLs so the Markdown conversion does not treat them as emphasis markers
# Example: http://mmbiz.qpic.cn/sz_mmbiz_jpg -> http://mmbiz.qpic.cn/sz%5Fmmbiz%5Fjpg
wx_url_map = {} # encoded URL -> original URL
def encode_wx_url(match):
alt_text = match.group(1)
url = match.group(2)
if url.startswith("http"):
# Encode the special characters so Markdown processing leaves the URL intact
safe_url = url.replace('*', '%2A').replace('_', '%5F')
wx_url_map[safe_url] = url # remember the original URL so it can be restored later
return f"![{alt_text}]({safe_url})"
# Local path: swap in the WeChat URL when image_url_map has one
if url in image_url_map:
safe_url = image_url_map[url].replace('*', '%2A').replace('_', '%5F')
wx_url_map[safe_url] = image_url_map[url]
return f"![{alt_text}]({safe_url})"
logger.warning(f"Image has not been uploaded to WeChat: {url}")
return match.group(0)
markdown_content = re.sub(r'!\[([^\]]*)\]\(([^)]+)\)', encode_wx_url, markdown_content)
# ========== 5. 转换为 HTML ==========
html_content = self.convert_to_html(markdown_content, theme, section_images=None)
# ========== 6. 还原微信图片URL ==========
for safe_url, original_url in wx_url_map.items():
html_content = html_content.replace(f'"{safe_url}"', f'"{original_url}"')
return {
"topic": topic,
"title": title,
"markdown": markdown_content,
"html": html_content,
"word_count": self.count_words(markdown_content),
"outline": outline,
"theme": theme,
"cover_path": cover_path,
"cover_wechat_url": cover_wechat_url,
"section_images": section_wechat_urls,
"images_generated": generate_images,
"images_uploaded": len([v for v in image_url_map.values() if v.startswith("http")])
}
except Exception as e:
logger.error(f"write_article 错误: {e}")
raise
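# Illustrative sketch, not part of ArticleWriterSkill: steps 4 and 6 of
# write_article as a standalone round trip. Image URLs are percent-encoded
# before the Markdown conversion and restored in the resulting HTML.
# protect_image_urls and restore_image_urls are hypothetical names, and the
# HTML in the usage example stands in for whatever convert_to_html returns.

```python
import re
from typing import Dict, Tuple

IMAGE_RE = re.compile(r'!\[([^\]]*)\]\(([^)]+)\)')


def protect_image_urls(markdown: str) -> Tuple[str, Dict[str, str]]:
    """Encode * and _ in http(s) image URLs; return new text and a restore map."""
    restore_map: Dict[str, str] = {}

    def _encode(match: "re.Match[str]") -> str:
        alt, url = match.group(1), match.group(2)
        if not url.startswith("http"):
            return match.group(0)  # local paths are left untouched
        safe = url.replace('*', '%2A').replace('_', '%5F')
        restore_map[safe] = url
        return f"![{alt}]({safe})"

    return IMAGE_RE.sub(_encode, markdown), restore_map


def restore_image_urls(html: str, restore_map: Dict[str, str]) -> str:
    """Put the original URLs back into quoted attribute values."""
    for safe, original in restore_map.items():
        html = html.replace(f'"{safe}"', f'"{original}"')
    return html


if __name__ == "__main__":
    text, mapping = protect_image_urls("![封面](http://mmbiz.qpic.cn/sz_mmbiz_jpg/a.jpg)")
    print(text)
    print(restore_image_urls('<img src="http://mmbiz.qpic.cn/sz%5Fmmbiz%5Fjpg/a.jpg">', mapping))
```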
def ensure_images_uploaded(self, content: str, material_skill) -> str:
"""
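Safety net: make sure every image in the content has been uploaded to WeChat.
Scans Markdown or HTML content for local image paths,
uploads them to the WeChat material library, and replaces them with WeChat URLs.
Args:
content: Markdown or HTML content
material_skill: Material management skill instance
Returns:
The content after replacement; images that uploaded successfully use WeChat URLs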
"""
if not content or not material_skill:
return content
# 匹配本地图片路径(相对于cache目录的路径或绝对路径)
import re
from pathlib import Path
cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "images"
def replace_local_image(match):
alt_or_tag = match.group(1) if match.lastindex >= 1 else ""
path_or_url = match.group(2) if match.lastindex >= 2 else ""
# Already an http(s) URL: leave it untouched
if path_or_url.startswith("http"):
return match.group(0)
# 检查是否是本地文件
local_path = Path(path_or_url)
if not local_path.is_absolute():
# 相对路径,尝试相对于cache目录
local_path = cache_dir / path_or_url
if not local_path.exists():
logger.warning(f"本地图片不存在,跳过: {local_path}")
return match.group(0)
# 上传到微信
try:
logger.info(f"上传本地图片: {local_path}")
result = material_skill.upload_image(str(local_path))
wechat_url = result.get("url", "")
media_id = result.get("media_id", "")
if wechat_url:
logger.info(f"本地上传成功: {wechat_url[:50]}...")
# Rebuild the Markdown image ![alt](url) with the WeChat URL, keeping the alt text even when it is empty
return f"![{alt_or_tag}]({wechat_url})"
else:
logger.warning(f"本地上传失败: {local_path}")
return match.group(0)
except Exception as e:
logger.error(f"本地上传异常: {e}")
return match.group(0)
# Match Markdown images of the form ![alt](path)
content = re.sub(r'!\[([^\]]*)\]\(([^)]+)\)', replace_local_image, content)
# 匹配 HTML img 标签 src 属性中的本地路径
content = re.sub(r'<img([^>]*)src="([^"]+)"([^>]*)>',
lambda m: self._replace_html_img_src(m, material_skill),
content)
return content
def _replace_html_img_src(self, match, material_skill):
"""替换HTML中img标签的本地src为微信URL"""
before_src = match.group(1)
src = match.group(2)
after_src = match.group(3)
if src.startswith("http"):
return match.group(0)
from pathlib import Path
cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "images"
local_path = Path(src)
if not local_path.is_absolute():
local_path = cache_dir / src
if not local_path.exists():
return match.group(0)
try:
result = material_skill.upload_image(str(local_path))
wechat_url = result.get("url", "")
if wechat_url:
logger.info(f"HTML图片上传成功: {wechat_url[:50]}...")
return f'<img{before_src}src="{wechat_url}"{after_src}>'
except Exception as e:
logger.warning(f"HTML图片上传失败: {e}")
return match.group(0)
def count_words(self, content: str) -> int:
"""统计字数"""
try:
chinese = len(re.findall(r'[\u4e00-\u9fff]', content))
english = len(re.findall(r'[a-zA-Z]+', content))
return chinese + english
except Exception as e:
logger.error(f"count_words 错误: {e}")
return 0
def _is_table_row(self, line: str) -> bool:
"""判断是否是大纲中的表格行"""
line = line.strip()
if not line.startswith('|') or not line.endswith('|'):
return False
# 至少有两列
parts = [p.strip() for p in line.split('|')]
parts = [p for p in parts if p] # 去掉空字符串
return len(parts) >= 2
def _convert_table_block(self, lines: List[str], start_idx: int, primary: str) -> tuple:
"""转换表格块,返回 (html_lines, end_idx)"""
table_lines = []
i = start_idx
while i < len(lines):
line = lines[i].strip()
if not line:
break
# 检查是否是表格行(|开头和结尾)
if not (line.startswith('|') and line.endswith('|')):
break
table_lines.append(line)
i += 1
if len(table_lines) < 2:
return None, start_idx
# 解析表格
def parse_row(row: str) -> List[str]:
parts = [p.strip() for p in row.split('|')]
return [p for p in parts if p] # 过滤空字符串
rows = [parse_row(line) for line in table_lines]
# 检查是否有分隔行(第二行是 --- 等)
has_header_separator = False
if len(rows) >= 2:
second_row = rows[1]
if all(re.match(r'^[-:]+$', cell) for cell in second_row):
has_header_separator = True
# 构建HTML表格
table_html = ['<table style="width:100%;border-collapse:collapse;margin:16px 0;font-size:14px;">']
# 表头
header_cells = rows[0]
table_html.append(' <thead><tr>')
for cell in header_cells:
cell_content = self._convert_inline_formatting(cell)
cell_content = self._escape_user_html(cell_content)
table_html.append(f' <th style="border:1px solid #ddd;padding:8px 12px;background:#f5f5f5;font-weight:bold;text-align:left;">{cell_content}</th>')
table_html.append(' </tr></thead>')
# 表体
body_start = 2 if has_header_separator else 1
if body_start < len(rows):
table_html.append(' <tbody>')
for row in rows[body_start:]:
table_html.append(' <tr>')
for cell in row:
cell_content = self._convert_inline_formatting(cell)
cell_content = self._escape_user_html(cell_content)
table_html.append(f' <td style="border:1px solid #ddd;padding:8px 12px;">{cell_content}</td>')
table_html.append(' </tr>')
table_html.append(' </tbody>')
table_html.append('</table>')
return table_html, i
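def _read_theme(self, theme: str) -> dict:
"""Read and parse the theme YAML; returns the parsed (nested) config dict, or {} if the file is missing."""
import yaml
theme_file = self._themes_dir / f"{theme}.yaml"
if theme_file.exists():
# Theme files may contain non-ASCII text, so read them explicitly as UTF-8
with open(theme_file, encoding="utf-8") as f: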
return yaml.safe_load(f) or {}
return {}
def _gv(self, d: dict, *keys, default: str = "") -> str:
"""安全获取嵌套字典值,避免 None"""
v = d
for k in keys:
if isinstance(v, dict):
v = v.get(k)
if v is None:
return default
else:
return default
return v if v else default
def _remove_mixed_language_spaces(self, html: str) -> str:
"""Remove unnecessary spaces between Chinese and English/punctuation characters in HTML.
Handles the key edge case where inline tags like <strong> interrupt patterns:
e.g. "发布了 <strong>v2026.3.22</strong>" - the space before <strong> would be
missed because '<' is not alphanumeric. Strategy: replace inline tags with single-char
placeholders (ASCII control chars), process spaces, restore tags.
"""
import re
# Step 1: Replace inline tags with single-char placeholders
protected = []
def tag_to_placeholder(m):
placeholder = chr(0x01) + str(len(protected)) + chr(0x02)
protected.append(m.group(0))
return placeholder
# Match paired inline tags
html = re.sub(
r'<(strong|em|a|code|span|b|i|bdo)\b[^>]*>(.*?)</\1>',
tag_to_placeholder, html, flags=re.IGNORECASE | re.DOTALL
)
# Self-closing / void tags: replace with space
html = re.sub(r'<(br|hr)\s*/?>', ' ', html, flags=re.IGNORECASE)
# Note: <img> tags are deliberately left alone (an earlier version replaced every <img> with a space, which dropped the images)
# Step 2: Remove unwanted spaces
html = re.sub(r'([\u4e00-\u9fff]) +([^\s])', r'\1\2', html)
html = re.sub(r'([^\s\u4e00-\u9fff]) +([\u4e00-\u9fff])', r'\1\2', html)
# Step 3: Restore tags
for i, tag_html in enumerate(protected):
html = html.replace(chr(0x01) + str(i) + chr(0x02), tag_html)
return html
def inject_section_images(self, html: str, section_images: dict) -> str:
"""Inject images before h2 section headings.
Args:
html: The HTML string to inject images into.
section_images: Dict mapping section title text (e.g. "一,大版本更新")
to image URLs. Keys are matched as SUBSTRINGS of h2 text content.
Uses regex to find h2 tags and insert img before them.
"""
import re
def normalize_for_match(text: str) -> str:
# Remove whitespace
text = re.sub(r'\s+', '', text)
# Normalize Chinese commas: U+3001 (、) and U+FF0C (,) -> remove both
text = text.replace('\u3001', '') # ideographic comma
text = text.replace('\uff0c', '') # fullwidth comma
return text
# Build list of (normalized_key, original_key, image_url)
section_list = [
(normalize_for_match(k), k, url)
for k, url in section_images.items()
]
def replace_h2(m: re.Match) -> str:
h2_full = m.group(0)
h2_open = m.group(1)
h2_text = m.group(2)
h2_close = m.group(3)
normalized_h2 = normalize_for_match(h2_text)
for norm_key, orig_key, image_url in section_list:
# Match if h2 content contains section title (substring match)
if norm_key in normalized_h2:
return (
f'<img src="{image_url}" alt="{orig_key}" style="max-width:100%;margin:16px 0;">\n'
f'{h2_open}{h2_text}{h2_close}'
)
return h2_full
html = re.sub(
r'(<h2\b[^>]*>)(.*?)(</h2>)',
replace_h2,
html,
flags=re.DOTALL
)
return html
def convert_to_html(self, markdown: str, theme: str = "default",
section_images: dict = None) -> str:
"""Convert Markdown to WeChat-compatible HTML.
Args:
markdown: Markdown content string.
theme: Theme name for styling.
section_images: Optional dict mapping section title text to image URLs.
Images will be injected before matching h2 section headings.
"""
try:
cfg = self._read_theme(theme)
# 颜色
primary = self._gv(cfg, "colors", "primary", default="#007AFF")
text_c = self._gv(cfg, "colors", "text", default="#333333")
body_bg = self._gv(cfg, "colors", "background", default="")
# body
body_fs = self._gv(cfg, "body", "font_size", default="15px")
body_lh = self._gv(cfg, "body", "line_height", default="1.8")
# h1
h1_fs = self._gv(cfg, "h1", "font_size", default="22px")
h1_align = self._gv(cfg, "h1", "text_align", default="center")
h1_mgn = self._gv(cfg, "h1", "margin", default="20px 0")
h1_color = self._gv(cfg, "h1", "color", default=primary)
# h2
h2_fs = self._gv(cfg, "h2", "font_size", default="18px")
h2_mgn = self._gv(cfg, "h2", "margin", default="20px 0 12px")
h2_sec = self._gv(cfg, "colors", "secondary", default=primary)
h2_color = self._gv(cfg, "h2", "color", default=h2_sec)
h2_bg = self._gv(cfg, "h2", "background_color", default="")
h2_pad = self._gv(cfg, "h2", "padding", default="")
# h3
h3_fs = self._gv(cfg, "h3", "font_size", default="15px")
h3_mgn = self._gv(cfg, "h3", "margin", default="16px 0 10px")
h3_color = self._gv(cfg, "h3", "color", default=text_c)
h3_bb = self._gv(cfg, "h3", "border_bottom", default="")
h3_pad = self._gv(cfg, "h3", "padding", default="")
# strong & em
self.strong_color = self._gv(cfg, "strong", "color", default="")
self.em_color = self._gv(cfg, "em", "color", default="")
# p
p_mgn = self._gv(cfg, "p", "margin", default="14px 0")
# link
link_c = self._gv(cfg, "link", "color", default=primary)
link_dec = self._gv(cfg, "link", "text_decoration", default="none")
# blockquote
bq_color = self._gv(cfg, "blockquote", "color", default="#666666")
bq_bg = self._gv(cfg, "blockquote", "background_color", default="#fff8f0")
bq_pad = self._gv(cfg, "blockquote", "padding", default="10px 16px")
bq_mgn = self._gv(cfg, "blockquote", "margin", default="16px 0")
bq_bl = self._gv(cfg, "blockquote", "border_left", default="")
# ul
ul_mgn = self._gv(cfg, "ul", "margin", default="12px 0")
ul_pl = self._gv(cfg, "ul", "padding_left", default="20px")
ul_bc = self._gv(cfg, "ul", "bullet_color", default=primary)
# ol
ol_mgn = self._gv(cfg, "ol", "margin", default="12px 0")
ol_pl = self._gv(cfg, "ol", "padding_left", default="20px")
ol_nc = self._gv(cfg, "ol", "number_color", default=primary)
# 外层容器:最大宽度 + 居中
bg_style = f"background:{body_bg};" if body_bg else ""
wrapper_style = (
f"font-family:-apple-system,BlinkMacSystemFont,sans-serif;"
f"color:{text_c};line-height:{body_lh};font-size:{body_fs};"
f"max-width:680px;margin:0 auto;padding:24px 20px;{bg_style}"
)
# 链接样式注入(覆盖行内 style)
link_style = f'a {{color:{link_c};text-decoration:{link_dec};}}'
html = [f'<div style="{wrapper_style}"><style>{link_style}</style>']
lines = markdown.split('\n')
i = 0
pending_ul_lines = []
pending_ol_lines = []
def flush_ul():
if not pending_ul_lines:
return
html.append(
f'<ul style="margin:{ul_mgn};padding-left:0;list-style:none;">'
)
for ln in pending_ul_lines:
ln = self._convert_inline_formatting(ln)
ln = self._escape_user_html(ln)
html.append(
f'<li style="margin:6px 0;padding-left:16px;position:relative;color:{text_c};line-height:{body_lh};">'
f'<span style="position:absolute;left:0;top:0;color:{ul_bc};font-weight:bold;line-height:{body_lh};">•</span>'
f'{ln}</li>'
)
html.append('</ul>')
pending_ul_lines.clear()
def flush_ol():
if not pending_ol_lines:
return
html.append(
f'<ol style="margin:{ol_mgn};padding-left:{ol_pl};list-style:none;counter-reset:ol-counter;">'
)
for ln in pending_ol_lines:
ln = self._convert_inline_formatting(ln)
ln = self._escape_user_html(ln)
html.append(
f'<li style="margin:6px 0;padding-left:12px;position:relative;color:{text_c};line-height:{body_lh};'
f'counter-increment:ol-counter;">'
f'<span style="position:absolute;left:0;color:{ol_nc};font-weight:bold;"></span>'
f'{ln}</li>'
)
html.append('</ol>')
pending_ol_lines.clear()
while i < len(lines):
raw_line = lines[i]
line = raw_line.strip()
# 空行:flush 列表
if not line:
flush_ul()
flush_ol()
i += 1
continue
# 分隔线 ---
if line == '---':
flush_ul()
flush_ol()
html.append('<hr style="border:none;border-top:1px solid #e8e8e8;margin:24px 0;">')
i += 1
continue
# Indented code block (line starts with 4+ spaces or a tab)
if raw_line.startswith('    ') or raw_line.startswith('\t'):
flush_ul()
flush_ol()
# Collect all following indented (or blank) lines
code_lines = [line]
i += 1
while i < len(lines) and (lines[i].startswith('    ') or lines[i].startswith('\t') or lines[i].strip() == ''):
code_lines.append(lines[i].strip())
i += 1
code_content = '\n'.join(code_lines)
code_content = code_content.replace("<", "&lt;").replace(">", "&gt;")
html.append(
f'<pre style="background:#f5f5f5;padding:12px 16px;border-radius:8px;'
f'overflow-x:auto;font-family:monospace;font-size:13px;margin:16px 0;'
f'line-height:1.6;border:1px solid #e8e8e8;">'
f'<code>{code_content}</code></pre>'
)
continue
# 代码块(``` 形式)
if line.startswith('```'):
flush_ul()
flush_ol()
code_lines = []
i += 1
while i < len(lines) and not lines[i].strip().startswith('```'):
code_lines.append(lines[i])
i += 1
code_content = '\n'.join(code_lines)
code_content = code_content.replace("<", "&lt;").replace(">", "&gt;")
html.append(
f'<pre style="background:#f5f5f5;padding:12px 16px;border-radius:8px;'
f'overflow-x:auto;font-family:monospace;font-size:13px;margin:16px 0;'
f'line-height:1.6;border:1px solid #e8e8e8;">'
f'<code>{code_content}</code></pre>'
)
i += 1
continue
# 引用块
if line.startswith('>'):
flush_ul()
flush_ol()
quote_lines = [line[1:].strip()]
i += 1
while i < len(lines) and lines[i].strip().startswith('>'):
quote_lines.append(lines[i].strip()[1:].strip())
i += 1
quote_content = '\n'.join(quote_lines)
quote_content = self._convert_inline_formatting(quote_content)
bq_bl_style = f"border-left:{bq_bl};" if bq_bl else f"border-left:4px solid {primary};"
html.append(
f'<blockquote style="'
f'{bq_bl_style}'
f'background:{bq_bg};'
f'padding:{bq_pad};'
f'margin:{bq_mgn};'
f'color:{bq_color};'
f'font-style:italic;'
f'border-radius:0 4px 4px 0;">'
f'{quote_content}</blockquote>'
)
continue
# 表格块
if self._is_table_row(line):
flush_ul()
flush_ol()
table_html, next_i = self._convert_table_block(lines, i, primary)
if table_html:
html.extend(table_html)
i = next_i
continue
# 标题
if line.startswith('# '):
flush_ul()
flush_ol()
html.append(
f'<h1 style="font-size:{h1_fs};font-weight:bold;'
f'text-align:{h1_align};color:{h1_color};'
f'margin:{h1_mgn};line-height:1.4;">'
f'{self._escape_user_html(line[2:])}</h1>'
)
elif line.startswith('## '):
flush_ul()
flush_ol()
h2_bg_style = f"background-color:{h2_bg};display:block;" if h2_bg else ""
h2_pad_style = f"padding:{h2_pad};" if h2_pad else ("padding:3px 10px;" if h2_bg else "padding-left:10px;")
html.append(
f'<h2 style="font-size:{h2_fs};font-weight:bold;'
f'{h2_bg_style}color:{h2_color};margin:{h2_mgn};line-height:1.4;'
f'border-left:10px solid {h2_sec};{h2_pad_style}">'
f'{self._escape_user_html(line[3:])}</h2>'
)
elif line.startswith('### '):
flush_ul()
flush_ol()
h3_bb_style = f"border-bottom:{h3_bb};padding-bottom:6px;" if h3_bb else ""
h3_pad_style = f"padding:{h3_pad};" if h3_pad else ""
html.append(
f'<h3 style="font-size:{h3_fs};font-weight:bold;'
f'color:{h3_color};margin:{h3_mgn};line-height:1.4;{h3_bb_style}{h3_pad_style}">'
f'{self._escape_user_html(line[4:])}</h3>'
)
elif line.startswith('#### '):
flush_ul()
flush_ol()
html.append(
f'<h4 style="font-size:15px;font-weight:bold;'
f'color:{h3_color};margin:{h3_mgn};line-height:1.4;">'
f'{self._escape_user_html(line[5:])}</h4>'
)
elif line.startswith('##### '):
flush_ul()
flush_ol()
html.append(
f'<h5 style="font-size:14px;font-weight:bold;'
f'color:{h3_color};margin:{h3_mgn};line-height:1.4;">'
f'{self._escape_user_html(line[6:])}</h5>'
)
elif line.startswith('###### '):
flush_ul()
flush_ol()
html.append(
f'<h6 style="font-size:13px;font-weight:bold;'
f'color:{h3_color};margin:{h3_mgn};line-height:1.4;">'
f'{self._escape_user_html(line[7:])}</h6>'
)
# Markdown image: ![alt](url)
elif re.match(r'^!\[', line):
flush_ul()
flush_ol()
img_match = re.match(r'^!\[([^\]]*)\]\(([^)]+)\)', line)
if img_match:
alt = img_match.group(1)
url = img_match.group(2)
alt_esc = self._escape_user_html(alt) if alt else ''
html.append(
f'<img src="{url}" alt="{alt_esc}" '
f'style="max-width:100%;margin:16px 0;">'
)
i += 1
continue
# Task list item: "- [ ]" or "- [x]" (must be checked before plain list items)
elif re.match(r'^(\s*)- \[([ xX])\] (.+)$', line):
flush_ul()
flush_ol()
# Match once and reuse the groups (group 2: checkbox state, group 3: item text)
task_match = re.match(r'^(\s*)- \[([ xX])\] (.+)$', line)
checked = task_match.group(2).lower() == 'x'
text = task_match.group(3)
text = self._convert_inline_formatting(text)
text = self._escape_user_html(text)
checkbox = 'checked' if checked else ''
checkbox_html = (
f'<input type="checkbox" disabled {checkbox} '
f'style="margin-right:8px;vertical-align:middle;pointer-events:none;">'
)
html.append(
f'<div style="margin:10px 0;display:flex;align-items:flex-start;">'
f'{checkbox_html}<span>{text}</span></div>'
)
i += 1
continue
# Unordered list item: buffer it until the list ends
elif line.startswith('- ') or line.startswith('* '):
# Flush (rather than discard) any buffered ordered-list items first
flush_ol()
pending_ul_lines.append(line[2:])
# 有序列表:暂存
elif re.match(r'^\d+\.\s', line):
flush_ul()
m = re.match(r'^\d+\.\s+(.*)$', line)
if m:
pending_ol_lines.append(m.group(1))
# 普通段落
else:
flush_ul()
flush_ol()
line = self._convert_inline_formatting(line)
line = self._escape_user_html(line)
# Image-only line: a paragraph that is a single <img> tag with an alt attribute gets centered
img_match = re.match(r'^<img[^>]+alt="([^"]*)"[^>]*>$', line)
if img_match:
# Wrap the <img> in a centered paragraph; the tag itself is left unchanged
html.append(f'<p style="margin:14px 0;text-align:center;">{line}</p>')
else:
# Not an image-only line: render as a normal paragraph
html.append(
f'<p style="margin:{p_mgn};color:{text_c};line-height:{body_lh};">{line}</p>'
)
i += 1
flush_ul()
flush_ol()
html.append('</div>')
result = '\n'.join(html)
except Exception as e:
logger.error(f"convert_to_html 错误: {e}")
raise
# Post-processing: remove mixed-language spaces and inject section images
result = self._remove_mixed_language_spaces(result)
if section_images:
result = self.inject_section_images(result, section_images)
return result
def _convert_inline_formatting(self, text: str) -> str:
"""转换行内格式:加粗、斜体、链接、图片"""
# 0. 保护 HTML 属性中的 URL 和属性值(先于所有转换)
# 使用占位符保护 src/href 属性值中的 URL,避免下划线等被错误转换
protected = []
def protect_url(match):
idx = len(protected)
protected.append(match.group(0))
return f"__PROTECTED_URL_{idx}__"
# 保护 src= 和 href= 属性值(URL)
text = re.sub(r'(src|href)="([^"]+)"', protect_url, text)
# 1. Markdown images: ![alt](url) -> <img src="url" alt="alt" />
text = re.sub(r'!\[([^\]]*)\]\(([^)]+)\)', r'<img src="\2" alt="\1" style="max-width:100%;margin:16px 0;" />', text)
# 2. 链接 [text](url)
text = re.sub(r'\[([^\]]+)\]\(([^)]+)\)', r'<a href="\2" style="color:#007AFF;text-decoration:none;">\1</a>', text)
# 3. 加粗 **text** 或 __text__
strong_style = f"color:{self.strong_color};font-weight:bold;" if self.strong_color else "font-weight:bold;"
text = re.sub(r'\*\*([^*]+)\*\*', rf'<strong style="{strong_style}">\1</strong>', text)
text = re.sub(r'__([^_]+)__', rf'<strong style="{strong_style}">\1</strong>', text)
# 4. 斜体 *text* 或 _text_
em_style = f"color:{self.em_color};font-style:normal;" if self.em_color else ""
em_tag = f'<em style="{em_style}">' if em_style else "<em>"
em_close = "</em>"
# Negative lookbehind/lookahead: the markers must not be adjacent to URL characters
# *italic*
text = re.sub(r'(?<![a-zA-Z0-9/:.?=&%-])\*([^*]+)\*(?![a-zA-Z0-9/:.?=&%-])', em_tag + r'\1' + em_close, text)
text = re.sub(r'(?<![a-zA-Z0-9])\*([^*]+)\*(?![a-zA-Z0-9])', em_tag + r'\1' + em_close, text)
# _斜体_ 处理
text = re.sub(r'(?<![a-zA-Z0-9/:.?=&%-])_([^_]+)_(?![a-zA-Z0-9/:.?=&%-])', em_tag + r'\1' + em_close, text)
text = re.sub(r'(?<![a-zA-Z0-9])_([^_]+)_(?![a-zA-Z0-9])', em_tag + r'\1' + em_close, text)
# 5. 行内代码 `code`
text = re.sub(r'`([^`]+)`', r'<code style="background:#f5f5f5;padding:2px 6px;border-radius:4px;font-family:monospace;font-size:13px;">\1</code>', text)
# 6. 删除线 ~~text~~
text = re.sub(r'~~([^~]+)~~', r'<del style="color:#999;text-decoration:line-through;">\1</del>', text)
# 7. 上标 ^text^
text = re.sub(r'\^([^^]+)\^', r'<sup style="font-size:0.8em;vertical-align:super;">\1</sup>', text)
# 8. 下标 ~text~
text = re.sub(r'~([^~]+)~', r'<sub style="font-size:0.8em;vertical-align:sub;">\1</sub>', text)
# 9. 高亮 ==text== (仅匹配独立行内,非表格/代码内)
text = re.sub(r'==([^=]+)==', r'<mark style="background:#FFF176;padding:1px 3px;border-radius:3px;">\1</mark>', text)
# 10. 恢复被保护的 URL
for idx, url in enumerate(protected):
text = text.replace(f"__PROTECTED_URL_{idx}__", url)
# 11. 保护生成的HTML标签(避免后续处理被转义)
text = self._protect_html_tags(text)
return text
def _escape_user_html(self, text: str) -> str:
"""转义用户原始文本中的 HTML 标签,跳过已保护标签"""
# 先用占位符保护已生成的HTML标签
protected = []
def ph(m):
idx = len(protected)
protected.append(m.group(0))
return f"__HTAG_{idx}__"
# 保护已生成的安全标签
for t in ('div','span','p','h1','h2','h3','h4','h5','h6',
'ul','ol','li','blockquote','pre','code',
'table','thead','tbody','tr','th','td',
'a','strong','em','del','sup','sub','mark',
'details','summary','figure','figcaption',
'img','br','hr','input'):
text = re.sub(f'<{t}[^>]*>.*?</{t}>', ph, text, flags=re.DOTALL)
text = re.sub(f'<{t}[^>]*/?>', ph, text)
# Escape whatever raw HTML from the user remains
text = text.replace("<", "&lt;").replace(">", "&gt;")
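# Restore protected tags in reverse order: a later placeholder may wrap an earlier one (nested tags)
for idx in range(len(protected) - 1, -1, -1):
text = text.replace(f"__HTAG_{idx}__", protected[idx])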
return text
def _protect_html_tags(self, text: str) -> str:
"""保护已生成的 HTML 标签,用占位符替换,避免转义"""
protected = []
def ph(m):
idx = len(protected)
protected.append(m.group(0))
return f"__HTAG_{idx}__"
# 成对标签
for t in ('div','span','p','h1','h2','h3','h4','h5','h6',
'ul','ol','li','blockquote','pre','code',
'table','thead','tbody','tr','th','td',
'a','strong','em','del','sup','sub','mark',
'details','summary','figure','figcaption'):
text = re.sub(f'<{t}[^>]*>.*?</{t}>', ph, text, flags=re.DOTALL)
# 自闭合标签
for t in ('img','br','hr','input'):
text = re.sub(f'<{t}[^>]*/?>', ph, text)
# Restore placeholders in reverse order so that nested tags are fully restored
for idx in range(len(protected) - 1, -1, -1):
text = text.replace(f"__HTAG_{idx}__", protected[idx])
return text
def get_themes(self) -> List[str]:
"""获取主题列表"""
try:
themes = []
if self._themes_dir.exists():
for f in self._themes_dir.glob("*.yaml"):
themes.append(f.stem)
return themes if themes else ["default"]
except Exception as e:
logger.error(f"get_themes 错误: {e}")
return ["default"]
def preview_theme(self, theme_name: str = None) -> str:
"""
生成模板预览 HTML。
Args:
theme_name: 指定模板名,为 None 时返回所有模板合并预览。
Returns:
HTML 字符串,可用 canvas.present(url="data:text/html,...") 渲染。
"""
import yaml
# 模板配色和底色配置
theme_styles = {
"default": {
"name_cn": "默认",
"name_en": "default",
"primary": "#007AFF",
"bg": "#f0f4ff",
"label_bg": "#007AFF",
},
"houge": {
"name_cn": "猴哥",
"name_en": "houge",
"primary": "#333333",
"bg": "#f0f0f0",
"label_bg": "#333333",
},
"shuimo": {
"name_cn": "水墨",
"name_en": "shuimo",
"primary": "#2c3e50",
"bg": "#f5f5f0",
"label_bg": "#2c3e50",
},
"wenyan": {
"name_cn": "古文",
"name_en": "wenyan",
"primary": "#8b4513",
"bg": "#fff8f0",
"label_bg": "#8b4513",
},
"macaron": {
"name_cn": "马卡龙",
"name_en": "macaron",
"primary": "#e91e8c",
"bg": "#fef0f5",
"label_bg": "#e91e8c",
},
}
# 确定要预览哪些模板
if theme_name:
targets = {theme_name: theme_styles.get(theme_name, theme_styles["default"])}
else:
targets = theme_styles
blocks = []
for tid, style in targets.items():
theme_file = self._themes_dir / f"{tid}.yaml"
if theme_file.exists():
with open(theme_file, encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
primary = cfg.get("colors", {}).get("primary", style["primary"])
bg = style["bg"]
else:
primary = style["primary"]
bg = style["bg"]
block = f'''
<div style="background:{bg}; padding:24px; margin-bottom:20px; border-radius:4px;">
<div style="display:inline-block; background:{style["label_bg"]}; color:#fff; padding:5px 12px; font-size:12px; border-radius:3px; margin-bottom:10px;">
{style["name_cn"]} {style["name_en"]}
</div>
<h1 style="font-size:20px; color:{primary}; margin:8px 0;">示例标题</h1>
<p style="color:#333; line-height:1.8; margin:8px 0;">正文示例文字,演示模板效果...</p>
<blockquote style="border-left:3px solid {primary}; padding-left:10px; color:#666; margin:8px 0;">引用块效果</blockquote>
<p style="margin:8px 0;"><strong>加粗</strong> · <em>斜体</em></p>
</div>'''
blocks.append(block)
html = f'''<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<style>
body {{ margin:0; padding:24px; background:#fff; font-family: -apple-system, BlinkMacSystemFont, sans-serif; }}
</style>
</head>
<body>
<div style="max-width:900px; margin:0 auto;">
<h2 style="text-align:center; padding:16px 0; margin-bottom:24px; font-size:18px; color:#333;">
微信公众号模板预览
</h2>
{"".join(blocks)}
</div>
</body>
</html>'''
return html
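The placeholder technique described in the `_remove_mixed_language_spaces` docstring can be tried out in isolation. The following is a minimal standalone sketch, not the class method itself: the function name `strip_cjk_spaces` and the module-level `_INLINE_TAG` pattern are hypothetical, and the void-tag handling of the original is left out.

```python
import re

# Paired inline tags whose contents must survive untouched (hypothetical subset).
_INLINE_TAG = re.compile(
    r'<(strong|em|a|code|span|b|i)\b[^>]*>.*?</\1>',
    re.IGNORECASE | re.DOTALL,
)


def strip_cjk_spaces(html: str) -> str:
    """Remove spaces between CJK and non-CJK characters, leaving inline tags intact."""
    protected = []

    def stash(match):
        # Swap the whole inline element for a control-character placeholder.
        protected.append(match.group(0))
        return f"\x01{len(protected) - 1}\x02"

    html = _INLINE_TAG.sub(stash, html)
    # A CJK character followed by spaces and any non-space character.
    html = re.sub(r'([\u4e00-\u9fff]) +(\S)', r'\1\2', html)
    # A non-space, non-CJK character followed by spaces and a CJK character.
    html = re.sub(r'([^\s\u4e00-\u9fff]) +([\u4e00-\u9fff])', r'\1\2', html)
    # Put the stashed elements back.
    for idx, original in enumerate(protected):
        html = html.replace(f"\x01{idx}\x02", original)
    return html


print(strip_cjk_spaces("发布了 <strong>v2 beta</strong> 版本"))
```

Because each placeholder begins and ends with a non-space control character, the spaces around the inline element are removed while the space inside `<strong>` is preserved.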
FILE:src/skills/base_skill.py
"""
微信公众号自动化 - 基础 Skill 类
"""
import requests
import json
from typing import Optional, Dict, Any
from token_manager import TokenManager
from config import Config
from exceptions import APIError, get_error_message
class BaseSkill:
"""基础 Skill 类"""
def __init__(self, token_manager: Optional[TokenManager] = None):
self._token_manager = token_manager
self._config = Config()
@property
def token_manager(self) -> TokenManager:
if self._token_manager is None:
self._token_manager = TokenManager.from_config(self._config)
return self._token_manager
@property
def access_token(self) -> str:
return self.token_manager.get_access_token()
def _request(self, method: str, endpoint: str, data: Optional[Dict] = None, params: Optional[Dict] = None, retry: bool = True) -> Dict[str, Any]:
"""发送 HTTP 请求"""
url = f"https://api.weixin.qq.com{endpoint}"
if params is None:
params = {}
params["access_token"] = self.access_token
headers = {"Content-Type": "application/json"}
if method.upper() == "GET":
response = requests.get(url, params=params, timeout=30)
else:
payload = json.dumps(data, ensure_ascii=False).encode("utf-8") if data else None
response = requests.post(url, params=params, data=payload, headers=headers, timeout=30)
result = response.json()
errcode = result.get("errcode", 0)
if errcode != 0:
if retry and errcode in [40001, 40014, 42001]:
self.token_manager.refresh_token()
params["access_token"] = self.access_token
return self._request(method, endpoint, data, params, retry=False)
raise APIError(errcode, result.get("errmsg", get_error_message(errcode)))
return result
def get(self, endpoint: str, params: Optional[Dict] = None) -> Dict[str, Any]:
return self._request("GET", endpoint, params=params)
def post(self, endpoint: str, data: Optional[Dict] = None) -> Dict[str, Any]:
return self._request("POST", endpoint, data=data)
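The refresh-and-retry behaviour of `_request` can be exercised without network access. The sketch below is an illustration under stated assumptions rather than the module's API: `call_with_token_retry`, `WeChatApiError`, and the injected `send`, `get_token`, and `refresh_token` callables are hypothetical stand-ins, and only the three error codes listed in `_request` are treated as token failures.

```python
from typing import Any, Callable, Dict

# Error codes that _request treats as "token invalid or expired".
TOKEN_ERRCODES = {40001, 40014, 42001}


class WeChatApiError(Exception):
    """Hypothetical stand-in for the APIError raised by the real module."""

    def __init__(self, errcode: int, errmsg: str):
        super().__init__(f"[{errcode}] {errmsg}")
        self.errcode = errcode


def call_with_token_retry(
    send: Callable[[str], Dict[str, Any]],
    get_token: Callable[[], str],
    refresh_token: Callable[[], None],
) -> Dict[str, Any]:
    """Call send(token); on a token error, refresh once and try again."""
    for attempt in range(2):
        result = send(get_token())
        errcode = result.get("errcode", 0)
        if errcode == 0:
            return result
        if attempt == 0 and errcode in TOKEN_ERRCODES:
            refresh_token()  # only the first failure triggers a refresh
            continue
        raise WeChatApiError(errcode, result.get("errmsg", ""))
    raise AssertionError("unreachable")


# Usage with a fake transport: the first token is rejected, the refreshed one works.
tokens = ["old"]
calls = []


def fake_send(token: str) -> Dict[str, Any]:
    calls.append(token)
    if token == "old":
        return {"errcode": 40001, "errmsg": "invalid credential"}
    return {"errcode": 0, "ok": True}


result = call_with_token_retry(
    fake_send, lambda: tokens[0], lambda: tokens.__setitem__(0, "new")
)
```

Injecting the transport and token callables keeps the retry rule testable; a loop bounded at two attempts plays the same role as the `retry=False` flag in the recursive version.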
FILE:src/skills/content_reviewer.py
"""
微信公众号自动化 - 内容审核 Skill
"""
import os
import re
import json
import hashlib
import time
import threading
import requests
from typing import Dict, List, Set
from pathlib import Path
from .base_skill import BaseSkill
class ContentReviewerSkill(BaseSkill):
"""内容审核"""
PROHIBITED_WORDS = ["反动", "暴力", "色情", "赌博", "毒品", "诈骗", "谣言"]
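# Duplicate-detection parameters
N_GRAM_SIZE = 3 # n-gram size in characters
SIMILARITY_THRESHOLD = 30 # similarity threshold in percent; 30% or more counts as duplicated
MIN_CONTENT_LENGTH = 50 # minimum cleaned-content length; shorter texts are skipped
# Web-search duplicate-detection parameters
NETWORK_CHECK_ENABLED = True # whether the web check is enabled
KEY_SENTENCE_COUNT = 5 # number of key sentences to extract
SEARCH_MATCH_THRESHOLD = 10 # share of matched sentences (%) that flags the article; kept low to raise the detection rate
SEARCH_TIMEOUT = 10 # search timeout in seconds
SEARCH_DELAY = 1.0 # delay between searches in seconds, to avoid rate limiting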
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._history_file = Path.home() / ".cache" / "wechat-mp-auto" / "article_history.json"
self._history_file.parent.mkdir(parents=True, exist_ok=True)
# 获取 Tavily API Key
self._tavily_api_key = os.environ.get("TAVILY_API_KEY", "")
# 搜索结果缓存
self._search_cache_file = Path.home() / ".cache" / "wechat-mp-auto" / "search_cache.json"
self._search_cache = self._load_search_cache()
def _load_search_cache(self) -> Dict:
"""加载搜索缓存"""
if not self._search_cache_file.exists():
return {}
try:
with open(self._search_cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return {}
def _save_search_cache(self):
"""保存搜索缓存"""
try:
with open(self._search_cache_file, 'w', encoding='utf-8') as f:
json.dump(self._search_cache, f, ensure_ascii=False, indent=2)
except Exception:
pass
def review_article(self, article: Dict) -> Dict:
"""全面审核"""
content = article.get("markdown", article.get("content", ""))
plagiarism = self.check_plagiarism(content)
facts = self.verify_facts(content)
prohibited = self.check_prohibited_content(content)
issues = []
if plagiarism.get("is_duplicated"):
issues.append({"type": "plagiarism", "severity": "high", "message": "重复度较高"})
if prohibited.get("violations"):
issues.append({"type": "prohibited", "severity": "critical", "message": "包含违规内容"})
return {
"passed": len([i for i in issues if i.get("severity") == "critical"]) == 0,
"issues": issues,
"plagiarism": plagiarism,
"facts": facts,
"prohibited": prohibited
}
def check_plagiarism(self, content: str) -> Dict:
"""检查重复度 - 使用 n-gram + Jaccard 相似度"""
# 预处理:去除特殊字符,保留中英文和数字
cleaned = self._preprocess_text(content)
# 内容太短,跳过检测
if len(cleaned) < self.MIN_CONTENT_LENGTH:
return {
"is_duplicated": False,
"similarity": 0,
"reason": "内容太短,跳过检测",
"history_count": 0
}
# 获取当前内容的 n-gram 集合
current_ngrams = self._get_ngrams(cleaned, self.N_GRAM_SIZE)
if not current_ngrams:
return {
"is_duplicated": False,
"similarity": 0,
"reason": "无法提取特征",
"history_count": 0
}
# 加载历史文章
history = self._load_history()
# 计算与每篇历史文章的相似度
max_similarity = 0
most_similar_title = None
for item in history:
# 兼容:历史记录中存的是 title 字段(保存时用的),也可能是 content
hist_content = self._preprocess_text(item.get("content", item.get("title", "")))
if len(hist_content) < self.MIN_CONTENT_LENGTH:
continue
hist_ngrams = self._get_ngrams(hist_content, self.N_GRAM_SIZE)
similarity = self._compute_jaccard_similarity(current_ngrams, hist_ngrams)
if similarity > max_similarity:
max_similarity = similarity
most_similar_title = item.get("title", "未知")
# 判断是否重复
is_duplicated = max_similarity >= self.SIMILARITY_THRESHOLD
# 同时检查自重复(文章内部重复段落太多)
internal_dup = self._check_internal_duplication(cleaned)
result = {
"is_duplicated": is_duplicated or internal_dup,
"similarity": max_similarity,
"similar_title": most_similar_title,
"internal_duplication": internal_dup,
"history_count": len(history),
"threshold": self.SIMILARITY_THRESHOLD
}
# Save the current article to the history: the hash of the cleaned text plus the first 500 characters of the content
article_hash = self._compute_hash(cleaned)
self._save_to_history(article_hash, content[:500])
return result
def _preprocess_text(self, text: str) -> str:
"""预处理文本:去除 markdown 格式、特殊字符"""
# 去除 markdown 标题符号
text = re.sub(r'^#+\s+', '', text, flags=re.MULTILINE)
# 去除链接
text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)
# 去除图片
text = re.sub(r'!\[([^\]]*)\]\([^)]+\)', '', text)
# 去除代码块
text = re.sub(r'```[\s\S]*?```', '', text)
# 去除行内代码
text = re.sub(r'`([^`]+)`', r'\1', text)
# 去除特殊字符,只保留中英文、数字和常用标点
text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9,。!?、;:“”‘’()【】《》\s]', '', text)
# 去除多余空白
text = re.sub(r'\s+', '', text)
return text
def _get_ngrams(self, text: str, n: int = 3) -> Set[str]:
"""获取文本的 n-gram 集合"""
ngrams = set()
for i in range(len(text) - n + 1):
ngram = text[i:i+n]
# 过滤掉纯数字的 n-gram(保留中英文混合)
if not ngram.isdigit():
ngrams.add(ngram)
return ngrams
def _compute_jaccard_similarity(self, set1: Set[str], set2: Set[str]) -> float:
"""计算 Jaccard 相似度"""
if not set1 or not set2:
return 0.0
intersection = len(set1 & set2)
union = len(set1 | set2)
if union == 0:
return 0.0
return (intersection / union) * 100
def _check_internal_duplication(self, text: str) -> bool:
"""检查文章内部是否有大量重复内容"""
# Count every 5-character window; a window that repeats many times signals duplicated passages
ngram_count = {}
for i in range(len(text) - 5 + 1):
ngram = text[i:i+5]
if not ngram.isdigit() and not ngram.isalpha():
ngram_count[ngram] = ngram_count.get(ngram, 0) + 1
# 找出出现最多的 n-gram
if ngram_count:
max_count = max(ngram_count.values())
# 如果某个片段出现超过 10 次,认为有内部重复
if max_count > 10:
return True
return False
# ========== 网络搜索重复度检测 ==========
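def check_network_plagiarism(self, content: str, callback=None) -> Dict:
"""
Check for duplication against web content (asynchronous).
Extracts key sentences and compares each one with web search results to judge whether the article duplicates content that is already online.
Args:
content: Article content.
callback: Optional function called with the final result once the background check finishes.
Returns:
A preliminary result dict; the actual check runs in a background thread (see get_network_result).
"""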
# 检查是否启用网络检测
if not self.NETWORK_CHECK_ENABLED:
return {
"enabled": False,
"reason": "网络检测未启用",
"is_plagiarized": False
}
# 检查是否有 API Key
if not self._tavily_api_key:
# 尝试从配置文件读取
config_file = Path.home() / ".config" / "wechat-mp-auto" / "config.json"
if config_file.exists():
try:
with open(config_file, 'r') as f:
config = json.load(f)
self._tavily_api_key = config.get("tavily_api_key", "")
except Exception:
pass
# 再次检查
if not self._tavily_api_key:
return {
"enabled": False,
"reason": "未配置 TAVILY_API_KEY(请设置环境变量或配置文件)",
"is_plagiarized": False,
"how_to_config": "设置环境变量 TAVILY_API_KEY 或在 config.json 中添加 tavily_api_key"
}
# 提取关键句子
key_sentences = self._extract_key_sentences(content)
if not key_sentences:
return {
"enabled": True,
"is_plagiarized": False,
"reason": "内容太短,无法提取关键句"
}
# 启动异步检测
result_container = {"result": None}
def async_check():
result = self._do_network_check(key_sentences, content)
result_container["result"] = result
if callback:
callback(result)
thread = threading.Thread(target=async_check)
thread.daemon = True
thread.start()
# 返回初步结果
return {
"enabled": True,
"async": True,
"key_sentences_count": len(key_sentences),
"status": "检测中...",
"_thread": thread,
"_result_container": result_container
}
def get_network_result(self, preliminary_result: Dict) -> Dict:
"""获取异步检测结果"""
if not preliminary_result.get("enabled"):
return preliminary_result
if preliminary_result.get("async"):
thread = preliminary_result.get("_thread")
result_container = preliminary_result.get("_result_container")
if thread and thread.is_alive():
return {
"status": "检测中...",
"progress": "请稍候"
}
if result_container and result_container.get("result"):
return result_container["result"]
return {"status": "检测完成", "is_plagiarized": False}
return preliminary_result
def _do_network_check(self, key_sentences: List[str], content: str) -> Dict:
"""执行网络检测"""
matches = []
total_checks = len(key_sentences)
for i, sentence in enumerate(key_sentences):
# 检查缓存
cache_key = self._compute_hash(sentence[:50])
if cache_key in self._search_cache:
cached = self._search_cache[cache_key]
match_result = cached
else:
# 执行搜索
match_result = self._search_and_compare(sentence)
# 保存到缓存
self._search_cache[cache_key] = match_result
if match_result.get("is_matched"):
matches.append({
"sentence": sentence[:50] + "...",
"matched_url": match_result.get("matched_url"),
"similarity": match_result.get("similarity")
})
# 延时,避免频率限制
if i < total_checks - 1:
time.sleep(self.SEARCH_DELAY)
# 计算匹配率
match_ratio = (len(matches) / total_checks * 100) if total_checks > 0 else 0
result = {
"is_plagiarized": match_ratio >= self.SEARCH_MATCH_THRESHOLD,
"match_ratio": match_ratio,
"matches": matches,
"total_checks": total_checks,
"threshold": self.SEARCH_MATCH_THRESHOLD,
"key_sentences": key_sentences
}
# 保存缓存
self._save_search_cache()
return result
def _extract_key_sentences(self, text: str, n: int = 5) -> List[str]:
"""提取关键句子"""
# 预处理
cleaned = self._preprocess_text(text)
if len(cleaned) < 100:
return []
# 分割句子
sentences = re.split(r'[。!?\n]', cleaned)
sentences = [s.strip() for s in sentences if len(s.strip()) >= 20]
if not sentences:
return []
# 计算每句的权重(位置 + 长度)
scored_sentences = []
for i, sentence in enumerate(sentences):
# Position weight: sentences near the start score higher (closing sentences get no boost)
position_weight = 1.0
if i == 0:
position_weight = 1.5
elif i < 3:
position_weight = 1.2
# 长度权重:适中最好
length = len(sentence)
if length < 20:
length_weight = 0.5
elif length > 100:
length_weight = 0.8
else:
length_weight = 1.0
score = position_weight * length_weight
scored_sentences.append((score, sentence))
# 按分数排序,取前 N 个
scored_sentences.sort(reverse=True, key=lambda x: x[0])
return [s[1] for s in scored_sentences[:n]]
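The scoring above can be tried in isolation. The following is a minimal standalone sketch, not the class method itself: `extract_key_sentences` and `min_len` are hypothetical names, and the sketch skips the `_preprocess_text` step and the 100-character minimum that the method applies first.

```python
import re
from typing import List


def extract_key_sentences(text: str, n: int = 5, min_len: int = 20) -> List[str]:
    """Pick up to n sentences, favouring early and mid-length ones."""
    # Split on Chinese sentence terminators and newlines, then drop short fragments.
    parts = [s.strip() for s in re.split(r"[。!?\n]", text)]
    sentences = [s for s in parts if len(s) >= min_len]

    scored = []
    for i, sentence in enumerate(sentences):
        # Only the opening sentences get a position boost.
        position_weight = 1.5 if i == 0 else 1.2 if i < 3 else 1.0
        # Very long sentences are penalised slightly.
        length_weight = 0.8 if len(sentence) > 100 else 1.0
        scored.append((position_weight * length_weight, sentence))

    # sorted() is stable, so sentences with equal scores keep document order.
    ranked = sorted(scored, key=lambda pair: pair[0], reverse=True)
    return [sentence for _, sentence in ranked[:n]]
```

Because the score is a product of two coarse weights, ties are common; the stable sort means ties resolve in favour of earlier sentences.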
def _search_and_compare(self, sentence: str) -> Dict:
"""搜索并比对"""
try:
# 提取关键词用于搜索(取句子中重要部分)
search_query = self._extract_search_keywords(sentence)
# 调用 Tavily API
url = "https://api.tavily.com/search"
headers = {"Content-Type": "application/json"}
data = {
"api_key": self._tavily_api_key,
"query": search_query,
"search_depth": "basic",
"max_results": 5
}
response = requests.post(
url,
json=data,
headers=headers,
timeout=self.SEARCH_TIMEOUT
)
if response.status_code != 200:
return {"is_matched": False, "error": f"API错误: {response.status_code}"}
results = response.json()
search_results = results.get("results", [])
if not search_results:
return {"is_matched": False}
# 计算与搜索结果的相似度
best_match = None
best_similarity = 0
for result in search_results:
title = result.get("title", "")
content = result.get("content", "")[:500]
combined = title + " " + content
# 方法1:文本相似度
text_similarity = self._compute_text_similarity(sentence, combined)
# 方法2:关键词重叠度
keyword_similarity = self._compute_keyword_overlap(sentence, combined)
# Combined similarity: the larger of the two scores (not a weighted sum)
similarity = max(text_similarity, keyword_similarity)
if similarity > best_similarity:
best_similarity = similarity
best_match = {
"url": result.get("url"),
"title": title,
"similarity": similarity,
"text_similarity": text_similarity,
"keyword_similarity": keyword_similarity
}
# 阈值判断
is_matched = best_similarity >= 20 # 20% 以上认为匹配
return {
"is_matched": is_matched,
"similarity": best_similarity,
"matched_url": best_match.get("url") if best_match else None,
"matched_title": best_match.get("title") if best_match else None
}
except requests.Timeout:
return {"is_matched": False, "error": "搜索超时"}
except Exception as e:
return {"is_matched": False, "error": str(e)}
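def _compute_text_similarity(self, text1: str, text2: str) -> float:
"""Compute text similarity as the Jaccard index over character 3-gram sets, as a percentage (0-100)."""
# Preprocess
t1 = self._preprocess_text(text1)
t2 = self._preprocess_text(text2)
if not t1 or not t2:
return 0.0
# Build the character 3-gram sets; texts shorter than 3 characters yield an empty set and score 0.0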
set1 = set(t1[i:i+3] for i in range(len(t1)-2)) # 3-gram
set2 = set(t2[i:i+3] for i in range(len(t2)-2))
if not set1 or not set2:
return 0.0
intersection = len(set1 & set2)
union = len(set1 | set2)
return (intersection / union * 100) if union > 0 else 0.0
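As a quick illustration of the measure used above, here is a minimal standalone sketch of character n-gram Jaccard similarity. The function name `ngram_jaccard` and the `n` parameter are illustrative and not part of the class; preprocessing is left out.

```python
def ngram_jaccard(text1: str, text2: str, n: int = 3) -> float:
    """Jaccard similarity over character n-gram sets, as a percentage (0-100)."""
    grams1 = {text1[i:i + n] for i in range(len(text1) - n + 1)}
    grams2 = {text2[i:i + n] for i in range(len(text2) - n + 1)}
    if not grams1 or not grams2:
        # At least one text is shorter than n characters.
        return 0.0
    return len(grams1 & grams2) / len(grams1 | grams2) * 100
```

For `"abcd"` and `"bcde"` the 3-gram sets are `{abc, bcd}` and `{bcd, cde}`: one shared gram out of three distinct grams, so the score is about 33.3. Since sets discard repetition, a long passage that repeats a few phrases scores the same as a short one containing them once.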
def _extract_search_keywords(self, sentence: str) -> str:
"""Build a short search query from a sentence: strip punctuation, collapse spaces, keep at most 30 characters."""
# 去除标点和多余空格
text = re.sub(r'[,。!?、;:""''()【】《》]', ' ', sentence)
text = ' '.join(text.split()) # 合并多余空格
# 如果句子太长,截取前30个字符
if len(text) > 30:
text = text[:30]
return text
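def _compute_keyword_overlap(self, text1: str, text2: str) -> float:
"""Compute keyword overlap as a Jaccard percentage (0-100) over whitespace-separated tokens."""
# _extract_search_keywords strips punctuation and keeps at most 30 characters, so only the start of each text is compared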
kw1 = set(self._extract_search_keywords(text1).split())
kw2 = set(self._extract_search_keywords(text2).split())
if not kw1 or not kw2:
return 0.0
# 计算 Jaccard
intersection = len(kw1 & kw2)
union = len(kw1 | kw2)
return (intersection / union * 100) if union > 0 else 0.0
def _compute_hash(self, text: str) -> str:
"""计算文本的 MD5 哈希"""
return hashlib.md5(text.encode('utf-8')).hexdigest()
def _load_history(self) -> List[Dict]:
"""加载历史文章记录"""
if not self._history_file.exists():
return []
try:
with open(self._history_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return []
def _save_to_history(self, content_hash: str, title: str = ""):
"""保存文章到历史记录"""
history = self._load_history()
# 检查是否已存在
for item in history:
if item.get("hash") == content_hash:
return # 已存在,不重复添加
# Append the new record (the module-level `time` import is used)
history.append({
"hash": content_hash,
"title": title,
"timestamp": int(time.time())
})
# 只保留最近 100 篇
history = history[-100:]
try:
with open(self._history_file, 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=False, indent=2)
except Exception:
pass
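def verify_facts(self, content: str) -> Dict:
"""Extract data points (percentages only). Nothing is verified yet: `verified` and `issues` are always empty."""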
data_points = self._extract_data_points(content)
return {"total_points": len(data_points), "verified": [], "issues": []}
def check_prohibited_content(self, content: str) -> Dict:
"""检查违规"""
violations = []
content_lower = content.lower()
for word in self.PROHIBITED_WORDS:
# Lowercase the word as well, so entries containing uppercase letters still match
if word.lower() in content_lower:
violations.append({"word": word})
return {"has_violations": len(violations) > 0, "violations": violations}
def _extract_data_points(self, content: str) -> List[Dict]:
percentages = re.findall(r'(\d+(?:\.\d+)?)\s*%', content)
return [{"type": "percentage", "value": p} for p in percentages]
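def auto_fix(self, article: Dict, issues: List[Dict]) -> Dict:
"""Placeholder for auto-fix: returns a shallow copy of the article with fixed=True; `issues` is ignored and the content is unchanged."""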
fixed = article.copy()
fixed["fixed"] = True
return fixed
def clear_history(self) -> Dict:
"""清除历史记录"""
try:
if self._history_file.exists():
self._history_file.unlink()
return {"success": True, "message": "历史记录已清除"}
except Exception as e:
return {"success": False, "message": str(e)}
FILE:src/skills/draft_skill.py
"""
WeChat Official Account automation - draft box management Skill
"""
import logging
from typing import List, Dict, Optional
from .base_skill import BaseSkill
# 配置日志
logger = logging.getLogger(__name__)
class DraftSkill(BaseSkill):
"""草稿箱管理"""
def list_drafts(self, offset: int = 0, count: int = 20, no_content: bool = False) -> Dict:
"""获取草稿列表"""
# 参数验证
if not isinstance(offset, int) or offset < 0:
logger.error(f"无效的offset: {offset}")
raise ValueError("offset必须是大于等于0的整数")
if not isinstance(count, int) or count < 1 or count > 20:
logger.warning(f"无效的count: {count}, 使用默认值20")
count = 20
if not isinstance(no_content, bool):
logger.warning(f"无效的no_content: {no_content}, 使用默认值False")
no_content = False
try:
logger.info(f"获取草稿列表: offset={offset}, count={count}, no_content={no_content}")
data = {"offset": offset, "count": count, "no_content": 1 if no_content else 0}
result = self.post("/cgi-bin/draft/batchget", data)
drafts = result.get("item", [])
total_count = result.get("total_count", 0)
logger.info(f"草稿列表获取成功: 总数={total_count}, 本次返回={len(drafts)}")
return {"total_count": total_count, "drafts": drafts}
except Exception as e:
logger.error(f"获取草稿列表失败: {str(e)}", exc_info=True)
raise
def get_draft(self, media_id: str) -> Dict:
"""获取草稿详情"""
# 参数验证
if not media_id or not isinstance(media_id, str):
logger.error("无效的media_id参数")
raise ValueError("media_id不能为空且必须是字符串")
if len(media_id) < 10:
logger.error(f"media_id格式可能无效: {media_id}")
raise ValueError("media_id格式无效")
try:
logger.info(f"获取草稿详情: media_id={media_id}")
result = self.post("/cgi-bin/draft/get", {"media_id": media_id})
logger.info(f"草稿详情获取成功: media_id={media_id}")
return result
except Exception as e:
logger.error(f"获取草稿详情失败: {str(e)}", exc_info=True)
raise
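def create_draft(self, articles: List[Dict], auto_upload_thumb: bool = False) -> Dict:
"""Create a draft.
Args:
articles: List of articles; each needs fields such as title and content.
auto_upload_thumb: Whether to upload a cover image automatically. If True and the
article has no thumb_media_id, the first image found in the article HTML is downloaded
and uploaded as the cover (only http(s) URLs in a double-quoted src are handled).
"""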
# 参数验证
if not articles or not isinstance(articles, list):
logger.error("无效的articles参数: articles不能为空且必须是列表")
raise ValueError("articles不能为空且必须是列表")
if len(articles) > 8:
logger.warning(f"文章数量过多({len(articles)}),微信限制最多8篇")
articles = articles[:8]
# 验证每篇文章的结构,并自动处理封面图
for i, article in enumerate(articles):
if not isinstance(article, dict):
logger.error(f"无效的文章结构: 第{i+1}篇不是字典")
raise ValueError(f"第{i+1}篇文章结构无效")
# A missing or empty title only logs a warning; the article is still submitted
if "title" not in article or not article.get("title"):
logger.warning(f"第{i+1}篇文章缺少标题")
# 检查 thumb_media_id
thumb_id = article.get("thumb_media_id", "")
if not thumb_id and auto_upload_thumb:
# 自动从HTML内容中提取第一张图片并上传作为封面
content = article.get("content", "")
if content:
import re
img_matches = re.findall(r'<img[^>]*src="([^"]+)"', content)
if img_matches:
first_img_url = img_matches[0]
logger.info(f"[{i+1}] 自动提取封面图: {first_img_url[:50]}...")
# 下载并上传
try:
from .material_skill import MaterialSkill
mat = MaterialSkill()
import requests
import uuid
from pathlib import Path
cache = Path.home() / ".cache" / "wechat-mp-auto" / "images"
cache.mkdir(parents=True, exist_ok=True)
if first_img_url.startswith("http"):
resp = requests.get(first_img_url, timeout=30)
if resp.status_code == 200:
local_file = cache / f"thumb_{uuid.uuid4().hex[:8]}.jpg"
with open(local_file, "wb") as f:
f.write(resp.content)
upload_result = mat.upload_image(str(local_file))
new_thumb_id = upload_result.get("media_id", "")
if new_thumb_id:
article["thumb_media_id"] = new_thumb_id
logger.info(f"[{i+1}] 封面上传成功: {new_thumb_id[:20]}...")
except Exception as e:
logger.warning(f"[{i+1}] 自动封面上传失败: {e}")
try:
logger.info(f"创建草稿: articles_count={len(articles)}")
result = self.post("/cgi-bin/draft/add", {"articles": articles})
if "media_id" in result:
logger.info(f"草稿创建成功: media_id={result['media_id']}")
else:
logger.warning(f"草稿创建返回结果异常: {result}")
return result
except Exception as e:
logger.error(f"创建草稿失败: {str(e)}", exc_info=True)
raise
def update_draft(self, media_id: str, article: Dict, index: int = 0) -> Dict:
"""更新草稿"""
# 参数验证
if not media_id or not isinstance(media_id, str):
logger.error("无效的media_id参数")
raise ValueError("media_id不能为空且必须是字符串")
if len(media_id) < 10:
logger.error(f"media_id格式可能无效: {media_id}")
raise ValueError("media_id格式无效")
if not article or not isinstance(article, dict):
logger.error("无效的article参数: article必须是字典")
raise ValueError("article必须是字典")
if not isinstance(index, int) or index < 0:
logger.error(f"无效的index: {index}")
raise ValueError("index必须是大于等于0的整数")
try:
logger.info(f"更新草稿: media_id={media_id}, index={index}")
result = self.post("/cgi-bin/draft/update", {
"media_id": media_id,
"index": index,
"articles": article
})
logger.info(f"草稿更新成功: media_id={media_id}")
return result
except Exception as e:
logger.error(f"更新草稿失败: {str(e)}", exc_info=True)
raise
def delete_draft(self, media_id: str) -> Dict:
"""删除草稿"""
# 参数验证
if not media_id or not isinstance(media_id, str):
logger.error("无效的media_id参数")
raise ValueError("media_id不能为空且必须是字符串")
if len(media_id) < 10:
logger.error(f"media_id格式可能无效: {media_id}")
raise ValueError("media_id格式无效")
try:
logger.info(f"删除草稿: media_id={media_id}")
result = self.post("/cgi-bin/draft/delete", {"media_id": media_id})
logger.info(f"草稿删除成功: media_id={media_id}")
return result
except Exception as e:
logger.error(f"删除草稿失败: {str(e)}", exc_info=True)
raise
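The cover auto-upload in `create_draft` depends on pulling an image URL out of the article HTML. The sketch below is one hedged variation on that step, not the code the skill runs: `first_remote_image` is a hypothetical helper that accepts single- or double-quoted `src` attributes, ignores `data-src`, and skips images whose source is not an http(s) URL instead of giving up on the first one.

```python
import re
from typing import Optional

# Require whitespace before "src" so attributes such as data-src are not matched.
_IMG_SRC = re.compile(r"""<img\b[^>]*?\ssrc\s*=\s*(["'])(.*?)\1""", re.IGNORECASE | re.DOTALL)


def first_remote_image(html: str) -> Optional[str]:
    """Return the first <img> src that is an http(s) URL, or None."""
    for match in _IMG_SRC.finditer(html):
        src = match.group(2).strip()
        if src.startswith(("http://", "https://")):
            return src
    return None
```

A regular expression is only adequate for the fairly regular HTML this pipeline produces itself; arbitrary third-party HTML would be better served by a real parser.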
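FILE:src/skills/image_generator.py
"""
WeChat Official Account automation - image generation Skill
Image sources:
1. AI image generation (called after the user selects a model)
2. Pexels/Unsplash stock photo search
"""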
import os
import uuid
import time
import json
import logging
from pathlib import Path
from typing import List, Dict, Optional
from .base_skill import BaseSkill
from config import Config
# 配置日志
logger = logging.getLogger(__name__)
# 缓存路径和TTL
CACHE_FILE = Path.home() / ".cache" / "wechat-mp-auto" / "image_models_cache.json"
CACHE_TTL = 24 * 3600 # 24小时
# 常用生图模型注册表(按 provider 分组)
# 每个条目包含:图像生成 API 路径、认证方式、请求格式
IMAGE_GEN_PROVIDER_MAP = {
# === 国内 ===
"ali-bailian": {
"image_api_path": "/services/aigc/text2image/image-synthesis",
"model": "wanx2.1",
"auth_type": "bearer",
"req_format": "wanx",
"display_name": "通义万图(wanx2.1)",
},
"minimax-cn": {
"image_api_path": "/images/generations",
"model": "image-01",
"auth_type": "bearer",
"req_format": "openai_like",
"display_name": "MiniMax Image-01",
},
"baidu": {
"image_api_path": "/rest/2.0/ernie-vilg/v2/text2image",
"model": "ernie-vilg-v2",
"auth_type": "bearer",
"req_format": "baidu",
"display_name": "百度文心一格",
},
"tencent": {
"image_api_path": "/hunyuan/v1/ai_image",
"model": "hunyuan-image",
"auth_type": "hmac_sha1",
"req_format": "tencent",
"display_name": "腾讯混元",
},
"sensetime": {
"image_api_path": "/v1/visionprotect/risenlp/nlpcg/diffusion",
"model": "nova-smooth",
"auth_type": "bearer",
"req_format": "openai_like",
"display_name": "商汤(nova-smooth)",
},
"bytedance": {
"image_api_path": "/cv/sdxl/txt2img",
"model": "sdxl-txt2img",
"auth_type": "bearer",
"req_format": "openai_like",
"display_name": "字节豆包(SDXL)",
},
"zhipu": {
"image_api_path": "/api/paulgraham/t2i",
"model": "cogview-4",
"auth_type": "bearer",
"req_format": "openai_like",
"display_name": "智谱 CogView-4",
},
# === 国外 ===
"openai": {
"image_api_path": "/v1/images/generations",
"model": "dall-e-3",
"auth_type": "bearer",
"req_format": "openai_dalle",
"display_name": "OpenAI DALL-E 3",
},
"google": {
"image_api_path": "/publishers/google/models/imagen-3/image:predict",
"model": "imagen-3",
"auth_type": "bearer",
"req_format": "google_imagen",
"display_name": "Google Imagen 3",
},
"stability-ai": {
"image_api_path": "/v1/generation/stable-diffusion-xl-1024-v1-0/text-to-image",
"model": "stable-diffusion-xl-1024-v1-0",
"auth_type": "bearer",
"req_format": "sd_api",
"display_name": "Stability AI SDXL",
},
"replicate": {
"image_api_path": "/v1/predictions",
"model": "flux-schnell",
"auth_type": "bearer",
"req_format": "replicate",
"display_name": "Replicate Flux",
},
"aws-bedrock": {
"image_api_path": "/imagegeneration/stabilityai",
"model": "stability.stable-diffusion-xl-v1",
"auth_type": "aws_sigv4",
"req_format": "aws_bedrock",
"display_name": "AWS Bedrock SDXL",
},
"azure-openai": {
"image_api_path": "/openai/deployments/dall-e-3/images/generations?api-version=2024-02-01",
"model": "dall-e-3",
"auth_type": "api_key",
"req_format": "azure_dalle",
"display_name": "Azure OpenAI DALL-E 3",
},
}
# 不支持生图的知名模型(仅作说明,不会被添加到列表)
# - anthropic/claude 系列:纯文本+视觉理解,无生图API
# - openai/gpt-4o / gpt-4-turbo:视觉模型但无生图API
class ImageGeneratorSkill(BaseSkill):
"""配图生成 - 支持 AI 生图 + Pexels/Unsplash 图库"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "images"
try:
self._cache_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.error(f"创建缓存目录失败: {str(e)}")
raise
self._pexels_api_key = os.environ.get("PEXELS_API_KEY", "")
self._unsplash_api_key = os.environ.get("UNSPLASH_API_KEY", "")
self._config = Config()
prefs = self._config.get_image_preferences()
self._image_source = prefs.get("image_source")
self._ai_model = prefs.get("ai_model")
logger.info(f"图片来源偏好: {self._image_source}, AI模型偏好: {self._ai_model}")
# ==================== 引导与选择 ====================
def get_image_source_options(self) -> Dict:
"""
返回图片来源选择提示信息。
供调用方引导用户选择图片来源。
"""
has_pexels = bool(self._pexels_api_key)
has_unsplash = bool(self._unsplash_api_key)
search_desc = "从 Pexels/Unsplash 图库搜索免费图片"
if not has_pexels and not has_unsplash:
search_desc = "需配置 PEXELS_API_KEY 和/或 UNSPLASH_API_KEY 环境变量"
return {
"need_user_choice": True,
"choice_type": "image_source",
"message": "请选择封面图和插图的图片来源",
"options": [
{
"id": "ai",
"name": "AI生图",
"description": "调用AI模型生成图片"
},
{
"id": "search",
"name": "图片接口检索",
"description": search_desc,
"disabled": not (has_pexels or has_unsplash)
}
]
}
def _read_cache(self) -> Optional[List[Dict]]:
"""从缓存读取已探测的生图模型列表"""
if not CACHE_FILE.exists():
return None
try:
with open(CACHE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
ts = data.get("ts", 0)
if time.time() - ts < CACHE_TTL:
models = data.get("models", [])
logger.info(f"从缓存读取到 {len(models)} 个已探测的生图模型")
return models
else:
logger.debug("缓存已过期,需要重新探测")
except Exception as e:
logger.warning(f"读取生图模型缓存失败: {e}")
return None
def _write_cache(self, models: List[Dict]):
"""将探测结果写入缓存"""
try:
CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(CACHE_FILE, "w", encoding="utf-8") as f:
json.dump({"ts": time.time(), "models": models}, f, ensure_ascii=False, indent=2)
logger.debug(f"已写入缓存: {len(models)} 个生图模型")
except Exception as e:
logger.warning(f"写入生图模型缓存失败: {e}")
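The two cache helpers above implement a timestamped JSON cache with a time-to-live. A minimal standalone sketch of the same idea follows; `read_cache`, `write_cache`, and the injectable `now` argument are illustrative names rather than the skill's API, and the file layout (`ts` plus `models`) is taken from the code above.

```python
import json
import time
from pathlib import Path
from typing import List, Optional


def write_cache(path: Path, models: List[dict]) -> None:
    """Store the model list together with the time it was written."""
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {"ts": time.time(), "models": models}
    path.write_text(json.dumps(payload, ensure_ascii=False), encoding="utf-8")


def read_cache(path: Path, ttl: float, now: Optional[float] = None) -> Optional[List[dict]]:
    """Return cached models, or None if the file is missing, unreadable, or stale."""
    now = time.time() if now is None else now
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file, permission problem, or invalid JSON all count as a miss.
        return None
    if not isinstance(data, dict) or now - data.get("ts", 0) >= ttl:
        return None
    return data.get("models", [])
```

Returning `None` for a miss and a list (possibly empty) for a hit matters here: an empty list is a legitimate cached answer meaning "no usable model was found", and callers can tell the two apart with `is not None`.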
def _prefilter_models(self) -> List[Dict]:
"""
初筛:从 OpenClaw 配置中读取所有模型,
过滤出可能是生图模型的候选(input含image 或 api含image)。
"""
candidates = []
try:
config_file = Path.home() / ".openclaw" / "openclaw.json"
if not config_file.exists():
logger.warning("未找到 OpenClaw 配置文件")
return []
with open(config_file, "r", encoding="utf-8") as f:
data = json.load(f)
providers = data.get("models", {}).get("providers", {})
for provider, cfg in providers.items():
api_type = cfg.get("api", "")
base_url = cfg.get("baseUrl", "")
# 只处理注册表中存在的 provider
if provider not in IMAGE_GEN_PROVIDER_MAP:
continue
for m in cfg.get("models", []):
model_id = m.get("id", "")
model_name = m.get("name", model_id)
inputs = m.get("input", [])
# 初筛条件:input 包含 "image" 或 api 类型包含 "image"
is_candidate = (
("image" in inputs) or
("image" in api_type.lower())
)
if is_candidate:
candidates.append({
"id": model_id,
"name": model_name,
"provider": provider,
"base_url": base_url,
})
logger.debug(f"初筛候选生图模型: {provider}/{model_id}")
logger.info(f"初筛出 {len(candidates)} 个候选生图模型")
except Exception as e:
logger.error(f"读取 OpenClaw 模型配置失败: {e}")
return candidates
def _get_credential(self, model_id: str) -> Optional[Dict]:
"""根据模型 ID 从 OpenClaw 配置获取 provider 的 API 凭证"""
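try:
# Read credentials; start from an empty dict so that a missing file cannot leave `creds` undefined
creds = {}
cred_file = Path.home() / ".openclaw" / "credentials" / "api-keys.json"
if cred_file.exists():
with open(cred_file, "r", encoding="utf-8") as f:
creds = json.load(f)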
# 读 model 配置,找 provider
config_file = Path.home() / ".openclaw" / "openclaw.json"
if config_file.exists():
with open(config_file) as f:
config = json.load(f)
providers = config.get("models", {}).get("providers", {})
for pname, pcfg in providers.items():
for m in pcfg.get("models", []):
if m.get("id") == model_id:
api_key = creds.get(pname, {}).get("apiKey", "")
return {
"provider": pname,
"apiKey": api_key,
"baseUrl": pcfg.get("baseUrl", ""),
}
except Exception as e:
logger.warning(f"读取 OpenClaw 凭证失败: {e}")
return None
def _build_probe_request(self, provider: str, req_format: str) -> Dict:
"""构造探测请求(最小化 prompt,低分辨率)"""
prompt = "a simple red circle" # 最简 prompt
size = "512x512"
if req_format == "openai_dalle":
return {"model": "dall-e-3", "prompt": prompt, "n": 1, "size": "1024x1024"}
elif req_format == "google_imagen":
return {"prompt": prompt, "image_size": {"height": 512, "width": 512}, "sample_count": 1}
elif req_format == "wanx":
return {"model": "wanx2.1", "input": {"prompt": prompt}, "parameters": {"size": size, "n": 1}}
elif req_format == "sd_api":
return {"text_prompts": [{"text": prompt}], "cfg_scale": 7.5, "height": 512, "width": 512}
elif req_format == "replicate":
# flux-schnell 的探测格式
return {"version": "acl IpShpGBJlsNMENjkEZJDlFNMJAEqSby", "input": {"prompt": prompt, "num_outputs": 1}}
elif req_format == "baidu":
return {"text": prompt, "image_size": "512*512", "style": "adv_flat"}
elif req_format == "tencent":
return {"prompt": prompt, "width": 512, "height": 512, "version": "v1.5"}
elif req_format == "azure_dalle":
return {"prompt": prompt, "n": 1, "size": "1024x1024"}
elif req_format == "aws_bedrock":
return {"text_prompts": [{"text": prompt}], "cfg_scale": 7.5, "height": 512, "width": 512}
else:
# 默认 OpenAI 兼容格式
return {"model": "image-01", "prompt": prompt, "n": 1, "size": size}
def _parse_probe_response(self, provider: str, req_format: str, resp_data: Dict) -> Optional[str]:
"""解析探测响应,提取图片 URL"""
if req_format == "wanx":
return resp_data.get("output", {}).get("image_url")
elif req_format in ("openai_like", "openai_dalle", "azure_dalle"):
return (resp_data.get("data") or [{}])[0].get("url")
elif req_format == "google_imagen":
predictions = resp_data.get("predictions", [])
if predictions:
return predictions[0].get("bytesBase64Encoded")
return None
elif req_format == "sd_api":
artifacts = resp_data.get("artifacts", [])
if artifacts:
return artifacts[0].get("base64")
return None
elif req_format == "replicate":
# Replicate 返回的是 prediction 对象
if resp_data.get("status") == "succeeded":
output = resp_data.get("output")
if isinstance(output, list) and output:
return output[0]
return output
return None
elif req_format == "baidu":
return (resp_data.get("data") or [{}])[0].get("url")
elif req_format == "tencent":
return resp_data.get("data", {}).get("image_url")
elif req_format == "aws_bedrock":
artifacts = resp_data.get("artifacts", [])
if artifacts:
return artifacts[0].get("base64")
return None
return None
def _probe_model(self, model: Dict) -> bool:
"""
探测单个模型是否具备生图能力。
返回 True 表示可用,返回 False 表示不具备或探测失败。
"""
provider = model.get("provider", "")
model_id = model.get("id", "")
base_url = model.get("base_url", "")
if provider not in IMAGE_GEN_PROVIDER_MAP:
logger.debug(f"Provider {provider} 不在注册表中,跳过探测")
return False
provider_info = IMAGE_GEN_PROVIDER_MAP[provider]
api_path = provider_info.get("image_api_path", "")
auth_type = provider_info.get("auth_type", "bearer")
req_format = provider_info.get("req_format", "openai_like")
# 获取 API Key
creds = self._get_credential(model_id)
if not creds or not creds.get("apiKey"):
logger.warning(f"模型 {model_id} 缺少 API Key,跳过探测")
return False
api_key = creds.get("apiKey", "")
# 构造请求
url = f"{base_url}{api_path}"
headers = {"Content-Type": "application/json"}
if auth_type == "bearer":
headers["Authorization"] = f"Bearer {api_key}"
elif auth_type == "api_key":
headers["api-key"] = api_key
payload = self._build_probe_request(provider, req_format)
try:
import requests
logger.info(f"探测生图能力: {provider}/{model_id} -> {url}")
resp = requests.post(url, json=payload, headers=headers, timeout=60)
if resp.status_code not in (200, 201):
logger.warning(f"探测失败 [{provider}/{model_id}]: HTTP {resp.status_code} - {resp.text[:200]}")
return False
data = resp.json()
image_url = self._parse_probe_response(provider, req_format, data)
if image_url:
logger.info(f"✓ 探测成功 [{provider}/{model_id}]: 具备生图能力")
return True
else:
logger.warning(f"探测响应无图片 [{provider}/{model_id}]: {str(data)[:200]}")
return False
except Exception as e:
logger.warning(f"探测异常 [{provider}/{model_id}]: {e}")
return False
def _get_openclaw_models(self) -> List[Dict]:
"""
获取已配置且具备生图能力的模型列表。
流程:缓存 → 初筛 → 探测 → 缓存结果
"""
# 1. 先尝试从缓存读取
cached = self._read_cache()
if cached is not None:
return cached
# 2. 初筛候选模型
candidates = self._prefilter_models()
if not candidates:
logger.info("无候选生图模型(初筛为空)")
self._write_cache([])
return []
# 3. 逐个探测生图能力
valid_models = []
for model in candidates:
if self._probe_model(model):
valid_models.append(model)
# 4. 写入缓存
self._write_cache(valid_models)
logger.info(f"生图模型探测完成: {len(valid_models)}/{len(candidates)} 个可用")
return valid_models
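The cache, prefilter, probe, cache flow above can be expressed independently of any provider. In this minimal sketch the four steps are passed in as plain callables; `discover_models` and its parameter names are hypothetical and exist only to show the control flow.

```python
from typing import Callable, Dict, List, Optional

Model = Dict[str, str]


def discover_models(
    read_cache: Callable[[], Optional[List[Model]]],
    prefilter: Callable[[], List[Model]],
    probe: Callable[[Model], bool],
    write_cache: Callable[[List[Model]], None],
) -> List[Model]:
    """Return usable models, probing only when the cache has no answer."""
    cached = read_cache()
    if cached is not None:  # an empty list is still a valid cached answer
        return cached
    valid = [model for model in prefilter() if probe(model)]
    write_cache(valid)
    return valid
```

Each probe in the original issues a real image generation request, so caching the result (including an empty result) avoids repeating that cost on every call.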
def get_ai_model_options(self) -> Dict:
"""
返回可选的AI生图模型列表。
从 OpenClaw 配置动态读取并探测已配置的生图模型。
"""
models = self._get_openclaw_models()
if models:
return {
"need_user_choice": True,
"choice_type": "ai_model",
"message": f"已探测到 {len(models)} 个具备生图能力的模型,请选择",
"options": [{"id": m["id"], "name": f"{m['name']} ({m['provider']})"} for m in models]
}
# 没有生图模型时,给出明确提示
return {
"need_user_choice": False,
"choice_type": "ai_model",
"message": (
"⚠️ 当前 OpenClaw 未配置任何生图模型,或配置的模型探测失败。\n"
"请先在 OpenClaw 中配置生图模型(如阿里云通义万图 wanx2.1、MiniMax image-01、"
"OpenAI dall-e-3 等),或改用「图片接口检索」方式获取图片。\n\n"
"支持的生图模型 Provider:\n"
"国内:阿里云(ali-bailian/wanx)、MiniMax、百度、腾讯混元、智谱、商汤、字节\n"
"国外:OpenAI(DALL-E 3)、Google(Imagen 3)、Stability AI、Replicate、AWS Bedrock、Azure OpenAI"
),
"options": []
}
def set_user_choice(self, source: Optional[str] = None, model_id: Optional[str] = None):
"""
保存用户选择到配置文件。
Args:
source: 图片来源,"ai" 或 "search"
model_id: AI模型ID(仅 source="ai" 时需要)
"""
if source:
self._config.set_image_source_preference(source)
self._image_source = source
if source == "search":
self._ai_model = None
if model_id:
self._config.set_ai_model_preference(model_id)
self._ai_model = model_id
def _check_and_prompt_selection(self, img_type: str) -> Dict:
"""
检查用户是否已做选择,未选择则返回选择提示。
每次都重新读取配置文件,确保获取最新偏好状态。
返回 {"proceed": False, "choice_info": {...}} 或 {"proceed": True}
"""
# 重新读取配置,确保拿到最新偏好
prefs = self._config.get_image_preferences()
image_source = prefs.get("image_source")
ai_model = prefs.get("ai_model")
if image_source is None:
return {
"proceed": False,
"choice_info": self.get_image_source_options()
}
if image_source == "ai" and ai_model is None:
return {
"proceed": False,
"choice_info": self.get_ai_model_options()
}
return {"proceed": True}
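The gating rule above reduces to a small decision function. This is a minimal sketch with a hypothetical name, `next_required_choice`; it returns which choice is still missing rather than the full prompt dictionary the method builds.

```python
from typing import Dict, Optional


def next_required_choice(prefs: Dict[str, Optional[str]]) -> Optional[str]:
    """Return "image_source", "ai_model", or None when nothing is missing."""
    source = prefs.get("image_source")
    if source is None:
        return "image_source"
    if source == "ai" and prefs.get("ai_model") is None:
        # A model is only required for AI generation, not for stock photo search.
        return "ai_model"
    return None
```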
# ==================== 生图主方法 ====================
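def generate_cover(self, title: str, keywords: Optional[List[str]]) -> Optional[str]:
"""Generate a cover image; returns the local file path, or None if no image could be produced."""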
if not title or not isinstance(title, str):
raise ValueError("title不能为空且必须是字符串")
if keywords is not None:
if not isinstance(keywords, list):
raise ValueError("keywords必须是列表")
keywords = keywords[:10]
try:
logger.info(f"开始生成封面图: {title[:50]}...")
cover_keywords = self._extract_cover_keywords(title, keywords)
if self._image_source == "ai":
img_path = self._generate_by_ai(title, "cover")
if img_path:
logger.info(f"AI生成封面图成功: {img_path}")
return img_path
elif self._image_source == "search":
images = None
for retry in range(3):
images = self._search_all(cover_keywords)
if images:
break
logger.info(f"封面搜图第{retry+1}次为空,重试...")
if images:
img_path = self._download_image(images[0], "cover", max_width=900, max_height=500)
if img_path:
logger.info(f"图库下载封面图成功: {img_path}")
return img_path
logger.warning(f"未能生成封面图: {title[:50]}...")
return None
except Exception as e:
logger.error(f"生成封面图失败: {str(e)}", exc_info=True)
raise
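def generate_illustration(self, section: str, keywords: Optional[List[str]]) -> Optional[str]:
"""Generate an illustration; returns the local file path, or None if no image could be produced."""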
if not section or not isinstance(section, str):
raise ValueError("section不能为空且必须是字符串")
if keywords is not None:
if not isinstance(keywords, list):
raise ValueError("keywords必须是列表")
try:
logger.info(f"开始生成插图: {section[:50]}...")
illust_keywords = self._extract_illustration_keywords(section, keywords)
if self._image_source == "ai":
img_path = self._generate_by_ai(section, "illustration")
if img_path:
logger.info(f"AI生成插图成功: {img_path}")
return img_path
elif self._image_source == "search":
images = None
for retry in range(3):
images = self._search_all(illust_keywords)
if images:
break
logger.info(f"插图搜图第{retry+1}次为空,重试...")
if images:
img_path = self._download_image(images[0], "illustration", max_width=900, max_height=400)
if img_path:
logger.info(f"图库下载插图成功: {img_path}")
return img_path
logger.warning(f"未能生成插图: {section[:50]}...")
return None
except Exception as e:
logger.error(f"生成插图失败: {str(e)}", exc_info=True)
raise
# ==================== AI 生图 ====================
def _generate_by_ai(self, prompt: str, img_type: str) -> Optional[str]:
"""根据用户选择的模型调用对应的图像生成 API"""
if not self._ai_model:
logger.warning("未选择AI模型,无法生成")
return None
full_prompt = self._build_ai_prompt(prompt, img_type)
size = "1792x1024" if img_type == "cover" else "1024x1024"
# 读取 OpenClaw credentials 获取 API key
creds = self._get_credential(self._ai_model)
if not creds:
logger.warning(f"未找到模型 {self._ai_model} 的 API 凭证")
return None
api_key = creds.get("apiKey", "")
provider = creds.get("provider", "")
base_url = creds.get("baseUrl", "")
# 根据 provider 调用对应的图像生成 API
if provider == "ali-bailian":
return self._generate_by_wanx(full_prompt, size, api_key, base_url)
elif provider == "minimax-cn":
return self._generate_by_minimax(full_prompt, size, api_key, base_url)
elif provider == "moonshot":
return self._generate_by_moonshot_vl(full_prompt, size, api_key, base_url)
elif provider == "ollama":
return self._generate_by_ollama(full_prompt, size, api_key, base_url)
elif provider == "openai":
return self._generate_by_openai_dalle(full_prompt, size, api_key, base_url)
elif provider == "google":
return self._generate_by_google_imagen(full_prompt, size, api_key, base_url)
elif provider == "stability-ai":
return self._generate_by_stability(full_prompt, size, api_key, base_url)
elif provider == "azure-openai":
return self._generate_by_azure_dalle(full_prompt, size, api_key, base_url)
elif provider == "aws-bedrock":
return self._generate_by_aws_bedrock(full_prompt, size, api_key, base_url)
elif provider == "replicate":
return self._generate_by_replicate(full_prompt, size, api_key, base_url)
elif provider == "baidu":
return self._generate_by_baidu(full_prompt, size, api_key, base_url)
elif provider == "tencent":
return self._generate_by_tencent(full_prompt, size, api_key, base_url)
elif provider == "zhipu":
return self._generate_by_zhipu(full_prompt, size, api_key, base_url)
elif provider == "sensetime":
return self._generate_by_sensetime(full_prompt, size, api_key, base_url)
elif provider == "bytedance":
return self._generate_by_bytedance(full_prompt, size, api_key, base_url)
else:
logger.warning(f"Provider {provider} 的图像生成 API 暂未实现")
return None
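The provider chain above always calls a handler with the same four arguments, so it could also be written as a lookup table. The sketch below shows that alternative under stated assumptions: `dispatch_generation`, `HANDLERS`, and `fake_wanx` are hypothetical names, and the handler is a stand-in that makes no network call.

```python
from typing import Callable, Dict, Optional

# A handler takes (prompt, size, api_key, base_url) and returns a local path or None.
Handler = Callable[[str, str, str, str], Optional[str]]


def dispatch_generation(handlers: Dict[str, Handler], provider: str, prompt: str,
                        size: str, api_key: str, base_url: str) -> Optional[str]:
    """Look up the handler for a provider and call it, or return None if unknown."""
    handler = handlers.get(provider)
    if handler is None:
        # Unknown provider: same outcome as the final else branch, no image.
        return None
    return handler(prompt, size, api_key, base_url)


def fake_wanx(prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
    # Hypothetical stand-in used only for this example.
    return f"/tmp/wanx_{size}.png"


HANDLERS: Dict[str, Handler] = {"ali-bailian": fake_wanx}
```

With a table, adding a provider is a one-line registration, and the set of supported providers can be compared against `IMAGE_GEN_PROVIDER_MAP` to spot entries that have metadata but no handler.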
def _generate_by_wanx(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""通义万图(wanx)图像生成 API"""
try:
import requests
url = f"{base_url}/services/aigc/text2image/image-synthesis"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
payload = {
"model": "wanx2.1",
"input": {"prompt": prompt},
"parameters": {"size": size, "n": 1}
}
logger.info(f"调用 wanx 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"wanx API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = data.get("output", {}).get("image_url", "")
if image_url:
return self._download_from_url(image_url, "wanx")
logger.warning(f"wanx 未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"wanx 图像生成失败: {e}")
return None
def _generate_by_minimax(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""MiniMax 图像生成 API"""
try:
import requests
# MiniMax 使用 OpenAI 兼容格式
url = f"{base_url}/images/generations"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
# size is passed through unchanged, e.g. 1792x1024
payload = {
"model": "image-01",
"prompt": prompt,
"size": size,
"n": 1
}
logger.info(f"调用 MiniMax 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"MiniMax API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = (data.get("data") or [{}])[0].get("url", "")
if image_url:
return self._download_from_url(image_url, "minimax")
logger.warning(f"MiniMax 未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"MiniMax 图像生成失败: {e}")
return None
def _generate_by_moonshot_vl(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""Moonshot (Kimi) 视觉模型不支持图像生成,返回 None"""
logger.warning("Moonshot/Kimi 视觉模型暂不支持图像生成,请选择通义万图或 MiniMax")
return None
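def _generate_by_ollama(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""Ollama local model. Assumes an image-generating model (sdxs by default) is installed locally."""
try:
import requests
# Assumption: the local server returns an image (URL or data URI) from /api/generate.
# Stock Ollama serves text generation on this endpoint, so check that the installed model really returns image data.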
url = f"{base_url}/api/generate"
headers = {"Content-Type": "application/json"}
# 解析 size
width, height = map(int, size.split("x"))
payload = {
"model": "sdxs", # 默认使用 sdxs 模型,用户需确保本地已安装
"prompt": prompt,
"width": width,
"height": height,
"stream": False
}
logger.info(f"调用 Ollama 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=180)
if resp.status_code != 200:
logger.warning(f"Ollama API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = data.get("response", {})
# Ollama 图像生成可能返回 base64 或 URL
if isinstance(image_url, dict) and image_url.get("url"):
return self._download_from_url(image_url["url"], "ollama")
elif isinstance(image_url, str) and image_url.startswith("data:"):
# base64 格式,直接保存
import base64
img_data = image_url.split(",", 1)
if len(img_data) == 2:
img_bytes = base64.b64decode(img_data[1])
filename = f"ollama_{uuid.uuid4().hex[:8]}.png"
filepath = self._cache_dir / filename
with open(filepath, "wb") as f:
f.write(img_bytes)
return str(filepath)
logger.warning(f"Ollama 未返回有效图片: {str(data)[:200]}")
except Exception as e:
logger.error(f"Ollama 图像生成失败: {e}")
return None
def _generate_by_openai_dalle(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""OpenAI DALL-E 图像生成 API"""
try:
import requests
# OpenAI 兼容格式
url = f"{base_url}/v1/images/generations"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
payload = {
"model": "dall-e-3",
"prompt": prompt,
"n": 1,
# Pass the requested size through; the caller supplies 1792x1024 for covers and 1024x1024 otherwise
"size": size,
"response_format": "url"
}
logger.info(f"调用 OpenAI DALL-E 3 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"OpenAI DALL-E API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = (data.get("data") or [{}])[0].get("url", "")
if image_url:
return self._download_from_url(image_url, "openai")
logger.warning(f"OpenAI DALL-E 未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"OpenAI DALL-E 图像生成失败: {e}")
return None
def _generate_by_google_imagen(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""Google Imagen 图像生成 API"""
try:
import requests
# Google Vertex AI Imagen API
url = f"{base_url}/publishers/google/models/imagen-3/image:predict"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
width, height = map(int, size.split("x"))
payload = {
"prompt": prompt,
"image_size": {"height": height, "width": width},
"sample_count": 1,
"aspect_ratio": "16:9" if width > height else "1:1"
}
logger.info(f"调用 Google Imagen 3 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=180)
if resp.status_code != 200:
logger.warning(f"Google Imagen API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
predictions = data.get("predictions", [])
if predictions:
# Imagen 返回 base64
bytes_data = predictions[0].get("bytesBase64Encoded", "")
if bytes_data:
import base64
img_bytes = base64.b64decode(bytes_data)
filename = f"imagen_{uuid.uuid4().hex[:8]}.png"
filepath = self._cache_dir / filename
with open(filepath, "wb") as f:
f.write(img_bytes)
logger.info(f"Google Imagen 图片保存成功: {filepath}")
return str(filepath)
logger.warning(f"Google Imagen 未返回图片: {str(data)[:200]}")
except Exception as e:
logger.error(f"Google Imagen 图像生成失败: {e}")
return None
def _generate_by_stability(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""Stability AI 图像生成 API"""
try:
import requests
url = f"{base_url}/v1/generation/stable-diffusion-xl-1024-v1-0/text-to-image"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "application/json"
}
width, height = map(int, size.split("x"))
payload = {
"text_prompts": [{"text": prompt, "weight": 1.0}],
"cfg_scale": 7.5,
"height": min(height, 1024),
"width": min(width, 1024),
"samples": 1,
"steps": 30
}
logger.info(f"调用 Stability AI SDXL 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=180)
if resp.status_code != 200:
logger.warning(f"Stability AI API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
artifacts = data.get("artifacts", [])
if artifacts:
import base64
img_bytes = base64.b64decode(artifacts[0].get("base64", ""))
filename = f"stability_{uuid.uuid4().hex[:8]}.png"
filepath = self._cache_dir / filename
with open(filepath, "wb") as f:
f.write(img_bytes)
logger.info(f"Stability AI 图片保存成功: {filepath}")
return str(filepath)
logger.warning(f"Stability AI 未返回图片: {str(data)[:200]}")
except Exception as e:
logger.error(f"Stability AI 图像生成失败: {e}")
return None
def _generate_by_azure_dalle(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""Azure OpenAI DALL-E 图像生成 API"""
try:
import requests
# Azure OpenAI routes by deployment name, not model name; this URL assumes a deployment named "dall-e-3"
url = f"{base_url}/openai/deployments/dall-e-3/images/generations?api-version=2024-02-01"
headers = {
"api-key": api_key,
"Content-Type": "application/json",
}
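payload = {
"prompt": prompt,
"n": 1,
# DALL-E 3 accepts only these three sizes; fall back to square for anything else
"size": size if size in ("1024x1024", "1792x1024", "1024x1792") else "1024x1024"
}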
logger.info(f"调用 Azure OpenAI DALL-E 3 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"Azure DALL-E API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = data.get("data", [{}])[0].get("url", "")
if image_url:
return self._download_from_url(image_url, "azure")
logger.warning(f"Azure DALL-E 未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"Azure DALL-E 图像生成失败: {e}")
return None
def _generate_by_aws_bedrock(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""AWS Bedrock Stability AI 图像生成 API"""
try:
import requests
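# AWS Bedrock normally requires AWS Signature V4 request signing.
# NOTE: no credentials are attached below (api_key is unused), so this call
# only works behind a gateway that handles authentication itself.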
url = f"{base_url}/imagegeneration/stabilityai/stable-diffusion-xl-v1"
width, height = map(int, size.split("x"))
payload = {
"text_prompts": [{"text": prompt, "weight": 1.0}],
"cfg_scale": 7.5,
"height": min(height, 1024),
"width": min(width, 1024),
"steps": 30,
"samples": 1
}
logger.info(f"调用 AWS Bedrock SDXL 图像生成 API...")
# No Authorization header is sent; the request is neither SigV4-signed nor Basic-authenticated
headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
resp = requests.post(url, json=payload, headers=headers, timeout=180)
if resp.status_code != 200:
logger.warning(f"AWS Bedrock API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
artifacts = data.get("artifacts", [])
if artifacts:
import base64
img_bytes = base64.b64decode(artifacts[0].get("base64", ""))
filename = f"bedrock_{uuid.uuid4().hex[:8]}.png"
filepath = self._cache_dir / filename
with open(filepath, "wb") as f:
f.write(img_bytes)
logger.info(f"AWS Bedrock 图片保存成功: {filepath}")
return str(filepath)
logger.warning(f"AWS Bedrock 未返回图片: {str(data)[:200]}")
except Exception as e:
logger.error(f"AWS Bedrock 图像生成失败: {e}")
return None
def _generate_by_replicate(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""Replicate Flux 图像生成 API"""
try:
import requests
# Replicate 使用预测接口
url = f"{base_url}/v1/predictions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
payload = {
"version": "acl IpShpGBJlsNMENjkEZJDlFNMJAEqSby",
"input": {
"prompt": prompt,
"num_outputs": 1,
"aspect_ratio": "16:9" if "x" in size and int(size.split("x")[0]) > int(size.split("x")[1]) else "1:1",
"output_format": "jpg",
"safety_checker": "yes"
}
}
logger.info(f"调用 Replicate Flux 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=30)
if resp.status_code not in (200, 201):
logger.warning(f"Replicate API 返回 {resp.status_code}: {resp.text[:200]}")
return None
# Replicate 是异步的,需要轮询
prediction = resp.json()
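prediction_id = prediction.get("id")
# Without an id there is nothing to poll
if not prediction_id:
logger.warning(f"Replicate 未返回 prediction id: {str(prediction)[:200]}")
return None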
poll_url = f"{base_url}/v1/predictions/{prediction_id}"
import time
for _ in range(60): # 最多等 60 * 5 = 300 秒
time.sleep(5)
poll_resp = requests.get(poll_url, headers=headers, timeout=30)
if poll_resp.status_code == 200:
result = poll_resp.json()
if result.get("status") == "succeeded":
output = result.get("output")
if isinstance(output, list) and output:
image_url = output[0]
if image_url.startswith("http"):
return self._download_from_url(image_url, "replicate")
elif image_url.startswith("data:"):
# base64
import base64
img_data = image_url.split(",", 1)
if len(img_data) == 2:
img_bytes = base64.b64decode(img_data[1])
filename = f"replicate_{uuid.uuid4().hex[:8]}.jpg"
filepath = self._cache_dir / filename
with open(filepath, "wb") as f:
f.write(img_bytes)
return str(filepath)
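# Succeeded but returned no usable output: return here so the timeout warning below is not logged
logger.warning(f"Replicate 未返回有效图片: {str(output)[:200]}")
return None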
elif result.get("status") == "failed":
logger.warning(f"Replicate 生成失败: {result.get('error')}")
# Failure is already logged above; return so it is not also reported as a timeout
return None
logger.warning("Replicate 图像生成超时")
except Exception as e:
logger.error(f"Replicate 图像生成失败: {e}")
return None
def _generate_by_baidu(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""百度文心一格图像生成 API"""
try:
import requests
url = f"{base_url}/rest/2.0/ernie-vilg/v2/text2image"
headers = {"Content-Type": "application/json"}
# Map the requested size to an aspect ratio: only 1792x1024 maps to 16:9,
# every other size falls back to 1:1
width, height = map(int, size.split("x"))
size_str = "16:9" if (width, height) == (1792, 1024) else "1:1"
payload = {
"text": prompt,
"image_size": size_str,
"style": "adv_flat",
"num": 1
}
logger.info(f"调用百度文心一格图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"百度 API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
if "data" in data and data["data"]:
image_url = data["data"][0].get("url", "")
if image_url:
return self._download_from_url(image_url, "baidu")
logger.warning(f"百度文心未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"百度文心图像生成失败: {e}")
return None
def _generate_by_tencent(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""腾讯混元图像生成 API"""
try:
import requests
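# NOTE: the original intent was HMAC-signed requests, but no signature is computed
# and api_key is unused, so this call only works behind a gateway that handles
# authentication itself (the unused hmac/hashlib/time imports were dropped).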
url = f"{base_url}/hunyuan/v1/ai_image"
headers = {"Content-Type": "application/json"}
width, height = map(int, size.split("x"))
payload = {
"prompt": prompt,
"width": min(width, 1024),
"height": min(height, 1024),
"version": "v1.5",
"samples": 1
}
logger.info(f"调用腾讯混元图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"腾讯混元 API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = data.get("data", {}).get("image_url", "")
if image_url:
return self._download_from_url(image_url, "tencent")
logger.warning(f"腾讯混元未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"腾讯混元图像生成失败: {e}")
return None
def _generate_by_zhipu(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""智谱 CogView-4 图像生成 API"""
try:
import requests
url = f"{base_url}/api/paulgraham/t2i"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
width, height = map(int, size.split("x"))
payload = {
"prompt": prompt,
"size": f"{width}x{height}",
"n": 1
}
logger.info(f"调用智谱 CogView-4 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"智谱 API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
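# The URL may arrive as data[0].url, data.url, or a top-level url.
# Check the container type first so a dict is never indexed with [0]
# and a list is never called with .get().
nested = data.get("data")
first = nested[0] if isinstance(nested, list) and nested else nested
image_url = (first.get("url") if isinstance(first, dict) else None) or data.get("url", "")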
if image_url:
return self._download_from_url(image_url, "zhipu")
logger.warning(f"智谱 CogView 未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"智谱 CogView 图像生成失败: {e}")
return None
def _generate_by_sensetime(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""商汤 nova-smooth 图像生成 API"""
try:
import requests
url = f"{base_url}/v1/visionprotect/risenlp/nlpcg/diffusion"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
width, height = map(int, size.split("x"))
payload = {
"prompt": prompt,
"width": width,
"height": height,
"num_images": 1
}
logger.info(f"调用商汤 nova-smooth 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"商汤 API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = data.get("data", {}).get("image_url", "")
if not image_url:
image_url = data.get("image_url", "")
if image_url:
return self._download_from_url(image_url, "sensetime")
logger.warning(f"商汤未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"商汤图像生成失败: {e}")
return None
def _generate_by_bytedance(self, prompt: str, size: str, api_key: str, base_url: str) -> Optional[str]:
"""字节豆包 SDXL 图像生成 API"""
try:
import requests
url = f"{base_url}/cv/sdxl/txt2img"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
width, height = map(int, size.split("x"))
payload = {
"prompt": prompt,
"width": width,
"height": height,
"num_images": 1,
"style_id": 0
}
logger.info(f"调用字节豆包 SDXL 图像生成 API...")
resp = requests.post(url, json=payload, headers=headers, timeout=120)
if resp.status_code != 200:
logger.warning(f"字节豆包 API 返回 {resp.status_code}: {resp.text[:200]}")
return None
data = resp.json()
image_url = data.get("data", {}).get("image_url", "")
if not image_url:
image_url = data.get("image_url", "")
if image_url:
return self._download_from_url(image_url, "bytedance")
logger.warning(f"字节豆包未返回图片 URL: {str(data)[:200]}")
except Exception as e:
logger.error(f"字节豆包图像生成失败: {e}")
return None
def _build_ai_prompt(self, prompt: str, img_type: str) -> str:
"""构建 AI 生成的 prompt"""
style_guide = (
"微信公众号文章配图,简约现代风格,"
"清新配色,专业商务感,适合文章内容可视化。"
)
if img_type == "cover":
return f"{prompt},{style_guide},横版图片 16:9 比例,高清摄影风格"
else:
return f"{prompt},{style_guide},方形图片 1:1 比例,插画风格"
def _compress_image(self, filepath: str, max_width: int = 900, max_height: int = 900, quality: int = 85) -> bool:
"""使用 Pillow 压缩并缩放图片,返回是否成功"""
try:
from PIL import Image
with Image.open(filepath) as img:
# 转换模式(RGBA/P/LA/PA → RGB,避免 JPEG 不支持)
if img.mode in ("RGBA", "P", "LA", "PA"):
rgb_img = img.convert("RGB")
else:
rgb_img = img
# 缩放,保证不超过 max_width × max_height
rgb_img.thumbnail((max_width, max_height), Image.LANCZOS)
rgb_img.save(filepath, "JPEG", quality=quality, optimize=True)
return True
except ImportError:
logger.warning("Pillow 未安装,图片未压缩")
return False
except Exception as e:
logger.warning(f"图片压缩失败,使用原图: {e}")
return False
def _download_from_url(self, url: str, prefix: str = "ai",
max_width: int = 900, max_height: int = 900) -> Optional[str]:
"""从 URL 下载图片"""
try:
import requests
filename = f"{prefix}_{uuid.uuid4().hex[:8]}.jpg"
filepath = self._cache_dir / filename
resp = requests.get(url, timeout=60)
if resp.status_code == 200:
with open(filepath, "wb") as f:
f.write(resp.content)
logger.info(f"AI 图片下载成功: {filepath}")
# 压缩图片
self._compress_image(str(filepath), max_width=max_width, max_height=max_height)
return str(filepath)
else:
logger.warning(f"AI 图片下载失败 HTTP {resp.status_code}")
except Exception as e:
logger.error(f"下载 AI 图片失败: {str(e)}")
return None
# ==================== 图库检索 ====================
def _search_all(self, keywords: str, count: int = 5) -> List[Dict]:
"""双 API 搜索,自动切换"""
if not keywords or not isinstance(keywords, str):
return []
if not isinstance(count, int) or count < 1 or count > 30:
count = 5
try:
if self._pexels_api_key:
images = self._search_pexels(keywords, count)
if images:
return images
if self._unsplash_api_key:
images = self._search_unsplash(keywords, count)
if images:
return images
logger.warning(f"所有图库搜索均未返回结果: {keywords}")
return []
except Exception as e:
logger.error(f"图片搜索失败: {str(e)}", exc_info=True)
return []
def _search_pexels(self, keywords: str, count: int = 5) -> List[Dict]:
if not self._pexels_api_key:
return []
try:
import requests
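# Pass the query via params so requests URL-encodes the keywords (spaces, CJK, "&", "#")
url = "https://api.pexels.com/v1/search"
params = {"query": keywords, "per_page": count, "orientation": "landscape"}
headers = {"Authorization": self._pexels_api_key}
resp = requests.get(url, headers=headers, params=params, timeout=10)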
if resp.status_code != 200:
return []
data = resp.json()
photos = data.get("photos", [])
return [
{
# "large" 约 1050px,"medium" 约 350px,用 large 兼顾质量与大小
"url": p.get("src", {}).get("large"),
"thumb_url": p.get("src", {}).get("medium"),
"author": p.get("photographer", "Unknown"),
"source": "Pexels"
}
for p in photos
]
except Exception as e:
logger.error(f"Pexels搜索失败: {str(e)}")
return []
def _search_unsplash(self, keywords: str, count: int = 5) -> List[Dict]:
if not self._unsplash_api_key:
return []
try:
import requests
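# Pass the query via params so requests URL-encodes the keywords (spaces, CJK, "&", "#")
url = "https://api.unsplash.com/search/photos"
params = {"query": keywords, "per_page": count, "orientation": "landscape"}
headers = {"Authorization": f"Client-ID {self._unsplash_api_key}"}
resp = requests.get(url, headers=headers, params=params, timeout=10)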
if resp.status_code != 200:
return []
data = resp.json()
results = data.get("results", [])
return [
{
"url": r.get("urls", {}).get("regular"),
"thumb_url": r.get("urls", {}).get("thumb"),
"author": r.get("user", {}).get("name", "Unknown"),
"source": "Unsplash"
}
for r in results
]
except Exception as e:
logger.error(f"Unsplash搜索失败: {str(e)}")
return []
def _download_image(self, image_info: Dict, prefix: str,
max_width: int = 900, max_height: int = 600) -> Optional[str]:
"""下载图片(带压缩)"""
url = image_info.get("url", "")
if not url:
return None
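# Compare the parsed hostname instead of substring-matching the whole URL,
# which would also accept e.g. https://evil.example/?x=images.pexels.com
from urllib.parse import urlparse
host = (urlparse(url).hostname or "").lower()
allowed_domains = ("images.pexels.com", "unsplash.com")
if not any(host == d or host.endswith("." + d) for d in allowed_domains):
return None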
try:
import requests
filename = f"{prefix}_{uuid.uuid4().hex[:8]}.jpg"
filepath = self._cache_dir / filename
if ".." in str(filepath) or str(filepath).startswith("/"):
filepath = filepath.resolve()
if not str(filepath).startswith(str(self._cache_dir.resolve())):
return None
resp = requests.get(url, timeout=30)
if resp.status_code == 200:
with open(filepath, "wb") as f:
f.write(resp.content)
logger.info(f"图片下载成功: {filepath}")
# 压缩图片
self._compress_image(str(filepath), max_width=max_width, max_height=max_height)
return str(filepath)
except Exception as e:
logger.error(f"图片下载失败: {str(e)}")
return None
# ==================== 工具方法 ====================
def _extract_cover_keywords(self, title: str, keywords: List[str]) -> str:
if not title:
return ""
return " ".join(title.split()[:3] + (keywords[:3] if keywords else []))
def _extract_illustration_keywords(self, section: str, keywords: List[str]) -> str:
if not section:
return ""
return " ".join(section.split()[:3])
def generate_and_upload(self, title_or_section: str, keywords: List[str],
material_skill, img_type: str = "illustration") -> Dict:
"""生成图片并上传到微信素材库"""
result = {"local_path": None, "wechat_url": None, "media_id": None}
try:
if img_type == "cover":
local_path = self.generate_cover(title_or_section, keywords)
else:
local_path = self.generate_illustration(title_or_section, keywords)
if not local_path or not os.path.exists(local_path):
logger.warning(f"[{img_type}] 图片生成失败: {title_or_section[:30]}")
return result
result["local_path"] = local_path
upload_result = material_skill.upload_image(local_path)
if upload_result:
result["wechat_url"] = upload_result.get("url", "")
result["media_id"] = upload_result.get("media_id", "")
logger.info(f"[{img_type}] 生成并上传成功: {result['wechat_url'][:50] if result['wechat_url'] else 'failed'}...")
return result
except Exception as e:
logger.error(f"[{img_type}] 生成上传失败: {e}")
return result
def search_image(self, query: str, count: int = 5) -> List[Dict]:
"""搜索图库图片并下载到本地
Args:
query: 搜索关键词
count: 请求图片数量,默认5张
Returns:
图片信息列表,每个元素含 url(原始链接)和 local_path(本地文件路径)
"""
if not query or not isinstance(query, str):
return []
if not isinstance(count, int) or count < 1:
count = 5
results = []
images = self._search_all(query, count)
for img in images:
local_path = self._download_image(img, "search")
if local_path:
results.append({
"url": img.get("url", ""),
"local_path": local_path
})
if len(results) >= count:
break
return results
def generate_image(self, prompt: str, size: str = "1024x1024") -> Dict:
"""AI 生成图片
Args:
prompt: 图片描述提示词
size: 图片尺寸,默认 1024x1024(可以是 1792x1024 用于封面)
Returns:
{"local_path": "本地文件路径"}
"""
if not prompt or not isinstance(prompt, str):
return {"local_path": None}
img_type = "cover" if "x" in size and int(size.split("x")[0]) > 1024 else "illustration"
local_path = self._generate_by_ai(prompt, img_type)
return {"local_path": local_path}
def batch_generate(self, title: str, sections: List[str], keywords: List[str]) -> Dict:
"""批量生成图片"""
if not title or not isinstance(title, str):
raise ValueError("title不能为空且必须是字符串")
if not sections or not isinstance(sections, list):
raise ValueError("sections不能为空且必须是列表")
sections = sections[:50]
try:
logger.info(f"开始批量生成图片: {title[:50]}..., 共 {len(sections)} 个章节")
result = {"cover": None, "illustrations": []}
result["cover"] = self.generate_cover(title, keywords)
for section in sections:
try:
illust = self.generate_illustration(section, keywords)
if illust:
result["illustrations"].append(illust)
except Exception as e:
logger.warning(f"生成章节插图失败: {section[:30]}... - {str(e)}")
continue
logger.info(f"批量生成完成: 封面={result['cover'] is not None}, 插图={len(result['illustrations'])}张")
return result
except Exception as e:
logger.error(f"批量生成图片失败: {str(e)}", exc_info=True)
raise
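
Most provider methods above parse `size` with `map(int, size.split("x"))`, which raises on malformed input such as `"1024"` and leaves the caller with only a generic error log. A minimal sketch of a more tolerant parser follows. The names `parse_size` and `aspect_label` are hypothetical helpers, not part of this module, and the 1024x1024 fallback is an assumption chosen to match the default of `generate_image`.

```python
from typing import Tuple


def parse_size(size: str, default: Tuple[int, int] = (1024, 1024)) -> Tuple[int, int]:
    """Parse "WIDTHxHEIGHT" into (width, height); return default on bad input."""
    try:
        width_text, height_text = size.lower().split("x")
        width, height = int(width_text), int(height_text)
    except (AttributeError, ValueError):
        # Not a string, wrong number of parts, or non-numeric parts
        return default
    if width <= 0 or height <= 0:
        return default
    return width, height


def aspect_label(size: str) -> str:
    """Mirror the landscape check used above: wider than tall means 16:9, else 1:1."""
    width, height = parse_size(size)
    return "16:9" if width > height else "1:1"
```

With a helper like this, each provider could call `parse_size(size)` once and clamp the result to its own limits, instead of repeating the split in every method.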
FILE:src/skills/image_processor.py
"""
微信公众号自动化 - 图片处理 Skill
"""
import os
import subprocess
import logging
from pathlib import Path
from typing import Dict, Optional, Tuple
from .base_skill import BaseSkill
logger = logging.getLogger(__name__)
class ImageProcessorSkill(BaseSkill):
"""图片处理 - 微信格式"""
COVER_SIZE = (900, 500)
COVER_MAX_SIZE = 2 * 1024 * 1024
ILLUST_MIN_WIDTH = 640
ILLUST_MAX_WIDTH = 1080
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._cache_dir = Path.home() / ".cache" / "wechat-mp-auto" / "processed"
self._cache_dir.mkdir(parents=True, exist_ok=True)
def process_cover_image(self, image_path: str) -> Dict:
"""处理封面图"""
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片不存在: {image_path}")
# 1. 调整尺寸
resized = self._resize_image(image_path, self.COVER_SIZE[0], self.COVER_SIZE[1])
# 2. 压缩
compressed = self._compress_image(resized, self.COVER_MAX_SIZE)
# 3. 去水印
cleaned = self.remove_watermark(compressed)
# 4. 转换格式
final = self._convert_format(cleaned, "jpg")
return {"path": final, "width": self.COVER_SIZE[0], "height": self.COVER_SIZE[1], "size": os.path.getsize(final)}
def process_illustration(self, image_path: str) -> Dict:
"""处理插图"""
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片不存在: {image_path}")
original_size = self._get_image_size(image_path)
width = original_size[0]
new_width = width
if width > self.ILLUST_MAX_WIDTH:
new_width = self.ILLUST_MAX_WIDTH
elif width < self.ILLUST_MIN_WIDTH:
new_width = self.ILLUST_MIN_WIDTH
resized = self._resize_image(image_path, new_width, None)
compressed = self._compress_image(resized, 5 * 1024 * 1024)
cleaned = self.remove_watermark(compressed)
final = self._convert_format(cleaned, "jpg")
new_size = self._get_image_size(final)
return {"path": final, "width": new_size[0], "height": new_size[1], "size": os.path.getsize(final)}
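def remove_watermark(self, image_path: str) -> str:
"""
Remove watermarks using simple heuristics.
Step 1: clear large semi-transparent regions in RGBA PNGs.
Step 2: crop the corner when a corner watermark is detected.
Step 3: trim 5% off the right and bottom edges.
No blur-based removal is implemented.
"""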
if not os.path.exists(image_path):
logger.warning(f"图片不存在,跳过去水印: {image_path}")
return image_path
try:
from PIL import Image, ImageFilter
import uuid
img = Image.open(image_path)
width, height = img.size
# 检测是否为透明背景的 PNG(通常自带水印)
if img.format == 'PNG' and img.mode == 'RGBA':
# 尝试简单去水印:检测并移除半透明区域
img = self._remove_transparent_watermark(img)
# 检测角落是否有水印(通过分析边缘像素)
# 如果图片有明显的角落文字/LOGO,尝试裁剪
corner_watermark = self._detect_corner_watermark(img)
if corner_watermark:
img = self._crop_corner_watermark(img, corner_watermark)
logger.info(f"已裁剪角落水印: {corner_watermark}")
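# Finally, trim 5% off the right and bottom edges (watermarks usually sit in a corner).
# Re-read the size first: img may already have been cropped above, and cropping
# to the stale dimensions would pad the image with black borders.
width, height = img.size
if width > 400 and height > 200: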
crop_ratio = 0.05
cropped = img.crop((
0, 0,
int(width * (1 - crop_ratio)),
int(height * (1 - crop_ratio))
))
# 检查裁剪后是否合理
new_w, new_h = cropped.size
if new_w > width * 0.7 and new_h > height * 0.7:
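# Save under a new name. splitext also handles non-.jpg inputs,
# where str.replace(".jpg", ...) would leave the path unchanged and overwrite the source.
output_path = f"{os.path.splitext(image_path)[0]}_cleaned_{uuid.uuid4().hex[:4]}.jpg"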
cropped.convert('RGB').save(output_path, 'JPEG', quality=90)
logger.info(f"去水印处理完成: {output_path}")
return output_path
# 无需处理
return image_path
except ImportError:
logger.warning("PIL 未安装,跳过去水印处理")
return image_path
except Exception as e:
logger.warning(f"去水印处理失败: {str(e)},返回原图")
return image_path
def _detect_corner_watermark(self, img) -> Optional[str]:
"""检测角落是否有水印"""
try:
width, height = img.size
pixels = img.load()
# 检查右下角是否有水印(通常为半透明文字/LOGO)
# 采样右下角 10% 区域
corner_region = []
for x in range(int(width * 0.9), width):
for y in range(int(height * 0.9), height):
if img.mode == 'RGBA':
alpha = pixels[x, y][3]
if alpha > 0 and alpha < 255: # 半透明
return "bottom_right"
elif img.mode == 'RGB':
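# NOTE: this returns on the first pixel that is not near-white; no ratio is computed,
# so almost any photo is reported as having a bottom-right watermark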
r, g, b = pixels[x, y]
if r < 250 or g < 250 or b < 250: # 非纯白
return "bottom_right"
return None
except Exception:
return None
def _crop_corner_watermark(self, img, position: str):
"""裁剪角落水印"""
width, height = img.size
crop_ratio = 0.08 # 裁剪 8%
if position == "bottom_right":
return img.crop((
0, 0,
int(width * (1 - crop_ratio)),
int(height * (1 - crop_ratio))
))
elif position == "bottom_left":
return img.crop((
int(width * crop_ratio), 0,
width,
int(height * (1 - crop_ratio))
))
elif position == "top_right":
return img.crop((
0, int(height * crop_ratio),
int(width * (1 - crop_ratio)),
height
))
return img
def _remove_transparent_watermark(self, img) :
"""移除 PNG 中的半透明水印"""
width, height = img.size
pixels = img.load()
# 找出半透明区域
transparent_pixels = []
for x in range(width):
for y in range(height):
if pixels[x, y][3] < 200: # 半透明
transparent_pixels.append((x, y))
# 如果半透明区域超过一定比例,认为是水印
if len(transparent_pixels) > width * height * 0.05:
# 将半透明区域设为完全透明
for x, y in transparent_pixels:
pixels[x, y] = (255, 255, 255, 0)
return img
def _resize_image(self, image_path: str, width: int, height: Optional[int] = None) -> str:
"""调整图片尺寸"""
try:
from PIL import Image
img = Image.open(image_path)
# 保持比例
if height is None:
# 只指定宽度
ratio = width / img.size[0]
height = int(img.size[1] * ratio)
resized = img.resize((width, height), Image.Resampling.LANCZOS)
output = image_path.replace(".jpg", f"_resized_{width}x{height}.jpg")
# 避免覆盖原图
if output == image_path:
output = image_path.rsplit(".", 1)[0] + f"_resized.jpg"
resized.save(output, 'JPEG', quality=90)
return output
except ImportError:
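# Fall back to sips (macOS only) when Pillow is unavailable
output = os.path.splitext(image_path)[0] + "_resized.jpg"
# height may still be None here; --resampleWidth scales by width and keeps the aspect ratio
if height is None:
cmd = ["sips", "--resampleWidth", str(width), image_path, "--out", output]
else:
cmd = ["sips", "-z", str(height), str(width), image_path, "--out", output]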
try:
subprocess.run(cmd, check=True, capture_output=True)
return output
except Exception:
return image_path
except Exception as e:
logger.warning(f"图片缩放失败: {str(e)}")
return image_path
def _compress_image(self, image_path: str, max_size: int) -> str:
"""压缩图片"""
try:
from PIL import Image
img = Image.open(image_path)
current_size = os.path.getsize(image_path)
if current_size <= max_size:
return image_path
# 逐步压缩
quality = 90
while quality > 30:
# splitext handles non-.jpg inputs; str.replace(".jpg", ...) would overwrite the source file
output = f"{os.path.splitext(image_path)[0]}_compressed_q{quality}.jpg"
img.save(output, 'JPEG', quality=quality, optimize=True)
if os.path.getsize(output) <= max_size:
return output
quality -= 10
# 最低质量
return output
except ImportError:
return image_path
except Exception as e:
logger.warning(f"图片压缩失败: {str(e)}")
return image_path
def _convert_format(self, image_path: str, format: str) -> str:
"""转换图片格式"""
output = image_path.rsplit(".", 1)[0] + f".{format}"
# 如果格式没变,直接返回
if output == image_path:
return image_path
try:
from PIL import Image
img = Image.open(image_path)
if format.lower() in ['jpg', 'jpeg']:
# 转 RGB(去除 alpha 通道)
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
# Normalize P and LA to RGBA so the alpha channel is always available as the paste mask
if img.mode in ('P', 'LA'):
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1])
img = background
img = img.convert('RGB')
img.save(output, format.upper() if format.upper() != 'JPG' else 'JPEG')
return output
except ImportError:
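# Fall back to sips (macOS only). "format" and its value must be separate arguments,
# and sips names the JPEG type "jpeg", not "jpg".
sips_format = "jpeg" if format.lower() == "jpg" else format.lower()
cmd = ["sips", "-s", "format", sips_format, image_path, "--out", output]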
try:
subprocess.run(cmd, check=True, capture_output=True)
return output
except Exception:
return image_path
except Exception as e:
logger.warning(f"图片格式转换失败: {str(e)}")
return image_path
def _get_image_size(self, image_path: str) -> Tuple[int, int]:
"""获取图片尺寸"""
try:
from PIL import Image
with Image.open(image_path) as img:
return img.size
except Exception:
return (800, 600)
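
Several methods in this file build output names with `image_path.replace(".jpg", ...)`, which returns the path unchanged for `.png` or `.jpeg` inputs and can therefore overwrite the source file. The sketch below shows one way to derive a sibling path from the file stem instead. `derive_output_path` is a hypothetical helper, not part of `ImageProcessorSkill`, and the default `jpg` extension is an assumption based on the JPEG output used throughout.

```python
from pathlib import Path


def derive_output_path(image_path: str, tag: str, ext: str = "jpg") -> str:
    """Return a sibling path named "<stem>_<tag>.<ext>", whatever the input extension."""
    source = Path(image_path)
    return str(source.with_name(f"{source.stem}_{tag}.{ext}"))


# str.replace leaves a non-.jpg path untouched, so the "output" is the input itself
unchanged = "photo.png".replace(".jpg", "_resized.jpg")
# The stem-based version always yields a distinct name
derived = derive_output_path("photo.png", "resized")
```

Because the tag is always appended to the stem, the result can never equal the input path, which removes the need for checks like `if output == image_path`.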
FILE:src/skills/material_skill.py
"""
微信公众号自动化 - 素材管理 Skill
"""
import os
import logging
import requests
from typing import List, Dict
from pathlib import Path
from .base_skill import BaseSkill
# 配置日志
logger = logging.getLogger(__name__)
class MaterialSkill(BaseSkill):
"""素材管理"""
def upload_image(self, file_path: str) -> Dict:
"""上传图片素材"""
# 参数验证
if not file_path or not isinstance(file_path, str):
logger.error("无效的file_path参数: file_path不能为空且必须是字符串")
raise ValueError("file_path不能为空且必须是字符串")
# 安全检查:验证文件路径
if not os.path.isabs(file_path):
logger.warning(f"建议使用绝对路径: {file_path}")
if not os.path.exists(file_path):
logger.error(f"文件不存在: {file_path}")
raise FileNotFoundError(f"文件不存在: {file_path}")
# 安全检查:验证文件类型
allowed_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'}
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext not in allowed_extensions:
logger.error(f"不支持的图片格式: {file_ext}")
raise ValueError(f"不支持的图片格式: {file_ext}")
# 安全检查:验证文件大小(微信限制20MB)
try:
file_size = os.path.getsize(file_path)
if file_size > 20 * 1024 * 1024:
logger.error(f"文件过大: {file_size} bytes, 微信限制20MB")
raise ValueError(f"文件过大: {file_size} bytes, 微信限制20MB")
except OSError as e:
logger.error(f"获取文件大小失败: {str(e)}")
raise
try:
logger.info(f"开始上传图片素材: {file_path}")
# 验证 access_token 存在
if not hasattr(self, 'access_token') or not self.access_token:
logger.error("access_token未设置")
raise ValueError("access_token未设置,请先获取有效token")
url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={self.access_token}&type=image"
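# Derive the MIME type from the file extension instead of always sending image/jpeg
import mimetypes
mime_type = mimetypes.guess_type(file_path)[0] or "image/jpeg"
with open(file_path, "rb") as f:
files = {"media": (os.path.basename(file_path), f, mime_type)}
response = requests.post(url, files=files, timeout=60).json()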
if "errcode" in response and response["errcode"] != 0:
from exceptions import APIError
error_msg = response.get("errmsg", "")
logger.error(f"微信API错误: errcode={response['errcode']}, errmsg={error_msg}")
raise APIError(response["errcode"], error_msg)
logger.info(f"图片素材上传成功: media_id={response.get('media_id')}")
return {
"url": response.get("url"),
"media_id": response.get("media_id")
}
except FileNotFoundError:
raise
except ValueError:
raise
except ImportError as e:
logger.error(f"缺少依赖库: {str(e)}")
raise
except Exception as e:
logger.error(f"上传图片素材失败: {str(e)}", exc_info=True)
raise
def upload_thumb(self, file_path: str) -> Dict:
"""上传封面图(缩略图)"""
# 参数验证
if not file_path or not isinstance(file_path, str):
logger.error("无效的file_path参数")
raise ValueError("file_path不能为空且必须是字符串")
# Soft check: only warn when the cover image exceeds 100KB; the upload still proceeds
try:
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
if file_size > 100 * 1024:
logger.warning(f"封面图可能过大: {file_size} bytes, 建议不超过100KB")
except OSError:
pass
try:
logger.info(f"开始上传封面图: {file_path}")
return self.upload_image(file_path)
except Exception as e:
logger.error(f"上传封面图失败: {str(e)}", exc_info=True)
raise
def list_materials(self, material_type: str = "image", offset: int = 0, count: int = 20) -> Dict:
"""获取素材列表"""
# 参数验证
allowed_types = ["image", "voice", "video", "news"]
if material_type not in allowed_types:
logger.error(f"无效的material_type: {material_type}")
raise ValueError(f"material_type必须是以下之一: {allowed_types}")
if not isinstance(offset, int) or offset < 0:
logger.error(f"无效的offset: {offset}")
raise ValueError("offset必须是大于等于0的整数")
if not isinstance(count, int) or count < 1 or count > 20:
logger.warning(f"无效的count: {count}, 使用默认值20")
count = 20
try:
logger.info(f"获取素材列表: type={material_type}, offset={offset}, count={count}")
result = self.post("/cgi-bin/material/batchget_material", {
"type": material_type,
"offset": offset,
"count": count
})
logger.info(f"素材列表获取成功: item_count={len(result.get('item', []))}")
return result
except Exception as e:
logger.error(f"获取素材列表失败: {str(e)}", exc_info=True)
raise
def delete_material(self, media_id: str) -> Dict:
"""删除素材"""
# 参数验证
if not media_id or not isinstance(media_id, str):
logger.error("无效的media_id参数")
raise ValueError("media_id不能为空且必须是字符串")
# Sanity check: only the length of media_id is validated here, not any prefix
if len(media_id) < 10:
logger.error(f"media_id格式可能无效: {media_id}")
raise ValueError("media_id格式无效")
try:
logger.info(f"删除素材: media_id={media_id}")
result = self.post("/cgi-bin/material/delete", {"media_id": media_id})
logger.info(f"素材删除成功: media_id={media_id}")
return result
except Exception as e:
logger.error(f"删除素材失败: {str(e)}", exc_info=True)
raise
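
`upload_image` treats a response as failed when it carries a non-zero `errcode`, while `list_materials` and `delete_material` rely on `self.post` to do the same. A minimal sketch of that check as one reusable function is given below. `WeChatAPIError` and `ensure_ok` are hypothetical names introduced for illustration; the project's own `APIError` (imported from `exceptions` above) may have a different constructor.

```python
from typing import Any, Dict


class WeChatAPIError(Exception):
    """Hypothetical error type carrying the errcode/errmsg pair from a response."""

    def __init__(self, errcode: int, errmsg: str) -> None:
        super().__init__(f"WeChat API error {errcode}: {errmsg}")
        self.errcode = errcode
        self.errmsg = errmsg


def ensure_ok(response: Dict[str, Any]) -> Dict[str, Any]:
    """Return the response unchanged, or raise if it reports a non-zero errcode.

    Successful upload responses carry no errcode at all, so a missing
    key is treated the same as errcode == 0.
    """
    errcode = response.get("errcode", 0)
    if errcode != 0:
        raise WeChatAPIError(errcode, response.get("errmsg", ""))
    return response
```

Usage would be a single wrap around the decoded JSON, for example `data = ensure_ok(resp.json())`, so every endpoint reports failures the same way.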
FILE:src/skills/message_skill.py
"""
微信公众号自动化 - 消息发送 Skill
向用户发送图文消息
"""
import json
import logging
from typing import List, Dict, Optional, Any
from .base_skill import BaseSkill
logger = logging.getLogger(__name__)
class MessageSkill(BaseSkill):
"""消息发送 - 向用户发送文章"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def send_text(self, openid: str, content: str) -> Dict:
"""发送文本消息
Args:
openid: 用户openid
content: 文本内容
Returns:
{"errcode": 0, "errmsg": "ok"}
"""
try:
result = self._request(
"POST",
"/cgi-bin/message/custom/send",
data={
"touser": openid,
"msgtype": "text",
"text": {"content": content}
}
)
if result.get("errcode") == 0:
logger.info(f"发送文本消息成功: {openid}")
else:
logger.error(f"发送文本消息失败: {result.get('errmsg')}")
return result
except Exception as e:
logger.error(f"发送文本消息异常: {e}")
raise
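def send_article(self, openid: str, media_id: str) -> Dict:
"""Send an article (mpnews) message; the article must already be in the material library.
Args:
openid: the recipient's openid
media_id: media_id of the article (obtained by converting a draft into a material)
Returns:
{"errcode": 0, "errmsg": "ok"}
"""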
try:
result = self._request(
"POST",
"/cgi-bin/message/custom/send",
data={
"touser": openid,
"msgtype": "mpnews",
"mpnews": {"media_id": media_id}
}
)
if result.get("errcode") == 0:
logger.info(f"发送图文消息成功: {openid}, media_id: {media_id}")
else:
logger.error(f"发送图文消息失败: {result.get('errmsg')}")
return result
except Exception as e:
logger.error(f"发送图文消息异常: {e}")
raise
def send_article_link(self, openid: str, title: str, content: str,
url: str, thumb_url: str) -> Dict:
"""Send an article link message.
Args:
openid: the recipient's openid
title: article title
content: article summary (sent as the link description)
url: article URL
thumb_url: cover image URL
Returns:
{"errcode": 0, "errmsg": "ok"}
"""
try:
result = self._request(
"POST",
"/cgi-bin/message/custom/send",
data={
"touser": openid,
"msgtype": "link",
"link": {
"title": title,
"description": content,
"url": url,
"thumb_url": thumb_url
}
}
)
if result.get("errcode") == 0:
logger.info(f"发送链接消息成功: {openid}")
else:
logger.error(f"发送链接消息失败: {result.get('errmsg')}")
return result
except Exception as e:
logger.error(f"发送链接消息异常: {e}")
raise
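def send_to_multiple(self, openids: List[str], message_func, *args, **kwargs) -> Dict:
"""Send the same message to several users, one request per openid.
Args:
openids: list of openids
message_func: the send function to call (for example self.send_text)
*args, **kwargs: extra arguments passed through to the send function
Returns:
{
"success": number of successful sends,
"failed": number of failed sends,
"results": [one {"openid", "error"} entry per failed send; successes are only counted]
}
"""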
results = {
"success": 0,
"failed": 0,
"results": []
}
for openid in openids:
try:
result = message_func(openid, *args, **kwargs)
if result.get("errcode") == 0:
results["success"] += 1
else:
results["failed"] += 1
results["results"].append({
"openid": openid,
"error": result.get("errmsg")
})
except Exception as e:
results["failed"] += 1
results["results"].append({
"openid": openid,
"error": str(e)
})
logger.info(f"群发完成: 成功 {results['success']}, 失败 {results['failed']}")
return results
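def broadcast_article(self, openids: List[str], article_info: Dict) -> Dict:
"""Send an article to several users.
Args:
openids: list of openids
article_info: article information {
"type": "text" | "link" | "mpnews",
"media_id": article media_id (used by mpnews),
"title": title (used by link),
"content": body or summary (used by text/link),
"url": link URL (used by link),
"thumb_url": cover image (used by link)
}
Returns:
The aggregated result from send_to_multiple
"""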
msg_type = article_info.get("type", "text")
if msg_type == "text":
return self.send_to_multiple(
openids,
self.send_text,
article_info.get("content", "")
)
elif msg_type == "link":
return self.send_to_multiple(
openids,
self.send_article_link,
article_info.get("title", ""),
article_info.get("content", ""),
article_info.get("url", ""),
article_info.get("thumb_url", "")
)
elif msg_type == "mpnews":
return self.send_to_multiple(
openids,
self.send_article,
article_info.get("media_id", "")
)
else:
raise ValueError(f"不支持的消息类型: {msg_type}")
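def preview_article(self, openid: str, article_info: Dict) -> Dict:
"""Preview an article by sending it to a single test user.
Args:
openid: openid of the preview recipient
article_info: article information (same shape as in broadcast_article)
Returns:
The send result
"""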
msg_type = article_info.get("type", "text")
if msg_type == "text":
return self.send_text(openid, article_info.get("content", ""))
elif msg_type == "link":
return self.send_article_link(
openid,
article_info.get("title", ""),
article_info.get("content", ""),
article_info.get("url", ""),
article_info.get("thumb_url", "")
)
elif msg_type == "mpnews":
return self.send_article(openid, article_info.get("media_id", ""))
else:
raise ValueError(f"不支持的消息类型: {msg_type}")
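The fan-out pattern used by `send_to_multiple` can be tried without network access. The sketch below is a minimal standalone version, not the skill's own code: `fan_out` and `fake_send` are hypothetical names, and the error code returned by `fake_send` is only illustrative.

```python
from typing import Callable, Dict, List


def fan_out(openids: List[str], send: Callable[..., Dict], *args, **kwargs) -> Dict:
    """Call send(openid, *args, **kwargs) once per recipient and tally the outcomes."""
    summary = {"success": 0, "failed": 0, "results": []}
    for openid in openids:
        try:
            result = send(openid, *args, **kwargs)
        except Exception as exc:  # a raised error counts as one failed send
            summary["failed"] += 1
            summary["results"].append({"openid": openid, "error": str(exc)})
            continue
        if result.get("errcode") == 0:
            summary["success"] += 1
        else:
            summary["failed"] += 1
            summary["results"].append({"openid": openid, "error": result.get("errmsg")})
    return summary


def fake_send(openid: str, content: str) -> Dict:
    """Hypothetical stand-in for MessageSkill.send_text that rejects one recipient."""
    if openid == "bad":
        return {"errcode": 45015, "errmsg": "response out of time limit"}
    return {"errcode": 0, "errmsg": "ok"}


report = fan_out(["a", "bad", "c"], fake_send, "hello")
print(report["success"], report["failed"])
```

Only failed sends are recorded in `results`, as in the method above, so a clean run returns an empty list there.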
FILE:src/skills/publish_skill.py
"""
WeChat Official Account automation - publishing management skill
"""
import logging
from typing import List, Dict, Optional
from .base_skill import BaseSkill
logger = logging.getLogger(__name__)
class PublishSkill(BaseSkill):
"""Publishing management."""
def publish_draft(self, media_id: str) -> Dict:
"""Submit a draft for publishing to the Official Account."""
# Validate arguments
if not media_id or not isinstance(media_id, str):
logger.error("无效的media_id参数")
raise ValueError("media_id不能为空且必须是字符串")
if len(media_id) < 10:
logger.error(f"media_id格式可能无效: {media_id}")
raise ValueError("media_id格式无效")
try:
logger.info(f"发布草稿: media_id={media_id[:20]}...")
result = self.post("/cgi-bin/freepublish/submit", {"media_id": media_id})
logger.info(f"草稿发布成功: {media_id[:20]}...")
return result
except Exception as e:
logger.error(f"发布草稿失败: {str(e)}", exc_info=True)
raise
def get_publish_status(self, publish_id: str) -> Dict:
"""Query the status of a publish job."""
# Validate arguments
if not publish_id or not isinstance(publish_id, str):
logger.error("无效的publish_id参数")
raise ValueError("publish_id不能为空且必须是字符串")
try:
logger.info(f"查询发布状态: publish_id={publish_id[:20]}...")
result = self.post("/cgi-bin/freepublish/get", {"publish_id": publish_id})
return result
except Exception as e:
logger.error(f"查询发布状态失败: {str(e)}", exc_info=True)
raise
def delete_published(self, article_id: str) -> Dict:
"""Delete a published article."""
# Validate arguments
if not article_id or not isinstance(article_id, str):
logger.error("无效的article_id参数")
raise ValueError("article_id不能为空且必须是字符串")
try:
logger.info(f"删除已发布文章: article_id={article_id[:20]}...")
result = self.post("/cgi-bin/freepublish/delete", {"article_id": article_id})
logger.info(f"文章删除成功: {article_id[:20]}...")
return result
except Exception as e:
logger.error(f"删除文章失败: {str(e)}", exc_info=True)
raise
def list_published(self, offset: int = 0, count: int = 20) -> Dict:
"""List published articles."""
# Validate arguments; out-of-range values fall back to the defaults
if not isinstance(offset, int) or offset < 0:
logger.warning(f"无效的offset: {offset}, 使用默认值0")
offset = 0
if not isinstance(count, int) or count < 1 or count > 20:
logger.warning(f"无效的count: {count}, 使用默认值20")
count = 20
try:
logger.info(f"获取已发布列表: offset={offset}, count={count}")
result = self.post("/cgi-bin/freepublish/batchget", {"offset": offset, "count": count})
total = result.get("total_count", 0)
logger.info(f"已发布文章总数: {total}")
return result
except Exception as e:
logger.error(f"获取已发布列表失败: {str(e)}", exc_info=True)
raise
def batch_publish(self, media_ids: List[str]) -> Dict:
"""Publish several drafts in sequence (at most 10 per call)."""
# Validate arguments
if not media_ids or not isinstance(media_ids, list):
logger.error("无效的media_ids参数")
raise ValueError("media_ids不能为空且必须是列表")
if len(media_ids) > 10:
logger.warning(f"批量发布数量过多({len(media_ids)}),限制为10个")
media_ids = media_ids[:10]
results = []
errors = []
for media_id in media_ids:
try:
result = self.publish_draft(media_id)
results.append({
"media_id": media_id,
"success": True,
"result": result
})
except Exception as e:
errors.append({
"media_id": media_id,
"success": False,
"error": str(e)
})
logger.warning(f"发布失败: {media_id[:20]}... - {str(e)}")
return {
"total": len(media_ids),
"success": len(results),
"failed": len(errors),
"results": results,
"errors": errors
}
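`publish_draft` only submits a job, so a caller usually polls `get_publish_status` afterwards. The sketch below shows one way to do that. It assumes a `publish_status` field whose value is `1` while the job is still running; that value, the `wait_for_publish` helper, and the stub replies are illustrative and should be checked against the WeChat documentation.

```python
import time
from typing import Callable, Dict

PUBLISHING = 1  # assumed value of publish_status while the job is still running


def wait_for_publish(get_status: Callable[[str], Dict], publish_id: str,
                     interval: float = 2.0, max_attempts: int = 10) -> Dict:
    """Poll get_status(publish_id) until the job leaves the publishing state."""
    for _ in range(max_attempts):
        status = get_status(publish_id)
        if status.get("publish_status") != PUBLISHING:
            return status
        time.sleep(interval)
    raise TimeoutError(f"publish job {publish_id} still running after {max_attempts} checks")


# Hypothetical stand-in for PublishSkill.get_publish_status: two pending replies, then done.
_replies = iter([{"publish_status": 1}, {"publish_status": 1}, {"publish_status": 0}])
final = wait_for_publish(lambda _pid: next(_replies), "demo-publish-id", interval=0.0)
print(final)
```

Bounding the loop with `max_attempts` keeps a stuck job from blocking the caller indefinitely.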
FILE:src/skills/template_design.py
"""
WeChat Official Account automation - template design skill
"""
import yaml
from pathlib import Path
from typing import Dict
from .base_skill import BaseSkill
class TemplateDesignSkill(BaseSkill):
"""Template design and creation."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._themes_dir = Path(__file__).parent.parent.parent / "themes"
self._themes_dir.mkdir(parents=True, exist_ok=True)
def create_template(self, config: Dict) -> Dict:
theme_name = config.get("theme_name", "custom")
template_config = {
"name": config.get("theme_name", "自定义主题"),
"colors": {
"primary": config.get("primary_color", "#007AFF"),
"secondary": config.get("secondary_color", "#5856D6"),
"text": config.get("text_color", "#333333")
},
"body": {
"font_size": "15px",
"line_height": "1.8"
}
}
output_file = self._themes_dir / f"{theme_name}.yaml"
with open(output_file, "w", encoding="utf-8") as f:
yaml.dump(template_config, f, allow_unicode=True, default_flow_style=False)
return {"success": True, "template_id": theme_name, "path": str(output_file)}
def validate_template(self, template_path: str) -> Dict:
try:
with open(template_path, encoding="utf-8") as f:
yaml.safe_load(f)
return {"valid": True, "issues": []}
except Exception as e:
return {"valid": False, "issues": [str(e)]}
FILE:src/skills/template_skill.py
"""
WeChat Official Account automation - template management skill
"""
import yaml
from pathlib import Path
from typing import List, Dict
from .base_skill import BaseSkill
class TemplateSkill(BaseSkill):
"""Template management."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._themes_dir = Path(__file__).parent.parent.parent / "themes"
self._themes_dir.mkdir(parents=True, exist_ok=True)
def list_local_templates(self) -> List[Dict]:
templates = []
if self._themes_dir.exists():
for yaml_file in self._themes_dir.glob("*.yaml"):
with open(yaml_file, encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
templates.append({"id": yaml_file.stem, "name": config.get("name", yaml_file.stem), "type": "local"})
return templates if templates else [{"id": "default", "name": "默认主题", "type": "local"}]
def list_wechat_templates(self) -> List[Dict]:
result = self.get("/cgi-bin/template/get_all_private_template")
return [{"id": t.get("template_id"), "title": t.get("title"), "type": "wechat"} for t in result.get("template_list", [])]
def get_template_detail(self, template_id: str, template_type: str = "local") -> Dict:
if template_type == "local":
template_file = self._themes_dir / f"{template_id}.yaml"
if template_file.exists():
with open(template_file, encoding="utf-8") as f:
config = yaml.safe_load(f)
else:
config = {"name": "默认", "colors": {"primary": "#007AFF"}}
return {"id": template_id, "type": "local", "config": config}
else:
result = self.get("/cgi-bin/template/get_template", {"template_id": template_id})
return {"id": template_id, "type": "wechat", "content": result.get("content", "")}
def select_default_template(self) -> Dict:
from ..config import Config
config = Config()
current = config.get_default_template()
local = self.list_local_templates()
wechat = self.list_wechat_templates()
return {"current": current, "local_templates": local, "wechat_templates": wechat}
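FILE:src/skills/template_sync.py
"""
WeChat Official Account automation - template sync skill
Note: the WeChat platform exposes no open API for "article templates", so they cannot be downloaded or uploaded.
Only local template management is supported for now.
"""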
from typing import Dict
from .base_skill import BaseSkill
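class TemplateSyncSkill(BaseSkill):
"""Template sync (local <-> WeChat).
Note: WeChat article templates have no open API, so download and upload are not supported yet.
"""
def upload_to_wechat(self, template_name: str) -> Dict:
"""Upload to WeChat (not supported yet)."""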
return {
"success": False,
"error": "微信文章模板没有开放 API,无法上传"
}
def download_from_wechat(self, media_id: str) -> Dict:
"""Download from WeChat (not supported yet)."""
return {
"success": False,
"error": "微信文章模板没有开放 API,无法下载"
}
def sync_all(self, direction: str = "both") -> Dict:
return {
"success": False,
"error": "微信文章模板没有开放 API"
}
FILE:src/skills/topic_research.py
"""
WeChat Official Account automation - topic research skill
Supports cascading search: Tavily → DuckDuckGo → Baidu
"""
import os
import re
import time
import logging
from typing import List, Dict, Optional
from collections import Counter
from .base_skill import BaseSkill
# Configure logging
logger = logging.getLogger(__name__)
# Search provider registry (listed in priority order)
SEARCH_PROVIDER_REGISTRY = [
{
"name": "tavily",
"priority": 1,
"requires_key": True,
"env_var": "TAVILY_API_KEY",
"method": "_search_by_tavily"
},
{
"name": "duckduckgo",
"priority": 2,
"requires_key": False,
"env_var": None,
"method": "_search_by_duckduckgo"
},
{
"name": "baidu",
"priority": 3,
"requires_key": False,
"env_var": None,
"method": "_search_by_baidu"
},
]
class TopicResearchSkill(BaseSkill):
"""Topic research."""
def research_topic(self, topic: str, keywords: Optional[List[str]] = None) -> Dict:
"""Research a topic and return search results, a summary, and related topics."""
# Validate arguments
if not topic or not isinstance(topic, str):
logger.error("无效的topic参数: topic不能为空且必须是字符串")
raise ValueError("topic不能为空且必须是字符串")
if keywords is not None:
if not isinstance(keywords, list):
logger.error("无效的keywords参数: keywords必须是列表")
raise ValueError("keywords必须是列表")
if len(keywords) > 20:
logger.warning(f"关键词数量过多({len(keywords)}),将限制为前20个")
keywords = keywords[:20]
try:
search_query = topic
if keywords:
search_query = f"{topic} {' '.join(keywords)}"
logger.info(f"开始调研选题: {search_query}")
# 级联搜索
results = self._cascade_search(search_query, limit=10)
if not results:
logger.warning(f"所有搜索源均失败,使用默认结果")
results = [{"title": f"关于 {topic} 的研究", "url": "https://example.com", "snippet": "暂无搜索结果"}]
logger.info(f"选题调研完成: {topic}, 找到 {len(results)} 条结果")
# 提取相关主题
related = self._extract_related_topics(results)
return {
"topic": topic,
"keywords": keywords or [],
"search_results": results[:10],
"summary": self._generate_summary(topic, results),
"related_topics": related
}
except Exception as e:
logger.error(f"选题调研失败: {str(e)}", exc_info=True)
raise
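def _cascade_search(self, query: str, limit: int = 10) -> List[Dict]:
"""
Cascading search: try each provider in registry order.
Return as soon as one provider yields results; on an error or an empty result, move on to the next.
"""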
errors = []
for provider in SEARCH_PROVIDER_REGISTRY:
provider_name = provider["name"]
method_name = provider["method"]
requires_key = provider["requires_key"]
env_var = provider["env_var"]
# 检查API Key(如果需要)
if requires_key:
api_key = os.environ.get(env_var, "") if env_var else ""
if not api_key:
logger.debug(f"[{provider_name}] 跳过:未配置 API Key")
continue
logger.info(f"尝试搜索源: {provider_name}")
try:
# 调用对应的搜索方法
results = getattr(self, method_name)(query, limit)
if results:
logger.info(f"[{provider_name}] 搜索成功: {len(results)} 条结果")
return results
else:
logger.warning(f"[{provider_name}] 返回空结果")
errors.append(f"{provider_name}: 空结果")
continue
except Exception as e:
err_str = str(e)
logger.warning(f"[{provider_name}] 搜索失败: {err_str}")
errors.append(f"{provider_name}: {err_str}")
continue
logger.error(f"所有搜索源均失败: {errors}")
return []
def _search_by_tavily(self, query: str, limit: int = 10) -> List[Dict]:
"""Search through the Tavily API."""
import requests
api_key = os.environ.get("TAVILY_API_KEY", "")
if not api_key:
raise ValueError("Tavily API Key 未配置")
url = "https://api.tavily.com/search"
headers = {"Content-Type": "application/json"}
payload = {
"query": query,
"max_results": limit,
"api_key": api_key
}
resp = requests.post(url, json=payload, headers=headers, timeout=20)
if resp.status_code == 401:
raise ValueError("Tavily API Key 无效或已过期")
elif resp.status_code == 429:
# 限流,短暂等待后重试
logger.warning("[tavily] 请求限流,等待2秒后重试...")
time.sleep(2)
resp = requests.post(url, json=payload, headers=headers, timeout=20)
if resp.status_code != 200:
raise ValueError(f"Tavily API 返回错误: HTTP {resp.status_code}")
data = resp.json()
results = []
for item in data.get("results", []):
results.append({
"title": item.get("title", ""),
"url": item.get("url", ""),
"snippet": item.get("content", "")
})
return results
def _search_by_duckduckgo(self, query: str, limit: int = 10) -> List[Dict]:
"""Search DuckDuckGo through its HTML endpoint (no API key required)."""
import requests
from urllib.parse import quote
# Use the HTML-only DuckDuckGo endpoint
html_url = f"https://html.duckduckgo.com/html/?q={quote(query)}"
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
}
resp = requests.get(html_url, headers=headers, timeout=15)
if resp.status_code != 200:
raise ValueError(f"DuckDuckGo 返回错误: HTTP {resp.status_code}")
# Parse the HTML results
# Match <a ... class="result__a" ... href="...">title</a>; other attributes may precede class
pattern = r'<a[^>]*class="result__a"[^>]*href="([^"]+)"[^>]*>(.+?)</a>'
matches = re.findall(pattern, resp.text)
results = []
for url, title_html in matches[:limit]:
# 清理HTML标签
title = re.sub(r'<[^>]+>', '', title_html).strip()
if title:
results.append({
"title": title,
"url": url,
"snippet": ""
})
if not results:
# 备用解析:尝试匹配其他格式
alt_pattern = r'<a href="(https?://[^"]+)"[^>]*class="result__snippet"[^>]*>(.+?)</a>'
alt_matches = re.findall(alt_pattern, resp.text)
for url, snippet_html in alt_matches[:limit]:
snippet = re.sub(r'<[^>]+>', '', snippet_html).strip()
results.append({
"title": url.split('/')[-1][:50] or url,
"url": url,
"snippet": snippet
})
return results
def _search_by_baidu(self, query: str, limit: int = 10) -> List[Dict]:
"""Search Baidu (reachable from mainland China, no API key required)."""
import requests
from urllib.parse import quote
# Baidu search URL
baidu_url = f"https://www.baidu.com/s?wd={quote(query)}&rn={limit}"
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9",
}
resp = requests.get(baidu_url, headers=headers, timeout=15)
if resp.status_code != 200:
raise ValueError(f"百度搜索返回错误: HTTP {resp.status_code}")
# 解析百度搜索结果
# 匹配 <h3 class="c-title"> 和 <a class="c-title">
results = []
# 方式1: 匹配标题和链接
title_pattern = r'<a[^>]+class="[^"]*c-title[^"]*"[^>]+href="([^"]+)"[^>]*>(.+?)</a>'
title_matches = re.findall(title_pattern, resp.text, re.DOTALL)
for url, title_html in title_matches[:limit]:
title = re.sub(r'<[^>]+>', '', title_html).strip()
if title and url.startswith('http'):
results.append({
"title": title,
"url": url,
"snippet": ""
})
# 方式2: 备用解析
if not results:
# 匹配 <h3 class="c-title"><a href="...">标题</a></h3>
h3_pattern = r'<h3[^>]*class="[^"]*c-title[^"]*"[^>]*>.*?<a[^>]+href="([^"]+)"[^>]*>(.+?)</a>'
h3_matches = re.findall(h3_pattern, resp.text, re.DOTALL)
for url, title_html in h3_matches[:limit]:
title = re.sub(r'<[^>]+>', '', title_html).strip()
if title:
results.append({
"title": title,
"url": url,
"snippet": ""
})
return results
def _extract_related_topics(self, search_results: List[Dict]) -> List[str]:
"""Extract related topic words from the titles of the search results."""
all_words = []
for result in search_results:
title = result.get("title", "")
# Chinese runs of two or more characters
chinese = re.findall(r'[\u4e00-\u9fff]{2,}', title)
# English words of three or more letters
english = re.findall(r'[a-zA-Z]{3,}', title)
all_words.extend(chinese)
all_words.extend(english)
# Count word frequencies
word_count = Counter(all_words)
# Return the five most common words
return [word for word, _ in word_count.most_common(5)]
def _generate_summary(self, topic: str, search_results: List[Dict]) -> str:
"""生成摘要"""
if not search_results:
return f"暂无关于 {topic} 的搜索结果"
snippet = search_results[0].get("snippet", "")[:200]
if snippet:
return f"最新动态:{snippet}..."
return f"关于 {topic},已找到 {len(search_results)} 条相关资料"
def generate_outline(self, topic: str, research_data: Dict) -> Dict:
"""Generate an article outline."""
# Validate arguments
if not topic or not isinstance(topic, str):
logger.error("无效的topic参数: topic不能为空且必须是字符串")
raise ValueError("topic不能为空且必须是字符串")
if research_data is not None and not isinstance(research_data, dict):
logger.error("无效的research_data参数: research_data必须是字典")
raise ValueError("research_data必须是字典")
try:
logger.info(f"为选题生成大纲: {topic}")
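# When research data is available, use its summary to pick the richer outline
summary = research_data.get("summary", "") if research_data else ""
search_results = research_data.get("search_results", []) if research_data else []
# Choose the level of detail; the fallback summaries contain "暂无" ("none yet"), which marks an empty search
if summary and "暂无" not in summary:
# Real search results exist, so build the detailed four-section outline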
result = {
"title": f"深度解析:{topic}",
"sections": [
{
"name": "引言",
"description": f"{topic}的背景与重要性",
"key_points": ["背景介绍", "发展历程", "当前趋势"]
},
{
"name": "核心内容",
"description": f"{topic}的关键要素",
"key_points": ["要点1", "要点2", "要点3"]
},
{
"name": "实践方法",
"description": f"如何应用{topic}",
"key_points": ["方法步骤", "注意事项", "常见问题"]
},
{
"name": "结论",
"description": "总结与建议",
"key_points": ["核心总结", "未来展望", "行动建议"]
}
],
"estimated_words": 3000
}
else:
# 无搜索结果,使用简化大纲
result = {
"title": f"深度解析:{topic}",
"sections": [
{"name": "引言", "description": "背景介绍", "key_points": ["背景", "重要性"]},
{"name": "核心内容", "description": "关键要素", "key_points": ["要点1", "要点2"]},
{"name": "结论", "description": "总结", "key_points": ["结论", "建议"]}
],
"estimated_words": 2000
}
logger.info(f"大纲生成完成: {topic}")
return result
except Exception as e:
logger.error(f"大纲生成失败: {str(e)}", exc_info=True)
raise
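The cascade in `_cascade_search` is a plain ordered-fallback loop. The following sketch reproduces that idea with stub providers so it runs offline; `cascade_search` and the three stub functions are hypothetical names, and unlike the method above this version also returns the collected errors for inspection.

```python
from typing import Callable, Dict, List, Tuple

Provider = Tuple[str, Callable[[str, int], List[Dict]]]


def cascade_search(query: str, providers: List[Provider],
                   limit: int = 10) -> Tuple[List[Dict], List[str]]:
    """Try providers in order; return the first non-empty result plus the errors seen."""
    errors: List[str] = []
    for name, search in providers:
        try:
            results = search(query, limit)
        except Exception as exc:  # a failing provider must not stop the cascade
            errors.append(f"{name}: {exc}")
            continue
        if results:
            return results[:limit], errors
        errors.append(f"{name}: empty result")
    return [], errors


def rate_limited(query: str, limit: int) -> List[Dict]:
    raise RuntimeError("HTTP 429")


def nothing_found(query: str, limit: int) -> List[Dict]:
    return []


def working(query: str, limit: int) -> List[Dict]:
    return [{"title": f"About {query}", "url": "https://example.com", "snippet": ""}]


hits, errors = cascade_search(
    "demo topic",
    [("primary", rate_limited), ("secondary", nothing_found), ("fallback", working)],
)
print(hits[0]["title"], errors)
```

Treating an empty result like a failure is what lets a lower-priority provider answer when a higher-priority one returns nothing useful.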
FILE:src/skills/user_skill.py
"""
WeChat Official Account automation - user management skill
Fetches the tag list and follower lists, and caches user information
"""
import json
import logging
from typing import List, Dict, Optional, Any
from pathlib import Path
from .base_skill import BaseSkill
logger = logging.getLogger(__name__)
class UserSkill(BaseSkill):
"""User management: tags and followers."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._user_cache_file = Path.home() / ".cache" / "wechat-mp-auto" / "user_cache.json"
self._user_cache_file.parent.mkdir(parents=True, exist_ok=True)
def _load_user_cache(self) -> Dict:
"""加载用户缓存"""
if not self._user_cache_file.exists():
return {"users": {}, "tags": {}}
try:
with open(self._user_cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.warning(f"加载用户缓存失败: {e}")
return {"users": {}, "tags": {}}
def _save_user_cache(self, cache: Dict):
"""保存用户缓存"""
try:
with open(self._user_cache_file, 'w', encoding='utf-8') as f:
json.dump(cache, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.warning(f"保存用户缓存失败: {e}")
def add_user(self, openid: str, user_info: Dict) -> bool:
"""Add or update a user in the local cache.
Args:
openid: the user's openid
user_info: user information {nickname, remark, tagid_list, etc.}
Returns:
True on success, False otherwise
"""
try:
cache = self._load_user_cache()
cache["users"][openid] = {
**user_info,
"added_at": self._get_timestamp()
}
self._save_user_cache(cache)
logger.info(f"添加用户成功: {openid}")
return True
except Exception as e:
logger.error(f"添加用户失败: {e}")
return False
def get_user(self, openid: str) -> Optional[Dict]:
"""获取用户信息"""
cache = self._load_user_cache()
return cache.get("users", {}).get(openid)
def list_users(self) -> List[Dict]:
"""列出所有已缓存的用户"""
cache = self._load_user_cache()
return list(cache.get("users", {}).values())
def get_tags(self) -> List[Dict]:
"""Fetch the Official Account's tag list.
Returns:
[{"id": 1, "name": "tag name"}, ...]
"""
try:
result = self._request("GET", "/cgi-bin/tags/get", params={})
tags = result.get("tags", [])
logger.info(f"获取标签列表成功: {len(tags)} 个标签")
# 缓存标签信息
cache = self._load_user_cache()
cache["tags"] = {str(t.get("id")): t for t in tags}
self._save_user_cache(cache)
return tags
except Exception as e:
logger.error(f"获取标签列表失败: {e}")
raise
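def get_tag_fans(self, tag_id: int, next_openid: str = "") -> Dict:
"""Fetch one page of followers under a tag.
Args:
tag_id: the tag ID
next_openid: the openid to start from; leave empty to start from the beginning
Returns:
{
"count": number of followers in this page,
"data": {"openid": [list of openids]},
"next_openid": the openid to start the next page from,
"tag_id": the tag ID that was queried
}
"""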
try:
result = self._request(
"POST",
"/cgi-bin/user/tag/get",
data={
"tagid": tag_id,
"next_openid": next_openid
}
)
count = result.get("count", 0)
data = result.get("data", {})
openids = data.get("openid", [])
logger.info(f"获取标签 {tag_id} 粉丝成功: {count} 人")
return {
"count": count,
"data": data,
"next_openid": result.get("next_openid", ""),
"tag_id": tag_id
}
except Exception as e:
logger.error(f"获取标签粉丝列表失败: {e}")
raise
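def get_all_tag_fans(self, tag_id: int) -> List[str]:
"""Fetch the openids of all followers under a tag, following next_openid page by page.
Args:
tag_id: the tag ID
Returns:
List of openids (partial if a request fails midway)
"""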
all_openids = []
next_openid = ""
try:
while True:
result = self.get_tag_fans(tag_id, next_openid)
openids = result.get("data", {}).get("openid", [])
all_openids.extend(openids)
next_openid = result.get("next_openid", "")
if not next_openid or next_openid == "0":
break
logger.info(f"获取全部粉丝完成: 共 {len(all_openids)} 人")
return all_openids
except Exception as e:
logger.error(f"获取全部粉丝失败: {e}")
return all_openids
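def get_user_info(self, openid: str, lang: str = "zh_CN") -> Optional[Dict]:
"""Fetch a user's basic information.
Args:
openid: the user's openid
lang: response language: zh_CN (Simplified Chinese), zh_TW (Traditional Chinese), en (English)
Returns:
The user information dict, or None if the request fails
"""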
try:
result = self._request(
"GET",
"/cgi-bin/user/info",
params={
"openid": openid,
"lang": lang
}
)
# 缓存用户信息
self.add_user(openid, result)
logger.info(f"获取用户信息成功: {openid}")
return result
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return None
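def batch_get_user_info(self, openids: List[str], lang: str = "zh_CN") -> List[Dict]:
"""Fetch basic information for many users.
Args:
openids: list of openids (any length; sent in batches of at most 100 per request)
lang: response language
Returns:
List of user information dicts (batches that fail are skipped)
"""
if not openids:
return []
# At most 100 openids per request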
results = []
for i in range(0, len(openids), 100):
batch = openids[i:i+100]
try:
result = self._request(
"POST",
"/cgi-bin/user/info/batchget",
data={
"user_list": [{"openid": oid, "lang": lang} for oid in batch]
}
)
user_list = result.get("user_info_list", [])
results.extend(user_list)
# 缓存用户信息
for user in user_list:
oid = user.get("openid")
if oid:
self.add_user(oid, user)
logger.info(f"批量获取用户信息: {len(batch)} 人")
except Exception as e:
logger.error(f"批量获取用户信息失败: {e}")
continue
return results
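def get_fans_summary(self, tag_id: Optional[int] = None) -> Dict:
"""Build a follower summary.
Args:
tag_id: the tag ID; if None, summarize across all tags
Returns:
{
"total": follower count (with no tag_id this is the sum of per-tag counts, so a user with several tags is counted more than once),
"tags": {tag_id: count} when tag_id is given, otherwise {tag_id: {"name", "count"}},
"users": a sample of user details
}
"""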
summary = {
"total": 0,
"tags": {},
"users": []
}
try:
# A specific tag was requested (compare with None so that tag ID 0 is not ignored)
if tag_id is not None:
openids = self.get_all_tag_fans(tag_id)
summary["total"] = len(openids)
summary["tags"][str(tag_id)] = len(openids)
# 批量获取用户信息
users = self.batch_get_user_info(openids[:50]) # 最多获取50人详细信息
summary["users"] = users
else:
# 获取所有标签
tags = self.get_tags()
for tag in tags:
tid = tag.get("id")
tname = tag.get("name", "")
count = tag.get("count", 0)
summary["tags"][str(tid)] = {
"name": tname,
"count": count
}
summary["total"] += count
# 获取第一个标签的粉丝作为示例
if tags:
first_tag = tags[0]
tid = first_tag.get("id")
openids = self.get_all_tag_fans(tid)
users = self.batch_get_user_info(openids[:20])
summary["users"] = users
return summary
except Exception as e:
logger.error(f"获取粉丝摘要失败: {e}")
return summary
def _get_timestamp(self) -> int:
"""获取当前时间戳"""
import time
return int(time.time())
def clear_user_cache(self) -> bool:
"""清除用户缓存"""
try:
if self._user_cache_file.exists():
self._user_cache_file.unlink()
logger.info("用户缓存已清除")
return True
except Exception as e:
logger.error(f"清除缓存失败: {e}")
return False
FILE:src/token_manager.py
"""
WeChat Official Account automation - token manager
"""
import time
import json
import logging
from pathlib import Path
from typing import Optional
# 配置日志
logger = logging.getLogger(__name__)
class TokenManager:
"""Access token manager."""
TOKEN_CACHE_FILE = Path.home() / ".cache" / "wechat-mp-auto" / "token.json"
def __init__(self, app_id: str, app_secret: str):
# 参数验证
if not app_id or not isinstance(app_id, str):
logger.error("无效的app_id参数")
raise ValueError("app_id不能为空且必须是字符串")
if not app_secret or not isinstance(app_secret, str):
logger.error("无效的app_secret参数")
raise ValueError("app_secret不能为空且必须是字符串")
self.app_id = app_id
self.app_secret = app_secret
self._access_token: Optional[str] = None
self._expires_at: int = 0
# 安全检查:确保app_id格式合理(微信app_id通常有特定格式)
if len(app_id) < 10:
logger.warning(f"app_id长度可能不正确: {app_id}")
if len(app_secret) < 10:
logger.warning(f"app_secret长度可能不正确")
self._load_from_cache()
logger.info(f"TokenManager初始化完成: app_id={app_id[:10]}...")
def _load_from_cache(self):
"""从文件缓存加载"""
if not self.TOKEN_CACHE_FILE.exists():
logger.debug("Token缓存文件不存在")
return
try:
with open(self.TOKEN_CACHE_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# Safety check: ignore a cache written for a different app_id (the file itself is left in place)
if data.get("app_id") != self.app_id:
logger.warning("缓存的app_id与当前不匹配,清除缓存")
return
self._access_token = data.get("access_token")
self._expires_at = data.get("expires_at", 0)
if self._access_token and not self._is_expired():
logger.debug("从缓存加载有效token")
else:
logger.debug("缓存token已过期或无效")
except json.JSONDecodeError as e:
logger.warning(f"Token缓存文件JSON解析失败: {str(e)}")
except PermissionError as e:
logger.warning(f"读取Token缓存文件权限不足: {str(e)}")
except Exception as e:
logger.warning(f"加载Token缓存失败: {str(e)}")
def _save_to_cache(self):
"""Save the token to the cache file."""
try:
# Make sure the cache directory exists
self.TOKEN_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
# Safety check: the resolved cache file must sit inside the expected directory
resolved_path = self.TOKEN_CACHE_FILE.resolve()
expected_dir = (Path.home() / ".cache" / "wechat-mp-auto").resolve()
if expected_dir not in resolved_path.parents:
logger.error("不安全的缓存文件路径")
return
data = {
"app_id": self.app_id,
"access_token": self._access_token,
"expires_at": self._expires_at
}
with open(self.TOKEN_CACHE_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f)
logger.debug("Token已保存到缓存")
except PermissionError as e:
logger.error(f"写入Token缓存文件权限不足: {str(e)}")
except Exception as e:
logger.error(f"保存Token缓存失败: {str(e)}")
def _is_expired(self) -> bool:
"""Check whether the token has expired (treated as expired 5 minutes early)."""
# Sanity check: a non-positive timestamp means no valid token
if self._expires_at <= 0:
return True
is_expired = time.time() >= (self._expires_at - 300)
if is_expired:
logger.debug("Token已过期")
return is_expired
def get_access_token(self, force_refresh: bool = False) -> str:
"""Return a valid access_token, requesting a new one when needed."""
# Validate arguments
if not isinstance(force_refresh, bool):
logger.warning(f"无效的force_refresh参数: {force_refresh}, 使用默认值False")
force_refresh = False
try:
if force_refresh:
logger.info("强制刷新Token")
self._access_token = None
# 检查现有token是否有效
if self._access_token and not self._is_expired():
logger.debug("使用缓存的有效Token")
return self._access_token
# 请求新的token
logger.info("请求新的Access Token")
try:
import requests
except ImportError:
logger.error("requests库未安装")
raise Exception("缺少requests库")
# Security: pass credentials via params so the secret never sits in a URL string that could be logged
response = requests.get(
"https://api.weixin.qq.com/cgi-bin/token",
params={"grant_type": "client_credential", "appid": self.app_id, "secret": self.app_secret},
timeout=10
).json()
if "access_token" not in response:
from .exceptions import APIError, get_error_message
errcode = response.get("errcode", -1)
errmsg = response.get("errmsg", get_error_message(errcode))
logger.error(f"获取Token失败: errcode={errcode}, errmsg={errmsg}")
raise APIError(errcode, errmsg)
self._access_token = response["access_token"]
self._expires_at = int(time.time()) + response.get("expires_in", 7200)
self._save_to_cache()
logger.info(f"Access Token获取成功,有效期至: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self._expires_at))}")
return self._access_token
except ImportError:
raise
except Exception as e:
logger.error(f"获取Access Token失败: {str(e)}", exc_info=True)
raise
@classmethod
def from_config(cls, config) -> "TokenManager":
"""Create a TokenManager from a config object."""
# Validate arguments
if config is None:
logger.error("config参数不能为空")
raise ValueError("config不能为空")
try:
app_id, app_secret = config.get_credentials()
logger.info("从配置创建TokenManager")
return cls(app_id, app_secret)
except Exception as e:
logger.error(f"从配置创建TokenManager失败: {str(e)}")
raise
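The early-expiry rule in `_is_expired` (a token counts as expired 300 seconds before its real deadline) can be checked in isolation. This is a minimal sketch of the same comparison; the standalone `is_expired` function and its injectable `now` argument are assumptions added to make the rule testable, not part of `TokenManager`.

```python
import time
from typing import Optional

REFRESH_MARGIN_SECONDS = 300  # refresh five minutes before the real expiry


def is_expired(expires_at: int, now: Optional[float] = None) -> bool:
    """Return True when the token is missing or within the refresh margin."""
    if expires_at <= 0:
        return True  # no token has been stored yet
    current = time.time() if now is None else now
    return current >= expires_at - REFRESH_MARGIN_SECONDS


print(is_expired(10_000, now=9_600))  # 400 seconds left, outside the margin
print(is_expired(10_000, now=9_700))  # exactly at the margin
```

Refreshing early means a request that starts just before the deadline does not end up carrying a token that expires mid-flight.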
FILE:src/utils/__init__.py
"""
Utils package initialization
"""
from .logger import Logger
from .validators import Validators
__all__ = ["Logger", "Validators"]
FILE:src/utils/logger.py
"""
WeChat Official Account automation - logging utility
"""
import logging
import sys
from pathlib import Path
from datetime import datetime
class Logger:
def __init__(self, name: str = "wechat-mp-auto"):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
if not self.logger.handlers:
console = logging.StreamHandler(sys.stdout)
console.setLevel(logging.INFO)
log_dir = Path.home() / ".cache" / "wechat-mp-auto" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"wechat-mp-auto_{datetime.now().strftime('%Y%m%d')}.log"
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console.setFormatter(formatter)
file_handler.setFormatter(formatter)
self.logger.addHandler(console)
self.logger.addHandler(file_handler)
def _mask(self, msg: str) -> str:
import re
msg = re.sub(r'(appid|secret|token)["\s:=]+([a-zA-Z0-9]{4})[a-zA-Z0-9]*', r'\1: \2****', msg, flags=re.IGNORECASE)
return msg
def info(self, msg: str):
self.logger.info(self._mask(msg))
def error(self, msg: str):
self.logger.error(self._mask(msg))
logger = Logger()
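The masking rule in `Logger._mask` is easiest to understand on concrete input. The sketch below reuses the same regular expression in a standalone `mask` function (a hypothetical name) so its effect can be checked without creating log files.

```python
import re

# Same expression as in Logger._mask: keep the first four characters of the value.
_SECRET_PATTERN = re.compile(
    r'(appid|secret|token)["\s:=]+([a-zA-Z0-9]{4})[a-zA-Z0-9]*',
    re.IGNORECASE,
)


def mask(msg: str) -> str:
    """Replace credential-like values with their first four characters plus ****."""
    return _SECRET_PATTERN.sub(r'\1: \2****', msg)


print(mask("secret=abcdef123456"))
print(mask("token=abc"))
```

Values shorter than four characters do not match the pattern and pass through unchanged, so the rule is a best-effort filter and not a guarantee that nothing sensitive is logged.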
FILE:src/utils/validators.py
"""
WeChat Official Account automation - argument validation utility
"""
import re
from pathlib import Path
class Validators:
@staticmethod
def validate_app_id(app_id: str) -> bool:
return bool(re.match(r'^wx[a-zA-Z0-9]{16}$', app_id)) if app_id else False
@staticmethod
def validate_app_secret(secret: str) -> bool:
return bool(re.match(r'^[a-zA-Z0-9]{32}$', secret)) if secret else False
@staticmethod
def validate_date(date_str: str) -> bool:
return bool(re.match(r'^\d{4}-\d{2}-\d{2}$', date_str)) if date_str else False
@staticmethod
def validate_image_path(path: str) -> dict:
file_path = Path(path)
if not file_path.exists():
return {"valid": False, "error": "文件不存在"}
allowed = {".jpg", ".jpeg", ".png", ".gif"}
if file_path.suffix.lower() not in allowed:
return {"valid": False, "error": f"不支持格式"}
return {"valid": True}
FILE:themes/cuiyu.yaml
name: 翠玉 cuiyu
colors:
  primary: "#1EA089"
  secondary: "#FDB22F"
  text: "#777777"
  background: "#FFFFFF"
  code_bg: "#282c34"
body:
  font_size: "16px"
  line_height: "1.8"
  letter_spacing: "0.5px"
h1:
  font_size: "24px"
  text_align: "center"
  margin: "20px 0"
  color: "#1A1A1A"
h2:
  font_size: "22px"
  font_weight: "bold"
  color: "white"
  background_color: "#1EA089"
  border_left: "10px solid #FDB22F"
  padding: "3px 10px"
  margin: "24px 0 16px"
h3:
  font_size: "18px"
  font_weight: "bold"
  color: "#1EA089"
  border_bottom: "2px solid #FDB22F"
  padding: "6px"
  margin: "20px 0 12px"
p:
  margin: "14px 0"
  color: "#777777"
link:
  color: "#1E6BB8"
  text_decoration: "none"
blockquote:
  color: "#777777"
  background_color: "#f7f7f7"
  border_left: "4px solid #1EA089"
  padding: "12px 16px"
  margin: "16px 0"
ul:
  margin: "12px 0"
  padding_left: "24px"
  bullet_color: "#1EA089"
ol:
  margin: "12px 0"
  padding_left: "24px"
  number_color: "#1EA089"
strong:
  color: "#1EA089"
  font_weight: "bold"
em:
  color: "#FDB22F"
  font_style: "normal"
FILE:themes/default.yaml
name: 默认主题
colors:
  primary: "#007AFF"
  secondary: "#5856D6"
  text: "#333333"
  background: "#f0f4ff"
body:
  font_size: "15px"
  line_height: "1.8"
h1:
  font_size: "22px"
  text_align: "center"
  margin: "20px 0"
h2:
  font_size: "18px"
  margin: "20px 0 12px"
h3:
  font_size: "15px"
  margin: "16px 0 10px"
  color: "#333333"
p:
  margin: "14px 0"
link:
  color: "#007AFF"
  text_decoration: "none"
blockquote:
  color: "#666666"
  background_color: "#f0f5ff"
  padding: "10px 16px"
  margin: "16px 0"
ul:
  margin: "12px 0"
  padding_left: "20px"
  bullet_color: "#007AFF"
ol:
  margin: "12px 0"
  padding_left: "20px"
  number_color: "#007AFF"
FILE:themes/houge.yaml
name: 猴哥风格
colors:
  primary: "#ff6b00"
  secondary: "#6b5b8a"
  text: "#333333"
  background: "#fafafa"
body:
  font_size: "16px"
  line_height: "1.8"
h1:
  font_size: "22px"
  text_align: "center"
  margin: "20px 0"
h2:
  font_size: "18px"
  margin: "20px 0 12px"
h3:
  font_size: "15px"
  margin: "16px 0 10px"
  color: "#333333"
p:
  margin: "14px 0"
link:
  color: "#ff6b00"
  text_decoration: "none"
blockquote:
  color: "#666666"
  background_color: "#fff8f0"
  padding: "10px 16px"
  margin: "16px 0"
ul:
  margin: "12px 0"
  padding_left: "20px"
  bullet_color: "#ff6b00"
ol:
  margin: "12px 0"
  padding_left: "20px"
  number_color: "#ff6b00"
FILE:themes/macaron.yaml
name: 马卡龙
colors:
  primary: "#FF6B9D"
  secondary: "#C9A0DC"
  text: "#4A4A4A"
  background: "#fdf2f8"
body:
  font_size: "15px"
  line_height: "1.8"
h1:
  font_size: "22px"
  text_align: "center"
  margin: "20px 0"
h2:
  font_size: "18px"
  margin: "20px 0 12px"
h3:
  font_size: "15px"
  margin: "16px 0 10px"
  color: "#4A4A4A"
p:
  margin: "14px 0"
link:
  color: "#FF6B9D"
  text_decoration: "none"
blockquote:
  color: "#666666"
  background_color: "#fff0f5"
  padding: "10px 16px"
  margin: "16px 0"
ul:
  margin: "12px 0"
  padding_left: "20px"
  bullet_color: "#FF6B9D"
ol:
  margin: "12px 0"
  padding_left: "20px"
  number_color: "#FF6B9D"
FILE:themes/shuimo.yaml
name: 水墨
colors:
  primary: "#2C3E50"
  secondary: "#34495E"
  text: "#333333"
  background: "#f5f5f0"
body:
  font_size: "16px"
  line_height: "2"
h1:
  font_size: "22px"
  text_align: "center"
  margin: "20px 0"
h2:
  font_size: "18px"
  margin: "20px 0 12px"
h3:
  font_size: "15px"
  margin: "16px 0 10px"
  color: "#333333"
p:
  margin: "14px 0"
link:
  color: "#2C3E50"
  text_decoration: "none"
blockquote:
  color: "#666666"
  background_color: "#f0f0eb"
  padding: "10px 16px"
  margin: "16px 0"
ul:
  margin: "12px 0"
  padding_left: "20px"
  bullet_color: "#2C3E50"
ol:
  margin: "12px 0"
  padding_left: "20px"
  number_color: "#2C3E50"
FILE:themes/wenyan.yaml
name: 文雁
colors:
  primary: "#0066FF"
  secondary: "#00D4AA"
  text: "#1A1A2E"
  background: "#fff8f0"
body:
  font_size: "15px"
  line_height: "1.75"
h1:
  font_size: "22px"
  text_align: "center"
  margin: "20px 0"
h2:
  font_size: "18px"
  margin: "20px 0 12px"
h3:
  font_size: "15px"
  margin: "16px 0 10px"
  color: "#1A1A2E"
p:
  margin: "14px 0"
link:
  color: "#0066FF"
  text_decoration: "none"
blockquote:
  color: "#666666"
  background_color: "#fff5eb"
  padding: "10px 16px"
  margin: "16px 0"
ul:
  margin: "12px 0"
  padding_left: "20px"
  bullet_color: "#0066FF"
ol:
  margin: "12px 0"
  padding_left: "20px"
  number_color: "#0066FF"
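The theme files use snake_case keys that mostly mirror CSS property names. One plausible way to apply a section is to turn it into an inline `style` attribute, since WeChat article HTML generally relies on inline styles. The sketch below is an assumption about how that mapping could work, not the actual `convert_to_html` implementation. The helpers `to_inline_style` and `style_tag` are hypothetical, and the dictionary is copied from the `h2` section of `themes/cuiyu.yaml` so that no YAML parser is needed.

```python
from typing import Dict

# Values copied from the h2 section of themes/cuiyu.yaml.
H2_STYLE: Dict[str, str] = {
    "font_size": "22px",
    "font_weight": "bold",
    "color": "white",
    "background_color": "#1EA089",
    "border_left": "10px solid #FDB22F",
    "padding": "3px 10px",
    "margin": "24px 0 16px",
}


def to_inline_style(section: Dict[str, str]) -> str:
    """Turn snake_case theme keys into a CSS declaration string."""
    return "; ".join(
        f"{key.replace('_', '-')}: {value}" for key, value in section.items()
    )


def style_tag(tag: str, text: str, section: Dict[str, str]) -> str:
    """Wrap text in a tag that carries the section's inline style."""
    return f'<{tag} style="{to_inline_style(section)}">{text}</{tag}>'


html = style_tag("h2", "Overview", H2_STYLE)
print(html)
```

Keys such as `bullet_color` and `number_color` are not CSS properties, so a real converter would need to handle them separately, for example by styling the list markers explicitly.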