@clawhub-juan-oy-bff7fe3c49
---
name: local-qwen3-asr-aipc
description: >
  Local offline ASR on Windows: no cloud, no API cost, full privacy.
  Qwen3-ASR 0.6B + Intel OpenVINO, GPU-accelerated inference.
  NETWORK: required for first-time setup (install deps + download 2 GB model); NOT required for inference.
  Auto-extracts audio from video files (mp4, mkv, webm, mov, avi); just pass the video path.
  Also supports audio: mp3, wav, flac, m4a, ogg, aac, wma, opus.
  Single file, batch folder, or watch-mode continuous transcription with automatic txt/json archive.
  30 languages + 22 Chinese dialects, auto language detection.
  One-line LLM API: from acoustic_pipeline import AcousticPipeline
  Local speech to text, transcribe audio, voice recognition, transcribe video, transcribe recording,
  convert speech to text, audio transcription, local ASR, offline speech recognition, dictation on Windows.
  Local offline speech recognition with zero cloud dependency; supports automatic audio-track extraction from video, batch transcription, folder watching, and automatic saving of transcripts.
  Network note: a connection is needed only for first-time environment setup and model download; inference and transcription run fully offline.
os: windows
requires:
  - python>=3.10
  - git
network:
  setup: required
  inference: offline
user-invocable: true
allowed-tools: Bash(python *), Bash(powershell *), Read, Write, message
---
# Local Speech Recognition (Windows · Qwen3-ASR · OpenVINO)
**Model**: `snake7gun/Qwen3-ASR-0.6B-fp16-ov` (ModelScope FP16)
**SKILL_VERSION**: 'v1.0.3'
> **First time?** Before using this skill, run these two scripts once in a terminal:
> ```
> python setup.py # [SYSTEM PYTHON OK] creates venv, installs deps (~5 min)
> python download_model.py # [SYSTEM PYTHON OK] downloads the model (~2 GB, resumable)
> ```
> Both scripts are in the skill directory alongside this SKILL.md.
---
## Agent Routing
Always use `acoustic_pipeline.py` as the entry point, called with **VENV_PY** (obtained from `check_env.py` output). It handles all cases:
```powershell
# VENV_PY = value from check_env.py output, e.g. C:\intel_openvino\venv\Scripts\python.exe
# Single file
& "<VENV_PY>" "<skill_dir>\acoustic_pipeline.py" --file "<FILE_PATH>" --language auto
# Single file + save transcript
& "<VENV_PY>" "<skill_dir>\acoustic_pipeline.py" --file "<FILE_PATH>" --language auto --archive json
# Watch folder
& "<VENV_PY>" "<skill_dir>\acoustic_pipeline.py" --watch "<DIR_PATH>" --language auto --archive both
# Batch folder
& "<VENV_PY>" "<skill_dir>\acoustic_pipeline.py" --batch "<DIR_PATH>" --language auto --archive json
```
> **Never run `acoustic_pipeline.py` with system `python`.** It imports model packages (`openvino`, `qwen_asr`) that are only installed in the venv.
`transcribe.py` is normally called internally by `acoustic_pipeline.py`. Do not invoke it as a standalone entry point, except as the Step 2 fallback when `acoustic_pipeline.py` exits with a non-zero code.
---
## Skill Contract (Input / Output)
### Accepted Inputs
Any agent should treat this skill as a local audio/video transcription skill.
1. Single file path
   * Audio: .wav, .mp3, .flac, .m4a, .ogg, .aac, .wma, .opus
   * Video: .mp4, .mkv, .webm, .flv, .mov, .avi, .mts, .m2ts, .ts, .m3u8
2. Folder path
   * Watch mode: continuously process new files in folder
   * Batch mode: process existing files in folder recursively
3. Runtime options
   * language: auto or explicit language name
   * archive: none | txt | json | both
   * archive_dir: optional output folder for transcript files
   * auto_bootstrap: initialize ASR automatically when environment is missing
### Output On Success
The result should be returned as a JSON object (or equivalent dictionary) with:
* text: transcription content
* language: detected or requested language
* source_file: original input path
* source_format: source extension
* confidence: optional confidence value (if available)
* archive_files: optional object containing txt/json output paths
Example shape:
```json
{
  "text": "...",
  "language": "Chinese",
  "source_file": "C:\\demo\\meeting.mp4",
  "source_format": ".mp4",
  "confidence": null,
  "archive_files": {
    "json": "C:\\demo\\transcripts\\meeting_20260326_120000.json"
  }
}
```
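A caller may want to confirm that a result matches this shape before using it. The sketch below is illustrative only: the helper name `missing_fields` is hypothetical, and treating the four fields not marked optional as required is an assumption drawn from the list above.

```python
from typing import Any, Dict, List

# Assumption: every field not marked "optional" in the contract is required.
REQUIRED_FIELDS = ("text", "language", "source_file", "source_format")


def missing_fields(result: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list required contract fields absent from `result`."""
    return [name for name in REQUIRED_FIELDS if name not in result]


example = {
    "text": "...",
    "language": "Chinese",
    "source_file": "C:\\demo\\meeting.mp4",
    "source_format": ".mp4",
    "confidence": None,
}
print(missing_fields(example))
```

An empty list means the result can be forwarded as-is; anything else is worth reporting as a failure at the `transcribe` stage.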
### Output On Failure
The agent should return a short structured error summary including:
* error: human-readable failure reason
* stage: bootstrap | extract_audio | transcribe | archive
* source_file: input path (if known)
* recoverable: true if retry is reasonable
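One way an agent could assemble this summary is sketched below. The function name `build_error_summary` and the validation of `stage` are assumptions made for illustration; the skill's own scripts may report errors differently.

```python
from typing import Any, Dict, Optional

# Stage names copied from the contract above.
VALID_STAGES = {"bootstrap", "extract_audio", "transcribe", "archive"}


def build_error_summary(
    error: str,
    stage: str,
    source_file: Optional[str] = None,
    recoverable: bool = False,
) -> Dict[str, Any]:
    """Hypothetical helper: return the structured failure summary as a dict."""
    if stage not in VALID_STAGES:
        raise ValueError(f"unknown stage: {stage!r}")
    summary: Dict[str, Any] = {
        "error": error,
        "stage": stage,
        "recoverable": recoverable,
    }
    if source_file is not None:  # included only when the input path is known
        summary["source_file"] = source_file
    return summary
```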
---
## ⚠️ Agent instructions
1. **Windows / PowerShell only.** Never use Linux commands (`ls`, `rm`, `cat`). Never use `&&` or `call`.
2. **Every step reads `state.json` itself** — do not pass paths between steps manually.
3. **Use `VENV_PY` from state.json for inference calls** — never use system python for inference. `check_env.py` and `setup.py` are the only scripts intentionally run with system python (they create or validate the venv, so they must not depend on it). `check_env.py` validates the *venv's* packages, not system Python's — `PACKAGES_MISSING` from check_env.py means the venv is incomplete, not the system installation.
4. `transcribe.py` is automatically deployed to `ASR_DIR` when `setup.py` runs. If it is missing, re-run `setup.py`.
5. **If `transcribe.py` fails at runtime**, do NOT edit it manually. Re-run `setup.py` to redeploy the original from the repository.
6. **Never diagnose "model issues" without running Step 1's check script first.** The model layout may use a `thinker/` subdirectory — the scripts handle this automatically.
7. **Do not generate helper scripts in agent workspace directories.** Use repository-managed `check_env.py` instead.
8. **Goal**: transcribe the audio file and send the result to the conversation.
**Auto-recovery policy (try before asking the user):**
* If `STATE=MISSING`, `VENV_PY=BROKEN`, or `PACKAGES_MISSING`: automatically run `setup.py` (up to 3 attempts). Only ask the user to run it manually if all 3 attempts fail. `setup.py` is idempotent: it skips any step already completed (venv, packages, repo clone, asr_engine.py, transcribe.py). Re-running it on a working environment is safe and fast.
* If `MODEL_STATUS=MISSING`: automatically run `download_model.py` (up to 3 attempts). Only ask the user to run it manually if all 3 attempts fail or if a single attempt runs longer than 8 minutes without completing (likely a slow connection; `download_model.py` supports resume, so partial progress is not lost). `download_model.py` exits immediately if the model is already complete, so re-running it on an already-downloaded model is safe.
* Always announce what you are doing before each attempt: "⚙️ Automatically setting up the environment (attempt N/3)…"

**⚠️ Network handling (proxy-aware)**
When running `setup.py` or `download_model.py`:
1. Try to read system proxy settings:
   * Environment variables: `HTTP_PROXY` / `HTTPS_PROXY`
   * Windows WinHTTP proxy (`netsh winhttp show proxy`)
2. If a proxy is detected:
   * Automatically apply it
   * Continue installation / download
3. If no proxy is detected:
   * Continue normally (direct connection)
4. If download/setup fails:
   * Inform the user whether a proxy was detected
   * If no proxy was detected, suggest configuring one
   * Then retry

**IMPORTANT:**
* Many networks (corporate / China mainland / campus) require a proxy
* Download supports resume, so it is safe to retry after fixing the network
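The proxy detection in the network-handling steps above could look roughly like this. It is a sketch under stated assumptions: the function names are hypothetical, the regular expression assumes the English-locale layout of `netsh winhttp show proxy` (a `Proxy Server(s) :` line), and how `setup.py` or `download_model.py` actually detect proxies is not shown in this document.

```python
import re
from typing import Mapping, Optional


def proxy_from_env(env: Mapping[str, str]) -> Optional[str]:
    """Return the first non-empty proxy URL found in the usual variables."""
    for name in ("HTTPS_PROXY", "https_proxy", "HTTP_PROXY", "http_proxy"):
        value = env.get(name, "").strip()
        if value:
            return value
    return None


def proxy_from_winhttp(netsh_output: str) -> Optional[str]:
    """Extract the proxy from `netsh winhttp show proxy` output.

    Assumes English-locale output; returns None for "Direct access".
    """
    match = re.search(r"Proxy Server\(s\)\s*:\s*(\S+)", netsh_output)
    return match.group(1) if match else None


def detect_proxy(env: Mapping[str, str], netsh_output: str) -> Optional[str]:
    """Environment variables win; WinHTTP is the fallback."""
    return proxy_from_env(env) or proxy_from_winhttp(netsh_output)
```

In practice `env` would be `os.environ` and `netsh_output` the captured stdout of the `netsh` command; passing them in as arguments keeps the parsing logic easy to check in isolation.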
## Pre-flight: Verify Runtime Requirements (Required on First Use)
> 🔍 Pre-flight: Checking Python and git...
### Check Python Version
```powershell
python --version
```
**Interpretation:**
| Output | Action |
|--------|--------|
| `Python 3.10.x` or higher | `PYTHON_OK` — set `PYTHON_EXE=python`, continue to git check |
| `Python 3.8 / 3.9` | Version too low; upgrade required (see below) |
| `'python' is not recognized...` | Python missing; install required (see below) |
| `Python was not found; run without arguments...` | **Windows Store alias** — run `py --version` instead (see below) |
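The table above can be read as a small classifier over the command's output. The sketch below is illustrative: `PYTHON_OK` comes from the table, while the labels `TOO_OLD`, `MISSING`, and `STORE_ALIAS` and the function name are assumptions introduced here.

```python
import re


def classify_python_version(output: str) -> str:
    """Map `python --version` output to one of the table's cases."""
    # The Store alias message also starts with "Python", so check it first.
    if "Python was not found" in output or "Microsoft Store" in output:
        return "STORE_ALIAS"
    match = re.search(r"Python (\d+)\.(\d+)", output)
    if match is None:
        return "MISSING"
    version = (int(match.group(1)), int(match.group(2)))
    return "PYTHON_OK" if version >= (3, 10) else "TOO_OLD"
```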
**If output contains "run without arguments to install from the Microsoft Store"**, the Windows Store App Execution Alias is shadowing the real Python. Do NOT ask the user to change settings, and do NOT write helper scripts. Use `where.exe` (native Windows tool, works in both PowerShell and cmd) to find the real Python:
> ⚠️ **Do NOT write any helper .ps1 scripts. Run the command below directly in the terminal.**
```powershell
where.exe python 2>$null | Where-Object { $_ -notlike "*WindowsApps*" } | Select-Object -First 1
```
* A path is printed → **record this literal string as `SYSTEM_PYTHON`** (this is the system-level Python, used only to run `setup.py`, `check_env.py`, and `download_model.py`). For every command marked `[SYSTEM PYTHON]`, substitute the full literal path for `python`. Example: `python "<skill_dir>\check_env.py"` becomes `"C:\Users\intel\AppData\Local\Programs\Python\Python312\python.exe" "<skill_dir>\check_env.py"`. Do NOT use this path for inference — inference must always use `VENV_PY` (the venv Python path printed by `check_env.py`). Do NOT rely on a `$variable` across tool calls — each call is a new shell process; always embed the literal path directly.
* Nothing printed → Python is not installed — install it (see below).
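For agents that post-process the `where.exe` output in Python rather than in the PowerShell pipeline, the same filter can be sketched as follows. The function name `pick_real_python` is hypothetical; the logic mirrors the `-notlike "*WindowsApps*"` filter, which is case-insensitive in PowerShell.

```python
from typing import Iterable, Optional


def pick_real_python(candidates: Iterable[str]) -> Optional[str]:
    """Return the first path that is not a Windows Store alias, or None."""
    for path in candidates:
        path = path.strip()
        if path and "windowsapps" not in path.lower():
            return path
    return None
```

A `None` result corresponds to the "Nothing printed" case: Python is not installed.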
**If Python is missing or outdated**, run this one-command silent installer in PowerShell (recommended, no admin required):
```powershell
$f = "$env:TEMP\python-installer.exe"
Invoke-WebRequest "https://www.python.org/ftp/python/3.12.10/python-3.12.10-amd64.exe" -OutFile $f
Start-Process $f -ArgumentList "/quiet InstallAllUsers=0 PrependPath=1 Include_pip=1" -Wait
Remove-Item $f
```
> `PrependPath=1` adds Python to PATH automatically; `Include_pip=1` installs pip; `InstallAllUsers=0` avoids requiring administrator privileges.
After installation, **restart the terminal**, then run `python --version` and confirm it reports `Python 3.12.x`.
If you prefer manual installation: download **https://www.python.org/ftp/python/3.12.10/python-3.12.10-amd64.exe** and make sure to check **"Add python.exe to PATH"** during setup.
### Check git
```bat
git --version
```
**Interpretation:**
| Output | Action |
|--------|--------|
| `git version 2.x.x` | ✅ `GIT_OK`, Pre-flight passed |
| `'git' is not recognized as an internal or external command` | git is not installed; install is required (see below) |
**If git is missing**, run this one-command silent installer in PowerShell:
```powershell
$f = "$env:TEMP\git-installer.exe"
Invoke-WebRequest "https://github.com/git-for-windows/git/releases/download/v2.49.0.windows.1/Git-2.49.0-64-bit.exe" -OutFile $f
Start-Process $f -ArgumentList "/VERYSILENT /NORESTART /NOCANCEL /SP- /CLOSEAPPLICATIONS /RESTARTAPPLICATIONS /COMPONENTS=icons,ext\reg\shellhere,assoc,assoc_sh" -Wait
Remove-Item $f
```
After installation, **restart the terminal**, then run `git --version` to confirm.
If you prefer manual installation: open **https://git-scm.com/download/win**, download the installer, and proceed with default options.
> git is required for the `git+https://` dependency in `requirements_imagegen.txt`; without git, `pip install` will fail with `git: command not found`.
**Pre-flight pass criteria**: `python --version` reports 3.10 or higher and `git --version` returns a valid version string.
Status message: `✅ Python and git are ready. Starting main workflow.`
**Pipeline — follow exactly in order, no skipping:**
```
Step 0: parse request → AUDIO_PATH, LANGUAGE, TOPIC
Step 1: verify environment → run check_env.py → record VENV_PY and ASR_DIR
        ↳ if STATE=MISSING or VENV_PY=BROKEN or PACKAGES_MISSING: auto-run setup.py (3 attempts)
        ↳ if SCRIPTS_STALE=...: auto-run setup.py to redeploy runtime scripts
        ↳ if MODEL_STATUS=MISSING: auto-run download_model.py (3 attempts)
Step 2: transcribe + send → run acoustic_pipeline.py using VENV_PY from Step 1
        ↳ fallback: run transcribe.py directly using VENV_PY and ASR_DIR from Step 1
```
---
## Step 0: parse request (LLM only — no tools)
Extract from the user's message:
|Field|Default|Notes|
|-|-|-|
|`AUDIO_PATH`|required|Absolute path to audio/video file (wav/mp3/flac/m4a/ogg/aac/wma/opus/mp4/mkv/webm/flv/mov/avi/mts/m2ts/ts/m3u8)|
|`LANGUAGE`|auto-detect|Optional: `Chinese`, `English`, `Japanese`, etc.|
|`TOPIC`|English snake_case from context|Used for output filename|
If no audio file is provided, ask the user before continuing.
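Two of the fields above lend themselves to a quick mechanical check. The sketch below is an assumption-laden illustration: the helper names are hypothetical, the extension set is copied from the `AUDIO_PATH` row, and the `"transcript"` fallback for an empty topic is invented here.

```python
import re
from pathlib import PureWindowsPath

# Extensions copied from the AUDIO_PATH row of the table above.
SUPPORTED = {
    ".wav", ".mp3", ".flac", ".m4a", ".ogg", ".aac", ".wma", ".opus",
    ".mp4", ".mkv", ".webm", ".flv", ".mov", ".avi", ".mts", ".m2ts", ".ts", ".m3u8",
}


def is_supported(audio_path: str) -> bool:
    """True if the path's extension (case-insensitive) is in the supported set."""
    return PureWindowsPath(audio_path).suffix.lower() in SUPPORTED


def to_snake_case(topic: str) -> str:
    """Turn free text into an English snake_case TOPIC for filenames."""
    words = re.findall(r"[a-z0-9]+", topic.lower())
    return "_".join(words) or "transcript"  # fallback name is an assumption
```

`PureWindowsPath` parses Windows paths without touching the filesystem, so the check behaves the same wherever it runs.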
---
## Step 1: verify environment and model
> Step 1/2: checking environment and model...
```powershell
# [SYSTEM PYTHON] check_env.py creates/validates the venv — must NOT use venv python here
python "<skill_dir>\check_env.py"
```
> `check_env.py` and `setup.py` intentionally run with system Python — they create and validate the venv, so they cannot depend on it.
**On success**: record `VENV_PY` and `ASR_DIR` from output, proceed to Step 2.
> `check_env.py` also prints `SCRIPTS_STALE=<old>-><new>` when the deployed `transcribe.py` is outdated.
> If this line appears, treat it as a mandatory **auto-update** — run `setup.py` before Step 2 (see below).
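The branching on `check_env.py` output can be sketched as below. The exact output format is not documented here, so this assumes one status per line in `KEY=VALUE` form (with `PACKAGES_MISSING` possibly followed by `:`); the function names and the example values `STATE=OK` and `MODEL_STATUS=READY` are likewise assumptions.

```python
from typing import Dict


def parse_check_env(output: str) -> Dict[str, str]:
    """Collect status fields from check_env.py output (assumed line-oriented)."""
    fields: Dict[str, str] = {}
    for raw in output.splitlines():
        line = raw.strip()
        if line.startswith("PACKAGES_MISSING"):
            rest = line[len("PACKAGES_MISSING"):]
            fields["PACKAGES_MISSING"] = rest.lstrip(":= ").strip()
        elif "=" in line:
            key, _, value = line.partition("=")
            fields[key.strip()] = value.strip()
    return fields


def next_action(fields: Dict[str, str]) -> str:
    """Pick the script to run next, following the recovery order of Step 1."""
    needs_setup = (
        fields.get("STATE") == "MISSING"
        or fields.get("VENV_PY") == "BROKEN"
        or "PACKAGES_MISSING" in fields
        or "SCRIPTS_STALE" in fields
    )
    if needs_setup:
        return "setup.py"
    if fields.get("MODEL_STATUS") == "MISSING":
        return "download_model.py"
    return "transcribe"
```

Environment problems are handled before the model check because `setup.py` must succeed before anything else can run reliably.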
**On failure — auto-recovery (try before asking user):**
### If SCRIPTS_STALE=... → auto-run setup.py to redeploy runtime scripts
`SCRIPTS_STALE` means the venv and model are both OK, but the deployed `transcribe.py` (and/or `asr_engine.py`) in `ASR_DIR` is an older version. `setup.py` is idempotent — it skips the venv, package, and model steps and only redeploys the outdated files.
```
⚙️ Runtime scripts are outdated. Redeploying (attempt 1/3)...
```
```powershell
# [SYSTEM PYTHON] — redeploys transcribe.py and asr_engine.py to ASR_DIR
python "<skill_dir>\setup.py"
```
After running, re-run `check_env.py` to confirm `SCRIPTS_STALE` no longer appears, then proceed to Step 2.
### If STATE=MISSING or VENV_PY=BROKEN or PACKAGES_MISSING → auto-run setup.py
`PACKAGES_MISSING` means the venv exists but required packages (e.g. `openvino`, `qwen_asr`) are not installed. Re-running `setup.py` re-installs only what is missing; it does not recreate the venv or re-clone the repo.
Announce and run (up to 3 attempts):
```
⚙️ Environment is not initialized. Running automatic setup (attempt 1/3)...
```
```powershell
# [SYSTEM PYTHON] setup.py creates the venv — must NOT use venv python here
python "<skill_dir>\setup.py"
```
After each attempt, re-run `check_env.py` to verify. If all 3 attempts fail, show manual fallback below.
### If MODEL_STATUS=MISSING → auto-run download_model.py
Announce and run (up to 3 attempts, stop if a single attempt exceeds 8 minutes):
```
📥 Model not found. Starting automatic download (attempt 1/3)...
Estimated time: ~3 minutes at 100 Mbps, ~5 minutes at 50 Mbps
Download supports resume; rerun safely after interruption.
```
```powershell
# [SYSTEM PYTHON] download_model.py runs before venv — must NOT use venv python here
python "<skill_dir>\download_model.py"
```
After each attempt, re-run `check_env.py` to verify. If all 3 attempts fail, show manual fallback below.
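The "up to 3 attempts, stop on an 8-minute timeout" policy can be expressed as a short loop. This is a sketch, not the skill's implementation: `run_with_retries` and `run_script` are hypothetical names, and the caller is assumed to supply a `step` that runs the script and then re-runs `check_env.py` to verify.

```python
import subprocess
from typing import Callable


def run_script(python_exe: str, script: str, timeout_s: int = 480) -> bool:
    """Run one script; raises subprocess.TimeoutExpired after 8 minutes."""
    return subprocess.run([python_exe, script], timeout=timeout_s).returncode == 0


def run_with_retries(
    step: Callable[[], bool],
    label: str,
    attempts: int = 3,
    announce: Callable[[str], None] = print,
) -> bool:
    """Announce and run `step` until it succeeds or attempts run out.

    A TimeoutExpired raised by `step` is deliberately not caught, so a
    timed-out attempt ends the loop and control returns to the caller.
    """
    for n in range(1, attempts + 1):
        announce(f"⚙️ {label} (attempt {n}/{attempts})…")
        if step():
            return True
    return False
```

A `False` return is the signal to show the manual fallback message.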
### Manual fallback (only if all 3 auto-attempts fail)
Show user this message:
```
⚠️ Automatic setup failed. Manual steps are required.
Open a Windows terminal (PowerShell or Command Prompt) and run the following in order:
1) Install environment (if not installed yet):
python "<skill_dir>\setup.py" # [SYSTEM PYTHON OK]
Expected duration: about 5 minutes, fully automated.
2) Download model (about 2 GB):
python "<skill_dir>\download_model.py" # [SYSTEM PYTHON OK]
Download supports resume; rerun safely after interruption.
Estimated time: ~3 minutes at 100 Mbps, ~5 minutes at 50 Mbps.
After completion, return here and resend your request.
```
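The download-time figures quoted in this section follow from simple arithmetic on the model size and link speed. The sketch below assumes decimal units (1 GB = 8000 megabits) and ignores protocol overhead; the function name is illustrative.

```python
def estimate_minutes(size_gb: float, mbps: float) -> float:
    """Ideal transfer time in minutes for `size_gb` gigabytes at `mbps` Mbit/s."""
    megabits = size_gb * 8 * 1000  # decimal units: 1 GB = 8000 megabits
    return megabits / mbps / 60


print(round(estimate_minutes(2, 100), 1))  # about 2.7 minutes
print(round(estimate_minutes(2, 50), 1))   # about 5.3 minutes
```

Rounded, these match the "~3 minutes at 100 Mbps, ~5 minutes at 50 Mbps" estimates; real downloads will usually take somewhat longer.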
---
## Step 2: transcribe and send result
> Step 2/2: transcribing...
**Preferred path** — run `acoustic_pipeline.py` with the **venv Python** obtained from Step 1:
```powershell
# [VENV PYTHON] VENV_PY = value printed by check_env.py, e.g. C:\intel_openvino\venv\Scripts\python.exe
# Never use system python here — openvino/qwen_asr are only in the venv
& "<VENV_PY>" "<skill_dir>\acoustic_pipeline.py" --file "AUDIO_PATH" --language auto --archive json
```
If the user specified a language:
```powershell
# [VENV PYTHON]
& "<VENV_PY>" "<skill_dir>\acoustic_pipeline.py" --file "AUDIO_PATH" --language "LANGUAGE" --archive json
```
Return the JSON result directly to the conversation and include any `archive_files` paths.
**Fallback** — only if `acoustic_pipeline.py` exits with a non-zero code:
```powershell
# [VENV PYTHON] fallback — both VENV_PY and ASR_DIR come from check_env.py (Step 1)
& "<VENV_PY>" "<ASR_DIR>\transcribe.py" --audio "AUDIO_PATH" --language "LANGUAGE"
```
> Never use system python here.
**Pass**: `$LASTEXITCODE -eq 0`. Stdout is a single line of JSON with this shape:
```json
{"text": "...", "language": "Chinese", "time_elapsed": 12.3, "audio_path": "C:\\audio\\file.wav"}
```
Parse with `$result = $stdout | ConvertFrom-Json`. Record `TRANSCRIPT` from `$result.text` and `LANG` from `$result.language`.
**Fail**: if `$LASTEXITCODE -ne 0`, do not attempt to parse stdout. Show the stderr output to the user.
Send via `message` tool:
```
action: "send" message: "✅ LANG\n\nTRANSCRIPT"
```
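For agents that handle the fallback output in Python instead of PowerShell, parsing and message formatting might look like this. The helper names are hypothetical; the JSON keys and the message layout are taken from the step above.

```python
import json
from typing import Tuple


def parse_transcribe_stdout(stdout: str) -> Tuple[str, str]:
    """Return (TRANSCRIPT, LANG) from the single-line JSON on stdout."""
    result = json.loads(stdout.strip())
    return result["text"], result["language"]


def format_message(transcript: str, lang: str) -> str:
    """Build the message body in the "✅ LANG, blank line, TRANSCRIPT" layout."""
    return f"✅ {lang}\n\n{transcript}"
```

As in the PowerShell flow, parsing should only be attempted after confirming the process exit code was 0.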
---
## Troubleshooting
|Error|Fix|
|-|-|
|`Python was not found; run without arguments...`|Windows Store alias blocking `python`. Run: `where.exe python 2>$null \| Where-Object { $_ -notlike "*WindowsApps*" } \| Select-Object -First 1`. Record the printed literal path and substitute it for `python` in every subsequent command (do NOT use a `$variable`; each tool call is a new shell).|
|`STATE=MISSING`|Run `python "<skill_dir>\setup.py"`|
|`VENV_PY=BROKEN`|Re-run `python "<skill_dir>\setup.py"` — it will rebuild the venv|
|`PACKAGES_MISSING: ...`|Re-run `python "<skill_dir>\setup.py"` — re-installs missing venv packages only; skips steps already done|
|`MODEL_STATUS=MISSING`|Run `python "<skill_dir>\download_model.py"` — exits immediately if model is already complete|
|`[ERROR] Audio not found`|Verify the file path is correct and the file exists|
|`[ERROR] Model incomplete`|Re-run `python "<skill_dir>\download_model.py"` — supports resume|
|`[ERROR] state.json not found`|Re-run Step 1 (`check_env.py`)|
|`SCRIPTS_STALE=v1.0.1->v1.0.2`|Deployed runtime scripts are outdated. Run `python "<skill_dir>\setup.py"` — it redeploys only the changed files, skipping venv/packages/model (fast).|
|`RuntimeError` on GPU|Run `check_env.py` first to confirm model is READY. If model is OK, try adding `--language auto` to remove language mismatch. If still failing, re-run `setup.py` to upgrade OpenVINO and redeploy runtime files.|
---
## LLM API Usage
For agents that prefer a Python import interface over shell commands, run this inside a subprocess using **VENV_PY**:
```powershell
# Must be run with VENV_PY, not system python
& "<VENV_PY>" -c "
import sys; sys.path.insert(0, r'<skill_dir>')
from acoustic_pipeline import AcousticPipeline
pipeline = AcousticPipeline()
result = pipeline.transcribe(r'C:\meeting.mp4', language='auto', archive_mode='json')
print(result['text'])
"
```
Or if your agent framework already runs inside the venv Python process:
```python
from acoustic_pipeline import AcousticPipeline
pipeline = AcousticPipeline()
result = pipeline.transcribe("C:\\meeting.mp4", language="auto", archive_mode="json")
print(result["text"])
print(result.get("archive_files"))
```
---
## Workspace Hygiene
* Use repository-managed `check_env.py` — it is versioned, auditable, and repeatable.
* If the execution environment forces a temporary file, treat it as disposable and remove it after the command completes.
FILE:acoustic_pipeline.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Acoustic Pipeline - Enhanced ASR processing pipeline
Built on top of the Qwen3-ASR skill:
1. Automatic audio extraction from video
2. Multi-format support (MP4, MP3, WAV, etc.)
3. Automated processing (folder watch or batch)
4. Direct LLM callable API
Usage:
python acoustic_pipeline.py --file "audio.mp4" --language Chinese
python acoustic_pipeline.py --watch "C:\\inbox"
python acoustic_pipeline.py --batch "C:\\audio\\library"
"""
import json
import subprocess
import sys
import argparse
import os
import importlib.util
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
TRANSCRIBE_SUBPROCESS_TIMEOUT = 3600
class AcousticPipeline:
    """Audio/video transcription pipeline."""

    # Supported formats
    AUDIO_FORMATS = {'.wav', '.mp3', '.flac', '.m4a', '.ogg', '.aac', '.wma', '.opus'}
    VIDEO_FORMATS = {'.mp4', '.mkv', '.webm', '.flv', '.mov', '.avi', '.mts', '.m2ts', '.ts', '.m3u8'}
    ALL_FORMATS = AUDIO_FORMATS | VIDEO_FORMATS

    # Class-level cache to avoid repeated environment checks
    _cached_state = None
    _cached_venv_py = None
    _cache_timestamp = 0

    # Class-level cache for the transcribe module so _MODEL_CACHE survives across instances
    _transcribe_module = None
    _transcribe_module_path = None

    def __init__(self, asr_skill_dir: Optional[str] = None, auto_bootstrap: bool = False):
        """
        Initialize the pipeline.

        Args:
            asr_skill_dir: ASR skill directory (default: auto-detect *_openvino/asr)
            auto_bootstrap: run setup.py + download_model.py automatically when env is missing
        """
        if asr_skill_dir:
            self.asr_dir = Path(asr_skill_dir)
        else:
            # auto-scan all drives for *_openvino/asr
            self.asr_dir = self._find_openvino_asr_dir() or Path.cwd()

        # use cache or locate state file
        self.state_file = self._find_state_json_cached()
        self.venv_py = self._find_venv_python_cached()

        self.runtime_asr_dir = self.asr_dir
        if self.state_file:
            try:
                state = json.loads(self.state_file.read_text(encoding="utf-8"))
                asr_runtime = Path(state.get("ASR_DIR", ""))
                if asr_runtime.exists():
                    self.runtime_asr_dir = asr_runtime
            except Exception:
                pass

        self.transcribe_py = self.runtime_asr_dir / 'transcribe.py'
        if not self.transcribe_py.exists():
            local_transcribe = self.asr_dir / 'transcribe.py'
            if local_transcribe.exists():
                self.transcribe_py = local_transcribe

        if not self.transcribe_py.exists() and auto_bootstrap:
            self._bootstrap_asr_skill()
            # re-locate after bootstrap (state.json may now exist)
            self.state_file = self._find_state_json()
            self.venv_py = self._find_venv_python()
            self.runtime_asr_dir = self.asr_dir
            if self.state_file:
                try:
                    state = json.loads(self.state_file.read_text(encoding="utf-8"))
                    asr_runtime = Path(state.get("ASR_DIR", ""))
                    if asr_runtime.exists():
                        self.runtime_asr_dir = asr_runtime
                except Exception:
                    pass
            self.transcribe_py = self.runtime_asr_dir / 'transcribe.py'

        if not self.transcribe_py.exists():
            raise FileNotFoundError(
                "transcribe.py not found. Ensure ASR is initialized or use --auto-bootstrap.\n"
                f"Checked: {self.runtime_asr_dir / 'transcribe.py'}"
            )

        # prefer in-process call to avoid subprocess overhead
        self._transcribe_fn = self._load_transcribe_callable()

    def _load_transcribe_callable(self):
        """Load the transcribe() function from transcribe.py.

        Module is cached at class level so _MODEL_CACHE survives across instances.
        Returns None on failure; caller falls back to subprocess.
        """
        transcribe_path = str(self.transcribe_py)

        # reuse cached module (preserves _MODEL_CACHE in its globals)
        if (AcousticPipeline._transcribe_module is not None and
                AcousticPipeline._transcribe_module_path == transcribe_path):
            fn = getattr(AcousticPipeline._transcribe_module, "transcribe", None)
            return fn if callable(fn) else None

        try:
            module_name = f"local_transcribe_{abs(hash(transcribe_path))}"
            spec = importlib.util.spec_from_file_location(module_name, transcribe_path)
            if spec is None or spec.loader is None:
                return None
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            AcousticPipeline._transcribe_module = module
            AcousticPipeline._transcribe_module_path = transcribe_path
            fn = getattr(module, "transcribe", None)
            return fn if callable(fn) else None
        except Exception as e:
            print(f"[DEBUG] Direct transcribe load failed, fallback to subprocess: {e}", file=sys.stderr)
            return None

    def _find_venv_python_cached(self) -> Path:
        """Find venv Python with 5-minute cache."""
        import time
        cls = AcousticPipeline
        current_time = time.time()

        # use cache if fresh (< 5 min)
        if (cls._cached_venv_py and
                current_time - cls._cache_timestamp < 300):
            return cls._cached_venv_py

        result = self._find_venv_python()
        # assign on the class, not the instance, so the cache is shared across instances
        cls._cached_venv_py = result
        cls._cache_timestamp = current_time
        return result

    def _find_state_json_cached(self) -> Optional[Path]:
        """Find state.json with 5-minute cache."""
        import time
        cls = AcousticPipeline
        current_time = time.time()

        # sentinel 'NOT_FOUND' means we already checked and found nothing
        if (cls._cached_state is not None and
                current_time - cls._cache_timestamp < 300):
            return cls._cached_state if cls._cached_state != 'NOT_FOUND' else None

        result = self._find_state_json()
        # assign on the class, not the instance, so the cache is shared across instances
        cls._cached_state = result if result else 'NOT_FOUND'
        cls._cache_timestamp = current_time
        return result

    def _find_venv_python(self) -> Path:
        """Locate the venv Python executable."""
        # strategy 1: read from state.json
        state_file = self._find_state_json()
        if state_file:
            try:
                state = json.loads(state_file.read_text(encoding="utf-8"))
                venv_py_str = state.get('VENV_PY', '')
                if venv_py_str:
                    venv_py = Path(venv_py_str)
                    if venv_py.exists():
                        return venv_py
                    else:
                        # path may be stale, try next strategy
                        print(f"[DEBUG] VENV_PY from state.json not found: {venv_py_str}", file=sys.stderr)
            except Exception as e:
                print(f"[DEBUG] Failed to read state.json: {e}", file=sys.stderr)

        # strategy 2: common locations
        username = os.environ.get('USERNAME', 'user').lower()
        common_paths = [
            # user's *_openvino directory
            Path.home() / f"{username}_openvino" / "venv" / "Scripts" / "python.exe",
            Path(f"C:\\{username}_openvino\\venv\\Scripts\\python.exe"),
            # current ASR directory
            self.asr_dir / "venv" / "Scripts" / "python.exe",
        ]
        for path in common_paths:
            if path.exists():
                print(f"[DEBUG] Found python.exe at: {path}", file=sys.stderr)
                return path

        # strategy 3: scan all drives for *_openvino/venv
        import string
        for drive in string.ascii_uppercase:
            base_path = Path(f"{drive}:\\")
            if not base_path.exists():
                continue
            try:
                for item in base_path.iterdir():
                    if item.is_dir() and "_openvino" in item.name:
                        venv_py = item / "venv" / "Scripts" / "python.exe"
                        if venv_py.exists():
                            # debug output goes to stderr, like the other [DEBUG] lines
                            print(f"[DEBUG] Found python.exe in {item.name}: {venv_py}", file=sys.stderr)
                            return venv_py
            except PermissionError:
                continue

        # fallback: system Python
        print(f"[DEBUG] Falling back to system python.exe: {sys.executable}", file=sys.stderr)
        return Path(sys.executable)

    def _find_openvino_asr_dir(self) -> Optional[Path]:
        """Scan all drives for a *_openvino/asr directory."""
        import string
        for drive in string.ascii_uppercase:
            base_path = Path(f"{drive}:\\")
            if not base_path.exists():
                continue
            # look for *_openvino/asr under this drive
            try:
                for item in base_path.iterdir():
                    if item.is_dir() and "_openvino" in item.name:
                        asr_candidate = item / "asr"
                        if asr_candidate.exists():
                            return asr_candidate
            except PermissionError:
                continue
        return None

    def _find_state_json(self) -> Optional[Path]:
        """Locate state.json."""
        import string
        username = os.environ.get('USERNAME', 'user').lower()
        for drive in string.ascii_uppercase:
            state_file = Path(f"{drive}:\\{username}_openvino\\asr\\state.json")
            if state_file.exists():
                return state_file
        local_state = self.asr_dir / "state.json"
        if local_state.exists():
            return local_state
        return None

    def _run_bootstrap_script(self, script_name: str):
        """Run a bootstrap script (setup.py or download_model.py)."""
        script_path = self.asr_dir / script_name
        if not script_path.exists():
            raise FileNotFoundError(f"Bootstrap script not found: {script_path}")

        cmd = [str(Path(sys.executable)), str(script_path)]
        # run in the ASR directory so generated files land in the right place
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            timeout=1800,
            cwd=str(self.asr_dir)  # run inside ASR dir
        )
        if result.returncode != 0:
            raise RuntimeError(
                f"{script_name} failed:\n{result.stderr or result.stdout}"
            )

    def _bootstrap_asr_skill(self):
        """Auto-initialize ASR environment and model when not yet set up."""
        print("[INFO] ASR not initialized - running setup.py ...")
        self._run_bootstrap_script("setup.py")
        print("[INFO] Running download_model.py ...")
        self._run_bootstrap_script("download_model.py")

    def _archive_transcript(
        self,
        source_file: Path,
        result: Dict[str, Any],
        archive_mode: str = "none",
        archive_dir: Optional[str] = None,
    ) -> Dict[str, str]:
        """Save transcript in the requested format(s)."""
        if archive_mode == "none":
            return {}

        out_dir = Path(archive_dir) if archive_dir else source_file.parent / "transcripts"
        out_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = f"{source_file.stem}_{timestamp}"
        saved = {}

        if archive_mode in {"txt", "both"}:
            txt_path = out_dir / f"{base_name}.txt"
            txt_path.write_text(str(result.get("text", "")), encoding="utf-8")
            saved["txt"] = str(txt_path)

        if archive_mode in {"json", "both"}:
            json_path = out_dir / f"{base_name}.json"
            json_path.write_text(
                json.dumps(result, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
            saved["json"] = str(json_path)

        return saved

    def _is_video(self, file_path: Path) -> bool:
        """Return True if file_path is a supported video format."""
        return file_path.suffix.lower() in self.VIDEO_FORMATS

    def _is_audio(self, file_path: Path) -> bool:
        """Return True if file_path is a supported audio format."""
        return file_path.suffix.lower() in self.AUDIO_FORMATS

    def _extract_audio_from_video(self, video_path: Path, output_wav: Optional[Path] = None) -> Path:
        """
        Extract audio track from a video file.

        Args:
            video_path: path to the video file
            output_wav: output WAV path (auto-generated if None)

        Returns:
            path to the extracted WAV file
        """
        if output_wav is None:
            output_wav = video_path.parent / f"{video_path.stem}_audio.wav"

        print(f" [VIDEO] Extracting audio: {video_path.name}...")

        # try ffmpeg first (most reliable)
        try:
            cmd = [
                'ffmpeg',
                '-i', str(video_path),
                '-q:a', '9',
                '-n',  # do not overwrite
                str(output_wav)
            ]
            result = subprocess.run(cmd, capture_output=True, timeout=300)
            if result.returncode == 0 and output_wav.exists():
                print(f" [OK] Audio extracted: {output_wav.name}")
                return output_wav
        except (OSError, subprocess.SubprocessError):
            # ffmpeg missing from PATH or timed out: fall through to moviepy
            pass

        # fallback: moviepy (slower but reliable)
        try:
            # note: this import path and the `verbose` argument match the moviepy 1.x API
            from moviepy.editor import VideoFileClip
            print(" [INFO] Using moviepy (may be slower)...")
            clip = VideoFileClip(str(video_path))
            if clip.audio is None:
                raise ValueError("Video has no audio track")
            clip.audio.write_audiofile(str(output_wav), verbose=False, logger=None)
            clip.close()
            print(f" [OK] Audio extracted: {output_wav.name}")
            return output_wav
        except ImportError:
            raise RuntimeError(
                "ffmpeg or moviepy required for video audio extraction.\n"
                "Install: pip install moviepy\n"
                "Or download ffmpeg: https://ffmpeg.org/download.html"
            )
        except Exception as e:
            raise RuntimeError(f"Audio extraction failed: {e}") from e

def transcribe(
self,
file_path: str,
language: str = "auto",
keep_extracted: bool = False,
archive_mode: str = "none",
archive_dir: Optional[str] = None,
) -> Dict[str, Any]:
"""
Transcribe an audio or video file.
Args:
file_path: path to the file
language: language hint (default: auto-detect)
keep_extracted: keep the extracted WAV when input is video
archive_mode: save format (none/txt/json/both)
archive_dir: output directory for saved transcripts (default: transcripts/ beside source)
Returns:
transcription result dict
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
print(f"\n[ASR] Transcribing: {file_path.name}")
# prepare audio file
audio_file = file_path
if self._is_video(file_path):
audio_file = self._extract_audio_from_video(file_path)
elif not self._is_audio(file_path):
raise ValueError(f"Unsupported file format: {file_path.suffix}")
# prefer in-process call to reduce startup overhead
print(f" [INFO] Transcribing...")
transcription = None
try:
if self._transcribe_fn is not None:
raw = self._transcribe_fn(str(audio_file), language, "", None)
if isinstance(raw, dict):
transcription = raw
elif isinstance(raw, str):
transcription = json.loads(raw)
else:
raise RuntimeError(f"transcribe returned unsupported type: {type(raw).__name__}")
if transcription is None:
cmd = [
str(self.venv_py),
str(self.transcribe_py),
'--audio', str(audio_file),
'--language', language
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding='utf-8',
timeout=TRANSCRIBE_SUBPROCESS_TIMEOUT,
)
if result.returncode != 0:
print(f"[DEBUG] stderr: {result.stderr}", file=sys.stderr)
raise RuntimeError(f"Transcription failed: {result.stderr}")
stdout_content = result.stdout.strip()
transcription = json.loads(stdout_content)
# append metadata
transcription['source_file'] = str(file_path)
transcription['source_format'] = file_path.suffix
# archive output when requested
archived = self._archive_transcript(
source_file=file_path,
result=transcription,
archive_mode=archive_mode,
archive_dir=archive_dir,
)
if archived:
transcription['archive_files'] = archived
print(f" [SAVED] Saved to: {', '.join(archived.values())}")
# clean up temporary extracted audio
if not keep_extracted and audio_file != file_path:
try:
audio_file.unlink()
except OSError:
pass
print(f" [OK] Done")
return transcription
except subprocess.TimeoutExpired:
raise RuntimeError("Transcription timed out (file too large or system busy)")
except Exception as e:
raise RuntimeError(f"Transcription error: {e}") from e
def batch_transcribe(
self,
folder_path: str,
language: str = "auto",
archive_mode: str = "none",
archive_dir: Optional[str] = None,
):
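"""
Batch-transcribe all audio/video files in a folder (searched recursively).
Args:
folder_path: path to the folder
language: language hint (default: auto-detect)
archive_mode: save format (none/txt/json/both)
archive_dir: output directory for saved transcripts (default: transcripts/ beside source)
"""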
folder = Path(folder_path)
files = []
for ext in self.ALL_FORMATS:
files.extend(folder.rglob(f'*{ext}'))
files.extend(folder.rglob(f'*{ext.upper()}'))
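# de-duplicate once: globbing both cases can return the same file twice on case-insensitive filesystems
files = sorted(set(files))
print(f"\n[INFO] Found {len(files)} audio/video files")
for idx, file_path in enumerate(files, 1):
print(f"\n[{idx}/{len(files)}]", end=" ")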
try:
result = self.transcribe(
str(file_path),
language,
archive_mode=archive_mode,
archive_dir=archive_dir,
)
print(f"Transcribed: {result.get('text', '')[:50]}...")
except Exception as e:
print(f"[ERROR] {e}")
def watch_folder(
self,
folder_path: str,
language: str = "auto",
archive_mode: str = "none",
archive_dir: Optional[str] = None,
):
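"""
Watch a folder (recursively) and auto-transcribe newly created files.
Args:
folder_path: folder to watch
language: language hint (default: auto-detect)
archive_mode: save format (none/txt/json/both)
archive_dir: output directory for saved transcripts (default: transcripts/ beside source)
"""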
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent
except ImportError:
raise RuntimeError("watchdog required: pip install watchdog")
folder = Path(folder_path)
processed = set()
pipeline_self = self
supported_formats = self.ALL_FORMATS
class AudioHandler(FileSystemEventHandler):
def on_created(self, event: FileCreatedEvent):
if event.is_directory:
return
file_path = Path(event.src_path)
if file_path.suffix.lower() not in supported_formats:
return
# simple file-completion wait
import time
time.sleep(1)
file_key = str(file_path.resolve())
if file_key in processed:
return
processed.add(file_key)
try:
result = pipeline_self.transcribe(
str(file_path),
language,
archive_mode=archive_mode,
archive_dir=archive_dir,
)
print(f"Transcribed: {result.get('text', '')[:80]}...")
except Exception as e:
print(f"[ERROR] {e}")
print(f"[ASR] Watching: {folder}")
print("Press Ctrl+C to stop")
handler = AudioHandler()
observer = Observer()
observer.schedule(handler, str(folder), recursive=True)
observer.start()
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\nStopping watcher")
observer.stop()
observer.join()
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description='Audio/video transcription pipeline',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
python acoustic_pipeline.py --file meeting.mp4 --language Chinese
python acoustic_pipeline.py --watch "C:\\inbox"
python acoustic_pipeline.py --batch "C:\\audio"
'''
)
parser.add_argument('--file', help='transcribe a single file')
parser.add_argument('--watch', help='watch a folder and auto-transcribe new files')
parser.add_argument('--batch', help='batch-transcribe all files in a folder')
parser.add_argument('--language', default='auto', help='language (default: auto-detect)')
parser.add_argument('--dir', help='ASR skill directory (default: auto-detect)')
parser.add_argument('--keep-audio', action='store_true', help='keep extracted audio file')
parser.add_argument('--archive', choices=['none', 'txt', 'json', 'both'], default='none', help='transcript archive format')
parser.add_argument('--archive-dir', help='archive directory (default: transcripts/ beside source)')
parser.add_argument('--auto-bootstrap', action='store_true', help='auto-run setup.py + download_model.py when env is missing')
args = parser.parse_args()
try:
pipeline = AcousticPipeline(args.dir, auto_bootstrap=args.auto_bootstrap)
if args.file:
result = pipeline.transcribe(
args.file,
args.language,
args.keep_audio,
archive_mode=args.archive,
archive_dir=args.archive_dir,
)
print("\n[RESULT] Transcription:")
print(result['text'])
print(f"\n[INFO] Metadata: {json.dumps({k: v for k, v in result.items() if k != 'text'}, ensure_ascii=False, indent=2)}")
elif args.watch:
pipeline.watch_folder(
args.watch,
args.language,
archive_mode=args.archive,
archive_dir=args.archive_dir,
)
elif args.batch:
pipeline.batch_transcribe(
args.batch,
args.language,
archive_mode=args.archive,
archive_dir=args.archive_dir,
)
else:
parser.print_help()
except Exception as e:
print(f"[ERROR] {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
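# Illustrative sketch, not part of the pipeline above: watch_folder waits a fixed
# one second before transcribing a newly created file, which may be too short for
# large recordings that are still being copied. One possible alternative is to poll
# the file size until it stops changing. The helper name wait_until_stable and its
# parameters (checks, interval, timeout) are assumptions made for this example.

```python
import time
from pathlib import Path


def wait_until_stable(path, checks=3, interval=0.5, timeout=60.0):
    """Return True once the file size is unchanged for `checks` consecutive polls."""
    path = Path(path)
    deadline = time.monotonic() + timeout
    last_size = -1
    stable = 0
    while time.monotonic() < deadline:
        try:
            size = path.stat().st_size
        except OSError:
            size = -1  # file not visible yet or temporarily locked
        if size >= 0 and size == last_size:
            stable += 1
            if stable >= checks:
                return True
        else:
            # size changed (or file unreadable): restart the stability count
            stable = 0
            last_size = size
        time.sleep(interval)
    return False
```

# In a handler like AudioHandler.on_created, a call such as
# wait_until_stable(file_path) could stand in for the fixed sleep; a False result
# would mean the file never settled within the timeout and could be skipped.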
FILE:check_env.py
#!/usr/bin/env python
"""Check runtime environment and model availability for the ASR skill."""
import json
import os
import re
import string
import subprocess
import sys
from pathlib import Path
from threading import Thread
def _read_skill_version() -> str:
"""Read SKILL_VERSION from SKILL.md next to this script."""
try:
md = Path(__file__).parent / "SKILL.md"
m = re.search(r"\*\*SKILL_VERSION\*\*[^'\"]*['\"]([^'\"]+)['\"]", md.read_text(encoding="utf-8"))
return m.group(1) if m else "unknown"
except Exception:
return "unknown"
def find_state() -> dict | None:
"""Locate state.json across all drives."""
username = os.environ.get("USERNAME", "user").lower()
for drive in string.ascii_uppercase:
state_file = Path(f"{drive}:\\") / f"{username}_openvino" / "asr" / "state.json"
if state_file.exists():
try:
return json.loads(state_file.read_text(encoding="utf-8"))
except Exception:
pass
return None
def model_layout_ready(model_dir: Path) -> bool:
"""Check model completeness by stat-ing key files (no recursive scan)."""
thinker = model_dir / "thinker"
def ok(items: list[tuple[Path, float]]) -> bool:
return all(
p.exists() and p.stat().st_size / 1024**2 >= min_mb
for p, min_mb in items
)
thinker_layout = [
(thinker / "openvino_thinker_language_model.bin", 1000), # actual ~1126 MB
(thinker / "openvino_thinker_audio_encoder_model.bin", 300), # actual ~342 MB
(thinker / "openvino_thinker_audio_model.bin", 18), # actual ~21 MB
(thinker / "openvino_thinker_embedding_model.bin", 270), # actual ~303 MB
(model_dir / "config.json", 0.001),
]
root_layout = [
(model_dir / "openvino_language_model.bin", 800),
(model_dir / "openvino_audio_encoder_model.bin", 50),
(model_dir / "config.json", 0.001),
]
return ok(thinker_layout) or ok(root_layout)
def check_venv_async(venv_py: Path, result_holder: list):
"""Validate the venv in a background thread: Python works + key packages present + native intact."""
try:
# 1. check Python itself is executable
proc = subprocess.run(
[str(venv_py), "--version"],
capture_output=True,
timeout=5,
)
if proc.returncode != 0:
result_holder.append(False)
return
# 2. check packages exist + locate site-packages for native check (instant, no import)
check_script = (
"import importlib.util, sys, pathlib; "
"missing = [p for p in ['openvino','soundfile','qwen_asr'] if importlib.util.find_spec(p) is None]; "
"print(pathlib.Path(importlib.util.find_spec('openvino').origin).parent) if not missing else None; "
"sys.exit(len(missing))"
)
proc2 = subprocess.run(
[str(venv_py), "-c", check_script],
capture_output=True,
timeout=10,
)
if proc2.returncode != 0:
result_holder.append("PACKAGES_MISSING: openvino/soundfile/qwen_asr not all installed")
return
# 3. verify openvino core native library is present and has reasonable size (>10 MB)
# catches partial/corrupt installs that find_spec alone cannot detect
ov_pkg_dir = Path(proc2.stdout.decode().strip())
ov_core_dll = ov_pkg_dir / "libs" / "openvino.dll"
if not ov_core_dll.exists() or ov_core_dll.stat().st_size < 10 * 1024 * 1024:
result_holder.append("PACKAGES_MISSING: openvino native library incomplete or missing")
return
result_holder.append(True)
except Exception:
# any unexpected failure (timeout, missing interpreter) counts as a broken venv
result_holder.append(False)
def main() -> int:
state = find_state()
if not state:
print("STATE=MISSING")
return 1
venv_py = Path(state["VENV_PY"])
asr_dir = Path(state["ASR_DIR"])
model_dir = asr_dir / "Qwen3-ASR-0.6B-fp16-ov"
# start venv thread first, then check model — both run in parallel
venv_ok: list[bool | str] = [] # True/False, or a "PACKAGES_MISSING: ..." detail string
venv_thread = Thread(target=check_venv_async, args=(venv_py, venv_ok), daemon=True)
venv_thread.start()
# main thread: stat key files only, no recursive scan
model_ready = model_layout_ready(model_dir)
# wait for venv result — timeout must exceed max subprocess time (5 + 10 = 15s)
venv_thread.join(timeout=20)
if not venv_ok or not venv_ok[0]:
if venv_ok and isinstance(venv_ok[0], str):
print(venv_ok[0]) # print PACKAGES_MISSING: ... details
else:
print("VENV_PY=BROKEN")
return 1
if not model_ready:
print("MODEL_STATUS=MISSING")
return 1
print(f"VENV_PY={venv_py}")
print(f"ASR_DIR={asr_dir}")
print("MODEL_STATUS=READY")
# version staleness check — both deployed runtime scripts must match SKILL.md
skill_ver = _read_skill_version()
deployed_versions = []
transcribe_dst = asr_dir / "transcribe.py"
if transcribe_dst.exists():
try:
m = re.search(
r'SKILL_VERSION\s*=\s*["\'](.+?)["\']',
transcribe_dst.read_text(encoding="utf-8", errors="ignore"),
)
if m:
deployed_versions.append(m.group(1))
except Exception:
pass
engine_dst = asr_dir / "asr_engine.py"
if engine_dst.exists():
try:
m = re.search(
r'SKILL_VERSION\s*=\s*["\'](.+?)["\']',
engine_dst.read_text(encoding="utf-8", errors="ignore"),
)
if m:
deployed_versions.append(m.group(1))
except Exception:
pass
if skill_ver != "unknown":
mismatched = [ver for ver in deployed_versions if ver != skill_ver]
if mismatched or len(deployed_versions) < 2:
current = ",".join(deployed_versions) if deployed_versions else "unknown"
print(f"SCRIPTS_STALE={current}->{skill_ver}")
return 0
if __name__ == "__main__":
sys.exit(main())
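# Illustrative sketch, not part of check_env.py: the script reports its result as
# KEY=VALUE lines on stdout (VENV_PY=..., ASR_DIR=..., MODEL_STATUS=...,
# SCRIPTS_STALE=...) plus an optional "PACKAGES_MISSING: ..." line. A caller could
# turn that output into a dict roughly as follows. The function name
# parse_check_env_output and the sample paths are assumptions for this example.

```python
def parse_check_env_output(stdout: str) -> dict:
    """Parse KEY=VALUE lines; a 'PACKAGES_MISSING: detail' line is stored under that key."""
    result = {}
    for line in stdout.splitlines():
        line = line.strip()
        if not line:
            continue
        if line.startswith("PACKAGES_MISSING:"):
            result["PACKAGES_MISSING"] = line.split(":", 1)[1].strip()
        elif "=" in line:
            # split on the first '=' only, so Windows paths stay intact
            key, value = line.split("=", 1)
            result[key.strip()] = value.strip()
    return result


sample = (
    "VENV_PY=C:\\intel_openvino\\venv\\Scripts\\python.exe\n"
    "ASR_DIR=C:\\intel_openvino\\asr\n"
    "MODEL_STATUS=READY\n"
)
info = parse_check_env_output(sample)
ready = info.get("MODEL_STATUS") == "READY" and "VENV_PY" in info
```

# The exit code remains the primary signal (0 when the environment is usable);
# the parsed dict is only a convenient way to pick up VENV_PY for later calls.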
FILE:download_model.py
"""
download_model.py — Download Qwen3-ASR-0.6B-fp16-ov from ModelScope.
Run once from a terminal (NOT inside OpenClaw):
python download_model.py
- Downloads ~2 GB to your local {USERNAME}_openvino\\asr\\ directory
- Resume supported: safe to Ctrl+C and re-run, will continue from where it stopped
- Run setup.py first if you haven't already
"""
import json, os, string, subprocess, sys
from pathlib import Path
MODEL_ID = "snake7gun/Qwen3-ASR-0.6B-fp16-ov"
TOTAL_GB = 2.0
# ── Find state.json ────────────────────────────────────────
def find_state():
for d in string.ascii_uppercase:
sf = Path(f"{d}:\\") / f"{os.environ.get('USERNAME', 'user').lower()}_openvino" / "asr" / "state.json"
if sf.exists():
return json.loads(sf.read_text(encoding="utf-8"))
return None
state = find_state()
if not state:
print("[ERROR] state.json not found.")
print(" Please run setup.py first:")
print(f" python \"{Path(__file__).parent / 'setup.py'}\"")
sys.exit(1)
venv_py = Path(state["VENV_PY"])
if not venv_py.exists():
print(f"[ERROR] venv not found at {venv_py}")
print(" Please re-run setup.py.")
sys.exit(1)
# ── Self-relaunch inside venv ──────────────────────────────
# This ensures modelscope and all deps are available without a nested subprocess.
if Path(sys.executable).resolve() != venv_py.resolve():
print(f"[INFO] Switching to venv python: {venv_py}")
result = subprocess.run([str(venv_py), str(Path(__file__).resolve())])
sys.exit(result.returncode)
# ─────────────────────────────────────────────────────────────
# From here on we are running inside the venv
# ─────────────────────────────────────────────────────────────
import threading, time
from modelscope import snapshot_download
asr_dir = Path(state["ASR_DIR"])
model_dir = asr_dir / "Qwen3-ASR-0.6B-fp16-ov"
# ── Check if already complete ──────────────────────────────
def get_size_gb(path):
if not path.exists():
return 0.0
return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) / 1024**3
def model_complete():
thinker = model_dir / "thinker"
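# size thresholds (MB) match check_env.py so both scripts agree on what "complete" means
layout_thinker = [
(thinker / "openvino_thinker_language_model.bin", 1000), # actual ~1126 MB
(thinker / "openvino_thinker_audio_encoder_model.bin", 300), # actual ~342 MB
(thinker / "openvino_thinker_audio_model.bin", 18), # actual ~21 MB
(thinker / "openvino_thinker_embedding_model.bin", 270), # actual ~303 MB
(model_dir / "config.json", 0.001),
]
layout_root = [
(model_dir / "openvino_language_model.bin", 800),
(model_dir / "openvino_audio_encoder_model.bin", 50),
(model_dir / "config.json", 0.001),
]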
def ok(lst):
return all(p.exists() and p.stat().st_size / 1024**2 >= mb for p, mb in lst)
return ok(layout_thinker) or ok(layout_root)
print("=" * 55)
print(" local-asr-qwen3 · Model Download")
print("=" * 55)
print(f"\n Model dir: {model_dir}")
if model_complete():
gb = get_size_gb(model_dir)
print(f"\n Model already complete ({gb:.2f} GB) [OK]")
print(" You can use the skill right away.")
sys.exit(0)
existing_gb = get_size_gb(model_dir)
if existing_gb > 0.01:
print(f"\n Resuming download — {existing_gb:.2f} GB already on disk")
else:
print(f"\n Starting fresh download (~{TOTAL_GB} GB)")
print(f"""
Estimated time:
100 Mbps → ~3 min
50 Mbps → ~5 min
10 Mbps → ~30 min
Progress updates every 30 seconds.
Safe to Ctrl+C and re-run — download will resume.
""")
model_dir.mkdir(parents=True, exist_ok=True)
# ── Download with progress watchdog ───────────────────────
_stop = threading.Event()
t0 = time.time()
def watchdog():
prev = existing_gb * 1024**3
print("[Progress] Download started...", flush=True)
while not _stop.wait(30):
try:
total = sum(f.stat().st_size for f in model_dir.rglob("*") if f.is_file())
now = time.time()
speed = (total - prev) / 30
pct = min(total / (TOTAL_GB * 1024**3) * 100, 99.9)
elapsed = now - t0
eta = (TOTAL_GB * 1024**3 - total) / speed if speed > 0 else 0
print(
f"[Progress] {total/1024**3:.2f}/{TOTAL_GB:.1f} GB "
f"{pct:.1f}% {speed/1024**2:.1f} MB/s "
f"elapsed {int(elapsed//60)}m{int(elapsed%60):02d}s "
f"ETA ~{int(eta//60)}m{int(eta%60):02d}s",
flush=True
)
prev = total
except Exception:
pass
threading.Thread(target=watchdog, daemon=True).start()
try:
snapshot_download(MODEL_ID, local_dir=str(model_dir))
_stop.set()
if model_complete():
gb = get_size_gb(model_dir)
print(f"\n{'='*55}")
print(f" Download complete! ({gb:.2f} GB) [OK]")
print(f" You can now use the ASR skill in OpenClaw.")
print(f"{'='*55}")
else:
print("\n[WARN] Download finished but model appears incomplete.")
print(" Re-run this script to resume.")
except KeyboardInterrupt:
_stop.set()
print("\n[INFO] Download interrupted.")
print(" Re-run this script to continue from where it stopped.")
except Exception as e:
_stop.set()
err = str(e).lower()
if any(x in err for x in ["disk", "space"]):
print(f"\n[ERROR] Disk full: {e}")
print(" Free up space and re-run.")
elif any(x in err for x in ["timeout", "connection", "network"]):
print(f"\n[ERROR] Network error: {e}")
print(" Check your connection and re-run.")
print(f" Manual download: https://modelscope.cn/models/{MODEL_ID}/files")
print(f" Place files under: {model_dir}")
else:
print(f"\n[ERROR] {e}")
print(" Re-run to retry.")
print(f" Manual download: https://modelscope.cn/models/{MODEL_ID}/files")
print(f" Place files under: {model_dir}")
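# Illustrative sketch, not part of download_model.py: the "Estimated time" table
# printed above can be sanity-checked with simple arithmetic. The helper name
# estimate_minutes is an assumption for this example, and the figure is an ideal
# lower bound that ignores protocol overhead and server-side throttling.

```python
def estimate_minutes(size_gb: float, link_mbps: float) -> float:
    """Ideal transfer time in minutes for size_gb at link_mbps megabits per second."""
    size_megabits = size_gb * 1024 * 8  # GB -> MB -> megabits (binary units, approximate)
    return size_megabits / link_mbps / 60


for mbps in (100, 50, 10):
    print(f"{mbps:>3} Mbps -> ~{estimate_minutes(2.0, mbps):.0f} min")
```

# Under these assumptions a 2 GB download needs roughly 3, 5 and 27 minutes at
# 100, 50 and 10 Mbps respectively; real transfers are usually somewhat slower.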
FILE:setup.py
"""
setup.py — One-time environment setup for local-asr-qwen3 skill.
Run once from a terminal:
python setup.py
What this does:
1. Creates a shared Python venv under {USERNAME}_openvino\\venv\\
2. Installs all required packages into the venv
3. Clones the Qwen3-ASR repo at the pinned commit
4. Writes state.json so the skill knows where everything is
After this, run:
python download_model.py
"""
import json, os, re, shutil, subprocess, sys
from pathlib import Path
def _read_skill_version():
"""Read SKILL_VERSION from SKILL.md — single source of truth for all deployed files."""
try:
md = Path(__file__).parent / "SKILL.md"
m = re.search(r"\*\*SKILL_VERSION\*\*[^'\"]*['\"]([^'\"]+)['\"]", md.read_text(encoding='utf-8'))
return m.group(1) if m else "unknown"
except Exception:
return "unknown"
QWEN_ASR_REPO = "https://github.com/QwenLM/Qwen3-ASR.git"
QWEN_ASR_COMMIT = "c17a131fe028b2e428b6e80a33d30bb4fa57b8df"
PACKAGES = [
"openvino>=2025.4",
"numpy<2.0",
"librosa",
"transformers",
"huggingface_hub",
"accelerate",
"soundfile",
"scipy",
"modelscope",
]
# ── Banner ─────────────────────────────────────────────────
print("=" * 55)
print(" local-asr-qwen3 · Environment Setup")
print("=" * 55)
# ── Locate root directory ──────────────────────────────────
username = os.environ.get("USERNAME", "user").lower()
root_name = f"{username}_openvino"
drives = [f"{d}:\\" for d in ("C", "D") if Path(f"{d}:\\").exists()]
root = next(
(Path(d) / root_name for d in drives if (Path(d) / root_name).exists()),
None
)
if not root:
best = max(drives, key=lambda d: shutil.disk_usage(d).free)
root = Path(best) / root_name
asr_dir = root / "asr"
venv_dir = root / "venv"
venv_py = venv_dir / "Scripts" / "python.exe"
root.mkdir(parents=True, exist_ok=True)
asr_dir.mkdir(parents=True, exist_ok=True)
print(f"\n Root: {root}")
print(f" ASR: {asr_dir}")
print(f" Venv: {venv_dir}")
# ── Create or validate venv ────────────────────────────────
print("\n[1/4] Checking venv...")
venv_ok = False
if venv_py.exists():
try:
r = subprocess.run([str(venv_py), "--version"], capture_output=True, timeout=10)
if r.returncode == 0:
print(f" Existing venv OK: {r.stdout.decode().strip()}")
venv_ok = True
except Exception:
pass
if not venv_ok:
if venv_dir.exists():
print(" Existing venv is broken — rebuilding...")
shutil.rmtree(venv_dir, ignore_errors=True)
print(" Creating venv...")
subprocess.run([sys.executable, "-m", "venv", str(venv_dir)], check=True)
venv_py = venv_dir / "Scripts" / "python.exe"
r = subprocess.run([str(venv_py), "--version"], capture_output=True)
print(f" Venv created: {r.stdout.decode().strip()} [OK]")
def venv_run(args, **kw):
return subprocess.run([str(venv_py)] + args, **kw)
# ── Upgrade pip ────────────────────────────────────────────
print("\n[2/4] Upgrading pip...")
venv_run(["-m", "pip", "install", "--upgrade", "pip", "--quiet"], check=True)
# ── Install packages ───────────────────────────────────────
print("\n[3/4] Installing packages...")
missing = []
for pkg in PACKAGES:
imp = pkg.split(">=")[0].split("<")[0].replace("-", "_")
r = venv_run(["-c", f"import {imp}"], capture_output=True)
if r.returncode != 0:
missing.append(pkg)
if missing:
print(f" Installing: {missing}")
venv_run(["-m", "pip", "install", "--quiet"] + missing, check=True)
print(" Packages installed [OK]")
else:
print(" All packages already present [OK]")
# ── Clone Qwen3-ASR repo ───────────────────────────────────
print("\n[4/4] Qwen3-ASR repo...")
repo = asr_dir / "Qwen3-ASR"
if repo.exists():
print(f" Repo already exists: {repo} [OK]")
else:
print(f" Cloning from {QWEN_ASR_REPO}...")
subprocess.run(["git", "clone", QWEN_ASR_REPO, str(repo)], check=True)
subprocess.run(["git", "-C", str(repo), "checkout", QWEN_ASR_COMMIT], check=True)
print(" Cloned [OK]")
print(" Installing qwen_asr package...")
venv_run(["-m", "pip", "install", "-q", "-e", str(repo)], check=True)
print(" qwen_asr installed [OK]")
# ── Write state.json ───────────────────────────────────────
state = {
"ROOT": str(root),
"ASR_DIR": str(asr_dir),
"VENV_DIR": str(venv_dir),
"VENV_PY": str(venv_py),
"VENV_EXISTS": True,
}
state_file = asr_dir / "state.json"
state_file.write_text(json.dumps(state, indent=2), encoding="utf-8")
print(f"\n state.json written: {state_file} [OK]")
# ── Write asr_engine.py ────────────────────────────────────
SKILL_VERSION = _read_skill_version() # pulled from SKILL.md — single source of truth
engine_file = asr_dir / "asr_engine.py"
# Build the engine payload and redeploy whenever the skill version changes.
engine_code = r'''SKILL_VERSION = "v1.0"
"""
asr_engine.py — OpenVINO inference engine for Qwen3-ASR.
Inference only — no model conversion.
"""
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, List, Optional
import numpy as np
import openvino as ov
try:
from qwen_asr.inference.utils import (
SAMPLE_RATE, MAX_ASR_INPUT_SECONDS, SUPPORTED_LANGUAGES,
normalize_audios, parse_asr_output,
split_audio_into_chunks, merge_languages,
)
from qwen_asr.core.transformers_backend.processing_qwen3_asr import Qwen3ASRProcessor
INFERENCE_UTILS_AVAILABLE = True
except ImportError:
INFERENCE_UTILS_AVAILABLE = False
SAMPLE_RATE = 16000
MAX_ASR_INPUT_SECONDS = 1200
SUPPORTED_LANGUAGES = ["Chinese", "English"]
core = ov.Core()
def _get_feat_extract_output_lengths(input_lengths):
input_lengths = np.asarray(input_lengths, dtype=np.int64)
leave = input_lengths % 100
feat = (leave - 1) // 2 + 1
return ((feat - 1) // 2 + 1 - 1) // 2 + 1 + (input_lengths // 100) * 13
class SinusoidsPositionEmbedding:
def __init__(self, max_pos, embed_dim, max_ts=10000.0):
half = embed_dim // 2
inv = np.exp(-np.log(max_ts) / (half - 1) * np.arange(half, dtype=np.float32))
t = np.arange(max_pos, dtype=np.float32)[:, None] * inv[None, :]
self.pe = np.concatenate([np.sin(t), np.cos(t)], axis=1).astype(np.float32)
def __getitem__(self, n): return self.pe[:n, :]
def load_audio_file(path, sr=16000):
try:
import soundfile as sf
audio, orig_sr = sf.read(str(path), dtype="float32")
except Exception:
import scipy.io.wavfile as wav
orig_sr, audio = wav.read(str(path))
audio = audio.astype(np.float32) / 32768.0
if audio.ndim > 1: audio = audio.mean(axis=1)
if orig_sr != sr:
import librosa
audio = librosa.resample(audio, orig_sr=orig_sr, target_sr=sr)
return np.asarray(audio, dtype=np.float32)
@dataclass
class ASRTranscription:
language: str
text: str
time_stamps: Optional[Any] = None
class OVQwen3ASRPipeline:
def __init__(self, model_dir, device="CPU"):
self.model_dir = Path(model_dir)
with open(self.model_dir / "config.json") as f:
cfg = json.load(f)
t = cfg.get("thinker_config", {})
a = t.get("audio_config", {})
c = t.get("text_config", {})
self.d_model = a["d_model"]
self.num_mel_bins = a["num_mel_bins"]
self.max_source_positions = a["max_source_positions"]
self.n_window = a["n_window"]
self.n_window_infer = a.get("n_window_infer", self.n_window * 2)
self.hidden_size = c["hidden_size"]
self.audio_token_id = t["audio_token_id"]
self.pos_emb = SinusoidsPositionEmbedding(self.max_source_positions, self.d_model)
root = self.model_dir
thinker = self.model_dir / "thinker"
def _find(root_names, thinker_names):
# prefer the flat (root) layout, then fall back to the thinker/ sub-directory layout
for n in root_names:
if (root / n).exists(): return root / n
for n in thinker_names:
if (thinker / n).exists(): return thinker / n
raise FileNotFoundError(f"Model file not found. Tried: {root_names + thinker_names}")
self.audio_conv = core.compile_model(_find(["openvino_audio_conv_model.xml"], ["openvino_thinker_audio_model.xml"]), device)
self.audio_encoder = core.compile_model(_find(["openvino_audio_encoder_model.xml"], ["openvino_thinker_audio_encoder_model.xml"]), device)
self.text_emb = core.compile_model(_find(["openvino_text_embeddings_model.xml"],["openvino_thinker_embedding_model.xml"]), device)
lm = core.read_model(_find(["openvino_language_model.xml"], ["openvino_thinker_language_model.xml"]))
self.lm_input_names = {k.get_any_name(): i for i, k in enumerate(lm.inputs)}
self._pos_ndim = len(lm.input("position_ids").get_partial_shape())
self.lm_req = core.compile_model(lm, device).create_infer_request()
print("All models loaded [OK]")
def _audio_tower(self, feats, feat_len):
cs = self.n_window * 2
aftercnn = int(_get_feat_extract_output_lengths(feat_len))
cn = int(np.ceil(feat_len / cs))
clens = np.full(cn, cs, dtype=np.int64)
if feat_len % cs: clens[-1] = feat_len % cs
ft = feats.T; chunks = []; s = 0
for l in clens: chunks.append(ft[s:s+int(l)]); s += int(l)
mx = max(c.shape[0] for c in chunks)
pad = [np.pad(c, ((0, mx-c.shape[0]), (0, 0))) if c.shape[0] < mx else c for c in chunks]
pf = np.stack(pad).transpose(0, 2, 1).astype(np.float32)
lcnn = _get_feat_extract_output_lengths(clens)
co = self.audio_conv(pf)[self.audio_conv.output(0)]
co = co + self.pos_emb[co.shape[1]][None, :, :]
mask = np.zeros((len(clens), co.shape[1]), dtype=bool)
for j, cl in enumerate(lcnn): mask[j, :int(cl)] = True
hidden = co[mask]
wcnn = mask.shape[-1] * (self.n_window_infer // cs)
cu = [0] + [wcnn] * (aftercnn // wcnn)
r = aftercnn % wcnn
if r: cu.append(r)
cus = np.cumsum(cu).astype(np.int32)
segs = []
for si in range(len(cus) - 1):
s, e = int(cus[si]), int(cus[si+1])
out = self.audio_encoder({"hidden_states": hidden[s:e].astype(np.float32), "cu_seqlens": np.array([0, e-s], dtype=np.int32)})[self.audio_encoder.output(0)]
segs.append(out)
return np.concatenate(segs, axis=0)
def _process_audio(self, feats, feat_mask):
lens = np.sum(feat_mask, axis=1).astype(np.int64)
return np.concatenate([self._audio_tower(feats[i, :, :int(lens[i])], int(lens[i])) for i in range(feats.shape[0])], axis=0)
def _embed(self, ids): return self.text_emb(ids)[self.text_emb.output(0)]
def _lm(self, emb, attn, pos, last_only=True):
if self._pos_ndim == 3 and pos.ndim == 2: pos = np.stack([pos] * 3, axis=0)
inp = {"inputs_embeds": emb.astype(np.float32), "attention_mask": attn.astype(np.int64), "position_ids": pos.astype(np.int64)}
if "beam_idx" in self.lm_input_names: inp["beam_idx"] = np.arange(emb.shape[0], dtype=np.int32)
self.lm_req.start_async(inp, share_inputs=False); self.lm_req.wait()
logits = self.lm_req.get_tensor("logits").data
return logits[:, -1:, :].copy() if last_only else logits.copy()
_CHUNK = 256
def _prefill(self, emb, attn, pos):
seq = emb.shape[1]
if seq <= self._CHUNK: return self._lm(emb, attn, pos)
logits = None
for s in range(0, seq, self._CHUNK):
e = min(s + self._CHUNK, seq)
logits = self._lm(emb[:, s:e, :], attn[:, :e], pos[:, s:e])
return logits
def transcribe_audio(self, audio, processor, max_new_tokens=512):
msgs = [{"role": "system", "content": ""}, {"role": "user", "content": [{"type": "audio", "audio": audio}]}]
text = processor.apply_chat_template(msgs, add_generation_prompt=True, tokenize=False)
inp = processor(text=[text], audio=[audio], return_tensors="np", padding=True)
af = self._process_audio(inp["input_features"], inp["feature_attention_mask"])
emb = self._embed(inp["input_ids"])
amask = inp["input_ids"][0] == self.audio_token_id
na, nf = int(amask.sum()), af.shape[0]
if na != nf:
n = min(na, nf); emb[0, np.where(amask)[0][:n]] = af[:n]
else:
emb[0, amask] = af
attn = inp["attention_mask"]
pos = np.where(attn == 0, 0, np.cumsum(attn, axis=-1) - 1)
self.lm_req.reset_state(); logits = self._prefill(emb, attn, pos)
tok = processor.tokenizer; eos = set()
if tok.eos_token_id is not None: eos.add(tok.eos_token_id)
for t in ["<|im_end|>", "<|endoftext|>"]:
tid = tok.convert_tokens_to_ids(t)
if tid is not None and tid != tok.unk_token_id: eos.add(tid)
gen = []; cur_attn = attn.copy()
for _ in range(max_new_tokens):
t = int(np.argmax(logits[:, -1, :], axis=-1)[0])
if t in eos: break
gen.append(t)
ne = self._embed(np.array([[t]]))
cur_attn = np.concatenate([cur_attn, np.ones((1, 1), dtype=np.int64)], axis=1)
logits = self._lm(ne, cur_attn, np.array([[cur_attn.shape[1] - 1]], dtype=np.int64))
raw = tok.decode(gen, skip_special_tokens=True); clean, lang = raw, "unknown"
try:
from qwen_asr.inference.utils import parse_asr_output
lang, clean = parse_asr_output(raw, user_language=None)
except Exception:
for sp in ["<|im_end|>", "<|endoftext|>", "<|im_start|>", "</asr_text>"]: clean = clean.replace(sp, "")
clean = clean.strip()
return {"text": clean, "language": lang, "generated_tokens": len(gen)}
class OVQwen3ASRModel:
def __init__(self, model_dir, device="CPU", max_new_tokens=4096, max_inference_batch_size=32):
self.max_new_tokens = max_new_tokens
self.max_inference_batch_size = max(1, int(max_inference_batch_size))
self.pipeline = OVQwen3ASRPipeline(str(model_dir), device=device)
self.processor = None
try:
self.processor = Qwen3ASRProcessor.from_pretrained(str(model_dir))
print("Processor loaded [OK]")
except Exception as e:
print(f"[WARN] Processor load failed: {e}")
print(f"OVQwen3ASRModel ready [OK] batch={self.max_inference_batch_size}")
@classmethod
def from_pretrained(cls, model_dir, device="CPU", max_new_tokens=4096, max_inference_batch_size=32, **kw):
return cls(
model_dir=model_dir,
device=device,
max_new_tokens=max_new_tokens,
max_inference_batch_size=max_inference_batch_size,
)
def get_supported_languages(self): return list(SUPPORTED_LANGUAGES)
def transcribe(self, audio, language=None, **kw):
if self.processor is None: raise RuntimeError("Processor not loaded")
wavs = normalize_audios(audio) if INFERENCE_UTILS_AVAILABLE else [self._to_wav(a) for a in (audio if isinstance(audio, list) else [audio])]
out_l = [[] for _ in range(len(wavs))]
out_t = [[] for _ in range(len(wavs))]
for i, wav in enumerate(wavs):
chunk_buffer = []
if INFERENCE_UTILS_AVAILABLE:
chunk_iter = split_audio_into_chunks(wav, sr=SAMPLE_RATE, max_chunk_sec=MAX_ASR_INPUT_SECONDS)
for cwav, _ in chunk_iter:
chunk_buffer.append(cwav)
if len(chunk_buffer) >= self.max_inference_batch_size:
for buffered_wav in chunk_buffer:
r = self.pipeline.transcribe_audio(buffered_wav, self.processor, self.max_new_tokens)
out_l[i].append(r["language"]); out_t[i].append(r["text"])
chunk_buffer.clear()
else:
chunk_buffer.append(wav)
for buffered_wav in chunk_buffer:
r = self.pipeline.transcribe_audio(buffered_wav, self.processor, self.max_new_tokens)
out_l[i].append(r["language"]); out_t[i].append(r["text"])
return [
ASRTranscription(
language=merge_languages(out_l[i]) if INFERENCE_UTILS_AVAILABLE else (out_l[i][0] if out_l[i] else "unknown"),
text="".join(out_t[i])
)
for i in range(len(wavs))
]
@staticmethod
def _to_wav(a):
if isinstance(a, str): return load_audio_file(a)
if isinstance(a, tuple):
wav, sr = a; wav = np.asarray(wav, dtype=np.float32)
if wav.ndim > 1: wav = wav.mean(axis=-1)
if sr != 16000:
import librosa; wav = librosa.resample(wav, orig_sr=sr, target_sr=16000)
return wav.astype(np.float32)
if isinstance(a, np.ndarray): return a.astype(np.float32)
raise ValueError(f"Unsupported type: {type(a)}")
'''
engine_payload = engine_code.strip().replace('SKILL_VERSION = "v1.0"', f'SKILL_VERSION = "{SKILL_VERSION}"', 1)
needs_write = True
if engine_file.exists():
m = re.search(r'SKILL_VERSION\s*=\s*["\'](.+?)["\']', engine_file.read_text(encoding="utf-8", errors="ignore"))
if m and m.group(1) == SKILL_VERSION:
needs_write = False
if needs_write:
print(f"\n Writing asr_engine.py {SKILL_VERSION}...")
engine_file.write_text(engine_payload, encoding="utf-8")
print(f" asr_engine.py written [OK]")
else:
print(f"\n asr_engine.py already at {SKILL_VERSION} [OK]")
# ── Deploy transcribe.py ──────────────────────────────────
transcribe_dst = asr_dir / "transcribe.py"
transcribe_src = Path(__file__).parent / "transcribe.py"
# patch __SKILL_VERSION__ placeholder with the actual version from SKILL.md
_content = transcribe_src.read_text(encoding="utf-8")
_content = re.sub(
r'(SKILL_VERSION\s*=\s*["\'])([^"\']+)(["\'])',
rf'\g<1>{SKILL_VERSION}\g<3>',
_content,
count=1,
)
_needs_deploy = True
if transcribe_dst.exists():
_m = re.search(
r'SKILL_VERSION\s*=\s*["\'](.+?)["\']',
transcribe_dst.read_text(encoding="utf-8", errors="ignore"),
)
if _m and _m.group(1) == SKILL_VERSION:
_needs_deploy = False
if _needs_deploy:
transcribe_dst.write_text(_content, encoding="utf-8")
print(f"\n transcribe.py {SKILL_VERSION} deployed [OK]")
else:
print(f"\n transcribe.py already at {SKILL_VERSION} [OK]")
# ── Verify ─────────────────────────────────────────────────
print("\n[Verify] Checking installation...")
verify_script = f"""
results = {{}}
for pkg, imp in [
    ("openvino","openvino"), ("numpy","numpy"), ("librosa","librosa"), ("transformers","transformers"),
    ("huggingface_hub","huggingface_hub"), ("accelerate","accelerate"),
    ("soundfile","soundfile"), ("scipy","scipy"), ("modelscope","modelscope"),
]:
    try:
        ver = getattr(__import__(imp), "__version__", "OK")
        results[pkg] = ("OK", ver)
    except ImportError as e:
        results[pkg] = ("FAIL", str(e))
try:
    import numpy as np
    v = tuple(int(x) for x in np.__version__.split(".")[:2])
    results["numpy"] = ("WARN", f"{{np.__version__}} must be <2.0; run setup.py again") if v >= (2,0) else ("OK", np.__version__)
except Exception:
    # numpy missing or unparsable version: keep the result recorded by the import loop
    pass
try:
    from qwen_asr.inference.utils import SAMPLE_RATE
    results["qwen_asr"] = ("OK", f"SAMPLE_RATE={{SAMPLE_RATE}}")
except ImportError as e:
    results["qwen_asr"] = ("FAIL", str(e))
try:
    import sys as _sys
    _sys.path.insert(0, r"{asr_dir}")
    from asr_engine import OVQwen3ASRModel
    results["asr_engine"] = ("OK", "importable")
except ImportError as e:
    results["asr_engine"] = ("FAIL", str(e))
fail = [k for k,(s,_) in results.items() if s == "FAIL"]
for k,(s,d) in results.items():
    icon = "OK " if s == "OK" else ("WARN" if s == "WARN" else "FAIL")
    print(f" [{{icon}}] {{k}}: {{d}}")
print()
print("VERIFY=PASS" if not fail else f"VERIFY=FAIL {{fail}}")
"""
venv_run(["-c", verify_script])
# ── Done ───────────────────────────────────────────────────
print()
print("=" * 55)
print(" Setup complete!")
print()
print(" Next step — download the model (~2 GB):")
print(f" python \"{Path(__file__).parent / 'download_model.py'}\"")
print("=" * 55)
FILE:transcribe.py
SKILL_VERSION = "__SKILL_VERSION__" # placeholder — replaced by setup.py with SKILL.md version on deploy
import sys, io, os, json, string, argparse
from pathlib import Path
_STATE_CACHE = None
_MODEL_CACHE = None
_MODEL_CACHE_KEY = None
# Qwen3-ASR API supports long audio natively; no manual chunking needed.
# Truncation is controlled by max_new_tokens; 4096 covers ~30 minutes.
MAX_NEW_TOKENS = 4096
# max_inference_batch_size controls internal concurrency; 32 is the recommended value.
MAX_BATCH_SIZE = 32
def get_state():
    """Locate and cache state.json written by setup.py (searches every drive letter)."""
    global _STATE_CACHE
    if _STATE_CACHE is not None:
        return _STATE_CACHE
    for d in string.ascii_uppercase:
        sf = Path(f"{d}:\\") / f"{os.environ.get('USERNAME', 'user').lower()}_openvino" / "asr" / "state.json"
        if sf.exists():
            _STATE_CACHE = json.loads(sf.read_text(encoding='utf-8'))
            return _STATE_CACHE
    return None
def get_device():
    """Return the first available GPU device name, falling back to CPU."""
    import openvino as ov
    core = ov.Core()
    devs = core.available_devices
    print(f"[INFO] Devices: {devs}", file=sys.stderr)
    for d in devs:
        if "GPU" in d:
            print(f"[INFO] Using {d}", file=sys.stderr)
            return d
    print("[INFO] Using CPU", file=sys.stderr)
    return "CPU"
def transcribe(audio_path, language=None, topic='', output_path=None):
    # NOTE: topic and output_path are accepted for interface compatibility,
    # but this function does not use them; it only returns the result dict.
    state = get_state()
    if not state:
        print("[ERROR] state.json not found - run setup.py", file=sys.stderr)
        sys.exit(1)
    asr_dir = Path(state['ASR_DIR'])
    sys.path.insert(0, str(asr_dir))
    model_dir = asr_dir / "Qwen3-ASR-0.6B-fp16-ov"
    if not Path(audio_path).exists():
        print(f"[ERROR] Audio not found: {audio_path}", file=sys.stderr)
        sys.exit(1)
    # load model (reused in-process)
    # max_new_tokens caps output tokens; this is the true cause of long-audio truncation
    device = get_device()
    from asr_engine import OVQwen3ASRModel
    global _MODEL_CACHE, _MODEL_CACHE_KEY
    model_key = (str(model_dir), str(device), MAX_NEW_TOKENS, MAX_BATCH_SIZE)
    if _MODEL_CACHE is None or _MODEL_CACHE_KEY != model_key:
        print(f"[INFO] Loading model: {model_dir} (max_new_tokens={MAX_NEW_TOKENS}, batch={MAX_BATCH_SIZE})", file=sys.stderr)
        _MODEL_CACHE = OVQwen3ASRModel.from_pretrained(
            str(model_dir),
            device=device,
            max_new_tokens=MAX_NEW_TOKENS,
            max_inference_batch_size=MAX_BATCH_SIZE,
        )
        _MODEL_CACHE_KEY = model_key
    else:
        print("[INFO] Reusing loaded model", file=sys.stderr)
    model = _MODEL_CACHE
    # normalize language: "auto"/empty/None all map to None (model API convention)
    if language in (None, '', 'auto', 'Auto', 'AUTO', 'none', 'null'):
        language = None
    import time
    t0 = time.time()
    # model API handles long audio internally; pass the full file
    results = model.transcribe(audio=str(audio_path), language=language)
    elapsed = time.time() - t0
    if not results:
        print("[ERROR] Empty transcription result", file=sys.stderr)
        sys.exit(1)
    result_data = {
        "text": results[0].text,
        "language": results[0].language,
        "time_elapsed": elapsed,
        "audio_path": str(audio_path),
    }
    print(f"[INFO] Language: {results[0].language} | Time: {elapsed:.2f}s", file=sys.stderr)
    return result_data
if __name__ == "__main__":
    # Force UTF-8 on stdout/stderr so non-ASCII transcripts print safely on Windows consoles
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace', line_buffering=True)
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace', line_buffering=True)
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument("--audio", required=True)
        parser.add_argument("--language", default=None)
        parser.add_argument("--topic", default='')
        parser.add_argument("--output", default=None)
        args = parser.parse_args()
        language = None if args.language in (None, '', 'None', 'none', 'null') else args.language
        result = transcribe(args.audio, language, args.topic, args.output)
        print(json.dumps(result, ensure_ascii=False), flush=True)
        sys.stdout.flush()
    except Exception as exc:
        import traceback
        print(f"[FATAL] {type(exc).__name__}: {exc}", flush=True)
        traceback.print_exc()
        sys.exit(1)
generate an image, create a picture, draw something, make an image of, text to image, paint a picture, illustrate, visualize, local image generation, AI art,...
---
name: local-image-gen-aipc
#displayName: Local Image Generation (Windows · Z-Image-Turbo · OpenVINO)
description: >
generate an image, create a picture, draw something, make an image of, text to image,
paint a picture, illustrate, visualize, local image generation, AI art, image synthesis,
offline image generation, no API key, local inference, generate art, create artwork,
produce an image, render an image, AI drawing, image from text.
Runs Z-Image-Turbo on-device on Windows via Intel OpenVINO. Prioritizes Intel iGPU
(Xe / Arc), falls back to CPU. Bilingual prompts (English + Chinese) supported.
SETUP requires network: downloads pip dependencies from GitHub and the model (~10 GB)
from modelscope.cn. INFERENCE is fully offline after setup — no cloud API calls.
os: windows
requires:
- python>=3.10
- git
network:
setup: required # github.com (pip deps), modelscope.cn (~10 GB model)
inference: offline
user-invocable: true
allowed-tools: Bash(python *), Bash(powershell *), Bash(git *), Read, Glob, Write, message
---
**Model**: `snake7gun/Z-Image-Turbo-int4-ov` (ModelScope INT4)
**SKILL_VERSION**: `v1.0.3`
> **Network usage**: Setup downloads pip dependencies (some pinned to git+https commits)
> from `github.com`, and the model (~10 GB, resume supported) from `modelscope.cn`.
> Inference is fully offline — no network calls once setup is complete.
**First time?** Before using this skill, run these two scripts once in a terminal:
```
python setup.py # creates venv, installs dependencies (~5 min)
python download_model.py # downloads the model (~10 GB, resumable)
```
Both scripts are in the skill directory alongside this SKILL.md.
## Directory layout (all auto-created)
```
{USERNAME}_openvino\
├── venv\ ← shared venv (created by setup.py)
└── imagegen\
├── state.json ← written by setup.py
├── generate_image.py ← deployed by setup.py (versioned)
├── Z-Image-Turbo-int4-ov\ ← downloaded by download_model.py (~10 GB)
└── outputs\YYYYMMDD_HHMMSS_topic.png
```
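The `outputs\YYYYMMDD_HHMMSS_topic.png` naming pattern in the layout above can be sketched as follows. This is a minimal illustration only: `build_output_path` is a hypothetical helper, not a function from `generate_image.py`, and the handling of an empty topic is an assumption.

```python
from datetime import datetime
from pathlib import PureWindowsPath


def build_output_path(imagegen_dir: str, topic: str, now: datetime) -> PureWindowsPath:
    """Return <imagegen_dir>\\outputs\\YYYYMMDD_HHMMSS_topic.png (hypothetical helper)."""
    stamp = now.strftime("%Y%m%d_%H%M%S")
    # Assumption: an empty topic simply drops the suffix.
    name = f"{stamp}_{topic}.png" if topic else f"{stamp}.png"
    return PureWindowsPath(imagegen_dir) / "outputs" / name


example = build_output_path(r"C:\user_openvino\imagegen", "panda_bamboo",
                            datetime(2026, 4, 12, 15, 30, 0))
print(example)
```

`PureWindowsPath` is used so the sketch produces Windows-style separators on any platform.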
---
## Agent instructions
1. **Windows / PowerShell only.** Never use Linux commands (`ls`, `rm`, `cat`). Never use `&&` or `call`.
2. **Every step reads `state.json` itself** — do not pass paths between steps manually.
3. **Use `VENV_PY` from state.json for all inference and package-import calls** — never use system Python for inference or Step 2 DEP_CHECK. Exception: `setup.py`, `check_env.py`, `download_model.py`, and the Step 2 script-write command intentionally run with system Python (they only use stdlib or create/validate the venv, so they must not depend on it).
4. **CRITICAL — Never modify `generate_image.py` manually.** It is deployed and versioned by `setup.py`. If it fails or is missing, re-run `setup.py`.
5. **Goal**: generate an image and send the preview to the conversation.
**Auto-recovery policy — try before asking user:**
* If `STATE=MISSING`, `VENV_PY=BROKEN`, `PACKAGES_MISSING`, or `SCRIPTS_STALE`: automatically run `setup.py` (up to 3 attempts). Only ask user if all 3 fail.
* If `MODEL_STATUS=MISSING`: automatically run `download_model.py` (up to 3 attempts). Stop if a single attempt exceeds 20 minutes — download supports resume, partial progress is not lost.
* Always announce before each attempt: `[INFO] Auto-installing environment (attempt N/3)…`
**[WARNING] Network / proxy handling** — when running `setup.py` or `download_model.py`:
1. Check for proxy: read `$env:HTTPS_PROXY` / `$env:HTTP_PROXY`; if unset, run `netsh winhttp show proxy`.
2. If proxy detected: apply automatically and continue.
3. If no proxy and download/setup fails: inform the user, suggest configuring one, then retry.
Many networks (corporate / China mainland / campus) require a proxy. Download supports resume — safe to retry after fixing network.
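The environment-variable part of the proxy check above could look roughly like this. It is a sketch under assumptions: `detect_proxy` is a hypothetical helper, the lowercase variable names are an added convenience, and the `netsh winhttp show proxy` fallback is not covered.

```python
import os


def detect_proxy(env=None):
    """Return the first proxy URL found in the environment, or None (hypothetical helper)."""
    env = os.environ if env is None else env
    # HTTPS_PROXY is checked first because pip and the model download use HTTPS.
    for key in ("HTTPS_PROXY", "HTTP_PROXY", "https_proxy", "http_proxy"):
        value = env.get(key, "").strip()
        if value:
            return value
    return None


proxy = detect_proxy()
print(f"[INFO] Proxy detected: {proxy}" if proxy else "[INFO] No proxy in environment")
```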
**Pipeline — follow exactly in order, no skipping:**
```
Step 0: expand prompt → EXPANDED_PROMPT, TOPIC
Step 1: verify environment → VENV_PY, IMAGE_GEN_DIR confirmed ready
↳ if STATE=MISSING, VENV_PY=BROKEN, PACKAGES_MISSING, or SCRIPTS_STALE: auto-run setup.py (3 attempts)
↳ if MODEL_STATUS=MISSING: auto-run download_model.py (3 attempts)
Step 2: verify deps → DEP_CHECK=PASS
Step 3: generate + send → [SUCCESS] + image preview
```
## Pre-flight: Verify Runtime Requirements (Required on First Use)
> Pre-flight: Checking Python and git...
### Check Python Version
```powershell
python --version
```
**Interpretation:**
| Output | Action |
|------|------|
| `Python 3.10.x` or higher | [OK] `PYTHON_OK`, continue to git check |
| `Python 3.8 / 3.9` | Version too low; upgrade is required (see below) |
| `'python' is not recognized as an internal or external command` | Python is not installed; install is required (see below) |
| `Python was not found; run without arguments...` | **Windows Store alias** — see below |
**If output contains "run without arguments to install from the Microsoft Store"**, the Windows Store App Execution Alias is shadowing the real Python. Do NOT ask the user to change settings, and do NOT write helper scripts. Run this command to find the real Python:
```powershell
where.exe python 2>$null | Where-Object { $_ -notlike "*WindowsApps*" } | Select-Object -First 1
```
* A path is printed → **record this literal string as `SYSTEM_PYTHON`** (system-level Python, used only for `setup.py`, `check_env.py`, `download_model.py`). For every command marked `[SYSTEM PYTHON]`, substitute the full literal path for `python`. Do NOT use this path for inference — inference must always use `VENV_PY`. Do NOT use a `$variable` across tool calls — each call is a new shell; always embed the literal path directly.
* Nothing printed → Python is not installed — install it (see below).
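The filtering done by the `where.exe` pipeline above is equivalent to the following sketch. `pick_real_python` is a hypothetical helper for illustration; the sample paths are invented.

```python
def pick_real_python(where_output: str):
    """Return the first python.exe path that is not a Windows Store alias, or None."""
    for line in where_output.splitlines():
        path = line.strip()
        # PowerShell's -notlike is case-insensitive, so compare in lower case.
        if path and "windowsapps" not in path.lower():
            return path
    return None


sample = (
    "C:\\Users\\me\\AppData\\Local\\Microsoft\\WindowsApps\\python.exe\n"
    "C:\\Users\\me\\AppData\\Local\\Programs\\Python\\Python312\\python.exe\n"
)
print(pick_real_python(sample))
```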
**If Python is missing or outdated**, run this one-command silent installer in PowerShell (recommended, no admin required):
```powershell
$f = "$env:TEMP\python-installer.exe"
Invoke-WebRequest "https://www.python.org/ftp/python/3.12.10/python-3.12.10-amd64.exe" -OutFile $f
Start-Process $f -ArgumentList "/quiet InstallAllUsers=0 PrependPath=1 Include_pip=1" -Wait
Remove-Item $f
```
> `PrependPath=1` adds Python to PATH automatically; `Include_pip=1` installs pip; `InstallAllUsers=0` avoids requiring administrator privileges.
After installation, **restart the terminal**, then run `python --version` and confirm it reports `Python 3.12.x`.
If you prefer manual installation: download **https://www.python.org/ftp/python/3.12.10/python-3.12.10-amd64.exe** and make sure to check **"Add python.exe to PATH"** during setup.
### Check git
```powershell
git --version
```
**Interpretation:**
| Output | Action |
|------|------|
| `git version 2.x.x` | [OK] `GIT_OK`, Pre-flight passed |
| `'git' is not recognized as an internal or external command` | git is not installed; install is required (see below) |
**If git is missing**, run this one-command silent installer in PowerShell:
```powershell
$f = "$env:TEMP\git-installer.exe"
Invoke-WebRequest "https://github.com/git-for-windows/git/releases/download/v2.49.0.windows.1/Git-2.49.0-64-bit.exe" -OutFile $f
Start-Process $f -ArgumentList "/VERYSILENT /NORESTART /NOCANCEL /SP- /CLOSEAPPLICATIONS /RESTARTAPPLICATIONS /COMPONENTS=icons,ext\reg\shellhere,assoc,assoc_sh" -Wait
Remove-Item $f
```
After installation, **restart the terminal**, then run `git --version` to confirm.
If you prefer manual installation: open **https://git-scm.com/download/win**, download the installer, and proceed with default options.
> git is required for the `git+https://` dependencies in `requirements_imagegen.txt`; without git, `pip install` fails because it cannot find the `git` command.
**Pre-flight pass criteria**: `python --version` is >= 3.10 and `git --version` returns a valid version string.
Status message: `[OK] Python and git are ready. Starting main workflow.`
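The pass criteria can be expressed as a small check over the two command outputs. This is a sketch, assuming the output formats shown in the tables above; `parse_python_version`, `git_ok`, and `preflight_ok` are hypothetical names.

```python
import re


def parse_python_version(output: str):
    """Parse 'Python 3.12.10' into (3, 12, 10); return None for any other output."""
    m = re.match(r"Python (\d+)\.(\d+)(?:\.(\d+))?", output.strip())
    return tuple(int(g) for g in m.groups("0")) if m else None


def git_ok(output: str) -> bool:
    """True when the output looks like 'git version 2.x.x'."""
    return re.match(r"git version \d+\.\d+", output.strip()) is not None


def preflight_ok(python_out: str, git_out: str) -> bool:
    version = parse_python_version(python_out)
    return version is not None and version >= (3, 10, 0) and git_ok(git_out)


print(preflight_ok("Python 3.12.10", "git version 2.49.0.windows.1"))
```

The Windows Store alias message also starts with the word "Python", which is why the pattern requires digits right after it.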
---
## Skill Contract (Input / Output)
### Accepted Inputs
1. **Text description** (required) — English or Chinese prompt describing the image to generate.
2. **Optional parameters:**
* `steps`: inference steps, default 9 (higher = more detail)
* `size`: `512×512` | `768×768` | `1024×1024`, default `512×512`
* `seed`: integer (default 42) or `-1` for random
* `output`: custom absolute output path for the PNG file
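A `size` value such as `768×768` has to be split into a width and a height before it can be passed on. A minimal sketch of that parsing, assuming both the `×` character and a plain `x` should be accepted (`parse_size` is a hypothetical helper):

```python
import re


def parse_size(size: str = "512x512"):
    """Split '768×768' or '768x768' into (width, height) integers."""
    m = re.fullmatch(r"\s*(\d+)\s*[x×X]\s*(\d+)\s*", size)
    if not m:
        raise ValueError(f"Unrecognized size: {size!r}")
    return int(m.group(1)), int(m.group(2))


print(parse_size("768×768"))
```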
### Output on Success
Returns the PNG file path in stdout as `[SUCCESS] <path>`, and sends a preview via the `message` tool.
```json
{
"image_path": "C:\\...\\outputs\\20260412_153000_panda_bamboo.png",
"topic": "panda_bamboo",
"prompt": "A giant panda sitting in a lush bamboo forest...",
"steps": 9,
"size": "512x512",
"seed": 42,
"device": "GPU.0"
}
```
### Output on Failure
```json
{
"error": "human-readable description",
"stage": "environment | model | inference",
"recoverable": true
}
```
---
## Step 0: expand prompt (LLM only — no tools)
Do two things simultaneously: **① expand the prompt** and **② extract a topic slug** (English snake_case, used for the filename).
Expansion structure: `[subject] [action/pose] [environment] [lighting/mood] [style] [quality tags]`
Prompts can be English or Chinese — no translation needed. Topic slug must always be English to avoid path encoding issues.
Quality tags: `photorealistic`, `8K resolution`, `cinematic lighting`, `masterpiece`
|Input|Topic slug|Expanded prompt|
|-|-|-|
|a panda|`panda_bamboo`|A giant panda sitting in a lush bamboo forest, sunlight filtering through leaves, photorealistic, 8K, wildlife photography|
|赛博朋克城市|`cyberpunk_city`|未来感都市夜景,霓虹灯倒映在湿漉漉的街道,赛博朋克风,电影级,8K|
Show the result before proceeding:
```
Input: {user description}
Expanded: {full prompt}
Topic: {topic_slug}
```
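Because the topic slug ends up in a Windows filename, it can be worth validating it before use. The sketch below normalizes any string to ASCII snake_case; `topic_slug` and its `"image"` fallback are assumptions for illustration, since in this skill the LLM produces the slug directly.

```python
import re
import unicodedata


def topic_slug(text: str, fallback: str = "image") -> str:
    """Reduce text to an ASCII snake_case slug; use the fallback if nothing survives."""
    ascii_text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
    slug = re.sub(r"[^a-z0-9]+", "_", ascii_text.lower()).strip("_")
    return slug or fallback


print(topic_slug("Panda Bamboo!"))
print(topic_slug("赛博朋克城市"))
```

A Chinese-only input leaves no ASCII characters, so the sketch returns the fallback rather than attempting a translation.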
---
## Step 1: verify environment and model
> Step 1/3: checking environment and model…
```powershell
# [SYSTEM PYTHON] check_env.py validates the venv — must NOT use venv python here
python "<skill_dir>\check_env.py"
```
**Output interpretation:**
| Output | Meaning | Action |
|---|---|---|
| `MODEL_STATUS=READY` + `VENV_PY=...` | OK | Proceed to Step 2 |
| `SCRIPTS_STALE=old->new` | `generate_image.py` outdated | Auto-run setup.py (see below) |
| `STATE=MISSING` | setup.py never run | Auto-run setup.py (see below) |
| `VENV_PY=BROKEN` | venv corrupted | Auto-run setup.py (see below) |
| `PACKAGES_MISSING: ...` | packages missing from venv | Auto-run setup.py (see below) |
| `MODEL_STATUS=MISSING missing=[...]` | download incomplete | Auto-run download_model.py (see below) |
**On success**: record `VENV_PY` and `IMAGE_GEN_DIR` from stdout, proceed to Step 2.
> If `SCRIPTS_STALE` also appears alongside `MODEL_STATUS=READY`, auto-run setup.py before Step 2 (see below).
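The interpretation table above maps onto a small decision function. This is a sketch based only on the output lines listed in the table; `parse_check_env` and `next_action` are hypothetical helpers, and the action names are invented labels.

```python
def parse_check_env(stdout: str) -> dict:
    """Collect KEY=value lines (and the PACKAGES_MISSING: line) from check_env.py output."""
    info = {}
    for line in stdout.splitlines():
        line = line.strip()
        if line.startswith("PACKAGES_MISSING"):
            info["PACKAGES_MISSING"] = line.partition(":")[2].strip()
        elif "=" in line:
            key, _, value = line.partition("=")
            info[key] = value
    return info


def next_action(info: dict) -> str:
    needs_setup = (
        info.get("STATE") == "MISSING"
        or info.get("VENV_PY") == "BROKEN"
        or "PACKAGES_MISSING" in info
        or "SCRIPTS_STALE" in info
    )
    if needs_setup:
        return "run_setup"
    status = info.get("MODEL_STATUS", "")
    if status.startswith("MISSING"):
        return "run_download"
    if status.startswith("READY") and "VENV_PY" in info:
        return "proceed"
    return "unknown"


ready = "VENV_PY=C:\\u_openvino\\venv\\Scripts\\python.exe\nIMAGE_GEN_DIR=C:\\u_openvino\\imagegen\nMODEL_STATUS=READY (9.80 GB)\n"
print(next_action(parse_check_env(ready)))
```

`SCRIPTS_STALE` is checked before the model status because it is printed alongside `MODEL_STATUS=READY` and still requires `setup.py`.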
---
### If SCRIPTS_STALE or STATE=MISSING or VENV_PY=BROKEN or PACKAGES_MISSING → auto-run setup.py
`SCRIPTS_STALE` means the venv and model are both OK but `generate_image.py` in `IMAGE_GEN_DIR` is an older version. `setup.py` is idempotent — it skips venv/packages if already healthy and only redeploys the script.
`PACKAGES_MISSING` means the venv exists but required packages (`openvino`, `torch`, `optimum.intel`) are not installed. Re-running `setup.py` re-installs only what is missing.
Announce and run (up to 3 attempts):
```
[INFO] Environment not initialized — auto-installing (attempt 1/3)…
```
```powershell
# [SYSTEM PYTHON] setup.py creates the venv — must NOT use venv python here
python "<skill_dir>\setup.py"
```
Re-run `python "<skill_dir>\check_env.py"` after each attempt. If all 3 fail, show manual fallback below.
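The announce, run, and re-check cycle can be sketched as a bounded retry loop. `run_with_retries` is a hypothetical helper; `step` stands in for running `setup.py` and `check` for re-running `check_env.py` and inspecting its result.

```python
def run_with_retries(step, check, attempts: int = 3, announce=print) -> bool:
    """Run step() up to `attempts` times, stopping as soon as check() passes."""
    for n in range(1, attempts + 1):
        announce(f"[INFO] Auto-installing environment (attempt {n}/{attempts})…")
        step()
        if check():
            return True
    # All attempts failed: the caller should show the manual fallback.
    return False


# Demo with stand-in callables: the check passes after the second attempt.
counter = {"runs": 0}
messages = []


def fake_setup():
    counter["runs"] += 1


succeeded = run_with_retries(fake_setup, lambda: counter["runs"] >= 2, announce=messages.append)
print(succeeded, counter["runs"])
```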
---
### If MODEL_STATUS=MISSING → auto-run download_model.py
Announce to user and ask how to proceed:
```
Model not found — download required (~10 GB)
Estimated time:
• 100 Mbps → ~15 min
• 50 Mbps → ~30 min
• 10 Mbps → ~2 hr
Download supports resume — safe to interrupt and retry.
[Y] Start auto-download
[N] I'll download manually — show me the link
```
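The time estimates in the prompt above follow from simple arithmetic. The sketch below reproduces them, assuming decimal units (1 GB = 10^9 bytes, 1 Mbps = 10^6 bits per second) and a fully used link; real downloads will be somewhat slower, which is consistent with the rounded-up figures.

```python
def download_minutes(size_gb: float, mbps: float) -> float:
    """Ideal transfer time in minutes for size_gb gigabytes at mbps megabits per second."""
    bits = size_gb * 8 * 1000**3
    return bits / (mbps * 1_000_000) / 60


for speed in (100, 50, 10):
    print(f"{speed} Mbps: about {download_minutes(10, speed):.0f} min")
```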
**Auto-download** (up to 3 attempts, stop if a single attempt exceeds 20 minutes):
```powershell
# [SYSTEM PYTHON] download_model.py starts on stdlib only and relaunches itself under the venv python; launch it with system python
python "<skill_dir>\download_model.py"
```
Re-run `python "<skill_dir>\check_env.py"` after each attempt.
**Manual download fallback:**
ModelScope page: **https://modelscope.cn/models/snake7gun/Z-Image-Turbo-int4-ov/files**
Place all files under `<IMAGE_GEN_DIR>\Z-Image-Turbo-int4-ov\`. Required subdirs:
```
Z-Image-Turbo-int4-ov\
├── transformer\
├── vae_decoder\
└── text_encoder\
```
Then re-run `python "<skill_dir>\check_env.py"` to verify.
---
### Manual fallback (only if all 3 setup auto-attempts fail)
Show user:
```
[WARN] Auto-install failed. Please run manually in a terminal:
1) Install environment:
python "<skill_dir>\setup.py"
Takes ~5 min, fully automated.
2) Download model (~10 GB):
python "<skill_dir>\download_model.py"
Resumable — safe to interrupt and retry.
Come back here when done.
```
---
## Step 2: verify dependencies
> Step 2/3: verifying dependencies…
**Verify dependencies** (run via VENV_PY):
```powershell
$env:PYTHONUTF8 = "1"
& "<VENV_PY>" -c "
import json, site
from pathlib import Path
EXPECTED_COMMITS = {
'optimum_intel': '2f62e5ae',
'diffusers': 'a1f36ee3',
}
def get_git_commit(pkg_name):
dirs = site.getsitepackages()
try: dirs += [site.getusersitepackages()]
except Exception: pass
for d in dirs:
for dist in Path(d).glob(f'{pkg_name}*.dist-info'):
url_file = dist / 'direct_url.json'
if url_file.exists():
data = json.loads(url_file.read_text(encoding='utf-8'))
return data.get('vcs_info', {}).get('commit_id', 'no_vcs_info')
return 'not_found'
results = {}
for pkg, imp in [('openvino','openvino'),('torch','torch'),('Pillow','PIL'),('modelscope','modelscope')]:
try:
ver = getattr(__import__(imp), '__version__', 'OK')
results[pkg] = ('OK', ver)
except ImportError as e:
results[pkg] = ('MISSING', str(e))
try:
from optimum.intel import OVZImagePipeline
results['OVZImagePipeline'] = ('OK', 'importable')
except ImportError as e:
results['OVZImagePipeline'] = ('MISSING', str(e))
for pkg_name, exp in EXPECTED_COMMITS.items():
actual = get_git_commit(pkg_name)
if actual == 'not_found':
results[f'{pkg_name}@commit'] = ('MISSING', 'not installed via git+https')
elif actual.startswith(exp):
results[f'{pkg_name}@commit'] = ('OK', actual[:16])
else:
results[f'{pkg_name}@commit'] = ('WRONG', f'got {actual[:16]} want {exp}...')
all_ok = all(v[0] == 'OK' for v in results.values())
for k, (status, detail) in results.items():
icon = '[OK]' if status == 'OK' else ('[WARN]' if status == 'WRONG' else '[MISSING]')
print(f' {icon} {k}: {detail}')
print('DEP_CHECK=PASS' if all_ok else 'DEP_CHECK=FAIL')
"
```
|Output|Action|
|-|-|
|`DEP_CHECK=PASS`|[PASS] Proceed to Step 3|
|`DEP_CHECK=FAIL` (MISSING)|[FAIL] Re-run `setup.py` and retry|
|`DEP_CHECK=FAIL` (`@commit` WRONG)|[FAIL] Force reinstall: `& "<VENV_PY>" -m pip uninstall optimum-intel diffusers -y` then `& "<VENV_PY>" -m pip install -r "<skill_dir>\requirements_imagegen.txt" --no-cache-dir`|
---
## Step 3: generate image and send preview
> Step 3/3: running inference…
Run these two commands separately:
```powershell
$env:PYTHONUTF8 = "1"
```
```powershell
& "<VENV_PY>" "<IMAGE_GEN_DIR>\generate_image.py" --prompt "EXPANDED_PROMPT" --topic "TOPIC" --steps 9 --seed 42
```
**Pass**: stdout contains `[SUCCESS]`. Record `OUTPUT_PATH` from the `[SUCCESS]` line.
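Extracting `OUTPUT_PATH` from the captured stdout can be sketched as below, assuming the `[SUCCESS] <path>` line format described in the skill contract. `find_output_path` is a hypothetical helper and the sample output is invented.

```python
def find_output_path(stdout: str):
    """Return the path from the first '[SUCCESS] <path>' line, or None if absent."""
    marker = "[SUCCESS]"
    for line in stdout.splitlines():
        line = line.strip()
        if line.startswith(marker):
            path = line[len(marker):].strip()
            return path or None
    return None


sample_stdout = (
    "[INFO] Using GPU.0\n"
    "[SUCCESS] C:\\u_openvino\\imagegen\\outputs\\20260412_153000_panda_bamboo.png\n"
)
print(find_output_path(sample_stdout))
```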
Send preview via `message` tool:
```
action: "send" filePath: "OUTPUT_PATH" message: "[OK] TOPIC"
```
**Final announcement:**
```
[OK] Done! Path: <OUTPUT_PATH>
Prompt: {expanded prompt}
steps=9, 512×512, seed=42 | device: {CPU/GPU}
```
---
## Parameters
|Param|Default|Notes|
|-|-|-|
|`--prompt`|required|English or Chinese|
|`--topic`|empty|English snake_case slug for filename|
|`--steps`|9|Higher = more detail; no hard limit|
|`--width`, `--height`|512|512×512, 768×768, or 1024×1024 recommended|
|`--seed`|42|-1 = random|
|`--output`|auto|Custom absolute output path|
> `guidance_scale` is fixed at `0.0` and not exposed as a parameter.
---
## Troubleshooting
|Error|Cause|Fix|
|-|-|-|
|`STATE=MISSING`|setup.py never run|Run `python "<skill_dir>\setup.py"`|
|`VENV_PY=BROKEN`|venv corrupted|Re-run `python "<skill_dir>\setup.py"` — rebuilds venv automatically|
|`PACKAGES_MISSING: ...`|venv ok but packages missing|Re-run `python "<skill_dir>\setup.py"` — reinstalls missing packages; skips steps already done|
|`MODEL_STATUS=MISSING`|download never run or interrupted|Run `python "<skill_dir>\download_model.py"` — resumes automatically|
|`DEP_CHECK=FAIL` (MISSING)|packages not installed in venv|Re-run `setup.py`|
|`DEP_CHECK=FAIL` (`@commit` WRONG)|PyPI release installed instead of pinned commit|Uninstall optimum-intel + diffusers, reinstall with `--no-cache-dir`|
|`@commit` shows `not installed via git+https`|git was missing when pip ran|Confirm git is installed, re-run `setup.py`|
|`[ERROR] Model incomplete`|Download interrupted mid-file|Re-run `download_model.py` — resumes automatically|
|`[ERROR] state.json not found`|state.json missing|Re-run Step 1|
|`SCRIPTS_STALE=old->new`|`generate_image.py` in `IMAGE_GEN_DIR` is outdated|Auto-run `setup.py` — it redeploys only the script, skips venv/packages/model (fast)|
|`RuntimeError` on GPU|Insufficient VRAM|Lower resolution or hardcode `return "CPU"` in `get_device()`|
|Black noisy output|Too few steps|Use `--steps` ≥ 4; 9 recommended|
|Download timeout|Network issue or proxy needed|Configure proxy and retry — download supports resume|
|`RuntimeError: stack expects each tensor to be equal size`|openvino 2026.1.0 breaks `OVZImagePipeline.forward` — `pooled_projections` tensors have mismatched sequence lengths|Downgrade: `& "<VENV_PY>" -m pip install openvino==2026.0.0 --force-reinstall`. Then re-run Step 2 and Step 3. If re-running `setup.py`, it will now install the pinned version automatically.|
FILE:check_env.py
#!/usr/bin/env python
"""Check runtime environment and model availability for the image-gen skill."""
import json
import os
import string
import subprocess
import sys
from pathlib import Path
from threading import Thread
def find_state() -> dict | None:
    """Locate state.json written by setup.py."""
    username = os.environ.get("USERNAME", "user").lower()
    for drive in string.ascii_uppercase:
        state_file = (
            Path(f"{drive}:\\") / f"{username}_openvino" / "imagegen" / "state.json"
        )
        if state_file.exists():
            try:
                return json.loads(state_file.read_text(encoding="utf-8"))
            except Exception:
                # Unreadable or corrupt state.json: keep looking on other drives
                pass
    return None
def _read_skill_version() -> str:
    """Read SKILL_VERSION from SKILL.md next to this script (single source of truth)."""
    skill_md = Path(__file__).parent / "SKILL.md"
    if skill_md.exists():
        for line in skill_md.read_text(encoding="utf-8").splitlines():
            if "SKILL_VERSION" in line and "`" in line:
                parts = line.split("`")
                if len(parts) >= 2:
                    return parts[1]
    return "unknown"
def _read_deployed_version(script_path: Path) -> str | None:
    """Read SKILL_VERSION embedded in the deployed generate_image.py."""
    if not script_path.exists():
        return None
    for line in script_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        if line.startswith("SKILL_VERSION") and "=" in line:
            # Take the right-hand side and drop any trailing "# comment" before unquoting
            value = line.split("=", 1)[1].split("#", 1)[0]
            return value.strip().strip("\"'")
    return None
def model_layout_ready(model_dir: Path) -> tuple[bool, list[str], float]:
    """Check model completeness by statting key subdirs and .bin files.

    Returns (ready, missing_dirs, total_gb).
    Each required subdir must exist and contain at least one .bin file >= 1 MB.
    Total GB is computed only when all dirs are present (rglob scan).
    """
    required = ["transformer", "vae_decoder", "text_encoder"]
    missing: list[str] = []
    for name in required:
        subdir = model_dir / name
        if not subdir.is_dir():
            missing.append(name)
            continue
        bins = list(subdir.glob("*.bin"))
        if not bins or max(f.stat().st_size for f in bins) < 1024 * 1024:
            # directory exists but contains no real weights, so treat it as missing
            missing.append(name)
    if missing:
        return False, missing, 0.0
    total = sum(f.stat().st_size for f in model_dir.rglob("*") if f.is_file()) / 1024**3
    return True, [], total
def check_venv_async(venv_py: Path, result_holder: list) -> None:
    """Validate venv: executable + all key packages importable.

    Runs in a background thread so model stat and venv check can overlap.
    Appends True, False, or a "PACKAGES_MISSING: ..." string to result_holder.
    """
    try:
        # 1. venv Python itself must be executable
        proc = subprocess.run(
            [str(venv_py), "--version"],
            capture_output=True,
            timeout=3,
        )
        if proc.returncode != 0:
            result_holder.append(False)
            return
        # 2. All runtime packages must be importable
        check_script = (
            "import openvino; import torch; import PIL; "
            "import modelscope; from optimum.intel import OVZImagePipeline"
        )
        proc2 = subprocess.run(
            [str(venv_py), "-c", check_script],
            capture_output=True,
            timeout=15,
        )
        if proc2.returncode != 0:
            detail = proc2.stderr.decode(errors="replace").strip()
            result_holder.append(f"PACKAGES_MISSING: {detail}")
            return
        result_holder.append(True)
    except Exception:
        result_holder.append(False)
def main() -> int:
    state = find_state()
    if not state:
        print("STATE=MISSING")
        return 1
    venv_py = Path(state["VENV_PY"])
    imagegen_dir = Path(state["IMAGE_GEN_DIR"])
    model_dir = imagegen_dir / "Z-Image-Turbo-int4-ov"
    # Start the venv check in the background and stat model files in the main thread, so both run in parallel
    venv_ok: list = []
    venv_thread = Thread(target=check_venv_async, args=(venv_py, venv_ok), daemon=True)
    venv_thread.start()
    # Model check (main thread; stat only, no imports needed)
    model_ready, missing_dirs, total_gb = model_layout_ready(model_dir)
    # Wait for the venv result. The two subprocess timeouts in check_venv_async
    # add up to 18 s, so the join must allow at least that long; a shorter wait
    # could report VENV_PY=BROKEN while the import check is still running.
    venv_thread.join(timeout=20)
    if not venv_ok or venv_ok[0] is False:
        print("VENV_PY=BROKEN")
        return 1
    if isinstance(venv_ok[0], str):
        print(venv_ok[0])  # PACKAGES_MISSING: <detail>
        return 1
    if not model_ready:
        print(f"MODEL_STATUS=MISSING missing={missing_dirs}")
        return 1
    print(f"VENV_PY={venv_py}")
    print(f"IMAGE_GEN_DIR={imagegen_dir}")
    print(f"MODEL_STATUS=READY ({total_gb:.2f} GB)")
    # Script staleness check: compare deployed generate_image.py against SKILL.md
    skill_ver = _read_skill_version()
    deployed_ver = _read_deployed_version(imagegen_dir / "generate_image.py")
    if deployed_ver is None:
        print(f"SCRIPTS_STALE=missing->{skill_ver}")
    elif deployed_ver != skill_ver:
        print(f"SCRIPTS_STALE={deployed_ver}->{skill_ver}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
FILE:download_model.py
"""
download_model.py — Download Z-Image-Turbo-int4-ov from ModelScope.
Run once from a terminal (NOT inside OpenClaw):
python download_model.py
- Downloads ~10 GB to your local {USERNAME}_openvino\\imagegen\\ directory
- Resume supported: safe to Ctrl+C and re-run, will continue from where it stopped
- Run setup.py first if you haven't already
"""
import io, json, os, string, subprocess, sys
from pathlib import Path
# Force UTF-8 stdout/stderr — prevents UnicodeEncodeError on Chinese Windows (CP936)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace', line_buffering=True)
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace', line_buffering=True)
MODEL_ID = "snake7gun/Z-Image-Turbo-int4-ov"
TOTAL_GB = 10.0
# ── Find state.json ────────────────────────────────────────
def find_state():
    for d in string.ascii_uppercase:
        sf = Path(f"{d}:\\") / f"{os.environ.get('USERNAME', 'user').lower()}_openvino" / "imagegen" / "state.json"
        if sf.exists():
            return json.loads(sf.read_text(encoding="utf-8"))
    return None


state = find_state()
if not state:
    print("[ERROR] state.json not found.")
    print(" Please run setup.py first:")
    print(f" python \"{Path(__file__).parent / 'setup.py'}\"")
    sys.exit(1)
venv_py = Path(state["VENV_PY"])
if not venv_py.exists():
    print(f"[ERROR] venv not found at {venv_py}")
    print(" Please re-run setup.py.")
    sys.exit(1)
# ── Self-relaunch inside venv ──────────────────────────────
# This ensures modelscope and all deps are available.
if Path(sys.executable).resolve() != venv_py.resolve():
    print(f"[INFO] Switching to venv python: {venv_py}")
    result = subprocess.run([str(venv_py), str(Path(__file__).resolve())])
    sys.exit(result.returncode)
# ─────────────────────────────────────────────────────────────
# From here on we are running inside the venv
# ─────────────────────────────────────────────────────────────
import threading, time
from modelscope import snapshot_download
imagegen_dir = Path(state["IMAGE_GEN_DIR"])
model_dir = imagegen_dir / "Z-Image-Turbo-int4-ov"
# ── Check if already complete ──────────────────────────────
def get_size_gb(path):
    if not path.exists():
        return 0.0
    return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) / 1024**3


def model_complete():
    # Mirror check_env.py: each required subdir must hold a .bin file >= 1 MB,
    # so an interrupted download that only created the directories is not
    # mistaken for a finished one.
    required = ["transformer", "vae_decoder", "text_encoder"]
    for r in required:
        subdir = model_dir / r
        if not subdir.is_dir():
            return False
        bins = list(subdir.glob("*.bin"))
        if not bins or max(f.stat().st_size for f in bins) < 1024 * 1024:
            return False
    return True


print("=" * 55)
print(" local-image-generation · Model Download")
print("=" * 55)
print(f"\n Model dir: {model_dir}")
if model_complete():
    gb = get_size_gb(model_dir)
    print(f"\n Model already complete ({gb:.2f} GB) ✅")
    print(" You can use the skill right away.")
    sys.exit(0)
existing_gb = get_size_gb(model_dir)
if existing_gb > 0.01:
    print(f"\n Resuming download: {existing_gb:.2f} GB already on disk")
else:
    print(f"\n Starting fresh download (~{TOTAL_GB} GB)")
print("""
 Estimated time:
   100 Mbps → ~15 min
   50 Mbps → ~30 min
   10 Mbps → ~2 hr
 Progress updates every 30 seconds.
 Safe to Ctrl+C and re-run; the download will resume.
""")
model_dir.mkdir(parents=True, exist_ok=True)
# ── Download with progress watchdog ───────────────────────
_stop = threading.Event()
t0 = time.time()
def watchdog():
    """Print size, speed, and ETA every 30 seconds until _stop is set."""
    prev = existing_gb * 1024**3
    print("[Progress] Download started...", flush=True)
    while not _stop.wait(30):
        try:
            total = sum(f.stat().st_size for f in model_dir.rglob("*") if f.is_file())
            now = time.time()
            speed = (total - prev) / 30
            pct = min(total / (TOTAL_GB * 1024**3) * 100, 99.9)
            elapsed = now - t0
            eta = (TOTAL_GB * 1024**3 - total) / speed if speed > 0 else 0
            print(
                f"[Progress] {total/1024**3:.2f}/{TOTAL_GB:.1f} GB "
                f"{pct:.1f}% {speed/1024**2:.1f} MB/s "
                f"elapsed {int(elapsed//60)}m{int(elapsed%60):02d}s "
                f"ETA ~{int(eta//60)}m{int(eta%60):02d}s",
                flush=True
            )
            prev = total
        except Exception:
            # Files can disappear mid-scan while the downloader renames them; skip this tick
            pass


threading.Thread(target=watchdog, daemon=True).start()
try:
    snapshot_download(MODEL_ID, local_dir=str(model_dir))
    _stop.set()
    if model_complete():
        gb = get_size_gb(model_dir)
        print(f"\n{'='*55}")
        print(f" Download complete! ({gb:.2f} GB) ✅")
        print(" You can now use the image generation skill in OpenClaw.")
        print(f"{'='*55}")
    else:
        print("\n[WARN] Download finished but model appears incomplete.")
        print(" Re-run this script to resume.")
except KeyboardInterrupt:
    _stop.set()
    print("\n[INFO] Download interrupted.")
    print(" Re-run this script to continue from where it stopped.")
except Exception as e:
    _stop.set()
    err = str(e).lower()
    if any(x in err for x in ["disk", "space"]):
        print(f"\n[ERROR] Disk full: {e}")
        print(" Free up space and re-run.")
    elif any(x in err for x in ["timeout", "connection", "network"]):
        print(f"\n[ERROR] Network error: {e}")
        print(" Check your connection and re-run.")
        print(f" Manual download: https://modelscope.cn/models/{MODEL_ID}/files")
        print(f" Place files under: {model_dir}")
        print(" Required subdirs: transformer/ vae_decoder/ text_encoder/")
    else:
        print(f"\n[ERROR] {e}")
        print(" Re-run to retry.")
        print(f" Manual download: https://modelscope.cn/models/{MODEL_ID}/files")
        print(f" Place files under: {model_dir}")
FILE:requirements_imagegen.txt
--extra-index-url https://download.pytorch.org/whl/cpu
# OpenVINO inference engine (pinned: 2026.1.0 breaks the pooled_projections stack in OVZImagePipeline)
openvino==2026.0.0
# PyTorch, CPU build (internal dependency of OVZImagePipeline)
torch==2.8
torchvision==0.23.0
# Optimum-Intel pinned commit (includes OVZImagePipeline; required for Z-Image-Turbo)
git+https://github.com/openvino-dev-samples/optimum-intel.git@2f62e5aee74b4acba3836e1f26678c0db0a09c00
# Diffusers pinned commit (includes Z-Image-Turbo scheduler support)
git+https://github.com/huggingface/diffusers.git@a1f36ee3ef4ae1bf98bd260e539197259aa981c1
# Model download
modelscope
# Image saving
Pillow
# Base libraries for model loading
transformers
accelerate
huggingface_hub
# Numerical computing (version constrained by Pillow / transformers)
numpy<2.0
FILE:setup.py
"""
setup.py — One-time environment setup for local-image-generation skill.
Run once from a terminal:
python setup.py
What this does:
1. Creates a shared Python venv under {USERNAME}_openvino\\venv\\
2. Checks git is available (required for pinned git+https dependencies)
3. Installs all required packages into the venv
4. Writes state.json so the skill knows where everything is
5. Deploys generate_image.py to IMAGE_GEN_DIR (versioned, idempotent)
After this, run:
python download_model.py
"""
import io, json, os, shutil, string, subprocess, sys
from pathlib import Path
# Force UTF-8 stdout/stderr — prevents UnicodeEncodeError on Chinese Windows (CP936)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace', line_buffering=True)
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace', line_buffering=True)
REQUIREMENTS_FILE = "requirements_imagegen.txt"
PACKAGES_FALLBACK = [
    "openvino==2026.0.0",  # pinned: 2026.1.0 breaks OVZImagePipeline pooled_projections
    "torch>=2.1.0",
    "Pillow>=10.0.0",
    "modelscope>=1.14.0",
    # Same fork and full commit hashes as requirements_imagegen.txt
    "git+https://github.com/openvino-dev-samples/optimum-intel.git@2f62e5aee74b4acba3836e1f26678c0db0a09c00",
    "git+https://github.com/huggingface/diffusers.git@a1f36ee3ef4ae1bf98bd260e539197259aa981c1",
]
def _read_skill_version() -> str:
    """Read SKILL_VERSION from SKILL.md next to this script — single source of truth."""
    skill_md = Path(__file__).parent / "SKILL.md"
    if skill_md.exists():
        for line in skill_md.read_text(encoding="utf-8").splitlines():
            if "SKILL_VERSION" in line and "`" in line:
                parts = line.split("`")
                if len(parts) >= 2:
                    return parts[1]
    return "unknown"
# ── Banner ─────────────────────────────────────────────────
print("=" * 55)
print(" local-image-generation · Environment Setup")
print("=" * 55)
# ── Check Python version ───────────────────────────────────
vi = sys.version_info
if vi < (3, 10):
    print(f"\n[ERROR] Python {vi.major}.{vi.minor} detected — need >= 3.10")
    print(" Download: https://www.python.org/ftp/python/3.12.10/python-3.12.10-amd64.exe")
    sys.exit(1)
print(f"\n Python {vi.major}.{vi.minor}.{vi.micro} OK ✅")
# ── Check git ──────────────────────────────────────────────
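try:
    r = subprocess.run(["git", "--version"], capture_output=True)
    git_ok = r.returncode == 0
except OSError:
    # subprocess.run raises (FileNotFoundError) when git is not on PATH,
    # so a returncode check alone never reaches the error message below
    git_ok = False
if not git_ok:
    print("\n[ERROR] git not found — required for pinned git+https dependencies")
    print(" Download: https://git-scm.com/download/win")
    sys.exit(1)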
print(f" {r.stdout.decode().strip()} OK ✅")
# ── Locate root directory ──────────────────────────────────
username = os.environ.get("USERNAME", "user").lower()
root_name = f"{username}_openvino"
drives = [f"{d}:\\" for d in string.ascii_uppercase if Path(f"{d}:\\").exists()]
root = next(
    (Path(d) / root_name for d in drives if (Path(d) / root_name).exists()),
    None
)
if not root:
    # No existing root on any drive: create it on the drive with the most free space
    best = max(drives, key=lambda d: shutil.disk_usage(d).free)
    root = Path(best) / root_name
imagegen_dir = root / "imagegen"
venv_dir = root / "venv"
venv_py = venv_dir / "Scripts" / "python.exe"
root.mkdir(parents=True, exist_ok=True)
imagegen_dir.mkdir(parents=True, exist_ok=True)
print(f"\n Root: {root}")
print(f" Imagegen: {imagegen_dir}")
print(f" Venv: {venv_dir}")
# ── Create or validate venv ────────────────────────────────
print("\n[1/3] Checking venv...")
venv_ok = False
if venv_py.exists():
    try:
        r = subprocess.run([str(venv_py), "--version"], capture_output=True, timeout=10)
        if r.returncode == 0:
            print(f" Existing venv OK: {r.stdout.decode().strip()}")
            venv_ok = True
    except Exception:
        pass
if not venv_ok:
    if venv_dir.exists():
        print(" Existing venv is broken — rebuilding...")
        shutil.rmtree(venv_dir, ignore_errors=True)
    print(" Creating venv...")
    subprocess.run([sys.executable, "-m", "venv", str(venv_dir)], check=True)
    venv_py = venv_dir / "Scripts" / "python.exe"
    r = subprocess.run([str(venv_py), "--version"], capture_output=True)
    print(f" Venv created: {r.stdout.decode().strip()} ✅")


def venv_run(args, **kw):
    """Run the venv's Python interpreter with the given arguments."""
    return subprocess.run([str(venv_py)] + args, **kw)
# ── Upgrade pip ────────────────────────────────────────────
print("\n[2/3] Upgrading pip...")
venv_run(["-m", "pip", "install", "--upgrade", "pip", "--quiet"], check=True)
print(" pip upgraded ✅")
# ── Install packages ───────────────────────────────────────
print("\n[3/3] Installing packages (this may take ~5 min)...")
req_file = Path(__file__).parent / REQUIREMENTS_FILE
if req_file.exists():
    print(f" Using {req_file}")
    venv_run(["-m", "pip", "install", "-r", str(req_file)], check=True)
else:
    print(f" {REQUIREMENTS_FILE} not found — installing fallback list")
    venv_run(["-m", "pip", "install"] + PACKAGES_FALLBACK, check=True)
print(" Packages installed ✅")
# ── Write state.json ───────────────────────────────────────
state = {
    "ROOT": str(root),
    "IMAGE_GEN_DIR": str(imagegen_dir),
    "VENV_DIR": str(venv_dir),
    "VENV_PY": str(venv_py),
    "VENV_EXISTS": True,
    "SKILL_DIR": str(Path(__file__).parent.resolve()),
}
state_file = imagegen_dir / "state.json"
state_file.write_text(json.dumps(state, indent=2), encoding="utf-8")
print(f"\n state.json written: {state_file} ✅")
# ── Create outputs dir ─────────────────────────────────────
(imagegen_dir / "outputs").mkdir(exist_ok=True)
# ── Verify ─────────────────────────────────────────────────
print("\n[Verify] Checking installation...")
verify_script = """
results = {}
for pkg, imp in [
    ("openvino", "openvino"),
    ("torch", "torch"),
    ("Pillow", "PIL"),
    ("modelscope", "modelscope"),
]:
    try:
        ver = getattr(__import__(imp), "__version__", "OK")
        results[pkg] = ("OK", ver)
    except Exception as e:
        # Broad catch: a failed native DLL load raises OSError, not ImportError
        results[pkg] = ("FAIL", str(e))
try:
    from optimum.intel import OVZImagePipeline
    results["OVZImagePipeline"] = ("OK", "importable")
except Exception as e:
    results["OVZImagePipeline"] = ("FAIL", str(e))
fail = [k for k, (s, _) in results.items() if s == "FAIL"]
for k, (s, d) in results.items():
    icon = "OK " if s == "OK" else "FAIL"
    print(f" [{icon}] {k}: {d}")
print()
print("VERIFY=PASS" if not fail else f"VERIFY=FAIL {fail}")
"""
venv_run(["-c", verify_script])
# ── Deploy generate_image.py ───────────────────────────────
print("\n[Deploy] Deploying generate_image.py...")
SCRIPT_VERSION = _read_skill_version()
script_path = imagegen_dir / "generate_image.py"
existing_ver = None
if script_path.exists():
    for line in script_path.read_text(encoding="utf-8", errors="ignore").splitlines():
        if line.startswith("SKILL_VERSION") and "=" in line:
            existing_ver = line.split("=")[1].strip().strip("\"'")
            break
if existing_ver == SCRIPT_VERSION:
    print(f" generate_image.py already at {SCRIPT_VERSION} [OK]")
else:
    generate_image_code = r'''SKILL_VERSION = "__VER__"
import sys, io, os, json, string, argparse, re, subprocess
from datetime import datetime
from pathlib import Path
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace", line_buffering=True)
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace", line_buffering=True)
def get_state():
    # Scan every drive letter for the state.json written by setup.py
    for d in string.ascii_uppercase:
        sf = Path(f"{d}:\\") / f"{os.environ.get('USERNAME','user').lower()}_openvino" / "imagegen" / "state.json"
        if sf.exists():
            return json.loads(sf.read_text(encoding="utf-8"))
    return None
def get_device():
    # Prefer the first GPU device OpenVINO reports; fall back to CPU
    import openvino as ov
    core = ov.Core()
    devs = core.available_devices
    print(f"[INFO] Available devices: {devs}")
    for d in devs:
        if "GPU" in d:
            print(f"[INFO] Using Intel GPU: {d}")
            return d
    print("[INFO] Using CPU")
    return "CPU"
def make_filename(topic, prompt):
    # Timestamp plus a sanitized slug taken from the topic (or the prompt's first 30 chars)
    date_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    src = topic if topic else prompt[:30]
    safe = re.sub(r"[^\w]", "_", src.strip())[:30].strip("_")
    return f"{date_str}_{safe}.png"
def generate(prompt, topic="", steps=9, width=512, height=512, seed=42, output_path=None):
    state = get_state()
    if not state:
        print("[ERROR] state.json not found -- run setup.py")
        sys.exit(1)
    imagegen_dir = Path(state["IMAGE_GEN_DIR"])
    model_dir = imagegen_dir / "Z-Image-Turbo-int4-ov"
    out_dir = imagegen_dir / "outputs"
    out_dir.mkdir(parents=True, exist_ok=True)
    required = ["transformer", "vae_decoder", "text_encoder"]
    missing = [r for r in required if not (model_dir / r).exists()]
    if missing:
        print(f"[ERROR] Model incomplete: {missing} -- run download_model.py")
        sys.exit(1)
    device = get_device()
    print(f"[INFO] Loading model: {model_dir}")
    import torch
    from optimum.intel import OVZImagePipeline
    pipe = OVZImagePipeline.from_pretrained(str(model_dir), device=device)
    print("[INFO] Model loaded")
    # A negative seed disables the fixed generator, so each run differs
    gen = torch.Generator("cpu").manual_seed(seed) if seed >= 0 else None
    print(f"[INFO] Inference: steps={steps}, {width}x{height}, seed={seed}")
    image = pipe(
        prompt=prompt, height=height, width=width,
        num_inference_steps=steps, guidance_scale=0.0, generator=gen
    ).images[0]
    if output_path is None:
        output_path = str(out_dir / make_filename(topic, prompt))
    image.save(output_path)
    print(f"[SUCCESS] {output_path}")
    try:
        # Open the result in the default Windows viewer
        subprocess.Popen(["explorer", output_path])
    except Exception as e:
        print(f"[WARN] Could not open image: {e}")
    return output_path
if __name__ == "__main__":
    try:
        p = argparse.ArgumentParser()
        p.add_argument("--prompt", required=True)
        p.add_argument("--topic", default="")
        p.add_argument("--steps", type=int, default=9)
        p.add_argument("--width", type=int, default=512)
        p.add_argument("--height", type=int, default=512)
        p.add_argument("--seed", type=int, default=42)
        p.add_argument("--output", default=None)
        args = p.parse_args()
        generate(args.prompt, args.topic, args.steps, args.width, args.height, args.seed, args.output)
        sys.stdout.flush()
    except Exception as e:
        import traceback
        print(f"[FATAL] {type(e).__name__}: {e}", flush=True)
        traceback.print_exc()
        sys.exit(1)
'''
    # Stamp the deployed script with the version read from SKILL.md
    code = generate_image_code.replace('SKILL_VERSION = "__VER__"', f'SKILL_VERSION = "{SCRIPT_VERSION}"', 1)
    script_path.write_text(code.strip(), encoding="utf-8")
    print(f" generate_image.py deployed at {SCRIPT_VERSION} [OK]")
# ── Done ───────────────────────────────────────────────────
print()
print("=" * 55)
print(" Setup complete!")
print()
print(" Next step — download the model (~10 GB):")
print(f" python \"{Path(__file__).parent / 'download_model.py'}\"")
print("=" * 55)