@clawhub-miyaosk-ae6f467e63
加密货币量化交易 AI Skill。用自然语言描述交易规则 → 生成策略脚本 → 服务器回测 → 参数优化 → 实时监控。 支持 Binance/Hyperliquid 全币种,6 种优化算法(genetic/bayesian/grid/random/annealing/pso),异步进度推送。 Use when...
---
name: dex-quant-skill
version: 3.47.1
description: |
加密货币量化交易 AI Skill。用自然语言描述交易规则 → 生成策略脚本 → 服务器回测 → 参数优化 → 实时监控。
支持 Binance/Hyperliquid 全币种,6 种优化算法(genetic/bayesian/grid/random/annealing/pso),异步进度推送。
Use when user asks to: 推荐策略, 有什么策略, 推荐, 有现成的吗, 不会写, recommend, suggestions,
创建策略, 创建一个策略, 创建, 建策略, 新策略, 写策略, 做策略, 生成策略, 设计策略, 帮我写, create strategy, new strategy, 弄个策略,
回测, 测一下, 测试, 跑一下, 试试, 看看效果, backtest, test, 历史验证,
优化, 调参, 优化参数, 优化策略, 优化下, optimize, tune,
监控, 部署, 上线, 跑起来, 定时执行, 定时跑, 跑策略, 执行策略, 自动执行, 自动跑, 挂着跑, 定时任务, cron, run, deploy, monitor, 盯盘.
This skill MUST be used for ANY request involving trading strategies, backtesting, parameter optimization, or strategy monitoring/execution.
Also triggered when user replies with a number (1-6) following a skill prompt with numbered options.
Do NOT use coding-agent for these tasks.
========== CRITICAL RULE ==========
当用户要求创建/写/建/做策略时,你必须生成 PYTHON 脚本(使用 data_client + indicators 框架)。
⛔ 绝对禁止生成 TradingView Pine Script(//@version=5, strategy(), ta.ema(), ta.crossover())。
⛔ 绝对禁止生成 MQL4/MQL5 或任何非 Python 的代码。
策略脚本必须包含: from data_client import DataClient + from indicators import Indicators as ind + def generate_signals()
详见下方 §1 的 Python 模板。
==================================
allowed-tools:
- Bash
- Read
- Write
runtime:
entry: SKILL.md
setup: bin/setup
env:
- DEX_QUANT_SERVER_URL
- PROXY_URL
- HYPERLIQUID_PRIVATE_KEY
- HYPERLIQUID_ADDRESS
binaries:
- python3
- pip
- pip3
- node
- npm
- openclaw
---
## Preamble (run first)
```python
import subprocess, os, sys
BASE = '{baseDir}'
os.makedirs(os.path.join(BASE, 'strategies'), exist_ok=True)
os.makedirs(os.path.join(BASE, 'output'), exist_ok=True)
try:
import httpx, loguru, matplotlib
print("DEPS_OK")
except ImportError:
print("INSTALLING_DEPS...")
subprocess.run([sys.executable, '-m', 'pip', 'install', '--break-system-packages',
'httpx', 'loguru', 'matplotlib'], capture_output=True)
print("DEPS_INSTALLED")
print(f"BASE={BASE}")
```
If deps install fails → tell user to install manually and **STOP**.
## Workflow routing
Detect the user's intent and execute the matching workflow straight through.
| User says (任意一个即触发) | Workflow | Your FIRST response |
|-----------|----------|---------------------|
| "推荐策略" "有什么策略" "推荐" "有现成的吗" "不会写策略" "不知道怎么写" "有没有好的策略" "recommend" "suggestions" "哪个策略好" "试试什么" | **Recommend** | **直接推荐正收益策略(见下方 §0)** |
| "创建策略" "创建一个策略" "创建" "建策略" "新策略" "做一个策略" "写策略" "做策略" "生成策略" "设计策略" "帮我写一个" "create" "new strategy" "想做一个xx策略" "帮我做" "弄个策略" | Create | Extract params → generate **Python** script (§1),⛔ 禁止 Pine Script |
| "回测" "测一下" "测试" "跑一下" "试试" "看看效果" "backtest" "test" "历史验证" "验证一下" "跑个回测" "看看能不能赚钱" | Backtest | Execute backtest code (§2) |
| "优化" "调参" "优化参数" "优化策略" "优化下" "optimize" "tune" "调优" "提升" "改进参数" | **Optimize** | **⚠️ 见下方硬规则** |
| "监控" "部署" "上线" "跑起来" "定时执行" "定时跑" "跑策略" "执行策略" "自动执行" "自动跑" "挂着跑" "定时任务" "cron" "run" "deploy" "盯盘" "实盘" "开始跑" "启动" | Monitor | Execute monitor setup (§4) |
| Spans multiple (e.g. "建策略然后回测") | Chain | §1 → §2 sequentially |
### ⚠️ 数字回复续接规则(最高优先级)
当用户只回复一个数字(如 "1" "2" "3" "4" "5" "6")或数字+简短文字(如 "1 genetic" "选2"),**必须结合上一轮对话上下文判断**,不要当作新请求。
**数字上下文映射表:**
| 上一轮你问了什么 | 用户回复 | 你应该做什么 |
|-----------------|---------|-------------|
| 优化算法选择 (1-6) | "1" / "genetic" / "遗传" | 执行 §3 用 genetic 算法优化 |
| 优化算法选择 (1-6) | "2" / "bayesian" / "贝叶斯" | 执行 §3 用 bayesian 算法优化 |
| 优化算法选择 (1-6) | "3" / "grid" / "穷举" | 执行 §3 用 grid 算法优化 |
| 优化算法选择 (1-6) | "4" / "random" / "随机" | 执行 §3 用 random 算法优化 |
| 优化算法选择 (1-6) | "5" / "annealing" / "退火" | 执行 §3 用 annealing 算法优化 |
| 优化算法选择 (1-6) | "6" / "pso" / "粒子" | 执行 §3 用 pso 算法优化 |
| 监控/部署请求 | 任何 | 执行 §4 选择模式(信号监控 or 监控+自动下单) |
| 私钥/密钥/钱包设置 | 任何 | 执行 §4 Step 2b 安全链接流程 |
| 回测报告下一步 (1-6) | "1" / "genetic" | 执行 §3 用 genetic 算法优化 |
| 回测报告下一步 | "回测" / "再测一次" | 执行 §2 重新回测 |
| 回测报告下一步 | "部署" / "监控" / "跑起来" | 执行 §4 监控 |
| 推荐策略下一步 (1-3) | "1" | 执行 §2 回测 sol_rsi_momentum.py |
| 推荐策略下一步 (1-3) | "2" | 执行 §2 回测 btc_rsi_momentum.py |
| 推荐策略下一步 (1-3) | "3" | 执行 §1 引导用户创建新策略 |
**关键规则:**
- 用户回复纯数字时,**绝对禁止**当作新对话处理
- 必须回溯上一轮消息,找到对应的选项列表
- 找到后直接执行对应操作,不要再问"你是想选 xx 吗?"
- 如果上下文确实找不到选项列表,才问用户"请问你指的是?"
### ⚠️ "推荐策略"硬规则 — 必须逐字执行
当用户触发 Recommend 工作流(含"推荐策略"/"有什么策略"/"推荐"/"有现成的吗"/"不会写"/"不知道怎么写"/"哪个好"/"recommend" 等),你的回复**必须且只能是以下内容**(逐字复制,不要改写、不要加分析、不要讲策略类型教程):
> 📊 这是我实测过有正收益的策略,直接用就行:
>
> 1️⃣ SOL RSI 动量策略 (sol_rsi_momentum.py)
> 🪙 SOLUSDT · 4h
> 📈 RSI>65 追涨 + RSI<35 追跌,EMA50 趋势过滤
> 💰 2025 回测: +2.27%
>
> 2️⃣ BTC RSI 动量策略 (btc_rsi_momentum.py)
> 🪙 BTCUSDT · 4h
> 📈 RSI>70 极端动量入场,EMA50 过滤,4x ATR trailing
> 💰 2025 回测: +1.40%(B 级评分)
>
> 选一个数字,我帮你回测看最新效果 👇
> 1 — 回测 SOL 策略
> 2 — 回测 BTC 策略
> 3 — 我想自己写一个新策略
**然后等用户回复,不要做任何其他事情。**
**禁止行为(违反任何一条 = 没有遵守 skill):**
- ❌ 讲"趋势跟随、均值回归、突破策略"等策略类型教程
- ❌ 解释各类策略的优缺点
- ❌ 说"你可以试试 EMA 双均线"这种没有文件名的泛泛建议
- ❌ 自由发挥、改写模板、加自己的分析
- ❌ 推荐负收益策略
- ❌ 讨论自己该不该遵守 skill、反思流程(不要 meta 讨论,直接执行)
**❌ BAD — 以下是错误回复(绝对禁止):**
> "最值得试的 4 类:1. 趋势跟随+回踩确认 2. EMA20/60+RSI+ATR 3. 突破策略 4. 波动率过滤趋势策略……"
> "我该先看 skill 里有没有推荐模板……我前面偏离了 skill 的执行纪律……"
**✅ GOOD — 唯一正确的回复就是上方引用块里的模板,逐字复制,不多不少。**
### ⚠️ "优化"硬规则 — 必须逐字执行
当用户触发 Optimize 工作流(含"优化"/"调参"/"调优"/"提升"/"改进参数"/"optimize"/"tune" 等),你的回复**必须且只能是以下内容**(逐字复制,不要改写、不要加分析、不要先给建议):
> 好的,我们用服务器算法自动搜索最优参数。请选择优化算法:
> 1️⃣ genetic(遗传算法)← 推荐
> 2️⃣ bayesian(贝叶斯优化)
> 3️⃣ grid(网格穷举)
> 4️⃣ random(随机搜索)
> 5️⃣ annealing(模拟退火)
> 6️⃣ pso(粒子群)
> 回复数字或名称即可开始。
**然后等用户回复,不要做任何其他事情。**
用户回复后 → 执行 §3 Step 0 + Step 1 代码 → 调用 `run_optimization()`。
**禁止行为(违反任何一条 = 没有遵守 skill):**
- ❌ 在列算法之前先分析策略哪里不好
- ❌ 自己修改策略代码的任何部分
- ❌ 给策略加新指标/过滤器
- ❌ 说"这个策略不值得优化"然后跳过
- ❌ 自己决定要重新设计而不是优化
**你没有权力判断策略值不值得优化。用户说优化,你就优化。**
**Automation posture:** prefer direct execution. Run the code and show results rather than listing steps. Use sensible defaults unless user specifies otherwise.
**Only stop to ask when:**
- Strategy logic is genuinely ambiguous (missing entry/exit conditions)
- Optimization target metric unclear
- Live deployment (always confirm — real money)
**Never stop for:**
- Choice of timeframe, symbol, capital (use defaults)
- Whether to show metrics (always show)
- Whether to retry on error (always retry once)
---
## ⚠️ Output Delivery Rule (Telegram)
用户通过 Telegram 使用本 Skill。你执行代码后,**必须把 stdout 输出作为 TG 消息发送给用户**,而不是仅仅"展示执行结果"。
### 核心流程
```
代码执行 → stdout 输出(print)→ AI 捕获 → 作为 TG 消息发送给用户
```
### 具体规则
1. **纯文本输出** — 执行代码后,将 `print()` 输出的格式化内容**原样作为 TG 消息发送**,不要包裹在代码块里,不要加额外解释
2. **图片 + caption** — 当代码生成了图片(`_equity_chart_path` / `_optimization_chart_path`),用 Bash 执行 `openclaw message send --path "<图片路径>" --caption "<_caption文本>"` 发送图片
3. **禁止只展示不发送** — 不要把执行结果放在代码块或"执行结果"框里给用户看,用户在 TG 上看不到这些
4. **禁止重述/改写** — `print()` 输出已经是格式化好的 tag 样式,直接发,不要用自己的话重写
5. **图片优先原则** — 有图片时只发图片消息(caption 含摘要),不要额外再发文字消息;无图片时发一条文字消息
### 消息类型对照
| 场景 | 发什么 | 怎么发 |
|------|--------|--------|
| 策略已生成 | 文本消息 | stdout 输出原样发送 |
| 回测完成 | **图片** + caption | 发图片附件,caption = `_caption`,不要额外发文字 |
| 优化完成 | **图片** + caption | 发图片附件,caption = `_caption`,不要额外发文字 |
| 监控启动/停止/列表/状态 | 文本消息 | stdout 输出原样发送 |
| 选择提示(算法/模式) | 文本消息 | 逐字发送模板内容 |
注意:回测和优化使用单代码块(`run_server_backtest` / `run_optimization`),执行过程中的 stdout(进度、提交确认等)不需要单独发送,最终只发图片+caption 即可。
---
## §1 Create Strategy
User describes a trading idea → you generate a Python script → save to `{baseDir}/strategies/`.
### Step 1: Extract parameters
From the user's description, extract:
```
SYMBOL: Which coin pair (default: BTCUSDT)
TIMEFRAME: K-line interval (default: 4h)
ENTRY: What triggers buy/long
EXIT: What triggers sell/close
RISK: Stop loss, take profit, position sizing
FILTERS: Volume, volatility, time-of-day
```
If entry/exit conditions are missing, **STOP** and ask. Everything else — use defaults silently.
### Step 2: Generate the script
Save to `{baseDir}/strategies/{name}_strategy.py`. The script is **never executed locally** — its source code is uploaded to the server as a string for backtesting.
```python
import sys
sys.path.insert(0, '{baseDir}/scripts')
from data_client import DataClient
from indicators import Indicators as ind
import numpy as np
def generate_signals(mode='backtest', start_date=None, end_date=None):
dc = DataClient()
df = dc.get_perp_klines("BTCUSDT", "4h", start_date, end_date)
close = df["close"].values.astype(float)
high = df["high"].values.astype(float)
low = df["low"].values.astype(float)
volume = df["volume"].values.astype(float)
# --- Indicators ---
ema_fast = ind.ema(close, 20)
ema_slow = ind.ema(close, 60)
# --- Signals ---
signals = []
lookback = 61 # max indicator period + 1
for i in range(lookback, len(df)):
if np.isnan(ema_fast[i]) or np.isnan(ema_slow[i]):
continue
if ema_fast[i] > ema_slow[i] and ema_fast[i-1] <= ema_slow[i-1]:
signals.append({
"timestamp": str(df.iloc[i]["datetime"]),
"symbol": "BTCUSDT", "action": "buy", "direction": "long",
"confidence": 0.7, "reason": "EMA20 cross up EMA60",
"price_at_signal": float(df["close"].iloc[i]),
})
if ema_fast[i] < ema_slow[i] and ema_fast[i-1] >= ema_slow[i-1]:
signals.append({
"timestamp": str(df.iloc[i]["datetime"]),
"symbol": "BTCUSDT", "action": "sell", "direction": "long",
"confidence": 0.7, "reason": "EMA20 cross down EMA60",
"price_at_signal": float(df["close"].iloc[i]),
})
return {"strategy_name": "EMA Cross Strategy", "signals": signals}
```
### Step 3: Output → 发 TG 消息
策略文件保存后,**发一条 TG 消息**给用户(不是代码块,直接发文本消息):
> ✅ 策略已生成
> 📊 策略: {strategy_name}
> 🪙 交易对: {SYMBOL} · {TIMEFRAME}
> 📈 入场: {entry 一句话}
> 📉 出场: {exit 一句话}
> 📁 文件: {file_path}
> 要回测看看效果吗?
### All pre-built strategies (in `{baseDir}/strategies/`)
| Strategy file | Symbol | Style | 2025 回测 | Tested grade |
|--------------|--------|-------|-----------|--------------|
| `sol_rsi_momentum.py` | SOLUSDT | RSI>65 追涨 + RSI<35 追跌,EMA50 趋势过滤,trailing stop | **+2.27%** | C (7/14) |
| `btc_rsi_momentum.py` | BTCUSDT | RSI>70 极端动量入场,EMA50 过滤,4x ATR trailing | **+1.40%** | **B (10/14)** |
| `sol_kdj_swing.py` | SOLUSDT | KDJ 超卖反弹 + EMA50 趋势过滤,多空双向 | +2.09% | C (6/14) |
| `btc_trend_pullback.py` | BTCUSDT | EMA50 趋势 + EMA20 回踩入场,ATR trailing | -1.21% | C (8/14) |
| `btc_macd_trend.py` | BTCUSDT | MACD 金叉/死叉 + EMA100 方向过滤 | -1.84% | C (7/14) |
**只推荐前 2 个正收益策略。** 其余策略仅在用户主动问起时提及。
All strategies have `PARAMS` dict for optimization. Suggest: "可以用优化功能搜索最优参数"
### Sandbox rules (CRITICAL — violating these causes server backtest to fail)
| Allowed | Blocked |
|---------|---------|
| `sys`, `numpy`, `data_client`, `indicators` | `os`, `subprocess`, `socket`, `requests`, `httpx`, `pandas` |
| `ind.ema()`, `ind.sma()`, `ind.rsi()` | `df.rolling()`, `df.shift()`, `df.apply()` |
| `df["close"].values.astype(float)` | `df["close"].rolling(20).mean()` |
| `float(df["close"].iloc[i])` | `import pandas as pd` |
| `str(df.iloc[i]["datetime"])` | `df.index[i]` or row index `i` as timestamp |
### Signal fields
| Field | Required | Example |
|-------|----------|---------|
| `timestamp` | Yes | `str(df.iloc[i]["datetime"])` |
| `symbol` | Yes | `"BTCUSDT"` |
| `action` | Yes | `buy` / `sell` / `close` / `hold` |
| `direction` | Yes | `long` / `short` |
| `confidence` | Yes | `0.7` (0.0–1.0) |
| `reason` | Yes | `"EMA20 cross up EMA60"` |
| `price_at_signal` | Yes | `float(df["close"].iloc[i])` |
| `suggested_stop_loss` | No | stop loss price |
| `suggested_take_profit` | No | take profit price |
---
## §2 Backtest (server-side, free, unlimited)
**How it works:** Read strategy `.py` → pass source code as string → server fetches K-lines, executes script, simulates trades, returns metrics. You never run the strategy script locally.
```
LOCAL (单代码块) SERVER
┌──────────────────┐ script ┌─────────────────┐
│ run_server_ │ ───────▶ │ Fetch K-lines │
│ backtest() │ │ Execute script │
│ (内部自动轮询) │ ◀─────── │ Simulate trades │
│ print_metrics() │ metrics │ Return report │
└──────────────────┘ └─────────────────┘
```
### Step 1: Run backtest (单代码块,提交+轮询+报告一次完成)
**⚠ 必须用 `run_server_backtest()`!它内置了自动轮询和进度打印,一个代码块搞定全流程。**
**⛔ 禁止拆分为两个代码块!** 拆分后第二个代码块不会被执行,用户收不到结果。
```python
import sys
sys.path.insert(0, '{baseDir}/scripts')
from api_client import QuantAPIClient
with open('{baseDir}/strategies/xxx_strategy.py', 'r') as f:
script_content = f.read()
client = QuantAPIClient(timeout=300.0)
bt = client.run_server_backtest(
script_content=script_content,
strategy_name="策略名",
symbol="BTCUSDT",
timeframe="4h",
start_date="2025-01-01",
end_date="2025-12-31",
leverage=3,
initial_capital=100000,
direction="long_short",
)
```
`run_server_backtest()` 内部会自动:
1. 提交回测任务并打印 `📋 回测已提交: {job_id} | 策略名 (BTCUSDT 4h)`
2. 每 5 秒轮询进度,打印 `⏳ [Xs] stage (N%)`
3. 完成后自动调用 `print_metrics()` 生成报告 + 权益曲线 PNG
**⚠ 代码执行完毕后你 MUST 用 Bash 发送图片:**
```bash
openclaw message send --path "<bt._equity_chart_path的值>" --caption "<bt._caption的值 + 评分建议>"
```
**⛔ 禁止只打印图片路径当文字发。必须用 `openclaw message send --path` 发送图片文件。**
**⛔ 禁止行为:**
- ❌ 自己写分析段落("结果"/"结论"/"我的判断")
- ❌ 用自己的话重述指标数据
- ❌ 忽略 `_caption` 另起炉灶
- ❌ 只发文字不发图片
- ❌ 图片之外再额外发文字消息
- ❌ 拆分为两个代码块(submit + poll 分开执行)
### Backtest parameters
| Param | Default | Options |
|-------|---------|---------|
| `symbol` | `BTCUSDT` | Any Binance perpetual pair |
| `timeframe` | `4h` | `1m` `5m` `15m` `1h` `4h` `1d` |
| `start_date` | `2025-01-01` | YYYY-MM-DD |
| `end_date` | `2025-12-31` | YYYY-MM-DD |
| `leverage` | `3` | 1–125 |
| `initial_capital` | `100000` | USD |
| `direction` | `long_short` | `long_only` `short_only` `long_short` |
### Error handling
| Error | Auto-action |
|-------|-------------|
| `脚本安全检查未通过` | Fix strategy (sandbox violation) — see §1 Sandbox rules |
| `status: failed` | Retry once automatically, then report |
| 执行超时 | `run_server_backtest` 内部自动每 5 秒轮询,无需手动处理 |
| Network error / timeout | Retry once, then report |
### Display rules
`print_trades(bt)` prints full trade table — only needed if user asks for more details.
After completion, suggest next step **based on grade** (append to caption, keep concise):
- A/B 级 → `#优秀` 效果不错,可以直接部署监控
- C/D 级 → `#待优化` 建议用参数优化提升,推荐 genetic
- F 级 → `#失败` 建议重新设计策略逻辑
- Zero trades → `#无信号` 入场条件可能太严格
### Strategy evaluation standard
Server returns a scorecard with 7 metrics, each scored 0-2 (max 14):
| Metric | 🟢 优 (2分) | 🟡 及格 (1分) | 🔴 差 (0分) |
|--------|------------|--------------|------------|
| 收益率 | >20% | >0% | ≤0% |
| Sharpe | >1.5 | >0.5 | ≤0.5 |
| 最大回撤 | <10% | <20% | ≥20% |
| 胜率 | >50% | >35% | ≤35% |
| 盈亏比 | >1.5 | >1.0 | ≤1.0 |
| 交易数 | ≥30 | ≥10 | <10 |
| 爆仓 | 0次 | — | >0次 |
| Grade | Score | Conclusion | Meaning |
|-------|-------|------------|---------|
| A | 12-14 | approved | 优秀策略,可直接实盘 |
| B | 9-11 | approved | 良好策略,建议小仓实盘验证 |
| C | 6-8 | paper_trade_first | 及格策略,建议先模拟观察 |
| D | 3-5 | rejected | 较差策略,需要优化后再测 |
| F | 0-2 | rejected | 失败策略,建议重新设计 |
---
## §3 Optimize (server-side, free, unlimited)
**Reminder:** 触发表里的"优化硬规则"已经规定了你的第一条回复内容。到这里时,用户已经选好了算法。直接执行下面的步骤。
### Step 0: Check if strategy is parameterized
The strategy must have a `PARAMS` dict at the top. If not, refactor it first:
**Before (hardcoded — cannot optimize):**
```python
ema_fast = ind.ema(close, 20)
ema_slow = ind.ema(close, 60)
```
**After (parameterized — ready to optimize):**
```python
PARAMS = {'fast_ema': 20, 'slow_ema': 60, 'rsi_th': 55, 'sl_atr': 1.5, 'tp_atr': 3.0}
def generate_signals(mode='backtest', start_date=None, end_date=None):
fast = PARAMS['fast_ema']
slow = PARAMS['slow_ema']
ema_fast = ind.ema(close, fast)
ema_slow = ind.ema(close, slow)
```
If the strategy needs refactoring, do it silently, save, then continue.
### Step 1: Run optimization (单代码块,提交+轮询+报告一次完成)
**⚠ 必须用 `run_optimization()`!它内置了自动轮询和进度打印(25%/50%/90%里程碑),一个代码块搞定全流程。**
**⛔ 禁止拆分为两个代码块!** 拆分后第二个代码块不会被执行,用户收不到结果。
```python
import sys; sys.path.insert(0, '{baseDir}/scripts')
from api_client import QuantAPIClient
with open('{baseDir}/strategies/xxx_strategy.py', 'r') as f:
script_content = f.read()
client = QuantAPIClient(timeout=600.0)
result = client.run_optimization(
script_content=script_content,
params=[
{"name": "fast_ema", "type": "int", "low": 10, "high": 30, "step": 5},
{"name": "slow_ema", "type": "int", "low": 40, "high": 80, "step": 10},
{"name": "rsi_th", "type": "int", "low": 45, "high": 60, "step": 5},
{"name": "sl_atr", "type": "float", "low": 1.0, "high": 2.0, "step": 0.2},
{"name": "tp_atr", "type": "float", "low": 2.0, "high": 4.0, "step": 0.5},
],
strategy_name="策略优化",
symbol="BTCUSDT", timeframe="4h",
start_date="2025-01-01", end_date="2025-12-31",
fitness_metric="sharpe_ratio",
max_combinations=100,
method="genetic",
)
```
`run_optimization()` 内部会自动:
1. 提交任务并打印 `⏳ 优化任务已提交 (job_id: xxx),共 N 种参数组合`
2. 每隔几秒轮询进度,在 25%/50%/90% 节点打印里程碑
3. 完成后自动调用 `print_optimization()` 生成报告 + 优化图表 PNG
**⚠ 代码执行完毕后你 MUST 用 Bash 发送图片:**
```bash
openclaw message send --path "<result._optimization_chart_path的值>" --caption "<result._caption的值>"
```
**⛔ 禁止只打印图片路径当文字发。必须用 `openclaw message send --path` 发送图片文件。**
**⛔ 禁止行为:**
- ❌ 自己写分析段落替代 caption
- ❌ 用自己的话重述参数和指标
- ❌ 忽略 `_caption` 另起炉灶
- ❌ 只发文字不发图片
- ❌ 图片之外再额外发文字消息
- ❌ 拆分为两个代码块(submit + poll 分开执行)
### Optimization methods
| Method | Best for | When to pick |
|--------|----------|--------------|
| `genetic` | Large param space | **Default** |
| `bayesian` | Few evaluations | User says "快速" |
| `grid` | ≤200 combos | User says "穷举" |
| `random` | High-dimensional | Exploratory |
| `annealing` | Escape local optima | Stuck in bad region |
| `pso` | Continuous params | All-float params |
### Fitness metrics
| Metric | Default |
|--------|---------|
| `sharpe_ratio` | **Yes** — risk-adjusted return |
| `total_return` | Raw total return |
| `max_drawdown` | Minimize drawdown |
| `win_rate` | Maximize win rate |
| `profit_factor` | Gross profit / gross loss |
---
## §4 Monitor & Execute (策略监控 — 服务器模式)
### Step 0: Pre-flight
If the strategy hasn't been backtested, warn: "这个策略还没有回测过,建议先回测。" If user insists, proceed.
### Step 1: 选择模式
When user triggers Monitor workflow, you MUST present this message verbatim:
> 📡 **策略监控部署** — 请选择模式:
>
> 1️⃣ **仅监控信号** — 服务器 7×24 运行,收到信号后你自己操作,不需要私钥
> 2️⃣ **监控 + 自动下单** — 服务器 7×24 运行,信号产生后自动下单到 Hyperliquid
>
> 两种模式都:免费 3 个策略、7×24、无需本地开机。
> 模式 2 需要通过安全链接配置钱包密钥(不在聊天里输入)。
> 回复 1 或 2 选择。
Wait for user to choose before proceeding.
### Mode 1: 仅监控信号(用户选了 1)
```python
import sys; sys.path.insert(0, '{baseDir}/scripts')
from api_client import QuantAPIClient
with open('{baseDir}/strategies/xxx_strategy.py', 'r') as f:
script_content = f.read()
client = QuantAPIClient(timeout=60.0)
result = client.start_monitor(
script_content=script_content,
strategy_name="策略名",
symbol="BTCUSDT",
timeframe="4h",
interval_seconds=14400,
)
print(f"✅ 监控已启动 | Job ID: {result['job_id']} | 配额 {result['quota_used']}/{result['quota_max']}")
```
### Mode 2: 监控 + 自动下单(用户选了 2)
**先配置密钥(安全链接) → 再启动监控。信号产生后服务器自动下单。**
```python
import sys; sys.path.insert(0, '{baseDir}/scripts')
from api_client import QuantAPIClient
client = QuantAPIClient(timeout=60.0)
vault = client.vault_status()
if not vault.get("has_key"):
link = client.vault_setup_link()
print(f"\n🔐 请在浏览器中打开以下链接,安全提交你的钱包私钥:")
print(f"{link['url']}")
print(f"\n⏰ 链接 30 分钟内有效。提交完成后回来告诉我「OK」。")
```
When user confirms key is set:
```python
import sys; sys.path.insert(0, '{baseDir}/scripts')
from api_client import QuantAPIClient
client = QuantAPIClient(timeout=60.0)
vault = client.vault_status()
if not vault.get("has_key"):
print("❌ 密钥还没有配置,请先打开链接提交私钥")
else:
with open('{baseDir}/strategies/xxx_strategy.py', 'r') as f:
script_content = f.read()
result = client.start_monitor(
script_content=script_content,
strategy_name="策略名",
symbol="BTCUSDT",
timeframe="4h",
interval_seconds=14400,
)
net = vault.get("network", "mainnet")
print(f"✅ 监控+自动下单已启动 | Job ID: {result['job_id']} | {net}")
print(f" 产生信号后将自动下单到 Hyperliquid {'测试网' if net == 'testnet' else '主网'}")
```
### 管理命令
- 查看状态: `client.check_monitor(job_id)`
- 列出全部: `client.list_monitors()`
- 停止监控: `client.stop_monitor(job_id)`
- 查看密钥: `client.vault_status()`
- 删除密钥: `client.vault_delete()`
- 重新设置: `client.vault_setup_link()`
---
### 私钥安全规则
**⛔ 绝对不要让用户在聊天中发送私钥!**
如果用户主动发送了私钥,你必须立即回复:
> ⚠️ **安全警告**
>
> 请不要在聊天中发送私钥!私钥会留在聊天记录中,非常不安全。
> 如果这个私钥控制了真实资金,建议立即转移资产并更换钱包。
>
> 正确做法:我帮你生成一个安全链接,你在浏览器里提交私钥,不经过聊天。
> 回复「设置密钥」,我来帮你操作。
---
### Risk rules
| Rule | Default | Effect |
|------|---------|--------|
| 置信度 | ≥ 0.6 | 低于 0.6 的信号不执行 |
| 仓位限制 | 10% equity | 单笔不超过总权益的 10% |
| 并发限制 | 3 positions | 最多同时 3 个仓位 |
| 连续亏损 | 3 次暂停 | 连亏 3 笔自动暂停 (本地模式) |
| 冷却期 | 30 min | 两次交易间隔最短 (本地模式) |
**Always include risk disclaimer:** ⚠️ 实盘交易涉及真实资金风险,建议先用测试网 (HYPERLIQUID_TESTNET=1) 验证。
---
## API Reference
### DataClient (server-side, inside strategy scripts)
```python
dc = DataClient()
df = dc.get_perp_klines("BTCUSDT", "4h", start_date, end_date) # perpetual futures
df = dc.get_spot_klines("BTCUSDT", "1h", start_date, end_date) # spot
# Returns DataFrame: datetime, open, high, low, close, volume
```
Only use `get_perp_klines` and `get_spot_klines`. Do not invent method names.
### Indicators (server-side, inside strategy scripts)
| Method | Signature |
|--------|-----------|
| `ema` | `ind.ema(series, period)` |
| `sma` | `ind.sma(series, period)` |
| `rsi` | `ind.rsi(series, period)` |
| `macd` | `ind.macd(series, fast, slow, signal)` |
| `bollinger_bands` | `ind.bollinger_bands(series, period, num_std)` → (upper, middle, lower) |
| `atr` | `ind.atr(high, low, close, period)` |
| `kdj` | `ind.kdj(high, low, close, k, d, j)` |
| `crossover` | `ind.crossover(a, b)` |
All return **numpy arrays**. Use `arr[i]`, not `.iloc[i]`.
### QuantAPIClient (local, calls server)
| Method | Description |
|--------|-------------|
| `run_server_backtest(...)` | **⭐ 回测必用** — 提交+轮询+报告一次完成 |
| `submit_backtest(...)` | 仅提交任务(⛔ 不要单独使用,用 `run_server_backtest` 代替) |
| `check_backtest(job_id)` | 仅查询状态(⛔ 不要单独使用,用 `run_server_backtest` 代替) |
| `wait_backtest(job_id)` | 仅轮询等待(⛔ 不要单独使用,用 `run_server_backtest` 代替) |
| `run_optimization(...)` | **⭐ 优化必用** — 提交+轮询+报告一次完成,内置进度打印 |
| `submit_optimization(...)` | 仅提交任务(⛔ 不要单独使用,用 `run_optimization` 代替) |
| `check_optimization(job_id)` | 仅查询进度(⛔ 不要单独使用,用 `run_optimization` 代替) |
| `print_metrics(result)` | Display backtest report card |
| `print_optimization(result)` | Display optimization report (auto-called) |
| `start_monitor(script, name, symbol, timeframe, interval)` | 启动服务器监控(最多 3 个同时运行) |
| `check_monitor(job_id)` | 查看监控状态 + 最近信号 |
| `list_monitors()` | 列出我的所有监控任务 |
| `stop_monitor(job_id)` | 停止监控任务 |
| `vault_setup_link()` | 生成一次性安全链接,用户在浏览器中提交私钥 |
| `vault_status()` | 查询密钥是否已配置 |
| `vault_delete()` | 删除已存储的密钥 |
| `print_trades(result)` | Display trade records (only when user asks) |
### Quota
| Feature | Limit |
|---------|-------|
| Strategy generation | Unlimited, free |
| Backtest | Unlimited, free |
| Optimization | Unlimited, free |
| Live monitoring | 3 slots |
---
## NEVER do these
| Forbidden | Why | Correct |
|-----------|-----|---------|
| Run strategy script locally for backtest | Server runs it | `run_server_backtest(script_content=...)` |
| `import os/subprocess/socket` in strategy | Sandbox blocks them | Only `sys`, `numpy`, `data_client`, `indicators` |
| `df.rolling()`, `df.shift()`, `df.apply()` | Server pandas restricted | Use `ind.ema()`, `ind.sma()` etc. |
| Install numpy/pandas for backtest | Server has them | Only `httpx loguru matplotlib` locally |
| Build local backtest engine | Server already has one | Use `run_server_backtest()` |
| 拆分为两个代码块 (submit→poll) | 第二个代码块不会被执行 | 用 `run_server_backtest()` / `run_optimization()` 单代码块 |
| Call `httpx.post()` directly | Missing auth/polling | Use `QuantAPIClient` |
| 用户问"推荐策略"时讲策略类型教程(趋势跟随/均值回归/突破…) | 用户要能直接用的策略,不是上课 | **逐字发送推荐策略硬规则的固定模板**,推荐 2 个正收益策略文件 |
| 只推荐本地运行、不提供服务器监控选项 | 用户可能更想 7×24 服务器监控 | 按 §4 Step 1 先让用户选择模式 |
| 在聊天中索要或接收用户的钱包私钥 | 私钥会留在聊天记录中,极不安全 | 用 `vault_setup_link()` 生成安全链接,用户在浏览器中提交 |
| Manually tweak params + re-backtest when user says "优化" | That's guessing, not optimizing | Use §3 `run_optimization()` |
| Add new indicators/filters when user says "优化" | That's redesign (§1), not optimize (§3) | 优化=调参数, 重新设计=改逻辑 |
| Send text and image as separate TG messages | 用户只看到最后一条 | 一条 TG 图片消息(caption 含指标摘要) |
| Use `` or 只打印路径 | TG 无法渲染本地路径 | `openclaw message send --path <path> --caption <text>` |
| 把 stdout 放在代码块里展示 | 用户在 TG 看不到代码块结果 | 捕获 stdout → 作为 TG 文本消息发送 |
| 自己写分析替代 print 输出 | print 输出已格式化好 | 原样发送 stdout,不改写 |
| 生成 TradingView Pine Script (//@version=5) | 本 Skill 只支持 Python | 用 §1 的 Python 模板生成策略 |
---
## Important Rules
1. **推荐策略 = 逐字发 §0 模板。** 用户问"推荐策略"/"有什么策略"时,禁止讲策略类型教程,必须直接推荐 `sol_rsi_momentum.py` 和 `btc_rsi_momentum.py`。
2. **Backtest first, optimize second.** Get a working strategy before tuning.
3. **单代码块原则。** 回测用 `run_server_backtest()`,优化用 `run_optimization()`,一个代码块搞定全流程。禁止拆分为两个代码块。
4. **所有输出发 TG 消息。** 执行代码后,stdout 输出原样发 TG 文本消息;有图片发 TG 图片消息 + caption。
5. **Retry once on failure.** Automatic, no need to ask.
6. **Indicators return numpy arrays.** `arr[i]` not `.iloc[i]`.
7. **Timestamps: `str(df.iloc[i]["datetime"])`** — never row index.
8. **`lookback` covers longest indicator.** EMA(60) → at least 61 bars warmup.
9. **Descriptive filenames.** `btc_ema_cross_strategy.py`, not `strategy1.py`.
10. **One strategy per file.** Never bundle.
11. **Local deps: `httpx`, `loguru`, `matplotlib`.** Don't install numpy/pandas — server has them.
FILE:clawhub.json
{
"name": "dex-quant-skill",
"tagline": "自然语言 → 量化策略脚本 → 回测验证 → 实时交易",
"description": "加密货币量化交易 AI Skill 技能包。用户用自然语言描述交易规则,AI 自动生成可运行的策略脚本(Python),支持技术指标(MACD/RSI/布林带/KDJ 等)、社交媒体、链上数据等多种信号源。内置回测引擎验证策略历史表现,通过后可部署实时监控执行。",
"version": "3.47.1",
"author": "miyaosk",
"license": "MIT",
"category": "data",
"tags": [
"crypto",
"quant",
"trading",
"backtest",
"strategy",
"optimize",
"binance",
"defi"
],
"repository": "https://github.com/miyaosk/dex-quant-skill",
"support": {
"issues": "https://github.com/miyaosk/dex-quant-skill/issues"
},
"permissions": [
"network",
"filesystem"
],
"skills": []
}
FILE:conductor.json
{
"scripts": {
"setup": "bin/setup"
}
}
FILE:schemas/signal_format.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "StrategySignalOutput",
"description": "策略脚本的标准输出格式。所有策略脚本(py/ts)运行后必须输出此格式的 JSON。",
"type": "object",
"required": ["strategy_name", "strategy_version", "generated_at", "signals"],
"properties": {
"strategy_name": {
"type": "string",
"description": "策略名称"
},
"strategy_version": {
"type": "string",
"description": "策略版本号,如 v1.0"
},
"generated_at": {
"type": "string",
"format": "date-time",
"description": "信号生成时间 (ISO 8601)"
},
"mode": {
"type": "string",
"enum": ["live", "backtest"],
"description": "运行模式:live 实时 / backtest 回测"
},
"signals": {
"type": "array",
"items": {
"$ref": "#/definitions/Signal"
},
"description": "信号列表,可以为空(表示当前无信号)"
},
"warning": {
"type": "string",
"description": "可选的警告信息(如社媒策略不支持回测)"
}
},
"definitions": {
"Signal": {
"type": "object",
"required": ["symbol", "action", "confidence", "reason", "source_type"],
"properties": {
"symbol": {
"type": "string",
"description": "交易对,如 BTCUSDT",
"examples": ["BTCUSDT", "ETHUSDT", "SOLUSDT"]
},
"action": {
"type": "string",
"enum": ["buy", "sell", "close", "hold"],
"description": "交易动作:buy 买入 / sell 卖出 / close 平仓 / hold 持有"
},
"confidence": {
"type": "number",
"minimum": 0,
"maximum": 1,
"description": "信号置信度,0 到 1"
},
"reason": {
"type": "string",
"description": "人类可读的触发原因,如 'MACD 金叉 + 成交量放大 1.8x'"
},
"price_at_signal": {
"type": "number",
"description": "信号触发时的价格"
},
"suggested_stop_loss": {
"type": "number",
"description": "建议的止损价格"
},
"suggested_take_profit": {
"type": "number",
"description": "建议的止盈价格"
},
"source_type": {
"type": "string",
"enum": ["technical", "social", "onchain", "mixed"],
"description": "信号来源类型:technical 技术指标 / social 社媒 / onchain 链上 / mixed 混合"
},
"timestamp": {
"type": "string",
"description": "信号对应的时间点"
},
"metadata": {
"type": "object",
"description": "触发条件的具体数值,如 {macd_dif: 150, rsi: 62}",
"additionalProperties": true
}
}
}
}
}
FILE:scripts/api_client.py
"""
DEX Quant Server API 客户端 — 信号驱动架构(含 Token 认证)
Skill 端调用流程:
1. strategy-maker 生成策略脚本
2. 本地运行脚本,拿到信号列表
3. 调 run_backtest() 把信号发给 Server(自动携带 Token)
4. Server 拉 K 线(带缓存)+ 回测引擎回放信号
5. 返回绩效结果,展示给用户
认证:
- 首次使用自动生成随机设备ID并注册,获取 Token(同时最多 3 个策略监控)
- Token 缓存在 skill 目录下的 .auth.json
- 所有请求自动携带 X-Token 头
用法:
client = QuantAPIClient("https://quant.supersafeclaw.com")
result = client.run_backtest(
strategy_name="BTC MACD 策略",
symbol="BTCUSDT",
timeframe="1h",
start_date="2024-01-01",
end_date="2024-12-31",
signals=[...],
)
client.print_metrics(result)
"""
from __future__ import annotations
import os
import time as _time
from pathlib import Path
from typing import Optional
import httpx
from loguru import logger
from machine_auth import MachineAuth
from server_config import resolve_server_url
API_PREFIX = "/api/v1"
class QuantAPIClient:
"""DEX Quant Server HTTP 客户端(自动认证)"""
def __init__(self, server_url: str | None = None, timeout: float = 300.0):
self.server_url = resolve_server_url(server_url)
self.base_url = self.server_url.rstrip("/") + API_PREFIX
self._client = httpx.Client(timeout=timeout)
self._auth = MachineAuth(self.server_url)
self._token = self._auth.register_or_load()
def _headers(self) -> dict:
return {"X-Token": self._token}
# ═══════════════ 回测 ═══════════════
def run_backtest(
self,
strategy_name: str,
symbol: str,
timeframe: str,
start_date: str,
end_date: str,
signals: list[dict],
strategy_id: str = "",
initial_capital: float = 100_000.0,
leverage: int = 1,
fee_rate: float = 0.0005,
slippage_bps: float = 5.0,
margin_mode: str = "isolated",
direction: str = "long_short",
) -> dict:
"""
提交信号驱动回测。
参数:
strategy_name: 策略名称
symbol: 交易对 (BTCUSDT)
timeframe: K 线周期 (15m / 1h / 2h / 1d)
start_date: 开始日期 "YYYY-MM-DD"
end_date: 结束日期 "YYYY-MM-DD"
signals: 信号列表,每个信号包含:
timestamp, symbol, action (buy/sell/close),
direction (long/short), confidence, reason,
price_at_signal, suggested_stop_loss, suggested_take_profit
返回:
BacktestResponse 字典:
backtest_id, status, metrics, trades, equity_curve, conclusion
"""
payload = {
"strategy_name": strategy_name,
"strategy_id": strategy_id,
"symbol": symbol,
"timeframe": timeframe,
"start_date": start_date,
"end_date": end_date,
"signals": signals,
"initial_capital": initial_capital,
"leverage": leverage,
"fee_rate": fee_rate,
"slippage_bps": slippage_bps,
"margin_mode": margin_mode,
"direction": direction,
}
logger.info(
"提交回测 | {} {} {} | {} → {} | {} 个信号",
strategy_name, symbol, timeframe, start_date, end_date, len(signals),
)
resp = self._client.post(f"{self.base_url}/backtest/run", json=payload, headers=self._headers())
resp.raise_for_status()
result = resp.json()
status = result.get("status", "unknown")
if status == "completed":
metrics = result.get("metrics", {})
logger.info(
"回测完成 | 收益={:.2%} | Sharpe={:.2f} | 回撤={:.2%} | "
"交易={} | 结论={}",
metrics.get("total_return_pct", 0),
metrics.get("sharpe_ratio", 0),
abs(metrics.get("max_drawdown_pct", 0)),
metrics.get("total_trades", 0),
result.get("conclusion", ""),
)
else:
logger.error("回测失败 | {}", result.get("error"))
return result
def get_backtest(self, backtest_id: str) -> dict:
"""查询已保存的回测结果"""
resp = self._client.get(f"{self.base_url}/backtest/{backtest_id}", headers=self._headers())
resp.raise_for_status()
return resp.json()
def get_trades(self, backtest_id: str) -> dict:
"""获取回测交易记录"""
resp = self._client.get(f"{self.base_url}/backtest/{backtest_id}/trades", headers=self._headers())
resp.raise_for_status()
return resp.json()
def get_equity(self, backtest_id: str) -> dict:
"""获取权益曲线"""
resp = self._client.get(f"{self.base_url}/backtest/{backtest_id}/equity", headers=self._headers())
resp.raise_for_status()
return resp.json()
# ═══════════════ 数据 ═══════════════
def get_klines(
self,
symbol: str,
interval: str,
start_date: str,
end_date: str,
exchange: str = "binance",
) -> list[dict]:
"""获取 K 线数据(Server 端带缓存,同币同周期不重复下载)"""
payload = {
"symbol": symbol,
"interval": interval,
"start_date": start_date,
"end_date": end_date,
"exchange": exchange,
}
resp = self._client.post(f"{self.base_url}/data/klines", json=payload, headers=self._headers())
resp.raise_for_status()
result = resp.json()
logger.info("K线 | {} {} | {} 条", symbol, interval, result.get("rows", 0))
return result.get("data", [])
def list_symbols(self, exchange: str = "binance") -> list[str]:
"""列出可用交易对"""
resp = self._client.get(f"{self.base_url}/data/symbols", params={"exchange": exchange}, headers=self._headers())
resp.raise_for_status()
return resp.json().get("symbols", [])
# ═══════════════ 策略 ═══════════════
def save_strategy(
self,
name: str,
script_content: str = "",
description: str = "",
symbol: str = "BTCUSDT",
timeframe: str = "1h",
direction: str = "long_short",
version: str = "v1.0",
tags: list[str] = None,
) -> dict:
"""保存策略到 Server(含脚本源码)"""
payload = {
"name": name,
"description": description,
"script_content": script_content,
"symbol": symbol,
"timeframe": timeframe,
"direction": direction,
"version": version,
"tags": tags or [],
}
resp = self._client.post(f"{self.base_url}/strategies/", json=payload, headers=self._headers())
resp.raise_for_status()
result = resp.json()
logger.info("策略已保存 | {} ({})", name, result.get("strategy_id", ""))
return result
def list_strategies(self) -> list[dict]:
"""列出所有策略"""
resp = self._client.get(f"{self.base_url}/strategies/", headers=self._headers())
resp.raise_for_status()
return resp.json()
def get_strategy(self, strategy_id: str) -> dict:
"""获取策略详情(含脚本源码)"""
resp = self._client.get(f"{self.base_url}/strategies/{strategy_id}", headers=self._headers())
resp.raise_for_status()
return resp.json()
# ═══════════════ 信号 ═══════════════
def save_signals(self, strategy_id: str, signals: list[dict]) -> dict:
"""批量保存信号到 Server"""
resp = self._client.post(
f"{self.base_url}/signals/batch",
json=signals,
params={"strategy_id": strategy_id},
headers=self._headers(),
)
resp.raise_for_status()
return resp.json()
def query_signals(
self,
strategy_id: str = None,
symbol: str = None,
start_date: str = None,
end_date: str = None,
limit: int = 200,
) -> dict:
"""查询信号"""
payload = {"limit": limit}
if strategy_id:
payload["strategy_id"] = strategy_id
if symbol:
payload["symbol"] = symbol
if start_date:
payload["start_date"] = start_date
if end_date:
payload["end_date"] = end_date
resp = self._client.post(f"{self.base_url}/signals/query", json=payload, headers=self._headers())
resp.raise_for_status()
return resp.json()
# ═══════════════ 服务器端执行回测 ═══════════════
def submit_backtest(
self,
script_content: str,
strategy_name: str,
symbol: str = "BTCUSDT",
timeframe: str = "4h",
start_date: str = "",
end_date: str = "",
strategy_id: str = "",
initial_capital: float = 100_000.0,
leverage: int = 1,
fee_rate: float = 0.0005,
slippage_bps: float = 5.0,
margin_mode: str = "isolated",
direction: str = "long_short",
) -> str:
"""
提交回测任务,立即返回 job_id(不等待结果)。
用 check_backtest(job_id) 查看进度和获取结果。
"""
payload = {
"script_content": script_content,
"strategy_name": strategy_name,
"strategy_id": strategy_id,
"symbol": symbol,
"timeframe": timeframe,
"start_date": start_date,
"end_date": end_date,
"initial_capital": initial_capital,
"leverage": leverage,
"fee_rate": fee_rate,
"slippage_bps": slippage_bps,
"margin_mode": margin_mode,
"direction": direction,
}
resp = self._client.post(
f"{self.base_url}/backtest/submit",
json=payload,
headers=self._headers(),
)
resp.raise_for_status()
job_id = resp.json()["job_id"]
print(
f"📋 回测已提交: {job_id} | {strategy_name} ({symbol} {timeframe}, {start_date} → {end_date})",
flush=True,
)
return job_id
def check_backtest(self, job_id: str) -> dict:
"""
查询回测任务状态。返回 dict,status 为 running/completed/failed。
completed 时包含完整的 metrics/trades/equity_curve。
"""
resp = self._client.get(
f"{self.base_url}/backtest/job/{job_id}",
headers=self._headers(),
)
resp.raise_for_status()
job = resp.json()
status = job.get("status", "running")
stage = job.get("stage_label", "")
progress = job.get("progress_pct", 0)
elapsed_s = job.get("elapsed_ms", 0) / 1000
if status == "running":
print(f"⏳ [{elapsed_s:.0f}s] {stage} ({progress:.0f}%)", flush=True)
elif status == "completed":
print(f"✅ 回测完成(耗时 {elapsed_s:.1f}s)", flush=True)
elif status == "failed":
print(f"❌ 回测失败: {job.get('error', '未知错误')}", flush=True)
return job
def wait_backtest(
self,
job_id: str,
poll_interval: float = 5.0,
max_running_logs: int | None = None,
) -> dict:
"""
轮询等待回测完成。
参数:
job_id: submit_backtest() 返回的任务 ID
poll_interval: 轮询间隔秒数
max_running_logs: 最多打印多少条 running 进度,None 表示不限制
"""
running_logs = 0
last_stage = None
last_progress = None
while True:
_time.sleep(poll_interval)
job = self.check_backtest(job_id)
status = job.get("status")
if status == "running":
running_logs += 1
stage = job.get("stage_label")
progress = job.get("progress_pct")
should_stop_logging = (
max_running_logs is not None and running_logs >= max_running_logs
)
if should_stop_logging and stage == last_stage and progress == last_progress:
print("⏳ 回测仍在执行中,继续等待...", flush=True)
last_stage = stage
last_progress = progress
if should_stop_logging:
max_running_logs = None
continue
if status in ("completed", "failed"):
return job
def run_server_backtest(
self,
script_content: str,
strategy_name: str,
symbol: str = "BTCUSDT",
timeframe: str = "4h",
start_date: str = "",
end_date: str = "",
strategy_id: str = "",
initial_capital: float = 100_000.0,
leverage: int = 1,
fee_rate: float = 0.0005,
slippage_bps: float = 5.0,
margin_mode: str = "isolated",
direction: str = "long_short",
poll_interval: float = 5.0,
) -> dict:
"""
提交 + 轮询一步到位(适合支持流式输出的平台)。
如果平台不支持流式输出(如 OpenClaw),请改用:
1. job_id = client.submit_backtest(...)
2. result = client.check_backtest(job_id)
"""
job_id = self.submit_backtest(
script_content=script_content,
strategy_name=strategy_name,
symbol=symbol,
timeframe=timeframe,
start_date=start_date,
end_date=end_date,
strategy_id=strategy_id,
initial_capital=initial_capital,
leverage=leverage,
fee_rate=fee_rate,
slippage_bps=slippage_bps,
margin_mode=margin_mode,
direction=direction,
)
result = self.wait_backtest(job_id, poll_interval=poll_interval)
if result.get("status") == "completed":
self.print_metrics(result)
return result
# ═══════════════ 参数优化 ═══════════════
def run_optimization(
self,
script_content: str,
params: list[dict],
strategy_name: str = "",
symbol: str = "BTCUSDT",
timeframe: str = "4h",
start_date: str = "",
end_date: str = "",
initial_capital: float = 100_000.0,
leverage: int = 3,
fee_rate: float = 0.0005,
slippage_bps: float = 5.0,
margin_mode: str = "isolated",
direction: str = "long_short",
method: str = "grid",
max_combinations: int = 200,
fitness_metric: str = "sharpe_ratio",
poll_interval: int = 10,
) -> dict:
"""
参数优化 — 提交任务后自动轮询进度,完成后返回结果。
脚本中用 PARAMS['xxx'] 引用可调参数。
服务器异步执行,客户端每 poll_interval 秒查一次进度并打印。
参数:
params: 参数空间列表,每项:
{"name": "fast_ema", "type": "int", "low": 5, "high": 30, "step": 5}
{"name": "sl_pct", "type": "float", "low": 0.01, "high": 0.10, "step": 0.01}
{"name": "direction", "type": "choice", "choices": ["long", "short"]}
method: 搜索算法
"grid" — 网格穷举(组合数 ≤ 200)
"genetic" — 遗传算法(推荐,大空间通用)
"bayesian" — 贝叶斯 TPE(少量评估快速收敛)
"random" — 随机采样
"annealing" — 模拟退火
"pso" — 粒子群优化
fitness_metric: 优化目标 (sharpe_ratio / total_return_pct / sortino_ratio / win_rate)
poll_interval: 轮询间隔秒数(默认10秒)
返回:
{status, best_params, best_fitness, results: [{rank, params, fitness, metrics...}]}
"""
payload = {
"script_content": script_content,
"params": params,
"strategy_name": strategy_name,
"symbol": symbol,
"timeframe": timeframe,
"start_date": start_date,
"end_date": end_date,
"initial_capital": initial_capital,
"leverage": leverage,
"fee_rate": fee_rate,
"slippage_bps": slippage_bps,
"margin_mode": margin_mode,
"direction": direction,
"method": method,
"max_combinations": max_combinations,
"fitness_metric": fitness_metric,
}
logger.info(
"提交参数优化 | {} {} {} | {} → {} | 目标={}",
strategy_name, symbol, timeframe, start_date, end_date, fitness_metric,
)
resp = self._client.post(
f"{self.base_url}/backtest/optimize",
json=payload,
headers=self._headers(),
timeout=30.0,
)
resp.raise_for_status()
submit_result = resp.json()
job_id = submit_result.get("job_id")
total = submit_result.get("total_combinations", 0)
logger.info("任务已提交 | job_id={} | 共{}种组合", job_id, total)
print(f"\n⏳ 优化任务已提交 (job_id: {job_id}),共 {total} 种参数组合\n")
last_completed = 0
printed_milestones = set()
milestones = {25, 50, 90}
interval = 5
while True:
_time.sleep(interval)
try:
resp = self._client.get(
f"{self.base_url}/backtest/optimize/{job_id}",
headers=self._headers(),
timeout=15.0,
)
resp.raise_for_status()
progress = resp.json()
except Exception as e:
logger.warning("查询进度失败: {}", e)
interval = min(interval * 2, 300)
continue
status = progress.get("status", "running")
completed = progress.get("completed", 0)
failed = progress.get("failed", 0)
progress_pct = progress.get("progress_pct", 0)
best_fitness = progress.get("current_best_fitness", 0)
best_params = progress.get("current_best_params", {})
elapsed = progress.get("elapsed_ms", 0)
for ms in sorted(milestones):
if ms not in printed_milestones and progress_pct >= ms:
params_str = ", ".join(f"{k}={v}" for k, v in best_params.items()) if best_params else "-"
print(
f" 📊 {ms}% ({completed}/{total}) | "
f"最优 fitness={best_fitness:.4f} | "
f"{params_str} | "
f"{elapsed/1000:.0f}s"
)
printed_milestones.add(ms)
if completed > last_completed:
done_delta = completed - last_completed
time_per_item = (elapsed / 1000) / max(completed, 1)
remaining = (total - completed) * time_per_item
interval = max(5, min(remaining / 4, 300))
last_completed = completed
if status == "completed":
logger.info(
"优化完成 | 评估={} 失败={} | 最优fitness={:.4f} | 耗时={}ms",
completed - failed, failed, best_fitness, elapsed,
)
QuantAPIClient.print_optimization(progress, strategy_name=strategy_name)
return progress
if status == "failed":
logger.error("优化失败: {}", progress.get("error", ""))
print(f"\n❌ 优化失败: {progress.get('error', '未知错误')}")
return progress
def submit_optimization(
self,
script_content: str,
params: list[dict],
strategy_name: str = "",
symbol: str = "BTCUSDT",
timeframe: str = "4h",
start_date: str = "",
end_date: str = "",
initial_capital: float = 100_000.0,
leverage: int = 3,
fee_rate: float = 0.0005,
slippage_bps: float = 5.0,
margin_mode: str = "isolated",
direction: str = "long_short",
method: str = "grid",
max_combinations: int = 200,
fitness_metric: str = "sharpe_ratio",
) -> str:
"""提交优化任务,立即返回 job_id(不等待结果)。"""
payload = {
"script_content": script_content,
"params": params,
"strategy_name": strategy_name,
"symbol": symbol, "timeframe": timeframe,
"start_date": start_date, "end_date": end_date,
"initial_capital": initial_capital, "leverage": leverage,
"fee_rate": fee_rate, "slippage_bps": slippage_bps,
"margin_mode": margin_mode, "direction": direction,
"method": method, "max_combinations": max_combinations,
"fitness_metric": fitness_metric,
}
resp = self._client.post(
f"{self.base_url}/backtest/optimize",
json=payload, headers=self._headers(), timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
job_id = data.get("job_id", "")
total = data.get("total_combinations", 0)
print(f"📋 优化任务已提交: {job_id} | {strategy_name} ({symbol} {timeframe}) | {method} {total}组")
return job_id
def check_optimization(self, job_id: str, strategy_name: str = "") -> dict:
"""查询优化进度。completed 时自动打印报告和生成图片。"""
resp = self._client.get(
f"{self.base_url}/backtest/optimize/{job_id}",
headers=self._headers(), timeout=15.0,
)
resp.raise_for_status()
result = resp.json()
status = result.get("status", "running")
completed = result.get("completed", 0)
failed = result.get("failed", 0)
total = result.get("total", 0)
elapsed = result.get("elapsed_ms", 0) / 1000
pct = result.get("progress_pct", 0)
best = result.get("current_best_fitness", 0)
if status == "running":
print(f"⏳ [{elapsed:.0f}s] {completed}/{total} 已评估 ({pct:.0f}%) | 最优 fitness={best:.4f}")
elif status == "completed":
print(f"✅ 优化完成 ({completed}组, 失败{failed}, 耗时{elapsed:.0f}s)")
QuantAPIClient.print_optimization(result, strategy_name=strategy_name)
elif status == "failed":
print(f"❌ 优化失败: {result.get('error', '')}")
return result
@staticmethod
def print_optimization(result: dict, strategy_name: str = "") -> None:
"""生成优化报告 PNG + caption,和回测报告同一套输出规则。"""
status = result.get("status", "")
if status != "completed":
print(f"优化失败: {result.get('error', '未知错误')}")
return
total = result.get("total", result.get("total_combinations", 0))
completed = result.get("completed", result.get("evaluated", 0))
failed = result.get("failed", 0)
elapsed = result.get("elapsed_ms", 0) / 1000
method = result.get("method", "genetic")
results = result.get("results", [])
success = completed - failed
name = strategy_name or "策略"
method_names = {
"grid": "网格穷举", "genetic": "遗传算法", "bayesian": "贝叶斯",
"random": "随机搜索", "annealing": "模拟退火", "pso": "粒子群",
}
method_label = method_names.get(method, method)
lines = [
f"🔧 {name} 参数优化报告",
f"━━━━━━━━━━━━━━━━━━━━",
f"🧬 算法 {method_label} ⏱️ 耗时 {elapsed:.0f}s",
f"📊 评估 {success}/{total}组 ❌ 失败 {failed}组",
]
if not results:
lines.append("━━━━━━━━━━━━━━━━━━━━")
lines.append("❌ 无有效结果,所有参数组合均失败")
caption = "\n".join(lines)
result["_caption"] = caption
print(f"\n{caption}")
return
top = results[0]
top_ret = top.get("total_return_pct", 0)
top_sharpe = top.get("sharpe_ratio", 0)
top_dd = abs(top.get("max_drawdown_pct", 0))
top_wr = top.get("win_rate", 0)
top_trades = top.get("total_trades", 0)
top_params = top.get("params", {})
ret_icon = "📈" if top_ret >= 0 else "📉"
lines.append(f"━━━━━━━━━━━━━━━━━━━━")
lines.append(f"🥇 最优参数:")
params_str = " ".join(f"{k}={v}" for k, v in top_params.items())
lines.append(f" {params_str}")
lines.append(f"━━━━━━━━━━━━━━━━━━━━")
lines.append(f"{ret_icon} 收益 {top_ret:+.2%} 📐 Sharpe {top_sharpe:.2f}")
lines.append(f"⚡ 回撤 {top_dd:.2%} 🎯 胜率 {top_wr:.0%} 🔄 交易 {top_trades}笔")
if len(results) > 1:
lines.append(f"━━━━━━━━━━━━━━━━━━━━")
lines.append(f"📋 Top {min(len(results), 5)} 排名")
medals = ["🥇", "🥈", "🥉"]
for r in results[:5]:
rank = r.get("rank", 1)
medal = medals[rank - 1] if rank <= 3 else f"#{rank}"
ret = r.get("total_return_pct", 0)
sharpe = r.get("sharpe_ratio", 0)
dd = abs(r.get("max_drawdown_pct", 0))
wr = r.get("win_rate", 0)
trades = r.get("total_trades", 0)
lines.append(f" {medal} {ret:+.2%} Sharpe {sharpe:.2f} 回撤 {dd:.2%} 胜率 {wr:.0%} {trades}笔")
lines.append(f"━━━━━━━━━━━━━━━━━━━━")
if top_sharpe >= 1.5 and top_ret > 0.1:
lines.append(f"✅ 优化结果优秀,建议用最优参数回测验证后小仓实盘")
elif top_sharpe >= 0.5 and top_ret > 0:
lines.append(f"⚠️ 优化结果尚可,建议用最优参数回测验证")
elif top_ret > 0:
lines.append(f"⚠️ 优化结果偏弱(Sharpe {top_sharpe:.2f}),建议改进策略结构后重新优化")
else:
lines.append(f"❌ 最优参数仍亏损,建议重新设计策略逻辑")
lines.append(f"")
lines.append(f"🔄 下一步: 回复「回测」用最优参数跑完整回测验证")
caption = "\n".join(lines)
result["_caption"] = caption
chart_path = QuantAPIClient._print_optimization_chart(results, name, method_label, success, total, failed, elapsed)
if chart_path:
result["_optimization_chart_path"] = chart_path
print(f"\n{caption}")
if chart_path:
print(f"\n[SYSTEM] 图片: {chart_path}")
print(f"[SYSTEM] 发送图片附件 + 上方 caption,禁止额外文字")
@staticmethod
def _print_optimization_chart(results: list, strategy_name: str, method_label: str,
success: int, total: int, failed: int, elapsed: float) -> str | None:
"""生成优化报告 PNG(排名表 + 收益对比条形图)。"""
if not results:
return None
try:
import matplotlib
except ImportError:
return None
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
QuantAPIClient._setup_chinese_font()
BG = "#0f0f1a"
CARD = "#1c2333"
WHITE = "#e8e8e8"
GREEN = "#00d4aa"
RED = "#ff6b6b"
GRAY = "#8a8fa0"
LIGHT = "#c8cad0"
YELLOW = "#ffd93d"
top5 = results[:5]
n = len(top5)
fig = plt.figure(figsize=(12, max(4, 1.5 + n * 1.2)), facecolor=BG)
gs = GridSpec(2, 1, figure=fig, height_ratios=[1, max(2, n * 0.8)],
hspace=0.25, left=0.05, right=0.68, top=0.92, bottom=0.06)
ax_hdr = fig.add_subplot(gs[0])
ax_hdr.set_facecolor(BG)
ax_hdr.set_xlim(0, 10)
ax_hdr.set_ylim(0, 3)
ax_hdr.axis("off")
safe_name_title = QuantAPIClient._safe_title(strategy_name or "Strategy")
ax_hdr.text(5, 2.4, f"{safe_name_title} — Optimization", fontsize=14,
color=WHITE, ha="center", va="center", fontweight="bold")
ax_hdr.text(5, 1.5, f"Algorithm: {method_label} | Evaluated: {success}/{total} | Failed: {failed} | Time: {elapsed:.0f}s",
fontsize=9, color=GRAY, ha="center", va="center")
top = top5[0]
top_ret = top.get("total_return_pct", 0)
top_sharpe = top.get("sharpe_ratio", 0)
top_dd = abs(top.get("max_drawdown_pct", 0))
top_wr = top.get("win_rate", 0)
top_trades = top.get("total_trades", 0)
kpi_items = [
("Return", f"{top_ret:+.2%}", GREEN if top_ret >= 0 else RED),
("Sharpe", f"{top_sharpe:.2f}", GREEN if top_sharpe >= 0.5 else (YELLOW if top_sharpe > 0 else RED)),
("MaxDD", f"{top_dd:.2%}", GREEN if top_dd < 0.05 else (YELLOW if top_dd < 0.15 else RED)),
("WinRate", f"{top_wr:.0%}", GREEN if top_wr >= 0.5 else (YELLOW if top_wr >= 0.35 else RED)),
("Trades", f"{top_trades}", LIGHT),
]
for i, (label, val, clr) in enumerate(kpi_items):
x = 1 + i * 1.8
ax_hdr.text(x, 0.6, val, fontsize=12, color=clr, ha="center", va="center", fontweight="bold")
ax_hdr.text(x, 0.1, label, fontsize=7, color=GRAY, ha="center", va="center")
ax_bar = fig.add_subplot(gs[1])
ax_bar.set_facecolor(CARD)
ranks = list(range(n, 0, -1))
returns = [r.get("total_return_pct", 0) * 100 for r in top5]
colors = [GREEN if r >= 0 else RED for r in returns]
medals = ["#1", "#2", "#3", "#4", "#5"]
bars = ax_bar.barh(ranks, returns, color=colors, height=0.6, alpha=0.85)
x_min = min(returns) if returns else 0
x_max = max(returns) if returns else 0
x_range = x_max - x_min if x_max != x_min else 1
for i, r in enumerate(top5):
sharpe = r.get("sharpe_ratio", 0)
wr = r.get("win_rate", 0)
trades = r.get("total_trades", 0)
params = r.get("params", {})
params_short = ", ".join(f"{k}={v}" for k, v in list(params.items())[:5])
if len(params) > 5:
params_short += " ..."
ax_bar.text(1.02, ranks[i] + 0.18, f"Sharpe {sharpe:.2f} WR {wr:.0%} {trades}t",
fontsize=7.5, color=LIGHT, va="center", transform=ax_bar.get_yaxis_transform())
ax_bar.text(1.02, ranks[i] - 0.18, params_short,
fontsize=6.5, color=GRAY, va="center", transform=ax_bar.get_yaxis_transform())
ax_bar.set_yticks(ranks)
ax_bar.set_yticklabels([medals[i] for i in range(n)], fontsize=10, color=WHITE, fontweight="bold")
ax_bar.set_xlabel("Return %", fontsize=9, color=GRAY)
ax_bar.tick_params(axis="x", colors=GRAY, labelsize=8)
ax_bar.axvline(x=0, color=GRAY, linewidth=0.5, alpha=0.5)
ax_bar.spines["top"].set_visible(False)
ax_bar.spines["right"].set_visible(False)
ax_bar.spines["bottom"].set_color(GRAY)
ax_bar.spines["left"].set_color(GRAY)
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output")
Path(output_dir).mkdir(parents=True, exist_ok=True)
safe_name = (strategy_name or "optimize").replace(" ", "_").replace("/", "_")[:30]
ts = int(_time.time())
filepath = os.path.join(output_dir, f"{safe_name}_opt_{ts}.png")
fig.savefig(filepath, dpi=150, facecolor=fig.get_facecolor())
plt.close(fig)
return filepath
# ═══════════════ 配额 ═══════════════
def check_quota(self) -> dict:
"""查询当前机器码的策略配额"""
return self._auth.check_quota()
def print_quota(self) -> None:
"""打印配额信息"""
self._auth.print_quota()
# ═══════════════ 展示工具 ═══════════════
@staticmethod
def print_metrics(result: dict) -> None:
"""生成回测报告图片并打印 caption + 文件路径。
所有指标、评分、资金曲线合并到一张 PNG 中。
AI 只需发送这一张图片(附带 caption),无需单独发文字。
"""
if result.get("status") != "completed":
print(f"回测失败: {result.get('error', '未知错误')}")
return
m = result.get("metrics", {})
ret = m.get('total_return_pct', 0)
bal = m.get('final_balance', 0)
init_cap = m.get('initial_capital', 100000)
margin_mode = result.get('margin_mode', 'isolated')
leverage = result.get('leverage', m.get('leverage', 1))
mode_label = "逐仓" if margin_mode == "isolated" else "全仓"
evaluation = m.get('evaluation', {})
grade = evaluation.get('grade', '')
grade_label = evaluation.get('grade_label', '')
name = result.get('strategy_name', '策略')
chart_path = QuantAPIClient._print_equity_chart(
result.get("equity_curve", []),
init_cap,
strategy_name=name,
metrics=m,
evaluation=evaluation,
leverage=leverage,
mode_label=mode_label,
trades=result.get("trades", []),
)
if chart_path:
result["_equity_chart_path"] = chart_path
ret_icon = "📈" if ret >= 0 else "📉"
lines = [
f"📊 {name} 回测报告",
f"━━━━━━━━━━━━━━━━━━━━",
f"💰 本金 {init_cap:,.0f} → 余额 {bal:,.0f}",
f"{ret_icon} 收益 {ret:+.2%} 📐 Sharpe {m.get('sharpe_ratio', 0):.2f} 📐 Sortino {m.get('sortino_ratio', 0):.2f}",
f"⚡ 回撤 {abs(m.get('max_drawdown_pct', 0)):.2%} 🎯 胜率 {m.get('win_rate', 0):.1%} ⚖️ 盈亏比 {m.get('profit_loss_ratio', 0):.2f}",
f"🔄 交易 {m.get('total_trades', 0)}笔 🏗️ 杠杆 {leverage}x 📦 仓位 {mode_label}",
]
if m.get('liquidation_count', 0) > 0:
lines.append(f"💥 爆仓 {m['liquidation_count']} 次")
conclusion = result.get("conclusion", "")
conclusion_map = {
"approved": "✅ 通过,可考虑实盘",
"paper_trade_first": "⚠️ 先模拟,不建议直接实盘",
"rejected": "❌ 驳回,建议重新设计策略",
}
items = evaluation.get("items", [])
lines.append(f"━━━━━━━━━━━━━━━━━━━━")
if items:
score_val = evaluation.get("score", 0)
max_score = evaluation.get("max_score", 14)
lines.append(f"🏆 评分 {score_val}/{max_score} {grade}级")
for item in items:
s = item["score"]
dot = "🟢" if s == 2 else ("🟡" if s == 1 else "🔴")
label = "优" if s == 2 else ("及格" if s == 1 else "差")
lines.append(f"{dot} {item['name']} {item['value']}({label})")
lines.append(f"━━━━━━━━━━━━━━━━━━━━")
if grade_label:
conclusion_text = f"[{grade}] {grade_label}"
elif conclusion:
conclusion_text = conclusion_map.get(conclusion, conclusion)
elif ret >= 0 and m.get('sharpe_ratio', 0) > 0.5:
conclusion_text = "⚠️ 建议先模拟观察"
elif ret < 0:
conclusion_text = "❌ 策略亏损,建议优化后重测"
else:
conclusion_text = "⚠️ 收益偏低,建议优化参数"
if ret > 0.2 and m.get('sharpe_ratio', 0) > 1.5:
advice = "可考虑小仓实盘或优化参数"
elif ret > 0:
advice = "建议优化参数后重测"
elif m.get('total_trades', 0) == 0:
advice = "没有交易信号,入场条件可能太严格"
else:
advice = "可优化参数或重新设计入场/出场逻辑"
lines.append(f"📋 {conclusion_text},{advice}")
trades = result.get("trades", [])
if trades:
opens = [t for t in trades if t.get("action") == "open"]
closes = [t for t in trades if t.get("action") != "open"]
lines.append(f"")
lines.append(f"📝 交易摘要 ({len(opens)}开/{len(closes)}平,前5笔)")
for t in trades[:5]:
dt = t.get('datetime', '')[:16].replace('T', ' ')
action = "开" if t.get('action') == 'open' else "平"
side = "多" if t.get('side') == 'long' else "空"
price = t.get('price', 0)
pnl = t.get('pnl', 0)
pnl_s = f"盈亏{pnl:+.1f}" if t.get('action') != 'open' else ""
lines.append(f" {dt} {action}{side} {price:,.1f} {pnl_s}")
if len(trades) > 5:
lines.append(f" ...还有 {len(trades) - 5} 笔")
lines.append("")
lines.append("━━━━━━━━━━━━━━━━━━━━")
if grade == "F" or m.get('total_trades', 0) == 0:
lines.append("🔄 下一步: 策略逻辑需要重新设计,回复「新策略」重新开始")
else:
lines.append("🔧 下一步: 可用服务器算法自动搜索最优参数,请选择:")
lines.append(" 1️⃣ genetic(遗传算法)← 推荐")
lines.append(" 2️⃣ bayesian(贝叶斯)")
lines.append(" 3️⃣ grid(网格穷举)")
lines.append(" 4️⃣ random(随机搜索)")
lines.append(" 5️⃣ annealing(模拟退火)")
lines.append(" 6️⃣ pso(粒子群)")
lines.append("回复数字或算法名即可开始优化")
caption = "\n".join(lines)
result["_caption"] = caption
print(f"\n{caption}")
if chart_path:
print(f"\n[SYSTEM] 图片: {chart_path}")
print(f"[SYSTEM] 发送图片附件 + 上方 caption,禁止额外文字")
@staticmethod
def _print_evaluation(evaluation: dict) -> None:
"""评分卡:逐项展示达标/不达标"""
if not evaluation or not evaluation.get("items"):
return
items = evaluation["items"]
score = evaluation.get("score", 0)
max_score = evaluation.get("max_score", 14)
grade = evaluation.get("grade", "?")
grade_bar = "█" * score + "░" * (max_score - score)
print(f"\n 📊 策略评分 {score}/{max_score} [{grade_bar}] {grade}级")
print(f" {'─' * 52}")
print(f" {'指标':<8} {'实际值':>8} {'得分':>4} {'标准'}")
print(f" {'─' * 52}")
for item in items:
s = item["score"]
icon = "🟢" if s == 2 else ("🟡" if s == 1 else "🔴")
print(f" {icon} {item['name']:<6} {item['value']:>8} {s}/{item['max']} {item['thresholds']}")
print(f" {'─' * 52}")
print(f" 结论: {evaluation.get('grade_label', '')}")
print()
_font_configured = False
_has_cjk_font = False
@staticmethod
def _setup_chinese_font() -> None:
"""配置 matplotlib 中文字体(只执行一次)。"""
if QuantAPIClient._font_configured:
return
QuantAPIClient._font_configured = True
import matplotlib
import matplotlib.font_manager as fm
candidates = [
"Noto Sans CJK SC", "Noto Sans SC", "Source Han Sans SC",
"WenQuanYi Micro Hei", "WenQuanYi Zen Hei",
"SimHei", "Microsoft YaHei", "PingFang SC", "Heiti SC",
"Arial Unicode MS",
]
available = {f.name for f in fm.fontManager.ttflist}
for name in candidates:
if name in available:
matplotlib.rcParams["font.sans-serif"] = [name, "DejaVu Sans"]
matplotlib.rcParams["axes.unicode_minus"] = False
QuantAPIClient._has_cjk_font = True
return
import subprocess
try:
subprocess.run(
["apt-get", "install", "-y", "--no-install-recommends", "fonts-noto-cjk"],
capture_output=True, timeout=30,
)
fm._load_fontmanager(try_read_cache=False)
for name in candidates:
if name in {f.name for f in fm.fontManager.ttflist}:
matplotlib.rcParams["font.sans-serif"] = [name, "DejaVu Sans"]
matplotlib.rcParams["axes.unicode_minus"] = False
QuantAPIClient._has_cjk_font = True
return
except Exception:
pass
font_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "fonts")
font_file = os.path.join(font_dir, "NotoSansSC-Regular.ttf")
if os.path.exists(font_file):
fm.fontManager.addfont(font_file)
prop = fm.FontProperties(fname=font_file)
matplotlib.rcParams["font.sans-serif"] = [prop.get_name(), "DejaVu Sans"]
matplotlib.rcParams["axes.unicode_minus"] = False
QuantAPIClient._has_cjk_font = True
@staticmethod
def _safe_title(text: str) -> str:
"""中文字体不可用时,把中文替换掉避免方框。"""
if QuantAPIClient._has_cjk_font:
return text
import re
cleaned = re.sub(r'[\u4e00-\u9fff]+', '', text).strip()
return cleaned if cleaned else "Strategy Report"
@staticmethod
def _print_equity_chart(
equity_curve: list,
initial_capital: float,
strategy_name: str = "",
output_dir: str = "",
metrics: dict | None = None,
evaluation: dict | None = None,
leverage: int = 1,
mode_label: str = "逐仓",
trades: list | None = None,
) -> str | None:
"""生成完整回测报告图(指标 + 评分 + 资金曲线),返回文件路径。"""
if not equity_curve or len(equity_curve) < 2:
return None
try:
import matplotlib
except ImportError:
print(" ⏳ 正在安装 matplotlib ...", flush=True)
import subprocess, sys
try:
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "-q", "matplotlib"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
import matplotlib
except Exception:
print(" ⚠ matplotlib 安装失败,跳过图表生成")
return None
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.gridspec import GridSpec
from datetime import datetime
QuantAPIClient._setup_chinese_font()
metrics = metrics or {}
evaluation = evaluation or {}
trades = trades or []
equities = [e.get("equity", initial_capital) for e in equity_curve]
raw_dates = [e.get("datetime", "") for e in equity_curve]
dates = []
for d in raw_dates:
if not d:
dates = None
break
try:
if "T" in d:
dates.append(datetime.fromisoformat(d.replace("Z", "+00:00").replace("+00:00", "")))
elif len(d) >= 19:
dates.append(datetime.strptime(d[:19], "%Y-%m-%d %H:%M:%S"))
elif len(d) >= 10:
dates.append(datetime.strptime(d[:10], "%Y-%m-%d"))
else:
dates = None
break
except Exception:
dates = None
break
if dates is None or len(dates) != len(equities):
dates = list(range(len(equities)))
hi_val, lo_val = max(equities), min(equities)
hi_idx = equities.index(hi_val)
lo_idx = equities.index(lo_val)
final_val = equities[-1]
ret_pct = (final_val - initial_capital) / initial_capital * 100
BG = "#0f0f1a"
PANEL = "#161b2e"
CARD = "#1c2333"
WHITE = "#e8e8e8"
GREEN = "#00d4aa"
RED = "#ff6b6b"
YELLOW = "#ffd93d"
GRAY = "#8a8fa0"
LIGHT = "#c8cad0"
color = GREEN if final_val >= initial_capital else RED
from matplotlib.patches import FancyBboxPatch
fig = plt.figure(figsize=(10, 7), facecolor=BG)
gs = GridSpec(2, 1, figure=fig,
height_ratios=[1, 3],
hspace=0.15,
left=0.10, right=0.94, top=0.96, bottom=0.06)
title = QuantAPIClient._safe_title(strategy_name or "Backtest Report")
sign = "+" if ret_pct >= 0 else ""
# ════════ 顶部: 策略名 + KPI 卡片 ════════
ax_top = fig.add_subplot(gs[0])
ax_top.set_facecolor(BG)
ax_top.axis("off")
ax_top.text(0.5, 0.88, title, transform=ax_top.transAxes,
fontsize=22, color=WHITE, weight="bold", ha="center", va="center")
ax_top.text(0.5, 0.74, f"{initial_capital:,.0f} → {final_val:,.0f}",
transform=ax_top.transAxes,
fontsize=13, color=GRAY, ha="center", va="center")
lev_label = "Iso" if mode_label == "逐仓" else "Cross"
kpi_data = [
("Return", f"{sign}{ret_pct:.2f}%", color),
("Sharpe", f"{metrics.get('sharpe_ratio', 0):.2f}", LIGHT),
("MaxDD", f"{abs(metrics.get('max_drawdown_pct', 0)):.2%}", LIGHT),
("WinRate", f"{metrics.get('win_rate', 0):.1%}", LIGHT),
("P/L", f"{metrics.get('profit_loss_ratio', 0):.2f}", LIGHT),
("Trades", f"{metrics.get('total_trades', 0)}", LIGHT),
("Lev", f"{leverage}x {lev_label}", LIGHT),
]
n_kpi = len(kpi_data)
card_w = 0.88 / n_kpi
card_x0 = 0.06
for i, (label, val, val_color) in enumerate(kpi_data):
cx = card_x0 + i * card_w + card_w / 2
ax_top.add_patch(FancyBboxPatch(
(card_x0 + i * card_w + 0.005, 0.12), card_w - 0.01, 0.52,
transform=ax_top.transAxes,
boxstyle="round,pad=0.015", facecolor=CARD, edgecolor="#2a3050", linewidth=0.8,
))
ax_top.text(cx, 0.50, val, transform=ax_top.transAxes,
fontsize=15, color=val_color, weight="bold", ha="center", va="center")
ax_top.text(cx, 0.22, label, transform=ax_top.transAxes,
fontsize=11, color=GRAY, ha="center", va="center")
# ════════ 中部: 资金曲线 ════════
ax_chart = fig.add_subplot(gs[1])
ax_chart.set_facecolor(PANEL)
ax_chart.plot(dates, equities, color=color, linewidth=1.8, zorder=3)
ax_chart.fill_between(dates, equities, initial_capital, alpha=0.12, color=color, zorder=2)
ax_chart.axhline(y=initial_capital, color=WHITE, linewidth=0.8, linestyle="--", alpha=0.3, zorder=1)
ax_chart.plot(dates[hi_idx], hi_val, "^", color=GREEN, markersize=10, zorder=4)
ax_chart.annotate(f"High {hi_val:,.0f}", (dates[hi_idx], hi_val),
textcoords="offset points", xytext=(6, 12),
fontsize=10, color=GREEN, weight="bold")
ax_chart.plot(dates[lo_idx], lo_val, "v", color=RED, markersize=10, zorder=4)
ax_chart.annotate(f"Low {lo_val:,.0f}", (dates[lo_idx], lo_val),
textcoords="offset points", xytext=(6, -16),
fontsize=10, color=RED, weight="bold")
ax_chart.tick_params(colors=GRAY, labelsize=10)
def _y_fmt(x, _):
if abs(x) >= 1_000_000:
return f"{x/1_000_000:.1f}M"
if abs(x) >= 1_000:
return f"{x/1_000:.1f}K"
return f"{x:.0f}"
ax_chart.yaxis.set_major_formatter(plt.FuncFormatter(_y_fmt))
if isinstance(dates[0], datetime):
span_days = (dates[-1] - dates[0]).days
if span_days > 180:
ax_chart.xaxis.set_major_locator(mdates.MonthLocator(interval=2))
elif span_days > 60:
ax_chart.xaxis.set_major_locator(mdates.MonthLocator())
else:
ax_chart.xaxis.set_major_locator(mdates.WeekdayLocator(interval=2))
ax_chart.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
plt.setp(ax_chart.get_xticklabels(), rotation=30, ha="right")
ax_chart.grid(True, alpha=0.12, color=WHITE)
for spine in ax_chart.spines.values():
spine.set_color("#2a3050")
if not output_dir:
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output")
Path(output_dir).mkdir(parents=True, exist_ok=True)
safe_name = (strategy_name or "report").replace(" ", "_").replace("/", "_")[:30]
ts = int(_time.time())
filepath = os.path.join(output_dir, f"{safe_name}_{ts}.png")
fig.savefig(filepath, dpi=150, facecolor=fig.get_facecolor())
plt.close(fig)
print(f"\n[SYSTEM] 图片已保存: {filepath}")
return filepath
@staticmethod
def _print_trade_details(trades: list, default_leverage: int = 1, mode_label: str = "逐仓", limit: int = 30) -> None:
"""带仓位/杠杆/保证金的交易明细表"""
if not trades:
return
opens = [t for t in trades if t.get("action") == "open"]
closes = [t for t in trades if t.get("action") != "open"]
total = len(trades)
print(f" 📋 交易明细(共 {total} 笔: {len(opens)} 开 / {len(closes)} 平,显示前 {min(limit, total)} 笔)")
print(f" {'━' * 88}")
print(f" {'#':>3} {'时间':<17} {'动作':<5} {'方向':<5} {'价格':>10} {'数量':>8} {'杠杆':>4} {'仓位模式':<5} {'保证金':>10} {'盈亏':>10}")
print(f" {'─' * 88}")
for t in trades[:limit]:
tid = t.get('trade_id', 0)
dt = t.get('datetime', '')[:16]
action = t.get('action', '')
side = t.get('side', '')
price = t.get('price', 0)
qty = t.get('quantity', 0)
lev = t.get('leverage', default_leverage)
pnl = t.get('pnl', 0)
margin = t.get('margin_used', 0)
if margin == 0:
nominal = price * qty
margin = nominal / lev if lev > 0 else nominal
t_mode = t.get('margin_mode', '')
t_mode_label = ("逐仓" if t_mode == "isolated" else "全仓") if t_mode else mode_label
action_icon = "🟢" if action == "open" else "🔴"
side_label = "多" if side == "long" else "空"
pnl_str = f"{pnl:>+10.2f}" if action != "open" else f"{'—':>10}"
print(
f" {tid:>3} {dt:<17} {action_icon}{action:<4} {side_label:<5}"
f" {price:>10.2f} {qty:>8.4f} {lev:>3}x {t_mode_label:<5}"
f" {margin:>10.2f} {pnl_str}"
)
if total > limit:
print(f" ... 还有 {total - limit} 笔未显示")
print(f" {'━' * 88}")
@staticmethod
def print_trades(result: dict, limit: int = 30) -> None:
"""打印交易记录(复用详细表格)"""
leverage = result.get("leverage", result.get("metrics", {}).get("leverage", 1))
margin_mode = result.get("margin_mode", "isolated")
mode_label = "逐仓" if margin_mode == "isolated" else "全仓"
QuantAPIClient._print_trade_details(result.get("trades", []), leverage, mode_label, limit)
@staticmethod
def print_conclusion(result: dict) -> None:
"""兼容旧调用,现在 print_metrics 已包含结论"""
pass
# ═══════════════ 策略监控 (服务器端) ═══════════════
def start_monitor(
self,
script_content: str,
strategy_name: str = "",
symbol: str = "BTCUSDT",
timeframe: str = "4h",
interval_seconds: int = 14400,
risk_rules: dict | None = None,
) -> dict:
"""
启动服务器端策略监控(同一用户最多同时 3 个)。
服务器定时执行策略脚本 generate_signals(mode='live'),
存储可执行信号供客户端轮询。
超过 3 个策略需改用本地运行。
返回: {"job_id": "mon_xxx", "status": "running", "quota_used": 1, ...}
"""
if risk_rules is None:
risk_rules = {"min_confidence": 0.6, "max_position_pct": 10.0, "max_concurrent": 3}
payload = {
"script_content": script_content,
"strategy_name": strategy_name,
"symbol": symbol,
"timeframe": timeframe,
"interval_seconds": interval_seconds,
"risk_rules": risk_rules,
}
resp = self._client.post(f"{self.base_url}/monitor/start", json=payload, headers=self._headers())
if resp.status_code == 429:
data = resp.json()
print(f"\n❌ 已有 3 个策略在服务器运行,请先停止一个或改用本地运行")
print(f" 用 client.list_monitors() 查看运行中的任务")
return data
resp.raise_for_status()
data = resp.json()
interval_h = interval_seconds / 3600
print(f"\n{'━' * 40}")
print(f" ✅ 监控已启动")
print(f" 📋 Job ID: {data['job_id']}")
print(f" 📊 策略: {data.get('strategy_name', strategy_name)}")
print(f" 🪙 交易对: {symbol}")
print(f" ⏱ 间隔: 每 {interval_h:.1f}h")
print(f" 📦 在跑: {data.get('quota_used', '?')}/3")
print(f"{'━' * 40}")
return data
def stop_monitor(self, job_id: str) -> dict:
"""停止服务器端监控任务。"""
resp = self._client.post(f"{self.base_url}/monitor/{job_id}/stop", headers=self._headers())
resp.raise_for_status()
data = resp.json()
print(f"\n ⏹ 监控已停止: {job_id}")
print(f" 📦 在跑: {data.get('quota_used', '?')}/3")
return data
def list_monitors(self) -> dict:
"""列出我的所有监控任务。"""
resp = self._client.get(f"{self.base_url}/monitor/list", headers=self._headers())
resp.raise_for_status()
data = resp.json()
monitors = data.get("monitors", [])
quota_used = data.get("quota_used", 0)
print(f"\n{'━' * 50}")
print(f" 📡 策略监控列表 | 在跑 {quota_used}/3")
print(f"{'━' * 50}")
if not monitors:
print(f" (无运行中的监控任务)")
else:
for m in monitors:
status_icon = "🟢" if m["status"] == "running" else "⏹"
interval_h = m["interval_seconds"] / 3600
print(
f" {status_icon} {m['job_id']} | {m['strategy_name']:<20} | "
f"{m['symbol']} {m['timeframe']} | "
f"每{interval_h:.1f}h | "
f"信号:{m['total_signals']} | "
f"轮次:{m['total_cycles']}"
)
if m.get("last_run_at"):
print(f" 最后执行: {m['last_run_at']}")
print(f"{'━' * 50}")
return data
def check_monitor(self, job_id: str) -> dict:
"""查看监控任务状态 + 最近信号。"""
resp = self._client.get(f"{self.base_url}/monitor/{job_id}", headers=self._headers())
resp.raise_for_status()
data = resp.json()
status_icon = "🟢" if data["status"] == "running" else "⏹"
interval_h = data["interval_seconds"] / 3600
print(f"\n{'━' * 45}")
print(f" {status_icon} 监控状态: {data['status']}")
print(f" 📋 Job: {data['job_id']}")
print(f" 📊 策略: {data['strategy_name']}")
print(f" 🪙 交易对: {data['symbol']} {data['timeframe']}")
print(f" ⏱ 间隔: 每 {interval_h:.1f}h")
print(f" 🔄 已执行: {data['total_cycles']} 轮")
print(f" 📈 累计信号: {data['total_signals']} 个")
if data.get("last_run_at"):
print(f" 🕐 最后执行: {data['last_run_at']}")
if data.get("last_error"):
print(f" ❌ 最后错误: {data['last_error']}")
print(f"{'━' * 45}")
last_signals = data.get("last_signals", [])
if last_signals:
print(f"\n 📡 最近信号 ({len(last_signals)} 个):")
for s in last_signals[-5:]:
action = s.get("action", "?")
direction = s.get("direction", "?")
symbol = s.get("symbol", "?")
confidence = s.get("confidence", 0)
price = s.get("price_at_signal", 0)
reason = s.get("reason", "")[:40]
icon = "🟢" if action == "buy" else "🔴"
print(f" {icon} {action} {direction} {symbol} @ {price:.2f} | conf={confidence:.2f} | {reason}")
print()
return data
# ═══════════════ 密钥保险箱 (Vault) ═══════════════
def vault_setup_link(self) -> dict:
"""
生成一次性密钥设置链接。
用户在浏览器中打开该链接,粘贴私钥并提交。
私钥通过 HTTPS 传输,AES-256-GCM 加密存储在服务器。
不经过聊天记录。
返回: {"url": "https://...", "token": "vt_xxx", "expires_in_minutes": 30}
"""
resp = self._client.post(f"{self.base_url}/vault/setup-link", headers=self._headers())
resp.raise_for_status()
data = resp.json()
print(f"\n{'━' * 50}")
print(f" 🔐 密钥设置链接已生成")
print(f" 📎 链接: {data['url']}")
print(f" ⏰ 有效期: {data['expires_in_minutes']} 分钟")
print(f"{'━' * 50}")
print(f"\n 请在浏览器中打开以上链接,粘贴你的钱包私钥。")
print(f" 私钥不会出现在聊天记录中。\n")
return data
def vault_status(self) -> dict:
"""
查询密钥存储状态。
返回: {"has_key": true/false, "network": "mainnet/testnet", ...}
"""
resp = self._client.get(f"{self.base_url}/vault/status", headers=self._headers())
resp.raise_for_status()
data = resp.json()
if data.get("has_key"):
net = data.get("network", "mainnet")
net_icon = "🌐" if net == "mainnet" else "🧪"
print(f"\n ✅ 密钥已配置 | {net_icon} {net}")
else:
print(f"\n ❌ 尚未配置密钥")
return data
def vault_delete(self) -> dict:
"""删除已存储的密钥。"""
resp = self._client.delete(f"{self.base_url}/vault/key", headers=self._headers())
resp.raise_for_status()
data = resp.json()
print(f"\n 🗑️ 密钥已删除")
return data
# ═══════════════ 生命周期 ═══════════════
def close(self):
self._client.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
FILE:scripts/data_client.py
"""
多源数据客户端 — 直接调用公开 API
数据源:
- Binance Futures: K线、资金费率、持仓量、合约信息、标记价格
- Binance Spot: 现货 K线
- CoinGecko: PAXG/XAUT 等加密代币价格
- yfinance: 美股(AAPL/NVDA/SPY...)、大宗商品(WTI/NG/铜)、贵金属现货(XAU/XAG)
- DeFi Llama: 协议 TVL、手续费收入(免费端点)
全部免费公开端点,无需 API Key。
国内访问 Binance 可配置代理: PROXY_URL 环境变量。
"""
import time
from datetime import datetime, timezone
from typing import Optional
import httpx
import pandas as pd
from loguru import logger
BINANCE_FUTURES_BASE = "https://fapi.binance.com"
BINANCE_SPOT_BASE = "https://api.binance.com"
COINGECKO_BASE = "https://api.coingecko.com/api/v3"
DEFILLAMA_BASE = "https://api.llama.fi"
INTERVAL_MAP = {
"1m": "1m", "5m": "5m", "15m": "15m",
"1h": "1h", "4h": "4h", "1d": "1d",
}
COINGECKO_IDS = {
"PAXG": "pax-gold",
"XAUT": "tether-gold",
"OUSG": "ondo-us-government-bond-fund",
"OMMF": "ondo-us-dollar-yield",
}
# yfinance ticker 映射
YFINANCE_TICKERS = {
"RWA:AAPL": "AAPL", "RWA:NVDA": "NVDA", "RWA:TSLA": "TSLA",
"RWA:MSFT": "MSFT", "RWA:GOOGL": "GOOGL", "RWA:AMZN": "AMZN",
"RWA:META": "META", "RWA:SPY": "SPY", "RWA:QQQ": "QQQ",
"COMM:WTI": "CL=F", "COMM:BRENT": "BZ=F",
"COMM:NG": "NG=F", "COMM:COPPER": "HG=F",
"METAL:XAU-SPOT": "GC=F", "METAL:XAG-SPOT": "SI=F",
}
def _ts_ms(dt_str: str) -> int:
"""日期字符串 (YYYY-MM-DD) 转毫秒时间戳。"""
dt = datetime.strptime(dt_str, "%Y-%m-%d").replace(tzinfo=timezone.utc)
return int(dt.timestamp() * 1000)
def _symbol_to_binance(symbol: str) -> str:
"""BTC-USDT-PERP → BTCUSDT, BTC-USDT-SPOT → BTCUSDT"""
parts = symbol.upper().replace("-PERP", "").replace("-SPOT", "").split("-")
return "".join(parts)
class DataClient:
"""多源数据客户端,支持 Binance / CoinGecko / yfinance / DeFi Llama。"""
def __init__(self, proxy: Optional[str] = None):
import os
proxy_url = proxy or os.environ.get("PROXY_URL")
self._client = httpx.Client(
timeout=30.0,
proxy=proxy_url,
)
def _get(self, url: str, params: dict = None) -> dict | list:
"""带 429 限流重试的 GET 请求。"""
resp = self._client.get(url, params=params)
if resp.status_code == 429:
retry = int(resp.headers.get("Retry-After", "5"))
logger.warning(f"429 限流,等待 {retry}s")
time.sleep(retry)
resp = self._client.get(url, params=params)
resp.raise_for_status()
return resp.json()
# ════════════════════════════════════════
# Binance Futures — 永续合约
# ════════════════════════════════════════
def get_perp_klines(
self,
symbol: str,
interval: str = "1d",
start_date: str = None,
end_date: str = None,
limit: int = 1500,
) -> pd.DataFrame:
"""
永续合约 K 线。
Binance 端点: GET /fapi/v1/klines
无需 API Key,限流 2400 次/分钟。
单次最多 1500 条,自动分页拉取完整历史。
参数:
symbol: 合约代码,如 "BTC-USDT-PERP"
interval: K 线周期 (1m/5m/15m/1h/4h/1d)
start_date: 起始日期 "YYYY-MM-DD"
end_date: 结束日期 "YYYY-MM-DD"
limit: 单次请求条数(最大 1500)
返回:
DataFrame [datetime, open, high, low, close, volume,
volume_usd, trades, taker_buy_volume_usd, taker_sell_volume_usd]
"""
bn_symbol = _symbol_to_binance(symbol)
all_rows = []
params = {
"symbol": bn_symbol,
"interval": INTERVAL_MAP.get(interval, interval),
"limit": limit,
}
if start_date:
params["startTime"] = _ts_ms(start_date)
if end_date:
params["endTime"] = _ts_ms(end_date)
while True:
data = self._get(f"{BINANCE_FUTURES_BASE}/fapi/v1/klines", params)
if not data:
break
all_rows.extend(data)
if len(data) < limit:
break
params["startTime"] = data[-1][0] + 1
if end_date and params["startTime"] > _ts_ms(end_date):
break
time.sleep(0.1)
if not all_rows:
return pd.DataFrame()
df = pd.DataFrame(all_rows, columns=[
"open_time", "open", "high", "low", "close", "volume",
"close_time", "quote_volume", "trades", "taker_buy_volume",
"taker_buy_quote_volume", "ignore",
])
df["datetime"] = pd.to_datetime(df["open_time"], unit="ms", utc=True)
for col in ["open", "high", "low", "close", "volume", "quote_volume",
"taker_buy_volume", "taker_buy_quote_volume"]:
df[col] = df[col].astype(float)
df = df.rename(columns={
"quote_volume": "volume_usd",
"taker_buy_quote_volume": "taker_buy_volume_usd",
})
df["taker_sell_volume_usd"] = df["volume_usd"] - df["taker_buy_volume_usd"]
return df[["datetime", "open", "high", "low", "close", "volume",
"volume_usd", "trades", "taker_buy_volume_usd",
"taker_sell_volume_usd"]].reset_index(drop=True)
def get_funding_rate(
self,
symbol: str,
start_date: str = None,
end_date: str = None,
limit: int = 1000,
) -> pd.DataFrame:
"""
资金费率历史。
Binance 端点: GET /fapi/v1/fundingRate
每 8 小时一条,自动分页拉取。
返回:
DataFrame [datetime, funding_rate, mark_price]
"""
bn_symbol = _symbol_to_binance(symbol)
all_rows = []
params = {"symbol": bn_symbol, "limit": limit}
if start_date:
params["startTime"] = _ts_ms(start_date)
if end_date:
params["endTime"] = _ts_ms(end_date)
while True:
data = self._get(f"{BINANCE_FUTURES_BASE}/fapi/v1/fundingRate", params)
if not data:
break
all_rows.extend(data)
if len(data) < limit:
break
params["startTime"] = data[-1]["fundingTime"] + 1
time.sleep(0.1)
if not all_rows:
return pd.DataFrame()
df = pd.DataFrame(all_rows)
df["datetime"] = pd.to_datetime(df["fundingTime"], unit="ms", utc=True)
df["funding_rate"] = df["fundingRate"].astype(float)
df["mark_price"] = df["markPrice"].astype(float)
return df[["datetime", "funding_rate", "mark_price"]].reset_index(drop=True)
def get_open_interest(self, symbol: str) -> dict:
"""
当前持仓量快照。
Binance 端点: GET /fapi/v1/openInterest
仅返回当前快照,不含历史。
"""
bn_symbol = _symbol_to_binance(symbol)
data = self._get(
f"{BINANCE_FUTURES_BASE}/fapi/v1/openInterest",
{"symbol": bn_symbol},
)
return {
"symbol": symbol,
"open_interest": float(data["openInterest"]),
"timestamp": data["time"],
}
def get_open_interest_hist(
self,
symbol: str,
period: str = "1d",
limit: int = 30,
) -> pd.DataFrame:
"""
持仓量历史统计。
Binance 端点: GET /futures/data/openInterestHist
⚠️ 限制: 仅最近 30 天数据。
"""
bn_symbol = _symbol_to_binance(symbol)
data = self._get(f"{BINANCE_FUTURES_BASE}/futures/data/openInterestHist", {
"pair": bn_symbol,
"contractType": "PERPETUAL",
"period": period,
"limit": limit,
})
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
df["open_interest"] = df["sumOpenInterest"].astype(float)
df["open_interest_usd"] = df["sumOpenInterestValue"].astype(float)
return df[["datetime", "open_interest", "open_interest_usd"]].reset_index(drop=True)
def get_long_short_ratio(
self,
symbol: str,
period: str = "1d",
limit: int = 30,
) -> pd.DataFrame:
"""
Top Trader 多空持仓比。
Binance 端点: GET /futures/data/topLongShortPositionRatio
⚠️ 限制: 仅最近 30 天。
"""
bn_symbol = _symbol_to_binance(symbol)
data = self._get(
f"{BINANCE_FUTURES_BASE}/futures/data/topLongShortPositionRatio",
{"symbol": bn_symbol, "period": period, "limit": limit},
)
if not data:
return pd.DataFrame()
df = pd.DataFrame(data)
df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
df["long_short_ratio"] = df["longShortRatio"].astype(float)
df["long_account"] = df["longAccount"].astype(float)
df["short_account"] = df["shortAccount"].astype(float)
return df[["datetime", "long_short_ratio", "long_account",
"short_account"]].reset_index(drop=True)
def get_mark_price(self, symbol: str) -> dict:
"""
当前标记价格和资金费率。
Binance 端点: GET /fapi/v1/premiumIndex
"""
bn_symbol = _symbol_to_binance(symbol)
data = self._get(
f"{BINANCE_FUTURES_BASE}/fapi/v1/premiumIndex",
{"symbol": bn_symbol},
)
return {
"symbol": symbol,
"mark_price": float(data["markPrice"]),
"index_price": float(data["indexPrice"]),
"funding_rate": float(data["lastFundingRate"]),
"next_funding_time": data["nextFundingTime"],
}
def get_exchange_info(self, symbol: str = None) -> dict | list:
"""
合约信息(面值、杠杆上限、最小下单量等)。
Binance 端点: GET /fapi/v1/exchangeInfo
"""
data = self._get(f"{BINANCE_FUTURES_BASE}/fapi/v1/exchangeInfo")
symbols = data.get("symbols", [])
if symbol:
bn_symbol = _symbol_to_binance(symbol)
for s in symbols:
if s["symbol"] == bn_symbol:
return self._parse_contract_info(s, symbol)
raise ValueError(f"合约 {symbol} 未找到")
return [
self._parse_contract_info(s, f"{s.get('baseAsset', '')}-{s.get('quoteAsset', '')}-PERP")
for s in symbols
if s.get("contractType") == "PERPETUAL"
]
@staticmethod
def _parse_contract_info(raw: dict, symbol: str) -> dict:
filters = {f["filterType"]: f for f in raw.get("filters", [])}
price_filter = filters.get("PRICE_FILTER", {})
lot_filter = filters.get("LOT_SIZE", {})
return {
"symbol": symbol,
"base_asset": raw.get("baseAsset", ""),
"quote_asset": raw.get("quoteAsset", ""),
"contract_type": raw.get("contractType", ""),
"tick_size": float(price_filter.get("tickSize", 0)),
"min_qty": float(lot_filter.get("minQty", 0)),
"max_qty": float(lot_filter.get("maxQty", 0)),
"step_size": float(lot_filter.get("stepSize", 0)),
"maintenance_margin_rate": float(raw.get("maintMarginPercent", 2.5)) / 100,
"required_margin_rate": float(raw.get("requiredMarginPercent", 5)) / 100,
}
def list_perp_symbols(self) -> list[str]:
"""列出 Binance 所有永续合约代码。"""
data = self._get(f"{BINANCE_FUTURES_BASE}/fapi/v1/exchangeInfo")
return [
f"{s['baseAsset']}-{s['quoteAsset']}-PERP"
for s in data.get("symbols", [])
if s.get("contractType") == "PERPETUAL" and s.get("status") == "TRADING"
]
# ════════════════════════════════════════
# Binance Spot — 现货
# ════════════════════════════════════════
def get_spot_klines(
self,
symbol: str,
interval: str = "1d",
start_date: str = None,
end_date: str = None,
limit: int = 1000,
) -> pd.DataFrame:
"""
现货 K 线。
Binance 端点: GET /api/v3/klines
自动分页,无限历史。
返回:
DataFrame [datetime, open, high, low, close, volume, volume_usd]
"""
bn_symbol = _symbol_to_binance(symbol)
all_rows = []
params = {"symbol": bn_symbol, "interval": interval, "limit": limit}
if start_date:
params["startTime"] = _ts_ms(start_date)
if end_date:
params["endTime"] = _ts_ms(end_date)
while True:
data = self._get(f"{BINANCE_SPOT_BASE}/api/v3/klines", params)
if not data:
break
all_rows.extend(data)
if len(data) < limit:
break
params["startTime"] = data[-1][0] + 1
time.sleep(0.1)
if not all_rows:
return pd.DataFrame()
df = pd.DataFrame(all_rows, columns=[
"open_time", "open", "high", "low", "close", "volume",
"close_time", "quote_volume", "trades", "taker_buy_volume",
"taker_buy_quote_volume", "ignore",
])
df["datetime"] = pd.to_datetime(df["open_time"], unit="ms", utc=True)
for col in ["open", "high", "low", "close", "volume", "quote_volume"]:
df[col] = df[col].astype(float)
df = df.rename(columns={"quote_volume": "volume_usd"})
return df[["datetime", "open", "high", "low", "close", "volume",
"volume_usd"]].reset_index(drop=True)
# ════════════════════════════════════════
# CoinGecko — 代币价格
# ════════════════════════════════════════
def get_token_history(
self,
token: str,
days: int = 365,
) -> pd.DataFrame:
"""
代币价格历史(日线)。
CoinGecko 端点: GET /api/v3/coins/{id}/market_chart
免费版限流 10-30 次/分钟,日线最多 365 天。
参数:
token: 代币名称 (PAXG/XAUT/OUSG 等)
days: 历史天数
返回:
DataFrame [datetime, close, volume_usd, market_cap]
"""
cg_id = COINGECKO_IDS.get(token.upper(), token.lower())
data = self._get(f"{COINGECKO_BASE}/coins/{cg_id}/market_chart", {
"vs_currency": "usd",
"days": days,
"interval": "daily",
})
prices = data.get("prices", [])
volumes = data.get("total_volumes", [])
caps = data.get("market_caps", [])
if not prices:
return pd.DataFrame()
df = pd.DataFrame(prices, columns=["timestamp", "close"])
df["datetime"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
if volumes and len(volumes) == len(prices):
df["volume_usd"] = [v[1] for v in volumes]
if caps and len(caps) == len(prices):
df["market_cap"] = [c[1] for c in caps]
return df.drop(columns=["timestamp"]).reset_index(drop=True)
# ════════════════════════════════════════
# yfinance — 美股 / 大宗商品 / 贵金属
# ════════════════════════════════════════
@staticmethod
def get_stock_klines(
symbol: str,
start_date: str,
end_date: str,
interval: str = "1d",
) -> pd.DataFrame:
"""
美股/ETF K 线。
数据源: yfinance(Yahoo Finance 公开数据)
支持 Symbol: RWA:AAPL / RWA:SPY / RWA:QQQ 等
历史深度: 日线 30+ 年
返回:
DataFrame [datetime, open, high, low, close, volume, volume_usd, dividends?]
"""
import yfinance as yf
ticker = YFINANCE_TICKERS.get(symbol.upper(), symbol.replace("RWA:", ""))
yf_interval = {"1d": "1d", "1h": "1h", "5m": "5m", "1m": "1m"}.get(interval, "1d")
tk = yf.Ticker(ticker)
df = tk.history(start=start_date, end=end_date, interval=yf_interval)
if df.empty:
return pd.DataFrame()
df = df.reset_index()
date_col = "Date" if "Date" in df.columns else "Datetime"
df = df.rename(columns={
date_col: "datetime",
"Open": "open", "High": "high", "Low": "low",
"Close": "close", "Volume": "volume",
})
df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
df["volume_usd"] = df["close"] * df["volume"]
cols = ["datetime", "open", "high", "low", "close", "volume", "volume_usd"]
if "Dividends" in df.columns:
df["dividends"] = df["Dividends"]
cols.append("dividends")
return df[cols].reset_index(drop=True)
@staticmethod
def get_commodity_klines(
symbol: str,
start_date: str,
end_date: str,
interval: str = "1d",
) -> pd.DataFrame:
"""
大宗商品期货 K 线。
数据源: yfinance
支持: COMM:WTI / COMM:BRENT / COMM:NG / COMM:COPPER
历史深度: 10+ 年
返回:
DataFrame [datetime, open, high, low, close, volume, volume_usd]
"""
import yfinance as yf
ticker = YFINANCE_TICKERS.get(symbol.upper())
if not ticker:
raise ValueError(
f"未知大宗商品 Symbol: {symbol},"
f"支持: {[k for k in YFINANCE_TICKERS if k.startswith('COMM:')]}"
)
tk = yf.Ticker(ticker)
df = tk.history(start=start_date, end=end_date, interval=interval)
if df.empty:
return pd.DataFrame()
df = df.reset_index()
date_col = "Date" if "Date" in df.columns else "Datetime"
df = df.rename(columns={
date_col: "datetime",
"Open": "open", "High": "high", "Low": "low",
"Close": "close", "Volume": "volume",
})
df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
df["volume_usd"] = df["close"] * df["volume"]
return df[["datetime", "open", "high", "low", "close",
"volume", "volume_usd"]].reset_index(drop=True)
@staticmethod
def get_metal_spot_klines(
symbol: str,
start_date: str,
end_date: str,
) -> pd.DataFrame:
"""
贵金属现货 K 线(通过期货合约代理)。
数据源: yfinance (GC=F 黄金期货 / SI=F 白银期货)
支持: METAL:XAU-SPOT / METAL:XAG-SPOT
历史深度: 10+ 年
返回:
DataFrame [datetime, open, high, low, close, volume]
"""
import yfinance as yf
ticker = YFINANCE_TICKERS.get(symbol.upper())
if not ticker:
raise ValueError(
f"未知贵金属 Symbol: {symbol},支持: METAL:XAU-SPOT / METAL:XAG-SPOT"
)
tk = yf.Ticker(ticker)
df = tk.history(start=start_date, end=end_date, interval="1d")
if df.empty:
return pd.DataFrame()
df = df.reset_index()
date_col = "Date" if "Date" in df.columns else "Datetime"
df = df.rename(columns={
date_col: "datetime",
"Open": "open", "High": "high", "Low": "low",
"Close": "close", "Volume": "volume",
})
df["datetime"] = pd.to_datetime(df["datetime"], utc=True)
return df[["datetime", "open", "high", "low", "close",
"volume"]].reset_index(drop=True)
# ════════════════════════════════════════
# DeFi Llama — 协议 TVL / 手续费
# ════════════════════════════════════════
def get_protocol_tvl(self, protocol: str) -> pd.DataFrame:
"""
协议 TVL 历史。
DeFi Llama 端点: GET /protocol/{slug}
免费,无需 Key。
支持: aave, compound-v3, lido, curve-dex, uniswap 等
返回:
DataFrame [datetime, tvl_usd]
"""
data = self._get(f"{DEFILLAMA_BASE}/protocol/{protocol}")
tvl_history = data.get("tvl", [])
if not tvl_history:
return pd.DataFrame()
df = pd.DataFrame(tvl_history)
df["datetime"] = pd.to_datetime(df["date"], unit="s", utc=True)
df["tvl_usd"] = df["totalLiquidityUSD"].astype(float)
return df[["datetime", "tvl_usd"]].reset_index(drop=True)
def get_protocol_info(self, protocol: str) -> dict:
"""
协议当前信息(TVL、类别、链等)。
DeFi Llama 端点: GET /protocol/{slug}
"""
data = self._get(f"{DEFILLAMA_BASE}/protocol/{protocol}")
return {
"name": data.get("name", ""),
"category": data.get("category", ""),
"chains": data.get("chains", []),
"current_tvl": data.get("currentChainTvls", {}),
"total_tvl": float(data.get("tvl", [{}])[-1].get("totalLiquidityUSD", 0))
if data.get("tvl") else 0,
}
def get_defi_fees(self, protocol: str = None) -> pd.DataFrame:
"""
协议手续费/收入数据。
DeFi Llama 端点: GET /overview/fees
免费,返回所有协议的 24h 手续费和收入。
返回:
DataFrame [name, category, fees_24h, fees_7d, fees_30d, revenue_24h]
"""
data = self._get(f"{DEFILLAMA_BASE}/overview/fees")
protocols = data.get("protocols", [])
if protocol:
protocols = [
p for p in protocols
if p.get("name", "").lower() == protocol.lower()
or p.get("slug", "") == protocol.lower()
]
if not protocols:
return pd.DataFrame()
rows = []
for p in protocols:
rows.append({
"name": p.get("name", ""),
"category": p.get("category", ""),
"fees_24h": float(p.get("total24h", 0) or 0),
"fees_7d": float(p.get("total7d", 0) or 0),
"fees_30d": float(p.get("total30d", 0) or 0),
"revenue_24h": float(p.get("revenue24h", 0) or 0),
})
return pd.DataFrame(rows)
def list_defi_protocols(self) -> pd.DataFrame:
"""
所有 DeFi 协议列表及 TVL。
DeFi Llama 端点: GET /protocols
返回:
DataFrame [name, slug, category, chains, tvl] (前 200 个)
"""
data = self._get(f"{DEFILLAMA_BASE}/protocols")
rows = []
for p in data[:200]:
rows.append({
"name": p.get("name", ""),
"slug": p.get("slug", ""),
"category": p.get("category", ""),
"chains": ", ".join(p.get("chains", [])),
"tvl": float(p.get("tvl", 0) or 0),
})
return pd.DataFrame(rows)
# ════════════════════════════════════════
# 生命周期
# ════════════════════════════════════════
def close(self):
self._client.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
FILE:scripts/indicators.py
"""
技术指标库 — 纯 NumPy 向量化计算
支持指标:
SMA, EMA, RSI, MACD, 布林带, ATR, KDJ, 成交量均线
所有方法接受 numpy 数组,返回 numpy 数组。
无 pandas 依赖,可直接嵌入回测循环。
"""
from __future__ import annotations
import numpy as np
class Indicators:
"""向量化技术指标计算器。"""
# ════════════════════════════════════════
# 均线类
# ════════════════════════════════════════
@staticmethod
def sma(data: np.ndarray, period: int) -> np.ndarray:
"""
简单移动平均线 (Simple Moving Average)。
参数:
data: 价格序列
period: 均线周期
返回:
SMA 数组,前 period-1 个值为 NaN
"""
if len(data) < period:
return np.full_like(data, np.nan, dtype=float)
result = np.full(len(data), np.nan, dtype=float)
cumsum = np.cumsum(data, dtype=float)
result[period - 1:] = (cumsum[period - 1:] - np.concatenate(([0], cumsum[:-period]))) / period
return result
@staticmethod
def ema(data: np.ndarray, period: int) -> np.ndarray:
"""
指数移动平均线 (Exponential Moving Average)。
使用递推公式: EMA_t = α × price_t + (1 - α) × EMA_{t-1}
其中 α = 2 / (period + 1)
参数:
data: 价格序列
period: 均线周期
返回:
EMA 数组,前 period-1 个值为 NaN
"""
if len(data) < period:
return np.full_like(data, np.nan, dtype=float)
alpha = 2.0 / (period + 1)
result = np.full(len(data), np.nan, dtype=float)
result[period - 1] = np.mean(data[:period])
for i in range(period, len(data)):
result[i] = alpha * data[i] + (1 - alpha) * result[i - 1]
return result
@staticmethod
def volume_ma(volume: np.ndarray, period: int) -> np.ndarray:
"""
成交量移动平均线。
参数:
volume: 成交量序列
period: 均线周期
返回:
成交量 SMA 数组
"""
return Indicators.sma(volume, period)
# ════════════════════════════════════════
# 动量/震荡类
# ════════════════════════════════════════
@staticmethod
def rsi(data: np.ndarray, period: int = 14) -> np.ndarray:
"""
相对强弱指标 (Relative Strength Index)。
使用 Wilder 平滑法:
RS = 平均涨幅 / 平均跌幅
RSI = 100 - 100 / (1 + RS)
参数:
data: 价格序列
period: RSI 周期(默认 14)
返回:
RSI 数组 (0-100),前 period 个值为 NaN
"""
if len(data) < period + 1:
return np.full_like(data, np.nan, dtype=float)
deltas = np.diff(data)
gains = np.where(deltas > 0, deltas, 0.0)
losses = np.where(deltas < 0, -deltas, 0.0)
result = np.full(len(data), np.nan, dtype=float)
avg_gain = np.mean(gains[:period])
avg_loss = np.mean(losses[:period])
if avg_loss == 0:
result[period] = 100.0
else:
rs = avg_gain / avg_loss
result[period] = 100.0 - 100.0 / (1.0 + rs)
for i in range(period, len(deltas)):
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
if avg_loss == 0:
result[i + 1] = 100.0
else:
rs = avg_gain / avg_loss
result[i + 1] = 100.0 - 100.0 / (1.0 + rs)
return result
@staticmethod
def macd(
data: np.ndarray,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
MACD 指标 (Moving Average Convergence Divergence)。
参数:
data: 价格序列
fast_period: 快线 EMA 周期(默认 12)
slow_period: 慢线 EMA 周期(默认 26)
signal_period: 信号线 EMA 周期(默认 9)
返回:
(macd_line, signal_line, histogram)
- macd_line: DIF = EMA(fast) - EMA(slow)
- signal_line: DEA = EMA(DIF, signal_period)
- histogram: MACD 柱状图 = (DIF - DEA) × 2
"""
ema_fast = Indicators.ema(data, fast_period)
ema_slow = Indicators.ema(data, slow_period)
macd_line = ema_fast - ema_slow
valid_start = slow_period - 1
signal_line = np.full(len(data), np.nan, dtype=float)
valid_macd = macd_line[valid_start:]
if len(valid_macd) >= signal_period:
signal_ema = Indicators.ema(valid_macd, signal_period)
signal_line[valid_start:] = signal_ema
histogram = (macd_line - signal_line) * 2
return macd_line, signal_line, histogram
@staticmethod
def kdj(
high: np.ndarray,
low: np.ndarray,
close: np.ndarray,
k_period: int = 9,
d_period: int = 3,
j_smooth: int = 3,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
KDJ 随机指标。
计算过程:
RSV = (Close - Low_N) / (High_N - Low_N) × 100
K = SMA(RSV, d_period) (递推平滑)
D = SMA(K, j_smooth)
J = 3K - 2D
参数:
high: 最高价序列
low: 最低价序列
close: 收盘价序列
k_period: RSV 窗口(默认 9)
d_period: K 值平滑周期(默认 3)
j_smooth: D 值平滑周期(默认 3)
返回:
(K, D, J) 三元组
"""
n = len(close)
rsv = np.full(n, np.nan, dtype=float)
for i in range(k_period - 1, n):
period_high = np.max(high[i - k_period + 1: i + 1])
period_low = np.min(low[i - k_period + 1: i + 1])
if period_high == period_low:
rsv[i] = 50.0
else:
rsv[i] = (close[i] - period_low) / (period_high - period_low) * 100.0
k_values = np.full(n, np.nan, dtype=float)
d_values = np.full(n, np.nan, dtype=float)
first_valid = k_period - 1
k_values[first_valid] = rsv[first_valid]
d_values[first_valid] = k_values[first_valid]
for i in range(first_valid + 1, n):
if np.isnan(rsv[i]):
continue
k_values[i] = (k_values[i - 1] * (d_period - 1) + rsv[i]) / d_period
d_values[i] = (d_values[i - 1] * (j_smooth - 1) + k_values[i]) / j_smooth
j_values = 3.0 * k_values - 2.0 * d_values
return k_values, d_values, j_values
# ════════════════════════════════════════
# 波动率/通道类
# ════════════════════════════════════════
@staticmethod
def bollinger_bands(
data: np.ndarray,
period: int = 20,
num_std: float = 2.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
布林带 (Bollinger Bands)。
参数:
data: 价格序列
period: 均线周期(默认 20)
num_std: 标准差倍数(默认 2.0)
返回:
(upper, middle, lower)
- middle: SMA(period)
- upper: middle + num_std × σ
- lower: middle - num_std × σ
"""
middle = Indicators.sma(data, period)
std = np.full(len(data), np.nan, dtype=float)
for i in range(period - 1, len(data)):
std[i] = np.std(data[i - period + 1: i + 1], ddof=0)
upper = middle + num_std * std
lower = middle - num_std * std
return upper, middle, lower
@staticmethod
def atr(
high: np.ndarray,
low: np.ndarray,
close: np.ndarray,
period: int = 14,
) -> np.ndarray:
"""
平均真实波幅 (Average True Range)。
TR = max(High-Low, |High-PrevClose|, |Low-PrevClose|)
ATR = SMA(TR, period) 或 Wilder 平滑
参数:
high: 最高价序列
low: 最低价序列
close: 收盘价序列
period: ATR 周期(默认 14)
返回:
ATR 数组
"""
n = len(close)
tr = np.full(n, np.nan, dtype=float)
tr[0] = high[0] - low[0]
for i in range(1, n):
hl = high[i] - low[i]
hc = abs(high[i] - close[i - 1])
lc = abs(low[i] - close[i - 1])
tr[i] = max(hl, hc, lc)
atr_values = np.full(n, np.nan, dtype=float)
if n >= period:
atr_values[period - 1] = np.mean(tr[:period])
for i in range(period, n):
atr_values[i] = (atr_values[i - 1] * (period - 1) + tr[i]) / period
return atr_values
# ════════════════════════════════════════
# 辅助方法
# ════════════════════════════════════════
@staticmethod
def crossover(series_a: np.ndarray, series_b: np.ndarray) -> np.ndarray:
"""
金叉判断: series_a 从下方穿越 series_b。
返回布尔数组,True 表示该 bar 发生金叉。
"""
result = np.zeros(len(series_a), dtype=bool)
for i in range(1, len(series_a)):
if (np.isnan(series_a[i]) or np.isnan(series_b[i]) or
np.isnan(series_a[i - 1]) or np.isnan(series_b[i - 1])):
continue
result[i] = (series_a[i - 1] <= series_b[i - 1]) and (series_a[i] > series_b[i])
return result
@staticmethod
def crossunder(series_a: np.ndarray, series_b: np.ndarray) -> np.ndarray:
"""
死叉判断: series_a 从上方穿越 series_b。
返回布尔数组,True 表示该 bar 发生死叉。
"""
result = np.zeros(len(series_a), dtype=bool)
for i in range(1, len(series_a)):
if (np.isnan(series_a[i]) or np.isnan(series_b[i]) or
np.isnan(series_a[i - 1]) or np.isnan(series_b[i - 1])):
continue
result[i] = (series_a[i - 1] >= series_b[i - 1]) and (series_a[i] < series_b[i])
return result
@staticmethod
def highest(data: np.ndarray, period: int) -> np.ndarray:
"""滚动最高值。"""
result = np.full(len(data), np.nan, dtype=float)
for i in range(period - 1, len(data)):
result[i] = np.max(data[i - period + 1: i + 1])
return result
@staticmethod
def lowest(data: np.ndarray, period: int) -> np.ndarray:
"""滚动最低值。"""
result = np.full(len(data), np.nan, dtype=float)
for i in range(period - 1, len(data)):
result[i] = np.min(data[i - period + 1: i + 1])
return result
@staticmethod
def pct_change(data: np.ndarray, period: int = 1) -> np.ndarray:
"""百分比变化率。"""
result = np.full(len(data), np.nan, dtype=float)
for i in range(period, len(data)):
if data[i - period] != 0:
result[i] = (data[i] - data[i - period]) / data[i - period]
return result
FILE:scripts/machine_auth.py
"""
设备认证 & Token 管理
认证方式:
从龙虾 workspace 路径提取实例 UUID 作为稳定设备标识。
路径格式: /data/.openclaw/workspace/<uuid>/skills/...
同一个龙虾 → 同一个 UUID → 同一个设备 ID → 受 3 策略配额限制。
重装 skill 不会改变设备 ID。
不采集: 主机名、操作系统、MAC 地址、IP 等任何硬件/网络信息。
Token 缓存: skill 目录下 .auth.json
"""
from __future__ import annotations
import hashlib
import json
import re
import uuid as uuid_lib
from pathlib import Path
import httpx
from loguru import logger
from server_config import resolve_server_url
API_PREFIX = "/api/v1"
_AUTH_FILE = Path(__file__).parent.parent / ".auth.json"
def _extract_workspace_id() -> str:
"""
从 workspace 路径提取龙虾实例的唯一 UUID。
搜索路径链:
1. 当前 skill 所在路径中的 UUID 段
2. /data/.openclaw/workspace/ 下的真实目录名
3. 兜底: 生成随机 UUID 并持久化到 .auth.json
"""
# 策略1: 从自身路径提取 UUID
# 例: /data/.openclaw/workspace/abc123def/skills/dex-quant-skill/scripts/machine_auth.py
skill_path = str(Path(__file__).resolve())
match = re.search(r'/workspace/([0-9a-f-]{8,})', skill_path)
if match:
return match.group(1)
# 策略2: 扫描 /data/.openclaw/workspace/ 下的真实目录
ws_root = Path("/data/.openclaw/workspace")
if ws_root.is_dir():
for d in ws_root.iterdir():
if d.is_dir() and "return d.name
# 策略3: 兜底 — 读缓存或生成新的(极端情况才走到这)
if _AUTH_FILE.exists():
try:
cached = json.loads(_AUTH_FILE.read_text())
if cached.get("device_id"):
return cached["device_id"]
except (json.JSONDecodeError, OSError):
pass
return uuid_lib.uuid4().hex
def _get_stable_device_id() -> str:
"""生成稳定的设备 ID: workspace UUID 的 SHA256 前 32 位。"""
ws_id = _extract_workspace_id()
return hashlib.sha256(ws_id.encode()).hexdigest()[:32]
class MachineAuth:
"""设备认证客户端"""
def __init__(self, server_url: str | None = None):
self.server_url = resolve_server_url(server_url).rstrip("/")
self.base_url = self.server_url + API_PREFIX
self._client = httpx.Client(timeout=30.0)
self._config: dict = {
self._load_config()
def _load_config(self) -> None:
if _AUTH_FILE.exists():
try:
self._config = json.loads(_AUTH_FILE.read_text())
except (json.JSONDecodeError, OSError):
self._config = {}
def _save_config(self) -> None:
_AUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
_AUTH_FILE.write_text(json.dumps(self._config, indent=2, ensure_ascii=False))
@property
def machine_code(self) -> str:
return _get_stable_device_id()
@property
def token(self) -> str:
return self._config.get("token", "")
def register_or_load(self) -> str:
"""获取 Token(自动注册或读本地缓存)。"""
if self.token:
return self.token
device_id = self.machine_code
logger.info(f"首次使用,注册设备: {device_id[:8]}...")
resp = self._client.post(
f"{self.base_url}/auth/register",
json={"machine_code": device_id},
)
resp.raise_for_status()
data = resp.json()
self._config["device_id"] = device_id
self._config["token"] = data["token"]
self._config["max_strategies"] = data["max_strategies"]
self._save_config()
logger.info(f"注册成功 | 配额 {data['used_strategies']}/{data['max_strategies']}")
return data["token"]
def check_quota(self) -> dict:
"""查询当前配额使用情况。"""
token = self.register_or_load()
resp = self._client.get(
f"{self.base_url}/auth/quota",
headers={"X-Token": token},
)
resp.raise_for_status()
return resp.json()
def close(self):
self._client.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
FILE:scripts/risk_checker.py
"""
Risk Checker — 交易前风控检查
规则:
1. 单笔仓位不超过总权益的 max_position_pct
2. 同时最多 max_concurrent 个仓位
3. 连续亏损 max_consecutive_losses 次后暂停
4. 两次交易之间至少间隔 cooldown_minutes 分钟
5. 信号置信度必须 >= min_confidence
"""
from __future__ import annotations
import json
import time
from pathlib import Path
from typing import Any
from loguru import logger
class RiskChecker:
"""交易前风控检查器。"""
DEFAULT_RULES = {
"max_position_pct": 10.0,
"max_concurrent": 3,
"max_consecutive_losses": 3,
"cooldown_minutes": 30,
"min_confidence": 0.6,
"max_leverage": 10,
}
def __init__(self, rules: dict | None = None, state_file: str | None = None):
self.rules = {**self.DEFAULT_RULES, **(rules or {})}
self._state_file = Path(state_file or Path.home() / ".dex-quant" / "risk_state.json")
self._state = self._load_state()
def _load_state(self) -> dict:
if self._state_file.exists():
try:
return json.loads(self._state_file.read_text())
except (json.JSONDecodeError, OSError):
pass
return {"consecutive_losses": 0, "last_trade_ts": 0, "trade_log": []}
def _save_state(self):
self._state_file.parent.mkdir(parents=True, exist_ok=True)
self._state_file.write_text(json.dumps(self._state, indent=2))
def check(self, signal: dict, equity: float, open_positions: int) -> tuple[bool, str]:
"""
检查信号是否通过风控。
返回 (passed, reason)。
"""
confidence = signal.get("confidence", 0)
if confidence < self.rules["min_confidence"]:
return False, f"confidence {confidence:.2f} < {self.rules['min_confidence']}"
if open_positions >= self.rules["max_concurrent"]:
return False, f"already {open_positions} positions (max {self.rules['max_concurrent']})"
if self._state["consecutive_losses"] >= self.rules["max_consecutive_losses"]:
return False, f"consecutive losses {self._state['consecutive_losses']} >= {self.rules['max_consecutive_losses']}"
elapsed = time.time() - self._state.get("last_trade_ts", 0)
cooldown_sec = self.rules["cooldown_minutes"] * 60
if elapsed < cooldown_sec:
remaining = int(cooldown_sec - elapsed)
return False, f"cooldown: {remaining}s remaining"
return True, "passed"
def calculate_position_size(self, equity: float, price: float) -> float:
"""根据权益和风控规则计算仓位大小(币数量)。"""
max_usd = equity * (self.rules["max_position_pct"] / 100.0)
if price <= 0:
return 0.0
return max_usd / price
def record_trade(self, signal: dict, result: dict):
"""记录交易结果,更新连续亏损计数。"""
is_loss = result.get("pnl", 0) < 0 or result.get("error")
if is_loss:
self._state["consecutive_losses"] += 1
else:
self._state["consecutive_losses"] = 0
self._state["last_trade_ts"] = time.time()
self._state["trade_log"].append({
"ts": time.time(),
"signal": {
"symbol": signal.get("symbol"),
"action": signal.get("action"),
"direction": signal.get("direction"),
"price": signal.get("price_at_signal"),
},
"is_loss": is_loss,
})
if len(self._state["trade_log"]) > 100:
self._state["trade_log"] = self._state["trade_log"][-100:]
self._save_state()
def reset_losses(self):
"""重置连续亏损计数。"""
self._state["consecutive_losses"] = 0
self._save_state()
logger.info("risk: consecutive losses reset to 0")
def summary(self) -> dict:
return {
"rules": self.rules,
"consecutive_losses": self._state["consecutive_losses"],
"last_trade_ts": self._state.get("last_trade_ts", 0),
"total_trades": len(self._state.get("trade_log", [])),
}
FILE:scripts/server_config.py
"""
Server URL 配置
优先级:
1. 显式传入的 server_url / --server 参数
2. 环境变量 DEX_QUANT_SERVER_URL
3. 默认生产地址
"""
from __future__ import annotations
import os
DEFAULT_SERVER_URL = "https://quant.supersafeclaw.com"
def resolve_server_url(server_url: str | None = None) -> str:
"""解析回测服务器地址。"""
value = (server_url or "").strip() or os.getenv("DEX_QUANT_SERVER_URL", "").strip()
return value or DEFAULT_SERVER_URL
FILE:scripts/signal_runtime.py
"""
Signal Runtime — 策略定时执行调度器
功能:
1. 按指定间隔运行策略脚本的 generate_signals(mode='live')
2. 新信号经过风控检查
3. 通过风控后调用 TradeExecutor 自动下单
4. 所有操作写入日志 + 状态文件
5. 支持 Ctrl+C 优雅停止
用法:
python signal_runtime.py --strategy strategies/sol_kdj_swing.py --interval 14400
interval 单位为秒(4h = 14400)
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import signal as _signal
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any
from loguru import logger
from risk_checker import RiskChecker
from trade_executor import TradeExecutor
logger.remove()
logger.add(
sys.stderr,
format="<green>{time:HH:mm:ss}</green> | <level>{level:<7}</level> | {message}",
level="INFO",
)
LOG_DIR = Path.home() / ".dex-quant" / "logs"
LOG_DIR.mkdir(parents=True, exist_ok=True)
logger.add(
str(LOG_DIR / "runtime_{time:YYYY-MM-DD}.log"),
rotation="1 day", retention="7 days", level="DEBUG",
)
_running = True
def _stop(signum, frame):
global _running
logger.info("received stop signal, shutting down...")
_running = False
_signal.signal(_signal.SIGINT, _stop)
_signal.signal(_signal.SIGTERM, _stop)
def load_strategy(path: str):
"""动态加载策略模块。"""
spec = importlib.util.spec_from_file_location("strategy", path)
if not spec or not spec.loader:
raise ImportError(f"cannot load strategy: {path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
if not hasattr(module, "generate_signals"):
raise AttributeError(f"strategy missing generate_signals(): {path}")
return module
def extract_actionable_signals(signals: list[dict], min_confidence: float = 0.6) -> list[dict]:
"""过滤出可执行的信号(buy/sell,confidence 足够)。"""
actionable = []
for s in signals:
action = s.get("action", "").lower()
if action not in ("buy", "sell"):
continue
if s.get("confidence", 0) < min_confidence:
continue
actionable.append(s)
return actionable
def map_signal_to_trade(signal: dict, position_size: float) -> dict:
"""将策略信号映射为交易指令。"""
symbol = signal.get("symbol", "BTCUSDT")
coin = symbol.replace("USDT", "").replace("PERP", "")
action = signal["action"].lower()
direction = signal.get("direction", "long").lower()
if action == "buy" and direction == "long":
return {"type": "market_buy", "coin": coin, "size": position_size}
elif action == "sell" and direction == "long":
return {"type": "market_sell", "coin": coin, "size": position_size}
elif action == "sell" and direction == "short":
return {"type": "market_sell", "coin": coin, "size": position_size}
elif action == "buy" and direction == "short":
return {"type": "market_buy", "coin": coin, "size": position_size}
return {"type": "unknown"}
def execute_trade(executor: TradeExecutor, trade: dict) -> dict:
"""执行单笔交易。"""
if trade["type"] == "market_buy":
return executor.market_buy(trade["coin"], trade["size"])
elif trade["type"] == "market_sell":
return executor.market_sell(trade["coin"], trade["size"])
else:
return {"error": f"unknown trade type: {trade['type']}"}
def save_state(state_file: Path, state: dict):
state_file.parent.mkdir(parents=True, exist_ok=True)
state_file.write_text(json.dumps(state, indent=2, default=str))
def run_loop(
strategy_path: str,
interval: int = 14400,
claw_dir: str | None = None,
risk_rules: dict | None = None,
dry_run: bool = False,
):
"""主循环:定时执行策略 → 风控 → 下单。"""
strategy = load_strategy(strategy_path)
strategy_name = getattr(strategy, "PARAMS", {}).get("strategy_name", Path(strategy_path).stem)
executor = None
if not dry_run:
try:
executor = TradeExecutor(claw_dir)
except FileNotFoundError as e:
logger.error("TradeExecutor init failed: {}", e)
logger.info("switching to dry_run mode")
dry_run = True
risk = RiskChecker(rules=risk_rules)
state_file = Path.home() / ".dex-quant" / "runtime_state.json"
logger.info("=" * 60)
logger.info("Signal Runtime Started")
logger.info(" Strategy: {}", strategy_path)
logger.info(" Interval: {}s ({}h)", interval, interval / 3600)
logger.info(" Dry run: {}", dry_run)
logger.info(" Risk rules: {}", risk.rules)
logger.info("=" * 60)
cycle = 0
while _running:
cycle += 1
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info("── Cycle {} | {} ──", cycle, now)
try:
result = strategy.generate_signals(mode="live")
signals = result.get("signals", [])
logger.info("strategy returned {} signals", len(signals))
except Exception as e:
logger.error("strategy execution failed: {}", e)
_wait(interval)
continue
actionable = extract_actionable_signals(signals, risk.rules.get("min_confidence", 0.6))
if not actionable:
logger.info("no actionable signals, sleeping...")
_wait(interval)
continue
logger.info("{} actionable signals found", len(actionable))
equity = 0.0
open_positions = 0
if executor and not dry_run:
try:
equity = executor.get_equity()
pos_data = executor.get_positions()
open_positions = len(pos_data.get("positions", []))
except Exception as e:
logger.error("failed to get account state: {}", e)
_wait(interval)
continue
for sig in actionable:
logger.info("signal: {} {} {} @ {:.2f} | confidence={:.2f} | {}",
sig["action"], sig.get("direction", ""), sig.get("symbol", ""),
sig.get("price_at_signal", 0), sig.get("confidence", 0),
sig.get("reason", ""))
passed, reason = risk.check(sig, equity, open_positions)
if not passed:
logger.warning("BLOCKED by risk: {}", reason)
continue
price = sig.get("price_at_signal", 0)
position_size = risk.calculate_position_size(equity, price) if equity > 0 else 0
trade = map_signal_to_trade(sig, position_size)
logger.info("trade: {} {} {:.6f}", trade["type"], trade.get("coin", ""), trade.get("size", 0))
if dry_run:
logger.info("[DRY RUN] would execute: {}", trade)
risk.record_trade(sig, {"status": "dry_run"})
else:
try:
result = execute_trade(executor, trade)
if "error" in result:
logger.error("trade failed: {}", result["error"])
risk.record_trade(sig, result)
else:
logger.info("trade executed: {}", result)
risk.record_trade(sig, result)
open_positions += 1
except Exception as e:
logger.error("trade execution error: {}", e)
risk.record_trade(sig, {"error": str(e)})
state = {
"cycle": cycle,
"last_run": now,
"strategy": strategy_path,
"signals_count": len(signals),
"actionable_count": len(actionable),
"risk_summary": risk.summary(),
}
save_state(state_file, state)
_wait(interval)
logger.info("Signal Runtime stopped.")
def _wait(seconds: int):
"""可中断的等待。"""
end = time.time() + seconds
while _running and time.time() < end:
time.sleep(min(5, end - time.time()))
def main():
parser = argparse.ArgumentParser(description="Signal Runtime — 策略定时执行")
parser.add_argument("--strategy", required=True, help="策略脚本路径")
parser.add_argument("--interval", type=int, default=14400, help="执行间隔(秒),默认 4h=14400")
parser.add_argument("--claw-dir", default=None, help="HyperLiquid-Claw 安装目录")
parser.add_argument("--dry-run", action="store_true", help="模拟模式,不实际下单")
parser.add_argument("--max-position-pct", type=float, default=10.0, help="单笔最大仓位占比")
parser.add_argument("--max-concurrent", type=int, default=3, help="最大同时持仓数")
parser.add_argument("--cooldown", type=int, default=30, help="交易冷却时间(分钟)")
args = parser.parse_args()
risk_rules = {
"max_position_pct": args.max_position_pct,
"max_concurrent": args.max_concurrent,
"cooldown_minutes": args.cooldown,
}
run_loop(
strategy_path=args.strategy,
interval=args.interval,
claw_dir=args.claw_dir,
risk_rules=risk_rules,
dry_run=args.dry_run,
)
if __name__ == "__main__":
main()
FILE:scripts/strategy_runner.py
"""
策略运行器 — 生成脚本的统一交互入口
两种运行模式:
[1] 本地回测 — 本地拉数据+生成信号,发信号到服务器回测(数据/计算在本地)
[2] 服务器回测 — 上传脚本到服务器,服务器执行一切(数据/计算在服务器,占配额)
用法 (在策略脚本的 __main__ 中):
from strategy_runner import run
run(generate_signals, STRATEGY_NAME, SYMBOL, TIMEFRAME, script_path=__file__)
"""
from __future__ import annotations
import os
import sys
from loguru import logger
from server_config import resolve_server_url
logger.remove()
logger.add(sys.stderr, format="{time:HH:mm:ss} | {message}", level="INFO")
def _print_welcome():
"""首次运行时展示平台功能介绍"""
print()
print(f"{'━' * 50}")
print(" 🚀 DEX Quant — 加密货币量化策略平台")
print(f"{'━' * 50}")
print()
print(" 📝 自定义策略:")
print(" 用自然语言描述交易想法,自动生成可运行的策略脚本")
print(" 支持做多/做空/双向,可设置止盈止损和杠杆")
print()
print(" 📐 技术指标 (8 种):")
print(" 均线: SMA, EMA 震荡: RSI, MACD, KDJ")
print(" 通道: 布林带, ATR 辅助: 成交量均线")
print()
print(" 🌍 数据覆盖:")
print(" 永续合约: 585+ 个币种 (Binance Futures)")
print(" 现货: Binance 全部交易对")
print(" K线周期: 1m / 5m / 15m / 1h / 4h / 1d")
print()
print(" ⚡ 运行方式:")
print(" [本地回测] 本地生成信号 → 服务器回测,不限次")
print(" [服务器回测] 上传脚本到服务器执行,3 个策略位")
print()
print(f"{'━' * 50}")
def _print_banner(strategy_name: str, symbol: str, timeframe: str, script_path: str = None):
print()
print(f"{'━' * 50}")
print(f" 📊 当前策略: {strategy_name}")
print(f" 🪙 交易对: {symbol} | ⏱ 周期: {timeframe}")
if script_path:
print(f" 📁 脚本: {os.path.abspath(script_path)}")
print(f"{'━' * 50}")
def _print_signals_summary(signals: list[dict]):
buys = [s for s in signals if s["action"] == "buy"]
sells = [s for s in signals if s["action"] == "sell"]
longs = [s for s in buys if s.get("direction") == "long"]
shorts = [s for s in buys if s.get("direction") == "short"]
print(f"\n 📡 信号统计:")
print(f" 总信号: {len(signals)}")
print(f" 🟢 开仓: {len(buys)} (做多 {len(longs)} / 做空 {len(shorts)})")
print(f" 🔴 平仓: {len(sells)}")
if signals:
print(f" 📅 范围: {signals[0]['timestamp'][:19]}")
print(f" ~ {signals[-1]['timestamp'][:19]}")
def _connect_server(server_url: str):
"""连接服务器,返回 (auth, quota) 或 None"""
from machine_auth import MachineAuth
print(f"\n 🔗 正在连接回测服务器 ({server_url})...")
try:
auth = MachineAuth(server_url)
auth.register_or_load()
quota = auth.check_quota()
print(" ✅ 服务器连接成功")
return auth, quota
except Exception as e:
print(f"\n ❌ 连接服务器失败: {e}")
print(" 请检查:")
print(" · 网络连接是否正常")
print(" · 是否需要设置代理 (PROXY_URL 环境变量)")
return None, None
def _print_auth_and_quota(auth, quota):
"""打印认证信息和配额"""
used = quota["used_strategies"]
max_s = quota["max_strategies"]
remaining = quota["remaining"]
print()
print(f"{'━' * 50}")
print(f" 🔑 认证信息")
print(f"{'━' * 50}")
print(f" 机器码: {quota['machine_code'][:8]}...")
print(f" Token: {auth.token[:16]}...")
print(f" 📦 策略配额: {used}/{max_s} 已用,剩余 {remaining} 个")
if quota.get("strategies"):
print()
print(" 📋 已注册策略:")
for s in quota["strategies"]:
print(f" 📌 {s['name']} ({s['strategy_id']})")
print(f"{'━' * 50}")
return remaining
def _ask_backtest_params():
"""交互式输入回测参数"""
print("\n 回测参数配置 (直接回车使用默认值):")
capital_str = input(" 初始资金 (默认 $100,000): ").strip()
capital = float(capital_str) if capital_str else 100000.0
leverage_str = input(" 杠杆倍数 (默认 3x): ").strip()
leverage = int(leverage_str) if leverage_str else 3
print(f"\n ✅ 参数确认: 初始资金 ,.0f | 杠杆 {leverage}x")
return capital, leverage
def _show_result(client, result):
"""展示回测结果"""
print()
client.print_metrics(result)
client.print_conclusion(result)
client.print_trades(result, limit=10)
def _print_next_steps(result: dict, strategy_name: str):
"""根据回测结论给出具体的下一步操作指引"""
conclusion = result.get("conclusion", "")
print()
print(f"{'━' * 50}")
print(f" 🔧 下一步操作")
print(f"{'━' * 50}")
if conclusion == "approved":
print(f" ✅ 策略 [{strategy_name}] 已通过回测验证!")
print()
print(" 1️⃣ 部署实时监控")
print(" 告诉 AI: \"帮我部署这个策略进行实时监控\"")
print()
print(" 2️⃣ 换时间段再验证稳健性")
print(f" python {sys.argv[0]} backtest 2023-01-01 2023-12-31")
print()
print(" 3️⃣ 调整参数后重新回测")
print(" 告诉 AI: \"把止损改成 2%,放量倍数改成 1.2\"")
elif conclusion == "paper_trade_first":
print(f" ⚠️ 策略 [{strategy_name}] 建议先模拟观察")
print()
print(" 1️⃣ 先跑模拟盘 1-2 周")
print(" 告诉 AI: \"帮我用模拟盘跑一下这个策略\"")
print()
print(" 2️⃣ 参数优化后重新回测")
print(" 告诉 AI: \"帮我优化一下这个策略的参数\"")
print()
print(" 3️⃣ 换时间段再测试")
print(f" python {sys.argv[0]} backtest 2023-06-01 2024-06-01")
elif conclusion == "rejected":
print(f" ❌ 策略 [{strategy_name}] 未通过回测,需要调整")
print()
print(" 1️⃣ 让 AI 分析问题并优化")
print(" 告诉 AI: \"回测没通过,帮我分析原因并优化\"")
print()
print(" 2️⃣ 修改策略逻辑")
print(" 告诉 AI: \"加上 RSI 过滤\" 或 \"止损改紧一点\"")
print()
print(" 3️⃣ 换个策略思路重新来")
print(" 告诉 AI: \"帮我做一个新的 BTC 策略\"")
else:
print(" 📋 回测已完成,你可以:")
print()
print(" 1️⃣ 告诉 AI 你对结果的看法,进行下一步")
print(" 2️⃣ 调整参数后重新回测")
print(f" python {sys.argv[0]} backtest <开始日期> <结束日期>")
print(f"{'━' * 50}")
# ═══════════════════════════════════════════
# 模式 1: 本地回测(本地生成信号 → 传信号到服务器回测)
# ═══════════════════════════════════════════
def _run_local_backtest(
strategy_name, symbol, timeframe,
start_date, end_date, signals, server_url,
):
from api_client import QuantAPIClient
print()
print(f" {'━' * 45}")
print(" ⚡ 第 3 步 / 共 3 步:执行回测")
print(f" {'━' * 45}")
auth, quota = _connect_server(server_url)
if auth is None:
print("\n 下一步: 检查网络后重新运行脚本")
return
_print_auth_and_quota(auth, quota)
capital, leverage = _ask_backtest_params()
print(f"\n 正在将 {len(signals)} 个信号发送到服务器回测引擎...")
print(f" 服务器将: 拉取 K 线数据 → 逐 bar 模拟交易 → 计算绩效")
try:
client = QuantAPIClient(server_url)
result = client.run_backtest(
strategy_name=strategy_name,
symbol=symbol,
timeframe=timeframe,
start_date=start_date,
end_date=end_date,
signals=signals,
initial_capital=capital,
leverage=leverage,
)
_show_result(client, result)
_print_next_steps(result, strategy_name)
client.close()
except Exception as e:
print(f"\n ❌ 回测失败: {e}")
print(f" 下一步: 检查网络连接后重新运行脚本")
auth.close()
# ═══════════════════════════════════════════
# 模式 2: 服务器回测(上传脚本 → 服务器执行一切)
# ═══════════════════════════════════════════
def _run_server_backtest(
strategy_name, symbol, timeframe,
start_date, end_date, script_path, server_url,
):
from api_client import QuantAPIClient
print()
print(f" {'━' * 45}")
print(" ⚡ 第 3 步 / 共 3 步:服务器回测")
print(f" {'━' * 45}")
if not script_path or not os.path.isfile(script_path):
print(f"\n ❌ 找不到策略脚本文件: {script_path}")
print(" 下一步: 确认脚本文件存在后重新运行")
return
auth, quota = _connect_server(server_url)
if auth is None:
print("\n 下一步: 检查网络后重新运行脚本")
return
remaining = _print_auth_and_quota(auth, quota)
with open(script_path, "r", encoding="utf-8") as f:
script_content = f.read()
script_size = len(script_content)
print(f"\n 脚本文件: {os.path.basename(script_path)} ({script_size:,} 字节)")
if remaining <= 0:
print()
print(" " + "!" * 55)
print(" !免费配额已满({}个),无法上传新策略到服务器".format(
quota["max_strategies"]))
print(" " + "!" * 55)
print(f"\n 下一步: 重新运行脚本,选择 [1] 本地回测(不占配额)")
auth.close()
return
capital, leverage = _ask_backtest_params()
print(f"\n 正在上传脚本到服务器...")
print(f" 服务器将: 拉取数据 → 执行脚本生成信号 → 回测引擎模拟 → 出报告")
try:
client = QuantAPIClient(server_url)
save_result = client.save_strategy(
name=strategy_name,
script_content=script_content,
symbol=symbol,
timeframe=timeframe,
)
strategy_id = save_result.get("strategy_id", "")
print(f" ✅ 策略已保存到服务器: {strategy_id}", flush=True)
job_id = client.submit_backtest(
script_content=script_content,
strategy_name=strategy_name,
strategy_id=strategy_id,
symbol=symbol,
timeframe=timeframe,
start_date=start_date,
end_date=end_date,
initial_capital=capital,
leverage=leverage,
)
print(f" 回测任务已提交,正在轮询进度: {job_id}", flush=True)
result = client.wait_backtest(
job_id,
poll_interval=5.0,
max_running_logs=1,
)
_show_result(client, result)
_print_next_steps(result, strategy_name)
client.close()
except Exception as e:
error_msg = str(e)
if "403" in error_msg:
print(f"\n ❌ 配额不足,无法上传")
print(f" 下一步: 重新运行脚本,选择 [1] 本地回测(不占配额)")
else:
print(f"\n ❌ 服务器回测失败: {e}")
print(f" 下一步: 检查网络后重试,或选择 [1] 本地回测")
auth.close()
# ═══════════════════════════════════════════
# 主入口
# ═══════════════════════════════════════════
def run(
generate_fn,
strategy_name: str,
symbol: str,
timeframe: str,
script_path: str = None,
server_url: str | None = None,
):
"""
策略脚本的统一入口。
参数:
generate_fn: 信号生成函数 generate_signals(mode, start_date, end_date)
strategy_name: 策略名称
symbol: 交易对
timeframe: K线周期
script_path: 脚本文件路径(__file__),服务器模式需要
server_url: 服务器地址
"""
import argparse
parser = argparse.ArgumentParser(description=f"策略: {strategy_name}")
parser.add_argument("mode", nargs="?", default="backtest",
help="运行模式: backtest / live")
parser.add_argument("start_date", nargs="?", help="起始日期 YYYY-MM-DD")
parser.add_argument("end_date", nargs="?", help="结束日期 YYYY-MM-DD")
parser.add_argument("--server", default=resolve_server_url(server_url), help="服务器地址")
args = parser.parse_args()
_print_welcome()
_print_banner(strategy_name, symbol, timeframe, script_path)
if args.mode == "backtest" and not args.start_date:
print("\n 回测模式需要指定日期范围")
print(f"\n 正确用法:")
print(f" python {sys.argv[0]} backtest 2025-01-01 2025-03-01")
print(f"\n 示例:")
print(f" python {sys.argv[0]} backtest 2024-01-01 2024-12-31 # 回测 2024 全年")
print(f" python {sys.argv[0]} backtest 2025-01-01 2025-03-01 # 回测近 2 个月")
return
print()
print(f" {'━' * 45}")
print(" 📡 第 1 步 / 共 3 步:本地生成信号")
print(f" {'━' * 45}")
print(f" 模式: {'回测' if args.mode == 'backtest' else '实时'}")
if args.start_date:
print(f" 区间: {args.start_date} → {args.end_date}")
print(f"\n 正在从 Binance 拉取 K 线数据并计算指标...")
result = generate_fn(mode=args.mode, start_date=args.start_date, end_date=args.end_date)
if result.get("error"):
print(f"\n ❌ 生成信号失败: {result['error']}")
print(f" 请检查网络连接或调整时间范围后重试")
return
signals = result.get("signals", [])
if not signals:
print(f"\n ❌ 未生成任何信号")
print(f" 可能的原因:")
print(f" - 时间范围内没有触发买卖条件")
print(f" - 策略参数过于严格(如放量倍数太高)")
print(f" 建议: 扩大时间范围或放宽条件参数后重试")
return
_print_signals_summary(signals)
print(f"\n ✅ 信号生成完成")
has_script = script_path and os.path.isfile(script_path)
print()
print(f" {'━' * 45}")
print(" 🔧 第 2 步 / 共 3 步:选择回测模式")
print(f" {'━' * 45}")
print()
print(" 1️⃣ 本地回测(推荐)")
print(" 信号已在本地生成,发到服务器回测引擎出报告")
print(" 不占配额,无限制使用")
if has_script:
print()
print(" 2️⃣ 服务器回测")
print(" 上传脚本到服务器,服务器重新拉数据+生成信号+回测")
print(" 占 1 个策略配额(3 个)")
else:
print()
print(" 2️⃣ 服务器回测 — 不可用(未传入脚本路径)")
print()
print(" ❌ q 退出")
print()
while True:
choice = input(" 请选择 (1/2/q): ").strip()
if choice in ("1", "2", "q"):
break
print(" 无效输入,请输入 1、2 或 q")
if choice == "1":
_run_local_backtest(
strategy_name, symbol, timeframe,
args.start_date or "", args.end_date or "",
signals, args.server,
)
elif choice == "2":
if not has_script:
print("\n ❌ 服务器模式不可用")
print(" 原因: 调用 run() 时未传入 script_path=__file__")
print(" 请在脚本的 __main__ 中添加: script_path=__file__")
return
_run_server_backtest(
strategy_name, symbol, timeframe,
args.start_date or "", args.end_date or "",
script_path, args.server,
)
else:
print("\n 已退出。你可以随时重新运行此脚本。")
FILE:scripts/trade_executor.py
"""
Trade Executor — 通过 HyperLiquid-Claw 执行交易
依赖:
- Node.js >= 18
- npm install hyperliquid (在 HyperLiquid-Claw 目录)
- 环境变量 HYPERLIQUID_PRIVATE_KEY (交易模式)
- 环境变量 HYPERLIQUID_ADDRESS (只读模式)
调用 hyperliquid.mjs 的 CLI 命令完成交易操作。
"""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import Any
from loguru import logger
class TradeExecutor:
"""封装 HyperLiquid-Claw 交易操作。"""
def __init__(self, claw_dir: str | None = None):
if claw_dir:
self._claw_dir = Path(claw_dir)
else:
candidates = [
Path.home() / ".openclaw" / "skills" / "hyperliquid",
Path.home() / "HyperLiquid-Claw",
Path(__file__).parent.parent / "HyperLiquid-Claw",
]
self._claw_dir = next((p for p in candidates if (p / "hyperliquid.mjs").exists()), None)
if not self._claw_dir or not (self._claw_dir / "hyperliquid.mjs").exists():
raise FileNotFoundError(
"HyperLiquid-Claw not found. Install: git clone https://github.com/Rohit24567/HyperLiquid-Claw.git && cd HyperLiquid-Claw && npm install hyperliquid"
)
self._script = str(self._claw_dir / "hyperliquid.mjs")
logger.info("TradeExecutor initialized | claw_dir={}", self._claw_dir)
def _run(self, cmd: str, *args: str, timeout: int = 30) -> dict:
full_cmd = ["node", self._script, cmd, *args]
logger.debug("exec: {}", " ".join(full_cmd))
try:
result = subprocess.run(
full_cmd,
capture_output=True, text=True, timeout=timeout,
cwd=str(self._claw_dir),
)
output = result.stdout.strip()
if result.returncode != 0:
err = result.stderr.strip() or output
logger.error("command failed: {} | {}", cmd, err)
try:
return json.loads(err)
except json.JSONDecodeError:
return {"error": err}
return json.loads(output) if output else {}
except subprocess.TimeoutExpired:
logger.error("command timed out: {} ({}s)", cmd, timeout)
return {"error": f"timeout after {timeout}s"}
except json.JSONDecodeError:
return {"raw_output": output}
def get_price(self, coin: str) -> dict:
return self._run("price", coin.upper())
def get_balance(self) -> dict:
return self._run("balance")
def get_positions(self) -> dict:
return self._run("positions")
def get_orders(self) -> dict:
return self._run("orders")
def get_fills(self, limit: int = 20) -> dict:
return self._run("fills")
def market_buy(self, coin: str, size: float) -> dict:
logger.info("MARKET BUY | {} {} coins", coin, size)
return self._run("market-buy", coin.upper(), str(size))
def market_sell(self, coin: str, size: float) -> dict:
logger.info("MARKET SELL | {} {} coins", coin, size)
return self._run("market-sell", coin.upper(), str(size))
def limit_buy(self, coin: str, size: float, price: float) -> dict:
logger.info("LIMIT BUY | {} {} @ {}", coin, size, price)
return self._run("limit-buy", coin.upper(), str(size), str(price))
def limit_sell(self, coin: str, size: float, price: float) -> dict:
logger.info("LIMIT SELL | {} {} @ {}", coin, size, price)
return self._run("limit-sell", coin.upper(), str(size), str(price))
def cancel_all(self, coin: str | None = None) -> dict:
if coin:
return self._run("cancel-all", coin.upper())
return self._run("cancel-all")
def set_leverage(self, coin: str, leverage: int, cross: bool = True) -> dict:
logger.info("SET LEVERAGE | {} {}x {}", coin, leverage, "cross" if cross else "isolated")
args = [coin.upper(), str(leverage)]
if not cross:
args.extend(["--cross", "false"])
return self._run("set-leverage", *args)
def has_position(self, coin: str) -> bool:
data = self.get_positions()
positions = data.get("positions", [])
for p in positions:
pos = p.get("position", {})
if pos.get("coin", "").upper() == coin.upper():
size = float(pos.get("szi", 0))
if size != 0:
return True
return False
def get_equity(self) -> float:
data = self.get_balance()
summary = data.get("marginSummary", {})
return float(summary.get("accountValue", 0))
FILE:strategies/btc_aggressive_momentum.py
"""
BTC Aggressive Momentum Breakout
快进快出,追强势突破,EMA9/21/55 + 20周期高点突破 + RSI + 放量确认
"""
import sys
sys.path.insert(0, '/Users/lvsanli/Desktop/dex-skill/scripts')
from data_client import DataClient
from indicators import Indicators as ind
import numpy as np
PARAMS = {
'ema_fast': 9,
'ema_mid': 21,
'ema_slow': 55,
'breakout_period': 20,
'rsi_period': 14,
'rsi_entry': 62,
'rsi_exit': 48,
'vol_mult': 1.5,
'vol_period': 20,
'sl_atr': 1.8,
'tp_atr': 3.2,
'max_bars': 18,
'max_consecutive_losses': 3,
}
def generate_signals(mode='backtest', start_date=None, end_date=None):
dc = DataClient()
df = dc.get_perp_klines("BTCUSDT", "1h", start_date, end_date)
close = df["close"].values.astype(float)
high = df["high"].values.astype(float)
low = df["low"].values.astype(float)
volume = df["volume"].values.astype(float)
ema_fast = ind.ema(close, PARAMS['ema_fast'])
ema_mid = ind.ema(close, PARAMS['ema_mid'])
ema_slow = ind.ema(close, PARAMS['ema_slow'])
rsi = ind.rsi(close, PARAMS['rsi_period'])
atr = ind.atr(high, low, close, 14)
vol_ma = ind.sma(volume, PARAMS['vol_period'])
bp = PARAMS['breakout_period']
lookback = max(PARAMS['ema_slow'], bp, PARAMS['vol_period']) + 1
signals = []
in_position = False
entry_bar = 0
consecutive_losses = 0
last_entry_price = 0.0
for i in range(lookback, len(df)):
if np.isnan(ema_fast[i]) or np.isnan(ema_mid[i]) or np.isnan(ema_slow[i]):
continue
if np.isnan(rsi[i]) or np.isnan(atr[i]) or np.isnan(vol_ma[i]):
continue
if atr[i] <= 0 or vol_ma[i] <= 0:
continue
ts = str(df.iloc[i]["datetime"])
price = close[i]
if in_position:
bars_held = i - entry_bar
sl_price = last_entry_price - PARAMS['sl_atr'] * atr[entry_bar]
tp_price = last_entry_price + PARAMS['tp_atr'] * atr[entry_bar]
exit_reason = None
if price <= sl_price:
exit_reason = f"止损触发 (SL={sl_price:.1f})"
consecutive_losses += 1
elif price >= tp_price:
exit_reason = f"止盈触发 (TP={tp_price:.1f})"
consecutive_losses = 0
elif price < ema_mid[i]:
exit_reason = "跌破EMA21"
if price < last_entry_price:
consecutive_losses += 1
else:
consecutive_losses = 0
elif rsi[i] < PARAMS['rsi_exit']:
exit_reason = f"RSI={rsi[i]:.1f}<{PARAMS['rsi_exit']}"
if price < last_entry_price:
consecutive_losses += 1
else:
consecutive_losses = 0
elif bars_held >= PARAMS['max_bars']:
exit_reason = f"持仓超{PARAMS['max_bars']}根K线"
if price < last_entry_price:
consecutive_losses += 1
else:
consecutive_losses = 0
if exit_reason:
signals.append({
"timestamp": ts,
"symbol": "BTCUSDT",
"action": "sell",
"direction": "long",
"confidence": 0.8,
"reason": exit_reason,
"price_at_signal": price,
})
in_position = False
continue
if consecutive_losses >= PARAMS['max_consecutive_losses']:
if i > 0 and str(df.iloc[i]["datetime"])[:10] != str(df.iloc[i-1]["datetime"])[:10]:
consecutive_losses = 0
else:
continue
highest_n = np.max(high[i - bp:i])
trend_aligned = ema_fast[i] > ema_mid[i] > ema_slow[i]
breakout = price > highest_n
rsi_ok = rsi[i] > PARAMS['rsi_entry']
volume_surge = volume[i] > vol_ma[i] * PARAMS['vol_mult']
if trend_aligned and breakout and rsi_ok and volume_surge:
sl = price - PARAMS['sl_atr'] * atr[i]
tp = price + PARAMS['tp_atr'] * atr[i]
signals.append({
"timestamp": ts,
"symbol": "BTCUSDT",
"action": "buy",
"direction": "long",
"confidence": 0.85,
"reason": f"突破{bp}周期高点 EMA9>21>55 RSI={rsi[i]:.1f} Vol={volume[i]/vol_ma[i]:.1f}x",
"price_at_signal": price,
"suggested_stop_loss": sl,
"suggested_take_profit": tp,
})
in_position = True
entry_bar = i
last_entry_price = price
return {"strategy_name": "BTC Aggressive Momentum Breakout", "signals": signals}
if __name__ == "__main__":
result = generate_signals("backtest", "2025-01-01", "2025-12-31")
print(f"策略: {result['strategy_name']}")
print(f"信号数: {len(result['signals'])}")
for s in result['signals'][:5]:
print(f" {s['timestamp']} {s['action']} {s['direction']} @ {s['price_at_signal']:.2f} | {s['reason']}")
FILE:strategies/btc_macd_trend.py
"""
BTC MACD Trend — MACD 趋势跟踪策略
4h 周期,MACD 金叉/死叉 + EMA200 方向过滤 + 柱状图动量确认。
多空双向,中频交易。
"""
import sys
sys.path.insert(0, '/scripts')
from data_client import DataClient
from indicators import Indicators as ind
import numpy as np
PARAMS = {
'macd_fast': 12,
'macd_slow': 26,
'macd_signal': 9,
'trend_ema': 100,
'fast_ema': 50,
'rsi_period': 14,
'rsi_long_min': 45,
'rsi_short_max': 55,
'atr_period': 14,
'sl_atr': 1.5,
'tp_atr': 4.0,
'max_bars': 30,
'hist_threshold': 0,
'cooldown_bars': 3,
}
def generate_signals(mode='backtest', start_date=None, end_date=None):
dc = DataClient()
df = dc.get_perp_klines("BTCUSDT", "4h", start_date, end_date)
close = df["close"].values.astype(float)
high = df["high"].values.astype(float)
low = df["low"].values.astype(float)
macd_line, signal_line, histogram = ind.macd(close, PARAMS['macd_fast'], PARAMS['macd_slow'], PARAMS['macd_signal'])
ema_trend = ind.ema(close, PARAMS['trend_ema'])
ema_fast = ind.ema(close, PARAMS['fast_ema'])
rsi = ind.rsi(close, PARAMS['rsi_period'])
atr = ind.atr(high, low, close, PARAMS['atr_period'])
macd_cross_up = ind.crossover(macd_line, signal_line)
macd_cross_down = ind.crossunder(macd_line, signal_line)
lookback = PARAMS['trend_ema'] + 5
signals = []
in_position = False
position_side = None
entry_bar = 0
entry_price = 0.0
last_exit_bar = -999
for i in range(lookback, len(df)):
if np.isnan(ema_trend[i]) or np.isnan(rsi[i]) or np.isnan(atr[i]):
continue
if np.isnan(macd_line[i]) or np.isnan(signal_line[i]) or np.isnan(histogram[i]):
continue
if atr[i] <= 0:
continue
ts = str(df.iloc[i]["datetime"])
price = close[i]
if in_position:
bars_held = i - entry_bar
if position_side == "long":
sl_hit = price <= entry_price - PARAMS['sl_atr'] * atr[entry_bar]
tp_hit = price >= entry_price + PARAMS['tp_atr'] * atr[entry_bar]
signal_exit = macd_cross_down[i]
trend_exit = close[i] < ema_fast[i] and close[i-1] < ema_fast[i-1]
else:
sl_hit = price >= entry_price + PARAMS['sl_atr'] * atr[entry_bar]
tp_hit = price <= entry_price - PARAMS['tp_atr'] * atr[entry_bar]
signal_exit = macd_cross_up[i]
trend_exit = close[i] > ema_fast[i] and close[i-1] > ema_fast[i-1]
timeout = bars_held >= PARAMS['max_bars']
exit_reason = None
if sl_hit:
exit_reason = "stop loss"
elif tp_hit:
exit_reason = "take profit"
elif signal_exit:
exit_reason = "MACD cross exit"
elif trend_exit:
exit_reason = f"price below EMA{PARAMS['fast_ema']}"
elif timeout:
exit_reason = f"timeout {PARAMS['max_bars']} bars"
if exit_reason:
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "sell" if position_side == "long" else "buy",
"direction": position_side,
"confidence": 0.8,
"reason": exit_reason,
"price_at_signal": price,
})
in_position = False
last_exit_bar = i
continue
if i - last_exit_bar < PARAMS['cooldown_bars']:
continue
uptrend = close[i] > ema_trend[i] and ema_fast[i] > ema_trend[i]
downtrend = close[i] < ema_trend[i] and ema_fast[i] < ema_trend[i]
if macd_cross_up[i] and uptrend and histogram[i] > PARAMS['hist_threshold']:
rsi_ok = rsi[i] > PARAMS['rsi_long_min']
if rsi_ok:
sl = price - PARAMS['sl_atr'] * atr[i]
tp = price + PARAMS['tp_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "buy", "direction": "long",
"confidence": 0.85,
"reason": f"MACD golden cross above EMA{PARAMS['trend_ema']} RSI={rsi[i]:.0f}",
"price_at_signal": price,
"suggested_stop_loss": sl,
"suggested_take_profit": tp,
})
in_position = True
position_side = "long"
entry_bar = i
entry_price = price
elif macd_cross_down[i] and downtrend and histogram[i] < -PARAMS['hist_threshold']:
rsi_ok = rsi[i] < PARAMS['rsi_short_max']
if rsi_ok:
sl = price + PARAMS['sl_atr'] * atr[i]
tp = price - PARAMS['tp_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "sell", "direction": "short",
"confidence": 0.85,
"reason": f"MACD death cross below EMA{PARAMS['trend_ema']} RSI={rsi[i]:.0f}",
"price_at_signal": price,
"suggested_stop_loss": sl,
"suggested_take_profit": tp,
})
in_position = True
position_side = "short"
entry_bar = i
entry_price = price
return {"strategy_name": "BTC MACD Trend", "signals": signals}
if __name__ == "__main__":
result = generate_signals("backtest", "2025-01-01", "2025-12-31")
print(f"策略: {result['strategy_name']}")
print(f"信号数: {len(result['signals'])}")
for s in result['signals'][:10]:
print(f" {s['timestamp']} {s['action']} {s['direction']} @ {s['price_at_signal']:.2f} | {s['reason']}")
FILE:strategies/btc_rsi_momentum.py
"""
BTC RSI 动量策略
4h 周期,RSI 突破 70 追涨做多,跌破 30 追跌做空。
极度选择性入场(RSI 极值 + EMA50 趋势 + 阳/阴线确认)。
超宽 trailing stop (4x ATR),长冷却期 (10 bars)。
"""
import sys
sys.path.insert(0, '/scripts')
from data_client import DataClient
from indicators import Indicators as ind
import numpy as np
PARAMS = {
'rsi_period': 14,
'rsi_long_entry': 70,
'rsi_short_entry': 30,
'trend_ema': 50,
'atr_period': 14,
'sl_atr': 3.0,
'trail_atr': 4.0,
'max_bars': 60,
'cooldown_bars': 10,
}
def generate_signals(mode='backtest', start_date=None, end_date=None):
dc = DataClient()
df = dc.get_perp_klines("BTCUSDT", "4h", start_date, end_date)
close = df["close"].values.astype(float)
high = df["high"].values.astype(float)
low = df["low"].values.astype(float)
opn = df["open"].values.astype(float)
rsi = ind.rsi(close, PARAMS['rsi_period'])
ema_trend = ind.ema(close, PARAMS['trend_ema'])
atr = ind.atr(high, low, close, PARAMS['atr_period'])
lookback = PARAMS['trend_ema'] + 5
signals = []
in_position = False
position_side = None
entry_bar = 0
entry_price = 0.0
best_price = 0.0
trail_stop = 0.0
last_exit_bar = -999
for i in range(lookback, len(df)):
if np.isnan(rsi[i]) or np.isnan(ema_trend[i]) or np.isnan(atr[i]):
continue
if atr[i] <= 0:
continue
ts = str(df.iloc[i]["datetime"])
price = close[i]
if in_position:
bars_held = i - entry_bar
if position_side == "long":
if price > best_price:
best_price = price
trail_stop = max(trail_stop, best_price - PARAMS['trail_atr'] * atr[i])
sl_hit = price <= trail_stop
else:
if price < best_price:
best_price = price
trail_stop = min(trail_stop, best_price + PARAMS['trail_atr'] * atr[i])
sl_hit = price >= trail_stop
timeout = bars_held >= PARAMS['max_bars']
exit_reason = None
if sl_hit:
exit_reason = "trailing stop"
elif timeout:
exit_reason = f"timeout {PARAMS['max_bars']} bars"
if exit_reason:
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "sell" if position_side == "long" else "buy",
"direction": position_side,
"confidence": 0.8, "reason": exit_reason,
"price_at_signal": price,
})
in_position = False
last_exit_bar = i
continue
if i - last_exit_bar < PARAMS['cooldown_bars']:
continue
uptrend = close[i] > ema_trend[i]
downtrend = close[i] < ema_trend[i]
bullish = close[i] > opn[i]
bearish = close[i] < opn[i]
rsi_enter_long = rsi[i] > PARAMS['rsi_long_entry'] and rsi[i-1] <= PARAMS['rsi_long_entry']
rsi_enter_short = rsi[i] < PARAMS['rsi_short_entry'] and rsi[i-1] >= PARAMS['rsi_short_entry']
if rsi_enter_long and uptrend and bullish:
sl = price - PARAMS['sl_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "buy", "direction": "long",
"confidence": 0.85,
"reason": f"RSI extreme momentum {rsi[i]:.0f}, BTC uptrend",
"price_at_signal": price,
"suggested_stop_loss": sl,
})
in_position = True
position_side = "long"
entry_bar = i
entry_price = price
best_price = price
trail_stop = sl
elif rsi_enter_short and downtrend and bearish:
sl = price + PARAMS['sl_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "sell", "direction": "short",
"confidence": 0.85,
"reason": f"RSI extreme momentum {rsi[i]:.0f}, BTC downtrend",
"price_at_signal": price,
"suggested_stop_loss": sl,
})
in_position = True
position_side = "short"
entry_bar = i
entry_price = price
best_price = price
trail_stop = sl
return {"strategy_name": "BTC RSI Momentum", "signals": signals}
FILE:strategies/btc_trend_pullback.py
"""
BTC Trend Pullback — 趋势回踩策略
4h 周期,EMA50 判趋势方向,价格回踩 EMA20 后入场,ATR trailing 出场。
多空双向,低频高质量交易。
"""
import sys
sys.path.insert(0, '/scripts')
from data_client import DataClient
from indicators import Indicators as ind
import numpy as np
PARAMS = {
'trend_ema': 50,
'entry_ema': 20,
'rsi_period': 14,
'rsi_long_min': 35,
'rsi_long_max': 60,
'rsi_short_min': 40,
'rsi_short_max': 65,
'atr_period': 14,
'sl_atr': 1.5,
'tp_atr': 3.5,
'trail_atr': 2.0,
'max_bars': 40,
'cooldown_bars': 5,
}
def generate_signals(mode='backtest', start_date=None, end_date=None):
dc = DataClient()
df = dc.get_perp_klines("BTCUSDT", "4h", start_date, end_date)
close = df["close"].values.astype(float)
high = df["high"].values.astype(float)
low = df["low"].values.astype(float)
ema_trend = ind.ema(close, PARAMS['trend_ema'])
ema_entry = ind.ema(close, PARAMS['entry_ema'])
rsi = ind.rsi(close, PARAMS['rsi_period'])
atr = ind.atr(high, low, close, PARAMS['atr_period'])
lookback = PARAMS['trend_ema'] + 5
signals = []
in_position = False
position_side = None
entry_bar = 0
entry_price = 0.0
trail_stop = 0.0
last_exit_bar = -999
for i in range(lookback, len(df)):
if np.isnan(ema_trend[i]) or np.isnan(ema_entry[i]) or np.isnan(rsi[i]) or np.isnan(atr[i]):
continue
if atr[i] <= 0:
continue
ts = str(df.iloc[i]["datetime"])
price = close[i]
if in_position:
bars_held = i - entry_bar
if position_side == "long":
new_trail = price - PARAMS['trail_atr'] * atr[i]
trail_stop = max(trail_stop, new_trail)
sl_hit = low[i] <= trail_stop
tp_hit = price >= entry_price + PARAMS['tp_atr'] * atr[entry_bar]
timeout = bars_held >= PARAMS['max_bars']
trend_lost = close[i] < ema_trend[i] and close[i-1] < ema_trend[i-1]
else:
new_trail = price + PARAMS['trail_atr'] * atr[i]
trail_stop = min(trail_stop, new_trail)
sl_hit = high[i] >= trail_stop
tp_hit = price <= entry_price - PARAMS['tp_atr'] * atr[entry_bar]
timeout = bars_held >= PARAMS['max_bars']
trend_lost = close[i] > ema_trend[i] and close[i-1] > ema_trend[i-1]
exit_reason = None
if sl_hit:
exit_reason = "trailing stop"
elif tp_hit:
exit_reason = "take profit"
elif timeout:
exit_reason = f"timeout {PARAMS['max_bars']} bars"
elif trend_lost:
exit_reason = "trend reversed"
if exit_reason:
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "sell" if position_side == "long" else "buy",
"direction": position_side,
"confidence": 0.8,
"reason": exit_reason,
"price_at_signal": price,
})
in_position = False
last_exit_bar = i
continue
if i - last_exit_bar < PARAMS['cooldown_bars']:
continue
uptrend = close[i] > ema_trend[i] and ema_entry[i] > ema_trend[i]
downtrend = close[i] < ema_trend[i] and ema_entry[i] < ema_trend[i]
touched_entry_ema = low[i] <= ema_entry[i] * 1.005 and close[i] > ema_entry[i]
touched_entry_ema_short = high[i] >= ema_entry[i] * 0.995 and close[i] < ema_entry[i]
if uptrend and touched_entry_ema:
rsi_ok = PARAMS['rsi_long_min'] <= rsi[i] <= PARAMS['rsi_long_max']
if rsi_ok:
sl = price - PARAMS['sl_atr'] * atr[i]
tp = price + PARAMS['tp_atr'] * atr[i]
trail_stop = price - PARAMS['trail_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "buy", "direction": "long",
"confidence": 0.85,
"reason": f"uptrend pullback to EMA{PARAMS['entry_ema']} RSI={rsi[i]:.0f}",
"price_at_signal": price,
"suggested_stop_loss": sl,
"suggested_take_profit": tp,
})
in_position = True
position_side = "long"
entry_bar = i
entry_price = price
elif downtrend and touched_entry_ema_short:
rsi_ok = PARAMS['rsi_short_min'] <= rsi[i] <= PARAMS['rsi_short_max']
if rsi_ok:
sl = price + PARAMS['sl_atr'] * atr[i]
tp = price - PARAMS['tp_atr'] * atr[i]
trail_stop = price + PARAMS['trail_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "BTCUSDT",
"action": "sell", "direction": "short",
"confidence": 0.85,
"reason": f"downtrend pullback to EMA{PARAMS['entry_ema']} RSI={rsi[i]:.0f}",
"price_at_signal": price,
"suggested_stop_loss": sl,
"suggested_take_profit": tp,
})
in_position = True
position_side = "short"
entry_bar = i
entry_price = price
return {"strategy_name": "BTC Trend Pullback", "signals": signals}
if __name__ == "__main__":
result = generate_signals("backtest", "2025-01-01", "2025-12-31")
print(f"策略: {result['strategy_name']}")
print(f"信号数: {len(result['signals'])}")
for s in result['signals'][:10]:
print(f" {s['timestamp']} {s['action']} {s['direction']} @ {s['price_at_signal']:.2f} | {s['reason']}")
FILE:strategies/sol_kdj_swing.py
"""
SOL KDJ Swing — KDJ 超买超卖摆动策略
4h 周期,KDJ 金叉/死叉 + EMA50 趋势过滤。
多空双向,适合 SOL 高波动特性。
"""
import sys
sys.path.insert(0, '/scripts')
from data_client import DataClient
from indicators import Indicators as ind
import numpy as np
PARAMS = {
'k_period': 9,
'd_period': 3,
'j_smooth': 3,
'trend_ema': 50,
'k_oversold': 20,
'k_overbought': 80,
'rsi_period': 14,
'rsi_confirm_long': 40,
'rsi_confirm_short': 60,
'atr_period': 14,
'sl_atr': 1.8,
'tp_atr': 3.5,
'max_bars': 20,
'cooldown_bars': 2,
}
def generate_signals(mode='backtest', start_date=None, end_date=None):
dc = DataClient()
df = dc.get_perp_klines("SOLUSDT", "4h", start_date, end_date)
close = df["close"].values.astype(float)
high = df["high"].values.astype(float)
low = df["low"].values.astype(float)
k_val, d_val, j_val = ind.kdj(high, low, close, PARAMS['k_period'], PARAMS['d_period'], PARAMS['j_smooth'])
ema_trend = ind.ema(close, PARAMS['trend_ema'])
rsi = ind.rsi(close, PARAMS['rsi_period'])
atr = ind.atr(high, low, close, PARAMS['atr_period'])
k_cross_up = ind.crossover(k_val, d_val)
k_cross_down = ind.crossunder(k_val, d_val)
lookback = PARAMS['trend_ema'] + 5
signals = []
in_position = False
position_side = None
entry_bar = 0
entry_price = 0.0
last_exit_bar = -999
for i in range(lookback, len(df)):
if np.isnan(k_val[i]) or np.isnan(ema_trend[i]) or np.isnan(rsi[i]) or np.isnan(atr[i]):
continue
if atr[i] <= 0:
continue
ts = str(df.iloc[i]["datetime"])
price = close[i]
if in_position:
bars_held = i - entry_bar
if position_side == "long":
sl_hit = price <= entry_price - PARAMS['sl_atr'] * atr[entry_bar]
tp_hit = price >= entry_price + PARAMS['tp_atr'] * atr[entry_bar]
signal_exit = k_val[i] > PARAMS['k_overbought'] and k_cross_down[i]
else:
sl_hit = price >= entry_price + PARAMS['sl_atr'] * atr[entry_bar]
tp_hit = price <= entry_price - PARAMS['tp_atr'] * atr[entry_bar]
signal_exit = k_val[i] < PARAMS['k_oversold'] and k_cross_up[i]
timeout = bars_held >= PARAMS['max_bars']
exit_reason = None
if sl_hit:
exit_reason = "stop loss"
elif tp_hit:
exit_reason = "take profit"
elif signal_exit:
exit_reason = "KDJ overbought/oversold exit"
elif timeout:
exit_reason = f"timeout {PARAMS['max_bars']} bars"
if exit_reason:
signals.append({
"timestamp": ts, "symbol": "SOLUSDT",
"action": "sell" if position_side == "long" else "buy",
"direction": position_side,
"confidence": 0.8,
"reason": exit_reason,
"price_at_signal": price,
})
in_position = False
last_exit_bar = i
continue
if i - last_exit_bar < PARAMS['cooldown_bars']:
continue
uptrend = close[i] > ema_trend[i]
downtrend = close[i] < ema_trend[i]
if k_cross_up[i] and k_val[i-1] < PARAMS['k_oversold'] and uptrend:
rsi_ok = rsi[i] > PARAMS['rsi_confirm_long']
if rsi_ok:
sl = price - PARAMS['sl_atr'] * atr[i]
tp = price + PARAMS['tp_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "SOLUSDT",
"action": "buy", "direction": "long",
"confidence": 0.85,
"reason": f"KDJ golden cross from oversold K={k_val[i]:.0f} RSI={rsi[i]:.0f}",
"price_at_signal": price,
"suggested_stop_loss": sl,
"suggested_take_profit": tp,
})
in_position = True
position_side = "long"
entry_bar = i
entry_price = price
elif k_cross_down[i] and k_val[i-1] > PARAMS['k_overbought'] and downtrend:
rsi_ok = rsi[i] < PARAMS['rsi_confirm_short']
if rsi_ok:
sl = price + PARAMS['sl_atr'] * atr[i]
tp = price - PARAMS['tp_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "SOLUSDT",
"action": "sell", "direction": "short",
"confidence": 0.85,
"reason": f"KDJ death cross from overbought K={k_val[i]:.0f} RSI={rsi[i]:.0f}",
"price_at_signal": price,
"suggested_stop_loss": sl,
"suggested_take_profit": tp,
})
in_position = True
position_side = "short"
entry_bar = i
entry_price = price
return {"strategy_name": "SOL KDJ Swing", "signals": signals}
if __name__ == "__main__":
result = generate_signals("backtest", "2025-01-01", "2025-12-31")
print(f"策略: {result['strategy_name']}")
print(f"信号数: {len(result['signals'])}")
for s in result['signals'][:10]:
print(f" {s['timestamp']} {s['action']} {s['direction']} @ {s['price_at_signal']:.2f} | {s['reason']}")
FILE:strategies/sol_rsi_momentum.py
"""
SOL RSI 动量策略
4h 周期,RSI 进入超买区 (>65) 追涨做多(动量策略),
RSI 进入超卖区 (<35) 追跌做空。
EMA50 趋势过滤,只做顺势。宽 trailing stop。
"""
import sys
sys.path.insert(0, '/scripts')
from data_client import DataClient
from indicators import Indicators as ind
import numpy as np
PARAMS = {
'rsi_period': 14,
'rsi_long_entry': 65,
'rsi_short_entry': 35,
'trend_ema': 50,
'atr_period': 14,
'sl_atr': 2.0,
'trail_atr': 3.0,
'max_bars': 40,
'cooldown_bars': 5,
}
def generate_signals(mode='backtest', start_date=None, end_date=None):
dc = DataClient()
df = dc.get_perp_klines("SOLUSDT", "4h", start_date, end_date)
close = df["close"].values.astype(float)
high = df["high"].values.astype(float)
low = df["low"].values.astype(float)
rsi = ind.rsi(close, PARAMS['rsi_period'])
ema_trend = ind.ema(close, PARAMS['trend_ema'])
atr = ind.atr(high, low, close, PARAMS['atr_period'])
lookback = PARAMS['trend_ema'] + 5
signals = []
in_position = False
position_side = None
entry_bar = 0
entry_price = 0.0
best_price = 0.0
trail_stop = 0.0
last_exit_bar = -999
for i in range(lookback, len(df)):
if np.isnan(rsi[i]) or np.isnan(ema_trend[i]) or np.isnan(atr[i]):
continue
if atr[i] <= 0:
continue
ts = str(df.iloc[i]["datetime"])
price = close[i]
if in_position:
bars_held = i - entry_bar
if position_side == "long":
if price > best_price:
best_price = price
trail_stop = max(trail_stop, best_price - PARAMS['trail_atr'] * atr[i])
sl_hit = price <= trail_stop
else:
if price < best_price:
best_price = price
trail_stop = min(trail_stop, best_price + PARAMS['trail_atr'] * atr[i])
sl_hit = price >= trail_stop
timeout = bars_held >= PARAMS['max_bars']
exit_reason = None
if sl_hit:
exit_reason = "trailing stop"
elif timeout:
exit_reason = f"timeout {PARAMS['max_bars']} bars"
if exit_reason:
signals.append({
"timestamp": ts, "symbol": "SOLUSDT",
"action": "sell" if position_side == "long" else "buy",
"direction": position_side,
"confidence": 0.8, "reason": exit_reason,
"price_at_signal": price,
})
in_position = False
last_exit_bar = i
continue
if i - last_exit_bar < PARAMS['cooldown_bars']:
continue
uptrend = close[i] > ema_trend[i]
downtrend = close[i] < ema_trend[i]
rsi_enter_long = rsi[i] > PARAMS['rsi_long_entry'] and rsi[i-1] <= PARAMS['rsi_long_entry']
rsi_enter_short = rsi[i] < PARAMS['rsi_short_entry'] and rsi[i-1] >= PARAMS['rsi_short_entry']
if rsi_enter_long and uptrend:
sl = price - PARAMS['sl_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "SOLUSDT",
"action": "buy", "direction": "long",
"confidence": 0.8,
"reason": f"RSI momentum entry {rsi[i]:.0f}, uptrend",
"price_at_signal": price,
"suggested_stop_loss": sl,
})
in_position = True
position_side = "long"
entry_bar = i
entry_price = price
best_price = price
trail_stop = sl
elif rsi_enter_short and downtrend:
sl = price + PARAMS['sl_atr'] * atr[i]
signals.append({
"timestamp": ts, "symbol": "SOLUSDT",
"action": "sell", "direction": "short",
"confidence": 0.8,
"reason": f"RSI momentum entry {rsi[i]:.0f}, downtrend",
"price_at_signal": price,
"suggested_stop_loss": sl,
})
in_position = True
position_side = "short"
entry_bar = i
entry_price = price
best_price = price
trail_stop = sl
return {"strategy_name": "SOL RSI Momentum", "signals": signals}