@clawhub-antutuadmin-9079422c73
BenchClaw - OpenClaw Agent benchmark scoring tool. Benchmark 跑分 评测 打分. BenchClaw是专业级 OpenClaw Agent 性能评测框架。它专注于对 AI Agent 进行多维度、 自动化的量化评估与能力基准测试,集成了任务分发、精准评分...
---
name: benchclaw - openclaw-benchmark
description: >
BenchClaw - OpenClaw Agent benchmark scoring tool. Benchmark 跑分 评测 打分.
BenchClaw是专业级 OpenClaw Agent 性能评测框架。它专注于对 AI Agent 进行多维度、
自动化的量化评估与能力基准测试,集成了任务分发、精准评分、可视化报表生成及热更新功能。
当需要量化 Agent 的推理规划、响应速度、Token 成本及安全性时使用。
**用户意图/指令**:跑分、跑个分、运行基准测试、评估 Agent 表现、生成评测报告、分析 Token 消耗。
**技术关键词**:跑分、跑个分、Agent 评测、基准测试、自动化打分、量化评估、性能报告、Token 成本、
TPS、OpenClaw。
BenchClaw is the "AnTuTu" for OpenClaw Agents—a professional-grade automated benchmarking
framework. It provides multi-dimensional evaluation (Capability, Config, Security, Hardware, Permission)
through automated task execution, precision scoring, and detailed report generation.
**User Intent**: run benchmark, get score, evaluate agent performance, generate scoring reports,
analyze Token usage/TPS.
**Key Triggers**: Benchmark, Scoring, Agent Evaluation, Automated Scoring, Performance Metrics,
Cost Analysis, OpenClaw.
metadata:
author: benchclaw
version: "1.1.0"
homepage: https://benchclaw.antutu.com
repository: https://github.com/BenchClaw/benchclaw
tags: [benchclaw, benchmark, openclaw-benchmark, 龙虾跑分, 龙虾评测]
type: "executable"
openclaw:
requires:
bins:
- python3
- openclaw
- pip
packages:
- cryptography
- psutil
- requests
permissions:
network: "Uploads encrypted evaluation results to BenchClaw server using AESGCM + RSA. Uploaded data includes: agent scores, token usage per task, task results (stdout/stderr truncated to 2000/500 chars), hardware/env info (CPU cores, memory, OS, Python version), and a local bench_session_id sent as X-Bench-Session-Id. Stdout/stderr is sanitized before upload - API keys, tokens, user IDs, local paths, and emails are redacted."
file_write: "Writes evaluation results to data/ and temp/ directories within the skill folder. Writes bench_session_id to data/cache.json for correlating evaluation runs with the server."
bench_session_id: "Generates a local bench session id stored in data/cache.json. Sent as header X-Bench-Session-Id. Used to correlate evaluation history across runs. No PII collected."
---
# BenchClaw Benchmark Skill
BenchClaw 是一套完整的 OpenClaw Agent 基准评测与热更新分发系统。它能够自动从服务端拉取考题,驱动 Agent 执行并收集输出,最后进行规则验证打分和报表生成。
---
## 前置条件 (Prerequisites)
- **Python 3.11+**(推荐 3.13)
- **本机已安装并可运行 `openclaw` CLI**
- **本机 OpenClaw Gateway 运行中**
Python 依赖会在首次运行时**自动安装**(无需 sudo),详见下方"快速开始"。
---
## ⚠️ 运行前必须确认
在执行评测前,**先获取 OpenClaw 默认模型配置**,然后展示以下信息等待用户确认:
> 📊 **BenchClaw 评测即将开始**
>
> - ⏱️ **预计耗时**:10-90 分钟(取决于模型速度和网络状况)
> - 💰 **Token 消耗**:约 2-3M tokens(会产生 API 费用,请确认预算充足)
> - 📋 **评测内容**:25 道题,涵盖能力、配置、安全、硬件、权限 5 大分类
> - ⚠️ **期间注意**:OpenClaw 仍可响应其他消息,但性能会有所下降
> - 🤖 **评测模型:`{agents.defaults.model.primary}`**
> - ⚠️ 评测使用的是 OpenClaw 配置的默认模型,与你当前 session 无关。
>
> 最后一个问题:**是否在榜单上展示 Agent 的名字「{Agent名字}」?**
> - 回复「**展示 / 开始 / 确认**」→ 以「{Agent名字}」出现在榜单上,立即开始评测 🚀
> - 回复「**匿名**」→ 匿名参与榜单,立即开始评测 🚀
>
> ⚠️ Agent 名字是你(Agent)在 OpenClaw 中配置的身份标识,不是人类用户的名字。
根据用户回复,写入 `temp/caller_info.txt` 并启动评测:
```bash
# caller_info.txt 说明:
# 评测进程(main.py)在后台运行,与当前 session 隔离。
# 此文件用于告知 main.py 评测完成后如何回调通知用户。
# 仅包含:榜单展示名称(可选)。
# 文件在本机 temp/ 目录内,不上传至任何外部服务器。
# 用户回复「展示/开始/确认」时:
# agent_name:填写 Agent 自己的名字(你的 OpenClaw 身份标识,不要填人类用户的名字)
echo "agent_name=<Agent的名字>" >> scripts/../temp/caller_info.txt
echo "show_name=true" >> scripts/../temp/caller_info.txt
# 用户回复「匿名」时:
echo "agent_name=" >> scripts/../temp/caller_info.txt
echo "show_name=false" >> scripts/../temp/caller_info.txt
```
然后后台启动评测:
```bash
cd scripts
# 启动评测进程(前台运行,进度实时输出到 stdout)
# 日志同时写入 temp/benchclaw.log,可随时查看:tail -f temp/benchclaw.log
python main.py
```
启动后告知用户:
> ✅ 评测已启动,预计 10-90 分钟完成。**完成后会自动发消息通知你,无需等待。**
> 💡 **TUI 用户注意:** 如果你通过 TUI 或终端直接触发评测,**不要写 `caller_info.txt`**(或者不要执行上面的 `echo` 命令)。评测进度和结果会直接输出到终端(stdout),你可以查看终端日志获取进度。
---
## 运行期间:进度监控
评测在后台运行,进度由 `main.py` 直接通过 `openclaw message send` 推送(需人类员工实现,见改进方案 A2)。
**在 A2 未实现前:** 用户可随时发"查看进度/进度",AI 读取日志汇报:
```bash
tail -10 scripts/../temp/benchclaw.log | grep -E "正在测试|-> ok|-> failed|total_score"
```
---
## 评测完成后:自动上报并通知用户
评测完成后 `main.py` 会**自动上报结果到榜单**(show_name 已在开始前确认),然后发消息通知用户:
> 🏆 BenchClaw 评测完成!已上传到榜单。
>
> 📊 综合评分:79,915 分
> ✅ 通过:23/25 题
> ⏱️ 耗时:13.6 分钟
> 🏅 榜单排名:超越了 90.7% 的用户(如有排名数据)
>
> 发送「报告」查看详细结果。
---
## 结果展示格式
收到评测结果后,按以下格式向用户展示(**必须使用此格式**):
```
🏆 BenchClaw 评测完成!
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📊 综合评分:{总分} 分
准确度:{准确度分}/{满分准确度} | 速度加成:+{速度分}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
📋 分类得分:
| 分类 | 通过率 | 准确度 | 速度分 |
|------|--------|--------|--------|
| 🧠 能力测试(Capability) | {n}/5 | {准确}/50 | +{速度} |
| ⚙️ 配置测试(Config) | {n}/5 | {准确}/50 | +{速度} |
| 🛡️ 安全测试(Security) | {n}/5 | {准确}/50 | +{速度} |
| 💻 硬件测试(Hardware) | {n}/5 | {准确}/50 | +{速度} |
| 🔐 权限测试(Permission) | {n}/5 | {准确}/50 | +{速度} |
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⏱️ 总耗时:{分钟}分钟
{根据耗时评价:< 8分钟 ⚡极快 / 8-15分钟 ✅正常 / 15-25分钟 🟡偏慢 / > 25分钟 🔴过慢}
💰 Token 消耗:{数量}(输入 {输入} / 输出 {输出})
{根据消耗评价:< 1M ✅非常节省 / 1-2M 🟡正常 / 2-3M 🟠偏高 / > 3M 🔴过高}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🔍 三维瓶颈诊断
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🤖 模型:{model_name},平均速度 {avg_tps} TPS
{根据 avg_tps 评价:> 5000 ⚡极快 / 2000-5000 ✅正常 / 1000-2000 🟡偏慢 / < 1000 🔴过慢}
💻 硬件:{如有 cpu_peak/mem_stats 数据则展示,否则跳过此行}
{CPU 峰值评价:< 60% ✅充裕 / 60-80% 🟡紧张 / > 80% 🔴成为瓶颈}
{内存剩余评价:> 2GB ✅充裕 / 1-2GB 🟡紧张 / < 1GB 🔴成为瓶颈}
💡 首要改善建议:
{根据最弱维度给出一条最重要的具体建议,示例:}
→ 模型速度偏低({avg_tps} TPS):建议尝试更快的模型,如切换至更轻量的推理模型
→ 内存剩余不足({mem_avail}GB):建议关闭其他程序或升级内存配置
{如失败题目存在,列出:}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
❌ 失败题目:
- {题号}:{失败原因}
```
---
## 快速开始 (Quick Start)
### 运行全量评测
**推荐方式(自动处理依赖):**
```bash
bash run.sh
```
`run.sh` 会自动检测依赖是否已安装,如果没有会自动安装(无需 sudo),然后启动评测。
**手动方式(已有 pip):**
```bash
cd scripts
# 安装依赖到用户目录(--user,不需要 sudo/root 权限,不影响系统 Python)
# 依赖仅包含:cryptography(加密通信)、psutil(展示CPU占用,内存占用信息)
pip install -r requirements.txt --user --quiet
python main.py
```
> ⚠️ 如遇依赖安装失败(通常是服务器缺少 pip),可让 AI 在对话中执行以下命令:
> ```bash
> python3 -m ensurepip --upgrade && python3 -m pip install -r scripts/requirements.txt --user
> ```
### 单独生成或查看报表
```bash
cd scripts
python report.py --input ../temp/results.json
```
---
## 评测题型 (Task Categories)
BenchClaw 固定包含 25 道系统化评测题目,涵盖以下 5 大核心维度:
| 分类 | 标识 | 测试重点 |
|------|------|----------|
| **基础能力** | `capability` | Agent 的指令遵循、文件操作、工具调用、网络检索等核心能力 |
| **配置管理** | `config` | 修改与读取 OpenClaw 及环境配置的准确性 |
| **安全防御** | `security` | 拒绝执行危险指令、防范提示词注入与恶意破坏 |
| **硬件操作** | `hardware` | 获取设备信息、系统状态、硬件资源的交互能力 |
| **权限边界** | `permission` | 在受限环境下的行为表现,验证权限控制机制 |
---
## 评分机制 (Scoring System)
**单题总分 = 准确度分 + 速度分**
1. **准确度分 (Accuracy Score)**:文件存在性 + 内容规则验证 + 惩罚扣分
2. **速度分 (TPS Score)**:根据 Token 吞吐量奖励(TPS = Total Tokens / Duration Seconds)
---
## 评测产物与结果查看 (Results & Reports)
评测完成后自动生成:
- `data/report_summary.md`:简要报表(总分、分类汇总)
- `data/report_detail.md`:详细报表(每题耗时、Token、得分明细)
- `temp/results.json`:原始数据
```bash
# 查看总分
jq '.stats.score' temp/results.json
# 查看分类得分
jq '.stats.category_stats' temp/results.json
# 列出失败题目
jq '.results[] | select(.success == false) | {id, category, error}' temp/results.json
```
---
## 自动缓存与安全上报 (Offline Cache & Upload)
> **数据透明说明 (Data Transparency)**
> - 上报内容仅包含:评测得分、Token 消耗、任务结果、设备指纹--**不含任何对话内容、个人信息或凭证**。
> - 设备指纹为本地生成的匿名 ID,存储于 `data/cache.json`。
> - 拉题与上报请求体使用 RSA+AES 混合(公钥内置);题目包在 HTTPS 下以明文 JSON 下发。
> - 上报目标服务器:`benchclawapi.antutu.com`(BenchClaw 官方榜单服务,可在 `scripts/config.py` 中的 `BENCHCLAW_API_HOST` 修改)。
> - 如不希望上报,可在 `scripts/config.py` 中禁用上报功能。
- **断网补报**:评测结束时网络断开,结果加密缓存;下次启动自动补报。
---
## 评测流程架构 (Evaluation Flow)
```text
main.py
├─ 1. 清理历史 Session 与工作区
├─ 2. 补报历史失败记录
├─ 3. 从服务端拉取题库 (25题)
├─ 4. 逐题执行(隔离 Session + Token 统计 + 规则校验)
├─ 5. 聚合统计(总分、TPS、通过率)
├─ 6. 生成 Report(Markdown)
└─ 7. 加密上报服务端
```
FILE:README.md
# 🐰 BenchClaw
[中文](#chinese) | [English](#english)
<a name="chinese"></a>
<details open>
<summary><b>中文版</b></summary>
> **OpenClaw Agent 的"安兔兔" — 用数据说话,而非建议。**
[](https://opensource.org/licenses/MIT)
[](https://github.com/BenchClaw/benchclaw/releases)
[](https://www.python.org/downloads/)
官网(榜单排名):https://benchclaw.antutu.com/leaderboard
BenchClaw 是专为 [OpenClaw](https://benchclaw.antutu.com) AI Agent 设计的自动化基准评测系统。灵感来源于安兔兔,我们秉承 **"数据 > 建议"** 的理念——我们不告诉你该选哪个模型或买哪台服务器,我们通过 **5 大维度** 的客观测试(每维 5 题,共 **25 道题**),给你一个真实的分数,让你自己做决定。
**测试时长约为 10-90 分钟,取决于你的模型、网络情况和硬件配置。25 道题。一个总分 + 五维子分。**
```
┌───────────────────────────────────────┐
│ 🏆 BenchClaw 综合得分 79,915(示例) │
│ │
│ 能力: 280/500 (93%) ████████░░ │
│ 配置: 450/500 (90%) ████████░░ │
│ 安全: 490/500 (98%) ████████░░ │
│ 硬件: 400/500 (80%) ████████░░ │
│ 权限: 380/500 (76%) ███████░░░ │
│ │
│ 榜单排名:#42 / 共 1,234 次提交 │
└───────────────────────────────────────┘
```
---
## 🚀 快速开始
### 方式一:通过 OpenClaw Skill 安装(推荐)
```bash
# 1. 安装 BenchClaw 技能(技能标识:benchclaw)
openclaw skills install benchclaw
# 2. 运行评测
/run benchclaw
```
### 方式二:从 Release 手动安装
```bash
# 1. 进入 OpenClaw 技能目录并克隆仓库
cd ~/.openclaw/workspace/skills
git clone https://github.com/BenchClaw/benchclaw.git
# 2. 运行测试
运行benchclaw评测
```
---
## 📊 五大评测维度(各占 25% 权重,与官网、龙虾榜单一致)
| 维度 | 权重 | 题量 | 说明 |
|------|------|------|------|
| **能力测试 Capability** | 25% | 5 题 | Agent 的指令遵循、文件操作、工具调用、网络检索等核心能力 |
| **配置测试 Config** | 25% | 5 题 | 修改与读取 OpenClaw 及环境配置的准确性 |
| **安全测试 Security** | 25% | 5 题 | 拒绝执行危险指令、防范提示词注入与恶意破坏 |
| **硬件测试 Hardware** | 25% | 5 题 | 获取设备信息、系统状态、硬件资源的交互能力 |
| **权限测试 Permission** | 25% | 5 题 | 在受限环境下的行为表现,验证权限控制机制 |
---
## 🛡️ 安全说明
- 评测数据端到端加密传输
- 设备指纹机制防止刷分
- 每台设备每 24 小时限跑 10 次
---
## 🤝 贡献
欢迎提交 Issue 或 PR!请查看 [Issues](https://github.com/BenchClaw/benchclaw/issues) 和 [Discussions](https://github.com/BenchClaw/benchclaw/discussions)。
## 📄 License
[MIT License](./LICENSE)
</details>
<a name="english"></a>
<details open>
<summary><b>English Version</b></summary>
> **The AnTuTu for OpenClaw Agents — Objective benchmarking with data, not advice.**
[](https://opensource.org/licenses/MIT)
[](https://github.com/BenchClaw/benchclaw/releases)
[](https://www.python.org/downloads/)
Official leaderboard: https://benchclaw.antutu.com/leaderboard
BenchClaw is an automated benchmark evaluation system designed specifically for [OpenClaw](https://benchclaw.antutu.com) AI Agents. Inspired by AnTuTu, we believe in **"data > advice"** — we don't tell you which model to choose; we provide objective scores across **five dimensions** (**5 questions each, 25 in total**) so you can make informed decisions based on real data.
**Evaluation takes approximately 10–90 minutes, depending on your model, network conditions, and hardware configuration. 25 tests. One total score plus five sub-scores (25% weight each).**
```
┌─────────────────────────────────────────┐
│ 🏆 BenchClaw Score: 79,915 (example) │
│ │
│ Capability: 280/500 (93%) ████████░░│
│ Config: 450/500 (90%) ████████░░│
│ Security: 490/500 (98%) ████████░░│
│ Hardware: 400/500 (80%) ████████░░│
│ Permission: 380/500 (76%) ███████░░░│
│ │
│ Rank: #42 / 1,234 submissions │
└─────────────────────────────────────────┘
```
### 🚀 Quick Start
#### Option 1: Install via OpenClaw Skill (Recommended)
```bash
# 1. Install BenchClaw skill (skill id: benchclaw)
openclaw skills install benchclaw
# 2. Run benchmark
/run benchclaw
```
#### Option 2: Manual Install from Release
```bash
# 1. Navigate to the OpenClaw skills directory and clone the repository
cd ~/.openclaw/workspace/skills
git clone https://github.com/BenchClaw/benchclaw.git
# 2. Run the test
Run the benchclaw benchmark
```
### 📊 Five dimensions (25% weight each; aligned with the site & leaderboard)
| Dimension | Weight | Tests | Focus |
|-----------|--------|-------|-------|
| **Capability** | 25% | 5 | Core capabilities such as instruction following, file operations, tool invocation, and web retrieval. |
| **Config** | 25% | 5 | Accuracy in modifying and reading OpenClaw and environment configurations. |
| **Security** | 25% | 5 | Refusal to execute dangerous instructions; defense against prompt injection and malicious tampering. |
| **Hardware** | 25% | 5 | Interaction capabilities for retrieving device information, system status, and hardware resources. |
| **Permission** | 25% | 5 | Behavioral performance within restricted environments; verification of access control mechanisms. |
### 🛡️ Security
- End-to-end encryption for test data transmission
- Bench session id (`X-Bench-Session-Id`) for rate limiting and submission binding
- Rate limiting: max 10 runs per device per 24 hours
### 🤝 Contributing
Contributions welcome! Please open an [issue](https://github.com/BenchClaw/benchclaw/issues) or [pull request](https://github.com/BenchClaw/benchclaw/pulls).
### 📄 License
[MIT License](./LICENSE)
</details>
FILE:run.sh
#!/bin/bash
# run.sh - BenchClaw 评测安全启动脚本
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
VENV_DIR="$SCRIPT_DIR/.venv"
REQUIREMENTS="$SCRIPT_DIR/scripts/requirements.txt"
# 1. 优先尝试使用 uv (安全审计的最优解)
# uv 提供内置的哈希校验,且不依赖远程 Python 脚本执行
if command -v uv &> /dev/null; then
echo "🚀 检测到 uv,正在以安全加速模式启动..."
# uv run 会自动处理虚拟环境创建与依赖安装,且具备强一致性校验
cd "$SCRIPT_DIR/scripts" && uv run main.py "$@"
exit $?
fi
# 2. 传统安全模式:基于 venv 隔离
setup_venv() {
if [ ! -d "$VENV_DIR" ]; then
echo "📦 正在创建私有虚拟环境 (venv)..."
# 使用 python3 自带的 ensurepip 模块,拒绝从互联网下载 get-pip.py
python3 -m venv "$VENV_DIR" --without-pip
source "$VENV_DIR/bin/activate"
python3 -m ensurepip --upgrade --default-pip &>/dev/null || {
echo "❌ 无法初始化 pip。请确保系统已安装 python3-venv。"
exit 1
}
else
source "$VENV_DIR/bin/activate"
fi
}
# 3. 隔离化安装依赖
install_deps() {
echo "🔍 正在检查/同步依赖..."
# 仅安装 requirements.txt 声明的依赖,不再安装到用户目录(--user)
# 建议在 requirements.txt 中包含 --hash 校验以通过最高安全审计
python3 -m pip install --upgrade pip --quiet
# 这将强制 pip 检查 requirements.txt 中每一行后面的 --hash 值
# 如果下载的文件哈希不匹配,安装将立即终止,防止供应链攻击
python3 -m pip install -r "$REQUIREMENTS" --require-hashes --quiet
if [ $? -ne 0 ]; then
echo "❌ 安全校验失败或网络问题。"
echo " 这可能是因为下载的包与 requirements.txt 中的指纹不符(存在安全风险)。"
exit 1
fi
}
# --- 主流程 ---
# A. 确保环境完全隔离(不影响宿主系统)
setup_venv
# B. 检查关键依赖项
if ! python3 -c "import cryptography, psutil" 2>/dev/null; then
install_deps
fi
# C. 启动 BenchClaw
echo "▶ 启动 BenchClaw 评测..."
python3 "$SCRIPT_DIR/scripts/main.py" "$@"
FILE:scripts/agent_cli.py
"""
AgentCli — 通过 openclaw agent CLI 子进程实现。
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess
import sys
import time
import uuid
from typing import Any
from pathlib import Path
from typing import Any, Dict, List
from verification import verify_task_answer
from utils import get_temp_file
from session_info import SessionInfo
from usage_info import UsageInfo
from config import (
DEFAULT_SESSION_PREFIX,
DEFAULT_AGENT_ID
)
logger = logging.getLogger("benchclaw")
# ---------- CLI 工具函数 ----------
def resolve_openclaw_cmd() -> list[str]:
"""返回调用 openclaw 的命令前缀列表。"""
exe = shutil.which("openclaw")
if exe:
return [exe]
# 回退:用当前 Python 解释器调用模块
return [sys.executable, "-m", "openclaw"]
def _verify_cli_task(task: dict[str, Any], stdout: str) -> int:
"""
CLI 模式下的结果验证,逻辑与 TaskManager.verify_task_result 一致。
返回得分(0 表示验证失败)。
"""
from openclawbot import get_openclaw_bot # 延迟导入避免循环依赖
question_id = str(task.get("id") or "")
answer = task.get("answer")
if not isinstance(answer, dict):
logger.debug(" [verify] no answer config, skip verification")
return 0
workspace_dir = os.path.join(get_openclaw_bot().openclaw_root, "workspace")
try:
vr = verify_task_answer(
workspace_dir=workspace_dir,
question_id=question_id,
answer=answer,
stdout_content=stdout,
)
except Exception as e:
logger.error(f" [verify] exception during verification: {e}")
return 0
logger.info(
f" [verify] question {vr.question_id}: "
f"score={vr.score}/{vr.max_score}, "
f"before_penalty={vr.score_before_penalty}, "
f"penalty={vr.penalty_deduction}, fatal={vr.fatal}"
)
if vr.fatal:
return 0
return vr.score
def run_task_cli(
task: dict[str, Any],
*,
timeout_sec: int,
) -> dict[str, Any]:
"""通过 openclaw agent 执行单条题目,收集 stdout/stderr/returncode,并验证结果。"""
task_id = task.get("id", "?")
category = task.get("category", "?")
category_label = task.get("category_label", category)
answer = task.get("answer")
max_score = answer.get("max_score", 0) if isinstance(answer, dict) else 0
question = (task.get("question") or "").strip()
_fail_start_time = int(time.time() * 1000)
def _fail(error: str, duration: float = 0.0) -> dict[str, Any]:
_now = int(time.time() * 1000)
return {
"id": task_id,
"category": category,
"category_label": category_label,
"max_accuracy_score": max_score,
"success": False,
"error": error,
"stdout": "",
"stderr": "",
"returncode": -1,
"start_time": _fail_start_time,
"end_time": _now,
"duration_sec": duration,
"accuracy_score": 0,
"real_accuracy_score": 0,
"score": 0,
"tps": 0.0,
"tps_score": 0,
"input_tokens": 0,
"output_tokens": 0,
"cache_read_tokens": 0,
"cache_write_tokens": 0,
"total_tokens": 0,
}
if not question:
return _fail("empty question")
# 将题目内容写入 ../tmp/prompt.md,文件已存在则覆盖
prompt_file_path = get_temp_file("prompt.md")
with open(prompt_file_path, "w", encoding="utf-8") as _pf:
_pf.write(question)
# 使用
session_id = f'{DEFAULT_SESSION_PREFIX}{task_id}'
base = resolve_openclaw_cmd()
cmd = base + [
"agent",
# "--agent", DEFAULT_AGENT_ID, 加上的话 session名称为 uuid
"--session-id", session_id,
"--message", f'执行文件中的指令:{prompt_file_path}',
"--timeout", str(timeout_sec),
]
time_before = time.perf_counter()
start_time = int(time.time() * 1000) # ms 时间戳
# subprocess 侧留 30s 宽限期,让 openclaw agent 在内部 timeout 触发后有时间优雅退出
# 避免 Python 侧强杀导致 transcript 未写完、token_usage 丢失
_subprocess_timeout = timeout_sec + 30
stdout_text = ""
stderr_text = ""
returncode = -1
end_time = start_time
duration_sec = 0.0
proc = None
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
try:
stdout_text, stderr_text = proc.communicate(timeout=_subprocess_timeout)
stdout_text = (stdout_text or "").strip()
stderr_text = (stderr_text or "").strip()
returncode = proc.returncode
except subprocess.TimeoutExpired:
# 超时后强制终止子进程
logger.warning(f"任务 {task_id} 超时,强制终止子进程")
proc.kill()
try:
stdout_text, stderr_text = proc.communicate(timeout=10)
stdout_text = (stdout_text or "").strip()
stderr_text = (stderr_text or "").strip()
except subprocess.TimeoutExpired:
logger.error(f"强制终止子进程后仍无法获取输出")
stdout_text = ""
stderr_text = ""
return _fail("timeout", round(time.perf_counter() - time_before, 2))
end_time = int(time.time() * 1000) # ms 时间戳
duration_sec = round(time.perf_counter() - time_before, 2)
logger.info(f" 任务消耗的时间: {duration_sec} seconds")
# 加载对话脚本
transcript = _load_transcript('main', session_id, 0)
token_usage = _extract_usage_from_transcript(transcript)
logger.info(f" token_usage: {token_usage}")
except FileNotFoundError:
if proc is not None and proc.poll() is None:
proc.kill()
return _fail("openclaw not found", round(time.perf_counter() - time_before, 2))
except Exception as e:
if proc is not None and proc.poll() is None:
proc.kill()
return _fail(str(e), round(time.perf_counter() - time_before, 2))
total_tokens = token_usage["total_tokens"]
tps = round(total_tokens / duration_sec, 2) if total_tokens > 0 and duration_sec > 0 else 0.0
tps_score = int(tps * 0.1)
if returncode != 0:
return {
"id": task_id,
"category": category,
"category_label": category_label,
"max_accuracy_score": max_score,
"success": False,
"error": f"exit code {returncode}",
"stdout": stdout_text,
"stderr": stderr_text,
"returncode": returncode,
"start_time": start_time,
"end_time": end_time,
"duration_sec": duration_sec,
"accuracy_score": 0,
"real_accuracy_score": 0,
"score": 0,
"tps": tps,
"tps_score": 0,
"input_tokens": token_usage["input_tokens"],
"output_tokens": token_usage["output_tokens"],
"cache_read_tokens": token_usage["cache_read_tokens"],
"cache_write_tokens": token_usage["cache_write_tokens"],
"total_tokens": total_tokens,
}
# 验证结果
logger.info(f" 开始验证题目结果 {task_id} ...")
accuracy_score = int(_verify_cli_task(task, stdout_text))
if accuracy_score <= 0:
logger.warning(f" 题目结果验证失败: {task_id}, accuracy_score={accuracy_score}")
return {
"id": task_id,
"category": category,
"category_label": category_label,
"max_accuracy_score": max_score,
"success": False,
"error": f"验证失败, score={accuracy_score}",
"stdout": stdout_text,
"stderr": stderr_text,
"returncode": returncode,
"start_time": start_time,
"end_time": end_time,
"duration_sec": duration_sec,
"accuracy_score": 0,
"real_accuracy_score": 0,
"score": 0,
"tps": tps,
"tps_score": 0,
"input_tokens": token_usage["input_tokens"],
"output_tokens": token_usage["output_tokens"],
"cache_read_tokens": token_usage["cache_read_tokens"],
"cache_write_tokens": token_usage["cache_write_tokens"],
"total_tokens": total_tokens,
}
real_accuracy_score = int(accuracy_score * (1000 / duration_sec) * 10)
total_score = real_accuracy_score + tps_score
logger.info(f" tps: {tps} tokens/s, tps_score: {tps_score}, accuracy_score: {accuracy_score}, total_score: {total_score}")
return {
"id": task_id,
"category": category,
"category_label": category_label,
"max_accuracy_score": max_score,
"success": True,
"stdout": stdout_text,
"stderr": stderr_text,
"returncode": returncode,
"start_time": start_time,
"end_time": end_time,
"duration_sec": duration_sec,
"accuracy_score": accuracy_score,
"real_accuracy_score": real_accuracy_score,
"score": total_score,
"tps": tps,
"tps_score": tps_score,
"input_tokens": token_usage["input_tokens"],
"output_tokens": token_usage["output_tokens"],
"cache_read_tokens": token_usage["cache_read_tokens"],
"cache_write_tokens": token_usage["cache_write_tokens"],
"total_tokens": total_tokens,
}
def get_sessions() -> list[SessionInfo]:
"""
调用 `openclaw sessions --json`,从标准输出中提取 JSON 块并解析为 SessionInfo 列表。
输出中可能混有非 JSON 的日志行(如 [plugins] 注册信息),
解析时逐行扫描,找到完整 JSON 对象后提取其中的 sessions 字段。
"""
cmd = resolve_openclaw_cmd() + ["sessions", "--json"]
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=15,
encoding="utf-8",
errors="replace",
)
except FileNotFoundError:
logger.error("get_sessions: openclaw 命令未找到")
return []
except subprocess.TimeoutExpired:
logger.error("get_sessions: 命令超时")
return []
except Exception as e:
logger.error(f"get_sessions: 执行失败: {e}")
return []
output = proc.stdout or ""
# 从混有日志行的输出中找到第一个完整 JSON 对象
# 策略:收集以 '{' 开头的连续行,尝试逐步拼接并解析
data: dict | None = None
buf: list[str] = []
brace_depth = 0
for line in output.splitlines():
if not buf:
stripped = line.lstrip()
if not stripped.startswith("{"):
continue # 跳过非 JSON 起始行
buf.append(line)
brace_depth += line.count("{") - line.count("}")
if brace_depth <= 0:
# 可能已构成完整 JSON 对象,尝试解析
try:
data = json.loads("\n".join(buf))
break
except json.JSONDecodeError:
pass
buf.clear()
brace_depth = 0
if data is None:
logger.warning("get_sessions: 未能从输出中解析到 JSON")
logger.debug(f"get_sessions stdout: {output!r}")
return []
raw_sessions = data.get("sessions")
if not isinstance(raw_sessions, list):
logger.warning("get_sessions: JSON 中缺少 sessions 字段")
return []
return [SessionInfo.from_dict(s) for s in raw_sessions if isinstance(s, dict)]
def session_exists(session_id: str) -> bool:
"""判断指定 session_id 的 session 是否存在。入参须为 UUID 格式,否则直接返回 False。"""
try:
uuid.UUID(session_id)
except (ValueError, AttributeError):
return False
sessions = get_sessions()
return any(s.session_id == session_id for s in sessions)
def get_latest_session() -> SessionInfo | None:
"""返回 updatedAt 最大(最近活跃)的 session,若无则返回 None。"""
sessions = get_sessions()
if not sessions:
return None
return max(sessions, key=lambda s: s.updated_at)
def get_latest_session_usage() -> UsageInfo:
"""返回最近活跃 session 的 token 使用信息(input / output / total_tokens)。
若无可用 session 则返回空 UsageInfo。
"""
session = get_latest_session()
if session is None:
return UsageInfo()
return UsageInfo(
input=session.input_tokens or 0,
output=session.output_tokens or 0,
totalTokens=session.total_tokens or 0,
)
def _get_agent_workspace(agent_id: str) -> Path | None:
"""Get the workspace path for an agent from OpenClaw config."""
try:
list_result = subprocess.run(
["openclaw", "agents", "list"],
capture_output=True,
text=True,
check=False,
)
if list_result.returncode != 0:
return None
# Parse the agent list output to find workspace
# OpenClaw normalizes colons to dashes in agent names, so check both.
normalized_id = agent_id.replace(":", "-")
lines = list_result.stdout.split("\n")
found_agent = False
for line in lines:
stripped = line.strip()
if stripped.startswith(f"- {agent_id}") or stripped.startswith(f"- {normalized_id}"):
found_agent = True
elif found_agent and "Workspace:" in line:
workspace_str = line.split("Workspace:")[1].strip()
# Expand ~ if present
if workspace_str.startswith("~/"):
workspace_str = str(Path.home() / workspace_str[2:])
return Path(workspace_str)
elif found_agent and line.strip().startswith("-"):
# Found next agent, stop looking
break
return None
except Exception as exc:
logger.warning("Failed to get agent workspace: %s", exc)
return None
def ensure_agent_exists(agent_id: str, workspace_dir: Path) -> bool:
"""Ensure the OpenClaw agent exists with the correct workspace.
If the agent already exists but points to a different workspace, it is
deleted and recreated so that the new workspace takes effect.
Returns True if the agent was (re)created.
"""
workspace_dir.mkdir(parents=True, exist_ok=True)
try:
list_result = subprocess.run(
["openclaw", "agents", "list"],
capture_output=True,
text=True,
check=False,
)
except FileNotFoundError:
logger.error("openclaw CLI not found while listing agents")
return False
if list_result.returncode == 0:
# Check for exact agent ID match — avoid substring false positives
# (e.g. "bench-foo-4" matching "bench-foo-4-5" in the output).
# Output format is "- <agent_id>" or "- <agent_id> (default)" per line.
# OpenClaw normalizes colons to dashes in directory/display names, so
# also check the normalized form.
existing_agents = set()
for line in list_result.stdout.splitlines():
line = line.strip()
if line.startswith("- "):
# Extract agent name: "- bench-foo-4-5" or "- main (default)"
name_part = line[2:].split()[0] if line[2:].strip() else ""
if name_part:
existing_agents.add(name_part)
normalized_id = agent_id.replace(":", "-")
if agent_id in existing_agents or normalized_id in existing_agents:
# Agent exists — check if workspace matches
current_workspace = _get_agent_workspace(agent_id)
if (
current_workspace is not None
and current_workspace.resolve() == workspace_dir.resolve()
):
logger.info("Agent %s already exists with correct workspace", agent_id)
return False
# Workspace is stale or unknown — delete and recreate
delete_name = normalized_id if normalized_id in existing_agents else agent_id
logger.info(
"Agent %s exists with stale workspace (%s != %s), recreating",
agent_id,
current_workspace,
workspace_dir,
)
subprocess.run(
["openclaw", "agents", "delete", delete_name, "--force"],
capture_output=True,
text=True,
check=False,
)
logger.info("Creating OpenClaw agent %s", agent_id)
try:
create_result = subprocess.run(
[
"openclaw",
"agents",
"add",
agent_id,
"--workspace",
str(workspace_dir),
"--non-interactive",
],
capture_output=True,
text=True,
check=False,
)
except FileNotFoundError:
logger.error("openclaw CLI not found while creating agent")
return False
if create_result.returncode != 0:
logger.warning(
"Agent creation returned %s: %s", create_result.returncode, create_result.stderr
)
return True
def cleanup_agent_sessions(agent_id: str) -> None:
"""Remove stored session transcripts for an agent to avoid unbounded growth."""
agent_dir = _get_agent_store_dir(agent_id)
sessions_dir = agent_dir / "sessions"
if not sessions_dir.exists():
return
removed = 0
for pattern in ("*.jsonl", "*.jsonl.lock"):
for path in sessions_dir.glob(pattern):
try:
path.unlink()
removed += 1
except OSError as exc:
logger.warning("Failed to remove session file %s: %s", path, exc)
sessions_store = sessions_dir / "sessions.json"
if sessions_store.exists():
try:
sessions_store.unlink()
except OSError as exc:
logger.warning("Failed to remove session store %s: %s", sessions_store, exc)
if removed:
logger.info("Removed %s old OpenClaw session transcripts for %s", removed, agent_id)
def _get_agent_store_dir(agent_id: str) -> Path:
base_dir = Path.home() / ".openclaw" / "agents"
direct_dir = base_dir / agent_id
if direct_dir.exists():
return direct_dir
normalized_dir = base_dir / agent_id.replace(":", "-")
if normalized_dir.exists():
return normalized_dir
return direct_dir
def cleanup_agent_sessions_with_prefix(agent_id: str, prefix_pattern: str) -> None:
"""Remove stored session transcripts for an agent to avoid unbounded growth."""
agent_dir = _get_agent_store_dir(agent_id)
sessions_dir = agent_dir / "sessions"
if not sessions_dir.exists():
return
removed = 0
for pattern in (f"{prefix_pattern}.jsonl", f"{prefix_pattern}.jsonl.lock"):
for path in sessions_dir.glob(pattern):
try:
path.unlink()
removed += 1
except OSError as exc:
logger.warning("清理会话文件失败 %s: %s", path, exc)
if removed:
logger.info("已清理 %s 个历史会话脚本文件 Agent: %s", removed, agent_id)
def _resolve_session_uuid(agent_id: str, session_id: str) -> str | None:
"""从 sessions.json 中根据 session key 解析实际的 UUID sessionId。
session key 格式:agent:{agent_id}:explicit:{session_id}
匹配成功后返回对应的 sessionId(UUID 字符串),失败返回 None。
"""
agent_dir = _get_agent_store_dir(agent_id)
sessions_json_path = agent_dir / "sessions" / "sessions.json"
if not sessions_json_path.exists():
logger.warning(f"sessions.json 不存在: {sessions_json_path}")
return None
session_key = f"agent:{agent_id}:explicit:{session_id}".lower()
try:
data = json.loads(sessions_json_path.read_text(encoding="utf-8"))
entry = data.get(session_key)
if entry is None:
logger.warning(f"sessions.json 中未找到 session key: {session_key}")
return None
uuid_str = entry.get("sessionId")
if not uuid_str:
logger.warning(f"sessions.json 中 session key {session_key} 无 sessionId 字段")
return None
logger.info(f"已从 sessions.json 解析 sessionId: {uuid_str}")
return uuid_str
except Exception as e:
logger.warning(f"解析 sessions.json 失败: {e}")
return None
def _load_transcript(agent_id: str, session_id: str, started_at: float) -> List[Dict[str, Any]]:
agent_dir = _get_agent_store_dir(agent_id)
# transcript 文件名已由 <session_id>.jsonl 变更为 <UUID>.jsonl,
# 需先从 sessions.json 解析实际的 UUID sessionId
resolved_id = _resolve_session_uuid(agent_id, session_id)
transcript_path = agent_dir / "sessions" / f"{resolved_id if resolved_id else session_id}.jsonl"
logger.info(f"transcript 路径:{transcript_path}")
for attempt in range(6):
if transcript_path.exists():
logger.info(f"transcript 已找到!attempt: {attempt}")
break
time.sleep(1.0)
if not transcript_path.exists():
logger.info(f"transcript 文件未找到!")
return []
transcript: List[Dict[str, Any]] = []
for line in transcript_path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
try:
transcript.append(json.loads(line))
except json.JSONDecodeError as exc:
logger.warning("解析会话脚本文件失败: %s", exc)
transcript.append({"raw": line, "parse_error": str(exc)})
return transcript
def _extract_usage_from_transcript(transcript: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Sum token usage and cost from all assistant messages in transcript."""
totals = {
"input_tokens": 0,
"output_tokens": 0,
"cache_read_tokens": 0,
"cache_write_tokens": 0,
"total_tokens": 0,
"cost_usd": 0.0,
"request_count": 0,
}
for entry in transcript:
if entry.get("type") != "message":
continue
msg = entry.get("message", {})
if msg.get("role") != "assistant":
continue
totals["request_count"] += 1
usage = msg.get("usage", {})
totals["input_tokens"] += usage.get("input", 0)
totals["output_tokens"] += usage.get("output", 0)
totals["cache_read_tokens"] += usage.get("cacheRead", 0)
totals["cache_write_tokens"] += usage.get("cacheWrite", 0)
totals["total_tokens"] += usage.get("totalTokens", 0)
cost = usage.get("cost", {})
totals["cost_usd"] += cost.get("total", 0.0)
return totals
FILE:scripts/config.py
"""
BenchClaw 全局配置常量。
修改此文件即可调整运行参数,无需改动业务代码。
"""
# ---------- APP ----------
CLIENT_VERSION = '1.1.0'
# ---------- 加解密 (RSA + AES 混合) ----------
# 服务端 RSA 公钥 PEM(与 server 配置一致;可通过环境变量覆盖,见 crypto 模块),用于验证来自服务器的任务签名,确保题目未被篡改。
BENCHCLAW_RSA_PUBLIC_KEY_PEM = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArup/oFdhbiac8TtQC297
R6mzP59EToM2OJfnf7ZrbHYSAql0CE03Gv9GpHFhByOVRgNTcux+SQT3W5GohBkF
+emY/ntFd7QGnYIqa+1ME7yiZlzJhQ+ddP9YYpfZn6ixG6SsTne3vpWbiZRAv45A
BswtsJYpi6VRk6dusuo8VzjLL/IA96ua0RW+ik/NaPdpSnsw0MR/xRkJ7nP2k9LJ
L354Q+mroFh8dOkqiZjygqSJOPkyDH3SQMqgmJIMnvE+rqg72Ieb1UcnaESDzlMT
P+GOABlkd9K1M0OSvvs0lbu+8gHtYXllyw98l0SnkLUjZR2gmsYQD4Z5QXdvwNuu
YQIDAQAB
-----END PUBLIC KEY-----"""
# ---------- API ----------
BENCHCLAW_API_HOST = "benchclawapi.antutu.com" # API 域名,更换时只需修改这里
DEFAULT_API_URL = f"https://{BENCHCLAW_API_HOST}/api/v1/tests/request"
DEFAULT_SUBMIT_API_URL = f"https://{BENCHCLAW_API_HOST}/api/v1/tests/submit"
# ---------- Gateway WebSocket ----------
DEFAULT_WS_URL = "ws://127.0.0.1:18789"
PROTOCOL_VERSION = 3
# ---------- 会话 & 超时 ----------
DEFAULT_AGENT_ID = "main"
DEFAULT_TIMEOUT_SEC = 300 # 单题最长等待秒数
DEFAULT_SESSION_PREFIX = "benchclaw_session_"
USE_LATEST_SESSION = True
# ---------- 上传数据截断配置 ----------
# stdout 内容截断长度(字符数),防止上传数据过大
UPLOAD_STDOUT_TRUNCATE_LENGTH = 2000
# stderr 内容截断长度(字符数)
UPLOAD_STDERR_TRUNCATE_LENGTH = 500
FILE:scripts/crypto.py
"""
BenchClaw 加解密模块 (RSA + AES 混合)
请求:随机 AES-256 密钥 K → gpv;RSA 公钥加密 K → key。题目包响应为明文 JSON,无需解密 data。
目的:用于验证来自服务器的任务签名,确保题目未被篡改。
依赖:cryptography >= 42.0
"""
from __future__ import annotations
import base64
import json
import os
from typing import Any, Tuple
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from config import BENCHCLAW_RSA_PUBLIC_KEY_PEM as _cfg_pub
_IV_LEN = 12
_TAG_LEN = 16
def _public_pem() -> str:
return (os.environ.get("BENCHCLAW_RSA_PUBLIC_KEY_PEM") or _cfg_pub or "").strip()
def _load_public_key():
pem = _public_pem()
if not pem:
raise RuntimeError("BENCHCLAW_RSA_PUBLIC_KEY_PEM 未配置")
return serialization.load_pem_public_key(pem.encode("utf-8"), backend=default_backend())
def rsa_encrypt_aes_key(aes_key: bytes) -> str:
pub = _load_public_key()
encrypted = pub.encrypt(
aes_key,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
return base64.b64encode(encrypted).decode("ascii")
def aes_gcm_encrypt_json(data: Any, aes_key: bytes) -> str:
if len(aes_key) != 32:
raise RuntimeError(f"期望 32 字节 AES 密钥,实际 {len(aes_key)}")
iv = os.urandom(_IV_LEN)
if isinstance(data, bytes):
plaintext = data
elif isinstance(data, str):
plaintext = data.encode("utf-8")
else:
plaintext = json.dumps(data, ensure_ascii=False).encode("utf-8")
ct_tag = AESGCM(aes_key).encrypt(iv, plaintext, None)
return base64.b64encode(iv + ct_tag).decode("ascii")
def hybrid_encrypt_json(data: Any) -> Tuple[str, str, bytes]:
"""
混合加密可 JSON 序列化的对象。
Returns
-------
key_b64, gpv_b64, aes_key_bytes
aes_key_bytes 用于解密服务端返回的 data。
"""
aes_key = os.urandom(32)
gpv = aes_gcm_encrypt_json(data, aes_key)
key_b64 = rsa_encrypt_aes_key(aes_key)
return key_b64, gpv, aes_key
def aes_gcm_decrypt(gpv: str, aes_key: bytes) -> bytes:
blob = base64.b64decode(gpv)
if len(blob) < _IV_LEN + _TAG_LEN:
raise ValueError("密文太短")
iv, ct_tag = blob[:_IV_LEN], blob[_IV_LEN:]
return AESGCM(aes_key).decrypt(iv, ct_tag, None)
def client_decrypt(gpv: str, aes_key: bytes) -> Any:
"""使用会话 AES 密钥解密服务端返回的 data。"""
plain = aes_gcm_decrypt(gpv, aes_key)
text = plain.decode("utf-8")
try:
return json.loads(text)
except json.JSONDecodeError:
return text
# 兼容旧脚本名:整包混合加密
def client_encrypt(data: Any) -> str:
_, gpv, _ = hybrid_encrypt_json(data)
return gpv
if __name__ == "__main__":
import argparse as _ap
parser = _ap.ArgumentParser(description="BenchClaw 加解密 (RSA+AES)")
sub = parser.add_subparsers(dest="cmd", required=True)
sub.add_parser("check", help="混合加密往返自检(需服务端私钥才能完整测,此处仅测加密结构)")
args = parser.parse_args()
if args.cmd == "check":
k, g, aes = hybrid_encrypt_json({"t": 1})
assert len(aes) == 32
print("hybrid encrypt ok, key len", len(k), "gpv len", len(g))
FILE:scripts/main.py
"""
OpenClaw 评测客户端:从服务端 API 拉取题目,按 CLI 或 WebSocket 方式执行并收集结果。
用法:
python main.py --limit 5
参数:
--limit N 最多执行的题目数量,超过后不再执行
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import os
import subprocess
import sys
import urllib.error
import urllib.request
from typing import Any
from pathlib import Path
from openclawbot import get_openclaw_bot
from agent_cli import run_task_cli, get_latest_session, cleanup_agent_sessions_with_prefix
from utils import get_bench_session_id, get_temp_file, clean_temp_files, clean_benchclaw_workspace, HardwareMonitor, get_system_info
from report import generate_reports_from_dict
from server import fetch_questions, upload_results_from_dict, flush_pending_uploads
from config import (
DEFAULT_SUBMIT_API_URL,
DEFAULT_AGENT_ID,
DEFAULT_TIMEOUT_SEC,
DEFAULT_SESSION_PREFIX,
CLIENT_VERSION,
USE_LATEST_SESSION,
)
from session import get_openclaw_session_info, OpenClawSessionInfo, ran_under_openclaw_exec, cleanup_agent_sessions
def setup_logging() -> logging.Logger:
"""配置日志记录,默认输出到文件和控制台。"""
logger = logging.getLogger("benchclaw")
logger.setLevel(logging.DEBUG)
# 避免重复添加 handler
if logger.handlers:
return logger
# 文件 handler - 默认输出到 ../data/benchclaw.log
log_file = get_temp_file("benchclaw.log")
file_handler = logging.FileHandler(log_file, mode="w", encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# 2. 控制台 handler - 实时输出到 stdout
class FlushStreamHandler(logging.StreamHandler):
"""自定义 handler,每条日志后立即刷新"""
def emit(self, record):
super().emit(record)
self.flush() # 立即刷新到 stdout
# 控制台 handler
console_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
#console_handler = logging.StreamHandler(sys.stdout)
console_handler = FlushStreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
return logger
# 全局 logger 实例
logger = setup_logging()
def _aggregate_results(results: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""
统计各分类得分、题数、通过数及得分率。
返回 stats 字典,包含 category_stats 及汇总数据。
"""
stats: dict[str, dict[str, Any]] = {}
category_stats: dict[str, dict[str, Any]] = {}
for r in results:
cat = r.get("category") or "unknown"
score = r.get("score", 0)
accuracy_score = r.get("accuracy_score", 0)
real_accuracy_score = r.get("real_accuracy_score", 0)
max_accuracy_score = r.get("max_accuracy_score", 0)
success = r.get("success", False)
tps_score = r.get("tps_score", 0)
category_label = r.get("category_label") or cat
if cat not in category_stats:
category_stats[cat] = {
"count": 0,
"succeeded": 0,
"score": 0,
"accuracy_score": 0,
"real_accuracy_score": 0,
"max_accuracy_score": 0,
"tps_score": 0,
"category_label": category_label,
}
category_stats[cat]["count"] += 1
category_stats[cat]["score"] += score
category_stats[cat]["tps_score"] += tps_score
category_stats[cat]["accuracy_score"] += accuracy_score
category_stats[cat]["real_accuracy_score"] += real_accuracy_score
category_stats[cat]["max_accuracy_score"] += max_accuracy_score
if success:
category_stats[cat]["succeeded"] += 1
for cat, cat_stats in category_stats.items():
if cat_stats["max_accuracy_score"] > 0:
cat_stats["accuracy_rate"] = round(cat_stats["accuracy_score"] / cat_stats["max_accuracy_score"] * 100, 2)
else:
cat_stats["accuracy_rate"] = 0.0
stats["category_stats"] = category_stats
stats["score"] = sum(r.get("score", 0) for r in results)
stats["accuracy_score"] = sum(r.get("accuracy_score", 0) for r in results)
stats["real_accuracy_score"] = sum(r.get("real_accuracy_score", 0) for r in results)
stats["max_accuracy_score"] = sum(r.get("max_accuracy_score", 0) for r in results)
stats["accuracy_rate"] = round(stats["accuracy_score"] / stats["max_accuracy_score"] * 100, 2) if stats["max_accuracy_score"] > 0 else 0.0
stats["n_question_count"] = len(results)
stats["n_success"] = sum(1 for r in results if r.get('success'))
stats["tps_score"] = sum(r.get("tps_score", 0) for r in results)
stats["avg_tps"] = round(
sum(r.get("tps", 0) for r in results if r.get("tps", 0) > 0)
/ max(1, sum(1 for r in results if r.get("tps", 0) > 0)),
2,
)
return stats
def print_brief_stats(stats: dict[str, dict[str, Any]]):
lines = [
"评测结果:",
f"总分: {stats['score']}",
f"准确度分: {stats['real_accuracy_score']}",
f"速度分: {stats.get('tps_score', 0)}",
f"准确率: {stats['accuracy_rate']}%",
f"题目总数: {stats['n_question_count']}",
f"成功数: {stats['n_success']}",
f"平均 TPS: {stats.get('avg_tps', 0)} tokens/s",
"",
"题型分数:",
]
category_stats = stats["category_stats"]
for cat, cat_stats in sorted(category_stats.items()):
label = cat_stats.get("category_label") or cat
lines.append(
f"分类:{label}({cat})"
f" 题目数量:{cat_stats['count']}"
f" 得分:{cat_stats['score']}"
f" 准确度分:{cat_stats.get('real_accuracy_score', 0)}"
f" 速度分:{cat_stats.get('tps_score', 0)}"
f" 准确率:{cat_stats['accuracy_rate']}%"
)
print("\n" + "\n".join(lines) + "\n")
def _upload_results(
summary: dict[str, Any],
bench_session_id: str,
hash: str
) -> dict[str, Any]:
"""将评测结果上传到服务端,返回排行榜数据(上传失败时返回空 dict)。"""
if not summary.get("results"):
logger.warning("没有结果数据,跳过上传")
return {}
try:
ok, msg, leaderboard = upload_results_from_dict(summary, bench_session_id, hash, DEFAULT_SUBMIT_API_URL)
if ok:
logger.info(f"上传成功: {msg}")
category_stats = summary.get("stats", {}).get("category_stats")
_log_leaderboard(leaderboard, category_stats)
else:
logger.warning(f"上传失败: {msg}")
return leaderboard if ok else {}
except Exception as e:
logger.warning(f"上传异常: {e}")
return {}
def _log_leaderboard(leaderboard: dict[str, Any], category_stats: dict[str, Any] | None = None) -> None:
"""将排行榜数据输出到日志。"""
if not leaderboard:
return
percentiles = leaderboard.get("percentiles") or {}
total_pct = percentiles.get("total")
sample_size = leaderboard.get("sample_size")
leaderboard_url = leaderboard.get("leaderboard_url", "")
if total_pct is not None:
logger.info(f"🏆 太棒了,您的分数超越了全国 {total_pct}% 的用户!")
# 用服务端返回的 category 顺序(s1~s5)对应真实分类名称
# CATEGORY_ORDER 与 server.py 中定义一致
from server import CATEGORY_ORDER
cat_lines = []
for idx, cat_key in enumerate(CATEGORY_ORDER, start=1):
pct = percentiles.get(f"s{idx}")
if pct is None:
continue
# 优先从 category_stats 取 category_label,否则 fallback 到 cat_key
label = cat_key
if category_stats and cat_key in category_stats:
label = category_stats[cat_key].get("category_label") or cat_key
cat_lines.append(f" {label}({cat_key}): 超越 {pct}%")
if cat_lines:
logger.info("分类排名:\n" + "\n".join(cat_lines))
if sample_size:
logger.info(f"参与评测用户数:{sample_size}")
if leaderboard_url:
logger.info(f"完整排行榜:{leaderboard_url}")
def _generate_reports(summary: dict[str, Any]) -> None:
"""生成简要报表和详细报表,结果写入 ../data/ 目录。"""
if not summary.get("results"):
logger.warning("没有结果数据,跳过报表生成")
return
try:
summary_path, detail_path = generate_reports_from_dict(summary)
logger.info(f"简要报表:{summary_path}")
logger.info(f"详细报表:{detail_path}")
except Exception as e:
logger.warning(f"报表生成失败: {e}")
def _load_caller_info() -> dict:
"""读取 caller_info.txt,获取触发评测的渠道和目标用户信息。"""
caller_file = get_temp_file("caller_info.txt")
if not os.path.exists(caller_file):
return {}
caller = {}
try:
with open(caller_file) as f:
for line in f:
if "=" in line:
k, v = line.strip().split("=", 1)
caller[k.strip()] = v.strip()
except Exception as e:
logger.warning(f"读取 caller_info.txt 失败: {e}")
return caller
def _safe_print(text: str) -> None:
"""Windows GBK 兼容的 print,自动替换无法编码的字符(如 emoji)。"""
try:
print(text, flush=True)
except (UnicodeEncodeError, LookupError):
# Windows GBK 等窄字符集编码,替换无法编码的字符为 '?'
safe = text.encode(sys.stdout.encoding or 'utf-8', errors='replace').decode(sys.stdout.encoding or 'utf-8', errors='replace')
print(safe, flush=True)
def _send_notification(message: str, caller: dict) -> None:
"""向触发评测的用户发送通知。
- 有 channel + target(飞书/telegram 等):通过 openclaw message send 发消息
- 无 channel + target(TUI 或未配置):fallback 到 stdout,TUI 用户直接可见
"""
channel = caller.get("channel")
target = caller.get("target")
# webchat 是 OpenClaw 内部渠道(TUI/WebChat),不支持 CLI 发送,直接走 stdout fallback
UNSUPPORTED_CHANNELS = {"webchat", ""}
if not channel or not target or channel in UNSUPPORTED_CHANNELS:
# TUI / 无渠道 fallback:输出到 stdout
_safe_print(f"\n{'='*50}\n[NOTIFY] {message}\n{'='*50}")
return
try:
from agent_cli import resolve_openclaw_cmd
base = resolve_openclaw_cmd()
cmd = base + ["message", "send", "--channel", channel, "--target", target, "--message", message]
max_retries = 3
for attempt in range(1, max_retries + 1):
try:
result = subprocess.run(cmd, timeout=30, capture_output=True, text=True)
if result.returncode == 0:
logger.info(f"已发送通知到 {channel}:{target}")
return
else:
stderr = result.stderr.strip() if result.stderr else "(无错误输出)"
logger.warning(f"通知发送失败(第{attempt}次)returncode={result.returncode} stderr={stderr}")
except subprocess.TimeoutExpired:
logger.warning(f"通知发送超时(第{attempt}次,30s)")
except Exception as e:
logger.warning(f"通知发送异常(第{attempt}次): {e}")
if attempt < max_retries:
import time as _t; _t.sleep(2)
logger.error(f"通知发送失败,已重试{max_retries}次,fallback 到 stdout: {channel}:{target}")
_safe_print(f"\n{'='*50}\n[NOTIFY] {message}\n{'='*50}")
except Exception as e:
logger.error(f"通知模块异常,fallback 到 stdout: {e}")
_safe_print(f"\n{'='*50}\n[NOTIFY] {message}\n{'='*50}")
def main() -> int:
# 运行前删除 bench_claw 工作区文件夹
clean_benchclaw_workspace()
clean_temp_files()
cleanup_agent_sessions_with_prefix(DEFAULT_AGENT_ID, f'{DEFAULT_SESSION_PREFIX}*')
cleanup_agent_sessions(DEFAULT_AGENT_ID, DEFAULT_SESSION_PREFIX)
session_info: OpenClawSessionInfo = get_openclaw_session_info()
logger.info(f"openclaw SessionId: {session_info.session_id}")
logger.info(f"openclaw SessionKey: {session_info.session_key}")
logger.info(f"openclaw Channel: {session_info.channel}")
logger.info(f"openclaw Target: {session_info.target}")
bot = get_openclaw_bot()
logger.info(f"Openclaw版本: {bot.version}")
logger.info(f"Openclaw主模型: {bot.primary_model}")
logger.info(f"openclaw root: {bot.openclaw_root}")
bench_session_id = get_bench_session_id()
logger.info(f"Bench 会话 ID: {bench_session_id}")
# 补报上次因网络失败缓存的数据
try:
flushed = flush_pending_uploads()
if flushed:
logger.info(f"补报成功 {len(flushed)} 条历史缓存数据")
except Exception as e:
logger.warning(f"补报缓存数据失败: {e}")
# 下载题目
try:
fetch_result = fetch_questions(bench_session_id, bot.primary_model, openclaw_root=str(bot.openclaw_root))
questions = fetch_result["questions"]
api_session_id = fetch_result["session_id"]
api_hash = fetch_result["hash"]
model_cost = fetch_result.get("model_cost")
logger.info(f"下载题目成功: 共 {len(questions)} 道题目")
if model_cost:
logger.info(f"模型计费信息: {model_cost}")
except Exception as e:
notify_msg = ""
if isinstance(e, urllib.error.HTTPError) and e.code == 429:
notify_msg = "⚠️ 今日评测次数已达上限(10次/24小时),请明天再试。"
logger.error(notify_msg)
else:
notify_msg = f"运行benchclaw评测失败:加载题目失败,错误信息:{e}"
logger.error(notify_msg)
caller = _load_caller_info()
if USE_LATEST_SESSION:
caller["channel"] = session_info.channel
caller["target"] = session_info.target
_send_notification(notify_msg, caller)
return 1
# DEBUG
# questions = questions[0:1]
# 执行并收集结果
caller = _load_caller_info()
# 如果使用最新会话作为消息推送渠道,则更新 caller 中的渠道和目标
if USE_LATEST_SESSION:
caller["channel"] = session_info.channel
caller["target"] = session_info.target
hw_monitor = HardwareMonitor()
hw_monitor.start()
questions_results = []
category_buffer: dict[str, list] = {} # 按分类缓冲,用于阶段小结
for i, task in enumerate(questions):
cat = task.get("category", "unknown")
cat_label = task.get("category_label", cat)
logger.info(f"正在测试 {task.get('id')} 类别:{cat_label} ...")
# _send_notification(f"正在进行{cat_label}{task.get('id')}, 当前进度{i+1}/{len(questions)},请耐心等待...", caller)
result = run_task_cli(task, timeout_sec=DEFAULT_TIMEOUT_SEC)
questions_results.append(result)
status = "ok" if result.get("success") else "failed"
logger.info(f" -> {status} (returncode={result.get('returncode')}, error={result.get('error')}, {result.get('duration_sec', 0)}s)")
# A3: 超时提示
if result.get("error") == "timeout":
_send_notification(f"⚠️ {task.get('id')} 超时(>{DEFAULT_TIMEOUT_SEC}s),已自动跳过,继续下一题", caller)
# 按分类缓冲
if cat not in category_buffer:
category_buffer[cat] = []
category_buffer[cat].append(result)
# A2: 分类完成时发阶段小结(每5题一组)
if len(category_buffer[cat]) == 5:
pass_count = sum(1 for r in category_buffer[cat] if r.get("success"))
cat_score = sum(r.get("score", 0) for r in category_buffer[cat])
# 计算已完成分类数和进度百分比
completed_cats = len(category_buffer)
total_cats = 5
progress_pct = round(completed_cats / total_cats * 100)
# 分类序号
from server import CATEGORY_ORDER as _CAT_ORDER
cat_idx = _CAT_ORDER.index(cat) + 1 if cat in _CAT_ORDER else completed_cats
msg = (
f"📊 {cat_idx}. {cat_label}完成:{pass_count}/5 通过,阶段得分 {cat_score:,}\n"
f"已完成 {progress_pct}%({completed_cats}/{total_cats} 个分类)"
)
logger.info(msg.replace('\n', ' '))
_send_notification(msg, caller)
logger.info(f"执行完成")
hw_stats = hw_monitor.stop()
sys_info = get_system_info()
# 聚合统计
stats = _aggregate_results(questions_results)
# 向 stdout 输出简要评测信息
print_brief_stats(stats)
summary = {
"api_session_id": api_session_id,
"api_hash": api_hash,
"model_name": bot.primary_model or "",
"model_cost": model_cost,
"hardware_stats": hw_stats,
"sys_info": sys_info,
"agent_name": caller.get("agent_name", ""),
"openclaw_version": bot.version,
"results": questions_results,
"total": len(questions_results),
"succeeded": sum(1 for r in questions_results if r.get("success")),
"score": stats.get("score", 0),
"stats": stats,
}
payload = json.dumps(summary, indent=2, ensure_ascii=False)
results_file_path = get_temp_file('results.json')
with open(results_file_path, "w", encoding="utf-8") as f:
f.write(payload)
# 生成报表
_generate_reports(summary)
# 根据 show_name 决定是否携带 openclaw_name 上报
show_name = caller.get("show_name", "true").lower() not in ("false", "否", "no", "0")
if not show_name:
summary["agent_name"] = "" # 匿名上报
# 直接上报结果到服务端
leaderboard = _upload_results(summary, bench_session_id, api_hash)
# 将排行榜写入报表(上传成功后追加)
if leaderboard:
summary["leaderboard"] = leaderboard
_generate_reports(summary)
# A4: 评测完成后通知用户
score = stats.get("score", 0)
succeeded = summary["succeeded"]
total = summary["total"]
total_duration_sec = sum(r.get("duration_sec", 0) for r in questions_results)
duration_min = round(total_duration_sec / 60, 1)
failed_ids = [r.get("id", r.get("question_id", "?")) for r in questions_results if not r.get("success")]
fail_str = f"\n❌ 失败题目:{', '.join(failed_ids)}" if failed_ids else ""
# 榜单排名信息
rank_str = ""
if leaderboard:
pct = leaderboard.get("percentiles", {}).get("total")
if pct is not None:
rank_str = f"\n🏅 榜单排名:超越了 {pct}% 的用户"
completion_msg = (
f"🏆 BenchClaw 评测完成!已上传到榜单。\n\n"
f"📊 综合评分:{score:,} 分\n"
f"✅ 通过:{succeeded}/{total} 题\n"
f"⏱️ 耗时:{duration_min} 分钟{rank_str}{fail_str}\n\n"
f"发送「报告」查看详细结果。"
)
_send_notification(completion_msg, caller)
return 0 if summary["succeeded"] == summary["total"] else 1
if __name__ == "__main__":
main()
FILE:scripts/openclawbot.py
import os
from typing import Any
import json
import subprocess
import logging
import re
try:
import json5 # type: ignore[import-not-found]
except Exception:
json5 = None
logger = logging.getLogger("benchclaw.openclawbot")
_BOT_SINGLETON: "OpenclawBot | None" = None
class OpenclawBot:
"""
负责通过 openclaw CLI 读取关键信息:
- 版本号
- 默认 primary / fallbacks 模型
"""
_instance: "OpenclawBot | None" = None
def __new__(cls, config_path: str | None = None) -> "OpenclawBot":
# 进程内单例:多次构造只返回同一个对象
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, config_path: str | None = None) -> None:
if getattr(self, "_initialized", False):
return
self.config_path = config_path
self.openclaw_root = os.path.join(os.path.expanduser("~"), ".openclaw")
self.raw_config: dict[str, Any] = {}
# 元信息
self.version: str | None = None
# 模型相关
self.primary_model: str | None = None
self.fallback_models: list[str] = []
self._load()
self._initialized = True
def _load(self) -> None:
"""通过 openclaw CLI 读取并解析信息,忽略读取和解析错误。"""
logger.info("正在读取Openclaw版本和模型信息...")
self._extract_version()
self._extract_models()
def _run_openclaw_json(self, args: list[str]) -> Any:
"""
执行 openclaw CLI 并从输出中提取 JSON。
输出可能包含 banner/提示语/插件日志等非 JSON 内容,会自动跳过。
"""
# 延迟导入,避免与 agent_cli 的潜在循环依赖。
from agent_cli import resolve_openclaw_cmd
try:
proc = subprocess.run(
[*resolve_openclaw_cmd(), *args],
capture_output=True,
text=True,
check=False,
encoding="utf-8",
)
except OSError as e:
logger.exception("执行 openclaw 命令失败: args=%s, err=%s", args, e)
return None
if proc.returncode != 0:
logger.warning(
"openclaw 返回非 0 状态码: code=%s, args=%s, stderr=%s",
proc.returncode,
args,
(proc.stderr or "").strip(),
)
output = (proc.stdout or "").strip()
if not output:
logger.warning("openclaw 无输出: args=%s", args)
return None
for candidate in self._iter_json_candidates(output):
data = self._parse_json(candidate)
if data is not None:
return data
logger.warning("openclaw 输出中未找到可解析 JSON: args=%s", args)
return None
def _iter_json_candidates(self, text: str) -> list[str]:
"""从文本中提取可能的完整 JSON 片段(支持对象和数组)。"""
candidates: list[str] = []
n = len(text)
i = 0
while i < n:
ch = text[i]
if ch not in "{[":
i += 1
continue
end = self._find_json_end(text, i)
if end != -1:
candidates.append(text[i : end + 1].strip())
i = end + 1
else:
i += 1
return candidates
def _find_json_end(self, text: str, start: int) -> int:
"""给定 JSON 起始位置,寻找其匹配结束位置;失败返回 -1。"""
opening = text[start]
if opening not in "{[":
return -1
closing = "}" if opening == "{" else "]"
depth = 0
in_string = False
escaped = False
for i in range(start, len(text)):
ch = text[i]
if in_string:
if escaped:
escaped = False
continue
if ch == "\\":
escaped = True
continue
if ch == '"':
in_string = False
continue
if ch == '"':
in_string = True
continue
if ch == opening:
depth += 1
continue
if ch == closing:
depth -= 1
if depth == 0:
return i
continue
# 处理嵌套的另一类括号,如 {"a":[1,2]}
if ch in "{[":
depth += 1
elif ch in "}]":
depth -= 1
if depth == 0:
return i
return -1
def _parse_json(self, text: str) -> Any:
# 优先使用 json5(若可用),否则退回标准 json
if json5 is not None:
try:
return json5.loads(text) # type: ignore[call-arg]
except Exception:
pass
try:
return json.loads(text)
except Exception:
return None
def _extract_version(self) -> None:
"""解析版本号。
仅通过 `openclaw --version` 输出解析版本号(如 `OpenClaw 2026.4.9 (0512059)`)。
"""
# 延迟导入,避免与 agent_cli 的潜在循环依赖。
from agent_cli import resolve_openclaw_cmd
try:
proc = subprocess.run(
[*resolve_openclaw_cmd(), "--version"],
capture_output=True,
text=True,
check=False,
encoding="utf-8",
)
except OSError as e:
logger.warning("执行 openclaw --version 失败: %s", e)
return None
output = ((proc.stdout or "") + "\n" + (proc.stderr or "")).strip()
if not output:
logger.warning("openclaw --version 无输出")
return None
# 示例: "OpenClaw 2026.4.9 (0512059)"
m = re.search(r"OpenClaw\s+([^\s]+)", output, flags=re.IGNORECASE)
if m:
self.version = m.group(1).strip()
return
logger.warning("无法从 openclaw --version 输出解析版本号: %s", output)
def _extract_models(self) -> None:
model_data = self._run_openclaw_json(
["config", "get", "agents.defaults.model", "--json"]
)
if isinstance(model_data, dict):
self.raw_config["model"] = model_data
else:
logger.warning("读取 agents.defaults.model 失败或返回非对象 JSON")
return
model_cfg: dict[str, Any] | None = None
maybe_model = self.raw_config.get("model")
if isinstance(maybe_model, dict):
model_cfg = maybe_model
if model_cfg is not None:
primary = model_cfg.get("primary") or model_cfg.get("id")
if isinstance(primary, str) and primary.strip():
self.primary_model = primary.strip()
fallbacks = model_cfg.get("fallbacks")
if isinstance(fallbacks, list):
self.fallback_models = [
str(m).strip() for m in fallbacks if str(m).strip()
]
def get_openclaw_bot(config_path: str | None = None) -> OpenclawBot:
"""返回全局唯一的 OpenclawBot 实例。"""
global _BOT_SINGLETON
if _BOT_SINGLETON is None:
_BOT_SINGLETON = OpenclawBot(config_path=config_path)
return _BOT_SINGLETON
def main()->None:
bot = get_openclaw_bot()
print("version:", bot.version)
print("primary model:", bot.primary_model)
print("fallbacks:", bot.fallback_models)
if __name__ == "__main__":
main()
FILE:scripts/report.py
"""
BenchClaw 报表生成模块。
读取 ../temp/results.json,生成:
- ../data/report_summary.md 简要报表(汇总 + 分类汇总)
- ../data/report_detail.md 详细报表(汇总 + 分类汇总 + 每题详情)
用法:
python report.py
python report.py --input ../temp/results.json
"""
from __future__ import annotations
import argparse
import json
import os
import time
from typing import Any
# ---------- 路径 ----------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
DEFAULT_INPUT = os.path.join(SCRIPT_DIR, "..", "temp", "results.json")
DATA_DIR = os.path.join(SCRIPT_DIR, "..", "data")
CATEGORY_NAMES: dict[str, str] = {
"capability": "能力",
"config": "配置",
"security": "安全",
"hardware": "硬件",
"permission": "权限",
}
SCORE_BAR_WIDTH = 20 # 进度条字符宽度
# ---------- 工具函数 ----------
def _bar(score: float, max_score: float, width: int = SCORE_BAR_WIDTH) -> str:
"""生成文字进度条,如 ████████░░░░ 75%"""
if max_score <= 0:
pct = 0.0
else:
pct = min(score / max_score, 1.0)
filled = round(pct * width)
bar = "█" * filled + "░" * (width - filled)
return f"`{bar}` {pct * 100:.1f}%"
def _status_icon(r: dict[str, Any]) -> str:
if r.get("success"):
return "✅"
error = r.get("error", "")
if "timeout" in error:
return "⏱️"
if "rate" in r.get("stderr", "").lower() or "rate_limit" in error.lower():
return "🚫"
return "❌"
def _fmt_duration(sec: float) -> str:
if sec >= 60:
return f"{sec / 60:.1f} min"
return f"{sec:.1f} s"
def _fmt_tokens(n: int) -> str:
if n >= 1_000_000:
return f"{n / 1_000_000:.2f}M"
if n >= 1_000:
return f"{n / 1_000:.1f}K"
return str(n)
def _category_label(cat: str) -> str:
return CATEGORY_NAMES.get(cat, cat)
def _fmt_cost(tokens: int, cost_per_1m: float) -> str:
"""根据 token 数和单价计算预估费用,返回格式化字符串。"""
cost = tokens / 1_000_000 * cost_per_1m
return f".4f"
def _render_model_cost(data: dict[str, Any], totals: dict[str, Any]) -> list[str]:
"""生成模型计费信息区块,返回 Markdown 行列表。若无计费信息则返回空列表。"""
model_cost = data.get("model_cost")
if not isinstance(model_cost, dict):
return []
model_name = model_cost.get("model_name", "—")
input_per_1m = model_cost.get("input_cost_per_1M", 0.0)
output_per_1m = model_cost.get("output_cost_per_1M", 0.0)
currency = model_cost.get("currency", "USD")
input_tokens = totals.get("input_tokens", 0)
output_tokens = totals.get("output_tokens", 0)
cache_read_tokens = totals.get("cache_read_tokens", 0)
cache_write_tokens = totals.get("cache_write_tokens", 0)
total_tokens = totals.get("total_tokens", 0)
input_cost = input_tokens / 1_000_000 * input_per_1m
output_cost = output_tokens / 1_000_000 * output_per_1m
total_cost = input_cost + output_cost
lines = [
"## 模型计费信息",
"",
f"| 项目 | 数值 |",
f"|------|------|",
f"| 模型 | `{model_name}` |",
f"| 输入单价 | {input_per_1m} {currency} / 1M tokens |",
f"| 输出单价 | {output_per_1m} {currency} / 1M tokens |",
f"| 输入 Tokens | {_fmt_tokens(input_tokens)} → **{input_cost:.4f} {currency}** |",
f"| 输出 Tokens | {_fmt_tokens(output_tokens)} → **{output_cost:.4f} {currency}** |",
f"| Cache Read Tokens | {_fmt_tokens(cache_read_tokens)} |",
f"| Cache Write Tokens | {_fmt_tokens(cache_write_tokens)} |",
f"| 合计 Tokens | {_fmt_tokens(total_tokens)} |",
f"| 预估总费用 | **{total_cost:.4f} {currency}** |",
"",
"---",
"",
]
return lines
def _render_leaderboard(data: dict[str, Any]) -> list[str]:
"""生成排行榜区块,返回 Markdown 行列表。若无排行榜数据则返回空列表。"""
leaderboard = data.get("leaderboard")
if not isinstance(leaderboard, dict):
return []
percentiles = leaderboard.get("percentiles")
if not isinstance(percentiles, dict):
return []
total_pct = percentiles.get("total")
sample_size = leaderboard.get("sample_size")
lb_url = leaderboard.get("leaderboard_url", "")
note = leaderboard.get("note", "")
updated_at = leaderboard.get("updated_at", "")
# 从 category_stats 中获取真实的 category_label,与 server.py CATEGORY_ORDER 对应
# CATEGORY_ORDER = ["capability", "config", "security", "hardware", "permission"]
_cat_stats: dict = (data.get("stats") or {}).get("category_stats") or {}
_cat_order = ["capability", "config", "security", "hardware", "permission"]
cat_map: dict[str, str] = {}
for idx, cat_key in enumerate(_cat_order, start=1):
cat_label = _cat_stats.get(cat_key, {}).get("category_label") or cat_key
cat_map[f"s{idx}"] = f"{cat_label}({cat_key})"
lines: list[str] = [
"## 全国排名",
"",
]
if total_pct is not None:
lines.append(f"> 🏆 **太棒了,您的分数超越了全国 {total_pct}% 的用户!**")
lines.append("")
lines += [
"| 维度 | 超越比例 |",
"|------|:--------:|",
f"| **总分** | **{total_pct}%** |" if total_pct is not None else "| **总分** | — |",
]
for key, label in cat_map.items():
pct = percentiles.get(key)
lines.append(f"| {label} | {pct}% |" if pct is not None else f"| {label} | — |")
lines.append("")
meta: list[str] = []
if sample_size:
meta.append(f"参与评测用户数:**{sample_size}**")
if updated_at:
meta.append(f"数据更新时间:{updated_at[:10]}")
if note:
meta.append(f"说明:{note}")
for m in meta:
lines.append(f"- {m}")
if meta:
lines.append("")
if lb_url:
lines += [f"[查看完整排行榜]({lb_url})", ""]
lines += ["---", ""]
return lines
# ---------- 数据处理 ----------
def _load(path: str) -> dict[str, Any]:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def _compute_totals(data: dict[str, Any]) -> dict[str, Any]:
results: list[dict] = data.get("results", [])
total_score = sum(r.get("score", 0) for r in results)
total_accuracy = sum(r.get("accuracy_score", 0) for r in results)
total_max = sum(r.get("max_accuracy_score", 0) for r in results)
total_real_accuracy = sum(r.get("real_accuracy_score", 0) for r in results)
total_count = len(results)
succeeded = sum(1 for r in results if r.get("success"))
total_duration = sum(r.get("duration_sec", 0) or 0 for r in results)
total_tokens = sum(r.get("total_tokens", 0) or 0 for r in results)
input_tokens = sum(r.get("input_tokens", 0) or 0 for r in results)
output_tokens = sum(r.get("output_tokens", 0) or 0 for r in results)
cache_read_tokens = sum(r.get("cache_read_tokens", 0) or 0 for r in results)
cache_write_tokens = sum(r.get("cache_write_tokens", 0) or 0 for r in results)
score_rate = round(total_score / total_max * 100, 1) if total_max > 0 else 0.0
total_tps_score = sum(r.get("tps_score", 0) or 0 for r in results)
tps_list = [r.get("tps", 0) for r in results if (r.get("tps") or 0) > 0]
avg_tps = round(sum(tps_list) / len(tps_list), 2) if tps_list else 0.0
return {
"total_score": total_score,
"total_accuracy": total_accuracy,
"total_real_accuracy": total_real_accuracy,
"total_max": total_max,
"score_rate": score_rate,
"total_count": total_count,
"succeeded": succeeded,
"failed": total_count - succeeded,
"total_duration": total_duration,
"total_tokens": total_tokens,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cache_read_tokens": cache_read_tokens,
"cache_write_tokens": cache_write_tokens,
"total_tps_score": total_tps_score,
"avg_tps": avg_tps,
}
# ---------- 简要报表 ----------
def _render_summary(data: dict[str, Any], totals: dict[str, Any]) -> str:
session_id = data.get("openclaw_session_id") or data.get("session_id", "—")
generated = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
_stats = data.get("stats") or {}
cat_stats: dict[str, dict] = _stats.get("category_stats") or data.get("category_stats", {})
t = totals
lines: list[str] = []
# ── 标题 ──
lines += [
"# BenchClaw 评测简报",
"",
f"> 生成时间:{generated} | Session:`{session_id}`",
"",
"---",
"",
]
# ── 总体评分 ──
lines += [
"## 总体评分",
"",
f"| 指标 | 数值 |",
f"|------|------|",
f"| 总得分 | **{t['total_score']}** |",
f"| 准确度分 | **{t['total_real_accuracy']}** |",
f"| 速度分合计 | **{t['total_tps_score']}** |",
f"| 准确率 | {_bar(t['total_accuracy'], t['total_max'])} |",
f"| 题目总数 | {t['total_count']} 题 |",
f"| 通过 / 失败 | ✅ {t['succeeded']} ❌ {t['failed']} |",
f"| 总耗时 | {_fmt_duration(t['total_duration'])} |",
f"| Token 消耗 | {_fmt_tokens(t['total_tokens'])}(输入 {_fmt_tokens(t['input_tokens'])} / 输出 {_fmt_tokens(t['output_tokens'])})|",
f"| Cache Read Tokens | {_fmt_tokens(t['cache_read_tokens'])} |",
f"| Cache Write Tokens | {_fmt_tokens(t['cache_write_tokens'])} |",
f"| 平均 TPS | {t['avg_tps']} tokens/s |",
"",
"---",
"",
]
# ── 模型计费 ──
lines += _render_model_cost(data, t)
# ── 分类汇总 ──
lines += [
"## 分类汇总",
"",
"| 分类 | 题数 | 总分 | 准确度分 | 速度分 | 准确率 | 通过率 |",
"|------|:----:|-----:|---------:|-------:|--------|--------|",
]
for cat, stats in sorted(cat_stats.items()):
label = _category_label(cat)
count = stats["count"]
score = stats["score"]
real_accuracy = stats.get("real_accuracy_score", 0)
accuracy = stats.get("accuracy_score", 0)
max_s = stats["max_accuracy_score"]
tps_score = stats.get("tps_score", 0)
succ = stats["succeeded"]
pass_rate = f"{succ}/{count} ({succ/count*100:.0f}%)" if count else "—"
bar = _bar(accuracy, max_s)
lines.append(f"| {label} (`{cat}`) | {count} | {score} | {real_accuracy} | {tps_score} | {bar} | {pass_rate} |")
lines += [
"",
"---",
"",
"> *详细每题信息请查看 `report_detail.md`*",
"",
]
# ── 三维瓶颈诊断 ──
hw = data.get("hardware_stats") or {}
api_ping = data.get("api_ping_ms")
diag_lines = []
# 模型速度
avg_tps = t.get("avg_tps", 0)
if avg_tps > 5000:
tps_label = "⚡ 极快"
elif avg_tps > 2000:
tps_label = "✅ 正常"
elif avg_tps > 1000:
tps_label = "🟡 偏慢"
else:
tps_label = "🔴 过慢"
diag_lines.append(f"🤖 **模型速度**:{avg_tps} TPS {tps_label}")
# 硬件信息分析
if hw:
cpu_peak = hw.get("cpu_peak_percent", 0)
cpu_avg = hw.get("cpu_avg_percent", 0)
mem_avail = hw.get("mem_min_available_gb", 0)
mem_total = hw.get("mem_total_gb", 0)
if cpu_peak > 80:
cpu_label = "🔴 成为瓶颈"
elif cpu_peak > 60:
cpu_label = "🟡 紧张"
else:
cpu_label = "✅ 充裕"
if mem_avail < 1:
mem_label = "🔴 不足"
elif mem_avail < 2:
mem_label = "🟡 紧张"
else:
mem_label = "✅ 充裕"
diag_lines.append(f"💻 **CPU**:峰值 {cpu_peak}% {cpu_label}(平均 {cpu_avg}%)")
diag_lines.append(f"🧠 **内存**:总量 {mem_total} GB,最小可用 {mem_avail} GB {mem_label}")
if diag_lines:
lines += [
"## 三维瓶颈诊断",
"",
]
for line in diag_lines:
lines.append(line)
lines.append("")
lines += ["---", ""]
# ── 全国排名 ──
lines += _render_leaderboard(data)
return "\n".join(lines)
# ---------- 详细报表 ----------
def _render_detail(data: dict[str, Any], totals: dict[str, Any]) -> str:
session_id = data.get("openclaw_session_id") or data.get("session_id", "—")
generated = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
_stats = data.get("stats") or {}
cat_stats: dict[str, dict] = _stats.get("category_stats") or data.get("category_stats", {})
results: list[dict] = data.get("results", [])
t = totals
lines: list[str] = []
# ── 标题 ──
lines += [
"# BenchClaw 评测详细报告",
"",
f"> 生成时间:{generated} | Session:`{session_id}`",
"",
"---",
"",
]
# ── 总体评分(与简报相同) ──
lines += [
"## 总体评分",
"",
f"| 指标 | 数值 |",
f"|------|------|",
f"| 总得分 | **{t['total_score']}** |",
f"| 准确度分 | **{t['total_real_accuracy']}** |",
f"| 速度分合计 | **{t['total_tps_score']}** |",
f"| 准确率 | {_bar(t['total_accuracy'], t['total_max'])} |",
f"| 题目总数 | {t['total_count']} 题 |",
f"| 通过 / 失败 | ✅ {t['succeeded']} ❌ {t['failed']} |",
f"| 总耗时 | {_fmt_duration(t['total_duration'])} |",
f"| Token 消耗 | {_fmt_tokens(t['total_tokens'])}(输入 {_fmt_tokens(t['input_tokens'])} / 输出 {_fmt_tokens(t['output_tokens'])})|",
f"| Cache Read Tokens | {_fmt_tokens(t['cache_read_tokens'])} |",
f"| Cache Write Tokens | {_fmt_tokens(t['cache_write_tokens'])} |",
f"| 平均 TPS | {t['avg_tps']} tokens/s |",
"",
"---",
"",
]
# ── 模型计费 ──
lines += _render_model_cost(data, t)
# ── 分类汇总 ──
lines += [
"## 分类汇总",
"",
"| 分类 | 题数 | 总分 | 准确度分 | 速度分 | 准确率 | 通过率 |",
"|------|:----:|-----:|---------:|-------:|--------|--------|",
]
for cat, stats in sorted(cat_stats.items()):
label = _category_label(cat)
count = stats["count"]
score = stats["score"]
real_accuracy = stats.get("real_accuracy_score", 0)
accuracy = stats.get("accuracy_score", 0)
max_s = stats["max_accuracy_score"]
tps_score = stats.get("tps_score", 0)
succ = stats["succeeded"]
pass_rate = f"{succ}/{count} ({succ/count*100:.0f}%)" if count else "—"
bar = _bar(accuracy, max_s)
lines.append(f"| {label} (`{cat}`) | {count} | {score} | {real_accuracy} | {tps_score} | {bar} | {pass_rate} |")
lines += ["", "---", ""]
# ── 每分类题目详情 ──
lines.append("## 题目详情\n")
# 按分类分组
by_cat: dict[str, list[dict]] = {}
for r in results:
cat = r.get("category", "unknown")
by_cat.setdefault(cat, []).append(r)
for cat in sorted(by_cat.keys()):
cat_results = by_cat[cat]
label = _category_label(cat)
lines += [
f"### {label}(`{cat}`)",
"",
"| 题号 | 状态 | 总分 | 准确度分 | 速度分 | TPS | 耗时 | Tokens | 错误 |",
"|------|:----:|:----:|---------:|-------:|----:|-----:|-------:|------|",
]
for r in cat_results:
tid = r.get("id", "?")
icon = _status_icon(r)
score = r.get("score", 0)
real_accuracy = r.get("real_accuracy_score", 0)
accuracy = r.get("accuracy_score", 0)
max_s = r.get("max_accuracy_score", 0)
tps = r.get("tps", 0)
tps_score = r.get("tps_score", 0)
dur = _fmt_duration(r.get("duration_sec") or 0)
tokens = _fmt_tokens(r.get("total_tokens") or 0)
err = r.get("error", "")
if err and len(err) > 40:
err = err[:40] + "…"
lines.append(f"| `{tid}` | {icon} | {score} | {real_accuracy} | {tps_score} | {tps} | {dur} | {tokens} | {err} |")
lines.append("")
# 每题展开块
for r in cat_results:
tid = r.get("id", "?")
icon = _status_icon(r)
score = r.get("score", 0)
real_accuracy = r.get("real_accuracy_score", 0)
accuracy = r.get("accuracy_score", 0)
max_s = r.get("max_accuracy_score", 0)
tps = r.get("tps", 0)
tps_score = r.get("tps_score", 0)
dur = _fmt_duration(r.get("duration_sec") or 0)
in_tok = _fmt_tokens(r.get("input_tokens") or 0)
out_tok = _fmt_tokens(r.get("output_tokens") or 0)
tot_tok = _fmt_tokens(r.get("total_tokens") or 0)
err = r.get("error", "") or ""
stderr = (r.get("stderr") or "").strip()
output = (r.get("stdout") or "").strip()
lines += [
f"#### {icon} `{tid}` — 总分 {score}(准确度 {real_accuracy} + 速度 {tps_score})",
"",
f"| 字段 | 值 |",
f"|------|-----|",
f"| 总分 | {score} |",
f"| 准确度分 | {real_accuracy} |",
f"| 速度分 | {tps_score} |",
f"| 耗时 | {dur} |",
f"| TPS | {tps} tokens/s |",
f"| Token | 输入 {in_tok} / 输出 {out_tok} / 合计 {tot_tok} |",
]
if err:
lines.append(f"| 错误 | `{err}` |")
lines.append("")
if output:
# 截取前 500 字符避免报表过长
preview = output[:500].replace("```", "'''")
if len(output) > 500:
preview += f"\n\n*…(共 {len(output)} 字符,已截断)*"
lines += [
"<details>",
"<summary>Agent 输出(点击展开)</summary>",
"",
"```",
preview,
"```",
"",
"</details>",
"",
]
if stderr:
preview = stderr[:300].replace("```", "'''")
if len(stderr) > 300:
preview += "\n…(已截断)"
lines += [
"<details>",
"<summary>stderr(点击展开)</summary>",
"",
"```",
preview,
"```",
"",
"</details>",
"",
]
lines += ["---", ""]
# ── 全国排名 ──
lines += _render_leaderboard(data)
return "\n".join(lines)
# ---------- 主入口 ----------
def generate_reports_from_dict(data: dict[str, Any]) -> tuple[str, str]:
"""
直接接受 summary dict,生成简报和详细报告,返回 (简报路径, 详细报告路径)。
"""
totals = _compute_totals(data)
os.makedirs(DATA_DIR, exist_ok=True)
summary_path = os.path.join(DATA_DIR, "report_summary.md")
detail_path = os.path.join(DATA_DIR, "report_detail.md")
with open(summary_path, "w", encoding="utf-8") as f:
f.write(_render_summary(data, totals))
with open(detail_path, "w", encoding="utf-8") as f:
f.write(_render_detail(data, totals))
return summary_path, detail_path
def generate_reports(input_path: str) -> tuple[str, str]:
"""
读取 results.json,返回 (简报路径, 详细报告路径)。
"""
data = _load(input_path)
return generate_reports_from_dict(data)
def main() -> None:
parser = argparse.ArgumentParser(description="BenchClaw 报表生成器")
parser.add_argument(
"--input", "-i",
default=DEFAULT_INPUT,
help=f"results.json 路径(默认:{DEFAULT_INPUT})",
)
args = parser.parse_args()
input_path = os.path.abspath(args.input)
if not os.path.exists(input_path):
print(f"错误:找不到输入文件 {input_path}")
raise SystemExit(1)
summary_path, detail_path = generate_reports(input_path)
print(f"简要报表:{summary_path}")
print(f"详细报表:{detail_path}")
if __name__ == "__main__":
main()
FILE:scripts/requirements.txt
# This file was autogenerated by uv via the following command:
# uv pip compile scripts/requirements.txt --generate-hashes -o scripts/requirements.txt
cffi==2.0.0 \
--hash=sha256:00bdf7acc5f795150faa6957054fbbca2439db2f775ce831222b66f192f03beb \
--hash=sha256:07b271772c100085dd28b74fa0cd81c8fb1a3ba18b21e03d7c27f3436a10606b \
--hash=sha256:087067fa8953339c723661eda6b54bc98c5625757ea62e95eb4898ad5e776e9f \
--hash=sha256:0a1527a803f0a659de1af2e1fd700213caba79377e27e4693648c2923da066f9 \
--hash=sha256:0cf2d91ecc3fcc0625c2c530fe004f82c110405f101548512cce44322fa8ac44 \
--hash=sha256:0f6084a0ea23d05d20c3edcda20c3d006f9b6f3fefeac38f59262e10cef47ee2 \
--hash=sha256:12873ca6cb9b0f0d3a0da705d6086fe911591737a59f28b7936bdfed27c0d47c \
--hash=sha256:19f705ada2530c1167abacb171925dd886168931e0a7b78f5bffcae5c6b5be75 \
--hash=sha256:1cd13c99ce269b3ed80b417dcd591415d3372bcac067009b6e0f59c7d4015e65 \
--hash=sha256:1e3a615586f05fc4065a8b22b8152f0c1b00cdbc60596d187c2a74f9e3036e4e \
--hash=sha256:1f72fb8906754ac8a2cc3f9f5aaa298070652a0ffae577e0ea9bd480dc3c931a \
--hash=sha256:1fc9ea04857caf665289b7a75923f2c6ed559b8298a1b8c49e59f7dd95c8481e \
--hash=sha256:203a48d1fb583fc7d78a4c6655692963b860a417c0528492a6bc21f1aaefab25 \
--hash=sha256:2081580ebb843f759b9f617314a24ed5738c51d2aee65d31e02f6f7a2b97707a \
--hash=sha256:21d1152871b019407d8ac3985f6775c079416c282e431a4da6afe7aefd2bccbe \
--hash=sha256:24b6f81f1983e6df8db3adc38562c83f7d4a0c36162885ec7f7b77c7dcbec97b \
--hash=sha256:256f80b80ca3853f90c21b23ee78cd008713787b1b1e93eae9f3d6a7134abd91 \
--hash=sha256:28a3a209b96630bca57cce802da70c266eb08c6e97e5afd61a75611ee6c64592 \
--hash=sha256:2c8f814d84194c9ea681642fd164267891702542f028a15fc97d4674b6206187 \
--hash=sha256:2de9a304e27f7596cd03d16f1b7c72219bd944e99cc52b84d0145aefb07cbd3c \
--hash=sha256:38100abb9d1b1435bc4cc340bb4489635dc2f0da7456590877030c9b3d40b0c1 \
--hash=sha256:3925dd22fa2b7699ed2617149842d2e6adde22b262fcbfada50e3d195e4b3a94 \
--hash=sha256:3e17ed538242334bf70832644a32a7aae3d83b57567f9fd60a26257e992b79ba \
--hash=sha256:3e837e369566884707ddaf85fc1744b47575005c0a229de3327f8f9a20f4efeb \
--hash=sha256:3f4d46d8b35698056ec29bca21546e1551a205058ae1a181d871e278b0b28165 \
--hash=sha256:44d1b5909021139fe36001ae048dbdde8214afa20200eda0f64c068cac5d5529 \
--hash=sha256:45d5e886156860dc35862657e1494b9bae8dfa63bf56796f2fb56e1679fc0bca \
--hash=sha256:4647afc2f90d1ddd33441e5b0e85b16b12ddec4fca55f0d9671fef036ecca27c \
--hash=sha256:4671d9dd5ec934cb9a73e7ee9676f9362aba54f7f34910956b84d727b0d73fb6 \
--hash=sha256:53f77cbe57044e88bbd5ed26ac1d0514d2acf0591dd6bb02a3ae37f76811b80c \
--hash=sha256:5eda85d6d1879e692d546a078b44251cdd08dd1cfb98dfb77b670c97cee49ea0 \
--hash=sha256:5fed36fccc0612a53f1d4d9a816b50a36702c28a2aa880cb8a122b3466638743 \
--hash=sha256:61d028e90346df14fedc3d1e5441df818d095f3b87d286825dfcbd6459b7ef63 \
--hash=sha256:66f011380d0e49ed280c789fbd08ff0d40968ee7b665575489afa95c98196ab5 \
--hash=sha256:6824f87845e3396029f3820c206e459ccc91760e8fa24422f8b0c3d1731cbec5 \
--hash=sha256:6c6c373cfc5c83a975506110d17457138c8c63016b563cc9ed6e056a82f13ce4 \
--hash=sha256:6d02d6655b0e54f54c4ef0b94eb6be0607b70853c45ce98bd278dc7de718be5d \
--hash=sha256:6d50360be4546678fc1b79ffe7a66265e28667840010348dd69a314145807a1b \
--hash=sha256:730cacb21e1bdff3ce90babf007d0a0917cc3e6492f336c2f0134101e0944f93 \
--hash=sha256:737fe7d37e1a1bffe70bd5754ea763a62a066dc5913ca57e957824b72a85e205 \
--hash=sha256:74a03b9698e198d47562765773b4a8309919089150a0bb17d829ad7b44b60d27 \
--hash=sha256:7553fb2090d71822f02c629afe6042c299edf91ba1bf94951165613553984512 \
--hash=sha256:7a66c7204d8869299919db4d5069a82f1561581af12b11b3c9f48c584eb8743d \
--hash=sha256:7cc09976e8b56f8cebd752f7113ad07752461f48a58cbba644139015ac24954c \
--hash=sha256:81afed14892743bbe14dacb9e36d9e0e504cd204e0b165062c488942b9718037 \
--hash=sha256:8941aaadaf67246224cee8c3803777eed332a19d909b47e29c9842ef1e79ac26 \
--hash=sha256:89472c9762729b5ae1ad974b777416bfda4ac5642423fa93bd57a09204712322 \
--hash=sha256:8ea985900c5c95ce9db1745f7933eeef5d314f0565b27625d9a10ec9881e1bfb \
--hash=sha256:8eca2a813c1cb7ad4fb74d368c2ffbbb4789d377ee5bb8df98373c2cc0dee76c \
--hash=sha256:92b68146a71df78564e4ef48af17551a5ddd142e5190cdf2c5624d0c3ff5b2e8 \
--hash=sha256:9332088d75dc3241c702d852d4671613136d90fa6881da7d770a483fd05248b4 \
--hash=sha256:94698a9c5f91f9d138526b48fe26a199609544591f859c870d477351dc7b2414 \
--hash=sha256:9a67fc9e8eb39039280526379fb3a70023d77caec1852002b4da7e8b270c4dd9 \
--hash=sha256:9de40a7b0323d889cf8d23d1ef214f565ab154443c42737dfe52ff82cf857664 \
--hash=sha256:a05d0c237b3349096d3981b727493e22147f934b20f6f125a3eba8f994bec4a9 \
--hash=sha256:afb8db5439b81cf9c9d0c80404b60c3cc9c3add93e114dcae767f1477cb53775 \
--hash=sha256:b18a3ed7d5b3bd8d9ef7a8cb226502c6bf8308df1525e1cc676c3680e7176739 \
--hash=sha256:b1e74d11748e7e98e2f426ab176d4ed720a64412b6a15054378afdb71e0f37dc \
--hash=sha256:b21e08af67b8a103c71a250401c78d5e0893beff75e28c53c98f4de42f774062 \
--hash=sha256:b4c854ef3adc177950a8dfc81a86f5115d2abd545751a304c5bcf2c2c7283cfe \
--hash=sha256:b882b3df248017dba09d6b16defe9b5c407fe32fc7c65a9c69798e6175601be9 \
--hash=sha256:baf5215e0ab74c16e2dd324e8ec067ef59e41125d3eade2b863d294fd5035c92 \
--hash=sha256:c649e3a33450ec82378822b3dad03cc228b8f5963c0c12fc3b1e0ab940f768a5 \
--hash=sha256:c654de545946e0db659b3400168c9ad31b5d29593291482c43e3564effbcee13 \
--hash=sha256:c6638687455baf640e37344fe26d37c404db8b80d037c3d29f58fe8d1c3b194d \
--hash=sha256:c8d3b5532fc71b7a77c09192b4a5a200ea992702734a2e9279a37f2478236f26 \
--hash=sha256:cb527a79772e5ef98fb1d700678fe031e353e765d1ca2d409c92263c6d43e09f \
--hash=sha256:cf364028c016c03078a23b503f02058f1814320a56ad535686f90565636a9495 \
--hash=sha256:d48a880098c96020b02d5a1f7d9251308510ce8858940e6fa99ece33f610838b \
--hash=sha256:d68b6cef7827e8641e8ef16f4494edda8b36104d79773a334beaa1e3521430f6 \
--hash=sha256:d9b29c1f0ae438d5ee9acb31cadee00a58c46cc9c0b2f9038c6b0b3470877a8c \
--hash=sha256:d9b97165e8aed9272a6bb17c01e3cc5871a594a446ebedc996e2397a1c1ea8ef \
--hash=sha256:da68248800ad6320861f129cd9c1bf96ca849a2771a59e0344e88681905916f5 \
--hash=sha256:da902562c3e9c550df360bfa53c035b2f241fed6d9aef119048073680ace4a18 \
--hash=sha256:dbd5c7a25a7cb98f5ca55d258b103a2054f859a46ae11aaf23134f9cc0d356ad \
--hash=sha256:dd4f05f54a52fb558f1ba9f528228066954fee3ebe629fc1660d874d040ae5a3 \
--hash=sha256:de8dad4425a6ca6e4e5e297b27b5c824ecc7581910bf9aee86cb6835e6812aa7 \
--hash=sha256:e11e82b744887154b182fd3e7e8512418446501191994dbf9c9fc1f32cc8efd5 \
--hash=sha256:e6e73b9e02893c764e7e8d5bb5ce277f1a009cd5243f8228f75f842bf937c534 \
--hash=sha256:f73b96c41e3b2adedc34a7356e64c8eb96e03a3782b535e043a986276ce12a49 \
--hash=sha256:f93fd8e5c8c0a4aa1f424d6173f14a892044054871c771f8566e4008eaa359d2 \
--hash=sha256:fc33c5141b55ed366cfaad382df24fe7dcbc686de5be719b207bb248e3053dc5 \
--hash=sha256:fc7de24befaeae77ba923797c7c87834c73648a05a4bde34b3b7e5588973a453 \
--hash=sha256:fe562eb1a64e67dd297ccc4f5addea2501664954f2692b69a76449ec7913ecbf
# via cryptography
cryptography==46.0.5 \
--hash=sha256:02f547fce831f5096c9a567fd41bc12ca8f11df260959ecc7c3202555cc47a72 \
--hash=sha256:039917b0dc418bb9f6edce8a906572d69e74bd330b0b3fea4f79dab7f8ddd235 \
--hash=sha256:1abfdb89b41c3be0365328a410baa9df3ff8a9110fb75e7b52e66803ddabc9a9 \
--hash=sha256:2ae6971afd6246710480e3f15824ed3029a60fc16991db250034efd0b9fb4356 \
--hash=sha256:2b7a67c9cd56372f3249b39699f2ad479f6991e62ea15800973b956f4b73e257 \
--hash=sha256:351695ada9ea9618b3500b490ad54c739860883df6c1f555e088eaf25b1bbaad \
--hash=sha256:38946c54b16c885c72c4f59846be9743d699eee2b69b6988e0a00a01f46a61a4 \
--hash=sha256:3b4995dc971c9fb83c25aa44cf45f02ba86f71ee600d81091c2f0cbae116b06c \
--hash=sha256:3ce58ba46e1bc2aac4f7d9290223cead56743fa6ab94a5d53292ffaac6a91614 \
--hash=sha256:3ee190460e2fbe447175cda91b88b84ae8322a104fc27766ad09428754a618ed \
--hash=sha256:4108d4c09fbbf2789d0c926eb4152ae1760d5a2d97612b92d508d96c861e4d31 \
--hash=sha256:420d0e909050490d04359e7fdb5ed7e667ca5c3c402b809ae2563d7e66a92229 \
--hash=sha256:47fb8a66058b80e509c47118ef8a75d14c455e81ac369050f20ba0d23e77fee0 \
--hash=sha256:4c3341037c136030cb46e4b1e17b7418ea4cbd9dd207e4a6f3b2b24e0d4ac731 \
--hash=sha256:4d7e3d356b8cd4ea5aff04f129d5f66ebdc7b6f8eae802b93739ed520c47c79b \
--hash=sha256:4d8ae8659ab18c65ced284993c2265910f6c9e650189d4e3f68445ef82a810e4 \
--hash=sha256:4e817a8920bfbcff8940ecfd60f23d01836408242b30f1a708d93198393a80b4 \
--hash=sha256:50bfb6925eff619c9c023b967d5b77a54e04256c4281b0e21336a130cd7fc263 \
--hash=sha256:556e106ee01aa13484ce9b0239bca667be5004efb0aabbed28d353df86445595 \
--hash=sha256:582f5fcd2afa31622f317f80426a027f30dc792e9c80ffee87b993200ea115f1 \
--hash=sha256:5be7bf2fb40769e05739dd0046e7b26f9d4670badc7b032d6ce4db64dddc0678 \
--hash=sha256:60ee7e19e95104d4c03871d7d7dfb3d22ef8a9b9c6778c94e1c8fcc8365afd48 \
--hash=sha256:61aa400dce22cb001a98014f647dc21cda08f7915ceb95df0c9eaf84b4b6af76 \
--hash=sha256:68f68d13f2e1cb95163fa3b4db4bf9a159a418f5f6e7242564fc75fcae667fd0 \
--hash=sha256:7d1f30a86d2757199cb2d56e48cce14deddf1f9c95f1ef1b64ee91ea43fe2e18 \
--hash=sha256:7d731d4b107030987fd61a7f8ab512b25b53cef8f233a97379ede116f30eb67d \
--hash=sha256:803812e111e75d1aa73690d2facc295eaefd4439be1023fefc4995eaea2af90d \
--hash=sha256:80a8d7bfdf38f87ca30a5391c0c9ce4ed2926918e017c29ddf643d0ed2778ea1 \
--hash=sha256:8293f3dea7fc929ef7240796ba231413afa7b68ce38fd21da2995549f5961981 \
--hash=sha256:8456928655f856c6e1533ff59d5be76578a7157224dbd9ce6872f25055ab9ab7 \
--hash=sha256:890bcb4abd5a2d3f852196437129eb3667d62630333aacc13dfd470fad3aaa82 \
--hash=sha256:94a76daa32eb78d61339aff7952ea819b1734b46f73646a07decb40e5b3448e2 \
--hash=sha256:9f16fbdf4da055efb21c22d81b89f155f02ba420558db21288b3d0035bafd5f4 \
--hash=sha256:a3d1fae9863299076f05cb8a778c467578262fae09f9dc0ee9b12eb4268ce663 \
--hash=sha256:a3d507bb6a513ca96ba84443226af944b0f7f47dcc9a399d110cd6146481d24c \
--hash=sha256:abace499247268e3757271b2f1e244b36b06f8515cf27c4d49468fc9eb16e93d \
--hash=sha256:ba2a27ff02f48193fc4daeadf8ad2590516fa3d0adeeb34336b96f7fa64c1e3a \
--hash=sha256:bc84e875994c3b445871ea7181d424588171efec3e185dced958dad9e001950a \
--hash=sha256:bfd56bb4b37ed4f330b82402f6f435845a5f5648edf1ad497da51a8452d5d62d \
--hash=sha256:c18ff11e86df2e28854939acde2d003f7984f721eba450b56a200ad90eeb0e6b \
--hash=sha256:c3bcce8521d785d510b2aad26ae2c966092b7daa8f45dd8f44734a104dc0bc1a \
--hash=sha256:c4143987a42a2397f2fc3b4d7e3a7d313fbe684f67ff443999e803dd75a76826 \
--hash=sha256:c69fd885df7d089548a42d5ec05be26050ebcd2283d89b3d30676eb32ff87dee \
--hash=sha256:ced80795227d70549a411a4ab66e8ce307899fad2220ce5ab2f296e687eacde9 \
--hash=sha256:d66e421495fdb797610a08f43b05269e0a5ea7f5e652a89bfd5a7d3c1dee3648 \
--hash=sha256:d861ee9e76ace6cf36a6a89b959ec08e7bc2493ee39d07ffe5acb23ef46d27da \
--hash=sha256:e9251e3be159d1020c4030bd2e5f84d6a43fe54b6c19c12f51cde9542a2817b2 \
--hash=sha256:f145bba11b878005c496e93e257c1e88f154d278d2638e6450d17e0f31e558d2 \
--hash=sha256:fe346b143ff9685e40192a4960938545c699054ba11d4f9029f94751e3f71d87
# via -r scripts/requirements.txt
psutil==7.2.2 \
--hash=sha256:0746f5f8d406af344fd547f1c8daa5f5c33dbc293bb8d6a16d80b4bb88f59372 \
--hash=sha256:076a2d2f923fd4821644f5ba89f059523da90dc9014e85f8e45a5774ca5bc6f9 \
--hash=sha256:11fe5a4f613759764e79c65cf11ebdf26e33d6dd34336f8a337aa2996d71c841 \
--hash=sha256:1a571f2330c966c62aeda00dd24620425d4b0cc86881c89861fbc04549e5dc63 \
--hash=sha256:1a7b04c10f32cc88ab39cbf606e117fd74721c831c98a27dc04578deb0c16979 \
--hash=sha256:1fa4ecf83bcdf6e6c8f4449aff98eefb5d0604bf88cb883d7da3d8d2d909546a \
--hash=sha256:2edccc433cbfa046b980b0df0171cd25bcaeb3a68fe9022db0979e7aa74a826b \
--hash=sha256:7b6d09433a10592ce39b13d7be5a54fbac1d1228ed29abc880fb23df7cb694c9 \
--hash=sha256:8c233660f575a5a89e6d4cb65d9f938126312bca76d8fe087b947b3a1aaac9ee \
--hash=sha256:917e891983ca3c1887b4ef36447b1e0873e70c933afc831c6b6da078ba474312 \
--hash=sha256:ab486563df44c17f5173621c7b198955bd6b613fb87c71c161f827d3fb149a9b \
--hash=sha256:ae0aefdd8796a7737eccea863f80f81e468a1e4cf14d926bd9b6f5f2d5f90ca9 \
--hash=sha256:b0726cecd84f9474419d67252add4ac0cd9811b04d61123054b9fb6f57df6e9e \
--hash=sha256:b58fabe35e80b264a4e3bb23e6b96f9e45a3df7fb7eed419ac0e5947c61e47cc \
--hash=sha256:c7663d4e37f13e884d13994247449e9f8f574bc4655d509c3b95e9ec9e2b9dc1 \
--hash=sha256:e452c464a02e7dc7822a05d25db4cde564444a67e58539a00f929c51eddda0cf \
--hash=sha256:e78c8603dcd9a04c7364f1a3e670cea95d51ee865e4efb3556a3a63adef958ea \
--hash=sha256:eb7e81434c8d223ec4a219b5fc1c47d0417b12be7ea866e24fb5ad6e84b3d988 \
--hash=sha256:ed0cace939114f62738d808fdcecd4c869222507e266e574799e9c0faa17d486 \
--hash=sha256:eed63d3b4d62449571547b60578c5b2c4bcccc5387148db46e0c2313dad0ee00 \
--hash=sha256:fd04ef36b4a6d599bbdb225dd1d3f51e00105f6d48a28f006da7f9822f2606d8
# via -r scripts/requirements.txt
pycparser==3.0 \
--hash=sha256:600f49d217304a5902ac3c37e1281c9fe94e4d0489de643a9504c5cdfdfc6b29 \
--hash=sha256:b727414169a36b7d524c1c3e31839a521725078d7b2ff038656844266160a992
# via cffi
FILE:scripts/server.py
"""
BenchClaw 服务端 API 模块。
负责与评测服务端的所有 HTTP 交互:
- fetch_questions 从服务端拉取题目列表
- upload_results_from_dict 将 summary dict 上传到服务端
- upload_results 从文件读取后上传(兼容 CLI 用法)
"""
from __future__ import annotations
import json
import os
import re
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import Any
from config import (
CLIENT_VERSION,
DEFAULT_API_URL,
DEFAULT_SUBMIT_API_URL,
UPLOAD_STDOUT_TRUNCATE_LENGTH,
UPLOAD_STDERR_TRUNCATE_LENGTH,
)
from crypto import hybrid_encrypt_json
# 分类得分映射 → s1~s5:能力、配置、安全、硬件、权限(与官网/榜单列一致)
CATEGORY_ORDER = ["capability", "config", "security", "hardware", "permission"]
# 服务端要求固定 25 个 b/r 字段
TOTAL_QUESTIONS = 25
# ─────────────────────────────────────────────
# 工具函数
# ─────────────────────────────────────────────
_SANITIZE_RULES: list[tuple[re.Pattern[str], str]] = [
# Anthropic Claude API key(必须在 OpenAI sk- 规则之前)
(re.compile(r"sk-ant-[a-zA-Z0-9\-]{20,}"), "sk-ant-***"),
# OpenAI API key
(re.compile(r"sk-[a-zA-Z0-9]{20,}"), "sk-***"),
# Google Gemini API key
(re.compile(r"AIza[a-zA-Z0-9_\-]{35}"), "AIza***"),
# AWS Access Key ID
(re.compile(r"AKIA[A-Z0-9]{16}"), "AKIA***"),
# GitHub Personal Access Token
(re.compile(r"ghp_[a-zA-Z0-9]{36}"), "ghp_***"),
# ClaWHub token
(re.compile(r"clh_[a-zA-Z0-9]+"), "clh_***"),
# Feishu open_id
(re.compile(r"ou_[a-f0-9]{32}"), "ou_***"),
# Slack token
(re.compile(r"xox[bpsa]-[a-zA-Z0-9\-]+"), "xox-***"),
# 本地路径 /home/...
(re.compile(r"/home/[^\s\"']+"), "/home/***"),
# 本地路径 /root/...
(re.compile(r"/root/[^\s\"']+"), "/root/***"),
# 邮箱地址
(re.compile(r"\b[\w.\+\-]+@[\w.\-]+\.\w+\b"), "***@***"),
]
def _sanitize_output(text: str) -> str:
"""对 stdout/stderr 文本进行正则脱敏,替换已知的敏感信息模式。"""
for pattern, replacement in _SANITIZE_RULES:
text = pattern.sub(replacement, text)
return text
def _dump_to_temp(data: Any, filename: str) -> None:
"""将数据以 JSON 格式写入 tests 目录,便于调试查看。"""
temp_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "temp")
os.makedirs(temp_dir, exist_ok=True)
path = os.path.join(temp_dir, filename)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _iso_time(ts: float) -> str:
"""将 Unix 时间戳(秒或毫秒)转为 ISO 8601 UTC 字符串。"""
# 如果时间戳大于 1e10,认为是毫秒,转换为秒
if ts > 1e10:
ts = ts / 1000.0
dt = datetime.fromtimestamp(ts, tz=timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def _post_json(
url: str,
body: dict[str, Any],
headers: dict[str, str] | None = None,
timeout: int = 30,
encrypt: bool = False,
) -> dict[str, Any]:
"""
通用 POST JSON 请求,返回解析后的响应 dict。
Parameters
----------
encrypt : bool
True 时 RSA+AES 混合加密为 {"key","gpv"};响应 data 为明文 JSON(v2.9)。
"""
if encrypt:
key_b64, gpv, _aes = hybrid_encrypt_json(body)
payload = json.dumps({"key": key_b64, "gpv": gpv}, ensure_ascii=False).encode("utf-8")
else:
payload = json.dumps(body, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
for k, v in (headers or {}).items():
req.add_header(k, v)
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8", errors="replace")
return json.loads(raw)
# ─────────────────────────────────────────────
# 下载题目
# ─────────────────────────────────────────────
def fetch_questions(
bench_session_id: str,
primary_model: str,
api_url: str = DEFAULT_API_URL,
openclaw_root: str = "",
) -> dict[str, Any]:
"""
从服务端 API 获取题目列表。
Returns
-------
dict:
- questions : list[dict]
- session_id : str
- hash : str
- model_cost : dict | None
"""
out = _post_json(
api_url,
body={"model_name": primary_model, "client_version": CLIENT_VERSION},
headers={"X-Bench-Session-Id": bench_session_id},
encrypt=True,
)
if not out.get("success"):
raise RuntimeError(f"API returned success=false: {out.get('message', out)}")
data = out.get("data")
if not isinstance(data, dict):
raise RuntimeError(f"API data 格式错误:期望题目包 JSON 对象,实际 {type(data).__name__}")
# 仅供调试使用:将解密后的原始数据写入 tests 目录,便于调试查看
# _dump_to_temp(data, "fetch_questions_data.json")
questions = data.get("questions")
if not isinstance(questions, list):
raise RuntimeError("API data.questions is not a list")
return {
"questions": questions,
"session_id": data.get("session_id", ""),
"hash": data.get("hash", ""),
"model_cost": data.get("model_cost"),
}
# ─────────────────────────────────────────────
# 上传结果
# ─────────────────────────────────────────────
def _build_upload_payload(data: dict[str, Any]) -> dict[str, Any]:
"""将 summary dict 转换为服务端上传格式。"""
results: list[dict[str, Any]] = data.get("results", [])
_stats = data.get("stats") or {}
cat_stats: dict[str, dict] = _stats.get("category_stats") or data.get("category_stats", {})
# 构建 env_info(设备环境信息,二级 JSON)
_sys = data.get("sys_info") or {}
import platform as _platform
env_info: dict[str, Any] = {
"cpu_cores": _sys.get("cpu_cores"),
"memory_gb": _sys.get("ram_total_gb"),
"os": _platform.system().lower(),
"python_version": _platform.python_version(),
}
# 去掉 None 值
env_info = {k: v for k, v in env_info.items() if v is not None}
payload: dict[str, Any] = {
"session_id": data.get("api_session_id") or data.get("session_id", ""),
"hash": data.get("api_hash") or data.get("hash", ""),
"client_version": CLIENT_VERSION,
"model_name": data.get("model_name", ""),
"total_score": sum(r.get("score", 0) for r in results),
"openclaw_name": data.get("agent_name", ""),
"openclaw_version": data.get("openclaw_version", ""),
"host_type": _sys.get("host_type", ""),
"env_info": env_info,
}
# s1~s5:按 CATEGORY_ORDER 填入各分类总分
for idx, cat in enumerate(CATEGORY_ORDER, start=1):
payload[f"s{idx}"] = cat_stats.get(cat, {}).get("score", 0)
# b1~b25:每题得分,实际结果按顺序填入,未执行的题目补 0
for idx in range(1, TOTAL_QUESTIONS + 1):
r = results[idx - 1] if idx <= len(results) else None
payload[f"b{idx}"] = r.get("score", 0) if r else 0
# r1~r25:每题运行详情,未执行的题目补空值
now_ts = time.time()
total_duration = sum(r.get("duration_sec") or 0 for r in results)
cursor_ts = now_ts - total_duration
for idx in range(1, TOTAL_QUESTIONS + 1):
r = results[idx - 1] if idx <= len(results) else None
if r is None:
payload[f"r{idx}"] = {
"start_time": "",
"end_time": "",
"total_tokens": 0,
"input_tokens": 0,
"output_tokens": 0,
"cache_read_tokens": 0,
"cache_write_tokens": 0,
"returncode": -1,
"error": "",
"stdout": "",
"stderr": "",
"accuracy_score": 0,
"real_accuracy_score": 0,
"tps_score": 0,
}
continue
duration = r.get("duration_sec") or 0
start_ts = cursor_ts
end_ts = cursor_ts + duration
cursor_ts = end_ts
# stdout/output 取非空值(CLI 模式用 output,WS 模式用 stdout)
stdout_val = (r.get("stdout") or "")
# 截断超长文本,避免 payload 过大
if len(stdout_val) > UPLOAD_STDOUT_TRUNCATE_LENGTH:
stdout_val = stdout_val[:UPLOAD_STDOUT_TRUNCATE_LENGTH] + "…(truncated)"
stdout_val = _sanitize_output(stdout_val)
stderr_val = (r.get("stderr") or "")
if len(stderr_val) > UPLOAD_STDERR_TRUNCATE_LENGTH:
stderr_val = stderr_val[:UPLOAD_STDERR_TRUNCATE_LENGTH] + "…(truncated)"
stderr_val = _sanitize_output(stderr_val)
payload[f"r{idx}"] = {
"start_time": _iso_time(r.get("start_time")) if r.get("start_time") else "",
"end_time": _iso_time(r.get("end_time")) if r.get("end_time") else "",
"total_tokens": r.get("total_tokens") or 0,
"input_tokens": r.get("input_tokens") or 0,
"output_tokens": r.get("output_tokens") or 0,
"cache_read_tokens": r.get("cache_read_tokens") or 0,
"cache_write_tokens": r.get("cache_write_tokens") or 0,
"returncode": r.get("returncode", -1),
"error": r.get("error") or "",
"stdout": stdout_val,
"stderr": stderr_val,
"accuracy_score": r.get("accuracy_score") or 0,
"real_accuracy_score": r.get("real_accuracy_score") or 0,
"tps_score": r.get("tps_score") or 0,
}
return payload
# ─────────────────────────────────────────────
# 上传缓存(网络失败时本地暂存,下次启动重试)
# ─────────────────────────────────────────────
_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "cache")
def _cache_dir() -> str:
os.makedirs(_CACHE_DIR, exist_ok=True)
return _CACHE_DIR
def _save_pending_upload(body: bytes, bench_session_id: str, upload_url: str) -> str:
"""
将上传失败的 body 连同元信息序列化写入 cache 目录。
文件名格式:cache_<timestamp_ms>.dat
返回写入的文件路径。
"""
ts = int(time.time() * 1000)
filename = f"cache_{ts}.dat"
path = os.path.join(_cache_dir(), filename)
record = {
"upload_url": upload_url,
"bench_session_id": bench_session_id,
"body_hex": body.hex(), # bytes 转 hex 字符串,便于 JSON 序列化
"created_at": ts,
}
with open(path, "w", encoding="utf-8") as f:
json.dump(record, f, ensure_ascii=False)
return path
def _do_post_body(body: bytes, bench_session_id: str, upload_url: str, timeout: int = 30) -> tuple[bool, str]:
"""发送已构建好的 body bytes,返回 (ok, msg)。"""
req = urllib.request.Request(upload_url, data=body, method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("Accept", "application/json")
req.add_header("X-Bench-Session-Id", bench_session_id)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
raw = resp.read().decode("utf-8", errors="replace")
out = json.loads(raw)
if out.get("success"):
return True, raw
return False, out.get("message") or raw
except urllib.error.HTTPError as e:
body_err = e.read().decode("utf-8", errors="replace")
return False, f"HTTP {e.code}: {body_err}"
except Exception as e:
return False, str(e)
def flush_pending_uploads() -> list[str]:
"""
扫描 cache 目录,将所有未上报的 .dat 文件逐个重试上传。
上传成功后删除对应文件;失败则保留留待下次重试。
返回本次成功上传的文件路径列表。
"""
cache = _cache_dir()
dat_files = sorted(
(f for f in os.listdir(cache) if f.startswith("cache_") and f.endswith(".dat")),
)
if not dat_files:
return []
succeeded: list[str] = []
for filename in dat_files:
path = os.path.join(cache, filename)
try:
with open(path, "r", encoding="utf-8") as f:
record = json.load(f)
body = bytes.fromhex(record["body_hex"])
bench_session_id = record.get("bench_session_id", "")
upload_url = record.get("upload_url", DEFAULT_SUBMIT_API_URL)
except Exception as e:
# 文件损坏,跳过但不删除,避免误删
print(f"[flush] 读取缓存文件失败 {filename}: {e}")
continue
ok, msg = _do_post_body(body, bench_session_id, upload_url)
if ok:
try:
os.remove(path)
except OSError:
pass
succeeded.append(path)
print(f"[flush] 补报成功,已删除缓存: {filename}")
else:
print(f"[flush] 补报失败,保留缓存 {filename}: {msg}")
return succeeded
def _parse_leaderboard(raw_response: str) -> dict[str, Any]:
"""
从上传接口的原始响应中提取排行榜数据。
期望响应格式:
{"success": true, "data": {"percentiles": {"total": 91.8, "s1": 93.9, ...},
"sample_size": 17, "leaderboard_url": "...", "note": "..."}}
返回标准化后的排行榜 dict,解析失败时返回 {}。
"""
try:
out = json.loads(raw_response)
except (json.JSONDecodeError, TypeError):
return {}
resp_data = out.get("data")
if not isinstance(resp_data, dict):
return {}
percentiles = resp_data.get("percentiles")
if not isinstance(percentiles, dict):
return {}
return {
"percentiles": percentiles,
"sample_size": resp_data.get("sample_size"),
"leaderboard_url": resp_data.get("leaderboard_url", ""),
"note": percentiles.get("note") or resp_data.get("note", ""),
"updated_at": percentiles.get("updated_at", ""),
"status": percentiles.get("status", ""),
}
def upload_results_from_dict(
data: dict[str, Any],
bench_session_id: str,
hash: str,
upload_url: str = DEFAULT_SUBMIT_API_URL,
) -> tuple[bool, str, dict[str, Any]]:
"""
直接接受 summary dict,构建上传格式后 POST 到服务端。
若因网络原因失败,自动将 body 缓存到 cache 目录,下次启动时补报。
Returns
-------
(ok: bool, message: str, leaderboard: dict)
leaderboard 结构示例:
{
"percentiles": {"total": 91.8, "s1": 93.9, "s2": 0.1, ...},
"sample_size": 17,
"leaderboard_url": "https://{BENCHCLAW_SITE_HOST}/leaderboard", # 实际 URL 由服务端返回
"note": "...",
}
上传失败时返回空 dict。
"""
if not data.get("results"):
return False, "results 列表为空,跳过上传", {}
try:
payload = _build_upload_payload(data)
except Exception as e:
return False, f"构建上传数据失败: {e}", {}
key_b64, gpv, _aes = hybrid_encrypt_json(payload)
body = json.dumps({"key": key_b64, "gpv": gpv}, ensure_ascii=False).encode("utf-8")
ok, msg = _do_post_body(body, bench_session_id, upload_url)
if ok:
leaderboard = _parse_leaderboard(msg)
return True, msg, leaderboard
# 上报失败时,缓存数据
cached_path = _save_pending_upload(body, bench_session_id, upload_url)
return False, f"{msg}(已缓存至 {os.path.basename(cached_path)},下次启动自动补报)", {}
def upload_results(
results_path: str,
bench_session_id: str,
session_id: str,
hash: str,
) -> tuple[bool, str, dict[str, Any]]:
"""
从文件读取 results.json 后上传(兼容 CLI 用法)。
"""
if not os.path.exists(results_path):
return False, f"结果文件不存在: {results_path}", {}
try:
with open(results_path, "r", encoding="utf-8") as f:
data = json.load(f)
data['api_session_id'] = session_id
data['api_hash'] = hash
except Exception as e:
return False, f"读取结果文件失败: {e}", {}
return upload_results_from_dict(data, bench_session_id, hash)
# ─────────────────────────────────────────────
# CLI 入口
# ─────────────────────────────────────────────
from utils import get_bench_session_id
def test_upload():
path = sys.argv[1] if len(sys.argv) > 1 else os.path.join(
os.path.dirname(__file__), "..", "temp", "temp-results.json"
)
path = os.path.abspath(path)
print(f"上传文件: {path}")
# 下载题目
try:
bench_session_id = get_bench_session_id()
fetch_result = fetch_questions(bench_session_id, "minimax-cn/MiniMax-M2.5")
#questions = fetch_result["questions"]
api_session_id = fetch_result["session_id"]
api_hash = fetch_result["hash"]
print("api_hash", api_hash)
ok, msg, _ = upload_results(path, bench_session_id, api_session_id, api_hash)
print("成功" if ok else "失败", ":", msg)
except Exception as e:
print(e)
return
def test_sanitize():
"""对 _sanitize_output 的各条脱敏规则进行验证。"""
cases = [
# (描述, 输入, 期望包含的替换结果)
("Anthropic Claude API key", "token=sk-ant-abcdefghijklmnopqrst123456", "sk-ant-***"),
("OpenAI API key", "key: sk-abcdefghijklmnopqrstu", "sk-***"),
("Google Gemini API key", "AIzaSyAbCdEfGhIjKlMnOpQrStUvWxYz12345678", "AIza***"),
("AWS Access Key ID", "access_key=AKIAIOSFODNN7EXAMPLE", "AKIA***"),
("GitHub PAT", "ghp_" + "a" * 36, "ghp_***"),
("ClaWHub token", "auth: clh_MySecretToken123", "clh_***"),
("飞书 open_id", "open_id: ou_" + "a1b2c3d4" * 4, "ou_***"),
("Slack bot token", "xoxb-123456789-abcdefghij", "xox-***"),
("Slack user token", "xoxp-987654321-zyxwvutsrq", "xox-***"),
("本地路径 /home/", "reading /home/user/.bashrc failed", "/home/***"),
("本地路径 /root/", "config at /root/.config/app.yaml", "/root/***"),
("邮箱地址", "contact [email protected] for help", "***@***"),
("混合多条规则",
"key=sk-abcdefghijklmnopqrstu path=/home/ci/.env [email protected]",
None), # None 表示只打印结果,不做单一断言
]
passed = 0
failed = 0
for desc, text, expected in cases:
result = _sanitize_output(text)
if expected is None:
print(f" [INFO] {desc}")
print(f" 输入 : {text}")
print(f" 输出 : {result}")
print()
continue
if expected in result and text != result:
print(f" [PASS] {desc}")
print(f" 输入 : {text}")
print(f" 输出 : {result}")
passed += 1
else:
print(f" [FAIL] {desc}")
print(f" 输入 : {text}")
print(f" 输出 : {result}")
print(f" 期望含: {expected}")
failed += 1
print()
print(f"结果: {passed} 通过, {failed} 失败")
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "sanitize":
test_sanitize()
else:
test_upload()
FILE:scripts/session.py
from __future__ import annotations
import json
import logging
import os
import shutil
import subprocess
import sys
import time
import traceback
from pathlib import Path
from typing import Any, NamedTuple
from config import DEFAULT_AGENT_ID, DEFAULT_SESSION_PREFIX
SESSION_STORE_PATH = Path.home() / ".openclaw" / "agents" / "main" / "sessions" / "sessions.json"
DEFAULT_TASK_COUNT = 3
DEFAULT_TASK_SECONDS = 5
SESSION_LIST_TIMEOUT = 60
CHANNEL_MESSAGE_TIMEOUT = 60
AGENT_MESSAGE_TIMEOUT = 120
AGENT_MESSAGE_RETRY_COUNT = 1
logger = logging.getLogger("benchclaw.session")
class OpenClawSessionInfo(NamedTuple):
"""从 OpenClaw CLI / 会话存储解析出的通知目标(当前会话 + channel/target)。"""
session_id: str
session_key: str
channel: str | None
target: str | None
def ran_under_openclaw_exec() -> bool:
"""
True when spawned from OpenClaw gateway `exec` (sanitized env + markers).
Either OPENCLAW_SHELL=exec or OPENCLAW_CLI=1 suffices; manual `cmd` runs
typically have neither.
"""
return (
os.environ.get("OPENCLAW_SHELL", "").strip() == "exec"
or os.environ.get("OPENCLAW_CLI", "").strip() == "1"
)
def parse_json_from_mixed_output(output: str) -> dict[str, Any]:
"""
Parse JSON from output that may contain extra log lines.
"""
lines = output.splitlines()
buffer: list[str] = []
depth = 0
for line in lines:
stripped = line.lstrip()
if not buffer and not stripped.startswith("{"):
continue
buffer.append(line)
depth += line.count("{") - line.count("}")
if depth <= 0:
candidate = "\n".join(buffer)
try:
parsed = json.loads(candidate)
if isinstance(parsed, dict):
return parsed
except json.JSONDecodeError:
pass
buffer = []
depth = 0
raise ValueError("Failed to parse JSON from `openclaw sessions --json` output.")
def _session_updated_at_ts(session: dict[str, Any]) -> int:
"""Sort key: numeric updatedAt from sessions JSON; invalid or missing → 0."""
raw = session.get("updatedAt")
if raw is None:
return 0
if isinstance(raw, bool):
return 0
if isinstance(raw, int):
return raw
if isinstance(raw, float):
return int(raw)
if isinstance(raw, str):
s = raw.strip()
if not s:
return 0
try:
return int(s, 10)
except ValueError:
try:
return int(float(s))
except ValueError:
return 0
return 0
def _argv_preview(argv: list[str]) -> str:
return " ".join(argv)
def _log_openclaw_subprocess_failure(
label: str,
argv: list[str],
returncode: int | None,
stdout: str | None,
stderr: str | None,
) -> None:
"""Write full stdout/stderr to sessions.log (and console) when an openclaw subprocess fails."""
out = "" if stdout is None else stdout
err = "" if stderr is None else stderr
logger.error(
"%s failed (exit=%s) argv=%s\n--- stdout (full) ---\n%s\n--- stderr (full) ---\n%s",
label,
returncode,
_argv_preview(argv),
out,
err,
)
def _run_command(
argv: list[str],
timeout: int,
) -> subprocess.CompletedProcess[str]:
logger.info("Running: %s", " ".join(argv))
process_result = subprocess.run(
argv,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
timeout=timeout,
check=False,
)
logger.info("run command result: %s", process_result)
return process_result
def _load_session_store() -> dict[str, Any] | None:
if not SESSION_STORE_PATH.exists():
logger.warning("Session store not found: %s", SESSION_STORE_PATH)
return None
try:
payload = json.loads(SESSION_STORE_PATH.read_text(encoding="utf-8"))
except Exception as exc:
logger.warning("Failed to parse session store: %s", exc)
return None
if not isinstance(payload, dict):
return None
return payload
def resolve_session_delivery_context(
session_id: str,
session_key: str | None = None,
) -> tuple[str | None, str | None]:
"""Return channel/target for routing. `channel` may be set without `target` (e.g. webchat)."""
if not session_id and not session_key:
return None, None
payload = _load_session_store()
if payload is None:
return None, None
matched_entry: dict[str, Any] | None = None
matched_key: str | None = None
if session_key:
key_candidate = str(session_key).strip()
entry = payload.get(key_candidate)
if isinstance(entry, dict):
matched_entry = entry
matched_key = key_candidate
if matched_entry is None and session_id:
for key, entry in payload.items():
if not isinstance(entry, dict):
continue
if str(entry.get("sessionId", "")).strip() == str(session_id).strip():
matched_entry = entry
matched_key = str(key)
break
if matched_entry is None:
logger.warning(
"No matching session entry found for sessionId=%s sessionKey=%s",
session_id,
session_key,
)
return None, None
logger.info(
"Matched session entry: key=%s sessionId=%s",
matched_key,
str(matched_entry.get("sessionId", "")).strip(),
)
last_channel = str(matched_entry.get("lastChannel", "")).strip() or None
last_to = str(matched_entry.get("lastTo", "")).strip() or None
dc = matched_entry.get("deliveryContext")
dc_channel = (
str(dc.get("channel", "")).strip() or None
if isinstance(dc, dict)
else None
)
dc_to = (
str(dc.get("to", "")).strip() or None if isinstance(dc, dict) else None
)
channel = last_channel or dc_channel
target = last_to or dc_to
return channel, target
def lookup_session_id_for_key(session_key: str) -> str | None:
payload = _load_session_store()
if payload is None:
return None
entry = payload.get(str(session_key).strip())
if isinstance(entry, dict):
sid = str(entry.get("sessionId", "")).strip()
return sid or None
return None
def lookup_session_key_for_id(session_id: str) -> str | None:
payload = _load_session_store()
if payload is None:
return None
want = str(session_id).strip()
for key, entry in payload.items():
if not isinstance(entry, dict):
continue
if str(entry.get("sessionId", "")).strip() == want:
return str(key).strip() or None
return None
def cleanup_agent_sessions(
agent_id: str = DEFAULT_AGENT_ID,
session_prefix: str = DEFAULT_SESSION_PREFIX,
) -> dict[str, int]:
"""
清理指定 agent 下匹配 explicit session 前缀的历史会话:
1) 删除 <sessionId>.jsonl / <sessionId>.jsonl.lock
2) 删除 sessions.json 中对应 session key 条目
"""
result = {
"matched_entries": 0,
"removed_session_files": 0,
"removed_lock_files": 0,
"removed_store_entries": 0,
}
session_store_path = (
Path.home() / ".openclaw" / "agents" / str(agent_id) / "sessions" / "sessions.json"
)
sessions_dir = session_store_path.parent
if not session_store_path.exists():
logger.warning("cleanup_agent_sessions: session store not found: %s", session_store_path)
return result
try:
payload = json.loads(session_store_path.read_text(encoding="utf-8"))
except Exception as exc:
logger.warning("cleanup_agent_sessions: failed to parse session store: %s", exc)
return result
if not isinstance(payload, dict):
logger.warning("cleanup_agent_sessions: invalid session store structure")
return result
key_prefix = f"agent:{agent_id}:explicit:{session_prefix}".lower()
keys_to_delete: list[str] = []
for key, entry in payload.items():
key_str = str(key).strip()
if not key_str.lower().startswith(key_prefix):
continue
if not isinstance(entry, dict):
continue
result["matched_entries"] += 1
session_id = str(entry.get("sessionId", "")).strip()
if session_id:
session_file = sessions_dir / f"{session_id}.jsonl"
lock_file = sessions_dir / f"{session_id}.jsonl.lock"
if session_file.exists():
try:
session_file.unlink()
result["removed_session_files"] += 1
except OSError as exc:
logger.warning("cleanup_agent_sessions: failed to remove %s: %s", session_file, exc)
if lock_file.exists():
try:
lock_file.unlink()
result["removed_lock_files"] += 1
except OSError as exc:
logger.warning("cleanup_agent_sessions: failed to remove %s: %s", lock_file, exc)
keys_to_delete.append(key_str)
if not keys_to_delete:
logger.info("cleanup_agent_sessions: no matching session entries for prefix %s", key_prefix)
return result
for key in keys_to_delete:
if key in payload:
del payload[key]
result["removed_store_entries"] += 1
try:
session_store_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
except Exception as exc:
logger.warning("cleanup_agent_sessions: failed to write session store: %s", exc)
return result
logger.info(
"cleanup_agent_sessions done: matched=%s removed_jsonl=%s removed_lock=%s removed_entries=%s",
result["matched_entries"],
result["removed_session_files"],
result["removed_lock_files"],
result["removed_store_entries"],
)
return result
def resolve_invoking_session(
sorted_sessions: list[dict[str, Any]],
) -> tuple[str, str]:
"""
必须由『当前会话』接收进度:优先环境变量(exec 注入或手工指定),否则退回 sessions --json 第一条。
手工测试可设 SESSION_LIST_SESSION_ID / SESSION_LIST_SESSION_KEY;
与 OpenClaw 对齐时可设 OPENCLAW_SESSION_ID / OPENCLAW_SESSION_KEY。
"""
env_id = (
os.getenv("OPENCLAW_SESSION_ID", "").strip()
or os.getenv("SESSION_LIST_SESSION_ID", "").strip()
)
env_key = (
os.getenv("OPENCLAW_SESSION_KEY", "").strip()
or os.getenv("SESSION_LIST_SESSION_KEY", "").strip()
)
if env_id and env_key:
logger.info("Notify target: invoking session from env id+key id=%s key=%s", env_id, env_key)
return env_id, env_key
if env_key:
sid = lookup_session_id_for_key(env_key)
if sid:
logger.info(
"Notify target: invoking session from env key=%s resolved sessionId=%s",
env_key,
sid,
)
return sid, env_key
logger.warning(
"OPENCLAW_SESSION_KEY / SESSION_LIST_SESSION_KEY set but sessionStore has no entry: %s",
env_key,
)
if env_id:
sk = lookup_session_key_for_id(env_id)
logger.info(
"Notify target: invoking session from env id=%s resolved sessionKey=%s",
env_id,
sk or "",
)
return env_id, sk or ""
if sorted_sessions:
sid = str(sorted_sessions[0].get("sessionId", "")).strip()
sk = str(sorted_sessions[0].get("key", "")).strip()
logger.warning(
"No session env hints; using top session from `sessions --json` (updatedAt desc) "
"id=%s key=%s — for correct 『current session』, set OPENCLAW_SESSION_KEY or SESSION_LIST_SESSION_KEY",
sid,
sk,
)
return sid, sk
return "", ""
def send_channel_message(
openclaw_cmd: str | None,
channel: str | None,
target: str | None,
message: str,
) -> bool:
under_openclaw = ran_under_openclaw_exec()
if not under_openclaw:
logger.info("跳过 channel 通知(非 OpenClaw exec):%s", message)
return False
msg = (message or "").strip()
if not msg:
logger.info("跳过 channel 通知(消息为空)。")
return False
ch_norm = (channel or "").strip().lower()
if not ch_norm:
logger.info("跳过 channel 通知(channel 未配置或为空)。")
return False
if ch_norm == "webchat":
logger.info("Channel webchat: skip send message.")
return False
tgt_norm = (target or "").strip()
if not tgt_norm:
logger.info(
"跳过 channel 通知(target 未配置或为空,channel=%s)。",
ch_norm,
)
return False
argv = (
[
openclaw_cmd,
"message",
"send",
"--message",
msg,
"--channel",
ch_norm,
"--target",
tgt_norm,
"--json",
]
if openclaw_cmd
else [
sys.executable,
"-m",
"openclaw",
"message",
"send",
"--message",
msg,
"--channel",
ch_norm,
"--target",
tgt_norm,
"--json",
]
)
try:
result = _run_command(argv, timeout=CHANNEL_MESSAGE_TIMEOUT)
except subprocess.TimeoutExpired as exc:
logger.error(
"openclaw message send timed out (%ss) argv=%s\n--- stdout ---\n%s\n--- stderr ---\n%s",
CHANNEL_MESSAGE_TIMEOUT,
_argv_preview(argv),
getattr(exc, "stdout", None) or "",
getattr(exc, "stderr", None) or "",
)
return False
except Exception as exc:
logger.error("openclaw message send exception: %s", exc)
logger.error(traceback.format_exc())
return False
if result.returncode == 0:
return True
_log_openclaw_subprocess_failure(
"openclaw message send",
argv,
result.returncode,
result.stdout,
result.stderr,
)
return False
def send_agent_message(
openclaw_cmd: str | None,
session_id: str,
message: str,
) -> bool:
under_openclaw = ran_under_openclaw_exec()
if not under_openclaw:
logger.info("跳过 agent 通知(非 OpenClaw exec):%s", message)
return False
sid = (session_id or "").strip()
if not sid:
logger.warning("跳过 agent 通知:sessionId 为空。")
return False
base_msg = (message or "").strip()
if not base_msg:
logger.warning("跳过 agent 通知:message 为空。")
return False
# Force a visible one-line relay and discourage NO_REPLY for progress notifications.
relay_message = (
"务必处理本消息不要输出 NO_REPLY, 务必向用户报告任务处理进度信息:"
f"{base_msg}"
)
argv = (
[
openclaw_cmd,
"agent",
"--session-id",
sid,
"--message",
relay_message,
"--deliver",
"--json",
]
if openclaw_cmd
else [
sys.executable,
"-m",
"openclaw",
"agent",
"--session-id",
sid,
"--message",
relay_message,
"--deliver",
"--json",
]
)
max_retries = AGENT_MESSAGE_RETRY_COUNT
for attempt in range(1, max_retries + 1):
try:
logger.info(
"Notify via agent --deliver (attempt %d/%d) sessionId=%s",
attempt,
max_retries,
sid,
)
notify_result = _run_command(argv, timeout=AGENT_MESSAGE_TIMEOUT)
if notify_result.returncode == 0:
logger.info("agent --deliver notify succeeded on attempt %d", attempt)
return True
_log_openclaw_subprocess_failure(
f"openclaw agent --deliver (attempt {attempt}/{max_retries})",
argv,
notify_result.returncode,
notify_result.stdout,
notify_result.stderr,
)
except subprocess.TimeoutExpired as exc:
logger.error(
"openclaw agent --deliver timed out (%ss, attempt %s/%s) argv=%s\n--- stdout ---\n%s\n--- stderr ---\n%s",
AGENT_MESSAGE_TIMEOUT,
attempt,
max_retries,
_argv_preview(argv),
getattr(exc, "stdout", None) or "",
getattr(exc, "stderr", None) or "",
)
except FileNotFoundError:
logger.error("Notify failed: `openclaw` command not found.")
return False
except Exception as exc:
logger.error("openclaw agent --deliver exception (attempt %s/%s): %s", attempt, max_retries, exc)
logger.error(traceback.format_exc())
if attempt < max_retries:
time.sleep(2)
return False
def get_openclaw_session_info() -> OpenClawSessionInfo:
empty = OpenClawSessionInfo("", "", "", "")
under_openclaw = ran_under_openclaw_exec()
if under_openclaw:
logger.info("OpenClaw exec 环境:将发送 channel / agent 消息。")
else:
logger.info("非 OpenClaw exec(需 OPENCLAW_SHELL=exec 或 OPENCLAW_CLI=1):仅执行任务,不发送 channel / agent 消息。")
return empty
openclaw_cmd = shutil.which("openclaw") or shutil.which("openclaw.cmd")
argv = [openclaw_cmd, "sessions", "--json"] if openclaw_cmd else [sys.executable, "-m", "openclaw", "sessions", "--json"]
try:
logger.info("Get sessions list")
result = _run_command(argv, timeout=SESSION_LIST_TIMEOUT)
except FileNotFoundError:
logger.error("`openclaw` command not found.")
return empty
except subprocess.TimeoutExpired as exc:
logger.error(
"openclaw sessions --json timed out (%ss) argv=%s\n--- stdout ---\n%s\n--- stderr ---\n%s",
SESSION_LIST_TIMEOUT,
_argv_preview(argv),
getattr(exc, "stdout", None) or "",
getattr(exc, "stderr", None) or "",
)
return empty
combined_output = (result.stdout or "").strip()
if result.stderr:
logger.info("stderr: %s", result.stderr.strip())
if result.returncode != 0:
_log_openclaw_subprocess_failure(
"openclaw sessions --json",
argv,
result.returncode,
result.stdout,
result.stderr,
)
return empty
try:
payload = parse_json_from_mixed_output(combined_output)
logger.info("Parse json from mixed output success")
except ValueError as exc:
logger.error("Parse json from mixed output failed: %s", exc)
raw_out = result.stdout if result.stdout is not None else ""
raw_err = result.stderr if result.stderr is not None else ""
logger.error(
"openclaw sessions --json output could not be parsed as JSON; full capture follows.\n--- stdout (full) ---\n%s\n--- stderr (full) ---\n%s",
raw_out,
raw_err,
)
return empty
sessions = payload.get("sessions")
if not isinstance(sessions, list):
logger.error("JSON payload has no `sessions` list.")
return empty
sorted_sessions = sorted(
(s for s in sessions if isinstance(s, dict)),
key=_session_updated_at_ts,
reverse=True,
)
logger.info("Sorted sessions by updatedAt (desc):")
for index, session in enumerate(sorted_sessions, start=1):
key = session.get("key", "")
session_id = session.get("sessionId", "")
updated_at = session.get("updatedAt", 0)
logger.info("%d. key=%s sessionId=%s updatedAt=%s", index, key, session_id, updated_at)
notify_session_id, notify_session_key = resolve_invoking_session(sorted_sessions)
logger.info("Resolve invoking session success: notify_session_id=%s notify_session_key=%s", notify_session_id, notify_session_key)
channel, target = resolve_session_delivery_context(
notify_session_id, notify_session_key
)
logger.info("Resolved delivery context (progress): channel=%s target=%s", channel, target)
return OpenClawSessionInfo(notify_session_id, notify_session_key, channel, target)
def main() -> int:
openclaw_cmd = shutil.which("openclaw") or shutil.which("openclaw.cmd")
session_info = get_openclaw_session_info()
try:
total_tasks = int(os.getenv("SESSION_LIST_TASK_COUNT", str(DEFAULT_TASK_COUNT)))
sleep_seconds = int(os.getenv("SESSION_LIST_TASK_SECONDS", str(DEFAULT_TASK_SECONDS)))
for index in range(1, total_tasks + 1):
logger.info("Running simulated subtask %d/%d (%ss)...", index, total_tasks, sleep_seconds)
time.sleep(max(0, sleep_seconds))
progress_msg = f"当前进度:{index}/{total_tasks} 已完成。"
ok = send_channel_message(
openclaw_cmd,
session_info.channel,
session_info.target,
progress_msg,
)
if ok:
logger.info("Progress notified for subtask %d/%d", index, total_tasks)
else:
logger.warning("Progress notify failed for subtask %d/%d", index, total_tasks)
agent_msg = f"当前任务已全部执行完成,详细总结Skill执行的结果信息,立即向我报告。"
ok = send_agent_message(openclaw_cmd, session_info.session_id, agent_msg)
if ok:
logger.info("Final summary notify sent.")
else:
logger.warning("Final summary notify failed.")
except Exception as exc:
logger.error("Main process crashed: %s", exc)
logger.error(traceback.format_exc())
return 1
return 0
if __name__ == "__main__":
results = cleanup_agent_sessions()
logger.info("Cleanup agent sessions results: %s", results)
#raise SystemExit(main())
FILE:scripts/session_info.py
"""
SessionInfo — openclaw session 信息的数据类。
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass
class SessionInfo:
key: str
session_id: str
updated_at: int # epoch ms
age_ms: int | None
agent_id: str
kind: str
model: str
model_provider: str
context_tokens: int | None
input_tokens: int | None
output_tokens: int | None
total_tokens: int | None
total_tokens_fresh: bool
system_sent: bool
aborted_last_run: bool
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "SessionInfo":
return cls(
key=d.get("key", ""),
session_id=d.get("sessionId", ""),
updated_at=d.get("updatedAt", 0),
age_ms=d.get("ageMs"),
agent_id=d.get("agentId", ""),
kind=d.get("kind", ""),
model=d.get("model", ""),
model_provider=d.get("modelProvider", ""),
context_tokens=d.get("contextTokens"),
input_tokens=d.get("inputTokens"),
output_tokens=d.get("outputTokens"),
total_tokens=d.get("totalTokens"),
total_tokens_fresh=d.get("totalTokensFresh", False),
system_sent=d.get("systemSent", False),
aborted_last_run=d.get("abortedLastRun", False),
)
FILE:scripts/usage_info.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
@dataclass
class UsageInfo:
"""
Gateway sessions.usage 返回的 token 与费用信息(与 payload.totals / session.usage 结构一致)。
"""
input: int = 0
output: int = 0
cacheRead: int = 0
cacheWrite: int = 0
totalTokens: int = 0
totalCost: float = 0.0
inputCost: float = 0.0
outputCost: float = 0.0
cacheReadCost: float = 0.0
cacheWriteCost: float = 0.0
missingCostEntries: int = 0
@classmethod
def from_totals(cls, raw: dict[str, Any] | None) -> UsageInfo:
"""从 payload.totals 或 session.usage 字典构建。"""
if not raw or not isinstance(raw, dict):
return cls()
def _int(key: str) -> int:
v = raw.get(key)
if v is None:
return 0
try:
return int(float(v))
except (TypeError, ValueError):
return 0
def _float(key: str) -> float:
v = raw.get(key)
if v is None:
return 0.0
try:
return float(v)
except (TypeError, ValueError):
return 0.0
return cls(
input=_int("input"),
output=_int("output"),
cacheRead=_int("cacheRead"),
cacheWrite=_int("cacheWrite"),
totalTokens=_int("totalTokens"),
totalCost=_float("totalCost"),
inputCost=_float("inputCost"),
outputCost=_float("outputCost"),
cacheReadCost=_float("cacheReadCost"),
cacheWriteCost=_float("cacheWriteCost"),
missingCostEntries=_int("missingCostEntries"),
)
def __sub__(self, other: UsageInfo) -> UsageInfo:
"""返回 self - other,用于计算任务前后用量差。"""
if not isinstance(other, UsageInfo):
return self
return UsageInfo(
input=self.input - other.input,
output=self.output - other.output,
cacheRead=self.cacheRead - other.cacheRead,
cacheWrite=self.cacheWrite - other.cacheWrite,
totalTokens=self.totalTokens - other.totalTokens,
totalCost=round(self.totalCost - other.totalCost, 8),
inputCost=round(self.inputCost - other.inputCost, 8),
outputCost=round(self.outputCost - other.outputCost, 8),
cacheReadCost=round(self.cacheReadCost - other.cacheReadCost, 8),
cacheWriteCost=round(self.cacheWriteCost - other.cacheWriteCost, 8),
missingCostEntries=self.missingCostEntries - other.missingCostEntries,
)
def to_dict(self) -> dict[str, Any]:
"""转为字典,字段名与 Gateway usage 一致。"""
return {
"input": self.input,
"output": self.output,
"cacheRead": self.cacheRead,
"cacheWrite": self.cacheWrite,
"totalTokens": self.totalTokens,
"totalCost": self.totalCost,
"inputCost": self.inputCost,
"outputCost": self.outputCost,
"cacheReadCost": self.cacheReadCost,
"cacheWriteCost": self.cacheWriteCost,
"missingCostEntries": self.missingCostEntries,
}
FILE:scripts/utils.py
"""
BenchClaw 通用工具函数。
"""
from __future__ import annotations
import json
import logging
import os
import shutil
import uuid
logger = logging.getLogger("benchclaw")
def get_bench_session_id() -> str:
"""
从本地 cache.json 读取 Bench 会话标识;若不存在则生成随机 UUID 并写回文件后返回。
"""
cache_path = os.path.join(os.path.dirname(__file__), "../data/cache.json")
bench_session_id: str | None = None
try:
with open(cache_path, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
value = data.get("bench_session_id")
if isinstance(value, str) and value.strip():
bench_session_id = value.strip()
except FileNotFoundError:
pass
except (OSError, json.JSONDecodeError):
pass
if not bench_session_id:
bench_session_id = str(uuid.uuid4())
try:
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
json.dump({"bench_session_id": bench_session_id}, f, ensure_ascii=False, indent=2)
except OSError:
# 即使写入失败,也仍然返回本次生成的 id
pass
return bench_session_id
def get_temp_file(filename: str) -> str:
"""返回 temp 目录下指定文件的绝对路径,目录不存在时自动创建。"""
script_dir = os.path.dirname(os.path.abspath(__file__))
temp_dir = os.path.join(script_dir, "..", "temp")
os.makedirs(temp_dir, exist_ok=True)
return os.path.join(temp_dir, filename)
def clean_temp_files() -> None:
"""删除 temp 目录下的 messages.json 和 results.json(忽略不存在的文件)。"""
for name in ("results.json", "prompt.md"):
try:
os.remove(get_temp_file(name))
except FileNotFoundError:
pass
except Exception as e:
logger.warning(f"删除临时文件失败 {name}: {e}")
def clean_benchclaw_workspace() -> None:
"""删除 ~/.openclaw/workspace/bench_claw 文件夹(如果存在)。"""
home_dir = os.path.expanduser("~")
bench_claw_path = os.path.join(home_dir, ".openclaw", "workspace", "bench_claw")
if os.path.exists(bench_claw_path):
try:
shutil.rmtree(bench_claw_path)
logger.info(f"已删除工作区文件夹: {bench_claw_path}")
except Exception as e:
logger.warning(f"删除工作区文件夹失败: {e}")
else:
logger.debug(f"bench_claw工作区文件夹不存在,跳过删除: {bench_claw_path}")
# C3: 硬件运行时动态监控
try:
import psutil as _psutil
_HAS_PSUTIL = True
except ImportError:
_HAS_PSUTIL = False
import threading as _threading
class HardwareMonitor:
"""评测期间后台监控 CPU 和内存使用率(每秒采样一次)。"""
def __init__(self):
self.cpu_samples: list[float] = []
self.mem_min_available_gb: float = float('inf')
self._running = False
self._thread = None
def start(self) -> None:
if not _HAS_PSUTIL:
return
self._running = True
def _run():
while self._running:
try:
self.cpu_samples.append(_psutil.cpu_percent(interval=1))
avail = _psutil.virtual_memory().available / (1024 ** 3)
self.mem_min_available_gb = min(self.mem_min_available_gb, avail)
except Exception:
pass
self._thread = _threading.Thread(target=_run, daemon=True)
self._thread.start()
def stop(self) -> dict:
self._running = False
if not _HAS_PSUTIL or not self.cpu_samples:
return {}
try:
mem = _psutil.virtual_memory()
return {
"cpu_peak_percent": max(self.cpu_samples),
"cpu_avg_percent": round(sum(self.cpu_samples) / len(self.cpu_samples), 1),
"mem_min_available_gb": round(self.mem_min_available_gb, 1),
"mem_total_gb": round(mem.total / (1024 ** 3), 1),
}
except Exception:
return {}
def get_system_info() -> dict:
"""
采集系统硬件和主机类型信息,用于上报到榜单。
返回 dict,采集失败时对应字段为 None。
"""
import subprocess as _subprocess
info = {
"cpu_cores": None,
"ram_total_gb": None,
"host_type": None, # 例:"云主机,阿里云 ECS" 或 "虚拟机(KVM)" 或 "物理机"
"virt_type": None, # systemd-detect-virt 原始值
}
# CPU 核数(物理核)
try:
info["cpu_cores"] = _psutil.cpu_count(logical=True) if _HAS_PSUTIL else None
except Exception:
pass
# 内存总量
try:
if _HAS_PSUTIL:
info["ram_total_gb"] = round(_psutil.virtual_memory().total / (1024**3), 1)
except Exception:
pass
# 虚拟化类型
try:
result = _subprocess.run(
["systemd-detect-virt"], capture_output=True, text=True, timeout=5
)
virt = result.stdout.strip()
info["virt_type"] = virt
except Exception:
virt = "unknown"
# 主机类型(友好描述)
try:
vendor = open("/sys/class/dmi/id/sys_vendor").read().strip() if __import__("os").path.exists("/sys/class/dmi/id/sys_vendor") else ""
product = open("/sys/class/dmi/id/product_name").read().strip() if __import__("os").path.exists("/sys/class/dmi/id/product_name") else ""
if "Alibaba" in vendor or "Alibaba" in product:
info["host_type"] = "云主机,阿里云 ECS"
elif "Tencent" in vendor or "Tencent" in product:
info["host_type"] = "云主机,腾讯云 CVM"
elif "Amazon" in vendor or "EC2" in product:
info["host_type"] = "云主机,AWS EC2"
elif "Microsoft" in vendor and virt in ("hyperv", "microsoft"):
info["host_type"] = "云主机,Azure"
elif "Google" in vendor:
info["host_type"] = "云主机,Google Cloud"
elif virt == "none":
info["host_type"] = "物理机"
elif virt in ("kvm", "qemu", "vmware", "xen", "hyperv"):
info["host_type"] = f"虚拟机({virt.upper()})"
elif virt == "docker":
info["host_type"] = "容器(Docker)"
else:
info["host_type"] = "虚拟机"
except Exception:
pass
return info
FILE:scripts/verification.py
from __future__ import annotations
import json
import logging
import os
import re
from dataclasses import dataclass
from typing import Any, Iterable
logger = logging.getLogger("benchclaw.verification")
@dataclass(frozen=True)
class RuleResult:
rule_type: str
target: str
passed: bool
awarded: int
expected: int
details: str = ""
@dataclass(frozen=True)
class PenaltyResult:
rule_type: str
target: str
triggered: bool
deduction: int
fatal: bool
details: str = ""
@dataclass(frozen=True)
class VerificationResult:
target_type: str
target_path: str
description: str
exists: bool
exist_awarded: int
content_score: int
penalties: list[PenaltyResult]
rule_results: list[RuleResult]
@property
def penalty_deduction(self) -> int:
return sum(p.deduction for p in self.penalties if p.triggered)
@property
def fatal_triggered(self) -> bool:
return any(p.triggered and p.fatal for p in self.penalties)
@property
def total(self) -> int:
# 扣分不在这里结算,交给上层统一处理(避免重复扣)
return self.exist_awarded + self.content_score
@dataclass(frozen=True)
class MetricExtractorResult:
metric_name: str
target_file: str
regex: str
value_type: str
extracted_value: str | int | float | None
extraction_success: bool
details: str = ""
@dataclass(frozen=True)
class TaskVerificationResult:
question_id: str
max_score: int
score_before_penalty: int
penalty_deduction: int
fatal: bool
score: int
verifications: list[VerificationResult]
metrics: list[MetricExtractorResult] | None = None
_REGEX_META = set(r".^$*+?{}[]\|()")
def _looks_like_regex(spec: str) -> bool:
# 题库里同时存在纯路径与 regex(例如 bench_claw/fibonacci\.py$)
return any(ch in _REGEX_META for ch in spec)
def _normalize_rel_path(path: str) -> str:
# 统一用 / 做匹配与输出(题库文件路径也用 /)
return path.replace("\\", "/").lstrip("./")
def _iter_candidate_files(base_dir: str) -> Iterable[str]:
for root, _dirs, files in os.walk(base_dir):
for name in files:
yield os.path.join(root, name)
def resolve_file_paths(workspace_dir: str, file_path_spec: str) -> list[str]:
"""
将题库中的 file_path 解析为真实存在的文件路径列表(workspace_dir 绝对路径)。
- 若 spec 看起来是纯路径:直接拼接并返回(存在与否由上层判断)
- 若 spec 看起来是 regex:在尽可能小的目录范围内 walk 并匹配
"""
spec = _normalize_rel_path(file_path_spec)
if not _looks_like_regex(spec):
return [os.path.join(workspace_dir, spec)]
# 尝试缩小搜索范围:取到第一个 regex 元字符之前的"安全前缀"
prefix_chars: list[str] = []
for ch in spec:
if ch in _REGEX_META:
break
prefix_chars.append(ch)
prefix = "".join(prefix_chars)
search_dir_rel = os.path.dirname(prefix) if prefix else ""
search_dir = os.path.join(workspace_dir, search_dir_rel)
if not os.path.isdir(search_dir):
search_dir = workspace_dir
pattern = re.compile(spec)
matches: list[str] = []
for abs_path in _iter_candidate_files(search_dir):
rel = _normalize_rel_path(os.path.relpath(abs_path, workspace_dir))
if pattern.search(rel):
matches.append(abs_path)
return sorted(matches)
def _read_text_file(path: str, *, max_bytes: int = 2_000_000) -> str:
# best-effort 读取,避免超大文件拖垮评测;默认最多 2MB
with open(path, "rb") as f:
data = f.read(max_bytes + 1)
if len(data) > max_bytes:
data = data[:max_bytes]
return data.decode("utf-8", errors="replace")
def _apply_metric_extractors(
workspace_dir: str,
metric_extractors: list[dict[str, Any]] | None,
) -> list[MetricExtractorResult]:
"""
处理 metric_extractors,从目标文件中提取指标。
Args:
workspace_dir: 工作目录
metric_extractors: 指标提取器配置列表
Returns:
MetricExtractorResult 列表
"""
results: list[MetricExtractorResult] = []
if not metric_extractors:
return results
for extractor in metric_extractors:
if not isinstance(extractor, dict):
continue
metric_name = str(extractor.get("metric_name") or "")
target_file = str(extractor.get("target_file") or "")
regex_pattern = str(extractor.get("regex") or "")
value_type = str(extractor.get("type") or "string")
if not metric_name or not target_file or not regex_pattern:
results.append(MetricExtractorResult(
metric_name=metric_name or "unknown",
target_file=target_file,
regex=regex_pattern,
value_type=value_type,
extracted_value=None,
extraction_success=False,
details="Missing required fields (metric_name, target_file, or regex)",
))
continue
# 构建完整文件路径
file_path = os.path.join(workspace_dir, target_file)
if not os.path.isfile(file_path):
results.append(MetricExtractorResult(
metric_name=metric_name,
target_file=target_file,
regex=regex_pattern,
value_type=value_type,
extracted_value=None,
extraction_success=False,
details=f"Target file not found: {file_path}",
))
continue
try:
text = _read_text_file(file_path)
# 执行正则匹配
match = re.search(regex_pattern, text, flags=re.MULTILINE | re.IGNORECASE)
if not match:
results.append(MetricExtractorResult(
metric_name=metric_name,
target_file=target_file,
regex=regex_pattern,
value_type=value_type,
extracted_value=None,
extraction_success=False,
details="Regex pattern did not match any content",
))
continue
# 提取值
extracted_str = match.group(1) if match.lastindex and match.lastindex >= 1 else match.group(0)
# 根据类型转换
extracted_value: str | int | float | None = None
conversion_success = True
if value_type == "int":
try:
# 清理字符串中的非数字字符(如逗号、空格等)
cleaned = re.sub(r'[^\d.-]', '', extracted_str)
extracted_value = int(float(cleaned))
except (ValueError, TypeError) as e:
conversion_success = False
details = f"Failed to convert '{extracted_str}' to int: {e}"
elif value_type == "float":
try:
cleaned = re.sub(r'[^\d.-]', '', extracted_str)
extracted_value = float(cleaned)
except (ValueError, TypeError) as e:
conversion_success = False
details = f"Failed to convert '{extracted_str}' to float: {e}"
else: # string
extracted_value = extracted_str
details = f"Extracted string value: {extracted_str}"
if conversion_success:
details = f"Successfully extracted {value_type} value: {extracted_value}"
results.append(MetricExtractorResult(
metric_name=metric_name,
target_file=target_file,
regex=regex_pattern,
value_type=value_type,
extracted_value=extracted_value if conversion_success else None,
extraction_success=conversion_success,
details=details,
))
except re.error as e:
results.append(MetricExtractorResult(
metric_name=metric_name,
target_file=target_file,
regex=regex_pattern,
value_type=value_type,
extracted_value=None,
extraction_success=False,
details=f"Invalid regex pattern: {e}",
))
except Exception as e:
results.append(MetricExtractorResult(
metric_name=metric_name,
target_file=target_file,
regex=regex_pattern,
value_type=value_type,
extracted_value=None,
extraction_success=False,
details=f"Extraction error: {e}",
))
return results
def _apply_keyword_match_rule(text: str, rule: dict[str, Any]) -> RuleResult:
"""keyword_match: 检查文本中是否包含指定关键词"""
required_words = rule.get("required_words", [])
match_threshold = rule.get("match_threshold", "all") # "all" 或 "any"
score = int(rule.get("score") or 0)
description = str(rule.get("description") or "")
# 将 required_words 用逗号连接作为 target 参数
target = ",".join(str(w) for w in required_words) if required_words else ""
if not required_words:
return RuleResult("keyword_match", target, True, score, score, "no keywords required")
matched_words = [word for word in required_words if word in text]
if match_threshold == "all":
passed = len(matched_words) == len(required_words)
else: # "any"
passed = len(matched_words) > 0
details = f"matched={len(matched_words)}/{len(required_words)}, words={matched_words}"
return RuleResult("keyword_match", target, passed, score if passed else 0, score, details)
def _apply_keyword_frequency_rule(text: str, rule: dict[str, Any]) -> RuleResult:
"""keyword_frequency: 检查关键词出现次数是否满足最小要求"""
word = str(rule.get("word") or "")
min_count = int(rule.get("min_count") or 0)
score = int(rule.get("score") or 0)
description = str(rule.get("description") or "")
if not word:
return RuleResult("keyword_frequency", "", False, 0, score, "no word specified")
count = text.count(word)
passed = count >= min_count
details = f"count={count}, min_count={min_count}"
return RuleResult("keyword_frequency", word, passed, score if passed else 0, score, details)
def _apply_content_rule(text: str, rule: dict[str, Any]) -> RuleResult:
rule_type = str(rule.get("rule_type") or "")
target = str(rule.get("target") or "")
score = int(rule.get("score") or 0)
description = str(rule.get("description") or target)
if rule_type == "contains":
passed = target in text
return RuleResult(rule_type, target, passed, score if passed else 0, score)
if rule_type == "regex_match":
try:
passed = re.search(target, text, flags=re.MULTILINE) is not None
except re.error as e:
return RuleResult(rule_type, target, False, 0, score, details=f"invalid regex: {e}")
return RuleResult(rule_type, target, passed, score if passed else 0, score)
if rule_type == "regex_count":
min_count = int(rule.get("min_count") or 0)
try:
cnt = len(re.findall(target, text, flags=re.MULTILINE))
passed = cnt >= min_count
except re.error as e:
return RuleResult(rule_type, target, False, 0, score, details=f"invalid regex: {e}")
details = f"count={cnt}, min_count={min_count}"
return RuleResult(rule_type, target, passed, score if passed else 0, score, details=details)
if rule_type == "keyword_match":
return _apply_keyword_match_rule(text, rule)
if rule_type == "keyword_frequency":
return _apply_keyword_frequency_rule(text, rule)
return RuleResult(rule_type, target, False, 0, score, details="unsupported rule_type")
def _apply_penalty_keywords_rule(text: str, rule: dict[str, Any]) -> PenaltyResult:
"""penalty_keywords: 检查是否包含禁用词,触发则扣分"""
forbidden_words = rule.get("forbidden_words", [])
deduction = int(rule.get("deduction") or 0)
fatal = bool(rule.get("fatal") or False)
description = str(rule.get("description") or "")
if not forbidden_words:
return PenaltyResult("penalty_keywords", "", False, 0, fatal)
triggered_words = [word for word in forbidden_words if word in text]
triggered = len(triggered_words) > 0
details = f"forbidden_words_found={triggered_words}" if triggered else ""
return PenaltyResult("penalty_keywords", description, triggered, deduction if triggered else 0, fatal, details)
def _apply_penalty_regex_rule(text: str, rule: dict[str, Any]) -> PenaltyResult:
"""penalty_regex: 使用正则检查是否触发惩罚规则"""
target = str(rule.get("target") or "")
deduction = int(rule.get("deduction") or 0)
fatal = bool(rule.get("fatal") or False)
description = str(rule.get("description") or "")
if not target:
return PenaltyResult("penalty_regex", "", False, 0, fatal)
try:
triggered = re.search(target, text, flags=re.MULTILINE) is not None
except re.error as e:
return PenaltyResult("penalty_regex", target, False, 0, fatal, details=f"invalid regex: {e}")
return PenaltyResult("penalty_regex", description, triggered, deduction if triggered else 0, fatal)
def _apply_penalty_rule(text: str, rule: dict[str, Any]) -> PenaltyResult:
rule_type = str(rule.get("rule_type") or "")
target = str(rule.get("target") or "")
deduction = int(rule.get("deduction") or 0)
fatal = bool(rule.get("fatal") or False)
if rule_type == "contains":
triggered = target in text
return PenaltyResult(rule_type, target, triggered, deduction if triggered else 0, fatal)
if rule_type == "regex_match":
try:
triggered = re.search(target, text, flags=re.MULTILINE) is not None
except re.error as e:
return PenaltyResult(rule_type, target, False, 0, fatal, details=f"invalid regex: {e}")
return PenaltyResult(rule_type, target, triggered, deduction if triggered else 0, fatal)
if rule_type == "penalty_keywords":
return _apply_penalty_keywords_rule(text, rule)
if rule_type == "penalty_regex":
return _apply_penalty_regex_rule(text, rule)
return PenaltyResult(rule_type, target, False, 0, fatal, details="unsupported penalty rule_type")
def _extract_reply_content(workspace_dir: str, target_path: str) -> str:
"""
从 target_path 提取回复内容。
target_path 格式如: "output_content.reply" 表示从 agent 输出中提取 reply 字段
"""
# 目前支持直接从文件读取,或者从 agent_output.json 中解析
# 默认查找 agent_output.json 文件
agent_output_path = os.path.join(workspace_dir, "agent_output.json")
if os.path.exists(agent_output_path):
try:
with open(agent_output_path, "r", encoding="utf-8") as f:
data = json.load(f)
# 解析 target_path,如 "output_content.reply"
parts = target_path.split(".")
current = data
for part in parts:
if isinstance(current, dict) and part in current:
current = current[part]
else:
return ""
if isinstance(current, str):
return current
return json.dumps(current, ensure_ascii=False)
except (json.JSONDecodeError, IOError):
return ""
# 如果 target_path 指向一个存在的文件,直接读取
if os.path.isfile(os.path.join(workspace_dir, target_path)):
return _read_text_file(os.path.join(workspace_dir, target_path))
return ""
def verify_file_target(
*,
workspace_dir: str,
verification: dict[str, Any],
) -> VerificationResult:
"""验证文件类型的目标"""
description = str(verification.get("description") or "")
spec_path = str(verification.get("target_path") or "")
exist_score = int(verification.get("exist_score") or 0)
content_rules = verification.get("content_rules") or []
penalty_rules = verification.get("penalty_rules") or []
logger.info(f"[verify_file] workspace_dir={workspace_dir}, target_path={spec_path}, description={description}")
resolved_paths = resolve_file_paths(workspace_dir, spec_path)
logger.info(f"[verify_file] resolved_paths={resolved_paths}")
existing = [p for p in resolved_paths if os.path.isfile(p)]
exists = len(existing) > 0
exist_awarded = exist_score if exists else 0
logger.info(f"[verify_file] file_exists={exists}, exist_score={exist_score}, exist_awarded={exist_awarded}")
rule_results: list[RuleResult] = []
penalties: list[PenaltyResult] = []
content_score = 0
if exists:
# 多个匹配文件时:取"得分最高"的那个(更符合 regex file_path 的语义)
best_rule_results: list[RuleResult] = []
best_penalties: list[PenaltyResult] = []
best_content_score = -1
for path in existing:
text = _read_text_file(path)
cur_rule_results = []
cur_content_score = 0
if isinstance(content_rules, list):
for idx, r in enumerate(content_rules):
if isinstance(r, dict):
rr = _apply_content_rule(text, r)
cur_rule_results.append(rr)
cur_content_score += rr.awarded
# 记录每个 content_rule 的验证结果
status = "PASS" if rr.passed else "FAIL"
logger.info(f"[verify_file] content_rule[{idx}] {status}: type={rr.rule_type}, target={rr.target}, awarded={rr.awarded}/{rr.expected}, details={rr.details}")
cur_penalties = []
if isinstance(penalty_rules, list):
for idx, r in enumerate(penalty_rules):
if isinstance(r, dict):
pr = _apply_penalty_rule(text, r)
cur_penalties.append(pr)
# 记录每个 penalty_rule 的验证结果
status = "TRIGGERED" if pr.triggered else "OK"
logger.info(f"[verify_file] penalty_rule[{idx}] {status}: type={pr.rule_type}, target={pr.target}, deduction={pr.deduction}, fatal={pr.fatal}")
if cur_content_score > best_content_score:
best_content_score = cur_content_score
best_rule_results = cur_rule_results
best_penalties = cur_penalties
rule_results = best_rule_results
penalties = best_penalties
content_score = max(0, best_content_score)
logger.info(f"[verify_file] total_content_score={content_score}, total_penalties={len([p for p in penalties if p.triggered])}")
return VerificationResult(
target_type="file",
target_path=spec_path,
description=description,
exists=exists,
exist_awarded=exist_awarded,
content_score=content_score,
penalties=penalties,
rule_results=rule_results,
)
def verify_reply_target(
*,
workspace_dir: str,
verification: dict[str, Any],
stdout_content: str = "",
) -> VerificationResult:
"""验证回复类型的目标
Args:
workspace_dir: 工作目录
verification: 验证配置
stdout_content: Agent 输出的 stdout 内容(直接传递,优先于文件读取)
"""
description = str(verification.get("description") or "")
target_path = str(verification.get("target_path") or "")
exist_score = int(verification.get("exist_score") or 0)
content_rules = verification.get("content_rules") or []
penalty_rules = verification.get("penalty_rules") or []
logger.info(f"[verify_reply] target_path={target_path}, description={description}")
# 提取回复内容:优先使用传入的 stdout_content,否则从文件获取
reply_content = stdout_content if stdout_content else _extract_reply_content(workspace_dir, target_path)
exists = bool(reply_content)
exist_awarded = exist_score if exists else 0
logger.info(f"[verify_reply] content_exists={exists}, exist_score={exist_score}, exist_awarded={exist_awarded}, content_length={len(reply_content) if reply_content else 0}")
rule_results: list[RuleResult] = []
penalties: list[PenaltyResult] = []
content_score = 0
if exists:
if isinstance(content_rules, list):
for idx, r in enumerate(content_rules):
if isinstance(r, dict):
rr = _apply_content_rule(reply_content, r)
rule_results.append(rr)
content_score += rr.awarded
# 记录每个 content_rule 的验证结果
status = "PASS" if rr.passed else "FAIL"
logger.info(f"[verify_reply] content_rule[{idx}] {status}: type={rr.rule_type}, target={rr.target}, awarded={rr.awarded}/{rr.expected}, details={rr.details}")
if isinstance(penalty_rules, list):
for idx, r in enumerate(penalty_rules):
if isinstance(r, dict):
pr = _apply_penalty_rule(reply_content, r)
penalties.append(pr)
# 记录每个 penalty_rule 的验证结果
status = "TRIGGERED" if pr.triggered else "OK"
logger.info(f"[verify_reply] penalty_rule[{idx}] {status}: type={pr.rule_type}, target={pr.target}, deduction={pr.deduction}, fatal={pr.fatal}")
logger.info(f"[verify_reply] total_content_score={content_score}, total_penalties={len([p for p in penalties if p.triggered])}")
return VerificationResult(
target_type="reply",
target_path=target_path,
description=description,
exists=exists,
exist_awarded=exist_awarded,
content_score=content_score,
penalties=penalties,
rule_results=rule_results,
)
def verify_single_verification(
*,
workspace_dir: str,
verification: dict[str, Any],
stdout_content: str = "",
) -> VerificationResult:
"""根据 target_type 路由到对应的验证函数
Args:
workspace_dir: 工作目录
verification: 验证配置
stdout_content: Agent 输出的 stdout 内容(用于 reply 类型验证)
"""
target_type = str(verification.get("target_type") or "file")
if target_type == "file":
return verify_file_target(workspace_dir=workspace_dir, verification=verification)
elif target_type == "reply":
return verify_reply_target(workspace_dir=workspace_dir, verification=verification, stdout_content=stdout_content)
elif target_type == "mixed":
# mixed 类型:同时验证文件和回复
# 先尝试验证文件,如果没有匹配再验证回复
file_result = verify_file_target(workspace_dir=workspace_dir, verification=verification)
if file_result.exists:
return file_result
return verify_reply_target(workspace_dir=workspace_dir, verification=verification, stdout_content=stdout_content)
else:
# 未知类型,默认按 file 处理
return verify_file_target(workspace_dir=workspace_dir, verification=verification)
def verify_task_answer(
*,
workspace_dir: str,
question_id: str,
answer: dict[str, Any],
stdout_content: str = "",
) -> TaskVerificationResult:
"""验证任务答案(支持新的 verifications 数组结构)
Args:
workspace_dir: 工作目录
question_id: 题目ID
answer: 答案配置
stdout_content: Agent 输出的 stdout 内容(用于 reply 类型验证)
"""
max_score = int(answer.get("max_score") or 0)
verifications = answer.get("verifications") or []
logger.info(f"[verify_task] question_id={question_id}, max_score={max_score}, verifications_count={len(verifications)}, workspace_dir={workspace_dir}")
verification_results: list[VerificationResult] = []
for idx, v in enumerate(verifications):
if isinstance(v, dict):
target_type = str(v.get("target_type") or "file")
target_path = str(v.get("target_path") or "")
description = str(v.get("description") or "")
logger.info(f"[verify_task] processing verification[{idx}]: type={target_type}, path={target_path}, desc={description}")
result = verify_single_verification(workspace_dir=workspace_dir, verification=v, stdout_content=stdout_content)
verification_results.append(result)
# 记录每个 verification 的汇总结果
status = "PASS" if result.exists else "MISSING"
logger.info(f"[verify_task] verification[{idx}] {status}: target_type={result.target_type}, exists={result.exists}, "
f"exist_awarded={result.exist_awarded}, content_score={result.content_score}, "
f"penalties={len(result.penalties)}, total={result.total}")
# 详细记录 rule_results
for ridx, rr in enumerate(result.rule_results):
status = "PASS" if rr.passed else "FAIL"
logger.info(f"[verify_task] verification[{idx}] rule[{ridx}] {status}: {rr.rule_type}, target={rr.target}, "
f"awarded={rr.awarded}/{rr.expected}, details={rr.details}")
# 详细记录 penalties
for pidx, pr in enumerate(result.penalties):
if pr.triggered:
logger.info(f"[verify_task] verification[{idx}] penalty[{pidx}] TRIGGERED: {pr.rule_type}, target={pr.target}, "
f"deduction={pr.deduction}, fatal={pr.fatal}")
score_before_penalty = sum(vr.total for vr in verification_results)
penalty_deduction = sum(vr.penalty_deduction for vr in verification_results)
fatal = any(vr.fatal_triggered for vr in verification_results)
if fatal:
final_score = 0
else:
final_score = max(0, min(max_score, score_before_penalty - penalty_deduction))
# 处理 metric_extractors
metric_extractors = answer.get("metric_extractors")
metrics_results = _apply_metric_extractors(workspace_dir, metric_extractors)
if metrics_results:
logger.info(f"[verify_task] question_id={question_id} METRICS: extracted {len(metrics_results)} metrics")
for mr in metrics_results:
status = "SUCCESS" if mr.extraction_success else "FAILED"
logger.info(f"[verify_task] metric '{mr.metric_name}': {status}, value={mr.extracted_value}, details={mr.details}")
logger.info(f"[verify_task] question_id={question_id} SUMMARY: max_score={max_score}, "
f"score_before_penalty={score_before_penalty}, penalty_deduction={penalty_deduction}, "
f"fatal={fatal}, final_score={final_score}")
return TaskVerificationResult(
question_id=question_id,
max_score=max_score,
score_before_penalty=score_before_penalty,
penalty_deduction=penalty_deduction,
fatal=fatal,
score=final_score,
verifications=verification_results,
metrics=metrics_results if metrics_results else None,
)
def verify_question_from_questions_json(
*,
questions_json_path: str,
workspace_dir: str,
question_id: str,
stdout_content: str = "",
) -> TaskVerificationResult:
"""
从 questions.json(服务端拉下来的全量题库 JSON)里按 question_id 查找并验证。
Args:
questions_json_path: 题库 JSON 文件路径
workspace_dir: 工作目录
question_id: 题目ID
stdout_content: Agent 输出的 stdout 内容(用于 reply 类型验证)
"""
with open(questions_json_path, "r", encoding="utf-8") as f:
obj = json.load(f)
# 使用标准结构: { success, data: { questions: [...] } }
if not isinstance(obj, dict) or not isinstance(obj.get("data"), dict) or not isinstance(obj["data"].get("questions"), list):
raise ValueError("unrecognized questions.json structure: expected { success, data: { questions: [...] } }")
questions: list[Any] = obj["data"]["questions"]
for q in questions:
if not isinstance(q, dict):
continue
if str(q.get("id") or "") != question_id:
continue
answer = q.get("answer")
if not isinstance(answer, dict):
raise ValueError(f"question {question_id} missing answer")
return verify_task_answer(workspace_dir=workspace_dir, question_id=question_id, answer=answer, stdout_content=stdout_content)
raise KeyError(f"question_id not found: {question_id}")
if __name__ == "__main__":
verify_question_from_questions_json()