@clawhub-tomuiv-d6b5ae5109
自动登录清华网络学堂,查看待办事项,下载课件,提交作业,并批量标记课件已读,支持无人值守操作。
# tsinghua-learn skill
清华网络学堂(learn.tsinghua.edu.cn)自动化助手——从登录、查看待办、下载课件到提交作业,一切均可对话完成。
---
## 如果你是人类,首次使用请务必阅读以下说明
本 skill 为清华网络学堂打造全自动助手——从登录、查看待办、下载课件到提交作业,一切均可对话完成。首次使用仅需提供学号和密码,之后完全无人值守。
**它能做什么:**
- **查看待办**:自动拉取本学期所有课程未读消息(作业/公告/课件/讨论/答疑/问卷),按截止时间和课程排序显示,再也不用手动一个个点开看
- **下载课件**:指定课程名称,课件自动发送到当前对话,下载后本地临时文件精准清除,不留痕迹
- **提交作业**:直接把作业文件发给机器人,它自动按指定格式(如姓名+学号)命名并提交,省去繁琐的网页上传流程
- **批量标记已读**:所有课程的未读课件,一键全部标记为已读
**运行原理:**
首次运行需手动完成一次浏览器验证(点击"是的,我信任浏览器"),之后所有凭证和浏览器指纹会保存到本地。之后所有操作由机器人自动完成,无需重复验证。
---
## 零、驻地规范(铁律)
所有网络学堂相关脚本必须写在 `skills/tsinghua-learn/` 目录下,禁止散落到任何其他位置。
禁止位置:workspace 根目录、buffer/、其他 skill 目录。
---
## 一、文件结构
```
skills/tsinghua-learn/
├── SKILL.md ← 本文档
├── credentials.json ← 账号密码(机器人自动创建,用户无需手动操作)
│
├── scripts/
│ ├── _config.py ← 凭证和路径加载器(所有脚本共享)
│ ├── learn_api.py ← 所有 HTTP API 封装(详见文件内注释)
│ ├── login_supervised.py ← 有人值守:首次建立 Profile + 2FA
│ ├── login_auto.py ← 无人值守:日常调用,Session 失效自动续期
│ ├── todos_api.py ← 查看待办:默认版(纯 API,并行请求)
│ ├── todos_dom.py ← 查看待办:备用版(Playwright DOM)
│ ├── download_and_send_kj.py ← 下载课件 + 发送 + 精准删除
│ ├── mark_kj_read.py ← 批量标记课件已读
│ └── install_playwright.py ← Chromium 浏览器安装
│
├── sessions/
│ └── learn_session.json ← Session 文件(JSESSIONID + CSRF)
│
└── profiles/
└── learn_profile/ ← 固定浏览器 Profile(cookies 持久化)
```
---
## 二、账号配置(首次使用)
机器人会主动询问用户的学号和密码,配置好后自动写入 `credentials.json`,之后所有脚本自动读取,无需重复操作。
---
## 三、双脚本登录架构
### 3.1 有人值守(supervised)
首次配置 / Session 彻底失效时使用。弹出浏览器窗口,若触发 2FA(企业微信/短信)需人工验证。
### 3.2 无人值守(auto)— 默认
日常调用。Session 有效时直接返回(<100ms),失效时自动用固定 Profile 续期,无需人工介入。
---
## 四、降级策略
```
todos_api.py / todos_dom.py
→ Session 无效 → login_auto.py(自动续期)
→ Profile 丢失 → login_supervised.py(手动 2FA)
→ Chromium 未装 → install_playwright.py
login_auto.py
→ Session 无效 → login_supervised.py
```
---
## 五、未读/未处理判断标准
| 模块 | API 字段 | 过滤条件 |
|------|---------|---------|
| 作业 | `zt` | `zt == "未交"` → 待提交 |
| 公告 | `sfyd` | `sfyd == "否"` → 未读("是"=已读)|
| 课件 | `isNew` | `isNew == 1` → 未读 |
| 讨论/答疑 | `htsl` | `htsl > 0` → 有新回复 |
| 问卷 | — | 全局 API `pageListWks` 返回未做问卷数量 |
---
*最后更新:2026-04-26*
FILE:credentials.json
{
"username": "",
"password": ""
}
FILE:scripts/check_kj_unread.py
import json, requests, sys
sys.stdout.reconfigure(encoding='utf-8')
state = json.load(open(r'C:\Users\TOM\.openclaw\workspace\skills\tsinghua-learn\sessions\learn_session.json', encoding='utf-8'))
csrf = state['csrf']; js = state['learn_jsession']
h = {'Accept': 'application/json, */*', 'X-XSRF-TOKEN': csrf, 'Cookie': 'JSESSIONID=' + js + '; XSRF-TOKEN=' + csrf}
wlkcid = '2025-2026-2151369343'
r = requests.get(f'https://learn.tsinghua.edu.cn/b/wlxt/kj/wlkc_kjxxb/student/kjxxbByWlkcidAndSizeForStudent?wlkcid={wlkcid}&size=100&_csrf={csrf}', headers=h, timeout=15)
d = r.json()
obj = d.get('object', d)
if isinstance(obj, list):
items = obj
elif isinstance(obj, dict):
items = obj.get('aaData', [])
else:
items = []
print(f'课件总数: {len(items)}')
new_items = [x for x in items if str(x.get('isNew','')) == '1']
old_items = [x for x in items if str(x.get('isNew','')) != '1']
print(f' isNew=1(未读): {len(new_items)}')
print(f' isNew=0(已读): {len(old_items)}')
if old_items:
print(f' 已读课件示例: {old_items[0].get("bt","?")[:40]}')
if new_items:
print(f' 未读课件示例: {new_items[0].get("bt","?")[:40]}')
FILE:scripts/check_session.py
import json, requests
f = r'C:\Users\TOM\.openclaw\workspace\skills\tsinghua-learn\sessions\learn_session.json'
s = json.load(open(f, encoding='utf-8'))
print('csrf:', s['csrf'])
h = {
'Accept': 'application/json, */*',
'Referer': 'https://learn.tsinghua.edu.cn/f/wlxt/index/course/student/',
'X-XSRF-TOKEN': s['csrf'],
'Cookie': 'JSESSIONID=' + s['learn_jsession'] + '; XSRF-TOKEN=' + s['csrf'],
}
r = requests.get('https://learn.tsinghua.edu.cn/b/wlxt/kczy/zy/student/index/zyListWj?wlkcid=&size=1', headers=h, timeout=10)
print('status:', r.status_code)
print('has location.href:', 'location.href' in r.text)
print('body[:300]:', r.text[:300])
FILE:scripts/check_wj_api.py
import json, requests, sys
sys.stdout.reconfigure(encoding='utf-8')
state = json.load(open(r'C:\Users\TOM\.openclaw\workspace\skills\tsinghua-learn\sessions\learn_session.json', encoding='utf-8'))
csrf = state['csrf']; js = state['learn_jsession']
h = {'Accept': 'application/json, */*', 'X-XSRF-TOKEN': csrf, 'Cookie': 'JSESSIONID=' + js + '; XSRF-TOKEN=' + csrf}
# 测试问卷 API(POST,Form Data 格式)
# pageListWks = 未做问卷,pageListWys = 已做问卷
import urllib.parse
# 未做问卷
params = {
'aoData[0][name]': 'iDisplayStart',
'aoData[0][value]': 0,
'aoData[1][name]': 'iDisplayLength',
'aoData[1][value]': 100,
}
body = urllib.parse.urlencode(params)
r1 = requests.post(
'https://learn.tsinghua.edu.cn/b/wlxt/kcwj/wlkc_wjb/student/pageListWks?_csrf=' + csrf,
headers={**h, 'Content-Type': 'application/x-www-form-urlencoded'},
data=body, timeout=15
)
print('=== 未做问卷 ===')
print('status:', r1.status_code)
d1 = r1.json()
obj1 = d1.get('object', d1)
if isinstance(obj1, dict):
items1 = obj1.get('aaData', [])
elif isinstance(obj1, list):
items1 = obj1
else:
items1 = []
print(f'未做问卷总数: {len(items1)}')
for x in items1[:3]:
print(f' {x.get("bt","?")[:40]} | wjid={x.get("wjid","?")[:20]}')
# 已做问卷
r2 = requests.post(
'https://learn.tsinghua.edu.cn/b/wlxt/kcwj/wlkc_wjb/student/pageListWys?_csrf=' + csrf,
headers={**h, 'Content-Type': 'application/x-www-form-urlencoded'},
data=body, timeout=15
)
print('\n=== 已做问卷 ===')
print('status:', r2.status_code)
d2 = r2.json()
obj2 = d2.get('object', d2)
if isinstance(obj2, dict):
items2 = obj2.get('aaData', [])
elif isinstance(obj2, list):
items2 = obj2
else:
items2 = []
print(f'已做问卷总数: {len(items2)}')
for x in items2[:3]:
print(f' {x.get("bt","?")[:40]} | wjid={x.get("wjid","?")[:20]}')
# 也测试下对每门课程分别查问卷
print('\n=== 各课程的问卷情况 ===')
courses = {
'2025-2026-2151368648': '大学物理A(1)',
'2025-2026-2151369314': '概率论',
'2025-2026-2151369343': '英语听说交流(A)',
'2025-2026-2151368819': '写作与沟通',
'2025-2026-2151368584': '微积分A(2)',
}
for wlkcid, name in courses.items():
r = requests.get(
f'https://learn.tsinghua.edu.cn/b/wlxt/kcwj/wlkc_wjb/student/pageListWks?wlkcid={wlkcid}&size=20&_csrf={csrf}',
headers=h, timeout=10
)
try:
d = r.json()
obj = d.get('object', d)
items = (obj.get('aaData', []) if isinstance(obj, dict) else (obj if isinstance(obj, list) else []))
print(f' {name} 未做: {len(items)} 项')
except:
print(f' {name} API 无响应或格式异常')
FILE:scripts/check_wj_api2.py
import json, requests, sys
sys.stdout.reconfigure(encoding='utf-8')
state = json.load(open(r'C:\Users\TOM\.openclaw\workspace\skills\tsinghua-learn\sessions\learn_session.json', encoding='utf-8'))
csrf = state['csrf']; js = state['learn_jsession']
h = {'Accept': 'application/json, */*', 'X-XSRF-TOKEN': csrf, 'Cookie': 'JSESSIONID=' + js + '; XSRF-TOKEN=' + csrf}
# DataTables 格式的 aoData 参数
body = 'aoData=%5B%7B%22name%22%3A%22iDisplayStart%22%2C%22value%22%3A0%7D%2C%7B%22name%22%3A%22iDisplayLength%22%2C%22value%22%3A100%7D%5D'
# 未做问卷
r1 = requests.post(
'https://learn.tsinghua.edu.cn/b/wlxt/kcwj/wlkc_wjb/student/pageListWks?_csrf=' + csrf,
headers={**h, 'Content-Type': 'application/x-www-form-urlencoded'},
data=body, timeout=15
)
print('=== 未做问卷 ===')
print('status:', r1.status_code)
print('body[:200]:', r1.text[:200])
# 已做问卷
r2 = requests.post(
'https://learn.tsinghua.edu.cn/b/wlxt/kcwj/wlkc_wjb/student/pageListWys?_csrf=' + csrf,
headers={**h, 'Content-Type': 'application/x-www-form-urlencoded'},
data=body, timeout=15
)
print('\n=== 已做问卷 ===')
print('status:', r2.status_code)
print('body[:200]:', r2.text[:200])
FILE:scripts/check_wj_global.py
import json, requests, sys
sys.stdout.reconfigure(encoding='utf-8')
state = json.load(open(r'C:\Users\TOM\.openclaw\workspace\skills\tsinghua-learn\sessions\learn_session.json', encoding='utf-8'))
csrf = state['csrf']; js = state['learn_jsession']
h = {'Accept': 'application/json, */*', 'X-XSRF-TOKEN': csrf, 'Cookie': 'JSESSIONID=' + js + '; XSRF-TOKEN=' + csrf}
# 未做问卷(全局,不是按课程)
body = 'aoData=%5B%7B%22name%22%3A%22iDisplayStart%22%2C%22value%22%3A0%7D%2C%7B%22name%22%3A%22iDisplayLength%22%2C%22value%22%3A100%7D%5D'
r = requests.post(
'https://learn.tsinghua.edu.cn/b/wlxt/kcwj/wlkc_wjb/student/pageListWks?_csrf=' + csrf,
headers={**h, 'Content-Type': 'application/x-www-form-urlencoded'},
data=body, timeout=15
)
d = r.json()
obj = d.get('object', d)
items = obj.get('aaData', []) if isinstance(obj, dict) else []
print(f'未做问卷总数: {len(items)}')
for x in items:
wlkcid = x.get('wlkcid', '')
wjbt = x.get('bt', '?')
wjsj = x.get('jzsjStr', '')
print(f' [{wlkcid}] {wjbt[:40]} | 截止:{wjsj}')
print()
# 看看有没有 wlkcid 字段可以区分课程
if items:
print('字段列表:', list(items[0].keys()))
print('wlkcid 示例:', items[0].get('wlkcid'))
FILE:scripts/download_and_send_kj.py
"""
下载课件 + 自动标已读 + 发送给用户 + 精准删除
用法:python download_and_send_kj.py < COURSE_ID > < WJID > < 课程名 >
示例:python download_and_send_kj.py 2025-2026-2151368584 2005990081_KJ_xxx 微积分
"""
import json, requests, sys, os
sys.stdout.reconfigure(encoding='utf-8')
# 统一 session 路径
import os as _os
_SKILL_DIR = _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))
STATE_FILE = _os.path.join(_SKILL_DIR, "sessions", "learn_session.json")
state = json.load(open(STATE_FILE, encoding="utf-8"))
learn_j = state['learn_jsession']; csrf = state['csrf']
headers = {
'Accept': 'application/json, */*',
'Referer': 'https://learn.tsinghua.edu.cn/f/wlxt/index/course/student/',
'X-XSRF-TOKEN': csrf,
'Cookie': f'JSESSIONID={learn_j}; XSRF-TOKEN={csrf}',
}
headers_post = {
'Accept': 'application/json, */*',
'Referer': 'https://learn.tsinghua.edu.cn/f/wlxt/kj/wlkc_kjxxb/student/beforePageList',
'X-XSRF-TOKEN': csrf,
'Cookie': f'JSESSIONID={learn_j}; XSRF-TOKEN={csrf}',
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
}
def mark_kj_read(wjid):
r = requests.post(
f'https://learn.tsinghua.edu.cn/b/wlxt/kj/wlkc_kjfwb/student/savePlayRecord?_csrf={csrf}',
headers=headers_post, data=f'wjid={wjid}&sfgk=0', timeout=10
)
return r.status_code == 200 and 'success' in r.text
def download_and_send(wlkcid, wjid, wjlx, save_dir):
os.makedirs(save_dir, exist_ok=True)
fname = None
# 下载
dl_url = f'https://learn.tsinghua.edu.cn/b/wlxt/kj/wlkc_kjxxb/student/downloadFile?wlkcid={wlkcid}&wjid={wjid}&sfgk=0'
r = requests.get(dl_url, headers=headers, timeout=60)
# 取文件名(从bt字段,wjlx传进来)
# 构造输出路径
out_path = os.path.join(save_dir, f'temp_kj.{wjlx}')
if r.status_code == 200 and len(r.content) > 10000:
with open(out_path, 'wb') as fp:
fp.write(r.content)
print(f'下载完成: {len(r.content):,} bytes')
# 标记已读
if mark_kj_read(wjid):
print('已标记已读')
else:
print('标记失败(继续)')
# 打印 <qqmedia> 标签(供 AI 直接回复用户)
print(f'\n<qqmedia>{out_path}</qqmedia>')
# 精准删除
if os.path.exists(out_path):
os.remove(out_path)
print(f'已删除: {out_path}')
else:
print(f'下载失败: {r.status_code}')
if __name__ == '__main__':
# 命令行参数:wlkcid wjid wjlx save_dir
if len(sys.argv) >= 5:
wlkc_id = sys.argv[1]
wjid = sys.argv[2]
wjlx = sys.argv[3]
save_dir = sys.argv[4]
download_and_send(wlkc_id, wjid, wjlx, save_dir)
else:
print('用法: python download_and_send_kj.py <wlkcid> <wjid> <wjlx> <save_dir>')
FILE:scripts/install_playwright.py
#!/usr/bin/env python3
"""
install_playwright.py
下载并安装 Playwright Chromium 浏览器
======================================
【用途】
运行任何需要 Playwright 的脚本前,先跑一次这个
如果遇到 "Executable not found" 错误,运行这个即可修复
【用法】
python install_playwright.py
"""
import sys, subprocess
def run():
print("正在安装 Playwright Chromium(只会执行一次)...")
print("这可能需要几分钟,请耐心等待...\n")
result = subprocess.run(
[sys.executable, "-m", "playwright", "install", "chromium"],
capture_output=False
)
if result.returncode == 0:
print("\n✅ Chromium 安装成功!")
print(" 现在可以运行 login_supervised.py 等脚本了。")
else:
print("\n❌ 安装失败,请检查网络连接后重试")
print(" 或手动运行: python -m playwright install chromium")
if __name__ == "__main__":
run()
FILE:scripts/learn_api.py
#!/usr/bin/env python3
"""
learn_api.py — 清华网络学堂 HTTP API 封装
无需浏览器,直接操作网络学堂 API
"""
import sys
sys.stdout.reconfigure(encoding='utf-8')
import os, json, time, ssl, base64
import requests
from urllib.parse import urlencode
# ========== 配置 ==========
SESSION_FILE = r"D:\testclaw\learn_session.json"
FINGERPRINT_FILE = r"D:\testclaw\learn_fingerprint.json"
DOWNLOAD_DIR = r"D:\testclaw\learn_downloads"
LEARN_BASE = "https://learn.tsinghua.edu.cn"
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Referer": LEARN_BASE + "/",
}
AJAX_HEADERS = {
**DEFAULT_HEADERS,
"Accept": "application/json, text/javascript, */*; q=0.01",
"X-Requested-With": "XMLHttpRequest",
}
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
def escape_filename(s):
"""转义文件名非法字符"""
for ch in [' ', '\t', '?', '/', "'", '"', '<', '>', '#', ';', '*', '|', '\\']:
s = s.replace(ch, '_')
return s
class LearnAPI:
def __init__(self, session_file=None):
self.session_file = session_file or SESSION_FILE
self.session = None
self.valid = False
self.fingerprint = None
self.session_data = None
self.cookies = {}
self.xsrf_token = None
self._ensure_download_dir()
# ====== 内部方法 ======
def _ensure_download_dir(self):
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
def _update_headers(self):
"""更新 session headers"""
if self.xsrf_token:
self.session.headers.update({"X-XSRF-TOKEN": self.xsrf_token})
def _post(self, path, data=None, use_ajax=True):
"""POST 请求"""
url = LEARN_BASE + path
headers = AJAX_HEADERS if use_ajax else DEFAULT_HEADERS
kwargs = {"headers": headers}
if data:
if isinstance(data, dict):
kwargs["data"] = urlencode(data, encoding='utf-8')
kwargs["headers"]["Content-Type"] = "application/x-www-form-urlencoded"
else:
kwargs["data"] = data
r = self.session.post(url, **kwargs, verify=False, timeout=15)
try:
return r.json()
except Exception:
return r.text
def _get(self, path, params=None, use_ajax=False):
"""GET 请求"""
url = LEARN_BASE + path
headers = AJAX_HEADERS if use_ajax else DEFAULT_HEADERS
kwargs = {"headers": headers, "params": params}
r = self.session.get(url, **kwargs, verify=False, timeout=15)
return r
def _build_url(self, path):
return LEARN_BASE + path
# ====== Session 管理 ======
def reload_session(self):
"""从文件加载 session 并验证"""
if not os.path.exists(self.session_file):
self.valid = False
return False
with open(self.session_file, 'r', encoding='utf-8') as f:
self.session_data = json.load(f)
self.cookies = self.session_data.get('cookies', {})
self.fingerprint = self.session_data.get('fingerprint', {})
self.session = requests.Session()
self.session.headers.update(DEFAULT_HEADERS)
# 设置 cookies
for name, value in self.cookies.items():
if value is None:
continue
for domain in ['.tsinghua.edu.cn', 'learn.tsinghua.edu.cn', 'id.tsinghua.edu.cn']:
self.session.cookies.set(name, value, domain=domain, path='/', secure=False)
# 设置 XSRF token
self.xsrf_token = self.cookies.get('XSRF-TOKEN') or self.cookies.get('xsrf-token')
if self.xsrf_token:
self.session.headers.update({"X-XSRF-TOKEN": self.xsrf_token})
return self._check_valid()
def _check_valid(self):
"""验证 session 是否有效"""
if not self.session:
self.valid = False
return False
try:
ts = str(int(time.time() * 1000))
url = f"{LEARN_BASE}/b/wlxt/kc/v_wlkc_xs_xkb_kcb_extend/student/loadCourseBySemesterId/2025-2026-2/zh_CN?timestamp={ts}"
r = self.session.get(url, verify=False, timeout=8,
headers={**AJAX_HEADERS, "X-XSRF-TOKEN": self.xsrf_token} if self.xsrf_token else AJAX_HEADERS)
result = r.json()
courses = result.get('resultList', [])
if courses:
self.valid = True
return True
except Exception:
pass
self.valid = False
return False
def login(self):
"""
尝试纯 API 登录。
成功返回 True;失败返回 False(需使用浏览器版脚本)。
"""
# 尝试从已有 fingerprint + session 重新认证
# 当前纯 API 登录受限于 localStorage,暂时返回 False
return False
def save_session(self):
"""保存当前 session 到文件"""
if not self.session or not self.cookies:
return
# 从 credentials.json 读取用户名(不在此处写死)
try:
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), 'scripts'))
from _config import load_credentials
username = load_credentials()[0]
except Exception:
username = 'unknown'
self.session_data = {
'username': username,
'cookies': self.cookies,
'fingerprint': self.fingerprint,
'timestamp': time.time(),
}
with open(self.session_file, 'w', encoding='utf-8') as f:
json.dump(self.session_data, f, ensure_ascii=False, indent=2)
# ====== 课程 API ======
def get_current_semester(self):
"""获取当前学期和下学期信息"""
data = self._post("/b/kc/zhjw_v_code_xnxq/getCurrentAndNextSemester", use_ajax=True)
return data.get('result', {}).get('xnxq', [])
def get_semesters(self):
"""获取所有学期列表"""
data = self._post("/b/wlxt/kc/v_wlkc_xs_xktjb_coassb/queryxnxq", use_ajax=True)
return [x for x in data if x is not None]
def get_courses(self, semester=None):
"""获取课程列表
Args:
semester: str 学期ID,默认当前学期。格式 "2025-2026-2"
Returns:
List[dict] 课程列表,每项含 kcm(已转义课名), wlkcid, jsm(教师), kch, kxh, xf, xs, jslx
"""
if semester is None:
semesters = self.get_current_semester()
semester = semesters[0] if semesters else '2025-2026-2'
courses = []
# 学生选课
try:
data = self._get(
f"/b/wlxt/kc/v_wlkc_xs_xkb_kcb_extend/student/loadCourseBySemesterId/{semester}/zh_CN",
use_ajax=True
).json()
for c in data.get('resultList', []):
c['jslx'] = '3'
courses.append(c)
except Exception:
pass
# 助教课程
try:
data2 = self._post(f"/b/kc/v_wlkc_kcb/queryAsorCoCourseList/{semester}/0", use_ajax=True)
for c in data2.get('resultList', []):
c['jslx'] = '0'
courses.append(c)
except Exception:
pass
# 转义课名
for c in courses:
c['kcm_escaped'] = escape_filename(c.get('kcm', ''))
return courses
def get_course_type(self, jslx):
"""根据 jslx 返回课程类型字符串"""
return {'3': 'student', '0': 'teacher'}.get(str(jslx), 'student')
# ====== 公告 API ======
def get_announcements(self, wlkcid):
"""获取课程公告列表"""
data = self._post(
"/b/wlxt/kcgg/wlkc_ggb/student/pageListXs",
{"aoData": [{"name": "wlkcid", "value": wlkcid}]},
use_ajax=True
)
return data.get('object', {}).get('aaData', [])
# ====== 课件 API ======
def get_files(self, wlkcid):
"""获取课件文件列表"""
data = self._get(
f"/b/wlxt/kj/wlkc_kjxxb/student/kjxxbByWlkcidAndSizeForStudent",
params={"wlkcid": wlkcid, "size": 0},
use_ajax=True
)
return data.json().get('object', [])
def get_file_categories(self, wlkcid, type_='student'):
"""获取课件分类列表"""
data = self._get(
f"/b/wlxt/kj/wlkc_kjflb/{type_}/pageList",
params={"wlkcid": wlkcid},
use_ajax=True
)
return json.loads(data.text).get('object', {}).get('rows', [])
def download_file(self, wlkcid, wjid, type_='student', filename=None, save_dir=None):
"""
下载课件文件。
Returns:
str 保存的完整路径,失败返回 None
"""
save_dir = save_dir or DOWNLOAD_DIR
os.makedirs(save_dir, exist_ok=True)
url = f"{LEARN_BASE}/b/wlxt/kj/wlkc_kjxxb/{type_}/downloadFile"
params = {"sfgk": 0, "wjid": wjid}
self._update_headers()
headers = {**DEFAULT_HEADERS, "X-XSRF-TOKEN": self.xsrf_token} if self.xsrf_token else DEFAULT_HEADERS
r = self.session.get(url, params=params, headers=headers, verify=False, stream=True, timeout=30)
if r.status_code != 200:
return None
# 从 Content-Disposition 提取文件名
cd = r.headers.get('Content-Disposition', '')
if filename is None:
import re
m = re.search(r'filename[^;]*=([^;]+)', cd)
if m:
fname = m.group(1).strip().strip('"').strip("'")
# decode URI encoding
import urllib.parse
filename = urllib.parse.unquote(fname)
else:
filename = f"file_{wjid}"
filename = escape_filename(filename)
filepath = os.path.join(save_dir, filename)
try:
with open(filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return filepath
except Exception:
return None
# ====== 作业 API ======
def get_homeworks(self, wlkcid):
"""获取作业列表(含未提交/已提交/已批改)"""
hws = []
data = {"aoData": [{"name": "wlkcid", "value": wlkcid}]}
for endpoint in ['zyListWj', 'zyListYjwg', 'zyListYpg']:
try:
d = self._post(f"/b/wlxt/kczy/zy/student/{endpoint}", data, use_ajax=True)
hws.extend(d.get('object', {}).get('aaData', []))
except Exception:
continue
return hws
def get_homework_detail(self, wlkcid, zyid, xszyid='', type_='student'):
"""获取作业详情(含说明、附件、截止日期)"""
url = f"/f/wlxt/kczy/zy/{type_}/viewZy"
params = {
"wlkcid": wlkcid,
"sfgq": "0",
"zyid": zyid,
"xszyid": xszyid,
}
r = self._get(url, params=params, use_ajax=False)
html = r.text if isinstance(r, requests.Response) else r
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
info = {}
for item in soup.find_all('div', class_='list'):
left = item.find('div', class_='left')
right = item.find('div', class_='right')
if not left or not right:
continue
key = left.get_text(strip=True)
val = right.get_text(strip=True)
if '标题' in key:
info['title'] = val
elif '说明' in key:
info['description'] = val
elif '截止' in key:
info['deadline'] = val
elif '补交' in key:
info['makeup_deadline'] = val
# 附件
attachments = []
for fj in soup.find_all('div', class_='fujian'):
left = fj.find('div', class_='left')
links = fj.find_all('a')
if left and links:
key = left.get_text(strip=True)
for link in links:
href = link.get('href', '')
name = link.get_text(strip=True)
if href and name:
if '作业' in key:
attachments.append({'name': name, 'href': href})
info['attachments'] = attachments
return info
# ====== 讨论 API ======
def get_discussions(self, wlkcid, type_='student'):
"""获取讨论帖列表"""
data = self._get(
f"/b/wlxt/bbs/bbs_tltb/{type_}/kctlList",
params={"wlkcid": wlkcid},
use_ajax=True
)
try:
return json.loads(data.text).get('object', {}).get('resultsList', [])
except Exception:
return []
def get_discussion_detail(self, wlkcid, id_, bqid, type_='student'):
"""获取讨论帖详情"""
url = f"/f/wlxt/bbs/bbs_tltb/{type_}/viewTlById"
params = {"wlkcid": wlkcid, "id": id_, "tabbh": "2", "bqid": bqid}
r = self._get(url, params=params, use_ajax=False)
from bs4 import BeautifulSoup
soup = BeautifulSoup(r.text, 'html.parser')
detail = soup.find('div', class_='detail')
return detail.get_text(strip=True) if detail else ""
# ====== 命令行接口 ======
if __name__ == '__main__':
import argparse, pprint
parser = argparse.ArgumentParser(description='清华网络学堂 API')
parser.add_argument('--session', default=SESSION_FILE)
parser.add_argument('--semester', default=None)
parser.add_argument('--course', default=None, help='课名(部分匹配)')
parser.add_argument('--action', default='courses',
choices=['courses', 'announcements', 'files', 'homeworks', 'discussions', 'semesters'])
args = parser.parse_args()
api = LearnAPI(session_file=args.session)
api.reload_session()
if not api.valid:
print("❌ Session 无效,请先运行:")
print(' python "D:\\testclaw\\learn_login_v2.py"')
exit(1)
courses = api.get_courses(semester=args.semester)
if args.course:
courses = [c for c in courses if args.course in c.get('kcm', '')]
if args.action == 'semesters':
semesters = api.get_semesters()
print("所有学期:", semesters)
elif args.action == 'courses':
for c in courses:
print(f"[{c.get('jslx')}] {c.get('kcm')} | {c.get('jsm')} | wlkcid={c.get('wlkcid')}")
elif args.action == 'announcements':
for c in courses:
ads = api.get_announcements(c['wlkcid'])
if ads:
print(f"\n=== {c['kcm']} 公告 ===")
for a in ads:
print(f" [{a.get('fbsjStr','')}] {a.get('bt','')}")
elif args.action == 'files':
for c in courses:
files = api.get_files(c['wlkcid'])
if files:
print(f"\n=== {c['kcm']} 课件 ===")
for f in files:
print(f" {f.get('bt','?')}.{f.get('wjlx','?')}")
elif args.action == 'homeworks':
for c in courses:
hws = api.get_homeworks(c['wlkcid'])
if hws:
print(f"\n=== {c['kcm']} 作业 ===")
for h in hws:
print(f" [{h.get('zt','?')}] {h.get('bt','?')} 截止:{h.get('scsjStr','?')}")
elif args.action == 'discussions':
for c in courses:
disc = api.get_discussions(c['wlkcid'])
if disc:
print(f"\n=== {c['kcm']} 讨论 ===")
for d in disc:
print(f" {d.get('bt','?')} by {d.get('fbrxm','?')}")
FILE:scripts/login_auto.py
#!/usr/bin/env python3
"""
login_auto.py
清华网络学堂 - 无人值守自动登录脚本(双脚本方案·第二套)
==========================================================
【什么时候用】
日常调用。Session 失效时自动重新登录,无需人工介入。
【工作流程】
1. 检查 sessions/learn_session.json 里的 Session 是否有效
2. 有效 → 直接返回(不登录)
3. 无效 → 复用固定 Profile 的 cookies 自动走 CAS 登录(无 2FA)
4. 登录成功后保存 Session
【核心原则】
固定 profile 路径:profiles/learn_profile/
账号密码从 credentials.json 统一读取(禁止硬编码)
"""
import sys, os, json, time, re
sys.stdout.reconfigure(encoding='utf-8')
from playwright.sync_api import sync_playwright
import requests
# ====== 账号密码(从 credentials.json 统一加载)=======
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _config import load_credentials, get_state_file, get_fp_file, get_profile_dir
STATE_FILE = get_state_file()
FINGERPRINT_FILE = get_fp_file()
PROFILE_DIR = get_profile_dir()
try:
USER, PASS = load_credentials()
except (FileNotFoundError, RuntimeError) as e:
print(e)
raise SystemExit(1)
CAS_URL = "https://id.tsinghua.edu.cn/do/off/ui/auth/login/form/bb5df85216504820be7bba2b0ae1535b/0"
def check_session_valid(state):
"""用轻量 API 检查 Session 是否有效"""
if not state.get("learn_jsession") or not state.get("csrf"):
return False
h = {
"Accept": "application/json, */*",
"Referer": "https://learn.tsinghua.edu.cn/f/wlxt/index/course/student/",
"X-XSRF-TOKEN": state["csrf"],
"Cookie": f"JSESSIONID={state['learn_jsession']}; XSRF-TOKEN={state['csrf']}",
}
try:
r = requests.get(
"https://learn.tsinghua.edu.cn/b/wlxt/kczy/zy/student/index/zyListWj?wlkcid=&size=1",
headers=h, timeout=10
)
return not ("location.href" in r.text and r.status_code == 200)
except:
return False
def save_state(ctx, page_url, csrf):
"""提取并保存 Session"""
learn_jsession = None
for c in ctx.cookies():
if "learn.tsinghua" in c["domain"] and c["name"] == "JSESSIONID":
learn_jsession = c["value"]
state = {
"learn_jsession": learn_jsession,
"learn_token": None,
"csrf": csrf,
"timestamp": time.time(),
"url": page_url,
}
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
print(f"✅ Session 已保存")
return state
def auto_login():
"""用固定 profile 的 cookies 自动登录(无 2FA)"""
fp = json.load(open(FINGERPRINT_FILE, encoding="utf-8"))
pw = sync_playwright().start()
ctx = None
try:
ctx = pw.chromium.launch_persistent_context(
PROFILE_DIR,
headless=True,
viewport={"width": 1280, "height": 900},
ignore_https_errors=True,
args=["--no-sandbox", "--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled"],
)
page = ctx.pages[0] if ctx.pages else ctx.new_page()
page.goto(CAS_URL, wait_until="domcontentloaded", timeout=30000)
time.sleep(2)
page.evaluate(
"localStorage.setItem('fingerPrint', '" + fp["fingerPrint"] + "');"
"localStorage.setItem('fingerGenPrint', '" + fp.get("fingerGenPrint","") + "');"
"localStorage.setItem('fingerGenPrint3', '" + fp.get("fingerGenPrint3","") + "');"
)
time.sleep(1)
page.fill("#i_user", USER)
page.fill("#i_pass", PASS)
page.evaluate("doLogin()")
page.wait_for_url("**://learn.tsinghua.edu.cn/**", timeout=90000)
time.sleep(2)
csrf = None
if "_csrf" in page.url:
for p2 in page.url.split("?")[1].split("&"):
if p2.startswith("_csrf="): csrf = p2.split("=")[1]
if not csrf:
m = re.search(r'_csrf=([a-f0-9\-]{32,})', page.content())
if m: csrf = m.group(1)
return save_state(ctx, page.url, csrf)
finally:
if ctx: ctx.close()
pw.stop()
# ========== 主流程 ==========
if os.path.exists(STATE_FILE):
state = json.load(open(STATE_FILE, encoding="utf-8"))
age_h = (time.time() - state.get("timestamp", 0)) / 3600
print(f"Session 存在,age={age_h:.1f}h")
if check_session_valid(state):
print("✅ Session 有效,无需重新登录")
else:
print("⚠️ Session 失效,自动重新登录...")
state = auto_login()
print(f"JSESSIONID: {state.get('learn_jsession','?')[:10]}...")
else:
print("⚠️ Session 文件不存在,运行 login_supervised.py 建立")
print(" python scripts/login_supervised.py")
sys.exit(1)
FILE:scripts/login_supervised.py
#!/usr/bin/env python3
"""
login_supervised.py
清华网络学堂 - 有人值守登录脚本(双脚本方案·第一套)
======================================================
【什么时候用】
首次配置 / Profile 丢失 / Session 彻底失效时。
需要人工完成二次验证(2FA),只需跑这一次。
【工作流程】
1. 从 credentials.json 读取账号密码
2. 使用固定 profile 目录(cookies 持久化,避免每次触发 2FA)
3. 弹出浏览器窗口,可视化完成登录 + 2FA
4. 登录成功后保存 Session 到 sessions/learn_session.json
5. 下次直接用 login_auto.py 无人值守自动续期
【重要原则】
固定 profile 路径:profiles/learn_profile/(永不重建)
不管脚本跑多少次,都用同一个 profile → cookies 复用 → 不触发 2FA
"""
import sys, os, json, time, re
sys.stdout.reconfigure(encoding='utf-8')
from playwright.sync_api import sync_playwright
# ====== 账号密码(从 credentials.json 统一加载)=======
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _config import load_credentials, get_state_file, get_fp_file, get_profile_dir
STATE_FILE = get_state_file()
FINGERPRINT_FILE = get_fp_file()
PROFILE_DIR = get_profile_dir()
try:
USER, PASS = load_credentials()
except (FileNotFoundError, RuntimeError) as e:
print(e)
raise SystemExit(1)
CAS_URL = "https://id.tsinghua.edu.cn/do/off/ui/auth/login/form/bb5df85216504820be7bba2b0ae1535b/0"
# ====== 正文 ======
fp = json.load(open(FINGERPRINT_FILE, encoding="utf-8"))
os.makedirs(PROFILE_DIR, exist_ok=True)
def save_state(ctx, page_url, csrf):
"""提取并保存完整 Session"""
learn_jsession = None
learn_token = None
for c in ctx.cookies():
if "learn.tsinghua" in c["domain"]:
if c["name"] == "JSESSIONID": learn_jsession = c["value"]
elif c["name"] == "XSRF-TOKEN": learn_token = c["value"]
state = {
"learn_jsession": learn_jsession,
"learn_token": learn_token,
"csrf": csrf,
"timestamp": time.time(),
"url": page_url,
}
with open(STATE_FILE, "w", encoding="utf-8") as f:
json.dump(state, f, ensure_ascii=False, indent=2)
print(f"✅ Session 已保存: {STATE_FILE}")
return state
pw = sync_playwright().start()
ctx = None
try:
ctx = pw.chromium.launch_persistent_context(
PROFILE_DIR,
headless=False, # 可视化,可做 2FA
viewport={"width": 1280, "height": 900},
ignore_https_errors=True,
args=["--no-sandbox", "--disable-dev-shm-usage",
"--disable-blink-features=AutomationControlled"],
)
page = ctx.pages[0] if ctx.pages else ctx.new_page()
print(f"打开登录页: {CAS_URL}")
page.goto(CAS_URL, wait_until="domcontentloaded", timeout=30000)
time.sleep(2)
page.evaluate(
"localStorage.setItem('fingerPrint', '" + fp["fingerPrint"] + "');"
"localStorage.setItem('fingerGenPrint', '" + fp.get("fingerGenPrint","") + "');"
"localStorage.setItem('fingerGenPrint3', '" + fp.get("fingerGenPrint3","") + "');"
)
time.sleep(1)
page.fill("#i_user", USER)
page.fill("#i_pass", PASS)
print(f"凭据已填入: {USER}")
print("触发 doLogin(),请在浏览器中完成二次验证(如有)...")
page.evaluate("doLogin()")
time.sleep(3)
title = page.title()
body = page.inner_text("body")[:300]
if "二次认证" in title or "二次验证" in body:
print()
print("=" * 40)
print("⚠️ 检测到二次验证!")
print(" 请在浏览器窗口中完成验证(企业微信/短信)")
print(" 完成后脚本自动继续...")
print("=" * 40)
page.wait_for_url("**://learn.tsinghua.edu.cn/**", timeout=300000)
print("✅ 验证完成")
else:
try:
page.wait_for_url("**://learn.tsinghua.edu.cn/**", timeout=90000)
print("✅ 登录成功")
except Exception:
print(f"⚠️ 未检测到跳转,当前URL: {page.url}")
time.sleep(2)
csrf = None
if "_csrf" in page.url:
for p2 in page.url.split("?")[1].split("&"):
if p2.startswith("_csrf="): csrf = p2.split("=")[1]
if not csrf:
m = re.search(r'_csrf=([a-f0-9\-]{32,})', page.content())
if m: csrf = m.group(1)
state = save_state(ctx, page.url, csrf)
print(f"\nJSESSIONID: {state.get('learn_jsession','?')[:10]}...")
print("\n🎉 完成!以后运行 login_auto.py 即可无人值守登录。")
finally:
if ctx: ctx.close()
pw.stop()
FILE:scripts/mark_kj_read.py
#!/usr/bin/env python3
"""
mark_kj_read.py
批量标记所有课程未读课件为已读
使用 savePlayRecord 接口(isNew=1 → 0)
用法:python mark_kj_read.py
"""
import json, requests, sys, os
sys.stdout.reconfigure(encoding='utf-8')
# 统一路径
_SKILL_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
STATE_FILE = os.path.join(_SKILL_DIR, "sessions", "learn_session.json")
state = json.load(open(STATE_FILE, encoding="utf-8"))
learn_j = state["learn_jsession"]
csrf = state["csrf"]
headers = {
"Accept": "application/json, */*",
"Referer": "https://learn.tsinghua.edu.cn/f/wlxt/kj/wlkc_kjxxb/student/beforePageList",
"X-XSRF-TOKEN": csrf,
"Cookie": f"JSESSIONID={learn_j}; XSRF-TOKEN={csrf}",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
BASE_URL = "https://learn.tsinghua.edu.cn"
def mark_read(wjid):
r = requests.post(
f"{BASE_URL}/b/wlxt/kj/wlkc_kjfwb/student/savePlayRecord?_csrf={csrf}",
headers=headers, data=f"wjid={wjid}&sfgk=0", timeout=10
)
return r.status_code == 200 and "success" in r.text
# 获取全部课程
r = requests.get(
f"{BASE_URL}/b/wlxt/kc/v_wlkc_xs_xkb_kcb_extend/student/loadCourseBySemesterId/2025-2026-2/zh?_csrf={csrf}",
headers={k: v for k, v in headers.items() if k != "Content-Type"},
timeout=15
)
courses = r.json().get("resultList", [])
print(f"共 {len(courses)} 门课程")
total_unread = 0
total_marked = 0
for course in courses:
wlkcid = course.get("wlkcid", "")
kcm = course.get("kcm", "?")
if not wlkcid:
continue
r2 = requests.get(
f"{BASE_URL}/b/wlxt/kj/wlkc_kjxxb/student/kjxxbByWlkcidAndSizeForStudent?wlkcid={wlkcid}&size=100&_csrf={csrf}",
headers={k: v for k, v in headers.items() if k != "Content-Type"},
timeout=15
)
d = r2.json()
obj = d.get("object", d)
items = obj.get("aaData", []) if isinstance(obj, dict) else obj
unread = [x for x in items if x.get("isNew") == 1]
if not unread:
continue
total_unread += len(unread)
marked = 0
for item in unread:
wjid = item.get("wjid", "")
bt = item.get("bt", "?")
if not wjid:
continue
if mark_read(wjid):
marked += 1
else:
print(f" ⚠️ 失败: {bt}")
print(f"【{kcm}】{len(unread)} 项未读 → 标记 {marked} 项")
total_marked += marked
print(f"\n总计:{total_unread} 项未读 → 标记已读 {total_marked} 项")
print("完成!")
FILE:scripts/todos_api.py
#!/usr/bin/env python3
"""
todos_api.py
清华网络学堂代办总览 — 纯API版本(默认版本)
==============================================
【流程】
1. 检查 Session 有效性
2. 无效 → 自动调 login_auto.py 续期
3. 并行发出 5 个课程 × 5 个模块 = 25 个请求
4. 汇总输出
【性能】Session 有效时约 2-3 秒(纯 HTTP,无 Playwright)
【默认运行】python todos.py 时优先调用本文件
"""
import json, requests, sys, time, os, concurrent.futures
sys.stdout.reconfigure(encoding='utf-8')
# ====== 路径 + 账号配置(从 credentials.json 统一加载)=======
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _config import load_credentials, get_state_file
STATE_FILE = get_state_file()
# ======================
# TOM 的 5 门课程
COURSE_NAME = {
"2025-2026-2151368648": "大学物理A(1)",
"2025-2026-2151369314": "概率论与数理统计",
"2025-2026-2151369343": "英语听说交流(A)",
"2025-2026-2151368819": "写作与沟通",
"2025-2026-2151368584": "微积分A(2)",
}
BASE = "https://learn.tsinghua.edu.cn"
HEADERS = {
"Accept": "application/json, */*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Referer": BASE + "/f/wlxt/index/course/student/",
}
def check_session(state):
if not state.get("learn_jsession") or not state.get("csrf"):
return False
h = {**HEADERS, "X-XSRF-TOKEN": state["csrf"],
"Cookie": f"JSESSIONID={state['learn_jsession']}; XSRF-TOKEN={state['csrf']}"}
try:
r = requests.get(BASE + "/b/wlxt/kczy/zy/student/index/zyListWj?wlkcid=&size=1",
headers=h, timeout=10)
return not ("location.href" in r.text and r.status_code == 200)
except:
return False
def auto_relogin():
script_dir = os.path.dirname(os.path.abspath(__file__))
auto_script = os.path.join(script_dir, "login_auto.py")
print("Session 失效,自动续期中...")
import subprocess
result = subprocess.run([sys.executable, auto_script],
capture_output=True, text=True)
if result.returncode != 0:
print("auto login 失败:", result.stderr)
sys.exit(1)
return json.load(open(STATE_FILE, encoding="utf-8"))
def api_get(path, csrf, jsession):
url = (BASE + path) if "?" in path else (BASE + path + "?_csrf=" + csrf)
if "?" in path:
url = BASE + path + "&_csrf=" + csrf
else:
url = BASE + path + "?_csrf=" + csrf
h = {**HEADERS, "X-XSRF-TOKEN": csrf,
"Cookie": f"JSESSIONID={jsession}; XSRF-TOKEN={csrf}"}
try:
r = requests.get(url, headers=h, timeout=15)
return r.json()
except Exception as e:
return {}
def fetch_course(wlkcid, csrf, jsession):
"""并行抓一门课的全部代办,返回 dict"""
d_gg = api_get(f"/b/wlxt/kcgg/wlkc_ggb/student/kcggListXs?wlkcid={wlkcid}&size=20", csrf, jsession)
d_kj = api_get(f"/b/wlxt/kj/wlkc_kjxxb/student/kjxxbByWlkcidAndSizeForStudent?wlkcid={wlkcid}&size=100", csrf, jsession)
d_zy = api_get(f"/b/wlxt/kczy/zy/student/index/zyListWj?wlkcid={wlkcid}&size=100", csrf, jsession)
d_tl = api_get(f"/b/wlxt/bbs/bbs_tltb/student/kctlList?wlkcid={wlkcid}&size=20", csrf, jsession)
d_dy = api_get(f"/b/wlxt/bbs/bbs_tltb/student/kcdyList?wlkcid={wlkcid}&size=20", csrf, jsession)
def count_unread(data, field, value):
if isinstance(data, dict):
obj = data.get("object", data)
if isinstance(obj, dict):
items = obj.get("aaData", [])
elif isinstance(obj, list):
items = obj
else:
items = []
elif isinstance(data, list):
items = data
else:
return 0
return sum(1 for x in items if str(x.get(field, "")).strip('"') == str(value))
def get_items(data):
if isinstance(data, dict):
obj = data.get("object", data)
if isinstance(obj, list):
return obj
elif isinstance(obj, dict):
return obj.get("aaData", [])
elif isinstance(data, list):
return data
return []
return {
"gg": count_unread(d_gg, "sfyd", "否"),
"kj": count_unread(d_kj, "isNew", "1"),
"zy": count_unread(d_zy, "zt", "未交"),
"tl": count_unread(d_tl, "htsl", ""),
"dy": count_unread(d_dy, "htsl", ""),
"zy_items": [x for x in get_items(d_zy) if str(x.get("zt", "")).strip('"') == "未交"],
}
# ====== 主流程 ======
state = json.load(open(STATE_FILE, encoding="utf-8"))
age_h = (time.time() - state.get("timestamp", 0)) / 3600
print(f"Session age={age_h:.1f}h")
if not check_session(state):
print("⚠️ Session 无效")
state = auto_relogin()
else:
print("✅ Session 有效")
csrf = state["csrf"]
jsession = state["learn_jsession"]
# 并行抓所有课程
print("\n并行获取 5 门课程代办数据...")
results = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as ex:
futures = {ex.submit(fetch_course, wlkcid, csrf, jsession): wlkcid
for wlkcid in COURSE_NAME.keys()}
for f in concurrent.futures.as_completed(futures):
wlkcid = futures[f]
results[wlkcid] = f.result()
# 汇总输出
print("\n=== 网络学堂代办总览(纯API)===\n")
total = 0
for wlkcid, todos in results.items():
cname = COURSE_NAME[wlkcid]
print(f"【{cname}】")
has = False
for cat, label in [
("zy", "作业未提交"),
("gg", "公告未浏览"),
("kj", "课件未浏览"),
("tl", "讨论我参与"),
("dy", "答疑已回答"),
]:
cnt = todos[cat]
if cnt > 0:
print(f" ⚠️ {label}: {cnt} 项")
total += cnt
has = True
if not has:
print(f" ✅ 无待处理")
# 作业详情
any_zy = any(todos["zy"] > 0 for todos in results.values())
if any_zy:
print("\n--- 作业详情 ---")
for wlkcid, todos in results.items():
for x in sorted(todos["zy_items"], key=lambda t: t.get("jzsjStr", "")):
cname = COURSE_NAME[wlkcid]
print(f" {cname} | {x.get('bt','?')} | 截止:{x.get('jzsjStr','?')}")
print(f"\n待办总计: {total} 项")
FILE:scripts/todos_dom.py
#!/usr/bin/env python3
"""
todos.py
清华网络学堂代办总览脚本
================================
【流程】
1. 检查 Session 是否有效
2. 无效 → 自动调 login_auto.py 续期
3. Playwright 打开主页读 DOM(权威未读数)
4. API 获取作业详情(截止时间)
5. 汇总输出
【性能】
Session 有效:纯 API → 1-2s
Session 需续期:Playwright re-login + API → 15-25s
"""
import json, requests, sys, time, shutil, tempfile, re, os
sys.stdout.reconfigure(encoding='utf-8')
from playwright.sync_api import sync_playwright
# ====== 路径 + 账号配置(从 credentials.json 统一加载)=======
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _config import get_state_file, get_profile_dir, get_fp_file
STATE_FILE = get_state_file()
PROFILE_DIR = get_profile_dir()
FINGERPRINT_FILE = get_fp_file()
COURSE_NAME = {
"2025-2026-2151368648": "大学物理A(1)",
"2025-2026-2151369314": "概率论与数理统计",
"2025-2026-2151369343": "英语听说交流(A)",
"2025-2026-2151368819": "写作与沟通",
"2025-2026-2151368584": "微积分A(2)",
}
# ======================
def check_session(state):
"""Session 有效性检查(毫秒级)"""
if not state.get("learn_jsession") or not state.get("csrf"):
return False
h = {
"Accept": "application/json, */*",
"Referer": "https://learn.tsinghua.edu.cn/f/wlxt/index/course/student/",
"X-XSRF-TOKEN": state["csrf"],
"Cookie": f"JSESSIONID={state['learn_jsession']}; XSRF-TOKEN={state['csrf']}",
}
try:
r = requests.get(
"https://learn.tsinghua.edu.cn/b/wlxt/kczy/zy/student/index/zyListWj?wlkcid=&size=1",
headers=h, timeout=10
)
return not ("location.href" in r.text and r.status_code == 200)
except:
return False
def auto_relogin():
"""调 login_auto.py 续期 Session"""
script_dir = os.path.dirname(os.path.abspath(__file__))
auto_script = os.path.join(script_dir, "login_auto.py")
print("Session 失效,自动续期中...")
import subprocess
result = subprocess.run([sys.executable, auto_script],
capture_output=True, text=True)
if result.returncode != 0:
print("auto login 失败:", result.stderr)
sys.exit(1)
print(result.stdout)
return json.load(open(STATE_FILE, encoding="utf-8"))
def api_get(path, csrf, learn_j):
url = f"https://learn.tsinghua.edu.cn{path}&_csrf={csrf}" if "?" in path \
else f"https://learn.tsinghua.edu.cn{path}?_csrf={csrf}"
return requests.get(url, headers={
"Accept": "application/json, */*",
"Referer": "https://learn.tsinghua.edu.cn/f/wlxt/index/course/student/",
"X-XSRF-TOKEN": csrf,
"Cookie": f"JSESSIONID={learn_j}; XSRF-TOKEN={csrf}",
}, timeout=15).json()
# ====== 主流程 ======
# 1. 读取 Session
state = json.load(open(STATE_FILE, encoding="utf-8"))
age_h = (time.time() - state.get("timestamp", 0)) / 3600
print(f"Session age={age_h:.1f}h")
# 2. 检查并续期
if not check_session(state):
print("⚠️ Session 无效")
state = auto_relogin()
else:
print("✅ Session 有效")
csrf = state["csrf"]
learn_j = state["learn_jsession"]
# 3. Playwright 读主页 DOM(权威未读数)
TMP = tempfile.mkdtemp(prefix="todos_")
PROFILE_TMP = os.path.join(TMP, "profile")
os.makedirs(PROFILE_TMP)
pw = sync_playwright().start()
ctx = None
try:
ctx = pw.chromium.launch_persistent_context(
PROFILE_TMP, headless=True,
viewport={"width": 1280, "height": 900},
ignore_https_errors=True,
args=["--no-sandbox", "--disable-dev-shm-usage"],
)
page = ctx.pages[0] if ctx.pages else ctx.new_page()
ctx.add_cookies([
{"name": "JSESSIONID", "value": learn_j, "domain": ".learn.tsinghua.edu.cn", "path": "/"},
{"name": "XSRF-TOKEN", "value": csrf, "domain": ".learn.tsinghua.edu.cn", "path": "/"},
])
page.goto(
"https://learn.tsinghua.edu.cn/f/wlxt/index/course/student/",
timeout=30000, wait_until="networkidle"
)
time.sleep(3)
body_text = page.inner_text("body")
finally:
if ctx: ctx.close()
pw.stop()
shutil.rmtree(TMP, ignore_errors=True)
# 4. 解析主页 DOM
COURSE_CODE = {
"大学物理A(1)": "10430934",
"概率论": "10880012",
"写作与沟通": "10691342",
"微积分": "2151368584",
}
def parse_courses(text):
results = {}
for cname, code in COURSE_CODE.items():
idx = text.find(cname)
if idx < 0: continue
end_idx = len(text)
for _, other_code in COURSE_CODE.items():
if other_code == code: continue
other_pos = text.find(other_code, idx + len(cname))
if other_pos > idx: end_idx = min(end_idx, other_pos)
chunk = text[idx:end_idx]
def get_count(pat):
m = re.search(pat, chunk)
return int(m.group(1)) if m else 0
results[cname] = {
"gg": get_count(r"公告\s*(\d+)"),
"kj": get_count(r"课件\s*(\d+)"),
"zy": get_count(r"作业\s*(\d+)"),
"tl": get_count(r"讨论\s*(\d+)\s*我参与"),
"dy": get_count(r"答疑\s*(\d+)"),
"wj": get_count(r"问卷\s*(\d+)"),
}
return results
course_todos = parse_courses(body_text)
# 英语听说交流(A) - 单独处理
eng_idx = body_text.find("英语听说交流")
if eng_idx >= 0:
eng_chunk = body_text[eng_idx:eng_idx + 800]
def eng_count(pat):
m = re.search(pat, eng_chunk)
return int(m.group(1)) if m else 0
course_todos["英语听说交流(A)"] = {
"gg": eng_count(r"公告\s*(\d+)"),
"kj": eng_count(r"课件\s*(\d+)"),
"zy": eng_count(r"作业\s*(\d+)"),
"tl": eng_count(r"讨论\s*(\d+)"),
"dy": eng_count(r"答疑\s*(\d+)"),
"wj": eng_count(r"问卷\s*(\d+)"),
}
else:
course_todos["英语听说交流(A)"] = {"gg": 0, "kj": 0, "zy": 0, "tl": 0, "dy": 0, "wj": 0}
# 5. API 获取作业详情
def fetch_all_homework():
results = []
for wlkcid in COURSE_NAME.keys():
d = api_get(f"/b/wlxt/kczy/zy/student/index/zyListWj?wlkcid={wlkcid}&size=100", csrf, learn_j)
items = d.get("object", {}).get("aaData", [])
for x in items:
if x.get("zt") == "未交":
results.append({
"wlkcid": wlkcid,
"bt": x.get("bt", ""),
"jzsjStr": x.get("jzsjStr", ""),
})
return results
# 6. 汇总输出
print("\n=== 网络学堂代办总览 ===\n")
total = 0
for cname, todos in course_todos.items():
print(f"【{cname}】")
has_todo = False
for cat, label in [
("zy", "作业未提交"), ("gg", "公告未浏览"), ("kj", "课件未浏览"),
("tl", "讨论我参与"), ("dy", "答疑已回答"), ("wj", "问卷未提交"),
]:
count = todos[cat]
if count > 0:
print(f" ⚠️ {label}: {count} 项")
total += count
has_todo = True
if not has_todo:
print(f" ✅ 无待处理")
# 作业详情
has_zy = any(todos["zy"] > 0 for todos in course_todos.values())
if has_zy:
print("\n--- 作业详情 ---")
hw_list = fetch_all_homework()
for x in sorted(hw_list, key=lambda x: x.get("jzsjStr", "")):
name2 = COURSE_NAME.get(x["wlkcid"], x["wlkcid"])
print(f" {name2} | {x['bt']} | 截止:{x['jzsjStr']}")
print(f"\n待办总计: {total} 项")
FILE:scripts/_config.py
#!/usr/bin/env python3
"""
_config.py — 网络学堂凭证加载器
所有脚本通过 import _config 获取账号密码
不要在任何脚本里硬编码账号密码
路径说明:
_config.py 位于 skills/tsinghua-learn/scripts/
credentials.json 位于 skills/tsinghua-learn/
相对路径:../../credentials.json
"""
import os, json
# skills/tsinghua-learn/scripts/ → 上两级是 skill 根目录
_SKILL_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_CRED_FILE = os.path.join(_SKILL_DIR, "credentials.json")
_STATE_FILE = os.path.join(_SKILL_DIR, "sessions", "learn_session.json")
_FP_FILE = r"D:\testclaw\learn_fingerprint.json"
_PROFILE_DIR = os.path.join(_SKILL_DIR, "profiles", "learn_profile")
def load_credentials():
"""加载账号密码,若文件不存在或格式错误则提示用户"""
if not os.path.exists(_CRED_FILE):
raise FileNotFoundError(
"credentials.json 未找到!\n"
"请复制 config_example.json 为 credentials.json,"
"并填入你的学号和密码。\n"
"路径: " + _CRED_FILE
)
try:
cred = json.load(open(_CRED_FILE, encoding="utf-8"))
u = cred.get("username", "").strip()
p = cred.get("password", "").strip()
if not u or not p:
raise ValueError("username 或 password 为空")
return u, p
except Exception as e:
raise RuntimeError(
f"credentials.json 读取失败: {e}\n"
"请检查文件格式是否正确(需包含 username 和 password 字段)"
)
def get_state_file():
return _STATE_FILE
def get_fp_file():
return _FP_FILE
def get_profile_dir():
return _PROFILE_DIR
def get_skill_dir():
return _SKILL_DIR
FILE:sessions/learn_session.json
{
"learn_jsession": "",
"learn_token": "",
"csrf": "",
"timestamp": null,
"url": ""
}Classifies tasks by complexity, plans execution, spawns and monitors subagents as needed, and reports progress and results continuously.
# The Executor Protocol - OpenClaw Task Workflow
**Six-Character Mantra: Plan first, then execute; report as you go.**
## ⚡ Core Prohibitions
- Proceeding without task classification → ❌
- Spawning subagents without a plan → ❌
- Abandoning tasks on timeout → ❌
- Failing to respond to TOM within 10s → ❌
- Guessing without reading local code → ❌
- Answering based on assumptions, not facts → ❌
---
## Task Classification
| Type | Definition | Strategy |
|------|------------|----------|
| Simple | Single step, no research needed | Execute directly |
| Medium | Multi-step, requires some research | Spawn 1-2 subagents |
| Complex | Multiple directions, heavy research | One subagent per direction, parallel |
---
## Detailed Workflow
### Phase 1: PLAN
1. Assess task complexity
2. List execution plan (steps and what each step does)
3. Report plan to TOM and wait for confirmation (unless TOM says otherwise)
### Phase 2: Execute + Report
- Simple: Execute directly, return result
- Medium/Complex: Spawn subagents for parallel execution
- Report progress as it happens, don't wait for completion
- Auto-retry on timeout, never give up
- Monitor actively, check status regularly
### Phase 3: Report Results
- Report content/results to TOM
- Explain next steps if any
---
## Subagent Standards (Universal)
### Parallel Strategy
- 5+ independent directions → one subagent each, parallel
- 2-4 directions → serial or small-scale parallel
- Simple tasks → no subagent
### Key Rules
1. **Auto-retry on timeout**: Never give up on timeout
2. **Report as you go**: Every 30-60s or on progress
3. **5 max parallel**: Wait if limit exceeded
4. **Active monitoring**: Check status regularly, don't just wait
### Plan→Spawn→Monitor→Report
1. **Plan**: List plan, report, wait for confirmation
2. **Spawn**: sessions_spawn, mode=run, runtime=subagent
3. **Monitor**: sessions_history/list to check status, sessions_send to correct direction
4. **Report**: Forward results immediately when received
---
## OpenClaw Code Review Standards
When analyzing OpenClaw's own capabilities:
1. Read docs/tools/exec.md and docs/tools/exec-approvals.md
2. Verify exec tool availability
3. Check Python library installation
4. Review openclaw.json config (disabled tools)
---
## Notes
-遇到问题 → Analyze cause first, try alternatives
- Sensitive operations → Must get authorization first
- Execution failure → Record reason, adjust strategy and retry
---
## Changelog
- 2026-04-15 v1.0: Initial release - The Executor Protocol
FILE:skill.json
{
"name": "openclaw-task-executor",
"version": "1.0.0",
"description": "The Executor Protocol - Universal task execution and reporting workflow for OpenClaw agents. Based on the Six-Character Mantra: Plan first, then execute; report as you go. Includes subagent parallel management, task classification, and real-time reporting standards.",
"author": "TOM",
"tags": [
"openclaw",
"task-execution",
"workflow",
"subagent",
"parallel-execution",
"task-management",
"agent-workflow",
"task-classification",
"real-time-reporting",
"openclaw-agent",
"executor-protocol",
"task-protocol"
]
}Local video comprehension skill. Use ffmpeg to extract audio and frames, FunASR for speech recognition, and qwen3-vl for image understanding.
---
name: local-video-understanding
description: Local video comprehension skill. Use ffmpeg to extract audio and frames, FunASR for speech recognition, and qwen3-vl for image understanding.
---
# ⚠️ If you are human, please read README.md first!
---
# Local Video Understanding
Use this skill when you need to understand the content of a video.
## Prerequisites
- FunASR conda environment (`asr-local`) must be activated for audio processing
- Ollama must be running with qwen3-vl:8b model available
- ffmpeg must be in PATH
## Workflow
### Step 1: Extract Audio
```bash
ffmpeg -i "video.mp4" -vn -acodec pcm_s16le -ar 16000 -ac 1 "audio.wav" -y
```
**Note:** If path contains Chinese characters, copy audio.wav to a path without Chinese characters before ASR.
### Step 2: Extract Key Frames
```bash
mkdir frames
ffmpeg -i "video.mp4" -vf "fps=1/10" -q:v 2 "frames/frame_%03d.jpg" -y
```
### Step 3: Speech Recognition (FunASR)
```bash
conda run -n asr-local python -c "
import os
os.environ['MODELSCOPE_CACHE'] = 'C:/Users/TOM/.cache/modelscope'
from funasr import AutoModel
model = AutoModel(
model='iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
model_revision='v2.0.4',
disable_update=True,
ncpu=4
)
result = model.generate(input='AUDIO_PATH')
print(result)
"
```
### Step 4: Image Understanding (qwen3-vl)
```bash
ollama run qwen3-vl:8b "Describe this image in detail: /path/to/frame.jpg"
```
### Step 5: Combine Results
- **Audio transcription** → FunASR (local, Chinese speech recognition)
- **Key frames** → qwen3-vl:8b via Ollama (local image understanding)
- **Summary/Analysis** → Cloud LLM API (if needed)
## Important Notes
- Image reading via `Read` tool does NOT provide image understanding - always use qwen3-vl
- For Chinese audio, FunASR is preferred over Whisper
- Check for existing subtitle files (.txt, .srt, .vtt) before running ASR
- Modelscope cache at `C:/Users/TOM/.cache/modelscope` for FunASR models
FILE:README.md
# Video Understanding (Local Video Comprehension)
A skill for understanding video content using local AI models, supporting both audio transcription and image understanding.
## Model Installation
### Image Understanding - qwen3-vl:8b
**Windows:**
1. Download Ollama from https://ollama.com/download
2. Install and verify: `ollama --version`
**macOS/Linux:**
```bash
brew install ollama # or download from ollama.com
```
**Pull the model:**
```bash
ollama pull qwen3-vl:8b
```
### Audio Recognition - FunASR
**Create conda environment:**
```bash
conda create -n asr-local python=3.11
conda activate asr-local
pip install funasr
```
**Note:** The FunASR model will be downloaded automatically on first use.
## Environment Requirements
- ffmpeg (in PATH)
- conda + Python 3.11
- Ollama (running)
- ~6GB VRAM for qwen3-vl:8b
## Troubleshooting
**Ollama not running:**
```bash
ollama serve
```
**FunASR model not found:**
- Models are auto-downloaded on first use
- Ensure internet connection for initial download
**Chinese audio not recognized:**
- FunASR Paraformer model is optimized for Chinese
- For English, consider Whisper via Ollama instead
Local video comprehension skill. Use ffmpeg to extract audio and frames, FunASR for speech recognition, and qwen3-vl for image understanding.
---
name: video-understanding
description: Local video comprehension skill. Use ffmpeg to extract audio and frames, FunASR for speech recognition, and qwen3-vl for image understanding.
---
# Video Understanding
Use this skill when you need to understand the content of a video.
## Prerequisites
- FunASR conda environment (`asr-local`) must be activated for audio processing
- Ollama must be running with qwen3-vl:8b model available
- ffmpeg must be in PATH
## Workflow
### Step 1: Extract Audio
```bash
ffmpeg -i "video.mp4" -vn -acodec pcm_s16le -ar 16000 -ac 1 "audio.wav" -y
```
**Note:** If path contains Chinese characters, copy audio.wav to a path without Chinese characters before ASR.
### Step 2: Extract Key Frames
```bash
mkdir frames
ffmpeg -i "video.mp4" -vf "fps=1/10" -q:v 2 "frames/frame_%03d.jpg" -y
```
### Step 3: Speech Recognition (FunASR)
```bash
conda run -n asr-local python -c "
import os
os.environ['MODELSCOPE_CACHE'] = 'C:/Users/TOM/.cache/modelscope'
from funasr import AutoModel
model = AutoModel(
model='iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch',
model_revision='v2.0.4',
disable_update=True,
ncpu=4
)
result = model.generate(input='AUDIO_PATH')
print(result)
"
```
### Step 4: Image Understanding (qwen3-vl)
```bash
ollama run qwen3-vl:8b "Describe this image in detail: /path/to/frame.jpg"
```
### Step 5: Combine Results
- **Audio transcription** → FunASR (local, Chinese speech recognition)
- **Key frames** → qwen3-vl:8b via Ollama (local image understanding)
- **Summary/Analysis** → Cloud LLM API (if needed)
## Important Notes
- Image reading via `Read` tool does NOT provide image understanding - always use qwen3-vl
- For Chinese audio, FunASR is preferred over Whisper
- Check for existing subtitle files (.txt, .srt, .vtt) before running ASR
- Modelscope cache at `C:/Users/TOM/.cache/modelscope` for FunASR models
FILE:README.md
# Video Understanding (Local Video Comprehension)
A skill for understanding video content using local AI models, supporting both audio transcription and image understanding.
## Model Installation
### Image Understanding - qwen3-vl:8b
**Windows:**
1. Download Ollama from https://ollama.com/download
2. Install and verify: `ollama --version`
**Pull the model:**
```bash
ollama pull qwen3-vl:8b
```
**macOS/Linux:**
```bash
brew install ollama # or download from ollama.com
ollama pull qwen3-vl:8b
```
### Audio Recognition - FunASR
**Create conda environment:**
```bash
conda create -n asr-local python=3.11
conda activate asr-local
pip install funasr
```
**Note:** The FunASR model will be downloaded automatically on first use.
## Environment Requirements
- ffmpeg (in PATH)
- conda + Python 3.11
- Ollama (running)
- ~6GB VRAM for qwen3-vl:8b
## Directory Structure
```
video-understanding/
├── README.md # This file (human guide)
├── SKILL.md # AI execution guide
├── scripts/
│ └── ... # Helper scripts if needed
└── assets/
└── ... # Templates/examples if needed
```
## Troubleshooting
**Ollama not running:**
```bash
ollama serve
```
**Model download slow:**
```bash
# Use a mirror if needed
OLLAMA_BASE_URL=https://example.com ollama pull qwen3-vl:8b
```
**FunASR model not found:**
- Models are auto-downloaded on first use
- Ensure internet connection for initial download
**Chinese audio not recognized:**
- FunASR Paraformer model is optimized for Chinese
- For English, consider Whisper via Ollama instead