@clawhub-lianghaoxun-95f66d74e8
通过 NVIDIA NIM API 或 SiliconFlow API 生成图片。支持 Kolors (快手可图)、Qwen-Image (通义千问)、flux.2-klein-4b 等模型。当用户要求"生成图片"、"画一张图"、"AI绘图"或类似表达时调用。支持中文提示词,返回图片文件路径。
---
name: ai-photo-pro
version: 2.0.0
description: 通过 NVIDIA NIM API 或 SiliconFlow API 生成图片。支持 Kolors (快手可图)、Qwen-Image (通义千问)、flux.2-klein-4b 等模型。当用户要求"生成图片"、"画一张图"、"AI绘图"或类似表达时调用。支持中文提示词,返回图片文件路径。
---
# AiPhotoPro - AI 图片生成工具
支持双引擎:**NVIDIA NIM API**(flux.2-klein-4b)和 **SiliconFlow API**(Kolors / Qwen-Image)。
## 调用方式
### 命令行(推荐)
```bash
# SiliconFlow - 可图 Kolors(默认)
python /home/ubuntu/.openclaw/skills/ai-photo-pro/scripts/siliconflow_main.py "<提示词>" ["<负面提示词>"]
# SiliconFlow - 通义千问 Qwen-Image(付费模型,建议按需选取)
python /home/ubuntu/.openclaw/skills/ai-photo-pro/scripts/siliconflow_main.py "<提示词>" ["<负面提示词>"] --model Qwen/Qwen-Image
# NVIDIA NIM API - flux.2-klein-4b
python /home/ubuntu/.openclaw/skills/ai-photo-pro/scripts/nvid_main.py "<提示词>"
```
### Python 导入
```python
import sys
sys.path.insert(0, '/home/ubuntu/.openclaw/skills/ai-photo-pro/scripts')
# SiliconFlow
from siliconflow_main import generate_png
img_list = generate_png(model="Kwai-Kolors/Kolors", base_str="<提示词>", negative_prompt="<负面提示词>")
# NVIDIA
from nvid_main import run_pngvidapi
img_path = run_pngvidapi(model="flux.2-klein-4b", base_str="<提示词>")
```
## 参数说明
### SiliconFlow `generate_png()`
| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `model` | string | ❌ | 模型名,默认 `Kwai-Kolors/Kolors`,可选 `Qwen/Qwen-Image`(付费模型,建议按需选取) |
| `base_str` | string | ✅ | 中文提示词 |
| `negative_prompt` | string | ❌ | 负面提示词,可空 |
| `batch_size` | int | ❌ | 批量大小,默认 1 |
| `num_inference_steps` | int | ❌ | 推理步骤数,默认 20 |
| `guidance_scale` | float | ❌ | 提示词匹配度,默认 2.5 |
### NVIDIA `run_pngvidapi()`
| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `model` | string | ✅ | 固定填 `flux.2-klein-4b` |
| `base_str` | string | ✅ | 中文提示词 |
## 输出
- 图片保存路径:`/home/ubuntu/.openclaw/skills/ai-photo-pro/scripts/img_data/<model>_<timestamp>.png`
- 函数返回值为图片路径列表
## API Key 配置
首次使用需配置 API Key,运行交互式配置脚本:
```bash
python /home/ubuntu/.openclaw/skills/ai-photo-pro/scripts/config_json.py
```
或手动写入 `config.json`(位于 `scripts/` 目录):
```json
{
"NVID": "nvapi-你的NVID密钥",
"SILICONFLOW": "sk-你的SiliconFlow密钥"
}
```
### 获取 Key
- **NVID API Key**: https://nim.nvidia.com/ 注册获取
- **SiliconFlow API Key**: https://cloud.siliconflow.cn/i/IOo0eaWy 注册获取
## 示例提示词
**人物:**
```
一位美丽的短发东亚女性坐在高层公寓的落地窗前,身穿紧身的白色衬衫,(光线是午后柔和的定向自然光,在人物身上形成优美的明暗轮廓),脸上带着温暖而亲密的微笑,皮肤毛孔清晰,虹膜清晰锐利
```
**物体/场景:**
```
一个小苹果,红彤彤的,挂在绿叶树枝上,阳光照射,背景是模糊的果园,摄影风格,高清细节
```
**风格化:**
```
赛博朋克城市夜景,霓虹灯光,雨后街道,反射,高对比度,电影感
```
## 注意事项
- SiliconFlow 默认尺寸 1024×1024,steps=20
- NVIDIA 默认尺寸 1024×1024,steps=4(更快)
- 生成失败 SiliconFlow 会抛出异常;NVIDIA 会自动重试最多 5 次
- 图片路径通过函数返回值传递,方便 agent 捕获并发送
FILE:scripts/config_json.py
import json
import os
# 程序启动地址
ORGPATH = os.path.dirname(os.path.abspath(__file__))
os.chdir(ORGPATH)
config_file = "config.json"
a = input(
"您需要配置API Key才能使用本工具。\n\n"
"1. 配置NVID API Key\n"
"2. 配置硅基流动API Key\n"
"3. 退出\n"
"请输入您的选择: 选择指定数字,其他输入退出程序"
)
config_data = {}
if os.path.exists(config_file):
with open(config_file, "r") as f:
config_data = json.load(f)
if a == "1":
TOKEN_KEY = input("请输入您的NVID API Key: ")
config_data["NVID"] = TOKEN_KEY
with open(config_file, "w") as f:
json.dump(config_data, f, indent=2)
print("配置文件已创建")
elif a == "2":
TOKEN_KEY = input("请输入您的硅基流动API Key: ")
config_data["SILICONFLOW"] = TOKEN_KEY
with open(config_file, "w") as f:
json.dump(config_data, f, indent=2)
print("配置文件已创建")
else:
exit(1)
FILE:scripts/nvid_main.py
import requests
import base64
from io import BytesIO
from PIL import Image
import pandas as pd
import requests
import json
import os
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
# 程序启动地址
ORGPATH = os.path.dirname(os.path.abspath(__file__))
os.chdir(ORGPATH)
config_file = "config.json"
count_api = 0
if not os.path.exists(config_file):
raise FileNotFoundError("配置文件 config.json 不存在,请运行 config_json.py 创建配置文件")
else:
try:
with open(config_file, "r") as f:
config = json.load(f)
TOKEN_KEY = config["NVID"]
except:
raise ValueError("配置文件 config.json 至少应该存在NVID API Key配置,请运行 config_json.py 创建配置文件")
@retry(
stop=stop_after_attempt(5), # 重试3次后停止
wait=wait_fixed(2), # 每次重试间隔60秒
retry=retry_if_exception_type((requests.exceptions.HTTPError, requests.exceptions.RequestException)) # 仅针对特定异常重试
)
def run_pngvidapi(model,base_str=None):
invoke_url = f"https://ai.api.nvidia.com/v1/genai/black-forest-labs/{model}"
headers = {
"Authorization": f"Bearer {TOKEN_KEY}",
"Accept": "application/json",
}
payload = {
"prompt": base_str,
"width": 1024,
"height": 1024,
# "seed": 0,
"steps": 4
}
try:
response = requests.post(invoke_url, headers=headers, json=payload)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
print(f"进行尝试重启中...HTTP错误: {e}")
raise requests.exceptions.HTTPError(e)
response_body = response.json()
try:
# 解码 base64 数据
image_data = base64.b64decode(response_body['artifacts'][0]['base64'])
# 从字节数据创建图片
image = Image.open(BytesIO(image_data))
# 保存为 PNG
import time
timestamp = int(time.time())
image.save(f"{ORGPATH}/img_data/{model.replace('/', '_')}_{timestamp}.png", "PNG")
print(f"图片已保存为 {ORGPATH}/img_data/{model.replace('/', '_')}_{timestamp}.png")
return f"{ORGPATH}/img_data/{model.replace('/', '_')}_{timestamp}.png"
except Exception as e:
# 如果以上字段都不对,先打印响应结构看看
print("API响应结构:", response_body.keys())
# 或者保存整个响应供调试
with open(f"{ORGPATH}/response.json", "w") as f:
import json
json.dump(response_body, f, indent=2)
print("图片生成失败,已将响应保存为 response.json,请检查其中的图片数据字段")
print(f"进行尝试重启中...HTTP错误: {e}")
raise requests.exceptions.HTTPError(e)
def main():
"""命令行入口:通过 sys.argv[1] 传入提示词"""
import sys
if len(sys.argv) < 2:
print("用法: python nvid_main.py <提示词>")
sys.exit(1)
prompt = sys.argv[1]
img = run_pngvidapi(model="flux.2-klein-4b", base_str=prompt)
print(img)
return img
if __name__ == "__main__":
main()
FILE:scripts/siliconflow_main.py
import sys
import requests
import os
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
import numpy as np
import time
import json
ORGPATH = os.path.dirname(os.path.abspath(__file__))
os.chdir(ORGPATH)
config_file = "config.json"
count_api = 0
if not os.path.exists(config_file):
raise FileNotFoundError("配置文件 config.json 不存在,请运行 config_json.py 创建配置文件")
else:
try:
with open(config_file, "r") as f:
config = json.load(f)
TOKEN_KEY = config["SILICONFLOW"]
except:
raise ValueError("配置文件 config.json 至少应该存在SiliconFlow API Key配置,请运行 config_json.py 创建配置文件")
def post_gui(payload,key):
url = "https://api.siliconflow.cn/v1/images/generations"
try:
headers = {
"Authorization": key,
"Content-Type": "application/json"
}
response = requests.request("POST", url, json=payload, headers=headers)
return response.json()
except Exception as e:
raise ValueError(f"请求失败:{e} {response}")
def nan_check(value):
if value==np.nan or str(value)=="" or str(value)=="None"or value=="" or value is None:
return True
else:
return False
def generate_png(model="Kwai-Kolors/Kolors",batch_size=1,num_inference_steps=20,guidance_scale=2.5,base_str=None,negative_prompt=None):
"""
生成图片
:param model: 模型名称,可选(Kwai-Kolors/Kolors、Qwen/Qwen-Image),默认值为Kwai-Kolors/Kolors
:param batch_size: 批量大小,推荐值为1
:param num_inference_steps: 推理步骤数,推荐值为20
:param guidance_scale: 提示词之间的匹配度,推荐值为2.5
:param base_str: 提示词,不能为空
:param negative_prompt: 负提示词,可以为空
:return: 图片列表
"""
if nan_check(base_str):
raise ValueError("提示词不能为空")
payload = {
"model": model,
"prompt": base_str,
"image_size": "1024x1024",
"batch_size": batch_size,
"num_inference_steps": num_inference_steps, #推理步骤数
"guidance_scale": guidance_scale, #提示词之间的匹配度
}
if not nan_check(negative_prompt):
payload["negative_prompt"] = negative_prompt
try:
response = post_gui(payload,key=f"Bearer {TOKEN_KEY}")
img_list = []
for i in range(len(response["images"])):
image_url = response["images"][i]["url"]
# 下载图片
response_png = requests.get(image_url)
# 根据时间编码命名图片
timestamp = int(time.time())
with open(f"{ORGPATH}/img_data/siliconflow_{model.replace('/', '_')}_{timestamp}.png", "wb") as f:
f.write(response_png.content)
img_list.append(f"{ORGPATH}/img_data/siliconflow_{model.replace('/', '_')}_{timestamp}.png")
print(f"图片已保存在: {img_list}")
return img_list
except Exception as e:
print(f"生成图片失败:{e} 重试中...",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
raise ValueError(f"生成图片失败:{e} {response}")
def main():
"""
命令行入口,供其他 agent 调用
用法: python siliconflow_main.py "<提示词>" ["<负面提示词>"] [--model <模型名>]
负面提示词可省略
模型可选: Kwai-Kolors/Kolors (默认), Qwen/Qwen-Image
"""
if len(sys.argv) < 2:
print("用法: python siliconflow_main.py <提示词> [负面提示词] [--model <模型名>]")
sys.exit(1)
model = "Kwai-Kolors/Kolors"
base_str = None
negative_prompt = None
i = 1
while i < len(sys.argv):
if sys.argv[i] == "--model" and i + 1 < len(sys.argv):
model = sys.argv[i + 1]
i += 2
elif base_str is None:
base_str = sys.argv[i]
i += 1
else:
negative_prompt = sys.argv[i]
i += 1
if base_str is None:
print("用法: python siliconflow_main.py <提示词> [负面提示词] [--model <模型名>]")
sys.exit(1)
return generate_png(model=model, base_str=base_str, negative_prompt=negative_prompt)
if __name__ == "__main__":
main()This skill provides integrated WeChat account management, message delivery, and file transmission capabilities. It supports querying current user account inf...
---
name: wechat-account-send
version: 4.0.0
description: This skill provides integrated WeChat account management, message delivery, and file transmission capabilities. It supports querying current user account information and actively sending text messages or various file types (including images, videos, audio, and documents) to specified WeChat contacts or groups when invoked through OpenClaw. The functionality enables complete operation execution for user-driven communication and sharing tasks.
---
# 微信账号消息发送 Skill
使用 OpenClaw 的微信集成功能,实现微信账号信息查询、会话管理、主动文本消息和文件发送功能。
调用时机:当用户需要查询当前微信账户信息,或主动向指定微信联系人/群组发送消息时,通过微信发送文件即可触发。
## 功能说明
本技能提供以下核心功能:
1. **微信会话查询**:使用 sessions_list 工具查询当前活跃的微信会话和对话信息。
2. **主动文本消息发送**:通过微信后端API向指定用户发送文本消息。
3. **文件发送**:通过微信后端API向指定用户或机器人发送多种格式的文件(如图片、视频、音频、文档)。
## 功能亮点
- 支持多账号管理
- 自动获取用户配置信息
- 完整的消息与文件发送反馈
- 跨平台兼容性(Windows/Linux/MacOS)
- 详细的调试信息输出
- 会话信息查询和管理
- 实时会话状态监控
- 自动路径检测,无需手动配置
- 支持广泛的文件格式(图片、视频、音频、文档、压缩包等)
## 使用方法
### 1. 会话查询功能
#### 列出所有活跃会话
当用户要求列出所有会话时,调用 sessions_list 工具:
sessions_list()
```
#### 找到当前对话
当用户要求找到我们之间的对话时:
# 获取所有会话
sessions_list()
# 过滤出微信渠道的会话
# 找到最新的或与当前对话匹配的会话
```
#### 获取会话详情
使用 sessions_list 的参数来获取更详细的信息:
# 获取最近活跃的会话
sessions_list(activeMinutes=60, messageLimit=5)
```
### 2. 主动消息发送功能
#### 命令行调用方式
```bash
python scripts/main_send_msg <account_id> <message>
```
#### 示例
```bash
python scripts/main_send_msg "6e2f83e62b99-im-bot" "这是发送的消息"
```
### 3. 文件发送功能 (新增)
本技能提供两种核心文件发送场景:
#### 场景一:微信文件自我发送
- **使用场景**:当用户要求将某个文件(如截图、结果文件等)通过微信发送给自己时触发。
- **执行流程**:
1. 通过微信会话查询功能,确认当前会话对应的机器人账号。
2. 理解用户需要发送的文件(由用户提供或根据上下文生成)。
3. 调用文件发送程序,将文件发送至用户微信。
#### 场景二:微信文件推送发送
- **使用场景**:当用户要求将某个文件通过微信发送给指定机器人时触发。
- **执行流程**:
1. 解析用户指令,确认需要发送的文件。
2. 调用文件发送程序,将文件发送至目标机器人账号。
#### 命令行调用方式
```bash
# 发送图片
python scripts/main_send_file.py "XXXXXXX-im-bot" "/home/测试图片.jpg"
# 发送视频
python scripts/main_send_file.py "XXXXXXX-im-bot" "/home/测试视频.mp4"
# 发送文档
python scripts/main_send_file.py "XXXXXXX-im-bot" "/home/测试文档.pdf"
# 发送音频
python scripts/main_send_file.py "XXXXXXX-im-bot" "/home/测试音频.mp3"
```
#### 触发条件
当同时满足以下条件时,文件发送功能将被自动调用:
1. 用户通过微信发送的消息**非纯文本**。
2. 消息中**携带附件**(如图片、视频、文档等)。
3. 用户明确要求"通过微信发送文件"或表达类似意图。
## 参数说明
### 1. 会话查询参数
| 参数名称 | 类型 | 描述 | 默认值 |
| :--- | :--- | :--- | :--- |
| `activeMinutes` | 整数 | 查询指定分钟内活跃的会话 | 60 |
| `messageLimit` | 整数 | 每个会话返回的消息数量 | 5 |
### 2. 消息发送参数
#### 命令行参数
| 参数名称 | 类型 | 描述 | 必填 |
| :--- | :--- | :--- | :--- |
| `account_id` | 字符串 | 微信账号唯一标识符(如:`6e2f83e62b99-im-bot`) | ✅ 是 |
| `message` | 字符串 | 要发送的文本消息内容 | ✅ 是 |
### 3. 文件发送参数 (新增)
#### 命令行参数
| 参数名称 | 类型 | 描述 | 必填 |
| :--- | :--- | :--- | :--- |
| `account_id` | 字符串 | 微信账号唯一标识符(如:`XXXXXXX-im-bot`) | ✅ 是 |
| `file_path` | 字符串 | 待发送文件的完整本地路径 | ✅ 是 |
## 注意事项
### 使用前准备
1. **环境配置**:确保已正确安装 OpenClaw 并完成微信账号登录。
2. **账号配置**:确保账号ID对应的配置文件存在于正确的位置。
3. **依赖安装**:确保已安装所需的 Python 依赖包(`requests`, `uuid`, `pathlib` 等)。
### 使用限制
1. **消息格式**:支持文本消息和多种格式的文件发送。
2. **频率限制**:请遵守微信API的调用频率限制,避免频繁发送。
3. **账号状态**:确保账号处于正常登录状态。
4. **权限控制**:发送消息和文件需要对应的权限配置。
5. **文件限制**:目前不支持发送整个文件夹,文件需携带完整路径。
### 约束与限制 (新增)
#### 文件大小建议
| 文件类型 | 建议大小上限 |
| :--- | :--- |
| 图片 | ≤ 10 MB |
| 视频 | ≤ 20 MB |
| 其他文件 | ≤ 50 MB |
> 注:实际文件大小限制以微信官方API规定为准,超过限制可能导致发送失败。
#### 已测试支持的文件格式
- 图片:jpg, png, gif, bmp 等
- 视频:mp4, mov, avi 等
- 音频:mp3, wav, m4a 等
- 文档:pdf, doc, docx, xls, xlsx, ppt, pptx, txt 等
- 压缩文件:zip, rar, 7z 等
### 配置文件位置
账号配置文件默认存放在以下位置:
- Windows: `C:\Users\用户名\.openclaw\openclaw-weixin\accounts`
- Linux/MacOS: `~/.openclaw/openclaw-weixin/accounts`
### 自动路径检测
技能现在通过 `get_openclaw_path()` 函数实现自动路径检测:
1. 优先检测环境变量 `OPENCLAW_STATE_DIR`
2. 如果未设置环境变量,使用默认路径
3. 支持 Windows、Linux 和 macOS 平台
### 错误处理
常见错误及解决方法:
| 错误类型 | 可能原因 | 解决方法 |
| :--- | :--- | :--- |
| 路径不存在 | OpenClaw路径配置错误 | 检查 OpenClaw 是否正确安装 |
| 配置文件缺失 | 账号ID不存在或配置文件损坏 | 重新登录微信账号 |
| 网络请求异常 | 网络连接失败或API地址错误 | 检查网络连接和API地址配置 |
| 权限验证失败 | Token失效或权限不足 | 重新获取身份验证令牌 |
| 文件发送失败 (新增) | 文件路径错误、网络异常、API返回非零状态码、文件类型不支持或大小超限 | 检查文件路径与网络,查看错误日志,确认文件格式与大小 |
## 示例输出
### 会话查询示例
**sessions_list 返回示例:**
```json
{
"sessions": [
{
"sessionKey": "openclaw-weixin:xxxxxxxx-22f75d8b",
"kind": "openclaw-weixin",
"lastMessage": "检查微信助手的账号ID",
"lastMessageTime": "2026-04-01T17:12:00Z"
}
]
}
```
### 消息发送示例
**成功发送文本消息的输出:**
```
==================================================
【微信消息发送测试】
目标用户: xxxxxxxx
客户端ID: openclaw-weixin:1735114630-abc123def
上下文Token: xxxxxxxx-22f75d8b
------------------------------
HTTP 状态码: 200
API 返回码 (ret): 0
✅ 消息发送成功!
服务器响应详情:
{
"ret": 0,
"msg": "success",
"data": {}
}
==================================================
```
### 文件发送示例 (新增)
**成功发送文件的输出:**
```
1. 准备上传参数...
准备成功,filekey: a1b2c3d4e5f67890..., aeskey: 0011223344556677...
2. 申请上传参数...
获取到upload_param: xyz123abc456def789...
3. 加密文件...
加密完成,原始大小: 102400, 加密后大小: 102416
4. 上传到CDN...
上传成功,encrypt_query_param: enc_param_abc123def456...
5. 发送文件...
HTTP 状态码: 200
API 返回码 (ret): 0
✅ 文件发送成功!
服务器响应详情:
{
"ret": 0,
"msg": "success",
"data": {}
}
```
## 版本历史
| 版本 | 更新日期 | 更新内容 |
| :--- | :--- | :--- |
| 4.0.0 | 2026-04-03 | **新增文件发送功能**,支持图片、视频、音频、文档、压缩文件等多种格式文件发送。 |
| 3.0.0 | 2026-04-02 | 新增主动消息发送功能,支持向指定用户发送消息;添加自动路径检测功能 |
| 2.0.0 | 2026-04-01 | 改用sessions_list工具查询会话信息 |
| 1.0.0 | 2026-04-01 | 初始版本发布,支持微信账号信息查询 |
## 技术说明
### 工作原理
#### 会话查询原理
1. 调用 OpenClaw 的 sessions_list 工具
2. 获取所有活跃会话信息
3. 过滤出微信渠道的会话
4. 返回会话详细信息
#### 消息发送原理
1. 根据账号ID查找对应的配置文件
2. 读取配置文件中的连接参数
3. 构建符合微信API规范的请求
4. 发送HTTP请求到微信后端
5. 处理响应并显示结果
#### 文件发送原理 (新增)
1. 根据账号ID查找对应的配置文件,获取必要的连接参数。
2. 读取并验证本地文件,进行加密和上传参数准备。
3. 将文件上传至CDN,获取文件访问参数。
4. 调用微信API,发送包含文件访问信息的消息到指定会话。
5. 处理API响应,返回发送结果。
#### 自动路径检测原理
```python
def get_openclaw_path():
"""获取OpenClaw安装路径,支持Windows、Linux、MacOS"""
# 优先使用环境变量指定的路径
state_dir = os.environ.get('OPENCLAW_STATE_DIR')
if state_dir and os.path.exists(state_dir):
return Path(state_dir)
# 默认路径
home = Path.home()
if sys.platform == 'win32':
# Windows: C:\Users\用户名\.openclaw
return home / '.openclaw'
else:
# Linux/MacOS: ~/.openclaw
return home / '.openclaw'
```
### 架构设计
```
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 会话查询模块 │ │ 账号信息查询 │────▶│ 消息发送模块 │
└─────────────────┘ └──────────────────┘ └──────────────────┘
▲ ▲ ▲
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 错误处理 │ │ 配置文件管理 │ │ 网络请求处理 │
└─────────────────┘ └──────────────────┘ └──────────────────┘
▲ ▲ ▲
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ 自动路径检测 │ │ 环境变量检测 │ │ 文件处理模块(新增)│
└─────────────────┘ └──────────────────┘ └──────────────────┘
```
### 核心依赖
- `requests`: HTTP请求库
- `json`: JSON数据处理
- `time`: 时间戳管理
- `base64`: 编码解码
- `os`: 操作系统交互
- `uuid`: 唯一ID生成
- `pathlib`: 路径管理
- `sys`: 系统级操作
## 部署说明
### 1. 目录结构
wechat-account-send/
├── SKILL.md # 技能文档
├── scripts/
│ ├── main_send_msg # 文本消息发送主程序
│ └── main_send_file.py # (新增) 文件发送主程序
└── (其他配置文件)
### 2. 自动路径检测
技能现在支持自动路径检测,无需手动配置环境变量。但如果需要自定义 OpenClaw 路径,可以设置以下环境变量:
```bash
# Windows
set OPENCLAW_STATE_DIR=C:\path\to\your\openclaw
# Linux/MacOS
export OPENCLAW_STATE_DIR=/path/to/your/openclaw
```
## 安全注意事项
1. **配置文件保护**:账号配置文件包含敏感信息,请勿泄露。
2. **代码执行安全**:确保脚本来源可信,避免执行恶意代码。
3. **API权限控制**:定期检查和更新API权限配置。
4. **文件安全**:发送文件时,请确保文件来源安全可靠,不包含恶意内容。
## 使用场景
### 原始功能使用场景
- 检查微信助手的会话信息
- 找到当前对话的 sessionKey
- 查询所有活跃的会话列表
- 监控会话的实时状态
- 自动向微信用户发送通知或提醒文本消息
### 新增功能使用场景 (文件发送)
- 将本地生成的截图、报告、日志文件通过微信发送给自己。
- 将处理完成的图片、视频、音频或文档发送给指定的同事或机器人账号。
- 集成到自动化流程中,自动推送生成的文件结果。
---
FILE:scripts/main_send_file.py
import httpx
import json
import base64
import random
import time
import hashlib
from pathlib import Path
from typing import Dict, Any
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from urllib.parse import quote
import os
import requests
import uuid
import sys
def get_openclaw_path():
"""获取OpenClaw安装路径,支持Windows、Linux、MacOS"""
# 优先使用环境变量指定的路径
state_dir = os.environ.get('OPENCLAW_STATE_DIR')
if state_dir and os.path.exists(state_dir):
return Path(state_dir)
# 默认路径
home = Path.home()
if sys.platform == 'win32':
# Windows: C:\Users\用户名\.openclaw
return home / '.openclaw'
else:
# Linux/MacOS: ~/.openclaw
return home / '.openclaw'
def find_account_json(account_id,use_path=False):
"""根据账号ID查找对应的JSON文件并返回内容"""
try:
if use_path:
openclaw_path = use_path
else:
# 获取OpenClaw路径
openclaw_path = get_openclaw_path()
# 构建openclaw-weixin/accounts路径
weixin_accounts_path = os.path.join( openclaw_path,'openclaw-weixin','accounts')
# 检查路径是否存在
if not os.path.exists(weixin_accounts_path):
return {"error": f"路径不存在: {weixin_accounts_path} 你可以设置一个测试地址use_path" }
datas = {}
for file_json in [account_id,account_id+".context-tokens",account_id+".sync"]:
# 构建JSON文件路径
json_file = os.path.join(weixin_accounts_path,f"{file_json}.json")
# 读取并返回JSON内容
with open(json_file, 'r', encoding='utf-8') as f:
datas[file_json] = json.load(f)
return {
"account_id": account_id,
"file_path": str(json_file),
"data": datas
}
except Exception as e:
return {"error": f"读取失败: {str(e)}"}
# 计算AES-128-ECB加密后的文件大小
def calculate_encrypted_size(raw_size: int) -> int:
"""
计算AES-128-ECB加密后的文件大小
根据文档公式:filesize = ceil((rawsize + 1) / 16) * 16
因为PKCS7填充会至少填充1个字节
"""
return ((raw_size + 1 + 15) // 16) * 16
# 准备上传参数
def prepare_image_upload( image_path: str) -> Dict[str, Any]:
"""
准备图片上传:计算所有必要参数
返回包含所有上传所需参数的字典
"""
if not Path(image_path).exists():
return {
"success": False,
"error": f"图片文件不存在: {image_path}"
}
try:
# 读取文件
with open(image_path, 'rb') as f:
file_data = f.read()
# 计算文件参数
rawsize = len(file_data)
rawfilemd5 = hashlib.md5(file_data).hexdigest()
# 生成随机参数
filekey = ''.join(random.choices('0123456789abcdef', k=32)) # 16字节hex
aeskey_hex = ''.join(random.choices('0123456789abcdef', k=32)) # 16字节hex
# 计算加密后大小
filesize = calculate_encrypted_size(rawsize)
return {
"success": True,
"filekey": filekey,
"aeskey_hex": aeskey_hex,
"rawsize": rawsize,
"rawfilemd5": rawfilemd5,
"filesize": filesize,
"file_data": file_data
}
except Exception as e:
return {
"success": False,
"error": f"准备图片失败: {str(e)}"
}
# 加密文件
def aes_encrypt_file(file_path: str, aes_key_hex: str) -> bytes:
"""
使用AES-128-ECB模式加密文件,使用PKCS7 padding
根据文档第二步:本地加密文件
"""
if len(aes_key_hex) != 32:
raise ValueError("AES密钥必须是32个十六进制字符(16字节)")
# 将十六进制字符串转换为字节
aes_key = bytes.fromhex(aes_key_hex)
# 读取文件
with open(file_path, 'rb') as f:
file_data = f.read()
# 创建AES-128-ECB加密器
cipher = AES.new(aes_key, AES.MODE_ECB)
# 对数据进行PKCS7填充并加密
padded_data = pad(file_data, AES.block_size, style='pkcs7')
encrypted_data = cipher.encrypt(padded_data)
return encrypted_data
# 编码AES密钥
def encode_aes_key(aes_key_hex: str) -> str:
"""
将AES密钥编码为"base64(hex string)"格式
根据文档8.4节:先把hex文本当作字节,再base64
例如:hex "00112233445566778899aabbccddeeff" → "MDAxMTIyMzM0NDU1NjY3Nzg4OTlhYWJiY2NkZGVlZmY="
"""
# 将十六进制字符串转换为字节
hex_bytes = aes_key_hex.encode('utf-8')
# 进行base64编码
return base64.b64encode(hex_bytes).decode('utf-8')
# 申请上传参数
def get_upload_params( use_token: str,
filekey: str,
media_type: int,
to_user_id: str,
rawsize: int,
rawfilemd5: str,
filesize: int,
aeskey_hex: str,
no_need_thumb: bool = True) -> Dict[str, Any]:
"""
第一步:申请上传参数 (getuploadurl)
根据文档8.2节第一步
"""
random_uint32 = os.urandom(4)
x_wechat_uin = base64.b64encode(random_uint32).decode('utf-8')
# 构造请求体
body = {
"filekey": filekey,
"media_type": media_type, # 1=图片
"to_user_id": to_user_id,
"rawsize": rawsize,
"rawfilemd5": rawfilemd5,
"filesize": filesize,
"no_need_thumb": no_need_thumb,
"aeskey": aeskey_hex,
"base_info": {
"channel_version": "1.0.0"
}
}
# 发送请求
raw = json.dumps(body, ensure_ascii=False)
headers = {
"Content-Type": "application/json",
"AuthorizationType": "ilink_bot_token",
"Authorization": f"Bearer {use_token}",
"X-WECHAT-UIN": x_wechat_uin,
"Content-Length": str(len(raw.encode("utf-8"))),
}
# 发送POST请求
resp = requests.post(
"https://ilinkai.weixin.qq.com/ilink/bot/getuploadurl",
headers=headers,
data= raw.encode('utf-8'),
timeout=15 # 秒
)
if resp.status_code != 200:
return {
"success": False,
"error": f"申请上传参数失败,HTTP状态码: {resp.status_code}",
"response": resp.text
}
result = resp.json()
upload_param = result.get("upload_param", "")
if not upload_param:
return {
"success": False,
"error": "响应中未包含upload_param",
"response": result
}
return {
"success": True,
"upload_param": upload_param, # 就是encrypted_query_param
"thumb_upload_param": result.get("thumb_upload_param", ""),
"response": result
}
# 上传到CDN
def upload_to_cdn( upload_param: str,
filekey: str,
encrypted_data: bytes) -> Dict[str, Any]:
"""
第三步:上传到CDN
根据文档8.2节第三步
"""
# 构建CDN上传URL
encoded_param = quote(upload_param, safe='')
cdn_url = f"https://novac2c.cdn.weixin.qq.com/c2c/upload?encrypted_query_param={encoded_param}&filekey={filekey}"
# 构造请求头
headers = {
"Content-Type": "application/octet-stream"
}
try:
# 发送请求
resp = httpx.post(
cdn_url,
headers=headers,
content=encrypted_data,
timeout=30,
)
if resp.status_code == 200:
encrypt_query_param = resp.headers.get('x-encrypted-param', '')
if not encrypt_query_param:
return {
"success": False,
"error": "未在响应头中找到x-encrypted-param",
"headers": dict(resp.headers)
}
return {
"success": True,
"encrypt_query_param": encrypt_query_param,
"response_headers": dict(resp.headers)
}
else:
return {
"success": False,
"error": f"上传失败,HTTP状态码: {resp.status_code}",
"response": resp.text
}
except httpx.RequestError as e:
return {
"success": False,
"error": f"上传请求失败: {str(e)}"
}
def send_weixin_file(BOT_TOKEN,TARGET_USER_ID,CONTEXT_TOKEN,IMAGE_PATH):
"""
根据文件类型发送微信文件
参数说明:
BOT_TOKEN: 机器人令牌
TARGET_USER_ID: 目标用户ID
CONTEXT_TOKEN: 上下文令牌
FILE_PATH: 文件路径(支持图片、视频、文件等多种类型)
媒体类型说明:
1: 图片消息类型
2: 视频消息类型
3: 文件消息类型
4: 音频消息类型
"""
# 根据文件后缀判断媒体类型
file_ext = IMAGE_PATH.lower().split('.')[-1] if '.' in IMAGE_PATH else ''
# 图片类型
image_extensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'tiff', 'svg']
# 视频类型
video_extensions = ['mp4', 'mov', 'avi', 'wmv', 'flv', 'mkv', 'webm', 'mpeg', 'mpg']
# 音频类型
audio_extensions = ['mp3', 'wav', 'aac', 'flac', 'm4a', 'ogg', 'wma']
# 文档类型
document_extensions = ['pdf', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx', 'txt']
# 压缩文件类型
archive_extensions = ['zip', 'rar', '7z', 'tar', 'gz']
# 判断文件类型
if file_ext in image_extensions:
media_type = 1
elif file_ext in video_extensions:
media_type = 2
elif file_ext in audio_extensions:
media_type = 4
elif file_ext in document_extensions or file_ext in archive_extensions:
media_type = 3
else:
# 默认处理为文件类型
media_type = 3
print(f"警告:未知文件类型 .{file_ext},将作为普通文件发送")
# 1.准备上传参数
print("1. 准备上传参数...")
prepare_result = prepare_image_upload(IMAGE_PATH)
if not prepare_result["success"]:
print(f"准备失败: {prepare_result.get('error')}")
else:
print(f"准备成功,filekey: {prepare_result['filekey'][:16]}..., aeskey: {prepare_result['aeskey_hex'][:16]}...")
filekey = prepare_result["filekey"]
aeskey_hex = prepare_result["aeskey_hex"]
rawsize = prepare_result["rawsize"]
rawfilemd5 = prepare_result["rawfilemd5"]
filesize = prepare_result["filesize"]
# 2.申请上传参数
print("2. 申请上传参数...")
upload_params = get_upload_params(
use_token=BOT_TOKEN,
filekey=filekey,
media_type=media_type,
to_user_id=TARGET_USER_ID,
rawsize=rawsize,
rawfilemd5=rawfilemd5,
filesize=filesize,
aeskey_hex=aeskey_hex,
no_need_thumb=True
)
print(f"获取到upload_param: {upload_params['upload_param'][:20]}...")
# 3. 加密文件...
print("3. 加密文件...")
encrypted_data = aes_encrypt_file(IMAGE_PATH,aeskey_hex)
print(f" 加密完成,原始大小: {rawsize}, 加密后大小: {len(encrypted_data)}")
# 4. 上传到CDN...
print("4. 上传到CDN...")
upload_result = upload_to_cdn(upload_params["upload_param"],filekey, encrypted_data)
encrypt_query_param = upload_result["encrypt_query_param"]
print(f" 上传成功,encrypt_query_param: {encrypt_query_param[:20]}...")
# 编码AES密钥
encoded_aes_key = encode_aes_key(aeskey_hex)
upload_result = {
"success": True,
"encrypt_query_param": encrypt_query_param,
"aes_key_hex": aeskey_hex,
"encoded_aes_key": encoded_aes_key,
"file_size": rawsize,
"filesize": filesize,
"filekey": filekey
}
# 5. 发送图片...
print("5. 发送文件...")
timestamp_ms = int(time.time() * 1000)
random_suffix = uuid.uuid4().hex[:8]
client_id = f"openclaw-weixin:{timestamp_ms}-{random_suffix}"
random_uint32 = os.urandom(4)
x_wechat_uin = base64.b64encode(random_uint32).decode('utf-8')
# 构造图片消息体(第四步)
if media_type == 1:
payload = {
"msg": {
"from_user_id": "",
"to_user_id": TARGET_USER_ID,
"client_id": client_id,
"message_type": 2,
"message_state": 2,
"context_token": CONTEXT_TOKEN,
"item_list": [
{
"type": 2, # 图片消息类型
"image_item": {
"media": {
"encrypt_query_param": upload_result["encrypt_query_param"],
"aes_key": upload_result["encoded_aes_key"],
"encrypt_type": 1
},
"mid_size": upload_result["file_size"] # 明文文件大小
}
}
]
},
"base_info": {
"channel_version": "1.0.0"
}
}
# 构造文件消息体(第四步)
elif media_type == 3:
payload = {
"msg": {
"from_user_id": "",
"to_user_id": TARGET_USER_ID,
"client_id": client_id,
"message_type": 2,
"message_state": 2,
"context_token": CONTEXT_TOKEN,
"item_list": [
{
"type": 4,
"file_item": {
"media": {
"encrypt_query_param": upload_result["encrypt_query_param"],
"aes_key": upload_result["encoded_aes_key"],
"encrypt_type": 1
},
"file_name": Path(IMAGE_PATH).name,
# "len": filesize, 写了发不出去,应该是前面有问题
# "md5": rawfilemd5 同
}
}
]
},
"base_info": {
"channel_version": "1.0.0"
}
}
# 构造视频消息体(第四步)
elif media_type == 2:
payload = {
"msg": {
"from_user_id": "",
"to_user_id": TARGET_USER_ID,
"client_id": client_id,
"message_type": 2,
"message_state": 2,
"context_token": CONTEXT_TOKEN,
"item_list": [
{
"type": 5,
"video_item": {
"media": {
"encrypt_query_param": upload_result["encrypt_query_param"],
"aes_key": upload_result["encoded_aes_key"],
"encrypt_type": 1
},
"video_size": upload_result["file_size"] # 明文文件大小
}
}
]
},
"base_info": {
"channel_version": "1.0.0"
}
}
# 发送请求
# 4. 构建请求头
raw = json.dumps(payload, ensure_ascii=False)
headers = {
"Content-Type": "application/json",
"AuthorizationType": "ilink_bot_token",
"Authorization": f"Bearer {BOT_TOKEN}",
"X-WECHAT-UIN": x_wechat_uin,
"Content-Length": str(len(raw.encode("utf-8"))),
}
# 发送POST请求
response = requests.post(
"https://ilinkai.weixin.qq.com/ilink/bot/sendmessage",
headers=headers,
data= raw.encode('utf-8'),
timeout=15 # 秒
)
print(f"HTTP 状态码: {response.status_code}")
if response.status_code == 200:
resp_json = response.json()
print(f"API 返回码 (ret): {resp_json.get('ret')}")
print("✅ 文件发送成功!")
print("服务器响应详情:")
print(json.dumps(resp_json, indent=2, ensure_ascii=False))
else:
print(f"❌ 请求失败,HTTP状态码非200。")
print(f"响应内容: {response.text[:500]}") # 打印前500字符以便调试
def main():
"""主函数:从命令行参数获取账号ID并返回JSON"""
if len(sys.argv) != 3:
print("用法: python script.py <account_id> <msg>")
print("示例: python script.py wechat_12345 你好")
sys.exit(1)
account_id = sys.argv[1]
IMAGE_PATH = sys.argv[2]
result = find_account_json(account_id)
result = result["data"]
# ==================== 配置区 (请根据您的实际情况修改) ====================
# 1. 基础API地址 (从扫码登录后的账户配置中获取)
BASE_URL = result[f"{account_id}"]["baseUrl"] # 请替换为您的有效 baseUrl
# 2. 机器人令牌 (Bot Token)
BOT_TOKEN = result[f"{account_id}"]["token"] # 请替换为您的有效 token
# 3. 消息接收方用户ID
TARGET_USER_ID = result[f"{account_id}"]["userId"] # 请替换为目标用户的 userId
# 4. 会话上下文令牌 (Context Token)
CONTEXT_TOKEN = result[f"{account_id}.context-tokens"][result[f"{account_id}"]["userId"]]
send_weixin_file(BOT_TOKEN,TARGET_USER_ID,CONTEXT_TOKEN,IMAGE_PATH)
if __name__ == "__main__":
main()
# 调用方法
# python main_send_file.py XXXXXXXXXXXX-im-bot "测试文件.txt"
# python main_send_file.py XXXXXXXXXXXX-im-bot "测试视频.txt"
# python main_send_file.py XXXXXXXXXXXX-im-bot "测试图片.txt"
FILE:scripts/main_send_msg.py
import requests
import json
import time
import base64
import os
import uuid
from pathlib import Path
import sys
def get_openclaw_path():
"""获取OpenClaw安装路径,支持Windows、Linux、MacOS"""
# 优先使用环境变量指定的路径
state_dir = os.environ.get('OPENCLAW_STATE_DIR')
if state_dir and os.path.exists(state_dir):
return Path(state_dir)
# 默认路径
home = Path.home()
if sys.platform == 'win32':
# Windows: C:\Users\用户名\.openclaw
return home / '.openclaw'
else:
# Linux/MacOS: ~/.openclaw
return home / '.openclaw'
def find_account_json(account_id,use_path=False):
"""根据账号ID查找对应的JSON文件并返回内容"""
try:
if use_path:
openclaw_path = use_path
else:
# 获取OpenClaw路径
openclaw_path = get_openclaw_path()
# 构建openclaw-weixin/accounts路径
weixin_accounts_path = os.path.join( openclaw_path,'openclaw-weixin','accounts')
# 检查路径是否存在
if not os.path.exists(weixin_accounts_path):
return {"error": f"路径不存在: {weixin_accounts_path}"}
datas = {}
for file_json in [account_id,account_id+".context-tokens",account_id+".sync"]:
# 构建JSON文件路径
json_file = os.path.join(weixin_accounts_path,f"{file_json}.json")
# 读取并返回JSON内容
with open(json_file, 'r', encoding='utf-8') as f:
datas[file_json] = json.load(f)
return {
"account_id": account_id,
"file_path": str(json_file),
"data": datas
}
except Exception as e:
return {"error": f"读取失败: {str(e)}"}
def send_weixin_message(BASE_URL,BOT_TOKEN,TARGET_USER_ID,CONTEXT_TOKEN,TXT_MSG):
"""
通过微信后端API发送一条文本测试消息。
"""
# 1. 构建请求URL (关键:端点路径为 /ilink/bot/sendmessage)
api_endpoint = f"{BASE_URL.rstrip('/')}/ilink/bot/sendmessage"
# 2. 生成请求唯一标识 (client_id) 和随机UIN
timestamp_ms = int(time.time() * 1000)
random_suffix = uuid.uuid4().hex[:8]
client_id = f"openclaw-weixin:{timestamp_ms}-{random_suffix}"
random_uint32 = os.urandom(4)
x_wechat_uin = base64.b64encode(random_uint32).decode('utf-8')
# 3. 构建请求体 (JSON)
payload = {
"msg": {
"from_user_id": "", # 机器人发送时固定为空字符串
"to_user_id": TARGET_USER_ID,
"client_id": client_id,
"message_type": 2, # 固定为2,代表Bot消息
"message_state": 2, # 固定为2,代表完成态
"context_token": CONTEXT_TOKEN, # 重要:如果为空,首次发送可能失败
"item_list": [
{
"type": 1, # 1 代表文本消息
"text_item": {
"text": TXT_MSG
}
}
]
},
"base_info": {
"channel_version": "1.0.0" # 应与您的信道插件版本一致
}
}
# 4. 构建请求头
raw = json.dumps(payload, ensure_ascii=False)
headers = {
"Content-Type": "application/json",
"AuthorizationType": "ilink_bot_token",
"Authorization": f"Bearer {BOT_TOKEN}",
"X-WECHAT-UIN": x_wechat_uin,
"Content-Length": str(len(raw.encode("utf-8"))),
}
print("=" * 50)
print("【微信消息发送测试】")
print(f"目标用户: {TARGET_USER_ID}")
print(f"客户端ID: {client_id}")
print(f"上下文Token: {CONTEXT_TOKEN if CONTEXT_TOKEN else '(为空,可能影响首次发送)'}")
print("-" * 30)
try:
# 5. 发送POST请求
response = requests.post(
api_endpoint,
headers=headers,
data=json.dumps(payload, ensure_ascii=False).encode('utf-8'),
timeout=15 # 秒
)
# 6. 处理响应
print(f"HTTP 状态码: {response.status_code}")
if response.status_code == 200:
resp_json = response.json()
print(f"API 返回码 (ret): {resp_json.get('ret')}")
print("✅ 消息发送成功!")
print("服务器响应详情:")
print(json.dumps(resp_json, indent=2, ensure_ascii=False))
else:
print(f"❌ 请求失败,HTTP状态码非200。")
print(f"响应内容: {response.text[:500]}") # 打印前500字符以便调试
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求异常: {e}")
except json.JSONDecodeError as e:
print(f"❌ 解析服务器响应失败: {e}")
print(f"原始响应文本: {response.text[:500]}")
print("=" * 50)
def main():
"""主函数:从命令行参数获取账号ID并返回JSON"""
if len(sys.argv) != 3:
print("用法: python script.py <account_id> <msg>")
print("示例: python script.py wechat_12345 你好")
sys.exit(1)
account_id = sys.argv[1]
TXT_MSG = sys.argv[2]
result = find_account_json(account_id)
result = result["data"]
# ==================== 配置区 (请根据您的实际情况修改) ====================
# 1. 基础API地址 (从扫码登录后的账户配置中获取)
BASE_URL = result[f"{account_id}"]["baseUrl"] # 请替换为您的有效 baseUrl
# 2. 机器人令牌 (Bot Token)
BOT_TOKEN = result[f"{account_id}"]["token"] # 请替换为您的有效 token
# 3. 消息接收方用户ID
TARGET_USER_ID = result[f"{account_id}"]["userId"] # 请替换为目标用户的 userId
# 4. 会话上下文令牌 (Context Token)
CONTEXT_TOKEN = result[f"{account_id}.context-tokens"][result[f"{account_id}"]["userId"]]
# 正常的逻辑是机器人提供消息用户的ID,让后我通过查询插件数据获取token
# 输出JSON格式的结果
send_weixin_message(BASE_URL,BOT_TOKEN,TARGET_USER_ID,CONTEXT_TOKEN,TXT_MSG)
if __name__ == "__main__":
# 执行发送测试
main()
# 调用方法
# python main_send_msg.py XXXXXXXXXXXX-im-bot "你好"