@clawhub-chuyun-3dfaeef348
---
name: "v2v"
description: Placeholder skill for video-to-video workflows on skills.video. Use when the user is asking about v2v generation and the concrete API contract has not been implemented yet.
---

# v2v

## Overview

Use this as a placeholder skill for video-to-video generation on `skills.video`.
This file is intentionally incomplete and should not be treated as a production workflow.

## Current status

- No verified v2v OpenAPI contract has been wired into this skill yet.
- No default endpoint, payload template, or helper command is defined yet.
- The skill should surface this limitation clearly instead of guessing request fields.

## Temporary behavior

When this skill is invoked before implementation is completed:

1. Tell the user that `v2v` is currently a placeholder.
2. Check whether a model-specific `openapi.json` or docs page exists for the requested provider/model.
3. Avoid inventing undocumented video input fields, upload semantics, or edit parameters.
4. Ask for the exact model or endpoint only if it is necessary to continue.
5. Once the API contract is confirmed, promote this placeholder into a real workflow similar to `video-generation`.

## Implementation checklist

- Confirm which providers/models support v2v on `skills.video`
- Identify the create endpoint and terminal status flow
- Document video input format such as URL, file upload, or asset ID
- Document supported edit controls such as prompt, strength, trim, or style transfer
- Add request-template extraction steps from OpenAPI
- Add execution examples and fallback polling behavior if applicable
- Add runtime error handling guidance

## Related skills

- `video-generation`: generic video generation flow
- `i2v`: image-to-video placeholder flow
- `i2i`: image-to-image placeholder flow
---
name: "t2v"
description: Placeholder skill for text-to-video workflows on skills.video. Use when the user is asking about t2v generation and the concrete API contract has not been implemented yet.
---

# t2v

## Overview

Use this as a placeholder skill for text-to-video generation on `skills.video`.
This file is intentionally incomplete and should not be treated as a production workflow.

## Current status

- No verified t2v OpenAPI contract has been wired into this skill yet.
- No default endpoint, payload template, or helper command is defined yet.
- The skill should surface this limitation clearly instead of guessing request fields.

## Temporary behavior

When this skill is invoked before implementation is completed:

1. Tell the user that `t2v` is currently a placeholder.
2. Check whether a model-specific `openapi.json` or docs page exists for the requested provider/model.
3. Avoid inventing undocumented request fields or unsupported generation options.
4. Ask for the exact model or endpoint only if it is necessary to continue.
5. Once the API contract is confirmed, promote this placeholder into a real workflow or redirect to `video-generation` if that generic skill is sufficient.

## Implementation checklist

- Confirm which providers/models support t2v on `skills.video`
- Identify the create endpoint and terminal status flow
- Document prompt, duration, aspect ratio, and other supported parameters
- Add request-template extraction steps from OpenAPI
- Add execution examples and fallback polling behavior if applicable
- Add runtime error handling guidance

## Related skills

- `video-generation`: generic video generation flow
- `i2v`: image-to-video placeholder flow
- `v2v`: video-to-video placeholder flow
---
name: "t2i"
description: Placeholder skill for text-to-image workflows on skills.video. Use when the user is asking about t2i generation and the concrete API contract has not been implemented yet.
---

# t2i

## Overview

Use this as a placeholder skill for text-to-image generation on `skills.video`.
This file is intentionally incomplete and should not be treated as a production workflow.

## Current status

- No verified t2i OpenAPI contract has been wired into this skill yet.
- No default endpoint, payload template, or helper command is defined yet.
- The skill should surface this limitation clearly instead of guessing request fields.

## Temporary behavior

When this skill is invoked before implementation is completed:

1. Tell the user that `t2i` is currently a placeholder.
2. Check whether a model-specific `openapi.json` or docs page exists for the requested provider/model.
3. Avoid inventing undocumented request fields or unsupported image parameters.
4. Ask for the exact model or endpoint only if it is necessary to continue.
5. Once the API contract is confirmed, promote this placeholder into a real workflow or redirect to `image-generation` if that generic skill is sufficient.

## Implementation checklist

- Confirm which providers/models support t2i on `skills.video`
- Identify the create endpoint and terminal status flow
- Document prompt, size, aspect ratio, and other supported parameters
- Add request-template extraction steps from OpenAPI
- Add execution examples and fallback polling behavior if applicable
- Add runtime error handling guidance

## Related skills

- `image-generation`: generic image generation flow
- `i2i`: image-to-image placeholder flow
- `t2v`: text-to-video placeholder flow
---
name: "i2v"
description: Placeholder skill for image-to-video workflows on skills.video. Use when the user is asking about i2v generation and the concrete API contract has not been implemented yet.
---

# i2v

## Overview

Use this as a placeholder skill for image-to-video generation on `skills.video`.
This file is intentionally incomplete and should not be treated as a production workflow.

## Current status

- No verified i2v OpenAPI contract has been wired into this skill yet.
- No default endpoint, payload template, or helper command is defined yet.
- The skill should surface this limitation clearly instead of guessing request fields.

## Temporary behavior

When this skill is invoked before implementation is completed:

1. Tell the user that `i2v` is currently a placeholder.
2. Check whether a model-specific `openapi.json` or docs page exists for the requested provider/model.
3. Avoid inventing undocumented image input fields or upload semantics.
4. Ask for the exact model or endpoint only if it is necessary to continue.
5. Once the API contract is confirmed, promote this placeholder into a real workflow similar to `video-generation`.

## Implementation checklist

- Confirm which providers/models support i2v on `skills.video`
- Identify the create endpoint and terminal status flow
- Document image input format such as URL, file upload, or asset ID
- Document supported motion, duration, and aspect-ratio parameters
- Add request-template extraction steps from OpenAPI
- Add execution examples and fallback polling behavior if applicable
- Add runtime error handling guidance

## Related skills

- `video-generation`: generic video generation flow
- `i2i`: image-to-image placeholder flow
- `v2v`: video-to-video placeholder flow
---
name: "i2i"
description: Placeholder skill for image-to-image workflows on skills.video. Use when the user is asking about i2i generation and the concrete API contract has not been implemented yet.
---

# i2i

## Overview

Use this as a placeholder skill for image-to-image generation on `skills.video`.
This file is intentionally incomplete and should not be treated as a production workflow.

## Current status

- No verified i2i OpenAPI contract has been wired into this skill yet.
- No default endpoint, payload template, or helper command is defined yet.
- The skill should surface this limitation clearly instead of guessing request fields.

## Temporary behavior

When this skill is invoked before implementation is completed:

1. Tell the user that `i2i` is currently a placeholder.
2. Check whether a model-specific `openapi.json` or docs page exists for the requested provider/model.
3. Avoid inventing undocumented image input fields or upload semantics.
4. Ask for the exact model or endpoint only if it is necessary to continue.
5. Once the API contract is confirmed, promote this placeholder into a real workflow similar to `image-generation` or `video-generation`.

## Implementation checklist

- Confirm which providers/models support i2i on `skills.video`
- Identify the create endpoint and terminal status flow
- Document image input format such as URL, file upload, or asset ID
- Add request-template extraction steps from OpenAPI
- Add execution examples and fallback polling behavior if applicable
- Add runtime error handling guidance

## Related skills

- `image-generation`: text-to-image generation flow
- `video-generation`: text-to-video generation flow
---
name: ai-video
description: Build and execute skills.video video generation REST requests from OpenAPI specs. Use when user needs to create, debug, or document video generation calls on open.skills.video.
license: MIT
metadata:
  openclaw:
    author: skills-video
    os:
      - linux
      - darwin
    requires:
      env:
        - SKILLS_VIDEO_API_KEY
      bins:
        - python3
        - curl
    primaryEnv: SKILLS_VIDEO_API_KEY
    cliHelp: |
      Configure API key:
      export SKILLS_VIDEO_API_KEY="your_api_key_here"
      Verify:
      python scripts/ensure_api_key.py
    config:
      stateDirs:
        - ~/.openclaw
      example: "Required env vars: SKILLS_VIDEO_API_KEY. Store the key in OpenClaw skill env or shell env and do not hardcode it in files."
    links:
      repository: https://github.com/skills-video/skills
      homepage: https://skills.video
---
# ai-video
## Overview
Use this skill to turn OpenAPI definitions into working video-generation API calls for `skills.video`.
Prefer deterministic extraction from `openapi.json` instead of guessing fields.
## Prerequisites
1. Obtain an API key at: `https://skills.video/dashboard/developer`
2. Configure `SKILLS_VIDEO_API_KEY` before using the skill.
Preferred OpenClaw setup:
- Open the skill settings for `ai-video`
- Add an environment variable named `SKILLS_VIDEO_API_KEY`
- Paste the API key as its value
Equivalent config shape:
```json
{
  "skills": {
    "entries": {
      "ai-video": {
        "enabled": true,
        "env": {
          "SKILLS_VIDEO_API_KEY": "your_api_key_here"
        }
      }
    }
  }
}
```
Other valid ways to provide the key:
- **Shell**: `export SKILLS_VIDEO_API_KEY="your_api_key_here"`
- **Tool-specific env config**: any runtime that injects `SKILLS_VIDEO_API_KEY`
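As a rough illustration of how a client might consume the key once it is configured, the sketch below builds the `Authorization` header from the environment and fails fast when the variable is unset. The function name `auth_headers` is hypothetical and is not part of this skill's scripts.

```python
from __future__ import annotations

import os
from typing import Mapping


def auth_headers(env: Mapping[str, str] | None = None) -> dict[str, str]:
    """Build request headers from SKILLS_VIDEO_API_KEY, failing fast if unset."""
    source = os.environ if env is None else env
    key = source.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not key:
        raise RuntimeError("SKILLS_VIDEO_API_KEY is not set")
    # Bearer scheme, matching the curl examples in this document.
    return {"Authorization": f"Bearer {key}"}
```

Passing an explicit mapping instead of relying on `os.environ` keeps the helper easy to test without touching the real environment.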
## Workflow
1. Check API key and bootstrap environment on first use.
2. Identify the active spec.
3. Select the SSE endpoint pair for a video model.
4. Extract request schema and generate a payload template.
5. Execute `POST /generation/sse/...` as default and keep the stream open.
6. If SSE does not reach terminal completion, poll `GET /generation/{id}` to terminal status.
7. Return only terminal result (`COMPLETED`/`SUCCEEDED`/`FAILED`/`CANCELED`), never `IN_PROGRESS`.
8. Apply retry and failure handling.
## 0) Check API key (first run)
Run this check before any API call.
```bash
python scripts/ensure_api_key.py
```
If `ok` is `false`, tell the user to follow the setup in `Prerequisites`, for example:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## 1) Identify the spec
Load the most specific OpenAPI first.
- Prefer model-specific OpenAPI when available (for example `/v1/openapi.json` under a model namespace).
- Fall back to platform-level `openapi.json`.
- Use `references/open-platform-api.md` for base URL, auth, and async lifecycle.
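The "most specific first" rule can be sketched as a simple ordered lookup, assuming the specs are available as local files. The function name `pick_spec` is hypothetical and is not part of this skill's scripts.

```python
from __future__ import annotations

from pathlib import Path


def pick_spec(candidates: list[str]) -> Path | None:
    """Return the first candidate path that exists as a file, or None.

    Callers list the model-specific spec first and the platform-level
    spec last, so the most specific available file wins.
    """
    for candidate in candidates:
        path = Path(candidate)
        if path.is_file():
            return path
    return None
```

A caller would pass the model-specific path before the platform-level `openapi.json`, and treat a `None` result as "no spec available" rather than guessing fields.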
## 2) Select a video endpoint
If `docs.json` exists, derive video endpoints from the `Videos` navigation group.
Use `default_endpoints` from the script output as the primary list (SSE first).
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
When `docs.json` is unavailable, pass a known endpoint directly (for example `/generation/sse/kling-ai/kling-v2.6`).
Use `references/video-model-endpoints.md` as a snapshot list.
## 3) Extract schema and build payload
Inspect endpoint details and generate a request template from required/default fields.
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--endpoint /generation/sse/kling-ai/kling-v2.6 \
--include-template
```
Use the returned `request_template` as the starting point.
Do not add fields not defined by the endpoint schema.
Use `default_create_endpoint` from output unless an explicit override is required.
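To make "a request template from required/default fields" concrete, here is a minimal sketch that works on a plain JSON Schema object. It is an illustration only: the real `scripts/inspect_openapi.py` is not shown here and may behave differently, and the `duration` and `seed` fields in the example schema are hypothetical.

```python
from __future__ import annotations

from typing import Any


def build_template(schema: dict[str, Any]) -> dict[str, Any]:
    """Build a payload skeleton from a JSON Schema object definition.

    Fields with a `default` use that value, required fields without a
    default get a None placeholder, and other optional fields are omitted.
    """
    required = set(schema.get("required", []))
    template: dict[str, Any] = {}
    for name, prop in schema.get("properties", {}).items():
        if "default" in prop:
            template[name] = prop["default"]
        elif name in required:
            template[name] = None  # placeholder the caller must fill in
    return template


# Hypothetical schema, for illustration only.
example_schema = {
    "type": "object",
    "required": ["prompt"],
    "properties": {
        "prompt": {"type": "string"},
        "duration": {"type": "integer", "default": 5},
        "seed": {"type": "integer"},
    },
}
print(build_template(example_schema))  # {'prompt': None, 'duration': 5}
```

Because the template only ever contains names taken from `properties`, it cannot introduce fields the endpoint schema does not define.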
## 4) Execute SSE request (default) with automatic fallback
Prefer the helper script. It creates via SSE and keeps streaming; if stream ends before terminal completion, it automatically switches to polling fallback.
```bash
python scripts/create_and_wait.py \
--sse-endpoint /generation/sse/kling-ai/kling-v2.6 \
--payload '{"prompt":"A cinematic dolly shot of neon city rain at night"}' \
--poll-timeout 900 \
--poll-interval 3
```
Treat SSE as the default result channel.
Do not finish the task on `IN_QUEUE` or `IN_PROGRESS`.
Return only after terminal result.
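For readers unfamiliar with the stream format, the following sketch shows how `event:` and `data:` lines group into events, with a blank line dispatching each one. It is a simplified illustration rather than a copy of `scripts/create_and_wait.py`, and the `gen_123` id and payload shape in the sample are hypothetical.

```python
from __future__ import annotations

import json
from typing import Any, Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[tuple[str, Any]]:
    """Yield (event_name, data) pairs from text/event-stream lines."""
    event_name = "message"
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches the buffered event, if any.
            if data_lines:
                text = "\n".join(data_lines)
                try:
                    yield event_name, json.loads(text)
                except json.JSONDecodeError:
                    yield event_name, text
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip() or "message"
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].lstrip())


# Hypothetical stream content, for illustration only.
sample = [
    "event: progress\n",
    'data: {"id": "gen_123", "status": "IN_PROGRESS"}\n',
    "\n",
    'data: {"id": "gen_123", "status": "COMPLETED"}\n',
    "\n",
]
events = list(iter_sse_events(sample))
```

A caller would keep consuming events until one carries a terminal status, which is why an `IN_PROGRESS` event alone is never a final answer.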
## 5) Fall back to polling
Use polling only if SSE cannot be established, disconnects early, or does not reach a terminal state.
Use `GET /generation/{id}` (or model-spec equivalent path if the OpenAPI uses `/v1/...`).
```bash
curl -X GET "https://open.skills.video/api/v1/generation/<GENERATION_ID>" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Stop polling on terminal states:
- `COMPLETED`
- `SUCCEEDED`
- `FAILED`
- `CANCELED`
Recommended helper:
```bash
python scripts/wait_generation.py \
--generation-id <GENERATION_ID> \
--timeout 900 \
--interval 3
```
Return to user only after helper emits `event=terminal`.
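The polling rule can be sketched as a loop with an injectable fetch function. This is a simplified illustration, not the actual `scripts/wait_generation.py`; it assumes the response carries a top-level `status` field, and the name `poll_until_terminal` is hypothetical.

```python
from __future__ import annotations

import time
from typing import Any, Callable

TERMINAL = {"COMPLETED", "SUCCEEDED", "FAILED", "CANCELED", "CANCELLED"}


def poll_until_terminal(
    fetch: Callable[[], dict[str, Any]],
    timeout: float = 900.0,
    interval: float = 3.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict[str, Any]:
    """Call fetch() until its `status` is terminal or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if str(result.get("status", "")).upper() in TERMINAL:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("generation did not reach a terminal status")
        sleep(interval)
```

Injecting `fetch` and `sleep` keeps the loop testable without network access; in real use `fetch` would wrap the `GET /generation/{id}` call.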
## 6) Handle errors and retries
Handle these response codes for create, SSE, and fallback poll operations:
- `400`: request format issue
- `401`: missing/invalid API key
- `402`: payment or credits issue at runtime (for example, insufficient credits)
- `404`: endpoint or generation id not found
- `422`: schema validation failed
Classify non-2xx runtime errors with:
```bash
python scripts/handle_runtime_error.py \
--status <HTTP_STATUS> \
--body '<RAW_ERROR_BODY_JSON_OR_TEXT>'
```
If category is `insufficient_credits`, tell the user to recharge:
- Open `https://skills.video/dashboard` and go to Billing/Credits
- Recharge or purchase additional credits
- Retry after recharge
Optional balance check:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Apply retries only for transient conditions (network failure, `429`, or temporary `5xx`).
Use bounded exponential backoff (for example `1s`, `2s`, `4s`, max `16s`, then fail).
Do not retry unchanged payloads after `4xx` validation errors.
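A bounded exponential backoff along these lines might look like the sketch below. The names `TransientError`, `backoff_delays`, and `call_with_retry` are hypothetical, and the schedule assumes plain doubling from `1s` up to the `16s` cap.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for network failures or temporary 5xx responses."""


def backoff_delays(base: float = 1.0, cap: float = 16.0) -> list[float]:
    """Return the doubling delay schedule from base up to cap, inclusive."""
    delays: list[float] = []
    delay = base
    while delay <= cap:
        delays.append(delay)
        delay *= 2
    return delays


def call_with_retry(
    func: Callable[[], T],
    delays: list[float] | None = None,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on TransientError once per delay, then give up."""
    schedule = backoff_delays() if delays is None else delays
    for delay in schedule:
        try:
            return func()
        except TransientError:
            sleep(delay)
    return func()  # final attempt; a failure here propagates to the caller
```

Only `TransientError` triggers a retry, so validation failures raised as any other exception surface immediately instead of being resent unchanged.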
## Rate limits and timeouts
Treat rate limits and server-side timeout windows as unknown unless documented in the active OpenAPI or product docs.
If unknown, explicitly note this in output and choose conservative client defaults.
## Resources
- `scripts/ensure_api_key.py`: validate `SKILLS_VIDEO_API_KEY` and show first-run setup guidance
- `scripts/handle_runtime_error.py`: classify runtime errors and provide recharge guidance for insufficient credits
- `scripts/inspect_openapi.py`: extract SSE/polling endpoint pair, contract, and payload template
- `scripts/create_and_wait.py`: create via SSE and auto-fallback to polling when stream does not reach terminal status
- `scripts/wait_generation.py`: poll generation status until terminal completion and return final response
- `references/open-platform-api.md`: SSE-first lifecycle, fallback polling, retry baseline
- `references/video-model-endpoints.md`: current video endpoint snapshot from `docs.json`
FILE:scripts/ensure_api_key.py
#!/usr/bin/env python3
"""Check SKILLS_VIDEO_API_KEY and print setup guidance when missing."""

from __future__ import annotations

import json
import os


def main() -> int:
    key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if key:
        print(
            json.dumps(
                {
                    "ok": True,
                    "env_var": "SKILLS_VIDEO_API_KEY",
                    "message": "API key detected.",
                },
                ensure_ascii=False,
                indent=2,
            )
        )
        return 0

    print(
        json.dumps(
            {
                "ok": False,
                "env_var": "SKILLS_VIDEO_API_KEY",
                "message": "Missing API key. Configure it before calling skills.video APIs.",
                "dashboard_url": "https://skills.video/dashboard/developer",
                "how_to_get_key": [
                    "Sign in at the dashboard URL.",
                    "Click 'Create API Key'.",
                    "Copy the generated key.",
                ],
                "set_env_examples": [
                    "export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"",
                    "echo 'export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"' >> ~/.zshrc && source ~/.zshrc",
                ],
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 1


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/handle_runtime_error.py
#!/usr/bin/env python3
"""Classify skills.video runtime API errors and provide actionable guidance."""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

INSUFFICIENT_CREDITS_KEYWORDS = (
    "insufficient credit",
    "not enough credit",
    "insufficient balance",
    "credit balance",
    "top up",
    "recharge",
)


def load_body(body: str | None, body_file: str | None) -> Any:
    if body_file:
        raw = Path(body_file).read_text(encoding="utf-8")
    elif body is not None:
        raw = body
    else:
        return None

    raw = raw.strip()
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error = payload.get("error")
        if isinstance(error, dict):
            nested = error.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def classify(status: int, message: str) -> str:
    lowered = message.lower()
    if status == 402 or any(keyword in lowered for keyword in INSUFFICIENT_CREDITS_KEYWORDS):
        return "insufficient_credits"
    if status == 401 or "unauthorized" in lowered:
        return "auth"
    if status == 422 or "validation" in lowered:
        return "validation"
    if status == 404:
        return "not_found"
    if status == 429 or status >= 500:
        return "transient"
    return "unknown"


def exit_code_for(category: str) -> int:
    table = {
        "insufficient_credits": 20,
        "auth": 21,
        "validation": 22,
        "not_found": 23,
        "transient": 24,
        "unknown": 25,
    }
    return table.get(category, 25)


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--status", type=int, required=True, help="HTTP status code")
    parser.add_argument("--body", help="Raw response body JSON/text")
    parser.add_argument("--body-file", help="Path to file containing raw response body")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--credits-endpoint", default="/credits")
    args = parser.parse_args()

    payload = load_body(args.body, args.body_file)
    message = extract_message(payload)
    category = classify(args.status, message)

    guidance: list[str] = []
    should_retry = False
    if category == "insufficient_credits":
        guidance = [
            "Credits are insufficient for this request.",
            "Open https://skills.video/dashboard and go to Billing/Credits to recharge.",
            "After recharge, retry the same request.",
        ]
    elif category == "auth":
        guidance = [
            "Authentication failed.",
            "Verify SKILLS_VIDEO_API_KEY and retry.",
        ]
    elif category == "validation":
        guidance = [
            "Request payload validation failed.",
            "Fix request parameters based on OpenAPI schema and retry.",
        ]
    elif category == "not_found":
        guidance = [
            "Resource or endpoint was not found.",
            "Recheck endpoint path/model id and generation id.",
        ]
    elif category == "transient":
        guidance = [
            "Transient server or rate-limit error.",
            "Retry with bounded exponential backoff.",
        ]
        should_retry = True
    else:
        guidance = [
            "Unhandled runtime error.",
            "Inspect response payload and apply a safe fallback path.",
        ]

    credits_url = f"{args.base_url.rstrip('/')}{args.credits_endpoint}"
    output = {
        "category": category,
        "status": args.status,
        "message": message,
        "should_retry": should_retry,
        "guidance": guidance,
        "credits_check_command": (
            f"curl -X GET \"{credits_url}\" "
            "-H \"Authorization: Bearer $SKILLS_VIDEO_API_KEY\""
        ),
        "recharge": {
            "dashboard_url": "https://skills.video/dashboard",
            "pricing_url": "https://skills.video/pricing",
        },
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return exit_code_for(category)


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/create_and_wait.py
#!/usr/bin/env python3
"""Create generation via SSE, then fallback to polling until terminal status."""

from __future__ import annotations

import argparse
import json
import os
import socket
import subprocess
import sys
from pathlib import Path
from typing import Any
from urllib import error, request

SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}
TERMINAL_EVENT_NAMES = {
    "completed",
    "complete",
    "succeeded",
    "success",
    "failed",
    "failure",
    "canceled",
    "cancelled",
    "done",
    "finished",
    "terminal",
}


def emit(payload: dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False))


def parse_json_or_text(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def load_payload(args: argparse.Namespace) -> dict[str, Any]:
    if args.payload_file:
        raw = Path(args.payload_file).read_text(encoding="utf-8")
        payload = parse_json_or_text(raw)
    elif args.payload is not None:
        payload = parse_json_or_text(args.payload)
    else:
        payload = {}

    if payload is None:
        return {}
    if not isinstance(payload, dict):
        raise SystemExit("Payload must be a JSON object.")
    return payload


def endpoint_url(base_url: str, endpoint: str) -> str:
    if endpoint.startswith("http://") or endpoint.startswith("https://"):
        return endpoint
    if not endpoint.startswith("/"):
        endpoint = "/" + endpoint
    return f"{base_url.rstrip('/')}{endpoint}"


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            nested = error_obj.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def find_status(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "status",
        "state",
        "queue_state",
        "queueState",
        "prediction_status",
        "predictionStatus",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def find_generation_id(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "generation_id",
        "generationId",
        "prediction_id",
        "predictionId",
        "task_id",
        "taskId",
        "id",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def is_terminal_event_name(event_name: str | None) -> bool:
    if not event_name:
        return False
    return event_name.strip().lower() in TERMINAL_EVENT_NAMES


def run_sse(
    url: str,
    api_key: str,
    payload: dict[str, Any],
    request_timeout: float,
) -> tuple[int, str | None, Any]:
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "text/event-stream",
            "Content-Type": "application/json",
        },
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
    )

    generation_id: str | None = None
    terminal_payload: Any = None
    event_name = "message"
    data_lines: list[str] = []

    def flush_event() -> tuple[bool, int]:
        nonlocal event_name, data_lines, generation_id, terminal_payload
        if not data_lines:
            event_name = "message"
            return False, 0

        text = "\n".join(data_lines)
        parsed = parse_json_or_text(text)
        status_value, status_key = find_status(parsed)
        status_normalized = status_value.upper() if status_value else None
        event_terminal = (status_normalized in TERMINAL_STATUSES) or is_terminal_event_name(event_name)
        # A failure-named event without a success status must not be reported as success.
        event_failed = event_name.strip().lower() in {"failed", "failure", "canceled", "cancelled"}
        current_id, _ = find_generation_id(parsed)
        if current_id and not generation_id:
            generation_id = current_id

        emit(
            {
                "event": "sse_event",
                "sse_event_name": event_name,
                "generation_id": generation_id,
                "status": status_value,
                "status_key": status_key,
                "terminal": event_terminal,
                "payload": parsed,
            }
        )

        # Reset the buffers for the next event.
        data_lines = []
        event_name = "message"

        if event_terminal:
            terminal_payload = parsed
            if status_normalized in SUCCESS_STATUSES:
                return True, 0
            if status_normalized in TERMINAL_STATUSES or event_failed:
                return True, 10
            return True, 0
        return False, 0

    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            emit({"event": "sse_open", "http_status": resp.getcode(), "url": url})
            for raw_line in resp:
                line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
                if not line:
                    # A blank line dispatches the buffered event.
                    done, rc = flush_event()
                    if done:
                        return rc, generation_id, terminal_payload
                    continue
                if line.startswith(":"):
                    continue
                if line.startswith("event:"):
                    event_name = line.split(":", 1)[1].strip() or "message"
                elif line.startswith("data:"):
                    data_lines.append(line.split(":", 1)[1].lstrip())

            # Flush any event still buffered when the stream closes.
            done, rc = flush_event()
            if done:
                return rc, generation_id, terminal_payload

        emit(
            {
                "event": "sse_stream_ended",
                "generation_id": generation_id,
                "message": "SSE stream ended before terminal status.",
            }
        )
        return 100, generation_id, None
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        parsed = parse_json_or_text(body)
        emit(
            {
                "event": "sse_http_error",
                "http_status": exc.code,
                "message": extract_message(parsed),
                "payload": parsed,
            }
        )
        return 101, generation_id, parsed
    except (socket.timeout, TimeoutError) as exc:
        emit({"event": "sse_timeout", "message": str(exc), "generation_id": generation_id})
        return 102, generation_id, None
    except error.URLError as exc:
        emit({"event": "sse_connect_error", "message": str(exc), "generation_id": generation_id})
        return 103, generation_id, None


def run_poll_fallback(
    generation_id: str,
    base_url: str,
    timeout: float,
    interval: float,
    request_timeout: float,
) -> int:
    wait_script = Path(__file__).with_name("wait_generation.py")
    if not wait_script.exists():
        emit(
            {
                "event": "error",
                "error": "wait_script_not_found",
                "message": f"Missing fallback script: {wait_script}",
            }
        )
        return 2

    cmd = [
        sys.executable,
        str(wait_script),
        "--generation-id",
        generation_id,
        "--base-url",
        base_url,
        "--timeout",
        str(timeout),
        "--interval",
        str(interval),
        "--request-timeout",
        str(request_timeout),
    ]
    emit(
        {
            "event": "fallback_polling_start",
            "generation_id": generation_id,
            "command": " ".join(cmd),
        }
    )
    result = subprocess.run(cmd, check=False)
    emit(
        {
            "event": "fallback_polling_end",
            "generation_id": generation_id,
            "exit_code": result.returncode,
        }
    )
    return result.returncode


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--sse-endpoint", required=True, help="SSE create endpoint path or full URL")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--payload", help="JSON object payload string")
    parser.add_argument("--payload-file", help="Path to JSON payload file")
    parser.add_argument("--sse-request-timeout", type=float, default=120.0)
    parser.add_argument("--poll-timeout", type=float, default=900.0)
    parser.add_argument("--poll-interval", type=float, default=3.0)
    parser.add_argument("--poll-request-timeout", type=float, default=20.0)
    args = parser.parse_args()

    api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not api_key:
        emit(
            {
                "event": "error",
                "error": "missing_api_key",
                "message": "Set SKILLS_VIDEO_API_KEY before calling skills.video APIs.",
            }
        )
        return 1

    if args.payload is not None and args.payload_file:
        emit(
            {
                "event": "error",
                "error": "invalid_arguments",
                "message": "Use either --payload or --payload-file, not both.",
            }
        )
        return 1

    payload = load_payload(args)
    url = endpoint_url(args.base_url, args.sse_endpoint)
    emit({"event": "start", "url": url, "mode": "sse_then_poll_fallback"})

    sse_rc, generation_id, terminal_payload = run_sse(
        url=url,
        api_key=api_key,
        payload=payload,
        request_timeout=args.sse_request_timeout,
    )
    if sse_rc in (0, 10):
        emit(
            {
                "event": "terminal",
                "source": "sse",
                "ok": sse_rc == 0,
                "generation_id": generation_id,
                "response": terminal_payload,
            }
        )
        return sse_rc

    if not generation_id:
        emit(
            {
                "event": "error",
                "error": "missing_generation_id",
                "message": "SSE did not return generation id; cannot start poll fallback.",
                "sse_exit_code": sse_rc,
            }
        )
        return 3

    return run_poll_fallback(
        generation_id=generation_id,
        base_url=args.base_url,
        timeout=args.poll_timeout,
        interval=args.poll_interval,
        request_timeout=args.poll_request_timeout,
    )


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/wait_generation.py
#!/usr/bin/env python3
"""Poll a skills.video generation task until it reaches terminal status."""

from __future__ import annotations

import argparse
import json
import os
import time
from typing import Any
from urllib import error, request

TRANSIENT_HTTP_STATUSES = {429, 500, 502, 503, 504}
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}


def parse_json_or_text(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def emit(payload: dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False))


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            nested = error_obj.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def find_status(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "status",
        "state",
        "queue_state",
        "queueState",
        "prediction_status",
        "predictionStatus",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def fetch_generation(
    base_url: str,
    generation_id: str,
    api_key: str,
    request_timeout: float,
) -> tuple[int, Any]:
    url = f"{base_url.rstrip('/')}/generation/{generation_id}"
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method="GET",
    )
    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            return resp.getcode(), parse_json_or_text(raw)
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        return exc.code, parse_json_or_text(body)
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--generation-id", required=True, help="Generation id to poll")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--interval", type=float, default=3.0, help="Poll interval in seconds")
    parser.add_argument("--timeout", type=float, default=600.0, help="Total wait timeout in seconds")
    parser.add_argument(
        "--request-timeout",
        type=float,
        default=20.0,
        help="HTTP request timeout per poll in seconds",
    )
    args = parser.parse_args()
    api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not api_key:
        emit(
            {
                "event": "error",
                "error": "missing_api_key",
                "message": "Set SKILLS_VIDEO_API_KEY before polling generation status.",
            }
        )
        return 1
    if args.interval <= 0:
        emit({"event": "error", "error": "invalid_interval", "message": "--interval must be > 0"})
        return 1
    if args.timeout <= 0:
        emit({"event": "error", "error": "invalid_timeout", "message": "--timeout must be > 0"})
        return 1
    start = time.monotonic()
    attempt = 0
    last_status: str | None = None
    while True:
        elapsed = time.monotonic() - start
        if elapsed > args.timeout:
            emit(
                {
                    "event": "timeout",
                    "generation_id": args.generation_id,
                    "elapsed_seconds": round(elapsed, 2),
                    "last_status": last_status,
                }
            )
            return 11
        attempt += 1
        http_status, payload = fetch_generation(
            base_url=args.base_url,
            generation_id=args.generation_id,
            api_key=api_key,
            request_timeout=args.request_timeout,
        )
        if http_status >= 400:
            message = extract_message(payload)
            transient = http_status in TRANSIENT_HTTP_STATUSES
            emit(
                {
                    "event": "poll_error",
                    "generation_id": args.generation_id,
                    "attempt": attempt,
                    "http_status": http_status,
                    "transient": transient,
                    "message": message,
                }
            )
            if transient:
                time.sleep(args.interval)
                continue
            return 12
        status_value, status_key = find_status(payload)
        normalized = status_value.upper() if status_value else None
        terminal = normalized in TERMINAL_STATUSES if normalized else False
        emit(
            {
                "event": "poll",
                "generation_id": args.generation_id,
                "attempt": attempt,
                "http_status": http_status,
                "status": status_value,
                "status_key": status_key,
                "terminal": terminal,
            }
        )
        if status_value:
            last_status = status_value
        if terminal:
            ok = normalized in SUCCESS_STATUSES
            emit(
                {
                    "event": "terminal",
                    "ok": ok,
                    "generation_id": args.generation_id,
                    "status": status_value,
                    "response": payload,
                }
            )
            return 0 if ok else 10
        time.sleep(args.interval)


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/inspect_openapi.py
#!/usr/bin/env python3
"""Inspect skills.video generation OpenAPI contracts and emit endpoint-ready summaries."""
from __future__ import annotations

import argparse
import json
import re
from pathlib import Path
from typing import Any

DEFAULT_CATEGORY = "videos"
METHOD_AND_PATH_RE = re.compile(r"^(GET|POST|PUT|PATCH|DELETE)\s+(.+)$", re.IGNORECASE)


def read_json(path: Path) -> dict[str, Any]:
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        raise SystemExit(f"File not found: {path}")
    except json.JSONDecodeError as exc:
        raise SystemExit(f"Invalid JSON in {path}: {exc}")


def find_group_nodes(node: Any, group_name: str) -> list[dict[str, Any]]:
    matches: list[dict[str, Any]] = []
    if isinstance(node, dict):
        if node.get("group") == group_name and isinstance(node.get("pages"), list):
            matches.append(node)
        for value in node.values():
            matches.extend(find_group_nodes(value, group_name))
    elif isinstance(node, list):
        for item in node:
            matches.extend(find_group_nodes(item, group_name))
    return matches


def collect_method_paths(node: Any) -> list[str]:
    rows: list[str] = []
    if isinstance(node, str):
        match = METHOD_AND_PATH_RE.match(node.strip())
        if match:
            method = match.group(1).upper()
            path = match.group(2).strip()
            if method == "POST" and path.startswith("/generation/"):
                rows.append(f"{method} {path}")
    elif isinstance(node, dict):
        if "pages" in node:
            rows.extend(collect_method_paths(node["pages"]))
    elif isinstance(node, list):
        for item in node:
            rows.extend(collect_method_paths(item))
    return rows


def unique_in_order(items: list[str]) -> list[str]:
    seen: set[str] = set()
    output: list[str] = []
    for item in items:
        if item not in seen:
            seen.add(item)
            output.append(item)
    return output


def list_endpoints_from_docs(docs: dict[str, Any], category: str) -> list[str]:
    group_name = "Videos" if category == "videos" else "Images"
    rows: list[str] = []
    for group in find_group_nodes(docs, group_name):
        rows.extend(collect_method_paths(group.get("pages", [])))
    return unique_in_order(rows)


def parse_endpoint_arg(endpoint: str) -> tuple[str, str]:
    endpoint = endpoint.strip()
    match = METHOD_AND_PATH_RE.match(endpoint)
    if match:
        return match.group(1).upper(), match.group(2).strip()
    return "POST", endpoint


def try_paths(path: str) -> list[str]:
    candidates = [path]
    if path.startswith("/v1/"):
        candidates.append(path[3:])
    elif path.startswith("/"):
        candidates.append(f"/v1{path}")
    else:
        candidates.append(f"/v1/{path}")
    return unique_in_order(candidates)
def resolve_operation(openapi: dict[str, Any], method: str, path: str) -> tuple[str, dict[str, Any]]:
    paths = openapi.get("paths", {})
    for candidate in try_paths(path):
        operations = paths.get(candidate)
        if isinstance(operations, dict) and method.lower() in operations:
            operation = operations[method.lower()]
            if isinstance(operation, dict):
                return candidate, operation
    raise SystemExit(f"Endpoint not found in OpenAPI: {method} {path}")


def endpoint_exists(openapi: dict[str, Any], method: str, path: str) -> bool:
    operations = openapi.get("paths", {}).get(path)
    return isinstance(operations, dict) and method.lower() in operations


def to_sse_path(path: str) -> str:
    if "/generation/sse/" in path:
        return path
    return path.replace("/generation/", "/generation/sse/", 1)


def to_polling_path(path: str) -> str:
    return path.replace("/generation/sse/", "/generation/", 1)


def find_sse_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
    sse_candidate = to_sse_path(resolved_path)
    if endpoint_exists(openapi, method, sse_candidate):
        return sse_candidate
    if "/generation/sse/" in resolved_path and endpoint_exists(openapi, method, resolved_path):
        return resolved_path
    return None


def find_polling_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
    polling_candidate = to_polling_path(resolved_path)
    if endpoint_exists(openapi, method, polling_candidate):
        return polling_candidate
    if endpoint_exists(openapi, method, resolved_path):
        return resolved_path
    return None


def find_subscribe_sse_endpoint(openapi: dict[str, Any]) -> str | None:
    paths = openapi.get("paths", {})
    if not isinstance(paths, dict):
        return None
    for path, ops in paths.items():
        if not isinstance(ops, dict):
            continue
        if "get" in ops and re.search(r"/predictions/\{[^}]+\}/subscribe$", path):
            return path
    return None


def build_default_and_sse_endpoints(
    openapi: dict[str, Any], endpoints: list[str]
) -> tuple[list[str], list[str]]:
    default_endpoints: list[str] = []
    sse_endpoints: list[str] = []
    for item in endpoints:
        method, path = parse_endpoint_arg(item)
        sse_path = to_sse_path(path)
        if endpoint_exists(openapi, method, sse_path):
            resolved = f"{method} {sse_path}"
            default_endpoints.append(resolved)
            sse_endpoints.append(resolved)
        else:
            default_endpoints.append(f"{method} {path}")
    return (
        unique_in_order(default_endpoints),
        unique_in_order(sse_endpoints),
    )


def ref_name(ref: str) -> str:
    return ref.rsplit("/", 1)[-1]


def resolve_schema(openapi: dict[str, Any], schema: dict[str, Any]) -> tuple[str | None, dict[str, Any]]:
    if "$ref" not in schema:
        return None, schema
    name = ref_name(schema["$ref"])
    resolved = openapi.get("components", {}).get("schemas", {}).get(name)
    if not isinstance(resolved, dict):
        raise SystemExit(f"Schema not found for ref: {schema['$ref']}")
    return name, resolved
def allowed_values(schema: dict[str, Any]) -> list[Any] | None:
    if isinstance(schema.get("enum"), list):
        return schema["enum"]
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        consts = [item["const"] for item in branch if isinstance(item, dict) and "const" in item]
        if consts:
            return consts
    return None


def summarize_type(schema: dict[str, Any]) -> str:
    if "$ref" in schema:
        return ref_name(schema["$ref"])
    schema_type = schema.get("type")
    if schema_type == "array":
        item_schema = schema.get("items") if isinstance(schema.get("items"), dict) else {}
        return f"array<{summarize_type(item_schema)}>"
    if isinstance(schema_type, str):
        return schema_type
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        variants: list[str] = []
        for item in branch:
            if not isinstance(item, dict):
                continue
            if "const" in item:
                const_value = item["const"]
                if isinstance(const_value, bool):
                    variants.append("boolean")
                elif isinstance(const_value, int):
                    variants.append("integer")
                elif isinstance(const_value, float):
                    variants.append("number")
                elif isinstance(const_value, str):
                    variants.append("string")
                else:
                    variants.append(type(const_value).__name__)
            elif "$ref" in item:
                variants.append(ref_name(item["$ref"]))
            elif isinstance(item.get("type"), str):
                variants.append(item["type"])
        if variants:
            return " | ".join(unique_in_order(variants))
    return "object"


def template_value(field_name: str, schema: dict[str, Any]) -> Any:
    if "default" in schema:
        return schema["default"]
    values = allowed_values(schema)
    if values:
        return values[0]
    schema_type = schema.get("type")
    if schema_type == "string":
        if schema.get("format") == "uri":
            return f"https://example.com/{field_name}.png"
        if "prompt" in field_name:
            return "Describe what to generate"
        return f"<{field_name}>"
    if schema_type in {"integer", "number"}:
        return 1
    if schema_type == "boolean":
        return False
    if schema_type == "array":
        item = schema.get("items") if isinstance(schema.get("items"), dict) else {}
        if item.get("format") == "uri":
            return ["https://example.com/input.png"]
        return []
    if schema_type == "object":
        return {}
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        for item in branch:
            if isinstance(item, dict) and "const" in item:
                return item["const"]
    return f"<{field_name}>"
def summarize_fields(schema: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    properties = schema.get("properties") if isinstance(schema.get("properties"), dict) else {}
    required = schema.get("required") if isinstance(schema.get("required"), list) else []
    required_set = set(required)
    fields: list[dict[str, Any]] = []
    template: dict[str, Any] = {}
    for name, prop in properties.items():
        if not isinstance(prop, dict):
            continue
        row: dict[str, Any] = {
            "name": name,
            "required": name in required_set,
            "type": summarize_type(prop),
        }
        if isinstance(prop.get("description"), str):
            row["description"] = prop["description"]
        if "default" in prop:
            row["default"] = prop["default"]
        values = allowed_values(prop)
        if values:
            row["allowed_values"] = values
        fields.append(row)
    for name in required:
        prop = properties.get(name)
        if isinstance(prop, dict):
            template[name] = template_value(name, prop)
    for name, prop in properties.items():
        if name in template:
            continue
        if isinstance(prop, dict) and "default" in prop:
            template[name] = prop["default"]
    return fields, template


def find_poll_endpoint(openapi: dict[str, Any]) -> str | None:
    paths = openapi.get("paths", {})
    if not isinstance(paths, dict):
        return None
    preferred = ["/generation/{id}", "/v1/generation/{request_id}", "/v1/generation/{id}"]
    for path in preferred:
        ops = paths.get(path)
        if isinstance(ops, dict) and "get" in ops:
            return path
    for path, ops in paths.items():
        if not isinstance(ops, dict):
            continue
        if "get" in ops and re.search(r"/generation/\{[^}]+\}", path):
            return path
    return None


def status_values(openapi: dict[str, Any], schema_name: str) -> list[str] | None:
    schema = openapi.get("components", {}).get("schemas", {}).get(schema_name)
    if isinstance(schema, dict) and isinstance(schema.get("enum"), list):
        if all(isinstance(item, str) for item in schema["enum"]):
            return schema["enum"]
    return None
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--openapi", required=True, help="Path to OpenAPI JSON")
    parser.add_argument("--docs", help="Path to docs.json for Videos/Images group extraction")
    parser.add_argument("--category", choices=["videos", "images"], default=DEFAULT_CATEGORY)
    parser.add_argument("--list-endpoints", action="store_true", help="List POST generation endpoints from docs.json group")
    parser.add_argument("--endpoint", help="Endpoint path or full string like 'POST /generation/...'")
    parser.add_argument("--include-template", action="store_true", help="Include request_template in output")
    parser.add_argument("--json-indent", type=int, default=2)
    args = parser.parse_args()
    openapi_path = Path(args.openapi)
    openapi = read_json(openapi_path)
    if args.list_endpoints:
        if not args.docs:
            raise SystemExit("--list-endpoints requires --docs <path/to/docs.json>")
        docs = read_json(Path(args.docs))
        polling_endpoints = list_endpoints_from_docs(docs, args.category)
        default_endpoints, sse_endpoints = build_default_and_sse_endpoints(
            openapi, polling_endpoints
        )
        print(
            json.dumps(
                {
                    "category": args.category,
                    "source": str(Path(args.docs).resolve()),
                    "default_result_mode": "sse" if sse_endpoints else "polling",
                    "default_endpoints": default_endpoints,
                    "sse_endpoints": sse_endpoints,
                    "polling_endpoints": polling_endpoints,
                    "poll_endpoint": find_poll_endpoint(openapi),
                },
                ensure_ascii=False,
                indent=args.json_indent,
            )
        )
        if not args.endpoint:
            return 0
    if not args.endpoint:
        raise SystemExit("Provide --endpoint or use --list-endpoints only")
    method, raw_path = parse_endpoint_arg(args.endpoint)
    resolved_path, operation = resolve_operation(openapi, method, raw_path)
    sse_create_endpoint = find_sse_create_endpoint(openapi, method, resolved_path)
    polling_create_endpoint = find_polling_create_endpoint(openapi, method, resolved_path)
    request_schema = (
        operation.get("requestBody", {})
        .get("content", {})
        .get("application/json", {})
        .get("schema", {})
    )
    if not isinstance(request_schema, dict):
        request_schema = {}
    request_schema_name, resolved_request_schema = resolve_schema(openapi, request_schema)
    fields, request_template = summarize_fields(resolved_request_schema)
    responses = operation.get("responses", {})
    if not isinstance(responses, dict):
        responses = {}
    error_codes = sorted(
        int(code) for code in responses.keys() if isinstance(code, str) and code.isdigit() and int(code) >= 400
    )
    servers = openapi.get("servers") if isinstance(openapi.get("servers"), list) else []
    server_url = None
    if servers and isinstance(servers[0], dict):
        server_url = servers[0].get("url")
    output: dict[str, Any] = {
        "endpoint": f"{method} {resolved_path}",
        "default_result_mode": "sse" if sse_create_endpoint else "polling",
        "default_create_endpoint": f"{method} {sse_create_endpoint or polling_create_endpoint or resolved_path}",
        "sse_create_endpoint": f"{method} {sse_create_endpoint}" if sse_create_endpoint else None,
        "polling_create_endpoint": f"{method} {polling_create_endpoint}" if polling_create_endpoint else None,
        "summary": operation.get("summary"),
        "server_url": server_url,
        "request_schema": request_schema_name,
        "fields": fields,
        "prediction_status_values": status_values(openapi, "PredictionStatus"),
        "queue_status_values": status_values(openapi, "QueueState"),
        "sse_subscribe_endpoint": find_subscribe_sse_endpoint(openapi),
        "poll_endpoint": find_poll_endpoint(openapi),
        "error_status_codes": error_codes,
    }
    if args.include_template:
        output["request_template"] = request_template
    print(json.dumps(output, ensure_ascii=False, indent=args.json_indent))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
FILE:agents/openai.yaml
interface:
  display_name: "ai-video"
  short_description: "Access the world's leading models in one place to produce videos"
  default_prompt: "Use $ai-video to create videos of {subject}."
FILE:references/video-model-endpoints.md
# Video Model Endpoints (Snapshot)
Snapshot source: `docs.json` group `APIs -> Videos`
Snapshot date: 2026-03-20
Default mode: SSE-first.
- Primary call path: replace `/generation/` with `/generation/sse/` for each endpoint below.
- Fallback call path: use the listed endpoint below with polling result fetch.
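The path swap described above can be sketched in Python. This mirrors the `to_sse_path` and `to_polling_path` helpers in `scripts/inspect_openapi.py`; whether a given model actually exposes the SSE variant still has to be confirmed against the OpenAPI file.

```python
def to_sse_path(path: str) -> str:
    """Return the SSE variant of a create path; SSE paths pass through unchanged."""
    if "/generation/sse/" in path:
        return path
    # Only the first occurrence is rewritten, so model names are left alone.
    return path.replace("/generation/", "/generation/sse/", 1)


def to_polling_path(path: str) -> str:
    """Return the polling (fallback) variant of an SSE create path."""
    return path.replace("/generation/sse/", "/generation/", 1)


if __name__ == "__main__":
    print(to_sse_path("/generation/google/veo-3.1"))
    print(to_polling_path("/generation/sse/google/veo-3.1"))
```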
## Endpoints
- `POST /generation/openai/sora-2-pro`
- `POST /generation/openai/sora-2`
- `POST /generation/google/veo-2`
- `POST /generation/google/veo-3.1`
- `POST /generation/google/veo-3.1-fast`
- `POST /generation/bytedance/seedance-1-pro`
- `POST /generation/bytedance/seedance-1-pro-fast`
- `POST /generation/bytedance/seedance-1.5`
- `POST /generation/xai/grok-imagine-video`
- `POST /generation/kling-ai/v3/video`
- `POST /generation/kling-ai/o3/video`
- `POST /generation/kling-ai/o3/edit`
- `POST /generation/kling-ai/o1/video`
- `POST /generation/kling-ai/o1/edit`
- `POST /generation/kling-ai/kling-v2.6`
- `POST /generation/kling-ai/motion-control-v2.6`
- `POST /generation/kling-ai/kling-v2.5-turbo`
- `POST /generation/runwayml/gen-4.5`
- `POST /generation/runwayml/gen4-turbo`
- `POST /generation/runwayml/gen4-aleph`
- `POST /generation/wan/wan-v2.6`
- `POST /generation/luma/ray-2`
- `POST /generation/luma/ray-2-flash`
- `POST /generation/vidu/q2/video-turbo`
- `POST /generation/vidu/q3`
- `POST /generation/minimax/hailuo-2.3`
- `POST /generation/minimax/hailuo-2.3-fast`
- `POST /generation/lightricks/ltx-2.3`
- `POST /generation/lightricks/ltx-2.3/audio-to-video`
- `POST /generation/lightricks/ltx-2.3/extend-video`
- `POST /generation/lightricks/ltx-2.3/retake-video`
- `POST /generation/pixverse/v5.6`
- `POST /generation/bytedance/omnihuman-v1.5`
- `POST /generation/sync/lipsync-2`
- `POST /generation/veed/lipsync`
- `POST /generation/pixverse/lipsync`
- `POST /generation/topaz/upscale/video`
- `POST /generation/runwayml/upscale-v1`
## Refresh command
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
FILE:references/open-platform-api.md
# skills.video Open Platform API Contract
## Scope
Use this reference for generation flows that create videos or images through `open.skills.video`.
## Base URL and endpoint shape
- Platform base URL: `https://open.skills.video/api/v1`
- Default generation endpoint pattern: `POST /generation/sse/{provider}/{model}`
- Polling create endpoint pattern (fallback): `POST /generation/{provider}/{model}`
- Task query endpoint pattern (fallback): `GET /generation/{id}`
Some model-scoped OpenAPI files use:
- `servers[0].url = /api`
- paths prefixed with `/v1/...`
Treat these as equivalent path representations under the same host.
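A minimal sketch of that equivalence, assuming the platform base URL above already carries the `/v1` segment. The helper name `build_url` is hypothetical and not part of the shipped scripts.

```python
PLATFORM_BASE_URL = "https://open.skills.video/api/v1"


def build_url(path: str, base_url: str = PLATFORM_BASE_URL) -> str:
    """Join a spec path onto the base URL, dropping a leading /v1 if present.

    Assumption: base_url already ends in /api/v1, so a model-scoped path such
    as /v1/generation/{id} must not contribute a second /v1 segment.
    """
    normalized = "/" + path.strip().lstrip("/")
    if normalized == "/v1" or normalized.startswith("/v1/"):
        normalized = normalized[3:]
    return base_url.rstrip("/") + normalized


if __name__ == "__main__":
    # Both spellings resolve to the same URL.
    print(build_url("/generation/abc123"))
    print(build_url("/v1/generation/abc123"))
```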
## Authentication
Use bearer auth on every API call:
```http
Authorization: Bearer <API_KEY>
```
Security scheme in OpenAPI: `bearerAuth` (`type: http`, `scheme: bearer`).
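As an illustration, the header can be attached with the standard library in the same way `scripts/wait_generation.py` does. The helper name `build_request` is hypothetical; only `SKILLS_VIDEO_API_KEY` and the header format come from this reference.

```python
import os
from typing import Optional
from urllib import request


def build_request(url: str, api_key: Optional[str] = None, method: str = "GET") -> request.Request:
    """Build an authenticated request; the key falls back to SKILLS_VIDEO_API_KEY."""
    key = (api_key or os.environ.get("SKILLS_VIDEO_API_KEY", "")).strip()
    if not key:
        # Fail before any network call so the key is never sent empty.
        raise ValueError("SKILLS_VIDEO_API_KEY is not set")
    return request.Request(
        url,
        headers={
            "Authorization": f"Bearer {key}",
            "Accept": "application/json",
        },
        method=method,
    )
```

Nothing is sent until the request object is passed to `urllib.request.urlopen`.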
## First-time API key setup
If `SKILLS_VIDEO_API_KEY` is not configured:
1. Open `https://skills.video/dashboard/developer` and sign in.
2. Click `Create API Key`.
3. Export the key in your shell:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## Async generation lifecycle
1. Submit generation with SSE endpoint `POST /generation/sse/{provider}/{model}`.
2. Read stream events (`text/event-stream`) until terminal event.
3. Fall back to polling only if SSE cannot connect, disconnects early, or does not reach terminal state.
4. Poll fallback result with `GET /generation/{id}` (or model-scoped equivalent path).
5. Stop polling at terminal status.
Important:
- `IN_QUEUE` / `IN_PROGRESS` are non-terminal. Do not return them as the final result.
- Return to caller only after terminal status (`COMPLETED`, `SUCCEEDED`, `FAILED`, `CANCELED`).
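Step 2 of the lifecycle can be sketched as a small `text/event-stream` reader. This follows the generic SSE framing (`event:` and `data:` fields, a blank line ending each event); the event names and JSON payload shape used in the example are assumptions, since this reference does not document them.

```python
import json
from typing import Any, Dict, Iterable, Iterator, List


def iter_sse_events(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield {"event", "data"} dicts from decoded SSE lines."""
    event_name = "message"  # default event type when no event: field is sent
    data_lines: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event.
            if data_lines:
                data = "\n".join(data_lines)
                try:
                    payload: Any = json.loads(data)
                except json.JSONDecodeError:
                    payload = data
                yield {"event": event_name, "data": payload}
            event_name, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "event":
            event_name = value
        elif field == "data":
            data_lines.append(value)
```

A caller would stop reading once an event carries a terminal status, and switch to polling if the stream ends before that point.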
## Status values from OpenAPI
### PredictionStatus
- `starting`
- `processing`
- `succeeded`
- `failed`
- `canceled`
### QueueState
- `IN_QUEUE`
- `IN_PROGRESS`
- `COMPLETED`
- `FAILED`
- `CANCELED`
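Because the two enums differ in case and vocabulary, a client probably wants one normalized check. The sketch below upper-cases the value the way `scripts/wait_generation.py` does; treating every unrecognized value as still pending is an assumption, and `classify_status` is a hypothetical helper name.

```python
from typing import Optional

SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
FAILURE_STATUSES = {"FAILED", "CANCELED", "CANCELLED"}


def classify_status(status: Optional[str]) -> str:
    """Map a PredictionStatus or QueueState value to success, failure, or pending."""
    normalized = (status or "").strip().upper()
    if normalized in SUCCESS_STATUSES:
        return "success"
    if normalized in FAILURE_STATUSES:
        return "failure"
    # Assumption: anything else (IN_QUEUE, processing, unknown) is non-terminal.
    return "pending"
```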
## Error response shape
`ErrorResponse` defines:
- `message` (`string`)
Common documented HTTP errors on generation endpoints:
- `400`
- `401`
- `404`
- `422`
Runtime note:
- Credit exhaustion can appear as `402` and/or error text indicating insufficient credits.
## Insufficient credits handling
When a runtime error indicates insufficient credits:
1. Stop automatic retries for this request.
2. Tell the user to recharge credits in `https://skills.video/dashboard` (Billing/Credits).
3. Retry only after recharge.
Use `GET /credits` to check remaining balance:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
## Retry baseline
- Retry only transient failures (`429`, `5xx`, transport timeouts, connection resets).
- Do not retry unchanged payloads on validation/auth errors (`400`, `401`, `404`, `422`).
- Use bounded exponential backoff when retrying.
- Preserve SSE-first behavior on retries; use polling only as fallback.
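The retry decision above might be expressed as follows. The `max_attempts` cap and the convention of passing `None` for a transport failure are assumptions made for this sketch, not part of the API contract.

```python
from typing import Optional


def should_retry(http_status: Optional[int], attempt: int, max_attempts: int = 5) -> bool:
    """Return True when a failed call is worth retrying.

    http_status is None for transport failures (timeout, connection reset).
    attempt counts the calls already made, starting at 1.
    """
    if attempt >= max_attempts:
        return False  # bounded: give up after max_attempts calls
    if http_status is None:
        return True
    # 429 and 5xx are transient; 400/401/404/422 need a changed request.
    return http_status == 429 or 500 <= http_status <= 599
```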
## Unknowns
OpenAPI in this workspace does not declare concrete rate-limit windows or global timeout budgets.
Mark both as unknown unless an endpoint-specific OpenAPI explicitly documents them.
---
name: ai-image
description: Build and execute skills.video image generation REST requests from OpenAPI specs. Use when user needs to create, debug, or document image generation calls on open.skills.video.
license: MIT
metadata:
openclaw:
author: skills-video
os:
- linux
- darwin
requires:
env:
- SKILLS_VIDEO_API_KEY
bins:
- python3
- curl
primaryEnv: SKILLS_VIDEO_API_KEY
cliHelp: |
Configure API key:
export SKILLS_VIDEO_API_KEY="your_api_key_here"
Verify:
python scripts/ensure_api_key.py
config:
stateDirs:
- ~/.openclaw
example: "Required env vars: SKILLS_VIDEO_API_KEY. Store the key in OpenClaw skill env or shell env and do not hardcode it in files."
links:
repository: https://github.com/skills-video/skills
homepage: https://skills.video
---
# ai-image
## Overview
Use this skill to turn OpenAPI definitions into working image-generation API calls for `skills.video`.
Prefer deterministic extraction from `openapi.json` instead of guessing fields.
## Prerequisites
1. Obtain an API key at: `https://skills.video/dashboard/developer`
2. Configure `SKILLS_VIDEO_API_KEY` before using the skill.
Preferred OpenClaw setup:
- Open the skill settings for `ai-image`
- Add an environment variable named `SKILLS_VIDEO_API_KEY`
- Paste the API key as its value
Equivalent config shape:
```json
{
  "skills": {
    "entries": {
      "ai-image": {
        "enabled": true,
        "env": {
          "SKILLS_VIDEO_API_KEY": "your_api_key_here"
        }
      }
    }
  }
}
```
Other valid ways to provide the key:
- **Shell**: `export SKILLS_VIDEO_API_KEY="your_api_key_here"`
- **Tool-specific env config**: any runtime that injects `SKILLS_VIDEO_API_KEY`
## Workflow
1. Check API key and bootstrap environment on first use.
2. Identify the active spec.
3. Select the SSE endpoint pair for an image model.
4. Extract request schema and generate a payload template.
5. Execute `POST /generation/sse/...` as default and keep the stream open.
6. If SSE does not reach terminal completion, poll `GET /generation/{id}` to terminal status.
7. Return only terminal result (`COMPLETED`/`SUCCEEDED`/`FAILED`/`CANCELED`), never `IN_PROGRESS`.
8. Apply retry and failure handling.
## 0) Check API key (first run)
Run this check before any API call.
```bash
python scripts/ensure_api_key.py
```
If `ok` is `false`, tell the user to follow the setup in `Prerequisites`, for example:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## 1) Identify the spec
Load the most specific OpenAPI first.
- Prefer model-specific OpenAPI when available (for example `/v1/openapi.json` under a model namespace).
- Fall back to platform-level `openapi.json`.
- Use `references/open-platform-api.md` for base URL, auth, and async lifecycle.
## 2) Select an image endpoint
If `docs.json` exists, derive image endpoints from the `Images` navigation group.
Use `default_endpoints` from the script output as the primary list (SSE first).
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
When `docs.json` is unavailable, pass a known endpoint directly (for example `/generation/sse/google/nano-banana-pro`).
Use `references/image-model-endpoints.md` as a snapshot list.
## 3) Extract schema and build payload
Inspect endpoint details and generate a request template from required/default fields.
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--endpoint /generation/sse/google/nano-banana-pro \
--include-template
```
Use the returned `request_template` as the starting point.
Do not add fields not defined by the endpoint schema.
Use `default_create_endpoint` from output unless an explicit override is required.
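One way to enforce the "no undefined fields" rule is to check a payload against the `fields` list that `inspect_openapi.py` prints. This is a sketch only: `check_payload` is a hypothetical helper, and the field names in the usage example are placeholders rather than a real model schema.

```python
from typing import Any, Dict, List


def check_payload(payload: Dict[str, Any], fields: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Compare a payload with inspect_openapi.py field rows.

    Each row is expected to carry "name" and "required", plus an optional
    "allowed_values" list, as produced by summarize_fields().
    """
    known = {row["name"] for row in fields}
    required = {row["name"] for row in fields if row.get("required")}
    invalid = [
        row["name"]
        for row in fields
        if "allowed_values" in row
        and row["name"] in payload
        and payload[row["name"]] not in row["allowed_values"]
    ]
    return {
        "unknown": sorted(set(payload) - known),
        "missing": sorted(required - set(payload)),
        "invalid": sorted(invalid),
    }


if __name__ == "__main__":
    # Placeholder schema rows, not taken from a real endpoint.
    example_fields = [
        {"name": "prompt", "required": True, "type": "string"},
        {"name": "aspect_ratio", "required": False, "type": "string", "allowed_values": ["1:1", "16:9"]},
    ]
    print(check_payload({"prompt": "a red kite", "seed": 7}, example_fields))
```

An empty list under every key suggests the payload is safe to send; anything else should be fixed before the create call.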
## 4) Execute SSE request (default) with automatic fallback
Prefer the helper script. It creates via SSE and keeps streaming; if stream ends before terminal completion, it automatically switches to polling fallback.
```bash
python scripts/create_and_wait.py \
--sse-endpoint /generation/sse/google/nano-banana-pro \
--payload '{"prompt":"Minimal product photo of a matte black coffee grinder on white background"}' \
--poll-timeout 900 \
--poll-interval 3
```
Treat SSE as the default result channel.
Do not finish the task on `IN_QUEUE` or `IN_PROGRESS`.
Return only after terminal result.
## 5) Fall back to polling
Use polling only if SSE cannot be established, disconnects early, or does not reach a terminal state.
Use `GET /generation/{id}` (or model-spec equivalent path if the OpenAPI uses `/v1/...`).
```bash
curl -X GET "https://open.skills.video/api/v1/generation/<GENERATION_ID>" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Stop polling on terminal states:
- `COMPLETED` (or `SUCCEEDED`)
- `FAILED`
- `CANCELED`
Recommended helper:
```bash
python scripts/wait_generation.py \
--generation-id <GENERATION_ID> \
--timeout 900 \
--interval 3
```
Return to the user only after the helper emits `event=terminal`.
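The helper prints one JSON object per line, so its captured output can be scanned for the terminal record. A minimal sketch, assuming stdout was captured as text; `find_terminal_event` is a hypothetical name.

```python
import json
from typing import Any, Dict, Optional


def find_terminal_event(stdout_text: str) -> Optional[Dict[str, Any]]:
    """Return the first record with event == "terminal", or None if absent."""
    for line in stdout_text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON line
        if isinstance(record, dict) and record.get("event") == "terminal":
            return record
    return None
```

The returned record carries `ok`, `status`, and the final `response`; a `None` result means the helper stopped on a timeout or poll error instead.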
## 6) Handle errors and retries
Handle these response codes for create, SSE, and fallback poll operations:
- `400`: request format issue
- `401`: missing/invalid API key
- `402`: possible payment/credits issue in runtime
- `404`: endpoint or generation id not found
- `422`: schema validation failed
Classify non-2xx runtime errors with:
```bash
python scripts/handle_runtime_error.py \
--status <HTTP_STATUS> \
--body '<RAW_ERROR_BODY_JSON_OR_TEXT>'
```
If category is `insufficient_credits`, tell the user to recharge:
- Open `https://skills.video/dashboard` and go to Billing/Credits
- Recharge or purchase additional credits
- Retry after recharge
Optional balance check:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Apply retries only for transient conditions (network failure, `429`, or temporary `5xx`).
Use bounded exponential backoff (for example `1s`, `2s`, `4s`, max `16s`, then fail).
Do not retry unchanged payloads after `4xx` validation errors.
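The backoff schedule could look like the sketch below. It reads the example as delays of 1, 2, 4, 8, and 16 seconds followed by one last attempt, which is an interpretation; `backoff_delays` and `run_with_backoff` are hypothetical names, and `operation` stands for any callable that reports whether it succeeded.

```python
import time
from typing import Any, Callable, Iterator, Tuple


def backoff_delays(base: float = 1.0, factor: float = 2.0, max_delay: float = 16.0) -> Iterator[float]:
    """Yield delays that grow by `factor` and stop after `max_delay`."""
    delay = base
    while delay <= max_delay:
        yield delay
        delay *= factor


def run_with_backoff(
    operation: Callable[[], Tuple[bool, Any]],
    sleep: Callable[[float], Any] = time.sleep,
) -> Any:
    """Call operation() until it returns (True, result) or the delays run out.

    The caller decides what counts as transient: operation should raise for
    non-retryable errors and return (False, ...) only for retryable ones.
    """
    for delay in backoff_delays():
        ok, result = operation()
        if ok:
            return result
        sleep(delay)
    ok, result = operation()  # final attempt after the longest delay
    if ok:
        return result
    raise RuntimeError("operation still failing after bounded retries")
```

Passing `sleep` as a parameter keeps the schedule testable without real waiting.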
## Rate limits and timeouts
Treat rate limits and server-side timeout windows as unknown unless documented in the active OpenAPI or product docs.
If unknown, explicitly note this in output and choose conservative client defaults.
## Resources
- `scripts/ensure_api_key.py`: validate `SKILLS_VIDEO_API_KEY` and show first-run setup guidance
- `scripts/handle_runtime_error.py`: classify runtime errors and provide recharge guidance for insufficient credits
- `scripts/inspect_openapi.py`: extract SSE/polling endpoint pair, contract, and payload template
- `scripts/create_and_wait.py`: create via SSE and auto-fallback to polling when stream does not reach terminal status
- `scripts/wait_generation.py`: poll generation status until terminal completion and return final response
- `references/open-platform-api.md`: SSE-first lifecycle, fallback polling, retry baseline
- `references/image-model-endpoints.md`: current image endpoint snapshot from `docs.json`
FILE:scripts/ensure_api_key.py
#!/usr/bin/env python3
"""Check SKILLS_VIDEO_API_KEY and print setup guidance when missing."""
from __future__ import annotations

import json
import os


def main() -> int:
    key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if key:
        print(
            json.dumps(
                {
                    "ok": True,
                    "env_var": "SKILLS_VIDEO_API_KEY",
                    "message": "API key detected.",
                },
                ensure_ascii=False,
                indent=2,
            )
        )
        return 0
    print(
        json.dumps(
            {
                "ok": False,
                "env_var": "SKILLS_VIDEO_API_KEY",
                "message": "Missing API key. Configure it before calling skills.video APIs.",
                "dashboard_url": "https://skills.video/dashboard/developer",
                "how_to_get_key": [
                    "Sign in at the dashboard URL.",
                    "Click 'Create API Key'.",
                    "Copy the generated key.",
                ],
                "set_env_examples": [
                    "export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"",
                    "echo 'export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"' >> ~/.zshrc && source ~/.zshrc",
                ],
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 1


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/handle_runtime_error.py
#!/usr/bin/env python3
"""Classify skills.video runtime API errors and provide actionable guidance."""
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

INSUFFICIENT_CREDITS_KEYWORDS = (
    "insufficient credit",
    "not enough credit",
    "insufficient balance",
    "credit balance",
    "top up",
    "recharge",
)


def load_body(body: str | None, body_file: str | None) -> Any:
    if body_file:
        raw = Path(body_file).read_text(encoding="utf-8")
    elif body is not None:
        raw = body
    else:
        return None
    raw = raw.strip()
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error = payload.get("error")
        if isinstance(error, dict):
            nested = error.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def classify(status: int, message: str) -> str:
    lowered = message.lower()
    if status == 402 or any(keyword in lowered for keyword in INSUFFICIENT_CREDITS_KEYWORDS):
        return "insufficient_credits"
    if status == 401 or "unauthorized" in lowered:
        return "auth"
    if status == 422 or "validation" in lowered:
        return "validation"
    if status == 404:
        return "not_found"
    if status == 429 or status >= 500:
        return "transient"
    return "unknown"


def exit_code_for(category: str) -> int:
    table = {
        "insufficient_credits": 20,
        "auth": 21,
        "validation": 22,
        "not_found": 23,
        "transient": 24,
        "unknown": 25,
    }
    return table.get(category, 25)


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--status", type=int, required=True, help="HTTP status code")
    parser.add_argument("--body", help="Raw response body JSON/text")
    parser.add_argument("--body-file", help="Path to file containing raw response body")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--credits-endpoint", default="/credits")
    args = parser.parse_args()
    payload = load_body(args.body, args.body_file)
    message = extract_message(payload)
    category = classify(args.status, message)
    guidance: list[str] = []
    should_retry = False
    if category == "insufficient_credits":
        guidance = [
            "Credits are insufficient for this request.",
            "Open https://skills.video/dashboard and go to Billing/Credits to recharge.",
            "After recharge, retry the same request.",
        ]
    elif category == "auth":
        guidance = [
            "Authentication failed.",
            "Verify SKILLS_VIDEO_API_KEY and retry.",
        ]
    elif category == "validation":
        guidance = [
            "Request payload validation failed.",
            "Fix request parameters based on OpenAPI schema and retry.",
        ]
    elif category == "not_found":
        guidance = [
            "Resource or endpoint was not found.",
            "Recheck endpoint path/model id and generation id.",
        ]
    elif category == "transient":
        guidance = [
            "Transient server or rate-limit error.",
            "Retry with bounded exponential backoff.",
        ]
        should_retry = True
    else:
        guidance = [
            "Unhandled runtime error.",
            "Inspect response payload and apply a safe fallback path.",
        ]
    credits_url = f"{args.base_url.rstrip('/')}{args.credits_endpoint}"
    output = {
        "category": category,
        "status": args.status,
        "message": message,
        "should_retry": should_retry,
        "guidance": guidance,
        "credits_check_command": (
            f"curl -X GET \"{credits_url}\" "
            "-H \"Authorization: Bearer $SKILLS_VIDEO_API_KEY\""
        ),
        "recharge": {
            "dashboard_url": "https://skills.video/dashboard",
            "pricing_url": "https://skills.video/pricing",
        },
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return exit_code_for(category)


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/create_and_wait.py
#!/usr/bin/env python3
"""Create generation via SSE, then fallback to polling until terminal status."""
from __future__ import annotations
import argparse
import json
import os
import socket
import subprocess
import sys
from pathlib import Path
from typing import Any
from urllib import error, request
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}
TERMINAL_EVENT_NAMES = {
"completed",
"complete",
"succeeded",
"success",
"failed",
"failure",
"canceled",
"cancelled",
"done",
"finished",
"terminal",
}
def emit(payload: dict[str, Any]) -> None:
print(json.dumps(payload, ensure_ascii=False))
def parse_json_or_text(raw: str) -> Any:
text = raw.strip()
if not text:
return None
try:
return json.loads(text)
except json.JSONDecodeError:
return text
def load_payload(args: argparse.Namespace) -> dict[str, Any]:
if args.payload_file:
raw = Path(args.payload_file).read_text(encoding="utf-8")
payload = parse_json_or_text(raw)
elif args.payload is not None:
payload = parse_json_or_text(args.payload)
else:
payload = {}
if payload is None:
return {}
if not isinstance(payload, dict):
raise SystemExit("Payload must be a JSON object.")
return payload
def endpoint_url(base_url: str, endpoint: str) -> str:
if endpoint.startswith("http://") or endpoint.startswith("https://"):
return endpoint
if not endpoint.startswith("/"):
endpoint = "/" + endpoint
return f"{base_url.rstrip('/')}{endpoint}"
def extract_message(payload: Any) -> str:
if payload is None:
return ""
if isinstance(payload, str):
return payload
if isinstance(payload, dict):
message = payload.get("message")
if isinstance(message, str):
return message
error_obj = payload.get("error")
if isinstance(error_obj, dict):
nested = error_obj.get("message")
if isinstance(nested, str):
return nested
return json.dumps(payload, ensure_ascii=False)
def find_status(payload: Any) -> tuple[str | None, str | None]:
preferred_keys = (
"status",
"state",
"queue_state",
"queueState",
"prediction_status",
"predictionStatus",
)
def walk(node: Any) -> tuple[str | None, str | None]:
if isinstance(node, dict):
for key in preferred_keys:
value = node.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), key
for value in node.values():
found = walk(value)
if found[0]:
return found
elif isinstance(node, list):
for item in node:
found = walk(item)
if found[0]:
return found
return None, None
return walk(payload)
def find_generation_id(payload: Any) -> tuple[str | None, str | None]:
preferred_keys = (
"generation_id",
"generationId",
"prediction_id",
"predictionId",
"task_id",
"taskId",
"id",
)
def walk(node: Any) -> tuple[str | None, str | None]:
if isinstance(node, dict):
for key in preferred_keys:
value = node.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), key
for value in node.values():
found = walk(value)
if found[0]:
return found
elif isinstance(node, list):
for item in node:
found = walk(item)
if found[0]:
return found
return None, None
return walk(payload)
def is_terminal_event_name(event_name: str | None) -> bool:
if not event_name:
return False
return event_name.strip().lower() in TERMINAL_EVENT_NAMES
def run_sse(
url: str,
api_key: str,
payload: dict[str, Any],
request_timeout: float,
) -> tuple[int, str | None, Any]:
req = request.Request(
url,
headers={
"Authorization": f"Bearer {api_key}",
"Accept": "text/event-stream",
"Content-Type": "application/json",
},
data=json.dumps(payload).encode("utf-8"),
method="POST",
)
generation_id: str | None = None
terminal_payload: Any = None
event_name = "message"
data_lines: list[str] = []
def flush_event() -> tuple[bool, int]:
nonlocal event_name, data_lines, generation_id, terminal_payload
if not data_lines:
event_name = "message"
return False, 0
text = "\n".join(data_lines)
parsed = parse_json_or_text(text)
status_value, status_key = find_status(parsed)
status_normalized = status_value.upper() if status_value else None
event_terminal = (status_normalized in TERMINAL_STATUSES) or is_terminal_event_name(event_name)
current_id, _ = find_generation_id(parsed)
if current_id and not generation_id:
generation_id = current_id
emit(
{
"event": "sse_event",
"sse_event_name": event_name,
"generation_id": generation_id,
"status": status_value,
"status_key": status_key,
"terminal": event_terminal,
"payload": parsed,
}
)
data_lines = []
event_name = "message"
if event_terminal:
terminal_payload = parsed
if status_normalized in SUCCESS_STATUSES:
return True, 0
if status_normalized in TERMINAL_STATUSES:
return True, 10
return True, 0
return False, 0
try:
with request.urlopen(req, timeout=request_timeout) as resp:
emit({"event": "sse_open", "http_status": resp.getcode(), "url": url})
for raw_line in resp:
line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
if not line:
done, rc = flush_event()
if done:
return rc, generation_id, terminal_payload
continue
if line.startswith(":"):
continue
if line.startswith("event:"):
event_name = line.split(":", 1)[1].strip() or "message"
elif line.startswith("data:"):
data_lines.append(line.split(":", 1)[1].lstrip())
done, rc = flush_event()
if done:
return rc, generation_id, terminal_payload
emit(
{
"event": "sse_stream_ended",
"generation_id": generation_id,
"message": "SSE stream ended before terminal status.",
}
)
return 100, generation_id, None
except error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
parsed = parse_json_or_text(body)
emit(
{
"event": "sse_http_error",
"http_status": exc.code,
"message": extract_message(parsed),
"payload": parsed,
}
)
return 101, generation_id, parsed
except (socket.timeout, TimeoutError) as exc:
emit({"event": "sse_timeout", "message": str(exc), "generation_id": generation_id})
return 102, generation_id, None
except error.URLError as exc:
emit({"event": "sse_connect_error", "message": str(exc), "generation_id": generation_id})
return 103, generation_id, None
def run_poll_fallback(
generation_id: str,
base_url: str,
timeout: float,
interval: float,
request_timeout: float,
) -> int:
wait_script = Path(__file__).with_name("wait_generation.py")
if not wait_script.exists():
emit(
{
"event": "error",
"error": "wait_script_not_found",
"message": f"Missing fallback script: {wait_script}",
}
)
return 2
cmd = [
sys.executable,
str(wait_script),
"--generation-id",
generation_id,
"--base-url",
base_url,
"--timeout",
str(timeout),
"--interval",
str(interval),
"--request-timeout",
str(request_timeout),
]
emit(
{
"event": "fallback_polling_start",
"generation_id": generation_id,
"command": " ".join(cmd),
}
)
result = subprocess.run(cmd, check=False)
emit(
{
"event": "fallback_polling_end",
"generation_id": generation_id,
"exit_code": result.returncode,
}
)
return result.returncode
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--sse-endpoint", required=True, help="SSE create endpoint path or full URL")
parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
parser.add_argument("--payload", help="JSON object payload string")
parser.add_argument("--payload-file", help="Path to JSON payload file")
parser.add_argument("--sse-request-timeout", type=float, default=120.0)
parser.add_argument("--poll-timeout", type=float, default=900.0)
parser.add_argument("--poll-interval", type=float, default=3.0)
parser.add_argument("--poll-request-timeout", type=float, default=20.0)
args = parser.parse_args()
api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
if not api_key:
emit(
{
"event": "error",
"error": "missing_api_key",
"message": "Set SKILLS_VIDEO_API_KEY before calling skills.video APIs.",
}
)
return 1
if args.payload is not None and args.payload_file:
emit(
{
"event": "error",
"error": "invalid_arguments",
"message": "Use either --payload or --payload-file, not both.",
}
)
return 1
payload = load_payload(args)
url = endpoint_url(args.base_url, args.sse_endpoint)
emit({"event": "start", "url": url, "mode": "sse_then_poll_fallback"})
sse_rc, generation_id, terminal_payload = run_sse(
url=url,
api_key=api_key,
payload=payload,
request_timeout=args.sse_request_timeout,
)
if sse_rc in (0, 10):
emit(
{
"event": "terminal",
"source": "sse",
"ok": sse_rc == 0,
"generation_id": generation_id,
"response": terminal_payload,
}
)
return sse_rc
if not generation_id:
emit(
{
"event": "error",
"error": "missing_generation_id",
"message": "SSE did not return generation id; cannot start poll fallback.",
"sse_exit_code": sse_rc,
}
)
return 3
return run_poll_fallback(
generation_id=generation_id,
base_url=args.base_url,
timeout=args.poll_timeout,
interval=args.poll_interval,
request_timeout=args.poll_request_timeout,
)
if __name__ == "__main__":
raise SystemExit(main())
FILE:scripts/wait_generation.py
#!/usr/bin/env python3
"""Poll a skills.video generation task until it reaches terminal status."""
from __future__ import annotations
import argparse
import json
import os
import time
from typing import Any
from urllib import error, request
TRANSIENT_HTTP_STATUSES = {429, 500, 502, 503, 504}
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}
def parse_json_or_text(raw: str) -> Any:
text = raw.strip()
if not text:
return None
try:
return json.loads(text)
except json.JSONDecodeError:
return text
def emit(payload: dict[str, Any]) -> None:
print(json.dumps(payload, ensure_ascii=False))
def extract_message(payload: Any) -> str:
if payload is None:
return ""
if isinstance(payload, str):
return payload
if isinstance(payload, dict):
message = payload.get("message")
if isinstance(message, str):
return message
error_obj = payload.get("error")
if isinstance(error_obj, dict):
nested = error_obj.get("message")
if isinstance(nested, str):
return nested
return json.dumps(payload, ensure_ascii=False)
def find_status(payload: Any) -> tuple[str | None, str | None]:
preferred_keys = (
"status",
"state",
"queue_state",
"queueState",
"prediction_status",
"predictionStatus",
)
def walk(node: Any) -> tuple[str | None, str | None]:
if isinstance(node, dict):
for key in preferred_keys:
value = node.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), key
for value in node.values():
found = walk(value)
if found[0]:
return found
elif isinstance(node, list):
for item in node:
found = walk(item)
if found[0]:
return found
return None, None
return walk(payload)
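def fetch_generation(
    base_url: str,
    generation_id: str,
    api_key: str,
    request_timeout: float,
) -> tuple[int, Any]:
    url = f"{base_url.rstrip('/')}/generation/{generation_id}"
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method="GET",
    )
    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            return resp.getcode(), parse_json_or_text(raw)
    except error.HTTPError as exc:
        # HTTPError must be handled before the broader OSError clause below.
        body = exc.read().decode("utf-8", errors="replace")
        return exc.code, parse_json_or_text(body)
    except OSError as exc:
        # Transport failure (URLError, timeout, connection reset). Report it as a
        # synthetic 503 so the poll loop treats it as transient and keeps retrying
        # until --timeout instead of crashing with a traceback.
        return 503, {"message": f"Transport error: {exc}"}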
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--generation-id", required=True, help="Generation id to poll")
parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
parser.add_argument("--interval", type=float, default=3.0, help="Poll interval in seconds")
parser.add_argument("--timeout", type=float, default=600.0, help="Total wait timeout in seconds")
parser.add_argument(
"--request-timeout",
type=float,
default=20.0,
help="HTTP request timeout per poll in seconds",
)
args = parser.parse_args()
api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
if not api_key:
emit(
{
"event": "error",
"error": "missing_api_key",
"message": "Set SKILLS_VIDEO_API_KEY before polling generation status.",
}
)
return 1
if args.interval <= 0:
emit({"event": "error", "error": "invalid_interval", "message": "--interval must be > 0"})
return 1
if args.timeout <= 0:
emit({"event": "error", "error": "invalid_timeout", "message": "--timeout must be > 0"})
return 1
start = time.monotonic()
attempt = 0
last_status: str | None = None
while True:
elapsed = time.monotonic() - start
if elapsed > args.timeout:
emit(
{
"event": "timeout",
"generation_id": args.generation_id,
"elapsed_seconds": round(elapsed, 2),
"last_status": last_status,
}
)
return 11
attempt += 1
http_status, payload = fetch_generation(
base_url=args.base_url,
generation_id=args.generation_id,
api_key=api_key,
request_timeout=args.request_timeout,
)
if http_status >= 400:
message = extract_message(payload)
transient = http_status in TRANSIENT_HTTP_STATUSES
emit(
{
"event": "poll_error",
"generation_id": args.generation_id,
"attempt": attempt,
"http_status": http_status,
"transient": transient,
"message": message,
}
)
if transient:
time.sleep(args.interval)
continue
return 12
status_value, status_key = find_status(payload)
normalized = status_value.upper() if status_value else None
terminal = normalized in TERMINAL_STATUSES if normalized else False
emit(
{
"event": "poll",
"generation_id": args.generation_id,
"attempt": attempt,
"http_status": http_status,
"status": status_value,
"status_key": status_key,
"terminal": terminal,
}
)
if status_value:
last_status = status_value
if terminal:
ok = normalized in SUCCESS_STATUSES
emit(
{
"event": "terminal",
"ok": ok,
"generation_id": args.generation_id,
"status": status_value,
"response": payload,
}
)
return 0 if ok else 10
time.sleep(args.interval)
if __name__ == "__main__":
raise SystemExit(main())
FILE:scripts/inspect_openapi.py
#!/usr/bin/env python3
"""Inspect skills.video generation OpenAPI contracts and emit endpoint-ready summaries."""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
DEFAULT_CATEGORY = "images"
METHOD_AND_PATH_RE = re.compile(r"^(GET|POST|PUT|PATCH|DELETE)\s+(.+)$", re.IGNORECASE)
def read_json(path: Path) -> dict[str, Any]:
try:
return json.loads(path.read_text(encoding="utf-8"))
except FileNotFoundError:
raise SystemExit(f"File not found: {path}")
except json.JSONDecodeError as exc:
raise SystemExit(f"Invalid JSON in {path}: {exc}")
def find_group_nodes(node: Any, group_name: str) -> list[dict[str, Any]]:
matches: list[dict[str, Any]] = []
if isinstance(node, dict):
if node.get("group") == group_name and isinstance(node.get("pages"), list):
matches.append(node)
for value in node.values():
matches.extend(find_group_nodes(value, group_name))
elif isinstance(node, list):
for item in node:
matches.extend(find_group_nodes(item, group_name))
return matches
def collect_method_paths(node: Any) -> list[str]:
rows: list[str] = []
if isinstance(node, str):
match = METHOD_AND_PATH_RE.match(node.strip())
if match:
method = match.group(1).upper()
path = match.group(2).strip()
if method == "POST" and path.startswith("/generation/"):
rows.append(f"{method} {path}")
elif isinstance(node, dict):
if "pages" in node:
rows.extend(collect_method_paths(node["pages"]))
elif isinstance(node, list):
for item in node:
rows.extend(collect_method_paths(item))
return rows
def unique_in_order(items: list[str]) -> list[str]:
seen: set[str] = set()
output: list[str] = []
for item in items:
if item not in seen:
seen.add(item)
output.append(item)
return output
def list_endpoints_from_docs(docs: dict[str, Any], category: str) -> list[str]:
group_name = "Videos" if category == "videos" else "Images"
rows: list[str] = []
for group in find_group_nodes(docs, group_name):
rows.extend(collect_method_paths(group.get("pages", [])))
return unique_in_order(rows)
def parse_endpoint_arg(endpoint: str) -> tuple[str, str]:
endpoint = endpoint.strip()
match = METHOD_AND_PATH_RE.match(endpoint)
if match:
return match.group(1).upper(), match.group(2).strip()
return "POST", endpoint
def try_paths(path: str) -> list[str]:
candidates = [path]
if path.startswith("/v1/"):
candidates.append(path[3:])
elif path.startswith("/"):
candidates.append(f"/v1{path}")
else:
candidates.append(f"/v1/{path}")
return unique_in_order(candidates)
def resolve_operation(openapi: dict[str, Any], method: str, path: str) -> tuple[str, dict[str, Any]]:
paths = openapi.get("paths", {})
for candidate in try_paths(path):
operations = paths.get(candidate)
if isinstance(operations, dict) and method.lower() in operations:
operation = operations[method.lower()]
if isinstance(operation, dict):
return candidate, operation
raise SystemExit(f"Endpoint not found in OpenAPI: {method} {path}")
def endpoint_exists(openapi: dict[str, Any], method: str, path: str) -> bool:
operations = openapi.get("paths", {}).get(path)
return isinstance(operations, dict) and method.lower() in operations
def to_sse_path(path: str) -> str:
if "/generation/sse/" in path:
return path
return path.replace("/generation/", "/generation/sse/", 1)
def to_polling_path(path: str) -> str:
return path.replace("/generation/sse/", "/generation/", 1)
def find_sse_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
sse_candidate = to_sse_path(resolved_path)
if endpoint_exists(openapi, method, sse_candidate):
return sse_candidate
if "/generation/sse/" in resolved_path and endpoint_exists(openapi, method, resolved_path):
return resolved_path
return None
def find_polling_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
polling_candidate = to_polling_path(resolved_path)
if endpoint_exists(openapi, method, polling_candidate):
return polling_candidate
if endpoint_exists(openapi, method, resolved_path):
return resolved_path
return None
def find_subscribe_sse_endpoint(openapi: dict[str, Any]) -> str | None:
paths = openapi.get("paths", {})
if not isinstance(paths, dict):
return None
for path, ops in paths.items():
if not isinstance(ops, dict):
continue
if "get" in ops and re.search(r"/predictions/\{[^}]+\}/subscribe$", path):
return path
return None
def build_default_and_sse_endpoints(
openapi: dict[str, Any], endpoints: list[str]
) -> tuple[list[str], list[str]]:
default_endpoints: list[str] = []
sse_endpoints: list[str] = []
for item in endpoints:
method, path = parse_endpoint_arg(item)
sse_path = to_sse_path(path)
if endpoint_exists(openapi, method, sse_path):
resolved = f"{method} {sse_path}"
default_endpoints.append(resolved)
sse_endpoints.append(resolved)
else:
default_endpoints.append(f"{method} {path}")
return (
unique_in_order(default_endpoints),
unique_in_order(sse_endpoints),
)
def ref_name(ref: str) -> str:
return ref.rsplit("/", 1)[-1]
def resolve_schema(openapi: dict[str, Any], schema: dict[str, Any]) -> tuple[str | None, dict[str, Any]]:
if "$ref" not in schema:
return None, schema
name = ref_name(schema["$ref"])
resolved = openapi.get("components", {}).get("schemas", {}).get(name)
if not isinstance(resolved, dict):
raise SystemExit(f"Schema not found for ref: {schema['$ref']}")
return name, resolved
def allowed_values(schema: dict[str, Any]) -> list[Any] | None:
if isinstance(schema.get("enum"), list):
return schema["enum"]
for branch_key in ("anyOf", "oneOf"):
branch = schema.get(branch_key)
if not isinstance(branch, list):
continue
consts = [item["const"] for item in branch if isinstance(item, dict) and "const" in item]
if consts:
return consts
return None
def summarize_type(schema: dict[str, Any]) -> str:
if "$ref" in schema:
return ref_name(schema["$ref"])
schema_type = schema.get("type")
if schema_type == "array":
item_schema = schema.get("items") if isinstance(schema.get("items"), dict) else {}
return f"array<{summarize_type(item_schema)}>"
if isinstance(schema_type, str):
return schema_type
for branch_key in ("anyOf", "oneOf"):
branch = schema.get(branch_key)
if not isinstance(branch, list):
continue
variants: list[str] = []
for item in branch:
if not isinstance(item, dict):
continue
if "const" in item:
const_value = item["const"]
if isinstance(const_value, bool):
variants.append("boolean")
elif isinstance(const_value, int):
variants.append("integer")
elif isinstance(const_value, float):
variants.append("number")
elif isinstance(const_value, str):
variants.append("string")
else:
variants.append(type(const_value).__name__)
elif "$ref" in item:
variants.append(ref_name(item["$ref"]))
elif isinstance(item.get("type"), str):
variants.append(item["type"])
if variants:
return " | ".join(unique_in_order(variants))
return "object"
def template_value(field_name: str, schema: dict[str, Any]) -> Any:
if "default" in schema:
return schema["default"]
values = allowed_values(schema)
if values:
return values[0]
schema_type = schema.get("type")
if schema_type == "string":
if schema.get("format") == "uri":
return f"https://example.com/{field_name}.png"
if "prompt" in field_name:
return "Describe what to generate"
return f"<{field_name}>"
if schema_type in {"integer", "number"}:
return 1
if schema_type == "boolean":
return False
if schema_type == "array":
item = schema.get("items") if isinstance(schema.get("items"), dict) else {}
if item.get("format") == "uri":
return ["https://example.com/input.png"]
return []
if schema_type == "object":
return {}
for branch_key in ("anyOf", "oneOf"):
branch = schema.get(branch_key)
if not isinstance(branch, list):
continue
for item in branch:
if isinstance(item, dict) and "const" in item:
return item["const"]
return f"<{field_name}>"
def summarize_fields(schema: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
properties = schema.get("properties") if isinstance(schema.get("properties"), dict) else {}
required = schema.get("required") if isinstance(schema.get("required"), list) else []
required_set = set(required)
fields: list[dict[str, Any]] = []
template: dict[str, Any] = {}
for name, prop in properties.items():
if not isinstance(prop, dict):
continue
row: dict[str, Any] = {
"name": name,
"required": name in required_set,
"type": summarize_type(prop),
}
if isinstance(prop.get("description"), str):
row["description"] = prop["description"]
if "default" in prop:
row["default"] = prop["default"]
values = allowed_values(prop)
if values:
row["allowed_values"] = values
fields.append(row)
for name in required:
prop = properties.get(name)
if isinstance(prop, dict):
template[name] = template_value(name, prop)
for name, prop in properties.items():
if name in template:
continue
if isinstance(prop, dict) and "default" in prop:
template[name] = prop["default"]
return fields, template
def find_poll_endpoint(openapi: dict[str, Any]) -> str | None:
paths = openapi.get("paths", {})
if not isinstance(paths, dict):
return None
preferred = ["/generation/{id}", "/v1/generation/{request_id}", "/v1/generation/{id}"]
for path in preferred:
ops = paths.get(path)
if isinstance(ops, dict) and "get" in ops:
return path
for path, ops in paths.items():
if not isinstance(ops, dict):
continue
if "get" in ops and re.search(r"/generation/\{[^}]+\}", path):
return path
return None
def status_values(openapi: dict[str, Any], schema_name: str) -> list[str] | None:
schema = openapi.get("components", {}).get("schemas", {}).get(schema_name)
if isinstance(schema, dict) and isinstance(schema.get("enum"), list):
if all(isinstance(item, str) for item in schema["enum"]):
return schema["enum"]
return None
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--openapi", required=True, help="Path to OpenAPI JSON")
parser.add_argument("--docs", help="Path to docs.json for Videos/Images group extraction")
parser.add_argument("--category", choices=["videos", "images"], default=DEFAULT_CATEGORY)
parser.add_argument("--list-endpoints", action="store_true", help="List POST generation endpoints from docs.json group")
parser.add_argument("--endpoint", help="Endpoint path or full string like 'POST /generation/...'")
parser.add_argument("--include-template", action="store_true", help="Include request_template in output")
parser.add_argument("--json-indent", type=int, default=2)
args = parser.parse_args()
openapi_path = Path(args.openapi)
openapi = read_json(openapi_path)
if args.list_endpoints:
if not args.docs:
raise SystemExit("--list-endpoints requires --docs <path/to/docs.json>")
docs = read_json(Path(args.docs))
polling_endpoints = list_endpoints_from_docs(docs, args.category)
default_endpoints, sse_endpoints = build_default_and_sse_endpoints(
openapi, polling_endpoints
)
print(
json.dumps(
{
"category": args.category,
"source": str(Path(args.docs).resolve()),
"default_result_mode": "sse" if sse_endpoints else "polling",
"default_endpoints": default_endpoints,
"sse_endpoints": sse_endpoints,
"polling_endpoints": polling_endpoints,
"poll_endpoint": find_poll_endpoint(openapi),
},
ensure_ascii=False,
indent=args.json_indent,
)
)
if not args.endpoint:
return 0
if not args.endpoint:
raise SystemExit("Provide --endpoint or use --list-endpoints only")
method, raw_path = parse_endpoint_arg(args.endpoint)
resolved_path, operation = resolve_operation(openapi, method, raw_path)
sse_create_endpoint = find_sse_create_endpoint(openapi, method, resolved_path)
polling_create_endpoint = find_polling_create_endpoint(openapi, method, resolved_path)
request_schema = (
operation.get("requestBody", {})
.get("content", {})
.get("application/json", {})
.get("schema", {})
)
if not isinstance(request_schema, dict):
request_schema = {}
request_schema_name, resolved_request_schema = resolve_schema(openapi, request_schema)
fields, request_template = summarize_fields(resolved_request_schema)
responses = operation.get("responses", {})
if not isinstance(responses, dict):
responses = {}
error_codes = sorted(
int(code) for code in responses.keys() if isinstance(code, str) and code.isdigit() and int(code) >= 400
)
servers = openapi.get("servers") if isinstance(openapi.get("servers"), list) else []
server_url = None
if servers and isinstance(servers[0], dict):
server_url = servers[0].get("url")
output: dict[str, Any] = {
"endpoint": f"{method} {resolved_path}",
"default_result_mode": "sse" if sse_create_endpoint else "polling",
"default_create_endpoint": f"{method} {sse_create_endpoint or polling_create_endpoint or resolved_path}",
"sse_create_endpoint": f"{method} {sse_create_endpoint}" if sse_create_endpoint else None,
"polling_create_endpoint": f"{method} {polling_create_endpoint}" if polling_create_endpoint else None,
"summary": operation.get("summary"),
"server_url": server_url,
"request_schema": request_schema_name,
"fields": fields,
"prediction_status_values": status_values(openapi, "PredictionStatus"),
"queue_status_values": status_values(openapi, "QueueState"),
"sse_subscribe_endpoint": find_subscribe_sse_endpoint(openapi),
"poll_endpoint": find_poll_endpoint(openapi),
"error_status_codes": error_codes,
}
if args.include_template:
output["request_template"] = request_template
print(json.dumps(output, ensure_ascii=False, indent=args.json_indent))
return 0
if __name__ == "__main__":
raise SystemExit(main())
FILE:agents/openai.yaml
interface:
  display_name: "ai-image"
  short_description: "Access the world's leading models in one place to produce images"
  default_prompt: "Use $ai-image to create images of {subject}."
FILE:references/image-model-endpoints.md
# Image Model Endpoints (Snapshot)
Snapshot source: `docs.json` group `APIs -> Images`
Snapshot date: 2026-03-20
Default mode: SSE-first.
- Primary call path: replace `/generation/` with `/generation/sse/` for each endpoint below.
- Fallback call path: use the listed endpoint below with polling result fetch.
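
The rewrite between the two call paths is a plain string substitution. A minimal sketch, mirroring the `to_sse_path` and `to_polling_path` helpers in `scripts/inspect_openapi.py`; whether a given model actually exposes the SSE variant should still be confirmed against its OpenAPI file:

```python
def to_sse_path(path: str) -> str:
    """Rewrite a polling create path to its SSE variant (no-op if already SSE)."""
    if "/generation/sse/" in path:
        return path
    # Only the first occurrence is rewritten, so model names are left untouched.
    return path.replace("/generation/", "/generation/sse/", 1)


def to_polling_path(path: str) -> str:
    """Rewrite an SSE create path back to the polling (fallback) variant."""
    return path.replace("/generation/sse/", "/generation/", 1)


print(to_sse_path("/generation/google/nano-banana"))
print(to_polling_path("/generation/sse/google/nano-banana"))
```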
## Endpoints
- `POST /generation/google/nano-banana-2`
- `POST /generation/google/nano-banana-pro`
- `POST /generation/google/nano-banana`
- `POST /generation/openai/gpt-image-1.5`
- `POST /generation/openai/gpt-image-1`
- `POST /generation/kling-ai/v3/image`
- `POST /generation/kling-ai/o3-image`
- `POST /generation/kling-ai/o1-image`
- `POST /generation/xai/grok-imagine-image`
- `POST /generation/bytedance/seedream-5-lite`
- `POST /generation/bytedance/seedream-4.5`
- `POST /generation/bytedance/seedream-4.0`
- `POST /generation/qwen/qwen-image-2-pro`
- `POST /generation/qwen/qwen-image-2`
- `POST /generation/qwen/qwen-image`
- `POST /generation/vidu/q2/text-to-image`
- `POST /generation/runwayml/gen4-image`
- `POST /generation/runwayml/gen4-image-turbo`
- `POST /generation/black-forest-labs/flux-2-pro`
- `POST /generation/black-forest-labs/kontext`
- `POST /generation/ideogram/ideogram`
- `POST /generation/z-image/z-image`
- `POST /generation/topaz/upscale/image`
## Refresh command
```bash
python scripts/inspect_openapi.py \
  --openapi /abs/path/to/openapi.json \
  --docs /abs/path/to/docs.json \
  --list-endpoints
```
FILE:references/open-platform-api.md
# skills.video Open Platform API Contract
## Scope
Use this reference for generation flows that create videos or images through `open.skills.video`.
## Base URL and endpoint shape
- Platform base URL: `https://open.skills.video/api/v1`
- Default generation endpoint pattern: `POST /generation/sse/{provider}/{model}`
- Polling create endpoint pattern (fallback): `POST /generation/{provider}/{model}`
- Task query endpoint pattern (fallback): `GET /generation/{id}`
Some model-scoped OpenAPI files use:
- `servers[0].url = /api`
- paths prefixed with `/v1/...`
Treat these as equivalent path representations under the same host.
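
As a rough illustration of that equivalence, the sketch below normalizes a model-scoped `/v1/...` path and joins it to the platform base URL. `normalize_path` and `build_url` are hypothetical helper names used only for this example, not part of the platform API:

```python
BASE_URL = "https://open.skills.video/api/v1"


def normalize_path(path: str) -> str:
    """Drop a leading '/v1' so the path is relative to the platform base URL."""
    if not path.startswith("/"):
        path = "/" + path
    return path[3:] if path.startswith("/v1/") else path


def build_url(base_url: str, path: str) -> str:
    """Join the base URL and a normalized path without doubling slashes."""
    return f"{base_url.rstrip('/')}{normalize_path(path)}"


# Both spellings resolve to the same URL under the same host.
print(build_url(BASE_URL, "/generation/{id}"))
print(build_url(BASE_URL, "/v1/generation/{id}"))
```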
## Authentication
Use bearer auth on every API call:
```http
Authorization: Bearer <API_KEY>
```
Security scheme in OpenAPI: `bearerAuth` (`type: http`, `scheme: bearer`).
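
A minimal sketch of attaching the bearer header with the standard library, in the same style as the bundled scripts. `build_authed_request` is an illustrative name; the request is only constructed here, not sent:

```python
import os
from urllib import request


def build_authed_request(url: str, api_key: str, method: str = "GET") -> request.Request:
    """Build (but do not send) a request carrying the bearer token."""
    return request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method=method,
    )


# Read the key from the environment rather than hardcoding it in files.
api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
req = build_authed_request("https://open.skills.video/api/v1/credits", api_key or "<API_KEY>")
print(req.get_method(), req.full_url)
```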
## First-time API key setup
If `SKILLS_VIDEO_API_KEY` is not configured:
1. Open `https://skills.video/dashboard/developer` and sign in.
2. Click `Create API Key`.
3. Export the key in your shell:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## Async generation lifecycle
1. Submit generation with SSE endpoint `POST /generation/sse/{provider}/{model}`.
2. Read stream events (`text/event-stream`) until terminal event.
3. Fall back to polling only if SSE cannot connect, disconnects early, or does not reach terminal state.
4. Poll fallback result with `GET /generation/{id}` (or model-scoped equivalent path).
5. Stop polling at terminal status.
Important:
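- `IN_QUEUE` and `IN_PROGRESS` are non-terminal. Do not return them as the final result.
- Return to the caller only after a terminal status (`COMPLETED`, `SUCCEEDED`, `FAILED`, `CANCELED`). The bundled scripts compare statuses case-insensitively and also accept the `CANCELLED` spelling.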
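
Steps 2 and 3 of the lifecycle can be sketched as a small SSE line parser that stops at the first terminal status. This is a simplified illustration of what `scripts/create_and_wait.py` does, not a replacement for it; the event name `progress` and the payload fields `id` and `status` in the sample stream are assumptions, since the actual event shape depends on the model's contract:

```python
from __future__ import annotations

import json
from typing import Any, Iterable, Iterator

TERMINAL_STATUSES = {"COMPLETED", "SUCCEEDED", "FAILED", "CANCELED", "CANCELLED"}


def iter_sse_events(lines: Iterable[str]) -> Iterator[tuple[str, Any]]:
    """Yield (event_name, payload) pairs from already-decoded SSE lines."""
    event_name, data_lines = "message", []
    for raw_line in lines:
        line = raw_line.rstrip("\r\n")
        if not line:  # a blank line ends the current event
            if data_lines:
                text = "\n".join(data_lines)
                try:
                    payload: Any = json.loads(text)
                except json.JSONDecodeError:
                    payload = text  # keep non-JSON data as plain text
                yield event_name, payload
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment / keep-alive line
        elif line.startswith("event:"):
            event_name = line.split(":", 1)[1].strip() or "message"
        elif line.startswith("data:"):
            data_lines.append(line.split(":", 1)[1].lstrip())


def first_terminal(lines: Iterable[str]) -> Any:
    """Return the first payload with a terminal status, or None if none arrives."""
    for _name, payload in iter_sse_events(lines):
        status = payload.get("status") if isinstance(payload, dict) else None
        if isinstance(status, str) and status.upper() in TERMINAL_STATUSES:
            return payload
    return None  # stream ended early: the caller should fall back to polling


# Hypothetical stream used only for illustration.
sample = [
    ": keep-alive\n",
    "event: progress\n",
    'data: {"id": "gen_123", "status": "IN_PROGRESS"}\n',
    "\n",
    'data: {"id": "gen_123", "status": "COMPLETED"}\n',
    "\n",
]
print(first_terminal(sample))
```

A `None` result corresponds to the "disconnects early" case in step 3, which is where the polling fallback takes over.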
## Status values from OpenAPI
### PredictionStatus
- `starting`
- `processing`
- `succeeded`
- `failed`
- `canceled`
### QueueState
- `IN_QUEUE`
- `IN_PROGRESS`
- `COMPLETED`
- `FAILED`
- `CANCELED`
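
Because `PredictionStatus` is lowercase and `QueueState` is uppercase, comparing after normalization avoids treating `succeeded` and `COMPLETED` differently. A minimal sketch, using the same status sets as the bundled scripts; `classify_status` and its three return labels are hypothetical names for this example:

```python
from __future__ import annotations

SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}


def classify_status(status: str | None) -> str:
    """Map a raw status string to 'success', 'failure', or 'pending'."""
    normalized = status.strip().upper() if status else ""
    if normalized in SUCCESS_STATUSES:
        return "success"
    if normalized in TERMINAL_STATUSES:
        return "failure"
    # Anything else (starting, processing, IN_QUEUE, IN_PROGRESS, unknown)
    # is treated as non-terminal, so the caller keeps waiting.
    return "pending"


for value in ("starting", "IN_PROGRESS", "succeeded", "COMPLETED", "failed", "CANCELED"):
    print(value, "->", classify_status(value))
```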
## Error response shape
`ErrorResponse` defines:
- `message` (`string`)
Common documented HTTP errors on generation endpoints:
- `400`
- `401`
- `404`
- `422`
Runtime note:
- Credit exhaustion can appear as `402` and/or error text indicating insufficient credits.
## Insufficient credits handling
When a runtime error indicates insufficient credits:
1. Stop automatic retries for this request.
2. Tell the user to recharge credits in `https://skills.video/dashboard` (Billing/Credits).
3. Retry only after recharge.
Use `GET /credits` to check remaining balance:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
## Retry baseline
- Retry only transient failures (`429`, `5xx`, transport timeouts, connection resets).
- Do not retry unchanged payloads on validation/auth errors (`400`, `401`, `404`, `422`).
- Use bounded exponential backoff when retrying.
- Preserve SSE-first behavior on retries; use polling only as fallback.
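The retry rule reduces to a small predicate. A minimal sketch, assuming transport-level failures are represented by a missing status code; `should_retry` and `NON_RETRYABLE` are illustrative names:

```python
from typing import Optional

NON_RETRYABLE = {400, 401, 404, 422}


def should_retry(http_status: Optional[int]) -> bool:
    """Return True only for transient failures.

    http_status is None for transport-level failures such as timeouts
    or connection resets, which are retryable.
    """
    if http_status is None:
        return True
    if http_status in NON_RETRYABLE:
        return False
    return http_status == 429 or 500 <= http_status <= 599
```

Under this sketch `402` is not retried automatically, which matches the insufficient-credits guidance.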
## Unknowns
The OpenAPI specs in this workspace do not declare concrete rate-limit windows or global timeout budgets.
Mark both as unknown unless an endpoint-specific OpenAPI explicitly documents them.
---
name: ai-video
description: Build and execute skills.video video generation REST requests from OpenAPI specs. Use when user needs to create, debug, or document video generation calls on open.skills.video.
license: MIT
metadata:
  openclaw:
    author: skills-video
    os:
      - linux
      - darwin
    requires:
      env:
        - SKILLS_VIDEO_API_KEY
      bins:
        - python3
        - curl
    primaryEnv: SKILLS_VIDEO_API_KEY
    cliHelp: |
      Configure API key:
        export SKILLS_VIDEO_API_KEY="your_api_key_here"
      Verify:
        python3 scripts/ensure_api_key.py
    config:
      stateDirs:
        - ~/.openclaw
      example: "Required env vars: SKILLS_VIDEO_API_KEY. Store the key in OpenClaw skill env or shell env and do not hardcode it in files."
    links:
      repository: https://github.com/skills-video/skills
      homepage: https://skills.video
---
# ai-video
## Overview
Use this skill to turn OpenAPI definitions into working video-generation API calls for `skills.video`.
Prefer deterministic extraction from `openapi.json` instead of guessing fields.
## Prerequisites
1. Obtain an API key at: `https://skills.video/dashboard/developer`
2. Configure `SKILLS_VIDEO_API_KEY` before using the skill.
Preferred OpenClaw setup:
- Open the skill settings for `ai-video`
- Add an environment variable named `SKILLS_VIDEO_API_KEY`
- Paste the API key as its value
Equivalent config shape:
```json
{
  "skills": {
    "entries": {
      "ai-video": {
        "enabled": true,
        "env": {
          "SKILLS_VIDEO_API_KEY": "your_api_key_here"
        }
      }
    }
  }
}
```
Other valid ways to provide the key:
- **Shell**: `export SKILLS_VIDEO_API_KEY="your_api_key_here"`
- **Tool-specific env config**: any runtime that injects `SKILLS_VIDEO_API_KEY`
## Workflow
1. Check API key and bootstrap environment on first use.
2. Identify the active spec.
3. Select the SSE endpoint pair for a video model.
4. Extract request schema and generate a payload template.
5. Execute `POST /generation/sse/...` as default and keep the stream open.
6. If SSE does not reach terminal completion, poll `GET /generation/{id}` to terminal status.
7. Return only terminal result (`COMPLETED`/`SUCCEEDED`/`FAILED`/`CANCELED`), never `IN_PROGRESS`.
8. Apply retry and failure handling.
## 0) Check API key (first run)
Run this check before any API call.
```bash
python3 scripts/ensure_api_key.py
```
If `ok` is `false`, tell the user to follow the setup in `Prerequisites`, for example:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## 1) Identify the spec
Load the most specific OpenAPI first.
- Prefer model-specific OpenAPI when available (for example `/v1/openapi.json` under a model namespace).
- Fall back to platform-level `openapi.json`.
- Use `references/open-platform-api.md` for base URL, auth, and async lifecycle.
## 2) Select a video endpoint
If `docs.json` exists, derive video endpoints from the `Videos` navigation group.
Use `default_endpoints` from the script output as the primary list (SSE first).
```bash
python3 scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
When `docs.json` is unavailable, pass a known endpoint directly (for example `/generation/sse/kling-ai/kling-v2.6`).
Use `references/video-model-endpoints.md` as a snapshot list.
## 3) Extract schema and build payload
Inspect endpoint details and generate a request template from required/default fields.
```bash
python3 scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--endpoint /generation/sse/kling-ai/kling-v2.6 \
--include-template
```
Use the returned `request_template` as the starting point.
Do not add fields not defined by the endpoint schema.
Use `default_create_endpoint` from output unless an explicit override is required.
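The rule against undefined fields can be checked mechanically before sending. A minimal sketch, assuming the request schema is a flat object whose `$ref` has already been resolved and which exposes the standard `properties` and `required` keys; `check_payload` is a hypothetical helper, not part of `inspect_openapi.py`:

```python
from typing import Any, Dict, List


def check_payload(payload: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, List[str]]:
    """Report payload keys the schema does not define and required keys that are missing."""
    properties = schema.get("properties") or {}
    required = schema.get("required") or []
    return {
        "unknown": sorted(key for key in payload if key not in properties),
        "missing": sorted(key for key in required if key not in payload),
    }
```

An empty `unknown` list and an empty `missing` list mean the payload stays within the endpoint schema at the top level; nested objects would need the same check applied recursively.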
## 4) Execute SSE request (default) with automatic fallback
Prefer the helper script. It creates the generation via SSE and keeps streaming; if the stream ends before a terminal status, it automatically falls back to polling.
```bash
python3 scripts/create_and_wait.py \
--sse-endpoint /generation/sse/kling-ai/kling-v2.6 \
--payload '{"prompt":"A cinematic dolly shot of neon city rain at night"}' \
--poll-timeout 900 \
--poll-interval 3
```
Treat SSE as the default result channel.
Do not finish the task on `IN_QUEUE` or `IN_PROGRESS`.
Return only after terminal result.
## 5) Fall back to polling
Use polling only if SSE cannot be established, disconnects early, or does not reach a terminal state.
Use `GET /generation/{id}` (or model-spec equivalent path if the OpenAPI uses `/v1/...`).
```bash
curl -X GET "https://open.skills.video/api/v1/generation/<GENERATION_ID>" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Stop polling on terminal states:
- `COMPLETED`
- `SUCCEEDED`
- `FAILED`
- `CANCELED`
Recommended helper:
```bash
python3 scripts/wait_generation.py \
--generation-id <GENERATION_ID> \
--timeout 900 \
--interval 3
```
Return to user only after helper emits `event=terminal`.
## 6) Handle errors and retries
Handle these response codes for create, SSE, and fallback poll operations:
- `400`: request format issue
- `401`: missing/invalid API key
- `402`: possible payment/credits issue in runtime
- `404`: endpoint or generation id not found
- `422`: schema validation failed
Classify non-2xx runtime errors with:
```bash
python3 scripts/handle_runtime_error.py \
--status <HTTP_STATUS> \
--body '<RAW_ERROR_BODY_JSON_OR_TEXT>'
```
If category is `insufficient_credits`, tell the user to recharge:
- Open `https://skills.video/dashboard` and go to Billing/Credits
- Recharge or purchase additional credits
- Retry after recharge
Optional balance check:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Apply retries only for transient conditions (network failure, `429`, or temporary `5xx`).
Use bounded exponential backoff (for example `1s`, `2s`, `4s`, max `16s`, then fail).
Do not retry unchanged payloads after non-transient `4xx` errors (`400`, `401`, `404`, `422`); for `402`, retry only after the user recharges.
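The backoff schedule above can be sketched as follows, assuming doubling delays capped at 16 seconds and a caller-defined `TransientError` raised for retryable failures; both `TransientError` and `call_with_backoff` are illustrative names:

```python
import time
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Raised by the caller's operation for retryable failures."""


def backoff_delays(base: float = 1.0, cap: float = 16.0) -> Iterator[float]:
    """Yield doubling delays from base up to and including cap."""
    delay = base
    while delay <= cap:
        yield delay
        delay *= 2


def call_with_backoff(operation: Callable[[], T], sleep: Callable[[float], None] = time.sleep) -> T:
    """Run operation, sleeping between attempts while it raises TransientError."""
    for delay in backoff_delays():
        try:
            return operation()
        except TransientError:
            sleep(delay)
    # Final attempt after the last delay; a failure here propagates to the caller.
    return operation()
```

Passing `sleep` as a parameter keeps the loop testable without real waiting. Non-transient errors are not caught, so they surface immediately.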
## Rate limits and timeouts
Treat rate limits and server-side timeout windows as unknown unless documented in the active OpenAPI or product docs.
If unknown, explicitly note this in output and choose conservative client defaults.
## Resources
- `scripts/ensure_api_key.py`: validate `SKILLS_VIDEO_API_KEY` and show first-run setup guidance
- `scripts/handle_runtime_error.py`: classify runtime errors and provide recharge guidance for insufficient credits
- `scripts/inspect_openapi.py`: extract SSE/polling endpoint pair, contract, and payload template
- `scripts/create_and_wait.py`: create via SSE and auto-fallback to polling when stream does not reach terminal status
- `scripts/wait_generation.py`: poll generation status until terminal completion and return final response
- `references/open-platform-api.md`: SSE-first lifecycle, fallback polling, retry baseline
- `references/video-model-endpoints.md`: current video endpoint snapshot from `docs.json`
FILE:scripts/ensure_api_key.py
#!/usr/bin/env python3
"""Check SKILLS_VIDEO_API_KEY and print setup guidance when missing."""

from __future__ import annotations

import json
import os


def main() -> int:
    key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if key:
        print(
            json.dumps(
                {
                    "ok": True,
                    "env_var": "SKILLS_VIDEO_API_KEY",
                    "message": "API key detected.",
                },
                ensure_ascii=False,
                indent=2,
            )
        )
        return 0

    print(
        json.dumps(
            {
                "ok": False,
                "env_var": "SKILLS_VIDEO_API_KEY",
                "message": "Missing API key. Configure it before calling skills.video APIs.",
                "dashboard_url": "https://skills.video/dashboard/developer",
                "how_to_get_key": [
                    "Sign in at the dashboard URL.",
                    "Click 'Create API Key'.",
                    "Copy the generated key.",
                ],
                "set_env_examples": [
                    "export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"",
                    "echo 'export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"' >> ~/.zshrc && source ~/.zshrc",
                ],
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 1


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/handle_runtime_error.py
#!/usr/bin/env python3
"""Classify skills.video runtime API errors and provide actionable guidance."""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

INSUFFICIENT_CREDITS_KEYWORDS = (
    "insufficient credit",
    "not enough credit",
    "insufficient balance",
    "credit balance",
    "top up",
    "recharge",
)


def load_body(body: str | None, body_file: str | None) -> Any:
    if body_file:
        raw = Path(body_file).read_text(encoding="utf-8")
    elif body is not None:
        raw = body
    else:
        return None

    raw = raw.strip()
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error = payload.get("error")
        if isinstance(error, dict):
            nested = error.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def classify(status: int, message: str) -> str:
    lowered = message.lower()
    if status == 402 or any(keyword in lowered for keyword in INSUFFICIENT_CREDITS_KEYWORDS):
        return "insufficient_credits"
    if status == 401 or "unauthorized" in lowered:
        return "auth"
    if status == 422 or "validation" in lowered:
        return "validation"
    if status == 404:
        return "not_found"
    if status == 429 or status >= 500:
        return "transient"
    return "unknown"


def exit_code_for(category: str) -> int:
    table = {
        "insufficient_credits": 20,
        "auth": 21,
        "validation": 22,
        "not_found": 23,
        "transient": 24,
        "unknown": 25,
    }
    return table.get(category, 25)
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--status", type=int, required=True, help="HTTP status code")
    parser.add_argument("--body", help="Raw response body JSON/text")
    parser.add_argument("--body-file", help="Path to file containing raw response body")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--credits-endpoint", default="/credits")
    args = parser.parse_args()

    payload = load_body(args.body, args.body_file)
    message = extract_message(payload)
    category = classify(args.status, message)

    guidance: list[str] = []
    should_retry = False
    if category == "insufficient_credits":
        guidance = [
            "Credits are insufficient for this request.",
            "Open https://skills.video/dashboard and go to Billing/Credits to recharge.",
            "After recharge, retry the same request.",
        ]
    elif category == "auth":
        guidance = [
            "Authentication failed.",
            "Verify SKILLS_VIDEO_API_KEY and retry.",
        ]
    elif category == "validation":
        guidance = [
            "Request payload validation failed.",
            "Fix request parameters based on OpenAPI schema and retry.",
        ]
    elif category == "not_found":
        guidance = [
            "Resource or endpoint was not found.",
            "Recheck endpoint path/model id and generation id.",
        ]
    elif category == "transient":
        guidance = [
            "Transient server or rate-limit error.",
            "Retry with bounded exponential backoff.",
        ]
        should_retry = True
    else:
        guidance = [
            "Unhandled runtime error.",
            "Inspect response payload and apply a safe fallback path.",
        ]

    credits_url = f"{args.base_url.rstrip('/')}{args.credits_endpoint}"
    output = {
        "category": category,
        "status": args.status,
        "message": message,
        "should_retry": should_retry,
        "guidance": guidance,
        "credits_check_command": (
            f"curl -X GET \"{credits_url}\" "
            "-H \"Authorization: Bearer $SKILLS_VIDEO_API_KEY\""
        ),
        "recharge": {
            "dashboard_url": "https://skills.video/dashboard",
            "pricing_url": "https://skills.video/pricing",
        },
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return exit_code_for(category)


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/create_and_wait.py
#!/usr/bin/env python3
"""Create generation via SSE, then fallback to polling until terminal status."""

from __future__ import annotations

import argparse
import json
import os
import socket
import subprocess
import sys
from pathlib import Path
from typing import Any
from urllib import error, request

SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}
TERMINAL_EVENT_NAMES = {
    "completed",
    "complete",
    "succeeded",
    "success",
    "failed",
    "failure",
    "canceled",
    "cancelled",
    "done",
    "finished",
    "terminal",
}


def emit(payload: dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False))


def parse_json_or_text(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def load_payload(args: argparse.Namespace) -> dict[str, Any]:
    if args.payload_file:
        raw = Path(args.payload_file).read_text(encoding="utf-8")
        payload = parse_json_or_text(raw)
    elif args.payload is not None:
        payload = parse_json_or_text(args.payload)
    else:
        payload = {}

    if payload is None:
        return {}
    if not isinstance(payload, dict):
        raise SystemExit("Payload must be a JSON object.")
    return payload


def endpoint_url(base_url: str, endpoint: str) -> str:
    if endpoint.startswith("http://") or endpoint.startswith("https://"):
        return endpoint
    if not endpoint.startswith("/"):
        endpoint = "/" + endpoint
    return f"{base_url.rstrip('/')}{endpoint}"
def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            nested = error_obj.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def find_status(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "status",
        "state",
        "queue_state",
        "queueState",
        "prediction_status",
        "predictionStatus",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def find_generation_id(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "generation_id",
        "generationId",
        "prediction_id",
        "predictionId",
        "task_id",
        "taskId",
        "id",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def is_terminal_event_name(event_name: str | None) -> bool:
    if not event_name:
        return False
    return event_name.strip().lower() in TERMINAL_EVENT_NAMES
def run_sse(
    url: str,
    api_key: str,
    payload: dict[str, Any],
    request_timeout: float,
) -> tuple[int, str | None, Any]:
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "text/event-stream",
            "Content-Type": "application/json",
        },
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
    )

    generation_id: str | None = None
    terminal_payload: Any = None
    event_name = "message"
    data_lines: list[str] = []

    def flush_event() -> tuple[bool, int]:
        nonlocal event_name, data_lines, generation_id, terminal_payload
        if not data_lines:
            event_name = "message"
            return False, 0

        text = "\n".join(data_lines)
        parsed = parse_json_or_text(text)
        status_value, status_key = find_status(parsed)
        status_normalized = status_value.upper() if status_value else None
        event_terminal = (status_normalized in TERMINAL_STATUSES) or is_terminal_event_name(event_name)
        current_id, _ = find_generation_id(parsed)
        if current_id and not generation_id:
            generation_id = current_id

        emit(
            {
                "event": "sse_event",
                "sse_event_name": event_name,
                "generation_id": generation_id,
                "status": status_value,
                "status_key": status_key,
                "terminal": event_terminal,
                "payload": parsed,
            }
        )

        data_lines = []
        event_name = "message"
        if event_terminal:
            terminal_payload = parsed
            if status_normalized in SUCCESS_STATUSES:
                return True, 0
            if status_normalized in TERMINAL_STATUSES:
                return True, 10
            return True, 0
        return False, 0

    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            emit({"event": "sse_open", "http_status": resp.getcode(), "url": url})
            for raw_line in resp:
                line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
                if not line:
                    done, rc = flush_event()
                    if done:
                        return rc, generation_id, terminal_payload
                    continue
                if line.startswith(":"):
                    continue
                if line.startswith("event:"):
                    event_name = line.split(":", 1)[1].strip() or "message"
                elif line.startswith("data:"):
                    data_lines.append(line.split(":", 1)[1].lstrip())

            done, rc = flush_event()
            if done:
                return rc, generation_id, terminal_payload

        emit(
            {
                "event": "sse_stream_ended",
                "generation_id": generation_id,
                "message": "SSE stream ended before terminal status.",
            }
        )
        return 100, generation_id, None
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        parsed = parse_json_or_text(body)
        emit(
            {
                "event": "sse_http_error",
                "http_status": exc.code,
                "message": extract_message(parsed),
                "payload": parsed,
            }
        )
        return 101, generation_id, parsed
    except (socket.timeout, TimeoutError) as exc:
        emit({"event": "sse_timeout", "message": str(exc), "generation_id": generation_id})
        return 102, generation_id, None
    except error.URLError as exc:
        emit({"event": "sse_connect_error", "message": str(exc), "generation_id": generation_id})
        return 103, generation_id, None
def run_poll_fallback(
    generation_id: str,
    base_url: str,
    timeout: float,
    interval: float,
    request_timeout: float,
) -> int:
    wait_script = Path(__file__).with_name("wait_generation.py")
    if not wait_script.exists():
        emit(
            {
                "event": "error",
                "error": "wait_script_not_found",
                "message": f"Missing fallback script: {wait_script}",
            }
        )
        return 2

    cmd = [
        sys.executable,
        str(wait_script),
        "--generation-id",
        generation_id,
        "--base-url",
        base_url,
        "--timeout",
        str(timeout),
        "--interval",
        str(interval),
        "--request-timeout",
        str(request_timeout),
    ]
    emit(
        {
            "event": "fallback_polling_start",
            "generation_id": generation_id,
            "command": " ".join(cmd),
        }
    )
    result = subprocess.run(cmd, check=False)
    emit(
        {
            "event": "fallback_polling_end",
            "generation_id": generation_id,
            "exit_code": result.returncode,
        }
    )
    return result.returncode
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--sse-endpoint", required=True, help="SSE create endpoint path or full URL")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--payload", help="JSON object payload string")
    parser.add_argument("--payload-file", help="Path to JSON payload file")
    parser.add_argument("--sse-request-timeout", type=float, default=120.0)
    parser.add_argument("--poll-timeout", type=float, default=900.0)
    parser.add_argument("--poll-interval", type=float, default=3.0)
    parser.add_argument("--poll-request-timeout", type=float, default=20.0)
    args = parser.parse_args()

    api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not api_key:
        emit(
            {
                "event": "error",
                "error": "missing_api_key",
                "message": "Set SKILLS_VIDEO_API_KEY before calling skills.video APIs.",
            }
        )
        return 1

    if args.payload is not None and args.payload_file:
        emit(
            {
                "event": "error",
                "error": "invalid_arguments",
                "message": "Use either --payload or --payload-file, not both.",
            }
        )
        return 1

    payload = load_payload(args)
    url = endpoint_url(args.base_url, args.sse_endpoint)
    emit({"event": "start", "url": url, "mode": "sse_then_poll_fallback"})

    sse_rc, generation_id, terminal_payload = run_sse(
        url=url,
        api_key=api_key,
        payload=payload,
        request_timeout=args.sse_request_timeout,
    )
    if sse_rc in (0, 10):
        emit(
            {
                "event": "terminal",
                "source": "sse",
                "ok": sse_rc == 0,
                "generation_id": generation_id,
                "response": terminal_payload,
            }
        )
        return sse_rc

    if not generation_id:
        emit(
            {
                "event": "error",
                "error": "missing_generation_id",
                "message": "SSE did not return generation id; cannot start poll fallback.",
                "sse_exit_code": sse_rc,
            }
        )
        return 3

    return run_poll_fallback(
        generation_id=generation_id,
        base_url=args.base_url,
        timeout=args.poll_timeout,
        interval=args.poll_interval,
        request_timeout=args.poll_request_timeout,
    )


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/wait_generation.py
#!/usr/bin/env python3
"""Poll a skills.video generation task until it reaches terminal status."""

from __future__ import annotations

import argparse
import json
import os
import time
from typing import Any
from urllib import error, request

TRANSIENT_HTTP_STATUSES = {429, 500, 502, 503, 504}
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}


def parse_json_or_text(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def emit(payload: dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False))


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            nested = error_obj.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def find_status(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "status",
        "state",
        "queue_state",
        "queueState",
        "prediction_status",
        "predictionStatus",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def fetch_generation(
    base_url: str,
    generation_id: str,
    api_key: str,
    request_timeout: float,
) -> tuple[int, Any]:
    url = f"{base_url.rstrip('/')}/generation/{generation_id}"
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method="GET",
    )
    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            return resp.getcode(), parse_json_or_text(raw)
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        return exc.code, parse_json_or_text(body)
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--generation-id", required=True, help="Generation id to poll")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--interval", type=float, default=3.0, help="Poll interval in seconds")
    parser.add_argument("--timeout", type=float, default=600.0, help="Total wait timeout in seconds")
    parser.add_argument(
        "--request-timeout",
        type=float,
        default=20.0,
        help="HTTP request timeout per poll in seconds",
    )
    args = parser.parse_args()

    api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not api_key:
        emit(
            {
                "event": "error",
                "error": "missing_api_key",
                "message": "Set SKILLS_VIDEO_API_KEY before polling generation status.",
            }
        )
        return 1

    if args.interval <= 0:
        emit({"event": "error", "error": "invalid_interval", "message": "--interval must be > 0"})
        return 1
    if args.timeout <= 0:
        emit({"event": "error", "error": "invalid_timeout", "message": "--timeout must be > 0"})
        return 1

    start = time.monotonic()
    attempt = 0
    last_status: str | None = None

    while True:
        elapsed = time.monotonic() - start
        if elapsed > args.timeout:
            emit(
                {
                    "event": "timeout",
                    "generation_id": args.generation_id,
                    "elapsed_seconds": round(elapsed, 2),
                    "last_status": last_status,
                }
            )
            return 11

        attempt += 1
        http_status, payload = fetch_generation(
            base_url=args.base_url,
            generation_id=args.generation_id,
            api_key=api_key,
            request_timeout=args.request_timeout,
        )

        if http_status >= 400:
            message = extract_message(payload)
            transient = http_status in TRANSIENT_HTTP_STATUSES
            emit(
                {
                    "event": "poll_error",
                    "generation_id": args.generation_id,
                    "attempt": attempt,
                    "http_status": http_status,
                    "transient": transient,
                    "message": message,
                }
            )
            if transient:
                time.sleep(args.interval)
                continue
            return 12

        status_value, status_key = find_status(payload)
        normalized = status_value.upper() if status_value else None
        terminal = normalized in TERMINAL_STATUSES if normalized else False
        emit(
            {
                "event": "poll",
                "generation_id": args.generation_id,
                "attempt": attempt,
                "http_status": http_status,
                "status": status_value,
                "status_key": status_key,
                "terminal": terminal,
            }
        )
        if status_value:
            last_status = status_value

        if terminal:
            ok = normalized in SUCCESS_STATUSES
            emit(
                {
                    "event": "terminal",
                    "ok": ok,
                    "generation_id": args.generation_id,
                    "status": status_value,
                    "response": payload,
                }
            )
            return 0 if ok else 10

        time.sleep(args.interval)


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/inspect_openapi.py
#!/usr/bin/env python3
"""Inspect skills.video generation OpenAPI contracts and emit endpoint-ready summaries."""

from __future__ import annotations

import argparse
import json
import re
from pathlib import Path
from typing import Any

DEFAULT_CATEGORY = "videos"
METHOD_AND_PATH_RE = re.compile(r"^(GET|POST|PUT|PATCH|DELETE)\s+(.+)$", re.IGNORECASE)


def read_json(path: Path) -> dict[str, Any]:
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        raise SystemExit(f"File not found: {path}")
    except json.JSONDecodeError as exc:
        raise SystemExit(f"Invalid JSON in {path}: {exc}")


def find_group_nodes(node: Any, group_name: str) -> list[dict[str, Any]]:
    matches: list[dict[str, Any]] = []
    if isinstance(node, dict):
        if node.get("group") == group_name and isinstance(node.get("pages"), list):
            matches.append(node)
        for value in node.values():
            matches.extend(find_group_nodes(value, group_name))
    elif isinstance(node, list):
        for item in node:
            matches.extend(find_group_nodes(item, group_name))
    return matches


def collect_method_paths(node: Any) -> list[str]:
    rows: list[str] = []
    if isinstance(node, str):
        match = METHOD_AND_PATH_RE.match(node.strip())
        if match:
            method = match.group(1).upper()
            path = match.group(2).strip()
            if method == "POST" and path.startswith("/generation/"):
                rows.append(f"{method} {path}")
    elif isinstance(node, dict):
        if "pages" in node:
            rows.extend(collect_method_paths(node["pages"]))
    elif isinstance(node, list):
        for item in node:
            rows.extend(collect_method_paths(item))
    return rows


def unique_in_order(items: list[str]) -> list[str]:
    seen: set[str] = set()
    output: list[str] = []
    for item in items:
        if item not in seen:
            seen.add(item)
            output.append(item)
    return output


def list_endpoints_from_docs(docs: dict[str, Any], category: str) -> list[str]:
    group_name = "Videos" if category == "videos" else "Images"
    rows: list[str] = []
    for group in find_group_nodes(docs, group_name):
        rows.extend(collect_method_paths(group.get("pages", [])))
    return unique_in_order(rows)


def parse_endpoint_arg(endpoint: str) -> tuple[str, str]:
    endpoint = endpoint.strip()
    match = METHOD_AND_PATH_RE.match(endpoint)
    if match:
        return match.group(1).upper(), match.group(2).strip()
    return "POST", endpoint


def try_paths(path: str) -> list[str]:
    candidates = [path]
    if path.startswith("/v1/"):
        candidates.append(path[3:])
    elif path.startswith("/"):
        candidates.append(f"/v1{path}")
    else:
        candidates.append(f"/v1/{path}")
    return unique_in_order(candidates)
def resolve_operation(openapi: dict[str, Any], method: str, path: str) -> tuple[str, dict[str, Any]]:
    paths = openapi.get("paths", {})
    for candidate in try_paths(path):
        operations = paths.get(candidate)
        if isinstance(operations, dict) and method.lower() in operations:
            operation = operations[method.lower()]
            if isinstance(operation, dict):
                return candidate, operation
    raise SystemExit(f"Endpoint not found in OpenAPI: {method} {path}")


def endpoint_exists(openapi: dict[str, Any], method: str, path: str) -> bool:
    operations = openapi.get("paths", {}).get(path)
    return isinstance(operations, dict) and method.lower() in operations


def to_sse_path(path: str) -> str:
    if "/generation/sse/" in path:
        return path
    return path.replace("/generation/", "/generation/sse/", 1)


def to_polling_path(path: str) -> str:
    return path.replace("/generation/sse/", "/generation/", 1)


def find_sse_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
    sse_candidate = to_sse_path(resolved_path)
    if endpoint_exists(openapi, method, sse_candidate):
        return sse_candidate
    if "/generation/sse/" in resolved_path and endpoint_exists(openapi, method, resolved_path):
        return resolved_path
    return None


def find_polling_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
    polling_candidate = to_polling_path(resolved_path)
    if endpoint_exists(openapi, method, polling_candidate):
        return polling_candidate
    if endpoint_exists(openapi, method, resolved_path):
        return resolved_path
    return None


def find_subscribe_sse_endpoint(openapi: dict[str, Any]) -> str | None:
    paths = openapi.get("paths", {})
    if not isinstance(paths, dict):
        return None
    for path, ops in paths.items():
        if not isinstance(ops, dict):
            continue
        if "get" in ops and re.search(r"/predictions/\{[^}]+\}/subscribe$", path):
            return path
    return None


def build_default_and_sse_endpoints(
    openapi: dict[str, Any], endpoints: list[str]
) -> tuple[list[str], list[str]]:
    default_endpoints: list[str] = []
    sse_endpoints: list[str] = []
    for item in endpoints:
        method, path = parse_endpoint_arg(item)
        sse_path = to_sse_path(path)
        if endpoint_exists(openapi, method, sse_path):
            resolved = f"{method} {sse_path}"
            default_endpoints.append(resolved)
            sse_endpoints.append(resolved)
        else:
            default_endpoints.append(f"{method} {path}")
    return (
        unique_in_order(default_endpoints),
        unique_in_order(sse_endpoints),
    )


def ref_name(ref: str) -> str:
    return ref.rsplit("/", 1)[-1]


def resolve_schema(openapi: dict[str, Any], schema: dict[str, Any]) -> tuple[str | None, dict[str, Any]]:
    if "$ref" not in schema:
        return None, schema
    name = ref_name(schema["$ref"])
    resolved = openapi.get("components", {}).get("schemas", {}).get(name)
    if not isinstance(resolved, dict):
        raise SystemExit(f"Schema not found for ref: {schema['$ref']}")
    return name, resolved
def allowed_values(schema: dict[str, Any]) -> list[Any] | None:
if isinstance(schema.get("enum"), list):
return schema["enum"]
for branch_key in ("anyOf", "oneOf"):
branch = schema.get(branch_key)
if not isinstance(branch, list):
continue
consts = [item["const"] for item in branch if isinstance(item, dict) and "const" in item]
if consts:
return consts
return None
def summarize_type(schema: dict[str, Any]) -> str:
    if "$ref" in schema:
        return ref_name(schema["$ref"])
    schema_type = schema.get("type")
    if schema_type == "array":
        item_schema = schema.get("items") if isinstance(schema.get("items"), dict) else {}
        return f"array<{summarize_type(item_schema)}>"
    if isinstance(schema_type, str):
        return schema_type
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        variants: list[str] = []
        for item in branch:
            if not isinstance(item, dict):
                continue
            if "const" in item:
                const_value = item["const"]
                if isinstance(const_value, bool):
                    variants.append("boolean")
                elif isinstance(const_value, int):
                    variants.append("integer")
                elif isinstance(const_value, float):
                    variants.append("number")
                elif isinstance(const_value, str):
                    variants.append("string")
                else:
                    variants.append(type(const_value).__name__)
            elif "$ref" in item:
                variants.append(ref_name(item["$ref"]))
            elif isinstance(item.get("type"), str):
                variants.append(item["type"])
        if variants:
            return " | ".join(unique_in_order(variants))
    return "object"


def template_value(field_name: str, schema: dict[str, Any]) -> Any:
    if "default" in schema:
        return schema["default"]
    values = allowed_values(schema)
    if values:
        return values[0]
    schema_type = schema.get("type")
    if schema_type == "string":
        if schema.get("format") == "uri":
            return f"https://example.com/{field_name}.png"
        if "prompt" in field_name:
            return "Describe what to generate"
        return f"<{field_name}>"
    if schema_type in {"integer", "number"}:
        return 1
    if schema_type == "boolean":
        return False
    if schema_type == "array":
        item = schema.get("items") if isinstance(schema.get("items"), dict) else {}
        if item.get("format") == "uri":
            return ["https://example.com/input.png"]
        return []
    if schema_type == "object":
        return {}
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        for item in branch:
            if isinstance(item, dict) and "const" in item:
                return item["const"]
    return f"<{field_name}>"
def summarize_fields(schema: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    properties = schema.get("properties") if isinstance(schema.get("properties"), dict) else {}
    required = schema.get("required") if isinstance(schema.get("required"), list) else []
    required_set = set(required)
    fields: list[dict[str, Any]] = []
    template: dict[str, Any] = {}
    for name, prop in properties.items():
        if not isinstance(prop, dict):
            continue
        row: dict[str, Any] = {
            "name": name,
            "required": name in required_set,
            "type": summarize_type(prop),
        }
        if isinstance(prop.get("description"), str):
            row["description"] = prop["description"]
        if "default" in prop:
            row["default"] = prop["default"]
        values = allowed_values(prop)
        if values:
            row["allowed_values"] = values
        fields.append(row)
    for name in required:
        prop = properties.get(name)
        if isinstance(prop, dict):
            template[name] = template_value(name, prop)
    for name, prop in properties.items():
        if name in template:
            continue
        if isinstance(prop, dict) and "default" in prop:
            template[name] = prop["default"]
    return fields, template


def find_poll_endpoint(openapi: dict[str, Any]) -> str | None:
    paths = openapi.get("paths", {})
    if not isinstance(paths, dict):
        return None
    preferred = ["/generation/{id}", "/v1/generation/{request_id}", "/v1/generation/{id}"]
    for path in preferred:
        ops = paths.get(path)
        if isinstance(ops, dict) and "get" in ops:
            return path
    for path, ops in paths.items():
        if not isinstance(ops, dict):
            continue
        if "get" in ops and re.search(r"/generation/\{[^}]+\}", path):
            return path
    return None


def status_values(openapi: dict[str, Any], schema_name: str) -> list[str] | None:
    schema = openapi.get("components", {}).get("schemas", {}).get(schema_name)
    if isinstance(schema, dict) and isinstance(schema.get("enum"), list):
        if all(isinstance(item, str) for item in schema["enum"]):
            return schema["enum"]
    return None
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--openapi", required=True, help="Path to OpenAPI JSON")
    parser.add_argument("--docs", help="Path to docs.json for Videos/Images group extraction")
    parser.add_argument("--category", choices=["videos", "images"], default=DEFAULT_CATEGORY)
    parser.add_argument("--list-endpoints", action="store_true", help="List POST generation endpoints from docs.json group")
    parser.add_argument("--endpoint", help="Endpoint path or full string like 'POST /generation/...'")
    parser.add_argument("--include-template", action="store_true", help="Include request_template in output")
    parser.add_argument("--json-indent", type=int, default=2)
    args = parser.parse_args()
    openapi_path = Path(args.openapi)
    openapi = read_json(openapi_path)
    if args.list_endpoints:
        if not args.docs:
            raise SystemExit("--list-endpoints requires --docs <path/to/docs.json>")
        docs = read_json(Path(args.docs))
        polling_endpoints = list_endpoints_from_docs(docs, args.category)
        default_endpoints, sse_endpoints = build_default_and_sse_endpoints(
            openapi, polling_endpoints
        )
        print(
            json.dumps(
                {
                    "category": args.category,
                    "source": str(Path(args.docs).resolve()),
                    "default_result_mode": "sse" if sse_endpoints else "polling",
                    "default_endpoints": default_endpoints,
                    "sse_endpoints": sse_endpoints,
                    "polling_endpoints": polling_endpoints,
                    "poll_endpoint": find_poll_endpoint(openapi),
                },
                ensure_ascii=False,
                indent=args.json_indent,
            )
        )
        if not args.endpoint:
            return 0
    if not args.endpoint:
        raise SystemExit("Provide --endpoint or use --list-endpoints only")
    method, raw_path = parse_endpoint_arg(args.endpoint)
    resolved_path, operation = resolve_operation(openapi, method, raw_path)
    sse_create_endpoint = find_sse_create_endpoint(openapi, method, resolved_path)
    polling_create_endpoint = find_polling_create_endpoint(openapi, method, resolved_path)
    request_schema = (
        operation.get("requestBody", {})
        .get("content", {})
        .get("application/json", {})
        .get("schema", {})
    )
    if not isinstance(request_schema, dict):
        request_schema = {}
    request_schema_name, resolved_request_schema = resolve_schema(openapi, request_schema)
    fields, request_template = summarize_fields(resolved_request_schema)
    responses = operation.get("responses", {})
    if not isinstance(responses, dict):
        responses = {}
    error_codes = sorted(
        int(code)
        for code in responses.keys()
        if isinstance(code, str) and code.isdigit() and int(code) >= 400
    )
    servers = openapi.get("servers") if isinstance(openapi.get("servers"), list) else []
    server_url = None
    if servers and isinstance(servers[0], dict):
        server_url = servers[0].get("url")
    output: dict[str, Any] = {
        "endpoint": f"{method} {resolved_path}",
        "default_result_mode": "sse" if sse_create_endpoint else "polling",
        "default_create_endpoint": f"{method} {sse_create_endpoint or polling_create_endpoint or resolved_path}",
        "sse_create_endpoint": f"{method} {sse_create_endpoint}" if sse_create_endpoint else None,
        "polling_create_endpoint": f"{method} {polling_create_endpoint}" if polling_create_endpoint else None,
        "summary": operation.get("summary"),
        "server_url": server_url,
        "request_schema": request_schema_name,
        "fields": fields,
        "prediction_status_values": status_values(openapi, "PredictionStatus"),
        "queue_status_values": status_values(openapi, "QueueState"),
        "sse_subscribe_endpoint": find_subscribe_sse_endpoint(openapi),
        "poll_endpoint": find_poll_endpoint(openapi),
        "error_status_codes": error_codes,
    }
    if args.include_template:
        output["request_template"] = request_template
    print(json.dumps(output, ensure_ascii=False, indent=args.json_indent))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
FILE:agents/openai.yaml
interface:
  display_name: "ai-video"
  short_description: "Access the world's leading models in one place to produce videos"
  default_prompt: "Use $ai-video to create videos of {subject}."
FILE:references/video-model-endpoints.md
# Video Model Endpoints (Snapshot)
Snapshot source: `docs.json` group `APIs -> Videos`
Snapshot date: 2026-03-20
Default mode: SSE-first.
- Primary call path: replace `/generation/` with `/generation/sse/` for each endpoint below.
- Fallback call path: use the listed endpoint below with polling result fetch.
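The path substitution above can be sketched as a small helper. This is a minimal illustration, not the project's own implementation: `to_sse_variant` is a hypothetical name, and it assumes the first `/generation/` segment is the one to rewrite.

```python
def to_sse_variant(path: str) -> str:
    """Return the SSE form of a polling create path (hypothetical helper)."""
    marker = "/generation/"
    sse_marker = "/generation/sse/"
    if sse_marker in path or marker not in path:
        # Already an SSE path, or not a generation path: leave it unchanged.
        return path
    # Replace only the first occurrence so later path segments stay intact.
    return path.replace(marker, sse_marker, 1)


print(to_sse_variant("/generation/kling-ai/kling-v2.6"))
```

The helper is idempotent, so applying it to a path that is already in SSE form is harmless.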
## Endpoints
- `POST /generation/openai/sora-2-pro`
- `POST /generation/openai/sora-2`
- `POST /generation/google/veo-2`
- `POST /generation/google/veo-3.1`
- `POST /generation/google/veo-3.1-fast`
- `POST /generation/bytedance/seedance-1-pro`
- `POST /generation/bytedance/seedance-1-pro-fast`
- `POST /generation/bytedance/seedance-1.5`
- `POST /generation/xai/grok-imagine-video`
- `POST /generation/kling-ai/v3/video`
- `POST /generation/kling-ai/o3/video`
- `POST /generation/kling-ai/o3/edit`
- `POST /generation/kling-ai/o1/video`
- `POST /generation/kling-ai/o1/edit`
- `POST /generation/kling-ai/kling-v2.6`
- `POST /generation/kling-ai/motion-control-v2.6`
- `POST /generation/kling-ai/kling-v2.5-turbo`
- `POST /generation/runwayml/gen-4.5`
- `POST /generation/runwayml/gen4-turbo`
- `POST /generation/runwayml/gen4-aleph`
- `POST /generation/wan/wan-v2.6`
- `POST /generation/luma/ray-2`
- `POST /generation/luma/ray-2-flash`
- `POST /generation/vidu/q2/video-turbo`
- `POST /generation/vidu/q3`
- `POST /generation/minimax/hailuo-2.3`
- `POST /generation/minimax/hailuo-2.3-fast`
- `POST /generation/lightricks/ltx-2.3`
- `POST /generation/lightricks/ltx-2.3/audio-to-video`
- `POST /generation/lightricks/ltx-2.3/extend-video`
- `POST /generation/lightricks/ltx-2.3/retake-video`
- `POST /generation/pixverse/v5.6`
- `POST /generation/bytedance/omnihuman-v1.5`
- `POST /generation/sync/lipsync-2`
- `POST /generation/veed/lipsync`
- `POST /generation/pixverse/lipsync`
- `POST /generation/topaz/upscale/video`
- `POST /generation/runwayml/upscale-v1`
## Refresh command
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
FILE:references/open-platform-api.md
# skills.video Open Platform API Contract
## Scope
Use this reference for generation flows that create videos or images through `open.skills.video`.
## Base URL and endpoint shape
- Platform base URL: `https://open.skills.video/api/v1`
- Default generation endpoint pattern: `POST /generation/sse/{provider}/{model}`
- Polling create endpoint pattern (fallback): `POST /generation/{provider}/{model}`
- Task query endpoint pattern (fallback): `GET /generation/{id}`
Some model-scoped OpenAPI files use:
- `servers[0].url = /api`
- paths prefixed with `/v1/...`
Treat these as equivalent path representations under the same host.
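To see why the two representations are equivalent, the sketch below joins a host, an OpenAPI `servers[0].url`, and an operation path. `full_url` is a hypothetical helper, and it assumes a relative server URL is resolved against `https://open.skills.video`.

```python
def full_url(host: str, server_url: str, path: str) -> str:
    """Join host, OpenAPI server URL, and operation path (illustrative only)."""
    if server_url.startswith(("http://", "https://")):
        # Absolute server URL: the host argument is not needed.
        base = server_url
    else:
        # Relative server URL such as "/api": resolve it against the host.
        base = host.rstrip("/") + "/" + server_url.strip("/")
    return base.rstrip("/") + "/" + path.lstrip("/")


HOST = "https://open.skills.video"
platform = full_url(HOST, "https://open.skills.video/api/v1", "/generation/{id}")
model_scoped = full_url(HOST, "/api", "/v1/generation/{id}")
print(platform == model_scoped)
```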
## Authentication
Use bearer auth on every API call:
```http
Authorization: Bearer <API_KEY>
```
Security scheme in OpenAPI: `bearerAuth` (`type: http`, `scheme: bearer`).
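A minimal sketch of attaching the bearer header with the standard library. `build_request` is a hypothetical helper; the `/credits` URL is taken from this reference, and no network call is made here.

```python
import os
from urllib import request


def build_request(url: str, api_key: str) -> request.Request:
    """Build a GET request carrying the bearer token (illustrative helper)."""
    return request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method="GET",
    )


# Falls back to a placeholder when the environment variable is not set.
api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "<API_KEY>")
req = build_request("https://open.skills.video/api/v1/credits", api_key)
print(req.get_method(), req.full_url)
```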
## First-time API key setup
If `SKILLS_VIDEO_API_KEY` is not configured:
1. Open `https://skills.video/dashboard/developer` and sign in.
2. Click `Create API Key`.
3. Export the key in your shell:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## Async generation lifecycle
1. Submit generation with SSE endpoint `POST /generation/sse/{provider}/{model}`.
2. Read stream events (`text/event-stream`) until terminal event.
3. Fall back to polling only if SSE cannot connect, disconnects early, or does not reach terminal state.
4. Poll fallback result with `GET /generation/{id}` (or model-scoped equivalent path).
5. Stop polling at terminal status.
Important:
- `IN_QUEUE` / `IN_PROGRESS` are non-terminal. Do not return them as the final result.
- Return to caller only after terminal status (`COMPLETED`, `SUCCEEDED`, `FAILED`, `CANCELED`).
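The terminal check above can be sketched as follows. `is_terminal` is a hypothetical helper; it uppercases the value so that lowercase `PredictionStatus` values such as `succeeded` are treated the same way as `QueueState` values.

```python
from __future__ import annotations

# Terminal statuses named in this reference.
TERMINAL = {"COMPLETED", "SUCCEEDED", "FAILED", "CANCELED"}


def is_terminal(status: str | None) -> bool:
    """Return True only for statuses that may be returned to the caller."""
    if not status:
        return False
    return status.strip().upper() in TERMINAL


for value in ("IN_QUEUE", "IN_PROGRESS", "succeeded", "FAILED"):
    print(value, is_terminal(value))
```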
## Status values from OpenAPI
### PredictionStatus
- `starting`
- `processing`
- `succeeded`
- `failed`
- `canceled`
### QueueState
- `IN_QUEUE`
- `IN_PROGRESS`
- `COMPLETED`
- `FAILED`
- `CANCELED`
## Error response shape
`ErrorResponse` defines:
- `message` (`string`)
Common documented HTTP errors on generation endpoints:
- `400`
- `401`
- `404`
- `422`
Runtime note:
- Credit exhaustion can appear as `402` and/or error text indicating insufficient credits.
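A rough sketch of reading `ErrorResponse.message` and spotting credit exhaustion. Both function names are hypothetical, and the text match on "insufficient" plus "credit" is an assumption about the wording of the error, not a documented contract.

```python
import json


def error_message(body: str) -> str:
    """Return ErrorResponse.message when present, else the raw body text."""
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return body.strip()
    if isinstance(parsed, dict) and isinstance(parsed.get("message"), str):
        return parsed["message"]
    return body.strip()


def looks_like_insufficient_credits(status: int, body: str) -> bool:
    # Assumption: the error text mentions both "insufficient" and "credit".
    text = error_message(body).lower()
    return status == 402 or ("insufficient" in text and "credit" in text)


print(looks_like_insufficient_credits(400, '{"message": "Insufficient credits"}'))
```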
## Insufficient credits handling
When a runtime error indicates insufficient credits:
1. Stop automatic retries for this request.
2. Tell the user to recharge credits in `https://skills.video/dashboard` (Billing/Credits).
3. Retry only after recharge.
Use `GET /credits` to check remaining balance:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
## Retry baseline
- Retry only transient failures (`429`, `5xx`, transport timeouts, connection resets).
- Do not retry unchanged payloads on validation/auth errors (`400`, `401`, `404`, `422`).
- Use bounded exponential backoff when retrying.
- Preserve SSE-first behavior on retries; use polling only as fallback.
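The retry baseline can be expressed as one predicate. This is a sketch under the rules listed above; `should_retry` is a hypothetical name, and `None` stands for a transport-level failure that produced no HTTP status.

```python
from __future__ import annotations


def should_retry(status: int | None) -> bool:
    """Return True for transient failures only."""
    if status is None:
        # Transport timeout or connection reset: no HTTP status was received.
        return True
    # 429 and 5xx are transient; every other status is returned to the caller.
    return status == 429 or 500 <= status <= 599


for code in (None, 429, 503, 400, 401, 404, 422):
    print(code, should_retry(code))
```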
## Unknowns
OpenAPI in this workspace does not declare concrete rate-limit windows or global timeout budgets.
Mark both as unknown unless an endpoint-specific OpenAPI explicitly documents them.
---
name: "ai-video-skills"
description: Build and execute skills.video video generation REST requests from OpenAPI specs. Use when user needs to create, debug, or document video generation calls on open.skills.video.
---
# ai-video-skills
## Overview
Use this skill to turn OpenAPI definitions into working video-generation API calls for `skills.video`.
Prefer deterministic extraction from `openapi.json` instead of guessing fields.
## Workflow
1. Check API key and bootstrap environment on first use.
2. Identify the active spec.
3. Select the SSE/polling endpoint pair for a video model.
4. Extract request schema and generate a payload template.
5. Execute `POST /generation/sse/...` as default and keep the stream open.
6. If SSE does not reach terminal completion, poll `GET /generation/{id}` to terminal status.
7. Return only terminal result (`COMPLETED`/`SUCCEEDED`/`FAILED`/`CANCELED`), never `IN_PROGRESS`.
8. Apply retry and failure handling.
## 0) Check API key (first run)
Run this check before any API call.
```bash
python scripts/ensure_api_key.py
```
If `ok` is `false`, tell the user to:
- Open `https://skills.video/dashboard/developer` and log in
- Click `Create API Key`
- Export the key as `SKILLS_VIDEO_API_KEY`
Example:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## 1) Identify the spec
Load the most specific OpenAPI first.
- Prefer model-specific OpenAPI when available (for example `/v1/openapi.json` under a model namespace).
- Fall back to platform-level `openapi.json`.
- Use `references/open-platform-api.md` for base URL, auth, and async lifecycle.
## 2) Select a video endpoint
If `docs.json` exists, derive video endpoints from the `Videos` navigation group.
Use `default_endpoints` from the script output as the primary list (SSE first).
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
When `docs.json` is unavailable, pass a known endpoint directly (for example `/generation/sse/kling-ai/kling-v2.6`).
Use `references/video-model-endpoints.md` as a snapshot list.
## 3) Extract schema and build payload
Inspect endpoint details and generate a request template from required/default fields.
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--endpoint /generation/sse/kling-ai/kling-v2.6 \
--include-template
```
Use the returned `request_template` as the starting point.
Do not add fields not defined by the endpoint schema.
Use `default_create_endpoint` from output unless an explicit override is required.
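Before sending, a payload can be checked against the `fields` list from the script output. This is a minimal sketch: `check_payload` is a hypothetical helper, and the sample field names (`prompt`, `duration`, `style`) are illustrative rather than taken from a real endpoint schema.

```python
def check_payload(payload: dict, fields: list[dict]) -> tuple[list[str], list[str]]:
    """Return (unknown_fields, missing_required_fields) for a payload."""
    known = {row["name"] for row in fields}
    required = {row["name"] for row in fields if row.get("required")}
    unknown = sorted(set(payload) - known)
    missing = sorted(required - set(payload))
    return unknown, missing


# Illustrative rows in the shape emitted under "fields".
sample_fields = [
    {"name": "prompt", "required": True, "type": "string"},
    {"name": "duration", "required": False, "type": "integer"},
]
print(check_payload({"prompt": "neon city rain", "style": "noir"}, sample_fields))
```

An empty pair of lists means the payload uses only schema-defined fields and covers every required one.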
## 4) Execute SSE request (default) with automatic fallback
Prefer the helper script. It creates the generation via SSE and keeps streaming; if the stream ends before a terminal result, it automatically switches to the polling fallback.
```bash
python scripts/create_and_wait.py \
--sse-endpoint /generation/sse/kling-ai/kling-v2.6 \
--payload '{"prompt":"A cinematic dolly shot of neon city rain at night"}' \
--poll-timeout 900 \
--poll-interval 3
```
Treat SSE as the default result channel.
Do not finish the task on `IN_QUEUE` or `IN_PROGRESS`.
Return only after terminal result.
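For reference, the sketch below shows how `text/event-stream` lines group into events. It is a simplified parser, not the helper script itself: `parse_sse` is a hypothetical name, the sample event name and payloads are made up, and fields such as `id:` and `retry:` are ignored.

```python
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_name, data) pairs from raw SSE lines."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line closes the current event.
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith(":"):
            continue  # comment or keep-alive line
        elif line.startswith("event:"):
            event = line[len("event:"):].strip() or "message"
        elif line.startswith("data:"):
            data.append(line[len("data:"):].lstrip())
    if data:
        # Flush a final event that was not followed by a blank line.
        yield event, "\n".join(data)


sample = [
    "event: progress\n",
    'data: {"status": "IN_PROGRESS"}\n',
    "\n",
    ": keep-alive\n",
    'data: {"status": "COMPLETED"}\n',
    "\n",
]
events = list(parse_sse(sample))
print(events)
```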
## 5) Fall back to polling
Use polling only if SSE cannot be established, disconnects early, or does not reach a terminal state.
Use `GET /generation/{id}` (or model-spec equivalent path if the OpenAPI uses `/v1/...`).
```bash
curl -X GET "https://open.skills.video/api/v1/generation/<GENERATION_ID>" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Stop polling on terminal states:
- `COMPLETED`
- `FAILED`
- `CANCELED`
Recommended helper:
```bash
python scripts/wait_generation.py \
--generation-id <GENERATION_ID> \
--timeout 900 \
--interval 3
```
Return to the user only after the helper emits `event=terminal`.
## 6) Handle errors and retries
Handle these response codes for create, SSE, and fallback poll operations:
- `400`: request format issue
- `401`: missing/invalid API key
- `402`: possible payment/credits issue at runtime
- `404`: endpoint or generation id not found
- `422`: schema validation failed
Classify non-2xx runtime errors with:
```bash
python scripts/handle_runtime_error.py \
--status <HTTP_STATUS> \
--body '<RAW_ERROR_BODY_JSON_OR_TEXT>'
```
If category is `insufficient_credits`, tell the user to recharge:
- Open `https://skills.video/dashboard` and go to Billing/Credits
- Recharge or purchase additional credits
- Retry after recharge
Optional balance check:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Apply retries only for transient conditions (network failure, `429`, or temporary `5xx`), matching the retry baseline in `references/open-platform-api.md`.
Use bounded exponential backoff (for example `1s`, `2s`, `4s`, max `16s`, then fail).
Do not retry unchanged payloads after `4xx` validation errors.
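The backoff schedule can be sketched as a generator. `backoff_delays` is a hypothetical helper; the `1s` base and `16s` cap mirror the example values above, and jitter is left out for brevity.

```python
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 16.0) -> Iterator[float]:
    """Yield doubling delays in seconds, stopping once the cap is exceeded."""
    delay = base
    while delay <= cap:
        yield delay
        delay *= 2


# When the generator is exhausted, the caller should stop retrying and fail.
print(list(backoff_delays()))
```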
## Rate limits and timeouts
Treat rate limits and server-side timeout windows as unknown unless documented in the active OpenAPI or product docs.
If unknown, explicitly note this in output and choose conservative client defaults.
## Resources
- `scripts/ensure_api_key.py`: validate `SKILLS_VIDEO_API_KEY` and show first-run setup guidance
- `scripts/handle_runtime_error.py`: classify runtime errors and provide recharge guidance for insufficient credits
- `scripts/inspect_openapi.py`: extract SSE/polling endpoint pair, contract, and payload template
- `scripts/create_and_wait.py`: create via SSE and auto-fallback to polling when stream does not reach terminal status
- `scripts/wait_generation.py`: poll generation status until terminal completion and return final response
- `references/open-platform-api.md`: SSE-first lifecycle, fallback polling, retry baseline
- `references/video-model-endpoints.md`: current video endpoint snapshot from `docs.json`
FILE:agents/openai.yaml
interface:
  display_name: "ai-video-skills"
  short_description: "Access the world's leading models in one place to produce videos"
  default_prompt: "Use $ai-video-skills to create videos of {subject}."
FILE:references/open-platform-api.md
# skills.video Open Platform API Contract
## Scope
Use this reference for generation flows that create videos or images through `open.skills.video`.
## Base URL and endpoint shape
- Platform base URL: `https://open.skills.video/api/v1`
- Default generation endpoint pattern: `POST /generation/sse/{provider}/{model}`
- Polling create endpoint pattern (fallback): `POST /generation/{provider}/{model}`
- Task query endpoint pattern (fallback): `GET /generation/{id}`
Some model-scoped OpenAPI files use:
- `servers[0].url = /api`
- paths prefixed with `/v1/...`
Treat these as equivalent path representations under the same host.
## Authentication
Use bearer auth on every API call:
```http
Authorization: Bearer <API_KEY>
```
Security scheme in OpenAPI: `bearerAuth` (`type: http`, `scheme: bearer`).
## First-time API key setup
If `SKILLS_VIDEO_API_KEY` is not configured:
1. Open `https://skills.video/dashboard/developer` and sign in.
2. Click `Create API Key`.
3. Export the key in your shell:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## Async generation lifecycle
1. Submit generation with SSE endpoint `POST /generation/sse/{provider}/{model}`.
2. Read stream events (`text/event-stream`) until terminal event.
3. Fall back to polling only if SSE cannot connect, disconnects early, or does not reach terminal state.
4. Poll fallback result with `GET /generation/{id}` (or model-scoped equivalent path).
5. Stop polling at terminal status.
Important:
- `IN_QUEUE` / `IN_PROGRESS` are non-terminal. Do not return them as the final result.
- Return to caller only after terminal status (`COMPLETED`, `SUCCEEDED`, `FAILED`, `CANCELED`).
## Status values from OpenAPI
### PredictionStatus
- `starting`
- `processing`
- `succeeded`
- `failed`
- `canceled`
### QueueState
- `IN_QUEUE`
- `IN_PROGRESS`
- `COMPLETED`
- `FAILED`
- `CANCELED`
## Error response shape
`ErrorResponse` defines:
- `message` (`string`)
Common documented HTTP errors on generation endpoints:
- `400`
- `401`
- `404`
- `422`
Runtime note:
- Credit exhaustion can appear as `402` and/or error text indicating insufficient credits.
## Insufficient credits handling
When a runtime error indicates insufficient credits:
1. Stop automatic retries for this request.
2. Tell the user to recharge credits in `https://skills.video/dashboard` (Billing/Credits).
3. Retry only after recharge.
Use `GET /credits` to check remaining balance:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
## Retry baseline
- Retry only transient failures (`429`, `5xx`, transport timeouts, connection resets).
- Do not retry unchanged payloads on validation/auth errors (`400`, `401`, `404`, `422`).
- Use bounded exponential backoff when retrying.
- Preserve SSE-first behavior on retries; use polling only as fallback.
## Unknowns
OpenAPI in this workspace does not declare concrete rate-limit windows or global timeout budgets.
Mark both as unknown unless an endpoint-specific OpenAPI explicitly documents them.
FILE:references/video-model-endpoints.md
# Video Model Endpoints (Snapshot)
Snapshot source: `docs.json` group `APIs -> Videos`
Snapshot date: 2026-03-20
Default mode: SSE-first.
- Primary call path: replace `/generation/` with `/generation/sse/` for each endpoint below.
- Fallback call path: use the listed endpoint below with polling result fetch.
## Endpoints
- `POST /generation/openai/sora-2-pro`
- `POST /generation/openai/sora-2`
- `POST /generation/google/veo-2`
- `POST /generation/google/veo-3.1`
- `POST /generation/google/veo-3.1-fast`
- `POST /generation/bytedance/seedance-1-pro`
- `POST /generation/bytedance/seedance-1-pro-fast`
- `POST /generation/bytedance/seedance-1.5`
- `POST /generation/xai/grok-imagine-video`
- `POST /generation/kling-ai/v3/video`
- `POST /generation/kling-ai/o3/video`
- `POST /generation/kling-ai/o3/edit`
- `POST /generation/kling-ai/o1/video`
- `POST /generation/kling-ai/o1/edit`
- `POST /generation/kling-ai/kling-v2.6`
- `POST /generation/kling-ai/motion-control-v2.6`
- `POST /generation/kling-ai/kling-v2.5-turbo`
- `POST /generation/runwayml/gen-4.5`
- `POST /generation/runwayml/gen4-turbo`
- `POST /generation/runwayml/gen4-aleph`
- `POST /generation/wan/wan-v2.6`
- `POST /generation/luma/ray-2`
- `POST /generation/luma/ray-2-flash`
- `POST /generation/vidu/q2/video-turbo`
- `POST /generation/vidu/q3`
- `POST /generation/minimax/hailuo-2.3`
- `POST /generation/minimax/hailuo-2.3-fast`
- `POST /generation/lightricks/ltx-2.3`
- `POST /generation/lightricks/ltx-2.3/audio-to-video`
- `POST /generation/lightricks/ltx-2.3/extend-video`
- `POST /generation/lightricks/ltx-2.3/retake-video`
- `POST /generation/pixverse/v5.6`
- `POST /generation/bytedance/omnihuman-v1.5`
- `POST /generation/sync/lipsync-2`
- `POST /generation/veed/lipsync`
- `POST /generation/pixverse/lipsync`
- `POST /generation/topaz/upscale/video`
- `POST /generation/runwayml/upscale-v1`
## Refresh command
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
FILE:scripts/create_and_wait.py
#!/usr/bin/env python3
"""Create generation via SSE, then fallback to polling until terminal status."""
from __future__ import annotations

import argparse
import json
import os
import socket
import subprocess
import sys
from pathlib import Path
from typing import Any
from urllib import error, request

SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}
TERMINAL_EVENT_NAMES = {
    "completed",
    "complete",
    "succeeded",
    "success",
    "failed",
    "failure",
    "canceled",
    "cancelled",
    "done",
    "finished",
    "terminal",
}


def emit(payload: dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False))


def parse_json_or_text(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def load_payload(args: argparse.Namespace) -> dict[str, Any]:
    if args.payload_file:
        raw = Path(args.payload_file).read_text(encoding="utf-8")
        payload = parse_json_or_text(raw)
    elif args.payload is not None:
        payload = parse_json_or_text(args.payload)
    else:
        payload = {}
    if payload is None:
        return {}
    if not isinstance(payload, dict):
        raise SystemExit("Payload must be a JSON object.")
    return payload


def endpoint_url(base_url: str, endpoint: str) -> str:
    if endpoint.startswith("http://") or endpoint.startswith("https://"):
        return endpoint
    if not endpoint.startswith("/"):
        endpoint = "/" + endpoint
    return f"{base_url.rstrip('/')}{endpoint}"


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            nested = error_obj.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)
def find_status(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "status",
        "state",
        "queue_state",
        "queueState",
        "prediction_status",
        "predictionStatus",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def find_generation_id(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "generation_id",
        "generationId",
        "prediction_id",
        "predictionId",
        "task_id",
        "taskId",
        "id",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def is_terminal_event_name(event_name: str | None) -> bool:
    if not event_name:
        return False
    return event_name.strip().lower() in TERMINAL_EVENT_NAMES
def run_sse(
    url: str,
    api_key: str,
    payload: dict[str, Any],
    request_timeout: float,
) -> tuple[int, str | None, Any]:
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "text/event-stream",
            "Content-Type": "application/json",
        },
        data=json.dumps(payload).encode("utf-8"),
        method="POST",
    )
    generation_id: str | None = None
    terminal_payload: Any = None
    event_name = "message"
    data_lines: list[str] = []
    # Terminal event names that signal failure even when the payload carries no status.
    failure_event_names = {"failed", "failure", "canceled", "cancelled"}

    def flush_event() -> tuple[bool, int]:
        """Emit the buffered event; return (is_terminal, exit_code)."""
        nonlocal event_name, data_lines, generation_id, terminal_payload
        if not data_lines:
            event_name = "message"
            return False, 0
        text = "\n".join(data_lines)
        parsed = parse_json_or_text(text)
        status_value, status_key = find_status(parsed)
        status_normalized = status_value.upper() if status_value else None
        event_terminal = (status_normalized in TERMINAL_STATUSES) or is_terminal_event_name(event_name)
        # Capture this before event_name is reset below.
        event_failed = event_name.strip().lower() in failure_event_names
        current_id, _ = find_generation_id(parsed)
        if current_id and not generation_id:
            generation_id = current_id
        emit(
            {
                "event": "sse_event",
                "sse_event_name": event_name,
                "generation_id": generation_id,
                "status": status_value,
                "status_key": status_key,
                "terminal": event_terminal,
                "payload": parsed,
            }
        )
        data_lines = []
        event_name = "message"
        if event_terminal:
            terminal_payload = parsed
            if status_normalized in SUCCESS_STATUSES:
                return True, 0
            # A failed/canceled event name without a status field is still a failure.
            if status_normalized in TERMINAL_STATUSES or event_failed:
                return True, 10
            return True, 0
        return False, 0

    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            emit({"event": "sse_open", "http_status": resp.getcode(), "url": url})
            for raw_line in resp:
                line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
                if not line:
                    # A blank line terminates the current SSE event.
                    done, rc = flush_event()
                    if done:
                        return rc, generation_id, terminal_payload
                    continue
                if line.startswith(":"):
                    continue
                if line.startswith("event:"):
                    event_name = line.split(":", 1)[1].strip() or "message"
                elif line.startswith("data:"):
                    data_lines.append(line.split(":", 1)[1].lstrip())
            # Flush any event left unterminated when the stream closed.
            done, rc = flush_event()
            if done:
                return rc, generation_id, terminal_payload
            emit(
                {
                    "event": "sse_stream_ended",
                    "generation_id": generation_id,
                    "message": "SSE stream ended before terminal status.",
                }
            )
            return 100, generation_id, None
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        parsed = parse_json_or_text(body)
        emit(
            {
                "event": "sse_http_error",
                "http_status": exc.code,
                "message": extract_message(parsed),
                "payload": parsed,
            }
        )
        return 101, generation_id, parsed
    except (socket.timeout, TimeoutError) as exc:
        emit({"event": "sse_timeout", "message": str(exc), "generation_id": generation_id})
        return 102, generation_id, None
    except error.URLError as exc:
        emit({"event": "sse_connect_error", "message": str(exc), "generation_id": generation_id})
        return 103, generation_id, None
def run_poll_fallback(
    generation_id: str,
    base_url: str,
    timeout: float,
    interval: float,
    request_timeout: float,
) -> int:
    wait_script = Path(__file__).with_name("wait_generation.py")
    if not wait_script.exists():
        emit(
            {
                "event": "error",
                "error": "wait_script_not_found",
                "message": f"Missing fallback script: {wait_script}",
            }
        )
        return 2
    cmd = [
        sys.executable,
        str(wait_script),
        "--generation-id",
        generation_id,
        "--base-url",
        base_url,
        "--timeout",
        str(timeout),
        "--interval",
        str(interval),
        "--request-timeout",
        str(request_timeout),
    ]
    emit(
        {
            "event": "fallback_polling_start",
            "generation_id": generation_id,
            "command": " ".join(cmd),
        }
    )
    result = subprocess.run(cmd, check=False)
    emit(
        {
            "event": "fallback_polling_end",
            "generation_id": generation_id,
            "exit_code": result.returncode,
        }
    )
    return result.returncode
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--sse-endpoint", required=True, help="SSE create endpoint path or full URL")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--payload", help="JSON object payload string")
    parser.add_argument("--payload-file", help="Path to JSON payload file")
    parser.add_argument("--sse-request-timeout", type=float, default=120.0)
    parser.add_argument("--poll-timeout", type=float, default=900.0)
    parser.add_argument("--poll-interval", type=float, default=3.0)
    parser.add_argument("--poll-request-timeout", type=float, default=20.0)
    args = parser.parse_args()
    api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not api_key:
        emit(
            {
                "event": "error",
                "error": "missing_api_key",
                "message": "Set SKILLS_VIDEO_API_KEY before calling skills.video APIs.",
            }
        )
        return 1
    if args.payload is not None and args.payload_file:
        emit(
            {
                "event": "error",
                "error": "invalid_arguments",
                "message": "Use either --payload or --payload-file, not both.",
            }
        )
        return 1
    payload = load_payload(args)
    url = endpoint_url(args.base_url, args.sse_endpoint)
    emit({"event": "start", "url": url, "mode": "sse_then_poll_fallback"})
    sse_rc, generation_id, terminal_payload = run_sse(
        url=url,
        api_key=api_key,
        payload=payload,
        request_timeout=args.sse_request_timeout,
    )
    if sse_rc in (0, 10):
        emit(
            {
                "event": "terminal",
                "source": "sse",
                "ok": sse_rc == 0,
                "generation_id": generation_id,
                "response": terminal_payload,
            }
        )
        return sse_rc
    if not generation_id:
        emit(
            {
                "event": "error",
                "error": "missing_generation_id",
                "message": "SSE did not return generation id; cannot start poll fallback.",
                "sse_exit_code": sse_rc,
            }
        )
        return 3
    return run_poll_fallback(
        generation_id=generation_id,
        base_url=args.base_url,
        timeout=args.poll_timeout,
        interval=args.poll_interval,
        request_timeout=args.poll_request_timeout,
    )


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/ensure_api_key.py
#!/usr/bin/env python3
"""Check SKILLS_VIDEO_API_KEY and print setup guidance when missing."""
from __future__ import annotations

import json
import os


def main() -> int:
    key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if key:
        print(
            json.dumps(
                {
                    "ok": True,
                    "env_var": "SKILLS_VIDEO_API_KEY",
                    "message": "API key detected.",
                },
                ensure_ascii=False,
                indent=2,
            )
        )
        return 0
    print(
        json.dumps(
            {
                "ok": False,
                "env_var": "SKILLS_VIDEO_API_KEY",
                "message": "Missing API key. Configure it before calling skills.video APIs.",
                "dashboard_url": "https://skills.video/dashboard/developer",
                "how_to_get_key": [
                    "Sign in at the dashboard URL.",
                    "Click 'Create API Key'.",
                    "Copy the generated key.",
                ],
                "set_env_examples": [
                    "export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"",
                    "echo 'export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"' >> ~/.zshrc && source ~/.zshrc",
                ],
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 1


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/handle_runtime_error.py
#!/usr/bin/env python3
"""Classify skills.video runtime API errors and provide actionable guidance."""
from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

INSUFFICIENT_CREDITS_KEYWORDS = (
    "insufficient credit",
    "not enough credit",
    "insufficient balance",
    "credit balance",
    "top up",
    "recharge",
)


def load_body(body: str | None, body_file: str | None) -> Any:
    if body_file:
        raw = Path(body_file).read_text(encoding="utf-8")
    elif body is not None:
        raw = body
    else:
        return None
    raw = raw.strip()
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error = payload.get("error")
        if isinstance(error, dict):
            nested = error.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def classify(status: int, message: str) -> str:
    lowered = message.lower()
    if status == 402 or any(keyword in lowered for keyword in INSUFFICIENT_CREDITS_KEYWORDS):
        return "insufficient_credits"
    if status == 401 or "unauthorized" in lowered:
        return "auth"
    if status == 422 or "validation" in lowered:
        return "validation"
    if status == 404:
        return "not_found"
    if status == 429 or status >= 500:
        return "transient"
    return "unknown"


def exit_code_for(category: str) -> int:
    table = {
        "insufficient_credits": 20,
        "auth": 21,
        "validation": 22,
        "not_found": 23,
        "transient": 24,
        "unknown": 25,
    }
    return table.get(category, 25)


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--status", type=int, required=True, help="HTTP status code")
    parser.add_argument("--body", help="Raw response body JSON/text")
    parser.add_argument("--body-file", help="Path to file containing raw response body")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--credits-endpoint", default="/credits")
    args = parser.parse_args()

    payload = load_body(args.body, args.body_file)
    message = extract_message(payload)
    category = classify(args.status, message)
    guidance: list[str] = []
    should_retry = False
    if category == "insufficient_credits":
        guidance = [
            "Credits are insufficient for this request.",
            "Open https://skills.video/dashboard and go to Billing/Credits to recharge.",
            "After recharge, retry the same request.",
        ]
    elif category == "auth":
        guidance = [
            "Authentication failed.",
            "Verify SKILLS_VIDEO_API_KEY and retry.",
        ]
    elif category == "validation":
        guidance = [
            "Request payload validation failed.",
            "Fix request parameters based on OpenAPI schema and retry.",
        ]
    elif category == "not_found":
        guidance = [
            "Resource or endpoint was not found.",
            "Recheck endpoint path/model id and generation id.",
        ]
    elif category == "transient":
        guidance = [
            "Transient server or rate-limit error.",
            "Retry with bounded exponential backoff.",
        ]
        should_retry = True
    else:
        guidance = [
            "Unhandled runtime error.",
            "Inspect response payload and apply a safe fallback path.",
        ]

    credits_url = f"{args.base_url.rstrip('/')}{args.credits_endpoint}"
    output = {
        "category": category,
        "status": args.status,
        "message": message,
        "should_retry": should_retry,
        "guidance": guidance,
        "credits_check_command": (
            f"curl -X GET \"{credits_url}\" "
            "-H \"Authorization: Bearer $SKILLS_VIDEO_API_KEY\""
        ),
        "recharge": {
            "dashboard_url": "https://skills.video/dashboard",
            "pricing_url": "https://skills.video/pricing",
        },
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return exit_code_for(category)


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/inspect_openapi.py
#!/usr/bin/env python3
"""Inspect skills.video generation OpenAPI contracts and emit endpoint-ready summaries."""
from __future__ import annotations

import argparse
import json
import re
from pathlib import Path
from typing import Any

DEFAULT_CATEGORY = "videos"
METHOD_AND_PATH_RE = re.compile(r"^(GET|POST|PUT|PATCH|DELETE)\s+(.+)$", re.IGNORECASE)


def read_json(path: Path) -> dict[str, Any]:
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        raise SystemExit(f"File not found: {path}")
    except json.JSONDecodeError as exc:
        raise SystemExit(f"Invalid JSON in {path}: {exc}")


def find_group_nodes(node: Any, group_name: str) -> list[dict[str, Any]]:
    matches: list[dict[str, Any]] = []
    if isinstance(node, dict):
        if node.get("group") == group_name and isinstance(node.get("pages"), list):
            matches.append(node)
        for value in node.values():
            matches.extend(find_group_nodes(value, group_name))
    elif isinstance(node, list):
        for item in node:
            matches.extend(find_group_nodes(item, group_name))
    return matches


def collect_method_paths(node: Any) -> list[str]:
    rows: list[str] = []
    if isinstance(node, str):
        match = METHOD_AND_PATH_RE.match(node.strip())
        if match:
            method = match.group(1).upper()
            path = match.group(2).strip()
            if method == "POST" and path.startswith("/generation/"):
                rows.append(f"{method} {path}")
    elif isinstance(node, dict):
        if "pages" in node:
            rows.extend(collect_method_paths(node["pages"]))
    elif isinstance(node, list):
        for item in node:
            rows.extend(collect_method_paths(item))
    return rows


def unique_in_order(items: list[str]) -> list[str]:
    seen: set[str] = set()
    output: list[str] = []
    for item in items:
        if item not in seen:
            seen.add(item)
            output.append(item)
    return output


def list_endpoints_from_docs(docs: dict[str, Any], category: str) -> list[str]:
    group_name = "Videos" if category == "videos" else "Images"
    rows: list[str] = []
    for group in find_group_nodes(docs, group_name):
        rows.extend(collect_method_paths(group.get("pages", [])))
    return unique_in_order(rows)


def parse_endpoint_arg(endpoint: str) -> tuple[str, str]:
    endpoint = endpoint.strip()
    match = METHOD_AND_PATH_RE.match(endpoint)
    if match:
        return match.group(1).upper(), match.group(2).strip()
    return "POST", endpoint


def try_paths(path: str) -> list[str]:
    # Try the path as given, then the same path with the /v1 prefix toggled.
    candidates = [path]
    if path.startswith("/v1/"):
        candidates.append(path[3:])
    elif path.startswith("/"):
        candidates.append(f"/v1{path}")
    else:
        candidates.append(f"/v1/{path}")
    return unique_in_order(candidates)


def resolve_operation(openapi: dict[str, Any], method: str, path: str) -> tuple[str, dict[str, Any]]:
    paths = openapi.get("paths", {})
    for candidate in try_paths(path):
        operations = paths.get(candidate)
        if isinstance(operations, dict) and method.lower() in operations:
            operation = operations[method.lower()]
            if isinstance(operation, dict):
                return candidate, operation
    raise SystemExit(f"Endpoint not found in OpenAPI: {method} {path}")


def endpoint_exists(openapi: dict[str, Any], method: str, path: str) -> bool:
    operations = openapi.get("paths", {}).get(path)
    return isinstance(operations, dict) and method.lower() in operations


def to_sse_path(path: str) -> str:
    if "/generation/sse/" in path:
        return path
    return path.replace("/generation/", "/generation/sse/", 1)


def to_polling_path(path: str) -> str:
    return path.replace("/generation/sse/", "/generation/", 1)


def find_sse_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
    sse_candidate = to_sse_path(resolved_path)
    if endpoint_exists(openapi, method, sse_candidate):
        return sse_candidate
    if "/generation/sse/" in resolved_path and endpoint_exists(openapi, method, resolved_path):
        return resolved_path
    return None


def find_polling_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
    polling_candidate = to_polling_path(resolved_path)
    if endpoint_exists(openapi, method, polling_candidate):
        return polling_candidate
    if endpoint_exists(openapi, method, resolved_path):
        return resolved_path
    return None


def find_subscribe_sse_endpoint(openapi: dict[str, Any]) -> str | None:
    paths = openapi.get("paths", {})
    if not isinstance(paths, dict):
        return None
    for path, ops in paths.items():
        if not isinstance(ops, dict):
            continue
        if "get" in ops and re.search(r"/predictions/\{[^}]+\}/subscribe$", path):
            return path
    return None


def build_default_and_sse_endpoints(
    openapi: dict[str, Any], endpoints: list[str]
) -> tuple[list[str], list[str]]:
    default_endpoints: list[str] = []
    sse_endpoints: list[str] = []
    for item in endpoints:
        method, path = parse_endpoint_arg(item)
        sse_path = to_sse_path(path)
        if endpoint_exists(openapi, method, sse_path):
            resolved = f"{method} {sse_path}"
            default_endpoints.append(resolved)
            sse_endpoints.append(resolved)
        else:
            default_endpoints.append(f"{method} {path}")
    return (
        unique_in_order(default_endpoints),
        unique_in_order(sse_endpoints),
    )
def ref_name(ref: str) -> str:
    return ref.rsplit("/", 1)[-1]


def resolve_schema(openapi: dict[str, Any], schema: dict[str, Any]) -> tuple[str | None, dict[str, Any]]:
    if "$ref" not in schema:
        return None, schema
    name = ref_name(schema["$ref"])
    resolved = openapi.get("components", {}).get("schemas", {}).get(name)
    if not isinstance(resolved, dict):
        raise SystemExit(f"Schema not found for ref: {schema['$ref']}")
    return name, resolved


def allowed_values(schema: dict[str, Any]) -> list[Any] | None:
    if isinstance(schema.get("enum"), list):
        return schema["enum"]
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        consts = [item["const"] for item in branch if isinstance(item, dict) and "const" in item]
        if consts:
            return consts
    return None


def summarize_type(schema: dict[str, Any]) -> str:
    if "$ref" in schema:
        return ref_name(schema["$ref"])
    schema_type = schema.get("type")
    if schema_type == "array":
        item_schema = schema.get("items") if isinstance(schema.get("items"), dict) else {}
        return f"array<{summarize_type(item_schema)}>"
    if isinstance(schema_type, str):
        return schema_type
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        variants: list[str] = []
        for item in branch:
            if not isinstance(item, dict):
                continue
            if "const" in item:
                const_value = item["const"]
                # bool must be tested before int because bool is an int subclass.
                if isinstance(const_value, bool):
                    variants.append("boolean")
                elif isinstance(const_value, int):
                    variants.append("integer")
                elif isinstance(const_value, float):
                    variants.append("number")
                elif isinstance(const_value, str):
                    variants.append("string")
                else:
                    variants.append(type(const_value).__name__)
            elif "$ref" in item:
                variants.append(ref_name(item["$ref"]))
            elif isinstance(item.get("type"), str):
                variants.append(item["type"])
        if variants:
            return " | ".join(unique_in_order(variants))
    return "object"


def template_value(field_name: str, schema: dict[str, Any]) -> Any:
    if "default" in schema:
        return schema["default"]
    values = allowed_values(schema)
    if values:
        return values[0]
    schema_type = schema.get("type")
    if schema_type == "string":
        if schema.get("format") == "uri":
            return f"https://example.com/{field_name}.png"
        if "prompt" in field_name:
            return "Describe what to generate"
        return f"<{field_name}>"
    if schema_type in {"integer", "number"}:
        return 1
    if schema_type == "boolean":
        return False
    if schema_type == "array":
        item = schema.get("items") if isinstance(schema.get("items"), dict) else {}
        if item.get("format") == "uri":
            return ["https://example.com/input.png"]
        return []
    if schema_type == "object":
        return {}
    for branch_key in ("anyOf", "oneOf"):
        branch = schema.get(branch_key)
        if not isinstance(branch, list):
            continue
        for item in branch:
            if isinstance(item, dict) and "const" in item:
                return item["const"]
    return f"<{field_name}>"


def summarize_fields(schema: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    properties = schema.get("properties") if isinstance(schema.get("properties"), dict) else {}
    required = schema.get("required") if isinstance(schema.get("required"), list) else []
    required_set = set(required)
    fields: list[dict[str, Any]] = []
    template: dict[str, Any] = {}
    for name, prop in properties.items():
        if not isinstance(prop, dict):
            continue
        row: dict[str, Any] = {
            "name": name,
            "required": name in required_set,
            "type": summarize_type(prop),
        }
        if isinstance(prop.get("description"), str):
            row["description"] = prop["description"]
        if "default" in prop:
            row["default"] = prop["default"]
        values = allowed_values(prop)
        if values:
            row["allowed_values"] = values
        fields.append(row)
    # Required fields always appear in the template.
    for name in required:
        prop = properties.get(name)
        if isinstance(prop, dict):
            template[name] = template_value(name, prop)
    # Optional fields appear only when the schema declares a default.
    for name, prop in properties.items():
        if name in template:
            continue
        if isinstance(prop, dict) and "default" in prop:
            template[name] = prop["default"]
    return fields, template


def find_poll_endpoint(openapi: dict[str, Any]) -> str | None:
    paths = openapi.get("paths", {})
    if not isinstance(paths, dict):
        return None
    preferred = ["/generation/{id}", "/v1/generation/{request_id}", "/v1/generation/{id}"]
    for path in preferred:
        ops = paths.get(path)
        if isinstance(ops, dict) and "get" in ops:
            return path
    for path, ops in paths.items():
        if not isinstance(ops, dict):
            continue
        if "get" in ops and re.search(r"/generation/\{[^}]+\}", path):
            return path
    return None


def status_values(openapi: dict[str, Any], schema_name: str) -> list[str] | None:
    schema = openapi.get("components", {}).get("schemas", {}).get(schema_name)
    if isinstance(schema, dict) and isinstance(schema.get("enum"), list):
        if all(isinstance(item, str) for item in schema["enum"]):
            return schema["enum"]
    return None
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--openapi", required=True, help="Path to OpenAPI JSON")
    parser.add_argument("--docs", help="Path to docs.json for Videos/Images group extraction")
    parser.add_argument("--category", choices=["videos", "images"], default=DEFAULT_CATEGORY)
    parser.add_argument("--list-endpoints", action="store_true", help="List POST generation endpoints from docs.json group")
    parser.add_argument("--endpoint", help="Endpoint path or full string like 'POST /generation/...'")
    parser.add_argument("--include-template", action="store_true", help="Include request_template in output")
    parser.add_argument("--json-indent", type=int, default=2)
    args = parser.parse_args()

    openapi_path = Path(args.openapi)
    openapi = read_json(openapi_path)
    if args.list_endpoints:
        if not args.docs:
            raise SystemExit("--list-endpoints requires --docs <path/to/docs.json>")
        docs = read_json(Path(args.docs))
        polling_endpoints = list_endpoints_from_docs(docs, args.category)
        default_endpoints, sse_endpoints = build_default_and_sse_endpoints(
            openapi, polling_endpoints
        )
        print(
            json.dumps(
                {
                    "category": args.category,
                    "source": str(Path(args.docs).resolve()),
                    "default_result_mode": "sse" if sse_endpoints else "polling",
                    "default_endpoints": default_endpoints,
                    "sse_endpoints": sse_endpoints,
                    "polling_endpoints": polling_endpoints,
                    "poll_endpoint": find_poll_endpoint(openapi),
                },
                ensure_ascii=False,
                indent=args.json_indent,
            )
        )
        # Listing alone is a complete run; continue only if --endpoint was also given.
        if not args.endpoint:
            return 0
    if not args.endpoint:
        raise SystemExit("Provide --endpoint or use --list-endpoints only")

    method, raw_path = parse_endpoint_arg(args.endpoint)
    resolved_path, operation = resolve_operation(openapi, method, raw_path)
    sse_create_endpoint = find_sse_create_endpoint(openapi, method, resolved_path)
    polling_create_endpoint = find_polling_create_endpoint(openapi, method, resolved_path)
    request_schema = (
        operation.get("requestBody", {})
        .get("content", {})
        .get("application/json", {})
        .get("schema", {})
    )
    if not isinstance(request_schema, dict):
        request_schema = {}
    request_schema_name, resolved_request_schema = resolve_schema(openapi, request_schema)
    fields, request_template = summarize_fields(resolved_request_schema)
    responses = operation.get("responses", {})
    if not isinstance(responses, dict):
        responses = {}
    error_codes = sorted(
        int(code) for code in responses.keys() if isinstance(code, str) and code.isdigit() and int(code) >= 400
    )
    servers = openapi.get("servers") if isinstance(openapi.get("servers"), list) else []
    server_url = None
    if servers and isinstance(servers[0], dict):
        server_url = servers[0].get("url")
    output: dict[str, Any] = {
        "endpoint": f"{method} {resolved_path}",
        "default_result_mode": "sse" if sse_create_endpoint else "polling",
        "default_create_endpoint": f"{method} {sse_create_endpoint or polling_create_endpoint or resolved_path}",
        "sse_create_endpoint": f"{method} {sse_create_endpoint}" if sse_create_endpoint else None,
        "polling_create_endpoint": f"{method} {polling_create_endpoint}" if polling_create_endpoint else None,
        "summary": operation.get("summary"),
        "server_url": server_url,
        "request_schema": request_schema_name,
        "fields": fields,
        "prediction_status_values": status_values(openapi, "PredictionStatus"),
        "queue_status_values": status_values(openapi, "QueueState"),
        "sse_subscribe_endpoint": find_subscribe_sse_endpoint(openapi),
        "poll_endpoint": find_poll_endpoint(openapi),
        "error_status_codes": error_codes,
    }
    if args.include_template:
        output["request_template"] = request_template
    print(json.dumps(output, ensure_ascii=False, indent=args.json_indent))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/wait_generation.py
#!/usr/bin/env python3
"""Poll a skills.video generation task until it reaches terminal status."""
from __future__ import annotations

import argparse
import json
import os
import time
from typing import Any
from urllib import error, request

TRANSIENT_HTTP_STATUSES = {429, 500, 502, 503, 504}
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}


def parse_json_or_text(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def emit(payload: dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False))


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            nested = error_obj.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def find_status(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "status",
        "state",
        "queue_state",
        "queueState",
        "prediction_status",
        "predictionStatus",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        # Depth-first search: keys on the current dict win over nested ones.
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)
def fetch_generation(
    base_url: str,
    generation_id: str,
    api_key: str,
    request_timeout: float,
) -> tuple[int, Any]:
    url = f"{base_url.rstrip('/')}/generation/{generation_id}"
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method="GET",
    )
    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            return resp.getcode(), parse_json_or_text(raw)
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        return exc.code, parse_json_or_text(body)
    except OSError as exc:
        # Transport failures (URLError, timeouts, connection resets) carry no HTTP
        # status. Report a synthetic 503 so the poll loop retries them as transient
        # instead of crashing with a traceback.
        return 503, {"message": f"Network error: {exc}"}
def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--generation-id", required=True, help="Generation id to poll")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--interval", type=float, default=3.0, help="Poll interval in seconds")
    parser.add_argument("--timeout", type=float, default=600.0, help="Total wait timeout in seconds")
    parser.add_argument(
        "--request-timeout",
        type=float,
        default=20.0,
        help="HTTP request timeout per poll in seconds",
    )
    args = parser.parse_args()

    api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not api_key:
        emit(
            {
                "event": "error",
                "error": "missing_api_key",
                "message": "Set SKILLS_VIDEO_API_KEY before polling generation status.",
            }
        )
        return 1
    if args.interval <= 0:
        emit({"event": "error", "error": "invalid_interval", "message": "--interval must be > 0"})
        return 1
    if args.timeout <= 0:
        emit({"event": "error", "error": "invalid_timeout", "message": "--timeout must be > 0"})
        return 1

    start = time.monotonic()
    attempt = 0
    last_status: str | None = None
    while True:
        elapsed = time.monotonic() - start
        if elapsed > args.timeout:
            emit(
                {
                    "event": "timeout",
                    "generation_id": args.generation_id,
                    "elapsed_seconds": round(elapsed, 2),
                    "last_status": last_status,
                }
            )
            return 11
        attempt += 1
        http_status, payload = fetch_generation(
            base_url=args.base_url,
            generation_id=args.generation_id,
            api_key=api_key,
            request_timeout=args.request_timeout,
        )
        if http_status >= 400:
            message = extract_message(payload)
            transient = http_status in TRANSIENT_HTTP_STATUSES
            emit(
                {
                    "event": "poll_error",
                    "generation_id": args.generation_id,
                    "attempt": attempt,
                    "http_status": http_status,
                    "transient": transient,
                    "message": message,
                }
            )
            if transient:
                time.sleep(args.interval)
                continue
            return 12
        status_value, status_key = find_status(payload)
        # Compare case-insensitively: specs mix "succeeded" and "COMPLETED" styles.
        normalized = status_value.upper() if status_value else None
        terminal = normalized in TERMINAL_STATUSES if normalized else False
        emit(
            {
                "event": "poll",
                "generation_id": args.generation_id,
                "attempt": attempt,
                "http_status": http_status,
                "status": status_value,
                "status_key": status_key,
                "terminal": terminal,
            }
        )
        if status_value:
            last_status = status_value
        if terminal:
            ok = normalized in SUCCESS_STATUSES
            emit(
                {
                    "event": "terminal",
                    "ok": ok,
                    "generation_id": args.generation_id,
                    "status": status_value,
                    "response": payload,
                }
            )
            return 0 if ok else 10
        time.sleep(args.interval)


if __name__ == "__main__":
    raise SystemExit(main())
---
name: "ai-image-skills"
description: Build and execute skills.video image generation REST requests from OpenAPI specs. Use when user needs to create, debug, or document image generation calls on open.skills.video.
---
# ai-image-skills
## Overview
Use this skill to turn OpenAPI definitions into working image-generation API calls for `skills.video`.
Prefer deterministic extraction from `openapi.json` instead of guessing fields.
## Workflow
1. Check API key and bootstrap environment on first use.
2. Identify the active spec.
3. Select the SSE endpoint pair for an image model.
4. Extract request schema and generate a payload template.
5. Execute `POST /generation/sse/...` as default and keep the stream open.
6. If SSE does not reach terminal completion, poll `GET /generation/{id}` to terminal status.
7. Return only a terminal result (`COMPLETED`/`SUCCEEDED`/`FAILED`/`CANCELED`), never `IN_QUEUE` or `IN_PROGRESS`.
8. Apply retry and failure handling.
## 0) Check API key (first run)
Run this check before any API call.
```bash
python scripts/ensure_api_key.py
```
If `ok` is `false`, tell the user to:
- Open `https://skills.video/dashboard/developer` and log in
- Click `Create API Key`
- Export the key as `SKILLS_VIDEO_API_KEY`
Example:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## 1) Identify the spec
Load the most specific OpenAPI first.
- Prefer model-specific OpenAPI when available (for example `/v1/openapi.json` under a model namespace).
- Fall back to platform-level `openapi.json`.
- Use `references/open-platform-api.md` for base URL, auth, and async lifecycle.
## 2) Select an image endpoint
If `docs.json` exists, derive image endpoints from the `Images` navigation group.
Use `default_endpoints` from the script output as the primary list (SSE first).
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
When `docs.json` is unavailable, pass a known endpoint directly (for example `/generation/sse/google/nano-banana-pro`).
Use `references/image-model-endpoints.md` as a snapshot list.
## 3) Extract schema and build payload
Inspect endpoint details and generate a request template from required/default fields.
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--endpoint /generation/sse/google/nano-banana-pro \
--include-template
```
Use the returned `request_template` as the starting point.
Do not add fields not defined by the endpoint schema.
Use `default_create_endpoint` from output unless an explicit override is required.
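The "no undefined fields" rule can be checked mechanically before sending a request. The following is a minimal sketch, assuming the JSON printed by `scripts/inspect_openapi.py` has been parsed into a dict whose `fields` rows carry `name` and `required` (as that script emits them). The function name `check_payload` and the sample field names are hypothetical and only illustrate the idea.

```python
from typing import Any


def check_payload(summary: dict[str, Any], payload: dict[str, Any]) -> dict[str, list[str]]:
    """Compare payload keys with the field rows of an inspect_openapi.py summary."""
    rows = [row for row in summary.get("fields", []) if isinstance(row, dict) and "name" in row]
    known = {row["name"] for row in rows}
    required = {row["name"] for row in rows if row.get("required")}
    return {
        # Keys the endpoint schema does not define: remove them before sending.
        "unknown": sorted(set(payload) - known),
        # Required keys the payload still lacks.
        "missing_required": sorted(required - set(payload)),
    }


# Illustrative summary; real field names come from the endpoint schema.
example_summary = {
    "fields": [
        {"name": "prompt", "required": True, "type": "string"},
        {"name": "aspect_ratio", "required": False, "type": "string"},
    ]
}
report = check_payload(example_summary, {"prompt": "a red kite", "seed": 7})
print(report)
```

An empty `unknown` list and an empty `missing_required` list suggest the payload stays within the extracted contract; it does not replace server-side validation.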
## 4) Execute SSE request (default) with automatic fallback
Prefer the helper script. It creates the generation via SSE and keeps streaming; if the stream ends before a terminal state is reached, it automatically falls back to polling.
```bash
python scripts/create_and_wait.py \
--sse-endpoint /generation/sse/google/nano-banana-pro \
--payload '{"prompt":"Minimal product photo of a matte black coffee grinder on white background"}' \
--poll-timeout 900 \
--poll-interval 3
```
Treat SSE as the default result channel.
Do not finish the task on `IN_QUEUE` or `IN_PROGRESS`.
Return only after terminal result.
## 5) Fall back to polling
Use polling only if SSE cannot be established, disconnects early, or does not reach a terminal state.
Use `GET /generation/{id}` (or model-spec equivalent path if the OpenAPI uses `/v1/...`).
```bash
curl -X GET "https://open.skills.video/api/v1/generation/<GENERATION_ID>" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
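Stop polling on terminal states (`scripts/wait_generation.py` matches them case-insensitively and also accepts the `CANCELLED` spelling):
- `COMPLETED` or `SUCCEEDED`
- `FAILED`
- `CANCELED`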
Recommended helper:
```bash
python scripts/wait_generation.py \
--generation-id <GENERATION_ID> \
--timeout 900 \
--interval 3
```
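Return to the user only after the helper emits a JSON line with `"event": "terminal"`. The helper exits with `0` on success, `10` on a failed or canceled terminal status, `11` on timeout, and `12` on a non-transient HTTP error.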
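Because `scripts/wait_generation.py` prints one JSON object per line, a caller can scan its captured stdout for the terminal record. The sketch below assumes the output has already been captured as text (for example via `subprocess.run(..., capture_output=True, text=True)`); the function name `find_terminal_event` and the sample lines are hypothetical.

```python
import json
from typing import Any, Optional


def find_terminal_event(stdout_text: str) -> Optional[dict[str, Any]]:
    """Return the last JSON line whose "event" is "terminal", or None."""
    terminal = None
    for line in stdout_text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON line
        if isinstance(record, dict) and record.get("event") == "terminal":
            terminal = record
    return terminal


# Made-up helper output for illustration only.
sample = "\n".join(
    [
        '{"event": "poll", "status": "IN_PROGRESS", "terminal": false}',
        '{"event": "terminal", "ok": true, "status": "COMPLETED", "response": {}}',
    ]
)
result = find_terminal_event(sample)
print(result["status"] if result else "no terminal event")
```

If no terminal record is found, treat the run as unfinished and consult the helper's exit code rather than reporting a result.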
## 6) Handle errors and retries
Handle these response codes for create, SSE, and fallback poll operations:
- `400`: request format issue
- `401`: missing/invalid API key
- `402`: possible payment or credits issue at runtime
- `404`: endpoint or generation id not found
- `422`: schema validation failed
Classify non-2xx runtime errors with:
```bash
python scripts/handle_runtime_error.py \
--status <HTTP_STATUS> \
--body '<RAW_ERROR_BODY_JSON_OR_TEXT>'
```
If category is `insufficient_credits`, tell the user to recharge:
- Open `https://skills.video/dashboard` and go to Billing/Credits
- Recharge or purchase additional credits
- Retry after recharge
Optional balance check:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
Apply retries only for transient conditions (network failure, `429`, or temporary `5xx`).
Use bounded exponential backoff (for example `1s`, `2s`, `4s`, max `16s`, then fail).
Do not retry unchanged payloads after `4xx` validation errors.
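The backoff schedule above can be sketched as follows. This is an illustration under stated assumptions, not the skill's shipped retry logic: `call` is a hypothetical zero-argument callable that performs one request and returns its HTTP status, and the transient status set mirrors `TRANSIENT_HTTP_STATUSES` in `scripts/wait_generation.py`.

```python
import time
from typing import Callable

# Mirrors TRANSIENT_HTTP_STATUSES in scripts/wait_generation.py.
TRANSIENT_STATUSES = {429, 500, 502, 503, 504}


def backoff_delays(base: float = 1.0, cap: float = 16.0) -> list[float]:
    """Return the doubling delay schedule: base, 2*base, ... up to cap."""
    delays: list[float] = []
    delay = base
    while delay <= cap:
        delays.append(delay)
        delay *= 2
    return delays


def call_with_retry(
    call: Callable[[], int],
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Invoke `call` until it returns a non-transient status or delays run out."""
    status = call()
    for delay in backoff_delays():
        if status not in TRANSIENT_STATUSES:
            break  # success or a non-retryable error: stop immediately
        sleep(delay)
        status = call()
    return status


print(backoff_delays())
```

Passing `sleep` as a parameter keeps the schedule testable without real waiting; a `422` or `401` returns on the first attempt because it is not in the transient set.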
## Rate limits and timeouts
Treat rate limits and server-side timeout windows as unknown unless documented in the active OpenAPI or product docs.
If unknown, explicitly note this in output and choose conservative client defaults.
## Resources
- `scripts/ensure_api_key.py`: validate `SKILLS_VIDEO_API_KEY` and show first-run setup guidance
- `scripts/handle_runtime_error.py`: classify runtime errors and provide recharge guidance for insufficient credits
- `scripts/inspect_openapi.py`: extract SSE/polling endpoint pair, contract, and payload template
- `scripts/create_and_wait.py`: create via SSE and auto-fallback to polling when stream does not reach terminal status
- `scripts/wait_generation.py`: poll generation status until terminal completion and return final response
- `references/open-platform-api.md`: SSE-first lifecycle, fallback polling, retry baseline
- `references/image-model-endpoints.md`: current image endpoint snapshot from `docs.json`
FILE:agents/openai.yaml
interface:
  display_name: "ai-image-skills"
  short_description: "Access the world's leading models in one place to produce images"
  default_prompt: "Use $ai-image-skills to create images of {subject}."
FILE:references/image-model-endpoints.md
# Image Model Endpoints (Snapshot)
Snapshot source: `docs.json` group `APIs -> Images`
Snapshot date: 2026-03-20
Default mode: SSE-first.
- Primary call path: replace `/generation/` with `/generation/sse/` for each endpoint below.
- Fallback call path: use the listed endpoint below with polling result fetch.
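The path substitution described above can be sketched as a small helper, in the same spirit as `to_sse_path` and `to_polling_path` in `scripts/inspect_openapi.py`. The name `primary_and_fallback` is hypothetical, and whether the SSE variant actually exists for a given model should still be confirmed against the OpenAPI `paths`.

```python
def primary_and_fallback(entry: str) -> tuple[str, str]:
    """Map a listed 'POST /generation/...' entry to (SSE entry, polling entry)."""
    method, _, path = entry.strip().partition(" ")
    # Normalize to the polling form first so SSE input is handled too.
    polling = path.strip().replace("/generation/sse/", "/generation/", 1)
    sse = polling.replace("/generation/", "/generation/sse/", 1)
    return f"{method} {sse}", f"{method} {polling}"


primary, fallback = primary_and_fallback("POST /generation/google/nano-banana-pro")
print(primary)
print(fallback)
```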
## Endpoints
- `POST /generation/google/nano-banana-2`
- `POST /generation/google/nano-banana-pro`
- `POST /generation/google/nano-banana`
- `POST /generation/openai/gpt-image-1.5`
- `POST /generation/openai/gpt-image-1`
- `POST /generation/kling-ai/v3/image`
- `POST /generation/kling-ai/o3-image`
- `POST /generation/kling-ai/o1-image`
- `POST /generation/xai/grok-imagine-image`
- `POST /generation/bytedance/seedream-5-lite`
- `POST /generation/bytedance/seedream-4.5`
- `POST /generation/bytedance/seedream-4.0`
- `POST /generation/qwen/qwen-image-2-pro`
- `POST /generation/qwen/qwen-image-2`
- `POST /generation/qwen/qwen-image`
- `POST /generation/vidu/q2/text-to-image`
- `POST /generation/runwayml/gen4-image`
- `POST /generation/runwayml/gen4-image-turbo`
- `POST /generation/black-forest-labs/flux-2-pro`
- `POST /generation/black-forest-labs/kontext`
- `POST /generation/ideogram/ideogram`
- `POST /generation/z-image/z-image`
- `POST /generation/topaz/upscale/image`
## Refresh command
```bash
python scripts/inspect_openapi.py \
--openapi /abs/path/to/openapi.json \
--docs /abs/path/to/docs.json \
--list-endpoints
```
FILE:references/open-platform-api.md
# skills.video Open Platform API Contract
## Scope
Use this reference for generation flows that create videos or images through `open.skills.video`.
## Base URL and endpoint shape
- Platform base URL: `https://open.skills.video/api/v1`
- Default generation endpoint pattern: `POST /generation/sse/{provider}/{model}`
- Polling create endpoint pattern (fallback): `POST /generation/{provider}/{model}`
- Task query endpoint pattern (fallback): `GET /generation/{id}`
Some model-scoped OpenAPI files use:
- `servers[0].url = /api`
- paths prefixed with `/v1/...`
Treat these as equivalent path representations under the same host.
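To make the equivalence concrete, here is a minimal sketch that joins a server URL with an operation path for both spec styles. It assumes the model-scoped spec's relative server URL resolves against the `https://open.skills.video` host, as the statement above implies; the helper name `full_url` is hypothetical.

```python
HOST = "https://open.skills.video"  # assumed host for relative server URLs


def full_url(server_url: str, path: str, host: str = HOST) -> str:
    """Join a server URL (absolute or host-relative) with an operation path."""
    base = server_url if server_url.startswith("http") else f"{host}{server_url}"
    return f"{base.rstrip('/')}/{path.lstrip('/')}"


platform = full_url("https://open.skills.video/api/v1", "/generation/{id}")
scoped = full_url("/api", "/v1/generation/{id}")
print(platform)
print(platform == scoped)
```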
## Authentication
Use bearer auth on every API call:
```http
Authorization: Bearer <API_KEY>
```
Security scheme in OpenAPI: `bearerAuth` (`type: http`, `scheme: bearer`).
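
As an illustration of the bearer scheme, the sketch below attaches the header with the standard library's `urllib.request`. The function name `build_request` is an assumption made for this example; only the header format comes from this reference.

```python
from urllib import request


def build_request(url: str, api_key: str, method: str = "GET") -> request.Request:
    """Build a request carrying the bearer token (hypothetical helper)."""
    return request.Request(
        url,
        headers={
            # Header format as documented above: "Bearer <API_KEY>".
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method=method,
    )
```

In practice the key would be read from `SKILLS_VIDEO_API_KEY` rather than passed as a literal.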
## First-time API key setup
If `SKILLS_VIDEO_API_KEY` is not configured:
1. Open `https://skills.video/dashboard/developer` and sign in.
2. Click `Create API Key`.
3. Export the key in your shell:
```bash
export SKILLS_VIDEO_API_KEY="<YOUR_API_KEY>"
```
## Async generation lifecycle
1. Submit generation with SSE endpoint `POST /generation/sse/{provider}/{model}`.
2. Read stream events (`text/event-stream`) until terminal event.
3. Fall back to polling only if SSE cannot connect, disconnects early, or does not reach terminal state.
4. Poll fallback result with `GET /generation/{id}` (or model-scoped equivalent path).
5. Stop polling at terminal status.
Important:
- `IN_QUEUE` / `IN_PROGRESS` are non-terminal. Do not return them as a final result.
- Return to caller only after terminal status (`COMPLETED`, `SUCCEEDED`, `FAILED`, `CANCELED`).
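
Step 2 of the lifecycle can be sketched as a small parser that groups `text/event-stream` lines into events. The generator below is a simplified illustration, not the skill's own reader: the name `iter_sse_events` is hypothetical, it handles only the `event` and `data` fields, and it flushes a trailing unterminated event at end of stream, which is a design choice made for this example.

```python
from __future__ import annotations

from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_name, data) pairs from decoded SSE lines."""
    event_name = "message"
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line dispatches the buffered event, if any.
            if data_lines:
                yield event_name, "\n".join(data_lines)
            event_name, data_lines = "message", []
            continue
        if line.startswith(":"):
            # Comment line (often used as a keep-alive); ignore it.
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single optional space after the colon
        if field == "event":
            event_name = value or "message"
        elif field == "data":
            data_lines.append(value)
    if data_lines:
        # Assumption: surface a trailing event even without a closing blank line.
        yield event_name, "\n".join(data_lines)
```

A caller would decode each `data` string as JSON, check its status against the terminal set above, and switch to polling if the stream ends first.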
## Status values from OpenAPI
### PredictionStatus
- `starting`
- `processing`
- `succeeded`
- `failed`
- `canceled`
### QueueState
- `IN_QUEUE`
- `IN_PROGRESS`
- `COMPLETED`
- `FAILED`
- `CANCELED`
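
Because `PredictionStatus` values are lowercase and `QueueState` values are uppercase, a client may want to fold both into one outcome. The sketch below does this by uppercasing before comparison. The function name `classify_status` and the choice to treat unrecognized values as pending are assumptions for this example.

```python
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
FAILURE_STATUSES = {"FAILED", "CANCELED"}


def classify_status(value: str) -> str:
    """Map a PredictionStatus or QueueState value to success, failure, or pending."""
    normalized = value.strip().upper()
    if normalized in SUCCESS_STATUSES:
        return "success"
    if normalized in FAILURE_STATUSES:
        return "failure"
    # Assumption: anything else (starting, processing, IN_QUEUE, ...) is non-terminal.
    return "pending"
```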
## Error response shape
`ErrorResponse` defines:
- `message` (`string`)
Common documented HTTP errors on generation endpoints:
- `400`
- `401`
- `404`
- `422`
Runtime note:
- Credit exhaustion can appear as `402` and/or error text indicating insufficient credits.
## Insufficient credits handling
When a runtime error indicates insufficient credits:
1. Stop automatic retries for this request.
2. Tell the user to recharge credits in `https://skills.video/dashboard` (Billing/Credits).
3. Retry only after recharge.
Use `GET /credits` to check remaining balance:
```bash
curl -X GET "https://open.skills.video/api/v1/credits" \
-H "Authorization: Bearer $SKILLS_VIDEO_API_KEY"
```
## Retry baseline
- Retry only transient failures (`429`, `5xx`, transport timeouts, connection resets).
- Do not retry unchanged payloads on validation/auth errors (`400`, `401`, `404`, `422`).
- Use bounded exponential backoff when retrying.
- Preserve SSE-first behavior on retries; use polling only as fallback.
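
The retry rules above can be sketched as two small helpers. The names `is_retryable` and `backoff_delays`, and the default attempt count, base delay, and cap, are illustrative assumptions; this reference does not specify concrete values.

```python
from __future__ import annotations

from typing import Iterator


def is_retryable(status: int) -> bool:
    """Return True only for transient HTTP statuses (429 and 5xx)."""
    return status == 429 or 500 <= status <= 599


def backoff_delays(
    max_attempts: int = 5, base: float = 1.0, cap: float = 30.0
) -> Iterator[float]:
    """Yield a bounded exponential backoff schedule in seconds."""
    for attempt in range(max_attempts):
        # Double the delay on each attempt, but never exceed the cap.
        yield min(cap, base * (2 ** attempt))
```

A caller would sleep for each yielded delay between attempts and stop once the schedule is exhausted. Adding random jitter to each delay is a common refinement but is left out here to keep the sketch deterministic.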
## Unknowns
The OpenAPI files in this workspace do not declare concrete rate-limit windows or global timeout budgets.
Treat both as unknown unless an endpoint-specific OpenAPI file explicitly documents them.
FILE:scripts/create_and_wait.py
#!/usr/bin/env python3
"""Create generation via SSE, then fallback to polling until terminal status."""
from __future__ import annotations
import argparse
import json
import os
import socket
import subprocess
import sys
from pathlib import Path
from typing import Any
from urllib import error, request
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}
TERMINAL_EVENT_NAMES = {
"completed",
"complete",
"succeeded",
"success",
"failed",
"failure",
"canceled",
"cancelled",
"done",
"finished",
"terminal",
}
def emit(payload: dict[str, Any]) -> None:
print(json.dumps(payload, ensure_ascii=False))
def parse_json_or_text(raw: str) -> Any:
text = raw.strip()
if not text:
return None
try:
return json.loads(text)
except json.JSONDecodeError:
return text
def load_payload(args: argparse.Namespace) -> dict[str, Any]:
if args.payload_file:
raw = Path(args.payload_file).read_text(encoding="utf-8")
payload = parse_json_or_text(raw)
elif args.payload is not None:
payload = parse_json_or_text(args.payload)
else:
payload = {}
if payload is None:
return {}
if not isinstance(payload, dict):
raise SystemExit("Payload must be a JSON object.")
return payload
def endpoint_url(base_url: str, endpoint: str) -> str:
if endpoint.startswith("http://") or endpoint.startswith("https://"):
return endpoint
if not endpoint.startswith("/"):
endpoint = "/" + endpoint
return f"{base_url.rstrip('/')}{endpoint}"
def extract_message(payload: Any) -> str:
if payload is None:
return ""
if isinstance(payload, str):
return payload
if isinstance(payload, dict):
message = payload.get("message")
if isinstance(message, str):
return message
error_obj = payload.get("error")
if isinstance(error_obj, dict):
nested = error_obj.get("message")
if isinstance(nested, str):
return nested
return json.dumps(payload, ensure_ascii=False)
def find_status(payload: Any) -> tuple[str | None, str | None]:
preferred_keys = (
"status",
"state",
"queue_state",
"queueState",
"prediction_status",
"predictionStatus",
)
def walk(node: Any) -> tuple[str | None, str | None]:
if isinstance(node, dict):
for key in preferred_keys:
value = node.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), key
for value in node.values():
found = walk(value)
if found[0]:
return found
elif isinstance(node, list):
for item in node:
found = walk(item)
if found[0]:
return found
return None, None
return walk(payload)
def find_generation_id(payload: Any) -> tuple[str | None, str | None]:
preferred_keys = (
"generation_id",
"generationId",
"prediction_id",
"predictionId",
"task_id",
"taskId",
"id",
)
def walk(node: Any) -> tuple[str | None, str | None]:
if isinstance(node, dict):
for key in preferred_keys:
value = node.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), key
for value in node.values():
found = walk(value)
if found[0]:
return found
elif isinstance(node, list):
for item in node:
found = walk(item)
if found[0]:
return found
return None, None
return walk(payload)
def is_terminal_event_name(event_name: str | None) -> bool:
if not event_name:
return False
return event_name.strip().lower() in TERMINAL_EVENT_NAMES
def run_sse(
url: str,
api_key: str,
payload: dict[str, Any],
request_timeout: float,
) -> tuple[int, str | None, Any]:
req = request.Request(
url,
headers={
"Authorization": f"Bearer {api_key}",
"Accept": "text/event-stream",
"Content-Type": "application/json",
},
data=json.dumps(payload).encode("utf-8"),
method="POST",
)
generation_id: str | None = None
terminal_payload: Any = None
event_name = "message"
data_lines: list[str] = []
def flush_event() -> tuple[bool, int]:
nonlocal event_name, data_lines, generation_id, terminal_payload
if not data_lines:
event_name = "message"
return False, 0
text = "\n".join(data_lines)
parsed = parse_json_or_text(text)
status_value, status_key = find_status(parsed)
status_normalized = status_value.upper() if status_value else None
event_terminal = (status_normalized in TERMINAL_STATUSES) or is_terminal_event_name(event_name)
current_id, _ = find_generation_id(parsed)
if current_id and not generation_id:
generation_id = current_id
emit(
{
"event": "sse_event",
"sse_event_name": event_name,
"generation_id": generation_id,
"status": status_value,
"status_key": status_key,
"terminal": event_terminal,
"payload": parsed,
}
)
data_lines = []
event_name = "message"
if event_terminal:
terminal_payload = parsed
if status_normalized in SUCCESS_STATUSES:
return True, 0
if status_normalized in TERMINAL_STATUSES:
return True, 10
return True, 0
return False, 0
try:
with request.urlopen(req, timeout=request_timeout) as resp:
emit({"event": "sse_open", "http_status": resp.getcode(), "url": url})
for raw_line in resp:
line = raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
if not line:
done, rc = flush_event()
if done:
return rc, generation_id, terminal_payload
continue
if line.startswith(":"):
continue
if line.startswith("event:"):
event_name = line.split(":", 1)[1].strip() or "message"
elif line.startswith("data:"):
data_lines.append(line.split(":", 1)[1].lstrip())
done, rc = flush_event()
if done:
return rc, generation_id, terminal_payload
emit(
{
"event": "sse_stream_ended",
"generation_id": generation_id,
"message": "SSE stream ended before terminal status.",
}
)
return 100, generation_id, None
except error.HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
parsed = parse_json_or_text(body)
emit(
{
"event": "sse_http_error",
"http_status": exc.code,
"message": extract_message(parsed),
"payload": parsed,
}
)
return 101, generation_id, parsed
except (socket.timeout, TimeoutError) as exc:
emit({"event": "sse_timeout", "message": str(exc), "generation_id": generation_id})
return 102, generation_id, None
except error.URLError as exc:
emit({"event": "sse_connect_error", "message": str(exc), "generation_id": generation_id})
return 103, generation_id, None
def run_poll_fallback(
generation_id: str,
base_url: str,
timeout: float,
interval: float,
request_timeout: float,
) -> int:
wait_script = Path(__file__).with_name("wait_generation.py")
if not wait_script.exists():
emit(
{
"event": "error",
"error": "wait_script_not_found",
"message": f"Missing fallback script: {wait_script}",
}
)
return 2
cmd = [
sys.executable,
str(wait_script),
"--generation-id",
generation_id,
"--base-url",
base_url,
"--timeout",
str(timeout),
"--interval",
str(interval),
"--request-timeout",
str(request_timeout),
]
emit(
{
"event": "fallback_polling_start",
"generation_id": generation_id,
"command": " ".join(cmd),
}
)
result = subprocess.run(cmd, check=False)
emit(
{
"event": "fallback_polling_end",
"generation_id": generation_id,
"exit_code": result.returncode,
}
)
return result.returncode
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--sse-endpoint", required=True, help="SSE create endpoint path or full URL")
parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
parser.add_argument("--payload", help="JSON object payload string")
parser.add_argument("--payload-file", help="Path to JSON payload file")
parser.add_argument("--sse-request-timeout", type=float, default=120.0)
parser.add_argument("--poll-timeout", type=float, default=900.0)
parser.add_argument("--poll-interval", type=float, default=3.0)
parser.add_argument("--poll-request-timeout", type=float, default=20.0)
args = parser.parse_args()
api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
if not api_key:
emit(
{
"event": "error",
"error": "missing_api_key",
"message": "Set SKILLS_VIDEO_API_KEY before calling skills.video APIs.",
}
)
return 1
if args.payload is not None and args.payload_file:
emit(
{
"event": "error",
"error": "invalid_arguments",
"message": "Use either --payload or --payload-file, not both.",
}
)
return 1
payload = load_payload(args)
url = endpoint_url(args.base_url, args.sse_endpoint)
emit({"event": "start", "url": url, "mode": "sse_then_poll_fallback"})
sse_rc, generation_id, terminal_payload = run_sse(
url=url,
api_key=api_key,
payload=payload,
request_timeout=args.sse_request_timeout,
)
if sse_rc in (0, 10):
emit(
{
"event": "terminal",
"source": "sse",
"ok": sse_rc == 0,
"generation_id": generation_id,
"response": terminal_payload,
}
)
return sse_rc
if not generation_id:
emit(
{
"event": "error",
"error": "missing_generation_id",
"message": "SSE did not return generation id; cannot start poll fallback.",
"sse_exit_code": sse_rc,
}
)
return 3
return run_poll_fallback(
generation_id=generation_id,
base_url=args.base_url,
timeout=args.poll_timeout,
interval=args.poll_interval,
request_timeout=args.poll_request_timeout,
)
if __name__ == "__main__":
raise SystemExit(main())
FILE:scripts/ensure_api_key.py
#!/usr/bin/env python3
"""Check SKILLS_VIDEO_API_KEY and print setup guidance when missing."""

from __future__ import annotations

import json
import os


def main() -> int:
    key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if key:
        print(
            json.dumps(
                {
                    "ok": True,
                    "env_var": "SKILLS_VIDEO_API_KEY",
                    "message": "API key detected.",
                },
                ensure_ascii=False,
                indent=2,
            )
        )
        return 0
    print(
        json.dumps(
            {
                "ok": False,
                "env_var": "SKILLS_VIDEO_API_KEY",
                "message": "Missing API key. Configure it before calling skills.video APIs.",
                "dashboard_url": "https://skills.video/dashboard/developer",
                "how_to_get_key": [
                    "Sign in at the dashboard URL.",
                    "Click 'Create API Key'.",
                    "Copy the generated key.",
                ],
                "set_env_examples": [
                    "export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"",
                    "echo 'export SKILLS_VIDEO_API_KEY=\"<YOUR_API_KEY>\"' >> ~/.zshrc && source ~/.zshrc",
                ],
            },
            ensure_ascii=False,
            indent=2,
        )
    )
    return 1


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/handle_runtime_error.py
#!/usr/bin/env python3
"""Classify skills.video runtime API errors and provide actionable guidance."""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

INSUFFICIENT_CREDITS_KEYWORDS = (
    "insufficient credit",
    "not enough credit",
    "insufficient balance",
    "credit balance",
    "top up",
    "recharge",
)


def load_body(body: str | None, body_file: str | None) -> Any:
    if body_file:
        raw = Path(body_file).read_text(encoding="utf-8")
    elif body is not None:
        raw = body
    else:
        return None
    raw = raw.strip()
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error = payload.get("error")
        if isinstance(error, dict):
            nested = error.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def classify(status: int, message: str) -> str:
    lowered = message.lower()
    if status == 402 or any(keyword in lowered for keyword in INSUFFICIENT_CREDITS_KEYWORDS):
        return "insufficient_credits"
    if status == 401 or "unauthorized" in lowered:
        return "auth"
    if status == 422 or "validation" in lowered:
        return "validation"
    if status == 404:
        return "not_found"
    if status == 429 or status >= 500:
        return "transient"
    return "unknown"


def exit_code_for(category: str) -> int:
    table = {
        "insufficient_credits": 20,
        "auth": 21,
        "validation": 22,
        "not_found": 23,
        "transient": 24,
        "unknown": 25,
    }
    return table.get(category, 25)


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--status", type=int, required=True, help="HTTP status code")
    parser.add_argument("--body", help="Raw response body JSON/text")
    parser.add_argument("--body-file", help="Path to file containing raw response body")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--credits-endpoint", default="/credits")
    args = parser.parse_args()

    payload = load_body(args.body, args.body_file)
    message = extract_message(payload)
    category = classify(args.status, message)

    guidance: list[str] = []
    should_retry = False
    if category == "insufficient_credits":
        guidance = [
            "Credits are insufficient for this request.",
            "Open https://skills.video/dashboard and go to Billing/Credits to recharge.",
            "After recharge, retry the same request.",
        ]
    elif category == "auth":
        guidance = [
            "Authentication failed.",
            "Verify SKILLS_VIDEO_API_KEY and retry.",
        ]
    elif category == "validation":
        guidance = [
            "Request payload validation failed.",
            "Fix request parameters based on OpenAPI schema and retry.",
        ]
    elif category == "not_found":
        guidance = [
            "Resource or endpoint was not found.",
            "Recheck endpoint path/model id and generation id.",
        ]
    elif category == "transient":
        guidance = [
            "Transient server or rate-limit error.",
            "Retry with bounded exponential backoff.",
        ]
        should_retry = True
    else:
        guidance = [
            "Unhandled runtime error.",
            "Inspect response payload and apply a safe fallback path.",
        ]

    credits_url = f"{args.base_url.rstrip('/')}{args.credits_endpoint}"
    output = {
        "category": category,
        "status": args.status,
        "message": message,
        "should_retry": should_retry,
        "guidance": guidance,
        "credits_check_command": (
            f"curl -X GET \"{credits_url}\" "
            "-H \"Authorization: Bearer $SKILLS_VIDEO_API_KEY\""
        ),
        "recharge": {
            "dashboard_url": "https://skills.video/dashboard",
            "pricing_url": "https://skills.video/pricing",
        },
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return exit_code_for(category)


if __name__ == "__main__":
    raise SystemExit(main())
FILE:scripts/inspect_openapi.py
#!/usr/bin/env python3
"""Inspect skills.video generation OpenAPI contracts and emit endpoint-ready summaries."""
from __future__ import annotations
import argparse
import json
import re
from pathlib import Path
from typing import Any
DEFAULT_CATEGORY = "images"
METHOD_AND_PATH_RE = re.compile(r"^(GET|POST|PUT|PATCH|DELETE)\s+(.+)$", re.IGNORECASE)
def read_json(path: Path) -> dict[str, Any]:
try:
return json.loads(path.read_text(encoding="utf-8"))
except FileNotFoundError:
raise SystemExit(f"File not found: {path}")
except json.JSONDecodeError as exc:
raise SystemExit(f"Invalid JSON in {path}: {exc}")
def find_group_nodes(node: Any, group_name: str) -> list[dict[str, Any]]:
matches: list[dict[str, Any]] = []
if isinstance(node, dict):
if node.get("group") == group_name and isinstance(node.get("pages"), list):
matches.append(node)
for value in node.values():
matches.extend(find_group_nodes(value, group_name))
elif isinstance(node, list):
for item in node:
matches.extend(find_group_nodes(item, group_name))
return matches
def collect_method_paths(node: Any) -> list[str]:
rows: list[str] = []
if isinstance(node, str):
match = METHOD_AND_PATH_RE.match(node.strip())
if match:
method = match.group(1).upper()
path = match.group(2).strip()
if method == "POST" and path.startswith("/generation/"):
rows.append(f"{method} {path}")
elif isinstance(node, dict):
if "pages" in node:
rows.extend(collect_method_paths(node["pages"]))
elif isinstance(node, list):
for item in node:
rows.extend(collect_method_paths(item))
return rows
def unique_in_order(items: list[str]) -> list[str]:
seen: set[str] = set()
output: list[str] = []
for item in items:
if item not in seen:
seen.add(item)
output.append(item)
return output
def list_endpoints_from_docs(docs: dict[str, Any], category: str) -> list[str]:
group_name = "Videos" if category == "videos" else "Images"
rows: list[str] = []
for group in find_group_nodes(docs, group_name):
rows.extend(collect_method_paths(group.get("pages", [])))
return unique_in_order(rows)
def parse_endpoint_arg(endpoint: str) -> tuple[str, str]:
endpoint = endpoint.strip()
match = METHOD_AND_PATH_RE.match(endpoint)
if match:
return match.group(1).upper(), match.group(2).strip()
return "POST", endpoint
def try_paths(path: str) -> list[str]:
candidates = [path]
if path.startswith("/v1/"):
candidates.append(path[3:])
elif path.startswith("/"):
candidates.append(f"/v1{path}")
else:
candidates.append(f"/v1/{path}")
return unique_in_order(candidates)
def resolve_operation(openapi: dict[str, Any], method: str, path: str) -> tuple[str, dict[str, Any]]:
paths = openapi.get("paths", {})
for candidate in try_paths(path):
operations = paths.get(candidate)
if isinstance(operations, dict) and method.lower() in operations:
operation = operations[method.lower()]
if isinstance(operation, dict):
return candidate, operation
raise SystemExit(f"Endpoint not found in OpenAPI: {method} {path}")
def endpoint_exists(openapi: dict[str, Any], method: str, path: str) -> bool:
operations = openapi.get("paths", {}).get(path)
return isinstance(operations, dict) and method.lower() in operations
def to_sse_path(path: str) -> str:
if "/generation/sse/" in path:
return path
return path.replace("/generation/", "/generation/sse/", 1)
def to_polling_path(path: str) -> str:
return path.replace("/generation/sse/", "/generation/", 1)
def find_sse_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
sse_candidate = to_sse_path(resolved_path)
if endpoint_exists(openapi, method, sse_candidate):
return sse_candidate
if "/generation/sse/" in resolved_path and endpoint_exists(openapi, method, resolved_path):
return resolved_path
return None
def find_polling_create_endpoint(openapi: dict[str, Any], method: str, resolved_path: str) -> str | None:
polling_candidate = to_polling_path(resolved_path)
if endpoint_exists(openapi, method, polling_candidate):
return polling_candidate
if endpoint_exists(openapi, method, resolved_path):
return resolved_path
return None
def find_subscribe_sse_endpoint(openapi: dict[str, Any]) -> str | None:
paths = openapi.get("paths", {})
if not isinstance(paths, dict):
return None
for path, ops in paths.items():
if not isinstance(ops, dict):
continue
if "get" in ops and re.search(r"/predictions/\{[^}]+\}/subscribe$", path):
return path
return None
def build_default_and_sse_endpoints(
openapi: dict[str, Any], endpoints: list[str]
) -> tuple[list[str], list[str]]:
default_endpoints: list[str] = []
sse_endpoints: list[str] = []
for item in endpoints:
method, path = parse_endpoint_arg(item)
sse_path = to_sse_path(path)
if endpoint_exists(openapi, method, sse_path):
resolved = f"{method} {sse_path}"
default_endpoints.append(resolved)
sse_endpoints.append(resolved)
else:
default_endpoints.append(f"{method} {path}")
return (
unique_in_order(default_endpoints),
unique_in_order(sse_endpoints),
)
def ref_name(ref: str) -> str:
return ref.rsplit("/", 1)[-1]
def resolve_schema(openapi: dict[str, Any], schema: dict[str, Any]) -> tuple[str | None, dict[str, Any]]:
if "$ref" not in schema:
return None, schema
name = ref_name(schema["$ref"])
resolved = openapi.get("components", {}).get("schemas", {}).get(name)
if not isinstance(resolved, dict):
raise SystemExit(f"Schema not found for ref: {schema['$ref']}")
return name, resolved
def allowed_values(schema: dict[str, Any]) -> list[Any] | None:
if isinstance(schema.get("enum"), list):
return schema["enum"]
for branch_key in ("anyOf", "oneOf"):
branch = schema.get(branch_key)
if not isinstance(branch, list):
continue
consts = [item["const"] for item in branch if isinstance(item, dict) and "const" in item]
if consts:
return consts
return None
def summarize_type(schema: dict[str, Any]) -> str:
if "$ref" in schema:
return ref_name(schema["$ref"])
schema_type = schema.get("type")
if schema_type == "array":
item_schema = schema.get("items") if isinstance(schema.get("items"), dict) else {}
return f"array<{summarize_type(item_schema)}>"
if isinstance(schema_type, str):
return schema_type
for branch_key in ("anyOf", "oneOf"):
branch = schema.get(branch_key)
if not isinstance(branch, list):
continue
variants: list[str] = []
for item in branch:
if not isinstance(item, dict):
continue
if "const" in item:
const_value = item["const"]
if isinstance(const_value, bool):
variants.append("boolean")
elif isinstance(const_value, int):
variants.append("integer")
elif isinstance(const_value, float):
variants.append("number")
elif isinstance(const_value, str):
variants.append("string")
else:
variants.append(type(const_value).__name__)
elif "$ref" in item:
variants.append(ref_name(item["$ref"]))
elif isinstance(item.get("type"), str):
variants.append(item["type"])
if variants:
return " | ".join(unique_in_order(variants))
return "object"
def template_value(field_name: str, schema: dict[str, Any]) -> Any:
if "default" in schema:
return schema["default"]
values = allowed_values(schema)
if values:
return values[0]
schema_type = schema.get("type")
if schema_type == "string":
if schema.get("format") == "uri":
return f"https://example.com/{field_name}.png"
if "prompt" in field_name:
return "Describe what to generate"
return f"<{field_name}>"
if schema_type in {"integer", "number"}:
return 1
if schema_type == "boolean":
return False
if schema_type == "array":
item = schema.get("items") if isinstance(schema.get("items"), dict) else {}
if item.get("format") == "uri":
return ["https://example.com/input.png"]
return []
if schema_type == "object":
return {}
for branch_key in ("anyOf", "oneOf"):
branch = schema.get(branch_key)
if not isinstance(branch, list):
continue
for item in branch:
if isinstance(item, dict) and "const" in item:
return item["const"]
return f"<{field_name}>"
def summarize_fields(schema: dict[str, Any]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
properties = schema.get("properties") if isinstance(schema.get("properties"), dict) else {}
required = schema.get("required") if isinstance(schema.get("required"), list) else []
required_set = set(required)
fields: list[dict[str, Any]] = []
template: dict[str, Any] = {}
for name, prop in properties.items():
if not isinstance(prop, dict):
continue
row: dict[str, Any] = {
"name": name,
"required": name in required_set,
"type": summarize_type(prop),
}
if isinstance(prop.get("description"), str):
row["description"] = prop["description"]
if "default" in prop:
row["default"] = prop["default"]
values = allowed_values(prop)
if values:
row["allowed_values"] = values
fields.append(row)
for name in required:
prop = properties.get(name)
if isinstance(prop, dict):
template[name] = template_value(name, prop)
for name, prop in properties.items():
if name in template:
continue
if isinstance(prop, dict) and "default" in prop:
template[name] = prop["default"]
return fields, template
def find_poll_endpoint(openapi: dict[str, Any]) -> str | None:
paths = openapi.get("paths", {})
if not isinstance(paths, dict):
return None
preferred = ["/generation/{id}", "/v1/generation/{request_id}", "/v1/generation/{id}"]
for path in preferred:
ops = paths.get(path)
if isinstance(ops, dict) and "get" in ops:
return path
for path, ops in paths.items():
if not isinstance(ops, dict):
continue
if "get" in ops and re.search(r"/generation/\{[^}]+\}", path):
return path
return None
def status_values(openapi: dict[str, Any], schema_name: str) -> list[str] | None:
schema = openapi.get("components", {}).get("schemas", {}).get(schema_name)
if isinstance(schema, dict) and isinstance(schema.get("enum"), list):
if all(isinstance(item, str) for item in schema["enum"]):
return schema["enum"]
return None
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--openapi", required=True, help="Path to OpenAPI JSON")
parser.add_argument("--docs", help="Path to docs.json for Videos/Images group extraction")
parser.add_argument("--category", choices=["videos", "images"], default=DEFAULT_CATEGORY)
parser.add_argument("--list-endpoints", action="store_true", help="List POST generation endpoints from docs.json group")
parser.add_argument("--endpoint", help="Endpoint path or full string like 'POST /generation/...'")
parser.add_argument("--include-template", action="store_true", help="Include request_template in output")
parser.add_argument("--json-indent", type=int, default=2)
args = parser.parse_args()
openapi_path = Path(args.openapi)
openapi = read_json(openapi_path)
if args.list_endpoints:
if not args.docs:
raise SystemExit("--list-endpoints requires --docs <path/to/docs.json>")
docs = read_json(Path(args.docs))
polling_endpoints = list_endpoints_from_docs(docs, args.category)
default_endpoints, sse_endpoints = build_default_and_sse_endpoints(
openapi, polling_endpoints
)
print(
json.dumps(
{
"category": args.category,
"source": str(Path(args.docs).resolve()),
"default_result_mode": "sse" if sse_endpoints else "polling",
"default_endpoints": default_endpoints,
"sse_endpoints": sse_endpoints,
"polling_endpoints": polling_endpoints,
"poll_endpoint": find_poll_endpoint(openapi),
},
ensure_ascii=False,
indent=args.json_indent,
)
)
if not args.endpoint:
return 0
if not args.endpoint:
raise SystemExit("Provide --endpoint or use --list-endpoints only")
method, raw_path = parse_endpoint_arg(args.endpoint)
resolved_path, operation = resolve_operation(openapi, method, raw_path)
sse_create_endpoint = find_sse_create_endpoint(openapi, method, resolved_path)
polling_create_endpoint = find_polling_create_endpoint(openapi, method, resolved_path)
request_schema = (
operation.get("requestBody", {})
.get("content", {})
.get("application/json", {})
.get("schema", {})
)
if not isinstance(request_schema, dict):
request_schema = {}
request_schema_name, resolved_request_schema = resolve_schema(openapi, request_schema)
fields, request_template = summarize_fields(resolved_request_schema)
responses = operation.get("responses", {})
if not isinstance(responses, dict):
responses = {}
error_codes = sorted(
int(code) for code in responses.keys() if isinstance(code, str) and code.isdigit() and int(code) >= 400
)
servers = openapi.get("servers") if isinstance(openapi.get("servers"), list) else []
server_url = None
if servers and isinstance(servers[0], dict):
server_url = servers[0].get("url")
output: dict[str, Any] = {
"endpoint": f"{method} {resolved_path}",
"default_result_mode": "sse" if sse_create_endpoint else "polling",
"default_create_endpoint": f"{method} {sse_create_endpoint or polling_create_endpoint or resolved_path}",
"sse_create_endpoint": f"{method} {sse_create_endpoint}" if sse_create_endpoint else None,
"polling_create_endpoint": f"{method} {polling_create_endpoint}" if polling_create_endpoint else None,
"summary": operation.get("summary"),
"server_url": server_url,
"request_schema": request_schema_name,
"fields": fields,
"prediction_status_values": status_values(openapi, "PredictionStatus"),
"queue_status_values": status_values(openapi, "QueueState"),
"sse_subscribe_endpoint": find_subscribe_sse_endpoint(openapi),
"poll_endpoint": find_poll_endpoint(openapi),
"error_status_codes": error_codes,
}
if args.include_template:
output["request_template"] = request_template
print(json.dumps(output, ensure_ascii=False, indent=args.json_indent))
return 0
if __name__ == "__main__":
raise SystemExit(main())
FILE:scripts/wait_generation.py
#!/usr/bin/env python3
"""Poll a skills.video generation task until it reaches terminal status."""

from __future__ import annotations

import argparse
import json
import os
import time
from typing import Any
from urllib import error, request

TRANSIENT_HTTP_STATUSES = {429, 500, 502, 503, 504}
SUCCESS_STATUSES = {"COMPLETED", "SUCCEEDED"}
TERMINAL_STATUSES = SUCCESS_STATUSES | {"FAILED", "CANCELED", "CANCELLED"}


def parse_json_or_text(raw: str) -> Any:
    text = raw.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def emit(payload: dict[str, Any]) -> None:
    print(json.dumps(payload, ensure_ascii=False))


def extract_message(payload: Any) -> str:
    if payload is None:
        return ""
    if isinstance(payload, str):
        return payload
    if isinstance(payload, dict):
        message = payload.get("message")
        if isinstance(message, str):
            return message
        error_obj = payload.get("error")
        if isinstance(error_obj, dict):
            nested = error_obj.get("message")
            if isinstance(nested, str):
                return nested
    return json.dumps(payload, ensure_ascii=False)


def find_status(payload: Any) -> tuple[str | None, str | None]:
    preferred_keys = (
        "status",
        "state",
        "queue_state",
        "queueState",
        "prediction_status",
        "predictionStatus",
    )

    def walk(node: Any) -> tuple[str | None, str | None]:
        if isinstance(node, dict):
            for key in preferred_keys:
                value = node.get(key)
                if isinstance(value, str) and value.strip():
                    return value.strip(), key
            for value in node.values():
                found = walk(value)
                if found[0]:
                    return found
        elif isinstance(node, list):
            for item in node:
                found = walk(item)
                if found[0]:
                    return found
        return None, None

    return walk(payload)


def fetch_generation(
    base_url: str,
    generation_id: str,
    api_key: str,
    request_timeout: float,
) -> tuple[int, Any]:
    url = f"{base_url.rstrip('/')}/generation/{generation_id}"
    req = request.Request(
        url,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/json",
        },
        method="GET",
    )
    try:
        with request.urlopen(req, timeout=request_timeout) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
            return resp.getcode(), parse_json_or_text(raw)
    except error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="replace")
        return exc.code, parse_json_or_text(body)


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--generation-id", required=True, help="Generation id to poll")
    parser.add_argument("--base-url", default="https://open.skills.video/api/v1")
    parser.add_argument("--interval", type=float, default=3.0, help="Poll interval in seconds")
    parser.add_argument("--timeout", type=float, default=600.0, help="Total wait timeout in seconds")
    parser.add_argument(
        "--request-timeout",
        type=float,
        default=20.0,
        help="HTTP request timeout per poll in seconds",
    )
    args = parser.parse_args()

    api_key = os.environ.get("SKILLS_VIDEO_API_KEY", "").strip()
    if not api_key:
        emit(
            {
                "event": "error",
                "error": "missing_api_key",
                "message": "Set SKILLS_VIDEO_API_KEY before polling generation status.",
            }
        )
        return 1
    if args.interval <= 0:
        emit({"event": "error", "error": "invalid_interval", "message": "--interval must be > 0"})
        return 1
    if args.timeout <= 0:
        emit({"event": "error", "error": "invalid_timeout", "message": "--timeout must be > 0"})
        return 1

    start = time.monotonic()
    attempt = 0
    last_status: str | None = None
    while True:
        elapsed = time.monotonic() - start
        if elapsed > args.timeout:
            emit(
                {
                    "event": "timeout",
                    "generation_id": args.generation_id,
                    "elapsed_seconds": round(elapsed, 2),
                    "last_status": last_status,
                }
            )
            return 11
        attempt += 1
        http_status, payload = fetch_generation(
            base_url=args.base_url,
            generation_id=args.generation_id,
            api_key=api_key,
            request_timeout=args.request_timeout,
        )
        if http_status >= 400:
            message = extract_message(payload)
            transient = http_status in TRANSIENT_HTTP_STATUSES
            emit(
                {
                    "event": "poll_error",
                    "generation_id": args.generation_id,
                    "attempt": attempt,
                    "http_status": http_status,
                    "transient": transient,
                    "message": message,
                }
            )
            if transient:
                time.sleep(args.interval)
                continue
            return 12
        status_value, status_key = find_status(payload)
        normalized = status_value.upper() if status_value else None
        terminal = normalized in TERMINAL_STATUSES if normalized else False
        emit(
            {
                "event": "poll",
                "generation_id": args.generation_id,
                "attempt": attempt,
                "http_status": http_status,
                "status": status_value,
                "status_key": status_key,
                "terminal": terminal,
            }
        )
        if status_value:
            last_status = status_value
        if terminal:
            ok = normalized in SUCCESS_STATUSES
            emit(
                {
                    "event": "terminal",
                    "ok": ok,
                    "generation_id": args.generation_id,
                    "status": status_value,
                    "response": payload,
                }
            )
            return 0 if ok else 10
        time.sleep(args.interval)


if __name__ == "__main__":
    raise SystemExit(main())