@clawhub-lambdua-6dd99fea81
Zopia AI 视频创作技能 - 通过 Zopia 平台的 AI Agent 进行视频/图片创作。覆盖场景包括:AI 视频生成(文生视频、图生视频)、AI 图片生成(角色设定图、分镜关键帧)、剧本创作(对话/旁白/场景描述)、角色设计、分镜设计、多集连续剧制作。当用户提到 zopia、视频创作、短剧制作、分镜、...
---
name: zopia-skill
description: Zopia AI 视频创作技能 - 通过 Zopia 平台的 AI Agent 进行视频/图片创作。覆盖场景包括:AI 视频生成(文生视频、图生视频)、AI 图片生成(角色设定图、分镜关键帧)、剧本创作(对话/旁白/场景描述)、角色设计、分镜设计、多集连续剧制作。当用户提到 zopia、视频创作、短剧制作、分镜、角色设计、AI 视频生成时应触发。关键判断:只要用户的请求涉及通过 AI 进行系统化的视频创作流程(剧本→角色→分镜→视频),都必须触发此技能。
user-invocable: true
metadata:
{
"openclaw":
{
"emoji": "🎬",
"requires":
{
"bins": ["python3"],
"env": ["ZOPIA_ACCESS_KEY"]
},
"primaryEnv": "ZOPIA_ACCESS_KEY"
}
}
---
# Zopia AI 视频创作
Zopia 是一个项目制的 AI 视频创作平台。每个项目包含完整的创作流水线:**剧本 → 角色 → 分镜 → 视频**,由后端 AI Agent 自动驱动。你通过脚本管理项目、传达用户意图、追踪进度、获取成果。
## 环境配置
```bash
export ZOPIA_ACCESS_KEY="zopia-xxxxxxxxxxxx" # 必需,30天有效
export ZOPIA_BASE_URL="https://zopia.ai" # 可选
```
仅使用 Python 标准库,无需额外安装。
## 核心概念
| 概念 | 说明 |
|------|------|
| **Project (Base)** | 创作项目,包含设置、剧集、所有资产。创建时自动生成首集 |
| **Episode** | 剧集,同一项目下可创建多集,每集有独立的剧本/角色/分镜 |
| **Session** | 一次 Agent 对话。异步执行,通过轮询获取进展 |
| **Workspace** | 项目的实时工作区快照,包含角色(entities)、分镜(storyboard)、各媒体的生成状态 |
## 脚本速查
| 脚本 | 用途 | 关键参数 |
|------|------|---------|
| `create_project.py` | 创建项目 | `[名称]` |
| `save_settings.py` | 项目设置 | `--base-id` `--style` `--aspect-ratio` `--video-model` `--storyboard-image-model` `--entity-image-model` ... |
| `send_message.py` | 发送创作指令(异步) | `--base-id` `--episode-id` `消息` |
| `query_session.py` | 查询进展 | `SESSION_ID` `--poll` `--after-seq N` |
| `download_results.py` | 下载媒体资源 | `SESSION_ID` `--output-dir` `--type image\|video` |
| `get_balance.py` | 余额查询 | — |
| `list_projects.py` | 列出项目 | `--page` `--page-size` |
| `manage_episodes.py` | 剧集管理 | `list\|create\|delete` |
| `render_episode.py` | 合成最终视频 | `trigger\|status` `--base-id` `--episode-id` `--poll` |
## 项目设置参考
创建项目后,必须配置基础设置(locale / aspect_ratio / style)才能开始创作。
```bash
python3 {baseDir}/scripts/save_settings.py --base-id BASE_ID \
--locale zh-CN --aspect-ratio 16:9 --style realistic_3d_cg
```
### 风格
| ID | 说明 |
|----|------|
| `anime_japanese_korean` | 日韩动漫 |
| `realistic_3d_cg` | 3D CG 写实 🔥 |
| `pixar_3d_cartoon` | Pixar 3D 卡通 |
| `photorealistic_real_human` | 真人写实 |
| `3D_CG_Animation` | 3D CG 动画 🔥 |
| `anime_chibi` | Q版可爱 |
| `anime_shinkai` | 新海诚 |
| `anime_ghibli` | 吉卜力 |
| `stylized_pixel` | 像素艺术 |
别名支持:`realistic` → `realistic_3d_cg`,`ghibli` → `anime_ghibli`,`shinkai` → `anime_shinkai`,`pixel` → `stylized_pixel`
### 视频模型 × 生成方式
不同模型支持不同的生成方式(generation_method),不匹配会报错。
| 模型 ID | 名称 | 支持的方式 | 默认 |
|---------|------|-----------|------|
| `generate_video_by_seedance_20` | Seedance 2.0 Pro ⭐ | n_grid, video_ref, multi_ref, multi_ref_v2 | video_ref |
| `generate_video_by_seedance_20_fast` | Seedance 2.0 Fast | n_grid, video_ref, multi_ref, multi_ref_v2 | video_ref |
| `generate_video_by_kling_o3` | Kling O3 | start_frame, n_grid, multi_ref, multi_ref_v2 | n_grid |
| `generate_video_by_kling_v3_0` | Kling V3.0 | start_frame, n_grid | n_grid |
| `generate_video_by_pixverse_c1` | PixVerse C1 | start_frame, multi_ref | start_frame |
| `generate_video_by_hailuo_02` | Hailuo 2.3 | start_frame | start_frame |
| `generate_video_by_wan26_i2v` | Wan 2.6 | start_frame | start_frame |
| `generate_video_by_wan26_i2v_flash` | Wan 2.6 Flash | start_frame | start_frame |
| `generate_video_by_viduq2_pro` | Vidu Q2 Pro | start_frame | start_frame |
| `generate_video_by_viduq3_pro` | Vidu Q3 Pro | start_frame | start_frame |
| `generate_video_by_viduq3` | Vidu Q3 | n_grid, multi_ref, multi_ref_v2 | n_grid |
| `generate_video_by_seedance_15` | Seedance 1.5 Pro | start_frame | start_frame |
### 图片模型
分镜关键帧(`storyboard_image_model`)与角色/场景设定图(`entity_image_model`)使用独立的图片模型,可分别配置。
| 模型 ID | 名称 | 默认场景 |
|---------|------|---------|
| `generate_image_by_nano_banana_2` | Nano Banana 2 | storyboard 默认 |
| `generate_image_by_doubao_seedream_4` | Doubao Seedream 4 | entity 默认 |
| `generate_image_by_nano_banana` | Nano Banana | — |
| `generate_image_by_gpt_image_2` ⭐ | GPT Image 2 | — |
不传则后端使用默认值。传非法 ID 后端会返回 `invalid_storyboard_image_model` / `invalid_entity_image_model` 400 错误,并在响应的 `allowed_values` 字段给出当前可用列表。
### 其他设置
| 字段 | 可选值 |
|------|--------|
| `--aspect-ratio` | `16:9`, `9:16` |
| `--image-size` | `1k`, `2K`, `4K`(注意 1k 小写)|
| `--video-resolution` | `480p`, `720p`, `1080p` |
| `--generation-method` | `n_grid`, `multi_ref`, `multi_ref_v2`, `start_frame`, `video_ref` |
| `--storyboard-image-model` | 见上方"图片模型"表 |
| `--entity-image-model` | 见上方"图片模型"表 |
---
## 典型场景
理解这些场景,才能正确组合脚本完成用户需求。
### 场景 1:用户给出创作需求,从零开始(最常见)
```
1. get_balance.py → 确认余额 ≥ 10
2. create_project.py "赛博朋克短剧" → 拿到 baseId, episodeId
3. save_settings.py --base-id B \
--locale zh-CN --aspect-ratio 16:9 \
--style anime_japanese_korean → 配置项目
4. send_message.py --base-id B \
--episode-id E "用户的原始描述" → 拿到 session_id
5. query_session.py S --poll → 自动轮询直到完成
6. download_results.py S \
--output-dir ./赛博朋克短剧 \
--prefix storyboard → 自动下载到本地
```
生成完成后**自动执行下载**,不需要用户额外请求。下载目录和前缀根据任务语义自动命名(如分镜用 `storyboard`,角色设定用 `character`,最终视频用 `video` 等)。
**展示时机:** 生成过程中只告知进度("角色图生成中..."、"分镜关键帧 5/8 完成"),**不要提前给出项目链接**。全部完成后,同时给出:**本地文件列表** + **项目链接**(`{ZOPIA_BASE_URL}/base/{baseId}?session_id={sessionId}`,用户可在浏览器中查看和编辑完整项目)。优先使用脚本输出中的 `projectUrl` 字段。
### 场景 2:在已有会话中追加新需求(如"再改一下角色造型")
```
1. send_message.py --base-id B --episode-id E \
--session-id S "用户的新指令" → 复用已有会话
2. 轮询 → 下载 → 展示
```
使用同一个 `session_id` 可保持上下文连续。
### 场景 3:在已有项目中继续创作
```
1. list_projects.py → 让用户选择项目
2. manage_episodes.py list --base-id B → 查看剧集列表
3. send_message.py --base-id B \
--episode-id E "新的创作指令" → 新建会话
4. 轮询 → 下载 → 展示
```
### 场景 4:多集连续剧制作
一个项目(Project)可以包含多个剧集(Episode)。每集有独立的剧本、角色表、分镜表,但共享项目级设置(风格、画幅、模型)。
**创作流程:**
```
1. create_project.py "我的连续剧" → 拿到 baseId, episodeId (自动创建第一集)
2. save_settings.py --base-id B ... → 配置项目(所有剧集共享)
── 第一集 ──
3. send_message.py --base-id B \
--episode-id EP1 "第一集:主角进入废墟..." → 创作第一集
4. 轮询 → 下载
── 第二集 ──
5. manage_episodes.py create --base-id B → 拿到新 episodeId (EP2)
6. send_message.py --base-id B \
--episode-id EP2 "第二集:发现地下实验室..." → 创作第二集
7. 轮询 → 下载
── 更多剧集:重复步骤 5-7 ──
```
**多集注意事项:**
- 每集有独立的角色和分镜,不会互相干扰
- 如果后续剧集需要沿用前集角色形象,在消息中说明即可(如"延续第一集的角色设定"),后端 Agent 会处理
- 创建新剧集前,建议先确认当前剧集已完成(`status: "completed"`)
- 可以随时用 `manage_episodes.py list --base-id B` 查看所有剧集状态
- 删除剧集是不可逆操作,会清除该集所有内容
### 场景 5:将分镜视频合成为最终 MP4
所有分镜视频生成完毕后,可一键触发云端渲染,将所有片段按时间轴顺序合成为完整 MP4 文件。
```
1. render_episode.py trigger \
--base-id B --episode-id E → 拿到 render_id,渲染开始(异步)
2. render_episode.py status \
--base-id B --episode-id E \
--render-id RENDER_ID --poll → 自动轮询,完成后输出 video_url
```
**触发时机:** 用户明确要求「导出视频」「合成 MP4」「生成完整视频」时才触发。分镜视频生成阶段不要触发。
**渲染前提:** storyboard 中至少有一个分镜有 video_urls(即已完成视频生成),否则渲染内容为空。
**完成标志:** `status: "completed"` 且返回 `video_url`(S3 直链,可直接下载或分享)。
**轮询说明:** 渲染由 Remotion Lambda 执行,通常需要 1–5 分钟,`--poll` 参数每 8 秒检查一次进度(`progress` 字段 0→1),超时上限 10 分钟。
---
## 读懂 workspace 进度
`query_session.py` 返回的 `workspace` 是项目的实时快照,用来判断创作走到哪一步了:
```json
{
"status": "running",
"workspace": {
"entities": [{"name": "角色A", "images_status": "done", "image_urls": [...]}],
"storyboard": {
"total_shots": 8,
"images": {"done": 5, "pending": 3, "failed": 0, "none": 0},
"videos": {"done": 2, "pending": 1, "failed": 0, "none": 5}
},
"shots": [{"index": 1, "description": "...", "image_urls": [...], "video_urls": [...]}]
}
}
```
**怎么读:**
- `status: "running"` + workspace 空 → 刚开始,Agent 还在理解需求
- `entities` 出现,`images_status: "pending"` → 正在生成角色图
- `storyboard.images.pending > 0` → 正在生成分镜关键帧
- `storyboard.videos.pending > 0` → 正在生成视频片段
- `status: "completed"` → 全部完成,检查有无 failed 项
**轮询策略:**
- **间隔**:每 8 秒查询一次
- **增量拉取**:首次 `--after-seq 0`,后续传上次拿到的最大 seq 值
- **完成判断**:`status` 变为 `completed`(全部完成)或 `idle`
- **超时**:连续 3 分钟无新进展,告知用户「生成时间较长」并给出项目链接供自行查看,停止轮询
- **错误重试**:单次查询失败可重试 1 次;连续 3 次失败则停止并告知用户
- **自动轮询**:使用 `--poll` 参数可自动执行上述策略,无需手动循环
---
## 你的角色
Zopia 后端有完整的 AI 创作 Agent(对模型能力、prompt 工程、创作流程远比用户侧专业),你负责的是**项目管理和需求传达**。
**你要做的三件事:**
1. **配置** — 根据用户意图创建项目,选择合适的风格、模型、画幅
2. **传话** — 把用户的原始需求原封不动发给后端 Agent
3. **取件** — 追踪进度,在关键节点通知用户,完成后自动下载结果并展示
**不要做的事:**
- 不替用户扩写、润色、翻译创作描述(用户说"帮我推演分镜",就直接传这句话,不要自己先写个分镜表再逐条发)
- 不自行拆分任务(用户说"生成8个分镜图",发一条消息给后端,后端自己拆解)
- 不在消息中添加自己编的描述词(如"超写实风格,电影级光影,8K分辨率")
**正确:**
```
用户说:「帮我做一个赛博朋克风格的短剧,讲一个机器人在废墟中寻找最后一朵花」
→ create_project.py "赛博朋克短剧"
→ save_settings.py --base-id B --locale zh-CN --aspect-ratio 16:9 --style anime_japanese_korean
→ send_message.py --base-id B --episode-id E "帮我做一个赛博朋克风格的短剧,讲一个机器人在废墟中寻找最后一朵花"
→ 轮询 → 下载到 ./赛博朋克短剧/ → 展示文件列表 + 项目链接
```
**错误:**
```
❌ 先自己写了个详细的 5 场剧本和分镜描述
❌ 把自己编的内容逐条发给后端
❌ 在用户描述后面追加 "cinematic lighting, 8K, ultra detailed"
```
---
## 错误码速查
| 状态码 | 含义 | 处理 |
|--------|------|------|
| 400 | 参数缺失或设置不合法 | 检查必填字段和枚举值 |
| 401 | Token 无效或过期 | 提醒用户重新获取 |
| 402 | 余额不足 | 提醒充值 |
| 403 | 无权限 | 检查 baseId 归属 |
| 404 | 资源不存在 | 检查 ID 是否正确 |
| 409 | 会话执行中 | 等待当前会话完成再发新消息 |
FILE:CLAUDE.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## 项目定位
本仓库是 **Zopia AI 视频创作 Skill 包**,被 Claude Code / Gemini CLI / Codex / Cursor 等 Agent 通过 `npx skills add` 或 `npx clawhub install` 安装后调用。它本身不是后端服务,而是一组 Python 脚本 + 一份 `SKILL.md` 行为指引,封装对 Zopia 平台 HTTP API 的调用。
后端实现在姐妹仓库 `C:\code\jobCode\zipia\jaaz-cloud`(同 host 下的 `https://zopia.ai`),本仓库的脚本通过 HTTP 调用其接口。修改本项目时,若涉及接口字段、新模型 ID、新风格枚举等,对应能力须在 `jaaz-cloud` 已上线,否则脚本调用会报 400/404。
## 核心架构
```
SKILL.md ← 行为契约:Agent 什么时候触发、如何组合脚本、典型场景、错误处理
README.md ← 用户向:安装/配置/枚举值速查
scripts/_common.py ← 唯一的共享层:urllib HTTP + Bearer 鉴权 + 业务级封装函数
scripts/*.py ← 一脚本一动作,全是 argparse + _common 函数 + print_json 的薄壳
docs/ ← 维护者文档(如 publish-to-clawhub.md)
```
**只允许 Python 标准库**。`_common.py` 用 `urllib.request` 实现 HTTP,是有意为之——Skill 安装到用户机器后不能要求 `pip install`。新增脚本必须沿用这个约定,**不要引入 requests / httpx / pydantic 等第三方包**。
每个脚本的写法是固定的薄壳模式:
1. `sys.path.insert(0, os.path.dirname(__file__))` 后从 `_common` 导入
2. `argparse` 解析参数
3. 调用 `_common` 里的业务函数
4. `print_json(result)` 输出,由调用方(Agent)解析
新增 API 调用时,**先在 `_common.py` 加业务级封装函数**(参考 `create_project` / `send_message`),再写脚本壳。不要让脚本自己拼 URL 和 header。
## SKILL.md 是行为源
`SKILL.md` 是 Agent 的运行手册——它规定了 Agent 应该如何串联这些脚本(场景 1–5)、如何读 workspace 进度、什么时候该展示项目链接、不要替用户扩写需求等。**修改脚本参数或新增脚本时,必须同步更新 `SKILL.md` 的脚本速查表与场景示例**,否则 Agent 行为会与实际能力脱节。`README.md` 的枚举值表(风格、模型)也要同步。
## 常用命令
仓库无 lint、无测试、无构建。日常只有这些:
```bash
# 配置环境变量(脚本运行前提)
export ZOPIA_ACCESS_KEY="zopia-xxxxxxxxxxxx"
export ZOPIA_BASE_URL="https://zopia.ai" # 可选,默认即此
# 直接跑脚本调试
python3 scripts/get_balance.py
python3 scripts/create_project.py "测试项目"
python3 scripts/query_session.py SESSION_ID --poll
```
调试本地 `jaaz-cloud` 时,把 `ZOPIA_BASE_URL` 改到本地端口(如 `http://localhost:3000`)即可。
## 发布流程
发布到 ClawHub 技能市场,详细步骤见 `docs/publish-to-clawhub.md`:
```bash
git push # 先推 GitHub
npx clawhub publish . --slug zopia-skill --version x.y.z --changelog "..."
npx clawhub inspect zopia-skill # 验证 Latest 字段
```
版本号 semver:新增模型/功能 → minor +1;修 bug / 文档更新 → patch +1。
## 项目约定
- **Git commit message 用中文**,控制在 20 字以内(参考最近提交:`feat: 新增 Seedance 2.0 Fast 和 PixVerse C1 模型`)
- **不要 `git commit`**,除非用户明确要求
- 重构时不考虑兼容性,直接删除旧逻辑(除非用户明确要求保留)
- npm/npx 命令在 Windows 下用 cmd 执行
FILE:docs/publish-to-clawhub.md
# 发布 zopia-skill 到 ClawHub
## 前提条件
- Node.js 已安装(`npx` 可用)
- 已登录 ClawHub(见下文)
## 1. 登录
首次使用需登录:
```bash
npx clawhub login
```
验证登录状态:
```bash
npx clawhub whoami
# ✔ Lambdua
```
登录态会持久化,后续无需重复登录。
## 2. 修改内容并推送 GitHub
修改 `SKILL.md`、脚本等文件后,先 commit & push 到 GitHub:
```bash
git add .
git commit -m "描述本次变更"
git push
```
## 3. 发布新版本到 ClawHub
```bash
npx clawhub publish /path/to/zopia-skills \
--slug zopia-skill \
--version <新版本号> \
--changelog "本次变更说明"
```
**示例:**
```bash
npx clawhub publish /c/code/jobCode/zipia/zopia-skills \
--slug zopia-skill \
--version 1.0.3 \
--changelog "新增 xxx 模型"
```
版本号遵循 semver:
- 新增模型 / 功能 → 次版本号 +1(如 1.0.1 → 1.1.0)
- Bug 修复 / 文档更新 → 补丁号 +1(如 1.0.1 → 1.0.2)
## 4. 验证发布结果
```bash
npx clawhub inspect zopia-skill
```
确认 `Latest` 字段已更新为新版本号。
## 常用命令速查
| 命令 | 说明 |
|------|------|
| `npx clawhub whoami` | 查看当前登录用户 |
| `npx clawhub inspect zopia-skill` | 查看已发布版本信息 |
| `npx clawhub publish <path> --slug zopia-skill --version x.y.z` | 发布新版本 |
| `npx clawhub skill --help` | 管理已发布技能 |
FILE:README.md
# Zopia Skills
Zopia AI 视频创作技能 — 通过 [Zopia](https://zopia.ai) 平台的 AI Agent 进行视频/图片创作。
覆盖场景:AI 视频生成(文生视频、图生视频)、AI 图片生成(角色设定图、分镜关键帧)、剧本创作、角色设计、分镜设计、多集连续剧制作。
## 安装
### 通过 npx 安装(推荐)
```bash
npx skills add 11cafe/zopia-skills
```
> [`skills`](https://github.com/vercel-labs/skills) 是 Vercel Labs 开发的跨平台技能安装 CLI,支持 Claude Code、Gemini CLI、Codex、Cursor 等 40+ 个 Agent。
安装到指定 Agent:
```bash
npx skills add 11cafe/zopia-skills -a claude-code
```
### 通过 OpenClaw 安装
在 [OpenClaw](https://openclaw.ai) 技能市场搜索 `zopia-skill`,或使用命令行:
```bash
npx clawhub install zopia-skill
```
### 手动安装
```bash
git clone https://github.com/11cafe/zopia-skills.git
```
将 `SKILL.md` 和 `scripts/` 目录复制到对应的技能目录:
| 范围 | 路径 |
|------|------|
| 个人全局 | `~/.claude/skills/zopia-skill/` |
| 项目级别 | `.claude/skills/zopia-skill/` |
## 配置
使用前需设置环境变量:
```bash
export ZOPIA_ACCESS_KEY="zopia-xxxxxxxxxxxx" # 必需,30天有效
export ZOPIA_BASE_URL="https://zopia.ai" # 可选,默认值即可
```
仅依赖 Python 标准库,无需安装第三方包。需要 `python3` 可用。
## 使用
安装技能后,在 Claude Code 中直接描述你的创作需求即可:
```
帮我做一个赛博朋克风格的短剧,讲一个机器人在废墟中寻找最后一朵花
```
技能会自动完成:创建项目 → 配置设置 → 发送创作指令 → 轮询进度 → 下载结果。
### 支持的风格
| ID | 说明 |
|----|------|
| `anime_japanese_korean` | 日韩动漫 |
| `realistic_3d_cg` | 3D CG 写实 |
| `pixar_3d_cartoon` | Pixar 3D 卡通 |
| `photorealistic_real_human` | 真人写实 |
| `3D_CG_Animation` | 3D CG 动画 |
| `anime_chibi` | Q版可爱 |
| `anime_shinkai` | 新海诚 |
| `anime_ghibli` | 吉卜力 |
| `stylized_pixel` | 像素艺术 |
### 支持的图片模型
分镜图(storyboard)与角色/场景图(entity)可分别配置,两者共享同一组可选值:
| 模型 | 名称 |
|------|------|
| `generate_image_by_nano_banana_2` | Nano Banana 2(storyboard 默认)|
| `generate_image_by_doubao_seedream_4` | Doubao Seedream 4(entity 默认)|
| `generate_image_by_nano_banana` | Nano Banana |
| `generate_image_by_gpt_image_2` | GPT Image 2 |
### 支持的视频模型
| 模型 | 名称 |
|------|------|
| `generate_video_by_kling_o3` | Kling O3 |
| `generate_video_by_kling_v3_0` | Kling V3.0 |
| `generate_video_by_hailuo_02` | Hailuo 2.3 |
| `generate_video_by_wan26_i2v` | Wan 2.6 |
| `generate_video_by_wan26_i2v_flash` | Wan 2.6 Flash |
| `generate_video_by_viduq2_pro` | Vidu Q2 Pro |
| `generate_video_by_viduq3_pro` | Vidu Q3 Pro |
| `generate_video_by_viduq3` | Vidu Q3 |
| `generate_video_by_seedance_15` | Seedance 1.5 Pro |
### 脚本列表
| 脚本 | 用途 |
|------|------|
| `create_project.py` | 创建项目 |
| `save_settings.py` | 配置项目设置 |
| `send_message.py` | 发送创作指令 |
| `query_session.py` | 查询创作进度 |
| `download_results.py` | 下载媒体资源 |
| `get_balance.py` | 查询余额 |
| `list_projects.py` | 列出所有项目 |
| `manage_episodes.py` | 管理剧集 |
| `render_episode.py` | 合成最终视频(MP4) |
## License
[MIT](LICENSE)
FILE:scripts/create_project.py
#!/usr/bin/env python3
"""创建 Zopia 项目。
用法:
python create_project.py [项目名称]
返回:
{baseId, baseName, episodeId, projectUrl}
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import build_project_url, create_project, print_json
def main() -> None:
parser = argparse.ArgumentParser(description="创建 Zopia 项目")
parser.add_argument("name", nargs="?", default=None, help="项目名称(可选)")
args = parser.parse_args()
result = create_project(args.name)
base_id = result.get("baseId", "")
result["projectUrl"] = build_project_url(base_id)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/download_results.py
#!/usr/bin/env python3
"""批量下载 Zopia 会话中生成的媒体资源。
用法:
# 从会话结果中下载所有媒体
python download_results.py SESSION_ID
# 指定输出目录和前缀
python download_results.py SESSION_ID --output-dir ./results --prefix storyboard
# 仅下载图片或视频
python download_results.py SESSION_ID --type image
python download_results.py SESSION_ID --type video
"""
from __future__ import annotations
import argparse
import os
import re
import sys
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__))
from _common import print_json, query_session
# 支持的文件扩展名
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp"}
VIDEO_EXTS = {".mp4", ".mov", ".webm"}
MAX_FILE_SIZE = 200 * 1024 * 1024 # 200MB
def extract_urls(result: dict) -> list[dict[str, str]]:
"""从会话结果中提取所有媒体 URL。"""
urls: list[dict[str, str]] = []
seen: set[str] = set()
# 从 workspace 中提取
workspace = result.get("workspace", {})
# 实体图片
for entity in workspace.get("entities", []):
for url in entity.get("image_urls", []):
if url and url not in seen:
seen.add(url)
urls.append({"url": url, "type": "image", "source": f"entity:{entity.get('name', '')}"})
# 分镜图片和视频
for shot in workspace.get("shots", []):
for img in shot.get("image_urls", []):
if img and img not in seen:
seen.add(img)
urls.append({"url": img, "type": "image", "source": f"shot:{shot.get('index', '')}"})
for vid in shot.get("video_urls", []):
if vid and vid not in seen:
seen.add(vid)
urls.append({"url": vid, "type": "video", "source": f"shot:{shot.get('index', '')}"})
# 从消息文本中正则提取 URL(兜底)
for msg in result.get("messages", []):
content = msg.get("content", "")
if isinstance(content, str):
for match in re.finditer(r'https?://[^\s"\'<>]+\.(?:png|jpg|jpeg|webp|mp4|mov|webm)', content):
url = match.group(0)
if url not in seen:
seen.add(url)
ext = Path(url.split("?")[0]).suffix.lower()
media_type = "video" if ext in VIDEO_EXTS else "image"
urls.append({"url": url, "type": media_type, "source": "message"})
return urls
def download_file(url: str, output_path: str) -> bool:
"""下载单个文件,返回是否成功。"""
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=60) as resp:
content_length = resp.headers.get("Content-Length")
if content_length and int(content_length) > MAX_FILE_SIZE:
print(f"跳过(文件过大): {url}", file=sys.stderr)
return False
with open(output_path, "wb") as f:
while True:
chunk = resp.read(8192)
if not chunk:
break
f.write(chunk)
return True
except Exception as exc:
print(f"下载失败 {url}: {exc}", file=sys.stderr)
return False
def main() -> None:
parser = argparse.ArgumentParser(description="批量下载 Zopia 会话中的媒体资源")
parser.add_argument("session_id", help="会话 ID")
parser.add_argument("--output-dir", default=".", help="输出目录(默认当前目录)")
parser.add_argument("--prefix", default="", help="文件名前缀")
parser.add_argument("--type", choices=["image", "video"], default=None,
help="仅下载指定类型")
parser.add_argument("--workers", type=int, default=5, help="并发下载数")
args = parser.parse_args()
# 获取会话结果
result = query_session(args.session_id)
media_urls = extract_urls(result)
# 按类型过滤
if args.type:
media_urls = [m for m in media_urls if m["type"] == args.type]
if not media_urls:
print("没有找到可下载的媒体资源")
return
# 创建输出目录
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# 构建下载任务
tasks: list[tuple[str, str]] = []
for i, media in enumerate(media_urls, 1):
url = media["url"]
ext = Path(url.split("?")[0]).suffix.lower() or ".png"
prefix = f"{args.prefix}_" if args.prefix else ""
filename = f"{prefix}{media['type']}_{i:02d}{ext}"
output_path = str(output_dir / filename)
tasks.append((url, output_path))
# 并发下载
success_count = 0
downloaded: list[dict[str, str]] = []
with ThreadPoolExecutor(max_workers=args.workers) as executor:
futures = {executor.submit(download_file, url, path): (url, path, media_urls[i])
for i, (url, path) in enumerate(tasks)}
for future in futures:
url, path, media_info = futures[future]
if future.result():
success_count += 1
downloaded.append({
"url": url,
"path": path,
"type": media_info["type"],
"source": media_info["source"],
})
print_json({
"total": len(tasks),
"downloaded": success_count,
"failed": len(tasks) - success_count,
"files": downloaded,
})
if __name__ == "__main__":
main()
FILE:scripts/get_balance.py
#!/usr/bin/env python3
"""查询 Zopia 账户余额。
用法:
python get_balance.py
返回:
{accounts: [...], summary: {totalBalance, totalHeld, totalAvailable}}
"""
from __future__ import annotations
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import get_balance, print_json
def main() -> None:
result = get_balance()
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/list_projects.py
#!/usr/bin/env python3
"""列出 Zopia 项目。
用法:
python list_projects.py
python list_projects.py --page 2 --page-size 20
返回:
{data: [...], page, pageSize, hasMore}
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import build_project_url, list_projects, print_json
def main() -> None:
parser = argparse.ArgumentParser(description="列出 Zopia 项目")
parser.add_argument("--page", type=int, default=1, help="页码(默认 1)")
parser.add_argument("--page-size", type=int, default=12, help="每页数量(默认 12,最大 50)")
args = parser.parse_args()
result = list_projects(args.page, args.page_size)
for item in result.get("data", []):
item["projectUrl"] = build_project_url(item.get("id", ""))
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/manage_episodes.py
#!/usr/bin/env python3
"""管理 Zopia 项目的剧集。
用法:
# 列出剧集
python manage_episodes.py list --base-id BASE_ID
# 创建新剧集
python manage_episodes.py create --base-id BASE_ID
# 删除剧集
python manage_episodes.py delete --episode-id EPISODE_ID
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import create_episode, delete_episode, list_episodes, print_json
def main() -> None:
parser = argparse.ArgumentParser(description="管理 Zopia 项目的剧集")
subparsers = parser.add_subparsers(dest="action", required=True)
# list
list_parser = subparsers.add_parser("list", help="列出剧集")
list_parser.add_argument("--base-id", required=True, help="项目 ID")
# create
create_parser = subparsers.add_parser("create", help="创建新剧集")
create_parser.add_argument("--base-id", required=True, help="项目 ID")
# delete
delete_parser = subparsers.add_parser("delete", help="删除剧集")
delete_parser.add_argument("--episode-id", required=True, help="剧集 ID")
args = parser.parse_args()
if args.action == "list":
result = list_episodes(args.base_id)
print_json(result)
elif args.action == "create":
result = create_episode(args.base_id)
print_json(result)
elif args.action == "delete":
result = delete_episode(args.episode_id)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/query_session.py
#!/usr/bin/env python3
"""轮询 Zopia 会话结果。
用法:
# 查询完整结果
python query_session.py SESSION_ID
# 增量查询(仅获取 seq > 5 的新消息)
python query_session.py SESSION_ID --after-seq 5
# 自动轮询直到完成
python query_session.py SESSION_ID --poll
返回结构化结果:
{
"status": "completed" | "running" | "idle",
"messages": [...],
"workspace": {
"entities": [...],
"storyboard": {...}
}
}
"""
from __future__ import annotations
import argparse
import os
import sys
import time
sys.path.insert(0, os.path.dirname(__file__))
from _common import print_json, query_session
# 轮询参数
POLL_INTERVAL = 8 # 秒
POLL_TIMEOUT = 180 # 最长轮询时间(秒)
MAX_CONSECUTIVE_FAIL = 3
def main() -> None:
parser = argparse.ArgumentParser(description="轮询 Zopia 会话结果")
parser.add_argument("session_id", help="会话 ID")
parser.add_argument("--after-seq", type=int, default=0,
help="仅获取 seq 大于此值的消息")
parser.add_argument("--poll", action="store_true",
help="自动轮询直到会话完成")
args = parser.parse_args()
if not args.poll:
result = query_session(args.session_id, args.after_seq)
print_json(result)
return
# 自动轮询模式
after_seq = args.after_seq
start_time = time.time()
consecutive_fails = 0
while True:
elapsed = time.time() - start_time
if elapsed > POLL_TIMEOUT:
print(f"轮询超时({POLL_TIMEOUT}秒)", file=sys.stderr)
sys.exit(1)
try:
result = query_session(args.session_id, after_seq)
consecutive_fails = 0
except SystemExit:
consecutive_fails += 1
if consecutive_fails >= MAX_CONSECUTIVE_FAIL:
print(f"连续失败 {MAX_CONSECUTIVE_FAIL} 次,停止轮询", file=sys.stderr)
sys.exit(1)
time.sleep(POLL_INTERVAL)
continue
status = result.get("status", "")
messages = result.get("messages", [])
# 更新增量游标
if messages:
max_seq = max(m.get("seq", 0) for m in messages)
if max_seq > after_seq:
after_seq = max_seq
# 输出当前状态
print_json(result)
if status in ("completed", "idle", "error"):
break
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()
FILE:scripts/render_episode.py
#!/usr/bin/env python3
"""触发并查询 Zopia episode 视频合成渲染。
用法:
# 触发渲染(异步,立即返回 render_id)
python render_episode.py trigger --base-id BASE_ID --episode-id EPISODE_ID
# 触发渲染并添加水印
python render_episode.py trigger --base-id BASE_ID --episode-id EPISODE_ID --watermark
# 查询最新渲染状态
python render_episode.py status --base-id BASE_ID --episode-id EPISODE_ID
# 查询指定渲染状态
python render_episode.py status --base-id BASE_ID --episode-id EPISODE_ID --render-id RENDER_ID
# 自动轮询直到渲染完成
python render_episode.py status --base-id BASE_ID --episode-id EPISODE_ID --render-id RENDER_ID --poll
返回结构:
trigger: {"render_id": "...", "status": "processing"}
status: {"status": "not_started" | "processing" | "completed" | "failed",
"render_id": "...", "progress": 0.0~1.0, "video_url": "..."}
"""
from __future__ import annotations
import argparse
import os
import sys
import time
sys.path.insert(0, os.path.dirname(__file__))
from _common import get_render_status, print_json, trigger_render
POLL_INTERVAL = 8 # 秒
POLL_TIMEOUT = 600 # 最长轮询时间(秒),渲染比 Agent 慢,给 10 分钟
def main() -> None:
parser = argparse.ArgumentParser(description="Zopia episode 视频渲染")
subparsers = parser.add_subparsers(dest="action", required=True)
# trigger
t = subparsers.add_parser("trigger", help="触发渲染(异步)")
t.add_argument("--base-id", required=True, help="项目 ID")
t.add_argument("--episode-id", required=True, help="剧集 ID")
t.add_argument("--watermark", action="store_true", help="添加水印(默认不加)")
# status
s = subparsers.add_parser("status", help="查询渲染状态")
s.add_argument("--base-id", required=True, help="项目 ID")
s.add_argument("--episode-id", required=True, help="剧集 ID")
s.add_argument("--render-id", default=None, help="渲染 ID(省略则查最新)")
s.add_argument("--poll", action="store_true", help="自动轮询直到完成")
args = parser.parse_args()
if args.action == "trigger":
result = trigger_render(args.base_id, args.episode_id, args.watermark)
print_json(result)
return
# status
if not args.poll:
result = get_render_status(args.base_id, args.episode_id, args.render_id)
print_json(result)
return
# 自动轮询模式
render_id = args.render_id
start_time = time.time()
while True:
if time.time() - start_time > POLL_TIMEOUT:
print(f"渲染轮询超时({POLL_TIMEOUT}秒)", file=sys.stderr)
sys.exit(1)
result = get_render_status(args.base_id, args.episode_id, render_id)
print_json(result)
status = result.get("status", "")
# 补全 render_id(首次查到后固定住)
if not render_id and result.get("render_id"):
render_id = result["render_id"]
if status == "completed":
break
if status == "failed":
print("渲染失败", file=sys.stderr)
sys.exit(1)
if status == "not_started":
print("尚未触发渲染,请先执行 trigger", file=sys.stderr)
sys.exit(1)
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()
FILE:scripts/save_settings.py
#!/usr/bin/env python3
"""保存或查询 Zopia 项目设置。
用法:
# 查询设置
python save_settings.py --base-id BASE_ID --get
# 保存设置
python save_settings.py --base-id BASE_ID --locale zh-CN --aspect-ratio 16:9 --style anime
支持的设置字段:
--locale 语言 (zh-CN, en, ja)
--aspect-ratio 画面比例 (16:9, 9:16)
--style 视觉风格
--video-model 视频模型
--generation-method 生成方式
--image-size 图片尺寸
--video-resolution 视频分辨率
--storyboard-image-model 分镜图模型
--entity-image-model 角色场景图模型
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import get_settings, print_json, save_settings
def main() -> None:
parser = argparse.ArgumentParser(description="保存或查询 Zopia 项目设置")
parser.add_argument("--base-id", required=True, help="项目 ID")
parser.add_argument("--get", action="store_true", help="查询当前设置")
parser.add_argument("--locale", help="语言 (zh-CN, en, ja)")
parser.add_argument("--aspect-ratio", help="画面比例 (16:9, 9:16)")
parser.add_argument("--style", help="视觉风格")
parser.add_argument("--video-model", help="视频模型")
parser.add_argument("--generation-method", help="生成方式")
parser.add_argument("--image-size", help="图片尺寸")
parser.add_argument("--video-resolution", help="视频分辨率")
parser.add_argument("--storyboard-image-model", help="分镜图模型")
parser.add_argument("--entity-image-model", help="角色场景图模型")
args = parser.parse_args()
if args.get:
result = get_settings(args.base_id)
print_json(result)
return
settings: dict[str, str] = {}
field_map = {
"locale": args.locale,
"aspect_ratio": args.aspect_ratio,
"style": args.style,
"video_model": args.video_model,
"generation_method": args.generation_method,
"image_size": args.image_size,
"video_resolution": args.video_resolution,
"storyboard_image_model": args.storyboard_image_model,
"entity_image_model": args.entity_image_model,
}
for key, value in field_map.items():
if value is not None:
settings[key] = value
if not settings:
print("错误: 至少需要指定一个设置字段", file=sys.stderr)
sys.exit(1)
result = save_settings(args.base_id, settings)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/send_message.py
#!/usr/bin/env python3
"""向 Zopia Agent 异步发送消息。
用法:
python send_message.py --base-id BASE_ID --episode-id EP_ID "生成一个赛博朋克风格的视频"
python send_message.py --base-id BASE_ID --episode-id EP_ID --session-id SESS_ID "继续生成下一个镜头"
返回:
{session_id, base_id, ...}
注意:
此接口为异步模式,返回 session_id 后需使用 query_session.py 轮询结果。
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import build_project_url, print_json, send_message
def main() -> None:
parser = argparse.ArgumentParser(description="向 Zopia Agent 异步发送消息")
parser.add_argument("message", help="发送给 Agent 的消息内容")
parser.add_argument("--base-id", required=True, help="项目 ID")
parser.add_argument("--episode-id", required=True, help="剧集 ID")
parser.add_argument("--session-id", default=None, help="会话 ID(可选,续接已有会话)")
args = parser.parse_args()
result = send_message(
base_id=args.base_id,
episode_id=args.episode_id,
message=args.message,
session_id=args.session_id,
)
sid = result.get("session_id", "")
result["projectUrl"] = build_project_url(args.base_id, sid)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/_common.py
"""Zopia Skill 共享模块 — 封装 HTTP 请求、认证、错误处理。
仅依赖 Python 标准库。
环境变量:
ZOPIA_ACCESS_KEY (必需) Bearer token,格式 zopia-xxxxxxxxxxxx
ZOPIA_BASE_URL (可选) API 基础地址,默认 https://zopia.ai
"""
from __future__ import annotations
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
# ---------------------------------------------------------------------------
# 配置
# ---------------------------------------------------------------------------
def _get_access_key() -> str:
key = os.environ.get("ZOPIA_ACCESS_KEY", "").strip()
if not key:
print("错误: 环境变量 ZOPIA_ACCESS_KEY 未设置", file=sys.stderr)
sys.exit(1)
return key
def _get_base_url() -> str:
return os.environ.get("ZOPIA_BASE_URL", "https://zopia.ai").rstrip("/")
# ---------------------------------------------------------------------------
# 底层 HTTP 工具
# ---------------------------------------------------------------------------
def _build_headers() -> dict[str, str]:
return {
"Authorization": f"Bearer {_get_access_key()}",
"Content-Type": "application/json",
}
def api_get(path: str, params: dict[str, str] | None = None) -> Any:
"""发送 GET 请求,返回解析后的 JSON。"""
url = f"{_get_base_url()}{path}"
if params:
url = f"{url}?{urllib.parse.urlencode(params)}"
req = urllib.request.Request(url, headers=_build_headers(), method="GET")
return _do_request(req)
def api_post(path: str, body: dict[str, Any] | None = None) -> Any:
"""发送 POST 请求,返回解析后的 JSON。"""
url = f"{_get_base_url()}{path}"
data = json.dumps(body or {}).encode("utf-8")
req = urllib.request.Request(url, data=data, headers=_build_headers(), method="POST")
return _do_request(req)
def api_delete(path: str) -> Any:
"""发送 DELETE 请求,返回解析后的 JSON。"""
url = f"{_get_base_url()}{path}"
req = urllib.request.Request(url, headers=_build_headers(), method="DELETE")
return _do_request(req)
def _do_request(req: urllib.request.Request) -> Any:
"""执行请求,统一处理错误。"""
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read().decode("utf-8")
if not raw:
return {}
return json.loads(raw)
except urllib.error.HTTPError as exc:
body = ""
try:
body = exc.read().decode("utf-8", errors="replace")
except Exception:
pass
print(f"HTTP {exc.code} 错误: {body}", file=sys.stderr)
sys.exit(1)
except urllib.error.URLError as exc:
print(f"网络错误: {exc.reason}", file=sys.stderr)
sys.exit(1)
# ---------------------------------------------------------------------------
# 业务级封装
# ---------------------------------------------------------------------------
def create_project(base_name: str | None = None) -> dict[str, Any]:
"""创建项目,返回 {baseId, baseName, episodeId}。"""
body: dict[str, Any] = {}
if base_name:
body["baseName"] = base_name
resp = api_post("/api/base/create", body)
return resp.get("data", resp)
def save_settings(base_id: str, settings: dict[str, Any]) -> dict[str, Any]:
"""保存项目设置,返回合并后的设置。"""
resp = api_post("/api/base/settings", {"base_id": base_id, "settings": settings})
return resp
def get_settings(base_id: str) -> dict[str, Any]:
"""获取项目设置。"""
resp = api_get("/api/base/settings", {"base_id": base_id})
return resp
def send_message(base_id: str, episode_id: str, message: str,
session_id: str | None = None) -> dict[str, Any]:
"""异步发送消息,返回 {session_id, ...}。"""
body: dict[str, Any] = {
"base_id": base_id,
"episode_id": episode_id,
"message": message,
}
if session_id:
body["session_id"] = session_id
return api_post("/api/v1/agent/chat/async", body)
def query_session(session_id: str, after_seq: int = 0) -> dict[str, Any]:
"""增量查询会话消息,返回结构化结果。"""
params: dict[str, str] = {}
if after_seq > 0:
params["afterSeq"] = str(after_seq)
return api_get(f"/api/v1/agent/session/{session_id}/messages", params)
def list_projects(page: int = 1, page_size: int = 12) -> dict[str, Any]:
"""获取项目列表。"""
return api_get("/api/base/list", {"page": str(page), "pageSize": str(page_size)})
def get_project_detail(base_id: str, episode_id: str) -> dict[str, Any]:
"""获取项目详情。"""
return api_get(f"/api/base/{base_id}", {"episode_id": episode_id})
def create_episode(base_id: str) -> dict[str, Any]:
"""创建新剧集。"""
return api_post(f"/api/episode/create?base_id={base_id}")
def list_episodes(base_id: str) -> dict[str, Any]:
"""列出项目的所有剧集。"""
return api_get("/api/episode/list", {"base_id": base_id})
def delete_episode(episode_id: str) -> dict[str, Any]:
"""删除剧集。"""
return api_delete(f"/api/episode/{episode_id}")
def get_balance() -> dict[str, Any]:
"""查询余额。"""
return api_get("/api/billing/getBalance")
def trigger_render(base_id: str, episode_id: str, show_watermark: bool = False) -> dict[str, Any]:
"""触发 episode 视频合成渲染(异步),返回 {render_id, status}。"""
return api_post(
f"/api/v1/base/{base_id}/episode/{episode_id}/render",
{"show_watermark": show_watermark},
)
def get_render_status(base_id: str, episode_id: str, render_id: str | None = None) -> dict[str, Any]:
"""查询渲染状态,返回 {status, render_id?, progress?, video_url?, error?}。"""
params: dict[str, str] = {}
if render_id:
params["render_id"] = render_id
return api_get(f"/api/v1/base/{base_id}/episode/{episode_id}/render", params or None)
def build_project_url(base_id: str, session_id: str | None = None) -> str:
"""构造项目的 Web 访问 URL。"""
url = f"{_get_base_url()}/base/{base_id}"
if session_id:
url = f"{url}?session_id={session_id}"
return url
# ---------------------------------------------------------------------------
# 输出工具
# ---------------------------------------------------------------------------
def print_json(data: Any) -> None:
"""以格式化的 JSON 输出到 stdout。"""
print(json.dumps(data, ensure_ascii=False, indent=2))
Zopia AI 视频创作技能 - 通过 Zopia 平台的 AI Agent 进行视频/图片创作。覆盖场景包括:AI 视频生成(文生视频、图生视频)、AI 图片生成(角色设定图、分镜关键帧)、剧本创作(对话/旁白/场景描述)、角色设计、分镜设计、多集连续剧制作。当用户提到 zopia、视频创作、短剧制作、分镜、...
---
name: zopia-skill
description: Zopia AI 视频创作技能 - 通过 Zopia 平台的 AI Agent 进行视频/图片创作。覆盖场景包括:AI 视频生成(文生视频、图生视频)、AI 图片生成(角色设定图、分镜关键帧)、剧本创作(对话/旁白/场景描述)、角色设计、分镜设计、多集连续剧制作。当用户提到 zopia、视频创作、短剧制作、分镜、角色设计、AI 视频生成时应触发。关键判断:只要用户的请求涉及通过 AI 进行系统化的视频创作流程(剧本→角色→分镜→视频),都必须触发此技能。
user-invocable: true
metadata:
{
"openclaw":
{
"emoji": "🎬",
"requires":
{
"bins": ["python3"],
"env": ["ZOPIA_ACCESS_KEY"]
},
"primaryEnv": "ZOPIA_ACCESS_KEY"
}
}
---
# Zopia AI 视频创作
Zopia 是一个项目制的 AI 视频创作平台。每个项目包含完整的创作流水线:**剧本 → 角色 → 分镜 → 视频**,由后端 AI Agent 自动驱动。你通过脚本管理项目、传达用户意图、追踪进度、获取成果。
## 环境配置
```bash
export ZOPIA_ACCESS_KEY="zopia-xxxxxxxxxxxx" # 必需,30天有效
export ZOPIA_BASE_URL="https://zopia.ai" # 可选
```
仅使用 Python 标准库,无需额外安装。
## 核心概念
| 概念 | 说明 |
|------|------|
| **Project (Base)** | 创作项目,包含设置、剧集、所有资产。创建时自动生成首集 |
| **Episode** | 剧集,同一项目下可创建多集,每集有独立的剧本/角色/分镜 |
| **Session** | 一次 Agent 对话。异步执行,通过轮询获取进展 |
| **Workspace** | 项目的实时工作区快照,包含角色(entities)、分镜(storyboard)、各媒体的生成状态 |
## 脚本速查
| 脚本 | 用途 | 关键参数 |
|------|------|---------|
| `create_project.py` | 创建项目 | `[名称]` |
| `save_settings.py` | 项目设置 | `--base-id` `--style` `--aspect-ratio` `--video-model` `--storyboard-image-model` `--entity-image-model` ... |
| `send_message.py` | 发送创作指令(异步) | `--base-id` `--episode-id` `消息` |
| `query_session.py` | 查询进展 | `SESSION_ID` `--poll` `--after-seq N` |
| `download_results.py` | 下载媒体资源 | `SESSION_ID` `--output-dir` `--type image\|video` |
| `get_balance.py` | 余额查询 | — |
| `list_projects.py` | 列出项目 | `--page` `--page-size` |
| `manage_episodes.py` | 剧集管理 | `list\|create\|delete` |
| `render_episode.py` | 合成最终视频 | `trigger\|status` `--base-id` `--episode-id` `--poll` |
## 项目设置参考
创建项目后,必须配置基础设置(locale / aspect_ratio / style)才能开始创作。
```bash
python3 {baseDir}/scripts/save_settings.py --base-id BASE_ID \
--locale zh-CN --aspect-ratio 16:9 --style realistic_3d_cg
```
### 风格
| ID | 说明 |
|----|------|
| `anime_japanese_korean` | 日韩动漫 |
| `realistic_3d_cg` | 3D CG 写实 🔥 |
| `pixar_3d_cartoon` | Pixar 3D 卡通 |
| `photorealistic_real_human` | 真人写实 |
| `3D_CG_Animation` | 3D CG 动画 🔥 |
| `anime_chibi` | Q版可爱 |
| `anime_shinkai` | 新海诚 |
| `anime_ghibli` | 吉卜力 |
| `stylized_pixel` | 像素艺术 |
别名支持:`realistic` → `realistic_3d_cg`,`ghibli` → `anime_ghibli`,`shinkai` → `anime_shinkai`,`pixel` → `stylized_pixel`
### 视频模型 × 生成方式
不同模型支持不同的生成方式(generation_method),不匹配会报错。
| 模型 ID | 名称 | 支持的方式 | 默认 |
|---------|------|-----------|------|
| `generate_video_by_seedance_20` | Seedance 2.0 Pro ⭐ | n_grid, video_ref, multi_ref, multi_ref_v2 | video_ref |
| `generate_video_by_seedance_20_fast` | Seedance 2.0 Fast | n_grid, video_ref, multi_ref, multi_ref_v2 | video_ref |
| `generate_video_by_kling_o3` | Kling O3 | start_frame, n_grid, multi_ref, multi_ref_v2 | n_grid |
| `generate_video_by_kling_v3_0` | Kling V3.0 | start_frame, n_grid | n_grid |
| `generate_video_by_pixverse_c1` | PixVerse C1 | start_frame, multi_ref | start_frame |
| `generate_video_by_hailuo_02` | Hailuo 2.3 | start_frame | start_frame |
| `generate_video_by_wan26_i2v` | Wan 2.6 | start_frame | start_frame |
| `generate_video_by_wan26_i2v_flash` | Wan 2.6 Flash | start_frame | start_frame |
| `generate_video_by_viduq2_pro` | Vidu Q2 Pro | start_frame | start_frame |
| `generate_video_by_viduq3_pro` | Vidu Q3 Pro | start_frame | start_frame |
| `generate_video_by_viduq3` | Vidu Q3 | n_grid, multi_ref, multi_ref_v2 | n_grid |
| `generate_video_by_seedance_15` | Seedance 1.5 Pro | start_frame | start_frame |
### 图片模型
分镜关键帧(`storyboard_image_model`)与角色/场景设定图(`entity_image_model`)使用独立的图片模型,可分别配置。
| 模型 ID | 名称 | 默认场景 |
|---------|------|---------|
| `generate_image_by_nano_banana_2` | Nano Banana 2 | storyboard 默认 |
| `generate_image_by_doubao_seedream_4` | Doubao Seedream 4 | entity 默认 |
| `generate_image_by_nano_banana` | Nano Banana | — |
| `generate_image_by_gpt_image_2` ⭐ | GPT Image 2 | — |
不传则后端使用默认值。传非法 ID 后端会返回 `invalid_storyboard_image_model` / `invalid_entity_image_model` 400 错误,并在响应的 `allowed_values` 字段给出当前可用列表。
### 其他设置
| 字段 | 可选值 |
|------|--------|
| `--aspect-ratio` | `16:9`, `9:16` |
| `--image-size` | `1k`, `2K`, `4K`(注意 1k 小写)|
| `--video-resolution` | `480p`, `720p`, `1080p` |
| `--generation-method` | `n_grid`, `multi_ref`, `multi_ref_v2`, `start_frame`, `video_ref` |
| `--storyboard-image-model` | 见上方"图片模型"表 |
| `--entity-image-model` | 见上方"图片模型"表 |
---
## 典型场景
理解这些场景,才能正确组合脚本完成用户需求。
### 场景 1:用户给出创作需求,从零开始(最常见)
```
1. get_balance.py → 确认余额 ≥ 10
2. create_project.py "赛博朋克短剧" → 拿到 baseId, episodeId
3. save_settings.py --base-id B \
--locale zh-CN --aspect-ratio 16:9 \
--style anime_japanese_korean → 配置项目
4. send_message.py --base-id B \
--episode-id E "用户的原始描述" → 拿到 session_id
5. query_session.py S --poll → 自动轮询直到完成
6. download_results.py S \
--output-dir ./赛博朋克短剧 \
--prefix storyboard → 自动下载到本地
```
生成完成后**自动执行下载**,不需要用户额外请求。下载目录和前缀根据任务语义自动命名(如分镜用 `storyboard`,角色设定用 `character`,最终视频用 `video` 等)。
**展示时机:** 生成过程中只告知进度("角色图生成中..."、"分镜关键帧 5/8 完成"),**不要提前给出项目链接**。全部完成后,同时给出:**本地文件列表** + **项目链接**(`{ZOPIA_BASE_URL}/base/{baseId}?session_id={sessionId}`,用户可在浏览器中查看和编辑完整项目)。优先使用脚本输出中的 `projectUrl` 字段。
### 场景 2:在已有会话中追加新需求(如"再改一下角色造型")
```
1. send_message.py --base-id B --episode-id E \
--session-id S "用户的新指令" → 复用已有会话
2. 轮询 → 下载 → 展示
```
使用同一个 `session_id` 可保持上下文连续。
### 场景 3:在已有项目中继续创作
```
1. list_projects.py → 让用户选择项目
2. manage_episodes.py list --base-id B → 查看剧集列表
3. send_message.py --base-id B \
--episode-id E "新的创作指令" → 新建会话
4. 轮询 → 下载 → 展示
```
### 场景 4:多集连续剧制作
一个项目(Project)可以包含多个剧集(Episode)。每集有独立的剧本、角色表、分镜表,但共享项目级设置(风格、画幅、模型)。
**创作流程:**
```
1. create_project.py "我的连续剧" → 拿到 baseId, episodeId (自动创建第一集)
2. save_settings.py --base-id B ... → 配置项目(所有剧集共享)
── 第一集 ──
3. send_message.py --base-id B \
--episode-id EP1 "第一集:主角进入废墟..." → 创作第一集
4. 轮询 → 下载
── 第二集 ──
5. manage_episodes.py create --base-id B → 拿到新 episodeId (EP2)
6. send_message.py --base-id B \
--episode-id EP2 "第二集:发现地下实验室..." → 创作第二集
7. 轮询 → 下载
── 更多剧集:重复步骤 5-7 ──
```
**多集注意事项:**
- 每集有独立的角色和分镜,不会互相干扰
- 如果后续剧集需要沿用前集角色形象,在消息中说明即可(如"延续第一集的角色设定"),后端 Agent 会处理
- 创建新剧集前,建议先确认当前剧集已完成(`status: "completed"`)
- 可以随时用 `manage_episodes.py list --base-id B` 查看所有剧集状态
- 删除剧集是不可逆操作,会清除该集所有内容
### 场景 5:将分镜视频合成为最终 MP4
所有分镜视频生成完毕后,可一键触发云端渲染,将所有片段按时间轴顺序合成为完整 MP4 文件。
```
1. render_episode.py trigger \
--base-id B --episode-id E → 拿到 render_id,渲染开始(异步)
2. render_episode.py status \
--base-id B --episode-id E \
--render-id RENDER_ID --poll → 自动轮询,完成后输出 video_url
```
**触发时机:** 用户明确要求「导出视频」「合成 MP4」「生成完整视频」时才触发。分镜视频生成阶段不要触发。
**渲染前提:** storyboard 中至少有一个分镜有 video_urls(即已完成视频生成),否则渲染内容为空。
**完成标志:** `status: "completed"` 且返回 `video_url`(S3 直链,可直接下载或分享)。
**轮询说明:** 渲染由 Remotion Lambda 执行,通常需要 1–5 分钟,`--poll` 参数每 8 秒检查一次进度(`progress` 字段 0→1),超时上限 10 分钟。
---
## 读懂 workspace 进度
`query_session.py` 返回的 `workspace` 是项目的实时快照,用来判断创作走到哪一步了:
```json
{
"status": "running",
"workspace": {
"entities": [{"name": "角色A", "images_status": "done", "image_urls": [...]}],
"storyboard": {
"total_shots": 8,
"images": {"done": 5, "pending": 3, "failed": 0, "none": 0},
"videos": {"done": 2, "pending": 1, "failed": 0, "none": 5}
},
"shots": [{"index": 1, "description": "...", "image_urls": [...], "video_urls": [...]}]
}
}
```
**怎么读:**
- `status: "running"` + workspace 空 → 刚开始,Agent 还在理解需求
- `entities` 出现,`images_status: "pending"` → 正在生成角色图
- `storyboard.images.pending > 0` → 正在生成分镜关键帧
- `storyboard.videos.pending > 0` → 正在生成视频片段
- `status: "completed"` → 全部完成,检查有无 failed 项
**轮询策略:**
- **间隔**:每 8 秒查询一次
- **增量拉取**:首次 `--after-seq 0`,后续传上次拿到的最大 seq 值
- **完成判断**:`status` 变为 `completed`(全部完成)或 `idle`
- **超时**:连续 3 分钟无新进展,告知用户「生成时间较长」并给出项目链接供自行查看,停止轮询
- **错误重试**:单次查询失败可重试 1 次;连续 3 次失败则停止并告知用户
- **自动轮询**:使用 `--poll` 参数可自动执行上述策略,无需手动循环
---
## 你的角色
Zopia 后端有完整的 AI 创作 Agent(对模型能力、prompt 工程、创作流程远比用户侧专业),你负责的是**项目管理和需求传达**。
**你要做的三件事:**
1. **配置** — 根据用户意图创建项目,选择合适的风格、模型、画幅
2. **传话** — 把用户的原始需求原封不动发给后端 Agent
3. **取件** — 追踪进度,在关键节点通知用户,完成后自动下载结果并展示
**不要做的事:**
- 不替用户扩写、润色、翻译创作描述(用户说"帮我推演分镜",就直接传这句话,不要自己先写个分镜表再逐条发)
- 不自行拆分任务(用户说"生成8个分镜图",发一条消息给后端,后端自己拆解)
- 不在消息中添加自己编的描述词(如"超写实风格,电影级光影,8K分辨率")
**正确:**
```
用户说:「帮我做一个赛博朋克风格的短剧,讲一个机器人在废墟中寻找最后一朵花」
→ create_project.py "赛博朋克短剧"
→ save_settings.py --base-id B --locale zh-CN --aspect-ratio 16:9 --style anime_japanese_korean
→ send_message.py --base-id B --episode-id E "帮我做一个赛博朋克风格的短剧,讲一个机器人在废墟中寻找最后一朵花"
→ 轮询 → 下载到 ./赛博朋克短剧/ → 展示文件列表 + 项目链接
```
**错误:**
```
❌ 先自己写了个详细的 5 场剧本和分镜描述
❌ 把自己编的内容逐条发给后端
❌ 在用户描述后面追加 "cinematic lighting, 8K, ultra detailed"
```
---
## 错误码速查
| 状态码 | 含义 | 处理 |
|--------|------|------|
| 400 | 参数缺失或设置不合法 | 检查必填字段和枚举值 |
| 401 | Token 无效或过期 | 提醒用户重新获取 |
| 402 | 余额不足 | 提醒充值 |
| 403 | 无权限 | 检查 baseId 归属 |
| 404 | 资源不存在 | 检查 ID 是否正确 |
| 409 | 会话执行中 | 等待当前会话完成再发新消息 |
FILE:CLAUDE.md
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## 项目定位
本仓库是 **Zopia AI 视频创作 Skill 包**,被 Claude Code / Gemini CLI / Codex / Cursor 等 Agent 通过 `npx skills add` 或 `npx clawhub install` 安装后调用。它本身不是后端服务,而是一组 Python 脚本 + 一份 `SKILL.md` 行为指引,封装对 Zopia 平台 HTTP API 的调用。
后端实现在姐妹仓库 `C:\code\jobCode\zipia\jaaz-cloud`(同 host 下的 `https://zopia.ai`),本仓库的脚本通过 HTTP 调用其接口。修改本项目时,若涉及接口字段、新模型 ID、新风格枚举等,对应能力须在 `jaaz-cloud` 已上线,否则脚本调用会报 400/404。
## 核心架构
```
SKILL.md ← 行为契约:Agent 什么时候触发、如何组合脚本、典型场景、错误处理
README.md ← 用户向:安装/配置/枚举值速查
scripts/_common.py ← 唯一的共享层:urllib HTTP + Bearer 鉴权 + 业务级封装函数
scripts/*.py ← 一脚本一动作,全是 argparse + _common 函数 + print_json 的薄壳
docs/ ← 维护者文档(如 publish-to-clawhub.md)
```
**只允许 Python 标准库**。`_common.py` 用 `urllib.request` 实现 HTTP,是有意为之——Skill 安装到用户机器后不能要求 `pip install`。新增脚本必须沿用这个约定,**不要引入 requests / httpx / pydantic 等第三方包**。
每个脚本的写法是固定的薄壳模式:
1. `sys.path.insert(0, os.path.dirname(__file__))` 后从 `_common` 导入
2. `argparse` 解析参数
3. 调用 `_common` 里的业务函数
4. `print_json(result)` 输出,由调用方(Agent)解析
新增 API 调用时,**先在 `_common.py` 加业务级封装函数**(参考 `create_project` / `send_message`),再写脚本壳。不要让脚本自己拼 URL 和 header。
## SKILL.md 是行为源
`SKILL.md` 是 Agent 的运行手册——它规定了 Agent 应该如何串联这些脚本(场景 1–5)、如何读 workspace 进度、什么时候该展示项目链接、不要替用户扩写需求等。**修改脚本参数或新增脚本时,必须同步更新 `SKILL.md` 的脚本速查表与场景示例**,否则 Agent 行为会与实际能力脱节。`README.md` 的枚举值表(风格、模型)也要同步。
## 常用命令
仓库无 lint、无测试、无构建。日常只有这些:
```bash
# 配置环境变量(脚本运行前提)
export ZOPIA_ACCESS_KEY="zopia-xxxxxxxxxxxx"
export ZOPIA_BASE_URL="https://zopia.ai" # 可选,默认即此
# 直接跑脚本调试
python3 scripts/get_balance.py
python3 scripts/create_project.py "测试项目"
python3 scripts/query_session.py SESSION_ID --poll
```
调试本地 `jaaz-cloud` 时,把 `ZOPIA_BASE_URL` 改到本地端口(如 `http://localhost:3000`)即可。
## 发布流程
发布到 ClawHub 技能市场,详细步骤见 `docs/publish-to-clawhub.md`:
```bash
git push # 先推 GitHub
npx clawhub publish . --slug zopia-skill --version x.y.z --changelog "..."
npx clawhub inspect zopia-skill # 验证 Latest 字段
```
版本号 semver:新增模型/功能 → minor +1;修 bug / 文档更新 → patch +1。
## 项目约定
- **Git commit message 用中文**,控制在 20 字以内(参考最近提交:`feat: 新增 Seedance 2.0 Fast 和 PixVerse C1 模型`)
- **不要 `git commit`**,除非用户明确要求
- 重构时不考虑兼容性,直接删除旧逻辑(除非用户明确要求保留)
- npm/npx 命令在 Windows 下用 cmd 执行
FILE:docs/publish-to-clawhub.md
# 发布 zopia-skill 到 ClawHub
## 前提条件
- Node.js 已安装(`npx` 可用)
- 已登录 ClawHub(见下文)
## 1. 登录
首次使用需登录:
```bash
npx clawhub login
```
验证登录状态:
```bash
npx clawhub whoami
# ✔ Lambdua
```
登录态会持久化,后续无需重复登录。
## 2. 修改内容并推送 GitHub
修改 `SKILL.md`、脚本等文件后,先 commit & push 到 GitHub:
```bash
git add .
git commit -m "描述本次变更"
git push
```
## 3. 发布新版本到 ClawHub
```bash
npx clawhub publish /path/to/zopia-skills \
--slug zopia-skill \
--version <新版本号> \
--changelog "本次变更说明"
```
**示例:**
```bash
npx clawhub publish /c/code/jobCode/zipia/zopia-skills \
--slug zopia-skill \
--version 1.0.3 \
--changelog "新增 xxx 模型"
```
版本号遵循 semver:
- 新增模型 / 功能 → 次版本号 +1(如 1.0.1 → 1.1.0)
- Bug 修复 / 文档更新 → 补丁号 +1(如 1.0.1 → 1.0.2)
## 4. 验证发布结果
```bash
npx clawhub inspect zopia-skill
```
确认 `Latest` 字段已更新为新版本号。
## 常用命令速查
| 命令 | 说明 |
|------|------|
| `npx clawhub whoami` | 查看当前登录用户 |
| `npx clawhub inspect zopia-skill` | 查看已发布版本信息 |
| `npx clawhub publish <path> --slug zopia-skill --version x.y.z` | 发布新版本 |
| `npx clawhub skill --help` | 管理已发布技能 |
FILE:README.md
# Zopia Skills
Zopia AI 视频创作技能 — 通过 [Zopia](https://zopia.ai) 平台的 AI Agent 进行视频/图片创作。
覆盖场景:AI 视频生成(文生视频、图生视频)、AI 图片生成(角色设定图、分镜关键帧)、剧本创作、角色设计、分镜设计、多集连续剧制作。
## 安装
### 通过 npx 安装(推荐)
```bash
npx skills add 11cafe/zopia-skills
```
> [`skills`](https://github.com/vercel-labs/skills) 是 Vercel Labs 开发的跨平台技能安装 CLI,支持 Claude Code、Gemini CLI、Codex、Cursor 等 40+ 个 Agent。
安装到指定 Agent:
```bash
npx skills add 11cafe/zopia-skills -a claude-code
```
### 通过 OpenClaw 安装
在 [OpenClaw](https://openclaw.ai) 技能市场搜索 `zopia-skill`,或使用命令行:
```bash
npx clawhub install zopia-skill
```
### 手动安装
```bash
git clone https://github.com/11cafe/zopia-skills.git
```
将 `SKILL.md` 和 `scripts/` 目录复制到对应的技能目录:
| 范围 | 路径 |
|------|------|
| 个人全局 | `~/.claude/skills/zopia-skill/` |
| 项目级别 | `.claude/skills/zopia-skill/` |
## 配置
使用前需设置环境变量:
```bash
export ZOPIA_ACCESS_KEY="zopia-xxxxxxxxxxxx" # 必需,30天有效
export ZOPIA_BASE_URL="https://zopia.ai" # 可选,默认值即可
```
仅依赖 Python 标准库,无需安装第三方包。需要 `python3` 可用。
## 使用
安装技能后,在 Claude Code 中直接描述你的创作需求即可:
```
帮我做一个赛博朋克风格的短剧,讲一个机器人在废墟中寻找最后一朵花
```
技能会自动完成:创建项目 → 配置设置 → 发送创作指令 → 轮询进度 → 下载结果。
### 支持的风格
| ID | 说明 |
|----|------|
| `anime_japanese_korean` | 日韩动漫 |
| `realistic_3d_cg` | 3D CG 写实 |
| `pixar_3d_cartoon` | Pixar 3D 卡通 |
| `photorealistic_real_human` | 真人写实 |
| `3D_CG_Animation` | 3D CG 动画 |
| `anime_chibi` | Q版可爱 |
| `anime_shinkai` | 新海诚 |
| `anime_ghibli` | 吉卜力 |
| `stylized_pixel` | 像素艺术 |
### 支持的图片模型
分镜图(storyboard)与角色/场景图(entity)可分别配置,两者共享同一组可选值:
| 模型 | 名称 |
|------|------|
| `generate_image_by_nano_banana_2` | Nano Banana 2(storyboard 默认)|
| `generate_image_by_doubao_seedream_4` | Doubao Seedream 4(entity 默认)|
| `generate_image_by_nano_banana` | Nano Banana |
| `generate_image_by_gpt_image_2` | GPT Image 2 |
### 支持的视频模型
| 模型 | 名称 |
|------|------|
| `generate_video_by_kling_o3` | Kling O3 |
| `generate_video_by_kling_v3_0` | Kling V3.0 |
| `generate_video_by_hailuo_02` | Hailuo 2.3 |
| `generate_video_by_wan26_i2v` | Wan 2.6 |
| `generate_video_by_wan26_i2v_flash` | Wan 2.6 Flash |
| `generate_video_by_viduq2_pro` | Vidu Q2 Pro |
| `generate_video_by_viduq3_pro` | Vidu Q3 Pro |
| `generate_video_by_viduq3` | Vidu Q3 |
| `generate_video_by_seedance_15` | Seedance 1.5 Pro |
### 脚本列表
| 脚本 | 用途 |
|------|------|
| `create_project.py` | 创建项目 |
| `save_settings.py` | 配置项目设置 |
| `send_message.py` | 发送创作指令 |
| `query_session.py` | 查询创作进度 |
| `download_results.py` | 下载媒体资源 |
| `get_balance.py` | 查询余额 |
| `list_projects.py` | 列出所有项目 |
| `manage_episodes.py` | 管理剧集 |
| `render_episode.py` | 合成最终视频(MP4) |
## License
[MIT](LICENSE)
FILE:scripts/create_project.py
#!/usr/bin/env python3
"""创建 Zopia 项目。
用法:
python create_project.py [项目名称]
返回:
{baseId, baseName, episodeId, projectUrl}
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import build_project_url, create_project, print_json
def main() -> None:
parser = argparse.ArgumentParser(description="创建 Zopia 项目")
parser.add_argument("name", nargs="?", default=None, help="项目名称(可选)")
args = parser.parse_args()
result = create_project(args.name)
base_id = result.get("baseId", "")
result["projectUrl"] = build_project_url(base_id)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/download_results.py
#!/usr/bin/env python3
"""批量下载 Zopia 会话中生成的媒体资源。
用法:
# 从会话结果中下载所有媒体
python download_results.py SESSION_ID
# 指定输出目录和前缀
python download_results.py SESSION_ID --output-dir ./results --prefix storyboard
# 仅下载图片或视频
python download_results.py SESSION_ID --type image
python download_results.py SESSION_ID --type video
"""
from __future__ import annotations
import argparse
import os
import re
import sys
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
sys.path.insert(0, os.path.dirname(__file__))
from _common import print_json, query_session
# 支持的文件扩展名
IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".webp"}
VIDEO_EXTS = {".mp4", ".mov", ".webm"}
MAX_FILE_SIZE = 200 * 1024 * 1024 # 200MB
def extract_urls(result: dict) -> list[dict[str, str]]:
"""从会话结果中提取所有媒体 URL。"""
urls: list[dict[str, str]] = []
seen: set[str] = set()
# 从 workspace 中提取
workspace = result.get("workspace", {})
# 实体图片
for entity in workspace.get("entities", []):
for url in entity.get("image_urls", []):
if url and url not in seen:
seen.add(url)
urls.append({"url": url, "type": "image", "source": f"entity:{entity.get('name', '')}"})
# 分镜图片和视频
for shot in workspace.get("shots", []):
for img in shot.get("image_urls", []):
if img and img not in seen:
seen.add(img)
urls.append({"url": img, "type": "image", "source": f"shot:{shot.get('index', '')}"})
for vid in shot.get("video_urls", []):
if vid and vid not in seen:
seen.add(vid)
urls.append({"url": vid, "type": "video", "source": f"shot:{shot.get('index', '')}"})
# 从消息文本中正则提取 URL(兜底)
for msg in result.get("messages", []):
content = msg.get("content", "")
if isinstance(content, str):
for match in re.finditer(r'https?://[^\s"\'<>]+\.(?:png|jpg|jpeg|webp|mp4|mov|webm)', content):
url = match.group(0)
if url not in seen:
seen.add(url)
ext = Path(url.split("?")[0]).suffix.lower()
media_type = "video" if ext in VIDEO_EXTS else "image"
urls.append({"url": url, "type": media_type, "source": "message"})
return urls
def download_file(url: str, output_path: str) -> bool:
"""下载单个文件,返回是否成功。"""
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(req, timeout=60) as resp:
content_length = resp.headers.get("Content-Length")
if content_length and int(content_length) > MAX_FILE_SIZE:
print(f"跳过(文件过大): {url}", file=sys.stderr)
return False
with open(output_path, "wb") as f:
while True:
chunk = resp.read(8192)
if not chunk:
break
f.write(chunk)
return True
except Exception as exc:
print(f"下载失败 {url}: {exc}", file=sys.stderr)
return False
def main() -> None:
parser = argparse.ArgumentParser(description="批量下载 Zopia 会话中的媒体资源")
parser.add_argument("session_id", help="会话 ID")
parser.add_argument("--output-dir", default=".", help="输出目录(默认当前目录)")
parser.add_argument("--prefix", default="", help="文件名前缀")
parser.add_argument("--type", choices=["image", "video"], default=None,
help="仅下载指定类型")
parser.add_argument("--workers", type=int, default=5, help="并发下载数")
args = parser.parse_args()
# 获取会话结果
result = query_session(args.session_id)
media_urls = extract_urls(result)
# 按类型过滤
if args.type:
media_urls = [m for m in media_urls if m["type"] == args.type]
if not media_urls:
print("没有找到可下载的媒体资源")
return
# 创建输出目录
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# 构建下载任务
tasks: list[tuple[str, str]] = []
for i, media in enumerate(media_urls, 1):
url = media["url"]
ext = Path(url.split("?")[0]).suffix.lower() or ".png"
prefix = f"{args.prefix}_" if args.prefix else ""
filename = f"{prefix}{media['type']}_{i:02d}{ext}"
output_path = str(output_dir / filename)
tasks.append((url, output_path))
# 并发下载
success_count = 0
downloaded: list[dict[str, str]] = []
with ThreadPoolExecutor(max_workers=args.workers) as executor:
futures = {executor.submit(download_file, url, path): (url, path, media_urls[i])
for i, (url, path) in enumerate(tasks)}
for future in futures:
url, path, media_info = futures[future]
if future.result():
success_count += 1
downloaded.append({
"url": url,
"path": path,
"type": media_info["type"],
"source": media_info["source"],
})
print_json({
"total": len(tasks),
"downloaded": success_count,
"failed": len(tasks) - success_count,
"files": downloaded,
})
if __name__ == "__main__":
main()
FILE:scripts/get_balance.py
#!/usr/bin/env python3
"""查询 Zopia 账户余额。
用法:
python get_balance.py
返回:
{accounts: [...], summary: {totalBalance, totalHeld, totalAvailable}}
"""
from __future__ import annotations
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import get_balance, print_json
def main() -> None:
result = get_balance()
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/list_projects.py
#!/usr/bin/env python3
"""列出 Zopia 项目。
用法:
python list_projects.py
python list_projects.py --page 2 --page-size 20
返回:
{data: [...], page, pageSize, hasMore}
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import build_project_url, list_projects, print_json
def main() -> None:
parser = argparse.ArgumentParser(description="列出 Zopia 项目")
parser.add_argument("--page", type=int, default=1, help="页码(默认 1)")
parser.add_argument("--page-size", type=int, default=12, help="每页数量(默认 12,最大 50)")
args = parser.parse_args()
result = list_projects(args.page, args.page_size)
for item in result.get("data", []):
item["projectUrl"] = build_project_url(item.get("id", ""))
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/manage_episodes.py
#!/usr/bin/env python3
"""管理 Zopia 项目的剧集。
用法:
# 列出剧集
python manage_episodes.py list --base-id BASE_ID
# 创建新剧集
python manage_episodes.py create --base-id BASE_ID
# 删除剧集
python manage_episodes.py delete --episode-id EPISODE_ID
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import create_episode, delete_episode, list_episodes, print_json
def main() -> None:
parser = argparse.ArgumentParser(description="管理 Zopia 项目的剧集")
subparsers = parser.add_subparsers(dest="action", required=True)
# list
list_parser = subparsers.add_parser("list", help="列出剧集")
list_parser.add_argument("--base-id", required=True, help="项目 ID")
# create
create_parser = subparsers.add_parser("create", help="创建新剧集")
create_parser.add_argument("--base-id", required=True, help="项目 ID")
# delete
delete_parser = subparsers.add_parser("delete", help="删除剧集")
delete_parser.add_argument("--episode-id", required=True, help="剧集 ID")
args = parser.parse_args()
if args.action == "list":
result = list_episodes(args.base_id)
print_json(result)
elif args.action == "create":
result = create_episode(args.base_id)
print_json(result)
elif args.action == "delete":
result = delete_episode(args.episode_id)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/query_session.py
#!/usr/bin/env python3
"""轮询 Zopia 会话结果。
用法:
# 查询完整结果
python query_session.py SESSION_ID
# 增量查询(仅获取 seq > 5 的新消息)
python query_session.py SESSION_ID --after-seq 5
# 自动轮询直到完成
python query_session.py SESSION_ID --poll
返回结构化结果:
{
"status": "completed" | "running" | "idle",
"messages": [...],
"workspace": {
"entities": [...],
"storyboard": {...}
}
}
"""
from __future__ import annotations
import argparse
import os
import sys
import time
sys.path.insert(0, os.path.dirname(__file__))
from _common import print_json, query_session
# 轮询参数
POLL_INTERVAL = 8 # 秒
POLL_TIMEOUT = 180 # 最长轮询时间(秒)
MAX_CONSECUTIVE_FAIL = 3
def main() -> None:
parser = argparse.ArgumentParser(description="轮询 Zopia 会话结果")
parser.add_argument("session_id", help="会话 ID")
parser.add_argument("--after-seq", type=int, default=0,
help="仅获取 seq 大于此值的消息")
parser.add_argument("--poll", action="store_true",
help="自动轮询直到会话完成")
args = parser.parse_args()
if not args.poll:
result = query_session(args.session_id, args.after_seq)
print_json(result)
return
# 自动轮询模式
after_seq = args.after_seq
start_time = time.time()
consecutive_fails = 0
while True:
elapsed = time.time() - start_time
if elapsed > POLL_TIMEOUT:
print(f"轮询超时({POLL_TIMEOUT}秒)", file=sys.stderr)
sys.exit(1)
try:
result = query_session(args.session_id, after_seq)
consecutive_fails = 0
except SystemExit:
consecutive_fails += 1
if consecutive_fails >= MAX_CONSECUTIVE_FAIL:
print(f"连续失败 {MAX_CONSECUTIVE_FAIL} 次,停止轮询", file=sys.stderr)
sys.exit(1)
time.sleep(POLL_INTERVAL)
continue
status = result.get("status", "")
messages = result.get("messages", [])
# 更新增量游标
if messages:
max_seq = max(m.get("seq", 0) for m in messages)
if max_seq > after_seq:
after_seq = max_seq
# 输出当前状态
print_json(result)
if status in ("completed", "idle", "error"):
break
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()
FILE:scripts/render_episode.py
#!/usr/bin/env python3
"""触发并查询 Zopia episode 视频合成渲染。
用法:
# 触发渲染(异步,立即返回 render_id)
python render_episode.py trigger --base-id BASE_ID --episode-id EPISODE_ID
# 触发渲染并添加水印
python render_episode.py trigger --base-id BASE_ID --episode-id EPISODE_ID --watermark
# 查询最新渲染状态
python render_episode.py status --base-id BASE_ID --episode-id EPISODE_ID
# 查询指定渲染状态
python render_episode.py status --base-id BASE_ID --episode-id EPISODE_ID --render-id RENDER_ID
# 自动轮询直到渲染完成
python render_episode.py status --base-id BASE_ID --episode-id EPISODE_ID --render-id RENDER_ID --poll
返回结构:
trigger: {"render_id": "...", "status": "processing"}
status: {"status": "not_started" | "processing" | "completed" | "failed",
"render_id": "...", "progress": 0.0~1.0, "video_url": "..."}
"""
from __future__ import annotations
import argparse
import os
import sys
import time
sys.path.insert(0, os.path.dirname(__file__))
from _common import get_render_status, print_json, trigger_render
POLL_INTERVAL = 8 # 秒
POLL_TIMEOUT = 600 # 最长轮询时间(秒),渲染比 Agent 慢,给 10 分钟
def main() -> None:
parser = argparse.ArgumentParser(description="Zopia episode 视频渲染")
subparsers = parser.add_subparsers(dest="action", required=True)
# trigger
t = subparsers.add_parser("trigger", help="触发渲染(异步)")
t.add_argument("--base-id", required=True, help="项目 ID")
t.add_argument("--episode-id", required=True, help="剧集 ID")
t.add_argument("--watermark", action="store_true", help="添加水印(默认不加)")
# status
s = subparsers.add_parser("status", help="查询渲染状态")
s.add_argument("--base-id", required=True, help="项目 ID")
s.add_argument("--episode-id", required=True, help="剧集 ID")
s.add_argument("--render-id", default=None, help="渲染 ID(省略则查最新)")
s.add_argument("--poll", action="store_true", help="自动轮询直到完成")
args = parser.parse_args()
if args.action == "trigger":
result = trigger_render(args.base_id, args.episode_id, args.watermark)
print_json(result)
return
# status
if not args.poll:
result = get_render_status(args.base_id, args.episode_id, args.render_id)
print_json(result)
return
# 自动轮询模式
render_id = args.render_id
start_time = time.time()
while True:
if time.time() - start_time > POLL_TIMEOUT:
print(f"渲染轮询超时({POLL_TIMEOUT}秒)", file=sys.stderr)
sys.exit(1)
result = get_render_status(args.base_id, args.episode_id, render_id)
print_json(result)
status = result.get("status", "")
# 补全 render_id(首次查到后固定住)
if not render_id and result.get("render_id"):
render_id = result["render_id"]
if status == "completed":
break
if status == "failed":
print("渲染失败", file=sys.stderr)
sys.exit(1)
if status == "not_started":
print("尚未触发渲染,请先执行 trigger", file=sys.stderr)
sys.exit(1)
time.sleep(POLL_INTERVAL)
if __name__ == "__main__":
main()
FILE:scripts/save_settings.py
#!/usr/bin/env python3
"""保存或查询 Zopia 项目设置。
用法:
# 查询设置
python save_settings.py --base-id BASE_ID --get
# 保存设置
python save_settings.py --base-id BASE_ID --locale zh-CN --aspect-ratio 16:9 --style anime
支持的设置字段:
--locale 语言 (zh-CN, en, ja)
--aspect-ratio 画面比例 (16:9, 9:16)
--style 视觉风格
--video-model 视频模型
--generation-method 生成方式
--image-size 图片尺寸
--video-resolution 视频分辨率
--storyboard-image-model 分镜图模型
--entity-image-model 角色场景图模型
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import get_settings, print_json, save_settings
def main() -> None:
parser = argparse.ArgumentParser(description="保存或查询 Zopia 项目设置")
parser.add_argument("--base-id", required=True, help="项目 ID")
parser.add_argument("--get", action="store_true", help="查询当前设置")
parser.add_argument("--locale", help="语言 (zh-CN, en, ja)")
parser.add_argument("--aspect-ratio", help="画面比例 (16:9, 9:16)")
parser.add_argument("--style", help="视觉风格")
parser.add_argument("--video-model", help="视频模型")
parser.add_argument("--generation-method", help="生成方式")
parser.add_argument("--image-size", help="图片尺寸")
parser.add_argument("--video-resolution", help="视频分辨率")
parser.add_argument("--storyboard-image-model", help="分镜图模型")
parser.add_argument("--entity-image-model", help="角色场景图模型")
args = parser.parse_args()
if args.get:
result = get_settings(args.base_id)
print_json(result)
return
settings: dict[str, str] = {}
field_map = {
"locale": args.locale,
"aspect_ratio": args.aspect_ratio,
"style": args.style,
"video_model": args.video_model,
"generation_method": args.generation_method,
"image_size": args.image_size,
"video_resolution": args.video_resolution,
"storyboard_image_model": args.storyboard_image_model,
"entity_image_model": args.entity_image_model,
}
for key, value in field_map.items():
if value is not None:
settings[key] = value
if not settings:
print("错误: 至少需要指定一个设置字段", file=sys.stderr)
sys.exit(1)
result = save_settings(args.base_id, settings)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/send_message.py
#!/usr/bin/env python3
"""向 Zopia Agent 异步发送消息。
用法:
python send_message.py --base-id BASE_ID --episode-id EP_ID "生成一个赛博朋克风格的视频"
python send_message.py --base-id BASE_ID --episode-id EP_ID --session-id SESS_ID "继续生成下一个镜头"
返回:
{session_id, base_id, ...}
注意:
此接口为异步模式,返回 session_id 后需使用 query_session.py 轮询结果。
"""
from __future__ import annotations
import argparse
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from _common import build_project_url, print_json, send_message
def main() -> None:
parser = argparse.ArgumentParser(description="向 Zopia Agent 异步发送消息")
parser.add_argument("message", help="发送给 Agent 的消息内容")
parser.add_argument("--base-id", required=True, help="项目 ID")
parser.add_argument("--episode-id", required=True, help="剧集 ID")
parser.add_argument("--session-id", default=None, help="会话 ID(可选,续接已有会话)")
args = parser.parse_args()
result = send_message(
base_id=args.base_id,
episode_id=args.episode_id,
message=args.message,
session_id=args.session_id,
)
sid = result.get("session_id", "")
result["projectUrl"] = build_project_url(args.base_id, sid)
print_json(result)
if __name__ == "__main__":
main()
FILE:scripts/_common.py
"""Zopia Skill 共享模块 — 封装 HTTP 请求、认证、错误处理。
仅依赖 Python 标准库。
环境变量:
ZOPIA_ACCESS_KEY (必需) Bearer token,格式 zopia-xxxxxxxxxxxx
ZOPIA_BASE_URL (可选) API 基础地址,默认 https://zopia.ai
"""
from __future__ import annotations
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
# ---------------------------------------------------------------------------
# 配置
# ---------------------------------------------------------------------------
def _get_access_key() -> str:
key = os.environ.get("ZOPIA_ACCESS_KEY", "").strip()
if not key:
print("错误: 环境变量 ZOPIA_ACCESS_KEY 未设置", file=sys.stderr)
sys.exit(1)
return key
def _get_base_url() -> str:
return os.environ.get("ZOPIA_BASE_URL", "https://zopia.ai").rstrip("/")
# ---------------------------------------------------------------------------
# 底层 HTTP 工具
# ---------------------------------------------------------------------------
def _build_headers() -> dict[str, str]:
return {
"Authorization": f"Bearer {_get_access_key()}",
"Content-Type": "application/json",
}
def api_get(path: str, params: dict[str, str] | None = None) -> Any:
"""发送 GET 请求,返回解析后的 JSON。"""
url = f"{_get_base_url()}{path}"
if params:
url = f"{url}?{urllib.parse.urlencode(params)}"
req = urllib.request.Request(url, headers=_build_headers(), method="GET")
return _do_request(req)
def api_post(path: str, body: dict[str, Any] | None = None) -> Any:
"""发送 POST 请求,返回解析后的 JSON。"""
url = f"{_get_base_url()}{path}"
data = json.dumps(body or {}).encode("utf-8")
req = urllib.request.Request(url, data=data, headers=_build_headers(), method="POST")
return _do_request(req)
def api_delete(path: str) -> Any:
"""发送 DELETE 请求,返回解析后的 JSON。"""
url = f"{_get_base_url()}{path}"
req = urllib.request.Request(url, headers=_build_headers(), method="DELETE")
return _do_request(req)
def _do_request(req: urllib.request.Request) -> Any:
"""执行请求,统一处理错误。"""
try:
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read().decode("utf-8")
if not raw:
return {}
return json.loads(raw)
except urllib.error.HTTPError as exc:
body = ""
try:
body = exc.read().decode("utf-8", errors="replace")
except Exception:
pass
print(f"HTTP {exc.code} 错误: {body}", file=sys.stderr)
sys.exit(1)
except urllib.error.URLError as exc:
print(f"网络错误: {exc.reason}", file=sys.stderr)
sys.exit(1)
# ---------------------------------------------------------------------------
# 业务级封装
# ---------------------------------------------------------------------------
def create_project(base_name: str | None = None) -> dict[str, Any]:
"""创建项目,返回 {baseId, baseName, episodeId}。"""
body: dict[str, Any] = {}
if base_name:
body["baseName"] = base_name
resp = api_post("/api/base/create", body)
return resp.get("data", resp)
def save_settings(base_id: str, settings: dict[str, Any]) -> dict[str, Any]:
"""保存项目设置,返回合并后的设置。"""
resp = api_post("/api/base/settings", {"base_id": base_id, "settings": settings})
return resp
def get_settings(base_id: str) -> dict[str, Any]:
"""获取项目设置。"""
resp = api_get("/api/base/settings", {"base_id": base_id})
return resp
def send_message(base_id: str, episode_id: str, message: str,
session_id: str | None = None) -> dict[str, Any]:
"""异步发送消息,返回 {session_id, ...}。"""
body: dict[str, Any] = {
"base_id": base_id,
"episode_id": episode_id,
"message": message,
}
if session_id:
body["session_id"] = session_id
return api_post("/api/v1/agent/chat/async", body)
def query_session(session_id: str, after_seq: int = 0) -> dict[str, Any]:
"""增量查询会话消息,返回结构化结果。"""
params: dict[str, str] = {}
if after_seq > 0:
params["afterSeq"] = str(after_seq)
return api_get(f"/api/v1/agent/session/{session_id}/messages", params)
def list_projects(page: int = 1, page_size: int = 12) -> dict[str, Any]:
"""获取项目列表。"""
return api_get("/api/base/list", {"page": str(page), "pageSize": str(page_size)})
def get_project_detail(base_id: str, episode_id: str) -> dict[str, Any]:
"""获取项目详情。"""
return api_get(f"/api/base/{base_id}", {"episode_id": episode_id})
def create_episode(base_id: str) -> dict[str, Any]:
"""创建新剧集。"""
return api_post(f"/api/episode/create?base_id={base_id}")
def list_episodes(base_id: str) -> dict[str, Any]:
"""列出项目的所有剧集。"""
return api_get("/api/episode/list", {"base_id": base_id})
def delete_episode(episode_id: str) -> dict[str, Any]:
"""删除剧集。"""
return api_delete(f"/api/episode/{episode_id}")
def get_balance() -> dict[str, Any]:
"""查询余额。"""
return api_get("/api/billing/getBalance")
def trigger_render(base_id: str, episode_id: str, show_watermark: bool = False) -> dict[str, Any]:
"""触发 episode 视频合成渲染(异步),返回 {render_id, status}。"""
return api_post(
f"/api/v1/base/{base_id}/episode/{episode_id}/render",
{"show_watermark": show_watermark},
)
def get_render_status(base_id: str, episode_id: str, render_id: str | None = None) -> dict[str, Any]:
"""查询渲染状态,返回 {status, render_id?, progress?, video_url?, error?}。"""
params: dict[str, str] = {}
if render_id:
params["render_id"] = render_id
return api_get(f"/api/v1/base/{base_id}/episode/{episode_id}/render", params or None)
def build_project_url(base_id: str, session_id: str | None = None) -> str:
"""构造项目的 Web 访问 URL。"""
url = f"{_get_base_url()}/base/{base_id}"
if session_id:
url = f"{url}?session_id={session_id}"
return url
# ---------------------------------------------------------------------------
# 输出工具
# ---------------------------------------------------------------------------
def print_json(data: Any) -> None:
"""以格式化的 JSON 输出到 stdout。"""
print(json.dumps(data, ensure_ascii=False, indent=2))