@clawhub-aaiccee-c7c344280d
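Critical illness claim assessment (supports 28 diseases). Takes structured inpatient medical-record data as input, calls an internal assessment API, and outputs the raw JSON plus a natural-language conclusion (conclusion + evidence).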
---
name: med-critical-disease-review
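description: Critical illness claim assessment (supports 28 diseases). Takes structured inpatient medical-record data as input, calls an internal assessment API, and outputs the raw JSON plus a natural-language conclusion (conclusion + evidence).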
metadata:
  {
    "openclaw":
      {
        "emoji": "🏥"
      }
  }
---
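# Critical Illness Claim Assessment
Overview
----
Given an inpatient medical record (a structured `medicalRecord` object containing diagnoses, clinical documents, etc.), this skill calls the internal assessment service to evaluate the record against **any of the supported critical illnesses** and returns:
- **Final conclusion**: meets / does not meet the claim criteria (from `final_result`)
- **Reasons and evidence**: evidence for each condition, one item per condition (optionally with its `source`)
- **Raw result**: the complete JSON returned by the API (for traceability and further processing)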
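Data Security, Privacy, and Ethics Statement
------------------------
- **Data minimization**: only the data fields required to complete the assessment are processed; identity information unrelated to the assessment is not requested.
- **Strict de-identification**: before anything is sent to a model or API, personally identifiable information (e.g. name, ID number, phone number, detailed address, face/images) is masked or de-identified. Only the de-identified, necessary information is passed on for this skill invocation.
- **No local persistence**: user input and intermediate results are not written to local persistent storage (disk files, databases, or logs). They are held only briefly in memory and **discarded as soon as the invocation ends**.
- **Third-party API risk notice**: when functionality requires it, third-party model/service APIs may be called; in that case only **de-identified, necessary information** is sent, over encrypted transport. It is not used for any purpose other than completing this request (e.g. training, profiling, or marketing).
- **Medical boundary**: the output of this skill is auxiliary information (matching against claim terms/conditions and organizing evidence) and does not constitute a medical diagnosis or treatment advice; for clinical judgments, defer to a licensed physician.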
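Supported Diseases (28)
--------------
- Heart valve surgery: `heart_valve_surgery`
- Aortic surgery: `aortic_surgery`
- Coronary artery bypass grafting: `coronary_artery_bypass`
- Major organ or hematopoietic stem cell transplantation: `major_organ_transplant`
- Malignant tumor (severe): `malignant_tumor`
- Severe chronic renal failure: `severe_chronic_kidney_failure`
- Severe chronic liver failure: `severe_chronic_liver_failure`
- Acute or subacute severe hepatitis: `acute_severe_hepatitis`
- Severe chronic respiratory failure: `severe_chronic_respiratory_failure`
- Severe idiopathic pulmonary arterial hypertension: `severe_idiopathic_pulmonary_hypertension`
- Severe brain injury: `severe_brain_injury`
- Deep coma: `deep_coma`
- Severe sequelae of stroke: `severe_stroke_sequelae`
- Severe Alzheimer's disease: `severe_alzheimers_disease`
- Severe primary Parkinson's disease: `severe_primary_parkinsons_disease`
- Severe motor neuron disease: `severe_motor_neuron_disease`
- Severe sequelae of encephalitis or meningitis: `severe_brain_encephalitis_sequelae`
- Severe non-malignant intracranial tumor: `severe_non_malignant_intracranial_tumor`
- Paralysis: `paralysis`
- Bilateral deafness: `bilateral_deafness`
- Bilateral blindness: `bilateral_blindness`
- Loss of speech ability: `language_ability_loss`
- Severe Crohn's disease: `severe_crohn_disease`
- Severe ulcerative colitis: `severe_ulcerative_colitis`
- Severe aplastic anemia: `severe_aplastic_anemia`
- Moderately severe acute myocardial infarction: `moderate_acute_myocardial_infarction`
- Severe third-degree burns: `severe_third_degree_burn`
- Loss of multiple limbs: `multiple_limb_loss`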
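Input Format
--------
The request is JSON (see `../data/med-major-disease-assess/req_data.json` for an example) and must contain at least:
- `medicalRecord`: the structured medical-record object
- `mainDiagName` / `otherDiagName`: diagnosis information (either plain strings or JSON-encoded strings)
- `docs`: the list of clinical documents, located at `medicalRecord.docs` (each doc contains at least `docType` and a text field such as `format_page_text`)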
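Minimal Validation (validate first, then assess)
--------------------------
Before calling the assessment API, the script runs a minimal structural check on the input; the API is called only if the check passes:
- The request body must be a JSON object
- It must contain `medicalRecord` (an object)
- `medicalRecord.docs` must be a non-empty array
- At least one item in `docs` must have a non-empty `docType`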
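A minimal sketch of a request body that passes the four checks above. The helper name `check_minimal_payload` and all field values are illustrative placeholders; the checks mirror `validate_payload` in `scripts/major_disease_assess.py`, but placing `mainDiagName`/`otherDiagName` inside `medicalRecord` is an assumption, and the real service may require more fields.

```python
from typing import Any, Dict


def check_minimal_payload(payload: Any) -> None:
    """Hypothetical helper mirroring the minimal checks listed above."""
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    record = payload.get("medicalRecord")
    if not isinstance(record, dict):
        raise ValueError("Missing or invalid key: medicalRecord (object).")
    docs = record.get("docs")
    if not isinstance(docs, list) or not docs:
        raise ValueError("medicalRecord.docs must be a non-empty list.")
    if not any(isinstance(d, dict) and d.get("docType") for d in docs):
        raise ValueError("medicalRecord.docs must contain at least one item with docType.")


# Placeholder payload (assumed layout; values are not real data).
payload: Dict[str, Any] = {
    "medicalRecord": {
        "mainDiagName": "...",
        "otherDiagName": "[]",
        "docs": [{"docType": "出院记录", "format_page_text": "..."}],
    }
}

check_minimal_payload(payload)  # passes silently

try:
    check_minimal_payload({"medicalRecord": {"docs": []}})
except ValueError as e:
    print(e)  # medicalRecord.docs must be a non-empty list.
```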
Backend API
--------
- HTTP API (as called by `scripts/major_disease_assess.py`): `POST https://shangbao.yunzhisheng.cn/skills/critical-disease/api/v1/assessment/assess/{disease}?model_type=qwq`
- Content-Type: `application/json`
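The endpoint URL is assembled from the base path, the disease identifier, and the `model_type` query parameter. The sketch below reproduces that construction with the standard library; `BASE_URL` is copied from `scripts/major_disease_assess.py`, and `build_assess_url` is a hypothetical helper name.

```python
import urllib.parse

# Copied from scripts/major_disease_assess.py.
BASE_URL = "https://shangbao.yunzhisheng.cn/skills/critical-disease/api/v1/assessment/assess"


def build_assess_url(disease: str, model_type: str = "qwq") -> str:
    """Return the POST target /assess/{disease}?model_type=..."""
    qs = urllib.parse.urlencode({"model_type": model_type})
    return f"{BASE_URL}/{urllib.parse.quote(disease.strip())}?{qs}"


print(build_assess_url("aortic_surgery"))
# https://shangbao.yunzhisheng.cn/skills/critical-disease/api/v1/assessment/assess/aortic_surgery?model_type=qwq
```

The request body is then sent to this URL as UTF-8 JSON with `Content-Type: application/json`.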
Quick Start
--------
```bash
# Run from this skill's directory
python3 scripts/major_disease_assess.py \
--disease aortic_surgery \
--input ../data/med-major-disease-assess/req_data.json
```
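Parameters
--------
- `--disease STRING`
  - Disease type, one of the 28 supported identifiers listed above (e.g. `aortic_surgery`, `heart_valve_surgery`).
- `--input PATH`
  - Path to the input request JSON (UTF-8).
- `--output-json PATH`
  - Where to save the raw JSON returned by the API (default: `../runs/med-major-disease-assess/{disease}_resp.json`).
- `--output-text PATH`
  - Where to save the natural-language conclusion (default: `../runs/med-major-disease-assess/{disease}_resp.txt`).
- `--model-type STRING`
  - Value of the `model_type` query parameter (default: `qwq`).
- `--timeout SECONDS`
  - HTTP timeout in seconds (default: 60).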
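Output Conventions
--------
- Parent directories of the output paths are created automatically if they do not exist.
- By default, the natural-language output contains:
  - the conclusion (meets / does not meet)
  - the reason (`final_result.reason`)
  - the evidence for each condition (`conditions[*].evidence`, together with `source`/`description`)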
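To make that mapping concrete, here is a hypothetical response (all values invented for illustration) and a condensed sketch of the layout that `build_natural_language` in `scripts/format_assessment_nl.py` produces. `summarize` is a hypothetical name; the real service may return more fields than the ones shown.

```python
from typing import Any, Dict, List

# Hypothetical response: only the fields the formatter reads are shown.
resp: Dict[str, Any] = {
    "success": True,
    "result": {
        "criteria_name": "主动脉手术",
        "final_result": {"satisfied": True, "reason": "示例原因"},
        "conditions": [
            {"satisfied": True, "description": "示例条件", "evidence": "示例证据", "source": "出院记录"},
        ],
    },
}


def summarize(resp: Dict[str, Any]) -> str:
    """Condensed sketch of the conclusion + per-condition evidence layout."""
    if not resp.get("success"):
        return f"评估失败:{resp.get('error')}"
    result = resp.get("result") or {}
    name = result.get("criteria_name") or "评估项目"
    final = result.get("final_result") or {}
    satisfied = final.get("satisfied")
    if satisfied is True:
        text = f"结论:符合{name}理赔条件。"
    elif satisfied is False:
        text = f"结论:不符合{name}理赔条件。"
    else:
        text = f"结论:{name}评估结果未知。"
    reason = str(final.get("reason") or "").strip()
    if reason:
        text += f"原因:{reason}。"
    lines: List[str] = []
    for idx, c in enumerate(result.get("conditions") or [], start=1):
        if not isinstance(c, dict):
            continue
        flag = c.get("satisfied")
        status = "满足" if flag is True else ("不满足" if flag is False else "未知")
        parts = [str(c["description"])] if c.get("description") else []
        if c.get("evidence"):
            parts.append(f"证据:{c['evidence']}")
        if c.get("source"):
            parts.append(f"来源:{c['source']}")
        lines.append(f"{idx}){status}" + ("," + ";".join(parts) if parts else ""))
    return text + ("\n依据:\n" + "\n".join(lines) if lines else "")


print(summarize(resp))
```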
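Notes
----
- **Packaging constraint**: sample inputs, run outputs, and self-test scripts live outside the skill package (in `../data/`, `../runs/`, and `../self_tests/` respectively); the skill directory contains only the publishable core files (`scripts/`, `SKILL.md`, `_meta.json`).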
FILE:_meta.json
{
"ownerId": "kn76wejkeqxfc03j0rfxp2jaj982m7aC",
"slug": "critical-disease-review",
"version": "1.0.0",
"publishedAt": 1773287610299
}
FILE:scripts/major_disease_assess.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict
import urllib.error
import urllib.parse
import urllib.request

from format_assessment_nl import build_natural_language

# Assessment endpoint; the disease identifier is appended as the last path segment.
BASE_URL = "https://shangbao.yunzhisheng.cn/skills/critical-disease/api/v1/assessment/assess"

# Disease identifiers accepted by /assess/{disease} (28 in total).
SUPPORTED_DISEASES = {
    "heart_valve_surgery",
    "aortic_surgery",
    "coronary_artery_bypass",
    "major_organ_transplant",
    "malignant_tumor",
    "severe_chronic_kidney_failure",
    "severe_chronic_liver_failure",
    "acute_severe_hepatitis",
    "severe_chronic_respiratory_failure",
    "severe_idiopathic_pulmonary_hypertension",
    "severe_brain_injury",
    "deep_coma",
    "severe_stroke_sequelae",
    "severe_alzheimers_disease",
    "severe_primary_parkinsons_disease",
    "severe_motor_neuron_disease",
    "severe_brain_encephalitis_sequelae",
    "severe_non_malignant_intracranial_tumor",
    "paralysis",
    "bilateral_deafness",
    "bilateral_blindness",
    "language_ability_loss",
    "severe_crohn_disease",
    "severe_ulcerative_colitis",
    "severe_aplastic_anemia",
    "moderate_acute_myocardial_infarction",
    "severe_third_degree_burn",
    "multiple_limb_loss",
}


def validate_payload(payload: Dict[str, Any]) -> None:
    """Minimal structural check of the request body; raises ValueError before any network call."""
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    if "medicalRecord" not in payload or not isinstance(payload["medicalRecord"], dict):
        raise ValueError("Missing or invalid key: medicalRecord (object).")
    mr = payload["medicalRecord"]
    docs = mr.get("docs")
    if not isinstance(docs, list) or len(docs) == 0:
        raise ValueError("medicalRecord.docs must be a non-empty list.")
    has_doc_type = any(isinstance(d, dict) and d.get("docType") for d in docs)
    if not has_doc_type:
        raise ValueError("medicalRecord.docs must contain at least one item with docType.")


def call_major_disease_assess(
    disease: str,
    payload: Dict[str, Any],
    *,
    model_type: str = "qwq",
    timeout: int = 60,
) -> Dict[str, Any]:
    """POST the payload to /assess/{disease} and return the parsed JSON response."""
    disease = (disease or "").strip()
    if not disease:
        raise ValueError("disease is required, e.g. aortic_surgery")
    if disease not in SUPPORTED_DISEASES:
        raise ValueError(f"Unsupported disease: {disease}. Supported: {sorted(SUPPORTED_DISEASES)}")
    validate_payload(payload)
    qs = urllib.parse.urlencode({"model_type": model_type})
    url = f"{BASE_URL}/{urllib.parse.quote(disease)}?{qs}"
    data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(url=url, data=data, method="POST", headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read().decode("utf-8", errors="replace")
            return json.loads(body)
    except urllib.error.HTTPError as e:
        # HTTPError is a subclass of URLError, so it must be caught first.
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {e.code}: {detail}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"Network error: {e}") from e


def main() -> int:
    parser = argparse.ArgumentParser(description="Major disease (重大疾病) assessment via /assess/{disease}.")
    parser.add_argument("--disease", required=True, help="Disease type, e.g. aortic_surgery, heart_valve_surgery.")
    parser.add_argument("--input", required=True, help="Path to request JSON.")
    parser.add_argument(
        "--output-json",
        default="",
        help="Path to save raw response JSON (default: ../runs/med-major-disease-assess/{disease}_resp.json).",
    )
    parser.add_argument(
        "--output-text",
        default="",
        help="Path to save natural language summary (default: ../runs/med-major-disease-assess/{disease}_resp.txt).",
    )
    parser.add_argument("--model-type", default="qwq", help="Model type query param (default: qwq).")
    parser.add_argument("--timeout", type=int, default=60, help="HTTP timeout seconds (default: 60).")
    args = parser.parse_args()

    try:
        # Reading the input inside the try block reports a missing file or bad JSON cleanly.
        in_path = Path(args.input)
        if not in_path.exists():
            raise FileNotFoundError(f"Input file not found: {in_path}")
        payload = json.loads(in_path.read_text(encoding="utf-8"))
        # Validate first, then assess.
        validate_payload(payload)
        resp = call_major_disease_assess(args.disease, payload, model_type=args.model_type, timeout=args.timeout)
    except Exception as e:
        print(f"✗ Error: {e}", file=sys.stderr)
        return 1

    default_base = Path("../runs/med-major-disease-assess")
    out_json = Path(args.output_json) if args.output_json else (default_base / f"{args.disease}_resp.json")
    out_text = Path(args.output_text) if args.output_text else (default_base / f"{args.disease}_resp.txt")
    out_json.parent.mkdir(parents=True, exist_ok=True)
    out_json.write_text(json.dumps(resp, ensure_ascii=False, indent=2), encoding="utf-8")

    text = build_natural_language(resp)
    out_text.parent.mkdir(parents=True, exist_ok=True)
    out_text.write_text(text, encoding="utf-8")

    print(f"✓ Saved raw JSON to: {out_json}")
    print(f"✓ Saved natural language to: {out_text}")
    print("\n--- Natural language preview ---")
    print(text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/format_assessment_nl.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
from pathlib import Path
from typing import Any, Dict, List


def _get(d: Dict[str, Any], path: List[str], default: Any = None) -> Any:
    """Read a nested key path; return default if any level is missing or not a dict."""
    cur: Any = d
    for k in path:
        if not isinstance(cur, dict) or k not in cur:
            return default
        cur = cur[k]
    return cur


def build_natural_language(resp: Dict[str, Any]) -> str:
    """
    Turn an assessment API response (aortic_surgery, etc.) into natural-language text.
    Fields used:
    - final_result (satisfied + reason)
    - conditions[*].satisfied / description / evidence / source
    """
    if not resp.get("success"):
        err = resp.get("error") or _get(resp, ["result", "error"]) or "接口返回 success=false"
        return f"评估失败:{err}"

    result = resp.get("result") or {}
    criteria_name = result.get("criteria_name") or result.get("disease_type") or "评估项目"
    final_satisfied = _get(result, ["final_result", "satisfied"], None)
    # str() guards against non-string values before calling .strip().
    final_reason = str(_get(result, ["final_result", "reason"], "") or "")
    if final_satisfied is True:
        conclusion = f"结论:符合{criteria_name}理赔条件。"
    elif final_satisfied is False:
        conclusion = f"结论:不符合{criteria_name}理赔条件。"
    else:
        conclusion = f"结论:{criteria_name}评估结果未知。"
    if final_reason.strip():
        conclusion += f"原因:{final_reason.strip()}。"

    conditions = result.get("conditions") or []
    evidences: List[str] = []
    for idx, c in enumerate(conditions, start=1):
        if not isinstance(c, dict):
            continue
        satisfied = c.get("satisfied")
        evidence = str(c.get("evidence") or "").strip()
        source = str(c.get("source") or "").strip()
        description = str(c.get("description") or "").strip()
        prefix = f"{idx})"
        status = "满足" if satisfied is True else ("不满足" if satisfied is False else "未知")
        parts: List[str] = []
        if description:
            parts.append(description)
        if evidence:
            parts.append(f"证据:{evidence}")
        if source:
            parts.append(f"来源:{source}")
        if parts:
            evidences.append(f"{prefix}{status}," + ";".join(parts))
        else:
            evidences.append(f"{prefix}{status}")

    if evidences:
        return conclusion + "\n依据:\n" + "\n".join(evidences)
    return conclusion


def main() -> int:
    parser = argparse.ArgumentParser(description="Format assessment response to natural language.")
    parser.add_argument("--input", required=True, help="Path to assessment response JSON.")
    parser.add_argument("--output", default="", help="Optional output text file path.")
    args = parser.parse_args()
    in_path = Path(args.input)
    resp = json.loads(in_path.read_text(encoding="utf-8"))
    text = build_natural_language(resp)
    if args.output:
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(text, encoding="utf-8")
    else:
        print(text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
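Outpatient chronic disease review (diabetes/hypertension). Takes a JSON array of OCR results as input and outputs the review conclusion and reasons (raw JSON + natural-language conclusion).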
---
name: med-chronic-disease-review
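description: Outpatient chronic disease review (diabetes/hypertension). Takes a JSON array of OCR results as input and outputs the review conclusion and reasons (raw JSON + natural-language conclusion).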
metadata:
  {
    "openclaw":
      {
        "emoji": "🩺"
      }
  }
---
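# Outpatient Chronic Disease Review
Overview
----
Given an array of OCR results (each item contains `fileName/page/docType/ocrText`), this skill:
- outputs the raw JSON returned by the review API
- outputs a natural-language summary (conclusion + reasons)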
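Data Security, Privacy, and Ethics Statement
------------------------
- **Data minimization**: only the text content required for the review is processed; identity information unrelated to the review is neither requested nor encouraged.
- **Strict de-identification**: before anything is sent to a model or API, personally identifiable information (e.g. name, ID number, phone number, detailed address, face/images) is masked or de-identified. Only the de-identified, necessary information is passed on for this skill invocation.
- **No local persistence**: user input and intermediate results are not written to local persistent storage (disk files, databases, or logs). They are held only briefly in memory and **discarded as soon as the invocation ends**.
- **Third-party API risk notice**: when functionality requires it, third-party model/service APIs may be called; in that case only **de-identified, necessary information** is sent, over encrypted transport. It is not used for any purpose other than completing this request (e.g. training, profiling, or marketing).
- **Medical boundary**: the output of this skill is auxiliary information (review-rule matching and a summary of reasons) and does not constitute a medical diagnosis or treatment advice; for clinical judgments, defer to a licensed physician.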
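Input Format
--------
The input must be a non-empty JSON array (list) in which every item is an object with a non-empty `ocrText`; `page`, if present, must be an integer. Example: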
```json
[
{"fileName":"xxx.pdf","page":1,"docType":"出院记录","ocrText":"..."},
{"fileName":"xxx.pdf","page":2,"docType":"检验报告","ocrText":"..."}
]
```
Quick Start
--------
Run from the `skills` directory:
```bash
# Diabetes
python3 scripts/chronic_disease_review.py \
--disease-code diabetes \
--input data/med-chronic-disease-review/糖尿病_ocr_task_result.json
# Hypertension
python3 scripts/chronic_disease_review.py \
--disease-code hypertension \
--input data/med-chronic-disease-review/高血压_ocr_task_result.json
```
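Parameters
--------
- `--disease-code STRING`: **Required.** 糖尿病 (diabetes) or 高血压 (hypertension); the aliases `diabetes/hypertension/dm/htn` are also accepted. Each invocation of this skill reviews exactly one disease.
- `--review-type STRING`: Optional. Defaults to `慢病审核` (chronic disease review); other values supported by the service, such as `大病审核` (critical illness review), may also be passed.
- `--input PATH`: OCR array JSON (UTF-8).
- `--base URL`: backend base URL (default: `https://shangbao.yunzhisheng.cn/skills/chronic-disease`).
- `--llm-model STRING`: Optional. Model name, sent as `llm_model`.
- `--timeout SECONDS`: HTTP timeout; `0` means wait indefinitely (default: 0).
- `--output-json PATH`: where to save the raw JSON response (default: `../runs/med-chronic-disease-review/{label}_resp.json`, where `{label}` is `diabetes` or `hypertension`).
- `--output-text PATH`: where to save the natural-language summary (default: `../runs/med-chronic-disease-review/{label}_resp.txt`).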
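The aliases above are resolved to the Chinese `disease_code` before the request is sent. A minimal sketch of the resulting request body, mirroring `_resolve_disease_code` and `main()` in `scripts/chronic_disease_review.py`; `build_review_body` is a hypothetical helper name, and the OCR item is placeholder data.

```python
from typing import Any, Dict, List, Optional

# Same alias table as scripts/chronic_disease_review.py.
DISEASE_CODE_ALIASES: Dict[str, str] = {
    "diabetes": "糖尿病", "dm": "糖尿病", "糖尿病": "糖尿病",
    "hypertension": "高血压", "htn": "高血压", "高血压": "高血压",
}


def build_review_body(
    disease: str,
    ocr_data: List[Dict[str, Any]],
    review_type: str = "慢病审核",
    llm_model: Optional[str] = None,
) -> Dict[str, Any]:
    """Build the JSON body for POST {base}/api/v1/review/flow/by-ocr."""
    code = disease.strip()
    if not code:
        raise ValueError("--disease-code is required")
    body: Dict[str, Any] = {
        "review_type": review_type or "慢病审核",
        "ocr_data": ocr_data,
        # Unknown values are passed through unchanged, as in the script.
        "disease_code": DISEASE_CODE_ALIASES.get(code, code),
    }
    if llm_model:
        body["llm_model"] = llm_model
    return body


ocr = [{"fileName": "xxx.pdf", "page": 1, "docType": "出院记录", "ocrText": "..."}]
print(build_review_body("htn", ocr)["disease_code"])  # 高血压
```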
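Output Conventions
--------
- Parent directories of the output paths are created automatically if they do not exist.
- Key fields in the natural-language summary:
  - `final_decision`: the verdict (通过 pass / 不通过 fail)
  - `reasoning`: the reasons (optional)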
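Because the response may carry one or two entries in `results`, a caller that only needs the per-disease verdict can reduce it as sketched below. The response dict is a hypothetical example restricted to fields named in this document, and `decisions_by_disease` is a hypothetical helper.

```python
from typing import Any, Dict


def decisions_by_disease(resp: Dict[str, Any]) -> Dict[str, Any]:
    """Map disease_code -> final_decision for each entry in resp["results"]."""
    out: Dict[str, Any] = {}
    for r in resp.get("results") or []:
        if isinstance(r, dict) and r.get("disease_code"):
            out[r["disease_code"]] = r.get("final_decision")
    return out


resp = {
    "results": [
        {"disease_code": "糖尿病", "review_type": "慢病审核", "final_decision": "通过", "reasoning": "..."},
    ]
}
print(decisions_by_disease(resp))  # {'糖尿病': '通过'}
```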
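Notes
----
- **Packaging constraint**: sample inputs, run outputs, and self-test scripts live outside the skill package (in `../data/`, `../runs/`, and `../self_tests/` respectively); the skill directory contains only the publishable core files (`scripts/`, `SKILL.md`, `_meta.json`).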
FILE:_meta.json
{
"ownerId": "kn76wejkeqxfc03j0rfxp2jaj982m7aa",
"slug": "med-chronic-disease-review",
"version": "1.0.0",
"publishedAt": 0
}
FILE:scripts/format_review_nl.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
from pathlib import Path
from typing import Any, Dict, List


def _as_text(x: Any) -> str:
    """Render any value as text: None -> "", str unchanged, everything else as JSON."""
    if x is None:
        return ""
    if isinstance(x, str):
        return x
    try:
        return json.dumps(x, ensure_ascii=False)
    except Exception:
        return str(x)


def build_natural_language(resp: Dict[str, Any]) -> str:
    """
    Assemble the response of /api/v1/review/flow/by-ocr into a natural-language summary.
    The response usually contains:
    - results: an array (1 or 2 entries), each with
      - disease_code / review_type / scenario_code / scenario_id
      - flow_id / flow_name / flow_version
      - final_decision / reasoning
    """
    results = resp.get("results")
    if isinstance(results, list) and results:
        blocks: List[str] = []
        for r in results:
            if not isinstance(r, dict):
                continue
            disease_code = r.get("disease_code")
            review_type = r.get("review_type")
            scenario_code = r.get("scenario_code")
            scenario_id = r.get("scenario_id")
            flow_id = r.get("flow_id")
            flow_name = r.get("flow_name")
            flow_version = r.get("flow_version")
            final_decision = r.get("final_decision")
            reasoning = r.get("reasoning")

            lines: List[str] = []
            title = " / ".join([x for x in [_as_text(disease_code), _as_text(review_type)] if x])
            if title:
                lines.append(title)
            if final_decision is not None:
                lines.append(f"结论:{_as_text(final_decision)}")
            meta = []
            if scenario_code:
                meta.append(f"scenario_code={_as_text(scenario_code)}")
            if scenario_id is not None:
                meta.append(f"scenario_id={_as_text(scenario_id)}")
            if flow_id is not None:
                meta.append(f"flow_id={_as_text(flow_id)}")
            if flow_name:
                meta.append(f"flow_name={_as_text(flow_name)}")
            if flow_version:
                meta.append(f"flow_version={_as_text(flow_version)}")
            if meta:
                lines.append("元信息:" + ",".join(meta))
            if reasoning:
                lines.append("原因:" + _as_text(reasoning))
            blocks.append("\n".join(lines).strip())
        if blocks:
            return "\n\n---\n\n".join(blocks)

    if resp.get("success") is False:
        return "审核失败:" + _as_text(resp.get("error") or resp)
    return "已返回审核结果,但未找到 results 字段。原始返回请查看保存的 JSON。"


def main() -> int:
    parser = argparse.ArgumentParser(description="Format /review/flow response JSON to natural language.")
    parser.add_argument("--input", required=True, help="Path to response JSON.")
    parser.add_argument("--output", default="", help="Optional output text file path.")
    args = parser.parse_args()
    in_path = Path(args.input)
    resp = json.loads(in_path.read_text(encoding="utf-8"))
    text = build_natural_language(resp)
    if args.output:
        out_path = Path(args.output)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(text, encoding="utf-8")
    else:
        print(text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/chronic_disease_review.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.error
import urllib.request

from format_review_nl import build_natural_language

DEFAULT_BASE = "https://shangbao.yunzhisheng.cn/skills/chronic-disease"

# Accepted --disease-code values mapped to the Chinese disease_code expected by the API.
DISEASE_CODE_ALIASES: Dict[str, str] = {
    "diabetes": "糖尿病",
    "dm": "糖尿病",
    "糖尿病": "糖尿病",
    "hypertension": "高血压",
    "htn": "高血压",
    "高血压": "高血压",
}


def _http_json(method: str, url: str, *, payload: Optional[Dict[str, Any]] = None, timeout: int = 0) -> Any:
    """Send a request with an optional JSON body; return the parsed JSON (None for an empty body)."""
    data: Optional[bytes] = None
    headers: Dict[str, str] = {}
    if payload is not None:
        data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url=url, data=data, method=method, headers=headers)
    try:
        # timeout <= 0: no per-request timeout.
        if timeout and timeout > 0:
            resp_ctx = urllib.request.urlopen(req, timeout=timeout)
        else:
            resp_ctx = urllib.request.urlopen(req)
        with resp_ctx as resp:
            body = resp.read().decode("utf-8", errors="replace")
            return json.loads(body) if body.strip() else None
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {e.code}: {detail}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"Network error: {e}") from e


def validate_ocr_data(ocr_data: Any) -> List[Dict[str, Any]]:
    """Require a non-empty list of objects, each with ocrText (and an int page when provided)."""
    if not isinstance(ocr_data, list) or len(ocr_data) == 0:
        raise ValueError("OCR input must be a non-empty JSON array (list).")
    out: List[Dict[str, Any]] = []
    for i, item in enumerate(ocr_data):
        if not isinstance(item, dict):
            raise ValueError(f"OCR item #{i} must be an object.")
        if not item.get("ocrText"):
            raise ValueError(f"OCR item #{i} missing required field: ocrText")
        if "page" in item and not isinstance(item["page"], int):
            raise ValueError(f"OCR item #{i} field page must be int when provided.")
        out.append(item)
    return out


def call_review_by_ocr(base: str, req_body: Dict[str, Any], *, timeout: int = 0) -> Dict[str, Any]:
    url = f"{base.rstrip('/')}/api/v1/review/flow/by-ocr"
    resp = _http_json("POST", url, payload=req_body, timeout=timeout)
    if not isinstance(resp, dict):
        raise RuntimeError(f"Invalid response from /review/flow/by-ocr: {resp}")
    return resp


def _resolve_disease_code(args_disease_code: str) -> Optional[str]:
    """
    Normalize disease_code:
    - empty: return None (disease_code omitted; the API would then review both
      diseases, but main() rejects this because the skill reviews one per call)
    - aliases such as diabetes/hypertension: map to the Chinese disease_code
    - Chinese 糖尿病/高血压: returned unchanged
    """
    s = (args_disease_code or "").strip()
    if not s:
        return None
    if s in DISEASE_CODE_ALIASES:
        return DISEASE_CODE_ALIASES[s]
    return s


def _infer_label(disease_code: Optional[str]) -> str:
    """ASCII label used in the default output file names."""
    if disease_code == "糖尿病":
        return "diabetes"
    if disease_code == "高血压":
        return "hypertension"
    return "by_ocr"


def _read_ocr_array(path: str) -> List[Dict[str, Any]]:
    """
    Supports two usages:
    - an existing path
    - a bare file name: if it is not found, fall back to
      skills/data/med-chronic-disease-review/<name>
    """
    p = Path(path)
    if not p.exists() and not p.is_absolute():
        fallback = Path(__file__).resolve().parents[2] / "data" / "med-chronic-disease-review" / p.name
        if fallback.exists():
            p = fallback
    raw = json.loads(p.read_text(encoding="utf-8"))
    return validate_ocr_data(raw)


def main() -> int:
    parser = argparse.ArgumentParser(description="Chronic disease review via POST /api/v1/review/flow/by-ocr.")
    parser.add_argument(
        "--disease-code",
        default="",
        help="Required disease_code: 糖尿病/高血压. Also supports aliases: diabetes/hypertension/dm/htn.",
    )
    parser.add_argument("--review-type", default="慢病审核", help="review_type (default: 慢病审核)")
    parser.add_argument("--input", required=True, help="Path to OCR array JSON (list).")
    parser.add_argument("--base", default=DEFAULT_BASE, help=f"Service base URL (default: {DEFAULT_BASE})")
    parser.add_argument("--llm-model", default="", help="Optional llm_model.")
    parser.add_argument("--timeout", type=int, default=0, help="HTTP timeout seconds. 0 means wait forever (default: 0).")
    parser.add_argument("--output-json", default="", help="Path to save raw response JSON.")
    parser.add_argument("--output-text", default="", help="Path to save natural language summary.")
    args = parser.parse_args()

    try:
        disease_code = _resolve_disease_code(args.disease_code)
        if disease_code is None:
            raise ValueError("--disease-code is required (糖尿病/高血压 or diabetes/hypertension/dm/htn).")
        ocr_data = _read_ocr_array(args.input)
        req_body: Dict[str, Any] = {
            "review_type": (args.review_type or "慢病审核"),
            "ocr_data": ocr_data,
            "disease_code": disease_code,
        }
        if args.llm_model:
            req_body["llm_model"] = args.llm_model
        resp = call_review_by_ocr(args.base, req_body, timeout=args.timeout)
    except Exception as e:
        print(f"✗ Error: {e}", file=sys.stderr)
        return 1

    label = _infer_label(disease_code)
    default_base = Path("../runs/med-chronic-disease-review")
    out_json = Path(args.output_json) if args.output_json else (default_base / f"{label}_resp.json")
    out_text = Path(args.output_text) if args.output_text else (default_base / f"{label}_resp.txt")
    out_json.parent.mkdir(parents=True, exist_ok=True)
    out_json.write_text(json.dumps(resp, ensure_ascii=False, indent=2), encoding="utf-8")

    text = build_natural_language(resp)
    out_text.parent.mkdir(parents=True, exist_ok=True)
    out_text.write_text(text, encoding="utf-8")

    print(f"✓ Saved raw JSON to: {out_json}")
    print(f"✓ Saved natural language to: {out_text}")
    print("\n--- Natural language preview ---")
    print(text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
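Structures Chinese outpatient follow-up medical-record text into fine-grained fields and outputs JSON (e.g. history of present illness, past history, diagnosis, treatment plan).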
---
name: med-followup-record-struct
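description: Structures Chinese outpatient follow-up medical-record text into fine-grained fields and outputs JSON (e.g. history of present illness, past history, diagnosis, treatment plan).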
metadata:
  {
    "openclaw":
      {
        "emoji": "🧩"
      }
  }
---
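# Outpatient Follow-up Record Structuring
Overview
----
Given a Chinese outpatient **follow-up record text** (already written by the physician), this skill extracts and normalizes it into fine-grained JSON fields, for example:
- `现病史.病情概述` (history of present illness: condition summary)
- `现病史.药物` (history of present illness: medications)
- `现病史.其他治疗措施` (history of present illness: other treatments)
- `现病史.病情转归` (history of present illness: course and outcome)
- `现病史.一般情况` (history of present illness: general condition)
- `既往史.疾病` (past history: diseases)
- `既往史.其他信息` (past history: other information)
- `既往史.手术史` (past history: surgical history)
- `既往史.过敏史` (past history: allergies)
- `既往史.输血史` (past history: transfusion history)
- `婚育史` (marital and reproductive history)
- `月经史` (menstrual history)
- `个人史` (personal history)
- `家族史` (family history)
- `查体` (physical examination)
- `辅助检查` (ancillary tests)
- `诊断` (diagnosis)
- `处理意见.药物` (treatment plan: medications)
- `处理意见.其他建议` (treatment plan: other recommendations)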
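The output keys use a flat `section.subfield` naming. If a nested view is more convenient downstream, the dotted keys can be split on the first `.`, as in this sketch; `nest_fields` is a hypothetical helper and the sample values are placeholders. It assumes no key is used both as a plain field and as a section prefix, which holds for the keys listed above.

```python
from typing import Any, Dict


def nest_fields(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Turn {"现病史.药物": v} into {"现病史": {"药物": v}}; keys without '.' stay top-level."""
    nested: Dict[str, Any] = {}
    for key, value in flat.items():
        section, sep, sub = key.partition(".")
        if sep:
            nested.setdefault(section, {})[sub] = value
        else:
            nested[key] = value
    return nested


flat = {"现病史.药物": "...", "现病史.病情转归": "...", "诊断": "..."}
print(nest_fields(flat))
# {'现病史': {'药物': '...', '病情转归': '...'}, '诊断': '...'}
```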
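Data Security, Privacy, and Ethics Statement
------------------------
- **Data minimization**: only the text content required for structured extraction is processed; including identity information such as names, ID numbers, phone numbers, or detailed addresses is neither required nor encouraged.
- **Strict de-identification**: before anything is sent to a model or API, personally identifiable information (e.g. name, ID number, phone number, detailed address, face/images) is masked or de-identified. Only the de-identified, necessary information is passed on for this skill invocation.
- **No local persistence**: user input and intermediate results are not written to local persistent storage (disk files, databases, or logs). They are held only briefly in memory and **discarded as soon as the invocation ends**.
- **Third-party API risk notice**: when functionality requires it, third-party model/service APIs may be called; in that case only **de-identified, necessary information** is sent, over encrypted transport. It is not used for any purpose other than completing this request (e.g. training, profiling, or marketing).
- **Medical boundary**: the output of this skill is the result of text extraction and structuring and does not constitute a medical diagnosis or treatment advice; for clinical judgments, defer to a licensed physician.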
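Input Format
--------
Plain-text medical record (UTF-8). It may contain the following sections, labeled in Chinese (主诉 chief complaint, 现病史 history of present illness, 既往史 past history, 婚育史 marital and reproductive history, 月经史 menstrual history, 个人史 personal history, 家族史 family history, 查体 physical examination, 辅助检查 ancillary tests, 诊断 diagnosis, 处理 treatment):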
主诉:……
现病史:……
既往史:……
婚育史:……
月经史:……
个人史:……
家族史:……
查体:……
辅助检查:……
诊断:……
处理:……
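A quick client-side check (not part of the skill's scripts) that reports which of the section labels listed above appear in a record before it is sent. The label list, the helper name `present_sections`, and the choice to accept both the full-width `:` and the ASCII `:` colon are assumptions for illustration.

```python
import re
from typing import List

# Section labels from the input format above (assumed list, in document order).
SECTION_LABELS = ["主诉", "现病史", "既往史", "婚育史", "月经史", "个人史",
                  "家族史", "查体", "辅助检查", "诊断", "处理"]


def present_sections(text: str) -> List[str]:
    """Return the labels that start a line and are followed by a colon."""
    found: List[str] = []
    for label in SECTION_LABELS:
        pattern = rf"^\s*{re.escape(label)}\s*[::]"
        if re.search(pattern, text, flags=re.MULTILINE):
            found.append(label)
    return found


record = "主诉:……\n现病史:……\n诊断:……\n处理:……"
print(present_sections(record))  # ['主诉', '现病史', '诊断', '处理']
```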
Quick Start
--------
```bash
# Run from the skills directory
python3 scripts/struct_followup_record.py \
--input data/med-followup-record-struct/followup_record.txt
```
Parameters
--------
* `--input PATH`
  - Path to the follow-up record text (UTF-8).
* `--output PATH`
  - Output path for the structured JSON (default: `../runs/med-followup-record-struct/structured.json`).
* `--timeout SECONDS`
  - Timeout in seconds; `0` means wait indefinitely (default: 0).
* `--diag-id STRING`
  - Dialogue/visit ID (default: `skill-diag`).
* `--department STRING`
  - Department (optional).
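The script posts `his_record`, `diag_id`, and (optionally) `department` as ASCII-only JSON, escaping non-ASCII characters as `\uXXXX` because some servers handle raw UTF-8 JSON inconsistently. A small sketch of that encoding; `build_struct_payload` is a hypothetical helper and the record text is a placeholder.

```python
import json
from typing import Any, Dict


def build_struct_payload(his_record: str, diag_id: str = "skill-diag", department: str = "") -> bytes:
    """Encode the request body the way scripts/struct_followup_record.py does."""
    if not his_record.strip():
        raise ValueError("his_record is required and must be non-empty.")
    payload: Dict[str, Any] = {"his_record": his_record, "diag_id": diag_id or "skill-diag"}
    if department:
        payload["department"] = department
    # ensure_ascii=True turns every non-ASCII character into a \uXXXX escape.
    return json.dumps(payload, ensure_ascii=True).encode("utf-8")


body = build_struct_payload("主诉:……")
print(body.isascii())  # True
```

The server decodes the escapes back to the original text, so `json.loads(body)["his_record"]` still equals the input string.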
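Output Conventions
--------
- The output is UTF-8 JSON with keys such as `现病史.病情概述`, `现病史.药物`, `既往史.疾病`, `诊断`, and `处理意见.药物`.
- Fields not mentioned in the record may be filled with "未提及" ("not mentioned") where the service supports it.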
Testing
--------
Run from the `skills` directory (network self-test):
```bash
python3 self_tests/med-followup-record-struct/self_test_followup_record_struct.py --run-network
```
FILE:_meta.json
{
"ownerId": "kn76wejkeqxfc03j0rfxp2jaj982m7aa",
"slug": "med-followup-record-struct",
"version": "1.0.0",
"publishedAt": 1773287610243
}
FILE:scripts/struct_followup_record.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import argparse
from typing import Any, Dict
import urllib.error
import urllib.request

API_URL = "https://shangbao.yunzhisheng.cn/skills/record-struct/gen_abstract_by_his"


def _read_http_body(resp: Any) -> str:
    return resp.read().decode("utf-8", errors="replace")


def _post_json(url: str, payload: Dict[str, Any], *, timeout: int = 0) -> str:
    # Some servers handle non-ASCII JSON inconsistently, so send ASCII-only JSON (\uXXXX escapes).
    data_bytes = json.dumps(payload, ensure_ascii=True).encode("utf-8")
    req = urllib.request.Request(
        url=url,
        data=data_bytes,
        method="POST",
        headers={
            "Content-Type": "application/json; charset=utf-8",
            "Accept": "application/json",
        },
    )
    try:
        # timeout <= 0: no per-request timeout.
        if timeout and timeout > 0:
            ctx = urllib.request.urlopen(req, timeout=timeout)
        else:
            ctx = urllib.request.urlopen(req)
        with ctx as resp:
            return _read_http_body(resp)
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {e.code}: {detail}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"Network error: {e}") from e


def call_followup_struct_api(
    his_record: str,
    *,
    diag_id: str = "skill-diag",
    department: str = "",
    timeout: int = 0,
) -> Dict[str, Any]:
    """
    Call the company API to structure a follow-up record.
    :param his_record: outpatient follow-up record text (his_record)
    :param diag_id: dialogue/visit ID
    :param department: department (optional)
    :param timeout: timeout in seconds (0 = no per-request timeout)
    :return: structured result (dict)
    """
    if not isinstance(his_record, str) or not his_record.strip():
        raise ValueError("his_record is required and must be non-empty.")
    payload: Dict[str, Any] = {
        "his_record": his_record,
        "diag_id": diag_id or "skill-diag",
    }
    if department:
        payload["department"] = department
    body = _post_json(API_URL, payload, timeout=timeout)
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        raise ValueError(f"Unexpected non-JSON response: {body[:500]}") from None
    if not isinstance(data, dict):
        raise ValueError(f"Unexpected API response: {data}")
    # Assumes the structured result is in one of the 'structured' / 'result' / 'data' fields.
    structured = data.get("structured") or data.get("result") or data.get("data")
    if structured is None:
        raise ValueError(f"Unexpected API response: {data}")
    return structured


def struct_followup_record(
    input_path: str,
    output_path: str,
    timeout: int = 0,
    diag_id: str = "skill-diag",
    department: str = "",
) -> str:
    """
    Structure a follow-up record and save it as JSON.
    :param input_path: path to the input record file
    :param output_path: path to the output JSON file
    :param timeout: timeout in seconds
    :return: output file path
    """
    if not os.path.exists(input_path):
        # Fallback: a bare file name is looked up in skills/data/med-followup-record-struct/<name>.
        base_name = os.path.basename(input_path)
        fallback = os.path.join(os.path.dirname(__file__), "..", "..", "data", "med-followup-record-struct", base_name)
        if os.path.exists(fallback):
            input_path = fallback
        else:
            raise FileNotFoundError(f"Input file not found: {input_path}")
    with open(input_path, "r", encoding="utf-8") as f:
        record_text = f.read()
    print(f"Structuring follow-up outpatient record: {input_path}")
    structured = call_followup_struct_api(record_text, diag_id=diag_id, department=department, timeout=timeout)
    out_dir = os.path.dirname(output_path) or "."
    os.makedirs(out_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(structured, f, ensure_ascii=False, indent=2)
    print(f"✓ Structured record saved to: {output_path}")
    return output_path


def main():
    parser = argparse.ArgumentParser(
        description="Structure outpatient follow-up medical record into fine-grained fields."
    )
    parser.add_argument(
        "--input",
        required=True,
        help="Path to outpatient follow-up record text file (UTF-8)."
    )
    parser.add_argument(
        "--output",
        default="",
        help="Output path for structured JSON (default: ../runs/med-followup-record-struct/structured.json)."
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=0,
        help="HTTP request timeout seconds. 0 means wait forever (default: 0)."
    )
    parser.add_argument("--diag-id", default="skill-diag", help="diag_id (default: skill-diag)")
    parser.add_argument("--department", default="", help="department (optional)")
    args = parser.parse_args()
    try:
        default_out = os.path.join("..", "runs", "med-followup-record-struct", "structured.json")
        out_path = args.output or default_out
        struct_followup_record(
            input_path=args.input,
            output_path=out_path,
            timeout=args.timeout,
            diag_id=args.diag_id,
            department=args.department,
        )
        print("\n✓ Follow-up record structured successfully!")
        return 0
    except FileNotFoundError as e:
        print(f"✗ Error: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"✗ Unexpected Error: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())
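Generates an outpatient initial-visit medical record from Chinese doctor-patient dialogue text and outputs the record body as sectioned text.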
---
name: med-initial-record-gen
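description: Generates an outpatient initial-visit medical record from Chinese doctor-patient dialogue text and outputs the record body as sectioned text.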
metadata:
  {
    "openclaw":
      {
        "emoji": "📝"
      }
  }
---
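# Outpatient Initial-Visit Record Generation
Overview
----
Given a **Chinese doctor-patient dialogue text** (usually an ASR transcript), this skill generates an outpatient **initial-visit record** as text, with the usual record sections (chief complaint, history of present illness, past history, physical examination, ancillary tests, diagnosis, treatment, etc.).
Typical output sections:
- Chief complaint (主诉)
- History of present illness (现病史)
- Past history (既往史)
- Menstrual history (月经史, if applicable)
- Physical examination (查体)
- Ancillary tests (辅助检查)
- Diagnosis (诊断)
- Treatment (处理)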
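Data Security, Privacy, and Ethics Statement
------------------------
- **Data minimization**: only the dialogue content required to generate the record is processed; including identity information such as names, ID numbers, phone numbers, or detailed addresses is neither required nor encouraged.
- **Strict de-identification**: before anything is sent to a model or API, personally identifiable information (e.g. name, ID number, phone number, detailed address, face/images) is masked or de-identified. Only the de-identified, necessary information is passed on for this skill invocation.
- **No local persistence**: user input and intermediate results are not written to local persistent storage (disk files, databases, or logs). They are held only briefly in memory and **discarded as soon as the invocation ends**.
- **Third-party API risk notice**: when functionality requires it, third-party model/service APIs may be called; in that case only **de-identified, necessary information** is sent, over encrypted transport. It is not used for any purpose other than completing this request (e.g. training, profiling, or marketing).
- **Medical boundary**: this skill assists in drafting and structuring medical-record text and does not constitute a medical diagnosis or treatment advice; the record must be reviewed by a licensed physician, who bears final medical responsibility.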
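Input Format
--------
Plain-text dialogue (UTF-8), preferably one utterance or turn per line, e.g. with speakers labeled 患者 (patient) and 医生 (doctor):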
患者:……
医生:……
患者:……
医生:……
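Since one turn per line is recommended, a dialogue can be sanity-checked locally by splitting each line into speaker and utterance. This is a hypothetical pre-processing step, not something `scripts/gen_initial_record.py` does (the script sends the text unchanged as `diag`); `split_turns` is an illustrative name, and accepting both full-width and ASCII colons is an assumption.

```python
from typing import List, Tuple


def split_turns(dialogue: str) -> List[Tuple[str, str]]:
    """Split 'speaker:utterance' lines; lines without a colon get an empty speaker."""
    turns: List[Tuple[str, str]] = []
    for raw in dialogue.splitlines():
        line = raw.strip()
        if not line:
            continue
        # Position of the first colon, full-width or ASCII (-1 if neither occurs).
        idx = min((i for i in (line.find(":"), line.find(":")) if i >= 0), default=-1)
        if idx > 0:
            turns.append((line[:idx].strip(), line[idx + 1:].strip()))
        else:
            turns.append(("", line))
    return turns


print(split_turns("患者:……\n医生:……\n"))  # [('患者', '……'), ('医生', '……')]
```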
Quick Start
--------
```bash
# Run from the skills directory
python3 scripts/gen_initial_record.py \
--input data/med-initial-record-gen/dialogue.txt
```
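Parameters
--------
* `--input PATH`
  - Path to the input dialogue text (UTF-8).
* `--output PATH`
  - Output path for the generated record (default: `../runs/med-initial-record-gen/record.txt`).
* `--diag-id STRING`
  - Dialogue ID (default: `skill-diag`).
* `--timeout SECONDS`
  - Timeout in seconds; `0` means wait indefinitely (default: 0).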
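For reference, the request body built by `call_record_api` has three fields: `diag_id`, `dep_time` (the current local time formatted as `%Y-%m-%d %H:%M:%S`), and `diag` (the dialogue text). A sketch of building it; `build_record_payload` is a hypothetical helper, and its optional `now` parameter exists only to make the output deterministic.

```python
import json
from datetime import datetime
from typing import Optional


def build_record_payload(dialogue: str, diag_id: str = "skill-diag", now: Optional[datetime] = None) -> bytes:
    """Encode the body POSTed to the record-generation endpoint (UTF-8, non-ASCII kept as-is)."""
    stamp = (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")
    payload = {"diag_id": diag_id, "dep_time": stamp, "diag": dialogue}
    return json.dumps(payload, ensure_ascii=False).encode("utf-8")


body = build_record_payload("患者:……\n医生:……", now=datetime(2024, 1, 2, 3, 4, 5))
print(json.loads(body)["dep_time"])  # 2024-01-02 03:04:05
```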
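Output Conventions
--------
- Parent directories of the output path are created automatically if they do not exist.
- The output is UTF-8 text containing the usual sections of an outpatient initial-visit record.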
Testing
--------
Run from the `skills` directory (network self-test):
```bash
python3 self_tests/med-initial-record-gen/self_test_initial_record_gen.py --run-network
```
FILE:_meta.json
{
"ownerId": "kn76wejkeqxfc03j0rfxp2jaj982m7aa",
"slug": "med-initial-record-gen",
"version": "1.0.0",
"publishedAt": 1773287610243
}
FILE:scripts/gen_initial_record.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import argparse
from datetime import datetime
import json
import urllib.error
import urllib.request

RECORD_API_URL = "https://shangbao.yunzhisheng.cn/skills/record-gen/gen_record_by_diag_v1"


def call_record_api(diag_id: str, dialogue: str, timeout: int = 0) -> str:
    """
    Call the company API to generate the initial-visit outpatient record (final record).
    Returns the record text; falls back to the raw response body when the response
    is not JSON or has none of the expected fields.
    """
    payload = {
        "diag_id": diag_id,
        "dep_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "diag": dialogue,
    }
    data_bytes = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url=RECORD_API_URL,
        data=data_bytes,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    try:
        # timeout <= 0: no per-request timeout.
        if timeout and timeout > 0:
            resp_ctx = urllib.request.urlopen(req, timeout=timeout)
        else:
            resp_ctx = urllib.request.urlopen(req)
        with resp_ctx as resp:
            body = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {e.code}: {detail}") from e
    except urllib.error.URLError as e:
        raise RuntimeError(f"Network error: {e}") from e

    # Prefer JSON parsing so the response format can be extended later.
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        # Not JSON: return the text as-is.
        return body
    if isinstance(data, dict):
        record = data.get("record") or data.get("result") or data.get("data")
        if isinstance(record, str) and record:
            return record
        if record:
            # Non-string record (e.g. a dict): serialize it so it can be written as text.
            return json.dumps(record, ensure_ascii=False, indent=2)
    return body


def generate_initial_record(
    input_path: str,
    output_path: str,
    timeout: int = 0,
    diag_id: str = "skill-diag",
) -> str:
    """
    Generate an outpatient initial-visit record from a dialogue file and save it.
    :param input_path: path to the input dialogue text file
    :param output_path: path to the output record file
    :param timeout: request timeout in seconds
    :param diag_id: dialogue ID sent to the backend
    :return: output file path
    """
    if not os.path.exists(input_path):
        # Fallback: a bare file name is looked up in skills/data/med-initial-record-gen/<name>.
        base_name = os.path.basename(input_path)
        fallback = os.path.join(os.path.dirname(__file__), "..", "..", "data", "med-initial-record-gen", base_name)
        if os.path.exists(fallback):
            input_path = fallback
        else:
            raise FileNotFoundError(f"Input file not found: {input_path}")
    with open(input_path, "r", encoding="utf-8") as f:
        dialogue_text = f.read()
    print(f"Generating initial visit record from dialogue: {input_path}")
    # Call the record-generation API directly.
    record_text = call_record_api(diag_id=diag_id, dialogue=dialogue_text, timeout=timeout)
    out_dir = os.path.dirname(output_path) or "."
    os.makedirs(out_dir, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(record_text)
    print(f"✓ Record saved to: {output_path}")
    return output_path


def main():
    parser = argparse.ArgumentParser(
        description="Generate outpatient initial visit medical record from doctor-patient dialogue."
    )
    parser.add_argument(
        "--input",
        required=True,
        help="Path to ASR dialogue text file (UTF-8)."
    )
    parser.add_argument(
        "--output",
        default="",
        help="Output path for generated medical record (default: ../runs/med-initial-record-gen/record.txt)."
    )
    parser.add_argument(
        "--diag-id",
        default="skill-diag",
        help="Dialogue ID used for backend services (default: skill-diag)."
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=0,
        help="HTTP request timeout seconds. 0 means wait forever (default: 0)."
    )
    args = parser.parse_args()
    try:
        default_out = os.path.join("..", "runs", "med-initial-record-gen", "record.txt")
        out_path = args.output or default_out
        generate_initial_record(
            input_path=args.input,
            output_path=out_path,
            timeout=args.timeout,
            diag_id=args.diag_id,
        )
        print("\n✓ Initial visit record generated successfully!")
        return 0
    except FileNotFoundError as e:
        print(f"✗ Error: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"✗ Unexpected Error: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    sys.exit(main())
---
name: u2-audio-file-transcriber
description: Transcribe audio files via UniCloud ASR (云知声语音识别, recorded audio → text) API from UniSound. Supports multiple formats, optimized for finance, customer service, and other domains.
metadata:
openclaw:
requires:
env:
- UNISOUND_APPKEY
- UNISOUND_SECRET
bins:
- python3
primaryEnv: UNISOUND_SECRET
emoji: "🎤"
homepage: http://af-asr.uat.hivoice.cn
---
# UniSound ASR Audio File Transcriber
## 云知声语音转写
调用云知声(UniSound)语音识别服务转写音频文件。支持多种音频格式,适用于金融、客服等场景。
Transcribe audio files using UniCloud ASR service. Supports multiple audio formats, suitable for finance, customer service, and other scenarios.
## When to Use This Skill
**Use UniSound ASR for**:
- Transcribing pre-recorded audio files (WAV, MP3, M4A, FLAC, OGG)
- Converting customer service recordings to text
- Processing financial audio records
- Audio files up to 2 hours long and 100MB in size
- Domain-specific transcription (finance, customer service, general)
**Do NOT use for**:
- Real-time/streaming speech recognition
- Text-to-speech (TTS) synthesis
- Live captioning or real-time transcription
- Voice conversation or dialogue systems
**Use when**: The user needs to transcribe recorded audio files, or asks for UniSound/云知声 audio file transcription.
**不适用于**: 实时语音识别、语音合成(TTS)或直播字幕。
## Installation
Install Python dependencies before using this skill. From the skill directory (`skills/asr-file-transfer-tools`):
```bash
pip install -r requirements.txt
```
Requires Python 3.8+.
## How to Use This Skill
**⛔ MANDATORY RESTRICTIONS - DO NOT VIOLATE ⛔**
1. **ONLY use UniSound ASR API** - Execute the script `python3 scripts/transcribe.py`
2. **NEVER transcribe audio directly** - Do NOT attempt local transcription
3. **NEVER offer alternatives** - Do NOT suggest "I can try another method" or similar
4. **IF API fails** - Display the error message and STOP immediately
5. **NO fallback methods** - Do NOT attempt audio transcription any other way
If the script execution fails (API not configured, network error, etc.):
- Show the error message to the user
- Do NOT offer to help using your audio analysis capabilities
- Do NOT ask "Would you like me to try transcribing it?"
- Simply stop and wait for user to fix the configuration
### Basic Workflow
1. **Execute audio transcription**:
```bash
python3 scripts/transcribe.py /path/to/audio.wav
```
**Command options**:
- `--format FORMAT` - Audio format (wav, mp3, m4a, flac, ogg)
- `--domain DOMAIN` - Recognition domain (finance, customer_service, other)
- `--out FILE` - Save output to file instead of stdout
- `--json` - Output JSON format with full result
- `--userid ID` - Custom user ID
2. **Output**:
- Default: Text transcript printed to stdout
- With `--out`: Transcript saved to specified file
- With `--json`: Full JSON result with metadata
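You can also drive the command-line flags above from Python. The following is a minimal sketch. It assumes the code runs from the skill directory so that `scripts/transcribe.py` resolves. `build_transcribe_cmd` and `run_transcribe` are hypothetical helper names that are not part of the skill. The sketch only calls the documented script, in keeping with the mandatory restrictions above.

```python
import subprocess
import sys
from typing import List, Optional


def build_transcribe_cmd(
    audio_path: str,
    audio_format: Optional[str] = None,
    domain: Optional[str] = None,
    out_path: Optional[str] = None,
    as_json: bool = False,
    script: str = "scripts/transcribe.py",
) -> List[str]:
    """Assemble the argv list for scripts/transcribe.py (hypothetical helper)."""
    cmd = [sys.executable, script, audio_path]
    if audio_format:
        cmd += ["--format", audio_format]
    if domain:
        cmd += ["--domain", domain]
    if out_path:
        cmd += ["--out", out_path]
    if as_json:
        cmd.append("--json")
    return cmd


def run_transcribe(audio_path: str, **options) -> str:
    """Run the script and return its stdout; progress messages go to stderr."""
    result = subprocess.run(
        build_transcribe_cmd(audio_path, **options),
        capture_output=True,
        text=True,
        encoding="utf-8",
    )
    if result.returncode != 0:
        # Surface the script's own error message and stop: no fallback transcription.
        raise RuntimeError(result.stderr.strip())
    return result.stdout
```

The script prints progress to stderr and the result to stdout, so `run_transcribe("recording.wav")` returns only the transcript. When `out_path` is set, stdout is empty and the result is written to the file instead.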
### Understanding the Output
**Text Format**:
- Plain transcript of the audio content
- Segment texts are concatenated in order, with no separator added between them
- No timestamps (use `--json` to get the full result)
**JSON Format**:
- Complete transcription result with metadata
- Confidence scores for each segment
- Timestamp information
- Recognition details
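If you saved the full result with `--json --out result.json`, you can rebuild the plain transcript later. This sketch mirrors the script's `extract_text` method, which concatenates the `text` field of each item in `results`. It makes no assumptions about the other fields in the response. The function names are hypothetical.

```python
import json
from typing import Any, Dict


def transcript_from_result(result: Dict[str, Any]) -> str:
    """Concatenate segment texts the same way scripts/transcribe.py does."""
    items = result.get("results") or []
    return "".join(item["text"] for item in items if "text" in item)


def transcript_from_file(path: str) -> str:
    """Load a JSON result written with --json --out and return the transcript."""
    with open(path, encoding="utf-8") as f:
        return transcript_from_result(json.load(f))
```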
### Usage Examples
**Example 1: Quick Transcription**
```bash
python3 scripts/transcribe.py recording.wav
```
Output: Transcript text printed to console
**Example 2: Save to File**
```bash
python3 scripts/transcribe.py interview.mp3 --format mp3 --out transcript.txt
```
Output: Transcript saved to `transcript.txt`
**Example 3: JSON Output with Metadata**
```bash
python3 scripts/transcribe.py audio.m4a --format m4a --json --out result.json
```
Output: Complete JSON result with timestamps and confidence scores
**Example 4: Domain-Specific Transcription**
```bash
python3 scripts/transcribe.py financial_call.wav --domain finance
```
Output: Transcript optimized for financial terminology
### How It Works
The script uses the UniCloud ASR API with the following workflow:
1. **Initialize upload** — Get a task ID from the API / 初始化上传,获取任务ID
2. **Upload audio file** — Upload the audio file to the server / 上传音频文件到服务器
3. **Start transcription** — Submit the transcription task / 提交转写任务
4. **Poll for results** — Wait for transcription to complete (typically 10-60 seconds) / 轮询等待转写完成(通常10-60秒)
5. **Return transcript** — Output the recognized text / 输出识别文本
> **Privacy**: Audio files are uploaded directly to UniCloud servers. No data is sent to third-party services.
>
> **隐私说明**:音频文件直接上传到云知声服务器。不会将数据发送到第三方服务。
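Every call in this workflow is signed. `_generate_signature` and `_build_params` in `scripts/transcribe.py` compute the signature in four steps:

1. Sort the request parameters, including `appkey` and a Unix `timestamp` in seconds, by key.
2. Concatenate their values.
3. Wrap the result in the secret on both sides.
4. Hash it with SHA-1 and emit upper-case hex.

The sketch below restates that logic to help with debugging signature errors. The helper names are hypothetical, and the server-side rules are known here only through that script.

```python
import hashlib
import time
from typing import Dict


def sign_params(params: Dict[str, str], secret: str) -> str:
    """SHA-1 over secret + values (sorted by key) + secret, as upper-case hex."""
    joined = "".join(value for _, value in sorted(params.items()))
    return hashlib.sha1((secret + joined + secret).encode("utf-8")).hexdigest().upper()


def build_signed_params(appkey: str, secret: str, **extra: str) -> Dict[str, str]:
    """Add appkey and a Unix timestamp, then attach the signature."""
    params = {"appkey": appkey, "timestamp": str(int(time.time())), **extra}
    params["signature"] = sign_params(params, secret)
    return params
```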
### Supported Formats
**Supported file types**:
- WAV
- MP3
- M4A
- FLAC
- OGG
**Limits**:
- Maximum duration: 2 hours
- Maximum file size: 100MB
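The script does not detect the format from the file extension. Unless you pass `--format`, it sends its configured default (`wav`) for every file, so specify the format for any non-WAV file: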
```bash
python3 scripts/transcribe.py audio.mp3 --format mp3
```
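You can check the documented limits locally before uploading. This minimal sketch makes two assumptions:

- 100MB means 100 * 1024 * 1024 bytes.
- Duration is checked only for WAV files, because Python's standard `wave` module cannot read MP3, M4A, FLAC or OGG.

`preflight` is a hypothetical helper name.

```python
import os
import wave
from pathlib import Path
from typing import List

SUPPORTED_FORMATS = {"wav", "mp3", "m4a", "flac", "ogg"}
MAX_BYTES = 100 * 1024 * 1024  # 100MB, assuming binary megabytes
MAX_SECONDS = 2 * 60 * 60      # 2 hours


def preflight(path: str) -> List[str]:
    """Return the problems found before uploading (an empty list means no problems)."""
    p = Path(path)
    if not p.is_file():
        return [f"file not found: {path}"]
    problems = []
    ext = p.suffix.lower().lstrip(".")
    if ext not in SUPPORTED_FORMATS:
        problems.append(f"unsupported extension: .{ext}")
    size = os.path.getsize(p)
    if size > MAX_BYTES:
        problems.append(f"file too large: {size} bytes")
    if ext == "wav":
        # Only WAV duration can be read with the standard library.
        with wave.open(str(p), "rb") as w:
            seconds = w.getnframes() / float(w.getframerate())
        if seconds > MAX_SECONDS:
            problems.append(f"audio too long: {seconds:.0f} s")
    return problems
```

An empty list means the file passed these checks. If the file is not WAV, pass its extension as `--format`.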
## First-Time Configuration
**When API is not configured**:
The error will show:
```
CONFIG_ERROR: UNISOUND_APPKEY or UNISOUND_SECRET not configured.
```
### Obtaining Credentials
To use this skill, you need API credentials from UniCloud (云知声):
您需要从云知声获取 API 凭据:
1. **Contact UniCloud** to obtain your API credentials
联系云知声获取您的 API 凭据
2. **You will receive**:
您将收到:
- **AppKey**: Application key / 应用密钥
- **Secret**: Secret key for authentication / 认证密钥
- **UserId**: Your user identifier / 用户标识
- **Base URL**: API endpoint URL / API 端点地址
### Test Credentials (UAT Environment)
**For testing and evaluation only** (用于测试和评估):
```yaml
AppKey: 681e01d78d8a40e8928bc8268020639b
Secret: d7b2980cb61843d69fdab5e99deafcdf
UserId: unisound-python-demo
Base URL: http://af-asr.uat.hivoice.cn
```
> **⚠️ Important Security Notice / 重要安全提示**
>
> - **Test environment only** — These credentials are for UAT testing only
> - **仅测试环境** — 这些凭据仅用于 UAT 测试
> - **No sensitive data** — Never use with production or sensitive audio files
> - **勿用于敏感数据** — 切勿用于生产或敏感音频文件
> - **Get your own credentials** — For production use, contact UniCloud
> - **获取自己的凭据** — 生产环境请联系云知声
> - **Data privacy** — Audio files are uploaded to UniSound servers
> - **数据隐私** — 音频文件将上传至云知声服务器
### Configuration Steps
**Guide the user to configure securely**:
- Recommend configuring through environment variables or `.env` file
- Recommend using the host application's configuration method when possible
- Warn about sharing credentials in chat (may be stored in conversation history)
**Environment variables**:
| Variable | Required | Description | Default |
|----------|----------|-------------|---------|
| `UNISOUND_APPKEY` | **Yes** | Application key / 应用密钥 | - |
| `UNISOUND_SECRET` | **Yes** | Secret key / 认证密钥 | - |
| `UNISOUND_USERID` | No | User identifier / 用户标识 | `unisound-python-demo` |
| `UNISOUND_BASE_URL` | No | API base URL / API 基础地址 | `http://af-asr.uat.hivoice.cn` |
| `UNISOUND_DOMAIN` | No | Recognition domain / 识别领域 | `other` |
| `UNISOUND_AUDIOTYPE` | No | Default audio format / 默认音频格式 | `wav` |
**Configuration examples**:
*Linux/macOS:*
```bash
export UNISOUND_APPKEY="681e01d78d8a40e8928bc8268020639b"
export UNISOUND_SECRET="d7b2980cb61843d69fdab5e99deafcdf"
export UNISOUND_USERID="unisound-python-demo"
```
*Windows (PowerShell):*
```powershell
$env:UNISOUND_APPKEY="681e01d78d8a40e8928bc8268020639b"
$env:UNISOUND_SECRET="d7b2980cb61843d69fdab5e99deafcdf"
$env:UNISOUND_USERID="unisound-python-demo"
```
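*Using a .env file (recommended; the script itself only reads environment variables, so your shell or host application must load the file first):*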
```
UNISOUND_APPKEY=681e01d78d8a40e8928bc8268020639b
UNISOUND_SECRET=d7b2980cb61843d69fdab5e99deafcdf
UNISOUND_USERID=unisound-python-demo
```
> **Security Note**: Never commit `.env` files or actual credentials to version control.
> **安全提示**:切勿将 `.env` 文件或实际凭据提交到版本控制系统。
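`scripts/transcribe.py` reads its settings only through `os.getenv`. It never opens a `.env` file itself, so the variables must already be in the process environment when it starts. If neither `python-dotenv` nor a host application does this for you, the minimal standard-library loader below can. It assumes simple `KEY=VALUE` lines, optionally with surrounding quotes and `#` comments, and is not a full `.env` parser. `load_env_file` is a hypothetical name.

```python
import os
from pathlib import Path


def load_env_file(path: str = ".env", override: bool = False) -> None:
    """Load KEY=VALUE lines into os.environ; existing variables win unless override=True."""
    for raw in Path(path).read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blank lines, comments and malformed lines
        key, value = line.split("=", 1)
        key, value = key.strip(), value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]  # drop matching surrounding quotes
        if override or key not in os.environ:
            os.environ[key] = value
```

Call it before `ASRConfig.from_env()`, or before launching the script with `subprocess`, since child processes inherit `os.environ`.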
## Error Handling
**Authentication failed**:
```
API returned error: 401
```
→ AppKey or Secret is invalid, reconfigure with correct credentials
→ AppKey 或 Secret 无效,请重新配置正确的凭据
**Network error**:
```
Connection timeout
```
→ Check network connectivity to UniCloud API
→ 检查到云知声 API 的网络连接
**Audio file not found**:
```
错误: 音频文件不存在
```
→ Check the file path, use absolute path if needed
→ 检查文件路径,必要时使用绝对路径
**Transcription timeout**:
```
转写超时
```
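→ Transcription is taking longer than expected (the server may be busy). By default the script polls every 5 seconds and gives up after 120 attempts (about 10 minutes).
→ 转写时间过长(服务器可能繁忙);脚本默认每 5 秒查询一次,共 120 次(约 10 分钟)后报超时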
→ Try again later / 稍后重试
→ Check if the audio file is too large / 检查音频文件是否过大
**Unsupported audio format**:
```
Unsupported audio format
```
→ The audio format is not supported by the API
→ API 不支持该音频格式
→ Convert to a supported format (WAV recommended) / 转换为支持的格式(推荐 WAV)
→ Use `--format` flag to explicitly specify the format / 使用 `--format` 参数显式指定格式
```bash
# Convert using ffmpeg / 使用 ffmpeg 转换
ffmpeg -i input.mp3 -ar 16000 -ac 1 output.wav
```
**API quota exceeded**:
```
API returned error: 429
```
→ Too many requests, wait before retrying
→ 请求过多,请稍后重试
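The script's built-in retry (urllib3 `Retry` in `ASRClient`) covers only HTTP 500, 502, 503 and 504, so it never retries a 429 automatically. If you submit many files, caller-side exponential backoff is one option. The sketch below is generic: `with_backoff` and `is_rate_limited` are hypothetical names, and the delays are guesses rather than documented quota rules.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def is_rate_limited(exc: Exception) -> bool:
    """True for errors carrying an HTTP 429 response (for example requests.HTTPError)."""
    response = getattr(exc, "response", None)
    return getattr(response, "status_code", None) == 429


def with_backoff(
    call: Callable[[], T],
    is_retryable: Callable[[Exception], bool],
    max_attempts: int = 4,
    base_delay: float = 2.0,
) -> T:
    """Retry `call` on retryable errors, doubling the wait each time (plus jitter)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception as exc:
            if attempt == max_attempts - 1 or not is_retryable(exc):
                raise
            delay = base_delay * (2 ** attempt)
            time.sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")
```

For example, `with_backoff(lambda: client.transcribe(path), is_rate_limited)` reruns the whole upload-and-transcribe flow whenever `requests` reports a 429.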
## Important Notes
- **Requires network connectivity** to UniCloud ASR API
**需要网络连接**到云知声 ASR API
- **Cloud-based processing** - Audio files are uploaded to UniSound servers
**云端处理**——音频文件会上传到云知声服务器
- **Limits**: maximum 2 hours duration and 100MB file size
**限制**:最长 2 小时,最大 100MB
- **Domain optimization**: Use appropriate `--domain` for better accuracy
**领域优化**:使用适当的 `--domain` 以获得更高的准确率
- **Test credentials**: UAT environment credentials are for testing only
**测试凭据**:UAT 环境凭据仅供测试使用
## Security Best Practices
**For production deployment / 生产部署**:
- **Obtain your own credentials** from UniCloud / 云知声
从云知声获取您自己的凭据
- **Use environment variables** — Never embed production credentials in scripts or configuration files
**使用环境变量**——切勿在脚本或配置文件中嵌入生产凭据
- **Review privacy policy** — Audio files are uploaded to UniSound servers; review their privacy policy
**审查隐私政策**——音频文件会上传到云知声服务器;请查看其隐私政策
- **Test with non-sensitive data first** — Always test with non-sensitive audio files first
**首先使用非敏感数据进行测试**——始终先使用非敏感音频文件进行测试
## Troubleshooting
**Issue**: Script fails with import error
→ Ensure dependencies are installed: `pip install -r requirements.txt`
→ Ensure using Python 3.8 or later / 确保使用 Python 3.8 或更高版本
**Issue**: Cannot connect to API server
无法连接到 API 服务器
→ Check network connectivity / 检查网络连接
→ Verify API endpoint URL is correct / 验证 API 端点 URL 是否正确
→ Try using a different network / 尝试使用其他网络
**Issue**: Poor transcription quality
→ Check audio quality (background noise, clarity) / 检查音频质量(背景噪音、清晰度)
→ Try using appropriate `--domain` parameter / 尝试使用适当的 `--domain` 参数
→ Ensure audio format is correct / 确保音频格式正确
## Getting Help
If you encounter issues not covered here:
如果遇到未涵盖的问题:
1. Check the UniCloud ASR documentation for the latest API changes
查看云知声 ASR 文档了解最新的 API 变更
2. Verify your network connection to the API server
验证到 API 服务器的网络连接
3. Check the error message details for specific error codes
检查错误消息详情以获取特定错误代码
4. Ensure you're using Python 3.8 or later
确保使用 Python 3.8 或更高版本
```bash
# Check Python version / 检查 Python 版本
python3 --version
```
> **Note**: API capabilities and supported formats are determined by your UniCloud ASR API service configuration.
> **注意**:API 功能和支持的格式由您的云知声 ASR API 服务配置决定。
FILE:requirements.txt
# Dependencies for the ASR speech-recognition demo
# HTTP client library
requests>=2.31.0
# urllib3 (a dependency of requests)
urllib3>=2.0.0
FILE:scripts/transcribe.py
# -*- coding: utf-8 -*-
"""
UniSound ASR Transcribe Script
Command-line audio file transcription (语音文件转写脚本).
"""
from __future__ import annotations

import argparse
import hashlib
import json
import os
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


# ==================== Configuration ====================
@dataclass
class ASRConfig:
    """ASR configuration."""

    # Defaults only: supply credentials through environment variables, not by editing this file
    base_url: str = "http://af-asr.uat.hivoice.cn"
    appkey: str = ""
    secret: str = ""
    userid: str = "unisound-python-demo"
    domain: str = "other"
    audiotype: str = "wav"
    use_hot_data: bool = True
    punction: str = "beauty"  # not currently sent with any request
    callbackurl: str = ""
    timeout: int = 30  # per-request HTTP timeout (seconds)
    max_retries: int = 3  # urllib3 retry budget per request
    poll_interval: int = 5  # seconds between result polls
    max_poll_attempts: int = 120  # 120 x 5 s, about 10 minutes before giving up

    @classmethod
    def from_env(cls) -> "ASRConfig":
        """Load configuration from environment variables (they override the defaults)."""
        return cls(
            base_url=os.getenv("UNISOUND_BASE_URL", cls.base_url),
            appkey=os.getenv("UNISOUND_APPKEY", cls.appkey),
            secret=os.getenv("UNISOUND_SECRET", cls.secret),
            userid=os.getenv("UNISOUND_USERID", cls.userid),
            domain=os.getenv("UNISOUND_DOMAIN", cls.domain),
            audiotype=os.getenv("UNISOUND_AUDIOTYPE", cls.audiotype),
            use_hot_data=os.getenv("UNISOUND_USE_HOT_DATA", str(cls.use_hot_data)).lower() == "true",
        )

    @property
    def urls(self) -> dict[str, str]:
        """All API endpoints."""
        base = self.base_url
        return {
            "upload_init": f"{base}/utservice/v2/trans/append_upload/init",
            "upload": f"{base}/utservice/v2/trans/append_upload/upload",
            "upload_status": f"{base}/utservice/v2/trans/append_upload/status",
            "transcribe": f"{base}/utservice/v4/trans/transcribe",
            "text": f"{base}/utservice/v2/trans/text",
        }


# ==================== Exceptions ====================
class ASRError(Exception):
    """Base class for ASR errors."""


class ASRAPIError(ASRError):
    """Error reported by the API (non-zero error_code)."""

    def __init__(self, error_code: int, message: str):
        self.error_code = error_code
        self.message = message
        super().__init__(f"[{error_code}] {message}")


# ==================== Helpers ====================
def calculate_file_md5(filepath: str) -> str:
    """Compute a file's MD5 in 1 MiB chunks."""
    md5_hash = hashlib.md5()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            md5_hash.update(chunk)
    return md5_hash.hexdigest()


def get_timestamp() -> int:
    """Current Unix timestamp in seconds."""
    return int(time.time())


# ==================== ASR client ====================
class ASRClient:
    """UniSound ASR client."""

    def __init__(self, config: ASRConfig):
        self.config = config
        self.session = requests.Session()
        retry_strategy = Retry(
            total=config.max_retries,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def _generate_signature(self, params: dict[str, Any]) -> str:
        """SHA-1 of secret + parameter values sorted by key + secret, as upper-case hex."""
        sorted_params = sorted(params.items())
        sign_str = self.config.secret + "".join(v for _, v in sorted_params) + self.config.secret
        return hashlib.sha1(sign_str.encode("utf-8")).hexdigest().upper()

    def _build_params(self, **kwargs) -> dict[str, str]:
        """Build signed request parameters."""
        params = {
            "appkey": self.config.appkey,
            "timestamp": str(get_timestamp()),
            **kwargs,
        }
        params["signature"] = self._generate_signature(params)
        return params

    def _request(self, method: str, url: str, **kwargs) -> bytes:
        """Send an HTTP request; raises requests.HTTPError on 4xx/5xx."""
        response = self.session.request(
            method=method,
            url=url,
            timeout=self.config.timeout,
            **kwargs,
        )
        response.raise_for_status()
        return response.content

    def _parse_response(self, data: bytes) -> dict[str, Any]:
        """Decode a JSON response body."""
        return json.loads(data.decode("utf-8"))

    def _check_error(self, response: dict[str, Any]) -> None:
        """Raise ASRAPIError unless error_code is 0."""
        error_code = response.get("error_code", -1)
        if error_code != 0:
            message = response.get("message", "Unknown error")
            raise ASRAPIError(error_code, message)

    def init_upload(self) -> str:
        """Step 1: initialise the upload and obtain a task ID."""
        url = self.config.urls["upload_init"]
        params = self._build_params(userid=self.config.userid)
        response_data = self._request("GET", url, params=params)
        response = self._parse_response(response_data)
        self._check_error(response)
        return response.get("task_id")

    def upload_file(self, task_id: str, filepath: str) -> str:
        """Step 2: upload the audio file (streamed from disk)."""
        url = self.config.urls["upload"]
        file_md5 = calculate_file_md5(filepath)
        params = self._build_params(
            userid=self.config.userid,
            task_id=task_id,
            md5=file_md5,
            audiotype=self.config.audiotype,
        )
        with open(filepath, "rb") as f:
            response_data = self._request("POST", url, params=params, data=f)
        response = self._parse_response(response_data)
        self._check_error(response)
        return task_id

    def start_transcribe(self, task_id: str, filepath: str) -> str:
        """Step 3: submit the transcription task."""
        file_md5 = calculate_file_md5(filepath)
        url = self.config.urls["transcribe"]
        params = self._build_params(
            userid=self.config.userid,
            task_id=task_id,
            audiotype=self.config.audiotype,
            domain=self.config.domain,
            md5=file_md5,
            use_hot_data=str(self.config.use_hot_data).lower(),
            callbackurl=self.config.callbackurl,
            num_convert="true",
            vocab_id="a7ea15d097184eb0814d1e588f45f118",  # fixed vocabulary ID from the original demo
        )
        response_data = self._request("GET", url, params=params)
        response = self._parse_response(response_data)
        self._check_error(response)
        return response.get("task_id")

    def get_transcribe_result(self, task_id: str) -> dict[str, Any]:
        """Fetch the current transcription result once."""
        url = self.config.urls["text"]
        params = self._build_params(task_id=task_id)
        response_data = self._request("GET", url, params=params)
        return self._parse_response(response_data)

    def poll_transcribe_result(self, task_id: str) -> dict[str, Any]:
        """Step 4: poll until status is "done" or max_poll_attempts is reached."""
        for attempt in range(self.config.max_poll_attempts):
            result = self.get_transcribe_result(task_id)
            self._check_error(result)
            status = result.get("status")
            if status == "done":
                return result
            if attempt % 10 == 0:
                print(f"  转写中... ({status})", file=sys.stderr)
            time.sleep(self.config.poll_interval)
        raise ASRError(f"转写超时: {task_id}")

    def transcribe(self, filepath: str) -> dict[str, Any]:
        """Run the full workflow: init, upload, transcribe, poll."""
        # 1. Initialise the upload
        task_id = self.init_upload()
        print(f"  task_id: {task_id}", file=sys.stderr)
        # 2. Upload the file
        print("  上传文件...", file=sys.stderr)
        self.upload_file(task_id, filepath)
        # 3. Start transcription
        print("  开始转写...", file=sys.stderr)
        transcribe_task_id = self.start_transcribe(task_id, filepath)
        # 4. Poll for the result
        print("  等待转写完成...", file=sys.stderr)
        return self.poll_transcribe_result(transcribe_task_id)

    def extract_text(self, result: dict[str, Any]) -> str:
        """Concatenate the text of every segment in the result."""
        texts = []
        if "results" in result:
            for item in result["results"]:
                if "text" in item:
                    texts.append(item["text"])
        return "".join(texts)

    def close(self):
        """Close the HTTP session."""
        self.session.close()


# ==================== Command-line interface ====================
def main() -> None:
    """Command-line entry point."""
    parser = argparse.ArgumentParser(
        description="UniSound ASR - 语音文件转写工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  %(prog)s audio.wav
  %(prog)s audio.wav --out result.txt
  %(prog)s audio.wav --json --out result.json
  %(prog)s audio.wav --domain finance
  %(prog)s audio.mp3 --format mp3
""",
    )
    parser.add_argument("audio", help="音频文件路径")
    parser.add_argument("--out", "-o", help="输出文件路径(默认输出到stdout)")
    parser.add_argument("--json", action="store_true", help="输出JSON格式(包含完整结果)")
    parser.add_argument("--format", help="音频格式(默认: UNISOUND_AUDIOTYPE 或 wav)")
    parser.add_argument("--domain", help="领域(默认: UNISOUND_DOMAIN 或 other)")
    parser.add_argument("--userid", help="用户ID(默认: UNISOUND_USERID 或 unisound-python-demo)")
    args = parser.parse_args()

    # Make sure the input file exists
    audio_path = Path(args.audio)
    if not audio_path.exists():
        print(f"错误: 音频文件不存在: {args.audio}", file=sys.stderr)
        sys.exit(1)

    # Environment variables first; explicit command-line options override them
    config = ASRConfig.from_env()
    if args.format:
        config.audiotype = args.format
    if args.domain:
        config.domain = args.domain
    if args.userid:
        config.userid = args.userid
    if not config.appkey or not config.secret:
        print("CONFIG_ERROR: UNISOUND_APPKEY or UNISOUND_SECRET not configured.", file=sys.stderr)
        sys.exit(1)

    print(f"开始转写: {args.audio}", file=sys.stderr)
    client = ASRClient(config)
    try:
        result = client.transcribe(str(audio_path))
        text = client.extract_text(result)
        output = json.dumps(result, ensure_ascii=False, indent=2) if args.json else text
        if args.out:
            with open(args.out, "w", encoding="utf-8") as f:
                f.write(output)
            print(f"结果已保存到: {args.out}", file=sys.stderr)
        else:
            print(output)
    except ASRError as e:
        print(f"错误: {e}", file=sys.stderr)
        sys.exit(1)
    except requests.HTTPError as e:
        status = e.response.status_code if e.response is not None else "unknown"
        print(f"API returned error: {status}", file=sys.stderr)
        sys.exit(1)
    except requests.RequestException as e:
        print(f"Network error: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        client.close()


if __name__ == "__main__":
    main()
FILE:.claude/settings.local.json
{
"permissions": {
"allow": [
"Bash(ls:*)"
]
}
}
---
name: u2-tts
description: Text-to-speech conversion using UniSound's TTS WebSocket API for generating high-quality Chinese Mandarin audio from text. Supports multiple voices, adjustable parameters, and real-time streaming synthesis.
metadata:
openclaw:
requires:
env:
- UNISOUND_APPKEY
- UNISOUND_SECRET
bins:
- python
primaryEnv: UNISOUND_SECRET
emoji: "🔊"
homepage: https://www.unisound.com
---
# UniSound TTS - Text-to-Speech
## 云知声语音合成
Text-to-speech conversion using UniSound's TTS WebSocket API for generating high-quality Chinese Mandarin audio from text.
使用云知声 TTS WebSocket API 进行文本转语音转换,生成高质量中文普通话音频。
## When to Use This Skill
**Use UniSound TTS for**:
- Converting Chinese text to natural-sounding speech
- Generating audio for audiobooks, podcasts, or content creation
- Creating accessibility solutions for visually impaired users
- Building voice assistants or chatbot voice responses
- Batch processing text to audio files
- Custom speech synthesis with adjustable parameters (speed, volume, pitch, brightness)
**Do NOT use for**:
- Real-time speech recognition or transcription (use ASR skills instead)
- English language synthesis (optimized for Chinese Mandarin)
- Voice cloning or custom voice model training
**Use when**: The user needs text-to-speech conversion, asks for "语音合成" (speech synthesis), or mentions UniSound/云知声 TTS.
## Installation
Install Python dependencies before using this skill. From the skill directory (`skills/tts-tools`):
```bash
pip install websocket-client
```
Requires Python 3.6+.
## How to Use This Skill
**⛔ MANDATORY RESTRICTIONS - DO NOT VIOLATE ⛔**
1. **ONLY use UniSound TTS API** - Execute the script `python scripts/tts.py`
2. **NEVER synthesize speech directly** - Do NOT attempt local TTS synthesis
3. **NEVER offer alternatives** - Do NOT suggest "I can try another method" or similar
4. **IF API fails** - Display the error message and STOP immediately
5. **NO fallback methods** - Do NOT attempt text-to-speech any other way
If the script execution fails (API not configured, network error, etc.):
- Show the error message to the user
- Do NOT offer to help using your TTS capabilities
- Do NOT ask "Would you like me to try synthesizing it?"
- Simply stop and wait for user to fix the configuration
### Basic Workflow
1. **Configure credentials** (first time only):
```bash
export UNISOUND_APPKEY='ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3'
export UNISOUND_SECRET='5c12231cd279b35873a3ccecf9439118'
```
2. **Execute text-to-speech conversion**:
```bash
python scripts/tts.py --text '今天天气怎么样'
```
**Command options**:
- `--text TEXT` - Text to convert to speech (default: '今天天气怎么样?')
- `--voice VOICE` - Voice name (default: xiaofeng-base)
- `--format FORMAT` - Output format: mp3, wav, pcm (default: mp3)
- `--sample RATE` - Sample rate: 8k, 16k, 24k (default: 24k)
- `--speed SPEED` - Speech speed 0-100 (default: 50)
- `--volume VOLUME` - Volume level 0-100 (default: 50)
- `--pitch PITCH` - Pitch level 0-100 (default: 50)
- `--bright BRIGHT` - Brightness/tone 0-100 (default: 50)
- `--appkey APPKEY` - Override appkey (default: UNISOUND_APPKEY env var)
- `--secret SECRET` - Override secret (default: UNISOUND_SECRET env var)
3. **Output**:
- Audio files are saved to `results/` directory
- Filename format: `<timestamp>.<format>`
- Example: `1234567890.mp3`
### Understanding the Output
**Audio Format Options**:
- **MP3**: Compressed, smaller file size, good quality - best for web and streaming
- **WAV**: Uncompressed, excellent quality - best for production and archival
- **PCM**: Raw audio data - best for further audio processing
**Sample Rates**:
- **24k**: High quality, default - recommended for most use cases
- **16k**: Standard quality - good balance of quality and size
- **8k**: Lower quality, smaller file size - suitable for telephony
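The sample rate also determines the size of uncompressed output. This document does not state the sample width; assuming 16-bit mono samples, raw PCM needs 16,000, 32,000 and 48,000 bytes per second at 8k, 16k and 24k. WAV adds a small header on top, and MP3 is much smaller. A quick calculation:

```python
SAMPLE_RATES_HZ = {"8k": 8000, "16k": 16000, "24k": 24000}


def pcm_bytes(sample_rate_hz: int, seconds: float, sample_width: int = 2, channels: int = 1) -> int:
    """Raw PCM size = samples per second x bytes per sample x channels x duration."""
    return int(sample_rate_hz * sample_width * channels * seconds)


for name, rate in SAMPLE_RATES_HZ.items():
    # 24k gives 48000 bytes per second, roughly 2.7 MiB per minute
    print(name, pcm_bytes(rate, 1.0), "bytes/s")
```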
### Usage Examples
**Example 1: Quick Start with Test Credentials**
```bash
# Set test credentials
export UNISOUND_APPKEY='ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3'
export UNISOUND_SECRET='5c12231cd279b35873a3ccecf9439118'
# Convert text to speech
python scripts/tts.py --text '你好世界'
```
Output: `results/1234567890.mp3`
**Example 2: Custom Voice and Format**
```bash
python scripts/tts.py --text '今天天气怎么样' --voice xiaofeng-base --format wav
```
Output: High-quality WAV file with male voice
**Example 3: Adjusted Speech Parameters**
```bash
python scripts/tts.py --text '快速朗读' --speed 70 --volume 60 --pitch 50
```
Output: Faster speech with increased volume
**Example 4: High-Quality Audio Production**
```bash
python scripts/tts.py --text '高质量音频' --format wav --sample 24k --volume 60
```
Output: Production-quality WAV file at 24kHz
**Example 5: Command-line Credential Override**
```bash
python scripts/tts.py \
--text '测试' \
--appkey 'ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3' \
--secret '5c12231cd279b35873a3ccecf9439118'
```
### How It Works
The script uses the UniSound TTS WebSocket API with the following workflow:
1. **Authenticate** using SHA256 signature (appkey + timestamp + secret)
使用 SHA256 签名进行身份验证
2. **Establish WebSocket connection** to `wss://ws-stts.hivoice.cn/v1/tts`
建立 WebSocket 连接到云知声 TTS 服务
3. **Send TTS request** with text and voice parameters
发送包含文本和语音参数的 TTS 请求
4. **Receive streaming audio data** in binary chunks
以二进制块形式接收流式音频数据
5. **Save audio file** to the results directory
将音频文件保存到结果目录
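Step 1 can be sketched as follows. The description above only says that the signature uses SHA256 over the appkey, timestamp and secret. Three details are assumptions:

- plain string concatenation in that order
- a lowercase hex digest
- a millisecond timestamp

Compare against `scripts/tts.py` before relying on this. The function names are hypothetical.

```python
import hashlib
import time
from typing import Optional, Tuple


def tts_signature(appkey: str, secret: str, timestamp: str) -> str:
    """SHA-256 hex digest of appkey + timestamp + secret (plain concatenation assumed)."""
    return hashlib.sha256((appkey + timestamp + secret).encode("utf-8")).hexdigest()


def signed_auth(appkey: str, secret: str, timestamp: Optional[str] = None) -> Tuple[str, str]:
    """Return (timestamp, signature); the millisecond timestamp is an assumption."""
    ts = timestamp if timestamp is not None else str(int(time.time() * 1000))
    return ts, tts_signature(appkey, secret, ts)
```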
### Available Voices
| Voice | Type | Description |
|-------|------|-------------|
| xiaofeng-base | Male | Standard male voice, clear and natural |
| xiaoyan | Female | Female voice |
| xiaomei | Female | Alternative female voice |
| Custom voices | Various | Contact UniSound for more options |
### Adjustable Parameters
| Parameter | Range | Default | Description |
|-----------|-------|---------|-------------|
| speed | 0-100 | 50 | Speech speed (50 = normal, higher = faster) |
| volume | 0-100 | 50 | Volume level (50 = normal, higher = louder) |
| pitch | 0-100 | 50 | Pitch level (50 = normal, higher = higher pitch) |
| bright | 0-100 | 50 | Brightness/tone (50 = normal) |
**Recommended settings**:
- Audiobooks: speed 45, pitch 50
- News/announcements: speed 55, volume 60, bright 60
- Accessibility: speed 35-40, volume 70
- Normal conversation: speed 50, all parameters 50
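You can keep the recommended settings as named presets and range-check them before a request. The check mirrors the `speed must be between 0 and 100` error shown under Error Handling. In this sketch, `PRESETS` and `resolve_params` are hypothetical names. The accessibility speed of 38 is one arbitrary pick from the recommended 35-40 range.

```python
from typing import Dict

PARAMS = ("speed", "volume", "pitch", "bright")

# Presets from the recommended settings; parameters not listed stay at the default 50.
PRESETS: Dict[str, Dict[str, int]] = {
    "audiobook": {"speed": 45, "pitch": 50},
    "news": {"speed": 55, "volume": 60, "bright": 60},
    "accessibility": {"speed": 38, "volume": 70},  # any speed in 35-40 fits
    "conversation": {},
}


def resolve_params(preset: str, **overrides: int) -> Dict[str, int]:
    """Merge defaults (50), a preset and explicit overrides, then range-check them."""
    values = {name: 50 for name in PARAMS}
    values.update(PRESETS[preset])
    values.update(overrides)
    for name, value in values.items():
        if name not in PARAMS:
            raise ValueError(f"unknown parameter: {name}")
        if not 0 <= value <= 100:
            raise ValueError(f"{name} must be between 0 and 100, got {value}")
    return values
```

The resulting values map onto the `--speed`, `--volume`, `--pitch` and `--bright` flags, or onto the `tts_speed`, `tts_volume`, `tts_pitch` and `tts_bright` attributes used in Advanced Usage.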
## First-Time Configuration
**When credentials are not configured**:
The script will show:
```
Error: AppKey and Secret are required!
Set them via --appkey/--secret arguments or UNISOUND_APPKEY/UNISOUND_SECRET environment variables.
```
### Test Credentials
For testing and evaluation, use these credentials:
用于测试和评估,请使用以下凭据:
```bash
export UNISOUND_APPKEY='ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3'
export UNISOUND_SECRET='5c12231cd279b35873a3ccecf9439118'
```
> **⚠️ Important Security Notice / 重要安全提示**
>
> - **Test credentials only** — These are for testing and evaluation purposes
> - **仅测试凭据**——这些凭据仅供测试和评估使用
> - **No sensitive data** — Never use with production or sensitive content
> - **勿用于敏感数据**——切勿用于生产或敏感内容
> - **Get your own credentials** — For production use, contact UniSound
> - **获取自己的凭据**——生产环境请联系云知声
> - **Data privacy** — Text is sent to UniSound servers for processing
> - **数据隐私**——文本将发送至云知声服务器进行处理
### Obtaining Production Credentials
For production use, obtain API credentials from UniSound (云知声):
用于生产环境时,请从云知声获取 API 凭据:
1. **Contact UniSound** to obtain your API credentials
联系云知声获取您的 API 凭据
Visit: https://www.unisound.com/
2. **You will receive**:
您将收到:
- **AppKey**: Application key / 应用密钥
- **Secret**: Secret key for authentication / 认证密钥
### Configuration Methods
**Method 1: Environment Variables (Recommended)**
*Linux/macOS:*
```bash
export UNISOUND_APPKEY='ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3'
export UNISOUND_SECRET='5c12231cd279b35873a3ccecf9439118'
python scripts/tts.py --text '你好'
```
*Windows (PowerShell):*
```powershell
$env:UNISOUND_APPKEY='ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3'
$env:UNISOUND_SECRET='5c12231cd279b35873a3ccecf9439118'
python scripts/tts.py --text '你好'
```
*Windows (CMD):*
```cmd
set UNISOUND_APPKEY=ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3
set UNISOUND_SECRET=5c12231cd279b35873a3ccecf9439118
python scripts/tts.py --text '你好'
```
**Method 2: .env File (Recommended for Development)**
Create a `.env` file in the project root:
```
UNISOUND_APPKEY=ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3
UNISOUND_SECRET=5c12231cd279b35873a3ccecf9439118
```
Then load the file with `python-dotenv`, or export its variables in your shell before running the script.
> **Security Note**: Never commit `.env` files or actual production credentials to version control.
> **安全提示**:切勿将 `.env` 文件或实际生产凭据提交到版本控制系统。
**Method 3: Command-Line Arguments**
```bash
python scripts/tts.py \
--text '你好世界' \
--appkey 'ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3' \
--secret '5c12231cd279b35873a3ccecf9439118'
```
### Required Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `UNISOUND_APPKEY` | **Yes** | Application key / 应用密钥 |
| `UNISOUND_SECRET` | **Yes** | Secret key / 认证密钥 |
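The precedence described in this section can be sketched as follows: command-line `--appkey`/`--secret` values come first, then `UNISOUND_APPKEY`/`UNISOUND_SECRET`. `resolve_credentials` is a hypothetical helper. Its error text reuses the message documented under First-Time Configuration, but `scripts/tts.py` may implement the lookup differently.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_credentials(
    appkey: Optional[str] = None,
    secret: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> Tuple[str, str]:
    """CLI values win over UNISOUND_APPKEY / UNISOUND_SECRET; both are required."""
    key = appkey or env.get("UNISOUND_APPKEY", "")
    sec = secret or env.get("UNISOUND_SECRET", "")
    if not key or not sec:
        raise SystemExit(
            "Error: AppKey and Secret are required!\n"
            "Set them via --appkey/--secret arguments or "
            "UNISOUND_APPKEY/UNISOUND_SECRET environment variables."
        )
    return key, sec
```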
### Python API Usage
**Basic Python API**:
```python
import os
from scripts.tts import Ws_parms, do_ws, write_results
# Get credentials from environment variables
appkey = os.getenv('UNISOUND_APPKEY', 'ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3')
secret = os.getenv('UNISOUND_SECRET', '5c12231cd279b35873a3ccecf9439118')
# Configure TTS parameters
ws_parms = Ws_parms(
url='wss://ws-stts.hivoice.cn/v1/tts',
appkey=appkey,
secret=secret,
pid=1,
vcn='xiaofeng-base',
text='你好,欢迎使用云知声语音合成服务!',
tts_format='mp3',
tts_sample='24k',
user_id='my-app',
)
# Execute TTS conversion
do_ws(ws_parms)
# Save result to file
write_results(ws_parms)
print('Audio saved to results/ directory!')
```
## Error Handling
**Authentication failed**:
```
Error: AppKey and Secret are required!
```
→ Credentials not provided
→ Set UNISOUND_APPKEY and UNISOUND_SECRET environment variables
→ 未提供凭据,请设置环境变量
**WebSocket connection error**:
```
WebSocket error: ...
```
→ Check network connectivity to UniSound API
→ Verify the API endpoint URL is correct
→ Check if firewall is blocking WebSocket connections
→ 检查网络连接和防火墙设置
**No audio data received**:
```
Error: No audio data received
```
→ Text may be empty or contain invalid characters
→ Check the text parameter is not empty
→ Verify text encoding is UTF-8
→ Credentials may be invalid
→ 检查文本内容、编码和凭据
**Invalid speech parameter**:
```
Error: speed must be between 0 and 100, got 150
```
→ Speech parameters must be between 0 and 100
→ 语音参数必须在 0 到 100 之间
**WebSocket connection timeout**:
```
WebSocket error: timeout
```
→ Network connection issue
→ API service may be temporarily unavailable
→ Check internet connection
→ 网络连接问题或服务暂时不可用
## Advanced Usage
### Custom Speech Parameters
```python
import os
from scripts.tts import Ws_parms, do_ws, write_results
appkey = os.getenv('UNISOUND_APPKEY', 'ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3')
secret = os.getenv('UNISOUND_SECRET', '5c12231cd279b35873a3ccecf9439118')
ws_parms = Ws_parms(
url='wss://ws-stts.hivoice.cn/v1/tts',
appkey=appkey,
secret=secret,
pid=1,
vcn='xiaofeng-base',
text='这是自定义参数的语音合成示例',
tts_format='wav',
tts_sample='24k',
user_id='demo',
)
# Customize speech parameters
ws_parms.tts_speed = 60 # Faster speech (0-100)
ws_parms.tts_volume = 70 # Louder volume (0-100)
ws_parms.tts_pitch = 40 # Lower pitch (0-100)
ws_parms.tts_bright = 60 # Brighter tone (0-100)
do_ws(ws_parms)
write_results(ws_parms)
```
### Batch Text Processing
```python
import os
from scripts.tts import Ws_parms, do_ws, write_results
def batch_tts(text_list):
    """Convert multiple texts to audio files"""
    appkey = os.getenv('UNISOUND_APPKEY', 'ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3')
    secret = os.getenv('UNISOUND_SECRET', '5c12231cd279b35873a3ccecf9439118')
    for i, text in enumerate(text_list):
        ws_parms = Ws_parms(
            url='wss://ws-stts.hivoice.cn/v1/tts',
            appkey=appkey,
            secret=secret,
            pid=i,
            vcn='xiaofeng-base',
            text=text,
            tts_format='mp3',
            tts_sample='24k',
            user_id=f'batch-{i}',
        )
        do_ws(ws_parms)
        write_results(ws_parms)
        print(f"Generated: {text[:30]}...")
# Usage
texts = [
"第一段文字",
"第二段文字",
"第三段文字"
]
batch_tts(texts)
```
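The loop above sends requests back to back. A minimal sketch of the "delays between requests" tip from Tips and Best Practices, with the synthesis step passed in as a function so the pacing logic can be tested without network access; `batch_with_delay` and `synthesize` are hypothetical names, and the 0.5-second default delay is an arbitrary assumption, not a documented rate limit:

```python
import time
from typing import Callable, List, Sequence, TypeVar

R = TypeVar("R")


def batch_with_delay(texts: Sequence[str], synthesize: Callable[[int, str], R],
                     delay: float = 0.5,
                     sleep: Callable[[float], None] = time.sleep) -> List[R]:
    """Call synthesize(index, text) for each non-blank text, pausing between calls."""
    results: List[R] = []
    for i, text in enumerate(texts):
        if not text.strip():
            continue  # empty text yields no audio, so skip the request entirely
        if results:
            sleep(delay)  # pause before every request except the first
        results.append(synthesize(i, text))
    return results


# Possible synthesize() built on scripts.tts (not run here):
#
#   def synthesize(i, text):
#       parms = Ws_parms(url='wss://ws-stts.hivoice.cn/v1/tts', appkey=appkey,
#                        secret=secret, pid=i, vcn='xiaofeng-base', text=text,
#                        tts_format='mp3', tts_sample='24k', user_id=f'batch-{i}')
#       do_ws(parms)
#       return write_results(parms)  # path of the saved audio file
```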
### Audiobook Chapter Converter
```python
import os
from scripts.tts import Ws_parms, do_ws, write_results
def convert_chapter(chapter_text, chapter_num, voice='xiaofeng-base'):
    """Convert a book chapter to audio file"""
    # Add chapter announcement
    intro = f"第{chapter_num}章。"
    full_text = intro + chapter_text
    appkey = os.getenv('UNISOUND_APPKEY', 'ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3')
    secret = os.getenv('UNISOUND_SECRET', '5c12231cd279b35873a3ccecf9439118')
    ws_parms = Ws_parms(
        url='wss://ws-stts.hivoice.cn/v1/tts',
        appkey=appkey,
        secret=secret,
        pid=chapter_num,
        vcn=voice,
        text=full_text,
        tts_format='mp3',
        tts_sample='24k',
        user_id=f'audiobook-ch{chapter_num}',
    )
    # Slower, clearer reading for books
    ws_parms.tts_speed = 45
    ws_parms.tts_pitch = 50
    do_ws(ws_parms)
    write_results(ws_parms)
    print(f"Chapter {chapter_num} converted")
# Usage
chapter = """这是第一章的内容。在一个阳光明媚的早晨,
主人公开始了他的冒险之旅。"""
convert_chapter(chapter, 1)
```
### Accessibility Helper
```python
import os
from scripts.tts import Ws_parms, do_ws, write_results
def accessibility_reader(text, speed='normal', voice='xiaofeng-base'):
    """
    Text-to-speech for accessibility (visually impaired users)
    with customizable reading speed
    """
    speed_map = {
        'slow': 35,
        'normal': 50,
        'fast': 65
    }
    appkey = os.getenv('UNISOUND_APPKEY', 'ce44uxf7g5eag2cv33qvlp5d22qrkgcezvgfp2q3')
    secret = os.getenv('UNISOUND_SECRET', '5c12231cd279b35873a3ccecf9439118')
    ws_parms = Ws_parms(
        url='wss://ws-stts.hivoice.cn/v1/tts',
        appkey=appkey,
        secret=secret,
        pid=1,
        vcn=voice,
        text=text,
        tts_format='mp3',
        tts_sample='24k',
        user_id='accessibility',
    )
    ws_parms.tts_speed = speed_map.get(speed, 50)
    ws_parms.tts_volume = 70  # Higher volume for accessibility
    do_ws(ws_parms)
    write_results(ws_parms)
    return ws_parms.tts_stream
# Usage
article = "这是一篇重要的新闻文章。"
accessibility_reader(article, speed='slow')
```
## Important Notes
- **Chinese language optimized** - Best results with Simplified Chinese text
- **Requires stable internet connection** for WebSocket streaming
- **Audio files saved locally** - Output files are written to `results/` under the current working directory
- **Text encoding** - Ensure text is UTF-8 encoded
- **Default sample rate is 24k** - Higher quality than the common 16k rate
- **Test credentials** - Provided for testing and evaluation only
## Security Best Practices
- **For testing** - Use the provided test credentials
- **For production** - Always obtain your own credentials from UniSound
- **Use environment variables** - Store credentials in environment variables rather than in source files
- **Never hardcode credentials** - Don't embed production credentials in code
- **Use .env files** - For local development (add `.env` to `.gitignore`)
- **Rotate credentials regularly** - In production environments
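The Advanced Usage examples fall back to the bundled test credentials when the environment variables are unset. For production, a minimal sketch that reads the same `UNISOUND_APPKEY` and `UNISOUND_SECRET` variables as `scripts/tts.py` but fails fast instead of falling back; `load_credentials` is a hypothetical helper, and loading a `.env` file into the environment (for example from your shell or with the `python-dotenv` package) is not shown:

```python
import os
from typing import Mapping, Tuple


def load_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (appkey, secret) from the environment, raising if either is missing."""
    values = {name: env.get(name, "").strip()
              for name in ("UNISOUND_APPKEY", "UNISOUND_SECRET")}
    missing = [name for name, value in values.items() if not value]
    if missing:
        # Name the missing variables but never echo any credential values
        raise RuntimeError("Missing credentials: " + ", ".join(missing))
    return values["UNISOUND_APPKEY"], values["UNISOUND_SECRET"]
```

Passing `env` explicitly keeps the function testable without touching the real process environment.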
## Troubleshooting
**Issue**: Script fails with import error
→ Ensure dependencies are installed: `pip install websocket-client`
→ Ensure you are using Python 3.6 or later
**Issue**: "AppKey and Secret are required!" error
→ Set the UNISOUND_APPKEY and UNISOUND_SECRET environment variables
→ Or pass the --appkey and --secret command-line arguments
**Issue**: Poor audio quality
→ Try WAV format with a 24k sample rate
→ Adjust speech parameters for your use case
**Issue**: WebSocket connection timeout
→ Check network connectivity
→ Verify your firewall allows WebSocket connections
→ Check whether the API service is operational
**Issue**: Generated audio sounds unnatural
→ Adjust the speed parameter (try the 45-55 range)
→ Check the text for proper punctuation
→ Consider breaking long sentences into shorter ones
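One way to act on the last point is to split long input at sentence-ending punctuation and synthesize each chunk separately. A minimal sketch, assuming a chunk size of 200 characters (an arbitrary choice; this document does not state a maximum text length for the API); `split_for_tts` is a hypothetical helper:

```python
import re
from typing import List

# A run of non-terminators followed by sentence-ending punctuation (full-width
# or ASCII), or trailing text with no terminator. ASCII '.' is deliberately
# excluded so decimals such as 3.5 are not split.
_SENTENCE = re.compile(r"[^。!?;!?;]*[。!?;!?;]+|[^。!?;!?;]+")


def split_for_tts(text: str, max_chars: int = 200) -> List[str]:
    """Group whole sentences into chunks of at most max_chars characters.

    A single sentence longer than max_chars is kept intact as its own chunk.
    """
    sentences = [s.strip() for s in _SENTENCE.findall(text) if s.strip()]
    chunks: List[str] = []
    current = ""
    for sentence in sentences:
        if current and len(current) + len(sentence) > max_chars:
            chunks.append(current)
            current = ""
        current += sentence
    if current:
        chunks.append(current)
    return chunks


print(split_for_tts("第一句。第二句!第三句?", max_chars=8))
# ['第一句。第二句!', '第三句?']
```

Each returned chunk can then be sent as the `text` of its own request.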
**Issue**: Test credentials stopped working
→ Test credentials may expire or be rate-limited
→ Contact UniSound to obtain your own credentials
## Tips and Best Practices
- **For audiobooks**: Use speed 45 and add chapter announcements
- **For accessibility**: Use speed 35-40 and a higher volume (70)
- **For news**: Use speed 55 and a brighter tone (60)
- **For batch processing**: Add a delay between requests
- **For production**: Add error handling and retry logic
- **For best quality**: Use a 24k sample rate with WAV format
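These tips can be captured as presets. A minimal sketch, assuming you apply a preset to a `Ws_parms` instance before calling `do_ws()`; `PRESETS` and `apply_preset` are hypothetical, the accessibility preset takes the low end of the suggested 35-40 speed range, and any parameter a preset does not set keeps its current value (50 for a freshly created `Ws_parms`):

```python
from typing import Dict

# Values from the tips above; attribute names match Ws_parms in scripts/tts.py.
PRESETS: Dict[str, Dict[str, int]] = {
    "audiobook": {"tts_speed": 45},
    "accessibility": {"tts_speed": 35, "tts_volume": 70},
    "news": {"tts_speed": 55, "tts_bright": 60},
}


def apply_preset(ws_parms, name: str):
    """Set the preset's speech parameters on ws_parms and return it."""
    try:
        settings = PRESETS[name]
    except KeyError:
        raise ValueError(f"unknown preset {name!r}; choose from {sorted(PRESETS)}") from None
    for attr, value in settings.items():
        setattr(ws_parms, attr, value)
    return ws_parms
```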
## Reference Documentation
- [UniSound Official Site](https://www.unisound.com/)
- [WebSocket Client Documentation](https://websocket-client.readthedocs.io/)
- [TTS API Documentation](https://www.unisound.com/tts-api)
Load these reference documents when:
- Debugging API connection issues
- Understanding advanced features
- Looking up detailed API parameter information
## Authentication Details
The UniSound TTS API uses SHA256 signature-based authentication:
```python
# Signature format (generated automatically by the Ws_parms class):
# sign = uppercase hex digest of SHA256(appkey + timestamp + secret),
# where timestamp is the current Unix time in milliseconds.
# Manual signature example (if needed):
import hashlib
import time

def generate_signature(appkey, secret):
    timestamp = str(int(time.time() * 1000))
    hs = hashlib.sha256()
    hs.update((appkey + timestamp + secret).encode('utf-8'))
    signature = hs.hexdigest().upper()
    return timestamp, signature
```
**WebSocket URL format**:
```
wss://ws-stts.hivoice.cn/v1/tts?time={timestamp}&appkey={appkey}&sign={signature}
```
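Putting the signature and the URL format together: a minimal sketch that builds the signed connection URL as a pure function, with an injectable timestamp so the result is reproducible. `build_signed_url` is a hypothetical helper; unlike the plain string concatenation in `scripts/tts.py`, it uses `urllib.parse.urlencode`, which percent-escapes reserved characters (this makes no difference for alphanumeric AppKeys):

```python
import hashlib
import time
from typing import Optional
from urllib.parse import urlencode


def build_signed_url(base_url: str, appkey: str, secret: str,
                     timestamp_ms: Optional[int] = None) -> str:
    """Return base_url with time/appkey/sign query parameters appended."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)  # milliseconds, as in Ws_parms
    timestamp = str(timestamp_ms)
    sign = hashlib.sha256((appkey + timestamp + secret).encode("utf-8")).hexdigest().upper()
    # Dict order is preserved, so the query reads time, appkey, sign
    return f"{base_url}?{urlencode({'time': timestamp, 'appkey': appkey, 'sign': sign})}"
```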
> **Note**: API capabilities, available voices, and rate limits are determined by your UniSound TTS API service configuration and subscription plan.
FILE:requirements.txt
# U2-TTS Requirements
# Core dependency
websocket-client>=0.56.0
# Optional: For async operations
gevent>=1.4.0
# Dependencies (auto-installed with gevent)
greenlet>=0.4.15
cffi>=1.12.3
pycparser>=2.19
six>=1.12.0
FILE:scripts/tts.py
# coding:utf-8
"""
UniSound TTS WebSocket Demo
Text-to-speech conversion using UniSound's TTS WebSocket API.
"""
import websocket
import hashlib
import json
import time
from functools import partial
try:
import thread
except ImportError:
import _thread as thread
import os
import argparse
from typing import Optional
class Ws_parms(object):
"""
WebSocket TTS parameter class.
Manages configuration and state for TTS WebSocket connections.
"""
def __init__(self, url, appkey, secret, pid, vcn, text, user_id, tts_format, tts_sample):
self.url = url
self.appkey = appkey
self.secret = secret
self.user_id = user_id
self.tts_format = tts_format
self.tts_sample = tts_sample
self.tts_text = text
self.tts_vcn = vcn
self.tts_speed = 50
self.tts_volume = 50
self.tts_pitch = 50
self.tts_bright = 50
self.tts_stream = b''
self.punc = ''
self.status = False
self.message = ''
self.code = 0
self._pid = pid
def get_sha256(self, timestamp):
"""
Generate SHA256 signature for authentication.
Args:
timestamp: Current timestamp in milliseconds
Returns:
Uppercase hex string signature
"""
hs = hashlib.sha256()
hs.update((self.appkey + timestamp + self.secret).encode('utf-8'))
signature = hs.hexdigest().upper()
return signature
def get_url(self):
"""
Build complete WebSocket URL with authentication parameters.
Returns:
Complete WebSocket URL with query parameters
"""
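timestamp = str(int(time.time() * 1000))
# Return a new string instead of reassigning self.url, so calling
# get_url() more than once does not append a second query string.
return self.url + '?' + 'time=' + timestamp + '&appkey=' + \
self.appkey + '&sign=' + self.get_sha256(timestamp)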
def on_message(ws, data, ws_parms):
"""
WebSocket message handler.
Args:
ws: WebSocket instance
data: Received data (str for JSON, bytes for audio)
ws_parms: Ws_parms instance for storing state
"""
if isinstance(data, str):
print('Received text message:', data)
try:
json_object = json.loads(data)
if 'end' in json_object and json_object['end'] and ws.sock.connected:
print("Closing WebSocket as 'end' flag is True.")
ws.close()
except json.JSONDecodeError:
print("String data is not a valid JSON format")
if isinstance(data, bytes):
ws_parms.tts_stream += data
print(f'Received audio chunk: {len(data)} bytes')
def on_error(ws, error):
"""
WebSocket error handler.
Args:
ws: WebSocket instance
error: Exception or error message
"""
print(f"WebSocket error: {error}")
def on_close(ws, close_status_code=None, close_msg=None):
"""
WebSocket close handler.
Args:
ws: WebSocket instance
close_status_code: Status code for closing
close_msg: Close message
"""
print(f"### WebSocket closed ###")
if close_status_code:
print(f"Status code: {close_status_code}")
if close_msg:
print(f"Close message: {close_msg}")
def on_open(ws, ws_parms):
"""
WebSocket open handler - sends TTS request.
Args:
ws: WebSocket instance
ws_parms: Ws_parms instance with TTS configuration
"""
print('WebSocket connected!')
def run(*args):
"""Send TTS request in a separate thread."""
request_data = {
"format": ws_parms.tts_format,
"sample": ws_parms.tts_sample,
"text": ws_parms.tts_text,
"vcn": ws_parms.tts_vcn,
"user_id": ws_parms.user_id,
"speed": ws_parms.tts_speed,
"volume": ws_parms.tts_volume,
"pitch": ws_parms.tts_pitch,
"bright": ws_parms.tts_bright,
}
print("Sending request:", request_data)
ws.send(json.dumps(request_data))
print(f"Voice: {ws_parms.tts_vcn}, Format: {ws_parms.tts_format}, Text: {ws_parms.tts_text[:50]}...")
thread.start_new_thread(run, ())
def ensure_dir(dir_path):
"""
Create directory if it doesn't exist.
Args:
dir_path: Directory path to create
"""
os.makedirs(dir_path, exist_ok=True)
def do_ws(ws_parms):
"""
Execute WebSocket TTS connection.
Args:
ws_parms: Ws_parms instance with TTS configuration
Returns:
Updated ws_parms instance with audio stream data
"""
ws_url = ws_parms.get_url()
websocket.enableTrace(False)
print(f"Connecting to: {ws_url}")
ws = websocket.WebSocketApp(
url=ws_url,
on_error=on_error,
on_close=on_close
)
ws.on_open = partial(on_open, ws_parms=ws_parms)
ws.on_message = partial(on_message, ws_parms=ws_parms)
ws.run_forever()
# Check if audio data was received
if len(ws_parms.tts_stream) == 0:
print("No audio data received")
else:
print(f"Received {len(ws_parms.tts_stream)} bytes of audio data")
return ws_parms
def write_results(ws_parms):
"""
Write audio stream to file.
Args:
ws_parms: Ws_parms instance containing audio stream data
Returns:
Path to the created audio file
"""
ensure_dir('results')
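# Millisecond timestamp plus the request's pid, so files from requests that
# finish within the same second (e.g. in a batch) do not overwrite each other.
timestamp = str(int(time.time() * 1000))
filename = f"{timestamp}_{ws_parms._pid}.{ws_parms.tts_format}"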
file_path = os.path.join('results', filename)
with open(file_path, 'wb') as f:
f.write(ws_parms.tts_stream)
print(f"Audio saved to: {file_path} ({len(ws_parms.tts_stream)} bytes)")
return file_path
def parse_arguments():
"""
Parse command line arguments.
Returns:
Parsed arguments namespace
"""
parser = argparse.ArgumentParser(
description='UniSound TTS WebSocket Demo - Text to Speech conversion'
)
# Authentication
parser.add_argument(
'--appkey',
type=str,
default=os.getenv('UNISOUND_APPKEY', ''),
help='UniSound AppKey (default: UNISOUND_APPKEY env var)'
)
parser.add_argument(
'--secret',
type=str,
default=os.getenv('UNISOUND_SECRET', ''),
help='UniSound Secret (default: UNISOUND_SECRET env var)'
)
# TTS parameters
parser.add_argument(
'--text',
type=str,
default='今天天气怎么样?',
help='Text to convert to speech'
)
parser.add_argument(
'--voice', '-v',
type=str,
default='xiaofeng-base',
help='Voice name (default: xiaofeng-base)'
)
parser.add_argument(
'--format', '-f',
type=str,
default='mp3',
choices=['mp3', 'wav', 'pcm'],
help='Output format (default: mp3)'
)
parser.add_argument(
'--sample', '-s',
type=str,
default='24k',
choices=['8k', '16k', '24k'],
help='Sample rate (default: 24k)'
)
# Speech parameters
parser.add_argument(
'--speed',
type=int,
default=50,
help='Speech speed 0-100 (default: 50)'
)
parser.add_argument(
'--volume',
type=int,
default=50,
help='Volume 0-100 (default: 50)'
)
parser.add_argument(
'--pitch',
type=int,
default=50,
help='Pitch 0-100 (default: 50)'
)
parser.add_argument(
'--bright',
type=int,
default=50,
help='Brightness 0-100 (default: 50)'
)
# Connection
parser.add_argument(
'--url',
type=str,
default='wss://ws-stts.hivoice.cn/v1/tts',
help='WebSocket URL'
)
parser.add_argument(
'--user-id',
type=str,
default='unisound-python-demo',
help='User identifier'
)
# Other
parser.add_argument(
'--no-cleanup',
action='store_true',
help='Currently unused (this script does not create or clean up log files)'
)
return parser.parse_args()
def main():
"""Main entry point for the TTS demo."""
args = parse_arguments()
# Validate credentials
if not args.appkey or not args.secret:
print("Error: AppKey and Secret are required!")
print("Set them via --appkey/--secret arguments or UNISOUND_APPKEY/UNISOUND_SECRET environment variables.")
return 1
# Validate speech parameters
for param, name in [(args.speed, 'speed'), (args.volume, 'volume'),
(args.pitch, 'pitch'), (args.bright, 'bright')]:
if not 0 <= param <= 100:
print(f"Error: {name} must be between 0 and 100, got {param}")
return 1
# Create TTS parameters
ws_parms = Ws_parms(
url=args.url,
appkey=args.appkey,
secret=args.secret,
pid=1,
vcn=args.voice,
text=args.text,
tts_format=args.format,
tts_sample=args.sample,
user_id=args.user_id,
)
# Apply custom speech parameters
ws_parms.tts_speed = args.speed
ws_parms.tts_volume = args.volume
ws_parms.tts_pitch = args.pitch
ws_parms.tts_bright = args.bright
# Execute TTS conversion
print(f"\n{'='*60}")
print(f"UniSound TTS - Text to Speech Conversion")
print(f"{'='*60}")
print(f"Text: {args.text}")
print(f"Voice: {args.voice}")
print(f"Format: {args.format}, Sample Rate: {args.sample}")
print(f"Parameters - Speed: {args.speed}, Volume: {args.volume}, Pitch: {args.pitch}, Bright: {args.bright}")
print(f"{'='*60}\n")
try:
do_ws(ws_parms)
print('\nTTS conversion completed successfully!')
# Save results
if len(ws_parms.tts_stream) > 0:
write_results(ws_parms)
return 0
else:
print("Error: No audio data received")
return 1
except Exception as e:
print(f"Error during TTS conversion: {e}")
return 1
if __name__ == '__main__':
raise SystemExit(main())
FILE:.claude/settings.local.json
{
"permissions": {
"allow": [
"Bash(ls:*)",
"Bash(python:*)"
]
}
}
Parse documents using UniDoc API for conversion to Markdown or JSON format. Supports both synchronous and asynchronous parsing with automatic status polling.
Name: u2-doc-parser
Description: Parse documents using UniDoc API for conversion to Markdown or JSON format. Supports both synchronous and asynchronous parsing with automatic status polling.
UniDoc Document Parser
======================
Overview
--------
Parse documents using UniDoc API for conversion to Markdown or JSON format. Supports both synchronous and asynchronous parsing with automatic status polling. Ideal for converting various document formats (PDF, DOC, DOCX, images) through a cloud-based API service.
**⚠️ Important Privacy Notice**
- This skill uploads your documents to an external API service: `https://unidoc.uat.hivoice.cn`
- Documents are transmitted over the internet and processed on third-party servers
- No authentication or API key is required for this UAT environment
- **Do not use** with sensitive, confidential, or private documents
- By using this skill, you acknowledge that your files will be uploaded to external servers
Prereqs / when to read references
---------------------------------
If you encounter API errors, network issues, or need to understand the API endpoints, read:
* `references/unidoc-notes.md`
Quick start (single document)
-----------------------------
```bash
# Output to terminal (default)
python scripts/unidoc_parse.py /path/to/file.pdf
# Save to file
python scripts/unidoc_parse.py /path/to/file.pdf --output result.md
# Convert to JSON format (async mode)
python scripts/unidoc_parse.py /path/to/file.docx --format json --mode async
```
Options
-------
* `--format md|json` (default: `md`)
  - Output format: Markdown or JSON
* `--mode sync|async` (default: `sync`)
  - Synchronous mode: waits for conversion to complete
  - Asynchronous mode: polls status until completion
* `--func METHOD` (default: `unisound`)
  - Conversion method/algorithm to use
* `--output FILE` (optional)
  - Save output to file instead of printing to terminal
  - When not specified, results are printed directly to stdout
* `--uid UUID` (optional)
  - Custom user ID (auto-generated if not provided)
* `--force` (optional)
  - Delete and replace an existing `--output` path that is a directory or other non-file (regular files are overwritten without this flag)
* `--skip-security-check` (optional, not recommended)
  - Skip the security warnings
Output
------
* **Default**: Prints converted content directly to terminal (stdout)
* **With --output**: Saves to specified file path
* Progress and error messages are sent to stderr
* Can be piped to other commands: `python scripts/unidoc_parse.py doc.pdf | grep "keyword"`
Notes
-----
* **Privacy**: Your documents are uploaded to UniDoc's UAT servers for processing
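* **No authentication**: The UAT endpoint does not require API keys or credentials; if the `UNIDOC_API_KEY` environment variable is set, the script sends it as a `Bearer` token
* **Network**: Requires internet connectivity to https://unidoc.uat.hivoice.cn (override with the `UNIDOC_BASE_URL` environment variable)
* **Supported formats**: PDF, DOC, DOCX, XLS, XLSX, PPT, PPTX, JPG/JPEG, PNG, TXT, and MD (the script rejects other extensions)
* **Async mode**: Polls every 1 second until completion (max 5 minutes)
* **Limits**: The script rejects files larger than 50 MB before uploading; server-side size and rate limits depend on the API service configuration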
* **Recommendation**: For large files or batch processing, prefer async mode
* **Security**: Only use with non-sensitive test documents
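The async mode's polling can be sketched as a loop with a wall-clock deadline. `scripts/unidoc_parse.py` counts up to 300 polls instead, so slow status requests can stretch its total wait beyond 5 minutes; the version below bounds elapsed time directly. `poll_until_done` and `get_status` are hypothetical; the terminal statuses `SUCCESS` and `FAILED` are the ones the script checks:

```python
import time
from typing import Callable, Optional


def poll_until_done(get_status: Callable[[], Optional[str]], interval: float = 1.0,
                    timeout: float = 300.0,
                    clock: Callable[[], float] = time.monotonic,
                    sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_status() until it returns SUCCESS or FAILED, or raise TimeoutError."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in ("SUCCESS", "FAILED"):
            return status  # the caller decides how to handle FAILED
        if clock() >= deadline:
            raise TimeoutError(f"conversion not finished after {timeout} s (last status: {status})")
        sleep(interval)
```

`time.monotonic` is used because, unlike `time.time`, it cannot jump backwards when the system clock is adjusted.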
FILE:README.md
# UniDoc Document Parser Skill
A ClawHub skill for parsing documents using the UniDoc API service. Converts various document formats (PDF, DOC, DOCX, images) to Markdown or JSON with support for both synchronous and asynchronous processing modes.
## Features
- **Multiple Format Support**: Parse PDF, DOC, DOCX, PNG, JPG, and more
- **Output Formats**: Convert to Markdown (`.md`) or JSON (`.json`)
- **Flexible Output**: Print to terminal (default) or save to file
- **Dual Processing Modes**:
  - Synchronous mode for immediate results
  - Asynchronous mode for large files with status polling
- **Cloud-Based**: Leverages UniDoc API for robust parsing
- **Pipeline-Friendly**: Output to stdout for easy integration with other tools
## Installation
### Prerequisites
- Python 3.7+
- `requests` library
- Network connectivity to UniDoc API
### Install Dependencies
```bash
pip install requests
```
## Quick Start
### Basic Usage (Output to Terminal)
```bash
# Parse document and print to terminal
python scripts/unidoc_parse.py path/to/document.pdf
# Specify output format
python scripts/unidoc_parse.py path/to/document.pdf --format md
```
### Save to File
```bash
# Save output to a specific file
python scripts/unidoc_parse.py path/to/document.pdf --output result.md
# Save JSON output
python scripts/unidoc_parse.py path/to/document.docx --format json --output result.json
```
### Asynchronous Mode
```bash
# For large files, use async mode
python scripts/unidoc_parse.py path/to/document.docx --mode async
# Output can still be piped
python scripts/unidoc_parse.py path/to/document.pdf | grep "keyword"
```
## Options
| Option | Default | Description |
|--------|---------|-------------|
| `--format` | `md` | Output format: `md` or `json` |
| `--mode` | `sync` | Processing mode: `sync` or `async` |
| `--func` | `unisound` | Conversion method/algorithm |
| `--output` | `stdout` | Output file path (if not specified, prints to terminal) |
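| `--uid` | Auto-generated | Custom user ID |
| `--force` | Off | Replace an existing output path that is a directory or other non-file |
| `--skip-security-check` | Off | Skip security warnings (not recommended) |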
## Output
- **Default**: Converted content is printed directly to the terminal (stdout)
- **With `--output`**: Content is saved to the specified file path
- **Progress messages**: Sent to stderr to avoid interfering with output
- **Piping**: Can be piped to other commands for further processing
Example:
```bash
# Pipe output to grep
python scripts/unidoc_parse.py document.pdf | grep "important keyword"
# Save to file while also viewing
python scripts/unidoc_parse.py document.pdf | tee result.md
```
## API Endpoints
- **Base URL**: `https://unidoc.uat.hivoice.cn`
- **Sync Upload**: `/syncUploadFile`
- **Async Upload**: `/asyncUploadFile`
- **Export**: `/exportFile`
- **Status**: `/getFileStatus`
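In sync mode the script calls these endpoints in sequence: upload to `/syncUploadFile`, request a download URL from `/exportFile`, then fetch that URL. A minimal sketch of the same sequence with the HTTP client passed in (the `requests` module or a `requests.Session` both fit), so it can be exercised without the network. `sync_parse` is a hypothetical function, and the response shapes (`result.fileId` from the upload, a URL string in `result` from the export) are taken from the script rather than from API documentation:

```python
from typing import Any


def sync_parse(http: Any, base_url: str, file_path: str, uid: str,
               func: str = "unisound", target_type: str = "md") -> str:
    """Upload a file, export it as target_type, and return the converted text."""
    # 1. Upload: multipart form with uid/func fields and the file itself
    with open(file_path, "rb") as fh:
        upload = http.post(f"{base_url}/syncUploadFile", data={"uid": uid, "func": func},
                           files={"file": fh}, timeout=60)
    upload.raise_for_status()
    body = upload.json()
    if not body.get("result"):
        raise ValueError(f"upload failed: {body.get('message', 'unknown error')}")
    file_id = body["result"]["fileId"]

    # 2. Export: ask for a download URL of the converted document
    export = http.get(f"{base_url}/exportFile",
                      params={"fileId": file_id, "targetType": target_type}, timeout=60)
    export.raise_for_status()
    file_url = export.json().get("result")
    if not isinstance(file_url, str) or not file_url.startswith(("http://", "https://")):
        raise ValueError(f"unexpected export result: {file_url!r}")

    # 3. Download the converted content (UTF-8 text for md or json)
    download = http.get(file_url, timeout=60)
    download.raise_for_status()
    return download.content.decode("utf-8")
```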
## Examples
### Convert PDF to Markdown
```bash
# Output to terminal
python scripts/unidoc_parse.py report.pdf --format md
# Save to file
python scripts/unidoc_parse.py report.pdf --format md --output report.md
```
### Convert DOCX to JSON (Async)
```bash
# Large files work better in async mode
python scripts/unidoc_parse.py document.docx --format json --mode async --output data.json
```
### Parse Image to Markdown
```bash
python scripts/unidoc_parse.py screenshot.png --format md
```
### Pipeline Usage
```bash
# Search for specific content
python scripts/unidoc_parse.py document.pdf | grep "keyword"
# Count lines
python scripts/unidoc_parse.py document.pdf | wc -l
# Convert and post-process
python scripts/unidoc_parse.py document.pdf | sed 's/foo/bar/g' > processed.md
```
## Limitations
- Requires active internet connection
- Dependent on UniDoc API availability
- Files larger than 50 MB are rejected by the script before upload; further size limits are determined by the API service
- Rate limiting may apply based on API configuration
## ⚠️ Privacy & Security Notice
**This skill uploads your documents to an external API service.**
- **External Service**: Documents are uploaded to `https://unidoc.uat.hivoice.cn`
- **No Authentication**: Current implementation does not require API keys or credentials (UAT environment)
- **Data Transmission**: Your files are transmitted over the internet and processed on third-party servers
- **Recommendation**:
  - ❌ **Do NOT use** with sensitive, confidential, or private documents
  - ✅ **Use ONLY** with non-sensitive test documents
  - ⚠️ Be aware of data privacy implications before using
By using this tool, you acknowledge that your files will be uploaded to external servers for processing.
## License
MIT-0
## Author
云知声智能科技股份有限公司 (Unisound Intelligence Technology Co., Ltd.)
## See Also
- [UniDoc API Documentation](http://unidoc.uat.hivoice.cn)
FILE:scripts/unidoc_parse.py
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import os
import sys
import uuid
import time
import json
import argparse
import requests
from pathlib import Path
from typing import Optional, Set
import mimetypes
# ------------------------------------------------------
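# Author: 张发
# Modified by: ClawHub Skill Adapter
# File: unidoc_parse.py
# Created: 2025/8/18
# Modified: 2025/3/10
# Purpose: UniDoc document parsing tool - ClawHub Skill implementation
# Version: 1.0.0
# Description: document format conversion based on the UniDoc API
# Copyright: 云知声智能科技股份有限公司 (Unisound Intelligence Technology Co., Ltd.)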
# ------------------------------------------------------
# API Endpoints
BASE_URL = os.getenv("UNIDOC_BASE_URL", "https://unidoc.uat.hivoice.cn")
API_KEY = os.getenv("UNIDOC_API_KEY", "") # Optional API key for future use
SYNC_UPLOAD_URL = f"{BASE_URL}/syncUploadFile"
ASYNC_UPLOAD_URL = f"{BASE_URL}/asyncUploadFile"
EXPORT_URL = f"{BASE_URL}/exportFile"
STATUS_URL = f"{BASE_URL}/getFileStatus"
# Security Configuration
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB
ALLOWED_MIME_TYPES: Set[str] = {
'application/pdf',
'application/msword',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'application/vnd.ms-excel',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'application/vnd.ms-powerpoint',
'application/vnd.openxmlformats-officedocument.presentationml.presentation',
'image/jpeg',
'image/png',
'image/gif',
'image/bmp',
'image/tiff',
'text/plain',
'text/markdown',
}
ALLOWED_EXTENSIONS: Set[str] = {
'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
'.jpg', '.jpeg', '.png', '.txt', '.md'
}
def sanitize_path(file_path: str) -> str:
"""
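Normalize a path to an absolute, symlink-resolved path.
Note: this does not reject '..' components; it only prints a warning.
:param file_path: file path
:return: normalized absolute path
"""
# Convert to an absolute path and resolve symlinks
abs_path = os.path.abspath(file_path)
real_path = os.path.realpath(abs_path)
# Warn if the path contains a suspicious pattern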
if '..' in file_path and file_path != '..':
print(f"[WARN] Path contains '..': {file_path}", file=sys.stderr)
return real_path
def validate_input_file(file_path: str) -> None:
"""
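Validate that the input file is safe to upload.
:param file_path: file path
:raises FileNotFoundError: if the file does not exist
:raises PermissionError: if the file is not readable
:raises ValueError: if the path is not a regular file, or the file is empty, too large, or has a disallowed extension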
"""
# Normalize the path
safe_path = sanitize_path(file_path)
# Check that the file exists
if not os.path.exists(safe_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Check that it is a regular file (not a directory or special file)
if not os.path.isfile(safe_path):
raise ValueError(f"Path is not a file: {file_path}")
# Check that the file is readable
if not os.access(safe_path, os.R_OK):
raise PermissionError(f"File is not readable: {file_path}")
# Check the file size
file_size = os.path.getsize(safe_path)
if file_size == 0:
raise ValueError(f"File is empty: {file_path}")
if file_size > MAX_FILE_SIZE:
raise ValueError(f"File too large ({file_size / 1024 / 1024:.1f}MB, max {MAX_FILE_SIZE / 1024 / 1024}MB): {file_path}")
# Check the file extension
ext = os.path.splitext(safe_path)[1].lower()
if ext not in ALLOWED_EXTENSIONS:
raise ValueError(f"File type not allowed: {ext}. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}")
# Check the MIME type (advisory only: an unexpected type prints a warning)
mime_type, _ = mimetypes.guess_type(safe_path)
if mime_type and mime_type not in ALLOWED_MIME_TYPES:
print(f"[WARN] Unusual MIME type '{mime_type}' for file: {file_path}", file=sys.stderr)
def validate_output_path(output_path: str, force: bool = False) -> None:
"""
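Validate that the output path is safe to write to.
:param output_path: output path
:param force: if the path exists but is not a regular file (e.g. a directory), delete it first
:raises PermissionError: if the path is in a system directory or cannot be written
:raises ValueError: if the path exists as a non-file and force is not set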
"""
if not output_path:
return
# Normalize the path
safe_path = sanitize_path(output_path)
# Refuse to write into system directories
system_paths = ['/etc/', '/sys/', '/proc/', '/dev/', '/root/']
for sys_path in system_paths:
if safe_path.startswith(sys_path):
raise PermissionError(f"Cannot write to system directory: {output_path}")
# If the path already exists
if os.path.exists(safe_path):
if os.path.isfile(safe_path):
# It is a file: check that it is writable
if not os.access(safe_path, os.W_OK):
raise PermissionError(f"File exists but is not writable: {output_path}")
else:
# It is a directory or another non-file type
path_type = "directory" if os.path.isdir(safe_path) else "non-file path"
if force:
print(f"[WARN] Removing existing {path_type}: {safe_path}", file=sys.stderr)
try:
if os.path.isdir(safe_path):
import shutil
shutil.rmtree(safe_path)
else:
os.remove(safe_path)
except OSError as e:
raise PermissionError(f"Cannot remove existing {path_type}: {output_path}") from e
else:
raise ValueError(
f"Path exists but is a {path_type}: {output_path}\n"
f" Use --force to overwrite"
)
# Ensure the parent directory exists and is writable
parent_dir = os.path.dirname(safe_path)
if parent_dir and not os.path.exists(parent_dir):
# Try to create the directory
try:
os.makedirs(parent_dir, exist_ok=True)
except OSError as e:
raise PermissionError(f"Cannot create output directory: {parent_dir}") from e
elif parent_dir and not os.access(parent_dir, os.W_OK):
raise PermissionError(f"Parent directory is not writable: {parent_dir}")
def check_environment_security(skip_interactive: bool = False) -> None:
"""
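Check the security of the runtime configuration (HTTPS endpoint, UAT environment).
:param skip_interactive: skip the interactive confirmation prompt
:raises RuntimeError: if the user does not confirm continuing over plain HTTP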
"""
# Check the API endpoint scheme
if BASE_URL.startswith("http://"):
print(f"[WARN] Using HTTP (not HTTPS) for API endpoint: {BASE_URL}", file=sys.stderr)
print("[WARN] This is insecure and not recommended!", file=sys.stderr)
if not skip_interactive:
try:
response = input("Continue anyway? [y/N]: ").strip().lower()
if response != 'y':
raise RuntimeError("Aborted: Unencrypted HTTP connection not allowed")
except (EOFError, KeyboardInterrupt):
raise RuntimeError("Aborted: Unencrypted HTTP connection not allowed")
# Warn when using the UAT (test) environment
if 'uat' in BASE_URL.lower():
print(f"[INFO] Using UAT environment: {BASE_URL}", file=sys.stderr)
print("[INFO] This is a test environment without authentication", file=sys.stderr)
print("[WARN] Do NOT use with sensitive documents!", file=sys.stderr)
class UniDocParser:
"""UniDoc document parser"""
def __init__(
self,
target_type: str = "md",
func: str = "unisound",
uid: Optional[str] = None
):
"""
Initialize the parser
:param target_type: target format (md/json)
:param func: conversion method
:param uid: user ID (a random UUID hex string if omitted)
"""
self.target_type = target_type
self.func = func
self.uid = uid or uuid.uuid4().hex
def sync_convert_file(self, file_path: str) -> str:
"""
Convert a file synchronously
:param file_path: file path
:return: converted content
"""
# Upload the file
body = {"uid": self.uid, "func": self.func}
headers = {}
if API_KEY:
headers['Authorization'] = f'Bearer {API_KEY}'
with open(file_path, 'rb') as file:
files = {'file': file}
response = requests.post(
SYNC_UPLOAD_URL,
data=body,
files=files,
headers=headers,
timeout=60
)
# Check the HTTP status
if response.status_code != 200:
raise ValueError(f"HTTP Error {response.status_code}: {response.text}")
res = response.json()
# Check the response body
if "result" not in res or res.get("result") is None:
raise ValueError(f"API Error: {res.get('message', 'Unknown error')}")
file_id = res.get("result").get("fileId")
# Fetch the converted content
return self._export_file(file_id)
def async_convert_file(self, file_path: str, poll_interval: int = 1) -> str:
"""
Convert a file asynchronously
:param file_path: file path
:param poll_interval: polling interval in seconds
:return: converted content
"""
# Upload the file
body = {"uid": self.uid, "func": self.func}
headers = {}
if API_KEY:
headers['Authorization'] = f'Bearer {API_KEY}'
with open(file_path, 'rb') as file:
files = {'file': file}
response = requests.post(
ASYNC_UPLOAD_URL,
data=body,
files=files,
headers=headers,
timeout=60
)
# Check the HTTP status
if response.status_code != 200:
raise ValueError(f"HTTP Error {response.status_code}: {response.text}")
res = response.json()
# Check the response body
if "result" not in res or res.get("result") is None:
raise ValueError(f"API Error: {res.get('message', 'Unknown error')}")
file_id = res.get("result")
# Poll the task status
task_status = None
max_attempts = 300 # about 5 minutes at the default 1-second interval (excluding request time)
attempts = 0
while task_status not in ["SUCCESS", "FAILED"]:
if attempts >= max_attempts:
raise TimeoutError("File conversion timed out after 5 minutes")
params = {"fileId": file_id}
response = requests.get(
url=STATUS_URL,
params=params,
headers=headers,
timeout=30
)
if response.status_code != 200:
raise ValueError(f"HTTP Error {response.status_code} checking status")
status_res = response.json()
# "result" may be present but null; fall back to an empty dict
task_status = (status_res.get("result") or {}).get("status")
if task_status not in ["SUCCESS", "FAILED"]:
time.sleep(poll_interval)
attempts += 1
if task_status == "FAILED":
raise RuntimeError("File conversion failed on server")
# Fetch the converted content
return self._export_file(file_id)
def _export_file(self, file_id: str) -> str:
"""
Export the converted file
:param file_id: file ID
:return: file content
"""
headers = {}
if API_KEY:
headers['Authorization'] = f'Bearer {API_KEY}'
params = {"fileId": file_id, "targetType": self.target_type}
response = requests.get(
url=EXPORT_URL,
params=params,
headers=headers,
timeout=60
)
if response.status_code != 200:
raise ValueError(f"HTTP Error {response.status_code}: {response.text}")
export_res = response.json()
file_url = export_res.get("result")
if not file_url:
raise ValueError(f"Export failed: {export_res.get('message', 'Unknown error')}")
# Check that the returned URL uses HTTP or HTTPS
if not isinstance(file_url, str) or not file_url.startswith(('http://', 'https://')):
raise ValueError(f"Invalid file URL returned: {file_url}")
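download = requests.get(file_url, timeout=60)
# Fail on HTTP errors instead of returning an error page as content
download.raise_for_status()
content = download.content.decode('utf-8')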
return content
def parse_document(
file_path: str,
format_type: str = "md",
mode: str = "sync",
func: str = "unisound",
uid: Optional[str] = None,
output_path: Optional[str] = None,
skip_security_check: bool = False,
force: bool = False
) -> str:
"""
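Parse a document
:param file_path: document path
:param format_type: output format (md/json)
:param mode: processing mode (sync/async)
:param func: conversion method
:param uid: user ID
:param output_path: output file path (optional; if omitted, nothing is written to disk and the content is only returned)
:param skip_security_check: skip the interactive HTTP confirmation prompt (not recommended)
:param force: overwrite an existing output path
:return: converted content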
"""
# Security check
check_environment_security(skip_interactive=skip_security_check)
# Validate the input file
validate_input_file(file_path)
safe_path = sanitize_path(file_path)
# Validate the output path
if output_path:
validate_output_path(output_path, force=force)
safe_output_path = sanitize_path(output_path)
else:
safe_output_path = None
# Create the parser
parser = UniDocParser(target_type=format_type, func=func, uid=uid)
# Choose the conversion method based on the mode
print(f"[INFO] Parsing: {safe_path} ({mode.upper()} mode, {format_type.upper()} format)", file=sys.stderr)
if mode == "async":
content = parser.async_convert_file(safe_path)
else:
content = parser.sync_convert_file(safe_path)
# If an output path was given, save the content to that file
if safe_output_path:
# Ensure the directory exists
parent_dir = os.path.dirname(safe_output_path)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)
with open(safe_output_path, 'w', encoding='utf-8') as f:
f.write(content)
print(f"[INFO] Saved to: {safe_output_path}", file=sys.stderr)
return content
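

# Sketch (hypothetical helper, not used by parse_document): parse_document
# writes the result with open(..., 'w'), so an interrupted run can leave a
# truncated file at the output path. A common alternative is to write to a
# temporary file in the same directory and then swap it in with os.replace,
# which the Python docs describe as atomic on POSIX when both paths are on
# the same filesystem. The name _write_text_atomically is an assumption for
# illustration.
def _write_text_atomically(path: str, content: str) -> None:
    """Write content to path via a temp file in the same directory + os.replace."""
    import os
    import tempfile

    parent_dir = os.path.dirname(os.path.abspath(path))
    os.makedirs(parent_dir, exist_ok=True)
    # Create the temp file next to the target so os.replace stays on one filesystem
    fd, tmp_path = tempfile.mkstemp(dir=parent_dir, prefix=".tmp-", suffix=".part")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            tmp.write(content)
            tmp.flush()
            os.fsync(tmp.fileno())
        os.replace(tmp_path, path)  # overwrite the target in a single step
    except BaseException:
        # Remove the partial temp file if anything went wrong, then re-raise
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise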


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(
        description="UniDoc Document Parser - Convert documents using UniDoc API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Convert PDF to Markdown (output to terminal)
  %(prog)s document.pdf

  # Convert DOCX to JSON (async mode)
  %(prog)s document.docx --format json --mode async

  # Save output to file
  %(prog)s document.pdf --output output.md

  # Force overwrite of an existing output path
  %(prog)s document.pdf --output existing.md --force

Security Notes:
  - Documents are uploaded to an external API service
  - Do NOT use with sensitive or confidential documents
  - Only use with non-sensitive test documents
"""
    )
    parser.add_argument(
        "file",
        help="Path to the document file to parse"
    )
    parser.add_argument(
        "--format",
        choices=["md", "json"],
        default="md",
        help="Output format (default: md)"
    )
    parser.add_argument(
        "--mode",
        choices=["sync", "async"],
        default="sync",
        help="Processing mode: sync or async (default: sync)"
    )
    parser.add_argument(
        "--func",
        default="unisound",
        help="Conversion method/algorithm (default: unisound)"
    )
    parser.add_argument(
        "--output",
        default=None,
        help="Output file path (default: print to terminal)"
    )
    parser.add_argument(
        "--uid",
        default=None,
        help="Custom user ID (auto-generated if not provided)"
    )
    parser.add_argument(
        "--skip-security-check",
        action="store_true",
        help="Skip security warnings (not recommended)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Force overwrite if output path already exists (use with caution)"
    )
    args = parser.parse_args()
    # Security warning (unless explicitly skipped)
    if not args.skip_security_check:
        print("\n" + "=" * 60, file=sys.stderr)
        print("⚠️ SECURITY WARNING", file=sys.stderr)
        print("=" * 60, file=sys.stderr)
        print(f"This tool will upload your file to: {BASE_URL}", file=sys.stderr)
        print("• This is an external API service", file=sys.stderr)
        print("• Do NOT use with sensitive, confidential, or private documents", file=sys.stderr)
        print("• Your files will be processed on third-party servers", file=sys.stderr)
        print("=" * 60 + "\n", file=sys.stderr)
    try:
        content = parse_document(
            file_path=args.file,
            format_type=args.format,
            mode=args.mode,
            func=args.func,
            uid=args.uid,
            output_path=args.output,
            skip_security_check=args.skip_security_check,
            force=args.force
        )
        # Print to the terminal by default (unless --output was given)
        if not args.output:
            print(content)
        return 0
    except FileNotFoundError as e:
        print(f"✗ Error: {e}", file=sys.stderr)
        return 1
    except PermissionError as e:
        print(f"✗ Permission Error: {e}", file=sys.stderr)
        return 1
    except ValueError as e:
        print(f"✗ Validation Error: {e}", file=sys.stderr)
        return 1
    except RuntimeError as e:
        print(f"✗ Runtime Error: {e}", file=sys.stderr)
        return 1
    except TimeoutError as e:
        print(f"✗ Timeout Error: {e}", file=sys.stderr)
        return 1
    except requests.exceptions.RequestException as e:
        print(f"✗ Network Error: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"✗ Unexpected Error: {e}", file=sys.stderr)
        return 1
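

# Sketch (hypothetical, not wired into main): main() maps each exception
# type to a message prefix through a chain of except clauses, where the
# first matching clause wins. The same policy can be written as an ordered
# table checked with isinstance, which makes the ordering explicit and easy
# to unit-test. The names _ERROR_LABELS and _describe_error are assumptions;
# requests exceptions are deliberately left out so the sketch needs only the
# standard library, so it matches main() only for non-requests exceptions.
_ERROR_LABELS = (
    (FileNotFoundError, "Error"),
    (PermissionError, "Permission Error"),
    (ValueError, "Validation Error"),
    (RuntimeError, "Runtime Error"),
    (TimeoutError, "Timeout Error"),
)


def _describe_error(exc: Exception) -> str:
    """Return the message main() would print for a non-requests exception."""
    for exc_type, label in _ERROR_LABELS:
        if isinstance(exc, exc_type):
            return f"✗ {label}: {exc}"
    # Anything not listed falls through to the catch-all, like `except Exception`
    return f"✗ Unexpected Error: {exc}"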


if __name__ == '__main__':
    sys.exit(main())