@clawhub-tinadu-ai-406da9d080
---
name: deckglobalizer
description: >
  Use this skill when the user wants to translate a PowerPoint (.pptx) file into
  another language while preserving the original layout, fonts, and visual design.
  Trigger phrases: "translate my deck", "localize my PPT", "translate slides and keep
  formatting", "high-fidelity PPT translation", "translate pitch deck", "translate
  presentation", "DeckGlobalizer". Also trigger when the user mentions translating
  a presentation for investors, clients, or partners across languages (e.g. Chinese
  to English, English to Chinese, etc.) and wants the layout untouched.
version: 2.1.1
license: MIT
requires:
  packages:
    - python-pptx>=0.6.21
    - lxml>=4.9
---
# DeckGlobalizer — High-Fidelity Cross-Language PPT Reconstruction
You are operating as **DeckGlobalizer**, a precision tool for translating PowerPoint
presentations while preserving every aspect of the original visual design. Your
output must be indistinguishable from a deck built natively in the target language.
---
## Phase 1 — Visual Audit
Use `python-pptx` to scan the uploaded `.pptx` file.
1. Walk the full slide tree and extract all text frames, shapes, and style properties.
When traversing GroupShapes (`shape_type == 6`), recurse into children — but mark
them so they are never mistaken for top-level title shapes (pass a `top=False` flag).
2. Identify **Style Clusters** — groups of text elements sharing the same font family,
size, weight, color, and layout role (e.g. slide title, body bullet, caption, label).
3. Detect any existing target-language text already present (e.g. English captions on
a Chinese deck) — these act as **alignment anchors** for Tone of Voice calibration.
4. Run an initial **font audit**: list every typeface name found across all slides,
including inside groups and tables. Flag anything unexpected.
5. Output a `Style_Manifest.md` with the following table per cluster:
| Cluster | Role | Font | Size | Bold | Color | Count |
|---------|------|------|------|------|-------|-------|
**Stop here.** Present the Style Manifest to the user and wait for confirmation
before proceeding to Phase 2.
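The recursive walk in step 1 can be sketched as follows. This is a minimal illustration, not the skill's actual implementation: `walk_shapes` is a hypothetical helper name, and it is duck-typed so that it should accept `slide.shapes` from `python-pptx` as well as the stand-in objects used here to keep the example runnable without a `.pptx` file.

```python
from types import SimpleNamespace

GROUP = 6  # numeric value of MSO_SHAPE_TYPE.GROUP in python-pptx


def walk_shapes(shapes, top=True):
    """Yield (shape, is_top_level) for every shape, recursing into groups."""
    for shape in shapes:
        yield shape, top
        if getattr(shape, "shape_type", None) == GROUP:
            # Children of a group are never top-level, so pass top=False down.
            yield from walk_shapes(shape.shapes, top=False)


# Stand-in objects (assumed shapes, for illustration only).
child = SimpleNamespace(name="label", shape_type=17)
group = SimpleNamespace(name="group", shape_type=GROUP, shapes=[child])
title = SimpleNamespace(name="title", shape_type=14)

for shape, is_top in walk_shapes([title, group]):
    print(shape.name, is_top)
```

Carrying the `top` flag alongside each shape means later phases never need to re-derive whether a shape lives inside a group.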
---
## Phase 2 — Semantic Alignment (Tiered Glossary)
Produce a `Tiered_Glossary.md` with three tiers:
### Tier 1 — Industry Standard Terms
Auto-detect the document domain (Finance, Tech, Medical, Legal, etc.) from slide
content. Apply the standard professional vocabulary for that domain in the target
language. Do not improvise these terms — use established equivalents.
### Tier 2 — Proprietary / Invented Concepts
Identify terms that are:
- High-frequency across slides, OR
- Positioned at structurally central locations (slide titles, section headers, diagram
node labels), OR
- Appear to be invented or branded (e.g. fund names, product names, framework names)
For each Tier 2 term: **do not translate directly**. Infer meaning from surrounding
context, then offer 2–3 target-language candidates with a brief rationale. Wait for
the user to select one before proceeding.
### Tier 3 — Scenario Tone of Voice
Detect the document type:
- **Fundraising / Pitch Deck** → confident, forward-looking, investor-grade language
- **Product Introduction** → clear, benefit-driven, accessible
- **Annual Review / Report** → formal, data-forward, conservative
- **Technical Document** → precise, jargon-accurate, passive voice acceptable
Apply the corresponding tone consistently throughout all translations.
**Stop here.** Present the full Tiered Glossary and wait for user sign-off before
executing any slide translations.
---
## Phase 3 — Page-by-Page Execution
Process slides **one at a time**. For each slide, follow this checklist in order:
1. **Merge multi-run paragraphs before translating.** Chinese PPTX files often split
one sentence across 10–20 runs due to inline formatting. If you translate only the
first run, the rest remain in the source language. Before writing any translation,
consolidate all runs in a paragraph into the first run (preserving the first run's
`rPr`), then write the full translated string.
2. **Translate** all text using the confirmed glossary and tone.
3. **Apply font changes** per the target font spec (defined by the user or an active
profile). See Font Operation Rules below.
4. **Apply the Layout Compensator** rules below.
5. **Verify**: confirm no source-language text remains. Show the user a before/after
summary and wait for approval before moving to the next slide.
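The run merge in step 1 can be sketched on the raw paragraph XML. This is a rough illustration under stated assumptions: `merge_runs` is a hypothetical helper, and the standard-library `ElementTree` is used so the example runs on its own; with `python-pptx` the same element names would be reached through `lxml` on the paragraph's underlying `<a:p>` element. Line breaks (`<a:br>`) and fields are left untouched.

```python
import xml.etree.ElementTree as ET

A = "http://schemas.openxmlformats.org/drawingml/2006/main"
NS = {"a": A}


def merge_runs(p, translated):
    """Collapse all <a:r> runs of paragraph `p` into the first run."""
    runs = p.findall("a:r", NS)
    if not runs:
        return
    first, rest = runs[0], runs[1:]
    t = first.find("a:t", NS)
    if t is None:
        t = ET.SubElement(first, f"{{{A}}}t")
    t.text = translated  # the first run keeps its own <a:rPr>
    for r in rest:
        p.remove(r)


xml = (
    f'<a:p xmlns:a="{A}">'
    '<a:r><a:rPr b="1"/><a:t>募资</a:t></a:r>'
    '<a:r><a:rPr/><a:t>规模</a:t></a:r>'
    "</a:p>"
)
p = ET.fromstring(xml)
merge_runs(p, "Fund size")
print(ET.tostring(p, encoding="unicode"))
```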
---
## Font Operation Rules
These rules govern how fonts are written into the PPTX XML. Follow them precisely —
mistakes here produce invisible rendering errors that are hard to debug.
### Classification
- Classify font choice at the **paragraph level**, not the run level. All runs within
one paragraph must receive the same font. Do not let different runs in one paragraph
end up with different fonts.
- Within a page, elements of the same type (same visual role, same size range) must
use the same font. Do not alternate fonts across visually equivalent elements.
### Title Detection
A shape qualifies as a "title" only when **all** of the following are true:
- It is a top-level shape (not a child inside a GroupShape)
- Its `top` coordinate is between 0 and ~200,000 EMU (the very top strip of the slide)
- It has a text frame
GroupShape children are positioned in the group's own child coordinate space, so their
raw `top` values are relative to the group and must not be used for title detection.
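The three conditions can be expressed as a small predicate. This is a sketch only: `is_title` and `TITLE_STRIP_EMU` are hypothetical names, the 200,000 EMU bound is the approximate figure from the list above, and the `is_top_level` flag is assumed to come from the shape walk in Phase 1.

```python
from types import SimpleNamespace

TITLE_STRIP_EMU = 200_000  # approximate upper bound taken from the rule above


def is_title(shape, is_top_level):
    """Return True only when all three title conditions hold."""
    if not is_top_level:
        return False
    if not getattr(shape, "has_text_frame", False):
        return False
    top = getattr(shape, "top", None)  # may be None when position is inherited
    return top is not None and 0 <= top <= TITLE_STRIP_EMU


heading = SimpleNamespace(has_text_frame=True, top=120_000)
body = SimpleNamespace(has_text_frame=True, top=1_500_000)
print(is_title(heading, True), is_title(heading, False), is_title(body, True))
```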
### XML Surgery
When writing font information into a run's `rPr` element:
1. **Never create a new `rPr`** if one does not already exist. Creating a blank `rPr`
forces PowerPoint to fill in default EA fonts (often 华文中宋 or 等线), polluting
the entire slide. If `rPr is None`, skip that run entirely.
2. **Delete before writing.** Remove the `<a:sym>`, `<a:latin>`, `<a:ea>`, and
`<a:cs>` child elements first, then append fresh ones with the correct attributes.
Leaving `<a:sym>` causes theme-font fallback even when `latin` is set correctly.
3. **Write all three slots.** Set `latin`, `ea`, and `cs` explicitly. Leaving `ea`
unset lets the OS fill in a default CJK font.
4. **Include `panose`, `pitchFamily`, and `charset`** on each font element. These
ensure correct rendering across platforms.
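The four rules can be sketched together as one function. Treat this as an illustration under assumptions rather than the skill's own code: `set_run_fonts` is a hypothetical helper, the standard-library `ElementTree` stands in for `lxml`, and the attribute values shown are example values for Arial. One deliberate deviation is labeled in the code: instead of a plain append, the new elements are inserted before any hyperlink, `rtl`, or `extLst` child, on the assumption that the DrawingML schema expects the font elements ahead of those.

```python
import xml.etree.ElementTree as ET

A = "http://schemas.openxmlformats.org/drawingml/2006/main"
FONT_TAGS = ("latin", "ea", "cs", "sym")
# Assumption: these children must come after the font elements in <a:rPr>.
TRAILING = ("hlinkClick", "hlinkMouseOver", "rtl", "extLst")


def set_run_fonts(run, font_attrs):
    """Rewrite latin/ea/cs on an existing <a:rPr>; return False if there is none."""
    rpr = run.find(f"{{{A}}}rPr")
    if rpr is None:
        return False  # rule 1: never create a new rPr
    for tag in FONT_TAGS:  # rule 2: delete before writing
        for old in rpr.findall(f"{{{A}}}{tag}"):
            rpr.remove(old)
    index = len(rpr)
    for i, child in enumerate(rpr):
        if child.tag.split("}")[-1] in TRAILING:
            index = i
            break
    for offset, tag in enumerate(("latin", "ea", "cs")):  # rule 3: all three slots
        # rule 4: panose, pitchFamily, and charset travel with every element
        rpr.insert(index + offset, ET.Element(f"{{{A}}}{tag}", font_attrs))
    return True


ARIAL = {"typeface": "Arial", "panose": "020B0604020202020204",
         "pitchFamily": "34", "charset": "0"}  # example values only
run = ET.fromstring(
    f'<a:r xmlns:a="{A}"><a:rPr lang="en-US">'
    '<a:sym typeface="Wingdings"/><a:hlinkClick/></a:rPr><a:t>x</a:t></a:r>'
)
bare = ET.fromstring(f'<a:r xmlns:a="{A}"><a:t>x</a:t></a:r>')
print(set_run_fonts(run, ARIAL), set_run_fonts(bare, ARIAL))
```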
### Global Font Audit
After completing all slides, run a final sweep across the entire file:
- Extract every `typeface` value from every run on every slide (including inside
groups and table cells)
- List any font that does not match the target font spec
- Present to the user for confirmation or auto-fix
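A minimal version of this sweep, for one slide's XML, might look like the sketch below. `audit_typefaces` is a hypothetical helper and the sample XML is invented for the example. It collects every `typeface` attribute anywhere in the tree, which covers groups and table cells because the whole element tree is walked; theme references such as `+mn-lt` would also show up and need to be allowed or resolved by the caller.

```python
import xml.etree.ElementTree as ET
from collections import Counter

A = "http://schemas.openxmlformats.org/drawingml/2006/main"
P = "http://schemas.openxmlformats.org/presentationml/2006/main"


def audit_typefaces(slide_xml, allowed):
    """Return {typeface: count} for every typeface not in `allowed`."""
    root = ET.fromstring(slide_xml)
    found = Counter(el.get("typeface") for el in root.iter() if el.get("typeface"))
    return {name: n for name, n in found.items() if name not in allowed}


sample = (
    f'<p:sld xmlns:p="{P}" xmlns:a="{A}">'
    '<a:rPr><a:latin typeface="Arial"/><a:ea typeface="SimSun"/></a:rPr>'
    '<a:rPr><a:latin typeface="Arial"/></a:rPr>'
    "</p:sld>"
)
print(audit_typefaces(sample, {"Arial"}))
```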
---
## Number Verification Step
After completing translation, extract all text containing numbers from both the
source and target files and present a side-by-side table for the user to verify.
Pay particular attention to unit conversions. For Chinese source documents:
- `亿 = 100,000,000` (i.e., 1亿 = 100M; 10亿 = 1B; 130亿 = 13B)
- `万 = 10,000`
- Watch for mixed formats like `$130亿` (dollar sign + Chinese unit) — convert
the unit but keep the currency symbol
Do not rely on the user to catch conversion errors. Show the table and ask for
explicit confirmation before delivery.
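The unit arithmetic above can be checked mechanically before the table is shown. The sketch below is an illustration only: `to_number` and `abbreviate` are hypothetical helpers, they handle a single `<digits><unit>` token, and compound forms such as `1亿5000万` are out of scope.

```python
import re
from decimal import Decimal

UNITS = {"万": Decimal(10_000), "亿": Decimal(100_000_000)}
PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(万|亿)")


def to_number(text):
    """Return the value of the first '<digits><万|亿>' token, or None."""
    m = PATTERN.search(text)
    if not m:
        return None
    return Decimal(m.group(1)) * UNITS[m.group(2)]


def abbreviate(value):
    """Format a value with K/M/B suffixes, dropping trailing zeros."""
    for limit, suffix in ((Decimal(10**9), "B"), (Decimal(10**6), "M"), (Decimal(10**3), "K")):
        if value >= limit:
            return f"{(value / limit).normalize():f}{suffix}"
    return f"{value.normalize():f}"


for source in ("$130亿", "10亿", "1亿", "1.5万"):
    print(source, "->", abbreviate(to_number(source)))
```

The currency symbol is ignored by the pattern, so it can be carried over unchanged when the target string is assembled.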
---
## Layout Compensator Rules (Non-Negotiable)
These rules are enforced on every text element, in priority order:
1. **Never move a text box.** Coordinates (`left`, `top`, `width`, `height`) are frozen.
2. If translated text overflows its text frame, apply fixes in this order:
   - **Step 1 (Refine):** Shorten the translation without losing meaning. Do not drop any factual claim, number, or named concept; remove only redundant phrasing and decorative language.
   - **Step 2 (Spacing):** Reduce line and paragraph spacing (`line_spacing`, `space_before`,
     `space_after`) and character spacing incrementally, by no more than 15% of the original values.
   - **Step 3 (Scale):** Reduce font size in 0.5pt steps until the text fits.
3. **Sibling Consistency Enforcement:** If any element in a visual group (e.g. four
parallel feature boxes, a row of stat callouts) has its font size reduced, all
sibling elements at the same hierarchy level on that slide must be reduced to the
same size — even if they individually fit at the larger size.
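The scale step and the sibling rule can be sketched as two small functions. This is a hedged illustration: `shrink_to_fit` and `harmonize` are hypothetical names, the `fits` callback stands in for whatever overflow check is available, and the `min_pt` floor is an added assumption that the rules above do not specify.

```python
def shrink_to_fit(size_pt, fits, min_pt=6.0, step=0.5):
    """Reduce the size in `step` increments until fits(size) is true or min_pt is reached."""
    size = size_pt
    while size > min_pt and not fits(size):
        size = max(min_pt, size - step)
    return size


def harmonize(sizes):
    """Sibling consistency: every sibling takes the smallest fitted size."""
    smallest = min(sizes.values())
    return {name: smallest for name in sizes}


# Example: box "b" only fits at 16.5pt, so box "a" is reduced to match.
fitted = {
    "a": shrink_to_fit(18.0, lambda s: True),
    "b": shrink_to_fit(18.0, lambda s: s <= 16.5),
}
print(harmonize(fitted))
```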
---
## Implementation Notes
- Use `python-pptx` for all file operations. Do not use Office automation or COM.
- For cross-file slide operations, drop to `zipfile` + `lxml` directly.
- Preserve all non-text elements (images, shapes, icons, charts) exactly.
- Write output to `<original_filename>_<target_lang>.pptx` in the same directory.
- All intermediate files (`Style_Manifest.md`, `Tiered_Glossary.md`) are written to
the same directory as the source file.
- **All translation is performed by the Claude model itself.** This skill does not
call external translation APIs (Google Translate, DeepL, Azure Translator, etc.)
and does not send document content to any third-party service. Document content
never leaves the current session.
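The output naming rule can be written with `pathlib`. A small sketch, with `output_path` as a hypothetical helper name:

```python
from pathlib import Path


def output_path(source, target_lang):
    """Build <original_filename>_<target_lang>.pptx next to the source file."""
    src = Path(source)
    return src.with_name(f"{src.stem}_{target_lang}{src.suffix}")


print(output_path("decks/pitch.pptx", "en"))
```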
## Required Python Environment
```
python-pptx>=0.6.21
lxml>=4.9
```
Install: `pip install python-pptx lxml`
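A complete automation toolkit for Xiaohongshu built on the Chrome DevTools Protocol: authenticated login, content publishing, search and discovery, and social interaction.
# Xiaohongshu Automation Skill V2
A complete Xiaohongshu automation solution based on the Chrome DevTools Protocol (CDP).
## Features
- ✅ **Authentication** - QR code login, phone number + verification code login
- ✅ **Content publishing** - image posts, video posts, long-form articles
- ✅ **Search and discovery** - keyword search, home feed browsing, note details
- ✅ **Social interaction** - comment, reply, like, favorite
- ✅ **Combined operations** - competitor analysis, trend tracking
## Changelog (v2.0.0)
### 2026-03-07
- **Fixed Chrome startup** - added the `--no-sandbox` and `--disable-setuid-sandbox` flags so Chrome can run as the root user
- **Improved the login flow** - both QR code and phone number login are supported
- **Verified publishing** - a test post was published successfully
## Quick Start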
```bash
# 1. Check login status
python scripts/cli.py check-login
# 2. Log in (QR code)
python scripts/cli.py login
# 3. Publish an image post
python scripts/cli.py publish \
--title-file title.txt \
--content-file content.txt \
--images "/path/to/image.jpg"
```
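When these commands are driven from another program, the result can be read from the exit code and the JSON on stdout. The sketch below is an assumption-laden illustration: it relies on the exit codes documented in `scripts/cli.py` (0 = success, 1 = not logged in, 2 = error), and `interpret` and `run_cli` are hypothetical helper names, not part of the skill.

```python
import json
import subprocess
import sys

EXIT_MEANING = {0: "ok", 1: "not_logged_in", 2: "error"}


def interpret(returncode, stdout):
    """Map an exit code to a status and parse stdout as JSON when possible."""
    status = EXIT_MEANING.get(returncode, "unknown")
    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError:
        payload = None  # some commands may print more than one JSON document
    return status, payload


def run_cli(*args):
    """Run scripts/cli.py with the given arguments (path assumed relative to the skill root)."""
    proc = subprocess.run(
        [sys.executable, "scripts/cli.py", *args],
        capture_output=True,
        text=True,
    )
    return interpret(proc.returncode, proc.stdout)


print(interpret(1, '{"logged_in": false, "login_method": "qrcode"}'))
```

A caller would typically branch on the status first and only then look at the payload, for example `status, payload = run_cli("check-login")`.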
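## Command Reference
### Authentication
| Command | Description |
|------|------|
| `check-login` | Check login status |
| `login` | Log in with a QR code |
| `phone-login --phone <number> --code <code>` | Log in with a phone number |
| `delete-cookies` | Log out |
### Content Publishing
| Command | Description |
|------|------|
| `publish` | Publish an image post |
| `publish-video` | Publish a video |
| `long-article` | Publish a long-form article |
### Search and Discovery
| Command | Description |
|------|------|
| `list-feeds` | Get home feed recommendations |
| `search-feeds --keyword <keyword>` | Search notes |
| `get-feed-detail --feed-id <ID>` | View note details |
| `user-profile --user-id <ID>` | View a user's profile page |
### Social Interaction
| Command | Description |
|------|------|
| `post-comment --feed-id <ID> --content <text>` | Post a comment |
| `reply-comment --comment-id <ID> --content <text>` | Reply to a comment |
| `like-feed --feed-id <ID>` | Like / unlike |
| `favorite-feed --feed-id <ID>` | Favorite / unfavorite |
## Architecture
- **Chrome CDP** - controls the browser through the Chrome DevTools Protocol
- **Anti-detection** - simulates real user behavior to avoid platform detection
- **Cookie persistence** - login state is saved automatically, so the next run needs no login
## Directory Structure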
```
xiaohongshu-v2/
├── scripts/
│   ├── cli.py              # Main CLI entry point
│   ├── chrome_launcher.py  # Chrome launcher
│   └── xhs/                # Core modules
│       ├── cdp.py          # CDP wrapper
│       ├── login.py        # Login logic
│       ├── publish.py      # Publishing logic
│       ├── search.py       # Search logic
│       ├── comment.py      # Comment logic
│       ├── stealth.py      # Anti-detection arguments
│       └── selectors.py    # CSS selectors
├── skill.json
└── SKILL.md
```
## Dependencies
- Python 3.10+
- Google Chrome 120+
- See `requirements.txt`
## License
MIT
FILE:scripts/account_manager.py
"""Multi-account management, with a separate configuration per account."""

from __future__ import annotations

import json
import logging
import os
from pathlib import Path

logger = logging.getLogger(__name__)

# Path to the account configuration file
_CONFIG_DIR = Path.home() / ".xhs"
_ACCOUNTS_FILE = _CONFIG_DIR / "accounts.json"


def _load_config() -> dict:
    """Load the account configuration."""
    if not _ACCOUNTS_FILE.exists():
        return {"default": "", "accounts": {}}
    with open(_ACCOUNTS_FILE, encoding="utf-8") as f:
        return json.load(f)


def _save_config(config: dict) -> None:
    """Save the account configuration."""
    _CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
        json.dump(config, f, ensure_ascii=False, indent=2)


def list_accounts() -> list[dict]:
    """List all accounts."""
    config = _load_config()
    default = config.get("default", "")
    accounts = config.get("accounts", {})
    result = []
    for name, info in accounts.items():
        result.append(
            {
                "name": name,
                "description": info.get("description", ""),
                "is_default": name == default,
                "profile_dir": _get_profile_dir(name),
            }
        )
    return result


def add_account(name: str, description: str = "") -> None:
    """Add an account."""
    config = _load_config()
    accounts = config.setdefault("accounts", {})
    if name in accounts:
        raise ValueError(f"Account '{name}' already exists")
    accounts[name] = {"description": description}
    # The first account added becomes the default
    if not config.get("default"):
        config["default"] = name
    _save_config(config)
    # Create the profile directory
    profile_dir = _get_profile_dir(name)
    os.makedirs(profile_dir, exist_ok=True)
    logger.info("Added account: %s", name)


def remove_account(name: str) -> None:
    """Remove an account."""
    config = _load_config()
    accounts = config.get("accounts", {})
    if name not in accounts:
        raise ValueError(f"Account '{name}' does not exist")
    del accounts[name]
    # If the default account was removed, fall back to the first remaining
    # account, or clear the default when none are left
    if config.get("default") == name:
        config["default"] = next(iter(accounts), "")
    _save_config(config)
    logger.info("Removed account: %s", name)


def set_default_account(name: str) -> None:
    """Set the default account."""
    config = _load_config()
    accounts = config.get("accounts", {})
    if name not in accounts:
        raise ValueError(f"Account '{name}' does not exist")
    config["default"] = name
    _save_config(config)
    logger.info("Default account set to: %s", name)


def get_default_account() -> str:
    """Return the name of the default account."""
    config = _load_config()
    return config.get("default", "")


def _get_profile_dir(account: str) -> str:
    """Return the Chrome profile directory for an account."""
    return str(_CONFIG_DIR / "accounts" / account / "chrome-profile")
FILE:scripts/chrome_launcher.py
"""Chrome process management (cross-platform), mirroring the process-management part of the Go browser/browser.go."""

from __future__ import annotations

import contextlib
import json
import logging
import os
import platform
import shutil
import socket
import subprocess
import sys
import time
from pathlib import Path

from xhs.stealth import STEALTH_ARGS

logger = logging.getLogger(__name__)

# Default remote debugging port
DEFAULT_PORT = 9222

# Module-level handle to the Chrome process launched by this module
_chrome_process: subprocess.Popen | None = None

# Default Chrome locations per platform
_CHROME_PATHS: dict[str, list[str]] = {
    "Darwin": [
        "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
        "/Applications/Chromium.app/Contents/MacOS/Chromium",
    ],
    "Linux": [
        "/usr/bin/google-chrome",
        "/usr/bin/google-chrome-stable",
        "/usr/bin/chromium",
        "/usr/bin/chromium-browser",
        "/snap/bin/chromium",
    ],
    "Windows": [
        r"C:\Program Files\Google\Chrome\Application\chrome.exe",
        r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
    ],
}


def _get_default_data_dir() -> str:
    """Return the default Chrome profile directory."""
    return str(Path.home() / ".xhs" / "chrome-profile")


def is_port_open(port: int, host: str = "127.0.0.1") -> bool:
    """Check a port with a plain TCP connect (answers within about one second)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(1)
        try:
            s.connect((host, port))
            return True
        except (ConnectionRefusedError, TimeoutError, OSError):
            return False


def find_chrome() -> str | None:
    """Locate the Chrome executable."""
    # The environment variable takes priority
    env_path = os.getenv("CHROME_BIN")
    if env_path and os.path.isfile(env_path):
        return env_path
    # Look on PATH (including chrome.exe on Windows)
    chrome = (
        shutil.which("google-chrome")
        or shutil.which("chromium")
        or shutil.which("chrome")
        or shutil.which("chrome.exe")
    )
    if chrome:
        return chrome
    # Platform default paths
    system = platform.system()
    # Windows: also check paths derived from environment variables
    if system == "Windows":
        for env_var in ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA"):
            base = os.environ.get(env_var, "")
            if base:
                candidate = os.path.join(base, "Google", "Chrome", "Application", "chrome.exe")
                if os.path.isfile(candidate):
                    return candidate
    for path in _CHROME_PATHS.get(system, []):
        if os.path.isfile(path):
            return path
    return None


def is_chrome_running(port: int = DEFAULT_PORT) -> bool:
    """Check whether Chrome is listening on the given port (TCP-level check)."""
    return is_port_open(port)
def launch_chrome(
    port: int = DEFAULT_PORT,
    headless: bool = False,
    user_data_dir: str | None = None,
    chrome_bin: str | None = None,
) -> subprocess.Popen | None:
    """Launch a Chrome process with a remote debugging port.

    Args:
        port: Remote debugging port.
        headless: Whether to run in headless mode.
        user_data_dir: User data directory (profile isolation); defaults to ~/.xhs/chrome-profile.
        chrome_bin: Path to the Chrome executable.

    Returns:
        The Chrome subprocess, or None if Chrome is already running.

    Raises:
        FileNotFoundError: Chrome was not found.
    """
    global _chrome_process
    # Skip the launch if Chrome is already running
    if is_port_open(port):
        logger.info("Chrome is already running (port=%d); skipping launch", port)
        return None
    if not chrome_bin:
        chrome_bin = find_chrome()
    if not chrome_bin:
        raise FileNotFoundError("Chrome not found; set the CHROME_BIN environment variable or install Chrome")
    # Default user-data-dir
    if not user_data_dir:
        user_data_dir = _get_default_data_dir()
    args = [
        chrome_bin,
        f"--remote-debugging-port={port}",
        f"--user-data-dir={user_data_dir}",
        *STEALTH_ARGS,
    ]
    if headless:
        args.append("--headless=new")
    # Proxy
    proxy = os.getenv("XHS_PROXY")
    if proxy:
        args.append(f"--proxy-server={proxy}")
        logger.info("Using proxy: %s", _mask_proxy(proxy))
    logger.info("Launching Chrome: port=%d, headless=%s, profile=%s", port, headless, user_data_dir)
    process = subprocess.Popen(
        args,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    _chrome_process = process
    # Wait until Chrome is ready
    _wait_for_chrome(port)
    return process


def close_chrome(process: subprocess.Popen) -> None:
    """Close a Chrome process."""
    if process.poll() is not None:
        return
    try:
        process.terminate()
        process.wait(timeout=5)
    except (subprocess.TimeoutExpired, OSError):
        process.kill()
        process.wait(timeout=3)
    logger.info("Chrome process closed")
def kill_chrome(port: int = DEFAULT_PORT) -> None:
    """Close the Chrome instance on the given port.

    Strategy: CDP Browser.close, then terminate the tracked process, then
    find and kill the process by port.

    Args:
        port: Chrome debugging port.
    """
    global _chrome_process
    # Strategy 1: close through CDP
    try:
        import requests

        resp = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
        if resp.status_code == 200:
            ws_url = resp.json().get("webSocketDebuggerUrl")
            if ws_url:
                import websockets.sync.client

                ws = websockets.sync.client.connect(ws_url)
                ws.send(json.dumps({"id": 1, "method": "Browser.close"}))
                ws.close()
                logger.info("Closed Chrome through CDP Browser.close (port=%d)", port)
                time.sleep(1)
    except Exception:
        # Best effort only; the next strategies handle the failure case
        logger.debug("CDP Browser.close failed (port=%d)", port, exc_info=True)
    # Strategy 2: terminate the tracked subprocess
    if _chrome_process and _chrome_process.poll() is None:
        try:
            _chrome_process.terminate()
            _chrome_process.wait(timeout=5)
            logger.info("Closed the tracked Chrome process with terminate")
        except Exception:
            with contextlib.suppress(Exception):
                _chrome_process.kill()
        _chrome_process = None
    # Strategy 3: find the process by port and kill it (cross-platform)
    if is_port_open(port):
        pids = _find_pids_by_port(port)
        if pids:
            for pid in pids:
                _kill_pid(pid)
            logger.info("Closed Chrome by killing its process (port=%d)", port)
    # Wait for the port to be released (at most 5 s)
    deadline = time.monotonic() + 5
    while time.monotonic() < deadline:
        if not is_port_open(port):
            return
        time.sleep(0.5)
    if is_port_open(port):
        logger.warning("Port %d is still in use; the kill may not have fully taken effect", port)
def ensure_chrome(
    port: int = DEFAULT_PORT,
    headless: bool = False,
    user_data_dir: str | None = None,
    chrome_bin: str | None = None,
) -> bool:
    """Make sure Chrome is available on the given port (one-stop entry point).

    If Chrome is already running, return True immediately.
    Otherwise try to launch Chrome and wait for the port to become ready.

    Args:
        port: Remote debugging port.
        headless: Whether to run headless (only applies to a fresh launch).
        user_data_dir: User data directory.
        chrome_bin: Path to the Chrome executable.

    Returns:
        True if Chrome is available, False if the launch failed.
    """
    if is_port_open(port):
        return True
    try:
        launch_chrome(
            port=port, headless=headless, user_data_dir=user_data_dir, chrome_bin=chrome_bin,
        )
        return is_port_open(port)
    except OSError as e:
        # Covers FileNotFoundError (Chrome missing) as well as other launch
        # failures raised by Popen, such as PermissionError
        logger.error("Failed to launch Chrome: %s", e)
        return False


def restart_chrome(
    port: int = DEFAULT_PORT,
    headless: bool = False,
    user_data_dir: str | None = None,
    chrome_bin: str | None = None,
) -> subprocess.Popen | None:
    """Restart Chrome: close the current instance, then relaunch in the requested mode.

    Args:
        port: Remote debugging port.
        headless: Whether to run in headless mode.
        user_data_dir: User data directory.
        chrome_bin: Path to the Chrome executable.

    Returns:
        The new Chrome subprocess, or None.
    """
    logger.info("Restarting Chrome: port=%d, headless=%s", port, headless)
    kill_chrome(port)
    time.sleep(1)
    return launch_chrome(
        port=port,
        headless=headless,
        user_data_dir=user_data_dir,
        chrome_bin=chrome_bin,
    )


def _wait_for_chrome(port: int, timeout: float = 15.0) -> None:
    """Wait for the Chrome debugging port to become ready (TCP-level check)."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_port_open(port):
            logger.info("Chrome is ready (port=%d)", port)
            return
        time.sleep(0.5)
    logger.warning("Timed out waiting for Chrome to become ready (port=%d)", port)
def _find_pids_by_port(port: int) -> list[int]:
    """Find the PIDs of processes listening on the given port (cross-platform)."""
    try:
        if sys.platform == "win32":
            result = subprocess.run(
                ["netstat", "-ano", "-p", "TCP"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode != 0:
                return []
            pids: list[int] = []
            for line in result.stdout.splitlines():
                parts = line.split()
                # Columns: Proto, Local Address, Foreign Address, State, PID.
                # Match the local address exactly; a substring test on the
                # whole line would also match ports such as 92220 or a
                # remote endpoint.
                if len(parts) < 5 or parts[3] != "LISTENING":
                    continue
                if not parts[1].endswith(f":{port}"):
                    continue
                with contextlib.suppress(ValueError):
                    pids.append(int(parts[-1]))
            return list(set(pids))
        else:
            # Restrict to listening sockets so that clients connected to the
            # port (including this process) are not reported
            result = subprocess.run(
                ["lsof", "-t", f"-iTCP:{port}", "-sTCP:LISTEN"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode != 0 or not result.stdout.strip():
                return []
            pids = []
            for p in result.stdout.strip().split("\n"):
                with contextlib.suppress(ValueError):
                    pids.append(int(p))
            return pids
    except Exception:
        return []


def _kill_pid(pid: int) -> None:
    """Terminate the process with the given PID (cross-platform)."""
    try:
        if sys.platform == "win32":
            subprocess.run(
                ["taskkill", "/PID", str(pid), "/F"],
                capture_output=True,
                timeout=5,
            )
        else:
            import signal

            os.kill(pid, signal.SIGTERM)
    except Exception:
        logger.debug("Failed to terminate process %d", pid)
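def _mask_proxy(proxy_url: str) -> str:
    """Hide the credentials in a proxy URL."""
    from urllib.parse import urlparse, urlunparse

    try:
        parsed = urlparse(proxy_url)
        if parsed.username:
            # Rebuild the netloc rather than using str.replace(): replacing an
            # empty password would insert "***" between every character, and a
            # username that also occurs in the host name would be mangled.
            hostinfo = parsed.netloc.rpartition("@")[2]
            userinfo = "***:***" if parsed.password is not None else "***"
            return urlunparse(parsed._replace(netloc=f"{userinfo}@{hostinfo}"))
    except Exception:
        pass
    return proxy_url


def has_display() -> bool:
    """Detect whether a graphical display is available (used to pick the login method)."""
    system = platform.system()
    if system in ("Windows", "Darwin"):
        return True  # Windows and macOS are assumed to have a GUI
    # Linux: check the DISPLAY or WAYLAND_DISPLAY environment variable
    return bool(os.getenv("DISPLAY") or os.getenv("WAYLAND_DISPLAY"))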
FILE:scripts/cli.py
"""Unified CLI entry point, corresponding to the 13 subcommands of the Go MCP tool.

Global options: --host, --port, --account
Output: JSON (ensure_ascii=False)
Exit codes: 0 = success, 1 = not logged in, 2 = error
"""

from __future__ import annotations

import argparse
import json
import logging
import sys

# The default Windows console encoding (e.g. cp1252) cannot represent Chinese, so force UTF-8
if sys.stdout and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")
if sys.stderr and hasattr(sys.stderr, "reconfigure"):
    sys.stderr.reconfigure(encoding="utf-8")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("xhs-cli")


def _output(data: dict, exit_code: int = 0) -> None:
    """Print JSON and exit."""
    print(json.dumps(data, ensure_ascii=False, indent=2))
    sys.exit(exit_code)


def _connect(args: argparse.Namespace):
    """Connect to Chrome and return (browser, page)."""
    from chrome_launcher import ensure_chrome
    from xhs.cdp import Browser

    if not ensure_chrome(port=args.port, headless=True):
        _output(
            {"success": False, "error": "Could not start Chrome; check that Chrome is installed"},
            exit_code=2,
        )
    browser = Browser(host=args.host, port=args.port)
    browser.connect()
    page = browser.new_page()
    return browser, page


def _connect_existing(args: argparse.Namespace):
    """Connect to Chrome and reuse an existing page (for later steps of a multi-step publish)."""
    from chrome_launcher import ensure_chrome
    from xhs.cdp import Browser

    if not ensure_chrome(port=args.port):
        _output(
            {"success": False, "error": "Could not connect to Chrome"},
            exit_code=2,
        )
    browser = Browser(host=args.host, port=args.port)
    browser.connect()
    page = browser.get_existing_page()
    if not page:
        _output(
            {"success": False, "error": "No open page found; run the preceding step first"},
            exit_code=2,
        )
    return browser, page


def _headless_fallback(port: int) -> None:
    """Fall back to headed mode when headless mode finds no login."""
    from chrome_launcher import restart_chrome

    logger.info("Not logged in under headless mode; switching to headed mode...")
    restart_chrome(port=port, headless=False)
    _output(
        {
            "success": False,
            "error": "Not logged in",
            "action": "switched_to_headed",
            "message": "Switched to headed mode; scan the QR code in the browser to log in",
        },
        exit_code=1,
    )
# ========== Subcommand implementations ==========


def cmd_check_login(args: argparse.Namespace) -> None:
    """Check login status."""
    from xhs.login import check_login_status

    browser, page = _connect(args)
    try:
        logged_in = check_login_status(page)
        if logged_in:
            _output({"logged_in": True}, exit_code=0)
        else:
            from chrome_launcher import has_display

            method = "qrcode" if has_display() else "phone"
            hint = (
                "Run login (QR code) to log in"
                if method == "qrcode"
                else "Run send-code --phone <phone number> (SMS verification code) to log in"
            )
            _output({"logged_in": False, "login_method": method, "hint": hint}, exit_code=1)
    finally:
        browser.close_page(page)
        browser.close()


def cmd_login(args: argparse.Namespace) -> None:
    """Fetch the login QR code and wait for it to be scanned."""
    from xhs.login import fetch_qrcode, save_qrcode_to_file, wait_for_login

    browser, page = _connect(args)
    try:
        src, already = fetch_qrcode(page)
        if already:
            _output({"logged_in": True, "message": "Already logged in"})
        else:
            # Save the QR code to a temporary file
            qrcode_path = save_qrcode_to_file(src)
            print(
                json.dumps(
                    {
                        "qrcode_path": qrcode_path,
                        "message": "Scan the QR code to log in; it has been saved to a file",
                    },
                    ensure_ascii=False,
                )
            )
            success = wait_for_login(page, timeout=120)
            _output(
                {"logged_in": success, "message": "Login succeeded" if success else "Login timed out"},
                exit_code=0 if success else 2,
            )
    finally:
        browser.close_page(page)
        browser.close()
def cmd_phone_login(args: argparse.Namespace) -> None:
    """Log in with a phone number and verification code (for servers without a display)."""
    from xhs.login import send_phone_code, submit_phone_code

    browser, page = _connect(args)
    try:
        sent = send_phone_code(page, args.phone)
        if not sent:
            _output({"logged_in": True, "message": "Already logged in; no need to log in again"})
            return
        # Print a notice, then wait for the user to enter the code in the terminal
        print(
            json.dumps(
                {"status": "code_sent", "message": f"Verification code sent to {args.phone[:3]}****{args.phone[-4:]}"},
                ensure_ascii=False,
            ),
            flush=True,
        )
        # Read the code from --code or interactively from stdin
        if args.code:
            code = args.code.strip()
        else:
            try:
                code = input("Enter the verification code: ").strip()
            except EOFError:
                _output({"success": False, "error": "No verification code was entered"}, exit_code=2)
                return
        if not code:
            _output({"success": False, "error": "The verification code must not be empty"}, exit_code=2)
            return
        success = submit_phone_code(page, code)
        _output(
            {"logged_in": success, "message": "Login succeeded" if success else "Wrong code or timed out"},
            exit_code=0 if success else 2,
        )
    finally:
        browser.close_page(page)
        browser.close()
def cmd_send_code(args: argparse.Namespace) -> None:
    """Two-step login, step 1: fill in the phone number and send the code, leaving the page open."""
    from chrome_launcher import restart_chrome
    from xhs.errors import RateLimitError
    from xhs.login import send_phone_code

    for attempt in range(2):
        browser, page = _connect(args)
        try:
            sent = send_phone_code(page, args.phone)
        except RateLimitError:
            browser.close()
            if attempt == 0:
                logger.info("Rate limited; restarting Chrome and retrying...")
                restart_chrome(port=args.port)
                continue
            _output(
                {"success": False, "error": "Too many requests; still failing after a restart, try again later"},
                exit_code=2,
            )
            return
        # _output() exits the process, so disconnect before calling it.
        # Only the control connection is dropped; the tab stays open so that
        # verify-code can reuse it.
        browser.close()
        if not sent:
            _output({"logged_in": True, "message": "Already logged in; no need to log in again"})
            return
        _output({
            "status": "code_sent",
            "message": f"Verification code sent to {args.phone[:3]}****{args.phone[-4:]}; run verify-code --code <code>",
        })
        return


def cmd_verify_code(args: argparse.Namespace) -> None:
    """Two-step login, step 2: enter the verification code on the existing page and submit."""
    from xhs.login import submit_phone_code

    browser, page = _connect_existing(args)
    try:
        success = submit_phone_code(page, args.code)
        _output(
            {"logged_in": success, "message": "Login succeeded" if success else "Wrong code or timed out"},
            exit_code=0 if success else 2,
        )
    finally:
        browser.close_page(page)
        browser.close()
def cmd_delete_cookies(args: argparse.Namespace) -> None:
    """Log out (by clicking through the page UI) and delete the cookies file."""
    from xhs.cookies import delete_cookies, get_cookies_file_path
    from xhs.login import logout

    # First log out through the browser UI
    browser, page = _connect(args)
    try:
        logged_out = logout(page)
    finally:
        browser.close_page(page)
        browser.close()
    # Then delete the local cookies file
    path = get_cookies_file_path(args.account)
    delete_cookies(path)
    msg = "Logged out and deleted cookies" if logged_out else "Not logged in; cookies file deleted"
    _output({"success": True, "message": msg, "cookies_path": path})


def cmd_list_feeds(args: argparse.Namespace) -> None:
    """Get the home feed list."""
    from xhs.feeds import list_feeds

    browser, page = _connect(args)
    try:
        feeds = list_feeds(page)
        _output({"feeds": [f.to_dict() for f in feeds], "count": len(feeds)})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_search_feeds(args: argparse.Namespace) -> None:
    """Search feeds."""
    from xhs.search import search_feeds
    from xhs.types import FilterOption

    filter_opt = FilterOption(
        sort_by=args.sort_by or "",
        note_type=args.note_type or "",
        publish_time=args.publish_time or "",
        search_scope=args.search_scope or "",
        location=args.location or "",
    )
    browser, page = _connect(args)
    try:
        feeds = search_feeds(page, args.keyword, filter_opt)
        _output({"feeds": [f.to_dict() for f in feeds], "count": len(feeds)})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_get_feed_detail(args: argparse.Namespace) -> None:
    """Get feed details."""
    from xhs.feed_detail import get_feed_detail
    from xhs.types import CommentLoadConfig

    config = CommentLoadConfig(
        click_more_replies=args.click_more_replies,
        max_replies_threshold=args.max_replies_threshold,
        max_comment_items=args.max_comment_items,
        scroll_speed=args.scroll_speed,
    )
    browser, page = _connect(args)
    try:
        detail = get_feed_detail(
            page,
            args.feed_id,
            args.xsec_token,
            load_all_comments=args.load_all_comments,
            config=config,
        )
        _output(detail.to_dict())
    finally:
        browser.close_page(page)
        browser.close()


def cmd_user_profile(args: argparse.Namespace) -> None:
    """Get a user's profile page."""
    from xhs.user_profile import get_user_profile

    browser, page = _connect(args)
    try:
        profile = get_user_profile(page, args.user_id, args.xsec_token)
        _output(profile.to_dict())
    finally:
        browser.close_page(page)
        browser.close()
def cmd_post_comment(args: argparse.Namespace) -> None:
    """Post a comment."""
    from xhs.comment import post_comment

    browser, page = _connect(args)
    try:
        post_comment(page, args.feed_id, args.xsec_token, args.content)
        _output({"success": True, "message": "Comment posted"})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_reply_comment(args: argparse.Namespace) -> None:
    """Reply to a comment."""
    from xhs.comment import reply_comment

    browser, page = _connect(args)
    try:
        reply_comment(
            page,
            args.feed_id,
            args.xsec_token,
            args.content,
            comment_id=args.comment_id or "",
            user_id=args.user_id or "",
        )
        _output({"success": True, "message": "Reply posted"})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_like_feed(args: argparse.Namespace) -> None:
    """Like or unlike a feed."""
    from xhs.like_favorite import like_feed, unlike_feed

    browser, page = _connect(args)
    try:
        if args.unlike:
            result = unlike_feed(page, args.feed_id, args.xsec_token)
        else:
            result = like_feed(page, args.feed_id, args.xsec_token)
        _output(result.to_dict())
    finally:
        browser.close_page(page)
        browser.close()


def cmd_favorite_feed(args: argparse.Namespace) -> None:
    """Favorite or unfavorite a feed."""
    from xhs.like_favorite import favorite_feed, unfavorite_feed

    browser, page = _connect(args)
    try:
        if args.unfavorite:
            result = unfavorite_feed(page, args.feed_id, args.xsec_token)
        else:
            result = favorite_feed(page, args.feed_id, args.xsec_token)
        _output(result.to_dict())
    finally:
        browser.close_page(page)
        browser.close()
def cmd_publish(args: argparse.Namespace) -> None:
"""发布图文内容。"""
from image_downloader import process_images
from xhs.login import check_login_status
from xhs.publish import publish_image_content
from xhs.types import PublishImageContent
# 读取标题和正文
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
# 处理图片
image_paths = process_images(args.images) if args.images else []
if not image_paths:
_output({"success": False, "error": "没有有效的图片"}, exit_code=2)
browser, page = _connect(args)
try:
# headless 模式登录检查 + 自动降级
headless = getattr(args, "headless", False)
if headless and not check_login_status(page):
browser.close_page(page)
browser.close()
_headless_fallback(args.port)
return
publish_image_content(
page,
PublishImageContent(
title=title,
content=content,
tags=args.tags or [],
image_paths=image_paths,
schedule_time=args.schedule_at,
is_original=args.original,
visibility=args.visibility or "",
),
)
_output({"success": True, "title": title, "images": len(image_paths), "status": "发布完成"})
finally:
browser.close_page(page)
browser.close()
def cmd_fill_publish(args: argparse.Namespace) -> None:
"""只填写图文表单,不发布。"""
from image_downloader import process_images
from xhs.publish import fill_publish_form
from xhs.types import PublishImageContent
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
image_paths = process_images(args.images) if args.images else []
if not image_paths:
_output({"success": False, "error": "没有有效的图片"}, exit_code=2)
browser, page = _connect(args)
try:
fill_publish_form(
page,
PublishImageContent(
title=title,
content=content,
tags=args.tags or [],
image_paths=image_paths,
schedule_time=args.schedule_at,
is_original=args.original,
visibility=args.visibility or "",
),
)
_output(
{
"success": True,
"title": title,
"images": len(image_paths),
"status": "表单已填写,等待确认发布",
}
)
finally:
# Leave the page open so the user can preview it in the browser
browser.close()
def cmd_fill_publish_video(args: argparse.Namespace) -> None:
"""Fill in the video form without publishing."""
from xhs.publish_video import fill_publish_video_form
from xhs.types import PublishVideoContent
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
browser, page = _connect(args)
try:
fill_publish_video_form(
page,
PublishVideoContent(
title=title,
content=content,
tags=args.tags or [],
video_path=args.video,
schedule_time=args.schedule_at,
visibility=args.visibility or "",
),
)
_output(
{
"success": True,
"title": title,
"video": args.video,
"status": "视频表单已填写,等待确认发布",
}
)
finally:
# Leave the page open so the user can preview it in the browser
browser.close()
def cmd_click_publish(args: argparse.Namespace) -> None:
"""Click the publish button (call after the user confirms). Reuses the existing publish-page tab."""
from xhs.publish import click_publish_button
browser, page = _connect_existing(args)
try:
click_publish_button(page)
_output({"success": True, "status": "发布完成"})
finally:
browser.close_page(page)
browser.close()
def cmd_save_draft(args: argparse.Namespace) -> None:
"""Save as a draft (call when publishing is cancelled)."""
from xhs.publish import save_as_draft
browser, page = _connect_existing(args)
try:
save_as_draft(page)
_output({"success": True, "status": "内容已保存到草稿箱"})
finally:
browser.close_page(page)
browser.close()
def cmd_long_article(args: argparse.Namespace) -> None:
"""Long-article mode: fill in the content, apply one-click layout, and return the template list."""
from xhs.publish_long_article import publish_long_article
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
browser, page = _connect(args)
try:
template_names = publish_long_article(
page,
title=title,
content=content,
image_paths=args.images,
)
_output(
{
"success": True,
"templates": template_names,
"status": "长文已填写,请选择模板",
}
)
finally:
# Leave the page open; select-template / next-step reuse it
browser.close()
def cmd_select_template(args: argparse.Namespace) -> None:
"""Select a layout template. Reuses the existing long-article editor tab."""
from xhs.publish_long_article import select_template
browser, page = _connect_existing(args)
try:
selected = select_template(page, args.name)
if selected:
_output({"success": True, "template": args.name, "status": "模板已选择"})
else:
_output(
{"success": False, "error": f"未找到模板: {args.name}"},
exit_code=2,
)
finally:
# Leave the page open; next-step reuses it
browser.close()
def cmd_next_step(args: argparse.Namespace) -> None:
"""Click next and fill in the publish-page description. Reuses the existing long-article editor tab."""
from xhs.publish_long_article import click_next_and_fill_description
with open(args.content_file, encoding="utf-8") as f:
description = f.read().strip()
browser, page = _connect_existing(args)
try:
click_next_and_fill_description(page, description)
_output({"success": True, "status": "已进入发布页,等待确认发布"})
finally:
# Leave the page open; click-publish follows
browser.close()
def cmd_publish_video(args: argparse.Namespace) -> None:
"""Publish a video note."""
from xhs.login import check_login_status
from xhs.publish_video import publish_video_content
from xhs.types import PublishVideoContent
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
browser, page = _connect(args)
try:
# Headless mode: check login status and fall back to headed mode if not logged in
headless = getattr(args, "headless", False)
if headless and not check_login_status(page):
browser.close_page(page)
browser.close()
_headless_fallback(args.port)
return
publish_video_content(
page,
PublishVideoContent(
title=title,
content=content,
tags=args.tags or [],
video_path=args.video,
schedule_time=args.schedule_at,
visibility=args.visibility or "",
),
)
_output({"success": True, "title": title, "video": args.video, "status": "发布完成"})
finally:
browser.close_page(page)
browser.close()
# ========== Argument parsing ==========
def build_parser() -> argparse.ArgumentParser:
"""Build the CLI argument parser."""
parser = argparse.ArgumentParser(
prog="xhs-cli",
description="小红书自动化 CLI",
)
# Global options (must appear before the subcommand on the command line)
parser.add_argument("--host", default="127.0.0.1", help="Chrome 调试主机 (default: 127.0.0.1)")
parser.add_argument("--port", type=int, default=9222, help="Chrome 调试端口 (default: 9222)")
parser.add_argument("--account", default="", help="账号名称")
subparsers = parser.add_subparsers(dest="command", required=True)
# check-login
sub = subparsers.add_parser("check-login", help="检查登录状态")
sub.set_defaults(func=cmd_check_login)
# login
sub = subparsers.add_parser("login", help="登录(扫码)")
sub.set_defaults(func=cmd_login)
# phone-login (single interactive command)
sub = subparsers.add_parser("phone-login", help="手机号+验证码登录(交互式,适合本地终端)")
sub.add_argument("--phone", required=True, help="手机号(不含国家码,如 13800138000)")
sub.add_argument("--code", default="", help="短信验证码(省略则交互式输入)")
sub.set_defaults(func=cmd_phone_login)
# send-code (step 1 of the two-step login)
sub = subparsers.add_parser("send-code", help="分步登录第一步:发送手机验证码,保持页面不关闭")
sub.add_argument("--phone", required=True, help="手机号(不含国家码)")
sub.set_defaults(func=cmd_send_code)
# verify-code (step 2 of the two-step login)
sub = subparsers.add_parser("verify-code", help="分步登录第二步:填写验证码并完成登录")
sub.add_argument("--code", required=True, help="收到的短信验证码")
sub.set_defaults(func=cmd_verify_code)
# delete-cookies
sub = subparsers.add_parser("delete-cookies", help="删除 cookies")
sub.set_defaults(func=cmd_delete_cookies)
# list-feeds
sub = subparsers.add_parser("list-feeds", help="获取首页 Feed 列表")
sub.set_defaults(func=cmd_list_feeds)
# search-feeds
sub = subparsers.add_parser("search-feeds", help="搜索 Feeds")
sub.add_argument("--keyword", required=True, help="搜索关键词")
sub.add_argument("--sort-by", help="排序: 综合|最新|最多点赞|最多评论|最多收藏")
sub.add_argument("--note-type", help="类型: 不限|视频|图文")
sub.add_argument("--publish-time", help="时间: 不限|一天内|一周内|半年内")
sub.add_argument("--search-scope", help="范围: 不限|已看过|未看过|已关注")
sub.add_argument("--location", help="位置: 不限|同城|附近")
sub.set_defaults(func=cmd_search_feeds)
# get-feed-detail
sub = subparsers.add_parser("get-feed-detail", help="获取 Feed 详情")
sub.add_argument("--feed-id", required=True, help="Feed ID")
sub.add_argument("--xsec-token", required=True, help="xsec_token")
sub.add_argument("--load-all-comments", action="store_true", help="加载全部评论")
sub.add_argument("--click-more-replies", action="store_true", help="点击展开更多回复")
sub.add_argument("--max-replies-threshold", type=int, default=10, help="展开回复数阈值")
sub.add_argument("--max-comment-items", type=int, default=0, help="最大评论数 (0=不限)")
sub.add_argument("--scroll-speed", default="normal", help="滚动速度: slow|normal|fast")
sub.set_defaults(func=cmd_get_feed_detail)
# user-profile
sub = subparsers.add_parser("user-profile", help="获取用户主页")
sub.add_argument("--user-id", required=True, help="用户 ID")
sub.add_argument("--xsec-token", required=True, help="xsec_token")
sub.set_defaults(func=cmd_user_profile)
# post-comment
sub = subparsers.add_parser("post-comment", help="发表评论")
sub.add_argument("--feed-id", required=True, help="Feed ID")
sub.add_argument("--xsec-token", required=True, help="xsec_token")
sub.add_argument("--content", required=True, help="评论内容")
sub.set_defaults(func=cmd_post_comment)
# reply-comment
sub = subparsers.add_parser("reply-comment", help="回复评论")
sub.add_argument("--feed-id", required=True, help="Feed ID")
sub.add_argument("--xsec-token", required=True, help="xsec_token")
sub.add_argument("--content", required=True, help="回复内容")
sub.add_argument("--comment-id", help="目标评论 ID")
sub.add_argument("--user-id", help="目标用户 ID")
sub.set_defaults(func=cmd_reply_comment)
# like-feed
sub = subparsers.add_parser("like-feed", help="点赞")
sub.add_argument("--feed-id", required=True, help="Feed ID")
sub.add_argument("--xsec-token", required=True, help="xsec_token")
sub.add_argument("--unlike", action="store_true", help="取消点赞")
sub.set_defaults(func=cmd_like_feed)
# favorite-feed
sub = subparsers.add_parser("favorite-feed", help="收藏")
sub.add_argument("--feed-id", required=True, help="Feed ID")
sub.add_argument("--xsec-token", required=True, help="xsec_token")
sub.add_argument("--unfavorite", action="store_true", help="取消收藏")
sub.set_defaults(func=cmd_favorite_feed)
# publish
sub = subparsers.add_parser("publish", help="发布图文")
sub.add_argument("--title-file", required=True, help="标题文件路径")
sub.add_argument("--content-file", required=True, help="正文文件路径")
sub.add_argument("--images", nargs="+", required=True, help="图片路径/URL")
sub.add_argument("--tags", nargs="*", help="标签")
sub.add_argument("--schedule-at", help="定时发布 (ISO8601)")
sub.add_argument("--original", action="store_true", help="声明原创")
sub.add_argument("--visibility", help="可见范围")
sub.add_argument("--headless", action="store_true", help="无头模式(未登录自动降级)")
sub.set_defaults(func=cmd_publish)
# publish-video
sub = subparsers.add_parser("publish-video", help="发布视频")
sub.add_argument("--title-file", required=True, help="标题文件路径")
sub.add_argument("--content-file", required=True, help="正文文件路径")
sub.add_argument("--video", required=True, help="视频文件路径")
sub.add_argument("--tags", nargs="*", help="标签")
sub.add_argument("--schedule-at", help="定时发布 (ISO8601)")
sub.add_argument("--visibility", help="可见范围")
sub.add_argument("--headless", action="store_true", help="无头模式(未登录自动降级)")
sub.set_defaults(func=cmd_publish_video)
# fill-publish (fill in the image-and-text form without publishing)
sub = subparsers.add_parser("fill-publish", help="填写图文表单(不发布)")
sub.add_argument("--title-file", required=True, help="标题文件路径")
sub.add_argument("--content-file", required=True, help="正文文件路径")
sub.add_argument("--images", nargs="+", required=True, help="图片路径/URL")
sub.add_argument("--tags", nargs="*", help="标签")
sub.add_argument("--schedule-at", help="定时发布 (ISO8601)")
sub.add_argument("--original", action="store_true", help="声明原创")
sub.add_argument("--visibility", help="可见范围")
sub.set_defaults(func=cmd_fill_publish)
# fill-publish-video (fill in the video form without publishing)
sub = subparsers.add_parser("fill-publish-video", help="填写视频表单(不发布)")
sub.add_argument("--title-file", required=True, help="标题文件路径")
sub.add_argument("--content-file", required=True, help="正文文件路径")
sub.add_argument("--video", required=True, help="视频文件路径")
sub.add_argument("--tags", nargs="*", help="标签")
sub.add_argument("--schedule-at", help="定时发布 (ISO8601)")
sub.add_argument("--visibility", help="可见范围")
sub.set_defaults(func=cmd_fill_publish_video)
# click-publish (click the publish button)
sub = subparsers.add_parser("click-publish", help="点击发布按钮")
sub.set_defaults(func=cmd_click_publish)
# long-article (long-article mode)
sub = subparsers.add_parser("long-article", help="长文模式:填写 + 一键排版")
sub.add_argument("--title-file", required=True, help="标题文件路径")
sub.add_argument("--content-file", required=True, help="正文文件路径")
sub.add_argument("--images", nargs="*", help="可选图片路径")
sub.set_defaults(func=cmd_long_article)
# select-template (choose a layout template)
sub = subparsers.add_parser("select-template", help="选择排版模板")
sub.add_argument("--name", required=True, help="模板名称")
sub.set_defaults(func=cmd_select_template)
# next-step (click next and fill in the description)
sub = subparsers.add_parser("next-step", help="点击下一步 + 填写描述")
sub.add_argument("--content-file", required=True, help="描述内容文件路径")
sub.set_defaults(func=cmd_next_step)
# save-draft (save as a draft)
sub = subparsers.add_parser("save-draft", help="保存为草稿(取消发布时使用)")
sub.set_defaults(func=cmd_save_draft)
return parser
def main() -> None:
"""CLI entry point."""
parser = build_parser()
args = parser.parse_args()
try:
args.func(args)
except Exception as e:
logger.error("执行失败: %s", e, exc_info=True)
_output({"success": False, "error": str(e)}, exit_code=2)
if __name__ == "__main__":
main()
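The parser above binds each subcommand to its handler with `set_defaults(func=...)`, and `main()` dispatches through `args.func(args)`. The following is a minimal, self-contained sketch of that pattern. The `demo-cli` program name and the `handle_like` handler are hypothetical stand-ins for illustration, not part of the real CLI.

```python
from __future__ import annotations

import argparse


def handle_like(args: argparse.Namespace) -> dict:
    """Hypothetical handler: report what would be done instead of driving a browser."""
    action = "unlike" if args.unlike else "like"
    return {"action": action, "feed_id": args.feed_id, "port": args.port}


def build_demo_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="demo-cli")
    # Global options belong to the top-level parser, so they go before the subcommand.
    parser.add_argument("--port", type=int, default=9222)
    subparsers = parser.add_subparsers(dest="command", required=True)

    sub = subparsers.add_parser("like-feed")
    sub.add_argument("--feed-id", required=True)
    sub.add_argument("--unlike", action="store_true")
    sub.set_defaults(func=handle_like)  # bind the handler to this subcommand
    return parser


def run(argv: list[str]) -> dict:
    args = build_demo_parser().parse_args(argv)
    return args.func(args)  # dispatch to whichever handler the subcommand bound
```

Because `--port` is defined on the top-level parser, an invocation such as `demo-cli --port 9333 like-feed --feed-id abc` parses, while placing `--port` after `like-feed` would be rejected by argparse.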
FILE:scripts/image_downloader.py
"""Media download with a SHA256-keyed cache; mirrors Go pkg/downloader/images.go."""
from __future__ import annotations
import hashlib
import logging
import os
import time
from urllib.parse import urlparse
import requests
logger = logging.getLogger(__name__)
_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
# Known image file extensions
_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
def is_image_url(path: str) -> bool:
"""Return True if the string is an http(s) URL (only the scheme is checked, not the file type)."""
return path.lower().startswith(("http://", "https://"))
class ImageDownloader:
"""Image downloader with a cache keyed by the SHA256 of the URL."""
def __init__(self, save_path: str) -> None:
self.save_path = save_path
os.makedirs(save_path, exist_ok=True)
self._session = requests.Session()
self._session.timeout = 30
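def download_image(self, image_url: str) -> str:
"""Download a single image and return its local file path.
If a file for this URL already exists (matched by URL hash), return its path without downloading.
Raises:
ValueError: The URL is not a valid http(s) URL.
RuntimeError: The download failed.
"""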
if not is_image_url(image_url):
raise ValueError(f"无效的图片 URL: {image_url}")
# Build the file name
url_hash = hashlib.sha256(image_url.encode()).hexdigest()[:16]
ext = self._detect_extension(image_url)
filename = f"img_{url_hash}_{int(time.time())}{ext}"
filepath = os.path.join(self.save_path, filename)
# Reuse an existing file with the same URL hash, if any
existing = self._find_existing(url_hash)
if existing:
return existing
# Download
parsed = urlparse(image_url)
headers = {
"User-Agent": _USER_AGENT,
"Referer": f"{parsed.scheme}://{parsed.hostname}/",
}
# requests does not honor a timeout attribute set on the Session; pass it per request
resp = self._session.get(image_url, headers=headers, timeout=30)
if resp.status_code != 200:
raise RuntimeError(f"下载失败 (status={resp.status_code}): {image_url}")
# Save
with open(filepath, "wb") as f:
f.write(resp.content)
logger.info("下载完成: %s -> %s", image_url, filepath)
return filepath
def download_images(self, image_urls: list[str]) -> list[str]:
"""Download a batch of images, logging and skipping any that fail."""
paths = []
for url in image_urls:
try:
path = self.download_image(url)
paths.append(path)
except Exception as e:
logger.error("下载失败 %s: %s", url, e)
return paths
def _detect_extension(self, url: str) -> str:
"""Infer the file extension from the URL path."""
parsed = urlparse(url)
path = parsed.path.lower()
for ext in _IMAGE_EXTENSIONS:
if path.endswith(ext):
return ext
return ".jpg"  # default
def _find_existing(self, url_hash: str) -> str | None:
"""Find an existing file with the same URL hash."""
prefix = f"img_{url_hash}_"
for filename in os.listdir(self.save_path):
if filename.startswith(prefix):
return os.path.join(self.save_path, filename)
return None
def process_images(images: list[str], save_dir: str | None = None) -> list[str]:
"""Process a list of images: download URLs, and return existing local paths as absolute paths."""
if not save_dir:
save_dir = os.path.join(os.path.expanduser("~"), ".xhs", "images")
downloader = ImageDownloader(save_dir)
result = []
for img in images:
if is_image_url(img):
path = downloader.download_image(img)
result.append(path)
else:
# Local path
if os.path.exists(img):
result.append(os.path.abspath(img))
else:
logger.warning("文件不存在: %s", img)
return result
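The cache in `ImageDownloader` is keyed by the URL rather than by the image bytes: the file-name prefix comes from the SHA256 of the URL string, and the extension comes from the URL path. A minimal sketch of those two pure steps follows, assuming the same prefix format and fallback as the code above. The helper names `cache_prefix` and `detect_extension` are hypothetical.

```python
from __future__ import annotations

import hashlib
from urllib.parse import urlparse

IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg")


def cache_prefix(url: str) -> str:
    """File-name prefix identifying a URL: 'img_' + first 16 hex chars of its SHA256 + '_'."""
    return f"img_{hashlib.sha256(url.encode()).hexdigest()[:16]}_"


def detect_extension(url: str) -> str:
    """Take the extension from the URL path only, ignoring case and the query string."""
    path = urlparse(url).path.lower()
    for ext in IMAGE_EXTENSIONS:
        if path.endswith(ext):
            return ext
    return ".jpg"  # fallback when the path has no known extension
```

One consequence of hashing the URL is that the same image served from two different URLs is stored twice, while a URL whose content changes keeps returning the first cached copy.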
FILE:scripts/publish_pipeline.py
"""Publish orchestrator: download → login check → publish → report."""
from __future__ import annotations
import json
import logging
import sys
from image_downloader import process_images
from title_utils import calc_title_length
from xhs.cdp import Browser
from xhs.login import check_login_status
from xhs.publish import publish_image_content
from xhs.publish_video import publish_video_content
from xhs.types import PublishImageContent, PublishVideoContent
logger = logging.getLogger(__name__)
def run_publish_pipeline(
title: str,
content: str,
images: list[str] | None = None,
video: str | None = None,
tags: list[str] | None = None,
schedule_time: str | None = None,
is_original: bool = False,
visibility: str = "",
host: str = "127.0.0.1",
port: int = 9222,
account: str = "",
headless: bool = False,
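) -> dict:
"""Run the full publish pipeline.
When headless=True and the session is not logged in, fall back to headed mode automatically.
Returns:
A dict describing the publish result.
"""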
# Validate the title length
title_len = calc_title_length(title)
if title_len > 20:
return {"success": False, "error": f"标题长度超限: {title_len}/20"}
# Process images (download URLs / validate local paths)
local_images: list[str] = []
if images:
local_images = process_images(images)
if not local_images:
return {"success": False, "error": "没有有效的图片"}
# Connect to the browser
browser = Browser(host=host, port=port)
browser.connect()
try:
page = browser.new_page()
try:
# Login check
if not check_login_status(page):
browser.close_page(page)
browser.close()
# Headless fallback: switch to headed mode
if headless:
from chrome_launcher import restart_chrome
logger.info("Headless 模式未登录,切换到有窗口模式...")
restart_chrome(port=port, headless=False)
return {
"success": False,
"error": "未登录",
"action": "switched_to_headed",
"message": "已切换到有窗口模式,请在浏览器中扫码登录",
"exit_code": 1,
}
return {
"success": False,
"error": "未登录",
"exit_code": 1,
}
# Publish
if video:
publish_video_content(
page,
PublishVideoContent(
title=title,
content=content,
tags=tags or [],
video_path=video,
schedule_time=schedule_time,
visibility=visibility,
),
)
else:
publish_image_content(
page,
PublishImageContent(
title=title,
content=content,
tags=tags or [],
image_paths=local_images,
schedule_time=schedule_time,
is_original=is_original,
visibility=visibility,
),
)
return {
"success": True,
"title": title,
"content_length": len(content),
"images": len(local_images),
"video": video or "",
"status": "发布完成",
}
finally:
browser.close_page(page)
finally:
browser.close()
def main() -> None:
"""CLI entry point (used when invoked by the publish/publish-video subcommands in cli.py)."""
import argparse
parser = argparse.ArgumentParser(description="小红书发布流水线")
parser.add_argument("--title-file", required=True, help="标题文件路径")
parser.add_argument("--content-file", required=True, help="正文文件路径")
parser.add_argument("--images", nargs="*", help="图片路径或 URL 列表")
parser.add_argument("--video", help="视频文件路径")
parser.add_argument("--tags", nargs="*", help="标签列表")
parser.add_argument("--schedule-at", help="定时发布时间 (ISO8601)")
parser.add_argument("--original", action="store_true", help="声明原创")
parser.add_argument("--visibility", default="", help="可见范围")
parser.add_argument("--headless", action="store_true", help="无头模式(未登录自动降级)")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=9222)
parser.add_argument("--account", default="")
args = parser.parse_args()
# Read the title and body text
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
result = run_publish_pipeline(
title=title,
content=content,
images=args.images,
video=args.video,
tags=args.tags,
schedule_time=args.schedule_at,
is_original=args.original,
visibility=args.visibility,
host=args.host,
port=args.port,
account=args.account,
headless=args.headless,
)
print(json.dumps(result, ensure_ascii=False, indent=2))
exit_code = result.get("exit_code", 0 if result["success"] else 2)
sys.exit(exit_code)
if __name__ == "__main__":
main()
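The pipeline's `main()` derives the process exit code from the result dict: an explicit `exit_code` (the code above sets 1 for the not-logged-in case) takes precedence, otherwise success maps to 0 and any other failure to 2. A small sketch of that rule in isolation follows. The function name `resolve_exit_code` is hypothetical.

```python
def resolve_exit_code(result: dict) -> int:
    """An explicit 'exit_code' wins; otherwise 0 on success and 2 on failure."""
    return result.get("exit_code", 0 if result["success"] else 2)
```

Keeping the not-logged-in case on its own code lets a calling script tell "needs a QR-code login" apart from a generic failure without parsing the error message.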
FILE:scripts/run_lock.py
"""Single-instance lock that prevents multiple processes from driving the browser at once."""
from __future__ import annotations
import contextlib
import logging
import os
import time
logger = logging.getLogger(__name__)
_DEFAULT_LOCK_FILE = os.path.join(os.path.expanduser("~"), ".xhs", "run.lock")
class RunLock:
"""File lock ensuring that only one process operates at a time."""
def __init__(self, lock_file: str = _DEFAULT_LOCK_FILE) -> None:
self.lock_file = lock_file
self._fd: int | None = None
def acquire(self, timeout: float = 30.0) -> bool:
"""Acquire the lock.
Args:
timeout: Timeout in seconds.
Returns:
True if the lock was acquired, False on timeout.
"""
os.makedirs(os.path.dirname(self.lock_file), exist_ok=True)
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
self._fd = os.open(
self.lock_file,
os.O_CREAT | os.O_EXCL | os.O_WRONLY,
)
# Write our PID
os.write(self._fd, str(os.getpid()).encode())
logger.debug("获取锁成功: %s", self.lock_file)
return True
except FileExistsError:
# Check whether the holder is still alive
if self._is_stale():
self._force_release()
continue
time.sleep(1)
logger.warning("获取锁超时: %s", self.lock_file)
return False
def release(self) -> None:
"""Release the lock."""
if self._fd is not None:
with contextlib.suppress(OSError):
os.close(self._fd)
self._fd = None
with contextlib.suppress(FileNotFoundError):
os.remove(self.lock_file)
logger.debug("释放锁: %s", self.lock_file)
def _is_stale(self) -> bool:
"""Check whether the lock file is stale (the holding process has exited)."""
try:
with open(self.lock_file) as f:
pid = int(f.read().strip())
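# Signal 0 only checks that the process exists
os.kill(pid, 0)
return False
except PermissionError:
# The process exists but belongs to another user, so the lock is still live
return False
except (ValueError, OSError):
return True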
def _force_release(self) -> None:
"""Force-release a stale lock."""
with contextlib.suppress(FileNotFoundError):
os.remove(self.lock_file)
logger.info("强制释放过时锁: %s", self.lock_file)
def __enter__(self) -> RunLock:
if not self.acquire():
raise TimeoutError(f"无法获取锁: {self.lock_file}")
return self
def __exit__(self, *args: object) -> None:
self.release()
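`RunLock` relies on `os.open` with `O_CREAT | O_EXCL`, which creates the file atomically and fails with `FileExistsError` if it already exists. A minimal sketch of just that primitive follows, run against a temporary directory. The helpers `try_acquire`, `release`, and `demo` are hypothetical, and the stale-lock check and retry loop of the real class are left out.

```python
from __future__ import annotations

import os
import tempfile


def try_acquire(lock_file: str) -> int | None:
    """Create the lock file atomically; return its fd, or None if it already exists."""
    try:
        fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return None
    os.write(fd, str(os.getpid()).encode())  # record the holder's PID
    return fd


def release(lock_file: str, fd: int) -> None:
    """Close the descriptor and delete the lock file."""
    os.close(fd)
    os.remove(lock_file)


def demo() -> list[bool]:
    """Acquire, fail to re-acquire, release, then acquire again."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "run.lock")
        first = try_acquire(path)
        second = try_acquire(path)  # lock is held, so this returns None
        release(path, first)
        third = try_acquire(path)  # free again after release
        release(path, third)
        return [first is not None, second is not None, third is not None]
```

In the real class the same mechanism is wrapped in a context manager, so a typical call site would look like `with RunLock(): ...`, raising `TimeoutError` if the lock cannot be obtained in time.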
FILE:scripts/title_utils.py
"""UTF-16 title length calculation; mirrors Go pkg/xhsutil/title.go."""
from __future__ import annotations
MAX_TITLE_LENGTH = 20
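def calc_title_length(s: str) -> int:
"""Compute the Xiaohongshu title length.
Rules (same as Go CalcTitleLength):
- Each non-ASCII UTF-16 code unit (Chinese, full-width punctuation, emoji code units, etc.) counts as 2
- Each ASCII character counts as 1
- The total is divided by 2 and rounded up; the limit is MAX_TITLE_LENGTH = 20 (this function does not clamp the result)
Emoji are counted by UTF-16 code units:
- Basic emoji (e.g. ✨ U+2728, BMP) = 1 code unit → weight 2 → contributes 1
- SMP emoji (e.g. 💇 U+1F487, surrogate pair) = 2 code units → weight 4 → contributes 2
- ZWJ sequences (e.g. 💇 + U+200D + ♀ + U+FE0F) = 5 code units → weight 10 → contributes 5
- Flags (e.g. 🇨🇳, 2 regional indicators) = 4 code units → weight 8 → contributes 4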
Examples:
>>> calc_title_length("你好世界")
4
>>> calc_title_length("hello")
3
>>> calc_title_length("OOTD穿搭分享")
6
>>> calc_title_length("💇\u200d♀️")
5
"""
byte_len = 0
encoded = s.encode("utf-16-le")
for i in range(0, len(encoded), 2):
code_unit = int.from_bytes(encoded[i : i + 2], "little")
if code_unit > 127:
byte_len += 2
else:
byte_len += 1
return (byte_len + 1) // 2
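The same rule can be cross-checked without encoding to bytes: a code point above U+FFFF occupies two UTF-16 code units (both non-ASCII, so weight 4), any other non-ASCII code point occupies one (weight 2), and ASCII has weight 1. The sketch below is an equivalent formulation for well-formed strings, not the original implementation. The function name `title_length_by_code_point` is hypothetical.

```python
def title_length_by_code_point(s: str) -> int:
    """Weighted title length, computed per code point instead of per UTF-16 byte pair."""
    weight = 0
    for ch in s:
        cp = ord(ch)
        if cp > 0xFFFF:
            weight += 4  # surrogate pair: two non-ASCII UTF-16 code units
        elif cp > 127:
            weight += 2  # one non-ASCII code unit
        else:
            weight += 1  # ASCII
    return (weight + 1) // 2  # halve, rounding up
```

The examples in the docstring above serve as a check: a four-character Chinese title yields 4, `hello` yields 3, and the five-code-unit ZWJ sequence yields 5.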
FILE:scripts/xhs/__init__.py
"""Core package for Xiaohongshu CDP automation."""
FILE:scripts/xhs/cdp.py
"""CDP WebSocket client (Browser, Page, Element); mirrors Go browser/browser.go + the go-rod API.
Talks to the Chrome DevTools Protocol over a raw WebSocket to drive the browser.
"""
from __future__ import annotations
import json
import logging
import random
import time
from typing import Any
import requests
import websockets.sync.client as ws_client
from .errors import CDPError, ElementNotFoundError
from .stealth import REALISTIC_UA, STEALTH_JS
logger = logging.getLogger(__name__)
class CDPClient:
"""Low-level CDP WebSocket client."""
def __init__(self, ws_url: str) -> None:
self._ws = ws_client.connect(ws_url, max_size=50 * 1024 * 1024)
self._id = 0
self._callbacks: dict[int, Any] = {}
def send(self, method: str, params: dict | None = None) -> dict:
"""Send a CDP command and wait for its result."""
self._id += 1
msg: dict[str, Any] = {"id": self._id, "method": method}
if params:
msg["params"] = params
self._ws.send(json.dumps(msg))
return self._wait_for(self._id)
def _wait_for(self, msg_id: int, timeout: float = 30.0) -> dict:
"""Wait for the response with the given id; events and other messages received meanwhile are discarded."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
raw = self._ws.recv(timeout=max(0.1, deadline - time.monotonic()))
except TimeoutError:
break
data = json.loads(raw)
if data.get("id") == msg_id:
if "error" in data:
raise CDPError(f"CDP 错误: {data['error']}")
return data.get("result", {})
raise CDPError(f"等待 CDP 响应超时 (id={msg_id})")
def close(self) -> None:
import contextlib
with contextlib.suppress(Exception):
self._ws.close()
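`CDPClient._wait_for` pairs a request with its response purely by `id`: it keeps reading from the socket and drops everything else, including protocol events (which carry a `method` but no `id`). The sketch below reproduces that matching loop against a fake socket so it can run without Chrome. `FakeSocket` and `wait_for` are hypothetical names, and `RuntimeError` stands in for the package's `CDPError`.

```python
from __future__ import annotations

import json
from collections import deque


class FakeSocket:
    """Hypothetical stand-in for the WebSocket: replays queued raw messages."""

    def __init__(self, messages: list[dict]) -> None:
        self._queue = deque(json.dumps(m) for m in messages)

    def recv(self) -> str:
        if not self._queue:
            raise TimeoutError("no more messages")
        return self._queue.popleft()


def wait_for(sock: FakeSocket, msg_id: int) -> dict:
    """Read messages until one carries msg_id; events and other ids are dropped."""
    while True:
        try:
            data = json.loads(sock.recv())
        except TimeoutError:
            raise RuntimeError(f"timed out waiting for id={msg_id}") from None
        if data.get("id") == msg_id:
            if "error" in data:
                raise RuntimeError(f"CDP error: {data['error']}")
            return data.get("result", {})
```

Dropping unmatched messages is fine for a strictly synchronous client like this one, but it also means any event that arrives while a command is pending is lost, so code that needs events has to poll page state instead.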
class Page:
"""CDP page object wrapping common operations."""
def __init__(self, cdp: CDPClient, target_id: str, session_id: str) -> None:
self._cdp = cdp
self.target_id = target_id
self.session_id = session_id
self._ws = cdp._ws
self._id_counter = 1000
def _send_session(self, method: str, params: dict | None = None) -> dict:
"""Send a command to this page's session."""
self._id_counter += 1
msg: dict[str, Any] = {
"id": self._id_counter,
"method": method,
"sessionId": self.session_id,
}
if params:
msg["params"] = params
self._ws.send(json.dumps(msg))
return self._wait_session(self._id_counter)
def _wait_session(self, msg_id: int, timeout: float = 60.0) -> dict:
"""Wait for a session response."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
raw = self._ws.recv(timeout=max(0.1, deadline - time.monotonic()))
except TimeoutError:
break
data = json.loads(raw)
if data.get("id") == msg_id:
if "error" in data:
raise CDPError(f"CDP 错误: {data['error']}")
return data.get("result", {})
raise CDPError(f"等待 session 响应超时 (id={msg_id})")
def navigate(self, url: str) -> None:
"""Navigate to the given URL."""
logger.info("导航到: %s", url)
self._send_session("Page.navigate", {"url": url})
def wait_for_load(self, timeout: float = 60.0) -> None:
"""Wait for the page to finish loading (polls document.readyState)."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
state = self.evaluate("document.readyState")
if state == "complete":
return
except CDPError:
pass
time.sleep(0.5)
logger.warning("等待页面加载超时")
def wait_dom_stable(self, timeout: float = 10.0, interval: float = 0.5) -> None:
"""Wait for the DOM to settle (two consecutive polls report the same body.innerHTML length)."""
last_html = ""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
html = self.evaluate("document.body ? document.body.innerHTML.length : 0")
if html == last_html and html != "":
return
last_html = html
except CDPError:
pass
time.sleep(interval)
def evaluate(self, expression: str, timeout: float = 30.0) -> Any:
"""Evaluate a JavaScript expression and return its value."""
result = self._send_session(
"Runtime.evaluate",
{
"expression": expression,
"returnByValue": True,
"awaitPromise": False,
},
)
if "exceptionDetails" in result:
raise CDPError(f"JS 执行异常: {result['exceptionDetails']}")
remote_obj = result.get("result", {})
return remote_obj.get("value")
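def evaluate_function(self, function_body: str, *args: Any) -> Any:
"""Call a JavaScript function and return its result.
function_body is a complete function expression, e.g. `() => { return 1; }`.
Positional args are JSON-encoded and passed to the function.
"""
result = self._send_session(
"Runtime.evaluate",
{
"expression": f"({function_body})({', '.join(json.dumps(a) for a in args)})",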
"returnByValue": True,
"awaitPromise": False,
},
)
if "exceptionDetails" in result:
raise CDPError(f"JS 函数执行异常: {result['exceptionDetails']}")
remote_obj = result.get("result", {})
return remote_obj.get("value")
def query_selector(self, selector: str) -> str | None:
"""Find a single element; return its objectId or None."""
result = self._send_session(
"Runtime.evaluate",
{
"expression": f"document.querySelector({json.dumps(selector)})",
"returnByValue": False,
},
)
remote_obj = result.get("result", {})
if remote_obj.get("subtype") == "null" or remote_obj.get("type") == "undefined":
return None
return remote_obj.get("objectId")
def query_selector_all(self, selector: str) -> list[str]:
"""Find all matching elements; return a list of objectIds."""
# Ask JS for the match count, then fetch each element by index
count = self.evaluate(f"document.querySelectorAll({json.dumps(selector)}).length")
if not count:
return []
object_ids = []
for i in range(count):
result = self._send_session(
"Runtime.evaluate",
{
"expression": (f"document.querySelectorAll({json.dumps(selector)})[{i}]"),
"returnByValue": False,
},
)
obj = result.get("result", {})
oid = obj.get("objectId")
if oid:
object_ids.append(oid)
return object_ids
def has_element(self, selector: str) -> bool:
"""Check whether an element exists."""
return self.evaluate(f"document.querySelector({json.dumps(selector)}) !== null") is True
def wait_for_element(self, selector: str, timeout: float = 30.0) -> str:
"""Wait for an element to appear and return its objectId."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
oid = self.query_selector(selector)
if oid:
return oid
time.sleep(0.5)
raise ElementNotFoundError(selector)
def click_element(self, selector: str) -> None:
"""Click the element matching the selector (via CDP Input events, so isTrusted=true)."""
box = self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return null;
el.scrollIntoView({{block: 'center'}});
const rect = el.getBoundingClientRect();
return {{x: rect.left + rect.width / 2, y: rect.top + rect.height / 2}};
}})()
"""
)
if not box:
return
x = box["x"] + random.uniform(-3, 3)
y = box["y"] + random.uniform(-3, 3)
self.mouse_move(x, y)
time.sleep(random.uniform(0.03, 0.08))
self.mouse_click(x, y)
def input_text(self, selector: str, text: str) -> None:
"""Set the value of the element matching the selector and fire input/change events."""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return;
el.focus();
el.value = {json.dumps(text)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
el.dispatchEvent(new Event('change', {{bubbles: true}}));
}})()
"""
)
def input_content_editable(self, selector: str, text: str) -> None:
"""Type text into a contentEditable element (character by character via CDP, simulating real typing)."""
# 1. Focus the element
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (el) el.focus();
}})()
"""
)
time.sleep(0.1)
# 2. Select all and clear (Ctrl+A, then Backspace)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", "key": "a", "code": "KeyA", "modifiers": 2},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", "key": "a", "code": "KeyA", "modifiers": 2},
)
self._send_session(
"Input.dispatchKeyEvent",
{
"type": "keyDown",
"key": "Backspace",
"code": "Backspace",
"windowsVirtualKeyCode": 8,
},
)
self._send_session(
"Input.dispatchKeyEvent",
{
"type": "keyUp",
"key": "Backspace",
"code": "Backspace",
"windowsVirtualKeyCode": 8,
},
)
time.sleep(0.1)
# 3. Type character by character (random 30-80 ms gaps; newlines become Enter key presses)
for char in text:
if char == "\n":
self.press_key("Enter")
else:
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", "text": char},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", "text": char},
)
time.sleep(random.uniform(0.03, 0.08))
def get_element_text(self, selector: str) -> str | None:
"""Return the element's text content."""
return self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
return el ? el.textContent : null;
}})()
"""
)
def get_element_attribute(self, selector: str, attr: str) -> str | None:
"""Return the value of an element attribute."""
return self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
return el ? el.getAttribute({json.dumps(attr)}) : null;
}})()
"""
)
def get_elements_count(self, selector: str) -> int:
"""Return the number of matching elements."""
result = self.evaluate(f"document.querySelectorAll({json.dumps(selector)}).length")
return result if isinstance(result, int) else 0
def scroll_by(self, x: int, y: int) -> None:
"""Scroll the page by an offset."""
self.evaluate(f"window.scrollBy({x}, {y})")
def scroll_to(self, x: int, y: int) -> None:
"""Scroll to an absolute position."""
self.evaluate(f"window.scrollTo({x}, {y})")
def scroll_to_bottom(self) -> None:
"""Scroll to the bottom of the page."""
self.evaluate("window.scrollTo(0, document.body.scrollHeight)")
def scroll_element_into_view(self, selector: str) -> None:
"""Scroll the element into view."""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (el) el.scrollIntoView({{behavior: 'smooth', block: 'center'}});
}})()
"""
)
def scroll_nth_element_into_view(self, selector: str, index: int) -> None:
"""Scroll the Nth matching element into view."""
self.evaluate(
f"""
(() => {{
const els = document.querySelectorAll({json.dumps(selector)});
if (els[{index}]) els[{index}].scrollIntoView(
{{behavior: 'smooth', block: 'center'}}
);
}})()
"""
)
def get_scroll_top(self) -> int:
"""Return the current vertical scroll position."""
result = self.evaluate(
"window.pageYOffset || document.documentElement.scrollTop"
" || document.body.scrollTop || 0"
)
return int(result) if result else 0
def get_viewport_height(self) -> int:
"""Return the viewport height."""
result = self.evaluate("window.innerHeight")
return int(result) if result else 768
def set_file_input(self, selector: str, files: list[str]) -> None:
"""Set the files of a file input (via CDP DOM.setFileInputFiles)."""
# Resolve the nodeId first
doc = self._send_session("DOM.getDocument", {"depth": 0})
root_node_id = doc["root"]["nodeId"]
result = self._send_session(
"DOM.querySelector",
{"nodeId": root_node_id, "selector": selector},
)
node_id = result.get("nodeId", 0)
if node_id == 0:
raise ElementNotFoundError(selector)
self._send_session(
"DOM.setFileInputFiles",
{"nodeId": node_id, "files": files},
)
def dispatch_wheel_event(self, delta_y: float) -> None:
"""Dispatch a wheel event to trigger lazy loading."""
self.evaluate(
f"""
(() => {{
let target = document.querySelector('.note-scroller')
|| document.querySelector('.interaction-container')
|| document.documentElement;
const event = new WheelEvent('wheel', {{
deltaY: {delta_y},
deltaMode: 0,
bubbles: true,
cancelable: true,
view: window,
}});
target.dispatchEvent(event);
}})()
"""
)
def mouse_move(self, x: float, y: float) -> None:
"""Move the mouse."""
self._send_session(
"Input.dispatchMouseEvent",
{"type": "mouseMoved", "x": x, "y": y},
)
def mouse_click(self, x: float, y: float, button: str = "left") -> None:
"""Click at the given coordinates."""
self._send_session(
"Input.dispatchMouseEvent",
{"type": "mousePressed", "x": x, "y": y, "button": button, "clickCount": 1},
)
self._send_session(
"Input.dispatchMouseEvent",
{"type": "mouseReleased", "x": x, "y": y, "button": button, "clickCount": 1},
)
def type_text(self, text: str, delay_ms: int = 50) -> None:
"""Type text one character at a time."""
for char in text:
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", "text": char},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", "text": char},
)
if delay_ms > 0:
time.sleep(delay_ms / 1000.0)
def press_key(self, key: str) -> None:
"""Press and release the given key."""
key_map = {
"Enter": {"key": "Enter", "code": "Enter", "windowsVirtualKeyCode": 13},
"ArrowDown": {
"key": "ArrowDown",
"code": "ArrowDown",
"windowsVirtualKeyCode": 40,
},
"Tab": {"key": "Tab", "code": "Tab", "windowsVirtualKeyCode": 9},
}
info = key_map.get(key, {"key": key, "code": key})
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", **info},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", **info},
)
def inject_stealth(self) -> None:
"""Inject the anti-detection script."""
self._send_session(
"Page.addScriptToEvaluateOnNewDocument",
{"source": STEALTH_JS},
)
def remove_element(self, selector: str) -> None:
"""Remove a DOM element."""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (el) el.remove();
}})()
"""
)
def hover_element(self, selector: str) -> None:
"""Hover over the center of an element."""
box = self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return null;
const rect = el.getBoundingClientRect();
return {{x: rect.left + rect.width / 2, y: rect.top + rect.height / 2}};
}})()
"""
)
if box:
self.mouse_move(box["x"], box["y"])
def select_all_text(self, selector: str) -> None:
"""Select all text inside an input field."""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return;
el.focus();
el.select ? el.select() : document.execCommand('selectAll');
}})()
"""
)
class Browser:
"""Chrome browser controller over CDP."""
def __init__(self, host: str = "127.0.0.1", port: int = 9222) -> None:
self.host = host
self.port = port
self.base_url = f"http://{host}:{port}"
self._cdp: CDPClient | None = None
def connect(self) -> None:
"""Connect to Chrome DevTools."""
resp = requests.get(f"{self.base_url}/json/version", timeout=5)
resp.raise_for_status()
info = resp.json()
ws_url = info["webSocketDebuggerUrl"]
logger.info("连接到 Chrome: %s", ws_url)
self._cdp = CDPClient(ws_url)
def new_page(self, url: str = "about:blank") -> Page:
"""Create a new page."""
if not self._cdp:
self.connect()
assert self._cdp is not None
# Create the target
result = self._cdp.send("Target.createTarget", {"url": url})
target_id = result["targetId"]
# Attach to the target
result = self._cdp.send(
"Target.attachToTarget",
{"targetId": target_id, "flatten": True},
)
session_id = result["sessionId"]
page = Page(self._cdp, target_id, session_id)
# Inject the anti-detection script (must happen before the domains are enabled)
page.inject_stealth()
# Override the user agent
page._send_session(
"Emulation.setUserAgentOverride",
{"userAgent": REALISTIC_UA},
)
# Randomize the viewport (to mimic a real screen size)
page._send_session(
"Emulation.setDeviceMetricsOverride",
{
"width": random.randint(1366, 1920),
"height": random.randint(768, 1080),
"deviceScaleFactor": 1,
"mobile": False,
},
)
# Deny permission prompts (geolocation, notifications, etc.)
import contextlib
for perm in ("geolocation", "notifications", "midi", "camera", "microphone"):
with contextlib.suppress(CDPError):
self._cdp.send(
"Browser.setPermission",
{"permission": {"name": perm}, "setting": "denied"},
)
# Enable the required domains
page._send_session("Page.enable")
page._send_session("DOM.enable")
page._send_session("Runtime.enable")
return page
def get_existing_page(self) -> Page | None:
"""Return an existing page (the first page target that is not about:blank)."""
if not self._cdp:
self.connect()
assert self._cdp is not None
resp = requests.get(f"{self.base_url}/json", timeout=5)
targets = resp.json()
for target in targets:
if target.get("type") == "page" and target.get("url") != "about:blank":
target_id = target["id"]
result = self._cdp.send(
"Target.attachToTarget",
{"targetId": target_id, "flatten": True},
)
session_id = result["sessionId"]
page = Page(self._cdp, target_id, session_id)
page._send_session("Page.enable")
page._send_session("DOM.enable")
page._send_session("Runtime.enable")
page.inject_stealth()
return page
return None
def close_page(self, page: Page) -> None:
"""Close the page."""
import contextlib
if self._cdp:
with contextlib.suppress(CDPError):
self._cdp.send("Target.closeTarget", {"targetId": page.target_id})
def close(self) -> None:
"""Close the connection."""
if self._cdp:
self._cdp.close()
self._cdp = None
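Several `Page` helpers above build their JavaScript by interpolating `json.dumps(selector)` into an f-string, so the selector arrives in the page as a properly quoted string literal. A minimal standalone sketch of that quoting pattern follows; `build_remove_js` is a hypothetical name used only for illustration and is not part of this module.

```python
import json


def build_remove_js(selector: str) -> str:
    """Build a JS snippet that removes the first element matching selector."""
    # json.dumps yields a double-quoted literal with quotes and backslashes
    # escaped, which is also a valid JavaScript string literal.
    quoted = json.dumps(selector)
    return (
        "(() => { "
        f"const el = document.querySelector({quoted}); "
        "if (el) el.remove(); "
        "})()"
    )


if __name__ == "__main__":
    # The embedded double quotes are escaped instead of ending the literal.
    print(build_remove_js('a[title="x"]'))
```

Quoting through `json.dumps` rather than wrapping the value in hand-written quotes keeps selectors that contain quotes or backslashes from breaking out of the string.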
FILE:scripts/xhs/comment.py
"""Comment operations; counterpart of the Go xiaohongshu/comment_feed.go."""
from __future__ import annotations
import logging
from .cdp import Page
from .feed_detail import _check_end_container, _check_page_accessible, _get_comment_count
from .human import sleep_random
from .selectors import (
COMMENT_INPUT_FIELD,
COMMENT_INPUT_TRIGGER,
COMMENT_SUBMIT_BUTTON,
PARENT_COMMENT,
REPLY_BUTTON,
)
from .urls import make_feed_detail_url
logger = logging.getLogger(__name__)
def post_comment(page: Page, feed_id: str, xsec_token: str, content: str) -> None:
"""Post a comment on a feed.
Args:
page: CDP page object.
feed_id: Feed ID.
xsec_token: xsec_token.
content: Comment text.
Raises:
RuntimeError: Posting the comment failed.
"""
url = make_feed_detail_url(feed_id, xsec_token)
logger.info("打开 feed 详情页: %s", url)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
sleep_random(800, 1500)
_check_page_accessible(page)
# Click the area that opens the comment input
if not page.has_element(COMMENT_INPUT_TRIGGER):
raise RuntimeError("未找到评论输入框,该帖子可能不支持评论或网页端不可访问")
page.click_element(COMMENT_INPUT_TRIGGER)
sleep_random(400, 800)
# Enter the comment text (typed character by character via CDP)
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.input_content_editable(COMMENT_INPUT_FIELD, content)
sleep_random(600, 1200)
# Click submit
page.click_element(COMMENT_SUBMIT_BUTTON)
sleep_random(800, 1500)
logger.info("评论发送成功: feed=%s", feed_id)
def reply_comment(
page: Page,
feed_id: str,
xsec_token: str,
content: str,
comment_id: str = "",
user_id: str = "",
) -> None:
"""Reply to a specific comment.
The comment is located by comment_id or user_id, then replied to.
Args:
page: CDP page object.
feed_id: Feed ID.
xsec_token: xsec_token.
content: Reply text.
comment_id: Comment ID (preferred).
user_id: User ID (fallback).
Raises:
ValueError: Neither comment_id nor user_id was provided.
RuntimeError: The reply failed.
"""
if not comment_id and not user_id:
raise ValueError("comment_id 和 user_id 至少提供一个")
url = make_feed_detail_url(feed_id, xsec_token)
logger.info("打开 feed 详情页进行回复: %s", url)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
sleep_random(800, 1500)
_check_page_accessible(page)
sleep_random(1500, 2500)
# Locate the target comment
comment_found = _find_and_scroll_to_comment(page, comment_id, user_id)
if not comment_found:
raise RuntimeError(f"未找到评论 (commentID: {comment_id}, userID: {user_id})")
sleep_random(800, 1500)
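# Click the reply button. Without a comment_id the bare REPLY_BUTTON selector
# is used, which is not scoped to the comment located above.
reply_selector = f"#comment-{comment_id} {REPLY_BUTTON}" if comment_id else REPLY_BUTTON
page.click_element(reply_selector)
sleep_random(800, 1500)
# Enter the reply text (typed character by character via CDP)
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.input_content_editable(COMMENT_INPUT_FIELD, content)
sleep_random(600, 1200)
# Click submit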
page.click_element(COMMENT_SUBMIT_BUTTON)
sleep_random(1500, 2500)
logger.info("回复评论成功")
def _find_and_scroll_to_comment(
    page: Page,
    comment_id: str,
    user_id: str,
    max_attempts: int = 100,
) -> bool:
    """Find the target comment and scroll it into view."""
    logger.info("开始查找评论 - commentID: %s, userID: %s", comment_id, user_id)
    # Scroll to the comment section first
    page.scroll_element_into_view(".comments-container")
    sleep_random(800, 1500)
    last_count = 0
    stagnant = 0
    for attempt in range(max_attempts):
        # Stop when the end of the comment list is reached
        if _check_end_container(page):
            logger.info("已到达评论底部,未找到目标评论")
            break
        # Stagnation detection
        current_count = _get_comment_count(page)
        if current_count != last_count:
            last_count = current_count
            stagnant = 0
        else:
            stagnant += 1
        if stagnant >= 10:
            logger.info("评论数量停滞超过10次")
            break
        # Scroll to the last loaded comment
        if current_count > 0:
            page.scroll_nth_element_into_view(PARENT_COMMENT, current_count - 1)
            sleep_random(200, 500)
        # Keep scrolling
        page.evaluate("window.scrollBy(0, window.innerHeight * 0.8)")
        sleep_random(400, 800)
        # Look up by comment ID
        if comment_id:
            selector = f"#comment-{comment_id}"
            if page.has_element(selector):
                logger.info("通过 commentID 找到评论 (尝试 %d 次)", attempt + 1)
                page.scroll_element_into_view(selector)
                return True
        # Look up by user ID
        if user_id:
            # Pass user_id as a JS string literal and escape it with CSS.escape
            # instead of splicing the raw value into the selector.
            found = page.evaluate(
                f"""
                (() => {{
                    const sel = '[data-user-id="' + CSS.escape({_js_str(user_id)}) + '"]';
                    const els = document.querySelectorAll(
                        '.parent-comment, .comment-item, .comment'
                    );
                    for (const el of els) {{
                        if (el.querySelector(sel)) {{
                            el.scrollIntoView({{behavior: 'smooth', block: 'center'}});
                            return true;
                        }}
                    }}
                    return false;
                }})()
                """
            )
            if found:
                logger.info("通过 userID 找到评论 (尝试 %d 次)", attempt + 1)
                return True
        sleep_random(600, 1200)
    return False
def _js_str(s: str) -> str:
"""Convert a Python string to a JS string literal (quotes included)."""
import json
return json.dumps(s)
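The search loop in `_find_and_scroll_to_comment` stops when the number of loaded comments stays unchanged for too many rounds. The following is a minimal sketch of that stagnation counter, detached from the browser; `search_until_stagnant` and `FakeList` are hypothetical names introduced here for illustration, and the callables stand in for the real page queries.

```python
from typing import Callable


def search_until_stagnant(
    count_items: Callable[[], int],
    scroll_once: Callable[[], None],
    is_found: Callable[[], bool],
    max_attempts: int = 100,
    stagnant_limit: int = 10,
) -> bool:
    """Scroll until is_found() succeeds or the item count stops growing."""
    last_count = 0
    stagnant = 0
    for _ in range(max_attempts):
        current = count_items()
        if current != last_count:
            # New items appeared, so reset the stagnation counter.
            last_count = current
            stagnant = 0
        else:
            stagnant += 1
        if stagnant >= stagnant_limit:
            return False
        scroll_once()
        if is_found():
            return True
    return False


class FakeList:
    """In-memory stand-in for a lazily loaded comment list."""

    def __init__(self, total: int, target: int) -> None:
        self.loaded = 0
        self.total = total
        self.target = target
        self.scrolls = 0

    def count(self) -> int:
        return self.loaded

    def scroll(self) -> None:
        # Each scroll loads two more items until the list is exhausted.
        self.scrolls += 1
        self.loaded = min(self.total, self.loaded + 2)

    def found(self) -> bool:
        return self.loaded >= self.target
```

With `FakeList(total=4, target=9)` the target never loads, so the loop gives up once the count has stayed at 4 for `stagnant_limit` consecutive rounds instead of running all `max_attempts` iterations.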
FILE:scripts/xhs/cookies.py
"""Cookie file persistence; counterpart of the Go cookies/cookies.go."""
from __future__ import annotations
import os
from pathlib import Path
def get_cookies_file_path(account: str = "") -> str:
    """Return the path of the cookies file.

    Resolution order (as implemented below):
    1. Multi-account mode: ~/.xhs/accounts/{account}/cookies.json, when account is given
    2. cookies.json in the system temp directory, if it already exists (backward compatibility)
    3. The COOKIES_PATH environment variable
    4. ./cookies.json (local debugging)
    """
    if account:
        account_dir = Path.home() / ".xhs" / "accounts" / account
        account_dir.mkdir(parents=True, exist_ok=True)
        return str(account_dir / "cookies.json")
    # Legacy path
    import tempfile
    old_path = os.path.join(tempfile.gettempdir(), "cookies.json")
    if os.path.exists(old_path):
        return old_path
    # Environment variable
    env_path = os.getenv("COOKIES_PATH")
    if env_path:
        return env_path
    return "cookies.json"
def load_cookies(path: str) -> bytes | None:
    """Load cookies from a file; return None if the file does not exist."""
    try:
        with open(path, "rb") as f:
            return f.read()
    except FileNotFoundError:
        return None


def save_cookies(path: str, data: bytes) -> None:
    """Save cookies to a file, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as f:
        f.write(data)


def delete_cookies(path: str) -> None:
    """Delete the cookies file if it exists."""
    import contextlib
    with contextlib.suppress(FileNotFoundError):
        os.remove(path)
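The lookup order in `get_cookies_file_path` is easier to check when the filesystem and environment are passed in explicitly. A sketch under that assumption follows; `resolve_cookie_path` and its parameters are hypothetical and, unlike the real function, it does not create the account directory.

```python
from pathlib import Path
from typing import Callable, Mapping


def resolve_cookie_path(
    account: str,
    env: Mapping[str, str],
    home: Path,
    legacy_path: Path,
    exists: Callable[[Path], bool],
) -> Path:
    """Resolve the cookies path using the same precedence as above."""
    # 1. An explicit account always wins.
    if account:
        return home / ".xhs" / "accounts" / account / "cookies.json"
    # 2. A legacy file that already exists is reused.
    if exists(legacy_path):
        return legacy_path
    # 3. Then the COOKIES_PATH environment variable.
    env_path = env.get("COOKIES_PATH")
    if env_path:
        return Path(env_path)
    # 4. Finally a file in the working directory.
    return Path("cookies.json")
```

Note the consequence of step 2: a stale legacy file takes precedence over `COOKIES_PATH`, so the environment variable only applies once that file is gone.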
FILE:scripts/xhs/errors.py
"""Exception hierarchy for Xiaohongshu automation."""


class XHSError(Exception):
    """Base exception for Xiaohongshu automation."""


class NoFeedsError(XHSError):
    """No feeds data was captured."""

    def __init__(self) -> None:
        super().__init__("没有捕获到 feeds 数据")


class NoFeedDetailError(XHSError):
    """No feed detail data was captured."""

    def __init__(self) -> None:
        super().__init__("没有捕获到 feed 详情数据")


class NotLoggedInError(XHSError):
    """Not logged in."""

    def __init__(self) -> None:
        super().__init__("未登录,请先扫码登录")


class PageNotAccessibleError(XHSError):
    """The page is not accessible."""

    def __init__(self, reason: str) -> None:
        self.reason = reason
        super().__init__(f"笔记不可访问: {reason}")


class UploadTimeoutError(XHSError):
    """The upload timed out."""


class PublishError(XHSError):
    """Publishing failed."""


class TitleTooLongError(PublishError):
    """The title exceeds the length limit."""

    def __init__(self, current: str, maximum: str) -> None:
        self.current = current
        self.maximum = maximum
        super().__init__(f"当前输入长度为{current},最大长度为{maximum}")


class ContentTooLongError(PublishError):
    """The body text exceeds the length limit."""

    def __init__(self, current: str, maximum: str) -> None:
        self.current = current
        self.maximum = maximum
        super().__init__(f"当前输入长度为{current},最大长度为{maximum}")


class RateLimitError(XHSError):
    """Requests are too frequent; fetching the verification code failed."""

    def __init__(self) -> None:
        super().__init__("请求太频繁,验证码获取失败,请重启浏览器后重试")


class CDPError(XHSError):
    """CDP communication error."""


class ElementNotFoundError(XHSError):
    """A page element was not found."""

    def __init__(self, selector: str) -> None:
        self.selector = selector
        super().__init__(f"未找到元素: {selector}")
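Because the publish errors derive from `PublishError`, which in turn derives from `XHSError`, a caller has to test the most specific class first. The sketch below uses trimmed copies of three of the classes so it runs on its own; `describe` is a hypothetical helper and not part of this package.

```python
class XHSError(Exception):
    """Trimmed copy of the base exception."""


class PublishError(XHSError):
    """Trimmed copy of the publish failure."""


class TitleTooLongError(PublishError):
    """Trimmed copy that keeps the current/maximum attributes."""

    def __init__(self, current: str, maximum: str) -> None:
        self.current = current
        self.maximum = maximum
        super().__init__(f"{current}/{maximum}")


def describe(exc: Exception) -> str:
    """Map an exception to a short label, most specific class first."""
    if isinstance(exc, TitleTooLongError):
        return f"title too long ({exc.current}/{exc.maximum})"
    if isinstance(exc, PublishError):
        return "publish failed"
    if isinstance(exc, XHSError):
        return "automation error"
    return "unexpected error"
```

Reversing the checks would label every `TitleTooLongError` as a generic automation error, since `isinstance` also matches base classes.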
FILE:scripts/xhs/feed_detail.py
"""Feed detail and comment loading; counterpart of the Go xiaohongshu/feed_detail.go (867 lines)."""
from __future__ import annotations
import json
import logging
import random
import re
import time
from .cdp import Page
from .errors import NoFeedDetailError, PageNotAccessibleError
from .human import (
BUTTON_CLICK_INTERVAL,
DEFAULT_MAX_ATTEMPTS,
FINAL_SPRINT_PUSH_COUNT,
HUMAN_DELAY,
LARGE_SCROLL_TRIGGER,
MAX_CLICK_PER_ROUND,
MIN_SCROLL_DELTA,
POST_SCROLL,
REACTION_TIME,
READ_TIME,
SCROLL_WAIT,
SHORT_READ,
STAGNANT_LIMIT,
calculate_scroll_delta,
get_scroll_interval,
get_scroll_ratio,
sleep_random,
)
from .selectors import (
ACCESS_ERROR_WRAPPER,
END_CONTAINER,
NO_COMMENTS_TEXT,
PARENT_COMMENT,
SHOW_MORE_BUTTON,
)
from .types import (
CommentList,
CommentLoadConfig,
FeedDetail,
FeedDetailResponse,
)
from .urls import make_feed_detail_url
logger = logging.getLogger(__name__)
# Keywords indicating the page is inaccessible (matched against page text, so kept as-is)
_INACCESSIBLE_KEYWORDS = [
"当前笔记暂时无法浏览",
"该内容因违规已被删除",
"该笔记已被删除",
"内容不存在",
"笔记不存在",
"已失效",
"私密笔记",
"仅作者可见",
"因用户设置,你无法查看",
"因违规无法查看",
"Isn't Available",
"isn't available",
]
# Keywords indicating a QR-code verification prompt (the anti-scraping mechanism was triggered)
_SCAN_QRCODE_KEYWORDS = [
"扫码查看",
"打开小红书App扫码",
"请使用小红书App扫码",
]
_REPLY_COUNT_RE = re.compile(r"展开\s*(\d+)\s*条回复")
_TOTAL_COMMENT_RE = re.compile(r"共(\d+)条评论")
def get_feed_detail(
page: Page,
feed_id: str,
xsec_token: str,
load_all_comments: bool = False,
config: CommentLoadConfig | None = None,
) -> FeedDetailResponse:
"""Fetch feed details (including comments).
Args:
page: CDP page object.
feed_id: Feed ID.
xsec_token: xsec_token.
load_all_comments: Whether to load all comments.
config: Comment loading configuration.
Raises:
PageNotAccessibleError: The page is not accessible.
NoFeedDetailError: No detail data was obtained.
"""
if config is None:
config = CommentLoadConfig()
url = make_feed_detail_url(feed_id, xsec_token)
logger.info("打开 feed 详情页: %s", url)
logger.info(
"配置: 点击更多=%s, 回复阈值=%d, 最大评论数=%d, 滚动速度=%s",
config.click_more_replies,
config.max_replies_threshold,
config.max_comment_items,
config.scroll_speed,
)
# Navigate (with retries)
for attempt in range(3):
try:
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
break
except Exception as e:
logger.debug("页面导航重试 #%d: %s", attempt, e)
time.sleep(0.5 + random.random())
else:
raise RuntimeError("页面导航失败")
sleep_random(800, 1500)
# Check page accessibility (waits and retries automatically on QR-code verification)
_check_page_accessible(page, url)
# Load all comments
if load_all_comments:
try:
_load_all_comments(page, config)
except Exception as e:
logger.warning("加载全部评论失败: %s", e)
return _extract_feed_detail(page, feed_id)
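# ========== Page checks ==========
def _check_page_accessible(page: Page, url: str = "") -> None:
"""Check whether the page is accessible.
On QR-code verification: wait 10 seconds and revisit the page automatically; continue if the verification is gone, otherwise raise.
"""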
time.sleep(0.5)
text = page.get_element_text(ACCESS_ERROR_WRAPPER)
if not text:
return
text = text.strip()
# QR-code verification detected (anti-scraping triggered): wait, then retry
if _is_scan_qrcode_verification(text) and url:
logger.warning("触发小红书扫码验证,等待 10 秒后重新访问...")
time.sleep(10)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
retry_text = page.get_element_text(ACCESS_ERROR_WRAPPER)
if retry_text and _is_scan_qrcode_verification(retry_text.strip()):
raise PageNotAccessibleError(
"触发了小红书验证,需要在浏览器中扫码完成验证后重试。"
"这通常是小红书的反爬机制,请稍后再试或在 Chrome 中手动打开该笔记完成验证"
)
if not retry_text or not retry_text.strip():
logger.info("验证已消失,继续加载笔记")
return
# A different error remains after the retry; fall through to the keyword check below
text = retry_text.strip()
for kw in _INACCESSIBLE_KEYWORDS:
if kw in text:
raise PageNotAccessibleError(kw)
if text:
raise PageNotAccessibleError(text)
def _is_scan_qrcode_verification(text: str) -> bool:
"""Return whether the page text is a QR-code verification prompt."""
return any(kw in text for kw in _SCAN_QRCODE_KEYWORDS)
# ========== Data extraction ==========
_EXTRACT_DETAIL_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.note &&
window.__INITIAL_STATE__.note.noteDetailMap) {
return JSON.stringify(window.__INITIAL_STATE__.note.noteDetailMap);
}
return "";
})()
"""
def _extract_feed_detail(page: Page, feed_id: str) -> FeedDetailResponse:
"""Extract the feed detail from __INITIAL_STATE__."""
result = None
for _ in range(3):
result = page.evaluate(_EXTRACT_DETAIL_JS)
if result:
break
time.sleep(0.2)
if not result:
raise NoFeedDetailError()
note_detail_map = json.loads(result)
note_data = note_detail_map.get(feed_id)
if not note_data:
raise NoFeedDetailError()
return FeedDetailResponse(
note=FeedDetail.from_dict(note_data.get("note", {})),
comments=CommentList.from_dict(note_data.get("comments", {})),
)
# ========== Comment loading state machine ==========
def _load_all_comments(page: Page, config: CommentLoadConfig) -> None:
"""State machine that loads all comments."""
max_attempts = (
config.max_comment_items * 3 if config.max_comment_items > 0 else DEFAULT_MAX_ATTEMPTS
)
scroll_interval = get_scroll_interval(config.scroll_speed)
logger.info("开始加载评论...")
_scroll_to_comments_area(page)
sleep_random(*HUMAN_DELAY)
# Check whether there are no comments
if _check_no_comments(page):
logger.info("检测到无评论区域,跳过加载")
return
# State
last_count = 0
last_scroll_top = 0
stagnant_checks = 0
total_clicked = 0
total_skipped = 0
for attempt in range(max_attempts):
logger.debug("=== 尝试 %d/%d ===", attempt + 1, max_attempts)
# Check whether the bottom has been reached
if _check_end_container(page):
count = _get_comment_count(page)
logger.info(
"检测到 THE END,加载完成: %d 条评论, 点击: %d, 跳过: %d",
count,
total_clicked,
total_skipped,
)
return
# Periodically click the "show more replies" buttons
if config.click_more_replies and attempt % BUTTON_CLICK_INTERVAL == 0:
clicked, skipped = _click_show_more_buttons(page, config.max_replies_threshold)
total_clicked += clicked
total_skipped += skipped
if clicked > 0 or skipped > 0:
sleep_random(*READ_TIME)
# Second pass
c2, s2 = _click_show_more_buttons(page, config.max_replies_threshold)
total_clicked += c2
total_skipped += s2
if c2 > 0 or s2 > 0:
sleep_random(*SHORT_READ)
# Get the current comment count
current_count = _get_comment_count(page)
if current_count != last_count:
logger.info("评论增加: %d -> %d", last_count, current_count)
last_count = current_count
stagnant_checks = 0
else:
stagnant_checks += 1
# Check whether the target count has been reached
if config.max_comment_items > 0 and current_count >= config.max_comment_items:
logger.info("已达到目标评论数: %d/%d", current_count, config.max_comment_items)
return
# Scroll
if current_count > 0:
_scroll_to_last_comment(page)
sleep_random(*POST_SCROLL)
large_mode = stagnant_checks >= LARGE_SCROLL_TRIGGER
push_count = 1
if large_mode:
push_count = 3 + random.randint(0, 2)
scroll_delta, current_scroll_top = _human_scroll(
page, config.scroll_speed, large_mode, push_count
)
if scroll_delta < MIN_SCROLL_DELTA or current_scroll_top == last_scroll_top:
stagnant_checks += 1
else:
stagnant_checks = 0
last_scroll_top = current_scroll_top
# Handle stagnation
if stagnant_checks >= STAGNANT_LIMIT:
logger.info("停滞过多,尝试大冲刺...")
_human_scroll(page, config.scroll_speed, True, 10)
stagnant_checks = 0
time.sleep(scroll_interval)
# Final sprint
logger.info("达到最大尝试次数,最后冲刺...")
_human_scroll(page, config.scroll_speed, True, FINAL_SPRINT_PUSH_COUNT)
count = _get_comment_count(page)
logger.info("加载结束: %d 条评论, 点击: %d, 跳过: %d", count, total_clicked, total_skipped)
# ========== Scrolling ==========
def _human_scroll(
page: Page,
speed: str,
large_mode: bool,
push_count: int,
) -> tuple[int, int]:
"""Scroll in a human-like manner.
Returns:
(actual_delta, current_scroll_top)
"""
before_top = page.get_scroll_top()
viewport_height = page.get_viewport_height()
base_ratio = get_scroll_ratio(speed)
if large_mode:
base_ratio *= 2.0
actual_delta = 0
current_scroll_top = before_top
for i in range(max(1, push_count)):
scroll_delta = calculate_scroll_delta(viewport_height, base_ratio)
page.scroll_by(0, int(scroll_delta))
sleep_random(*SCROLL_WAIT)
current_scroll_top = page.get_scroll_top()
delta_this = current_scroll_top - before_top
actual_delta += delta_this
before_top = current_scroll_top
if i < push_count - 1:
sleep_random(*HUMAN_DELAY)
# If the page barely scrolled, force a jump to the bottom
if actual_delta < MIN_SCROLL_DELTA and push_count > 0:
page.scroll_to_bottom()
sleep_random(*POST_SCROLL)
current_scroll_top = page.get_scroll_top()
actual_delta = current_scroll_top - (before_top - actual_delta)
return actual_delta, current_scroll_top
def _scroll_to_comments_area(page: Page) -> None:
"""Scroll to the comment section."""
logger.info("滚动到评论区...")
page.scroll_element_into_view(".comments-container")
time.sleep(0.5)
# Trigger lazy loading
page.dispatch_wheel_event(100)
def _scroll_to_last_comment(page: Page) -> None:
"""Scroll to the last comment."""
count = page.get_elements_count(PARENT_COMMENT)
if count > 0:
page.scroll_nth_element_into_view(PARENT_COMMENT, count - 1)
# ========== DOM queries ==========
def _get_comment_count(page: Page) -> int:
"""Return the number of comments currently loaded."""
return page.get_elements_count(PARENT_COMMENT)
def _get_total_comment_count(page: Page) -> int:
"""Return the total comment count (parsed from "共N条评论")."""
text = page.get_element_text(".comments-container .total")
if not text:
return 0
match = _TOTAL_COMMENT_RE.search(text)
if match:
return int(match.group(1))
return 0
def _check_no_comments(page: Page) -> bool:
"""Check whether the empty-comments placeholder is shown."""
text = page.get_element_text(NO_COMMENTS_TEXT)
if not text:
return False
return "这是一片荒地" in text.strip()
def _check_end_container(page: Page) -> bool:
"""Check whether the bottom marker "THE END" has been reached."""
text = page.get_element_text(END_CONTAINER)
if not text:
return False
upper = text.strip().upper()
return "THE END" in upper or "THEEND" in upper
# ========== Button clicks ==========
def _click_show_more_buttons(page: Page, max_threshold: int) -> tuple[int, int]:
"""Click the "展开N条回复" (expand N replies) buttons.
Returns:
(clicked, skipped)
"""
count = page.get_elements_count(SHOW_MORE_BUTTON)
if count == 0:
return 0, 0
max_click = MAX_CLICK_PER_ROUND + random.randint(0, MAX_CLICK_PER_ROUND - 1)
clicked = 0
skipped = 0
for i in range(count):
if clicked >= max_click:
break
# Read the button text
text = page.evaluate(
f"document.querySelectorAll({json.dumps(SHOW_MORE_BUTTON)})[{i}]?.textContent || ''"
)
if not text:
continue
# Check whether this button should be skipped
if max_threshold > 0:
match = _REPLY_COUNT_RE.search(text)
if match:
reply_count = int(match.group(1))
if reply_count > max_threshold:
logger.debug(
"跳过 '%s'(回复数 %d > 阈值 %d)", text, reply_count, max_threshold
)
skipped += 1
continue
# Scroll to the button and click it
page.scroll_nth_element_into_view(SHOW_MORE_BUTTON, i)
sleep_random(*REACTION_TIME)
page.evaluate(f"document.querySelectorAll({json.dumps(SHOW_MORE_BUTTON)})[{i}]?.click()")
sleep_random(*READ_TIME)
clicked += 1
return clicked, skipped
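`_click_show_more_buttons` skips a button when the reply count parsed from its label exceeds `max_threshold`, and a threshold of 0 disables the check. A minimal sketch of just that decision follows; `should_skip_button` is a hypothetical helper, and the regular expression is copied from `_REPLY_COUNT_RE` above.

```python
import re

# Same pattern as _REPLY_COUNT_RE: matches labels such as "展开 25 条回复".
REPLY_COUNT_RE = re.compile(r"展开\s*(\d+)\s*条回复")


def should_skip_button(text: str, max_threshold: int) -> bool:
    """Return True when the label reports more replies than the threshold."""
    if max_threshold <= 0:
        # A non-positive threshold means "never skip".
        return False
    match = REPLY_COUNT_RE.search(text)
    # Labels without a parsable count are never skipped.
    return bool(match) and int(match.group(1)) > max_threshold
```

Labels that do not match the pattern fall through to a click, which matches the behaviour of the loop above.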
FILE:scripts/xhs/feeds.py
"""Home page feed list; counterpart of the Go xiaohongshu/feeds.go."""
from __future__ import annotations
import json
import logging
import time
from .cdp import Page
from .errors import NoFeedsError
from .types import Feed
from .urls import HOME_URL
logger = logging.getLogger(__name__)
# JS that extracts the feeds from __INITIAL_STATE__
_EXTRACT_FEEDS_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.feed &&
window.__INITIAL_STATE__.feed.feeds) {
const feeds = window.__INITIAL_STATE__.feed.feeds;
const feedsData = feeds.value !== undefined ? feeds.value : feeds._value;
if (feedsData) {
return JSON.stringify(feedsData);
}
}
return "";
})()
"""
def list_feeds(page: Page) -> list[Feed]:
"""Fetch the home page feed list.
Raises:
NoFeedsError: No feeds data was captured.
"""
page.navigate(HOME_URL)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
result = page.evaluate(_EXTRACT_FEEDS_JS)
if not result:
raise NoFeedsError()
feeds_data = json.loads(result)
return [Feed.from_dict(f) for f in feeds_data]
FILE:scripts/xhs/human.py
"""Human-behavior simulation parameters (delays, scrolling, hovering); counterpart of the constants in the Go feed_detail.go."""
import random
import time

# ========== Configuration constants ==========
DEFAULT_MAX_ATTEMPTS = 500
STAGNANT_LIMIT = 20
MIN_SCROLL_DELTA = 10
MAX_CLICK_PER_ROUND = 3
STAGNANT_CHECK_THRESHOLD = 2
LARGE_SCROLL_TRIGGER = 5
BUTTON_CLICK_INTERVAL = 3
FINAL_SPRINT_PUSH_COUNT = 15

# ========== Delay ranges (milliseconds) ==========
HUMAN_DELAY = (300, 700)
REACTION_TIME = (300, 800)
HOVER_TIME = (100, 300)
READ_TIME = (500, 1200)
SHORT_READ = (600, 1200)
SCROLL_WAIT = (100, 200)
POST_SCROLL = (300, 500)


def sleep_random(min_ms: int, max_ms: int) -> None:
    """Sleep for a random duration between min_ms and max_ms milliseconds."""
    if max_ms <= min_ms:
        time.sleep(min_ms / 1000.0)
        return
    delay = random.randint(min_ms, max_ms) / 1000.0
    time.sleep(delay)


def navigation_delay() -> None:
    """Wait a random time after page navigation to mimic human reading."""
    sleep_random(1000, 2500)


def get_scroll_interval(speed: str) -> float:
    """Return the scroll interval in seconds for the given speed."""
    if speed == "slow":
        return (1200 + random.randint(0, 300)) / 1000.0
    if speed == "fast":
        return (300 + random.randint(0, 100)) / 1000.0
    # normal
    return (600 + random.randint(0, 200)) / 1000.0


def get_scroll_ratio(speed: str) -> float:
    """Return the scroll ratio (fraction of the viewport) for the given speed."""
    if speed == "slow":
        return 0.5
    if speed == "fast":
        return 0.9
    return 0.7


def calculate_scroll_delta(viewport_height: int, base_ratio: float) -> float:
    """Compute the scroll distance: at least 400 px before a jitter of up to 50 px either way."""
    scroll_delta = viewport_height * (base_ratio + random.random() * 0.2)
    if scroll_delta < 400:
        scroll_delta = 400.0
    return scroll_delta + random.randint(-50, 50)


# Keywords indicating the page is inaccessible (matched against page text, so kept as-is)
INACCESSIBLE_KEYWORDS = [
    "当前笔记暂时无法浏览",
    "该内容因违规已被删除",
    "该笔记已被删除",
    "内容不存在",
    "笔记不存在",
    "已失效",
    "私密笔记",
    "仅作者可见",
    "因用户设置,你无法查看",
    "因违规无法查看",
]
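The range of values `calculate_scroll_delta` can return follows directly from its formula: the ratio is raised by at most 0.2, the result is clamped to at least 400, and a jitter of up to 50 is applied. The sketch below restates the function with an injectable random generator so the bounds can be checked; `scroll_delta` and `delta_bounds` are hypothetical names used only here.

```python
import random


def scroll_delta(viewport_height: int, base_ratio: float, rng: random.Random) -> float:
    """Same formula as calculate_scroll_delta, with an explicit RNG."""
    delta = viewport_height * (base_ratio + rng.random() * 0.2)
    delta = max(delta, 400.0)
    return delta + rng.randint(-50, 50)


def delta_bounds(viewport_height: int, base_ratio: float) -> tuple[float, float]:
    """Return the smallest and largest value scroll_delta can produce."""
    low = max(viewport_height * base_ratio, 400.0) - 50
    high = max(viewport_height * (base_ratio + 0.2), 400.0) + 50
    return low, high
```

For a short viewport such as 300 px the clamp dominates, so every scroll lands between 350 and 450 px regardless of the configured speed.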
FILE:scripts/xhs/like_favorite.py
"""Like/favorite operations; counterpart of the Go xiaohongshu/like_favorite.go."""
from __future__ import annotations
import json
import logging
import time
from .cdp import Page
from .errors import NoFeedDetailError
from .selectors import COLLECT_BUTTON, LIKE_BUTTON
from .types import ActionResult
from .urls import make_feed_detail_url
logger = logging.getLogger(__name__)
# JS that reads the interaction state from __INITIAL_STATE__
_GET_INTERACT_STATE_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.note &&
window.__INITIAL_STATE__.note.noteDetailMap) {
return JSON.stringify(window.__INITIAL_STATE__.note.noteDetailMap);
}
return "";
})()
"""
def _get_interact_state(page: Page, feed_id: str) -> tuple[bool, bool]:
"""Read the like/favorite state of a note.
Returns:
(liked, collected)
Raises:
NoFeedDetailError: The state could not be read.
"""
result = page.evaluate(_GET_INTERACT_STATE_JS)
if not result:
raise NoFeedDetailError()
note_detail_map = json.loads(result)
detail = note_detail_map.get(feed_id)
if not detail:
raise NoFeedDetailError()
interact = detail.get("note", {}).get("interactInfo", {})
return interact.get("liked", False), interact.get("collected", False)
def _prepare_page(page: Page, feed_id: str, xsec_token: str) -> None:
"""Navigate to the feed detail page."""
url = make_feed_detail_url(feed_id, xsec_token)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
# ========== Like ==========
def like_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""Like a note (idempotent: skipped if already liked)."""
_prepare_page(page, feed_id, xsec_token)
return _toggle_like(page, feed_id, target_liked=True)
def unlike_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""Remove a like (idempotent: skipped if not liked)."""
_prepare_page(page, feed_id, xsec_token)
return _toggle_like(page, feed_id, target_liked=False)
def _toggle_like(page: Page, feed_id: str, target_liked: bool) -> ActionResult:
"""Perform the like or unlike action."""
action_name = "点赞" if target_liked else "取消点赞"
try:
liked, _ = _get_interact_state(page, feed_id)
except NoFeedDetailError:
logger.warning("无法读取互动状态,直接点击")
liked = not target_liked  # Force the click
# Idempotency check
if liked == target_liked:
logger.info("feed %s 已%s,跳过", feed_id, action_name)
return ActionResult(feed_id=feed_id, success=True, message=f"已{action_name}")
# Click
page.click_element(LIKE_BUTTON)
time.sleep(3)
# Verify
try:
liked, _ = _get_interact_state(page, feed_id)
if liked == target_liked:
logger.info("feed %s %s成功", feed_id, action_name)
return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}成功")
except NoFeedDetailError:
pass
# Retry once
logger.warning("feed %s %s可能未成功,重试", feed_id, action_name)
page.click_element(LIKE_BUTTON)
time.sleep(2)
return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}已执行")
# ========== Favorite ==========
def favorite_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""Favorite a note (idempotent: skipped if already favorited)."""
_prepare_page(page, feed_id, xsec_token)
return _toggle_favorite(page, feed_id, target_collected=True)
def unfavorite_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""Remove a favorite (idempotent: skipped if not favorited)."""
_prepare_page(page, feed_id, xsec_token)
return _toggle_favorite(page, feed_id, target_collected=False)
def _toggle_favorite(page: Page, feed_id: str, target_collected: bool) -> ActionResult:
"""Perform the favorite or unfavorite action."""
action_name = "收藏" if target_collected else "取消收藏"
try:
_, collected = _get_interact_state(page, feed_id)
except NoFeedDetailError:
logger.warning("无法读取互动状态,直接点击")
collected = not target_collected
# Idempotency check
if collected == target_collected:
logger.info("feed %s 已%s,跳过", feed_id, action_name)
return ActionResult(feed_id=feed_id, success=True, message=f"已{action_name}")
# Click
page.click_element(COLLECT_BUTTON)
time.sleep(3)
# Verify
try:
_, collected = _get_interact_state(page, feed_id)
if collected == target_collected:
logger.info("feed %s %s成功", feed_id, action_name)
return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}成功")
except NoFeedDetailError:
pass
# Retry
logger.warning("feed %s %s可能未成功,重试", feed_id, action_name)
page.click_element(COLLECT_BUTTON)
time.sleep(2)
return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}已执行")
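The like and favorite helpers share one shape: read the current state, skip if it already equals the target, otherwise click and read again. A minimal sketch of that idempotent toggle, without the retry step, follows; `ensure_state` and `FakeButton` are hypothetical stand-ins for the page interactions and are not part of this module.

```python
from typing import Callable


def ensure_state(read: Callable[[], bool], click: Callable[[], None], target: bool) -> str:
    """Bring a toggle to the target state, clicking at most once."""
    if read() == target:
        # Already in the desired state: clicking would flip it the wrong way.
        return "skipped"
    click()
    return "changed" if read() == target else "unverified"


class FakeButton:
    """In-memory toggle that flips on every click."""

    def __init__(self, active: bool) -> None:
        self.active = active
        self.clicks = 0

    def read(self) -> bool:
        return self.active

    def click(self) -> None:
        self.clicks += 1
        self.active = not self.active
```

The state check before the click is what makes repeated calls safe: on a toggle button, an unconditional second click would undo the first.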
FILE:scripts/xhs/login.py
"""Login management; counterpart of the Go xiaohongshu/login.go."""
from __future__ import annotations
import base64
import logging
import os
import tempfile
import time
from .cdp import Page
from .errors import RateLimitError
from .human import sleep_random
from .selectors import (
AGREE_CHECKBOX,
AGREE_CHECKBOX_CHECKED,
CODE_INPUT,
GET_CODE_BUTTON,
LOGIN_CONTAINER,
LOGIN_ERR_MSG,
LOGIN_STATUS,
LOGOUT_MENU_ITEM,
LOGOUT_MORE_BUTTON,
PHONE_INPUT,
PHONE_LOGIN_SUBMIT,
QRCODE_IMG,
)
from .urls import EXPLORE_URL
logger = logging.getLogger(__name__)
def check_login_status(page: Page) -> bool:
"""Check the login status.
Returns:
True if logged in, False otherwise.
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
sleep_random(800, 1500)
return page.has_element(LOGIN_STATUS)
def fetch_qrcode(page: Page) -> tuple[str, bool]:
"""Fetch the login QR code.
Returns:
(qrcode_src, already_logged_in)
- If already logged in, returns ("", True)
- If not logged in, returns (qrcode_base64_or_url, False)
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
sleep_random(1500, 2500)
# Check whether already logged in
if page.has_element(LOGIN_STATUS):
return "", True
# Read the src of the QR-code image
src = page.get_element_attribute(QRCODE_IMG, "src")
if not src:
raise RuntimeError("二维码图片 src 为空")
return src, False
def save_qrcode_to_file(src: str) -> str:
"""Save a QR-code data URL as a temporary PNG file.
Args:
src: Data URL of the QR-code image (data:image/png;base64,...). A plain URL is rejected with ValueError.
Returns:
Absolute path of the saved file.
"""
prefix = "data:image/png;base64,"
if src.startswith(prefix):
img_data = base64.b64decode(src[len(prefix) :])
elif src.startswith("data:image/"):
# Handle other MIME types, e.g. data:image/jpeg;base64,...
_, encoded = src.split(",", 1)
img_data = base64.b64decode(encoded)
else:
# Not a data URL, so it cannot be saved
raise ValueError(f"不支持的二维码格式,需要 data URL: {src[:50]}...")
qr_dir = os.path.join(tempfile.gettempdir(), "xhs")
os.makedirs(qr_dir, exist_ok=True)
filepath = os.path.join(qr_dir, "login_qrcode.png")
with open(filepath, "wb") as f:
f.write(img_data)
logger.info("二维码已保存: %s", filepath)
return filepath
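def send_phone_code(page: Page, phone: str) -> bool:
"""Fill in the phone number and send the SMS verification code.
Intended for headless servers: everything goes through CDP, no QR-code scan needed.
Args:
page: CDP page object.
phone: Phone number without the country code, e.g. 13800138000.
Returns:
True if the code was sent, False if already logged in (no login needed).
Raises:
RuntimeError: The login form or the phone input was not found.
RateLimitError: The button did not switch to a countdown after the request.
"""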
page.navigate(EXPLORE_URL)
page.wait_for_load()
sleep_random(1500, 2500)
if page.has_element(LOGIN_STATUS):
return False
# Wait for the login dialog to appear
page.wait_for_element(LOGIN_CONTAINER, timeout=15.0)
sleep_random(500, 800)
# Click the phone input and type the number character by character
page.click_element(PHONE_INPUT)
sleep_random(200, 400)
page.type_text(phone, delay_ms=80)
sleep_random(500, 800)
# Tick the user agreement first, then request the verification code
if not page.has_element(AGREE_CHECKBOX_CHECKED):
page.click_element(AGREE_CHECKBOX)
sleep_random(300, 600)
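# Click "获取验证码" (get verification code)
page.click_element(GET_CODE_BUTTON)
sleep_random(2000, 2500)
# Check whether the button switched to a countdown (after a successful send its text contains the remaining seconds)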
btn_text = page.get_element_text(GET_CODE_BUTTON) or ""
if not any(ch.isdigit() for ch in btn_text):
raise RateLimitError()
logger.info("验证码已发送至 %s", phone[:3] + "****" + phone[-4:])
return True
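def submit_phone_code(page: Page, code: str) -> bool:
"""Fill in the SMS verification code and submit the login.
Args:
page: CDP page object.
code: The SMS verification code received.
Returns:
True if login succeeded, False on failure (timeout or wrong code).
"""
# Click the code input and type the code character by character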
page.click_element(CODE_INPUT)
sleep_random(300, 500)
page.type_text(code, delay_ms=100)
sleep_random(500, 800)
# Click the login button
page.click_element(PHONE_LOGIN_SUBMIT)
sleep_random(1000, 2000)
# Check for an error message
err = page.get_element_text(LOGIN_ERR_MSG)
if err and err.strip():
logger.warning("登录失败: %s", err.strip())
return False
return wait_for_login(page, timeout=30.0)
def logout(page: Page) -> bool:
"""Log out through the page UI (click "更多" (more), then "退出登录" (log out)).
Args:
page: CDP page object.
Returns:
True if logged out, False if not logged in or the operation failed.
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
sleep_random(800, 1500)
if not page.has_element(LOGIN_STATUS):
logger.info("当前未登录,无需退出")
return False
# Click the "更多" (more) button to expand the menu
page.click_element(LOGOUT_MORE_BUTTON)
sleep_random(500, 800)
# Wait for the logout menu item to appear, then click it
page.wait_for_element(LOGOUT_MENU_ITEM, timeout=5.0)
page.click_element(LOGOUT_MENU_ITEM)
sleep_random(1000, 1500)
logger.info("已退出登录")
return True
def wait_for_login(page: Page, timeout: float = 120.0) -> bool:
"""Wait for the login (QR-code scan or SMS code) to complete.
Args:
page: CDP page object.
timeout: Timeout in seconds.
Returns:
True if login succeeded, False on timeout.
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if page.has_element(LOGIN_STATUS):
logger.info("登录成功")
return True
time.sleep(0.5)
return False
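
The deadline-based polling loop in `wait_for_login` is the same pattern the later modules use for upload completion and `__INITIAL_STATE__` readiness. A minimal sketch of factoring it into one helper follows. `poll_until` is a hypothetical name that does not exist in the package, and the default interval is an assumption.

```python
import time
from typing import Callable


def poll_until(
    predicate: Callable[[], bool],
    timeout: float,
    interval: float = 0.5,  # assumed default, mirroring the 0.5 s sleeps above
) -> bool:
    """Call ``predicate`` repeatedly until it returns True or ``timeout`` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False
```

With such a helper, `wait_for_login` could plausibly reduce to `poll_until(lambda: page.has_element(LOGIN_STATUS), timeout)`, at the cost of losing the success log line unless the caller adds it.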
FILE:scripts/xhs/publish.py
"""Image-and-text note publishing, corresponding to Go xiaohongshu/publish.go (837 lines)."""
from __future__ import annotations
import json
import logging
import random
import re
import time
from .cdp import Page
from .errors import ContentTooLongError, PublishError, TitleTooLongError, UploadTimeoutError
from .selectors import (
CONTENT_EDITOR,
CONTENT_LENGTH_ERROR,
CREATOR_TAB,
DATETIME_INPUT,
FILE_INPUT,
IMAGE_PREVIEW,
ORIGINAL_SWITCH,
ORIGINAL_SWITCH_CARD,
POPOVER,
PUBLISH_BUTTON,
SCHEDULE_SWITCH,
TAG_FIRST_ITEM,
TAG_TOPIC_CONTAINER,
TITLE_INPUT,
TITLE_MAX_SUFFIX,
UPLOAD_CONTENT,
UPLOAD_INPUT,
VISIBILITY_DROPDOWN,
VISIBILITY_OPTIONS,
)
from .types import PublishImageContent
from .urls import PUBLISH_URL
logger = logging.getLogger(__name__)


def publish_image_content(page: Page, content: PublishImageContent) -> None:
    """Publish an image-and-text note (fill in the form, then click publish).

    Args:
        page: CDP page object.
        content: Content to publish.

    Raises:
        PublishError: Publishing failed.
        UploadTimeoutError: Image upload timed out.
        TitleTooLongError: The title is too long.
        ContentTooLongError: The body is too long.
    """
    fill_publish_form(page, content)
    click_publish_button(page)


def fill_publish_form(page: Page, content: PublishImageContent) -> None:
    """Fill in the image-and-text publish form without clicking publish.

    Args:
        page: CDP page object.
        content: Content to publish.

    Raises:
        PublishError: Filling in the form failed.
        UploadTimeoutError: Image upload timed out.
        TitleTooLongError: The title is too long.
        ContentTooLongError: The body is too long.
    """
    if not content.image_paths:
        raise PublishError("At least one image is required")
    # Navigate to the publish page.
    _navigate_to_publish_page(page)
    # Click the "上传图文" (upload image note) tab.
    _click_publish_tab(page, "上传图文")
    time.sleep(1)
    # Upload the images.
    _upload_images(page, content.image_paths)
    # Keep at most 10 tags.
    tags = content.tags[:10]
    if len(content.tags) > 10:
        logger.warning("More than 10 tags supplied; keeping the first 10")
    logger.info(
        "Publishing: title=%s, images=%d, tags=%d, schedule=%s, original=%s, visibility=%s",
        content.title,
        len(content.image_paths),
        len(tags),
        content.schedule_time,
        content.is_original,
        content.visibility,
    )
    # Fill in the form (without clicking publish).
    _fill_publish_form(
        page,
        content.title,
        content.content,
        tags,
        content.schedule_time,
        content.is_original,
        content.visibility,
    )


def click_publish_button(page: Page) -> None:
    """Click the publish button.

    Args:
        page: CDP page object.

    Raises:
        PublishError: The click failed.
    """
    page.click_element(PUBLISH_BUTTON)
    time.sleep(3)
    logger.info("Publish complete")


def save_as_draft(page: Page) -> None:
    """Save a draft by clicking the "暂存离开" (save and leave) button."""
    clicked = page.evaluate(
        """
        (() => {
            const buttons = document.querySelectorAll('button.custom-button');
            for (const btn of buttons) {
                if (btn.textContent.trim() === '暂存离开') {
                    btn.click();
                    return true;
                }
            }
            return false;
        })()
        """
    )
    if clicked:
        time.sleep(2)
        logger.info("Clicked '暂存离开'; content saved to the drafts box")
    else:
        logger.warning("'暂存离开' button not found")
        raise PublishError("'暂存离开' (save and leave) button not found")


# ========== Page navigation ==========


def _navigate_to_publish_page(page: Page) -> None:
    """Navigate to the publish page."""
    page.navigate(PUBLISH_URL)
    page.wait_for_load(timeout=300)
    time.sleep(3)
    page.wait_dom_stable()
    time.sleep(2)


def _click_publish_tab(page: Page, tab_name: str) -> None:
    """Click a tab on the publish page (e.g. "上传图文" or "上传视频")."""
    deadline = time.monotonic() + 15
    while time.monotonic() < deadline:
        # Look for the matching tab (several DOM structures are supported).
        found = page.evaluate(
            f"""
            (() => {{
                // Strategy 1: look for div.creator-tab, skipping hidden elements.
                let tabs = document.querySelectorAll({json.dumps(CREATOR_TAB)});
                for (const tab of tabs) {{
                    const titleSpan = tab.querySelector('span.title');
                    const tabText = titleSpan ? titleSpan.textContent.trim() : tab.textContent.trim();
                    if (tabText === {json.dumps(tab_name)}) {{
                        const rect = tab.getBoundingClientRect();
                        const style = window.getComputedStyle(tab);
                        // Skip elements that are hidden or moved out of the viewport.
                        if (rect.width === 0 || rect.height === 0) continue;
                        if (rect.left < 0 || rect.top < 0) continue;
                        if (style.display === 'none' || style.visibility === 'hidden') continue;
                        const x = rect.left + rect.width / 2;
                        const y = rect.top + rect.height / 2;
                        const target = document.elementFromPoint(x, y);
                        if (target === tab || tab.contains(target)) {{
                            tab.click();
                            return 'clicked';
                        }}
                        return 'blocked';
                    }}
                }}
                // Strategy 2: look for any leaf element whose text equals the target.
                const allElements = document.querySelectorAll('*');
                for (const el of allElements) {{
                    if (el.children.length === 0 && el.textContent.trim() === {json.dumps(tab_name)}) {{
                        const rect = el.getBoundingClientRect();
                        const style = window.getComputedStyle(el);
                        if (rect.width === 0 || rect.height === 0) continue;
                        if (rect.left < 0 || rect.top < 0) continue;
                        if (style.display === 'none' || style.visibility === 'hidden') continue;
                        el.click();
                        return 'clicked';
                    }}
                }}
                return 'not_found';
            }})()
            """
        )
        if found == "clicked":
            return
        if found == "blocked":
            # Something covers the tab; try to remove the popover.
            _remove_pop_cover(page)
        time.sleep(0.2)
    # Debugging aid: dump what the page actually contains before failing.
    debug_info = page.evaluate("""
        (() => {
            const creatorTabs = document.querySelectorAll('div.creator-tab');
            const tabTexts = Array.from(creatorTabs).map(t => ({
                text: t.textContent.trim(),
                html: t.outerHTML.substring(0, 200)
            }));
            const url = window.location.href;
            return JSON.stringify({url, tabCount: creatorTabs.length, tabs: tabTexts});
        })()
    """)
    logger.error("Debug info: %s", debug_info)
    raise PublishError(f"Publish tab not found - {tab_name}")


def _remove_pop_cover(page: Page) -> None:
    """Remove a popover that is covering the page."""
    if page.has_element(POPOVER):
        page.remove_element(POPOVER)
    # Click an empty area of the page.
    x = 380 + random.randint(0, 100)
    y = 20 + random.randint(0, 60)
    page.mouse_click(float(x), float(y))


# ========== Image upload ==========


def _upload_images(page: Page, image_paths: list[str]) -> None:
    """Upload the images one at a time."""
    import os

    valid_paths = [p for p in image_paths if os.path.exists(p)]
    if not valid_paths:
        raise PublishError("No valid image files")
    for i, path in enumerate(valid_paths):
        # The first upload uses the dedicated upload input; later ones use the generic file input.
        selector = UPLOAD_INPUT if i == 0 else FILE_INPUT
        logger.info("Uploading image %d: %s", i + 1, path)
        page.set_file_input(selector, [path])
        _wait_for_upload_complete(page, i + 1)
        time.sleep(1)


def _wait_for_upload_complete(page: Page, expected_count: int) -> None:
    """Wait until ``expected_count`` image previews are present."""
    max_wait = 60.0
    start = time.monotonic()
    while time.monotonic() - start < max_wait:
        count = page.get_elements_count(IMAGE_PREVIEW)
        if count >= expected_count:
            logger.info("Image upload complete: %d", count)
            return
        time.sleep(0.5)
    raise UploadTimeoutError(f"Upload of image {expected_count} timed out (60s)")


# ========== Form submission ==========


def _extract_hashtags_from_content(content: str, tags: list[str]) -> tuple[str, list[str]]:
    """Extract a trailing hashtag line from the body and merge it into ``tags``.

    Returns:
        (cleaned_content, merged_tags)
    """
    lines = content.rstrip().split("\n")
    # Check whether the last line consists solely of #tag tokens.
    if lines:
        last_line = lines[-1].strip()
        # [^\s#]+ keeps adjacent tags ("#a#b") separate and avoids the
        # exponential backtracking that (#\S+\s*)+ permits on a non-matching line.
        hashtag_pattern = re.compile(r"^(#[^\s#]+\s*)+$")
        if hashtag_pattern.match(last_line):
            # Extract the hashtags.
            extracted = re.findall(r"#([^\s#]+)", last_line)
            # Merge into tags, skipping duplicates.
            existing = {t.lstrip("#") for t in tags}
            merged = list(tags)
            for t in extracted:
                if t not in existing:
                    merged.append(t)
                    existing.add(t)
            # Drop the last line from the body.
            cleaned = "\n".join(lines[:-1]).rstrip()
            logger.info(
                "Extracted %d tags from the end of the body; %d after merging",
                len(extracted),
                len(merged),
            )
            return cleaned, merged
    return content, list(tags)


def _fill_publish_form(
    page: Page,
    title: str,
    content: str,
    tags: list[str],
    schedule_time: str | None,
    is_original: bool,
    visibility: str,
) -> None:
    """Fill in the form (without clicking publish)."""
    # Pull a trailing hashtag line out of the body and merge it into tags.
    content, tags = _extract_hashtags_from_content(content, tags)
    # Title
    page.input_text(TITLE_INPUT, title)
    time.sleep(0.5)
    _check_title_max_length(page)
    logger.info("Title length check passed")
    time.sleep(1)
    # Body
    content_selector = _find_content_element(page)
    page.input_content_editable(content_selector, content)
    # Click back on the title (improves stability).
    time.sleep(1)
    page.click_element(TITLE_INPUT)
    logger.info("Clicked back on the title input")
    # Tags
    if tags:
        _input_tags(page, content_selector, tags)
    time.sleep(1)
    _check_content_max_length(page)
    logger.info("Body length check passed")
    # Scheduled publishing
    if schedule_time:
        _set_schedule_publish(page, schedule_time)
    # Visibility
    _set_visibility(page, visibility)
    # Original-content declaration (best effort: a failure is logged, not raised)
    if is_original:
        try:
            _set_original(page)
            logger.info("Declared as original content")
        except Exception as e:
            logger.warning("Failed to set the original-content declaration: %s", e)
    logger.info("Form filled in; waiting for publish confirmation")
def _find_content_element(page: Page) -> str:
"""查找内容输入框(兼容两种 UI)。"""
if page.has_element(CONTENT_EDITOR):
return CONTENT_EDITOR
# 查找带 placeholder 的 p 元素的 textbox 父元素
found = page.evaluate(
"""
(() => {
const ps = document.querySelectorAll('p');
for (const p of ps) {
const placeholder = p.getAttribute('data-placeholder');
if (placeholder && placeholder.includes('输入正文描述')) {
let current = p;
for (let i = 0; i < 5; i++) {
current = current.parentElement;
if (!current) break;
if (current.getAttribute('role') === 'textbox') {
return 'found';
}
}
}
}
return '';
})()
"""
)
if found == "found":
return "[role='textbox']"
raise PublishError("没有找到内容输入框")


def _check_title_max_length(page: Page) -> None:
    """Raise if the page shows the title over-limit indicator."""
    text = page.get_element_text(TITLE_MAX_SUFFIX)
    if text:
        # The indicator reads "current/max".
        parts = text.split("/")
        if len(parts) == 2:
            raise TitleTooLongError(parts[0], parts[1])
        raise TitleTooLongError(text, "?")


def _check_content_max_length(page: Page) -> None:
    """Raise if the page shows the body over-limit indicator."""
    text = page.get_element_text(CONTENT_LENGTH_ERROR)
    if text:
        # The indicator reads "current/max".
        parts = text.split("/")
        if len(parts) == 2:
            raise ContentTooLongError(parts[0], parts[1])
        raise ContentTooLongError(text, "?")


# ========== Tag input ==========


def _input_tags(page: Page, content_selector: str, tags: list[str]) -> None:
    """Type the tags into the body editor."""
    time.sleep(1)
    # Click the body editor first so focus is on the body, not the title.
    page.click_element(content_selector)
    time.sleep(0.3)
    # Move the cursor to the end of the body (20 ArrowDown presses).
    for _ in range(20):
        page.press_key("ArrowDown")
        time.sleep(0.01)
    # Press Enter twice to start a new paragraph.
    page.press_key("Enter")
    page.press_key("Enter")
    time.sleep(1)
    for tag in tags:
        tag = tag.lstrip("#")
        _input_single_tag(page, content_selector, tag)


def _input_single_tag(page: Page, content_selector: str, tag: str) -> None:
    """Type a single tag and pick its suggestion if one appears."""
    # Type "#".
    page.type_text("#", delay_ms=0)
    time.sleep(0.3)
    # Type the tag character by character with random gaps to mimic real typing.
    for char in tag:
        page.type_text(char, delay_ms=0)
        time.sleep(random.uniform(0.05, 0.12))
    # Wait up to 3 seconds for the tag suggestion list.
    deadline = time.monotonic() + 3.0
    clicked = False
    while time.monotonic() < deadline:
        time.sleep(0.5)
        if page.has_element(TAG_TOPIC_CONTAINER):
            item_selector = f"{TAG_TOPIC_CONTAINER} {TAG_FIRST_ITEM}"
            if page.has_element(item_selector):
                page.click_element(item_selector)
                logger.info("Clicked tag suggestion: %s", tag)
                clicked = True
                break
    if not clicked:
        # No suggestion appeared; finish the tag with a space.
        logger.warning("No tag suggestion found; typing a space instead: %s", tag)
        page.type_text(" ", delay_ms=0)
    time.sleep(0.8)


# ========== Scheduled publishing ==========


def _set_schedule_publish(page: Page, schedule_time: str) -> None:
    """Enable scheduled publishing and set the publish time."""
    from datetime import datetime

    # Parse the ISO 8601 timestamp.
    try:
        dt = datetime.fromisoformat(schedule_time)
    except ValueError as e:
        raise PublishError(f"Invalid scheduled publish time format: {e}") from e
    # Click the scheduled-publish switch.
    page.click_element(SCHEDULE_SWITCH)
    time.sleep(0.8)
    # Set the date and time.
    datetime_str = dt.strftime("%Y-%m-%d %H:%M")
    page.select_all_text(DATETIME_INPUT)
    page.input_text(DATETIME_INPUT, datetime_str)
    time.sleep(0.5)
    logger.info("Scheduled publish time set: %s", datetime_str)
# ========== 可见范围 ==========
def _set_visibility(page: Page, visibility: str) -> None:
"""设置可见范围。"""
if not visibility or visibility == "公开可见":
logger.info("可见范围: 公开可见(默认)")
return
supported = {"仅自己可见", "仅互关好友可见"}
if visibility not in supported:
raise PublishError(
f"不支持的可见范围: {visibility},支持: 公开可见、仅自己可见、仅互关好友可见"
)
# 点击下拉框
page.click_element(VISIBILITY_DROPDOWN)
time.sleep(0.5)
# 查找并点击目标选项
clicked = page.evaluate(
f"""
(() => {{
const opts = document.querySelectorAll({json.dumps(VISIBILITY_OPTIONS)});
for (const opt of opts) {{
if (opt.textContent.includes({json.dumps(visibility)})) {{
opt.click();
return true;
}}
}}
return false;
}})()
"""
)
if not clicked:
raise PublishError(f"未找到可见范围选项: {visibility}")
logger.info("已设置可见范围: %s", visibility)
time.sleep(0.2)
# ========== 原创声明 ==========
def _set_original(page: Page) -> None:
"""设置原创声明。"""
# 查找原创声明卡片并点击开关
result = page.evaluate(
f"""
(() => {{
const cards = document.querySelectorAll({json.dumps(ORIGINAL_SWITCH_CARD)});
for (const card of cards) {{
if (!card.textContent.includes('原创声明')) continue;
const sw = card.querySelector({json.dumps(ORIGINAL_SWITCH)});
if (!sw) continue;
const input = sw.querySelector('input[type="checkbox"]');
if (input && input.checked) return 'already_on';
sw.click();
return 'clicked';
}}
return 'not_found';
}})()
"""
)
if result == "already_on":
logger.info("原创声明已开启")
return
if result == "not_found":
raise PublishError("未找到原创声明选项")
time.sleep(0.5)
# 处理确认弹窗
_confirm_original_declaration(page)
def _confirm_original_declaration(page: Page) -> None:
"""处理原创声明确认弹窗。"""
time.sleep(0.8)
# 勾选 checkbox
page.evaluate(
"""
(() => {
const footers = document.querySelectorAll('div.footer');
for (const footer of footers) {
if (!footer.textContent.includes('原创声明须知')) continue;
const cb = footer.querySelector('div.d-checkbox input[type="checkbox"]');
if (cb && !cb.checked) cb.click();
return;
}
})()
"""
)
time.sleep(0.5)
# 点击声明原创按钮
result = page.evaluate(
"""
(() => {
const footers = document.querySelectorAll('div.footer');
for (const footer of footers) {
if (!footer.textContent.includes('声明原创')) continue;
const btn = footer.querySelector('button.custom-button');
if (btn) {
if (btn.classList.contains('disabled') || btn.disabled) {
const cb = footer.querySelector('div.d-checkbox input[type="checkbox"]');
if (cb && !cb.checked) cb.click();
return 'button_disabled';
}
btn.click();
return 'clicked';
}
}
return 'button_not_found';
})()
"""
)
if result == "button_not_found":
raise PublishError("未找到声明原创按钮")
if result == "button_disabled":
raise PublishError("声明原创按钮仍处于禁用状态")
logger.info("已成功点击声明原创按钮")
time.sleep(0.3)
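
The trailing-hashtag handling in `_extract_hashtags_from_content` can be tried without a browser. The sketch below is a standalone illustration rather than the module's code: `split_trailing_hashtags` is a hypothetical name, it skips the merge with an existing tag list, and it assumes a tag token is `#` followed by characters that are neither whitespace nor `#`.

```python
from __future__ import annotations

import re

# Assumed token rule for this sketch: "#" plus one or more non-space, non-"#" characters.
_TAG_LINE = re.compile(r"^(#[^\s#]+\s*)+$")
_TAG_TOKEN = re.compile(r"#([^\s#]+)")


def split_trailing_hashtags(content: str) -> tuple[str, list[str]]:
    """Return (body, tags), moving a final all-hashtag line out of the body."""
    lines = content.rstrip().split("\n")
    last_line = lines[-1].strip()
    if not _TAG_LINE.match(last_line):
        # The last line is ordinary text, so leave the content untouched.
        return content, []
    tags = _TAG_TOKEN.findall(last_line)
    body = "\n".join(lines[:-1]).rstrip()
    return body, tags


body, tags = split_trailing_hashtags("Weekend hike notes\n#hiking #outdoors")
print(body)  # Weekend hike notes
print(tags)  # ['hiking', 'outdoors']
```

Only the final line is inspected, so hashtags embedded in the middle of the body stay where they are.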
FILE:scripts/xhs/publish_long_article.py
"""Long-article publishing mode, modeled on the long-article workflow in cdp_publish.py."""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from .cdp import Page
from .errors import PublishError
from .publish import _click_publish_tab, _find_content_element, _navigate_to_publish_page
from .selectors import (
AUTO_FORMAT_BUTTON_TEXT,
CONTENT_EDITOR,
LONG_ARTICLE_TITLE,
NEW_CREATION_BUTTON_TEXT,
NEXT_STEP_BUTTON_TEXT,
TEMPLATE_CARD,
TEMPLATE_TITLE,
)
logger = logging.getLogger(__name__)
# 等待常量
_AUTO_FORMAT_WAIT = 3.0
_TEMPLATE_WAIT_ROUNDS = 15
_PAGE_LOAD_WAIT = 3.0
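

def publish_long_article(
    page: Page,
    title: str,
    content: str,
    image_paths: list[str] | None = None,
) -> list[str]:
    """Start a long article and run auto-format.

    Steps: navigate, click "写长文", click "新的创作", fill in the title and
    body, then click "一键排版". Returns the names of the available templates.

    Args:
        page: CDP page object.
        title: Article title.
        content: Article body (paragraphs separated by newlines).
        image_paths: Optional list of image paths to insert into the editor.

    Returns:
        Names of the available templates.

    Raises:
        PublishError: An operation failed.
    """
    # 1. Navigate to the publish page.
    _navigate_to_publish_page(page)
    # 2. Click the "写长文" (write long article) tab.
    _click_publish_tab(page, "写长文")
    time.sleep(1)
    # 3. Click "新的创作" (new creation).
    _click_new_creation(page)
    # 4. Fill in the title (a textarea).
    _fill_long_title(page, title)
    # 5. Fill in the body (TipTap editor).
    _fill_long_content(page, content)
    # 6. Optionally insert images into the editor.
    if image_paths:
        _insert_images_to_editor(page, image_paths)
    # 7. Click "一键排版" (auto-format).
    _click_auto_format(page)
    # 8. Wait for the templates to load and return their names.
    _wait_for_templates(page)
    template_names = get_template_names(page)
    logger.info("Templates loaded: %s", template_names)
    return template_names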
def get_template_names(page: Page) -> list[str]:
"""获取当前可用的排版模板名称列表。
Args:
page: CDP 页面对象。
Returns:
模板名称列表。
"""
names = page.evaluate(
f"""
(() => {{
const cards = document.querySelectorAll({json.dumps(TEMPLATE_CARD)});
const names = [];
for (const card of cards) {{
const title = card.querySelector({json.dumps(TEMPLATE_TITLE)});
names.push(title ? title.textContent.trim() : 'Template ' + names.length);
}}
return names;
}})()
"""
)
return names or []
def select_template(page: Page, template_name: str) -> bool:
"""选择指定名称的排版模板。
Args:
page: CDP 页面对象。
template_name: 模板名称。
Returns:
是否成功选择。
"""
clicked = page.evaluate(
f"""
(() => {{
const cards = document.querySelectorAll({json.dumps(TEMPLATE_CARD)});
for (const card of cards) {{
const title = card.querySelector({json.dumps(TEMPLATE_TITLE)});
if (title && title.textContent.trim() === {json.dumps(template_name)}) {{
card.click();
return true;
}}
}}
return false;
}})()
"""
)
if clicked:
logger.info("已选择模板: %s", template_name)
time.sleep(1)
else:
logger.warning("未找到模板: %s", template_name)
return bool(clicked)
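

def click_next_and_fill_description(page: Page, description: str) -> None:
    """Click "下一步" (next), then fill in the description on the publish page.

    The publish page has its own body editor, which must be filled separately.
    A description longer than 1000 characters is truncated to its first 800.

    Args:
        page: CDP page object.
        description: Description text for the publish page.

    Raises:
        PublishError: An operation failed.
    """
    # Click "下一步" (next).
    _click_button_by_text(page, NEXT_STEP_BUTTON_TEXT)
    time.sleep(_PAGE_LOAD_WAIT)
    # Fill in the description on the publish page.
    if description:
        # Over the 1000-character limit: keep the first 800 characters.
        if len(description) > 1000:
            description = description[:800]
            logger.warning("Description exceeded 1000 characters; truncated to 800")
        content_selector = _find_content_element(page)
        page.input_content_editable(content_selector, description)
        logger.info("Publish-page description filled in")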
# ========== 内部辅助函数 ==========
def _click_new_creation(page: Page) -> None:
"""点击"新的创作"按钮。"""
_click_button_by_text(page, NEW_CREATION_BUTTON_TEXT)
time.sleep(2)
page.wait_dom_stable()
logger.info("已点击'新的创作'")
def _fill_long_title(page: Page, title: str) -> None:
"""填写长文标题(textarea,需使用 native setter)。"""
page.wait_for_element(LONG_ARTICLE_TITLE, timeout=10)
page.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(LONG_ARTICLE_TITLE)});
if (!el) return false;
const nativeSetter = Object.getOwnPropertyDescriptor(
window.HTMLTextAreaElement.prototype, 'value'
).set;
el.focus();
nativeSetter.call(el, {json.dumps(title)});
el.dispatchEvent(new Event('input', {{ bubbles: true }}));
el.dispatchEvent(new Event('change', {{ bubbles: true }}));
return true;
}})()
"""
)
logger.info("已填写长文标题: %s", title[:20])
time.sleep(0.5)
def _fill_long_content(page: Page, content: str) -> None:
"""填写长文正文(TipTap/ProseMirror 编辑器)。"""
content_selector = CONTENT_EDITOR
if not page.has_element(CONTENT_EDITOR):
content_selector = _find_content_element(page)
page.input_content_editable(content_selector, content)
logger.info("已填写长文正文 (%d 字)", len(content))
time.sleep(1)
def _insert_images_to_editor(page: Page, image_paths: list[str]) -> None:
"""将图片插入到编辑器中。"""
for img_path in image_paths:
file_uri = Path(img_path).resolve().as_uri()
page.evaluate(
f"""
(() => {{
const editor = document.querySelector({json.dumps(CONTENT_EDITOR)});
if (!editor) return false;
const img = document.createElement('img');
img.src = {json.dumps(file_uri)};
editor.appendChild(img);
editor.dispatchEvent(new Event('input', {{ bubbles: true }}));
return true;
}})()
"""
)
logger.info("已插入 %d 张图片到编辑器", len(image_paths))
time.sleep(1)
def _click_auto_format(page: Page) -> None:
"""点击"一键排版"按钮。"""
_click_button_by_text(page, AUTO_FORMAT_BUTTON_TEXT)
logger.info("已点击'一键排版',等待模板加载...")
time.sleep(_AUTO_FORMAT_WAIT)


def _wait_for_templates(page: Page) -> bool:
    """Wait for template cards to appear; return False on timeout."""
    for _ in range(_TEMPLATE_WAIT_ROUNDS):
        count = page.get_elements_count(TEMPLATE_CARD)
        if count and count > 0:
            logger.info("Found %d template cards", count)
            return True
        time.sleep(1)
    logger.warning("Timed out waiting for template cards")
    return False
def _click_button_by_text(page: Page, text: str) -> None:
"""通过文本内容查找并点击按钮(通用方法)。"""
clicked = page.evaluate(
f"""
(() => {{
const elems = document.querySelectorAll(
'button, [role="button"], span, div, a, [class*="btn"]'
);
for (const el of elems) {{
if (el.textContent.trim() === {json.dumps(text)}) {{
const rect = el.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) continue;
el.click();
return true;
}}
}}
return false;
}})()
"""
)
if not clicked:
raise PublishError(f"未找到'{text}'按钮,页面结构可能已变化")
FILE:scripts/xhs/publish_video.py
"""Video publishing, corresponding to Go xiaohongshu/publish_video.go."""
from __future__ import annotations
import logging
import os
import time
from .cdp import Page
from .errors import PublishError, UploadTimeoutError
from .publish import (
_click_publish_tab,
_find_content_element,
_input_tags,
_navigate_to_publish_page,
_set_schedule_publish,
_set_visibility,
)
from .selectors import (
FILE_INPUT,
PUBLISH_BUTTON,
TITLE_INPUT,
UPLOAD_INPUT,
)
from .types import PublishVideoContent
logger = logging.getLogger(__name__)
def publish_video_content(page: Page, content: PublishVideoContent) -> None:
"""发布视频内容(填写表单 + 点击发布)。
Args:
page: CDP 页面对象。
content: 视频发布内容。
Raises:
PublishError: 发布失败。
UploadTimeoutError: 上传/处理超时。
"""
fill_publish_video_form(page, content)
click_publish_video_button(page)
def fill_publish_video_form(page: Page, content: PublishVideoContent) -> None:
"""填写视频发布表单,不点击发布按钮。
Args:
page: CDP 页面对象。
content: 视频发布内容。
Raises:
PublishError: 填写失败。
UploadTimeoutError: 上传/处理超时。
"""
if not content.video_path:
raise PublishError("视频不能为空")
# 导航到发布页
_navigate_to_publish_page(page)
# 点击"上传视频" TAB
_click_publish_tab(page, "上传视频")
time.sleep(1)
# 上传视频
_upload_video(page, content.video_path)
# 填写表单(不点击发布)
_fill_publish_video_form(
page,
content.title,
content.content,
content.tags,
content.schedule_time,
content.visibility,
)
def click_publish_video_button(page: Page) -> None:
"""点击视频发布按钮。
Args:
page: CDP 页面对象。
"""
_wait_for_publish_button_clickable(page)
page.click_element(PUBLISH_BUTTON)
time.sleep(3)
logger.info("视频发布完成")


def _upload_video(page: Page, video_path: str) -> None:
    """Upload the video file."""
    if not os.path.exists(video_path):
        raise PublishError(f"Video file does not exist: {video_path}")
    # Pick whichever upload input is present.
    selector = UPLOAD_INPUT if page.has_element(UPLOAD_INPUT) else FILE_INPUT
    page.set_file_input(selector, [video_path])
    # The publish button becomes clickable once video processing has finished.
    _wait_for_publish_button_clickable(page)
    logger.info("Video upload/processing complete")


def _wait_for_publish_button_clickable(page: Page) -> None:
    """Wait until the publish button is clickable (video processing can be slow)."""
    max_wait = 600.0  # 10 minutes
    start = time.monotonic()
    logger.info("Waiting for the publish button to become clickable (video)")
    while time.monotonic() - start < max_wait:
        clickable = page.evaluate(
            f"""
            (() => {{
                const btn = document.querySelector({_js_str(PUBLISH_BUTTON)});
                if (!btn) return false;
                const rect = btn.getBoundingClientRect();
                if (rect.width === 0 || rect.height === 0) return false;
                if (btn.disabled) return false;
                if (btn.classList.contains('disabled')) return false;
                return true;
            }})()
            """
        )
        if clickable:
            return
        time.sleep(1)
    raise UploadTimeoutError("Timed out waiting for the publish button to become clickable (10 minutes)")
def _fill_publish_video_form(
page: Page,
title: str,
content: str,
tags: list[str],
schedule_time: str | None,
visibility: str,
) -> None:
"""填写视频表单(不点击发布)。"""
# 标题
page.input_text(TITLE_INPUT, title)
time.sleep(1)
# 正文 + 标签
content_selector = _find_content_element(page)
page.input_content_editable(content_selector, content)
# 回点标题
time.sleep(1)
page.click_element(TITLE_INPUT)
if tags:
_input_tags(page, content_selector, tags)
time.sleep(1)
# 定时发布
if schedule_time:
_set_schedule_publish(page, schedule_time)
# 可见范围
_set_visibility(page, visibility)
logger.info("视频表单填写完成,等待确认发布")


def _js_str(s: str) -> str:
    """Convert a Python string into a JavaScript string literal."""
    import json

    return json.dumps(s)
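
Both the image and video forms pass `schedule_time` to `_set_schedule_publish`, which parses it with `datetime.fromisoformat` and renders it as `YYYY-MM-DD HH:MM`. The sketch below isolates that conversion so its handling of UTC offsets is visible. `format_schedule_time` is a hypothetical helper, not part of the package.

```python
from datetime import datetime


def format_schedule_time(schedule_time: str) -> str:
    """Parse an ISO 8601 string and render it as "YYYY-MM-DD HH:MM"."""
    dt = datetime.fromisoformat(schedule_time)  # raises ValueError on bad input
    return dt.strftime("%Y-%m-%d %H:%M")


print(format_schedule_time("2025-03-01T20:30:00"))        # 2025-03-01 20:30
print(format_schedule_time("2025-03-01T20:30:00+08:00"))  # 2025-03-01 20:30
```

The second call shows that a UTC offset is dropped rather than converted: the wall-clock time in the string is what gets typed into the date picker, so callers would need to supply times already expressed in the time zone the page expects.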
FILE:scripts/xhs/search.py
"""Feed search, corresponding to Go xiaohongshu/search.go."""
from __future__ import annotations
import json
import logging
import time
from .cdp import Page
from .errors import NoFeedsError
from .human import sleep_random
from .selectors import FILTER_BUTTON, FILTER_PANEL
from .types import Feed, FilterOption
from .urls import make_search_url
logger = logging.getLogger(__name__)
# Filter option table: {filter group index: [(tag index, label text), ...]}
_FILTER_OPTIONS: dict[int, list[tuple[int, str]]] = {
1: [(1, "综合"), (2, "最新"), (3, "最多点赞"), (4, "最多评论"), (5, "最多收藏")],
2: [(1, "不限"), (2, "视频"), (3, "图文")],
3: [(1, "不限"), (2, "一天内"), (3, "一周内"), (4, "半年内")],
4: [(1, "不限"), (2, "已看过"), (3, "未看过"), (4, "已关注")],
5: [(1, "不限"), (2, "同城"), (3, "附近")],
}
# JS that extracts the search results from __INITIAL_STATE__
_EXTRACT_SEARCH_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.search &&
window.__INITIAL_STATE__.search.feeds) {
const feeds = window.__INITIAL_STATE__.search.feeds;
const feedsData = feeds.value !== undefined ? feeds.value : feeds._value;
if (feedsData) {
return JSON.stringify(feedsData);
}
}
return "";
})()
"""


def _find_internal_option(group_index: int, text: str) -> tuple[int, int]:
    """Look up the internal indices for a filter option.

    Returns:
        (filters_index, tags_index)

    Raises:
        ValueError: No matching option was found.
    """
    options = _FILTER_OPTIONS.get(group_index)
    if not options:
        raise ValueError(f"Filter group {group_index} does not exist")
    for tags_index, option_text in options:
        if option_text == text:
            return group_index, tags_index
    valid = [t for _, t in options]
    raise ValueError(f"'{text}' not found in filter group {group_index}; valid values: {valid}")


def _convert_filters(filter_opt: FilterOption) -> list[tuple[int, int]]:
    """Convert a FilterOption into a list of internal (filters_index, tags_index) pairs."""
    result: list[tuple[int, int]] = []
    if filter_opt.sort_by:
        result.append(_find_internal_option(1, filter_opt.sort_by))
    if filter_opt.note_type:
        result.append(_find_internal_option(2, filter_opt.note_type))
    if filter_opt.publish_time:
        result.append(_find_internal_option(3, filter_opt.publish_time))
    if filter_opt.search_scope:
        result.append(_find_internal_option(4, filter_opt.search_scope))
    if filter_opt.location:
        result.append(_find_internal_option(5, filter_opt.location))
    return result


def search_feeds(
    page: Page,
    keyword: str,
    filter_option: FilterOption | None = None,
) -> list[Feed]:
    """Search feeds.

    Args:
        page: CDP page object.
        keyword: Search keyword.
        filter_option: Optional filter conditions.

    Raises:
        NoFeedsError: No search results were captured.
        ValueError: A filter option is invalid.
    """
    search_url = make_search_url(keyword)
    page.navigate(search_url)
    page.wait_for_load()
    page.wait_dom_stable()
    # Wait for __INITIAL_STATE__ to be initialized.
    _wait_for_initial_state(page)
    # Apply the filter conditions.
    if filter_option:
        internal_filters = _convert_filters(filter_option)
        if internal_filters:
            _apply_filters(page, internal_filters)
    # Extract the search results.
    result = page.evaluate(_EXTRACT_SEARCH_JS)
    if not result:
        raise NoFeedsError()
    feeds_data = json.loads(result)
    return [Feed.from_dict(f) for f in feeds_data]


def _wait_for_initial_state(page: Page, timeout: float = 10.0) -> None:
    """Wait for __INITIAL_STATE__ to be ready; log a warning on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        ready = page.evaluate("window.__INITIAL_STATE__ !== undefined")
        if ready:
            return
        time.sleep(0.5)
    logger.warning("Timed out waiting for __INITIAL_STATE__")


def _apply_filters(page: Page, filters: list[tuple[int, int]]) -> None:
    """Apply the filter conditions."""
    # Hover over the filter button.
    page.hover_element(FILTER_BUTTON)
    # Wait up to 5 seconds for the filter panel to appear.
    deadline = time.monotonic() + 5.0
    while time.monotonic() < deadline:
        if page.has_element(FILTER_PANEL):
            break
        sleep_random(300, 600)
    # Click each requested filter option.
    for filters_index, tags_index in filters:
        selector = (
            f"div.filter-panel div.filters:nth-child({filters_index}) "
            f"div.tags:nth-child({tags_index})"
        )
        page.click_element(selector)
        sleep_random(300, 600)
    # Wait for the page to update.
    page.wait_dom_stable()
    _wait_for_initial_state(page)
FILE:scripts/xhs/selectors.py
"""CSS selector constants for Xiaohongshu pages."""
# ========== Login ==========
LOGIN_STATUS = ".main-container .user .link-wrapper .channel"
QRCODE_IMG = ".login-container .qrcode-img"
# ========== Phone-number login ==========
LOGIN_CONTAINER = ".login-container"
PHONE_INPUT = "label.phone input"
GET_CODE_BUTTON = "span.code-button"
CODE_INPUT = "label.auth-code input"
PHONE_LOGIN_SUBMIT = ".input-container button.submit"
AGREE_CHECKBOX = ".agree-icon .icon-wrapper"
AGREE_CHECKBOX_CHECKED = ".agree-icon .icon-wrapper.agreed"
LOGIN_ERR_MSG = ".err-msg"
# ========== Home / search ==========
FILTER_BUTTON = "div.filter"
FILTER_PANEL = "div.filter-panel"
# ========== Feed detail ==========
COMMENTS_CONTAINER = ".comments-container"
PARENT_COMMENT = ".parent-comment"
NO_COMMENTS_TEXT = ".no-comments-text"
END_CONTAINER = ".end-container"
TOTAL_COMMENT = ".comments-container .total"
SHOW_MORE_BUTTON = ".show-more"
NOTE_SCROLLER = ".note-scroller"
INTERACTION_CONTAINER = ".interaction-container"
# Containers shown when a page is inaccessible
ACCESS_ERROR_WRAPPER = ".access-wrapper, .error-wrapper, .not-found-wrapper, .blocked-wrapper"
# ========== Comment input ==========
COMMENT_INPUT_TRIGGER = "div.input-box div.content-edit span"
COMMENT_INPUT_FIELD = "div.input-box div.content-edit p.content-input"
COMMENT_SUBMIT_BUTTON = "div.bottom button.submit"
REPLY_BUTTON = ".right .interactions .reply"
# ========== Like / collect ==========
LIKE_BUTTON = ".interact-container .left .like-lottie"
COLLECT_BUTTON = ".interact-container .left .reds-icon.collect-icon"
# ========== Publish page ==========
UPLOAD_CONTENT = "div.upload-content"
CREATOR_TAB = "div.creator-tab"
UPLOAD_INPUT = ".upload-input"
FILE_INPUT = 'input[type="file"]'
TITLE_INPUT = "div.d-input input"
CONTENT_EDITOR = "div.ql-editor"
IMAGE_PREVIEW = ".img-preview-area .pr"
PUBLISH_BUTTON = ".publish-page-publish-btn button.bg-red"
# Title/body length validation
TITLE_MAX_SUFFIX = "div.title-container div.max_suffix"
CONTENT_LENGTH_ERROR = "div.edit-container div.length-error"
# Visibility
VISIBILITY_DROPDOWN = "div.permission-card-wrapper div.d-select-content"
VISIBILITY_OPTIONS = "div.d-options-wrapper div.d-grid-item div.custom-option"
# Scheduled publishing
SCHEDULE_SWITCH = ".post-time-wrapper .d-switch"
DATETIME_INPUT = ".date-picker-container input"
# Original-content declaration
ORIGINAL_SWITCH_CARD = "div.custom-switch-card"
ORIGINAL_SWITCH = "div.d-switch"
# Tag suggestions
TAG_TOPIC_CONTAINER = "#creator-editor-topic-container"
TAG_FIRST_ITEM = ".item"
# Popover
POPOVER = "div.d-popover"
# ========== Long-article mode ==========
# Note: the long-article buttons (写长文, 新的创作, 一键排版, 下一步) are located by text matching.
LONG_ARTICLE_TAB_TEXT = "写长文"
NEW_CREATION_BUTTON_TEXT = "新的创作"
AUTO_FORMAT_BUTTON_TEXT = "一键排版"
NEXT_STEP_BUTTON_TEXT = "下一步"
LONG_ARTICLE_TITLE = 'textarea.d-text[placeholder="输入标题"]'
TEMPLATE_CARD = ".template-card"
TEMPLATE_TITLE = ".template-card .template-title"
# ========== Logout ==========
LOGOUT_MORE_BUTTON = "div.information-wrapper"
LOGOUT_MENU_ITEM = 'div.menu-item[data-name="退出登录"]'
# ========== User profile ==========
SIDEBAR_PROFILE = "div.main-container li.user.side-bar-component a.link-wrapper span.channel"
FILE:scripts/xhs/stealth.py
"""反检测 JS 注入 + Chrome 启动参数,对应 go-rod/stealth。"""
# 真实 Chrome UA(固定版本,避免每次随机导致指纹不一致)
REALISTIC_UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
# 反检测 JS 脚本:在页面加载时注入
STEALTH_JS = """
(() => {
// 1. navigator.webdriver
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined,
configurable: true,
});
// 2. chrome.runtime
if (!window.chrome) {
window.chrome = {};
}
if (!window.chrome.runtime) {
window.chrome.runtime = {
connect: () => {},
sendMessage: () => {},
};
}
// 3. plugins
Object.defineProperty(navigator, 'plugins', {
get: () => {
return [
{
0: {type: 'application/x-google-chrome-pdf'},
description: 'Portable Document Format',
filename: 'internal-pdf-viewer',
length: 1,
name: 'Chrome PDF Plugin',
},
{
0: {type: 'application/pdf'},
description: '',
filename: 'mhjfbmdgcfjbbpaeojofohoefgiehjai',
length: 1,
name: 'Chrome PDF Viewer',
},
{
0: {type: 'application/x-nacl'},
description: '',
filename: 'internal-nacl-plugin',
length: 1,
name: 'Native Client',
},
];
},
configurable: true,
});
// 4. languages
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en-US', 'en'],
configurable: true,
});
// 5. permissions
const originalQuery = window.navigator.permissions?.query;
if (originalQuery) {
window.navigator.permissions.query = (parameters) =>
parameters.name === 'notifications'
? Promise.resolve({ state: Notification.permission })
: originalQuery(parameters);
}
// 6. WebGL vendor/renderer
const getParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function(parameter) {
if (parameter === 37445) return 'Intel Inc.';
if (parameter === 37446) return 'Intel Iris OpenGL Engine';
return getParameter.call(this, parameter);
};
// 7. hardwareConcurrency — 随机 4 或 8
Object.defineProperty(navigator, 'hardwareConcurrency', {
get: () => [4, 8][Math.floor(Math.random() * 2)],
configurable: true,
});
// 8. deviceMemory — 随机 4 或 8
Object.defineProperty(navigator, 'deviceMemory', {
get: () => [4, 8][Math.floor(Math.random() * 2)],
configurable: true,
});
// 9. navigator.connection — 伪造网络信息
Object.defineProperty(navigator, 'connection', {
get: () => ({
effectiveType: '4g',
downlink: 10,
rtt: 50,
saveData: false,
}),
configurable: true,
});
// 10. chrome.csi / chrome.loadTimes — 空函数伪装
if (window.chrome) {
window.chrome.csi = function() { return {}; };
window.chrome.loadTimes = function() { return {}; };
}
// 11. outerWidth/outerHeight — 与 innerWidth/innerHeight 对齐
Object.defineProperty(window, 'outerWidth', {
get: () => window.innerWidth,
configurable: true,
});
Object.defineProperty(window, 'outerHeight', {
get: () => window.innerHeight,
configurable: true,
});
})();
"""
# Chrome 启动参数(反检测相关)
STEALTH_ARGS = [
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--no-first-run",
"--no-default-browser-check",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-renderer-backgrounding",
"--disable-component-update",
"--disable-extensions",
"--disable-sync",
"--no-sandbox",
"--disable-setuid-sandbox",
]
FILE:scripts/xhs/types.py
"""Xiaohongshu data type definitions, mirroring the Go types.go."""

from __future__ import annotations

from dataclasses import dataclass, field

# ========== Feed list ==========


@dataclass
class ImageInfo:
    image_scene: str = ""
    url: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> ImageInfo:
        return cls(
            image_scene=d.get("imageScene", ""),
            url=d.get("url", ""),
        )


@dataclass
class VideoCapability:
    duration: int = 0  # seconds

    @classmethod
    def from_dict(cls, d: dict) -> VideoCapability:
        return cls(duration=d.get("duration", 0))


@dataclass
class Video:
    capa: VideoCapability = field(default_factory=VideoCapability)

    @classmethod
    def from_dict(cls, d: dict) -> Video:
        return cls(capa=VideoCapability.from_dict(d.get("capa", {})))


@dataclass
class Cover:
    width: int = 0
    height: int = 0
    url: str = ""
    file_id: str = ""
    url_pre: str = ""
    url_default: str = ""
    info_list: list[ImageInfo] = field(default_factory=list)

    @classmethod
    def from_dict(cls, d: dict) -> Cover:
        return cls(
            width=d.get("width", 0),
            height=d.get("height", 0),
            url=d.get("url", ""),
            file_id=d.get("fileId", ""),
            url_pre=d.get("urlPre", ""),
            url_default=d.get("urlDefault", ""),
            # "or []" guards against an explicit null, as the other list fields do.
            info_list=[ImageInfo.from_dict(i) for i in d.get("infoList", []) or []],
        )


@dataclass
class User:
    user_id: str = ""
    nickname: str = ""
    nick_name: str = ""
    avatar: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> User:
        return cls(
            user_id=d.get("userId", ""),
            nickname=d.get("nickname", ""),
            nick_name=d.get("nickName", ""),
            avatar=d.get("avatar", ""),
        )


@dataclass
class InteractInfo:
    liked: bool = False
    liked_count: str = ""
    shared_count: str = ""
    comment_count: str = ""
    collected_count: str = ""
    collected: bool = False

    @classmethod
    def from_dict(cls, d: dict) -> InteractInfo:
        return cls(
            liked=d.get("liked", False),
            liked_count=d.get("likedCount", ""),
            shared_count=d.get("sharedCount", ""),
            comment_count=d.get("commentCount", ""),
            collected_count=d.get("collectedCount", ""),
            collected=d.get("collected", False),
        )


@dataclass
class NoteCard:
    type: str = ""
    display_title: str = ""
    user: User = field(default_factory=User)
    interact_info: InteractInfo = field(default_factory=InteractInfo)
    cover: Cover = field(default_factory=Cover)
    video: Video | None = None

    @classmethod
    def from_dict(cls, d: dict) -> NoteCard:
        video_data = d.get("video")
        return cls(
            type=d.get("type", ""),
            display_title=d.get("displayTitle", ""),
            user=User.from_dict(d.get("user", {})),
            interact_info=InteractInfo.from_dict(d.get("interactInfo", {})),
            cover=Cover.from_dict(d.get("cover", {})),
            video=Video.from_dict(video_data) if video_data else None,
        )


@dataclass
class Feed:
    xsec_token: str = ""
    id: str = ""
    model_type: str = ""
    note_card: NoteCard = field(default_factory=NoteCard)
    index: int = 0

    @classmethod
    def from_dict(cls, d: dict) -> Feed:
        return cls(
            xsec_token=d.get("xsecToken", ""),
            id=d.get("id", ""),
            model_type=d.get("modelType", ""),
            note_card=NoteCard.from_dict(d.get("noteCard", {})),
            index=d.get("index", 0),
        )

    def to_dict(self) -> dict:
        """Serialize to a JSON-compatible dict."""
        result: dict = {
            "id": self.id,
            "xsecToken": self.xsec_token,
            "modelType": self.model_type,
            "index": self.index,
            "displayTitle": self.note_card.display_title,
            "type": self.note_card.type,
            "user": {
                "userId": self.note_card.user.user_id,
                "nickname": self.note_card.user.nickname or self.note_card.user.nick_name,
            },
            "interactInfo": {
                "likedCount": self.note_card.interact_info.liked_count,
                "collectedCount": self.note_card.interact_info.collected_count,
                "commentCount": self.note_card.interact_info.comment_count,
                "sharedCount": self.note_card.interact_info.shared_count,
            },
        }
        # Cover and video are optional, so they are only emitted when present.
        cover = self.note_card.cover
        if cover.url or cover.url_default:
            result["cover"] = cover.url or cover.url_default
        if self.note_card.video:
            result["video"] = {"duration": self.note_card.video.capa.duration}
        return result


# ========== Feed detail ==========


@dataclass
class DetailImageInfo:
    width: int = 0
    height: int = 0
    url_default: str = ""
    url_pre: str = ""
    live_photo: bool = False

    @classmethod
    def from_dict(cls, d: dict) -> DetailImageInfo:
        return cls(
            width=d.get("width", 0),
            height=d.get("height", 0),
            url_default=d.get("urlDefault", ""),
            url_pre=d.get("urlPre", ""),
            live_photo=d.get("livePhoto", False),
        )


@dataclass
class Comment:
    id: str = ""
    note_id: str = ""
    content: str = ""
    like_count: str = ""
    create_time: int = 0
    ip_location: str = ""
    liked: bool = False
    user_info: User = field(default_factory=User)
    sub_comment_count: str = ""
    sub_comments: list[Comment] = field(default_factory=list)
    show_tags: list[str] = field(default_factory=list)

    @classmethod
    def from_dict(cls, d: dict) -> Comment:
        return cls(
            id=d.get("id", ""),
            note_id=d.get("noteId", ""),
            content=d.get("content", ""),
            like_count=d.get("likeCount", ""),
            create_time=d.get("createTime", 0),
            ip_location=d.get("ipLocation", ""),
            liked=d.get("liked", False),
            user_info=User.from_dict(d.get("userInfo", {})),
            sub_comment_count=d.get("subCommentCount", ""),
            # Replies are parsed recursively; "or []" tolerates an explicit null.
            sub_comments=[cls.from_dict(c) for c in d.get("subComments", []) or []],
            show_tags=d.get("showTags", []) or [],
        )

    def to_dict(self) -> dict:
        result: dict = {
            "id": self.id,
            "content": self.content,
            "likeCount": self.like_count,
            "createTime": self.create_time,
            "ipLocation": self.ip_location,
            "user": {
                "userId": self.user_info.user_id,
                "nickname": self.user_info.nickname or self.user_info.nick_name,
            },
            "subCommentCount": self.sub_comment_count,
        }
        if self.sub_comments:
            result["subComments"] = [c.to_dict() for c in self.sub_comments]
        return result


@dataclass
class CommentList:
    list_: list[Comment] = field(default_factory=list)
    cursor: str = ""
    has_more: bool = False

    @classmethod
    def from_dict(cls, d: dict) -> CommentList:
        return cls(
            list_=[Comment.from_dict(c) for c in d.get("list", []) or []],
            cursor=d.get("cursor", ""),
            has_more=d.get("hasMore", False),
        )


@dataclass
class FeedDetail:
    note_id: str = ""
    xsec_token: str = ""
    title: str = ""
    desc: str = ""
    type: str = ""
    time: int = 0
    ip_location: str = ""
    user: User = field(default_factory=User)
    interact_info: InteractInfo = field(default_factory=InteractInfo)
    image_list: list[DetailImageInfo] = field(default_factory=list)

    @classmethod
    def from_dict(cls, d: dict) -> FeedDetail:
        return cls(
            note_id=d.get("noteId", ""),
            xsec_token=d.get("xsecToken", ""),
            title=d.get("title", ""),
            desc=d.get("desc", ""),
            type=d.get("type", ""),
            time=d.get("time", 0),
            ip_location=d.get("ipLocation", ""),
            user=User.from_dict(d.get("user", {})),
            interact_info=InteractInfo.from_dict(d.get("interactInfo", {})),
            image_list=[DetailImageInfo.from_dict(i) for i in d.get("imageList", []) or []],
        )

    def to_dict(self) -> dict:
        return {
            "noteId": self.note_id,
            "title": self.title,
            "desc": self.desc,
            "type": self.type,
            "time": self.time,
            "ipLocation": self.ip_location,
            "user": {
                "userId": self.user.user_id,
                "nickname": self.user.nickname or self.user.nick_name,
            },
            "interactInfo": {
                "liked": self.interact_info.liked,
                "likedCount": self.interact_info.liked_count,
                "collectedCount": self.interact_info.collected_count,
                "collected": self.interact_info.collected,
                "commentCount": self.interact_info.comment_count,
                "sharedCount": self.interact_info.shared_count,
            },
            "imageList": [
                {
                    "width": img.width,
                    "height": img.height,
                    "urlDefault": img.url_default,
                }
                for img in self.image_list
            ],
        }


@dataclass
class FeedDetailResponse:
    note: FeedDetail = field(default_factory=FeedDetail)
    comments: CommentList = field(default_factory=CommentList)

    @classmethod
    def from_dict(cls, d: dict) -> FeedDetailResponse:
        return cls(
            note=FeedDetail.from_dict(d.get("note", {})),
            comments=CommentList.from_dict(d.get("comments", {})),
        )

    def to_dict(self) -> dict:
        return {
            "note": self.note.to_dict(),
            "comments": [c.to_dict() for c in self.comments.list_],
        }


# ========== User profile page ==========


@dataclass
class UserBasicInfo:
    gender: int = 0
    ip_location: str = ""
    desc: str = ""
    imageb: str = ""
    nickname: str = ""
    images: str = ""
    red_id: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> UserBasicInfo:
        return cls(
            gender=d.get("gender", 0),
            ip_location=d.get("ipLocation", ""),
            desc=d.get("desc", ""),
            imageb=d.get("imageb", ""),
            nickname=d.get("nickname", ""),
            images=d.get("images", ""),
            red_id=d.get("redId", ""),
        )


@dataclass
class UserInteraction:
    type: str = ""
    name: str = ""
    count: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> UserInteraction:
        return cls(
            type=d.get("type", ""),
            name=d.get("name", ""),
            count=d.get("count", ""),
        )


@dataclass
class UserProfileResponse:
    user_basic_info: UserBasicInfo = field(default_factory=UserBasicInfo)
    interactions: list[UserInteraction] = field(default_factory=list)
    feeds: list[Feed] = field(default_factory=list)

    def to_dict(self) -> dict:
        return {
            "basicInfo": {
                "nickname": self.user_basic_info.nickname,
                "redId": self.user_basic_info.red_id,
                "desc": self.user_basic_info.desc,
                "gender": self.user_basic_info.gender,
                "ipLocation": self.user_basic_info.ip_location,
            },
            "interactions": [
                {"type": i.type, "name": i.name, "count": i.count} for i in self.interactions
            ],
            "feeds": [f.to_dict() for f in self.feeds],
        }


# ========== Search ==========


@dataclass
class FilterOption:
    """Search filter options."""

    # The Chinese values are the literal option labels; English glosses are in parentheses.
    sort_by: str = ""  # 综合 (general) | 最新 (latest) | 最多点赞 (most liked) | 最多评论 (most commented) | 最多收藏 (most collected)
    note_type: str = ""  # 不限 (any) | 视频 (video) | 图文 (image and text)
    publish_time: str = ""  # 不限 (any) | 一天内 (within a day) | 一周内 (within a week) | 半年内 (within six months)
    search_scope: str = ""  # 不限 (any) | 已看过 (viewed) | 未看过 (not viewed) | 已关注 (following)
    location: str = ""  # 不限 (any) | 同城 (same city) | 附近 (nearby)


# ========== Publishing ==========


@dataclass
class PublishImageContent:
    """Content of an image-and-text post."""

    title: str = ""
    content: str = ""
    tags: list[str] = field(default_factory=list)
    image_paths: list[str] = field(default_factory=list)
    schedule_time: str | None = None  # ISO 8601 format; None means publish immediately
    is_original: bool = False
    visibility: str = ""  # 公开可见 (public, default) | 仅自己可见 (only me) | 仅互关好友可见 (mutual followers only)


@dataclass
class PublishVideoContent:
    """Content of a video post."""

    title: str = ""
    content: str = ""
    tags: list[str] = field(default_factory=list)
    video_path: str = ""
    schedule_time: str | None = None  # ISO 8601 format
    visibility: str = ""  # 公开可见 (public, default) | 仅自己可见 (only me) | 仅互关好友可见 (mutual followers only)


# ========== Interaction ==========


@dataclass
class ActionResult:
    """Generic action response (like, collect, and so on)."""

    feed_id: str = ""
    success: bool = False
    message: str = ""

    def to_dict(self) -> dict:
        return {
            "feed_id": self.feed_id,
            "success": self.success,
            "message": self.message,
        }


# ========== Comment loading configuration ==========


@dataclass
class CommentLoadConfig:
    """Comment loading configuration."""

    click_more_replies: bool = False
    max_replies_threshold: int = 10
    max_comment_items: int = 0  # 0 = unlimited
    scroll_speed: str = "normal"  # slow|normal|fast
FILE:scripts/xhs/urls.py
"""Xiaohongshu URL constants and builder functions."""

from urllib.parse import urlencode

# Base pages
EXPLORE_URL = "https://www.xiaohongshu.com/explore"
HOME_URL = "https://www.xiaohongshu.com"
PUBLISH_URL = "https://creator.xiaohongshu.com/publish/publish?source=official"


def make_feed_detail_url(feed_id: str, xsec_token: str) -> str:
    """Build the URL of a feed detail page."""
    return (
        f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source=pc_feed"
    )


def make_search_url(keyword: str) -> str:
    """Build the URL of a search results page."""
    params = urlencode({"keyword": keyword, "source": "web_explore_feed"})
    return f"https://www.xiaohongshu.com/search_result?{params}"


def make_user_profile_url(user_id: str, xsec_token: str) -> str:
    """Build the URL of a user profile page."""
    return (
        f"https://www.xiaohongshu.com/user/profile/{user_id}"
        f"?xsec_token={xsec_token}&xsec_source=pc_note"
    )
FILE:scripts/xhs/user_profile.py
"""User profile page, mirroring the Go xiaohongshu/user_profile.go."""

from __future__ import annotations

import json
import logging
import time

from .cdp import Page
from .types import Feed, UserBasicInfo, UserInteraction, UserProfileResponse
from .urls import make_user_profile_url

logger = logging.getLogger(__name__)

# JS that extracts the user data
_EXTRACT_USER_DATA_JS = """
(() => {
    if (window.__INITIAL_STATE__ &&
        window.__INITIAL_STATE__.user &&
        window.__INITIAL_STATE__.user.userPageData) {
        const userPageData = window.__INITIAL_STATE__.user.userPageData;
        const data = userPageData.value !== undefined ? userPageData.value : userPageData._value;
        if (data) {
            return JSON.stringify(data);
        }
    }
    return "";
})()
"""

# JS that extracts the user's notes
_EXTRACT_USER_NOTES_JS = """
(() => {
    if (window.__INITIAL_STATE__ &&
        window.__INITIAL_STATE__.user &&
        window.__INITIAL_STATE__.user.notes) {
        const notes = window.__INITIAL_STATE__.user.notes;
        const data = notes.value !== undefined ? notes.value : notes._value;
        if (data) {
            return JSON.stringify(data);
        }
    }
    return "";
})()
"""


def get_user_profile(page: Page, user_id: str, xsec_token: str) -> UserProfileResponse:
    """Fetch a user's profile information and notes.

    Args:
        page: CDP page object.
        user_id: User ID.
        xsec_token: The xsec_token for the profile.

    Raises:
        RuntimeError: Data extraction failed.
    """
    url = make_user_profile_url(user_id, xsec_token)
    page.navigate(url)
    page.wait_for_load()
    page.wait_dom_stable()
    return _extract_user_profile_data(page)


def _extract_user_profile_data(page: Page) -> UserProfileResponse:
    """Extract the user profile data from the page."""
    # Wait for __INITIAL_STATE__
    _wait_for_initial_state(page)

    # Extract the user information
    user_data_result = page.evaluate(_EXTRACT_USER_DATA_JS)
    if not user_data_result:
        raise RuntimeError("user.userPageData.value not found in __INITIAL_STATE__")

    # Extract the user's notes
    notes_result = page.evaluate(_EXTRACT_USER_NOTES_JS)
    if not notes_result:
        raise RuntimeError("user.notes.value not found in __INITIAL_STATE__")

    # Parse the user information
    user_page_data = json.loads(user_data_result)
    basic_info = UserBasicInfo.from_dict(user_page_data.get("basicInfo", {}))
    interactions = [
        UserInteraction.from_dict(i) for i in user_page_data.get("interactions", []) or []
    ]

    # Parse the notes (a nested array, flattened here)
    notes_feeds_raw = json.loads(notes_result)
    feeds: list[Feed] = []
    for feed_group in notes_feeds_raw:
        if isinstance(feed_group, list):
            for f in feed_group:
                feeds.append(Feed.from_dict(f))
        elif isinstance(feed_group, dict):
            feeds.append(Feed.from_dict(feed_group))

    return UserProfileResponse(
        user_basic_info=basic_info,
        interactions=interactions,
        feeds=feeds,
    )


def _wait_for_initial_state(page: Page, timeout: float = 10.0) -> None:
    """Wait until __INITIAL_STATE__ is ready; on timeout, only log a warning."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        ready = page.evaluate("window.__INITIAL_STATE__ !== undefined")
        if ready:
            return
        time.sleep(0.5)
    logger.warning("Timed out waiting for __INITIAL_STATE__")
FILE:skill.json
{
  "name": "xiaohongshu-v2",
  "version": "2.0.0",
  "description": "Xiaohongshu automation skill V2: a complete CDP-based automation solution supporting login, publishing, search, and interaction",
  "author": "Tina",
  "entry": "SKILL.md",
  "tools": [
    "scripts/cli.py"
  ],
  "dependencies": {
    "python": ">=3.10",
    "chrome": ">=120"
  },
  "capabilities": [
    "xiaohongshu:login",
    "xiaohongshu:publish",
    "xiaohongshu:search",
    "xiaohongshu:interact"
  ]
}
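Automatically detects and fixes common Xiaohongshu MCP deployment problems, including port conflicts, cookie paths, service status, and timeout waits.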
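# Xiaohongshu MCP Patch Pack

Automated fixes for problems commonly hit when deploying the Xiaohongshu MCP.

## Problem 1: Port already in use

**Symptom**: starting the MCP service fails with `bind: address already in use`

**Automated fix**:

```bash
#!/bin/bash
# Check port 18060 and free it if needed
echo "Checking port 18060..."
PID=$(lsof -ti:18060 2>/dev/null)
if [ -n "$PID" ]; then
    echo "Found process holding the port: $PID, terminating..."
    kill -9 $PID
    sleep 2
    echo "Process terminated"
else
    echo "Port is free"
fi

# Verify
if lsof -i :18060 >/dev/null 2>&1; then
    echo "Error: port is still in use"
    exit 1
else
    echo "Port released, the service can be started"
fi
```

**Usage**:

```bash
chmod +x fix-port.sh
./fix-port.sh
```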
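
The same check can be done without `lsof`. The following is a minimal Python sketch, assuming the service listens on TCP port 18060 on localhost as in the script above; `port_in_use` is a hypothetical helper name, and it only detects a listener, it does not terminate anything.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)
        # connect_ex returns 0 when the connection succeeds,
        # and an errno value (for example ECONNREFUSED) otherwise.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    port = 18060  # port taken from the script above
    state = "in use" if port_in_use(port) else "free"
    print(f"Port {port} is {state}")
```

A connection probe only reports whether the port is taken; finding and killing the owning process still needs `lsof` or an equivalent tool.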
---
## Problem 2: Cookie path

**Symptom**: the MCP service cannot find the cookies.json file

**Automated fix**:

```bash
#!/bin/bash
# Copy the cookie file to every location the service might read it from.
# The source path is taken from the first argument; the placeholder is the fallback.
COOKIE_SOURCE="${1:-/path/to/your/cookies.json}"
echo "Copying the cookie file to multiple locations..."

# Create directories
mkdir -p /tmp/cookies
mkdir -p ~/.cache/rod/browser

# Copy to multiple locations
cp "$COOKIE_SOURCE" /tmp/cookies.json
cp "$COOKIE_SOURCE" /tmp/cookies/cookies.json
cp "$COOKIE_SOURCE" ./cookies.json 2>/dev/null || true
cp "$COOKIE_SOURCE" ~/.cache/rod/browser/cookies.json

echo "Cookie file placed at:"
find /tmp -name "cookies.json" 2>/dev/null
find ~ -name "cookies.json" 2>/dev/null | head -5
```

**Usage**:

```bash
chmod +x fix-cookie.sh
./fix-cookie.sh /path/to/your/cookies.json
```
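
Copying a malformed cookie file to several places only spreads the problem, so it can help to validate it first. The sketch below is one way to do that in Python, assuming the cookie file is plain JSON; `place_cookie_file` is a hypothetical helper, and the target paths in the usage comment simply mirror the ones in the script above.

```python
import json
import shutil
from pathlib import Path


def place_cookie_file(source: Path, targets: list) -> list:
    """Check that source parses as JSON, then copy it to every target path."""
    # json.loads raises ValueError (JSONDecodeError) if the file is malformed,
    # so nothing is copied in that case.
    json.loads(source.read_text(encoding="utf-8"))
    placed = []
    for target in targets:
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, target)
        placed.append(target)
    return placed


# Example call (paths mirror the shell script above):
# place_cookie_file(
#     Path("/path/to/your/cookies.json"),
#     [Path("/tmp/cookies.json"), Path("/tmp/cookies/cookies.json"),
#      Path.home() / ".cache/rod/browser/cookies.json"],
# )
```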
---
## Problem 3: Calls hang or time out

**Symptom**: calls to MCP tools get no response, possibly because Chrome is still downloading

**Automated fix**:

```bash
#!/bin/bash
# Wait for the Chrome download to finish (up to 120 checks, 5 seconds apart)
echo "Waiting for the Chrome download to finish..."
LOG_FILE="/tmp/mcp.log"
for i in {1..120}; do
    if grep -q "Downloaded:" "$LOG_FILE" 2>/dev/null; then
        echo "✅ Chrome download finished"
        break
    fi
    if grep -q "Unzip:" "$LOG_FILE" 2>/dev/null; then
        echo "✅ Chrome is being unpacked..."
        break
    fi
    PROGRESS=$(grep "Progress:" "$LOG_FILE" 2>/dev/null | tail -1)
    if [ -n "$PROGRESS" ]; then
        echo "Progress: $PROGRESS"
    fi
    sleep 5
done
echo "Ready"
```

**Usage**:

```bash
chmod +x wait-chrome.sh
./wait-chrome.sh
```
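
The polling loop above reports "ready" even when it runs out of attempts. A minimal Python sketch of the same idea that lets the caller tell a timeout apart from success is shown below; `wait_for_marker` is a hypothetical helper, and the marker strings and log path are the ones the shell script greps for.

```python
from __future__ import annotations

import time
from pathlib import Path


def wait_for_marker(
    log_file: Path,
    markers: tuple[str, ...],
    timeout: float = 600.0,
    interval: float = 5.0,
) -> str | None:
    """Poll log_file until one of markers appears.

    Returns the marker that was found, or None if the timeout expired.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            text = log_file.read_text(encoding="utf-8", errors="replace")
        except FileNotFoundError:
            text = ""  # the service may not have created the log yet
        for marker in markers:
            if marker in text:
                return marker
        if time.monotonic() >= deadline:
            return None
        time.sleep(interval)


# Example call (defaults mirror 120 checks, 5 seconds apart):
# found = wait_for_marker(Path("/tmp/mcp.log"), ("Downloaded:", "Unzip:"))
# print("ready" if found else "timed out")
```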
---
## Problem 4: Checking service status

**Symptom**: it is unclear whether the MCP service is running properly

**Automated fix**:

```bash
#!/bin/bash
# Check the MCP service and start it if it is not running
echo "Checking MCP service status..."

# Check the process
if pgrep -f "xiaohongshu-mcp" >/dev/null; then
    echo "Service is running"
    ps aux | grep xiaohongshu-mcp | grep -v grep
else
    echo "Service is not running, starting it..."
    cd /tmp
    nohup ./xiaohongshu-mcp-linux-amd64 >/tmp/mcp.log 2>&1 &
    sleep 3
    if pgrep -f "xiaohongshu-mcp" >/dev/null; then
        echo "✅ Service started"
        tail -3 /tmp/mcp.log
    else
        echo "❌ Failed to start"
        exit 1
    fi
fi

# Test the connection
echo "Testing the MCP connection..."
SESSION=$(curl -s -N -D - -X POST http://localhost:18060/mcp \
    -H "Content-Type: application/json" \
    -H "Accept: application/json, text/event-stream" \
    -d '{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}},"id":1}' 2>/dev/null | \
    grep -i "Mcp-Session-Id" | awk '{print $2}' | tr -d '\r')
if [ -n "$SESSION" ]; then
    echo "✅ MCP connection OK, Session: $SESSION"
else
    echo "❌ MCP connection failed"
    exit 1
fi
```

**Usage**:

```bash
chmod +x check-service.sh
./check-service.sh
```
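
The connection test boils down to one JSON-RPC `initialize` request and a look at the `Mcp-Session-Id` response header. Below is a minimal Python sketch of that request using only the standard library; the URL, headers, and payload are copied from the `curl` call above, while `build_initialize_request` and `fetch_session_id` are hypothetical helper names.

```python
from __future__ import annotations

import json
import urllib.request


def build_initialize_request(url: str) -> urllib.request.Request:
    """Build the same JSON-RPC initialize POST that the curl command sends."""
    payload = {
        "jsonrpc": "2.0",
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "test", "version": "1.0"},
        },
        "id": 1,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
        },
        method="POST",
    )


def fetch_session_id(
    url: str = "http://localhost:18060/mcp", timeout: float = 10.0
) -> str | None:
    """Send the request and return the Mcp-Session-Id header, if any."""
    request = build_initialize_request(url)
    # urlopen raises urllib.error.URLError when the service is unreachable.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.headers.get("Mcp-Session-Id")
```

Calling `fetch_session_id()` against a running service should return a non-empty string in the same cases where the shell script prints a session; a `URLError` corresponds to the "connection failed" branch.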
---
## One-click fix script

```bash
#!/bin/bash
# Xiaohongshu MCP one-click fix
echo "===== Xiaohongshu MCP Patch Pack ====="

# 1. Free the port
echo "[1/4] Checking the port..."
lsof -ti:18060 | xargs kill -9 2>/dev/null
sleep 1

# 2. Check the cookie file
if [ -f "/tmp/cookies/cookies.json" ]; then
    echo "[2/4] Cookie file exists"
else
    echo "[2/4] ⚠️ Cookie file is missing, please place it manually"
fi

# 3. Start the service
echo "[3/4] Starting the service..."
cd /tmp
if pgrep -f "xiaohongshu-mcp" >/dev/null; then
    echo "Service is already running"
else
    nohup ./xiaohongshu-mcp-linux-amd64 >/tmp/mcp.log 2>&1 &
    sleep 3
    echo "Service started"
fi

# 4. Test the connection
echo "[4/4] Testing the connection..."
SESSION=$(curl -s -N -D - -X POST http://localhost:18060/mcp \
    -H "Content-Type: application/json" \
    -H "Accept: application/json, text/event-stream" \
    -d '{"jsonrpc":"2.0","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}},"id":1}' 2>/dev/null | \
    grep -i "Mcp-Session-Id" | awk '{print $2}' | tr -d '\r')
if [ -n "$SESSION" ]; then
    echo "✅ Fix complete, MCP is working"
else
    echo "❌ Connection failed, check the log: tail -f /tmp/mcp.log"
fi
```
---
## References

- Xiaohongshu MCP project: https://github.com/xpzouying/xiaohongshu-mcp
- OpenClaw documentation: https://docs.openclaw.ai
FILE:skill.json
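{
  "name": "xiaohongshu-mcp-patch",
  "version": "1.0.0",
  "description": "Xiaohongshu MCP deployment patch pack: automatically fixes common problems",
  "author": "OpenClaw Community",
  "tags": ["xiaohongshu", "mcp", "patch", "fix", "小红书", "部署"],
  "homepage": "https://github.com/openclaw-community/xiaohongshu-mcp-patch"
}
Resolves permission configuration, bot capability, event subscription, and long-connection problems when connecting to Feishu, so the bot can receive and reply to messages reliably.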
# Feishu Connection Troubleshooting

Fixes for common problems when connecting OpenClaw to Feishu.

## Problem 1: The bot does not reply

Import the following permissions:

```json
{
  "scopes": {
    "tenant": [
      "application:bot.menu:write",
      "contact:user.employee_id:readonly",
      "im:chat.access_event.bot_p2p_chat:read",
      "im:chat.members:bot_access",
      "im:message",
      "im:message.group_at_msg:readonly",
      "im:message.p2p_msg:readonly",
      "im:message:readonly",
      "im:message:send_as_bot",
      "im:message.reactions:read",
      "im:resource"
    ],
    "user": [
      "im:chat.access_event.bot_p2p_chat:read"
    ]
  }
}
```
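
## Problem 2: Checks on the app platform

### 1. Enable the bot capability

Path: Feishu Open Platform → your app → **App Capabilities (应用能力)** → **Bot (机器人)**

- Turn on "Bot capability" (机器人能力)
- Configure the bot name

### 2. Configure event subscription (long connection)

⚠️ **This must be configured after the OpenClaw gateway has started**

Path: Feishu Open Platform → your app → **Events & Callbacks (事件与回调)**

- **Subscription mode**: "**Receive events through a long connection**" (使用长连接接收事件)
- **Add event**: `im.message.receive_v1`

### 3. Configure permissions

Path: Feishu Open Platform → your app → **Permission Management (权限管理)**

- Click "**Batch import**" (批量导入)
- Paste the permissions JSON
- Save

### 4. Publish the app

Path: Feishu Open Platform → your app → **Version Management & Release (版本管理与发布)**

- Create a version → submit it for release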

## Problem 3: Enabling calendar permissions

Import the following permissions:

```json
{
  "scopes": {
    "tenant": [
      "calendar:calendar",
      "calendar:calendar:readonly",
      "task:task:write",
      "task:task:read",
      "contact:user.employee_id:readonly"
    ],
    "user": [
      "calendar:calendar",
      "calendar:calendar:readonly",
      "task:task:write",
      "task:task:read"
    ]
  }
}
```
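
When both the messaging and the calendar permission sets are needed, they can be combined into one JSON document before importing. The following is a minimal Python sketch of such a merge, assuming the `{"scopes": {"tenant": [...], "user": [...]}}` shape shown in this document; `merge_scopes` is a hypothetical helper, and whether the platform accepts a single combined import is an assumption to verify in the console.

```python
import json


def merge_scopes(*configs: dict) -> dict:
    """Merge several {"scopes": {...}} documents, dropping duplicate names.

    The order of first appearance is preserved within each scope type.
    """
    merged = {}
    for config in configs:
        for scope_type, names in config.get("scopes", {}).items():
            bucket = merged.setdefault(scope_type, [])
            for name in names:
                if name not in bucket:
                    bucket.append(name)
    return {"scopes": merged}


if __name__ == "__main__":
    # Shortened excerpts of the two permission sets in this document.
    im_scopes = {"scopes": {"tenant": ["im:message", "contact:user.employee_id:readonly"]}}
    calendar_scopes = {
        "scopes": {
            "tenant": ["calendar:calendar", "contact:user.employee_id:readonly"],
            "user": ["calendar:calendar"],
        }
    }
    print(json.dumps(merge_scopes(im_scopes, calendar_scopes), indent=2))
```

The shared `contact:user.employee_id:readonly` scope appears in both sets, which is why the merge deduplicates instead of concatenating.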

## References

- Feishu Open Platform: https://open.feishu.cn/app
- OpenClaw documentation: https://docs.openclaw.ai
FILE:skill.json
{
  "name": "feishu-troubleshoot",
  "version": "1.0.0",
  "description": "Feishu connection troubleshooting: permission configuration, bot configuration, long-connection setup",
  "author": "OpenClaw Community",
  "tags": ["feishu", "lark", "troubleshoot", "飞书", "权限", "配置"],
  "homepage": "https://github.com/openclaw-community/feishu-troubleshoot"
}
Extract and organize investor questions and project team answers from meeting transcripts into structured, time-sequenced notes with clear, minimal topic tit...
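# Meeting Notes Organizer Skill

## When to use

Turn investment meeting transcripts into structured Meeting Notes.

## Core principles

### 1. First principle

Meeting notes are not a meeting recap. They place "the questions investors asked" side by side with "the real answers the project team could give".

### 2. Content handling rules

#### Investors

- **Questions**: distill each into 1-3 sentences stating the question itself; do not keep the dialogue, the rhetoric, or the path of follow-up questioning
- **Judgments/additions**: condense into 1-2 sentences on what is being said, and mark clearly that this is a "judgment" or a "directional trade-off", not a project commitment
- **Presentation**: `Name: the question/judgment itself`; do not use reported-speech phrasing (such as "is concerned about..." or "tries to clarify...")

#### Project team

- Keep the **original wording, sentence by sentence**, as far as possible
- Only allowed: splitting sentences, trimming colloquial filler, fixing obviously broken sentences, resolving unclear references
- **Forbidden**: summarizing, embellishing, merging passages with different meanings, filling in logic on the team's behalf
- The team's words "may be long-winded" but must be "true"

### 3. Topic division rules

- Order topics **by the sequence in which the meeting actually unfolded**
- First pull out all investor questions, then see which topics they naturally fall under
- Give each topic a **very short heading** (for navigation only, not a summary)
- Related topics may be merged (for example "choice to start a company" and "motivation to start a company")

### 4. Format conventions

- One line of question + one paragraph of answer
- Investor questions keep follow-ups, but condensed
- Project team answers keep the original text as far as possible
- The key information summary table goes at the end

### 5. Information filtering

**Keep**:

- Key numbers, dates and milestones, names
- Anything informative for the investment decision
- Such content even when it is repetitive or long-winded

**Remove**:

- Pleasantries ("新年好" / "Happy New Year", "久等了" / "sorry to keep you waiting")
- Walkthroughs of technical demos
- Repeated confirmations ("能看到吗?" / "Can you see it?", "Ok")
- Filler words and verbal tics

### 6. Special handling

- When the speaker label is unreliable, go by the **semantic stance** of the utterance
- Explaining the product, technology, or budget → always treat as the project team
- Asking questions, ranking priorities, forcing a choice → always treat as the investor

## Workflow

1. **Read the transcript** and mark timestamps and speakers
2. **Extract all investor questions** and order them chronologically
3. **Divide into topics** and settle the short headings
4. **Assemble the Q&A pairs**: investors condensed, project team verbatim
5. **Merge related topics** (if needed)
6. **Add the key information summary table**
7. **Review the order** to make sure it matches the real flow of the meeting

## Common topic templates

### Standard meeting structure

1. Opening pleasantries (usually removed)
2. Project introduction and founder background
3. Team composition
4. Product/project introduction
5. Business model and growth
6. Fundraising and valuation
7. Competition and industry discussion
8. Closing and follow-ups

### Optional topics (depending on the meeting)

- Ranking of values (opening question)
- Personal history and motivation for starting the company
- Technology/supply chain details
- User insights
- Post-mortems of past failures
- Team advice (finding a co-founder, and so on)

## Output format

```markdown
Meeting Notes - YYYY-MM-DD

Meeting time: YYYY-MM-DD HH:MM
Participants: XXX (investor), XXX (project team)

---

## 1. [Topic name]

**Investor**: [question]

**Project team**: [answer, keeping the original wording as far as possible]

## 2. [Topic name]

...

## Key information summary

| Project | XXX |
|------|-----|
| **Founder** | ... |
| **Stage** | ... |
| **Team** | ... |
| **Valuation** | ... |
```

## Notes

- Do not consolidate content on assumption; follow the meeting order strictly
- Condense investor questions, but do not drop the follow-ups
- Keep project team answers as close to the original as possible; do not delete what the founder said
- Divide topics according to the order in which things actually happened in the meeting
- Keep information density even across sections
- The format is one line of question + one paragraph of answer, repeated

## Version history

- v1.0: 2026-02-25, created from experience organizing investment meetings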
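
The output format above can be produced mechanically once the Q&A pairs are assembled. The following is a minimal Python sketch of such a renderer, not part of the skill itself; `QA`, `Topic`, and `render_notes` are hypothetical names, and the English labels and arabic topic numbering are assumptions about how the template is filled in.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class QA:
    investor: str  # condensed question or judgment, e.g. "Name: question"
    team: str      # answer kept close to the original wording


@dataclass
class Topic:
    title: str  # very short heading, navigation only
    pairs: list[QA] = field(default_factory=list)


def render_notes(
    date: str,
    time_: str,
    participants: str,
    project: str,
    topics: list[Topic],
    summary: dict[str, str],
) -> str:
    """Render topics in the order given (meeting order) plus a summary table."""
    lines = [
        f"Meeting Notes - {date}",
        "",
        f"Meeting time: {date} {time_}",
        f"Participants: {participants}",
        "",
        "---",
    ]
    for number, topic in enumerate(topics, start=1):
        lines += ["", f"## {number}. {topic.title}"]
        for qa in topic.pairs:
            # One line of question followed by one paragraph of answer.
            lines += ["", f"**Investor**: {qa.investor}", "", f"**Project team**: {qa.team}"]
    lines += ["", "## Key information summary", "", f"| Project | {project} |", "|------|-----|"]
    for key, value in summary.items():
        lines.append(f"| **{key}** | {value} |")
    return "\n".join(lines) + "\n"
```

The renderer never reorders or merges topics, so keeping the list in meeting order remains the caller's responsibility, in line with the workflow's final review step.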