@clawhub-c-joey-3f9d11680e
Enter GPT strong execution mode for the current conversation when explicitly invoked, such as with /gpt-go. Persist across the conversation. Default to execu...
---
name: gpt-go
description: Enter GPT strong execution mode for the current conversation when explicitly invoked, such as with /gpt-go. Persist across the conversation. Default to execution, treat short directives as authorization to continue, minimize words, and stop only at explicit high-risk boundaries.
---
# GPT Go
`/gpt-go` means: enter **strong execution mode** for this conversation.
This is a persistent mode for the current conversation, not a one-turn style hint.
Keep it active until the user turns it off, clearly asks for a different style, or higher-priority rules override it.
## Core rule
When the goal is clear, **do the work**.
Do not stall on routine confirmations, setup questions, or verbose planning.
Default to forward progress.
## Default authorization
In this mode, short directives normally mean **continue**.
Treat messages like these as authorization to proceed on the current task unless a pause boundary is hit:
- continue
- go on
- do it
- fix it
- upgrade it
- handle it
- start
- proceed
- 继续
- 直接上
- 升级吧
- 修一下
- 你处理
- 做掉
Do not bounce these back into avoidable questions.
## Execution behavior
- Start executing when intent is clear.
- Infer the next obvious low-risk steps.
- Finish the natural working chunk, not just the first sub-step.
- If the task is not done and the next step is still clear and low-risk, continue.
- Inspect the environment directly before asking the user for inspectable facts.
- Prefer doing + checking over discussing + waiting.
## Communication style
- Use as few words as possible.
- Lead with result, progress, blocker, or required decision.
- No long preambles.
- No repetitive restatement.
- No narration of obvious steps.
- No “should I continue?” after routine progress.
Good defaults:
- “Done.”
- “Upgraded. Service is running.”
- “Blocked: missing token.”
- “Need one decision: prod or staging?”
## Ask only when needed
Ask only if:
- a real user decision is required,
- a required fact cannot be obtained directly,
- there are multiple materially different paths and choosing wrong would likely waste time or cause risk,
- or a pause boundary is reached.
If you ask, ask **one short high-value question**.
## Pause boundaries
Pause and ask before actions that are:
- destructive or hard to undo,
- externally visible or sending/publishing outward,
- related to credentials, secrets, permissions, privacy, or security posture,
- related to money or nontrivial cost,
- likely to affect production, core configuration, or service availability,
- expanding access, exposure, or trust boundaries,
- or materially ambiguous in a risky way.
Do not add extra pause points unless higher-priority rules require it.
## Tool bias
Use tools to inspect and act whenever possible.
Do not ask the user for deployment type, file paths, versions, repo state, or service names if they can be discovered directly.
## Anti-patterns
Do not:
- over-explain,
- over-confirm,
- stop at diagnosis when the likely low-risk fix is clear,
- ask for obvious environment details before checking,
- or fall back into generic cautious-assistant behavior for ordinary work.
## Override
If the user asks for step-by-step collaboration or more explanation, adapt.
If higher-priority rules require stricter behavior, follow them.
FILE:gateway-prompt.txt
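Just do it. Keep talk to a minimum.
Rules:
- When the goal is clear, execute. Do not recite a plan first, and skip the polite preamble.
- Default to continuous progress: finish the whole natural working chunk, not just the first step.
- Check whatever you can check yourself; do not bounce observable facts back to the user as questions.
- Keep replies as short as possible: one of result / progress / blocker / decision.
- Do not ask "should I continue?" after routine progress.
- Treat "继续 / 直接上 / 开始 / 修一下 / 你处理 / 做掉" from the user as authorization to continue.
Stop only at these boundaries:
- Deletion, overwriting, irreversible changes
- Sending outward, publishing, payments
- Credentials, permissions, security, privacy
- Production impact, core configuration, service availability
- High-risk ambiguity
When you must ask, ask only the single most important short question.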
FILE:sync-gateway.js
const fs = require('node:fs');
const path = require('node:path');
const skillFile = path.join(__dirname, 'SKILL.md');
const outFile = path.join(__dirname, 'gateway-prompt.txt');
const md = fs.readFileSync(skillFile, 'utf8');
// Return the body of the first section whose heading line (trimmed, lowercased)
// matches one of titleVariants, or '' if no such heading exists.
function pickSection(titleVariants) {
  const lines = md.split(/\r?\n/);
  for (let i = 0; i < lines.length; i++) {
    const line = lines[i].trim().toLowerCase();
    if (titleVariants.includes(line)) {
      const out = [];
      for (let j = i + 1; j < lines.length; j++) {
        const raw = lines[j];
        // Stop at the next level-2 heading.
        if (/^##\s+/.test(raw)) break;
        out.push(raw);
      }
      return out.join('\n').trim();
    }
  }
  return '';
}

const core = pickSection(['## core rule']);
const execBehavior = pickSection(['## execution behavior']);
const style = pickSection(['## communication style']);
const ask = pickSection(['## ask only when needed']);
const pause = pickSection(['## pause boundaries']);

// Join the non-empty sections and normalize bullet spacing.
const compact = [core, execBehavior, style, ask, pause]
  .filter(Boolean)
  .join('\n\n')
  .replace(/^-\s+/gm, '- ')
  .trim();

fs.writeFileSync(outFile, compact + '\n');
// Interpolate the path; the original logged the literal text "outFile".
console.log(`Wrote ${outFile}`);
Sync provider model lists into OpenClaw config (dry-run preview → confirm → apply). Trigger: /provider_sync
---
name: provider-sync
description: "Sync provider model lists into OpenClaw config (dry-run preview → confirm → apply). Trigger: /provider_sync"
user-invocable: true
license: MIT
spdx: MIT
---
# Provider Sync
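Trigger: type `/provider_sync` in chat.
## Interactive usage (default, no buttons)
When you send just `/provider_sync` (with no arguments), I reply with a set of **blue command options** that you can tap, or copy and send:
Pick a provider (dry-run by default; the config is not written):
/provider_sync provider=my-provider
/provider_sync provider=my-gpt-provider
/provider_sync provider=my-gemini-provider
/provider_sync provider=all
Add a provider (wizard):
/provider_sync add
> Note: this is the default way to offer selectable options without inline buttons, and it works in any environment.
>
> You can also do it in one step: /provider_sync provider=<your-provider-id|all> (dry-run by default).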
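## When to use
- Fetch the upstream `/v1/models` (OpenAI-compatible) and sync it into the local `openclaw.json`
- Normalize model fields (contextWindow/maxTokens/input/reasoning, etc.)
- Preview the diff first (dry-run), then write after confirmation (apply), with an automatic backup before writing
- (v2 default) Syncing prunes `agents.defaults.models` so that the `/models` menu entries **always match** `models.providers.<provider>.models`
## v2 Breaking (important)
- By default, `agents.defaults.models` is pruned (entries under that provider that no longer exist upstream are deleted), so that `/models` does not show a long menu of unusable entries.
- To keep the old behavior (do not delete allowlist entries), use: `--no-prune-agent-aliases`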
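The pruning rule can be pictured with a minimal sketch. It assumes that `agents.defaults.models` is a mapping whose keys look like `<provider-id>/<model-id>`; that key format and the helper name `prune_agent_models` are assumptions made for illustration and are not confirmed by this document.

```python
from typing import Any, Dict, Iterable


def prune_agent_models(
    agent_models: Dict[str, Any], provider_id: str, upstream_ids: Iterable[str]
) -> Dict[str, Any]:
    """Drop this provider's entries whose model id is no longer upstream.

    Assumes keys look like "<provider-id>/<model-id>" (hypothetical format).
    Entries that belong to other providers are kept untouched.
    """
    prefix = f"{provider_id}/"
    keep_ids = set(upstream_ids)
    return {
        key: value
        for key, value in agent_models.items()
        if not key.startswith(prefix) or key[len(prefix):] in keep_ids
    }
```

Entries for other providers pass through unchanged, which matches the "only change the target provider" intent of the skill.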
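## Usage (recommended)
### Option A: interactive (default)
- Send: `/provider_sync`
- Then tap one of the "blue command options" I return (or copy and send it):
  - /provider_sync provider=my-provider
  - /provider_sync provider=my-gpt-provider
  - /provider_sync provider=my-gemini-provider
  - /provider_sync provider=all
- To add a provider: send /provider_sync add to enter the text wizard (it runs a dry-run validation first, then asks for a second confirmation before writing the config)
### Option B: pass arguments directly (one step)
Send any one of the lines below (dry-run by default):
- `/provider_sync provider=my-provider`
- `/provider_sync provider=my-gpt-provider`
- `/provider_sync provider=my-gemini-provider`
- `/provider_sync provider=all`
### Option C: add a provider (one step, non-interactive)
If you would rather skip the wizard, you can supply the details directly (use a private chat):
- `/provider_sync add providerId=<id> baseUrl=<.../v1> apiKey=<optional>`
- Note: a dry-run validation and a second confirmation still happen before anything is written
Apply (a backup is made; previewing first is still recommended):
- `/provider_sync provider=all mode=apply`
To make the changes take effect sooner, you can then run a separate gateway restart (the connection drops briefly):
- `/restart`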
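### Specifying a provider
- Preview:
  - `/provider_sync provider=my-provider`
  - `/provider_sync provider=my-gpt-provider`
  - `/provider_sync provider=my-gemini-provider`
## Permissions / safety policy (recommended defaults)
- Group chats: allow only dry-run / check-only (read-only); do not apply config changes in a group chat.
- Private chats: allow apply (applying changes) and restart (a second confirmation is required).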
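A gate for this policy might look like the sketch below. The function name `is_mode_allowed` and the `chat_type` values `"group"` and `"private"` are assumptions for illustration; the second confirmation step is not modeled here.

```python
READ_ONLY_MODES = {"dry-run", "check-only"}
ALL_MODES = READ_ONLY_MODES | {"apply"}


def is_mode_allowed(chat_type: str, mode: str) -> bool:
    """Return True if `mode` may run in the given chat type.

    chat_type is assumed to be "group" or "private" (hypothetical values).
    """
    if mode not in ALL_MODES:
        return False
    if chat_type == "group":
        # Group chats are read-only: preview and checks only.
        return mode in READ_ONLY_MODES
    # Private chats may also apply changes.
    return chat_type == "private"
```

Unknown chat types are rejected, so the gate fails closed.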
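## How changes take effect
- This skill's script handles "compare / preview / apply changes".
- **Restarting the gateway is a separate system action**; to avoid accidental triggers it is normally kept as a second step.
- The shortest complete loop: after apply, send `/restart` if needed.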
## Parameters (matching the script)
- `provider=<id|all>`: required (`all` iterates over every provider under `models.providers`)
- `mode=dry-run|check-only|apply`: defaults to dry-run
- Other advanced parameters (optional):
  - `config=<path>` (default `/root/.openclaw/openclaw.json`)
  - `mapping=<path>` (default `references/mapping.openai-models.json`)
  - `profile=auto|generic|gemini|gpt`
  - `probe=openai-responses,openai-completions`
Default profile:
- `auto`: chosen heuristically by model family
  - `gemini*` → `gemini`
  - `gpt-*` / `*codex*` → `gpt`
  - anything else → `generic`
Recommendations:
- You usually do not need to write `profile=` by hand; let the skill pick by model family
- Pass `profile=gemini` / `profile=gpt` / `profile=generic` explicitly only when you want to force an override of the default behavior
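As a rough illustration of how the `key=value` form could be turned into settings, here is a minimal sketch. The function name `parse_sync_args` is hypothetical, and the sketch covers only the `key=value` form, not the `/provider_sync add` wizard.

```python
from typing import Dict

# Defaults stated in this document: mode is dry-run, profile is auto.
DEFAULTS = {"mode": "dry-run", "profile": "auto"}
VALID_MODES = {"dry-run", "check-only", "apply"}


def parse_sync_args(text: str) -> Dict[str, str]:
    """Parse '/provider_sync key=value ...' into a dict with defaults applied."""
    tokens = text.split()
    if not tokens or tokens[0] != "/provider_sync":
        raise ValueError("not a /provider_sync command")
    args = dict(DEFAULTS)
    for token in tokens[1:]:
        key, sep, value = token.partition("=")
        if not sep or not key or not value:
            raise ValueError(f"expected key=value, got {token!r}")
        args[key] = value
    if "provider" not in args:
        raise ValueError("provider=<id|all> is required")
    if args["mode"] not in VALID_MODES:
        raise ValueError(f"unknown mode: {args['mode']}")
    return args
```

Because `mode` falls back to `dry-run`, a command that omits it can never write the config by accident.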
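## Notes (implementation boundaries)
- This skill's "button panel" is a chat-interaction-layer capability; on instances without Telegram inlineButtons enabled, every operation can still be done with the plain-text commands in this file.
- Cache: before writing to disk, the script strips sensitive fields (for example, authentication-related fields) so that sensitive content from an abnormal upstream response does not end up in the cache.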
## Main script
- `scripts/provider_sync.py`
FILE:_meta.json
{
"ownerId": "kn7ejvj9b9dtr8kdsyzrx4e3ns813yhw",
"slug": "provider-sync",
"version": "2.1.3",
"publishedAt": 1772883085352
}
FILE:references/examples.md
# Examples
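This file provides common invocation templates. Read it only when you need to quickly reuse a command, demonstrate how the skill is used, or add trigger examples.
## 1. Sync one provider, dry-run first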
```bash
python3 scripts/provider_sync.py \
--provider-id my-provider \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--preserve-existing-model-fields \
--dry-run
```
## 2. Probe compatibility modes and get a recommendation
```bash
python3 scripts/provider_sync.py \
--provider-id my-provider \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--probe-api-modes openai-responses,openai-completions \
--dry-run
```
## 3. Perform the real write after the user confirms
```bash
python3 scripts/provider_sync.py \
--provider-id my-provider \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--preserve-existing-model-fields
```
## 4. Use a custom meta endpoint
```bash
python3 scripts/provider_sync.py \
--provider-id upstreamx \
--endpoint https://api.example.com/provider/meta \
--mapping-file references/mapping.example.json \
--normalize-models \
--dry-run
```
## 5. Check only that the flow works, without writing
```bash
python3 scripts/provider_sync.py \
--provider-id my-provider \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--check-only
```
## 6. Sync only specific models
```bash
python3 scripts/provider_sync.py \
--provider-id my-provider \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--include-model gpt-5 \
--include-model gpt-5-mini \
--dry-run
```
## 7. Output a JSON summary
```bash
python3 scripts/provider_sync.py \
--provider-id my-provider \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--output json \
--dry-run
```
## 8. Gemini third-party proxy / official metadata endpoint
```bash
python3 scripts/provider_sync.py \
--provider-id my-gemini \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--normalize-profile gemini \
--preserve-existing-model-fields \
--dry-run
```
> If the upstream is mostly Gemini models, `auto` picks `gemini` by model family even when no profile is given.
## 9. GPT / Codex provider (uses gpt automatically by default)
```bash
python3 scripts/provider_sync.py \
--provider-id my-gpt-provider \
--endpoint https://api.example.com/v1/models \
--mapping-file references/mapping.openai-models.json \
--normalize-models \
--preserve-existing-model-fields \
--dry-run
```
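If the upstream is mostly GPT / Codex models, `auto` picks `gpt` by model family even when no profile is given; pass `--normalize-profile gpt` explicitly only when you want to force an override.
## 10. Typical trigger phrases
All of the following should trigger this skill:
- "Sync a provider's models from upstream into the openclaw config for me"
- "Do a dry-run first to see which models this provider would add"
- "Pull the upstream model list, but keep the capability fields I tuned by hand locally"
- "Probe whether this provider is better suited to responses or completions"
- "Turn the provider config sync flow into a reusable, safe procedure"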
FILE:references/field-normalization.md
# Field Normalization
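This file explains how to normalize the upstream model list into the model structure OpenClaw uses. Read it only when you need to adjust the normalization strategy, explain where a field comes from, or extend the model capability mapping.
## Target structure
A normalized model usually contains:
- `id`
- `name`
- `api`
- `reasoning`
- `input`
- `cost`
- `contextWindow`
- `maxTokens`
## Basic mapping principles
### id
Choose from the following fields, in priority order:
- `id`
- `model`
- `name`
### name
Choose from the following fields, in priority order:
- `name`
- `display_name`
- `title`
- If none is present, fall back to `id`
### reasoning
Determined from the following fields, in priority order:
- `reasoning`
- `supports_reasoning`
- `thinking`
Defaults to `false` when not provided.
### input
May come from:
- `input`
- `modalities`
- `capabilities`
Common normalization:
- `vision` → `image`
- Other values are converted to lowercase strings
- If nothing recognizable is present, defaults to `['text']`
### contextWindow
Can be taken from the following fields:
- `contextWindow`
- `context_window`
- `context`
- `max_context_tokens`
A default value is used when none is available.
### maxTokens
Can be taken from the following fields:
- `maxTokens`
- `max_tokens`
- `max_output_tokens`
- `output_tokens`
A default value is used when none is available.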
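The field rules above can be sketched as one small function. This is an illustration only: the helper names `first_present`, `normalize_input`, and `normalize_model` are hypothetical, the two default constants are assumed values (the source says only that "a default value" is used), and `api` and `cost` are left out.

```python
from typing import Any, Dict, List

# Assumed fallback values; the source does not state the real defaults.
DEFAULT_CONTEXT_WINDOW = 128000
DEFAULT_MAX_TOKENS = 8192


def first_present(raw: Dict[str, Any], keys: List[str], default: Any = None) -> Any:
    """Return the first non-None value among keys, in priority order."""
    for key in keys:
        if raw.get(key) is not None:
            return raw[key]
    return default


def normalize_input(raw: Dict[str, Any]) -> List[str]:
    """Lowercase modality names, map 'vision' to 'image', default to ['text']."""
    modes = first_present(raw, ["input", "modalities", "capabilities"])
    if not isinstance(modes, list):
        return ["text"]
    out: List[str] = []
    for mode in modes:
        value = str(mode).lower()
        value = "image" if value == "vision" else value
        if value and value not in out:
            out.append(value)
    return out or ["text"]


def normalize_model(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Build the normalized model dict from one upstream model entry."""
    model_id = first_present(raw, ["id", "model", "name"])
    return {
        "id": model_id,
        "name": first_present(raw, ["name", "display_name", "title"], model_id),
        "reasoning": bool(first_present(raw, ["reasoning", "supports_reasoning", "thinking"], False)),
        "input": normalize_input(raw),
        "contextWindow": int(first_present(
            raw, ["contextWindow", "context_window", "context", "max_context_tokens"],
            DEFAULT_CONTEXT_WINDOW)),
        "maxTokens": int(first_present(
            raw, ["maxTokens", "max_tokens", "max_output_tokens", "output_tokens"],
            DEFAULT_MAX_TOKENS)),
    }
```

Keeping each field's candidate keys in one ordered list makes the priority order easy to read and easy to extend.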
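## Preserving local fields
When `--preserve-existing-model-fields` is enabled, these fields on the local model with the same id take priority:
- `name`
- `api`
- `reasoning`
- `input`
- `contextWindow`
- `maxTokens`
- `cost`
Why this helps:
- Upstream data may be incomplete
- Capability parameters may already have been calibrated by hand locally
- The cost field is usually better maintained locally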
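A merge along these lines could look like the following sketch. The function name `preserve_existing_fields` is hypothetical, and matching on an exact `id` key is an assumption about how "same id" is resolved.

```python
from typing import Any, Dict, List

PRESERVED_FIELDS = ("name", "api", "reasoning", "input", "contextWindow", "maxTokens", "cost")


def preserve_existing_fields(
    incoming: List[Dict[str, Any]], existing: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Overlay locally maintained fields onto freshly normalized models."""
    local_by_id = {m["id"]: m for m in existing if isinstance(m, dict) and "id" in m}
    merged = []
    for model in incoming:
        out = dict(model)  # copy so the caller's data is not mutated
        local = local_by_id.get(model.get("id"))
        if local:
            for field in PRESERVED_FIELDS:
                if field in local:
                    out[field] = local[field]
        merged.append(out)
    return merged
```

Models that exist only upstream pass through unchanged, so new models still pick up the normalized values.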
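## Design trade-offs
The goal of normalization is "usable, stable, comparable", not "a perfect reproduction of all upstream semantics".
Therefore:
- Fill in the fields OpenClaw requires first
- Do not aim to support every vendor-specific field from the start
- Stay conservative with unstable or temporary fields; do not write them to the config by default
## Normalization profiles
### auto
- The default mode
- Chosen automatically by model family:
  - `gemini*` / name contains `gemini` → `gemini`
  - `gpt-*` / name contains `codex` → `gpt`
  - anything else → `generic`
- Stays provider-agnostic; does not depend on how provider ids are named in a particular deployment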
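The family heuristic can be sketched for a single model as below. The function name `pick_profile` is hypothetical, and whether the real script decides per model or once per provider is not stated here, so treat this as one plausible reading.

```python
def pick_profile(model_id: str, name: str = "") -> str:
    """Choose a normalization profile from a model's id and display name."""
    lower_id = (model_id or "").lower()
    lower_name = (name or "").lower()
    if lower_id.startswith("gemini") or "gemini" in lower_name:
        return "gemini"
    if lower_id.startswith("gpt-") or "codex" in lower_id or "codex" in lower_name:
        return "gpt"
    return "generic"
```

Only the model's own id and name are inspected, which keeps the choice independent of provider id naming.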
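### generic
- Conservative mode
- If the upstream provides no capability fields, it usually falls back to `reasoning=false`, `input=['text']`
### gemini
- Suited to Gemini / gemini-compatible upstreams
- Infers more aggressively from fields such as `thinking` / `supportedGenerationMethods` / `vision`
### gpt
- Suited to the GPT / Codex family
- Uses fixed capability templates for known model IDs so they are not reset by the generic rules (listed as `input` + `reasoning` + `contextWindow` + `maxTokens`):
  - `gpt-5.4` / `gpt-5.4-mini` → `text,image` + `reasoning=true` + `400k` + `128k`
  - `gpt-5.2` / `gpt-5.2-codex` → `text,image` + `reasoning=true` + `400k` + `128k`
  - `gpt-5.3-codex` → `text` + `reasoning=true` + `400k` + `128k`
  - `gpt-5.1-codex-max` → `text` + `reasoning=true` + `400k` + `128k`
  - `gpt-5.1-codex` / `gpt-5.1-codex-mini` → `text` + `reasoning=true` + `400k` + `32k`
- For models that match `gpt-*` but are not explicitly listed, it infers mildly: prefers `reasoning=true`; ordinary GPT defaults to `text,image`, Codex defaults to `text`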
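The "fixed template first, mild inference second" behavior could be sketched as follows. The function name `infer_gpt_defaults` and the exact-match lookup are assumptions; the single sample override uses the values that `scripts/provider_sync.py` lists for `gpt-5.1-codex-mini`.

```python
from typing import Any, Dict

# One sample entry; the real table lives in GPT_FAMILY_OVERRIDES in the script.
SAMPLE_OVERRIDES: Dict[str, Dict[str, Any]] = {
    "gpt-5.1-codex-mini": {
        "reasoning": True,
        "input": ["text"],
        "contextWindow": 400000,
        "maxTokens": 32768,
    },
}


def infer_gpt_defaults(model_id: str, overrides: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Return capability defaults for a GPT-family model id."""
    key = (model_id or "").lower()
    if key in overrides:
        return dict(overrides[key])  # fixed template for a known id
    if not key.startswith("gpt-"):
        return {}  # not a GPT-family id; leave it to other profiles
    # Mild inference for unlisted gpt-* ids.
    is_codex = "codex" in key
    return {"reasoning": True, "input": ["text"] if is_codex else ["text", "image"]}
```

The inferred branch deliberately leaves `contextWindow` and `maxTokens` unset, since the document gives no inferred values for them.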
FILE:references/gemini.md
# Gemini Notes
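This file describes the usage boundaries and recommended approach for provider-sync with Gemini. Read it only when integrating Google's official Gemini or a third-party Gemini proxy.
## Two recommended categories
### 1. OpenAI-compatible Gemini
If the upstream is Gemini but exposes an OpenAI-compatible interface, for example:
- `/v1/models`
- returns `data: [...]`
This is the Gemini scenario provider-sync handles most easily.
Recommendations:
- Use `references/mapping.openai-models.json`
- Enable `--normalize-models`
- If the models carry few fields, enable `--normalize-profile gemini`
- If the interface itself is OpenAI-compatible, then consider `--probe-api-modes`
### 2. Google official or custom Gemini metadata endpoint
If the upstream is not OpenAI-compatible but uses native Gemini style or custom JSON:
- First use `response-root` + `mapping-file` to map out the model list
- Then use `--normalize-models --normalize-profile gemini` to fill in capability fields
## What the Gemini profile does
`--normalize-profile gemini` is currently a lightweight adaptation; it does not replace the mapping and does not pretend to know every Gemini-specific semantic.
Current behavior:
- Recognizes `inputTokenLimit` / `outputTokenLimit` first
- Recognizes `supportedGenerationMethods`
- For `gemini-*` models with no modality information, conservatively infers `text,image`
- For models whose name or id contains `thinking` / `reasoning`, infers reasoning=true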
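As a small illustration of the token-limit handling, the sketch below reads the two Gemini-style fields. Mapping `inputTokenLimit` to `contextWindow` and `outputTokenLimit` to `maxTokens` is an assumption about how the profile uses them, and the function name `gemini_token_limits` is hypothetical.

```python
from typing import Any, Dict


def gemini_token_limits(
    raw: Dict[str, Any], default_context_window: int, default_max_tokens: int
) -> Dict[str, int]:
    """Read Gemini-style token limits, falling back to the given defaults."""

    def as_int(value: Any, default: int) -> int:
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    return {
        # Assumed mapping: input limit -> contextWindow, output limit -> maxTokens.
        "contextWindow": as_int(raw.get("inputTokenLimit"), default_context_window),
        "maxTokens": as_int(raw.get("outputTokenLimit"), default_max_tokens),
    }
```

Missing or non-numeric limits fall back to the caller's defaults rather than raising, in keeping with the profile's conservative stance.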
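## Do not overpromise
The Gemini profile aims to be "more usable", not "a 100% reproduction of the official capability matrix".
Therefore:
- Fill in the key fields OpenClaw needs first
- Do not auto-infer too many vendor-specific capabilities
- If official documentation conflicts with an inference, the explicit documentation or the user-specified value wins
## Suggestions
- For the official native Gemini interface, do not treat OpenAI probe results as the real conclusion by default
- If the interface is not OpenAI-compatible at all, you can skip `--probe-api-modes`
- If you have already calibrated model capability fields by hand, prefer adding `--preserve-existing-model-fields`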
FILE:references/mapping.example.json
[
{
"from": "models",
"to": ".models",
"mode": "replace"
},
{
"from": "base_url",
"to": ".baseUrl",
"mode": "replace"
},
{
"from": "extra_headers",
"to": ".headers",
"mode": "replace"
}
]
FILE:references/mapping.openai-models.json
[
{
"from": "data",
"to": ".models",
"mode": "replace"
}
]
FILE:references/provider-patterns.md
# Provider Patterns
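This file describes common upstream provider integration patterns. Read it only when you need to add an integration method, change the probing logic, or understand the response structure of a given kind of provider.
## Contents
1. OpenAI-compatible `/models`
2. Provider meta endpoint
3. Custom nested payload
4. API mode probing
## 1. OpenAI-compatible `/models`
When it applies:
- The upstream exposes `/v1/models` or a compatible endpoint
- The response body usually contains `data: []`
- Each model has at least an `id`
Common traits:
- `data` can be mapped directly to `.models`
- Normalization is often still needed to fill in `input` / `reasoning` / `contextWindow` / `maxTokens`
- The same provider may be compatible with both `openai-responses` and `openai-completions`
Recommendations:
- If you use the default `references/mapping.openai-models.json`, do not also set `response-root=data` (that mapping already takes the model list from `data`)
- Enable `--normalize-models` as well
- By default, let the script choose the profile by model family: `gemini* -> gemini`, `gpt-* / *codex* -> gpt`, anything else falls back to `generic`
- Pass `--normalize-profile gemini|gpt|generic` explicitly only when you need to override the default behavior
- If you are unsure about the compatibility mode, enable `--probe-api-modes`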
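## 2. Provider meta endpoint
When it applies:
- The upstream is not a standard `/models`
- It returns provider-level metadata such as `baseUrl`, `supportedApis`, `models`, `limits`
Common traits:
- A single response contains both provider fields and model fields
- It is better suited to a mapping file that writes to separate paths
Recommendations:
- Map provider-level fields and the model list field separately
- Sync only stable fields where possible; do not write temporary state fields into the local config
## 3. Custom nested payload
When it applies:
- The response structure is deep, for example `payload.models.items`
- Field naming differs considerably
Common approach:
- Use `response-root` to descend to a subtree first
- Then take values through the mapping file's `from` paths
Suggestions:
- Keep the mapping short and explicit
- If the mapping rules start to get complicated, move the complex logic into the script rather than piling up explanations in the skill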
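The "descend to a root, then read `from` paths" approach can be sketched like this. The helper names `dig` and `extract` are hypothetical, and the sketch only collects values keyed by each rule's `to` path; it does not write them into a config.

```python
from typing import Any, Dict, List


def dig(data: Any, dotted: str) -> Any:
    """Follow a dotted path through dicts and lists; return None if missing."""
    cur = data
    for part in filter(None, dotted.split(".")):
        if isinstance(cur, dict):
            cur = cur.get(part)
        elif isinstance(cur, list) and part.isdigit() and int(part) < len(cur):
            cur = cur[int(part)]
        else:
            return None
    return cur


def extract(response: Any, response_root: str, mapping: List[Dict[str, str]]) -> Dict[str, Any]:
    """Apply response-root, then read each mapping rule's `from` path."""
    root = dig(response, response_root) if response_root else response
    return {rule["to"]: dig(root, rule["from"]) for rule in mapping}
```

For a response shaped like `{"payload": {"models": {"items": [...]}}}`, a root of `payload` with a rule whose `from` is `models.items` yields the model list under the rule's `to` key.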
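## 4. API mode probing
Purpose:
- Determine which `provider.api` a provider is best configured with
Examples of modes that can currently be probed:
- `openai-responses`
- `openai-completions`
How to interpret results:
- `404/405`: usually means the mode is not supported
- Other HTTP errors: usually mean the endpoint exists, but the request parameters, permissions, or model do not match
- The first mode judged supported can serve as the recommendation, but it is not absolute truth
Notes:
- Probe results are for "recommendation"; do not present them as fully certain facts
- If the provider documentation conflicts with the probe results, the user's explicit request or the official documentation wins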
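One way to encode these interpretation rules is sketched below. The names `classify_probe` and `recommend_api_mode` are hypothetical, and treating "endpoint exists but the request failed" as a weaker fallback candidate is an assumption, not something the source specifies.

```python
from typing import List, Optional, Tuple

UNSUPPORTED_STATUSES = {404, 405}


def classify_probe(status: Optional[int]) -> str:
    """Map an HTTP status from a probe request to a coarse verdict."""
    if status is None:
        return "unknown"  # no HTTP response at all (e.g. network failure)
    if status in UNSUPPORTED_STATUSES:
        return "unsupported"
    if 200 <= status < 300:
        return "supported"
    return "exists"  # endpoint responded, but params/permissions/model mismatched


def recommend_api_mode(results: List[Tuple[str, Optional[int]]]) -> Optional[str]:
    """Pick the first supported mode, else the first that at least exists."""
    verdicts = [(mode, classify_probe(status)) for mode, status in results]
    for wanted in ("supported", "exists"):
        for mode, verdict in verdicts:
            if verdict == wanted:
                return mode
    return None
```

The return value is a recommendation only; an explicit user choice or official documentation should still take precedence.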
FILE:references/safety-rules.md
# Safety Rules
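This file defines the safety boundaries of provider-sync. Read it only when performing a real write, designing new mapping rules, or preparing to share the skill.
## Core principles
1. Preview first, then write.
2. Change only the target provider unless the user explicitly asks for a wider scope.
3. Back up before overwriting.
4. Hand-written local config takes priority over inferred values.
5. Keep error messages verbatim; do not blur the cause of a failure.
## Recommended execution order
1. Read the current config
2. Fetch the upstream data
3. Generate the mapping result
4. Output the dry-run summary
5. Wait for user confirmation
6. Create a backup before writing
7. Perform the real write
8. Ask whether a follow-up apply/restart is needed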
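Steps 5 to 7 of this order can be sketched as a single guarded write. The function name `apply_with_backup` and the `.bak.<timestamp>` backup naming are assumptions for illustration; the real script's backup naming is not shown in this document.

```python
import json
import shutil
import time
from pathlib import Path
from typing import Any, Dict, Optional


def apply_with_backup(
    config_path: Path, new_config: Dict[str, Any], confirmed: bool
) -> Optional[Path]:
    """Write new_config only after confirmation, backing up the old file first.

    Returns the backup path, or None when nothing was written.
    """
    if not confirmed:
        return None  # preview only: the config file is left untouched
    stamp = time.strftime("%Y%m%d-%H%M%S")
    backup_path = config_path.with_name(f"{config_path.name}.bak.{stamp}")  # assumed naming
    shutil.copy2(config_path, backup_path)  # backup comes before the overwrite
    config_path.write_text(
        json.dumps(new_config, ensure_ascii=False, indent=2) + "\n", encoding="utf-8"
    )
    return backup_path
```

Returning the backup path lets the caller include it in the summary shown to the user.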
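## What should not happen by default
- Do not directly overwrite all model fields the user maintains by hand
- Do not rewrite the whole provider structure just because a probe found an API mode available
- Do not write temporary state, rate-limit snapshots, or auth information into shareable config
- Do not output full keys, tokens, or Authorization headers
## Cross-domain writes
By default, writes are allowed only to:
- `models.providers.<provider-id>`
Additional writes (for example `agents.defaults.models`) must:
- Have clear value to the user
- Not overwrite existing hand-written entries
- Be shown explicitly in the summary
## Pre-share checklist
Before sharing the skill, check:
- Whether local absolute paths are hard-coded
- Whether private provider names are included
- Whether internal interface formats or key-handling habits are exposed
- Whether "defaults from your environment" were mistakenly written as "general rules"
- Whether the safety constraints are spelled out clearly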
FILE:scripts/provider_sync.py
#!/usr/bin/env python3
import argparse
import copy
import datetime as dt
import hashlib
import json
import os
from pathlib import Path
import sys
import time
import urllib.error
import urllib.request
from typing import Any, Dict, List, Tuple


def die(msg: str, code: int = 1):
    print(f"ERROR: {msg}", file=sys.stderr)
    sys.exit(code)


def load_json(path: str) -> Any:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def dump_json(path: str, data: Any, *, chmod_600: bool = False):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
        f.write("\n")
    if chmod_600:
        try:
            os.chmod(path, 0o600)
        except Exception:
            pass


def split_path(path: str) -> List[str]:
    if not path:
        return []
    return [p for p in path.split(".") if p]


def get_path(data: Any, path: str) -> Any:
    cur = data
    for part in split_path(path):
        if isinstance(cur, list):
            idx = int(part)
            cur = cur[idx]
        elif isinstance(cur, dict):
            cur = cur.get(part)
        else:
            return None
    return cur


def set_path(data: Any, path: str, value: Any):
    parts = split_path(path)
    if not parts:
        raise ValueError("empty target path")
    cur = data
    for i, part in enumerate(parts[:-1]):
        nxt = parts[i + 1]
        want_list = nxt.isdigit()
        if isinstance(cur, dict):
            if part not in cur or cur[part] is None:
                cur[part] = [] if want_list else {}
            cur = cur[part]
        elif isinstance(cur, list):
            idx = int(part)
            while len(cur) <= idx:
                cur.append({})
            if cur[idx] is None:
                cur[idx] = [] if want_list else {}
            cur = cur[idx]
        else:
            raise ValueError(f"invalid path segment at {part}")
    last = parts[-1]
    if isinstance(cur, dict):
        cur[last] = value
    elif isinstance(cur, list):
        idx = int(last)
        while len(cur) <= idx:
            cur.append(None)
        cur[idx] = value
    else:
        raise ValueError(f"cannot set value at {path}")


def append_unique(target: Any, incoming: Any) -> Any:
    base = target if isinstance(target, list) else []
    src = incoming if isinstance(incoming, list) else [incoming]
    out = list(base)
    for item in src:
        if item not in out:
            out.append(item)
    return out


def fetch_json_raw(endpoint: str, method: str, headers: Dict[str, str], body: Any, timeout: int = 30) -> Tuple[Any, Dict[str, Any]]:
    data = None
    if body is not None:
        data = json.dumps(body).encode("utf-8")
    req = urllib.request.Request(endpoint, data=data, method=method.upper())
    for k, v in headers.items():
        req.add_header(k, v)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        raw_bytes = resp.read()
        raw = raw_bytes.decode("utf-8")
        meta = {
            "status": getattr(resp, "status", None),
            "etag": resp.headers.get("ETag") if getattr(resp, "headers", None) else None,
            "lastModified": resp.headers.get("Last-Modified") if getattr(resp, "headers", None) else None,
            "bytes": len(raw_bytes),
        }
        return json.loads(raw), meta


def parse_headers(pairs: List[str]) -> Dict[str, str]:
    out = {}
    for p in pairs:
        if ":" not in p:
            die(f"invalid header format: {p}, expected 'Key:Value'")
        k, v = p.split(":", 1)
        out[k.strip()] = v.strip()
    return out


def get_header_case_insensitive(headers: Dict[str, str], key: str) -> str:
    want = key.strip().lower()
    for k, v in headers.items():
        if k.strip().lower() == want:
            return v
    return ""


def _sha256_text(text: str) -> str:
    return hashlib.sha256((text or "").encode("utf-8")).hexdigest()


def _looks_sensitive_key(key: str) -> bool:
    k = (key or "").lower()
    needles = [
        "api_key",
        "apikey",
        "authorization",
        "bearer",
        "cookie",
        "secret",
        "token",
        "password",
        "private",
        "session",
        "set-cookie",
        "access_key",
    ]
    return any(n in k for n in needles)


def redact_sensitive_fields(value: Any) -> Any:
    """Best-effort redaction for cached upstream JSON.

    Provider /models endpoints should never return secrets, but some proxies can be buggy.
    We avoid persisting accidental secret-like fields (tokens/cookies/api keys) by:
    - Recursively dropping dict keys that look sensitive.
    - Leaving the rest of the structure intact so mapping still works.
    This redaction applies to provider-sync cache files only.
    """
    if isinstance(value, dict):
        out = {}
        for k, v in value.items():
            if isinstance(k, str) and _looks_sensitive_key(k):
                continue
            out[k] = redact_sensitive_fields(v)
        return out
    if isinstance(value, list):
        return [redact_sensitive_fields(v) for v in value]
    return value


def _safe_cache_key(endpoint: str, method: str, headers: Dict[str, str], body: Any) -> str:
    # NOTE: This hashes header VALUES too (including Authorization) but never stores them.
    # This avoids cross-key cache confusion without leaking secrets.
    headers_items = sorted((str(k), str(v)) for k, v in (headers or {}).items())
    headers_hash = _sha256_text(json.dumps(headers_items, ensure_ascii=False))
    body_hash = _sha256_text(canonical_json(body) if body is not None else "")
    base = f"{method.upper()} {endpoint} h={headers_hash} b={body_hash}"
    return _sha256_text(base)


def _cache_paths(cache_dir: str, cache_key: str) -> Tuple[Path, Path]:
    d = Path(os.path.expanduser(cache_dir or "~/.cache/openclaw/provider-sync"))
    return d / f"{cache_key}.meta.json", d / f"{cache_key}.data.json"


def fetch_json_with_meta(
    endpoint: str,
    method: str,
    headers: Dict[str, str],
    body: Any,
    *,
    timeout: int = 30,
    cache_enabled: bool = True,
    cache_dir: str = "~/.cache/openclaw/provider-sync",
    cache_ttl_seconds: int = 600,
    allow_stale_cache: bool = False,
    progress_label: str = "Fetch",
) -> Tuple[Any, Dict[str, Any]]:
    # Fetch JSON with a small local cache (TTL + conditional requests).
    # - Uses ETag/Last-Modified when available to avoid re-downloading
    # - Never stores Authorization or other headers; only a hash is used for the cache key
    t0 = time.time()
    try:
        print(f"{progress_label}: start {method.upper()} {endpoint} (timeout={int(timeout)}s, cache={'on' if cache_enabled else 'off'})", flush=True)
    except Exception:
        pass
    cache_key = _safe_cache_key(endpoint, method, headers, body)
    meta_path, data_path = _cache_paths(cache_dir, cache_key)

    def read_cache():
        if not (meta_path.exists() and data_path.exists()):
            return None, None
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
            data = json.loads(data_path.read_text(encoding="utf-8"))
            return meta, data
        except Exception:
            return None, None

    def write_cache(meta: Dict[str, Any], data: Any):
        meta_path.parent.mkdir(parents=True, exist_ok=True)
        meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        # Redact secret-like fields before persisting cache payload.
        safe_data = redact_sensitive_fields(data)
        data_path.write_text(json.dumps(safe_data, ensure_ascii=False) + "\n", encoding="utf-8")
        try:
            os.chmod(meta_path, 0o600)
            os.chmod(data_path, 0o600)
        except Exception:
            pass

    cache_meta, cache_data = (None, None)
    if cache_enabled:
        cache_meta, cache_data = read_cache()
        if cache_meta and cache_data is not None:
            age = max(0, int(time.time() - int(cache_meta.get("ts") or 0)))
            if cache_ttl_seconds > 0 and age <= int(cache_ttl_seconds):
                try:
                    print(f"{progress_label}: cache hit (fresh, age={age}s <= ttl={int(cache_ttl_seconds)}s)", flush=True)
                except Exception:
                    pass
                return cache_data, {
                    "endpoint": endpoint,
                    "method": method.upper(),
                    "status": cache_meta.get("status"),
                    "cache": {
                        "enabled": True,
                        "hit": True,
                        "fresh": True,
                        "ageSeconds": age,
                        "ttlSeconds": int(cache_ttl_seconds),
                        "notModified": False,
                        "staleFallback": False,
                    },
                    "timing": {"fetchMs": int((time.time() - t0) * 1000)},
                }
    # Conditional headers if we have cache
    req_headers = dict(headers or {})
    if cache_enabled and cache_meta:
        etag = cache_meta.get("etag")
        last_mod = cache_meta.get("lastModified")
        if etag and not get_header_case_insensitive(req_headers, "If-None-Match"):
            req_headers["If-None-Match"] = str(etag)
        if last_mod and not get_header_case_insensitive(req_headers, "If-Modified-Since"):
            req_headers["If-Modified-Since"] = str(last_mod)
    try:
        data, meta = fetch_json_raw(endpoint, method, req_headers, body, timeout=timeout)
        try:
            print(f"{progress_label}: fetched status={meta.get('status')} bytes={meta.get('bytes')} fetchMs={int((time.time()-t0)*1000)}", flush=True)
        except Exception:
            pass
        out_meta = {
            "endpoint": endpoint,
            "method": method.upper(),
            "status": meta.get("status"),
            "cache": {
                "enabled": bool(cache_enabled),
                "hit": False,
                "fresh": False,
                "ageSeconds": None,
                "ttlSeconds": int(cache_ttl_seconds),
                "notModified": False,
                "staleFallback": False,
            },
            "timing": {"fetchMs": int((time.time() - t0) * 1000)},
        }
        if cache_enabled:
            write_cache(
                {
                    "ts": int(time.time()),
                    "endpoint": endpoint,
                    "method": method.upper(),
                    "status": meta.get("status"),
                    "etag": meta.get("etag"),
                    "lastModified": meta.get("lastModified"),
                },
                data,
            )
        return data, out_meta
    except urllib.error.HTTPError as e:
        # 304 Not Modified: reuse cached payload
        if e.code == 304 and cache_enabled and cache_data is not None:
            try:
                print(f"{progress_label}: not modified (304), using cached payload", flush=True)
            except Exception:
                pass
            try:
                cache_meta = cache_meta or {}
                cache_meta["ts"] = int(time.time())
                meta_path.parent.mkdir(parents=True, exist_ok=True)
                meta_path.write_text(json.dumps(cache_meta, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
            except Exception:
                pass
            age = max(0, int(time.time() - int(cache_meta.get("ts") or 0))) if cache_meta else None
            return cache_data, {
                "endpoint": endpoint,
                "method": method.upper(),
                "status": 304,
                "cache": {
                    "enabled": True,
                    "hit": True,
                    "fresh": True,
                    "ageSeconds": age,
                    "ttlSeconds": int(cache_ttl_seconds),
                    "notModified": True,
                    "staleFallback": False,
                },
                "timing": {"fetchMs": int((time.time() - t0) * 1000)},
            }
        raise
    except Exception as e:
        # Network failure: optionally fall back to stale cache for dry-run/check-only UX
        if allow_stale_cache and cache_enabled and cache_data is not None:
            try:
                print(f"{progress_label}: fetch failed ({type(e).__name__}), falling back to stale cache", flush=True)
            except Exception:
                pass
            age = max(0, int(time.time() - int(cache_meta.get("ts") or 0))) if cache_meta else None
            return cache_data, {
                "endpoint": endpoint,
                "method": method.upper(),
                "status": None,
                "cache": {
                    "enabled": True,
                    "hit": True,
                    "fresh": False,
                    "ageSeconds": age,
                    "ttlSeconds": int(cache_ttl_seconds),
                    "notModified": False,
                    "staleFallback": True,
                    "error": str(e)[:200],
                },
                "timing": {"fetchMs": int((time.time() - t0) * 1000)},
            }
        raise


def resolve_endpoint_and_auth_headers(
    cfg: Dict[str, Any], provider_root: str, provider_id: str, endpoint: str, headers: Dict[str, str]
) -> Tuple[str, Dict[str, str]]:
    """Resolve endpoint and Authorization header from config when possible.

    - If endpoint is empty, derive it from models.providers.<provider-id>.baseUrl + '/models'
    - If Authorization header is missing and apiKey exists, add 'Authorization: Bearer <apiKey>'
    NOTE: Do not print apiKey.
    """
    provider_base = f"{provider_root}.{provider_id}"
    provider_obj = get_path(cfg, provider_base) or {}
    resolved_endpoint = (endpoint or "").strip()
    if not resolved_endpoint:
        base_url = (provider_obj.get("baseUrl") or "").strip()
        if not base_url:
            die(f"endpoint not provided and {provider_base}.baseUrl is missing")
        resolved_endpoint = base_url.rstrip("/") + "/models"
    # auth header
    api_key = provider_obj.get("apiKey")
    if isinstance(api_key, str) and api_key.strip():
        if not get_header_case_insensitive(headers, "Authorization"):
            headers = dict(headers)
            headers["Authorization"] = f"Bearer {api_key.strip()}"
    return resolved_endpoint, headers


def resolve_dst_path(dst_path: str, provider_root: str, provider_id: str) -> str:
    provider_base = f"{provider_root}.{provider_id}"
    if dst_path.startswith("."):
        return provider_base + dst_path
    return (
        dst_path
        .replace("{provider_root}", provider_root)
        .replace("{provider_id}", provider_id)
        .replace("{provider_base}", provider_base)
    )


def pick(d: Dict[str, Any], keys: List[str], default: Any = None) -> Any:
    for k in keys:
        if k in d and d[k] is not None:
            return d[k]
    return default


def print_md_table(headers: List[str], rows: List[List[Any]]):
    if not rows:
        return
    print("| " + " | ".join(headers) + " |")
    print("| " + " | ".join(["---"] * len(headers)) + " |")
    for r in rows:
        cells = [str(x).replace("\n", " ").replace("|", "\\|") if x is not None else "" for x in r]
        print("| " + " | ".join(cells) + " |")


def to_int(v: Any, default: int) -> int:
    try:
        return int(v)
    except Exception:
        return default


def expand_list_args(values: List[str]) -> List[str]:
    out = []
    for value in values or []:
        for part in str(value).split(","):
            part = part.strip()
            if part:
                out.append(part)
    return out


def filter_models(models: Any, include_ids: List[str], exclude_ids: List[str]) -> Any:
    if not isinstance(models, list):
        return models
    include_set = set(include_ids)
    exclude_set = set(exclude_ids)
    out = []
    for item in models:
        if not isinstance(item, dict):
            continue
        mid = pick(item, ["id", "model", "name"])
        if include_set and mid not in include_set:
            continue
        if mid in exclude_set:
            continue
        out.append(item)
    return out


def sort_models_by_id(models: Any) -> Any:
    if not isinstance(models, list):
        return models
    if not all(isinstance(item, dict) and pick(item, ["id", "model", "name"]) for item in models):
        return models
    return sorted(models, key=lambda item: str(pick(item, ["id", "model", "name"]) or ""))


def canonical_json(value: Any) -> str:
    return json.dumps(value, ensure_ascii=False, sort_keys=True)


def short_value(value: Any, max_len: int = 60) -> str:
    if isinstance(value, list):
        rendered = ",".join(str(x) for x in value)
    elif isinstance(value, dict):
        rendered = json.dumps(value, ensure_ascii=False, sort_keys=True)
    elif value is None:
        rendered = "null"
    else:
        rendered = str(value)
    return rendered if len(rendered) <= max_len else rendered[: max_len - 3] + "..."


def summarize_model_delta(old_models: Any, new_models: Any) -> Dict[str, Any]:
    if not isinstance(old_models, list) or not isinstance(new_models, list):
        return {
            "added": [],
            "removed": [],
            "kept": 0,
            "orderOnly": False,
            "changed": [],
            "changedCount": 0,
        }

    def to_map(items: List[Any]) -> Dict[str, Any]:
        out = {}
        for item in items:
            if isinstance(item, dict):
                mid = pick(item, ["id", "model", "name"])
                if mid:
                    out[str(mid)] = item
        return out

    old_map = to_map(old_models)
    new_map = to_map(new_models)
    old_ids = set(old_map.keys())
    new_ids = set(new_map.keys())
    common = old_ids & new_ids
    order_only = False
    if old_ids == new_ids and len(old_map) == len(old_models) and len(new_map) == len(new_models):
        old_sorted = [old_map[mid] for mid in sorted(old_ids)]
        new_sorted = [new_map[mid] for mid in sorted(new_ids)]
        order_only = canonical_json(old_sorted) == canonical_json(new_sorted) and canonical_json(old_models) != canonical_json(new_models)
    tracked_fields = ["name", "api", "input", "reasoning", "contextWindow", "maxTokens", "cost"]
    changed = []
    for mid in sorted(common):
        before = old_map[mid]
        after = new_map[mid]
        field_changes = []
        for field in tracked_fields:
            if canonical_json(before.get(field)) != canonical_json(after.get(field)):
                field_changes.append(field)
        if field_changes:
            changed.append({
                "id": mid,
                "fields": field_changes,
                "before": {field: before.get(field) for field in field_changes},
                "after": {field: after.get(field) for field in field_changes},
                "fieldDiffs": [
                    {
                        "field": field,
                        "before": before.get(field),
                        "after": after.get(field),
                        "beforeText": short_value(before.get(field)),
                        "afterText": short_value(after.get(field)),
                    }
                    for field in field_changes
                ],
            })
    return {
        "added": sorted(new_ids - old_ids),
        "removed": sorted(old_ids - new_ids),
        "kept": len(common),
        "orderOnly": order_only,
        "changed": changed,
        "changedCount": len(changed),
    }
def infer_gemini_reasoning(raw: Dict[str, Any], model_id: str) -> bool:
    explicit = pick(raw, ["reasoning", "supports_reasoning", "thinking"])
    if explicit is not None:
        return bool(explicit)
    lower_id = (model_id or "").lower()
    name = str(pick(raw, ["name", "display_name", "title"], "")).lower()
    tokens = lower_id + " " + name
    return any(key in tokens for key in ["thinking", "reasoning"])

def infer_gemini_input(raw: Dict[str, Any], model_id: str) -> List[str]:
    input_modes = pick(raw, ["input", "modalities", "capabilities", "supportedGenerationMethods"])
    out = []
    if isinstance(input_modes, list):
        for x in input_modes:
            s = str(x).lower()
            if s in ("vision", "image"):
                out.append("image")
            elif "audio" in s:
                out.append("audio")
            elif "video" in s:
                out.append("video")
            elif s:
                out.append("text" if s in ("generatecontent", "generatecontentstream") else s)
    if not out:
        lower_id = (model_id or "").lower()
        out = ["text", "image"] if "gemini" in lower_id else ["text"]
    return sorted(set(out))

GPT_FAMILY_OVERRIDES: Dict[str, Dict[str, Any]] = {
    "gpt-5.4": {
        "reasoning": True,
        "input": ["text", "image"],
        "contextWindow": 400000,
        "maxTokens": 128000,
    },
    "gpt-5.4-mini": {
        "reasoning": True,
        "input": ["text", "image"],
        "contextWindow": 400000,
        "maxTokens": 128000,
    },
    "gpt-5.2": {
        "reasoning": True,
        "input": ["text", "image"],
        "contextWindow": 400000,
        "maxTokens": 128000,
    },
    "gpt-5.2-codex": {
        "reasoning": True,
        "input": ["text", "image"],
        "contextWindow": 400000,
        "maxTokens": 128000,
    },
    "gpt-5.3-codex": {
        "reasoning": True,
        "input": ["text"],
        "contextWindow": 400000,
        "maxTokens": 128000,
    },
    "gpt-5.1-codex": {
        "reasoning": True,
        "input": ["text"],
        "contextWindow": 400000,
        "maxTokens": 32768,
    },
    "gpt-5.1-codex-max": {
        "reasoning": True,
        "input": ["text"],
        "contextWindow": 400000,
        "maxTokens": 128000,
    },
    "gpt-5.1-codex-mini": {
        "reasoning": True,
        "input": ["text"],
        "contextWindow": 400000,
        "maxTokens": 32768,
    },
}

def infer_generic_input(raw: Dict[str, Any], default: List[str] | None = None) -> List[str]:
    input_modes = pick(raw, ["input", "modalities", "capabilities"])
    if isinstance(input_modes, list):
        out = []
        for x in input_modes:
            s = str(x).lower()
            out.append("image" if s in ("vision", "image") else s)
        out = sorted(set(v for v in out if v))
        if out:
            return out
    return list(default or ["text"])

def infer_gpt_family(raw: Dict[str, Any], model_id: str, default_ctx: int, default_max: int) -> Tuple[bool, List[str], int, int]:
    lower_id = (model_id or "").lower()
    explicit = GPT_FAMILY_OVERRIDES.get(lower_id)
    if explicit:
        return (
            bool(explicit["reasoning"]),
            list(explicit["input"]),
            int(explicit["contextWindow"]),
            int(explicit["maxTokens"]),
        )
    explicit_reasoning = pick(raw, ["reasoning", "supports_reasoning", "thinking"])
    reasoning = bool(explicit_reasoning) if explicit_reasoning is not None else lower_id.startswith("gpt-")
    default_input = ["text", "image"] if lower_id.startswith("gpt-") and "codex" not in lower_id else ["text"]
    input_list = infer_generic_input(raw, default=default_input)
    fallback_ctx = 400000 if lower_id.startswith("gpt-") else default_ctx
    fallback_max = 128000 if lower_id.startswith("gpt-") else default_max
    ctx = to_int(
        pick(raw, ["contextWindow", "context_window", "context", "max_context_tokens", "inputTokenLimit", "input_token_limit"]),
        fallback_ctx,
    )
    max_tokens = to_int(
        pick(raw, ["maxTokens", "max_tokens", "max_output_tokens", "output_tokens", "outputTokenLimit", "output_token_limit"]),
        fallback_max,
    )
    return reasoning, input_list, ctx, max_tokens

def infer_model_normalize_profile(raw: Dict[str, Any], model_id: str, model_name: str, requested_profile: str) -> str:
    requested = (requested_profile or "auto").strip().lower()
    if requested and requested != "auto":
        return requested
    lower_id = (model_id or "").strip().lower()
    lower_name = (model_name or "").strip().lower()
    haystack = f"{lower_id} {lower_name}"
    if lower_id.startswith("gpt-") or "codex" in haystack:
        return "gpt"
    if lower_id.startswith("gemini") or "gemini" in haystack:
        return "gemini"
    return "generic"

def normalize_model(raw: Dict[str, Any], provider_api: str, default_ctx: int, default_max: int, normalize_profile: str = "generic") -> Tuple[Dict[str, Any], Dict[str, Any]]:
    mid = pick(raw, ["id", "model", "name"]) or "unknown"
    name = pick(raw, ["name", "display_name", "title"], mid)
    effective_profile = infer_model_normalize_profile(raw, mid, name, normalize_profile)
    if effective_profile == "gemini":
        reasoning = infer_gemini_reasoning(raw, mid)
        ctx = to_int(pick(raw, ["contextWindow", "context_window", "context", "max_context_tokens", "inputTokenLimit", "input_token_limit"]), default_ctx)
        max_tokens = to_int(pick(raw, ["maxTokens", "max_tokens", "max_output_tokens", "output_tokens", "outputTokenLimit", "output_token_limit"]), default_max)
        input_list = infer_gemini_input(raw, mid)
    elif effective_profile == "gpt":
        reasoning, input_list, ctx, max_tokens = infer_gpt_family(raw, mid, default_ctx, default_max)
    else:
        reasoning = bool(pick(raw, ["reasoning", "supports_reasoning", "thinking"], False))
        ctx = to_int(pick(raw, ["contextWindow", "context_window", "context", "max_context_tokens"]), default_ctx)
        max_tokens = to_int(pick(raw, ["maxTokens", "max_tokens", "max_output_tokens", "output_tokens"]), default_max)
        input_list = infer_generic_input(raw, default=["text"])
    api = pick(raw, ["api"], provider_api)
    model = {
        "id": mid,
        "name": name,
        "api": api,
        "reasoning": reasoning,
        "input": input_list,
        "cost": {
            "input": 0,
            "output": 0,
            "cacheRead": 0,
            "cacheWrite": 0,
        },
        "contextWindow": ctx,
        "maxTokens": max_tokens,
    }
    note = {
        "id": mid,
        "reasoning": reasoning,
        "input": input_list,
        "contextWindow": ctx,
        "maxTokens": max_tokens,
    }
    return model, note

def normalize_models(models: Any, provider_api: str, default_ctx: int, default_max: int, normalize_profile: str = "generic") -> Tuple[Any, List[Dict[str, Any]]]:
    if not isinstance(models, list):
        return models, []
    out = []
    notes = []
    for raw in models:
        if not isinstance(raw, dict):
            continue
        m, n = normalize_model(raw, provider_api, default_ctx, default_max, normalize_profile=normalize_profile)
        out.append(m)
        notes.append(n)
    return out, notes

def merge_with_existing_models(new_models: Any, existing_models: Any) -> Any:
    """Overlay locally configured capability fields onto freshly fetched models, matched by model id."""
    if not isinstance(new_models, list):
        return new_models
    if not isinstance(existing_models, list):
        return new_models
    by_id = {}
    for m in existing_models:
        if isinstance(m, dict) and m.get("id"):
            by_id[m["id"]] = m
    keep_fields = ["name", "api", "reasoning", "input", "contextWindow", "maxTokens", "cost"]
    merged = []
    for nm in new_models:
        if not isinstance(nm, dict):
            continue
        mid = nm.get("id")
        em = by_id.get(mid)
        if isinstance(em, dict):
            mm = dict(nm)
            # A non-null local value wins over the upstream value.
            for f in keep_fields:
                if f in em and em[f] is not None:
                    mm[f] = em[f]
            merged.append(mm)
        else:
            merged.append(nm)
    return merged

def to_base_v1_url(endpoint: str) -> str:
    u = endpoint.rstrip("/")
    if u.endswith("/models"):
        return u[: -len("/models")]
    return u

def probe_mode(base_url: str, mode: str, model_id: str, headers: Dict[str, str]) -> Dict[str, Any]:
    """Send a one-token request to check whether the endpoint for an API mode exists."""
    mode = mode.strip()
    if mode == "openai-responses":
        path = "/responses"
        payload = {"model": model_id, "input": "ping", "max_output_tokens": 1}
    elif mode == "openai-completions":
        path = "/chat/completions"
        payload = {"model": model_id, "messages": [{"role": "user", "content": "ping"}], "max_tokens": 1}
    else:
        return {"mode": mode, "supported": False, "status": None, "detail": "unsupported probe mode"}
    url = base_url.rstrip("/") + path
    req = urllib.request.Request(url, data=json.dumps(payload).encode("utf-8"), method="POST")
    req.add_header("Content-Type", "application/json")
    for k, v in headers.items():
        req.add_header(k, v)
    try:
        with urllib.request.urlopen(req, timeout=20) as resp:
            return {"mode": mode, "supported": True, "status": resp.status, "detail": "ok"}
    except urllib.error.HTTPError as e:
        try:
            body = e.read().decode("utf-8", errors="ignore")
        except Exception:
            body = ""
        # Heuristic: only 404/405 mean the route is missing; other errors (auth, validation) imply it exists.
        supported = e.code not in (404, 405)
        detail = body[:200].replace("\n", " ") if body else str(e.reason)
        return {"mode": mode, "supported": supported, "status": e.code, "detail": detail}
    except Exception as e:
        return {"mode": mode, "supported": False, "status": None, "detail": str(e)}

def build_summary(
    changes: List[Dict[str, Any]],
    current_models: Any,
    probe_results: List[Dict[str, Any]],
    picked_mode: Any,
    dry_run: bool,
    check_only: bool,
    preserve_existing_model_fields: bool,
    normalize_profile: str,
    fetch: Dict[str, Any] | None = None,
    timing: Dict[str, Any] | None = None,
    model_delta: Dict[str, Any] | None = None,
    backup: str | None = None,
    updated: str | None = None,
    include_models: bool = False,
    provider_base: str = "",
) -> Dict[str, Any]:
    changes = changes or []
    # change classification (helps downstream UX: decide whether restart is needed)
    provider_changes = 0
    alias_changes = 0
    other_changes = 0
    for c in changes:
        to = str(c.get("to") or "")
        if provider_base and to.startswith(provider_base):
            provider_changes += 1
        elif to.startswith("agents.defaults.models"):
            alias_changes += 1
        else:
            other_changes += 1
    model_count = len(current_models) if isinstance(current_models, list) else 0
    model_summary = []
    if include_models and isinstance(current_models, list):
        for m in current_models:
            if not isinstance(m, dict):
                continue
            model_summary.append(
                {
                    "id": m.get("id", ""),
                    "input": m.get("input", []) if isinstance(m.get("input"), list) else ["text"],
                    "reasoning": bool(m.get("reasoning")),
                    "contextWindow": m.get("contextWindow"),
                    "maxTokens": m.get("maxTokens"),
                }
            )
    return {
        "fetch": fetch or {},
        "timing": timing or {},
        "changed": bool(changes),
        "changeCount": len(changes),
        "changes": changes,
        "changeKinds": {
            "providerSubtree": provider_changes,
            "agentAliases": alias_changes,
            "other": other_changes,
        },
        "models": model_summary if include_models else [],
        "modelCount": model_count,
        "probeResults": probe_results,
        "recommendedProviderApi": picked_mode,
        "dryRun": dry_run,
        "checkOnly": check_only,
        "preserveExistingModelFields": preserve_existing_model_fields,
        "normalizeProfile": normalize_profile,
        "modelDelta": model_delta or {"added": [], "removed": [], "kept": 0, "orderOnly": False, "changed": [], "changedCount": 0},
        "backup": backup,
        "updated": updated,
    }

def print_summary(summary: Dict[str, Any]):
    # Helpful context for perceived speed / debugging
    fetch = summary.get("fetch") or {}
    timing = summary.get("timing") or {}
    if fetch:
        cache = fetch.get("cache") or {}
        cache_bits = []
        if cache.get("enabled"):
            cache_bits.append("cache=on")
            if cache.get("hit"):
                cache_bits.append("hit")
            if cache.get("notModified"):
                cache_bits.append("304")
            if cache.get("staleFallback"):
                cache_bits.append("stale")
            if cache.get("ttlSeconds") is not None:
                cache_bits.append(f"ttl={cache.get('ttlSeconds')}s")
            if cache.get("ageSeconds") is not None:
                cache_bits.append(f"age={cache.get('ageSeconds')}s")
        else:
            cache_bits.append("cache=off")
        fetch_ms = (fetch.get("timing") or {}).get("fetchMs")
        print(
            "Fetch: "
            + str(fetch.get("endpoint") or "")
            + f" (status={fetch.get('status')}, fetchMs={fetch_ms}, "
            + ",".join(cache_bits)
            + ")"
        )
    if timing and timing.get("totalMs") is not None:
        print(f"Total: {timing.get('totalMs')}ms")
    changes = summary.get("changes") or []
    if not changes:
        print("No changes.")
        return
    kinds = summary.get("changeKinds") or {}
    if kinds:
        print(
            "Planned changes: "
            + str(summary.get("changeCount", len(changes)))
            + f" (providerSubtree={kinds.get('providerSubtree', 0)}, agentAliases={kinds.get('agentAliases', 0)}, other={kinds.get('other', 0)})"
        )
    else:
        print(f"Planned changes: {summary.get('changeCount', len(changes))}")
    change_rows = [[c.get("to", ""), c.get("from", ""), c.get("mode", "")] for c in changes]
    print_md_table(["target", "source", "mode"], change_rows)
    model_delta = summary.get("modelDelta") or {}
    added = model_delta.get("added") or []
    removed = model_delta.get("removed") or []
    kept = model_delta.get("kept", 0)
    changed = model_delta.get("changed") or []
    if added or removed or kept or changed:
        print(f"\nModel delta: +{len(added)} / -{len(removed)} / ={kept} / ~{len(changed)}")
        if model_delta.get("orderOnly"):
            print("Only model order changed after canonical comparison.")
        if added:
            print("Added: " + ", ".join(added[:10]) + (" ..." if len(added) > 10 else ""))
        if removed:
            print("Removed: " + ", ".join(removed[:10]) + (" ..." if len(removed) > 10 else ""))
        if changed:
            print("Changed models:")
            for item in changed[:10]:
                parts = []
                for diff in item.get("fieldDiffs") or []:
                    parts.append(f"{diff['field']}({diff['beforeText']} → {diff['afterText']})")
                print(f"- {item['id']}: " + "; ".join(parts))
            if len(changed) > 10:
                print(f"- ... and {len(changed) - 10} more")
    models = summary.get("models") or []
    if models:
        print(f"\nModel summary: {summary.get('modelCount', len(models))}")
        model_rows = []
        for m in models:
            model_rows.append(
                [
                    m.get("id", ""),
                    ",".join(m.get("input", [])) if isinstance(m.get("input"), list) else "text",
                    "yes" if m.get("reasoning") else "no",
                    m.get("contextWindow", "-"),
                    m.get("maxTokens", "-"),
                ]
            )
        print_md_table(["model", "input", "reasoning", "context", "max_tokens"], model_rows)
    probe_results = summary.get("probeResults") or []
    if probe_results:
        print("\nAPI probe summary")
        probe_rows = [
            [
                r.get("mode"),
                "yes" if r.get("supported") else "no",
                r.get("status") if r.get("status") is not None else "-",
                (r.get("detail") or "")[:80],
            ]
            for r in probe_results
        ]
        print_md_table(["mode", "supported", "status", "note"], probe_rows)
    if summary.get("recommendedProviderApi"):
        print(f"Recommended provider.api: {summary['recommendedProviderApi']}")
    if summary.get("normalizeProfile"):
        print(f"\nNormalization profile: {summary['normalizeProfile']}")
    if summary.get("preserveExistingModelFields"):
        print("\nModel merge policy: preserve existing local capability fields (by model id)")
    if summary.get("checkOnly"):
        print("\nCheck-only mode. No file written.")
    elif summary.get("dryRun"):
        print("\nDry-run only. No file written.")
    if summary.get("backup"):
        print(f"\nBackup: {summary['backup']}")
    if summary.get("updated"):
        print(f"Updated: {summary['updated']}")

def main():
    ap = argparse.ArgumentParser(description="Sync provider fields from upstream to openclaw.json")
    ap.add_argument("--config", default="/root/.openclaw/openclaw.json", help="openclaw.json path")
    ap.add_argument("--provider-root", default="models.providers", help="provider map path in config")
    ap.add_argument(
        "--provider-id",
        required=True,
        help="provider id in openclaw.json (comma-separated ok). Use 'all' to sync all providers under --provider-root.",
    )
    ap.add_argument("--use-provider-config", action="store_true", help="derive endpoint/auth from openclaw.json when possible")
    ap.add_argument("--endpoint", default="", help="upstream API endpoint (optional with --use-provider-config)")
    ap.add_argument("--method", default="GET", choices=["GET", "POST", "PUT", "PATCH"])
    ap.add_argument("--timeout", type=int, default=30, help="HTTP timeout seconds for upstream fetch")
    ap.add_argument("--cache-dir", default="~/.cache/openclaw/provider-sync", help="cache dir for upstream /models payload")
    ap.add_argument("--cache-ttl-seconds", type=int, default=600, help="cache TTL seconds (default: 600)")
    ap.add_argument("--no-cache", action="store_true", help="disable upstream cache")
    ap.add_argument("--header", action="append", default=[], help="HTTP header: Key:Value")
    ap.add_argument("--body-file", help="JSON file for request body")
    ap.add_argument("--response-root", default="", help="optional root path in response payload")
    ap.add_argument("--mapping-file", required=True, help="mapping json file")
    ap.add_argument("--normalize-models", action="store_true", help="normalize model fields for OpenClaw")
    ap.add_argument("--preserve-existing-model-fields", action="store_true", help="preserve existing local model capability fields by model id")
    ap.add_argument("--default-context-window", type=int, default=128000)
    ap.add_argument("--default-max-tokens", type=int, default=8192)
    ap.add_argument("--normalize-profile", default="auto", choices=["auto", "generic", "gemini", "gpt"], help="field normalization profile (default: auto by model family)")
    ap.add_argument("--probe-api-modes", default="", help="comma-separated api modes to probe, e.g. openai-responses,openai-completions")
    ap.add_argument("--auto-detect-provider-api", action="store_true", help="set provider.api based on probe results")
    ap.add_argument("--allow-outside-provider", action="store_true", help="allow writing outside provider subtree")
    ap.add_argument(
        "--sync-agent-aliases",
        dest="sync_agent_aliases",
        action="store_true",
        default=True,
        help="ensure agents.defaults.models contains <provider-id>/<model-id> entries with alias (default: on)",
    )
    ap.add_argument(
        "--no-sync-agent-aliases",
        dest="sync_agent_aliases",
        action="store_false",
        help="do not sync agents.defaults.models aliases",
    )
    ap.add_argument(
        "--prune-agent-aliases",
        dest="prune_agent_aliases",
        action="store_true",
        default=True,
        help="prune agents.defaults.models entries for this provider to match <provider>.models (default: on)",
    )
    ap.add_argument(
        "--no-prune-agent-aliases",
        dest="prune_agent_aliases",
        action="store_false",
        help="disable pruning of agents.defaults.models for this provider",
    )
    ap.add_argument("--include-model", action="append", default=[], help="only include these model ids (repeatable or comma-separated)")
    ap.add_argument("--exclude-model", action="append", default=[], help="exclude these model ids (repeatable or comma-separated)")
    ap.add_argument("--include-models", action="store_true", help="include compact per-model summary in JSON/markdown output")
    ap.add_argument("--output", default="markdown", choices=["markdown", "json"], help="summary output format")
    ap.add_argument("--check-only", action="store_true", help="validate fetch/mapping/probe flow and print summary without writing")
    ap.add_argument("--dry-run", action="store_true")
    args = ap.parse_args()
    _t_total0 = time.time()
    if not os.path.exists(args.config):
        die(f"config not found: {args.config}")
    cfg = load_json(args.config)
    providers = get_path(cfg, args.provider_root)
    if not isinstance(providers, dict):
        die(f"provider root not found or not object: {args.provider_root}")
    # Multi-provider mode (provider-id supports: comma list / 'all')
    raw_pid = (args.provider_id or "").strip()
    if raw_pid.lower() == "all":
        provider_ids = [str(k) for k in providers.keys()]
    else:
        provider_ids = [p.strip() for p in raw_pid.split(",") if p.strip()]
    if not provider_ids:
        # Without this guard an empty list would fall into multi-provider mode and silently do nothing.
        die("no provider ids resolved from --provider-id")
    if len(provider_ids) != 1:
        import subprocess
        summaries = []
        for pid in provider_ids:
            if pid not in providers:
                die(f"provider not found: {pid}")
            # Re-run this script in single-provider mode and collect JSON.
            base_argv = []
            skip_next = False
            for a in sys.argv[1:]:
                if skip_next:
                    skip_next = False
                    continue
                if a == "--provider-id":
                    skip_next = True
                    continue
                base_argv.append(a)
            cmd = [sys.executable, os.path.abspath(__file__)] + base_argv + ["--provider-id", pid, "--output", "json"]
            proc = subprocess.run(cmd, capture_output=True, text=True)
            if proc.returncode != 0:
                # Bubble the underlying error.
                sys.stderr.write(proc.stderr or proc.stdout or "")
                sys.exit(proc.returncode)
            try:
                summaries.append(json.loads(proc.stdout))
            except Exception:
                sys.stderr.write(proc.stdout)
                die("failed to parse JSON output from sub-run")
        # Aggregate output
        if args.output == "json":
            print(json.dumps({"providerIds": provider_ids, "summaries": summaries}, ensure_ascii=False, indent=2))
        else:
            for s in summaries:
                fetch = s.get("fetch") or {}
                changed = "changed" if s.get("changed") else "unchanged"
                mc = s.get("modelCount")
                print(f"- {changed} | models={mc} | endpoint={fetch.get('endpoint','')}")
            if args.dry_run or args.check_only:
                print("\nHint: to write changes, drop --dry-run / --check-only or use mode=apply.")
        return
    # Single provider
    if provider_ids[0] not in providers:
        die(f"provider not found: {provider_ids[0]}")
    args.provider_id = provider_ids[0]
    provider_base = f"{args.provider_root}.{args.provider_id}"
    provider_obj = get_path(cfg, provider_base) or {}
    provider_api = provider_obj.get("api", "openai-completions")
    effective_normalize_profile = (args.normalize_profile or "auto").strip().lower() or "auto"
    include_models = expand_list_args(args.include_model)
    exclude_models = expand_list_args(args.exclude_model)
    headers = parse_headers(args.header)
    # Resolve endpoint + Authorization header from config when requested (or when endpoint omitted).
    if args.use_provider_config or not (args.endpoint or "").strip():
        args.endpoint, headers = resolve_endpoint_and_auth_headers(
            cfg, args.provider_root, args.provider_id, args.endpoint, headers
        )
    body = load_json(args.body_file) if args.body_file else None
    upstream, fetch_meta = fetch_json_with_meta(
        args.endpoint,
        args.method,
        headers,
        body,
        timeout=int(args.timeout),
        cache_enabled=not bool(args.no_cache),
        cache_dir=args.cache_dir,
        cache_ttl_seconds=int(args.cache_ttl_seconds),
        allow_stale_cache=bool(args.dry_run or args.check_only),
        progress_label=f"Fetch({args.provider_id})",
    )
    source = get_path(upstream, args.response_root) if args.response_root else upstream
    if source is None:
        die("response-root resolved to null")
    mappings = load_json(args.mapping_file)
    if not isinstance(mappings, list):
        die("mapping-file must be a JSON array")
    new_cfg = copy.deepcopy(cfg)
    changes = []
    model_notes: List[Dict[str, Any]] = []
    for m in mappings:
        src_path = m.get("from")
        raw_dst_path = m.get("to")
        mode = m.get("mode", "replace")
        if not src_path or not raw_dst_path:
            die(f"invalid mapping item: {m}")
        dst_path = resolve_dst_path(raw_dst_path, args.provider_root, args.provider_id)
        # Match the subtree on a path boundary; a bare startswith() would also accept a sibling provider
        # whose id merely begins with this provider's id.
        inside_provider = dst_path == provider_base or dst_path.startswith(provider_base + ".")
        if (not args.allow_outside_provider) and (not inside_provider):
            die(f"blocked write outside provider subtree: {dst_path}")
        incoming = get_path(source, src_path)
        if dst_path.endswith(".models") and (include_models or exclude_models):
            incoming = filter_models(incoming, include_models, exclude_models)
        if args.normalize_models and dst_path.endswith(".models"):
            incoming, notes = normalize_models(
                incoming,
                provider_api=provider_api,
                default_ctx=args.default_context_window,
                default_max=args.default_max_tokens,
                normalize_profile=effective_normalize_profile,
            )
            model_notes.extend(notes)
        oldv = get_path(new_cfg, dst_path)
        if args.preserve_existing_model_fields and dst_path.endswith(".models"):
            incoming = merge_with_existing_models(incoming, oldv)
        if dst_path.endswith(".models"):
            incoming = sort_models_by_id(incoming)
        if mode == "append_unique":
            newv = append_unique(oldv, incoming)
        else:
            newv = incoming
        if oldv != newv:
            set_path(new_cfg, dst_path, newv)
            changes.append({"to": dst_path, "from": src_path, "mode": mode})
    probe_results: List[Dict[str, Any]] = []
    picked_mode = None
    if args.probe_api_modes:
        modes = [x.strip() for x in args.probe_api_modes.split(",") if x.strip()]
        probe_model = model_notes[0]["id"] if model_notes else None
        if not probe_model:
            maybe_models = get_path(new_cfg, f"{provider_base}.models")
            if isinstance(maybe_models, list) and maybe_models:
                first = maybe_models[0]
                if isinstance(first, dict):
                    probe_model = first.get("id")
        if probe_model:
            base_v1 = to_base_v1_url(args.endpoint)
            for mode in modes:
                res = probe_mode(base_v1, mode, probe_model, headers)
                probe_results.append(res)
            # The first supported mode, in the order given on the command line, wins.
            for r in probe_results:
                if r.get("supported"):
                    picked_mode = r["mode"]
                    break
            if args.auto_detect_provider_api and picked_mode:
                old_api = get_path(new_cfg, f"{provider_base}.api")
                if old_api != picked_mode:
                    set_path(new_cfg, f"{provider_base}.api", picked_mode)
                    changes.append({"to": f"{provider_base}.api", "from": "(probe)", "mode": "replace"})
                model_path = f"{provider_base}.models"
                mlist = get_path(new_cfg, model_path)
                if isinstance(mlist, list):
                    for model in mlist:
                        if isinstance(model, dict):
                            model["api"] = picked_mode
        else:
            probe_results.append({"mode": "(all)", "supported": False, "status": None, "detail": "no model id available for probing"})
    if args.sync_agent_aliases:
        provider_models = get_path(new_cfg, f"{provider_base}.models")
        if isinstance(provider_models, list):
            agents_obj = new_cfg.setdefault("agents", {})
            defaults_obj = agents_obj.setdefault("defaults", {})
            models_index = defaults_obj.setdefault("models", {})
            if not isinstance(models_index, dict):
                models_index = {}
                defaults_obj["models"] = models_index
            for model in provider_models:
                if not isinstance(model, dict):
                    continue
                mid = model.get("id")
                if not mid:
                    continue
                key = f"{args.provider_id}/{mid}"
                existing = models_index.get(key)
                if existing is None:
                    models_index[key] = {"alias": mid}
                    changes.append({"to": f"agents.defaults.models[{key}]", "from": "(sync-agent-aliases)", "mode": "replace"})
                elif isinstance(existing, dict) and ("alias" not in existing or not existing.get("alias")):
                    existing["alias"] = mid
                    changes.append({"to": f"agents.defaults.models[{key}].alias", "from": "(sync-agent-aliases)", "mode": "replace"})
    # Prune agents.defaults.models for this provider to match the current provider model list (on by default).
    # This makes `/models` and provider-scoped models stay consistent when using models.mode=replace.
    if args.prune_agent_aliases:
        provider_models = get_path(new_cfg, f"{provider_base}.models")
        if isinstance(provider_models, list):
            keep = set()
            for model in provider_models:
                if isinstance(model, dict) and model.get("id"):
                    keep.add(f"{args.provider_id}/{model.get('id')}")
            agents_obj = new_cfg.setdefault("agents", {})
            defaults_obj = agents_obj.setdefault("defaults", {})
            models_index = defaults_obj.setdefault("models", {})
            if not isinstance(models_index, dict):
                models_index = {}
                defaults_obj["models"] = models_index
            prefix = f"{args.provider_id}/"
            removed = []
            for k in list(models_index.keys()):
                if isinstance(k, str) and k.startswith(prefix) and k not in keep:
                    removed.append(k)
                    del models_index[k]
            for k in removed:
                changes.append({"to": f"agents.defaults.models[{k}]", "from": "(prune-agent-aliases)", "mode": "delete"})
    current_models = get_path(new_cfg, f"{provider_base}.models")
    previous_models = get_path(cfg, f"{provider_base}.models")
    _t_total_ms = int((time.time() - _t_total0) * 1000)
    model_delta = summarize_model_delta(previous_models, current_models)
    summary = build_summary(
        changes=changes,
        current_models=current_models,
        probe_results=probe_results,
        picked_mode=picked_mode,
        dry_run=args.dry_run,
        check_only=args.check_only,
        preserve_existing_model_fields=args.preserve_existing_model_fields,
        normalize_profile=effective_normalize_profile,
        fetch=fetch_meta,
        timing={"totalMs": _t_total_ms},
        model_delta=model_delta,
        include_models=bool(args.include_models),
        provider_base=provider_base,
    )
    will_write = bool(changes) and (not args.dry_run) and (not args.check_only)
    # For JSON output: if we are going to write, print only once AFTER writing (includes backup/updated).
    if args.output == "json":
        if not will_write:
            print(json.dumps(summary, ensure_ascii=False, indent=2))
    else:
        print_summary(summary)
    if not changes:
        return
    if args.dry_run or args.check_only:
        return
    # Back up the original config before overwriting it.
    ts = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    backup = f"{args.config}.bak.{ts}"
    dump_json(backup, cfg, chmod_600=True)
    dump_json(args.config, new_cfg, chmod_600=True)
    if args.output == "json":
        summary["backup"] = backup
        summary["updated"] = args.config
        print(json.dumps(summary, ensure_ascii=False, indent=2))
    else:
        print(f"\nBackup: {backup}")
        print(f"Updated: {args.config}")


if __name__ == "__main__":
    main()
Add a Telegram private-chat footer (`🧠 Model + 💭 Think + 📊 Context`) to OpenClaw replies, with dry-run preview, backup, syntax validation, rollback, and r...
---
name: telegram-footer-patch
description: Add a Telegram private-chat footer (`🧠 Model + 💭 Think + 📊 Context`) to OpenClaw replies, with dry-run preview, backup, syntax validation, rollback, and restart guidance.
license: MIT
spdx: MIT
---
# Telegram Footer Patch

Appends a platform-level footer to Telegram private-chat replies, without relying on model memory.
## Features
- Add a Telegram private-chat footer: `🧠 Model + 💭 Think + 📊 Context`
- Support dry-run, backup, rollback, and reapply after upgrades
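Current implementation: the patch first targets the dist files most likely to be live in the installed version (including `agent-runner.runtime-*.js`, `reply-*.js`, `compact-*.js`, `pi-embedded-*.js`, `delivery-*.js`, and others). It supports auto-discovery by content anchor, automatic backups, overwrite-in-place when re-patching, and rollback. **Note: this means "best-effort compatibility with different bundle layouts"; it does not prove cross-version compatibility. Whether the fix works is decided by whether the footer actually appears in a real Telegram private-chat reply.**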
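
As a rough illustration of the discovery idea described above, the following Python sketch lists priority-named bundles first and then adds any other `.js` file under the dist directory that contains a content anchor. The function name `discover_targets`, the `needle` argument, and the choice to use only three of the filename patterns are assumptions made for this example; the real `patch_reply_footer.py` may work differently.

```python
from pathlib import Path
from typing import List

# A subset of the filename patterns named in this document.
PRIORITY_GLOBS = ["agent-runner.runtime-*.js", "reply-*.js", "delivery-*.js"]


def discover_targets(dist: Path, needle: str) -> List[Path]:
    """Return priority-named bundles first, then other *.js files that contain `needle`.

    `needle` stands in for the content anchor; the real anchor text is not
    shown in this document, so callers must supply their own.
    """
    found: List[Path] = []
    for pattern in PRIORITY_GLOBS:
        found.extend(sorted(dist.glob(pattern)))
    for path in sorted(dist.rglob("*.js")):
        if path in found:
            continue
        if needle in path.read_text(encoding="utf-8", errors="ignore"):
            found.append(path)
    return found
```

Listing candidates this way before writing anything matches the intent of `--dry-run --list-targets`: see what would be touched first, then decide.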
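## Version Support / Validation Boundary
- **Live-validated (real Telegram private-chat acceptance)**: OpenClaw **2026.3.22**
- **Bundle path actually validated (2026.3.22)**: `/usr/lib/node_modules/openclaw/dist/agent-runner.runtime-BWpOtdxK.js`
- **Live-validated (real Telegram private-chat acceptance)**: OpenClaw **2026.4.5**
- **Bundle path actually validated (2026.4.5)**: `/usr/lib/node_modules/openclaw/dist/agent-runner.runtime-UIIO4kss.js`
- **Live-validated (real Telegram private-chat acceptance)**: OpenClaw **2026.4.12**
- **Bundle paths actually validated (2026.4.12)**: `/usr/lib/node_modules/openclaw/dist/agent-runner.runtime-D6-wGQkR.js` + `/usr/lib/node_modules/openclaw/dist/delivery-iF4EZ9PY.js`
- **Field finding on 2026.4.12**: patching the runner alone is not enough; the real Telegram delivery/send path may also need patching, and the live private-chat acceptance must be done after a **true restart (new PID)**
- **Static validation completed**: `--dry-run`, `--auto-discover --verify`, smoke test, `node --check`
- **Not live-tested**: other OpenClaw versions, other dist naming/layouts, and other real reply/send paths the patch did not hit
- **Release wording**: unless a real Telegram private-chat acceptance has been done, write only "possibly compatible / compatibility handling in place", never "supported"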
## What to consider before installing
This skill does what it says (patches OpenClaw dist JS files to append a Telegram footer), but it **writes into your OpenClaw installation directory** and requires **Node.js + Python 3**.
Before installing/running:
1) Inspect the scripts yourself and run `--dry-run` to see which files would be touched.
2) Ensure `node` is installed and you have a plan for filesystem permissions (consider a staging instance/container).
3) Confirm backups are created (look for `*.bak.telegram-footer.*`) and test the revert script.
4) Only run the patch on systems you control and trust; if unsure, try it on staging first.
## Usage
### 1) Preview
```bash
python3 scripts/patch_reply_footer.py --dry-run --list-targets
```
### 2) Apply
```bash
python3 scripts/patch_reply_footer.py --auto-discover
python3 scripts/patch_reply_footer.py --auto-discover --verify
```
### 2.5) Run the smoke test / verify (recommended)
```bash
bash scripts/smoke_test_footer_patch.sh
# or point it at a specific dist directory
bash scripts/smoke_test_footer_patch.sh /usr/lib/node_modules/openclaw/dist
```
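This flow runs the following steps in order:
- target discovery
- dry-run auto-discover
- apply auto-discover patch
- marker verification
- `node --check` on each file that carries the marker
- an extra check that no leftover `formatTokens(` remains in the patched files

**Scope note:** this step only proves that the candidate bundles in the current dist sample were patched and are syntactically valid. It does **not** prove that the real reply path is live, nor that cross-version compatibility holds.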
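
The two text-level checks and the syntax check in that flow could look roughly like the Python sketch below. `MARKER` is a hypothetical placeholder (the real marker string lives in the patch script and is not shown here), and the function names are made up for this example. `node --check` is the real Node.js flag for a syntax-only check.

```python
import shutil
import subprocess
from pathlib import Path
from typing import Optional

# Hypothetical marker; the real patch script defines its own marker string.
MARKER = "/* telegram-footer-patch */"


def static_checks(source: str, marker: str = MARKER) -> dict:
    """Report whether the marker is present and whether `formatTokens(` is still referenced."""
    return {
        "has_marker": marker in source,
        "leftover_format_tokens": "formatTokens(" in source,
    }


def node_syntax_ok(path: Path) -> Optional[bool]:
    """Run `node --check` on a file; return None when node is not installed."""
    node = shutil.which("node")
    if node is None:
        return None
    proc = subprocess.run([node, "--check", str(path)], capture_output=True, text=True)
    return proc.returncode == 0
```

Passing all three checks still only says the file on disk looks right; it says nothing about which bundle the running gateway actually loaded.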
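### 3) Truly restart the gateway (**required** for the patch to take effect)
> Note: the patch modifies OpenClaw's dist bundle. The Gateway does not reload it until restarted, so without a restart the Telegram private-chat footer will not appear. **A hot refresh / SIGUSR1 alone should not be treated as passing acceptance.** If the environment allows, confirm that a **new PID** appeared.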
```bash
openclaw gateway restart
```
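
One way to make the "new PID" check concrete is to record the gateway's PIDs before and after the restart and compare them. The sketch below assumes the process can be found with `pgrep -f` and a pattern of your choosing; the pattern and both function names are assumptions for illustration, not part of this skill.

```python
import shutil
import subprocess
from typing import Iterable, Set


def pids_matching(pattern: str) -> Set[int]:
    """Return PIDs whose command line matches `pattern`, using `pgrep -f` when available."""
    pgrep = shutil.which("pgrep")
    if pgrep is None:
        return set()
    proc = subprocess.run([pgrep, "-f", pattern], capture_output=True, text=True)
    return {int(tok) for tok in proc.stdout.split() if tok.isdigit()}


def restarted_with_new_pid(before: Iterable[int], after: Iterable[int]) -> bool:
    """True only if a process exists afterwards and none of the old PIDs survived."""
    before_set, after_set = set(before), set(after)
    return bool(after_set) and before_set.isdisjoint(after_set)
```

Call `pids_matching(...)` once before `openclaw gateway restart` and once after, then pass both sets to `restarted_with_new_pid`. A `False` result suggests the old process is still serving the old bundle.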
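### 3.5) Live acceptance (**required**)
After the restart, run one **real Telegram private-chat reply acceptance**:
1. Send the bot an ordinary private-chat message;
2. Check whether the reply you actually receive ends with:
   - `🧠 ...`
   - `💭 Think: ...`
   - `📊 .../...`
3. If the actual message has **no footer**, the conclusion is: **not fixed yet**.

Recommended acceptance criteria:
- `--dry-run` / smoke test passes → this only shows the patch was written into the candidate bundles;
- **a real Telegram private-chat reply carries the footer** → only then does the current live path count as fixed.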
### 4) Rollback
```bash
python3 scripts/revert_reply_footer.py
openclaw gateway restart
```
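## Protections now included
- `node --check` runs automatically after patching
- The footer snippet is self-contained and does not depend on local helpers such as `formatTokens` existing inside the target bundle
- If the syntax check fails, the backup taken just before the write is restored automatically
- `--list-targets` / `--verify` let you see the matched files first, then confirm the marker really landed in the actual bundle
- `scripts/smoke_test_footer_patch.sh` is bundled; it chains patch + verify + syntax + helper-sanity into one fixed regression flow
- The smoke test is only static / candidate-level validation; the final acceptance criterion is still **whether a real Telegram private-chat reply carries the footer**
- If the marker is missing but historical backups exist, the tool reports "possibly overwritten by an upgrade, reapplying"
- If the insertion needle no longer matches in a candidate reply bundle, the tool fails with an explicit error instead of silently skipping
- Known legacy Telegram footer blocks are cleaned up to avoid stacked double footers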
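
The backup, validate, and restore sequence from the list above can be sketched as follows. This is a minimal illustration, not the skill's actual code: `write_with_rollback` and the `validate` callback are hypothetical names, and the timestamp format in the backup suffix is an assumption (the document only specifies the `.bak.telegram-footer.*` pattern).

```python
import shutil
import time
from pathlib import Path
from typing import Callable


def write_with_rollback(target: Path, new_text: str, validate: Callable[[Path], bool]) -> Path:
    """Back up `target`, write `new_text`, and restore the backup if `validate` rejects the result."""
    stamp = time.strftime("%Y%m%d-%H%M%S")  # assumed suffix format
    backup = target.with_name(f"{target.name}.bak.telegram-footer.{stamp}")
    shutil.copy2(target, backup)
    target.write_text(new_text, encoding="utf-8")
    if not validate(target):
        # Put the pre-write content back before reporting the failure.
        shutil.copy2(backup, target)
        raise RuntimeError(f"validation failed, restored {target} from {backup}")
    return backup
```

In practice `validate` would wrap a `node --check` call; keeping it as a callback makes the rollback logic testable without Node.js installed.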
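## Notes
- Current priority candidates include: `dist/agent-runner.runtime-*.js`, `dist/reply-*.js`, `dist/compact-*.js`, `dist/pi-embedded-*.js`, `dist/plugin-sdk/thread-bindings-*.js`, `dist/model-selection-*.js`, `dist/auth-profiles-*.js`, `dist/delivery-*.js`
- `--auto-discover` additionally scans `dist/**/*.js` by content needle, which helps relocate the bundle actually in effect after an upgrade
- If a patch is already present, it is overwritten in place via the marker and is not injected a second time
- A `.bak.telegram-footer.*` backup is created automatically before every write
- If an OpenClaw upgrade overwrites the patch, first run `--dry-run --list-targets`, then use `--auto-discover --verify` to confirm the actual bundle is hit
- If the docs, smoke test, and marker all look fine but real Telegram replies still have no footer, the **current real send path was not hit**; keep tracing the live path rather than declaring the fix complete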
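The backup naming above lends itself to a simple "latest backup" lookup, since the timestamp suffix (`YYYYmmdd-HHMMSS`) sorts chronologically as plain text. A minimal sketch, assuming backups sit next to the patched file; `latest_backup` is a hypothetical helper name:

```python
import glob
import pathlib
from typing import Optional


def latest_backup(path: pathlib.Path) -> Optional[pathlib.Path]:
    """Return the newest .bak.telegram-footer.* backup beside path, if any."""
    # glob.escape guards against glob metacharacters in the bundle file name.
    pattern = glob.escape(path.name) + ".bak.telegram-footer.*"
    backups = sorted(path.parent.glob(pattern))
    return backups[-1] if backups else None
```

Note that the newest backup is whatever the file looked like just before the most recent write, so after a re-patch it may itself already contain an older marker block.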
FILE:CHANGELOG.md
# Changelog
## 1.0.11
- Live-validate OpenClaw `2026.4.12` with a real Telegram private-chat acceptance after a true process restart.
- Document that runner-only patching is insufficient on `2026.4.12`; the real delivery/send path also has to be patched.
- Update patch/revert tooling to include `delivery-*.js` candidates and verify the live-path patch more conservatively.
- Clarify in docs that hot refresh / SIGUSR1 is not enough evidence by itself; acceptance requires a new PID plus a real Telegram private-chat reply showing the footer.
## 1.0.10
- Add a second live validation boundary: OpenClaw `2026.4.5` is now real-world verified via Telegram private-chat acceptance.
- Document the current live-validated bundle path on `2026.4.5`: `agent-runner.runtime-UIIO4kss.js`.
- Update README/SKILL wording so published support claims distinguish between live-validated versions (`2026.3.22`, `2026.4.5`) and merely compatibility-targeted versions.
## 1.0.9
- Clarify version-support wording: live-validated on OpenClaw `2026.3.22` only; untested versions must not be described as “supported”.
- Document the exact live-validated bundle path: `agent-runner.runtime-BWpOtdxK.js`.
- Tighten acceptance wording so smoke test / static patch verification is no longer treated as final proof; real Telegram private-chat reply validation is required.
## 1.0.8
- Remove stray Python bytecode/cache artifacts from the package again.
- Tighten README/SKILL wording to reduce false-positive suspicious scanning while keeping behavior unchanged.
## 1.0.7
- Fix: footer now appends correctly for Telegram streaming replies that use HTML payloads (not just text payloads).
- Fix: patch reapply uses a function replacement to avoid accidental backslash/escape expansion when updating marker blocks.
## 1.0.6
- Add install/run safety notice to docs.
- Add preflight checks and disable Python bytecode cache writes in scripts.
## 1.0.5
- Remove Python bytecode/cache artifacts from the published package (avoid false-positive malware/suspicious flags).
## 1.0.4
- Compatibility update for latest OpenClaw dist bundles.
- Verified current patch flow against local OpenClaw `2026.3.7`.
- Refreshed skill docs to reflect the current footer format and upgrade-aware reapply workflow.
## 1.0.2
- Updated homepage/skill copy to reflect the current footer format: `🧠 Model + 💭 Think + 📊 Context`.
- Added footer preview image to the top of the README.
## 1.0.1
- Added license and maintenance documentation.
- Clarified apply, rollback, and verification steps.
- Added lightweight README for project hygiene.
## 1.0.0
- Initial release: private-chat footer injection, dry-run check, backup, rollback script.
FILE:README.md
# Telegram Footer Patch

Patch OpenClaw's Telegram reply pipeline to append a one-line footer in private chats (`🧠 Model + 💭 Think + 📊 Context`).
## What it does
- Adds a one-line footer for Telegram private-chat replies
- Shows model, think level, and context usage in one line
- Supports dry-run preview
- Creates a backup before any file change
- Supports rollback and verification after restart
- Targets current OpenClaw runner bundles and, when needed, conservatively patches the real Telegram delivery path as well
- Final success must be confirmed by a real Telegram private-chat reply
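To make the one-line footer format concrete, here is a Python sketch of how the three segments could be assembled. It is an illustrative port of the formatting idea, not the injected JavaScript itself: `format_tokens`, `build_token_usage`, and `build_footer` are hypothetical names, and Python's rounding of the one-decimal value may differ from JavaScript's `toFixed` in edge cases.

```python
import math


def format_tokens(value) -> str:
    """Compact token count: 1500 -> '1.5k', 2000 -> '2k', invalid -> '?'."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return "?"
    if not math.isfinite(value) or value <= 0:
        return "?"

    def compact(scaled: float, unit: str) -> str:
        text = f"{scaled:.1f}"
        if text.endswith(".0"):
            text = text[:-2]  # drop a trailing ".0" so 2.0k reads as 2k
        return text + unit

    if value >= 1_000_000:
        return compact(value / 1_000_000, "M")
    if value >= 1_000:
        return compact(value / 1_000, "k")
    return str(round(value))


def build_token_usage(used, limit) -> str:
    """'used/limit' when the limit is known, otherwise just 'used'."""
    used_label = format_tokens(used)
    limit_label = format_tokens(limit)
    return used_label if limit_label == "?" else f"{used_label}/{limit_label}"


def build_footer(provider: str, model: str, thinking: str, used, limit) -> str:
    """Join the model, think-level, and context segments into one line."""
    return " ".join([
        f"🧠 {provider}/{model}",
        f"💭 Think: {thinking}",
        f"📊 {build_token_usage(used, limit)}",
    ])
```

For example, `build_footer("provider", "model", "default", 12345, 200000)` produces a line with the three emoji-prefixed segments separated by single spaces.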
## Recommended flow
1. Dry-run
2. Apply
3. Restart the gateway / service with a **true process restart** (**required** to take effect). Do not rely only on hot refresh / SIGUSR1; confirm a new PID if you can.
4. Send a real Telegram private-chat test message and verify the footer in the actual delivered reply
> Smoke test / marker verification only proves the patch hit candidate bundle files. If the real Telegram reply still has no footer, treat it as not fixed yet.
## Validated version boundary
- **Live-validated:** OpenClaw **2026.3.22**
- **Live-validated bundle path (2026.3.22):** `/usr/lib/node_modules/openclaw/dist/agent-runner.runtime-BWpOtdxK.js`
- **Live-validated:** OpenClaw **2026.4.5**
- **Live-validated bundle path (2026.4.5):** `/usr/lib/node_modules/openclaw/dist/agent-runner.runtime-UIIO4kss.js`
- **Live-validated:** OpenClaw **2026.4.12**
- **Live-validated bundle paths (2026.4.12):** `/usr/lib/node_modules/openclaw/dist/agent-runner.runtime-D6-wGQkR.js` and `/usr/lib/node_modules/openclaw/dist/delivery-iF4EZ9PY.js`
- **2026.4.12 lesson:** runner-only patching was not enough; the real Telegram delivery/send path also needed patching before a true restart + real-chat acceptance succeeded
- **Not live-validated:** other OpenClaw versions/builds
- **Claim boundary:** for untested versions, say “may be compatible” / “compatibility logic added”, not “supported”
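The claim boundary can be expressed as a tiny lookup. The sketch below hard-codes the three versions listed above as live-validated; `support_claim` is a hypothetical helper name, and the set would need updating by hand whenever a new version passes real-chat acceptance.

```python
LIVE_VALIDATED = {"2026.3.22", "2026.4.5", "2026.4.12"}


def support_claim(version: str) -> str:
    """Return the wording that may be used for a given OpenClaw version."""
    if version in LIVE_VALIDATED:
        return "live-validated"
    # Untested versions must never be described as "supported".
    return "may be compatible"
```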
## Before you run
- This modifies OpenClaw dist bundle files under `.../openclaw/dist/`.
- Run `python3 scripts/patch_reply_footer.py --dry-run` first.
- Confirm backups exist (`*.bak.telegram-footer.*`) and test rollback (`python3 scripts/revert_reply_footer.py --dry-run`).
- Use only on systems you control.
## Key files
- `SKILL.md` — usage guidance
- `scripts/patch_reply_footer.py` — patch script
- `scripts/revert_reply_footer.py` — rollback script
- `CHANGELOG.md` — release notes
- `LICENSE` — MIT license
FILE:scripts/patch_reply_footer.py
#!/usr/bin/env python3
"""Patch OpenClaw dist JS bundles to append a Telegram private-chat footer.
Safety notes:
- This script modifies files under the OpenClaw installation directory.
- Always run with --dry-run first to see which files would be touched.
- The script creates timestamped backups (*.bak.telegram-footer.*) before writing.
- If anything fails, it restores from backup automatically.
This version handles two layers:
- runner/finalPayload candidate bundles
- Telegram delivery/live outbound layer bundles
Important boundary:
- patch/verify success only proves candidate bundle patching + syntax validity
- final acceptance still requires a real Telegram private-chat reply showing the footer
- if you only hot-refresh and never load a new process, treat it as unverified
"""
import argparse
import datetime as dt
import os
import pathlib
import re
import shutil
import subprocess
import sys
sys.dont_write_bytecode = True
MARKER_START = "/* OPENCLAW_TELEGRAM_STATUS_FOOTER_START */"
MARKER_END = "/* OPENCLAW_TELEGRAM_STATUS_FOOTER_END */"
RUNNER_SNIPPET_TEMPLATE = r'''
__MARKER_START__
const __ocSessionLooksTelegramDirect =
  typeof sessionKey === "string" && sessionKey.includes(":telegram:direct:");
const __ocReplyProvider =
  sessionCtx?.Surface || sessionCtx?.Provider || activeSessionEntry?.lastChannel || activeSessionEntry?.channel || "";
const __ocReplyChatType =
  sessionCtx?.ChatType || activeSessionEntry?.chatType || "";
const __ocShouldAppendStatusFooter =
  (__ocSessionLooksTelegramDirect || __ocReplyProvider === "telegram") &&
  __ocReplyChatType !== "group" &&
  __ocReplyChatType !== "channel";
const __ocEscapeHtml = (str) => String(str)
  .replace(/&/g, "&amp;")
  .replace(/</g, "&lt;")
  .replace(/>/g, "&gt;")
  .replace(/\"/g, "&quot;")
  .replace(/'/g, "&#39;");
const __ocFormatTokens = (value) => {
  if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) return "?";
  if (value >= 1000000) return `${(value / 1000000).toFixed(1).replace(/\.0$/, "")}M`;
  if (value >= 1000) return `${(value / 1000).toFixed(1).replace(/\.0$/, "")}k`;
  return String(Math.round(value));
};
const __ocBuildTokenUsage = (used, limit) => {
  const usedLabel = __ocFormatTokens(used);
  const limitLabel = __ocFormatTokens(limit);
  return limitLabel === "?" ? usedLabel : `${usedLabel}/${limitLabel}`;
};
const __ocAppendFooter = (payloads, footerText, footerHtml) => {
  let index = -1;
  for (let i = payloads.length - 1; i >= 0; i -= 1) {
    if (payloads[i]?.html || payloads[i]?.text) {
      index = i;
      break;
    }
  }
  if (index === -1) return [...payloads, { text: footerText }];
  const existing = payloads[index];
  const existingText = existing?.text ?? "";
  const existingHtml = existing?.html ?? "";
  const textSep = existingText.endsWith("\n") ? "" : "\n";
  const htmlSep = existingHtml.endsWith("<br>") ? "" : "<br>";
  const next = {
    ...existing,
    text: existingText ? `${existingText}${textSep}${footerText}` : footerText,
    html: existingHtml ? `${existingHtml}${htmlSep}${footerHtml}` : void 0
  };
  const updated = payloads.slice();
  updated[index] = next;
  return updated;
};
if (__ocShouldAppendStatusFooter) {
  const __ocTotalTokens = resolveFreshSessionTotalTokens(activeSessionEntry);
  const __ocThinkingLevel = activeSessionEntry?.thinkingLevel || "default";
  const __ocContextLimit = contextTokensUsed ?? activeSessionEntry?.contextTokens ?? null;
  const __ocStatusFooter = [
    `🧠 ${providerUsed && modelUsed ? `${providerUsed}/${modelUsed}` : modelUsed || "unknown"}`,
    `💭 Think: ${__ocThinkingLevel}`,
    `📊 ${__ocBuildTokenUsage(__ocTotalTokens, __ocContextLimit)}`
  ].join(" ");
  const __ocFooterText = `──────────\n${__ocStatusFooter}`;
  const __ocFooterHtml = `──────────<br>${__ocEscapeHtml(__ocStatusFooter)}`;
  finalPayloads = __ocAppendFooter(finalPayloads, __ocFooterText, __ocFooterHtml);
}
__MARKER_END__
'''.strip("\n")
DELIVERY_HELPERS_TEMPLATE = r'''
__MARKER_START__
function __ocFooterAlreadyPresent(text) {
  return typeof text === "string" && text.includes("🧠 ") && text.includes("💭 Think:") && text.includes("📊 ");
}
function __ocFormatFooterTokens(value) {
  if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) return "?";
  if (value >= 1e6) return `${(value / 1e6).toFixed(1).replace(/\.0$/, "")}M`;
  if (value >= 1e3) return `${(value / 1e3).toFixed(1).replace(/\.0$/, "")}k`;
  return String(Math.round(value));
}
function __ocBuildFooterTokenUsage(used, limit) {
  const usedLabel = __ocFormatFooterTokens(used);
  const limitLabel = __ocFormatFooterTokens(limit);
  return limitLabel === "?" ? usedLabel : `${usedLabel}/${limitLabel}`;
}
async function __ocReadSessionEntryForFooter(sessionKey) {
  if (!sessionKey) return null;
  try {
    const fs = await import("node:fs/promises");
    const homeDir = typeof process !== "undefined" && process?.env?.HOME ? process.env.HOME : "/root";
    const raw = await fs.readFile(`${homeDir}/.openclaw/agents/main/sessions/sessions.json`, "utf8");
    const parsed = JSON.parse(raw);
    const entry = parsed && typeof parsed === "object" ? parsed[sessionKey] : null;
    return entry && typeof entry === "object" ? entry : null;
  } catch {
    return null;
  }
}
async function __ocMaybeAppendTelegramStatusFooter(text, params) {
  if (typeof text !== "string" || !text.trim()) return text;
  if (__ocFooterAlreadyPresent(text)) return text;
  const sessionKey = typeof params?.sessionKeyForInternalHooks === "string" ? params.sessionKeyForInternalHooks : "";
  const chatIdText = typeof params?.chatId === "string" ? params.chatId : String(params?.chatId ?? "");
  const looksTelegramDirect = sessionKey.includes(":telegram:direct:") || (!!chatIdText && !chatIdText.startsWith("-") && sessionKey.includes(":telegram:"));
  if (!looksTelegramDirect) return text;
  const entry = await __ocReadSessionEntryForFooter(sessionKey);
  if (!entry) return text;
  const provider = typeof entry.modelProvider === "string" && entry.modelProvider ? entry.modelProvider : "unknown";
  const model = typeof entry.model === "string" && entry.model ? entry.model : "unknown";
  const thinkingLevel = typeof entry.thinkingLevel === "string" && entry.thinkingLevel ? entry.thinkingLevel : "default";
  const totalTokens = typeof entry.totalTokens === "number" ? entry.totalTokens : null;
  const contextTokens = typeof entry.contextTokens === "number" ? entry.contextTokens : null;
  const statusFooter = [
    `🧠 ${provider}/${model}`,
    `💭 Think: ${thinkingLevel}`,
    `📊 ${__ocBuildFooterTokenUsage(totalTokens, contextTokens)}`
  ].join(" ");
  const separator = text.endsWith("\n") ? "" : "\n";
  return `${text}${separator}\n──────────\n${statusFooter}`;
}
__MARKER_END__
'''.strip("\n")
RUNNER_SNIPPET = RUNNER_SNIPPET_TEMPLATE.replace("__MARKER_START__", MARKER_START).replace("__MARKER_END__", MARKER_END)
DELIVERY_HELPERS = DELIVERY_HELPERS_TEMPLATE.replace("__MARKER_START__", MARKER_START).replace("__MARKER_END__", MARKER_END)
RUNNER_PATTERN = re.compile(
    r"(if\s*\(\s*responseUsageLine\s*\)\s*finalPayloads\s*=\s*appendUsageLine\(\s*finalPayloads\s*,\s*responseUsageLine\s*\);)",
    flags=re.M,
)
RUNNER_NEEDLE_SUBSTR = "appendUsageLine(finalPayloads, responseUsageLine)"
DELIVERY_FN_ANCHOR = "async function deliverReplies(params) {"
DELIVERY_LOOP_ANCHOR = "\t\tlet reply = originalReply;"
DELIVERY_LOOP_INSERT = (
    "\t\tif (typeof reply?.text === \"string\" && reply.text) reply = {\n"
    "\t\t\t...reply,\n"
    "\t\t\ttext: await __ocMaybeAppendTelegramStatusFooter(reply.text, params)\n"
    "\t\t};"
)
DELIVERY_INLINE_NEEDLE = "__ocMaybeAppendTelegramStatusFooter(reply.text, params)"
DELIVERY_HELPER_NEEDLE = "async function __ocMaybeAppendTelegramStatusFooter(text, params)"
MARKER_BLOCK_RE = re.compile(re.escape(MARKER_START) + r".*?" + re.escape(MARKER_END), flags=re.S)
TARGET_GLOBS = [
    "agent-runner.runtime-*.js",
    "reply-*.js",
    "compact-*.js",
    "pi-embedded-*.js",
    "plugin-sdk/thread-bindings-*.js",
    "model-selection-*.js",
    "auth-profiles-*.js",
    "delivery-*.js",
]
LEGACY_BLOCK_RE = re.compile(
r"\n?\s*const shouldAppendStatusFooter = activeSessionEntry\?\.chatType !== \"group\" && activeSessionEntry\?\.chatType !== \"channel\" && \(activeSessionEntry\?\.lastChannel === \"telegram\" \|\| activeSessionEntry\?\.channel === \"telegram\"\);\s*"
r"if \(shouldAppendStatusFooter\) \{\s*"
r"const totalTokens = resolveFreshSessionTotalTokens\(activeSessionEntry\);\s*"
r"const statusFooter = \[\s*"
r"`🧠 Model: \$\{providerUsed && modelUsed \? `\$\{providerUsed\}/\$\{modelUsed\}` : modelUsed \|\| \"unknown\"\}`\s*,\s*"
r"`📊 Context: \$\{formatTokens\(typeof totalTokens === \"number\" && Number\.isFinite\(totalTokens\) && totalTokens > 0 \? totalTokens : null, contextTokensUsed \?\? activeSessionEntry\?\.contextTokens \?\? null\)\}`\s*"
r"\]\.join\(\" \"\);\s*"
r"finalPayloads = appendUsageLine\(finalPayloads, statusFooter\);\s*"
r"\}\s*",
flags=re.S,
)
def verify_node_syntax(path: pathlib.Path):
    result = subprocess.run(["node", "--check", str(path)], capture_output=True, text=True, check=False)
    if result.returncode != 0:
        details = (result.stderr or result.stdout or "node --check failed").strip()
        raise RuntimeError(details)
def _is_backup_path(path: pathlib.Path) -> bool:
    return ".bak.telegram-footer." in path.name
def _read_text(path: pathlib.Path) -> str:
    try:
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError:
        return path.read_text(encoding="utf-8", errors="ignore")
def _detect_kind(path: pathlib.Path, text: str) -> str | None:
    if (
        DELIVERY_FN_ANCHOR in text
        and "for (const originalReply of params.replies) {" in text
        and "emitTelegramMessageSentHooks" in text
    ):
        return "delivery"
    if RUNNER_NEEDLE_SUBSTR in text and RUNNER_PATTERN.search(text):
        return "runner"
    return None
def _looks_like_target_by_content(path: pathlib.Path) -> bool:
    text = _read_text(path)
    return _detect_kind(path, text) is not None
def iter_target_files(dist: pathlib.Path, auto_discover: bool = False) -> list[pathlib.Path]:
    files: set[pathlib.Path] = set()
    for pattern in TARGET_GLOBS:
        for fp in dist.glob(pattern):
            if fp.is_file() and not _is_backup_path(fp):
                files.add(fp)
    if auto_discover:
        for fp in dist.rglob("*.js"):
            if not fp.is_file() or _is_backup_path(fp):
                continue
            try:
                if _looks_like_target_by_content(fp):
                    files.add(fp)
            except OSError:
                continue
    return sorted(files)
def analyze_file(path: pathlib.Path) -> dict:
    content = _read_text(path)
    kind = _detect_kind(path, content)
    has_marker = MARKER_START in content
    has_runner_pattern = RUNNER_PATTERN.search(content) is not None
    has_legacy = LEGACY_BLOCK_RE.search(content) is not None
    has_delivery_inline = DELIVERY_INLINE_NEEDLE in content
    has_delivery_helper = DELIVERY_HELPER_NEEDLE in content
    is_candidate = kind is not None or has_marker or has_legacy
    fully_patched = False
    if kind == "runner":
        fully_patched = has_marker
    elif kind == "delivery":
        fully_patched = has_delivery_inline and (has_marker or has_delivery_helper)
    return {
        "path": path,
        "content": content,
        "kind": kind,
        "has_marker": has_marker,
        "has_runner_pattern": has_runner_pattern,
        "has_legacy": has_legacy,
        "has_delivery_inline": has_delivery_inline,
        "has_delivery_helper": has_delivery_helper,
        "is_candidate": is_candidate,
        "fully_patched": fully_patched,
    }
def _patch_runner_content(content: str) -> tuple[str, int]:
    updated = content
    legacy_removed = 0
    if LEGACY_BLOCK_RE.search(updated):
        updated, legacy_removed = LEGACY_BLOCK_RE.subn("\n", updated)
    if MARKER_START in updated:
        updated, count = MARKER_BLOCK_RE.subn(lambda _m: RUNNER_SNIPPET, updated, count=1)
        if count == 0:
            raise RuntimeError("runner marker block found but could not be replaced")
        return updated, legacy_removed
    match = RUNNER_PATTERN.search(updated)
    if not match:
        raise RuntimeError("runner insertion needle not found in candidate dist bundle")
    replacement = match.group(1) + "\n\n" + RUNNER_SNIPPET
    updated = updated[: match.start()] + replacement + updated[match.end() :]
    return updated, legacy_removed
def _patch_delivery_content(content: str) -> str:
    updated = content
    if MARKER_START in updated:
        updated, count = MARKER_BLOCK_RE.subn(lambda _m: DELIVERY_HELPERS, updated, count=1)
        if count == 0:
            raise RuntimeError("delivery marker block found but could not be replaced")
    elif DELIVERY_HELPER_NEEDLE not in updated:
        if DELIVERY_FN_ANCHOR not in updated:
            raise RuntimeError("delivery function anchor not found")
        updated = updated.replace(DELIVERY_FN_ANCHOR, DELIVERY_HELPERS + "\n" + DELIVERY_FN_ANCHOR, 1)
    if DELIVERY_INLINE_NEEDLE not in updated:
        if DELIVERY_LOOP_ANCHOR not in updated:
            raise RuntimeError("delivery loop anchor not found")
        updated = updated.replace(DELIVERY_LOOP_ANCHOR, DELIVERY_LOOP_ANCHOR + "\n" + DELIVERY_LOOP_INSERT, 1)
    return updated
def patch_file(path: pathlib.Path, dry_run: bool):
    info = analyze_file(path)
    content = info["content"]
    kind = info["kind"]
    if not info["is_candidate"] or not kind:
        print(f"[skip] non-target dist bundle: {path}")
        return {"status": "skip", "candidate": False, "changed": False}
    try:
        if kind == "runner":
            updated, legacy_removed = _patch_runner_content(content)
            extra = f" (legacy cleaned: {legacy_removed})" if legacy_removed else ""
        else:
            updated = _patch_delivery_content(content)
            extra = ""
    except Exception as exc:
        print(f"[err] patch planning failed: {path}", file=sys.stderr)
        print(f"[err] reason: {exc}", file=sys.stderr)
        return {"status": "error", "candidate": True, "changed": False}
    changed = updated != content
    if not changed:
        print(f"[skip] already up to date: {path}")
        return {"status": "ok", "candidate": True, "changed": False}
    action = "update patch" if info["has_marker"] else "patch"
    if dry_run:
        print(f"[dry-run] would {action}: {path}{extra}")
        return {"status": "ok", "candidate": True, "changed": True}
    ts = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    backup = path.with_suffix(path.suffix + f".bak.telegram-footer.{ts}")
    shutil.copy2(path, backup)
    try:
        path.write_text(updated, encoding="utf-8")
        verify_node_syntax(path)
    except Exception as exc:
        shutil.copy2(backup, path)
        print(f"[err] patch failed, restored backup: {path}", file=sys.stderr)
        print(f"[err] reason: {exc}", file=sys.stderr)
        return {"status": "error", "candidate": True, "changed": False}
    print(f"[ok] {action}ed: {path}{extra}")
    print(f"[ok] backup : {backup}")
    print("[ok] syntax check: node --check passed")
    return {"status": "ok", "candidate": True, "changed": True}
def preflight(dist: pathlib.Path, dry_run: bool) -> int:
    print("[warn] This tool patches OpenClaw installation files (dist JS bundles).")
    print("[warn] Recommended: run --dry-run first and review the candidate files.")
    node_path = shutil.which("node")
    if not node_path:
        print("[err] node not found in PATH (required for syntax validation via node --check)", file=sys.stderr)
        return 2
    if not dist.exists() or not dist.is_dir():
        print(f"[err] dist directory not found: {dist}", file=sys.stderr)
        return 2
    if not dry_run:
        if not os.access(dist, os.W_OK):
            print(f"[err] no write permission for dist directory: {dist} (try sudo or adjust permissions)", file=sys.stderr)
            return 2
    else:
        if not os.access(dist, os.R_OK):
            print(f"[err] no read permission for dist directory: {dist}", file=sys.stderr)
            return 2
    return 0
def main() -> int:
    parser = argparse.ArgumentParser(description="Patch OpenClaw dist files to append Telegram status footer.")
    parser.add_argument("--dist", default="/usr/lib/node_modules/openclaw/dist", help="OpenClaw dist directory")
    parser.add_argument("--dry-run", action="store_true", help="Preview only, do not write")
    parser.add_argument("--auto-discover", action="store_true", help="Also scan dist/**/*.js for supported runner/delivery anchors")
    parser.add_argument("--list-targets", action="store_true", help="Print resolved target file list, then exit")
    parser.add_argument("--verify", action="store_true", help="Exit non-zero if any real candidate file lacks the patch")
    args = parser.parse_args()
    dist = pathlib.Path(args.dist)
    rc = preflight(dist, dry_run=args.dry_run)
    if rc != 0:
        return rc
    files = iter_target_files(dist, auto_discover=args.auto_discover)
    if not files:
        print("[err] no target dist files found", file=sys.stderr)
        return 2
    if args.list_targets:
        for fp in files:
            info = analyze_file(fp)
            print(
                f"{fp} candidate={str(info['is_candidate']).lower()} kind={info['kind'] or '-'} marker={str(info['has_marker']).lower()} fully_patched={str(info['fully_patched']).lower()} legacy={str(info['has_legacy']).lower()}"
            )
        return 0
    if args.verify:
        missing: list[pathlib.Path] = []
        candidate_count = 0
        for fp in files:
            info = analyze_file(fp)
            if not info["is_candidate"] or not info["kind"]:
                continue
            candidate_count += 1
            if not info["fully_patched"]:
                missing.append(fp)
        if missing:
            for fp in missing:
                print(f"[err] patch missing/incomplete in candidate: {fp}", file=sys.stderr)
            return 1
        print(f"[ok] patch present in {candidate_count} candidate target(s)")
        return 0
    changed = 0
    errors = 0
    for fp in files:
        result = patch_file(fp, dry_run=args.dry_run)
        if result.get("status") == "error":
            errors += 1
        if result.get("changed"):
            changed += 1
    if errors:
        print(f"[done] errors: {errors}", file=sys.stderr)
        return 1
    if args.dry_run:
        print(f"[done] changed files: {changed}" if changed else "[done] no files changed")
        return 0
    print(f"[done] changed files: {changed}")
    return 0
if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/revert_reply_footer.py
#!/usr/bin/env python3
"""Revert OpenClaw dist JS bundles from the latest telegram-footer backups."""
import argparse
import glob
import os
import pathlib
import shutil
import subprocess
import sys
sys.dont_write_bytecode = True
MARKER_START = "/* OPENCLAW_TELEGRAM_STATUS_FOOTER_START */"
TARGET_GLOBS = [
    "agent-runner.runtime-*.js",
    "reply-*.js",
    "compact-*.js",
    "pi-embedded-*.js",
    "plugin-sdk/thread-bindings-*.js",
    "model-selection-*.js",
    "auth-profiles-*.js",
    "delivery-*.js",
]
def verify_node_syntax(path: pathlib.Path):
    result = subprocess.run(["node", "--check", str(path)], capture_output=True, text=True, check=False)
    if result.returncode != 0:
        details = (result.stderr or result.stdout or "node --check failed").strip()
        raise RuntimeError(details)
def _is_backup_path(path: pathlib.Path) -> bool:
    return ".bak.telegram-footer." in path.name
def iter_target_files(dist: pathlib.Path) -> list[pathlib.Path]:
    files: set[pathlib.Path] = set()
    for pattern in TARGET_GLOBS:
        for fp in dist.glob(pattern):
            if fp.is_file() and not _is_backup_path(fp):
                files.add(fp)
    return sorted(files)
def revert_file(path: pathlib.Path, dry_run: bool) -> bool:
    content = path.read_text(encoding="utf-8")
    if MARKER_START not in content:
        print(f"[skip] not patched: {path}")
        return False
    backups = sorted(glob.glob(str(path) + ".bak.telegram-footer.*"))
    if not backups:
        print(f"[err] backup not found: {path}", file=sys.stderr)
        return False
    latest = pathlib.Path(backups[-1])
    if dry_run:
        print(f"[dry-run] would restore {path} <- {latest}")
        return True
    current_backup = path.with_suffix(path.suffix + ".bak.pre-revert")
    shutil.copy2(path, current_backup)
    try:
        shutil.copy2(latest, path)
        verify_node_syntax(path)
    except Exception as exc:
        shutil.copy2(current_backup, path)
        print(f"[err] restore failed, put current file back: {path}", file=sys.stderr)
        print(f"[err] reason: {exc}", file=sys.stderr)
        return False
    print(f"[ok] restored : {path} <- {latest}")
    print(f"[ok] safety copy : {current_backup}")
    print("[ok] syntax check: node --check passed")
    return True
def preflight(dist: pathlib.Path, dry_run: bool) -> int:
    node_path = shutil.which("node")
    if not node_path:
        print("[err] node not found in PATH (required for syntax validation via node --check)", file=sys.stderr)
        return 2
    if not dist.exists() or not dist.is_dir():
        print(f"[err] dist directory not found: {dist}", file=sys.stderr)
        return 2
    if not dry_run and not os.access(dist, os.W_OK):
        print(f"[err] no write permission for dist directory: {dist}", file=sys.stderr)
        return 2
    if dry_run and not os.access(dist, os.R_OK):
        print(f"[err] no read permission for dist directory: {dist}", file=sys.stderr)
        return 2
    return 0
def main() -> int:
    parser = argparse.ArgumentParser(description="Restore OpenClaw dist files from latest telegram-footer backups.")
    parser.add_argument("--dist", default="/usr/lib/node_modules/openclaw/dist", help="OpenClaw dist directory")
    parser.add_argument("--dry-run", action="store_true", help="Preview only, do not write")
    args = parser.parse_args()
    dist = pathlib.Path(args.dist)
    rc = preflight(dist, dry_run=args.dry_run)
    if rc != 0:
        return rc
    files = iter_target_files(dist)
    if not files:
        print("[err] no target dist files found", file=sys.stderr)
        return 2
    changed = 0
    for f in files:
        if revert_file(f, args.dry_run):
            changed += 1
    print("[done] no files restored" if changed == 0 else f"[done] restored files: {changed}")
    return 0
if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/smoke_test_footer_patch.sh
#!/usr/bin/env bash
set -euo pipefail
DIST="${1:-/usr/lib/node_modules/openclaw/dist}"
PATCH_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)"
PATCH_SCRIPT="$PATCH_DIR/patch_reply_footer.py"
if ! command -v python3 >/dev/null 2>&1; then
  echo "[err] python3 not found" >&2
  exit 2
fi
if ! command -v node >/dev/null 2>&1; then
  echo "[err] node not found" >&2
  exit 2
fi
if [[ ! -d "$DIST" ]]; then
  echo "[err] dist directory not found: $DIST" >&2
  exit 2
fi
echo "== 1) target discovery =="
python3 "$PATCH_SCRIPT" --dist "$DIST" --dry-run --list-targets
echo
echo "== 2) dry-run auto-discover =="
python3 "$PATCH_SCRIPT" --dist "$DIST" --dry-run --auto-discover
echo
echo "== 3) apply auto-discover patch =="
python3 "$PATCH_SCRIPT" --dist "$DIST" --auto-discover
echo
echo "== 4) marker verification =="
python3 "$PATCH_SCRIPT" --dist "$DIST" --auto-discover --verify
echo
echo "== 5) syntax verification for patched candidate files =="
mapfile -t TARGET_LINES < <(python3 "$PATCH_SCRIPT" --dist "$DIST" --auto-discover --list-targets)
PATCHED_FILES=()
for line in "${TARGET_LINES[@]}"; do
  file="${line%% candidate=*}"
  [[ -f "$file" ]] || continue
  case "$line" in
    *"candidate=true"*"marker=true"*)
      PATCHED_FILES+=("$file")
      echo "[check] node --check $file"
      node --check "$file"
      ;;
  esac
done
echo
echo "== 6) sanity grep =="
for file in "${PATCHED_FILES[@]}"; do
  if grep -qF 'formatTokens(' "$file"; then
    echo "[err] patched file still references formatTokens: $file" >&2
    exit 1
  fi
done
echo "[ok] patched files do not reference formatTokens"
echo
echo "== done =="
echo "Patch + verify completed."
echo "Important: this only verifies candidate bundle patching + syntax."
echo "For live acceptance, restart the gateway and confirm a REAL Telegram private-chat reply actually shows the footer."
echo "If the real reply still has no footer, treat it as not fixed yet and keep tracing the live path."