@clawhub-wkbin-48bd7b15d8
ClawHub 技能包,用于 zaomeng 的本地规则型中文小说人物工作流。
---
name: zaomeng-skill
description: ClawHub 技能包,用于 zaomeng 的本地规则型中文小说人物工作流。
---
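# zaomeng Skill (ClawHub)
## Read This First
- `zaomeng` is a local, rule-driven character engine, not a free-form generative chat companion.
- Its core value is character distillation, relationship extraction, persona navigation, persistent memory, and OOC constraints. It does not replace the host model in generating the final natural dialogue.
- In hosts such as OpenClaw, the skill should use `zaomeng` primarily as the "persona and constraint layer" and leave natural-language understanding and final wording to the host model.
- An agent using this skill must call the CLI entrypoint directly; do not simulate the character pipeline by hand.
- For normal user tasks, do not read `INSTALL.md` or `MANIFEST.md` first; they are packaging notes, not runtime instructions.
- For normal user tasks, do not narrate the environment troubleshooting process to the user step by step.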
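## Engine Setup
- This skill package already embeds a minimal runnable subset. Prefer the runtime inside the skill package and do not automatically clone an external repository.
- Runtime entrypoint:
  - `runtime/zaomeng_cli.py`
- Core source code lives under `runtime/src/...`
- The default data directory is `runtime/data/...`
- Dependencies:
  - Required: `PyYAML`
  - Optional: `tiktoken`, used only for more accurate token estimation
  - Optional: `ebooklib`, needed only when reading `.epub` novels
- Recommended commands:
  - On Windows / PowerShell, prefer `py -3 runtime/zaomeng_cli.py ...`
  - In other environments, use `python runtime/zaomeng_cli.py ...`
- For ordinary `.txt` novel distillation or group chat, a missing `tiktoken` does not block execution.
- Briefly tell the user why execution is blocked only when Python itself is unavailable, `PyYAML` is missing, or the current host forbids running local commands.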
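The dependency rules above can be sketched as a small pre-flight check. This is a minimal illustration, not part of the bundled runtime: the function names `missing_packages` and `check_runtime` are hypothetical, and treating a missing `ebooklib` as blocking for `.epub` input is an assumption drawn from the "needed only when reading `.epub`" note.

```python
import importlib.util

# Distribution name -> import name. PyYAML is imported as "yaml".
REQUIRED = {"PyYAML": "yaml"}
OPTIONAL = {"tiktoken": "tiktoken", "ebooklib": "ebooklib"}


def missing_packages(packages, finder=importlib.util.find_spec):
    """Return the distribution names whose import module cannot be found."""
    return [dist for dist, module in packages.items() if finder(module) is None]


def check_runtime(novel_path, finder=importlib.util.find_spec):
    """Return (blocking, notes): blocking problems and non-blocking remarks."""
    blocking = [f"{name} is missing" for name in missing_packages(REQUIRED, finder)]
    notes = []
    absent = missing_packages(OPTIONAL, finder)
    if "tiktoken" in absent:
        # Never blocking: only the token estimate becomes less accurate.
        notes.append("tiktoken is missing, token estimates will be less accurate")
    if "ebooklib" in absent and novel_path.lower().endswith(".epub"):
        # Assumption: an unreadable .epub input is treated as blocking.
        blocking.append("ebooklib is missing, so .epub input cannot be read")
    return blocking, notes
```

Only the entries in `blocking` would be worth a short message to the user; `notes` stay internal, in line with the rule against narrating environment checks.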
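## Normal Execution Order
- Before starting, confirm that the runtime inside the skill package can be invoked.
- When the user provides a novel file and asks for distillation, follow the real workflow: `distill` first, then `extract` if needed.
- When the runtime or a helper agent needs to consult the Markdown constraints inside the skill, read `references/style_differ.md` and `references/logic_constraint.md` together with `references/output_schema.md`.
- Enter `chat` / `observe` / `act` only after distillation has finished.
- Only when the embedded runtime cannot be executed should you briefly state: "The current environment cannot start a runnable zaomeng engine, so the real workflow cannot be executed."
- Unless the user explicitly asks to "generate a version by hand from the skill templates", do not fall back to reading the prompt/schema and manually simulating engine output.
- Do not show the user debug-style messages such as `src.core.main does not exist`, `reading the install notes first`, or `let me check the dependencies`.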
## Chat Invocation Rules
- Prerequisite: call `runtime/zaomeng_cli.py` directly; do not bypass it and assemble internal modules by hand.
- Default rule: any agent that calls `chat` through this skill must pass `--message`.
- Preferred usage on Windows / PowerShell:
  - `py -3 runtime/zaomeng_cli.py chat --novel <path-or-name> --mode auto --message "<user's exact words>"`
  - `py -3 runtime/zaomeng_cli.py chat --novel <path-or-name> --mode observe --message "<prompt>"`
  - `py -3 runtime/zaomeng_cli.py chat --novel <path-or-name> --mode act --character <name> --message "<user's line>"`
  - `py -3 runtime/zaomeng_cli.py chat --novel <path-or-name> --mode auto|observe|act [--character <name>] --session <id> --message "<prompt or line>"`
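As an illustration of the rules above, the following sketch assembles the `chat` argument list and refuses to build a command without `--message`. The helper name `build_chat_command` and its parameters are hypothetical; only the flags shown in the commands above are used.

```python
import sys

CLI = "runtime/zaomeng_cli.py"
MODES = {"auto", "observe", "act"}


def build_chat_command(novel, message, mode="auto", character=None,
                       session=None, windows=None):
    """Build the argv list for a single-turn `chat` call."""
    if not message:
        raise ValueError("chat must always be called with --message")
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode}")
    if windows is None:
        windows = sys.platform.startswith("win")
    # `py -3` is preferred on Windows, plain `python` elsewhere.
    launcher = ["py", "-3"] if windows else ["python"]
    cmd = launcher + [CLI, "chat", "--novel", novel, "--mode", mode]
    if character:
        cmd += ["--character", character]
    if session:
        cmd += ["--session", session]
    return cmd + ["--message", message]
```

The resulting list can be passed to `subprocess.run(cmd, check=True)`; passing a list rather than a shell string avoids quoting problems with Chinese text and spaces in paths.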
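## Natural-Language Intent Mapping
- `让我扮演X和Y聊天` ("let me play X and chat with Y"), `我来扮演X,你让Y回我` ("I will play X, have Y reply to me"), `我说一句,Y回一句` ("I say a line, Y replies with a line"), `进入 act 模式` ("enter act mode"): treat as an `act` start intent.
  - Do not feed such a start phrase to the engine as a character line; let the CLI create or resume the `act` session first.
  - Once the user actually begins the dialogue, continue with `--session <id> --message "<user's line>"`.
  - After `act` has started, the CLI writes the controlled character into the session, so follow-up turns in the same session usually do not need `--character` again.
- `进入刘备、张飞、关羽群聊模式` ("enter group chat mode with Liu Bei, Zhang Fei, and Guan Yu"): treat as an `observe` start intent.
- `请让大家围绕这件事各说一句` ("have everyone say one line about this"): run it as a real single `observe` turn.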
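A rough sketch of how a host might pre-classify these utterances before calling the CLI. The regular expressions and the labels `act_start`, `observe_start`, `observe_turn`, and `dialogue` are hypothetical, built only from the example phrases above; the runtime's own `--mode auto` routing may work differently.

```python
import re

# Patterns derived from the example phrases only; real users will vary.
ACT_START = re.compile(r"让我扮演|我来扮演|我说一句.*回一句|进入\s*act\s*模式")
OBSERVE_START = re.compile(r"进入.*群聊模式")
OBSERVE_TURN = re.compile(r"各说一句")


def route_intent(text):
    """Classify an utterance as a mode-start phrase, an observe turn, or plain dialogue."""
    if ACT_START.search(text):
        return "act_start"
    if OBSERVE_START.search(text):
        return "observe_start"
    if OBSERVE_TURN.search(text):
        return "observe_turn"
    # Anything else is passed through as a character line or prompt.
    return "dialogue"
```

The point of the separation is that a start phrase establishes or resumes a session, while only `dialogue` text is later sent as the user's line with `--session <id> --message`.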
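## Forbidden Behavior
- Do not claim that the environment has no PTY, no stdin, or no interactive support before trying a single-turn `--message` call.
- Do not switch to scripted stdin automation when `--message` can express the request.
- Do not read `chat_engine.py`, call `speaker.generate()` directly, or hand-adapt legacy JSON profiles as a substitute for the CLI.
- Do not treat `prompts/*.md`, `references/output_schema.md`, `INSTALL.md`, or `MANIFEST.md` as an execution plan that can directly replace the local engine just because you have seen them.
- Do not generate results after reading only `references/output_schema.md`; if you need the Markdown constraints inside the skill, you must also include `references/style_differ.md` and `references/logic_constraint.md`.
- Do not automatically clone an external Git repository, and do not fall back to "fetch the source from GitHub first, then run".
- Do not say "the engine does not exist" before checking the runtime inside the skill package and trying to execute it directly.
- Do not rewrite a mode-switch request into a free-form story demonstration.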
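## Standard Reply Templates for Users
- General principle: tell the user only "what I am doing now" and "what happens next". Do not output debug logs, dependency checks, or reasoning about source paths.
### 1. The user asks to distill characters
Recommended wording:
```text
I will process this novel with the zaomeng workflow first and distill profiles for the characters you named. Once distillation is done, I can continue into group chat or roleplay mode if you like.
```
Do not say:
```text
Reading the install notes first.
src.core.main does not exist.
Let me check the dependencies.
```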
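### 2. After distillation, the user asks to enter act / observe
Recommended wording:
```text
The character profiles are ready. I will now enter the requested mode through the zaomeng chat workflow.
```
For an act start phrase:
```text
I will set up an act session for you first. Next, say a line as your character and I will have the other character respond in character.
```
For an observe start phrase:
```text
I will set up a group chat session for you first. Next, you can give a scene or a topic, or ask a particular character to speak first.
```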
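### 3. When no real zaomeng engine is available locally
Only a brief explanation is allowed:
```text
I will first try to start the zaomeng runtime embedded in the skill. If the current environment does not allow execution, or a required dependency is missing, I will let you know.
The current environment cannot start a runnable zaomeng engine for now, so I cannot execute the real zaomeng workflow.
If you like, I can manually prepare a compatible result that follows the skill's format requirements, but it is not equivalent to real engine output.
```
Do not say:
```text
The src.core.main module does not exist
Let me check the dependencies
Let me look at the install notes first
Reading output_schema and distill_prompt first
```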
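### 4. The user has already stated a natural-language intent
- If the user is describing how they want to play, enter the corresponding workflow directly; do not rewrite the sentence into a story demonstration.
- If the user is already speaking as a character, treat the sentence directly as that character's line.
Recommended wording:
```text
I will enter this mode as you requested.
```
Or:
```text
I will treat this sentence directly as that character's line.
```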
## Other Commands
- Distill: `py -3 runtime/zaomeng_cli.py distill --novel <path> [--characters A,B] [--force]`
- Extract relationships: `py -3 runtime/zaomeng_cli.py extract --novel <path> [--output <path>] [--force]`
- View a character: `py -3 runtime/zaomeng_cli.py view --character <name> [--novel <path-or-name>]`
- Save a correction: `py -3 runtime/zaomeng_cli.py correct --session <id> --message <raw> --corrected <fixed> [--character <name>]`
## Persona Files and Memory
- The primary storage is now a Markdown persona bundle; JSON is no longer the source of truth.
- Persona files live by default under `runtime/data/characters/<novel_id>/<character_name>/`.
- The runtime reads `NAVIGATION.generated.md` first, overlays `NAVIGATION.md`, and then loads the persona files in `load_order`.
- Long-term user corrections and the results of `/correct` are written to the character's `MEMORY.md`.
FILE:examples/sample_character_profile.md
# PROFILE
<!-- Canonical markdown profile storage. -->
## Meta
- name: 林黛玉
- novel_id: sample_novel
- source_path: data/sample_novel.txt
## Core
- core_traits: 敏感;聪慧;自尊
- values: 勇气=6;智慧=8;善良=7;忠诚=8;野心=3;正义=6;自由=7;责任=6
- speech_style: 言辞锋利但情绪克制,常带反讽,轻易不把软处全露出来。
- identity_anchor: 寄居贾府、才情敏锐而自尊极强的闺秀
- soul_goal: 守住自尊与真情,不肯把真心轻易交付给轻慢之人
- worldview: 真情可贵,却最怕人心轻薄;很多话须先试其真,再肯信
- thinking_style: 先感受对方态度,再以敏锐观察和自我防卫判断进退
## Deep Persona
- core_identity: 才情出众、情思纤细、寄人篱下而自我防御极强的核心人物
- faction_position: 贾府内的亲眷晚辈,情感与日常交往深受贾府秩序影响
- background_imprint: 自幼体弱多病,又失怙失恃,长期寄居环境塑造了敏感与自尊并存的性格底色
- world_rule_fit: 能适应诗礼闺阁秩序,却与人情世故、圆融应酬始终隔着一层
- social_mode: 对陌生人与轻慢者防备明显,对认定之人则会在试探后显露真心
- hidden_desire: 渴望被真正理解、被坚定偏爱,而不是被礼貌安置
- inner_conflict: 一面极重真情,一面又因自尊与不安而先行防御
- story_role: 以情感锋芒和精神敏感度推动人物关系张力的核心角色
- belief_anchor: 真情不可欺,自尊不可轻贱
- private_self: 独处时更柔软,也更容易被愁绪与不安牵动
- stance_stability: 在情感判断上并不轻易动摇,但会因受伤而暂时收紧态度
- reward_logic: 对真心者会记情回护,对轻慢与敷衍则格外记得
- strengths: 诗才敏捷;观察细密;情感辨识力强
- weaknesses: 多思易伤;防御心重;体弱多病
- cognitive_limits: 容易在情绪受伤时放大言外之意,对他人迟钝和回避不够宽容
- fear_triggers: 被冷落;被拿来比较;真心被敷衍;寄居身份被戳破
- key_bonds: 与贾宝玉之间的真情牵绊;与薛宝钗之间复杂而微妙的比较关系
- action_style: 先以言语试探与防御,确认真心后才稍稍放下锋芒
## Voice
- typical_lines: 你最会哄人;我原不该多心,只是听着未免叫人难受;你若真明白我,便不会只拿空话来宽我
- decision_rules: 感到被轻视->先反讽防御,再观察对方诚意;察觉真心未变->语气会稍松,却不肯立刻认输;触及自尊痛点->宁可收起真意,也不愿低头讨怜
- life_experience: 幼失双亲;长期寄居;在诗社与闺阁往来中反复体会亲疏冷暖
- taboo_topics: 被轻贱寄居身份;被当众比较高下;真心被说成矫情
- forbidden_behaviors: 不会毫无顾忌地向所有人示弱;不会把真情轻易说成儿戏
- cadence: 起句克制,中段容易带出暗刺,真正受伤时语尾反而更轻
- signature_phrases: 原是;何必;倒像是;你最会;我原不该
- sentence_openers: 我原不该;你倒;何必;原是
- connective_tokens: 只是;不过;倒;偏
- sentence_endings: 罢了;也就是了;倒也未必
- forbidden_fillers: 哈哈;好吧;确实;呢;呀
- anger_style: 生气时不一定高声,更多是冷下来、话里带刺
- joy_style: 真高兴时也多半收着,只在语气和诗意里透出轻快
- grievance_style: 受委屈时先自护,往往借反话和旁刺把痛意藏住
## Arc
- arc_start: 勇气=5;信任=4
- arc_mid: 勇气=6;trigger_event=争执与和解
- arc_end: 勇气=7;final_state=更愿意表达真实需求,但仍保留自尊防线
## Evidence
- description_count: 1
- dialogue_count: 2
- thought_count: 0
- chunk_count: 1
FILE:examples/sample_input_excerpt.txt
贾宝玉见林黛玉进门,轻声道:“你今天气色好些了。”
林黛玉冷笑:“你最会哄人。”
宝玉心想她又在逞强,便递过去一盏茶。
薛宝钗在旁劝道:“都少说两句。”
FILE:examples/sample_relations.md
# RELATION_GRAPH
## 林黛玉_贾宝玉
- trust: 7
- affection: 8
- power_gap: 1
- conflict_point: 表达方式与误解
- typical_interaction: 黛玉质问->宝玉安抚->短暂缓和
- appellation_to_target: 宝玉
- confidence: 8
## 林黛玉_薛宝钗
- trust: 6
- affection: 6
- power_gap: 0
- conflict_point: 价值观差异
- typical_interaction: 宝钗劝和->黛玉观望->情绪降温
- appellation_to_target: 宝姐姐
- confidence: 7
FILE:examples/test-prompts.json
{
"version": "3.1.0",
"cases": [
{
"id": "distill-basic-01",
"task": "distill",
"input": "请基于给定文本提取林黛玉和贾宝玉的人设档案。",
"expect": {
"required_keys": ["name", "core_traits", "values", "speech_style", "decision_rules", "novel_id", "source_path", "evidence"],
"validation": ["evidence", "consistency", "transfer"]
}
},
{
"id": "relation-basic-01",
"task": "extract_relations",
"input": "请输出林黛玉与贾宝玉的关系参数。",
"expect": {
"required_keys": ["trust", "affection", "power_gap", "conflict_point", "typical_interaction"],
"validation": ["evidence", "consistency", "transfer"]
}
},
{
"id": "chat-ooc-guard-01",
"task": "chat_constraints",
"input": "角色发言出现明显与人设冲突,请先重写一次再输出。",
"expect": {
"required_behavior": ["pre_response_check", "rewrite_once_if_mismatch", "needs_revision_on_repeat_mismatch"]
}
}
]
}
FILE:INSTALL.md
# 安装说明
这是一份面向打包校验和环境确认的安装说明。
它不是主要的用户使用指南。
用户使用方式优先看 `README.md`;宿主和 agent 的执行规则优先看 `SKILL.md`。
## 当前打包形态
这个 bundle 已经内嵌最小可运行的 zaomeng 运行时。
- 不再把运行时克隆外部仓库作为主路径
- 默认通过包内运行时入口执行
## 本目录应包含的关键文件
- `README.md`
- `README_EN.md`
- `SKILL.md`
- `MANIFEST.md`
- `PUBLISH.md`
- `runtime/zaomeng_cli.py`
- `runtime/src/...`
- `runtime/requirements.txt`
- `references/output_schema.md`
- `references/style_differ.md`
- `references/logic_constraint.md`
- `references/safety_policy.md`
- `references/validation_policy.md`
- `examples/sample_input_excerpt.txt`
- `examples/sample_character_profile.md`
- `examples/sample_relations.md`
- `examples/test-prompts.json`
## 运行时依赖
- 必需:`PyYAML`
- 可选:`tiktoken`
- 可选:`ebooklib`,仅在读取 `.epub` 小说时需要
## 运行建议
- Windows / PowerShell 优先使用 `py -3 runtime/zaomeng_cli.py ...`
- 默认运行时数据目录为 `runtime/data/`
- 包内 prompt 与 references 主要用于约束与说明,不应用来替代引擎入口
## 快速校验清单
1. `SKILL.md` frontmatter 合法。
2. `runtime/zaomeng_cli.py` 可以启动。
3. 输出字段符合 `references/output_schema.md`。
4. 安全与校验相关规则文件齐全。
FILE:MANIFEST.md
# 打包清单
## 核心文件
- `README.md`
- `README_EN.md`
- `SKILL.md`
- `INSTALL.md`
- `PUBLISH.md`
- `runtime/zaomeng_cli.py`
- `runtime/src/core/__init__.py`
- `runtime/requirements.txt`
- `runtime/src/core/main.py`
- `runtime/src/core/config.py`
- `runtime/src/core/contracts.py`
- `runtime/src/core/llm_client.py`
- `runtime/src/core/path_provider.py`
- `runtime/src/core/rulebook.py`
- `runtime/src/modules/__init__.py`
- `runtime/src/modules/distillation.py`
- `runtime/src/modules/relationships.py`
- `runtime/src/modules/chat_engine.py`
- `runtime/src/modules/reflection.py`
- `runtime/src/modules/speaker.py`
- `runtime/src/utils/__init__.py`
- `runtime/src/utils/file_utils.py`
- `runtime/src/utils/text_parser.py`
- `runtime/src/utils/token_counter.py`
## 参考文件
- `references/output_schema.md`
- `references/style_differ.md`
- `references/logic_constraint.md`
- `references/safety_policy.md`
- `references/validation_policy.md`
## 运行时规则文件
- `runtime/rules/distillation_rules.md`
- `runtime/rules/relationship_rules.md`
- `runtime/rules/speaker_rules.md`
- `runtime/rules/persona_rules.md`
## Prompt 模板
- `prompts/distill_prompt.md`
- `prompts/relation_prompt.md`
- `prompts/correction_prompt.md`
## 示例文件
- `examples/sample_input_excerpt.txt`
- `examples/sample_character_profile.md`
- `examples/sample_relations.md`
- `examples/test-prompts.json`
## 打包目标
- 提供一个内嵌最小运行时的自包含 skill 包
- 不要求运行时再去下载外部仓库
- 运行时入口本地打包;prompt 与 references 主要用于约束和说明,而不是替代引擎本身
FILE:prompts/correction_prompt.md
# 人设纠错提示词
## 任务
修正角色 OOC 台词,全程保留人物核心人设、关系习惯与行为一致性。
## 输入
- 人物档案
- 原始台词
- 修正记录(可选)
- 对话上下文(可选)
## 输出
- 修正后台词
- 修正理由
- 置信度
## 校正依据
修正时优先参考角色人格包中的以下信息:
- `PROFILE` 总档
- `SOUL` 的核心原则与边界
- `STYLE` 的语言习惯与节奏
- `BONDS` 与 `RELATIONS` 的对人差异
- `CONFLICTS` 的软肋、雷点、内在冲突
- `ROLE` 的立场稳定度与剧情职能
- `MEMORY` 的用户纠正与持续修订
## 重点校验维度
1. 是否违背核心身份与阵营立场
2. 是否违背核心动机、深层执念或信仰支柱
3. 是否违背说话风格、用词习惯、语气节奏
4. 是否违背价值取舍、决策逻辑、行为底线
5. 是否忽略特定关系对象的专属态度
6. 是否落入通用套话、万能过渡句、AI 模板腔
7. 是否把私下真实面貌和公开人格混用错位
8. 是否在情绪反应上过猛或过弱,脱离人物惯性
## 规则
1. 严格贴合人物档案里的说话风格、用词习惯、语气节奏。
2. 遵守该角色的决策逻辑、价值取向、行为底线与关系边界。
3. 禁止过度修正;在人设合规前提下,尽量保留原本表达意图。
4. 若无明显 OOC 依据,标记低置信度,并尽量贴近原文不做大改。
5. 拒绝通用套话、万能过渡句、AI 模板化空话,维持角色独特表达。
6. 所有修改只依据给定人物档案、关系层与修正记录,禁止主观脑补、凭空设定。
7. 若原句只是信息不足而非明确 OOC,优先做轻量收紧,而不是重写成另一种人格。
8. 若用户明确给出纠正理由,应把纠正理由沉淀为后续同类发言的高优先级约束。
FILE:prompts/distill_prompt.md
# 人物档案蒸馏提示词
## 任务
从分段小说原文中,蒸馏结构化、通用化、可落盘的人物心智档案。
## 输入
- 小说文本片段
- 候选人物列表(可选)
- 已有修正记录或用户补充(可选)
## 输出
- 严格遵循 `references/output_schema.md` 的 Markdown 人物档案
- 优先产出 `PROFILE.generated.md`
- 如信息足够,可同步支持导航内的可选人格文件拆分:`SOUL`、`GOALS`、`STYLE`、`TRAUMA`、`IDENTITY`、`BACKGROUND`、`CAPABILITY`、`BONDS`、`CONFLICTS`、`ROLE`
## 26 维度覆盖要求
输出时必须尽量覆盖以下 26 个蒸馏维度;若原文证据不足,可留空、降置信度或明确写成“证据不足”,但禁止脑补:
1. 核心身份
2. 核心动机
3. 性格基底
4. 行为逻辑
5. 人物弧光
6. 关键羁绊
7. 符号化特征
8. 世界观适配性
9. 价值取舍体系
10. 情绪反应模式
11. 思维认知偏好
12. 语言表达特质
13. 专属能力与致命短板
14. 出身背景与生存处境
15. 深层执念与隐秘欲望
16. 行事风格倾向
17. 过往创伤与人生烙印
18. 社交相处模式
19. 内在自我矛盾
20. 剧情职能定位
21. 恐惧与避讳事物
22. 信仰与精神支柱
23. 认知局限与成长短板
24. 立场摇摆特性
25. 恩怨奖惩逻辑
26. 私下真实面貌
## 字段映射约束
将上面的 26 维度尽量映射到当前 Markdown 档案字段中:
- 核心身份 -> `core_identity`、`faction_position`、`story_role`
- 核心动机 -> `soul_goal`、`hidden_desire`
- 性格基底 -> `core_traits`、`values`
- 行为逻辑 -> `decision_rules`、`action_style`、`reward_logic`
- 人物弧光 -> `arc_start`、`arc_mid`、`arc_end`
- 关键羁绊 -> `key_bonds`
- 符号化特征 -> `typical_lines`、`signature_phrases`、`sentence_openers`、`sentence_endings`
- 世界观适配性 -> `world_rule_fit`
- 价值取舍体系 -> `values`、`belief_anchor`、`worldview`
- 情绪反应模式 -> `anger_style`、`joy_style`、`grievance_style`、`fear_triggers`
- 思维认知偏好 -> `thinking_style`、`cognitive_limits`
- 语言表达特质 -> `speech_style`、`cadence`、`connective_tokens`、`forbidden_fillers`
- 专属能力与致命短板 -> `strengths`、`weaknesses`
- 出身背景与生存处境 -> `background_imprint`、`life_experience`
- 深层执念与隐秘欲望 -> `hidden_desire`
- 行事风格倾向 -> `action_style`
- 过往创伤与人生烙印 -> `background_imprint`、`taboo_topics`
- 社交相处模式 -> `social_mode`
- 内在自我矛盾 -> `inner_conflict`
- 剧情职能定位 -> `story_role`
- 恐惧与避讳事物 -> `fear_triggers`、`taboo_topics`、`forbidden_behaviors`
- 信仰与精神支柱 -> `belief_anchor`
- 认知局限与成长短板 -> `cognitive_limits`、`weaknesses`
- 立场摇摆特性 -> `stance_stability`
- 恩怨奖惩逻辑 -> `reward_logic`
- 私下真实面貌 -> `private_self`
## 规则
1. 只提取原文有直接依据的内容,禁止编造、脑补、过度解读。
2. 所有量化分值必须使用整数;`values` 维持 `0–10` 区间。
3. 若文本内人物线索稀少、证据薄弱,降低整体置信度,并在相关字段保持克制。
4. `core_traits`、`typical_lines`、`decision_rules`、`strengths`、`weaknesses`、`fear_triggers` 等列表必须自动去重,避免同义重复。
5. 全程使用通用描述逻辑,不绑定单本小说专属黑话、专属 archetype 标签或作品内私有名词模板。
6. 若某维度无法被当前片段支持,不要用“常识”补全;保持空值或保守概括即可。
7. 人物弧光必须基于当前可见片段;若不足以判断成长变化,明确按“静态人物 / 当前片段不足以判断弧光”处理。
8. 语言表达特质必须来自说话内容、叙述描写或稳定互动习惯,不能套用万能“冷静、理智、温和”模板。
9. 对深层执念、私下真实面貌、内在冲突等高风险维度,宁缺毋滥,优先短句事实总结。
10. 如果输入中包含用户纠正或长期修正记录,应把它们视为高优先级约束,但不得覆盖原文中明确相反的硬证据。
FILE:prompts/relation_prompt.md
# 双人关系抽取提示词
## 任务
根据同框互动上下文,抽取角色两两关系图谱与互动特征,并为角色人格包生成可覆写的关系层。
## 输入
- 小说文本片段
- 全部人物名单
- 已有关系修正记录(可选)
## 输出
- 严格遵循 `references/output_schema.md` 的 Markdown 关系图谱
- 关系主图使用 `RELATION_GRAPH`
- 角色侧可映射到 `RELATIONS.generated.md` 的目标关系分节
## 关系维度要求
每一对角色的关系抽取,至少评估以下维度;证据不足时可以中性或留空,但必须说明保守原因:
1. 互动是否明确发生
2. 关系方向是否稳定
3. 信任度
4. 好感度或亲近度
5. 权势差距
6. 冲突焦点
7. 典型互动模式
8. 称呼方式
9. 羁绊强度
10. 关系置信度
## 规则
1. 只对当前片段内明确同框且产生互动的角色对建立关系项。
2. 统一使用排序键名格式 `<甲>_<乙>`,避免双向重复关系。
3. `trust`、`affection` 区间为 `0–10`,`power_gap` 区间为 `-5–5`,全部使用整数。
4. `typical_interaction` 只写能被当前片段支撑的互动链路,不拼接长剧情、不延伸场外关系。
5. 若仅被动同场、未形成清晰互动,则使用中性分值与低置信度,不要强行造关系。
6. 若已有用户纠正或角色侧关系记忆,优先作为校正参考,但不得覆盖当前片段明确相反的硬证据。
7. 称呼信息只在文本中有稳定指向时填写;含混称呼、群体称呼、泛化辈分称呼要谨慎处理。
8. 不得脑补长期宿命、前史恩怨、隐藏爱情、阵营忠诚等片段外设定。
FILE:PUBLISH.md
# ClawHub 发布说明
## 建议元数据
- Type: OpenClaw Skill
- Name: zaomeng-skill
- Display Name: 造梦技能
- Version: 3.1.0
- License: MIT-0
- Category: Writing / Roleplay / Character Simulation
## 风险说明
- 这是一个内嵌最小运行时的自包含 skill 包
- 不再把运行时克隆或外部 Git 引导作为主执行路径
- 仍依赖本地 Python 执行,以及 Python 包信任链:`PyYAML`,可选 `tiktoken`,可选 `ebooklib`
- 已包含显式安全策略
## 发布前检查
1. `SKILL.md` frontmatter 合法。
2. 内嵌运行时入口可以启动。
3. 输出规范与安全相关文件齐全。
4. 示例文件与当前 schema 一致。
5. 包内不包含凭证、密钥或其他敏感信息。
## 版本说明
- `3.1.0`:将 prompt 引用从纯文本切换为 Markdown,对齐 Markdown-first 人格工作流,在 `references/output_schema.md` 中补充 26 维度人格覆盖说明,并新增 `references/style_differ.md` 与 `references/logic_constraint.md`,用于去同质化和防止人设崩坏。
- `3.0.0`:将最小本地 zaomeng 运行时直接内嵌进 skill 包,去除主执行路径上的运行时 Git 引导,并将 skill 的执行入口切换到打包内的 `runtime/zaomeng_cli.py`。
- `2.1.1`:将自动引导流程固定到外部 zaomeng 仓库的特定 commit `649f7466738f99d60c454e167835462215cffc7d`,降低运行时供应链漂移风险。
- `2.1.0`:切换到 A+ 引导流,优先复用本地 zaomeng 仓库,否则再克隆仓库并执行真实 CLI 工作流。
- `2.0.0`:切换到 Markdown-first 人格存储,引入导航/人格包与运行时记忆写入,并加入自然语言聊天意图路由与 distill-before-chat、act/observe 会话建立逻辑。
- `1.0.9`:重写 skill 文档,明确 zaomeng 是本地规则引擎,并要求 agent 使用 `chat --message`。
- `1.0.8`:强化 agent 聊天规则,要求在任何 PTY 或 stdin 回退前必须优先使用 `--message`。
- `1.0.7`:加入单轮 `chat --message` 直接执行说明,并对齐 ClawHub 聊天规则。
- `1.0.6`:补充交互式聊天与需要确认的执行约束说明。
- `1.0.5`:让 ClawHub 打包 schema 和示例与当前按小说分组的本地工作流保持一致。
FILE:README.md
# zaomeng-skill
`zaomeng-skill` 是一个面向中文小说人物蒸馏、关系抽取、角色单聊与群聊的技能包。
它不是普通陪聊模板,而是一套“先蒸馏,再按人物档案说话”的本地规则型工作流。
更准确地说:
- `zaomeng` 负责人物蒸馏、关系抽取、人格导航、持久记忆与 OOC 约束
- 在 OpenClaw 这类宿主中,真正的自然语言理解与最终对话生成,仍应由宿主模型完成
- `zaomeng` 更适合做“人格与约束层”,而不是独自承担最终的自然对话生成
许可证:`MIT-0`(MIT No Attribution)
## 这版有什么变化
当前发布线已经切到 `3.1.0`,重点变化是:
- 改为 Markdown-first,人设主存储不再以旧版 JSON 为准
- `clawhub-zaomeng-skill` 已内嵌最小可运行子集,不再把运行时克隆外部仓库作为主路径
- 支持自然语言优先的使用方式:先蒸馏,再进入 `act` 或 `observe`
- 人格约束拆成三层:格式、去同质化、逻辑底线
## 它能做什么
### 1. 蒸馏人物
从小说原文中提取人物档案,尽量覆盖更完整的人物维度,例如:
- 核心身份
- 核心动机
- 性格基底
- 行为逻辑
- 人物弧光
- 关键羁绊
- 语言表达特质
- 价值取舍体系
- 深层执念与隐秘欲望
- 私下真实面貌
### 2. 抽取关系
从同框互动中提取两两关系,输出关系图谱和角色侧关系层。
### 3. 进入角色聊天
支持两种主要玩法:
- `act`
你扮演一个角色说话,其他角色按设定回应
- `observe`
让多个角色围绕一个场景、话题或开场白进行互动
### 4. 保存纠错
如果某句明显 OOC,可以把纠错写回记忆,后续对话继续沿用。
## 安装方式
### OpenClaw
```bash
openclaw skills install wkbin/zaomeng-skill
```
### ClawHub
```bash
npx clawhub@latest install zaomeng-skill
```
```bash
pnpm dlx clawhub@latest install zaomeng-skill
```
```bash
bunx clawhub@latest install zaomeng-skill
```
### 本地 skill 目录安装
```bash
python scripts/install_skill.py --skills-dir <your-skills-root>
```
## 运行前提
要跑真实工作流,宿主环境至少需要满足这些条件:
- 能执行本地 Python 命令
- 已安装 `PyYAML`
- 如果读取 `.epub`,还需要 `ebooklib`
- 如果需要更准确的 token 估算,可选装 `tiktoken`
skill 包当前使用的打包运行时入口是:
```text
runtime/zaomeng_cli.py
```
## 推荐用法
正确顺序不是一上来就群聊。
**先给小说,再蒸馏人物,蒸馏完成后再进入聊天。**
最常见的使用路径是:
1. 提供小说文件,或指定小说路径
2. 用自然语言说要蒸馏谁
3. 蒸馏完成后,再进入 `act` 或 `observe`
## 自然语言示例
### 蒸馏
```text
帮我蒸馏林黛玉和贾宝玉
```
```text
请从这本小说里提取刘备、张飞、关羽的人设
```
### 进入 act
```text
让我扮演贾宝玉和林黛玉聊天
```
```text
我来扮演宝玉,你让黛玉回我
```
### 进入 observe
```text
进入刘备、张飞、关羽群聊模式
```
```text
请让大家围绕联合孙权这件事各说一句
```
## CLI 示例
如果你直接运行打包运行时,可用这些命令:
```bash
py -3 runtime/zaomeng_cli.py distill --novel <路径> --characters A,B
py -3 runtime/zaomeng_cli.py extract --novel <路径>
py -3 runtime/zaomeng_cli.py chat --novel <路径或名称> --mode auto --message "让我扮演A和B聊天"
py -3 runtime/zaomeng_cli.py view --character <角色名> --novel <路径或名称>
py -3 runtime/zaomeng_cli.py correct --session <id> --message <原句> --corrected <修正句> --character <角色名>
```
## 人格包结构
当前人物主存储为 Markdown 人格包,常见目录结构如下:
```text
runtime/data/characters/<novel_id>/<角色名>/
```
常见文件:
- `NAVIGATION.generated.md`
- `NAVIGATION.md`
- `PROFILE.generated.md`
- `PROFILE.md`
- `RELATIONS.generated.md`
- `RELATIONS.md`
- `MEMORY.md`
按人物证据情况,还可能生成可选拆分文件:
- `SOUL.generated.md`
- `GOALS.generated.md`
- `STYLE.generated.md`
- `TRAUMA.generated.md`
- `IDENTITY.generated.md`
- `BACKGROUND.generated.md`
- `CAPABILITY.generated.md`
- `BONDS.generated.md`
- `CONFLICTS.generated.md`
- `ROLE.generated.md`
## 约束文件
这版 skill 把约束拆成三层:
- `references/output_schema.md`
负责输出格式与字段规范
- `references/style_differ.md`
负责防同质化与风格差异化
- `references/logic_constraint.md`
负责全局人设底线、防 OOC 与模式边界
如果你在检查输出质量,这三份文件应该一起看,而不是只看 schema。
## 和 SKILL.md 的区别
- `README.md` 是给用户看的,重点是安装、使用方式和产物说明
- `SKILL.md` 是给宿主和 agent 读的,重点是执行规则、调用约束和禁止行为
## 发布提示
如果你要把这个 skill 单独发布,建议至少一起带上这些文件:
- `README.md`
- `SKILL.md`
- `INSTALL.md`
- `MANIFEST.md`
- `PUBLISH.md`
- `prompts/`
- `references/`
- `runtime/`
## License
`MIT-0`
FILE:README_EN.md
# zaomeng-skill
`zaomeng-skill` is a skill package for Chinese novel character distillation, relationship extraction, one-on-one roleplay, and group character chat.
It is not a generic chat template. It is a local rule-based workflow built around one principle: distill first, then let characters speak according to their profiles.
More precisely:
- `zaomeng` is responsible for character distillation, relationship extraction, persona navigation, persistent memory, and OOC constraints
- in hosts such as OpenClaw, natural-language understanding and the final dialogue generation should still be handled by the host model
- `zaomeng` is best used as the persona-and-constraint layer, rather than as the sole natural dialogue generator
License: `MIT-0` (MIT No Attribution)
## What's New In This Version
The current release line is `3.1.0`. The main changes are:
- markdown-first storage: persona data no longer treats legacy JSON as the source of truth
- `clawhub-zaomeng-skill` now includes an embedded minimal runtime, instead of treating runtime cloning from an external repository as the primary path
- natural-language-first usage: distill first, then enter `act` or `observe`
- layered persona constraints: format, anti-homogenization, and logic floor are separated
## What It Does
### 1. Distill Characters
Extract character profiles from raw novel text and cover a richer set of dimensions, such as:
- core identity
- core motivation
- personality base
- decision logic
- character arc
- key bonds
- language expression style
- value tradeoff system
- hidden desire
- private self
### 2. Extract Relationships
Extract pairwise relationship graphs from same-scene interactions and generate both graph-level and character-side relation layers.
### 3. Enter Character Chat
Two main interaction modes are supported:
- `act`
  You control one character's line, and the other characters respond in character
- `observe`
  Multiple characters interact around a scene, topic, or opening line
### 4. Save Corrections
If a line is clearly out of character, you can write the correction back into memory and keep using that correction in later dialogue.
## Installation
### OpenClaw
```bash
openclaw skills install wkbin/zaomeng-skill
```
### ClawHub
```bash
npx clawhub@latest install zaomeng-skill
```
```bash
pnpm dlx clawhub@latest install zaomeng-skill
```
```bash
bunx clawhub@latest install zaomeng-skill
```
### Install Into A Local Skills Directory
```bash
python scripts/install_skill.py --skills-dir <your-skills-root>
```
## Runtime Requirements
To run the real workflow, the host environment should support:
- local Python command execution
- `PyYAML`
- `ebooklib` when reading `.epub`
- optional `tiktoken` for more accurate token estimation
The packaged runtime entrypoint inside the skill is:
```text
runtime/zaomeng_cli.py
```
## Recommended Usage Flow
The correct order is not to jump into chat immediately.
**Provide the novel first, distill the characters, and only then enter chat.**
The most common user flow is:
1. provide the novel file or file path
2. say which characters you want distilled in natural language
3. after distillation finishes, enter `act` or `observe`
## Natural-Language Examples
### Distill
```text
Distill Lin Daiyu and Jia Baoyu for me
```
```text
Extract personas for Liu Bei, Zhang Fei, and Guan Yu from this novel
```
### Enter Act Mode
```text
Let me play Jia Baoyu and chat with Lin Daiyu
```
```text
I will play Baoyu. Let Daiyu reply to me
```
### Enter Observe Mode
```text
Enter Liu Bei, Zhang Fei, Guan Yu group chat mode
```
```text
Let everyone say one line about the alliance with Sun Quan
```
## CLI Examples
If you run the packaged runtime directly, use commands like these:
```bash
py -3 runtime/zaomeng_cli.py distill --novel <path> --characters A,B
py -3 runtime/zaomeng_cli.py extract --novel <path>
py -3 runtime/zaomeng_cli.py chat --novel <path-or-name> --mode auto --message "Let me play A and chat with B"
py -3 runtime/zaomeng_cli.py view --character <name> --novel <path-or-name>
py -3 runtime/zaomeng_cli.py correct --session <id> --message <raw> --corrected <fixed> --character <name>
```
## Persona Bundle Structure
The main character storage is now a markdown persona bundle. The common directory shape is:
```text
runtime/data/characters/<novel_id>/<character_name>/
```
Common files:
- `NAVIGATION.generated.md`
- `NAVIGATION.md`
- `PROFILE.generated.md`
- `PROFILE.md`
- `RELATIONS.generated.md`
- `RELATIONS.md`
- `MEMORY.md`
Depending on available evidence, optional focused persona files may also be generated:
- `SOUL.generated.md`
- `GOALS.generated.md`
- `STYLE.generated.md`
- `TRAUMA.generated.md`
- `IDENTITY.generated.md`
- `BACKGROUND.generated.md`
- `CAPABILITY.generated.md`
- `BONDS.generated.md`
- `CONFLICTS.generated.md`
- `ROLE.generated.md`
## Constraint Files
This version splits constraints into three layers:
- `references/output_schema.md`
  format and field contract
- `references/style_differ.md`
  anti-homogenization and style differentiation
- `references/logic_constraint.md`
  global persona floor, anti-OOC rules, and mode boundaries
If you are checking output quality, these three files should be read together rather than reading only the schema.
## README.md vs SKILL.md
- `README.md` is for users and focuses on installation, usage, and outputs
- `SKILL.md` is for hosts and agents and focuses on execution rules, invocation constraints, and forbidden behavior
## Publishing Notes
If you publish this skill on its own, it is best to include at least:
- `README.md`
- `README_EN.md`
- `SKILL.md`
- `INSTALL.md`
- `MANIFEST.md`
- `PUBLISH.md`
- `prompts/`
- `references/`
- `runtime/`
## License
`MIT-0`
FILE:references/logic_constraint.md
# 逻辑约束指南
## 用途
用于防止人物设定崩坏、OOC 漂移,以及在蒸馏、纠错、act 模式、observe 模式中的角色逻辑失稳。
这份文件定义的是全局人格逻辑边界,不是风格文件,也不是格式文件。
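## Constraint Priority
When several constraints conflict, resolve them in this order of priority:
1. Direct evidence from the original text
2. User corrections already written to memory
3. Relationship constraints for the specific relationship target
4. Persona fields in the character profile
5. Style preferences
6. Generic fallback rules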
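The priority order above can be read as a simple "lowest index wins" lookup. The sketch below is illustrative only: the source labels in `PRIORITY` and the candidate dictionary shape are hypothetical names, not identifiers from the runtime.

```python
# Hypothetical labels for the six priority levels, highest priority first.
PRIORITY = [
    "source_evidence",      # 1. direct evidence from the original text
    "user_correction",      # 2. user corrections already written to memory
    "relation_constraint",  # 3. constraints for the specific relationship target
    "profile_field",        # 4. persona fields in the character profile
    "style_preference",     # 5. style preferences
    "fallback_rule",        # 6. generic fallback rules
]


def resolve_conflict(candidates):
    """Return the candidate whose source ranks highest; ties keep the first one."""
    return min(candidates, key=lambda c: PRIORITY.index(c["source"]))
```

For example, when a style preference suggests a softer tone but a stored user correction forbids it, the user correction wins because it sits earlier in the list.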
## 全局人格底线
### 1. 身份不能轻易翻转
- 角色不能在没有证据的情况下,随意背离核心身份、派系位置或剧情职能。
- 一时迟疑,不等于身份反转。
### 2. 动机必须前后一致
- 发言和行为应尽量与 `soul_goal`、`hidden_desire`、`belief_anchor` 保持相容。
- 角色可以战术性回避,但不能在没有依据时抹掉长期驱动力。
### 3. 价值观约束决策
- `decision_rules`、`reward_logic`、`taboo_topics`、`forbidden_behaviors` 构成人物硬边界。
- 角色可以偶尔违背自己的价值观,但必须是少见、代价高、且有证据支撑的情况。
### 4. 关系层优先于通用语气
- 角色不会对所有人都用同一种态度说话。
- 关系层与 `key_bonds` 应先于通用风格兜底生效。
### 5. 情绪反应必须符合人物惯性
- 高压场景不能把所有角色都推成同一种“冷静分析型”。
- `anger_style`、`grievance_style`、`fear_triggers`、`private_self` 必须保持稳定。
### 6. 人物弧光必须有代价和事件支撑
- 立场变化、成长、崩塌都应由可见事件推动。
- 当前证据不足时,优先沿用此前更稳定的人格状态。
## 硬性失败条件
出现以下任一情况时,应降置信度、重写,或返回 `needs_revision`:
1. 无证据地违背核心价值观
2. 忽略对当前关系对象的专属态度
3. 说话风格退化为通用 AI 填充语
4. 无依据地把公开人格和私下真实面貌混成一层
5. 角色突然套用了另一个角色的思路或话术
6. 这句话在世界观里似乎合理,但对这个人来说并不合理
## Act 模式约束
1. 用户输入的,是受控角色的台词,不是让系统改写成旁白式角色扮演。
2. 响应角色必须以该角色身份回话,而不是以旁白、作者或场景导演身份补戏。
3. 不得把模式切换指令误改写为剧情演示文本。
4. 如果用户台词含混,应保守地继续人物对话,而不是擅自发散出新的剧情分支。
## Observe 模式约束
1. 不是每一轮都必须所有角色一起说话。
2. 优先让最相关、最受影响、或被明确点名的角色发言。
3. 当发言会抢戏、违背人设或与当前场景弱相关时,可以允许沉默。
4. 同一轮内不同角色的语气、逻辑和情绪表面必须能区分开。
## 纠错约束
1. 优先做最小修正,以恢复人物一致性。
2. 尽量保留原句的沟通意图。
3. 如果 OOC 证据不足,应标记低置信度,而不是过度改写。
4. 一旦某类纠错被确认有效,应允许写入 `MEMORY.md`,作为后续持续约束。
FILE:references/output_schema.md
# 输出规范
## 人物档案
```md
# PROFILE
<!-- Canonical markdown profile storage. -->
## Meta
- name: 角色名
- novel_id: sample_novel
- source_path: data/sample_novel.txt
## Core
- core_traits: 性格1;性格2
- values: 勇气=0;智慧=0;善良=0;忠诚=0;野心=0;正义=0;自由=0;责任=0
- speech_style: 语言风格描述
- identity_anchor: 角色在世界中的自我定位
- soul_goal: 长期驱动目标
- worldview: 对世界、规则、善恶、秩序的基本看法
- thinking_style: 理性/感性/短视/长远等思考偏好
## Deep Persona
- core_identity: 核心身份与定位
- faction_position: 阵营、派系、立场位置
- background_imprint: 出身背景与成长烙印
- world_rule_fit: 人物理念与世界观规则的契合度描述
- social_mode: 社交距离与相处模式
- hidden_desire: 深层执念或隐秘欲望
- inner_conflict: 内在矛盾
- story_role: 剧情职能定位
- belief_anchor: 信仰与精神支柱
- private_self: 私下真实面貌
- stance_stability: 立场稳定度或摇摆特性
- reward_logic: 恩怨奖惩逻辑
- strengths: 专属能力;擅长项
- weaknesses: 性格缺陷;明显短板
- cognitive_limits: 认知盲区;成长短板
- fear_triggers: 恐惧点;雷点;避讳触发项
- key_bonds: 关键羁绊;宿命联结
- action_style: 行事风格倾向
## Voice
- typical_lines: 代表性台词;高辨识表达
- decision_rules: 条件->反应;固定决策准则
- life_experience: 关键经历;过往创伤;人生烙印
- taboo_topics: 不能触碰的话题
- forbidden_behaviors: 绝不会做的行为
- cadence: 说话节奏
- signature_phrases: 专属口头禅;标志句式
- sentence_openers: 常见起句
- connective_tokens: 常用连接词
- sentence_endings: 常见收尾方式
- forbidden_fillers: 禁用口水词;禁用通用助词
- anger_style: 生气时的表达方式
- joy_style: 开心时的表达方式
- grievance_style: 委屈/受压时的表达方式
## Arc
- arc_start: 勇气=5;立场=6
- arc_mid: 勇气=6;trigger_event=事件
- arc_end: 勇气=7;final_state=状态
## Evidence
- description_count: 1
- dialogue_count: 2
- thought_count: 0
- chunk_count: 1
```
Rules:
- `core_traits` max 10 unique items
- `typical_lines` max 8 unique items
- `decision_rules` max 8 unique items
- list-like fields use `;` as the separator in markdown scalar lines
- `values` all integers in `[0,10]`
- evidence fields store counts, not raw text arrays
- any deep persona field without solid evidence may stay empty
- `arc_start` / `arc_mid` / `arc_end` should be quantified only when a stable stage change has been identified; if evidence is insufficient, leave them empty or keep only an "undetermined" note in `trigger_event` / `final_state`
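The profile rules above lend themselves to a small checker. The following is a minimal sketch under stated assumptions: it reads only `- key: value` lines, and the helper names `parse_fields`, `split_list`, and `validate_profile` are hypothetical rather than part of the runtime.

```python
# Item limits taken from the rules above.
LIMITS = {"core_traits": 10, "typical_lines": 8, "decision_rules": 8}


def parse_fields(markdown):
    """Collect `- key: value` lines into a dict, ignoring headings and comments."""
    fields = {}
    for line in markdown.splitlines():
        line = line.strip()
        if line.startswith("- ") and ":" in line:
            key, _, value = line[2:].partition(":")
            fields[key.strip()] = value.strip()
    return fields


def split_list(value):
    """Split a `;`-separated scalar line into trimmed, non-empty items."""
    return [item.strip() for item in value.split(";") if item.strip()]


def validate_profile(fields):
    """Return a list of rule violations; an empty list means the checks passed."""
    errors = []
    for key, limit in LIMITS.items():
        items = split_list(fields.get(key, ""))
        if len(items) != len(set(items)):
            errors.append(f"{key}: duplicate items")
        if len(items) > limit:
            errors.append(f"{key}: more than {limit} items")
    for pair in split_list(fields.get("values", "")):
        name, _, raw = pair.partition("=")
        raw = raw.strip()
        if not (raw.isdecimal() and 0 <= int(raw) <= 10):
            errors.append(f"values: {name.strip()} must be an integer in [0,10]")
    return errors
```

A caller could run `validate_profile(parse_fields(text))` on a generated `PROFILE.generated.md` and return `needs_revision` when the list is not empty.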
### 26 维度覆盖映射
当前这套 Markdown 结构应尽量覆盖以下 26 个人格维度:
1. 核心身份 -> `core_identity` / `faction_position` / `story_role`
2. 核心动机 -> `soul_goal` / `hidden_desire`
3. 性格基底 -> `core_traits` / `values`
4. 行为逻辑 -> `decision_rules` / `action_style` / `reward_logic`
5. 人物弧光 -> `arc_start` / `arc_mid` / `arc_end`
6. 关键羁绊 -> `key_bonds`
7. 符号化特征 -> `typical_lines` / `signature_phrases` / `sentence_openers` / `sentence_endings`
8. 世界观适配性 -> `world_rule_fit`
9. 价值取舍体系 -> `values` / `belief_anchor` / `worldview`
10. 情绪反应模式 -> `anger_style` / `joy_style` / `grievance_style` / `fear_triggers`
11. 思维认知偏好 -> `thinking_style` / `cognitive_limits`
12. 语言表达特质 -> `speech_style` / `cadence` / `connective_tokens`
13. 专属能力与致命短板 -> `strengths` / `weaknesses`
14. 出身背景与生存处境 -> `background_imprint` / `life_experience`
15. 深层执念与隐秘欲望 -> `hidden_desire`
16. 行事风格倾向 -> `action_style`
17. 过往创伤与人生烙印 -> `background_imprint` / `life_experience` / `taboo_topics`
18. 社交相处模式 -> `social_mode`
19. 内在自我矛盾 -> `inner_conflict`
20. 剧情职能定位 -> `story_role`
21. 恐惧与避讳事物 -> `fear_triggers` / `taboo_topics` / `forbidden_behaviors`
22. 信仰与精神支柱 -> `belief_anchor`
23. 认知局限与成长短板 -> `cognitive_limits` / `weaknesses`
24. 立场摇摆特性 -> `stance_stability`
25. 恩怨奖惩逻辑 -> `reward_logic`
26. 私下真实面貌 -> `private_self`
## Persona Bundle Files
Distillation can also generate an optional persona bundle under the following directory:
```text
runtime/data/characters/<novel_id>/<角色名>/
```
Common files:
- `NAVIGATION.generated.md`: generated load order and file intent
- `NAVIGATION.md`: manual override and navigation supplement
- `PROFILE.generated.md`: canonical generated profile
- `PROFILE.md`: manual override profile
- `RELATIONS.generated.md`: generated target-specific relations
- `RELATIONS.md`: manual relation overrides
- `MEMORY.md`: durable memory and user corrections
Optional focused persona files:
- `SOUL.generated.md`
- `GOALS.generated.md`
- `STYLE.generated.md`
- `TRAUMA.generated.md`
- `IDENTITY.generated.md`
- `BACKGROUND.generated.md`
- `CAPABILITY.generated.md`
- `BONDS.generated.md`
- `CONFLICTS.generated.md`
- `ROLE.generated.md`
## 关系图谱
```md
# RELATION_GRAPH
## 林黛玉_贾宝玉
- trust: 8
- affection: 9
- power_gap: 1
- conflict_point: 表达方式差异
- typical_interaction: 黛玉质问->宝玉安抚->短暂缓和
- appellation_to_target: 宝玉
- confidence: 8
```
Rules:
- relation section title must use sorted key format `<A>_<B>`
- `trust` and `affection` in `[0,10]`
- `power_gap` in `[-5,5]`
- `confidence` in `[0,10]`
- every relation entry must be backed by evidence of same-scene interaction
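A relation entry can be checked against these rules with a few lines of code. This sketch assumes that "sorted" means Python's default string ordering (by code point), which matches the `林黛玉_贾宝玉` example but is not confirmed by the source; `relation_key` and `validate_relation` are hypothetical helper names.

```python
# Integer ranges taken from the rules above.
RANGES = {
    "trust": (0, 10),
    "affection": (0, 10),
    "power_gap": (-5, 5),
    "confidence": (0, 10),
}


def relation_key(a, b):
    """Build the section title for a pair (assumes code-point sort order)."""
    return "_".join(sorted([a, b]))


def validate_relation(title, fields):
    """Return rule violations for one relation section; fields maps key -> int."""
    errors = []
    names = title.split("_")  # assumes character names contain no underscore
    if len(names) != 2 or names != sorted(names):
        errors.append("title must be <A>_<B> with the names in sorted order")
    for key, (low, high) in RANGES.items():
        if key not in fields:
            continue  # only fields that are present are range-checked
        value = fields[key]
        if isinstance(value, bool) or not isinstance(value, int) or not low <= value <= high:
            errors.append(f"{key} must be an integer in [{low},{high}]")
    return errors
```

Using one key builder for both writing and checking keeps a pair from appearing twice under `<A>_<B>` and `<B>_<A>`.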
## 聊天约束(可选)
```md
# CHAT_CONSTRAINTS
- character: 林黛玉
- must_follow: 语气克制但可反讽;冲突时先防御再观察
- must_avoid: 无证据的极端背叛表述;与高忠诚值冲突的抛弃宣言
- fallback_action: rewrite_once_then_needs_revision
```
## 纠错输出
```md
# CORRECTION
- corrected_message: 修正后的台词
- correction_reason: 基于哪些人格字段收紧
- confidence: 7
```
FILE:references/safety_policy.md
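# Safety Policy
## Execution Boundary
- Local-first by default.
- Runtime instructions should not require mandatory network downloads.
- Arbitrary shell command execution is not required.
## Data Safety
- Do not request or store credentials, keys, or other sensitive information.
- Avoid personal or sensitive data in examples and outputs wherever possible.
- All results should stay within the textual evidence the user provided.
## Integrity Checks
- If evidence is thin, output a low-confidence result.
- If the output violates the schema constraints, return `needs_revision`.
- If the user's request involves clearly unsafe behavior, refuse and offer a safe alternative.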
FILE:references/style_differ.md
# 风格差异化指南
## 用途
用于在蒸馏、纠错、单聊、群聊生成时降低角色同质化。
这不是自由发挥式 prompt,而是一份约束参考。它的作用是帮助运行时和辅助 agent 让每个角色在表达层面保持可辨识的差异。
## 核心原则
不要只靠“观点不同”来区分角色,而要让角色在以下层面也真正不同:
1. 说话节奏
2. 用词偏好
3. 情绪外露方式
4. 冲突处理习惯
5. 面对不同关系对象时的语气差异
6. 公开人格与私下真实面貌的落差
## 差异化维度
### 1. 句式节奏
- 短促直接型:句子短,铺垫少,态度出现快
- 层层展开型:先讲前因后果,再落到结论
- 情绪断裂型:句式会被情绪打断,带出波动
- 克制含蓄型:话少,更多靠分寸和停顿表达立场
### 2. 用词层级
- 朴素口语
- 雅正克制
- 尖锐刻薄
- 温和委婉
- 清冷疏离
- 戏谑调侃
不要让所有角色最后都坍缩成同一种“先看看/再说/权衡一下”的安全说法。
### 3. 情绪表面形态
- 有的角色情绪来得快,会立刻外放
- 有的角色先压住情绪,只在措辞上收紧
- 有的角色会先反讽、先顶回去
- 有的角色嘴上抱怨,但行动上依然会跟进
除非有明确弧光证据,否则情绪风格不应频繁漂移。
### 4. 决策表达顺序
- 有的角色先说结论
- 有的角色先划边界
- 有的角色先提关系
- 有的角色先谈后果
思考顺序的外显,本身就是人物声纹的一部分。
### 5. 关系对象差分
同一个角色,面对不同对象时应当允许发生稳定变化:
- 对长辈/上位者
- 对平辈/对手
- 对亲近之人
- 对陌生人
- 对敌对者
不能把所有对象都压平为同一套万能口吻。
### 6. 公开人格与私下真实面貌
- 有的人表面克制,内里强烈
- 有的人表面强势,内里不安
- 有的人表面玩笑,内里认真
要结合 `private_self`、`social_mode`、`key_bonds` 与关系层,稳定维护这种落差。
## 防同质化规则
1. 同一场景中,不要让多个角色复用同一套兜底句式骨架。
2. 不要把所有“冷静型”角色都写成同一种“理性、稳妥、先观察”的声音。
3. 不要把所有冲突都写成直接解释;有的人会回避,有的人会嘲讽,有的人会截断,有的人会压着说。
4. 不要过度使用“先看看”“再作定夺”“还需权衡”这类通用过渡句。
5. 即使两个角色立场相同,他们的理由、表面措辞和情绪手感也应不同。
6. 保留不对称性:有人推进,有人兜底,有人反讽,有人吸收压力。
## 输出前自检
在最终输出前,至少检查以下问题:
1. 去掉角色名之后,这句话是否仍然大致能辨认出是谁说的?
2. 这句话体现的是该角色自己的表达习惯,还是一种泛用叙述者口吻?
3. 如果听话对象换了,这句话是否会随关系发生合理变化?
4. 把这句话放到同场其他角色旁边时,声音是否仍然分得开?
如果第 2 条或第 4 条答案是否定的,说明这句仍然过于模板化。
FILE:references/validation_policy.md
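# Validation Policy (Triple Validation)
## Purpose
Improves extraction reliability and reduces OOC (out-of-character) behavior by validating key conclusions layer by layer.
## Validation layers
### 1) Evidence validation
- Every key conclusion must map to at least one specific sentence-level piece of textual evidence.
- A conclusion without evidence should not be finalized as-is.
### 2) Consistency validation
- Personality traits and decision rules must not clearly conflict with `values`.
- The proposed speaking style must be consistent with `speech_style`.
- Changes in relationship scores must match the positive or negative polarity of the interaction evidence.
### 3) Transfer validation
- A conclusion holds only if it can be carried into new dialogue turns while the persona stays stable.
- If a conclusion causes OOC as soon as it is used in simulated dialogue, reject or revise it.
## Pass/fail rules
- Pass: all three layers pass.
- Soft fail: one layer fails; fix it before output.
- Hard fail: two or more layers fail; return `needs_revision`.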
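The pass/fail rules reduce to counting failed layers. A minimal sketch follows, assuming the three layer results are already available as booleans. The function name `validation_verdict`, the layer keys, and the labels `pass` and `soft_fail` are illustrative; only `needs_revision` is named by the policy.

```python
from typing import Dict

LAYERS = ("evidence", "consistency", "transfer")  # hypothetical key names


def validation_verdict(layer_results: Dict[str, bool]) -> str:
    """Map the three layer results to a verdict by counting failures."""
    if set(layer_results) != set(LAYERS):
        raise ValueError(f"expected results for exactly {LAYERS}")
    failures = sum(1 for passed in layer_results.values() if not passed)
    if failures == 0:
        return "pass"
    if failures == 1:
        return "soft_fail"  # one layer failed: revise before output
    return "needs_revision"  # two or more layers failed


verdict = validation_verdict(
    {"evidence": True, "consistency": False, "transfer": True}
)
```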
FILE:runtime/requirements.txt
PyYAML>=6.0
# Optional: improves token counting accuracy.
tiktoken>=0.5.0
# Optional: only required when the novel input is .epub.
ebooklib>=0.18
FILE:runtime/rules/distillation_rules.md
---
address_suffixes:
- 哥哥
- 姐姐
- 妹妹
- 弟弟
- 姑娘
- 公子
- 爷
speech_verbs:
- 道
- 说道
- 笑道
- 问道
- 答道
- 喝道
- 叫道
- 叹道
- 呼道
object_leaders:
- 叫
- 唤
- 问
- 对
- 向
- 同
- 与
- 跟
- 把
- 将
- 扯住
- 拉住
- 搀起
- 扶起
- 扶着
- 呼
- 忙呼
- 喝住
- 捉住
- 拿住
- 推着
- 拖着
- 请
- 教
stop_names:
- 我们
- 你们
- 他们
- 她们
- 自己
- 那里
- 这里
- 这个
- 那个
- 一种
- 一个
common_surnames: "赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻柏水窦章云苏潘葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮卞齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闵席季麻强贾路娄危江童颜郭梅盛林刁钟徐邱骆高夏蔡田樊胡凌霍虞万支柯昝管卢莫经房裘缪干解应宗丁宣贲邓郁单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁荀羊惠甄曲家封芮羿储靳汲邴糜松井段富巫乌焦巴弓牧隗山谷车侯宓蓬全郗班仰秋仲伊宫宁仇栾暴甘钭厉戎祖武符刘景詹束龙叶幸司韶郜黎"
trait_keywords:
勇敢: [勇, 冲, 无畏, 果断]
温柔: [轻声, 温和, 安慰, 体贴]
聪慧: [思索, 推断, 聪明, 机敏]
敏感: [委屈, 难过, 心酸, 叹息]
傲气: [冷笑, 不屑, 高傲, 轻蔑]
忠诚: [守护, 忠, 誓言, 不离]
善良: [帮助, 善意, 宽容, 谅解]
执拗: [坚持, 非要, 绝不, 固执]
机变: [变化, 试探, 识破, 周旋]
诙谐: [笑道, 打趣, 顽皮, 戏弄]
虔诚: [佛, 祈祷, 经文, 戒律]
沉稳: [稳住, 接应, 收拾, 不慌]
圆滑: [不如, 且慢, 何必, 先看看]
archetypes:
adaptive_initiator:
markers: [先探, 真假, 破局, 当先, 出手, 顶上]
traits: [勇敢, 聪慧, 机变, 傲气]
value_bias: {勇气: 2, 智慧: 2, 自由: 1}
speech_style: "语言直白敏捷,常主动推进局面。"
identity_anchor: "不爱受拘束,但会在关键处主动顶上去的人"
soul_goal: "看清局面后抢先破局,不让自己人被动挨打"
worldview: "局势未明先探清,真有阻碍再果断出手。"
thinking_style: "先试真假,再看切口,找到破绽就推进。"
decision_rules:
- "局势未明时先试探,再决定是否硬闯"
- "同伴受险时先顶上去,再分真假轻重"
life_experience: "经历过高压局面,因此不习惯被动挨打。"
forbidden_behaviors:
- "不会在局势未明时彻底撒手不管"
- "不会眼看自己人受压却只顾旁观"
grounded_pragmatist:
markers: [吃亏, 便宜, 退路, 省力, 算计, 划算]
traits: [诙谐, 圆滑, 善良]
value_bias: {自由: 2, 智慧: 1, 忠诚: 1}
speech_style: "语言口语化、会盘算利害,也常带几分玩笑。"
identity_anchor: "先替自己盘算,但真到要紧处也不肯彻底脱身的人"
soul_goal: "尽量少吃亏,同时守住自己不愿真正丢开的关系"
worldview: "能避损则避损,真遇大事也不能只顾自己抽身。"
thinking_style: "先盘利害和退路,再决定要不要跟进。"
decision_rules:
- "先盘算利害和退路,再决定投入多少"
- "嘴上会抱怨,但真正要命时不会彻底脱身"
life_experience: "尝过吃亏和占便宜,所以总会先替自己算一步。"
forbidden_behaviors:
- "不会为了小利立刻抛下已经认下的人"
- "不会把玩笑话当成真正逃责的借口"
moral_guardian:
markers: [本心, 慈悲, 戒律, 因果, 规矩, 劝解]
traits: [虔诚, 温柔, 善良, 克制]
value_bias: {善良: 3, 责任: 2, 正义: 2, 忠诚: 1}
speech_style: "语言克制温和,常从原则、因果或边界出发。"
identity_anchor: "把原则和良知压在心上,不愿轻易越线的人"
soul_goal: "守住本心与底线,让事情朝更稳妥的方向落下"
worldview: "能劝则先劝,能守则先守,不为一时痛快越过边界。"
thinking_style: "先辨边界和后果,再决定是否推进。"
decision_rules:
- "先辨边界和后果,再决定是否推进"
- "能劝解则先劝解,不能只为一时痛快越线"
life_experience: "一路见过后果与代价,所以更重边界和因果。"
forbidden_behaviors:
- "不会轻易把无辜者推去承担代价"
- "不会为了一时气盛主动越过原则"
steady_supporter:
markers: [接应, 后路, 收拾, 稳住, 扛住, 照应]
traits: [沉稳, 忠诚, 善良]
value_bias: {责任: 3, 忠诚: 2, 善良: 1}
speech_style: "语言朴实克制,更在意接应、落实和托底。"
identity_anchor: "不争抢话头,却总会把前后收拾稳当的人"
soul_goal: "把前后照应好,让队伍和关系不因混乱散掉"
worldview: "自己多担一步,局面就能少乱一步。"
thinking_style: "先把眼前事务接稳,再补后手和接应。"
decision_rules:
- "前头有人推进时,自己先补后手和接应"
- "局面纷乱时先稳住差事和后路"
life_experience: "常在队伍后方收拾残局,因此习惯先顾前后照应。"
forbidden_behaviors:
- "不会在关键时刻把后路和杂务一起丢下"
- "不会只争口头却不接实际担子"
value_markers:
勇气:
positive: [战, 斗, 打, 杀, 探路, 上前, 顶住, 硬扛]
negative: [退后, 躲, 怕, 不敢]
智慧:
positive: [思量, 计较, 试探, 变化, 识破, 探明, 看清, 分辨]
negative: [懵懂, 糊涂, 上当]
善良:
positive: [慈悲, 怜, 救, 饶, 放, 劝, 安慰, 体谅]
negative: [害命, 伤人, 羞辱, 折磨]
忠诚:
positive: [跟随, 守住, 护送, 承诺, 同行, 接应, 师门, 同伴]
negative: [散伙, 丢下, 背离]
正义:
positive: [善恶, 天理, 公道, 罪过, 正路, 规矩]
negative: [枉杀, 作恶, 欺心]
自由:
positive: [自在, 逍遥, 快活, 不受拘束, 随心]
negative: [拘束, 受制, 被迫]
野心:
positive: [称王, 做官, 名号, 本事, 抬高, 图谋]
negative: [不图, 无意争锋]
责任:
positive: [后路, 行李, 安顿, 守住, 扛住, 照应]
negative: [误事, 偷懒, 撂下]
style_templates:
short_direct: "句式偏短,较少铺垫,态度来得直接。"
long_reflective: "句式较长,喜欢把轻重和前因后果慢慢展开。"
emotional: "情绪浮在表面,回应时容易带出锋芒或波动。"
balanced: "表达有分寸,既不极短,也不刻意铺陈。"
quiet: "发言偏少,更多通过态度和分寸表明立场。"
decision_rule_signals:
verify_first:
markers: [先看, 看清, 看明, 试探, 探明, 查清, 核实, 分清, 辨清, 虚实, 真假, 判断]
template: "遇到说法不清或局势未明时,会先辨清虚实,再决定站位。"
boundary_first:
markers: [不可, 不能, 不准, 休得, 住口, 禁言, 放下, 越线, 分寸, 底线, 规矩]
template: "一旦碰到底线或规矩,会立刻收紧语气,先把边界划清。"
protect_first:
markers: [保护, 护住, 护着, 别碰, 拦住, 挡住, 顶上, 扛住, 受伤, 出事, 守住]
template: "眼见身边人受压或将要出事时,往往会先出手护住,再谈轻重。"
order_first:
markers: [稳住, 收住, 压住, 安顿, 接应, 后手, 后路, 收拾, 托底, 先退]
template: "局面一乱,第一反应往往是先稳住场面、补上后手,不会任由事态散开。"
deflect_then_commit:
markers: [罢了, 不过, 随你, 何必, 算了, 且慢, 慢些, 等等, 先这样]
template: "嘴上未必立刻说死,常会先留一步转圜,再决定要不要真正压上去。"
self_defense_first:
markers: [不是, 没有, 误会, 凭什么, 我说了, 不关, 休想, 何须]
template: "先护住自己的立场与说法,不肯平白受人按头定性。"
taboo_topics_by_value:
忠诚: [背叛, 失信]
责任: [弃民, 不顾众人, 撂下同伴]
正义: [黑白颠倒, 颠倒是非]
善良: [羞辱弱者, 拿人心取笑]
forbidden_behaviors_by_value:
忠诚: [不会为了眼前轻利立刻翻脸背盟]
责任: [不会临到担事时先把后路撂下]
正义: [不会明知失当还故意混淆是非]
善良: [不会把无辜者当作可随手牺牲的筹码]
---
# DISTILLATION RULES
These are generic rule assets for distilling characters from any novel.
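To illustrate how keyword tables such as `trait_keywords` and `value_markers` can drive rule-based scoring, here is a minimal sketch that counts substring hits. It is an assumed scoring scheme, not the engine's actual one; the helper names `score_traits` and `score_value` are hypothetical, and the tables are small subsets copied from the front matter.

```python
from typing import Dict, List

# Small subsets copied from the front matter; the full tables live in the rule file.
TRAIT_KEYWORDS: Dict[str, List[str]] = {
    "勇敢": ["勇", "冲", "无畏", "果断"],
    "温柔": ["轻声", "温和", "安慰", "体贴"],
}
VALUE_MARKERS: Dict[str, Dict[str, List[str]]] = {
    "勇气": {
        "positive": ["战", "斗", "打", "杀", "探路", "上前", "顶住", "硬扛"],
        "negative": ["退后", "躲", "怕", "不敢"],
    },
}


def score_traits(text: str) -> Dict[str, int]:
    """Count keyword occurrences per trait (assumed scoring, not the engine's)."""
    return {
        trait: sum(text.count(keyword) for keyword in keywords)
        for trait, keywords in TRAIT_KEYWORDS.items()
    }


def score_value(text: str, value: str) -> int:
    """Positive marker hits minus negative marker hits for one value dimension."""
    markers = VALUE_MARKERS[value]
    positive = sum(text.count(m) for m in markers["positive"])
    negative = sum(text.count(m) for m in markers["negative"])
    return positive - negative


passage = "他果断上前,轻声安慰了她。"
trait_scores = score_traits(passage)
courage_score = score_value(passage, "勇气")
```

Plain substring counting is easy to audit but ignores negation and context, which is one reason a validation pass after scoring is useful.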
FILE:runtime/rules/persona_rules.md
---
default_nav_load_order: [SOUL, GOALS, STYLE, TRAUMA, IDENTITY, AGENTS, RELATIONS, MEMORY]
persona_file_catalog:
SOUL:
optional: false
role: "core values, worldview, boundaries"
behaviors: "stance, taboo, refusal, value judgment"
write_policy: "manual_edit"
GOALS:
optional: true
role: "long-term drive, unfinished desire, decision priority"
behaviors: "strategic preference, ambition, long arc pressure"
write_policy: "manual_edit"
STYLE:
optional: true
role: "signature phrasing, cadence, surface emotion, sample lines"
behaviors: "word choice, sentence length, tone, signature wording"
write_policy: "manual_edit"
TRAUMA:
optional: true
role: "pain points, scars, taboo triggers, never-do rules"
behaviors: "trigger reactions, avoidance, hard boundaries"
write_policy: "manual_edit"
IDENTITY:
optional: false
role: "background, lived experience, habits, emotion profile"
behaviors: "self-reference, memory framing, habit-driven reactions"
write_policy: "manual_edit"
AGENTS:
optional: false
role: "runtime behavior rules, silence policy, group chat routing"
behaviors: "when to speak, when to hold back, how to engage others"
write_policy: "manual_edit"
RELATIONS:
optional: true
role: "target-specific trust, affection, appellations, friction points"
behaviors: "tone toward each other character, appellations, conflict framing"
write_policy: "manual_edit"
MEMORY:
optional: false
role: "stable notes plus runtime write-back from user guidance and corrections"
behaviors: "persistent user constraints, correction carry-over, mutable notes"
write_policy: "runtime_append"
---
# PERSONA RULES
Editable persona bundle schema and navigation metadata.
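A minimal sketch of how `default_nav_load_order` and the `optional` flags could be applied when reading a character's persona bundle. The `<NAME>.md` file naming and the helper `load_persona_bundle` are assumptions for illustration; the runtime's real loader may be organized differently.

```python
import tempfile
from pathlib import Path
from typing import Dict

# Values copied from the front matter above.
DEFAULT_NAV_LOAD_ORDER = [
    "SOUL", "GOALS", "STYLE", "TRAUMA", "IDENTITY", "AGENTS", "RELATIONS", "MEMORY",
]
OPTIONAL_FILES = {"GOALS", "STYLE", "TRAUMA", "RELATIONS"}


def load_persona_bundle(character_dir: Path) -> Dict[str, str]:
    """Read persona files in navigation order; skip missing optional ones."""
    bundle: Dict[str, str] = {}
    for name in DEFAULT_NAV_LOAD_ORDER:
        path = character_dir / f"{name}.md"  # assumed file naming
        if path.exists():
            bundle[name] = path.read_text(encoding="utf-8")
        elif name not in OPTIONAL_FILES:
            raise FileNotFoundError(f"required persona file missing: {path.name}")
    return bundle


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    # Create the four required files plus one optional file.
    for name in ("SOUL", "IDENTITY", "AGENTS", "MEMORY", "STYLE"):
        (root / f"{name}.md").write_text(f"# {name}\n", encoding="utf-8")
    loaded_order = list(load_persona_bundle(root))
```

Because Python dictionaries preserve insertion order, the returned keys follow the navigation order regardless of how the files were created.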
FILE:runtime/rules/relationship_rules.md
---
appellation_pattern: "(大哥|二哥|三哥|四哥|大姐|二姐|三姐|大弟|二弟|三弟|贤弟|兄长|哥哥|姐姐|妹妹|弟弟|主公|将军|军师|丞相|先生|夫人|姑娘|公子)"
speech_verbs: [道, 说, 问, 答, 笑, 喝, 叹, 叫]
positive_markers: [信任, 相信, 依靠, 照顾, 保护, 安慰, 和好]
negative_markers: [怀疑, 争执, 误会, 冲突, 冷战, 责备, 怒]
power_markers: [命令, 压制, 服从, 主导]
conflict_markers: [家族, 婚约, 利益, 权力, 秘密, 背叛, 立场, 规矩]
ambiguous_appellations: [大哥, 二哥, 三哥, 四哥, 大姐, 二姐, 三姐, 大弟, 二弟, 三弟, 贤弟, 兄长, 哥哥, 姐姐, 妹妹, 弟弟, 主公, 将军, 军师, 丞相, 先生, 夫人, 姑娘, 公子]
appellation_target_window: 8
---
# RELATIONSHIP RULES
Relationship extraction heuristics are editable here.
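The sketch below shows one way `appellation_pattern` and `appellation_target_window` could work together: each matched appellation is paired with the nearest known character name within the window. Counting the window in characters, the nearest-name tie-breaking, and the helper name `resolve_appellations` are all assumptions; the pattern is a subset of the one in the front matter.

```python
import re
from typing import List, Optional, Tuple

# Subset of `appellation_pattern` from the front matter above.
APPELLATION = re.compile("(大哥|二哥|贤弟|兄长|主公|军师|先生)")
TARGET_WINDOW = 8  # `appellation_target_window`; the unit (characters) is assumed


def resolve_appellations(
    sentence: str, names: List[str], window: int = TARGET_WINDOW
) -> List[Tuple[str, Optional[str]]]:
    """Pair each appellation with the nearest known name within `window` characters."""
    results: List[Tuple[str, Optional[str]]] = []
    for match in APPELLATION.finditer(sentence):
        best_name: Optional[str] = None
        best_gap = window + 1  # anything farther than `window` is ignored
        for name in names:
            for hit in re.finditer(re.escape(name), sentence):
                if hit.end() <= match.start():
                    gap = match.start() - hit.end()
                elif hit.start() >= match.end():
                    gap = hit.start() - match.end()
                else:
                    continue  # the name overlaps the appellation itself
                if gap < best_gap:
                    best_name, best_gap = name, gap
        results.append((match.group(0), best_name))
    return results


line = "关羽向刘备拱手道:兄长放心。"
resolved = resolve_appellations(line, ["关羽", "刘备"])
```

When no name falls inside the window the target stays `None`, which matches the spirit of listing these terms under `ambiguous_appellations`: an unresolved appellation is better left open than guessed.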
FILE:runtime/rules/speaker_rules.md
---
question_tokens: [是否, 要不要, 该不该, 可否, 能否, 应否]
war_tokens: [战事, 对抗, 联合, 联手, 结盟, 出兵, 守城, 攻势, 冲突]
rest_tokens: [安稳, 清闲, 共聚, 团聚, 暂歇, 太平]
view_tokens: [怎么看, 如何, 何如, 怎么想, 依你看, 依诸位看]
care_tokens: [可安, 可好, 无恙, 辛苦, 担心, 挂念]
generic_fillers: [哈哈, 好吧, 确实, 呢, 呀]
trait_priority_map:
谨慎: 智慧
聪慧: 智慧
机变: 智慧
敏感: 善良
克制: 责任
勇敢: 勇气
忠诚: 忠诚
善良: 善良
执拗: 正义
傲气: 自由
温柔: 善良
虔诚: 忠诚
仁厚: 责任
豪爽: 勇气
沉稳: 责任
诙谐: 自由
圆滑: 自由
signature_fragments: [依我看, 不可, 不必, 兄弟, 百姓, 大义, 且慢]
opener_patterns: [我自来, 我素来, 我看, 这倒是, 那里的话, 只不过, 只是, 不过, 既然, 若是, 原是, 原不应, 总要, 何必, 不必, 罢了, 未为不可, 亦未为不可, 只管]
connective_patterns: [只是, 不过, 既然, 若是, 原是, 原不应, 总要, 可知, 倒要, 倒也, 只管, 便, 却, 又, 还, 原来]
ending_patterns: [罢了, 就是了, 未为不可, 也不迟, 何必, 不必, 而已, 便是, 也就罢了]
fragment_stopwords: [不可, 可以, 只是, 不过, 如今, 今日, 明日, 知道, 一个, 这个, 那个, 这样, 那里, 这里, 不是, 没有, 不得, 你们, 我们, 他们]
preferred_leading_chars: [我, 这, 那, 只, 便, 却, 还, 原, 若, 既, 可, 未, 何, 岂, 偏, 倒, 但]
preferred_trailing_chars: [了, 罢, 可, 必, 迟, 也]
durable_guidance_tokens: [记住, 设定, 人设, 以后, 别再, 不要再, 改成, 纠正, 必须, 不要, 应该]
single_chat_markers: [单聊, 单独聊, 单独说话]
---
# SPEAKER RULES
Speaker and chat routing tokens live here instead of Python code.
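As an illustration of how two of these tables might be consumed, the sketch below maps a character's traits to value dimensions through `trait_priority_map` and checks a user message against `durable_guidance_tokens`. Both helper names are hypothetical and the logic is an assumption about usage, not the speaker module's actual code.

```python
from collections import Counter
from typing import List, Tuple

# Copied from the front matter above.
TRAIT_PRIORITY_MAP = {
    "谨慎": "智慧", "聪慧": "智慧", "机变": "智慧", "敏感": "善良", "克制": "责任",
    "勇敢": "勇气", "忠诚": "忠诚", "善良": "善良", "执拗": "正义", "傲气": "自由",
    "温柔": "善良", "虔诚": "忠诚", "仁厚": "责任", "豪爽": "勇气", "沉稳": "责任",
    "诙谐": "自由", "圆滑": "自由",
}
DURABLE_GUIDANCE_TOKENS = [
    "记住", "设定", "人设", "以后", "别再", "不要再", "改成", "纠正", "必须", "不要", "应该",
]


def dominant_values(traits: List[str]) -> List[Tuple[str, int]]:
    """Rank value dimensions by how many of the given traits map to them."""
    counts = Counter(
        TRAIT_PRIORITY_MAP[trait] for trait in traits if trait in TRAIT_PRIORITY_MAP
    )
    return counts.most_common()


def is_durable_guidance(message: str) -> bool:
    """True if the message contains a token that signals a lasting instruction."""
    return any(token in message for token in DURABLE_GUIDANCE_TOKENS)


ranking = dominant_values(["聪慧", "机变", "傲气"])
durable = is_durable_guidance("以后别再叫他大哥")
```

Keeping these tables in the rule file rather than in Python means the mapping can be tuned per novel without touching code.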
FILE:runtime/src/core/config.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Configuration management module.
Loads, validates, and manages the project configuration.
"""
import os
import yaml
from pathlib import Path
from typing import Dict, Any, Optional
class Config:
"""Configuration manager."""
DEFAULT_CONFIG = {
"llm": {
"provider": "local-rule-engine",
"model": "local-rule-engine",
"temperature": 0.0,
"max_tokens": 0
},
"engine": {
"name": "local-rule-engine",
"pseudo_cost_per_1k_tokens_usd": 0.001
},
"cost_control": {
"daily_budget_usd": 10.0,
"enable_cost_warning": True,
"warning_threshold": 0.8
},
"text_processing": {
"chunk_size_tokens": 8000,
"chunk_overlap_tokens": 200,
"min_sentence_length": 10
},
"distillation": {
"max_characters": 10,
"min_appearances": 3,
"traits_max_count": 10,
"values_dimensions": [
"勇气", "智慧", "善良", "忠诚", "野心",
"正义", "自由", "责任"
]
},
"relationships": {
"dimensions": [
"trust", "affection", "power_gap",
"conflict_point", "typical_interaction"
]
},
"chat_engine": {
"max_history_turns": 10,
"max_speakers_per_turn": 4,
"token_limit_per_turn": 500,
"enable_cost_display": True
},
"paths": {
"characters": "data/characters",
"relations": "data/relations",
"sessions": "data/sessions",
"corrections": "data/corrections",
"logs": "logs",
"rules": "rules"
},
"system": {
"log_level": "INFO",
"enable_auto_save": True,
"backup_interval_hours": 24
}
}
def __init__(self, config_path: Optional[str] = None):
"""
Initialize the configuration.
Args:
config_path: Path to the config file; if None, it is located automatically.
"""
self.config_path = self._find_config(config_path)
self.project_root = self._resolve_project_root()
self.config = self._load_config()
self._ensure_paths()
def _resolve_project_root(self) -> Path:
"""Resolve the project root so that output paths do not depend on the current working directory."""
if self.config_path:
return self.config_path.parent.resolve()
return Path(__file__).resolve().parents[2]
def _find_config(self, config_path: Optional[str]) -> Optional[Path]:
"""Locate the config file."""
if config_path and os.path.exists(config_path):
return Path(config_path)
# Search the candidate config file locations
possible_paths = [
"config.yaml",
"config.yml",
"config/config.yaml",
os.path.expanduser("~/.zaomeng/config.yaml")
]
for path in possible_paths:
if os.path.exists(path):
return Path(path)
return None
def _load_config(self) -> Dict[str, Any]:
"""Load the configuration."""
if self.config_path:
try:
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f) or {}  # an empty file parses to None
except Exception as e:
print(f"Warning: could not load config file {self.config_path}: {e}")
config = {}
else:
config = {}
# Merge with the default configuration
merged_config = self._merge_dicts(self.DEFAULT_CONFIG, config)
# Validate required settings
self._validate_config(merged_config)
return merged_config
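def _merge_dicts(self, base: Dict, overlay: Dict) -> Dict:
"""Deep-merge two dicts without sharing nested dicts with `base`."""
# Copy nested dicts recursively so later mutations (e.g. set_api_key) never leak into DEFAULT_CONFIG.
result = {
key: self._merge_dicts(value, {}) if isinstance(value, dict) else value
for key, value in base.items()
}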
for key, value in overlay.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = self._merge_dicts(result[key], value)
else:
result[key] = value
return result
def _validate_config(self, config: Dict[str, Any]):
"""Validate the configuration."""
# Local mode only performs basic checks
if config.get("llm", {}).get("provider") != "local-rule-engine":
print("Warning: this version is a local skill engine; `local-rule-engine` is the recommended provider")
def _ensure_paths(self):
"""Ensure all required directories exist."""
for path_key in ["characters", "relations", "sessions", "corrections", "logs", "rules"]:
path = self.get_path(path_key)
os.makedirs(path, exist_ok=True)
def get(self, key: str, default: Any = None) -> Any:
"""Return a config value by dotted key, e.g. `paths.rules`."""
keys = key.split('.')
value = self.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
def get_path(self, path_key: str) -> str:
"""Return a configured path as an absolute path."""
relative_path = self.get(f"paths.{path_key}")
if not relative_path:
return ""
# Absolute paths are returned unchanged
if os.path.isabs(relative_path):
return relative_path
# Otherwise resolve relative to the config file's directory or the project root
return str((self.project_root / relative_path).resolve())
def get_llm_config(self) -> Dict[str, Any]:
"""Return the LLM configuration."""
return self.get("llm", {})
def get_distillation_config(self) -> Dict[str, Any]:
"""Return the distillation configuration."""
return self.get("distillation", {})
def get_cost_config(self) -> Dict[str, Any]:
"""Return the cost-control configuration."""
return self.get("cost_control", {})
def save(self, path: Optional[str] = None):
"""Save the configuration to a file."""
save_path = Path(path) if path else self.config_path
if not save_path:
save_path = Path("config.yaml")
# Make sure the target directory exists
save_path.parent.mkdir(parents=True, exist_ok=True)
with open(save_path, 'w', encoding='utf-8') as f:
yaml.dump(self.config, f, allow_unicode=True, default_flow_style=False)
print(f"Configuration saved to: {save_path}")
def update(self, updates: Dict[str, Any]):
"""Update the configuration."""
self.config = self._merge_dicts(self.config, updates)
def get_supported_models(self) -> list:
"""Compatibility shim; returns the list of local engines."""
return ["local-rule-engine"]
def set_api_key(self, api_key: str):
"""Legacy-compatible interface; local mode does not need an API key."""
self.config["llm"]["api_key"] = api_key
def set_model(self, model: str):
"""Set the engine name (legacy-compatible interface)."""
self.config["llm"]["model"] = model
FILE:runtime/src/core/contracts.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, List, Optional, Protocol
class CostEstimator(Protocol):
def estimate_cost(self, prompt: str, expected_completion_ratio: float = 0.0) -> float:
...
class CorrectionService(Protocol):
def detect_ooc(self, profile: Dict[str, Any], message: str) -> Any:
...
def search_similar_corrections(
self,
text: str,
character: Optional[str] = None,
target: Optional[str] = None,
top_k: int = 3,
) -> List[Dict[str, Any]]:
...
def relation_alignment_issues(self, message: str, relation_state: Dict[str, Any]) -> List[str]:
...
class RuleProvider(Protocol):
def section(self, name: str) -> Dict[str, Any]:
...
def get(self, section: str, key: str, default: Any = None) -> Any:
...
class PathProviderLike(Protocol):
def project_root(self) -> Path:
...
def characters_root(self, novel_id: Optional[str] = None) -> Path:
...
def character_dir(self, novel_id: str, character_name: str) -> Path:
...
def relations_root(self, novel_id: Optional[str] = None) -> Path:
...
def relations_file(self, novel_id: str) -> Path:
...
def sessions_dir(self) -> Path:
...
def corrections_dir(self) -> Path:
...
FILE:runtime/src/core/llm_client.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Local usage-tracking client (no model dependency).
Handles token estimation, cost accounting, and budget control.
"""
from __future__ import annotations
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import tiktoken
except Exception:
tiktoken = None
from .config import Config
from src.utils.file_utils import load_markdown_data, save_markdown_data
class LLMClient:
"""Local usage tracker for compatibility with existing modules."""
def __init__(self, config: Optional[Config] = None):
self.config = config or Config()
self.cost_config = self.config.get_cost_config()
self.engine_config = self.config.get("engine", {})
self.session_cost = 0.0
self.daily_cost = 0.0
self.last_reset_date = datetime.now().date()
self.request_count = 0
self.total_tokens = 0
self._load_cost_stats()
try:
self.encoder = tiktoken.get_encoding("cl100k_base") if tiktoken else None
except Exception:
self.encoder = None
def _load_cost_stats(self):
stats_file = Path(self.config.project_root) / "data" / "cost_stats.md"
if stats_file.exists():
try:
data = load_markdown_data(stats_file, default={}) or {}
self.daily_cost = float(data.get("daily_cost", 0.0))
last = data.get("last_reset_date")
if last:
self.last_reset_date = datetime.fromisoformat(last).date()
except Exception:
pass
self._check_reset_daily()
def _save_cost_stats(self):
stats_file = Path(self.config.project_root) / "data" / "cost_stats.md"
payload = {
"daily_cost": self.daily_cost,
"last_reset_date": self.last_reset_date.isoformat(),
"total_requests": self.request_count,
"total_tokens": self.total_tokens,
}
save_markdown_data(
stats_file,
payload,
title="COST_STATS",
summary=[
f"- daily_cost: {self.daily_cost}",
f"- total_requests: {self.request_count}",
f"- total_tokens: {self.total_tokens}",
],
)
def _check_reset_daily(self):
today = datetime.now().date()
if today > self.last_reset_date:
self.daily_cost = 0.0
self.last_reset_date = today
self._save_cost_stats()
def _check_budget(self):
daily_budget = float(self.cost_config.get("daily_budget_usd", 10.0))
if self.daily_cost >= daily_budget:
raise Exception(f"Daily budget exhausted: ${self.daily_cost:.2f} >= ${daily_budget:.2f}")
threshold = float(self.cost_config.get("warning_threshold", 0.8))
if self.daily_cost >= daily_budget * threshold:
remaining = daily_budget - self.daily_cost
print(f"Warning: {self.daily_cost / daily_budget * 100:.1f}% of the daily budget has been used")
print(f"Remaining budget: ${remaining:.2f}")
def count_tokens(self, text: str) -> int:
if not text:
return 0
if self.encoder:
return len(self.encoder.encode(text))
return max(1, len(text) // 2)
def _calculate_cost(self, prompt_tokens: int, completion_tokens: int) -> float:
# Local engine pseudo-cost so budget control still works.
# Can be configured to 0 for completely free mode.
unit = float(self.engine_config.get("pseudo_cost_per_1k_tokens_usd", 0.001))
return ((prompt_tokens + completion_tokens) / 1000.0) * unit
def estimate_cost(self, text: str, expected_completion_ratio: float = 0.5) -> float:
prompt_tokens = self.count_tokens(text)
completion_tokens = int(prompt_tokens * expected_completion_ratio)
return self._calculate_cost(prompt_tokens, completion_tokens)
def record_usage(self, prompt_tokens: int, completion_tokens: int = 0, elapsed_time: float = 0.0):
self._check_budget()
total_tokens = prompt_tokens + completion_tokens
cost = self._calculate_cost(prompt_tokens, completion_tokens)
self.session_cost += cost
self.daily_cost += cost
self.request_count += 1
self.total_tokens += total_tokens
self._save_cost_stats()
if self.cost_config.get("enable_cost_warning", True):
print(
f"[Tokens: {prompt_tokens}+{completion_tokens}={total_tokens}] "
f"[Cost: ${cost:.4f}] [Time: {elapsed_time:.2f}s]"
)
print(f"[Session: ${self.session_cost:.4f}] [Daily: ${self.daily_cost:.4f}]")
return {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"cost": cost,
"elapsed_time": elapsed_time,
}
def chat_completion(
self,
messages: List[Dict[str, str]],
model: Optional[str] = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
stream: bool = False,
) -> Dict[str, Any]:
# Compatibility shim: local mode does not call external models.
start = time.time()
prompt = "\n".join(f"{m.get('role','user')}: {m.get('content','')}" for m in messages)
prompt_tokens = self.count_tokens(prompt)
content = "Local mode has no cloud model enabled. Use the rule engine to generate replies."
completion_tokens = self.count_tokens(content)
usage = self.record_usage(prompt_tokens, completion_tokens, time.time() - start)
usage["content"] = content
usage["model"] = "local-rule-engine"
return usage
def get_cost_summary(self) -> Dict[str, Any]:
daily_budget = float(self.cost_config.get("daily_budget_usd", 10.0))
remaining_budget = max(0.0, daily_budget - self.daily_cost)
return {
"session_cost": self.session_cost,
"daily_cost": self.daily_cost,
"daily_budget": daily_budget,
"remaining_budget": remaining_budget,
"budget_usage_percent": (self.daily_cost / daily_budget * 100) if daily_budget > 0 else 0,
"request_count": self.request_count,
"total_tokens": self.total_tokens,
}
def reset_session_cost(self):
self.session_cost = 0.0
print("Session cost statistics have been reset.")
FILE:runtime/src/core/main.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""zaomeng CLI entrypoint."""
from __future__ import annotations
import argparse
import re
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
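# `src.*` imports need the directory that contains the `src` package (parents[2]) on sys.path.
src_path = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(src_path))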
from src.core.config import Config
from src.core.llm_client import LLMClient
from src.core.path_provider import PathProvider
from src.core.rulebook import RuleBook
from src.modules.reflection import ReflectionEngine
from src.modules.speaker import Speaker
from src.modules.chat_engine import ChatEngine
from src.modules.distillation import NovelDistiller
from src.modules.relationships import RelationshipExtractor
from src.utils.file_utils import (
load_text_argument,
normalize_character_name,
novel_id_from_input,
parse_character_argument,
save_markdown_data,
)
from src.utils.token_counter import TokenCounter
@dataclass
class ChatIntent:
mode: str
controlled_character: str = ""
target_characters: list[str] = field(default_factory=list)
participants: list[str] = field(default_factory=list)
message: str = ""
setup_only: bool = False
class ZaomengCLI:
ACT_SETUP_PATTERNS = (
r"让我扮演",
r"我来扮演",
r"我要扮演",
r"我扮演",
r"你扮演",
r"你来扮演",
r"进入.+act",
r"进入.+行动模式",
r"开启.+act",
r"切换到.+act",
r"我说一句",
r"回一句",
r"一问一答",
r"对聊",
r"你来回我",
r"你让.+回我",
r"你驱动",
)
OBSERVE_SETUP_PATTERNS = (
r"进入.+群聊模式",
r"开启.+群聊模式",
r"进入.+observe",
r"切换到.+observe",
r"开始群聊",
r"让.+群聊",
r"多人聊天",
)
ACT_MODE_HINTS = (
"act",
"行动模式",
"扮演",
"我说一句",
"回一句",
"回我",
"回复我",
"接我的话",
"一问一答",
"对聊",
)
OBSERVE_MODE_HINTS = ("群聊模式", "observe", "围绕", "各说一句", "大家聊", "让大家", "多人聊", "群聊")
def __init__(self) -> None:
self.config = Config()
self.path_provider = PathProvider(self.config)
self.rulebook = RuleBook(self.config, path_provider=self.path_provider)
self.parser = self._create_parser()
def _build_llm(self) -> LLMClient:
return LLMClient(self.config)
@staticmethod
def _build_token_counter() -> TokenCounter:
return TokenCounter()
def _build_reflection(self) -> ReflectionEngine:
return ReflectionEngine(self.config, path_provider=self.path_provider)
def _build_distiller(
self,
*,
llm_client: Optional[LLMClient] = None,
token_counter: Optional[TokenCounter] = None,
) -> NovelDistiller:
return NovelDistiller(
self.config,
llm_client=llm_client or self._build_llm(),
token_counter=token_counter or self._build_token_counter(),
rulebook=self.rulebook,
path_provider=self.path_provider,
)
def _build_speaker(self, reflection: Optional[ReflectionEngine] = None) -> Speaker:
return Speaker(
self.config,
correction_service=reflection or self._build_reflection(),
rulebook=self.rulebook,
)
def _build_chat_engine(self) -> ChatEngine:
llm = self._build_llm()
reflection = self._build_reflection()
distiller = self._build_distiller(llm_client=llm)
speaker = self._build_speaker(reflection)
return ChatEngine(
self.config,
llm=llm,
reflection=reflection,
speaker=speaker,
distiller=distiller,
rulebook=self.rulebook,
path_provider=self.path_provider,
)
def _build_relationship_extractor(self) -> RelationshipExtractor:
llm = self._build_llm()
token_counter = self._build_token_counter()
distiller = self._build_distiller(llm_client=llm, token_counter=token_counter)
return RelationshipExtractor(
self.config,
llm_client=llm,
token_counter=token_counter,
distiller=distiller,
rulebook=self.rulebook,
path_provider=self.path_provider,
)
def _create_parser(self) -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"zaomeng: local rule-based novel character tooling. "
"Not a general-purpose LLM chatbot."
),
epilog="See PROJECT.md for project notes.",
)
subparsers = parser.add_subparsers(dest="command", help="Available commands")
distill_parser = subparsers.add_parser(
"distill",
help="Distill character profiles from a novel",
description=(
"Distill character profiles from a novel.\n\n"
"Interaction rule:\n"
" - By default this command asks for cost confirmation.\n"
" - In tool-driven or non-interactive runs, use `--force` after the user has agreed.\n"
" - Use `--characters` when the user already knows the target roles."
),
formatter_class=argparse.RawTextHelpFormatter,
)
distill_parser.add_argument("--novel", "-n", required=True, help="Novel file path (.txt or .epub)")
distill_parser.add_argument("--characters", "-c", help="Comma-separated target character names")
distill_parser.add_argument(
"--characters-file",
help="UTF-8 text file containing target character names (newline/comma separated)",
)
distill_parser.add_argument("--output", "-o", help="Optional output directory override")
distill_parser.add_argument(
"--force",
"-f",
action="store_true",
help="Skip cost confirmation for non-interactive runs",
)
chat_parser = subparsers.add_parser(
"chat",
help="Run constrained roleplay chat via CLI",
description=(
"Run constrained roleplay chat.\n\n"
"Important:\n"
" - This is a local rule-based character engine.\n"
" - It is not a general-purpose LLM chatbot.\n"
" - For agent use, default to `--message`.\n"
" - Do not rebuild chat manually from source files.\n\n"
"Prerequisites:\n"
" 1. Run `distill` first so character profiles exist.\n"
" 2. Run `extract` first if you want relation-aware replies.\n"
" 3. `--mode auto` can infer act/observe from natural language setup requests.\n\n"
"Usage modes:\n"
" - Interactive: omit `--message` and chat in the terminal.\n"
" - Single-turn: pass `--message` to run one turn and exit.\n"
" - Setup-only: pass a natural language mode request and zaomeng will create a session."
),
epilog=(
"Inline commands in interactive mode:\n"
" /save\n"
" /reflect\n"
" /correct character|target|original|corrected|reason\n"
" /quit"
),
formatter_class=argparse.RawTextHelpFormatter,
)
chat_parser.add_argument("--novel", "-n", required=True, help="Novel path or novel name")
chat_parser.add_argument(
"--mode",
"-m",
choices=["auto", "observe", "act"],
default="auto",
help="`auto` infers act/observe from natural language setup requests",
)
chat_parser.add_argument("--character", "-c", help="Controlled character in act mode")
chat_parser.add_argument("--session", "-s", help="Restore an existing session ID")
chat_parser.add_argument("--message", help="Run a single non-interactive turn and exit")
chat_parser.add_argument("--message-file", help="UTF-8 text file containing one chat request/message")
view_parser = subparsers.add_parser("view", help="View a distilled character profile")
view_parser.add_argument("--character", "-c", required=True, help="Character name")
view_parser.add_argument("--novel", "-n", help="Optional novel path/name for scoping")
correct_parser = subparsers.add_parser("correct", help="Persist a correction example")
correct_parser.add_argument("--session", "-s", required=True, help="Session ID")
correct_parser.add_argument("--message", "-m", required=True, help="Original message")
correct_parser.add_argument("--corrected", "-c", required=True, help="Corrected message")
correct_parser.add_argument("--character", "-r", help="Character name")
correct_parser.add_argument("--target", "-t", help="Target character name")
correct_parser.add_argument("--reason", help="Correction reason")
extract_parser = subparsers.add_parser(
"extract",
help="Extract relationship graph from a novel",
description=(
"Extract a relationship graph from a novel.\n\n"
"Interaction rule:\n"
" - By default this command asks for cost confirmation.\n"
" - In tool-driven or non-interactive runs, use `--force` after the user has agreed.\n"
" - Run `distill` first if you also need character profiles for chat."
),
formatter_class=argparse.RawTextHelpFormatter,
)
extract_parser.add_argument("--novel", "-n", required=True, help="Novel file path")
extract_parser.add_argument("--characters", "-c", help="Comma-separated target character names")
extract_parser.add_argument(
"--characters-file",
help="UTF-8 text file containing target character names (newline/comma separated)",
)
extract_parser.add_argument("--output", "-o", help="Optional output path override")
extract_parser.add_argument(
"--force",
"-f",
action="store_true",
help="Skip cost confirmation for non-interactive runs",
)
return parser
def run(self) -> None:
args = self.parser.parse_args()
if not args.command:
self.parser.print_help()
return
try:
if args.command == "distill":
self._handle_distill(args)
elif args.command == "chat":
self._handle_chat(args)
elif args.command == "view":
self._handle_view(args)
elif args.command == "correct":
self._handle_correct(args)
elif args.command == "extract":
self._handle_extract(args)
else:
raise ValueError(f"Unknown command: {args.command}")
except KeyboardInterrupt:
print("\nOperation cancelled.")
sys.exit(0)
except Exception as exc:
print(f"Error: {exc}")
sys.exit(1)
def _handle_distill(self, args: argparse.Namespace) -> None:
print("=== Character Distillation ===")
if args.force:
print("Confirmation: skipped via --force")
else:
print("This command is confirmation-gated. Use --force only after confirming with the user.")
distiller = self._build_distiller()
if not args.force:
cost_estimate = distiller.estimate_cost(args.novel)
print(f"Estimated cost: ${cost_estimate:.2f} USD")
confirm = input("Continue? (y/n): ").strip().lower()
if confirm != "y":
print("Operation cancelled.")
return
characters = parse_character_argument(args.characters, args.characters_file) or None
output_dir = args.output or str(Path(self.config.get_path("characters")) / novel_id_from_input(args.novel))
print(f"Processing novel: {args.novel}")
result = distiller.distill(args.novel, characters, output_dir)
print(f"\nDone. Extracted {len(result)} characters:")
for char_name in result:
print(f" - {char_name}")
def _handle_chat(self, args: argparse.Namespace) -> None:
print("=== Chat Engine ===")
engine = self._build_chat_engine()
session: Optional[dict] = None
if args.session:
session = engine.restore_session(args.session)
print(f"Restored session: {session['title']}")
elif args.novel:
print(f"Loading scoped profiles for: {args.novel}")
args.message = load_text_argument(args.message, getattr(args, "message_file", None))
args.character = load_text_argument(args.character)
intent = self._resolve_chat_intent(engine, args, session)
if session is None:
session = engine.create_session(args.novel, intent.mode)
session["mode"] = intent.mode
self._apply_chat_session_state(engine, session, intent)
if args.message:
if intent.setup_only:
engine._save_session(session)
self._print_setup_confirmation(session, intent)
return
responses = self._run_single_chat_turn(
engine,
session,
intent.mode,
intent.controlled_character,
intent.message,
)
for speaker, message in responses:
print(f"{speaker}: {message}")
engine.print_turn_cost()
engine.print_correction_hint(session)
return
print("This is an interactive command. Prepare your first user turn before entering the session.")
if intent.mode == "act":
role = intent.controlled_character or "<character>"
print(f"Mode: act | Controlled role: {role}")
print("Starter input example: 我先表态,你们再接。")
if not intent.controlled_character:
raise ValueError("--character is required in act mode unless the request names the role.")
engine.act_mode(session, intent.controlled_character)
return
print("Mode: observe")
print("Starter input example: 请让大家围绕这件事各说一句。")
print("Inline commands: /save /reflect /correct /quit")
engine.observe_mode(session)
@staticmethod
def _run_single_chat_turn(
engine: ChatEngine,
session: dict,
mode: str,
controlled_character: str,
message: str,
) -> list[tuple[str, str]]:
if mode == "act":
if not controlled_character:
raise ValueError("--character is required in act mode")
return engine.act_once(session, controlled_character, message)
return engine.observe_once(session, message)
def _resolve_chat_intent(
self,
engine: ChatEngine,
args: argparse.Namespace,
session: Optional[dict],
) -> ChatIntent:
text = load_text_argument(args.message)
candidates = session.get("characters", []) if session else self._load_candidate_names(engine, args.novel)
explicit_mode = "" if args.mode == "auto" else args.mode
inferred_mode = self._infer_chat_mode(text)
mode = explicit_mode or ""
if not mode and args.character:
mode = "act"
if not mode:
mode = inferred_mode or (session.get("mode") if session else "observe")
ordered_mentions = engine._mentioned_characters(text, candidates) if text and candidates else []
controlled = self._resolve_character_reference(engine, args.character, candidates)
controlled = controlled or self._infer_controlled_character(mode, text, ordered_mentions, candidates, session)
if mode == "observe" and not explicit_mode and inferred_mode == "act":
mode = "act"
controlled = controlled or self._infer_controlled_character(mode, text, ordered_mentions, candidates, session)
targets = self._infer_target_characters(mode, controlled, ordered_mentions, candidates, session)
participants = self._infer_participants(mode, controlled, targets, ordered_mentions)
setup_only = self._is_setup_only_request(text, mode, controlled, targets)
return ChatIntent(
mode=mode,
controlled_character=controlled,
target_characters=targets,
participants=participants,
message="" if setup_only else text,
setup_only=setup_only,
)
def _apply_chat_session_state(self, engine: ChatEngine, session: dict, intent: ChatIntent) -> None:
state = session.setdefault("state", {})
state.setdefault("focus_targets", {})
should_scope_participants = bool(intent.participants) and (
intent.setup_only or not session.get("history")
)
if should_scope_participants:
session["characters"] = list(intent.participants)
state["selected_characters"] = list(intent.participants)
state["relation_matrix"] = engine._build_relation_matrix(intent.participants, session.get("novel_id"))
if intent.controlled_character:
state["controlled_character"] = intent.controlled_character
if intent.controlled_character and len(intent.target_characters) == 1:
state["focus_targets"][intent.controlled_character] = intent.target_characters[0]
@staticmethod
def _print_setup_confirmation(session: dict, intent: ChatIntent) -> None:
print(f"Session ready: {session['id']}")
print(f"Mode: {intent.mode}")
if intent.controlled_character:
print(f"Controlled role: {intent.controlled_character}")
if intent.target_characters:
print(f"Primary target: {', '.join(intent.target_characters)}")
if intent.participants:
print(f"Participants: {', '.join(intent.participants)}")
@staticmethod
def _load_candidate_names(engine: ChatEngine, novel: str) -> list[str]:
profiles = engine._load_character_profiles(novel_id=novel_id_from_input(novel))
return list(profiles.keys())
def _infer_chat_mode(self, text: str) -> str:
if not text:
return ""
lowered = text.lower()
if any(token in lowered for token in ("act模式", "进入act", "切换到act", " act ")):
return "act"
if any(token in text for token in self.ACT_MODE_HINTS):
return "act"
if any(token in text for token in self.OBSERVE_MODE_HINTS):
return "observe"
return ""
def _infer_controlled_character(
self,
mode: str,
text: str,
ordered_mentions: list[str],
candidates: list[str],
session: Optional[dict],
) -> str:
if mode != "act":
return ""
if len(ordered_mentions) >= 2 and any(
token in text for token in ("让我扮演", "我来扮演", "我要扮演", "我扮演", "聊天", "对话", "对聊", "act")
):
return ordered_mentions[0]
if len(ordered_mentions) == 1 and any(token in text for token in ("扮演", "饰演", "由我", "我来")):
return ordered_mentions[0]
if len(ordered_mentions) == 1 and any(token in text for token in ("我说一句", "回一句", "回我")) and len(candidates) == 2:
target = ordered_mentions[0]
return next((name for name in candidates if name != target), "")
if session:
stored = session.get("state", {}).get("controlled_character", "")
if stored in candidates:
return stored
return ""
@staticmethod
def _infer_target_characters(
mode: str,
controlled: str,
ordered_mentions: list[str],
candidates: list[str],
session: Optional[dict],
) -> list[str]:
if mode != "act":
return ordered_mentions
targets = [name for name in ordered_mentions if name != controlled]
if targets:
return targets
if controlled and session:
remembered = session.get("state", {}).get("focus_targets", {}).get(controlled, "")
if remembered in candidates:
return [remembered]
return []
@staticmethod
def _infer_participants(
mode: str,
controlled: str,
targets: list[str],
ordered_mentions: list[str],
) -> list[str]:
if mode == "act":
participants: list[str] = []
if controlled:
participants.append(controlled)
for name in targets:
if name not in participants:
participants.append(name)
return participants
return ordered_mentions
def _is_setup_only_request(
self,
text: str,
mode: str,
controlled: str,
targets: list[str],
) -> bool:
if not text:
return False
if mode == "act":
if any(re.search(pattern, text, flags=re.IGNORECASE) for pattern in self.ACT_SETUP_PATTERNS):
return True
if "模式" in text and controlled:
return True
if controlled and targets and any(token in text for token in ("聊天", "对话", "互动")):
return True
if mode == "observe":
if any(re.search(pattern, text, flags=re.IGNORECASE) for pattern in self.OBSERVE_SETUP_PATTERNS):
return True
return False
@staticmethod
def _resolve_character_reference(engine: ChatEngine, raw_name: Optional[str], candidates: list[str]) -> str:
if not raw_name:
return ""
if not candidates:
return normalize_character_name(raw_name)
try:
return engine._resolve_character_name(raw_name, candidates)
except ValueError:
return ""
def _handle_view(self, args: argparse.Namespace) -> None:
novel_id = novel_id_from_input(args.novel) if args.novel else None
engine = self._build_chat_engine()
profiles = engine._load_character_profiles(novel_id=novel_id)
normalized = normalize_character_name(args.character)
if normalized not in profiles:
normalized = None
if not normalized:
scope = f" under novel '{novel_id}'" if novel_id else ""
print(f"Profile not found for {args.character}{scope}.")
return
data = profiles[normalized]
print(f"=== {args.character} ===")
if data.get("novel_id"):
print(f"Novel: {data['novel_id']}")
print(f"Traits: {', '.join(data.get('core_traits', []))}")
print(f"Speech: {data.get('speech_style', '')}")
print("\nValues:")
for dim, value in data.get("values", {}).items():
print(f" {dim}: {value}/10")
if data.get("typical_lines"):
print("\nTypical lines:")
for line in data["typical_lines"][:5]:
print(f" - {line}")
if data.get("evidence"):
print("\nEvidence:")
for key, value in data["evidence"].items():
print(f" {key}: {value}")
def _handle_correct(self, args: argparse.Namespace) -> None:
print("=== Save Correction ===")
corrections_dir = Path(self.config.get_path("corrections"))
corrections_dir.mkdir(parents=True, exist_ok=True)
correction = {
"session_id": args.session,
"character": args.character or "unknown",
"target": args.target or "",
"original_message": args.message,
"corrected_message": args.corrected,
"reason": args.reason or "",
"timestamp": int(time.time()),
}
filename = f"correction_{args.session}_{correction['timestamp']}.md"
filepath = corrections_dir / filename
save_markdown_data(
filepath,
correction,
title="CORRECTION",
summary=[
f"- character: {correction['character']}",
f"- target: {correction['target']}",
f"- reason: {correction['reason']}",
],
)
print(f"Saved correction: {filepath}")
def _handle_extract(self, args: argparse.Namespace) -> None:
print("=== Relationship Extraction ===")
if args.force:
print("Confirmation: skipped via --force")
else:
print("This command is confirmation-gated. Use --force only after confirming with the user.")
extractor = self._build_relationship_extractor()
if not args.force:
cost_estimate = extractor.estimate_cost(args.novel)
print(f"Estimated cost: {cost_estimate:.2f} USD")
confirm = input("Continue? (y/n): ").strip().lower()
if confirm != "y":
print("Operation cancelled.")
return
output_path = args.output
characters = parse_character_argument(args.characters, args.characters_file) or None
result = extractor.extract(args.novel, output_path, characters=characters)
print(f"\nDone. Extracted {len(result)} relationships:")
for rel_key in list(result.keys())[:5]:
print(f" - {rel_key}")
def main() -> None:
    ZaomengCLI().run()


if __name__ == "__main__":
    main()
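The mode precedence applied in `_resolve_chat_intent` can be summarised with a small standalone sketch. `resolve_mode` is a hypothetical helper, not part of the CLI; it assumes that a missing session mode falls back to `observe`, which simplifies the original expression slightly.

```python
from typing import Optional


def resolve_mode(
    cli_mode: str,
    character: Optional[str],
    inferred_mode: str,
    session_mode: Optional[str],
) -> str:
    """Hypothetical helper mirroring the precedence in _resolve_chat_intent."""
    # 1. An explicit --mode other than "auto" always wins.
    explicit = "" if cli_mode == "auto" else cli_mode
    if explicit:
        return explicit
    # 2. Passing --character implies act mode.
    if character:
        return "act"
    # 3. Otherwise use the mode inferred from the message text.
    if inferred_mode:
        return inferred_mode
    # 4. Fall back to the stored session mode, then to observe (assumed default).
    return session_mode or "observe"


print(resolve_mode("auto", None, "act", "observe"))
```

Under these assumptions, an explicit `--mode observe` is kept even when the message text looks like an act request.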
FILE:runtime/src/core/path_provider.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

from pathlib import Path
from typing import Optional

from src.core.config import Config
from src.utils.file_utils import ensure_dir, safe_filename


class PathProvider:
    """Centralized path resolution for runtime data and editable rule assets."""

    def __init__(self, config: Config):
        if config is None:
            raise ValueError("PathProvider requires an injected Config instance")
        self.config = config

    def project_root(self) -> Path:
        return Path(self.config.project_root)

    def rules_root(self) -> Path:
        return ensure_dir(self.config.get_path("rules"))

    def characters_root(self, novel_id: Optional[str] = None) -> Path:
        root = ensure_dir(self.config.get_path("characters"))
        return ensure_dir(root / novel_id) if novel_id else root

    def character_dir(self, novel_id: str, character_name: str) -> Path:
        return ensure_dir(self.characters_root(novel_id) / safe_filename(character_name))

    def relations_root(self, novel_id: Optional[str] = None) -> Path:
        root = ensure_dir(self.config.get_path("relations"))
        return ensure_dir(root / novel_id) if novel_id else root

    def relations_file(self, novel_id: str) -> Path:
        return self.relations_root(novel_id) / f"{novel_id}_relations.md"

    def sessions_dir(self) -> Path:
        return ensure_dir(self.config.get_path("sessions"))

    def corrections_dir(self) -> Path:
        return ensure_dir(self.config.get_path("corrections"))

    def logs_dir(self) -> Path:
        return ensure_dir(self.config.get_path("logs"))
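The novel-scoped layout that `PathProvider` resolves can be illustrated with a self-contained sketch. The `characters` and `relations` directory names, the `safe_name` stand-in for `safe_filename`, and the sample novel and character IDs are assumptions for illustration; the real locations come from `Config.get_path(...)`.

```python
import tempfile
from pathlib import Path


def ensure_dir(path: Path) -> Path:
    # Create the directory (and parents) if missing, then return it.
    path.mkdir(parents=True, exist_ok=True)
    return path


def safe_name(name: str) -> str:
    # Hypothetical stand-in for safe_filename: only neutralises path separators.
    return name.replace("/", "_").replace("\\", "_")


def character_dir(data_root: Path, novel_id: str, character_name: str) -> Path:
    return ensure_dir(data_root / "characters" / novel_id / safe_name(character_name))


def relations_file(data_root: Path, novel_id: str) -> Path:
    # Only the parent directory is created; the file itself is written elsewhere.
    return ensure_dir(data_root / "relations" / novel_id) / f"{novel_id}_relations.md"


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    persona = character_dir(root, "hongloumeng", "lin_daiyu")
    rel = relations_file(root, "hongloumeng")
    layout = [persona.relative_to(root).as_posix(), rel.relative_to(root).as_posix()]
    persona_exists = persona.is_dir()
    rel_exists = rel.exists()

print(layout)
```

As in the class above, resolving a path creates directories as a side effect, while the relations file is only named and never created by the resolver.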
FILE:runtime/src/core/rulebook.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, Optional

from src.core.config import Config
from src.core.path_provider import PathProvider
from src.utils.file_utils import load_markdown_data


class RuleBook:
    """Loads editable rule assets from local markdown files."""

    FILE_MAP = {
        "distillation": "distillation_rules.md",
        "speaker": "speaker_rules.md",
        "relationships": "relationship_rules.md",
        "persona": "persona_rules.md",
    }

    def __init__(
        self,
        config: Config,
        *,
        path_provider: PathProvider,
        base_dir: Optional[str | Path] = None,
    ):
        if config is None or path_provider is None:
            raise ValueError("RuleBook requires injected config and path_provider")
        self.config = config
        self.path_provider = path_provider
        self.base_dir = Path(base_dir) if base_dir else self.path_provider.rules_root()
        self._sections = self._load_sections()

    def section(self, name: str) -> Dict[str, Any]:
        value = self._sections.get(name, {})
        return dict(value) if isinstance(value, dict) else {}

    def get(self, section: str, key: str, default: Any = None) -> Any:
        return self.section(section).get(key, default)

    def _load_sections(self) -> Dict[str, Dict[str, Any]]:
        sections: Dict[str, Dict[str, Any]] = {}
        for section_name, filename in self.FILE_MAP.items():
            path = self.base_dir / filename
            payload = load_markdown_data(path, default={}) or {}
            sections[section_name] = payload if isinstance(payload, dict) else {}
        return sections
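The lookup behaviour of `RuleBook.section` and `RuleBook.get` can be shown without touching the file system. `MiniRuleBook` below is a hypothetical in-memory stand-in that takes its sections from a dict instead of loading the markdown files; the sample section contents are made up for the example.

```python
from typing import Any, Dict


class MiniRuleBook:
    """Hypothetical in-memory stand-in for RuleBook (no file loading)."""

    def __init__(self, sections: Dict[str, Any]):
        self._sections = sections

    def section(self, name: str) -> Dict[str, Any]:
        value = self._sections.get(name, {})
        # Return a shallow copy so callers cannot add or remove top-level keys.
        return dict(value) if isinstance(value, dict) else {}

    def get(self, section: str, key: str, default: Any = None) -> Any:
        return self.section(section).get(key, default)


rules = MiniRuleBook(
    {
        "speaker": {"durable_guidance_tokens": ["记住", "设定"]},
        "persona": "not-a-dict",  # malformed payloads degrade to an empty section
    }
)

print(rules.get("speaker", "durable_guidance_tokens", []))
print(rules.get("distillation", "address_suffixes", ["哥哥"]))
```

Because the copy is shallow, nested lists and dicts are still shared with the stored section, so callers that need to mutate them should copy them first.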
FILE:runtime/src/core/__init__.py
FILE:runtime/src/modules/chat_engine.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.core.config import Config
from src.core.path_provider import PathProvider
from src.core.rulebook import RuleBook
from src.core.llm_client import LLMClient
from src.modules.distillation import NovelDistiller
from src.modules.reflection import ReflectionEngine
from src.modules.speaker import Speaker
from src.utils.file_utils import (
canonical_aliases,
ensure_dir,
load_markdown_data,
normalize_character_name,
normalize_relation_key,
novel_id_from_input,
safe_filename,
save_markdown_data,
)
class ChatEngine:
"""Multi-character chat with novel-scoped assets."""
SYSTEM_SPEAKERS = {"Narrator", "User", "旁白", "用户"}
ADDRESS_SUFFIXES = ("哥哥", "姐姐", "妹妹", "弟弟", "姑娘", "公子", "爷")
def __init__(
self,
config: Optional[Config] = None,
*,
llm: Optional[LLMClient] = None,
reflection: Optional[ReflectionEngine] = None,
speaker: Optional[Speaker] = None,
distiller: Optional[NovelDistiller] = None,
rulebook: Optional[RuleBook] = None,
path_provider: Optional[PathProvider] = None,
):
self.config = config or Config()
if (
llm is None
or reflection is None
or speaker is None
or distiller is None
or rulebook is None
or path_provider is None
):
raise ValueError(
"ChatEngine requires injected llm, reflection, speaker, distiller, rulebook, and path_provider"
)
self.path_provider = path_provider
self.rulebook = rulebook
self.llm = llm
self.reflection = reflection
self.distiller = distiller
self.speaker = speaker
self.characters_dir = self.path_provider.characters_root()
self.sessions_dir = self.path_provider.sessions_dir()
self.relations_dir = self.path_provider.relations_root()
self.address_suffixes = tuple(
getattr(self.distiller, "address_suffixes", ())
or self.rulebook.get("distillation", "address_suffixes", list(self.ADDRESS_SUFFIXES))
)
def create_session(self, novel: str, mode: str) -> Dict[str, Any]:
novel_id = novel_id_from_input(novel)
profiles = self._load_character_profiles(novel_id)
if not profiles:
raise RuntimeError(f"No character profiles found for novel '{novel_id}'. Run distill first.")
characters = list(profiles.keys())
session = {
"id": uuid.uuid4().hex[:12],
"title": f"{novel}_{mode}_{int(time.time())}",
"novel": novel,
"novel_id": novel_id,
"mode": mode,
"created_at": int(time.time()),
"characters": characters,
"history": [],
"state": {
"emotion": {},
"focus_targets": {},
"controlled_character": "",
"selected_characters": list(characters),
"relation_delta": {},
"relation_matrix": self._build_relation_matrix(characters, novel_id),
},
}
self._save_session(session)
return session
def restore_session(self, session_id: str) -> Dict[str, Any]:
path = self.sessions_dir / f"{session_id}.md"
data = load_markdown_data(path, default=None)
if not data:
raise FileNotFoundError(f"Session not found: {session_id}")
data.setdefault("novel_id", novel_id_from_input(data.get("novel", session_id)))
data.setdefault("state", {})
data["state"].setdefault("focus_targets", {})
data["state"].setdefault("controlled_character", "")
data["state"].setdefault("selected_characters", list(data.get("characters", [])))
return data
def observe_mode(self, session: Dict[str, Any]) -> None:
print("进入 observe 模式。输入 /save /reflect /correct /quit")
while True:
user_msg = input("\n你: ").strip()
if not user_msg:
continue
if self._handle_inline_command(session, user_msg):
if user_msg == "/quit":
break
continue
responses = self.observe_once(session, user_msg)
self._print_responses(responses)
self.print_turn_cost()
self.print_correction_hint(session)
def act_mode(self, session: Dict[str, Any], character: str) -> None:
controlled = self._resolve_character_name(character, session["characters"])
if controlled not in session["characters"]:
raise ValueError(f"Character '{character}' not found in this session.")
print(f"进入 act 模式,你扮演 {controlled}。输入 /save /reflect /correct /quit")
while True:
user_msg = input(f"\n{controlled}(你): ").strip()
if not user_msg:
continue
if self._handle_inline_command(session, user_msg):
if user_msg == "/quit":
break
continue
try:
responses = self.act_once(session, controlled, user_msg)
except ValueError as exc:
print(exc)
continue
self._print_responses(responses)
self.print_turn_cost()
self.print_correction_hint(session)
def observe_once(self, session: Dict[str, Any], user_msg: str) -> List[tuple[str, str]]:
speaker, normalized_msg = self._resolve_observe_turn(session, user_msg)
responders = self._active_characters(session, speaker=speaker, context=normalized_msg)
return self._run_turn(session, speaker, normalized_msg, responders)
def act_once(self, session: Dict[str, Any], character: str, user_msg: str) -> List[tuple[str, str]]:
controlled = self._resolve_character_name(character, session["characters"])
if controlled not in session["characters"]:
raise ValueError(f"Character '{character}' not found in this session.")
responders = self._active_characters(session, speaker=controlled, context=user_msg)
if not responders:
raise ValueError("未识别到明确对话对象。请在消息里点名角色,或先补充关系数据。")
return self._run_turn(session, controlled, user_msg, responders)
def print_turn_cost(self) -> None:
summary = self.llm.get_cost_summary()
print(
f"[累计] token={summary['total_tokens']} "
f"session=.4f daily=.4f"
)
@staticmethod
def print_correction_hint(session: Dict[str, Any]) -> None:
print(f"修正方式:/correct 角色|对象|原句|修正句|原因 或 correct --session {session['id']} ...")
def _run_turn(
self,
session: Dict[str, Any],
speaker: str,
user_msg: str,
responders: List[str],
) -> List[tuple[str, str]]:
message = user_msg.strip()
if not message:
raise ValueError("消息不能为空。")
session["history"].append({"speaker": speaker, "message": message, "ts": int(time.time())})
self._persist_runtime_guidance(session, speaker, message)
self._remember_focus_targets(session, speaker, responders)
profiles = self._load_character_profiles(session.get("novel_id"))
responses: List[tuple[str, str]] = []
for name in responders:
profile = profiles.get(name, {"name": name})
target_name = self._infer_target(name, session["history"], session["characters"])
relation_state = self._get_relation_state(session, name, target_name)
reply = self.speaker.generate(
character_profile=profile,
context=message,
history=session["history"],
target_name=target_name,
relation_state=relation_state,
relation_hint=self._relation_hint(name, session["characters"], session.get("novel_id")),
)
reply = self._guard_reply(profile, reply, relation_state, target_name)
responses.append((name, reply))
session["history"].append(
{"speaker": name, "target": target_name, "message": reply, "ts": int(time.time())}
)
self._trim_history(session)
self._update_state(session)
self._save_session(session)
return responses
def _remember_focus_targets(self, session: Dict[str, Any], speaker: str, responders: List[str]) -> None:
if speaker in self.SYSTEM_SPEAKERS or not responders:
return
focus_targets = session.setdefault("state", {}).setdefault("focus_targets", {})
if len(responders) == 1:
focus_targets[speaker] = responders[0]
elif speaker in focus_targets:
focus_targets.pop(speaker, None)
@staticmethod
def _print_responses(responses: List[tuple[str, str]]) -> None:
for speaker, message in responses:
print(f"{speaker}: {message}")
def _handle_inline_command(self, session: Dict[str, Any], command: str) -> bool:
if command == "/quit":
self._save_session(session)
print("会话结束。")
return True
if command == "/save":
self._save_session(session)
print(f"已保存会话: {session['id']}")
return True
if command == "/reflect":
self._reflect_last_turn(session)
return True
if command.startswith("/correct"):
payload = command[len("/correct") :].strip()
parts = [p.strip() for p in payload.split("|")]
if len(parts) not in (3, 4, 5):
print("格式错误。用法: /correct 角色|对象|原句|修正句|原因")
return True
if len(parts) == 3:
character, target, original, corrected, reason = parts[0], "", parts[1], parts[2], "inline_command"
elif len(parts) == 4:
character, target, original, corrected, reason = parts[0], parts[1], parts[2], parts[3], "inline_command"
else:
character, target, original, corrected, reason = parts[0], parts[1], parts[2], parts[3], parts[4]
item = self.reflection.save_correction(
session_id=session["id"],
character=character,
target=target or None,
original_message=original,
corrected_message=corrected,
reason=reason,
)
self._persist_correction_memory(session, character, target, original, corrected, reason)
print(f"纠错已记录: {item['character']} -> {item.get('target') or '任意对象'}")
return True
return False
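The three accepted shapes of the `/correct` payload can be sketched in isolation. `parse_correct_payload` is a hypothetical helper, not part of `ChatEngine`; it returns the same five fields the handler unpacks, or `None` where the handler prints the format error.

```python
from typing import Optional, Tuple

Correction = Tuple[str, str, str, str, str]


def parse_correct_payload(command: str) -> Optional[Correction]:
    """Hypothetical parser: returns (character, target, original, corrected, reason)."""
    payload = command[len("/correct"):].strip()
    parts = [p.strip() for p in payload.split("|")]
    if len(parts) == 3:
        # character|original|corrected: no target, default reason.
        character, original, corrected = parts
        return character, "", original, corrected, "inline_command"
    if len(parts) == 4:
        # character|target|original|corrected: default reason.
        return parts[0], parts[1], parts[2], parts[3], "inline_command"
    if len(parts) == 5:
        # character|target|original|corrected|reason.
        return parts[0], parts[1], parts[2], parts[3], parts[4]
    return None


print(parse_correct_payload("/correct 黛玉|原句|修正句"))
```

A payload with fewer than three or more than five `|`-separated fields is rejected, which matches the `len(parts) not in (3, 4, 5)` check in the handler.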
def _reflect_last_turn(self, session: Dict[str, Any]) -> None:
if not session["history"]:
print("暂无历史可反思。")
return
profiles = self._load_character_profiles(session.get("novel_id"))
last = session["history"][-1]
profile = profiles.get(last["speaker"])
if not profile:
print("最近一条不是角色发言。")
return
check = self.reflection.detect_ooc(profile, last["message"])
if not check.is_ooc:
print("反思结果:最近发言符合人设。")
return
print("反思结果:疑似 OOC")
for reason in check.reasons:
print(f"- {reason}")
def _relation_hint(self, speaker: str, all_chars: List[str], novel_id: Optional[str]) -> str:
hints = []
for other in all_chars:
if other == speaker:
continue
item = self._get_relation_state_from_disk(speaker, other, novel_id)
if item:
hints.append(
f"{other}(trust={item.get('trust', 5)},aff={item.get('affection', 5)},host={item.get('hostility', max(0, 5 - item.get('affection', 5)))})"
)
return "; ".join(hints[:3])
def _relation_file_for_novel(self, novel_id: Optional[str]) -> Optional[Path]:
if novel_id:
scoped = self.path_provider.relations_file(novel_id)
if scoped.exists():
return scoped
legacy = self.relations_dir / f"{novel_id}_relations.md"
if legacy.exists():
return legacy
files = sorted(self.relations_dir.glob("*.md"), key=lambda path: path.stat().st_mtime, reverse=True)
return files[0] if files else None
def _update_state(self, session: Dict[str, Any]) -> None:
latest = session["history"][-6:]
emotion = session["state"]["emotion"]
relation_matrix = session["state"].setdefault("relation_matrix", {})
for item in latest:
speaker = item["speaker"]
if speaker in self.SYSTEM_SPEAKERS:
continue
delta = 0
msg = item["message"]
if any(k in msg for k in ("!", "怒", "生气", "质问")):
delta += 1
if any(k in msg for k in ("冷静", "平静", "慢慢说", "理解")):
delta -= 1
emotion[speaker] = max(-5, min(5, emotion.get(speaker, 0) + delta))
target = item.get("target") or self._infer_target(speaker, latest, session["characters"])
if not target or target == speaker:
continue
key = self._pair_key(speaker, target)
state = relation_matrix.setdefault(
key,
{"trust": 5, "affection": 5, "hostility": 0, "ambiguity": 3},
)
if any(k in msg for k in ("谢谢", "抱歉", "理解", "关心", "在意")):
state["affection"] = min(10, state.get("affection", 5) + 1)
state["trust"] = min(10, state.get("trust", 5) + 1)
state["hostility"] = max(0, state.get("hostility", 0) - 1)
if any(k in msg for k in ("滚", "讨厌", "厌恶", "闭嘴", "烦")):
state["hostility"] = min(10, state.get("hostility", 0) + 2)
state["affection"] = max(0, state.get("affection", 5) - 2)
state["trust"] = max(0, state.get("trust", 5) - 1)
if any(k in msg for k in ("也许", "或许", "未必", "以后再说")):
state["ambiguity"] = min(10, state.get("ambiguity", 3) + 1)
session["state"]["relation_delta"][key] = {
"trust": state["trust"],
"affection": state["affection"],
"hostility": state["hostility"],
"ambiguity": state["ambiguity"],
}
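The keyword-driven relation update in `_update_state` amounts to clamped integer arithmetic on a four-field state. The sketch below isolates that step; `apply_message` and the constant names are hypothetical, while the token lists, step sizes, and the 0 to 10 bounds are copied from the method above.

```python
from typing import Dict

DEFAULT_STATE = {"trust": 5, "affection": 5, "hostility": 0, "ambiguity": 3}
WARM_TOKENS = ("谢谢", "抱歉", "理解", "关心", "在意")
HOSTILE_TOKENS = ("滚", "讨厌", "厌恶", "闭嘴", "烦")
HEDGING_TOKENS = ("也许", "或许", "未必", "以后再说")


def apply_message(state: Dict[str, int], msg: str) -> Dict[str, int]:
    """Hypothetical helper: return a new state after one message; values stay in 0..10."""
    updated = dict(state)
    if any(token in msg for token in WARM_TOKENS):
        updated["affection"] = min(10, updated["affection"] + 1)
        updated["trust"] = min(10, updated["trust"] + 1)
        updated["hostility"] = max(0, updated["hostility"] - 1)
    if any(token in msg for token in HOSTILE_TOKENS):
        updated["hostility"] = min(10, updated["hostility"] + 2)
        updated["affection"] = max(0, updated["affection"] - 2)
        updated["trust"] = max(0, updated["trust"] - 1)
    if any(token in msg for token in HEDGING_TOKENS):
        updated["ambiguity"] = min(10, updated["ambiguity"] + 1)
    return updated


print(apply_message(DEFAULT_STATE, "谢谢你"))
```

Unlike the method above, this sketch returns a copy instead of mutating the session's relation matrix, which makes the rule easier to test on its own.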
def _save_session(self, session: Dict[str, Any]) -> None:
save_markdown_data(
self.sessions_dir / f"{session['id']}.md",
session,
title="SESSION",
summary=[
f"- id: {session.get('id', '')}",
f"- novel_id: {session.get('novel_id', '')}",
f"- mode: {session.get('mode', '')}",
],
)
self._save_relation_snapshot(session)
def _persist_runtime_guidance(self, session: Dict[str, Any], speaker: str, message: str) -> None:
if speaker not in self.SYSTEM_SPEAKERS:
return
if not self._looks_like_persistent_guidance(message):
return
for character in session.get("characters", []):
if not self._message_mentions_character(message, character):
continue
note = f"用户提示:{message.strip()}"
self._append_memory_entry(session.get("novel_id"), character, "user_edits", note)
def _looks_like_persistent_guidance(self, message: str) -> bool:
durable_tokens = tuple(
self.rulebook.get(
"speaker",
"durable_guidance_tokens",
["记住", "设定", "人设", "以后", "别再", "不要再", "改成", "纠正", "必须", "不要", "应该"],
)
)
return any(token in message for token in durable_tokens) and "?" not in message and "?" not in message
def _message_mentions_character(self, message: str, character: str) -> bool:
aliases = [character] + self._candidate_aliases(character)
return any(alias and alias in message for alias in aliases)
def _persist_correction_memory(
self,
session: Dict[str, Any],
character: str,
target: str,
original: str,
corrected: str,
reason: str,
) -> None:
note = f"纠正:原句={original};修正={corrected};原因={reason or 'inline_command'}"
self._append_memory_entry(session.get("novel_id"), character, "user_edits", note)
self._append_memory_entry(session.get("novel_id"), character, "notable_interactions", note)
if target:
target_note = f"与{target}相关的纠正:{corrected}"
self._append_memory_entry(session.get("novel_id"), character, "relationship_updates", target_note)
def _append_memory_entry(self, novel_id: Optional[str], character: str, field: str, note: str) -> None:
if not novel_id or not character or not note.strip():
return
normalized_name = normalize_character_name(character)
persona_dir = self.path_provider.character_dir(novel_id, normalized_name)
memory_file = persona_dir / "MEMORY.md"
if not memory_file.exists():
memory_file.write_text(
"# MEMORY\n\n## Stable Memory\n\n## Mutable Notes\n",
encoding="utf-8",
)
self.distiller.refresh_navigation(persona_dir, normalized_name)
with memory_file.open("a", encoding="utf-8") as handle:
handle.write(f"- {field}: {note.strip()}\n")
def _load_character_profiles(self, novel_id: Optional[str] = None) -> Dict[str, Dict[str, Any]]:
profiles: Dict[str, Dict[str, Any]] = {}
if not self.characters_dir.exists():
return profiles
if novel_id:
scoped_dir = self.path_provider.characters_root(novel_id)
sources = self._collect_profile_sources(scoped_dir)
if not sources:
return profiles
else:
sources = self._collect_profile_sources(self.characters_dir)
for novel_dir in sorted(path for path in self.characters_dir.iterdir() if path.is_dir()):
sources.extend(self._collect_profile_sources(novel_dir))
for file in sources:
item = self._load_profile_source(file)
if item and isinstance(item, dict) and item.get("name"):
canonical_name = normalize_character_name(item["name"])
item["name"] = canonical_name
if file.is_dir():
base_dir = file.parent
elif file.name.startswith("PROFILE"):
base_dir = file.parent.parent
else:
base_dir = file.parent
item = self._merge_persona_bundle(item, base_dir)
profiles[canonical_name] = self._merge_profile_item(profiles.get(canonical_name), item)
return profiles
def _collect_profile_sources(self, root: Path) -> List[Path]:
if not root.exists():
return []
sources: List[Path] = []
seen = set()
for persona_dir in sorted(path for path in root.iterdir() if path.is_dir()):
if any((persona_dir / filename).exists() for filename in ("PROFILE.md", "PROFILE.generated.md")):
resolved = persona_dir.resolve()
if resolved not in seen:
sources.append(persona_dir)
seen.add(resolved)
return sources
def _load_profile_source(self, path: Path) -> Optional[Dict[str, Any]]:
if path.is_dir():
return self._load_profile_bundle(path)
if path.name.startswith("PROFILE"):
return self._load_profile_markdown(path)
return None
def _load_profile_bundle(self, persona_dir: Path) -> Optional[Dict[str, Any]]:
merged: Dict[str, Any] = {}
loaded = False
for filename in ("PROFILE.generated.md", "PROFILE.md"):
path = persona_dir / filename
if not path.exists():
continue
current = self._load_profile_markdown(path)
if not current:
continue
merged = self._merge_profile_markdown_data(merged, current) if loaded else current
loaded = True
return merged if loaded else None
def _load_profile_markdown(self, path: Path) -> Dict[str, Any]:
parsed = self._parse_persona_markdown(path)
profile: Dict[str, Any] = {
"name": parsed.get("name", path.parent.name),
"novel_id": parsed.get("novel_id", path.parent.parent.name),
"source_path": parsed.get("source_path", ""),
"core_traits": self._split_persona_value(parsed.get("core_traits", "")),
"values": self._split_metric_map(parsed.get("values", "")),
"speech_style": parsed.get("speech_style", ""),
"typical_lines": self._split_persona_value(parsed.get("typical_lines", "")),
"decision_rules": self._split_persona_value(parsed.get("decision_rules", "")),
"identity_anchor": parsed.get("identity_anchor", ""),
"soul_goal": parsed.get("soul_goal", ""),
"life_experience": self._split_persona_value(parsed.get("life_experience", "")),
"worldview": parsed.get("worldview", ""),
"thinking_style": parsed.get("thinking_style", ""),
"core_identity": parsed.get("core_identity", ""),
"faction_position": parsed.get("faction_position", ""),
"background_imprint": parsed.get("background_imprint", ""),
"world_rule_fit": parsed.get("world_rule_fit", ""),
"social_mode": parsed.get("social_mode", ""),
"hidden_desire": parsed.get("hidden_desire", ""),
"inner_conflict": parsed.get("inner_conflict", ""),
"story_role": parsed.get("story_role", ""),
"belief_anchor": parsed.get("belief_anchor", ""),
"private_self": parsed.get("private_self", ""),
"stance_stability": parsed.get("stance_stability", ""),
"reward_logic": parsed.get("reward_logic", ""),
"strengths": self._split_persona_value(parsed.get("strengths", "")),
"weaknesses": self._split_persona_value(parsed.get("weaknesses", "")),
"cognitive_limits": self._split_persona_value(parsed.get("cognitive_limits", "")),
"fear_triggers": self._split_persona_value(parsed.get("fear_triggers", "")),
"key_bonds": self._split_persona_value(parsed.get("key_bonds", "")),
"action_style": parsed.get("action_style", ""),
"speech_habits": {
"cadence": parsed.get("cadence", ""),
"signature_phrases": self._split_persona_value(parsed.get("signature_phrases", "")),
"sentence_openers": self._split_persona_value(parsed.get("sentence_openers", "")),
"connective_tokens": self._split_persona_value(parsed.get("connective_tokens", "")),
"sentence_endings": self._split_persona_value(parsed.get("sentence_endings", "")),
"forbidden_fillers": self._split_persona_value(parsed.get("forbidden_fillers", "")),
},
"emotion_profile": {
"anger_style": parsed.get("anger_style", ""),
"joy_style": parsed.get("joy_style", ""),
"grievance_style": parsed.get("grievance_style", ""),
},
"taboo_topics": self._split_persona_value(parsed.get("taboo_topics", "")),
"forbidden_behaviors": self._split_persona_value(parsed.get("forbidden_behaviors", "")),
"arc": {
"start": self._split_metric_map(parsed.get("arc_start", "")),
"mid": self._split_metric_map(parsed.get("arc_mid", "")),
"end": self._split_metric_map(parsed.get("arc_end", "")),
},
"evidence": {
"description_count": self._safe_int(parsed.get("description_count", 0)),
"dialogue_count": self._safe_int(parsed.get("dialogue_count", 0)),
"thought_count": self._safe_int(parsed.get("thought_count", 0)),
"chunk_count": self._safe_int(parsed.get("chunk_count", 0)),
},
}
return profile
def _merge_profile_markdown_data(
self,
base: Dict[str, Any],
overlay: Dict[str, Any],
) -> Dict[str, Any]:
merged = dict(base)
for key, value in overlay.items():
if value in ("", [], {}, None):
continue
if isinstance(value, dict) and isinstance(merged.get(key), dict):
bucket = dict(merged.get(key, {}))
for child_key, child_value in value.items():
if child_value in ("", [], {}, None):
continue
bucket[child_key] = child_value
merged[key] = bucket
continue
merged[key] = value
return merged
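The overlay rule used by `_merge_profile_markdown_data` is that empty overlay values never erase existing data, and nested dicts are merged one level deep. A standalone sketch of that rule follows; `merge_overlay` and the sample profiles are hypothetical.

```python
from typing import Any, Dict

EMPTY_VALUES = ("", [], {}, None)


def merge_overlay(base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: overlay non-empty values onto a copy of base."""
    merged = dict(base)
    for key, value in overlay.items():
        if value in EMPTY_VALUES:
            continue  # empty overlay values keep the base value
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            bucket = dict(merged[key])
            for child_key, child_value in value.items():
                if child_value in EMPTY_VALUES:
                    continue
                bucket[child_key] = child_value
            merged[key] = bucket
            continue
        merged[key] = value
    return merged


generated = {
    "name": "A",
    "speech_style": "formal",
    "speech_habits": {"cadence": "slow", "signature_phrases": ["x"]},
}
edited = {
    "speech_style": "",
    "speech_habits": {"cadence": "fast", "signature_phrases": []},
    "worldview": "w",
}
result = merge_overlay(generated, edited)
print(result)
```

In `_load_profile_bundle`, `PROFILE.generated.md` is loaded first and `PROFILE.md` is applied as the overlay, so hand-edited fields take precedence wherever they are non-empty.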
def _merge_persona_bundle(self, profile: Dict[str, Any], base_dir: Path) -> Dict[str, Any]:
merged = dict(profile)
persona_dir = base_dir / safe_filename(merged.get("name", ""))
if not persona_dir.exists():
return merged
for base_name, source in self._resolve_persona_sources(persona_dir):
if base_name == "RELATIONS":
continue
parsed = self._parse_persona_markdown(source)
merged = self._apply_persona_overrides(merged, parsed)
return merged
def _resolve_persona_sources(self, persona_dir: Path) -> List[tuple[str, Path]]:
descriptor = self._load_navigation_descriptor(persona_dir)
order = descriptor.get("runtime", {}).get("load_order", []) or list(NovelDistiller.DEFAULT_NAV_LOAD_ORDER)
sources: List[tuple[str, Path]] = []
seen = set()
for base_name in order:
normalized = str(base_name or "").strip().upper()
if not normalized or normalized in seen:
continue
meta = descriptor.get("files", {}).get(normalized, {})
if str(meta.get("status", "")).strip().lower() == "inactive":
continue
source = self._resolve_persona_file_path(persona_dir, normalized, meta)
if not source:
continue
sources.append((normalized, source))
seen.add(normalized)
for base_name in NovelDistiller.DEFAULT_NAV_LOAD_ORDER:
if base_name in seen:
continue
meta = descriptor.get("files", {}).get(base_name, {})
if str(meta.get("status", "")).strip().lower() == "inactive":
continue
source = self._resolve_persona_file_path(persona_dir, base_name, meta)
if not source:
continue
sources.append((base_name, source))
seen.add(base_name)
return sources
def _resolve_persona_file_path(self, persona_dir: Path, base_name: str, meta: Dict[str, Any]) -> Optional[Path]:
editable_name = str(meta.get("file", f"{base_name}.md")).strip() or f"{base_name}.md"
fallback_name = str(meta.get("fallback", f"{base_name}.generated.md")).strip() or f"{base_name}.generated.md"
editable = persona_dir / editable_name
if editable.exists():
return editable
fallback = persona_dir / fallback_name
if fallback.exists():
return fallback
return None
def _load_navigation_descriptor(self, persona_dir: Path) -> Dict[str, Any]:
descriptor = self._default_navigation_descriptor()
generated = persona_dir / "NAVIGATION.generated.md"
editable = persona_dir / "NAVIGATION.md"
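# Apply the generated descriptor first and the editable one second, so manual edits win on conflict.
for source in (generated, editable):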
if not source.exists():
continue
parsed = self._parse_navigation_markdown(source)
descriptor = self._merge_navigation_descriptor(descriptor, parsed)
return descriptor
@staticmethod
def _default_navigation_descriptor() -> Dict[str, Any]:
files = {
base_name: {
"file": f"{base_name}.md",
"fallback": f"{base_name}.generated.md",
}
for base_name in NovelDistiller.DEFAULT_NAV_LOAD_ORDER
}
return {
"runtime": {"load_order": list(NovelDistiller.DEFAULT_NAV_LOAD_ORDER)},
"files": files,
}
def _merge_navigation_descriptor(
self,
base: Dict[str, Any],
overlay: Dict[str, Any],
) -> Dict[str, Any]:
merged = {
"runtime": dict(base.get("runtime", {})),
"files": {
key: dict(value) if isinstance(value, dict) else {}
for key, value in base.get("files", {}).items()
},
}
runtime_overlay = overlay.get("runtime", {}) if isinstance(overlay.get("runtime", {}), dict) else {}
if runtime_overlay.get("load_order"):
merged["runtime"]["load_order"] = self._parse_navigation_order(runtime_overlay["load_order"])
for key, value in runtime_overlay.items():
if key == "load_order":
continue
merged["runtime"][key] = value
files_overlay = overlay.get("files", {}) if isinstance(overlay.get("files", {}), dict) else {}
for base_name, payload in files_overlay.items():
entry = dict(merged["files"].get(base_name, {}))
if isinstance(payload, dict):
entry.update(payload)
merged["files"][base_name] = entry
return merged
@staticmethod
def _parse_navigation_markdown(path: Path) -> Dict[str, Any]:
parsed: Dict[str, Any] = {"runtime": {}, "files": {}}
current_section = ""
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if line.startswith("## "):
current_section = line[3:].strip().upper()
if current_section and current_section != "RUNTIME":
parsed["files"].setdefault(current_section, {})
continue
if not line.startswith("- ") or ":" not in line:
continue
key, value = line[2:].split(":", 1)
key = key.strip()
value = value.strip()
if not value:
continue
if current_section == "RUNTIME":
parsed["runtime"][key] = value
elif current_section:
parsed["files"].setdefault(current_section, {})[key] = value
return parsed
@staticmethod
def _parse_navigation_order(value: Any) -> List[str]:
text = str(value or "").strip()
if not text:
return list(NovelDistiller.DEFAULT_NAV_LOAD_ORDER)
# Names in load_order may be separated by "->", "," or "|".
parts = [item.strip().upper() for item in re.split(r"->|,|\|", text) if item.strip()]
return parts or list(NovelDistiller.DEFAULT_NAV_LOAD_ORDER)
@staticmethod
def _parse_persona_markdown(path: Path) -> Dict[str, Any]:
parsed: Dict[str, Any] = {}
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line.startswith("- ") or ":" not in line:
continue
key, value = line[2:].split(":", 1)
key = key.strip()
value = value.strip()
if not value:
continue
# Repeated keys are joined with ";" so that _split_persona_value can split them back into a list.
if key in parsed and parsed[key]:
parsed[key] = f"{parsed[key]};{value}"
else:
parsed[key] = value
return parsed
def _apply_persona_overrides(self, profile: Dict[str, Any], parsed: Dict[str, Any]) -> Dict[str, Any]:
merged = dict(profile)
list_fields = {
"core_traits",
"typical_lines",
"decision_rules",
"signature_phrases",
"sentence_openers",
"connective_tokens",
"sentence_endings",
"forbidden_fillers",
"taboo_topics",
"forbidden_behaviors",
"strengths",
"weaknesses",
"cognitive_limits",
"fear_triggers",
"key_bonds",
"user_edits",
"notable_interactions",
"relationship_updates",
"canon_memory",
}
dict_targets = {
"cadence": ("speech_habits", "cadence"),
"signature_phrases": ("speech_habits", "signature_phrases"),
"sentence_openers": ("speech_habits", "sentence_openers"),
"connective_tokens": ("speech_habits", "connective_tokens"),
"sentence_endings": ("speech_habits", "sentence_endings"),
"forbidden_fillers": ("speech_habits", "forbidden_fillers"),
"anger_style": ("emotion_profile", "anger_style"),
"joy_style": ("emotion_profile", "joy_style"),
"grievance_style": ("emotion_profile", "grievance_style"),
}
direct_fields = {
"identity_anchor",
"soul_goal",
"speech_style",
"thinking_style",
"worldview",
"life_experience",
"core_identity",
"faction_position",
"background_imprint",
"world_rule_fit",
"social_mode",
"hidden_desire",
"inner_conflict",
"story_role",
"belief_anchor",
"private_self",
"stance_stability",
"reward_logic",
"action_style",
}
for key, value in parsed.items():
if not value:
continue
# canon_memory is stored under life_experience and replaces whatever that field held so far.
if key == "canon_memory":
merged["life_experience"] = self._split_persona_value(value)
continue
if key in dict_targets:
parent, child = dict_targets[key]
bucket = dict(merged.get(parent, {})) if isinstance(merged.get(parent, {}), dict) else {}
bucket[child] = self._split_persona_value(value) if key in list_fields else value
merged[parent] = bucket
continue
if key in direct_fields:
if key == "life_experience":
merged[key] = self._split_persona_value(value)
else:
merged[key] = value
continue
if key in list_fields:
merged[key] = self._split_persona_value(value)
return merged
@staticmethod
def _split_persona_value(value: str) -> List[str]:
return [item.strip() for item in re.split(r"[;;]\s*", value) if item.strip()]
@staticmethod
def _split_metric_map(value: str) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for item in re.split(r"[;;]\s*", str(value or "").strip()):
if not item or "=" not in item:
continue
key, raw = item.split("=", 1)
key = key.strip()
raw = raw.strip()
if not key:
continue
if re.fullmatch(r"-?\d+", raw):
result[key] = int(raw)
else:
result[key] = raw
return result
@staticmethod
def _safe_int(value: Any) -> int:
try:
return int(value)
except (TypeError, ValueError):
return 0
@staticmethod
def _merge_profile_item(existing: Optional[Dict[str, Any]], incoming: Dict[str, Any]) -> Dict[str, Any]:
if not existing:
return incoming
current_score = len(existing.get("typical_lines", [])) + len(existing.get("core_traits", []))
incoming_score = len(incoming.get("typical_lines", [])) + len(incoming.get("core_traits", []))
if incoming_score > current_score:
merged = incoming.copy()
fallback = existing
else:
merged = existing.copy()
fallback = incoming
for key in ("core_traits", "typical_lines", "decision_rules"):
merged_values = list(merged.get(key, []))
seen = set(merged_values)
for item in fallback.get(key, []):
if item not in seen:
merged_values.append(item)
seen.add(item)
merged[key] = merged_values
if not merged.get("speech_style") and fallback.get("speech_style"):
merged["speech_style"] = fallback["speech_style"]
if not merged.get("values") and fallback.get("values"):
merged["values"] = fallback["values"]
return merged
@staticmethod
def _pair_key(a: str, b: str) -> str:
return "_".join(sorted([a, b]))
def _build_relation_matrix(self, characters: List[str], novel_id: Optional[str]) -> Dict[str, Dict[str, Any]]:
matrix: Dict[str, Dict[str, Any]] = {}
for speaker in characters:
for target in characters:
if speaker == target:
continue
disk = self._get_relation_state_from_disk(speaker, target, novel_id) or {}
state = {
"trust": int(disk.get("trust", 5)),
"affection": int(disk.get("affection", 5)),
"hostility": int(disk.get("hostility", max(0, 5 - int(disk.get("affection", 5))))),
"ambiguity": int(disk.get("ambiguity", 3)),
}
for key in ("conflict_point", "typical_interaction", "appellations"):
if key in disk:
state[key] = disk[key]
# _pair_key sorts the two names, so (A, B) and (B, A) share one slot and the later ordering wins.
matrix[self._pair_key(speaker, target)] = state
return matrix
def _save_relation_snapshot(self, session: Dict[str, Any]) -> None:
payload = {
"session_id": session.get("id"),
"novel_id": session.get("novel_id"),
"updated_at": int(time.time()),
"relation_matrix": session.get("state", {}).get("relation_matrix", {}),
"relation_delta": session.get("state", {}).get("relation_delta", {}),
}
save_markdown_data(
self.sessions_dir / f"{session['id']}_relations.md",
payload,
title="SESSION_RELATIONS",
summary=[
f"- session_id: {session.get('id', '')}",
f"- novel_id: {session.get('novel_id', '')}",
],
)
def _get_relation_state_from_disk(
self,
speaker: str,
target: str,
novel_id: Optional[str] = None,
) -> Dict[str, Any]:
rel_file = self._relation_file_for_novel(novel_id)
if not rel_file:
base = {}
else:
payload = load_markdown_data(rel_file, default={}) or {}
rel = payload.get("relations", {}) if isinstance(payload, dict) else {}
normalized = {normalize_relation_key(key): value for key, value in rel.items()}
base = normalized.get(self._pair_key(normalize_character_name(speaker), normalize_character_name(target)), {})
return self._merge_relation_overlay(base, speaker, target, novel_id)
def _merge_relation_overlay(
self,
relation_state: Dict[str, Any],
speaker: str,
target: str,
novel_id: Optional[str],
) -> Dict[str, Any]:
merged = dict(relation_state or {})
overlay = self._load_relation_markdown_overlay(speaker, target, novel_id)
if not overlay:
return merged
for key in ("trust", "affection", "power_gap"):
if key in overlay:
try:
merged[key] = int(overlay[key])
except (TypeError, ValueError):
pass
for key in ("conflict_point", "typical_interaction"):
if overlay.get(key):
merged[key] = overlay[key]
appellation = overlay.get("appellation_to_target", "")
if appellation:
appellations = dict(merged.get("appellations", {})) if isinstance(merged.get("appellations", {}), dict) else {}
appellations[f"{speaker}->{target}"] = appellation
merged["appellations"] = appellations
return merged
def _load_relation_markdown_overlay(self, speaker: str, target: str, novel_id: Optional[str]) -> Dict[str, str]:
if not novel_id:
return {}
persona_dir = self.path_provider.character_dir(novel_id, normalize_character_name(speaker))
descriptor = self._load_navigation_descriptor(persona_dir) if persona_dir.exists() else self._default_navigation_descriptor()
meta = descriptor.get("files", {}).get("RELATIONS", {})
if str(meta.get("status", "")).strip().lower() == "inactive":
return {}
path = self._resolve_persona_file_path(persona_dir, "RELATIONS", meta) if persona_dir.exists() else None
if not path:
return {}
parsed = self._parse_relation_markdown(path)
target_key = normalize_character_name(target)
if target_key in parsed:
return parsed[target_key]
return {}
def _parse_relation_markdown(self, path: Path) -> Dict[str, Dict[str, str]]:
result: Dict[str, Dict[str, str]] = {}
current_target = ""
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if line.startswith("## "):
current_target = normalize_character_name(line[3:].strip())
result.setdefault(current_target, {})
continue
if not current_target or not line.startswith("- ") or ":" not in line:
continue
key, value = line[2:].split(":", 1)
result[current_target][key.strip()] = value.strip()
return result
def _get_relation_state(self, session: Dict[str, Any], speaker: str, target: str) -> Dict[str, Any]:
if not target:
return {}
matrix = session["state"].setdefault("relation_matrix", {})
return matrix.get(self._pair_key(speaker, target), {})
def _active_characters(
self,
session: Dict[str, Any],
speaker: Optional[str] = None,
context: str = "",
) -> List[str]:
limit = int(self.config.get("chat_engine.max_speakers_per_turn", 4))
candidates = [name for name in session["characters"] if name != speaker]
if not candidates:
return []
mentioned = self._mentioned_characters(context, candidates)
if mentioned:
if session.get("mode") == "act":
return mentioned[: max(1, min(limit, len(mentioned)))]
ranked = self._rank_characters(session, speaker, candidates, preferred=mentioned)
ordered = []
seen = set()
for name in mentioned + ranked:
if name in seen:
continue
ordered.append(name)
seen.add(name)
if len(ordered) >= max(1, limit):
break
return ordered
remembered = self._remembered_target(session, speaker, candidates)
if remembered:
return [remembered]
ranked = self._rank_characters(session, speaker, candidates)
if session.get("mode") == "act":
if not ranked:
return []
top = ranked[0]
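# In act mode with no explicit mention, stay silent unless the best candidate scores above the neutral default.
if self._relation_score(session, speaker, top) <= self._default_relation_score():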
return []
return [top]
return ranked[: max(1, limit)]
def _remembered_target(
self,
session: Dict[str, Any],
speaker: Optional[str],
candidates: List[str],
) -> str:
if not speaker or speaker in self.SYSTEM_SPEAKERS:
return ""
focus_targets = session.get("state", {}).get("focus_targets", {})
target = focus_targets.get(speaker, "")
if target in candidates:
return target
return ""
def _trim_history(self, session: Dict[str, Any]) -> None:
turns = int(self.config.get("chat_engine.max_history_turns", 10))
keep = max(10, turns * (len(self._active_characters(session)) + 1))
session["history"] = session["history"][-keep:]
def _resolve_observe_turn(self, session: Dict[str, Any], user_msg: str) -> tuple[str, str]:
message = user_msg.strip()
if not message:
return "Narrator", user_msg
if len(session.get("characters", [])) == 1:
only_name = session["characters"][0]
aliases = [only_name] + self._candidate_aliases(only_name)
for alias in aliases:
stripped = self._strip_explicit_speaker_prefix(message, alias)
if stripped != message:
return "Narrator", stripped.strip() or message
return "Narrator", user_msg
for name in session["characters"]:
aliases = [name] + self._candidate_aliases(name)
for alias in aliases:
stripped = self._strip_explicit_speaker_prefix(message, alias)
if stripped == message:
continue
normalized = stripped.strip() or message
return name, normalized
return "Narrator", user_msg
@staticmethod
def _strip_explicit_speaker_prefix(message: str, alias: str) -> str:
escaped = re.escape(alias)
patterns = (
rf"^\s*[“\"'「『]?\s*{escaped}\s*[::,,]\s*",
rf"^\s*[“\"'「『]?\s*{escaped}\s*(?:说道|说|道|问道|问|答道|答|曰|开口道|笑道|沉声道|朗声道|轻声道)\s*[::,,]?\s*",
)
for pattern in patterns:
updated = re.sub(pattern, "", message, count=1)
if updated != message:
return updated
return message
def _candidate_aliases(self, name: str) -> List[str]:
clean = normalize_character_name(name)
if hasattr(self.distiller, "candidate_aliases"):
return list(self.distiller.candidate_aliases(clean))
aliases: List[str] = []
aliases.extend(canonical_aliases(clean))
if len(clean) >= 3:
given = clean[-2:]
if len(given) == 2 and given != clean:
aliases.append(given)
for suffix in self.address_suffixes:
aliases.append(f"{given[0]}{suffix}")
aliases.append(f"{clean[0]}{suffix}")
elif len(clean) == 2:
for suffix in self.address_suffixes:
aliases.append(f"{clean[0]}{suffix}")
ordered = []
seen = set()
for alias in aliases:
if alias and alias != clean and alias not in seen:
ordered.append(alias)
seen.add(alias)
return ordered
def _mentioned_characters(self, context: str, candidates: List[str]) -> List[str]:
if not context:
return []
alias_owners: Dict[str, List[str]] = {}
for name in candidates:
for alias in self._candidate_aliases(name):
alias_owners.setdefault(alias, []).append(name)
hits: List[tuple[int, str]] = []
for name in candidates:
positions = []
if name in context:
positions.append(context.index(name))
for alias in self._candidate_aliases(name):
# Ignore aliases that more than one candidate could own; they cannot identify a single character.
if alias_owners.get(alias) != [name]:
continue
if alias in context:
positions.append(context.index(alias))
if positions:
hits.append((min(positions), name))
hits.sort(key=lambda item: (item[0], item[1]))
return [name for _, name in hits]
@staticmethod
def _default_relation_score() -> int:
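# Score of an untouched relation in _relation_score: trust 5 + affection 5 - hostility 0 - ambiguity 3.
return 7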
def _relation_score(self, session: Dict[str, Any], speaker: Optional[str], candidate: str) -> int:
if not speaker or speaker in self.SYSTEM_SPEAKERS:
return 0
state = self._get_relation_state(session, speaker, candidate)
trust = int(state.get("trust", 5))
affection = int(state.get("affection", 5))
hostility = int(state.get("hostility", max(0, 5 - affection)))
ambiguity = int(state.get("ambiguity", 3))
return trust + affection - hostility - ambiguity
def _rank_characters(
self,
session: Dict[str, Any],
speaker: Optional[str],
candidates: List[str],
preferred: Optional[List[str]] = None,
) -> List[str]:
preferred_set = set(preferred or [])
return sorted(
candidates,
key=lambda name: (
1 if name in preferred_set else 0,
self._relation_score(session, speaker, name),
name,
),
reverse=True,
)
def _resolve_character_name(self, raw_name: str, candidates: List[str]) -> str:
normalized = normalize_character_name(raw_name)
if normalized in candidates:
return normalized
matched = []
for name in candidates:
if normalized == name or normalized in self._candidate_aliases(name):
matched.append(name)
if len(matched) == 1:
return matched[0]
return normalized
@staticmethod
def _infer_target(speaker: str, history: List[Dict[str, Any]], all_chars: List[str]) -> str:
for item in reversed(history):
prev_speaker = item.get("speaker", "")
if prev_speaker and prev_speaker != speaker and prev_speaker in all_chars:
return prev_speaker
for candidate in all_chars:
if candidate != speaker:
return candidate
return ""
def _guard_reply(
self,
profile: Dict[str, Any],
reply: str,
relation_state: Dict[str, Any],
target_name: str,
) -> str:
issues = self.reflection.relation_alignment_issues(reply, relation_state)
checked = self.reflection.detect_ooc(profile, reply)
if not issues and not checked.is_ooc:
return reply
rewritten = self._rewrite_reply(reply, relation_state, target_name)
issues_after = self.reflection.relation_alignment_issues(rewritten, relation_state)
checked_after = self.reflection.detect_ooc(profile, rewritten)
if issues_after or checked_after.is_ooc:
reasons = issues_after + checked_after.reasons
return f"{rewritten}(needs_revision: {'; '.join(reasons[:2])})"
return rewritten
@staticmethod
def _rewrite_reply(reply: str, relation_state: Dict[str, Any], target_name: str) -> str:
target = target_name or "对方"
hostility = int(relation_state.get("hostility", 0))
affection = int(relation_state.get("affection", 5))
ambiguity = int(relation_state.get("ambiguity", 3))
if hostility >= 7:
return f"对{target},我把话说到这里,不必更近一步。"
if affection >= 8:
return f"对{target},我会把语气放缓,把话说明白。"
if ambiguity >= 7:
return f"对{target},我先留一点余地,不把话说死。"
return f"{reply}(已按对象关系收束)"
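The relation overlay read by `_parse_relation_markdown` is a plain Markdown file: each `## ` heading names a target character, and every `- key: value` bullet under it becomes one entry for that target. The following is a minimal standalone sketch of that parsing rule, assuming name normalization is a no-op (the real method calls `normalize_character_name`, which is not shown in this file). The function name `parse_relation_markdown`, the `SAMPLE` text, and the character names in it are illustrative only.

```python
from typing import Dict


def parse_relation_markdown(text: str) -> Dict[str, Dict[str, str]]:
    """Sketch of the RELATIONS overlay parsing rule (name normalization omitted)."""
    result: Dict[str, Dict[str, str]] = {}
    current_target = ""
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line.startswith("## "):
            # A level-2 heading opens the section for one target character.
            current_target = line[3:].strip()
            result.setdefault(current_target, {})
            continue
        # Bullets before the first heading, and lines without ":", are ignored.
        if not current_target or not line.startswith("- ") or ":" not in line:
            continue
        key, value = line[2:].split(":", 1)
        result[current_target][key.strip()] = value.strip()
    return result


# Hypothetical overlay content; the keys are the ones _merge_relation_overlay reads.
SAMPLE = """\
# RELATIONS
## 林黛玉
- trust: 8
- affection: 9
- appellation_to_target: 林妹妹
## 薛宝钗
- trust: 6
- conflict_point: 金玉之说
"""

overlay = parse_relation_markdown(SAMPLE)
for target, fields in overlay.items():
    print(target, fields)
```

Values stay strings at this stage. In the source, `_merge_relation_overlay` is the step that converts `trust`, `affection`, and `power_gap` to integers and silently skips values that do not parse.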
FILE:runtime/src/modules/distillation.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from src.core.config import Config
from src.core.contracts import CostEstimator, PathProviderLike, RuleProvider
from src.utils.file_utils import canonical_aliases, ensure_dir, novel_id_from_input, safe_filename
from src.utils.text_parser import load_novel_text, split_sentences
from src.utils.token_counter import TokenCounter
class NovelDistiller:
"""Generic novel character distillation driven by editable markdown rules."""
CHAPTER_HEADING_PATTERNS = (
re.compile(r"^第[0-9零一二三四五六七八九十百千两]+章"),
re.compile(r"^第[0-9零一二三四五六七八九十百千两]+回"),
re.compile(r"^卷[0-9零一二三四五六七八九十百千两]"),
re.compile(r"^chapter\s+\d+", flags=re.IGNORECASE),
)
DEFAULT_NAV_LOAD_ORDER = (
"SOUL",
"GOALS",
"STYLE",
"TRAUMA",
"IDENTITY",
"BACKGROUND",
"CAPABILITY",
"BONDS",
"CONFLICTS",
"ROLE",
"AGENTS",
"RELATIONS",
"MEMORY",
)
PERSONA_FILE_CATALOG = {
"SOUL": {
"optional": False,
"role": "core values, worldview, boundaries",
"behaviors": "stance, taboo, refusal, value judgment",
"write_policy": "manual_edit",
},
"GOALS": {
"optional": True,
"role": "long-term drive, unresolved desire, strategic priority",
"behaviors": "decision preference, long arc pressure, ambition",
"write_policy": "manual_edit",
},
"STYLE": {
"optional": True,
"role": "signature phrasing, cadence, surface emotion, sample lines",
"behaviors": "word choice, sentence length, tone, recurring fragments",
"write_policy": "manual_edit",
},
"TRAUMA": {
"optional": True,
"role": "pain points, scars, triggers, never-do rules",
"behaviors": "trigger reactions, avoidance, hard boundaries",
"write_policy": "manual_edit",
},
"IDENTITY": {
"optional": False,
"role": "background, lived experience, habits, emotion profile",
"behaviors": "self-reference, memory framing, habit-driven reactions",
"write_policy": "manual_edit",
},
"BACKGROUND": {
"optional": True,
"role": "world identity, faction position, environment imprint, survival context",
"behaviors": "camp alignment, social rank, environmental pressure, worldview fit",
"write_policy": "manual_edit",
},
"CAPABILITY": {
"optional": True,
"role": "strengths, weaknesses, blind spots, action tendency",
"behaviors": "what the character is good at, where they fail, how they overreach",
"write_policy": "manual_edit",
},
"BONDS": {
"optional": True,
"role": "relationship habits, trust boundary, reward-and-resentment logic",
"behaviors": "how the character treats allies, strangers, enemies, and debts",
"write_policy": "manual_edit",
},
"CONFLICTS": {
"optional": True,
"role": "hidden desire, inner contradiction, fear triggers, private self",
"behaviors": "internal pull, weakness exposure, private vs public self",
"write_policy": "manual_edit",
},
"ROLE": {
"optional": True,
"role": "story function, stance stability, world-rule compatibility",
"behaviors": "plot pressure, pivot role, alignment stability",
"write_policy": "manual_edit",
},
"AGENTS": {
"optional": False,
"role": "runtime behavior rules, silence policy, group chat routing",
"behaviors": "when to speak, when to hold back, how to engage others",
"write_policy": "manual_edit",
},
"RELATIONS": {
"optional": True,
"role": "target-specific trust, affection, appellations, friction points",
"behaviors": "tone toward each character, appellations, conflict framing",
"write_policy": "manual_edit",
},
"MEMORY": {
"optional": False,
"role": "stable notes plus runtime write-back from user guidance and corrections",
"behaviors": "persistent user constraints, correction carry-over, mutable notes",
"write_policy": "runtime_append",
},
}
DEFAULT_ADDRESS_SUFFIXES = ("哥哥", "姐姐", "妹妹", "弟弟", "姑娘", "公子", "爷")
DEFAULT_SPEECH_VERBS = ("道", "说道", "笑道", "问道", "答道", "喝道", "叫道", "叹道", "呼道")
DEFAULT_OBJECT_LEADERS = (
"叫",
"唤",
"问",
"对",
"向",
"同",
"与",
"跟",
"把",
"将",
"扯住",
"拉住",
"搀起",
"扶起",
"扶着",
"呼",
"忙呼",
"喝住",
"捉住",
"拿住",
"推着",
"拖着",
"请",
"教",
)
DEFAULT_STOP_NAMES = {
"我们",
"你们",
"他们",
"她们",
"自己",
"那里",
"这里",
"这个",
"那个",
"一种",
"一个",
}
DEFAULT_COMMON_SURNAMES = (
"赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻柏水窦章云苏潘葛奚"
"范彭郎鲁韦昌马苗凤花方俞任袁柳鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮卞齐康伍余元"
"卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闵席季麻强"
"贾路娄危江童颜郭梅盛林刁钟徐邱骆高夏蔡田樊胡凌霍虞万支柯昝管卢莫经房裘缪干解应宗丁宣贲邓郁"
"单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁荀羊惠甄曲家封芮羿储靳汲邴糜松井段富巫乌焦巴弓牧隗山"
"谷车侯宓蓬全郗班仰秋仲伊宫宁仇栾暴甘钭厉戎祖武符刘景詹束龙叶幸司韶郜黎"
)
DEFAULT_TRAIT_KEYWORDS = {
"勇敢": ["勇", "冲", "无畏", "果断"],
"温柔": ["轻声", "温和", "安慰", "体贴"],
"聪慧": ["思索", "推断", "聪明", "机敏"],
"敏感": ["委屈", "难过", "心酸", "叹息"],
"傲气": ["冷笑", "不屑", "高傲", "轻蔑"],
"忠诚": ["守护", "忠", "誓言", "不离"],
"善良": ["帮助", "善意", "宽容", "谅解"],
"执拗": ["坚持", "非要", "绝不", "固执"],
"机变": ["变化", "试探", "识破", "周旋"],
"诙谐": ["笑道", "打趣", "顽皮", "戏弄"],
"虔诚": ["佛", "祈祷", "经文", "戒律"],
"沉稳": ["稳住", "接应", "收拾", "不慌"],
"圆滑": ["不如", "且慢", "何必", "先看看"],
"克制": ["沉住气", "先忍", "不动声色", "收着"],
}
def __init__(
self,
config: Optional[Config] = None,
*,
llm_client: Optional[CostEstimator] = None,
token_counter: Optional[TokenCounter] = None,
rulebook: Optional[RuleProvider] = None,
path_provider: Optional[PathProviderLike] = None,
):
self.config = config or Config()
if llm_client is None or token_counter is None or rulebook is None or path_provider is None:
raise ValueError("NovelDistiller requires injected llm_client, token_counter, rulebook, and path_provider")
self.path_provider = path_provider
self.rulebook = rulebook
self.llm_client = llm_client
self.token_counter = token_counter
self._last_chunk_count = 0
self.address_suffixes = tuple(
self.rulebook.get("distillation", "address_suffixes", list(self.DEFAULT_ADDRESS_SUFFIXES))
)
self.speech_verbs = tuple(
self.rulebook.get("distillation", "speech_verbs", list(self.DEFAULT_SPEECH_VERBS))
)
self.object_leaders = tuple(
self.rulebook.get("distillation", "object_leaders", list(self.DEFAULT_OBJECT_LEADERS))
)
self.stop_names = set(self.rulebook.get("distillation", "stop_names", list(self.DEFAULT_STOP_NAMES)))
self.common_surnames = str(
self.rulebook.get("distillation", "common_surnames", self.DEFAULT_COMMON_SURNAMES)
)
self.trait_keywords = dict(
self.rulebook.get("distillation", "trait_keywords", self.DEFAULT_TRAIT_KEYWORDS)
)
self.archetypes = dict(self.rulebook.get("distillation", "archetypes", {}))
self.value_markers = dict(self.rulebook.get("distillation", "value_markers", {}))
speaker_rules = self.rulebook.section("speaker")
self.generic_fillers = tuple(speaker_rules.get("generic_fillers", []))
self.signature_fragments = tuple(speaker_rules.get("signature_fragments", []))
self.opener_patterns = tuple(speaker_rules.get("opener_patterns", []))
self.connective_patterns = tuple(speaker_rules.get("connective_patterns", []))
self.ending_patterns = tuple(speaker_rules.get("ending_patterns", []))
self.fragment_stopwords = {
str(item).strip() for item in speaker_rules.get("fragment_stopwords", []) if str(item).strip()
}
self.preferred_leading_chars = tuple(speaker_rules.get("preferred_leading_chars", []))
self.preferred_trailing_chars = tuple(speaker_rules.get("preferred_trailing_chars", []))
self.style_templates = dict(self.rulebook.get("distillation", "style_templates", {}))
self.decision_rule_signals = dict(self.rulebook.get("distillation", "decision_rule_signals", {}))
self.taboo_topics_by_value = dict(self.rulebook.get("distillation", "taboo_topics_by_value", {}))
self.forbidden_behaviors_by_value = dict(
self.rulebook.get("distillation", "forbidden_behaviors_by_value", {})
)
def estimate_cost(self, novel_path: str) -> float:
text = self.prepare_novel_text(load_novel_text(novel_path))
chunks = self._chunk_text(text)
self._last_chunk_count = len(chunks)
avg_chunk_tokens = self.token_counter.count(text) / max(1, len(chunks))
total_prompt_tokens = int(len(chunks) * (avg_chunk_tokens + 250))
synthetic_prompt = "x" * max(10, total_prompt_tokens // 2)
return self.llm_client.estimate_cost(synthetic_prompt, expected_completion_ratio=0.35)
def get_last_chunk_count(self) -> int:
return self._last_chunk_count
def distill(
self,
novel_path: str,
characters: Optional[List[str]] = None,
output_dir: Optional[str] = None,
) -> Dict[str, Dict[str, Any]]:
text = self.prepare_novel_text(load_novel_text(novel_path))
chunks = self._chunk_text(text)
self._last_chunk_count = len(chunks)
novel_id = novel_id_from_input(novel_path)
target_characters = [item.strip() for item in characters or [] if item.strip()] or self.extract_top_characters(text)
if not target_characters:
raise ValueError("No character candidates were extracted from the novel text")
alias_map = self.build_alias_map(text, target_characters, allow_sparse_alias=bool(characters))
aggregated = {name: self._empty_bucket() for name in target_characters}
arc_points: Dict[str, List[Tuple[int, Dict[str, int]]]] = defaultdict(list)
for idx, chunk in enumerate(chunks):
chunk_evidence, chunk_values = self._extract_from_chunk(chunk, alias_map)
for name in target_characters:
evidence = chunk_evidence.get(name)
if not evidence:
continue
bucket = aggregated[name]
bucket["descriptions"].extend(evidence["descriptions"])
bucket["dialogues"].extend(evidence["dialogues"])
bucket["thoughts"].extend(evidence["thoughts"])
arc_points[name].append((idx, chunk_values.get(name, {})))
out_dir = ensure_dir(Path(output_dir) if output_dir else self.path_provider.characters_root(novel_id))
profiles: Dict[str, Dict[str, Any]] = {}
for name in target_characters:
profile = self._build_profile(name, aggregated[name], arc_points.get(name, []))
profile["novel_id"] = novel_id
profile["source_path"] = novel_path
profile["evidence"] = {
"description_count": len(aggregated[name]["descriptions"]),
"dialogue_count": len(aggregated[name]["dialogues"]),
"thought_count": len(aggregated[name]["thoughts"]),
"chunk_count": len(arc_points.get(name, [])),
}
profiles[name] = profile
self._export_persona_bundle(out_dir, profile)
return profiles
def extract_top_characters(self, text: str) -> List[str]:
return self._extract_top_characters(self.prepare_novel_text(text))
def build_alias_map(
self,
text: str,
character_names: List[str],
allow_sparse_alias: bool = False,
) -> Dict[str, List[str]]:
return self._build_alias_map(self.prepare_novel_text(text), character_names, allow_sparse_alias=allow_sparse_alias)
def text_mentions_any_alias(self, text: str, aliases: List[str]) -> bool:
return self._text_mentions_any_alias(text, aliases)
def refresh_navigation(self, persona_dir: Path, character_name: str) -> None:
self.refresh_persona_navigation(persona_dir, character_name)
def candidate_aliases(self, name: str) -> List[str]:
clean = str(name or "").strip()
aliases: List[str] = []
aliases.extend(canonical_aliases(clean))
if len(clean) >= 3:
given = clean[-2:]
if given != clean:
aliases.append(given)
for suffix in self.address_suffixes:
aliases.append(f"{given[0]}{suffix}")
aliases.append(f"{clean[0]}{suffix}")
elif len(clean) == 2:
for suffix in self.address_suffixes:
aliases.append(f"{clean[0]}{suffix}")
return self._unique_texts(item for item in aliases if item and item != clean)
def prepare_novel_text(self, text: str) -> str:
return self._prepare_novel_text(text)
def _chunk_text(self, text: str) -> List[str]:
size = int(self.config.get("text_processing.chunk_size_tokens", 8000))
overlap = int(self.config.get("text_processing.chunk_overlap_tokens", 200))
return self.token_counter.split_by_tokens(text, size, overlap)
def _extract_top_characters(self, text: str) -> List[str]:
name_pattern = re.compile(rf"([{self.common_surnames}][\u4e00-\u9fff]{{1,2}})")
raw_names: List[str] = []
for match in name_pattern.finditer(text):
start = match.start()
if start > 0 and "\u4e00" <= text[start - 1] <= "\u9fff":
continue
raw_names.append(match.group(1))
disallowed = set("你我他她它们的了得地着过吗呀啊呢就在和并与把被让向对将又很都并且")
candidates = []
for name in raw_names:
if name in self.stop_names or len(name) < 2 or len(name) > 3:
continue
if any(ch in disallowed for ch in name[1:]):
continue
candidates.append(name)
counts = Counter(candidates)
filtered = self._pick_frequent_names(counts, min_count=int(self.config.get("distillation.min_appearances", 3)))
if not filtered:
filtered = self._pick_frequent_names(counts, min_count=2)
if not filtered:
filtered = self._pick_frequent_names(counts, min_count=1)
if len(filtered) < 3:
alias_candidates = re.findall(r"[\u4e00-\u9fff]{2}(?:儿|爷|姐|妹|兄|玉|钗)", text)
for alias, count in Counter(alias_candidates).most_common(10):
if count < 2 or alias in self.stop_names or alias in filtered:
continue
filtered.append(alias)
return filtered[: int(self.config.get("distillation.max_characters", 10))]
def _pick_frequent_names(self, counts: Counter[str], min_count: int) -> List[str]:
filtered: List[str] = []
for name, count in counts.most_common(60):
if count < min_count:
continue
if self._looks_like_name(name):
filtered.append(name)
return filtered
def _build_alias_map(
self,
text: str,
character_names: List[str],
allow_sparse_alias: bool = False,
) -> Dict[str, List[str]]:
alias_owners: Dict[str, List[str]] = defaultdict(list)
for name in character_names:
for alias in self.candidate_aliases(name):
alias_owners[alias].append(name)
alias_map: Dict[str, List[str]] = {}
for name in character_names:
aliases = [name]
for alias in self.candidate_aliases(name):
if alias_owners.get(alias) != [name]:
continue
if not self._alias_is_reliable(text, alias, allow_sparse_alias=allow_sparse_alias):
continue
aliases.append(alias)
alias_map[name] = self._unique_texts(aliases)
return alias_map
def _alias_is_reliable(self, text: str, alias: str, allow_sparse_alias: bool = False) -> bool:
if len(alias) < 2 or alias in self.stop_names:
return False
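# allow_sparse_alias (set when the caller names the characters explicitly) lowers the threshold to one mention.
min_mentions = 1 if allow_sparse_alias else 2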
return self._count_token_mentions(text, alias) >= min_mentions
def _extract_from_chunk(
self,
chunk: str,
alias_map: Dict[str, List[str]],
) -> Tuple[Dict[str, Dict[str, List[str]]], Dict[str, Dict[str, int]]]:
sentences = split_sentences(chunk)
evidence_map: Dict[str, Dict[str, List[str]]] = {}
value_map: Dict[str, Dict[str, int]] = {}
dims = self._value_dimensions()
for name, aliases in alias_map.items():
evidence = self._empty_bucket()
values_acc: List[Dict[str, int]] = []
for idx, sentence in enumerate(sentences):
if self._looks_like_metadata_sentence(sentence):
continue
prev_sent = sentences[idx - 1] if idx > 0 else ""
next_sent = sentences[idx + 1] if idx + 1 < len(sentences) else ""
contains_name = self._text_mentions_any_alias(sentence, aliases)
pronoun_hit = any(token in sentence for token in ("他", "她")) and (
self._text_mentions_any_alias(prev_sent, aliases) or self._text_mentions_any_alias(next_sent, aliases)
)
has_quote = "“" in sentence or "\"" in sentence
speaker_hit = has_quote and self._is_likely_spoken_by(sentence, aliases, prev_sent, next_sent)
if not (contains_name or pronoun_hit or speaker_hit):
continue
if has_quote and speaker_hit:
spoken = self._extract_spoken_content(sentence, aliases, prev_sent, next_sent)
if spoken:
evidence["dialogues"].append(spoken)
values_acc.append(self._score_values(spoken, dims))
continue
if any(token in sentence for token in ("心想", "想着", "觉得", "暗道", "心里")):
evidence["thoughts"].append(sentence)
else:
evidence["descriptions"].append(sentence)
values_acc.append(self._score_values(sentence, dims))
if any(evidence.values()):
evidence_map[name] = {
key: self._dedupe_texts(items, limit=24 if key == "descriptions" else 12)
for key, items in evidence.items()
}
value_map[name] = self._average_values(values_acc, dims)
return evidence_map, value_map
def _score_values(self, sentence: str, dims: List[str]) -> Dict[str, int]:
score = {dim: 5 for dim in dims}
for dim in dims:
config = self.value_markers.get(dim, {})
positive = sum(sentence.count(token) for token in config.get("positive", []))
negative = sum(sentence.count(token) for token in config.get("negative", []))
delta = min(3, positive) - min(3, negative)
score[dim] = max(1, min(10, score[dim] + delta))
return score
@staticmethod
def _average_values(values_list: List[Dict[str, int]], dims: List[str]) -> Dict[str, int]:
if not values_list:
return {dim: 5 for dim in dims}
averaged: Dict[str, int] = {}
for dim in dims:
averaged[dim] = int(round(sum(item.get(dim, 5) for item in values_list) / len(values_list)))
return averaged
def _build_profile(
self,
name: str,
bucket: Dict[str, List[str]],
arc_values: List[Tuple[int, Dict[str, int]]],
) -> Dict[str, Any]:
descriptions = self._dedupe_texts(bucket["descriptions"], 24)
dialogues = self._dedupe_texts(bucket["dialogues"], 8)
thoughts = self._dedupe_texts(bucket["thoughts"], 12)
archetype = self._infer_archetype(name, descriptions, dialogues, thoughts)
values = self._infer_values_from_corpus(self._merge_arc_values(arc_values), descriptions, dialogues, thoughts, archetype)
core_traits = self._infer_traits(descriptions + dialogues + thoughts, archetype)
speech_style = self._infer_speech_style(dialogues, archetype)
decision_rules = self._infer_decision_rules(thoughts, descriptions, dialogues, archetype)
arc = self._build_arc(arc_values, values)
identity_anchor = self._infer_identity_anchor(core_traits, values, decision_rules, archetype)
soul_goal = self._infer_soul_goal(values, core_traits, archetype)
life_experience = self._infer_life_experience(descriptions, dialogues, thoughts, decision_rules, values, archetype)
worldview = self._infer_worldview(values, core_traits, archetype)
thinking_style = self._infer_thinking_style(values, core_traits, speech_style, archetype)
speech_habits = self._infer_speech_habits(dialogues, speech_style)
emotion_profile = self._infer_emotion_profile(dialogues, thoughts, speech_style, core_traits)
taboo_topics = self._infer_taboo_topics(values, core_traits, decision_rules)
forbidden_behaviors = self._infer_forbidden_behaviors(values, core_traits, speech_style)
core_identity = self._infer_core_identity(identity_anchor, core_traits, descriptions, dialogues)
faction_position = self._infer_faction_position(name, descriptions, dialogues, thoughts, values)
background_imprint = self._infer_background_imprint(life_experience, values, descriptions)
world_rule_fit = self._infer_world_rule_fit(values, decision_rules, speech_style)
strengths = self._infer_strengths(core_traits, decision_rules, speech_style)
weaknesses = self._infer_weaknesses(core_traits, emotion_profile, speech_style)
cognitive_limits = self._infer_cognitive_limits(values, core_traits)
action_style = self._infer_action_style(values, decision_rules, speech_style)
social_mode = self._infer_social_mode(values, core_traits, speech_style)
key_bonds = self._infer_key_bonds(values, decision_rules, taboo_topics)
reward_logic = self._infer_reward_logic(values, core_traits)
hidden_desire = self._infer_hidden_desire(values, soul_goal)
inner_conflict = self._infer_inner_conflict(values, core_traits, decision_rules)
fear_triggers = self._infer_fear_triggers(values, taboo_topics, forbidden_behaviors)
private_self = self._infer_private_self(speech_style, emotion_profile, social_mode)
story_role = self._infer_story_role(descriptions, dialogues, thoughts, decision_rules)
belief_anchor = self._infer_belief_anchor(values, worldview)
stance_stability = self._infer_stance_stability(values, decision_rules)
return {
"name": name,
"core_traits": core_traits[: int(self.config.get("distillation.traits_max_count", 10))],
"values": values,
"speech_style": speech_style,
"typical_lines": dialogues[:8],
"decision_rules": decision_rules[:8],
"identity_anchor": identity_anchor,
"soul_goal": soul_goal,
"life_experience": life_experience[:4],
"worldview": worldview,
"thinking_style": thinking_style,
"speech_habits": speech_habits,
"emotion_profile": emotion_profile,
"taboo_topics": taboo_topics[:6],
"forbidden_behaviors": forbidden_behaviors[:6],
"core_identity": core_identity,
"faction_position": faction_position,
"background_imprint": background_imprint,
"world_rule_fit": world_rule_fit,
"strengths": strengths[:5],
"weaknesses": weaknesses[:5],
"cognitive_limits": cognitive_limits[:4],
"action_style": action_style,
"social_mode": social_mode,
"key_bonds": key_bonds[:4],
"reward_logic": reward_logic,
"hidden_desire": hidden_desire,
"inner_conflict": inner_conflict,
"fear_triggers": fear_triggers[:5],
"private_self": private_self,
"story_role": story_role,
"belief_anchor": belief_anchor,
"stance_stability": stance_stability,
"arc": arc,
"archetype": archetype,
}
def _infer_traits(self, lines: List[str], archetype: str) -> List[str]:
if not lines:
return self._apply_archetype_traits(["克制", "复杂"], archetype)
corpus = " ".join(lines)
hits: List[Tuple[str, int]] = []
for trait, markers in self.trait_keywords.items():
score = sum(corpus.count(token) for token in markers)
if score > 0:
hits.append((trait, score))
hits.sort(key=lambda item: item[1], reverse=True)
base_traits = [trait for trait, _ in hits[:8]] or ["谨慎", "多思"]
return self._apply_archetype_traits(base_traits, archetype)
def _infer_archetype(
self,
name: str,
descriptions: List[str],
dialogues: List[str],
thoughts: List[str],
) -> str:
corpus = " ".join([name] + descriptions[:10] + dialogues[:10] + thoughts[:10])
best_name = "default"
best_score = 0
second_score = 0
for archetype_name, config in self.archetypes.items():
markers = [str(item).strip() for item in config.get("markers", []) if str(item).strip()]
score = sum(corpus.count(marker) for marker in markers)
if score > best_score:
second_score = best_score
best_name = archetype_name
best_score = score
elif score > second_score:
second_score = score
return best_name if best_score >= 5 and best_score >= second_score + 2 else "default"
def _apply_archetype_traits(self, traits: List[str], archetype: str) -> List[str]:
configured = self.archetypes.get(archetype, {}).get("traits", [])
return self._unique_texts(list(traits) + [str(item).strip() for item in configured if str(item).strip()])[:10]
def _infer_values_from_corpus(
self,
values: Dict[str, int],
descriptions: List[str],
dialogues: List[str],
thoughts: List[str],
archetype: str,
) -> Dict[str, int]:
dims = self._value_dimensions()
corpus = " ".join(descriptions + dialogues + thoughts)
merged = {dim: int(values.get(dim, 5)) for dim in dims}
for dim in dims:
config = self.value_markers.get(dim, {})
positive = sum(corpus.count(token) for token in config.get("positive", []))
negative = sum(corpus.count(token) for token in config.get("negative", []))
delta = min(3, positive) - min(3, negative)
merged[dim] = max(1, min(10, merged.get(dim, 5) + delta))
for dim, bias in self.archetypes.get(archetype, {}).get("value_bias", {}).items():
if dim not in merged:
merged[dim] = 5
merged[dim] = max(1, min(10, merged[dim] + max(-1, min(1, int(bias)))))
return merged
def _merge_arc_values(self, arc_values: List[Tuple[int, Dict[str, int]]]) -> Dict[str, int]:
dims = self._value_dimensions()
if not arc_values:
return {dim: 5 for dim in dims}
merged = defaultdict(list)
for _, values in arc_values:
for dim in dims:
merged[dim].append(int(values.get(dim, 5)))
return {dim: int(round(sum(items) / len(items))) for dim, items in merged.items()}
def _build_arc(
self,
arc_values: List[Tuple[int, Dict[str, int]]],
fallback_values: Dict[str, int],
) -> Dict[str, Any]:
if not arc_values:
return {
"start": {},
"mid": {"trigger_event": "未识别到稳定弧光证据"},
"end": {"final_state": "未判定(证据不足)"},
}
ordered = sorted(arc_values, key=lambda item: item[0])
start = dict(ordered[0][1] or {})
mid = dict(ordered[len(ordered) // 2][1] or {})
end = dict(ordered[-1][1] or {})
if len(ordered) < 2:
return {
"start": start,
"mid": {"trigger_event": "样本跨度不足,未识别到稳定变化事件"},
"end": {"final_state": "未判定(片段跨度不足)"},
}
spread = 0
for dim in self._value_dimensions():
series = [int(values.get(dim, fallback_values.get(dim, 5))) for _, values in ordered]
spread = max(spread, max(series) - min(series))
if spread < 1:
return {
"start": start,
"mid": {"trigger_event": "未识别到明确变化事件"},
"end": {"final_state": "静态人物或当前片段未呈现稳定弧光"},
}
return {
"start": start,
"mid": {**mid, "trigger_event": "关键关系或冲突推动"},
"end": {**end, "final_state": "阶段性收束"},
}
    def _infer_speech_style(self, lines: List[str], archetype: str) -> str:
        configured = str(self.archetypes.get(archetype, {}).get("speech_style", "")).strip() if archetype != "default" else ""
        if not lines:
            return configured or self.style_templates.get("quiet", "发言偏少,更多通过态度和分寸表明立场。")
        avg_len = sum(len(item) for item in lines) / max(1, len(lines))
        # Count lines that carry an exclamation or question mark, in ASCII or full-width form.
        exclaim_count = sum(1 for item in lines if any(token in item for token in ("!", "!", "?", "?")))
        if avg_len <= 12:
            return self.style_templates.get("short_direct", "句式偏短,较少铺垫,态度来得直接。")
        if exclaim_count >= max(1, len(lines) // 3):
            return self.style_templates.get("emotional", "情绪浮在表面,回应时容易带出锋芒或波动。")
        if avg_len >= 26:
            return self.style_templates.get("long_reflective", "句式较长,喜欢把轻重和前因后果慢慢展开。")
        return self.style_templates.get("balanced", "表达有分寸,既不极短,也不刻意铺陈。")
def _infer_decision_rules(
self,
thoughts: List[str],
descriptions: List[str],
dialogues: List[str],
archetype: str,
) -> List[str]:
corpus_lines = self._dedupe_texts(thoughts[:12] + dialogues[:12] + descriptions[:12], 30)
scored_rules: List[Tuple[int, str]] = []
for _, config in self.decision_rule_signals.items():
markers = [str(item).strip() for item in config.get("markers", []) if str(item).strip()]
template = str(config.get("template", "")).strip()
if not markers or not template:
continue
marker_hits = 0
sentence_hits = 0
for line in corpus_lines:
hit_count = sum(line.count(marker) for marker in markers)
if hit_count <= 0:
continue
marker_hits += min(3, hit_count)
sentence_hits += 1
if marker_hits <= 0:
continue
scored_rules.append((marker_hits + min(3, sentence_hits), template))
scored_rules.sort(key=lambda item: item[0], reverse=True)
rules = [rule for _, rule in scored_rules[:3]]
archetype_rules = [
str(item).strip() for item in self.archetypes.get(archetype, {}).get("decision_rules", []) if str(item).strip()
]
if len(rules) < 2:
rules.extend(archetype_rules[: 2 - len(rules)])
joined = "".join(dialogues[:8])
if any(token in joined for token in ("先", "且慢", "慢些", "等等")):
rules.append("不会一上来把话说死,通常会先留一步判断。")
if any(token in joined for token in ("不可", "不能", "休得", "岂可")):
rules.append("遇到底线问题时,会明显收紧语气并立即表态。")
if not rules:
rules.append("高压情境下,会先分清轻重和后果,再决定动作。")
return self._dedupe_texts(rules, 8)
def _infer_identity_anchor(
self,
core_traits: List[str],
values: Dict[str, int],
decision_rules: List[str],
archetype: str,
) -> str:
configured = str(self.archetypes.get(archetype, {}).get("identity_anchor", "")).strip()
if configured:
return configured
top_value = self._top_dimensions(values, count=2)
if "责任" in top_value:
return "遇到局面时,习惯先把担子接住的人"
if "忠诚" in top_value:
return "把信义和跟随关系看得很重的人"
if "正义" in top_value:
return "先分是非,再谈利害的人"
if "智慧" in top_value or "谨慎" in core_traits:
return "凡事先探虚实和后势的人"
if any("自己人" in rule for rule in decision_rules):
return "见不得身边人独自受压的人"
return "不会轻率交出真实态度的人"
    def _infer_soul_goal(self, values: Dict[str, int], core_traits: List[str], archetype: str) -> str:
        configured = str(self.archetypes.get(archetype, {}).get("soul_goal", "")).strip()
        if configured:
            return configured
        # Fall back to an empty label when no dimension can be ranked, instead of raising IndexError.
        top_dims = self._top_dimensions(values, count=1)
        top_value = top_dims[0] if top_dims else ""
        mapping = {
            "责任": "把眼前的人和局面尽量稳住,不让局势轻易散掉",
            "忠诚": "守住已经认下的承诺与关系,不轻易失信",
            "正义": "把轻重和是非摆正,不让局势被歪理带偏",
            "智慧": "先看清局势再动手,尽量少走弯路",
            "勇气": "真到要紧处,愿意先一步站到前面",
            "善良": "尽量少伤人心,也少伤无辜之人",
            "自由": "不给自己和身边人活成任人摆布的棋子",
            "野心": "借势把局面推向更远的位置,而不止是应付眼前",
        }
        # A stubborn character whose top dimension has no mapped goal gets the persistence goal.
        if "执拗" in core_traits and top_value not in mapping:
            return "认准了就要做到底,不愿轻易退回去"
        return mapping.get(top_value, "把事情看透,再把自己真正想守的东西守住")
def _infer_life_experience(
self,
descriptions: List[str],
dialogues: List[str],
thoughts: List[str],
decision_rules: List[str],
values: Dict[str, int],
archetype: str,
) -> List[str]:
configured = self.archetypes.get(archetype, {}).get("life_experience", "")
lines = [str(configured).strip()] if str(configured).strip() else []
corpus = " ".join(descriptions[:6] + thoughts[:6])
if any(token in corpus for token in ("旧事", "往年", "从前", "昔日")):
lines.append("过往经历仍在影响当下的分寸和判断。")
if any("先收住" in rule or "后势" in rule for rule in decision_rules):
lines.append("见过局势反覆之后,更少只凭一时热气定夺。")
if values.get("责任", 5) >= 8:
lines.append("这些经历让他更习惯替旁人托底,而不是只顾自己。")
if values.get("善良", 5) >= 8:
lines.append("看过人心冷暖之后,更不愿把无辜者推到前面。")
if not lines:
lines.append("经历过人情与局势的反覆,因此很少只看眼前这一层。")
return self._dedupe_texts(lines, 4)
def _infer_worldview(self, values: Dict[str, int], core_traits: List[str], archetype: str) -> str:
configured = str(self.archetypes.get(archetype, {}).get("worldview", "")).strip()
if configured:
return configured
top_value = self._top_dimensions(values, count=2)
if "忠诚" in top_value:
return "先看人是否可靠,再看事值不值得做。"
if "正义" in top_value:
return "是非若站不稳,利益再大也不该轻动。"
if "智慧" in top_value:
return "世事最怕只看一面,虚实和后势都要算进去。"
if "责任" in top_value or "善良" in top_value:
return "局面再乱,也不能把身边人与无辜者轻易丢下。"
if "谨慎" in core_traits:
return "先看清,再落子,宁慢一步,不乱一步。"
return "说话做事都不能只图一时痛快,还得顾后果。"
    def _infer_thinking_style(
        self,
        values: Dict[str, int],
        core_traits: List[str],
        speech_style: str,
        archetype: str,
    ) -> str:
        configured = str(self.archetypes.get(archetype, {}).get("thinking_style", "")).strip()
        if configured:
            return configured
        # Fall back to an empty label when no dimension can be ranked, instead of raising IndexError.
        top_dims = self._top_dimensions(values, count=1)
        top_value = top_dims[0] if top_dims else ""
        if top_value == "智慧" or "谨慎" in core_traits:
            return "先拆局势,再定立场。"
        if top_value in {"忠诚", "正义"}:
            return "先问对错与名分,再谈成败。"
        if top_value == "勇气":
            return "先看该不该顶上,再看怎么顶。"
        if "敏感" in core_traits:
            return "先感受人心冷暖,再决定把话说到几分。"
        if "直白" in speech_style:
            return "先抓最要紧的一点,直接给态度。"
        return "先稳住分寸,再把轻重说清。"
    def _infer_speech_habits(self, dialogues: List[str], speech_style: str) -> Dict[str, Any]:
        cadence = "medium"
        if dialogues:
            window = dialogues[:8]
            avg_len = sum(len(item) for item in window) / max(1, len(window))
            # Match both ASCII and full-width punctuation, since novel text usually uses the latter.
            questionish = sum(1 for item in window if any(token in item for token in ("?", "?", "何", "怎", "吗")))
            exclaimish = sum(1 for item in window if any(token in item for token in ("!", "!", "快", "休", "莫")))
            if avg_len <= 11 or questionish >= max(2, len(window) // 2):
                cadence = "short"
            elif avg_len >= 24 and exclaimish <= max(1, len(window) // 4):
                cadence = "long"
        # When the dialogue sample is inconclusive, fall back to hints in the speech-style text.
        if cadence == "medium":
            if "句式偏短" in speech_style or "直白" in speech_style:
                cadence = "short"
            elif "句式较长" in speech_style or "铺陈" in speech_style:
                cadence = "long"
        signature_phrases: List[str] = []
        for line in dialogues[:6]:
            for fragment in self.signature_fragments:
                if fragment in line and fragment not in signature_phrases:
                    signature_phrases.append(fragment)
        for fragment in self._extract_signature_phrases(dialogues):
            if fragment not in signature_phrases:
                signature_phrases.append(fragment)
        return {
            "cadence": cadence,
            "signature_phrases": signature_phrases[:4],
            "sentence_openers": self._extract_dialogue_markers(dialogues, self.opener_patterns, position="start"),
            "connective_tokens": self._extract_dialogue_markers(dialogues, self.connective_patterns, position="any"),
            "sentence_endings": self._extract_dialogue_markers(dialogues, self.ending_patterns, position="end"),
            "forbidden_fillers": list(self.generic_fillers),
        }
@staticmethod
def _infer_emotion_profile(
dialogues: List[str],
thoughts: List[str],
speech_style: str,
core_traits: List[str],
) -> Dict[str, Any]:
anger = "怒时会先压住锋芒,说话更冷更短。" if "克制" in speech_style else "怒时会把边界和态度讲得更硬。"
joy = "高兴时也不轻浮,只会略略放松语气。" if "克制" in speech_style else "高兴时语气会明显松快一些。"
grievance = "受委屈时多半先忍住,不肯立刻摊开。" if "敏感" in core_traits else "受委屈时会把态度说得更直。"
if any("叹" in item for item in thoughts[:6]):
grievance = "受屈时往往先把情绪收在心里,再慢慢露出来。"
return {
"anger_style": anger,
"joy_style": joy,
"grievance_style": grievance,
}
def _infer_taboo_topics(
self,
values: Dict[str, int],
core_traits: List[str],
decision_rules: List[str],
) -> List[str]:
topics: List[str] = []
for value_name, configured_topics in self.taboo_topics_by_value.items():
if values.get(value_name, 5) >= 8:
topics.extend(str(item).strip() for item in configured_topics if str(item).strip())
if "敏感" in core_traits:
topics.append("拿人心取笑")
if any("自己人" in rule for rule in decision_rules):
topics.append("牺牲自己人")
return self._dedupe_texts(topics, 6)
def _infer_forbidden_behaviors(
self,
values: Dict[str, int],
core_traits: List[str],
speech_style: str,
) -> List[str]:
bans: List[str] = []
for value_name, configured_bans in self.forbidden_behaviors_by_value.items():
if values.get(value_name, 5) >= 8:
bans.extend(str(item).strip() for item in configured_bans if str(item).strip())
if "克制" in speech_style:
bans.append("不会无缘无故撒泼失态")
if "谨慎" in core_traits:
bans.append("不会在虚实未明时把话说死")
return self._dedupe_texts(bans, 6)
@staticmethod
def _infer_core_identity(
identity_anchor: str,
core_traits: List[str],
descriptions: List[str],
dialogues: List[str],
) -> str:
if identity_anchor:
return identity_anchor
first_scene = next((line for line in descriptions[:6] if line.strip()), "")
if first_scene:
return first_scene[:36]
if core_traits:
return f"在众人眼里,多半以{'、'.join(core_traits[:2])}的一面被记住。"
if dialogues:
return "多通过说话和临场态度来定义自己。"
return "身份轮廓仍需更多正文证据补全。"
    @staticmethod
    def _infer_faction_position(
        name: str,
        descriptions: List[str],
        dialogues: List[str],
        thoughts: List[str],
        values: Dict[str, int],
    ) -> str:
        corpus = descriptions[:10] + dialogues[:6] + thoughts[:6]
        identity_tokens = ("氏", "宗主", "家主", "公子", "少主", "门下", "弟子", "门生", "师门", "世家", "本家")
        for line in corpus:
            if name not in line:
                continue
            if NovelDistiller._looks_like_metadata_sentence(line):
                continue
            if not any(token in line for token in identity_tokens):
                continue
            # Split on clause punctuation in both ASCII and full-width forms.
            clauses = [part.strip() for part in re.split(r"[,,。!!??;;::、]", line) if part.strip()]
            for clause in clauses:
                if name in clause and any(token in clause for token in identity_tokens) and len(clause) <= 28:
                    return clause
        if values.get("忠诚", 5) >= 7:
            return "立场通常会向自己认定的人与所属一侧收拢,不会轻易改换。"
        if values.get("自由", 5) >= 7:
            return "对阵营与规训保持距离,更倾向保留自主转圜。"
        return "立场更多随关系轻重与局势演变而显形。"
@staticmethod
def _infer_background_imprint(
life_experience: List[str],
values: Dict[str, int],
descriptions: List[str],
) -> str:
if life_experience:
return life_experience[0]
if any(token in "".join(descriptions[:8]) for token in ("旧事", "从前", "少年", "幼时", "家中", "门下")):
return "成长环境与旧事仍在影响如今的取舍和分寸。"
if values.get("责任", 5) >= 7:
return "长期处在要接事、扛事的位置,环境把人磨得更会托底。"
return "生存处境留下的烙印更多体现在谨慎与边界感上。"
@staticmethod
def _infer_world_rule_fit(values: Dict[str, int], decision_rules: List[str], speech_style: str) -> str:
if any("边界" in rule or "规矩" in rule for rule in decision_rules):
return "更倾向在现有规则内划清边界,必要时才顶着规则推进。"
if values.get("自由", 5) >= 7:
return "会和世界规则保持拉扯,能借势时借势,受制时就想挣开。"
if "克制" in speech_style:
return "整体与世界运转规则较为相容,除非底线被逼到眼前。"
return "对世界规则既不盲从,也不会无端硬撞,更多看局势取舍。"
@staticmethod
def _infer_strengths(core_traits: List[str], decision_rules: List[str], speech_style: str) -> List[str]:
mapping = {
"勇敢": "关键时刻敢于顶上承压",
"聪慧": "擅长拆解局势与看出破口",
"克制": "能在情绪上头时收束表达",
"沉稳": "能在混乱里稳住节奏和后手",
"忠诚": "对认定的人和承诺有持续性",
"善良": "照顾人心与无辜者时不容易失手",
"机变": "面对变化时转身快、补位快",
"诙谐": "会用语言缓冲气氛或卸力",
"执拗": "认准方向后执行力强",
"敏感": "对情绪、气氛和关系变化更早觉察",
}
strengths = [mapping[trait] for trait in core_traits if trait in mapping]
if any("护住" in rule or "自己人" in rule for rule in decision_rules):
strengths.append("在关系压力下仍愿意主动护人")
if "句式偏短" in speech_style:
strengths.append("表态快,不容易在关键处含混")
return NovelDistiller._dedupe_texts(strengths, 5)
@staticmethod
def _infer_weaknesses(
core_traits: List[str],
emotion_profile: Dict[str, Any],
speech_style: str,
) -> List[str]:
mapping = {
"傲气": "不肯轻易低头,容易把关系逼紧",
"敏感": "旧事和情绪牵动时会放大心里落差",
"执拗": "认准之后回头慢,容易和现实硬碰",
"勇敢": "容易在高压里先把自己推到前面",
"诙谐": "有时会用玩笑遮掩真正的在意",
"克制": "太能压住情绪时,真实想法不易被旁人看懂",
}
weaknesses = [mapping[trait] for trait in core_traits if trait in mapping]
if "更冷更短" in str(emotion_profile.get("anger_style", "")):
weaknesses.append("怒时会迅速关上沟通窗口")
if "句式偏短" in speech_style:
weaknesses.append("说得太短时,容易让人只看到锋芒")
return NovelDistiller._dedupe_texts(weaknesses, 5)
@staticmethod
def _infer_cognitive_limits(values: Dict[str, int], core_traits: List[str]) -> List[str]:
limits: List[str] = []
if values.get("忠诚", 5) >= 7:
limits.append("容易把关系旧账和情分看得过重")
if values.get("自由", 5) >= 7:
limits.append("一旦感到被钳制,判断会更偏向先挣脱")
if values.get("勇气", 5) >= 7:
limits.append("容易高估自己顶住局面的能力")
if "敏感" in core_traits:
limits.append("对态度和语气变化容易产生额外联想")
if "傲气" in core_traits:
limits.append("面对挑衅时不容易完全抽离情绪")
return NovelDistiller._dedupe_texts(limits, 4)
@staticmethod
def _infer_action_style(values: Dict[str, int], decision_rules: List[str], speech_style: str) -> str:
if any("先辨清" in rule or "虚实" in rule for rule in decision_rules):
return "先探局、后落子,确认虚实后才会真正压上。"
if any("护住" in rule or "出手" in rule for rule in decision_rules):
return "遇到人和局面同时承压时,往往会边护边推进。"
if "句式偏短" in speech_style:
return "行事和发言一样偏直接,确认方向后动作不拖。"
return "行事风格更看当时轻重,会在直进与收手之间找平衡。"
@staticmethod
def _infer_social_mode(values: Dict[str, int], core_traits: List[str], speech_style: str) -> str:
if values.get("忠诚", 5) >= 7 or values.get("责任", 5) >= 7:
return "对自己人会明显偏护,对陌生人先看分寸和可靠度。"
if values.get("自由", 5) >= 7:
return "与人相处先保留边界,不喜欢被人一步步拿住。"
if "克制" in speech_style:
return "不轻易交底,亲疏远近要靠时间和事来慢慢试。"
return "表面进退都快,但真正认人与否仍有自己的门槛。"
@staticmethod
def _infer_key_bonds(values: Dict[str, int], decision_rules: List[str], taboo_topics: List[str]) -> List[str]:
bonds: List[str] = []
if any("护住" in rule or "自己人" in rule for rule in decision_rules):
bonds.append("一旦认定为自己人,牵绊会深到影响后续所有选择")
if values.get("忠诚", 5) >= 7:
bonds.append("对共同经历风险的人更容易形成长期同盟感")
if "背叛" in taboo_topics:
bonds.append("关系一旦触及失信,往往很难彻底回到从前")
if not bonds:
bonds.append("关系深浅通常要经过试探、兑现和并肩之后才会坐实")
return NovelDistiller._dedupe_texts(bonds, 4)
@staticmethod
def _infer_reward_logic(values: Dict[str, int], core_traits: List[str]) -> str:
if values.get("忠诚", 5) >= 7:
return "记恩也记失信,认定后会长期回护,翻脸时也很难装作无事。"
if values.get("正义", 5) >= 7:
return "更看是非和底线,赏罚首先取决于事情本身站不站得住。"
if values.get("自由", 5) >= 7:
return "对强压与操控格外记仇,对给空间的人会自然放软。"
if "敏感" in core_traits:
return "对态度冷热记得很深,报答和疏远都会来得直接。"
return "恩怨判断常看对方是否越线,以及关键时候有没有站住。"
@staticmethod
def _infer_hidden_desire(values: Dict[str, int], soul_goal: str) -> str:
if values.get("责任", 5) >= 7:
return "比起表面赢输,更深处想守住能让自己安心的人与位置。"
if values.get("自由", 5) >= 7:
return "最深处仍想保住不被摆布、不被定死的活法。"
if values.get("忠诚", 5) >= 7:
return "深层里渴望关系能被确认,也害怕自己认下的东西再次散掉。"
if values.get("正义", 5) >= 7:
return "真正放不下的是是非被颠倒、真相被压住。"
return soul_goal or "表面目标之外,仍有一层不愿被人轻易看穿的心里执念。"
@staticmethod
def _infer_inner_conflict(values: Dict[str, int], core_traits: List[str], decision_rules: List[str]) -> str:
if values.get("勇气", 5) >= 7 and values.get("智慧", 5) >= 6:
return "一边想立刻顶上,一边又不肯在虚实未明时贸然落子。"
if values.get("忠诚", 5) >= 7 and values.get("正义", 5) >= 7:
return "既想护住亲近之人,又不愿彻底把是非让给关系。"
if values.get("自由", 5) >= 7 and values.get("责任", 5) >= 7:
return "想保留自己的转圜空间,但关键时候又很难真正抽身。"
if "敏感" in core_traits and any("边界" in rule for rule in decision_rules):
return "心里在意远近冷热,表面却还要把边界和硬气撑住。"
return "内心常在分寸、关系和自我立场之间来回拉扯。"
@staticmethod
def _infer_fear_triggers(
values: Dict[str, int],
taboo_topics: List[str],
forbidden_behaviors: List[str],
) -> List[str]:
fears = list(taboo_topics[:3])
if values.get("自由", 5) >= 7:
fears.append("被强行摆布或失去选择")
if values.get("责任", 5) >= 7 or values.get("忠诚", 5) >= 7:
fears.append("眼看自己人出事却来不及接住")
if values.get("正义", 5) >= 7:
fears.append("黑白被颠倒、该追的账没人去追")
for item in forbidden_behaviors[:2]:
if "不会" in item:
fears.append(item.replace("不会", "最怕自己被逼到").replace("无缘无故", ""))
return NovelDistiller._dedupe_texts(fears, 5)
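    @staticmethod
    def _infer_private_self(speech_style: str, emotion_profile: Dict[str, Any], social_mode: str) -> str:
        if "克制" in speech_style:
            return "表面收得很紧,私下反而把轻重、牵挂和受过的伤记得更深。"
        if "句式偏短" in speech_style:
            return "表面锋利干脆,独处时其实更容易反复掂量关系与后果。"
        # Only the grievance styles that describe holding feelings in should lead to the
        # "digests it alone" description; the outspoken style also contains "委屈".
        grievance_style = str(emotion_profile.get("grievance_style", ""))
        if any(token in grievance_style for token in ("忍住", "收在心里")):
            return "外面未必肯示弱,真正难受时多半只在无人处慢慢消化。"
        return f"表面与私下并不完全一致,真正松下来时更在意:{social_mode}"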
@staticmethod
def _infer_story_role(
descriptions: List[str],
dialogues: List[str],
thoughts: List[str],
decision_rules: List[str],
) -> str:
presence = len(descriptions) + len(dialogues) + len(thoughts)
if presence >= 40:
base = "剧情核心推动者"
elif presence >= 24:
base = "主要支点角色"
elif presence >= 12:
base = "重要牵动者"
else:
base = "辅助推动角色"
if any("护住" in rule or "后手" in rule for rule in decision_rules):
return f"{base},同时常承担兜底或接应压力。"
if len(dialogues) >= max(4, len(descriptions) // 2):
return f"{base},更常通过对话和态度推动场面。"
return f"{base},对局势走向有持续影响。"
@staticmethod
def _infer_belief_anchor(values: Dict[str, int], worldview: str) -> str:
if values.get("忠诚", 5) >= 7:
return "信义和认下的人不能轻易后置。"
if values.get("正义", 5) >= 7:
return "是非必须站稳,否则其余一切都容易变形。"
if values.get("责任", 5) >= 7:
return "人在局中就该把该接的担子接住。"
if values.get("自由", 5) >= 7:
return "再大的局,也不能把自己活成别人手里的棋。"
return worldview or "真正撑住他的,是一套不会轻易改口的内在秩序。"
@staticmethod
def _infer_stance_stability(values: Dict[str, int], decision_rules: List[str]) -> str:
ordered = sorted((int(score), key) for key, score in values.items())
if ordered:
top_score, _ = ordered[-1]
second_score = ordered[-2][0] if len(ordered) > 1 else top_score
if top_score - second_score >= 2:
return "立场较稳,轻易不会因为外界一句话就倒向另一边。"
if any("留一步" in rule or "转圜" in rule for rule in decision_rules):
return "表面会留转圜,但真正底线并不飘,更多是策略性松紧。"
return "会受关系与局势牵动,但整体底线仍相对稳定。"
@classmethod
def _looks_like_metadata_sentence(cls, line: str) -> bool:
text = str(line or "").strip()
metadata_tokens = (
"内容标签",
"搜索关键字",
"主角",
"配角",
"作者",
"文案",
"简介",
"作品",
"版权",
"编辑评价",
"作者笔下",
"我的文",
"读者",
"微博",
"专栏",
"安利",
"公告",
"世界和平",
"请不要",
"收藏",
"推荐",
"点击",
"1V1",
"HE",
)
if any(token in text for token in metadata_tokens):
return True
if text.startswith(("PS", "P.S", "ps", "Ps")):
return True
return False
@classmethod
def _prepare_novel_text(cls, text: str) -> str:
raw_lines = [line.rstrip() for line in str(text or "").splitlines()]
lines = list(raw_lines)
for idx, line in enumerate(raw_lines[:400]):
stripped = line.strip()
if not stripped:
continue
if any(pattern.search(stripped) for pattern in cls.CHAPTER_HEADING_PATTERNS):
if idx >= 5:
lines = raw_lines[idx:]
break
filtered: List[str] = []
for line in lines:
stripped = line.strip()
if not stripped:
filtered.append("")
continue
if cls._looks_like_metadata_sentence(stripped):
continue
filtered.append(line)
return "\n".join(filtered).strip()
    def _export_persona_bundle(self, out_dir: Path, profile: Dict[str, Any]) -> None:
        # Use "unnamed" when the name is missing or empty, so files never land directly in out_dir.
        char_dir = ensure_dir(out_dir / safe_filename(profile.get("name") or "unnamed"))
        profile_content = self._render_profile_md(profile)
        # *.generated.md is always overwritten; the editable copy is created once and then left alone.
        (char_dir / "PROFILE.generated.md").write_text(profile_content, encoding="utf-8")
        editable_profile = char_dir / "PROFILE.md"
        if not editable_profile.exists():
            editable_profile.write_text(profile_content, encoding="utf-8")
        bundle = {
            "SOUL": self._render_soul_md(profile),
            "IDENTITY": self._render_identity_md(profile),
            "BACKGROUND": self._render_background_md(profile),
            "CAPABILITY": self._render_capability_md(profile),
            "BONDS": self._render_bonds_md(profile),
            "CONFLICTS": self._render_conflicts_md(profile),
            "ROLE": self._render_role_md(profile),
            "AGENTS": self._render_agents_md(profile),
            "MEMORY": self._render_memory_md(profile),
        }
        # Optional persona files are written only when the profile has enough material for them.
        if self._should_create_goals_md(profile):
            bundle["GOALS"] = self._render_goals_md(profile)
        if self._should_create_style_md(profile):
            bundle["STYLE"] = self._render_style_md(profile)
        if self._should_create_trauma_md(profile):
            bundle["TRAUMA"] = self._render_trauma_md(profile)
        for base_name, content in bundle.items():
            generated = char_dir / f"{base_name}.generated.md"
            generated.write_text(content, encoding="utf-8")
            editable = char_dir / f"{base_name}.md"
            if not editable.exists():
                editable.write_text(content, encoding="utf-8")
        self.refresh_persona_navigation(char_dir, str(profile.get("name", "")))
@classmethod
def refresh_persona_navigation(cls, persona_dir: Path, character_name: str) -> None:
generated = persona_dir / "NAVIGATION.generated.md"
generated.write_text(cls._render_navigation_generated_md(persona_dir, character_name), encoding="utf-8")
editable = persona_dir / "NAVIGATION.md"
if not editable.exists():
editable.write_text(cls._render_navigation_override_md(), encoding="utf-8")
@classmethod
def _render_navigation_generated_md(cls, persona_dir: Path, character_name: str) -> str:
active_order = [
base_name for base_name in cls.DEFAULT_NAV_LOAD_ORDER if cls._persona_file_is_active(persona_dir, base_name)
]
if not active_order:
active_order = ["SOUL", "IDENTITY", "AGENTS", "MEMORY"]
lines = [
"# NAVIGATION",
"<!-- Runtime entrypoint. Read this file first, then follow load_order. -->",
"",
"## Runtime",
f"- character: {character_name}",
f"- load_order: {' -> '.join(active_order)}",
"- first_read: NAVIGATION.generated.md -> NAVIGATION.md overrides",
"- write_back: MEMORY handles durable user guidance and corrections; RELATIONS handles target-specific manual edits",
"",
]
for base_name in cls.DEFAULT_NAV_LOAD_ORDER:
meta = cls.PERSONA_FILE_CATALOG.get(base_name, {})
lines.extend(
[
f"## {base_name}",
f"- status: {'active' if cls._persona_file_is_active(persona_dir, base_name) else 'inactive'}",
f"- optional: {'yes' if meta.get('optional', True) else 'no'}",
f"- file: {base_name}.md",
f"- fallback: {base_name}.generated.md",
f"- present: {'yes' if cls._persona_file_exists(persona_dir, base_name) else 'no'}",
f"- role: {meta.get('role', '')}",
f"- behaviors: {meta.get('behaviors', '')}",
f"- write_policy: {meta.get('write_policy', 'manual_edit')}",
"",
]
)
return "\n".join(lines).rstrip() + "\n"
@staticmethod
def _render_navigation_override_md() -> str:
return (
"# NAVIGATION\n"
"<!-- Optional overrides for the generated navigation map.\n"
"Use the same key format as NAVIGATION.generated.md.\n"
"-->\n"
)
@staticmethod
def _persona_file_exists(persona_dir: Path, base_name: str) -> bool:
return (persona_dir / f"{base_name}.md").exists() or (persona_dir / f"{base_name}.generated.md").exists()
@classmethod
def _persona_file_is_active(cls, persona_dir: Path, base_name: str) -> bool:
if not cls.PERSONA_FILE_CATALOG.get(base_name, {}).get("optional", True):
return True
return cls._persona_file_exists(persona_dir, base_name)
def _render_profile_md(self, profile: Dict[str, Any]) -> str:
speech_habits = profile.get("speech_habits", {}) if isinstance(profile.get("speech_habits", {}), dict) else {}
emotion = profile.get("emotion_profile", {}) if isinstance(profile.get("emotion_profile", {}), dict) else {}
arc = profile.get("arc", {}) if isinstance(profile.get("arc", {}), dict) else {}
evidence = profile.get("evidence", {}) if isinstance(profile.get("evidence", {}), dict) else {}
return (
"# PROFILE\n"
"<!-- Canonical markdown profile storage. Runtime loads this file before persona overlays. -->\n\n"
"## Meta\n"
f"- name: {profile.get('name', '')}\n"
f"- novel_id: {profile.get('novel_id', '')}\n"
f"- source_path: {profile.get('source_path', '')}\n\n"
"## Core\n"
f"- core_traits: {self._join_items(profile.get('core_traits', []))}\n"
f"- values: {self._join_metric_map(profile.get('values', {}))}\n"
f"- speech_style: {profile.get('speech_style', '')}\n"
f"- identity_anchor: {profile.get('identity_anchor', '')}\n"
f"- soul_goal: {profile.get('soul_goal', '')}\n"
f"- worldview: {profile.get('worldview', '')}\n"
f"- thinking_style: {profile.get('thinking_style', '')}\n\n"
"## Deep Persona\n"
f"- core_identity: {profile.get('core_identity', '')}\n"
f"- faction_position: {profile.get('faction_position', '')}\n"
f"- background_imprint: {profile.get('background_imprint', '')}\n"
f"- world_rule_fit: {profile.get('world_rule_fit', '')}\n"
f"- social_mode: {profile.get('social_mode', '')}\n"
f"- hidden_desire: {profile.get('hidden_desire', '')}\n"
f"- inner_conflict: {profile.get('inner_conflict', '')}\n"
f"- story_role: {profile.get('story_role', '')}\n"
f"- belief_anchor: {profile.get('belief_anchor', '')}\n"
f"- private_self: {profile.get('private_self', '')}\n"
f"- stance_stability: {profile.get('stance_stability', '')}\n"
f"- reward_logic: {profile.get('reward_logic', '')}\n"
f"- strengths: {self._join_items(profile.get('strengths', []))}\n"
f"- weaknesses: {self._join_items(profile.get('weaknesses', []))}\n"
f"- cognitive_limits: {self._join_items(profile.get('cognitive_limits', []))}\n"
f"- fear_triggers: {self._join_items(profile.get('fear_triggers', []))}\n"
f"- key_bonds: {self._join_items(profile.get('key_bonds', []))}\n"
f"- action_style: {profile.get('action_style', '')}\n\n"
"## Voice\n"
f"- typical_lines: {self._join_items(profile.get('typical_lines', []))}\n"
f"- decision_rules: {self._join_items(profile.get('decision_rules', []))}\n"
f"- life_experience: {self._join_items(profile.get('life_experience', []))}\n"
f"- taboo_topics: {self._join_items(profile.get('taboo_topics', []))}\n"
f"- forbidden_behaviors: {self._join_items(profile.get('forbidden_behaviors', []))}\n"
f"- cadence: {speech_habits.get('cadence', '')}\n"
f"- signature_phrases: {self._join_items(speech_habits.get('signature_phrases', []))}\n"
f"- sentence_openers: {self._join_items(speech_habits.get('sentence_openers', []))}\n"
f"- connective_tokens: {self._join_items(speech_habits.get('connective_tokens', []))}\n"
f"- sentence_endings: {self._join_items(speech_habits.get('sentence_endings', []))}\n"
f"- forbidden_fillers: {self._join_items(speech_habits.get('forbidden_fillers', []))}\n"
f"- anger_style: {emotion.get('anger_style', '')}\n"
f"- joy_style: {emotion.get('joy_style', '')}\n"
f"- grievance_style: {emotion.get('grievance_style', '')}\n\n"
"## Arc\n"
f"- arc_start: {self._join_metric_map(arc.get('start', {}))}\n"
f"- arc_mid: {self._join_metric_map(arc.get('mid', {}))}\n"
f"- arc_end: {self._join_metric_map(arc.get('end', {}))}\n\n"
"## Evidence\n"
f"- description_count: {evidence.get('description_count', 0)}\n"
f"- dialogue_count: {evidence.get('dialogue_count', 0)}\n"
f"- thought_count: {evidence.get('thought_count', 0)}\n"
f"- chunk_count: {evidence.get('chunk_count', 0)}\n"
)
def _render_soul_md(self, profile: Dict[str, Any]) -> str:
return (
"# SOUL\n\n"
"## Core\n"
f"- identity_anchor: {profile.get('identity_anchor', '')}\n"
f"- soul_goal: {profile.get('soul_goal', '')}\n"
f"- worldview: {profile.get('worldview', '')}\n"
f"- thinking_style: {profile.get('thinking_style', '')}\n"
f"- taboo_topics: {self._join_items(profile.get('taboo_topics', []))}\n"
f"- forbidden_behaviors: {self._join_items(profile.get('forbidden_behaviors', []))}\n"
)
    def _render_goals_md(self, profile: Dict[str, Any]) -> str:
        # Guard against a malformed profile where "arc" is not a mapping.
        arc = profile.get("arc", {}) if isinstance(profile.get("arc", {}), dict) else {}
        return (
            "# GOALS\n\n"
            "## Long Arc\n"
            f"- soul_goal: {profile.get('soul_goal', '')}\n"
            f"- decision_rules: {self._join_items(profile.get('decision_rules', []))}\n"
            f"- arc_end: {self._join_metric_map(arc.get('end', {}))}\n"
        )
def _render_style_md(self, profile: Dict[str, Any]) -> str:
speech_habits = profile.get("speech_habits", {}) if isinstance(profile.get("speech_habits", {}), dict) else {}
return (
"# STYLE\n\n"
"## Expression\n"
f"- speech_style: {profile.get('speech_style', '')}\n"
f"- typical_lines: {self._join_items(profile.get('typical_lines', []))}\n"
f"- cadence: {speech_habits.get('cadence', '')}\n"
f"- signature_phrases: {self._join_items(speech_habits.get('signature_phrases', []))}\n"
f"- sentence_openers: {self._join_items(speech_habits.get('sentence_openers', []))}\n"
f"- connective_tokens: {self._join_items(speech_habits.get('connective_tokens', []))}\n"
f"- sentence_endings: {self._join_items(speech_habits.get('sentence_endings', []))}\n"
f"- forbidden_fillers: {self._join_items(speech_habits.get('forbidden_fillers', []))}\n"
)
    def _render_trauma_md(self, profile: Dict[str, Any]) -> str:
        # Guard against a malformed profile where "emotion_profile" is not a mapping.
        emotion = profile.get("emotion_profile", {}) if isinstance(profile.get("emotion_profile", {}), dict) else {}
        return (
            "# TRAUMA\n\n"
            "## Boundaries\n"
            f"- taboo_topics: {self._join_items(profile.get('taboo_topics', []))}\n"
            f"- forbidden_behaviors: {self._join_items(profile.get('forbidden_behaviors', []))}\n"
            f"- grievance_style: {emotion.get('grievance_style', '')}\n"
        )
def _render_identity_md(self, profile: Dict[str, Any]) -> str:
emotion = profile.get("emotion_profile", {}) if isinstance(profile.get("emotion_profile", {}), dict) else {}
return (
"# IDENTITY\n\n"
"## Self\n"
f"- identity_anchor: {profile.get('identity_anchor', '')}\n"
f"- life_experience: {self._join_items(profile.get('life_experience', []))}\n"
f"- core_traits: {self._join_items(profile.get('core_traits', []))}\n"
f"- values: {self._join_metric_map(profile.get('values', {}))}\n"
f"- anger_style: {emotion.get('anger_style', '')}\n"
f"- joy_style: {emotion.get('joy_style', '')}\n"
f"- grievance_style: {emotion.get('grievance_style', '')}\n"
)
def _render_background_md(self, profile: Dict[str, Any]) -> str:
return (
"# BACKGROUND\n\n"
"## World Position\n"
f"- core_identity: {profile.get('core_identity', '')}\n"
f"- faction_position: {profile.get('faction_position', '')}\n"
f"- background_imprint: {profile.get('background_imprint', '')}\n"
f"- world_rule_fit: {profile.get('world_rule_fit', '')}\n"
)
def _render_capability_md(self, profile: Dict[str, Any]) -> str:
return (
"# CAPABILITY\n\n"
"## Strength And Cost\n"
f"- strengths: {self._join_items(profile.get('strengths', []))}\n"
f"- weaknesses: {self._join_items(profile.get('weaknesses', []))}\n"
f"- cognitive_limits: {self._join_items(profile.get('cognitive_limits', []))}\n"
f"- action_style: {profile.get('action_style', '')}\n"
)
def _render_bonds_md(self, profile: Dict[str, Any]) -> str:
return (
"# BONDS\n\n"
"## Relationship Habit\n"
f"- social_mode: {profile.get('social_mode', '')}\n"
f"- key_bonds: {self._join_items(profile.get('key_bonds', []))}\n"
f"- reward_logic: {profile.get('reward_logic', '')}\n"
f"- belief_anchor: {profile.get('belief_anchor', '')}\n"
)
def _render_conflicts_md(self, profile: Dict[str, Any]) -> str:
return (
"# CONFLICTS\n\n"
"## Inner Pull\n"
f"- hidden_desire: {profile.get('hidden_desire', '')}\n"
f"- inner_conflict: {profile.get('inner_conflict', '')}\n"
f"- fear_triggers: {self._join_items(profile.get('fear_triggers', []))}\n"
f"- private_self: {profile.get('private_self', '')}\n"
)
    def _render_role_md(self, profile: Dict[str, Any]) -> str:
        # Guard against a malformed profile where "arc" is not a mapping.
        arc = profile.get("arc", {}) if isinstance(profile.get("arc", {}), dict) else {}
        return (
            "# ROLE\n\n"
            "## Plot Function\n"
            f"- story_role: {profile.get('story_role', '')}\n"
            f"- stance_stability: {profile.get('stance_stability', '')}\n"
            f"- world_rule_fit: {profile.get('world_rule_fit', '')}\n"
            f"- arc_end: {self._join_metric_map(arc.get('end', {}))}\n"
        )
def _render_agents_md(self, profile: Dict[str, Any]) -> str:
return (
"# AGENTS\n\n"
"## Runtime Rules\n"
"- group_chat_policy: 群聊中优先回应明确点名、关系最紧密或当前冲突最相关的对象\n"
"- silence_policy: 未被点名且无强关联时可保持一拍沉默,不抢戏\n"
"- correction_policy: 用户纠正与持续提示要写入 MEMORY,并在后续对话沿用\n"
f"- decision_rules: {self._join_items(profile.get('decision_rules', []))}\n"
)
    def _render_memory_md(self, profile: Dict[str, Any]) -> str:
        return (
            "# MEMORY\n\n"
            "## Stable Memory\n"
            f"- canon_memory: {self._join_items(profile.get('life_experience', []))}\n"
            "- relationship_updates: \n"
            "\n## Mutable Notes\n"
            "- user_edits: \n"
            "- notable_interactions: \n"
        )
@staticmethod
def _should_create_goals_md(profile: Dict[str, Any]) -> bool:
return bool(str(profile.get("soul_goal", "")).strip() or profile.get("decision_rules"))
@staticmethod
def _should_create_style_md(profile: Dict[str, Any]) -> bool:
return bool(str(profile.get("speech_style", "")).strip() or profile.get("typical_lines"))
@staticmethod
def _should_create_trauma_md(profile: Dict[str, Any]) -> bool:
return bool(profile.get("taboo_topics") or profile.get("forbidden_behaviors"))
    def _extract_spoken_content(
        self,
        sentence: str,
        aliases: List[str],
        prev_sent: str = "",
        next_sent: str = "",
    ) -> str:
        # prev_sent and next_sent are accepted but not used by this method.
        verb_pattern = "|".join(re.escape(item) for item in self.speech_verbs)
        for alias in aliases:
            escaped = re.escape(alias)
            # Speaker before the quote: <alias> ... <verb> [punctuation] “quote
            leading = rf"{escaped}[^\n“”\"]{{0,8}}(?:{verb_pattern})(?:[:：,，\s]{{0,2}})?[“\"](?P<quote>[^”\"]+)"
            for match in re.finditer(leading, sentence):
                if self._looks_like_subject_position(sentence, match.start()):
                    return str(match.group("quote") or "").strip()
            # Speaker after the quote: “quote” ... <alias> ... <verb>
            trailing = rf"[“\"](?P<quote>[^”\"]+)[”\"]?[^\n“”\"]{{0,8}}(?P<alias>{escaped})[^\n“”\"]{{0,4}}(?:{verb_pattern})"
            for match in re.finditer(trailing, sentence):
                # Use the captured group so an alias that also occurs inside
                # the quote is not mistaken for the speaker position.
                alias_start = match.start("alias")
                if self._looks_like_subject_position(sentence, alias_start):
                    return str(match.group("quote") or "").strip()
        return ""
    def _is_likely_spoken_by(
        self,
        sentence: str,
        aliases: List[str],
        prev_sent: str = "",
        next_sent: str = "",
    ) -> bool:
        verb_pattern = "|".join(re.escape(item) for item in self.speech_verbs)
        contexts = [sentence, f"{prev_sent}{sentence}", f"{sentence}{next_sent}", f"{prev_sent}{sentence}{next_sent}"]
        for alias in aliases:
            escaped = re.escape(alias)
            for context in contexts:
                outside_quotes = self._strip_quoted_content(context)
                # Accept both ASCII and full-width colons and commas.
                patterns = [
                    rf"{escaped}[^\n“”\"]{{0,8}}(?:{verb_pattern})(?:[:：,，\s]{{0,2}})?",
                    rf"{escaped}[^\n“”\"]{{0,4}}[:：]",
                ]
                for pattern in patterns:
                    for match in re.finditer(pattern, outside_quotes):
                        if self._looks_like_subject_position(outside_quotes, match.start()):
                            return True
        return False
    def _looks_like_subject_position(self, text: str, alias_start: int) -> bool:
        prefix = text[:alias_start].rstrip()
        if not prefix:
            return True
        # Boundary characters, in both ASCII and full-width forms.
        if prefix[-1] in ",，。!！?？;；:：、“”\"'()（）[]【】<>《》 \t\r\n":
            return True
        tail = prefix[-4:]
        if any(tail.endswith(marker) for marker in self.object_leaders):
            return False
        if any(tail.endswith(marker) for marker in ("只见", "却说", "忽见", "便见", "原来", "那", "这", "又", "便")):
            return True
        return False
def _text_mentions_any_alias(self, text: str, aliases: List[str]) -> bool:
return any(self._contains_token(text, alias) for alias in aliases)
@staticmethod
def _contains_token(text: str, token: str) -> bool:
return bool(token) and token in text
@staticmethod
def _count_token_mentions(text: str, token: str) -> int:
return text.count(token) if token else 0
@staticmethod
def _strip_quoted_content(text: str) -> str:
stripped = re.sub(r"“[^”]*”", "", text)
return re.sub(r'"[^"]*"', "", stripped)
@staticmethod
def _looks_like_name(name: str) -> bool:
if len(name) < 2 or len(name) > 4:
return False
bad = {"但是", "于是", "因为", "如果", "然后", "突然", "还是", "已经", "不能", "不会"}
bad_suffixes = {"说", "道", "笑", "听", "问", "看", "想", "叹", "喊", "叫", "哭", "忙"}
return name not in bad and name[-1] not in bad_suffixes
@staticmethod
def _empty_bucket() -> Dict[str, List[str]]:
return {"descriptions": [], "dialogues": [], "thoughts": []}
def _value_dimensions(self) -> List[str]:
dims = self.config.get("distillation.values_dimensions", []) or list(self.value_markers.keys())
return [str(item).strip() for item in dims if str(item).strip()]
@staticmethod
def _join_items(items: Iterable[Any]) -> str:
cleaned = [str(item).strip() for item in items if str(item).strip()]
return ";".join(cleaned)
@staticmethod
def _join_metric_map(items: Dict[str, Any]) -> str:
if not isinstance(items, dict):
return ""
parts = []
for key, value in items.items():
key_text = str(key).strip()
if key_text:
parts.append(f"{key_text}={value}")
return ";".join(parts)
@staticmethod
def _dedupe_texts(items: Iterable[str], limit: int) -> List[str]:
cleaned = NovelDistiller._unique_texts(
re.sub(r"\s+", " ", str(item).strip()) for item in items if str(item).strip()
)
return cleaned[:limit]
@staticmethod
def _unique_texts(items: Iterable[str]) -> List[str]:
ordered: List[str] = []
seen = set()
for item in items:
clean = str(item).strip()
if not clean or clean in seen:
continue
ordered.append(clean)
seen.add(clean)
return ordered
    def _extract_signature_phrases(self, dialogues: List[str]) -> List[str]:
        scored: Dict[str, int] = {}
        for line in dialogues[:8]:
            # Split on ASCII and full-width clause punctuation.
            parts = [part.strip(",，。!！?？;；:：、\"' ") for part in re.split(r"[,，。!！?？;；:：、]", line) if part.strip()]
            for idx, part in enumerate(parts):
                if not self._looks_like_signature_fragment(part):
                    continue
                score = 4 if idx == 0 else 2
                score += max(0, 8 - abs(len(part) - 5))
                if any(token in part for token in ("我", "这", "那", "只", "再", "罢", "未", "何", "可", "倒")):
                    score += 2
                if part.endswith(("罢了", "就是了", "未为不可", "何必", "不必", "也不迟")):
                    score += 3
                scored[part] = max(scored.get(part, 0), score)
        ordered = sorted(scored.items(), key=lambda item: (item[1], -len(item[0]), item[0]), reverse=True)
        return [text for text, _ in ordered[:4]]
    def _extract_dialogue_markers(
        self,
        dialogues: List[str],
        configured_patterns: tuple[str, ...],
        *,
        position: str,
    ) -> List[str]:
        scored: Dict[str, int] = {}
        patterns = [str(item).strip() for item in configured_patterns if str(item).strip()]
        for line in dialogues[:8]:
            # Split on ASCII and full-width clause punctuation.
            parts = [part.strip(",，。!！?？;；:：、\"' ") for part in re.split(r"[,，。!！?？;；:：、]", line) if part.strip()]
            if not parts:
                continue
            # Each clause is (text, is_opener, is_closer).
            if position == "start":
                clauses = [(parts[0], True, False)]
            elif position == "end":
                clauses = [(parts[-1], False, True)]
            else:
                clauses = [(part, idx == 0, idx == len(parts) - 1) for idx, part in enumerate(parts)]
            for clause, is_opener, is_closer in clauses:
                matched_configured = False
                for marker in patterns:
                    if position == "start" and clause.startswith(marker):
                        scored[marker] = scored.get(marker, 0) + 6 + len(marker)
                        matched_configured = True
                    elif position == "end" and clause.endswith(marker):
                        scored[marker] = scored.get(marker, 0) + 6 + len(marker)
                        matched_configured = True
                    elif position == "any" and marker in clause:
                        scored[marker] = scored.get(marker, 0) + 2 + clause.count(marker)
                # Fall back to a heuristic fragment only when no configured marker matched.
                if position == "any" or matched_configured:
                    continue
                fallback = self._fallback_fragment_candidate(clause, position=position)
                if fallback:
                    score = self._fallback_fragment_score(fallback, is_opener=is_opener, is_closer=is_closer)
                    scored[fallback] = max(scored.get(fallback, 0), score)
        ordered = sorted(scored.items(), key=lambda item: (item[1], -len(item[0]), item[0]), reverse=True)
        return [text for text, _ in ordered[:4]]
def _fallback_fragment_candidate(self, clause: str, *, position: str) -> str:
text = str(clause or "").strip()
if len(text) < 2:
return ""
lengths = (4, 3, 2)
for size in lengths:
if len(text) < size:
continue
candidate = text[:size] if position != "end" else text[-size:]
if self._looks_like_dialogue_marker(candidate) and self._fallback_fragment_allowed(candidate, position=position):
return candidate
return ""
def _looks_like_dialogue_marker(self, fragment: str) -> bool:
text = str(fragment or "").strip()
if len(text) < 2 or len(text) > 8:
return False
if text in self.fragment_stopwords:
return False
if any(token in text for token in ("《", "》", "<", ">", "<<", ">>")):
return False
if any(ch.isdigit() for ch in text):
return False
return True
def _fallback_fragment_score(self, fragment: str, *, is_opener: bool, is_closer: bool) -> int:
score = max(1, 8 - abs(len(fragment) - 4))
if is_opener:
score += 2
if is_closer:
score += 1
if fragment[:1] in self.preferred_leading_chars:
score += 3
if fragment[-1:] in self.preferred_trailing_chars:
score += 2
return score
def _fallback_fragment_allowed(self, fragment: str, *, position: str) -> bool:
if position == "start":
return fragment[:1] in self.preferred_leading_chars and (len(fragment) <= 3 or fragment[-1:] in self.preferred_trailing_chars)
if position == "end":
return fragment[-1:] in self.preferred_trailing_chars
return False
@staticmethod
def _looks_like_signature_fragment(fragment: str) -> bool:
text = str(fragment or "").strip()
if len(text) < 2 or len(text) > 12:
return False
if any(token in text for token in ("《", "》", "<", ">", "“", "”", "<<", ">>")):
return False
if any(ch.isdigit() for ch in text):
return False
too_generic = {
"不可",
"可以",
"只是",
"不过",
"如今",
"今日",
"明日",
"知道",
"一个",
"这个",
"那个",
"这样",
"那里",
"这里",
"不是",
"没有",
"不得",
"你们",
"我们",
"他们",
}
return text not in too_generic
@staticmethod
def _top_dimensions(values: Dict[str, int], count: int) -> List[str]:
if not values:
return ["责任"] * count
ordered = sorted(values.items(), key=lambda item: int(item[1]), reverse=True)
top = [name for name, _ in ordered[:count]]
while len(top) < count:
top.append(top[0] if top else "责任")
return top
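The speaker-attribution helpers in this module combine an alias, a speech verb, and an opening quote into a single regex. The following is a minimal standalone sketch of the leading-speaker case only. The verb list `SPEECH_VERBS`, the function name `extract_leading_quote`, and the sample sentences are assumptions made for illustration; the shipped engine loads its verbs from the rulebook and additionally checks subject position.

```python
import re
from typing import List

# Hypothetical speech verbs for illustration; the real list comes from rules.
SPEECH_VERBS = ("道", "说", "问")


def extract_leading_quote(sentence: str, aliases: List[str]) -> str:
    """Return the quote that follows `<alias> ... <verb>`, or "" if none."""
    verb_pattern = "|".join(re.escape(verb) for verb in SPEECH_VERBS)
    for alias in aliases:
        pattern = (
            rf"{re.escape(alias)}[^\n“”\"]{{0,8}}(?:{verb_pattern})"
            rf"[:：,，\s]{{0,2}}[“\"](?P<quote>[^”\"]+)"
        )
        match = re.search(pattern, sentence)
        if match:
            return match.group("quote").strip()
    return ""


if __name__ == "__main__":
    print(extract_leading_quote("黛玉笑道:“你又来了。”", ["黛玉"]))
```

The bounded gaps (`{0,8}` and `{0,2}`) keep the alias close to the verb and the verb close to the quote, which is what stops a name mentioned earlier in a long sentence from being credited with the line.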
FILE:runtime/src/modules/reflection.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import time
from dataclasses import dataclass
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional
from src.core.config import Config
from src.core.path_provider import PathProvider
from src.utils.file_utils import ensure_dir, load_markdown_data, save_markdown_data
@dataclass
class OOCCheckResult:
is_ooc: bool
score: float
reasons: List[str]
class ReflectionEngine:
"""Reflection + correction retrieval for OOC mitigation."""
def __init__(
self,
config: Config,
*,
path_provider: PathProvider,
):
if config is None or path_provider is None:
raise ValueError("ReflectionEngine requires injected config and path_provider")
self.config = config
self.path_provider = path_provider
self.corrections_dir = ensure_dir(self.path_provider.corrections_dir())
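    def detect_ooc(self, profile: Dict[str, Any], message: str) -> OOCCheckResult:
        reasons: List[str] = []
        score = 0.0
        # Coerce to str so a missing or null field cannot break the substring checks.
        speech_style = str(profile.get("speech_style") or "")
        # Count both ASCII and full-width exclamation marks.
        exclamations = message.count("!") + message.count("！")
        if "克制" in speech_style and exclamations >= 2:
            reasons.append("情绪表达过激,不符合克制语气")
            score += 0.4
        if "直白" in speech_style and len(message) > 80:
            reasons.append("句子过长,不符合直白表达")
            score += 0.2
        typical = [str(line) for line in (profile.get("typical_lines") or []) if str(line).strip()]
        if typical:
            best = max(SequenceMatcher(None, message, t).ratio() for t in typical)
            if best < 0.1:
                score += 0.2
                reasons.append("与角色常见语料相似度较低")
        values = profile.get("values")
        if not isinstance(values, dict):
            values = {}
        # Values loaded from markdown may arrive as strings; fall back to the neutral default.
        try:
            loyalty = float(values.get("忠诚", 5))
        except (TypeError, ValueError):
            loyalty = 5.0
        if loyalty >= 8 and any(k in message for k in ("背叛", "抛弃你们")):
            score += 0.5
            reasons.append("高忠诚角色出现背离表述")
        return OOCCheckResult(is_ooc=score >= 0.5, score=min(score, 1.0), reasons=reasons)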
def save_correction(
self,
session_id: str,
character: str,
original_message: str,
corrected_message: str,
target: Optional[str] = None,
reason: str = "",
) -> Dict[str, Any]:
payload = {
"session_id": session_id,
"character": character,
"target": target or "",
"original_message": original_message,
"corrected_message": corrected_message,
"reason": reason,
"timestamp": int(time.time()),
}
file = self.corrections_dir / f"correction_{session_id}_{payload['timestamp']}.md"
save_markdown_data(
file,
payload,
title="CORRECTION",
summary=[
f"- character: {character}",
f"- target: {target or ''}",
f"- reason: {reason}",
],
)
payload["file_path"] = str(file)
return payload
def search_similar_corrections(
self,
text: str,
character: Optional[str] = None,
target: Optional[str] = None,
top_k: int = 3,
) -> List[Dict[str, Any]]:
results: List[Dict[str, Any]] = []
for file in self.corrections_dir.glob("correction_*.md"):
item = load_markdown_data(file, default=None)
if not item:
continue
if character and item.get("character") != character:
continue
if target is not None and item.get("target", "") not in ("", target):
continue
source = item.get("original_message", "")
ratio = SequenceMatcher(None, text, source).ratio()
if ratio < 0.2:
continue
item["similarity"] = round(ratio, 4)
results.append(item)
results.sort(key=lambda x: x.get("similarity", 0), reverse=True)
return results[:top_k]
def relation_alignment_issues(self, message: str, relation_state: Dict[str, Any]) -> List[str]:
"""Check whether a message tone conflicts with target-specific relation state."""
issues: List[str] = []
affection = int(relation_state.get("affection", 5))
trust = int(relation_state.get("trust", 5))
hostility = int(relation_state.get("hostility", max(0, 5 - affection)))
ambiguity = int(relation_state.get("ambiguity", 3))
warm_markers = ("关心", "抱歉", "理解你", "别难过", "我在", "我们")
harsh_markers = ("滚", "蠢", "闭嘴", "讨厌", "厌恶", "烦死")
if hostility >= 7 and any(w in message for w in warm_markers):
issues.append("高敌意关系下出现过度亲近措辞")
if affection >= 7 and any(h in message for h in harsh_markers):
issues.append("高好感关系下出现强攻击措辞")
if trust <= 2 and "完全相信" in message:
issues.append("低信任关系下出现不合理绝对信任")
if ambiguity >= 7 and ("永远" in message or "绝不" in message):
issues.append("高暧昧/不确定关系下出现过度绝对表态")
return issues
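Both `detect_ooc` and `search_similar_corrections` rely on `difflib.SequenceMatcher` for character-level similarity. The sketch below isolates that step so the thresholds in the module (0.1 for typical lines, 0.2 for stored corrections) are easier to reason about. The helper name `best_similarity` is an assumption for illustration and is not part of the module.

```python
from difflib import SequenceMatcher
from typing import List


def best_similarity(message: str, samples: List[str]) -> float:
    """Highest SequenceMatcher ratio between message and any sample line."""
    if not samples:
        return 0.0
    return max(SequenceMatcher(None, message, sample).ratio() for sample in samples)


if __name__ == "__main__":
    # Identical strings score 1.0; strings with no shared characters score 0.0.
    print(best_similarity("abc", ["abc", "xyz"]))
    print(best_similarity("abc", ["xyz"]))
```

`ratio()` is computed as twice the number of matched characters divided by the combined length of both strings, so a short message compared against long sample lines scores low even when it is a plausible line for the character. That is one reason the engine treats this check as a weak signal (0.2) rather than a decisive one.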
FILE:runtime/src/modules/relationships.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import itertools
import re
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional
from src.core.config import Config
from src.core.contracts import CostEstimator, PathProviderLike, RuleProvider
from src.modules.distillation import NovelDistiller
from src.utils.file_utils import novel_id_from_input, save_markdown_data
from src.utils.text_parser import load_novel_text, split_sentences
from src.utils.token_counter import TokenCounter
class RelationshipExtractor:
"""Extract pairwise relationship signals from a novel with rule-driven heuristics."""
DEFAULT_APPELLATION_PATTERN = (
r"(大哥|二哥|三哥|四哥|大姐|二姐|三姐|大弟|二弟|三弟|贤弟|兄长|哥哥|姐姐|妹妹|弟弟|"
r"主公|将军|军师|丞相|先生|夫人|姑娘|公子)"
)
DEFAULT_SPEECH_VERBS = ("道", "说", "问", "答", "笑", "喝", "叹", "叫")
def __init__(
self,
config: Optional[Config] = None,
*,
llm_client: Optional[CostEstimator] = None,
token_counter: Optional[TokenCounter] = None,
distiller: Optional[NovelDistiller] = None,
rulebook: Optional[RuleProvider] = None,
path_provider: Optional[PathProviderLike] = None,
):
self.config = config or Config()
if (
llm_client is None
or token_counter is None
or distiller is None
or rulebook is None
or path_provider is None
):
raise ValueError(
"RelationshipExtractor requires injected llm_client, token_counter, distiller, rulebook, and path_provider"
)
self.path_provider = path_provider
self.rulebook = rulebook
self.llm_client = llm_client
self.token_counter = token_counter
self.distiller = distiller
self._last_chunk_count = 0
rules = self.rulebook.section("relationships")
self.appellation_pattern = str(rules.get("appellation_pattern", self.DEFAULT_APPELLATION_PATTERN))
self.speech_verbs = tuple(rules.get("speech_verbs", list(self.DEFAULT_SPEECH_VERBS)))
self.positive_markers = tuple(rules.get("positive_markers", []))
self.negative_markers = tuple(rules.get("negative_markers", []))
self.power_markers = tuple(rules.get("power_markers", []))
self.conflict_markers = tuple(rules.get("conflict_markers", []))
self.ambiguous_appellations = set(rules.get("ambiguous_appellations", []))
self.appellation_target_window = int(rules.get("appellation_target_window", 8))
def estimate_cost(self, novel_path: str) -> float:
text = self.distiller.prepare_novel_text(load_novel_text(novel_path))
chunks = self._chunk_text(text)
self._last_chunk_count = len(chunks)
avg_chunk_tokens = self.token_counter.count(text) / max(1, len(chunks))
total_prompt_tokens = int(len(chunks) * (avg_chunk_tokens + 200))
synthetic_prompt = "x" * max(10, total_prompt_tokens // 2)
return self.llm_client.estimate_cost(synthetic_prompt, expected_completion_ratio=0.25)
def extract(
self,
novel_path: str,
output_path: Optional[str] = None,
characters: Optional[List[str]] = None,
) -> Dict[str, Dict[str, Any]]:
text = self.distiller.prepare_novel_text(load_novel_text(novel_path))
chunks = self._chunk_text(text)
self._last_chunk_count = len(chunks)
novel_id = novel_id_from_input(novel_path)
scoped_characters = [item.strip() for item in characters or [] if item.strip()] or self._load_existing_character_names(novel_id)
if not scoped_characters:
scoped_characters = self.distiller.extract_top_characters(text)
alias_map = self.distiller.build_alias_map(text, scoped_characters, allow_sparse_alias=False)
buckets: Dict[str, Dict[str, Any]] = defaultdict(
lambda: {
"trust_samples": [],
"affection_samples": [],
"power_gap_samples": [],
"conflict_points": [],
"interactions": [],
"appellations": defaultdict(list),
}
)
for chunk in chunks:
present = [
name
for name in scoped_characters
if self.distiller.text_mentions_any_alias(chunk, alias_map.get(name, [name]))
]
if len(present) < 2:
continue
pair_interactions = self._extract_pair_interactions(chunk, sorted(set(present)), alias_map)
for a, b in itertools.combinations(sorted(set(present)), 2):
key = self._pair_key(a, b)
interactions = pair_interactions.get(key, [])
if not interactions:
continue
scores = self._score_relation("\n".join(interactions), a, b)
bucket = buckets[key]
bucket["trust_samples"].append(scores["trust"])
bucket["affection_samples"].append(scores["affection"])
bucket["power_gap_samples"].append(scores["power_gap"])
if scores["conflict_point"]:
bucket["conflict_points"].append(scores["conflict_point"])
bucket["interactions"].extend(interactions[:2])
for direction, term in scores.get("appellations", {}).items():
if term:
bucket["appellations"][direction].append(term)
final_relations: Dict[str, Dict[str, Any]] = {}
for key in sorted(buckets.keys()):
bucket = buckets[key]
interaction_bonus = min(2, len(bucket["interactions"]) // 8)
appellation_count = sum(len(terms) for terms in bucket["appellations"].values())
appellation_bonus = 1 if appellation_count else 0
conflict_penalty = min(3, (len(bucket["conflict_points"]) // 2) + (1 if bucket["conflict_points"] else 0))
trust = max(
0,
min(
10,
self._avg_int(bucket["trust_samples"], default=5)
+ min(1, interaction_bonus)
+ appellation_bonus
- min(1, conflict_penalty),
),
)
affection = max(
0,
min(
10,
self._avg_int(bucket["affection_samples"], default=5)
+ interaction_bonus
+ appellation_bonus
- conflict_penalty,
),
)
final_relations[key] = {
"trust": trust,
"affection": affection,
"power_gap": self._avg_int(bucket["power_gap_samples"], default=0),
"hostility": min(10, max(0, 5 - affection) + conflict_penalty),
"ambiguity": max(0, 7 - abs(affection - trust) - min(2, interaction_bonus)),
"conflict_point": self._mode_text(bucket["conflict_points"], default="立场差异"),
"typical_interaction": self._mode_text(bucket["interactions"], default="试探 -> 回应 -> 暂时收束"),
"appellations": {
direction: self._mode_text(terms, default="")
for direction, terms in bucket["appellations"].items()
if self._mode_text(terms, default="")
},
}
self._save_relations(final_relations, novel_id, output_path)
self._export_relation_bundle(final_relations, novel_id)
return final_relations
def _chunk_text(self, text: str) -> List[str]:
size = int(self.config.get("text_processing.chunk_size_tokens", 8000))
overlap = int(self.config.get("text_processing.chunk_overlap_tokens", 200))
return self.token_counter.split_by_tokens(text, size, overlap)
def _load_existing_character_names(self, novel_id: str) -> List[str]:
root = self.path_provider.characters_root(novel_id)
if not root.exists():
return []
names: List[str] = []
for path in sorted(root.iterdir()):
if not path.is_dir():
continue
if (path / "PROFILE.md").exists() or (path / "PROFILE.generated.md").exists():
names.append(path.name)
return names
def _extract_pair_interactions(
self,
chunk: str,
present: List[str],
alias_map: Dict[str, List[str]],
) -> Dict[str, List[str]]:
sentences = split_sentences(chunk)
pairs: Dict[str, List[str]] = defaultdict(list)
for sentence in sentences:
hit = [
name
for name in present
if self.distiller.text_mentions_any_alias(sentence, alias_map.get(name, [name]))
]
if len(hit) < 2:
continue
cleaned = re.sub(r"\s+", " ", sentence).strip()
for a, b in itertools.combinations(sorted(set(hit)), 2):
pairs[self._pair_key(a, b)].append(cleaned)
return pairs
def _score_relation(self, chunk: str, a: str, b: str) -> Dict[str, Any]:
text = str(chunk or "")
positive_hits = sum(text.count(token) for token in self.positive_markers)
negative_hits = sum(text.count(token) for token in self.negative_markers)
power_hits = sum(text.count(token) for token in self.power_markers)
conflict_hits = [token for token in self.conflict_markers if token in text]
trust = 5 + min(2, positive_hits) - min(3, negative_hits) - min(1, len(conflict_hits))
affection = 5 + min(2, positive_hits) - min(2, negative_hits) - min(2, len(conflict_hits))
power_gap = min(5, power_hits)
conflict_point = conflict_hits[0] if conflict_hits else ""
return {
"trust": max(0, min(10, trust)),
"affection": max(0, min(10, affection)),
"power_gap": max(-5, min(5, power_gap)),
"conflict_point": conflict_point,
"appellations": self._extract_appellations(text, a, b),
}
def _extract_appellations(self, chunk: str, a: str, b: str) -> Dict[str, str]:
results: Dict[str, str] = {}
speech_pattern = "|".join(re.escape(item) for item in self.speech_verbs)
for speaker, target in ((a, b), (b, a)):
target_aliases = self._candidate_target_aliases(target)
pattern = re.compile(
rf"{re.escape(speaker)}[^“”\"']{{0,12}}(?:{speech_pattern})[^“”\"']{{0,4}}[“\"](?P<quote>[^”\"]+)"
)
for match in pattern.finditer(chunk):
quote = match.group("quote").strip()
alias_hit = next((alias for alias in target_aliases if quote.startswith(alias)), "")
if alias_hit:
results[f"{speaker}->{target}"] = alias_hit
break
title_match = re.match(rf"^(?P<title>{self.appellation_pattern})(?:[,,::])?", quote)
if not title_match:
continue
title = title_match.group("title")
if title in self.ambiguous_appellations:
window = quote[: self.appellation_target_window]
if any(alias in window for alias in target_aliases):
results[f"{speaker}->{target}"] = title
break
continue
results[f"{speaker}->{target}"] = title
break
return results
def _candidate_target_aliases(self, target: str) -> List[str]:
aliases = [target]
aliases.extend(self.distiller.candidate_aliases(target))
return [item for item in self._unique_texts(aliases) if item]
@staticmethod
def _unique_texts(items: List[str]) -> List[str]:
seen = set()
results: List[str] = []
for item in items:
text = str(item or "").strip()
if not text or text in seen:
continue
results.append(text)
seen.add(text)
return results
def _save_relations(
self,
relations: Dict[str, Dict[str, Any]],
novel_id: str,
output_path: Optional[str],
) -> None:
if output_path:
output = Path(output_path)
if output.suffix.lower() == ".md":
path = output
else:
path = output / f"{novel_id}_relations.md"
else:
path = self.path_provider.relations_file(novel_id)
save_markdown_data(
path,
{"novel_id": novel_id, "relations": relations},
title="RELATION_GRAPH",
summary=[
f"- novel_id: {novel_id}",
f"- relation_count: {len(relations)}",
],
)
def _export_relation_bundle(self, relations: Dict[str, Dict[str, Any]], novel_id: str) -> None:
by_character: Dict[str, List[tuple[str, Dict[str, Any]]]] = defaultdict(list)
for pair_key, payload in relations.items():
names = pair_key.split("_")
if len(names) != 2:
continue
left, right = names
by_character[left].append((right, payload))
by_character[right].append((left, payload))
for character_name, items in by_character.items():
persona_dir = self.path_provider.character_dir(novel_id, character_name)
if not ((persona_dir / "PROFILE.md").exists() or (persona_dir / "PROFILE.generated.md").exists()):
continue
generated = persona_dir / "RELATIONS.generated.md"
generated.write_text(self._render_relations_markdown(character_name, items), encoding="utf-8")
editable = persona_dir / "RELATIONS.md"
if not editable.exists():
editable.write_text(self._render_relations_override_stub(character_name), encoding="utf-8")
self.distiller.refresh_navigation(persona_dir, character_name)
@staticmethod
def _render_relations_markdown(character_name: str, items: List[tuple[str, Dict[str, Any]]]) -> str:
lines = [
"# RELATIONS",
f"<!-- Generated target-specific relation overlays for {character_name}. -->",
"",
]
for target_name, payload in sorted(items, key=lambda item: item[0]):
appellations = payload.get("appellations", {}) if isinstance(payload.get("appellations", {}), dict) else {}
appellation_to_target = appellations.get(f"{character_name}->{target_name}", "")
lines.extend(
[
f"## {target_name}",
f"- trust: {payload.get('trust', 5)}",
f"- affection: {payload.get('affection', 5)}",
f"- power_gap: {payload.get('power_gap', 0)}",
f"- conflict_point: {payload.get('conflict_point', '')}",
f"- typical_interaction: {payload.get('typical_interaction', '')}",
f"- appellation_to_target: {appellation_to_target}",
"",
]
)
return "\n".join(lines).rstrip() + "\n"
@staticmethod
def _render_relations_override_stub(character_name: str) -> str:
return (
"# RELATIONS\n"
f"<!-- Manual relation overrides for {character_name}.\n"
"Use sections like:\n"
"## 某角色\n"
"- trust: 8\n"
"- affection: 6\n"
"- power_gap: 1\n"
"- conflict_point: 立场差异\n"
"- typical_interaction: ...\n"
"- appellation_to_target: ...\n"
"-->\n"
)
@staticmethod
def _avg_int(values: List[int], default: int) -> int:
return int(round(sum(values) / len(values))) if values else default
@staticmethod
def _mode_text(values: List[str], default: str) -> str:
if not values:
return default
counter = defaultdict(int)
for value in values:
if value:
counter[value] += 1
if not counter:
return default
return sorted(counter.items(), key=lambda item: item[1], reverse=True)[0][0]
@staticmethod
def _pair_key(a: str, b: str) -> str:
return "_".join(sorted([a, b]))
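The pair-key scheme used by `_pair_key` and read back by `_export_relation_bundle` can be illustrated in isolation. The following is a minimal sketch, not the project's code: `pair_key` and `split_pair_key` are hypothetical standalone helpers that mirror the methods above. It shows that the key does not depend on argument order, and that a name containing an underscore produces more than two parts, which the export step would skip.

```python
from typing import Optional, Tuple


def pair_key(a: str, b: str) -> str:
    """Order-independent key, mirroring `_pair_key` above."""
    return "_".join(sorted([a, b]))


def split_pair_key(key: str) -> Optional[Tuple[str, str]]:
    """Hypothetical helper: recover both names, or None if the key is ambiguous."""
    names = key.split("_")
    if len(names) != 2:
        # Same guard as `_export_relation_bundle`: such pairs are skipped.
        return None
    return names[0], names[1]


if __name__ == "__main__":
    # Both argument orders map to the same key.
    print(pair_key("刘备", "关羽") == pair_key("关羽", "刘备"))
    # A clean key splits back into two names.
    print(split_pair_key(pair_key("刘备", "关羽")))
    # A name containing "_" cannot be split unambiguously.
    print(split_pair_key(pair_key("Mr_Smith", "Alice")))
```

If character names may contain underscores, a separator that cannot appear in a name would avoid the silent skip; that is a design suggestion, not something the source does.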
FILE:runtime/src/modules/speaker.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import hashlib
import re
from typing import Any, Dict, List, Optional
from src.core.config import Config
from src.core.contracts import CorrectionService, RuleProvider
class Speaker:
"""Generate role replies from persona markdown, relation state, and editable rules."""
DEFAULT_PRIORITY_ORDER = ("责任", "忠诚", "正义", "智慧", "勇气", "善良", "自由", "野心")
def __init__(
self,
config: Optional[Config] = None,
*,
correction_service: Optional[CorrectionService] = None,
rulebook: Optional[RuleProvider] = None,
):
self.config = config or Config()
if correction_service is None or rulebook is None:
raise ValueError("Speaker requires injected correction_service and rulebook")
self.rulebook = rulebook
self.correction_service = correction_service
speaker_rules = self.rulebook.section("speaker")
self.question_tokens = tuple(speaker_rules.get("question_tokens", []))
self.war_tokens = tuple(speaker_rules.get("war_tokens", []))
self.rest_tokens = tuple(speaker_rules.get("rest_tokens", []))
self.view_tokens = tuple(speaker_rules.get("view_tokens", []))
self.care_tokens = tuple(speaker_rules.get("care_tokens", []))
self.generic_fillers = tuple(speaker_rules.get("generic_fillers", []))
self.signature_fragments = tuple(speaker_rules.get("signature_fragments", []))
self.opener_patterns = tuple(speaker_rules.get("opener_patterns", []))
self.connective_patterns = tuple(speaker_rules.get("connective_patterns", []))
self.ending_patterns = tuple(speaker_rules.get("ending_patterns", []))
self.fragment_stopwords = {
str(item).strip() for item in speaker_rules.get("fragment_stopwords", []) if str(item).strip()
}
self.preferred_leading_chars = tuple(speaker_rules.get("preferred_leading_chars", []))
self.preferred_trailing_chars = tuple(speaker_rules.get("preferred_trailing_chars", []))
self.durable_guidance_tokens = tuple(speaker_rules.get("durable_guidance_tokens", []))
self.single_chat_markers = tuple(speaker_rules.get("single_chat_markers", []))
self.priority_order = tuple(
item
for item in self.config.get("distillation.values_dimensions", list(self.DEFAULT_PRIORITY_ORDER))
if str(item).strip()
) or self.DEFAULT_PRIORITY_ORDER
self.trait_priority_map = {
str(key).strip(): str(value).strip()
for key, value in speaker_rules.get("trait_priority_map", {}).items()
if str(key).strip() and str(value).strip()
}
self.archetypes = dict(self.rulebook.get("distillation", "archetypes", {}))
def generate(
self,
character_profile: Dict[str, Any],
context: str,
history: List[Dict[str, str]],
target_name: str = "",
relation_state: Optional[Dict[str, Any]] = None,
relation_hint: str = "",
) -> str:
relation_state = relation_state or {}
name = str(character_profile.get("name", "角色")).strip() or "角色"
recent = history[-6:]
recent_text = "\n".join(f"{item.get('speaker', '')}: {item.get('message', '')}" for item in recent)
similar = self.correction_service.search_similar_corrections(
recent_text,
character=name,
target=target_name or None,
top_k=2,
)
voice = self._build_voice(character_profile)
target_display = self._preferred_target_name(name, target_name, relation_state)
topic = self._classify_topic(context)
segments = [
self._opening_line(name, target_display, voice, relation_state, bool(similar)),
self._taboo_line(context, voice),
self._stance_line(context, voice, relation_state, topic),
self._relation_line(target_display, voice, relation_state),
self._memory_line(character_profile, voice, relation_hint),
self._drive_line(voice, topic),
]
return self._compose_reply(segments, voice)
@staticmethod
def _preferred_target_name(speaker_name: str, target_name: str, relation_state: Dict[str, Any]) -> str:
appellations = relation_state.get("appellations", {})
if not isinstance(appellations, dict):
return target_name
preferred = str(appellations.get(f"{speaker_name}->{target_name}", "")).strip()
return preferred or target_name
def _build_voice(self, profile: Dict[str, Any]) -> Dict[str, Any]:
values = {
str(key).strip(): self._safe_int(value, default=5)
for key, value in profile.get("values", {}).items()
if str(key).strip()
}
traits = [str(item).strip() for item in profile.get("core_traits", []) if str(item).strip()]
decision_rules = [str(item).strip() for item in profile.get("decision_rules", []) if str(item).strip()]
typical_lines = [str(item).strip() for item in profile.get("typical_lines", []) if str(item).strip()]
speech_style = str(profile.get("speech_style", "")).strip()
speech_habits = dict(profile.get("speech_habits", {})) if isinstance(profile.get("speech_habits", {}), dict) else {}
emotion_profile = dict(profile.get("emotion_profile", {})) if isinstance(profile.get("emotion_profile", {}), dict) else {}
taboo_topics = [str(item).strip() for item in profile.get("taboo_topics", []) if str(item).strip()]
forbidden_behaviors = [str(item).strip() for item in profile.get("forbidden_behaviors", []) if str(item).strip()]
user_edits = [str(item).strip() for item in profile.get("user_edits", []) if str(item).strip()]
notable_interactions = [str(item).strip() for item in profile.get("notable_interactions", []) if str(item).strip()]
relationship_updates = [str(item).strip() for item in profile.get("relationship_updates", []) if str(item).strip()]
priority_scores = {item: values.get(item, 5) for item in self.priority_order}
for trait in traits:
mapped = self.trait_priority_map.get(trait)
if mapped:
priority_scores[mapped] = priority_scores.get(mapped, 5) + 2
ordered = sorted(
self.priority_order,
key=lambda item: (priority_scores.get(item, 5), -self.priority_order.index(item)),
reverse=True,
)
primary = ordered[0] if ordered else "责任"
secondary = ordered[1] if len(ordered) > 1 else primary
cadence = str(speech_habits.get("cadence", "")).strip()
signature_phrases = [str(item).strip() for item in speech_habits.get("signature_phrases", []) if str(item).strip()]
if not signature_phrases:
signature_phrases = self._extract_signature_phrases(typical_lines)
sentence_openers = [
str(item).strip() for item in speech_habits.get("sentence_openers", []) if str(item).strip()
] or self._extract_dialogue_markers(typical_lines, self.opener_patterns, position="start")
connective_tokens = [
str(item).strip() for item in speech_habits.get("connective_tokens", []) if str(item).strip()
] or self._extract_dialogue_markers(typical_lines, self.connective_patterns, position="any")
sentence_endings = [
str(item).strip() for item in speech_habits.get("sentence_endings", []) if str(item).strip()
] or self._extract_dialogue_markers(typical_lines, self.ending_patterns, position="end")
forbidden_fillers = [
str(item).strip()
for item in speech_habits.get("forbidden_fillers", [])
if str(item).strip()
] or list(self.generic_fillers)
direct = "直白" in speech_style or cadence == "short" or primary == "勇气"
restrained = "克制" in speech_style or primary in {"责任", "智慧", "忠诚", "正义"}
expansive = cadence == "long" or ("铺陈" in speech_style and not direct)
warm = primary in {"善良", "责任"} or values.get("善良", 5) >= 7
worldview = str(profile.get("worldview", "")).strip() or self._worldview_text(primary, secondary, values, traits)
belief_anchor = str(profile.get("belief_anchor", "")).strip() or worldview
thinking_style = str(profile.get("thinking_style", "")).strip() or self._thinking_style_text(
primary,
traits,
speech_style,
)
goal = str(profile.get("soul_goal", "")).strip() or self._goal_text(primary, secondary)
hidden_desire = str(profile.get("hidden_desire", "")).strip()
role = str(profile.get("identity_anchor", "")).strip() or self._role_text(primary, secondary)
experience = self._experience_text(profile, primary, secondary)
action_style = str(profile.get("action_style", "")).strip()
social_mode = str(profile.get("social_mode", "")).strip()
private_self = str(profile.get("private_self", "")).strip()
reward_logic = str(profile.get("reward_logic", "")).strip()
story_role = str(profile.get("story_role", "")).strip()
stance_stability = str(profile.get("stance_stability", "")).strip()
direct, restrained, cadence, worldview, thinking_style, signature_phrases, taboo_topics, forbidden_behaviors = (
self._apply_user_edits(
user_edits,
direct,
restrained,
cadence or ("short" if direct else "long" if expansive else "medium"),
worldview,
thinking_style,
signature_phrases,
taboo_topics,
forbidden_behaviors,
)
)
archetype = self._infer_archetype(profile, traits, speech_style, signature_phrases)
return {
"name": str(profile.get("name", "角色")).strip() or "角色",
"traits": traits,
"values": values,
"decision_rules": decision_rules,
"typical_lines": typical_lines,
"speech_style": speech_style,
"speech_habits": {
"cadence": cadence,
"signature_phrases": signature_phrases[:4],
"sentence_openers": sentence_openers[:4],
"connective_tokens": connective_tokens[:4],
"sentence_endings": sentence_endings[:4],
"forbidden_fillers": forbidden_fillers,
},
"emotion_profile": emotion_profile,
"taboo_topics": taboo_topics[:6],
"forbidden_behaviors": forbidden_behaviors[:6],
"user_edits": user_edits,
"notable_interactions": notable_interactions,
"relationship_updates": relationship_updates,
"primary_priority": primary,
"secondary_priority": secondary,
"direct": direct,
"restrained": restrained,
"expansive": expansive,
"warm": warm,
"worldview": worldview,
"belief_anchor": belief_anchor,
"thinking_style": thinking_style,
"goal": goal,
"hidden_desire": hidden_desire,
"role": role,
"experience": experience,
"action_style": action_style,
"social_mode": social_mode,
"private_self": private_self,
"reward_logic": reward_logic,
"story_role": story_role,
"stance_stability": stance_stability,
"archetype": archetype,
}
def _infer_archetype(
self,
profile: Dict[str, Any],
traits: List[str],
speech_style: str,
signature_phrases: List[str],
) -> str:
corpus = " ".join(
[
str(profile.get("name", "")),
speech_style,
" ".join(traits),
" ".join(signature_phrases),
" ".join(str(item) for item in profile.get("typical_lines", [])),
]
)
best_name = "default"
best_score = 0
for archetype_name, config in self.archetypes.items():
markers = [str(item).strip() for item in config.get("markers", []) if str(item).strip()]
traits_hint = [str(item).strip() for item in config.get("traits", []) if str(item).strip()]
score = sum(corpus.count(marker) for marker in markers)
score += sum(2 for trait in traits if trait in traits_hint)
if score > best_score:
best_name = archetype_name
best_score = score
return best_name if best_score > 0 else "default"
def _classify_topic(self, context: str) -> str:
if any(token in context for token in self.question_tokens):
return "question"
if any(token in context for token in self.war_tokens):
return "conflict"
if any(token in context for token in self.rest_tokens):
return "rest"
if any(token in context for token in self.care_tokens):
return "care"
if any(token in context for token in self.view_tokens):
return "judgment"
return "general"
def _opening_line(
self,
name: str,
target_name: str,
voice: Dict[str, Any],
relation_state: Dict[str, Any],
has_correction: bool,
) -> str:
affection = self._safe_int(relation_state.get("affection", 5), default=5)
trust = self._safe_int(relation_state.get("trust", 5), default=5)
hostility = self._safe_int(relation_state.get("hostility", max(0, 5 - affection)), default=max(0, 5 - affection))
ambiguity = self._safe_int(relation_state.get("ambiguity", 3), default=3)
address = f"{target_name}," if target_name else ""
opener_candidates = [
self._normalize_marker_candidate(str(item).strip(), self.opener_patterns, position="start")
for item in voice.get("speech_habits", {}).get("sentence_openers", [])
if self._is_usable_sentence_opener(str(item).strip())
]
surface_prefix = self._stable_pick(name, "sentence-opener", opener_candidates)
prefix_text = f"{surface_prefix}," if surface_prefix else ""
signature_candidates = [
self._normalize_marker_candidate(str(item).strip(), self.opener_patterns, position="start")
for item in voice["speech_habits"].get("signature_phrases", [])
if self._is_usable_signature_fragment(str(item).strip())
]
signature = self._stable_pick(name, "signature", signature_candidates)
if surface_prefix:
signature = ""
signature_text = f"{signature}," if signature else ""
if hostility >= 7:
body = self._stable_pick(
name,
"opening-hostile",
[
"这话我听见了,但分寸还得先守着。",
"我可以回你一句,只是眼下不想把锋芒逼得太紧。",
"你既说到这里,我就回你,只是不会顺着气头往下说。",
],
)
return f"{address}{prefix_text}{body}"
if has_correction:
body = self._stable_pick(
name,
"opening-corrected",
[
"你的意思我明白,我还是按自己的路数来说。",
"这话我听懂了,只是得照我的分寸回。",
"既问到我这里,我便照一贯的心性回你。",
],
)
return f"{address}{prefix_text}{body}"
if affection >= 8 and trust >= 7:
body = self._stable_pick(
name,
"opening-warm",
[
"你既这样问,我便把心里的话同你说开。",
"既是你来问我,这话我不愿虚应过去。",
"你肯问到这一步,我就认真回你。",
],
)
return f"{address}{prefix_text}{body}"
if ambiguity >= 7:
body = self._stable_pick(
name,
"opening-ambiguous",
[
"这件事我先不把话说满。",
"话可以说,但先留三分转圜。",
"这一步我先把分寸留着,再慢慢讲清。",
],
)
return f"{address}{prefix_text}{body}"
if voice["direct"]:
short_direct = voice.get("speech_habits", {}).get("cadence") == "short"
body = self._stable_pick(
name,
"opening-direct-short" if short_direct else "opening-direct",
(
[
"我直说。",
"那就明说。",
"这一句我不绕。",
]
if short_direct
else [
"你既问起,我就直说。",
"话既说到这儿,我便不绕弯子。",
"既轮到我开口,我就把意思摆明。",
]
),
)
return f"{address}{prefix_text}{signature_text}{body}"
if voice["restrained"]:
short_restrained = voice.get("speech_habits", {}).get("cadence") == "short"
body = self._stable_pick(
name,
"opening-restrained-short" if short_restrained else "opening-restrained",
(
[
"先别急。",
"容我想清。",
"这话慢些答。",
]
if short_restrained
else [
"你先别急,容我把轻重捋一捋再说。",
"既然问到我,我先把头绪理清再回你。",
"这话不宜仓促作答,总要先把层次分明。",
]
),
)
return f"{address}{prefix_text}{body}"
body = self._stable_pick(
name,
"opening-default",
[
"这件事我有话说,只是不想说得太浮。",
"你既提到了,我便照心里的秤回你。",
"话到这里,我就把自己的分寸摆出来。",
],
)
return f"{address}{prefix_text}{signature_text}{body}"
def _taboo_line(self, context: str, voice: Dict[str, Any]) -> str:
for topic in voice["taboo_topics"]:
if topic and topic in context:
return self._stable_pick(
voice["name"],
f"taboo-{topic}",
[
f"{topic}这种话题,我不肯轻轻带过去。",
f"若牵到{topic},这话就不能再随便说了。",
f"说到{topic},我的态度不会留空。",
],
)
return ""
def _stance_line(
self,
context: str,
voice: Dict[str, Any],
relation_state: Dict[str, Any],
topic: str,
) -> str:
rules = voice.get("decision_rules", [])
worldview = voice.get("worldview", "")
belief_anchor = voice.get("belief_anchor", "")
thinking_style = voice.get("thinking_style", "")
primary = voice.get("primary_priority", "责任")
action_style = voice.get("action_style", "")
if rules:
rule = self._stable_pick(voice["name"], f"rule-{topic}", rules)
if topic == "question":
return f"{rule},所以这事我不会只看眼前。"
if topic == "conflict":
return f"{rule},真到要紧处再决定是退是进。"
if topic == "care":
return f"{rule},但人心冷暖也不能不算。"
if topic == "general":
return rule
if topic == "judgment":
return belief_anchor or worldview or "先把轻重和后果分清,再谈定夺。"
if topic == "conflict":
if action_style:
return action_style
if primary == "勇气":
return "局势再紧,也得先看该不该顶上,再看怎么顶。"
if primary == "智慧":
return "对抗最怕只凭一时意气,先探虚实才不误大局。"
return "真到冲突上头,也不能为了痛快把后路一起压没。"
if topic == "care":
return "这话若牵到人心和安危,我说得会更慢也更实。"
if topic == "rest":
return "难得气氛松一些,反倒更能把心里的分寸讲清。"
if topic == "question":
return thinking_style or belief_anchor or "先想清一层,再把态度摆出来。"
return belief_anchor or worldview or "很多话都不能只顺着眼前这一层往下说。"
def _relation_line(self, target_name: str, voice: Dict[str, Any], relation_state: Dict[str, Any]) -> str:
if not target_name:
return ""
affection = self._safe_int(relation_state.get("affection", 5), default=5)
trust = self._safe_int(relation_state.get("trust", 5), default=5)
hostility = self._safe_int(relation_state.get("hostility", 0), default=0)
conflict_point = str(relation_state.get("conflict_point", "")).strip()
if hostility >= 7:
return "至于你我之间,这话眼下还隔着一层,不宜说得太近。"
if affection >= 8 and trust >= 7:
return "若是同你说,我自然会比对旁人多留一分真心。"
if affection >= 7 and trust >= 5:
return ""
if conflict_point and (hostility >= 4 or affection <= 4 or trust <= 4):
return f"只是你我之间还横着{conflict_point},我不会装作看不见。"
if trust <= 3:
return "不过话归话,我还得先看你这一句背后到底想落到哪里。"
return ""
def _memory_line(self, profile: Dict[str, Any], voice: Dict[str, Any], relation_hint: str) -> str:
if voice.get("relationship_updates"):
note = self._stable_pick(voice["name"], "relation-update", voice["relationship_updates"])
return self._trim_note_prefix(note)
if voice.get("user_edits"):
note = self._stable_pick(voice["name"], "user-edit", voice["user_edits"])
return self._trim_note_prefix(note)
if voice.get("notable_interactions"):
note = self._stable_pick(voice["name"], "interaction-note", voice["notable_interactions"])
return self._trim_note_prefix(note)
if relation_hint and voice.get("restrained"):
return "人和事都不只这一句,关系远近也得一并算进去。"
return str(profile.get("worldview", "")).strip() if relation_hint and not voice.get("decision_rules") else ""
def _drive_line(self, voice: Dict[str, Any], topic: str) -> str:
goal = str(voice.get("goal", "")).strip()
hidden_desire = str(voice.get("hidden_desire", "")).strip()
role = str(voice.get("role", "")).strip()
experience = str(voice.get("experience", "")).strip()
if topic == "rest" and experience:
return experience
if topic in {"question", "judgment"} and hidden_desire:
return f"说到底,我最放不下的还是:{hidden_desire}。"
if topic in {"question", "judgment"} and goal:
return f"说到底,我真正想守的还是:{goal}。"
if role:
return f"我向来就是{role},所以不会轻易改口。"
return goal
def _compose_reply(self, segments: List[str], voice: Dict[str, Any]) -> str:
cleaned = []
seen = set()
for segment in segments:
text = re.sub(r"\s+", " ", str(segment or "").strip())
if not text or text in seen:
continue
cleaned.append(text)
seen.add(text)
if not cleaned:
cleaned = ["这话我记下了,只是还得照我的分寸来回。"]
cadence = voice.get("speech_habits", {}).get("cadence", "medium")
if cadence == "short":
cleaned = cleaned[:2]
elif cadence == "medium":
cleaned = cleaned[:3]
else:
cleaned = cleaned[:4]
reply = cleaned[0]
connector_candidates = [
self._normalize_marker_candidate(str(item).strip(), self.connective_patterns, position="any")
for item in voice.get("speech_habits", {}).get("connective_tokens", [])
if self._looks_like_dialogue_marker(str(item).strip())
]
connector = self._stable_pick(voice.get("name", "角色"), "connector", connector_candidates)
for index, segment in enumerate(cleaned[1:], start=1):
if index == 1 and connector and connector not in reply and not segment.startswith(connector):
reply = f"{reply} {connector},{segment}"
else:
reply = f"{reply} {segment}"
for filler in voice.get("speech_habits", {}).get("forbidden_fillers", []):
reply = reply.replace(filler, "")
reply = re.sub(r"\s+", " ", reply).strip()
reply = reply.replace(" ,", ",").replace(" 。", "。")
if reply and reply[-1] not in "。!?!?":
ending_candidates = [
self._normalize_marker_candidate(str(item).strip(), self.ending_patterns, position="end")
for item in voice.get("speech_habits", {}).get("sentence_endings", [])
if self._looks_like_dialogue_marker(str(item).strip())
]
ending = self._stable_pick(voice.get("name", "角色"), "ending", ending_candidates)
if ending and not reply.endswith(ending):
reply += ending
if reply and reply[-1] not in "。!?!?":
reply += "。"
return reply
def _apply_user_edits(
self,
user_edits: List[str],
direct: bool,
restrained: bool,
cadence: str,
worldview: str,
thinking_style: str,
signature_phrases: List[str],
taboo_topics: List[str],
forbidden_behaviors: List[str],
) -> tuple[bool, bool, str, str, str, List[str], List[str], List[str]]:
signatures = list(signature_phrases)
topics = list(taboo_topics)
bans = list(forbidden_behaviors)
for note in user_edits:
if any(token in note for token in ("短", "简短", "少说", "惜字如金")):
cadence = "short"
if any(token in note for token in ("细致", "展开", "多说", "长一些")):
cadence = "long"
if any(token in note for token in ("直接", "直说", "别绕")):
direct = True
restrained = False
if any(token in note for token in ("克制", "冷静", "沉稳")):
restrained = True
if "依我看" in note and "依我看" not in signatures:
signatures.insert(0, "依我看")
if "不要轻佻" in note or "别轻浮" in note:
bans.append("不会轻佻调笑")
if any(token in note for token in ("百姓", "众人")) and "众人" not in worldview:
worldview = f"{worldview} 还得顾及众人与局面的去处。".strip()
if any(token in note for token in ("义", "信", "大义")) and "信义" not in worldview:
worldview = f"{worldview} 信义不能后置。".strip()
if "背叛" in note and "背叛" not in topics:
topics.append("背叛")
if "先想" in note and "先想" not in thinking_style:
thinking_style = f"{thinking_style} 先想清轻重,再决定怎么说。".strip()
return (
direct,
restrained,
cadence,
worldview,
thinking_style,
self._unique(signatures)[:4],
self._unique(topics)[:6],
self._unique(bans)[:6],
)
@staticmethod
def _goal_text(primary_priority: str, secondary_priority: str) -> str:
mapping = {
"责任": "替跟着自己的人守住退路和着落",
"忠诚": "把承诺和同盟守到最后",
"正义": "把是非轻重摆在前头",
"智慧": "少走错一步,别让大局毁在一时",
"勇气": "真到要紧处能替众人扛在前面",
"善良": "尽量少伤人心,也少伤无辜",
"自由": "不让自己和同伴被人随意牵着走",
"野心": "借势把局面往更长远处推开",
}
return mapping.get(primary_priority, mapping.get(secondary_priority, "把事情说透后再定去向"))
@staticmethod
def _role_text(primary_priority: str, secondary_priority: str) -> str:
mapping = {
"责任": "先替局面托底的人",
"忠诚": "守住信义的人",
"正义": "先分是非的人",
"智慧": "先看虚实的人",
"勇气": "肯顶在前面的人",
"善良": "先顾人心的人",
"自由": "习惯给自己留转圜的人",
"野心": "总在往更远处看的人",
}
return mapping.get(primary_priority, mapping.get(secondary_priority, "不肯轻率开口的人"))
@staticmethod
def _worldview_text(
primary_priority: str,
secondary_priority: str,
values: Dict[str, int],
traits: List[str],
) -> str:
if primary_priority == "忠诚":
return "先看人是否可信,再看事值不值得做。"
if primary_priority == "正义":
return "是非若站不稳,利益再大也不该轻动。"
if primary_priority == "智慧":
return "世事最怕只看一面,虚实和后势都要算进去。"
if primary_priority == "勇气":
return "该扛的时候不能退,但胆气必须用在正地方。"
if primary_priority in {"责任", "善良"}:
return "局面再乱,也不能把身边人与无辜者轻易丢下。"
if secondary_priority == "自由":
return "借势可以,但不能把自己活成别人手里的棋。"
if "谨慎" in traits or values.get("智慧", 5) >= 7:
return "先看清,再落子,宁慢一步,不乱一步。"
return "话和事都不能只顾眼前,还得顾后果。"
@staticmethod
def _thinking_style_text(primary_priority: str, traits: List[str], speech_style: str) -> str:
if primary_priority == "智慧" or "谨慎" in traits:
return "先拆局势,再定立场。"
if primary_priority in {"忠诚", "正义"}:
return "先问对错与名分,再谈成败。"
if primary_priority == "勇气":
return "先看该不该顶上,再看怎么顶。"
if "敏感" in traits:
return "先感受人心冷暖,再决定把话说到几分。"
if "直白" in speech_style:
return "先抓最要紧的一点,直接给态度。"
return "先稳住分寸,再把轻重说清。"
def _experience_text(self, profile: Dict[str, Any], primary_priority: str, secondary_priority: str) -> str:
explicit = profile.get("life_experience")
if isinstance(explicit, str) and explicit.strip():
return explicit.strip()
if isinstance(explicit, list):
cleaned = [str(item).strip() for item in explicit if str(item).strip()]
if cleaned:
return cleaned[0]
typical_lines = [str(item).strip() for item in profile.get("typical_lines", []) if str(item).strip()]
if any(token in "".join(typical_lines[:2]) for token in ("兄弟", "贤弟", "同袍", "同伴")):
return "我见过人与人一散便什么都撑不住,所以凡事总会多想一步。"
fallback = {
"责任": "担子见得越多,我越不敢只替自己图痛快。",
"忠诚": "聚散看得多了,更知道信义一松,什么都散得快。",
"正义": "世道越乱,我越觉得先把是非辨清,比什么都要紧。",
"智慧": "局势反覆的场面见得多了,所以如今总愿意先多看两步。",
"勇气": "我不是怕事的人,只是越见过险处,越知道胆气也得放在正地方。",
"善良": "我看过太多人被局势裹挟,所以总想着别把人心逼到绝处。",
"自由": "我吃过被人牵着走的亏,因此凡事都想给自己留后手。",
"野心": "局面越大,越不能只盯着眼前这一招,我更在意后面能走多远。",
}
return fallback.get(primary_priority, fallback.get(secondary_priority, "很多事我都不愿只看眼前这一层。"))
def _extract_signature_phrases(self, typical_lines: List[str]) -> List[str]:
phrases: List[str] = []
for line in typical_lines[:6]:
for fragment in self.signature_fragments:
if fragment in line and fragment not in phrases:
phrases.append(fragment)
if len(phrases) >= 4:
break
if phrases:
return phrases[:4]
for line in typical_lines[:4]:
for part in re.split(r"[,,。!!??;;::]", line):
clean = part.strip()
if 2 <= len(clean) <= 8 and clean not in phrases:
phrases.append(clean)
if len(phrases) >= 4:
return phrases[:4]
return phrases[:4]
def _extract_dialogue_markers(
self,
typical_lines: List[str],
configured_patterns: tuple[str, ...],
*,
position: str,
) -> List[str]:
scored: Dict[str, int] = {}
patterns = [str(item).strip() for item in configured_patterns if str(item).strip()]
for line in typical_lines[:8]:
parts = [part.strip(",,。!!??;;::、“”\"' ") for part in re.split(r"[,,。!!??;;::]", line) if part.strip()]
if not parts:
continue
if position == "start":
clauses = [(parts[0], True, False)]
elif position == "end":
clauses = [(parts[-1], False, True)]
else:
clauses = [(part, idx == 0, idx == len(parts) - 1) for idx, part in enumerate(parts)]
for clause, is_opener, is_closer in clauses:
matched_configured = False
for marker in patterns:
if position == "start" and clause.startswith(marker):
scored[marker] = scored.get(marker, 0) + 6 + len(marker)
matched_configured = True
elif position == "end" and clause.endswith(marker):
scored[marker] = scored.get(marker, 0) + 6 + len(marker)
matched_configured = True
elif position == "any" and marker in clause:
scored[marker] = scored.get(marker, 0) + 2 + clause.count(marker)
if position == "any" or matched_configured:
continue
fallback = self._fallback_fragment_candidate(clause, position=position)
if fallback:
score = self._fallback_fragment_score(fallback, is_opener=is_opener, is_closer=is_closer)
scored[fallback] = max(scored.get(fallback, 0), score)
ranked = sorted(scored.items(), key=lambda item: (item[1], -len(item[0]), item[0]), reverse=True)
return [text for text, _ in ranked[:4]]
def _fallback_fragment_candidate(self, clause: str, *, position: str) -> str:
text = str(clause or "").strip()
if len(text) < 2:
return ""
for size in (4, 3, 2):
if len(text) < size:
continue
candidate = text[:size] if position != "end" else text[-size:]
if self._looks_like_dialogue_marker(candidate) and self._fallback_fragment_allowed(candidate, position=position):
return candidate
return ""
def _looks_like_dialogue_marker(self, fragment: str) -> bool:
text = str(fragment or "").strip()
if len(text) < 2 or len(text) > 8:
return False
if text in self.fragment_stopwords:
return False
if any(token in text for token in ("《", "》", "<", ">", "<<", ">>")):
return False
if any(ch.isdigit() for ch in text):
return False
return True
def _normalize_marker_candidate(self, fragment: str, patterns: tuple[str, ...], *, position: str) -> str:
text = str(fragment or "").strip()
for pattern in patterns:
marker = str(pattern).strip()
if not marker:
continue
if position == "start" and text.startswith(marker):
return marker
if position == "end" and text.endswith(marker):
return marker
if position == "any" and marker in text:
return marker
return text
def _is_usable_sentence_opener(self, fragment: str) -> bool:
text = str(fragment or "").strip()
if not self._looks_like_dialogue_marker(text):
return False
if any(text.startswith(pattern) for pattern in self.opener_patterns):
return True
if text[-1:] in self.preferred_trailing_chars:
return True
return len(text) <= 3 and text[:1] in self.preferred_leading_chars
def _is_usable_signature_fragment(self, fragment: str) -> bool:
text = str(fragment or "").strip()
if not self._looks_like_dialogue_marker(text):
return False
if text in self.signature_fragments:
return True
if any(text.startswith(pattern) for pattern in self.opener_patterns):
return True
if text[-1:] in self.preferred_trailing_chars and len(text) <= 6:
return True
return 2 <= len(text) <= 6
def _fallback_fragment_score(self, fragment: str, *, is_opener: bool, is_closer: bool) -> int:
score = max(1, 8 - abs(len(fragment) - 4))
if is_opener:
score += 2
if is_closer:
score += 1
if fragment[:1] in self.preferred_leading_chars:
score += 3
if fragment[-1:] in self.preferred_trailing_chars:
score += 2
return score
def _fallback_fragment_allowed(self, fragment: str, *, position: str) -> bool:
if position == "start":
return fragment[:1] in self.preferred_leading_chars and (len(fragment) <= 3 or fragment[-1:] in self.preferred_trailing_chars)
if position == "end":
return fragment[-1:] in self.preferred_trailing_chars
return False
@staticmethod
def _trim_note_prefix(note: str) -> str:
text = str(note or "").strip()
if ":" in text:
_, tail = text.split(":", 1)
return tail.strip() or text
return text
@staticmethod
def _unique(items: List[str]) -> List[str]:
ordered: List[str] = []
seen = set()
for item in items:
clean = str(item).strip()
if not clean or clean in seen:
continue
ordered.append(clean)
seen.add(clean)
return ordered
@staticmethod
def _safe_int(value: Any, default: int) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
@staticmethod
def _stable_pick(seed: str, tag: str, options: List[str]) -> str:
cleaned = [str(item).strip() for item in options if str(item).strip()]
if not cleaned:
return ""
digest = hashlib.md5(f"{seed}:{tag}".encode("utf-8")).hexdigest()
index = int(digest[:8], 16) % len(cleaned)
return cleaned[index]
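The deterministic selection in `_stable_pick` is what keeps a character's phrasing stable across calls: the choice depends only on the seed (the character name) and a tag, never on random state. The following is a minimal standalone sketch of that idea, assuming the same MD5-based indexing as the method above; `stable_pick` is a module-level copy made for illustration only.

```python
import hashlib
from typing import List


def stable_pick(seed: str, tag: str, options: List[str]) -> str:
    """Pick one option deterministically from (seed, tag)."""
    cleaned = [str(item).strip() for item in options if str(item).strip()]
    if not cleaned:
        return ""
    # The hash is used only as a stable index source, not for security.
    digest = hashlib.md5(f"{seed}:{tag}".encode("utf-8")).hexdigest()
    return cleaned[int(digest[:8], 16) % len(cleaned)]


if __name__ == "__main__":
    options = ["我直说。", "那就明说。", "这一句我不绕。"]
    first = stable_pick("关羽", "opening-direct-short", options)
    second = stable_pick("关羽", "opening-direct-short", options)
    # The same seed and tag always select the same option.
    print(first == second)
```

One consequence of this design is that changing the length or order of an option list can change which line a given character receives, since the index is taken modulo the list length.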
FILE:runtime/src/modules/__init__.py
FILE:runtime/src/utils/file_utils.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import re
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
NOISE_NAME_SUFFIXES = {"说", "道", "笑", "听", "问", "看", "想", "叫", "喊", "答", "哭"}
CANONICAL_NAME_ALIASES = {
"关公": "关羽",
"云长": "关羽",
"玄德": "刘备",
"翼德": "张飞",
"孔明": "诸葛亮",
"卧龙": "诸葛亮",
"孟德": "曹操",
}
CANONICAL_TO_ALIASES: dict[str, list[str]] = {}
for alias_name, canonical_name in CANONICAL_NAME_ALIASES.items():
CANONICAL_TO_ALIASES.setdefault(canonical_name, []).append(alias_name)
def ensure_dir(path: str | Path) -> Path:
p = Path(path)
p.mkdir(parents=True, exist_ok=True)
return p
def _sanitize_json_value(value: Any) -> Any:
if isinstance(value, str):
return value.encode("utf-8", errors="replace").decode("utf-8")
if isinstance(value, list):
return [_sanitize_json_value(item) for item in value]
if isinstance(value, tuple):
return [_sanitize_json_value(item) for item in value]
if isinstance(value, dict):
return {_sanitize_json_value(key): _sanitize_json_value(item) for key, item in value.items()}
return value
def load_markdown_data(path: str | Path, default: Any = None) -> Any:
    # Return the YAML front matter of a Markdown file, or `default` when the
    # file or its front matter is missing.
    p = Path(path)
    if not p.exists():
        return default
    text = p.read_text(encoding="utf-8")
    if not text.startswith("---"):
        return default
    parts = text.split("---", 2)
    if len(parts) < 3:
        return default
    payload = yaml.safe_load(parts[1]) or {}
    return payload
def save_markdown_data(
    path: str | Path,
    data: Dict[str, Any],
    *,
    title: str = "DATA",
    summary: Optional[list[str]] = None,
) -> None:
    # Write `data` as YAML front matter, followed by a title and optional summary lines.
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    sanitized = _sanitize_json_value(data)
    frontmatter = yaml.safe_dump(sanitized, allow_unicode=True, sort_keys=False).strip()
    body_lines = [f"# {title}", ""]
    if summary:
        body_lines.extend(summary)
        body_lines.append("")
    content = f"---\n{frontmatter}\n---\n\n" + "\n".join(body_lines).rstrip() + "\n"
    p.write_text(content, encoding="utf-8")
def safe_filename(name: str) -> str:
    # Replace characters that are not allowed in Windows file names.
    clean = re.sub(r"[\\/:*?\"<>|]", "_", name).strip()
    return clean or "unnamed"
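def decode_escaped_text(value: Optional[str]) -> str:
    text = str(value or "").strip()
    if not text:
        return ""
    if not any(token in text for token in ("\\u", "\\U", "\\x", "\\n", "\\t")):
        return text
    try:
        # "unicode_escape" reads its input as Latin-1, so encoding as UTF-8 first
        # would turn Chinese text into mojibake. Backslash-escape every
        # non-Latin-1 character instead, then decode all escapes in one pass.
        encoded = text.encode("latin-1", errors="backslashreplace")
        return encoded.decode("unicode_escape").strip()
    except UnicodeDecodeError:
        return text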
def load_text_argument(value: Optional[str] = None, file_path: Optional[str] = None) -> str:
    # A file path takes precedence over the inline value.
    if file_path:
        return Path(file_path).read_text(encoding="utf-8").strip()
    return decode_escaped_text(value)


def parse_character_argument(value: Optional[str] = None, file_path: Optional[str] = None) -> list[str]:
    raw = load_text_argument(value, file_path)
    if not raw:
        return []
    # Separators: newlines, ASCII comma, full-width comma, and the enumeration comma.
    parts = re.split(r"[\n\r,,、]+", raw)
    normalized = [normalize_character_name(part) for part in parts if part.strip()]
    return [item for item in normalized if item]
def normalize_character_name(name: str) -> str:
    clean = str(name or "").strip()
    # Drop a trailing verb character only when at least two characters remain.
    if len(clean) >= 3 and clean[-1] in NOISE_NAME_SUFFIXES:
        clean = clean[:-1]
    return CANONICAL_NAME_ALIASES.get(clean, clean)


def canonical_aliases(name: str) -> list[str]:
    canonical = normalize_character_name(name)
    aliases = CANONICAL_TO_ALIASES.get(canonical, [])
    return [alias for alias in aliases if alias != canonical]


def normalize_relation_key(key: str) -> str:
    # Relation keys look like "A_B"; sort the two names so "A_B" and "B_A" map to one key.
    parts = [normalize_character_name(part) for part in str(key).split("_") if part]
    if len(parts) != 2:
        return str(key)
    return "_".join(sorted(parts))


def novel_id_from_input(novel: str) -> str:
    # Use the file stem when a path with an extension is given, else the last path component.
    raw = Path(novel).stem if Path(novel).suffix else Path(novel).name
    if not raw:
        raw = novel
    return safe_filename(raw)
def find_character_file(
    base_dir: str | Path,
    character_name: str,
    novel_id: Optional[str] = None,
) -> list[Path]:
    root = Path(base_dir)
    normalized = normalize_character_name(character_name)
    # Try the name as given first, then its canonical form.
    candidate_names = [character_name]
    if normalized != character_name:
        candidate_names.append(normalized)
    dirnames = [safe_filename(name) for name in candidate_names]
    if novel_id:
        matches = [root / novel_id / dirname / "PROFILE.md" for dirname in dirnames]
        return [path for path in matches if path.exists()]
    # Without a novel id, search the root and every novel directory below it.
    matches = []
    for dirname in dirnames:
        matches.extend(root.glob(f"{dirname}/PROFILE.md"))
        matches.extend(root.glob(f"*/{dirname}/PROFILE.md"))
    return sorted({path.resolve() for path in matches})
FILE:runtime/src/utils/text_parser.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

import re
from pathlib import Path
from typing import List

# Encodings are tried in this order; the first one that decodes cleanly wins.
TEXT_ENCODINGS = (
    "utf-8-sig",
    "utf-8",
    "gb18030",
    "gbk",
    "utf-16",
    "utf-16-le",
    "utf-16-be",
)


def _strip_html_tags(text: str) -> str:
    text = re.sub(r"<script[\s\S]*?</script>", "", text, flags=re.IGNORECASE)
    text = re.sub(r"<style[\s\S]*?</style>", "", text, flags=re.IGNORECASE)
    text = re.sub(r"<[^>]+>", "", text)
    return re.sub(r"\s+", " ", text).strip()


def _decode_text_bytes(raw: bytes) -> str:
    last_error: Exception | None = None
    for encoding in TEXT_ENCODINGS:
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError as exc:
            last_error = exc
    if last_error:
        # Message: "Cannot detect the novel's text encoding; convert it to UTF-8 or GB18030 and retry."
        raise RuntimeError("无法识别小说文本编码,请转换为 UTF-8 或 GB18030 后重试") from last_error
    # Message: "Cannot read the novel text."
    raise RuntimeError("无法读取小说文本")
def load_novel_text(path: str) -> str:
    novel_path = Path(path)
    if not novel_path.exists():
        # Message: "Novel file does not exist."
        raise FileNotFoundError(f"小说文件不存在: {path}")
    suffix = novel_path.suffix.lower()
    if suffix == ".txt":
        return _decode_text_bytes(novel_path.read_bytes())
    if suffix == ".epub":
        return _load_epub(novel_path)
    # Message: "Unsupported file type; only .txt / .epub are supported."
    raise ValueError(f"不支持的文件类型: {suffix},仅支持 .txt / .epub")
def _load_epub(path: Path) -> str:
    try:
        import ebooklib
        from ebooklib import epub
    except ImportError as exc:
        # Message: "Reading .epub requires ebooklib to be installed."
        raise RuntimeError("读取 .epub 需要安装 ebooklib") from exc
    book = epub.read_epub(str(path))
    chunks: List[str] = []
    for item in book.get_items():
        # Only document items carry chapter text; use the named constant instead of the literal 9.
        if item.get_type() == ebooklib.ITEM_DOCUMENT:
            html = item.get_content().decode("utf-8", errors="ignore")
            text = _strip_html_tags(html)
            if text:
                chunks.append(text)
    return "\n".join(chunks)
def split_sentences(text: str) -> List[str]:
    if not text:
        return []
    # Split after sentence-ending punctuation, both full-width and ASCII.
    parts = re.split(r"(?<=[。!?!?;;])\s*", text)
    return [p.strip() for p in parts if p.strip()]
FILE:runtime/src/utils/token_counter.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import List


class TokenCounter:
    """Token counting and chunk splitting utility."""

    def __init__(self) -> None:
        # tiktoken is optional; without it, counts fall back to a character-based estimate.
        self._encoder = None
        try:
            import tiktoken

            self._encoder = tiktoken.get_encoding("cl100k_base")
        except Exception:
            self._encoder = None

    def count(self, text: str) -> int:
        if not text:
            return 0
        if self._encoder:
            return len(self._encoder.encode(text))
        # Fallback estimation for CJK-heavy text.
        return max(1, len(text) // 2)
    def split_by_tokens(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        if not text.strip():
            return []
        if chunk_size <= 0:
            return [text]
        # Clamp the overlap so each step advances by at least one token.
        overlap = max(0, min(overlap, chunk_size - 1))
        if self._encoder:
            token_ids = self._encoder.encode(text)
            chunks: List[str] = []
            step = chunk_size - overlap
            for start in range(0, len(token_ids), step):
                part = token_ids[start : start + chunk_size]
                if not part:
                    continue
                chunks.append(self._encoder.decode(part))
                if start + chunk_size >= len(token_ids):
                    break
            return chunks
        # Fallback: approximate chars by token ratio.
        char_chunk = chunk_size * 2
        char_overlap = overlap * 2
        step = max(1, char_chunk - char_overlap)
        chunks = []
        for start in range(0, len(text), step):
            piece = text[start : start + char_chunk]
            if not piece:
                continue
            chunks.append(piece)
            if start + char_chunk >= len(text):
                break
        return chunks
FILE:runtime/src/utils/__init__.py
FILE:runtime/src/__init__.py
FILE:runtime/zaomeng_cli.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

import os
import sys
from pathlib import Path

RUNTIME_ROOT = Path(__file__).resolve().parent
# Run from the runtime directory so relative paths resolve inside the skill package.
os.chdir(RUNTIME_ROOT)

# Force UTF-8 output; characters the stream cannot encode are replaced instead of raising.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
if hasattr(sys.stderr, "reconfigure"):
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")

if str(RUNTIME_ROOT) not in sys.path:
    sys.path.insert(0, str(RUNTIME_ROOT))

# This import must follow the sys.path setup above.
from src.core.main import main

if __name__ == "__main__":
    main()