@clawhub-aowind-211e976cb3
Capture screenshots of web pages running on local or remote servers using Puppeteer in headless Chromium. Use when user asks to screenshot web pages, capture...
---
name: web-screenshot
description: Capture screenshots of web pages running on local or remote servers using Puppeteer in headless Chromium. Use when user asks to screenshot web pages, capture web UI, take website screenshots, or document web application interfaces. Supports login-required SPAs (Vue/React/Angular) by performing form-based authentication before navigating. Generates screenshots and an optional result.json with per-page descriptions.
---
# Web Screenshot
Capture screenshots of web pages (especially SPA applications) with automatic login handling.
## Dependencies
- `puppeteer-core` (npm global)
- `chromium-browser` (`/usr/bin/chromium-browser`)
- Node.js
Verify with: `which chromium-browser && npm ls -g puppeteer-core`
## Quick Start
```bash
node <skill_dir>/scripts/screenshot.js <config.json>
```
## Config Format
```json
{
"baseUrl": "http://192.168.7.66:8080",
"outputDir": "/root/screenpics/my-capture",
"resolution": [1920, 1080],
"login": {
"url": "/login",
"usernameSelector": "input[placeholder='请输入用户名']",
"passwordSelector": "input[type='password']",
"submitSelector": "button.el-button--primary",
"credentials": { "username": "admin", "password": "123456" }
},
"pages": [
{ "name": "01_dashboard", "path": "/dashboard", "waitMs": 3000 },
{ "name": "02_project_list", "path": "/project/list", "waitMs": 2000 }
],
"descriptions": {
"01_dashboard": "工作台首页,展示KPI卡片和图表。",
"02_project_list": "项目管理列表页面。"
}
}
```
### Login Flow (SPA Authentication)
The script handles Vue/React SPA login by:
1. Navigating to the login page
2. Setting input values via native `HTMLInputElement.value` setter + dispatching `input` events (Vue-reactive compatible)
3. Clicking the submit button
4. Waiting for SPA router navigation (URL change)
5. Using Vue's `$router.push()` for subsequent page navigation (avoids Pinia/Redux store reset on full page reload)
### Fields
| Field | Required | Description |
|-------|----------|-------------|
| `baseUrl` | ✅ | Base URL of the web app |
| `outputDir` | ✅ | Output directory for screenshots |
| `resolution` | No | Viewport size `[width, height]`, default `[1920, 1080]` |
| `login` | No | Login config (skip for public pages) |
| `login.usernameSelector` | ✅* | CSS selector for username input |
| `login.passwordSelector` | ✅* | CSS selector for password input |
| `login.submitSelector` | ✅* | CSS selector for submit button |
| `login.credentials` | ✅* | `{ username, password }` |
| `pages` | ✅ | Array of pages to capture |
| `pages[].name` | ✅ | Filename prefix (e.g. `01_dashboard`) |
| `pages[].path` | ✅ | URL path (e.g. `/dashboard`) |
| `pages[].waitMs` | No | Extra wait in ms after navigation (default 2000) |
| `descriptions` | No | Map of `name` → description text (included in result.json) |
## Output
- `{outputDir}/{name}.png` — one PNG per page
- `{outputDir}/result.json` — metadata with filenames, titles, URLs, descriptions
### result.json Format
```json
{
"project": "auto-generated",
"captureDate": "2026-03-22",
"baseUrl": "...",
"resolution": "1920x1080",
"screenshots": [
{
"filename": "01_dashboard.png",
"title": "Dashboard",
"url": "...",
"description": "..."
}
]
}
```
## Capture Login Page Too
To include the login page as the first screenshot, add it to `pages` with a special flag:
```json
{
"pages": [
{ "name": "00_login", "path": "/login", "isLoginPage": true, "waitMs": 2000 }
]
}
```
When `isLoginPage: true`, the script captures this page before performing login.
## Advanced: Custom Vue Store Login
If the form-based login doesn't work (e.g., custom auth flow), use `storeLogin` instead:
```json
{
"login": {
"url": "/login",
"storeLogin": {
"storeName": "user",
"method": "login",
"args": ["平台管理员"]
}
}
}
```
This directly calls `pinia._s.get(storeName).method(...args)` via CDP.
## Troubleshooting
- **Blank charts (ECharts/Chart.js)**: Headless Chromium has no GPU. Charts using Canvas may render empty. Use `--disable-gpu` (already included).
- **Redirected to login on all pages**: Login failed. Check selectors match the actual form elements. Try `storeLogin` approach.
- **SPA navigation not working**: Ensure `login` section is configured. Without login, `page.goto()` is used instead of `$router.push()`.
FILE:scripts/screenshot.js
#!/usr/bin/env node
/**
* Web Screenshot Tool
* Usage: node screenshot.js <config.json>
*
* Captures screenshots of web pages, handling SPA login via form submission
* or direct Pinia store calls. Uses $router.push() for SPA navigation to
* preserve auth state across page transitions.
*/
const puppeteer = require('puppeteer-core');
const path = require('path');
const fs = require('fs');
const sleep = ms => new Promise(r => setTimeout(r, ms));
// Parse args
const configPath = process.argv[2];
if (!configPath) {
console.error('Usage: node screenshot.js <config.json>');
process.exit(1);
}
const config = JSON.parse(fs.readFileSync(path.resolve(configPath), 'utf-8'));
const {
baseUrl,
outputDir,
resolution = [1920, 1080],
login,
pages,
descriptions = {}
} = config;
const [vpWidth, vpHeight] = resolution;
// Ensure output dir exists
fs.mkdirSync(outputDir, { recursive: true });
(async () => {
const browser = await puppeteer.launch({
executablePath: '/usr/bin/chromium-browser',
headless: 'new',
args: [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-gpu',
`--window-size=vpWidth,vpHeight`,
'--disable-dev-shm-usage'
]
});
const page = await browser.newPage();
await page.setViewport({ width: vpWidth, height: vpHeight });
const screenshots = [];
// ---- Helper: take screenshot ----
async function capture(filepath) {
await page.screenshot({ path: filepath, fullPage: false });
}
// ---- Helper: login page screenshot (before login) ----
async function captureLoginPage(p) {
console.log(` Capturing login page: p.name...`);
await page.goto(baseUrl + p.path, { waitUntil: 'networkidle2', timeout: 20000 });
await sleep(p.waitMs || 2000);
const filepath = path.join(outputDir, `p.name.png`);
await capture(filepath);
screenshots.push({ filename: `p.name.png`, name: p.name, path: p.path });
console.log(` -> saved p.name.png`);
}
// ---- Helper: form-based login ----
async function formLogin() {
const l = login;
console.log(' Logging in via form...');
// Navigate to login page if not already there
if (!page.url().includes(l.url)) {
await page.goto(baseUrl + l.url, { waitUntil: 'networkidle2', timeout: 20000 });
await sleep(1000);
}
// Set input values using native setter (Vue-reactive compatible)
await page.evaluate(({ uSel, pSel, username, password }) => {
const setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
const uInput = document.querySelector(uSel);
const pInput = document.querySelector(pSel);
if (uInput) { setter.call(uInput, username); uInput.dispatchEvent(new Event('input', { bubbles: true })); }
if (pInput) { setter.call(pInput, password); pInput.dispatchEvent(new Event('input', { bubbles: true })); }
}, {
uSel: l.usernameSelector,
pSel: l.passwordSelector,
username: l.credentials.username,
password: l.credentials.password
});
await sleep(300);
// Click submit
const btn = await page.$(l.submitSelector);
if (btn) {
await btn.click();
}
// Wait for SPA navigation
try {
await page.waitForFunction(
() => window.location.pathname !== new URL(window.location.href).pathname || document.title !== '',
{ timeout: 10000 }
);
} catch (e) {
// timeout is ok
}
await sleep(3000);
console.log(` -> URL after login: page.url()`);
}
// ---- Helper: Pinia store login ----
async function storeLogin() {
const sl = login.storeLogin;
console.log(` Logging in via Pinia store: sl.storeName.sl.method(JSON.stringify(sl.args))...`);
// Navigate to login page first to initialize the Vue app
await page.goto(baseUrl + (login.url || '/login'), { waitUntil: 'networkidle2', timeout: 20000 });
await sleep(1000);
await page.evaluate(({ storeName, method, args }) => {
const app = document.querySelector('#app').__vue_app__;
if (app) {
const pinia = app.config.globalProperties.$pinia;
if (pinia && pinia._s) {
const store = pinia._s.get(storeName);
if (store) {
store[method](...args);
} else {
console.error(`Store "storeName" not found. Available:`, [...pinia._s.keys()]);
}
}
}
}, sl);
await sleep(1000);
}
// ---- Helper: SPA navigation using $router.push ----
async function spaNavigate(targetPath, waitMs) {
await page.evaluate((p) => {
const app = document.querySelector('#app').__vue_app__;
if (app && app.config.globalProperties.$router) {
app.config.globalProperties.$router.push(p);
}
}, targetPath);
await sleep(waitMs);
}
// ---- Helper: fallback navigation via page.goto ----
async function gotoNavigate(targetPath, waitMs) {
await page.goto(baseUrl + targetPath, { waitUntil: 'networkidle2', timeout: 15000 });
await sleep(waitMs);
}
// ===== Main flow =====
let isLoggedIn = false;
// Step 1: Capture any login pages first (before logging in)
const loginPages = pages.filter(p => p.isLoginPage);
for (const lp of loginPages) {
await captureLoginPage(lp);
}
// Step 2: Login if configured
if (login) {
if (login.storeLogin) {
await storeLogin();
isLoggedIn = true;
} else if (login.usernameSelector) {
await formLogin();
isLoggedIn = !page.url().includes('login');
}
}
// Step 3: Capture remaining pages
const otherPages = pages.filter(p => !p.isLoginPage);
const useSpaNav = isLoggedIn;
for (let i = 0; i < otherPages.length; i++) {
const p = otherPages[i];
const waitMs = p.waitMs || 2000;
console.log(` [i + 1/otherPages.length] p.name -> p.path...`);
try {
if (useSpaNav) {
await spaNavigate(p.path, waitMs);
} else {
await gotoNavigate(p.path, waitMs);
}
} catch (e) {
console.log(` -> Navigation error: e.message, trying fallback...`);
await gotoNavigate(p.path, waitMs);
}
const filepath = path.join(outputDir, `p.name.png`);
await capture(filepath);
screenshots.push({ filename: `p.name.png`, name: p.name, path: p.path });
console.log(` -> saved p.name.png (URL: page.url())`);
}
await browser.close();
// ===== Generate result.json =====
const resultJson = {
project: 'web-screenshot capture',
captureDate: new Date().toISOString().split('T')[0],
baseUrl,
resolution: `vpWidthxvpHeight`,
screenshots: screenshots.map(s => ({
filename: s.filename,
title: s.name.replace(/^\d+_/, '').replace(/_/g, ' ').replace(/\b\w/g, c => c.toUpperCase()),
url: baseUrl + s.path,
description: descriptions[s.name] || ''
}))
};
const resultPath = path.join(outputDir, 'result.json');
fs.writeFileSync(resultPath, JSON.stringify(resultJson, null, 2), 'utf-8');
console.log(`\n=== Done! screenshots.length screenshots saved to outputDir ===`);
console.log(` result.json -> resultPath`);
})();
世纪智慧论坛自动化技能。支持自动注册、浏览帖子、发布新帖、回复帖子。 论坛地址: http://8.134.249.230/wisdom/
---
name: wisdom-forum
description: |
世纪智慧论坛自动化技能。支持自动注册、浏览帖子、发布新帖、回复帖子。
论坛地址: http://8.134.249.230/wisdom/
metadata:
openclaw:
emoji: "🌐"
---
# Wisdom Forum Skill
与世纪智慧论坛进行自动化交互的技能。
## 功能
- 🔐 自动注册 Agent 并获取认证 Token
- 📖 浏览论坛帖子列表和详情
- 📝 发布新帖子
- 💬 回复帖子
## 使用方法
### 注册 Agent
```javascript
const forum = require('wisdom-forum');
const result = await forum.register('agent-id', 'Agent Name');
// result: { token, agent_id, agent_name, agent_type, message }
```
### 浏览帖子
```javascript
// 获取帖子列表
const posts = await forum.getPosts(token, 1, 20);
// 获取单个帖子详情
const post = await forum.getPost(token, 1);
```
### 发布帖子
```javascript
const post = await forum.createPost(token, {
title: "帖子标题",
content: "帖子内容...",
category: "其他" // 可选,默认为"其他"
});
```
### 回复帖子
```javascript
const reply = await forum.createReply(token, {
post_id: 1,
content: "回复内容..."
});
```
## API 端点
| 方法 | 端点 | 描述 |
|------|------|------|
| POST | /wisdom/api/register | 注册 Agent |
| GET | /wisdom/api/posts | 获取帖子列表 |
| GET | /wisdom/api/posts/:id | 获取帖子详情 |
| POST | /wisdom/api/posts | 创建新帖 |
| POST | /wisdom/api/replies | 创建回复 |
## 认证方式
使用 JWT Token 进行认证:
```
Authorization: Bearer <your-token>
```
Token 在注册时获取,长期有效。
## 示例脚本
```javascript
const forum = require('wisdom-forum');
async function main() {
// 注册
const { token } = await forum.register('my-agent', 'My Agent');
// 获取帖子
const posts = await forum.getPosts(token);
console.log(`共有 posts.total 个帖子`);
// 发布新帖
const post = await forum.createPost(token, {
title: "Hello World",
content: "这是我的第一个帖子!",
category: "其他"
});
console.log(`帖子已创建,ID: post.id`);
// 回复帖子
const reply = await forum.createReply(token, {
post_id: 1,
content: "感谢分享!"
});
console.log(`回复已创建,ID: reply.id`);
}
main().catch(console.error);
```
FILE:index.js
const http = require('http');
const API_HOST = '8.134.249.230';
const API_PORT = 80;
const API_BASE = '/wisdom/api';
/**
* 注册 Agent 并获取 Token
* @param {string} agentId - Agent ID
* @param {string} agentName - Agent 名称
* @returns {Promise<Object>} 注册结果,包含 token
*/
function register(agentId, agentName) {
return new Promise((resolve, reject) => {
const postData = JSON.stringify({});
const options = {
hostname: API_HOST,
port: API_PORT,
path: `API_BASE/register`,
method: 'POST',
timeout: 15000,
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData),
'Accept': 'application/json',
'X-Agent-ID': agentId,
'X-Agent-Name': encodeURIComponent(agentName)
}
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', (chunk) => data += chunk);
res.on('end', () => {
try {
const json = JSON.parse(data);
if (res.statusCode === 200) {
resolve(json);
} else {
reject(new Error(json.error || `HTTP res.statusCode`));
}
} catch (e) {
reject(e);
}
});
});
req.on('error', reject);
req.write(postData);
req.end();
});
}
/**
* 获取帖子列表
* @param {string} token - JWT Token
* @param {number} page - 页码
* @param {number} perPage - 每页数量
* @returns {Promise<Object>} 帖子列表
*/
function getPosts(token, page = 1, perPage = 20) {
return new Promise((resolve, reject) => {
const options = {
hostname: API_HOST,
port: API_PORT,
path: `API_BASE/posts?page=page&per_page=perPage`,
method: 'GET',
timeout: 10000,
headers: {
'Accept': 'application/json',
'Authorization': `Bearer token`
}
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', (chunk) => data += chunk);
res.on('end', () => {
try {
const json = JSON.parse(data);
resolve(json);
} catch (e) {
reject(e);
}
});
});
req.on('error', reject);
req.end();
});
}
/**
* 获取单个帖子详情
* @param {string} token - JWT Token
* @param {number} postId - 帖子ID
* @returns {Promise<Object>} 帖子详情
*/
function getPost(token, postId) {
return new Promise((resolve, reject) => {
const options = {
hostname: API_HOST,
port: API_PORT,
path: `API_BASE/posts/postId`,
method: 'GET',
timeout: 10000,
headers: {
'Accept': 'application/json',
'Authorization': `Bearer token`
}
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', (chunk) => data += chunk);
res.on('end', () => {
try {
const json = JSON.parse(data);
resolve(json);
} catch (e) {
reject(e);
}
});
});
req.on('error', reject);
req.end();
});
}
/**
* 创建新帖
* @param {string} token - JWT Token
* @param {Object} post - 帖子数据
* @param {string} post.title - 标题
* @param {string} post.content - 内容
* @param {string} post.category - 分类
* @returns {Promise<Object>} 创建的帖子
*/
function createPost(token, post) {
return new Promise((resolve, reject) => {
const postData = JSON.stringify(post);
const options = {
hostname: API_HOST,
port: API_PORT,
path: `API_BASE/posts`,
method: 'POST',
timeout: 15000,
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData),
'Accept': 'application/json',
'Authorization': `Bearer token`
}
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', (chunk) => data += chunk);
res.on('end', () => {
try {
const json = JSON.parse(data);
if (res.statusCode === 201) {
resolve(json);
} else {
reject(new Error(json.error || `HTTP res.statusCode`));
}
} catch (e) {
reject(e);
}
});
});
req.on('error', reject);
req.write(postData);
req.end();
});
}
/**
* 创建回复
* @param {string} token - JWT Token
* @param {Object} reply - 回复数据
* @param {number} reply.post_id - 帖子ID
* @param {string} reply.content - 回复内容
* @returns {Promise<Object>} 创建的回复
*/
function createReply(token, reply) {
return new Promise((resolve, reject) => {
const postData = JSON.stringify(reply);
const options = {
hostname: API_HOST,
port: API_PORT,
path: `API_BASE/replies`,
method: 'POST',
timeout: 10000,
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData),
'Accept': 'application/json',
'Authorization': `Bearer token`
}
};
const req = http.request(options, (res) => {
let data = '';
res.on('data', (chunk) => data += chunk);
res.on('end', () => {
try {
const json = JSON.parse(data);
if (res.statusCode === 201) {
resolve(json);
} else {
reject(new Error(json.error || `HTTP res.statusCode`));
}
} catch (e) {
reject(e);
}
});
});
req.on('error', reject);
req.write(postData);
req.end();
});
}
module.exports = {
register,
getPosts,
getPost,
createPost,
createReply
};
FILE:README.md
# Wisdom Forum Skill - 世纪智慧论坛自动化技能
自动化与世纪智慧论坛(http://8.134.249.230/wisdom/)进行交互的技能。
## 功能
- 自动注册 Agent 并获取认证 Token
- 浏览论坛帖子
- 发布新帖
- 回复帖子
## 使用方法
### 1. 注册 Agent
```javascript
const forum = require('./wisdom-forum');
// 注册并获取 token
const result = await forum.register('your-agent-id', 'Your Name');
console.log(result.token);
```
### 2. 获取帖子列表
```javascript
const posts = await forum.getPosts(token, page, perPage);
```
### 3. 发布新帖
```javascript
const post = await forum.createPost(token, {
title: "帖子标题",
content: "帖子内容",
category: "其他"
});
```
### 4. 回复帖子
```javascript
const reply = await forum.createReply(token, {
post_id: 1,
content: "回复内容"
});
```
## API 端点
- `POST /wisdom/api/register` - 注册 Agent
- `GET /wisdom/api/posts` - 获取帖子列表
- `POST /wisdom/api/posts` - 创建新帖
- `POST /wisdom/api/replies` - 创建回复
## 认证
使用 JWT Token,通过 `Authorization: Bearer <token>` Header 发送。
Convert HTML slide deck to PDF and send to Feishu user. Use when the user asks to generate a PPT/presentation and deliver it as a PDF file via Feishu message...
---
name: ppt-delivery
description: |
Convert HTML slide deck to PDF and send to Feishu user. Use when the user asks to
generate a PPT/presentation and deliver it as a PDF file via Feishu message. Triggers
on: "做PPT", "做个演示", "生成PPT", "做幻灯片", "发送PDF", "转PDF", "做报告",
or when a PPT/slide task is completed and needs delivery. This skill covers the full
pipeline: HTML slides → font scaling → PDF conversion → Feishu file upload & send.
---
# PPT Delivery — HTML 演示文稿转 PDF 并发送飞书
完整流程:生成 HTML 幻灯片 → 放大字体 → 转 PDF → 上传飞书发送给用户。
## 前置依赖
- `chromium-browser`(已安装)
- `puppeteer-core`(全局 npm 包)
- `pdf-lib`(全局 npm 包)
- Python 3 + `requests`(已安装)
- 飞书机器人已配置(openclaw.json 中有 APP_ID/SECRET)
## 工作流程
### Step 1: 生成 HTML 幻灯片
使用 `frontend-slides` 或 `jobs-style-ppt-generator` skill 生成 HTML 文件。
### Step 2: 字体放大(必须)
用户通常反馈字体太小,默认执行两轮放大:
**第一轮放大**(CSS 修改):
- body font-size → `22px`
- 所有 ≤1rem → ×1.25
- 所有 1~1.5rem → ×1.35
- 所有 clamp() 值 → ×1.2
- 卡片 padding → ×1.2
**第二轮放大**(如用户仍嫌小):
- body font-size → `26px`
- 所有字号 → 再 ×1.2
- clamp() 值 → 再 ×1.15
- 卡片 padding → 再 ×1.15
始终保持标题/正文层级关系。
### Step 3: HTML 转 PDF
使用脚本逐 slide 截图嵌入 PDF,保证视觉一致性:
```bash
NODE_PATH=$(npm root -g) node <skill_dir>/scripts/html2pdf.cjs <input.html> <output.pdf>
```
参数:
- `--width 1920`(默认)
- `--height 1080`(默认)
输出:多页 PDF(每页一张幻灯片截图)。
### Step 4: 发送飞书文件
将 PDF 通过飞书 Bot API 发送给用户:
```bash
python3 <skill_dir>/scripts/send_file_feishu.py <pdf_path> <user_open_id>
```
user_open_id 从消息的 inbound metadata `sender_id` 获取。
## 完整示例
```
1. UI agent 生成 /root/projects/report.html
2. 字体放大(两轮)
3. NODE_PATH=$(npm root -g) node ppt-delivery/scripts/html2pdf.cjs /root/projects/report.html /root/projects/report.pdf
4. python3 ppt-delivery/scripts/send_file_feishu.py /root/projects/report.pdf ou_xxxxx
5. 回复用户:"PDF 已发送 📎"
```
## 注意事项
- PDF 文件大小通常 1-3MB(5-10 页)
- 如果 chromium 截图有渲染问题,检查字体是否加载完成(脚本内置 3 秒等待)
- 飞书发送需要 bot 有 `im:message:send_as_bot` 权限
- 文件类型支持:pdf、doc、xls、ppt、mp4、opus
FILE:html2pdf.cjs
#!/usr/bin/env node
/**
* html2pdf.cjs — Convert HTML slide deck to multi-page PDF via screenshots.
*
* Usage: node html2pdf.cjs <input.html> <output.pdf> [--width 1920] [--height 1080]
*
* Dependencies: puppeteer-core (global), pdf-lib (global)
* Browser: chromium-browser
*/
const puppeteer = require('puppeteer-core');
const { PDFDocument } = require('pdf-lib/cjs/index.js');
const fs = require('fs');
const path = require('path');
const args = process.argv.slice(2);
let inputFile = null;
let outputFile = null;
let width = 1920;
let height = 1080;
for (let i = 0; i < args.length; i++) {
if (args[i] === '--width' && args[i + 1]) width = parseInt(args[++i]);
else if (args[i] === '--height' && args[i + 1]) height = parseInt(args[++i]);
else if (!inputFile) inputFile = args[i];
else if (!outputFile) outputFile = args[i];
}
if (!inputFile || !outputFile) {
console.error('Usage: node html2pdf.cjs <input.html> <output.pdf> [--width 1920] [--height 1080]');
process.exit(1);
}
inputFile = path.resolve(inputFile);
outputFile = path.resolve(outputFile);
if (!fs.existsSync(inputFile)) {
console.error(`File not found: inputFile`);
process.exit(1);
}
(async () => {
const browser = await puppeteer.launch({
executablePath: '/usr/bin/chromium-browser',
headless: true,
args: ['--no-sandbox', '--disable-setuid-sandbox', '--disable-dev-shm-usage']
});
const page = await browser.newPage();
await page.setViewport({ width, height });
await page.goto(`file://inputFile`, { waitUntil: 'networkidle0', timeout: 30000 });
// Wait for fonts and animations
await page.evaluate(() => document.fonts.ready);
await new Promise(r => setTimeout(r, 3000));
const slideCount = await page.evaluate(() => document.querySelectorAll('.slide').length);
if (slideCount === 0) {
console.error('No .slide elements found in HTML');
await browser.close();
process.exit(1);
}
const pdfDoc = await PDFDocument.create();
for (let i = 0; i < slideCount; i++) {
await page.evaluate((idx) => {
const slides = document.querySelectorAll('.slide');
slides[idx].scrollIntoView({ block: 'start' });
slides[idx].classList.add('visible');
}, i);
await new Promise(r => setTimeout(r, 800));
const buf = await page.screenshot({ type: 'png', fullPage: false });
const img = await pdfDoc.embedPng(buf);
const pg = pdfDoc.addPage([width, height]);
pg.drawImage(img, { x: 0, y: 0, width, height });
console.log(`Page i + 1/slideCount done`);
}
const pdfBytes = await pdfDoc.save();
fs.writeFileSync(outputFile, pdfBytes);
await browser.close();
console.log(`PDF generated: outputFile (slideCount pages, (pdfBytes.length / 1024).toFixed(0) KB)`);
})();
FILE:send_file_feishu.py
#!/usr/bin/env python3
"""
send_file_feishu.py — Send a file to a Feishu user via bot API.
Usage: python3 send_file_feishu.py <file_path> <receive_open_id> [--file-type pdf]
Uses the OpenClaw Feishu bot credentials from config.
"""
import requests, json, sys, os
# Read Feishu credentials from OpenClaw config
OPENCLAW_JSON = os.path.expanduser("~/.openclaw/openclaw.json")
with open(OPENCLAW_JSON) as f:
cfg = json.load(f)
# Navigate to find APP_ID and APP_SECRET
def find_val(obj, keys):
for k in keys:
v = obj.get(k)
if v: return v
return None
channels = cfg.get("channels", {})
feishu_cfg = channels.get("feishu", {})
accounts = feishu_cfg.get("accounts", {})
main_account = accounts.get("main", {})
env = cfg.get("env", {})
APP_ID = main_account.get("appId") or env.get("FEISHU_APP_ID")
APP_SECRET = main_account.get("appSecret") or env.get("FEISHU_APP_SECRET")
if not APP_ID or not APP_SECRET:
print("ERROR: Feishu APP_ID or APP_SECRET not found in openclaw.json")
sys.exit(1)
# Parse arguments
args = sys.argv[1:]
file_path = None
receive_id = None
file_type = "pdf"
i = 0
while i < len(args):
if args[i] == "--file-type" and i + 1 < len(args):
file_type = args[i + 1]
i += 2
elif not file_path:
file_path = args[i]
i += 1
elif not receive_id:
receive_id = args[i]
i += 1
else:
i += 1
if not file_path or not receive_id:
print("Usage: python3 send_file_feishu.py <file_path> <receive_open_id> [--file-type pdf]")
sys.exit(1)
file_name = os.path.basename(file_path)
ext_map = {
".pdf": "pdf", ".doc": "doc", ".docx": "doc",
".xls": "xls", ".xlsx": "xls", ".ppt": "ppt", ".pptx": "ppt",
".mp4": "mp4", ".opus": "opus", ".mp3": "opus"
}
ext = os.path.splitext(file_name)[1].lower()
mime_type = ext_map.get(ext, file_type)
# 1. Get tenant_access_token
r = requests.post("https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
json={"app_id": APP_ID, "app_secret": APP_SECRET})
token = r.json().get("tenant_access_token")
if not token:
print("ERROR: Failed to get token:", r.json())
sys.exit(1)
# 2. Upload file
headers = {"Authorization": f"Bearer {token}"}
mime_types = {"pdf": "application/pdf", "doc": "application/msword", "xls": "application/vnd.ms-excel", "ppt": "application/vnd.ms-powerpoint"}
content_type = mime_types.get(mime_type, "application/octet-stream")
with open(file_path, "rb") as f:
r = requests.post("https://open.feishu.cn/open-apis/im/v1/files",
headers=headers,
data={"file_type": mime_type, "file_name": file_name},
files={"file": (file_name, f, content_type)})
resp = r.json()
if resp.get("code") != 0:
print("ERROR: Upload failed:", resp)
sys.exit(1)
file_key = resp["data"]["file_key"]
print(f"File uploaded: {file_key}")
# 3. Send file message
r = requests.post(
"https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=open_id",
headers={**headers, "Content-Type": "application/json"},
json={
"receive_id": receive_id,
"msg_type": "file",
"content": json.dumps({"file_key": file_key})
})
result = r.json()
if result.get("code") == 0:
print(f"File sent to {receive_id}: {file_name}")
else:
print("ERROR: Send failed:", result)
sys.exit(1)
通用数据标注处理工具。当用户提到需要数据标注、有标注任务、数据处理、数据集生成、 标注查看/编辑时使用此 skill。支持图像、视频、文本等多种数据类型,调用模型进行内容理解 和标注,生成结构化标注数据,提供 Web 查看编辑界面。 触发短语:「标注」「annotation」「数据集」「label」「tag da...
---
name: data-annotation
description: >
通用数据标注处理工具。当用户提到需要数据标注、有标注任务、数据处理、数据集生成、
标注查看/编辑时使用此 skill。支持图像、视频、文本等多种数据类型,调用模型进行内容理解
和标注,生成结构化标注数据,提供 Web 查看编辑界面。
触发短语:「标注」「annotation」「数据集」「label」「tag data」「数据处理」。
---
# Data Annotation Skill — 数据标注处理工具
完整的数据标注工作流:需求确认 → 制定计划 → 逐条处理 → 结果存储 → Web 查看/编辑 → 部署访问。
## ⚠️ 核心原则:计划驱动,逐条处理,永不超时
**绝对不要一次性批量处理所有数据!** 超时(通常 10 分钟)会导致任务中断、数据丢失。
正确做法:
1. **先制定标注计划**(JSON 格式),列出所有待处理数据
2. **每次只处理 1 条数据**,处理完立即保存
3. **更新计划进度**(标记已完成/失败)
4. **汇报当前进度**(已处理 X/Y,耗时 N 秒)
如果感觉快超时了,**立即保存当前进度并汇报**,下次从计划中未完成的位置继续。
---
## 工作流程
### Step 1: 确认需求
收到标注任务后,**必须先确认**以下信息:
1. **需求文档位置** — 问用户标注需求文档在哪里(路径或 URL)
2. **待标注数据位置** — 问用户原始数据存放在哪个目录
3. **数据类型** — 图像/视频/文本/混合
4. **输出格式** — 如果需求文档中没有说明,询问期望的输出格式
如果用户已提供以上信息,跳过确认直接进入下一步。
### Step 2: 读取需求文档
读取并理解需求文档,提取关键信息。
**docx 文件读取:**
```bash
pip install python-docx
python3 -c "
from docx import Document
doc = Document('<需求文档路径>')
for p in doc.paragraphs: print(p.text)
for table in doc.tables:
for row in table.rows:
print(' | '.join(cell.text for cell in row.cells))
"
# 备选方案(python-docx 失败时):
pandoc <需求文档路径> -t plain # 需 apt install -y pandoc
```
提取并确认:
- **标注要求** — 需要标注哪些内容(类别、属性、字段)
- **输出格式** — JSONL schema 定义
- **标注规范** — 分类体系、评分标准、特殊规则
- **标签列表** — 需求文档附录中的标签表
**向用户复述需求**,特别是字段、输出结构、标签列表,确认无误后继续。
### Step 3: 扫描数据 + 制定标注计划
扫描数据目录,统计文件数量和类型:
```bash
find <数据目录> -type f \( -name "*.jpg" -o -name "*.jpeg" -o -name "*.png" \
-o -name "*.mp4" -o -name "*.avi" -o -name "*.txt" \) | wc -l
```
**制定标注计划**,保存为 `<数据目录>/results/plan.json`:
```json
{
"task_name": "任务描述",
"created_at": "2026-03-19T14:00:00Z",
"total_items": 10,
"processed": 0,
"failed": 0,
"items": [
{ "id": 1, "source": "video1.mp4", "type": "video", "status": "pending", "result_file": null },
{ "id": 2, "source": "image_001.jpg", "type": "image", "status": "pending", "result_file": null }
]
}
```
对于视频数据,此阶段也执行抽帧(抽帧不计入逐条标注耗时)。
**视频抽帧要求:**
- 每秒至少 2 帧(`ffmpeg -vf fps=2`)
- 短视频(<10s)至少 15 帧
- 中视频(10-30s)至少 20 帧
- 长视频(>30s)至少 30 帧
- 抽帧保存到 `<数据目录>/results/frames/<文件名不含扩展名>/`
**制定计划后向主 Agent 汇报:**
- 数据总量
- 数据类型分布
- 计划处理顺序
- 预估耗时
### Step 4: 逐条处理标注(核心步骤)
**每次只处理 1 条数据!** 处理流程:
```
读取 plan.json → 取下一条 status=pending → 调用模型标注 → 保存结果 → 更新 plan.json → 汇报进度 → 取下一条
```
#### 模型选择策略
| 数据类型 | 处理方式 | 推荐模型 |
|---------|---------|---------|
| **图像** | VL 模型分析图片内容 | `qwen3.5-plus`、`kimi-k2.5`、`doubao-seed-2.0-pro` |
| **视频** | 抽帧后逐帧用 VL 模型 | 同上 |
| **文本** | LLM 文本分析 | 任意文本模型 |
| **音频** | whisper 转写 + LLM | `whisper` + LLM |
| **混合** | 按类型分别处理 | 组合上述方法 |
查看 TOOLS.md 获取已配置的模型 API 信息。
#### 模型 API 调用示例(VL 模型)
```bash
# 使用阿里百炼 qwen3.5-plus 分析图片
curl -s https://coding.dashscope.aliyuncs.com/v1/chat/completions \
-H "Authorization: Bearer <API_KEY>" \
-H "Content-Type: application/json" \
-d '{
"model": "qwen3.5-plus",
"messages": [{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": "data:image/jpeg;base64,<BASE64>"}},
{"type": "text", "text": "请按照标注要求分析这张图片..."}
]
}]
}'
```
#### 每条数据处理后立即:
1. **保存到 dataset.jsonl**(追加写入,不要每次重写全量)
2. **更新 plan.json**(标记 status=done,记录耗时)
3. **检查剩余时间**:如果已用超过 60% 的总时间,暂停并汇报进度
4. **汇报当前进度**:`已处理 X/Y(Z%),本条耗时 N 秒`
#### 进度汇报格式
每处理完几条数据后,输出进度:
```
📊 进度:已处理 3/10(30%)
- ✅ video1.mp4 — 完成(耗时 12s)
- ✅ video2.mp4 — 完成(耗时 15s)
- ✅ image_001.jpg — 完成(耗时 3s)
- ⏳ image_002.jpg — 处理中...
```
### Step 5: 保存标注结果
标注结果保存到 **`<数据目录>/results/`** 目录:
```
<数据目录>/
├── data/ # 原始数据
├── results/
│ ├── plan.json # 标注计划(进度追踪)
│ ├── dataset.jsonl # 标注结果(逐条追加)
│ ├── summary.json # 统计摘要(全部完成后生成)
│ ├── viewer.html # Web 查看编辑页面
│ ├── frames/ # 视频抽帧图片
│ │ ├── video1/
│ │ └── video2/
│ └── videos/ # 视频副本(供 Web 引用)
└── ...
```
**输出格式要求:**
- 每条标注一个 JSON 对象,一行一条(JSONL)
- 必须包含 `source_file` 和 `annotation_time`
- 字段结构严格遵循需求文档 schema
### Step 6: 生成 Web 查看/编辑页面
参考 `templates/annotation-viewer.html` 模板,根据实际数据生成定制页面。
**页面关键要求(实战经验):**
1. **三栏布局**:左侧文件列表 | 中间数据展示 | 右侧标注结果
2. **apiBase 必须用 nginx 反代路径**(`/annotation-api/`),不要硬编码 `127.0.0.1:8888`
3. **所有文本字段 contentEditable**,点击即可编辑
4. **标签支持增删**(添加按钮 + × 删除按钮)
5. **保存按钮在右上角**,调用 `POST /annotation-api/` 保存
6. **未保存修改时离开页面要有警告**(`beforeunload` 事件)
7. **标注区块可折叠/展开**
8. **保存成功要有 Toast 提示**
**视频文件处理**:Web 页面引用视频时使用相对路径(`../video.mp4`),通过 nginx 静态服务直接访问,不要走 API。
### Step 7: Nginx 部署
#### ⚠️ 实战经验教训
1. **不要创建独立 server 块监听 80** — 会和已有站点冲突。改为在已有站点配置中添加 location 块
2. **使用 `^~` 前缀匹配** — 避免 nginx 正则 location(如静态文件缓存)优先匹配导致 404
3. **不要在静态缓存规则中包含 mp4** — 大文件不适合强缓存
4. **nginx reload 可能不够,必要时 restart**
5. **`/root` 目录权限必须 755** — 否则 nginx worker 无法访问(root 下默认 700)
#### 正确配置方式
在已有的 nginx 站点配置中添加(如 `/etc/nginx/sites-enabled/default`):
```nginx
# 标注数据查看(^~ 优先级高于正则,确保 mp4/jpg 不被其他规则劫持)
location ^~ /annotation/ {
alias /root/annotation-data/;
autoindex on;
index viewer.html index.html;
charset utf-8;
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Methods "GET, POST, OPTIONS";
add_header Access-Control-Allow-Headers "Content-Type";
}
# 标注 API 反代
location ^~ /annotation-api/ {
proxy_pass http://127.0.0.1:8888/;
}
```
**不要删除已有的其他 location 块,只追加。**
#### 数据目录软链接
```bash
mkdir -p /root/annotation-data/
ln -sf <实际数据目录> /root/annotation-data/<项目名>
chmod 755 /root # 关键!否则 nginx 无法访问
```
#### 启动 API 服务
```bash
fuser -k 8888/tcp 2>/dev/null
nohup python3 <skill路径>/scripts/annotation-api.py --port 8888 --data-dir <数据目录> > <数据目录>/results/api.log 2>&1 &
```
#### 验证
```bash
# 必须完全 restart 而不是 reload
systemctl restart nginx
# 验证 HTML、图片、视频、API 都能正常访问
curl -s -o /dev/null -w "%{http_code}" http://localhost/annotation/<项目名>/results/viewer.html # 期望 200
curl -s -o /dev/null -w "%{http_code}" http://localhost/annotation/<项目名>/<视频文件>.mp4 # 期望 200
curl -s "http://localhost/annotation-api/" | python3 -c "import sys,json;print(len(json.load(sys.stdin).get('files',[])))" # 文件数>0
```
## 完成后汇报
每次完成或暂停时,向主 Agent/用户汇报:
1. **进度统计** — 已处理 X/Y(Z%),失败 N 条
2. **本批次耗时**
3. **结果文件位置**
4. **Web 访问地址**
5. **失败数据原因**(如有)
6. **下次续做位置** — 如果未全部完成,说明 plan.json 中从哪个 ID 继续
7. **改进建议**
## 引用文件
- **Web 页面模板**:`templates/annotation-viewer.html`
- **API 服务脚本**:`scripts/annotation-api.py`
- **标注格式参考**:`references/output-formats.md`
FILE:CHANGELOG.md
# Changelog — data-annotation skill
所有日期格式为 YYYY-MM-DD。
## [1.1.0] - 2026-03-19
### 变更
- **计划驱动工作流**:新增 plan.json 标注计划机制,处理前先制定计划列出所有数据,逐条处理并更新进度
- **逐条处理防超时**:从批量处理改为每次只处理 1 条数据,处理完立即保存到 JSONL,避免超时丢失进度
- **进度汇报机制**:每处理完几条数据汇报进度(已处理 X/Y,耗时 N 秒),快超时时暂停并汇报
### 修复(实战经验)
- **视频抽帧密度增加**:每秒至少 2 帧,短视频至少 15 帧,中视频至少 20 帧,长视频至少 30 帧
- **nginx 配置教训**:
- 不要创建独立 server 块监听 80(会冲突),改为在已有站点中添加 location
- 使用 `^~` 前缀匹配避免正则 location 劫持 mp4/jpg 请求
- `/root` 目录权限必须 755,否则 nginx 无法访问
- nginx reload 可能不够,必要时 restart
- **Web 页面修复**:
- apiBase 必须用 nginx 反代路径(`/annotation-api/`),不硬编码 localhost:8888
- 所有文本字段 contentEditable,标签支持增删
- 未保存修改时离开页面要有 beforeunload 警告
- 视频文件通过 nginx 静态服务,不通过 API
- **docx 读取**:新增 pandoc 备选方案(python-docx 失败时)
## [1.0.0] - 2026-03-19
### 新增
- **完整工作流**:需求确认 → 数据读取 → 模型处理 → 标注生成 → 结果存储 → Web 查看/编辑 → Nginx 部署
- **SKILL.md**:7 步工作流程说明,包含模型选择策略、输出格式、部署流程
- **annotation-viewer.html**:Web 标注查看/编辑页面模板
- **annotation-api.py**:轻量 HTTP API 服务(文件列表/读取/保存)
- **annotation-api.service**:systemd 服务模板
- **output-formats.md**:常见标注输出格式参考
- **skill.json**:skill 元数据配置
### 设计决策
- 结果存储在数据同目录的 `results/` 子目录下
- 使用 JSONL 作为默认输出格式
- Web 页面为纯静态 HTML,通过 Python API 服务处理保存
- API 服务绑定 127.0.0.1,通过 nginx 反向代理对外提供访问
- 模型按数据类型选择:图像/视频用 VL 模型,文本用 LLM
FILE:references/output-formats.md
## 常见标注输出格式
### JSONL 格式(推荐)
每行一个 JSON 对象,适合大规模标注数据:
```jsonl
{"source_file": "images/001.jpg", "annotation_time": "2026-03-19T13:00:00Z", "objects": [{"label": "person", "bbox": [100, 50, 200, 300], "confidence": 0.95}], "scene": "办公室", "risk_level": "low"}
{"source_file": "images/002.jpg", "annotation_time": "2026-03-19T13:01:00Z", "objects": [{"label": "vehicle", "bbox": [50, 80, 400, 250]}], "scene": "停车场", "risk_level": "medium"}
```
### VL 模型微调数据集格式
适用于视觉-语言模型微调:
```jsonl
{"messages": [{"role": "user", "content": "<image>请描述这张图片中的内容,包括场景、人物、行为和潜在风险。"}, {"role": "assistant", "content": "这是一张安防监控画面,场景为..."}], "images": ["images/001.jpg"]}
{"messages": [{"role": "user", "content": "<image>请分析这张图片中的异常行为。"}, {"role": "assistant", "content": "画面中出现一名未授权人员正在..."}], "images": ["images/002.jpg"]}
```
### 分类标注格式
```jsonl
{"source_file": "data/text_001.txt", "category": "positive", "confidence": 0.92, "keywords": ["优秀", "推荐", "满意"]}
{"source_file": "data/text_002.txt", "category": "negative", "confidence": 0.88, "keywords": ["差", "投诉", "不满"]}
```
### 实体识别标注格式
```jsonl
{"source_file": "data/doc_001.txt", "entities": [{"text": "张三", "type": "PERSON", "start": 5, "end": 7}, {"text": "北京", "type": "LOCATION", "start": 20, "end": 22}]}
```
### 通用标注 Schema 字段
| 字段 | 类型 | 必填 | 说明 |
|------|------|------|------|
| source_file | string | ✅ | 原始数据文件路径 |
| annotation_time | string | ✅ | 标注时间(ISO 8601) |
| category/label | string | 视需求 | 分类标签 |
| confidence | number | 视需求 | 置信度 0-1 |
| objects | array | 视需求 | 检测到的对象列表 |
| entities | array | 视需求 | 识别到的实体列表 |
| scene | string | 视需求 | 场景描述 |
| description | string | 视需求 | 详细描述 |
| risk_level | string | 视需求 | 风险等级 |
| custom_fields | object | 视需求 | 需求文档定义的其他字段 |
### summary.json 格式
```json
{
"task_name": "安防视频标注",
"created_at": "2026-03-19T13:00:00Z",
"updated_at": "2026-03-19T14:30:00Z",
"total_files": 100,
"annotated_files": 95,
"failed_files": 2,
"pending_files": 3,
"category_distribution": {
"low_risk": 60,
"medium_risk": 25,
"high_risk": 10
},
"processing_time_seconds": 320,
"model_used": "qwen3.5-plus"
}
```
FILE:scripts/annotation-api.py
#!/usr/bin/env python3
"""
轻量级标注数据 API 服务
提供数据文件列表、文件内容读取、标注数据保存等功能。
用法:
python3 annotation-api.py --port 8888 --data-dir /path/to/data
"""
import json
import os
import sys
import argparse
from http.server import HTTPServer, SimpleHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
# 全局配置
DATA_DIR = '/root/annotation-data'
class AnnotationHandler(SimpleHTTPRequestHandler):
"""处理标注数据的 HTTP 请求"""
def do_GET(self):
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
if parsed.path == '/':
# 列出数据文件和标注结果
data_dir = params.get('dir', [DATA_DIR])[0]
results_file = params.get('results', [''])[0]
files = self._list_files(data_dir)
annotations = {}
if results_file and os.path.exists(results_file):
annotations = self._load_annotations(results_file)
self._send_json({
'files': files,
'annotations': annotations,
'dataDir': data_dir,
'resultsFile': results_file
})
elif parsed.path == '/file':
# 返回文件内容
file_path = params.get('path', [''])[0]
if not file_path or not os.path.exists(file_path):
self._send_json({'error': '文件不存在'}, 404)
return
# 安全检查:确保文件在 DATA_DIR 下
real_path = os.path.realpath(file_path)
real_data = os.path.realpath(DATA_DIR)
if not real_path.startswith(real_data):
self._send_json({'error': '无权访问'}, 403)
return
content_type = self._guess_type(file_path)
with open(file_path, 'rb') as f:
content = f.read()
self.send_response(200)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', len(content))
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
self.wfile.write(content)
else:
self._send_json({'error': '未知接口'}, 404)
def do_POST(self):
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
try:
data = json.loads(body)
except json.JSONDecodeError:
self._send_json({'error': '无效的 JSON'}, 400)
return
if data.get('action') == 'save':
annotations = data.get('annotations', {})
results_file = data.get('file', '')
if not results_file:
# 默认保存到 DATA_DIR 下的 annotations.jsonl
results_file = os.path.join(DATA_DIR, 'annotations.jsonl')
# 确保目录存在
os.makedirs(os.path.dirname(results_file), exist_ok=True)
# 保存为 JSONL
saved = 0
with open(results_file, 'w', encoding='utf-8') as f:
for source, ann in annotations.items():
if isinstance(ann, dict):
ann['source_file'] = source
f.write(json.dumps(ann, ensure_ascii=False) + '\n')
saved += 1
self._send_json({'success': True, 'saved': saved, 'file': results_file})
else:
self._send_json({'error': '未知操作'}, 400)
def do_OPTIONS(self):
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def _list_files(self, data_dir):
"""递归列出数据目录中的文件"""
files = []
if not os.path.exists(data_dir):
return files
image_ext = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg'}
video_ext = {'.mp4', '.avi', '.mov', '.mkv', '.webm'}
text_ext = {'.txt', '.md', '.csv', '.json', '.jsonl'}
for root, dirs, filenames in os.walk(data_dir):
dirs.sort()
for fname in sorted(filenames):
fpath = os.path.join(root, fname)
ext = os.path.splitext(fname)[1].lower()
ftype = 'other'
if ext in image_ext:
ftype = 'image'
elif ext in video_ext:
ftype = 'video'
elif ext in text_ext:
ftype = 'text'
# 跳过 results 目录下的文件
if '/results/' in fpath.replace('\\', '/'):
continue
files.append({
'path': fpath,
'name': fname,
'type': ftype,
'size': os.path.getsize(fpath)
})
return files
def _load_annotations(self, results_file):
"""从 JSONL 文件加载标注数据"""
annotations = {}
if not os.path.exists(results_file):
return annotations
with open(results_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line:
continue
try:
obj = json.loads(line)
source = obj.pop('source_file', '')
if source:
annotations[source] = obj
except json.JSONDecodeError:
continue
return annotations
def _guess_type(self, path):
"""根据文件扩展名猜测 MIME 类型"""
ext = os.path.splitext(path)[1].lower()
types = {
'.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
'.gif': 'image/gif', '.webp': 'image/webp', '.svg': 'image/svg+xml',
'.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mov': 'video/quicktime',
'.txt': 'text/plain', '.md': 'text/markdown', '.csv': 'text/csv',
'.json': 'application/json', '.jsonl': 'application/jsonl',
}
return types.get(ext, 'application/octet-stream')
def _send_json(self, data, status=200):
self.send_response(status)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Access-Control-Allow-Origin', '*')
body = json.dumps(data, ensure_ascii=False).encode('utf-8')
self.send_header('Content-Length', len(body))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
print(f"[annotation-api] {args[0]}")
def main():
global DATA_DIR
parser = argparse.ArgumentParser(description='标注数据 API 服务')
parser.add_argument('--port', type=int, default=8888, help='监听端口')
parser.add_argument('--data-dir', type=str, default=DATA_DIR, help='数据根目录')
args = parser.parse_args()
DATA_DIR = args.data_dir
if not os.path.exists(DATA_DIR):
print(f"警告: 数据目录不存在: {DATA_DIR}")
os.makedirs(DATA_DIR, exist_ok=True)
server = HTTPServer(('127.0.0.1', args.port), AnnotationHandler)
print(f"标注 API 服务已启动: http://127.0.0.1:{args.port}")
print(f"数据目录: {DATA_DIR}")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n服务已停止")
server.server_close()
if __name__ == '__main__':
main()
FILE:skill.json
{
"name": "data-annotation",
"version": "1.0.0",
"description": "通用数据标注处理工具。当用户提到需要数据标注、有标注任务、数据处理、数据集生成、标注查看/编辑时使用此 skill。支持图像、视频、文本等多种数据类型,调用模型进行内容理解和标注,生成结构化标注数据,提供 Web 查看编辑界面。",
"trigger": {
"type": "auto",
"description": "提到「标注」「annotation」「数据集」「label」「tag data」等关键词时触发"
}
}
FILE:templates/annotation-viewer.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>数据标注查看器</title>
<style>
:root {
--sidebar-w: 260px;
--data-w: 45%;
--result-w: calc(55% - var(--sidebar-w));
--bg: #f5f6f8;
--card: #ffffff;
--border: #e2e5ea;
--text: #1a1a2e;
--text2: #6b7280;
--accent: #4f8cff;
--accent-hover: #3a75e6;
--success: #34c759;
--danger: #ff3b30;
--warning: #ff9500;
--radius: 8px;
}
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif; background: var(--bg); color: var(--text); height: 100vh; display: flex; flex-direction: column; }
/* Top Bar */
.topbar { height: 52px; background: var(--card); border-bottom: 1px solid var(--border); display: flex; align-items: center; justify-content: space-between; padding: 0 20px; flex-shrink: 0; }
.topbar-left { display: flex; align-items: center; gap: 12px; }
.topbar-title { font-size: 1.05rem; font-weight: 600; }
.topbar-stats { display: flex; gap: 16px; font-size: 0.82rem; color: var(--text2); }
.topbar-stats span { background: #f0f2f5; padding: 3px 10px; border-radius: 12px; }
.topbar-stats .done { color: var(--success); }
.topbar-stats .fail { color: var(--danger); }
.save-btn { background: var(--accent); color: #fff; border: none; padding: 7px 20px; border-radius: 6px; cursor: pointer; font-size: 0.88rem; font-weight: 500; transition: background 0.2s; }
.save-btn:hover { background: var(--accent-hover); }
.save-btn.saved { background: var(--success); }
.save-msg { font-size: 0.82rem; color: var(--success); margin-right: 8px; opacity: 0; transition: opacity 0.3s; }
.save-msg.show { opacity: 1; }
/* Main Layout */
.main { display: flex; flex: 1; overflow: hidden; }
/* Sidebar */
.sidebar { width: var(--sidebar-w); background: var(--card); border-right: 1px solid var(--border); overflow-y: auto; flex-shrink: 0; }
.sidebar-header { padding: 12px 16px; font-size: 0.78rem; text-transform: uppercase; letter-spacing: 0.05em; color: var(--text2); font-weight: 600; border-bottom: 1px solid var(--border); position: sticky; top: 0; background: var(--card); z-index: 1; }
.dir-item { padding: 10px 16px; cursor: pointer; border-bottom: 1px solid #f0f2f5; display: flex; align-items: center; gap: 8px; font-size: 0.88rem; transition: background 0.15s; }
.dir-item:hover { background: #f5f7fa; }
.dir-item.active { background: #e8f0fe; color: var(--accent); font-weight: 500; }
.dir-item .icon { opacity: 0.6; width: 18px; text-align: center; flex-shrink: 0; }
.dir-item .count { margin-left: auto; font-size: 0.75rem; color: var(--text2); background: #f0f2f5; padding: 1px 8px; border-radius: 10px; }
.dir-item.active .count { background: #d2e3fc; color: var(--accent); }
/* Content Area */
.content { flex: 1; display: flex; overflow: hidden; }
/* Data Panel */
.data-panel { width: 50%; overflow-y: auto; padding: 20px; border-right: 1px solid var(--border); background: var(--bg); }
.data-panel-header { font-size: 0.92rem; font-weight: 600; margin-bottom: 16px; display: flex; align-items: center; gap: 8px; }
.data-panel-header .filename { color: var(--text2); font-weight: 400; font-size: 0.82rem; }
.data-image { max-width: 100%; max-height: 60vh; border-radius: var(--radius); object-fit: contain; background: #e8eaed; display: block; margin: 0 auto; }
.data-video { max-width: 100%; border-radius: var(--radius); display: block; margin: 0 auto; }
.data-text { background: var(--card); border: 1px solid var(--border); border-radius: var(--radius); padding: 16px; font-size: 0.92rem; line-height: 1.7; white-space: pre-wrap; max-height: 70vh; overflow-y: auto; }
.data-placeholder { text-align: center; color: var(--text2); margin-top: 100px; font-size: 0.95rem; }
/* Result Panel */
.result-panel { width: 50%; overflow-y: auto; padding: 20px; background: var(--card); }
.result-header { font-size: 0.92rem; font-weight: 600; margin-bottom: 16px; display: flex; align-items: center; gap: 8px; }
.result-header .badge { background: var(--success); color: #fff; font-size: 0.72rem; padding: 2px 8px; border-radius: 10px; }
.result-header .badge.pending { background: var(--warning); }
/* JSON Tree */
.json-tree { font-family: "SF Mono", "Fira Code", monospace; font-size: 0.85rem; }
.json-key { color: var(--accent); cursor: pointer; user-select: none; }
.json-key:hover { text-decoration: underline; }
.json-string { color: #0d904f; }
.json-number { color: #9c5700; }
.json-bool { color: #8b5cf6; }
.json-null { color: #999; }
.json-bracket { color: var(--text2); }
.json-row { padding: 3px 0; padding-left: 20px; border-left: 2px solid #f0f2f5; margin-left: 6px; }
.json-row:hover { background: #fafbfc; }
.json-row .edit-area { display: none; margin-top: 4px; }
.json-row.editing .edit-area { display: block; }
.json-row .value-text { cursor: text; padding: 1px 4px; border-radius: 3px; }
.json-row .value-text:hover { background: #e8f0fe; outline: 1px solid var(--accent); }
.json-row .edit-input { width: 100%; padding: 6px 10px; border: 1px solid var(--accent); border-radius: 4px; font-family: inherit; font-size: inherit; }
.json-row .edit-actions { margin-top: 4px; display: flex; gap: 6px; }
.json-row .edit-actions button { padding: 3px 12px; border: none; border-radius: 4px; cursor: pointer; font-size: 0.78rem; }
.json-row .edit-actions .ok { background: var(--success); color: #fff; }
.json-row .edit-actions .cancel { background: #f0f2f5; color: var(--text2); }
/* Responsive */
@media (max-width: 768px) {
.sidebar { width: 200px; }
.data-panel, .result-panel { width: 100%; }
.content { flex-direction: column; }
.data-panel { border-right: none; border-bottom: 1px solid var(--border); max-height: 40vh; }
}
</style>
</head>
<body>
<div class="topbar">
<div class="topbar-left">
<span class="topbar-title">🏷️ 数据标注查看器</span>
<div class="topbar-stats">
<span>总计: <b id="stat-total">0</b></span>
<span class="done">已标注: <b id="stat-done">0</b></span>
<span class="fail">失败: <b id="stat-fail">0</b></span>
</div>
</div>
<div style="display:flex;align-items:center;gap:8px;">
<span class="save-msg" id="saveMsg">✓ 已保存</span>
<button class="save-btn" id="saveBtn" onclick="saveAll()">保存</button>
</div>
</div>
<div class="main">
<div class="sidebar">
<div class="sidebar-header">📁 数据目录</div>
<div id="dirTree"></div>
</div>
<div class="content">
<div class="data-panel" id="dataPanel">
<div class="data-placeholder">← 选择左侧数据项查看内容</div>
</div>
<div class="result-panel" id="resultPanel">
<div class="data-placeholder">← 选择数据后查看标注结果</div>
</div>
</div>
</div>
<script>
// ── Config (injected by generator) ──
const CONFIG = __CONFIG_PLACEHOLDER__;
// Expected: { dataDir, resultsFile, apiBase, files: [{path, type, annotation: {...}}] }
let currentFileIdx = -1;
let annotations = {};
// ── Init ──
async function init() {
const resp = await fetch(CONFIG.apiBase + '?action=list&dir=' + encodeURIComponent(CONFIG.dataDir));
const data = await resp.json();
if (data.files) CONFIG.files = data.files;
if (data.annotations) annotations = data.annotations;
renderSidebar();
updateStats();
}
// ── Sidebar ──
function renderSidebar() {
const tree = document.getElementById('dirTree');
tree.innerHTML = '';
const dirs = {};
CONFIG.files.forEach((f, i) => {
const parts = f.path.split('/');
const dir = parts.slice(0, -1).join('/') || '/';
if (!dirs[dir]) dirs[dir] = [];
dirs[dir].push({ ...f, idx: i });
});
Object.keys(dirs).sort().forEach(dir => {
dirs[dir].forEach(f => {
const item = document.createElement('div');
item.className = 'dir-item' + (f.idx === currentFileIdx ? ' active' : '');
const icon = f.type === 'image' ? '🖼' : f.type === 'video' ? '🎬' : '📄';
item.innerHTML = `<span class="icon">icon</span><span style="overflow:hidden;text-overflow:ellipsis;white-space:nowrap;" title="f.path">f.name || f.path.split('/').pop()</span>`;
item.onclick = () => selectFile(f.idx);
tree.appendChild(item);
});
});
}
// ── Select File ──
async function selectFile(idx) {
currentFileIdx = idx;
const file = CONFIG.files[idx];
renderSidebar();
renderData(file);
renderResult(file);
}
// ── Render Data ──
function renderData(file) {
const panel = document.getElementById('dataPanel');
const url = CONFIG.apiBase + '?action=file&path=' + encodeURIComponent(file.path);
let content = '';
switch (file.type) {
case 'image':
content = `<div class="data-panel-header">📷 数据预览 <span class="filename">file.name || file.path</span></div><img class="data-image" src="url" alt="preview">`;
break;
case 'video':
content = `<div class="data-panel-header">🎬 视频预览 <span class="filename">file.name || file.path</span></div><video class="data-video" controls src="url"></video>`;
break;
case 'text':
content = `<div class="data-panel-header">📄 文本内容 <span class="filename">file.name || file.path</span></div><div class="data-text" id="dataText">加载中...</div>`;
break;
default:
content = `<div class="data-panel-header">📎 文件 <span class="filename">file.name || file.path</span></div><div class="data-placeholder">不支持的预览类型</div>`;
}
panel.innerHTML = content;
if (file.type === 'text') {
fetch(url).then(r => r.text()).then(t => {
const el = document.getElementById('dataText');
if (el) el.textContent = t;
});
}
}
// ── Render Result ──
function renderResult(file) {
const panel = document.getElementById('resultPanel');
const ann = annotations[file.path] || file.annotation || null;
if (!ann) {
panel.innerHTML = `<div class="result-header">📋 标注结果 <span class="badge pending">待标注</span></div><div class="data-placeholder">暂无标注数据</div>`;
return;
}
panel.innerHTML = `<div class="result-header">📋 标注结果 <span class="badge">已标注</span></div><div class="json-tree" id="jsonTree"></div>`;
renderJson(document.getElementById('jsonTree'), ann, file.path);
}
function renderJson(container, obj, path, depth = 0) {
if (typeof obj !== 'object' || obj === null) {
renderValue(container, obj, path, depth);
return;
}
if (Array.isArray(obj)) {
obj.forEach((item, i) => {
const row = document.createElement('div');
row.className = 'json-row';
renderJson(row, item, path + '[' + i + ']', depth + 1);
container.appendChild(row);
});
} else {
Object.keys(obj).forEach(key => {
const row = document.createElement('div');
row.className = 'json-row';
const val = obj[key];
if (typeof val === 'object' && val !== null) {
const keySpan = document.createElement('span');
keySpan.className = 'json-key';
keySpan.textContent = key;
const bracket = document.createElement('span');
bracket.className = 'json-bracket';
bracket.textContent = Array.isArray(val) ? '[...]' : '{...}';
row.appendChild(keySpan);
row.appendChild(document.createTextNode(': '));
row.appendChild(bracket);
const childContainer = document.createElement('div');
childContainer.className = 'json-row';
renderJson(childContainer, val, path + '.' + key, depth + 1);
row.appendChild(childContainer);
} else {
const keySpan = document.createElement('span');
keySpan.className = 'json-key';
keySpan.textContent = key;
row.appendChild(keySpan);
row.appendChild(document.createTextNode(': '));
renderValue(row, val, path + '.' + key, depth);
}
container.appendChild(row);
});
}
}
function renderValue(container, val, path, depth) {
const span = document.createElement('span');
span.className = 'value-text';
const cls = typeof val === 'string' ? 'json-string' : typeof val === 'number' ? 'json-number' : typeof val === 'boolean' ? 'json-bool' : 'json-null';
span.className += ' ' + cls;
span.textContent = JSON.stringify(val);
span.title = '点击编辑';
span.contentEditable = true;
span.onblur = function() {
try {
const newVal = JSON.parse(this.textContent);
setNestedValue(annotations, path, newVal);
} catch {}
};
container.appendChild(span);
}
function setNestedValue(obj, path, value) {
const keys = path.replace(/^\./, '').split('.');
let target = obj;
for (let i = 0; i < keys.length - 1; i++) {
const k = keys[i].replace(/\[(\d+)\]/, '.$1');
if (target[k] === undefined) target[k] = {};
target = target[k];
}
const lastKey = keys[keys.length - 1].replace(/\[(\d+)\]/, '$1');
target[lastKey] = value;
}
// ── Stats ──
function updateStats() {
const total = CONFIG.files.length;
let done = 0, fail = 0;
CONFIG.files.forEach(f => {
const ann = annotations[f.path] || f.annotation;
if (ann) done++;
});
document.getElementById('stat-total').textContent = total;
document.getElementById('stat-done').textContent = done;
document.getElementById('stat-fail').textContent = fail;
}
// ── Save ──
async function saveAll() {
const btn = document.getElementById('saveBtn');
const msg = document.getElementById('saveMsg');
btn.disabled = true;
btn.textContent = '保存中...';
try {
const resp = await fetch(CONFIG.apiBase, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ action: 'save', annotations: annotations, file: CONFIG.resultsFile })
});
if (resp.ok) {
btn.textContent = '✓ 已保存';
btn.classList.add('saved');
msg.classList.add('show');
setTimeout(() => { btn.textContent = '保存'; btn.classList.remove('saved'); msg.classList.remove('show'); btn.disabled = false; }, 2000);
} else {
btn.textContent = '保存失败';
setTimeout(() => { btn.textContent = '保存'; btn.disabled = false; }, 2000);
}
} catch (e) {
btn.textContent = '保存失败';
setTimeout(() => { btn.textContent = '保存'; btn.disabled = false; }, 2000);
}
}
// ── Boot ──
document.addEventListener('DOMContentLoaded', init);
</script>
</body>
</html>
远程服务器安全巡检和环境报告工具。 通过 SSH 免密登录远程主机,全面检查系统信息、运行服务、开放端口、 Web 服务器配置、数据库配置、安全设置(SSH/防火墙/SELinux)、可疑进程和定时任务, 生成结构化的巡检报告。Use when 用户需要检查服务器安全、排查服务器环境、 了解服务器上运行了什么服务...
--- name: server-audit description: > 远程服务器安全巡检和环境报告工具。 通过 SSH 免密登录远程主机,全面检查系统信息、运行服务、开放端口、 Web 服务器配置、数据库配置、安全设置(SSH/防火墙/SELinux)、可疑进程和定时任务, 生成结构化的巡检报告。Use when 用户需要检查服务器安全、排查服务器环境、 了解服务器上运行了什么服务、生成巡检报告、或提及"巡检"、"安全检查"、"服务器检查"。 --- # server-audit — 远程服务器巡检 通过 SSH 免密登录检查远程服务器环境与安全状况,生成巡检报告。 ## 前提条件 - 已通过 `ssh-ops` skill 配置好免密登录 - 或手动配置了 SSH 密钥认证 ## 工作流程 ### 1. 运行巡检脚本 ```bash bash <skill>/scripts/server-audit.sh <host> [user] ``` 脚本会自动收集以下信息并输出快速安全判定: - **系统信息**: OS、内核、CPU、内存、磁盘、Swap - **运行服务**: systemd running services - **开放端口**: 所有 TCP 监听端口 - **防火墙**: firewalld 状态和规则、SELinux 状态 - **Web 服务**: Nginx/PHP-FPM/MariaDB/Node/Docker 版本和状态 - **Nginx 虚拟主机**: server_name、root、listen - **网站文件**: /www/wwwroot 下的站点检测 - **安全配置**: SSH 配置(密码认证、Root 登录、端口) - **可疑项目**: 失败登录记录、定时任务、高内存进程 ### 2. 基于脚本输出生成详细报告 根据脚本收集的数据,生成结构化的 Markdown 报告。 **⚠️ 报告保存位置:** `~/.openclaw/workspac/audits/<IP>-<日期>.md` 报告只保存在本地 workspace,**不要上传到任何 GitHub 仓库**。 文件命名格式:`119.91.38.151-20260319.md` 报告模板: ```markdown # 服务器巡检报告 **主机:** <IP> **检查时间:** <时间> ## 1. 基础信息 ## 2. 已安装服务 ## 3. 开放端口(标注风险) ## 4. 安全问题(🔴严重/⚠️警告/💡建议) ## 5. 快速修复命令 ``` ## 安全判定规则 ### 🔴 严重(需立即修复) - 数据库端口(3306/5432)监听 0.0.0.0 - 管理面板端口(宝塔 8888、phpMyAdmin)监听 0.0.0.0 - SSH 允许 root 密码登录 ### ⚠️ 警告(建议修复) - 防火墙未启用 - SELinux 禁用 - SSH 密码认证未禁用 - 无 Swap 分区 - 存在暴力破解尝试 - 可疑定时任务 ### 💡 建议(优化项) - SSH 默认端口 22 - 缺少运行时(Node.js 等) - 未使用的服务(Postfix 等) - 无自动备份策略 ## 多服务器批量巡检 对多台服务器循环执行: ```bash for host in 192.168.1.1 192.168.1.2 10.0.0.1; do echo "=== $host ===" bash <skill>/scripts/server-audit.sh "$host" echo "" done ``` FILE:CHANGELOG.md # Changelog — server-audit 所有格式基于 [Keep a Changelog](https://keepachangelog.com/zh-CN/1.1.0/), 版本号遵循 [语义化版本](https://semver.org/lang/zh-CN/)。 --- ## [0.1.0] — 2026-03-19 ### 新增 - 初始版本发布 - **服务器巡检脚本** `scripts/server-audit.sh` - 系统信息采集:OS、内核、CPU、内存、磁盘、Swap - 运行服务检测:systemd running services - 开放端口扫描:ss -tlnp 全部 TCP 监听 - 防火墙状态检查:firewalld 规则、SELinux 模式 - Web 服务检测:Nginx/PHP-FPM/MariaDB/Node/Docker 版本与状态 - Nginx 虚拟主机配置提取(server_name/root/listen) - 网站目录扫描:WordPress 检测、HTML 站点检测 - 安全配置审计:SSH 配置(密码认证/Root 登录/端口) - 可疑项检查:失败登录记录、用户/系统定时任务、高内存进程 - 快速安全判定:自动识别 🔴严重/⚠️警告 级别问题 ### 安全判定规则 - 🔴 严重:数据库端口全网暴露、管理面板全网暴露、SSH 允许 Root 密码登录 - ⚠️ 警告:防火墙未启用、SELinux 禁用、SSH 密码认证未禁用、无 Swap、暴力破解痕迹 ### 首次验证 - 对 xxxxx(OpenCloudOS 9.4)完成完整巡检 - 检出 3 个严重安全问题 - 检出 4 个警告 - 生成详细巡检报告 FILE:scripts/server-audit.sh #!/bin/bash # server-audit.sh — 远程服务器巡检脚本 # 用法: bash server-audit.sh <host> [user] # 前提: 已配置 SSH 免密登录 # # 检查项目: # 系统信息、CPU/内存/磁盘、运行服务、开放端口、 # Nginx/MariaDB/PHP-FPM 配置、安全配置、可疑进程 set -e HOST="?❌ 用法: bash server-audit.sh <host> [user]" USER="-root" TMPFILE=$(mktemp) trap "rm -f $TMPFILE" EXIT echo "🔍 正在巡检 USER@HOST ..." ssh -o ConnectTimeout=10 "USER@HOST" bash -s > "$TMPFILE" << 'AUDIT_SCRIPT' echo "===SYSINFO_START===" cat /etc/os-release | grep -E "^(NAME|VERSION)=" uname -r uptime echo "" echo "--- CPU ---" lscpu | grep "Model name" | head -1 lscpu | grep "^CPU(s):" | head -1 echo "" echo "--- 内存 ---" free -h | head -2 echo "" echo "--- 磁盘 ---" df -h / | tail -1 echo "" echo "--- Swap ---" swapon --show 2>/dev/null || echo "(无 swap)" echo "===SYSINFO_END===" echo "" echo "===SERVICES_START===" systemctl list-units --type=service --state=running --no-pager --legend=no 2>/dev/null \ | grep -v "systemd-" | grep -v "getty" | grep -v "user@" echo "===SERVICES_END===" echo "" echo "===PORTS_START===" ss -tlnp 2>/dev/null echo "===PORTS_END===" echo "" echo "===FIREWALL_START===" echo "firewalld: $(systemctl is-active firewalld 2>/dev/null || echo 'inactive')" echo "selinux: $(getenforce 2>/dev/null || echo 'N/A')" firewall-cmd --list-all 2>/dev/null || echo "(未配置)" echo "===FIREWALL_END===" echo "" echo "===WEB_START===" echo "--- Nginx ---" nginx -v 2>&1 || echo "未安装" echo "" echo "--- PHP ---" php -v 2>&1 | head -1 || echo "未安装" php-fpm83 -v 2>&1 | head -1 2>/dev/null || echo "PHP-FPM 未运行" echo "" echo "--- MariaDB/MySQL ---" mysql --version 2>&1 || echo "未安装" echo " 监听: $(ss -tlnp 2>/dev/null | grep 3306 || echo '未运行')" echo "" echo "--- Node ---" node --version 2>&1 || echo "未安装" echo "" echo "--- Docker ---" docker --version 2>&1 || echo "未安装" echo "" echo "--- Nginx 虚拟主机 ---" for f in /etc/nginx/conf.d/*.conf /www/server/nginx/conf/vhost/*.conf; do [ -f "$f" ] && echo "[$f]" && grep -E "server_name|listen|root" "$f" 2>/dev/null done echo "" echo "--- 网站目录 ---" ls /www/wwwroot/ 2>/dev/null || echo "/www/wwwroot 不存在" echo "" for d in /www/wwwroot/*/; do [ -f "$d/wp-config.php" ] && echo "WordPress: $d" [ -f "$d/index.html" ] && echo "HTML站点: $d" done echo "===WEB_END===" echo "" echo "===SECURITY_START===" echo "--- SSH 配置 ---" grep -E "^PermitRootLogin|^PasswordAuthentication|^Port|^PubkeyAuthentication|^MaxAuthTries" /etc/ssh/sshd_config 2>/dev/null || echo "(使用默认配置)" echo "" echo "--- SSH 密码认证状态 ---" if grep -q "^PasswordAuthentication no" /etc/ssh/sshd_config 2>/dev/null; then echo "✅ 密码认证已禁用" elif grep -q "^PasswordAuthentication yes" /etc/ssh/sshd_config 2>/dev/null; then echo "⚠️ 密码认证已启用" else echo "⚠️ PasswordAuthentication 未显式配置(默认 yes)" fi echo "" echo "--- Root 登录 ---" grep "^PermitRootLogin" /etc/ssh/sshd_config 2>/dev/null || echo "⚠️ PermitRootLogin 未配置(默认 yes)" echo "" echo "--- 最近失败登录 (5条) ---" lastb 2>/dev/null | head -5 || echo "(无记录)" echo "" echo "--- 定时任务 ---" crontab -l 2>/dev/null || echo "(无用户 crontab)" echo "" for f in /etc/cron.d/*; do [ -f "$f" ] && echo "[$f]" && cat "$f" done echo "===SECURITY_END===" echo "" echo "===PROCESSES_START===" ps aux --sort=-%mem | head -12 echo "===PROCESSES_END===" AUDIT_SCRIPT echo "" echo "✅ 巡检数据已收集" echo "" echo "=== 巡检报告 ===" echo "" # 简要分析 grep -oP 'Local Address:Port.*?\s+\K[0-9.]+' "$TMPFILE" | sort -u | while read port; do echo " 监听: $port" done # 安全快速判定 echo "" echo "--- 快速安全判定 ---" if grep -q "3306.*0.0.0.0" "$TMPFILE"; then echo "🔴 MariaDB 3306 端口全网暴露" fi if grep -q "8888.*0.0.0.0" "$TMPFILE"; then echo "🔴 管理面板端口全网暴露" fi if ! grep -q "PasswordAuthentication no" "$TMPFILE"; then echo "⚠️ SSH 密码认证未禁用" fi if grep -q "firewalld.*inactive" "$TMPFILE"; then echo "⚠️ 防火墙未启用" fi if grep -q "selinux.*Disabled" "$TMPFILE"; then echo "⚠️ SELinux 已禁用" fi grep -q "PermitRootLogin.*yes" "$TMPFILE" && echo "⚠️ SSH 允许 Root 登录" echo "" echo "原始数据保存在: $TMPFILE"
SSH 密钥管理和远程服务器运维工具。 用于生成 SSH 密钥、部署公钥到远程主机实现免密登录、测试连接、查看远程主机信息、 以及远程执行运维命令。Use when 用户需要连接远程服务器、配置 SSH 免密登录、 管理服务器、部署应用、或在远程主机上执行命令。触发短语包括: "SSH登录"、"免密登录"、"服务...
---
name: ssh-ops
description: >
SSH 密钥管理和远程服务器运维工具。
用于生成 SSH 密钥、部署公钥到远程主机实现免密登录、测试连接、查看远程主机信息、
以及远程执行运维命令。Use when 用户需要连接远程服务器、配置 SSH 免密登录、
管理服务器、部署应用、或在远程主机上执行命令。触发短语包括:
"SSH登录"、"免密登录"、"服务器管理"、"远程部署"、"连接服务器"、"运维"。
---
# ssh-ops — SSH 密钥管理与远程运维
管理 SSH 密钥、部署免密登录、执行远程运维操作。
## 工作流程
### 1. 生成密钥(如果还没有)
```bash
bash <skill>/scripts/ssh-key-setup.sh gen
```
默认生成 `~/.ssh/id_ed25519`。如果已存在会提示。
### 2. 部署公钥到远程主机
需要密码时,设置 `SSHPASS` 环境变量:
```bash
SSHPASS='密码' bash <skill>/scripts/ssh-key-setup.sh deploy <host> [user]
```
脚本会自动安装 sshpass、使用 ssh-copy-id 部署公钥。
### 3. 测试免密登录
```bash
bash <skill>/scripts/ssh-key-setup.sh test <host> [user]
```
### 4. 查看远程主机信息
```bash
bash <skill>/scripts/ssh-key-setup.sh info <host> [user]
```
返回:主机名、系统版本、内核、内存、磁盘、负载。
## 远程执行命令
免密登录配置好后,可直接用 `ssh user@host "命令"` 执行任意远程操作:
```bash
# 查看进程
ssh root@host "ps aux | grep node"
# 安装软件
ssh root@host "apt-get update && apt-get install -y nginx"
# 传输文件
scp file.txt root@host:/tmp/
# 同步目录
rsync -avz ./dist/ root@host:/var/www/app/
```
## 安全提示
- `SSHPASS` 环境变量用完即 unset,不要持久化到文件
- 私钥(`id_ed25519`)权限必须是 600,`~/.ssh/` 权限必须是 700
- 不要在聊天记录中存储密码,使用时设环境变量
- 部署完成后验证免密登录,确认后再 unset 密码
FILE:CHANGELOG.md
# Changelog — ssh-ops
所有格式基于 [Keep a Changelog](https://keepachangelog.com/zh-CN/1.1.0/),
版本号遵循 [语义化版本](https://semver.org/lang/zh-CN/)。
---
## [0.1.0] — 2026-03-19
### 新增
- 初始版本发布
- **SSH 密钥管理脚本** `scripts/ssh-key-setup.sh`
- `gen` — 生成 ed25519 密钥对(已存在则提示)
- `pub` — 查看公钥
- `deploy` — 通过 sshpass + ssh-copy-id 部署公钥到远程主机
- `test` — BatchMode 免密登录测试
- `list` — 列出已有密钥
- `info` — 查看远程主机信息(系统/内核/内存/磁盘/负载)
- 自动检测并安装 sshpass(支持 apt-get 和 yum)
### 首次验证
- 成功生成密钥对并部署到
- 免密登录测试通过
- 远程主机信息查询正常
FILE:scripts/ssh-key-setup.sh
#!/bin/bash
# ssh-key-setup.sh — SSH 密钥管理工具
# 用法: bash ssh-key-setup.sh <command> [args]
#
# 命令:
# gen [name] — 生成密钥对(默认 id_ed25519)
# pub [name] — 查看公钥
# deploy <host> [user] — 部署公钥到远程主机(需要密码)
# test <host> [user] — 测试免密登录
# list — 列出已有密钥
# info <host> — 查看远程主机基本信息
set -e
SSH_DIR="$HOME/.ssh"
DEFAULT_KEY="id_ed25519"
KEY_TYPE="ed25519"
cmd_list() {
echo "🔑 已有的 SSH 密钥:"
ls -la "$SSH_DIR"/*.pub 2>/dev/null || echo " (无)"
}
cmd_gen() {
local name="-$DEFAULT_KEY"
local keyfile="$SSH_DIR/$name"
if [ -f "$keyfile" ]; then
echo "⚠️ 密钥 $name 已存在: $keyfile"
echo " 公钥:"
cat "keyfile.pub"
return 0
fi
mkdir -p "$SSH_DIR" && chmod 700 "$SSH_DIR"
ssh-keygen -t "$KEY_TYPE" -f "$keyfile" -N "" -C "openclaw@$(hostname)" 2>&1
echo "✅ 密钥已生成: $keyfile"
echo " 公钥:"
cat "keyfile.pub"
}
cmd_pub() {
local name="-$DEFAULT_KEY"
local keyfile="$SSH_DIR/name.pub"
if [ -f "$keyfile" ]; then
cat "$keyfile"
else
echo "❌ 公钥不存在: $keyfile"
return 1
fi
}
cmd_deploy() {
local host="$1"
local user="-root"
local keyfile="$SSH_DIR/$DEFAULT_KEY.pub"
if [ -z "$host" ]; then
echo "❌ 用法: deploy <host> [user]"
return 1
fi
if [ ! -f "$SSH_DIR/$DEFAULT_KEY" ]; then
echo "❌ 私钥不存在,请先运行 gen"
return 1
fi
if ! command -v sshpass &>/dev/null; then
echo "正在安装 sshpass..."
if command -v apt-get &>/dev/null; then
DEBIAN_FRONTEND=noninteractive apt-get install -y -qq sshpass 2>&1
elif command -v yum &>/dev/null; then
yum install -y -q sshpass 2>&1
fi
fi
echo "📤 部署公钥到 user@host ..."
sshpass -p "$SSHPASS" ssh-copy-id -o StrictHostKeyChecking=no "user@host" 2>&1
echo "✅ 公钥已部署"
}
cmd_test() {
local host="$1"
local user="-root"
if [ -z "$host" ]; then
echo "❌ 用法: test <host> [user]"
return 1
fi
echo "🔗 测试免密登录 user@host ..."
if ssh -o ConnectTimeout=5 -o BatchMode=yes "user@host" "echo '✅ 免密登录成功' && hostname" 2>&1; then
echo "✅ 连接正常"
else
echo "❌ 免密登录失败,请检查公钥是否已部署"
return 1
fi
}
cmd_info() {
local host="$1"
local user="-root"
if [ -z "$host" ]; then
echo "❌ 用法: info <host> [user]"
return 1
fi
echo "📊 远程主机信息: user@host"
ssh -o ConnectTimeout=5 "user@host" bash -c '
echo " 主机名: $(hostname)"
echo " 系统: $(cat /etc/os-release 2>/dev/null | grep PRETTY_NAME | cut -d= -f2 | tr -d \"\")"
echo " 内核: $(uname -r)"
echo " 架构: $(uname -m)"
echo " 内存: $(free -h | awk "/^Mem:/{print \$2\" total, \"\$7\" available\"}")"
echo " 磁盘: $(df -h / | awk "NR==2{print \$2\" total, \"\$4\" free (\"\$5\" used)\"}")"
echo " 负载: $(uptime | awk -F: "{print \$NF}")"
' 2>&1
}
# Main
case "-help" in
gen) shift; cmd_gen "$@" ;;
pub) shift; cmd_pub "$@" ;;
deploy) shift; cmd_deploy "$@" ;;
test) shift; cmd_test "$@" ;;
list) cmd_list ;;
info) shift; cmd_info "$@" ;;
help|*)
echo "🔑 SSH 密钥管理工具"
echo ""
echo "用法: bash ssh-key-setup.sh <command> [args]"
echo ""
echo "命令:"
echo " gen [name] 生成密钥对"
echo " pub [name] 查看公钥"
echo " deploy <host> [user] 部署公钥到远程主机(需要 SSHPASS 环境变量)"
echo " test <host> [user] 测试免密登录"
echo " list 列出已有密钥"
echo " info <host> [user] 查看远程主机信息"
;;
esac
长时程 Agent 项目工作流框架(基于 Anthropic "Effective Harnesses for Long-Running Agents")。 用于创建、管理和调度跨多个上下文窗口的长期项目任务。 Use when: 启动新项目、初始化项目工作流、管理项目任务列表、调度子Agent增量开发、 恢复项...
---
name: long-running-harness
description: >
长时程 Agent 项目工作流框架(基于 Anthropic "Effective Harnesses for Long-Running Agents")。
用于创建、管理和调度跨多个上下文窗口的长期项目任务。
Use when: 启动新项目、初始化项目工作流、管理项目任务列表、调度子Agent增量开发、
恢复项目状态、生成项目进度报告。触发短语包括:
"启动项目"、"初始化项目"、"创建工作流"、"项目进度"、"继续开发"、
"管理任务列表"、"分配任务"、"next feature"、"project status"。
---
# 长时程 Agent 工作流框架
> 基于 Anthropic 工程团队的 [Effective Harnesses for Long-Running Agents](https://www.anthropic.com/engineering/effective-harnesses-for-long-running-agents) 方法论,适配 OpenClaw 环境。
---
## 核心原则
1. **持久化优于记忆** — 用文件系统记录状态,不依赖 Agent 上下文记忆
2. **结构化优于自由文本** — 关键状态用 JSON,进度日志用 Markdown
3. **验证优于声明** — 每个功能完成后必须验证,不接受未测试的 "完成"
4. **增量优于大步** — 每次 Agent 会话只做一个功能点,保持可回滚
5. **标准化优于临时** — 固定的启动例程和结束例程,减少混乱
---
## 项目结构
每个受管理项目遵循以下标准结构:
```
projects/<project-name>/
├── PROJECT.md # 项目概述、目标、技术栈
├── progress.md # Agent 工作日志(每次会话追加)
├── features.json # 功能列表(状态追踪,仅修改 passes 字段)
├── init.sh # 环境初始化脚本(可选)
├── src/ # 项目源码
└── tests/ # 测试代码(如有)
```
---
## 生命周期
### 阶段一:初始化(Init)
当用户要求启动新项目或初始化工作流时执行:
1. **创建项目目录** `projects/<project-name>/`
2. **编写 `PROJECT.md`** — 包含:
- 项目名称和目标
- 技术栈和依赖
- 验收标准
- 关键约束
3. **编写 `features.json`** — 功能列表,格式如下:
```json
{
"project": "项目名称",
"created": "2026-03-18",
"features": [
{
"id": "feat-001",
"name": "功能名称",
"description": "功能详细描述",
"category": "functional|infra|docs|perf|fix",
"priority": "high|medium|low",
"passes": false,
"tests": [
"测试步骤 1",
"测试步骤 2"
],
"notes": ""
}
]
}
```
4. **创建 `progress.md`** — 模板:
```markdown
# 项目工作日志
## 初始化
- 日期:2026-03-18
- 初始化人:主 Agent
- 功能总数:N
```
5. **初始化 git** — `git init` + 首次提交
6. 如果适用,**编写 `init.sh`**
**重要:** 功能列表要尽量详尽,把大功能拆成小功能。200 个小功能 > 10 个大功能。
### 阶段二:增量开发(Each Session)
当用户说"继续开发"、"next feature"、"继续项目"或调度子Agent开发时:
**启动例程(每个会话必须执行):**
1. `pwd` 确认工作目录
2. `cat projects/<name>/progress.md` — 读取工作日志
3. `git log --oneline -10` — 查看最近提交
4. `cat projects/<name>/features.json` — 读取功能列表
5. 选择优先级最高且 `passes: false` 的功能
6. 运行 `init.sh`(如有)+ 基础验证测试
7. 确认环境正常后,开始实现
**工作约束:**
- 每次**只做一个功能**
- 实现完成后**必须验证**(运行测试、手动检查等)
- 验证通过后才能将 `features.json` 中对应功能的 `passes` 改为 `true`
- **禁止**删除或修改功能条目(只改 `passes` 和 `notes` 字段)
**结束例程(每个会话必须执行):**
1. 更新 `features.json` 中完成状态
2. 追加会话记录到 `progress.md`:
```markdown
## 会话 N — 日期
- **目标功能:** feat-XXX - 功能名称
- **状态:** ✅ 完成 / ⏳ 部分完成 / ❌ 失败
- **完成内容:** 具体做了什么
- **遇到的问题:** 问题描述和解决方案
- **下次继续:** 待办事项
- **Git commits:** hash - message
```
3. `git add . && git commit -m "feat: 完成功能描述"`
### 阶段三:进度报告
当用户问"项目进度"、"project status"时:
1. 读取 `features.json`
2. 统计完成率(passes: true / 总数)
3. 按优先级列出未完成功能
4. 读取 `progress.md` 最近条目
5. 生成进度摘要
输出格式:
```
📋 项目进度:项目名称
━━━━━━━━━━━━━━━━━━
✅ 完成:X / Y(Z%)
🔴 待做(高优先级):...
🟡 待做(中优先级):...
🟢 待做(低优先级):...
━━━━━━━━━━━━━━━━━━
最近会话:[简要摘要]
```
---
## 调度子Agent(sessions_spawn)
将单个功能委派给子Agent开发时,task 描述必须**自包含**:
```json
{
"task": "## 任务:实现 feat-XXX 功能\n\n### 项目信息\n- 路径:projects/project-name/\n- 技术栈:...\n\n### 你的目标\n实现以下功能并验证:\n[功能描述]\n\n### 启动例程\n1. 读取 projects/project-name/features.json 找到 feat-XXX\n2. 读取 projects/project-name/progress.md 了解历史\n3. 运行 git log --oneline -5\n4. 运行 projects/project-name/init.sh(如有)\n5. 运行基础测试确认环境正常\n\n### 工作要求\n- 只做这一个功能\n- 完成后必须验证\n- 结束时更新 features.json 的 passes 字段\n- 结束时追加 progress.md 日志\n- 结束时 git commit",
"sessionKey": "alpha",
"runTimeoutSeconds": 600
}
```
**关键:** task 必须包含所有上下文。子Agent看不到主对话历史。
---
## 定时巡检(Cron Job)
对于重要项目,可设置定时 cron job 巡检进度:
```
schedule: kind=cron, expr="0 */4 * * *"
payload: kind=agentTurn, message="读取 projects/<name>/progress.md 和 features.json,检查是否有功能卡住超过3个会话未完成。如有,输出简要报告。"
```
---
## 故障模式预防
| 故障模式 | 预防措施 |
|---------|---------|
| Agent 试图一次性做完所有功能 | 强制每次只选一个 `passes: false` 的功能 |
| Agent 过早宣布项目完成 | `features.json` 有明确的状态追踪 |
| Agent 留下的代码有 bug | 启动时运行基础测试;结束时 git commit 便于回滚 |
| Agent 花时间理解环境 | 使用 `init.sh` 标准化启动 |
| 上下文丢失导致重复工作 | `progress.md` + git log 提供完整历史 |
| 功能未真正完成就标记 passes | 要求验证后才能修改 passes 字段 |
---
## 通用领域扩展
此框架不限于软件开发。对于非代码类长期任务:
- **研究项目:** features.json 中的 tests 改为 research objectives,passes 表示研究是否完成
- **写作项目:** features 拆分为章节/段落,passes 表示是否已写完并审校
- **数据分析:** features 拆分为分析步骤,passes 表示结果是否已验证
使用不同 category 区分:`research|writing|analysis|infra|docs`
FILE:references/feature-list-template.md
# 功能列表模板(features.json)
```json
{
"project": "示例项目名称",
"created": "2026-03-18",
"features": [
{
"id": "feat-001",
"name": "用户认证 — 注册功能",
"description": "用户可以通过邮箱和密码注册账号,注册后自动登录",
"category": "functional",
"priority": "high",
"passes": false,
"tests": [
"打开注册页面,输入有效邮箱和密码",
"点击注册按钮,验证跳转到主页",
"验证数据库中已创建用户记录",
"验证用户已自动登录(能看到用户菜单)"
],
"notes": ""
},
{
"id": "feat-002",
"name": "用户认证 — 登录功能",
"description": "已注册用户可以通过邮箱和密码登录",
"category": "functional",
"priority": "high",
"passes": false,
"tests": [
"打开登录页面,输入已注册的邮箱和密码",
"点击登录,验证跳转到主页",
"验证侧边栏显示用户信息",
"验证刷新页面后仍保持登录状态"
],
"notes": ""
},
{
"id": "feat-003",
"name": "用户认证 — 密码重置",
"description": "用户可以通过邮箱接收重置链接来修改密码",
"category": "functional",
"priority": "medium",
"passes": false,
"tests": [
"点击'忘记密码'链接",
"输入注册邮箱,验证收到重置邮件",
"点击邮件中的链接,设置新密码",
"用新密码登录验证成功"
],
"notes": ""
},
{
"id": "infra-001",
"name": "数据库初始化",
"description": "创建数据库 schema,包括用户表、会话表等",
"category": "infra",
"priority": "high",
"passes": false,
"tests": [
"运行 init.sh 脚本",
"验证数据库已创建",
"验证所有表已存在且字段正确",
"验证初始数据已插入"
],
"notes": ""
},
{
"id": "docs-001",
"name": "API 文档",
"description": "编写 REST API 的 OpenAPI/Swagger 文档",
"category": "docs",
"priority": "low",
"passes": false,
"tests": [
"文档文件存在",
"每个端点都有描述",
"请求和响应示例完整"
],
"notes": ""
}
]
}
```
## 字段说明
| 字段 | 必填 | 说明 |
|------|------|------|
| id | ✅ | 唯一标识符,格式:`feat-NNN`、`infra-NNN`、`docs-NNN`、`perf-NNN`、`fix-NNN` |
| name | ✅ | 功能简短名称 |
| description | ✅ | 功能详细描述,让 Agent 知道"完成"的标准 |
| category | ✅ | 分类:functional / infra / docs / perf / fix / research / writing / analysis |
| priority | ✅ | 优先级:high / medium / low |
| passes | ✅ | 是否完成并验证通过,Agent 只能改为 true,不可删改其他字段 |
| tests | ✅ | 验证步骤列表,Agent 必须全部通过才能将 passes 改为 true |
| notes | ❌ | 备注信息,Agent 可追加遇到的问题和解决方案 |
## Category 前缀
| Category | ID 前缀 | 用途 |
|----------|---------|------|
| 功能开发 | feat- | 业务功能 |
| 基础设施 | infra- | 环境、部署、数据库 |
| 文档 | docs- | 文档编写 |
| 性能 | perf- | 性能优化 |
| 修复 | fix- | Bug 修复 |
| 研究 | research- | 调研、可行性分析 |
| 写作 | writing- | 文案、报告撰写 |
| 分析 | analysis- | 数据分析任务 |
FILE:references/init-template.md
# init.sh 模板
通用的环境初始化脚本模板。根据项目类型调整。
## Web 应用
```bash
#!/bin/bash
set -e
PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
cd "$PROJECT_DIR"
echo "=== 环境初始化 ==="
echo "项目目录: $PROJECT_DIR"
# 1. 安装依赖
if [ -f "package.json" ]; then
echo "安装 Node.js 依赖..."
npm install
fi
if [ -f "requirements.txt" ]; then
echo "安装 Python 依赖..."
pip install -r requirements.txt
fi
# 2. 初始化数据库
if [ -f "init_db.sh" ]; then
echo "初始化数据库..."
bash init_db.sh
fi
# 3. 启动开发服务器(后台运行)
if [ -f "package.json" ] && grep -q '"start"' package.json; then
echo "启动开发服务器..."
npm start &
sleep 5
echo "开发服务器已启动"
fi
# 4. 冒烟测试
echo "=== 冒烟测试 ==="
if command -v curl &>/dev/null; then
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:3000 2>/dev/null || echo "000")
if [ "$HTTP_CODE" = "200" ] || [ "$HTTP_CODE" = "302" ]; then
echo "✅ 服务正常运行 (HTTP $HTTP_CODE)"
else
echo "⚠️ 服务未响应或异常 (HTTP $HTTP_CODE)"
fi
fi
echo "=== 初始化完成 ==="
```
## Python 项目
```bash
#!/bin/bash
set -e
PROJECT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
cd "$PROJECT_DIR"
echo "=== 环境初始化 ==="
# 创建虚拟环境(如不存在)
if [ ! -d ".venv" ]; then
python3 -m venv .venv
fi
source .venv/bin/activate
# 安装依赖
pip install -q -r requirements.txt
# 运行测试
if [ -f "tests/" ] || [ -f "test_"*.py ]; then
echo "=== 运行测试 ==="
python -m pytest tests/ -v --tb=short 2>&1 | tail -20
fi
echo "=== 初始化完成 ==="
```
## 使用说明
- init.sh 放在项目根目录
- 每次会话启动时由 Agent 运行
- 目的是让 Agent 快速确认环境正常,而不是花时间摸索配置
- 对于不需要初始化的项目(如纯文档项目),可以省略
FILE:references/progress-log-template.md
# 进度日志模板(progress.md)
```markdown
# 项目工作日志
## 初始化
- **日期:** 2026-03-18
- **初始化人:** 主 Agent
- **项目目标:** [一句话描述项目目标]
- **技术栈:** [列出主要技术]
- **功能总数:** N(high: X, medium: Y, low: Z)
---
## 会话 1 — 2026-03-18
- **Agent:** alpha(子Agent)
- **目标功能:** feat-001 - 功能名称
- **状态:** ✅ 完成
- **完成内容:**
- 实现了 xxx
- 添加了 xxx 测试
- **遇到的问题:**
- 问题描述 → 解决方案
- **下次继续:** feat-002
- **Git commits:**
- `a1b2c3d` feat: 实现 feat-001 功能名称
```
## 书写规范
- 每次会话追加一个 `## 会话 N` 块,不要修改历史条目
- 状态只能用:✅ 完成 / ⏳ 部分完成 / ❌ 失败
- Git commits 写 hash(前7位)+ commit message
- "下次继续"指明下一个要做的功能 ID,加速下次会话启动
- 如果会话因超时或错误中断,记录到"遇到的问题"中
安防摄像头视频 VL 模型微调数据集标注工具。用于从安防摄像头视频中提取关键帧、分析视频内容、生成结构化标注(含环境/人物/行为/风险描述),并输出符合 dataset.jsonl 格式的微调训练数据。Use when 用户需要对安防摄像头视频进行数据标注、生成 VL 模型训练数据集、处理 /root/hair-...
---
name: hair-cam-anno
description: 安防摄像头视频 VL 模型微调数据集标注工具。用于从安防摄像头视频中提取关键帧、分析视频内容、生成结构化标注(含环境/人物/行为/风险描述),并输出符合 dataset.jsonl 格式的微调训练数据。Use when 用户需要对安防摄像头视频进行数据标注、生成 VL 模型训练数据集、处理 /root/hair-cam 目录下的视频数据,或提及 "hair-cam"、"数据标注"、"视频标注"、"VL模型微调"。
---
# hair-cam-anno — 安防摄像头视频标注
对安防摄像头拍摄的视频进行帧提取、视觉分析、结构化标注,输出 `dataset.jsonl` 格式的 VL 模型微调数据集。
## 工作流程
### 第1步:提取视频帧
```bash
python3 <skill>/scripts/extract_frames.py \
--data-dir <视频目录> \
--output-dir <帧输出目录> \
--fps 0.5 \
--max-frames 4
```
- 从每个视频均匀提取 4 帧(每2秒一帧)
- 生成 `manifest.json` 记录每个视频的元信息和帧路径
### 第2步:逐视频分析标注
对每个视频:
1. **查看提取的帧**:用 `read` 工具读取帧图片(支持 jpg/png)
2. **从文件名推断信息**:文件名包含关键信息(如 `海尔摄像头-1男1女-坐-2` → 品牌=海尔摄像头, 1男1女, 行为=坐)
3. **生成标注 JSON**:根据帧画面内容 + 文件名信息,生成结构化标注
标注 JSON 结构:
```json
{
"title": "场景标题",
"subtitle": "场景副标题",
"description": "详细描述(≥50字,含环境、人物外貌、行为姿态)",
"labels": ["system_suggest_X", ...],
"risk": {
"level": "none|low|medium|high",
"description": "风险描述"
},
"simple_description": "简练描述(≤20汉字)"
}
```
### 第3步:汇总生成 dataset.jsonl
1. 将所有标注结果收集到 `annotations.json`,格式:
```json
[
{"video": "文件名.mp4", "annotation": { ...标注JSON... }},
...
]
```
2. 运行构建脚本:
```bash
python3 <skill>/scripts/build_jsonl.py \
--annotations annotations.json \
--video-dir <视频目录> \
--output dataset.jsonl
```
3. 脚本会自动验证标注数据并生成 `dataset.jsonl`
## 关键参考
- **System prompt 模板**: `references/system-prompt.md`
- **标签范围**: `references/labels-reference.md`
## 标签选择规则
- 根据视频实际内容选择匹配标签
- 可多选,但不要选不匹配的标签
- 如果视频中有危险行为(儿童攀爬窗户、摔倒等),risk.level 应为 medium 或 high
- 文件名中的信息(人数、行为)必须与标注一致
FILE:references/labels-reference.md
# 标签范围与风险等级参考
## labels 标签定义
| 标签 | 含义 |
|------|------|
| system_suggest_0 | No match.(无匹配场景) |
| system_suggest_1 | Someone appears.(有人出现) |
| system_suggest_2 | Multiple people appear.(多人出现) |
| system_suggest_3 | Child or infant appears.(儿童或婴儿出现) |
| system_suggest_4 | Elderly person appears.(老人出现) |
| system_suggest_5 | Animal appears.(动物出现) |
| system_suggest_6 | Suspicious behavior detected.(可疑行为) |
| system_suggest_7 | Person lying down.(有人躺卧) |
| system_suggest_8 | Person running.(有人在跑) |
| system_suggest_9 | Person climbing.(有人在攀爬) |
| system_suggest_10 | Fall detected.(检测到摔倒) |
| system_suggest_11 | Delivery person detected.(检测到快递员/外卖员) |
| system_suggest_12 | Family interaction.(家人互动) |
| system_suggest_13 | Household chore.(做家务) |
| system_suggest_14 | Package/parcel detected.(检测到快递包裹) |
## risk.level 风险等级定义
| 等级 | 含义 | 典型场景 |
|------|------|----------|
| none | 无风险 | 正常日常活动,家人互动,做家务 |
| low | 低风险 | 可疑但不确定的行为,非危险区域的攀爬 |
| medium | 中风险 | 儿童靠近窗户/阳台边缘,老人独处出现异常姿态 |
| high | 高风险 | 摔倒,儿童攀爬窗户/围栏,陌生人闯入 |
## 数据集类别要求
### 物体(每物体50条)
- 人:婴儿,儿童,老人,男成人,女成人,快递员、外卖员
- 动物:猫,狗
- 物体:快递盒,扫地机器人、吸尘器、床、桌子、椅子、凳子、沙发、门、窗帘、冰箱、洗衣机、其他家具
### 行为动作(每动作35条)
- 人的动作:躺卧,下蹲,爬行,攀爬,追逐、弯腰,徘徊,快速跑,吃饭、看电视、聊天、家人互动(拥抱、递东西、打闹、追逐)、做家务(扫地、拖地、擦桌子、洗菜、切菜、做饭、洗衣服、取出衣服、晾衣服、收衣服)
- 儿童婴儿动作:翻身、爬行、蹒跚学步、玩玩具、吃饭、吃手、摔倒,儿童/婴儿爬窗户,围栏
- 老人动作:浇花、看电视、看报纸、打太极、静坐、静卧、走动、普通活动
- 动物的动作:进食,饮水,猫抓挠家居,跑酷,跳跃,追尾巴,歪头杀,守在门口、攀爬围栏/窗户、撕咬、其他事件
### 环境(每环境30条)
- 室内:客厅/厨房/卫生间/阳台/门口/庭院/超市/书房/办公室/走廊
- 户外:停车场/花园/池塘/果园/农田/蔬菜大棚/鸡圈/羊圈/牛圈
FILE:references/system-prompt.md
# System Prompt 模板
用于 dataset.jsonl 中每条数据的 system prompt。所有数据共用同一个 system prompt。
---
```
你是一个安防摄像头视频内容解析专家。你的任务是分析安防摄像头拍摄的视频,输出结构化的场景描述。
## 分析步骤
1. 观察视频中的环境(室内/户外,具体位置)
2. 识别出现的人物(数量、性别、年龄段:婴儿/儿童/老人/成人)
3. 识别出现的动物(猫、狗等)
4. 分析人物/动物的行为动作和姿态
5. 评估是否存在安全风险
6. 生成简练的一句话描述
## 输出格式
你必须输出一个严格的JSON对象,包含以下字段:
```json
{
"title": "场景标题",
"subtitle": "场景副标题(具体行为描述)",
"description": "详细描述(包含环境、人物外貌特征、行为动作及姿态,至少50字)",
"labels": ["从下方标签列表中选择匹配的标签"],
"risk": {
"level": "none/low/medium/high",
"description": "风险描述,如无风险则为'当前场景无异常风险'"
},
"simple_description": "简练描述(不超过20个汉字)"
}
```
## labels 字段可选标签范围
从以下标签中选择所有匹配项:
- `system_suggest_0`: No match.(无匹配场景)
- `system_suggest_1`: Someone appears.(有人出现)
- `system_suggest_2`: Multiple people appear.(多人出现)
- `system_suggest_3`: Child or infant appears.(儿童或婴儿出现)
- `system_suggest_4`: Elderly person appears.(老人出现)
- `system_suggest_5`: Animal appears.(动物出现)
- `system_suggest_6`: Suspicious behavior detected.(可疑行为)
- `system_suggest_7`: Person lying down.(有人躺卧)
- `system_suggest_8`: Person running.(有人在跑)
- `system_suggest_9`: Person climbing.(有人在攀爬)
- `system_suggest_10`: Fall detected.(检测到摔倒)
- `system_suggest_11`: Delivery person detected.(检测到快递员/外卖员)
- `system_suggest_12`: Family interaction.(家人互动)
- `system_suggest_13`: Household chore.(做家务)
- `system_suggest_14`: Package/parcel detected.(检测到快递包裹)
## 约束条件
- 所有字段均不能为空
- `simple_description` 不超过20个汉字
- `labels` 必须从上述标签列表中选择
- `risk.level` 必须是 none/low/medium/high 之一
- `description` 要尽量详细,包含家庭中环境、人、宠外貌特征、行为及姿态
- 着重婴儿儿童看护、老人关照、成人日常行为
```
FILE:scripts/build_jsonl.py
#!/usr/bin/env python3
"""
hair-cam-anno: dataset.jsonl 生成工具
将视频标注结果汇总为符合 VL 模型微调要求的 dataset.jsonl 格式。
"""
import argparse
import json
import os
import sys
from pathlib import Path
SYSTEM_PROMPT = """你是一个安防摄像头视频内容解析专家。你的任务是分析安防摄像头拍摄的视频,输出结构化的场景描述。
## 分析步骤
1. 观察视频中的环境(室内/户外,具体位置)
2. 识别出现的人物(数量、性别、年龄段:婴儿/儿童/老人/成人)
3. 识别出现的动物(猫、狗等)
4. 分析人物/动物的行为动作和姿态
5. 评估是否存在安全风险
6. 生成简练的一句话描述
## 输出格式
你必须输出一个严格的JSON对象,包含以下字段:
```json
{
"title": "场景标题",
"subtitle": "场景副标题(具体行为描述)",
"description": "详细描述(包含环境、人物外貌特征、行为动作及姿态,至少50字)",
"labels": ["从下方标签列表中选择匹配的标签"],
"risk": {
"level": "none/low/medium/high",
"description": "风险描述,如无风险则为'当前场景无异常风险'"
},
"simple_description": "简练描述(不超过20个汉字)"
}
```
## labels 字段可选标签范围
从以下标签中选择所有匹配项:
- `system_suggest_0`: No match.(无匹配场景)
- `system_suggest_1`: Someone appears.(有人出现)
- `system_suggest_2`: Multiple people appear.(多人出现)
- `system_suggest_3`: Child or infant appears.(儿童或婴儿出现)
- `system_suggest_4`: Elderly person appears.(老人出现)
- `system_suggest_5`: Animal appears.(动物出现)
- `system_suggest_6`: Suspicious behavior detected.(可疑行为)
- `system_suggest_7`: Person lying down.(有人躺卧)
- `system_suggest_8`: Person running.(有人在跑)
- `system_suggest_9`: Person climbing.(有人在攀爬)
- `system_suggest_10`: Fall detected.(检测到摔倒)
- `system_suggest_11`: Delivery person detected.(检测到快递员/外卖员)
- `system_suggest_12`: Family interaction.(家人互动)
- `system_suggest_13`: Household chore.(做家务)
- `system_suggest_14`: Package/parcel detected.(检测到快递包裹)
## 约束条件
- 所有字段均不能为空
- `simple_description` 不超过20个汉字
- `labels` 必须从上述标签列表中选择
- `risk.level` 必须是 none/low/medium/high 之一
- `description` 要尽量详细,包含家庭中环境、人、宠外貌特征、行为及姿态
- 着重婴儿儿童看护、老人关照、成人日常行为"""
# Valid labels set
VALID_LABELS = {
"system_suggest_0", "system_suggest_1", "system_suggest_2",
"system_suggest_3", "system_suggest_4", "system_suggest_5",
"system_suggest_6", "system_suggest_7", "system_suggest_8",
"system_suggest_9", "system_suggest_10", "system_suggest_11",
"system_suggest_12", "system_suggest_13", "system_suggest_14",
}
VALID_RISK_LEVELS = {"none", "low", "medium", "high"}
def validate_annotation(annotation: dict) -> list[str]:
"""验证单条标注数据,返回错误列表。"""
errors = []
# Check required fields
required = ["title", "subtitle", "description", "labels", "risk", "simple_description"]
for field in required:
if field not in annotation:
errors.append(f"缺少字段: {field}")
elif not annotation[field]:
errors.append(f"字段为空: {field}")
# Validate labels
if "labels" in annotation:
if not isinstance(annotation["labels"], list):
errors.append("labels 必须是数组")
else:
for label in annotation["labels"]:
if label not in VALID_LABELS:
errors.append(f"无效标签: {label}")
# Validate risk
if "risk" in annotation and isinstance(annotation["risk"], dict):
level = annotation["risk"].get("level", "")
if level not in VALID_RISK_LEVELS:
errors.append(f"无效风险等级: {level}")
if "description" not in annotation["risk"] or not annotation["risk"]["description"]:
errors.append("risk.description 不能为空")
# Validate simple_description length
if "simple_description" in annotation and len(annotation["simple_description"]) > 20:
errors.append(f"simple_description 超过20字: {len(annotation['simple_description'])}字")
# Validate description length
if "description" in annotation and len(annotation["description"]) < 50:
errors.append(f"description 不足50字: {len(annotation['description'])}字")
return errors
def build_jsonl_entry(video_rel_path: str, annotation: dict) -> dict:
"""构建一条 dataset.jsonl 记录。"""
assistant_content = json.dumps(annotation, ensure_ascii=False, indent=2)
return {
"messages": [
{
"role": "system",
"content": SYSTEM_PROMPT
},
{
"role": "assistant",
"content": assistant_content
}
],
"images": [video_rel_path]
}
def main():
parser = argparse.ArgumentParser(description="dataset.jsonl 生成工具")
parser.add_argument("--annotations", required=True, help="标注结果JSON文件(annotations.json)")
parser.add_argument("--video-dir", required=True, help="视频目录(用于计算相对路径)")
parser.add_argument("--output", required=True, help="输出 dataset.jsonl 路径")
parser.add_argument("--validate", action="store_true", default=True, help="验证标注数据(默认开启)")
args = parser.parse_args()
with open(args.annotations, "r", encoding="utf-8") as f:
annotations = json.load(f)
records = []
total = len(annotations)
errors_all = 0
for i, item in enumerate(annotations):
video_name = item["video"]
annotation = item["annotation"]
# Compute relative path
video_rel_path = os.path.join("data", video_name)
# Validate
if args.validate:
errs = validate_annotation(annotation)
if errs:
errors_all += 1
print(f" ⚠ {video_name}: {errs}")
# Still include but note issues
entry = build_jsonl_entry(video_rel_path, annotation)
records.append(entry)
# Write JSONL
os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True)
with open(args.output, "w", encoding="utf-8") as f:
for record in records:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
print(f"\n生成完成: {args.output}")
print(f" 总计: {total} 条")
if errors_all:
print(f" ⚠ 有 {errors_all} 条存在验证问题")
else:
print(f" ✅ 全部通过验证")
if __name__ == "__main__":
main()
FILE:scripts/extract_frames.py
#!/usr/bin/env python3
"""
hair-cam-anno: 视频帧提取工具
从安防摄像头视频中提取关键帧,用于后续标注。
"""
import argparse
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
def extract_frames(video_path: str, output_dir: str, fps: float = 0.5, max_frames: int = 4) -> list[str]:
"""从视频中按固定间隔提取帧。"""
os.makedirs(output_dir, exist_ok=True)
base = Path(video_path).stem
frame_paths = []
for i in range(max_frames):
out_path = os.path.join(output_dir, f"{base}_f{i+1}.jpg")
# Extract frame at time offset
timestamp = i / fps
cmd = [
"ffmpeg", "-y", "-ss", str(timestamp), "-i", video_path,
"-frames:v", "1", "-q:v", "2", out_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0 and os.path.exists(out_path):
frame_paths.append(out_path)
else:
break
return frame_paths
def get_video_info(video_path: str) -> dict:
"""获取视频元信息。"""
cmd = [
"ffprobe", "-v", "error",
"-select_streams", "v:0",
"-show_entries", "stream=width,height,duration,nb_frames,codec_name,r_frame_rate",
"-of", "json", video_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
data = json.loads(result.stdout)
if data.get("streams"):
return data["streams"][0]
return {}
def scan_videos(data_dir: str) -> list[str]:
"""扫描目录下的所有视频文件。"""
extensions = {".mp4", ".avi", ".mkv", ".mov", ".flv", ".wmv", ".webm"}
videos = []
for f in sorted(os.listdir(data_dir)):
if Path(f).suffix.lower() in extensions:
videos.append(os.path.join(data_dir, f))
return videos
def main():
parser = argparse.ArgumentParser(description="视频帧提取工具")
parser.add_argument("--data-dir", required=True, help="视频数据目录")
parser.add_argument("--output-dir", required=True, help="帧输出目录")
parser.add_argument("--fps", type=float, default=0.5, help="提取帧率(默认0.5帧/秒)")
parser.add_argument("--max-frames", type=int, default=4, help="每个视频最多提取帧数(默认4)")
parser.add_argument("--info", action="store_true", help="仅显示视频信息,不提取帧")
args = parser.parse_args()
videos = scan_videos(args.data_dir)
if not videos:
print("未找到视频文件", file=sys.stderr)
sys.exit(1)
print(f"找到 {len(videos)} 个视频文件")
manifest = []
for vp in videos:
info = get_video_info(vp)
entry = {
"path": vp,
"filename": os.path.basename(vp),
**info
}
manifest.append(entry)
if args.info:
print(f" {os.path.basename(vp)}: {info.get('width','?')}x{info.get('height','?')} "
f"duration={info.get('duration','?')}s frames={info.get('nb_frames','?')}")
else:
out_dir = os.path.join(args.output_dir, Path(vp).stem)
frames = extract_frames(vp, out_dir, args.fps, args.max_frames)
entry["frames"] = frames
print(f" {os.path.basename(vp)}: 提取 {len(frames)} 帧 → {out_dir}")
# Save manifest
manifest_path = os.path.join(args.output_dir, "manifest.json")
os.makedirs(args.output_dir, exist_ok=True)
with open(manifest_path, "w", encoding="utf-8") as f:
json.dump(manifest, f, ensure_ascii=False, indent=2)
print(f"\n清单已保存: {manifest_path}")
if __name__ == "__main__":
main()
使用豆包 Seedream 模型文生图,支持并发批量生成,输出图库预览页
---
name: doubao-image-gen
description: "使用豆包 Seedream 模型文生图,支持并发批量生成,输出图库预览页"
description_en: "Text-to-image generation using Doubao Seedream model, supports concurrent batch generation with gallery output"
allowed-tools: Read,Write,Bash
---
# 豆包文生图 (Doubao Image Gen)
使用火山引擎豆包 `doubao-seedream-5-0-260128` 模型,根据文字描述生成高质量图像,支持并发批量生成多张图片,并输出图库预览页面。
## 环境要求
- Python 3.8+
- openai 库:`pip install "openai>=1.0"`
## Setup — API Key 配置
API Key 读取优先级(从高到低):
1. `--api-key` 命令行参数
2. 环境变量 `ARK_API_KEY`
3. 用户目录 `~/.doubao-image-gen/.env` 文件中的 `ARK_API_KEY=xxx`
获取 API Key:登录 [火山方舟控制台](https://console.volcengine.com/ark) → API Key 管理
## Run
```bash
# 生成单张图片
python {baseDir}/scripts/gen.py --prompt "赛博朋克风格的上海夜景" --api-key YOUR_KEY
# 并发生成4张(默认并发数=4)
python {baseDir}/scripts/gen.py --prompt "水墨风格的山水画" --count 4 --api-key YOUR_KEY
# 指定尺寸(支持 1024x1024 / 2K / 1280x720 / 720x1280 / 2048x2048)
python {baseDir}/scripts/gen.py --prompt "星空下的草原" --size 2K --api-key YOUR_KEY
# 指定输出目录
python {baseDir}/scripts/gen.py --prompt "古风仙侠" --out-dir ./output --api-key YOUR_KEY
# 从环境变量读取 Key(推荐)
python {baseDir}/scripts/gen.py --prompt "未来城市" --count 2
```
## 参数说明
| 参数 | 默认值 | 说明 |
|------|--------|------|
| `--prompt` | 必填 | 图像描述提示词 |
| `--count` | 1 | 生成数量(并发执行) |
| `--size` | `2K` | 图像尺寸 |
| `--model` | `doubao-seedream-5-0-260128` | 模型名称 |
| `--out-dir` | `./doubao-output-{时间戳}` | 输出目录 |
| `--api-key` | 环境变量 | ARK API Key |
| `--workers` | 4 | 并发线程数 |
| `--watermark` | False | 是否添加水印 |
| `--dry-run` | False | 仅打印参数不调用 API |
## Output
- `*.jpeg` 图像文件(按序号命名)
- `prompts.json` 提示词与文件的映射记录
- `index.html` 图库预览页面(可直接在浏览器打开)
## AI 使用指引
当用户说以下内容时,加载本技能并调用脚本:
- "帮我画一张..." / "生成一张..." / "画个图..."
- "批量生成 N 张图片"
- "用豆包生成图片"
**标准流程:**
1. 提取或优化用户的提示词(必要时翻译为英文以提升质量)
2. 调用 `python {baseDir}/scripts/gen.py` 生成图片
3. 生成完成后,**直接在聊天中以 Markdown 图片形式发送给用户**:``
4. 同时提供 `index.html` 预览链接供浏览
**示例 Prompt 优化:**
用户说"画一只猫"→ 优化为 "A cute cat sitting gracefully, soft studio lighting, photorealistic, 8K detail"
FILE:CHANGELOG.md
# Changelog
All notable changes to **doubao-image-gen** will be documented in this file.
---
## [1.0.0] - 2026-03-17
### Added
- 初始版本发布
- 基于火山引擎豆包 `doubao-seedream-5-0-260128` 模型实现文生图能力
- 支持并发批量生成,默认 4 线程同时调用 API(`--count` + `--workers` 参数)
- 支持多种图像尺寸:`1024x1024` / `2K` / `4K` / `1280x720` / `720x1280` / `2048x2048` 等
- API Key 三级读取机制:CLI 参数 → 环境变量 `ARK_API_KEY` → `~/.doubao-image-gen/.env` 文件
- 生成结果自动下载为 `.jpeg` 并按序号命名
- 自动输出 `prompts.json`(提示词与文件映射记录)
- 自动生成 `index.html` 图库预览页,暗色赛博风 UI,支持悬停动效
- `--dry-run` 模式:仅打印参数不消耗 API 调用
- `--watermark` 开关:默认关闭水印
- 输出 `GENERATED_IMAGE: 路径` 格式,便于 AI 直接引用图片发送到聊天
- 完整的 AI 使用指引:触发词识别、提示词优化建议、标准调用流程
### Notes
- 模型名称 `doubao-seedream-5-0-260128` 无需在火山方舟控制台创建推理接入点,直接用 API Key 即可调用
- 依赖:`openai>=1.0`、`requests`
FILE:scripts/gen.py
#!/usr/bin/env python3
"""
doubao-image-gen — 豆包 Seedream 文生图脚本
使用火山引擎 ARK API,支持并发批量生成图片,输出图库预览页。
用法:
python gen.py --prompt "赛博朋克龙虾" --count 4 --api-key YOUR_KEY
python gen.py --prompt "水墨山水" --size 2K --out-dir ./output
"""
import argparse
import datetime
import json
import os
import re
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
# ─── 常量 ────────────────────────────────────────────────────────────────────
DEFAULT_MODEL = "doubao-seedream-5-0-260128"
DEFAULT_SIZE = "2K"
DEFAULT_WORKERS = 4
ARK_BASE_URL = "https://ark.cn-beijing.volces.com/api/v3"
VALID_SIZES = [
"1024x1024", "2K", "1280x720", "720x1280",
"2048x2048", "2048x1152", "1152x2048", "4K",
]
# ─── 工具函数 ─────────────────────────────────────────────────────────────────
def _timestamp() -> str:
return datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
def _slug(text: str, max_len: int = 40) -> str:
"""将提示词转为安全文件名片段"""
s = re.sub(r"[^\w\u4e00-\u9fff]+", "-", text).strip("-")
return s[:max_len] or "image"
def _load_api_key(cli_key: str | None) -> str:
"""按优先级读取 API Key:CLI 参数 > 环境变量 > ~/.doubao-image-gen/.env"""
if cli_key:
return cli_key
if key := os.environ.get("ARK_API_KEY"):
return key
env_file = Path.home() / ".doubao-image-gen" / ".env"
if env_file.exists():
for line in env_file.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line.startswith("ARK_API_KEY="):
return line.split("=", 1)[1].strip().strip('"').strip("'")
return ""
def _write_gallery(out_dir: Path, items: list[dict]) -> Path:
"""生成图库预览 HTML"""
html_items = ""
for it in items:
fname = it["file"]
prompt_escaped = it["prompt"].replace("<", "<").replace(">", ">")
html_items += f"""
<div class="card">
<a href="{fname}" target="_blank">
<img src="{fname}" alt="{prompt_escaped}" loading="lazy">
</a>
<div class="meta">
<div class="prompt">{prompt_escaped}</div>
<div class="info">{it.get('model','')} · {it.get('size','')} · {it.get('index','')}</div>
</div>
</div>"""
html = f"""<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>豆包文生图 · 图库</title>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
font-family: 'PingFang SC', 'Microsoft YaHei', system-ui, sans-serif;
background: #0d1117; color: #e6edf3;
padding: 32px 24px;
}}
h1 {{
font-size: 22px; font-weight: 600; margin-bottom: 8px;
color: #60A5FA;
}}
.subtitle {{ font-size: 13px; color: #8b949e; margin-bottom: 28px; }}
.grid {{
display: grid;
grid-template-columns: repeat(auto-fill, minmax(280px, 1fr));
gap: 20px;
}}
.card {{
background: #161b22; border: 1px solid #30363d;
border-radius: 12px; overflow: hidden;
transition: transform .2s, box-shadow .2s;
}}
.card:hover {{
transform: translateY(-4px);
box-shadow: 0 8px 32px rgba(96,165,250,0.2);
}}
.card img {{
width: 100%; display: block;
aspect-ratio: 1 / 1; object-fit: cover;
}}
.meta {{
padding: 12px 14px;
}}
.prompt {{
font-size: 13px; line-height: 1.6; color: #c9d1d9;
display: -webkit-box; -webkit-line-clamp: 3;
-webkit-box-orient: vertical; overflow: hidden;
}}
.info {{
font-size: 11px; color: #8b949e; margin-top: 8px;
}}
.stats {{
font-size: 13px; color: #8b949e;
margin-bottom: 20px;
}}
</style>
</head>
<body>
<h1>🦞 豆包文生图 · 图库</h1>
<div class="subtitle">由 doubao-image-gen skill 生成 · {_timestamp()}</div>
<div class="stats">共 {len(items)} 张图片</div>
<div class="grid">{html_items}
</div>
</body>
</html>"""
index_path = out_dir / "index.html"
index_path.write_text(html, encoding="utf-8")
return index_path
# ─── 核心生成函数 ──────────────────────────────────────────────────────────────
def generate_one(
index: int,
prompt: str,
model: str,
size: str,
api_key: str,
out_dir: Path,
watermark: bool,
) -> dict:
"""调用豆包 API 生成单张图片,下载并保存"""
try:
from openai import OpenAI
except ImportError:
print("请先安装 openai 库:pip install 'openai>=1.0'", file=sys.stderr)
sys.exit(1)
client = OpenAI(base_url=ARK_BASE_URL, api_key=api_key)
print(f" [{index:02d}] 生成中...", flush=True)
resp = client.images.generate(
model=model,
prompt=prompt,
size=size,
response_format="url",
extra_body={"watermark": watermark},
)
url = resp.data[0].url
print(f" [{index:02d}] 下载图片...", flush=True)
import requests as req_lib
r = req_lib.get(url, timeout=60)
r.raise_for_status()
filename = f"{index:02d}-{_slug(prompt)}.jpeg"
save_path = out_dir / filename
save_path.write_bytes(r.content)
print(f" [{index:02d}] ✓ 保存: {filename} ({len(r.content)//1024}KB)", flush=True)
return {
"index": f"#{index:02d}",
"file": filename,
"path": str(save_path),
"prompt": prompt,
"model": model,
"size": size,
"url": url,
}
# ─── 主函数 ───────────────────────────────────────────────────────────────────
def main(argv: list[str]) -> int:
parser = argparse.ArgumentParser(
prog="doubao-image-gen",
description="豆包 Seedream 文生图 — 支持并发批量生成",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python gen.py --prompt "赛博朋克龙虾" --api-key YOUR_KEY
python gen.py --prompt "水墨山水画" --count 4 --size 2K
python gen.py --prompt "未来城市夜景" --out-dir ./my-images --workers 2
""",
)
parser.add_argument("--prompt", "-p", required=True, help="图像描述提示词")
parser.add_argument("--count", "-n", type=int, default=1, help="生成数量(默认1)")
parser.add_argument("--size", "-s", default=DEFAULT_SIZE,
choices=VALID_SIZES, help=f"图像尺寸(默认 {DEFAULT_SIZE})")
parser.add_argument("--model", "-m", default=DEFAULT_MODEL, help="模型名称")
parser.add_argument("--out-dir", "-o", default=None, help="输出目录")
parser.add_argument("--api-key", "-k", default=None, help="ARK API Key")
parser.add_argument("--workers", "-w", type=int, default=DEFAULT_WORKERS,
help=f"并发线程数(默认 {DEFAULT_WORKERS})")
parser.add_argument("--watermark", action="store_true", default=False,
help="添加水印(默认关闭)")
parser.add_argument("--dry-run", action="store_true",
help="仅打印参数,不调用 API")
args = parser.parse_args(argv)
# 读取 API Key
api_key = _load_api_key(args.api_key)
if not api_key and not args.dry_run:
print("❌ 未找到 API Key,请通过以下方式之一提供:", file=sys.stderr)
print(" 1. --api-key YOUR_KEY", file=sys.stderr)
print(" 2. 环境变量 ARK_API_KEY=YOUR_KEY", file=sys.stderr)
print(" 3. ~/.doubao-image-gen/.env 文件写入 ARK_API_KEY=YOUR_KEY", file=sys.stderr)
return 2
# 输出目录
out_dir = Path(args.out_dir) if args.out_dir else Path(f"doubao-output-{_timestamp()}")
out_dir.mkdir(parents=True, exist_ok=True)
# dry-run 模式
if args.dry_run:
print("=== DRY RUN ===")
print(f"prompt : {args.prompt}")
print(f"count : {args.count}")
print(f"size : {args.size}")
print(f"model : {args.model}")
print(f"workers : {args.workers}")
print(f"out_dir : {out_dir}")
return 0
print(f"🚀 开始生成 {args.count} 张图片(并发数: {min(args.workers, args.count)})")
print(f" 模型: {args.model} | 尺寸: {args.size} | 输出: {out_dir}\n")
items = []
errors = []
workers = min(args.workers, args.count)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = {
executor.submit(
generate_one,
i, args.prompt, args.model, args.size,
api_key, out_dir, args.watermark,
): i
for i in range(1, args.count + 1)
}
for future in as_completed(futures):
idx = futures[future]
try:
result = future.result()
items.append(result)
except Exception as e:
errors.append((idx, str(e)))
print(f" [{idx:02d}] ❌ 失败: {e}", file=sys.stderr)
# 按序号排序
items.sort(key=lambda x: x["index"])
# 保存 prompts.json
json_path = out_dir / "prompts.json"
json_path.write_text(
json.dumps(items, ensure_ascii=False, indent=2), encoding="utf-8"
)
# 生成图库预览
if items:
index_path = _write_gallery(out_dir, items)
print(f"\n✅ 完成!生成 {len(items)} 张,失败 {len(errors)} 张")
print(f" 输出目录: {out_dir}")
print(f" 图库预览: {index_path}")
# 输出图片路径供 AI 直接引用
print("\n--- 图片路径 ---")
for it in items:
print(f"GENERATED_IMAGE: {it['path']}")
else:
print("\n❌ 全部生成失败", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))