@clawhub-junjiantech-308aad65f8
通过企业微信官方网页端操作微盘、文件夹、在线文档和表格。适用于检查登录状态、抓取并发送扫码二维码、下载或导出文件、在本地处理内容、生成报告后再通过网页端导入回企业微信微盘的场景。
---
name: wecom-drive-web
description: 通过企业微信官方网页端操作微盘、文件夹、在线文档和表格。适用于检查登录状态、抓取并发送扫码二维码、下载或导出文件、在本地处理内容、生成报告后再通过网页端导入回企业微信微盘的场景。
version: 0.2.0
metadata:
openclaw:
requires:
bins:
- node
- npm
homepage: https://clawhub.ai
---
# 企微微盘网页端
**重要:路径解析**
执行任何命令前,先根据当前 `SKILL.md` 的所在位置解析 `$SKILL_DIR`。在这个仓库里,包含本文件的目录就是技能根目录。
当 OpenClaw 只能通过企业微信官方网页端访问文件时,使用这个技能。实际操作页面时优先使用浏览器工具,因为这样可以保持会话、查看真实页面,并把截图直接发给用户;当你需要稳定地检查登录状态、抓取二维码、输出页面摘要、导出表格或把导出的 Excel 进一步整理成 Word 报告时,使用随附脚本。
## 强制规则
如果需要登录,必须把登录二维码图片发给用户。不能只保存在本地,也不能在用户扫码前继续执行任何文件读取或编辑操作。
如果进入二维码登录页,必须保持当前登录标签页或浏览器会话存活,直到用户完成扫码或明确放弃;不要在发出二维码后关闭标签页或重建新的登录页。
每次重新抓取二维码时,都要覆盖同一个本地二维码文件,默认使用 `$SKILL_DIR/.outputs/wecom-login-qr.png`,避免把旧二维码误发给用户。
处理文件内容时,默认先把文件下载到本地,再在本地处理;不要默认直接在网页编辑器里处理完整内容。
如果需要把本地结果重新上传到企业微信文档,固定打开 `https://doc.weixin.qq.com/home/recent` 页面,通过网页端导入文件。
## 工作流程
1. 确定目标链接。
- 如果用户已经提供了微盘文件、文件夹或在线文档链接,直接打开该链接。
- 如果用户只是想先建立登录会话,从 `https://doc.weixin.qq.com/home/recent` 开始。
2. 检查是否需要登录。
- 如果当前 URL 是 `https://doc.weixin.qq.com/home/recent` 且页面进入登录态检查,或者当前 URL 包含 `/wework_admin/loginpage_wx`、`/scenario/login.html`,或者页面可见文字包含 `企业微信扫码登录`、`企业身份登录`,或者 iframe 地址包含 `/wwqrlogin/`,就视为需要登录。
- 一旦需要登录,先抓取二维码,再把二维码图片发给用户,然后才能做其他事情。
- 抓到二维码后不要关闭当前标签页;要让用户在这一个仍然存活的登录页面上完成扫码。
- 明确告诉用户需要扫码后才能继续。
- 如果页面提示 `请在桌面端确认登录`、`无法扫码` 或类似文案,就切换到桌面端确认兜底流程,不要让用户重复扫码。
3. 等待用户扫码。
- 等待期间保持同一个浏览器会话存活。
- 如果需要重新截图二维码,覆盖 `$SKILL_DIR/.outputs/wecom-login-qr.png`,不要生成新的时间戳文件。
- 用户扫码后重新检查页面,确认登录墙消失后再继续。
4. 如果命中桌面端确认兜底,改走桌面端确认流程。
- 把当前截图或提示文案发给用户,明确说明这次不是普通扫码,而是需要在企业微信桌面端确认登录。
- 不要反复要求用户重新扫码。
- 优先复用已经登录过的持久化浏览器配置;如果仍未登录,再等待用户在桌面端完成确认后继续。
5. 操作文件页面。
- 先判断这是“在线直接操作”还是“下载到本地处理后再回传”。
- 对文件类内容,优先下载到本地再处理。
- 只在用户明确要求且确实适合网页直接修改时,才在页面里做小范围编辑。
- 如果登录后站点跳到了通用首页,重新打开原始目标链接。
6. 需要上传处理结果时,走固定上传入口。
- 打开 `https://doc.weixin.qq.com/home/recent`。
- 在最近页面里使用导入能力上传本地文件。
- 上传完成后,再打开上传后的文件链接继续检查。
7. 如果目标是表格分析或报表整理,优先走“原生导出 + 本地生成报告”。
- 先用网页原生菜单导出 `.xlsx` 或当前工作表 `.csv`。
- 在本地完成统计、分析和报告生成。
- 需要回传时,把 `.docx`、`.xlsx` 或其他结果文件重新导入微盘。
## 优先浏览器流程
只要浏览器工具可用,就优先使用浏览器工具。
1. 在浏览器中打开目标链接或登录链接。
2. 检查当前页面。
3. 如果页面显示登录墙,截取二维码区域或登录卡片,并在当前对话轮次里把图片发给用户。
4. 登录墙出现后不要关闭该标签页;保留当前页面等待用户扫码,必要时只更新截图文件。
5. 如果页面同时提示“请在桌面端确认登录”或“无法扫码”,立刻告诉用户需要到企业微信桌面端确认,不要继续让用户扫码。
6. 登录完成后,继续使用同一个会话,在输入或点击前再次检查文件页面结构。
7. 如果要处理的是文件而不是少量在线文本,优先走“下载到本地处理”流程,而不是直接在网页里大段改写。
当你需要页面结构和判断规则时,阅读 `references/wecom-web-notes.md`。
## 脚本流程
首次使用先安装依赖:
```bash
cd "$SKILL_DIR"
npm install
```
检查一个目标链接;如果需要登录,就保存二维码截图:
```bash
cd "$SKILL_DIR"
node ./scripts/wecom-drive-browser.mjs \
--url "https://doc.weixin.qq.com/home/recent" \
--qr-path "$SKILL_DIR/.outputs/wecom-login-qr.png" \
--keep-open
```
在同一个持久化浏览器配置目录里等待用户登录:
```bash
cd "$SKILL_DIR"
node ./scripts/wecom-drive-browser.mjs \
--url "https://doc.weixin.qq.com/home/recent" \
--wait-for-login \
--timeout-ms 180000
```
脚本会输出 JSON,包含这些字段:
- `status`
- `targetUrl`
- `currentUrl`
- `title`
- `loginRequired`
- `desktopConfirmationRequired`
- `loginHint`
- `qrPath`
- `page.links`
- `page.editableElements`
- `page.textHints`
用 `qrPath` 对应的图片发给用户。用 `page.links` 和 `page.editableElements` 作为后续浏览器操作的提示信息。
如果 `desktopConfirmationRequired` 为真,优先按桌面端确认流程处理,而不是继续要求用户扫码。
如果结果是登录页,优先保留同一个页面继续等待扫码;不要抓完二维码后立刻关闭浏览器。
不要把 `qrPath` 当成仅供内部使用的产物。脚本生成后,必须把这张图展示或附加给用户,并提示用户扫码。
## 表格导出与报告生成
如果目标是腾讯文档表格,优先尝试网页原生导出:
1. 打开文档左上或顶部的文件菜单。
2. 优先使用 `导出 -> 本地Excel表格 (.xlsx)`。
3. 如果用户只需要当前工作表,且不关心样式,可退而使用 `本地CSV文件 (.csv, 当前工作表)`。
4. 导出完成后,先确认本地文件实际落盘,再进入后续解析。
仓库里附带了一个 Word 报告生成脚本:
```bash
cd "$SKILL_DIR"
python3 ./scripts/generate_stutter_report.py "/path/to/exported.xlsx"
```
脚本会在 `$SKILL_DIR/.outputs/` 下生成:
- `*-分析报告.html`
- `*-分析报告.docx`
默认适用于“卡顿数据汇总 + 严重卡顿明细 + 播放事件抽样”这一类企业微信表格导出文件。如果后续遇到类似结构的排查表,优先复用这个脚本,而不是每次重新手写统计逻辑。
## 读取与编辑
读取时:
- 先用页面快照。
- 如果页面是文档视图,检查可见标题、段落和链接。
- 如果页面是文件列表,先收集可见文件名和链接,再决定打开哪一个。
- 如果目标是文件处理任务,优先下载文件到本地,再用本地工具处理。
编辑时:
- 如果用户的编辑要求还不够具体,先确认具体改动。
- 在输入前先确定当前活跃编辑区域。
- 优先做最小、最安全的修改,不要默认整段替换。
- 如果改动较大、涉及结构化内容转换、批量处理或格式保留,先下载到本地处理,再上传回去。
- 修改完成后,确认变更文本已经在页面上可见。
## 下载与上传
下载时:
- 优先找到页面中的下载、导出或另存为入口,把原文件保存到本地。
- 下载后在本地完成解析、修改、转换或批处理。
- 保留本地输入文件和输出文件路径,方便后续回传。
上传时:
- 不要在任意页面里盲找上传按钮。
- 固定打开 `https://doc.weixin.qq.com/home/recent`。
- 从最近页面触发导入,把本地处理后的文件上传回企业微信文档。
- 上传成功后,打开新文件并检查内容是否符合预期。
## 兜底规则
- 如果目标页面结构不熟悉,先检查 DOM,再执行操作。
- 如果二维码无法被干净地单独截出,就发送整个登录卡片或整页截图。
- 如果需要重新生成二维码截图,优先覆盖已有的 `$SKILL_DIR/.outputs/wecom-login-qr.png`,再把最新图片发给用户。
- 如果用户登录后仍未提供目标链接,要求用户给出具体文件或文件夹链接,不要自行在整个空间里盲找。
- 如果网页端不适合稳定编辑该文件,就退回到“下载到本地处理,再上传”的流程。
- 如果登录页提示“请在桌面端确认登录,无法扫码”,就明确告知用户需要在企业微信桌面端确认,并暂停扫码流程。
FILE:PUBLISHING.md
# Publishing Notes
适合上传到 ClawHub 的最小内容:
- `SKILL.md`
- `README.md`
- `agents/openai.yaml`
- `scripts/`
- `references/`
- `package.json`
- `package-lock.json`
发布前确认:
- 不包含 `node_modules/`
- 不包含 `.outputs/`
- 不包含 `.state/`
- `SKILL.md` 描述与实际能力一致
- `agents/openai.yaml` 默认提示词覆盖登录、导出、本地处理和回传流程
FILE:README.md
# wecom-drive-web
通过企业微信官方网页端操作微盘、在线文档和表格的实用 skill。
它适合这些场景:
- 检查企业微信网页端是否已登录
- 自动抓取登录二维码并发给用户
- 保持原登录会话,等待用户扫码或桌面端确认
- 打开企业微信文档、表格和微盘文件链接
- 使用网页原生能力导出 Excel 或 CSV
- 在本地处理导出的文件并生成 Word 报告
- 再通过网页端把结果文件导回企业微信微盘
## Core Capabilities
- Safe login handoff: detect login walls, capture the live QR code, and keep the same session open until the user finishes login.
- Native web export first: prefer the official export entry in WeCom Docs instead of scraping table cells when spreadsheets need offline analysis.
- Local processing workflow: download or export first, then analyze files locally for safer and more stable transformations.
- Round-trip delivery: upload generated `.docx`, `.xlsx`, or other artifacts back to `https://doc.weixin.qq.com/home/recent`.
## Included Scripts
- `scripts/wecom-drive-browser.mjs`
- 检查登录状态
- 提取二维码截图
- 输出页面结构摘要
- `scripts/generate_stutter_report.py`
- 读取导出的腾讯文档 `.xlsx`
- 生成分析报告 `.html`
- 生成 Word 报告 `.docx`
## Quick Start
安装依赖:
```bash
cd /path/to/wecom-drive-web
npm install
```
检查登录状态并生成二维码截图:
```bash
cd /path/to/wecom-drive-web
node ./scripts/wecom-drive-browser.mjs \
--url "https://doc.weixin.qq.com/home/recent" \
--qr-path "./.outputs/wecom-login-qr.png" \
--keep-open
```
基于导出的 Excel 生成报告:
```bash
cd /path/to/wecom-drive-web
python3 ./scripts/generate_stutter_report.py "/path/to/exported.xlsx"
```
## Workflow Notes
1. 如果需要登录,先把二维码发给用户,再继续后续操作。
2. 不要在用户扫码前关闭当前登录页或重建新的登录会话。
3. 对表格分析任务,优先使用网页原生 `导出 -> 本地Excel表格 (.xlsx)`。
4. 大体量处理优先在本地完成,再把结果导回微盘。
## Packaging
发布前建议排除这些目录:
- `node_modules/`
- `.outputs/`
- `.state/`
本目录已经附带 `.gitignore`,可直接作为发布时的排除参考。
FILE:package-lock.json
{
"name": "wecom-drive-web",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "wecom-drive-web",
"dependencies": {
"playwright-core": "^1.53.2"
}
},
"node_modules/playwright-core": {
"version": "1.58.2",
"resolved": "https://registry.npmjs.org/playwright-core/-/playwright-core-1.58.2.tgz",
"integrity": "sha512-yZkEtftgwS8CsfYo7nm0KE8jsvm6i/PTgVtB8DL726wNf6H2IMsDuxCpJj59KDaxCtSnrWan2AeDqM7JBaultg==",
"license": "Apache-2.0",
"bin": {
"playwright-core": "cli.js"
},
"engines": {
"node": ">=18"
}
}
}
}
FILE:package.json
{
"name": "wecom-drive-web",
"description": "Operate WeCom Docs and Drive from the official web app, including login QR capture, file export/import, and local report generation.",
"private": true,
"type": "module",
"scripts": {
"probe": "node ./scripts/wecom-drive-browser.mjs",
"login:check": "node ./scripts/wecom-drive-browser.mjs --url https://doc.weixin.qq.com/home/recent",
"test:help": "node ./scripts/wecom-drive-browser.mjs --help",
"report:stutter": "python3 ./scripts/generate_stutter_report.py"
},
"dependencies": {
"playwright-core": "^1.53.2"
}
}
FILE:scripts/wecom-drive-browser.mjs
#!/usr/bin/env node
import { access, mkdir, writeFile } from "node:fs/promises";
import path from "node:path";
import process from "node:process";
import { fileURLToPath } from "node:url";
import { parseArgs } from "node:util";
import { chromium } from "playwright-core";
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const SKILL_DIR = path.resolve(__dirname, "..");
const DEFAULT_LOGIN_URL = "https://doc.weixin.qq.com/home/recent";
const DEFAULT_OUTPUT_DIR = path.join(SKILL_DIR, ".outputs");
const DEFAULT_PROFILE_DIR = path.join(SKILL_DIR, ".state", "chrome-profile");
const DEFAULT_QR_PATH = path.join(DEFAULT_OUTPUT_DIR, "wecom-login-qr.png");
function printHelp() {
console.log(`wecom-drive-browser
用法:
node ./scripts/wecom-drive-browser.mjs [选项]
选项:
--url <url> 目标文件、文件夹或登录页面链接
--qr-path <path> 需要登录时,把二维码截图保存到这里;未指定时默认覆盖 .outputs/wecom-login-qr.png
--json-path <path> 把 JSON 结果保存到文件
--profile-dir <path> 持久化浏览器配置目录
--timeout-ms <n> 导航或等待超时时间,单位毫秒
--wait-for-login 轮询等待登录完成,直到超时
--keep-open 输出结果后保持浏览器不关闭;处理二维码登录时建议启用
--headed 显示浏览器窗口,而不是无头模式
--help 打印这段帮助信息
`);
}
function cleanText(value) {
return String(value || "").replace(/\s+/g, " ").trim();
}
async function pathExists(filePath) {
try {
await access(filePath);
return true;
} catch {
return false;
}
}
async function ensureParentDir(filePath) {
await mkdir(path.dirname(filePath), { recursive: true });
}
function defaultQrPath() {
return DEFAULT_QR_PATH;
}
async function resolveBrowserExecutable() {
const candidates = [
process.env.WECOM_DRIVE_BROWSER_PATH,
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
"/Applications/Brave Browser.app/Contents/MacOS/Brave Browser",
"/usr/bin/google-chrome",
"/usr/bin/chromium",
"/usr/bin/chromium-browser",
].filter(Boolean);
for (const candidate of candidates) {
if (await pathExists(candidate)) {
return candidate;
}
}
throw new Error(
"未找到受支持的浏览器可执行文件。请把 WECOM_DRIVE_BROWSER_PATH 设置为 Chrome 或 Chromium 的路径。"
);
}
async function safePageText(page) {
try {
const text = await page.locator("body").innerText({ timeout: 5000 });
return cleanText(text);
} catch {
return "";
}
}
async function safeFrameSrcs(page) {
try {
return await page.locator("iframe").evaluateAll((nodes) =>
nodes
.map((node) => node.getAttribute("src") || "")
.filter((value) => Boolean(value))
);
} catch {
return [];
}
}
async function captureQrScreenshot(page, qrPath) {
await ensureParentDir(qrPath);
const candidates = [
page.frameLocator("iframe").locator("img, canvas").first(),
page.locator("iframe").first(),
page.locator("img[src*='qrcode'], img[src*='qr'], canvas").first(),
page.locator("main").first(),
];
for (const locator of candidates) {
try {
if ((await locator.count()) > 0 && (await locator.first().isVisible())) {
await locator.first().screenshot({ path: qrPath });
return qrPath;
}
} catch {
continue;
}
}
await page.screenshot({ path: qrPath, fullPage: false });
return qrPath;
}
async function detectState(page) {
const currentUrl = page.url();
const title = await page.title().catch(() => "");
const pageText = await safePageText(page);
const frameSrcs = await safeFrameSrcs(page);
const loginByUrl =
/\/wework_admin\/loginpage_wx/.test(currentUrl) ||
/\/scenario\/login\.html/.test(currentUrl) ||
(/\/home\/recent/.test(currentUrl) &&
/企业微信扫码登录|扫码登录|微信扫码登录|企业身份登录/.test(pageText));
const loginByText = /企业微信扫码登录|扫码登录|微信扫码登录|企业身份登录/.test(
pageText
);
const desktopConfirmationRequired = /请在桌面端确认登[录陆]|桌面端确认登[录陆]|无法扫码/.test(
pageText
);
const loginByFrame = frameSrcs.some(
(src) => src.includes("/wwqrlogin/") || src.includes("login_qrcode")
);
const loginRequired = loginByUrl || loginByText || loginByFrame;
const textHints = pageText
.split(/(?<=。)|\n/)
.map((item) => cleanText(item))
.filter(Boolean)
.slice(0, 12);
return {
loginRequired,
desktopConfirmationRequired,
currentUrl,
title,
frameSrcs,
textHints,
};
}
async function extractLinks(page, limit) {
try {
return await page.evaluate((maxLinks) => {
const seen = new Set();
const items = [];
for (const anchor of document.querySelectorAll("a[href]")) {
const text = (anchor.textContent || "").replace(/\s+/g, " ").trim();
const href = anchor.href || anchor.getAttribute("href") || "";
if (!href) continue;
const key = `text::href`;
if (seen.has(key)) continue;
seen.add(key);
items.push({ text, href });
if (items.length >= maxLinks) break;
}
return items;
}, limit);
} catch {
return [];
}
}
async function extractEditableElements(page, limit) {
try {
return await page.evaluate((maxItems) => {
const selectors = [
"[contenteditable='true']",
"textarea",
"input:not([type='hidden'])",
];
return Array.from(document.querySelectorAll(selectors.join(",")))
.map((node) => ({
tag: node.tagName.toLowerCase(),
name: node.getAttribute("name") || "",
placeholder: node.getAttribute("placeholder") || "",
ariaLabel: node.getAttribute("aria-label") || "",
text: (node.textContent || "").replace(/\s+/g, " ").trim().slice(0, 80),
}))
.filter(
(item) =>
item.name || item.placeholder || item.ariaLabel || item.text || item.tag === "textarea"
)
.slice(0, maxItems);
}, limit);
} catch {
return [];
}
}
async function collectPageSummary(page) {
return {
links: await extractLinks(page, 30),
editableElements: await extractEditableElements(page, 20),
};
}
async function waitForLogin(page, timeoutMs) {
const startedAt = Date.now();
while (Date.now() - startedAt < timeoutMs) {
await page.waitForTimeout(1500);
const state = await detectState(page);
if (!state.loginRequired) {
return {
status: "ready",
timedOut: false,
state,
};
}
}
return {
status: "login_timeout",
timedOut: true,
state: await detectState(page),
};
}
async function keepContextAlive(context) {
return new Promise((resolve) => {
let settled = false;
const finish = () => {
if (!settled) {
settled = true;
resolve();
}
};
context.on("close", finish);
const interval = setInterval(async () => {
try {
if (context.pages().length === 0) {
clearInterval(interval);
finish();
return;
}
await context.pages()[0].waitForTimeout(1000);
} catch {
clearInterval(interval);
finish();
}
}, 1000);
});
}
async function main() {
const { values } = parseArgs({
args: process.argv.slice(2),
options: {
url: { type: "string" },
"qr-path": { type: "string" },
"json-path": { type: "string" },
"profile-dir": { type: "string" },
"timeout-ms": { type: "string" },
"wait-for-login": { type: "boolean" },
"keep-open": { type: "boolean" },
headed: { type: "boolean" },
help: { type: "boolean" },
},
allowPositionals: false,
});
if (values.help) {
printHelp();
return;
}
const targetUrl = values.url || DEFAULT_LOGIN_URL;
const timeoutMs = Number.parseInt(values["timeout-ms"] || "30000", 10);
const qrPath = values["qr-path"] || defaultQrPath();
const jsonPath = values["json-path"];
const profileDir = values["profile-dir"] || DEFAULT_PROFILE_DIR;
const headed = Boolean(values.headed);
const keepOpen = Boolean(values["keep-open"]);
let context;
try {
const executablePath = await resolveBrowserExecutable();
await mkdir(profileDir, { recursive: true });
await mkdir(DEFAULT_OUTPUT_DIR, { recursive: true });
context = await chromium.launchPersistentContext(profileDir, {
executablePath,
headless: !headed,
viewport: { width: 1440, height: 960 },
locale: "zh-CN",
args: ["--disable-dev-shm-usage"],
});
const page = context.pages()[0] || (await context.newPage());
await page.goto(targetUrl, {
waitUntil: "domcontentloaded",
timeout: timeoutMs,
});
await page.waitForLoadState("networkidle", { timeout: 5000 }).catch(() => {});
const state = await detectState(page);
const result = {
status: state.loginRequired
? state.desktopConfirmationRequired
? "desktop_confirm_required"
: "login_required"
: "ready",
targetUrl,
currentUrl: state.currentUrl,
title: state.title,
loginRequired: state.loginRequired,
desktopConfirmationRequired: state.desktopConfirmationRequired,
loginHint: state.loginRequired
? state.desktopConfirmationRequired
? "当前登录页要求在企业微信桌面端确认登录,不能只靠扫码完成。"
: "当前页面需要扫码登录。"
: null,
qrPath: null,
profileDir,
page: {
textHints: state.textHints,
links: [],
editableElements: [],
},
};
if (state.loginRequired) {
result.qrPath = await captureQrScreenshot(page, qrPath);
if (values["wait-for-login"]) {
const waitResult = await waitForLogin(page, timeoutMs);
result.status = waitResult.status;
result.currentUrl = waitResult.state.currentUrl;
result.title = waitResult.state.title;
result.loginRequired = waitResult.state.loginRequired;
result.desktopConfirmationRequired =
waitResult.state.desktopConfirmationRequired;
result.loginHint = waitResult.state.loginRequired
? waitResult.state.desktopConfirmationRequired
? "当前登录页要求在企业微信桌面端确认登录,不能只靠扫码完成。"
: "当前页面需要扫码登录。"
: null;
result.page.textHints = waitResult.state.textHints;
}
}
if (!result.loginRequired) {
const summary = await collectPageSummary(page);
result.page.links = summary.links;
result.page.editableElements = summary.editableElements;
}
const output = `JSON.stringify(result, null, 2)\n`;
process.stdout.write(output);
if (jsonPath) {
await ensureParentDir(jsonPath);
await writeFile(jsonPath, output, "utf8");
}
if (keepOpen) {
process.stdout.write("浏览器保持打开状态,等待手动关闭。\n");
await keepContextAlive(context);
context = null;
}
} finally {
if (context) {
await context.close();
}
}
}
main().catch((error) => {
const result = {
status: "error",
error: error instanceof Error ? error.message : String(error),
};
process.stderr.write(`JSON.stringify(result, null, 2)\n`);
process.exitCode = 1;
});
FILE:scripts/generate_stutter_report.py
#!/usr/bin/env python3
import argparse
import html
import zipfile
from collections import Counter
from datetime import UTC, datetime, timedelta
from pathlib import Path
import xml.etree.ElementTree as ET
from xml.sax.saxutils import escape as xml_escape
NS = "{http://schemas.openxmlformats.org/spreadsheetml/2006/main}"
def col_to_num(col: str) -> int:
value = 0
for ch in col:
if ch.isalpha():
value = value * 26 + ord(ch.upper()) - 64
return value
def excel_date(raw: str) -> str:
base = datetime(1899, 12, 30)
return (base + timedelta(days=float(raw))).strftime("%Y-%m-%d")
def clean_text(value: str) -> str:
return (value or "").replace("\n", " ").strip()
def percent(value: float) -> str:
return f"{value * 100:.2f}%"
def read_shared_strings(book: zipfile.ZipFile) -> list[str]:
root = ET.fromstring(book.read("xl/sharedStrings.xml"))
strings = []
for si in root.findall(f"{NS}si"):
strings.append("".join(node.text or "" for node in si.iter(f"{NS}t")))
return strings
def read_sheet(book: zipfile.ZipFile, index: int, shared_strings: list[str]) -> list[list[str]]:
root = ET.fromstring(book.read(f"xl/worksheets/sheet{index}.xml"))
rows = []
max_col = 0
for row in root.find(f"{NS}sheetData").findall(f"{NS}row"):
values = {}
for cell in row.findall(f"{NS}c"):
ref = cell.attrib["r"]
col_index = col_to_num("".join(ch for ch in ref if ch.isalpha()))
max_col = max(max_col, col_index)
cell_type = cell.attrib.get("t")
value_node = cell.find(f"{NS}v")
value = "" if value_node is None else (value_node.text or "")
if cell_type == "s" and value:
value = shared_strings[int(value)]
values[col_index] = value
rows.append([values.get(col, "") for col in range(1, max_col + 1)])
return rows
def load_workbook(xlsx_path: Path) -> dict[str, list[list[str]]]:
with zipfile.ZipFile(xlsx_path) as book:
shared_strings = read_shared_strings(book)
return {
"卡顿数据": read_sheet(book, 1, shared_strings),
"严重卡顿记录": read_sheet(book, 2, shared_strings),
"工作表1": read_sheet(book, 3, shared_strings),
}
def parse_daily(rows: list[list[str]]) -> list[dict]:
result = []
for row in rows[1:]:
if not row or not row[0]:
continue
try:
result.append(
{
"date": excel_date(row[0]),
"stutters": int(float(row[1] or 0)),
"plays": int(float(row[2] or 0)),
"rate": float(row[3] or 0),
"gt10": int(float(row[4] or 0)) if row[4] else 0,
"gt20": int(float(row[5] or 0)) if row[5] else 0,
"note": clean_text(row[6] if len(row) > 6 else ""),
}
)
except ValueError:
continue
return result
def parse_severe(rows: list[list[str]]) -> list[dict]:
result = []
for row in rows[1:]:
if not row or not row[0]:
continue
try:
result.append(
{
"date": excel_date(row[0]),
"play_time": float(row[1] or 0),
"trace": clean_text(row[2]),
"resolution": clean_text(row[3]) or "未知",
"video": clean_text(row[4]),
"session": clean_text(row[5]),
"first_frame_ms": int(float(row[6] or 0)),
"stall_ms": int(float(row[7] or 0)),
"network": clean_text(row[8]) or "未知",
"ip": clean_text(row[9]) or "未知",
"device": clean_text(row[10]) or "未知",
}
)
except ValueError:
continue
return result
def parse_playback(rows: list[list[str]]) -> list[dict]:
result = []
for row in rows:
if len(row) < 10 or not clean_text(row[1]):
continue
result.append(
{
"scene": clean_text(row[1]),
"video": clean_text(row[2]),
"session": clean_text(row[3]),
"status": clean_text(row[5]),
"first_frame_ms": float(row[6] or 0),
"stall_ms": float(row[7] or 0),
}
)
return result
def top_counter(records: list[dict], key: str, limit: int = 10) -> list[tuple[str, int]]:
return Counter(clean_text(record[key]) or "未知" for record in records).most_common(limit)
def render_table(headers: list[str], rows: list[list[str]]) -> str:
thead = "".join(f"<th>{html.escape(str(header))}</th>" for header in headers)
body_rows = []
for row in rows:
body_rows.append(
"<tr>" + "".join(f"<td>{html.escape(str(cell))}</td>" for cell in row) + "</tr>"
)
return (
'<table><thead><tr>'
+ thead
+ "</tr></thead><tbody>"
+ "".join(body_rows)
+ "</tbody></table>"
)
def build_report(xlsx_path: Path, workbook: dict[str, list[list[str]]]) -> str:
daily = parse_daily(workbook["卡顿数据"])
severe = parse_severe(workbook["严重卡顿记录"])
playback = parse_playback(workbook["工作表1"])
total_stutters = sum(item["stutters"] for item in daily)
total_plays = sum(item["plays"] for item in daily)
avg_rate = sum(item["rate"] for item in daily) / len(daily)
total_gt10 = sum(item["gt10"] for item in daily)
total_gt20 = sum(item["gt20"] for item in daily)
max_rate_day = max(daily, key=lambda item: item["rate"])
min_rate_day = min(daily, key=lambda item: item["rate"])
noted_days = [item for item in daily if item["note"]]
severe_by_date = Counter(item["date"] for item in severe).most_common()
top_networks = top_counter(severe, "network")
top_resolutions = top_counter(severe, "resolution")
top_devices = top_counter(severe, "device")
top_videos = top_counter(severe, "video")
top_stall = sorted(severe, key=lambda item: item["stall_ms"], reverse=True)[:10]
top_first_frame = sorted(severe, key=lambda item: item["first_frame_ms"], reverse=True)[:10]
playback_status = Counter(item["status"] for item in playback).most_common()
daily_rows = [
[
item["date"],
item["stutters"],
item["plays"],
percent(item["rate"]),
item["gt10"],
item["gt20"],
item["note"],
]
for item in daily
]
severe_rows = [
[
item["date"],
item["stall_ms"],
item["first_frame_ms"],
item["network"],
item["resolution"],
item["device"],
item["ip"],
item["video"],
]
for item in top_stall
]
observations = [
(
f"统计周期为 {daily[0]['date']} 至 {daily[-1]['date']},"
f"共记录播放 {total_plays} 次、卡顿 {total_stutters} 次,平均卡顿率 {percent(avg_rate)}。"
),
(
f"卡顿率最高日期为 {max_rate_day['date']},当天 {max_rate_day['stutters']} 次卡顿、"
f"{max_rate_day['plays']} 次播放,卡顿率 {percent(max_rate_day['rate'])}。"
),
(
f"卡顿率最低日期为 {min_rate_day['date']},当天卡顿率 {percent(min_rate_day['rate'])}。"
),
(
f"严重卡顿明细共 {len(severe)} 条,其中卡顿超过 10 秒 {total_gt10} 次,"
f"超过 20 秒 {total_gt20} 次。"
),
(
"从备注看,8 月 7 日开始加入 iOS 数据;8 月 13 日至 8 月 20 日连续出现“标清分辨率”;"
"8 月 19 日记录了“死亡课上线”。"
),
(
f"严重卡顿网络类型以 WIFI 为主({top_networks[0][1]} 条),"
f"其次为 4G、Mobile:NR 和 5G,说明问题不完全局限于单一移动网络。"
),
(
f"严重卡顿分辨率以 848*478 为主({top_resolutions[0][1]} 条),"
f"1920*1080 次之,和“标清分辨率”备注相互印证。"
),
(
f"工作表1 中播放事件共 {len(playback)} 条,状态为“异常”{dict(playback_status).get('异常', 0)} 条,"
f"“播放完成”{dict(playback_status).get('播放完成', 0)} 条。"
),
]
def bullet_list(items: list[str]) -> str:
return "<ul>" + "".join(f"<li>{html.escape(item)}</li>" for item in items) + "</ul>"
def pair_list(title: str, items: list[tuple[str, int]]) -> str:
rows = [[name, count] for name, count in items]
return f"<h3>{html.escape(title)}</h3>" + render_table(["维度", "次数"], rows)
style = """
body { font-family: 'PingFang SC', 'Microsoft YaHei', sans-serif; margin: 36px; color: #1f2937; line-height: 1.65; }
h1, h2, h3 { color: #0f172a; }
h1 { font-size: 24px; margin-bottom: 4px; }
h2 { font-size: 18px; margin-top: 28px; margin-bottom: 8px; border-bottom: 1px solid #dbe3ee; padding-bottom: 6px; }
h3 { font-size: 15px; margin-top: 20px; margin-bottom: 8px; }
p.meta { color: #475569; margin-top: 0; }
.card { background: #f8fafc; border: 1px solid #dbe3ee; padding: 12px 14px; margin: 10px 0; border-radius: 8px; }
.metrics { display: grid; grid-template-columns: repeat(2, minmax(180px, 1fr)); gap: 10px; margin: 14px 0 18px; }
.metric { border: 1px solid #dbe3ee; border-radius: 8px; padding: 12px; background: #ffffff; }
.metric strong { display: block; font-size: 20px; color: #0f766e; }
table { width: 100%; border-collapse: collapse; margin: 10px 0 18px; font-size: 12px; }
th, td { border: 1px solid #cbd5e1; padding: 6px 8px; vertical-align: top; }
th { background: #e2e8f0; }
ul { margin-top: 8px; }
.small { font-size: 12px; color: #64748b; }
"""
metrics_html = """
<div class="metrics">
<div class="metric"><span>统计周期</span><strong>{period}</strong></div>
<div class="metric"><span>总播放次数</span><strong>{plays}</strong></div>
<div class="metric"><span>总卡顿次数</span><strong>{stutters}</strong></div>
<div class="metric"><span>平均卡顿率</span><strong>{avg_rate}</strong></div>
<div class="metric"><span>严重卡顿明细</span><strong>{severe_count}</strong></div>
<div class="metric"><span>播放事件抽样</span><strong>{playback_count}</strong></div>
</div>
""".format(
period=html.escape(f"{daily[0]['date']} 至 {daily[-1]['date']}"),
plays=total_plays,
stutters=total_stutters,
avg_rate=html.escape(percent(avg_rate)),
severe_count=len(severe),
playback_count=len(playback),
)
note_rows = [[item["date"], item["note"]] for item in noted_days]
html_parts = [
"<html><head><meta charset='utf-8'><style>",
style,
"</style></head><body>",
"<h1>阿里云视频卡顿排查数据分析报告</h1>",
f"<p class='meta'>来源文件:{html.escape(xlsx_path.name)}<br>生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
metrics_html,
"<h2>一、结论摘要</h2>",
bullet_list(observations),
"<h2>二、每日汇总</h2>",
render_table(
["日期", "卡顿次数", "总播放次数", "卡顿率", ">10s", ">20s", "备注"],
daily_rows,
),
"<h2>三、关键节点备注</h2>",
render_table(["日期", "事件"], note_rows),
"<h2>四、严重卡顿明细分析</h2>",
pair_list("按日期分布", severe_by_date),
pair_list("按网络类型分布", top_networks),
pair_list("按分辨率分布", top_resolutions),
pair_list("按设备类型分布", top_devices),
pair_list("按视频 ID 分布", top_videos),
"<h3>卡顿时长 Top 10</h3>",
render_table(
["日期", "卡顿时长(ms)", "首帧耗时(ms)", "网络", "分辨率", "设备", "地区", "视频ID"],
severe_rows,
),
"<h3>首帧耗时 Top 10</h3>",
render_table(
["日期", "首帧耗时(ms)", "卡顿时长(ms)", "网络", "分辨率", "设备", "地区", "视频ID"],
[
[
item["date"],
item["first_frame_ms"],
item["stall_ms"],
item["network"],
item["resolution"],
item["device"],
item["ip"],
item["video"],
]
for item in top_first_frame
],
),
"<h2>五、播放事件抽样(工作表1)</h2>",
pair_list("播放状态分布", playback_status),
pair_list("抽样视频分布", Counter(item["video"] for item in playback).most_common(10)),
(
"<div class='card'><strong>建议</strong><br>"
"1. 优先排查 848*478 标清分辨率链路与相关转码模板。<br>"
"2. 针对高频视频 ID 单独核查 CDN、首帧耗时与源文件编码参数。<br>"
"3. 针对首帧耗时 15000ms 的明细重点复核客户端超时、预加载、首包策略。<br>"
"4. 对 8 月 7 日 iOS 数据接入、8 月 13 日后标清分辨率变更、8 月 19 日“死亡课上线”三个时间点做版本回溯。"
"</div>"
),
"<p class='small'>说明:本报告基于原始工作簿中的三个工作表自动汇总生成。</p>",
"</body></html>",
]
return "".join(html_parts)
def build_docx_lines(xlsx_path: Path, workbook: dict[str, list[list[str]]]) -> list[str]:
daily = parse_daily(workbook["卡顿数据"])
severe = parse_severe(workbook["严重卡顿记录"])
playback = parse_playback(workbook["工作表1"])
total_stutters = sum(item["stutters"] for item in daily)
total_plays = sum(item["plays"] for item in daily)
avg_rate = sum(item["rate"] for item in daily) / len(daily)
total_gt10 = sum(item["gt10"] for item in daily)
total_gt20 = sum(item["gt20"] for item in daily)
max_rate_day = max(daily, key=lambda item: item["rate"])
min_rate_day = min(daily, key=lambda item: item["rate"])
noted_days = [item for item in daily if item["note"]]
severe_by_date = Counter(item["date"] for item in severe).most_common()
top_networks = top_counter(severe, "network", limit=5)
top_resolutions = top_counter(severe, "resolution", limit=5)
top_devices = top_counter(severe, "device", limit=8)
top_videos = top_counter(severe, "video", limit=8)
top_stall = sorted(severe, key=lambda item: item["stall_ms"], reverse=True)[:10]
top_first_frame = sorted(severe, key=lambda item: item["first_frame_ms"], reverse=True)[:10]
playback_status = Counter(item["status"] for item in playback).most_common()
lines = [
"阿里云视频卡顿排查数据分析报告",
f"来源文件:{xlsx_path.name}",
f"生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"",
"一、整体结论",
f"统计周期:{daily[0]['date']} 至 {daily[-1]['date']}。",
f"总播放次数:{total_plays};总卡顿次数:{total_stutters};平均卡顿率:{percent(avg_rate)}。",
f"严重卡顿(>10s):{total_gt10} 次;严重卡顿(>20s):{total_gt20} 次;严重卡顿明细:{len(severe)} 条。",
f"卡顿率最高日期:{max_rate_day['date']},卡顿率 {percent(max_rate_day['rate'])},卡顿 {max_rate_day['stutters']} 次。",
f"卡顿率最低日期:{min_rate_day['date']},卡顿率 {percent(min_rate_day['rate'])},卡顿 {min_rate_day['stutters']} 次。",
"",
"二、关键观察",
"1. 8 月 7 日开始加入 iOS 数据,说明这一时间点前后需要分开看趋势。",
"2. 8 月 13 日至 8 月 20 日连续备注“标清分辨率”,与严重卡顿记录中 848*478 分辨率占比最高相互印证。",
"3. 8 月 19 日出现“死亡课上线”备注,建议与上线版本、素材变更、转码参数一起回溯。",
f"4. 严重卡顿网络类型以 {top_networks[0][0]} 为主,共 {top_networks[0][1]} 条,说明问题并非只集中在移动网络。",
f"5. 工作表1 中播放事件抽样共 {len(playback)} 条,其中异常 {dict(playback_status).get('异常', 0)} 条,播放完成 {dict(playback_status).get('播放完成', 0)} 条。",
"",
"三、每日汇总",
"日期 | 卡顿次数 | 总播放次数 | 卡顿率 | >10s | >20s | 备注",
]
for item in daily:
lines.append(
f"{item['date']} | {item['stutters']} | {item['plays']} | {percent(item['rate'])} | "
f"{item['gt10']} | {item['gt20']} | {item['note'] or '-'}"
)
lines.extend(["", "四、关键节点备注"])
for item in noted_days:
lines.append(f"{item['date']}:{item['note']}")
lines.extend(["", "五、严重卡顿分布"])
lines.append("按日期分布:")
lines.extend([f"- {name}:{count}" for name, count in severe_by_date])
lines.append("按网络类型分布:")
lines.extend([f"- {name}:{count}" for name, count in top_networks])
lines.append("按分辨率分布:")
lines.extend([f"- {name}:{count}" for name, count in top_resolutions])
lines.append("按设备类型分布:")
lines.extend([f"- {name}:{count}" for name, count in top_devices])
lines.append("按视频 ID 分布:")
lines.extend([f"- {name}:{count}" for name, count in top_videos])
lines.extend(["", "六、卡顿时长 Top 10", "日期 | 卡顿时长(ms) | 首帧耗时(ms) | 网络 | 分辨率 | 设备 | 地区 | 视频ID"])
for item in top_stall:
lines.append(
f"{item['date']} | {item['stall_ms']} | {item['first_frame_ms']} | {item['network']} | "
f"{item['resolution']} | {item['device']} | {item['ip']} | {item['video']}"
)
lines.extend(["", "七、首帧耗时 Top 10", "日期 | 首帧耗时(ms) | 卡顿时长(ms) | 网络 | 分辨率 | 设备 | 地区 | 视频ID"])
for item in top_first_frame:
lines.append(
f"{item['date']} | {item['first_frame_ms']} | {item['stall_ms']} | {item['network']} | "
f"{item['resolution']} | {item['device']} | {item['ip']} | {item['video']}"
)
lines.extend(
[
"",
"八、建议",
"1. 优先排查 848*478 标清分辨率链路与相关转码模板。",
"2. 对高频异常视频 ID 做 CDN、首帧耗时、源文件编码参数复核。",
"3. 对首帧耗时达到 15000ms 的明细重点检查客户端超时与预加载策略。",
"4. 以 2025-08-07、2025-08-13、2025-08-19 为版本回溯关键时间点。",
]
)
return lines
def build_docx_document_xml(lines: list[str]) -> str:
paragraphs = []
for index, line in enumerate(lines):
safe = xml_escape(line)
if index == 0:
paragraphs.append(
"<w:p><w:r><w:rPr><w:b/><w:sz w:val=\"32\"/></w:rPr>"
f"<w:t>{safe}</w:t></w:r></w:p>"
)
elif line == "":
paragraphs.append("<w:p/>")
elif line.endswith("、整体结论") or line.endswith("、关键观察") or line.endswith("、每日汇总") or line.endswith("、关键节点备注") or line.endswith("、严重卡顿分布") or line.endswith("、卡顿时长 Top 10") or line.endswith("、首帧耗时 Top 10") or line.endswith("、建议"):
paragraphs.append(
"<w:p><w:r><w:rPr><w:b/><w:sz w:val=\"28\"/></w:rPr>"
f"<w:t>{safe}</w:t></w:r></w:p>"
)
else:
text = safe.replace(" ", "</w:t><w:t xml:space=\"preserve\"> </w:t><w:t>")
paragraphs.append(f"<w:p><w:r><w:t>{text}</w:t></w:r></w:p>")
body = "".join(paragraphs) + "<w:sectPr><w:pgSz w:w=\"11906\" w:h=\"16838\"/><w:pgMar w:top=\"1440\" w:right=\"1440\" w:bottom=\"1440\" w:left=\"1440\"/></w:sectPr>"
return (
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"
"<w:document xmlns:wpc=\"http://schemas.microsoft.com/office/word/2010/wordprocessingCanvas\" "
"xmlns:mc=\"http://schemas.openxmlformats.org/markup-compatibility/2006\" "
"xmlns:o=\"urn:schemas-microsoft-com:office:office\" "
"xmlns:r=\"http://schemas.openxmlformats.org/officeDocument/2006/relationships\" "
"xmlns:m=\"http://schemas.openxmlformats.org/officeDocument/2006/math\" "
"xmlns:v=\"urn:schemas-microsoft-com:vml\" "
"xmlns:wp14=\"http://schemas.microsoft.com/office/word/2010/wordprocessingDrawing\" "
"xmlns:wp=\"http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing\" "
"xmlns:w10=\"urn:schemas-microsoft-com:office:word\" "
"xmlns:w=\"http://schemas.openxmlformats.org/wordprocessingml/2006/main\" "
"xmlns:w14=\"http://schemas.microsoft.com/office/word/2010/wordml\" "
"xmlns:wpg=\"http://schemas.microsoft.com/office/word/2010/wordprocessingGroup\" "
"xmlns:wpi=\"http://schemas.microsoft.com/office/word/2010/wordprocessingInk\" "
"xmlns:wne=\"http://schemas.microsoft.com/office/word/2006/wordml\" "
"xmlns:wps=\"http://schemas.microsoft.com/office/word/2010/wordprocessingShape\" "
"mc:Ignorable=\"w14 wp14\">"
f"<w:body>{body}</w:body></w:document>"
)
def write_minimal_docx(docx_path: Path, lines: list[str]) -> None:
content_types = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">
<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>
<Default Extension="xml" ContentType="application/xml"/>
<Override PartName="/word/document.xml" ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.document.main+xml"/>
<Override PartName="/docProps/core.xml" ContentType="application/vnd.openxmlformats-package.core-properties+xml"/>
<Override PartName="/docProps/app.xml" ContentType="application/vnd.openxmlformats-officedocument.extended-properties+xml"/>
</Types>"""
rels = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">
<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="word/document.xml"/>
<Relationship Id="rId2" Type="http://schemas.openxmlformats.org/package/2006/relationships/metadata/core-properties" Target="docProps/core.xml"/>
<Relationship Id="rId3" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/extended-properties" Target="docProps/app.xml"/>
</Relationships>"""
core = f"""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<cp:coreProperties xmlns:cp="http://schemas.openxmlformats.org/package/2006/metadata/core-properties"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dcterms="http://purl.org/dc/terms/"
xmlns:dcmitype="http://purl.org/dc/dcmitype/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<dc:title>阿里云视频卡顿排查数据分析报告</dc:title>
<dc:creator>Codex</dc:creator>
<cp:lastModifiedBy>Codex</cp:lastModifiedBy>
<dcterms:created xsi:type="dcterms:W3CDTF">{datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")}</dcterms:created>
<dcterms:modified xsi:type="dcterms:W3CDTF">{datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")}</dcterms:modified>
</cp:coreProperties>"""
app = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Properties xmlns="http://schemas.openxmlformats.org/officeDocument/2006/extended-properties"
xmlns:vt="http://schemas.openxmlformats.org/officeDocument/2006/docPropsVTypes">
<Application>Codex</Application>
</Properties>"""
document = build_docx_document_xml(lines)
with zipfile.ZipFile(docx_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
archive.writestr("[Content_Types].xml", content_types)
archive.writestr("_rels/.rels", rels)
archive.writestr("docProps/core.xml", core)
archive.writestr("docProps/app.xml", app)
archive.writestr("word/document.xml", document)
def main() -> None:
parser = argparse.ArgumentParser(description="Generate a DOCX report from a WeCom stutter workbook export.")
parser.add_argument("xlsx_path", help="Path to the exported .xlsx file")
parser.add_argument("--output-dir", default=None, help="Directory for generated files")
args = parser.parse_args()
xlsx_path = Path(args.xlsx_path).expanduser().resolve()
output_dir = (
Path(args.output_dir).expanduser().resolve()
if args.output_dir
else Path(__file__).resolve().parent.parent / ".outputs"
)
output_dir.mkdir(parents=True, exist_ok=True)
workbook = load_workbook(xlsx_path)
html_content = build_report(xlsx_path, workbook)
docx_lines = build_docx_lines(xlsx_path, workbook)
stem = xlsx_path.stem + "-分析报告"
html_path = output_dir / f"{stem}.html"
docx_path = output_dir / f"{stem}.docx"
html_path.write_text(html_content, encoding="utf-8")
write_minimal_docx(docx_path, docx_lines)
print(f"HTML: {html_path}")
print(f"DOCX: {docx_path}")
if __name__ == "__main__":
main()
FILE:agents/openai.yaml
interface:
display_name: "企微微盘网页版"
short_description: "登录企微网页端、导出文档表格、生成本地报告并回传微盘"
default_prompt: "使用 $wecom-drive-web 打开企业微信文档或微盘链接,先检查登录状态;如果需要登录,抓取二维码并发给用户,保持当前登录页会话存活。对在线文档、表格或文件优先走网页原生下载/导出,再在本地处理;如果用户需要分析报告,可导出 Excel 后生成 Word 报告,再通过最近页导入回企业微信微盘。"
FILE:references/wecom-web-notes.md
# 企业微信网页端备注
## 目录
1. 已知链接
2. 登录判断规则
3. 脚本输出
4. 安全操作建议
## 已知链接
- 登录起点:`https://doc.weixin.qq.com/home/recent`
- 兼容登录页:`https://work.weixin.qq.com/wework_admin/loginpage_wx`
- 首页:`https://work.weixin.qq.com/`
- 常见登录二维码 iframe 路径包含 `/wework_admin/wwqrlogin/`
如果用户已经给出了文件或文件夹链接,直接打开该链接,不要再从首页重新摸索入口。
## 登录判断规则
满足任一条件时,都视为需要登录:
- 当前 URL 是 `https://doc.weixin.qq.com/home/recent`,并且页面出现登录相关提示
- 当前 URL 包含 `/wework_admin/loginpage_wx`
- 当前 URL 包含 `/scenario/login.html`
- 页面可见文字包含 `企业微信扫码登录`、`扫码登录`、`企业身份登录` 或类似文案
- iframe 地址包含 `/wwqrlogin/` 或 `login_qrcode`
如果需要登录,优先从 iframe 中截取二维码;如果失败,就截取登录主体区域或当前整页。无论使用哪种方式,都要把截图发给用户。
二维码一旦发给用户,就继续保留当前登录页面,不要关闭标签页或重开新的登录页。
重新抓取二维码时,覆盖固定文件 `$SKILL_DIR/.outputs/wecom-login-qr.png`,不要生成一串新的时间戳文件。
如果页面同时出现 `请在桌面端确认登录`、`无法扫码` 或类似提示,就不要继续让用户扫码,而是改为提示用户到企业微信桌面端完成确认。
## 脚本输出
`scripts/wecom-drive-browser.mjs` 会输出 JSON,包含:
- `status`:`ready`、`login_required`、`desktop_confirm_required`、`login_timeout` 或 `error`
- `desktopConfirmationRequired`:是否命中“需要桌面端确认”的登录提示
- `loginHint`:给上层流程使用的登录提示语
- `targetUrl`:请求打开的链接
- `currentUrl`:跳转后的最终链接
- `title`:当前页面标题
- `qrPath`:二维码截图保存路径
- `page.links`:当前页面可见链接
- `page.editableElements`:页面中检测到的可编辑控件
- `page.textHints`:用于快速判断页面内容的短文本片段
用这些 JSON 字段判断是否要把二维码发给用户、是否要继续等待登录,以及是否可以进入页面操作。
如果 `desktopConfirmationRequired` 为真,优先提示用户去企业微信桌面端确认登录,不要继续走普通扫码分支。
## 安全操作建议
- 真正的读取和编辑步骤优先使用浏览器工具,这样可以保留交互会话,并在输入前检查 DOM。
- 登录检查、会话准备、二维码提取和页面摘要优先使用随附脚本。
- 登录页处理时优先保留原页面;如果脚本模式下要等待扫码,配合 `--keep-open` 使用。
- 对文件处理任务,默认先下载到本地,再在本地处理。
- 需要上传本地处理结果时,固定使用 `https://doc.weixin.qq.com/home/recent` 作为导入入口。
- 在做可能破坏内容的修改前,如果用户指令还不够精确,先重述并确认目标改动。
- 如果页面是富文本编辑器或类表格编辑器,输入前先检查当前焦点元素和附近工具栏标签。