@clawhub-370299455cx-web-4df5d31a54
流式视频处理工具集 - 压缩、封面提取、音频转换,无需下载完整视频
---
name: ym-meidatoolkit
version: 1.1.0
description: 流式视频处理工具集 - 压缩、封面提取、音频转换,无需下载完整视频
author: your_name
tags:
- video
- compression
- thumbnail
- audio
- streaming
- ffmpeg
categories:
- media
- utility
clawhub:
entrypoint: python run.py
runtime: python3
http_port: 8080
---
# Video Streaming Toolkit
## 概述
一个高性能的流式视频处理 Skill,**无需下载完整视频文件**即可完成:
- ✅ **视频压缩** - 保持清晰度,体积可压缩至约 1/10,并可根据需要输出多种较小尺寸的视频供选择
- ✅ **封面提取** - 任意时间点或帧号提取封面
- ✅ **音频提取** - 转成 MP3 / WAV / AAC / M4A 格式
所有操作均采用**流式处理**,边下载边处理,大幅节省时间和磁盘空间。
---
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
FILE:video_compressor.py
"""
流式视频压缩 - 无需下载完整文件
"""
import subprocess
import requests
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
def get_remote_file_size(url: str) -> int:
    """Return the size of a remote file in bytes without downloading it.

    A HEAD request is issued and ``Content-Length`` is read. Redirects are
    followed explicitly because ``requests.head`` does not follow them by
    default, and the header is only trusted on a successful status so the
    length of an error page is never reported as the video size.

    Args:
        url: Address of the remote file.

    Returns:
        The size in bytes, or 0 when it cannot be determined (request failure,
        error status, missing or malformed header).
    """
    try:
        response = requests.head(url, timeout=10, allow_redirects=True)
        if response.ok and 'content-length' in response.headers:
            return int(response.headers['content-length'])
    except Exception as e:
        # Best effort: callers treat 0 as "size unknown".
        logger.warning(f"获取文件大小失败: {e}")
    return 0
def compress_video_streaming(
    video_url: str,
    output_path: str = None,
    target_ratio: float = 0.1,
    crf: int = 24,
    preset: str = 'veryfast'
) -> dict:
    """Compress a remote video by letting ffmpeg read the URL directly.

    Args:
        video_url: URL of the source video; passed to ffmpeg as its input.
        output_path: Destination file. Defaults to ``compressed_<name>.mp4``
            where ``<name>`` is taken from the path component of the URL.
        target_ratio: Accepted for interface compatibility; this function does
            not use it (no retry happens here).
        crf: Value passed to ffmpeg ``-crf`` (larger means smaller output).
        preset: Value passed to ffmpeg ``-preset``.

    Returns:
        On success a dict with ``status='success'``, ``output_path``,
        ``original_size_mb``, ``new_size_mb``, ``ratio``, ``crf_used`` and
        ``streaming``. ``ratio`` and ``original_size_mb`` are 0 when the remote
        size could not be determined. On failure a dict with
        ``status='error'`` and ``message``. No timeout is applied to ffmpeg.
    """
    from collections import deque
    from urllib.parse import urlparse

    if output_path is None:
        # Use only the URL path so query strings / fragments (signed URLs)
        # do not leak into the generated file name.
        stem = Path(urlparse(video_url).path).stem or 'video'
        output_path = f"compressed_{stem}.mp4"

    # Size of the source, used only to report the achieved ratio.
    original_size = get_remote_file_size(video_url)

    cmd = [
        'ffmpeg',
        '-i', video_url,  # ffmpeg reads the URL itself
        '-c:v', 'libx264',
        '-preset', preset,
        '-crf', str(crf),
        '-g', '30',
        '-keyint_min', '30',
        '-sc_threshold', '0',
        '-bf', '0',
        '-refs', '1',
        '-vsync', 'cfr',
        '-c:a', 'aac',
        '-b:a', '128k',
        '-movflags', '+faststart',
        '-y',
        output_path
    ]
    logger.info(f"开始流式压缩: {video_url[:80]}...")
    logger.info(f"输出文件: {output_path}")

    process = None
    try:
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,   # ffmpeg must not consume the caller's stdin
            stdout=subprocess.DEVNULL,  # nothing reads stdout, so never pipe it
            stderr=subprocess.PIPE,
            text=True,
            errors='replace',           # stream metadata is not guaranteed to be UTF-8
            bufsize=1
        )
        last_progress = ""
        stderr_tail = deque(maxlen=5)   # last diagnostics, reported on failure
        for line in process.stderr:
            stripped = line.strip()
            if not stripped:
                continue
            if 'frame=' in line and 'speed=' in line:
                if stripped != last_progress:
                    logger.info(f"进度: {stripped}")
                    last_progress = stripped
            else:
                stderr_tail.append(stripped)

        return_code = process.wait()
        if return_code != 0:
            message = f'ffmpeg 错误,返回码: {return_code}'
            if stderr_tail:
                message += f" ({' | '.join(stderr_tail)})"
            return {'status': 'error', 'message': message}

        if not Path(output_path).exists():
            return {'status': 'error', 'message': '输出文件未生成'}

        new_size = Path(output_path).stat().st_size
        actual_ratio = new_size / original_size if original_size > 0 else 0
        logger.info(f"压缩完成: {original_size/(1024*1024):.2f}MB -> {new_size/(1024*1024):.2f}MB, 比例: {actual_ratio:.2f}")
        return {
            'status': 'success',
            'output_path': output_path,
            'original_size_mb': round(original_size / (1024 * 1024), 2) if original_size else 0,
            'new_size_mb': round(new_size / (1024 * 1024), 2),
            'ratio': round(actual_ratio, 3),
            'crf_used': crf,
            'streaming': True
        }
    except Exception as e:
        # Do not leave an orphaned encoder running if anything above failed.
        if process is not None and process.poll() is None:
            process.kill()
            process.wait()
        return {'status': 'error', 'message': str(e)}
def compress_with_adaptive_crf(
    video_url: str,
    output_path: str = None,
    target_ratio: float = 0.1,
    max_attempts: int = 3
) -> dict:
    """Compress with increasing CRF values until the target ratio is reached.

    CRF 24, 26, 28 and 30 are tried in order (at most ``max_attempts`` of
    them). The smallest successful output is kept at ``output_path``; every
    other attempt file is removed.

    Args:
        video_url: URL of the source video.
        output_path: Final destination. Defaults to ``compressed_<name>.mp4``
            derived from the URL path, so no attempt is ever written to a
            path built from the literal string ``None``.
        target_ratio: Desired output/original size ratio.
        max_attempts: Number of CRF values to try.

    Returns:
        The result dict of the attempt that met the target, otherwise the
        best (smallest-ratio) successful attempt, otherwise an error dict.
    """
    import os
    import shutil
    from urllib.parse import urlparse

    target_ratio = float(target_ratio)  # CLI input may deliver a string
    if output_path is None:
        stem = Path(urlparse(video_url).path).stem or 'video'
        output_path = f"compressed_{stem}.mp4"

    crf_values = [24, 26, 28, 30][:max_attempts]  # tried in order
    best_result = None
    for i, crf in enumerate(crf_values):
        logger.info(f"尝试 {i+1}/{len(crf_values)}: CRF={crf}")
        # The final path always holds the best output so far; later attempts
        # go to a side file and are promoted only if they are smaller.
        attempt_path = output_path if best_result is None else f"{output_path}.try{i+1}.mp4"
        result = compress_video_streaming(
            video_url=video_url,
            output_path=attempt_path,
            target_ratio=target_ratio,
            crf=crf
        )
        if result['status'] != 'success':
            if attempt_path != output_path and os.path.exists(attempt_path):
                os.remove(attempt_path)  # drop a partial side file
            continue

        if best_result is None or result['ratio'] < best_result['ratio']:
            if attempt_path != output_path:
                shutil.move(attempt_path, output_path)
            result['output_path'] = output_path
            best_result = result
        elif attempt_path != output_path and os.path.exists(attempt_path):
            os.remove(attempt_path)

        if best_result['ratio'] <= target_ratio:
            return best_result

    if best_result:
        logger.warning(f"未达到目标比例 {target_ratio},最佳比例: {best_result['ratio']}")
        return best_result
    return {'status': 'error', 'message': '所有压缩尝试均失败'}
FILE:run.py
#!/usr/bin/env python3
"""
ClawHub Skill 统一入口 - 流式视频处理
支持:
1. 压缩: ffmpeg 流式处理,无需下载
2. 封面: 部分下载,只取需要的帧
3. 音频: 流式提取,转 MP3/WAV
"""
import sys
import json
import argparse
import logging
from pathlib import Path
# Configure root logging once at import time: INFO level, timestamped lines.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 导入模块
from frame_extractor import extract_thumbnail_from_url
from video_compressor import compress_video_streaming, compress_with_adaptive_crf
from audio_extractor import extract_audio_streaming, extract_audio_batch, get_audio_info
def handle_compress(params: dict) -> dict:
    """Handle a video-compression request.

    Args:
        params: Request parameters. ``video_url`` is required; optional keys
            are ``output_path``, ``target_ratio`` (default 0.1), ``adaptive``
            (default True), ``crf`` (default 24), ``preset`` (default
            ``'veryfast'``) and ``max_attempts`` (default 3).

    Returns:
        The result dict of the compression function, or an error dict when
        ``video_url`` is missing or a numeric parameter cannot be parsed.
    """
    video_url = params.get('video_url')
    if not video_url:
        return {'status': 'error', 'message': 'Missing video_url'}

    # The CLI ``key=value`` fallback delivers every value as a string, so
    # normalise types here instead of failing deep inside the compressor.
    try:
        target_ratio = float(params.get('target_ratio', 0.1))
        crf = int(params.get('crf', 24))
        max_attempts = int(params.get('max_attempts', 3))
    except (TypeError, ValueError) as e:
        return {'status': 'error', 'message': f'Invalid parameter: {e}'}

    adaptive = params.get('adaptive', True)
    if isinstance(adaptive, str):
        # "false" / "0" / "no" / "off" must switch adaptive mode off.
        adaptive = adaptive.strip().lower() not in ('false', '0', 'no', 'off', '')

    output_path = params.get('output_path')
    preset = params.get('preset', 'veryfast')
    logger.info(f"压缩请求: {video_url[:80]}...")

    if adaptive:
        return compress_with_adaptive_crf(
            video_url=video_url,
            output_path=output_path,
            target_ratio=target_ratio,
            max_attempts=max_attempts
        )
    return compress_video_streaming(
        video_url=video_url,
        output_path=output_path,
        target_ratio=target_ratio,
        crf=crf,
        preset=preset
    )
def handle_thumbnail(params: dict) -> dict:
    """Handle a thumbnail (cover frame) extraction request.

    Args:
        params: Request parameters. ``video_url`` is required; optional keys
            are ``time_seconds``, ``frame_number``, ``save_path``,
            ``resize_width`` and ``quality`` (default 85). When neither
            ``time_seconds`` nor ``frame_number`` is given, time 0 is used.

    Returns:
        The result dict of ``extract_thumbnail_from_url``, or an error dict
        when ``video_url`` is missing, a numeric parameter cannot be parsed,
        or extraction raises (same error style as ``handle_info``).
    """
    video_url = params.get('video_url')
    if not video_url:
        return {'status': 'error', 'message': 'Missing video_url'}

    time_seconds = params.get('time_seconds')
    frame_number = params.get('frame_number')
    resize_width = params.get('resize_width')
    quality = params.get('quality', 85)
    # Only strings are converted (CLI ``key=value`` input); values that are
    # already numeric are forwarded untouched.
    try:
        if isinstance(time_seconds, str):
            time_seconds = float(time_seconds)
        if isinstance(frame_number, str):
            frame_number = int(frame_number)
        if isinstance(resize_width, str):
            resize_width = int(resize_width)
        if isinstance(quality, str):
            quality = int(quality)
    except ValueError as e:
        return {'status': 'error', 'message': f'Invalid parameter: {e}'}

    if time_seconds is None and frame_number is None:
        time_seconds = 0
    save_path = params.get('save_path')

    logger.info(f"封面提取: {video_url[:80]}... time={time_seconds}, frame={frame_number}")
    try:
        return extract_thumbnail_from_url(
            video_url=video_url,
            time_seconds=time_seconds,
            frame_number=frame_number,
            save_path=save_path,
            resize_width=resize_width,
            quality=quality
        )
    except Exception as e:
        # Building the extractor performs network I/O and MP4 parsing, both
        # of which can raise; report that as a normal error result.
        logger.error(f"封面提取失败: {e}")
        return {'status': 'error', 'message': str(e)}
def handle_audio(params: dict) -> dict:
    """Handle a single audio-extraction request.

    Reads ``video_url`` (required) plus the optional ``output_path``,
    ``format``, ``bitrate``, ``sample_rate``, ``channels``, ``start_time`` and
    ``duration`` keys, rejects formats other than mp3/wav/aac/m4a, and
    forwards everything to ``extract_audio_streaming``.
    """
    source_url = params.get('video_url')
    if not source_url:
        return {'status': 'error', 'message': 'Missing video_url'}

    fmt = params.get('format', 'mp3')
    if fmt not in ('mp3', 'wav', 'aac', 'm4a'):
        return {'status': 'error', 'message': f'Unsupported format: {fmt}. Supported: mp3, wav, aac, m4a'}

    bitrate = params.get('bitrate', '128k')
    logger.info(f"音频提取: {source_url[:80]}... format={fmt}, bitrate={bitrate}")

    return extract_audio_streaming(
        video_url=source_url,
        output_path=params.get('output_path'),
        audio_format=fmt,
        audio_bitrate=bitrate,
        sample_rate=params.get('sample_rate', 44100),
        channels=params.get('channels', 2),
        start_time=params.get('start_time'),
        duration=params.get('duration')
    )
def handle_audio_batch(params: dict) -> dict:
    """Handle a batch audio-extraction request.

    Requires a non-empty ``videos`` list; ``output_dir``, ``format``,
    ``bitrate`` and ``sample_rate`` are optional and forwarded to
    ``extract_audio_batch``.
    """
    entries = params.get('videos', [])
    if not entries:
        return {'status': 'error', 'message': 'Missing videos list'}

    fmt = params.get('format', 'mp3')
    logger.info(f"批量音频提取: {len(entries)} 个视频, 格式={fmt}")
    return extract_audio_batch(
        videos=entries,
        output_dir=params.get('output_dir', './audio_output'),
        audio_format=fmt,
        audio_bitrate=params.get('bitrate', '128k'),
        sample_rate=params.get('sample_rate', 44100)
    )
def handle_audio_info(params: dict) -> dict:
    """Return audio-stream information for ``params['video_url']``.

    Responds with an error dict when ``video_url`` is missing; otherwise
    returns whatever ``get_audio_info`` produces for that URL.
    """
    source_url = params.get('video_url')
    if source_url:
        logger.info(f"获取音频信息: {source_url[:80]}...")
        return get_audio_info(source_url)
    return {'status': 'error', 'message': 'Missing video_url'}
def handle_batch(params: dict) -> dict:
    """Run one action over a list of per-video parameter dicts.

    Args:
        params: ``videos`` is a non-empty list of parameter dicts, one per
            video. ``action`` selects the handler: ``'compress'``,
            ``'audio'``, or anything else for thumbnail extraction
            (default ``'thumbnail'``).

    Returns:
        An error dict when ``videos`` is missing or empty; otherwise a
        summary with ``total``, ``success``, ``failed`` and the per-video
        ``results``. A failing item is recorded as an error entry and does
        not abort the remaining items.
    """
    videos = params.get('videos', [])
    action = params.get('action', 'thumbnail')
    if not videos:
        return {'status': 'error', 'message': 'Missing videos list'}

    if action == 'compress':
        handler = handle_compress
    elif action == 'audio':
        handler = handle_audio
    else:
        handler = handle_thumbnail

    results = []
    for i, video in enumerate(videos):
        logger.info(f"批量处理 [{i+1}/{len(videos)}]")
        if not isinstance(video, dict):
            res = {'status': 'error', 'message': 'Each video entry must be an object'}
        else:
            try:
                res = handler(video)
            except Exception as e:
                # Keep going: one bad video must not lose the whole batch.
                res = {'status': 'error', 'message': str(e)}
        results.append(res)

    success_count = sum(1 for r in results if r.get('status') == 'success')
    return {
        'status': 'success',
        'total': len(results),
        'success': success_count,
        'failed': len(results) - success_count,
        'results': results
    }
def handle_info(params: dict) -> dict:
    """Return basic information about the video at ``params['video_url']``.

    Builds a ``RemoteVideoFrameExtractor`` (30 s timeout), takes its
    ``get_video_info()`` dict and adds ``file_size_mb``. Any exception raised
    while doing so is reported as an error dict.
    """
    target = params.get('video_url')
    if not target:
        return {'status': 'error', 'message': 'Missing video_url'}

    from frame_extractor import RemoteVideoFrameExtractor

    try:
        probe = RemoteVideoFrameExtractor(target, timeout=30)
        details = probe.get_video_info()
        size_mb = round(probe.file_size / (1024 * 1024), 2)
        details['file_size_mb'] = size_mb
    except Exception as exc:
        return {'status': 'error', 'message': str(exc)}
    return {'status': 'success', 'info': details}
# Dispatch table: action name -> handler taking the params dict.
ACTIONS = dict(
    compress=handle_compress,
    thumbnail=handle_thumbnail,
    audio=handle_audio,
    audio_batch=handle_audio_batch,
    audio_info=handle_audio_info,
    batch=handle_batch,
    info=handle_info,
)
def run_cli():
    """Command-line entry point.

    ``--input`` is a JSON file path, a JSON string, or whitespace-separated
    ``key=value`` pairs (tried in that order). The action comes from the
    parsed parameters or, failing that, from ``--action``. The handler's
    result is printed as JSON.

    Exits with status 1 when the input file cannot be read or parsed, the
    input is not a JSON object, the action is invalid, or the handler raises.
    """
    import os

    parser = argparse.ArgumentParser(description='Video Streaming Skill')
    parser.add_argument('--input', '-i', required=True, help='Input JSON string or file path')
    parser.add_argument('--action', '-a', choices=ACTIONS.keys(), help='Action to perform')
    args = parser.parse_args()

    def fail(message):
        print(json.dumps({'status': 'error', 'message': message}, ensure_ascii=False))
        sys.exit(1)

    raw = args.input
    # os.path.isfile() returns False instead of raising for names the OS
    # rejects (e.g. a long inline JSON string), unlike Path.exists() on some
    # Python versions.
    if os.path.isfile(raw):
        try:
            with open(raw, 'r', encoding='utf-8') as f:
                params = json.load(f)
        except (OSError, ValueError) as e:
            fail(f'Cannot read input file: {e}')
    else:
        try:
            params = json.loads(raw)
        except json.JSONDecodeError:
            # Fallback syntax: key=value pairs; every value stays a string.
            params = {'action': args.action} if args.action else {}
            for pair in raw.split():
                if '=' in pair:
                    k, v = pair.split('=', 1)
                    params[k] = v

    if not isinstance(params, dict):
        fail('Input must be a JSON object')

    action = params.get('action')
    if not action and args.action:
        action = args.action
    if not action or not isinstance(action, str) or action not in ACTIONS:
        print(json.dumps({'status': 'error', 'message': f'Invalid action: {action}'}))
        sys.exit(1)

    try:
        result = ACTIONS[action](params)
    except Exception as e:
        # Keep the CLI contract (JSON on stdout) even when a handler raises.
        fail(str(e))
    print(json.dumps(result, ensure_ascii=False, indent=2))
def run_http_server(host='0.0.0.0', port=8080):
    """Serve every entry of ``ACTIONS`` as ``POST /skill/<action>``.

    Also exposes ``GET /health``. Flask and flask-cors are imported lazily;
    if either is missing an error is logged and the process exits with
    status 1.

    Args:
        host: Interface to bind.
        port: TCP port to listen on.

    NOTE(review): the default binds all interfaces with CORS enabled, and the
    handlers pass caller-supplied URLs and output paths on to ffmpeg and the
    file system. Confirm this is only exposed on a trusted network.
    """
    # Only the imports belong in this try: an ImportError raised later must
    # not be misreported as "Flask not installed".
    try:
        from flask import Flask, request, jsonify
        from flask_cors import CORS
    except ImportError:
        logger.error("Flask not installed. Run: pip install flask flask-cors")
        sys.exit(1)

    app = Flask(__name__)
    CORS(app)

    @app.route('/health', methods=['GET'])
    def health():
        return jsonify({'status': 'ok', 'skill': 'video-streaming-toolkit'})

    def make_view(handler):
        """Wrap a params->dict handler as a JSON-in / JSON-out view."""
        def view():
            # silent=True yields None for a missing or malformed body instead
            # of an HTML error page.
            data = request.get_json(silent=True)
            if not data:
                return jsonify({'status': 'error', 'message': 'No JSON body'}), 400
            if not isinstance(data, dict):
                return jsonify({'status': 'error', 'message': 'JSON body must be an object'}), 400
            try:
                return jsonify(handler(data))
            except Exception as e:
                logger.exception("request failed")
                return jsonify({'status': 'error', 'message': str(e)}), 500
        return view

    # Routes and endpoint names match the seven hand-written routes this
    # loop replaces (compress, thumbnail, audio, audio_batch, audio_info,
    # batch, info).
    for name, handler in ACTIONS.items():
        app.add_url_rule(
            f'/skill/{name}',
            endpoint=name,
            view_func=make_view(handler),
            methods=['POST']
        )

    logger.info(f"Starting HTTP server on {host}:{port}")
    app.run(host=host, port=port, threaded=True)
if __name__ == '__main__':
    # `--serve` / `-s` anywhere on the command line selects HTTP mode;
    # otherwise the arguments are handled by the CLI parser.
    serve_requested = any(flag in sys.argv for flag in ('--serve', '-s'))
    entry_point = run_http_server if serve_requested else run_cli
    entry_point()
FILE:requirements.txt
requests>=2.28.0
opencv-python>=4.8.0
numpy>=1.24.0
aiohttp>=3.8.0
FILE:audio_extractor.py
"""
流式音频提取 - 从远程视频直接提取音频,无需下载完整视频
支持格式: MP3, WAV, AAC, M4A
"""
import subprocess
import requests
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
def get_remote_file_size(url: str) -> int:
    """Return the size of a remote file in bytes without downloading it.

    A HEAD request is issued and ``Content-Length`` is read. Redirects are
    followed explicitly because ``requests.head`` does not follow them by
    default, and the header is only trusted on a successful status.

    Args:
        url: Address of the remote file.

    Returns:
        The size in bytes, or 0 when it cannot be determined (request failure,
        error status, missing or malformed header).
    """
    try:
        response = requests.head(url, timeout=10, allow_redirects=True)
        if response.ok and 'content-length' in response.headers:
            return int(response.headers['content-length'])
    except Exception as e:
        # Best effort: 0 means "size unknown".
        logger.warning(f"获取文件大小失败: {e}")
    return 0
def extract_audio_streaming(
    video_url: str,
    output_path: str = None,
    audio_format: str = 'mp3',  # mp3, wav, aac, m4a
    audio_bitrate: str = '128k',  # e.g. 128k, 192k, 320k
    sample_rate: int = 44100,  # e.g. 44100, 48000
    channels: int = 2,  # 1 = mono, 2 = stereo
    start_time: float = None,  # clip start in seconds
    duration: float = None,  # clip length in seconds
) -> dict:
    """Extract the audio track of a remote video with ffmpeg reading the URL.

    Args:
        video_url: URL of the source video.
        output_path: Destination file. Defaults to ``<name>.<audio_format>``
            with ``<name>`` taken from the URL path.
        audio_format: One of ``mp3``, ``wav``, ``aac``, ``m4a``.
        audio_bitrate: Value for ffmpeg ``-b:a`` (not used for ``wav``).
        sample_rate: Value for ffmpeg ``-ar``.
        channels: Value for ffmpeg ``-ac``.
        start_time: Optional clip start, passed to ffmpeg ``-ss``.
        duration: Optional clip length, passed to ffmpeg ``-t``.

    Returns:
        On success a dict with ``status='success'``, ``output_path``,
        ``format``, ``size_mb``, ``duration_sec``, ``bitrate``,
        ``sample_rate``, ``channels`` and ``streaming``. ``duration_sec`` is
        the length of the extracted clip derived from the source duration
        ffmpeg reports (``None`` if ffmpeg printed no duration). On failure a
        dict with ``status='error'`` and ``message``.
    """
    import re
    from collections import deque
    from urllib.parse import urlparse

    # Encoder options per output format.
    codec_args = {
        'mp3': ['-c:a', 'libmp3lame', '-b:a', audio_bitrate],
        'wav': ['-c:a', 'pcm_s16le'],  # uncompressed PCM, no bitrate
        'aac': ['-c:a', 'aac', '-b:a', audio_bitrate],
        'm4a': ['-c:a', 'aac', '-b:a', audio_bitrate, '-movflags', '+faststart'],
    }
    if not isinstance(audio_format, str) or audio_format not in codec_args:
        return {'status': 'error', 'message': f'Unsupported format: {audio_format}'}

    if output_path is None:
        video_name = Path(urlparse(video_url).path).stem or 'audio'
        output_path = f"{video_name}.{audio_format}"

    cmd = ['ffmpeg']
    if start_time is not None:
        # -ss before -i seeks in the input, so ffmpeg does not have to read
        # and decode everything that precedes the clip.
        cmd.extend(['-ss', str(start_time)])
    cmd.extend(['-i', video_url])
    if duration is not None:
        cmd.extend(['-t', str(duration)])
    # -vn: never map the video stream. Without it an .m4a output can end up
    # containing a re-encoded video track.
    cmd.append('-vn')
    cmd.extend(codec_args[audio_format])
    cmd.extend(['-ar', str(sample_rate), '-ac', str(channels)])
    cmd.extend(['-y', output_path])

    logger.info(f"开始流式音频提取: {video_url[:80]}...")
    logger.info(f"输出格式: {audio_format}, 比特率: {audio_bitrate}, 采样率: {sample_rate}")
    logger.info(f"命令: {' '.join(cmd[:5])}...")

    duration_pattern = re.compile(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d{2})')
    process = None
    try:
        process = subprocess.Popen(
            cmd,
            stdin=subprocess.DEVNULL,   # ffmpeg must not consume the caller's stdin
            stdout=subprocess.DEVNULL,  # nothing reads stdout, so never pipe it
            stderr=subprocess.PIPE,
            text=True,
            errors='replace',           # stream metadata is not guaranteed to be UTF-8
            bufsize=1
        )
        last_progress = ""
        duration_sec = 0
        stderr_tail = deque(maxlen=5)   # last diagnostics, reported on failure
        for line in process.stderr:
            stripped = line.strip()
            if not stripped:
                continue
            if 'Duration:' in line and duration_sec == 0:
                # Total duration of the source as printed by ffmpeg.
                match = duration_pattern.search(line)
                if match:
                    h, m, s = match.groups()
                    duration_sec = int(h) * 3600 + int(m) * 60 + float(s)
                    logger.info(f"视频总时长: {duration_sec:.2f}秒")
            if 'size=' in line and 'time=' in line:
                if stripped != last_progress:
                    logger.info(f"进度: {stripped}")
                    last_progress = stripped
            else:
                stderr_tail.append(stripped)

        return_code = process.wait()
        if return_code != 0:
            message = f'ffmpeg 错误,返回码: {return_code}'
            if stderr_tail:
                message += f" ({' | '.join(stderr_tail)})"
            return {'status': 'error', 'message': message}

        if not Path(output_path).exists():
            return {'status': 'error', 'message': '输出文件未生成'}
        file_size = Path(output_path).stat().st_size

        # Length of the extracted clip, not of the whole source.
        audio_duration = None
        if duration_sec > 0:
            try:
                remaining = max(duration_sec - float(start_time or 0), 0.0)
                clip = remaining if duration is None else min(remaining, float(duration))
                audio_duration = round(clip, 2)
            except (TypeError, ValueError):
                # start/duration given in a non-numeric ffmpeg syntax such as
                # "00:01:30": fall back to the source duration.
                audio_duration = duration_sec

        logger.info(f"音频提取完成: {file_size/(1024*1024):.2f}MB")
        return {
            'status': 'success',
            'output_path': output_path,
            'format': audio_format,
            'size_mb': round(file_size / (1024 * 1024), 2),
            'duration_sec': audio_duration,
            'bitrate': audio_bitrate,
            'sample_rate': sample_rate,
            'channels': channels,
            'streaming': True
        }
    except Exception as e:
        # Do not leave an orphaned ffmpeg running if anything above failed.
        if process is not None and process.poll() is None:
            process.kill()
            process.wait()
        return {'status': 'error', 'message': str(e)}
def extract_audio_batch(
    videos: list,
    output_dir: str = './audio_output',
    audio_format: str = 'mp3',
    audio_bitrate: str = '128k',
    sample_rate: int = 44100
) -> dict:
    """Extract audio from several videos into ``output_dir``.

    Args:
        videos: List of dicts with ``url`` (required) and ``name``
            (optional, defaults to ``audio_<index>``).
        output_dir: Directory for the output files; created if missing.
        audio_format: Forwarded to ``extract_audio_streaming``.
        audio_bitrate: Forwarded to ``extract_audio_streaming``.
        sample_rate: Forwarded to ``extract_audio_streaming``.

    Returns:
        A summary dict with ``total``, ``success``, ``failed`` and the
        per-video ``results`` (each tagged with its ``name``).
    """
    import os

    os.makedirs(output_dir, exist_ok=True)
    results = []
    success_count = 0
    for i, video in enumerate(videos):
        default_name = f'audio_{i+1}'
        if not isinstance(video, dict):
            results.append({'name': default_name, 'status': 'error', 'message': 'Invalid video entry'})
            continue
        url = video.get('url')
        name = video.get('name', default_name)
        if not url:
            results.append({'name': name, 'status': 'error', 'message': 'Missing url'})
            continue

        # ``name`` is caller-supplied: keep only its last path component so
        # "../x" or "/abs/x" cannot place the file outside output_dir.
        safe_name = os.path.basename(str(name).replace('\\', '/')) or default_name
        output_path = os.path.join(output_dir, f"{safe_name}.{audio_format}")

        logger.info(f"批量处理 [{i+1}/{len(videos)}]: {name}")
        result = extract_audio_streaming(
            video_url=url,
            output_path=output_path,
            audio_format=audio_format,
            audio_bitrate=audio_bitrate,
            sample_rate=sample_rate
        )
        result['name'] = name
        results.append(result)
        if result.get('status') == 'success':
            success_count += 1

    return {
        'status': 'success',
        'total': len(videos),
        'success': success_count,
        'failed': len(videos) - success_count,
        'results': results
    }
def get_audio_info(video_url: str) -> dict:
    """Describe the first audio stream of a video using ffprobe.

    Args:
        video_url: URL of the video; passed to ffprobe as its input.

    Returns:
        ``{'status': 'success', 'has_audio': True, 'audio_codec',
        'audio_bitrate', 'sample_rate', 'channels', 'language'}`` for the
        first stream whose ``codec_type`` is ``audio``;
        ``{'status': 'success', 'has_audio': False, ...}`` when there is
        none; ``{'status': 'error', 'message': ...}`` when ffprobe fails,
        times out (30 s) or its output cannot be parsed.
    """
    import json

    # "-v error" (instead of "quiet") keeps stdout as pure JSON while still
    # letting ffprobe explain a failure on stderr.
    cmd = ['ffprobe', '-v', 'error', '-print_format', 'json', '-show_streams', video_url]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            detail = (result.stderr or '').strip()
            message = f'ffprobe failed: {detail}' if detail else 'ffprobe failed'
            return {'status': 'error', 'message': message}

        data = json.loads(result.stdout)
        for stream in data.get('streams', []):
            if stream.get('codec_type') != 'audio':
                continue
            return {
                'status': 'success',
                'has_audio': True,
                'audio_codec': stream.get('codec_name', 'unknown'),
                'audio_bitrate': stream.get('bit_rate', 'unknown'),
                'sample_rate': int(stream.get('sample_rate') or 0),
                'channels': stream.get('channels', 0),
                'language': stream.get('tags', {}).get('language', 'unknown')
            }
        return {'status': 'success', 'has_audio': False, 'message': 'No audio stream found'}
    except Exception as e:
        return {'status': 'error', 'message': str(e)}
FILE:utils.py
import os
import subprocess
import tempfile
from pathlib import Path
def get_file_size_mb(path: str) -> float:
    """Return the size of the file at *path* in MB (bytes / 1024 / 1024)."""
    size_in_bytes = Path(path).stat().st_size
    bytes_per_mb = 1024 * 1024
    return size_in_bytes / bytes_per_mb
def download_video_to_temp(url: str, timeout: int = 300) -> str:
    """Download *url* into a new temporary ``.mp4`` file and return its path.

    Args:
        url: Address of the video to download.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        Path of the temporary file; the caller is responsible for deleting
        it (see ``cleanup_temp_file``).

    Raises:
        requests.RequestException: On connection errors or an HTTP error
            status. The temporary file is removed before re-raising, so an
            error page is never left on disk looking like a video.
    """
    import requests

    temp_file = tempfile.NamedTemporaryFile(suffix='.mp4', delete=False)
    temp_path = temp_file.name
    temp_file.close()
    try:
        # The context manager returns the connection to the pool even when
        # the body is only partially read.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(temp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
    except Exception:
        if os.path.exists(temp_path):
            os.unlink(temp_path)
        raise
    return temp_path
def cleanup_temp_file(path: str):
    """Delete the file at *path* if it exists; do nothing for an empty path."""
    if not path:
        return
    if not os.path.exists(path):
        return
    os.unlink(path)
FILE:frame_extractor.py
"""
远程视频帧提取服务
支持从远程 URL 按时间/帧号提取视频帧,无需下载完整文件
"""
import requests
import struct
import logging
import os
import tempfile
from typing import Optional, Dict, List
import cv2
import numpy as np
logger = logging.getLogger(__name__)
class RemoteVideoFrameExtractor:
"""Remote video frame extractor - parses the MP4 box structure so that only part of the file is downloaded."""
# MP4 box type constants (four-character codes compared against box headers)
BOX_TYPE_MOOV = b'moov'
BOX_TYPE_TRAK = b'trak'
BOX_TYPE_MDIA = b'mdia'
BOX_TYPE_MINF = b'minf'
BOX_TYPE_STBL = b'stbl'
BOX_TYPE_STSD = b'stsd'
BOX_TYPE_STSS = b'stss'
BOX_TYPE_STCO = b'stco'
BOX_TYPE_CO64 = b'co64'
BOX_TYPE_STSZ = b'stsz'
BOX_TYPE_STSC = b'stsc'
def __init__(self, video_url: str, timeout: int = 30):
    """Create the extractor and immediately load the video's metadata.

    Args:
        video_url: URL of the remote video.
        timeout: Timeout in seconds used for every HTTP request.

    Raises:
        Exception: Whatever ``_init_video_info`` raises when the size lookup
            or moov parsing fails.
    """
    self.video_url = video_url
    self.timeout = timeout

    # One session for all requests, sent with a browser-like User-Agent.
    self.session = requests.Session()
    self.session.headers.update({
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
    })

    # Values filled in while parsing the moov box.
    self.file_size = 0
    self.width = self.height = 0
    self.codec_type = None
    self.timescale = self.duration = 0

    # Sample tables (each one is its own list object).
    self.stts, self.stss, self.stco = [], [], []
    self.stsz, self.stsc = [], []

    # Decoder configuration taken from avcC / hvcC.
    self.nal_length_size = 4
    self.vps_sps_pps_nalus = []

    self._init_video_info()
def _init_video_info(self):
try:
self.file_size = self._get_file_size()
self._find_and_parse_moov()
except Exception as e:
logger.error(f"视频信息解析失败: {e}")
raise
def _get_file_size(self) -> int:
response = self.session.head(self.video_url, timeout=self.timeout)
if 'content-length' in response.headers:
return int(response.headers['content-length'])
response = self.session.get(self.video_url, stream=True, timeout=self.timeout)
return int(response.headers.get('content-length', 0))
def _download_range(self, start: int, end: int) -> bytes:
headers = {'Range': f'bytes={start}-{end}'}
response = self.session.get(self.video_url, headers=headers, timeout=self.timeout)
if response.status_code in [200, 206]:
return response.content
raise Exception(f"HTTP Range 请求失败: {response.status_code}")
def _find_and_parse_moov(self):
pos = 0
probe_size = min(64 * 1024, self.file_size)
probe_data = self._download_range(0, probe_size - 1)
while pos < self.file_size:
if pos + 8 > len(probe_data):
header_bytes = self._download_range(pos, min(pos + 7, self.file_size - 1))
if len(header_bytes) < 8:
break
else:
header_bytes = probe_data[pos:pos + 8]
box_size = struct.unpack('>I', header_bytes[0:4])[0]
box_type = header_bytes[4:8]
if box_size == 1:
if pos + 16 > len(probe_data):
ext_header = self._download_range(pos, min(pos + 15, self.file_size - 1))
else:
ext_header = probe_data[pos:pos + 16]
if len(ext_header) < 16:
break
box_size = struct.unpack('>Q', ext_header[8:16])[0]
if box_size == 0:
box_size = self.file_size - pos
if box_size < 8:
break
if box_type == self.BOX_TYPE_MOOV:
moov_data = self._download_range(pos, pos + box_size - 1)
self._parse_moov(moov_data)
return
pos += box_size
tail_size = min(5 * 1024 * 1024, self.file_size)
tail_data = self._download_range(self.file_size - tail_size, self.file_size - 1)
tail_base_offset = self.file_size - tail_size
scan_pos = 0
while scan_pos < len(tail_data) - 8:
box_size = struct.unpack('>I', tail_data[scan_pos:scan_pos + 4])[0]
box_type = tail_data[scan_pos + 4:scan_pos + 8]
if box_size == 1 and scan_pos + 16 <= len(tail_data):
box_size = struct.unpack('>Q', tail_data[scan_pos + 8:scan_pos + 16])[0]
if box_size < 8:
scan_pos += 1
continue
if box_type == self.BOX_TYPE_MOOV:
actual_offset = tail_base_offset + scan_pos
moov_data = self._download_range(actual_offset, actual_offset + box_size - 1)
self._parse_moov(moov_data)
return
scan_pos += box_size
raise Exception("未找到 moov box")
def _parse_moov(self, moov_data: bytes):
pos = 8
while pos < len(moov_data) - 8:
box_size = struct.unpack('>I', moov_data[pos:pos+4])[0]
if moov_data[pos+4:pos+8] == self.BOX_TYPE_TRAK:
self._parse_trak(moov_data[pos:pos+box_size])
pos += box_size if box_size > 0 else 1
def _parse_trak(self, trak_data: bytes):
is_video = False
mdia_offset, mdia_size = 0, 0
pos = 8
while pos < len(trak_data) - 8:
box_size = struct.unpack('>I', trak_data[pos:pos+4])[0]
if trak_data[pos+4:pos+8] == self.BOX_TYPE_MDIA:
mdia_offset, mdia_size = pos, box_size
m_pos = pos + 8
while m_pos < pos + box_size - 8:
m_size = struct.unpack('>I', trak_data[m_pos:m_pos+4])[0]
if trak_data[m_pos+4:m_pos+8] == b'hdlr':
if trak_data[m_pos+16:m_pos+20] == b'vide':
is_video = True
break
m_pos += m_size if m_size > 0 else 1
pos += box_size if box_size > 0 else 1
if is_video and mdia_size > 0:
self._parse_mdia(trak_data[mdia_offset:mdia_offset+mdia_size])
def _parse_mdia(self, mdia_data: bytes):
pos = 8
while pos < len(mdia_data) - 8:
box_size = struct.unpack('>I', mdia_data[pos:pos+4])[0]
box_type = mdia_data[pos+4:pos+8]
if box_type == b'mdhd':
version = mdia_data[pos+8]
if version == 0:
self.timescale = struct.unpack('>I', mdia_data[pos+20:pos+24])[0]
self.duration = struct.unpack('>I', mdia_data[pos+24:pos+28])[0]
else:
self.timescale = struct.unpack('>I', mdia_data[pos+28:pos+32])[0]
self.duration = struct.unpack('>Q', mdia_data[pos+32:pos+40])[0]
elif box_type == self.BOX_TYPE_MINF:
self._parse_minf(mdia_data[pos:pos+box_size])
pos += box_size if box_size > 0 else 1
def _parse_minf(self, minf_data: bytes):
pos = 8
while pos < len(minf_data) - 8:
box_size = struct.unpack('>I', minf_data[pos:pos+4])[0]
if minf_data[pos+4:pos+8] == self.BOX_TYPE_STBL:
self._parse_stbl(minf_data[pos:pos+box_size])
pos += box_size if box_size > 0 else 1
def _parse_stbl(self, stbl_data: bytes):
pos = 8
while pos < len(stbl_data) - 8:
box_size = struct.unpack('>I', stbl_data[pos:pos+4])[0]
box_type = stbl_data[pos+4:pos+8]
if box_type == self.BOX_TYPE_STSD:
self._parse_stsd(stbl_data[pos:pos+box_size])
elif box_type == self.BOX_TYPE_STSS:
entry_count = struct.unpack('>I', stbl_data[pos+12:pos+16])[0]
for i in range(entry_count):
self.stss.append(struct.unpack('>I', stbl_data[pos+16+i*4:pos+20+i*4])[0])
elif box_type == self.BOX_TYPE_STCO:
entry_count = struct.unpack('>I', stbl_data[pos+12:pos+16])[0]
for i in range(entry_count):
self.stco.append(struct.unpack('>I', stbl_data[pos+16+i*4:pos+20+i*4])[0])
elif box_type == self.BOX_TYPE_CO64:
entry_count = struct.unpack('>I', stbl_data[pos+12:pos+16])[0]
for i in range(entry_count):
self.stco.append(struct.unpack('>Q', stbl_data[pos+16+i*8:pos+24+i*8])[0])
elif box_type == self.BOX_TYPE_STSZ:
sample_size = struct.unpack('>I', stbl_data[pos+12:pos+16])[0]
sample_count = struct.unpack('>I', stbl_data[pos+16:pos+20])[0]
if sample_size == 0:
for i in range(sample_count):
self.stsz.append(struct.unpack('>I', stbl_data[pos+20+i*4:pos+24+i*4])[0])
else:
self.stsz = [sample_size] * sample_count
elif box_type == self.BOX_TYPE_STSC:
entry_count = struct.unpack('>I', stbl_data[pos+12:pos+16])[0]
for i in range(entry_count):
o = pos + 16 + i * 12
self.stsc.append({
'first_chunk': struct.unpack('>I', stbl_data[o:o+4])[0],
'samples_per_chunk': struct.unpack('>I', stbl_data[o+4:o+8])[0]
})
elif box_type == b'stts':
entry_count = struct.unpack('>I', stbl_data[pos+12:pos+16])[0]
for i in range(entry_count):
count = struct.unpack('>I', stbl_data[pos+16+i*8:pos+20+i*8])[0]
delta = struct.unpack('>I', stbl_data[pos+20+i*8:pos+24+i*8])[0]
self.stts.append({'count': count, 'delta': delta})
pos += box_size if box_size > 0 else 1
def _parse_stsd(self, stsd_data: bytes):
pos = 16
while pos < len(stsd_data) - 8:
box_size = struct.unpack('>I', stsd_data[pos:pos+4])[0]
box_type = stsd_data[pos+4:pos+8]
if box_type in [b'avc1', b'hvc1', b'hev1']:
self.codec_type = 'h264' if box_type == b'avc1' else 'h265'
self.width = struct.unpack('>H', stsd_data[pos+32:pos+34])[0]
self.height = struct.unpack('>H', stsd_data[pos+34:pos+36])[0]
v_pos = pos + 86
while v_pos < pos + box_size - 8:
v_size = struct.unpack('>I', stsd_data[v_pos:v_pos+4])[0]
v_type = stsd_data[v_pos+4:v_pos+8]
config_data = stsd_data[v_pos+8:v_pos+v_size]
if v_type == b'avcC':
self._parse_avcc(config_data)
elif v_type == b'hvcC':
self._parse_hvcc(config_data)
v_pos += v_size if v_size > 0 else 1
pos += box_size if box_size > 0 else 1
def _parse_avcc(self, data: bytes):
self.nal_length_size = (data[4] & 0x03) + 1
pos = 6
start_code = b'\x00\x00\x00\x01'
num_sps = data[5] & 0x1F
for _ in range(num_sps):
sps_len = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2
self.vps_sps_pps_nalus.append(start_code + data[pos:pos+sps_len])
pos += sps_len
num_pps = data[pos]
pos += 1
for _ in range(num_pps):
pps_len = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2
self.vps_sps_pps_nalus.append(start_code + data[pos:pos+pps_len])
pos += pps_len
def _parse_hvcc(self, data: bytes):
self.nal_length_size = (data[21] & 0x03) + 1
num_arrays = data[22]
pos = 23
start_code = b'\x00\x00\x00\x01'
for _ in range(num_arrays):
pos += 1
num_nalus = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2
for _ in range(num_nalus):
nal_len = struct.unpack('>H', data[pos:pos+2])[0]
pos += 2
self.vps_sps_pps_nalus.append(start_code + data[pos:pos+nal_len])
pos += nal_len
def get_sample_position(self, sample_number: int) -> Optional[Dict]:
    """Return the byte offset and size of a 1-based sample number.

    The chunk holding the sample is found from the ``stsc`` rules, the chunk
    offset comes from ``stco``, and the sizes of the preceding samples in
    that chunk (from ``stsz``) are added to it.

    Args:
        sample_number: 1-based sample index.

    Returns:
        ``{'offset': int, 'size': int}``, or ``None`` when the sample number
        is out of range or the chunk is missing from ``stco``.
    """
    if not self.stsz:
        return None
    if sample_number < 1 or sample_number > len(self.stsz):
        return None

    chunk_no = 1          # 1-based chunk containing the sample
    chunk_first = 1       # first sample number stored in that chunk
    counted = 0           # samples covered by the rules already passed
    rule_total = len(self.stsc)
    for idx, rule in enumerate(self.stsc):
        per_chunk = rule['samples_per_chunk']
        if idx + 1 < rule_total:
            rule_end = self.stsc[idx + 1]['first_chunk']
        else:
            rule_end = len(self.stco) + 1  # last rule runs to the final chunk
        covered = (rule_end - rule['first_chunk']) * per_chunk
        if counted + covered >= sample_number:
            chunks_in = (sample_number - counted - 1) // per_chunk
            chunk_no = rule['first_chunk'] + chunks_in
            chunk_first = counted + chunks_in * per_chunk + 1
            break
        counted += covered

    if chunk_no > len(self.stco):
        return None

    # Skip over the samples that precede the target inside its chunk.
    preceding = sum(self.stsz[chunk_first - 1:sample_number - 1])
    return {
        'offset': self.stco[chunk_no - 1] + preceding,
        'size': self.stsz[sample_number - 1],
    }
def _get_frame_number_by_time(self, seconds: float) -> int:
if not self.stts or not self.timescale:
return max(1, int(seconds * 30.0))
target_ticks = int(seconds * self.timescale)
current_ticks = 0
current_sample = 1
for entry in self.stts:
entry_ticks = entry['count'] * entry['delta']
if current_ticks + entry_ticks > target_ticks:
ticks_into_entry = target_ticks - current_ticks
samples_into_entry = ticks_into_entry // entry['delta']
return current_sample + samples_into_entry
current_ticks += entry_ticks
current_sample += entry['count']
return current_sample - 1 if current_sample > 1 else 1
def extract_frame_by_time(self, seconds: float) -> Optional[np.ndarray]:
    """Extract the frame shown at *seconds*.

    The timestamp is converted to a sample number, clamped to the range
    1..len(stsz) (only the lower bound when ``stsz`` is empty), and passed to
    ``extract_frame``.
    """
    wanted = self._get_frame_number_by_time(seconds)
    if self.stsz:
        wanted = min(wanted, len(self.stsz))
    if wanted < 1:
        wanted = 1
    return self.extract_frame(wanted)
def extract_frame(self, frame_number: int) -> Optional[np.ndarray]:
    """Download and decode a single frame.

    Decoding starts at the closest sync sample (from ``stss``) at or before
    the requested frame. One byte range covering every sample from there up
    to the target is downloaded, converted to an Annex B stream prefixed
    with the stored parameter sets, and decoded.

    Args:
        frame_number: 1-based sample number of the wanted frame.

    Returns:
        The decoded frame from ``_decode_video_stream``, or ``None`` when the
        frame number is invalid, a sample position is unknown, or decoding
        fails.
    """
    if frame_number < 1:
        # Sample numbers are 1-based. Without this guard the sample range
        # below is empty and the code would request "bytes=inf-0".
        return None

    keyframe = frame_number
    if self.stss:
        keyframes = [kf for kf in self.stss if kf <= frame_number]
        keyframe = max(keyframes) if keyframes else self.stss[0]

    sample_infos = []
    min_offset = float('inf')
    max_offset = 0
    for f in range(keyframe, frame_number + 1):
        info = self.get_sample_position(f)
        if not info:
            logger.warning(f"无法获取帧 {f} 的位置信息")
            return None
        sample_infos.append(info)
        min_offset = min(min_offset, info['offset'])
        max_offset = max(max_offset, info['offset'] + info['size'] - 1)

    if not sample_infos:
        # The first sync sample lies after the requested frame, so there is
        # nothing decodable to download.
        logger.warning(f"无法获取帧 {frame_number} 的位置信息")
        return None

    raw_data = self._download_range(min_offset, max_offset)

    annexb_stream = bytearray()
    for nalu in self.vps_sps_pps_nalus:
        annexb_stream.extend(nalu)
    for info in sample_infos:
        local_offset = info['offset'] - min_offset
        sample_data = raw_data[local_offset:local_offset + info['size']]
        annexb_stream.extend(self._convert_sample_to_annexb(sample_data))

    # Number of frames to read so that the last one read is the target.
    frames_to_step = frame_number - keyframe + 1
    return self._decode_video_stream(bytes(annexb_stream), frames_to_step)
def _convert_sample_to_annexb(self, sample_data: bytes) -> bytes:
result = bytearray()
pos = 0
start_code = b'\x00\x00\x00\x01'
while pos < len(sample_data):
if pos + self.nal_length_size > len(sample_data):
break
if self.nal_length_size == 4:
nal_len = struct.unpack('>I', sample_data[pos:pos+4])[0]
elif self.nal_length_size == 2:
nal_len = struct.unpack('>H', sample_data[pos:pos+2])[0]
else:
nal_len = sample_data[pos]
pos += self.nal_length_size
if pos + nal_len > len(sample_data):
break
result.extend(start_code)
result.extend(sample_data[pos:pos+nal_len])
pos += nal_len
return bytes(result)
def _decode_video_stream(self, video_data: bytes, target_read_count: int) -> Optional[np.ndarray]:
if not video_data:
return None
ext = '.h265' if self.codec_type == 'h265' else '.h264'
temp_path = None
target_frame_img = None
try:
with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as f:
f.write(video_data)
temp_path = f.name
cap = cv2.VideoCapture(temp_path)
for i in range(target_read_count):
ret, frame = cap.read()
if not ret:
break
target_frame_img = frame
cap.release()
if target_frame_img is not None:
return cv2.cvtColor(target_frame_img, cv2.COLOR_BGR2RGB)
except Exception as e:
logger.error(f"视频流解码失败: {e}")
return None
finally:
if temp_path and os.path.exists(temp_path):
os.unlink(temp_path)
return None
def get_video_info(self) -> Dict:
    """Summarise the parsed video track.

    Returns:
        A dict with the keys ``width``, ``height``, ``codec``, ``fps``,
        ``duration`` (seconds) and ``total_frames``. ``fps`` is the average
        frame rate, ``total_frames / duration``, rounded to three decimals;
        it falls back to 30 when the frame count or the duration is not
        available. ``duration`` is 0 when ``self.timescale`` is falsy.
    """
    total_frames = len(self.stsz) if self.stsz else 0
    duration_sec = self.duration / self.timescale if self.timescale else 0

    # The timescale is the number of ticks per second, not a frame rate, so
    # the rate is derived from the sample count and the duration instead.
    if total_frames and duration_sec > 0:
        fps = round(total_frames / duration_sec, 3)
    else:
        fps = 30

    return {
        'width': self.width,
        'height': self.height,
        'codec': self.codec_type,
        'fps': fps,
        'duration': duration_sec,
        'total_frames': total_frames,
    }
def extract_thumbnail_from_url(
    video_url: str,
    time_seconds: float = None,
    frame_number: int = None,
    save_path: str = None,
    resize_width: int = None,
    quality: int = 85
) -> dict:
    """
    Extract a cover image from a remote video.

    The frame is chosen by ``frame_number`` when given, otherwise by
    ``time_seconds`` (0 when that is also omitted). When ``save_path`` is
    set the frame is optionally resized to ``resize_width`` (keeping the
    aspect ratio) and written with ``cv2.imwrite``.

    Args:
        video_url: URL of the source video.
        time_seconds: Playback position in seconds; ignored when
            ``frame_number`` is given.
        frame_number: Frame number to extract; takes precedence.
        save_path: Destination file path; nothing is written when falsy.
        resize_width: Target width in pixels; 0 or None keeps the size.
        quality: JPEG quality, clamped to the range 0-100.

    Returns:
        On success ``{'status': 'success', 'video_info': dict,
        'shape': tuple}`` plus ``'saved_path'`` when a file was written.
        On failure ``{'status': 'error', 'message': str}``.
    """
    if save_path and resize_width is not None and resize_width < 0:
        # Reject bad input before any network traffic is spent on it.
        return {'status': 'error', 'message': 'resize_width must be a positive integer'}

    extractor = RemoteVideoFrameExtractor(video_url, timeout=60)
    if frame_number is not None:
        frame = extractor.extract_frame(frame_number)
        used_method = f'frame_{frame_number}'
    else:
        ts = time_seconds if time_seconds is not None else 0
        frame = extractor.extract_frame_by_time(ts)
        used_method = f'time_{ts}s'
    if frame is None:
        return {'status': 'error', 'message': 'Failed to extract frame'}

    video_info = extractor.get_video_info()
    video_info['extract_method'] = used_method
    result = {
        'status': 'success',
        'video_info': video_info,
        'shape': frame.shape
    }

    if save_path:
        os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
        frame_to_save = frame
        if resize_width:
            h, w = frame.shape[:2]
            scale = resize_width / w
            # Keep at least one row; a zero height makes cv2.resize fail.
            new_h = max(1, int(h * scale))
            frame_to_save = cv2.resize(frame, (resize_width, new_h))
        bgr_frame = cv2.cvtColor(frame_to_save, cv2.COLOR_RGB2BGR)
        jpeg_quality = max(0, min(100, int(quality)))
        written = cv2.imwrite(
            save_path, bgr_frame, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality]
        )
        if not written:
            # imwrite signals failure through its return value; do not
            # report a path that was never written.
            return {'status': 'error', 'message': f'Failed to write image to {save_path}'}
        result['saved_path'] = save_path
    return result
FILE:skill.json
{
"name": "ym-mediatoolkit",
"version": "1.0.0",
"description": "视频处理工具集:1) 视频压缩 2) 封面提取 3) 音频提取(MP3/WAV/AAC/M4A)",
"author": "your_name",
"entrypoint": "python run.py --input {input_json}",
"http_port": 8080,
"actions": [
{
"name": "compress",
"description": "流式压缩视频,保持清晰度",
"input_schema": {
"type": "object",
"required": ["video_url"],
"properties": {
"video_url": {"type": "string"},
"target_ratio": {"type": "number", "default": 0.1},
"adaptive": {"type": "boolean", "default": true},
"crf": {"type": "integer", "default": 24},
"preset": {"type": "string", "default": "veryfast"}
}
}
},
{
"name": "thumbnail",
"description": "从视频任意时间点或帧号提取封面",
"input_schema": {
"type": "object",
"required": ["video_url"],
"properties": {
"video_url": {"type": "string"},
"time_seconds": {"type": "number"},
"frame_number": {"type": "integer"},
"save_path": {"type": "string"},
"resize_width": {"type": "integer"},
"quality": {"type": "integer", "default": 85}
}
}
},
{
"name": "audio",
"description": "流式提取音频,转成 MP3 / WAV / AAC / M4A 格式",
"input_schema": {
"type": "object",
"required": ["video_url"],
"properties": {
"video_url": {"type": "string"},
"format": {"type": "string", "enum": ["mp3", "wav", "aac", "m4a"], "default": "mp3"},
"bitrate": {"type": "string", "default": "128k", "description": "比特率: 128k, 192k, 320k"},
"sample_rate": {"type": "integer", "default": 44100, "description": "采样率: 44100, 48000"},
"channels": {"type": "integer", "default": 2, "description": "声道: 1=单声道, 2=立体声"},
"start_time": {"type": "number", "description": "开始时间(秒)"},
"duration": {"type": "number", "description": "持续时间(秒)"},
"output_path": {"type": "string", "description": "输出路径"}
}
}
},
{
"name": "audio_batch",
"description": "批量提取多个视频的音频",
"input_schema": {
"type": "object",
"required": ["videos"],
"properties": {
"videos": {"type": "array", "description": "视频列表 [{'url': '...', 'name': '...'}]"},
"output_dir": {"type": "string", "default": "./audio_output"},
"format": {"type": "string", "default": "mp3"},
"bitrate": {"type": "string", "default": "128k"},
"sample_rate": {"type": "integer", "default": 44100}
}
}
},
{
"name": "audio_info",
"description": "获取视频的音频流信息",
"input_schema": {
"type": "object",
"required": ["video_url"],
"properties": {
"video_url": {"type": "string"}
}
}
},
{
"name": "info",
"description": "获取完整视频信息",
"input_schema": {
"type": "object",
"required": ["video_url"],
"properties": {
"video_url": {"type": "string"}
}
}
}
]
}