@clawhub-laoyutang-6462afd61d
Manage multi-project todos with SQLite, supporting subtasks, priority/urgency levels, keyword search, time filters, smart sorting, and scheduled reminders.
# Todo Skill - 待办事项管理
SQLite 驱动的待办事项管理 skill,支持多项目、子待办、重要/紧急程度、定时提醒。
## 功能特性
1. **多项目管理** - 创建工作、生活等多个待办项目
2. **子待办支持** - 待办可有子待办,子待办完成不自动完成父待办
3. **重要程度** - 1=普通, 2=重要🟠, 3=紧急🔴
4. **紧急程度** - 1=普通, 2=重要🔥, 3=紧急🔥🔥
5. **时间筛选** - 按创建时间、完成时间筛选
6. **搜索待办** - 按关键词搜索标题和备注
7. **智能排序** - 默认按紧急 > 重要 > 普通排序
8. **定时提醒** - 支持定时发送未完成待办
## 数据库位置
`~/.openclaw/workspace/data/todo.db`
## 使用方式
### 项目管理
```
# 创建项目
todo create-project <项目名>
例如:todo create-project 工作
例如:todo create-project 生活
# 列出所有项目
todo list-projects
# 删除项目(会删除项目下所有待办)
todo delete-project <项目名>
```
### 添加待办
```
# 基本添加
todo add <项目名> <待办内容>
# 指定重要程度(-i/--importance: 1=普通, 2=重要, 3=紧急)
todo add 工作 "完成报告" -i 3
# 指定紧急程度(-u/--urgency: 1=普通, 2=重要, 3=紧急)
todo add 工作 "紧急修复bug" -i 3 -u 3
# 指定截止日期(-d/--due: YYYY-MM-DD 或 YYYY-MM-DD HH:MM)
todo add 工作 "提交周报" -d 2026-03-25
# 添加子待办(-p/--parent: 父待办ID)
todo add 工作 "写测试用例" -p 5
# 添加备注(-n/--note)
todo add 工作 "代码审查" -n "重点关注性能问题"
```
### 完成待办
```
# 完成待办
todo done <待办ID>
# 取消完成(重新打开)
todo undo <待办ID>
```
### 查看待办
```
# 查看项目下所有待办
todo list <项目名>
# 查看所有未完成待办
todo list-all
# 只看未完成
todo list-all --pending
# 或
todo list-all -p
# 只看已完成
todo list-all --completed
# 或
todo list-all -c
# 按重要程度筛选
todo list-all --importance 3
# 按紧急程度筛选
todo list-all --urgency 3
# 按时间范围筛选
todo list-all --from 2026-03-01 --to 2026-03-31
# 查看单个待办详情(含子待办)
todo show <待办ID>
```
### 编辑待办
```
# 修改标题
todo edit <待办ID> --title "新标题"
# 修改重要程度
todo edit <待办ID> --importance critical
# 修改紧急程度
todo edit <待办ID> --urgency high
# 修改截止日期
todo edit <待办ID> --due 2026-03-30
# 修改备注
todo edit <待办ID> --note "新的备注"
# 移动到其他项目
todo edit <待办ID> --project 生活
```
### 删除待办
```
# 删除待办(会同时删除子待办)
todo delete <待办ID>
```
### 统计
```
# 查看统计信息
todo stats
# 按项目查看统计
todo stats <项目名>
```
### 搜索待办
```
# 搜索标题和备注
todo search <关键词>
# 只搜索未完成
todo search 报告 --pending
# 只搜索已完成
todo search 报告 --completed
```
## 排序规则
待办列表默认按以下顺序排序:
1. **整体最大优先级**(包括子待办)
2. **自身紧急程度**:high > medium > low
3. **自身重要程度**:high > medium > low
4. **创建时间**:新创建的排前面
**父待办排序考虑子待办优先级:**
- 如果子待办有紧急/重要,父待办的排序位置会提升
- 例如:父待办是"一般",但子待办是"高"→ 父待办按"高"级别排序
**子待办排序:**
- 子待办按紧急 > 重要 > 时间排序显示
## 显示规则
重要/紧急程度显示方式:
| 等级 | 重要显示 | 紧急显示 |
|------|---------|---------|
| 3 (紧急) | 🔴 | 🔥🔥 |
| 2 (重要) | 🟠 | 🔥 |
| 1 (普通) | (不显示) | (不显示) |
**示例:**
- `🔴🔥🔥` = 紧急 + 紧急
- `🟠🔥` = 重要 + 重要
- 无标记 = 普通待办(默认)
**注意:** 新建待办默认等级为 1,不显示任何标记。
## 定时提醒
使用 OpenClaw 的 cron 功能设置定时提醒:
```
# 每天早上9点发送未完成待办
定时任务:每天9点提醒待办
```
这会调用 `todo list-all --pending` 并发送结果。
## 数据结构
**projects 表:**
- id: 项目ID
- name: 项目名称
- created_at: 创建时间
**todos 表:**
- id: 待办ID
- project_id: 所属项目
- parent_id: 父待办ID(NULL表示顶级)
- title: 标题
- note: 备注
- importance: 重要程度
- urgency: 紧急程度
- status: 状态(pending/completed)
- due_date: 截止日期
- created_at: 创建时间
- updated_at: 更新时间
- completed_at: 完成时间
## 示例
```
# 创建工作项目
todo create-project 工作
# 添加一个紧急待办
todo add 工作 "上线前检查" -i 3 -u 3 -d 2026-03-25
# 添加子待办
todo add 工作 "检查数据库迁移" -p 1
todo add 工作 "检查API文档" -p 1
# 完成子待办
todo done 2
todo done 3
# 父待办仍需手动完成
todo done 1
# 查看所有未完成
todo list-all -p
```
FILE:todo.py
#!/usr/bin/env python3
"""
Todo Skill - SQLite-based todo management
"""
import sqlite3
import argparse
import sys
import os
from datetime import datetime
from typing import Optional, List
DB_PATH = os.path.expanduser("~/.openclaw/workspace/data/todo.db")
def get_connection():
"""Get database connection, create tables if not exist."""
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
# Create tables
conn.executescript("""
CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS todos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL,
parent_id INTEGER,
title TEXT NOT NULL,
note TEXT,
importance INTEGER DEFAULT 1,
urgency INTEGER DEFAULT 1,
status TEXT DEFAULT 'pending',
due_date TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE,
FOREIGN KEY (parent_id) REFERENCES todos(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_todos_project ON todos(project_id);
CREATE INDEX IF NOT EXISTS idx_todos_parent ON todos(parent_id);
CREATE INDEX IF NOT EXISTS idx_todos_status ON todos(status);
""")
conn.commit()
return conn
def create_project(name: str):
"""Create a new project."""
conn = get_connection()
try:
conn.execute("INSERT INTO projects (name) VALUES (?)", (name,))
conn.commit()
print(f"✅ 项目 '{name}' 创建成功")
except sqlite3.IntegrityError:
print(f"❌ 项目 '{name}' 已存在")
finally:
conn.close()
def list_projects():
"""List all projects."""
conn = get_connection()
cursor = conn.execute("""
SELECT p.id, p.name, p.created_at,
COUNT(CASE WHEN t.status = 'pending' THEN 1 END) as pending,
COUNT(CASE WHEN t.status = 'completed' THEN 1 END) as completed
FROM projects p
LEFT JOIN todos t ON p.id = t.project_id
GROUP BY p.id
ORDER BY p.created_at
""")
projects = cursor.fetchall()
conn.close()
if not projects:
print("暂无项目,使用 'todo create-project <项目名>' 创建")
return
print("📋 项目列表:")
print("-" * 50)
for p in projects:
print(f" [{p['id']}] {p['name']}")
print(f" 未完成: {p['pending']} | 已完成: {p['completed']}")
def delete_project(name: str):
"""Delete a project and all its todos."""
conn = get_connection()
cursor = conn.execute("SELECT id FROM projects WHERE name = ?", (name,))
project = cursor.fetchone()
if not project:
print(f"❌ 项目 '{name}' 不存在")
conn.close()
return
# Count todos
cursor = conn.execute("SELECT COUNT(*) as cnt FROM todos WHERE project_id = ?", (project['id'],))
count = cursor.fetchone()['cnt']
conn.execute("DELETE FROM projects WHERE id = ?", (project['id'],))
conn.commit()
conn.close()
print(f"✅ 项目 '{name}' 已删除(包含 {count} 个待办)")
def add_todo(project_name: str, title: str, importance: str = 'medium',
urgency: str = 'medium', parent_id: Optional[int] = None,
due_date: Optional[str] = None, note: Optional[str] = None):
"""Add a new todo."""
conn = get_connection()
# Get project
cursor = conn.execute("SELECT id FROM projects WHERE name = ?", (project_name,))
project = cursor.fetchone()
if not project:
print(f"❌ 项目 '{project_name}' 不存在,请先创建")
conn.close()
return
# Validate importance/urgency
valid_levels = [1, 2, 3]
if importance not in valid_levels:
print(f"❌ 重要程度必须是: {', '.join(valid_levels)}")
conn.close()
return
if urgency not in valid_levels:
print(f"❌ 紧急程度必须是: {', '.join(valid_levels)}")
conn.close()
return
# Parse due_date
due_dt = None
if due_date:
try:
if ' ' in due_date:
due_dt = datetime.strptime(due_date, "%Y-%m-%d %H:%M")
else:
due_dt = datetime.strptime(due_date, "%Y-%m-%d")
except ValueError:
print(f"❌ 日期格式错误,应为 YYYY-MM-DD 或 YYYY-MM-DD HH:MM")
conn.close()
return
# Insert
cursor = conn.execute("""
INSERT INTO todos (project_id, parent_id, title, note, importance, urgency, due_date)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (project['id'], parent_id, title, note, importance, urgency, due_dt))
todo_id = cursor.lastrowid
conn.commit()
conn.close()
# Display
imp_icon = {1: '', 2: '🟠', 3: '🔴'}
urg_icon = {1: '', 2: '🔥', 3: '🔥🔥'}
print(f"✅ 待办创建成功 [#{todo_id}]")
print(f" {title}")
print(f" 项目: {project_name} | 重要: {imp_icon.get(importance, '')}{importance} | 紧急: {urg_icon.get(urgency, '')}{urgency}")
if due_date:
print(f" 截止: {due_date}")
if parent_id:
print(f" 父待办: #{parent_id}")
def done_todo(todo_id: int):
"""Mark a todo as completed."""
conn = get_connection()
cursor = conn.execute("SELECT id, title, status FROM todos WHERE id = ?", (todo_id,))
todo = cursor.fetchone()
if not todo:
print(f"❌ 待办 #{todo_id} 不存在")
conn.close()
return
if todo['status'] == 'completed':
print(f"⚠️ 待办 #{todo_id} 已经是完成状态")
conn.close()
return
conn.execute("""
UPDATE todos SET status = 'completed', completed_at = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP WHERE id = ?
""", (todo_id,))
conn.commit()
conn.close()
print(f"✅ 待办 #{todo_id} 已完成: {todo['title']}")
def undo_todo(todo_id: int):
"""Reopen a completed todo."""
conn = get_connection()
cursor = conn.execute("SELECT id, title, status FROM todos WHERE id = ?", (todo_id,))
todo = cursor.fetchone()
if not todo:
print(f"❌ 待办 #{todo_id} 不存在")
conn.close()
return
if todo['status'] == 'pending':
print(f"⚠️ 待办 #{todo_id} 已经是未完成状态")
conn.close()
return
conn.execute("""
UPDATE todos SET status = 'pending', completed_at = NULL,
updated_at = CURRENT_TIMESTAMP WHERE id = ?
""", (todo_id,))
conn.commit()
conn.close()
print(f"✅ 待办 #{todo_id} 已重新打开: {todo['title']}")
def delete_todo(todo_id: int):
"""Delete a todo and its children."""
conn = get_connection()
cursor = conn.execute("SELECT id, title FROM todos WHERE id = ?", (todo_id,))
todo = cursor.fetchone()
if not todo:
print(f"❌ 待办 #{todo_id} 不存在")
conn.close()
return
# Count children
def count_children(pid):
count = 1
cursor = conn.execute("SELECT id FROM todos WHERE parent_id = ?", (pid,))
for child in cursor.fetchall():
count += count_children(child['id'])
return count
total = count_children(todo_id)
# Delete (cascade will handle children)
conn.execute("DELETE FROM todos WHERE id = ?", (todo_id,))
conn.commit()
conn.close()
if total > 1:
print(f"✅ 已删除待办 #{todo_id} 及其 {total-1} 个子待办: {todo['title']}")
else:
print(f"✅ 已删除待办 #{todo_id}: {todo['title']}")
def list_todos(project_name: Optional[str] = None, status: Optional[str] = None,
importance: Optional[str] = None, urgency: Optional[str] = None,
date_from: Optional[str] = None, date_to: Optional[str] = None,
show_all: bool = False):
"""List todos with filters."""
conn = get_connection()
query = """
SELECT t.id, t.title, t.note, t.importance, t.urgency, t.status,
t.due_date, t.created_at, t.completed_at, t.parent_id, p.name as project
FROM todos t
JOIN projects p ON t.project_id = p.id
WHERE 1=1
"""
params = []
if project_name:
query += " AND p.name = ?"
params.append(project_name)
if status:
query += " AND t.status = ?"
params.append(status)
if importance:
query += " AND t.importance = ?"
params.append(importance)
if urgency:
query += " AND t.urgency = ?"
params.append(urgency)
if date_from:
query += " AND DATE(t.created_at) >= ?"
params.append(date_from)
if date_to:
query += " AND DATE(t.created_at) <= ?"
params.append(date_to)
# 排序:紧急 > 重要 > 普通按创建时间
query += """ ORDER BY
CASE t.urgency WHEN 'critical' THEN 1 WHEN 'high' THEN 2 WHEN 'medium' THEN 3 WHEN 'low' THEN 4 END,
CASE t.importance WHEN 'critical' THEN 1 WHEN 'high' THEN 2 WHEN 'medium' THEN 3 WHEN 'low' THEN 4 END,
t.created_at DESC"""
cursor = conn.execute(query, params)
todos = cursor.fetchall()
conn.close()
if not todos:
print("暂无符合条件的待办")
return
# Icons
imp_icon = {1: '', 2: '🟠', 3: '🔴'}
urg_icon = {1: '', 2: '🔥', 3: '🔥🔥'}
status_icon = {'pending': '⬜', 'completed': '✅'}
level_text = {1: '', 2: '', 3: ''}
# 优先级分数映射
level_score = {3: 1, 2: 2, 1: 3}
# Build tree
todos_by_parent = {}
root_todos = []
for t in todos:
t_dict = dict(t)
pid = t_dict['parent_id']
if pid is None:
root_todos.append(t_dict)
else:
if pid not in todos_by_parent:
todos_by_parent[pid] = []
todos_by_parent[pid].append(t_dict)
# 计算待办的最大优先级(包括子待办)
def get_max_priority(todo_dict):
"""获取待办及其子待办的最大优先级分数"""
imp_score = level_score.get(int(todo_dict['importance']), 3)
urg_score = level_score.get(int(todo_dict['urgency']), 3)
min_score = min(imp_score, urg_score) # 取更高的优先级
# 检查子待办
if todo_dict['id'] in todos_by_parent:
for child in todos_by_parent[todo_dict['id']]:
child_score = get_max_priority(child)
min_score = min(min_score, child_score)
return min_score
# 排序父待办:考虑子待办的最大优先级
def sort_key(todo_dict):
max_priority = get_max_priority(todo_dict)
own_urg = level_score.get(int(todo_dict['urgency']), 3)
own_imp = level_score.get(int(todo_dict['importance']), 3)
return (max_priority, own_urg, own_imp, todo_dict.get('created_at', ''))
root_todos.sort(key=sort_key)
# 排序子待办
for pid in todos_by_parent:
todos_by_parent[pid].sort(key=lambda t: (
level_score.get(int(t['urgency']), 3),
level_score.get(int(t['importance']), 3),
t.get('created_at', '')
))
def print_todo(t, indent=0):
prefix = " " * indent
icon = status_icon.get(t['status'], '❓')
imp = imp_icon.get(int(t['importance']), '')
urg = urg_icon.get(int(t['urgency']), '')
imp_txt = level_text.get(int(t['importance']), '')
urg_txt = level_text.get(int(t['urgency']), '')
line = f"{prefix}{icon} [#{t['id']}] {t['title']}"
if t['project'] != project_name:
line += f" ({t['project']})"
# 显示重要程度:high显示emoji颜色,medium/low显示文字
if int(t['importance']) == 3:
imp_display = imp
elif int(t['importance']) == 2:
imp_display = imp
else:
imp_display = ""
# 显示紧急程度:high🔥🔥,medium/low不显示
urg_display = urg if int(t['urgency']) in [2, 3] else ''
line += f" {imp_display}{urg_display}"
if t['due_date']:
due = t['due_date'][:10]
line += f" 📅{due}"
print(line)
if t['note'] and indent == 0:
print(f"{prefix} 💬 {t['note']}")
# Print children
if t['id'] in todos_by_parent:
for child in todos_by_parent[t['id']]:
print_todo(child, indent + 1)
title = "📋 待办列表"
if project_name:
title += f" [{project_name}]"
print(title)
print("-" * 50)
for t in root_todos:
print_todo(t)
def show_todo(todo_id: int):
"""Show todo details."""
conn = get_connection()
cursor = conn.execute("""
SELECT t.*, p.name as project
FROM todos t
JOIN projects p ON t.project_id = p.id
WHERE t.id = ?
""", (todo_id,))
todo = cursor.fetchone()
if not todo:
print(f"❌ 待办 #{todo_id} 不存在")
conn.close()
return
# Get children
cursor = conn.execute("SELECT * FROM todos WHERE parent_id = ?", (todo_id,))
children = cursor.fetchall()
# Get parent
parent = None
if todo['parent_id']:
cursor = conn.execute("SELECT id, title FROM todos WHERE id = ?", (todo['parent_id'],))
parent = cursor.fetchone()
conn.close()
# Icons
imp_icon = {1: '', 2: '🟠', 3: '🔴'}
urg_icon = {1: '', 2: '🔥', 3: '🔥🔥'}
status_icon = {'pending': '⬜ 未完成', 'completed': '✅ 已完成'}
print(f"📋 待办详情 [#{todo['id']}]")
print("-" * 50)
print(f"标题: {todo['title']}")
print(f"项目: {todo['project']}")
print(f"状态: {status_icon.get(todo['status'], todo['status'])}")
print(f"重要: {imp_icon.get(todo['importance'], '')} {todo['importance']}")
print(f"紧急: {urg_icon.get(todo['urgency'], '')} {todo['urgency']}")
if todo['due_date']:
print(f"截止: {todo['due_date']}")
if todo['note']:
print(f"备注: {todo['note']}")
print(f"创建: {todo['created_at']}")
if todo['completed_at']:
print(f"完成: {todo['completed_at']}")
if parent:
print(f"父待办: #{parent['id']} {parent['title']}")
if children:
print(f"\n子待办 ({len(children)}):")
for c in children:
icon = '✅' if c['status'] == 'completed' else '⬜'
print(f" {icon} [#{c['id']}] {c['title']}")
def edit_todo(todo_id: int, title: Optional[str] = None, importance: Optional[str] = None,
urgency: Optional[str] = None, due_date: Optional[str] = None,
note: Optional[str] = None, project_name: Optional[str] = None):
"""Edit a todo."""
conn = get_connection()
cursor = conn.execute("SELECT id FROM todos WHERE id = ?", (todo_id,))
if not cursor.fetchone():
print(f"❌ 待办 #{todo_id} 不存在")
conn.close()
return
updates = []
params = []
if title:
updates.append("title = ?")
params.append(title)
if importance:
valid_levels = [1, 2, 3]
if importance not in valid_levels:
print(f"❌ 重要程度必须是: {', '.join(valid_levels)}")
conn.close()
return
updates.append("importance = ?")
params.append(importance)
if urgency:
valid_levels = [1, 2, 3]
if urgency not in valid_levels:
print(f"❌ 紧急程度必须是: {', '.join(valid_levels)}")
conn.close()
return
updates.append("urgency = ?")
params.append(urgency)
if due_date:
try:
if ' ' in due_date:
due_dt = datetime.strptime(due_date, "%Y-%m-%d %H:%M")
else:
due_dt = datetime.strptime(due_date, "%Y-%m-%d")
updates.append("due_date = ?")
params.append(due_dt)
except ValueError:
print(f"❌ 日期格式错误,应为 YYYY-MM-DD 或 YYYY-MM-DD HH:MM")
conn.close()
return
if note is not None:
updates.append("note = ?")
params.append(note)
if project_name:
cursor = conn.execute("SELECT id FROM projects WHERE name = ?", (project_name,))
project = cursor.fetchone()
if not project:
print(f"❌ 项目 '{project_name}' 不存在")
conn.close()
return
updates.append("project_id = ?")
params.append(project['id'])
if not updates:
print("⚠️ 没有要更新的内容")
conn.close()
return
updates.append("updated_at = CURRENT_TIMESTAMP")
params.append(todo_id)
query = f"UPDATE todos SET {', '.join(updates)} WHERE id = ?"
conn.execute(query, params)
conn.commit()
conn.close()
print(f"✅ 待办 #{todo_id} 已更新")
def show_stats(project_name: Optional[str] = None):
"""Show todo statistics."""
conn = get_connection()
if project_name:
cursor = conn.execute("SELECT id FROM projects WHERE name = ?", (project_name,))
project = cursor.fetchone()
if not project:
print(f"❌ 项目 '{project_name}' 不存在")
conn.close()
return
cursor = conn.execute("""
SELECT
COUNT(*) as total,
COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed,
COUNT(CASE WHEN status = 'pending' AND due_date < DATE('now') THEN 1 END) as overdue
FROM todos WHERE project_id = ?
""", (project['id'],))
else:
cursor = conn.execute("""
SELECT
COUNT(*) as total,
COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed,
COUNT(CASE WHEN status = 'pending' AND due_date < DATE('now') THEN 1 END) as overdue
FROM todos
""")
stats = cursor.fetchone()
# By importance
cursor = conn.execute("""
SELECT importance, COUNT(*) as cnt
FROM todos
WHERE status = 'pending'
GROUP BY importance
ORDER BY
CASE importance
WHEN 3 THEN 1
WHEN 2 THEN 2
WHEN 1 THEN 3
END
""")
by_importance = cursor.fetchall()
# By urgency
cursor = conn.execute("""
SELECT urgency, COUNT(*) as cnt
FROM todos
WHERE status = 'pending'
GROUP BY urgency
ORDER BY
CASE urgency
WHEN 3 THEN 1
WHEN 2 THEN 2
WHEN 1 THEN 3
END
""")
by_urgency = cursor.fetchall()
conn.close()
title = "📊 待办统计"
if project_name:
title += f" [{project_name}]"
print(title)
print("=" * 50)
print(f"总计: {stats['total']} | 未完成: {stats['pending']} | 已完成: {stats['completed']} | 逾期: {stats['overdue']}")
print()
imp_icon = {1: '', 2: '🟠', 3: '🔴'}
urg_icon = {1: '', 2: '🔥', 3: '🔥🔥'}
print("按重要程度(未完成):")
for row in by_importance:
icon = imp_icon.get(row['importance'], '')
print(f" {icon} {row['importance']}: {row['cnt']}")
print()
print("按紧急程度(未完成):")
for row in by_urgency:
icon = urg_icon.get(row['urgency'], '')
print(f" {icon} {row['urgency']}: {row['cnt']}")
def search_todos(keyword: str, status: Optional[str] = None):
"""Search todos by keyword in title or note."""
conn = get_connection()
query = """
SELECT t.id, t.title, t.note, t.importance, t.urgency, t.status,
t.due_date, t.created_at, t.parent_id, p.name as project
FROM todos t
JOIN projects p ON t.project_id = p.id
WHERE (t.title LIKE ? OR t.note LIKE ?)
"""
search_pattern = f"%{keyword}%"
params = [search_pattern, search_pattern]
if status:
query += " AND t.status = ?"
params.append(status)
# 排序:紧急 > 重要 > 普通按创建时间
query += """ ORDER BY
CASE t.urgency WHEN 'critical' THEN 1 WHEN 'high' THEN 2 WHEN 'medium' THEN 3 WHEN 'low' THEN 4 END,
CASE t.importance WHEN 'critical' THEN 1 WHEN 'high' THEN 2 WHEN 'medium' THEN 3 WHEN 'low' THEN 4 END,
t.created_at DESC"""
cursor = conn.execute(query, params)
todos = cursor.fetchall()
conn.close()
if not todos:
print(f"未找到包含 '{keyword}' 的待办")
return
# Icons
imp_icon = {1: '', 2: '🟠', 3: '🔴'}
urg_icon = {1: '', 2: '🔥', 3: '🔥🔥'}
status_icon = {'pending': '⬜', 'completed': '✅'}
level_text = {1: '', 2: '', 3: ''}
print(f"🔍 搜索结果:'{keyword}' ({len(todos)} 条)")
print("-" * 50)
for t in todos:
icon = status_icon.get(t['status'], '❓')
imp = imp_icon.get(int(t['importance']), '')
urg = urg_icon.get(int(t['urgency']), '')
imp_txt = level_text.get(int(t['importance']), '')
urg_txt = level_text.get(int(t['urgency']), '')
line = f"{icon} [#{t['id']}] {t['title']} ({t['project']})"
# 显示重要程度:high显示emoji颜色,medium/low显示文字
if int(t['importance']) == 3:
imp_display = imp
elif int(t['importance']) == 2:
imp_display = imp
else:
imp_display = ""
# 显示紧急程度:high🔥🔥,medium/low不显示
urg_display = urg if int(t['urgency']) in [2, 3] else ''
line += f" {imp_display}{urg_display}"
print(line)
# 高亮匹配的备注
if t['note'] and keyword.lower() in t['note'].lower():
print(f" 💬 {t['note'][:50]}...")
def main():
parser = argparse.ArgumentParser(description='Todo management tool')
subparsers = parser.add_subparsers(dest='command', help='Commands')
# create-project
cp_parser = subparsers.add_parser('create-project', help='创建项目')
cp_parser.add_argument('name', help='项目名称')
# list-projects
subparsers.add_parser('list-projects', help='列出所有项目')
# delete-project
dp_parser = subparsers.add_parser('delete-project', help='删除项目')
dp_parser.add_argument('name', help='项目名称')
# add
add_parser = subparsers.add_parser('add', help='添加待办')
add_parser.add_argument('project', help='项目名称')
add_parser.add_argument('title', help='待办标题')
add_parser.add_argument('-i', '--importance', type=int, choices=[1, 2, 3],
default=1, help='重要程度 (1=普通, 2=重要, 3=紧急)')
add_parser.add_argument('-u', '--urgency', type=int, choices=[1, 2, 3],
default=1, help='紧急程度 (1=普通, 2=重要, 3=紧急)')
add_parser.add_argument('-p', '--parent', type=int, help='父待办ID')
add_parser.add_argument('-d', '--due', help='截止日期 (YYYY-MM-DD)')
add_parser.add_argument('-n', '--note', help='备注')
# done
done_parser = subparsers.add_parser('done', help='完成待办')
done_parser.add_argument('id', type=int, help='待办ID')
# undo
undo_parser = subparsers.add_parser('undo', help='重新打开待办')
undo_parser.add_argument('id', type=int, help='待办ID')
# delete
del_parser = subparsers.add_parser('delete', help='删除待办')
del_parser.add_argument('id', type=int, help='待办ID')
# list / list-all
list_parser = subparsers.add_parser('list', help='列出项目待办')
list_parser.add_argument('project', help='项目名称')
list_parser.add_argument('-p', '--pending', action='store_true', help='只显示未完成')
list_parser.add_argument('-c', '--completed', action='store_true', help='只显示已完成')
listall_parser = subparsers.add_parser('list-all', help='列出所有待办')
listall_parser.add_argument('-p', '--pending', action='store_true', help='只显示未完成')
listall_parser.add_argument('-c', '--completed', action='store_true', help='只显示已完成')
listall_parser.add_argument('--importance', type=int, choices=[1, 2, 3])
listall_parser.add_argument('--urgency', type=int, choices=[1, 2, 3])
listall_parser.add_argument('--from', dest='date_from', help='开始日期 (YYYY-MM-DD)')
listall_parser.add_argument('--to', dest='date_to', help='结束日期 (YYYY-MM-DD)')
# show
show_parser = subparsers.add_parser('show', help='查看待办详情')
show_parser.add_argument('id', type=int, help='待办ID')
# edit
edit_parser = subparsers.add_parser('edit', help='编辑待办')
edit_parser.add_argument('id', type=int, help='待办ID')
edit_parser.add_argument('--title', help='新标题')
edit_parser.add_argument('--importance', type=int, choices=[1, 2, 3])
edit_parser.add_argument('--urgency', type=int, choices=[1, 2, 3])
edit_parser.add_argument('--due', help='截止日期')
edit_parser.add_argument('--note', help='备注')
edit_parser.add_argument('--project', help='移动到项目')
# stats
stats_parser = subparsers.add_parser('stats', help='统计信息')
stats_parser.add_argument('project', nargs='?', help='项目名称')
# search
search_parser = subparsers.add_parser('search', help='搜索待办')
search_parser.add_argument('keyword', help='搜索关键词')
search_parser.add_argument('-p', '--pending', action='store_true', help='只搜索未完成')
search_parser.add_argument('-c', '--completed', action='store_true', help='只搜索已完成')
args = parser.parse_args()
if not args.command:
parser.print_help()
return
if args.command == 'create-project':
create_project(args.name)
elif args.command == 'list-projects':
list_projects()
elif args.command == 'delete-project':
delete_project(args.name)
elif args.command == 'add':
add_todo(args.project, args.title, args.importance, args.urgency,
args.parent, args.due, args.note)
elif args.command == 'done':
done_todo(args.id)
elif args.command == 'undo':
undo_todo(args.id)
elif args.command == 'delete':
delete_todo(args.id)
elif args.command == 'list':
status = 'pending' if args.pending else ('completed' if args.completed else None)
list_todos(args.project, status)
elif args.command == 'list-all':
status = 'pending' if args.pending else ('completed' if args.completed else None)
list_todos(None, status, args.importance, args.urgency, args.date_from, args.date_to)
elif args.command == 'show':
show_todo(args.id)
elif args.command == 'edit':
edit_todo(args.id, args.title, args.importance, args.urgency, args.due, args.note, args.project)
elif args.command == 'stats':
show_stats(args.project)
elif args.command == 'search':
status = 'pending' if args.pending else ('completed' if args.completed else None)
search_todos(args.keyword, status)
if __name__ == '__main__':
main()Left 4 Dead 2 服务器管理助手。支持:(1) 记录和管理多台 L4D2 服务器(别名、IP、端口)(2) 通过 A2S 协议查询服务器状态(玩家数、地图、名称等)(3) 通过 RCON 执行服务器命令。触发词:L4D2、求生之路、服务器状态、rcon、a2s 查询。
---
name: l4d2-server
description: Left 4 Dead 2 服务器管理助手。支持:(1) 记录和管理多台 L4D2 服务器(别名、IP、端口)(2) 通过 A2S 协议查询服务器状态(玩家数、地图、名称等)(3) 通过 RCON 执行服务器命令。触发词:L4D2、求生之路、服务器状态、rcon、a2s 查询。
---
# L4D2 服务器管理助手
管理 Left 4 Dead 2 游戏服务器,支持状态查询和远程指令执行。
## 配置文件
服务器配置存储在:`~/.openclaw/workspace/config/l4d2-servers.json`
### 配置结构
```json
{
"servers": {
"alias": {
"host": "192.168.1.100",
"port": 27015,
"rcon_password": "your_rcon_password"
}
}
}
```
## 功能
### 1. 服务器管理
添加/修改服务器:
```
添加服务器 别名=myserver IP=192.168.1.100 端口=27015 RCON密码=xxx
```
列出已配置的服务器:
```
列出所有 L4D2 服务器
```
### 2. 状态查询
查询服务器状态:
```
查询 myserver 状态
查询 192.168.1.100:27015 状态
```
**查询优先级:**
1. 如果服务器配置了 `rcon_password` → 使用 RCON `status` 命令(信息更详细,含玩家 IP、延迟、丢包等)
2. 如果没有 RCON 密码 → 使用 A2S 协议查询(基础信息:名称、地图、玩家数)
**RCON status 输出字段:**
- hostname: 服务器名称
- map: 当前地图
- players: 玩家数/最大玩家数
- 玩家列表: userid, name, steamid, connected, ping, loss, state, rate, adr
### 3. RCON 命令执行
执行服务器命令:
```
在 myserver 上执行 status
在 myserver 上执行 changelevel c5m1_waterfront
在 myserver 上执行 sm_kick playername
```
常用 RCON 命令:
- `status` - 查看服务器状态和玩家列表
- `hostname` - 查看服务器名称
- `changelevel <map>` - 切换地图
- `sm_kick <name>` - 踢出玩家(需要 SourceMod)
- `sm_ban <name> <duration>` - 封禁玩家
- `sv_cheats 1/0` - 开关作弊模式
- `nb_delete_all` - 清除所有感染
- `z_difficulty` - 查看当前难度
- `mp_gamemode` - 查看当前游戏模式
常用地图代码:
- 战役: c1m1_hotel, c2m1_highway, c3m1_plankcountry, c4m1_milltown_a, c5m1_waterfront
- 生存: l4d2_stadium_city, l4d2_riverbed_dam
- 对抗: c1m4_atrium (牺牲)
## 脚本
### A2S 查询
```bash
python3 scripts/a2s_query.py <host> [port] [--json]
```
默认端口 27015,`--json` 输出 JSON 格式。
### RCON 命令
```bash
python3 scripts/rcon_cmd.py <host> <port> <password> <command>
```
## 配置文件操作
读取配置:
```bash
cat ~/.openclaw/workspace/config/l4d2-servers.json
```
添加服务器到配置:
```bash
# 使用 jq 操作
jq '.servers.myserver = {"host": "192.168.1.100", "port": 27015, "rcon_password": "xxx"}' \
~/.openclaw/workspace/config/l4d2-servers.json > /tmp/l4d2.json && \
mv /tmp/l4d2.json ~/.openclaw/workspace/config/l4d2-servers.json
```
## 注意事项
1. RCON 密码敏感,配置文件应设置适当权限
2. A2S 查询不需要密码,RCON 操作需要密码
3. 部分命令需要服务器安装 SourceMod 插件
FILE:references/server-config.example.json
{
"servers": {
"example-server": {
"host": "192.168.1.100",
"port": 27015,
"rcon_password": "your_rcon_password_here",
"description": "示例服务器,请修改或删除"
}
}
}
FILE:scripts/a2s_query.py
#!/usr/bin/env python3
"""
A2S (Valve Server Query) 查询脚本
查询 Source 引擎游戏服务器状态(L4D2、CS2 等)
用法:
python3 a2s_query.py <host> [port] [--json]
默认端口 27015
"""
import socket
import struct
import sys
import json
from datetime import datetime
def send_a2s_packet(sock, host, port, payload, challenge=None):
"""发送 A2S 数据包并接收响应"""
if challenge:
# 挑战码追加到数据包末尾
payload = payload + challenge
sock.settimeout(10.0)
sock.sendto(payload, (host, port))
response, _ = sock.recvfrom(4096)
return response
def parse_a2s_info_response(data):
"""解析 A2S_INFO 响应"""
if len(data) < 5:
return None
# 跳过头部 (4 bytes header + 1 byte type)
offset = 5
result = {}
# Protocol version (1 byte)
result['protocol'] = data[offset]
offset += 1
# Server name (null-terminated string)
name_end = data.index(b'\x00', offset)
result['name'] = data[offset:name_end].decode('utf-8', errors='replace')
offset = name_end + 1
# Map name (null-terminated string)
map_end = data.index(b'\x00', offset)
result['map'] = data[offset:map_end].decode('utf-8', errors='replace')
offset = map_end + 1
# Folder/game directory (null-terminated string)
folder_end = data.index(b'\x00', offset)
result['folder'] = data[offset:folder_end].decode('utf-8', errors='replace')
offset = folder_end + 1
# Game name (null-terminated string)
game_end = data.index(b'\x00', offset)
result['game'] = data[offset:game_end].decode('utf-8', errors='replace')
offset = game_end + 1
# App ID (2 bytes, little-endian)
result['app_id'] = struct.unpack('<H', data[offset:offset+2])[0]
offset += 2
# Player count (1 byte)
result['players'] = data[offset]
offset += 1
# Max players (1 byte)
result['max_players'] = data[offset]
offset += 1
# Bot count (1 byte)
result['bots'] = data[offset]
offset += 1
# Server type (1 byte: 'd' = dedicated, 'l' = listen, 'p' = SourceTV)
result['server_type'] = chr(data[offset])
offset += 1
# Environment (1 byte: 'w' = Windows, 'l' = Linux)
result['environment'] = chr(data[offset])
offset += 1
# Visibility (1 byte: 0 = public, 1 = private)
result['visibility'] = data[offset]
offset += 1
# VAC secured (1 byte: 0 = unsecured, 1 = secured)
result['vac'] = data[offset]
offset += 1
# Game version (null-terminated string)
if offset < len(data):
version_end = data.index(b'\x00', offset) if b'\x00' in data[offset:] else len(data)
result['version'] = data[offset:version_end].decode('utf-8', errors='replace')
return result
def query_server(host, port=27015):
"""查询服务器状态"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# A2S_INFO 请求包
# Header: 0xFFFFFFFF (long) + 'T' (byte) + Source Engine Query (string)
A2S_INFO = b'\xff\xff\xff\xffTSource Engine Query\x00'
# 先发送请求,可能需要挑战码
response = send_a2s_packet(sock, host, port, A2S_INFO)
# 检查是否需要挑战码
# 响应以 0x9A 开头表示需要挑战码
if response[4] == 0x41: # A2S_INFO_CHALLENGE response
challenge = response[5:9] # 提取挑战码
response = send_a2s_packet(sock, host, port, A2S_INFO, challenge)
# 解析响应
if response[4] == 0x49: # 'I' = A2S_INFO response
info = parse_a2s_info_response(response)
if info:
info['host'] = host
info['port'] = port
info['query_time'] = datetime.now().isoformat()
return info
return {'error': 'Failed to parse server response', 'raw': response.hex()}
except socket.timeout:
return {'error': 'Connection timed out', 'host': host, 'port': port}
except socket.error as e:
return {'error': f'Socket error: {str(e)}', 'host': host, 'port': port}
except Exception as e:
return {'error': f'Query failed: {str(e)}', 'host': host, 'port': port}
finally:
sock.close()
def format_output(info, json_output=False):
"""格式化输出"""
if json_output:
return json.dumps(info, indent=2, ensure_ascii=False)
if 'error' in info:
return f"❌ 查询失败: {info['error']}"
lines = [
f"🖥️ 服务器: {info.get('name', 'Unknown')}",
f"📍 地址: {info['host']}:{info['port']}",
f"🗺️ 地图: {info.get('map', 'Unknown')}",
f"👥 玩家: {info.get('players', 0)}/{info.get('max_players', 0)} ({info.get('bots', 0)} bots)",
f"🎮 游戏: {info.get('game', 'Unknown')} (AppID: {info.get('app_id', 0)})",
f"📦 版本: {info.get('version', 'Unknown')}",
f"🔒 VAC: {'已启用' if info.get('vac') else '未启用'}",
f"🌐 类型: {'Linux' if info.get('environment') == 'l' else 'Windows'} {info.get('server_type', '')}",
]
return '\n'.join(lines)
def main():
if len(sys.argv) < 2 or sys.argv[1] in ['-h', '--help']:
print("用法: python3 a2s_query.py <host> [port] [--json]")
print("示例: python3 a2s_query.py 192.168.1.100 27015")
print(" python3 a2s_query.py 192.168.1.100 --json")
sys.exit(0)
host = sys.argv[1]
port = 27015
json_output = '--json' in sys.argv
# 解析端口
if ':' in host:
host, port_str = host.split(':')
port = int(port_str)
elif len(sys.argv) > 2 and sys.argv[2].isdigit():
port = int(sys.argv[2])
info = query_server(host, port)
print(format_output(info, json_output))
if __name__ == '__main__':
main()
FILE:scripts/rcon_cmd.py
#!/usr/bin/env python3
"""
Valve RCON (Remote Console) 客户端
通过 RCON 协议执行游戏服务器命令
用法:
python3 rcon_cmd.py <host> <port> <password> <command>
python3 rcon_cmd.py 192.168.1.100 27015 mypassword "status"
注意: L4D2 的 RCON 端口通常与游戏端口相同 (默认 27015)
"""
import socket
import struct
import sys
# RCON 数据包类型
SERVERDATA_AUTH = 3
SERVERDATA_AUTH_RESPONSE = 2
SERVERDATA_EXECCOMMAND = 2
SERVERDATA_RESPONSE_VALUE = 0
def create_packet(packet_id, packet_type, body):
"""创建 RCON 数据包
数据包结构:
- 4 bytes: 包大小 (不包含这4字节)
- 4 bytes: 包ID
- 4 bytes: 包类型
- n bytes: 包体 (body + null + null)
"""
body_bytes = body.encode('ascii') + b'\x00\x00'
payload = struct.pack('<II', packet_id, packet_type) + body_bytes
return struct.pack('<I', len(payload)) + payload
def read_packet(sock, timeout=30.0):
"""读取 RCON 响应数据包"""
sock.settimeout(timeout)
# 读取大小 (4 bytes)
size_data = b''
while len(size_data) < 4:
chunk = sock.recv(4 - len(size_data))
if not chunk:
return None, None, None
size_data += chunk
size = struct.unpack('<I', size_data)[0]
# 读取剩余数据
data = b''
while len(data) < size:
chunk = sock.recv(size - len(data))
if not chunk:
break
data += chunk
if len(data) < 8:
return None, None, None
packet_id = struct.unpack('<I', data[:4])[0]
packet_type = struct.unpack('<I', data[4:8])[0]
body = data[8:].rstrip(b'\x00').decode('utf-8', errors='replace')
return packet_id, packet_type, body
def send_command(host, port, password, command, timeout=30.0):
"""通过 RCON 发送命令并获取响应"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
# 连接服务器
sock.connect((host, port))
# 发送认证包
auth_packet = create_packet(1, SERVERDATA_AUTH, password)
sock.sendall(auth_packet)
# 读取认证响应(可能有多个包,包括空响应)
auth_success = False
for _ in range(10):
packet_id, packet_type, body = read_packet(sock, timeout)
if packet_type == SERVERDATA_AUTH_RESPONSE:
if packet_id == -1:
return {'success': False, 'error': '认证失败(密码错误)'}
auth_success = True
break
if not auth_success:
return {'success': False, 'error': '认证响应异常'}
# 发送命令
cmd_packet = create_packet(2, SERVERDATA_EXECCOMMAND, command)
sock.sendall(cmd_packet)
# 发送空命令标记结束(获取完整响应的技巧)
empty_packet = create_packet(3, SERVERDATA_EXECCOMMAND, "")
sock.sendall(empty_packet)
# 读取响应
full_response = ""
for _ in range(50):
packet_id, packet_type, body = read_packet(sock, timeout)
if packet_id is None:
break
if packet_id == 3: # 空包响应,表示命令输出结束
break
if packet_id == 2:
full_response += body
return {
'success': True,
'command': command,
'response': full_response.strip(),
'host': host,
'port': port
}
except socket.timeout:
return {'success': False, 'error': '连接超时'}
except socket.error as e:
return {'success': False, 'error': f'网络错误: {str(e)}'}
except Exception as e:
return {'success': False, 'error': f'执行失败: {str(e)}'}
finally:
sock.close()
def main():
if len(sys.argv) < 5:
print("用法: python3 rcon_cmd.py <host> <port> <password> <command>")
print("示例: python3 rcon_cmd.py 192.168.1.100 27015 mypassword status")
print(" python3 rcon_cmd.py 192.168.1.100 27015 mypassword \"changelevel c5m1_waterfront\"")
sys.exit(1)
host = sys.argv[1]
port = int(sys.argv[2])
password = sys.argv[3]
command = ' '.join(sys.argv[4:])
result = send_command(host, port, password, command)
if result['success']:
print(f"✅ 命令执行成功: {result['command']}")
print(f"📝 响应:\n{result['response']}")
else:
print(f"❌ 执行失败: {result['error']}")
if __name__ == '__main__':
main()