@clawhub-songguocola-ef14bee2fb
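Intelligent audio denoising service built on AI algorithms from Alibaba's Tongyi Lab. It removes background noise in one step and restores clean vocals. Supports mainstream formats such as wav, mp3, and aac; suited to recording cleanup, speech-recognition preprocessing, podcast post-production, and meeting-recording enhancement. Use it when the user needs audio denoising, noise removal, audio preprocessing, or better recording quality.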
---
name: fun-denoise
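description: Intelligent audio denoising service built on AI algorithms from Alibaba's Tongyi Lab. It removes background noise in one step and restores clean vocals. Supports mainstream formats such as wav, mp3, and aac; suited to recording cleanup, speech-recognition preprocessing, podcast post-production, and meeting-recording enhancement. Use it when the user needs audio denoising, noise removal, audio preprocessing, or better recording quality.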
---
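# FunAudioDenoise: Intelligent Audio Denoising
## Service Overview
FunAudioDenoise is a professional-grade audio denoising service offered on the Alibaba Cloud Bailian platform. It uses deep learning to separate speech from background noise, so recordings sound clearer and more professional.
### Key Advantages
| Feature | Description |
|------|------|
| **AI denoising** | Built on a Tongyi Lab deep learning model that identifies speech and suppresses ambient noise |
| **Real-time streaming** | Bidirectional WebSocket streaming protocol; audio is processed while it is still being uploaded |
| **Multi-format support** | Supports wav, mp3, aac, opus, amr, pcm, and other mainstream audio formats |
| **Large files** | Up to 2 hours or 1 GB per file, enough for long recordings |
| **Quality assessment** | Returns an audio quality score automatically, so you can gauge the state of a recording |
### Use Cases
- **Meeting recordings** - remove room echo, air-conditioning hum, keyboard noise, and similar interference
- **Podcast production** - improve vocal clarity for a professional sound
- **Speech-recognition preprocessing** - raise ASR accuracy
- **Online education** - improve the quality of course recordings
- **Interview recordings** - recover clear dialogue
- **Audiobook production** - create an immersive listening experience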
## Quick Start
### Prerequisites
```bash
pip install dashscope websocket-client
```
### Set the API key (one time only)
```bash
export DASHSCOPE_API_KEY="your Alibaba Cloud API key"
```
### Denoise with a single command
```bash
python denoise_cli.py input.mp3 output.wav
```
### Python API
```python
from denoise_cli import denoise_audio

result = denoise_audio(
    input_path="noisy_recording.wav",
    output_path="clean_audio.wav"
)
if result["success"]:
    # output_info can be None when the service returned no metadata
    quality = (result.get("output_info") or {}).get("voice_quality", "N/A")
    print(f"Denoising complete. Audio quality score: {quality}")
```
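## Core API
### DenoiseParam configuration
| Parameter | Type | Default | Description |
|------|------|--------|------|
| `model` | str | "fun-audio-denoising" | Model name (fixed value) |
| `apikey` | str | None | DashScope API key; falls back to `dashscope.api_key` when unset |
| `format` | str | "wav" | Audio format: wav, mp3, aac, opus, amr, pcm |
| `sample_rate_in` | int | 16000 | Input sample rate in Hz (required for PCM input) |
| `sample_rate_out` | int | None | Output sample rate in Hz (defaults to the input rate) |
| `enable_denoise` | bool | True | Whether to enable denoising |
### Result metadata
```python
{
    "sample_rate_out": 48000,    # output sample rate (Hz)
    "voice_quality": "0.89",     # audio quality score (0-1)
    "valid_speech_ms": "15000"   # duration of valid speech (milliseconds)
}
```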
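In the sample above, `voice_quality` and `valid_speech_ms` are strings, so they need converting before any numeric comparison. A minimal sketch, assuming the metadata always has the shape shown in the sample; `parse_output_info` is a hypothetical helper, not part of the SDK:

```python
from typing import Any, Dict, Optional


def parse_output_info(info: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Convert string-valued metadata fields to numbers (hypothetical helper)."""
    info = info or {}
    parsed: Dict[str, Any] = {}
    if info.get("sample_rate_out") is not None:
        parsed["sample_rate_out"] = int(info["sample_rate_out"])
    if info.get("voice_quality") is not None:
        parsed["voice_quality"] = float(info["voice_quality"])
    if info.get("valid_speech_ms") is not None:
        parsed["valid_speech_ms"] = int(info["valid_speech_ms"])
    return parsed


sample = {"sample_rate_out": 48000, "voice_quality": "0.89", "valid_speech_ms": "15000"}
print(parse_output_info(sample))
```

Missing fields are skipped rather than defaulted, so callers can tell "not reported" apart from a real zero.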
## Usage Examples
### Example 1: Quick denoising from the command line
```bash
# Basic usage (format inferred from the file extension)
python denoise_cli.py meeting_recording.mp3
# Specify the output file
python denoise_cli.py interview.wav clean_interview.wav
# Custom parameters
python denoise_cli.py podcast.mp3 --format mp3 --sample-rate 48000
```
### Example 2: Integrating into a Python script
```python
import threading

import dashscope
from audio_process import Denoise, DenoiseParam, ResultCallback, DenoiseResult

# Set the API key
dashscope.api_key = "your-api-key"


class MyCallback(ResultCallback):
    def __init__(self):
        self.audio_data = b""
        self.complete_event = threading.Event()

    def on_event(self, result: DenoiseResult):
        # Accumulate each denoised audio frame as it arrives
        if result.audio_frame:
            self.audio_data += result.audio_frame

    def on_complete(self):
        print("Processing complete!")
        self.complete_event.set()


# Configure parameters
param = DenoiseParam(
    format="wav",
    sample_rate_in=16000,
    enable_denoise=True
)

# Run denoising
callback = MyCallback()
denoise = Denoise(param=param, callback=callback)
denoise.start_task()

# Send the audio data in 3200-byte chunks
with open("input.wav", "rb") as f:
    while chunk := f.read(3200):
        denoise.send_audio_frame(chunk)
denoise.sync_stop_task()

# Save the result
with open("output.wav", "wb") as f:
    f.write(callback.audio_data)
```
### Example 3: Batch-processing multiple files
```python
import os

from denoise_cli import denoise_audio

input_dir = "raw_recordings/"
output_dir = "clean_recordings/"
os.makedirs(output_dir, exist_ok=True)

for filename in os.listdir(input_dir):
    if filename.endswith(".wav"):
        input_path = os.path.join(input_dir, filename)
        output_path = os.path.join(output_dir, f"clean_{filename}")
        result = denoise_audio(input_path, output_path, verbose=False)
        if result["success"]:
            # output_info can be None when the service returned no metadata
            quality = (result["output_info"] or {}).get("voice_quality", "N/A")
            print(f"✓ {filename} - quality score: {quality}")
        else:
            print(f"✗ {filename} - failed: {result['error']}")
```
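## Best Practices
### 1. Audio framing strategy
- **Recommended frame size**: 3200 bytes (100 ms of audio at 16000 Hz, which corresponds to 16-bit mono PCM)
- **Send interval**: match the audio duration to simulate a real-time stream (send each 100 ms of data at 100 ms intervals)
- **Large files**: read in chunks to avoid exhausting memory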
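The 3200-byte figure follows from the sample rate, frame duration, and sample width. A minimal sketch of that arithmetic, assuming raw 16-bit mono PCM by default; `frame_size_bytes` is a hypothetical helper, and the byte-to-duration relationship does not hold for compressed formats such as mp3:

```python
def frame_size_bytes(sample_rate_hz: int, frame_ms: int,
                     bytes_per_sample: int = 2, channels: int = 1) -> int:
    """Bytes of raw PCM needed to cover frame_ms milliseconds of audio."""
    samples = sample_rate_hz * frame_ms // 1000
    return samples * bytes_per_sample * channels


print(frame_size_bytes(16000, 100))  # 3200
print(frame_size_bytes(8000, 100))   # 1600
```

To keep 100 ms frames at a different sample rate, the result can be passed as `--chunk-size` with `--chunk-delay 0.1`.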
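### 2. Choosing a sample rate
| Scenario | Recommended sample rate | Notes |
|------|------------|------|
| Speech recognition | 16000 Hz | Balances quality and processing speed |
| Telephone recordings | 8000 Hz | Compatible with traditional telephony systems |
| Music / podcasts | 44100 Hz / 48000 Hz | High-fidelity audio |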
### 3. Interpreting the quality score
- `voice_quality` > 0.8: excellent audio quality
- `voice_quality` 0.5-0.8: good audio quality, light noise
- `voice_quality` < 0.5: poor audio quality, heavy noise
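The thresholds above can be applied programmatically. A minimal sketch, assuming the score may arrive as a numeric string (as in the metadata sample) and that the boundary values 0.5 and 0.8 belong to the middle band; `classify_voice_quality` is a hypothetical helper, not part of the SDK:

```python
def classify_voice_quality(score) -> str:
    """Map a voice_quality score (number or numeric string) to a label."""
    value = float(score)
    if value > 0.8:
        return "excellent"
    if value >= 0.5:
        return "good"
    return "poor"


print(classify_voice_quality("0.89"))  # excellent
print(classify_voice_quality(0.3))     # poor
```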
### 4. Error handling
```python
try:
    denoise.start_task()
    # ... send audio data
    denoise.sync_stop_task(complete_timeout_millis=120000)
except TimeoutError:
    print("Processing timed out; check your network connection")
except Exception as e:
    print(f"Processing failed: {e}")
finally:
    denoise.close()  # always release the connection
```
## Command-Line Reference
```
usage: denoise_cli.py [-h] [--format FORMAT] [--sample-rate SAMPLE_RATE]
                      [--no-denoise] [--chunk-size CHUNK_SIZE]
                      [--chunk-delay CHUNK_DELAY] [-q]
                      input [output]

positional arguments:
  input                 Input audio file path
  output                Output audio file path (optional)

optional arguments:
  -h, --help            Show this help message
  --format FORMAT       Audio format (wav, mp3, pcm, aac, opus, amr)
  --sample-rate SAMPLE_RATE
                        Sample rate (default: 16000)
  --no-denoise          Disable denoising (format conversion only)
  --chunk-size CHUNK_SIZE
                        Chunk size in bytes (default: 3200)
  --chunk-delay CHUNK_DELAY
                        Delay between chunks in seconds (default: 0.1)
  -q, --quiet           Quiet mode
```
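## Technical Specifications
| Item | Specification |
|------|------|
| Supported formats | wav, mp3, aac, opus, amr, pcm |
| Maximum duration | 2 hours |
| Maximum file size | 1 GB |
| Output sample rate | Automatically optimized (48 kHz by default) |
| Protocol | Bidirectional WebSocket streaming |
| Latency | < 200 ms (first-packet response) |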
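The size and duration limits can be checked locally before opening a connection. A minimal sketch, assuming "1 GB" means 1024³ bytes (the table does not say which definition applies) and that the caller already knows the audio duration; `within_limits` is a hypothetical helper:

```python
MAX_BYTES = 1024 ** 3      # assumption: "1 GB" interpreted as 1 GiB
MAX_SECONDS = 2 * 60 * 60  # 2 hours


def within_limits(size_bytes: int, duration_seconds: float) -> bool:
    """Return True when a file fits both the size and the duration limit."""
    return size_bytes <= MAX_BYTES and duration_seconds <= MAX_SECONDS


print(within_limits(500_000_000, 3600))
```

For a file on disk, `os.path.getsize(path)` supplies the first argument.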
## Related Resources
- [Denoise.py source](audio_process/Denoise.py) - SDK implementation
---
*FunAudioDenoise: make every word clearly heard*
FILE:denoise_cli.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FunAudioDenoise CLI Tool
Command-line tool for intelligent audio denoising

Usage:
    python denoise_cli.py <input_audio> [output_audio] [options]

Example:
    python denoise_cli.py input.wav output_denoised.wav
    python denoise_cli.py input.mp3 --format mp3 --sample-rate 16000
"""
import os
import sys
import time
import argparse
import threading
from pathlib import Path
# Add current directory to path for importing audio_process
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import dashscope
from audio_process import Denoise, DenoiseParam, DenoiseResult, ResultCallback
class DenoiseCallback(ResultCallback):
    """Callback that receives denoising results and writes the audio to a file."""

    def __init__(self, output_path: str, verbose: bool = True):
        self.output_path = output_path
        self.verbose = verbose
        self.complete_event = threading.Event()
        self.file_handle = None
        self.audio_count = 0
        self.total_bytes = 0
        self.error_message = None
        self.output_info = None
        self.usage_info = None

    def on_open(self) -> None:
        if self.verbose:
            print("[INFO] WebSocket connection established")
        try:
            self.file_handle = open(self.output_path, "wb")
        except Exception as e:
            print(f"[ERROR] Cannot create output file: {e}")
            self.error_message = str(e)

    def on_complete(self) -> None:
        if self.verbose:
            print("[INFO] Denoising complete")
        if self.file_handle:
            self.file_handle.close()
        self.complete_event.set()

    def on_error(self, message) -> None:
        print(f"[ERROR] Processing error: {message}")
        self.error_message = message
        if self.file_handle:
            self.file_handle.close()
        self.complete_event.set()

    def on_close(self) -> None:
        if self.verbose:
            print("[INFO] WebSocket connection closed")

    def on_event(self, result: DenoiseResult) -> None:
        # Binary audio frames are appended to the output file
        if result.audio_frame is not None:
            if self.file_handle:
                self.file_handle.write(result.audio_frame)
            self.audio_count += 1
            self.total_bytes += len(result.audio_frame)
            if self.verbose and self.audio_count % 10 == 0:
                print(f"  Received audio frame #{self.audio_count}: {self.total_bytes} bytes")
        # Metadata and usage arrive in JSON events
        if result.output is not None:
            self.output_info = result.output
            if self.verbose:
                print(f"  [metadata] {result.output}")
        if result.usage is not None:
            self.usage_info = result.usage
def get_audio_format(file_path: str) -> str:
    """Infer the audio format from the file extension (falls back to 'wav')."""
    ext = Path(file_path).suffix.lower()
    format_map = {
        '.wav': 'wav',
        '.mp3': 'mp3',
        '.pcm': 'pcm',
        '.aac': 'aac',
        '.opus': 'opus',
        '.amr': 'amr',
    }
    return format_map.get(ext, 'wav')
def denoise_audio(
    input_path: str,
    output_path: str = None,
    audio_format: str = None,
    sample_rate: int = 16000,
    enable_denoise: bool = True,
    verbose: bool = True,
    chunk_size: int = 3200,
    chunk_delay: float = 0.1
) -> dict:
    """
    Denoise an audio file.

    Args:
        input_path: Input audio file path
        output_path: Output audio file path (defaults to <input stem>_denoised.wav)
        audio_format: Audio format (auto-detected or set manually)
        sample_rate: Sample rate
        enable_denoise: Whether to enable denoising
        verbose: Whether to print detailed logs
        chunk_size: Chunk size (bytes)
        chunk_delay: Delay between chunks (seconds)
    Returns:
        Result dictionary
    """
    # Check the input file
    if not os.path.exists(input_path):
        return {"success": False, "error": f"Input file not found: {input_path}"}

    # Infer the output path
    if output_path is None:
        input_path_obj = Path(input_path)
        output_path = str(input_path_obj.parent / f"{input_path_obj.stem}_denoised.wav")

    # Infer the audio format
    if audio_format is None:
        audio_format = get_audio_format(input_path)

    if verbose:
        print(f"[INFO] Input file: {input_path}")
        print(f"[INFO] Output file: {output_path}")
        print(f"[INFO] Audio format: {audio_format}")
        print(f"[INFO] Sample rate: {sample_rate}Hz")
        print(f"[INFO] Denoise enabled: {enable_denoise}")
        print("[INFO] Starting denoising...")

    # Configure parameters
    param = DenoiseParam(
        model="fun-audio-denoising",
        format=audio_format,
        sample_rate_in=sample_rate,
        enable_denoise=enable_denoise,
    )
    # Create the callback
    callback = DenoiseCallback(output_path, verbose=verbose)
    # Create the denoise client
    denoise = Denoise(param=param, callback=callback)
    start_time = time.time()
    try:
        # Start the task
        denoise.start_task()

        # Read the audio file and send it chunk by chunk
        with open(input_path, "rb") as f:
            chunk_num = 0
            while True:
                data = f.read(chunk_size)
                if not data:
                    break
                denoise.send_audio_frame(data)
                chunk_num += 1
                if chunk_delay > 0:
                    time.sleep(chunk_delay)
        if verbose:
            print(f"[INFO] Sent {chunk_num} audio chunks")

        # Stop the task and wait for completion
        denoise.sync_stop_task(complete_timeout_millis=120000)
        elapsed_time = time.time() - start_time

        # Check the result
        if callback.error_message:
            return {
                "success": False,
                "error": callback.error_message,
                "output_path": output_path,
                "elapsed_time": elapsed_time
            }

        # Verify the output file
        if not os.path.exists(output_path):
            return {
                "success": False,
                "error": "Output file was not generated",
                "elapsed_time": elapsed_time
            }

        output_size = os.path.getsize(output_path)
        input_size = os.path.getsize(input_path)
        result = {
            "success": True,
            "input_path": input_path,
            "output_path": output_path,
            "input_size": input_size,
            "output_size": output_size,
            "audio_frames": callback.audio_count,
            "elapsed_time": elapsed_time,
            "output_info": callback.output_info,
            "usage_info": callback.usage_info
        }
        if verbose:
            print("\n[INFO] Processing succeeded!")
            print(f"  Input size: {input_size} bytes")
            print(f"  Output size: {output_size} bytes")
            print(f"  Audio frames: {callback.audio_count}")
            print(f"  Elapsed time: {elapsed_time:.2f} s")
            if callback.output_info:
                print(f"  Output sample rate: {callback.output_info.get('sample_rate_out', 'N/A')}Hz")
                print(f"  Audio quality: {callback.output_info.get('voice_quality', 'N/A')}")
                print(f"  Valid speech: {callback.output_info.get('valid_speech_ms', 'N/A')}ms")
            if callback.usage_info:
                print(f"  Billed duration: {callback.usage_info.get('duration', 'N/A')} s")
        return result
    except Exception as e:
        elapsed_time = time.time() - start_time
        return {
            "success": False,
            "error": str(e),
            "output_path": output_path,
            "elapsed_time": elapsed_time
        }
    finally:
        denoise.close()
def main():
    parser = argparse.ArgumentParser(
        description="FunAudioDenoise - intelligent audio denoising tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python denoise_cli.py input.wav                    # basic usage
  python denoise_cli.py input.mp3 output.wav         # specify the output file
  python denoise_cli.py input.wav --format wav --sample-rate 16000
  python denoise_cli.py input.wav --no-denoise       # format conversion only
"""
    )
    parser.add_argument("input", help="Input audio file path")
    parser.add_argument("output", nargs="?", help="Output audio file path (optional)")
    parser.add_argument("--format", help="Audio format (wav, mp3, pcm, aac, opus, amr)")
    parser.add_argument("--sample-rate", type=int, default=16000, help="Sample rate (default: 16000)")
    parser.add_argument("--no-denoise", action="store_true", help="Disable denoising (format conversion only)")
    parser.add_argument("--chunk-size", type=int, default=3200, help="Chunk size in bytes (default: 3200)")
    parser.add_argument("--chunk-delay", type=float, default=0.1, help="Delay between chunks in seconds (default: 0.1)")
    parser.add_argument("-q", "--quiet", action="store_true", help="Quiet mode")
    args = parser.parse_args()

    result = denoise_audio(
        input_path=args.input,
        output_path=args.output,
        audio_format=args.format,
        sample_rate=args.sample_rate,
        enable_denoise=not args.no_denoise,
        verbose=not args.quiet,
        chunk_size=args.chunk_size,
        chunk_delay=args.chunk_delay
    )
    if not result["success"]:
        print(f"[FAILED] {result['error']}")
        sys.exit(1)
    else:
        print(f"[SUCCESS] Denoising complete: {result['output_path']}")
        sys.exit(0)


if __name__ == "__main__":
    main()
FILE:audio_process/__init__.py
from .Denoise import Denoise, DenoiseParam, DenoiseResult, ResultCallback
__all__ = ['Denoise', 'DenoiseParam', 'DenoiseResult', 'ResultCallback']
FILE:audio_process/Denoise.py
# -*- coding: utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import json
import platform
import threading
import time
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Any
import websocket
import dashscope
from dashscope.common.error import InputRequired, InvalidTask, ModelRequired
from dashscope.common.logging import logger
from dashscope.protocol.websocket import (
ACTION_KEY,
EVENT_KEY,
HEADER,
TASK_ID,
ActionType,
EventType,
)
class ResultCallback:
    """
    An interface that defines callback methods for getting audio denoise results.
    Derive from this class and implement its function to provide your own data.
    """

    def on_open(self) -> None:
        pass

    def on_complete(self) -> None:
        pass

    def on_error(self, message) -> None:
        pass

    def on_close(self) -> None:
        pass

    def on_event(self, result: 'DenoiseResult') -> None:
        pass


@dataclass
class DenoiseResult:
    """
    Result of audio denoise processing.

    Attributes:
        audio_frame: Processed audio data (bytes)
        output: Output information including sample_rate_out, voice_quality, valid_speech_ms
        usage: Usage statistics including duration
        request_id: Request ID
    """
    audio_frame: Optional[bytes] = None
    output: Optional[Dict[str, Any]] = None
    usage: Optional[Dict[str, Any]] = None
    request_id: Optional[str] = None


@dataclass
class DenoiseParam:
    """
    Parameters for audio denoise processing.

    Attributes:
        model: Model name (default: "fun-audio-denoising")
        apikey: API key for authentication
        format: Audio format (pcm, wav, mp3, aac, opus, amr)
        sample_rate_in: Input audio sample rate (required for PCM format, default 16000)
        sample_rate_out: Output audio sample rate (must be > 0)
        enable_denoise: Whether to enable denoise processing (default True)
        url: Audio file URL (for URL-based upload)
        workspace: Dashscope workspace ID
        headers: User-defined headers
    """
    model: str = "fun-audio-denoising"
    apikey: Optional[str] = None
    format: str = "wav"
    sample_rate_in: int = 16000
    sample_rate_out: Optional[int] = None
    enable_denoise: bool = True
    url: Optional[str] = None
    workspace: Optional[str] = None
    headers: Optional[Dict[str, str]] = None
    parameters: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        result = {
            "model": self.model,
            "format": self.format,
            "sample_rate_in": self.sample_rate_in,
            "enable_denoise": self.enable_denoise,
        }
        # Optional fields are sent only when they are set
        if self.sample_rate_out is not None:
            result["sample_rate_out"] = self.sample_rate_out
        if self.url is not None:
            result["url"] = self.url
        return result
class Denoise:
    def __init__(
        self,
        param: Optional[DenoiseParam] = None,
        callback: Optional[ResultCallback] = None,
        url: Optional[str] = None,
        headers: Optional[Dict[str, str]] = None,
        workspace: Optional[str] = None,
    ):
        """
        Audio Denoise SDK for real-time audio processing.

        Parameters:
        -----------
        param: DenoiseParam
            Configuration for audio denoise processing.
        callback: ResultCallback
            Callback to receive real-time processing results.
        url: str
            Dashscope WebSocket URL.
        headers: Dict
            User-defined headers.
        workspace: str
            Dashscope workspace ID.
        """
        self.ws = None
        self.start_event = threading.Event()
        self.complete_event = threading.Event()
        self._stopped = threading.Event()
        self._audio_data: bytes = b""
        self._is_started = False
        self._cancel = False
        self._cancel_lock = threading.Lock()
        self.async_call = True
        self._is_first = True
        self._start_stream_timestamp = -1
        self._first_package_timestamp = -1
        self._recv_audio_length = 0
        self.last_response = None
        self._close_ws_after_use = True
        # Initialize parameters
        self._update_params(param, callback, url, headers, workspace)

    def _update_params(
        self,
        param: Optional[DenoiseParam],
        callback: Optional[ResultCallback],
        url: Optional[str],
        headers: Optional[Dict[str, str]],
        workspace: Optional[str],
    ):
        if url is None:
            url = dashscope.base_websocket_api_url
        self.url = url
        # Use provided param or create default
        if param is None:
            param = DenoiseParam()
        # Set API key
        if param.apikey is None:
            self.apikey = dashscope.api_key
        else:
            self.apikey = param.apikey
        if self.apikey is None:
            raise InputRequired("apikey is required!")
        self.param = param
        self.headers = headers
        self.workspace = workspace
        self.callback = callback
        if not self.callback:
            self.async_call = False
        # Generate task ID
        self.task_id = uuid.uuid4().hex
        self.last_request_id = self.task_id
    def _get_websocket_headers(self) -> Dict[str, str]:
        ua = (
            f"dashscope/1.18.0; python/{platform.python_version()}; "
            f"platform/{platform.platform()}; "
            f"processor/{platform.processor()}"
        )
        headers = {
            "user-agent": ua,
            "Authorization": "Bearer " + self.apikey,
        }
        if self.headers:
            headers = {**headers, **self.headers}
        if self.workspace:
            headers = {**headers, "X-DashScope-WorkSpace": self.workspace}
        return headers

    def _send_str(self, data: str):
        logger.debug(">>>send %s", data)
        self.ws.send(data)
    def _connect(self, timeout_seconds=5) -> None:
        """
        Establish a connection to the Dashscope WebSocket server.
        """
        self.ws = websocket.WebSocketApp(
            self.url,
            header=self._get_websocket_headers(),
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
            on_open=self.on_open,
        )
        # Run the WebSocket event loop in a background daemon thread
        self.thread = threading.Thread(target=self.ws.run_forever)
        self.thread.daemon = True
        self.thread.start()
        # Wait for connection to establish
        start_time = time.time()
        while (
            not (self.ws.sock and self.ws.sock.connected)
            and (time.time() - start_time) < timeout_seconds
        ):
            time.sleep(0.1)
        if not (self.ws.sock and self.ws.sock.connected):
            raise TimeoutError(
                f"websocket connection could not be established within {timeout_seconds}s. "
                "Please check your network connection, firewall settings, or server status."
            )
    def _is_connected(self) -> bool:
        """
        Returns True if the connection is established and still exists;
        otherwise, returns False.
        """
        if not self.ws:
            return False
        if not (self.ws.sock and self.ws.sock.connected):
            return False
        return True

    def _reset(self):
        self.start_event.clear()
        self.complete_event.clear()
        self._stopped.clear()
        self._audio_data: bytes = b""
        self._is_started = False
        self._cancel = False
        self.async_call = True
        self._is_first = True
        self._start_stream_timestamp = -1
        self._first_package_timestamp = -1
        self._recv_audio_length = 0
        self.last_response = None

    def _get_start_request(self) -> str:
        """Generate run-task request."""
        cmd = {
            HEADER: {
                ACTION_KEY: ActionType.START,
                TASK_ID: self.task_id,
                "streaming": "duplex",
            },
            "payload": {
                "task_group": "audio",
                "task": "audio-process",
                "function": "process",
                "model": self.param.model,
                "parameters": self.param.to_dict(),
                "input": {},
            },
        }
        return json.dumps(cmd)

    def _get_finish_request(self) -> str:
        """Generate finish-task request."""
        cmd = {
            HEADER: {
                ACTION_KEY: ActionType.FINISHED,
                TASK_ID: self.task_id,
                "streaming": "duplex",
            },
            "payload": {
                "input": {},
            },
        }
        return json.dumps(cmd)
    def connect(self, timeout_seconds=5) -> None:
        """
        Establish a connection to the Dashscope WebSocket server.

        Parameters:
        -----------
        timeout_seconds: int
            Timeout in seconds for connection establishment.
        """
        self._connect(timeout_seconds)

    def _start_stream(self):
        """Start the denoise processing stream."""
        self._start_stream_timestamp = time.time() * 1000
        self._first_package_timestamp = -1
        self._recv_audio_length = 0
        if self._is_started:
            raise InvalidTask("task has already started.")
        # Establish WebSocket connection
        if self.ws is None:
            self._connect(5)
        # Send run-task command
        request = self._get_start_request()
        logger.debug(">>>send run-task: %s", request)
        self._send_str(request)
        if not self.start_event.wait(10):
            raise TimeoutError("start audio denoise failed within 10s.")
        self._is_started = True

    def start_task(self) -> None:
        """
        Start the audio denoise task.
        """
        if self._is_started:
            raise InvalidTask("audio denoise has already started.")
        self._start_stream()

    def send_audio_frame(self, audio_data: bytes) -> None:
        """
        Send audio frame data for processing.

        Parameters:
        -----------
        audio_data: bytes
            Audio data in bytes format.
        """
        if not self._is_started:
            raise InvalidTask("audio denoise has not been started.")
        if self._stopped.is_set():
            raise InvalidTask("audio denoise task has stopped.")
        logger.debug(">>>send binary %s", len(audio_data))
        self.ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)

    def stop_task(self) -> None:
        """
        Stop the audio denoise task.
        """
        if not self._is_started:
            raise InvalidTask("audio denoise has not been started.")
        if self._stopped.is_set():
            return
        request = self._get_finish_request()
        self._send_str(request)

    def sync_stop_task(self, complete_timeout_millis=600000):
        """
        Synchronously stop the audio denoise task.
        Wait for all remaining processing to complete before returning.

        Parameters:
        -----------
        complete_timeout_millis: int
            Timeout in milliseconds. If None or <= 0, wait indefinitely.
        """
        if not self._is_started:
            raise InvalidTask("audio denoise has not been started.")
        if self._stopped.is_set():
            raise InvalidTask("audio denoise task has stopped.")
        self.stop_task()
        if complete_timeout_millis is not None and complete_timeout_millis > 0:
            if not self.complete_event.wait(timeout=complete_timeout_millis / 1000):
                raise TimeoutError(
                    f"audio denoise wait for complete timeout "
                    f"{complete_timeout_millis}ms"
                )
        else:
            self.complete_event.wait()
        if self._close_ws_after_use:
            self.close()
        self._stopped.set()
        self._is_started = False

    def cancel(self):
        """
        Immediately terminate the audio denoise task.
        """
        if not self._is_started:
            raise InvalidTask("audio denoise has not been started.")
        if self._stopped.is_set():
            return
        self.stop_task()
        self.ws.close()
        self.start_event.set()
        self.complete_event.set()
    def on_open(self, ws):
        """WebSocket connection opened callback."""
        logger.info("WebSocket connection opened")
        if self.callback:
            self.callback.on_open()

    def on_message(self, ws, message):
        """WebSocket message received callback."""
        if isinstance(message, str):
            logger.debug("<<<recv %s", message)
            try:
                json_data = json.loads(message)
                self.last_response = json_data
                if "header" in json_data:
                    header = json_data["header"]
                    if EVENT_KEY in header:
                        event = header[EVENT_KEY]
                        if event == "task-started":
                            self.start_event.set()
                            self._first_package_timestamp = -1
                        elif event == "task-finished":
                            self.complete_event.set()
                            if self.callback:
                                self.callback.on_complete()
                                self.callback.on_close()
                        elif event == "task-failed":
                            self.start_event.set()
                            self.complete_event.set()
                            error_message = "Unknown error"
                            if "error_message" in header:
                                error_message = header["error_message"]
                            if self.async_call:
                                self.callback.on_error(error_message)
                                self.callback.on_close()
                            else:
                                logger.error(f"TaskFailed: {message}")
                                raise Exception(f"TaskFailed: {error_message}")
                        elif event == "result-generated":
                            if self.callback:
                                result = DenoiseResult()
                                if TASK_ID in header:
                                    result.request_id = header[TASK_ID]
                                if "payload" in json_data:
                                    payload = json_data["payload"]
                                    if "output" in payload:
                                        result.output = payload["output"]
                                    if "usage" in payload:
                                        result.usage = payload["usage"]
                                self.callback.on_event(result)
            except json.JSONDecodeError:
                logger.error("Failed to parse message as JSON.")
                raise Exception("Failed to parse message as JSON.")
        elif isinstance(message, (bytes, bytearray)):
            # Binary audio data
            logger.debug("<<<recv binary %s", len(message))
            if self._recv_audio_length == 0:
                self._first_package_timestamp = time.time() * 1000
                logger.debug(
                    "first package delay %s",
                    self._first_package_timestamp - self._start_stream_timestamp,
                )
            self._recv_audio_length += len(message)
            # Only save audio data in non-async mode
            if not self.async_call:
                self._audio_data += message
            if self.callback:
                result = DenoiseResult()
                result.audio_frame = bytes(message)
                self.callback.on_event(result)

    def on_error(self, ws, error):
        """WebSocket error callback."""
        logger.error(f"websocket error: {error}")
        if self.callback:
            self.callback.on_error(str(error))
        # Release waiting events
        if self.start_event.is_set() and not self.complete_event.is_set():
            self.complete_event.set()

    def on_close(self, ws, close_status_code, close_msg):
        """WebSocket connection closed callback."""
        logger.debug(f"websocket closed: {close_status_code} - {close_msg}")

    def close(self):
        """Close WebSocket connection."""
        if self.ws:
            self.ws.close()

    def get_last_request_id(self) -> str:
        """Get the last request ID."""
        return self.last_request_id

    def update_param_and_callback(
        self, param: DenoiseParam, callback: ResultCallback
    ):
        """
        Update parameters and callback for reuse.

        Parameters:
        -----------
        param: DenoiseParam
            New parameters for audio denoise.
        callback: ResultCallback
            New callback for receiving results.
        """
        self._reset()
        self.param = param
        self.callback = callback
        self.async_call = self.callback is not None
        self.task_id = uuid.uuid4().hex
        self.last_request_id = self.task_id
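Make text "speak"! Use AI to turn any text into natural, fluent speech, with support for dialects, emotions, and role imitation. Use this skill when you want to turn an article into an audiobook, dub a video, produce a podcast, or are simply curious how something sounds in Henan or Sichuan dialect.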
---
name: cosyvoice-speech-synthesizer
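description: Make text "speak"! Use AI to turn any text into natural, fluent speech, with support for dialects, emotions, and role imitation. Use this skill when you want to turn an article into an audiobook, dub a video, produce a podcast, or are simply curious how something sounds in Henan or Sichuan dialect.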
---
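# 🎙️ CosyVoice Speech Synthesis - Make Text Speak
> Want to turn a piece of text into speech? Curious how Cantonese or Sichuan dialect sounds? Want a lesson read in a teacher's tone?
>
> This tool handles it in one step!
## ✨ What can it do?
- 📖 **Text to speech** - turn articles, stories, and announcements into lifelike speech
- 🗣️ **Dialects** - Cantonese, Sichuan, Northeastern, Henan, and more
- 😊 **Expressive reading** - happy, angry, gentle, serious: read it however you like
- 🎭 **Role play** - lecture like a teacher, report like an announcer, talk like a child
- 🎵 **Voice tuning** - adjust speaking rate, pitch, and volume as needed
## 🚀 Getting Started
### Step 1: Set the API key (one time only)
```bash
export DASHSCOPE_API_KEY="your Alibaba Cloud API key"
```
### Step 2: Start synthesizing!
You can simply say "合成今天天气怎么样?" ("Synthesize: how is the weather today?") or "用河南话说:这碗面真香!" ("Say it in Henan dialect: this bowl of noodles smells great!"). The prompts stay in Chinese because the tool matches Chinese keywords.
You can also call it from code: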
```bash
# Simplest usage: pass the text directly
python ~/.qoderwork/skills/cosyvoice-speech-synthesizer/scripts/synthesize.py \
  --text "你好,世界!" \
  --output hello.wav
```
## 💡 Practical Scenarios
### Scenario 1: Hear regional dialects
```bash
# Cantonese
python synthesize.py --text "使用广东话合成:我想吃干炒牛河" --output cantonese.wav
# Sichuan dialect
python synthesize.py --text "用四川话说:这道菜太巴适了" --output sichuan.wav
# Henan dialect
python synthesize.py --text "用河南话说:这碗面忒香了" --output henan.wav
# Northeastern dialect
python synthesize.py --text "东北话版本:这旮旯真不错" --output dongbei.wav
```
### Scenario 2: Dub a video
```bash
# Lively, enthusiastic promotional line
python synthesize.py \
  --text "开心地:欢迎光临我们的新店,全场八折优惠!" \
  --output promotion.wav
# Formal, solemn announcement
python synthesize.py \
  --text "严肃地:请各位同学注意,明天上午九点开会" \
  --output announcement.wav
# Gentle, warm bedtime story
python synthesize.py \
  --text "温柔地:从前有一只小兔子,它最喜欢在森林里散步..." \
  --output story.wav
```
### Scenario 3: Imitate different roles
```bash
# Lecture like a teacher
python synthesize.py \
  --text "像老师一样:同学们,今天我们来学习光合作用" \
  --output teacher.wav
# Report like a news anchor
python synthesize.py \
  --text "像播音员一样:据本台记者报道,今日天气晴朗" \
  --output news.wav
# Talk like a child
python synthesize.py \
  --text "像小孩一样:妈妈,我想吃冰淇淋!" \
  --output child.wav
```
### Scenario 4: Adjust the voice
```bash
# Slow down, useful for read-along practice
python synthesize.py \
  --text "慢速朗读:春眠不觉晓,处处闻啼鸟" \
  --rate 0.8 \
  --output slow.wav
# Speed up, useful for skimming
python synthesize.py \
  --text "快速播报:今日股市大涨,投资者信心倍增" \
  --rate 1.3 \
  --output fast.wav
# Raise the volume
python synthesize.py \
  --text "大声说:重要通知!" \
  --volume 80 \
  --output loud.wav
```
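## 🎯 Understands What You Mean
The tool parses natural-language descriptions and converts them into speech instructions automatically:
| What you say | How the tool interprets it |
|---------|------------|
| "用广东话合成..." (synthesize in Cantonese) | Applies Cantonese pronunciation |
| "开心地说..." (say it happily) | Uses a happy tone |
| "像老师一样..." (like a teacher) | Imitates a teacher's manner |
| "严肃地表达..." (express it seriously) | Uses a serious tone |
**No complex parameters to memorize; just describe it the way you would say it!**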
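The mapping in the table can be pictured as a keyword lookup on the description that precedes the colon. The sketch below is a simplified illustration only, assuming input shaped like `<description>:<content>`; `KEYWORD_TEMPLATES` and `split_instruction` are hypothetical names and cover a handful of keywords, not the tool's full rule set.

```python
from typing import Optional, Tuple

# Hypothetical, deliberately small table: keyword -> instruction text.
KEYWORD_TEMPLATES = {
    "广东话": "请用广东话表达",
    "四川话": "请用四川话表达",
    "开心": "请用开心的语气说",
    "严肃": "请用严肃的语气说",
    "老师": "请像老师一样讲解",
}


def split_instruction(text: str) -> Tuple[Optional[str], str]:
    """Return (instruction, content) for input shaped like 'prefix:content'."""
    # Accept both the full-width and the ASCII colon as the separator.
    for sep in (":", ":"):
        prefix, found, content = text.partition(sep)
        if found:
            for keyword, instruction in KEYWORD_TEMPLATES.items():
                if keyword in prefix:
                    return instruction, content.strip()
    # No recognizable prefix: the text is synthesized unchanged.
    return None, text


print(split_instruction("用四川话说:这道菜太巴适了"))
print(split_instruction("你好,世界!"))
```

Restricting the keyword search to the part before the colon keeps ordinary content that happens to mention a dialect or emotion from being treated as an instruction.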
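## 🎵 Available Voices
- `longanhuan` (default) - Long Anhuan, gentle female voice
- `longanyang` - Long Anyang, steady male voice
- `longhuhu_v3` - Long Huhu, innocent and lively young girl's voice
For more voices, see the Alibaba Cloud Bailian [official documentation](https://help.aliyun.com/zh/model-studio/cosyvoice-voice-list)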
## 📋 Common Parameters
| Parameter | Purpose | Example |
|------|------|------|
| `--text` | Text to synthesize | `"你好"` |
| `--output` | Output file name | `output.wav` |
| `--rate` | Speaking rate (0.5 = slow, 2.0 = fast) | `1.2` |
| `--volume` | Volume (0-100) | `80` |
| `--voice` | Select a different voice | `longanyang` |
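The ranges in the table can be checked before a request is sent. A minimal sketch, assuming the documented bounds are inclusive; `check_synthesis_options` is a hypothetical helper, and the defaults mirror those of `synthesize()` in the script:

```python
def check_synthesis_options(rate: float = 1.0, volume: int = 50) -> None:
    """Raise ValueError when rate or volume falls outside the documented range."""
    if not 0.5 <= rate <= 2.0:
        raise ValueError(f"rate must be within 0.5-2.0, got {rate}")
    if not 0 <= volume <= 100:
        raise ValueError(f"volume must be within 0-100, got {volume}")


check_synthesis_options(rate=1.2, volume=80)  # passes silently
```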
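## 🗣️ Supported Dialects and Emotions
**Dialects**: Cantonese (广东话), Sichuan (四川话), Northeastern (东北话), Shanghainese (上海话), Beijing (北京话), Hunan (湖南话), Hubei (湖北话), Henan (河南话), Shandong (山东话), Shaanxi (陕西话), Taiwanese (台湾话)
**Emotions**: happy (开心), angry (生气), gentle (温柔), sad (悲伤), serious (严肃), humorous (幽默), warm (亲切), nervous (紧张), excited (兴奋), calm (平静), playful (撒娇), authoritative (威严)
**Roles**: teacher (老师), announcer (播音员), child (小孩), elderly person (老人), customer service agent (客服), manager (领导), friend (朋友), doctor (医生), lawyer (律师), salesperson (销售), tour guide (导游), news anchor (新闻主播), poet (诗人)
> ⚠️ Note: dialect, emotion, and role control require specific voices; not every voice supports them.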
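## ❓ FAQ
**Q: Why does my account report that HTTP calls are not supported?**
A: The API may still be rolling out gradually. Contact Alibaba Cloud Bailian support to confirm your account's permissions.
**Q: Can the synthesized audio be used directly?**
A: Yes. The output is standard WAV/MP3 and can be used directly in videos, podcasts, announcements, and similar scenarios.
**Q: How long is the audio link valid?**
A: 24 hours. Download and save the file promptly.
**Q: Do all voices support dialects and emotions?**
A: No. Only some voices (cloned voices in particular) support full dialect and emotion control.
---
**Get started now and let your text speak!** 🎉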
FILE:scripts/synthesize.py
#!/usr/bin/env python3
"""
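CosyVoice speech synthesis script
Calls the Alibaba Cloud Bailian CosyVoice API to convert text into an audio file
Supports automatic extraction of the instruction from a natural-language description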
"""
import argparse
import json
import os
import re
import sys
from urllib.parse import urlparse
import requests
# API endpoint
API_ENDPOINT = "https://dashscope.aliyuncs.com/api/v1/services/audio/tts/SpeechSynthesizer"
# Instruction extraction rules
INSTRUCTION_PATTERNS = {
    # Dialect patterns
"dialect": {
"patterns": [
(r"使用?([\u4e00-\u9fa5]{2,3})话?", "请用{0}表达"),
(r"用([\u4e00-\u9fa5]{2,3})话?", "请用{0}表达"),
(r"([\u4e00-\u9fa5]{2,3})话?版本?", "请用{0}表达"),
(r"([\u4e00-\u9fa5]{2,3})口音", "请用{0}表达"),
],
"keywords": {
"广东": "广东话", "粤语": "广东话", "广州": "广东话",
"四川": "四川话", "川普": "四川话",
"东北": "东北话",
"上海": "上海话", "沪语": "上海话",
"北京": "北京话", "京腔": "北京话",
"湖南": "湖南话", "长沙": "湖南话",
"湖北": "湖北话", "武汉": "湖北话",
"河南": "河南话",
"山东": "山东话",
"陕西": "陕西话", "西安": "陕西话",
"台湾": "台湾话", "闽南": "台湾话",
}
},
    # Emotion patterns
    "emotion": {
        "patterns": [
            (r"([\u4e00-\u9fa5]{2,4})地(说|讲|表达|念)", "请用{0}的语气说"),
            (r"(非常|很|特别)?([\u4e00-\u9fa5]{2,4})地(说|讲|表达|念)", "请用{1}的语气说"),
            (r"([\u4e00-\u9fa5]{2,4})的(语气|口吻|感觉|情绪)", "请用{0}的语气说"),
            (r"(显得|表现|带着)([\u4e00-\u9fa5]{2,4}).*?(说|讲|表达)", "请用{1}的语气说"),
        ],
        "keywords": {
            "开心": "开心", "高兴": "开心", "快乐": "开心", "愉快": "开心", "欢乐": "开心",
            "生气": "生气", "愤怒": "生气", "气愤": "生气", "恼火": "生气",
            "温柔": "温柔", "柔和": "温柔", "轻柔": "温柔", "轻声": "温柔", "温婉": "温柔",
            "悲伤": "悲伤", "难过": "悲伤", "伤心": "悲伤", "哀伤": "悲伤",
            "严肃": "严肃", "认真": "严肃",
            "幽默": "幽默", "搞笑": "幽默", "诙谐": "幽默",
            "亲切": "亲切", "热情": "亲切", "友好": "亲切",
            "紧张": "紧张", "焦急": "紧张", "着急": "紧张",
            "兴奋": "兴奋", "激动": "兴奋", "亢奋": "兴奋",
            "平静": "平静", "冷静": "平静", "淡定": "平静",
            "撒娇": "撒娇", "可爱": "撒娇", "卖萌": "撒娇",
            # "庄重" appears once only: a repeated dict key silently keeps the last value
            "威严": "威严", "霸气": "威严", "庄重": "威严",
        }
    },
    # Role patterns
    "role": {
        "patterns": [
            (r"像([\u4e00-\u9fa5]{2,4})一样", "请像{0}一样讲解"),
            (r"([\u4e00-\u9fa5]{2,4})的(语气|口吻|风格|方式)", "请像{0}一样讲解"),
            (r"模仿([\u4e00-\u9fa5]{2,4})", "请像{0}一样讲解"),
            (r"扮演([\u4e00-\u9fa5]{2,4})", "请像{0}一样讲解"),
        ],
        "keywords": {
            "老师": "老师", "教师": "老师", "教授": "老师",
            "播音员": "播音员", "主持人": "播音员", "主播": "播音员",
            "小孩": "小孩", "儿童": "小孩", "孩子": "小孩", "小朋友": "小孩",
            "老人": "老人", "老爷爷": "老人", "老奶奶": "老人",
            "客服": "客服", "服务员": "客服",
            "领导": "领导", "老板": "领导", "上司": "领导",
            "朋友": "朋友", "闺蜜": "朋友", "兄弟": "朋友",
            "医生": "医生", "护士": "医生",
            "律师": "律师",
            "销售": "销售", "推销员": "销售",
            "导游": "导游",
            "新闻": "新闻主播", "记者": "新闻主播",
            "诗朗诵": "诗人", "朗诵": "诗人", "诗人": "诗人",
            "讲故事": "讲故事的人", "故事": "讲故事的人",
        }
    },
    # Style patterns
"style": {
"patterns": [
(r"([\u4e00-\u9fa5]{2,4})地(说|讲|表达|念)", "请{0}地表达"),
(r"(用|以)?([\u4e00-\u9fa5]{2,4})的(方式|风格|语气)", "请{1}地表达"),
],
"keywords": {
"正式": "正式", "官方": "正式", "规范": "正式",
"随意": "随意", "轻松": "随意", "休闲": "随意",
"专业": "专业", "内行": "专业",
"通俗": "通俗", "易懂": "通俗", "简单": "通俗",
"文艺": "文艺", "诗意": "文艺",
"商务": "商务", "职场": "商务",
"日常": "日常", "生活化": "日常",
}
}
}
def extract_instruction(text: str) -> tuple:
    """
    Extract the instruction and the plain content from a natural-language request.

    Args:
        text: The full text entered by the user.

    Returns:
        (instruction, clean_text): the extracted instruction (None if nothing
        matched) and the text with the instruction description removed.
    """
    instruction = None
    clean_text = text

    # Categories are checked in dictionary order, which defines their priority.
    for category, config in INSTRUCTION_PATTERNS.items():
        # Check keyword matches first.
        for keyword, mapped_value in config["keywords"].items():
            if keyword in text:
                # The instruction strings stay in Chinese: they are sent to the API as-is.
                if category == "dialect":
                    instruction = f"请用{mapped_value}表达"
                elif category == "emotion":
                    instruction = f"请用{mapped_value}的语气说"
                elif category == "role":
                    instruction = f"请像{mapped_value}一样讲解"
                elif category == "style":
                    instruction = f"请{mapped_value}地表达"

                # Try to remove the descriptive prefix.
                clean_text = remove_instruction_prefix(text, keyword, category)
                return instruction, clean_text

        # Then check the regex patterns.
        for pattern, template in config["patterns"]:
            match = re.search(pattern, text)
            if not match:
                continue

            groups = match.groups()
            if groups:
                def fill(placeholder, groups=groups):
                    # Map a "{N}" placeholder to capture group N.
                    idx = int(placeholder.group(1))
                    if idx >= len(groups):
                        return ""
                    val = groups[idx]
                    # If the group is None or empty, fall back to the next group.
                    if not val and idx + 1 < len(groups):
                        val = groups[idx + 1]
                    return val or ""

                # Substitute by index. Calling template.format() with a compacted
                # argument list raises IndexError for templates such as "请{1}地表达".
                instruction = re.sub(r"\{(\d+)\}", fill, template)

            clean_text = remove_instruction_prefix(text, match.group(0), category)
            return instruction, clean_text

    return instruction, clean_text
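

# Illustrative sketch, not part of the original tool: one way to fill "{N}"
# placeholders in an instruction template by capture-group index. It may help
# explain why a template such as "请{1}地表达" needs index-based substitution:
# str.format() given a single collected value would fail on "{1}".
# The helper name `_fill_placeholders_sketch` is hypothetical.
def _fill_placeholders_sketch(template, groups):
    import re  # local import keeps the sketch self-contained

    def pick(placeholder):
        idx = int(placeholder.group(1))
        # Out-of-range indexes and unmatched optional groups become "".
        return (groups[idx] or "") if idx < len(groups) else ""

    return re.sub(r"\{(\d+)\}", pick, template)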
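def remove_instruction_prefix(text: str, matched_text: str, category: str) -> str:
    """
    Remove the instruction description from the text and keep only the content.

    Args:
        text: Original text.
        matched_text: The instruction description that was matched.
        category: Instruction category (currently unused).

    Returns:
        The cleaned text, or the original text if cleaning leaves nothing.
    """
    # Escape the matched text so regex metacharacters in it are treated literally.
    escaped = re.escape(matched_text)

    # Common instruction prefix patterns; [::] accepts full-width and ASCII colons.
    prefix_patterns = [
        rf"使用?{escaped}[::]?",
        rf"用{escaped}[::]?",
        rf"以{escaped}[::]?",
        rf"{escaped}[::]?",
        rf"请{escaped}[::]?",
        r"^(使用?|用|以)?[\u4e00-\u9fa5]{2,4}话?[::]",
        r"^(像[\u4e00-\u9fa5]{2,4}一样)[::]?",
        r"^[\u4e00-\u9fa5]{2,4}地(说|讲|表达|念)[::]?",
    ]

    clean_text = text
    for pattern in prefix_patterns:
        clean_text = re.sub(pattern, "", clean_text, flags=re.IGNORECASE)

    # Strip leftover colons and whitespace.
    clean_text = clean_text.strip(" ::").strip()

    # If nothing is left, fall back to the original text.
    if not clean_text:
        clean_text = text

    return clean_text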
def get_api_key():
    """Read the API key from the environment; exit if it is not set."""
    api_key = os.environ.get("DASHSCOPE_API_KEY")
    if not api_key:
        print("Error: the DASHSCOPE_API_KEY environment variable is not set", file=sys.stderr)
        print("Set it with: export DASHSCOPE_API_KEY='your-api-key'", file=sys.stderr)
        sys.exit(1)
    return api_key
def synthesize(
    text: str,
    voice: str = "longanhuan",
    model: str = "cosyvoice-v3-flash",
    format: str = "wav",
    sample_rate: int = 24000,
    volume: int = 50,
    rate: float = 1.0,
    pitch: float = 1.0,
    instruction: str = None,
    enable_ssml: bool = False,
    seed: int = None,
    word_timestamp_enabled: bool = False,
    hot_fix: str = None,
    enable_markdown_filter: bool = False,
    auto_extract_instruction: bool = True,
    stream: bool = False,
    api_key: str = None
) -> dict:
    """
    Call the CosyVoice API to synthesize speech.

    Args:
        text: Text to synthesize.
        voice: Voice name.
        model: Model name.
        format: Audio format (pcm/wav/mp3/opus).
        sample_rate: Sample rate (8000-48000 Hz).
        volume: Volume (0-100).
        rate: Speech rate (0.5-2.0).
        pitch: Pitch (0.5-2.0).
        instruction: Controls dialect/emotion/role (at most 100 characters;
            supported by some voices only).
        enable_ssml: Whether to enable SSML.
        seed: Random seed (0-65535).
        word_timestamp_enabled: Whether to enable word-level timestamps.
        hot_fix: Text hot fix (cloned voices only).
        enable_markdown_filter: Markdown filtering (cloned voices only).
        auto_extract_instruction: Whether to extract the instruction from the
            text automatically.
        stream: Whether to use streaming output.
        api_key: API key (read from the environment when None).

    Returns:
        The API response as a dictionary.
    """
    if api_key is None:
        api_key = get_api_key()

    # Extract the instruction automatically unless one was given explicitly.
    extracted_instruction = None
    clean_text = text
    if auto_extract_instruction and not instruction:
        extracted_instruction, clean_text = extract_instruction(text)
        if extracted_instruction:
            instruction = extracted_instruction
            print(f" Extracted instruction: {instruction}")
            print(f" Text to synthesize: {clean_text}")

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    if stream:
        headers["X-DashScope-SSE"] = "enable"

    # Build the payload.
    payload = {
        "model": model,
        "input": {
            "text": clean_text,
            "voice": voice,
            "format": format,
            "sample_rate": sample_rate
        },
        "parameters": {}
    }

    # Add optional parameters only when they differ from the defaults.
    params = payload["parameters"]
    if volume != 50:
        params["volume"] = volume
    if rate != 1.0:
        params["rate"] = rate
    if pitch != 1.0:
        params["pitch"] = pitch
    if instruction:
        params["instruction"] = instruction
    if enable_ssml:
        params["enable_ssml"] = enable_ssml
    if seed is not None:
        params["seed"] = seed
    if word_timestamp_enabled:
        params["word_timestamp_enabled"] = word_timestamp_enabled
    if hot_fix:
        params["hot_fix"] = hot_fix
    if enable_markdown_filter:
        params["enable_markdown_filter"] = enable_markdown_filter

    # Drop the "parameters" field when it is empty.
    if not params:
        del payload["parameters"]

    try:
        response = requests.post(
            API_ENDPOINT,
            headers=headers,
            json=payload,
            timeout=300
        )
        response.raise_for_status()

        if stream:
            # Streaming output: parse the SSE data and keep the last event
            # whose finish_reason is "stop".
            lines = response.text.strip().split('\n')
            last_data = None
            for line in lines:
                if line.startswith('data:'):
                    data_str = line[5:].strip()
                    try:
                        data = json.loads(data_str)
                        if data.get("output", {}).get("finish_reason") == "stop":
                            last_data = data
                    except json.JSONDecodeError:
                        continue
            return last_data or {}
        else:
            return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"Failed to parse response: {e}", file=sys.stderr)
        sys.exit(1)
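

# Illustrative sketch, not part of the original tool: the SSE handling above
# reads `response.text`, which buffers the whole stream before parsing. The
# helper below accepts any iterable of lines (str or bytes), so it could also
# be fed from `response.iter_lines()` if the request were made with
# `stream=True`. The event shape (output.finish_reason == "stop") is assumed
# from the parsing code above; the name `_last_stop_event_sketch` is hypothetical.
def _last_stop_event_sketch(lines):
    import json  # local import keeps the sketch self-contained

    last = None
    for raw in lines:
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        if not line.startswith("data:"):
            continue  # skip "id:", "event:" and blank separator lines
        try:
            event = json.loads(line[5:].strip())
        except json.JSONDecodeError:
            continue  # ignore malformed data lines
        if isinstance(event, dict) and (event.get("output") or {}).get("finish_reason") == "stop":
            last = event
    return last or {}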
def download_audio(url: str, output_path: str) -> bool:
    """
    Download an audio file.

    Args:
        url: URL of the audio file.
        output_path: Path to save the file to.

    Returns:
        True if the download succeeded, False otherwise.
    """
    try:
        response = requests.get(url, timeout=120, stream=True)
        response.raise_for_status()

        # Write the body in chunks so large files are not held in memory.
        with open(output_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
        return True
    except requests.exceptions.RequestException as e:
        print(f"Failed to download audio: {e}", file=sys.stderr)
        return False
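

# Illustrative sketch, not part of the original tool: download_audio() writes
# straight to the target path, so an interrupted transfer can leave a truncated
# file behind. One possible alternative is to write the chunks to a temporary
# file in the same directory and rename it once everything has been written.
# The name `_write_atomically_sketch` is hypothetical.
def _write_atomically_sketch(chunks, output_path):
    import os
    import tempfile

    directory = os.path.dirname(os.path.abspath(output_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as f:
            for chunk in chunks:
                if chunk:
                    f.write(chunk)
        # os.replace overwrites the target; the rename is atomic on one filesystem.
        os.replace(tmp_path, output_path)
    except BaseException:
        # Remove the partial file before re-raising.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return os.path.getsize(output_path)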
def main():
    parser = argparse.ArgumentParser(
        description="CosyVoice speech synthesis tool with smart instruction extraction",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Smart instruction extraction examples:
  # Detect a dialect automatically
  python synthesize.py --text "使用广东话合成:我想吃干炒牛河" --output cantonese.wav
  # Detect an emotion automatically
  python synthesize.py --text "开心地说:今天天气真好!" --output happy.wav
  # Detect a role automatically
  python synthesize.py --text "像老师一样:同学们上课了" --output teacher.wav

General usage examples:
  # Basic usage
  python synthesize.py --text "你好,世界!" --output hello.wav
  # Adjust speech rate and volume
  python synthesize.py --text "你好!" --rate 1.2 --volume 80 --output hello.wav
  # Specify the instruction manually
  python synthesize.py --text "今天天气真不错!" --instruction "请用温柔的语气说" --output gentle.wav
  # Use streaming output
  python synthesize.py --text "流式合成测试" --output stream.wav --stream
"""
    )
    parser.add_argument(
        "--text",
        required=True,
        help="Text to synthesize (natural-language descriptions such as '使用广东话合成:...' are supported)"
    )
    parser.add_argument(
        "--voice",
        default="longanhuan",
        help="Voice name (default: longanhuan)"
    )
    parser.add_argument(
        "--model",
        default="cosyvoice-v3-flash",
        help="Model name (default: cosyvoice-v3-flash)"
    )
    parser.add_argument(
        "--format",
        default="wav",
        choices=["wav", "mp3", "pcm", "opus"],
        help="Audio format (default: wav)"
    )
    parser.add_argument(
        "--sample-rate",
        type=int,
        default=24000,
        help="Sample rate in Hz, range 8000-48000 (default: 24000)"
    )
    parser.add_argument(
        "--volume",
        type=int,
        default=50,
        help="Volume, range 0-100 (default: 50)"
    )
    parser.add_argument(
        "--rate",
        type=float,
        default=1.0,
        help="Speech rate, range 0.5-2.0 (default: 1.0)"
    )
    parser.add_argument(
        "--pitch",
        type=float,
        default=1.0,
        help="Pitch, range 0.5-2.0 (default: 1.0)"
    )
    parser.add_argument(
        "--instruction",
        help="Manually specified dialect/emotion/role instruction (at most 100 characters; supported by some voices only)"
    )
    parser.add_argument(
        "--no-auto-extract",
        action="store_true",
        help="Disable automatic instruction extraction"
    )
    parser.add_argument(
        "--enable-ssml",
        action="store_true",
        help="Enable SSML support"
    )
    parser.add_argument(
        "--seed",
        type=int,
        help="Random seed, range 0-65535"
    )
    parser.add_argument(
        "--word-timestamp",
        action="store_true",
        dest="word_timestamp_enabled",
        help="Enable word-level timestamps"
    )
    parser.add_argument(
        "--hot-fix",
        help="Text hot fix (cloned voices only)"
    )
    parser.add_argument(
        "--enable-markdown-filter",
        action="store_true",
        help="Enable Markdown filtering (cloned voices only)"
    )
    parser.add_argument(
        "--output",
        required=True,
        help="Output file path"
    )
    parser.add_argument(
        "--stream",
        action="store_true",
        help="Use streaming output mode"
    )
    parser.add_argument(
        "--no-download",
        action="store_true",
        help="Only print the audio URL; do not download the file"
    )
    args = parser.parse_args()

    # Print a summary of the request before calling the API.
    print("Synthesizing speech...")
    print(f" Original text: {args.text}")

    # Pre-check whether an instruction can be extracted.
    auto_extract = not args.no_auto_extract and not args.instruction
    if auto_extract:
        extracted_instruction, clean_text = extract_instruction(args.text)
        if extracted_instruction:
            print(f" Extracted instruction: {extracted_instruction}")
            print(f" Text to synthesize: {clean_text}")
        else:
            print(f" Text to synthesize: {args.text}")
    else:
        print(f" Text to synthesize: {args.text}")
        if args.instruction:
            print(f" Manual instruction: {args.instruction}")

    print(f" Voice: {args.voice}")
    print(f" Model: {args.model}")
    print(f" Format: {args.format}")
    print(f" Sample rate: {args.sample_rate} Hz")
    print(f" Volume: {args.volume}")
    print(f" Speech rate: {args.rate}")
    print(f" Pitch: {args.pitch}")
    if args.enable_ssml:
        print(" SSML: enabled")
    if args.seed is not None:
        print(f" Random seed: {args.seed}")
    if args.word_timestamp_enabled:
        print(" Word-level timestamps: enabled")
    if args.hot_fix:
        print(f" Hot fix: {args.hot_fix}")
    if args.enable_markdown_filter:
        print(" Markdown filtering: enabled")
    if args.stream:
        print(" Mode: streaming output")
    print()

    # Call the API.
    result = synthesize(
        text=args.text,
        voice=args.voice,
        model=args.model,
        format=args.format,
        sample_rate=args.sample_rate,
        volume=args.volume,
        rate=args.rate,
        pitch=args.pitch,
        instruction=args.instruction,
        enable_ssml=args.enable_ssml,
        seed=args.seed,
        word_timestamp_enabled=args.word_timestamp_enabled,
        hot_fix=args.hot_fix,
        enable_markdown_filter=args.enable_markdown_filter,
        auto_extract_instruction=auto_extract,
        stream=args.stream
    )
    if not result:
        print("Synthesis failed: no valid response received", file=sys.stderr)
        sys.exit(1)

    # Check the response status.
    output = result.get("output", {})
    finish_reason = output.get("finish_reason")
    if finish_reason != "stop":
        print(f"Synthesis did not finish, status: {finish_reason}", file=sys.stderr)
        print(f"Full response: {json.dumps(result, indent=2, ensure_ascii=False)}", file=sys.stderr)
        sys.exit(1)

    # Read the audio information.
    audio_info = output.get("audio", {})
    audio_url = audio_info.get("url")
    audio_id = audio_info.get("id")
    expires_at = audio_info.get("expires_at")
    request_id = result.get("request_id")
    usage = result.get("usage", {})
    characters = usage.get("characters", 0)

    print("Synthesis succeeded!")
    print(f" Request ID: {request_id}")
    print(f" Audio ID: {audio_id}")
    print(f" Characters: {characters}")
    if expires_at:
        from datetime import datetime
        # expires_at is treated as a Unix timestamp in seconds.
        expire_time = datetime.fromtimestamp(expires_at)
        print(f" URL expires at: {expire_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    if args.no_download:
        print(f"Audio URL: {audio_url}")
    else:
        print(f"Downloading audio to: {args.output}")
        if download_audio(audio_url, args.output):
            # Report the size of the downloaded file.
            file_size = os.path.getsize(args.output)
            print("Download complete!")
            print(f" File path: {os.path.abspath(args.output)}")
            print(f" File size: {file_size / 1024:.2f} KB")
        else:
            print("Download failed, but the audio URL remains valid (for 24 hours):")
            print(f" {audio_url}")
            sys.exit(1)
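

# Illustrative sketch, not part of the original tool: the help texts above
# document value ranges (volume 0-100, rate and pitch 0.5-2.0, and so on), but
# the parser does not enforce them. A small factory like the one below could
# be passed as `type=` to add_argument, for example
# `type=_bounded_sketch(float, 0.5, 2.0)` for --rate, so out-of-range values
# are rejected before the API is called. The name `_bounded_sketch` is hypothetical.
def _bounded_sketch(cast, low, high):
    import argparse  # local import keeps the sketch self-contained

    def parse(value):
        try:
            number = cast(value)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid value: {value!r}")
        if not low <= number <= high:
            raise argparse.ArgumentTypeError(
                f"{number} is outside the range {low}-{high}"
            )
        return number

    return parse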
if __name__ == "__main__":
    main()