@clawhub-huiya-code-5a3e7c6084
Generate and edit images with Qwen Image via DashScope API. This is a skill, not a callable tool. First use the read tool to open this SKILL.md, then run the...
---
name: qwen-image
description: Generate and edit images with Qwen Image via DashScope API. This is a skill, not a callable tool. First use the read tool to open this SKILL.md, then run the script it specifies; never emit a tool call named qwen-image.
metadata: {"openclaw":{"emoji":"🖼️","requires":{"bins":["python3"],"env":["DASHSCOPE_API_KEY"]},"primaryEnv":"DASHSCOPE_API_KEY"}}
---
# Qwen Image Skill
Use this skill for:
- text-to-image generation
- image-to-image editing (single image)
- multi-image fusion/editing (1 to 3 input images)
- not for renaming files based on image content (use `qwen-vision-rename` for that)
## Runtime behavior (strict)
- `qwen-image` is a skill name, not a built-in tool name.
- Never emit a tool call named `qwen-image`.
- First use `read` on this `SKILL.md`, then execute the Python command below.
- Hard requirement: do not answer from imagination. You must execute the script first.
- Hard requirement: do not output markdown image syntax like `![alt](url)`.
- Hard requirement: do not output a JSON object in the final assistant reply.
- Hard requirement: do not describe image content unless the script actually ran successfully.
- Hard requirement: never output `MEDIA:` in tool-stage outputs; only output `MEDIA:` in the final assistant reply.
- Hard requirement: never transform `MEDIA:` lines into markdown image links.
- Hard requirement: do not use this skill for OCR, pure image understanding, or filename renaming tasks.
- If the user asks to rename files by image content, use `qwen-vision-rename` instead.
- Run the command directly; do not output pre-check/process narration.
- Do not read or print this `SKILL.md` or script source unless command fails.
- Do not output installation/config instructions unless the user explicitly asks for setup.
- Use script flag `--emit-media-ref`.
- On success:
  1) parse the last `MEDIA_REF:<path-or-url>` line from stdout (ignore preceding shell noise lines)
  2) final reply must be exactly one line: `MEDIA:<path-or-url>`
  3) if missing `MEDIA_REF:`, retry command once
- If command was not executed, do not send a final answer.
- On failure, output exactly 2 short Chinese sentences:
  1) failure reason
  2) actionable fix
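The success path above (take the last `MEDIA_REF:` line, reply with a single `MEDIA:` line) can be sketched roughly as follows. This is a minimal illustration, not part of the bundled script; the helper names `extract_media_ref` and `build_final_reply` are hypothetical.

```python
from typing import Optional

MEDIA_REF_PREFIX = "MEDIA_REF:"


def extract_media_ref(stdout: str) -> Optional[str]:
    """Return the value of the last non-empty MEDIA_REF: line, or None."""
    # Walk backwards so shell noise printed before the handshake line is ignored.
    for line in reversed(stdout.splitlines()):
        line = line.strip()
        if line.startswith(MEDIA_REF_PREFIX):
            ref = line[len(MEDIA_REF_PREFIX):].strip()
            if ref:
                return ref
    return None


def build_final_reply(stdout: str) -> Optional[str]:
    """Return the one-line final reply, or None when a retry is needed."""
    ref = extract_media_ref(stdout)
    return f"MEDIA:{ref}" if ref else None
```

A `None` result corresponds to step 3: the command should be retried once before reporting a failure.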
## Setup
Install dependencies:
```bash
pip3 install -r {baseDir}/requirements.txt
```
Set API key:
```bash
export DASHSCOPE_API_KEY="your_api_key"
```
Optional region switch:
```bash
export DASHSCOPE_REGION="sg" # sg or bj
```
Or use `.env` (auto-loaded from current directory, then `{baseDir}`):
```bash
cat > .env <<'EOF'
DASHSCOPE_API_KEY=your_api_key
DASHSCOPE_REGION=sg
OPENCLAW_MEDIA_OUTBOUND_DIR=~/.openclaw/media/outbound
OPENCLAW_MEDIA_BASE_URL=
EOF
```
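As a rough illustration of the precedence the bundled script applies when loading `.env` files: a value is taken from `.env` only when the variable is unset or empty in the shell, and files are consulted in order (current directory, then `{baseDir}`). The function name `merge_env` is hypothetical, and the sketch works on plain dictionaries rather than `os.environ`.

```python
from typing import Dict, List


def merge_env(shell_env: Dict[str, str], dotenv_files: List[Dict[str, str]]) -> Dict[str, str]:
    """Fill unset or empty variables from .env dictionaries, in file order."""
    merged = dict(shell_env)  # never mutate the caller's mapping
    for values in dotenv_files:
        for key, value in values.items():
            # A non-empty value that is already present wins over later sources.
            if not merged.get(key, "").strip():
                merged[key] = value
    return merged
```

One consequence worth noting: an empty entry such as `OPENCLAW_MEDIA_BASE_URL=` in the first file does not block a later file from supplying a value.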
Static URL mapping example (Nginx):
```nginx
location /gen/ {
    alias /home/huiya/.openclaw/media/outbound/;
    autoindex off;
}
```
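With a mapping like the one above, a file in the outbound directory corresponds to a URL under the public base. The sketch below shows that correspondence under the assumption that `OPENCLAW_MEDIA_BASE_URL` points at the `/gen` location; `public_url_for` is a hypothetical helper name, and POSIX-style paths are assumed.

```python
from pathlib import PurePosixPath


def public_url_for(published_file: str, publish_dir: str, base_url: str) -> str:
    """Map a file inside publish_dir to its URL under base_url."""
    # relative_to raises ValueError when the file is outside publish_dir,
    # which prevents building URLs for unrelated paths.
    rel = PurePosixPath(published_file).relative_to(PurePosixPath(publish_dir))
    return f"{base_url.rstrip('/')}/{rel.as_posix()}"
```

For example, `public_url_for("/home/huiya/.openclaw/media/outbound/a.png", "/home/huiya/.openclaw/media/outbound", "https://example.com/gen")` yields `https://example.com/gen/a.png`.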
## Commands
Text to image:
```bash
python3 {baseDir}/scripts/qwen_image.py text2img \
--prompt "A futuristic tea shop in Shanghai at night, cinematic lighting" \
--model qwen-image-2.0-pro \
--size "1024*1024" \
--n 1 \
--emit-media-ref \
--publish-dir ~/.openclaw/media/outbound \
--out-dir {baseDir}/tmp/qwen-image
```
Image to image:
```bash
python3 {baseDir}/scripts/qwen_image.py img2img \
--images ./input.png \
--prompt "Keep composition, convert this to watercolor style" \
--model qwen-image-2.0-pro \
--n 1 \
--emit-media-ref \
--publish-dir ~/.openclaw/media/outbound \
--out-dir {baseDir}/tmp/qwen-image
```
## Notes
- Recommended default: `qwen-image-2.0-pro` (quality first). `qwen-image-2.0` can be used for faster/cheaper runs.
- Input images can be local paths, public URLs, or `data:image/...;base64,...`.
- Returned image URLs are temporary. The script downloads images immediately to `--out-dir`.
- Published images are copied to `OPENCLAW_MEDIA_OUTBOUND_DIR` (default: `~/.openclaw/media/outbound`).
- The script also writes `.view.html` preview pages and uses those URLs in visible text to reduce markdown-image rewrites.
- `OPENCLAW_MEDIA_BASE_URL` is optional. Keep it empty for portable packaging; set it per deployment only when you need public links (e.g. `https://example.com/gen` or local `http://127.0.0.1:8090`).
- `--emit-media-ref` + final one-line `MEDIA:` reply is recommended for Feishu to avoid duplicate media sends.
- If you specifically need plain text URL in Control UI, use `--reply-format link`.
- Non-empty shell environment variables override `.env` values; variables that are unset or empty are filled from `.env`.
- If endpoint is not explicitly set, the script auto-retries once with the other region endpoint when receiving `InvalidApiKey`.
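The region fallback described in the last note can be sketched as below. This is a simplified illustration rather than the script's actual code: `InvalidApiKey` is a hypothetical exception standing in for a 401 response with code `InvalidApiKey`, and `call` is any function that performs the request against a given endpoint.

```python
from typing import Callable, List, Tuple

ENDPOINTS = {
    "sg": "https://dashscope-intl.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation",
    "bj": "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation",
}


class InvalidApiKey(Exception):
    """Hypothetical stand-in for a 401 response with code InvalidApiKey."""


def endpoints_to_try(region: str, explicit_endpoint: str = "") -> List[str]:
    # A pinned endpoint disables the fallback entirely.
    if explicit_endpoint:
        return [explicit_endpoint]
    other = "bj" if region == "sg" else "sg"
    return [ENDPOINTS[region], ENDPOINTS[other]]


def call_with_region_fallback(
    call: Callable[[str], str],
    region: str,
    explicit_endpoint: str = "",
) -> Tuple[str, str]:
    """Return (endpoint_used, result); retry once on InvalidApiKey."""
    candidates = endpoints_to_try(region, explicit_endpoint)
    for idx, endpoint in enumerate(candidates):
        try:
            return endpoint, call(endpoint)
        except InvalidApiKey:
            # Only the first failure is retried; the last one propagates.
            if idx == len(candidates) - 1:
                raise
    raise RuntimeError("no endpoint candidates")
```

Errors other than `InvalidApiKey` are deliberately not retried, since switching regions would not help with them.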
FILE:requirements.txt
requests>=2.31.0
Pillow>=10.0.0
FILE:scripts/qwen_image.py
#!/usr/bin/env python3
"""
Qwen Image CLI

Supports:
- text2img: prompt -> image(s)
- img2img: input image(s) + prompt -> image(s)
"""
# Postpone annotation evaluation so hints such as `str | None` and
# `list[Path]` also load on Python versions older than 3.10.
from __future__ import annotations

import argparse
import base64
import datetime as dt
import io
import json
import mimetypes
import os
import re
import shutil
import sys
from pathlib import Path

import requests

try:
    from PIL import Image
except Exception:
    # Pillow is optional; inline previews are skipped when it is missing.
    Image = None

ENDPOINTS = {
    "sg": "https://dashscope-intl.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation",
    "bj": "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation",
}


class ApiError(RuntimeError):
    def __init__(self, status_code: int, code: str, message: str, response_text: str):
        self.status_code = status_code
        self.code = code
        self.message = message
        self.response_text = response_text
        super().__init__(f"API error {status_code} [{code}]: {message}")
def load_dotenv_files(paths: list[Path]) -> None:
    key_pattern = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
    for path in paths:
        if not path.is_file():
            continue
        for raw_line in path.read_text(encoding="utf-8").splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("export "):
                line = line[len("export ") :].strip()
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip()
            if not key_pattern.match(key):
                continue
            quoted = len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"')
            if not quoted and " #" in value:
                value = value.split(" #", 1)[0].rstrip()
            elif quoted:
                value = value[1:-1]
            # Keep explicit shell environment highest priority, but allow
            # .env to fill variables that are present with empty values.
            if key not in os.environ or not os.environ.get(key, "").strip():
                os.environ[key] = value


def resolve_dotenv_paths() -> list[Path]:
    candidates = [
        Path.cwd() / ".env",
        Path(__file__).resolve().parent.parent / ".env",
    ]
    dedup = []
    seen = set()
    for candidate in candidates:
        resolved = str(candidate.resolve())
        if resolved in seen:
            continue
        seen.add(resolved)
        dedup.append(candidate)
    return dedup
def encode_local_image(path_str: str) -> str:
    path = Path(path_str).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(f"Image file not found: {path}")
    mime, _ = mimetypes.guess_type(str(path))
    if not mime:
        mime = "image/png"
    encoded = base64.b64encode(path.read_bytes()).decode("utf-8")
    return f"data:{mime};base64,{encoded}"


def normalize_image_input(value: str) -> str:
    val = value.strip()
    lower = val.lower()
    # URLs and data URLs are passed through; anything else is treated as a local path.
    if lower.startswith("http://") or lower.startswith("https://") or lower.startswith("data:image/"):
        return val
    return encode_local_image(val)
def call_qwen_api(
    endpoint: str,
    api_key: str,
    model: str,
    content: list,
    n: int,
    size: str,
    negative_prompt: str,
    prompt_extend: bool,
    watermark: bool,
) -> tuple[dict, list[str]]:
    payload = {
        "model": model,
        "input": {
            "messages": [
                {
                    "role": "user",
                    "content": content,
                }
            ]
        },
        "parameters": {
            "n": n,
            "negative_prompt": negative_prompt,
            "prompt_extend": prompt_extend,
            "watermark": watermark,
        },
    }
    if size:
        payload["parameters"]["size"] = size
    response = requests.post(
        endpoint,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=600,
    )
    if response.status_code != 200:
        code = "UnknownError"
        message = response.text
        try:
            err = response.json()
            code = err.get("code", code)
            message = err.get("message", message)
        except ValueError:
            pass
        raise ApiError(
            status_code=response.status_code,
            code=code,
            message=message,
            response_text=response.text,
        )
    data = response.json()
    choices = data.get("output", {}).get("choices", [])
    if not choices:
        raise RuntimeError(f"Unexpected response: {json.dumps(data, ensure_ascii=False)[:1000]}")
    items = choices[0].get("message", {}).get("content", [])
    urls = [item["image"] for item in items if isinstance(item, dict) and "image" in item]
    if not urls:
        raise RuntimeError(f"No image URL in response: {json.dumps(data, ensure_ascii=False)[:1000]}")
    return data, urls
def download_images(urls: list[str], out_dir: Path) -> list[str]:
    out_dir.mkdir(parents=True, exist_ok=True)
    timestamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    files = []
    for idx, url in enumerate(urls, start=1):
        path = out_dir / f"qwen-{timestamp}-{idx:02d}.png"
        resp = requests.get(url, timeout=300)
        resp.raise_for_status()
        path.write_bytes(resp.content)
        files.append(str(path))
    return files


def publish_images(
    files: list[str],
    urls: list[str],
    publish_dir: Path,
    public_base_url: str | None,
) -> tuple[list[str], list[str]]:
    """
    Publish images into a stable outbound directory and optionally map to public URLs.
    - If local files exist, copy them into publish_dir.
    - If no local files exist, fetch from remote URLs and save into publish_dir.
    """
    publish_dir.mkdir(parents=True, exist_ok=True)
    timestamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    published_files: list[str] = []
    if files:
        for idx, src in enumerate(files, start=1):
            src_path = Path(src).expanduser().resolve()
            suffix = src_path.suffix or ".png"
            dst = publish_dir / f"qwen-{timestamp}-{idx:02d}{suffix}"
            shutil.copy2(src_path, dst)
            published_files.append(str(dst))
    else:
        for idx, url in enumerate(urls, start=1):
            dst = publish_dir / f"qwen-{timestamp}-{idx:02d}.png"
            resp = requests.get(url, timeout=300)
            resp.raise_for_status()
            dst.write_bytes(resp.content)
            published_files.append(str(dst))
    published_urls: list[str] = []
    if public_base_url:
        base = public_base_url.strip().rstrip("/")
        if base:
            for item in published_files:
                rel = Path(item).resolve().relative_to(publish_dir.resolve()).as_posix()
                published_urls.append(f"{base}/{rel}")
    return published_files, published_urls
def create_view_pages(
    published_files: list[str],
    publish_dir: Path,
    public_base_url: str | None,
) -> tuple[list[str], list[str]]:
    """
    Create lightweight HTML viewer pages for generated images.
    Why:
    - Some assistant models rewrite direct image URLs into markdown image syntax.
    - Control UI flattens remote markdown images into plain alt text ("image").
    - Returning an HTML viewer URL in text is more stable for visible/clickable links.
    """
    view_files: list[str] = []
    view_urls: list[str] = []
    base = (public_base_url or "").strip().rstrip("/")
    publish_dir_resolved = publish_dir.resolve()
    for item in published_files:
        image_path = Path(item).expanduser().resolve()
        image_name = image_path.name
        view_name = f"{image_path.stem}.view.html"
        view_path = publish_dir_resolved / view_name
        html = (
            "<!doctype html>\n"
            "<html lang=\"zh-CN\">\n"
            "<head>\n"
            "  <meta charset=\"utf-8\" />\n"
            "  <meta name=\"viewport\" content=\"width=device-width, initial-scale=1\" />\n"
            "  <title>Qwen Image Preview</title>\n"
            "  <style>\n"
            "    body { margin: 0; background: #111; color: #ddd; font-family: sans-serif; }\n"
            "    .wrap { max-width: 980px; margin: 0 auto; padding: 16px; }\n"
            "    img { width: 100%; height: auto; border-radius: 8px; display: block; }\n"
            "    .meta { margin-top: 10px; font-size: 12px; color: #aaa; word-break: break-all; }\n"
            "  </style>\n"
            "</head>\n"
            "<body>\n"
            "  <div class=\"wrap\">\n"
            f"    <img src=\"{image_name}\" alt=\"qwen-generated-image\" />\n"
            f"    <div class=\"meta\">{image_name}</div>\n"
            "  </div>\n"
            "</body>\n"
            "</html>\n"
        )
        view_path.write_text(html, encoding="utf-8")
        view_files.append(str(view_path))
        if base:
            view_urls.append(f"{base}/{view_name}")
    return view_files, view_urls
def build_preview_data_url_from_bytes(
    image_bytes: bytes,
    max_chars: int = 4000,
    max_side: int = 192,
    quality: int = 40,
) -> str | None:
    """Build a small data:image URL for inline rendering in Control UI."""
    if Image is None:
        return None
    side_candidates = [max_side, 384, 320, 256, 224, 192, 160, 128, 96, 72]
    quality_candidates = [quality, 60, 50, 40, 35, 30]
    try:
        with Image.open(io.BytesIO(image_bytes)) as src:
            src = src.convert("RGB")
            best_data_url = None
            # Try each size/quality pair and return the first result that fits.
            for side in side_candidates:
                if side < 64:
                    continue
                for q in quality_candidates:
                    img = src.copy()
                    img.thumbnail((side, side))
                    buf = io.BytesIO()
                    img.save(buf, format="JPEG", quality=q, optimize=True)
                    payload = base64.b64encode(buf.getvalue()).decode("ascii")
                    data_url = f"data:image/jpeg;base64,{payload}"
                    best_data_url = data_url
                    if len(data_url) <= max_chars:
                        return data_url
            return best_data_url if best_data_url and len(best_data_url) <= max_chars else None
    except Exception:
        return None


def fetch_url_bytes(url: str) -> bytes:
    resp = requests.get(url, timeout=180)
    resp.raise_for_status()
    return resp.content
def to_preferred_media_ref(path_or_url: str, base_dir: Path | None = None) -> str:
    value = path_or_url.strip()
    lowered = value.lower()
    if lowered.startswith("http://") or lowered.startswith("https://"):
        return value
    root = (base_dir or Path.cwd()).expanduser().resolve()
    path = Path(value).expanduser()
    if not path.is_absolute():
        path = (root / path)
    try:
        path = path.resolve()
    except Exception:
        path = path.absolute()
    try:
        rel = path.relative_to(root).as_posix()
        return f"./{rel}" if not rel.startswith(".") else rel
    except Exception:
        return str(path)


def choose_reply_media_refs(
    urls: list[str],
    files: list[str],
    base_dir: Path | None = None,
    preferred_urls: list[str] | None = None,
) -> list[str]:
    if preferred_urls:
        return [item.strip() for item in preferred_urls if item and item.strip()]
    # Prefer downloaded local files so media loading does not depend on temporary URLs.
    if files:
        return [to_preferred_media_ref(item, base_dir=base_dir) for item in files]
    return [to_preferred_media_ref(item, base_dir=base_dir) for item in urls]


def choose_channel_media_refs(urls: list[str], files: list[str]) -> list[str]:
    """
    Prefer absolute local file paths for outbound media delivery.
    Why:
    - Private HTTP URLs can be blocked by SSRF protection in media fetch path.
    - Local absolute paths work with OpenClaw media pipeline when local roots allow them.
    """
    if files:
        refs: list[str] = []
        for item in files:
            path = Path(item).expanduser()
            try:
                refs.append(str(path.resolve()))
            except Exception:
                refs.append(str(path.absolute()))
        return refs
    return [item.strip() for item in urls if item and item.strip()]


def validate_preview_options(max_side: int, quality: int, max_chars: int) -> None:
    if max_side < 64:
        raise SystemExit("--preview-max-side must be >= 64")
    if quality < 20 or quality > 95:
        raise SystemExit("--preview-quality must be in [20, 95]")
    if max_chars < 1000:
        raise SystemExit("--preview-max-chars must be >= 1000")
def add_common_args(parser: argparse.ArgumentParser) -> None:
    parser.add_argument("--model", default="qwen-image-2.0-pro", help="e.g. qwen-image-2.0 or qwen-image-2.0-pro")
    parser.add_argument("--n", type=int, default=1, help="number of output images, range [1,6]")
    parser.add_argument("--size", default="", help='optional size like "1024*1024"')
    parser.add_argument("--negative-prompt", default=" ", help="negative prompt")
    parser.add_argument("--no-prompt-extend", action="store_true", help="disable prompt extension")
    parser.add_argument("--watermark", action="store_true", help="enable output watermark")
    parser.add_argument("--out-dir", default="./tmp/qwen-image", help="directory to save downloaded images")
    parser.add_argument(
        "--publish-dir",
        default=os.getenv("OPENCLAW_MEDIA_OUTBOUND_DIR", "~/.openclaw/media/outbound"),
        help="directory to publish final images for outbound media delivery",
    )
    parser.add_argument(
        "--public-base-url",
        default=os.getenv("OPENCLAW_MEDIA_BASE_URL", ""),
        help="public base URL mapped to --publish-dir, e.g. https://example.com/gen",
    )
    parser.add_argument(
        "--no-publish-outbound",
        action="store_true",
        help="skip publishing images into --publish-dir",
    )
    parser.add_argument("--no-download", action="store_true", help="do not download generated images")
    parser.add_argument(
        "--preview-inline",
        action="store_true",
        help="include a small preview_data_url (data:image/jpeg;base64,...) for inline UI rendering",
    )
    parser.add_argument("--preview-max-side", type=int, default=192, help="max side length for preview image")
    parser.add_argument("--preview-quality", type=int, default=40, help="jpeg quality for preview image")
    parser.add_argument(
        "--preview-max-chars",
        type=int,
        default=4000,
        help="max characters for preview_data_url to avoid chat truncation",
    )
    parser.add_argument(
        "--emit-openclaw-reply",
        action="store_true",
        help="print final 2-line OpenClaw reply (Chinese text + markdown image URL)",
    )
    parser.add_argument(
        "--reply-file",
        default="",
        help="optional file path to store final OpenClaw reply text when --emit-openclaw-reply is set",
    )
    parser.add_argument(
        "--reply-format",
        choices=["media", "markdown", "payload", "link"],
        default="markdown",
        help="OpenClaw reply format when --emit-openclaw-reply is enabled",
    )
    parser.add_argument(
        "--emit-media-ref",
        action="store_true",
        help="print MEDIA_REF:<path-or-url> for skill parsing without emitting MEDIA: in tool output",
    )


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Qwen image text2img / img2img CLI")
    parser.add_argument("--region", choices=["sg", "bj"], default=os.getenv("DASHSCOPE_REGION", "sg"))
    parser.add_argument("--endpoint", default=os.getenv("DASHSCOPE_ENDPOINT", ""))
    subparsers = parser.add_subparsers(dest="mode", required=True)
    t2i = subparsers.add_parser("text2img", help="generate images from text")
    t2i.add_argument("--prompt", required=True, help="generation prompt")
    add_common_args(t2i)
    i2i = subparsers.add_parser("img2img", help="edit/fuse images with text prompt")
    i2i.add_argument("--images", nargs="+", required=True, help="1~3 image inputs: path/url/base64")
    i2i.add_argument("--prompt", required=True, help="editing prompt")
    add_common_args(i2i)
    return parser
def main() -> int:
    load_dotenv_files(resolve_dotenv_paths())
    parser = build_parser()
    args = parser.parse_args()
    api_key = os.getenv("DASHSCOPE_API_KEY", "").strip()
    if not api_key:
        raise SystemExit("Missing DASHSCOPE_API_KEY")
    endpoint = args.endpoint or ENDPOINTS[args.region]
    endpoint_from_env = bool(os.getenv("DASHSCOPE_ENDPOINT", "").strip())
    endpoint_explicit = bool(args.endpoint or endpoint_from_env)
    if args.n < 1 or args.n > 6:
        raise SystemExit("--n must be in [1, 6]")
    if args.mode == "text2img":
        content = [{"text": args.prompt}]
    else:
        if len(args.images) < 1 or len(args.images) > 3:
            raise SystemExit("img2img requires 1~3 images")
        content = [{"image": normalize_image_input(item)} for item in args.images]
        content.append({"text": args.prompt})
    attempted_endpoints = [endpoint]
    # If endpoint is not explicitly pinned, auto-fallback once for region/key mismatch.
    if not endpoint_explicit:
        alt = ENDPOINTS["bj"] if endpoint == ENDPOINTS["sg"] else ENDPOINTS["sg"]
        attempted_endpoints.append(alt)
    last_error = None
    data = None
    urls = None
    used_endpoint = endpoint
    for idx, ep in enumerate(attempted_endpoints):
        try:
            data, urls = call_qwen_api(
                endpoint=ep,
                api_key=api_key,
                model=args.model,
                content=content,
                n=args.n,
                size=args.size,
                negative_prompt=args.negative_prompt,
                prompt_extend=not args.no_prompt_extend,
                watermark=args.watermark,
            )
            used_endpoint = ep
            if idx > 0:
                print(
                    "Notice: initial endpoint auth failed; switched region endpoint automatically.",
                    file=sys.stderr,
                )
            break
        except ApiError as err:
            last_error = err
            can_retry = (
                idx == 0
                and len(attempted_endpoints) > 1
                and err.status_code == 401
                and err.code == "InvalidApiKey"
            )
            if can_retry:
                continue
            if err.status_code == 401 and err.code == "InvalidApiKey":
                raise SystemExit(
                    "Authentication failed: InvalidApiKey.\n"
                    "Check DASHSCOPE_API_KEY and endpoint/region match.\n"
                    "Try one of these:\n"
                    "1) export DASHSCOPE_REGION=bj (mainland endpoint)\n"
                    "2) export DASHSCOPE_REGION=sg (international endpoint)\n"
                    "3) unset DASHSCOPE_API_KEY and rely on .env value\n"
                    "4) regenerate key from Model Studio and update .env"
                ) from err
            raise
    if data is None or urls is None:
        if last_error is not None:
            raise last_error
        raise SystemExit("Unknown error: no API response")
    out_dir = Path(args.out_dir).expanduser()
    files = []
    if not args.no_download:
        files = download_images(urls, out_dir)
    published_files: list[str] = []
    published_urls: list[str] = []
    published_view_files: list[str] = []
    published_view_urls: list[str] = []
    if not args.no_publish_outbound:
        published_files, published_urls = publish_images(
            files=files,
            urls=urls,
            publish_dir=Path(args.publish_dir).expanduser(),
            public_base_url=args.public_base_url.strip() or None,
        )
        published_view_files, published_view_urls = create_view_pages(
            published_files=published_files,
            publish_dir=Path(args.publish_dir).expanduser(),
            public_base_url=args.public_base_url.strip() or None,
        )
    preview_data_url = None
    if args.preview_inline:
        validate_preview_options(
            max_side=args.preview_max_side,
            quality=args.preview_quality,
            max_chars=args.preview_max_chars,
        )
        try:
            if files:
                image_bytes = Path(files[0]).read_bytes()
            else:
                image_bytes = fetch_url_bytes(urls[0])
            preview_data_url = build_preview_data_url_from_bytes(
                image_bytes=image_bytes,
                max_chars=args.preview_max_chars,
                max_side=args.preview_max_side,
                quality=args.preview_quality,
            )
        except Exception:
            preview_data_url = None
    result = {
        "request_id": data.get("request_id"),
        "model": args.model,
        "endpoint": used_endpoint,
        "urls": urls,
        "files": files,
        "published_files": published_files,
        "published_urls": published_urls,
        "published_view_files": published_view_files,
        "published_view_urls": published_view_urls,
        "preview_data_url": preview_data_url,
    }
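    # With --no-download the download step never creates out_dir, so make
    # sure it exists before writing result.json.
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "result.json").write_text(
        json.dumps(result, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )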
    media_refs = choose_reply_media_refs(
        urls=urls,
        files=published_files or files,
        base_dir=Path.cwd(),
        preferred_urls=published_urls,
    )
    channel_media_refs = choose_channel_media_refs(
        urls=urls,
        files=published_files or files,
    )
    if args.emit_media_ref:
        primary_ref = channel_media_refs[0] if channel_media_refs else urls[0]
        # Skill-level handshake line: parse this and emit MEDIA only in final assistant reply.
        print(f"MEDIA_REF:{primary_ref}")
    if args.emit_openclaw_reply:
        if args.reply_format == "payload":
            primary_media = channel_media_refs[0] if channel_media_refs else urls[0]
            primary_text_link = (
                published_view_urls[0]
                if published_view_urls
                else primary_media
            )
            payload_text = "\n".join(["已为你生成图片。", f"链接(页面):{primary_text_link}"])
            payload = {
                "text": payload_text,
                "mediaUrl": primary_media,
            }
            if len(channel_media_refs) > 1:
                payload["mediaUrls"] = channel_media_refs
            reply_text = json.dumps(payload, ensure_ascii=False)
        elif args.reply_format == "link":
            primary_text_link = (
                published_view_urls[0]
                if published_view_urls
                else (
                    to_preferred_media_ref(published_view_files[0], base_dir=Path.cwd())
                    if published_view_files
                    else (media_refs[0] if media_refs else urls[0])
                )
            )
            # Text-only mode: avoid MEDIA directives so Control UI will not append "image/图片".
            reply_text = "\n".join(["已为你生成图片。", f"链接(页面):{primary_text_link}"])
        elif args.reply_format == "media":
            media_lines = (
                [f"MEDIA:{ref}" for ref in channel_media_refs]
                if channel_media_refs
                else [f"MEDIA:{urls[0]}"]
            )
            # Media-first mode for chat channels (e.g. Feishu): send attachment directly.
            reply_text = "\n".join(media_lines)
        else:
            if media_refs and (media_refs[0].startswith("http://") or media_refs[0].startswith("https://")):
                # Markdown image line; the alt text "image" is an assumed placeholder.
                reply_text = f"![image]({media_refs[0]})"
            else:
                # Control UI chat markdown image may fallback to data:image URLs.
                inline_data_url = preview_data_url
                if inline_data_url is None:
                    validate_preview_options(
                        max_side=args.preview_max_side,
                        quality=args.preview_quality,
                        max_chars=args.preview_max_chars,
                    )
                    try:
                        if files:
                            image_bytes = Path(files[0]).read_bytes()
                        else:
                            image_bytes = fetch_url_bytes(urls[0])
                        inline_data_url = build_preview_data_url_from_bytes(
                            image_bytes=image_bytes,
                            max_chars=args.preview_max_chars,
                            max_side=args.preview_max_side,
                            quality=args.preview_quality,
                        )
                    except Exception:
                        inline_data_url = None
                if inline_data_url:
                    # Keep a single markdown image line to minimize model rewrites.
                    # The alt text "image" is an assumed placeholder.
                    reply_text = f"![image]({inline_data_url})"
                else:
                    media_lines = [f"MEDIA:{ref}" for ref in media_refs] if media_refs else [f"MEDIA:{urls[0]}"]
                    reply_text = "\n".join(["已为你生成图片。", *media_lines])
        reply_path = Path(args.reply_file).expanduser() if args.reply_file else (out_dir / "reply.txt")
        reply_path.parent.mkdir(parents=True, exist_ok=True)
        reply_path.write_text(reply_text, encoding="utf-8")
        print(f"REPLY_FILE:{reply_path}")
    elif not args.emit_media_ref:
        print(json.dumps(result, ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
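Batch-rename local image files based on image content. This is a skill, not a callable tool; first use read to open this file, then run the script command, and never emit a tool call named qwen-vision-rename. This skill must be used whenever the user mentions 改名/重命名/按图片内容命名/整理图片文件名/整理图片 (rename, name by image content, tidy image filenames, organize images). By default the rename is applied directly; only when the user explicitly...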
---
name: qwen-vision-rename
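description: Batch-rename local image files based on image content. This is a skill, not a callable tool; first use read to open this file, then run the script command, and never emit a tool call named qwen-vision-rename. This skill must be used whenever the user mentions 改名/重命名/按图片内容命名/整理图片文件名/整理图片 (rename, name by image content, tidy image filenames, organize images). By default the rename is applied directly; produce a plan without changing files only when the user explicitly asks for 预览/试运行/dry-run (preview, trial run).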
metadata: {"openclaw":{"emoji":"🧭","requires":{"bins":["python3"],"env":["DASHSCOPE_API_KEY"]},"primaryEnv":"DASHSCOPE_API_KEY"}}
---
# Qwen Vision Rename Skill
Use this skill for:
- understanding image content (single image)
- batch generating content-based filenames for local image folders
- direct batch rename with rollback support
- naming pattern `类型-主题` (type-subject; example: `邀请函-万人大会主视觉`, i.e. "invitation - main visual for the ten-thousand-person conference")
## Runtime behavior (strict)
- `qwen-vision-rename` is a skill name, not a built-in tool name.
- First use the `read` tool to open this `SKILL.md`, then run the Python command below.
- Never emit a tool call named `qwen-vision-rename`.
- Always execute the script. Do not fabricate recognition results.
- For rename requests, default to direct execution: run `rename-dir --apply`.
- If the user explicitly says "预览/试运行/dry-run/先看方案" (preview / trial run / dry-run / show the plan first), run without `--apply`.
- Do not call `qwen-image` for rename tasks.
- Requests like "整理图片/按内容分类整理" (organize images / sort by content) still map to this skill. This skill renames by content first; if the user explicitly asks to move files into folders, explain that separately.
- If the user does not provide a folder path, run `rename-dir` without `--dir` and let the script auto-select the default image directory.
- Filenames should follow the `类型-主题` (type-subject) pattern rather than a bare title.
- If `--apply` is used, return both `plan_file` and `rollback_file`.
- If the command fails, explain the failure in 2 short Chinese sentences and provide the next fix.
## Setup
Install dependencies:
```bash
pip3 install -r {baseDir}/requirements.txt
```
Set API key:
```bash
export DASHSCOPE_API_KEY="your_api_key"
```
Optional model and endpoint:
```bash
export DASHSCOPE_BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"
export DASHSCOPE_VISION_MODEL="qwen-vl-max-latest"
```
## Commands
Describe one image:
```bash
python3 {baseDir}/scripts/vision_rename.py describe \
--image /path/to/image.jpg
```
Batch dry-run plan (recommended first step):
```bash
python3 {baseDir}/scripts/vision_rename.py rename-dir \
--dir /path/to/images
```
Batch apply rename:
```bash
python3 {baseDir}/scripts/vision_rename.py rename-dir \
--dir /path/to/images \
--apply
```
Auto directory (no explicit `--dir`, script picks default image directory):
```bash
python3 {baseDir}/scripts/vision_rename.py rename-dir --apply
```
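How the default directory might be chosen can be sketched as follows, assuming the fallback order listed under Notes (`OPENCLAW_RENAME_DEFAULT_DIR`, then `~/图片`, then `~/Pictures`) and assuming the first candidate that exists as a directory wins. The function name `pick_default_image_dir` is hypothetical and the actual script may differ in detail.

```python
from pathlib import Path
from typing import List, Mapping, Optional


def pick_default_image_dir(env: Mapping[str, str], home: Path) -> Optional[Path]:
    """Return the first existing candidate directory, or None."""
    candidates: List[Path] = []
    configured = env.get("OPENCLAW_RENAME_DEFAULT_DIR", "").strip()
    if configured:
        candidates.append(Path(configured).expanduser())
    # Localized picture folder first, then the English default.
    candidates.append(home / "图片")
    candidates.append(home / "Pictures")
    for candidate in candidates:
        if candidate.is_dir():
            return candidate
    return None
```

Passing `os.environ` and `Path.home()` would reproduce the behavior for the current user; taking them as parameters keeps the sketch easy to test.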
Rollback by rollback file:
```bash
python3 {baseDir}/scripts/vision_rename.py rollback \
--rollback-file /path/to/rename-rollback-YYYYMMDD-HHMMSS.json
```
## Notes
- Supported image extensions: jpg, jpeg, png, webp, bmp, gif, tif, tiff, heic, heif.
- Default naming style: `类型-主题` (type-subject), for example `邀请函-活动主视觉.jpg`.
- Duplicate names auto-suffix with `-02`, `-03` ...
- `rename-dir` writes a JSON plan file every run.
- `--apply` creates rollback JSON for reverse operation.
- If `--dir` is omitted, script tries `OPENCLAW_RENAME_DEFAULT_DIR`, then `~/图片`, then `~/Pictures`.
- On API failure for a single image, the default behavior is to skip that file and continue.
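The duplicate-name rule above (`-02`, `-03`, ...) could look roughly like the sketch below. The helper name `unique_names` is hypothetical, and exact string comparison of filenames is an assumption; the real script may compare names differently or also check files already on disk.

```python
from pathlib import PurePath
from typing import Iterable, List, Set


def unique_names(names: Iterable[str]) -> List[str]:
    """Append -02, -03, ... to the stem of names that were already used."""
    used: Set[str] = set()
    result: List[str] = []
    for name in names:
        path = PurePath(name)
        candidate = name
        counter = 2
        # Keep bumping the suffix until the candidate is free.
        while candidate in used:
            candidate = f"{path.stem}-{counter:02d}{path.suffix}"
            counter += 1
        used.add(candidate)
        result.append(candidate)
    return result
```

The suffix is inserted before the extension, so `邀请函-活动主视觉.jpg` repeated twice becomes `邀请函-活动主视觉.jpg` and `邀请函-活动主视觉-02.jpg`.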
FILE:requirements.txt
requests>=2.27.1,<2.28; python_version<'3.7'
requests>=2.31.0; python_version>='3.7'
Pillow<9
FILE:scripts/vision_rename.py
#!/usr/bin/env python3
"""
Vision rename CLI (OpenAI-compatible multimodal API)

Use cases:
- describe a single image
- batch-generate filename titles for a directory
- apply renames with rollback file
"""
import argparse
import base64
import datetime as dt
import hashlib
import ipaddress
import json
import mimetypes
import os
import re
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import urlparse

import requests

try:
    from PIL import Image
except Exception:
    Image = None

DEFAULT_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
DEFAULT_MODEL = "qwen-vl-max-latest"
DEFAULT_MEDIA_OUTBOUND_DIR = "~/.openclaw/media/outbound"
OPENCLAW_CONFIG_PATH = Path.home() / ".openclaw" / "openclaw.json"
PUBLIC_MEDIA_BASE_URL_FILE = Path.home() / ".openclaw" / "media" / "public_base_url.txt"
# Chinese prompt sent to the model: identify the image type and subject and
# reply with strict JSON {"type": ..., "title": ...}; the title should be
# 4 to 12 Chinese characters with no punctuation, extension, or explanation.
DEFAULT_PROMPT = (
    "你是图片文件重命名助手。"
    "请识别图片类型和主题,并只输出严格 JSON:{\"type\":\"类型\",\"title\":\"主题\"}。"
    "type 示例:邀请函、海报、聊天截图、人物照、宠物照、风景照、商品图、证件照、菜单、其他。"
    "title 要具体,4到12个汉字,不要标点,不要扩展名,不要解释。"
    "不要输出 JSON 以外的任何内容。"
)
IMAGE_EXTENSIONS = {
    ".jpg",
    ".jpeg",
    ".png",
    ".webp",
    ".bmp",
    ".gif",
    ".tif",
    ".tiff",
    ".heic",
    ".heif",
}
PLACEHOLDER_API_KEYS = {
    "",
    "your_api_key",
    "your-api-key",
    "sk-your_api_key",
    "none",
    "null",
}
# None means "not loaded yet"; an empty dict means "loaded, but missing or unreadable".
_OPENCLAW_CONFIG_CACHE = None  # type: Optional[Dict[str, Any]]
def load_openclaw_config() -> Dict[str, Any]:
    global _OPENCLAW_CONFIG_CACHE
    if _OPENCLAW_CONFIG_CACHE is not None:
        return _OPENCLAW_CONFIG_CACHE
    try:
        _OPENCLAW_CONFIG_CACHE = json.loads(OPENCLAW_CONFIG_PATH.read_text(encoding="utf-8"))
    except Exception:
        _OPENCLAW_CONFIG_CACHE = {}
    return _OPENCLAW_CONFIG_CACHE


def get_openclaw_skill_entry(skill_name: str) -> Dict[str, Any]:
    config = load_openclaw_config()
    entries = config.get("skills", {}).get("entries", {})
    entry = entries.get(skill_name, {})
    return entry if isinstance(entry, dict) else {}


def get_openclaw_skill_env(skill_name: str) -> Dict[str, str]:
    entry = get_openclaw_skill_entry(skill_name)
    env = entry.get("env", {})
    return env if isinstance(env, dict) else {}


def first_non_empty(*values: Any) -> str:
    for value in values:
        if isinstance(value, str) and value.strip():
            return value.strip()
    return ""


def is_placeholder_api_key(value: str) -> bool:
    lowered = value.strip().lower()
    return lowered in PLACEHOLDER_API_KEYS or "your_api_key" in lowered


def resolve_runtime_api_key() -> str:
    # The environment variable wins; the OpenClaw config entry is the fallback.
    env_key = os.getenv("DASHSCOPE_API_KEY", "").strip()
    if env_key and not is_placeholder_api_key(env_key):
        return env_key
    configured = get_openclaw_skill_entry("qwen-vision-rename").get("apiKey", "")
    if isinstance(configured, str) and configured.strip() and not is_placeholder_api_key(configured):
        return configured.strip()
    return ""
def resolve_runtime_base_url(cli_value: str) -> str:
skill_env = get_openclaw_skill_env("qwen-vision-rename")
return first_non_empty(
cli_value,
os.getenv("DASHSCOPE_BASE_URL", ""),
os.getenv("OPENAI_BASE_URL", ""),
skill_env.get("DASHSCOPE_BASE_URL", ""),
DEFAULT_BASE_URL,
)
def resolve_runtime_model(cli_value: str) -> str:
skill_env = get_openclaw_skill_env("qwen-vision-rename")
return first_non_empty(
cli_value,
os.getenv("DASHSCOPE_VISION_MODEL", ""),
os.getenv("OPENCLAW_IMAGE_UNDERSTAND_MODEL", ""),
skill_env.get("DASHSCOPE_VISION_MODEL", ""),
DEFAULT_MODEL,
)
def resolve_image_mode() -> str:
skill_env = get_openclaw_skill_env("qwen-vision-rename")
mode = first_non_empty(
os.getenv("OPENCLAW_VISION_IMAGE_MODE", ""),
skill_env.get("OPENCLAW_VISION_IMAGE_MODE", ""),
).lower()
if mode in ("auto", "data", "url"):
return mode
return "auto"


def resolve_media_base_url() -> str:
    vision_env = get_openclaw_skill_env("qwen-vision-rename")
    image_env = get_openclaw_skill_env("qwen-image")
    file_value = ""
    try:
        if PUBLIC_MEDIA_BASE_URL_FILE.is_file():
            file_value = PUBLIC_MEDIA_BASE_URL_FILE.read_text(encoding="utf-8").strip()
    except Exception:
        file_value = ""
    return first_non_empty(
        os.getenv("OPENCLAW_VISION_IMAGE_BASE_URL", ""),
        os.getenv("OPENCLAW_MEDIA_BASE_URL", ""),
        vision_env.get("OPENCLAW_VISION_IMAGE_BASE_URL", ""),
        vision_env.get("OPENCLAW_MEDIA_BASE_URL", ""),
        image_env.get("OPENCLAW_MEDIA_BASE_URL", ""),
        file_value,
    )


def resolve_media_outbound_dir() -> Path:
    vision_env = get_openclaw_skill_env("qwen-vision-rename")
    image_env = get_openclaw_skill_env("qwen-image")
    raw = first_non_empty(
        os.getenv("OPENCLAW_VISION_IMAGE_PUBLISH_DIR", ""),
        os.getenv("OPENCLAW_MEDIA_OUTBOUND_DIR", ""),
        vision_env.get("OPENCLAW_VISION_IMAGE_PUBLISH_DIR", ""),
        vision_env.get("OPENCLAW_MEDIA_OUTBOUND_DIR", ""),
        image_env.get("OPENCLAW_MEDIA_OUTBOUND_DIR", ""),
        DEFAULT_MEDIA_OUTBOUND_DIR,
    )
    return Path(raw).expanduser()


def has_public_media_base_url(value: str) -> bool:
    if not value:
        return False
    parsed = urlparse(value)
    host = parsed.hostname or ""
    if parsed.scheme not in ("http", "https") or not host:
        return False
    if host == "localhost":
        return False
    try:
        ip = ipaddress.ip_address(host)
        return not (
            ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_reserved
            or ip.is_unspecified
        )
    except ValueError:
        return True


def endpoint_requires_public_image_url(base_url: str) -> bool:
    host = urlparse(base_url).hostname or ""
    return host in {"api.chipltech.com", "openapi.chipltech.com"}


def publish_local_image(path: Path) -> str:
    media_base_url = resolve_media_base_url()
    if not has_public_media_base_url(media_base_url):
        return ""
    source_path = path
    prepared_path = prepare_image_for_remote_fetch(path)
    if prepared_path is not None:
        source_path = prepared_path
    publish_root = resolve_media_outbound_dir() / "vision-input"
    publish_root.mkdir(parents=True, exist_ok=True)
    stat = source_path.stat()
    mtime_ns = getattr(stat, "st_mtime_ns", int(stat.st_mtime * 1000000000))
    digest_src = "{}:{}:{}".format(source_path.resolve(), stat.st_size, mtime_ns)
    digest = hashlib.sha1(digest_src.encode("utf-8")).hexdigest()[:10]
    stamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    suffix = source_path.suffix.lower() or ".bin"
    target_name = "vision-{}-{}{}".format(stamp, digest, suffix)
    target_path = publish_root / target_name
    if not target_path.exists():
        shutil.copy2(str(source_path), str(target_path))
    return media_base_url.rstrip("/") + "/vision-input/" + target_name


def prepare_image_for_remote_fetch(path: Path) -> Path:
    """Return a downscaled JPEG copy of a large image, or the original path otherwise."""
    if Image is None:
        # Pillow is unavailable, so the image is used as-is.
        return path
    try:
        with Image.open(str(path)) as img:
            img.load()
            width, height = img.size
            max_edge = max(width, height)
            file_size = path.stat().st_size
            # Small images are sent unchanged.
            if file_size <= 700 * 1024 and max_edge <= 1024:
                return path
            if img.mode not in ("RGB", "L"):
                # Flatten any transparency onto a white background before JPEG encoding.
                rgba = img.convert("RGBA")
                converted = Image.new("RGB", img.size, (255, 255, 255))
                converted.paste(rgba, mask=rgba.split()[-1])
                img = converted
            elif img.mode == "L":
                img = img.convert("RGB")
            else:
                img = img.copy()
            img.thumbnail((1024, 1024), Image.LANCZOS)
            cache_root = resolve_media_outbound_dir() / "vision-input-cache"
            cache_root.mkdir(parents=True, exist_ok=True)
            stat = path.stat()
            mtime_ns = getattr(stat, "st_mtime_ns", int(stat.st_mtime * 1000000000))
            # Cache key: resolved path, size, and mtime of the source file.
            digest_src = "{}:{}:{}".format(path.resolve(), stat.st_size, mtime_ns)
            digest = hashlib.sha1(digest_src.encode("utf-8")).hexdigest()[:16]
            optimized_path = cache_root / ("optimized-{}.jpg".format(digest))
            if not optimized_path.exists():
                img.save(str(optimized_path), format="JPEG", quality=82, optimize=True)
            return optimized_path
    except Exception:
        # Any decoding or I/O failure falls back to the original file.
        return path


def load_dotenv_files(paths: List[Path]) -> None:
    key_pattern = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
    for path in paths:
        if not path.is_file():
            continue
        for raw_line in path.read_text(encoding="utf-8").splitlines():
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if line.startswith("export "):
                line = line[len("export ") :].strip()
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip()
            if not key_pattern.match(key):
                continue
            quoted = len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"')
            if quoted:
                value = value[1:-1]
            elif " #" in value:
                value = value.split(" #", 1)[0].rstrip()
            if key not in os.environ or not os.environ.get(key, "").strip():
                os.environ[key] = value


def resolve_dotenv_paths() -> List[Path]:
    candidates = [
        Path.cwd() / ".env",
        Path(__file__).resolve().parent.parent / ".env",
    ]
    dedup = []  # type: List[Path]
    seen = set()  # type: Set[str]
    for candidate in candidates:
        resolved = str(candidate.resolve())
        if resolved in seen:
            continue
        seen.add(resolved)
        dedup.append(candidate)
    return dedup


def is_http_or_data_url(value: str) -> bool:
    lowered = value.lower().strip()
    return lowered.startswith("http://") or lowered.startswith("https://") or lowered.startswith("data:image/")


def local_image_to_data_url(path_str: str) -> str:
    path = Path(path_str).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(f"Image file not found: {path}")
    mime, _ = mimetypes.guess_type(str(path))
    if not mime:
        mime = "image/png"
    encoded = base64.b64encode(path.read_bytes()).decode("ascii")
    return f"data:{mime};base64,{encoded}"


def normalize_image_input(value: str, base_url: str) -> str:
    """Return an http(s) URL or data URL for the given image reference.

    Only the "url" image mode publishes the file; every other mode, including
    "auto", inlines it as a base64 data URL. `base_url` is not consulted here.
    """
    val = value.strip()
    if is_http_or_data_url(val):
        return val
    path = Path(val).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(f"Image file not found: {path}")
    prepared_path = prepare_image_for_remote_fetch(path)
    image_mode = resolve_image_mode()
    if image_mode == "url":
        published_url = publish_local_image(prepared_path)
        if published_url:
            return published_url
        # Publishing failed: distinguish a non-public base URL from a missing one.
        media_base_url = resolve_media_base_url()
        if media_base_url and not has_public_media_base_url(media_base_url):
            raise RuntimeError(
                "Configured OPENCLAW_MEDIA_BASE_URL is not publicly reachable: {}. "
                "Use a public http(s) URL or switch back to qwen-vl-max-latest.".format(media_base_url)
            )
        raise RuntimeError(
            "Current vision endpoint requires local images to be reachable by public URL. "
            "Configure OPENCLAW_MEDIA_BASE_URL and OPENCLAW_MEDIA_OUTBOUND_DIR, or switch back to qwen-vl-max-latest."
        )
    return local_image_to_data_url(str(prepared_path))


def parse_openai_content(content: Any) -> str:
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        parts = []  # type: List[str]
        for item in content:
            if not isinstance(item, dict):
                continue
            if item.get("type") == "text" and isinstance(item.get("text"), str):
                parts.append(item["text"])
        return "\n".join(parts).strip()
    return ""


def call_vision_api(
    *,
    base_url: str,
    api_key: str,
    model: str,
    image: str,
    prompt: str,
    timeout: int,
) -> str:
    """Send one image plus prompt to an OpenAI-compatible chat endpoint and return the text reply."""
    endpoint = base_url.rstrip("/") + "/chat/completions"
    payload = {
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": image}},
                ],
            }
        ],
        "temperature": 0,
    }
    resp = requests.post(
        endpoint,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=timeout,
    )
    if resp.status_code != 200:
        body = resp.text
        try:
            data = resp.json()
        except ValueError:
            raise RuntimeError(f"API error {resp.status_code}: {body}") from None
        # Error payloads vary by provider, so tolerate non-dict shapes.
        data = data if isinstance(data, dict) else {}
        err = data.get("error")
        err = err if isinstance(err, dict) else {}
        message = err.get("message") or data.get("message") or body
        code = err.get("code") or data.get("code") or "UnknownError"
        raise RuntimeError(f"API error {resp.status_code} [{code}]: {message}")
    try:
        data = resp.json()
    except ValueError:
        raise RuntimeError(f"Non-JSON response: {resp.text[:800]}") from None
    if not isinstance(data, dict):
        raise RuntimeError(f"Unexpected response: {json.dumps(data, ensure_ascii=False)[:800]}")
    # Some gateways return HTTP 200 with an error envelope instead of choices.
    if not data.get("choices") and ("code" in data or "msg" in data or "message" in data):
        code = data.get("code", "UnknownError")
        message = data.get("msg") or data.get("message") or json.dumps(data, ensure_ascii=False)[:800]
        raise RuntimeError("API error [{}]: {}".format(code, message))
    choices = data.get("choices")
    if not isinstance(choices, list) or not choices:
        raise RuntimeError(f"Unexpected response: {json.dumps(data, ensure_ascii=False)[:800]}")
    first = choices[0] if isinstance(choices[0], dict) else {}
    message = first.get("message")
    message = message if isinstance(message, dict) else {}
    text = parse_openai_content(message.get("content"))
    if not text:
        raise RuntimeError(f"No text content in response: {json.dumps(data, ensure_ascii=False)[:800]}")
    return text


def extract_title(raw_text: str, fallback: str) -> str:
    text = raw_text.strip()
    if not text:
        return fallback
    # Use the first non-empty line of the reply.
    line = ""
    for row in text.splitlines():
        row = row.strip()
        if row:
            line = row
            break
    if not line:
        line = text
    # Remove common wrappers.
    line = line.strip().strip("`\"' ")
    # Drop a leading label such as "Title:"; \uff1a is the full-width colon.
    line = re.sub(r"^(title|filename|name|image description|description)\s*[:\uff1a-]\s*", "", line, flags=re.I)
    # If the model returns a markdown list item or prefix markers.
    line = re.sub(r"^[\-\*\d\.\)\s]+", "", line)
    if not line:
        return fallback
    return line


def extract_type_and_title(raw_text: str, fallback: str) -> Tuple[str, str]:
    text = raw_text.strip()
    if not text:
        return "", fallback
    # Try JSON first: a fenced block, then the whole reply, then the first {...} span.
    candidates = [text]  # type: List[str]
    code_block = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", text, flags=re.I | re.S)
    if code_block:
        candidates.insert(0, code_block.group(1))
    first_obj = re.search(r"\{.*?\}", text, flags=re.S)
    if first_obj:
        candidates.append(first_obj.group(0))
    for candidate in candidates:
        try:
            data = json.loads(candidate)
            if not isinstance(data, dict):
                continue
            image_type = str(data.get("type", "")).strip()
            title = str(data.get("title", "")).strip()
            if image_type or title:
                return image_type, title or fallback
        except Exception:
            continue
    # Fall back to a "type - title" line; \uff1a is the full-width colon.
    line = extract_title(text, fallback)
    parts = re.split(r"\s*[-—_:\uff1a]\s*", line, maxsplit=1)
    if len(parts) == 2 and parts[0].strip() and parts[1].strip():
        return parts[0].strip(), parts[1].strip()
    return "", line


def sanitize_filename_stem(value: str, *, max_chars: int) -> str:
    stem = value
    stem = re.sub(r"[\\/:*?\"<>|\r\n\t]", " ", stem)
    stem = re.sub(r"\s+", " ", stem).strip(" ._-")
    if not stem:
        stem = "untitled"
    if max_chars > 0 and len(stem) > max_chars:
        stem = stem[:max_chars].rstrip(" ._-")
    if not stem:
        stem = "untitled"
    return stem


def compose_stem(image_type: str, title: str, fallback: str, max_stem_chars: int) -> str:
    type_clean = sanitize_filename_stem(image_type, max_chars=10) if image_type else ""
    title_clean = sanitize_filename_stem(title, max_chars=max_stem_chars) if title else ""
    if type_clean and title_clean:
        stem = f"{type_clean}-{title_clean}"
    elif title_clean:
        stem = title_clean
    elif type_clean:
        stem = type_clean
    else:
        stem = sanitize_filename_stem(fallback, max_chars=max_stem_chars)
    return sanitize_filename_stem(stem, max_chars=max_stem_chars)


def collect_images(directory: Path, recursive: bool) -> List[Path]:
    if recursive:
        items = [p for p in directory.rglob("*") if p.is_file() and p.suffix.lower() in IMAGE_EXTENSIONS]
    else:
        items = [p for p in directory.iterdir() if p.is_file() and p.suffix.lower() in IMAGE_EXTENSIONS]
    return sorted(items, key=lambda p: p.name.lower())


def resolve_target_directory(dir_arg: str) -> Path:
    if dir_arg.strip():
        root = Path(dir_arg).expanduser().resolve()
        if not root.is_dir():
            raise SystemExit(f"Directory not found: {root}")
        return root
    candidates = []  # type: List[Path]
    env_dir = os.getenv("OPENCLAW_RENAME_DEFAULT_DIR", "").strip()
    if env_dir:
        candidates.append(Path(env_dir).expanduser())
    home = Path.home()
    candidates.extend([home / "图片", home / "Pictures"])
    for candidate in candidates:
        if candidate.is_dir():
            return candidate.resolve()
    tried = ", ".join(str(path) for path in candidates) if candidates else "(none)"
    raise SystemExit(
        "Directory not provided and no default image directory found. "
        f"Tried: {tried}. Use --dir or set OPENCLAW_RENAME_DEFAULT_DIR."
    )


def unique_target_name(stem: str, suffix: str, used_names: Set[str]) -> str:
    candidate = f"{stem}{suffix}"
    if candidate.lower() not in used_names:
        used_names.add(candidate.lower())
        return candidate
    i = 2
    while True:
        candidate = f"{stem}-{i:02d}{suffix}"
        key = candidate.lower()
        if key not in used_names:
            used_names.add(key)
            return candidate
        i += 1


def build_plan(
    *,
    files: List[Path],
    model: str,
    base_url: str,
    api_key: str,
    prompt: str,
    timeout: int,
    max_stem_chars: int,
    fail_fast: bool,
) -> Dict[str, Any]:
    """Ask the vision model for a name per file and return a rename plan (nothing is renamed here)."""
    # Seed with the current names so no target collides with an existing file in the batch.
    used_names = {p.name.lower() for p in files}
    items = []  # type: List[Dict[str, Any]]
    for file in files:
        source = str(file)
        suffix = file.suffix.lower() or ".jpg"
        try:
            image_ref = normalize_image_input(source, base_url)
            raw_text = call_vision_api(
                base_url=base_url,
                api_key=api_key,
                model=model,
                image=image_ref,
                prompt=prompt,
                timeout=timeout,
            )
            image_type, title = extract_type_and_title(raw_text, fallback=file.stem)
            stem = compose_stem(image_type, title, file.stem, max_stem_chars=max_stem_chars)
            if f"{stem}{suffix}".lower() == file.name.lower():
                # The file already carries the suggested name. Its own name is in
                # used_names, so unique_target_name() would append "-02" and the
                # "skip_same" action could never be reached.
                target_name = file.name
            else:
                target_name = unique_target_name(stem, suffix, used_names)
            target = str(file.with_name(target_name))
            action = "rename" if Path(target).name != file.name else "skip_same"
            items.append(
                {
                    "source": source,
                    "target": target,
                    "action": action,
                    "type": image_type,
                    "title": title,
                    "stem": stem,
                    "raw": raw_text,
                    "error": "",
                }
            )
        except Exception as exc:
            if fail_fast:
                raise
            # Record the failure and keep the file under its current name.
            items.append(
                {
                    "source": source,
                    "target": source,
                    "action": "skip_error",
                    "type": "",
                    "title": file.stem,
                    "stem": file.stem,
                    "raw": "",
                    "error": str(exc),
                }
            )
    rename_count = sum(1 for item in items if item["action"] == "rename")
    error_count = sum(1 for item in items if item["action"] == "skip_error")
    same_count = sum(1 for item in items if item["action"] == "skip_same")
    return {
        "created_at": dt.datetime.now().isoformat(),
        "model": model,
        "base_url": base_url,
        "prompt": prompt,
        "items": items,
        "summary": {
            "total": len(items),
            "rename": rename_count,
            "skip_same": same_count,
            "skip_error": error_count,
        },
    }


def apply_plan(plan: Dict[str, Any]) -> List[Dict[str, str]]:
    rollback = []  # type: List[Dict[str, str]]
    for item in plan.get("items", []):
        if item.get("action") != "rename":
            continue
        src = Path(item["source"]).expanduser()
        dst = Path(item["target"]).expanduser()
        if not src.exists():
            raise FileNotFoundError(f"Source missing: {src}")
        if dst.exists() and dst.resolve() != src.resolve():
            raise FileExistsError(f"Target already exists: {dst}")
        src.rename(dst)
        rollback.append({"from": str(dst), "to": str(src)})
    return rollback


def cmd_describe(args: argparse.Namespace) -> int:
    load_dotenv_files(resolve_dotenv_paths())
    api_key = resolve_runtime_api_key()
    if not api_key:
        raise SystemExit("Missing DASHSCOPE_API_KEY")
    base_url = resolve_runtime_base_url(args.base_url)
    model = resolve_runtime_model(args.model)
    text = call_vision_api(
        base_url=base_url,
        api_key=api_key,
        model=model,
        image=normalize_image_input(args.image, base_url),
        prompt=args.prompt,
        timeout=args.timeout,
    )
    image_type, title = extract_type_and_title(text, fallback="untitled")
    stem = compose_stem(image_type, title, "untitled", max_stem_chars=args.max_stem_chars)
    result = {"image": args.image, "type": image_type, "title": title, "stem": stem, "raw": text, "model": model}
    print(json.dumps(result, ensure_ascii=False, indent=2))
    return 0


def cmd_rename_dir(args: argparse.Namespace) -> int:
    load_dotenv_files(resolve_dotenv_paths())
    api_key = resolve_runtime_api_key()
    if not api_key:
        raise SystemExit("Missing DASHSCOPE_API_KEY")
    base_url = resolve_runtime_base_url(args.base_url)
    model = resolve_runtime_model(args.model)
    root = resolve_target_directory(args.dir)
    files = collect_images(root, recursive=args.recursive)
    if args.limit > 0:
        files = files[: args.limit]
    if not files:
        raise SystemExit("No image files found")
    plan = build_plan(
        files=files,
        model=model,
        base_url=base_url,
        api_key=api_key,
        prompt=args.prompt,
        timeout=args.timeout,
        max_stem_chars=args.max_stem_chars,
        fail_fast=args.fail_fast,
    )
    stamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
    plan_path = Path(args.plan_file).expanduser() if args.plan_file else (root / f"rename-plan-{stamp}.json")
    plan_path.write_text(json.dumps(plan, ensure_ascii=False, indent=2), encoding="utf-8")
    rollback_path = ""
    applied = 0
    if args.apply:
        rollback = apply_plan(plan)
        applied = len(rollback)
        rb = {"created_at": dt.datetime.now().isoformat(), "items": rollback}
        rb_path = root / f"rename-rollback-{stamp}.json"
        rb_path.write_text(json.dumps(rb, ensure_ascii=False, indent=2), encoding="utf-8")
        rollback_path = str(rb_path)
    output = {
        "mode": "apply" if args.apply else "dry-run",
        "directory": str(root),
        "plan_file": str(plan_path),
        "rollback_file": rollback_path,
        "summary": plan.get("summary", {}),
        "applied": applied,
    }
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return 0


def cmd_rollback(args: argparse.Namespace) -> int:
    path = Path(args.rollback_file).expanduser().resolve()
    if not path.is_file():
        raise SystemExit(f"Rollback file not found: {path}")
    data = json.loads(path.read_text(encoding="utf-8"))
    items = data.get("items", [])
    restored = 0
    for item in reversed(items):
        src = Path(item["from"]).expanduser()
        dst = Path(item["to"]).expanduser()
        if not src.exists():
            continue
        if dst.exists() and dst.resolve() != src.resolve():
            raise FileExistsError(f"Rollback target already exists: {dst}")
        src.rename(dst)
        restored += 1
    print(json.dumps({"rollback_file": str(path), "restored": restored}, ensure_ascii=False, indent=2))
    return 0


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Vision describe and batch rename tool")
    parser.add_argument(
        "--base-url",
        default="",
        help="OpenAI-compatible base URL",
    )
    parser.add_argument(
        "--model",
        default="",
        help="Vision model id",
    )
    parser.add_argument("--timeout", type=int, default=120, help="HTTP timeout in seconds")
    parser.add_argument("--prompt", default=DEFAULT_PROMPT, help="Prompt for image understanding")
    parser.add_argument("--max-stem-chars", type=int, default=24, help="Max filename stem chars")
    sub = parser.add_subparsers(dest="mode")
    p_describe = sub.add_parser("describe", help="Describe one image and output suggested title")
    p_describe.add_argument("--image", required=True, help="Image path, url, or data URL")
    p_rename = sub.add_parser("rename-dir", help="Batch plan or apply renames for a directory")
    p_rename.add_argument("--dir", default="", help="Target directory (optional if default image dir exists)")
    p_rename.add_argument("--recursive", action="store_true", help="Recursively scan directory")
    p_rename.add_argument("--limit", type=int, default=0, help="Limit number of files (0 = all)")
    p_rename.add_argument("--fail-fast", action="store_true", help="Stop on first image API error")
    p_rename.add_argument("--apply", action="store_true", help="Apply rename operations")
    p_rename.add_argument("--plan-file", default="", help="Optional path for generated plan json")
    p_rollback = sub.add_parser("rollback", help="Rollback applied rename using rollback json")
    p_rollback.add_argument("--rollback-file", required=True, help="Path to rollback json")
    return parser


def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    if not getattr(args, "mode", None):
        parser.error("a subcommand is required")
    if args.mode == "describe":
        return cmd_describe(args)
    if args.mode == "rename-dir":
        return cmd_rename_dir(args)
    if args.mode == "rollback":
        return cmd_rollback(args)
    raise SystemExit(f"Unknown mode: {args.mode}")


if __name__ == "__main__":
    raise SystemExit(main())
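
The apply and rollback steps in the script above pair each rename with its inverse move, then replay those inverse moves in reverse order. The following is a minimal standalone sketch of that round trip, not the script's own code: `apply_renames`, `undo_renames`, and `round_trip_demo` are hypothetical names, the file names are made up, and everything runs inside a temporary directory with no API call.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def apply_renames(pairs: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Rename each source to its target and record the inverse move."""
    rollback = []  # type: List[Dict[str, str]]
    for pair in pairs:
        src, dst = Path(pair["source"]), Path(pair["target"])
        if dst.exists():
            raise FileExistsError(f"Target already exists: {dst}")
        src.rename(dst)
        rollback.append({"from": str(dst), "to": str(src)})
    return rollback


def undo_renames(rollback: List[Dict[str, str]]) -> int:
    """Replay rollback entries in reverse order; return how many files were restored."""
    restored = 0
    for entry in reversed(rollback):
        src, dst = Path(entry["from"]), Path(entry["to"])
        if src.exists() and not dst.exists():
            src.rename(dst)
            restored += 1
    return restored


def round_trip_demo() -> Dict[str, object]:
    """Rename two placeholder files, then undo the renames (hypothetical demo)."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        for name in ("IMG_0001.jpg", "IMG_0002.jpg"):
            (root / name).write_bytes(b"demo")
        pairs = [
            {"source": str(root / "IMG_0001.jpg"), "target": str(root / "photo-cat.jpg")},
            {"source": str(root / "IMG_0002.jpg"), "target": str(root / "photo-dog.jpg")},
        ]
        rollback = apply_renames(pairs)
        after_apply = sorted(p.name for p in root.iterdir())
        # Round-trip through JSON, as a rollback file on disk would.
        rollback = json.loads(json.dumps(rollback))
        restored = undo_renames(rollback)
        after_undo = sorted(p.name for p in root.iterdir())
    return {"after_apply": after_apply, "restored": restored, "after_undo": after_undo}
```

Recording the inverse move at the moment each rename succeeds means the rollback list only ever describes changes that actually happened, which is presumably why the script writes the rollback file from the list returned by `apply_plan`.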