@clawhub-jinqiu193-d48b8a58f3
Multi-Agent collaborative system for writing ultra-long feasibility study reports. Phase 0 Requirements → Phase 1 Planner Outline → Phase 2 Parallel Sub-Agen...
---
name: long-doc-agent
license: MIT
metadata:
version: "3.3.0"
category: document-generation
triggers:
- "write feasibility report"
- "write proposal"
- "multi-chapter"
- "parallel writing"
- "agent write document"
description: >
  Multi-Agent collaborative system for writing ultra-long feasibility study reports.
  Phase 0 Requirements → Phase 1 Planner Outline → Phase 2 Parallel Sub-Agent Writing →
  Phase 2.5 Cross-Chapter Consistency Review → Phase 3 Integrator Final Output (styled docx).
  Core files: integrate_report.py (integration/CLI), parallel_tracker.py (parallel progress tracking).
---
# Ultra-Long Feasibility Report Multi-Agent Collaborative Writing v3.3
## Changelog (v3.3)
- ✅ Table parsing fully fixed (`_flush_table` not calling `_parse_md_table` caused character-by-character splitting)
- ✅ Colorful chapter headings officially launched (H1 deep navy full-row band / H2 medium blue block / H3 light blue left bar)
- ✅ Styled tables launched (deep blue header + white text + alternating row colors)
- ✅ Key callout boxes launched (【关键】【注意】【优势】【风险】【数据】 color-coded cards)
- ✅ Cover style enhanced (tech-digital style deep ocean blue full-screen background + white text)
- ✅ Cover/Contents/Executive Summary title bars all colored
- ✅ Fixed `cover_style` integer vs string comparison preventing cover from applying
- ✅ Replaced `eval` with `RGBColor.from_string()` to avoid type errors
## Core Capabilities
- **Multi-Agent Parallel**: Up to 5 sub-agents writing concurrently, doubled efficiency
- **Incremental Updates**: Chapters with unchanged content are skipped, faster processing
- **Beautiful Formatting**: Auto-generated cover, table-style TOC, colorful chapter headings, key callout boxes, styled tables
- **Feishu RAG**: Auto-search Feishu knowledge base to supplement reference materials
- **6 Cover Styles**: Switch freely to suit different scenarios
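
The incremental-update capability can be illustrated with a minimal sketch. It assumes, as `compute_content_hash` in `integrate_report.py` does, that a chapter counts as unchanged when its MD5 with whitespace removed matches the cached value. The names `content_hash` and `changed_chapters` and the sample data are illustrative only.

```python
import hashlib
import re
from typing import Dict, List


def content_hash(text: str) -> str:
    """MD5 of the text with all whitespace removed, so reflowing is not a change."""
    normalized = re.sub(r"\s+", "", text)
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()


def changed_chapters(chapters: Dict[str, str], cached: Dict[str, str]) -> List[str]:
    """Return the ids of chapters whose current hash differs from the cached hash."""
    return [seq for seq, text in chapters.items() if cached.get(seq) != content_hash(text)]


# Hypothetical data: chapter 01 only differs in whitespace, chapter 02 is new.
cache = {"01": content_hash("Project background.\n")}
current = {"01": "Project   background.", "02": "Market analysis."}
print(changed_chapters(current, cache))
```

Only the chapters returned here would need to be converted again; deleting the cache file makes every chapter count as changed.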
---
## File Structure
```
skill_dir/
├── SKILL.md                     # This file
├── integrate_report.py          # Core engine: parse/integrate/CLI
├── parallel_tracker.py          # Parallel progress tracking
└── references/                  # Sub-process reference documents
    ├── phase0_guide.md          # Phase 0 requirements confirmation flow
    ├── phase1_guide.md          # Planner prompt template
    ├── phase2_guide.md          # Sub-Agent prompt template
    ├── table_format_guide.md    # Markdown table format specification
    └── bug_fix_guide.md         # Bug troubleshooting & forced rebuild
```
> **First-time setup**: Ensure the `F:/agent/chapters/` directory exists.
---
## Pipeline Routing
```
User Task
├─ First writing request ("I want to write xxx"/"help me write a feasibility report")
│     → Phase 0 Requirements → Phase 1 Planner
│
├─ Outline exists, request to start writing
│     → Phase 2 Parallel Sub-Agents
│
├─ A chapter needs modification
│     → Small change: directly edit F:/agent/chapters/0X-xxx.txt
│     → Large change: regenerate that chapter
│
├─ All chapters done, request docx generation
│     → Phase 2.5 Review → Phase 3 Integrator
│
├─ Independent small proposal (2~5 chapters, no dependency on existing chapters)
│     → Write Markdown directly → make_docx.py to generate styled docx
│     → See: references/bug_fix_guide.md "make_docx.py Mode"
│
└─ Just need to check progress/glossary/reference materials
      → Direct CLI commands
```
---
## Phase 0: Requirements Confirmation
Confirm 4 items in order; once all are confirmed, proceed to Phase 1:
1. **Writing Topic**: document type / audience / style / special constraints
2. **Background Information**: project background / construction goals / industry context
3. **Reference Materials** (most important):
   - A. Local file path
   - B. Feishu document (RAG search)
   - C. Paste content directly
   - D. Not provided for now
4. **Outline Confirmation**: After the planner outputs the outline, the user chooses A. Start / B. Adjust / C. Cancel
> More reference materials → more business-aligned content. See `references/phase0_guide.md` for details.
---
## Phase 1: Planner
**Input**: Phase 0 topic / background / reference materials
**Execute**:
```bash
python integrate_report.py glossary
```
Auto-generates `plan.json` + `plan_outline_snapshot.md`
> Full prompt template in `references/phase1_guide.md`
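
As a rough illustration of what the planner output might contain, the sketch below builds a small plan using only the fields that `integrate_report.py` reads (`project_name`, `cover_style`, and per-chapter `seq`, `title`, `batch`, `word_count`, `dependencies`). The project name and chapter values are invented, `outline_lines` is a hypothetical helper, and the authoritative format is the one in `references/phase1_guide.md`.

```python
import json
from typing import Dict, List

# Hypothetical plan; all values are placeholders.
plan: Dict = {
    "project_name": "Example Smart Park Project",
    "cover_style": 4,
    "chapters": [
        {"seq": "01", "title": "Project Overview", "batch": "A",
         "word_count": 3000, "dependencies": []},
        {"seq": "02", "title": "Market Analysis", "batch": "A",
         "word_count": 4000, "dependencies": ["01"]},
    ],
}


def outline_lines(plan: Dict) -> List[str]:
    """Render one summary line per chapter, similar in spirit to the outline snapshot."""
    lines = [f"Project: {plan.get('project_name', '')}"]
    for ch in plan.get("chapters", []):
        lines.append(
            f"Chapter {ch.get('seq', '?')} | {ch.get('title', '')} | "
            f"Batch {ch.get('batch', '')} | ~{ch.get('word_count', 0)} words"
        )
    return lines


# ensure_ascii=False keeps Chinese titles readable in the JSON file.
text = json.dumps(plan, ensure_ascii=False, indent=2)
print("\n".join(outline_lines(plan)))
```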
**After completion, send WeChat notification** (using `message` tool, channel=`openclaw-weixin`):
```
📋 Report Outline Generated
📌 《[Report Topic]》
📊 Chapters: [X] chapters
🔍 Industry: [Industry Field]
✅ Reply "start writing" once the outline is confirmed — the system will launch parallel creation!
```
---
## Phase 2: Parallel Sub-Agents
**Execution flow** (fully automatic, no manual confirmation):
1. Display outline / current batch status (display only, no waiting)
2. `python parallel_tracker.py clear` to clear previous batch state
3. Start up to 5 concurrent sub-agents (`sessions_spawn`), automatically execute all batches
4. `python parallel_tracker.py wait` to monitor in background until this batch is complete
5. After completion, automatically run `python integrate_report.py convert-batch`
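
The batching in step 3 can be sketched as follows. This is only an illustration of grouping chapters by batch label and capping each wave at 5; `plan_waves` is a hypothetical helper, and the actual spawning is done by the `sessions_spawn` tool, which is not modelled here.

```python
from typing import Dict, List

MAX_CONCURRENT = 5  # concurrency cap stated in the flow above


def plan_waves(chapters: List[Dict], limit: int = MAX_CONCURRENT) -> List[List[str]]:
    """Group chapters by batch label, then split each batch into waves of at most `limit`."""
    by_batch: Dict[str, List[str]] = {}
    for ch in chapters:
        by_batch.setdefault(ch["batch"], []).append(ch["seq"])
    waves: List[List[str]] = []
    for label in sorted(by_batch):
        seqs = by_batch[label]
        for i in range(0, len(seqs), limit):
            waves.append(seqs[i:i + limit])
    return waves


# Hypothetical outline: chapters 01-07 in batch A, 08-09 in batch B.
chapters = [{"seq": f"{n:02d}", "batch": "A" if n <= 7 else "B"} for n in range(1, 10)]
for wave in plan_waves(chapters):
    print(wave)
```

Each wave would be written to completion (step 4) before the next one starts.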
**Sub-Agent prompt template**: see `references/phase2_guide.md`
**After each batch completes, send WeChat notification** (using `message` tool, channel=`openclaw-weixin`):
```
✅ Batch [X] Chapter Writing Complete!
📖 Completed: [Done]/[Total] chapters
📝 This batch:
• [Chapter 1 Title]
• [Chapter 2 Title]
• [Chapter 3 Title] (if any)
⏳ Next batch: [Next batch chapter list]
(Automatically proceeds to next batch, no manual confirmation needed)
```
**Modifying a chapter after it has been written**:
- Small change: directly edit `F:/agent/chapters/0X-xxx.txt`, save and regenerate
- Large change: re-trigger the sub-agent to rewrite the chapter, replacing the original file
---
## Phase 2.5: Cross-Chapter Consistency Review
```bash
python integrate_report.py check
```
Reviews numerical indicator consistency and terminology uniformity (checked against `glossary.json`)
**After review completes, send WeChat notification** (using `message` tool, channel=`openclaw-weixin`):
```
🔍 Consistency Review Complete
✅ Terminology uniformity: OK
✅ Numerical indicators: consistent
✅ Cross-chapter references: no conflicts
📄 Proceeding to final integration phase...
```
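
One way to picture a numerical consistency check is to track named indicators and compare the value each chapter states for them. The sketch below is an illustration under that assumption, not the logic of `integrate_report.py check`. The indicator list, the helper names and the sample sentences are hypothetical.

```python
import re
from typing import Dict, List, Tuple

# Longer units first so that '万元/年' is not cut short at '万元'.
UNITS = r"(?:万元/年|万元|人/日|人|台|套|个|次|年|月|天|%)"


def find_indicator_values(text: str, indicators: List[str]) -> Dict[str, str]:
    """For each indicator name, capture the first number+unit within a few characters after it."""
    found: Dict[str, str] = {}
    for name in indicators:
        m = re.search(re.escape(name) + r"[^\d\n]{0,6}?(\d+(?:\.\d+)?)\s*(" + UNITS + ")", text)
        if m:
            found[name] = m.group(1) + m.group(2)
    return found


def find_conflicts(chapters: Dict[str, str],
                   indicators: List[str]) -> List[Tuple[str, str, str, str, str]]:
    """Return (indicator, first_chapter, first_value, chapter, value) for each mismatch."""
    seen: Dict[str, Tuple[str, str]] = {}
    conflicts = []
    for seq, text in chapters.items():
        for name, value in find_indicator_values(text, indicators).items():
            if name in seen and seen[name][1] != value:
                conflicts.append((name, seen[name][0], seen[name][1], seq, value))
            seen.setdefault(name, (seq, value))
    return conflicts


# Hypothetical chapter excerpts: total investment differs, construction period agrees.
chapters = {
    "01": "本项目总投资为5000万元,建设周期12月。",
    "03": "项目总投资约5200万元,建设周期12月。",
}
print(find_conflicts(chapters, ["总投资", "建设周期"]))
```

Keying on the indicator name rather than on the number itself is what lets two different values for the same indicator show up as a conflict.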
---
## Phase 3: Integrator Summary
```bash
python integrate_report.py
```
Auto-completes: parse chapters (error isolation) → update glossary → consistency review → generate styled docx
**After final completion, send WeChat notification** (using `message` tool, channel=`openclaw-weixin`):
```
🎉🎉🎉 Report Writing Complete! 🎉🎉🎉
📄 《[Report Topic]》
📊 Scale: [X] chapters / ~[Y] thousand characters
🎨 Cover Style: [Style Name]
✅ Styled report generated!
📁 File location: F:/agent/chapters/output/
Wenxin, full text ready for your review~
```
---
## Document Beautification Features (Auto-Applied)
Generated reports automatically include the following formatting effects (only the cover varies; it is selected via the `cover_style` field in `plan.json`):
1. **6 Cover Styles** — Edit `plan.json` → `cover_style` field (integer 1~6)
2. **Executive Summary** — Deep blue title bar (`#1F4E79`) background + white text + body indent
3. **Table-Style TOC** — Deep blue title bar + three-column entries (number/chapter/page)
4. **Colorful Chapter Headings**:
   - H1: Full-row deep navy background `#1F4E79` + white text Microsoft YaHei
   - H2: Medium blue background `#2E75B6` + white text
   - H3: Light blue background `#D6E4F0` + dark blue text + `▌` left bar
5. **Key Callout Boxes** — Auto-detect 【关键】【注意】【优势】【风险】【数据】 tags, render as color cards (background/white text/border)
6. **Styled Tables** — Header deep navy background `#1F4E79` + white text + alternating row colors (`#DEEAF6` / `#FFFFFF`)
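
How callout tags might be detected can be sketched with a small parser. This assumes the tag appears at the start of a line. The English kind names and the function `parse_callout` are hypothetical, and the actual card rendering is not shown.

```python
import re
from typing import Optional, Tuple

# Tag text from the list above mapped to an illustrative kind name (assumed labels).
CALLOUT_TAGS = {"关键": "key", "注意": "note", "优势": "advantage", "风险": "risk", "数据": "data"}
_CALLOUT_RE = re.compile(r"^【(" + "|".join(CALLOUT_TAGS) + r")】\s*(.*)$")


def parse_callout(line: str) -> Optional[Tuple[str, str]]:
    """Return (kind, body) if the line starts with a known callout tag, else None."""
    m = _CALLOUT_RE.match(line.strip())
    if not m:
        return None
    return CALLOUT_TAGS[m.group(1)], m.group(2)


print(parse_callout("【风险】供应链存在不确定性"))
print(parse_callout("普通段落"))
```

A renderer could then pick the card colors from the returned kind.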
---
## Cover Styles (6 Types)
Cover style specified via `cover_style` field in `plan.json` (integer, 1~6):
| # | Style Name | Features | Recommended For |
|---|------------|----------|-----------------|
| 1 | Classic Government | Deep navy top bar + gold accents | Government/state enterprise approval |
| 2 | Modern Minimalist | Left blue heavy block + right info | Tech/business reports |
| 3 | Business Elegant | Burgundy + centered progression | Consulting/investment bank reports |
| 4 | Tech Digital | Deep ocean blue fill + large white title | Internet/digital projects |
| 5 | Chinese Traditional | Forbidden City red + rice paper cream background | Traditional culture/state enterprise |
| 6 | Full Immersive | Deep ocean blue fill + large white title | Digital/tech projects |
> **Note**: the `cover_style` value is an integer (e.g., `4`); the code converts it to a string before comparison.
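
The integer-versus-string pitfall behind that note can be shown in a few lines. The helper `normalize_cover_style` is a hypothetical sketch of the normalization, assuming the six styles are compared as the strings `'1'` to `'6'`.

```python
def normalize_cover_style(raw, default: str = "1") -> str:
    """Return cover_style as a string in '1'..'6', falling back to `default`."""
    value = str(raw).strip()
    return value if value in {"1", "2", "3", "4", "5", "6"} else default


# An integer from plan.json never equals a string literal unless it is converted first.
print(4 == "4")
print(normalize_cover_style(4) == "4")
```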
---
## CLI Command Reference
| Command | Description |
|---------|-------------|
| `python integrate_report.py` | Generate integrated report (full) |
| `python integrate_report.py convert-batch` | Batch convert to docx |
| `python integrate_report.py convert-one <in> <out>` | Single chapter to docx |
| `python integrate_report.py check` | Consistency review |
| `python integrate_report.py glossary` | Glossary generation/update |
| `python integrate_report.py ref show` | View reference materials |
| `python integrate_report.py ref clear` | Clear reference materials |
| `python integrate_report.py preview [chapter prefix]` | Preview chapter summary |
| `python integrate_report.py feishu-search <query>` | Search Feishu knowledge base |
| `python parallel_tracker.py show` | View writing progress |
| `python parallel_tracker.py wait` | Block & monitor (Ctrl+C to stop) |
| `python parallel_tracker.py clear` | Clear tracking state |
> **Switching cover style**: Edit `cover_style` field (integer 1~6) in `F:/agent/chapters/plan.json`, then regenerate.
> After modifying code: delete `.pyc` files under `__pycache__` + `content_hashes.json` to force rebuild.
---
## State Files
| File | Description |
|------|-------------|
| `F:/agent/chapters/plan.json` | Chapter metadata |
| `F:/agent/chapters/glossary.json` | Terminology table |
| `F:/agent/chapters/reference_material.txt` | Raw reference materials |
| `F:/agent/chapters/plan_outline_snapshot.md` | Outline snapshot |
| `F:/agent/chapters/content_hashes.json` | Incremental cache (delete to force rebuild) |
| `F:/agent/chapters/writing_tracker.json` | Parallel progress tracking |
| `F:/agent/chapters/config.json` | Cover style and other config |
---
## Critical Rules
### Markdown Table Format (Sub-Agents Must Follow)
See `references/table_format_guide.md` for full spec
Key points:
- Separator row must be `|---|---|---|` (leading/trailing `|` required)
- All rows must have same column count as header
- Cell content should avoid containing `|` (use `~` or `-` for ranges)
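
The key points above could be checked before a chapter is accepted. The following validator is a minimal sketch under the assumption that a table is passed in as a list of its lines; `validate_md_table` is a hypothetical helper and not part of `integrate_report.py`.

```python
import re
from typing import List


def _cells(row: str) -> List[str]:
    """Split a table row into cells, dropping one leading and one trailing pipe."""
    s = row.strip()
    if s.startswith("|"):
        s = s[1:]
    if s.endswith("|"):
        s = s[:-1]
    return [c.strip() for c in s.split("|")]


def validate_md_table(lines: List[str]) -> List[str]:
    """Return a list of problems; an empty list means the table follows the key points."""
    if len(lines) < 2:
        return ["table needs a header row and a separator row"]
    problems = []
    for n, line in enumerate(lines, start=1):
        s = line.strip()
        if not (s.startswith("|") and s.endswith("|")):
            problems.append(f"row {n}: missing leading or trailing '|'")
    if not all(re.fullmatch(r":?-+:?", c) for c in _cells(lines[1])):
        problems.append("row 2: separator cells must look like '---'")
    header_cols = len(_cells(lines[0]))
    for n, line in enumerate(lines, start=1):
        found = len(_cells(line))
        if found != header_cols:
            problems.append(f"row {n}: expected {header_cols} columns, found {found}")
    return problems


good = ["| Item | Cost |", "|---|---|", "| Server | 10~20 |"]
bad = ["| Item | Cost |", "|---|---|", "| Server | 10 | 20 |"]
print(validate_md_table(good))
print(validate_md_table(bad))
```

The `bad` table shows why a `|` inside a cell is a problem: it is indistinguishable from an extra column.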
### Force Rebuild (Must Do Both Steps After Code Changes)
After modifying `integrate_report.py`, must delete both files for new code to take effect:
```bash
# 1. Delete .pyc cache (required after code changes)
del "C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\long-doc-agent\__pycache__\integrate_report.cpython-311.pyc"
# 2. Delete incremental hash (or incremental mode skips everything)
del F:\agent\chapters\content_hashes.json
# 3. Regenerate
python integrate_report.py
```
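
The same two deletions can be done from Python, which avoids shell differences. This is a sketch, not part of the skill: `force_rebuild_cleanup` is a hypothetical helper, and both directories are passed in by the caller rather than hard-coded.

```python
from pathlib import Path
from typing import List


def force_rebuild_cleanup(skill_dir: Path, chapters_dir: Path) -> List[Path]:
    """Delete the cached bytecode for integrate_report and the incremental hash file."""
    removed: List[Path] = []
    pycache = skill_dir / "__pycache__"
    if pycache.is_dir():
        # Matches e.g. integrate_report.cpython-311.pyc for any interpreter version
        for pyc in pycache.glob("integrate_report.*.pyc"):
            pyc.unlink()
            removed.append(pyc)
    hashes = chapters_dir / "content_hashes.json"
    if hashes.exists():
        hashes.unlink()
        removed.append(hashes)
    return removed


# Example call (paths are the ones used elsewhere in this document):
# force_rebuild_cleanup(Path(r"...\long-doc-agent"), Path("F:/agent/chapters"))
```

After the cleanup, `python integrate_report.py` regenerates everything from the current code.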
### Known Bugs Fixed (For Reference)
See `references/bug_fix_guide.md`, including:
- `_flush_table` not calling `_parse_md_table` causing character-by-character table splitting
- `cover_style` integer vs string comparison preventing cover from applying
- `eval` RGB color assignment type error
- `.pyc` cache causing new code to not take effect
- `RGBColor` using index access `rgb[0]/rgb[1]/rgb[2]` instead of `.red/.green/.blue`
- `add_cover()` setting `section.margin=0` causing body text to have no margins
- `PermissionError` when docx file is open in WPS → auto-add `_v2` suffix
- `write` tool has 50KB line limit → large scripts must be written in chunks
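
The `PermissionError` workaround in the list above can be sketched as a retry with a versioned file name. `save_with_fallback` is a hypothetical helper. It assumes the save callable takes a path, as `Document.save` in python-docx does, and raises `PermissionError` when the target file is locked.

```python
import os
from typing import Callable


def save_with_fallback(save: Callable[[str], None], path: str, max_tries: int = 5) -> str:
    """Call save(path); on PermissionError retry with _v2, _v3, ... and return the path used."""
    base, ext = os.path.splitext(path)
    candidate = path
    for n in range(2, max_tries + 2):
        try:
            save(candidate)
            return candidate
        except PermissionError:
            # The file is probably open in another program; try the next suffix
            candidate = f"{base}_v{n}{ext}"
    raise PermissionError(f"could not save {path} or any fallback name")


# Simulated lock: the first target is "open in WPS", so the _v2 name is used.
locked = {"report.docx"}


def fake_save(p: str) -> None:
    if p in locked:
        raise PermissionError(p)


print(save_with_fallback(fake_save, "report.docx"))
```

With a real document the call would look like `save_with_fallback(doc.save, output_path)`.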
---
## References
| File | Content |
|------|---------|
| `references/phase0_guide.md` | Phase 0 requirements confirmation full flow & scripts |
| `references/phase1_guide.md` | Planner full prompt template & plan.json format |
| `references/phase2_guide.md` | Sub-Agent full prompt template (incl. table format warnings) |
| `references/table_format_guide.md` | Markdown table format spec, common errors & examples |
| `references/bug_fix_guide.md` | Bug troubleshooting & forced rebuild procedures |
FILE:integrate_report.py
"""
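Integrated report generator v3
=========================
An incremental refinement of v2. New in this version:
- Phase 0 reference material management (reference_material.txt)
- Glossary generated up front (extracted from the reference materials)
- Outline snapshot mechanism (plan_outline_snapshot.md)
- Per-batch version snapshots (snapshot_batch_*.md)
- In-place single-chapter editing tool (inline_edit)
- Global configuration (config.json)
Self-contained design: the single-chapter conversion logic is embedded directly and does not depend on external scripts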
"""
from docx import Document
from docx.shared import Pt, Inches, Cm, Twips
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
import glob, os, re, subprocess, sys, json as json_module, shutil, hashlib
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Tuple, Optional, Any
# ============ Global configuration ============
CHAPTERS_DIR = 'F:/agent/chapters'
PLAN_FILE = CHAPTERS_DIR + '/plan.json'
PROGRESS_FILE = CHAPTERS_DIR + '/progress.json'
GLOSSARY_FILE = CHAPTERS_DIR + '/glossary.json'
REFERENCE_FILE = CHAPTERS_DIR + '/reference_material.txt'
OUTLINE_SNAPSHOT = CHAPTERS_DIR + '/plan_outline_snapshot.md'
CONFIG_FILE = CHAPTERS_DIR + '/config.json'
FINAL_DOC = 'F:/agent/整合报告.docx'
CHARS_PER_PAGE = 950
HASH_FILE = CHAPTERS_DIR + '/content_hashes.json'  # Incremental updates: per-chapter content hashes
MERMAID_TEMP = CHAPTERS_DIR + '/mermaid_temp'  # Temporary directory for Mermaid rendering
# Playwright Chromium configuration (used only by mmdc)
MERMAID_PUPPETEER_CONFIG = CHAPTERS_DIR + '/mermaid_temp/puppeteer_config.json'
# ============ Incremental updates: content hash ============
def compute_content_hash(content: str) -> str:
    """Compute the MD5 of the content, ignoring whitespace differences."""
    normalized = re.sub(r'\s+', '', content.strip())
    return hashlib.md5(normalized.encode('utf-8')).hexdigest()

def load_hashes() -> Dict[str, str]:
    if os.path.exists(HASH_FILE):
        try:
            with open(HASH_FILE, 'r', encoding='utf-8') as f:
                return json_module.load(f)
        except Exception:
            pass
    return {}

def save_hashes(hashes: Dict[str, str]):
    with open(HASH_FILE, 'w', encoding='utf-8') as f:
        json_module.dump(hashes, f, ensure_ascii=False, indent=2)

def get_changed_chapters(chapters_data: List[Tuple], hashes: Dict[str, str]) -> List[Tuple]:
    """Return the chapters whose content actually changed (the basis for incremental updates)."""
    changed = []
    for item in chapters_data:
        seq = item[0]
        content = item[3]
        new_hash = compute_content_hash(content)
        if hashes.get(seq) != new_hash:
            changed.append(item)
    return changed
# ============ Mermaid diagram rendering ============
def ensure_mermaid_deps():
    """Probe for a usable mermaid CLI and return how to invoke it (None if unavailable)."""
    local_cli = os.path.normpath(
        r'E:\lonb\LobsterAI\node_modules\@mermaid-js\mermaid-cli\src\cli.js')
    # Each candidate: (value returned on success, probe command).
    # The local cli.js path is returned as-is; render_mermaid_image runs it through node.
    candidates = [
        (local_cli, [local_cli, '--version']),
        (local_cli, ['node', local_cli, '--version']),
        ('mmdc', ['mmdc', '--version']),
        ('npx mmdc', ['npx', 'mmdc', '--version']),
    ]
    for cli, cmd in candidates:
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            if result.returncode == 0:
                return cli
        except Exception:
            continue
    return None

MERMAID_CLI = ensure_mermaid_deps()
def render_mermaid_image(code: str, out_path: str, cli: str = None) -> bool:
    """
    Render a mermaid code block to PNG by calling the mermaid CLI.
    cli: an invocation such as 'mmdc' or 'npx mmdc', or a path to cli.js
    Returns: whether the output image exists afterwards
    """
    if cli is None:
        cli = MERMAID_CLI
    if cli is None:
        return False
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    # Write the diagram source to a temporary input file
    tmp_input = os.path.join(CHAPTERS_DIR, '_mermaid_tmp.mmd')
    with open(tmp_input, 'w', encoding='utf-8') as f:
        f.write(code)
    try:
        # A .js entry point has to be run through node
        if cli.endswith('.js'):
            cmd = ['node', cli, '-i', tmp_input, '-o', out_path]
        else:
            cmd = cli.split() + ['-i', tmp_input, '-o', out_path]
        # Pass the Playwright Chromium configuration when it exists
        if os.path.exists(MERMAID_PUPPETEER_CONFIG):
            cmd += ['-p', MERMAID_PUPPETEER_CONFIG]
        subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        return os.path.exists(out_path)
    except Exception:
        return False
    finally:
        if os.path.exists(tmp_input):
            os.remove(tmp_input)
def process_mermaid_blocks(content: str) -> Tuple[str, List[str]]:
    """
    Detect and render the mermaid code blocks in content.
    Returns: (processed_content, list_of_rendered_image_paths), images in document order.
    On render failure the original code block is kept and followed by a comment
    asking for the diagram to be replaced manually.
    """
    rendered_images = []
    mermaid_blocks = list(re.finditer(r'```mermaid\n(.*?)```', content, re.DOTALL))
    if not mermaid_blocks:
        return content, []
    processed = content
    # Walk the blocks back to front so the offsets of earlier matches stay valid
    for block_idx in range(len(mermaid_blocks) - 1, -1, -1):
        m = mermaid_blocks[block_idx]
        code = m.group(1).strip()
        # File name is unique per block within this content
        img_name = f'mermaid_{block_idx:03d}.png'
        img_path = os.path.join(MERMAID_TEMP, img_name)
        success = False
        if MERMAID_CLI:
            success = render_mermaid_image(code, img_path, MERMAID_CLI)
        if success:
            rendered_images.append(img_path)
            replacement = f'\n[Mermaid图表已渲染,见附件: {img_name}]\n'
        else:
            replacement = (
                f'\n```mermaid\n{code}\n```\n\n'
                f'<!-- ⚠️ Mermaid图表(渲染工具mmdc未安装或渲染失败,'
                f'请在支持Mermaid的编辑器中查看,或手动替换为图片) -->\n'
            )
        # Rebuild the content with the replacement text
        processed = processed[:m.start()] + replacement + processed[m.end():]
    # Blocks were handled last to first; return the images in document order
    rendered_images.reverse()
    return processed, rendered_images
# ============ Word TOC field generation ============
NSMAP = 'xmlns:wpc="http://schemas.microsoft.com/office/word/2010/wordprocessingCanvas" ' \
'xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" ' \
'xmlns:o="urn:schemas-microsoft-com:office:office" ' \
'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships" ' \
'xmlns:m="http://schemas.openxmlformats.org/officeDocument/2006/math" ' \
'xmlns:v="urn:schemas-microsoft-com:vml" ' \
'xmlns:wp14="http://schemas.microsoft.com/office/word/2010/wordprocessingDrawing" ' \
'xmlns:wp="http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing" ' \
'xmlns:w10="urn:schemas-microsoft-com:office:word" ' \
'xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main" ' \
'xmlns:w14="http://schemas.microsoft.com/office/word/2010/wordml" ' \
'xmlns:wpg="http://schemas.microsoft.com/office/word/2010/wordprocessingGroup" ' \
'xmlns:wpi="http://schemas.microsoft.com/office/word/2010/wordprocessingInk" ' \
'xmlns:wne="http://schemas.microsoft.com/office/word/2006/wordml" ' \
'xmlns:wps="http://schemas.microsoft.com/office/word/2010/wordprocessingShape"'
def _make_bookmark_start(bookmark_id: int, bookmark_name: str) -> OxmlElement:
el = OxmlElement('w:bookmarkStart')
el.set(qn('w:id'), str(bookmark_id))
el.set(qn('w:name'), bookmark_name)
return el
def _make_bookmark_end(bookmark_id: int) -> OxmlElement:
el = OxmlElement('w:bookmarkEnd')
el.set(qn('w:id'), str(bookmark_id))
return el
def add_toc_entry(doc, seq: str, title: str, page_num: int, toc_type: str = 'chapter'):
    """
    Generate a Word TOC entry (using FORMTEXT + PAGEREF fields).
    seq: chapter number, e.g. "一" or "第一章"
    toc_type: 'summary' (executive summary) | 'chapter' (chapter)
    """
    bm_id = 100 + hash(title) % 1000
    p = doc.add_paragraph()
    p.paragraph_format.line_spacing = Pt(22)
    p.paragraph_format.space_after = Pt(4)
    if toc_type == 'summary':
        # Executive summary: plain text, no hyperlink
        p.paragraph_format.first_line_indent = Cm(-0.74)
        r = p.add_run(seq + ' ' + title)
        r.font.size = Pt(12)
        cjk(r, '宋体')
        return
    # ---- Chapter TOC entry: hyperlink + tab + page-number field ----
    prefix = seq + ' '
    p.paragraph_format.first_line_indent = Cm(-0.74)
    # Prefix text
    r_prefix = p.add_run(prefix)
    r_prefix.font.size = Pt(12)
    cjk(r_prefix, '宋体')
    # Hyperlink (points at this chapter's bookmark)
    bookmark_name = f'_Toc_{bm_id}'
    run = p.add_run()
    run.font.size = Pt(12)
    cjk(run, '宋体')
    # Insert the FORMTEXT field (displays the title)
    fld_char_begin = OxmlElement('w:fldChar')
    fld_char_begin.set(qn('w:fldCharType'), 'begin')
    run._r.append(fld_char_begin)
    instr_text = OxmlElement('w:instrText')
    instr_text.text = f' FORMTEXT '
    run._r.append(instr_text)
    fld_char_end = OxmlElement('w:fldChar')
    fld_char_end.set(qn('w:fldCharType'), 'end')
    run._r.append(fld_char_end)
    # Insert a tab + PAGEREF field (displays the page number)
    tab = OxmlElement('w:tab')
    tab.set(qn('w:val'), 'right')
    p._p.append(tab)
    tab_char = OxmlElement('w:tabChar')
    tab_char.set(qn('w:val'), 'right')
    p._p.append(tab_char)
    run_page = p.add_run()
    run_page.font.size = Pt(12)
    cjk(run_page, '宋体')
    # PAGEREF field
    fld_char_begin2 = OxmlElement('w:fldChar')
    fld_char_begin2.set(qn('w:fldCharType'), 'begin')
    run_page._r.append(fld_char_begin2)
    instr_text2 = OxmlElement('w:instrText')
    instr_text2.text = f' PAGEREF {bookmark_name} \\h '
    run_page._r.append(instr_text2)
    fld_char_end2 = OxmlElement('w:fldChar')
    fld_char_end2.set(qn('w:fldCharType'), 'end')
    run_page._r.append(fld_char_end2)
    # Add the bookmark (referenced by PAGEREF)
    p._p.insert(0, _make_bookmark_start(bm_id, bookmark_name))
    p._p.append(_make_bookmark_end(bm_id))
    return bm_id, bookmark_name
# ============ Configuration read/write ============
def load_config() -> Dict[str, Any]:
    if os.path.exists(CONFIG_FILE):
        try:
            with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
                return json_module.load(f)
        except Exception:
            pass
    return {"project_name": "", "topic": "", "audience": "", "doc_type": "可行性研究报告", "style": "专业严谨", "custom_constraints": ""}

def save_config(cfg: Dict[str, Any]):
    with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
        json_module.dump(cfg, f, ensure_ascii=False, indent=2)

def load_plan() -> Dict[str, Any]:
    if os.path.exists(PLAN_FILE):
        try:
            with open(PLAN_FILE, 'r', encoding='utf-8') as f:
                return json_module.load(f)
        except Exception:
            pass
    return make_default_plan()

def make_default_plan() -> Dict[str, Any]:
    return {"project_name": "", "chapters": []}

def save_plan(plan: Dict[str, Any]):
    with open(PLAN_FILE, 'w', encoding='utf-8') as f:
        json_module.dump(plan, f, ensure_ascii=False, indent=2)
# ============ Reference material management ============
def load_reference() -> str:
    """Load the reference materials."""
    if os.path.exists(REFERENCE_FILE):
        try:
            with open(REFERENCE_FILE, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception:
            pass
    return ""

def save_reference(text: str):
    """Save the reference materials."""
    with open(REFERENCE_FILE, 'w', encoding='utf-8') as f:
        f.write(text)
    print(f"[REF] 参考资料已保存,共 {len(text)} 字符")

def extract_terms_from_reference(text: str, max_terms=80) -> List[Dict[str, Any]]:
    """
    Extract terms (domain vocabulary) from the reference materials.
    Strategy: take runs of 4 or more Chinese characters that occur at least twice,
    filtering out stopwords.
    """
    if not text:
        return []
    stopwords = {
        '以及', '包括', '可以', '通过', '根据', '按照', '为了', '由于', '其中',
        '其他', '相关', '以上', '以下', '对于', '并且', '或者', '等等',
        '本项目', '本公司', '本系统', '本章', '本节', '本文', '本案',
        '进行', '完成', '实现', '提供', '使用', '管理', '系统', '建设',
        '方案', '项目', '数据', '平台', '技术', '功能', '模块'
    }
    # Extract runs of Chinese characters
    pattern = re.compile(r'[\u4e00-\u9fff]{4,}')
    candidates = pattern.findall(text)
    # Count occurrences
    freq: Dict[str, int] = {}
    for w in candidates:
        if w not in stopwords and len(w) >= 4:
            freq[w] = freq.get(w, 0) + 1
    # Keep terms that occur at least twice
    filtered = {w: c for w, c in freq.items() if c >= 2}
    sorted_terms = sorted(filtered.items(), key=lambda x: -x[1])[:max_terms]
    return [{"term": t, "count": c, "source": "reference"} for t, c in sorted_terms]

def build_reference_summary(text: str, max_chars=3000) -> str:
    """Build a summary of the reference materials (for use by sub-agents)."""
    if not text:
        return ""
    # Keep the first max_chars characters
    summary = text[:max_chars]
    if len(text) > max_chars:
        # Append up to the last 1000 characters without repeating the head
        tail_start = max(max_chars, len(text) - 1000)
        summary += f"\n\n[...参考资料共 {len(text)} 字符,此处省略中间部分...]\n\n" + text[tail_start:]
    return summary
# ============ Font helper ============
def cjk(run, name):
    """Set the East Asian font of a run."""
    r = run._element
    rPr = r.get_or_add_rPr()
    rFonts = rPr.find(qn('w:rFonts'))
    if rFonts is None:
        rFonts = OxmlElement('w:rFonts')
        rPr.insert(0, rFonts)
    rFonts.set(qn('w:eastAsia'), name)
# ============ Markdown → docx table helpers ============
def _clean_inline(text):
    """Strip inline markdown markers."""
    text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
    text = re.sub(r'\*(.+?)\*', r'\1', text)
    text = re.sub(r'`(.+?)`', r'\1', text)
    return text.strip()

def _is_table_line(line):
    stripped = line.strip()
    return stripped.startswith('|') and stripped.endswith('|')

def _is_separator_line(line):
    """Return True for a markdown table separator row (e.g. |----|----|)."""
    stripped = line.strip().strip('|')
    # Require at least one dash so a row of empty cells is not mistaken for a separator
    return '-' in stripped and bool(re.match(r'^[\s\-:.|]+$', stripped))

def _parse_md_table(rows):
    """Parse a list of markdown table lines into a 2D array of strings."""
    result = []
    for line in rows:
        stripped = line.strip().strip('|')
        cols = stripped.split('|')
        result.append([_clean_inline(c.strip()) for c in cols])
    return result

def _add_table_to_doc(doc, rows):
    """Write a parsed table (list of rows, each a list of cell strings) into the docx."""
    if not rows:
        return
    col_count = max(len(r) for r in rows)
    col_count = max(col_count, 1)
    tbl = doc.add_table(rows=len(rows), cols=col_count)
    tbl.style = 'Table Grid'
    for r_idx, row_data in enumerate(rows):
        cells = tbl.rows[r_idx].cells
        actual = len(cells)
        for c_idx in range(actual):
            text = row_data[c_idx] if c_idx < len(row_data) else ''
            cells[c_idx].text = text
            for para in cells[c_idx].paragraphs:
                for run in para.runs:
                    run.font.name = '宋体'
                    run.font.size = Pt(10)
                    run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')
                para.paragraph_format.space_before = Pt(2)
                para.paragraph_format.space_after = Pt(2)

def _flush_table(doc, pending_table):
    """Write the buffered table lines into doc, then clear the buffer."""
    if pending_table:
        # Parse the raw markdown lines into cells first; passing the raw strings
        # would make each character be treated as a separate cell
        _add_table_to_doc(doc, _parse_md_table(pending_table))
        pending_table.clear()
def _write_para(doc, line, font='宋体', size=12, bold=False,
                first_indent=Cm(0.74), before=Pt(2), after=Pt(6),
                alignment=None, is_heading=False):
    """Write a body paragraph (single wrapper so it can be reused in several places)."""
    p = doc.add_paragraph()
    if alignment is not None:
        p.alignment = alignment
    p.paragraph_format.first_line_indent = first_indent
    p.paragraph_format.line_spacing = Pt(22)
    p.paragraph_format.space_before = before
    p.paragraph_format.space_after = after
    r = p.add_run(_clean_inline(line))
    r.font.size = Pt(size)
    r.font.bold = bold
    cjk(r, font)
    return p
# ============ Markdown → docx ============
def md_to_paragraphs(doc, content, add_page_break=True):
    """Write markdown content into the docx, handling tables and Mermaid diagrams."""
    # --- Mermaid preprocessing ---
    processed_content, rendered_images = process_mermaid_blocks(content)
    # Map each rendered image's file name to its path; the placeholder line carries the file name
    mermaid_img_map = {os.path.basename(p): p for p in rendered_images}
    lines = processed_content.split('\n')
    i = 0
    pending_table = []
    while i < len(lines):
        line = lines[i].rstrip()
        i += 1
        # --- Insert an already-rendered Mermaid image ---
        marker = re.search(r'\[Mermaid图表已渲染,见附件:\s*(\S+)\]', line)
        if marker:
            img_path = mermaid_img_map.get(marker.group(1))
            if img_path and os.path.exists(img_path):
                _flush_table(doc, pending_table)
                try:
                    p = doc.add_paragraph()
                    run = p.add_run()
                    run.add_picture(img_path, width=Inches(5.5))
                except Exception:
                    # Fall back to a text notice if the picture cannot be inserted
                    p = doc.add_paragraph()
                    r = p.add_run(line + ' [图片渲染失败]')
                    r.font.size = Pt(10); cjk(r, '宋体')
                continue
        if not line.strip():
            _flush_table(doc, pending_table)
            continue
        if line.startswith('# '):
            _flush_table(doc, pending_table)
            p = doc.add_paragraph()
            p.alignment = WD_ALIGN_PARAGRAPH.CENTER
            p.paragraph_format.space_before = Pt(12)
            p.paragraph_format.space_after = Pt(10)
            r = p.add_run(_clean_inline(line[2:]))
            r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体')
            continue
        if line.startswith('## '):
            _flush_table(doc, pending_table)
            p = doc.add_paragraph()
            p.paragraph_format.space_before = Pt(10); p.paragraph_format.space_after = Pt(6)
            r = p.add_run(_clean_inline(line[3:]))
            r.font.size = Pt(14); r.font.bold = True; cjk(r, '楷体')
            continue
        if line.startswith('### '):
            _flush_table(doc, pending_table)
            p = doc.add_paragraph()
            p.paragraph_format.space_before = Pt(8); p.paragraph_format.space_after = Pt(4)
            r = p.add_run(_clean_inline(line[4:]))
            r.font.size = Pt(12); r.font.bold = True; cjk(r, '仿宋')
            continue
        if line.startswith('#### '):
            _flush_table(doc, pending_table)
            p = doc.add_paragraph()
            p.paragraph_format.space_before = Pt(6); p.paragraph_format.space_after = Pt(3)
            r = p.add_run(_clean_inline(line[5:]))
            r.font.size = Pt(11); r.font.bold = True; cjk(r, '仿宋')
            continue
        # Markdown table row: buffer it (separator rows are dropped)
        if _is_table_line(line):
            if not _is_separator_line(line):
                pending_table.append(line)
            continue
        # Non-table line: flush any buffered table, then write a body paragraph
        _flush_table(doc, pending_table)
        p = doc.add_paragraph()
        p.paragraph_format.first_line_indent = Cm(0.74)
        p.paragraph_format.line_spacing = Pt(22)
        p.paragraph_format.space_before = Pt(2)
        p.paragraph_format.space_after = Pt(6)
        r = p.add_run(_clean_inline(line))
        r.font.size = Pt(12); cjk(r, '宋体')
    # Flush a table that may remain at the end of the content
    _flush_table(doc, pending_table)
    if add_page_break:
        doc.add_page_break()
# ============ Chapter parsing (error isolation) ============
def safe_parse_chapter(fpath: str) -> Optional[Tuple]:
    """Parse one chapter file; return None instead of raising if it cannot be read."""
    fname = os.path.basename(fpath).replace('.txt', '')
    seq = fname.split('-')[0]
    try:
        with open(fpath, 'r', encoding='utf-8') as fp:
            content = fp.read()
    except Exception as e:
        print(f"[ERROR] 读取失败 {fname}: {e}")
        return None
    h2_entries = [l[3:].strip() for l in content.split('\n') if l.strip().startswith('## ')]
    # Title is the first H1 line, falling back to the file name
    title = fname
    for line in content.split('\n'):
        line = line.strip()
        if line.startswith('# '):
            title = line[2:].strip()
            break
    return (seq, fname, title, content, h2_entries)

def parse_chapters(txt_files: List[str]) -> List[Tuple]:
    seen_seq = set()
    chapters, errors = [], []
    for f in txt_files:
        seq = os.path.basename(f).replace('.txt', '').split('-')[0]
        # Only the first file for each chapter number is used
        if seq in seen_seq:
            continue
        result = safe_parse_chapter(f)
        if result is None:
            errors.append(os.path.basename(f))
            continue
        seen_seq.add(seq)
        chapters.append(result)
    if errors:
        print(f"[WARN] 以下章节解析失败(已跳过): {errors}")
    return chapters

# ============ Character count ============
def count_chars(text: str) -> int:
    """Count the non-whitespace characters in text."""
    return len([c for c in text if c.strip()])
# ============ Glossary generation (up-front version) ============
def generate_glossary(txt_files: List[str] = None, ref_text: str = "", max_terms=80) -> Dict[str, Any]:
    """Generate the glossary from the reference materials and the chapter contents."""
    all_terms: Dict[str, int] = {}
    # Extract from the reference materials
    if ref_text:
        ref_terms = extract_terms_from_reference(ref_text, max_terms)
        for item in ref_terms:
            all_terms[item['term']] = all_terms.get(item['term'], 0) + item['count']
    # Extract from the chapter contents
    if txt_files:
        stopwords = {'以及', '包括', '可以', '通过', '根据', '按照', '为了', '由于', '其中', '其他', '相关', '以上', '以下', '对于', '并且', '或者', '等等', '本项目', '本公司', '本系统'}
        pattern = re.compile(r'[\u4e00-\u9fff]{4,}')
        for f in txt_files:
            try:
                with open(f, 'r', encoding='utf-8') as fp:
                    content = fp.read()
                for w in pattern.findall(content):
                    if w not in stopwords and len(w) >= 4:
                        all_terms[w] = all_terms.get(w, 0) + 1
            except Exception:
                continue
    # Sort by frequency and keep the top max_terms
    sorted_terms = sorted(all_terms.items(), key=lambda x: -x[1])[:max_terms]
    glossary = {
        "generated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "total_ref_chars": len(ref_text),
        "terms": [{"term": t, "count": c} for t, c in sorted_terms]
    }
    with open(GLOSSARY_FILE, 'w', encoding='utf-8') as f:
        json_module.dump(glossary, f, ensure_ascii=False, indent=2)
    print(f"[GLOSSARY] 术语表已生成: {GLOSSARY_FILE}(共 {len(sorted_terms)} 个术语)")
    return glossary

def load_glossary() -> Dict[str, Any]:
    if os.path.exists(GLOSSARY_FILE):
        try:
            with open(GLOSSARY_FILE, 'r', encoding='utf-8') as f:
                return json_module.load(f)
        except Exception:
            pass
    return {"terms": []}

def glossary_to_prompt_text(glossary: Dict[str, Any], max_terms=30) -> str:
    """Convert the glossary into prompt text that a sub-agent can read."""
    terms = glossary.get('terms', [])
    if not terms:
        return "(术语表暂无数据,完成 Batch A 后自动生成)"
    display = terms[:max_terms]
    lines = [f"- {t['term']}(出现{t['count']}次)" for t in display]
    suffix = f"\n(共 {len(terms)} 个术语,仅展示前 {max_terms} 个)" if len(terms) > max_terms else ""
    return '\n'.join(lines) + suffix
# ============ 大纲快照 ============
def save_outline_snapshot(plan: Dict[str, Any]):
"""保存规划师输出的大纲快照"""
lines = [f"# 文档大纲快照({datetime.now().strftime('%Y-%m-%d %H:%M')})"]
project_name = plan.get('project_name', '未知项目')
lines.append(f"\n项目:{project_name}\n")
for ch in plan.get('chapters', []):
lines.append(f"第{ch.get('seq','?')}章 | {ch.get('title','')} | Batch {ch.get('batch','')} | 约{ch.get('word_count',0)}字 | 依赖:{ch.get('dependencies',[])}")
content = '\n'.join(lines)
with open(OUTLINE_SNAPSHOT, 'w', encoding='utf-8') as f:
f.write(content)
print(f"[SNAPSHOT] 大纲快照已保存: {OUTLINE_SNAPSHOT}")
# ============ 批量版本快照 ============
def save_batch_snapshot(batch_label: str, batch_chapters: List[Tuple]):
"""保存每批完成后的章节内容快照"""
snapshot_file = f"{CHAPTERS_DIR}/snapshot_{batch_label}_{datetime.now().strftime('%Y%m%d_%H%M')}.md"
lines = [f"# {batch_label} 快照({datetime.now().strftime('%Y-%m-%d %H:%M')})"]
for seq, fname, title, content, _ in batch_chapters:
lines.append(f"\n---\n## 第{seq}章 {title}\n")
# Keep only a 300-character preview of each chapter
preview = content[:300].replace('\n', ' ').strip()
lines.append(f"[预览] {preview}...")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
print(f"[SNAPSHOT] 批次快照已保存: {snapshot_file}")
# ============ 跨章一致性审查 ============
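def extract_quantities(text: str) -> Dict[str, str]:
    """Collect "number + unit" mentions from text, keyed by the normalized pair."""
    qty = {}
    # Longer units come first so that "万元/年" is not swallowed by "万元".
    pattern = re.compile(r'(\d+(?:\.\d+)?)\s*(万元/年|万元|人/日|人|台|套|个|次|年|月|天|%)')
    for m in pattern.finditer(text):
        key = f"{m.group(1)}{m.group(2)}"
        qty[key] = m.group(0)
    return qty
def check_cross_chapter_consistency(chapters_data: List[Tuple]) -> List[Dict]:
    """Compare quantities that appear in each pair of adjacent chapters."""
    issues = []
    all_qty = [(seq, fname, extract_quantities(content)) for seq, fname, title, content, _ in chapters_data]
    num_re = re.compile(r'^(\d+(?:\.\d+)?)')
    for i in range(len(all_qty) - 1):
        seq_a, fname_a, qty_a = all_qty[i]
        seq_b, fname_b, qty_b = all_qty[i + 1]
        shared = set(qty_a.keys()) & set(qty_b.keys())
        for key in shared:
            # Compare the number as written in each chapter's own text.
            # NOTE: keys are "number+unit", so a shared key implies the same number;
            # spotting real conflicts would need keys based on the item name instead.
            ma = num_re.match(qty_a[key])
            mb = num_re.match(qty_b[key])
            if ma and mb:
                try:
                    if float(ma.group(1)) != float(mb.group(1)):
                        issues.append({
                            "seq_a": seq_a, "seq_b": seq_b,
                            "item": key,
                            "value_a": qty_a[key], "value_b": qty_b[key],
                            # Callers print iss['message'], so always provide it.
                            "message": f"第{seq_a}章 / 第{seq_b}章: {key}({qty_a[key]} vs {qty_b[key]})",
                        })
                except ValueError:
                    continue
    return issues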
# ============ 执行摘要 ============
def _build_summary(chapters_data, max_chars=800):
lines, total = [], 0
for seq, fname, title, content, h2_list in chapters_data:
para_lines = []
for line in content.split('\n'):
line = line.strip()
if not line or line.startswith('# ') or line.startswith('## ') or line.startswith('### '):
continue
para_lines.append(line)
if len(para_lines) >= 3:
break
if not para_lines:
continue
para_text = ''.join(para_lines[:2])
if total + len(para_text) > max_chars:
remaining = max_chars - total
if remaining > 50:
lines.append(para_text[:remaining] + '…')
break
lines.append(para_text)
total += len(para_text)
return lines or ['本报告对项目建设进行了全面可行性分析。']
# ============ 最终文档生成 ============
def generate_final_doc(chapters_data, page_estimates, output_path=FINAL_DOC, incremental=True):
"""
生成整合报告 docx。
incremental=True(默认):对比 hash,仅章节内容变化才重写该章;
无变化时跳过重写,直接复用已有章节 docx。
"""
plan = load_plan()
# --- 增量更新:检查哪些章节发生了变化 ---
changed_chapters = chapters_data
if incremental:
hashes = load_hashes()
changed_chapters = get_changed_chapters(chapters_data, hashes)
unchanged = [item for item in chapters_data if item not in changed_chapters]
changed_seqs = {item[0] for item in changed_chapters}
if unchanged and not changed_chapters:
print(f"[INCREMENTAL] 所有 {len(chapters_data)} 章内容未变化,跳过重写")
return None
elif unchanged:
print(f"[INCREMENTAL] {len(unchanged)} 章未变化,{len(changed_chapters)} 章需重写: {changed_seqs}")
doc = Document()
s = doc.sections[0]
s.page_height = Inches(11.69); s.page_width = Inches(8.27)
s.top_margin = Inches(1.0); s.bottom_margin = Inches(1.0)
s.left_margin = Inches(1.18); s.right_margin = Inches(1.18)
# 封面
for _ in range(6): doc.add_paragraph()
for txt, size, bold, font in [
(plan.get('org_name', '编制单位'), Pt(26), True, '黑体'),
(plan.get('project_name', '项目名称'), Pt(32), True, '黑体'),
]:
p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
r = p.add_run(txt); r.font.size = size; r.font.bold = bold; cjk(r, font)
for _ in range(3): doc.add_paragraph()
p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
r = p.add_run(plan.get('doc_type', '可行性研究报告'))
r.font.size = Pt(22); cjk(r, '楷体')
for _ in range(8): doc.add_paragraph()
p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
unit = plan.get('编制单位', '编制单位')
build_time = plan.get('编制时间', datetime.now().strftime('%Y年%m月'))
r = p.add_run(f'编制单位:{unit}\n编制时间:{build_time}')
r.font.size = Pt(14); cjk(r, '宋体')
doc.add_page_break()
# 执行摘要
p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_before = Pt(12); p.paragraph_format.space_after = Pt(10)
r = p.add_run('执行摘要'); r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体')
for pt in _build_summary(chapters_data):
p2 = doc.add_paragraph()
p2.paragraph_format.first_line_indent = Cm(0.74)
p2.paragraph_format.line_spacing = Pt(22)
p2.paragraph_format.space_after = Pt(6)
r2 = p2.add_run(pt); r2.font.size = Pt(12); cjk(r2, '宋体')
doc.add_page_break()
# Table of contents (uses a real Word TOC field)
p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_before = Pt(12); p.paragraph_format.space_after = Pt(10)
r = p.add_run('目 录'); r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体')
add_toc_entry(doc, '一', '执行摘要', 1, toc_type='summary')
seen = set()
for seq, fname, title, content, h2_list in chapters_data:
if seq in seen: continue
seen.add(seq)
start = page_estimates.get(seq, (1, 0, 1))[0]
if not seq.isdigit(): continue
add_toc_entry(doc, f'第{int(seq)}章', title, start, toc_type='chapter')
doc.add_page_break()
# Chapters: always render all of them so unchanged chapters stay in the report
target = chapters_data
for seq, fname, title, content, h2_list in target:
md_to_paragraphs(doc, content, add_page_break=True)
# --- 更新 hash 清单(增量记录)---
if incremental:
new_hashes = {}
for item in (changed_chapters if changed_chapters else chapters_data):
seq, content = item[0], item[3]
new_hashes[seq] = compute_content_hash(content)
# 合并未变化章节的旧 hash
old_hashes = load_hashes()
old_hashes.update(new_hashes)
save_hashes(old_hashes)
doc.save(output_path)
print(f"[DONE] 整合报告已保存: {output_path}")
return output_path
# ============ 整合报告主流程 ============
def generate_with_accurate_toc(txt_dir=CHAPTERS_DIR, final_doc=FINAL_DOC):
txt_files = sorted(glob.glob(f'{txt_dir}/*.txt'))
if not txt_files:
print(f"[ERROR] 未找到章节文件: {txt_dir}/*.txt"); return None
chapters_data = parse_chapters(txt_files)
if not chapters_data:
print("[ERROR] 所有章节解析均失败"); return None
print(f"[PARSE] 解析 {len(chapters_data)} 个章节")
# 更新术语表(含参考资料)
ref_text = load_reference()
generate_glossary(txt_files, ref_text=ref_text)
# 跨章一致性审查
issues = check_cross_chapter_consistency(chapters_data)
if issues:
print(f"[CONSISTENCY] 发现 {len(issues)} 个潜在不一致:")
for iss in issues:
print(f" - 第{iss['seq_a']}章 / 第{iss['seq_b']}章: {iss['item']}({iss['value_a']} vs {iss['value_b']})")
else:
print("[CONSISTENCY] 跨章一致性检查通过 (OK)")
# 估算页码
pe = {}
cur = 7
for seq, fname, title, content, h2_list in chapters_data:
cc = count_chars(content)
ep = max(1, (cc + CHARS_PER_PAGE - 1) // CHARS_PER_PAGE)
pe[seq] = (cur, cc, ep); cur += ep
print("[BUILD] 生成整合报告...")
generate_final_doc(chapters_data, pe, output_path=final_doc)
# 纯文本版
md_path = final_doc.replace('.docx', '-纯文本.md')
with open(md_path, 'w', encoding='utf-8') as f:
f.write('\n\n---\n\n'.join(c for _, _, _, c, _ in chapters_data))
print(f"[MD] 纯文本版已保存: {md_path}")
return final_doc
# ============ 单章 docx 转换 ============
def convert_single_chapter_inline(txt_path, docx_path):
"""将txt章节文件转换为docx,正确解析markdown表格"""
try:
doc = Document()
s = doc.sections[0]
s.page_height = Inches(11.69); s.page_width = Inches(8.27)
s.top_margin = Inches(1.0); s.bottom_margin = Inches(1.0)
s.left_margin = Inches(1.18); s.right_margin = Inches(1.18)
with open(txt_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
pending_table = []
for line in lines:
line = line.rstrip()
if not line.strip():
_flush_table(doc, pending_table)
continue
if line.startswith('# '):
_flush_table(doc, pending_table)
p = doc.add_paragraph(); p.alignment = 1
p.paragraph_format.space_before = Pt(12); p.paragraph_format.space_after = Pt(10)
r = p.add_run(_clean_inline(line[2:])); r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体')
continue
if line.startswith('## '):
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.space_before = Pt(10); p.paragraph_format.space_after = Pt(6)
r = p.add_run(_clean_inline(line[3:])); r.font.size = Pt(14); r.font.bold = True; cjk(r, '楷体')
continue
if line.startswith('### '):
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.space_before = Pt(8); p.paragraph_format.space_after = Pt(4)
r = p.add_run(_clean_inline(line[4:])); r.font.size = Pt(12); r.font.bold = True; cjk(r, '仿宋')
continue
if line.startswith('#### '):
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.space_before = Pt(6); p.paragraph_format.space_after = Pt(3)
r = p.add_run(_clean_inline(line[5:])); r.font.size = Pt(11); r.font.bold = True; cjk(r, '仿宋')
continue
# markdown 表格行
if _is_table_line(line):
if not _is_separator_line(line):
pending_table.append(line)
continue
# 普通正文段落
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.first_line_indent = Cm(0.74)
p.paragraph_format.line_spacing = Pt(22)
p.paragraph_format.space_after = Pt(6)
r = p.add_run(_clean_inline(line)); r.font.size = Pt(12); cjk(r, '宋体')
# 处理末尾残留表格
_flush_table(doc, pending_table)
doc.save(docx_path)
return docx_path
except Exception as e:
print(f"[ERROR] 转换失败 {txt_path}: {e}")
raise
def _convert_worker(args) -> Tuple[str, bool, str]:
txt_path, docx_path = args
try:
convert_single_chapter_inline(txt_path, docx_path)
return (docx_path, True, '')
except Exception as e:
return (txt_path, False, str(e))
# ============ 批量并行转换 ============
def batch_convert_txt_to_docx(txt_dir=CHAPTERS_DIR, max_concurrent=8, progress_file=PROGRESS_FILE, incremental=True):
"""
Batch-convert txt chapters to docx.
incremental=True (default): compare content hashes and convert only the chapters that changed.
"""
txt_files = sorted(glob.glob(os.path.join(txt_dir, '*.txt')))
if not txt_files:
print(f"[ERROR] 未找到 .txt 文件"); return []
hashes = load_hashes() if incremental else {}
jobs = []
for tf in txt_files:
docx_path = tf.replace('.txt', '.docx')
content_hash = compute_content_hash(open(tf, 'r', encoding='utf-8').read())
if incremental and os.path.exists(docx_path):
if hashes.get(os.path.basename(tf)) == content_hash:
print(f" [SKIP] {os.path.basename(tf)} 内容未变化,跳过")
continue
jobs.append((tf, docx_path))
if not jobs:
print("[INFO] 所有章节已是最新(无变化),跳过转换")
return []
print(f"[BATCH] 待转换 {len(jobs)} 个章节,并发上限 {max_concurrent}")
completed, failed = [], []
with ProcessPoolExecutor(max_workers=max_concurrent) as executor:
futures = {executor.submit(_convert_worker, job): job for job in jobs}
for future in as_completed(futures):
docx_path, ok, err = future.result()
if ok:
# 更新 hash
txt_path = docx_path.replace('.docx', '.txt')
if os.path.exists(txt_path):
hashes[os.path.basename(txt_path)] = compute_content_hash(
open(txt_path, 'r', encoding='utf-8').read()
)
completed.append(docx_path); print(f" [OK] {os.path.basename(docx_path)}")
else:
failed.append((docx_path, err)); print(f" [FAIL] {os.path.basename(docx_path)}: {err}")
if incremental and completed:
save_hashes(hashes)
print(f"\n[BATCH] {len(completed)}/{len(jobs)} 成功,{len(failed)} 失败")
return completed
# ============ 进度文件 ============
def load_progress() -> Dict:
if os.path.exists(PROGRESS_FILE):
try:
with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:
return json_module.load(f)
except Exception:
pass
return {"total": 0, "completed": 0, "batches": [], "current": ""}
# ============ CLI 入口 ============
if __name__ == '__main__':
if len(sys.argv) >= 2 and sys.argv[1] == '--convert-one':
if len(sys.argv) != 4:
print("用法: python integrate_report.py --convert-one <in.txt> <out.docx>"); sys.exit(1)
convert_single_chapter_inline(sys.argv[2], sys.argv[3])
print(f"saved: {sys.argv[3]}", flush=True); sys.exit(0)
elif len(sys.argv) >= 2 and sys.argv[1] == 'convert-batch':
txt_dir = sys.argv[2] if len(sys.argv) > 2 else CHAPTERS_DIR
batch_convert_txt_to_docx(txt_dir=txt_dir)
elif len(sys.argv) >= 2 and sys.argv[1] == 'glossary':
txt_files = sorted(glob.glob(f'{CHAPTERS_DIR}/*.txt'))
ref_text = load_reference()
generate_glossary(txt_files, ref_text=ref_text)
elif len(sys.argv) >= 2 and sys.argv[1] == 'check':
txt_files = sorted(glob.glob(f'{CHAPTERS_DIR}/*.txt'))
chapters_data = parse_chapters(txt_files)
issues = check_cross_chapter_consistency(chapters_data)
if not issues:
print("[OK] 跨章一致性检查通过,无不一致项")
else:
for iss in issues:
print(f"[WARN] 第{iss['seq_a']}章 / 第{iss['seq_b']}章: {iss['item']}({iss['value_a']} vs {iss['value_b']})")
elif len(sys.argv) >= 2 and sys.argv[1] == 'status':
prog = load_progress()
print(f"进度: {prog.get('completed',0)}/{prog.get('total','?')}")
if prog.get('current'): print(f"状态: {prog['current']}")
elif len(sys.argv) >= 2 and sys.argv[1] == 'ref':
# 仅查看/更新参考资料
if len(sys.argv) >= 3:
action = sys.argv[2]
if action == 'show':
ref = load_reference()
print(f"参考资料: {len(ref)} 字符")
print(ref[:500] if ref else "(空)")
elif action == 'clear':
save_reference("")
print("参考资料已清空")
else:
ref = load_reference()
print(f"当前参考资料: {len(ref)} 字符")
else:
# 默认:生成整合报告
txt_dir = sys.argv[1] if len(sys.argv) > 1 else CHAPTERS_DIR
result = generate_with_accurate_toc(txt_dir=txt_dir)
if result:
print(f"\n[DONE] 整合报告生成完成: {result}")
FILE:parallel_tracker.py
"""
parallel_tracker.py
===================
Visual progress tracking for parallel sub-agent writing
How it works:
1. The main agent launches several sub-agents in parallel via sessions_spawn
2. Each sub-agent writes its own status to TRACKER_FILE after it starts
3. The main agent periodically reads TRACKER_FILE and renders a progress table
Usage:
from parallel_tracker import Tracker
# Sub-agent side: register on startup
tracker = Tracker()
tracker.register(seq="04", title="系统架构设计", batch="B")
tracker.update(seq="04", phase="writing", progress=50, note="撰写功能模块...")
# Sub-agent side: mark as done when finished
tracker.update(seq="04", phase="done", progress=100)
"""
import json, os, time, sys, threading
from datetime import datetime
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
# ============ 配置 ============
CHAPTERS_DIR = 'F:/agent/chapters'
TRACKER_FILE = f'{CHAPTERS_DIR}/writing_tracker.json'
# ============ 追踪器 ============
_GLOBAL_TRACKER: Optional['Tracker'] = None
_GLOBAL_LOCK = threading.RLock()  # re-entrant: get_instance() holds it while __init__ writes the tracker file
class Tracker:
"""多子Agent并行撰写状态追踪器(线程安全单例)"""
def __init__(self, tracker_file: str = TRACKER_FILE):
self.tracker_file = tracker_file
self._ensure_file()
@staticmethod
def get_instance(tracker_file: str = TRACKER_FILE) -> 'Tracker':
"""获取单例实例(线程安全)"""
global _GLOBAL_TRACKER
if _GLOBAL_TRACKER is None:
with _GLOBAL_LOCK:
if _GLOBAL_TRACKER is None:
_GLOBAL_TRACKER = Tracker(tracker_file)
return _GLOBAL_TRACKER
def _ensure_file(self):
if not os.path.exists(self.tracker_file):
self._write({})
def _read(self) -> Dict[str, Any]:
with _GLOBAL_LOCK:
try:
with open(self.tracker_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return {}
def _write(self, data: Dict[str, Any]):
with _GLOBAL_LOCK:
with open(self.tracker_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def register(self, seq: str, title: str, batch: str = "", agent_id: str = ""):
"""子Agent启动时注册"""
data = self._read()
data[seq] = {
"seq": seq,
"title": title,
"batch": batch,
"agent_id": agent_id,
"phase": "registered", # registered | outline | writing | reviewing | done | error
"progress": 0,
"note": "已注册,等待启动...",
"started_at": datetime.now().strftime('%H:%M:%S'),
"updated_at": datetime.now().strftime('%H:%M:%S'),
}
self._write(data)
return self
def update(self, seq: str, phase: str, progress: int = None,
note: str = "", error: str = ""):
"""
更新子Agent撰写状态
phase: registered | outline | writing | reviewing | done | error
progress: 0-100
"""
data = self._read()
if seq not in data:
# 未注册,自动注册
data[seq] = {"seq": seq, "title": seq, "batch": ""}
entry = data[seq]
entry["phase"] = phase
if progress is not None:
entry["progress"] = progress
if note:
entry["note"] = note
if error:
entry["error"] = error
entry["updated_at"] = datetime.now().strftime('%H:%M:%S')
self._write(data)
return self
def mark_done(self, seq: str, note: str = "已完成"):
return self.update(seq, phase="done", progress=100, note=note)
def mark_error(self, seq: str, error: str):
return self.update(seq, phase="error", note="出错", error=error)
def get_status(self) -> Dict[str, Any]:
return self._read()
def clear(self):
"""清空追踪状态(每批次开始前调用)"""
self._write({})
def get_summary(self) -> Dict[str, int]:
data = self._read()
phases = {}
for entry in data.values():
p = entry.get("phase", "unknown")
phases[p] = phases.get(p, 0) + 1
return phases
# ============ 可视化渲染 ============
TRACKER_FILE_FOR_PRINT = TRACKER_FILE # 模块级引用
def _progress_bar(progress: int, width: int = 12) -> str:
"""渲染进度条:▓░░░░░░░░░░"""
filled = int(width * progress / 100)
return '▓' * filled + '░' * (width - filled)
def _phase_emoji(phase: str) -> str:
emoji_map = {
"registered": "⏳",
"outline": "📋",
"writing": "✍️",
"reviewing": "🔍",
"done": "✅",
"error": "❌",
}
return emoji_map.get(phase, "⚪")
def render_progress_table(tracker_file: str = TRACKER_FILE) -> str:
"""
渲染当前并行撰写状态表格
返回格式:
╔══════════════════════════════════════════════════════════════╗
║ 📊 多子Agent并行撰写进度监控 ║
╠══════════════════════════════════════════════════════════════╣
║ 04 系统架构设计 ✍️ writing ▓▓▓▓▓▓░░░░ 50% 撰写功能模块... ║
║ 05 技术路线 ✍️ writing ▓▓▓░░░░░░░ 25% 撰写技术选型... ║
║ 06 功能模块设计 ⏳ registered ───────── 0% 等待启动... ║
╚══════════════════════════════════════════════════════════════╝
"""
try:
with open(tracker_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except Exception:
return "(追踪文件暂不可用)"
if not data:
return "(暂无并行撰写任务)"
# 按 seq 排序
sorted_entries = sorted(data.values(), key=lambda x: x.get('seq', '0'))
# 计算全局进度
total = len(sorted_entries)
done = sum(1 for e in sorted_entries if e.get('phase') == 'done')
errors = sum(1 for e in sorted_entries if e.get('phase') == 'error')
overall_pct = int((done / total * 100)) if total > 0 else 0
header = (
f"╔══════════════════════════════════════════════════════════════╗\n"
f"║ 📊 多子Agent并行撰写进度监控 [{done}/{total} 完成"
f"{' ❌' + str(errors) if errors > 0 else ''}] 总体 {overall_pct}% ║\n"
f"╠══════════════════════════════════════════════════════════════╣"
)
footer = "╚══════════════════════════════════════════════════════════════╝"
rows = []
for entry in sorted_entries:
seq = entry.get('seq', '??').rjust(2)
title = entry.get('title', '')[:14].ljust(14)
phase_icon = _phase_emoji(entry.get('phase', ''))
phase_name = entry.get('phase', '').rjust(10)
progress = entry.get('progress', 0)
bar = _progress_bar(progress)
pct = str(progress).rjust(3) + '%'
note = (entry.get('note', '') or '').strip()[:20].ljust(20)
batch = entry.get('batch', '')
batch_str = f"[{batch}] " if batch else " "
row = f"║ {seq} {batch_str}{title} {phase_icon} {phase_name} {bar} {pct} {note} ║"
rows.append(row)
return '\n'.join([header] + rows + [footer])
def print_progress(tracker_file: str = TRACKER_FILE):
"""打印进度表格到标准输出(供 exec 调用)"""
print(render_progress_table(tracker_file), flush=True)
# ============ 轮询监控器 ============
class ProgressMonitor:
"""
定期轮询 tracker 文件并打印进度的监控器
用于在子Agent并行撰写时,主session展示实时进度
"""
def __init__(self, tracker_file: str = TRACKER_FILE, interval_sec: float = 8.0):
self.tracker_file = tracker_file
self.interval_sec = interval_sec
self._running = False
def start(self, duration_sec: float = None):
"""
启动监控循环
duration_sec: 监控持续秒数,None表示直到所有任务完成
"""
self._running = True
import time
start = time.time()
last_seen_done = set()
print(f"[MONITOR] 启动进度监控(间隔{self.interval_sec}秒)", flush=True)
while self._running:
try:
with open(self.tracker_file, 'r', encoding='utf-8') as f:
data = json.load(f)
entries = list(data.values())
if not entries:
time.sleep(self.interval_sec)
continue
# 检查是否全部完成
done_seqs = {e['seq'] for e in entries if e.get('phase') == 'done'}
error_seqs = {e['seq'] for e in entries if e.get('phase') == 'error'}
# 打印进度
os.system('cls' if os.name == 'nt' else 'clear')
print(render_progress_table(self.tracker_file), flush=True)
# 新完成任务时提示
new_done = done_seqs - last_seen_done
if new_done:
print(f"\n✅ 新完成:第 {[e['seq'] for e in entries if e['seq'] in new_done]} 章", flush=True)
last_seen_done = done_seqs
# 检查是否全部结束
all_done = len(done_seqs) + len(error_seqs) == len(entries)
if all_done:
print(f"\n[MONITOR] 所有章节撰写完成!", flush=True)
break
# 检查超时
if duration_sec and (time.time() - start) >= duration_sec:
print(f"\n[MONITOR] 监控超时({duration_sec}秒)", flush=True)
break
time.sleep(self.interval_sec)
except Exception as e:
print(f"[MONITOR] 轮询异常: {e}", flush=True)
time.sleep(self.interval_sec)
def stop(self):
self._running = False
# ============ 子Agent端辅助函数 ============
def get_tracker() -> Tracker:
"""获取Tracker单例(子Agent端推荐使用)"""
return Tracker.get_instance()
def chapter_register(seq: str, title: str, batch: str = ""):
"""子Agent启动时调用:注册章节撰写任务"""
Tracker().register(seq=seq, title=title, batch=batch)
def chapter_update(seq: str, phase: str, progress: int = None, note: str = ""):
"""子Agent撰写过程中调用:更新进度"""
Tracker().update(seq=seq, phase=phase, progress=progress, note=note)
def chapter_done(seq: str, note: str = "已完成"):
"""子Agent完成时调用:标记完成"""
Tracker().mark_done(seq=seq, note=note)
def chapter_error(seq: str, error: str):
"""子Agent出错时调用:标记错误"""
Tracker().mark_error(seq=seq, error=error)
# ============ CLI 入口 ============
if __name__ == '__main__':
    if len(sys.argv) >= 2:
        cmd = sys.argv[1]
        tracker = Tracker()
        if cmd == 'show':
            print(render_progress_table())
        elif cmd == 'clear':
            tracker.clear()
            print("追踪状态已清空")
        elif cmd == 'status':
            summary = tracker.get_summary()
            print(f"当前状态: {summary}")
            total = sum(summary.values())
            done = summary.get('done', 0)
            print(f"进度: {done}/{total} 完成")
        elif cmd == 'wait':
            # Blocking monitor mode: redraw the table every 8 seconds
            print("开始监控... Ctrl+C 停止")
            try:
                while True:
                    os.system('cls' if os.name == 'nt' else 'clear')
                    print(render_progress_table())
                    time.sleep(8)
            except KeyboardInterrupt:
                print("\n监控已停止")
        elif cmd == 'register' and len(sys.argv) >= 4:
            _, _, seq, title, *rest = sys.argv
            batch = rest[0] if rest else ""
            tracker.register(seq, title, batch)
            print(f"已注册:第{seq}章 {title} [{batch}]")
        elif cmd == 'update' and len(sys.argv) >= 4:
            _, _, seq, phase, *rest = sys.argv
            # Optional third argument is the progress percentage, fourth is a note
            progress = int(rest[0]) if rest and rest[0].isdigit() else None
            note = rest[1] if len(rest) > 1 else ""
            tracker.update(seq, phase, progress, note)
            pct = f"{progress}%" if progress is not None else ""
            print(f"已更新:第{seq}章 {phase} {pct} {note}")
        elif cmd == 'done' and len(sys.argv) >= 3:
            seq = sys.argv[2]
            tracker.mark_done(seq)
            print(f"已标记完成:第{seq}章")
        else:
            # Unknown command or missing arguments: fall back to showing the table
            print(render_progress_table())
    else:
        print(render_progress_table())
FILE:references/bug_fix_guide.md
# Bug Troubleshooting & Forced Rebuild
## Fixed: Table Rendering Distortion
### Problem Description
In the generated docx, tables have only 1 column with content split character-by-character; or column count far exceeds expectation (e.g., 4-column table becomes 60+ columns).
### Root Cause
Bug in `_flush_table` function in `integrate_report.py`:
```python
# ❌ Buggy code (early v3)
def _flush_table(doc, pending_table):
    if pending_table:
        _add_table_to_doc(doc, pending_table)  # ← Passing raw string list!
        pending_table.clear()
```
- `pending_table` stores `['| Col1 | Col2 | Col3 |', ...]` (string list)
- `_add_table_to_doc` uses `max(len(r) for r in rows)` to calculate column count
- Calling `len()` on a string gives its character count (22 for the row above), not its cell count (3)
- Result: 4-column table → 63 columns → each character occupies one cell → completely distorted
### Fix
```python
# ✅ Correct code (current version)
def _flush_table(doc, pending_table):
    if pending_table:
        parsed_rows = _parse_md_table(pending_table)  # ← NEW: parse to 2D array first
        _add_table_to_doc(doc, parsed_rows)  # ← Pass parsed array
        pending_table.clear()
```
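
The difference between the two column counts can be reproduced in isolation. This is a minimal sketch: `split_md_row` is a hypothetical stand-in for what `_parse_md_table` presumably does per row, since the real helper is not shown in this guide.

```python
from typing import List

def split_md_row(line: str) -> List[str]:
    """Split one Markdown table row into trimmed cell strings (hypothetical helper)."""
    return [cell.strip() for cell in line.strip().strip('|').split('|')]

raw_rows = ['| Col1 | Col2 | Col3 |', '| a | b | c |']

# Column count as the buggy code computed it: characters per string, not cells.
buggy_cols = max(len(r) for r in raw_rows)

# Column count after each row has been parsed into a list of cells.
parsed_rows = [split_md_row(r) for r in raw_rows]
fixed_cols = max(len(r) for r in parsed_rows)

print(buggy_cols, fixed_cols)
```

The same `max(len(r) for r in rows)` expression is correct once `rows` holds lists of cells rather than raw strings, which is why the fix only has to change what is passed in.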
### Validation
```python
from docx import Document
doc = Document('F:/agent/整合报告.docx')
for t in doc.tables:
    print(f'{len(t.rows)} rows x {len(t.columns)} cols')
# Normal column count: 2~8 columns
# If you see 15+, 30+, 60+ columns → Bug still exists
```
---
## Fixed: Cover Style Comparison Type Error
### Problem Description
`cover_style` in `plan.json` is an integer (e.g., `4`), but code compares as string, causing cover to always take the generic branch — styled cover doesn't apply.
### Root Cause
```python
# ❌ Buggy code
cover_style = plan.get('cover_style', '4')
if cover_style == '4': # Integer 4 != String '4', always False
```
### Fix
```python
# ✅ Correct code
cover_style = str(plan.get('cover_style', '4'))
if cover_style == '4': # String comparison, works correctly
```
---
## Fixed: RGB Color Assignment Error
### Problem Description
Using `eval(f'0x{hex_color}')` to assign color causes `run.font.color.rgb` to receive an integer instead of an `RGBColor` object, throwing an error.
### Root Cause
```python
# ❌ Buggy code
run.font.color.rgb = eval(f'0x{H1_TEXT}') # eval('0xFFFFFF') → 16777215 (int)
# ValueError: rgb color value must be RGBColor object, got <class 'int'>
```
### Fix
```python
# ✅ Correct code
from docx.shared import RGBColor # ← Must import
run.font.color.rgb = RGBColor.from_string(H1_TEXT)
```
---
## Fixed: Incremental Cache Causing New Code to Not Take Effect
### Problem Description
After modifying core logic in `integrate_report.py` and regenerating, incremental mode skips all chapters.
### Root Cause
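Two caches are involved. `content_hashes.json` makes incremental mode skip every chapter whose text is unchanged, so new rendering logic is never applied to those chapters. In addition, a stale compiled `.pyc` file can keep the old code in use if Python does not notice that the `.py` source changed.
### Fix
After every code change, do all of the following: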
```bash
# 1. Delete .pyc cache
del "C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\long-doc-agent\__pycache__\integrate_report.cpython-311.pyc"
# 2. Delete incremental hash
del F:\agent\chapters\content_hashes.json
# 3. Regenerate
python integrate_report.py
```
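
The hash-based skip can be illustrated with a small sketch. It assumes `compute_content_hash` is a plain hash of the chapter text (SHA-256 is used here as a guess; the real function is not shown in this guide), and `chapters_to_rebuild` is a hypothetical name.

```python
import hashlib

def content_hash(text: str) -> str:
    """Hash chapter text; stands in for compute_content_hash (assumed SHA-256)."""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()

def chapters_to_rebuild(chapters: dict, stored_hashes: dict) -> list:
    """Return names of chapters whose current text differs from the stored hash."""
    return sorted(name for name, text in chapters.items()
                  if stored_hashes.get(name) != content_hash(text))

chapters = {'01.txt': 'overview', '02.txt': 'background'}
stored = {name: content_hash(text) for name, text in chapters.items()}

# Code changed but chapter text did not: nothing is selected for rebuild.
unchanged_run = chapters_to_rebuild(chapters, stored)
# Hash file deleted (empty dict): every chapter is selected again.
forced_run = chapters_to_rebuild(chapters, {})
```

Because the hash covers only chapter text, a change to the rendering code is invisible to it, which is why deleting `content_hashes.json` is part of the procedure.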
---
## Forced Rebuild
### Steps
```bash
# 1. Delete incremental cache and old report
del F:\agent\chapters\content_hashes.json
del F:\agent\整合报告.docx
# 2. Delete .pyc cache (required after code changes)
del "C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\long-doc-agent\__pycache__\integrate_report.cpython-311.pyc"
# 3. Regenerate
cd "C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\long-doc-agent"
python integrate_report.py
```
---
## New: RGBColor Property Access (python-docx 1.2.0)
### Problem Description
Error when running `make_docx.py`: `AttributeError: 'RGBColor' object has no attribute 'red'`
### Root Cause
python-docx 1.2.0's `RGBColor` object doesn't support `.red / .green / .blue` property access; must use index access.
### Fix
```python
# ❌ Wrong
'{:02X}{:02X}{:02X}'.format(rgb.red, rgb.green, rgb.blue)
# ✅ Correct
'{:02X}{:02X}{:02X}'.format(rgb[0], rgb[1], rgb[2])
```
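
A small helper keeps the index-based access in one place. This sketch assumes `RGBColor` behaves like a 3-item sequence, as the fix above implies; a plain tuple is used here so the example runs without python-docx, and `rgb_to_hex` is a hypothetical name.

```python
def rgb_to_hex(rgb) -> str:
    """Format any 3-item (r, g, b) sequence as an uppercase hex string."""
    r, g, b = rgb[0], rgb[1], rgb[2]
    return '{:02X}{:02X}{:02X}'.format(r, g, b)

navy = (0x1F, 0x3A, 0x5F)  # plain tuple standing in for an RGBColor instance
hex_value = rgb_to_hex(navy)
print(hex_value)
```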
---
## New: Cover Function Must Not Modify Global Page Margins
### Problem Description
Code in `add_cover()` setting `section.left_margin=0` etc. propagates to all pages after the cover, causing body text to fill the entire page (no margins).
### Root Cause
Word's Section properties persist across pages; margins set on the cover affect the entire document.
### Fix
Use a full-page table for the background color in the cover, **do not** modify any section margin properties. Set body margins once in `main()`.
```python
# ❌ Wrong
def add_cover(doc):
    sec = doc.sections[0]
    sec.left_margin = Inches(0)  # ← Affects all pages!
    ...
# ✅ Correct: only add table, don't touch section
def add_cover(doc):
    tbl_ = doc.add_table(rows=1, cols=1)
    cell = tbl_.rows[0].cells[0]
    # Just fill the table across the full page, don't touch section margin
```
---
## New: Auto-Rename When File Is Held Open
### Problem Description
If the generated docx file is already open in WPS/Word, saving again raises `PermissionError`.
### Fix
Add `_v2` suffix to filename (auto-increment), avoiding conflict with open files. In code:
```python
import os
# out_dir is the target output directory, defined elsewhere in the script
out_name = 'Hospital_Personnel_Location_Management_System_Proposal.docx'
out_path = os.path.join(out_dir, out_name)
if os.path.exists(out_path):
    # File exists, add v2/v3... to avoid conflict
    base, ext = os.path.splitext(out_name)
    counter = 2
    while os.path.exists(os.path.join(out_dir, f'{base}_v{counter}{ext}')):
        counter += 1
    out_name = f'{base}_v{counter}{ext}'
    out_path = os.path.join(out_dir, out_name)
```
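
The same renaming logic can be wrapped in a reusable function. This is a sketch under the assumption that checking for an existing name is enough; `next_free_path` is a hypothetical helper, and it does not detect whether the existing file is actually locked by WPS/Word.

```python
import os
import tempfile

def next_free_path(out_dir: str, out_name: str) -> str:
    """Return out_dir/out_name, or the first unused _vN variant (N starts at 2)."""
    candidate = os.path.join(out_dir, out_name)
    if not os.path.exists(candidate):
        return candidate
    base, ext = os.path.splitext(out_name)
    counter = 2
    while os.path.exists(os.path.join(out_dir, f'{base}_v{counter}{ext}')):
        counter += 1
    return os.path.join(out_dir, f'{base}_v{counter}{ext}')

# Demo in a throwaway directory: each call sees the files created before it.
with tempfile.TemporaryDirectory() as tmp:
    first = next_free_path(tmp, 'report.docx')
    open(first, 'w').close()
    second = next_free_path(tmp, 'report.docx')
    open(second, 'w').close()
    third = next_free_path(tmp, 'report.docx')
    names = [os.path.basename(p) for p in (first, second, third)]

print(names)
```

A stricter variant could attempt the save and fall back to the next name only on `PermissionError`, which avoids creating a new version when the existing file is not open.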
---
## New: write Tool Truncates Files Over ~50KB (Use Chunked Writing for Large Scripts)
### Problem Description
Using the `write` tool for Python scripts larger than about 50KB or about 2000 lines truncates the content (only part of the code is written).
### Root Cause
The write tool has a per-file size limit.
### Fix
Write large scripts in two steps:
```python
# Step 1: Write main file (excluding trailing main() call)
with open('make_docx.py', 'w', encoding='utf-8') as f:
    f.write(main_content)  # Main content
# Step 2: Append trailing portion
closing = """
def main():
    ...  # Trailing content
if __name__ == '__main__':
    main()
"""
with open('make_docx.py', 'a', encoding='utf-8') as f:
    f.write(closing)
```
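
When a script needs more than two pieces, the split can be computed instead of chosen by hand. The sketch below is one possible approach, not part of the skill itself: `split_into_chunks` is a hypothetical helper, and the 2048-byte limit is only for the demo (the guide's limit is roughly 50KB).

```python
def split_into_chunks(text: str, max_bytes: int) -> list:
    """Split text at line boundaries so each chunk is at most max_bytes in UTF-8.

    A single line longer than max_bytes is kept whole in its own chunk.
    """
    chunks, current, size = [], [], 0
    for line in text.splitlines(keepends=True):
        line_bytes = len(line.encode('utf-8'))
        if current and size + line_bytes > max_bytes:
            chunks.append(''.join(current))
            current, size = [], 0
        current.append(line)
        size += line_bytes
    if current:
        chunks.append(''.join(current))
    return chunks

# Demo: a generated script of about 10KB split into chunks of at most 2KB.
script = ''.join(f'print({i})\n' for i in range(1000))
chunks = split_into_chunks(script, max_bytes=2048)
print(len(chunks))
```

The first chunk would then be written with mode `'w'` and each later chunk appended with mode `'a'`, as in the two-step example above.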
---
## Other Common Issues
### Symptom: Table content all shows as `|`
`_parse_md_table` was not called. Confirm `_flush_table` contains `parsed_rows = _parse_md_table(pending_table)`.
### Symptom: Incremental mode skips modified chapters
Delete `content_hashes.json` to force full rebuild.
### Symptom: Sub-Agent-written txt contains garbled text
Sub-Agent output used wrong encoding. Ensure sub-agents save with `encoding='utf-8'`.
FILE:references/phase0_guide.md
# Phase 0: Requirements Confirmation Full Flow
## Trigger Conditions
User's first writing request ("I want to write xxx"/"help me write a feasibility report"/"write a xxx report")
## Four-Step Confirmation Flow
### Step 1 — Confirm Writing Topic
```
Please tell me the core information for this document:
1. What is the document topic?
(e.g., XX City People's Hospital Medical Asset Refined Management Solution)
2. What type of document is it?
(e.g., Feasibility Study Report / Technical Proposal / Business Plan)
3. Who is the primary audience?
(e.g., Hospital Management / Superior Regulatory Authority / Investors)
4. Overall style?
(e.g., Professional & Rigorous / Concise & Clear)
5. Any special requirements or constraints?
(e.g., Must include budget section / No more than 10 chapters, etc.)
```
### Step 2 — Confirm Writing Background
```
Please provide or describe the background information for this project/topic:
1. What is the project background?
(e.g., Current hospital asset management status, problems faced)
2. What are the construction goals?
(e.g., Improve asset utilization rate, control costs)
3. Any specific industry background?
(e.g., National policies, industry trends)
```
### Step 3 — Provide Reference Materials (Most Important)
```
Please provide reference materials related to this writing task (provide at least one):
A. Upload file: Send local file path or paste content directly
B. Feishu document: Provide document name or link (I will search via RAG)
C. Paste directly: Send reference text directly to this assistant
D. Not provided for now (skip, write using general background knowledge)
⚠️ Strongly recommend providing reference materials!
The more reference material, the more business-aligned the content, the higher the output quality.
Reference materials will be injected as the primary RAG knowledge source into each chapter's writing context.
```
### Step 4 — Outline Confirmation
After planner outputs the outline, display it to the user for confirmation:
```
📋 Planning outline generated. Please confirm the following chapter structure:
Project: XX City People's Hospital Medical Asset Full Lifecycle Refined Management Solution
Type: Feasibility Study Report | Audience: Hospital Management
Chapter Outline:
1. Chapter 01 Project Overview (Batch A, ~2500 words)
2. Chapter 02 Construction Background & Necessity (Batch A, ~3000 words)
...
Please confirm:
A. Outline OK, start writing
B. Need to adjust outline (please specify which chapters need modification/addition/deletion)
C. Cancel this writing task
```
## Saving Reference Materials
After user confirmation, save reference materials to `F:/agent/chapters/reference_material.txt`:
```python
with open('F:/agent/chapters/reference_material.txt', 'w', encoding='utf-8') as f:
    f.write(reference_text)
```
FILE:references/phase1_guide.md
# Phase 1: Planner Full Prompt Template
## Execution Steps
1. Load `F:/agent/chapters/reference_material.txt` summary (first 3000 characters) as `reference_summary`
2. Replace `{xxx}` placeholders in the template below with actual values
3. Write output to `F:/agent/chapters/plan.json`
## Prompt Template
```
You are a professional project planner. The user needs to write a 【{doc_type}】 on the topic of "{topic}".
## User-Provided Background Information
{background}
## Reference Material Summary (Priority Reference)
{reference_summary}
Please complete the following tasks:
1. Create a detailed document outline (down to H3 headings)
2. Annotate core writing points for each chapter
3. Identify RAG search keywords for each chapter (≤3 per chapter)
4. Evaluate complexity of each chapter, mark key chapters
5. Identify chapter dependencies (which chapters must be completed before others can be written)
**Chapter Dependency Rules**:
- Type 1 (no dependencies, write first): Overview, Background, Current Analysis, Technology Selection
- Type 2 (depends on Type 1): Overall Design, Detailed Function Design
- Type 3 (depends on several preceding chapters): Implementation Plan, Testing Plan, Deployment Plan
- Type 4 (can write independently or last): Training Plan, Acceptance Plan, Appendices, Conclusion
**Reference Material Filtering**
During planning, actively exclude reference content unrelated to the topic (e.g., infusion monitoring systems).
Write the following structured information to F:/agent/chapters/plan.json:
{
"project_name": "Project Name",
"doc_type": "Document Type",
"chapters": [
{
"seq": "01",
"title": "Chapter Title",
"brief": "Writing Points",
"feishu_keywords": ["k1", "k2"],
"web_keywords": ["k1", "k2"],
"word_count": 3000,
"batch": "A",
"dependencies": [],
"status": "pending"
}
]
}
```
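The dependency rules in the template imply a layering of chapters. The sketch below is one way to sanity-check the planner's `batch` labels: it places each chapter one layer after its deepest dependency. `assign_batches` is a hypothetical helper, and the mapping from layer to batch letter is an assumption, not confirmed planner behavior.

```python
def assign_batches(chapters):
    """Return {seq: batch_label}; a chapter lands one layer after its deepest dependency."""
    deps = {c["seq"]: list(c.get("dependencies", [])) for c in chapters}
    level = {}
    remaining = set(deps)
    while remaining:
        # Chapters whose dependencies are all placed can be written in this round.
        ready = sorted(s for s in remaining if all(d in level for d in deps[s]))
        if not ready:
            raise ValueError(f"Circular or unknown dependency among: {sorted(remaining)}")
        for seq in ready:
            level[seq] = 1 + max((level[d] for d in deps[seq]), default=-1)
        remaining -= set(ready)
    # Layer 0 -> "A", layer 1 -> "B", ... (assumes fewer than 27 layers).
    return {seq: chr(ord("A") + lv) for seq, lv in level.items()}


chapters = [
    {"seq": "01", "dependencies": []},
    {"seq": "02", "dependencies": []},
    {"seq": "03", "dependencies": ["01"]},
    {"seq": "04", "dependencies": ["02", "03"]},
]
print(assign_batches(chapters))
```

Chapters in the same layer never depend on each other, which is the property that makes a batch safe to write in parallel.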
## plan.json Field Descriptions
| Field | Description |
|-------|-------------|
| `seq` | Chapter sequence number, 2-digit string ("01", "02") |
| `title` | Chapter title |
| `brief` | Core writing points (50-100 characters) |
| `feishu_keywords` | Feishu knowledge base search keywords, max 3 |
| `web_keywords` | Web search keywords, max 3 |
| `word_count` | Target word count (body text, excluding headings) |
| `batch` | Batch label ("A"/"B"/"C", same batch can be written in parallel) |
| `dependencies` | Dependent chapter seq list, e.g. `["01", "02"]` |
| `status` | Status: `pending`/`writing`/`txt_done`/`confirmed` |
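As an illustration of the field rules in the table above, the following sketch checks a loaded plan before any sub-agent is launched. `validate_plan` is a hypothetical helper and is not part of `integrate_report.py`; it covers only the constraints stated in the table.

```python
VALID_STATUS = {"pending", "writing", "txt_done", "confirmed"}


def validate_plan(plan: dict) -> list:
    """Return a list of human-readable problems; an empty list means the plan looks valid."""
    errors = []
    chapters = plan.get("chapters", [])
    known_seqs = {c.get("seq") for c in chapters}
    for c in chapters:
        seq = c.get("seq")
        tag = f"chapter {seq!r}"
        if not (isinstance(seq, str) and len(seq) == 2 and seq.isdigit()):
            errors.append(f"{tag}: seq must be a 2-digit string")
        for key in ("feishu_keywords", "web_keywords"):
            if len(c.get(key, [])) > 3:
                errors.append(f"{tag}: {key} has more than 3 entries")
        if c.get("status") not in VALID_STATUS:
            errors.append(f"{tag}: unknown status {c.get('status')!r}")
        for dep in c.get("dependencies", []):
            if dep not in known_seqs:
                errors.append(f"{tag}: dependency {dep!r} is not a chapter in this plan")
    return errors


plan = {
    "chapters": [
        {"seq": "01", "feishu_keywords": ["k1"], "web_keywords": [],
         "dependencies": [], "status": "pending"},
        {"seq": "02", "feishu_keywords": [], "web_keywords": [],
         "dependencies": ["01"], "status": "pending"},
    ]
}
print(validate_plan(plan) or "plan OK")
```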
## Post-Execution Actions
```bash
# 1. Generate initial glossary (extracted from reference materials)
python integrate_report.py glossary
# 2. Save outline snapshot
python integrate_report.py save-outline
# 3. Display outline to user for confirmation
```
FILE:references/phase2_guide.md
# Phase 2: Sub-Agent Full Prompt Template
## Template Variable Descriptions
| Variable | Source |
|----------|--------|
| `{seq}` | seq field of this chapter in plan.json |
| `{title}` | title field of this chapter in plan.json |
| `{batch}` | batch field of this chapter in plan.json |
| `{topic}` | Document topic |
| `{audience}` | Target audience |
| `{style}` | Overall style |
| `{word_count}` | word_count field of this chapter in plan.json |
| `{reference_summary}` | Reference material summary (first 3000 characters) |
| `{glossary_summary}` | Glossary summary (first 30 entries) |
| `{dependency_chapters}` | Title list of dependent chapters |
| `{chapter_brief}` | brief field of this chapter in plan.json |
| `{feishu_keywords}` | feishu_keywords from plan.json |
| `{web_keywords}` | web_keywords from plan.json |
| `{index}` | 2-digit sequential number (01, 02...) |
| `{short_name}` | Chapter short name (used in filenames) |
## Sub-Agent Prompt (Full Version)
```python
import sys
sys.path.insert(0, r'C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\long-doc-agent')
from parallel_tracker import chapter_register, chapter_update, chapter_done
chapter_register(seq='{seq}', title='{title}', batch='{batch}')
You are a professional document writing expert, responsible for writing the 【{title}】 chapter of the feasibility report.
## Basic Information
- Document Topic: {topic}
- Target Audience: {audience}
- Overall Style: {style}
- This Chapter's Word Target: {word_count} words
## Reference Materials (Priority Use)
{reference_summary}
## Glossary Reference (Must Use Consistent Terminology)
{glossary_summary}
## Dependencies
This chapter depends on the following completed chapters:
{dependency_chapters}
## This Chapter's Writing Points
{chapter_brief}
## RAG Search (Supplementary Reference)
- Feishu Knowledge Base: keywords {feishu_keywords}
- Web Search (backup): keywords {web_keywords}
## Writing Requirements
1. Content must be professional and rigorous, conforming to feasibility report standards
2. Prioritize citing facts and data from reference materials
3. Terminology usage must be consistent with the glossary
4. Word count: approximately {word_count} words
5. Output format: Markdown
## ⚠️ Markdown Table Format (Must Follow)
When inserting tables, you MUST strictly follow this format, otherwise tables will be distorted in docx conversion:
Correct format:
| Col1 | Col2 | Col3 |
|---|---|---|
| Content1 | Content2 | Content3 |
Key points:
- Separator row format must be `|---|---|---|` (leading/trailing `|` required)
- All rows must have the same column count as the header — mismatch causes column displacement
- Cell content should avoid containing `|` (use `~` or `-` for ranges)
## Progress Update
After completing each ## H2 section heading, call:
chapter_update(seq='{seq}', phase='writing', progress=30, note='Writing in progress...')
## Output: Generate Plain Text .txt Only
After completing the writing:
1. Save to F:/agent/chapters/{index:02d}-{short_name}.txt
2. Call chapter_done(seq='{seq}', note='Completed')
3. Update this chapter's status to 'txt_done' in plan.json
```
## Per-Batch Execution Flow (Main Agent Side, Fully Automatic)
```python
# 1. Display outline/current status (display only, no user confirmation)
print(f"Current batch: Batch {label}")
print(f"Chapters to write: {chapters_list}")
print(f"Estimated parallelism: {n} chapters")
# 2. Clear previous batch tracking state
from parallel_tracker import Tracker
Tracker().clear()
# 3. Parallel launch sub-agents (≤5 per batch, automatically execute all batches)
for subagent_task in batch_tasks:
    sessions_spawn(
        task=subagent_task,
        runtime="subagent",
        runTimeoutSeconds=300,
        mode="run"
    )
# 4. Monitor progress in background (automatically wait for this batch to complete)
# python parallel_tracker.py wait
# 5. After this batch completes, automatically proceed to next batch (no user confirmation)
# If this is the last batch, automatically execute:
from integrate_report import batch_convert_txt_to_docx
batch_convert_txt_to_docx(txt_dir='F:/agent/chapters', max_concurrent=8)
```
## Batch Completion Notification
A WeChat notification is sent automatically after each batch completes; no manual intervention is needed. If a chapter needs modification, notify the main agent at any time: small changes can be made by editing the `.txt` file directly, and large changes by regenerating the full chapter.
FILE:references/table_format_guide.md
# Markdown Table Format Specification
Sub-agents must strictly follow this specification when inserting tables in `.txt` files. Incorrect format causes table distortion during docx conversion.
## Correct Format
```
| Col1 | Col2 | Col3 |
|---|---|---|
| Content1 | Content2 | Content3 |
| Content4 | Content5 | Content6 |
```
## Six Key Rules
1. **Separator row must include leading and trailing `|`**
- ✅ Correct: `|---|---|---|`
- ❌ Wrong: `---|---|---` (missing leading/trailing `|`)
- ❌ Wrong: `|---|---|---` (missing trailing `|`)
2. **All rows (including data rows) must have leading and trailing `|`**
- Each row format: `| Cell1 | Cell2 | Cell3 |`
3. **All rows must have the same column count as the header**
- If header has 4 columns, all data rows must also have 4 columns
- Mismatch causes column displacement in docx
4. **Separator row only allows `-`, `:`, `|` and spaces**
- ✅ `|---|`, `| :--- |` (alignment markers)
- ❌ `|===|` (`=` not allowed)
- ❌ `--|--` (missing leading/trailing `|`)
5. **Cell content must not contain line breaks**
- Cell content must be completed on a single line
6. **Cell content should avoid containing `|` character**
- Use `~` or `-` for ranges: `25~45`, not `25|45`
- If `|` must be included, escape it (not recommended)
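The rules above can be checked mechanically before a chapter is handed to the converter. The sketch below is a hypothetical pre-flight checker (`check_table` and `split_row` are illustrative names, not functions from `integrate_report.py`), and it assumes cells contain no escaped pipes. An unescaped `|` inside a cell shows up as a column-count mismatch.

```python
import re

# One separator cell: optional alignment colons around one or more hyphens.
SEPARATOR_CELL = re.compile(r"^:?-+:?$")


def split_row(line: str) -> list:
    """Split '| a | b |' into ['a', 'b']; assumes the row starts and ends with '|'."""
    return [cell.strip() for cell in line.strip()[1:-1].split("|")]


def check_table(lines: list) -> list:
    """Return a list of rule violations for one Markdown table (empty list = OK)."""
    problems = []
    for i, line in enumerate(lines, 1):
        s = line.strip()
        if not (len(s) >= 2 and s.startswith("|") and s.endswith("|")):
            problems.append(f"line {i}: missing leading or trailing '|'")
    if problems:
        return problems
    if len(lines) < 2:
        return ["table needs a header row and a separator row"]
    header = split_row(lines[0])
    if not all(SEPARATOR_CELL.match(cell) for cell in split_row(lines[1])):
        problems.append("line 2: separator may contain only '-', ':', '|' and spaces")
    for i, line in enumerate(lines[1:], 2):
        found = len(split_row(line))
        if found != len(header):
            problems.append(f"line {i}: expected {len(header)} columns, found {found}")
    return problems


# A range written with '|' splits the cell in two and is reported as a column mismatch.
sample = ["| Item | Range |", "|---|---|", "| Age | 25|45 |"]
print(check_table(sample))
```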
## Common Error Examples
| Error Type | Wrong | Correct |
|------------|-------|---------|
| Missing leading/trailing `\|` | `\|---\|---\|---` | `\|---\|---\|---\|` |
| Inconsistent row/column count | `\|A\|B\|C\|` followed by `\|1\|2\|` (missing column) | `\|A\|B\|C\|` followed by `\|1\|2\| \|` |
| Separator row uses `=` | `\|====\|====\|` | `\|---\|---\|` |
| Cell contains `\|` | `\|25\|45\|` (range) | `\|25~45\|` |
## Recommended Symbols for Cell Content
| Purpose | Recommended Symbol | Example |
|---------|-------------------|---------|
| Numeric range | `~` or `—` | `25~45`, `25—45` |
| Percentage | `%` | `30%` |
| Rating | `★` (avoid `\|`) | `★★★☆☆` |
| Notes/Remarks | Write directly | `Including equipment maintenance service` |
## Validation Method
After generating the docx, run this script to check whether the column counts are reasonable (normally 2~8 columns):
```python
from docx import Document
doc = Document('F:/agent/整合报告.docx')
for t in doc.tables:
    print(f'{len(t.rows)} rows x {len(t.columns)} cols')
# Column count > 15 usually indicates table format error
```
## Why Format Errors Cause Distortion
`_add_table_to_doc` internally uses `max(len(r) for r in rows)` to calculate column count:
- After correct parsing: `rows[0] = ['Col1', 'Col2', 'Col3']`, `len = 3`
- When raw string is passed: `rows[0] = '| Col1 | Col2 | Col3 |'`, `len = 22` (character count)
22 columns vs 3 columns → each character occupies one cell → table completely distorted.
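The effect can be reproduced in isolation. The sketch below applies the same `max(len(r) for r in rows)` expression to a parsed row and to the raw string; `column_count` and the one-line parser are illustrative stand-ins, not the actual `_add_table_to_doc` or `_parse_md_table` code.

```python
def column_count(rows):
    """Same expression the document attributes to _add_table_to_doc."""
    return max(len(r) for r in rows)


raw_row = "| Col1 | Col2 | Col3 |"
# Illustrative parser: drop the outer pipes, split on the inner ones, trim each cell.
parsed_row = [cell.strip() for cell in raw_row.strip().strip("|").split("|")]

print(parsed_row)                  # ['Col1', 'Col2', 'Col3']
print(column_count([parsed_row]))  # number of cells
print(column_count([raw_row]))     # number of characters in the raw string
```

Because `len` works on both lists and strings, passing an unparsed row raises no error; the mistake only becomes visible in the generated table.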
Software Development Work Estimation Skill. Triggered when user mentions "work estimation", "project estimation", "effort estimation", "timeline assessment",...
---
name: work-estimation-en
description: |
Software Development Work Estimation Skill. Triggered when user mentions "work estimation", "project estimation", "effort estimation", "timeline assessment", "task breakdown", "man-hour calculation", "development cycle", or similar terms.
Accepts user requirements text or documents, automatically breaks down work items and estimates effort, outputting Excel evaluation reports.
version: 1.0.0
---
# 📊 Software Development Work Estimation
Automatically analyze user requirements, break them into specific work items, and estimate effort across multiple dimensions, outputting structured Excel reports.
## Workflow
### Step 1: Collect Requirements
User provides:
- Requirements description (plain text)
- Or requirements document path (supports .md, .docx, .txt formats)
### Step 2: AI Requirements Breakdown
AI automatically:
1. Analyzes requirement content
2. Breaks down into specific work modules
3. Categorizes by dimension (Analysis, Design, Frontend, Backend, Algorithm, Testing)
### Step 3: Effort Estimation
For each work item, evaluate:
- Workload (man-days)
- Complexity (Low/Medium/High)
- Risk Level (Low/Medium/High)
- Parallel Feasibility
- Prerequisites
### Step 4: Generate Excel
Multi-sheet Excel report:
- Overview
- Dimension Details
- Gantt Chart (Project Progress)
- Key Risk Items
- Coordination Relations
- Cost Estimation
## Output Description
### Sheet 1: Overview
| Work Module | Analysis | Design | Frontend | Backend | Algorithm | Testing | Total |
|------------|----------|--------|----------|---------|-----------|--------|-------|
| Module A | 1 day | 2 days | 3 days | 5 days | 1 day | 2 days | 14 days |
### Sheet 2-7: Dimension Details
Work content, estimation basis, and estimated hours by dimension.
### Sheet 8: Gantt Chart
Visual display of parallel work, critical path, and milestones. **Automatically skips weekends and holidays**.
### Sheet 9: Key Risk Assessment
Items with high risk or uncertainty are listed separately with explanations and recommendations.
### Sheet 10: Coordination Relations
Dependencies between work items, prerequisites, and coordination matters.
### Sheet 11: Cost Estimation
Three parts:
- **Labor Cost**: Role × man-days × daily rate
- **Hardware/Software Cost**: Servers, domains, SSL, third-party services
- **Total Budget**: Includes 15% contingency buffer
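A worked example of the three parts, as a minimal sketch. The roles, day counts, rates, and fixed costs below are made-up illustration values, and `estimate_budget` is a hypothetical helper rather than the function used by the generator script.

```python
def estimate_budget(labor, fixed_costs, contingency=0.15):
    """Return (labor_total, project_total, recommended_budget).

    labor: {role: (man_days, daily_rate)}; fixed_costs: {item: cost}.
    """
    labor_total = sum(days * rate for days, rate in labor.values())
    project_total = labor_total + sum(fixed_costs.values())
    # round() rather than int(): int() truncates, and float products such as
    # 100 * 1.15 can land just below the expected whole number.
    recommended = round(project_total * (1 + contingency))
    return labor_total, project_total, recommended


labor = {
    "frontend": (10, 1500),  # 10 man-days at 1500 per day
    "backend": (12, 1800),
    "testing": (5, 1200),
}
fixed_costs = {"cloud server": 1200, "SSL certificate": 200}

labor_total, project_total, recommended = estimate_budget(labor, fixed_costs)
print(labor_total, project_total, recommended)  # 42600 44000 50600
```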
## Estimation Reference
| Complexity | Frontend | Backend | Algorithm | Testing |
|------------|----------|---------|-----------|---------|
| Low | 1-2 days | 1-3 days | 1-3 days | 0.5-1 day |
| Medium | 3-5 days | 3-7 days | 3-7 days | 1-3 days |
| High | 5-10 days| 7-14 days| 7-14 days| 3-5 days |
## Notes
- Estimation based on standard team config (1 frontend, 1 backend, 1 tester)
- Parallel work reduces calendar time, not man-hours
- Reserve 10-20% buffer for changes
- Adjust based on actual team situation
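The difference between man-days and calendar time can be made concrete with a small sketch. It assumes the standard team from the first note, with frontend and backend running in parallel and testing starting only after both finish; that sequencing is an assumption made for illustration, and `effort_and_duration` is a hypothetical helper.

```python
import math


def effort_and_duration(frontend, backend, testing, buffer=0.15):
    """Return (man_days, calendar_days, buffered_calendar_days) for one module."""
    man_days = frontend + backend + testing           # total effort is unchanged by parallelism
    calendar_days = max(frontend, backend) + testing  # parallel tracks overlap in time
    buffered = math.ceil(calendar_days * (1 + buffer))
    return man_days, calendar_days, buffered


print(effort_and_duration(frontend=5, backend=7, testing=3))  # (15, 10, 12)
```

The effort stays at 15 man-days either way; only the elapsed time drops from 15 to 10 working days, before the buffer is added.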
## Example
**User:** Help me estimate this project: Develop an e-commerce mini-app with user login, product display, shopping cart, and order payment
**AI:**
```
Got it! I'll help you with the work estimation.
Based on your requirements, I've broken them down into:
📦 **Work Modules**
1. User System (login, register, profile)
2. Product Module (list, detail, search)
3. Shopping Cart Module
4. Order Module
5. Payment Module
6. Infrastructure (routing, state management, API)
Generating detailed estimation report...
```
FILE:README.md
# 📊 Software Development Work Estimation
Automatically analyze requirements, break down work items, estimate effort, and output professional Excel evaluation reports.
## Features
- ✅ **AI Smart Breakdown** - Automatically analyze requirements and split into work modules
- ✅ **Six-Dimension Estimation** - Analysis, Design, Frontend, Backend, Algorithm, Testing
- ✅ **Gantt Chart** - Visual project progress with parallel work display
- ✅ **Risk Assessment** - Highlight high-risk and uncertain items
- ✅ **Coordination Relations** - Clear dependencies and coordination matters
## Output Structure
| Sheet | Content |
|-------|---------|
| Overview | All work items summary with dimension ratios |
| Analysis | Analysis dimension details |
| Design | Design dimension details |
| Frontend | Frontend development details |
| Backend | Backend development details |
| Algorithm | Algorithm development details |
| Testing | Testing details |
| Gantt Chart | Project progress (skips weekends/holidays) |
| Key Risks | High-risk items |
| Coordination | Dependencies and coordination |
| Cost Estimation | Labor + hardware/software costs |
## Usage
Describe your requirements:
```
Help me estimate this project: Develop an e-commerce mini-app with user login, product display, shopping cart, and order payment
```
## Files
```
work-estimation-en/
├── SKILL.md # Skill definition
├── README.md # This file
├── scripts/
│ └── generate_estimation.py # Excel generator
├── references/
│ └── evaluation-guide.md # Estimation guide
└── evals/
└── evals.json # Test cases
```
FILE:scripts/generate_estimation.py
"""
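Excel generator for software development effort estimation.
Input: the requirements description and the work items broken down from it.
Output: a multi-sheet Excel estimation report.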
"""
import json
from datetime import datetime, timedelta
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from openpyxl.chart import BarChart, PieChart, Reference
from openpyxl.chart.label import DataLabelList
from openpyxl.chart.series import DataPoint
from openpyxl.drawing.fill import PatternFillProperties, ColorChoice
# Chinese public holidays (sample data, extend as needed).
# Dates follow the 2026 holiday arrangement; verify them against the official
# State Council notice before relying on them. Make-up working weekends are not modeled.
HOLIDAYS = [
    datetime(2026, 1, 1), datetime(2026, 1, 2), datetime(2026, 1, 3),  # New Year's Day
    *[datetime(2026, 2, d) for d in range(15, 24)],  # Spring Festival (Feb 15-23)
    datetime(2026, 4, 4), datetime(2026, 4, 5), datetime(2026, 4, 6),  # Qingming Festival
    *[datetime(2026, 5, d) for d in range(1, 6)],  # Labour Day (May 1-5)
    datetime(2026, 6, 19), datetime(2026, 6, 20), datetime(2026, 6, 21),  # Dragon Boat Festival
    datetime(2026, 9, 25), datetime(2026, 9, 26), datetime(2026, 9, 27),  # Mid-Autumn Festival
    *[datetime(2026, 10, d) for d in range(1, 8)],  # National Day (Oct 1-7)
]
# Style definitions
HEADER_FILL = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
HEADER_FONT = Font(color="FFFFFF", bold=True)
TITLE_FONT = Font(size=14, bold=True)
SUBTITLE_FONT = Font(size=11, bold=True)
MONEY_FILL = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid")
BORDER_THIN = Border(
    left=Side(style='thin'),
    right=Side(style='thin'),
    top=Side(style='thin'),
    bottom=Side(style='thin')
)
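def is_working_day(date):
    """Return True if `date` is a working day (not a weekend and not a listed holiday)."""
    if date.weekday() >= 5:  # 0 = Monday, 5 = Saturday, 6 = Sunday
        return False
    # Compare calendar dates only: callers pass values derived from datetime.now(),
    # whose time of day would never equal the midnight datetimes stored in HOLIDAYS.
    if any(date.date() == holiday.date() for holiday in HOLIDAYS):
        return False
    return True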
def add_working_days(start_date, days):
    """Return the date `days` working days after start_date (skipping weekends and holidays)."""
    current = start_date
    remaining = days
    while remaining > 0:
        current += timedelta(days=1)
        if is_working_day(current):
            remaining -= 1
    return current


def get_working_days_between(start_date, end_date):
    """Count the working days from start_date to end_date, both inclusive."""
    count = 0
    current = start_date
    while current <= end_date:
        if is_working_day(current):
            count += 1
        current += timedelta(days=1)
    return count
def set_header(ws, row, col, value):
    """Write a styled header cell and return it."""
    cell = ws.cell(row=row, column=col, value=value)
    cell.fill = HEADER_FILL
    cell.font = HEADER_FONT
    cell.alignment = Alignment(horizontal='center', vertical='center')
    cell.border = BORDER_THIN
    return cell


def set_cell(ws, row, col, value, bold=False, align='left', fill=None):
    """Write a bordered data cell with optional bold font, alignment, and fill."""
    cell = ws.cell(row=row, column=col, value=value)
    cell.font = Font(bold=bold)
    cell.alignment = Alignment(horizontal=align, vertical='center')
    cell.border = BORDER_THIN
    if fill:
        cell.fill = fill
    return cell


def auto_width(ws):
    """Size each column to its longest value plus padding, capped at 50 characters."""
    for column in ws.columns:
        column_letter = get_column_letter(column[0].column)
        # Empty cells are ignored instead of being measured as the string "None".
        max_length = max(
            (len(str(cell.value)) for cell in column if cell.value is not None),
            default=0,
        )
        ws.column_dimensions[column_letter].width = min(max_length + 2, 50)
def generate_estimation_excel(requirements: str, modules: list, output_path: str = None):
    """
    Generate the effort estimation Excel workbook.

    Args:
        requirements: requirements description
        modules: list of work modules, each shaped like:
            {
                "name": "module name",
                "desc": "module description",
                "items": [
                    {
                        "name": "work item name",
                        "analysis": 1.0,   # requirements analysis, man-days
                        "design": 2.0,     # design, man-days
                        "frontend": 3.0,   # frontend, man-days
                        "backend": 5.0,    # backend, man-days
                        "algorithm": 0.0,  # algorithm, man-days
                        "test": 2.0,       # testing, man-days
                        "complexity": "中",
                        "risk": "低",
                        "parallel": True,
                        "prerequisite": "",
                        "coordination": ""
                    }
                ]
            }
        output_path: output file path (a timestamped name is used when omitted)
    """
    wb = Workbook()
    # Sheet 1: overview
    create_overview_sheet(wb, modules)
    # Sheets 2-7: per-dimension details
    create_dimensions_sheets(wb, modules)
    # Sheet 8: Gantt chart
    create_gantt_sheet(wb, modules)
    # Sheet 9: key risk assessment
    create_key_risks_sheet(wb, modules)
    # Sheet 10: coordination relations
    create_coordination_sheet(wb, modules)
    # Sheet 11: cost estimation
    create_cost_sheet(wb, modules)
    # Save
    if not output_path:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = f"工时评估_{timestamp}.xlsx"
    wb.save(output_path)
    return output_path
def create_overview_sheet(wb, modules):
    ws = wb.active
    ws.title = "工时总览"
    # Title
    ws.cell(row=1, column=1, value="软件开发工时评估总览").font = TITLE_FONT
    ws.cell(row=2, column=1, value=f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    # Header row
    headers = ["工作模块", "工作项", "需求分析", "设计", "前端", "后台", "算法", "测试", "小计", "复杂度", "风险", "并行"]
    for i, h in enumerate(headers, 1):
        set_header(ws, 4, i, h)
    row = 5
    total = {"analysis": 0, "design": 0, "frontend": 0, "backend": 0, "algorithm": 0, "test": 0}
    for module in modules:
        for item in module.get("items", []):
            subtotal = item.get("analysis", 0) + item.get("design", 0) + item.get("frontend", 0) + \
                item.get("backend", 0) + item.get("algorithm", 0) + item.get("test", 0)
            set_cell(ws, row, 1, module["name"])
            set_cell(ws, row, 2, item["name"])
            set_cell(ws, row, 3, item.get("analysis", 0))
            set_cell(ws, row, 4, item.get("design", 0))
            set_cell(ws, row, 5, item.get("frontend", 0))
            set_cell(ws, row, 6, item.get("backend", 0))
            set_cell(ws, row, 7, item.get("algorithm", 0))
            set_cell(ws, row, 8, item.get("test", 0))
            set_cell(ws, row, 9, subtotal, bold=True, align='center')
            set_cell(ws, row, 10, item.get("complexity", "中"))
            set_cell(ws, row, 11, item.get("risk", "低"))
            set_cell(ws, row, 12, "✓" if item.get("parallel", True) else "×")
            total["analysis"] += item.get("analysis", 0)
            total["design"] += item.get("design", 0)
            total["frontend"] += item.get("frontend", 0)
            total["backend"] += item.get("backend", 0)
            total["algorithm"] += item.get("algorithm", 0)
            total["test"] += item.get("test", 0)
            row += 1
    # Totals row
    row += 1
    set_header(ws, row, 1, "合计")
    set_cell(ws, row, 2, "", bold=True)
    set_cell(ws, row, 3, total["analysis"], bold=True, align='center')
    set_cell(ws, row, 4, total["design"], bold=True, align='center')
    set_cell(ws, row, 5, total["frontend"], bold=True, align='center')
    set_cell(ws, row, 6, total["backend"], bold=True, align='center')
    set_cell(ws, row, 7, total["algorithm"], bold=True, align='center')
    set_cell(ws, row, 8, total["test"], bold=True, align='center')
    grand_total = sum(total.values())
    set_cell(ws, row, 9, grand_total, bold=True, align='center')
    # Per-dimension statistics
    row += 2
    ws.cell(row=row, column=1, value="维度工时统计").font = SUBTITLE_FONT
    row += 1
    dim_headers = ["维度", "工时(人天)", "占比"]
    for i, h in enumerate(dim_headers, 1):
        set_header(ws, row, i, h)
    row += 1
    dimensions = [
        ("需求分析", total["analysis"]),
        ("设计", total["design"]),
        ("前端", total["frontend"]),
        ("后台", total["backend"]),
        ("算法", total["algorithm"]),
        ("测试", total["test"]),
    ]
    for dim, hours in dimensions:
        if hours > 0:
            pct = f"{hours/grand_total*100:.1f}%" if grand_total > 0 else "0%"
            set_cell(ws, row, 1, dim)
            set_cell(ws, row, 2, hours, align='center')
            set_cell(ws, row, 3, pct, align='center')
            row += 1
    auto_width(ws)
    # Add the effort distribution charts
    create_distribution_charts(ws, modules)
def create_dimensions_sheets(wb, modules):
    dimension_map = {
        "需求分析": "analysis",
        "设计": "design",
        "前端": "frontend",
        "后台": "backend",
        "算法": "algorithm",
        "测试": "test"
    }
    for sheet_name, key in dimension_map.items():
        ws = wb.create_sheet(title=sheet_name)
        ws.cell(row=1, column=1, value=f"{sheet_name}详情").font = TITLE_FONT
        headers = ["工作模块", "工作项", "工作内容", "评估工时(人天)", "评估依据", "复杂度", "备注"]
        for i, h in enumerate(headers, 1):
            set_header(ws, 3, i, h)
        row = 4
        for module in modules:
            for item in module.get("items", []):
                hours = item.get(key, 0)
                if hours > 0:
                    set_cell(ws, row, 1, module["name"])
                    set_cell(ws, row, 2, item["name"])
                    set_cell(ws, row, 3, item.get("desc", ""))
                    set_cell(ws, row, 4, hours, align='center')
                    set_cell(ws, row, 5, item.get("basis", f"基于{sheet_name}标准"))
                    set_cell(ws, row, 6, item.get("complexity", "中"))
                    set_cell(ws, row, 7, item.get("note", ""))
                    row += 1
        auto_width(ws)
def create_gantt_sheet(wb, modules):
    ws = wb.create_sheet(title="甘特图")
    ws.cell(row=1, column=1, value="项目进度甘特图(跳过周末和节假日)").font = TITLE_FONT
    ws.cell(row=2, column=1, value=f"生成时间: {datetime.now().strftime('%Y-%m-%d')}")
    headers = ["任务ID", "任务名称", "执行人", "开始日期", "结束日期", "工作日(天)", "日历日(天)", "前置任务", "状态", "里程碑"]
    for i, h in enumerate(headers, 1):
        set_header(ws, 3, i, h)
    # Start today, moving forward to the first working day.
    start_date = datetime.now()
    while not is_working_day(start_date):
        start_date += timedelta(days=1)
    project_start = start_date
    last_end = start_date
    row = 4
    task_id = 1
    milestones = ["需求确认", "设计完成", "开发完成", "测试完成", "上线部署"]
    milestone_idx = 0
    for module in modules:
        for item in module.get("items", []):
            total_hours = item.get("analysis", 0) + item.get("design", 0) + \
                item.get("frontend", 0) + item.get("backend", 0) + \
                item.get("algorithm", 0) + item.get("test", 0)
            # Fractional man-days are truncated; every task takes at least one day.
            working_days = max(1, int(total_hours))
            # The start date counts as day 1, so the task ends working_days - 1 working days later.
            end_date = add_working_days(start_date, working_days - 1)
            last_end = end_date
            # Calendar days, including weekends and holidays inside the span
            calendar_days = (end_date - start_date).days + 1
            # Tag long tasks (5+ working days) with the next milestone in sequence.
            is_milestone = ""
            if milestone_idx < len(milestones) and working_days >= 5:
                is_milestone = milestones[milestone_idx]
                milestone_idx += 1
            set_cell(ws, row, 1, f"T{task_id:03d}")
            set_cell(ws, row, 2, f"{module['name']}-{item['name']}")
            set_cell(ws, row, 3, item.get("assignee", "待分配"))
            set_cell(ws, row, 4, start_date.strftime("%Y-%m-%d"))
            set_cell(ws, row, 5, end_date.strftime("%Y-%m-%d"))
            set_cell(ws, row, 6, working_days, align='center')
            set_cell(ws, row, 7, calendar_days, align='center')
            set_cell(ws, row, 8, item.get("prerequisite", "-"))
            set_cell(ws, row, 9, "待开始")
            set_cell(ws, row, 10, is_milestone)
            # The next task starts on the first working day after this one ends.
            next_start = end_date + timedelta(days=1)
            while not is_working_day(next_start):
                next_start += timedelta(days=1)
            start_date = next_start
            task_id += 1
            row += 1
        # Leave one idle working day between modules.
        start_date += timedelta(days=1)
        while not is_working_day(start_date):
            start_date += timedelta(days=1)
    # Total project duration: working days from the first task's start to the last task's end.
    row += 2
    if task_id > 1:
        ws.cell(row=row, column=1, value="项目总工期(工作日)").font = SUBTITLE_FONT
        total_workdays = get_working_days_between(project_start, last_end)
        ws.cell(row=row, column=3, value=f"约 {total_workdays} 个工作日")
    auto_width(ws)
    # Add the bar chart below the table
    create_gantt_chart(ws, modules)
def create_cost_sheet(wb, modules):
    """Create the cost estimation sheet."""
    ws = wb.create_sheet(title="成本估算")
    ws.cell(row=1, column=1, value="项目成本估算").font = TITLE_FONT
    ws.cell(row=2, column=1, value=f"生成时间: {datetime.now().strftime('%Y-%m-%d')}")
    # ========== Labor cost ==========
    row = 4
    ws.cell(row=row, column=1, value="一、人力成本").font = SUBTITLE_FONT
    row += 1
    headers = ["角色", "工时(人天)", "人数", "日均成本(元)", "小计(元)", "备注"]
    for i, h in enumerate(headers, 1):
        set_header(ws, row, i, h)
    row += 1
    # Sum the man-days for each role
    total = {"analysis": 0, "design": 0, "frontend": 0, "backend": 0, "algorithm": 0, "test": 0}
    for module in modules:
        for item in module.get("items", []):
            total["analysis"] += item.get("analysis", 0)
            total["design"] += item.get("design", 0)
            total["frontend"] += item.get("frontend", 0)
            total["backend"] += item.get("backend", 0)
            total["algorithm"] += item.get("algorithm", 0)
            total["test"] += item.get("test", 0)
    # Role mapping and daily rates (configurable): (role, man-days, headcount, daily rate, dimension)
    role_rates = [
        ("需求分析师", total["analysis"], 1, 1500, "需求分析"),
        ("UI/UX设计师", total["design"], 1, 1200, "设计"),
        ("前端工程师", total["frontend"], 1, 1500, "前端"),
        ("后端工程师", total["backend"], 1, 1800, "后台"),
        ("算法工程师", total["algorithm"], 1, 2000, "算法"),
        ("测试工程师", total["test"], 1, 1200, "测试"),
    ]
    total_labor = 0
    for role, days, count, daily_rate, _ in role_rates:
        if days > 0:
            subtotal = days * count * daily_rate
            total_labor += subtotal
            set_cell(ws, row, 1, role)
            set_cell(ws, row, 2, days, align='center')
            set_cell(ws, row, 3, count, align='center')
            set_cell(ws, row, 4, daily_rate, align='center')
            set_cell(ws, row, 5, subtotal, align='center', fill=MONEY_FILL)
            set_cell(ws, row, 6, "")
            row += 1
    # Labor cost total
    set_header(ws, row, 1, "人力成本合计")
    set_cell(ws, row, 5, total_labor, bold=True, align='center', fill=MONEY_FILL)
    row += 2
    # ========== Hardware/software cost ==========
    ws.cell(row=row, column=1, value="二、软硬件成本").font = SUBTITLE_FONT
    row += 1
    headers = ["类别", "项目", "规格/数量", "单次成本(元)", "周期(月)", "小计(元)", "备注"]
    for i, h in enumerate(headers, 1):
        set_header(ws, row, i, h)
    row += 1
    # Hardware/software items: (category, item, spec, unit cost, months, note)
    hw_items = [
        ("服务器", "云服务器(ECS)", "2核4G", 500, 3, "部署、后端服务"),
        ("服务器", "数据库服务(RDS)", "基础版", 300, 3, "MySQL数据库"),
        ("服务器", "对象存储(OSS)", "100GB", 50, 3, "文件存储"),
        ("域名", "域名注册", "1个", 50, 12, "域名费用"),
        ("SSL证书", "HTTPS证书", "1个/年", 200, 12, "安全证书"),
        ("第三方服务", "短信服务", "按量付费", 100, 3, "验证码短信"),
        ("第三方服务", "支付通道", "按交易收费", 0, 3, "支付宝/微信"),
        ("第三方服务", "CDN加速", "基础套餐", 100, 3, "静态资源加速"),
        ("软件", "开发工具", "IDE许可证", 0, 0, "免费工具"),
        ("软件", "设计软件", "设计工具", 0, 0, "免费/Figma"),
    ]
    total_hw = 0
    for cat, item, spec, unit_cost, months, note in hw_items:
        subtotal = unit_cost * months
        total_hw += subtotal
        set_cell(ws, row, 1, cat)
        set_cell(ws, row, 2, item)
        set_cell(ws, row, 3, spec)
        set_cell(ws, row, 4, unit_cost if unit_cost > 0 else "-", align='center')
        set_cell(ws, row, 5, f"{months}月" if months > 0 else "-", align='center')
        set_cell(ws, row, 6, subtotal if subtotal > 0 else "-", align='center', fill=MONEY_FILL)
        set_cell(ws, row, 7, note)
        row += 1
    # Hardware/software cost total
    set_header(ws, row, 1, "软硬件成本合计")
    set_cell(ws, row, 6, total_hw, bold=True, align='center', fill=MONEY_FILL)
    row += 2
    # ========== Total project cost ==========
    ws.cell(row=row, column=1, value="三、项目总成本").font = SUBTITLE_FONT
    row += 1
    total_project = total_labor + total_hw
    set_header(ws, row, 1, "项目总预算")
    set_cell(ws, row, 2, total_project, bold=True, align='center', fill=MONEY_FILL)
    ws.cell(row=row, column=3, value=f"(人力{total_labor}元 + 软硬件{total_hw}元)")
    row += 2
    # ========== Cost notes ==========
    ws.cell(row=row, column=1, value="四、成本说明").font = SUBTITLE_FONT
    row += 1
    notes = [
        "1. 人力成本按每天8小时工作制计算",
        "2. 日均成本为参考价,可根据实际情况调整",
        "3. 软硬件成本按最低配置估算,流量费用另计",
        "4. 第三方服务(支付、短信)通常有交易手续费",
        "5. 未包含项目管理和沟通成本",
        "6. 预留10-20%应急预算",
    ]
    for note in notes:
        ws.cell(row=row, column=1, value=note)
        row += 1
    # Recommended budget
    row += 1
    # 15% buffer; round() avoids int() truncating products that land just below a whole number.
    recommended = round(total_project * 1.15)
    set_cell(ws, row, 1, "建议项目预算(含15%应急): ", bold=True)
    set_cell(ws, row, 2, recommended, bold=True, align='center', fill=MONEY_FILL)
    ws.cell(row=row, column=3, value="元")
    auto_width(ws)
def create_key_risks_sheet(wb, modules):
    ws = wb.create_sheet(title="重点评估")
    ws.cell(row=1, column=1, value="重点评估与风险项").font = TITLE_FONT
    ws.cell(row=2, column=1, value="以下列出高风险、不确定性大或技术难点明显的工作项")
    headers = ["工作模块", "工作项", "风险类型", "风险描述", "影响评估", "建议措施", "优先级"]
    for i, h in enumerate(headers, 1):
        set_header(ws, 4, i, h)
    row = 5
    risk_types = {
        "高": "高风险",
        "中": "中等风险",
        "低": "低风险"
    }
    for module in modules:
        for item in module.get("items", []):
            risk = item.get("risk", "低")
            if risk in ["高", "中"]:
                # Classify the source of uncertainty
                if item.get("algorithm", 0) > 3:
                    risk_type = "技术难点"
                elif not item.get("basis"):
                    risk_type = "需求不明确"
                else:
                    risk_type = risk_types.get(risk, "其他")
                set_cell(ws, row, 1, module["name"])
                set_cell(ws, row, 2, item["name"])
                set_cell(ws, row, 3, risk_type)
                set_cell(ws, row, 4, item.get("risk_desc", f"该工作项复杂度{item.get('complexity', '中')},存在一定不确定性"))
                set_cell(ws, row, 5, item.get("impact", "可能导致进度延误或需要额外资源"))
                set_cell(ws, row, 6, item.get("suggestion", "建议预留buffer时间,提前技术验证"))
                set_cell(ws, row, 7, "高" if risk == "高" else "中", align='center')
                row += 1
    # No row was written: nothing was rated high or medium risk.
    if row == 5:
        set_cell(ws, row, 1, "暂无高风险项")
    auto_width(ws)
def create_coordination_sheet(wb, modules):
    ws = wb.create_sheet(title="关系协调")
    ws.cell(row=1, column=1, value="工作关系与协调事项").font = TITLE_FONT
    headers = ["工作模块", "工作项", "前置依赖", "协调事项", "协调对象", "协调时间点", "备注"]
    for i, h in enumerate(headers, 1):
        set_header(ws, 3, i, h)
    row = 4
    for module in modules:
        for item in module.get("items", []):
            # Every work item is listed; missing fields fall back to placeholders.
            set_cell(ws, row, 1, module["name"])
            set_cell(ws, row, 2, item["name"])
            set_cell(ws, row, 3, item.get("prerequisite", "-"))
            set_cell(ws, row, 4, item.get("coordination", "-"))
            set_cell(ws, row, 5, item.get("coord_target", "待确认"))
            set_cell(ws, row, 6, item.get("coord_time", "开发前"))
            set_cell(ws, row, 7, item.get("note", ""))
            row += 1
    # Add a legend explaining the coordination types
    row += 2
    ws.cell(row=row, column=1, value="协调关系类型说明:").font = SUBTITLE_FONT
    row += 1
    coord_types = [
        ("前置依赖", "某工作项必须在其他工作项完成后才能开始"),
        ("接口协调", "前后端需协调接口定义和数据格式"),
        ("资源协调", "需要申请特定资源(服务器、第三方服务等)"),
        ("评审协调", "需要安排评审会议(设计评审、代码评审等)"),
    ]
    for coord_type, desc in coord_types:
        set_cell(ws, row, 1, coord_type, bold=True)
        set_cell(ws, row, 2, desc)
        row += 1
    auto_width(ws)
def create_gantt_chart(ws, modules):
    """Create a horizontal bar chart of task durations on the Gantt sheet."""
    # Chart data area, placed below the Gantt table
    chart_start_row = ws.max_row + 3
    # Chart data columns: task name, start date, duration
    ws.cell(row=chart_start_row, column=1, value="任务名称").font = Font(bold=True)
    ws.cell(row=chart_start_row, column=2, value="开始日期").font = Font(bold=True)
    ws.cell(row=chart_start_row, column=3, value="时长(天)").font = Font(bold=True)
    row = chart_start_row + 1
    start_date = datetime.now()
    while not is_working_day(start_date):
        start_date += timedelta(days=1)
    for module in modules:
        for item in module.get("items", []):
            total = item.get("analysis", 0) + item.get("design", 0) + \
                item.get("frontend", 0) + item.get("backend", 0) + \
                item.get("algorithm", 0) + item.get("test", 0)
            days = max(1, int(total))
            # The start date counts as day 1 of the task.
            end_date = add_working_days(start_date, days - 1)
            ws.cell(row=row, column=1, value=f"{module['name']}-{item['name']}")
            ws.cell(row=row, column=2, value=start_date)
            ws.cell(row=row, column=3, value=days)
            # Format the start date
            ws.cell(row=row, column=2).number_format = 'YYYY-MM-DD'
            next_start = end_date + timedelta(days=1)
            while not is_working_day(next_start):
                next_start += timedelta(days=1)
            start_date = next_start
            row += 1
    chart_data_end = row - 1
    # Build the chart
    chart = BarChart()
    chart.type = "bar"  # horizontal bars
    chart.title = "项目进度甘特图"
    # In openpyxl, x_axis is the category axis (tasks) and y_axis the value axis (durations).
    chart.x_axis.title = "任务"
    chart.y_axis.title = "时长(天)"
    chart.style = 10
    # Data series: the duration column, with its header row used as the series title
    data = Reference(ws, min_col=3, min_row=chart_start_row, max_row=chart_data_end)
    cats = Reference(ws, min_col=1, min_row=chart_start_row + 1, max_row=chart_data_end)
    chart.add_data(data, titles_from_data=True)
    chart.set_categories(cats)
    chart.shape = 4
    chart.width = 20
    chart.height = 12
    # Place the chart
    ws.add_chart(chart, f"H{chart_start_row}")
def create_distribution_charts(ws, modules):
"""在工作总览Sheet中创建工时分布图表"""
# 计算各维度工时
total = {"analysis": 0, "design": 0, "frontend": 0, "backend": 0, "algorithm": 0, "test": 0}
module_totals = {}
for module in modules:
module_total = 0
for item in module.get("items", []):
item_total = item.get("analysis", 0) + item.get("design", 0) + \
item.get("frontend", 0) + item.get("backend", 0) + \
item.get("algorithm", 0) + item.get("test", 0)
total["analysis"] += item.get("analysis", 0)
total["design"] += item.get("design", 0)
total["frontend"] += item.get("frontend", 0)
total["backend"] += item.get("backend", 0)
total["algorithm"] += item.get("algorithm", 0)
total["test"] += item.get("test", 0)
module_total += item_total
module_totals[module["name"]] = module_total
grand_total = sum(total.values())
# 找到总览Sheet的最后一行
chart_row = ws.max_row + 3
# ========== 维度占比饼图 ==========
ws.cell(row=chart_row, column=1, value="工时维度占比").font = Font(bold=True, size=12)
chart_row += 1
# 写入饼图数据
ws.cell(row=chart_row, column=1, value="维度")
ws.cell(row=chart_row, column=2, value="工时(人天)")
pie_data_row = chart_row + 1
dimensions = [("需求分析", total["analysis"]),
("设计", total["design"]),
("前端", total["frontend"]),
("后台", total["backend"]),
("算法", total["algorithm"]),
("测试", total["test"])]
row = pie_data_row
for dim, hours in dimensions:
if hours > 0:
ws.cell(row=row, column=1, value=dim)
ws.cell(row=row, column=2, value=hours)
row += 1
pie_data_end = row - 1
# 创建饼图
pie = PieChart()
labels = Reference(ws, min_col=1, min_row=pie_data_row, max_row=pie_data_end)
data = Reference(ws, min_col=2, min_row=pie_data_row - 1, max_row=pie_data_end)
pie.add_data(data, titles_from_data=True)
pie.set_categories(labels)
pie.title = "各维度工时占比"
pie.style = 10
pie.width = 12
pie.height = 10
# 添加数据标签
pie.dataLabels = DataLabelList()
pie.dataLabels.showPercent = True
pie.dataLabels.showVal = True
pie.dataLabels.showCatName = True
ws.add_chart(pie, f"D{chart_row}")
# ========== 模块占比柱状图 ==========
chart_row = pie_data_end + 3
ws.cell(row=chart_row, column=1, value="各模块工时对比").font = Font(bold=True, size=12)
chart_row += 1
# 写入柱状图数据
ws.cell(row=chart_row, column=1, value="模块")
ws.cell(row=chart_row, column=2, value="工时(人天)")
bar_data_row = chart_row + 1
row = bar_data_row
for module_name, hours in module_totals.items():
ws.cell(row=row, column=1, value=module_name)
ws.cell(row=row, column=2, value=hours)
row += 1
bar_data_end = row - 1
# 创建柱状图
bar = BarChart()
bar.type = "col"
bar.style = 10
bar.title = "各模块工时对比"
bar.y_axis.title = "工时(人天)"
bar.x_axis.title = "模块"
labels = Reference(ws, min_col=1, min_row=bar_data_row, max_row=bar_data_end)
data = Reference(ws, min_col=2, min_row=bar_data_row - 1, max_row=bar_data_end)
bar.add_data(data, titles_from_data=True)
bar.set_categories(labels)
bar.width = 14
bar.height = 10
ws.add_chart(bar, f"D{chart_row}")
def parse_requirements(requirements_text: str) -> list:
    """
    Parse requirement text into a list of module dicts.
    This is a simplified parser; real requirement documents may need more elaborate handling.
    """
    modules = []
    current_module = None
    for line in requirements_text.split("\n"):
        line = line.strip()
        if not line:
            continue
        # A module title is either a Markdown heading ("#", "##", ...) or a "XX模块: description" line
        if line.startswith("#"):
            if current_module:
                modules.append(current_module)
            current_module = {"name": line.lstrip("#").strip(), "desc": "", "items": []}
        elif "模块" in line and ":" in line:
            if current_module:
                modules.append(current_module)
            # Split on the first colon only, so colons inside the description are preserved
            module_name, module_desc = line.split(":", 1)
            current_module = {"name": module_name.strip(), "desc": module_desc.strip(), "items": []}
    if current_module:
        modules.append(current_module)
    return modules
if __name__ == "__main__":
# 测试
test_modules = [
{
"name": "用户系统",
"desc": "用户登录注册相关功能",
"items": [
{
"name": "登录注册",
"desc": "手机号+验证码登录",
"analysis": 1.0,
"design": 1.0,
"frontend": 2.0,
"backend": 3.0,
"algorithm": 0,
"test": 1.0,
"complexity": "低",
"risk": "低",
"parallel": True,
"prerequisite": "",
"coordination": "需与短信服务商协调"
}
]
}
]
output = generate_estimation_excel("测试需求", test_modules)
print(f"已生成: {output}")
FILE:scripts/test_login.py
"""测试:APP手机号登录注册工时评估"""
import sys
from pathlib import Path
# Resolve the scripts directory relative to this file instead of a machine-specific absolute path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from generate_estimation import generate_estimation_excel
modules = [
{
"name": "用户系统",
"desc": "APP手机号登录注册模块",
"items": [
{
"name": "登录注册界面",
"desc": "手机号输入、验证码发送、倒计时、协议勾选",
"analysis": 0.5,
"design": 1.0,
"frontend": 2.0,
"backend": 1.5,
"algorithm": 0,
"test": 0.5,
"complexity": "低",
"risk": "低",
"parallel": True,
"prerequisite": "",
"coordination": "需与短信服务商协调"
},
{
"name": "验证码服务",
"desc": "短信验证码生成、发送、校验(60秒有效期)",
"analysis": 0.5,
"design": 0.5,
"frontend": 0,
"backend": 2.0,
"algorithm": 0,
"test": 0.5,
"complexity": "中",
"risk": "低",
"parallel": True,
"prerequisite": "",
"coordination": "需与短信服务商协调接口"
},
{
"name": "用户信息存储",
"desc": "用户表设计、注册流程、登录Token生成",
"analysis": 0.5,
"design": 1.0,
"frontend": 0,
"backend": 2.5,
"algorithm": 0,
"test": 0.5,
"complexity": "中",
"risk": "低",
"parallel": False,
"prerequisite": "验证码服务完成后",
"coordination": ""
},
{
"name": "第三方登录(可选)",
"desc": "微信/Apple登录集成",
"analysis": 0.5,
"design": 0.5,
"frontend": 1.5,
"backend": 1.5,
"algorithm": 0,
"test": 0.5,
"complexity": "高",
"risk": "中",
"parallel": True,
"prerequisite": "",
"coordination": "需微信/Apple开发者账号"
}
]
}
]
output = generate_estimation_excel("APP手机号登录注册", modules)
print(f"已生成: {output}")
FILE:references/evaluation-guide.md
# Software Development Work Estimation Guide
## Estimation Dimensions
### 1. Analysis
- Requirements research & interviews
- Requirements documentation
- Requirements review & approval
- Requirements change management
### 2. Design
- Architecture design
- UI/UX design
- Database design
- API design
- Detailed design
### 3. Frontend
- Page development
- Component encapsulation
- State management
- Performance optimization
- Compatibility
### 4. Backend
- Server development
- API development
- Database implementation
- Caching design
- Security
### 5. Algorithm
- Business logic implementation
- Data processing
- AI/ML models
- Performance optimization
### 6. Testing
- Unit testing
- Integration testing
- System testing
- Performance testing
- UAT
---
## Complexity Standards
### Frontend
| Complexity | Description | Example |
|------------|-------------|---------|
| Low | Static pages, minimal interaction | Landing pages, forms |
| Medium | Dynamic pages, state management | List pages, form validation |
| High | Complex interactions, sync | Real-time collaboration, drag-drop |
### Backend
| Complexity | Description | Example |
|------------|-------------|---------|
| Low | CRUD, single table ops | Basic CRUD |
| Medium | Business logic, transactions | Order processing, inventory |
| High | Distributed, high concurrency | Flash sales, real-time computing |
### Algorithm
| Complexity | Description | Example |
|------------|-------------|---------|
| Low | Simple calculations | Statistics, filtering, sorting |
| Medium | Moderate algorithms | Recommendations, search ranking |
| High | Complex algorithms/AI | Image recognition, NLP, deep learning |
---
## Quick Reference Table
### Analysis
| Item | Low | Medium | High |
|------|-----|--------|------|
| Research | 1 day | 2-3 days | 5 days+ |
| Documentation | 1 day | 2-3 days | 5 days+ |
| Review | 0.5 day | 1 day | 2 days+ |
### Design
| Item | Low | Medium | High |
|------|-----|--------|------|
| Architecture | 1-2 days | 3-5 days | 1-2 weeks |
| UI Design | 2-3 days | 5-7 days | 2-3 weeks |
| Database | 0.5 day | 1-2 days | 3-5 days |
### Development (per feature point)
| Role | Low | Medium | High |
|------|-----|--------|------|
| Frontend | 0.5-1 day | 1-2 days | 2-5 days |
| Backend | 1-2 days | 2-4 days | 5-10 days |
| Algorithm | 1-2 days | 3-5 days | 5-10 days |
### Testing
| Item | Ratio | Description |
|------|-------|-------------|
| Functional | 0.3-0.5 | Relative to dev hours |
| Integration | 0.2-0.3 | Relative to dev hours |
| Performance | 0.1-0.2 | Relative to dev hours |
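The ratios in the Testing table can be applied mechanically once development effort is known. The following is a minimal sketch, not part of the skill's scripts; `TEST_RATIOS` and `estimate_test_days` are hypothetical names introduced only for illustration.

```python
# Ratio ranges copied from the Testing table (relative to development person-days).
TEST_RATIOS = {
    "functional": (0.3, 0.5),
    "integration": (0.2, 0.3),
    "performance": (0.1, 0.2),
}


def estimate_test_days(dev_days: float) -> dict:
    """Return a (low, high) person-day range per test type, plus a total range."""
    result = {
        name: (round(dev_days * low, 2), round(dev_days * high, 2))
        for name, (low, high) in TEST_RATIOS.items()
    }
    # The right-hand side is evaluated before "total" is added to the dict.
    result["total"] = (
        round(sum(low for low, _ in result.values()), 2),
        round(sum(high for _, high in result.values()), 2),
    )
    return result


if __name__ == "__main__":
    for test_type, (low, high) in estimate_test_days(10).items():
        print(f"{test_type}: {low}-{high} person-days")
```

Keeping the result as a range preserves the uncertainty that the table expresses, rather than collapsing it to a single number too early.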
---
## Gantt Chart Planning
### Parallel Work
- Frontend page development can be parallel
- Independent modules can be parallel
- Design and frontend can be partially parallel
- Frontend and backend can be parallel (after API agreement)
### Critical Path
- Sequential work items
- Determines shortest project duration
- Requires close monitoring
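As an illustration of how a critical path can be derived from prerequisites, the sketch below finds the longest dependency chain in a task graph. It is an assumption-laden example, not the method used by `generate_estimation.py` (which schedules items sequentially); the `critical_path` function and the task names are hypothetical, and the graph is assumed to be free of cycles.

```python
from functools import lru_cache


def critical_path(tasks: dict) -> tuple:
    """tasks maps name -> (duration_days, [prerequisite names]).

    Returns (total_days, [task names on the longest dependency chain]).
    Assumes the dependency graph has no cycles.
    """

    @lru_cache(maxsize=None)
    def longest(name):
        duration, prereqs = tasks[name]
        best_len, best_path = 0, ()
        for prereq in prereqs:
            length, path = longest(prereq)
            if length > best_len:
                best_len, best_path = length, path
        return best_len + duration, best_path + (name,)

    total, path = max((longest(name) for name in tasks), key=lambda r: r[0])
    return total, list(path)


if __name__ == "__main__":
    demo = {
        "api_design": (2, []),
        "frontend": (3, ["api_design"]),
        "backend": (5, ["api_design"]),
        "integration_test": (2, ["frontend", "backend"]),
    }
    print(critical_path(demo))
```

In the demo, frontend and backend run in parallel after the API is agreed, so the slower backend chain determines the shortest possible duration.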
### Milestones
- Requirements confirmed
- Design completed
- Development completed
- Testing completed
- Deployment
---
## Risk Assessment
### Key Items (require separate notes)
1. Technical difficulties unclear
2. Third-party dependencies uncertain
3. Requirements boundaries fuzzy
4. Performance requirements extremely high
5. Team lacks experience
### Risk Levels
| Level | Description | Buffer |
|-------|-------------|--------|
| Low | Mature tech, clear requirements | 10% |
| Medium | Some complexity | 20% |
| High | New tech or fuzzy requirements | 30%+ |
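The buffer column can be applied to a base estimate as follows. This is a small sketch under stated assumptions: `RISK_BUFFER` and `buffered_days` are hypothetical names, and the open-ended "30%+" for high risk is treated as a 30% floor.

```python
# Buffers from the Risk Levels table; "30%+" is treated as a minimum of 30%.
RISK_BUFFER = {"low": 0.10, "medium": 0.20, "high": 0.30}


def buffered_days(base_days: float, risk: str) -> float:
    """Return the base estimate increased by the buffer for the given risk level."""
    return round(base_days * (1 + RISK_BUFFER[risk]), 2)


if __name__ == "__main__":
    for level in ("low", "medium", "high"):
        print(level, buffered_days(10, level))
```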
---
## Excel Output Structure
```
Sheet 1: Overview
Sheet 2: Analysis Details
Sheet 3: Design Details
Sheet 4: Frontend Details
Sheet 5: Backend Details
Sheet 6: Algorithm Details
Sheet 7: Testing Details
Sheet 8: Gantt Chart
Sheet 9: Key Risks
Sheet 10: Coordination
```
### Gantt Chart Columns
| Task | Start Date | End Date | Duration(days) | Prerequisites | Assignee |
|------|------------|----------|----------------|---------------|----------|
FILE:evals/evals.json
[
{
"id": "eval-001",
"name": "电商小程序工时评估",
"input": {
"requirements": "开发一个电商小程序,包括用户登录、商品展示、购物车、订单支付功能"
},
"expected": {
"modules_count": 5,
"has_overview": true,
"has_gantt": true,
"has_risks": true,
"has_coordination": true,
"dimensions": ["需求分析", "设计", "前端", "后台", "算法", "测试"]
}
},
{
"id": "eval-002",
"name": "企业内部管理系统评估",
"input": {
"requirements": "开发企业内部OA系统,包含审批流程、考勤管理、公告发布三个模块"
},
"expected": {
"modules_count": 3,
"has_overview": true,
"has_gantt": true
}
},
{
"id": "eval-003",
"name": "AI推荐系统评估",
"input": {
"requirements": "开发一个内容推荐系统,包括用户画像、推荐算法、前端展示三大块"
},
"expected": {
"modules_count": 3,
"has_algorithm_sheet": true,
"algorithm_hours_defined": true
}
}
]
软件开发工时评估技能。当用户提到"工时评估"、"工作量评估"、"开发周期评估"、"项目评估"、"评估工时"、"拆分工作"、"任务拆分"、"项目工时"、"排期评估"、"开发周期估算"时触发。 支持用户输入需求描述或上传需求文档,自动进行工作拆分和工时估算,输出 Excel 评估报告。
---
name: work-estimation-zh
description: |
软件开发工时评估技能。当用户提到"工时评估"、"工作量评估"、"开发周期评估"、"项目评估"、"评估工时"、"拆分工作"、"任务拆分"、"项目工时"、"排期评估"、"开发周期估算"时触发。
支持用户输入需求描述或上传需求文档,自动进行工作拆分和工时估算,输出 Excel 评估报告。
version: 1.0.0
---
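# 📊 Software Development Work Estimation
Automatically analyzes user requirements, breaks them down into concrete work items, estimates effort along multiple dimensions, and outputs a structured Excel report.
## Workflow
### Step 1: Collect requirements
Ask the user to provide:
- A requirements description (text can be pasted directly)
- Or the path to a requirements document (.md, .docx, .txt, and similar formats are supported)
### Step 2: AI requirements breakdown
The AI automatically:
1. Analyzes the requirements
2. Splits them into concrete work modules
3. Classifies the work by dimension (analysis, design, frontend, backend, algorithm, testing)
### Step 3: Effort estimation
For each work item, estimate:
- Effort (person-days)
- Complexity (low/medium/high)
- Risk level (low/medium/high)
- Whether it can run in parallel
- Prerequisites
### Step 4: Generate the Excel file
Output a multi-sheet Excel report:
- Overview sheet
- Detailed estimates per dimension
- Gantt chart (project schedule)
- Key risk items
- Coordination sheet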
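## Output
### Sheet 1: 工时总览 (Effort Overview)
| Work module | Analysis | Design | Frontend | Backend | Algorithm | Testing | Total |
|---------|---------|------|------|------|------|------|------|
| Module A | 1 person-day | 2 person-days | 3 person-days | 5 person-days | 1 person-day | 2 person-days | 14 person-days |
### Sheets 2-7: Per-dimension details
For each dimension, lists the work content, the basis for the estimate, and the estimated effort.
### Sheet 8: 甘特图 (Gantt Chart)
Visualizes parallel work, the project's critical path, and milestones. **Weekends and public holidays are skipped automatically.**
### Sheet 9: 重点评估 (Key Risks)
Lists high-risk or highly uncertain work separately, with reasons and recommendations.
### Sheet 10: 关系协调 (Coordination)
Dependencies between work items, prerequisite tasks, and coordination items.
### Sheet 11: 成本估算 (Cost Estimate)
Consists of three parts:
- **Labor cost**: person-days per role × daily rate
- **Software and hardware cost**: servers, domain, SSL, third-party services, etc.
- **Total project budget**: includes a 15% contingency buffer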
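The cost arithmetic can be sketched in a few lines. The daily rates below are the reference values hard-coded in `create_cost_sheet` in `scripts/generate_estimation.py`; the `project_budget` function itself is a hypothetical helper written for illustration, not part of the script.

```python
# Reference daily rates (CNY) as listed in create_cost_sheet; adjust to your team.
DAILY_RATES = {
    "analysis": 1500,
    "design": 1200,
    "frontend": 1500,
    "backend": 1800,
    "algorithm": 2000,
    "test": 1200,
}


def project_budget(person_days: dict, hardware_cost: float, buffer: float = 0.15) -> dict:
    """Combine labor and hardware cost, then add a contingency buffer."""
    labor = sum(days * DAILY_RATES[role] for role, days in person_days.items())
    total = labor + hardware_cost
    return {
        "labor": labor,
        "hardware": hardware_cost,
        "total": total,
        # Truncated to whole yuan, mirroring int(total_project * 1.15) in the script.
        "recommended": int(total * (1 + buffer)),
    }


if __name__ == "__main__":
    # Person-days taken from the sample overview row (Module A).
    module_a = {"analysis": 1, "design": 2, "frontend": 3,
                "backend": 5, "algorithm": 1, "test": 2}
    print(project_budget(module_a, hardware_cost=6150))
```

The 6150 figure used in the demo is the sum of the default software and hardware line items in the script (unit cost × months).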
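## Estimation reference
| Complexity | Frontend | Backend | Algorithm | Testing |
|-------|------|------|------|------|
| Low | 1-2 days | 1-3 days | 1-3 days | 0.5-1 day |
| Medium | 3-5 days | 3-7 days | 3-7 days | 1-3 days |
| High | 5-10 days | 7-14 days | 7-14 days | 3-5 days |
## Notes
- Estimates assume a typical team setup (1 frontend, 1 backend, 1 tester)
- Adding people in parallel does not change the total effort; it only shortens calendar time
- Reserve a 10-20% buffer for changes
- Adjust for the actual team when executing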
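The difference between effort and calendar time can be made concrete with a small sketch. It assumes an idealized case in which work divides evenly across people and has no dependencies, which real projects rarely satisfy; `calendar_days` is a hypothetical helper, not part of the skill's scripts.

```python
import math


def calendar_days(effort_days: float, people: int, buffer: float = 0.15) -> int:
    """Idealized calendar duration: buffered effort split evenly across people.

    Total effort (person-days) is unchanged by parallelism; only the
    calendar duration shrinks.
    """
    buffered = effort_days * (1 + buffer)
    # Round before ceil so float noise (e.g. 22.000000000000004) does not add a day.
    return math.ceil(round(buffered / people, 6))


if __name__ == "__main__":
    for team_size in (1, 2, 3):
        print(team_size, "people:", calendar_days(14, team_size), "calendar days")
```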
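## Example conversation
**User:** Help me estimate this project: build an e-commerce mini program with user login, product display, shopping cart, and order payment.
**AI:**
```
Got it! I'll work out the effort estimate.
Based on the requirements you described, I split the work into these modules:
📦 **Work modules**
1. User system (login, registration, profile)
2. Product module (list, detail, search)
3. Shopping cart module
4. Order module
5. Payment module
6. Basic infrastructure (routing, state management, API wrappers)
Generating the detailed estimation report...
```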
FILE:README.md
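# 📊 Software Development Work Estimation
Automatically analyzes requirements, breaks them into work items, estimates effort, and outputs a professional Excel estimation report.
## Features
- ✅ **Smart requirements breakdown** - AI analyzes the requirements and splits them into concrete work modules
- ✅ **Six-dimension estimation** - analysis, design, frontend, backend, algorithm, testing
- ✅ **Gantt chart** - visualizes the project schedule and parallel work
- ✅ **Risk assessment** - highlights high-risk and uncertain work
- ✅ **Coordination** - shows dependencies and coordination items between work items
## Output file structure
| Sheet | Content |
|-------|------|
| 工时总览 (Overview) | Summary table of all work items and per-dimension share |
| 需求分析 (Analysis) | Detailed work items for the analysis dimension |
| 设计 (Design) | Detailed work items for the design dimension |
| 前端 (Frontend) | Detailed frontend development work items |
| 后台 (Backend) | Detailed backend development work items |
| 算法 (Algorithm) | Detailed algorithm development work items (if any) |
| 测试 (Testing) | Detailed testing work items |
| 甘特图 (Gantt Chart) | Project schedule visualization (skips weekends and holidays) |
| 重点评估 (Key Risks) | Notes on high-risk items |
| 关系协调 (Coordination) | Dependencies and coordination items |
| 成本估算 (Cost Estimate) | Labor cost, software and hardware cost, total budget |
## Usage examples
### Describe the requirements directly
```
Help me estimate this project: build an online education platform with course listings, video playback, assignment submission, and grade lookup
```
### Provide a detailed requirements document
```
Help me estimate the effort for this project; the requirements document is at C:\docs\requirements.md
```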
## Running the script
```bash
cd C:/Users/Administrator/AppData/Roaming/LobsterAI/SKILLs/work-estimation/scripts
python generate_estimation.py
```
## Estimation reference
| Complexity | Frontend | Backend | Algorithm |
|--------|------|------|------|
| Low | 0.5-1 day | 1-2 days | 1-2 days |
| Medium | 1-3 days | 2-5 days | 3-5 days |
| High | 3-7 days | 5-10 days | 5-10 days |
## File locations
```
C:/Users/Administrator/AppData/Roaming/LobsterAI/SKILLs/work-estimation/
├── SKILL.md # Skill definition
├── README.md # This file
├── scripts/
│ └── generate_estimation.py # Excel generation script
├── references/
│ └── evaluation-guide.md # Estimation guide
└── evals/
└── evals.json # Test cases
```
FILE:scripts/generate_estimation.py
"""
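Excel generator for software development work estimation
Input: a requirements description and the work items it was broken into
Output: a multi-sheet Excel estimation report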
"""
import json
from datetime import datetime, timedelta
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
from openpyxl.chart import BarChart, PieChart, Reference
from openpyxl.chart.label import DataLabelList
from openpyxl.chart.series import DataPoint
from openpyxl.drawing.fill import PatternFillProperties, ColorChoice
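# Chinese public holidays (sample data; extend as needed)
# NOTE: several entries below appear to follow the 2025 calendar (for example, Spring Festival
# in 2026 falls on Feb 17); verify against the official 2026 holiday notice before relying on them.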
HOLIDAYS = [
# 2026年
datetime(2026, 1, 1), # 元旦
datetime(2026, 1, 28), datetime(2026, 1, 29), datetime(2026, 1, 30), # 春节
datetime(2026, 2, 1), datetime(2026, 2, 2), datetime(2026, 2, 3), datetime(2026, 2, 4),
datetime(2026, 4, 4), datetime(2026, 4, 5), datetime(2026, 4, 6), # 清明
datetime(2026, 5, 1), datetime(2026, 5, 2), datetime(2026, 5, 3), # 劳动节
datetime(2026, 6, 1), # 端午
datetime(2026, 10, 1), datetime(2026, 10, 2), datetime(2026, 10, 3), # 国庆
datetime(2026, 10, 4), datetime(2026, 10, 5), datetime(2026, 10, 6), datetime(2026, 10, 7),
]
# 样式定义
HEADER_FILL = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
HEADER_FONT = Font(color="FFFFFF", bold=True)
TITLE_FONT = Font(size=14, bold=True)
SUBTITLE_FONT = Font(size=11, bold=True)
MONEY_FILL = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid")
BORDER_THIN = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
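def is_working_day(date):
    """Return True if the date is a working day (not a weekend or a listed holiday)."""
    if date.weekday() >= 5:  # 0 = Monday, 5 = Saturday, 6 = Sunday
        return False
    # Compare calendar dates only: datetime.now() carries a time of day,
    # so a direct `date in HOLIDAYS` check never matches the midnight entries.
    day = date.date() if isinstance(date, datetime) else date
    return all(h.date() != day for h in HOLIDAYS)
def add_working_days(start_date, days):
    """Return the date reached after advancing `days` working days from start_date (weekends and holidays skipped)."""
    current = start_date
    remaining = days
    while remaining > 0:
        current += timedelta(days=1)
        if is_working_day(current):
            remaining -= 1
    return current
def get_working_days_between(start_date, end_date):
    """Count the working days from start_date through end_date, inclusive."""
    count = 0
    current = start_date
    while current <= end_date:
        if is_working_day(current):
            count += 1
        current += timedelta(days=1)
    return count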
def set_header(ws, row, col, value):
cell = ws.cell(row=row, column=col, value=value)
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.border = BORDER_THIN
return cell
def set_cell(ws, row, col, value, bold=False, align='left', fill=None):
cell = ws.cell(row=row, column=col, value=value)
cell.font = Font(bold=bold)
cell.alignment = Alignment(horizontal=align, vertical='center')
cell.border = BORDER_THIN
if fill:
cell.fill = fill
return cell
def auto_width(ws):
    """Size each column to its longest cell value, capped at 50 characters."""
    for column in ws.columns:
        column_letter = get_column_letter(column[0].column)
        # Empty cells count as zero length rather than len("None")
        max_length = max(
            (len(str(cell.value)) for cell in column if cell.value is not None),
            default=0,
        )
        ws.column_dimensions[column_letter].width = min(max_length + 2, 50)
def generate_estimation_excel(requirements: str, modules: list, output_path: str = None):
"""
生成工时评估 Excel
Args:
requirements: 需求描述
modules: 工作模块列表,每项包含:
{
"name": "模块名称",
"desc": "模块描述",
"items": [
{
"name": "工作项名称",
"analysis": 1.0, # 需求分析人天
"design": 2.0, # 设计人天
"frontend": 3.0, # 前端人天
"backend": 5.0, # 后台人天
"algorithm": 0.0, # 算法人天
"test": 2.0, # 测试人天
"complexity": "中",
"risk": "低",
"parallel": True,
"prerequisite": "",
"coordination": ""
}
]
}
output_path: 输出路径
"""
wb = Workbook()
# Sheet 1: 工时总览
create_overview_sheet(wb, modules)
# Sheet 2-7: 各维度详情
create_dimensions_sheets(wb, modules)
# Sheet 8: 甘特图
create_gantt_sheet(wb, modules)
# Sheet 9: 重点评估
create_key_risks_sheet(wb, modules)
# Sheet 10: 关系协调
create_coordination_sheet(wb, modules)
# Sheet 11: 成本估算
create_cost_sheet(wb, modules)
# 保存
if not output_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"工时评估_{timestamp}.xlsx"
wb.save(output_path)
return output_path
def create_overview_sheet(wb, modules):
ws = wb.active
ws.title = "工时总览"
# 标题
ws.cell(row=1, column=1, value="软件开发工时评估总览").font = TITLE_FONT
ws.cell(row=2, column=1, value=f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
# 表头
headers = ["工作模块", "工作项", "需求分析", "设计", "前端", "后台", "算法", "测试", "小计", "复杂度", "风险", "并行"]
for i, h in enumerate(headers, 1):
set_header(ws, 4, i, h)
row = 5
total = {"analysis": 0, "design": 0, "frontend": 0, "backend": 0, "algorithm": 0, "test": 0}
module_starts = []
for module in modules:
module_start = row
for item in module.get("items", []):
subtotal = item.get("analysis", 0) + item.get("design", 0) + item.get("frontend", 0) + \
item.get("backend", 0) + item.get("algorithm", 0) + item.get("test", 0)
set_cell(ws, row, 1, module["name"])
set_cell(ws, row, 2, item["name"])
set_cell(ws, row, 3, item.get("analysis", 0))
set_cell(ws, row, 4, item.get("design", 0))
set_cell(ws, row, 5, item.get("frontend", 0))
set_cell(ws, row, 6, item.get("backend", 0))
set_cell(ws, row, 7, item.get("algorithm", 0))
set_cell(ws, row, 8, item.get("test", 0))
set_cell(ws, row, 9, subtotal, bold=True, align='center')
set_cell(ws, row, 10, item.get("complexity", "中"))
set_cell(ws, row, 11, item.get("risk", "低"))
set_cell(ws, row, 12, "✓" if item.get("parallel", True) else "×")
total["analysis"] += item.get("analysis", 0)
total["design"] += item.get("design", 0)
total["frontend"] += item.get("frontend", 0)
total["backend"] += item.get("backend", 0)
total["algorithm"] += item.get("algorithm", 0)
total["test"] += item.get("test", 0)
row += 1
module_starts.append((module["name"], module_start, row - 1))
# 合计行
row += 1
set_header(ws, row, 1, "合计")
set_cell(ws, row, 2, "", bold=True)
set_cell(ws, row, 3, total["analysis"], bold=True, align='center')
set_cell(ws, row, 4, total["design"], bold=True, align='center')
set_cell(ws, row, 5, total["frontend"], bold=True, align='center')
set_cell(ws, row, 6, total["backend"], bold=True, align='center')
set_cell(ws, row, 7, total["algorithm"], bold=True, align='center')
set_cell(ws, row, 8, total["test"], bold=True, align='center')
grand_total = sum(total.values())
set_cell(ws, row, 9, grand_total, bold=True, align='center')
# 维度统计
row += 2
ws.cell(row=row, column=1, value="维度工时统计").font = SUBTITLE_FONT
row += 1
dim_headers = ["维度", "工时(人天)", "占比"]
for i, h in enumerate(dim_headers, 1):
set_header(ws, row, i, h)
row += 1
dimensions = [
("需求分析", total["analysis"]),
("设计", total["design"]),
("前端", total["frontend"]),
("后台", total["backend"]),
("算法", total["algorithm"]),
("测试", total["test"]),
]
for dim, hours in dimensions:
if hours > 0:
pct = f"{hours/grand_total*100:.1f}%" if grand_total > 0 else "0%"
set_cell(ws, row, 1, dim)
set_cell(ws, row, 2, hours, align='center')
set_cell(ws, row, 3, pct, align='center')
row += 1
auto_width(ws)
# 添加工时分布图表
create_distribution_charts(ws, modules)
def create_dimensions_sheets(wb, modules):
dimension_map = {
"需求分析": "analysis",
"设计": "design",
"前端": "frontend",
"后台": "backend",
"算法": "algorithm",
"测试": "test"
}
for sheet_name, key in dimension_map.items():
ws = wb.create_sheet(title=sheet_name)
ws.cell(row=1, column=1, value=f"{sheet_name}详情").font = TITLE_FONT
headers = ["工作模块", "工作项", "工作内容", "评估工时(人天)", "评估依据", "复杂度", "备注"]
for i, h in enumerate(headers, 1):
set_header(ws, 3, i, h)
row = 4
for module in modules:
for item in module.get("items", []):
hours = item.get(key, 0)
if hours > 0:
set_cell(ws, row, 1, module["name"])
set_cell(ws, row, 2, item["name"])
set_cell(ws, row, 3, item.get("desc", ""))
set_cell(ws, row, 4, hours, align='center')
set_cell(ws, row, 5, item.get("basis", f"基于{sheet_name}标准"))
set_cell(ws, row, 6, item.get("complexity", "中"))
set_cell(ws, row, 7, item.get("note", ""))
row += 1
auto_width(ws)
def create_gantt_sheet(wb, modules):
ws = wb.create_sheet(title="甘特图")
ws.cell(row=1, column=1, value="项目进度甘特图(跳过周末和节假日)").font = TITLE_FONT
ws.cell(row=2, column=1, value=f"生成时间: {datetime.now().strftime('%Y-%m-%d')}")
headers = ["任务ID", "任务名称", "执行人", "开始日期", "结束日期", "工作日(天)", "日历日(天)", "前置任务", "状态", "里程碑"]
for i, h in enumerate(headers, 1):
set_header(ws, 3, i, h)
# 从今天开始,跳过周末和节假日
start_date = datetime.now()
# 确保从工作日开始
while not is_working_day(start_date):
start_date += timedelta(days=1)
row = 4
task_id = 1
milestones = ["需求确认", "设计完成", "开发完成", "测试完成", "上线部署"]
milestone_idx = 0
for module in modules:
for item in module.get("items", []):
total_hours = item.get("analysis", 0) + item.get("design", 0) + \
item.get("frontend", 0) + item.get("backend", 0) + \
item.get("algorithm", 0) + item.get("test", 0)
working_days = max(1, int(total_hours))
# 计算工作日结束日期
end_date = add_working_days(start_date, working_days)
# 计算日历天数(含休息日)
calendar_days = (end_date - start_date).days + 1
# 判断里程碑
is_milestone = ""
if milestone_idx < len(milestones) and working_days >= 5:
is_milestone = milestones[milestone_idx]
milestone_idx += 1
set_cell(ws, row, 1, f"T{task_id:03d}")
set_cell(ws, row, 2, f"{module['name']}-{item['name']}")
set_cell(ws, row, 3, item.get("assignee", "待分配"))
set_cell(ws, row, 4, start_date.strftime("%Y-%m-%d"))
set_cell(ws, row, 5, end_date.strftime("%Y-%m-%d"))
set_cell(ws, row, 6, working_days, align='center')
set_cell(ws, row, 7, calendar_days, align='center')
set_cell(ws, row, 8, item.get("prerequisite", "-"))
set_cell(ws, row, 9, "待开始")
set_cell(ws, row, 10, is_milestone)
# 下一个任务从休息日后开始(跳过周末和节假日)
next_start = end_date + timedelta(days=1)
while not is_working_day(next_start):
next_start += timedelta(days=1)
start_date = next_start
task_id += 1
row += 1
# 模块间休息1天(跳过周末和节假日)
start_date += timedelta(days=1)
while not is_working_day(start_date):
start_date += timedelta(days=1)
# 项目总工期
row += 2
if task_id > 1:
ws.cell(row=row, column=1, value="项目总工期(工作日)").font = SUBTITLE_FONT
# 重新计算总工期
total_start = datetime.now()
while not is_working_day(total_start):
total_start += timedelta(days=1)
final_end = datetime.now()
for m in modules:
for it in m.get("items", []):
days = int(it.get("analysis", 0) + it.get("design", 0) + it.get("frontend", 0) + \
it.get("backend", 0) + it.get("algorithm", 0) + it.get("test", 0))
final_end = add_working_days(total_start, days)
total_start = final_end + timedelta(days=1)
while not is_working_day(total_start):
total_start += timedelta(days=1)
total_workdays = get_working_days_between(datetime.now(), final_end)
ws.cell(row=row, column=3, value=f"约 {total_workdays} 个工作日")
auto_width(ws)
# 添加甘特图条形图
create_gantt_chart(ws, modules)
def create_cost_sheet(wb, modules):
"""创建成本估算表"""
ws = wb.create_sheet(title="成本估算")
ws.cell(row=1, column=1, value="项目成本估算").font = TITLE_FONT
ws.cell(row=2, column=1, value=f"生成时间: {datetime.now().strftime('%Y-%m-%d')}")
# ========== 人力成本 ==========
row = 4
ws.cell(row=row, column=1, value="一、人力成本").font = SUBTITLE_FONT
row += 1
headers = ["角色", "工时(人天)", "人数", "日均成本(元)", "小计(元)", "备注"]
for i, h in enumerate(headers, 1):
set_header(ws, row, i, h)
row += 1
# 计算各角色总工时
total = {"analysis": 0, "design": 0, "frontend": 0, "backend": 0, "algorithm": 0, "test": 0}
for module in modules:
for item in module.get("items", []):
total["analysis"] += item.get("analysis", 0)
total["design"] += item.get("design", 0)
total["frontend"] += item.get("frontend", 0)
total["backend"] += item.get("backend", 0)
total["algorithm"] += item.get("algorithm", 0)
total["test"] += item.get("test", 0)
# 角色映射和日均成本(可配置)
role_rates = [
("需求分析师", total["analysis"], 1, 1500, "需求分析"),
("UI/UX设计师", total["design"], 1, 1200, "设计"),
("前端工程师", total["frontend"], 1, 1500, "前端"),
("后端工程师", total["backend"], 1, 1800, "后台"),
("算法工程师", total["algorithm"], 1, 2000, "算法"),
("测试工程师", total["test"], 1, 1200, "测试"),
]
total_labor = 0
for role, days, count, daily_rate, _ in role_rates:
if days > 0:
subtotal = days * count * daily_rate
total_labor += subtotal
set_cell(ws, row, 1, role)
set_cell(ws, row, 2, days, align='center')
set_cell(ws, row, 3, count, align='center')
set_cell(ws, row, 4, daily_rate, align='center')
set_cell(ws, row, 5, subtotal, align='center', fill=MONEY_FILL)
set_cell(ws, row, 6, "")
row += 1
# 人力成本合计
set_header(ws, row, 1, "人力成本合计")
set_cell(ws, row, 5, total_labor, bold=True, align='center', fill=MONEY_FILL)
row += 2
# ========== 软硬件成本 ==========
ws.cell(row=row, column=1, value="二、软硬件成本").font = SUBTITLE_FONT
row += 1
headers = ["类别", "项目", "规格/数量", "单次成本(元)", "周期(月)", "小计(元)", "备注"]
for i, h in enumerate(headers, 1):
set_header(ws, row, i, h)
row += 1
# 软硬件成本项目
hw_items = [
("服务器", "云服务器(ECS)", "2核4G", 500, 3, "部署、后端服务"),
("服务器", "数据库服务(RDS)", "基础版", 300, 3, "MySQL数据库"),
("服务器", "对象存储(OSS)", "100GB", 50, 3, "文件存储"),
("域名", "域名注册", "1个", 50, 12, "域名费用"),
("SSL证书", "HTTPS证书", "1个/年", 200, 12, "安全证书"),
("第三方服务", "短信服务", "按量付费", 100, 3, "验证码短信"),
("第三方服务", "支付通道", "按交易收费", 0, 3, "支付宝/微信"),
("第三方服务", "CDN加速", "基础套餐", 100, 3, "静态资源加速"),
("软件", "开发工具", "IDE许可证", 0, 0, "免费工具"),
("软件", "设计软件", "设计工具", 0, 0, "免费/Figma"),
]
total_hw = 0
for cat, item, spec, unit_cost, months, note in hw_items:
subtotal = unit_cost * months
total_hw += subtotal
set_cell(ws, row, 1, cat)
set_cell(ws, row, 2, item)
set_cell(ws, row, 3, spec)
set_cell(ws, row, 4, unit_cost if unit_cost > 0 else "-", align='center')
set_cell(ws, row, 5, f"{months}月" if months > 0 else "-", align='center')
set_cell(ws, row, 6, subtotal if subtotal > 0 else "-", align='center', fill=MONEY_FILL)
set_cell(ws, row, 7, note)
row += 1
# 软硬件成本合计
set_header(ws, row, 1, "软硬件成本合计")
set_cell(ws, row, 6, total_hw, bold=True, align='center', fill=MONEY_FILL)
row += 2
# ========== 项目总成本 ==========
ws.cell(row=row, column=1, value="三、项目总成本").font = SUBTITLE_FONT
row += 1
total_project = total_labor + total_hw
set_header(ws, row, 1, "项目总预算")
set_cell(ws, row, 2, total_project, bold=True, align='center', fill=MONEY_FILL)
ws.cell(row=row, column=3, value=f"(人力{total_labor}元 + 软硬件{total_hw}元)")
row += 2
# ========== 成本说明 ==========
ws.cell(row=row, column=1, value="四、成本说明").font = SUBTITLE_FONT
row += 1
notes = [
"1. 人力成本按每天8小时工作制计算",
"2. 日均成本为参考价,可根据实际情况调整",
"3. 软硬件成本按最低配置估算,流量费用另计",
"4. 第三方服务(支付、短信)通常有交易手续费",
"5. 未包含项目管理和沟通成本",
"6. 预留10-20%应急预算",
]
for note in notes:
ws.cell(row=row, column=1, value=note)
row += 1
# 建议预算
row += 1
recommended = int(total_project * 1.15) # 15% buffer
set_cell(ws, row, 1, f"建议项目预算(含15%应急): ", bold=True)
set_cell(ws, row, 2, recommended, bold=True, align='center', fill=MONEY_FILL)
ws.cell(row=row, column=3, value="元")
auto_width(ws)
def create_key_risks_sheet(wb, modules):
ws = wb.create_sheet(title="重点评估")
ws.cell(row=1, column=1, value="重点评估与风险项").font = TITLE_FONT
ws.cell(row=2, column=1, value="以下列出高风险、不确定性大或技术难点明显的工作项")
headers = ["工作模块", "工作项", "风险类型", "风险描述", "影响评估", "建议措施", "优先级"]
for i, h in enumerate(headers, 1):
set_header(ws, 4, i, h)
row = 5
risk_types = {
"高": "高风险",
"中": "中等风险",
"低": "低风险"
}
for module in modules:
for item in module.get("items", []):
risk = item.get("risk", "低")
if risk in ["高", "中"]:
# 评估不确定性
if "algorithm" in item and item.get("algorithm", 0) > 3:
risk_type = "技术难点"
elif not item.get("basis"):
risk_type = "需求不明确"
else:
risk_type = risk_types.get(risk, "其他")
set_cell(ws, row, 1, module["name"])
set_cell(ws, row, 2, item["name"])
set_cell(ws, row, 3, risk_type)
set_cell(ws, row, 4, item.get("risk_desc", f"该工作项复杂度{item.get('complexity', '中')},存在一定不确定性"))
set_cell(ws, row, 5, item.get("impact", "可能导致进度延误或需要额外资源"))
set_cell(ws, row, 6, item.get("suggestion", "建议预留buffer时间,提前技术验证"))
set_cell(ws, row, 7, "高" if risk == "高" else "中", align='center')
row += 1
if row == 5:
set_cell(ws, row, 1, "暂无高风险项")
auto_width(ws)
def create_coordination_sheet(wb, modules):
ws = wb.create_sheet(title="关系协调")
ws.cell(row=1, column=1, value="工作关系与协调事项").font = TITLE_FONT
headers = ["工作模块", "工作项", "前置依赖", "协调事项", "协调对象", "协调时间点", "备注"]
for i, h in enumerate(headers, 1):
set_header(ws, 3, i, h)
row = 4
for module in modules:
for item in module.get("items", []):
# Every item gets a row; missing fields fall back to placeholder values
set_cell(ws, row, 1, module["name"])
set_cell(ws, row, 2, item["name"])
set_cell(ws, row, 3, item.get("prerequisite", "-"))
set_cell(ws, row, 4, item.get("coordination", "-"))
set_cell(ws, row, 5, item.get("coord_target", "待确认"))
set_cell(ws, row, 6, item.get("coord_time", "开发前"))
set_cell(ws, row, 7, item.get("note", ""))
row += 1
# 添加协调关系说明
row += 2
ws.cell(row=row, column=1, value="协调关系类型说明:").font = SUBTITLE_FONT
row += 1
coord_types = [
("前置依赖", "某工作项必须在其他工作项完成后才能开始"),
("接口协调", "前后端需协调接口定义和数据格式"),
("资源协调", "需要申请特定资源(服务器、第三方服务等)"),
("评审协调", "需要安排评审会议(设计评审、代码评审等)"),
]
for coord_type, desc in coord_types:
set_cell(ws, row, 1, coord_type, bold=True)
set_cell(ws, row, 2, desc)
row += 1
auto_width(ws)
def create_gantt_chart(ws, modules):
"""在甘特图Sheet中创建条形图"""
# 准备图表数据区域(在甘特图数据下方)
chart_start_row = ws.max_row + 3
# 写入图表数据:任务名、开始日期、时长
ws.cell(row=chart_start_row, column=1, value="任务名称").font = Font(bold=True)
ws.cell(row=chart_start_row, column=2, value="开始日期").font = Font(bold=True)
ws.cell(row=chart_start_row, column=3, value="时长(天)").font = Font(bold=True)
row = chart_start_row + 1
chart_data_start = row
start_date = datetime.now()
while not is_working_day(start_date):
start_date += timedelta(days=1)
for module in modules:
for item in module.get("items", []):
total = item.get("analysis", 0) + item.get("design", 0) + \
item.get("frontend", 0) + item.get("backend", 0) + \
item.get("algorithm", 0) + item.get("test", 0)
days = max(1, int(total))
end_date = add_working_days(start_date, days)
ws.cell(row=row, column=1, value=f"{module['name']}-{item['name']}")
ws.cell(row=row, column=2, value=start_date)
ws.cell(row=row, column=3, value=days)
# 格式化日期
ws.cell(row=row, column=2).number_format = 'YYYY-MM-DD'
next_start = end_date + timedelta(days=1)
while not is_working_day(next_start):
next_start += timedelta(days=1)
start_date = next_start
row += 1
chart_data_end = row - 1
# 创建甘特图
chart = BarChart()
chart.type = "bar" # 横向条形图
chart.title = "项目进度甘特图"
chart.y_axis.title = "任务"
chart.x_axis.title = "日期"
chart.style = 10
# 数据系列
data = Reference(ws, min_col=3, min_row=chart_start_row, max_row=chart_data_end)
cats = Reference(ws, min_col=1, min_row=chart_start_row + 1, max_row=chart_data_end)
chart.add_data(data, titles_from_data=True)
chart.set_categories(cats)
chart.shape = 4
chart.width = 20
chart.height = 12
# 放置图表
ws.add_chart(chart, f"H{chart_start_row}")
def create_distribution_charts(ws, modules):
"""在工作总览Sheet中创建工时分布图表"""
# 计算各维度工时
total = {"analysis": 0, "design": 0, "frontend": 0, "backend": 0, "algorithm": 0, "test": 0}
module_totals = {}
for module in modules:
module_total = 0
for item in module.get("items", []):
item_total = item.get("analysis", 0) + item.get("design", 0) + \
item.get("frontend", 0) + item.get("backend", 0) + \
item.get("algorithm", 0) + item.get("test", 0)
total["analysis"] += item.get("analysis", 0)
total["design"] += item.get("design", 0)
total["frontend"] += item.get("frontend", 0)
total["backend"] += item.get("backend", 0)
total["algorithm"] += item.get("algorithm", 0)
total["test"] += item.get("test", 0)
module_total += item_total
module_totals[module["name"]] = module_total
grand_total = sum(total.values())
# 找到总览Sheet的最后一行
chart_row = ws.max_row + 3
# ========== 维度占比饼图 ==========
ws.cell(row=chart_row, column=1, value="工时维度占比").font = Font(bold=True, size=12)
chart_row += 1
# 写入饼图数据
ws.cell(row=chart_row, column=1, value="维度")
ws.cell(row=chart_row, column=2, value="工时(人天)")
pie_data_row = chart_row + 1
dimensions = [("需求分析", total["analysis"]),
("设计", total["design"]),
("前端", total["frontend"]),
("后台", total["backend"]),
("算法", total["algorithm"]),
("测试", total["test"])]
row = pie_data_row
for dim, hours in dimensions:
if hours > 0:
ws.cell(row=row, column=1, value=dim)
ws.cell(row=row, column=2, value=hours)
row += 1
pie_data_end = row - 1
# 创建饼图
pie = PieChart()
labels = Reference(ws, min_col=1, min_row=pie_data_row, max_row=pie_data_end)
data = Reference(ws, min_col=2, min_row=pie_data_row - 1, max_row=pie_data_end)
pie.add_data(data, titles_from_data=True)
pie.set_categories(labels)
pie.title = "各维度工时占比"
pie.style = 10
pie.width = 12
pie.height = 10
# 添加数据标签
pie.dataLabels = DataLabelList()
pie.dataLabels.showPercent = True
pie.dataLabels.showVal = True
pie.dataLabels.showCatName = True
ws.add_chart(pie, f"D{chart_row}")
# ========== 模块占比柱状图 ==========
chart_row = pie_data_end + 3
ws.cell(row=chart_row, column=1, value="各模块工时对比").font = Font(bold=True, size=12)
chart_row += 1
# 写入柱状图数据
ws.cell(row=chart_row, column=1, value="模块")
ws.cell(row=chart_row, column=2, value="工时(人天)")
bar_data_row = chart_row + 1
row = bar_data_row
for module_name, hours in module_totals.items():
ws.cell(row=row, column=1, value=module_name)
ws.cell(row=row, column=2, value=hours)
row += 1
bar_data_end = row - 1
# 创建柱状图
bar = BarChart()
bar.type = "col"
bar.style = 10
bar.title = "各模块工时对比"
bar.y_axis.title = "工时(人天)"
bar.x_axis.title = "模块"
labels = Reference(ws, min_col=1, min_row=bar_data_row, max_row=bar_data_end)
data = Reference(ws, min_col=2, min_row=bar_data_row - 1, max_row=bar_data_end)
bar.add_data(data, titles_from_data=True)
bar.set_categories(labels)
bar.width = 14
bar.height = 10
ws.add_chart(bar, f"D{chart_row}")
def parse_requirements(requirements_text: str) -> list:
    """
    Parse requirement text into a list of module dicts.
    This is a simplified parser: it only detects module titles and leaves each
    module's "items" list empty; real inputs may need more elaborate handling.
    """
    modules = []
    current_module = None
    for line in requirements_text.split("\n"):
        line = line.strip()
        if not line:
            continue
        # A module title is either a Markdown heading ("#", "##", ...) or a "XX模块: description" line
        if line.startswith("#"):
            if current_module:
                modules.append(current_module)
            current_module = {
                "name": line.lstrip("#").strip(),
                "desc": "",
                "items": []
            }
        elif "模块" in line and ":" in line:
            if current_module:
                modules.append(current_module)
            # Split on the first colon only, so colons inside the description are kept
            module_name, module_desc = line.split(":", 1)
            current_module = {
                "name": module_name.strip(),
                "desc": module_desc.strip(),
                "items": []
            }
    if current_module:
        modules.append(current_module)
    return modules
if __name__ == "__main__":
# 测试
test_modules = [
{
"name": "用户系统",
"desc": "用户登录注册相关功能",
"items": [
{
"name": "登录注册",
"desc": "手机号+验证码登录",
"analysis": 1.0,
"design": 1.0,
"frontend": 2.0,
"backend": 3.0,
"algorithm": 0,
"test": 1.0,
"complexity": "低",
"risk": "低",
"parallel": True,
"prerequisite": "",
"coordination": "需与短信服务商协调"
}
]
}
]
output = generate_estimation_excel("测试需求", test_modules)
print(f"已生成: {output}")
FILE:scripts/test_login.py
"""测试:APP手机号登录注册工时评估"""
import os
import sys
# Resolve the scripts directory relative to this file instead of a machine-specific absolute path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from generate_estimation import generate_estimation_excel
modules = [
{
"name": "用户系统",
"desc": "APP手机号登录注册模块",
"items": [
{
"name": "登录注册界面",
"desc": "手机号输入、验证码发送、倒计时、协议勾选",
"analysis": 0.5,
"design": 1.0,
"frontend": 2.0,
"backend": 1.5,
"algorithm": 0,
"test": 0.5,
"complexity": "低",
"risk": "低",
"parallel": True,
"prerequisite": "",
"coordination": "需与短信服务商协调"
},
{
"name": "验证码服务",
"desc": "短信验证码生成、发送、校验(60秒有效期)",
"analysis": 0.5,
"design": 0.5,
"frontend": 0,
"backend": 2.0,
"algorithm": 0,
"test": 0.5,
"complexity": "中",
"risk": "低",
"parallel": True,
"prerequisite": "",
"coordination": "需与短信服务商协调接口"
},
{
"name": "用户信息存储",
"desc": "用户表设计、注册流程、登录Token生成",
"analysis": 0.5,
"design": 1.0,
"frontend": 0,
"backend": 2.5,
"algorithm": 0,
"test": 0.5,
"complexity": "中",
"risk": "低",
"parallel": False,
"prerequisite": "验证码服务完成后",
"coordination": ""
},
{
"name": "第三方登录(可选)",
"desc": "微信/Apple登录集成",
"analysis": 0.5,
"design": 0.5,
"frontend": 1.5,
"backend": 1.5,
"algorithm": 0,
"test": 0.5,
"complexity": "高",
"risk": "中",
"parallel": True,
"prerequisite": "",
"coordination": "需微信/Apple开发者账号"
}
]
}
]
output = generate_estimation_excel("APP手机号登录注册", modules)
print(f"已生成: {output}")
FILE:references/evaluation-guide.md
# 软件开发工时评估指南
## 评估维度说明
### 1. 需求分析
- 需求调研与访谈
- 需求文档编写
- 需求评审与确认
- 需求变更管理
### 2. 设计
- 架构设计
- UI/UX 设计
- 数据库设计
- 接口设计
- 详细设计
### 3. 前端
- 页面开发
- 组件封装
- 状态管理
- 性能优化
- 兼容性处理
### 4. 后台
- 服务端开发
- API 开发
- 数据库实现
- 缓存设计
- 安全处理
### 5. 算法
- 业务算法实现
- 数据处理逻辑
- AI/ML 模型(如涉及)
- 性能算法优化
### 6. 测试
- 单元测试
- 集成测试
- 系统测试
- 性能测试
- 用户验收测试
---
## 复杂度评估标准
### 前端复杂度
| 复杂度 | 描述 | 典型场景 |
|--------|------|---------|
| 低 | 静态页面,少量交互 | 展示页面、表单 |
| 中 | 动态页面,状态管理 | 列表页、表单验证 |
| 高 | 复杂交互、状态同步 | 实时协作、拖拽 |
### 后台复杂度
| 复杂度 | 描述 | 典型场景 |
|--------|------|---------|
| 低 | CRUD,单表操作 | 基础增删改查 |
| 中 | 业务逻辑、事务处理 | 订单处理、库存管理 |
| 高 | 分布式、高并发 | 秒杀、实时计算 |
### 算法复杂度
| 复杂度 | 描述 | 典型场景 |
|--------|------|---------|
| 低 | 简单计算逻辑 | 统计、筛选、排序 |
| 中 | 中等算法逻辑 | 推荐算法、搜索排序 |
| 高 | 复杂算法/AI | 图像识别、NLP、深度学习 |
---
## 工时评估速查表
### 需求分析
| 工作项 | 低 | 中 | 高 |
|--------|----|----|-----|
| 需求调研 | 1天 | 2-3天 | 5天+ |
| 需求文档 | 1天 | 2-3天 | 5天+ |
| 需求评审 | 0.5天 | 1天 | 2天+ |
### 设计
| 工作项 | 低 | 中 | 高 |
|--------|----|----|-----|
| 架构设计 | 1-2天 | 3-5天 | 1-2周 |
| UI设计 | 2-3天 | 5-7天 | 2-3周 |
| 数据库设计 | 0.5天 | 1-2天 | 3-5天 |
### 开发(每功能点)
| 角色 | 低 | 中 | 高 |
|------|----|----|-----|
| 前端 | 0.5-1天 | 1-2天 | 2-5天 |
| 后台 | 1-2天 | 2-4天 | 5-10天 |
| 算法 | 1-2天 | 3-5天 | 5-10天 |
### 测试
| 工作项 | 系数 | 说明 |
|--------|------|------|
| 功能测试 | 0.3-0.5 | 相对开发工时 |
| 集成测试 | 0.2-0.3 | 相对开发工时 |
| 性能测试 | 0.1-0.2 | 相对开发工时 |
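The coefficients in the table above are multipliers on development effort. The sketch below shows one way to turn them into a low/high test estimate. It assumes that "development effort" means the summed frontend, backend and algorithm person-days, which the guide does not state explicitly; `TEST_COEFFICIENTS` and `estimate_test_days` are hypothetical names, not part of the scripts in this skill.

```python
# Coefficient ranges copied from the table above: (low, high), relative to development effort.
TEST_COEFFICIENTS = {
    "functional": (0.3, 0.5),
    "integration": (0.2, 0.3),
    "performance": (0.1, 0.2),
}


def estimate_test_days(dev_days: float, kinds=("functional",)) -> tuple:
    """Return the (low, high) test estimate in person-days for the chosen test kinds."""
    low = sum(TEST_COEFFICIENTS[k][0] for k in kinds) * dev_days
    high = sum(TEST_COEFFICIENTS[k][1] for k in kinds) * dev_days
    return round(low, 2), round(high, 2)


if __name__ == "__main__":
    # 10 person-days of development, functional plus integration testing
    print(estimate_test_days(10, ("functional", "integration")))
```

Returning a range rather than a single number keeps the uncertainty of the table visible in the final estimate.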
---
## 甘特图规划原则
### 并行工作识别
- 前端页面开发可并行
- 多个独立模块可并行
- 设计和前端可部分并行
- 前后端可并行开发(接口约定后)
### 关键路径
- 串联执行的工作项
- 决定最短工期的路径
- 需要重点监控的节点
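The critical path described above is the longest chain of dependent work items, so it can be found with a longest-path pass over the dependency graph. The following is a minimal sketch, assuming tasks and prerequisites are plain dicts and that the graph has no cycles; `critical_path` is a hypothetical helper and is not part of `generate_estimation.py`.

```python
def critical_path(durations, prerequisites):
    """durations: task -> days; prerequisites: task -> tasks that must finish first.

    Returns (total_days, [tasks on the critical path in execution order]).
    Assumes the dependency graph is acyclic.
    """
    finish = {}  # task -> earliest finish day
    via = {}     # task -> the prerequisite that determines its start (None if it has none)

    def visit(task):
        if task in finish:
            return finish[task]
        start = 0
        via[task] = None
        for dep in prerequisites.get(task, []):
            dep_finish = visit(dep)
            if dep_finish > start:
                start, via[task] = dep_finish, dep
        finish[task] = start + durations[task]
        return finish[task]

    for task in durations:
        visit(task)

    # Walk back from the task that finishes last
    node = max(finish, key=finish.get)
    total = finish[node]
    path = []
    while node is not None:
        path.append(node)
        node = via[node]
    return total, path[::-1]


if __name__ == "__main__":
    durations = {"design": 2, "backend": 4, "frontend": 3, "test": 2}
    prerequisites = {"backend": ["design"], "frontend": ["design"], "test": ["backend", "frontend"]}
    print(critical_path(durations, prerequisites))
```

In this example the frontend work has one day of slack, so only the design, backend and test items need close monitoring.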
### 里程碑设置
- 需求确认
- 设计完成
- 开发完成
- 测试完成
- 上线部署
---
## 风险评估标准
### 重点评估项(需单独说明)
1. 技术难点不明确
2. 第三方依赖不确定
3. 需求边界模糊
4. 性能要求极高
5. 团队经验不足
### 风险等级
| 等级 | 说明 | 建议 Buffer |
|------|------|------------|
| 低 | 技术成熟、需求清晰 | 10% |
| 中 | 有一定复杂度 | 20% |
| 高 | 全新技术或模糊需求 | 30%+ |
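Applying the suggested buffer is a single multiplication. The sketch below assumes the buffer is added on top of the base estimate and treats the "30%+" of the high level as its lower bound; `RISK_BUFFER` and `apply_buffer` are hypothetical names. The keys reuse the `risk` values (`低` / `中` / `高`) that the module items in this skill carry.

```python
# Buffer ratios from the table above; the value for "高" is the minimum of "30%+".
RISK_BUFFER = {"低": 0.10, "中": 0.20, "高": 0.30}


def apply_buffer(base_days: float, risk: str) -> float:
    """Return the estimate in person-days including the risk buffer."""
    return round(base_days * (1 + RISK_BUFFER[risk]), 2)


if __name__ == "__main__":
    print(apply_buffer(10, "中"))
```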
---
## Excel 输出模板结构
```
Sheet 1: 工时总览
Sheet 2: 需求分析详情
Sheet 3: 设计详情
Sheet 4: 前端详情
Sheet 5: 后台详情
Sheet 6: 算法详情
Sheet 7: 测试详情
Sheet 8: 甘特图
Sheet 9: 重点评估
Sheet 10: 关系协调
```
### 甘特图列格式
| 任务名 | 开始日期 | 结束日期 | 时长(天) | 前置任务 | 执行人 |
|--------|---------|---------|---------|---------|--------|
FILE:evals/evals.json
[
{
"id": "eval-001",
"name": "电商小程序工时评估",
"input": {
"requirements": "开发一个电商小程序,包括用户登录、商品展示、购物车、订单支付功能"
},
"expected": {
"modules_count": 5,
"has_overview": true,
"has_gantt": true,
"has_risks": true,
"has_coordination": true,
"dimensions": ["需求分析", "设计", "前端", "后台", "算法", "测试"]
}
},
{
"id": "eval-002",
"name": "企业内部管理系统评估",
"input": {
"requirements": "开发企业内部OA系统,包含审批流程、考勤管理、公告发布三个模块"
},
"expected": {
"modules_count": 3,
"has_overview": true,
"has_gantt": true
}
},
{
"id": "eval-003",
"name": "AI推荐系统评估",
"input": {
"requirements": "开发一个内容推荐系统,包括用户画像、推荐算法、前端展示三大块"
},
"expected": {
"modules_count": 3,
"has_algorithm_sheet": true,
"algorithm_hours_defined": true
}
}
]
Multi-Agent system for writing ultra-long feasibility study reports. Phase 0 Requirement Confirmation - Phase 1 Planner outputs outline - Phase 2 Batch paral...
---
name: lobsterai-report-agent
description: Multi-Agent system for writing ultra-long feasibility study reports. Phase 0 Requirement Confirmation - Phase 1 Planner outputs outline - Phase 2 Batch parallel sub-Agent writing - Phase 2.5 Cross-chapter consistency review - Phase 3 Integrator assembles polished docx.
version: "4.0.0"
homepage: https://github.com/jinqiu193/lobsterai-report-agent
license: MIT
metadata:
openclaw:
category: document-generation
triggers:
- "写可研"
- "写报告"
- "多章节"
- "并行撰写"
- "agent写文档"
---
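Pipeline: Phase 0 requirement confirmation → Phase 1 planner outline → Phase 2 batched parallel sub-agent writing → Phase 2.5 cross-chapter consistency review → Phase 3 integrator assembles the polished docx.
Core files: `src/engine.py` (core logic), `src/config.py` (configuration), `integrate_report.py` (compatibility facade).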
---
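# Ultra-Long Feasibility Report Multi-Agent Collaborative Writing v4.0
## Changelog (v4.0)
- ✅ Refactored and split: `src/config.py` (configuration + I/O) + `src/engine.py` (core logic) + `src/cli.py` (CLI entry point)
- ✅ `integrate_report.py` kept as a facade, 100% backward compatible with the old interface
- ✅ Mermaid CLI is loaded lazily (no subprocess runs at import time)
- ✅ All paths are configurable (environment variables `LOBAI_CHAPTERS_DIR` / `LOBAI_OUTPUT_DIR`)
- ✅ Notification channel is configurable (`notify.py`: `log` / `feishu` / `openclaw-weixin`)
- ✅ Added `README.md` + `LICENSE`
## Core Capabilities
- **Multi-Agent Parallel**: up to 5 sub-agents write concurrently
- **Incremental Updates**: chapters whose content is unchanged are skipped instead of rewritten
- **Beautiful Formatting**: auto-generated cover, table-style TOC, colorful chapter headings, key callout boxes, styled tables
- **Feishu RAG**: automatically searches the Feishu knowledge base to supplement reference materials
- **6 Cover Styles**: switch freely to suit different scenarios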
---
## 文件结构
```
skill_dir/
├── SKILL.md # 本文件
├── README.md # 项目说明(开源版)
├── LICENSE # MIT License
├── requirements.txt # Python 依赖
├── integrate_report.py # facade(兼容旧接口)+ CLI 入口
├── parallel_tracker.py # 并行进度追踪
├── notify.py # 可配置通知渠道
├── src/
│ ├── __init__.py # 公共 API 导出
│ ├── config.py # 路径配置 + 文件 I/O
│ ├── engine.py # 核心业务逻辑
│ └── cli.py # CLI 入口
└── references/ # 子流程参考文档
├── phase0_guide.md # Phase 0 需求确认流程
├── phase1_guide.md # 规划师 prompt 模板
├── phase2_guide.md # 子Agent prompt 模板
├── table_format_guide.md # Markdown表格格式规范
└── bug_fix_guide.md # Bug排查与强制重建
```
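> **Before first use**: the working directory is `~/.config/lobsterai-report-agent/` (created automatically). Chapter files live in its `chapters/` subdirectory by default; set the `LOBAI_CHAPTERS_DIR` environment variable to use a different chapters directory.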
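The lookup order can be sketched as follows. It mirrors `_get_chapters_dir()` in `parallel_tracker.py`; the function name `resolve_chapters_dir` and its `env` parameter are illustrative only, and `src/config.py` (not shown here) may add further sources.

```python
import os


def resolve_chapters_dir(env=os.environ) -> str:
    """Return the chapters directory: LOBAI_CHAPTERS_DIR if set, else the default path."""
    override = env.get("LOBAI_CHAPTERS_DIR")
    if override:
        return override
    return os.path.join(os.path.expanduser("~"), ".config", "lobsterai-report-agent", "chapters")


if __name__ == "__main__":
    print(resolve_chapters_dir())
```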
---
## Pipeline 路由
```
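User task
├─ First writing request ("I want to write xxx" / "help me write a feasibility report")
│ → Phase 0 requirement confirmation → Phase 1 planner
│
├─ Outline already exists, user asks to start writing
│ → Phase 2 batched parallel sub-agents
│
├─ A chapter needs changes
│ → Small change: edit <CHAPTERS_DIR>/0X-xxx.txt directly
│ → Large change: regenerate that chapter
│
├─ All chapters are finished, user asks for the docx
│ → Phase 2.5 review → Phase 3 integrator
│
├─ Standalone small proposal (2~5 chapters, no dependency on existing chapters)
│ → Write Markdown directly → make_docx.py generates the polished docx
│ → See: references/bug_fix_guide.md, "make_docx.py mode"
│
└─ Only needs progress / glossary / reference material
→ Use the CLI commands directly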
```
---
## Phase 0:需求确认
依次确认4项,全部确认后进入 Phase 1:
1. **写作主题**:文档类型/读者/风格/特殊约束
2. **背景信息**:项目背景/建设目标/行业背景
3. **参考资料**(最重要):
- A. 本地文件路径或直接粘贴
- B. 飞书文档(RAG检索)
- C. 直接粘贴内容
- D. 暂不提供
4. **大纲确认**:规划师输出大纲后,用户选择 A.开始 / B.调整 / C.取消
> 参考资料越充分,内容与业务越贴合。详见 `references/phase0_guide.md`
---
## Phase 1:规划师
**输入**:Phase 0 的主题/背景/参考资料
**执行**:
```bash
python integrate_report.py glossary
```
自动生成 `plan.json` + `plan_outline_snapshot.md`
> 详细 prompt 模板见 `references/phase1_guide.md`
**完成后通知**(通过 `notify.py`,渠道由 `config.json` 的 `notification_channel` 字段决定):
```python
# 在 Agent 指令中使用 notify 模块
from notify import notify
notify(f"""📋 报告大纲已生成
📌 《[报告主题]》
📊 章节数:[X]章
🔍 行业:[行业领域]
✅ 大纲确认后请回复"开始撰写",系统将启动并行创作!""")
```
默认渠道为 `log`(打印到控制台)。开源用户可配置为 `feishu` 或 `openclaw-weixin`。
---
## Phase 2:分批并行子Agent
**执行流程**(全自动,无需人工确认):
1. 展示大纲/当前批次状态(仅展示,不等待确认)
2. `python parallel_tracker.py clear` 清空上批次状态
3. 最多5并发启动子Agent(`sessions_spawn`),自动执行全部批次
4. `python parallel_tracker.py wait` 后台监控,直至本批全部完成
5. 完成后自动执行 `python integrate_report.py convert-batch`
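The batching in step 3 amounts to slicing the chapter list into groups of at most five. A minimal sketch, assuming chapters are identified by their sequence strings; `make_batches` is a hypothetical helper, and the actual spawning through `sessions_spawn` happens in the agent runtime and is not shown here.

```python
def make_batches(chapter_seqs, max_parallel=5):
    """Split the chapter sequence list into consecutive batches of at most max_parallel."""
    return [chapter_seqs[i:i + max_parallel] for i in range(0, len(chapter_seqs), max_parallel)]


if __name__ == "__main__":
    seqs = [f"{n:02d}" for n in range(1, 13)]  # chapters 01..12
    for index, batch in enumerate(make_batches(seqs), start=1):
        # Per batch: clear the tracker, spawn one sub-agent per chapter, wait, then convert
        print(f"batch {index}: {batch}")
```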
**子Agent prompt 模板**:见 `references/phase2_guide.md`
**每批次完成后通知**:
```python
from notify import notify
notify(f"""✅ 第[X]批章节撰写完成!
📖 已完成:[已完成数]/[总章节数] 章
📝 本批完成:[章节列表]
⏳ 下一批:[下一批章节列表]
(自动进入下一批,无需人工确认)""")
```
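**Revising a chapter after it is written**:
- Small change: edit `<CHAPTERS_DIR>/0X-xxx.txt` directly, then regenerate after saving
- Large change: re-trigger the sub-agent to rewrite the chapter and replace the original file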
---
## Phase 2.5:跨章一致性审查
```bash
python integrate_report.py check
```
审查数量指标一致性与术语统一性(对照 glossary.json)
**审查完成后通知**:
```python
from notify import notify
notify(f"""🔍 一致性审查完成
✅ 术语统一性:正常
✅ 数量指标:一致
✅ 跨章引用:无冲突
📄 即将进入最终整合阶段...""")
```
---
## Phase 3:整合师汇总
```bash
python integrate_report.py
```
自动完成:解析章节(错误隔离)→ 更新术语表 → 一致性审查 → 生成精美 docx
**最终完成后通知**:
```python
from notify import notify
notify(f"""🎉🎉🎉 报告撰写完成!🎉🎉🎉
📄 《[报告主题]》
📊 规模:[X]章 / 约[Y]万字
🎨 封面风格:[风格名称]
✅ 精美版报告已生成!
📁 文件位置:[输出目录]
文心,全文已就绪,可进行后续审阅~""")
```
---
## 文档美化功能(自动应用)
生成报告自动包含以下排版效果(通过 `plan.json` 中 `cover_style` 字段选择):
1. **6种封面风格** — 修改 `plan.json` → `cover_style` 字段(整数 1~6)
2. **执行摘要** — 深蓝标题条(`#1F4E79`)背景 + 白字 + 正文缩进
3. **表格式目录** — 深蓝标题条 + 三列条目(序号/章节/页码)
4. **彩色章节标题**:
- H1:整行深藏蓝底色 `#1F4E79` + 白字微软雅黑
- H2:中蓝底色 `#2E75B6` + 白字
- H3:淡蓝背景 `#D6E4F0` + 深蓝字 + `▌` 左边条
5. **重点标注盒** — 自动识别【关键】【注意】【优势】【风险】【数据】标签,渲染为彩色卡片(背景/白字/边框)
6. **美化表格** — 表头深藏蓝背景 `#1F4E79` + 白字 + 奇偶行交替底色(`#DEEAF6` / `#FFFFFF`)
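Callout detection (item 5) can be illustrated with a small regular expression. This is only a sketch under an assumption: it treats a tag as a callout when it opens the paragraph, which the text above does not specify, and the real logic in `src/engine.py` is not shown here. `CALLOUT_RE` and `split_callout` are hypothetical names.

```python
import re

# The five tags listed above
CALLOUT_TAGS = ("关键", "注意", "优势", "风险", "数据")
CALLOUT_RE = re.compile(r"^【(" + "|".join(CALLOUT_TAGS) + r")】\s*(.*)$")


def split_callout(line: str):
    """Return (tag, body) if the line starts with a known callout tag, else (None, line)."""
    match = CALLOUT_RE.match(line.strip())
    if not match:
        return None, line
    return match.group(1), match.group(2)


if __name__ == "__main__":
    print(split_callout("【风险】第三方接口可能延期"))
    print(split_callout("普通段落"))
```

A renderer could then map the returned tag to a card color and pass the body on as normal paragraph text.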
---
## 封面样式(6种风格)
封面风格通过 `plan.json` 中的 `cover_style` 字段指定(整数,1~6):
| 编号 | 风格名称 | 特点 | 推荐场景 |
|------|----------|------|----------|
| 1 | 经典政务风格 | 深藏蓝顶条 + 金色点缀 | 政府/国企审批 |
| 2 | 现代简约风格 | 左侧蓝色重色块 + 右侧信息 | 科技/商务汇报 |
| 3 | 商务典雅风格 | 酒红配色 + 居中递进 | 咨询/投行报告 |
| 4 | 科技数字风格 | 深海蓝铺满 + 大字白字标题 | 互联网/数字化 |
| 5 | 中式传统风格 | 故宫红 + 宣纸米色背景 | 传统文化/国企 |
| 6 | 全屏沉浸风格 | 深海蓝铺满 + 大字白字标题 | 数字化/科技项目 |
> **注意**:`cover_style` 值为整数(如 `4`),代码会自动转换为字符串比较。
---
## CLI Quick Reference
| Command | Purpose |
|------|------|
| `python integrate_report.py` | Generate the integrated report (full run) |
| `python integrate_report.py convert-batch` | Batch-convert chapters to docx |
| `python integrate_report.py --convert-one <in> <out>` | Convert a single chapter to docx |
| `python integrate_report.py check` | Consistency review |
| `python integrate_report.py glossary` | Generate/update the glossary |
| `python integrate_report.py ref show` | Show the reference material |
| `python integrate_report.py ref clear` | Clear the reference material |
| `python integrate_report.py preview [chapter prefix]` | Preview chapter summaries |
| `python integrate_report.py feishu-search <query>` | Search the Feishu knowledge base |
| `python parallel_tracker.py show` | Show writing progress |
| `python parallel_tracker.py wait` | Blocking monitor (Ctrl+C to stop) |
| `python parallel_tracker.py clear` | Clear the tracker state |
> **Switching cover style**: edit the `cover_style` field (integer 1~6) in `<CHAPTERS_DIR>/plan.json`, then regenerate.
> After changing code, delete the `.pyc` files under `__pycache__` and `content_hashes.json` to force a rebuild.
---
## 状态文件
> 工作目录默认:`~/.config/lobsterai-report-agent/chapters/`(可通过 `LOBAI_CHAPTERS_DIR` 环境变量覆盖)
| 文件 | 作用 |
|------|------|
| `<CHAPTERS_DIR>/plan.json` | 章节元数据 |
| `<CHAPTERS_DIR>/glossary.json` | 术语表 |
| `<CHAPTERS_DIR>/reference_material.txt` | 参考资料原文 |
| `<CHAPTERS_DIR>/plan_outline_snapshot.md` | 大纲快照 |
| `<CHAPTERS_DIR>/content_hashes.json` | 增量缓存(删后强制重建) |
| `<CHAPTERS_DIR>/writing_tracker.json` | 并行进度追踪 |
| `<CHAPTERS_DIR>/config.json` | 项目配置(封面风格、通知渠道等) |
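The role of `content_hashes.json` as an incremental cache can be sketched like this: hash each chapter's text and rebuild only the chapters whose hash differs from the stored one. The sketch assumes SHA-256 over the UTF-8 text and a flat name-to-hash mapping; the actual `compute_content_hash` and `get_changed_chapters` in `src/engine.py` are not shown here and may differ.

```python
import hashlib


def content_hash(text: str) -> str:
    """Hash the chapter text (assumption: SHA-256 over its UTF-8 bytes)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def changed_chapters(chapters: dict, stored_hashes: dict) -> list:
    """chapters: name -> current text; stored_hashes: name -> hash from the previous run."""
    return [name for name, text in chapters.items()
            if stored_hashes.get(name) != content_hash(text)]


if __name__ == "__main__":
    previous = {"01-overview": content_hash("old text")}
    current = {"01-overview": "old text", "02-architecture": "new chapter"}
    print(changed_chapters(current, previous))
```

Deleting the cache file empties `stored_hashes`, so every chapter counts as changed; that is why removing `content_hashes.json` forces a full rebuild.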
---
## Critical Rules
### Markdown 表格格式(子Agent必须遵守)
详见 `references/table_format_guide.md`
核心要点:
- 分隔行必须是 `|---|---|---|`(首尾 `|` 不可省略)
- 各行列数必须与表头一致
- 单元格内容避免包含 `|`(用 `~` 或 `-` 表示范围)
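These rules are easy to check mechanically before a chapter is handed to the integrator. The validator below is a rough sketch, not the parser used by `src/engine.py`; it assumes a separator cell is three or more dashes with optional alignment colons, and `validate_table` is a hypothetical name.

```python
import re

SEPARATOR_CELL = re.compile(r"^:?-{3,}:?$")


def split_row(line: str):
    """Return the cells of a table row, or None if an outer pipe is missing."""
    line = line.strip()
    if len(line) < 2 or not (line.startswith("|") and line.endswith("|")):
        return None
    return [cell.strip() for cell in line[1:-1].split("|")]


def validate_table(lines: list) -> list:
    """Return a list of problems found in a Markdown table (an empty list means it is valid)."""
    rows = [split_row(line) for line in lines]
    if len(rows) < 2 or rows[0] is None:
        return ["table needs a header row and a separator row with outer pipes"]
    width = len(rows[0])
    problems = []
    separator = rows[1]
    if separator is None or len(separator) != width or not all(SEPARATOR_CELL.match(c) for c in separator):
        problems.append("line 2 is not a valid separator row")
    for number, row in enumerate(rows[2:], start=3):
        if row is None:
            problems.append(f"line {number} is missing a leading or trailing pipe")
        elif len(row) != width:
            problems.append(f"line {number} has {len(row)} cells, expected {width}")
    return problems


if __name__ == "__main__":
    print(validate_table(["| a | b |", "|---|---|", "| 1 | 2 | 3 |"]))
```

Because rows are split on every `|`, a cell that contains a literal pipe shows up as a column-count mismatch, which matches the third rule above.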
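### Forced rebuild (both deletions are required after changing code)
After modifying `src/engine.py` or `src/config.py`, delete both the `__pycache__` directories and `content_hashes.json`; otherwise the new code does not take effect:
```bash
# 1. Delete the .pyc caches (required after any code change)
# Linux/macOS
find . -type d -name __pycache__ -exec rm -rf {} +
# Windows (PowerShell)
# Get-ChildItem . -Recurse -Directory __pycache__ | Remove-Item -Recurse -Force
# 2. Delete the incremental hashes (otherwise incremental mode skips the rewrite)
# Linux/macOS, default path
rm -f ~/.config/lobsterai-report-agent/chapters/content_hashes.json
# Windows (cmd), default path
# del "%USERPROFILE%\.config\lobsterai-report-agent\chapters\content_hashes.json"
# 3. Regenerate
python integrate_report.py
```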
### 已知 Bug 已修复(记录备查)
详见 `references/bug_fix_guide.md`,包括:
- `_flush_table` 未调用 `_parse_md_table` 导致表格逐字拆列(v3 早期版本)
- `ensure_mermaid_deps()` 在 import 时执行 subprocess(现已改为惰性)
- `.pyc` 缓存导致修改后的代码不生效
- `RGBColor` 用索引而非 `.red/.green/.blue` 属性
- `add_cover()` 设置 `section.margin=0` 导致正文无边距
---
## References
| 文件 | 内容 |
|------|------|
| `references/phase0_guide.md` | Phase 0 需求确认完整流程与话术 |
| `references/phase1_guide.md` | 规划师完整 prompt 模板与 plan.json 格式 |
| `references/phase2_guide.md` | 子Agent完整 prompt 模板(含表格格式警告) |
| `references/table_format_guide.md` | Markdown表格格式规范、常见错误与示例 |
| `references/bug_fix_guide.md` | Bug排查与强制重建操作步骤 |
FILE:integrate_report.py
"""
integrate_report.py - 整合报告生成器 v3
=========================================
本模块现已重构为 facade(兼容旧接口),实际逻辑在 src/ 包中:
from src.config import get_chapters_dir, load_plan, ...
from src.engine import generate_with_accurate_toc, batch_convert_txt_to_docx, ...
from src.cli import main
向后兼容:from integrate_report import CHAPTERS_DIR, load_plan, generate_with_accurate_toc, ...
CLI 入口:python integrate_report.py [命令]
"""
# ---- 委托给 src 包(向后兼容)----
from src.config import (
get_chapters_dir as get_chapters_dir,
get_output_dir as get_output_dir,
get_mermaid_cli as get_mermaid_cli,
load_config as load_config,
save_config as save_config,
load_plan as load_plan,
save_plan as save_plan,
make_default_plan as make_default_plan,
load_glossary as load_glossary,
generate_glossary as generate_glossary,
glossary_to_prompt_text as glossary_to_prompt_text,
load_reference as load_reference,
save_reference as save_reference,
load_progress as load_progress,
save_outline_snapshot as save_outline_snapshot,
save_batch_snapshot as save_batch_snapshot,
CHARS_PER_PAGE as CHARS_PER_PAGE,
_p as _p,
_load_paths as _load_paths,
)
from src.engine import (
compute_content_hash as compute_content_hash,
load_hashes as load_hashes,
save_hashes as save_hashes,
get_changed_chapters as get_changed_chapters,
process_mermaid_blocks as process_mermaid_blocks,
add_toc_entry as add_toc_entry,
md_to_paragraphs as md_to_paragraphs,
safe_parse_chapter as safe_parse_chapter,
parse_chapters as parse_chapters,
count_chars as count_chars,
check_cross_chapter_consistency as check_cross_chapter_consistency,
generate_final_doc as generate_final_doc,
generate_with_accurate_toc as generate_with_accurate_toc,
convert_single_chapter_inline as convert_single_chapter_inline,
batch_convert_txt_to_docx as batch_convert_txt_to_docx,
)
# ---- CLI 入口(兼容 python integrate_report.py 用法)----
if __name__ == '__main__':
from src.cli import main as _cli_main
_cli_main()
FILE:notify.py
"""
notify.py - 可配置通知渠道
===========================
支持渠道:log(默认)| feishu | openclaw-weixin
通过 config.json 的 notification_channel 字段配置。
用法:
from notify import notify
notify("报告生成完成!")
notify("警告信息", channel="feishu")
"""
import os, sys
# 通知渠道配置(从 config.json 读取)
_NOTIFY_CHANNEL = None
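def _get_channel() -> str:
    """Resolve the default channel once: environment variable first, then config.json, then 'log'."""
    global _NOTIFY_CHANNEL
    if _NOTIFY_CHANNEL is not None:
        return _NOTIFY_CHANNEL
    # The environment variable takes priority; config.json is consulted only when it is unset
    channel = os.environ.get('LOBAI_NOTIFY_CHANNEL')
    if not channel:
        try:
            here = os.path.dirname(os.path.abspath(__file__))
            if here not in sys.path:
                sys.path.insert(0, here)
            from src.config import load_config
            channel = load_config().get('notification_channel')
        except Exception:
            # config.json missing or unreadable: fall back to the default below
            channel = None
    _NOTIFY_CHANNEL = channel or 'log'
    return _NOTIFY_CHANNEL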
def notify(message: str, channel: str = None) -> bool:
"""
发送通知。
channel=None 时使用配置默认值。
返回是否成功。
"""
ch = channel or _get_channel()
if ch == 'log':
print(f"[NOTIFY] {message}", flush=True)
return True
if ch == 'openclaw-weixin':
return _notify_openclaw_weixin(message)
if ch == 'feishu':
return _notify_feishu(message)
# 未知渠道,降级为 log
print(f"[NOTIFY][{ch}] {message}", flush=True)
return False
def _notify_openclaw_weixin(message: str) -> bool:
"""通过 OpenClaw 微信渠道发送(需要 OpenClaw 运行时)"""
try:
# 动态导入 openclaw 基础设施(运行时存在)
from openclaw_runtime import notify as oc_notify
oc_notify(message, channel='openclaw-weixin')
return True
except ImportError:
pass
except Exception:
pass
# 降级:打印到 stdout
print(f"[NOTIFY][weixin] {message}", flush=True)
return False
def _notify_feishu(message: str) -> bool:
"""通过飞书发送通知(需要飞书配置)"""
try:
from openclaw_runtime import notify as oc_notify
oc_notify(message, channel='feishu')
return True
except ImportError:
pass
except Exception:
pass
print(f"[NOTIFY][feishu] {message}", flush=True)
return False
def set_channel(channel: str) -> None:
"""运行时切换通知渠道"""
global _NOTIFY_CHANNEL
_NOTIFY_CHANNEL = channel
FILE:parallel_tracker.py
"""
parallel_tracker.py
===================
多子Agent并行撰写可视化追踪模块
工作原理:
1. 主Agent使用 sessions_spawn 并行启动多个子Agent
2. 每个子Agent启动后向 TRACKER_FILE 写入自己的状态
3. 主Agent周期性地读取 TRACKER_FILE 并渲染可视化表格
使用方式:
from parallel_tracker import Tracker, update_chapter_status
# 子Agent端:启动时注册
tracker = Tracker()
tracker.register(seq="04", title="系统架构设计", batch="B")
tracker.update(seq="04", phase="writing", progress=50, note="撰写功能模块...")
# 子Agent端:完成后标记
tracker.update(seq="04", phase="done", progress=100)
"""
import json, os, time, sys, threading
from datetime import datetime
from typing import Dict, List, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
# ============ 路径配置(与 integrate_report.py 共用同一配置源)============
def _get_chapters_dir():
env_val = os.environ.get('LOBAI_CHAPTERS_DIR')
if env_val:
return env_val
return os.path.join(os.path.expanduser("~"), ".config", "lobsterai-report-agent", "chapters")
def _get_tracker_file():
return os.path.join(_get_chapters_dir(), 'writing_tracker.json')
# 惰性属性(支持 from parallel_tracker import CHAPTERS_DIR 写法)
_tracker_cache = None
def _load_tracker_paths():
global _tracker_cache
if _tracker_cache is None:
cd = _get_chapters_dir()
_tracker_cache = {
'CHAPTERS_DIR': cd,
'TRACKER_FILE': os.path.join(cd, 'writing_tracker.json'),
}
return _tracker_cache
def __getattr__(name):
_paths = _load_tracker_paths()
if name in _paths:
return _paths[name]
raise AttributeError(f"module has no attribute '{name}'")
# ============ 追踪器 ============
_GLOBAL_TRACKER: Optional['Tracker'] = None
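# Re-entrant lock: get_instance() holds it while Tracker.__init__() calls _write(),
# which would deadlock with a plain threading.Lock()
_GLOBAL_LOCK = threading.RLock()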
class Tracker:
"""多子Agent并行撰写状态追踪器(线程安全单例)"""
def __init__(self, tracker_file: str = None):
if tracker_file is None:
tracker_file = _get_tracker_file()
self.tracker_file = tracker_file
self._ensure_file()
@staticmethod
def get_instance(tracker_file: str = None) -> 'Tracker':
"""获取单例实例(线程安全)"""
global _GLOBAL_TRACKER
if _GLOBAL_TRACKER is None:
with _GLOBAL_LOCK:
if _GLOBAL_TRACKER is None:
_GLOBAL_TRACKER = Tracker(tracker_file)
return _GLOBAL_TRACKER
def _ensure_file(self):
if not os.path.exists(self.tracker_file):
self._write({})
def _read(self) -> Dict[str, Any]:
with _GLOBAL_LOCK:
try:
with open(self.tracker_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return {}
def _write(self, data: Dict[str, Any]):
with _GLOBAL_LOCK:
with open(self.tracker_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def register(self, seq: str, title: str, batch: str = "", agent_id: str = ""):
"""子Agent启动时注册"""
data = self._read()
data[seq] = {
"seq": seq,
"title": title,
"batch": batch,
"agent_id": agent_id,
"phase": "registered", # registered | outline | writing | reviewing | done | error
"progress": 0,
"note": "已注册,等待启动...",
"started_at": datetime.now().strftime('%H:%M:%S'),
"updated_at": datetime.now().strftime('%H:%M:%S'),
}
self._write(data)
return self
def update(self, seq: str, phase: str, progress: int = None,
note: str = "", error: str = ""):
"""
更新子Agent撰写状态
phase: registered | outline | writing | reviewing | done | error
progress: 0-100
"""
data = self._read()
if seq not in data:
# 未注册,自动注册
data[seq] = {"seq": seq, "title": seq, "batch": ""}
entry = data[seq]
entry["phase"] = phase
if progress is not None:
entry["progress"] = progress
if note:
entry["note"] = note
if error:
entry["error"] = error
entry["updated_at"] = datetime.now().strftime('%H:%M:%S')
self._write(data)
return self
def mark_done(self, seq: str, note: str = "已完成"):
return self.update(seq, phase="done", progress=100, note=note)
def mark_error(self, seq: str, error: str):
return self.update(seq, phase="error", note="出错", error=error)
def get_status(self) -> Dict[str, Any]:
return self._read()
def clear(self):
"""清空追踪状态(每批次开始前调用)"""
self._write({})
def get_summary(self) -> Dict[str, int]:
data = self._read()
phases = {}
for entry in data.values():
p = entry.get("phase", "unknown")
phases[p] = phases.get(p, 0) + 1
return phases
# ============ 可视化渲染 ============
TRACKER_FILE_FOR_PRINT = _get_tracker_file() # module-level reference, resolved once at import time (not lazy)
def _progress_bar(progress: int, width: int = 12) -> str:
"""渲染进度条:▓░░░░░░░░░░"""
filled = int(width * progress / 100)
return '▓' * filled + '░' * (width - filled)
def _phase_emoji(phase: str) -> str:
emoji_map = {
"registered": "⏳",
"outline": "📋",
"writing": "✍️",
"reviewing": "🔍",
"done": "✅",
"error": "❌",
}
return emoji_map.get(phase, "⚪")
def render_progress_table(tracker_file: str = None) -> str:
    """
    Render the current parallel-writing status as a table.
    Example output:
    ╔══════════════════════════════════════════════════════════════╗
    ║ 📊 多子Agent并行撰写进度监控 ║
    ╠══════════════════════════════════════════════════════════════╣
    ║ 04 系统架构设计 ✍️ writing ▓▓▓▓▓▓░░░░░░ 50% 撰写功能模块... ║
    ║ 05 技术路线 ✍️ writing ▓▓▓░░░░░░░░░ 25% 撰写技术选型... ║
    ║ 06 功能模块设计 ⏳ registered ░░░░░░░░░░░░ 0% 等待启动... ║
    ╚══════════════════════════════════════════════════════════════╝
    """
    if tracker_file is None:
        tracker_file = _get_tracker_file()
    try:
        with open(tracker_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception:
        return "(追踪文件暂不可用)"
    if not data:
        return "(暂无并行撰写任务)"
    # Sort by chapter sequence number
    sorted_entries = sorted(data.values(), key=lambda x: x.get('seq', '0'))
    # Overall progress
    total = len(sorted_entries)
    done = sum(1 for e in sorted_entries if e.get('phase') == 'done')
    errors = sum(1 for e in sorted_entries if e.get('phase') == 'error')
    overall_pct = int((done / total * 100)) if total > 0 else 0
    header = (
        f"╔══════════════════════════════════════════════════════════════╗\n"
        f"║ 📊 多子Agent并行撰写进度监控 [{done}/{total} 完成"
        f"{' ❌' + str(errors) if errors > 0 else ''}] 总体 {overall_pct}% ║\n"
        f"╠══════════════════════════════════════════════════════════════╣"
    )
    footer = "╚══════════════════════════════════════════════════════════════╝"
    rows = []
    for entry in sorted_entries:
        seq = entry.get('seq', '??').rjust(2)
        title = entry.get('title', '')[:14].ljust(14)
        phase_icon = _phase_emoji(entry.get('phase', ''))
        phase_name = entry.get('phase', '').rjust(10)
        progress = entry.get('progress', 0)
        bar = _progress_bar(progress)
        pct = str(progress).rjust(3) + '%'
        note = (entry.get('note', '') or '').strip()[:20].ljust(20)
        batch = entry.get('batch', '')
        batch_str = f"[{batch}] " if batch else " "
        row = f"║ {seq} {batch_str}{title} {phase_icon} {phase_name} {bar} {pct} {note} ║"
        rows.append(row)
    return '\n'.join([header] + rows + [footer])
def print_progress(tracker_file: str = None):
"""打印进度表格到标准输出(供 exec 调用)"""
if tracker_file is None:
tracker_file = _get_tracker_file()
print(render_progress_table(tracker_file), flush=True)
# ============ 轮询监控器 ============
class ProgressMonitor:
"""
定期轮询 tracker 文件并打印进度的监控器
用于在子Agent并行撰写时,主session展示实时进度
"""
def __init__(self, tracker_file: str = None, interval_sec: float = 8.0):
if tracker_file is None:
tracker_file = _get_tracker_file()
self.tracker_file = tracker_file
self.interval_sec = interval_sec
self._running = False
def start(self, duration_sec: float = None):
"""
启动监控循环
duration_sec: 监控持续秒数,None表示直到所有任务完成
"""
self._running = True
import time
start = time.time()
last_seen_done = set()
print(f"[MONITOR] 启动进度监控(间隔{self.interval_sec}秒)", flush=True)
while self._running:
try:
with open(self.tracker_file, 'r', encoding='utf-8') as f:
data = json.load(f)
entries = list(data.values())
if not entries:
time.sleep(self.interval_sec)
continue
# 检查是否全部完成
done_seqs = {e['seq'] for e in entries if e.get('phase') == 'done'}
error_seqs = {e['seq'] for e in entries if e.get('phase') == 'error'}
# 打印进度
os.system('cls' if os.name == 'nt' else 'clear')
print(render_progress_table(self.tracker_file), flush=True)
# 新完成任务时提示
new_done = done_seqs - last_seen_done
if new_done:
print(f"\n✅ 新完成:第 {[e['seq'] for e in entries if e['seq'] in new_done]} 章", flush=True)
last_seen_done = done_seqs
# 检查是否全部结束
all_done = len(done_seqs) + len(error_seqs) == len(entries)
if all_done:
print(f"\n[MONITOR] 所有章节撰写完成!", flush=True)
break
# 检查超时
if duration_sec and (time.time() - start) >= duration_sec:
print(f"\n[MONITOR] 监控超时({duration_sec}秒)", flush=True)
break
time.sleep(self.interval_sec)
except Exception as e:
print(f"[MONITOR] 轮询异常: {e}", flush=True)
time.sleep(self.interval_sec)
def stop(self):
self._running = False
# ============ 子Agent端辅助函数 ============
def get_tracker() -> Tracker:
"""获取Tracker单例(子Agent端推荐使用)"""
return Tracker.get_instance()
def chapter_register(seq: str, title: str, batch: str = ""):
"""子Agent启动时调用:注册章节撰写任务"""
Tracker().register(seq=seq, title=title, batch=batch)
def chapter_update(seq: str, phase: str, progress: int = None, note: str = ""):
"""子Agent撰写过程中调用:更新进度"""
Tracker().update(seq=seq, phase=phase, progress=progress, note=note)
def chapter_done(seq: str, note: str = "已完成"):
"""子Agent完成时调用:标记完成"""
Tracker().mark_done(seq=seq, note=note)
def chapter_error(seq: str, error: str):
"""子Agent出错时调用:标记错误"""
Tracker().mark_error(seq=seq, error=error)
# ============ CLI 入口 ============
if __name__ == '__main__':
    args = sys.argv[1:]
    # With no arguments, fall back to showing the table
    cmd = args[0] if args else 'show'
    tracker = Tracker()
    if cmd == 'show':
        print(render_progress_table())
    elif cmd == 'clear':
        tracker.clear()
        print("追踪状态已清空")
    elif cmd == 'status':
        summary = tracker.get_summary()
        print(f"当前状态: {summary}")
        total = sum(summary.values())
        done = summary.get('done', 0)
        print(f"进度: {done}/{total} 完成")
    elif cmd == 'wait':
        # Blocking monitor: redraw the table every 8 seconds until Ctrl+C
        print("开始监控... Ctrl+C 停止")
        try:
            while True:
                os.system('cls' if os.name == 'nt' else 'clear')
                print(render_progress_table())
                time.sleep(8)
        except KeyboardInterrupt:
            print("\n监控已停止")
    elif cmd == 'register' and len(args) >= 3:
        seq, title, *rest = args[1:]
        batch = rest[0] if rest else ""
        tracker.register(seq, title, batch)
        print(f"已注册:第{seq}章 {title} [{batch}]")
    elif cmd == 'update' and len(args) >= 3:
        seq, phase, *rest = args[1:]
        # Optional third and fourth arguments: progress (digits only) and a note
        progress = int(rest[0]) if rest and rest[0].isdigit() else None
        note = rest[1] if len(rest) > 1 else ""
        tracker.update(seq, phase, progress, note)
        pct = f" {progress}%" if progress is not None else ""
        print(f"已更新:第{seq}章 {phase}{pct} {note}")
    elif cmd == 'done' and len(args) >= 2:
        seq = args[1]
        tracker.mark_done(seq)
        print(f"已标记完成:第{seq}章")
    else:
        # Unknown command or missing arguments: show the table
        print(render_progress_table())
FILE:README.md
# lobsterai-report-agent
超长可行性研究报告多 Agent 协作撰写系统。
多子 Agent 并行创作 + 自动编排流程 + 精美 docx 输出,适合 10+ 章节的超长篇报告。
---
## 功能特性
- **多 Agent 并行**:最多 5 个子 Agent 并发撰写,效率翻倍
- **增量更新**:内容未变化的章节跳过重写
- **精美排版**:封面、表格式目录、彩色章节标题、重点标注盒、美化表格
- **飞书 RAG**:自动检索飞书知识库补充参考资料
- **6 种封面风格**:经典政务 / 现代简约 / 商务典雅 / 科技数字 / 中式传统 / 全屏沉浸
- **跨章一致性审查**:自动检查数量指标冲突和术语不统一
---
## 目录结构
```
lobsterai-report-agent/
├── SKILL.md # OpenClaw Skill 说明文档
├── README.md # 本文件
├── LICENSE # MIT License
├── requirements.txt # Python 依赖
├── integrate_report.py # facade(兼容旧接口)+ CLI 入口
├── parallel_tracker.py # 并行进度追踪
├── notify.py # 可配置通知渠道
├── src/
│ ├── __init__.py # 包入口,导出公共 API
│ ├── config.py # 路径配置 + 文件 I/O
│ ├── engine.py # 核心业务逻辑(markdown → docx、术语表、审查等)
│ └── cli.py # CLI 入口
└── references/ # 子流程参考文档
├── phase0_guide.md # Phase 0 需求确认流程
├── phase1_guide.md # 规划师 prompt 模板
├── phase2_guide.md # 子 Agent prompt 模板
├── table_format_guide.md # Markdown 表格格式规范
└── bug_fix_guide.md # Bug 排查与强制重建
```
---
## 快速开始
### 安装依赖
```bash
pip install python-docx
```
(可选,Mermaid 图表渲染需要):
```bash
npm install -g @mermaid-js/mermaid-cli
```
### 配置工作目录
默认工作目录:`~/.config/lobsterai-report-agent/`
可通过环境变量覆盖:
```bash
# Linux / macOS
export LOBAI_CHAPTERS_DIR=/path/to/your/chapters
export LOBAI_OUTPUT_DIR=/path/to/output
# Windows
set LOBAI_CHAPTERS_DIR=D:\my_reports\chapters
set LOBAI_OUTPUT_DIR=D:\my_reports\output
```
或在工作目录放置 `config.json`:
```json
{
"project_name": "XX市医疗资产精细化管理方案",
"doc_type": "可行性研究报告",
"notification_channel": "log"
}
```
### CLI 用法
```bash
# 生成整合报告(全部章节 → docx)
python integrate_report.py
# 批量转换章节 txt → docx
python integrate_report.py convert-batch
# 单章转换
python integrate_report.py --convert-one 01-概述.txt 01-概述.docx
# 生成/更新术语表
python integrate_report.py glossary
# 跨章一致性审查
python integrate_report.py check
# 查看/清空参考资料
python integrate_report.py ref show
python integrate_report.py ref clear
```
### Python API 用法
```python
import integrate_report as ir
# 读取配置
plan = ir.load_plan()
chapters_dir = ir.get_chapters_dir()
# 生成整合报告
result = ir.generate_with_accurate_toc()
# 批量转换
ir.batch_convert_txt_to_docx()
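# Cross-chapter consistency review (parse the chapter .txt files first)
import glob, os
txt_files = sorted(glob.glob(os.path.join(chapters_dir, '*.txt')))
chapters_data = ir.parse_chapters(txt_files)
issues = ir.check_cross_chapter_consistency(chapters_data)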
```
---
## Pipeline 流程
```
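Phase 0: Requirements confirmation (4 steps)
↓
Phase 1: Planner generates the outline (plan.json)
↓
Phase 2: Parallel sub-agent writing (up to 5 concurrent)
↓
Phase 2.5: Cross-chapter consistency review
↓
Phase 3: Integrator merges everything → docx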
```
See [SKILL.md](SKILL.md) for details.
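The concurrency cap in Phase 2 can be pictured with a small sketch. It is an illustration under stated assumptions, not the project's orchestration code: `write_chapter` is a hypothetical stand-in for a sub-agent, and a thread pool is used only to show the limit of 5 concurrent writers.

```python
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT = 5  # the limit named in Phase 2

def write_chapter(spec):
    """Hypothetical stand-in for a sub-agent; returns (seq, draft text)."""
    return spec["seq"], f"# {spec['title']}\n(draft)"

def run_phase2(chapters):
    """Write all chapters with at most MAX_CONCURRENT running at once."""
    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT) as pool:
        # map() yields results in input order, so drafts come back in outline order
        return dict(pool.map(write_chapter, chapters))

plan = {"chapters": [{"seq": "01", "title": "概述"}, {"seq": "02", "title": "需求分析"}]}
drafts = run_phase2(plan["chapters"])
print(list(drafts))
```

Keeping the results keyed by `seq` makes the hand-off to the Phase 2.5 review straightforward, since that step compares chapters by their sequence number.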
---
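## Notification Channel Configuration
`notify.py` supports configurable push channels:
| Channel | Config value | Description |
|---------|--------------|-------------|
| Console (default) | `log` | Prints to stdout |
| Feishu | `feishu` | Pushes through the OpenClaw Feishu plugin |
| OpenClaw WeChat | `openclaw-weixin` | Pushes through the OpenClaw WeChat plugin |
Configure it in one of three ways:
1. In `config.json`: `"notification_channel": "feishu"`
2. Environment variable: `set LOBAI_NOTIFY_CHANNEL=feishu` (Windows) or `export LOBAI_NOTIFY_CHANNEL=feishu` (Linux / macOS)
3. In code: `notify.set_channel('feishu')`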
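The README does not say which of the three settings wins when more than one is present. The sketch below assumes one plausible order (explicit call, then environment variable, then `config.json`, then the `log` default); the function `resolve_channel` is hypothetical and is not part of `notify.py`.

```python
import os

VALID_CHANNELS = {"log", "feishu", "openclaw-weixin"}

def resolve_channel(explicit=None, config=None, environ=None):
    """Pick the first valid channel; the precedence order here is an assumption."""
    environ = os.environ if environ is None else environ
    config = config or {}
    candidates = (
        explicit,                              # e.g. the value passed to notify.set_channel
        environ.get("LOBAI_NOTIFY_CHANNEL"),   # environment variable
        config.get("notification_channel"),    # config.json
    )
    for candidate in candidates:
        if candidate in VALID_CHANNELS:
            return candidate
    return "log"  # console output is the documented default

print(resolve_channel(config={"notification_channel": "feishu"}, environ={}))
```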
---
## License
MIT License. See the [LICENSE](LICENSE) file for details.
FILE:requirements.txt
python-docx>=0.8.11
FILE:src/cli.py
"""
cli.py - CLI entry point
========================
CLI logic only; no business logic.
"""
import sys, glob as glob_module
from . import config
from . import engine
def main():
    argv = sys.argv[1:]
    if not argv:
        # Default: generate the integrated report from the configured chapters directory
        result = engine.generate_with_accurate_toc()
        if result:
            print(f"\n[DONE] Integrated report generated: {result}")
        return
    cmd = argv[0]
    if cmd == '--convert-one':
        if len(argv) != 3:
            print("Usage: python integrate_report.py --convert-one <in.txt> <out.docx>")
            sys.exit(1)
        engine.convert_single_chapter_inline(argv[1], argv[2])
        print(f"saved: {argv[2]}", flush=True)
    elif cmd == 'convert-batch':
        txt_dir = argv[1] if len(argv) > 1 else None
        engine.batch_convert_txt_to_docx(txt_dir=txt_dir)
    elif cmd == 'glossary':
        txt_files = sorted(glob_module.glob(config._p('CHAPTERS_DIR') + '/*.txt'))
        ref_text = config.load_reference()
        config.generate_glossary(txt_files, ref_text=ref_text)
    elif cmd == 'check':
        txt_files = sorted(glob_module.glob(config._p('CHAPTERS_DIR') + '/*.txt'))
        chapters_data = engine.parse_chapters(txt_files)
        issues = engine.check_cross_chapter_consistency(chapters_data)
        if not issues:
            print("[OK] Cross-chapter consistency check passed; no inconsistencies found")
        else:
            for iss in issues:
                print(f"[WARN] {iss}")
    elif cmd == 'status':
        prog = config.load_progress()
        print(f"Progress: {prog.get('completed', 0)}/{prog.get('total', '?')}")
        if prog.get('current'):
            print(f"Status: {prog['current']}")
    elif cmd == 'ref':
        if len(argv) >= 2:
            action = argv[1]
            if action == 'show':
                ref = config.load_reference()
                print(f"Reference material: {len(ref)} characters")
                print(ref[:500] if ref else "(empty)")
            elif action == 'clear':
                config.save_reference("")
                print("Reference material cleared")
        else:
            ref = config.load_reference()
            print(f"Current reference material: {len(ref)} characters")
    elif cmd == 'plan':
        # Show plan.json (only `show` is implemented)
        if len(argv) >= 2 and argv[1] == 'show':
            import json
            plan = config.load_plan()
            print(json.dumps(plan, ensure_ascii=False, indent=2))
    elif cmd == 'config':
        # Show config.json (only `show` is implemented)
        if len(argv) >= 2 and argv[1] == 'show':
            import json
            cfg = config.load_config()
            print(json.dumps(cfg, ensure_ascii=False, indent=2))
    else:
        # Unknown command: treat the argument as a chapters directory and generate
        result = engine.generate_with_accurate_toc(txt_dir=argv[0])
        if result:
            print(f"\n[DONE] Integrated report generated: {result}")


if __name__ == '__main__':
    main()
FILE:src/config.py
"""
config.py - Path configuration and file I/O
===========================================
All paths are resolved dynamically from environment variables or ~/.config/; no subprocess runs at import time.
"""
import os, json as json_module
from typing import Dict, Any
# ============ 路径配置(惰性,不在 import 时执行 subprocess)============
CHARS_PER_PAGE = 950 # 每页字符数估算值
def _get_config_value(key: str, default: str) -> str:
"""优先读环境变量,否则用 default"""
return os.environ.get(key) or default
def _default_chapters_dir() -> str:
return os.path.join(os.path.expanduser("~"), ".config", "lobsterai-report-agent", "chapters")
def _default_output_dir() -> str:
return os.path.join(os.path.expanduser("~"), ".config", "lobsterai-report-agent", "output")
def get_chapters_dir() -> str:
"""获取章节工作目录(可配置:环境变量 LOBAI_CHAPTERS_DIR)"""
return _get_config_value("LOBAI_CHAPTERS_DIR", _default_chapters_dir())
def get_output_dir() -> str:
"""获取最终输出目录(可配置:环境变量 LOBAI_OUTPUT_DIR)"""
return _get_config_value("LOBAI_OUTPUT_DIR", _default_output_dir())
# ---- 路径缓存(延迟到首次使用时)----
_paths_cache: Dict[str, str] = {}
def _load_paths() -> Dict[str, str]:
    """Lazily build all paths on first use and cache them (no lock is taken, so this is not strictly thread-safe)."""
global _paths_cache
if not _paths_cache:
cd = get_chapters_dir()
od = get_output_dir()
os.makedirs(cd, exist_ok=True)
os.makedirs(od, exist_ok=True)
_paths_cache.update({
'CHAPTERS_DIR': cd,
'OUTPUT_DIR': od,
'PLAN_FILE': os.path.join(cd, 'plan.json'),
'PROGRESS_FILE': os.path.join(cd, 'progress.json'),
'GLOSSARY_FILE': os.path.join(cd, 'glossary.json'),
'REFERENCE_FILE': os.path.join(cd, 'reference_material.txt'),
'OUTLINE_SNAPSHOT': os.path.join(cd, 'plan_outline_snapshot.md'),
'CONFIG_FILE': os.path.join(cd, 'config.json'),
'FINAL_DOC': os.path.join(od, '整合报告.docx'),
'HASH_FILE': os.path.join(cd, 'content_hashes.json'),
'MERMAID_TEMP': os.path.join(cd, 'mermaid_temp'),
'MERMAID_PUPPETEER_CONFIG': os.path.join(cd, 'mermaid_temp', 'puppeteer_config.json'),
})
return _paths_cache
def _p(key: str) -> str:
"""取路径常量的简写"""
return _load_paths()[key]
# ---- 兼容模块级属性的 __getattr__(支持 from config import CHAPTERS_DIR)----
def __getattr__(name: str):
_paths = _load_paths()
if name in _paths:
return _paths[name]
if name == 'CHARS_PER_PAGE':
return CHARS_PER_PAGE
raise AttributeError(f"module 'config' has no attribute '{name}'")
# ============ Mermaid CLI(惰性,不在 import 时执行)============
_mermaid_cli_cached = None  # script path or command string once a working CLI is found


def get_mermaid_cli() -> "str | None":
    """Lazily locate the mmdc CLI; only called the first time a Mermaid diagram is rendered."""
    global _mermaid_cli_cached
    if _mermaid_cli_cached is not None:
        return _mermaid_cli_cached
    import subprocess
    local_cli = os.path.join(os.path.dirname(__file__), 'node_modules', '@mermaid-js', 'mermaid-cli', 'src', 'cli.js')
    local_cli = os.path.normpath(local_cli)
    candidates = [
        ('local', [local_cli, '--version']),
        ('local_node', ['node', local_cli, '--version']),
        ('mmdc', ['mmdc', '--version']),
        ('npx_mmdc', ['npx', 'mmdc', '--version']),
    ]
    for name, cmd in candidates:
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
        except Exception:
            continue
        if result.returncode == 0:
            if name in ('local', 'local_node'):
                _mermaid_cli_cached = local_cli
            else:
                # Cache the whole command ("mmdc" or "npx mmdc"); the renderer splits it on
                # spaces, so caching only cmd[0] would run bare "npx" for the npx candidate
                _mermaid_cli_cached = ' '.join(cmd[:-1])
            return _mermaid_cli_cached
    return None
# ============ Config 文件读写 =============
def load_config() -> Dict[str, Any]:
try:
with open(_p('CONFIG_FILE'), 'r', encoding='utf-8') as f:
return json_module.load(f)
except Exception:
pass
return {
"project_name": "", "topic": "", "audience": "",
"doc_type": "可行性研究报告", "style": "专业严谨", "custom_constraints": ""
}
def save_config(cfg: Dict[str, Any]) -> None:
with open(_p('CONFIG_FILE'), 'w', encoding='utf-8') as f:
json_module.dump(cfg, f, ensure_ascii=False, indent=2)
# ============ Plan 文件读写 =============
def load_plan() -> Dict[str, Any]:
try:
with open(_p('PLAN_FILE'), 'r', encoding='utf-8') as f:
return json_module.load(f)
except Exception:
pass
return make_default_plan()
def make_default_plan() -> Dict[str, Any]:
return {"project_name": "", "chapters": []}
def save_plan(plan: Dict[str, Any]) -> None:
with open(_p('PLAN_FILE'), 'w', encoding='utf-8') as f:
json_module.dump(plan, f, ensure_ascii=False, indent=2)
# ============ Glossary 文件读写 =============
def generate_glossary(txt_files=None, ref_text: str = "", max_terms: int = 80) -> Dict[str, Any]:
"""从参考资料和章节内容中生成术语表"""
import re
all_terms: Dict[str, int] = {}
stopwords = {
'以及', '包括', '可以', '通过', '根据', '按照', '为了', '由于', '其中',
'其他', '相关', '以上', '以下', '对于', '并且', '或者', '等等',
'本项目', '本公司', '本系统', '本章', '本节', '本文', '本案',
'进行', '完成', '实现', '提供', '使用', '管理', '系统', '建设',
'方案', '项目', '数据', '平台', '技术', '功能', '模块'
}
def extract_from_text(text: str):
pattern = re.compile(r'[\u4e00-\u9fff]{4,}')
for w in pattern.findall(text):
if w not in stopwords and len(w) >= 4:
all_terms[w] = all_terms.get(w, 0) + 1
if ref_text:
extract_from_text(ref_text)
if txt_files:
for f in (txt_files or []):
try:
extract_from_text(open(f, 'r', encoding='utf-8').read())
except Exception:
continue
sorted_terms = sorted(all_terms.items(), key=lambda x: -x[1])[:max_terms]
glossary = {
"generated_at": _timestamp(),
"total_ref_chars": len(ref_text),
"terms": [{"term": t, "count": c} for t, c in sorted_terms]
}
with open(_p('GLOSSARY_FILE'), 'w', encoding='utf-8') as f:
json_module.dump(glossary, f, ensure_ascii=False, indent=2)
print(f"[GLOSSARY] 术语表已生成: {_p('GLOSSARY_FILE')}(共 {len(sorted_terms)} 个术语)")
return glossary
def load_glossary() -> Dict[str, Any]:
try:
with open(_p('GLOSSARY_FILE'), 'r', encoding='utf-8') as f:
return json_module.load(f)
except Exception:
return {"terms": []}
def glossary_to_prompt_text(glossary: Dict[str, Any], max_terms: int = 30) -> str:
terms = glossary.get('terms', [])
if not terms:
return "(术语表暂无数据,完成 Batch A 后自动生成)"
display = terms[:max_terms]
lines = [f"- {t['term']}(出现{t['count']}次)" for t in display]
suffix = f"\n(共 {len(terms)} 个术语,仅展示前 {max_terms} 个)" if len(terms) > max_terms else ""
return '\n'.join(lines) + suffix
# ============ Reference 文件读写 =============
def load_reference() -> str:
try:
with open(_p('REFERENCE_FILE'), 'r', encoding='utf-8') as f:
return f.read()
except Exception:
return ""
def save_reference(text: str) -> None:
with open(_p('REFERENCE_FILE'), 'w', encoding='utf-8') as f:
f.write(text)
print(f"[REF] 参考资料已保存,共 {len(text)} 字符")
# ============ Progress 文件读写 =============
def load_progress() -> Dict[str, Any]:
try:
with open(_p('PROGRESS_FILE'), 'r', encoding='utf-8') as f:
return json_module.load(f)
except Exception:
return {"total": 0, "completed": 0, "batches": [], "current": ""}
# ============ 大纲快照 =============
def save_outline_snapshot(plan: Dict[str, Any]) -> None:
from datetime import datetime
lines = [f"# 文档大纲快照({datetime.now().strftime('%Y-%m-%d %H:%M')})"]
lines.append(f"\n项目:{plan.get('project_name', '未知项目')}\n")
for ch in plan.get('chapters', []):
lines.append(
f"第{ch.get('seq','?')}章 | {ch.get('title','')} | "
f"Batch {ch.get('batch','')} | 约{ch.get('word_count',0)}字 | "
f"依赖:{ch.get('dependencies',[])}"
)
with open(_p('OUTLINE_SNAPSHOT'), 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
print(f"[SNAPSHOT] 大纲快照已保存: {_p('OUTLINE_SNAPSHOT')}")
def save_batch_snapshot(batch_label: str, batch_chapters) -> None:
from datetime import datetime
snapshot_file = f"{_p('CHAPTERS_DIR')}/snapshot_{batch_label}_{datetime.now().strftime('%Y%m%d_%H%M')}.md"
lines = [f"# {batch_label} 快照({datetime.now().strftime('%Y-%m-%d %H:%M')})"]
for seq, fname, title, content, _ in batch_chapters:
lines.append(f"\n---\n## 第{seq}章 {title}\n")
preview = content[:300].replace('\n', ' ').strip()
lines.append(f"[预览] {preview}...")
with open(snapshot_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
print(f"[SNAPSHOT] 批次快照已保存: {snapshot_file}")
# ============ 内部工具 =============
def _timestamp() -> str:
from datetime import datetime
return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
FILE:src/engine.py
"""
engine.py - Core business logic
===============================
Contains no path configuration (provided by config.py) and no CLI entry point (provided by cli.py).
"""
import os, re, hashlib, glob, json as json_module
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Tuple, Optional, Any
from docx import Document
from docx.shared import Pt, Inches, Cm
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
from . import config
# ============ Hash(增量更新)============
def compute_content_hash(content: str) -> str:
normalized = re.sub(r'\s+', '', content.strip())
return hashlib.md5(normalized.encode('utf-8')).hexdigest()
def load_hashes() -> Dict[str, str]:
try:
with open(config._p('HASH_FILE'), 'r', encoding='utf-8') as f:
return json_module.load(f)
except Exception:
return {}
def save_hashes(hashes: Dict[str, str]) -> None:
with open(config._p('HASH_FILE'), 'w', encoding='utf-8') as f:
json_module.dump(hashes, f, ensure_ascii=False, indent=2)
def get_changed_chapters(chapters_data: List[Tuple], hashes: Dict[str, str]) -> List[Tuple]:
changed = []
for item in chapters_data:
seq, content = item[0], item[3]
if hashes.get(seq) != compute_content_hash(content):
changed.append(item)
return changed
# ============ Mermaid 图表渲染(惰性)============
def render_mermaid_image(code: str, out_path: str, cli: str = None) -> bool:
if cli is None:
cli = config.get_mermaid_cli()
if cli is None:
return False
os.makedirs(os.path.dirname(out_path), exist_ok=True)
tmp_input = os.path.join(config.get_chapters_dir(), '_mermaid_tmp.mmd')
try:
with open(tmp_input, 'w', encoding='utf-8') as f:
f.write(code)
if cli.endswith('.js'):
cmd = ['node', cli, '-i', tmp_input, '-o', out_path]
else:
cmd = cli.split() + ['-i', tmp_input, '-o', out_path]
puppeteer_cfg = config._p('MERMAID_PUPPETEER_CONFIG')
if os.path.exists(puppeteer_cfg):
cmd += ['-p', puppeteer_cfg]
import subprocess
subprocess.run(cmd, capture_output=True, text=True, timeout=30)
return os.path.exists(out_path)
except Exception:
return False
finally:
if os.path.exists(tmp_input):
os.remove(tmp_input)
def process_mermaid_blocks(content: str) -> Tuple[str, List[str]]:
    rendered_images = []
    mermaid_blocks = list(re.finditer(r'```mermaid\n(.*?)```', content, re.DOTALL))
    if not mermaid_blocks:
        return content, []
    cli = config.get_mermaid_cli()  # look the CLI up once rather than once per block
    processed = content
    # Walk the blocks back to front so earlier match offsets stay valid after each splice
    for block_idx in range(len(mermaid_blocks) - 1, -1, -1):
        m = mermaid_blocks[block_idx]
        code = m.group(1).strip()
        img_name = f'mermaid_{block_idx:03d}.png'
        img_path = os.path.join(config._p('MERMAID_TEMP'), img_name)
        success = bool(cli and render_mermaid_image(code, img_path, cli))
        if success:
            rendered_images.append(img_path)
            # md_to_paragraphs matches this placeholder text, so it is kept verbatim
            replacement = f'\n[Mermaid图表已渲染,见附件: {img_name}]\n'
        else:
            replacement = (
                f'\n```mermaid\n{code}\n```\n\n'
                f'<!-- ⚠️ Mermaid图表(渲染工具mmdc未安装或渲染失败,'
                f'请在支持Mermaid的编辑器中查看) -->\n'
            )
        processed = processed[:m.start()] + replacement + processed[m.end():]
    # md_to_paragraphs consumes the images top to bottom, so return them in document order
    rendered_images.reverse()
    return processed, rendered_images
# ============ Word TOC =============
NSMAP = (
'xmlns:wpc="http://schemas.microsoft.com/office/word/2010/wordprocessingCanvas" '
'xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" '
'xmlns:o="urn:schemas-microsoft-com:office:office" '
'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships" '
'xmlns:m="http://schemas.openxmlformats.org/officeDocument/2006/math" '
'xmlns:v="urn:schemas-microsoft-com:vml" '
'xmlns:wp14="http://schemas.microsoft.com/office/word/2010/wordprocessingDrawing" '
'xmlns:wp="http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing" '
'xmlns:w10="urn:schemas-microsoft-com:office:word" '
'xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main" '
'xmlns:w14="http://schemas.microsoft.com/office/word/2010/wordml" '
'xmlns:wpg="http://schemas.microsoft.com/office/word/2010/wordprocessingGroup" '
'xmlns:wpi="http://schemas.microsoft.com/office/word/2010/wordprocessingInk" '
'xmlns:wne="http://schemas.microsoft.com/office/word/2006/wordml" '
'xmlns:wps="http://schemas.microsoft.com/office/word/2010/wordprocessingShape" '
)
def _make_bookmark_start(bookmark_id: int, bookmark_name: str) -> OxmlElement:
el = OxmlElement('w:bookmarkStart')
el.set(qn('w:id'), str(bookmark_id))
el.set(qn('w:name'), bookmark_name)
return el
def _make_bookmark_end(bookmark_id: int) -> OxmlElement:
el = OxmlElement('w:bookmarkEnd')
el.set(qn('w:id'), str(bookmark_id))
return el
def add_toc_entry(doc, seq: str, title: str, page_num: int, toc_type: str = 'chapter'):
    # hash() of a str differs between interpreter runs; md5 keeps bookmark ids reproducible
    bm_id = 100 + int(hashlib.md5(title.encode('utf-8')).hexdigest(), 16) % 1000
    bookmark_name = f'_Toc_{bm_id}'

    def _fld_char(run, kind: str) -> None:
        el = OxmlElement('w:fldChar')
        el.set(qn('w:fldCharType'), kind)
        run._r.append(el)

    def _instr(run, text: str) -> None:
        el = OxmlElement('w:instrText')
        el.text = text
        run._r.append(el)

    p = doc.add_paragraph()
    p.paragraph_format.line_spacing = Pt(22)
    p.paragraph_format.space_after = Pt(4)
    p.paragraph_format.first_line_indent = Cm(-0.74)
    # Entry text: sequence label followed by the title
    r_text = p.add_run(seq + ' ' + title)
    r_text.font.size = Pt(12)
    cjk(r_text, '宋体')
    if toc_type == 'summary':
        return
    # FORMTEXT field
    run = p.add_run()
    run.font.size = Pt(12)
    cjk(run, '宋体')
    _fld_char(run, 'begin')
    _instr(run, ' FORMTEXT ')
    _fld_char(run, 'end')
    # Tab, then a PAGEREF field whose cached result is the estimated page number
    p.add_run('\t')  # python-docx stores '\t' in run text as <w:tab/>
    run_page = p.add_run()
    run_page.font.size = Pt(12)
    cjk(run_page, '宋体')
    _fld_char(run_page, 'begin')
    _instr(run_page, f' PAGEREF {bookmark_name} \\h ')
    _fld_char(run_page, 'separate')
    page_text = OxmlElement('w:t')
    page_text.text = str(page_num)
    run_page._r.append(page_text)
    _fld_char(run_page, 'end')
    # The bookmark must come after <w:pPr>, which has to stay the first child of <w:p>
    p._p.get_or_add_pPr().addnext(_make_bookmark_start(bm_id, bookmark_name))
    p._p.append(_make_bookmark_end(bm_id))
# ============ 字体辅助 =============
def cjk(run, name: str) -> None:
r = run._element
rPr = r.get_or_add_rPr()
rFonts = rPr.find(qn('w:rFonts'))
if rFonts is None:
rFonts = OxmlElement('w:rFonts')
rPr.insert(0, rFonts)
rFonts.set(qn('w:eastAsia'), name)
# ============ Markdown 表格解析 =============
def _clean_inline(text: str) -> str:
text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
text = re.sub(r'\*(.+?)\*', r'\1', text)
text = re.sub(r'`(.+?)`', r'\1', text)
return text.strip()
def _is_table_line(line: str) -> bool:
s = line.strip()
return s.startswith('|') and s.endswith('|')
def _is_separator_line(line: str) -> bool:
stripped = line.strip().strip('|')
return bool(re.match(r'^[\s\-:.|]+$', stripped))
def _parse_md_table(rows: List[str]) -> List[List[str]]:
result = []
for line in rows:
stripped = line.strip().strip('|')
cols = stripped.split('|')
result.append([_clean_inline(c.strip()) for c in cols])
return result
def _add_table_to_doc(doc, rows: List[List[str]]) -> None:
if not rows:
return
col_count = max(len(r) for r in rows)
col_count = max(col_count, 1)
tbl = doc.add_table(rows=len(rows), cols=col_count)
tbl.style = 'Table Grid'
for r_idx, row_data in enumerate(rows):
cells = tbl.rows[r_idx].cells
for c_idx in range(len(cells)):
text = row_data[c_idx] if c_idx < len(row_data) else ''
cells[c_idx].text = text
for para in cells[c_idx].paragraphs:
for run in para.runs:
run.font.name = '宋体'
run.font.size = Pt(10)
run._element.rPr.rFonts.set(qn('w:eastAsia'), '宋体')
para.paragraph_format.space_before = Pt(2)
para.paragraph_format.space_after = Pt(2)
def _flush_table(doc, pending_table: List[str]) -> None:
if pending_table:
_add_table_to_doc(doc, _parse_md_table(pending_table))
pending_table.clear()
# ============ Markdown → docx =============
def md_to_paragraphs(doc, content: str, add_page_break: bool = True) -> None:
processed_content, rendered_images = process_mermaid_blocks(content)
mermaid_img_iter = iter(rendered_images) if rendered_images else iter([])
lines = processed_content.split('\n')
pending_table: List[str] = []
i = 0
while i < len(lines):
line = lines[i].rstrip()
i += 1
if '[Mermaid图表已渲染,见附件:' in line:
img_path = next(mermaid_img_iter, None)
if img_path and os.path.exists(img_path):
_flush_table(doc, pending_table)
try:
p = doc.add_paragraph()
run = p.add_run()
run.add_picture(img_path, width=Inches(5.5))
except Exception:
p = doc.add_paragraph()
r = p.add_run(line + ' [图片渲染失败]')
r.font.size = Pt(10); cjk(r, '宋体')
continue
if not line.strip():
_flush_table(doc, pending_table)
continue
if line.startswith('# '):
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_before = Pt(12)
p.paragraph_format.space_after = Pt(10)
r = p.add_run(_clean_inline(line[2:]))
r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体')
continue
if line.startswith('## '):
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.space_before = Pt(10); p.paragraph_format.space_after = Pt(6)
r = p.add_run(_clean_inline(line[3:]))
r.font.size = Pt(14); r.font.bold = True; cjk(r, '楷体')
continue
if line.startswith('### '):
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.space_before = Pt(8); p.paragraph_format.space_after = Pt(4)
r = p.add_run(_clean_inline(line[4:]))
r.font.size = Pt(12); r.font.bold = True; cjk(r, '仿宋')
continue
if line.startswith('#### '):
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.space_before = Pt(6); p.paragraph_format.space_after = Pt(3)
r = p.add_run(_clean_inline(line[5:]))
r.font.size = Pt(11); r.font.bold = True; cjk(r, '仿宋')
continue
if _is_table_line(line) and not _is_separator_line(line):
pending_table.append(line)
continue
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.first_line_indent = Cm(0.74)
p.paragraph_format.line_spacing = Pt(22)
p.paragraph_format.space_before = Pt(2)
p.paragraph_format.space_after = Pt(6)
r = p.add_run(_clean_inline(line))
r.font.size = Pt(12); cjk(r, '宋体')
_flush_table(doc, pending_table)
if add_page_break:
doc.add_page_break()
# ============ 章节解析(错误隔离)============
def safe_parse_chapter(fpath: str) -> Optional[Tuple]:
fname = os.path.basename(fpath).replace('.txt', '')
seq = fname.split('-')[0]
try:
content = open(fpath, 'r', encoding='utf-8').read()
except Exception as e:
print(f"[ERROR] 读取失败 {fname}: {e}")
return None
h2_entries = [l[3:].strip() for l in content.split('\n') if l.strip().startswith('## ')]
title = fname
for line in content.split('\n'):
line = line.strip()
if line.startswith('# '):
title = line[2:].strip()
break
return (seq, fname, title, content, h2_entries)
def parse_chapters(txt_files: List[str]) -> List[Tuple]:
seen_seq = set()
chapters, errors = [], []
for f in txt_files:
seq = os.path.basename(f).replace('.txt', '').split('-')[0]
if seq in seen_seq:
continue
result = safe_parse_chapter(f)
if result is None:
errors.append(os.path.basename(f)); continue
seen_seq.add(seq); chapters.append(result)
if errors:
print(f"[WARN] 以下章节解析失败(已跳过): {errors}")
return chapters
def count_chars(text: str) -> int:
return len([c for c in text if c.strip()])
# ============ 跨章一致性审查 =============
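def extract_quantities(text: str) -> Dict[str, str]:
    qty = {}
    # Longer units are listed first; otherwise '万元' and '人' shadow '万元/年' and '人/日'
    pattern = re.compile(r'(\d+(?:\.\d+)?)\s*(万元/年|万元|人/日|人|台|套|个|次|年|月|天|%)')
    for m in pattern.finditer(text):
        key = f"{m.group(1)}{m.group(2)}"
        qty[key] = m.group(0)
    return qty


def check_cross_chapter_consistency(chapters_data: List[Tuple]) -> List[Dict]:
    # NOTE: every key from extract_quantities embeds the number itself, so two chapters
    # that share a key necessarily hold the same value and the comparison below cannot
    # report a conflict. Keys that identify the indicator (for example the label in
    # front of the number) are needed before this check can find real mismatches.
    issues = []
    all_qty = [(seq, extract_quantities(content)) for seq, _, _, content, _ in chapters_data]
    # Only adjacent chapters are compared
    for i in range(len(all_qty) - 1):
        seq_a, qty_a = all_qty[i]
        seq_b, qty_b = all_qty[i + 1]
        shared = set(qty_a) & set(qty_b)
        for key in shared:
            ma = re.match(r'^(\d+(?:\.\d+)?)', qty_a[key])
            mb = re.match(r'^(\d+(?:\.\d+)?)', qty_b[key])
            if ma and mb:
                try:
                    if float(ma.group(1)) != float(mb.group(1)):
                        issues.append({
                            "seq_a": seq_a, "seq_b": seq_b,
                            "item": key, "value_a": qty_a[key], "value_b": qty_b[key]
                        })
                except ValueError:
                    continue
    return issues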
# ============ 执行摘要 =============
def _build_summary(chapters_data, max_chars: int = 800) -> List[str]:
lines, total = [], 0
for _, _, _, content, _ in chapters_data:
para_lines = []
for line in content.split('\n'):
line = line.strip()
if not line or line.startswith('# ') or line.startswith('## ') or line.startswith('### '):
continue
para_lines.append(line)
if len(para_lines) >= 3:
break
if not para_lines:
continue
para_text = ''.join(para_lines[:2])
if total + len(para_text) > max_chars:
remaining = max_chars - total
if remaining > 50:
lines.append(para_text[:remaining] + '…')
break
lines.append(para_text)
total += len(para_text)
return lines or ['本报告对项目建设进行了全面可行性分析。']
# ============ 最终文档生成 =============
def generate_final_doc(chapters_data, page_estimates, output_path: str = None, incremental: bool = True):
    if output_path is None:
        output_path = config._p('FINAL_DOC')
    plan = config.load_plan()
    if incremental:
        hashes = load_hashes()
        changed_chapters = get_changed_chapters(chapters_data, hashes)
        unchanged_count = len(chapters_data) - len(changed_chapters)
        if chapters_data and not changed_chapters:
            print(f"[INCREMENTAL] All {len(chapters_data)} chapters are unchanged; skipping rewrite")
            return None
        if unchanged_count:
            print(f"[INCREMENTAL] {unchanged_count} chapters unchanged, {len(changed_chapters)} changed")
    # The docx is rebuilt from scratch, so every chapter is written even when only
    # some changed; emitting only the changed ones would drop the rest from the report.
    doc = Document()
    s = doc.sections[0]
    s.page_height = Inches(11.69); s.page_width = Inches(8.27)
    s.top_margin = Inches(1.0); s.bottom_margin = Inches(1.0)
    s.left_margin = Inches(1.18); s.right_margin = Inches(1.18)
    # Cover
    for _ in range(6):
        doc.add_paragraph()
    for txt, size, bold, font in [
        (plan.get('org_name', '编制单位'), Pt(26), True, '黑体'),
        (plan.get('project_name', '项目名称'), Pt(32), True, '黑体'),
    ]:
        p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
        r = p.add_run(txt); r.font.size = size; r.font.bold = bold; cjk(r, font)
    for _ in range(3):
        doc.add_paragraph()
    p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    r = p.add_run(plan.get('doc_type', '可行性研究报告'))
    r.font.size = Pt(22); cjk(r, '楷体')
    for _ in range(8):
        doc.add_paragraph()
    p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    unit = plan.get('编制单位', '编制单位')
    build_time = plan.get('编制时间', datetime.now().strftime('%Y年%m月'))
    r = p.add_run(f'编制单位:{unit}\n编制时间:{build_time}')
    r.font.size = Pt(14); cjk(r, '宋体')
    doc.add_page_break()
    # Executive summary
    p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    p.paragraph_format.space_before = Pt(12); p.paragraph_format.space_after = Pt(10)
    r = p.add_run('执行摘要'); r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体')
    for pt in _build_summary(chapters_data):
        p2 = doc.add_paragraph()
        p2.paragraph_format.first_line_indent = Cm(0.74)
        p2.paragraph_format.line_spacing = Pt(22)
        p2.paragraph_format.space_after = Pt(6)
        r2 = p2.add_run(pt); r2.font.size = Pt(12); cjk(r2, '宋体')
    doc.add_page_break()
    # Table of contents
    p = doc.add_paragraph(); p.alignment = WD_ALIGN_PARAGRAPH.CENTER
    p.paragraph_format.space_before = Pt(12); p.paragraph_format.space_after = Pt(10)
    r = p.add_run('目 录'); r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体')
    add_toc_entry(doc, '一', '执行摘要', 1, toc_type='summary')
    seen = set()
    for seq, _, title, content, _ in chapters_data:
        if seq in seen or not seq.isdigit():
            continue
        seen.add(seq)
        start = page_estimates.get(seq, (1, 0, 1))[0]
        add_toc_entry(doc, f'第{int(seq)}章', title, start, toc_type='chapter')
    doc.add_page_break()
    # Chapters
    for seq, _, _, content, _ in chapters_data:
        md_to_paragraphs(doc, content, add_page_break=True)
    # Update the stored hashes
    if incremental:
        new_hashes = {item[0]: compute_content_hash(item[3]) for item in chapters_data}
        old_hashes = load_hashes()
        old_hashes.update(new_hashes)
        save_hashes(old_hashes)
    doc.save(output_path)
    print(f"[DONE] Integrated report saved: {output_path}")
    return output_path
# ============ 整合报告主流程 =============
def generate_with_accurate_toc(txt_dir: str = None, final_doc: str = None):
if txt_dir is None:
txt_dir = config._p('CHAPTERS_DIR')
if final_doc is None:
final_doc = config._p('FINAL_DOC')
txt_files = sorted(glob.glob(os.path.join(txt_dir, '*.txt')))
if not txt_files:
print(f"[ERROR] 未找到章节文件: {txt_dir}/*.txt"); return None
chapters_data = parse_chapters(txt_files)
if not chapters_data:
print("[ERROR] 所有章节解析均失败"); return None
print(f"[PARSE] 解析 {len(chapters_data)} 个章节")
ref_text = config.load_reference()
config.generate_glossary(txt_files, ref_text=ref_text)
issues = check_cross_chapter_consistency(chapters_data)
if issues:
print(f"[CONSISTENCY] 发现 {len(issues)} 个潜在不一致:")
for iss in issues:
print(f" - {iss}")
else:
print("[CONSISTENCY] 跨章一致性检查通过 (OK)")
pe = {}
cur = 7
for seq, _, _, content, _ in chapters_data:
cc = count_chars(content)
ep = max(1, (cc + config.CHARS_PER_PAGE - 1) // config.CHARS_PER_PAGE)
pe[seq] = (cur, cc, ep); cur += ep
print("[BUILD] 生成整合报告...")
generate_final_doc(chapters_data, pe, output_path=final_doc)
md_path = final_doc.replace('.docx', '-纯文本.md')
with open(md_path, 'w', encoding='utf-8') as f:
f.write('\n\n---\n\n'.join(c for _, _, _, c, _ in chapters_data))
print(f"[MD] 纯文本版已保存: {md_path}")
return final_doc
# ============ 单章 docx 转换 =============
def convert_single_chapter_inline(txt_path: str, docx_path: str):
doc = Document()
s = doc.sections[0]
s.page_height = Inches(11.69); s.page_width = Inches(8.27)
s.top_margin = Inches(1.0); s.bottom_margin = Inches(1.0)
s.left_margin = Inches(1.18); s.right_margin = Inches(1.18)
lines = open(txt_path, 'r', encoding='utf-8').readlines()
pending_table: List[str] = []
for line in lines:
line = line.rstrip()
if not line.strip():
_flush_table(doc, pending_table); continue
if line.startswith('# '):
_flush_table(doc, pending_table)
p = doc.add_paragraph(); p.alignment = 1
p.paragraph_format.space_before = Pt(12); p.paragraph_format.space_after = Pt(10)
r = p.add_run(_clean_inline(line[2:])); r.font.size = Pt(18); r.font.bold = True; cjk(r, '黑体'); continue
if line.startswith('## '):
_flush_table(doc, pending_table)
p = doc.add_paragraph(); p.paragraph_format.space_before = Pt(10); p.paragraph_format.space_after = Pt(6)
r = p.add_run(_clean_inline(line[3:])); r.font.size = Pt(14); r.font.bold = True; cjk(r, '楷体'); continue
if line.startswith('### '):
_flush_table(doc, pending_table)
p = doc.add_paragraph(); p.paragraph_format.space_before = Pt(8); p.paragraph_format.space_after = Pt(4)
r = p.add_run(_clean_inline(line[4:])); r.font.size = Pt(12); r.font.bold = True; cjk(r, '仿宋'); continue
if line.startswith('#### '):
_flush_table(doc, pending_table)
p = doc.add_paragraph(); p.paragraph_format.space_before = Pt(6); p.paragraph_format.space_after = Pt(3)
r = p.add_run(_clean_inline(line[5:])); r.font.size = Pt(11); r.font.bold = True; cjk(r, '仿宋'); continue
if _is_table_line(line) and not _is_separator_line(line):
pending_table.append(line); continue
_flush_table(doc, pending_table)
p = doc.add_paragraph()
p.paragraph_format.first_line_indent = Cm(0.74)
p.paragraph_format.line_spacing = Pt(22)
p.paragraph_format.space_after = Pt(6)
r = p.add_run(_clean_inline(line)); r.font.size = Pt(12); cjk(r, '宋体')
_flush_table(doc, pending_table)
doc.save(docx_path)
return docx_path
def _convert_worker(args) -> Tuple[str, bool, str]:
txt_path, docx_path = args
try:
convert_single_chapter_inline(txt_path, docx_path)
return (docx_path, True, '')
except Exception as e:
return (txt_path, False, str(e))
# ============ 批量并行转换 =============
def batch_convert_txt_to_docx(txt_dir: str = None, max_concurrent: int = 8, incremental: bool = True):
if txt_dir is None:
txt_dir = config._p('CHAPTERS_DIR')
txt_files = sorted(glob.glob(os.path.join(txt_dir, '*.txt')))
if not txt_files:
print("[ERROR] 未找到 .txt 文件"); return []
hashes = load_hashes() if incremental else {}
jobs = []
for tf in txt_files:
docx_path = tf.replace('.txt', '.docx')
content_hash = compute_content_hash(open(tf, 'r', encoding='utf-8').read())
if incremental and os.path.exists(docx_path) and hashes.get(os.path.basename(tf)) == content_hash:
print(f" [SKIP] {os.path.basename(tf)} 内容未变化,跳过")
continue
jobs.append((tf, docx_path))
if not jobs:
print("[INFO] 所有章节已是最新(无变化),跳过转换"); return []
print(f"[BATCH] 待转换 {len(jobs)} 个章节,并发上限 {max_concurrent}")
completed, failed = [], []
with ProcessPoolExecutor(max_workers=max_concurrent) as executor:
futures = {executor.submit(_convert_worker, job): job for job in jobs}
for future in as_completed(futures):
docx_path, ok, err = future.result()
if ok:
txt_path = docx_path.replace('.docx', '.txt')
if os.path.exists(txt_path):
hashes[os.path.basename(txt_path)] = compute_content_hash(
open(txt_path, 'r', encoding='utf-8').read())
completed.append(docx_path); print(f" [OK] {os.path.basename(docx_path)}")
else:
failed.append((docx_path, err)); print(f" [FAIL] {os.path.basename(docx_path)}: {err}")
if incremental and completed:
save_hashes(hashes)
print(f"\n[BATCH] {len(completed)}/{len(jobs)} 成功,{len(failed)} 失败")
return completed
FILE:src/__init__.py
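"""
lobsterai-report-agent - Multi-agent collaborative writing system for ultra-long feasibility study reports
==========================================================================================================
"""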
from .config import (
    get_chapters_dir,
    get_output_dir,
    get_mermaid_cli,
    load_config,
    save_config,
    load_plan,
    save_plan,
    make_default_plan,
    load_glossary,
    generate_glossary,
    glossary_to_prompt_text,
    load_reference,
    save_reference,
    load_progress,
    save_outline_snapshot,
    save_batch_snapshot,
    CHARS_PER_PAGE,
)
from .engine import (
    compute_content_hash,
    load_hashes,
    save_hashes,
    get_changed_chapters,
    process_mermaid_blocks,
    add_toc_entry,
    md_to_paragraphs,
    safe_parse_chapter,
    parse_chapters,
    count_chars,
    check_cross_chapter_consistency,
    generate_final_doc,
    generate_with_accurate_toc,
    convert_single_chapter_inline,
    batch_convert_txt_to_docx,
)
from .cli import main as cli_main

__all__ = [
    # config
    'get_chapters_dir', 'get_output_dir', 'get_mermaid_cli',
    'load_config', 'save_config', 'load_plan', 'save_plan', 'make_default_plan',
    'load_glossary', 'generate_glossary', 'glossary_to_prompt_text',
    'load_reference', 'save_reference', 'load_progress',
    'save_outline_snapshot', 'save_batch_snapshot', 'CHARS_PER_PAGE',
    # engine
    'compute_content_hash', 'load_hashes', 'save_hashes', 'get_changed_chapters',
    'process_mermaid_blocks', 'add_toc_entry', 'md_to_paragraphs',
    'safe_parse_chapter', 'parse_chapters', 'count_chars',
    'check_cross_chapter_consistency',
    'generate_final_doc', 'generate_with_accurate_toc',
    'convert_single_chapter_inline', 'batch_convert_txt_to_docx',
    # cli
    'cli_main',
]
FILE:references/bug_fix_guide.md
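# Bug Troubleshooting and Forced Rebuild
## Fixed: garbled table rendering
### Symptoms
Tables in the generated docx have only one column, with the content split character by character; or they have far more columns than expected (for example, a 4-column table becomes 60+ columns).
### Root cause
The `_flush_table` function in `integrate_report.py` had a bug:
```python
# ❌ Buggy code (early v3 versions)
def _flush_table(doc, pending_table):
    if pending_table:
        _add_table_to_doc(doc, pending_table)  # ← passes the raw list of strings!
        pending_table.clear()
```
- `pending_table` holds `['| Col1 | Col2 | Col3 |', ...]` (a list of strings)
- `_add_table_to_doc` computes the column count with `max(len(r) for r in rows)`
- `len()` of a string returns its character count (22 here), not its cell count (3)
- Result: a 4-column table → 63 columns → one character per cell → completely garbled
### Fix
```python
# ✅ Correct code (current version)
def _flush_table(doc, pending_table):
    if pending_table:
        parsed_rows = _parse_md_table(pending_table)  # ← new: parse into a 2-D list first
        _add_table_to_doc(doc, parsed_rows)  # ← pass the parsed rows
        pending_table.clear()
```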
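The body of `_parse_md_table` is not shown in this guide. As a minimal sketch of what such a parser might look like (the name `parse_md_table` and the separator regex below are assumptions for illustration, not the project's actual implementation), the following turns raw table lines into rows of cells and shows why the column count differs:

```python
import re
from typing import List

# Assumed pattern for a separator row such as |---|:---:|---| (only -, :, | and spaces).
_SEPARATOR_RE = re.compile(r'^[\s:\-|]+$')


def parse_md_table(lines: List[str]) -> List[List[str]]:
    """Turn raw Markdown table lines into a list of rows of cell strings."""
    rows = []
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue
        # Skip the header/body separator row.
        if _SEPARATOR_RE.match(stripped) and '-' in stripped:
            continue
        # Remove exactly one leading and one trailing pipe, then split on inner pipes.
        if stripped.startswith('|'):
            stripped = stripped[1:]
        if stripped.endswith('|'):
            stripped = stripped[:-1]
        rows.append([cell.strip() for cell in stripped.split('|')])
    return rows


raw = ['| Col1 | Col2 | Col3 |', '|---|---|---|', '| a | b | c |']
rows = parse_md_table(raw)
print(max(len(r) for r in raw))   # character count of the longest raw line
print(max(len(r) for r in rows))  # real column count after parsing
```

Passing `rows` instead of `raw` to the column-count expression is exactly the difference between the buggy and the fixed `_flush_table`.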
### Verification
```python
from docx import Document

doc = Document('F:/agent/整合报告.docx')
for t in doc.tables:
    print(f'{len(t.rows)} rows x {len(t.columns)} cols')
# Normal column count: 2 to 8
# If you see 15+, 30+, or 60+ columns → the bug is still present
```
---
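## Fixed: type mismatch in the cover style comparison
### Symptoms
`cover_style` in `plan.json` is an integer (e.g. `4`), but the code compared it against a string, so the cover always fell through to the generic branch and the styled cover was never applied.
### Root cause
```python
# ❌ Buggy code
cover_style = plan.get('cover_style', '4')
if cover_style == '4':  # integer 4 != string '4', so this is always False
```
### Fix
```python
# ✅ Correct code
cover_style = str(plan.get('cover_style', '4'))
if cover_style == '4':  # string comparison, works as intended
```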
---
## Fixed: wrong RGB color assignment
### Symptoms
Colors were assigned with `eval(f'0x{hex_color}')`, so `run.font.color.rgb` received an integer instead of an `RGBColor` object and the run crashed with an error.
### Root cause
```python
# ❌ Buggy code
run.font.color.rgb = eval(f'0x{H1_TEXT}')  # eval('0xFFFFFF') → 16777215 (int)
# ValueError: rgb color value must be RGBColor object, got <class 'int'>
```
### Fix
```python
# ✅ Correct code
from docx.shared import RGBColor  # ← this import is required
run.font.color.rgb = RGBColor.from_string(H1_TEXT)
```
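`RGBColor.from_string` expects a six-digit hex string, so a color constant written as `#ffffff` or with stray whitespace would fail in a different way. As a hedged sketch (the helpers `normalize_hex_color` and `hex_to_rgb_tuple` are hypothetical and not part of `integrate_report.py`), constants could be validated with the standard library before they reach python-docx:

```python
import re

_HEX6_RE = re.compile(r'^[0-9A-Fa-f]{6}$')


def normalize_hex_color(value: str) -> str:
    """Return a 6-digit uppercase hex string, or raise ValueError."""
    cleaned = value.strip().lstrip('#')
    if not _HEX6_RE.match(cleaned):
        raise ValueError(f'expected a 6-digit hex color, got {value!r}')
    return cleaned.upper()


def hex_to_rgb_tuple(value: str) -> tuple:
    """Split a hex color into (red, green, blue) integers without using eval."""
    h = normalize_hex_color(value)
    return tuple(int(h[i:i + 2], 16) for i in (0, 2, 4))


print(normalize_hex_color(' #ffffff '))
print(hex_to_rgb_tuple('1F3864'))
```

The normalized string can then be passed on as `RGBColor.from_string(normalize_hex_color(H1_TEXT))`.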
---
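## Fixed: incremental cache prevents new code from taking effect
### Symptoms
After changing the core logic in `integrate_report.py` and regenerating, incremental mode skips every chapter.
### Root cause
Python caches compiled `.pyc` files, and a stale cache can cause old code to be imported after a `.py` file changes. (Python normally recompiles when the source file's modification time or size changes, so clearing the cache is a precaution rather than the usual cause.) More importantly, `content_hashes.json` records each chapter's content hash, so chapters whose text is unchanged are skipped and never re-rendered with the new code.
### Fix
After every code change, do all of the following (Windows `cmd` commands):
```bash
# 1. Delete the .pyc cache
del "C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\lobsterai-skill-zip-long-doc-agent\__pycache__\integrate_report.cpython-311.pyc"
# 2. Delete the incremental hashes
del F:\agent\chapters\content_hashes.json
# 3. Regenerate
python integrate_report.py
```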
---
## Forced rebuild
### Steps
```bash
# 1. Delete the incremental cache and the old report
del F:\agent\chapters\content_hashes.json
del F:\agent\整合报告.docx
# 2. Delete the .pyc cache (required after code changes)
del "C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\lobsterai-skill-zip-long-doc-agent\__pycache__\integrate_report.cpython-311.pyc"
# 3. Regenerate
cd "C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\lobsterai-skill-zip-long-doc-agent"
python integrate_report.py
```
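The same cleanup can be scripted so that it does not depend on `del` or on hard-coded paths. This is a minimal sketch under stated assumptions: the function name `force_rebuild_cleanup` is hypothetical, and the caller supplies the chapters directory, the skill directory, and optionally the old report path.

```python
import shutil
from pathlib import Path


def force_rebuild_cleanup(chapters_dir, skill_dir, report_path=None):
    """Remove incremental state so the next run regenerates everything.

    Returns the list of paths that were actually removed.
    """
    removed = []
    targets = [Path(chapters_dir) / 'content_hashes.json']
    if report_path is not None:
        targets.append(Path(report_path))
    for target in targets:
        if target.is_file():
            target.unlink()
            removed.append(target)
    # Dropping the whole __pycache__ folder covers every cached module at once.
    pycache = Path(skill_dir) / '__pycache__'
    if pycache.is_dir():
        shutil.rmtree(pycache)
        removed.append(pycache)
    return removed
```

Chapter `.txt` files are left untouched; only the hash file, the optional old report, and the bytecode cache are deleted, after which `python integrate_report.py` can be run as usual.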
---
## New: accessing RGBColor components (python-docx 1.2.0)
### Symptoms
Running `make_docx.py` fails with `AttributeError: 'RGBColor' object has no attribute 'red'`.
### Root cause
In python-docx 1.2.0, `RGBColor` objects do not expose `.red` / `.green` / `.blue` attributes; the components must be read by index.
### Fix
```python
# ❌ Wrong
'{:02X}{:02X}{:02X}'.format(rgb.red, rgb.green, rgb.blue)
# ✅ Correct
'{:02X}{:02X}{:02X}'.format(rgb[0], rgb[1], rgb[2])
```
---
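## New: the cover function must not change global page margins
### Symptoms
Code in `add_cover()` such as `section.left_margin=0` carries over to every page after the cover, so body text fills the whole page with no margins.
### Root cause
Margins in Word are a section-level property and apply to every page in that section. If the cover shares its section with the body, margins set for the cover affect the whole document.
### Fix
Implement the cover background with a full-page table and do **not** modify any margin property on the section. Set the body margins once, in `main()`.
```python
# ❌ Wrong
def add_cover(doc):
    sec = doc.sections[0]
    sec.left_margin = Inches(0)  # ← affects every page!
    ...

# ✅ Correct: only add the table, leave the section alone
def add_cover(doc):
    tbl_ = doc.add_table(rows=1, cols=1)
    cell = tbl_.rows[0].cells[0]
    # Let the table fill the page; do not touch the section margins
```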
---
## New: save under a new name when the file is in use
### Symptoms
If the generated docx file is already open in WPS/Word, saving it again raises `PermissionError` (Permission denied).
### Fix
Append a `_v2` suffix (auto-incrementing) to the file name so an open file is never overwritten. In code:
```python
out_name = '医院人员定位管理系统_方案.docx'
out_path = os.path.join(out_dir, out_name)
if os.path.exists(out_path):
    # The file exists: append _v2/_v3 ... to avoid a conflict
    base, ext = os.path.splitext(out_name)
    counter = 2
    while os.path.exists(os.path.join(out_dir, f'{base}_v{counter}{ext}')):
        counter += 1
    out_name = f'{base}_v{counter}{ext}'
    out_path = os.path.join(out_dir, out_name)
```
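The snippet above switches names whenever the file exists, even if nothing has it open. A variant, sketched here under assumptions (the helper `save_with_fallback`, its `save` callback, and the `max_tries` limit are all hypothetical), only moves to the next versioned name when the save actually raises `PermissionError`:

```python
import os


def save_with_fallback(save, out_dir, out_name, max_tries=20):
    """Call save(path); on PermissionError retry under the next versioned name.

    `save` is any callable taking a path, for example `doc.save`.
    Returns the path that was written successfully.
    """
    base, ext = os.path.splitext(out_name)
    names = [out_name] + [f'{base}_v{n}{ext}' for n in range(2, max_tries + 1)]
    for name in names:
        path = os.path.join(out_dir, name)
        try:
            save(path)
            return path
        except PermissionError:
            # Most likely the file is open in WPS/Word; try the next name.
            continue
    raise PermissionError(f'could not save {out_name} under any of {max_tries} names')
```

With python-docx this would be called as `save_with_fallback(doc.save, out_dir, out_name)`. The trade-off is that an unlocked existing file is overwritten rather than kept as an older version.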
---
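## New: the write tool has a size limit of about 50KB; write large scripts in chunks
### Symptoms
When the `write` tool is used for a Python script larger than about 50KB or about 2,000 lines, the content is truncated (only part of the code is written).
### Root cause
The write tool limits the size of a single file write.
### Fix
Write large scripts in two steps:
```python
# Step 1: write the main file (without the trailing main() call)
# main_content is a placeholder for the string holding the script body
with open('make_docx.py', 'w', encoding='utf-8') as f:
    f.write(main_content)  # main body
# Step 2: append the closing part
closing = """
def main():
    ...  # closing content
if __name__ == '__main__':
    main()
"""
with open('make_docx.py', 'a', encoding='utf-8') as f:
    f.write(closing)
```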
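The two-step approach generalizes to any number of pieces. The following is only a sketch: the helper names and the 40,000-byte threshold are assumptions chosen to stay under the approximate limit described above, and it presumes the limit applies per write rather than per file.

```python
def split_into_chunks(text: str, max_bytes: int = 40_000):
    """Split text at line boundaries so each chunk stays within max_bytes (UTF-8).

    A single line longer than max_bytes is kept whole in its own chunk.
    """
    chunks, current, size = [], [], 0
    for line in text.splitlines(keepends=True):
        line_size = len(line.encode('utf-8'))
        if current and size + line_size > max_bytes:
            chunks.append(''.join(current))
            current, size = [], 0
        current.append(line)
        size += line_size
    if current:
        chunks.append(''.join(current))
    return chunks


def write_in_chunks(path: str, text: str, max_bytes: int = 40_000) -> int:
    """Write the first chunk with mode 'w' and append the rest; return the chunk count."""
    chunks = split_into_chunks(text, max_bytes) or ['']
    for i, chunk in enumerate(chunks):
        mode = 'w' if i == 0 else 'a'
        # newline='' keeps line endings exactly as they appear in the text.
        with open(path, mode, encoding='utf-8', newline='') as f:
            f.write(chunk)
    return len(chunks)
```

Splitting on line boundaries keeps every chunk syntactically readable on its own, which makes a truncated write easier to spot.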
---
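## Other common issues
### Symptom: table content is all `|`
`_parse_md_table` is not being called. Check that `_flush_table` contains `parsed_rows = _parse_md_table(pending_table)`.
### Symptom: incremental mode skips modified chapters
Delete `content_hashes.json` to force a full rebuild.
### Symptom: a txt file written by a sub-agent contains garbled characters
The sub-agent used the wrong encoding when writing. Make sure sub-agents save files with `encoding='utf-8'`.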
FILE:references/phase0_guide.md
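# Phase 0: Full Requirements Confirmation Flow
## Trigger
The user first states a writing request ("I want to write xxx" / "Help me write a feasibility study report" / "Write a report on xxx").
## Four-step confirmation flow
### Step 1: Confirm the writing topic
```
Please tell me the core information about this document:
1. What is the document's topic?
(e.g. XX City People's Hospital fine-grained medical asset management solution)
2. What type of document is it?
(e.g. feasibility study report / technical proposal / business plan)
3. Who are the main readers?
(e.g. hospital management / supervising authority / investors)
4. What overall style?
(e.g. professional and rigorous / concise and clear)
5. Are there any special requirements or constraints?
(e.g. must include a budget chapter / no more than 10 chapters)
```
### Step 2: Confirm the writing background
```
Please provide or describe the background of this project/topic:
1. What is the project background?
(e.g. current state of hospital asset management and the problems it faces)
2. What are the construction goals?
(e.g. improve asset utilization, control costs)
3. Is there a specific industry context?
(e.g. national policy, industry trends)
```
### Step 3: Provide reference material (most important)
```
Please provide reference material related to this writing task (at least one option):
A. Upload files: send a local file path or paste the content directly
B. Feishu documents: give the document name or link (I will retrieve it via RAG)
C. Paste directly: send the reference text straight to this assistant
D. Provide nothing for now (skip, and write from general background knowledge)
⚠️ Providing reference material is strongly recommended!
The more complete the reference material, the better the content fits the business and the higher the output quality.
The reference material is injected into each chapter's writing context as the preferred RAG knowledge source.
```
### Step 4: Confirm the outline
After the planner produces the outline, show it to the user for confirmation:
```
📋 The planned outline has been generated. Please confirm the following chapter structure:
Project: XX City People's Hospital full-lifecycle fine-grained medical asset management solution
Type: feasibility study report | Readers: hospital management
Chapter outline:
1. Chapter 01 Project Overview (Batch A, about 2,500 characters)
2. Chapter 02 Background and Necessity (Batch A, about 3,000 characters)
...
Please confirm:
A. The outline is OK, start writing
B. The outline needs changes (say which chapters to modify, add, or remove)
C. Cancel this writing task
```
## Saving the reference material
After the user confirms, save the reference material to `F:/agent/chapters/reference_material.txt`:
```python
# reference_text holds the material collected in Step 3
with open('F:/agent/chapters/reference_material.txt', 'w', encoding='utf-8') as f:
    f.write(reference_text)
```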
FILE:references/phase1_guide.md
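# Phase 1: Full Planner Prompt Template
## Steps
1. Load a summary of `F:/agent/chapters/reference_material.txt` (the first 3,000 characters) as `reference_summary`
2. Replace every `{xxx}` in the template below with the actual value
3. Write the output to `F:/agent/chapters/plan.json`
## Prompt template
```
You are a professional project planner. The user needs to write a [{doc_type}] on the topic "{topic}".
## Background information provided by the user
{background}
## Reference material summary (use with priority)
{reference_summary}
Please complete the following tasks:
1. Draw up a detailed document outline (down to third-level headings)
2. Annotate each chapter with its core writing points
3. Identify RAG retrieval keywords for each chapter (≤3 per chapter)
4. Assess the expected complexity of each chapter and mark the key chapters
5. Identify chapter dependencies (which chapters can only be written after earlier ones are complete)
**Chapter dependency rules**:
- Type 1 (no dependencies, can be written first): overview, background, current-state analysis, technology selection
- Type 2 (depends on Type 1): overall design, detailed functional design
- Type 3 (depends on several earlier chapters): implementation plan, test plan, deployment plan
- Type 4 (can be written independently, or last): training plan, acceptance plan, appendices, conclusion
## Content that must not be mixed in from the reference material
When planning, actively exclude content unrelated to the topic (e.g. infusion monitoring systems).
Write the following structured information to F:/agent/chapters/plan.json:
{
  "project_name": "project name",
  "doc_type": "document type",
  "chapters": [
    {
      "seq": "01",
      "title": "chapter title",
      "brief": "writing points",
      "feishu_keywords": ["k1", "k2"],
      "web_keywords": ["k1", "k2"],
      "word_count": 3000,
      "batch": "A",
      "dependencies": [],
      "status": "pending"
    }
  ]
}
```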
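## plan.json field reference
| Field | Description |
|------|------|
| `seq` | Chapter sequence number, a 2-digit string ("01", "02") |
| `title` | Chapter title |
| `brief` | Core writing points (50-100 characters) |
| `feishu_keywords` | Feishu knowledge base search keywords, at most 3 |
| `web_keywords` | Web search keywords, at most 3 |
| `word_count` | Target length (body text only, excluding headings) |
| `batch` | Batch label ("A"/"B"/"C"; chapters in the same batch can be written in parallel) |
| `dependencies` | List of `seq` values this chapter depends on, e.g. `["01", "02"]` |
| `status` | Status: `pending`/`writing`/`txt_done`/`confirmed` |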
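
Because the planner's output is produced by a model, it can help to check it against the field rules above before any sub-agent starts. The following is a minimal sketch, not part of `integrate_report.py`; the function `validate_plan` and its messages are hypothetical, and it checks only the constraints stated in the table.

```python
import re

REQUIRED_FIELDS = ('seq', 'title', 'brief', 'feishu_keywords', 'web_keywords',
                   'word_count', 'batch', 'dependencies', 'status')
VALID_STATUS = {'pending', 'writing', 'txt_done', 'confirmed'}


def validate_plan(plan: dict) -> list:
    """Return a list of human-readable problems; an empty list means no issues found."""
    problems = []
    chapters = plan.get('chapters', [])
    known_seqs = {ch.get('seq') for ch in chapters}
    for ch in chapters:
        label = f"chapter {ch.get('seq', '?')}"
        missing = [name for name in REQUIRED_FIELDS if name not in ch]
        if missing:
            problems.append(f"{label}: missing fields {missing}")
            continue
        if not (isinstance(ch['seq'], str) and re.fullmatch(r'\d{2}', ch['seq'])):
            problems.append(f"{label}: seq must be a 2-digit string")
        for key in ('feishu_keywords', 'web_keywords'):
            if len(ch[key]) > 3:
                problems.append(f"{label}: {key} has more than 3 entries")
        if ch['status'] not in VALID_STATUS:
            problems.append(f"{label}: unknown status {ch['status']!r}")
        for dep in ch['dependencies']:
            # A dependency must name another chapter that exists in the plan.
            if dep == ch['seq'] or dep not in known_seqs:
                problems.append(f"{label}: invalid dependency {dep!r}")
    return problems
```

A typical use would be to load `plan.json` with `json.load`, call `validate_plan`, and ask the planner to regenerate if the returned list is not empty.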
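## After running the planner
```bash
# 1. Generate the initial glossary (extracted from the reference material)
python integrate_report.py glossary
# 2. Save an outline snapshot
python integrate_report.py save-outline
# 3. Show the outline to the user for confirmation
```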
FILE:references/phase2_guide.md
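# Phase 2: Full Sub-Agent Prompt Template
## Template variables
| Variable | Source |
|------|------|
| `{seq}` | The chapter's `seq` field in plan.json |
| `{title}` | The chapter's `title` field in plan.json |
| `{batch}` | The chapter's `batch` field in plan.json |
| `{topic}` | Document topic |
| `{audience}` | Target readers |
| `{style}` | Overall style |
| `{word_count}` | The chapter's `word_count` in plan.json |
| `{reference_summary}` | Reference material summary (first 3,000 characters) |
| `{glossary_summary}` | Glossary summary from glossary.json (first 30 entries) |
| `{dependency_chapters}` | List of titles of the chapters this one depends on |
| `{chapter_brief}` | The chapter's `brief` field in plan.json |
| `{feishu_keywords}` | The chapter's `feishu_keywords` in plan.json |
| `{web_keywords}` | The chapter's `web_keywords` in plan.json |
| `{index}` | 2-digit sequence number (01, 02...) |
| `{short_name}` | Short chapter name (used in the file name) |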
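## Sub-agent prompt (full version)
```python
import sys
sys.path.insert(0, r'C:\Users\Administrator\AppData\Roaming\LobsterAI\SKILLs\lobsterai-skill-zip-long-doc-agent')
from parallel_tracker import chapter_register, chapter_update, chapter_done
chapter_register(seq='{seq}', title='{title}', batch='{batch}')
You are a professional document writer responsible for writing the [{title}] chapter of a feasibility study report.
## Basic information
- Document topic: {topic}
- Target readers: {audience}
- Overall style: {style}
- Target length for this chapter: {word_count} characters
## Reference material (use with priority)
{reference_summary}
## Glossary (the unified terms must be used)
{glossary_summary}
## Prerequisites
This chapter depends on the content of the following completed chapters:
{dependency_chapters}
## Writing points for this chapter
{chapter_brief}
## RAG retrieval (supplementary reference)
- Feishu knowledge base: keywords {feishu_keywords}
- Web search (fallback): keywords {web_keywords}
## Writing requirements
1. Content must be professional and rigorous and follow feasibility study report conventions
2. Prefer facts and data quoted from the reference material
3. Terminology must match the glossary
4. Length: about {word_count} characters
5. Output format: Markdown
## ⚠️ Markdown table format (mandatory)
If you insert a table, follow this format strictly; otherwise the table will be garbled after docx conversion:
Correct format:
| Col1 | Col2 | Col3 |
|---|---|---|
| Content1 | Content2 | Content3 |
Key points:
- The separator row must have the form `|---|---|---|` (leading and trailing `|` are required)
- Every row must have the same number of columns as the header; a mismatch causes misaligned columns
- Avoid `|` inside cell content (use `~` or `-` for ranges)
## Progress updates
After finishing each ## second-level section heading, call:
chapter_update(seq='{seq}', phase='writing', progress=30, note='writing...')
## Output: plain-text .txt only
After finishing the chapter:
1. Save it to F:/agent/chapters/{index:02d}-{short_name}.txt
2. Call chapter_done(seq='{seq}', note='done')
3. Update this chapter's status in plan.json to 'txt_done'
```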
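## Per-batch execution flow (main agent side, fully automatic)
```python
# 1. Show the outline / current state (display only, do not wait for user confirmation)
print(f"Current batch: Batch {label}")
print(f"Chapters to write: {chapters_list}")
print(f"Expected parallelism: {n} chapters")
# 2. Clear the tracking state from the previous batch
from parallel_tracker import Tracker
Tracker().clear()
# 3. Start sub-agents in parallel (≤5 concurrent per batch, all batches run automatically)
for subagent_task in batch_tasks:
    sessions_spawn(
        task=subagent_task,
        runtime="subagent",
        runTimeoutSeconds=300,
        mode="run"
    )
# 4. Monitor progress in the background (waits until the whole batch is done)
# python parallel_tracker.py wait
# 5. When the batch is complete, move on to the next one (no user confirmation needed)
# After the last batch, automatically run:
from integrate_report import batch_convert_txt_to_docx
batch_convert_txt_to_docx(txt_dir='F:/agent/chapters', max_concurrent=8)
```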
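The flow above takes `batch_tasks` as given. How a batch could be derived from the `dependencies` field, while respecting the limit of 5 concurrent sub-agents, can be sketched as follows. This is an illustration under assumptions: `plan_waves` is a hypothetical helper, and the real system relies on the `batch` labels the planner writes into plan.json.

```python
def plan_waves(chapters, max_parallel=5):
    """Group chapter seqs into ordered waves of at most max_parallel chapters.

    A chapter becomes ready once all of its dependencies are in earlier waves.
    """
    remaining = {ch['seq']: set(ch.get('dependencies', [])) for ch in chapters}
    done, waves = set(), []
    while remaining:
        ready = sorted(seq for seq, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(f'circular or unknown dependencies: {sorted(remaining)}')
        # Split the ready set so no wave exceeds the concurrency limit.
        for i in range(0, len(ready), max_parallel):
            waves.append(ready[i:i + max_parallel])
        done.update(ready)
        for seq in ready:
            del remaining[seq]
    return waves


chapters = [{'seq': f'{n:02d}', 'dependencies': []} for n in range(1, 7)]
chapters.append({'seq': '07', 'dependencies': ['01', '02']})
print(plan_waves(chapters))
```

Comparing these waves with the planner's `batch` labels is one way to catch a chapter scheduled before something it depends on.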
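## Batch completion notification
A WeChat notification is sent automatically after each batch completes; no manual action is needed. If a chapter needs changes, tell the main agent at any time (small changes can be made by editing the .txt directly; large changes by regenerating the whole chapter).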
FILE:references/table_format_guide.md
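# Markdown Table Format Specification
Sub-agents must follow this specification strictly when inserting tables into `.txt` files. Format errors garble the table during docx conversion.
## Correct format
```
| Col1 | Col2 | Col3 |
|---|---|---|
| Content1 | Content2 | Content3 |
| Content4 | Content5 | Content6 |
```
## Six key rules
1. **The separator row must include the leading and trailing `|`**
- ✅ Correct: `|---|---|---|`
- ❌ Wrong: `---|---|---` (leading and trailing `|` missing)
- ❌ Wrong: `|---|---|---` (trailing `|` missing)
2. **Every row (including data rows) must start and end with `|`**
- Row format: `| Cell1 | Cell2 | Cell3 |`
3. **Every row must have the same number of columns as the header**
- If the header has 4 columns, every data row must also have 4 columns
- A mismatch causes misaligned columns in the docx
4. **The separator row may contain only `-`, `:`, `|`, and spaces**
- ✅ `|---|`, `| :--- |` (alignment markers)
- ❌ `|===|` (`=` is not allowed)
- ❌ `--|--` (leading and trailing `|` omitted)
5. **Cell content must not contain line breaks**
- The content of a cell must fit on one line
6. **Avoid the `|` character inside cell content**
- Write ranges with `~` or `-`: `25~45` rather than `25|45`
- If `|` is unavoidable, it must be escaped (not recommended)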
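Most of these rules can be checked mechanically before conversion. The following is a minimal sketch, assuming the table has already been isolated as a list of lines; `check_md_table` is a hypothetical helper, not a function of the project. An unescaped `|` inside a cell shows up as a column-count mismatch.

```python
import re

_SEPARATOR_CHARS = re.compile(r'^[\s:\-|]+$')


def _count_cells(row: str) -> int:
    """Count cells after removing one leading and one trailing pipe."""
    inner = row.strip()
    if inner.startswith('|'):
        inner = inner[1:]
    if inner.endswith('|'):
        inner = inner[:-1]
    return len(inner.split('|'))


def check_md_table(lines):
    """Return a list of rule violations for one Markdown table (empty list = OK)."""
    rows = [ln.strip() for ln in lines if ln.strip()]
    if len(rows) < 2:
        return ['table needs a header row and a separator row']
    problems = []
    for n, row in enumerate(rows, start=1):
        if not (row.startswith('|') and row.endswith('|')):
            problems.append(f'line {n}: must start and end with "|"')
    if not _SEPARATOR_CHARS.match(rows[1]) or '-' not in rows[1]:
        problems.append('line 2: separator may contain only "-", ":", "|" and spaces')
    header_cols = _count_cells(rows[0])
    for n, row in enumerate(rows[1:], start=2):
        cols = _count_cells(row)
        if cols != header_cols:
            problems.append(f'line {n}: {cols} columns, header has {header_cols}')
    return problems
```

Running this on each table in a chapter `.txt` before `batch_convert_txt_to_docx` gives the sub-agent a chance to repair the table instead of producing a garbled docx.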
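## Common mistakes
| Mistake | Wrong | Correct |
|---------|---------|---------|
| Leading/trailing `\|` omitted | `\|---\|---\|---` | `\|---\|---\|---\|` |
| Inconsistent column count | `\|A\|B\|C\|` followed by `\|1\|2\|` (missing a column) | `\|A\|B\|C\|` followed by `\|1\|2\| \|` |
| `=` in the separator row | `\|====\|====\|` | `\|---\|---\|` |
| `\|` inside a cell | `\|25\|45\|` (a range) | `\|25~45\|` |
## Recommended symbols for cell content
| Purpose | Recommended symbol | Example |
|------|---------|------|
| Numeric range | `~` or `-` | `25~45`, `25-45` |
| Percentage | `%` | `30%` |
| Rating | `★` (avoid `\|`) | `★★★☆☆` |
| Notes/remarks | Write directly | `Includes equipment maintenance service` |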
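## Verification
After generating the docx, use the following script to check that the column counts are reasonable (normally 2 to 8 columns):
```python
from docx import Document

doc = Document('F:/agent/整合报告.docx')
for t in doc.tables:
    print(f'{len(t.rows)} rows x {len(t.columns)} cols')
# More than 15 columns usually means the table format is wrong
```
## Why format errors garble the table
`_add_table_to_doc` computes the column count internally with `max(len(r) for r in rows)`:
- After correct parsing: `rows[0] = ['Col1', 'Col2', 'Col3']`, `len = 3`
- With the raw string passed in: `rows[0] = '| Col1 | Col2 | Col3 |'`, `len = 22` (character count)
A 22-column table instead of a 3-column one → one character per cell → the table is completely garbled.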
Convert UI screenshots/images into fully functional HTML/CSS copies. This skill is used when a user provides images of a website, application interface, dash...
---
name: img2html
description: "Convert UI screenshots/images into fully functional HTML/CSS copies. This skill is used when a user provides images of a website, application interface, dashboard, or any UI design and wants to recreate it as working HTML code with accurate styles, layout, and visual details."
license: Complete terms in LICENSE.txt
---
This skill converts UI screenshots or design images into production-ready HTML/CSS code that accurately replicates the visual appearance and layout of the original.
The user provides an image (screenshot, mockup, or design reference) showing a UI interface they want replicated.
## Analysis Process
First, carefully analyze the image to understand:
1. **Layout Structure**
   - Identify sections, containers, and nesting hierarchy
   - Determine grid/flexbox patterns needed
   - Note spacing relationships and alignment
2. **Visual Elements**
   - Colors (hex/rgb values from the image)
   - Typography (font families, sizes, weights)
   - Icons and imagery
   - Borders, shadows, gradients, effects
3. **Component Types**
   - Headers, navigation, cards, buttons, inputs, tables, etc.
   - Interactive elements and their states
   - Data displays, labels, badges
4. **Responsive Behavior**
   - How the layout might adapt to different screen sizes
   - Mobile-first or desktop-first approach
## Implementation Guidelines
**HTML Structure:**
- Use semantic HTML5 elements (header, nav, main, section, article, footer)
- Create logical nesting that matches the visual hierarchy
- Use meaningful class names (BEM or similar convention)

**CSS Styling:**
- Replicate colors exactly using values from the image
- Match typography (font family, size, weight, line-height, letter-spacing)
- Recreate spacing (margins, padding, gaps) accurately
- Implement visual effects (box-shadow, border-radius, gradients, backdrop-filter)
- Use CSS variables for theming when appropriate

**Layout Techniques:**
- Flexbox for 1D layouts (rows or columns)
- CSS Grid for 2D layouts
- Position properties for overlays and absolute positioning
- Transform for rotations and scaling

**Interactive Elements:**
- Add :hover, :focus states for buttons and links
- Include transitions for smooth state changes
- Consider adding basic JavaScript if the UI implies interactivity
## Output Format
Provide a complete, self-contained HTML file with:
- Proper DOCTYPE and HTML structure
- Embedded CSS in `<style>` tags (or separate CSS file if complex)
- All necessary meta tags for viewport/responsiveness
- Comments explaining major sections if helpful

If the design is complex, you may split into multiple files (HTML, CSS, JS).
## Accuracy Priorities
1. **Visual Fidelity**: Match the original image as closely as possible
2. **Proportions**: Maintain correct size ratios between elements
3. **Colors**: Use exact or nearest-match color values
4. **Typography**: Match font styles accurately
5. **Spacing**: Replicate padding, margins, and gaps faithfully
## Limitations to Communicate
- Exact font matching may require web fonts (Google Fonts, etc.)
- Some effects may be approximated if the exact technique isn't visible
- Interactive behavior beyond hover states requires user specification
- Dynamic content should be represented with placeholder data
## Multi-Image Handling
When the user provides multiple images at once, process each image **individually and sequentially** to ensure maximum quality:

**Sequential Processing Workflow:**
1. Process ONE image at a time through the complete conversion pipeline
2. For each image:
   - Analyze its unique layout, colors, typography, and components
   - Create a dedicated HTML/CSS replica
   - Verify visual fidelity before moving to the next
3. Maintain consistent naming conventions across all generated files
4. After processing all images, provide a summary of all converted files

**Output Organization:**
- Name files descriptively based on each image's content (e.g., `login-page.html`, `dashboard.html`)
- If images represent different pages of the same site, maintain shared styles in a common CSS file when appropriate
- If images are unrelated, create self-contained HTML files for each

**Quality Assurance:**
- Do NOT batch-process images together - each deserves full analysis attention
- If images are related (e.g., different screens of the same app), note shared components and reuse CSS where sensible
- Communicate progress to the user as you work through each image
## Example Workflow
1. User provides UI screenshot(s)
2. For each image:
   - Analyze layout, colors, typography, components
   - Create HTML structure matching the hierarchy
   - Apply CSS to replicate visual appearance
   - Add polish (hover states, transitions, responsive considerations)
3. Deliver complete HTML file(s) - one per input image (or split logically for multi-page designs)

Remember: The goal is a pixel-perfect replica that functions as a real web page, not just a close approximation. Pay attention to details like shadows, borders, gradients, and spacing - these make the difference between "close" and "indistinguishable."
FILE:LICENSE.txt
MIT License

Copyright (c) 2026 imgtohtml skill

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.