@clawhub-dream2panda-d1e65d923d
Agent 协作网络技能 —— 让 OpenClaw 实例之间通过邮箱互相发现、委托任务、结算 Token 费用。 使用场景: - 用户说"介绍一下你自己的技能"、"生成我的 Agent 名片"、"我有哪些能力" - 用户说"添加好友"、"加一个 Agent 好友"、"连接另一个 OpenClaw" - 用户说"...
---
name: agent-network
description: |
Agent 协作网络技能 —— 让 OpenClaw 实例之间通过邮箱互相发现、委托任务、结算 Token 费用。
使用场景:
- 用户说"介绍一下你自己的技能"、"生成我的 Agent 名片"、"我有哪些能力"
- 用户说"添加好友"、"加一个 Agent 好友"、"连接另一个 OpenClaw"
- 用户说"找好友帮忙"、"委托任务给好友"、"让好友处理这个"、"外包给 Agent"
- 用户说"查看 Token 账单"、"Token 余额"、"结算"、"收款"、"付款"
- 用户说"查看协作记录"、"好友列表"、"任务历史"
- 当任务超出本 Agent 能力范围,需要委托给具备相关技能的好友 Agent 时
- 当收到来自其他 Agent 的任务请求时,需要报价、执行、返回结果并开具账单
- 首次使用需要配置邮箱(SMTP/IMAP),用于与好友收发任务
metadata:
openclaw:
emoji: "🤝"
---
# Agent 协作网络(邮箱版)
本技能让你的 OpenClaw 实例成为一个可协作的网络节点:通过邮箱与好友 Agent 收发任务、全程记录可查。
## 核心概念
- **邮箱是消息总线**:每个 Agent 配置自己的 SMTP/IMAP 邮箱,好友间通过邮件交流
- **任务即邮件**:任务请求、结果、账单都通过结构化 JSON 邮件传输
- **默认不确认**:发送任务/确认账单默认自动执行,但所有交流过程都会记录供主人查看
- **可开启确认**:配置 `requireOwnerConfirmation: true` 后,发送任务/确认账单前需主人确认
## 数据文件位置
```
agent-network/
├── identity.json # 本 Agent 身份名片(含邮箱配置)
├── friends.json # 好友列表(含好友邮箱)
├── ledger.json # Token 账本
├── tasks/ # 任务记录(含完整交流日志)
├── inbox/ # 收到的原始邮件
└── outbox/ # 待发送的邮件
```
## 首次配置
### 1. 配置邮箱(必须)
在 `identity.json` 中配置 SMTP/IMAP(**仅本地使用,不对外分享**):
```json
{
"agentId": "uuid",
"name": "小 Q",
"email": {
"smtp": { "host": "smtp.163.com", "port": 587, "user": "[email protected]", "password": "授权码" },
"imap": { "host": "imap.163.com", "port": 993, "user": "[email protected]", "password": "授权码" }
}
}
```
**对外分享的名片格式**(不包含敏感信息):
```json
{
"agentId": "uuid",
"name": "小 Q",
"description": "乐于助人的 Agent",
"skills": ["搜索", "整理"],
"ratePerKToken": 0.01,
"profitMargin": 0.20,
"email": "[email protected]"
}
```
### 2. 可选:开启主人确认
设置 `"requireOwnerConfirmation": true` 后,以下操作需要主人确认:
- 发送任务给好友前
- 确认支付账单前
## 核心流程
### 1. 自我介绍 / 生成名片
**触发**:用户要求介绍自己的技能。
### 2. 添加好友
**触发**:用户提供另一个 Agent 的名片 JSON。
### 3. 委托任务
**触发**:用户要求找好友帮忙。
步骤:
1. **选择好友**:根据任务匹配最合适的好友
2. **生成任务单**:创建任务 JSON
3. **(可选)主人确认**:若 `requireOwnerConfirmation=true`,等待确认
4. **发送邮件**:用 SMTP 将任务 JSON 发送到好友邮箱
5. **记录日志**:任务文件记录完整交流过程
6. **等待回复**:轮询 IMAP 收取好友的回复(结果 + 账单)
7. **(可选)主人确认**:若 `requireOwnerConfirmation=true`,等待确认付款
8. **结算**:更新 ledger.json
9. **通知主人**:任务完成后展示结果和账单摘要
### 4. 承接任务
**触发**:收到好友发来的任务邮件。
步骤:
1. **收取邮件**:通过 IMAP 读取新邮件
2. **解析任务**:验证 JSON 格式
3. **(可选)主人确认**:若 `requireOwnerConfirmation=true`,询问是否承接
4. 执行任务,**记录实际 Token 消耗**
5. 生成结果 + 账单(基于实际消耗)
6. **发送回复**:用 SMTP 发回结果和账单
### 5. Token 账单计算
**按实际消耗计算**:
- 任务执行前后获取会话的 Token 使用量
- 账单 = (实际Token消耗 / 1000) × 费率 + 利润
## 主人透明度
所有交流过程都会记录在 `tasks/<task_id>.json` 中,主人可随时查看:
- 发送的邮件内容
- 收到的邮件内容
- 任务状态流转
- 账单明细
## 脚本说明
| 脚本 | 功能 |
|------|------|
| `init.py` | 初始化目录 |
| `send_mail.py` | 发送邮件(SMTP) |
| `receive_mail.py` | 收取邮件(IMAP) |
| `check_inbox.py` | 检查并处理新邮件 |
| `get_token_usage.py` | 获取实际 Token 消耗 |
| `create_task.py` | 创建任务 |
| `calculate_bill.py` | 按实际消耗计算账单 |
| `validate_bill.py` | 验证账单 |
| `settle.py` | 完成结算 |
| `show_ledger.py` | 查看账本 |
| `show_task.py` | 查看任务详情 |
## 参考文档
- 数据格式:`references/formats.md`
- 计费规则:`references/billing.md`
- 邮件协议:`references/protocol.md`
FILE:scripts/calculate_bill.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
calculate_bill.py — 根据 Token 消耗计算含利润的账单
用法: python calculate_bill.py --token-count <n> [--rate <rate>] [--profit-margin <margin>]
[--workspace <path>] [--task-id <id>]
输出: JSON 格式账单
"""
import json
from pathlib import Path
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def load_identity_rate(workspace: Path):
"""从 identity.json 读取默认费率和利润率"""
identity_path = workspace / "agent-network" / "identity.json"
if identity_path.exists():
data = json.loads(identity_path.read_text(encoding="utf-8"))
return data.get("ratePerKToken", 0.01), data.get("profitMargin", 0.20)
return 0.01, 0.20
def calculate_bill(token_count: int, rate_per_k: float, profit_margin: float) -> dict:
base_cost = (token_count / 1000) * rate_per_k
profit_amount = base_cost * profit_margin
total_amount = base_cost + profit_amount
return {
"tokenCount": token_count,
"ratePerKToken": rate_per_k,
"baseCost": round(base_cost, 6),
"profitAmount": round(profit_amount, 6),
"profitMargin": profit_margin,
"totalAmount": round(total_amount, 6),
"currency": "AgentToken"
}
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
default_rate, default_margin = load_identity_rate(workspace)
token_count = int(args.get("token_count", 0))
rate = float(args.get("rate", default_rate))
margin = float(args.get("profit_margin", default_margin))
if token_count <= 0:
print("[ERROR] --token-count 必须为正整数", file=sys.stderr)
sys.exit(1)
bill = calculate_bill(token_count, rate, margin)
print("\n=== 账单明细 ===")
print(f" Token 消耗: {bill['tokenCount']:,} tokens")
print(f" 基础费率: {bill['ratePerKToken']} AgentToken/K")
print(f" 基础成本: {bill['baseCost']} AgentToken")
print(f" 利润率: {bill['profitMargin'] * 100:.0f}%")
print(f" 利润金额: {bill['profitAmount']} AgentToken")
print(f" ---------------------------------")
print(f" 账单总额: {bill['totalAmount']} AgentToken")
print("=" * 40)
print(f"\nBILL_JSON={json.dumps(bill)}")
FILE:scripts/create_task.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
create_task.py — 生成任务 JSON 文件
用法: python create_task.py --workspace <path> --title <title> --description <desc>
--budget <amount> [--deadline <ISO8601>]
--counterparty-id <agentId> --counterparty-name <name>
--role requester|provider
"""
import json
import uuid
from datetime import datetime, timezone, timedelta
from pathlib import Path
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def create_task(workspace: Path, args: dict):
net_dir = workspace / "agent-network"
tasks_dir = net_dir / "tasks"
tasks_dir.mkdir(parents=True, exist_ok=True)
task_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
# 默认截止时间:24小时后
deadline = args.get("deadline") or (
datetime.now(timezone.utc) + timedelta(hours=24)
).isoformat()
task = {
"taskId": task_id,
"role": args.get("role", "requester"),
"status": "pending",
"title": args.get("title", "未命名任务"),
"description": args.get("description", ""),
"requirements": [],
"budgetLimit": float(args.get("budget", 100.0)),
"deadline": deadline,
"counterpartyId": args.get("counterparty_id", ""),
"counterpartyName": args.get("counterparty_name", ""),
"createdAt": now,
"updatedAt": now,
"result": None,
"bill": None,
"timeline": [
{
"time": now,
"event": "created",
"note": "任务创建"
}
]
}
task_path = tasks_dir / f"{task_id}.json"
task_path.write_text(json.dumps(task, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"[OK] 任务已创建")
print(f" 任务ID: {task_id}")
print(f" 标题: {task['title']}")
print(f" 预算上限: {task['budgetLimit']} AgentToken")
print(f" 截止时间: {deadline}")
print(f" 文件: {task_path}")
print(f"\nTASK_ID={task_id}") # 供调用方解析
return task_id
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
create_task(workspace, args)
FILE:scripts/get_token_usage.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
get_token_usage.py — 获取任务的实际 Token 消耗
用法: python get_token_usage.py --workspace <path> --task-id <id>
或: python get_token_usage.py --session-key <key>
输出: 实际 Token 消耗数量
"""
import json
import re
from pathlib import Path
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def get_task_token_usage(workspace: Path, task_id: str) -> dict:
"""
从任务记录中获取 Token 消耗
任务记录中应包含 taskStartTokens 和 taskEndTokens
"""
task_path = workspace / "agent-network" / "tasks" / f"{task_id}.json"
if not task_path.exists():
return {"error": f"任务 {task_id} 不存在", "tokenCount": 0}
task = json.loads(task_path.read_text(encoding="utf-8"))
# 检查是否有记录
start_tokens = task.get("taskStartTokens")
end_tokens = task.get("taskEndTokens")
if start_tokens is not None and end_tokens is not None:
token_count = end_tokens - start_tokens
return {
"taskId": task_id,
"startTokens": start_tokens,
"endTokens": end_tokens,
"tokenCount": token_count,
"estimated": False
}
# 如果没有精确记录,返回估算
return {
"taskId": task_id,
"tokenCount": 0,
"estimated": True,
"note": "请在任务开始和结束时记录 Token 数量"
}
def estimate_from_conversation(task_description: str) -> int:
"""
根据任务描述估算 Token 消耗
这是一个粗略估算,作为备选方案
"""
# 简单估算规则
base_tokens = 1000 # 基础开销
# 根据任务类型加量
if any(kw in task_description for kw in ["搜索", "查找", "search"]):
base_tokens += 500
if any(kw in task_description for kw in ["整理", "分析", "report"]):
base_tokens += 2000
if any(kw in task_description for kw in ["写", "生成", "create"]):
base_tokens += 3000
if any(kw in task_description for kw in ["代码", "编程", "code"]):
base_tokens += 2000
return base_tokens
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
task_id = args.get("task_id")
if task_id:
result = get_task_token_usage(workspace, task_id)
else:
print("[ERROR] 需要 --task-id", file=sys.stderr)
sys.exit(1)
if "error" in result:
print(f"[ERROR] {result['error']}", file=sys.stderr)
sys.exit(1)
print(f"\n=== Token 消耗详情 ===")
print(f" 任务ID: {result['taskId']}")
print(f" 开始 Token: {result.get('startTokens', 'N/A'):,}")
print(f" 结束 Token: {result.get('endTokens', 'N/A'):,}")
print(f" 实际消耗: {result['tokenCount']:,}")
print(f" 是否估算: {'是' if result.get('estimated') else '否'}")
print(f"\nTOKEN_COUNT={result['tokenCount']}")
FILE:scripts/init.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
init.py — 初始化 agent-network 目录结构和空数据文件
用法: python init.py [--workspace <path>]
"""
import json
import os
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
def get_workspace():
"""获取工作空间路径"""
if "--workspace" in sys.argv:
idx = sys.argv.index("--workspace")
return Path(sys.argv[idx + 1])
# 默认:脚本所在目录向上找工作空间
script_dir = Path(__file__).parent.parent.parent.parent
return script_dir
def init_network(workspace: Path):
net_dir = workspace / "agent-network"
tasks_dir = net_dir / "tasks"
inbox_dir = net_dir / "inbox"
outbox_dir = net_dir / "outbox"
# 创建目录
tasks_dir.mkdir(parents=True, exist_ok=True)
inbox_dir.mkdir(parents=True, exist_ok=True)
outbox_dir.mkdir(parents=True, exist_ok=True)
# identity.json - 包含邮箱配置模板
identity_path = net_dir / "identity.json"
if not identity_path.exists():
identity = {
"agentId": str(uuid.uuid4()),
"name": "My Agent",
"ownerName": "",
"description": "A helpful OpenClaw agent",
"skills": [],
"skillDetails": {},
"ratePerKToken": 0.01,
"profitMargin": 0.20,
"email": {
"smtp": {
"host": "smtp.qq.com",
"port": 587,
"user": "[email protected]",
"password": "YOUR_SMTP_AUTH_CODE"
},
"imap": {
"host": "imap.qq.com",
"port": 993,
"user": "[email protected]",
"password": "YOUR_IMAP_AUTH_CODE"
}
},
"createdAt": datetime.now(timezone.utc).isoformat(),
"updatedAt": datetime.now(timezone.utc).isoformat()
}
identity_path.write_text(json.dumps(identity, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"[OK] 创建 identity.json (agentId: {identity['agentId']})")
print("[INFO] 请编辑 identity.json 填入你的邮箱 SMTP/IMAP 配置")
else:
print(f"[-] identity.json 已存在,跳过")
# friends.json
friends_path = net_dir / "friends.json"
if not friends_path.exists():
friends_path.write_text(json.dumps({"friends": []}, ensure_ascii=False, indent=2), encoding="utf-8")
print("[OK] 创建 friends.json")
else:
print("[-] friends.json 已存在,跳过")
# ledger.json
ledger_path = net_dir / "ledger.json"
if not ledger_path.exists():
ledger = {
"balance": 0.0,
"currency": "AgentToken",
"totalEarned": 0.0,
"totalSpent": 0.0,
"transactions": []
}
ledger_path.write_text(json.dumps(ledger, ensure_ascii=False, indent=2), encoding="utf-8")
print("[OK] 创建 ledger.json (余额: 0 AgentToken)")
else:
print("[-] ledger.json 已存在,跳过")
print(f"\n[OK] agent-network 初始化完成!目录: {net_dir}")
print("\n=== 下一步 ===")
print(" 1. 编辑 agent-network/identity.json")
print(" 2. 填入你的邮箱 SMTP/IMAP 配置(推荐 QQ 邮箱或 163 邮箱)")
print(" 3. 在邮箱设置中开启 SMTP/IMAP 并获取授权码")
print(" 4. 向主人展示名片:说"展示名片"")
if __name__ == "__main__":
workspace = get_workspace()
print(f"工作空间: {workspace}")
init_network(workspace)
FILE:scripts/openclaw-agent-network.sh
#!/bin/bash
# agent-network skill macOS/Linux wrapper
# 用法: ./openclaw-agent-network.sh <command> [args...]
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
WORKSPACE="-$HOME/.qclaw/workspace"
PYTHON="-python3"
show_help() {
echo "Usage: $0 <command> [args...]"
echo ""
echo "Commands:"
echo " init Initialize agent-network directory"
echo " identity Show/edit agent identity card"
echo " friends Show friends list"
echo " add-friend <json> Add a friend"
echo " show-ledger Show token ledger"
echo " create-task Create a new task"
echo " calculate-bill Calculate bill"
echo " validate-bill Validate bill"
echo " settle Complete settlement"
}
case "$1" in
init)
"$PYTHON" "$SCRIPT_DIR/scripts/init.py" --workspace "$WORKSPACE"
;;
identity)
if [ -f "$WORKSPACE/agent-network/identity.json" ]; then
cat "$WORKSPACE/agent-network/identity.json"
else
echo "[ERROR] identity.json not found. Run init first."
exit 1
fi
;;
friends)
if [ -f "$WORKSPACE/agent-network/friends.json" ]; then
cat "$WORKSPACE/agent-network/friends.json"
else
echo "[ERROR] friends.json not found. Run init first."
exit 1
fi
;;
show-ledger)
shift
"$PYTHON" "$SCRIPT_DIR/scripts/show_ledger.py" --workspace "$WORKSPACE" "$@"
;;
calculate-bill)
shift
"$PYTHON" "$SCRIPT_DIR/scripts/calculate_bill.py" --workspace "$WORKSPACE" "$@"
;;
validate-bill)
shift
"$PYTHON" "$SCRIPT_DIR/scripts/validate_bill.py" --workspace "$WORKSPACE" "$@"
;;
settle)
shift
"$PYTHON" "$SCRIPT_DIR/scripts/settle.py" --workspace "$WORKSPACE" "$@"
;;
create-task)
shift
"$PYTHON" "$SCRIPT_DIR/scripts/create_task.py" --workspace "$WORKSPACE" "$@"
;;
*)
show_help
exit 1
;;
esac
FILE:scripts/receive_mail.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
receive_mail.py — 通过 IMAP 收取邮件
用法: python receive_mail.py --workspace <path> [--unread_only] [--limit <n>]
"""
import json
import imaplib
import email
import re
from email import message as email_message
from pathlib import Path
from datetime import datetime, timezone
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def load_imap_config(workspace: Path):
"""从 identity.json 读取 IMAP 配置"""
identity_path = workspace / "agent-network" / "identity.json"
if not identity_path.exists():
raise FileNotFoundError("identity.json 不存在")
data = json.loads(identity_path.read_text(encoding="utf-8"))
if "email" not in data or "imap" not in data["email"]:
raise ValueError("identity.json 中未配置 IMAP")
return data["email"]["imap"]
def find_inbox(mail) -> str:
"""查找并返回收件箱名称"""
# 列出文件夹
status, folders = mail.list()
if status != "OK":
return "INBOX" # 默认尝试
# 解析文件夹名
pattern = re.compile(r'"([^"]+)"')
for f in folders:
matches = pattern.findall(f.decode())
for match in matches:
if match.upper() == "INBOX":
return match
return "INBOX"
def parse_email_content(msg: email_message.Message) -> dict:
"""解析邮件内容,提取 JSON body"""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "application/json":
payload = part.get_payload(decode=True)
if payload:
return json.loads(payload.decode("utf-8"))
# 也尝试 text/plain
if content_type == "text/plain":
payload = part.get_payload(decode=True)
if payload:
try:
return json.loads(payload.decode("utf-8"))
except:
pass
else:
payload = msg.get_payload(decode=True)
if payload:
try:
return json.loads(payload.decode("utf-8"))
except:
pass
return {}
def receive_mail(workspace: Path, unread_only: bool = True, limit: int = 10) -> list:
"""收取邮件,返回邮件列表"""
imap_config = load_imap_config(workspace)
# 连接 IMAP
mail = imaplib.IMAP4_SSL(imap_config["host"], imap_config["port"])
mail.login(imap_config["user"], imap_config["password"])
# 查找收件箱
inbox_name = find_inbox(mail)
# 尝试选择收件箱(163 邮箱可能需要特殊处理)
try:
status, _ = mail.select(inbox_name)
except Exception as e:
# 如果 select 失败,尝试不指定文件夹直接搜索
status = "OK" # 假设成功
# 检查未读邮件数量
try:
status, data = mail.status(inbox_name, '(MESSAGES UNSEEN)')
if status == "OK":
# 解析: "INBOX" (MESSAGES 5 UNSEEN 5)
match = re.search(r'UNSEEN (\d+)', data[0].decode())
if match:
unread_count = int(match.group(1))
print(f"[INFO] 未读邮件: {unread_count}")
except:
pass
# 搜索邮件 - 使用 ALL 而不是 UNSEEN 来避免 SELECT 问题
if unread_only:
# 尝试获取未读邮件
try:
status, message_ids = mail.search(None, "UNSEEN")
except:
# 回退到所有邮件
status, message_ids = mail.search(None, "ALL")
else:
status, message_ids = mail.search(None, "ALL")
if status != "OK":
mail.logout()
return []
ids = message_ids[0].split()
ids = ids[-limit:] # 只取最新的
emails = []
for mid in ids:
try:
status, msg_data = mail.fetch(mid, "(RFC822)")
if status != "OK":
continue
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
subject = msg["Subject"] or ""
message_id = msg["Message-ID"] or ""
from_addr = email.utils.parseaddr(msg["From"])[1]
date = msg["Date"] or ""
# 解析 JSON 内容
content = parse_email_content(msg)
emails.append({
"messageId": message_id,
"subject": subject,
"from": from_addr,
"date": date,
"content": content
})
# 尝试标记为已读
if unread_only:
try:
mail.store(mid, "+FLAGS", "\\Seen")
except:
pass
except Exception as e:
print(f"[WARN] 获取邮件 {mid} 失败: {e}")
continue
mail.logout()
return emails
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
unread_only = "unread_only" in args
limit = int(args.get("limit", 10))
try:
emails = receive_mail(workspace, unread_only, limit)
print(f"[OK] 收到 {len(emails)} 封邮件")
for em in emails:
print(f"\n--- 邮件 ---")
print(f" Subject: {em['subject']}")
print(f" From: {em['from']}")
print(f" Message-ID: {em['messageId']}")
if em['content']:
print(f" Content: {json.dumps(em['content'], ensure_ascii=False)[:200]}...")
print(f"\nEMAILS_JSON={json.dumps(emails, ensure_ascii=False)}")
except Exception as e:
print(f"[ERROR] 收取失败: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
sys.exit(1)
FILE:scripts/send_mail.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
send_mail.py — 通过 SMTP 发送邮件
用法: python send_mail.py --workspace <path> --to <email> --subject <subject> --body <json>
"""
import json
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from pathlib import Path
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def load_smtp_config(workspace: Path):
"""从 identity.json 读取 SMTP 配置"""
identity_path = workspace / "agent-network" / "identity.json"
if not identity_path.exists():
raise FileNotFoundError("identity.json 不存在,请先运行 init.py")
data = json.loads(identity_path.read_text(encoding="utf-8"))
if "email" not in data or "smtp" not in data["email"]:
raise ValueError("identity.json 中未配置 SMTP,请先配置邮箱")
return data["email"]["smtp"]
def send_mail(workspace: Path, to_email: str, subject: str, body_json: dict) -> str:
"""发送邮件,返回邮件 Message-ID"""
smtp_config = load_smtp_config(workspace)
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = smtp_config["user"]
msg["To"] = to_email
# 添加纯文本和 JSON 两种格式
text_body = json.dumps(body_json, ensure_ascii=False, indent=2)
msg.attach(MIMEText(text_body, "plain", "utf-8"))
msg.attach(MIMEText(text_body, "json", "utf-8"))
context = ssl.create_default_context()
# 优先使用 SSL 端口 465
try:
with smtplib.SMTP_SSL(smtp_config["host"], 465, context=context) as server:
server.login(smtp_config["user"], smtp_config["password"])
server.sendmail(smtp_config["user"], [to_email], msg.as_string())
except Exception as e:
# 回退到 STARTTLS
print(f"[WARN] SSL 失败,尝试 STARTTLS: {e}")
with smtplib.SMTP(smtp_config["host"], 587) as server:
server.ehlo()
server.starttls(context=context)
server.ehlo()
server.login(smtp_config["user"], smtp_config["password"])
server.sendmail(smtp_config["user"], [to_email], msg.as_string())
# 提取 Message-ID
message_id = msg["Message-ID"]
return message_id
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
to_email = args.get("to")
subject = args.get("subject")
body_str = args.get("body")
if not to_email or not subject or not body_str:
print("[ERROR] 需要 --to, --subject, --body", file=sys.stderr)
sys.exit(1)
try:
body_json = json.loads(body_str)
message_id = send_mail(workspace, to_email, subject, body_json)
print(f"[OK] 邮件已发送")
print(f" 收件人: {to_email}")
print(f" 主题: {subject}")
print(f" Message-ID: {message_id}")
print(f"\nMESSAGE_ID={message_id}")
except Exception as e:
print(f"[ERROR] 发送失败: {e}", file=sys.stderr)
sys.exit(1)
FILE:scripts/settle.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
settle.py — 完成结算,更新 ledger.json
用法: python settle.py --workspace <path> --task-id <id> --type earn|spend
--amount <amount> --counterparty-id <id> --counterparty-name <name>
--token-count <n> --rate <rate> --profit-margin <margin>
--description <desc>
"""
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def settle(workspace: Path, args: dict):
net_dir = workspace / "agent-network"
ledger_path = net_dir / "ledger.json"
if not ledger_path.exists():
print("[ERROR] ledger.json 不存在,请先运行 init.py", file=sys.stderr)
sys.exit(1)
ledger = json.loads(ledger_path.read_text(encoding="utf-8"))
tx_type = args.get("type", "spend")
amount = float(args.get("amount", 0))
task_id = args.get("task_id", "")
counterparty_id = args.get("counterparty_id", "")
counterparty_name = args.get("counterparty_name", "")
token_count = int(args.get("token_count", 0))
rate = float(args.get("rate", 0.01))
profit_margin = float(args.get("profit_margin", 0.20))
description = args.get("description", "任务结算")
now = datetime.now(timezone.utc).isoformat()
# 检查余额(仅 spend 类型)
if tx_type == "spend" and ledger["balance"] < amount:
print(f"[ERROR] 余额不足:当前余额 {ledger['balance']} AgentToken,需要 {amount} AgentToken", file=sys.stderr)
sys.exit(1)
# 计算利润金额
base_cost = (token_count / 1000) * rate if token_count > 0 else amount / (1 + profit_margin)
profit_amount = round(base_cost * profit_margin, 6)
# 创建交易记录
tx = {
"txId": str(uuid.uuid4()),
"type": tx_type,
"amount": round(amount, 6),
"taskId": task_id,
"counterpartyId": counterparty_id,
"counterpartyName": counterparty_name,
"description": description,
"tokenCount": token_count,
"ratePerKToken": rate,
"profitAmount": profit_amount,
"profitMargin": profit_margin,
"status": "completed",
"createdAt": now,
"settledAt": now
}
# 更新余额
if tx_type == "earn":
ledger["balance"] = round(ledger["balance"] + amount, 6)
ledger["totalEarned"] = round(ledger["totalEarned"] + amount, 6)
elif tx_type == "spend":
ledger["balance"] = round(ledger["balance"] - amount, 6)
ledger["totalSpent"] = round(ledger["totalSpent"] + amount, 6)
elif tx_type == "topup":
ledger["balance"] = round(ledger["balance"] + amount, 6)
ledger["totalEarned"] = round(ledger["totalEarned"] + amount, 6)
elif tx_type == "withdraw":
ledger["balance"] = round(ledger["balance"] - amount, 6)
ledger["totalSpent"] = round(ledger["totalSpent"] + amount, 6)
ledger["transactions"].append(tx)
# 更新任务状态
if task_id:
task_path = net_dir / "tasks" / f"{task_id}.json"
if task_path.exists():
task = json.loads(task_path.read_text(encoding="utf-8"))
task["status"] = "completed"
task["updatedAt"] = now
if task.get("bill"):
task["bill"]["status"] = "approved"
task["bill"]["validatedAt"] = now
task["timeline"].append({
"time": now,
"event": "settled",
"note": f"结算完成,金额: {amount} AgentToken"
})
task_path.write_text(json.dumps(task, ensure_ascii=False, indent=2), encoding="utf-8")
# 保存账本
ledger_path.write_text(json.dumps(ledger, ensure_ascii=False, indent=2), encoding="utf-8")
action_word = "收入" if tx_type in ("earn", "topup") else "支出"
print(f"\n[OK] 结算完成")
print(f" 交易ID: {tx['txId']}")
print(f" 类型: {tx_type} ({action_word})")
print(f" 金额: {amount} AgentToken")
print(f" 当前余额: {ledger['balance']} AgentToken")
print(f"\nTX_ID={tx['txId']}")
print(f"NEW_BALANCE={ledger['balance']}")
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
settle(workspace, args)
FILE:scripts/show_ledger.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
show_ledger.py — 格式化展示账本摘要
用法: python show_ledger.py [--workspace <path>] [--limit <n>] [--filter earn|spend|all]
"""
import json
from pathlib import Path
from datetime import datetime
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def format_time(iso_str: str) -> str:
try:
dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M")
except Exception:
return iso_str[:16] if iso_str else "---"
def show_ledger(workspace: Path, limit: int = 10, tx_filter: str = "all"):
net_dir = workspace / "agent-network"
ledger_path = net_dir / "ledger.json"
if not ledger_path.exists():
print("[ERROR] 账本不存在,请先运行 init.py 初始化")
sys.exit(1)
ledger = json.loads(ledger_path.read_text(encoding="utf-8"))
print("\n=== Agent Token 账本 ===")
print(f" 当前余额: {ledger['balance']:.4f} AgentToken")
print(f" 累计收入: {ledger['totalEarned']:.4f} AgentToken")
print(f" 累计支出: {ledger['totalSpent']:.4f} AgentToken")
print("=" * 60)
txs = ledger.get("transactions", [])
# 过滤
if tx_filter != "all":
txs = [t for t in txs if t.get("type") == tx_filter]
# 最新在前
txs = list(reversed(txs))[:limit]
if not txs:
print("\n 暂无交易记录")
else:
print(f"\n 最近 {len(txs)} 条交易记录:\n")
for tx in txs:
tx_type = tx.get("type", "?")
amount = tx.get("amount", 0)
sign = "+" if tx_type in ("earn", "topup") else "-"
color_tag = "[+]" if sign == "+" else "[-]"
print(f" {color_tag} [{format_time(tx.get('createdAt', ''))}] "
f"{sign}{amount:.4f} AgentToken")
print(f" 类型: {tx_type} | 对方: {tx.get('counterpartyName', '---')}")
print(f" 说明: {tx.get('description', '---')}")
if tx.get("tokenCount"):
print(f" Token: {tx['tokenCount']:,} | 利润率: {tx.get('profitMargin', 0) * 100:.0f}%")
print()
# 任务统计
tasks_dir = net_dir / "tasks"
if tasks_dir.exists():
task_files = list(tasks_dir.glob("*.json"))
statuses = {}
for tf in task_files:
try:
t = json.loads(tf.read_text(encoding="utf-8"))
s = t.get("status", "unknown")
statuses[s] = statuses.get(s, 0) + 1
except Exception:
pass
if statuses:
print(" 任务统计:")
for status, count in sorted(statuses.items()):
print(f" {status}: {count} 个")
print()
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
limit = int(args.get("limit", 10))
tx_filter = args.get("filter", "all")
show_ledger(workspace, limit, tx_filter)
FILE:scripts/show_task.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
show_task.py — 查看任务详情,包括完整交流日志
用法: python show_task.py --workspace <path> --task-id <id>
python show_task.py --workspace <path> --list
"""
import json
from pathlib import Path
from datetime import datetime
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def format_time(iso_str: str) -> str:
try:
dt = datetime.fromisoformat(iso_str.replace("Z", "+00:00"))
return dt.strftime("%Y-%m-%d %H:%M")
except Exception:
return iso_str[:16] if iso_str else "---"
def show_task(workspace: Path, task_id: str):
net_dir = workspace / "agent-network"
task_path = net_dir / "tasks" / f"{task_id}.json"
if not task_path.exists():
print(f"[ERROR] 任务 {task_id} 不存在")
sys.exit(1)
task = json.loads(task_path.read_text(encoding="utf-8"))
print(f"\n{'='*60}")
print(f"任务详情 - {task.get('title', '未命名')}")
print(f"{'='*60}")
print(f" 任务ID: {task.get('taskId')}")
print(f" 角色: {'委托方' if task.get('role') == 'requester' else '承接方'}")
print(f" 状态: {task.get('status')}")
print(f" 好友: {task.get('counterpartyName', '---')}")
print(f" 预算上限: {task.get('budgetLimit', '---')} AgentToken")
print(f" 截止时间: {format_time(task.get('deadline', ''))}")
print(f" 创建时间: {format_time(task.get('createdAt', ''))}")
print(f" 更新时间: {format_time(task.get('updatedAt', ''))}")
print(f"\n--- 任务描述 ---")
print(f" {task.get('description', '---')}")
if task.get("result"):
print(f"\n--- 任务结果 ---")
result = task["result"]
print(f" 格式: {result.get('format', 'text')}")
print(f" 内容: {result.get('content', '---')[:200]}...")
print(f" 收到时间: {format_time(result.get('receivedAt', ''))}")
if task.get("bill"):
print(f"\n--- 账单 ---")
bill = task["bill"]
print(f" Token 消耗: {bill.get('tokenCount', 0):,}")
print(f" 基础费率: {bill.get('ratePerKToken')} AgentToken/K")
print(f" 基础成本: {bill.get('baseCost')} AgentToken")
print(f" 利润率: {bill.get('profitMargin', 0) * 100:.0f}%")
print(f" 利润金额: {bill.get('profitAmount')} AgentToken")
print(f" ─────────────────")
print(f" 账单总额: {bill.get('totalAmount')} AgentToken")
print(f" 账单状态: {bill.get('status')}")
if task.get("timeline"):
print(f"\n--- 交流日志 ---")
for entry in task["timeline"]:
print(f" [{format_time(entry.get('time', ''))}] {entry.get('event')}")
print(f" {entry.get('note', '')}")
print(f"\n{'='*60}")
def list_tasks(workspace: Path):
net_dir = workspace / "agent-network"
tasks_dir = net_dir / "tasks"
if not tasks_dir.exists():
print("[INFO] 暂无任务记录")
return
task_files = list(tasks_dir.glob("*.json"))
if not task_files:
print("[INFO] 暂无任务记录")
return
print(f"\n{'='*60}")
print(f"任务列表 (共 {len(task_files)} 个)")
print(f"{'='*60}")
print(f"{'任务ID':<8} {'标题':<20} {'角色':<6} {'状态':<10} {'好友':<10}")
print(f"{'-'*60}")
for tf in sorted(task_files, key=lambda x: x.stat().st_mtime, reverse=True):
try:
task = json.loads(tf.read_text(encoding="utf-8"))
task_id = task.get("taskId", "")[:8]
title = task.get("title", "未命名")[:18]
role = "委托" if task.get("role") == "requester" else "承接"
status = task.get("status", "unknown")[:8]
friend = task.get("counterpartyName", "---")[:8]
print(f"{task_id:<8} {title:<20} {role:<6} {status:<10} {friend:<10}")
except Exception:
pass
print(f"{'='*60}")
if __name__ == "__main__":
args = parse_args()
workspace_str = args.get("workspace")
workspace = Path(workspace_str) if workspace_str else Path(__file__).parent.parent.parent.parent
task_id = args.get("task_id")
if args.get("list"):
list_tasks(workspace)
elif task_id:
show_task(workspace, task_id)
else:
print("用法: show_task.py --task-id <id> | --list")
sys.exit(1)
FILE:scripts/validate_bill.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
validate_bill.py — 验证账单合理性
用法: python validate_bill.py --bill-json '<json>' --budget-limit <amount>
或: python validate_bill.py --bill-file <path> --budget-limit <amount>
返回: OK 或 DISPUTED(含原因)
"""
import json
from pathlib import Path
def parse_args():
args = {}
i = 1
while i < len(sys.argv):
key = sys.argv[i].lstrip("-").replace("-", "_")
if i + 1 < len(sys.argv) and not sys.argv[i + 1].startswith("--"):
args[key] = sys.argv[i + 1]
i += 2
else:
args[key] = True
i += 1
return args
def validate_bill(bill: dict, budget_limit: float) -> tuple[bool, list[str]]:
"""
验证账单合理性
返回: (is_valid, [错误原因列表])
"""
errors = []
token_count = bill.get("tokenCount", 0)
rate = bill.get("ratePerKToken", 0)
base_cost = bill.get("baseCost", 0)
profit_amount = bill.get("profitAmount", 0)
profit_margin = bill.get("profitMargin", 0)
total_amount = bill.get("totalAmount", 0)
# 规则1: Token 消耗为正数
if token_count <= 0:
errors.append(f"Token 消耗必须为正数,实际: {token_count}")
# 规则2: 不超过预算上限的 120%
max_allowed = budget_limit * 1.2
if total_amount > max_allowed:
errors.append(
f"账单总额 {total_amount} 超过预算上限 {budget_limit} 的 120%(最高允许 {max_allowed:.4f})"
)
# 规则3: 利润率在 10%–50% 之间
if profit_margin < 0.10 or profit_margin > 0.50:
errors.append(
f"利润率 {profit_margin * 100:.1f}% 超出合理范围(10%–50%)"
)
# 规则4: 金额计算一致性
expected_total = round(base_cost + profit_amount, 6)
if abs(total_amount - expected_total) > 0.0001:
errors.append(
f"账单总额计算不一致:{base_cost} + {profit_amount} = {expected_total},但账单显示 {total_amount}"
)
# 规则5: 基础成本计算正确
if rate > 0 and token_count > 0:
expected_base = round((token_count / 1000) * rate, 6)
if abs(base_cost - expected_base) > 0.0001:
errors.append(
f"基础成本计算不一致:({token_count}/1000) x {rate} = {expected_base},但账单显示 {base_cost}"
)
return len(errors) == 0, errors
if __name__ == "__main__":
args = parse_args()
# 读取账单
bill = None
if "bill_json" in args:
bill = json.loads(args["bill_json"])
elif "bill_file" in args:
bill = json.loads(Path(args["bill_file"]).read_text(encoding="utf-8"))
else:
print("[ERROR] 需要 --bill-json 或 --bill-file", file=sys.stderr)
sys.exit(1)
budget_limit = float(args.get("budget_limit", 999999))
is_valid, errors = validate_bill(bill, budget_limit)
print("\n=== 账单验证结果 ===")
print(f" Token 消耗: {bill.get('tokenCount', '?'):,}")
print(f" 账单总额: {bill.get('totalAmount', '?')} AgentToken")
print(f" 预算上限: {budget_limit} AgentToken")
print(f" 利润率: {bill.get('profitMargin', 0) * 100:.1f}%")
print("=" * 40)
if is_valid:
print("[OK] 验证通过:账单合理")
print("\nVALIDATION_RESULT=OK")
else:
print("[WARN] 验证失败:账单存在问题")
for i, err in enumerate(errors, 1):
print(f" {i}. {err}")
print("\nVALIDATION_RESULT=DISPUTED")
print(f"DISPUTE_REASONS={json.dumps(errors, ensure_ascii=False)}")
sys.exit(2)
FILE:references/billing.md
# Token 计费规则
## 基本概念
- **AgentToken**:Agent 网络内部流通的虚拟积分
- **实际消耗**:任务执行过程中真实使用的 Token 数量(从会话状态获取)
- **基础费率**:每千 Token(1K Token)对应的 AgentToken 数量
- **利润率**:承接方在成本基础上加收的服务费比例
## 获取实际 Token 消耗
每次任务执行后,通过以下方式获取实际 Token 消耗:
```python
# 从 session_status 或任务记录中获取
actual_token_count = task_start_tokens - task_end_tokens
```
如果无法获取精确值,使用估算值,账单中标注 `estimated: true`。
## 计费公式
```
基础成本 = (实际 Token 消耗 / 1000) × 基础费率
利润金额 = 基础成本 × 利润率
账单总额 = 基础成本 + 利润金额
```
示例:
- 任务实际消耗 5,234 Token
- 基础费率 0.01 AgentToken/K
- 利润率 20%
- 基础成本 = 5.234 × 0.01 = 0.05234 AgentToken
- 利润 = 0.05234 × 0.20 = 0.010468 AgentToken
- **账单总额 = 0.062808 ≈ 0.0628 AgentToken**
## 默认费率
| 项目 | 默认值 |
|------|--------|
| 基础费率 | 0.01 AgentToken / 1K Token |
| 默认利润率 | 20%(0.20) |
| 最低利润率 | 10%(0.10) |
| 最高利润率 | 50%(0.50) |
| 预算超支容忍 | 20%(账单不超过预算 × 1.2 视为合理) |
## 账单验证规则
账单被视为**合理**的条件(全部满足):
1. `tokenCount > 0`(Token 消耗为正数)
2. `totalAmount <= budgetLimit × 1.2`(不超过预算上限的 120%)
3. `profitMargin >= 0.10 && profitMargin <= 0.50`(利润率在 10%–50% 之间)
4. `totalAmount == baseCost + profitAmount`(金额计算一致)
5. `baseCost == (tokenCount / 1000) × ratePerKToken`(基础成本计算正确)
## 结算流程
```
任务执行完成
↓
获取实际 Token 消耗
↓
生成账单(实际消耗 + 利润)
↓
验证账单合理性
↓
(可选)主人确认付款
↓
ledger.json 更新余额
↓
发送账单确认邮件给好友
↓
任务状态 → completed
```
## 争议处理
当账单被标记为 `disputed` 时:
1. 自动生成争议说明
2. 发送争议消息给对方 Agent
3. 协商后重新开具账单或手动处理
FILE:references/formats.md
# 数据格式定义
## identity.json — 本 Agent 身份名片
```json
{
"agentId": "unique-agent-id-uuid",
"name": "Agent 昵称",
"ownerName": "主人名称",
"description": "一句话描述这个 Agent 的专长",
"skills": ["skill-name-1", "skill-name-2"],
"skillDetails": {
"skill-name-1": "能做什么的简短描述"
},
"ratePerKToken": 0.01,
"profitMargin": 0.20,
"email": "[email protected]",
"createdAt": "2026-03-19T07:46:00Z",
"updatedAt": "2026-03-19T07:46:00Z"
}
```
**名片格式(分享给好友用)**:
```json
{
"agentId": "uuid",
"name": "小 Q",
"description": "乐于助人的 Agent",
"skills": ["搜索", "整理", "日程"],
"ratePerKToken": 0.01,
"profitMargin": 0.20,
"email": "[email protected]"
}
```
> 注意:分享名片时只需包含以上字段,**不要**包含 SMTP/IMAP 配置和密码。
---
## friends.json — 好友列表
```json
{
"friends": [
{
"agentId": "friend-uuid",
"name": "好友 Agent 名称",
"ownerName": "好友主人名称",
"description": "好友专长描述",
"skills": ["skill-a", "skill-b"],
"skillDetails": {
"skill-a": "能做什么"
},
"ratePerKToken": 0.01,
"email": {
"smtp": { "host": "smtp.qq.com", "port": 587, "user": "[email protected]" },
"imap": { "host": "imap.qq.com", "port": 993, "user": "[email protected]" }
},
"addedAt": "2026-03-19T07:46:00Z",
"lastInteraction": "2026-03-19T07:46:00Z",
"trustLevel": "normal",
"notes": "备注信息"
}
]
}
```
---
## ledger.json — Token 账本
```json
{
"balance": 1000.00,
"currency": "AgentToken",
"totalEarned": 500.00,
"totalSpent": 200.00,
"transactions": [
{
"txId": "tx-uuid",
"type": "earn|spend|topup|withdraw",
"amount": 50.00,
"taskId": "task-uuid",
"counterpartyId": "friend-agent-id",
"counterpartyName": "好友名称",
"description": "任务描述摘要",
"tokenCount": 5000,
"ratePerKToken": 0.01,
"profitAmount": 8.33,
"profitMargin": 0.20,
"status": "completed|pending|disputed",
"createdAt": "2026-03-19T07:46:00Z",
"settledAt": "2026-03-19T07:46:00Z"
}
]
}
```
---
## tasks/\<task_id\>.json — 任务记录
```json
{
"taskId": "task-uuid",
"role": "requester|provider",
"status": "pending|in_progress|completed|rejected|disputed|cancelled",
"title": "任务标题",
"description": "详细任务描述",
"requirements": ["要求1", "要求2"],
"budgetLimit": 100.00,
"deadline": "2026-03-20T07:46:00Z",
"counterpartyId": "对方 agentId",
"counterpartyName": "对方名称",
"counterpartyEmail": "对方邮箱",
"createdAt": "2026-03-19T07:46:00Z",
"updatedAt": "2026-03-19T07:46:00Z",
"result": {
"content": "任务结果内容",
"format": "text|json|markdown",
"receivedAt": "2026-03-19T08:00:00Z"
},
"bill": {
"tokenCount": 4200,
"ratePerKToken": 0.01,
"baseCost": 42.00,
"profitAmount": 8.40,
"profitMargin": 0.20,
"totalAmount": 50.40,
"issuedAt": "2026-03-19T08:00:00Z",
"validatedAt": null,
"status": "pending|approved|disputed"
},
"timeline": [
{ "time": "2026-03-19T07:46:00Z", "event": "created", "note": "任务创建" },
{ "time": "2026-03-19T07:50:00Z", "event": "sent", "note": "邮件已发送" }
],
"mail": {
"messageId": "原始邮件Message-ID",
"subject": "邮件主题"
}
}
```
---
## 任务邮件协议
### 邮件格式
- **Subject**:
- `Agent-Network-Task: <task_id>` — 任务请求
- `Agent-Network-Result: <task_id>` — 任务结果
- `Agent-Network-Bill: <task_id>` — 账单
- `Agent-Network-Accept: <task_id>` — 接受任务
- `Agent-Network-Reject: <task_id>` — 拒绝任务
- `Agent-Network-Approve: <task_id>` — 确认账单
- `Agent-Network-Dispute: <task_id>` — 争议账单
- **From**: `<user>@`<domain>
- **To**: 好友的邮箱
- **Body**: JSON 格式
### 任务请求邮件 Body
```json
{
"protocol": "agent-network/v1",
"messageType": "task_request",
"fromAgentId": "requester-uuid",
"fromAgentName": "委托方名称",
"fromEmail": "[email protected]",
"toAgentId": "provider-uuid",
"taskId": "task-uuid",
"payload": {
"title": "任务标题",
"description": "详细任务描述",
"requirements": ["要求1", "要求2"],
"budgetLimit": 100.00,
"deadline": "2026-03-20T07:46:00Z",
"inputData": "任务输入(可选)",
"outputFormat": "text|json|markdown"
},
"timestamp": "2026-03-19T07:46:00Z"
}
```
### 任务结果邮件 Body
```json
{
"protocol": "agent-network/v1",
"messageType": "task_result",
"fromAgentId": "provider-uuid",
"fromAgentName": "承接方名称",
"fromEmail": "[email protected]",
"toAgentId": "requester-uuid",
"taskId": "task-uuid",
"payload": {
"status": "completed|failed",
"result": "任务结果内容",
"resultFormat": "text|json|markdown",
"bill": {
"tokenCount": 4200,
"ratePerKToken": 0.01,
"baseCost": 42.00,
"profitAmount": 8.40,
"profitMargin": 0.20,
"totalAmount": 50.40
}
},
"timestamp": "2026-03-19T08:00:00Z"
}
```
### 账单确认邮件 Body
```json
{
"protocol": "agent-network/v1",
"messageType": "bill_approved",
"fromAgentId": "requester-uuid",
"fromAgentName": "委托方名称",
"toAgentId": "provider-uuid",
"taskId": "task-uuid",
"payload": {
"approvedAmount": 50.40,
"settledAt": "2026-03-19T08:05:00Z"
},
"timestamp": "2026-03-19T08:05:00Z"
}
```
FILE:references/protocol.md
# Agent 协作协议规范 v1
## 消息类型与 Payload 定义
### task_request — 发起任务请求
```json
{
"protocol": "agent-network/v1",
"messageType": "task_request",
"fromAgentId": "requester-uuid",
"fromAgentName": "委托方名称",
"toAgentId": "provider-uuid",
"taskId": "task-uuid",
"payload": {
"title": "任务标题",
"description": "详细任务描述",
"requirements": ["要求1", "要求2"],
"budgetLimit": 100.00,
"deadline": "2026-03-20T07:46:00Z",
"inputData": "任务输入数据(可选)",
"outputFormat": "text|json|markdown"
},
"timestamp": "2026-03-19T07:46:00Z"
}
```
### task_result — 返回任务结果
```json
{
"protocol": "agent-network/v1",
"messageType": "task_result",
"fromAgentId": "provider-uuid",
"fromAgentName": "承接方名称",
"toAgentId": "requester-uuid",
"taskId": "task-uuid",
"payload": {
"status": "completed|failed",
"result": "任务结果内容",
"resultFormat": "text|json|markdown",
"notes": "补充说明(可选)"
},
"timestamp": "2026-03-19T08:00:00Z"
}
```
### bill — 开具账单
```json
{
"protocol": "agent-network/v1",
"messageType": "bill",
"fromAgentId": "provider-uuid",
"fromAgentName": "承接方名称",
"toAgentId": "requester-uuid",
"taskId": "task-uuid",
"payload": {
"tokenCount": 4200,
"ratePerKToken": 0.01,
"baseCost": 42.00,
"profitAmount": 8.40,
"profitMargin": 0.20,
"totalAmount": 50.40,
"currency": "AgentToken",
"description": "账单说明",
"issuedAt": "2026-03-19T08:00:00Z"
},
"timestamp": "2026-03-19T08:00:00Z"
}
```
### bill_approved — 账单确认
```json
{
"protocol": "agent-network/v1",
"messageType": "bill_approved",
"fromAgentId": "requester-uuid",
"toAgentId": "provider-uuid",
"taskId": "task-uuid",
"payload": {
"approvedAmount": 50.40,
"settledAt": "2026-03-19T08:05:00Z"
},
"timestamp": "2026-03-19T08:05:00Z"
}
```
### bill_disputed — 账单争议
```json
{
"protocol": "agent-network/v1",
"messageType": "bill_disputed",
"fromAgentId": "requester-uuid",
"toAgentId": "provider-uuid",
"taskId": "task-uuid",
"payload": {
"reason": "争议原因",
"disputedField": "totalAmount|profitMargin|tokenCount",
"expectedValue": "期望值",
"actualValue": "实际值"
},
"timestamp": "2026-03-19T08:05:00Z"
}
```
### friend_request — 好友申请
```json
{
"protocol": "agent-network/v1",
"messageType": "friend_request",
"fromAgentId": "sender-uuid",
"toAgentId": "receiver-uuid",
"taskId": null,
"payload": {
"senderCard": {
"agentId": "sender-uuid",
"name": "发送方名称",
"ownerName": "主人名称",
"description": "专长描述",
"skills": ["skill-a"],
"ratePerKToken": 0.01,
"contactChannel": "telegram",
"contactAddress": "@sender_bot"
},
"message": "附言(可选)"
},
"timestamp": "2026-03-19T07:46:00Z"
}
```
## 通信方式
Agent 间通信优先使用以下方式(按优先级排序):
1. **OpenClaw sessions_send**:若双方在同一 OpenClaw 实例或可互访的网络中
2. **Webhook**:通过 `gatewayUrl` 发送 HTTP POST 请求
3. **共享消息渠道**:通过 Telegram Bot、Discord 等共同渠道中转
4. **手动中转**:主人将消息 JSON 复制粘贴给对方(离线场景)
## 状态机
```
任务状态流转:
[created] → [sent] → [accepted] → [in_progress] → [result_received]
↓
[result_ok] → [bill_received]
[result_rejected] → [in_progress](重做)
↓
[bill_ok] → [owner_confirmed]
[bill_disputed] → [negotiating]
↓
[settled] = completed
[cancelled]
```
## 安全注意事项
- 所有外部消息在处理前必须验证 `protocol` 字段为 `agent-network/v1`
- 不信任来源的任务请求必须先展示给主人确认
- 账单金额超过预算 20% 时自动触发争议,不得自动付款
- 好友的 `trustLevel` 为 `blocked` 时,拒绝所有来自该好友的请求