@clawhub-crw0149-793560b53c
---
name: paper-analysis-evidence
description: Structured academic paper analysis from local paper files or paper URLs, adapted from a Dify Scheme A workflow. Use when the user asks to analyze PDF/DOCX/text/HTML academic papers, extract title/task/background/problem/method/datasets/baselines/metrics/results/ablations/limitations/contributions, cite evidence spans, verify consistency against the original paper, or export paper analysis reports. Supports Chinese or English outputs and saves downloaded inputs, intermediate files, generated JSON, Markdown, HTML, and DOCX reports under the Ubuntu desktop.
---
# Paper Analysis Evidence
## Purpose
Run the Scheme A evidence-enhanced paper analysis workflow: prepare paper inputs, split the paper into key sections, generate structured extraction JSON, verify the extraction against the original text, and render final reports.
This skill is based on the uploaded Dify workflow `论文分析系统_方案A_结构化证据增强版` (roughly "Paper Analysis System, Scheme A, Structured Evidence-Enhanced Edition").
## Runtime file policy
Always save runtime downloads and generated outputs under the Ubuntu desktop unless the user explicitly requests another location:
```bash
~/Desktop/paper_analysis_results/<YYYYMMDD_HHMMSS>/
```
Do not modify the original local paper file. Copy it into the work directory before extraction. Download URL inputs into the same batch work directory.
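As a rough illustration of this policy, the sketch below creates the timestamped batch directory and copies an input into it while leaving the original file in place. The helper names `make_batch_dir` and `copy_into_batch` are hypothetical and are not taken from the skill's scripts; only the directory pattern comes from the policy above.

```python
from __future__ import annotations

import shutil
import time
from pathlib import Path


def make_batch_dir(root: Path | None = None) -> Path:
    """Create <root>/<YYYYMMDD_HHMMSS>/ and return it (hypothetical helper)."""
    # The default root mirrors the policy above; pass another root on request.
    root = root or Path.home() / "Desktop" / "paper_analysis_results"
    batch_dir = root / time.strftime("%Y%m%d_%H%M%S")
    batch_dir.mkdir(parents=True, exist_ok=True)
    return batch_dir


def copy_into_batch(src: Path, batch_dir: Path) -> Path:
    """Copy a paper into the batch directory; the original is never modified."""
    dest = batch_dir / src.name
    shutil.copy2(src, dest)
    return dest
```

Passing an explicit `root` covers the case where the user asks for a different output location.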
## Inputs
Accept:
- `language`: `中文` (Chinese) or `英文` (English); defaults to `中文` when unspecified.
- `paper_files`: one or more local paper files, preferably PDF, DOCX, TXT, MD, or HTML.
- `paper_urls`: one or more PDF/direct paper URLs, comma-separated or repeated.
If both local files and URLs are empty, stop with this message (in English: "Uploaded files and paper URLs cannot both be empty."):
```text
上传的文件和论文URL不能同时为空。
```
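The input rules above could be checked along the following lines. This is a minimal sketch, not the skill's own validation code: `split_urls` and `validate_inputs` are hypothetical names, and raising `ValueError` is an assumption about how a caller might want to stop.

```python
from __future__ import annotations

EMPTY_INPUT_MESSAGE = "上传的文件和论文URL不能同时为空。"
SUPPORTED_LANGUAGES = ("中文", "英文")


def split_urls(values: list[str]) -> list[str]:
    """Flatten repeated and comma-separated URL arguments into one list."""
    urls: list[str] = []
    for value in values:
        for item in value.split(","):
            item = item.strip()
            if item:
                urls.append(item)
    return urls


def validate_inputs(
    paper_files: list[str], paper_urls: list[str], language: str = "中文"
) -> tuple[list[str], list[str], str]:
    """Return cleaned (files, urls, language) or raise if nothing was supplied."""
    files = [f for f in paper_files if f]
    urls = split_urls(paper_urls)
    if not files and not urls:
        raise ValueError(EMPTY_INPUT_MESSAGE)
    if language not in SUPPORTED_LANGUAGES:
        raise ValueError(f"unsupported language: {language}")
    return files, urls, language
```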
## Workflow
### 1. Prepare inputs and sections
Run:
```bash
python scripts/prepare_papers.py --language 中文 --files /path/to/paper.pdf --urls "https://example.com/paper.pdf"
```
Use only the relevant arguments. For URL-only runs, omit `--files`; for local-only runs, omit `--urls`.
The script creates `manifest.json` and one work directory per paper. It performs:
1. local file copy or URL download,
2. raw text extraction,
3. text cleaning,
4. section splitting into abstract, intro, method, experiment, conclusion, and `paper_body`,
5. prompt file generation.
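Once the script has run, the manifest can drive the remaining steps. The sketch below lists each paper's extraction prompt, assuming the manifest has a `papers` array whose entries carry `id` and `structured_prompt_file`, as `prepare_papers.py` writes them. The function name `list_extraction_prompts` is hypothetical.

```python
from __future__ import annotations

import json
from pathlib import Path


def list_extraction_prompts(manifest_path: Path) -> list[tuple[str, Path]]:
    """Return (paper id, extraction prompt path) pairs from manifest.json."""
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    return [
        (paper["id"], Path(paper["structured_prompt_file"]))
        for paper in manifest.get("papers", [])
    ]
```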
### 2. Generate structured extraction JSON
For each paper in `manifest.json`, read:
```text
prompts/01_structured_extraction_prompt.md
```
Send that prompt to the model. Save the model's response, which must be JSON only with no surrounding text, to:
```text
generated/structured_result.json
```
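In case a reply arrives wrapped in a Markdown code fence, the save step could strip the fence and confirm the content parses before writing it. This is a sketch under that assumption; `save_json_only` is a hypothetical helper, not part of the skill's scripts.

```python
from __future__ import annotations

import json
import re
from pathlib import Path


def save_json_only(response: str, dest: Path) -> dict:
    """Strip an optional code fence, check that the JSON parses, then save it."""
    text = response.strip()
    text = re.sub(r"^`{3}(?:json)?\s*", "", text)  # leading fence, if any
    text = re.sub(r"\s*`{3}$", "", text)           # trailing fence, if any
    data = json.loads(text)  # raises json.JSONDecodeError for non-JSON replies
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object at the top level")
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
    return data
```

Failing loudly on unparseable replies keeps a broken extraction from reaching the verification step unnoticed.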
Required JSON fields:
```json
{
  "title": "",
  "task": "",
  "background": "",
  "problem_statement": "",
  "method_name": "",
  "method_core": "",
  "datasets": [],
  "baselines": [],
  "metrics": [],
  "main_results": [
    {"dataset": "", "metric": "", "value": "", "baseline": "", "improvement": ""}
  ],
  "ablations": [],
  "limitations": [],
  "claims": [],
  "contributions": [],
  "evidence_spans": [
    {"field": "", "claim": "", "evidence": ""}
  ]
}
```
Extraction rules:
- Only use information present in, or directly inferable from, the paper.
- Prefer corresponding sections, but fall back to the full `paper_body` when a section is empty or insufficient.
- Do not leave `datasets`, `baselines`, or `metrics` empty just because the experiment section is weak; first check `paper_body`, result text, implementation details, and table-neighboring text.
- Use empty strings or arrays only when the full paper text truly lacks the information.
- Provide at least 6 evidence spans. Each evidence span must be a direct quote or a very close paraphrase from the source text.
- Prioritize numeric results from experiment, results, analysis, implementation details, or table-neighboring text.
- Keep JSON keys in English. Natural-language values must use the selected output language.
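Some of these rules can be checked mechanically before verification. The sketch below covers only the structural ones (required keys present, at least 6 evidence spans, each with evidence text). Whether a span really quotes the paper still needs the verification step. `check_structured_result` is a hypothetical helper.

```python
from __future__ import annotations

REQUIRED_FIELDS = (
    "title", "task", "background", "problem_statement", "method_name",
    "method_core", "datasets", "baselines", "metrics", "main_results",
    "ablations", "limitations", "claims", "contributions", "evidence_spans",
)
MIN_EVIDENCE_SPANS = 6


def check_structured_result(data: dict) -> list[str]:
    """Return a list of structural problems; an empty list means none found."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in data]
    spans = data.get("evidence_spans") or []
    if len(spans) < MIN_EVIDENCE_SPANS:
        problems.append(
            f"only {len(spans)} evidence spans; at least {MIN_EVIDENCE_SPANS} required"
        )
    for i, span in enumerate(spans):
        # Each span should be an object carrying non-empty evidence text.
        if not isinstance(span, dict) or not str(span.get("evidence", "")).strip():
            problems.append(f"evidence_spans[{i}] has no evidence text")
    return problems
```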
### 3. Run consistency verification
Open:
```text
prompts/02_verification_prompt_template.md
```
Replace `{{structured_json}}` with the actual content of `generated/structured_result.json`. Send the complete verification prompt to the model and save JSON-only output to:
```text
generated/verification_result.json
```
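The placeholder substitution might look like the sketch below. It uses plain string replacement rather than `str.format`, because both the template and the JSON contain literal braces. The helper name is hypothetical, and the default assumes the placeholder appears in the template file exactly as `{{structured_json}}`; pass a different `placeholder` if the file on disk differs.

```python
def fill_verification_prompt(
    template: str, structured_json: str, placeholder: str = "{{structured_json}}"
) -> str:
    """Insert the structured extraction JSON into the verification prompt."""
    if placeholder not in template:
        raise ValueError(f"placeholder {placeholder!r} not found in template")
    # str.replace leaves every other brace in the prompt untouched.
    return template.replace(placeholder, structured_json.strip())
```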
Required verification JSON:
```json
{
  "overall_score": 0,
  "hallucination_risk": "low/medium/high",
  "issues": [
    {"field": "", "problem": "", "severity": "low/medium/high"}
  ],
  "verified_claims": [
    {"claim": "", "status": "supported/weak/unsupported", "evidence": ""}
  ],
  "final_verdict": ""
}
```
Verification rules:
- Score 5: nearly no hallucination, strong evidence.
- Score 4: minor imprecision.
- Score 3: several claims lack evidence.
- Score 2: clear inconsistency exists.
- Score 1: substantial hallucination or misreading.
- Focus on omitted or incorrect datasets, baselines, metrics, and main results.
- If the structured extraction uses an empty array/string for information that exists in the original paper, explicitly list that in `issues`.
- Provide at least 4 verified claims.
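A structural check of the verification output could be sketched as follows. It assumes `overall_score` should be an integer from 1 to 5, matching the score levels listed above (the `0` in the template is read as a placeholder), and `check_verification_result` is a hypothetical helper name.

```python
from __future__ import annotations

RISK_LEVELS = {"low", "medium", "high"}
CLAIM_STATUSES = {"supported", "weak", "unsupported"}
MIN_VERIFIED_CLAIMS = 4


def check_verification_result(data: dict) -> list[str]:
    """Return a list of structural problems; an empty list means none found."""
    problems: list[str] = []
    score = data.get("overall_score")
    # bool is a subclass of int, so exclude it explicitly.
    if not isinstance(score, int) or isinstance(score, bool) or not 1 <= score <= 5:
        problems.append("overall_score must be an integer from 1 to 5")
    risk = data.get("hallucination_risk")
    if not isinstance(risk, str) or risk not in RISK_LEVELS:
        problems.append("hallucination_risk must be low, medium, or high")
    claims = data.get("verified_claims") or []
    if len(claims) < MIN_VERIFIED_CLAIMS:
        problems.append(
            f"only {len(claims)} verified claims; at least {MIN_VERIFIED_CLAIMS} required"
        )
    for i, claim in enumerate(claims):
        status = claim.get("status") if isinstance(claim, dict) else None
        if not isinstance(status, str) or status not in CLAIM_STATUSES:
            problems.append(f"verified_claims[{i}] has an invalid status")
    return problems
```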
### 4. Render reports
After `structured_result.json` and `verification_result.json` are saved for every paper, run:
```bash
python scripts/render_report.py --manifest ~/Desktop/paper_analysis_results/<YYYYMMDD_HHMMSS>/manifest.json
```
Outputs per paper:
```text
report/final_report.md
report/final_report.html
report/final_report.docx
```
The `.md` file preserves editable Markdown source. The `.html` file is the rendered visual version. The `.docx` file is the Word-compatible report.
## Report structure
Chinese report sections:
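1. 论文题目 (Title)
2. 任务与问题 (Task and Problem)
3. 方法概述 (Method Overview)
4. 实验要素:数据集、基线方法、评价指标 (Experimental Elements: Datasets, Baselines, Metrics)
5. 主要结果 (Main Results)
6. 贡献提炼 (Contributions)
7. 消融与局限性 (Ablations and Limitations)
8. 证据片段 (Evidence Spans)
9. 一致性校验:总评分、幻觉风险、最终结论、已核验结论、发现的问题 (Consistency Check: Overall Score, Hallucination Risk, Final Verdict, Verified Claims, Issues)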
English reports use the same sections in the same order, under the top-level heading `Paper Analysis`.
## References
- Use `references/prompt_templates.md` when prompt details are needed.
- Use `references/workflow_mapping.md` when checking how the Dify nodes map to this skill.
- `references/dify_scheme_a_source.yml` preserves the uploaded Dify DSL source for auditability.
FILE:scripts/prepare_papers.py
#!/usr/bin/env python3
"""Prepare paper inputs for the evidence-enhanced paper analysis workflow.
This script handles the deterministic parts of the Dify Scheme A workflow:
- validate that at least one local file or URL is provided
- copy local files and download URL inputs into a desktop work directory
- extract text from common document formats
- clean text
- split text into paper sections
- write prompt-ready files for LLM structured extraction and verification
"""
from __future__ import annotations
import argparse
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import time
import urllib.parse
import urllib.request
import zipfile
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Iterable
MIN_TEXT_LENGTH = 800
def desktop_root() -> Path:
return Path.home() / "Desktop" / "paper_analysis_results"
def timestamp() -> str:
return time.strftime("%Y%m%d_%H%M%S")
def sanitize_filename(name: str, fallback: str = "paper") -> str:
name = urllib.parse.unquote(name or "")
name = name.strip().replace("\\", "/").split("/")[-1]
name = re.sub(r"[^A-Za-z0-9._\-\u4e00-\u9fff]+", "_", name).strip("._")
return name or fallback
def split_csv(values: Iterable[str] | None) -> list[str]:
result: list[str] = []
for value in values or []:
for item in (value or "").split(","):
item = item.strip()
if item:
result.append(item)
return result
def safe_copy_file(src: Path, dest_dir: Path) -> Path:
if not src.exists() or not src.is_file():
raise FileNotFoundError(f"local paper file not found: {src}")
dest_dir.mkdir(parents=True, exist_ok=True)
dest = dest_dir / sanitize_filename(src.name)
if dest.exists():
stem, suffix = dest.stem, dest.suffix
i = 2
while True:
candidate = dest_dir / f"{stem}_{i}{suffix}"
if not candidate.exists():
dest = candidate
break
i += 1
shutil.copy2(src, dest)
return dest
def guess_download_name(url: str, content_type: str | None = None) -> str:
path_name = sanitize_filename(urllib.parse.urlparse(url).path, "paper")
if "." in path_name:
return path_name
if content_type:
ct = content_type.lower()
if "pdf" in ct:
return path_name + ".pdf"
if "word" in ct or "docx" in ct:
return path_name + ".docx"
if "html" in ct:
return path_name + ".html"
if "text" in ct:
return path_name + ".txt"
return path_name + ".pdf"
def download_url(url: str, dest_dir: Path, timeout: int = 60, max_retries: int = 2) -> Path:
dest_dir.mkdir(parents=True, exist_ok=True)
last_error: Exception | None = None
for attempt in range(max_retries + 1):
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0 paper-analysis-evidence"})
with urllib.request.urlopen(req, timeout=timeout) as resp:
content_type = resp.headers.get("Content-Type")
name = guess_download_name(url, content_type)
dest = dest_dir / name
if dest.exists():
stem, suffix = dest.stem, dest.suffix
digest = hashlib.sha1(url.encode("utf-8")).hexdigest()[:8]
dest = dest_dir / f"{stem}_{digest}{suffix}"
with open(dest, "wb") as f:
shutil.copyfileobj(resp, f)
return dest
except Exception as exc: # pragma: no cover - depends on network
last_error = exc
if attempt < max_retries:
time.sleep(1.5 * (attempt + 1))
raise RuntimeError(f"failed to download {url}: {last_error}")
def extract_pdf_text(path: Path) -> str:
errors: list[str] = []
for module_name in ("pypdf", "PyPDF2"):
try:
module = __import__(module_name)
reader = module.PdfReader(str(path))
pages = []
for page in reader.pages:
pages.append(page.extract_text() or "")
text = "\n\n".join(pages).strip()
if text:
return text
except Exception as exc:
errors.append(f"{module_name}: {exc}")
try:
completed = subprocess.run(
["pdftotext", str(path), "-"],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
timeout=120,
)
text = completed.stdout.strip()
if text:
return text
except Exception as exc:
errors.append(f"pdftotext: {exc}")
raise RuntimeError("could not extract PDF text. install pypdf or poppler-utils. " + "; ".join(errors))
def extract_docx_text(path: Path) -> str:
    with zipfile.ZipFile(path) as zf:
        xml = zf.read("word/document.xml").decode("utf-8", errors="ignore")
    # Turn paragraph ends into newlines, then drop all remaining tags.
    xml = re.sub(r"</w:p>", "\n", xml)
    xml = re.sub(r"<[^>]+>", "", xml)
    # Decode XML entities; "&amp;" goes last so "&amp;lt;" is not unescaped twice.
    entities = {"&lt;": "<", "&gt;": ">", "&quot;": '"', "&apos;": "'", "&amp;": "&"}
    for k, v in entities.items():
        xml = xml.replace(k, v)
    return xml.strip()
def extract_html_text(path: Path) -> str:
    text = path.read_text(encoding="utf-8", errors="ignore")
    # Drop script and style blocks before stripping the remaining tags.
    text = re.sub(r"(?is)<script[^>]*>.*?</script>", " ", text)
    text = re.sub(r"(?is)<style[^>]*>.*?</style>", " ", text)
    text = re.sub(r"(?i)<br\s*/?>", "\n", text)
    text = re.sub(r"(?i)</p>", "\n", text)
    text = re.sub(r"<[^>]+>", " ", text)
    # Decode common HTML entities; "&amp;" goes last to avoid double-unescaping.
    text = text.replace("&nbsp;", " ").replace("&lt;", "<").replace("&gt;", ">").replace("&amp;", "&")
    return text.strip()
def extract_text(path: Path) -> str:
suffix = path.suffix.lower()
if suffix == ".pdf":
return extract_pdf_text(path)
if suffix == ".docx":
return extract_docx_text(path)
if suffix in {".txt", ".md", ".markdown"}:
return path.read_text(encoding="utf-8", errors="ignore")
if suffix in {".html", ".htm"}:
return extract_html_text(path)
try:
return path.read_text(encoding="utf-8", errors="ignore")
except Exception as exc:
raise RuntimeError(f"unsupported document format: {path.name}. Use PDF, DOCX, TXT, MD, or HTML. {exc}")
def clean_for_analysis(raw_text: str, min_length: int = MIN_TEXT_LENGTH) -> tuple[str, int, str]:
    if not raw_text or not isinstance(raw_text, str):
        return "", 0, "input text is empty or invalid"
    text = raw_text.replace("\r\n", "\n").replace("\r", "\n")
    text = re.sub(r"\n{3,}", "\n\n", text)
    text = re.sub(r"[ \t]+", " ", text)
    # Cut from a References/Bibliography heading that stands alone on its line,
    # so body lines that merely begin with "references ..." do not truncate the paper.
    text = re.sub(r"\n[ \t]*(?:References|Bibliography)[ \t]*(?:\n[\s\S]*)?$", "", text, flags=re.I)
    text = text.strip()
    word_count = len(text)  # character count, despite the name
    if word_count < min_length:
        return text, word_count, f"text is short ({word_count} chars); extraction may have failed"
    return text, word_count, ""
def normalize_newlines(text: str) -> str:
text = (text or "").replace("\r\n", "\n").replace("\r", "\n")
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def trim_references(text: str) -> str:
if not text:
return ""
patterns = [r"\n\s*(references|bibliography)\s*$", r"\n\s*参考文献\s*$"]
cut_positions = []
for p in patterns:
m = re.search(p, text, flags=re.I | re.M)
if m:
cut_positions.append(m.start())
if cut_positions:
text = text[:min(cut_positions)]
return text.strip()
def heading_patterns() -> dict[str, list[str]]:
return {
"abstract": [r"abstract", r"摘要"],
"intro": [r"introduction", r"intro", r"引言", r"绪论"],
"related": [r"related work", r"background", r"preliminar(?:y|ies)", r"相关工作", r"背景", r"预备知识"],
"method": [
r"method", r"methods", r"methodology", r"approach", r"approaches", r"framework", r"model", r"architecture",
r"proposed method", r"proposed approach", r"our method", r"our approach", r"方法", r"模型", r"框架", r"架构",
],
"experiment": [
r"experiment", r"experiments", r"experimental setup", r"evaluation", r"evaluations", r"results", r"analysis",
r"implementation details", r"empirical study", r"实验", r"实验设置", r"实验结果", r"评估", r"结果", r"分析", r"实现细节",
],
"conclusion": [r"conclusion", r"conclusions", r"discussion", r"limitations", r"future work", r"结论", r"讨论", r"局限性", r"未来工作"],
"tail": [r"acknowledg(?:e)?ments?", r"appendix", r"references", r"bibliography", r"致谢", r"附录", r"参考文献"],
}
def line_is_heading(line: str) -> bool:
s = line.strip()
return bool(s) and len(s) <= 120 and s.count(". ") < 2
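def detect_heading_type(line: str, patterns: dict[str, list[str]]) -> str | None:
    s = line.strip()
    if not line_is_heading(s):
        return None
    # Strip a leading section number. Roman numerals must be followed by a
    # separator; otherwise the "I" of "Introduction" or the "M" of "Method"
    # would be stripped and unnumbered headings would never match.
    normalized = re.sub(
        r"^\s*(section\s+)?((\d+(\.\d+)*)|[ivxlcdm]+(?=[\.\:\-\s])|第\s*\d+\s*[章节部分])[\.\:\-\s]*",
        "",
        s,
        flags=re.I,
    ).strip()
    # First pass: the whole heading is exactly one of the known titles.
    for sec_type, pats in patterns.items():
        for pat in pats:
            if re.fullmatch(rf"{pat}", normalized, flags=re.I):
                return sec_type
    # Second pass: a known title appears as a whole word inside the heading.
    for sec_type, pats in patterns.items():
        for pat in pats:
            if re.search(rf"\b{pat}\b", normalized, flags=re.I):
                return sec_type
    return None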
def collect_headings(text: str) -> list[dict[str, object]]:
patterns = heading_patterns()
headings: list[dict[str, object]] = []
offset = 0
for line in text.split("\n"):
sec_type = detect_heading_type(line, patterns)
if sec_type:
headings.append({"type": sec_type, "title": line.strip(), "start": offset})
offset += len(line) + 1
headings.sort(key=lambda x: int(x["start"]))
return headings
def get_section_by_heading(text: str, headings: list[dict[str, object]], target_type: str) -> str:
candidates = [h for h in headings if h["type"] == target_type]
if not candidates:
return ""
start = int(candidates[0]["start"])
later = [int(h["start"]) for h in headings if int(h["start"]) > start]
end = min(later) if later else len(text)
return text[start:end].strip()
def fallback_window(text: str, keywords: list[str], max_len: int = 10000, search_start: int = 0) -> str:
window_text = text[search_start:]
best: int | None = None
for kw in keywords:
m = re.search(kw, window_text, flags=re.I)
if m:
pos = search_start + m.start()
if best is None or pos < best:
best = pos
if best is None:
return ""
start = max(0, best - 150)
end = min(len(text), start + max_len)
return text[start:end].strip()
def normalize_section(section: str, max_len: int) -> str:
if not section:
return ""
section = section.strip()
section = re.sub(r"\n{3,}", "\n\n", section)
return section[:max_len]
def split_sections(cleaned_text: str) -> dict[str, str]:
text = trim_references(normalize_newlines(cleaned_text))
if not text:
return {
"abstract_section": "", "intro_section": "", "method_section": "",
"experiment_section": "", "conclusion_section": "", "paper_body": "",
}
headings = collect_headings(text)
abstract = get_section_by_heading(text, headings, "abstract")
if not abstract:
m = re.search(r"(^|\n)\s*(abstract|摘要)\s*\n", text, flags=re.I)
if m:
start = m.start()
next_heads = [int(h["start"]) for h in headings if int(h["start"]) > start]
end = min(next_heads) if next_heads else min(len(text), start + 6000)
abstract = text[start:end].strip()
else:
intro_heads = [int(h["start"]) for h in headings if h["type"] == "intro"]
if intro_heads and intro_heads[0] > 200:
abstract = text[:intro_heads[0]].strip()[:6000]
intro = get_section_by_heading(text, headings, "intro")
if not intro:
m = re.search(r"(introduction|引言|绪论)", text[:max(3000, len(text)//5)], flags=re.I)
if m:
start = max(0, m.start() - 100)
next_heads = [int(h["start"]) for h in headings if int(h["start"]) > start]
end = min(next_heads) if next_heads else min(len(text), start + 8000)
intro = text[start:end].strip()
method = get_section_by_heading(text, headings, "method")
if not method:
intro_heads = [int(h["start"]) for h in headings if h["type"] == "intro"]
method = fallback_window(text, [r"\bmethod\b", r"\bmethods\b", r"\bmethodology\b", r"\bapproach\b", r"\bframework\b", r"\barchitecture\b", r"\bmodel\b", r"方法", r"框架", r"架构", r"模型"], 12000, intro_heads[0] if intro_heads else 0)
experiment = get_section_by_heading(text, headings, "experiment")
if not experiment:
method_heads = [int(h["start"]) for h in headings if h["type"] == "method"]
experiment = fallback_window(text, [r"\bexperiments?\b", r"\bevaluation\b", r"\bresults\b", r"\banalysis\b", r"\bimplementation details\b", r"实验", r"评估", r"结果", r"分析", r"实现细节"], 12000, method_heads[0] if method_heads else 0)
conclusion = get_section_by_heading(text, headings, "conclusion")
if not conclusion:
conclusion = fallback_window(text, [r"\bconclusion\b", r"\bconclusions\b", r"\bdiscussion\b", r"\blimitations\b", r"\bfuture work\b", r"结论", r"讨论", r"局限", r"未来工作"], 6000, len(text)//2)
return {
"abstract_section": normalize_section(abstract, 6000),
"intro_section": normalize_section(intro, 8000),
"method_section": normalize_section(method, 12000),
"experiment_section": normalize_section(experiment, 12000),
"conclusion_section": normalize_section(conclusion, 6000),
"paper_body": text[:60000],
}
STRUCTURED_PROMPT_TEMPLATE = """你是一位严谨的论文信息抽取器。请严格基于给定论文内容,输出一个 JSON 对象,不要输出 Markdown、不要解释、不要加 ```json。
输出字段必须包含:
{{
"title": "",
"task": "",
"background": "",
"problem_statement": "",
"method_name": "",
"method_core": "",
"datasets": [],
"baselines": [],
"metrics": [],
"main_results": [
{{"dataset": "", "metric": "", "value": "", "baseline": "", "improvement": ""}}
],
"ablations": [],
"limitations": [],
"claims": [],
"contributions": [],
"evidence_spans": [
{{"field": "", "claim": "", "evidence": ""}}
]
}}
规则:
1. 只能写原文出现或可直接推出的信息,不要脑补。
2. 优先使用对应章节抽取信息;如果某章节为空或信息不足,必须从全文补充。
3. datasets、baselines、metrics 不允许因为章节为空而直接写空数组,必须先检查全文 paper_body、实验结果、实现细节、表格附近文本是否存在相关信息。
4. 只有在全文中也确实找不到时,才能写空字符串或空数组。
5. evidence_spans 至少给 6 条,每条都必须是原文中的直接证据片段或非常贴近原文的概括。
6. 数值结果优先来自实验部分、结果部分、分析部分或表格附近文本。
7. JSON 键名保持英文;JSON 中的自然语言内容使用 {language}。
论文摘要段:
{abstract_section}
引言段:
{intro_section}
方法段:
{method_section}
实验段:
{experiment_section}
结论段:
{conclusion_section}
全文:
{paper_body}
"""
VERIFICATION_PROMPT_TEMPLATE = """你是一位论文事实核验器。请对下面的结构化抽取结果做一致性校验,并输出 JSON 对象,不要输出 Markdown、不要解释。
输出格式:
{{
"overall_score": 0,
"hallucination_risk": "low/medium/high",
"issues": [
{{"field": "", "problem": "", "severity": "low/medium/high"}}
],
"verified_claims": [
{{"claim": "", "status": "supported/weak/unsupported", "evidence": ""}}
],
"final_verdict": ""
}}
评分规则:
- 5:几乎无幻觉,证据充分
- 4:少量不严谨
- 3:有若干缺证据表述
- 2:存在明显不一致
- 1:大量幻觉或错读
要求:
1. JSON 键名保持英文。
2. JSON 中的自然语言内容使用 {language}。
3. 重点检查 datasets、baselines、metrics、main_results 是否遗漏或误抽。
4. 若抽取结果将原文存在的信息写成空数组或空字符串,请在 issues 中明确指出。
5. verified_claims 至少给 4 条。
原文:
{paper_body}
待校验JSON:
{{{{structured_json}}}}
"""
@dataclass
class PaperRecord:
id: str
source_type: str
source: str
input_file: str
work_dir: str
raw_text_file: str
cleaned_text_file: str
sections_file: str
structured_prompt_file: str
verification_prompt_file: str
structured_json_file: str
verification_json_file: str
final_markdown_file: str
final_html_file: str
final_docx_file: str
language: str
word_count: int
warning: str = ""
def write_text(path: Path, text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(text, encoding="utf-8")
def prepare_one(input_path: Path, source_type: str, source: str, batch_dir: Path, language: str, index: int) -> PaperRecord:
paper_id = f"paper_{index:02d}_{sanitize_filename(input_path.stem, 'paper')}"
paper_dir = batch_dir / paper_id
for sub in ["input", "text", "sections", "prompts", "generated", "report"]:
(paper_dir / sub).mkdir(parents=True, exist_ok=True)
kept = safe_copy_file(input_path, paper_dir / "input") if input_path.parent != paper_dir / "input" else input_path
raw = extract_text(kept)
cleaned, word_count, warning = clean_for_analysis(raw)
sections = split_sections(cleaned)
raw_file = paper_dir / "text" / "raw_text.txt"
cleaned_file = paper_dir / "text" / "cleaned_text.txt"
sections_file = paper_dir / "sections" / "sections.json"
structured_prompt = paper_dir / "prompts" / "01_structured_extraction_prompt.md"
verification_prompt = paper_dir / "prompts" / "02_verification_prompt_template.md"
structured_json = paper_dir / "generated" / "structured_result.json"
verification_json = paper_dir / "generated" / "verification_result.json"
final_md = paper_dir / "report" / "final_report.md"
final_html = paper_dir / "report" / "final_report.html"
final_docx = paper_dir / "report" / "final_report.docx"
write_text(raw_file, raw)
write_text(cleaned_file, cleaned)
sections_file.write_text(json.dumps(sections, ensure_ascii=False, indent=2), encoding="utf-8")
write_text(structured_prompt, STRUCTURED_PROMPT_TEMPLATE.format(language=language, **sections))
write_text(verification_prompt, VERIFICATION_PROMPT_TEMPLATE.format(language=language, paper_body=sections["paper_body"]))
if not structured_json.exists():
write_text(structured_json, "{}\n")
if not verification_json.exists():
write_text(verification_json, "{}\n")
return PaperRecord(
id=paper_id,
source_type=source_type,
source=source,
input_file=str(kept),
work_dir=str(paper_dir),
raw_text_file=str(raw_file),
cleaned_text_file=str(cleaned_file),
sections_file=str(sections_file),
structured_prompt_file=str(structured_prompt),
verification_prompt_file=str(verification_prompt),
structured_json_file=str(structured_json),
verification_json_file=str(verification_json),
final_markdown_file=str(final_md),
final_html_file=str(final_html),
final_docx_file=str(final_docx),
language=language,
word_count=word_count,
warning=warning,
)
def main() -> int:
parser = argparse.ArgumentParser(description="Prepare paper files for evidence-enhanced analysis.")
parser.add_argument("--language", default="中文", choices=["中文", "英文"], help="Natural language for generated content.")
parser.add_argument("--files", nargs="*", default=[], help="Local paper files. PDF, DOCX, TXT, MD, HTML are supported.")
parser.add_argument("--urls", nargs="*", default=[], help="Paper PDF/direct URLs. Multiple comma-separated values are also accepted.")
parser.add_argument("--output-root", default=str(desktop_root()), help="Root output directory. Defaults to ~/Desktop/paper_analysis_results.")
args = parser.parse_args()
files = [Path(p).expanduser().resolve() for p in args.files if p]
urls = split_csv(args.urls)
if not files and not urls:
print("上传的文件和论文URL不能同时为空。", file=sys.stderr)
return 2
batch_dir = Path(args.output_root).expanduser() / timestamp()
downloads_dir = batch_dir / "downloads"
batch_dir.mkdir(parents=True, exist_ok=True)
records: list[PaperRecord] = []
index = 1
for file_path in files:
records.append(prepare_one(file_path, "local_file", str(file_path), batch_dir, args.language, index))
index += 1
for url in urls:
downloaded = download_url(url, downloads_dir)
records.append(prepare_one(downloaded, "url", url, batch_dir, args.language, index))
index += 1
manifest = {
"workflow": "paper-analysis-evidence",
"language": args.language,
"created_at": timestamp(),
"batch_dir": str(batch_dir),
"papers": [asdict(r) for r in records],
"next_steps": [
"For each paper, send prompts/01_structured_extraction_prompt.md to the model and save the JSON-only answer to generated/structured_result.json.",
"Then send prompts/02_verification_prompt_template.md plus the structured JSON to the model and save the JSON-only answer to generated/verification_result.json.",
"Run scripts/render_report.py with the manifest to create final_report.md, final_report.html, and final_report.docx.",
],
}
manifest_path = batch_dir / "manifest.json"
manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps({"manifest": str(manifest_path), "batch_dir": str(batch_dir), "papers": [r.id for r in records]}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
FILE:scripts/render_report.py
#!/usr/bin/env python3
"""Render final reports for the evidence-enhanced paper analysis workflow.
Given structured extraction JSON and verification JSON, create Markdown, HTML,
and DOCX reports. This mirrors the Dify Scheme A aggregation node while adding
local file outputs that are easy to inspect on Ubuntu Desktop.
"""
from __future__ import annotations
import argparse
import html
import json
import os
import re
import sys
import zipfile
from pathlib import Path
from typing import Any
def parse_json_text(text: str) -> dict[str, Any]:
if not text:
return {}
s = text.strip()
s = re.sub(r"^```json\s*", "", s)
s = re.sub(r"^```", "", s)
s = re.sub(r"```$", "", s)
s = s.strip()
try:
data = json.loads(s)
return data if isinstance(data, dict) else {"raw_text": data}
except Exception:
match = re.search(r"\{[\s\S]*\}", s)
if match:
try:
data = json.loads(match.group(0))
return data if isinstance(data, dict) else {"raw_text": data}
except Exception:
return {"raw_text": s}
return {"raw_text": s}
def is_english(language: str) -> bool:
return (language or "").strip() == "英文"
def empty_text(language: str) -> str:
return "None" if is_english(language) else "无"
def clean_item(x: Any) -> str:
if isinstance(x, (dict, list)):
return json.dumps(x, ensure_ascii=False)
return str(x).strip()
def list_md(items: Any, language: str) -> str:
fallback = empty_text(language)
if not items:
return fallback
if isinstance(items, list):
lines = [f"- {clean_item(x)}" for x in items if clean_item(x)]
return "\n".join(lines) if lines else fallback
s = clean_item(items)
return s if s else fallback
def results_md(items: Any, language: str) -> str:
fallback = empty_text(language)
if not items or not isinstance(items, list):
return fallback
lines = []
en = is_english(language)
for it in items:
if isinstance(it, dict):
if en:
lines.append(f"- Dataset: {it.get('dataset','')}; Metric: {it.get('metric','')}; Value: {it.get('value','')}; Baseline: {it.get('baseline','')}; Improvement: {it.get('improvement','')}")
else:
lines.append(f"- 数据集:{it.get('dataset','')};指标:{it.get('metric','')};结果:{it.get('value','')};对比基线:{it.get('baseline','')};提升:{it.get('improvement','')}")
else:
s = clean_item(it)
if s:
lines.append(f"- {s}")
return "\n".join(lines) if lines else fallback
def evidence_md(items: Any, language: str) -> str:
fallback = empty_text(language)
if not items or not isinstance(items, list):
return fallback
en = is_english(language)
lines = []
for it in items[:8]:
if isinstance(it, dict):
if en:
lines.append(f"- Field: {it.get('field','')} | Claim: {it.get('claim','')} | Evidence: {it.get('evidence','')}")
else:
lines.append(f"- 字段:{it.get('field','')}|结论:{it.get('claim','')}|证据:{it.get('evidence','')}")
else:
s = clean_item(it)
if s:
lines.append(f"- {s}")
return "\n".join(lines) if lines else fallback
def verify_md(items: Any, language: str) -> str:
fallback = empty_text(language)
if not items or not isinstance(items, list):
return fallback
en = is_english(language)
lines = []
for it in items:
if isinstance(it, dict):
if en:
lines.append(f"- Claim: {it.get('claim','')} | Status: {it.get('status','')} | Evidence: {it.get('evidence','')}")
else:
lines.append(f"- {it.get('claim','')}|状态:{it.get('status','')}|证据:{it.get('evidence','')}")
else:
s = clean_item(it)
if s:
lines.append(f"- {s}")
return "\n".join(lines) if lines else fallback
def issues_md(items: Any, language: str) -> str:
fallback = empty_text(language)
if not items or not isinstance(items, list):
return fallback
en = is_english(language)
lines = []
for it in items:
if isinstance(it, dict):
if en:
lines.append(f"- Field: {it.get('field','')} | Problem: {it.get('problem','')} | Severity: {it.get('severity','')}")
else:
lines.append(f"- 字段:{it.get('field','')}|问题:{it.get('problem','')}|严重度:{it.get('severity','')}")
else:
s = clean_item(it)
if s:
lines.append(f"- {s}")
return "\n".join(lines) if lines else fallback
def build_markdown(structured_json: str, verification_json: str, language: str) -> str:
data = parse_json_text(structured_json)
verify = parse_json_text(verification_json)
en = is_english(language)
title = data.get("title", "Untitled" if en else "未识别题目")
if en:
return f"""# Paper Analysis
## Title
{title}
## Task and Problem
- Task: {data.get('task', '')}
- Background: {data.get('background', '')}
- Problem Statement: {data.get('problem_statement', '')}
## Method Overview
- Method Name: {data.get('method_name', '')}
- Method Core: {data.get('method_core', '')}
## Experimental Elements
### Datasets
{list_md(data.get('datasets', []), language)}
### Baselines
{list_md(data.get('baselines', []), language)}
### Metrics
{list_md(data.get('metrics', []), language)}
## Main Results
{results_md(data.get('main_results', []), language)}
## Contributions
{list_md(data.get('contributions', []), language)}
## Ablations and Limitations
### Ablations
{list_md(data.get('ablations', []), language)}
### Limitations
{list_md(data.get('limitations', []), language)}
## Evidence Spans
{evidence_md(data.get('evidence_spans', []), language)}
## Consistency Check
- Overall Score: {verify.get('overall_score', '')}
- Hallucination Risk: {verify.get('hallucination_risk', '')}
- Final Verdict: {verify.get('final_verdict', '')}
### Verified Claims
{verify_md(verify.get('verified_claims', []), language)}
### Issues
{issues_md(verify.get('issues', []), language)}
"""
return f"""# 论文分析结果
## 论文题目
{title}
## 任务与问题
- 研究任务:{data.get('task', '')}
- 背景:{data.get('background', '')}
- 问题定义:{data.get('problem_statement', '')}
## 方法概述
- 方法名称:{data.get('method_name', '')}
- 方法核心:{data.get('method_core', '')}
## 实验要素
### 数据集
{list_md(data.get('datasets', []), language)}
### 基线方法
{list_md(data.get('baselines', []), language)}
### 评价指标
{list_md(data.get('metrics', []), language)}
## 主要结果
{results_md(data.get('main_results', []), language)}
## 贡献提炼
{list_md(data.get('contributions', []), language)}
## 消融与局限性
### 消融实验
{list_md(data.get('ablations', []), language)}
### 局限性
{list_md(data.get('limitations', []), language)}
## 证据片段
{evidence_md(data.get('evidence_spans', []), language)}
## 一致性校验
- 总评分:{verify.get('overall_score', '')}
- 幻觉风险:{verify.get('hallucination_risk', '')}
- 最终结论:{verify.get('final_verdict', '')}
### 已核验结论
{verify_md(verify.get('verified_claims', []), language)}
### 发现的问题
{issues_md(verify.get('issues', []), language)}
"""
def markdown_to_html(markdown: str, title: str = "Paper Analysis") -> str:
try:
import markdown as markdown_lib # type: ignore
body = markdown_lib.markdown(markdown, extensions=["tables", "fenced_code", "sane_lists"])
except Exception:
body_lines: list[str] = []
in_ul = False
for raw in markdown.splitlines():
line = raw.rstrip()
if line.startswith("### "):
if in_ul:
body_lines.append("</ul>"); in_ul = False
body_lines.append(f"<h3>{html.escape(line[4:])}</h3>")
elif line.startswith("## "):
if in_ul:
body_lines.append("</ul>"); in_ul = False
body_lines.append(f"<h2>{html.escape(line[3:])}</h2>")
elif line.startswith("# "):
if in_ul:
body_lines.append("</ul>"); in_ul = False
body_lines.append(f"<h1>{html.escape(line[2:])}</h1>")
elif line.startswith("- "):
if not in_ul:
body_lines.append("<ul>"); in_ul = True
body_lines.append(f"<li>{html.escape(line[2:])}</li>")
elif not line.strip():
if in_ul:
body_lines.append("</ul>"); in_ul = False
else:
if in_ul:
body_lines.append("</ul>"); in_ul = False
body_lines.append(f"<p>{html.escape(line)}</p>")
if in_ul:
body_lines.append("</ul>")
body = "\n".join(body_lines)
return f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>{html.escape(title)}</title>
<style>
body {{ max-width: 920px; margin: 40px auto; padding: 0 24px; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; line-height: 1.65; color: #1f2937; }}
h1, h2, h3 {{ color: #111827; line-height: 1.25; }}
h1 {{ border-bottom: 2px solid #e5e7eb; padding-bottom: 12px; }}
h2 {{ margin-top: 32px; border-bottom: 1px solid #e5e7eb; padding-bottom: 6px; }}
li {{ margin: 6px 0; }}
code, pre {{ background: #f3f4f6; }}
</style>
</head>
<body>
{body}
</body>
</html>
"""
def write_docx_with_python_docx(markdown: str, output: Path) -> bool:
try:
from docx import Document # type: ignore
except Exception:
return False
doc = Document()
for raw in markdown.splitlines():
line = raw.rstrip()
if not line:
continue
if line.startswith("# "):
doc.add_heading(line[2:].strip(), level=1)
elif line.startswith("## "):
doc.add_heading(line[3:].strip(), level=2)
elif line.startswith("### "):
doc.add_heading(line[4:].strip(), level=3)
elif line.startswith("- "):
doc.add_paragraph(line[2:].strip(), style="List Bullet")
else:
doc.add_paragraph(line)
doc.save(output)
return True
def write_minimal_docx(markdown: str, output: Path) -> None:
def xml_escape(s: str) -> str:
return html.escape(s, quote=False)
paras = []
for raw in markdown.splitlines():
line = raw.strip()
if not line:
continue
if line.startswith("#"):
line = line.lstrip("#").strip()
elif line.startswith("- "):
line = "• " + line[2:].strip()
paras.append(f"<w:p><w:r><w:t>{xml_escape(line)}</w:t></w:r></w:p>")
document_xml = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<w:document xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main"><w:body>%s<w:sectPr><w:pgSz w:w="12240" w:h="15840"/><w:pgMar w:top="1440" w:right="1440" w:bottom="1440" w:left="1440"/></w:sectPr></w:body></w:document>""" % "".join(paras)
content_types = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types"><Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/><Default Extension="xml" ContentType="application/xml"/><Override PartName="/word/document.xml" ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.document.main+xml"/></Types>"""
rels = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships"><Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="word/document.xml"/></Relationships>"""
output.parent.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(output, "w", compression=zipfile.ZIP_DEFLATED) as zf:
zf.writestr("[Content_Types].xml", content_types)
zf.writestr("_rels/.rels", rels)
zf.writestr("word/document.xml", document_xml)
def write_docx(markdown: str, output: Path) -> None:
output.parent.mkdir(parents=True, exist_ok=True)
if not write_docx_with_python_docx(markdown, output):
write_minimal_docx(markdown, output)
def render_one(structured_path: Path, verification_path: Path, language: str, output_md: Path, output_html: Path, output_docx: Path) -> dict[str, str]:
    structured = structured_path.read_text(encoding="utf-8", errors="ignore")
    verification = verification_path.read_text(encoding="utf-8", errors="ignore")
    markdown = build_markdown(structured, verification, language)
    # Single-paper mode may point each output at a different directory,
    # so create the parent of every file written here (write_docx creates its own).
    for target in (output_md, output_html):
        target.parent.mkdir(parents=True, exist_ok=True)
    output_md.write_text(markdown, encoding="utf-8")
    output_html.write_text(markdown_to_html(markdown), encoding="utf-8")
    write_docx(markdown, output_docx)
    return {"markdown": str(output_md), "html": str(output_html), "docx": str(output_docx)}
def render_manifest(manifest_path: Path) -> list[dict[str, str]]:
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
outputs: list[dict[str, str]] = []
language = manifest.get("language", "中文")
for paper in manifest.get("papers", []):
result = render_one(
Path(paper["structured_json_file"]),
Path(paper["verification_json_file"]),
paper.get("language") or language,
Path(paper["final_markdown_file"]),
Path(paper["final_html_file"]),
Path(paper["final_docx_file"]),
)
outputs.append({"paper_id": paper.get("id", ""), **result})
return outputs
def main() -> int:
parser = argparse.ArgumentParser(description="Render paper analysis reports from generated JSON files.")
parser.add_argument("--manifest", help="Manifest created by prepare_papers.py. If set, per-paper paths are read from the manifest.")
parser.add_argument("--structured-json", help="Path to structured_result.json for single-paper mode.")
parser.add_argument("--verification-json", help="Path to verification_result.json for single-paper mode.")
parser.add_argument("--language", default="中文", choices=["中文", "英文"])
parser.add_argument("--output-md", help="Output Markdown path for single-paper mode.")
parser.add_argument("--output-html", help="Output rendered HTML path for single-paper mode.")
parser.add_argument("--output-docx", help="Output DOCX path for single-paper mode.")
args = parser.parse_args()
if args.manifest:
outputs = render_manifest(Path(args.manifest).expanduser())
print(json.dumps({"outputs": outputs}, ensure_ascii=False, indent=2))
return 0
required = [args.structured_json, args.verification_json, args.output_md, args.output_html, args.output_docx]
if not all(required):
parser.error("Either --manifest or all single-paper paths are required.")
result = render_one(
Path(args.structured_json).expanduser(),
Path(args.verification_json).expanduser(),
args.language,
Path(args.output_md).expanduser(),
Path(args.output_html).expanduser(),
Path(args.output_docx).expanduser(),
)
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
FILE:references/prompt_templates.md
# Prompt templates
These templates are adapted from the uploaded Dify DSL `论文分析系统_方案A_结构化证据增强版`.
## Structured extraction prompt
Use after `prepare_papers.py` has created section files. The script writes a filled prompt to `prompts/01_structured_extraction_prompt.md`; prefer the filled file over manually copying this template.
```text
你是一位严谨的论文信息抽取器。请严格基于给定论文内容,输出一个 JSON 对象,不要输出 Markdown、不要解释、不要加 ```json。
输出字段必须包含:
{
"title": "",
"task": "",
"background": "",
"problem_statement": "",
"method_name": "",
"method_core": "",
"datasets": [],
"baselines": [],
"metrics": [],
"main_results": [
{"dataset": "", "metric": "", "value": "", "baseline": "", "improvement": ""}
],
"ablations": [],
"limitations": [],
"claims": [],
"contributions": [],
"evidence_spans": [
{"field": "", "claim": "", "evidence": ""}
]
}
规则:
1. 只能写原文出现或可直接推出的信息,不要脑补。
2. 优先使用对应章节抽取信息;如果某章节为空或信息不足,必须从全文补充。
3. datasets、baselines、metrics 不允许因为章节为空而直接写空数组,必须先检查全文 paper_body、实验结果、实现细节、表格附近文本是否存在相关信息。
4. 只有在全文中也确实找不到时,才能写空字符串或空数组。
5. evidence_spans 至少给 6 条,每条都必须是原文中的直接证据片段或非常贴近原文的概括。
6. 数值结果优先来自实验部分、结果部分、分析部分或表格附近文本。
7. JSON 键名保持英文;JSON 中的自然语言内容使用用户选择的语言。
```
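Before saving the reply as `generated/structured_result.json`, it can help to check it against the field list and rule 5 above. The following is a minimal sketch, not part of the bundled scripts: the helper names `parse_structured_output` and `structured_problems` are hypothetical, and the fence stripping assumes the model occasionally wraps its reply in a code fence despite the instruction.

```python
import json

# Field names copied from the prompt template above.
REQUIRED_KEYS = [
    "title", "task", "background", "problem_statement", "method_name",
    "method_core", "datasets", "baselines", "metrics", "main_results",
    "ablations", "limitations", "claims", "contributions", "evidence_spans",
]
MIN_EVIDENCE_SPANS = 6  # rule 5 of the prompt


def parse_structured_output(raw: str) -> dict:
    """Parse the model reply, tolerating a stray Markdown code fence."""
    text = raw.strip()
    if text.startswith("```"):
        # Drop the opening fence line and a trailing closing fence, if any.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rstrip()
        if text.endswith("```"):
            text = text[:-3]
    data = json.loads(text)
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    return data


def structured_problems(data: dict) -> list[str]:
    """Return schema problems; an empty list means the reply passed."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in data]
    spans = data.get("evidence_spans", [])
    if not isinstance(spans, list) or len(spans) < MIN_EVIDENCE_SPANS:
        problems.append(f"evidence_spans has fewer than {MIN_EVIDENCE_SPANS} entries")
    return problems
```

A non-empty problem list is a reasonable trigger for re-sending the prompt rather than rendering a report from incomplete JSON.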
## Verification prompt
Use only after the structured JSON exists. The script writes a filled template to `prompts/02_verification_prompt_template.md`; paste the actual `structured_result.json` into the `{{structured_json}}` placeholder before sending.
```text
你是一位论文事实核验器。请对下面的结构化抽取结果做一致性校验,并输出 JSON 对象,不要输出 Markdown、不要解释。
输出格式:
{
"overall_score": 0,
"hallucination_risk": "low/medium/high",
"issues": [
{"field": "", "problem": "", "severity": "low/medium/high"}
],
"verified_claims": [
{"claim": "", "status": "supported/weak/unsupported", "evidence": ""}
],
"final_verdict": ""
}
评分规则:
- 5:几乎无幻觉,证据充分
- 4:少量不严谨
- 3:有若干缺证据表述
- 2:存在明显不一致
- 1:大量幻觉或错读
要求:
1. JSON 键名保持英文。
2. JSON 中的自然语言内容使用用户选择的语言。
3. 重点检查 datasets、baselines、metrics、main_results 是否遗漏或误抽。
4. 若抽取结果将原文存在的信息写成空数组或空字符串,请在 issues 中明确指出。
5. verified_claims 至少给 4 条。
```
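The output format and requirements above can be checked mechanically before rendering. This is a sketch under assumptions: `verification_problems` is a hypothetical helper, and it assumes the model keeps the enumerated values (`low/medium/high`, `supported/weak/unsupported`) in English even when the free-text fields are in Chinese.

```python
ALLOWED_LEVELS = {"low", "medium", "high"}
ALLOWED_STATUS = {"supported", "weak", "unsupported"}
MIN_VERIFIED_CLAIMS = 4  # requirement 5 of the prompt


def verification_problems(verify: dict) -> list[str]:
    """Return schema problems; an empty list means the reply passed."""
    problems: list[str] = []
    score = verify.get("overall_score")
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(score, bool) or not isinstance(score, (int, float)) or not 1 <= score <= 5:
        problems.append("overall_score must be a number from 1 to 5")
    if verify.get("hallucination_risk") not in ALLOWED_LEVELS:
        problems.append("hallucination_risk must be low, medium, or high")
    for issue in verify.get("issues", []):
        if issue.get("severity") not in ALLOWED_LEVELS:
            problems.append(f"issue on field {issue.get('field', '')!r} has an invalid severity")
    claims = verify.get("verified_claims", [])
    if len(claims) < MIN_VERIFIED_CLAIMS:
        problems.append(f"verified_claims has fewer than {MIN_VERIFIED_CLAIMS} entries")
    for claim in claims:
        if claim.get("status") not in ALLOWED_STATUS:
            problems.append(f"claim {claim.get('claim', '')!r} has an invalid status")
    return problems
```

Note that the template's placeholder value `"overall_score": 0` is itself outside the 1 to 5 scale, so an unfilled template is reported as a problem.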
FILE:references/workflow_mapping.md
# Dify Scheme A to OpenClaw Skill mapping
## Original Dify intent
The uploaded workflow is named `论文分析系统_方案A_结构化证据增强版`. It analyzes one or more uploaded papers and/or paper PDF URLs, extracts structured paper information, performs evidence-focused consistency verification, and exports a Markdown-to-DOCX report.
## Node mapping
| Dify node | Skill implementation |
|---|---|
| `if_empty` + `empty_error` | `prepare_papers.py` exits with `上传的文件和论文URL不能同时为空。` when no inputs are provided. |
| `split_urls` | `prepare_papers.py --urls` accepts repeated values or comma-separated URL lists. |
| `http_download` | `prepare_papers.py` downloads URLs to the batch `downloads/` directory under the Desktop output root. |
| `upload_doc_extract` / `download_doc_extract` | `prepare_papers.py` extracts text from copied local files or downloaded files. |
| `upload_clean` / `download_clean` | `prepare_papers.py` applies the same whitespace normalization and reference/bibliography trimming pattern. |
| `upload_sections` / `download_sections` | `prepare_papers.py` implements the same abstract, intro, method, experiment, conclusion, and `paper_body` section split. |
| `upload_structured` / `download_structured` | The Agent must send `prompts/01_structured_extraction_prompt.md` to the model and save JSON-only output to `generated/structured_result.json`. |
| `upload_verify` / `download_verify` | The Agent must send `prompts/02_verification_prompt_template.md` with actual structured JSON inserted and save JSON-only output to `generated/verification_result.json`. |
| `upload_render` / `download_render` | `render_report.py` aggregates structured and verification JSON into a final report. |
| `upload_docx` / `download_docx` | `render_report.py` writes `.docx`; it also writes `.md` and rendered `.html` for easier Markdown preview. |
## Output directory convention
Default runtime path:
```text
~/Desktop/paper_analysis_results/<YYYYMMDD_HHMMSS>/
```
Per-paper files are placed under:
```text
paper_XX_<filename>/
├── input/
├── text/raw_text.txt
├── text/cleaned_text.txt
├── sections/sections.json
├── prompts/01_structured_extraction_prompt.md
├── prompts/02_verification_prompt_template.md
├── generated/structured_result.json
├── generated/verification_result.json
└── report/
    ├── final_report.md
    ├── final_report.html
    └── final_report.docx
```
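The layout above can be expressed as a small path builder. This is an illustrative sketch only: `batch_dir` and `paper_paths` are hypothetical names, and it assumes `XX` is a two-digit zero-padded index and `<filename>` is the file stem, which the convention above does not spell out.

```python
from __future__ import annotations

from datetime import datetime
from pathlib import Path


def batch_dir(now: datetime, root: Path | None = None) -> Path:
    """Timestamped batch directory under the default Desktop output root."""
    base = root if root is not None else Path.home() / "Desktop" / "paper_analysis_results"
    return base / now.strftime("%Y%m%d_%H%M%S")


def paper_paths(batch: Path, index: int, filename: str) -> dict[str, Path]:
    """Per-paper file locations, following the documented tree."""
    # Assumption: XX is a two-digit zero-padded index and <filename> is the file stem.
    paper = batch / f"paper_{index:02d}_{Path(filename).stem}"
    return {
        "raw_text": paper / "text" / "raw_text.txt",
        "cleaned_text": paper / "text" / "cleaned_text.txt",
        "sections": paper / "sections" / "sections.json",
        "structured_json": paper / "generated" / "structured_result.json",
        "verification_json": paper / "generated" / "verification_result.json",
        "final_markdown": paper / "report" / "final_report.md",
        "final_html": paper / "report" / "final_report.html",
        "final_docx": paper / "report" / "final_report.docx",
    }
```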
FILE:agents/openai.yaml
interface:
display_name: "Paper Analysis Evidence"
short_description: "Extracts evidence-backed structured paper analysis and consistency checks."
icon: "📄"
color: "#FFEAD5"
download arxiv paper pdfs or accept local paper files with a preprocessing script, then extract text, clean text, and generate a summary version, detailed ve...
---
name: paper-summary-scripted
description: download arxiv paper pdfs or accept local paper files with a preprocessing script, then extract text, clean text, and generate a summary version, detailed version, contribution extraction, and a final consistency check in the user's requested language. use when the user wants the same dify-style four-stage paper summarization workflow with deterministic arxiv pdf download, text extraction, and text cleaning before generation.
---
# Scripted Paper Summary Generation
## Overview
Use this skill when arXiv paper URLs or local paper files need deterministic preprocessing before the four-stage paper summarization workflow runs.
The bundled script downloads arXiv PDFs to local storage when URLs are provided, then handles extraction and cleaning.
Do not parse paper web pages or use HTML content as the paper source.
After preprocessing, run three independent generation stages from the same cleaned paper text, then a fourth verification stage that evaluates all three generated outputs against the original text.
## Canonical inputs
Normalize the request into:
- `language`
- `paperurls` for arXiv inputs
- `paperfiles`
Treat empty string, `[]`, `null`, `None`, missing field, or blank list as empty.
## Workflow
1. If both `paperurls` and `paperfiles` are empty, return an error immediately.
2. Run the preprocessing script:
- `python scripts/process_papers.py --language "<language>" --paperurls '<paperurls>' --paperfiles '<paperfiles>' --output-dir ./runs/paper-summary`
3. Read `manifest.json` in the output directory.
4. For each successful item, read the `extracted_text_path` file and treat its contents as `cleaned_text`.
5. Generate these three sections separately from the same `cleaned_text`:
- summary version
- detailed version
- contribution extraction
6. After the three sections are complete, run quality judgment using:
- original cleaned paper text
- summary version
- detailed version
- contribution extraction
7. Merge the outputs using `references/output-template.md`.
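Steps 5 and 6 can be sketched as follows. The `generate` callable is a hypothetical stand-in for whatever sends a filled prompt to the model; the variable names `cleaned_text`, `summary_text`, `detailed_text`, and `contribution_text` follow `references/prompts.md`.

```python
from typing import Callable

GENERATION_STAGES = ("summary", "detailed", "contribution")


def run_stages(cleaned_text: str, language: str, generate: Callable[[str, dict], str]) -> dict[str, str]:
    """Run the three independent stages, then the quality judgment."""
    # Stages 1-3: each sees only the cleaned text, never another stage's output.
    outputs = {
        stage: generate(stage, {"language": language, "cleaned_text": cleaned_text})
        for stage in GENERATION_STAGES
    }
    # Stage 4: quality judgment sees the original text plus all three outputs.
    outputs["quality"] = generate("quality", {
        "cleaned_text": cleaned_text,
        "summary_text": outputs["summary"],
        "detailed_text": outputs["detailed"],
        "contribution_text": outputs["contribution"],
    })
    return outputs
```

Passing each generation stage the same two variables, and nothing else, is what keeps the stages independent.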
## Preprocessing rules
The script does deterministic preprocessing only.
Treat URL inputs as arXiv identifiers, arXiv abstract URLs, or arXiv PDF URLs that must resolve to a PDF download.
Do not attempt webpage parsing, HTML extraction, or generic site scraping.
Do not use the script's previews as a substitute for the full extracted text.
Treat manifest failures, partial extraction notes, or unsupported formats as evidence that the source may be incomplete.
## Generation-stage rules
Consult `references/prompts.md` for the exact Dify-style prompt patterns and variable mapping.
### Summary version
Generate in the requested language.
Must include when available:
- original title
- research background or pain point
- core method name
- at least one key experimental number
If no explicit experimental result is provided in the source, state `原文未提供具体实验数据` or the equivalent in the requested language.
Do not add praise or filler.
### Detailed version
Generate in the requested language.
Use this exact structure:
- `### 1. 背景与动机`
- `### 2. 核心方法`
- `### 3. 实验设置`
- `### 4. 主要结果与消融实验`
- `### 5. 局限性(若有)`
Only include content supported by the extracted text.
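A quick structural check for the detailed version might look like the sketch below. It assumes the five headings are emitted verbatim, each on its own line; if the headings are translated for another output language, pass the translated tuple instead. The name `heading_problems` is hypothetical.

```python
DETAILED_HEADINGS = (
    "### 1. 背景与动机",
    "### 2. 核心方法",
    "### 3. 实验设置",
    "### 4. 主要结果与消融实验",
    "### 5. 局限性(若有)",
)


def heading_problems(text: str, headings: tuple[str, ...] = DETAILED_HEADINGS) -> list[str]:
    """Report headings that are missing or appear before an earlier one."""
    lines = [line.strip() for line in text.splitlines()]
    problems: list[str] = []
    position = -1  # line index of the latest heading seen so far
    for heading in headings:
        if heading not in lines:
            problems.append(f"missing: {heading}")
            continue
        found = lines.index(heading)
        if found < position:
            problems.append(f"out of order: {heading}")
        position = max(position, found)
    return problems
```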
### Contribution extraction
Generate in the requested language.
Each contribution must be an independent innovation point, not an experimental observation.
Each one must include source-grounded support evidence without inventing citations or page numbers.
### Quality judgment
Run this only after the three generated sections exist.
Evaluate summary, detailed, and contribution outputs separately against the original cleaned text.
For each one, provide a 1-5 score and a concrete error list.
## Manifest-aware confidence rules
Downgrade confidence or mention extraction risk when the manifest shows:
- download failure
- arXiv source normalization failure
- partial parsing
- fallback decoding
- missing quantitative evidence
- unreadable PDF files or DOCX parsing problems
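Some of these signals can be read directly from a manifest record. The sketch below is hypothetical (`extraction_risks` is not part of the bundled script): it treats any non-success status as a risk and scans `notes` for the words "fallback" and "partial". The first matches the script's `used fallback text decode for unknown extension` note; the second is an assumed marker. Missing quantitative evidence cannot be detected this way and still needs a read of the text.

```python
RISK_NOTE_MARKERS = ("fallback", "partial")  # "partial" is an assumed marker


def extraction_risks(record: dict) -> list[str]:
    """Collect reasons to downgrade confidence for one manifest record."""
    risks: list[str] = []
    if record.get("status") != "success":
        risks.append(f"preprocessing failed: {record.get('error') or 'unknown error'}")
    for note in record.get("notes", []):
        if any(marker in note.lower() for marker in RISK_NOTE_MARKERS):
            risks.append(f"extraction note: {note}")
    return risks
```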
## Non-negotiable constraints
- Never fabricate paper content missing from the extracted text.
- Keep the three generation stages independent before the quality stage.
- Preserve the requested language.
- Keep different papers separate unless the user explicitly asks for a comparison.
## Resources
- `scripts/process_papers.py`: normalize arXiv inputs, download PDFs or read local files, extract text, clean text, and emit `manifest.json`
- `references/prompts.md`: exact Dify-style prompt logic and variable mapping
- `references/output-template.md`: final response template
- `references/script-usage.md`: script I/O and manifest field definitions
FILE:scripts/process_papers.py
#!/usr/bin/env python3
"""Preprocess arXiv paper URLs and local files for LLM-based summarization workflows.
This script normalizes arXiv inputs, downloads PDFs, extracts text from supported
formats, cleans text, and emits a manifest.json plus per-paper text files.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
import urllib.parse
import urllib.request
import zipfile
from pathlib import Path
from typing import Iterable
import xml.etree.ElementTree as ET
TEXT_EXTENSIONS = {".txt", ".md", ".markdown", ".json", ".csv"}
MAX_PREVIEW_CHARS = 400
ARXIV_HOSTS = {"arxiv.org", "www.arxiv.org"}
ARXIV_ID_PATTERN = re.compile(
r"^(?:arxiv:)?(?P<id>(?:\d{4}\.\d{4,5}|[a-z-]+(?:\.[A-Z]{2})?/\d{7})(?:v\d+)?)$",
re.IGNORECASE,
)
class ProcessingError(Exception):
"""Raised when a source cannot be processed."""
def parse_multi_value(raw: str | None) -> list[str]:
if raw is None:
return []
value = raw.strip()
if not value or value.lower() in {"none", "null", "[]"}:
return []
try:
parsed = json.loads(value)
if isinstance(parsed, list):
return [str(item).strip() for item in parsed if str(item).strip()]
if isinstance(parsed, str) and parsed.strip():
return [parsed.strip()]
except json.JSONDecodeError:
pass
if "\n" in value:
return [item.strip() for item in value.splitlines() if item.strip()]
if "," in value:
return [item.strip() for item in value.split(",") if item.strip()]
return [value]
def safe_slug(text: str, fallback: str) -> str:
slug = re.sub(r"[^a-zA-Z0-9._-]+", "-", text).strip("-._").lower()
return slug[:80] or fallback
def ensure_text(text: str) -> str:
text = text.replace("\x00", " ")
text = re.sub(r"\r\n?", "\n", text)
text = re.sub(r"[\t\f\v]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
text = re.sub(r"[ ]{2,}", " ", text)
return text.strip()
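def read_text_file(path: Path) -> str:
    data = path.read_bytes()
    # Try "utf-8-sig" first so a leading BOM is stripped; it also decodes
    # BOM-less UTF-8. "latin-1" accepts any byte sequence, so it is the last resort.
    for encoding in ("utf-8-sig", "latin-1"):
        try:
            return data.decode(encoding)
        except UnicodeDecodeError:
            continue
    raise ProcessingError(f"cannot decode text file: {path}")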
def extract_docx_text(path: Path) -> str:
try:
with zipfile.ZipFile(path) as zf:
xml_bytes = zf.read("word/document.xml")
except Exception as exc:
raise ProcessingError(f"cannot open docx: {exc}") from exc
try:
root = ET.fromstring(xml_bytes)
except ET.ParseError as exc:
raise ProcessingError(f"invalid docx xml: {exc}") from exc
ns = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
texts = [node.text for node in root.findall(".//w:t", ns) if node.text]
if not texts:
raise ProcessingError("docx contains no extractable text")
return ensure_text(" ".join(texts))
def extract_pdf_text(path: Path) -> str:
    try:
        from pypdf import PdfReader  # type: ignore
    except Exception as exc:
        raise ProcessingError("pdf extraction requires pypdf in the runtime") from exc
    try:
        reader = PdfReader(str(path))
        pieces = [page.extract_text() or "" for page in reader.pages]
    except Exception as exc:
        raise ProcessingError(f"cannot extract pdf text: {exc}") from exc
    # Check for empty output outside the try block so this error is not re-wrapped.
    text = ensure_text("\n\n".join(pieces))
    if not text:
        raise ProcessingError("pdf contains no extractable text")
    return text
def extract_text_from_path(path: Path) -> tuple[str, list[str]]:
notes: list[str] = []
ext = path.suffix.lower()
if ext in TEXT_EXTENSIONS:
return ensure_text(read_text_file(path)), notes
if ext == ".docx":
notes.append("parsed docx via zipped xml extraction")
return extract_docx_text(path), notes
if ext == ".pdf":
notes.append("parsed pdf via pypdf when available")
return extract_pdf_text(path), notes
# Fallback: attempt text decode for local text-like files only.
try:
notes.append("used fallback text decode for unknown extension")
return ensure_text(read_text_file(path)), notes
except ProcessingError as exc:
raise ProcessingError(f"unsupported or unreadable file type {ext or '[no extension]'}: {exc}") from exc
def extract_arxiv_id(source: str) -> tuple[str, list[str]]:
    value = source.strip()
    notes: list[str] = []
    match = ARXIV_ID_PATTERN.fullmatch(value)
    if match:
        notes.append("normalized arxiv id to direct pdf download")
        return match.group("id"), notes
    parsed = urllib.parse.urlparse(value)
    host = parsed.netloc.lower()
    if host not in ARXIV_HOSTS:
        raise ProcessingError("paperurls only supports arXiv IDs or arXiv abs/pdf URLs")
    path = parsed.path.strip("/")
    if not path:
        raise ProcessingError("arXiv URL is missing a paper identifier")
    parts = [part for part in path.split("/") if part]
    if len(parts) < 2:
        raise ProcessingError("unsupported arXiv URL format")
    prefix = parts[0].lower()
    identifier = "/".join(parts[1:])
    if prefix == "pdf":
        # An arXiv PDF URL may omit the ".pdf" suffix; accept both forms.
        if identifier.lower().endswith(".pdf"):
            identifier = identifier[:-4]
    elif prefix == "abs":
        notes.append("normalized arxiv abs URL to direct pdf download")
    else:
        raise ProcessingError("unsupported arXiv URL format")
    if not ARXIV_ID_PATTERN.fullmatch(identifier):
        raise ProcessingError("unsupported or invalid arXiv identifier")
    return identifier, notes
def download_arxiv_pdf(source: str, downloads_dir: Path, index: int) -> tuple[Path, list[str]]:
    arxiv_id, notes = extract_arxiv_id(source)
    url = f"https://arxiv.org/pdf/{arxiv_id}.pdf"
    request = urllib.request.Request(url, headers={"User-Agent": "paper-summary-script/1.0"})
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            content_type = response.headers.get("Content-Type", "").lower()
            data = response.read()
    except Exception as exc:
        raise ProcessingError(f"failed to download arXiv pdf: {exc}") from exc
    # The request URL always ends in ".pdf", so validate the response itself:
    # accept a PDF content type or the "%PDF" magic bytes.
    if "pdf" not in content_type and not data.startswith(b"%PDF"):
        raise ProcessingError("arXiv source did not resolve to a PDF download")
    base = safe_slug(arxiv_id.replace("/", "-"), f"arxiv-{index}")
    target = downloads_dir / f"{index:03d}-{base}.pdf"
    target.write_bytes(data)
    return target, notes
def process_source(index: int, source_type: str, source_label: str, work_dir: Path) -> dict:
downloads_dir = work_dir / "downloads"
texts_dir = work_dir / "texts"
downloads_dir.mkdir(parents=True, exist_ok=True)
texts_dir.mkdir(parents=True, exist_ok=True)
record = {
"index": index,
"source_type": source_type,
"source_label": source_label,
"status": "failed",
"error": None,
"download_path": None,
"extracted_text_path": None,
"characters": 0,
"preview": "",
"notes": [],
}
try:
if source_type == "url":
local_path, download_notes = download_arxiv_pdf(source_label, downloads_dir, index)
record["download_path"] = str(local_path)
record["notes"].extend(download_notes)
else:
local_path = Path(source_label).expanduser().resolve()
if not local_path.exists() or not local_path.is_file():
raise ProcessingError("local file does not exist or is not a file")
record["download_path"] = str(local_path)
text, notes = extract_text_from_path(local_path)
record["notes"].extend(notes)
if not text:
raise ProcessingError("no extractable text found")
label = Path(local_path).stem or f"paper-{index}"
text_path = texts_dir / f"{index:03d}-{safe_slug(label, f'paper-{index}')}.txt"
text_path.write_text(text + "\n", encoding="utf-8")
record["status"] = "success"
record["extracted_text_path"] = str(text_path)
record["characters"] = len(text)
record["preview"] = text[:MAX_PREVIEW_CHARS]
return record
except Exception as exc:
record["error"] = str(exc)
return record
def combine_texts(records: Iterable[dict], output_path: Path) -> None:
chunks: list[str] = []
for record in records:
if record.get("status") != "success":
continue
text_path = record.get("extracted_text_path")
if not text_path:
continue
try:
text = Path(text_path).read_text(encoding="utf-8")
except Exception:
continue
chunks.append(
f"## Paper {record['index']}: {record['source_label']}\n\n{text.strip()}\n"
)
output_path.write_text("\n\n".join(chunks).strip() + "\n", encoding="utf-8")
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Preprocess arXiv paper URLs and local files for summarization workflows.")
parser.add_argument("--language", default="", help="Requested output language.")
parser.add_argument("--paperurls", default="", help="JSON array, newline list, comma list, or single arXiv URL/ID.")
parser.add_argument("--paperfiles", default="", help="JSON array, newline list, comma list, or single file path.")
parser.add_argument("--output-dir", required=True, help="Directory to write manifest and extracted text files.")
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
urls = parse_multi_value(args.paperurls)
files = parse_multi_value(args.paperfiles)
if not urls and not files:
print(json.dumps({
"status": "error",
"message": "paperurls and paperfiles cannot both be empty",
}, ensure_ascii=False, indent=2))
return 1
work_dir = Path(args.output_dir).expanduser().resolve()
work_dir.mkdir(parents=True, exist_ok=True)
records: list[dict] = []
index = 1
for url in urls:
records.append(process_source(index, "url", url, work_dir))
index += 1
for file_path in files:
records.append(process_source(index, "file", file_path, work_dir))
index += 1
manifest = {
"status": "ok",
"language": args.language,
"paperurls_count": len(urls),
"paperfiles_count": len(files),
"success_count": sum(1 for r in records if r["status"] == "success"),
"failure_count": sum(1 for r in records if r["status"] != "success"),
"records": records,
}
manifest_path = work_dir / "manifest.json"
manifest_path.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8")
combine_texts(records, work_dir / "combined_extracted_text.md")
print(json.dumps(manifest, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
sys.exit(main())
FILE:references/output-template.md
# Output template
Repeat this block once per paper.
Use the user's requested language.
## Paper [index]: [original title or source label]
### Input status
- language: [language]
- source type: [url | file]
- source label: [arXiv URL or filename]
- preprocessing status: [success | failed]
- extracted text path: [path or none]
### 摘要版
[summary version]
### 详细版
[detailed version]
### 学术贡献
[contribution extraction]
### 质量判断
#### 1. 摘要版一致性
- 评分:[1-5]
- 错误列表:
- [error or 未发现明显不一致]
#### 2. 详细版一致性
- 评分:[1-5]
- 错误列表:
- [error or 未发现明显不一致]
#### 3. 贡献提炼一致性
- 评分:[1-5]
- 错误列表:
- [error or 未发现明显不一致]
### Missing or uncertain items
- [item]
- [item]
FILE:references/prompts.md
# Prompt mapping
Use these variables conceptually when reproducing the Dify workflow:
- `language`: requested output language
- `cleaned_text`: cleaned paper text extracted from the original source
- `summary_text`: generated summary version
- `detailed_text`: generated detailed version
- `contribution_text`: generated contribution extraction
## Summary version
```text
你是一位严谨的计算机科学审稿人。请严格基于以下论文内容生成“摘要版”。一定要生成{{language}}版。
要求:
1. 必须包含四要素:原版题目、研究背景痛点、核心方法名称、主要实验结果(至少一个关键数字)。
2. 禁止添加原文未提及的评价性词汇(如“卓越的”、“开创性的”),只陈述事实。
3. 如果原文中未明确给出实验结果,请注明“原文未提供具体实验数据”。
4. 不要有“好的,已根据您提供的论文内容,生成**版摘要”这种废话。
论文内容:
{{cleaned_text}}
```
## Detailed version
```text
请生成“详细版”摘要。一定要生成{{language}}版。不要有“好的,已根据您提供的论文内容,生成**版摘要”这种废话。必须使用以下结构:
### 1. 背景与动机
简述研究背景、现有问题、本文目标。
### 2. 核心方法
详细描述模型架构、算法流程、创新点。如有公式或算法伪代码,请简要概括。
### 3. 实验设置
- 数据集:列出所有使用的数据集及其划分方式。
- 基线模型:列出对比的方法。
- 评价指标:使用的指标。
### 4. 主要结果与消融实验
- 主要结果:与基线对比的表格数据(若原文有)。
- 消融实验:关键组件的影响分析(若原文有)。
### 5. 局限性(若有)
若原文讨论了方法的不足,请摘录。
论文内容:
{{cleaned_text}}
```
## Contribution extraction
```text
请提炼本文的“学术贡献”。一定要生成{{language}}版。输出格式如下:
- **贡献 1**:[具体创新点]
支撑依据:[引用原文段落或页码]
- **贡献 2**:[具体创新点]
支撑依据:[引用原文段落或页码]
...
注意:
1. 每个贡献必须是独立的创新点,而非实验观察结果。
2. 支撑依据必须从原文中直接摘录或概括,不能凭空捏造。
3. 不要有“好的,已根据您提供的论文内容,生成**版摘要”这种废话。
论文内容:
{{cleaned_text}}
```
## Quality judgment
```text
请分别检查以下三个生成内容与原文的一致性:
原文:{{cleaned_text}}
1. 摘要版:{{summary_text}}
2. 详细版:{{detailed_text}}
3. 贡献提炼:{{contribution_text}}
对每个输出给出评分(1-5)和错误列表。
```
## Execution rule
The quality judgment stage must be called only after the summary, detailed, and contribution stages have all completed. Its input includes all three generated outputs plus the original cleaned text.
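Filling the `{{...}}` variables in these templates can be sketched as a single-pass substitution. `render_prompt` is a hypothetical helper, not part of the bundled script. A single pass matters because paper text may itself contain `{{...}}` sequences (for example from LaTeX), and chained `str.replace` calls would expand those by mistake.

```python
import re

PLACEHOLDER_PATTERN = re.compile(r"\{\{(\w+)\}\}")


def render_prompt(template: str, variables: dict[str, str]) -> str:
    """Replace each {{name}} in the template exactly once."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"no value for placeholder {name}")
        return variables[name]

    # re.sub scans the template only, so inserted values are never re-expanded.
    return PLACEHOLDER_PATTERN.sub(substitute, template)
```

Raising on an unknown placeholder makes it hard to send the quality judgment prompt before all three generated outputs exist.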
FILE:references/script-usage.md
# Script usage
## Command
```bash
python scripts/process_papers.py --language "zh-CN" --paperurls '["https://arxiv.org/abs/1706.03762"]' --paperfiles '["/absolute/path/to/uploaded-paper.pdf"]' --output-dir ./runs/paper-summary
```
## Accepted input forms
`--paperurls` accepts arXiv IDs, arXiv abstract URLs, and arXiv PDF URLs.
`--paperfiles` accepts local paper file paths.
Both options accept any of these list formats:
- JSON arrays
- newline-separated values
- comma-separated values
- a single raw value
URL inputs are normalized to direct arXiv PDF downloads only.
The script does not parse web pages or use HTML as a paper source.
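When calling the script from code, JSON arrays are the safest of the accepted forms, because the comma-separated fallback would split a file path that itself contains a comma. A minimal sketch of building the argument list, where `build_command` is a hypothetical helper and the flags are the ones documented above:

```python
import json
import sys


def build_command(language: str, urls: list[str], files: list[str], output_dir: str) -> list[str]:
    """Build an argv list that passes both inputs as JSON arrays."""
    return [
        sys.executable, "scripts/process_papers.py",
        "--language", language,
        "--paperurls", json.dumps(urls),
        "--paperfiles", json.dumps(files, ensure_ascii=False),
        "--output-dir", output_dir,
    ]
```

The list can be passed to `subprocess.run` without a shell, which also avoids quoting problems. The script exits with status 1 when both inputs are empty.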
## Output files
The script writes:
- `manifest.json`: machine-readable per-input status
- `combined_extracted_text.md`: merged text for quick inspection
- `texts/*.txt`: one cleaned text file per successful source
- `downloads/*`: raw downloaded arXiv PDFs for URL inputs
## Manifest fields
Each record contains:
- `index`
- `source_type`
- `source_label`
- `status`
- `error`
- `download_path`
- `extracted_text_path`
- `characters`
- `preview`
- `notes`
## How to use the manifest
1. Ignore failed records for content generation.
2. Read each successful `extracted_text_path` as `cleaned_text`.
3. Generate summary, detailed, and contribution outputs independently from `cleaned_text`.
4. Only after all three are complete, run quality judgment with `cleaned_text + summary + detailed + contribution`.
5. Use `preview` only for quick triage, never as a substitute for the full extracted text.
6. Use `notes` to downgrade confidence when extraction is partial or lossy.
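Steps 1, 2, and 5 can be sketched as a small loader. `load_cleaned_texts` is a hypothetical helper; it relies only on the `records`, `status`, `index`, and `extracted_text_path` fields that the script writes.

```python
import json
from pathlib import Path


def load_cleaned_texts(manifest_path: Path) -> dict[int, str]:
    """Map each successful record's index to its full extracted text."""
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    texts: dict[int, str] = {}
    for record in manifest.get("records", []):
        if record.get("status") != "success" or not record.get("extracted_text_path"):
            continue  # step 1: failed records are not used for generation
        # Steps 2 and 5: read the full file, never the `preview` field.
        texts[record["index"]] = Path(record["extracted_text_path"]).read_text(encoding="utf-8")
    return texts
```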
FILE:agents/openai.yaml
interface:
display_name: "带脚本的论文摘要生成"
short_description: "下载 arXiv PDF 并预处理后生成论文摘要"