@clawhub-yqxu-ec6d02c7f3
Use when another skill returns Feishu display payloads or `messageToolCalls`. This skill teaches the agent to use OpenClaw `message` tool to send Feishu card...
---
name: feishu-card-display
version: 1.0.0
description: >-
Use when another skill returns Feishu display payloads or `messageToolCalls`.
This skill teaches the agent to use OpenClaw `message` tool to send Feishu
cards, text, and media in the current conversation, then reply with NO_REPLY.
homepage: https://github.com/AIDiyTeams/claw-skill/tree/main/feishu-card-display
---
# Feishu Card Display
Use this skill when another tool returns:
- `displaySkill: "feishu-card-display"`
- `messageToolCalls`
- Feishu `card` payloads
This skill has **no Python tools**. It only tells the agent how to use the
existing OpenClaw `message` tool correctly.
## Core Rule
If upstream output says `displaySkill: "feishu-card-display"`, you MUST:
1. Execute each returned `messageToolCalls` item **in order**
2. Use the OpenClaw `message` tool for every send
3. Preserve the payload exactly as returned
4. After all sends finish, reply with **`NO_REPLY`**
Do not:
- merge multiple product cards into one assistant bubble
- rewrite the text payload
- summarize what the cards contain
- explain the payload structure to the user
## Feishu Sending Rules
### Card payload
When a call contains:
```json
{
"action": "send",
"channel": "feishu",
"card": { "...": "..." }
}
```
send it with the `message` tool as a Feishu card send. Keep the `card` object unchanged.
### Text payload
When a call contains:
```json
{
"action": "send",
"channel": "feishu",
"message": "..."
}
```
send the text exactly as provided.
### Media payload
When a call contains:
```json
{
"action": "send",
"channel": "feishu",
"filePath": "/path/to/file",
"message": ""
}
```
send the file as media using the `message` tool, then continue with later calls.
## Image Key Notes
If upstream payload already contains Feishu-ready card payloads or image keys, use them as-is.
If upstream payload only contains preview links, keep the preview links as-is.
This skill does **not** invent image keys and does **not** rewrite card media.
## Final Reply
After all upstream `messageToolCalls` are executed, the assistant must reply with:
```text
NO_REPLY
```
Version Marker: [email protected]
Browse and create custom gifts — personalized bags, mugs, phone cases, apparel and more. Upload any image to generate an AI-powered product mockup. Includes...
---
name: custom-gift-leewow
description: >-
Browse and create custom gifts — personalized bags, mugs, phone cases,
apparel and more. Upload any image to generate an AI-powered product mockup.
Includes tools: browse_templates (discover products), generate_preview
(create a design from an image), and get_status (check generation progress).
Automatically downloads preview images to workspace for display.
Powered by Leewow. Requires CLAW_SK.
---
# Custom Gift — Leewow
Create personalized gifts and custom products powered by AI. This skill provides:
| Tool | Purpose |
|------|---------|
| `browse_templates` | Discover customizable product templates (bags, accessories, home decor, apparel, etc.) |
| `generate_preview` | Upload a design image and trigger AI generation |
| `get_status` | Check generation status and download preview image |
## When to Use
- User wants to **send a gift** or **create something personalized**
- User says "browse products", "show me what I can customize", "gift ideas"
- User provides an **image** and wants to turn it into a product
- User says "make this into a mug/bag/shirt", "customize this design"
## CRITICAL: Images Must Be Sent via Media, NOT Markdown
Feishu / chat platforms **cannot render** `` markdown.
To show images to the user you **MUST send them as media attachments**
using whatever message/media mechanism your platform provides
(e.g. the `message` tool with a `media` parameter on OpenClaw/Feishu).
Tool outputs include a `localImagePath` (workspace file) for every image.
Use that path as the media attachment. **Showing images is mandatory, not optional.**
## Generator Output Format (MUST FOLLOW)
This skill uses a **two-step generator pattern**.
### Step 1: Browse — Show Templates
After calling `browse_templates`, the tool returns JSON with a list of templates.
Each template has `localImagePath` (cover image downloaded to workspace).
For **every** template you MUST:
1. **Send the cover image as a media attachment** using the `localImagePath`
2. Include a text caption with: name, price, templateId
Example message flow:
```
[send image: /Users/.../.openclaw/workspace/template_images/template_3_xxx.jpg]
1. 男士卫衣 (Men's Hoodie) — 💰 $29.9 USD — 模板ID: 3
[send image: /Users/.../.openclaw/workspace/template_images/template_12_xxx.jpg]
2. 帆布手提袋 (Canvas Tote Bag) — 💰 $19.9 USD — 模板ID: 12
...
告诉我你想选哪个,我来帮你生成效果图!
```
**Rules for Step 1:**
- MUST send each template's cover image as media — do NOT skip images
- MUST include price and templateId in text
- Images are at `localImagePath` in the JSON — already in workspace
- If sending all images at once is supported, do that; otherwise send one by one
### Step 2: Generation Complete — Show Preview + Purchase Link
After `get_generation_status` returns COMPLETED, the JSON contains:
- `localImagePath` — preview image in workspace
- `purchaseUrl` — signed purchase/order page link
You MUST:
1. **Send the preview image as a media attachment** using `localImagePath`
2. **Send the purchase link** in the text message
Example:
```
[send image: /Users/.../.openclaw/workspace/previews/leewow_preview_task_xxx.jpg]
你的定制效果图出来啦 🎉
🛒 点击下单购买: https://leewow.com/h5/preview?taskId=xxx&skid=...&sig=...
喜欢吗?如果想调整或者试试其他产品,告诉我!
```
**Rules for Step 2:**
- MUST send the preview image as media — this is the whole point
- MUST include the purchase link (it's pre-signed with skid/sig)
- Do NOT just describe the product in text — the user needs to SEE the image
### Common Mistakes to AVOID
❌ Using `` markdown — Feishu can't render local paths this way
❌ Just saying "完成啦!" and describing the product in text without sending the image
❌ Omitting the purchase/order link
❌ Sending a table/list instead of actual images
❌ Saying "图片已下载到本地" without actually sending the image to the user
## Prerequisites
- `CLAW_SK` — Leewow Secret Key (format: `sk-leewow-{keyId}-{secret}`)
- `CLAW_BASE_URL` — API base URL (default: `https://leewow.com`)
- `CLAW_PATH_PREFIX` — Path prefix (default: `/v2` for leewow.com)
- `LEEWOW_API_BASE` — Base URL for COS STS credentials (default: `https://leewow.com`)
- Python 3.10+ with `requests` and `cos-python-sdk-v5`
## Configuration
Environment variables are loaded from `~/.openclaw/.env`:
```bash
CLAW_SK=sk-leewow-xxxx-xxxx
CLAW_BASE_URL=https://leewow.com
CLAW_PATH_PREFIX=/v2
LEEWOW_API_BASE=https://leewow.com
```
## Image Requirements (IMPORTANT)
### For Input Images (User Upload)
- **Must be in workspace directory**: `~/.openclaw/workspace/`
- Supported formats: JPG, PNG, WebP
- Recommended: Clear, well-lit images for best results
### For Preview Images (Generated Output)
- Automatically saved to: `~/.openclaw/workspace/previews/`
- Filename format: `leewow_preview_{taskId}.{ext}`
- The agent can directly display these images to users
### COS Presigned URLs
For private COS buckets, you may need to generate **presigned URLs** for accessing images:
```bash
# Generate presigned URL for a COS image
python3 scripts/cos_presign.py "https://bucket.cos.region.myqcloud.com/key.png" --json
# With custom expiration (e.g., 1 hour = 3600 seconds)
python3 scripts/cos_presign.py "COS_URL" --expired 3600
# Use with get_status to get presigned preview URL
python3 scripts/get_status.py {taskId} --presign --json
```
**Note**: Most Leewow COS buckets are public, so presigned URLs are optional.
## Typical Flow (Generator Pattern)
1. **Browse (Step 1)** — Call `browse_templates` → get JSON with localImagePath for each template → **send each cover image as media** + text caption (name, price, ID) → ask user to pick
2. **Upload** — User provides an image (must be in workspace `~/.openclaw/workspace/`)
3. **Generate** — Call `generate_preview` → get taskId → immediately proceed to step 4
4. **Poll** — Call `get_generation_status` with `poll=true` → wait for COMPLETED
5. **Display (Step 2)** — **Send preview image as media** (`localImagePath`) + text with PURCHASE LINK (`purchaseUrl`)
## Tool Reference
### browse_templates
Browse available product templates.
```bash
python3 scripts/browse.py --count 3 --json
```
Options:
- `--category`: Filter by category (bag, accessory, home, apparel)
- `--count`: Number of results (1-5, default 3)
- `--json`: Output JSON format (includes image URLs)
### generate_preview
Upload image and trigger generation.
```bash
python3 scripts/generate.py --image-path ./workspace/my_design.png --template-id 3 --json
```
Options:
- `--image-path`: **Required**. Path to design image (must be in workspace)
- `--template-id`: **Required**. Product template ID from browse_templates
- `--design-theme`: Optional style description
- `--aspect-ratio`: Image ratio (3:4, 1:1, 4:3, default 3:4)
- `--json`: Output JSON format
**Returns**: Task ID for status polling. Generation is async (~30-60s).
### get_status
Check generation status and download preview image.
```bash
python3 scripts/get_status.py {taskId} --poll
```
Options:
- `task_id`: Task ID from generate_preview
- `--poll`: Wait until generation completes
- `--timeout`: Poll timeout in seconds (default 120)
- `--no-download`: Skip downloading preview image
- `--json`: Output JSON format
**Returns**: Generation status and local image path (if completed).
## Safety Rules
- Never expose or log the `CLAW_SK` value. When confirming configuration, only show the last 4 characters.
- Input images **must** be in workspace directory for the agent to access them
- Preview images are automatically saved to `workspace/previews/`
- Limit browse results to 20 templates maximum per request
## Examples
```text
User: "I want to make a custom gift for my friend"
→ browse_templates → for each template: send cover image as media + text caption
→ user picks → generate_preview → get_generation_status --poll
→ send preview image as media + purchaseUrl in text
User: "Turn this photo into a phone case"
→ browse_templates --category phone → send images as media → user picks
→ generate_preview → get_generation_status --poll
→ send preview image as media + purchaseUrl in text
User: "Show me what products I can customize"
→ browse_templates → send ALL template images as media with captions
```
## Output Structure
### browse_templates
```json
[
{
"index": 1,
"templateId": 3,
"name": "Men's Hoodie",
"price": "**$29.9 USD**",
"localImagePath": "/Users/.../.openclaw/workspace/template_images/template_3_xxx.jpg",
"remoteImageUrl": "https://...",
"skuType": "hoodie"
}
]
```
→ Agent sends `localImagePath` as media attachment + text caption per template.
### generate_preview --json
```json
{
"taskId": "task_xxx",
"status": "PENDING",
"estimatedSeconds": 45,
"templateId": 3,
"_success": true
}
```
### get_status --json (completed)
```json
{
"taskId": "task_xxx",
"status": "COMPLETED",
"purchaseUrl": "https://leewow.com/h5/preview?taskId=xxx&skid=...&sig=...",
"localImagePath": "/Users/.../.openclaw/workspace/previews/leewow_preview_task_xxx.jpg"
}
```
→ Agent sends `localImagePath` as media attachment + `purchaseUrl` in text.
FILE:scripts/browse.py
#!/usr/bin/env python3
"""Browse Leewow customizable product templates, output Markdown cards.
Cover images are downloaded to workspace so the agent can display them
in chat platforms that only support local file references.
"""
import argparse
import hashlib
import json
import os
import sys
from urllib.parse import urlparse
# Load environment variables from ~/.openclaw/.env
def _load_env_file():
env_path = os.path.expanduser("~/.openclaw/.env")
if os.path.exists(env_path):
with open(env_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
if key not in os.environ:
os.environ[key] = value
_load_env_file()
import requests
from claw_auth import claw_get
CLAW_BASE_URL = os.getenv("CLAW_BASE_URL", "https://leewow.com")
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
CLAW_SK = os.getenv("CLAW_SK", "")
WORKSPACE_DIR = os.path.expanduser("~/.openclaw/workspace")
TEMPLATE_IMG_DIR = os.path.join(WORKSPACE_DIR, "template_images")
def _download_cover_image(remote_url: str, template_id) -> str | None:
"""Download template cover image to workspace and return local path.
Uses a content-hash filename so repeated calls are instant (cache hit).
Returns None on failure — caller should fall back to remote URL.
"""
if not remote_url:
return None
try:
os.makedirs(TEMPLATE_IMG_DIR, exist_ok=True)
url_hash = hashlib.md5(remote_url.encode()).hexdigest()[:10]
parsed = urlparse(remote_url)
ext = os.path.splitext(parsed.path)[1] or ".jpg"
filename = f"template_{template_id}_{url_hash}{ext}"
filepath = os.path.join(TEMPLATE_IMG_DIR, filename)
if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
return filepath
resp = requests.get(remote_url, timeout=15)
resp.raise_for_status()
with open(filepath, "wb") as f:
f.write(resp.content)
return filepath
except Exception as e:
print(f"Warning: failed to download cover image: {e}", file=sys.stderr)
return None
def browse_templates(category: str = None, count: int = 10) -> list:
"""Return templates as a list of dicts, each with localImagePath for media sending."""
if not CLAW_SK:
return [{"error": "CLAW_SK environment variable is not set."}]
count = min(max(count, 1), 20)
url = f"{CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/templates"
try:
resp = claw_get(CLAW_SK, url, timeout=15)
data = resp.json()
except Exception as e:
return [{"error": f"Failed to fetch templates: {e}"}]
if data.get("code") != 0:
return [{"error": f"API returned: {data.get('message', 'Unknown error')}"}]
templates = data.get("data", [])
if category:
cat_lower = category.lower()
templates = [
t for t in templates
if cat_lower in (t.get("name", "") + t.get("description", "")).lower()
]
templates = templates[:count]
results = []
for i, t in enumerate(templates, 1):
tid = t.get("templateId", "?")
name = t.get("name", "Unnamed Product")
cover = t.get("coverImage", "")
desc = t.get("description", "")
sku_type = t.get("skuType", "")
shipping = t.get("shippingOrigin", "CN")
price_display = _extract_price(t.get("skuConfigs"))
local_cover = _download_cover_image(cover, tid)
results.append({
"index": i,
"templateId": tid,
"name": name,
"description": desc,
"skuType": sku_type,
"shippingOrigin": shipping,
"price": price_display,
"localImagePath": local_cover,
})
return results
def _extract_price(sku_configs) -> str:
if not sku_configs:
return ""
try:
skus = json.loads(sku_configs) if isinstance(sku_configs, str) else sku_configs
if isinstance(skus, list) and skus:
first = skus[0]
price = first.get("priceOnSell") or first.get("price")
origin = first.get("originPrice")
currency = first.get("currency", "USD")
if price:
s = f"**price {currency}**"
if origin and float(origin) > float(price):
s += f" ~~origin~~"
return s
except (json.JSONDecodeError, TypeError, ValueError):
pass
return ""
def browse_templates_json(category: str = None, count: int = 10) -> list:
"""Return templates as JSON-serializable list."""
if not CLAW_SK:
return []
count = min(max(count, 1), 20)
url = f"{CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/templates"
try:
resp = claw_get(CLAW_SK, url, timeout=15)
data = resp.json()
except Exception:
return []
if data.get("code") != 0:
return []
templates = data.get("data", [])
if category:
cat_lower = category.lower()
templates = [
t for t in templates
if cat_lower in (t.get("name", "") + t.get("description", "")).lower()
]
return templates[:count]
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--category", type=str, default=None)
parser.add_argument("--count", type=int, default=10)
parser.add_argument("--json", action="store_true", help="(kept for compat, always outputs JSON)")
args = parser.parse_args()
templates = browse_templates(category=args.category, count=args.count)
print(json.dumps(templates, ensure_ascii=False, indent=2))
FILE:scripts/claw_auth.py
"""Claw API HMAC-SHA256 signing utility.
Supports CLAW_PATH_PREFIX env var (e.g. "/v2") for proxied environments:
- Request URL: {CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/templates
- Sign path: /claw/templates (what Java actually receives)
"""
import hashlib
import hmac
import os
import time
import uuid
from urllib.parse import urlparse
import requests
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
def _parse_key_id(sk: str) -> str:
parts = sk.split("-")
if len(parts) < 4 or parts[0] != "sk" or parts[1] != "leewow":
raise ValueError("Invalid SK format. Expected: sk-leewow-{keyId}-{secret}")
return parts[2]
def _compute_body_hash(body: bytes) -> str:
return hashlib.sha256(body).hexdigest()
def _compute_signature(sk: str, payload: str) -> str:
return hmac.new(
sk.encode("utf-8"),
payload.encode("utf-8"),
hashlib.sha256,
).hexdigest()
def _strip_prefix(path: str) -> str:
"""Strip CLAW_PATH_PREFIX from path for signing (Java receives path without prefix)."""
if CLAW_PATH_PREFIX and path.startswith(CLAW_PATH_PREFIX):
return path[len(CLAW_PATH_PREFIX):]
return path
def build_claw_headers(sk: str, method: str, url: str, body: bytes = b"") -> dict:
key_id = _parse_key_id(sk)
timestamp = str(int(time.time()))
nonce = uuid.uuid4().hex[:16]
raw_path = urlparse(url).path
sign_path = _strip_prefix(raw_path)
body_hash = _compute_body_hash(body)
sign_payload = f"{key_id}\n{timestamp}\n{nonce}\n{method}\n{sign_path}\n{body_hash}"
signature = _compute_signature(sk, sign_payload)
return {
"X-Claw-KeyId": key_id,
"X-Claw-Timestamp": timestamp,
"X-Claw-Nonce": nonce,
"X-Claw-Signature": signature,
}
def sign_url(sk: str, url: str) -> str:
"""Append skid / ts / nonce / sig query parameters to a URL.
The preview / purchase page reads these params, sends them to the backend
which verifies the HMAC and exchanges them for a session JWT.
Signing payload is identical to the header-based scheme (method=GET, empty body).
"""
from urllib.parse import urlencode, urlparse, urlunparse, parse_qs, urljoin
key_id = _parse_key_id(sk)
timestamp = str(int(time.time()))
nonce = uuid.uuid4().hex[:16]
parsed = urlparse(url)
sign_path = _strip_prefix(parsed.path)
body_hash = _compute_body_hash(b"")
sign_payload = f"{key_id}\n{timestamp}\n{nonce}\nGET\n{sign_path}\n{body_hash}"
signature = _compute_signature(sk, sign_payload)
sep = "&" if parsed.query else ""
new_query = (
f"{parsed.query}{sep}"
f"skid={key_id}&ts={timestamp}&nonce={nonce}&sig={signature}"
)
return urlunparse(parsed._replace(query=new_query))
def claw_get(sk: str, url: str, **kwargs) -> requests.Response:
headers = build_claw_headers(sk, "GET", url)
headers.update(kwargs.pop("headers", {}))
return requests.get(url, headers=headers, **kwargs)
def claw_post(sk: str, url: str, json_data: dict = None, **kwargs) -> requests.Response:
import json as json_module
body = json_module.dumps(json_data).encode("utf-8") if json_data else b""
headers = build_claw_headers(sk, "POST", url, body)
headers["Content-Type"] = "application/json"
headers.update(kwargs.pop("headers", {}))
return requests.post(url, data=body, headers=headers, **kwargs)
FILE:scripts/cos_presign.py
#!/usr/bin/env python3
"""Generate COS presigned URLs for accessing private bucket objects.
腾讯云 COS 预签名 URL 生成工具
用于为私有 bucket 中的图片生成带签名的临时访问 URL
"""
import os
import sys
from urllib.parse import urlparse
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cos_uploader import _fetch_sts_credentials, _get_cos_client
from qcloud_cos.cos_exception import CosServiceError
def extract_cos_key_from_url(cos_url: str) -> tuple:
"""Extract bucket, region, and key from COS URL.
URL format: https://{bucket}.cos.{region}.myqcloud.com/{key}
"""
parsed = urlparse(cos_url)
host = parsed.netloc
# Parse bucket.cos.region.myqcloud.com
parts = host.split('.')
if len(parts) >= 5 and parts[1] == 'cos':
bucket = parts[0]
region = parts[2]
else:
raise ValueError(f"Invalid COS URL format: {cos_url}")
key = parsed.path.lstrip('/')
return bucket, region, key
def generate_presigned_url(cos_url: str, expired: int = 3600) -> str:
"""Generate presigned URL for COS object.
Args:
cos_url: Original COS URL (e.g., https://bucket.cos.region.myqcloud.com/key)
expired: URL expiration time in seconds (default: 3600 = 1 hour)
Returns:
Presigned URL with query parameters
"""
bucket, region, key = extract_cos_key_from_url(cos_url)
# Get COS client with STS credentials
client, _, _ = _get_cos_client()
# Generate presigned URL
presigned_url = client.get_presigned_url(
Method='GET',
Bucket=bucket,
Key=key,
Expired=expired
)
return presigned_url
def generate_presigned_url_with_custom_domain(cos_url: str, custom_domain: str = None, expired: int = 3600) -> str:
"""Generate presigned URL with optional custom CDN domain.
If custom_domain is provided, the signature will be generated for the COS URL
but the returned URL will use the custom domain (for CDN acceleration).
"""
presigned = generate_presigned_url(cos_url, expired)
if custom_domain:
# Replace the COS domain with custom CDN domain
parsed = urlparse(cos_url)
original_host = parsed.netloc
presigned_parsed = urlparse(presigned)
# Build new URL with custom domain but keep query params (signature)
new_url = f"https://{custom_domain}{parsed.path}?{presigned_parsed.query}"
return new_url
return presigned
def batch_generate_presigned_urls(cos_urls: list, expired: int = 3600) -> dict:
"""Generate presigned URLs for multiple COS objects.
Args:
cos_urls: List of COS URLs
expired: URL expiration time in seconds
Returns:
Dict mapping original URLs to presigned URLs
"""
results = {}
for url in cos_urls:
try:
presigned = generate_presigned_url(url, expired)
results[url] = {
"success": True,
"presigned_url": presigned,
"expired": expired
}
except Exception as e:
results[url] = {
"success": False,
"error": str(e)
}
return results
if __name__ == "__main__":
import argparse
import json
parser = argparse.ArgumentParser(description="Generate COS presigned URLs")
parser.add_argument("url", help="COS URL to sign")
parser.add_argument("--expired", type=int, default=3600, help="Expiration time in seconds (default: 3600)")
parser.add_argument("--cdn-domain", type=str, default=None, help="Custom CDN domain (optional)")
parser.add_argument("--json", action="store_true", help="Output JSON format")
args = parser.parse_args()
try:
if args.cdn_domain:
result = generate_presigned_url_with_custom_domain(args.url, args.cdn_domain, args.expired)
else:
result = generate_presigned_url(args.url, args.expired)
if args.json:
print(json.dumps({
"original_url": args.url,
"presigned_url": result,
"expired": args.expired
}, indent=2))
else:
print(result)
except Exception as e:
if args.json:
print(json.dumps({"error": str(e)}))
else:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
FILE:scripts/cos_uploader.py
"""COS upload via STS temporary credentials from Leewow backend."""
import os
import time
import uuid
from typing import Optional
import requests
from qcloud_cos import CosConfig, CosS3Client
LEEWOW_API_BASE = os.getenv("LEEWOW_API_BASE", "https://leewow.com")
STS_ENDPOINT = "/v2/api/public/cos/sts/credentials"
_sts_cache: Optional[dict] = None
_sts_expires_at: float = 0
_STS_REFRESH_BUFFER_S = 120
def _fetch_sts_credentials() -> dict:
global _sts_cache, _sts_expires_at
now = time.time()
if _sts_cache and now < _sts_expires_at:
return _sts_cache
url = LEEWOW_API_BASE + STS_ENDPOINT
resp = requests.get(url, timeout=15)
resp.raise_for_status()
data = resp.json()
if not data.get("tmpSecretId"):
raise RuntimeError(f"Invalid STS response: {data}")
_sts_cache = data
_sts_expires_at = int(data.get("expiredTime", 0)) - _STS_REFRESH_BUFFER_S
return data
def _get_cos_client() -> tuple:
sts = _fetch_sts_credentials()
config = CosConfig(
Region=sts["region"],
SecretId=sts["tmpSecretId"],
SecretKey=sts["tmpSecretKey"],
Token=sts["sessionToken"],
)
return CosS3Client(config), sts["bucket"], sts["region"]
def upload_file_to_cos(file_path: str, key_prefix: str = "claw-upload") -> str:
ext = os.path.splitext(file_path)[1] or ".jpg"
key = f"{key_prefix}/{uuid.uuid4().hex}{ext}"
client, bucket, region = _get_cos_client()
content_type = {
".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".png": "image/png", ".gif": "image/gif", ".webp": "image/webp",
}.get(ext.lower(), "application/octet-stream")
with open(file_path, "rb") as fp:
client.put_object(Bucket=bucket, Body=fp, Key=key, ContentType=content_type)
return f"https://{bucket}.cos.{region}.myqcloud.com/{key}"
FILE:scripts/generate.py
#!/usr/bin/env python3
"""Upload image to COS, call /claw/generate, output Markdown preview card."""
import argparse
import os
import sys
# Load environment variables from ~/.openclaw/.env
def _load_env_file():
env_path = os.path.expanduser("~/.openclaw/.env")
if os.path.exists(env_path):
with open(env_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
if key not in os.environ:
os.environ[key] = value
_load_env_file()
from claw_auth import claw_post, sign_url
from cos_uploader import upload_file_to_cos
CLAW_BASE_URL = os.getenv("CLAW_BASE_URL", "https://leewow.com")
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
CLAW_SK = os.getenv("CLAW_SK", "")
def generate_preview(image_path: str, template_id: int,
design_theme: str = "", aspect_ratio: str = "3:4") -> dict:
"""Upload image and trigger generation. Returns dict with task info."""
if not CLAW_SK:
return {"error": "CLAW_SK environment variable is not set."}
if not os.path.exists(image_path):
return {"error": f"Image file not found: {image_path}"}
# Ensure image is in workspace directory
workspace_dir = os.path.expanduser("~/.openclaw/workspace")
abs_image_path = os.path.abspath(image_path)
if not abs_image_path.startswith(workspace_dir):
return {"error": f"Image must be in workspace directory: {workspace_dir}"}
try:
image_url = upload_file_to_cos(image_path, key_prefix="prod-h5-generation-upload/claw-skill")
except Exception as e:
return {"error": f"Failed to upload image: {e}"}
url = f"{CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/generate"
payload = {
"templateId": template_id,
"imageUrl": image_url,
"designTheme": design_theme,
"aspectRatio": aspect_ratio,
}
try:
resp = claw_post(CLAW_SK, url, json_data=payload, timeout=30)
data = resp.json()
except Exception as e:
return {"error": f"Failed to call generate API: {e}"}
if data.get("code") != 0:
return {"error": f"API returned: {data.get('message', 'Unknown error')}"}
result = data.get("data", {})
result["_success"] = True
return result
def format_generate_result(result: dict) -> str:
"""Format generation trigger result. Guides the agent to poll for completion."""
task_id = result.get("taskId", "unknown")
status = result.get("status", "PENDING")
estimated = result.get("estimatedSeconds", 45)
template_id = result.get("templateId", "?")
return "\n".join([
f"正在生成效果图 ⏳ (约{estimated}秒)",
f"任务ID: {task_id} | 模板: #{template_id} | 状态: {status}",
"",
"[AGENT_INSTRUCTION]",
f"Call get_generation_status with task_id={task_id} and poll=true.",
"When COMPLETED, you MUST:",
"1. Send the preview image as a MEDIA ATTACHMENT (localImagePath from the result)",
"2. Include the purchaseUrl in the text message",
"Do NOT use ![markdown] syntax — send the actual image file via message media.",
"[/AGENT_INSTRUCTION]",
])
def _safe_result(result: dict) -> dict:
"""Build safe output: only taskId, status, estimatedSeconds, templateId, purchaseUrl.
The previewUrl from /claw/generate is signed to become purchaseUrl.
All other URLs (imageUrl etc.) are stripped so the agent cannot leak them.
The agent MUST call get_generation_status to get the localImagePath.
"""
if "error" in result:
return result
task_id = result.get("taskId", "unknown")
preview_url = result.get("previewUrl")
out = {
"taskId": task_id,
"status": result.get("status", "PENDING"),
"estimatedSeconds": result.get("estimatedSeconds", 45),
"templateId": result.get("templateId"),
"_next": f"MUST call get_generation_status(task_id='{task_id}', poll=true) "
f"to get the preview image (localImagePath). "
f"Send purchaseUrl below to the user together with the image.",
}
if preview_url and CLAW_SK:
out["purchaseUrl"] = sign_url(CLAW_SK, preview_url)
return out
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--image-path", type=str, required=True)
parser.add_argument("--template-id", type=int, required=True)
parser.add_argument("--design-theme", type=str, default="")
parser.add_argument("--aspect-ratio", type=str, default="3:4")
parser.add_argument("--json", action="store_true", help="(compat) always outputs safe JSON")
args = parser.parse_args()
import json
result = generate_preview(args.image_path, args.template_id, args.design_theme, args.aspect_ratio)
print(json.dumps(_safe_result(result), ensure_ascii=False, indent=2))
FILE:scripts/get_status.py
#!/usr/bin/env python3
"""Query generation task status and download preview image.
API response structure (GET /claw/task/{taskId}):
{
"code": 0,
"data": {
"taskId": "gen_xxx",
"status": "COMPLETED", // PENDING | ANALYZING | GENERATING | CREATING_PRODUCT | COMPLETED | FAILED
"result": { // only when COMPLETED
"renderedImageUrl": "https://...myqcloud.com/...",
"templateId": 15,
"customizedProductId": null // may be null if product creation is pending
},
"errorMessage": "...", // only when FAILED
"retryCount": 0 // only when FAILED
}
}
The task API does NOT return a previewUrl.
Preview URL format: {CLAW_PREVIEW_BASE_URL}/{taskId} (configured via claw.preview-base-url on the Java side).
Python signs this URL with skid/sig so the preview page can exchange for a JWT.
"""
import argparse
import json
import os
import sys
import time
def _load_env_file():
env_path = os.path.expanduser("~/.openclaw/.env")
if os.path.exists(env_path):
with open(env_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
if key not in os.environ:
os.environ[key] = value
_load_env_file()
from claw_auth import claw_get, sign_url
import requests
CLAW_BASE_URL = os.getenv("CLAW_BASE_URL", "https://leewow.com")
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
CLAW_SK = os.getenv("CLAW_SK", "")
CLAW_PREVIEW_BASE_URL = os.getenv("CLAW_PREVIEW_BASE_URL", f"{CLAW_BASE_URL}/claw/preview")
WORKSPACE_DIR = os.path.expanduser("~/.openclaw/workspace")
def get_task_status(task_id: str, download_image: bool = True) -> dict:
"""Query task status, download preview image, build signed purchase URL."""
if not CLAW_SK:
return {"error": "CLAW_SK environment variable is not set."}
url = f"{CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/task/{task_id}"
try:
resp = claw_get(CLAW_SK, url, timeout=15)
data = resp.json()
except Exception as e:
return {"error": f"Failed to fetch task status: {e}"}
if data.get("code") != 0:
return {"error": f"API returned: {data.get('message', 'Unknown error')}"}
raw = data.get("data", {})
status = raw.get("status", "UNKNOWN")
result_nested = raw.get("result", {}) or {}
out = {
"taskId": raw.get("taskId", task_id),
"status": status,
"templateId": result_nested.get("templateId"),
}
if status == "COMPLETED":
rendered_url = result_nested.get("renderedImageUrl")
if download_image and rendered_url:
local_path = _download_preview(rendered_url, task_id)
if local_path:
out["localImagePath"] = local_path
preview_url = f"{CLAW_PREVIEW_BASE_URL}/{task_id}"
out["purchaseUrl"] = sign_url(CLAW_SK, preview_url)
elif status == "FAILED":
out["errorMessage"] = raw.get("errorMessage", "Unknown error")
return out
def _download_preview(image_url: str, task_id: str) -> str | None:
"""Download preview image to workspace/previews/."""
try:
resp = requests.get(image_url, timeout=30)
resp.raise_for_status()
ext = ".png"
ct = resp.headers.get("content-type", "")
if "jpeg" in ct or "jpg" in ct:
ext = ".jpg"
elif "webp" in ct:
ext = ".webp"
previews_dir = os.path.join(WORKSPACE_DIR, "previews")
os.makedirs(previews_dir, exist_ok=True)
filepath = os.path.join(previews_dir, f"leewow_preview_{task_id}{ext}")
with open(filepath, "wb") as f:
f.write(resp.content)
return filepath
except Exception as e:
print(f"Warning: Failed to download preview image: {e}", file=sys.stderr)
return None
def poll_until_complete(task_id: str, timeout: int = 120, download_image: bool = True) -> dict:
"""Poll task status until COMPLETED / FAILED / timeout."""
start = time.time()
while time.time() - start < timeout:
result = get_task_status(task_id, download_image=False)
if "error" in result:
return result
status = result.get("status", "UNKNOWN")
if status == "COMPLETED":
return get_task_status(task_id, download_image=download_image)
if status == "FAILED":
return result
time.sleep(5)
return {"error": f"Timeout after {timeout}s", "taskId": task_id, "status": "TIMEOUT"}
def format_result(result: dict) -> str:
"""Human-readable text output (used when --json is not passed)."""
status = result.get("status", "UNKNOWN")
task_id = result.get("taskId", "unknown")
template_id = result.get("templateId", "?")
if status == "COMPLETED":
image_path = result.get("localImagePath")
purchase_url = result.get("purchaseUrl")
lines = [
"[AGENT_INSTRUCTION]",
"1. Send the preview image to the user as a MEDIA ATTACHMENT:",
]
if image_path:
lines.append(f" File: {image_path}")
lines.extend([
"2. Then send a text message with the purchase link and a friendly note.",
"Do NOT use  syntax — Feishu cannot render local paths that way.",
"Use the message tool with media parameter to send the image file.",
"[/AGENT_INSTRUCTION]",
"",
"你的定制效果图出来啦 🎉",
])
if purchase_url:
lines.append(f"🛒 点击下单购买: {purchase_url}")
lines.append(f"📦 模板ID: {template_id} | 任务ID: {task_id}")
lines.append("")
lines.append("喜欢吗?如果想调整或者试试其他产品,告诉我!")
return "\n".join(lines)
if status == "FAILED":
error_msg = result.get("errorMessage", "Unknown error")
return (f"## 生成失败 ❌\n\n"
f"**任务ID**: `{task_id}`\n"
f"**原因**: {error_msg}\n\n"
f"可以换张图片或者换个模板再试一次。")
return (f"## 正在生成中 ⏳\n\n"
f"**任务ID**: `{task_id}` | **模板**: #{template_id}\n"
f"**状态**: {status}\n\n"
f"请稍等,正在努力生成效果图...")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Query Leewow generation task status")
parser.add_argument("task_id", help="Task ID from generate_preview")
parser.add_argument("--poll", action="store_true", help="Poll until complete")
parser.add_argument("--timeout", type=int, default=120, help="Poll timeout in seconds")
parser.add_argument("--no-download", action="store_true", help="Skip downloading preview image")
parser.add_argument("--json", action="store_true", help="Output JSON instead of text")
args = parser.parse_args()
if args.poll:
result = poll_until_complete(args.task_id, args.timeout, download_image=not args.no_download)
else:
result = get_task_status(args.task_id, download_image=not args.no_download)
if args.json:
print(json.dumps(result, ensure_ascii=False, indent=2))
else:
print(format_result(result))
FILE:tools.json
{
"tools": [
{
"name": "browse_templates",
"description": "Browse Leewow product templates. Returns JSON array — each item has localImagePath (workspace file). CRITICAL: You MUST send each template's image to the user as a media attachment (NOT markdown). Use the message tool with media param, or your platform's image-sending method. Then include name, price, templateId in the text caption.",
"executor": "bash",
"command": "python3 scripts/browse.py --category '{{category}}' --count {{count}}",
"parameters": {
"type": "object",
"properties": {
"category": {
"type": "string",
"description": "Optional category filter (e.g. 'bag', 'accessory', 'home', 'apparel'). Leave empty to browse all.",
"default": ""
},
"count": {
"type": "integer",
"description": "Number of templates to return (1-20, default 10)",
"default": 10,
"minimum": 1,
"maximum": 20
}
}
}
},
{
"name": "generate_preview",
"description": "Upload a design image and start generation. Returns only taskId — no image or URL. You MUST then call get_generation_status(task_id, poll=true) to get the preview image and signed purchase link. Do NOT fabricate or guess any URL — only use purchaseUrl from get_generation_status.",
"executor": "bash",
"command": "python3 scripts/generate.py --image-path '{{image_path}}' --template-id {{template_id}} --design-theme '{{design_theme}}' --aspect-ratio '{{aspect_ratio}}' --json",
"parameters": {
"type": "object",
"properties": {
"image_path": {
"type": "string",
"description": "Local file path of the design image. MUST be in workspace directory: ~/.openclaw/workspace/"
},
"template_id": {
"type": "integer",
"description": "Product template ID (from browse_templates results)"
},
"design_theme": {
"type": "string",
"description": "Design theme or style description (e.g. 'cute cartoon', 'minimalist')",
"default": ""
},
"aspect_ratio": {
"type": "string",
"description": "Aspect ratio: '3:4', '1:1', '4:3'",
"default": "3:4"
}
},
"required": ["image_path", "template_id"]
}
},
{
"name": "get_generation_status",
"description": "Check generation task status. When COMPLETED, returns JSON with localImagePath (workspace preview image) and purchaseUrl (signed order link). CRITICAL: You MUST send the preview image to the user as a media attachment, and include the purchaseUrl in the text. Do NOT just describe the product — send the actual image file.",
"executor": "bash",
"command": "python3 scripts/get_status.py '{{task_id}}' {{#if poll}}--poll{{/if}} --timeout {{timeout}} --json",
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "Task ID returned by generate_preview"
},
"poll": {
"type": "boolean",
"description": "Wait and poll until generation completes",
"default": true
},
"timeout": {
"type": "integer",
"description": "Maximum wait time in seconds",
"default": 120
}
},
"required": ["task_id"]
}
},
{
"name": "cos_presign_url",
"description": "Generate a presigned URL for accessing COS (Tencent Cloud Object Storage) images. Useful for private buckets or when images need temporary access tokens.",
"executor": "bash",
"command": "python3 scripts/cos_presign.py '{{cos_url}}' --expired {{expired}} --json",
"parameters": {
"type": "object",
"properties": {
"cos_url": {
"type": "string",
"description": "COS URL to generate presigned URL for (e.g., https://bucket.cos.region.myqcloud.com/key.png)"
},
"expired": {
"type": "integer",
"description": "URL expiration time in seconds (default: 3600 = 1 hour)",
"default": 3600
}
},
"required": ["cos_url"]
}
}
]
}
Help users quickly find desirable customizable products, turn images or ideas into gifts, and get highly flexible personalized customization services. Browse...
---
name: custom-gift-leewow
version: 1.0.24
description: >-
Help users quickly find desirable customizable products, turn images or ideas
into gifts, and get highly flexible personalized customization services.
Browse and create custom gifts — personalized bags, mugs, phone cases,
apparel and more. Upload any image to generate an AI-powered product mockup.
Tools: browse_templates (Python direct Feishu card send), generate_preview,
get_generation_status (Python direct Feishu result send). Requires CLAW_SK
and Feishu app/target configuration. Trigger priority: when the user asks
what products/items/templates can be customized, call browse_templates before
giving any broad Leewow capability summary.
If ClawHub is rate-limited,
install from GitHub:
https://github.com/AIDiyTeams/claw-skill/tree/main/custom-gift-leewow
homepage: https://github.com/AIDiyTeams/claw-skill/tree/main/custom-gift-leewow
---
# Custom Gift — Leewow
这个 skill 的收益很直接:
- 帮用户快速找到心仪的可定制商品
- 把图片、照片或创意做成礼物、周边和个人用品
- 提供非常灵活的个性化定制服务,从浏览到效果图预览一条链路完成
Leewow 在这个 skill 里的角色很简单:
- 向用户展示**真实可定制商品**
- 帮用户选择模板
- 生成效果图并进入下单链路
所以当用户在问“能定制什么”“有哪些商品/模板可以选”时,默认应该先进入**商品浏览**流程,而不是先回答公司层面的泛化能力介绍。
## When to Use
- User wants to **send a gift** or **create something personalized**
- User says "browse products", "show me what I can customize", "gift ideas"
- User says in Chinese: "看看有啥能定制的", "看看有什么可以定制的", "有哪些可定制产品", "有什么模板", "看看定制品", "能做哪些定制品"
- User asks what physical products / items / templates are available to customize
- User provides an **image** and wants to turn it into a product
- User says "make this into a mug/bag/shirt", "customize this design"
## Trigger Priority
For browse-style requests, `browse_templates` is the default first step.
Use a broad Leewow capability summary only if `browse_templates` fails.
> ClawHub rate-limited? Install directly from GitHub:
> `https://github.com/AIDiyTeams/claw-skill/tree/main/custom-gift-leewow`
## Install Fallback
If ClawHub is rate-limited, install this skill directly from GitHub:
- Repository: `https://github.com/AIDiyTeams/claw-skill`
- Skill path: `custom-gift-leewow`
Manual fallback:
```bash
cd /path/to/your/openclaw-workspace
mkdir -p skills
git clone https://github.com/AIDiyTeams/claw-skill.git /tmp/claw-skill
cp -R /tmp/claw-skill/custom-gift-leewow ./skills/custom-gift-leewow
```
Create personalized gifts and custom products powered by AI. This skill provides:
| Tool | Purpose |
|------|---------|
| `browse_templates` | Discover customizable product templates (bags, accessories, home decor, apparel, etc.) |
| `generate_preview` | Upload a design image and trigger AI generation |
| `get_generation_status` | Check generation status and download preview image |
## What the agent does (keep it minimal)
**Browse** — `browse_templates` sends product cards directly from Python and returns only send results. If the tool succeeds, reply with **`NO_REPLY`**. In normal chat usage, pass the current Feishu conversation target as `feishu_target`.
**Preview** — `get_generation_status` sends the generated preview result directly to Feishu from Python and returns only send results. If the tool succeeds, reply with **`NO_REPLY`**. Preview result cards intentionally use a different layout from browse cards.
## Prerequisites
- `CLAW_SK` — Leewow Secret Key (format: `sk-leewow-{keyId}-{secret}`)
- Obtain it from: `https://leewow.com/profile/secret-keys`
- `FEISHU_APP_ID` — Feishu App ID (often referred to together with App Secret as app AK/SK)
- `FEISHU_APP_SECRET` — Feishu App Secret
- Obtain them from your Feishu Open Platform app settings page
- `FEISHU_RECEIVE_ID` — fallback Feishu target for this skill
- `FEISHU_RECEIVE_ID_TYPE` — optional, defaults to `chat_id`
- `CLAW_BASE_URL` — API base URL (default: `https://leewow.com`)
- `CLAW_PATH_PREFIX` — Path prefix (default: `/v2` for leewow.com)
- `LEEWOW_API_BASE` — Base URL for COS STS credentials (default: `https://leewow.com`)
- Python 3.10+ with `requests` and `cos-python-sdk-v5`
## Configuration
Environment variables are loaded from `~/.openclaw/.env`:
```bash
CLAW_SK=sk-leewow-xxxx-xxxx
# Feishu App ID / App Secret (app AK/SK)
FEISHU_APP_ID=cli_xxx
FEISHU_APP_SECRET=xxx
# Default target for direct Feishu send
FEISHU_RECEIVE_ID=oc_xxx_or_open_id
FEISHU_RECEIVE_ID_TYPE=chat_id
CLAW_BASE_URL=https://leewow.com
CLAW_PATH_PREFIX=/v2
LEEWOW_API_BASE=https://leewow.com
```
Prefer runtime target passing:
- when the current Feishu conversation target is already known, pass it as `feishu_target`
- `FEISHU_RECEIVE_ID` is only a fallback
## Image Requirements (IMPORTANT)
### For Input Images (User Upload)
- **Must be in workspace directory**: `~/.openclaw/workspace/`
- Supported formats: JPG, PNG, WebP
- Recommended: Clear, well-lit images for best results
### For Preview Images (Generated Output)
- Automatically saved to: `~/.openclaw/workspace/previews/`
- Filename format: `leewow_preview_{taskId}.{ext}`
- The agent can directly display these images to users
### COS Presigned URLs
For private COS buckets, you may need to generate **presigned URLs** for accessing images:
```bash
# Generate presigned URL for a COS image
python3 scripts/cos_presign.py "https://bucket.cos.region.myqcloud.com/key.png" --json
# With custom expiration (e.g., 1 hour = 3600 seconds)
python3 scripts/cos_presign.py "COS_URL" --expired 3600
# Use with get_generation_status to get presigned preview URL
python3 scripts/get_status.py {taskId} --presign --json
```
**Note**: Most Leewow COS buckets are public, so presigned URLs are optional.
## Typical Flow (Generator Pattern)
1. **Browse** — `browse_templates` → Python sends Feishu product cards directly → agent replies `NO_REPLY` → user picks a `Template ID` when ready
2. **Upload** — User provides an image (must be in workspace `~/.openclaw/workspace/`)
3. **Generate** — Call `generate_preview` → get taskId → immediately proceed to step 4
4. **Poll** — Call `get_generation_status` with `poll=true` → wait for COMPLETED
5. **Display** — `get_generation_status` → Python sends preview result directly → final assistant reply is `NO_REPLY`
## Tool Reference
### browse_templates
Browse available product templates.
```bash
python3 scripts/browse.py --count 5 --json
```
Options:
- `--category`: Filter by category (bag, accessory, home, apparel)
- `--count`: Number of products to return (1-10, default 5)
- `--json`: Direct-send to Feishu and return send result JSON
- `--feishu-target`: Current Feishu conversation target. In normal use, treat this as required.
- `--raw-json`: Debug mode that returns raw template data
### generate_preview
Upload image and trigger generation.
```bash
python3 scripts/generate.py --image-path ./workspace/my_design.png --template-id 3 --json
```
Options:
- `--image-path`: **Required**. Path to design image (must be in workspace)
- `--template-id`: **Required**. Product template ID from browse_templates
- `--design-theme`: Optional style description
- `--aspect-ratio`: Image ratio (3:4, 1:1, 4:3, default 3:4)
- `--json`: Output JSON format
**Returns**: Task ID for status polling. Generation is async (~30-60s).
### get_generation_status
Check generation status and download preview image.
```bash
python3 scripts/get_status.py {taskId} --poll
```
Options:
- `task_id`: Task ID from generate_preview
- `--poll`: Wait until generation completes
- `--timeout`: Poll timeout in seconds (default 120)
- `--no-download`: Skip downloading preview image
- `--json`: Output JSON format
- `--feishu-target`: Current Feishu conversation target. In normal use, treat this as required.
**Returns**: Generation status and, in direct-send mode, send result JSON.
## Safety Rules
- Never expose or log the `CLAW_SK` value. When confirming configuration, only show the last 4 characters.
- Input images **must** be in workspace directory for the agent to access them
- Preview images are automatically saved to `workspace/previews/`
- Limit browse results to 10 templates maximum per request
## Examples
```text
User: "I want to make a custom gift for my friend"
→ browse_templates → Python sends product cards directly → `NO_REPLY`
→ user picks → generate_preview → get_generation_status --poll
→ Python sends preview image + text directly → `NO_REPLY`
User: "Turn this photo into a phone case"
→ browse_templates --category phone → Python sends product cards directly → user picks
→ generate_preview → get_generation_status --poll
→ Python sends preview image + text directly → `NO_REPLY`
User: "Show me what products I can customize"
→ browse_templates → Python sends product cards directly → `NO_REPLY`
User: "看看有啥能定制的"
→ browse_templates first
```
## Output Structure
### browse_templates --json
```json
{
"ok": true,
"mode": "direct_feishu_send",
"channel": "feishu",
"messageCount": 8,
"messageIds": ["om_xxx", "om_yyy"],
"feishuImagesResolved": true,
"finalAssistantReply": "NO_REPLY"
}
```
→ Python sends product cards directly to Feishu. Agent returns `NO_REPLY`.
### generate_preview --json
```json
{
"taskId": "task_xxx",
"status": "PENDING",
"estimatedSeconds": 45,
"templateId": 3
}
```
### get_generation_status --json (completed)
```json
{
"taskId": "task_xxx",
"status": "COMPLETED",
"mode": "direct_feishu_send",
"messageCount": 1,
"messageIds": ["om_card_xxx"],
"feishuImagesResolved": true,
"finalAssistantReply": "NO_REPLY"
}
```
→ Python sends one preview result card directly. Agent returns `NO_REPLY`.
Version Marker: [email protected]
FILE:scripts/browse.py
#!/usr/bin/env python3
from __future__ import annotations
"""Browse Leewow customizable product templates and send Feishu cards directly."""
import argparse
import hashlib
import json
import os
import subprocess
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from urllib.parse import urlparse
# Load environment variables from ~/.openclaw/.env
def _load_env_file():
env_path = os.path.expanduser("~/.openclaw/.env")
if os.path.exists(env_path):
with open(env_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
if key not in os.environ:
os.environ[key] = value
_load_env_file()
import requests
from channel_renderers import normalize_browse_item
from claw_auth import claw_get
from feishu_direct import FeishuDirectClient, resolve_feishu_delivery_config
CLAW_BASE_URL = os.getenv("CLAW_BASE_URL", "https://leewow.com")
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
CLAW_SK = os.getenv("CLAW_SK", "")
WORKSPACE_DIR = os.path.expanduser("~/.openclaw/workspace")
TEMPLATE_IMG_DIR = os.path.join(WORKSPACE_DIR, "template_images")
DEFERRED_BATCH_DIR = os.path.join(WORKSPACE_DIR, "deferred_feishu_batches")
DEFAULT_SYNC_SEND_COUNT = 3
def _download_cover_image(remote_url: str, template_id) -> str | None:
"""Download template cover image to workspace and return local path.
Uses a content-hash filename so repeated calls are instant (cache hit).
Returns None on failure — caller should fall back to remote URL.
"""
if not remote_url:
return None
try:
os.makedirs(TEMPLATE_IMG_DIR, exist_ok=True)
url_hash = hashlib.md5(remote_url.encode()).hexdigest()[:10]
parsed = urlparse(remote_url)
ext = os.path.splitext(parsed.path)[1] or ".jpg"
filename = f"template_{template_id}_{url_hash}{ext}"
filepath = os.path.join(TEMPLATE_IMG_DIR, filename)
if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
return filepath
resp = requests.get(remote_url, timeout=15)
resp.raise_for_status()
with open(filepath, "wb") as f:
f.write(resp.content)
return filepath
except Exception as e:
print(f"Warning: failed to download cover image: {e}", file=sys.stderr)
return None
def _fetch_templates(category: str = None, count: int = 5) -> list:
if not CLAW_SK:
return [{"error": "CLAW_SK environment variable is not set."}]
count = min(max(count, 1), 10)
url = f"{CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/templates"
try:
resp = claw_get(CLAW_SK, url, timeout=15)
data = resp.json()
except Exception as e:
return [{"error": f"Failed to fetch templates: {e}"}]
if data.get("code") != 0:
return [{"error": f"API returned: {data.get('message', 'Unknown error')}"}]
templates = data.get("data", [])
if category:
cat_lower = category.lower()
templates = [
t for t in templates
if cat_lower in (t.get("name", "") + t.get("description", "")).lower()
]
return templates[:count]
def _extract_price(sku_configs) -> str:
if not sku_configs:
return ""
try:
skus = json.loads(sku_configs) if isinstance(sku_configs, str) else sku_configs
if isinstance(skus, list) and skus:
first = skus[0]
price = first.get("priceOnSell") or first.get("price")
origin = first.get("originPrice")
currency = first.get("currency", "USD")
if price:
s = f"**price {currency}**"
if origin and float(origin) > float(price):
s += f" ~~origin~~"
return s
except (json.JSONDecodeError, TypeError, ValueError):
pass
return ""
def _build_browse_items(category: str = None, count: int = 5) -> list[dict]:
templates = _fetch_templates(category=category, count=count)
if templates and templates[0].get("error"):
return templates
rows = []
for template in templates:
rows.append(normalize_browse_item(template, _extract_price(template.get("skuConfigs"))))
return rows
def _build_customer_message_markdown(item: dict, include_preview_link: bool = False) -> str:
price_display = str(item["price"]).replace("|", "\\|")
lines = [
f"## {item['name']}",
item["description"],
f"**Template ID:** `{item['templateId']}`",
f"**Price:** {price_display}",
]
if include_preview_link and item.get("coverImage"):
lines.extend(["", f"[Preview: {item['name']}]({item['coverImage']})"])
return "\n".join(lines)
def _prepare_feishu_cards_parallel(
rows: list[dict],
client: FeishuDirectClient,
) -> tuple[list[dict], bool]:
"""Build per-product Feishu card payloads in parallel while preserving order."""
if not rows:
return [], False
workers = min(8, len(rows))
results: list[dict | None] = [None] * len(rows)
any_changed = False
def task(idx: int, item: dict) -> tuple[int, dict]:
markdown = _build_customer_message_markdown(item)
try:
card, image_resolved = client.build_card(
markdown_text=markdown,
image_ref=item.get("coverImage") or None,
alt_text=item["name"],
)
except Exception:
card, image_resolved = client.build_card(
markdown_text=_build_customer_message_markdown(item, include_preview_link=True),
image_ref=None,
alt_text=item["name"],
)
return idx, {
"card": card,
"feishuImageResolved": image_resolved,
}
with ThreadPoolExecutor(max_workers=workers) as ex:
futures = [ex.submit(task, i, item) for i, item in enumerate(rows)]
for fut in futures:
idx, result = fut.result()
results[idx] = result
if result.get("feishuImageResolved"):
any_changed = True
return [r for r in results if r is not None], any_changed
def _send_cards_sequentially(client: FeishuDirectClient, plans: list[dict]) -> list[str]:
message_ids: list[str] = []
for plan in plans:
message_ids.append(client.send_card(plan["card"]))
return message_ids
def _sync_send_count(total: int) -> int:
if total <= 0:
return 0
raw = os.getenv("LEEWOW_BROWSE_SYNC_SEND_COUNT", str(DEFAULT_SYNC_SEND_COUNT)).strip()
try:
count = int(raw)
except ValueError:
count = DEFAULT_SYNC_SEND_COUNT
return max(1, min(total, count))
def _spawn_deferred_card_sender(
plans: list[dict],
app_id: str,
app_secret: str,
receive_id: str,
receive_id_type: str,
domain: str,
) -> str | None:
if not plans:
return None
os.makedirs(DEFERRED_BATCH_DIR, exist_ok=True)
with tempfile.NamedTemporaryFile(
mode="w",
encoding="utf-8",
suffix=".json",
prefix="browse_cards_",
dir=DEFERRED_BATCH_DIR,
delete=False,
) as handle:
json.dump(
{
"receive_id": receive_id,
"receive_id_type": receive_id_type,
"domain": domain,
"cards": [plan["card"] for plan in plans],
},
handle,
ensure_ascii=False,
)
batch_file = handle.name
child_env = os.environ.copy()
child_env["FEISHU_APP_ID"] = app_id
child_env["FEISHU_APP_SECRET"] = app_secret
child_env["FEISHU_RECEIVE_ID"] = receive_id
child_env["FEISHU_RECEIVE_ID_TYPE"] = receive_id_type
child_env["FEISHU_OPEN_BASE"] = domain
subprocess.Popen(
[sys.executable, os.path.abspath(__file__), "--send-card-batch-file", batch_file],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
close_fds=True,
start_new_session=True,
env=child_env,
)
return batch_file
def _send_card_batch_file(batch_file: str) -> int:
payload = json.loads(Path(batch_file).read_text(encoding="utf-8"))
client = FeishuDirectClient(
app_id=os.getenv("FEISHU_APP_ID", ""),
app_secret=os.getenv("FEISHU_APP_SECRET", ""),
receive_id=str(payload.get("receive_id") or os.getenv("FEISHU_RECEIVE_ID") or ""),
receive_id_type=str(payload.get("receive_id_type") or os.getenv("FEISHU_RECEIVE_ID_TYPE") or "chat_id"),
domain=str(payload.get("domain") or os.getenv("FEISHU_OPEN_BASE") or "https://open.feishu.cn"),
)
try:
cards = payload.get("cards") or []
for card in cards:
client.send_card(card)
return 0
finally:
try:
os.remove(batch_file)
except OSError:
pass
def browse_templates_payload(category: str = None, count: int = 5, channel: str = "feishu", params: dict | None = None) -> dict:
"""Direct-send browse cards on Feishu and return send results."""
rows = _build_browse_items(category=category, count=count)
if rows and rows[0].get("error"):
return {"error": rows[0]["error"]}
ch = (channel or "").strip().lower()
if ch != "feishu":
return {"error": f"Unsupported direct-send channel: {channel}"}
try:
app_id, app_secret, receive_id, receive_id_type, domain = resolve_feishu_delivery_config(params)
except Exception as exc:
return {"error": str(exc)}
client = FeishuDirectClient(
app_id=app_id,
app_secret=app_secret,
receive_id=receive_id,
receive_id_type=receive_id_type,
domain=domain,
)
plans, feishu_images_resolved = _prepare_feishu_cards_parallel(rows, client)
sync_count = _sync_send_count(len(plans))
immediate_plans = plans[:sync_count]
deferred_plans = plans[sync_count:]
message_ids = _send_cards_sequentially(client, immediate_plans)
_spawn_deferred_card_sender(
deferred_plans,
app_id=app_id,
app_secret=app_secret,
receive_id=receive_id,
receive_id_type=receive_id_type,
domain=domain,
)
return {
"ok": True,
"channel": channel,
"mode": "direct_feishu_send",
"messageCount": len(plans),
"immediateMessageCount": len(message_ids),
"deferredMessageCount": len(deferred_plans),
"deferredBatchScheduled": bool(deferred_plans),
"messageIds": message_ids,
"feishuImagesResolved": feishu_images_resolved,
"finalAssistantReply": "NO_REPLY",
}
def browse_templates_json(category: str = None, count: int = 5) -> list:
"""Return raw JSON-serializable template data with optional local image cache."""
templates = _fetch_templates(category=category, count=count)
if templates and templates[0].get("error"):
return templates
results = []
for i, template in enumerate(templates, 1):
tid = template.get("templateId", "?")
cover = normalize_plain_text(template.get("coverImage"))
local_cover = _download_cover_image(cover, tid)
results.append(
{
"index": i,
"templateId": tid,
"name": template.get("name", "Unnamed Product"),
"description": template.get("description", ""),
"skuType": template.get("skuType", ""),
"shippingOrigin": template.get("shippingOrigin", "CN"),
"price": _extract_price(template.get("skuConfigs")),
"remoteImageUrl": cover,
"localImagePath": local_cover,
}
)
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--category", type=str, default=None)
parser.add_argument("--count", type=int, default=5)
parser.add_argument("--channel", type=str, default="feishu", help="Reserved output channel renderer")
parser.add_argument("--json", action="store_true", help="Direct-send and output JSON result")
parser.add_argument("--raw-json", action="store_true", help="Output raw template JSON for debugging")
parser.add_argument("--feishu-target", type=str, default=None)
parser.add_argument("--feishu-receive-id-type", type=str, default=None)
parser.add_argument("--feishu-app-id", type=str, default=None)
parser.add_argument("--feishu-app-secret", type=str, default=None)
parser.add_argument("--feishu-open-base", type=str, default=None)
parser.add_argument("--send-card-batch-file", type=str, default=None)
args = parser.parse_args()
if args.send_card_batch_file:
raise SystemExit(_send_card_batch_file(args.send_card_batch_file))
elif args.raw_json:
print(json.dumps(browse_templates_json(category=args.category, count=args.count), ensure_ascii=False, indent=2))
elif args.json:
print(
json.dumps(
browse_templates_payload(
category=args.category,
count=args.count,
channel=args.channel,
params={
"feishu_target": args.feishu_target,
"feishu_receive_id_type": args.feishu_receive_id_type,
"feishu_app_id": args.feishu_app_id,
"feishu_app_secret": args.feishu_app_secret,
"feishu_open_base": args.feishu_open_base,
},
),
ensure_ascii=False,
indent=2,
)
)
else:
print(
json.dumps(
browse_templates_payload(
category=args.category,
count=args.count,
channel=args.channel,
params={
"feishu_target": args.feishu_target,
"feishu_receive_id_type": args.feishu_receive_id_type,
"feishu_app_id": args.feishu_app_id,
"feishu_app_secret": args.feishu_app_secret,
"feishu_open_base": args.feishu_open_base,
},
),
ensure_ascii=False,
indent=2,
)
)
FILE:scripts/channel_renderers.py
#!/usr/bin/env python3
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Iterable
def normalize_plain_text(value: object) -> str:
return str(value or "").replace("\r", " ").replace("\n", " ").strip()
def escape_table_cell(value: object) -> str:
text = normalize_plain_text(value)
if not text:
return "-"
return text.replace("|", "\\|")
def compact_description(value: object, limit: int = 56) -> str:
text = normalize_plain_text(value)
if len(text) <= limit:
return text
return text[: limit - 3].rstrip() + "..."
def build_customer_subtitle(name: str, sku_type: str, shipping: str) -> str:
title = normalize_plain_text(name).lower()
sku = normalize_plain_text(sku_type).lower()
key = f"{title} {sku}"
mapping = [
(("hoodie", "sweatshirt"), "可定制连帽卫衣,适合大面积图案展示"),
(("tote", "bag"), "可定制通勤包袋,适合日常使用"),
(("crossbody",), "可定制斜挎包,适合轻便出行"),
(("socks",), "可定制袜子,适合趣味图案设计"),
(("pillowcase", "pillow case"), "可定制家居用品,适合照片或插画"),
(("travel pillow",), "可定制旅行枕,适合舒适出行场景"),
(("iphone", "phone case"), "可定制手机壳,适合个性化图案"),
(("framed print", "print", "poster"), "可定制装饰画,适合展示你的作品"),
]
for keywords, subtitle in mapping:
if any(keyword in key for keyword in keywords):
return subtitle
shipping_suffix = f" · Ships from {shipping}" if shipping else ""
return f"支持自定义设计的商品{shipping_suffix}".strip()
def normalize_browse_item(template: dict, price_display: str) -> dict:
sku_type = normalize_plain_text(template.get("skuType")) or "-"
shipping = normalize_plain_text(template.get("shippingOrigin")) or "CN"
description = build_customer_subtitle(
name=normalize_plain_text(template.get("name", "Unnamed Product")) or "Unnamed Product",
sku_type=sku_type,
shipping=shipping,
)
info_parts = [f"SKU: {sku_type}", f"Ships: {shipping}"]
return {
"templateId": template.get("templateId", "?"),
"name": normalize_plain_text(template.get("name", "Unnamed Product")) or "Unnamed Product",
"coverImage": normalize_plain_text(template.get("coverImage")),
"price": price_display or "-",
"skuType": sku_type,
"shippingOrigin": shipping,
"description": description,
"info": " · ".join(info_parts),
}
class ChannelRenderer(ABC):
channel: str = "plain"
@abstractmethod
def render_browse(self, items: Iterable[dict]) -> str:
raise NotImplementedError
def render_browse_messages(self, items: Iterable[dict]) -> list[str]:
return [self.render_browse(items)]
class PlainTextRenderer(ChannelRenderer):
channel = "plain"
def render_browse(self, items: Iterable[dict]) -> str:
rows = list(items)
if not rows:
return "No matching templates found. Try a different category or browse all."
lines = [f"Available Product Templates ({len(rows)} results)", ""]
for index, item in enumerate(rows, 1):
lines.append(f"{index}. {item['name']}")
lines.append(f"Template ID: `{item['templateId']}`")
lines.append(f"Price: {item['price']}")
lines.append(item["info"])
if item["coverImage"]:
lines.append(f"Preview: {item['coverImage']}")
lines.append("")
lines.append("Use `generate_preview` with a `Template ID` to create a customized design.")
return "\n".join(lines)
class FeishuRenderer(ChannelRenderer):
channel = "feishu"
def render_browse(self, items: Iterable[dict]) -> str:
messages = self.render_browse_messages(items)
return "\n\n".join(messages)
def render_browse_messages(self, items: Iterable[dict]) -> list[str]:
rows = list(items)
if not rows:
return ["No matching templates found. Try a different category or browse all."]
messages: list[str] = []
for item in rows:
description = item["description"] or item["info"]
price_display = str(item["price"]).replace("|", "\\|")
lines = [
f"## {escape_table_cell(item['name'])}",
description,
f"**Template ID:** `{item['templateId']}`",
f"**Price:** {price_display}",
]
if item["coverImage"]:
lines.extend(["", f"![{escape_table_cell(item['name'])}]({item['coverImage']})"])
messages.append("\n".join(lines))
return messages
@staticmethod
def _format_image_cell(name: str, cover_url: str) -> str:
if not cover_url:
return "-"
return f""
@staticmethod
def _format_preview_link(cover_url: str) -> str:
if not cover_url:
return "-"
return f"[Open image]({cover_url})"
_RENDERERS = {
PlainTextRenderer.channel: PlainTextRenderer(),
FeishuRenderer.channel: FeishuRenderer(),
}
def get_channel_renderer(channel: str | None) -> ChannelRenderer:
normalized = normalize_plain_text(channel).lower() or FeishuRenderer.channel
if normalized not in _RENDERERS:
supported = ", ".join(sorted(_RENDERERS))
raise ValueError(f"Unsupported channel '{channel}'. Supported: {supported}")
return _RENDERERS[normalized]
FILE:scripts/claw_auth.py
"""Claw API HMAC-SHA256 signing utility.
Supports CLAW_PATH_PREFIX env var (e.g. "/v2") for proxied environments:
- Request URL: {CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/templates
- Sign path: /claw/templates (what Java actually receives)
Preview page token exchange (browser → GET /claw/preview/auth) matches
ClawPreviewAuthController: only query params skid + sig, with
sig = hex(HMAC-SHA256(key=full_sk, message="claw-preview:" + skid)).
Claw API calls use X-Claw-* headers; payload format is defined in ClawSkAuthFilter.
"""
import hashlib
import hmac
import os
import time
import uuid
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
import requests
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
def _parse_key_id(sk: str) -> str:
parts = sk.split("-")
if len(parts) < 4 or parts[0] != "sk" or parts[1] != "leewow":
raise ValueError("Invalid SK format. Expected: sk-leewow-{keyId}-{secret}")
return parts[2]
def _compute_body_hash(body: bytes) -> str:
return hashlib.sha256(body).hexdigest()
def _compute_signature(sk: str, payload: str) -> str:
return hmac.new(
sk.encode("utf-8"),
payload.encode("utf-8"),
hashlib.sha256,
).hexdigest()
def _strip_prefix(path: str) -> str:
"""Strip CLAW_PATH_PREFIX from path for signing (Java receives path without prefix)."""
if CLAW_PATH_PREFIX and path.startswith(CLAW_PATH_PREFIX):
return path[len(CLAW_PATH_PREFIX) :]
return path
def build_claw_headers(sk: str, method: str, url: str, body: bytes = b"") -> dict:
key_id = _parse_key_id(sk)
timestamp = str(int(time.time()))
nonce = uuid.uuid4().hex[:16]
raw_path = urlparse(url).path
sign_path = _strip_prefix(raw_path)
body_hash = _compute_body_hash(body)
sign_payload = f"{key_id}\n{timestamp}\n{nonce}\n{method}\n{sign_path}\n{body_hash}"
signature = _compute_signature(sk, sign_payload)
return {
"X-Claw-KeyId": key_id,
"X-Claw-Timestamp": timestamp,
"X-Claw-Nonce": nonce,
"X-Claw-Signature": signature,
}
def build_preview_auth_params(sk: str) -> dict:
"""skid + sig for GET /claw/preview/auth — matches ClawPreviewAuthController.exchangeToken."""
key_id = _parse_key_id(sk)
message = f"claw-preview:{key_id}"
sig = hmac.new(
sk.encode("utf-8"),
message.encode("utf-8"),
hashlib.sha256,
).hexdigest()
return {"skid": key_id, "sig": sig}
def sign_url(sk: str, url: str) -> str:
"""Append skid & sig query params for preview / purchase links.
Java verifies with:
expectedSig = hmacSha256(fullSk, \"claw-preview:\" + skid) // hex, lowercase
Do not use X-Claw header signing (ts/nonce/path) here — preview auth only accepts skid+sig.
"""
auth = build_preview_auth_params(sk)
parsed = urlparse(url)
query = dict(parse_qsl(parsed.query, keep_blank_values=True))
query["skid"] = auth["skid"]
query["sig"] = auth["sig"]
new_query = urlencode(query)
return urlunparse(parsed._replace(query=new_query))
def claw_get(sk: str, url: str, **kwargs) -> requests.Response:
headers = build_claw_headers(sk, "GET", url)
headers.update(kwargs.pop("headers", {}))
return requests.get(url, headers=headers, **kwargs)
def claw_post(sk: str, url: str, json_data: dict = None, **kwargs) -> requests.Response:
import json as json_module
body = json_module.dumps(json_data).encode("utf-8") if json_data else b""
headers = build_claw_headers(sk, "POST", url, body)
headers["Content-Type"] = "application/json"
headers.update(kwargs.pop("headers", {}))
return requests.post(url, data=body, headers=headers, **kwargs)
FILE:scripts/cos_presign.py
#!/usr/bin/env python3
"""Generate COS presigned URLs for accessing private bucket objects.
腾讯云 COS 预签名 URL 生成工具
用于为私有 bucket 中的图片生成带签名的临时访问 URL
"""
import os
import sys
from urllib.parse import urlparse
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from cos_uploader import _fetch_sts_credentials, _get_cos_client
from qcloud_cos.cos_exception import CosServiceError
def extract_cos_key_from_url(cos_url: str) -> tuple:
"""Extract bucket, region, and key from COS URL.
URL format: https://{bucket}.cos.{region}.myqcloud.com/{key}
"""
parsed = urlparse(cos_url)
host = parsed.netloc
# Parse bucket.cos.region.myqcloud.com
parts = host.split('.')
if len(parts) >= 5 and parts[1] == 'cos':
bucket = parts[0]
region = parts[2]
else:
raise ValueError(f"Invalid COS URL format: {cos_url}")
key = parsed.path.lstrip('/')
return bucket, region, key
def generate_presigned_url(cos_url: str, expired: int = 3600) -> str:
"""Generate presigned URL for COS object.
Args:
cos_url: Original COS URL (e.g., https://bucket.cos.region.myqcloud.com/key)
expired: URL expiration time in seconds (default: 3600 = 1 hour)
Returns:
Presigned URL with query parameters
"""
bucket, region, key = extract_cos_key_from_url(cos_url)
# Get COS client with STS credentials
client, _, _ = _get_cos_client()
# Generate presigned URL
presigned_url = client.get_presigned_url(
Method='GET',
Bucket=bucket,
Key=key,
Expired=expired
)
return presigned_url
def generate_presigned_url_with_custom_domain(cos_url: str, custom_domain: str = None, expired: int = 3600) -> str:
"""Generate presigned URL with optional custom CDN domain.
If custom_domain is provided, the signature will be generated for the COS URL
but the returned URL will use the custom domain (for CDN acceleration).
"""
presigned = generate_presigned_url(cos_url, expired)
if custom_domain:
# Replace the COS domain with custom CDN domain
parsed = urlparse(cos_url)
original_host = parsed.netloc
presigned_parsed = urlparse(presigned)
# Build new URL with custom domain but keep query params (signature)
new_url = f"https://{custom_domain}{parsed.path}?{presigned_parsed.query}"
return new_url
return presigned
def batch_generate_presigned_urls(cos_urls: list, expired: int = 3600) -> dict:
"""Generate presigned URLs for multiple COS objects.
Args:
cos_urls: List of COS URLs
expired: URL expiration time in seconds
Returns:
Dict mapping original URLs to presigned URLs
"""
results = {}
for url in cos_urls:
try:
presigned = generate_presigned_url(url, expired)
results[url] = {
"success": True,
"presigned_url": presigned,
"expired": expired
}
except Exception as e:
results[url] = {
"success": False,
"error": str(e)
}
return results
if __name__ == "__main__":
import argparse
import json
parser = argparse.ArgumentParser(description="Generate COS presigned URLs")
parser.add_argument("url", help="COS URL to sign")
parser.add_argument("--expired", type=int, default=3600, help="Expiration time in seconds (default: 3600)")
parser.add_argument("--cdn-domain", type=str, default=None, help="Custom CDN domain (optional)")
parser.add_argument("--json", action="store_true", help="Output JSON format")
args = parser.parse_args()
try:
if args.cdn_domain:
result = generate_presigned_url_with_custom_domain(args.url, args.cdn_domain, args.expired)
else:
result = generate_presigned_url(args.url, args.expired)
if args.json:
print(json.dumps({
"original_url": args.url,
"presigned_url": result,
"expired": args.expired
}, indent=2))
else:
print(result)
except Exception as e:
if args.json:
print(json.dumps({"error": str(e)}))
else:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
FILE:scripts/cos_uploader.py
"""COS upload via STS temporary credentials from Leewow backend."""
import os
import time
import uuid
from typing import Optional
import requests
from qcloud_cos import CosConfig, CosS3Client
LEEWOW_API_BASE = os.getenv("LEEWOW_API_BASE", "https://leewow.com")
STS_ENDPOINT = "/v2/api/public/cos/sts/credentials"
_sts_cache: Optional[dict] = None
_sts_expires_at: float = 0
_STS_REFRESH_BUFFER_S = 120
def _fetch_sts_credentials() -> dict:
global _sts_cache, _sts_expires_at
now = time.time()
if _sts_cache and now < _sts_expires_at:
return _sts_cache
url = LEEWOW_API_BASE + STS_ENDPOINT
resp = requests.get(url, timeout=15)
resp.raise_for_status()
data = resp.json()
if not data.get("tmpSecretId"):
raise RuntimeError(f"Invalid STS response: {data}")
_sts_cache = data
_sts_expires_at = int(data.get("expiredTime", 0)) - _STS_REFRESH_BUFFER_S
return data
def _get_cos_client() -> tuple:
sts = _fetch_sts_credentials()
config = CosConfig(
Region=sts["region"],
SecretId=sts["tmpSecretId"],
SecretKey=sts["tmpSecretKey"],
Token=sts["sessionToken"],
)
return CosS3Client(config), sts["bucket"], sts["region"]
def upload_file_to_cos(file_path: str, key_prefix: str = "claw-upload") -> str:
ext = os.path.splitext(file_path)[1] or ".jpg"
key = f"{key_prefix}/{uuid.uuid4().hex}{ext}"
client, bucket, region = _get_cos_client()
content_type = {
".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".png": "image/png", ".gif": "image/gif", ".webp": "image/webp",
}.get(ext.lower(), "application/octet-stream")
with open(file_path, "rb") as fp:
client.put_object(Bucket=bucket, Body=fp, Key=key, ContentType=content_type)
return f"https://{bucket}.cos.{region}.myqcloud.com/{key}"
FILE:scripts/feishu_direct.py
#!/usr/bin/env python3
from __future__ import annotations
import hashlib
import json
import mimetypes
import os
from pathlib import Path
from threading import Lock
from typing import Any
import requests
DEFAULT_DOMAIN = "https://open.feishu.cn"
DEFAULT_RECEIVE_ID_TYPE = "chat_id"
CACHE_PATH = Path.home() / ".openclaw" / "cache" / "feishu_image_keys.json"
class FeishuDirectClient:
def __init__(
self,
app_id: str,
app_secret: str,
receive_id: str,
receive_id_type: str = DEFAULT_RECEIVE_ID_TYPE,
domain: str = DEFAULT_DOMAIN,
) -> None:
self.app_id = app_id.strip()
self.app_secret = app_secret.strip()
self.receive_id = receive_id.strip()
self.receive_id_type = receive_id_type.strip() or DEFAULT_RECEIVE_ID_TYPE
self.domain = domain.rstrip("/") or DEFAULT_DOMAIN
self._token: str | None = None
self._lock = Lock()
self._image_cache = self._load_cache()
def send_card(self, card: dict[str, Any]) -> str:
payload = {
"msg_type": "interactive",
"content": json.dumps(card, ensure_ascii=False),
}
data = self._send_message(payload)
return ((data.get("data") or {}).get("message_id") or "").strip()
def build_card(self, markdown_text: str, image_ref: str | None = None, alt_text: str = "Preview") -> tuple[dict[str, Any], bool]:
image_key = self.upload_image(image_ref) if image_ref else None
body_elements: list[dict[str, Any]] = [
{
"tag": "markdown",
"content": markdown_text,
}
]
if image_key:
body_elements.append(
{
"tag": "img",
"img_key": image_key,
"alt": {
"tag": "plain_text",
"content": alt_text,
},
}
)
return (
{
"schema": "2.0",
"config": {"wide_screen_mode": True},
"body": {"elements": body_elements},
},
bool(image_key),
)
def send_markdown_card(self, markdown_text: str, image_ref: str | None = None, alt_text: str = "Preview") -> tuple[str, bool]:
card, image_resolved = self.build_card(markdown_text, image_ref=image_ref, alt_text=alt_text)
return self.send_card(card), image_resolved
def send_text(self, markdown_text: str) -> str:
payload = {
"msg_type": "interactive",
"content": json.dumps(
{
"schema": "2.0",
"config": {"wide_screen_mode": True},
"body": {"elements": [{"tag": "markdown", "content": markdown_text}]},
},
ensure_ascii=False,
),
}
data = self._send_message(payload)
return ((data.get("data") or {}).get("message_id") or "").strip()
def send_image(self, image_ref: str) -> str:
image_key = self.upload_image(image_ref)
payload = {
"msg_type": "image",
"content": json.dumps({"image_key": image_key}, ensure_ascii=False),
}
data = self._send_message(payload)
return ((data.get("data") or {}).get("message_id") or "").strip()
def upload_image(self, image_ref: str) -> str:
image_bytes, filename = self._load_image_bytes(image_ref)
cache_key = hashlib.sha256(image_bytes).hexdigest()
with self._lock:
cached = self._image_cache.get(cache_key)
if cached:
return cached
content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
response = requests.post(
f"{self.domain}/open-apis/im/v1/images",
headers={"Authorization": f"Bearer {self._get_token()}"},
data={"image_type": "message"},
files={"image": (filename, image_bytes, content_type)},
timeout=60,
)
response.raise_for_status()
payload = response.json()
if payload.get("code") != 0:
raise RuntimeError(f"Image upload failed: {json.dumps(payload, ensure_ascii=False)}")
image_key = ((payload.get("data") or {}).get("image_key") or "").strip()
if not image_key:
raise RuntimeError(f"Image upload missing image_key: {json.dumps(payload, ensure_ascii=False)}")
with self._lock:
self._image_cache[cache_key] = image_key
self._save_cache(self._image_cache)
return image_key
def _get_token(self) -> str:
if self._token:
return self._token
with self._lock:
if self._token:
return self._token
response = requests.post(
f"{self.domain}/open-apis/auth/v3/tenant_access_token/internal",
json={"app_id": self.app_id, "app_secret": self.app_secret},
timeout=30,
)
response.raise_for_status()
payload = response.json()
if payload.get("code") != 0:
raise RuntimeError(f"Token API failed: {json.dumps(payload, ensure_ascii=False)}")
token = payload.get("tenant_access_token")
if not token:
raise RuntimeError("tenant_access_token missing from Feishu response")
self._token = str(token)
return self._token
def _send_message(self, payload: dict[str, Any]) -> dict[str, Any]:
response = requests.post(
f"{self.domain}/open-apis/im/v1/messages",
params={"receive_id_type": self.receive_id_type},
headers={
"Authorization": f"Bearer {self._get_token()}",
"Content-Type": "application/json; charset=utf-8",
},
json={"receive_id": self.receive_id, **payload},
timeout=30,
)
response.raise_for_status()
data = response.json()
if data.get("code") != 0:
raise RuntimeError(f"Send API failed: {json.dumps(data, ensure_ascii=False)}")
return data
@staticmethod
def _load_image_bytes(image_ref: str) -> tuple[bytes, str]:
image_path = Path(image_ref).expanduser()
if image_path.is_file():
return image_path.read_bytes(), image_path.name
response = requests.get(image_ref, timeout=30)
response.raise_for_status()
filename = Path(image_ref.split("?", 1)[0]).name or "image.png"
return response.content, filename
@staticmethod
def _load_cache() -> dict[str, str]:
if not CACHE_PATH.exists():
return {}
try:
data = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
if isinstance(data, dict):
return {str(key): str(value) for key, value in data.items()}
except (json.JSONDecodeError, OSError):
pass
return {}
@staticmethod
def _save_cache(data: dict[str, str]) -> None:
CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
CACHE_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def resolve_feishu_delivery_config(params: dict[str, Any] | None = None) -> tuple[str, str, str, str, str]:
args = params or {}
app_id = str(args.get("feishu_app_id") or os.getenv("FEISHU_APP_ID") or "").strip()
app_secret = str(args.get("feishu_app_secret") or os.getenv("FEISHU_APP_SECRET") or "").strip()
receive_id = str(args.get("feishu_target") or os.getenv("FEISHU_RECEIVE_ID") or "").strip()
receive_id_type = str(
args.get("feishu_receive_id_type") or os.getenv("FEISHU_RECEIVE_ID_TYPE") or DEFAULT_RECEIVE_ID_TYPE
).strip()
domain = str(args.get("feishu_open_base") or os.getenv("FEISHU_OPEN_BASE") or DEFAULT_DOMAIN).strip()
if not app_id or not app_secret:
raise RuntimeError(
"operator_action_required: missing FEISHU_APP_ID / FEISHU_APP_SECRET for direct Feishu delivery"
)
if not receive_id:
raise RuntimeError(
"operator_action_required: missing feishu_target/current conversation target for direct Feishu delivery"
)
return app_id, app_secret, receive_id, receive_id_type, domain
FILE:scripts/feishu_markdown_resolve.py
#!/usr/bin/env python3
from __future__ import annotations
"""Resolve Markdown image refs to Feishu message image_key for IM markdown.
Mirrors leewow-skills/scripts/channel_messaging/feishu.py upload/replace logic.
On missing credentials or per-image failure, callers can fall back to safe
preview links instead of broken markdown images.
"""
import hashlib
import json
import mimetypes
import os
import re
import threading
from pathlib import Path
from typing import Dict
import requests
DEFAULT_DOMAIN = "https://open.feishu.cn"
MARKDOWN_IMAGE_RE = re.compile(r"!\[([^\]]*)\]\(([^)]+)\)")
CACHE_PATH = Path.home() / ".openclaw" / "cache" / "feishu_image_keys.json"
def _env(name: str, default: str | None = None) -> str | None:
value = os.getenv(name, default)
return value.strip() if isinstance(value, str) else value
def feishu_resolve_credentials_ready() -> bool:
return bool(_env("FEISHU_APP_ID") and _env("FEISHU_APP_SECRET"))
def _load_cache() -> dict[str, str]:
if not CACHE_PATH.exists():
return {}
try:
data = json.loads(CACHE_PATH.read_text(encoding="utf-8"))
if isinstance(data, dict):
return {str(k): str(v) for k, v in data.items()}
except (json.JSONDecodeError, OSError):
pass
return {}
def _save_cache(data: dict[str, str]) -> None:
CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
CACHE_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
def _looks_like_remote_url(ref: str) -> bool:
return ref.startswith("http://") or ref.startswith("https://")
def _looks_like_feishu_image_key(ref: str) -> bool:
# Already resolved keys should not be fetched/uploaded again
return ref.startswith("img_")
class FeishuMarkdownImageResolver:
"""One instance per browse batch: reuses tenant token and upload cache."""
def __init__(self) -> None:
self.domain = (_env("FEISHU_OPEN_BASE", DEFAULT_DOMAIN) or DEFAULT_DOMAIN).rstrip("/")
self.app_id = _env("FEISHU_APP_ID") or ""
self.app_secret = _env("FEISHU_APP_SECRET") or ""
self.token: str | None = None
self.image_cache = _load_cache()
self._enabled = bool(self.app_id and self.app_secret)
self._lock = threading.Lock()
def resolve(self, markdown: str) -> tuple[str, bool]:
"""Return (markdown, True) if any  was replaced with image_key."""
if not self._enabled or not markdown:
return markdown, False
try:
token = self._get_token()
except Exception:
return markdown, False
replacements: Dict[str, str] = {}
any_changed = False
def replace(match: re.Match[str]) -> str:
nonlocal any_changed
alt = match.group(1)
image_ref = match.group(2).strip()
if _looks_like_feishu_image_key(image_ref):
return match.group(0)
if not _looks_like_remote_url(image_ref):
p = Path(image_ref).expanduser()
if not p.is_file():
return match.group(0)
if image_ref not in replacements:
try:
replacements[image_ref] = self._upload_image(token, image_ref)
except Exception:
replacements[image_ref] = image_ref
new_ref = replacements[image_ref]
if new_ref != image_ref:
any_changed = True
return f""
out = MARKDOWN_IMAGE_RE.sub(replace, markdown)
return out, any_changed
def resolve_image_ref(self, image_ref: str) -> str | None:
"""Resolve one local/remote image reference to a Feishu image_key.
Returns None when credentials are unavailable or the upload fails.
"""
if not self._enabled:
return None
try:
token = self._get_token()
return self._upload_image(token, image_ref)
except Exception:
return None
def _get_token(self) -> str:
if self.token:
return self.token
with self._lock:
if self.token:
return self.token
response = requests.post(
f"{self.domain}/open-apis/auth/v3/tenant_access_token/internal",
json={"app_id": self.app_id, "app_secret": self.app_secret},
timeout=30,
)
response.raise_for_status()
payload = response.json()
if payload.get("code") != 0:
raise RuntimeError(f"Token API failed: {json.dumps(payload, ensure_ascii=False)}")
token = payload.get("tenant_access_token")
if not token:
raise RuntimeError("tenant_access_token missing from Feishu response")
self.token = str(token)
return self.token
def _upload_image(self, token: str, image_ref: str) -> str:
image_bytes, filename = self._load_image_bytes(image_ref)
cache_key = hashlib.sha256(image_bytes).hexdigest()
with self._lock:
cached = self.image_cache.get(cache_key)
if cached:
return cached
content_type = mimetypes.guess_type(filename)[0] or "application/octet-stream"
response = requests.post(
f"{self.domain}/open-apis/im/v1/images",
headers={"Authorization": f"Bearer {token}"},
data={"image_type": "message"},
files={"image": (filename, image_bytes, content_type)},
timeout=60,
)
response.raise_for_status()
payload = response.json()
if payload.get("code") != 0:
raise RuntimeError(f"Image upload failed: {json.dumps(payload, ensure_ascii=False)}")
image_key = ((payload.get("data") or {}).get("image_key") or "").strip()
if not image_key:
raise RuntimeError(f"Image upload missing image_key: {json.dumps(payload, ensure_ascii=False)}")
with self._lock:
cached = self.image_cache.get(cache_key)
if cached:
return cached
self.image_cache[cache_key] = image_key
_save_cache(self.image_cache)
return image_key
@staticmethod
def _load_image_bytes(image_ref: str) -> tuple[bytes, str]:
image_path = Path(image_ref).expanduser()
if image_path.is_file():
return image_path.read_bytes(), image_path.name
response = requests.get(image_ref, timeout=30)
response.raise_for_status()
filename = Path(image_ref.split("?", 1)[0]).name or "image.png"
return response.content, filename
def resolve_feishu_markdown_images(markdown: str) -> tuple[str, bool]:
"""Convenience: one-shot resolve using a fresh resolver (prefer batch FeishuMarkdownImageResolver)."""
return FeishuMarkdownImageResolver().resolve(markdown)
def fallback_markdown_images_to_links(markdown: str) -> str:
"""Replace markdown images with plain preview links.
This avoids shipping broken `` markdown to Feishu when
image_key resolution is unavailable.
"""
def replace(match: re.Match[str]) -> str:
alt = match.group(1).strip() or "Preview"
image_ref = match.group(2).strip()
if _looks_like_feishu_image_key(image_ref):
return match.group(0)
return f"[Preview: {alt}]({image_ref})"
return MARKDOWN_IMAGE_RE.sub(replace, markdown)
FILE:scripts/generate.py
#!/usr/bin/env python3
"""Upload image to COS, call /claw/generate, output structured JSON for agent follow-up."""
import argparse
import json
import os
# Load environment variables from ~/.openclaw/.env
def _load_env_file():
env_path = os.path.expanduser("~/.openclaw/.env")
if os.path.exists(env_path):
with open(env_path, "r") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, value = line.split("=", 1)
if key not in os.environ:
os.environ[key] = value
_load_env_file()
from claw_auth import claw_post, sign_url
from cos_uploader import upload_file_to_cos
CLAW_BASE_URL = os.getenv("CLAW_BASE_URL", "https://leewow.com")
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
CLAW_SK = os.getenv("CLAW_SK", "")
def generate_preview(image_path: str, template_id: int,
design_theme: str = "", aspect_ratio: str = "3:4") -> dict:
"""Upload image and trigger generation. Returns dict with task info."""
if not CLAW_SK:
return {"error": "CLAW_SK environment variable is not set."}
if not os.path.exists(image_path):
return {"error": f"Image file not found: {image_path}"}
# Ensure image is in workspace directory
workspace_dir = os.path.expanduser("~/.openclaw/workspace")
abs_image_path = os.path.abspath(image_path)
if not abs_image_path.startswith(workspace_dir):
return {"error": f"Image must be in workspace directory: {workspace_dir}"}
try:
image_url = upload_file_to_cos(image_path, key_prefix="prod-h5-generation-upload/claw-skill")
except Exception as e:
return {"error": f"Failed to upload image: {e}"}
url = f"{CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/generate"
payload = {
"templateId": template_id,
"imageUrl": image_url,
"designTheme": design_theme,
"aspectRatio": aspect_ratio,
}
try:
resp = claw_post(CLAW_SK, url, json_data=payload, timeout=30)
data = resp.json()
except Exception as e:
return {"error": f"Failed to call generate API: {e}"}
if data.get("code") != 0:
return {"error": f"API returned: {data.get('message', 'Unknown error')}"}
result = data.get("data", {})
result["_success"] = True
return result
def _safe_result(result: dict) -> dict:
"""Build safe output for the agent.
The browse step returns final markdown directly. Generate returns task state only,
so the agent can continue to poll with get_generation_status.
"""
if "error" in result:
return result
task_id = result.get("taskId", "unknown")
preview_url = result.get("previewUrl")
out = {
"taskId": task_id,
"status": result.get("status", "PENDING"),
"estimatedSeconds": result.get("estimatedSeconds", 45),
"templateId": result.get("templateId"),
}
if preview_url and CLAW_SK:
out["purchaseUrl"] = sign_url(CLAW_SK, preview_url)
return out
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--image-path", type=str, required=True)
parser.add_argument("--template-id", type=int, required=True)
parser.add_argument("--design-theme", type=str, default="")
parser.add_argument("--aspect-ratio", type=str, default="3:4")
parser.add_argument("--json", action="store_true", help="(compat) always outputs safe JSON")
args = parser.parse_args()
result = generate_preview(args.image_path, args.template_id, args.design_theme, args.aspect_ratio)
print(json.dumps(_safe_result(result), ensure_ascii=False, indent=2))
FILE:scripts/get_status.py
#!/usr/bin/env python3
from __future__ import annotations
"""Query generation task status and output structured JSON for agent sending.
API response structure (GET /claw/task/{taskId}):
{
"code": 0,
"data": {
"taskId": "gen_xxx",
"status": "COMPLETED", // PENDING | ANALYZING | GENERATING | CREATING_PRODUCT | COMPLETED | FAILED
"result": { // only when COMPLETED
"renderedImageUrl": "https://...myqcloud.com/...",
"templateId": 15,
"customizedProductId": null // may be null if product creation is pending
},
"errorMessage": "...", // only when FAILED
"retryCount": 0 // only when FAILED
}
}
The task API does NOT return a previewUrl.
Preview URL format: {CLAW_PREVIEW_BASE_URL}/{taskId} (configured via claw.preview-base-url on the Java side).
Python signs this URL with skid/sig so the preview page can exchange for a JWT.
"""
import argparse
import json
import os
import sys
import time
def _load_env_file():
env_path = os.path.expanduser("~/.openclaw/.env")
if os.path.exists(env_path):
with open(env_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
if key not in os.environ:
os.environ[key] = value
_load_env_file()
from claw_auth import claw_get, sign_url
from feishu_direct import FeishuDirectClient, resolve_feishu_delivery_config
import requests
CLAW_BASE_URL = os.getenv("CLAW_BASE_URL", "https://leewow.com")
CLAW_PATH_PREFIX = os.getenv("CLAW_PATH_PREFIX", "")
CLAW_SK = os.getenv("CLAW_SK", "")
CLAW_PREVIEW_BASE_URL = os.getenv("CLAW_PREVIEW_BASE_URL", f"{CLAW_BASE_URL}/claw/preview")
WORKSPACE_DIR = os.path.expanduser("~/.openclaw/workspace")
def get_task_status(task_id: str, download_image: bool = True) -> dict:
"""Query task status, download preview image, build signed purchase URL."""
if not CLAW_SK:
return {"error": "CLAW_SK environment variable is not set."}
url = f"{CLAW_BASE_URL}{CLAW_PATH_PREFIX}/claw/task/{task_id}"
try:
resp = claw_get(CLAW_SK, url, timeout=15)
data = resp.json()
except Exception as e:
return {"error": f"Failed to fetch task status: {e}"}
if data.get("code") != 0:
return {"error": f"API returned: {data.get('message', 'Unknown error')}"}
raw = data.get("data", {})
status = raw.get("status", "UNKNOWN")
result_nested = raw.get("result", {}) or {}
out = {
"taskId": raw.get("taskId", task_id),
"status": status,
"templateId": result_nested.get("templateId"),
}
if status == "COMPLETED":
rendered_url = result_nested.get("renderedImageUrl")
if download_image and rendered_url:
local_path = _download_preview(rendered_url, task_id)
if local_path:
out["localImagePath"] = local_path
preview_url = f"{CLAW_PREVIEW_BASE_URL}/{task_id}"
out["purchaseUrl"] = sign_url(CLAW_SK, preview_url)
out["replyMarkdown"] = _build_completed_reply_markdown(
task_id=out["taskId"],
template_id=out.get("templateId"),
purchase_url=out["purchaseUrl"],
)
out["deliveryMode"] = "message_tool_media_then_text"
calls = []
if out.get("localImagePath"):
calls.append(
{
"action": "send",
"channel": "feishu",
"filePath": out["localImagePath"],
"message": "",
}
)
calls.append(
{
"action": "send",
"channel": "feishu",
"message": out["replyMarkdown"],
}
)
out["messageToolCalls"] = calls
out["finalAssistantReply"] = "NO_REPLY"
elif status == "FAILED":
out["errorMessage"] = raw.get("errorMessage", "Unknown error")
out["replyMarkdown"] = _build_failed_reply_markdown(
task_id=out["taskId"],
error_message=out["errorMessage"],
)
out["deliveryMode"] = "message_tool_text_only"
out["messageToolCalls"] = [
{
"action": "send",
"channel": "feishu",
"message": out["replyMarkdown"],
}
]
out["finalAssistantReply"] = "NO_REPLY"
else:
out["replyMarkdown"] = _build_pending_reply_markdown(
task_id=out["taskId"],
template_id=out.get("templateId"),
status=status,
)
out["deliveryMode"] = "message_tool_text_only"
out["messageToolCalls"] = [
{
"action": "send",
"channel": "feishu",
"message": out["replyMarkdown"],
}
]
out["finalAssistantReply"] = "NO_REPLY"
return out
def send_task_result_to_feishu(task_id: str, poll: bool = True, timeout: int = 120, download_image: bool = True, params: dict | None = None) -> dict:
"""Poll/get task status, then deliver preview result directly to Feishu."""
result = poll_until_complete(task_id, timeout, download_image=download_image) if poll else get_task_status(task_id, download_image=download_image)
if "error" in result:
return result
try:
app_id, app_secret, receive_id, receive_id_type, domain = resolve_feishu_delivery_config(params)
except Exception as exc:
return {"error": str(exc), **result}
client = FeishuDirectClient(
app_id=app_id,
app_secret=app_secret,
receive_id=receive_id,
receive_id_type=receive_id_type,
domain=domain,
)
message_ids: list[str] = []
image_resolved = False
if result.get("status") == "COMPLETED":
preview_markdown = _build_completed_preview_card_markdown(
task_id=result.get("taskId", task_id),
template_id=result.get("templateId"),
purchase_url=result.get("purchaseUrl", ""),
)
message_id, image_resolved = client.send_markdown_card(
markdown_text=preview_markdown,
image_ref=result.get("localImagePath"),
alt_text=f"preview-{result.get('taskId', task_id)}",
)
message_ids.append(message_id)
elif result.get("replyMarkdown"):
message_ids.append(client.send_text(result["replyMarkdown"]))
return {
"ok": True,
"taskId": result.get("taskId", task_id),
"status": result.get("status"),
"templateId": result.get("templateId"),
"mode": "direct_feishu_send",
"messageIds": message_ids,
"messageCount": len(message_ids),
"feishuImagesResolved": image_resolved,
"finalAssistantReply": "NO_REPLY",
}
def _download_preview(image_url: str, task_id: str) -> str | None:
"""Download preview image to workspace/previews/."""
try:
resp = requests.get(image_url, timeout=30)
resp.raise_for_status()
ext = ".png"
ct = resp.headers.get("content-type", "")
if "jpeg" in ct or "jpg" in ct:
ext = ".jpg"
elif "webp" in ct:
ext = ".webp"
previews_dir = os.path.join(WORKSPACE_DIR, "previews")
os.makedirs(previews_dir, exist_ok=True)
filepath = os.path.join(previews_dir, f"leewow_preview_{task_id}{ext}")
with open(filepath, "wb") as f:
f.write(resp.content)
return filepath
except Exception as e:
print(f"Warning: Failed to download preview image: {e}", file=sys.stderr)
return None
def poll_until_complete(task_id: str, timeout: int = 120, download_image: bool = True) -> dict:
"""Poll task status until COMPLETED / FAILED / timeout."""
start = time.time()
while time.time() - start < timeout:
result = get_task_status(task_id, download_image=False)
if "error" in result:
return result
status = result.get("status", "UNKNOWN")
if status == "COMPLETED":
return get_task_status(task_id, download_image=download_image)
if status == "FAILED":
return result
time.sleep(5)
return {"error": f"Timeout after {timeout}s", "taskId": task_id, "status": "TIMEOUT"}
def _build_completed_reply_markdown(task_id: str, template_id, purchase_url: str) -> str:
lines = [
"你的定制效果图出来啦 🎉",
f"🛒 点击下单购买: {purchase_url}",
f"📦 模板ID: {template_id or '?'} | 任务ID: {task_id}",
"",
"喜欢吗?如果想调整或者试试其他产品,告诉我!",
]
return "\n".join(lines)
def _build_completed_preview_card_markdown(task_id: str, template_id, purchase_url: str) -> str:
lines = [
"## 你的定制效果图出来啦 🎉",
"效果图已经生成,可以直接查看并继续下单。",
f"**Template ID:** `{template_id or '?'}`",
f"**Task ID:** `{task_id}`",
"",
f"[🛒 点击下单购买]({purchase_url})",
"",
"如果想调整或试试其他产品,告诉我!",
]
return "\n".join(lines)
def _build_failed_reply_markdown(task_id: str, error_message: str) -> str:
return (
f"## 生成失败 ❌\n\n"
f"**任务ID**: `{task_id}`\n"
f"**原因**: {error_message}\n\n"
f"可以换张图片或者换个模板再试一次。"
)
def _build_pending_reply_markdown(task_id: str, template_id, status: str) -> str:
return (
f"## 正在生成中 ⏳\n\n"
f"**任务ID**: `{task_id}` | **模板**: #{template_id or '?'}\n"
f"**状态**: {status}\n\n"
f"请稍等,正在努力生成效果图..."
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Query Leewow generation task status")
parser.add_argument("task_id", help="Task ID from generate_preview")
parser.add_argument("--poll", action="store_true", help="Poll until complete")
parser.add_argument("--timeout", type=int, default=120, help="Poll timeout in seconds")
parser.add_argument("--no-download", action="store_true", help="Skip downloading preview image")
parser.add_argument("--json", action="store_true", help="(compat) output JSON")
parser.add_argument("--direct-feishu-send", action="store_true", help="Send result directly to Feishu and output send result JSON")
parser.add_argument("--feishu-target", type=str, default=None)
parser.add_argument("--feishu-receive-id-type", type=str, default=None)
parser.add_argument("--feishu-app-id", type=str, default=None)
parser.add_argument("--feishu-app-secret", type=str, default=None)
parser.add_argument("--feishu-open-base", type=str, default=None)
args = parser.parse_args()
if args.direct_feishu_send:
result = send_task_result_to_feishu(
task_id=args.task_id,
poll=args.poll,
timeout=args.timeout,
download_image=not args.no_download,
params={
"feishu_target": args.feishu_target,
"feishu_receive_id_type": args.feishu_receive_id_type,
"feishu_app_id": args.feishu_app_id,
"feishu_app_secret": args.feishu_app_secret,
"feishu_open_base": args.feishu_open_base,
},
)
elif args.poll:
result = poll_until_complete(args.task_id, args.timeout, download_image=not args.no_download)
else:
result = get_task_status(args.task_id, download_image=not args.no_download)
print(json.dumps(result, ensure_ascii=False, indent=2))
FILE:tools.json
{
"tools": [
{
"name": "browse_templates",
"description": "Primary entrypoint when the user asks what products/items/templates can be customized. Use this before any broad Leewow capability summary. Python sends real product cards to Feishu directly and returns only send results plus finalAssistantReply=NO_REPLY.",
"executor": "bash",
"command": "python3 scripts/browse.py --category '{{category}}' --count {{count}} --channel '{{channel}}' --json --feishu-target '{{feishu_target}}' --feishu-receive-id-type '{{feishu_receive_id_type}}' --feishu-app-id '{{feishu_app_id}}' --feishu-app-secret '{{feishu_app_secret}}' --feishu-open-base '{{feishu_open_base}}'",
"parameters": {
"type": "object",
"properties": {
"category": {
"type": "string",
"description": "Optional category filter (e.g. 'bag', 'accessory', 'home', 'apparel'). Leave empty to browse all.",
"default": ""
},
"count": {
"type": "integer",
"description": "Number of products/messages to return (1-10, default 5)",
"default": 5,
"minimum": 1,
"maximum": 10
},
"channel": {
"type": "string",
"description": "Delivery channel. Use 'feishu' for current production flow.",
"default": "feishu"
},
"feishu_target": {
"type": "string",
"description": "Optional Feishu receive_id override. Leave empty to use FEISHU_RECEIVE_ID from skill env.",
"default": ""
},
"feishu_receive_id_type": {
"type": "string",
"description": "Optional Feishu receive_id_type override, e.g. chat_id/open_id. Leave empty to use FEISHU_RECEIVE_ID_TYPE or default chat_id.",
"default": ""
},
"feishu_app_id": {
"type": "string",
"description": "Optional Feishu App ID override. Leave empty to use FEISHU_APP_ID from skill env.",
"default": ""
},
"feishu_app_secret": {
"type": "string",
"description": "Optional Feishu App Secret override. Leave empty to use FEISHU_APP_SECRET from skill env.",
"default": ""
},
"feishu_open_base": {
"type": "string",
"description": "Optional Feishu Open API base override. Leave empty to use default https://open.feishu.cn.",
"default": ""
}
}
}
},
{
"name": "generate_preview",
"description": "Upload a design image and start generation. Returns task state only. Then call get_generation_status(task_id, poll=true) to get the preview image path plus the final reply markdown. Do NOT fabricate or guess any URL — only use purchaseUrl/replyMarkdown from get_generation_status.",
"executor": "bash",
"command": "python3 scripts/generate.py --image-path '{{image_path}}' --template-id {{template_id}} --design-theme '{{design_theme}}' --aspect-ratio '{{aspect_ratio}}' --json",
"parameters": {
"type": "object",
"properties": {
"image_path": {
"type": "string",
"description": "Local file path of the design image. MUST be in workspace directory: ~/.openclaw/workspace/"
},
"template_id": {
"type": "integer",
"description": "Product template ID (from browse_templates results)"
},
"design_theme": {
"type": "string",
"description": "Design theme or style description (e.g. 'cute cartoon', 'minimalist')",
"default": ""
},
"aspect_ratio": {
"type": "string",
"description": "Aspect ratio: '3:4', '1:1', '4:3'",
"default": "3:4"
}
},
"required": ["image_path", "template_id"]
}
},
{
"name": "get_generation_status",
"description": "Check generation task status and, when possible, send the preview result to Feishu directly from Python. On success, returns only send results plus finalAssistantReply=NO_REPLY. This tool requires the same Feishu direct-send configuration as browse_templates, and the agent should pass feishu_target from the current conversation when available.",
"executor": "bash",
"command": "python3 scripts/get_status.py '{{task_id}}' {{#if poll}}--poll{{/if}} --timeout {{timeout}} --json --direct-feishu-send --feishu-target '{{feishu_target}}' --feishu-receive-id-type '{{feishu_receive_id_type}}' --feishu-app-id '{{feishu_app_id}}' --feishu-app-secret '{{feishu_app_secret}}' --feishu-open-base '{{feishu_open_base}}'",
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": "Task ID returned by generate_preview"
},
"poll": {
"type": "boolean",
"description": "Wait and poll until generation completes",
"default": true
},
"timeout": {
"type": "integer",
"description": "Maximum wait time in seconds",
"default": 120
},
"feishu_target": {
"type": "string",
"description": "Optional Feishu receive_id override. Leave empty to use FEISHU_RECEIVE_ID from skill env.",
"default": ""
},
"feishu_receive_id_type": {
"type": "string",
"description": "Optional Feishu receive_id_type override, e.g. chat_id/open_id. Leave empty to use FEISHU_RECEIVE_ID_TYPE or default chat_id.",
"default": ""
},
"feishu_app_id": {
"type": "string",
"description": "Optional Feishu App ID override. Leave empty to use FEISHU_APP_ID from skill env.",
"default": ""
},
"feishu_app_secret": {
"type": "string",
"description": "Optional Feishu App Secret override. Leave empty to use FEISHU_APP_SECRET from skill env.",
"default": ""
},
"feishu_open_base": {
"type": "string",
"description": "Optional Feishu Open API base override. Leave empty to use default https://open.feishu.cn.",
"default": ""
}
},
"required": ["task_id"]
}
},
{
"name": "cos_presign_url",
"description": "Generate a presigned URL for accessing COS (Tencent Cloud Object Storage) images. Useful for private buckets or when images need temporary access tokens.",
"executor": "bash",
"command": "python3 scripts/cos_presign.py '{{cos_url}}' --expired {{expired}} --json",
"parameters": {
"type": "object",
"properties": {
"cos_url": {
"type": "string",
"description": "COS URL to generate presigned URL for (e.g., https://bucket.cos.region.myqcloud.com/key.png)"
},
"expired": {
"type": "integer",
"description": "URL expiration time in seconds (default: 3600 = 1 hour)",
"default": 3600
}
},
"required": ["cos_url"]
}
}
]
}