@clawhub-zhangyu68-f076ebc371
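Full-scenario Todo manager for Feishu, deeply integrated with the Feishu ecosystem. Must trigger when the user sends a to-do item, mentions the '/todo' command, asks for the to-do list, asks to save a to-do, sends a task arrangement that includes a time, asks about calendar events, or asks to sync to the calendar. Accepts Todos from multiple message channels (Feishu/WeChat/SMS, etc.) and syncs them to the Feishu calendar automatically.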
---
name: feishu-omni-todo
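description: Full-scenario Todo manager for Feishu, deeply integrated with the Feishu ecosystem. Must trigger when the user sends a to-do item, mentions the '/todo' command, asks for the to-do list, asks to save a to-do, sends a task arrangement that includes a time, asks about calendar events, or asks to sync to the calendar. Accepts Todos from multiple message channels (Feishu/WeChat/SMS, etc.) and syncs them to the Feishu calendar automatically.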
---
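# Feishu Omni-Todo (full-scenario Todo manager for Feishu)
## Core Features
Recognizes and manages to-do items from Feishu messages, with automatic saving, list queries, status management, and reminders.
## Trigger Rules
When a Feishu message arrives, check whether it matches any of the following:
1. The message describes a to-do item (e.g. "明天要做xxx" (do xxx tomorrow), "需要完成xxx" (need to finish xxx))
2. The message contains a WeChat Official Account link plus a handling request (e.g. "这篇文章下周看" (read this article next week))
3. The message starts with `/todo` (command system)
4. The user asks "我有哪些待办" (what are my to-dos), "最近要做什么" (what do I need to do soon), or similar
If any condition matches, this skill must handle the message; do not reply directly.
## Data Storage
Todo data is stored in `~/.openclaw/workspace/todo.json` in the following format: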
```json
{
  "todos": [
    {
      "id": 1,
      "content": "待办内容描述",
      "created_at": "2026-04-27T10:00:00+08:00",
      "due_time": "2026-04-28T15:00:00+08:00",
      "status": "pending",
      "priority": "medium",
      "source": "飞书消息ID: om_xxx",
      "tags": [],
      "links": [
        {
          "url": "https://example.com/doc",
          "title": "参考文档",
          "type": "feishu_wiki"
        }
      ]
    }
  ]
}
```
### Field Reference
- `priority`: priority level; one of `high`/`medium`/`low`
- `links`: array of related links (optional)
  - `url`: link address
  - `title`: link title or description
  - `type`: link type; one of `feishu_wiki` (Feishu document), `url` (ordinary link), `image`, `file`
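## Processing Flow
### 1. Automatic to-do recognition (non-command messages)
When an ordinary Feishu message arrives (one that does not start with `/todo`):
1. Analyze whether the message contains a to-do item
2. Extract the to-do content and any time information
3. Add it to the Todo store
4. Reply to the user: "✅ 已保存待办:[内容] [(截止时间:xxx)]"
5. If the message contains several to-dos, recognize and save each one
### 2. `/todo` command handling
#### `/todo` (no arguments)
- Lists all unfinished to-dos, sorted by due time and then by priority
- Output format: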
```
📋 你的待办列表:
1. [ ] 🔴 高优先级待办
   📅 截止时间:2026-04-28 15:00 ⚠️ 即将到期
   🔗 相关链接:
   - [参考文档](https://my.feishu.cn/wiki/xxx)
   🏷️ 标签:工作、重要
2. [ ] 🟡 中优先级待办
   📅 创建于:2026-04-27
3. [ ] 🟢 低优先级待办
   🏷️ 标签:阅读
```
- Priority markers: 🔴 high / 🟡 medium / 🟢 low
- Tag display: 🏷️ tag name
- Link display: 🔗 link title (clickable)
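The ordering rule for `/todo` can be expressed as a sort key. This is a minimal sketch, assuming the rule means due time first with priority as the tie-breaker and undated items last; `PRIORITY_RANK` and `todo_sort_key` are hypothetical names that do not appear in the skill's scripts.

```python
from datetime import datetime

# Hypothetical ranking: a lower value sorts first
PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def todo_sort_key(todo: dict) -> tuple:
    """Sort by due time (undated items last), then by priority."""
    due = todo.get("due_time")
    if due:
        # Drop any UTC offset so all values compare; assumes a single time zone
        due_dt = datetime.fromisoformat(due).replace(tzinfo=None)
    else:
        due_dt = datetime.max
    return (due_dt, PRIORITY_RANK.get(todo.get("priority", "medium"), 1))


todos = [
    {"content": "read article", "priority": "low"},
    {"content": "review", "due_time": "2026-04-28T15:00:00+08:00", "priority": "high"},
    {"content": "report", "due_time": "2026-04-28T15:00:00+08:00", "priority": "medium"},
]
ordered = sorted(todos, key=todo_sort_key)
```

Here the two dated items come first, with the high-priority one ahead, and the undated item lands at the end.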
#### `/todo all`
- Lists all to-dos, including completed ones
- Completed items are shown as:
```
[x] 已完成的待办内容 ✅ 已完成
📅 完成时间:2026-04-27 15:00
```
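#### `/todo done [number] [number...]`
- Marks the to-dos with the given display numbers as completed; supports batch operation
- Examples: `/todo done 1` or `/todo done 1 2 3`
- Reply: "✅ 已标记为完成:[内容]"
#### `/todo del [number] [number...]`
- Deletes the to-dos with the given display numbers; supports batch operation and ranges
- Examples: `/todo del 1` or `/todo del 1-3`
- Reply: "🗑️ 已删除待办:[内容]"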
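Batch arguments and ranges such as `1 2 3` or `1-3` could be expanded as follows. This is a rough sketch; `parse_indices` is a hypothetical helper, not a function in `todo_utils.py`.

```python
def parse_indices(args: list[str]) -> list[int]:
    """Expand arguments such as ["1", "3-5"] into sorted, unique display numbers."""
    numbers = set()
    for arg in args:
        if "-" in arg:
            start, end = (int(part) for part in arg.split("-", 1))
            if start > end:
                start, end = end, start  # tolerate reversed ranges like "5-3"
            numbers.update(range(start, end + 1))
        else:
            numbers.add(int(arg))
    return sorted(numbers)
```

Malformed input raises `ValueError`, which a caller can turn into the usage hint. Because display numbers shift once an item is deleted, it is safer to map every number to its stored `id` first and only then delete.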
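#### `/todo clear`
- Removes all completed to-dos
- Reply: "🗑️ 已清空所有已完成的待办"
#### `/todo remind [number] [time]`
- Sets a reminder time for the given to-do; natural-language times are supported
- Examples: `/todo remind 1 明天下午3点`, `/todo remind 2 周三前`, `/todo remind 3 今晚8点`
- Reply: "⏰ 已为待办设置提醒:[内容] 提醒时间:[解析后的时间]"
- A Feishu reminder is sent automatically 30 minutes and 5 minutes before the due time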
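#### `/todo priority [number] [高/中/低]`
- Sets the priority of a to-do (高 = high, 中 = medium, 低 = low)
- Example: `/todo priority 1 高`
- Reply: "✅ 已设置优先级:[内容] -> [优先级]"
#### `/todo tag [number] [tag]`
- Adds a tag to a to-do
- Examples: `/todo tag 1 工作`, `/todo tag 2 阅读`
- Reply: "🏷️ 已添加标签:[内容] -> [标签]"
#### `/todo link [number] [url] [title]`
- Attaches a related link to a to-do
- Example: `/todo link 1 https://my.feishu.cn/wiki/xxx 参考文档`
- Reply: "🔗 已添加链接:[内容] -> [标题](链接)"
#### `/todo link list [number]`
- Lists all links attached to a to-do
- Example: `/todo link list 1`
#### `/todo link del [number] [link index]`
- Deletes the given link from a to-do
- Example: `/todo link del 1 1` (deletes the first link of to-do 1)
#### `/todo filter [tag]`
- Filters to-dos by tag
- Example: `/todo filter 工作`
- Shows only the to-dos that carry the given tag
#### `/todo stats`
- Shows to-do statistics
- Displays this week's completion rate, the to-do distribution, and per-category counts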
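#### `/todo sync [number]`
- Syncs the given to-do to the Feishu calendar
- Example: `/todo sync 1`
- Creates a calendar event automatically, with reminders at 30 minutes and 5 minutes
- Reply: "📅 已同步到飞书日历:[内容]"
#### `/todo calendar [days]`
- Shows a merged view of the Feishu calendar and to-dos for the next N days
- Example: `/todo calendar 7` (next 7 days)
- Displays meetings, to-dos, and free time slots
#### `/todo suggest`
- Recommends free time slots for scheduling to-dos
- Avoids existing meetings and prefers working hours
- Reply: "💡 推荐时间:明天下午14:00(该时间段无会议)"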
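## Recognition Rules
1. **Time recognition**: a wide range of natural-language time expressions is supported:
   - Relative times: "今天", "明天", "后天", "3天后", "两小时后"
   - Dates: "下周一", "5月1日", "2026-04-28", "周三前", "周五前", "月底"
   - Times of day: "下午3点", "晚上8点", "15:00", "今晚", "明晚"
   - Vague times: "下周", "下个月", "最近几天"
   - Values are converted to a standard time, with time zone correction (Asia/Shanghai)
2. **Link recognition**: URLs in the message are detected automatically, including:
   - WeChat Official Account links: tagged "阅读" and added to the `links` field
   - Feishu document/wiki links: tagged "文档" and added to the `links` field with `type` set to "feishu_wiki"
   - Other links: added to the `links` field with `type` set to "url"
   - Link titles: Feishu links use the fetched document title; other links use the page title or the default "相关链接"
3. **Multiple to-dos**: if one message contains several to-dos (separated by line breaks, semicolons, or numbered markers), split them into independent items.
4. **Priority recognition**: priority keywords are detected automatically:
   - High: "紧急", "重要", "马上要", "立刻" → 🔴
   - Medium: default → 🟡
   - Low: "不急", "有空再看", "慢慢做" → 🟢
5. **Calendar sync**:
   - To-dos with a time are synced to the Feishu calendar automatically (configurable switch)
   - Adding a to-do checks the Feishu calendar for time conflicts
   - On a conflict, prompt: "⚠️ 该时间段已有会议,是否要安排到其他时间?"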
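Rules 3 and 4 above can be sketched as two small helpers. This is only an illustration, assuming plain substring matching for keywords and line-start markers for numbering; `split_todos` and `detect_priority` are hypothetical names, and the skill itself may rely on the model rather than on fixed rules.

```python
import re

HIGH_KEYWORDS = ("紧急", "重要", "马上要", "立刻")
LOW_KEYWORDS = ("不急", "有空再看", "慢慢做")


def detect_priority(text: str) -> str:
    """Return high/low when a keyword is present, otherwise the default medium."""
    if any(word in text for word in HIGH_KEYWORDS):
        return "high"
    if any(word in text for word in LOW_KEYWORDS):
        return "low"
    return "medium"


def split_todos(message: str) -> list[str]:
    """Split on line breaks and semicolons, then drop leading list markers."""
    items = []
    for part in re.split(r"[\n;;]+", message):
        # Remove a marker such as "1." / "2、" / "3)" at the start of the item
        item = re.sub(r"^\s*\d+[.、))]\s*", "", part).strip()
        if item:
            items.append(item)
    return items
```

Substring matching is deliberately naive: a phrase like "不重要" would still match "重要", so a real implementation would need negation handling.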
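## Edge Cases
- If the user's message is not a to-do, take no action and reply normally
- If a command is malformed, reply with a friendly hint showing the correct usage: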
```
⚠️ 指令格式错误,支持的指令:
/todo - 查看未完成待办
/todo all - 查看所有待办
/todo done [序号] - 标记完成(支持批量)
/todo del [序号] - 删除待办(支持批量)
/todo clear - 清空已完成
/todo remind [序号] [时间] - 设置提醒
/todo priority [序号] [高/中/低] - 设置优先级
/todo tag [序号] [标签] - 添加标签
/todo filter [标签] - 按标签筛选
/todo stats - 查看统计
```
- If the given number does not exist, reply: "⚠️ 未找到序号为 [x] 的待办"
- If time parsing fails, reply: "⚠️ 无法识别时间格式,请使用更明确的时间表述"
FILE:_meta.json
{
"ownerId": "kn70s7x8s79zj70xycct4rkwhn83hbee",
"slug": "feishu-omni-todo",
"version": "1.0.0",
"publishedAt": 1777260077334
}
FILE:evals/evals.json
{
"skill_name": "feishu-todo-manager",
"evals": [
{
"id": 1,
"prompt": "明天下午3点参加项目评审会",
"expected_output": "自动识别为待办并保存,回复确认消息",
"files": []
},
{
"id": 2,
"prompt": "https://mp.weixin.qq.com/s/xxx 下周看完这篇文章",
"expected_output": "识别为带链接的待办,保存链接和内容",
"files": []
},
{
"id": 3,
"prompt": "/todo",
"expected_output": "列出所有未完成的待办,按时间排序",
"files": []
},
{
"id": 4,
"prompt": "/todo done 1",
"expected_output": "标记ID为1的待办为已完成",
"files": []
},
{
"id": 5,
"prompt": "今天要写周报,还要整理会议纪要,明天提交",
"expected_output": "识别为两个独立待办,分别保存",
"files": []
}
]
}
FILE:package.json
{"name": "feishu-omni-todo", "version": "1.0.1", "description": "飞书全场景Todo管理器,深度整合飞书生态,自动识别待办、同步日历、支持链接管理"}
FILE:scripts/feishu_calendar.py
#!/usr/bin/env python3
"""
Feishu calendar integration module
"""
import os
import json
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional

# Feishu API configuration
FEISHU_API_URL = "https://open.feishu.cn/open-apis"
APP_ID = os.getenv("FEISHU_APP_ID", "")
APP_SECRET = os.getenv("FEISHU_APP_SECRET", "")
USER_ID = "ou_fd95eeaa259733145362ac2207654aaf"


class FeishuCalendar:
    def __init__(self):
        self.access_token = self._get_access_token()

    def _get_access_token(self) -> str:
        """Fetch a Feishu API access token; returns an empty string on failure"""
        if not APP_ID or not APP_SECRET:
            return ""
        url = f"{FEISHU_API_URL}/auth/v3/tenant_access_token/internal/"
        data = {
            "app_id": APP_ID,
            "app_secret": APP_SECRET
        }
        response = requests.post(url, json=data)
        if response.status_code == 200:
            result = response.json()
            return result.get("tenant_access_token", "")
        return ""

    def create_event(self, title: str, start_time: datetime, end_time: datetime, description: str = "") -> Optional[str]:
        """Create a calendar event and return its event ID (None on failure)"""
        if not self.access_token:
            return None
        url = f"{FEISHU_API_URL}/calendar/v4/calendars/primary/events"
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        data = {
            "summary": title,
            "description": description,
            "start_time": {
                "timestamp": int(start_time.timestamp()),
                "timezone": "Asia/Shanghai"
            },
            "end_time": {
                "timestamp": int(end_time.timestamp()),
                "timezone": "Asia/Shanghai"
            },
            "attendees": [
                {
                    "user_id": USER_ID,
                    "type": "user"
                }
            ],
            # Remind 30 minutes and 5 minutes before the event
            "reminders": [
                {
                    "minutes": 30,
                    "method": "notification"
                },
                {
                    "minutes": 5,
                    "method": "notification"
                }
            ]
        }
        response = requests.post(url, headers=headers, json=data)
        if response.status_code == 200:
            result = response.json()
            return result.get("data", {}).get("event", {}).get("event_id")
        return None

    def list_upcoming_events(self, days: int = 7) -> List[Dict]:
        """Fetch the calendar events of the next few days"""
        if not self.access_token:
            return []
        url = f"{FEISHU_API_URL}/calendar/v4/calendars/primary/events"
        headers = {
            "Authorization": f"Bearer {self.access_token}"
        }
        start_time = datetime.now()
        end_time = start_time + timedelta(days=days)
        params = {
            "start_time": int(start_time.timestamp()),
            "end_time": int(end_time.timestamp()),
            "user_id_type": "open_id"
        }
        response = requests.get(url, headers=headers, params=params)
        if response.status_code == 200:
            result = response.json()
            return result.get("data", {}).get("items", [])
        return []

    def check_time_available(self, check_time: datetime, duration: int = 60) -> bool:
        """Check whether a time slot is free (duration in minutes)"""
        events = self.list_upcoming_events(days=7)
        check_start = check_time
        check_end = check_time + timedelta(minutes=duration)
        for event in events:
            event_start = datetime.fromtimestamp(int(event["start_time"]["timestamp"]))
            event_end = datetime.fromtimestamp(int(event["end_time"]["timestamp"]))
            # Two intervals overlap when each starts before the other ends
            if (check_start < event_end) and (check_end > event_start):
                return False
        return True

    def suggest_available_time(self, preferred_hour: int = 14, days: int = 7) -> Optional[datetime]:
        """Suggest a free time slot, preferring 14:00 by default"""
        for day_offset in range(days):
            check_date = datetime.now() + timedelta(days=day_offset)
            check_time = datetime(check_date.year, check_date.month, check_date.day, preferred_hour, 0, 0)
            if check_time > datetime.now() and self.check_time_available(check_time):
                return check_time
        # If the preferred hour is taken every day, try the other working hours (09:00-17:00)
        for day_offset in range(days):
            check_date = datetime.now() + timedelta(days=day_offset)
            for hour in range(9, 18):
                check_time = datetime(check_date.year, check_date.month, check_date.day, hour, 0, 0)
                if check_time > datetime.now() and self.check_time_available(check_time):
                    return check_time
        return None


# Global instance
calendar = FeishuCalendar()


def sync_todo_to_calendar(todo_content: str, due_time: datetime) -> Optional[str]:
    """Sync a todo to the Feishu calendar"""
    # Events last one hour by default
    end_time = due_time + timedelta(hours=1)
    return calendar.create_event(
        title=f"待办:{todo_content[:50]}",
        start_time=due_time,
        end_time=end_time,
        description=f"来自Omni-Todo的待办事项:\n{todo_content}"
    )


def get_calendar_events(days: int = 7) -> List[Dict]:
    """Fetch calendar events in a simplified format"""
    events = calendar.list_upcoming_events(days)
    formatted_events = []
    for event in events:
        formatted_events.append({
            "title": event.get("summary", ""),
            "start_time": datetime.fromtimestamp(int(event["start_time"]["timestamp"])),
            "end_time": datetime.fromtimestamp(int(event["end_time"]["timestamp"])),
            "location": event.get("location", ""),
            "status": event.get("status", "")
        })
    return formatted_events
FILE:scripts/reminder_check.py
#!/usr/bin/env python3
"""
Todo reminder check script. Runs once a minute, finds todos that are about to
expire, and sends Feishu reminders.
"""
import os
import sys
import json
from datetime import datetime, timedelta
from typing import Dict  # needed by the annotations below
from todo_utils import load_todos, list_todos

# Feishu message sending tool
FEISHU_SCRIPT = os.path.expanduser("~/.openclaw/skills/feishu-todo-manager/scripts/send_feishu_message.py")
REMINDER_LOG = os.path.expanduser("~/.openclaw/workspace/reminder_log.json")


def load_reminder_log() -> Dict:
    """Load the log of reminders already sent"""
    if not os.path.exists(REMINDER_LOG):
        return {"sent_reminders": {}}
    with open(REMINDER_LOG, "r", encoding="utf-8") as f:
        return json.load(f)


def save_reminder_log(data: Dict):
    """Save the reminder log"""
    with open(REMINDER_LOG, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def has_reminded(todo_id: int, remind_type: str) -> bool:
    """Check whether a reminder of this type was already sent"""
    log = load_reminder_log()
    todo_key = str(todo_id)
    return todo_key in log["sent_reminders"] and remind_type in log["sent_reminders"][todo_key]


def mark_reminded(todo_id: int, remind_type: str):
    """Record that a reminder was sent"""
    log = load_reminder_log()
    todo_key = str(todo_id)
    if todo_key not in log["sent_reminders"]:
        log["sent_reminders"][todo_key] = []
    log["sent_reminders"][todo_key].append(remind_type)
    save_reminder_log(log)


def send_feishu_message(content: str) -> bool:
    """Send a Feishu message to the user"""
    # Echo the reminder to stdout so it also appears in the cron log
    print(f"[提醒] {content}")
    # Send the Feishu message through the openclaw message tool
    try:
        import subprocess
        cmd = [
            "openclaw", "message", "send",
            "--channel", "feishu",
            "--to", "ou_fd95eeaa259733145362ac2207654aaf",
            "--message", f"⏰ 待办提醒:\n{content}"
        ]
        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.returncode == 0
    except Exception as e:
        print(f"发送消息失败: {e}")
        return False


def check_reminders():
    """Check todos and send the reminders that are due"""
    todos = list_todos()
    for todo in todos:
        if not todo.get("due_time"):
            continue
        try:
            due_time = datetime.fromisoformat(todo["due_time"])
            # Use an aware "now" for deadlines stored with a UTC offset, a naive one otherwise
            time_diff = due_time - datetime.now(due_time.tzinfo)
            due_text = due_time.strftime('%Y-%m-%d %H:%M')
            # 30-minute reminder: fires on the first run inside the 25-30 minute window
            if timedelta(minutes=25) <= time_diff <= timedelta(minutes=30):
                if not has_reminded(todo["id"], "30min"):
                    msg = f"即将到期:{todo['content']}\n⏰ 截止时间:{due_text}"
                    send_feishu_message(msg)
                    mark_reminded(todo["id"], "30min")
            # 5-minute reminder: fires on the first run inside the last 5 minutes
            elif timedelta(minutes=0) <= time_diff <= timedelta(minutes=5):
                if not has_reminded(todo["id"], "5min"):
                    msg = f"马上到期!:{todo['content']}\n⏰ 截止时间:{due_text}"
                    send_feishu_message(msg)
                    mark_reminded(todo["id"], "5min")
            # Overdue reminder: sent once within the first hour after the deadline
            elif time_diff < timedelta(minutes=0) and abs(time_diff) < timedelta(hours=1):
                if not has_reminded(todo["id"], "overdue"):
                    msg = f"⚠️ 已逾期:{todo['content']}\n⏰ 截止时间:{due_text}"
                    send_feishu_message(msg)
                    mark_reminded(todo["id"], "overdue")
        except Exception as e:
            print(f"处理待办 {todo['id']} 出错: {e}")
            continue


if __name__ == "__main__":
    check_reminders()
FILE:scripts/send_feishu_message.py
#!/usr/bin/env python3
"""
Feishu message sending tool
"""
import sys
import json
import requests


def send_feishu_message(user_id: str, content: str) -> bool:
    """Send a Feishu message to the given user"""
    # A Feishu bot webhook or API call still has to be configured here.
    # This is a mock implementation for now; replace it with the real API later.
    # Example: using a Feishu custom bot webhook
    # webhook_url = "https://open.feishu.cn/open-apis/bot/v2/hook/xxx"
    # headers = {"Content-Type": "application/json"}
    # data = {
    #     "msg_type": "text",
    #     "content": {
    #         "text": content
    #     }
    # }
    # response = requests.post(webhook_url, headers=headers, json=data)
    # return response.status_code == 200
    print(f"发送飞书消息给 {user_id}: {content}")
    return True


if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("用法: python send_feishu_message.py <user_id> <content>")
        sys.exit(1)
    user_id = sys.argv[1]
    content = " ".join(sys.argv[2:])
    success = send_feishu_message(user_id, content)
    sys.exit(0 if success else 1)
FILE:scripts/setup_cron.py
#!/usr/bin/env python3
"""
Set up the scheduled reminder job
"""
import sys
from crontab import CronTab
import os

SCRIPT_PATH = os.path.abspath(__file__)
REMINDER_SCRIPT = os.path.join(os.path.dirname(SCRIPT_PATH), "reminder_check.py")
PYTHON_PATH = sys.executable


def setup_cron():
    """Install the cron job for reminder checks"""
    cron = CronTab(user=True)
    # Check whether the job already exists
    job_exists = False
    for job in cron:
        if REMINDER_SCRIPT in str(job.command):
            job_exists = True
            break
    if not job_exists:
        # Create a new job that runs once a minute
        job = cron.new(command=f"{PYTHON_PATH} {REMINDER_SCRIPT} >> /tmp/todo_reminder.log 2>&1", comment="飞书Todo提醒")
        job.minute.every(1)
        cron.write()
        print("✅ 定时提醒任务已设置,每分钟检查一次待办提醒")
    else:
        print("ℹ️ 定时提醒任务已存在")
    # List all jobs
    print("\n当前定时任务:")
    for job in cron:
        print(job)


if __name__ == "__main__":
    setup_cron()
FILE:scripts/todo_utils.py
#!/usr/bin/env python3
import json
import os
import re
import dateparser
from datetime import datetime, timedelta
from typing import List, Dict, Optional

TODO_FILE = os.path.expanduser("~/.openclaw/workspace/todo.json")


def load_todos() -> Dict:
    """Load the Todo data"""
    if not os.path.exists(TODO_FILE):
        return {"todos": []}
    with open(TODO_FILE, "r", encoding="utf-8") as f:
        return json.load(f)


def save_todos(data: Dict):
    """Save the Todo data"""
    with open(TODO_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def _parse_clock(text: str, evening: bool = False) -> Optional[tuple]:
    """Extract (hour, minute) from text such as '15:00', '下午3点' or '8点半'"""
    match = re.search(r'(\d{1,2})[::点](\d{1,2}|半)?', text)
    if not match:
        return None
    hour = int(match.group(1))
    minute = 30 if match.group(2) == '半' else int(match.group(2) or 0)
    # 下午/晚上 (or an evening keyword such as 今晚) puts a 12-hour time in the afternoon
    if (evening or '下午' in text or '晚上' in text) and hour < 12:
        hour += 12
    if hour > 23 or minute > 59:
        return None
    return hour, minute


def parse_time(time_str: str) -> Optional[str]:
    """Parse a natural-language time and return an ISO-format string"""
    if not time_str:
        return None
    now = datetime.now()
    time_str = time_str.strip().lower()

    def on_day(day: datetime, clock: Optional[tuple]) -> str:
        # Without an explicit clock time the deadline is the end of that day
        hour, minute, second = (*clock, 0) if clock else (23, 59, 59)
        return datetime(day.year, day.month, day.day, hour, minute, second).isoformat()

    # Hand-written rules for common expressions
    tomorrow = now + timedelta(days=1)
    if '今晚' in time_str or '今天晚上' in time_str:
        return on_day(now, _parse_clock(time_str, evening=True))
    if '明晚' in time_str or '明天晚上' in time_str:
        return on_day(tomorrow, _parse_clock(time_str, evening=True))
    if '明天' in time_str:
        return on_day(tomorrow, _parse_clock(time_str))
    # "周三前"/"周五前": end of the next Wednesday/Friday (a week later if that is today)
    for keyword, weekday in (('周三前', 2), ('周五前', 4)):
        if keyword in time_str:
            days_ahead = (weekday - now.weekday()) % 7 or 7
            return on_day(now + timedelta(days=days_ahead), None)
    # Try ISO format
    try:
        return datetime.fromisoformat(time_str).isoformat()
    except ValueError:
        pass
    # Fall back to dateparser
    settings = {
        'PREFER_DAY_OF_MONTH': 'first',
        'PREFER_DATES_FROM': 'future',
        'TIMEZONE': 'Asia/Shanghai',
        'RETURN_AS_TIMEZONE_AWARE': False
    }
    dt = dateparser.parse(time_str, languages=['zh', 'en'], settings=settings)
    return dt.isoformat() if dt else None


def add_todo(content: str, due_time: Optional[str] = None, source: str = "") -> int:
    """Add a new todo and return its ID"""
    data = load_todos()
    todos = data["todos"]
    # Parse the due time
    parsed_due_time = None
    if due_time:
        # Try natural-language parsing first
        parsed = parse_time(due_time)
        if parsed:
            parsed_due_time = parsed
        else:
            # Otherwise accept the value as-is if it is valid ISO format
            try:
                datetime.fromisoformat(due_time)
                parsed_due_time = due_time
            except ValueError:
                pass
    # Generate the new ID
    new_id = max([todo["id"] for todo in todos], default=0) + 1
    new_todo = {
        "id": new_id,
        "content": content,
        "created_at": datetime.now().isoformat(),
        "due_time": parsed_due_time,
        "status": "pending",
        "source": source,
        "tags": [],
        "priority": "medium"
    }
    todos.append(new_todo)
    save_todos(data)
    return new_id
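

def list_todos(include_completed: bool = False) -> List[Dict]:
    """List todos; only unfinished ones by default"""
    data = load_todos()
    todos = data["todos"]
    if not include_completed:
        todos = [todo for todo in todos if todo["status"] == "pending"]

    # Sort by due time; todos without a due time go last
    def sort_key(todo):
        due = todo.get("due_time")
        if not due:
            return datetime.max
        try:
            dt = datetime.fromisoformat(due)
        except ValueError:
            return datetime.max
        # Convert offset-aware values to naive local time so they compare with naive ones
        return dt.astimezone().replace(tzinfo=None) if dt.tzinfo else dt

    todos.sort(key=sort_key)
    return todos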


def mark_done(todo_id: int) -> Optional[Dict]:
    """Mark a todo as done and return the modified todo"""
    data = load_todos()
    for todo in data["todos"]:
        if todo["id"] == todo_id:
            todo["status"] = "done"
            save_todos(data)
            return todo
    return None


def delete_todo(todo_id: int) -> Optional[Dict]:
    """Delete a todo and return the deleted todo"""
    data = load_todos()
    for i, todo in enumerate(data["todos"]):
        if todo["id"] == todo_id:
            deleted = data["todos"].pop(i)
            save_todos(data)
            return deleted
    return None


def clear_completed() -> int:
    """Remove completed todos and return how many were removed"""
    data = load_todos()
    original_count = len(data["todos"])
    data["todos"] = [todo for todo in data["todos"] if todo["status"] != "done"]
    save_todos(data)
    return original_count - len(data["todos"])


def set_reminder(todo_id: int, remind_time: str) -> Optional[Dict]:
    """Set the reminder time; natural language is supported"""
    data = load_todos()
    parsed_time = parse_time(remind_time)
    if not parsed_time:
        return None
    for todo in data["todos"]:
        if todo["id"] == todo_id:
            todo["due_time"] = parsed_time
            save_todos(data)
            return todo
    return None


def get_todos_with_display_order(include_completed: bool = False) -> tuple[List[Dict], Dict]:
    """Return the todos with display numbers, as (list, display number -> ID map)"""
    todos = list_todos(include_completed)
    display_map = {}
    for idx, todo in enumerate(todos, 1):
        display_map[idx] = todo["id"]
        todo["display_id"] = idx
    return todos, display_map


def get_id_by_display_number(display_num: int, include_completed: bool = False) -> Optional[int]:
    """Resolve a display number to the stored ID"""
    _, display_map = get_todos_with_display_order(include_completed)
    return display_map.get(display_num)


def set_priority(todo_id: int, priority: str) -> Optional[Dict]:
    """Set the priority of a todo: high/medium/low"""
    if priority not in ["high", "medium", "low"]:
        return None
    data = load_todos()
    for todo in data["todos"]:
        if todo["id"] == todo_id:
            todo["priority"] = priority
            save_todos(data)
            return todo
    return None


def add_tag(todo_id: int, tag: str) -> Optional[Dict]:
    """Add a tag to a todo"""
    data = load_todos()
    for todo in data["todos"]:
        if todo["id"] == todo_id:
            if tag not in todo["tags"]:
                todo["tags"].append(tag)
                save_todos(data)
            return todo
    return None


def filter_by_tag(tag: str) -> List[Dict]:
    """Filter todos by tag"""
    todos = list_todos()
    return [todo for todo in todos if tag in todo.get("tags", [])]


def sync_todo_to_calendar(todo_id: int) -> Optional[str]:
    """Sync a todo to the Feishu calendar and return the event ID"""
    todo = get_todo_by_id(todo_id)
    if not todo or not todo.get("due_time"):
        return None
    try:
        # Alias the import so it does not shadow this function's own name
        from feishu_calendar import sync_todo_to_calendar as create_calendar_event
        due_time = datetime.fromisoformat(todo["due_time"])
        event_id = create_calendar_event(todo["content"], due_time)
        if event_id:
            # Store the calendar event ID on the todo
            data = load_todos()
            for t in data["todos"]:
                if t["id"] == todo_id:
                    t["calendar_event_id"] = event_id
                    save_todos(data)
                    break
        return event_id
    except Exception as e:
        print(f"同步日历失败: {e}")
        return None


def get_calendar_agenda(days: int = 7) -> List[Dict]:
    """Return a merged view of calendar events and todos"""
    try:
        from feishu_calendar import get_calendar_events
        calendar_events = get_calendar_events(days)
    except Exception as e:
        print(f"获取日历事件失败: {e}")
        calendar_events = []
    # Fetch the todos
    todos = list_todos()
    all_events = []
    # Add calendar events
    for event in calendar_events:
        all_events.append({
            "type": "calendar",
            "title": event["title"],
            "start_time": event["start_time"],
            "end_time": event["end_time"],
            "location": event.get("location", ""),
            "priority": "medium"
        })
    # Add todos that fall inside the window
    for todo in todos:
        if todo.get("due_time"):
            try:
                due_time = datetime.fromisoformat(todo["due_time"])
                # Calendar events are naive local times, so normalize aware deadlines to match
                if due_time.tzinfo:
                    due_time = due_time.astimezone().replace(tzinfo=None)
                if due_time <= datetime.now() + timedelta(days=days):
                    all_events.append({
                        "type": "todo",
                        "title": f"[待办] {todo['content']}",
                        "start_time": due_time,
                        "end_time": due_time + timedelta(hours=1),
                        "priority": todo.get("priority", "medium"),
                        "todo_id": todo["id"]
                    })
            except ValueError:
                pass
    # Sort by start time
    all_events.sort(key=lambda x: x["start_time"])
    return all_events


def get_todo_by_id(todo_id: int) -> Optional[Dict]:
    """Look up a todo by ID"""
    data = load_todos()
    for todo in data["todos"]:
        if todo["id"] == todo_id:
            return todo
    return None


if __name__ == "__main__":
    # For manual testing
    import sys
    if len(sys.argv) > 1:
        if sys.argv[1] == "add":
            content = " ".join(sys.argv[2:])
            todo_id = add_todo(content)
            print(f"Added todo #{todo_id}: {content}")
        elif sys.argv[1] == "list":
            todos = list_todos()
            for todo in todos:
                status = "✓" if todo["status"] == "done" else " "
                print(f"[{status}] #{todo['id']}: {todo['content']}")
Estimate LLM inference performance metrics including TTFT, decode speed, and VRAM requirements based on model architecture, GPU specs, and quantization format.
---
name: llm-perf-estimator
description: Estimate LLM inference performance metrics including TTFT, decode speed, and VRAM requirements based on model architecture, GPU specs, and quantization format.
argument-hint: "[model_name_or_config_path] [gpu_name] [input_tokens] [output_tokens] [quant]"
user-invocable: true
---
# LLM Inference Performance Estimator
Estimate **TTFT (Time To First Token)**, **decode speed (tokens/s)**, and **VRAM usage** for a given LLM on a specific GPU.
## How to Use
The user may invoke this skill in several ways:
1. **Named model**: `/llm-perf-estimator Qwen2.5-7B RTX4090 2048 512 fp16`
2. **With config file**: `/llm-perf-estimator config.json RTX4090 2048 512 int4`
3. **Interactive**: `/llm-perf-estimator` — ask the user step by step
Arguments (all optional, prompt for missing ones):
- `model` — model name from preset list, or path to a HuggingFace `config.json`
- `gpu` — GPU name from preset list, or custom specs
- `input_tokens` — prefill sequence length (default: 1024)
- `output_tokens` — number of tokens to generate (default: 256)
- `quant` — quantization format: `fp16`, `bf16`, `fp8`, `int8`, `int4` (default: `fp16`)
---
## Step 1 — Resolve Model Architecture
### Preset Models
If the user provides a known model name, use the following presets:
| Model | Type | Total Params | Activated Params | Layers | Hidden | Heads (Q) | Heads (KV) | FFN Type | Intermediate | Vocab |
|---|---|---|---|---|---|---|---|---|---|---|
| **Qwen3.5-4B** | Hybrid Dense | 4B | 4B | 32 (8 full+24 linear) | 2560 | 16 (full) / 16 (linear) | 4 (full) | SwiGLU | 9216 | 248320 |
| **Qwen3.5-35B-A3B** | Hybrid MoE | 35B | 3B | 40 (10 full+30 linear) | 2048 | 16 (full) / 16 (linear) | 2 (full) | SwiGLU+MoE | 8×512 per tok | 248320 |
If the model is not in the preset list and no config file is provided, ask the user to provide a `config.json`. They can get it without downloading the full model:
```
# ModelScope (browser)
https://modelscope.cn/models/{org}/{model}/file/view/master/config.json
# HuggingFace (browser)
https://huggingface.co/{org}/{model}/blob/main/config.json
```
Open the URL, copy the content, and paste it directly into the conversation. Alternatively, provide the local file path if the model is already downloaded.
If the user cannot provide a config, ask them to manually input:
- `num_hidden_layers`, `hidden_size`, `num_attention_heads`, `num_key_value_heads`
- `intermediate_size`, `vocab_size`
- For MoE: `num_experts`, `num_experts_per_tok`, `moe_intermediate_size`
### Parsing config.json
If the user provides a `config.json` path, read the file and extract:
```
num_hidden_layers, hidden_size, num_attention_heads, num_key_value_heads,
intermediate_size, vocab_size, model_type,
# MoE fields (if present):
num_experts / num_local_experts, num_experts_per_tok, moe_intermediate_size
# Hybrid attention (if present):
layer_types ← list of strings, e.g. ["linear_attention", ..., "full_attention", ...]
head_dim ← if explicitly provided, use it; otherwise head_dim = hidden_size / num_attention_heads
```
**Determine `num_full_attn_layers`**:
- If `layer_types` exists: `num_full_attn_layers = count of "full_attention" in layer_types`
- If `layer_types` is absent (standard transformer): `num_full_attn_layers = num_hidden_layers`
**Note on nested configs** (e.g. Qwen3.5-35B-A3B has a `text_config` wrapper):
- If the top-level JSON has a `text_config` key, read all text model fields from inside it.
- `head_dim` may be explicitly set (e.g. `256`); prefer that over computing from `hidden_size / num_attention_heads`.
**Note on `tie_word_embeddings`**: if `true`, the embedding table and lm_head share the same weights. Do not count them twice in VRAM — the embedding contributes `vocab_size × hidden_size × bytes_per_param` only once.
**Note on `attn_output_gate`**: recognized but ignored in calculations — its contribution to FLOPs and VRAM is <1% and within the MFU uncertainty margin.
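The parsing rules above can be sketched as a small function. This is a minimal illustration, not the skill's actual implementation; `resolve_architecture` and the returned key names are hypothetical, and the sample configs in the usage note are toy values rather than real model configs.

```python
def resolve_architecture(config: dict) -> dict:
    """Extract the fields needed for estimation from a parsed config.json."""
    # Nested configs keep the text model fields under "text_config"
    text = config.get("text_config", config)
    layers = text["num_hidden_layers"]
    heads = text["num_attention_heads"]
    layer_types = text.get("layer_types")
    # Without layer_types every layer is a full attention layer
    full_layers = layer_types.count("full_attention") if layer_types else layers
    return {
        "num_hidden_layers": layers,
        "num_full_attn_layers": full_layers,
        "num_linear_attn_layers": layers - full_layers,
        # Prefer an explicit head_dim over hidden_size / num_attention_heads
        "head_dim": text.get("head_dim") or text["hidden_size"] // heads,
        "num_key_value_heads": text.get("num_key_value_heads", heads),
        "tie_word_embeddings": text.get(
            "tie_word_embeddings", config.get("tie_word_embeddings", False)
        ),
    }
```

A caller would load the file with `json.load` and pass the resulting dict. For a toy nested config with four layers, one of them `"full_attention"`, and `head_dim` set to 256, the function reports one full attention layer, three linear attention layers, and a head dimension of 256.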
---
## Step 2 — Resolve GPU Specs
### Preset GPUs
| GPU | VRAM (GB) | BF16 TFLOPS | FP8 TFLOPS | INT8 TOPS | HBM BW (GB/s) |
|---|---|---|---|---|---|
| RTX 4060 | 8 | 15.1 | — | 30.2 | 272 |
| RTX 4060 Ti | 16 | 22.1 | — | 44.2 | 288 |
| RTX 4070 | 12 | 29.1 | — | 58.2 | 504 |
| RTX 4070 Ti | 12 | 40.1 | — | 80.2 | 504 |
| RTX 4070 Ti Super | 16 | 40.1 | — | 80.2 | 672 |
| RTX 4080 | 16 | 48.7 | — | 97.4 | 717 |
| RTX 4080 Super | 16 | 52.2 | — | 104.4 | 736 |
| RTX 4090 | 24 | 82.6 | — | 165.2 | 1008 |
| RTX 5070 Ti | 16 | 176.0 | 352.0 | 352.0 | 896 |
| RTX 5080 | 16 | 225.0 | 450.0 | 450.0 | 960 |
| RTX 5090 | 32 | 419.0 | 838.0 | 838.0 | 1792 |
| A10G | 24 | 31.2 | — | 62.5 | 600 |
| A100-40G | 40 | 312.0 | — | 624.0 | 1555 |
| A100-80G | 80 | 312.0 | — | 624.0 | 2000 |
| H100-SXM | 80 | 989.4 | 1978.9 | 1978.9 | 3350 |
| H100-PCIe | 80 | 756.0 | 1513.0 | 1513.0 | 2000 |
| H200-SXM | 141 | 989.4 | 1978.9 | 1978.9 | 4800 |
| L4 | 24 | 30.3 | 60.6 | 121.2 | 300 |
| L40S | 48 | 91.6 | 183.2 | 366.4 | 864 |
| MI300X | 192 | 1307.4 | 2614.9 | 2614.9 | 5300 |
| Apple M4 (16GB) | 16 | 4.6 | — | — | 120 |
| Apple M4 Pro (48GB) | 48 | 9.2 | — | — | 273 |
| Apple M4 Max (128GB) | 128 | 18.4 | — | — | 546 |
If the GPU is not listed, ask the user to provide:
- VRAM (GB)
- BF16/FP16 TFLOPS
- HBM bandwidth (GB/s)
---
## Step 3 — Quantization Bytes Per Parameter
| Format | Bytes/param | Compute dtype | Notes |
|---|---|---|---|
| fp32 | 4.0 | fp32 | Rarely used for inference |
| bf16 / fp16 | 2.0 | bf16/fp16 | Baseline |
| fp8 | 1.0 | fp8 | Requires hardware FP8 support (GPUs with an FP8 TFLOPS value in Step 2) |
| int8 | 1.0 | int8 | W8A8 or W8A16 |
| int4 | 0.5 | int4/fp16 | GPTQ/AWQ/bitsandbytes |
Select the GPU TFLOPS column matching the compute dtype:
- fp16/bf16 → BF16 TFLOPS
- fp8 → FP8 TFLOPS (fall back to BF16 if not supported, with a warning)
- int8 → INT8 TOPS
- int4 → BF16 TFLOPS (dequant to fp16 for matmul in most frameworks)
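The column selection can be written as a lookup with a fallback. This is a sketch under stated assumptions: the dictionary keys (`bf16_tflops`, `fp8_tflops`, `int8_tops`) and the function name are hypothetical, and falling back to BF16 when the INT8 column is empty is an assumption, since the rule above only specifies the fallback for fp8.

```python
from typing import Optional

# Maps each quantization format to the GPU throughput column used for compute
QUANT_TO_COLUMN = {
    "fp16": "bf16_tflops",
    "bf16": "bf16_tflops",
    "fp8": "fp8_tflops",
    "int8": "int8_tops",
    "int4": "bf16_tflops",  # weights are dequantized to fp16 for the matmul
}


def select_peak_throughput(gpu: dict, quant: str) -> tuple[float, Optional[str]]:
    """Return (peak throughput, warning) for the given GPU and format."""
    value = gpu.get(QUANT_TO_COLUMN[quant])
    if value is None:
        warning = f"{quant} is not supported on {gpu['name']}; falling back to BF16 TFLOPS"
        return gpu["bf16_tflops"], warning
    return value, None


# RTX 4090 row from the preset table; None marks an empty column
rtx4090 = {"name": "RTX 4090", "bf16_tflops": 82.6, "fp8_tflops": None, "int8_tops": 165.2}
peak, warning = select_peak_throughput(rtx4090, "fp8")
```

With the RTX 4090 row, an `fp8` request returns the BF16 figure together with a warning, while `int8` returns the INT8 column with no warning.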
---
## Step 4 — Compute VRAM Requirements
### 4.1 Weight Memory
```
weight_bytes = total_params × bytes_per_param
weight_GB = weight_bytes / 1e9
```
For MoE models, `total_params` includes all expert weights (not just activated).
### 4.2 KV Cache Memory
Only **full attention layers** maintain a KV cache. Linear attention layers use a fixed-size recurrent state (negligible, tens of MB) that does not grow with sequence length.
```
kv_heads = num_key_value_heads # from full attention config
kv_dtype_bytes = 2 # KV cache stays in fp16/bf16; KV cache quantization is not considered
kv_bytes_per_token = 2 × num_full_attn_layers × kv_heads × head_dim × kv_dtype_bytes
kv_cache_GB = kv_bytes_per_token × (input_tokens + output_tokens) / 1e9
```
The leading factor of 2 covers the K and V tensors. If `num_full_attn_layers = num_hidden_layers` (standard transformer), this reduces to the standard formula.
### 4.3 Activation Memory (prefill peak)
```
activation_GB ≈ num_layers × hidden_size × input_tokens × bytes_per_param × 2 / 1e9
```
This is an approximation; actual peak depends on framework and attention implementation.
### 4.4 Total VRAM
```
total_VRAM_GB = weight_GB + kv_cache_GB + activation_GB
```
Add a **15% overhead** for framework buffers, CUDA context, etc.:
```
total_VRAM_GB_with_overhead = total_VRAM_GB × 1.15
```
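Steps 4.1 to 4.4 combine into one function. The sketch below is illustrative only: the function and argument names are hypothetical, the caller supplies the per-element KV cache width as `kv_elem_bytes`, and the numbers in the usage note describe a made-up 7B dense model rather than a preset.

```python
def estimate_vram_gb(total_params: float, bytes_per_param: float, kv_elem_bytes: float,
                     num_layers: int, num_full_attn_layers: int, kv_heads: int,
                     head_dim: int, hidden_size: int,
                     input_tokens: int, output_tokens: int) -> dict:
    """Apply the weight, KV cache, activation, and overhead formulas."""
    weight_gb = total_params * bytes_per_param / 1e9
    # Factor 2 = one K and one V tensor per full attention layer
    kv_bytes_per_token = 2 * num_full_attn_layers * kv_heads * head_dim * kv_elem_bytes
    kv_cache_gb = kv_bytes_per_token * (input_tokens + output_tokens) / 1e9
    activation_gb = num_layers * hidden_size * input_tokens * bytes_per_param * 2 / 1e9
    total_gb = weight_gb + kv_cache_gb + activation_gb
    return {
        "weight_GB": weight_gb,
        "kv_cache_GB": kv_cache_gb,
        "activation_GB": activation_gb,
        "total_GB": total_gb,
        "total_with_overhead_GB": total_gb * 1.15,  # 15% framework overhead
    }


# Illustrative 7B dense model in fp16 (not a preset)
vram = estimate_vram_gb(total_params=7e9, bytes_per_param=2, kv_elem_bytes=2,
                        num_layers=32, num_full_attn_layers=32, kv_heads=8,
                        head_dim=128, hidden_size=4096,
                        input_tokens=1024, output_tokens=256)
```

For these inputs the weights take 14.0 GB, the KV cache about 0.17 GB, and activations about 0.54 GB, giving roughly 16.9 GB once the overhead is applied.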
---
## Step 5 — Estimate TTFT (Prefill Latency)
Prefill is **compute-bound** for long sequences.
### 5.1 Attention FLOPs (prefill)
Only **full attention layers** have O(n²) attention compute. Linear attention layers are O(n) and their attention FLOPs are already captured in the projection FLOPs (Step 5.3).
```
attn_flops = 4 × num_full_attn_layers × input_tokens² × num_attention_heads × head_dim
```
(factor of 4 = 2 FLOPs per multiply-add × 2 matmuls, QKᵀ and AV; the softmax cost is negligible by comparison)
When `head_dim` is not set explicitly, `num_attention_heads × head_dim = hidden_size`, and with `num_full_attn_layers = num_hidden_layers` this is the standard transformer formula.
### 5.2 FFN FLOPs (prefill)
For SwiGLU/GeGLU (3 projections: gate, up, down):
```
ffn_flops = 3 × 2 × num_layers × input_tokens × hidden_size × intermediate_size
```
For MoE, replace `intermediate_size` with `num_experts_per_tok × moe_intermediate_size`.
### 5.3 QKV + Output Projection FLOPs
For **full attention layers** (Q, K, V, and output projections):
```
full_proj_flops = 2 × num_full_attn_layers × input_tokens × hidden_size
                  × (2 × num_attention_heads × head_dim + 2 × kv_heads × head_dim)
```
For **linear attention layers** (also have Q/K/V-equivalent projections, but different dims):
```
linear_proj_flops = 2 × num_linear_attn_layers × input_tokens × hidden_size
× (linear_num_key_heads × linear_key_head_dim
+ linear_num_key_heads × linear_key_head_dim
+ linear_num_value_heads × linear_value_head_dim
+ hidden_size)
```
If `layer_types` is absent (standard transformer), only `full_proj_flops` applies and `num_linear_attn_layers = 0`.
### 5.4 Total Prefill FLOPs
```
total_prefill_flops = attn_flops + ffn_flops + full_proj_flops + linear_proj_flops
```
### 5.5 TTFT
Apply **MFU (Model FLOP Utilization)** efficiency factor:
| Scenario | MFU |
|---|---|
| Long prompt (>512 tokens), data center GPU | 0.45 |
| Long prompt, consumer GPU | 0.35 |
| Short prompt (<128 tokens) | 0.25 |
```
effective_tflops = gpu_tflops × MFU
TTFT_seconds = total_prefill_flops / (effective_tflops × 1e12)
```
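The prefill formulas can be checked with a short calculation. This sketch covers only a standard dense transformer (no linear attention layers, `num_attention_heads × head_dim = hidden_size`); the function name is hypothetical and the model dimensions in the example are illustrative, not a preset.

```python
def estimate_ttft(num_layers: int, hidden_size: int, kv_heads: int, head_dim: int,
                  intermediate_size: int, input_tokens: int,
                  gpu_tflops: float, mfu: float) -> tuple[int, float]:
    """Return (total prefill FLOPs, TTFT in seconds) for a standard dense model."""
    n = input_tokens
    # Attention score and value matmuls, quadratic in sequence length
    attn_flops = 4 * num_layers * n ** 2 * hidden_size
    # SwiGLU FFN: gate, up, and down projections
    ffn_flops = 3 * 2 * num_layers * n * hidden_size * intermediate_size
    # Q and output projections are hidden_size wide; K and V are kv_heads * head_dim wide
    proj_flops = 2 * num_layers * n * hidden_size * (
        hidden_size + 2 * kv_heads * head_dim + hidden_size
    )
    total_flops = attn_flops + ffn_flops + proj_flops
    ttft_seconds = total_flops / (gpu_tflops * mfu * 1e12)
    return total_flops, ttft_seconds


# Illustrative dense model on an RTX 4090 (82.6 TFLOPS, consumer MFU 0.35)
total_flops, ttft = estimate_ttft(num_layers=32, hidden_size=4096, kv_heads=8,
                                  head_dim=128, intermediate_size=11008,
                                  input_tokens=1024, gpu_tflops=82.6, mfu=0.35)
```

For this configuration the total is about 1.22e13 FLOPs, which at 28.9 effective TFLOPS gives a TTFT of roughly 0.42 s.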
---
## Step 6 — Estimate Decode Speed
Decode is **memory-bandwidth-bound** at batch=1.
### 6.1 Bytes Read Per Decode Step
Each decode step reads:
- All activated model weights once
- KV cache for all previous tokens (full attention layers only; linear attention state is fixed-size and already loaded with weights)
```
activated_weight_bytes = activated_params × bytes_per_param
kv_cache_bytes_at_step = kv_bytes_per_token × (input_tokens + current_output_tokens)
bytes_per_step = activated_weight_bytes + kv_cache_bytes_at_step
```
For the average decode step, use `current_output_tokens ≈ output_tokens / 2`.
### 6.2 Decode Speed
Apply a **bandwidth utilization** efficiency factor chosen from the table below:
| Scenario | BW Utilization |
|---|---|
| Data center GPU (HBM2e/HBM3) | 0.85 |
| Consumer GPU (GDDR6X) | 0.75 |
| Apple Silicon (unified memory) | 0.80 |
```
effective_bandwidth = gpu_bandwidth_GBs × bw_utilization
decode_speed_tps = effective_bandwidth × 1e9 / bytes_per_step
```
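As a sketch, the decode-speed formula maps onto a small Python helper. The dictionary keys (`datacenter`, `consumer`, `apple_silicon`) are labels invented for this example; the utilization values come from the table above.

```python
BW_UTILIZATION = {
    "datacenter": 0.85,     # HBM2e / HBM3
    "consumer": 0.75,       # GDDR6X
    "apple_silicon": 0.80,  # unified memory
}


def decode_speed_tps(gpu_bandwidth_GBs, gpu_class, bytes_per_step):
    """Decode speed in tokens/s for a bandwidth-bound decode at batch=1."""
    effective_bandwidth = gpu_bandwidth_GBs * BW_UTILIZATION[gpu_class]
    return effective_bandwidth * 1e9 / bytes_per_step


# Hypothetical: 1000 GB/s consumer GPU reading 7.5 GB per decode step.
print(f"{decode_speed_tps(1000, 'consumer', 7.5e9):.0f} tokens/s")
```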
---
## Step 7 — Output Report
Present results as a Markdown report with the following sections:
### Section 1: Configuration Summary
| Parameter | Value |
|---|---|
| Model | {model_name} |
| Type | Dense / MoE / Hybrid MoE |
| Total Params | {X}B |
| Activated Params | {X}B |
| Total Layers | {N} |
| Full Attention Layers | {N} ({N} linear attention) |
| GPU | {gpu_name} |
| VRAM Available | {X} GB |
| Quantization | {quant} |
| Input Tokens | {N} |
| Output Tokens | {N} |
### Section 2: VRAM Breakdown
| Component | Size (GB) |
|---|---|
| Model Weights | {X} |
| KV Cache | {X} |
| Activations (peak) | {X} |
| Framework Overhead (15%) | {X} |
| **Total Required** | **{X}** |
| GPU Available | {X} |
| **Fits in VRAM?** | ✅ Yes / ❌ No |
If it doesn't fit, suggest:
- A lower quantization format
- Offloading options (CPU offload, disk offload)
### Section 3: Performance Estimates
| Metric | Estimate |
|---|---|
| TTFT (Time to First Token) | {X} ms |
| Decode Speed | {X} tokens/s |
| Time to Generate {N} tokens | {X} s |
| Total End-to-End Latency | {X} s |
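The rows of this table can be derived from the Step 5 and Step 6 results. The sketch below assumes that end-to-end latency is TTFT plus generation time, which the text above does not state explicitly; the function and key names are illustrative.

```python
def performance_rows(ttft_seconds, decode_speed_tps, output_tokens):
    """Derive the performance table values from TTFT and decode speed."""
    generation_s = output_tokens / decode_speed_tps
    return {
        "ttft_ms": ttft_seconds * 1e3,
        "decode_speed_tps": decode_speed_tps,
        "generation_s": generation_s,
        # Assumption: end-to-end latency = prefill time + generation time.
        "end_to_end_s": ttft_seconds + generation_s,
    }


# Hypothetical inputs: 0.5 s TTFT, 100 tokens/s, 200 output tokens.
for name, value in performance_rows(0.5, 100.0, 200).items():
    print(f"{name}: {value}")
```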
### Section 4: Assumptions & Caveats
List the MFU and bandwidth utilization values used, and note:
- Estimates assume batch_size=1, single GPU
- Actual performance varies by framework (vLLM, llama.cpp, Ollama, etc.)
- FlashAttention / FlashAttention-2 is assumed for prefill
- KV cache quantization is not considered
- Speculative decoding is not considered
---
## Notes for the Agent
- If the user asks "how did you calculate this", show the intermediate calculations in a collapsible section or footnote
- If VRAM is insufficient, proactively suggest the minimum quantization that would fit
- If the user provides a `config.json`, confirm the parsed values before computing
- Round all results to 2 significant figures for readability
- For MoE models, clearly distinguish total vs activated parameters in all calculations
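Rounding to 2 significant figures, as required above, can be done with a small helper. This is one possible sketch; the name `round_sig` is illustrative.

```python
from math import floor, log10


def round_sig(x, sig=2):
    """Round x to `sig` significant figures."""
    if x == 0:
        return 0.0
    # Position of the leading digit decides how many decimals to keep.
    return round(x, sig - 1 - floor(log10(abs(x))))


print(round_sig(1234.5), round_sig(0.04567), round_sig(87.3))
```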
FILE:README.md
# llm-perf-estimator
A Claude Code skill that estimates LLM inference performance metrics based on model architecture, GPU specs, and quantization format.
## What it estimates
- **TTFT** — Time to First Token (prefill latency)
- **Decode speed** — tokens/second
- **VRAM usage** — weights + KV cache + activations + overhead
## Usage
```
/llm-perf-estimator [model] [gpu] [input_tokens] [output_tokens] [quant]
```
All arguments are optional — the skill will prompt for anything missing.
**Examples:**
```
/llm-perf-estimator Qwen3.5-4B RTX4090 2048 512 int4
/llm-perf-estimator config.json H100-SXM 4096 1024 fp8
/llm-perf-estimator
```
## Supported inputs
**Models (presets):**
- Qwen3.5-4B (Hybrid Dense, calibrated from official config)
- Qwen3.5-35B-A3B (Hybrid MoE, calibrated from official config)
For any other model, provide a `config.json` from ModelScope or HuggingFace — no need to download the full model weights.
**GPUs:** RTX 40/50 series, A100, H100, H200, L4, L40S, MI300X, Apple M4 series. Custom specs also accepted.
**Quantization:** `fp32`, `fp16`, `bf16`, `fp8`, `int8`, `int4`
## Architecture support
The skill handles:
- Standard dense transformers
- MoE (Mixture of Experts)
- Hybrid attention (linear + full attention layers, e.g. Qwen3.5 series)
For hybrid models, KV cache and O(n²) attention FLOPs are computed only for full attention layers. Linear attention layers use a fixed-size recurrent state that does not grow with sequence length.
## Methodology
| Phase | Bottleneck | Latency formula |
|---|---|---|
| Prefill (TTFT) | Compute-bound | FLOPs / (GPU TFLOPS × MFU) |
| Decode (per token) | Bandwidth-bound | Bytes per step / (memory bandwidth × utilization) |
MFU and bandwidth utilization coefficients are selected based on GPU type and prompt length.
## Installation
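Copy `SKILL.md` into a directory named after the skill inside your Claude Code skills directory (Claude Code discovers skills as `<skill-name>/SKILL.md`):
```bash
# Personal (all projects)
mkdir -p ~/.claude/skills/llm-perf-estimator
cp SKILL.md ~/.claude/skills/llm-perf-estimator/SKILL.md
# Project-local
mkdir -p .claude/skills/llm-perf-estimator
cp SKILL.md .claude/skills/llm-perf-estimator/SKILL.md
```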
## Contributing
Preset models are intentionally limited to architectures verified against official `config.json` files. To add a new preset, please include the source config in your PR.