@clawhub-carcloud-ml-aaa447dce3
米游社工具插件。当用户消息包含以下任意关键词时触发:米游社登录、米游社绑定、米游社账号、米游社解绑、米游社任务、米游币任务、米游社签到、原神便笺、星铁便笺、开拓力、树脂、米游币商品、米游社兑换、米游社帮助、扫码补全stoken。支持短信登录、扫码登录、Cookie 登录三种方式。
---
name: mystool
description: 米游社工具插件。当用户消息包含以下任意关键词时触发:米游社登录、米游社绑定、米游社账号、米游社解绑、米游社任务、米游币任务、米游社签到、原神便笺、星铁便笺、开拓力、树脂、米游币商品、米游社兑换、米游社帮助、扫码补全stoken。支持短信登录、扫码登录、Cookie 登录三种方式。
metadata: {"openclaw":{"emoji":"🎮"}}
---
# mystool — 米游社工具
## 执行规则
收到触发关键词后,**必须**按以下步骤执行,不得直接用文字回复:
### Step 1:运行 plugin.py
```bash
python3 skills/mystool/plugin.py <用户消息>
```
或在 Python 中调用:
```python
import asyncio, sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent / "skills" / "mystool"))
from plugin import handle
# sender_id: Telegram 用 chat_id,QQBot 用 openid,webchat 用 "webchat_user"
reply = asyncio.run(handle(
message="<用户消息原文>",
user_id="<sender_id>"
))
```
### Step 2:发送回复
- **handle() 返回字符串** → 将该字符串原样发送给用户(不要修改、不要翻译、不要加前缀)
- **handle() 返回 None** → 说明不是 mystool 指令,正常处理
### Step 3:渠道特殊处理
#### Telegram
```python
# sender_id = Telegram chat_id(从系统消息获取)
reply = asyncio.run(handle(message, user_id=str(chat_id)))
# 直接回复即可,OpenClaw 自动路由到 Telegram
```
#### QQBot
```python
# sender_id = QQ openid(从系统消息获取)
reply = asyncio.run(handle(message, user_id=openid))
# 直接回复即可,OpenClaw 自动路由到 QQBot
```
---
## 指令列表
| 指令 | 说明 |
|------|------|
| `米游社登录` | 选择登录方式(短信/扫码/Cookie) |
| `1` / `短信登录` | 短信验证码登录 |
| `2` / `扫码登录` | 扫码登录 |
| `3` / `Cookie登录` | 手动粘贴 Cookie |
| `扫码补全stoken` | Cookie 登录后补全 stoken |
| `米游社账号` | 查看已绑定账号 |
| `米游社解绑 [uid]` | 解绑账号 |
| `米游社任务` | 执行每日米游币任务 |
| `米游社签到 [游戏]` | 游戏签到(原神/星铁/崩3,仅国服) |
| `原神便笺` | 查看原神树脂等状态 |
| `星铁便笺` | 查看星穹铁道便笺 |
| `米游币商品 [游戏]` | 查看可兑换商品 |
| `米游社兑换 <ID> [数量]` | 兑换商品 |
| `预约兑换 <商品ID>` | 预约定时抢购 |
| `预约列表` | 查看预约列表 |
| `取消预约 <商品ID>` | 取消预约 |
| `预约兑换 <商品ID>` | 预约定时抢购 |
| `预约列表` | 查看预约列表 |
| `取消预约 <商品ID>` | 取消预约 |
| `生成识别码` | 获取跨平台绑定识别码 |
| `链接账号 <识别码>` | 使用识别码链接账号 |
| `设置代理 <URL>` | 设置 IP 池代理地址 |
| `查看代理` | 查看代理配置 |
| `米游社帮助` | 显示帮助 |
---
## 登录方式说明
### 1. 短信登录
- 输入手机号 → 接收验证码 → 自动获取完整 Cookie
- 参考实现:https://github.com/NuoManDai/mihoyo_sms_login
### 2. 扫码登录
- 生成二维码 → 用米游社App 扫码 → 自动获取完整 Cookie
- 提示文字:"用米游社App 扫码,自动获取完整 Cookie"
### 3. Cookie 登录
- 直接粘贴 Cookie 字符串
- 如果缺少 stoken,系统会提示扫码补全
---
## 定时任务
每天 **07:20 (Asia/Shanghai)** 自动执行:
| 任务 | 命令 |
|------|------|
| 每日米游币任务 | `python3 skills/mystool/runner.py tasks` |
| 游戏签到 | `python3 skills/mystool/runner.py sign` |
结果通过 OpenClaw cron 系统推送到配置的通知渠道。
执行日志保存到 `log/` 文件夹,文件名格式:`YYYY-MM-DD_HH-MM-SS.log`
---
## 文件结构
```
skills/mystool/
├── plugin.py # 主入口 handle(message, user_id)
├── runner.py # cron 入口,带日志记录
├── channel.py # 渠道工具
├── SKILL.md # 本文件
├── README.md # 使用说明
├── log/ # 执行日志目录
└── src/
├── api.py # 米游社 API(仅国服签到)
├── login.py # 旧版扫码登录(保留兼容)
├── qr_login.py # 新版扫码登录模块
├── sms_login.py # 短信登录模块
├── stoken_qr_login.py # stoken 补全扫码模块
├── store.py # 数据存储
└── formatter.py # 消息格式化
```
## 依赖
```bash
pip3 install httpx
# 可选(终端二维码):
pip3 install qrcode
# 短信登录需要:
pip3 install pycryptodome
```
FILE:runner.py
#!/usr/bin/env python3
"""
mystool cron runner — 供 OpenClaw cron 定时调用
用法:
python runner.py tasks # 执行所有用户的每日米游币任务
python runner.py sign # 执行所有用户的游戏签到
python runner.py exchange <goods_id> [user_id] # 兑换商品(可指定用户)
python runner.py schedule # 检查并执行到期的预约抢购
日志:
执行记录保存到 log/ 文件夹,文件名格式:YYYY-MM-DD_HH-MM-SS.log
"""
import asyncio
import json
import sys
import time
from pathlib import Path
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent))
from plugin import (
run_daily_tasks_all,
run_sign_all,
cmd_exchange,
get_user_accounts,
get_address_list,
do_exchange,
get_good_detail,
_load_exchange_schedules,
_save_exchange_schedules,
GAME_BIZ,
)
def write_log(mode: str, content: str):
"""写入日志文件"""
log_dir = Path(__file__).parent / "log"
log_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = log_dir / f"{timestamp}_{mode}.log"
header = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] mystool {mode}\n"
separator = "=" * 50 + "\n"
with open(log_file, "w", encoding="utf-8") as f:
f.write(header)
f.write(separator)
f.write(content)
f.write("\n")
return log_file
async def run_exchange(goods_id: str, user_id: str = None, max_retries: int = 10) -> str:
"""
执行商品兑换(带重试机制)
:param goods_id: 商品ID
:param user_id: 用户ID(可选,不指定则对所有用户执行)
:param max_retries: 最大重试次数
:return: 执行结果
"""
results = []
# 获取要执行的用户列表
if user_id:
user_ids = [user_id]
else:
from plugin import get_all_user_ids
user_ids = get_all_user_ids()
if not user_ids:
return "⚠️ 没有绑定账号的用户"
for uid in user_ids:
accounts = get_user_accounts(uid)
if not accounts:
continue
acc = accounts[0]
cookies = acc["cookies"]
nickname = acc.get("nickname", acc["uid"])
games = acc.get("games", [])
if not games:
results.append(f"❌ [{nickname}] 未找到游戏账号信息")
continue
g = games[0]
game_biz = g.get("game_biz", "hk4e_cn")
game_uid = g.get("game_uid", "")
region = g.get("region", "cn_gf01")
# 获取地址
ok_addr, addresses = await get_address_list(cookies)
address_id = None
if ok_addr and addresses:
for addr in addresses:
if addr.get("is_default"):
address_id = addr.get("id")
break
if not address_id and addresses:
address_id = addresses[0].get("id")
# 获取商品信息
ok_detail, detail = await get_good_detail(goods_id)
goods_name = detail.get("name") or detail.get("goods_name", goods_id) if ok_detail else goods_id
results.append(f"🛒 [{nickname}] 开始抢购: {goods_name}")
# 重试机制
for attempt in range(max_retries):
ok, msg, next_time = await do_exchange(
cookies=cookies,
goods_id=goods_id,
uid=game_uid,
region=region,
game_biz=game_biz,
address_id=address_id,
)
if ok:
results.append(f"✅ [{nickname}] {msg}")
break
elif "库存不足" in msg:
# 库存不足,等待后重试
if attempt < max_retries - 1:
results.append(f"⏳ [{nickname}] 第{attempt+1}次尝试: {msg},等待重试...")
await asyncio.sleep(0.5)
else:
results.append(f"❌ [{nickname}] 重试{max_retries}次后仍库存不足")
else:
# 其他错误,不重试
results.append(f"❌ [{nickname}] {msg}")
break
return "\n".join(results)
async def run_scheduled_exchanges() -> str:
"""
检查并执行到期的预约抢购
删除已过期的预约
"""
schedules = _load_exchange_schedules()
now = time.time()
results = []
executed_count = 0
expired_count = 0
for user_id, user_schedules in list(schedules.items()):
for schedule in list(user_schedules):
next_time = schedule.get("next_time", 0)
goods_id = schedule.get("goods_id", "")
goods_name = schedule.get("goods_name", goods_id)
# 已过期(超过开抢时间1小时)
if next_time > 0 and now > next_time + 3600:
user_schedules.remove(schedule)
expired_count += 1
results.append(f"🗑️ 已过期: {goods_name} (用户 {user_id})")
continue
# 到期执行
if next_time > 0 and now >= next_time:
result = await run_exchange(goods_id, user_id, max_retries=10)
results.append(result)
executed_count += 1
# 执行后从预约列表移除
user_schedules.remove(schedule)
# 保存更新后的预约列表
_save_exchange_schedules(schedules)
if executed_count == 0 and expired_count == 0:
return "✅ 没有需要执行的预约抢购"
summary = f"📊 预约抢购执行完成:执行 {executed_count} 个,清理过期 {expired_count} 个"
return "\n".join(results) + f"\n\n{summary}"
async def main():
mode = sys.argv[1] if len(sys.argv) > 1 else "tasks"
if mode == "tasks":
result = await run_daily_tasks_all()
print(result)
log_file = write_log("tasks", result)
print(f"\n[日志已保存] {log_file}")
elif mode == "sign":
result = await run_sign_all()
print(result)
log_file = write_log("sign", result)
print(f"\n[日志已保存] {log_file}")
elif mode == "exchange":
if len(sys.argv) < 3:
print("用法: python runner.py exchange <goods_id> [user_id]")
sys.exit(1)
goods_id = sys.argv[2]
user_id = sys.argv[3] if len(sys.argv) > 3 else None
result = await run_exchange(goods_id, user_id)
print(result)
log_file = write_log(f"exchange_{goods_id}", result)
print(f"\n[日志已保存] {log_file}")
elif mode == "schedule":
result = await run_scheduled_exchanges()
print(result)
log_file = write_log("schedule", result)
print(f"\n[日志已保存] {log_file}")
else:
print(f"未知模式: {mode},支持 tasks / sign / exchange / schedule")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())
FILE:channel.py
"""
mystool 渠道接入层
OpenClaw 的 Telegram / QQBot 消息到达时,
在 SKILL.md 或 agent 系统提示中调用此模块处理。
用法(在 agent 处理消息时):
import asyncio, sys
sys.path.insert(0, str(Path(__file__).parent))
from channel import dispatch
reply = asyncio.run(dispatch(message_text, sender_id, channel))
# reply 为 None 表示不是 mystool 指令,忽略即可
"""
import asyncio
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
sys.path.insert(0, str(Path(__file__).parent))
from plugin import handle
def _log_command(user_id: str, command: str, result: str):
"""记录用户命令日志(仅记录错误)"""
if not result:
return
# 判断是否为错误
is_error = "❌" in result or "失败" in result or "错误" in result or "异常" in result
if not is_error:
return
log_dir = Path(__file__).parent / "log"
log_dir.mkdir(exist_ok=True)
today = datetime.now().strftime("%Y-%m-%d")
log_file = log_dir / f"{today}.log"
timestamp = datetime.now().strftime("%H:%M:%S")
# 完整记录错误信息
result_line = result.replace("\n", " | ").replace("\r", "")
with open(log_file, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] ❌ user={user_id} cmd={command[:50]} → {result_line}\n")
def _log_error(user_id: str, command: str, error: Exception):
"""记录 Python 运行时异常"""
import traceback
log_dir = Path(__file__).parent / "log"
log_dir.mkdir(exist_ok=True)
today = datetime.now().strftime("%Y-%m-%d")
log_file = log_dir / f"{today}.log"
timestamp = datetime.now().strftime("%H:%M:%S")
tb = traceback.format_exc()
with open(log_file, "a", encoding="utf-8") as f:
f.write(f"\n[{timestamp}] 💥 RUNTIME ERROR user={user_id} cmd={command[:50]}\n")
f.write(f"Error: {error}\n")
f.write(f"{tb}\n")
async def dispatch(message: str, sender_id: str = "", channel: str = "") -> Optional[str]:
"""
分发消息到 mystool 处理器。
:param message: 用户发送的原始文本
:param sender_id: 发送者 ID(Telegram chat_id / QQ openid 等)
:param channel: 渠道标识(telegram / qqbot)
:return: 回复文本,None 表示不是 mystool 指令
"""
# 用 sender_id 作为账号隔离 key
user_key = sender_id or f"{channel}_default"
try:
reply = await handle(message, user_id=user_key)
except Exception as e:
# 记录运行时异常
_log_error(user_key, message.strip(), e)
reply = f"❌ 程序异常: {type(e).__name__}: {e}"
# 记录错误日志
if reply:
_log_command(user_key, message.strip(), reply)
return reply
# ── 快捷函数(供 cron agentTurn 直接 import 使用)────────────────────────────
async def push_to_channels(text: str):
"""
将文本同时推送到 Telegram 和 QQBot。
在 cron agentTurn 中通过 exec 调用此函数。
"""
# 这里只是占位,实际推送由 cron agentTurn 中的 message 工具完成
# 此函数供 runner.py 调用后打印结果,由 agent 负责发送
print(text)
FILE:README.md
# mystool - 米游社工具 (OpenClaw Skill)
米游社自动化工具,支持短信登录、扫码登录、Cookie 登录三种方式。
## 功能
- 🔐 **账号管理**:短信/扫码/Cookie 三种登录方式,支持跨平台账号绑定
- 📅 **每日任务**:自动完成米游币任务(打卡、浏览、点赞、分享)
- 🗓️ **游戏签到**:原神、星穹铁道、崩坏3、崩坏学园2、未定事件簿、绝区零(国服)
- 📊 **实时便笺**:查看树脂、开拓力等游戏状态
- 🛒 **商品兑换**:米游币兑换游戏周边,支持预约抢购
- 🔗 **跨平台绑定**:Telegram / QQ 共享米游社账号数据
- 🌐 **代理配置**:支持移动端 IP 池代理,绕过短信频率限制
## 安装
```bash
# 基础依赖
pip3 install httpx
# 短信登录需要
pip3 install pycryptodome
# 可选(终端二维码显示)
pip3 install qrcode
```
## 使用
### 指令列表
| 指令 | 说明 |
|------|------|
| `米游社登录` | 选择登录方式(短信/扫码/Cookie) |
| `米游社账号` | 查看已绑定账号 |
| `米游社解绑 [uid]` | 解绑账号 |
| `米游社任务` | 执行每日米游币任务 |
| `米游社签到 [游戏]` | 游戏签到(原神/星铁/崩3等) |
| `原神便笺` | 查看原神状态 |
| `星铁便笺` | 查看星铁状态 |
| `米游币商品 [游戏]` | 查看可兑换商品 |
| `米游社兑换 <ID> [数量]` | 兑换商品 |
| `预约兑换 <商品ID>` | 预约定时抢购 |
| `预约列表` | 查看预约列表 |
| `取消预约 <商品ID>` | 取消预约 |
| `生成识别码` | 获取跨平台绑定识别码 |
| `链接账号 <识别码>` | 使用识别码链接账号 |
| `设置代理 <URL>` | 设置 IP 池代理地址 |
| `查看代理` | 查看代理配置和剩余次数 |
| `米游社帮助` | 显示完整帮助 |
### 跨平台账号绑定
Telegram 和 QQ 用户可共享米游社账号数据:
**Telegram → QQ**:
1. Telegram 用户发送 `生成识别码` → 获得 6 位识别码
2. QQ 用户发送 `链接账号 XXXXXX` → 共享数据
**QQ → Telegram**:
1. QQ 用户发送 `生成识别码` → 获得 6 位识别码
2. Telegram 用户发送 `链接账号 XXXXXX` → 共享数据
识别码有效期 10 分钟。
### 短信登录代理
短信登录需要移动端 IP 池代理来绕过频率限制:
1. 注册代理服务(如 https://www.xkdaili.com)
2. 获取 API 提取链接
3. 发送 `设置代理 <API_URL>` 配置
代理配置自动管理:
- 每小时最多提取 20 次
- 每次提取间隔 30 秒冷却
- 配置存储在 `data/proxy_config.json`
### 定时任务
每天 **07:20 (Asia/Shanghai)** 自动执行:
```bash
# 每日米游币任务
python3 skills/mystool/runner.py tasks
# 游戏签到
python3 skills/mystool/runner.py sign
```
执行日志保存到 `log/` 目录,文件名格式:`YYYY-MM-DD_HH-MM-SS.log`
### 商品兑换预约
当商品库存不足时,自动显示下次补货时间并支持预约:
1. 发送 `米游社兑换 <商品ID>` → 兑换商品
2. 库存不足时显示补货时间
3. 发送 `预约兑换 <商品ID>` → 预约定时抢购
4. 发送 `预约列表` → 查看全部预约
## 文件结构
```
mystool/
├── plugin.py # 主入口 handle(message, user_id)
├── runner.py # cron 入口,带日志记录
├── channel.py # 渠道工具
├── SKILL.md # OpenClaw Skill 配置
├── README.md # 本文件
├── data/ # 用户数据目录
│ ├── accounts.json # 账号数据
│ ├── link_codes.json # 跨平台绑定识别码
│ ├── proxy_config.json # 代理配置
│ ├── sms_sessions.json # 短信登录会话
│ └── exchange_schedules.json # 预约抢购列表
├── log/ # 执行日志目录
└── src/
├── api.py # 米游社 API(仅国服)
├── login.py # 旧版扫码登录(兼容)
├── qr_login.py # 扫码登录模块
├── sms_login.py # 短信登录模块(含代理支持)
├── stoken_qr_login.py # stoken 补全扫码模块
├── store.py # 数据存储(含跨平台绑定)
└── formatter.py # 消息格式化
```
## 参考
- 短信登录:https://github.com/NuoManDai/mihoyo_sms_login
- MihoyoBBSTools:https://github.com/Womsxd/MihoyoBBSTools
- nonebot-plugin-mystool:https://github.com/Ljzd-PRO/nonebot-plugin-mystool
- Mys_Goods_Tool:https://github.com/Ljzd-PRO/Mys_Goods_Tool
## License
MIT
FILE:plugin.py
"""
mystool — 米游社工具 OpenClaw Skill 入口
支持指令:
米游社登录 选择登录方式(短信/扫码/Cookie)
米游社绑定 <cookie> 手动绑定账号(Cookie 登录)
米游社账号 查看已绑定账号
米游社解绑 <uid> 解绑账号
米游社任务 执行每日米游币任务
米游社签到 [游戏] 游戏签到(原神/星铁/崩3,国服)
原神便笺 查看原神实时便笺
星铁便笺 查看星穹铁道便笺
米游币商品 [游戏] 查看商品列表
米游社兑换 <商品ID> [数量] 兑换商品
米游社帮助 显示帮助
"""
import asyncio
import json
import os
import re
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
_HERE = Path(__file__).parent
sys.path.insert(0, str(_HERE / "src"))
from api import (
do_bbs_tasks,
do_exchange,
game_sign,
get_account_list_by_game,
get_address_list,
get_genshin_note,
get_good_detail,
get_good_list,
get_myb_balance,
get_starrail_note,
SIGN_APIS,
)
from store import (
add_account,
clear_login_session,
clear_sms_session,
extract_uid_from_cookies,
get_all_user_ids,
get_login_session,
get_sms_session,
get_user_accounts,
parse_cookie_string,
remove_account,
save_login_session,
save_sms_session,
validate_cookies,
generate_link_code,
verify_link_code,
merge_user_accounts,
)
from qr_login import (
fetch_qrcode,
game_token_to_cookies,
qrcode_to_image_url,
query_qrcode,
)
from sms_login import (
sms_send_captcha,
sms_login_by_captcha,
sms_get_full_cookies,
)
from formatter import (
format_account_list,
format_genshin_note,
format_good_list,
format_sign_results,
format_starrail_note,
format_task_result,
)
# ── 游戏别名映射 ───────────────────────────────────────────────────────────────
GAME_ALIASES = {
# 原神
"原神": "genshin",
"ys": "genshin",
"genshin": "genshin",
# 星穹铁道
"星穹铁道": "starrail",
"星铁": "starrail",
"hkrpg": "starrail",
"starrail": "starrail",
# 崩坏3
"崩坏3": "honkai3",
"bh3": "honkai3",
"honkai3": "honkai3",
# 崩坏学园2
"崩坏2": "honkai2",
"崩坏学园2": "honkai2",
"honkai2": "honkai2",
# 未定事件簿
"未定": "tearsofthemis",
"未定事件簿": "tearsofthemis",
"tearsofthemis": "tearsofthemis",
# 绝区零
"绝区零": "zzz",
"zzz": "zzz",
}
# 游戏对应服务器区域(国服)
GAME_REGIONS = {
"genshin": "cn_gf01",
"starrail": "prod_gf_cn",
"honkai3": "cn01",
"honkai2": "cn01",
"tearsofthemis": "cn01",
"zzz": "prod_gf_cn",
}
# 游戏对应的 game_biz
GAME_BIZ = {
"genshin": "hk4e_cn",
"starrail": "hkrpg_cn",
"honkai3": "bh3_cn",
"honkai2": "bh2_cn",
"tearsofthemis": "nxx_cn",
"zzz": "nap_cn",
}
# ── 登录菜单 ───────────────────────────────────────────────────────────────────
LOGIN_MENU = """🔐 请选择登录方式:
1️⃣ 短信登录
发送短信验证码到手机,自动获取完整 Cookie
2️⃣ 扫码登录
用米游社App 扫码,自动获取完整 Cookie
3️⃣ Cookie 登录
直接粘贴 Cookie 字符串导入
回复 1/2/3 选择,或发送对应关键词"""
# ── 主处理函数 ─────────────────────────────────────────────────────────────────
async def handle(message: str, user_id: str = "default") -> Optional[str]:
"""
处理用户消息,返回回复文本。
返回 None 表示不响应。
"""
msg = message.strip()
# ── 登录入口 ──────────────────────────────────────────────────────────────
if msg in ("米游社登录", "登录米游社", "登录"):
return LOGIN_MENU
# ── 选择登录方式 ──────────────────────────────────────────────────────────
if msg in ("1", "短信登录", "1.短信登录", "1. 短信登录"):
return await cmd_sms_login_start(user_id)
if msg in ("2", "扫码登录", "2.扫码登录", "2. 扫码登录"):
return await cmd_qr_login_start(user_id)
if msg in ("3", "cookie登录", "ck登录", "3.cookie登录", "3. cookie登录",
"cookie", "Cookie", "CK", "ck"):
return (
"请直接发送你的 Cookie 字符串,格式:\n\n"
"`account_id=xxx; cookie_token=xxx; ...`\n\n"
"💡 获取方式:登录 https://bbs.mihoyo.com 后,"
"打开浏览器开发者工具 → Application → Cookies 复制全部内容\n\n"
"⚠️ 如果 Cookie 中缺少 stoken,系统会提示你扫码补全"
)
# ── 短信登录流程(手机号 / 验证码)───────────────────────────────────────
if get_sms_session(user_id):
return await cmd_sms_login_verify(user_id, msg)
# ── 短信验证码输入 ────────────────────────────────────────────────────────
if msg.isdigit() and len(msg) == 6:
# 可能是短信验证码
return await cmd_sms_login_verify(user_id, msg)
# ── Cookie 直接导入 ───────────────────────────────────────────────────────
if "account_id=" in msg or "cookie_token=" in msg or "stoken=" in msg:
return await cmd_bind(user_id, msg)
# ── 扫码状态查询 ──────────────────────────────────────────────────────────
if msg in ("米游社登录状态", "登录状态", "扫码确认", "确认登录"):
return await cmd_qr_login_poll(user_id)
# ── 查看账号 ──────────────────────────────────────────────────────────────
if msg in ("米游社账号", "米游社账号列表", "查看账号", "我的账号"):
accounts = get_user_accounts(user_id)
return format_account_list(accounts)
# ── 解绑账号 ──────────────────────────────────────────────────────────────
if msg.startswith("米游社解绑"):
arg = msg[len("米游社解绑"):].strip()
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 未绑定任何账号"
if not arg:
if len(accounts) == 1:
uid = accounts[0]["uid"]
else:
lines = ["请指定要解绑的账号(发送序号或 UID):"]
for i, a in enumerate(accounts, 1):
lines.append(f"{i}. {a['nickname']} — {a['uid']}")
return "\n".join(lines)
elif arg.isdigit() and int(arg) <= 10:
idx = int(arg) - 1
if 0 <= idx < len(accounts):
uid = accounts[idx]["uid"]
else:
return f"❌ 序号 {arg} 超出范围(共 {len(accounts)} 个账号)"
else:
uid = arg
if remove_account(user_id, uid):
nick = next((a["nickname"] for a in accounts if a["uid"] == uid), uid)
return f"✅ 已解绑账号:{nick}(UID: {uid})"
return f"❌ 未找到 UID 为 {uid} 的绑定账号"
# ── 每日任务 ──────────────────────────────────────────────────────────────
if msg in ("米游社任务", "米游币任务", "每日任务"):
return await cmd_daily_tasks(user_id)
# ── 游戏签到 ──────────────────────────────────────────────────────────────
if msg.startswith("米游社签到"):
game_str = msg[len("米游社签到"):].strip()
game = GAME_ALIASES.get(game_str) if game_str else None
return await cmd_sign(user_id, game)
# ── 原神便笺 ──────────────────────────────────────────────────────────────
if msg in ("原神便笺", "原神实时便笺", "树脂"):
return await cmd_note(user_id, "genshin")
# ── 星铁便笺 ──────────────────────────────────────────────────────────────
if msg in ("星铁便笺", "星穹铁道便笺", "开拓力"):
return await cmd_note(user_id, "starrail")
# ── 商品列表 ──────────────────────────────────────────────────────────────
if msg.startswith("米游币商品"):
game_str = msg[len("米游币商品"):].strip()
game = GAME_ALIASES.get(game_str, "")
return await cmd_good_list(game)
# ── 兑换商品 ──────────────────────────────────────────────────────────────
if msg.startswith("米游社兑换"):
args = msg[len("米游社兑换"):].strip().split()
if not args:
return "用法: 米游社兑换 <商品ID> [数量]\n\n先用「米游币商品」查看商品 ID"
goods_id = args[0]
num = int(args[1]) if len(args) > 1 and args[1].isdigit() else 1
return await cmd_exchange(user_id, goods_id, num)
# ── 预约兑换(定时抢购)──────────────────────────────────────────────────
if msg.startswith("预约兑换"):
args = msg[len("预约兑换"):].strip().split()
if not args:
return "用法: 预约兑换 <商品ID>\n\n先用「米游币商品」查看商品 ID"
goods_id = args[0]
return await cmd_schedule_exchange(user_id, goods_id)
# ── 查看预约列表 ──────────────────────────────────────────────────────────
if msg in ("预约列表", "我的预约", "兑换预约"):
return cmd_list_scheduled_exchanges(user_id)
# ── 取消预约 ──────────────────────────────────────────────────────────────
if msg.startswith("取消预约"):
args = msg[len("取消预约"):].strip().split()
if not args:
return "用法: 取消预约 <商品ID>"
return cmd_cancel_scheduled_exchange(user_id, args[0])
# ── 链接账号(跨平台绑定)──────────────────────────────────────────────────
if msg in ("生成识别码", "生成绑定码", "创建识别码"):
return cmd_generate_link_code(user_id)
if msg.startswith("链接账号 ") or msg.startswith("绑定账号 "):
code = msg.split(maxsplit=1)[1].strip() if " " in msg else ""
return await cmd_link_account(user_id, code)
# ── 代理配置 ──────────────────────────────────────────────────────────────
if msg.startswith("设置代理 ") or msg.startswith("代理地址 "):
url = msg.split(maxsplit=1)[1].strip()
return cmd_set_proxy(url)
if msg in ("查看代理", "代理配置", "代理信息"):
return cmd_show_proxy()
# ── stoken 补全扫码 ───────────────────────────────────────────────────────
if msg in ("扫码补全stoken", "补全stoken", "stoken扫码"):
return await cmd_stoken_qr_login(user_id)
# ── 帮助 ──────────────────────────────────────────────────────────────────
if msg in ("米游社帮助", "mystool帮助", "mystool help"):
return HELP_TEXT
return None
# ── 指令实现 ───────────────────────────────────────────────────────────────────
async def cmd_sms_login_start(user_id: str) -> str:
"""
短信登录第一步:提示用户输入手机号
"""
save_sms_session(user_id, {"step": "await_phone"})
return (
"📱 短信登录\n"
"─" * 24 + "\n\n"
"请输入你的手机号(中国大陆):\n"
"格式:11位数字,如 13800138000"
)
async def cmd_sms_login_verify(user_id: str, code: str) -> str:
"""
处理手机号输入或验证码输入
"""
session = get_sms_session(user_id) or {}
step = session.get("step", "")
# 输入手机号
if step == "await_phone":
phone = code
if not re.match(r"^1\d{10}$", phone):
return "❌ 手机号格式错误,请输入 11 位数字手机号"
# 发送验证码
result = await sms_send_captcha(phone)
if not result["success"]:
return f"❌ 发送验证码失败:{result.get('message', '未知错误')}"
save_sms_session(user_id, {
"step": "await_captcha",
"phone": phone,
"action_type": result.get("action_type", "login"),
"device_id": result.get("device_id"),
"device_fp": result.get("device_fp"),
})
return (
f"✅ 验证码已发送到 {phone}\n"
f"请输入收到的 6 位验证码:"
)
# 输入验证码
if step == "await_captcha":
phone = session.get("phone", "")
action_type = session.get("action_type", "login")
device_id = session.get("device_id")
device_fp = session.get("device_fp")
if not phone:
clear_sms_session(user_id)
return "❌ 会话已过期,请重新发送「米游社登录」"
# 登录
result = await sms_login_by_captcha(
phone, code, action_type,
device_id=device_id, device_fp=device_fp
)
if not result["success"]:
clear_sms_session(user_id)
return f"❌ 登录失败:{result.get('message', '未知错误')}"
stoken = result.get("stoken", "")
stuid = result.get("stuid", "")
mid = result.get("mid", "")
if not stoken:
clear_sms_session(user_id)
return (
"⚠️ 登录成功但未获取到 stoken\n"
"请使用「扫码登录」或「Cookie 登录」方式"
)
# 换取完整 cookies
cookies = await sms_get_full_cookies(stoken, stuid, mid, device_id, device_fp)
clear_sms_session(user_id)
return await _finish_login(user_id, stuid, cookies)
# 没有活跃的短信会话,可能是其他验证码
return None
async def cmd_qr_login_start(user_id: str) -> str:
"""
发起扫码登录:生成二维码,然后后台自动轮询 60s
"""
clear_login_session(user_id)
ok, qr_url, ticket, device = await fetch_qrcode()
if not ok or not qr_url or not ticket:
return "❌ 获取二维码失败,请稍后重试"
save_login_session(user_id, ticket, qr_url, device)
qr_img_url = qrcode_to_image_url(qr_url)
lines = [
"📱 米游社扫码登录",
"─" * 24,
"",
f"🔗 [点击打开二维码图片]({qr_img_url})",
"(长按或保存图片后用米游社 App 扫码)",
"",
"📋 或复制以下链接在浏览器打开:",
f"`{qr_url}`",
"",
"⏳ 扫码并确认后自动完成登录(最多等待 60 秒)",
]
qr_msg = "\n".join(lines)
# 后台轮询
result = await _poll_qr_until_done(user_id, ticket, device, max_tries=60, interval=1)
if result:
return qr_msg + "\n\n" + result
else:
return qr_msg + "\n\n⏰ 等待超时,请重新发送「米游社登录」或使用其他方式"
async def _poll_qr_until_done(user_id: str, ticket: str, device: str,
max_tries: int = 60, interval: float = 1.0) -> Optional[str]:
"""
后台轮询二维码状态
"""
for _ in range(max_tries):
await asyncio.sleep(interval)
stat, raw = await query_qrcode(ticket, device)
if stat == "Confirmed" and raw:
uid = str(raw.get("uid", ""))
game_token = raw.get("token", "")
if not uid or not game_token:
clear_login_session(user_id)
return "❌ 登录数据异常,请重试"
ok, cookies = await game_token_to_cookies(uid, game_token)
if not ok or not cookies:
clear_login_session(user_id)
return "❌ Cookie 提取失败,请重试"
return await _finish_login(user_id, uid, cookies)
elif stat == "Expired":
clear_login_session(user_id)
return "❌ 二维码已过期,请重新发送「米游社登录」"
elif stat == "error":
return None
clear_login_session(user_id)
return None
async def cmd_qr_login_poll(user_id: str) -> str:
"""手动触发一次轮询"""
session = get_login_session(user_id)
if not session:
return "❌ 没有进行中的登录会话,请重新发送「米游社登录」"
stat, raw = await query_qrcode(session["ticket"], session.get("device", ""))
if stat == "Init":
return "⏳ 尚未扫码,请先用 App 扫描二维码"
elif stat == "Scanned":
return "📲 已扫码,请在 App 内点击「确认登录」"
elif stat == "Expired":
clear_login_session(user_id)
return "❌ 二维码已过期,请重新发送「米游社登录」"
elif stat == "Confirmed" and raw:
uid = str(raw.get("uid", ""))
game_token = raw.get("token", "")
ok, cookies = await game_token_to_cookies(uid, game_token)
if not ok or not cookies:
clear_login_session(user_id)
return "❌ Cookie 提取失败,请重试"
return await _finish_login(user_id, uid, cookies)
return "❌ 登录状态异常,请重新发送「米游社登录」"
async def cmd_stoken_qr_login(user_id: str) -> str:
"""
扫码补全 stoken(用于 Cookie 登录后 stoken 缺失的情况)
"""
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 未绑定任何账号,请先使用「米游社登录」绑定账号"
# 找到缺少 stoken 的账号
need_stoken = [a for a in accounts if not a.get("cookies", {}).get("stoken")]
if not need_stoken:
return "✅ 所有账号都已包含 stoken,无需补全"
# 目前只支持补全第一个缺少 stoken 的账号
target = need_stoken[0]
uid = target["uid"]
clear_login_session(user_id)
ok, qr_url, ticket, device = await fetch_qrcode()
if not ok:
return "❌ 获取二维码失败"
save_login_session(user_id, ticket, qr_url, device)
# 标记这是 stoken 补全会话
session = get_login_session(user_id)
if session:
session["mode"] = "stoken_patch"
session["target_uid"] = uid
save_login_session(user_id, ticket, qr_url, device)
qr_img_url = qrcode_to_image_url(qr_url)
return (
f"📱 扫码补全 stoken — 账号 {target.get('nickname', uid)}\n"
f"─" * 24 + "\n\n"
f"🔗 [点击打开二维码]({qr_img_url})\n"
f"用米游社App 扫码后,stoken 将自动补全到该账号\n\n"
f"⏳ 等待扫码确认(最多 60 秒)..."
)
async def cmd_bind(user_id: str, cookie_str: str) -> str:
"""
Cookie 登录:解析 → 验证 → 获取游戏账号 → 保存
如果缺少 stoken,提示用户扫码补全
"""
cookies = parse_cookie_string(cookie_str)
valid, msg = validate_cookies(cookies)
if not valid:
return f"❌ Cookie 无效\n{msg}"
uid = extract_uid_from_cookies(cookies)
if not uid:
return "❌ 无法从 Cookie 中提取用户 ID,请确认 Cookie 完整"
# 检查是否缺少 stoken
if not cookies.get("stoken"):
# 先保存当前 cookie,然后提示扫码补全
result = await _finish_login(user_id, uid, cookies)
return (
result + "\n\n"
"⚠️ 注意:当前 Cookie 缺少 stoken\n"
"部分功能可能受限,建议发送「扫码补全stoken」进行补全"
)
return await _finish_login(user_id, uid, cookies)
async def _finish_login(user_id: str, uid: str, cookies: dict) -> str:
"""
登录成功后:获取游戏账号、保存数据、返回成功消息
使用 getUserGameRolesByCookie 逐游戏查询,确保 game_biz 正确
"""
games = []
biz_name = {
"hk4e_cn": "原神",
"hkrpg_cn": "星穹铁道",
"bh3_cn": "崩坏3",
"bh2_cn": "崩坏学园2",
"nxx_cn": "未定事件簿",
"nap_cn": "绝区零",
}
for biz, gname in biz_name.items():
accounts = await get_account_list_by_game(cookies, biz)
for acc in accounts:
games.append({
"game_biz": biz,
"name": acc.get("nickname", gname),
"game_uid": acc.get("game_uid", ""),
"region": acc.get("region", ""),
"level": 0,
})
nickname = games[0]["name"] if games else uid
is_new = add_account(user_id, uid, nickname, cookies, games)
clear_login_session(user_id)
action = "绑定" if is_new else "更新"
ck_fields = [k for k in cookies if k not in
("account_id", "stuid", "ltuid", "login_uid",
"account_id_v2", "ltuid_v2")]
total = len(get_user_accounts(user_id))
lines = [
f"✅ 登录成功,账号已{action}!",
f"👤 昵称: {nickname}",
f"🆔 米游社 UID: {uid}",
f"🍪 Cookie: {', '.join(ck_fields) if ck_fields else '基础字段'}",
]
if games:
lines.append("🎮 游戏账号:")
for g in games:
biz = g.get("game_biz", "")
gname = biz_name.get(biz, g.get("name", biz))
lines.append(f" • {gname}: UID {g['game_uid']} Lv.{g['level']}")
lines.append(f"\n📊 当前共绑定 {total} 个米游社账号")
lines.append("发送「米游社账号」查看全部")
if not cookies.get("stoken"):
lines.append("\n⚠️ 缺少 stoken,建议发送「扫码补全stoken」进行补全")
return "\n".join(lines)
async def cmd_daily_tasks(user_id: str) -> str:
"""执行每日米游币任务"""
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 未绑定账号,请先发送「米游社登录」"
all_results = []
for acc in accounts:
cookies = acc["cookies"]
nickname = acc.get("nickname", acc["uid"])
games = acc.get("games", [])
game = "genshin"
if games:
biz = games[0].get("game_biz", "hk4e_cn")
for k, v in GAME_BIZ.items():
if v == biz:
game = k
break
results = await do_bbs_tasks(cookies, game)
all_results.append(format_task_result(results, nickname))
return "\n\n".join(all_results)
# 根据 region 推断游戏类型
REGION_TO_GAME = {
# 原神
"cn_gf01": "genshin",
"cn_qd01": "genshin",
# 星穹铁道
"prod_gf_cn": "starrail",
"prod_qd_cn": "starrail",
"hun02": "starrail",
# 崩坏3
"cn01": "honkai3",
# 崩坏学园2
"bh2_cn01": "honkai2",
# 未定事件簿
"nxx_cn01": "tearsofthemis",
# 绝区零
"nap_cn": "zzz",
"nap_gf_cn": "zzz",
}
async def cmd_sign(user_id: str, target_game: Optional[str] = None) -> str:
"""
执行游戏签到(仅国服)
使用 getUserGameRolesByCookie API 逐游戏查询账号,解决 game_biz 为空导致漏签的问题
"""
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 未绑定账号,请先发送「米游社登录」"
# 确定要签到的游戏列表
if target_game:
games_to_sign = [target_game]
else:
games_to_sign = list(GAME_BIZ.keys())
sign_results = []
for acc in accounts:
cookies = acc["cookies"]
nickname = acc.get("nickname", acc["uid"])
for game in games_to_sign:
game_biz = GAME_BIZ.get(game)
if not game_biz:
continue
# 用 game_biz 查询该游戏的账号列表
game_accounts = await get_account_list_by_game(cookies, game_biz)
if not game_accounts:
# 没有该游戏账号,跳过
continue
for ga in game_accounts:
uid = ga.get("game_uid", "")
region = ga.get("region", "")
acc_name = ga.get("nickname", nickname)
if not uid:
continue
ok, msg = await game_sign(cookies, game, region, uid=uid)
sign_results.append({
"success": ok,
"msg": f"[{acc_name}] {msg}",
})
if not sign_results:
return "❌ 未找到任何游戏账号,请重新登录绑定"
return format_sign_results(sign_results)
async def cmd_note(user_id: str, game: str) -> str:
"""查询实时便笺"""
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 未绑定账号,请先发送「米游社登录」"
acc = accounts[0]
cookies = acc["cookies"]
nickname = acc.get("nickname", acc["uid"])
# 用 getUserGameRolesByCookie 查找游戏 UID
game_biz = GAME_BIZ.get(game, "")
game_uid = None
region = GAME_REGIONS.get(game, "cn_gf01")
if game_biz:
game_accounts = await get_account_list_by_game(cookies, game_biz)
if game_accounts:
ga = game_accounts[0]
game_uid = ga.get("game_uid")
region = ga.get("region") or region
if not game_uid:
return f"❌ 未找到 {SIGN_APIS.get(game, {}).get('name', game)} 的游戏账号"
if game == "genshin":
ok, data = await get_genshin_note(cookies, game_uid, region)
if not ok or not data:
return "❌ 获取原神便笺失败,请检查 Cookie 是否有效"
return format_genshin_note(data, nickname)
elif game == "starrail":
ok, data = await get_starrail_note(cookies, game_uid, region)
if not ok or not data:
return "❌ 获取星穹铁道便笺失败,请检查 Cookie 是否有效"
return format_starrail_note(data, nickname)
return f"❌ 暂不支持 {game} 的便笺查询"
async def cmd_good_list(game: str = "") -> str:
"""查询米游币商品列表"""
ok, goods = await get_good_list(game)
if not ok or goods is None:
return "❌ 获取商品列表失败,请稍后重试"
return format_good_list(goods)
async def cmd_exchange(user_id: str, goods_id: str, num: int = 1) -> str:
"""兑换米游币商品"""
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 未绑定账号,请先发送「米游社登录」"
acc = accounts[0]
cookies = acc["cookies"]
games = acc.get("games", [])
if not games:
return "❌ 未找到游戏账号信息,请重新绑定"
g = games[0]
game_biz = g.get("game_biz", "hk4e_cn")
game_uid = g.get("game_uid", "")
region = g.get("region", "cn_gf01")
# 获取地址列表
ok_addr, addresses = await get_address_list(cookies)
address_id = None
if ok_addr and addresses:
# 优先使用默认地址
for addr in addresses:
if addr.get("is_default"):
address_id = addr.get("id")
break
# 没有默认地址则用第一个
if not address_id and addresses:
address_id = addresses[0].get("id")
ok, msg, next_time = await do_exchange(
cookies=cookies,
goods_id=goods_id,
uid=game_uid,
region=region,
game_biz=game_biz,
address_id=address_id,
num=num,
)
result = ("✅ " if ok else "❌ ") + msg
# 库存不足时提示设置定时抢购
if not ok and next_time and next_time > 0:
from datetime import datetime
next_dt = datetime.fromtimestamp(next_time)
result += f"\n\n⏰ 下次补货时间: {next_dt.strftime('%Y-%m-%d %H:%M:%S')}"
result += f"\n发送「预约兑换 {goods_id}」设置定时抢购"
return result
# ── 帮助文本 ───────────────────────────────────────────────────────────────────
HELP_TEXT = """🎮 米游社工具 (mystool)
─────────────────────────
🔐 账号管理
米游社登录 登录(短信/扫码/Cookie 三选一)✨
扫码补全stoken Cookie 登录后补全 stoken
米游社账号 查看已绑定账号
米游社解绑 [uid] 解绑账号
📅 每日任务
米游社任务 执行米游币每日任务
米游社签到 签到所有游戏
📊 游戏签到(国服)
米游社签到 原神 原神签到
米游社签到 星铁 星穹铁道签到
米游社签到 崩3 崩坏3签到
米游社签到 崩2 崩坏学园2签到
米游社签到 未定 未定事件簿签到
米游社签到 绝区零 绝区零签到
📋 实时便笺
原神便笺 查看原神树脂等状态
星铁便笺 查看星铁开拓力等状态
🛒 商品兑换
米游币商品 [游戏] 查看商品列表
米游社兑换 <ID> [数量] 兑换商品
预约兑换 <商品ID> 预约定时抢购
预约列表 查看全部预约
取消预约 <商品ID> 取消预约
🔗 跨平台绑定
生成识别码 获取跨平台绑定识别码
链接账号 <识别码> 使用识别码链接账号
🔧 代理配置
设置代理 <URL> 设置 IP 池代理地址
查看代理 查看代理配置信息
─────────────────────────
定时任务每天 07:20 自动执行"""
# ── 预约兑换存储 ───────────────────────────────────────────────────────────────
EXCHANGE_SCHEDULES_FILE = Path(__file__).parent / "data" / "exchange_schedules.json"
def _load_exchange_schedules() -> Dict[str, List[dict]]:
"""加载预约兑换列表"""
if not EXCHANGE_SCHEDULES_FILE.exists():
return {}
try:
with open(EXCHANGE_SCHEDULES_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _save_exchange_schedules(data: Dict[str, List[dict]]):
"""保存预约兑换列表"""
EXCHANGE_SCHEDULES_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(EXCHANGE_SCHEDULES_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# ── 代理配置指令 ───────────────────────────────────────────────────────────────
def cmd_set_proxy(url: str) -> str:
"""设置代理 API 地址"""
import json
from pathlib import Path
config_file = Path(__file__).parent / "data" / "proxy_config.json"
# 读取现有配置
config = {}
if config_file.exists():
try:
with open(config_file, "r", encoding="utf-8") as f:
config = json.load(f)
except Exception:
pass
# 更新 API 地址
config["api_url"] = url
config["last_fetch_time"] = 0
config["fetch_count_hour"] = 0
config["hour_start"] = 0
# 保存
config_file.parent.mkdir(parents=True, exist_ok=True)
with open(config_file, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
# 脱敏显示
masked = url[:30] + "..." if len(url) > 30 else url
return (
f"✅ 代理地址已更新\n"
f"────────────────────────\n"
f"🔗 {masked}\n"
f"📊 每小时限制: {config.get('max_per_hour', 20)} 次\n"
f"⏳ 冷却时间: {config.get('cooldown_seconds', 30)} 秒"
)
def cmd_show_proxy() -> str:
"""显示代理配置"""
import json
from pathlib import Path
config_file = Path(__file__).parent / "data" / "proxy_config.json"
if not config_file.exists():
return "❌ 未配置代理\n发送「设置代理 <URL>」进行配置"
try:
with open(config_file, "r", encoding="utf-8") as f:
config = json.load(f)
except Exception:
return "❌ 配置文件读取失败"
api_url = config.get("api_url", "")
if not api_url:
return "❌ 未配置代理\n发送「设置代理 <URL>」进行配置"
# 脱敏显示 URL
masked = api_url[:30] + "..." if len(api_url) > 30 else api_url
import time
now = time.time()
hour_start = config.get("hour_start", 0)
fetch_count = config.get("fetch_count_hour", 0)
max_per_hour = config.get("max_per_hour", 20)
if now - hour_start > 3600:
remaining = max_per_hour
else:
remaining = max_per_hour - fetch_count
return (
f"🔧 代理配置\n"
f"────────────────────────\n"
f"🔗 地址: {masked}\n"
f"📊 本小时剩余: {remaining}/{max_per_hour} 次\n"
f"⏳ 冷却时间: {config.get('cooldown_seconds', 30)} 秒"
)
def cmd_generate_link_code(user_id: str) -> str:
"""生成跨平台绑定识别码"""
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 你还没有绑定米游社账号\n请先发送「米游社登录」绑定账号"
code = generate_link_code(user_id)
return (
"🔗 跨平台账号绑定\n"
"────────────────────────\n"
f"你的识别码: {code}\n"
"────────────────────────\n"
"📋 使用方法:\n"
"1. 在另一个平台打开与我的对话\n"
"2. 发送「链接账号 {识别码}」\n"
"3. 自动共享米游社账号数据\n\n"
"⚠️ 识别码有效期 10 分钟"
)
async def cmd_link_account(user_id: str, code: str) -> str:
"""使用识别码链接账号"""
if not code:
return "❌ 请提供识别码\n格式:链接账号 XXXXXX"
# 验证识别码
source_user_id = verify_link_code(code)
if not source_user_id:
return "❌ 识别码无效或已过期\n请重新获取识别码"
# 不能链接自己
if source_user_id == user_id:
return "❌ 不能链接自己账号"
# 合并账号
if merge_user_accounts(source_user_id, user_id):
return (
"✅ 账号链接成功!\n"
"────────────────────────\n"
"已共享米游社账号数据\n"
"现在可以使用签到、便笺等功能了"
)
else:
return "❌ 链接失败,原账号没有绑定米游社账号"
async def cmd_schedule_exchange(user_id: str, goods_id: str) -> str:
"""预约定时抢购"""
accounts = get_user_accounts(user_id)
if not accounts:
return "❌ 未绑定账号,请先发送「米游社登录」"
# 获取商品详情
ok, detail = await get_good_detail(goods_id)
if not ok or not detail:
return f"❌ 获取商品 {goods_id} 详情失败"
next_time = detail.get("next_time", 0)
next_num = detail.get("next_num", 0)
goods_name = detail.get("name") or detail.get("goods_name", "未知商品")
if next_time <= 0:
return f"❌ 商品「{goods_name}」暂无补货计划"
from datetime import datetime
next_dt = datetime.fromtimestamp(next_time)
# 保存预约
schedules = _load_exchange_schedules()
user_schedules = schedules.setdefault(user_id, [])
# 检查是否已预约
for s in user_schedules:
if s.get("goods_id") == goods_id:
return f"❗ 已预约过此商品\n商品: {goods_name}\n时间: {next_dt.strftime('%Y-%m-%d %H:%M:%S')}"
user_schedules.append({
"goods_id": goods_id,
"goods_name": goods_name,
"next_time": next_time,
"next_num": next_num,
"created_at": time.time(),
})
_save_exchange_schedules(schedules)
# 创建定时任务(使用 OpenClaw cron)
# 这里先返回提示,实际 cron 任务需要通过 OpenClaw 的 cron 工具创建
return (
f"✅ 预约成功!\n"
f"────────────────────────\n"
f"🛒 商品: {goods_name}\n"
f"📦 补货数量: {next_num}\n"
f"⏰ 开抢时间: {next_dt.strftime('%Y-%m-%d %H:%M:%S')}\n"
f"────────────────────────\n"
f"💡 到时间会自动尝试兑换\n"
f"发送「预约列表」查看全部预约"
)
def cmd_list_scheduled_exchanges(user_id: str) -> str:
"""查看预约列表"""
schedules = _load_exchange_schedules()
user_schedules = schedules.get(user_id, [])
if not user_schedules:
return "📭 暂无预约兑换\n\n发送「预约兑换 <商品ID>」预约抢购"
from datetime import datetime
lines = ["📋 预约兑换列表", "─" * 24]
for i, s in enumerate(user_schedules, 1):
next_dt = datetime.fromtimestamp(s.get("next_time", 0))
lines.append(f"\n{i}. {s.get('goods_name', '未知商品')}")
lines.append(f" ID: {s.get('goods_id')}")
lines.append(f" 补货: {s.get('next_num', 0)} 件")
lines.append(f" 时间: {next_dt.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f" 取消: 取消预约 {s.get('goods_id')}")
return "\n".join(lines)
def cmd_cancel_scheduled_exchange(user_id: str, goods_id: str) -> str:
"""取消预约"""
schedules = _load_exchange_schedules()
user_schedules = schedules.get(user_id, [])
before = len(user_schedules)
user_schedules[:] = [s for s in user_schedules if s.get("goods_id") != goods_id]
if len(user_schedules) < before:
schedules[user_id] = user_schedules
_save_exchange_schedules(schedules)
return f"✅ 已取消预约: {goods_id}"
return f"❌ 未找到预约: {goods_id}"
# ── 定时任务入口(供 cron 调用)────────────────────────────────────────────────
async def run_daily_tasks_all() -> str:
"""对所有绑定用户执行每日任务(cron 调用),返回汇总结果。同账号去重。"""
from store import load_accounts
all_data = load_accounts()
if not all_data:
return "⚠️ 米游社每日任务:没有绑定账号的用户,跳过执行。"
# 收集所有唯一的米游社账号(按 UID 去重)
seen_uids = set()
unique_accounts = []
for user_id, user_data in all_data.items():
for acc in user_data.get("accounts", []):
uid = acc.get("uid", "")
if uid and uid not in seen_uids:
seen_uids.add(uid)
unique_accounts.append(acc)
if not unique_accounts:
return "⚠️ 米游社每日任务:没有绑定账号的用户,跳过执行。"
# 执行任务
results = []
for acc in unique_accounts:
nickname = acc.get("nickname", acc["uid"])
cookies = acc["cookies"]
games = acc.get("games", [])
# 选择游戏(默认用 games 列表中第一个 game_biz)
game = "genshin"
if games:
biz = games[0].get("game_biz", "hk4e_cn")
for k, v in GAME_BIZ.items():
if v == biz:
game = k
break
result = await do_bbs_tasks(cookies, game)
results.append(format_task_result(result, nickname))
return "\n\n".join(results)
async def run_sign_all() -> str:
"""对所有绑定用户执行游戏签到(cron 调用),返回汇总结果。同账号去重。"""
from store import load_accounts
all_data = load_accounts()
if not all_data:
return "⚠️ 米游社游戏签到:没有绑定账号的用户,跳过执行。"
# 收集所有唯一的米游社账号(按 UID 去重)
seen_uids = set()
unique_accounts = []
for user_id, user_data in all_data.items():
for acc in user_data.get("accounts", []):
uid = acc.get("uid", "")
if uid and uid not in seen_uids:
seen_uids.add(uid)
unique_accounts.append(acc)
if not unique_accounts:
return "⚠️ 米游社游戏签到:没有绑定账号的用户,跳过执行。"
# 执行签到
results = []
for acc in unique_accounts:
nickname = acc.get("nickname", acc["uid"])
cookies = acc["cookies"]
# 遍历所有游戏签到
for game, game_biz in GAME_BIZ.items():
game_accounts = await get_account_list_by_game(cookies, game_biz)
if not game_accounts:
continue
for ga in game_accounts:
uid = ga.get("game_uid", "")
region = ga.get("region", "")
if not uid:
continue
ok, msg = await game_sign(cookies, game, region, uid=uid)
results.append({"success": ok, "msg": f"[{nickname}] {msg}"})
return format_sign_results(results)
# ── CLI 入口(调试用)─────────────────────────────────────────────────────────
if __name__ == "__main__":
import sys
async def main():
if len(sys.argv) < 2:
print("用法: python plugin.py <指令>")
print("示例: python plugin.py 米游社帮助")
return
msg = " ".join(sys.argv[1:])
result = await handle(msg, user_id="cli_user")
print(result or "(无响应)")
asyncio.run(main())
FILE:src/store.py
"""
账号数据管理 — 读写 accounts.json
"""
import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple
DATA_DIR = Path(__file__).parent.parent / "data"
ACCOUNTS_FILE = DATA_DIR / "accounts.json"
def _ensure_dir():
DATA_DIR.mkdir(parents=True, exist_ok=True)
def load_accounts() -> Dict[str, dict]:
"""
加载所有账号数据。
结构: { user_id: { "accounts": [ { "uid", "nickname", "cookies", "games": [...] } ] } }
"""
_ensure_dir()
if not ACCOUNTS_FILE.exists():
return {}
try:
with open(ACCOUNTS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def save_accounts(data: Dict[str, dict]):
"""保存账号数据"""
_ensure_dir()
with open(ACCOUNTS_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get_user_accounts(user_id: str) -> List[dict]:
"""获取某用户绑定的所有米游社账号"""
data = load_accounts()
return data.get(user_id, {}).get("accounts", [])
def add_account(user_id: str, uid: str, nickname: str, cookies: Dict[str, str], games: List[dict]) -> bool:
"""
添加或更新账号绑定
:return: True=新增, False=更新
"""
data = load_accounts()
user_data = data.setdefault(user_id, {"accounts": []})
accounts = user_data["accounts"]
# 检查是否已存在
for acc in accounts:
if acc["uid"] == uid:
acc["nickname"] = nickname
acc["cookies"] = cookies
acc["games"] = games
save_accounts(data)
return False
accounts.append({
"uid": uid,
"nickname": nickname,
"cookies": cookies,
"games": games,
})
save_accounts(data)
return True
def remove_account(user_id: str, uid: str) -> bool:
"""删除账号绑定"""
data = load_accounts()
user_data = data.get(user_id)
if not user_data:
return False
before = len(user_data["accounts"])
user_data["accounts"] = [a for a in user_data["accounts"] if a["uid"] != uid]
if len(user_data["accounts"]) < before:
save_accounts(data)
return True
return False
def get_all_user_ids() -> List[str]:
"""获取所有有绑定账号的用户 ID"""
return list(load_accounts().keys())
# 标准米游社 Cookie 字段(过滤非标准 key)
STANDARD_COOKIE_KEYS = {
"account_id", "account_id_v2", "account_mid_v2",
"ltoken", "ltoken_v2", "ltmid_v2", "ltuid", "ltuid_v2",
"cookie_token", "cookie_token_v2",
"mid", "stoken", "stoken_v2", "stuid", "stuid_v2",
"login_uid", "MHYUUID", "DEVICEFP", "DEVICEFP_SEED_ID",
"DEVICEFP_SEED_TIME", "MIHOYO_LOGIN_PLATFORM_LIFECYCLE_ID",
}
def parse_cookie_string(cookie_str: str) -> Dict[str, str]:
"""
将 Cookie 字符串解析为字典
支持格式: "key1=val1; key2=val2" 或 "key1=val1\nkey2=val2"
自动过滤非标准字段(如中文 key)
"""
cookies = {}
# 统一分隔符
parts = cookie_str.replace("\n", ";").split(";")
for part in parts:
part = part.strip()
if "=" in part:
k, _, v = part.partition("=")
key = k.strip()
# 过滤非 ASCII 字符的 key
if key and all(ord(c) < 128 for c in key):
cookies[key] = v.strip()
return cookies
def extract_uid_from_cookies(cookies: Dict[str, str]) -> Optional[str]:
"""从 Cookie 中提取用户 UID"""
for key in ["account_id", "ltuid", "stuid", "login_uid"]:
if cookies.get(key):
return cookies[key]
return None
def validate_cookies(cookies: Dict[str, str]) -> Tuple[bool, str]:
"""
验证 Cookie 是否包含必要字段
返回 (是否有效, 提示信息)
"""
has_basic = bool(cookies.get("cookie_token") and cookies.get("account_id"))
has_stoken = bool(cookies.get("stoken") and (cookies.get("stuid") or cookies.get("account_id")))
if not has_basic and not has_stoken:
return False, (
"Cookie 缺少必要字段。\n"
"需要包含以下之一:\n"
"• `cookie_token` + `account_id`\n"
"• `stoken` + `stuid`"
)
return True, "OK"
# ── 登录会话状态持久化 ─────────────────────────────────────────────────────────
# 用文件存储跨消息的扫码会话,key = user_id
LOGIN_SESSIONS_FILE = DATA_DIR / "login_sessions.json"
def save_login_session(user_id: str, ticket: str, qr_url: str, device: str = None):
"""保存扫码登录会话(ticket + device + 时间戳)"""
_ensure_dir()
sessions = _load_login_sessions()
sessions[user_id] = {
"ticket": ticket,
"qr_url": qr_url,
"device": device,
"created_at": time.time(),
}
with open(LOGIN_SESSIONS_FILE, "w", encoding="utf-8") as f:
json.dump(sessions, f, ensure_ascii=False, indent=2)
def get_login_session(user_id: str) -> Optional[Dict]:
"""获取用户的扫码登录会话,超过3分钟自动失效"""
sessions = _load_login_sessions()
s = sessions.get(user_id)
if not s:
return None
if time.time() - s.get("created_at", 0) > 180:
clear_login_session(user_id)
return None
return s
def clear_login_session(user_id: str):
"""清除登录会话"""
sessions = _load_login_sessions()
sessions.pop(user_id, None)
_ensure_dir()
with open(LOGIN_SESSIONS_FILE, "w", encoding="utf-8") as f:
json.dump(sessions, f, ensure_ascii=False, indent=2)
def _load_login_sessions() -> Dict:
if not LOGIN_SESSIONS_FILE.exists():
return {}
try:
with open(LOGIN_SESSIONS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
# ── 短信登录会话状态持久化 ───────────────────────────────────────────────────────
SMS_SESSIONS_FILE = DATA_DIR / "sms_sessions.json"
def save_sms_session(user_id: str, session: dict):
"""保存短信登录会话"""
_ensure_dir()
sessions = _load_sms_sessions()
session["created_at"] = session.get("created_at", time.time())
sessions[user_id] = session
with open(SMS_SESSIONS_FILE, "w", encoding="utf-8") as f:
json.dump(sessions, f, ensure_ascii=False, indent=2)
def get_sms_session(user_id: str) -> Optional[Dict]:
"""获取用户的短信登录会话,超过 5 分钟自动失效"""
sessions = _load_sms_sessions()
s = sessions.get(user_id)
if not s:
return None
if time.time() - s.get("created_at", 0) > 300:
clear_sms_session(user_id)
return None
return s
def clear_sms_session(user_id: str):
"""清除短信登录会话"""
sessions = _load_sms_sessions()
sessions.pop(user_id, None)
_ensure_dir()
with open(SMS_SESSIONS_FILE, "w", encoding="utf-8") as f:
json.dump(sessions, f, ensure_ascii=False, indent=2)
def _load_sms_sessions() -> Dict:
if not SMS_SESSIONS_FILE.exists():
return {}
try:
with open(SMS_SESSIONS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
import time # noqa: E402 (放末尾避免循环)
# ── 跨平台用户识别码 ─────────────────────────────────────────────────────────
LINK_CODES_FILE = DATA_DIR / "link_codes.json"
def generate_link_code(user_id: str) -> str:
"""
生成用户识别码(6位字母数字)
用于跨平台账号绑定
"""
import random
import string
_ensure_dir()
# 加载现有识别码
codes = _load_link_codes()
# 生成新识别码
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
# 存储识别码映射
codes[code] = {
"user_id": user_id,
"created_at": time.time(),
}
with open(LINK_CODES_FILE, "w", encoding="utf-8") as f:
json.dump(codes, f, ensure_ascii=False, indent=2)
return code
def verify_link_code(code: str) -> Optional[str]:
"""
验证识别码,返回原用户ID
识别码有效期10分钟
"""
codes = _load_link_codes()
code_data = codes.get(code.upper())
if not code_data:
return None
# 检查是否过期(10分钟)
if time.time() - code_data.get("created_at", 0) > 600:
# 删除过期识别码
codes.pop(code.upper(), None)
_save_link_codes(codes)
return None
return code_data.get("user_id")
def _load_link_codes() -> Dict:
if not LINK_CODES_FILE.exists():
return {}
try:
with open(LINK_CODES_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _save_link_codes(codes: Dict):
_ensure_dir()
with open(LINK_CODES_FILE, "w", encoding="utf-8") as f:
json.dump(codes, f, ensure_ascii=False, indent=2)
def merge_user_accounts(source_user_id: str, target_user_id: str) -> bool:
"""
合并两个平台用户的账号数据
source_user_id: 原用户(有米游社账号)
target_user_id: 目标用户(要共享账号)
"""
data = load_accounts()
source_accounts = data.get(source_user_id, {}).get("accounts", [])
target_accounts = data.get(target_user_id, {}).get("accounts", [])
if not source_accounts:
return False
# 合并账号数据(目标用户复制源用户的账号)
# 如果目标用户已有相同 UID 的账号,更新 cookies 和 games
merged_accounts = []
for src_acc in source_accounts:
src_uid = src_acc["uid"]
merged = False
for tgt_acc in target_accounts:
if tgt_acc["uid"] == src_uid:
# 合并 cookies(取字段更多的版本)
merged_cookies = {**tgt_acc.get("cookies", {}), **src_acc.get("cookies", {})}
# 合并 games(取非空的版本)
merged_games = src_acc.get("games", []) or tgt_acc.get("games", [])
tgt_acc["cookies"] = merged_cookies
tgt_acc["games"] = merged_games
tgt_acc["nickname"] = src_acc.get("nickname", tgt_acc.get("nickname"))
merged = True
break
if not merged:
# 目标用户没有此账号,添加
target_accounts.append({
"uid": src_acc["uid"],
"nickname": src_acc.get("nickname", ""),
"cookies": src_acc.get("cookies", {}),
"games": src_acc.get("games", []),
})
data.setdefault(target_user_id, {"accounts": target_accounts})
data[target_user_id]["accounts"] = target_accounts
save_accounts(data)
return True
# ── 跨平台用户识别码 ─────────────────────────────────────────────────────────
LINK_CODES_FILE = DATA_DIR / "link_codes.json"
def generate_link_code(user_id: str) -> str:
"""
生成用户识别码(6位字母数字)
用于跨平台账号绑定
"""
import random
import string
_ensure_dir()
# 加载现有识别码
codes = _load_link_codes()
# 生成新识别码
code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6))
# 存储识别码映射
codes[code] = {
"user_id": user_id,
"created_at": time.time(),
}
with open(LINK_CODES_FILE, "w", encoding="utf-8") as f:
json.dump(codes, f, ensure_ascii=False, indent=2)
return code
def verify_link_code(code: str) -> Optional[str]:
"""
验证识别码,返回原用户ID
识别码有效期10分钟
"""
codes = _load_link_codes()
code_data = codes.get(code.upper())
if not code_data:
return None
# 检查是否过期(10分钟)
if time.time() - code_data.get("created_at", 0) > 600:
# 删除过期识别码
codes.pop(code.upper(), None)
_save_link_codes(codes)
return None
return code_data.get("user_id")
def _load_link_codes() -> Dict:
if not LINK_CODES_FILE.exists():
return {}
try:
with open(LINK_CODES_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _save_link_codes(codes: Dict):
_ensure_dir()
with open(LINK_CODES_FILE, "w", encoding="utf-8") as f:
json.dump(codes, f, ensure_ascii=False, indent=2)
def merge_user_accounts(source_user_id: str, target_user_id: str) -> bool:
"""
合并两个平台用户的账号数据
source_user_id: 原用户(有米游社账号)
target_user_id: 目标用户(要共享账号)
"""
data = load_accounts()
source_accounts = data.get(source_user_id, {}).get("accounts", [])
target_accounts = data.get(target_user_id, {}).get("accounts", [])
if not source_accounts:
return False
# 合并账号数据(目标用户复制源用户的账号)
# 如果目标用户已有相同 UID 的账号,更新 cookies 和 games
for src_acc in source_accounts:
src_uid = src_acc["uid"]
merged = False
for tgt_acc in target_accounts:
if tgt_acc["uid"] == src_uid:
# 合并 cookies(取字段更多的版本)
merged_cookies = {**tgt_acc.get("cookies", {}), **src_acc.get("cookies", {})}
# 合并 games(取非空的版本)
merged_games = src_acc.get("games", []) or tgt_acc.get("games", [])
tgt_acc["cookies"] = merged_cookies
tgt_acc["games"] = merged_games
tgt_acc["nickname"] = src_acc.get("nickname", tgt_acc.get("nickname"))
merged = True
break
if not merged:
# 目标用户没有此账号,添加
target_accounts.append({
"uid": src_acc["uid"],
"nickname": src_acc.get("nickname", ""),
"cookies": src_acc.get("cookies", {}),
"games": src_acc.get("games", []),
})
data.setdefault(target_user_id, {"accounts": target_accounts})
data[target_user_id]["accounts"] = target_accounts
save_accounts(data)
return True
FILE:src/sms_login.py
# -*- coding: utf-8 -*-
"""
米游社短信登录模块(OpenClaw 异步适配版)
参考: https://github.com/NuoManDai/mihoyo_sms_login
流程:
1. sms_send_captcha(phone) → 发送短信验证码,返回 action_type
2. sms_login_by_captcha(phone, captcha, action_type)
→ 登录,返回 stoken + stuid + mid
3. sms_get_full_cookies(stoken, stuid, mid)
→ 换取 ltoken + cookie_token,返回完整 cookies dict
依赖: httpx, pycryptodome (或 rsa)
"""
import asyncio
import base64
import hashlib
import json
import os
import random
import string
import time
import uuid
from typing import Any, Dict, Optional, Tuple
from pathlib import Path
import httpx
# ── 代理配置 ──────────────────────────────────────────────────────────────────
DATA_DIR = Path(__file__).parent.parent / "data"
PROXY_CONFIG_FILE = DATA_DIR / "proxy_config.json"
def _load_proxy_config() -> Dict:
"""加载代理配置"""
if not PROXY_CONFIG_FILE.exists():
return {}
try:
with open(PROXY_CONFIG_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _save_proxy_config(config: Dict):
"""保存代理配置"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
with open(PROXY_CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
def _get_proxy() -> str:
"""
获取代理 IP(带次数限制)
每小时最多提取 max_per_hour 次,每次提取间隔 cooldown_seconds 秒
"""
config = _load_proxy_config()
api_url = config.get("api_url", "")
if not api_url:
return ""
now = time.time()
max_per_hour = config.get("max_per_hour", 20)
cooldown = config.get("cooldown_seconds", 30)
# 检查冷却时间
last_fetch = config.get("last_fetch_time", 0)
if now - last_fetch < cooldown:
return "" # 冷却中,跳过
# 检查每小时限制
hour_start = config.get("hour_start", 0)
if now - hour_start > 3600:
# 新的一小时,重置计数
config["hour_start"] = now
config["fetch_count_hour"] = 0
if config.get("fetch_count_hour", 0) >= max_per_hour:
return "" # 超过每小时限制
try:
# 获取代理 IP
import urllib.request
req = urllib.request.Request(api_url, method="GET")
with urllib.request.urlopen(req, timeout=10) as resp:
proxy_ip = resp.read().decode().strip()
if proxy_ip:
config["last_fetch_time"] = now
config["fetch_count_hour"] = config.get("fetch_count_hour", 0) + 1
_save_proxy_config(config)
return f"http://{proxy_ip}"
except Exception:
pass
return ""
# ── RSA 加密 ──────────────────────────────────────────────────────────────────
PUB_KEY_PEM = """-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDDvekdPMHN3AYhm/vktJT+YJr7
cI5DcsNKqdsx5DZX0gDuWFuIjzdwButrIYPNmRJ1G8ybDIF7oDW2eEpm5sMbL9zs
9ExXCdvqrn51qELbqj0XxtMTIpaCHFSI50PfPpTFV9Xt/hmyVwokoOXFlAEgCn+Q
CgGs52bFoYMtyi+xEQIDAQAB
-----END PUBLIC KEY-----"""
def _rsa_encrypt(plaintext: str) -> str:
"""RSA 加密(PKCS1_v1_5),返回 base64 字符串"""
try:
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5
key = RSA.import_key(PUB_KEY_PEM)
cipher = PKCS1_v1_5.new(key)
return base64.b64encode(cipher.encrypt(plaintext.encode())).decode()
except ImportError:
pass
try:
import rsa as rsa_lib
pub = rsa_lib.PublicKey.load_pkcs1_openssl_pem(PUB_KEY_PEM.encode())
return base64.b64encode(rsa_lib.encrypt(plaintext.encode(), pub)).decode()
except ImportError:
raise RuntimeError("需要安装 pycryptodome 或 rsa: pip install pycryptodome")
# ── 常量 ──────────────────────────────────────────────────────────────────────
BBS_VERSION = "2.102.1"
BBS_UA = f"Mozilla/5.0 (Linux; Android 12) Mobile miHoYoBBS/{BBS_VERSION}"
PASSPORT_BASE = "https://passport-api.miyoushe.com/"
CREATE_CAPTCHA_URL = f"{PASSPORT_BASE}account/ma-cn-verifier/verifier/createLoginCaptcha"
LOGIN_APP_URL = f"{PASSPORT_BASE}account/ma-cn-passport/app/loginByMobileCaptcha"
LOGIN_WEB_URL = f"{PASSPORT_BASE}account/ma-cn-passport/web/loginByMobileCaptcha"
TAKUMI_BASE = "https://passport-api.mihoyo.com/"
GET_LTOKEN_URL = f"{TAKUMI_BASE}account/auth/api/getLTokenBySToken"
GET_COOKIE_TOKEN_URL = f"{TAKUMI_BASE}account/auth/api/getCookieAccountInfoBySToken"
APP_ID = "bll8iq97cem8"
# DS salts
DS_SALT_CN_SIGNIN = "LyD1rXqMv2GJhnwdvCBjFOKGiKuLY3aO"
DS_SALT_CN_PASSPORT = "JwYDpKvLj6MrMqqYU6jTKF17KNO2PXoS"
DS_SALT_X4 = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
# ── DS 签名 ───────────────────────────────────────────────────────────────────
def _ds_simple(salt: str = DS_SALT_CN_SIGNIN) -> str:
t = str(int(time.time()))
r = "".join(random.choices(string.ascii_letters, k=6))
h = hashlib.md5(f"salt={salt}&t={t}&r={r}".encode()).hexdigest()
return f"{t},{r},{h}"
def _ds_passport(body_dict: dict) -> str:
t = str(int(time.time()))
r = "".join(random.choices(string.ascii_letters, k=6))
b = json.dumps(body_dict)
h = hashlib.md5(f"salt={DS_SALT_CN_PASSPORT}&t={t}&r={r}&b={b}&q=".encode()).hexdigest()
return f"{t},{r},{h}"
def _ds_x4(query: str = "") -> str:
t = str(int(time.time()))
r = str(random.randint(100000, 200000))
h = hashlib.md5(f"salt={DS_SALT_X4}&t={t}&r={r}&b=&q={query}".encode()).hexdigest()
return f"{t},{r},{h}"
def _device_id() -> str:
return str(uuid.uuid4())
def _device_fp() -> str:
return "".join(random.choices("0123456789abcdef", k=13))
# ── 请求头 ────────────────────────────────────────────────────────────────────
def _sms_headers(ds: str, device_id: str, device_fp: str, aigis: str = "") -> Dict[str, str]:
return {
"x-rpc-app_id": APP_ID,
"x-rpc-client_type": "4",
"x-rpc-source": "v2.webLogin",
"x-rpc-sdk_version": "2.31.0",
"x-rpc-game_biz": "bbs_cn",
"x-rpc-device_fp": device_fp,
"x-rpc-device_id": device_id,
"x-rpc-device_model": "Firefox%20131.0",
"x-rpc-device_name": "Firefox",
"x-rpc-aigis": aigis,
"ds": ds,
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0",
"content-type": "application/json",
"referer": "https://user.miyoushe.com/",
}
def _token_exchange_headers(stoken: str, mid: str, query_str: str, device_id: str, device_fp: str) -> Dict[str, str]:
return {
"user-agent": BBS_UA,
"x-rpc-app_version": BBS_VERSION,
"x-rpc-client_type": "5",
"x-requested-with": "com.mihoyo.hyperion",
"referer": "https://webstatic.mihoyo.com",
"x-rpc-device_id": device_id,
"x-rpc-device_fp": device_fp,
"ds": _ds_x4(query=query_str),
"cookie": f"mid={mid};stoken={stoken}",
}
# ── 异步 API ──────────────────────────────────────────────────────────────────
async def sms_send_captcha(phone: str, aigis: str = "",
device_id: str = None, device_fp: str = None
) -> Dict[str, Any]:
"""
发送短信验证码。
:return: {"success": bool, "action_type": str, "aigis": str, "retcode": int, "message": str,
"device_id": str, "device_fp": str}
"""
did = device_id or _device_id()
dfp = device_fp or _device_fp()
body = {
"area_code": _rsa_encrypt("+86"),
"mobile": _rsa_encrypt(phone),
}
headers = _sms_headers(_ds_simple(), did, dfp, aigis)
try:
proxy = _get_proxy()
client_kwargs = {"timeout": 20}
if proxy:
client_kwargs["proxy"] = proxy
async with httpx.AsyncClient(**client_kwargs) as client:
resp = await client.post(CREATE_CAPTCHA_URL, headers=headers, json=body, timeout=20)
data = resp.json()
if data.get("retcode") == 0:
return {
"success": True,
"action_type": data.get("data", {}).get("action_type", "login"),
"retcode": 0,
"message": "OK",
"device_id": did,
"device_fp": dfp,
}
aigis_header = resp.headers.get("x-rpc-aigis", "")
return {
"success": False,
"retcode": data.get("retcode"),
"message": data.get("message", ""),
"aigis": aigis_header,
"device_id": did,
"device_fp": dfp,
}
except Exception as e:
return {"success": False, "retcode": -1, "message": str(e), "device_id": did, "device_fp": dfp}
async def sms_login_by_captcha(phone: str, captcha: str, action_type: str = "login",
aigis: str = "",
device_id: str = None, device_fp: str = None
) -> Dict[str, Any]:
"""
使用短信验证码登录,获取 stoken。
:return: {"success": bool, "stoken": str, "stuid": str, "mid": str, ...}
"""
did = device_id or _device_id()
dfp = device_fp or _device_fp()
body = {
"area_code": _rsa_encrypt("+86"),
"mobile": _rsa_encrypt(phone),
"captcha": captcha,
}
# 优先尝试 app 端点(body 返回 stoken)
headers = _sms_headers(_ds_passport(body), did, dfp, aigis)
try:
proxy = _get_proxy()
client_kwargs = {"timeout": 20}
if proxy:
client_kwargs["proxy"] = proxy
async with httpx.AsyncClient(**client_kwargs) as client:
resp = await client.post(LOGIN_APP_URL, headers=headers, json=body, timeout=20)
data = resp.json()
if data.get("retcode") == 0:
login_data = data.get("data", {})
token_info = login_data.get("token", {})
user_info = login_data.get("user_info", {})
stoken = token_info.get("token", "")
aid = user_info.get("aid", "")
mid = user_info.get("mid", "")
if not stoken:
# 尝试从 cookies 获取
ck = dict(resp.cookies)
stoken = ck.get("stoken", "") or ck.get("stoken_v2", "")
aid = aid or ck.get("stuid", "") or ck.get("account_id", "")
mid = mid or ck.get("mid", "")
if stoken:
return {"success": True, "stoken": stoken, "stuid": aid, "mid": mid}
# app 端点失败,尝试 web 端点
if data.get("retcode") != 0:
headers2 = _sms_headers(_ds_passport(body), did, dfp, aigis)
proxy = _get_proxy()
client_kwargs = {"timeout": 20}
if proxy:
client_kwargs["proxy"] = proxy
async with httpx.AsyncClient(**client_kwargs) as client:
resp2 = await client.post(LOGIN_WEB_URL, headers=headers2, json=body, timeout=20)
data2 = resp2.json()
if data2.get("retcode") == 0:
ck = dict(resp2.cookies)
user_info = data2.get("data", {}).get("user_info", {})
stoken = ck.get("stoken", "") or ck.get("stoken_v2", "")
aid = user_info.get("aid", "") or ck.get("stuid", "") or ck.get("account_id", "")
mid = user_info.get("mid", "") or ck.get("mid", "")
return {"success": True, "stoken": stoken, "stuid": aid, "mid": mid}
aigis_header = resp2.headers.get("x-rpc-aigis", "")
return {
"success": False,
"retcode": data2.get("retcode"),
"message": data2.get("message", ""),
"aigis": aigis_header,
}
aigis_header = resp.headers.get("x-rpc-aigis", "")
return {
"success": False,
"retcode": data.get("retcode"),
"message": data.get("message", ""),
"aigis": aigis_header,
}
except Exception as e:
return {"success": False, "retcode": -1, "message": str(e)}
async def sms_get_full_cookies(stoken: str, stuid: str, mid: str,
device_id: str = None, device_fp: str = None
) -> Dict[str, str]:
"""
用 stoken 换取 ltoken + cookie_token,返回完整 cookies dict。
"""
did = device_id or _device_id()
dfp = device_fp or _device_fp()
cookies: Dict[str, str] = {
"account_id": stuid,
"stuid": stuid,
"stoken": stoken,
"mid": mid,
"login_uid": stuid,
}
# 换 ltoken
try:
query_str = f"stoken={stoken}"
headers = _token_exchange_headers(stoken, mid, query_str, did, dfp)
proxy = _get_proxy()
client_kwargs = {"timeout": 20}
if proxy:
client_kwargs["proxy"] = proxy
async with httpx.AsyncClient(**client_kwargs) as client:
res = await client.get(GET_LTOKEN_URL, headers=headers,
params={"stoken": stoken}, timeout=15)
data = res.json()
if data.get("retcode") == 0:
cookies["ltoken"] = data["data"]["ltoken"]
cookies["ltuid"] = stuid
except Exception:
pass
# 换 cookie_token
try:
query_str = f"stoken={stoken}"
headers = _token_exchange_headers(stoken, mid, query_str, did, dfp)
proxy = _get_proxy()
client_kwargs = {"timeout": 20}
if proxy:
client_kwargs["proxy"] = proxy
async with httpx.AsyncClient(**client_kwargs) as client:
res = await client.get(GET_COOKIE_TOKEN_URL, headers=headers,
params={"stoken": stoken}, timeout=15)
data = res.json()
if data.get("retcode") == 0:
cookies["cookie_token"] = data["data"]["cookie_token"]
except Exception:
pass
return cookies
FILE:src/formatter.py
"""
消息格式化工具 — 将 API 数据转为可读文本
"""
from typing import Any, Dict, List, Optional
# ── 便笺格式化 ─────────────────────────────────────────────────────────────────
def format_genshin_note(data: Dict[str, Any], nickname: str = "") -> str:
"""格式化原神实时便笺"""
lines = [f"📊 原神实时便笺" + (f" — {nickname}" if nickname else "")]
lines.append("─" * 24)
# 树脂
resin = data.get("current_resin", 0)
max_resin = data.get("max_resin", 160)
resin_bar = _progress_bar(resin, max_resin)
lines.append(f"🌙 原粹树脂: {resin}/{max_resin} {resin_bar}")
if resin < max_resin:
recovery_time = int(data.get("resin_recovery_time", 0))
h, m = divmod(recovery_time // 60, 60)
lines.append(f" 回满还需: {h}h {m}m")
# 每日委托
task_num = data.get("finished_task_num", 0)
total_task = data.get("total_task_num", 4)
lines.append(f"📋 每日委托: {task_num}/{total_task} {'✅' if task_num >= total_task else '⏳'}")
# 周本折扣
remain_resin_discount = data.get("remain_resin_discount_num", 0)
resin_discount_limit = data.get("resin_discount_num_limit", 3)
lines.append(f"🏆 周本折扣: 剩余 {remain_resin_discount}/{resin_discount_limit}")
# 洞天宝钱
home_coin = data.get("current_home_coin", 0)
max_home_coin = data.get("max_home_coin", 2400)
coin_bar = _progress_bar(home_coin, max_home_coin)
lines.append(f"🏡 洞天宝钱: {home_coin}/{max_home_coin} {coin_bar}")
# 探索派遣
expeditions = data.get("expeditions", [])
if expeditions:
lines.append(f"🗺️ 探索派遣: {len(expeditions)} 人")
for exp in expeditions:
status = exp.get("status", "")
remain = int(exp.get("remained_time", 0))
if status == "Finished" or remain == 0:
lines.append(f" • 已完成 ✅")
else:
h, m = divmod(remain // 60, 60)
lines.append(f" • 剩余 {h}h {m}m")
return "\n".join(lines)
def format_starrail_note(data: Dict[str, Any], nickname: str = "") -> str:
"""格式化星穹铁道实时便笺"""
lines = [f"📊 星穹铁道便笺" + (f" — {nickname}" if nickname else "")]
lines.append("─" * 24)
# 开拓力
power = data.get("current_stamina", 0)
max_power = data.get("max_stamina", 240)
power_bar = _progress_bar(power, max_power)
lines.append(f"⚡ 开拓力: {power}/{max_power} {power_bar}")
if power < max_power:
recovery_time = int(data.get("stamina_recover_time", 0))
h, m = divmod(recovery_time // 60, 60)
lines.append(f" 回满还需: {h}h {m}m")
# 后备开拓力
reserve = data.get("current_reserve_stamina", 0)
if reserve > 0:
lines.append(f"🔋 后备开拓力: {reserve}")
# 每日实训
daily = data.get("current_train_score", 0)
max_daily = data.get("max_train_score", 500)
lines.append(f"📋 每日实训: {daily}/{max_daily} {'✅' if daily >= max_daily else '⏳'}")
# 模拟宇宙
rogue = data.get("current_rogue_score", 0)
max_rogue = data.get("max_rogue_score", 14000)
lines.append(f"🌌 模拟宇宙: {rogue}/{max_rogue}")
# 派遣
expeditions = data.get("expeditions", [])
if expeditions:
lines.append(f"🗺️ 委托派遣: {len(expeditions)} 人")
for exp in expeditions:
remain = int(exp.get("remaining_time", 0))
name = exp.get("name", "未知")
if remain == 0:
lines.append(f" • {name} ✅")
else:
h, m = divmod(remain // 60, 60)
lines.append(f" • {name} 剩余 {h}h {m}m")
return "\n".join(lines)
# ── 任务结果格式化 ─────────────────────────────────────────────────────────────
def format_task_result(results: Dict[str, Any], nickname: str = "") -> str:
"""格式化米游币任务执行结果"""
lines = [f"🪙 米游币任务" + (f" — {nickname}" if nickname else "")]
lines.append("─" * 24)
task_map = {
"sign": ("📍 论坛打卡", 1),
"view": ("📖 浏览帖子", 3),
"like": ("👍 点赞帖子", 5),
"share": ("🔗 分享帖子", 1),
}
total_myb = 0
coins_data = results.get("coins", {})
remaining_coins = coins_data.get("remaining", 0)
for key, (label, max_count) in task_map.items():
r = results.get(key, {})
done = r.get("done", 0)
success = r.get("success", False)
icon = "✅" if success else "❌"
lines.append(f"{icon} {label}: {done}/{max_count}")
if success:
total_myb += done # 每次任务约得 1 米游币
# 显示错误信息
if not success and "errors" in r:
for err in r["errors"]:
lines.append(f" ⚠️ {err[:60]}")
if not success and "error" in r:
lines.append(f" ⚠️ {r['error'][:60]}")
# 优先使用 API 返回的米游币数量
if remaining_coins > 0:
lines.append(f"\n预计获得米游币: +{remaining_coins}")
else:
lines.append(f"\n预计获得米游币: +{total_myb}")
return "\n".join(lines)
def format_sign_results(results: List[Dict[str, Any]]) -> str:
"""格式化多游戏签到结果"""
lines = ["🗓️ 游戏签到结果"]
lines.append("─" * 24)
for r in results:
icon = "✅" if r["success"] else "❌"
lines.append(f"{icon} {r['msg']}")
return "\n".join(lines)
def format_account_list(accounts: List[dict]) -> str:
"""
格式化账号列表,依次展示每个米游社账号的详细信息。
"""
if not accounts:
return (
"📭 未绑定任何米游社账号\n\n"
"发送「米游社登录」绑定账号"
)
total = len(accounts)
lines = [f"🔐 已绑定 {total} 个米游社账号\n"]
for i, acc in enumerate(accounts, 1):
uid = acc.get("uid", "?")
nickname = acc.get("nickname", "未知")
games = acc.get("games", [])
cookies = acc.get("cookies", {})
# Cookie 有效字段
ck_fields = [k for k in cookies if k not in
("account_id", "stuid", "ltuid", "login_uid",
"account_id_v2", "ltuid_v2")]
ck_status = "✅ 完整" if ("stoken" in cookies or "cookie_token" in cookies) else "⚠️ 不完整"
lines.append(f"{'─' * 24}")
lines.append(f"账号 {i}/{total}")
lines.append(f"👤 昵称: {nickname}")
lines.append(f"🆔 米游社 UID: {uid}")
lines.append(f"🍪 Cookie: {ck_status} ({', '.join(ck_fields) if ck_fields else '无'})")
if games:
lines.append(f"🎮 游戏账号 ({len(games)} 个):")
# 游戏名映射
biz_name = {
"hk4e_cn": "原神",
"hkrpg_cn": "星穹铁道",
"bh3_cn": "崩坏3",
"nap_cn": "绝区零",
"hk4e_global": "原神(国际)",
"hkrpg_global": "星铁(国际)",
}
for g in games:
biz = g.get("game_biz", "")
gname = biz_name.get(biz, g.get("name", biz or "未知"))
gid = g.get("game_uid", "?")
lv = g.get("level", 0)
region = g.get("region", "")
region_str = f" | {region}" if region else ""
lines.append(f" • {gname}: UID {gid} Lv.{lv}{region_str}")
else:
lines.append("🎮 游戏账号: 未获取(可重新登录刷新)")
lines.append(f"🗑 解绑: 发送「米游社解绑 {uid}」")
lines.append("─" * 24)
return "\n".join(lines)
def format_good_list(goods: List[dict]) -> str:
"""格式化商品列表"""
if not goods:
return "暂无可兑换商品"
lines = ["🛒 米游币商品列表"]
lines.append("─" * 24)
for g in goods[:15]: # 最多显示15个
name = g.get("goods_name", "未知")
price = g.get("price", 0)
stock = g.get("unlimit", False)
gid = g.get("goods_id", "")
stock_str = "不限量" if stock else f"库存: {g.get('next_num', 0)}"
lines.append(f"• {name}")
lines.append(f" 💰 {price} 米游币 | {stock_str}")
lines.append(f" ID: `{gid}`")
return "\n".join(lines)
# ── 工具函数 ───────────────────────────────────────────────────────────────────
def _progress_bar(current: int, maximum: int, length: int = 10) -> str:
"""生成文字进度条"""
if maximum == 0:
return "░" * length
filled = int(current / maximum * length)
return "█" * filled + "░" * (length - filled)
FILE:src/stoken_qr_login.py
"""
stoken 扫码补全模块
用于 Cookie 登录或短信登录后 stoken 缺失时,通过扫码补全 stoken。
流程与 qr_login.py 相同,但只更新现有账号的 stoken 字段,不重新绑定。
"""
from qr_login import (
fetch_qrcode,
game_token_to_cookies,
qrcode_to_image_url,
query_qrcode,
)
__all__ = [
"fetch_qrcode",
"game_token_to_cookies",
"qrcode_to_image_url",
"query_qrcode",
]
FILE:src/qr_login.py
"""
米游社扫码登录模块(独立)
流程:
1. fetch_qrcode() → 获取二维码 URL + ticket
2. 展示二维码给用户(图片链接)
3. 轮询 query_qrcode() → 等待用户扫码确认
4. 扫码成功后用 game_token 换取完整 Cookie 套装
5. 调用 store.add_account() 保存
提示文字:用米游社App 扫码,自动获取完整 Cookie
依赖: httpx, qrcode (可选,用于生成终端二维码)
"""
import asyncio
import hashlib
import json
import random
import string
import time
import uuid
from typing import Dict, Optional, Tuple
import httpx
# ── 常量 ──────────────────────────────────────────────────────────────────────
APP_ID = "4" # 原神 app_id,用于扫码登录
APP_VERSION = "2.44.1"
UA_MOBILE = (
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.44.1"
)
URL_FETCH_QRCODE = "https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/fetch"
URL_QUERY_QRCODE = "https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/query"
URL_GET_TOKEN_BY_GAME_TOKEN = (
"https://api-takumi.mihoyo.com/account/ma-cn-session/app/getTokenByGameToken"
)
URL_STOKEN_V2_BY_V1 = (
"https://passport-api.mihoyo.com/account/ma-cn-session/app/getTokenBySToken"
)
URL_COOKIE_TOKEN_BY_STOKEN = (
"https://passport-api.mihoyo.com/account/auth/api/getCookieAccountInfoBySToken"
)
URL_LTOKEN_BY_STOKEN = (
"https://passport-api.mihoyo.com/account/auth/api/getLTokenBySToken"
)
HEADERS_BASE = {
"User-Agent": UA_MOBILE,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"x-rpc-app_version": APP_VERSION,
"x-rpc-client_type": "4",
"Content-Type": "application/json; charset=utf-8",
}
HEADERS_PASSPORT = {
**HEADERS_BASE,
"x-rpc-client_type": "1",
"x-rpc-game_biz": "bbs_cn",
"x-rpc-sdk_version": "1.6.1",
}
POLL_INTERVAL = 3 # 轮询间隔(秒)
POLL_TIMEOUT = 180 # 最长等待(秒)
# ── 工具 ──────────────────────────────────────────────────────────────────────
def _device_id() -> str:
return str(uuid.uuid4()).upper()
def _generate_ds(salt: str = "IZPgfb0dRPtBeLuFkdDznSmmkB5W5EXc",
body: str = "", query: str = "") -> str:
t = str(int(time.time()))
r = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b={body}&q={query}".encode()).hexdigest()
return f"{t},{r},{h}"
# ── Step 1: 获取二维码 ─────────────────────────────────────────────────────────
async def fetch_qrcode() -> Tuple[bool, Optional[str], Optional[str], Optional[str]]:
"""
申请扫码登录二维码。
:return: (ok, qrcode_url, ticket, device)
"""
device = _device_id()
payload = {"app_id": APP_ID, "device": device}
try:
async with httpx.AsyncClient() as client:
res = await client.post(
URL_FETCH_QRCODE,
headers=HEADERS_BASE,
json=payload,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
url = data["data"]["url"]
from urllib.parse import urlparse, parse_qs
ticket = parse_qs(urlparse(url).query).get("ticket", [None])[0]
return True, url, ticket, device
return False, None, None, None
except Exception:
return False, None, None, None
# ── Step 2: 生成二维码图片 URL ───────────────────────────────────────────────
def qrcode_to_image_url(url: str, size: int = 300) -> str:
"""将 URL 转为二维码图片 URL(使用 pwmqr.com API)"""
import urllib.parse
encoded = urllib.parse.quote(url, safe='')
return f"https://www.pwmqr.com/qrcodeapi/?size={size}x{size}&data={encoded}"
def qrcode_to_text(url: str) -> Optional[str]:
"""将 URL 转为终端可显示的二维码文本(需要 qrcode 库)"""
try:
import qrcode
from io import StringIO
qr = qrcode.QRCode(border=1)
qr.add_data(url)
qr.make(fit=True)
f = StringIO()
qr.print_ascii(out=f, invert=True)
return f.getvalue()
except ImportError:
return None
# ── Step 3: 轮询扫码状态 ───────────────────────────────────────────────────────
async def query_qrcode(ticket: str, device: str = "") -> Tuple[str, Optional[Dict]]:
"""
轮询二维码扫描状态。
:return: (status, raw_data)
status: "Init" | "Scanned" | "Confirmed" | "Expired" | "error"
"""
payload = {"app_id": APP_ID, "ticket": ticket, "device": device}
try:
async with httpx.AsyncClient() as client:
res = await client.post(
URL_QUERY_QRCODE,
headers=HEADERS_BASE,
json=payload,
timeout=15,
)
data = res.json()
retcode = data.get("retcode", -1)
if retcode != 0:
if retcode in (-3501, -3505):
return "Expired", None
return "Init", None
stat = data["data"].get("stat", "Init")
payload_data = data["data"].get("payload", {})
if stat == "Confirmed" and payload_data:
raw = json.loads(payload_data.get("raw", "{}"))
return "Confirmed", raw
return stat, None
except Exception:
return "error", None
# ── Step 4: game_token → 完整 Cookie 套装 ─────────────────────────────────────
async def game_token_to_cookies(uid: str, game_token: str) -> Tuple[bool, Optional[Dict[str, str]]]:
"""
用扫码得到的 game_token 换取完整 Cookie 套装:
stoken_v1 → stoken_v2 + mid → cookie_token + ltoken
"""
cookies: Dict[str, str] = {"account_id": uid, "stuid": uid}
# 1. game_token → stoken_v1
try:
body = json.dumps({"account_id": int(uid), "game_token": game_token})
h = {**HEADERS_BASE, "DS": _generate_ds(body=body)}
async with httpx.AsyncClient() as client:
res = await client.post(
URL_GET_TOKEN_BY_GAME_TOKEN,
headers=h,
json={"account_id": int(uid), "game_token": game_token},
timeout=15,
)
data = res.json()
if data.get("retcode") != 0:
return False, None
token_list = data["data"].get("token", {})
stoken_v1 = token_list.get("token")
if not stoken_v1:
return False, None
cookies["stoken"] = stoken_v1
except Exception:
return False, None
# 2. stoken_v1 → stoken_v2 + mid
try:
h = {**HEADERS_PASSPORT, "x-rpc-device_id": _device_id()}
async with httpx.AsyncClient() as client:
res = await client.post(
URL_STOKEN_V2_BY_V1,
headers=h,
cookies={"stoken": stoken_v1, "stuid": uid},
json={"app_id": "bll8iq97cem8", "dst_token_types": [1, 2, 4]},
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
token_map = {t["token_type"]: t["token"] for t in data["data"].get("tokens", [])}
stoken_v2 = token_map.get(2) or token_map.get(1)
mid = data["data"].get("user_info", {}).get("mid", "")
if stoken_v2:
cookies["stoken_v2"] = stoken_v2
if mid:
cookies["mid"] = mid
cookies["login_uid"] = uid
except Exception:
pass # stoken_v2 可选,继续
# 3. stoken → cookie_token
try:
h = {**HEADERS_PASSPORT, "x-rpc-device_id": _device_id()}
ck = {"stoken": cookies.get("stoken_v2", stoken_v1), "stuid": uid, "mid": cookies.get("mid", "")}
async with httpx.AsyncClient() as client:
res = await client.get(
URL_COOKIE_TOKEN_BY_STOKEN,
headers=h,
cookies=ck,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
cookies["cookie_token"] = data["data"]["cookie_token"]
if not cookies.get("account_id"):
cookies["account_id"] = data["data"].get("uid", uid)
except Exception:
pass
# 4. stoken → ltoken
try:
h = {**HEADERS_PASSPORT, "x-rpc-device_id": _device_id()}
ck = {"stoken": cookies.get("stoken_v2", stoken_v1), "stuid": uid, "mid": cookies.get("mid", "")}
async with httpx.AsyncClient() as client:
res = await client.get(
URL_LTOKEN_BY_STOKEN,
headers=h,
cookies=ck,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
cookies["ltoken"] = data["data"]["ltoken"]
cookies["ltuid"] = uid
except Exception:
pass
if cookies.get("stoken") and cookies.get("account_id"):
return True, cookies
return False, None
FILE:src/api.py
"""
米游社 API 封装
参考: https://github.com/Ljzd-PRO/nonebot-plugin-mystool
"""
import asyncio
import hashlib
import json
import random
import string
import time
from typing import Any, Dict, List, Optional, Tuple
import httpx
# ── 常量 ──────────────────────────────────────────────────────────────────────
APP_VERSION = "2.99.1"
CLIENT_TYPE = "5"
# Salt 值(来自 MihoyoBBSTools)
SALT_IOS = "IZPgfb0dRPtBeLuFkdDznSmmkB5W5EXc"
SALT_PROD = "6s25p5ox5y14umn1p61aqyyvbvvl3lrt"
SALT_WEB = "DlOUwIupfU6YespEUWDJmXtutuXV6owG" # 网页端
SALT_X6 = "t0qEgfub6cvueAPgR5m9aQWWVciEer7v" # 点赞/分享专用
SALT_APP = "b0EofkfMKq2saWV9fwux18J5vzcFTlex" # App 端
# 米游社 verify_key
BBS_VERIFY_KEY = "bll8iq97cem8"
UA_MOBILE = (
"Mozilla/5.0 (iPhone; CPU iPhone OS 16_0 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.44.1"
)
# 游戏签到 API(仅国服)
SIGN_APIS = {
"genshin": {
"name": "原神",
"game_id": "hk4e_cn",
"act_id": "e202311201442471",
"cn": {
"act_id": "e202311201442471",
"url": "https://api-takumi.mihoyo.com/event/luna/sign",
"info_url": "https://api-takumi.mihoyo.com/event/luna/info",
"referer": "https://webstatic.mihoyo.com/",
},
"signgame": "hk4e",
},
"starrail": {
"name": "崩坏:星穹铁道",
"game_id": "hkrpg_cn",
"act_id": "e202304121516551",
"cn": {
"act_id": "e202304121516551",
"url": "https://api-takumi.mihoyo.com/event/luna/hkrpg/sign",
"info_url": "https://api-takumi.mihoyo.com/event/luna/hkrpg/info",
"referer": "https://webstatic.mihoyo.com/",
},
"signgame": "hkrpg",
},
"honkai3": {
"name": "崩坏3",
"game_id": "bh3_cn",
"act_id": "e202306201626331",
"cn": {
"act_id": "e202306201626331",
"url": "https://api-takumi.mihoyo.com/event/luna/bh3/sign",
"info_url": "https://api-takumi.mihoyo.com/event/luna/bh3/info",
"referer": "https://webstatic.mihoyo.com/",
},
"signgame": "bh3",
},
"honkai2": {
"name": "崩坏学园2",
"game_id": "bh2_cn",
"act_id": "e202203291431091",
"cn": {
"act_id": "e202203291431091",
"url": "https://api-takumi.mihoyo.com/event/luna/bh2/sign",
"info_url": "https://api-takumi.mihoyo.com/event/luna/bh2/info",
"referer": "https://webstatic.mihoyo.com/",
},
"signgame": "bh2",
},
"tearsofthemis": {
"name": "未定事件簿",
"game_id": "nxx_cn",
"act_id": "e202202251749321",
"cn": {
"act_id": "e202202251749321",
"url": "https://api-takumi.mihoyo.com/event/luna/nxx/sign",
"info_url": "https://api-takumi.mihoyo.com/event/luna/nxx/info",
"referer": "https://webstatic.mihoyo.com/",
},
"signgame": "nxx",
},
"zzz": {
"name": "绝区零",
"game_id": "nap_cn",
"act_id": "e202406242138391",
"cn": {
"act_id": "e202406242138391",
"url": "https://act-nap-api.mihoyo.com/event/luna/zzz/sign",
"info_url": "https://act-nap-api.mihoyo.com/event/luna/zzz/info",
"referer": "https://webstatic.mihoyo.com/",
},
"signgame": "zzz",
},
}
# 米游币任务 API
URL_BBS_TASKS = "https://bbs-api.miyoushe.com/apihub/wapi/getUserMissionsState"
URL_BBS_SIGN = "https://bbs-api.miyoushe.com/apihub/app/api/signIn"
URL_BBS_LIST = "https://bbs-api.miyoushe.com/post/api/getForumPostList?forum_id={forum_id}&is_good=false&is_hot=false&page_size=20&sort_type=1"
URL_BBS_VIEW = "https://bbs-api.miyoushe.com/post/api/getPostFull?post_id={post_id}"
URL_BBS_LIKE = "https://bbs-api.miyoushe.com/apihub/sapi/upvotePost"
URL_BBS_SHARE = "https://bbs-api.miyoushe.com/apihub/api/getShareConf?entity_id={post_id}&entity_type=1"
# 便笺 API
URL_GENSHIN_NOTE = "https://api-takumi-record.mihoyo.com/game_record/app/genshin/api/dailyNote"
URL_STARRAIL_NOTE = "https://api-takumi-record.mihoyo.com/game_record/app/hkrpg/api/note"
# 游戏账号 API
URL_GAME_RECORD = "https://api-takumi-record.mihoyo.com/game_record/card/wapi/getGameRecordCard?uid={uid}"
URL_ACCOUNT_BY_GAME = "https://api-takumi.mihoyo.com/binding/api/getUserGameRolesByCookie?game_biz={game_biz}"
# 商品 API
URL_GOOD_LIST = "https://api-takumi.mihoyo.com/mall/v1/web/goods/list?app_id=1&point_sn=myb&page_size=20&page={page}&game={game}"
URL_EXCHANGE = "https://api-takumi.miyoushe.com/mall/v1/web/goods/exchange"
# 米游币余额
URL_MYB = "https://api-takumi.mihoyo.com/common/homutreasure/v1/web/user/point?app_id=1&point_sn=myb"
# 地址列表
URL_ADDRESS_LIST = "https://api-takumi.mihoyo.com/account/address/list"
# 论坛 ID(用于米游币任务)
FORUM_IDS = {
"genshin": 26,
"starrail": 52,
"honkai3": 1,
"zzz": 57,
}
# ── DS 签名生成 ────────────────────────────────────────────────────────────────
def generate_ds(salt: str = SALT_IOS, body: str = "", query: str = "") -> str:
"""
生成米游社请求所需的 DS 签名
:param salt: 签名 salt
:param body: 请求体 JSON 字符串
:param query: URL 查询参数字符串
:return: DS 签名字符串
"""
t = str(int(time.time()))
# X6 salt 使用数字随机数,其他使用字母数字
if salt == SALT_X6:
r = str(random.randint(100001, 200000))
h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b={body}&q={query}".encode()).hexdigest()
elif salt == SALT_APP:
# APP salt 使用简单格式(无 body/query)
r = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
h = hashlib.md5(f"salt={salt}&t={t}&r={r}".encode()).hexdigest()
else:
r = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
h = hashlib.md5(f"salt={salt}&t={t}&r={r}&b={body}&q={query}".encode()).hexdigest()
return f"{t},{r},{h}"
def generate_device_id() -> str:
"""生成随机设备 ID"""
import uuid
return str(uuid.uuid4()).upper()
# ── 通用请求头 ─────────────────────────────────────────────────────────────────
def get_base_headers(cookies: Dict[str, str]) -> Dict[str, str]:
return {
"User-Agent": UA_MOBILE,
"Referer": "https://webstatic.mihoyo.com/",
"Origin": "https://webstatic.mihoyo.com",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
"x-rpc-app_version": APP_VERSION,
"x-rpc-client_type": CLIENT_TYPE,
"x-rpc-device_id": generate_device_id(),
}
# ── 账号信息 ───────────────────────────────────────────────────────────────────
async def get_game_records(cookies: Dict[str, str], uid: str) -> Tuple[bool, Optional[List[Dict]]]:
"""获取用户绑定的游戏账号列表"""
headers = get_base_headers(cookies)
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_GAME_RECORD.format(uid=uid),
headers=headers,
cookies=cookies,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
return True, data["data"]["list"]
return False, None
except Exception:
return False, None
async def get_account_list_by_game(cookies: Dict[str, str], game_biz: str) -> List[Dict[str, str]]:
"""
按游戏获取账号列表(参考 MihoyoBBSTools/account.py)
:param cookies: 米游社 Cookie
:param game_biz: 游戏业务标识 (hk4e_cn/hkrpg_cn/bh3_cn/bh2_cn/nxx_cn/nap_cn)
:return: 账号列表 [{"nickname": str, "game_uid": str, "region": str}, ...]
"""
headers = get_base_headers(cookies)
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_ACCOUNT_BY_GAME.format(game_biz=game_biz),
headers=headers,
cookies=cookies,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
accounts = []
for item in data["data"].get("list", []):
accounts.append({
"nickname": item.get("nickname", ""),
"game_uid": item.get("game_uid", ""),
"region": item.get("region", ""),
})
return accounts
return []
except Exception:
return []
async def get_myb_balance(cookies: Dict[str, str]) -> Tuple[bool, Optional[int]]:
"""获取米游币余额"""
headers = get_base_headers(cookies)
try:
async with httpx.AsyncClient() as client:
res = await client.get(URL_MYB, headers=headers, cookies=cookies, timeout=15)
data = res.json()
if data.get("retcode") == 0:
return True, int(data["data"]["points"])
return False, None
except Exception:
return False, None
# ── 米游币每日任务(忠实移植 MihoyoBBSTools/mihoyobbs.py)────────────────────────
# 论坛列表(对应 MihoyoBBSTools setting.mihoyobbs_List)
BBS_LIST = {
"genshin": {"id": "2", "forumId": "26", "name": "原神"},
"starrail": {"id": "6", "forumId": "52", "name": "崩坏:星穹铁道"},
"honkai3": {"id": "1", "forumId": "1", "name": "崩坏3"},
"honkai2": {"id": "3", "forumId": "30", "name": "崩坏学园2"},
"tearsofthemis": {"id": "4", "forumId": "37", "name": "未定事件簿"},
"zzz": {"id": "8", "forumId": "57", "name": "绝区零"},
}
# 任务 ID 映射(对应 MihoyoBBSTools get_tasks_list)
TASK_IDS = {
58: {"attr": "sign", "done": "is_get_award"},
59: {"attr": "read", "done": "is_get_award", "num_attr": "read_num"},
60: {"attr": "like", "done": "is_get_award", "num_attr": "like_num"},
61: {"attr": "share", "done": "is_get_award"},
}
def _get_ds(salt: str = SALT_APP) -> str:
"""生成 DS 签名(对应 MihoyoBBSTools get_ds(web=False))"""
t = str(int(time.time()))
r = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
h = hashlib.md5(f"salt={salt}&t={t}&r={r}".encode()).hexdigest()
return f"{t},{r},{h}"
def _get_ds2(query: str = "", body: str = "") -> str:
"""生成 DS 签名(对应 MihoyoBBSTools get_ds2)"""
t = str(int(time.time()))
r = str(random.randint(100001, 200000))
h = hashlib.md5(f"salt={SALT_X6}&t={t}&r={r}&b={body}&q={query}".encode()).hexdigest()
return f"{t},{r},{h}"
def _get_stoken_cookie(cookies: Dict[str, str]) -> str:
"""构建 stoken cookie 字符串"""
stuid = cookies.get("stuid") or cookies.get("account_id", "")
stoken = cookies.get("stoken", "")
mid = cookies.get("mid", "")
ck = f"stuid={stuid};stoken={stoken}"
if mid:
ck += f";mid={mid}"
return ck
def _get_device_id(cookie_str: str) -> str:
"""用 cookie 通过 uuid v3 生成设备 ID(对应 MihoyoBBSTools get_device_id)"""
import uuid
return str(uuid.uuid3(uuid.NAMESPACE_URL, cookie_str))
class _Mihoyobbs:
"""忠实移植 MihoyoBBSTools/Mihoyobbs 类为异步版本"""
def __init__(self, cookies: Dict[str, str], game: str = "genshin"):
self.cookies = cookies
self.bbs_config = {
"checkin": True, "read": True, "like": True, "share": True, "cancel_like": False,
}
self.bbs_list = [BBS_LIST.get(game, BBS_LIST["genshin"])]
stoken_cookie = _get_stoken_cookie(cookies)
device_id = _get_device_id(stoken_cookie)
self.headers = {
"DS": _get_ds(SALT_APP),
"cookie": stoken_cookie,
"x-rpc-client_type": "2",
"x-rpc-app_version": APP_VERSION,
"x-rpc-sys_version": "12",
"x-rpc-channel": "miyousheluodi",
"x-rpc-device_id": device_id,
"x-rpc-device_name": "Unknown",
"x-rpc-device_model": "Unknown",
"x-rpc-h265_supported": "1",
"Referer": "https://app.mihoyo.com",
"x-rpc-verify_key": BBS_VERIFY_KEY,
"x-rpc-csm_source": "discussion",
"Content-Type": "application/json; charset=UTF-8",
"Host": "bbs-api.miyoushe.com",
"Connection": "Keep-Alive",
"Accept-Encoding": "gzip",
"User-Agent": "okhttp/4.9.3",
}
# 任务列表查询 headers(web 端)
account_cookie = ""
for k in ["account_id", "cookie_token", "stuid", "stoken"]:
if cookies.get(k):
account_cookie += f"{k}={cookies[k]};"
self.task_header = {
"Accept": "application/json, text/plain, */*",
"Origin": "https://webstatic.mihoyo.com",
"User-Agent": (
f"Mozilla/5.0 (Linux; Android 12; Unspecified Device) AppleWebKit/537.36 "
f"(KHTML, like Gecko) Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 "
f"miHoYoBBS/{APP_VERSION}"
),
"Referer": "https://webstatic.mihoyo.com",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,en-US;q=0.8",
"X-Requested-With": "com.mihoyo.hyperion",
"Cookie": account_cookie,
}
self.today_get_coins = 0
self.today_have_get_coins = 0
self.have_coins = 0
self.task_do = {
"sign": False, "read": False, "read_num": 3,
"like": False, "like_num": 5, "share": False,
}
self.posts_list: List[list] = []
async def _get_tasks_list(self):
"""获取任务列表"""
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_BBS_TASKS,
params={"point_sn": "myb"},
headers=self.task_header,
timeout=15,
)
data = res.json()
if "err" in data.get("message", "") or data.get("retcode") == -100:
return
self.today_get_coins = data["data"]["can_get_points"]
self.today_have_get_coins = data["data"]["already_received_points"]
self.have_coins = data["data"]["total_points"]
if self.today_get_coins == 0:
self.task_do["sign"] = True
self.task_do["read"] = True
self.task_do["like"] = True
self.task_do["share"] = True
return
missions = data["data"].get("states", [])
for task_id, do_info in TASK_IDS.items():
mission = next((x for x in missions if x["mission_id"] == task_id), None)
if mission is None:
continue
if mission[do_info["done"]]:
self.task_do[do_info["attr"]] = True
elif do_info.get("num_attr"):
self.task_do[do_info["num_attr"]] -= mission["happened_times"]
except Exception:
pass
async def _get_posts_list(self):
"""获取帖子列表"""
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_BBS_LIST.format(forum_id=self.bbs_list[0]["forumId"]),
headers=self.headers,
timeout=15,
)
data = res.json()["data"]["list"]
max_num = max(self.task_do["read_num"], self.task_do["like_num"])
self.posts_list = []
while len(self.posts_list) < max_num:
post = random.choice(data)
if post["post"]["post_id"] not in [x[0] for x in self.posts_list]:
self.posts_list.append([post["post"]["post_id"], post["post"]["subject"]])
except Exception:
pass
async def _signing(self):
"""论坛打卡"""
if self.task_do["sign"]:
return
for forum in self.bbs_list:
try:
post_data = json.dumps({"gids": forum["id"]})
h = self.headers.copy()
h["DS"] = _get_ds2("", post_data)
async with httpx.AsyncClient() as client:
res = await client.post(
URL_BBS_SIGN, data=post_data, headers=h, timeout=15,
)
data = res.json()
if data.get("retcode") in (0, -5003):
self.task_do["sign"] = True
except Exception:
pass
await asyncio.sleep(random.randint(2, 4))
async def _read_post(self, post_info: list):
"""浏览帖子"""
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_BBS_VIEW.format(post_id=post_info[0]),
headers=self.headers, timeout=15,
)
return res.json().get("message") == "OK"
except Exception:
return False
async def _like_post(self, post_info: list) -> bool:
"""点赞帖子"""
try:
h = self.headers.copy()
h["DS"] = _get_ds(SALT_APP)
async with httpx.AsyncClient() as client:
res = await client.post(
URL_BBS_LIKE, headers=h,
json={"post_id": post_info[0], "is_cancel": False},
timeout=15,
)
return res.json().get("message") == "OK"
except Exception:
return False
async def _share_post(self, post_info: list) -> bool:
"""分享帖子(重试 3 次)"""
for _ in range(3):
try:
h = self.headers.copy()
h["DS"] = _get_ds(SALT_APP)
async with httpx.AsyncClient() as client:
res = await client.get(
URL_BBS_SHARE.format(post_id=post_info[0]),
headers=h, timeout=15,
)
if res.json().get("message") == "OK":
return True
except Exception:
pass
await asyncio.sleep(random.randint(2, 4))
return False
async def _post_task(self):
"""执行帖子相关任务"""
if self.task_do["read"] and self.task_do["like"] and self.task_do["share"]:
return
for post in self.posts_list:
if self.bbs_config["read"] and not self.task_do["read"] and self.task_do["read_num"] > 0:
await self._read_post(post)
self.task_do["read_num"] -= 1
await asyncio.sleep(random.randint(3, 8))
if self.bbs_config["like"] and not self.task_do["like"] and self.task_do["like_num"] > 0:
await self._like_post(post)
self.task_do["like_num"] -= 1
await asyncio.sleep(random.randint(3, 8))
if self.bbs_config["share"] and not self.task_do["share"]:
if await self._share_post(post):
self.task_do["share"] = True
await asyncio.sleep(random.randint(3, 8))
async def run(self) -> Dict[str, Any]:
"""执行全部米游币任务"""
await self._get_tasks_list()
if self.task_do["sign"] and self.task_do["read"] and self.task_do["like"] and self.task_do["share"]:
return {
"sign": {"done": 1, "total": 1, "success": True},
"view": {"done": 3, "total": 3, "success": True},
"like": {"done": 5, "total": 5, "success": True},
"share": {"done": 1, "total": 1, "success": True},
"coins": {"got": self.today_have_get_coins, "remaining": 0, "total": self.have_coins},
}
await self._get_posts_list()
if not self.posts_list:
return {
"sign": {"done": int(self.task_do["sign"]), "total": 1, "success": self.task_do["sign"]},
"view": {"done": 3 - self.task_do["read_num"], "total": 3, "success": self.task_do["read"]},
"like": {"done": 5 - self.task_do["like_num"], "total": 5, "success": self.task_do["like"]},
"share": {"done": int(self.task_do["share"]), "total": 1, "success": self.task_do["share"]},
"error": "获取帖子列表失败",
}
for i in range(2):
if i > 0:
await asyncio.sleep(random.randint(3, 8))
await self._get_posts_list()
if self.bbs_config["checkin"]:
await self._signing()
await self._post_task()
await self._get_tasks_list()
if self.task_do["sign"] and self.task_do["read"] and self.task_do["like"] and self.task_do["share"]:
break
return {
"sign": {"done": int(self.task_do["sign"]), "total": 1, "success": self.task_do["sign"]},
"view": {"done": 3 - self.task_do["read_num"], "total": 3, "success": self.task_do["read"]},
"like": {"done": 5 - self.task_do["like_num"], "total": 5, "success": self.task_do["like"]},
"share": {"done": int(self.task_do["share"]), "total": 1, "success": self.task_do["share"]},
"coins": {"got": self.today_have_get_coins, "remaining": self.today_get_coins, "total": self.have_coins},
}
async def do_bbs_tasks(cookies: Dict[str, str], game: str = "genshin") -> Dict[str, Any]:
"""执行米游币每日任务(忠实移植 MihoyoBBSTools/mihoyobbs.py)"""
bbs = _Mihoyobbs(cookies, game)
return await bbs.run()
# ── 游戏签到(基于 MihoyoBBSTools/gamecheckin.py 改写)───────────────────────────
# 游戏签到配置(对应 MihoyoBBSTools 的 game_id, act_id 等)
GAME_CHECKIN_CONFIG = {
"genshin": {
"game_id": "hk4e_cn",
"name": "原神",
"act_id": "e202311201442471",
"signgame": "hk4e",
"player_name": "旅行者",
# 使用默认 API
},
"starrail": {
"game_id": "hkrpg_cn",
"name": "崩坏:星穹铁道",
"act_id": "e202304121516551",
"signgame": "hkrpg",
"player_name": "开拓者",
},
"honkai3": {
"game_id": "bh3_cn",
"name": "崩坏3",
"act_id": "e202306201626331",
"player_name": "舰长",
},
"honkai2": {
"game_id": "bh2_cn",
"name": "崩坏学园2",
"act_id": "e202203291431091",
"player_name": "玩家",
},
"tearsofthemis": {
"game_id": "nxx_cn",
"name": "未定事件簿",
"act_id": "e202202251749321",
"player_name": "律师",
},
"zzz": {
"game_id": "nap_cn",
"name": "绝区零",
"act_id": "e202406242138391",
"signgame": "zzz",
"player_name": "绳匠",
# 绝区零使用独立 API
"use_zzz_api": True,
},
}
def _get_game_checkin_headers(cookies: Dict[str, str], game: str) -> Dict[str, str]:
"""生成游戏签到 headers(对应 MihoyoBBSTools 的 set_headers)"""
device_id = generate_device_id()
# 构建 cookie 字符串
cookie_parts = []
for key in ["account_id", "cookie_token", "ltoken", "ltuid", "mid", "stoken", "stuid"]:
if cookies.get(key):
cookie_parts.append(f"{key}={cookies[key]}")
cookie_str = ";".join(cookie_parts)
headers = {
"DS": _get_ds(salt=SALT_WEB),
"Referer": "https://act.mihoyo.com/",
"Cookie": cookie_str,
"x-rpc-device_id": device_id,
"User-Agent": f"Mozilla/5.0 (Linux; Android 12; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 miHoYoBBS/{APP_VERSION}",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,en-US;q=0.8",
"x-rpc-app_version": APP_VERSION,
"x-rpc-client_type": "5",
"Content-Type": "application/json;charset=utf-8",
}
# 特殊处理
cfg = GAME_CHECKIN_CONFIG.get(game, {})
if game == "genshin":
headers["Origin"] = "https://act.mihoyo.com"
headers["x-rpc-signgame"] = "hk4e"
elif game == "zzz":
headers["Origin"] = "https://act.mihoyo.com"
headers["X-Rpc-Signgame"] = "zzz"
return headers
async def game_sign(cookies: Dict[str, str], game: str, region: str = "cn_gf01", uid: str = "") -> Tuple[bool, str]:
"""
执行游戏签到(基于 MihoyoBBSTools 实现)
:param cookies: 米游社 Cookie
:param game: 游戏标识
:param region: 服务器区域
:param uid: 游戏 UID
:return: (是否成功, 消息)
"""
cfg = GAME_CHECKIN_CONFIG.get(game)
if not cfg:
return False, f"不支持的游戏: {game}"
game_name = cfg["name"]
act_id = cfg["act_id"]
# 判断 API 端点
if cfg.get("use_zzz_api"):
# 绝区零使用独立 API
sign_url = "https://act-nap-api.mihoyo.com/event/luna/zzz/sign"
info_url = "https://act-nap-api.mihoyo.com/event/luna/zzz/info"
elif game == "starrail":
sign_url = "https://api-takumi.mihoyo.com/event/luna/hkrpg/sign"
info_url = "https://api-takumi.mihoyo.com/event/luna/hkrpg/info"
elif game == "honkai3":
sign_url = "https://api-takumi.mihoyo.com/event/luna/bh3/sign"
info_url = "https://api-takumi.mihoyo.com/event/luna/bh3/info"
elif game == "honkai2":
sign_url = "https://api-takumi.mihoyo.com/event/luna/bh2/sign"
info_url = "https://api-takumi.mihoyo.com/event/luna/bh2/info"
elif game == "tearsofthemis":
sign_url = "https://api-takumi.mihoyo.com/event/luna/nxx/sign"
info_url = "https://api-takumi.mihoyo.com/event/luna/nxx/info"
else:
# 原神
sign_url = "https://api-takumi.mihoyo.com/event/luna/sign"
info_url = "https://api-takumi.mihoyo.com/event/luna/info"
headers = _get_game_checkin_headers(cookies, game)
# 1. 查询签到状态
try:
async with httpx.AsyncClient() as client:
res = await client.get(
info_url,
params={"act_id": act_id, "region": region, "uid": uid},
headers=headers,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
if data["data"].get("is_sign"):
return True, f"{game_name} 今日已签到"
# 首次绑定检查
if data["data"].get("first_bind"):
return False, f"{game_name} 首次绑定,请先手动签到一次"
except Exception:
pass
await asyncio.sleep(random.randint(2, 8))
# 2. 执行签到
try:
# 重新生成 DS
headers["DS"] = _get_ds(salt=SALT_WEB)
async with httpx.AsyncClient() as client:
res = await client.post(
sign_url,
headers=headers,
json={"act_id": act_id, "region": region, "uid": uid},
timeout=15,
)
data = res.json()
retcode = data.get("retcode", -1)
if retcode == 0 and data.get("data", {}).get("success") == 0:
return True, f"{game_name} 签到成功 🎉"
elif retcode == -5003:
return True, f"{game_name} 今日已签到"
elif retcode == 1034:
return False, f"{game_name} 触发验证码,请手动签到"
else:
return False, f"{game_name} 签到失败: {data.get('message', '未知错误')}"
except Exception as e:
return False, f"{game_name} 签到请求失败: {e}"
# ── 实时便笺 ───────────────────────────────────────────────────────────────────
async def get_genshin_note(cookies: Dict[str, str], uid: str, region: str = "cn_gf01") -> Tuple[bool, Optional[Dict]]:
"""获取原神实时便笺(树脂、洞天、探索派遣等)"""
headers = get_base_headers(cookies)
headers["DS"] = generate_ds(
salt=SALT_PROD,
query=f"role_id={uid}&server={region}",
)
headers["x-rpc-tool_version"] = "v4.2.2-ys"
headers["x-rpc-page"] = "v4.2.2-ys_#/ys/daily"
headers["X-Requested-With"] = "com.mihoyo.hyperion"
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_GENSHIN_NOTE,
params={"role_id": uid, "server": region},
headers=headers,
cookies=cookies,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
return True, data["data"]
return False, None
except Exception:
return False, None
async def get_starrail_note(cookies: Dict[str, str], uid: str, region: str = "prod_gf_cn") -> Tuple[bool, Optional[Dict]]:
"""获取星穹铁道实时便笺(开拓力、每日实训等)"""
headers = get_base_headers(cookies)
headers["DS"] = generate_ds(
salt=SALT_PROD,
query=f"role_id={uid}&server={region}",
)
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_STARRAIL_NOTE,
params={"role_id": uid, "server": region},
headers=headers,
cookies=cookies,
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
return True, data["data"]
return False, None
except Exception:
return False, None
# ── 商品兑换 ───────────────────────────────────────────────────────────────────
async def get_good_list(game: str = "", page: int = 1) -> Tuple[bool, Optional[List[Dict]]]:
"""获取米游币商品列表"""
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_GOOD_LIST.format(page=page, game=game),
headers={"User-Agent": UA_MOBILE},
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
return True, data["data"]["list"]
return False, None
except Exception:
return False, None
async def get_address_list(cookies: Dict[str, str]) -> Tuple[bool, Optional[List[Dict]]]:
"""获取用户收货地址列表"""
headers = get_base_headers(cookies)
try:
async with httpx.AsyncClient() as client:
res = await client.get(URL_ADDRESS_LIST, headers=headers, cookies=cookies, timeout=15)
data = res.json()
if data.get("retcode") == 0:
return True, data["data"]["list"]
return False, None
except Exception:
return False, None
# 商品详情 API
URL_GOOD_DETAIL = "https://api-takumi.mihoyo.com/mall/v1/web/goods/detail?app_id=1&point_sn=myb&goods_id={goods_id}"
async def get_good_detail(goods_id: str) -> Tuple[bool, Optional[Dict]]:
"""获取单个商品详情"""
try:
async with httpx.AsyncClient() as client:
res = await client.get(
URL_GOOD_DETAIL.format(goods_id=goods_id),
headers={"User-Agent": UA_MOBILE},
timeout=15,
)
data = res.json()
if data.get("retcode") == 0:
return True, data["data"]
return False, None
except Exception:
return False, None
async def do_exchange(
cookies: Dict[str, str],
goods_id: str,
uid: str,
region: str,
game_biz: str,
address_id: Optional[str] = None,
num: int = 1,
) -> Tuple[bool, str, int]:
"""
执行米游币商品兑换
返回: (是否成功, 消息, 下次补货时间戳)
"""
headers = get_base_headers(cookies)
headers["x-rpc-verify_key"] = "bll8iq97cem8"
headers["x-rpc-channel"] = "appstore"
payload: Dict[str, Any] = {
"app_id": 1,
"point_sn": "myb",
"goods_id": goods_id,
"exchange_num": num,
"uid": uid,
"region": region,
"game_biz": game_biz,
}
if address_id:
payload["address_id"] = address_id
try:
async with httpx.AsyncClient() as client:
res = await client.post(
URL_EXCHANGE,
headers=headers,
cookies=cookies,
json=payload,
timeout=15,
)
data = res.json()
retcode = data.get("retcode", -1)
message = data.get("message", "未知错误")
if retcode == 0:
return True, "兑换成功 🎉", 0
# 库存不足时获取下次补货时间
if retcode == -2102: # 库存不足
ok, detail = await get_good_detail(goods_id)
if ok and detail:
next_time = detail.get("next_time", 0)
next_num = detail.get("next_num", 0)
return False, f"库存不足,下次补货: {next_num} 件", next_time
return False, "库存不足", 0
return False, f"兑换失败: {message} (code={retcode})", 0
except Exception as e:
return False, f"兑换请求失败: {e}", 0