@clawhub-ryanchan720-7b4bbf6d0d
Email management skill for AI assistants with real-time notifications, smart categorization (7 categories), verification code extraction, and HTML content sa...
--- name: email-bridge description: Email management skill for AI assistants with real-time notifications, smart categorization (7 categories), verification code extraction, and HTML content sanitization. Supports Gmail, QQ Mail, and NetEase. homepage: https://github.com/ryanchan720/email-bridge source: https://github.com/ryanchan720/email-bridge version: 0.6.3 --- # Email Bridge Skill Email management skill for OpenClaw. Provides real-time email monitoring with smart categorization and clean notifications for AI assistants. ## Features - **Real-time notifications**: IMAP IDLE (QQ/NetEase) + polling (Gmail) - **Smart categorization**: 7 categories with subject-only classification - **Verification code extraction**: Context-aware, low false positive rate - **HTML content sanitization**: Clean text from HTML emails, remove invisible chars - **Prompt injection protection**: Safe email content for AI processing - **Multi-provider support**: Gmail (API), QQ Mail (IMAP), NetEase (IMAP) ## Installation ```bash cd skills/email-bridge pip install -e . ``` ## Setup (Manual CLI Required) ⚠️ **Security Note**: Do NOT share authorization codes in chat. Configure accounts via CLI only. ```bash # Add account (prompts for authorization code securely) email-bridge accounts add [email protected] -p qq # Sync emails email-bridge sync # Start daemon for real-time notifications email-bridge daemon start -d ``` ### Getting Authorization Codes **QQ Mail:** https://service.mail.qq.com/detail/0/75 (send SMS, get 16-char code) **NetEase (163/126):** Settings → POP3/SMTP/IMAP → Enable → Get code **Gmail:** Requires OAuth setup (see README.md) ## Capabilities - **Receive emails**: Sync and read emails from configured accounts - **Send emails**: Send emails via SMTP - **Real-time notifications**: Push to OpenClaw via `openclaw system event` - **Smart categorization**: 7 categories with keyword-based classification - **Verification code extraction**: Context-aware extraction with low false positives - **Link extraction**: Extract action links from emails - **HTML sanitization**: Clean text extraction with invisible char removal - **Prompt injection protection**: Sanitize email content for safe AI processing ## Email Categories Subject-only classification for fast, reliable categorization: | Category | Icon | Description | Example Keywords | |----------|------|-------------|------------------| | verification | 🔐 | Verification codes, activation | 验证码, OTP, activate, 绑定邮箱 | | security | ⚠️ | Security alerts, login warnings | 安全提醒, security alert, 密码修改 | | transactional | 📦 | Orders, payments, shipping | 订单确认, receipt, 发货通知 | | promotion | 🎁 | Marketing, promotions, rewards | 奖励, 优惠, promo, discount | | subscription | 📰 | Newsletters, digests | newsletter, 订阅, weekly digest | | spam_like | 🚫 | Suspected spam | 中奖, FREE, click here now | | normal | — | Regular email | (default) | ## Trigger Keywords **Chinese:** 邮箱、邮件、发邮件、查看邮件、验证码、QQ邮箱、Gmail **English:** email, mail, send email, check email, verification code ## Common Commands ```bash # List recent emails email-bridge messages list -n 10 # Get verification codes from recent emails email-bridge codes # Send email email-bridge send -a <account_id> -t [email protected] -s "Subject" -b "Body" # Daemon management email-bridge daemon status email-bridge daemon stop ``` ## Configuration Configuration file: `~/.email-bridge/config.json` **Default configuration** (auto-generated, minimal): ```json { "daemon": { "poll_interval": 300, "notify_openclaw": true } } ``` **Full configuration with all options** (customize as needed): ```json { "daemon": { "poll_interval": 300, "notify_openclaw": true, "notification": { "include_body": false, "body_max_length": 500, "include_verification_codes": true, "include_links": false } } } ``` ### Notification Options | Option | Default | Description | |--------|---------|-------------| | `include_body` | `false` | Include email body preview in notifications | | `body_max_length` | `500` | Max characters for body preview | | `include_verification_codes` | `true` | Auto-extract and show verification codes | | `include_links` | `false` | Include action links (verify/reset) | ## Notifications When new emails arrive, the daemon sends formatted notifications: ``` 📧 新邮件: [email protected] 1. 🔐 Google 您的验证码 ✨ 验证码: 123456 2. ⚠️ Microsoft 登录提醒 📝 We noticed a new sign-in... 3. 🎁 OKX 150 USDT 奖励等您拿 📝 亲爱的欧易用户,欧易诚邀您加入邀请好友计划... ``` ## HTML Content Processing HTML-only emails are processed through: 1. **Tag stripping**: Remove `<style>`, `<script>`, and all HTML tags 2. **Entity decoding**: Convert HTML entities to text 3. **Invisible char removal**: Remove zero-width spaces, BOM, etc. 4. **Whitespace normalization**: Clean up spacing 5. **Prompt injection protection**: Remove dangerous patterns **Example**: HTML with invisible chars → Clean readable text ## Security Features - **Subject-only classification**: No body scanning for privacy - **Context-aware code extraction**: Only extract near verification keywords - **Invisible char sanitization**: Remove U+200B, U+FEFF, U+034F, etc. - **Prompt injection protection**: Filter dangerous instruction patterns - **Address pattern exclusion**: Don't extract numbers from addresses ## Data Storage All data stored locally at `~/.email-bridge/`: ``` ~/.email-bridge/ ├── email_bridge.db # SQLite database (accounts, messages) ├── config.json # Configuration file ├── daemon.pid # Daemon process ID ├── daemon.log # Logs └── gmail/ ├── credentials.json # OAuth credentials └── token_*.json # OAuth tokens ``` ⚠️ Credentials are stored unencrypted. Protect this directory. ## Revoking Access ```bash # Stop daemon email-bridge daemon stop # Remove all stored data rm -rf ~/.email-bridge # For Gmail: revoke at https://myaccount.google.com/permissions # For QQ/NetEase: regenerate authorization codes in email settings ``` ## Dependencies All from PyPI: - click >= 8.0 - pydantic >= 2.0 - imaplib2 >= 3.6 - google-api-python-client >= 2.0 (Gmail only) - google-auth-oauthlib >= 1.0 (Gmail only) ## Security Notes 1. **Never share authorization codes in chat** - use CLI interactively 2. **Credentials stored unencrypted** - protect `~/.email-bridge/` directory 3. **Email content is sanitized** - prompt injection protection enabled 4. **Daemon runs with user privileges** - no elevated access needed 5. **Subject-only classification** - privacy-conscious processing ## Changelog ### v0.6.2 - Add PROMOTION category for marketing emails (🎁 icon) - Add TRANSACTIONAL category for orders/shipping (📦 icon) - Expand keyword pools for all categories - Add invisible character sanitization (U+200B, U+FEFF, U+034F, etc.) - Improve HTML-to-text extraction - Update documentation (DESIGN.md, README.md) ### v0.6.1 - Add IDLE keepalive (60s timeout) for connection stability - Add sync retry mechanism (up to 3 retries) - Improve daemon reliability for flaky networks ### v0.6.0 - Smart notification format based on email category - Prompt injection protection with `sanitize_for_notification()` - HTML-to-text fallback for HTML-only emails - Subject-only classification for privacy - Context-aware verification code extraction - Category icons (🔐 ⚠️ 📦 🎁 📰 🚫) ### v0.5.7 - Initial ClawHub release - Gmail, QQ Mail, NetEase support - IMAP IDLE real-time notifications - Verification code extraction - Link extraction FILE:DESIGN.md # Email Bridge Design Document ## Overview Email Bridge is a minimal personal email middleware designed for AI assistants. It provides real-time email monitoring, verification code extraction, and seamless integration with OpenClaw for push notifications. **Current Version**: 0.6.1 **Supported Providers**: Gmail (API), QQ Mail (IMAP), NetEase Mail (IMAP) --- ## Architecture ``` ┌─────────────────────────────────────────────────────────────────┐ │ OpenClaw Integration │ │ Push notifications via `openclaw system event` │ └─────────────────────────────────────────────────────────────────┘ ▲ │ notify ┌─────────────────────────────────────────────────────────────────┐ │ Daemon (daemon.py) │ │ Background monitoring with IMAP IDLE + polling │ │ Keepalive (60s) + sync retry (3x) for reliability │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ CLI (cli.py) │ │ Click-based commands │ └─────────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────────┐ │ Service Layer (service.py) │ │ Business logic, orchestration, sync │ └─────────────────────────────────────────────────────────────────┘ │ │ │ ▼ ▼ ▼ ┌──────────────────┐ ┌─────────────────────────────────────┐ │ Database (db.py) │ │ Adapters (adapters/) │ │ SQLite │ │ Provider interface │ └──────────────────┘ └─────────────────────────────────────┘ │ ┌───────────────────┼───────────────────┐ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ Gmail │ │ IMAP │ │ Mock │ │ API │ │ QQ/Net │ │ Adapter │ └─────────┘ └─────────┘ └─────────┘ ``` --- ## Core Components ### 1. Data Models (`models.py`) Pydantic models for type safety: - **Account**: Email account configuration and metadata - **Message**: Email message with optional cached content - **EmailCategory**: Enum for message classification (verification, security, transactional, promotion, subscription, spam_like, normal) - **EmailProvider**: Supported providers (gmail, qq, netease, mock) ### 2. Database Layer (`db.py`) SQLite-backed storage with simple schema: **accounts table:** - id, email, provider, status, display_name, config, timestamps **messages table:** - id, account_id, message_id, subject, sender, recipients - received_at, is_read, is_starred, category - preview, body_text, body_html (cached content) - provider_data (provider-specific raw data) ### 3. Service Layer (`service.py`) Clean API layer providing: - Account CRUD operations - Message listing, searching, retrieval - Sync orchestration (pull from adapters to DB) - Statistics ### 4. Adapter Interface (`adapters/base.py`) Abstract base class for provider implementations: ```python class BaseAdapter(ABC): @property @abstractmethod def provider(self) -> EmailProvider: ... @abstractmethod def fetch_messages(self, account, options) -> Iterator[RawMessage]: ... @abstractmethod def get_message(self, account, message_id) -> RawMessage: ... def authenticate(self, account) -> bool: ... def test_connection(self, account) -> bool: ... ``` ### 5. Gmail Adapter (`adapters/gmail.py`) Real Gmail API integration using OAuth2: - **Authentication**: OAuth2 with automatic token caching - **Recent-window sync**: Configurable days back and message limit ### 6. IMAP Adapter (`adapters/imap.py`) IMAP integration for QQ Mail and NetEase Mail: - **QQ Mail**: `imap.qq.com:993` with app-specific password - **NetEase**: `imap.163.com`, `imap.126.com`, `imap.yeah.net` with authorization code - **Real-time monitoring**: IMAP IDLE for push notifications - **Fallback polling**: For Gmail (no IDLE support) ### 7. Category Detection (`categories.py`) **Subject-only classification** for fast, reliable categorization: ```python class EmailCategory(str, Enum): VERIFICATION = "verification" # Verification codes, activation links SECURITY = "security" # Security alerts, login warnings TRANSACTIONAL = "transactional" # Order confirmations, receipts PROMOTION = "promotion" # Marketing emails, promotions, rewards SUBSCRIPTION = "subscription" # Newsletters, digests SPAM_LIKE = "spam_like" # Likely spam/promotional NORMAL = "normal" # Regular personal/business email ``` **Keyword pools** (more specific phrases first, subject-only matching): | Category | Example Keywords | |----------|-----------------| | verification | 验证码, verification code, OTP, activate your account, 绑定邮箱 | | security | 安全提醒, security alert, login attempt, password reset | | transactional | 订单确认, order #, 发货通知, receipt, invoice | | promotion | 奖励, 优惠, promo, discount, USDT, BTC | | subscription | newsletter, unsubscribe, 订阅, 退订 | | spam_like | 恭喜您中奖, winner, click here now, FREE | **Design principle**: Only match against subject line (not body) for faster classification and lower false positive rate. ### 8. Extraction Module (`extraction.py`) **Strict verification code extraction**: 1. **Context-aware only**: Codes must appear near verification keywords 2. **No standalone extraction**: Prevents false positives (addresses, order numbers) 3. **Address pattern exclusion**: STE, ST, AVE, BLVD etc. filtered out **Patterns matched**: - Numeric codes (4-8 digits) near "验证码", "code is", "OTP" - Alphanumeric codes near verification keywords - Chinese patterns: "验证码:123456" **Action link extraction**: - verify/confirm/activate links - reset password links - unsubscribe links ### 9. Sanitization (`sanitize.py`) **Content cleaning for notifications**: 1. **Invisible character removal**: Zero-width spaces (U+200B-U+200F), BOM (U+FEFF), etc. 2. **Prompt injection prevention**: Remove dangerous patterns (ignore instructions, role-play, etc.) 3. **Control character filtering**: Strip non-printable characters 4. **Whitespace normalization**: Collapse multiple spaces 5. **Length limiting**: Truncate with ellipsis **Why it matters**: HTML emails often contain invisible formatting characters that appear as garbage in plain text output. The sanitization layer ensures clean, readable notifications. ### 10. Daemon (`daemon.py`) Background service for real-time email monitoring: **Key Features**: - **IMAP IDLE**: Real-time push for QQ/NetEase (timeout = 60s keepalive) - **Polling**: Gmail fallback (configurable interval, default 5 min) - **Sync retry**: Up to 3 retries on sync failure - **OpenClaw integration**: Push notifications via `openclaw system event` **Configuration** (`~/.email-bridge/config.json`): ```json { "daemon": { "poll_interval": 300, "notify_openclaw": true, "notification": { "include_body": true, "body_max_length": 500, "include_verification_codes": true, "include_links": false } } } ``` **Notification Format**: - 🔐 verification: Emphasizes verification codes - ⚠️ security: Warning icon + body summary - 📦 transactional: Order/shipping notifications - 🎁 promotion: Marketing/promotional content - 📰 subscription: Simplified display - 🚫 spam_like: Marked as "疑似垃圾" - normal: Standard format --- ## Data Flow ### Real-time Monitoring Flow ``` 1. Daemon starts → connects to IMAP/loads Gmail poller 2. IMAP IDLE waits for new mail (keepalive every 60s) 3. New mail detected → sync with retry (up to 3x) 4. Extract verification codes (if keywords present) 5. Format notification based on category 6. Push to OpenClaw via `openclaw system event` ``` ### Sync Process ``` 1. CLI calls `service.sync_account(account_id)` 2. Service fetches adapter for account's provider 3. Adapter yields `RawMessage` objects 4. Service classifies by subject (category detection) 5. Service extracts verification codes (if applicable) 6. Service persists to SQLite ``` --- ## Design Decisions 1. **Subject-only classification**: Fast, reliable, no body parsing needed 2. **Strict code extraction**: Context-aware only, prevents false positives 3. **Keepalive + retry**: Handles network flakiness gracefully 4. **SQLite**: Zero-config, sufficient for personal use 5. **Pydantic**: Type safety without ORM overhead 6. **Adapter pattern**: Clean separation for multiple providers 7. **OpenClaw push**: Real-time notifications to AI assistant 8. **Daemon mode**: Background service with IMAP IDLE --- ## Key Improvements (v0.6.1) ### Category Detection - **Subject-only**: No body scanning for classification - **Expanded keywords**: Verification, security, transactional categories fully populated - **Chinese + English**: Bilingual keyword support - **Specific phrases**: Avoid single-character keywords (e.g., "确认" alone is too broad) ### Verification Code Extraction - **Context-required**: Codes must appear near verification keywords - **No standalone extraction**: Removed guessing of isolated numbers - **Address exclusion**: Filter out STE, ST, AVE patterns - **Lower false positive rate**: Glassdoor addresses no longer misidentified ### Daemon Reliability - **Keepalive**: 60-second IDLE timeout acts as heartbeat - **Sync retry**: 3 retries with 2-second delay between attempts - **Reconnection**: Automatic reconnect on connection errors - **Graceful degradation**: If sync fails, logs error and continues monitoring --- ## Configuration ### Account Setup ```bash # Add Gmail account (OAuth2) email-bridge accounts add --provider gmail --email [email protected] # Add QQ Mail (app-specific password) email-bridge accounts add --provider qq --email [email protected] --password <app-password> # Add NetEase (authorization code) email-bridge accounts add --provider netease --email [email protected] --password <auth-code> ``` ### Daemon Management ```bash # Start daemon in background email-bridge daemon start -d # Check status email-bridge daemon status # Stop daemon email-bridge daemon stop ``` --- ## Future Considerations - **Attachment handling**: Download and process attachments - **Full-text search**: SQLite FTS5 for better search - **Multi-user support**: Account isolation for teams - **Web UI**: FastAPI backend with React/Vue frontend - **LLM classification**: Replace keyword matching with ML FILE:PROGRESS.md # Email Bridge 开发进度 > 最后更新:2026-03-25 11:30 AM --- ## 已完成 ### v0.5.7 - 核心功能 **发布地址**:https://clawhub.ai/skills/email-bridge **功能清单**: | 功能 | 状态 | 说明 | |------|------|------| | Gmail 接收 | ✅ | Gmail API + OAuth | | QQ 邮箱接收 | ✅ | IMAP + 授权码 | | 网易邮箱接收 | ✅ | IMAP + 授权码 | | SMTP 发送 | ✅ | 所有邮箱 | | 守护进程 | ✅ | 后台运行 | | IMAP IDLE | ✅ | QQ/网易实时推送 | | 新邮件通知 | ✅ | 通过 `openclaw system event` | | 验证码提取 | ✅ | 正则匹配 + 上下文验证 | | 链接提取 | ✅ | 识别验证/重置/退订链接 | ### v0.6.0 - 通知增强 + 安全防护 ✅ NEW **提交**:`0165c5a` **新增功能**: | 功能 | 状态 | 说明 | |------|------|------| | 配置开关 | ✅ | `include_body`, `include_verification_codes`, `body_max_length`, `include_links` | | 提示词注入防护 | ✅ | `sanitize_for_notification()` 函数 | | JSON 结构化通知 | ✅ | 防止纯文本拼接导致的注入风险 | | 验证码自动提取 | ✅ | 通知中可包含验证码 | --- ## 配置示例 ```json { "daemon": { "poll_interval": 300, "notify_openclaw": true, "notification": { "include_body": false, "body_max_length": 500, "include_verification_codes": true, "include_links": false } } } ``` 配置文件位置:`~/.email-bridge/config.json` --- ## 待开发 ### 优先级 1:测试与验证 1. 重启守护进程测试新通知格式 2. 发送测试邮件验证: - 验证码是否正确提取 - 正文是否正确清洗 - JSON 格式是否正确解析 ### 优先级 2:发布更新 - 更新 ClawHub 版本到 0.6.0 - 更新 README 文档 --- ## 仓库信息 - **GitHub**:`~/repos/email-bridge` - **ClawHub**:`[email protected]` - **守护进程日志**:`~/.email-bridge/daemon.log` - **数据库**:`~/.email-bridge/email_bridge.db` --- ## 相关文件 ``` ~/repos/email-bridge/ ├── email_bridge/ │ ├── daemon.py # 守护进程(已更新通知逻辑) │ ├── sanitize.py # 内容清洗(新增) │ ├── extraction.py # 验证码提取 │ ├── service.py # 业务逻辑 │ └── cli.py # CLI(已更新配置加载) ├── config.example.json # 配置示例(已更新) ├── SKILL.md # ClawHub 文档 └── README.md # 使用文档 ``` --- ## 联系方式 - Ryan(陈秋宇) - 飞书:ou_f0fd6de033ff968791eef05ea8a9a26c FILE:README.md # Email Bridge A personal email middleware with real-time notifications, verification code extraction, and multi-provider support (Gmail, QQ Mail, NetEase). ## Quick Start ### 安装 ```bash # 克隆项目 git clone https://github.com/ryanchan720/email-bridge.git cd email-bridge # 安装 pip install -e . # 验证安装 python test_installation.py ``` ### 配置账户 ⚠️ **安全提示**: 请使用 CLI 本地配置,**不要在聊天中分享授权码**。 ```bash # 添加账户(会交互式提示输入授权码) email-bridge accounts add [email protected] -p qq # 同步邮件 email-bridge sync # 启动守护进程 email-bridge daemon start -d ``` ## 添加邮箱账户 ### QQ 邮箱(推荐,5 分钟) 1. 获取授权码:https://service.mail.qq.com/detail/0/75 2. 添加账户: ```bash email-bridge accounts add [email protected] -p qq # 系统会提示输入授权码 ``` ### 网易邮箱(163/126) 1. 在邮箱设置中开启 IMAP/SMTP 服务,获取授权码 2. 添加账户: ```bash email-bridge accounts add [email protected] -p netease \ --config '{"password": "YOUR_AUTH_CODE"}' ``` ### Gmail(高级用户,20 分钟) Gmail 需要 OAuth 配置,详见 [Gmail 配置指南](#gmail-配置指南)。 ## 日常使用 ```bash # 同步邮件 email-bridge sync # 查看最近邮件 email-bridge messages list -n 10 # 启动守护进程(实时通知) email-bridge daemon start -d # 提取验证码 email-bridge codes # 发送邮件 email-bridge send -a <account_id> -t [email protected] -s "主题" -b "正文" ``` --- ## Gmail 配置指南 > ⚠️ **注意**:Gmail API 配置流程较复杂,需要 Google Cloud 项目和 OAuth 授权。如果你只需要基本的收发邮件功能,建议等待后续 IMAP/SMTP 支持。 ### 前置条件 - 一个 Google 账号(Gmail 邮箱) - 能访问 Google Cloud Console(可能需要科学上网) ### Step 1:创建 Google Cloud 项目 1. 打开 [Google Cloud Console](https://console.cloud.google.com/) 2. 登录你的 Google 账号 3. 页面顶部点击项目选择器,点击 **"NEW PROJECT"** 4. 项目名称随便填(如 `email-bridge`),点击 **"CREATE"** 5. 创建完成后选中该项目 ### Step 2:启用 Gmail API 1. 打开 [Gmail API 页面](https://console.cloud.google.com/apis/library/gmail.googleapis.com) 2. 确保顶部显示的是刚创建的项目 3. 点击 **"ENABLE"** ### Step 3:配置 OAuth 同意屏幕 1. 打开 [OAuth 同意屏幕配置](https://console.cloud.google.com/apis/credentials/consent) 2. 用户类型选择 **"External"**,点击 **"CREATE"** **填写配置:** | 字段 | 填写内容 | |------|----------| | App name | `Email Bridge` 或任意名称 | | User support email | 选择你的邮箱 | | Developer contact | 填你的邮箱 | | 其他字段 | 可留空 | 点击 **"SAVE AND CONTINUE"**。 **Scopes(权限范围):** 1. 点击 **"ADD OR REMOVE SCOPES"** 2. 搜索 `gmail.readonly`,勾选 `https://www.googleapis.com/auth/gmail.readonly` 3. 点击 **"UPDATE"**,然后 **"SAVE AND CONTINUE"** > 💡 `gmail.readonly` 是只读权限,足够用于读取邮件。如需发送邮件,请使用 SMTP(需配置应用专用密码)。 **Test Users(测试用户):** 1. 点击 **"ADD USERS"** 2. 填写你的 Gmail 地址 3. 点击 **"ADD"**,然后 **"SAVE AND CONTINUE"** 最后检查无误,点击 **"BACK TO DASHBOARD"**。 ### Step 4:创建 OAuth 客户端凭证 1. 打开 [Credentials 页面](https://console.cloud.google.com/apis/credentials) 2. 点击 **"CREATE CREDENTIALS"** → **"OAuth client ID"** 3. Application type 选择 **"Desktop app"** 4. Name 随便填(如 `Email Bridge CLI`) 5. 点击 **"CREATE"** 6. 在弹出窗口点击 **"DOWNLOAD JSON"** 下载凭证文件 ### Step 5:安装凭证文件 ```bash # 创建目录 mkdir -p ~/.email-bridge/gmail # 将下载的凭证文件移动过去,重命名为 credentials.json mv ~/Downloads/client_secret_xxx.json ~/.email-bridge/gmail/credentials.json ``` ### Step 6:添加账户并授权 ```bash # 添加 Gmail 账户 email-bridge accounts add [email protected] --provider gmail --name "Personal Gmail" # 首次同步,会打开浏览器要求授权 email-bridge sync -a <account_id> ``` 授权完成后,token 会自动保存在 `~/.email-bridge/gmail/token_*.json`,后续同步无需再次授权。 ### Gmail 配置选项 ```bash # 自定义同步范围:最近 3 天,最多 50 封 email-bridge accounts add [email protected] -p gmail \ --config '{"sync_days": 3, "sync_max_messages": 50}' ``` | 配置项 | 说明 | 默认值 | |--------|------|--------| | `sync_days` | 同步最近 N 天的邮件 | 7 | | `sync_max_messages` | 每次同步最大邮件数 | 100 | | `credentials_path` | 自定义凭证文件路径 | `~/.email-bridge/gmail/credentials.json` | | `token_path` | 自定义 token 存储路径 | 自动生成 | > ⚠️ **Gmail 发送邮件**:如果需要用 Gmail 发送邮件,需要额外配置应用专用密码: > 1. 开启 Google 账户的两步验证 > 2. 访问 [应用专用密码](https://myaccount.google.com/apppasswords) > 3. 生成一个新密码,选择"邮件"和"其他设备" > 4. 用应用专用密码更新账户配置: > ```bash > email-bridge accounts update <account_id> --config '{"password": "YOUR_APP_PASSWORD"}' > ``` --- ## QQ 邮箱配置 > ✅ **推荐**:配置简单,只需要授权码。 ### Step 1:开启 IMAP 服务 1. 登录 [QQ 邮箱](https://mail.qq.com/) 2. 点击 **设置** → **账户** 3. 找到 **"POP3/IMAP/SMTP/Exchange/CardDAV/CalDAV服务"** 4. 开启 **"IMAP/SMTP服务"** 5. 按提示发送短信验证 6. 获得一个 **16 位授权码**(保存好!) ### Step 2:添加账户 ```bash # 使用授权码(不是登录密码!) email-bridge accounts add [email protected] -p qq \ --config '{"password": "YOUR_16_CHAR_AUTH_CODE"}' \ --name "QQ Mail" ``` ### Step 3:同步邮件 ```bash email-bridge sync -a <account_id> ``` --- ## 网易邮箱配置(163/126) > ✅ **推荐**:配置简单,只需要授权码。 ### Step 1:开启 IMAP 服务 **163 邮箱:** 1. 登录 [163 邮箱](https://mail.163.com/) 2. 点击 **设置** → **POP3/SMTP/IMAP** 3. 开启 **"IMAP/SMTP服务"** 4. 获得授权码 **126 邮箱:** 1. 登录 [126 邮箱](https://mail.126.com/) 2. 同上步骤 ### Step 2:添加账户 ```bash # 163 邮箱 email-bridge accounts add [email protected] -p netease \ --config '{"password": "YOUR_AUTH_CODE"}' \ --name "163 Mail" # 126 邮箱 email-bridge accounts add [email protected] -p netease \ --config '{"password": "YOUR_AUTH_CODE"}' \ --name "126 Mail" ``` ### Step 3:同步邮件 ```bash email-bridge sync -a <account_id> ``` --- ## 使用示例 ### 查看邮件 ```bash # 列出最近邮件 email-bridge messages list # 列出最近 20 封 email-bridge messages list -n 20 # 只看未读 email-bridge messages unread # 按关键词搜索 email-bridge messages search --keyword "verification" # 按日期范围搜索 email-bridge messages search --from-date 2026-03-20 --to-date 2026-03-24 # 查看邮件详情 email-bridge messages get <message_id> ``` ### 验证码提取 ```bash # 提取邮件中的验证码 email-bridge extract <message_id> --codes # 提取操作链接 email-bridge extract <message_id> --links # 两者都要 email-bridge extract <message_id> --all # 快速查找最近验证码 email-bridge codes email-bridge codes -a <account_id> ``` ### 发送邮件 ```bash # 发送简单邮件 email-bridge send -a <account_id> -t [email protected] -s "主题" -b "正文内容" # 发送给多人 email-bridge send -a <account_id> -t [email protected] -t [email protected] -s "主题" -b "正文" # 抄送 email-bridge send -a <account_id> -t [email protected] -c [email protected] -s "主题" -b "正文" # 发送 HTML 邮件 email-bridge send -a <account_id> -t [email protected] -s "主题" --html "<h1>标题</h1><p>内容</p>" ``` ### 守护进程(实时监听) Email Bridge 支持后台运行,实时监听新邮件: ```bash # 启动守护进程(后台运行) email-bridge daemon start -d # 查看状态 email-bridge daemon status # 停止守护进程 email-bridge daemon stop # 自定义轮询间隔(默认 300 秒,仅影响 Gmail) email-bridge daemon start -d -i 60 # 禁用 OpenClaw 通知 email-bridge daemon start -d --no-notify ``` **工作模式:** | 邮箱 | 模式 | 延迟 | |------|------|------| | QQ 邮箱 | IDLE(实时推送) | 秒级 | | 网易邮箱 | IDLE(实时推送) | 秒级 | | Gmail | 轮询 | 可配置(默认 5 分钟) | > 💡 QQ/网易邮箱使用 IMAP IDLE 协议,服务器有新邮件会主动推送通知。Gmail 不支持 IDLE,使用定时轮询。 **通知内容:** 收到新邮件时,会通过 `openclaw system event` 发送通知,包含发件人和主题: ``` 📧 新邮件: [email protected] 1. 发件人名称 邮件主题 2. 发件人名称 邮件主题 ... 还有 N 封 ``` > 💡 通知只在有新邮件到达时触发,不会重复通知已存在的邮件。 **日志文件:** `~/.email-bridge/daemon.log` ### 账户管理 ```bash # 列出所有账户 email-bridge accounts list # 更新账户名称 email-bridge accounts update <id> --name "New Name" # 禁用/启用账户 email-bridge accounts update <id> --disable email-bridge accounts update <id> --enable # 删除账户(同时删除邮件) email-bridge accounts delete <id> ``` ### 同步 ```bash # 同步所有账户 email-bridge sync # 同步指定账户 email-bridge sync -a <account_id> # 同步最近 30 天 email-bridge sync --days 30 ``` ### 统计 ```bash email-bridge stats ``` --- ## 邮件分类 邮件根据主题关键词自动分类(仅扫描主题,不扫描正文): | 分类 | 说明 | 示例关键词 | |------|------|-----------| | `verification` | 验证码、账户确认 | 验证码, OTP, activate, 绑定邮箱 | | `security` | 安全提醒、登录通知 | 安全提醒, security alert, 密码修改 | | `transactional` | 订单、支付、发货 | 订单确认, receipt, 发货通知 | | `promotion` | 营销推广、优惠活动 | 奖励, 优惠, promo, discount | | `subscription` | 订阅、Newsletter | newsletter, 订阅, weekly digest | | `spam_like` | 疑似垃圾邮件 | 中奖, FREE, click here now | | `normal` | 普通邮件(默认) | - | **通知格式**: - 🔐 verification:重点显示验证码 - ⚠️ security:突出警示信息 - 📦 transactional:订单/发货通知 - 🎁 promotion:营销推广 - 📰 subscription:订阅内容 - 🚫 spam_like:标记但不展开 --- ## 功能总结 | 功能 | Gmail | QQ 邮箱 | 网易邮箱 | |------|-------|---------|----------| | 接收邮件 | ✅ API | ✅ IMAP | ✅ IMAP | | 发送邮件 | ✅ SMTP* | ✅ SMTP | ✅ SMTP | | 实时监听 | ✅ 轮询 | ✅ IDLE | ✅ IDLE | | 新邮件通知 | ✅ | ✅ | ✅ | | 验证码提取 | ✅ | ✅ | ✅ | | 链接提取 | ✅ | ✅ | ✅ | | 自动分类 | ✅ | ✅ | ✅ | *\*Gmail 发送需要额外配置应用专用密码* --- ## 数据存储 所有数据存储在 `~/.email-bridge/`: ``` ~/.email-bridge/ ├── email_bridge.db # SQLite 数据库 ├── daemon.pid # 守护进程 PID(运行时) ├── daemon.log # 守护进程日志 └── gmail/ ├── credentials.json # OAuth 凭证 └── token_xxx.json # 授权 Token(自动生成) ``` --- ## 项目结构 ``` email-bridge/ ├── email_bridge/ │ ├── cli.py # CLI 入口(Click) │ ├── service.py # 业务逻辑层 │ ├── models.py # Pydantic 数据模型 │ ├── db.py # SQLite 存储层 │ ├── categories.py # 分类检测 │ ├── extraction.py # 验证码/链接提取 │ ├── daemon.py # 守护进程 │ └── adapters/ │ ├── base.py # Adapter 接口 │ ├── mock.py # Mock 适配器(演示用) │ ├── gmail.py # Gmail API 适配器 │ ├── imap.py # IMAP 适配器(QQ/网易) │ └── smtp.py # SMTP 发送适配器 ├── fixtures/ │ └── sample_emails.json ├── tests/ ├── README.md └── DESIGN.md ``` --- ## Roadmap **当前版本 v0.5:** - ✅ Gmail API 集成(OAuth2) - ✅ QQ 邮箱 IMAP/SMTP 集成 - ✅ 网易邮箱(163/126)IMAP/SMTP 集成 - ✅ 验证码提取 - ✅ 操作链接提取 - ✅ 发送邮件 - ✅ 守护进程模式 - ✅ Mock 模式演示 **计划中:** - ⏳ Web UI - ⏳ 附件处理 --- ## License MIT FILE:config.example.json { "accounts": [ { "email": "[email protected]", "provider": "qq", "name": "QQ Mail", "config": { "password": "YOUR_QQ_MAIL_AUTH_CODE" }, "_comment": "QQ邮箱授权码获取: https://service.mail.qq.com/detail/0/75" }, { "email": "[email protected]", "provider": "netease", "name": "163 Mail", "config": { "password": "YOUR_163_AUTH_CODE" }, "_comment": "163邮箱授权码: 设置 → POP3/SMTP/IMAP → 开启服务" }, { "email": "[email protected]", "provider": "gmail", "name": "Gmail", "_comment": "Gmail需要OAuth配置,请参考README.md中的Gmail配置指南" } ], "daemon": { "poll_interval": 300, "notify_openclaw": true, "notification": { "include_body": false, "body_max_length": 500, "include_verification_codes": true, "include_links": false } }, "_help": { "qq_auth_code": "QQ邮箱: 设置 → 账户 → POP3/IMAP/SMTP服务 → 生成授权码", "netease_auth_code": "163/126邮箱: 设置 → POP3/SMTP/IMAP → 开启服务 → 获取授权码", "gmail_oauth": "Gmail需要Google Cloud项目配置OAuth,详见README.md" } } FILE:email_bridge/__init__.py """Email Bridge - Personal email middleware with real-time notifications.""" __version__ = "0.5.5" FILE:email_bridge/adapters/__init__.py """Email provider adapters.""" from .base import BaseAdapter, FetchOptions, RawMessage from .mock import MockAdapter from .gmail import GmailAdapter, GmailAdapterError, GmailCredentialsNotFoundError, GmailAuthError from .imap import IMAPAdapter, QQMailAdapter, NetEaseMailAdapter, IMAPAdapterError, IMAPAuthError from .smtp import SMTPAdapter, SMTPAdapterError, SMTPAuthError, SMTPSendError __all__ = [ "BaseAdapter", "FetchOptions", "RawMessage", "MockAdapter", "GmailAdapter", "GmailAdapterError", "GmailCredentialsNotFoundError", "GmailAuthError", "IMAPAdapter", "QQMailAdapter", "NetEaseMailAdapter", "IMAPAdapterError", "IMAPAuthError", "SMTPAdapter", "SMTPAdapterError", "SMTPAuthError", "SMTPSendError", ] FILE:email_bridge/adapters/base.py """Base adapter interface for email providers.""" from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime from typing import Iterator, Optional from ..models import Account, EmailProvider @dataclass class FetchOptions: """Options for fetching messages.""" since: Optional[datetime] = None limit: int = 100 unread_only: bool = False @dataclass class RawMessage: """Raw message data from a provider.""" message_id: str subject: str sender: str sender_name: Optional[str] recipients: list[str] received_at: datetime is_read: bool body_text: Optional[str] = None body_html: Optional[str] = None # Provider-specific raw data raw_data: dict = None def __post_init__(self): if self.raw_data is None: self.raw_data = {} class BaseAdapter(ABC): """Abstract base class for email provider adapters. Future implementations: - GmailAdapter: Uses Gmail API (OAuth2) - QQAdapter: Uses IMAP or QQ Mail API - NetEaseAdapter: Uses IMAP """ @property @abstractmethod def provider(self) -> EmailProvider: """Return the provider this adapter handles.""" pass @abstractmethod def fetch_messages( self, account: Account, options: FetchOptions = None ) -> Iterator[RawMessage]: """Fetch messages from the provider. This is a generator to support streaming large result sets. """ pass @abstractmethod def get_message(self, account: Account, message_id: str) -> Optional[RawMessage]: """Fetch a single message by ID.""" pass def authenticate(self, account: Account) -> bool: """Authenticate with the provider. Override this for providers requiring OAuth2 or other auth flows. Returns True if authentication is valid/successful. """ return True def test_connection(self, account: Account) -> bool: """Test the connection to the provider. Override this for health checks. """ return True FILE:email_bridge/adapters/gmail.py """Gmail adapter using Gmail API with OAuth2.""" import base64 import os from datetime import datetime, timedelta from pathlib import Path from typing import Iterator, Optional from ..models import Account, EmailProvider from .base import BaseAdapter, FetchOptions, RawMessage # Gmail API scopes - read-only access SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"] # Default paths for credentials DEFAULT_CREDENTIALS_DIR = Path.home() / ".email-bridge" / "gmail" class GmailAdapterError(Exception): """Base exception for Gmail adapter errors.""" pass class GmailCredentialsNotFoundError(GmailAdapterError): """Raised when credentials file is not found.""" pass class GmailAuthError(GmailAdapterError): """Raised when authentication fails.""" pass class GmailAdapter(BaseAdapter): """Gmail adapter using Gmail API. Configuration (via account.config): credentials_path: Path to credentials.json from Google Cloud Console token_path: Path to store/load OAuth token (default: ~/.email-bridge/gmail/token.json) sync_days: Number of days back to sync (default: 7) sync_max_messages: Maximum messages per sync (default: 100) Setup: 1. Create a Google Cloud project 2. Enable Gmail API 3. Create OAuth 2.0 Desktop credentials 4. Download credentials.json to ~/.email-bridge/gmail/credentials.json 5. Run first sync - browser will open for OAuth consent 6. Token is cached for subsequent runs """ @property def provider(self) -> EmailProvider: return EmailProvider.GMAIL def __init__( self, credentials_dir: Optional[Path] = None, ): """Initialize Gmail adapter. Args: credentials_dir: Directory to store credentials and tokens """ self.credentials_dir = credentials_dir or DEFAULT_CREDENTIALS_DIR self._service = None def _get_credentials_path(self, account: Account) -> Path: """Get credentials path from account config or default.""" if "credentials_path" in account.config: return Path(account.config["credentials_path"]) return self.credentials_dir / "credentials.json" def _get_token_path(self, account: Account) -> Path: """Get token path from account config or default.""" if "token_path" in account.config: return Path(account.config["token_path"]) # Use account-specific token file safe_email = account.email.replace("@", "_at_").replace(".", "_") return self.credentials_dir / f"token_{safe_email}.json" def _get_credentials(self, account: Account): """Load or create OAuth2 credentials. Returns credentials object for Gmail API. Raises GmailCredentialsNotFoundError if credentials.json not found. Raises GmailAuthError if authentication fails. """ try: from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow except ImportError as e: raise GmailAdapterError( "Gmail dependencies not installed. Run: pip install google-api-python-client google-auth google-auth-oauthlib" ) from e credentials_path = self._get_credentials_path(account) token_path = self._get_token_path(account) creds = None # Load existing token if available if token_path.exists(): try: creds = Credentials.from_authorized_user_file(str(token_path), SCOPES) except Exception: creds = None # Refresh if expired if creds and creds.expired and creds.refresh_token: try: creds.refresh(Request()) # Save refreshed token token_path.parent.mkdir(parents=True, exist_ok=True) token_path.write_text(creds.to_json()) except Exception as e: creds = None # If no valid credentials, need to authenticate if not creds or not creds.valid: if not credentials_path.exists(): raise GmailCredentialsNotFoundError( f"Gmail credentials not found at {credentials_path}. " "Download credentials.json from Google Cloud Console. " "See README.md for setup instructions." ) try: flow = InstalledAppFlow.from_client_secrets_file( str(credentials_path), SCOPES ) creds = flow.run_local_server(port=0) # Save token for future use token_path.parent.mkdir(parents=True, exist_ok=True) token_path.write_text(creds.to_json()) except Exception as e: raise GmailAuthError(f"Failed to authenticate with Gmail: {e}") from e return creds def _get_service(self, account: Account): """Get or create Gmail API service.""" if self._service is None: try: from googleapiclient.discovery import build except ImportError as e: raise GmailAdapterError( "Gmail dependencies not installed. Run: pip install google-api-python-client" ) from e creds = self._get_credentials(account) self._service = build("gmail", "v1", credentials=creds) return self._service def _parse_message(self, msg_data: dict) -> RawMessage: """Parse Gmail API message data into RawMessage.""" headers = {h["name"]: h["value"] for h in msg_data.get("payload", {}).get("headers", [])} # Extract sender info sender = headers.get("From", "") sender_name = None if "<" in sender and ">" in sender: # Parse "Name <[email protected]>" format import re match = re.match(r"(.+?)\s*<(.+?)>", sender) if match: sender_name = match.group(1).strip().strip('"') sender = match.group(2) # Extract recipients recipients = [] for field in ["To", "Cc"]: if field in headers: # Simple extraction - could be improved for complex formats recipients.extend([r.strip() for r in headers[field].split(",")]) # Extract body body_text = None body_html = None payload = msg_data.get("payload", {}) def extract_body(part: dict) -> tuple[Optional[str], Optional[str]]: """Recursively extract body from message parts.""" text = None html = None if part.get("mimeType") == "text/plain" and "data" in part.get("body", {}): data = part["body"]["data"] text = base64.urlsafe_b64decode(data).decode("utf-8", errors="replace") elif part.get("mimeType") == "text/html" and "data" in part.get("body", {}): data = part["body"]["data"] html = base64.urlsafe_b64decode(data).decode("utf-8", errors="replace") for subpart in part.get("parts", []): sub_text, sub_html = extract_body(subpart) if sub_text and not text: text = sub_text if sub_html and not html: html = sub_html return text, html body_text, body_html = extract_body(payload) # Parse date date_str = headers.get("Date", "") try: # Gmail provides RFC 2822 format from email.utils import parsedate_to_datetime received_at = parsedate_to_datetime(date_str) except Exception: received_at = datetime.utcnow() # Get labels for read status labels = msg_data.get("labelIds", []) is_read = "UNREAD" not in labels return RawMessage( message_id=msg_data["id"], subject=headers.get("Subject", "(no subject)"), sender=sender, sender_name=sender_name, recipients=recipients, received_at=received_at, is_read=is_read, body_text=body_text, body_html=body_html, raw_data={ "threadId": msg_data.get("threadId"), "labelIds": labels, "snippet": msg_data.get("snippet"), }, ) def fetch_messages( self, account: Account, options: FetchOptions = None ) -> Iterator[RawMessage]: """Fetch messages from Gmail. Supports recent-window sync via account.config: - sync_days: Only fetch messages from last N days (default: 7) - sync_max_messages: Maximum messages to fetch (default: 100) """ options = options or FetchOptions() # Get sync configuration sync_days = account.config.get("sync_days", 7) sync_max = account.config.get("sync_max_messages", 100) # Calculate effective limit limit = min(options.limit, sync_max) if options.limit else sync_max # Build query query_parts = [] # Add date filter for recent window if not options.since: since_date = datetime.utcnow() - timedelta(days=sync_days) query_parts.append(f"after:{since_date.strftime('%Y/%m/%d')}") else: query_parts.append(f"after:{options.since.strftime('%Y/%m/%d')}") # Add unread filter if requested if options.unread_only: query_parts.append("is:unread") query = " ".join(query_parts) try: service = self._get_service(account) except (GmailCredentialsNotFoundError, GmailAuthError) as e: # Re-raise to let caller handle raise try: # List messages matching query results = service.users().messages().list( userId="me", q=query, maxResults=limit ).execute() messages = results.get("messages", []) for msg_ref in messages: # Fetch full message details msg_data = service.users().messages().get( userId="me", id=msg_ref["id"], format="full" ).execute() raw_msg = self._parse_message(msg_data) yield raw_msg except Exception as e: if isinstance(e, (GmailCredentialsNotFoundError, GmailAuthError)): raise raise GmailAdapterError(f"Failed to fetch messages: {e}") from e def get_message(self, account: Account, message_id: str) -> Optional[RawMessage]: """Get a specific message by Gmail message ID.""" try: service = self._get_service(account) except (GmailCredentialsNotFoundError, GmailAuthError): return None try: msg_data = service.users().messages().get( userId="me", id=message_id, format="full" ).execute() return self._parse_message(msg_data) except Exception: return None def authenticate(self, account: Account) -> bool: """Test authentication with Gmail. Returns True if credentials are valid. """ try: self._get_credentials(account) return True except (GmailCredentialsNotFoundError, GmailAuthError): return False def test_connection(self, account: Account) -> bool: """Test connection to Gmail API. Returns True if we can connect and fetch profile. """ try: service = self._get_service(account) service.users().getProfile(userId="me").execute() return True except Exception: return False FILE:email_bridge/adapters/imap.py """IMAP adapter for QQ Mail, NetEase Mail, and other IMAP providers.""" import email import imaplib import re from datetime import datetime, timedelta from email.header import decode_header from email.utils import parsedate_to_datetime from typing import Iterator, Optional from ..models import Account, EmailProvider from .base import BaseAdapter, FetchOptions, RawMessage class IMAPAdapterError(Exception): """Base exception for IMAP adapter errors.""" pass class IMAPAuthError(IMAPAdapterError): """Raised when authentication fails. Common causes: - Using login password instead of authorization code - Authorization code has expired or been reset - IMAP service not enabled in email settings """ pass class IMAPConnectionError(IMAPAdapterError): """Raised when connection fails. Common causes: - Network connectivity issues - IMAP server is down - Firewall blocking the connection """ pass # Help URLs for common email providers AUTH_HELP_URLS = { EmailProvider.QQ: "https://service.mail.qq.com/detail/0/75", EmailProvider.NETEASE: "163/126邮箱: 设置 → POP3/SMTP/IMAP → 开启服务", } # IMAP server configurations IMAP_SERVERS = { EmailProvider.QQ: { "host": "imap.qq.com", "port": 993, }, EmailProvider.NETEASE: { # Will be determined by email domain (163.com or 126.com) "163.com": {"host": "imap.163.com", "port": 993}, "126.com": {"host": "imap.126.com", "port": 993}, "yeah.net": {"host": "imap.yeah.net", "port": 993}, }, } class IMAPAdapter(BaseAdapter): """IMAP adapter for QQ Mail and NetEase Mail. Configuration (via account.config): password: Authorization code (NOT the login password) - QQ Mail: Generate at https://service.mail.qq.com/detail/0/75 - 163 Mail: Generate at Settings > POP3/SMTP/IMAP - 126 Mail: Same as 163 Setup: 1. Enable IMAP in your email settings 2. Generate an authorization code (授权码) 3. Add account with the authorization code as password """ def __init__(self): self._connections = {} # Cache connections by account ID @property def provider(self) -> EmailProvider: # This is a base class, subclasses will override raise NotImplementedError def _get_imap_server(self, account: Account) -> tuple[str, int]: """Get IMAP server host and port for the account.""" provider = account.provider email_domain = account.email.split("@")[1].lower() if provider == EmailProvider.QQ: config = IMAP_SERVERS[EmailProvider.QQ] return config["host"], config["port"] elif provider == EmailProvider.NETEASE: netease_config = IMAP_SERVERS[EmailProvider.NETEASE] # Determine by email domain if email_domain in netease_config: return netease_config[email_domain]["host"], netease_config[email_domain]["port"] # Default to 163 return netease_config["163.com"]["host"], netease_config["163.com"]["port"] raise IMAPAdapterError(f"Unknown provider: {provider}") def _connect(self, account: Account) -> imaplib.IMAP4_SSL: """Connect to IMAP server and authenticate.""" host, port = self._get_imap_server(account) password = account.config.get("password") if not password: help_url = AUTH_HELP_URLS.get(account.provider, "") raise IMAPAuthError( f"❌ 未设置授权码: {account.email}\n\n" f"请使用授权码(不是登录密码)配置账户:\n" f" email-bridge accounts add {account.email} -p {account.provider.value} \\\n" f" --config '{{\"password\": \"YOUR_AUTH_CODE\"}}'\n\n" f"📚 获取授权码: {help_url}" ) try: conn = imaplib.IMAP4_SSL(host, port) conn.login(account.email, password) return conn except imaplib.IMAP4.error as e: error_msg = str(e) if "authentication failed" in error_msg.lower() or "login" in error_msg.lower(): help_url = AUTH_HELP_URLS.get(account.provider, "") raise IMAPAuthError( f"❌ 认证失败: {account.email}\n\n" f"可能原因:\n" f" 1. 使用了登录密码而非授权码\n" f" 2. 授权码已过期或被重置\n" f" 3. 未开启 IMAP 服务\n\n" f"📚 获取授权码: {help_url}" ) from e raise IMAPConnectionError( f"❌ 连接失败: {host}:{port}\n\n" f"可能原因:\n" f" 1. 网络连接问题\n" f" 2. 防火墙阻止了连接\n" f" 3. IMAP 服务暂时不可用\n\n" f"错误详情: {e}" ) from e def _get_connection(self, account: Account) -> imaplib.IMAP4_SSL: """Get or create a connection for the account.""" if account.id not in self._connections: self._connections[account.id] = self._connect(account) return self._connections[account.id] def _decode_header_value(self, value: str) -> str: """Decode email header value.""" if not value: return "" decoded_parts = [] for part, charset in decode_header(value): if isinstance(part, bytes): try: decoded_parts.append(part.decode(charset or "utf-8", errors="replace")) except (LookupError, UnicodeDecodeError): decoded_parts.append(part.decode("utf-8", errors="replace")) else: decoded_parts.append(part) return "".join(decoded_parts) def _parse_sender(self, sender: str) -> tuple[str, Optional[str]]: """Parse sender into email and name. Returns: (email, name or None) """ if not sender: return "", None sender = self._decode_header_value(sender) # Try "Name <[email protected]>" format match = re.match(r"(.+?)\s*<(.+?)>", sender) if match: name = match.group(1).strip().strip('"') email_addr = match.group(2).strip() return email_addr, name # Plain email return sender.strip(), None def _parse_message(self, msg: email.message.Message, message_id: str) -> RawMessage: """Parse email.message.Message into RawMessage.""" # Headers subject = self._decode_header_value(msg.get("Subject", "(no subject)")) sender_raw = msg.get("From", "") sender, sender_name = self._parse_sender(sender_raw) # Recipients recipients = [] for field in ["To", "Cc"]: value = msg.get(field) if value: # Simple extraction for addr in value.split(","): addr = addr.strip() if "<" in addr and ">" in addr: match = re.search(r"<(.+?)>", addr) if match: recipients.append(match.group(1)) else: recipients.append(addr) # Date date_str = msg.get("Date", "") try: received_at = parsedate_to_datetime(date_str) except Exception: received_at = datetime.utcnow() # Flags for read status is_read = True # IMAP doesn't always give us flags easily # Body extraction body_text = None body_html = None if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition", "")) # Skip attachments if "attachment" in content_disposition: continue try: payload = part.get_payload(decode=True) if payload: charset = part.get_content_charset() or "utf-8" text = payload.decode(charset, errors="replace") if content_type == "text/plain" and not body_text: body_text = text elif content_type == "text/html" and not body_html: body_html = text except Exception: pass else: try: payload = msg.get_payload(decode=True) if payload: charset = msg.get_content_charset() or "utf-8" text = payload.decode(charset, errors="replace") content_type = msg.get_content_type() if content_type == "text/html": body_html = text else: body_text = text except Exception: pass return RawMessage( message_id=message_id, subject=subject, sender=sender, sender_name=sender_name, recipients=recipients, received_at=received_at, is_read=is_read, body_text=body_text, body_html=body_html, raw_data={}, ) def fetch_messages( self, account: Account, options: FetchOptions = None ) -> Iterator[RawMessage]: """Fetch messages from IMAP server.""" options = options or FetchOptions() conn = self._get_connection(account) try: # Select INBOX conn.select("INBOX") # Build search criteria search_criteria = "ALL" # Date filter if options.since: date_str = options.since.strftime("%d-%b-%Y") search_criteria = f'(SINCE "{date_str}")' # Search for messages status, message_ids = conn.search(None, search_criteria) if status != "OK": raise IMAPAdapterError(f"Failed to search messages: {status}") message_id_list = message_ids[0].split() # Apply limit (from newest to oldest) if options.limit and len(message_id_list) > options.limit: message_id_list = message_id_list[-options.limit:] # Fetch messages for msg_id in message_id_list: status, msg_data = conn.fetch(msg_id, "(RFC822)") if status != "OK": continue # Parse the message raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) # Use IMAP message ID as unique ID imap_id = msg_id.decode() raw_msg = self._parse_message(msg, f"imap-{imap_id}") yield raw_msg except IMAPAdapterError: raise except Exception as e: raise IMAPAdapterError(f"Failed to fetch messages: {e}") from e def get_message(self, account: Account, message_id: str) -> Optional[RawMessage]: """Get a specific message by ID.""" if not message_id.startswith("imap-"): return None imap_id = message_id[5:] # Remove "imap-" prefix conn = self._get_connection(account) try: conn.select("INBOX") status, msg_data = conn.fetch(imap_id.encode(), "(RFC822)") if status != "OK": return None raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) return self._parse_message(msg, message_id) except Exception: return None def authenticate(self, account: Account) -> bool: """Test authentication with IMAP server.""" try: conn = self._connect(account) conn.logout() return True except (IMAPAuthError, IMAPConnectionError): return False def test_connection(self, account: Account) -> bool: """Test connection to IMAP server.""" return self.authenticate(account) class QQMailAdapter(IMAPAdapter): """QQ Mail IMAP adapter.""" @property def provider(self) -> EmailProvider: return EmailProvider.QQ class NetEaseMailAdapter(IMAPAdapter): """NetEase Mail (163/126) IMAP adapter.""" @property def provider(self) -> EmailProvider: return EmailProvider.NETEASE FILE:email_bridge/adapters/mock.py """Mock adapter for demo and testing with local JSON fixtures.""" import json from datetime import datetime from pathlib import Path from typing import Iterator, Optional from ..models import Account, EmailProvider from .base import BaseAdapter, FetchOptions, RawMessage # Default fixture path DEFAULT_FIXTURES_PATH = Path(__file__).parent.parent.parent / "fixtures" / "sample_emails.json" class MockAdapter(BaseAdapter): """Mock adapter that loads emails from local JSON fixtures. This allows full demo/testing without real email connections. """ @property def provider(self) -> EmailProvider: return EmailProvider.MOCK def __init__(self, fixtures_path: Optional[Path] = None): self.fixtures_path = fixtures_path or DEFAULT_FIXTURES_PATH self._messages: Optional[list[dict]] = None def _load_fixtures(self) -> list[dict]: """Load messages from JSON fixtures file.""" if self._messages is not None: return self._messages if not self.fixtures_path.exists(): return [] with open(self.fixtures_path, "r", encoding="utf-8") as f: data = json.load(f) self._messages = data.get("messages", []) return self._messages def fetch_messages( self, account: Account, options: FetchOptions = None ) -> Iterator[RawMessage]: """Yield messages from fixtures for the given account.""" options = options or FetchOptions() messages = self._load_fixtures() count = 0 for msg_data in messages: # Filter by account email if specified in fixture if msg_data.get("account_email") and msg_data["account_email"] != account.email: continue received_at = datetime.fromisoformat(msg_data["received_at"]) # Apply filters if options.since and received_at < options.since: continue if options.unread_only and msg_data.get("is_read", False): continue yield RawMessage( message_id=msg_data["message_id"], subject=msg_data["subject"], sender=msg_data["sender"], sender_name=msg_data.get("sender_name"), recipients=msg_data.get("recipients", []), received_at=received_at, is_read=msg_data.get("is_read", False), body_text=msg_data.get("body_text"), body_html=msg_data.get("body_html"), raw_data=msg_data.get("raw_data", {}), ) count += 1 if count >= options.limit: break def get_message(self, account: Account, message_id: str) -> Optional[RawMessage]: """Get a specific message by ID.""" messages = self._load_fixtures() for msg_data in messages: if msg_data["message_id"] == message_id: return RawMessage( message_id=msg_data["message_id"], subject=msg_data["subject"], sender=msg_data["sender"], sender_name=msg_data.get("sender_name"), recipients=msg_data.get("recipients", []), received_at=datetime.fromisoformat(msg_data["received_at"]), is_read=msg_data.get("is_read", False), body_text=msg_data.get("body_text"), body_html=msg_data.get("body_html"), raw_data=msg_data.get("raw_data", {}), ) return None FILE:email_bridge/adapters/smtp.py """SMTP adapter for sending emails via QQ Mail, NetEase Mail, etc.""" import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.utils import formataddr from typing import Optional from ..models import Account, EmailProvider from .base import BaseAdapter class SMTPAdapterError(Exception): """Base exception for SMTP adapter errors.""" pass class SMTPAuthError(SMTPAdapterError): """Raised when authentication fails.""" pass class SMTPSendError(SMTPAdapterError): """Raised when sending fails.""" pass # SMTP server configurations SMTP_SERVERS = { EmailProvider.QQ: { "host": "smtp.qq.com", "port": 465, }, EmailProvider.NETEASE: { "163.com": {"host": "smtp.163.com", "port": 465}, "126.com": {"host": "smtp.126.com", "port": 465}, "yeah.net": {"host": "smtp.yeah.net", "port": 465}, }, EmailProvider.GMAIL: { "host": "smtp.gmail.com", "port": 465, }, } class SMTPAdapter: """SMTP adapter for sending emails. Configuration (via account.config): password: Authorization code (NOT the login password) - QQ Mail: Same as IMAP authorization code - 163/126: Same as IMAP authorization code - Gmail: App password (need to enable 2FA first) """ def _get_smtp_server(self, account: Account) -> tuple[str, int]: """Get SMTP server host and port for the account.""" provider = account.provider email_domain = account.email.split("@")[1].lower() if provider == EmailProvider.QQ: config = SMTP_SERVERS[EmailProvider.QQ] return config["host"], config["port"] elif provider == EmailProvider.NETEASE: netease_config = SMTP_SERVERS[EmailProvider.NETEASE] if email_domain in netease_config: return netease_config[email_domain]["host"], netease_config[email_domain]["port"] return netease_config["163.com"]["host"], netease_config["163.com"]["port"] elif provider == EmailProvider.GMAIL: config = SMTP_SERVERS[EmailProvider.GMAIL] return config["host"], config["port"] raise SMTPAdapterError(f"Unknown provider: {provider}") def send_email( self, account: Account, to: list[str], subject: str, body_text: Optional[str] = None, body_html: Optional[str] = None, cc: Optional[list[str]] = None, bcc: Optional[list[str]] = None, from_name: Optional[str] = None, ) -> bool: """Send an email. Args: account: The account to send from to: List of recipient email addresses subject: Email subject body_text: Plain text body body_html: HTML body cc: List of CC recipients bcc: List of BCC recipients from_name: Display name for sender Returns: True if sent successfully Raises: SMTPAuthError: Authentication failed SMTPSendError: Sending failed """ password = account.config.get("password") if not password: raise SMTPAuthError( f"Password/authorization code not set for {account.email}. " "Please set it in account config." ) host, port = self._get_smtp_server(account) # Build message msg = MIMEMultipart("alternative") msg["Subject"] = subject # From if from_name: msg["From"] = formataddr((from_name, account.email)) else: msg["From"] = account.email # To msg["To"] = ", ".join(to) # CC if cc: msg["Cc"] = ", ".join(cc) # Body if body_text: msg.attach(MIMEText(body_text, "plain", "utf-8")) if body_html: msg.attach(MIMEText(body_html, "html", "utf-8")) if not body_text and not body_html: msg.attach(MIMEText("", "plain", "utf-8")) # All recipients all_recipients = to + (cc or []) + (bcc or []) try: # Connect and send with smtplib.SMTP_SSL(host, port) as server: server.login(account.email, password) server.sendmail(account.email, all_recipients, msg.as_string()) return True except smtplib.SMTPAuthenticationError as e: raise SMTPAuthError( f"Authentication failed for {account.email}. " "Make sure you're using the authorization code, not your password." ) from e except smtplib.SMTPException as e: raise SMTPSendError(f"Failed to send email: {e}") from e except Exception as e: raise SMTPSendError(f"Failed to send email: {e}") from e def test_connection(self, account: Account) -> bool: """Test SMTP connection and authentication.""" password = account.config.get("password") if not password: return False host, port = self._get_smtp_server(account) try: with smtplib.SMTP_SSL(host, port) as server: server.login(account.email, password) return True except Exception: return False FILE:email_bridge/categories.py """Email category definitions for classification.""" from enum import Enum class EmailCategory(str, Enum): """Predefined email categories for classification.""" VERIFICATION = "verification" # Verification codes, account confirmations, activation links SECURITY = "security" # Security alerts, login notifications, password changes TRANSACTIONAL = "transactional" # Receipts, order confirmations, shipping notices PROMOTION = "promotion" # Marketing emails, promotions, rewards (legitimate) SUBSCRIPTION = "subscription" # Newsletters, digests, regular updates SPAM_LIKE = "spam_like" # Likely spam or low-priority promotional NORMAL = "normal" # Regular personal/business email # Keywords for category detection (subject-only matching) # More specific keywords should come first CATEGORY_KEYWORDS = { EmailCategory.VERIFICATION: [ # Chinese - 验证码相关(具体短语优先,避免单字太宽泛) "验证码是", "验证码:", "您的验证码", "验证码为", "校验码是", "校验码:", "您的校验码", "校验码为", "动态码是", "动态码:", "您的动态码", "一次性密码", "一次性验证", "短信验证码", "邮箱验证码", "安全验证码", "登录验证码", "身份验证码", # Chinese - 动作短语 "验证您的", "确认您的邮箱", "激活您的账户", "请验证", "请确认邮箱", "请激活账户", # Chinese - 链接验证 "确认链接", "激活链接", "验证链接", "点击验证", "点击激活", # Chinese - 绑定相关 "绑定邮箱", "绑定手机", "绑定账户", "绑定账号", "邮箱绑定", "手机绑定", "帐号绑定", "账户绑定", # Chinese - 核心关键词(放在最后作为兜底) "验证码", "校验码", "激活码", # English - verification codes(具体短语优先) "verification code", "verification code is", "your verification code", "verify code", "verify your email", "verify your account", "confirmation code", "confirmation code is", "your confirmation code", "confirm code", "confirm your email", "confirm your account", "activation code", "activation code is", "your activation code", "activate code", "activate your email", "activate your account", "security code", "security code is", "your security code", "login code", "auth code", "authentication code", "one-time password", "one-time code", "one time password", "OTP", "your code is", "enter code", "use code", "code is", # English - verification links "click to verify", "click to confirm", "click to activate", "verify your email", "confirm your email", "activate your account", # English - 核心关键词(放在最后作为兜底) "activate", "activation", ], EmailCategory.SECURITY: [ # Chinese "安全提醒", "安全警告", "账户安全", "登录提醒", "登录警告", "新设备登录", "异地登录", "可疑登录", "异常登录", "密码修改", "密码重置", "密码变更", "修改密码", "双重验证", "两步验证", "二次验证", # English "security alert", "security warning", "security notice", "login attempt", "new login", "unusual login", "suspicious login", "new device", "unrecognized device", "password change", "password reset", "change your password", "two-factor", "2FA", "two-step verification", "unusual activity", "account compromise", "account security", "protect your account", "secure your account", ], EmailCategory.TRANSACTIONAL: [ # Chinese "订单确认", "订单详情", "发货通知", "快递通知", "物流信息", "支付成功", "支付通知", "付款成功", "收款通知", "收据", "发票", "账单", "交易通知", # English "order confirmation", "order details", "order #", "order number", "shipping notification", "shipped", "delivery", "tracking number", "payment confirmation", "payment received", "receipt", "invoice", "transaction", "purchase confirmation", "your order", ], EmailCategory.PROMOTION: [ # Chinese - 正规营销推广(区别于 spam) "奖励", "福利", "优惠", "优惠券", "折扣", "促销", "领取", "立即领取", "点击领取", "限时", "抢购", "会员专享", "积分", "返现", "红包", "礼包", "活动", "促销活动", "优惠活动", "新品上市", "USDT", "BTC", "ETH", # 加密货币交易所推广 # English "reward", "bonus", "promo", "promotion", "discount", "sale", "claim your", "get your", "exclusive offer", "limited offer", "member benefit", "points", "cashback", "gift", "special promotion", "new arrival", "flash sale", ], EmailCategory.SUBSCRIPTION: [ # Chinese "订阅", "退订", "取消订阅", "周刊", "日报", "周报", "每日推送", "每周推送", "新闻速递", "最新动态", # English "newsletter", "unsubscribe", "opt-out", "opt out", "weekly digest", "daily digest", "daily update", "weekly update", "subscription", "your subscription", ], EmailCategory.SPAM_LIKE: [ # Chinese "恭喜您", "中奖", "免费领取", "限时优惠", "最后机会", "点击领取", "立即领取", "马上行动", # English "winner", "congratulations", "you've won", "you have won", "click here now", "act now", "limited time", "last chance", "exclusive offer", "special offer", "claim your", ], } def detect_category(subject: str, body: str = "") -> EmailCategory: """Detect email category based on subject line keywords. Args: subject: Email subject line (primary classification source) body: Email body (currently unused, reserved for future ML) Returns: EmailCategory enum value """ # Use subject only for classification text = subject.lower() for category, keywords in CATEGORY_KEYWORDS.items(): for keyword in keywords: if keyword.lower() in text: return category return EmailCategory.NORMAL FILE:email_bridge/cli.py """Command-line interface for email bridge.""" import click from datetime import datetime from typing import Optional from .models import EmailProvider, AccountStatus, EmailCategory from .service import EmailBridgeService from .extraction import extract_from_email from .db import Database def get_service() -> EmailBridgeService: """Get service instance.""" return EmailBridgeService() @click.group() @click.version_option() def main(): """Email Bridge - Minimal personal email middleware.""" pass # Account commands @main.group() def accounts(): """Manage email accounts.""" pass @accounts.command("list") @click.option("--all", "show_all", is_flag=True, help="Include disabled accounts") def accounts_list(show_all: bool): """List all configured accounts.""" service = get_service() accounts = service.list_accounts(include_disabled=show_all) if not accounts: click.echo("No accounts configured.") return for acc in accounts: status = click.style(acc.status.value, fg="green" if acc.status == AccountStatus.ACTIVE else "red") click.echo(f"[{acc.id}] {acc.email} ({acc.provider.value}) - {status}") if acc.display_name: click.echo(f" Name: {acc.display_name}") @accounts.command("add") @click.argument("email") @click.option("--provider", "-p", type=click.Choice(["mock", "gmail", "qq", "netease"]), required=True) @click.option("--name", "-n", help="Display name for the account") @click.option("--config", "-c", "config_json", help="JSON config (e.g. '{\"password\": \"xxx\"}')") def accounts_add(email: str, provider: str, name: Optional[str], config_json: Optional[str]): """Add a new email account.""" import json service = get_service() config = None if config_json: try: config = json.loads(config_json) except json.JSONDecodeError as e: click.echo(f"Invalid JSON config: {e}", err=True) raise click.Abort() try: account = service.add_account( email=email, provider=EmailProvider(provider), display_name=name, config=config ) click.echo(f"Added account: [{account.id}] {account.email}") except Exception as e: click.echo(f"Error: {e}", err=True) raise click.Abort() @accounts.command("update") @click.argument("account_id") @click.option("--name", "-n", help="Update display name") @click.option("--disable", is_flag=True, help="Disable the account") @click.option("--enable", is_flag=True, help="Enable the account") def accounts_update(account_id: str, name: Optional[str], disable: bool, enable: bool): """Update an account.""" service = get_service() status = None if disable: status = AccountStatus.DISABLED elif enable: status = AccountStatus.ACTIVE account = service.update_account(account_id, display_name=name, status=status) if account: click.echo(f"Updated account: [{account.id}] {account.email}") else: click.echo(f"Account not found: {account_id}", err=True) raise click.Abort() @accounts.command("delete") @click.argument("account_id") @click.option("--force", "-f", is_flag=True, help="Skip confirmation") def accounts_delete(account_id: str, force: bool): """Delete an account and all its messages.""" if not force: if not click.confirm(f"Delete account {account_id} and all messages?"): click.echo("Aborted.") return service = get_service() if service.delete_account(account_id): click.echo(f"Deleted account: {account_id}") else: click.echo(f"Account not found: {account_id}", err=True) # Message commands @main.group() def messages(): """View and manage messages.""" pass @messages.command("list") @click.option("--account", "-a", "account_id", help="Filter by account ID") @click.option("--limit", "-n", default=20, help="Number of messages to show") def messages_list(account_id: Optional[str], limit: int): """List recent messages.""" service = get_service() msgs = service.list_recent_messages(account_id=account_id, limit=limit) if not msgs: click.echo("No messages found.") return for msg in msgs: read_marker = " " if msg.is_read else click.style("*", fg="yellow", bold=True) cat_color = { EmailCategory.VERIFICATION: "cyan", EmailCategory.SECURITY: "red", EmailCategory.SUBSCRIPTION: "blue", EmailCategory.SPAM_LIKE: "magenta", EmailCategory.NORMAL: "white", }.get(msg.category, "white") cat = click.style(f"[{msg.category.value:12}]", fg=cat_color) click.echo(f"{read_marker} [{msg.id.split(':')[1][:12]}] {cat} {msg.received_at.strftime('%Y-%m-%d %H:%M')}") click.echo(f" From: {msg.sender_name or msg.sender}") click.echo(f" Subj: {msg.subject[:60]}{'...' if len(msg.subject) > 60 else ''}") @messages.command("unread") @click.option("--account", "-a", "account_id", help="Filter by account ID") @click.option("--limit", "-n", default=20, help="Number of messages to show") def messages_unread(account_id: Optional[str], limit: int): """List unread messages.""" service = get_service() msgs = service.list_unread_messages(account_id=account_id, limit=limit) if not msgs: click.echo("No unread messages.") return click.echo(f"Unread messages ({len(msgs)}):") for msg in msgs: click.echo(f"* [{msg.id.split(':')[1][:12]}] {msg.sender_name or msg.sender}") click.echo(f" {msg.subject[:60]}{'...' if len(msg.subject) > 60 else ''}") @messages.command("search") @click.option("--keyword", "-k", help="Search keyword") @click.option("--account", "-a", "account_id", help="Filter by account ID") @click.option("--from-date", help="Start date (YYYY-MM-DD)") @click.option("--to-date", help="End date (YYYY-MM-DD)") @click.option("--limit", "-n", default=50, help="Max results") def messages_search( keyword: Optional[str], account_id: Optional[str], from_date: Optional[str], to_date: Optional[str], limit: int ): """Search messages by keyword and/or date range.""" service = get_service() start_time = None end_time = None if from_date: start_time = datetime.strptime(from_date, "%Y-%m-%d") if to_date: end_time = datetime.strptime(to_date, "%Y-%m-%d") msgs = service.search_messages( keyword=keyword, start_time=start_time, end_time=end_time, account_id=account_id, limit=limit ) if not msgs: click.echo("No messages found.") return click.echo(f"Found {len(msgs)} message(s):") for msg in msgs: click.echo(f"[{msg.id.split(':')[1][:12]}] {msg.received_at.strftime('%Y-%m-%d')} - {msg.subject[:50]}") @messages.command("get") @click.argument("message_id") def messages_get(message_id: str): """Get full details of a message.""" service = get_service() msg = service.get_message(message_id) if not msg: click.echo(f"Message not found: {message_id}", err=True) raise click.Abort() click.echo(f"Subject: {msg.subject}") click.echo(f"From: {msg.sender_name} <{msg.sender}>" if msg.sender_name else f"From: {msg.sender}") click.echo(f"To: {', '.join(msg.recipients)}") click.echo(f"Date: {msg.received_at.strftime('%Y-%m-%d %H:%M')}") click.echo(f"Category: {msg.category.value}") click.echo(f"Read: {'Yes' if msg.is_read else 'No'}") click.echo() if msg.body_text: click.echo("--- Body ---") click.echo(msg.body_text) elif msg.preview: click.echo("--- Preview ---") click.echo(msg.preview) @messages.command("read") @click.argument("message_id") @click.option("--unread", "-u", is_flag=True, help="Mark as unread instead") def messages_read(message_id: str, unread: bool): """Mark a message as read (or unread with --unread).""" service = get_service() if service.mark_read(message_id, is_read=not unread): status = "unread" if unread else "read" click.echo(f"Marked message as {status}: {message_id}") else: click.echo(f"Message not found: {message_id}", err=True) # Sync command @main.command("sync") @click.option("--account", "-a", "account_id", help="Sync specific account only") @click.option("--days", "-d", default=None, type=int, help="Sync messages from last N days") def sync(account_id: Optional[str], days: Optional[int]): """Sync messages from providers. This fetches messages from configured email providers into local storage. Gmail accounts require credentials setup (see README.md). Examples: email-bridge sync # Sync all accounts email-bridge sync -a abc123 # Sync specific account email-bridge sync --days 3 # Sync last 3 days only """ service = get_service() since = None if days: from datetime import timedelta since = datetime.utcnow() - timedelta(days=days) if account_id: try: count = service.sync_account(account_id, since=since) click.echo(f"Synced {count} message(s) from account {account_id}") except ValueError as e: click.echo(f"Sync failed: {e}", err=True) except Exception as e: # Check for Gmail-specific errors error_msg = str(e) if "credentials not found" in error_msg.lower(): click.echo(f"Gmail credentials not found. See README.md for setup instructions.", err=True) elif "authentication" in error_msg.lower() or "auth" in error_msg.lower(): click.echo(f"Authentication failed: {e}", err=True) else: click.echo(f"Sync failed: {e}", err=True) else: results = service.sync_all_accounts(since=since) for acc_id, result in results.items(): if isinstance(result, int): click.echo(f"Account {acc_id}: {result} message(s)") else: click.echo(f"Account {acc_id}: {result}") # Stats command @main.command("stats") def stats(): """Show statistics.""" service = get_service() s = service.get_stats() click.echo(f"Accounts: {s['active_accounts']} active / {s['total_accounts']} total") click.echo(f"Unread: {s['unread_messages']}") # Extraction commands @main.command("extract") @click.argument("message_id") @click.option("--codes", "-c", is_flag=True, help="Show verification codes") @click.option("--links", "-l", is_flag=True, help="Show action links") @click.option("--all", "show_all", is_flag=True, help="Show all extractions") def extract(message_id: str, codes: bool, links: bool, show_all: bool): """Extract verification codes and action links from a message. Examples: email-bridge extract mock-001 --codes email-bridge extract mock-001 --links email-bridge extract mock-001 --all """ service = get_service() msg = service.get_message(message_id) if not msg: click.echo(f"Message not found: {message_id}", err=True) raise click.Abort() # Default to showing all if no specific flag if not codes and not links: show_all = True # Extract from message extracted = extract_from_email( subject=msg.subject, body_text=msg.body_text, body_html=msg.body_html ) if show_all or codes: if extracted['codes']: click.echo(click.style("Verification Codes:", fg="cyan", bold=True)) for code in extracted['codes']: ctx = f" ({code.context})" if code.context else "" click.echo(f" {click.style(code.code, fg='yellow')}{ctx}") else: click.echo("No verification codes found.") if show_all or links: if extracted['links']: click.echo(click.style("\nAction Links:", fg="cyan", bold=True)) for link in extracted['links']: link_type = click.style(f"[{link.link_type}]", fg="green") domain = f" ({link.domain})" if link.domain else "" click.echo(f" {link_type} {link.url}{domain}") if link.text: click.echo(f" Context: {link.text[:60]}...") else: click.echo("No action links found.") @main.command("codes") @click.option("--account", "-a", "account_id", help="Filter by account ID") @click.option("--limit", "-n", default=20, help="Number of messages to check") def codes(account_id: Optional[str], limit: int): """Find verification codes in recent messages. Scans recent messages and extracts any verification codes found. """ service = get_service() msgs = service.list_recent_messages(account_id=account_id, limit=limit) found_any = False for msg in msgs: extracted = extract_from_email( subject=msg.subject, body_text=msg.body_text, body_html=msg.body_html ) if extracted['codes']: found_any = True click.echo(f"\n[{msg.id.split(':')[1][:12]}] {msg.subject[:50]}") click.echo(f" From: {msg.sender_name or msg.sender}") for code in extracted['codes']: ctx = f" ({code.context})" if code.context else "" click.echo(f" Code: {click.style(code.code, fg='yellow')}{ctx}") if not found_any: click.echo("No verification codes found in recent messages.") # Send command @main.command("send") @click.option("--account", "-a", "account_id", required=True, help="Account ID to send from") @click.option("--to", "-t", "recipients", multiple=True, required=True, help="Recipient email address") @click.option("--cc", "-c", "cc_list", multiple=True, help="CC email address") @click.option("--subject", "-s", required=True, help="Email subject") @click.option("--body", "-b", "body_text", help="Email body (plain text)") @click.option("--html", "-h", "body_html", help="Email body (HTML)") def send_email( account_id: str, recipients: tuple, cc_list: tuple, subject: str, body_text: Optional[str], body_html: Optional[str] ): """Send an email. Examples: email-bridge send -a abc123 -t [email protected] -s "Hello" -b "Hi there!" email-bridge send -a abc123 -t [email protected] -t [email protected] -s "Hello" -b "Hi!" """ service = get_service() # Convert tuples to lists to_list = list(recipients) cc = list(cc_list) if cc_list else None # If no body provided, prompt for it if not body_text and not body_html: click.echo("No body provided. Use --body or --html option.") raise click.Abort() try: result = service.send_email( account_id=account_id, to=to_list, subject=subject, body_text=body_text, body_html=body_html, cc=cc, ) if result: click.echo(click.style("✓ Email sent successfully!", fg="green")) except ValueError as e: click.echo(f"Error: {e}", err=True) raise click.Abort() except Exception as e: click.echo(f"Failed to send email: {e}", err=True) raise click.Abort() # Daemon commands @main.group() def daemon(): """Manage the background daemon for real-time email monitoring.""" pass def _load_daemon_config() -> dict: """Load daemon configuration from config file. Returns: Dict with daemon settings or empty dict if no config file """ import json from pathlib import Path config_path = Path.home() / ".email-bridge" / "config.json" if not config_path.exists(): return {} try: with open(config_path) as f: config = json.load(f) return config.get("daemon", {}) except Exception: return {} @daemon.command("start") @click.option("-d", "--detach", is_flag=True, help="Run in background (daemon mode)") @click.option("-i", "--interval", default=300, type=int, help="Polling interval in seconds (default: 300)") @click.option("--no-notify", is_flag=True, help="Disable OpenClaw notifications") def daemon_start(detach: bool, interval: int, no_notify: bool): """Start the email monitoring daemon. The daemon monitors your email accounts in real-time using IMAP IDLE (for QQ/NetEase) or polling (for Gmail). When new emails arrive, the daemon will: - Sync them to local database - Send a notification to OpenClaw main session (unless --no-notify) Configuration is read from ~/.email-bridge/config.json: { "daemon": { "poll_interval": 300, "notify_openclaw": true, "notification": { "include_body": false, "body_max_length": 500, "include_verification_codes": true, "include_links": false } } } Examples: email-bridge daemon start # Run in foreground email-bridge daemon start -d # Run in background email-bridge daemon start -i 60 # Poll every 60 seconds email-bridge daemon start -d --no-notify # No OpenClaw notifications """ from .daemon import EmailDaemon # Load config from file config = _load_daemon_config() # CLI options override config file poll_interval = interval notify = not no_notify notification_config = config.get("notification", {}) # Use config file values if CLI options are defaults if config.get("poll_interval") and interval == 300: # Default value poll_interval = config["poll_interval"] if config.get("notify_openclaw") is not None and not no_notify: notify = config["notify_openclaw"] d = EmailDaemon( poll_interval=poll_interval, notify_openclaw=notify, notification_config=notification_config ) d.start(background=detach) @daemon.command("stop") def daemon_stop(): """Stop the email monitoring daemon.""" from .daemon import EmailDaemon d = EmailDaemon() d.stop() @daemon.command("status") def daemon_status(): """Show daemon status.""" from .daemon import EmailDaemon import json d = EmailDaemon() status = d.status() if status["running"]: click.echo(click.style("✓ Daemon is running", fg="green")) click.echo(f" PID: {status['pid']}") if status["accounts"]: click.echo("\n Monitored accounts:") for acc in status["accounts"]: click.echo(f" - {acc['email']} ({acc['provider']})") else: click.echo(click.style("✗ Daemon is not running", fg="red")) if __name__ == "__main__": main() FILE:email_bridge/daemon.py #!/usr/bin/env python3 """Email Bridge Daemon - Background service for real-time email monitoring. This daemon runs in the background and monitors email accounts using IMAP IDLE for real-time new message notifications. Usage: email-bridge daemon start email-bridge daemon stop email-bridge daemon status """ import json import os import signal import sys import threading import time from datetime import datetime from pathlib import Path from typing import Optional, Callable, Dict, Any # Add parent to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from email_bridge.models import EmailProvider from email_bridge.service import EmailBridgeService from email_bridge.adapters.imap import IMAPAdapter from email_bridge.sanitize import sanitize_for_notification, sanitize_sender, sanitize_subject from email_bridge.extraction import extract_verification_codes class EmailDaemon: """Background daemon for monitoring emails.""" def __init__( self, poll_interval: int = 300, # Fallback polling interval (seconds) on_new_email: Optional[Callable[[Dict[str, Any]], None]] = None, notify_openclaw: bool = True, # Send notifications to OpenClaw notification_time_limit: int = 3600, # Only notify for emails received within this many seconds notification_config: Optional[Dict[str, Any]] = None, # Notification settings idle_timeout: int = 60, # IDLE timeout in seconds (keepalive interval) sync_max_retries: int = 3, # Max retries when sync fails sync_retry_delay: float = 2.0, # Delay between sync retries (seconds) ): """Initialize the daemon. Args: poll_interval: Fallback polling interval in seconds (default 5 min) on_new_email: Callback function when new email arrives notify_openclaw: Send new email notifications to OpenClaw main session notification_time_limit: Only notify for emails received within this time (seconds) notification_config: Dict with notification settings: - include_body: bool (default False) - body_max_length: int (default 500) - include_verification_codes: bool (default True) - include_links: bool (default False) idle_timeout: IDLE timeout in seconds, acts as keepalive interval (default 60) sync_max_retries: Max retries when sync fails after detecting new mail (default 3) sync_retry_delay: Delay between sync retries in seconds (default 2.0) """ self.poll_interval = poll_interval self.on_new_email = on_new_email self.notify_openclaw = notify_openclaw self.notification_time_limit = notification_time_limit self.idle_timeout = idle_timeout self.sync_max_retries = sync_max_retries self.sync_retry_delay = sync_retry_delay # Default notification config self.notification_config = notification_config or {} self.notification_config.setdefault('include_body', False) self.notification_config.setdefault('body_max_length', 500) self.notification_config.setdefault('include_verification_codes', True) self.notification_config.setdefault('include_links', False) self.service = EmailBridgeService() self._running = False self._threads: Dict[str, threading.Thread] = {} self._stop_events: Dict[str, threading.Event] = {} self._last_seen: Dict[str, set] = {} # account_id -> set of message_ids self._notified_messages: Dict[str, set] = {} # account_id -> set of notified message_ids # PID file for daemon management self.pid_file = Path.home() / ".email-bridge" / "daemon.pid" self.log_file = Path.home() / ".email-bridge" / "daemon.log" def start(self, background: bool = False): """Start the daemon. Args: background: If True, fork to background (daemon mode) """ if self.is_running(): print("Daemon is already running") return if background: self._fork_to_background() else: self._run_foreground() def stop(self): """Stop the daemon.""" if not self.is_running(): print("Daemon is not running") return pid = self._read_pid() if pid: try: os.kill(pid, signal.SIGTERM) print(f"Sent SIGTERM to daemon (PID {pid})") # Wait for daemon to stop for _ in range(10): time.sleep(0.5) if not self.is_running(): break if self.is_running(): os.kill(pid, signal.SIGKILL) print(f"Sent SIGKILL to daemon (PID {pid})") except ProcessLookupError: pass # Clean up PID file if self.pid_file.exists(): self.pid_file.unlink() def status(self) -> dict: """Get daemon status.""" result = { "running": self.is_running(), "pid": self._read_pid(), "accounts": [], } if self.is_running(): accounts = self.service.list_accounts() for acc in accounts: if acc.status.value == "active": result["accounts"].append({ "id": acc.id, "email": acc.email, "provider": acc.provider.value, }) return result def is_running(self) -> bool: """Check if daemon is running.""" pid = self._read_pid() if not pid: return False try: os.kill(pid, 0) # Check if process exists return True except ProcessLookupError: # PID file exists but process is dead if self.pid_file.exists(): self.pid_file.unlink() return False def _read_pid(self) -> Optional[int]: """Read PID from file.""" if not self.pid_file.exists(): return None try: return int(self.pid_file.read_text().strip()) except (ValueError, IOError): return None def _write_pid(self): """Write current PID to file.""" self.pid_file.parent.mkdir(parents=True, exist_ok=True) self.pid_file.write_text(str(os.getpid())) def _fork_to_background(self): """Fork to background (Unix daemon).""" # First fork pid = os.fork() if pid > 0: # Parent exits print(f"Daemon started in background (PID {pid})") sys.exit(0) # Decouple from parent environment os.chdir("/") os.setsid() os.umask(0) # Second fork pid = os.fork() if pid > 0: sys.exit(0) # Redirect standard file descriptors sys.stdout.flush() sys.stderr.flush() # Redirect to log file self.log_file.parent.mkdir(parents=True, exist_ok=True) with open(self.log_file, "a") as log: os.dup2(log.fileno(), sys.stdout.fileno()) os.dup2(log.fileno(), sys.stderr.fileno()) # Write PID self._write_pid() # Run daemon self._run() def _run_foreground(self): """Run in foreground (for debugging).""" self._write_pid() print(f"Daemon started (PID {os.getpid()})") try: self._run() except KeyboardInterrupt: print("\nShutting down...") finally: if self.pid_file.exists(): self.pid_file.unlink() def _run(self): """Main daemon loop.""" self._running = True # Set up signal handlers signal.signal(signal.SIGTERM, self._handle_signal) signal.signal(signal.SIGINT, self._handle_signal) self._log("Daemon started") # Get active accounts accounts = self.service.list_accounts() active_accounts = [a for a in accounts if a.status.value == "active"] if not active_accounts: self._log("No active accounts to monitor") return # Start a monitor thread for each account for account in active_accounts: if account.provider in [EmailProvider.QQ, EmailProvider.NETEASE]: self._start_account_monitor(account) elif account.provider == EmailProvider.GMAIL: # Gmail doesn't support IMAP IDLE, use polling self._start_account_poller(account) # Wait for stop signal while self._running: time.sleep(1) # Stop all threads for account_id in list(self._stop_events.keys()): self._stop_events[account_id].set() # Wait for threads to finish for thread in self._threads.values(): thread.join(timeout=5) self._log("Daemon stopped") def _handle_signal(self, signum, frame): """Handle shutdown signals.""" self._log(f"Received signal {signum}") self._running = False def _log(self, message: str): """Log a message.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_line = f"[{timestamp}] {message}" print(log_line, flush=True) def _start_account_monitor(self, account): """Start IMAP IDLE monitor for an account.""" stop_event = threading.Event() self._stop_events[account.id] = stop_event thread = threading.Thread( target=self._monitor_account, args=(account, stop_event), daemon=True, ) thread.start() self._threads[account.id] = thread self._log(f"Started monitor for {account.email}") def _start_account_poller(self, account): """Start polling for Gmail (no IDLE support).""" stop_event = threading.Event() self._stop_events[account.id] = stop_event thread = threading.Thread( target=self._poll_account, args=(account, stop_event), daemon=True, ) thread.start() self._threads[account.id] = thread self._log(f"Started poller for {account.email} (interval: {self.poll_interval}s)") def _monitor_account(self, account, stop_event: threading.Event): """Monitor an account using IMAP IDLE with keepalive and sync retry. Features: - IDLE with configurable timeout (acts as keepalive) - Sync retry on failure (up to sync_max_retries times) - Automatic reconnection on connection errors """ import imaplib2 # Get IMAP server config host, port = self._get_imap_server(account) password = account.config.get("password") if not password: self._log(f"{account.email}: No password configured") return while not stop_event.is_set(): try: # Connect conn = imaplib2.IMAP4_SSL(host, port) conn.login(account.email, password) conn.select("INBOX") # Get initial message count status, data = conn.search(None, "ALL") last_count = len(data[0].split()) if data[0] else 0 self._log(f"{account.email}: Connected, {last_count} messages in inbox") # Clear any pending responses list(conn.pop_untagged_responses()) # IDLE loop with keepalive last_keepalive = time.time() while not stop_event.is_set(): try: # Use configured idle_timeout for keepalive self._log(f"{account.email}: Entering IDLE mode (timeout={self.idle_timeout}s)...") # Start IDLE - timeout acts as keepalive result = conn.idle(timeout=self.idle_timeout) last_keepalive = time.time() # Check untagged responses for new mail notifications untagged = list(conn.pop_untagged_responses()) # Check for EXISTS response (new message) for resp in untagged: if len(resp) >= 2 and resp[0] == 'EXISTS': new_count = int(resp[1][0]) if resp[1] else 0 self._log(f"{account.email}: EXISTS response, count={new_count}") break # Also check if message count changed status, data = conn.search(None, "ALL") current_count = len(data[0].split()) if data[0] else 0 if current_count > last_count: new_msg_count = current_count - last_count self._log(f"{account.email}: {new_msg_count} new message(s) detected!") # Sync with retry logic sync_success = self._sync_with_retry( account, new_msg_count, conn, stop_event ) if sync_success: last_count = current_count # Notify OpenClaw self._notify_openclaw(account.email, new_msg_count, account.id) # Trigger callback if self.on_new_email: self.on_new_email({ "account_id": account.id, "email": account.email, "new_count": new_msg_count, }) else: # Sync failed after all retries, but still update count # to avoid re-detecting same messages last_count = current_count self._log(f"{account.email}: Sync failed after {self.sync_max_retries} retries, will retry notification on next sync") else: self._log(f"{account.email}: No new messages (count: {current_count})") except Exception as e: self._log(f"{account.email}: IDLE error: {e}") break try: conn.logout() except: pass self._log(f"{account.email}: Disconnected") except Exception as e: self._log(f"{account.email}: Connection error: {e}") # Wait before reconnecting if not stop_event.is_set(): stop_event.wait(timeout=60) def _sync_with_retry(self, account, new_msg_count: int, conn, stop_event: threading.Event) -> bool: """Sync new messages with retry logic. Args: account: Email account to sync new_msg_count: Number of new messages detected conn: Current IMAP connection (may be used for direct fetch) stop_event: Stop event to check for shutdown Returns: True if sync succeeded, False otherwise """ for attempt in range(1, self.sync_max_retries + 1): if stop_event.is_set(): return False try: self._log(f"{account.email}: Syncing {new_msg_count} message(s) (attempt {attempt}/{self.sync_max_retries})") sync_count = self.service.sync_account(account.id, limit=new_msg_count) self._log(f"{account.email}: Synced {sync_count} message(s)") return True except Exception as e: self._log(f"{account.email}: Sync error (attempt {attempt}): {e}") if attempt < self.sync_max_retries: # Wait before retry if stop_event.wait(timeout=self.sync_retry_delay): return False # Stop requested # Try to reconnect IMAP if connection is broken try: # Test if connection is still alive conn.noop() except: self._log(f"{account.email}: Connection lost, will reconnect") return False # Let outer loop handle reconnection return False # All retries failed def _get_imap_server(self, account) -> tuple: """Get IMAP server host and port for an account.""" from .adapters.imap import IMAP_SERVERS email_domain = account.email.split("@")[1].lower() if account.provider == EmailProvider.QQ: config = IMAP_SERVERS[EmailProvider.QQ] return config["host"], config["port"] elif account.provider == EmailProvider.NETEASE: netease_config = IMAP_SERVERS[EmailProvider.NETEASE] if email_domain in netease_config: return netease_config[email_domain]["host"], netease_config[email_domain]["port"] return netease_config["163.com"]["host"], netease_config["163.com"]["port"] raise ValueError(f"Unknown provider: {account.provider}") def _get_latest_message_info(self, account_id: str, count: int, time_limit: int = None) -> list: """Get details of the latest messages from an account. Args: account_id: Account ID count: Max number of messages to return time_limit: Only include messages received within this many seconds (default: self.notification_time_limit) Returns: List of message info dicts with sender, subject, message_id, received_at, and optionally body_preview, codes, links based on notification_config """ from datetime import datetime, timedelta, timezone if time_limit is None: time_limit = self.notification_time_limit try: messages = self.service.list_recent_messages(account_id=account_id, limit=count * 2) # Get already notified messages notified = self._notified_messages.get(account_id, set()) # Use timezone-aware now for comparison now = datetime.now(timezone.utc) time_cutoff = now - timedelta(seconds=time_limit) result = [] for msg in messages: # Skip already notified messages if msg.id in notified: continue # Check time limit - handle both naive and aware datetimes if hasattr(msg, 'received_at') and msg.received_at: msg_time = msg.received_at # If message time is naive, assume UTC if msg_time.tzinfo is None: msg_time = msg_time.replace(tzinfo=timezone.utc) if msg_time < time_cutoff: continue # Build message info info = { "message_id": msg.id, "sender": sanitize_sender(msg.sender_name or msg.sender), "subject": sanitize_subject(msg.subject), "category": msg.category.value if hasattr(msg, 'category') and msg.category else "normal", } # Get body text (fallback to HTML if no plain text) import re body_text = msg.body_text if not body_text and msg.body_html: # Strip HTML tags and CSS for basic text extraction html = msg.body_html # Remove style blocks html = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL | re.IGNORECASE) # Remove script blocks html = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE) # Remove HTML tags html = re.sub(r'<[^>]+>', ' ', html) # Decode HTML entities import html as html_module html = html_module.unescape(html) # Normalize whitespace body_text = re.sub(r'\s+', ' ', html).strip() # Optionally include body preview if self.notification_config.get('include_body') and body_text: max_len = self.notification_config.get('body_max_length', 500) info['body_preview'] = sanitize_for_notification(body_text, max_length=max_len) # Extract verification codes for all emails (not just verification category) if self.notification_config.get('include_verification_codes'): codes = extract_verification_codes(f"{msg.subject}\n\n{body_text or ''}") if codes: info['codes'] = [c.code for c in codes[:3]] # Max 3 codes # Optionally extract links (disabled by default for security) if self.notification_config.get('include_links') and body_text: from email_bridge.extraction import extract_action_links links = extract_action_links(body_text, msg.body_html) # Only include verify/reset links, not unsubscribe filtered_links = [l for l in links if l.link_type in ('verify', 'reset')][:2] if filtered_links: info['links'] = [{'type': l.link_type, 'domain': l.domain} for l in filtered_links] result.append(info) if len(result) >= count: break return result except Exception as e: self._log(f"Error getting message info: {e}") return [] def _notify_openclaw(self, account_email: str, new_count: int, account_id: str = None): """Send notification to OpenClaw main session via system event. Uses structured JSON format internally for safety, then formats as human-readable text for display. """ if not self.notify_openclaw: return try: import subprocess # Get message details if available details = [] if account_id: details = self._get_latest_message_info(account_id, min(new_count, 3)) # Skip notification if no new messages to report if not details: self._log(f"{account_email}: No new messages to notify (already notified or outside time limit)") return # Mark messages as notified if account_id not in self._notified_messages: self._notified_messages[account_id] = set() for info in details: self._notified_messages[account_id].add(info["message_id"]) # Build structured data (for internal processing) notification = { "type": "email_notification", "account": account_email, "count": new_count, "messages": details } # Format as human-readable text message = self._format_notification(notification) self._log(f"Sending notification to OpenClaw: {len(details)} messages") result = subprocess.run( ["openclaw", "system", "event", "--text", message, "--mode", "now"], capture_output=True, text=True, timeout=60 ) if result.returncode == 0: self._log(f"OpenClaw notification sent successfully") else: self._log(f"OpenClaw notification failed (code {result.returncode}): {result.stderr.strip() or 'No error'}") except subprocess.TimeoutExpired: self._log("OpenClaw notification timed out") except FileNotFoundError: self._log("OpenClaw CLI not found at PATH") except Exception as e: self._log(f"Error sending notification: {e}") def _format_notification(self, notification: dict) -> str: """Format notification data as human-readable text. Adapts format based on email category for better UX. Args: notification: Structured notification dict Returns: Human-readable text message """ lines = [f"📧 新邮件: {notification['account']}"] for i, msg in enumerate(notification['messages'], 1): category = msg.get('category', 'normal') sender = msg['sender'] subject = msg['subject'] # Category emoji and format if category == 'verification': # 验证码邮件:重点显示验证码 lines.append(f"\n{i}. 🔐 {sender}") lines.append(f" {subject}") if msg.get('codes'): codes_str = ', '.join(msg['codes']) lines.append(f" ✨ 验证码: {codes_str}") elif msg.get('body_preview'): # 没提取到验证码但显示正文 lines.append(f" 📝 {msg['body_preview'][:80]}...") elif category == 'security': # 安全邮件:突出警示 lines.append(f"\n{i}. ⚠️ {sender}") lines.append(f" {subject}") if msg.get('body_preview'): lines.append(f" 📝 {msg['body_preview'][:100]}...") elif category == 'transactional': # 交易邮件:订单/支付/发货 lines.append(f"\n{i}. 📦 {sender}") lines.append(f" {subject}") if msg.get('body_preview'): lines.append(f" 📝 {msg['body_preview'][:100]}...") elif category == 'promotion': # 推广邮件:营销/优惠/奖励 lines.append(f"\n{i}. 🎁 {sender}") lines.append(f" {subject}") if msg.get('body_preview'): lines.append(f" 📝 {msg['body_preview'][:80]}...") elif category == 'subscription': # 订阅邮件:简化显示 lines.append(f"\n{i}. 📰 {sender}") lines.append(f" {subject}") # 订阅邮件不显示正文,避免噪音 elif category == 'spam_like': # 垃圾邮件:标记但不展开 lines.append(f"\n{i}. 🚫 {sender}") lines.append(f" {subject} (疑似垃圾)") else: # 普通邮件:标准格式 lines.append(f"\n{i}. {sender}") lines.append(f" {subject}") if msg.get('body_preview'): lines.append(f" 📝 {msg['body_preview'][:100]}...") if msg.get('codes'): codes_str = ', '.join(msg['codes']) lines.append(f" 🔐 验证码: {codes_str}") # Links for any category if msg.get('links'): for link in msg['links'][:2]: lines.append(f" 🔗 {link['type']}: {link['domain']}") if notification['count'] > len(notification['messages']): lines.append(f"\n... 还有 {notification['count'] - len(notification['messages'])} 封") return '\n'.join(lines) def _poll_account(self, account, stop_event: threading.Event): """Poll an account for new messages (for Gmail).""" # Track message count to detect new messages last_count = None while not stop_event.is_set(): try: # Get current message count from database messages = self.service.list_recent_messages(account_id=account.id, limit=1000) current_count = len(messages) # First poll: just record the count, don't notify if last_count is None: last_count = current_count self._log(f"{account.email}: Initial count = {current_count}") elif current_count > last_count: # New messages detected new_count = current_count - last_count self._log(f"{account.email}: {new_count} new message(s) detected (was {last_count}, now {current_count})") last_count = current_count # Sync to get message details self.service.sync_account(account.id, limit=new_count) # Notify OpenClaw self._notify_openclaw(account.email, new_count, account.id) # Trigger callback if self.on_new_email: self.on_new_email({ "account_id": account.id, "email": account.email, "new_count": new_count, }) else: # No new messages, just sync to keep database updated self._log(f"{account.email}: No new messages (count: {current_count})") self.service.sync_account(account.id, limit=10) last_count = current_count except Exception as e: self._log(f"{account.email}: Poll error: {e}") # Wait for next poll stop_event.wait(timeout=self.poll_interval) def main(): """CLI entry point for daemon management.""" import argparse parser = argparse.ArgumentParser(description="Email Bridge Daemon") parser.add_argument( "action", choices=["start", "stop", "status"], help="Action to perform", ) parser.add_argument( "-d", "--daemon", action="store_true", help="Run in background (daemon mode)", ) parser.add_argument( "-i", "--interval", type=int, default=300, help="Polling interval in seconds (default: 300)", ) args = parser.parse_args() daemon = EmailDaemon(poll_interval=args.interval) if args.action == "start": daemon.start(background=args.daemon) elif args.action == "stop": daemon.stop() elif args.action == "status": status = daemon.status() print(json.dumps(status, indent=2)) if __name__ == "__main__": main() FILE:email_bridge/db.py """SQLite database layer for email bridge.""" import json import sqlite3 from datetime import datetime from pathlib import Path from typing import Optional from .models import Account, AccountStatus, EmailProvider, Message, EmailCategory DEFAULT_DB_PATH = Path.home() / ".email-bridge" / "email_bridge.db" class Database: """SQLite-backed storage for accounts and messages.""" def __init__(self, db_path: Optional[Path] = None): self.db_path = db_path or DEFAULT_DB_PATH self.db_path.parent.mkdir(parents=True, exist_ok=True) self._conn: Optional[sqlite3.Connection] = None @property def conn(self) -> sqlite3.Connection: if self._conn is None: self._conn = sqlite3.connect(str(self.db_path), check_same_thread=False) self._conn.row_factory = sqlite3.Row self._init_schema() return self._conn def _init_schema(self): """Initialize database schema.""" self.conn.executescript(""" CREATE TABLE IF NOT EXISTS accounts ( id TEXT PRIMARY KEY, email TEXT UNIQUE NOT NULL, provider TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'active', display_name TEXT, config TEXT DEFAULT '{}', created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS messages ( id TEXT PRIMARY KEY, account_id TEXT NOT NULL, message_id TEXT NOT NULL, subject TEXT NOT NULL, sender TEXT NOT NULL, sender_name TEXT, recipients TEXT DEFAULT '[]', received_at TEXT NOT NULL, is_read INTEGER DEFAULT 0, is_starred INTEGER DEFAULT 0, category TEXT DEFAULT 'normal', preview TEXT, body_text TEXT, body_html TEXT, synced_at TEXT NOT NULL, provider_data TEXT DEFAULT '{}', UNIQUE(account_id, message_id) ); CREATE INDEX IF NOT EXISTS idx_messages_account ON messages(account_id); CREATE INDEX IF NOT EXISTS idx_messages_received ON messages(received_at DESC); CREATE INDEX IF NOT EXISTS idx_messages_read ON messages(is_read); CREATE INDEX IF NOT EXISTS idx_messages_category ON messages(category); """) # Account operations def list_accounts(self, include_disabled: bool = False) -> list[Account]: """List all accounts.""" query = "SELECT * FROM accounts" if not include_disabled: query += " WHERE status = ?" rows = self.conn.execute(query, (AccountStatus.ACTIVE.value,)).fetchall() else: rows = self.conn.execute(query).fetchall() return [self._row_to_account(row) for row in rows] def get_account(self, account_id: str) -> Optional[Account]: """Get account by ID.""" row = self.conn.execute( "SELECT * FROM accounts WHERE id = ?", (account_id,) ).fetchone() return self._row_to_account(row) if row else None def add_account(self, account: Account) -> Account: """Add a new account.""" self.conn.execute( """INSERT INTO accounts (id, email, provider, status, display_name, config, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", ( account.id, account.email, account.provider.value, account.status.value, account.display_name, json.dumps(account.config), account.created_at.isoformat(), account.updated_at.isoformat() ) ) self.conn.commit() return account def update_account(self, account: Account) -> Account: """Update an existing account.""" account.updated_at = datetime.utcnow() self.conn.execute( """UPDATE accounts SET email = ?, provider = ?, status = ?, display_name = ?, config = ?, updated_at = ? WHERE id = ?""", ( account.email, account.provider.value, account.status.value, account.display_name, json.dumps(account.config), account.updated_at.isoformat(), account.id ) ) self.conn.commit() return account def delete_account(self, account_id: str) -> bool: """Delete an account and its messages.""" self.conn.execute("DELETE FROM messages WHERE account_id = ?", (account_id,)) self.conn.execute("DELETE FROM accounts WHERE id = ?", (account_id,)) self.conn.commit() return True # Message operations def list_messages( self, account_id: Optional[str] = None, unread_only: bool = False, category: Optional[EmailCategory] = None, limit: int = 50, offset: int = 0 ) -> list[Message]: """List messages with optional filters.""" query = "SELECT * FROM messages WHERE 1=1" params = [] if account_id: query += " AND account_id = ?" params.append(account_id) if unread_only: query += " AND is_read = 0" if category: query += " AND category = ?" params.append(category.value) query += " ORDER BY received_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) rows = self.conn.execute(query, params).fetchall() return [self._row_to_message(row) for row in rows] def search_messages( self, keyword: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, account_id: Optional[str] = None, limit: int = 50 ) -> list[Message]: """Search messages by keyword and/or time range.""" query = "SELECT * FROM messages WHERE 1=1" params = [] if keyword: query += " AND (subject LIKE ? OR sender LIKE ? OR preview LIKE ?)" kw = f"%{keyword}%" params.extend([kw, kw, kw]) if start_time: query += " AND received_at >= ?" params.append(start_time.isoformat()) if end_time: query += " AND received_at <= ?" params.append(end_time.isoformat()) if account_id: query += " AND account_id = ?" params.append(account_id) query += " ORDER BY received_at DESC LIMIT ?" params.append(limit) rows = self.conn.execute(query, params).fetchall() return [self._row_to_message(row) for row in rows] def get_message(self, message_id: str) -> Optional[Message]: """Get a single message by ID.""" row = self.conn.execute( "SELECT * FROM messages WHERE id = ?", (message_id,) ).fetchone() return self._row_to_message(row) if row else None def find_message(self, message_id: str) -> Optional[Message]: """Find a message by flexible ID lookup. Tries exact match first, then message_id suffix, then partial match. """ # Try exact match first msg = self.get_message(message_id) if msg: return msg # Try matching by message_id suffix (e.g., "mock-001" matches "abc:mock-001") row = self.conn.execute( "SELECT * FROM messages WHERE message_id = ?", (message_id,) ).fetchone() if row: return self._row_to_message(row) # Try partial match on id (ends with) row = self.conn.execute( "SELECT * FROM messages WHERE id LIKE ?", (f"%:{message_id}%",) ).fetchone() if row: return self._row_to_message(row) return None def add_message(self, message: Message) -> Message: """Add or update a message.""" self.conn.execute( """INSERT OR REPLACE INTO messages (id, account_id, message_id, subject, sender, sender_name, recipients, received_at, is_read, is_starred, category, preview, body_text, body_html, synced_at, provider_data) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( message.id, message.account_id, message.message_id, message.subject, message.sender, message.sender_name, json.dumps(message.recipients), message.received_at.isoformat(), int(message.is_read), int(message.is_starred), message.category.value, message.preview, message.body_text, message.body_html, message.synced_at.isoformat(), json.dumps(message.provider_data) ) ) self.conn.commit() return message def mark_read(self, message_id: str, is_read: bool = True) -> bool: """Mark a message as read/unread.""" self.conn.execute( "UPDATE messages SET is_read = ? WHERE id = ?", (int(is_read), message_id) ) self.conn.commit() return True def count_unread(self, account_id: Optional[str] = None) -> int: """Count unread messages.""" if account_id: row = self.conn.execute( "SELECT COUNT(*) FROM messages WHERE account_id = ? AND is_read = 0", (account_id,) ).fetchone() else: row = self.conn.execute( "SELECT COUNT(*) FROM messages WHERE is_read = 0" ).fetchone() return row[0] if row else 0 def _row_to_account(self, row: sqlite3.Row) -> Account: return Account( id=row["id"], email=row["email"], provider=EmailProvider(row["provider"]), status=AccountStatus(row["status"]), display_name=row["display_name"], config=json.loads(row["config"]), created_at=datetime.fromisoformat(row["created_at"]), updated_at=datetime.fromisoformat(row["updated_at"]), ) def _row_to_message(self, row: sqlite3.Row) -> Message: return Message( id=row["id"], account_id=row["account_id"], message_id=row["message_id"], subject=row["subject"], sender=row["sender"], sender_name=row["sender_name"], recipients=json.loads(row["recipients"]), received_at=datetime.fromisoformat(row["received_at"]), is_read=bool(row["is_read"]), is_starred=bool(row["is_starred"]), category=EmailCategory(row["category"]), preview=row["preview"], body_text=row["body_text"], body_html=row["body_html"], synced_at=datetime.fromisoformat(row["synced_at"]), provider_data=json.loads(row["provider_data"]), ) def close(self): if self._conn: self._conn.close() self._conn = None FILE:email_bridge/extraction.py """Extraction helpers for email content. Provides utilities to extract useful information from emails: - Verification codes (OTP, confirmation codes) - Action links (verify, confirm, reset password, unsubscribe) """ import re from dataclasses import dataclass from typing import Optional from urllib.parse import urlparse, unquote @dataclass class VerificationCode: """Extracted verification code.""" code: str context: Optional[str] = None # e.g., "Google", "GitHub", "Your code" @dataclass class ActionLink: """Extracted action link.""" url: str link_type: str # verify, confirm, reset, unsubscribe, action domain: Optional[str] = None text: Optional[str] = None # Link text if available # Patterns for verification codes VERIFICATION_PATTERNS = [ # Numeric codes (4-8 digits) (r'\b(\d{4,8})\b', 'numeric'), # Alphanumeric codes (common formats) (r'\b([A-Z0-9]{4,8})\b', 'alphanumeric'), # Codes with dashes (r'\b([A-Z0-9]{3,4}-[A-Z0-9]{3,4})\b', 'dashed'), # Chinese verification code patterns (r'验证码[::\s]*(\d{4,8})', 'chinese_numeric'), (r'验证码[::\s]*([A-Z0-9]{4,8})', 'chinese_alphanumeric'), ] # Context patterns that indicate verification codes VERIFICATION_CONTEXTS = [ r'verification\s*code', r'confirm(?:ation)?\s*code', r'security\s*code', r'OTP', r'one[- ]?time[- ]?password', r'PIN', r'authenticate', r'验证码', r'校验码', r'确认码', r'动态码', r'Your\s+(?:verification|security|confirmation)?\s*code\s+is', r'Enter\s+(?:this|the)\s+code', r'Use\s+(?:this|the)\s+code', r'code\s+(?:is|:)\s*[A-Z0-9]', ] # Link patterns for different action types LINK_PATTERNS = { 'verify': [ r'verify', r'confirm', r'activate', r'validate', r'认证', r'验证', ], 'reset': [ r'reset', r'change\s+password', r'new\s+password', r'forgot\s+password', r'重置', r'修改密码', ], 'unsubscribe': [ r'unsubscribe', r'opt[- ]?out', r'remove', r'stop\s+receiving', r'cancel\s+subscription', r'退订', r'取消订阅', ], 'action': [ r'click\s+here', r'view\s+(?:your|the)', r'respond', r'reply', r'accept', r'decline', r'join', r'start', r'continue', ], } def extract_verification_codes(text: str) -> list[VerificationCode]: """Extract verification codes from email text. IMPORTANT: This function ONLY extracts codes that appear near verification keywords. Standalone numbers are NOT extracted to avoid false positives (addresses, order numbers, tracking numbers, etc.). Args: text: Email body text to search Returns: List of VerificationCode objects found """ if not text: return [] codes = [] seen = set() # Only extract codes that are near verification keywords # This is the ONLY extraction method to avoid false positives text_lower = text.lower() for context_pattern in VERIFICATION_CONTEXTS: # Find context matches for match in re.finditer(context_pattern, text_lower, re.IGNORECASE): # Look for codes in surrounding text (100 chars before and after) start = max(0, match.start() - 100) end = min(len(text), match.end() + 100) context_text = text[start:end] # Try to find codes in this context for code_pattern, _ in VERIFICATION_PATTERNS: for code_match in re.finditer(code_pattern, context_text): code = code_match.group(1) if code not in seen and _is_likely_code_in_context(code, context_text): seen.add(code) # Extract context (service name if available) ctx = _extract_service_context(text, match.start()) codes.append(VerificationCode(code=code, context=ctx)) return codes def _is_likely_code(code: str) -> bool: """Check if a string looks like a verification code. Filters out things like years, prices, phone numbers, etc. """ # Filter out years (2000-2100) if len(code) == 4 and code.isdigit(): year = int(code) if 2000 <= year <= 2100: return False # Filter out common non-code patterns if code.lower() in ['http', 'https', 'html', 'www', 'utf8', 'json']: return False # Filter out pure letters (likely words) if code.isalpha(): return False # Filter out very long numeric sequences (likely phone/tracking numbers) if code.isdigit() and len(code) > 8: return False # Filter out dates in YYYYMMDD format if code.isdigit() and len(code) == 8: if code.startswith('20'): # Likely a date like 20260321 return False return True def _is_likely_code_in_context(code: str, context: str) -> bool: """Check if a code is likely a verification code given surrounding context. This is more strict than _is_likely_code and considers the context to filter out order numbers, invoice numbers, etc. """ if not _is_likely_code(code): return False # Check if the code appears after order/invoice/tracking markers context_lower = context.lower() false_positive_markers = [ 'order #', 'order no', 'order:', 'invoice', 'tracking', 'reference #', 'ref #', 'account #', 'transaction', 'zip', 'postal', 'phone', 'tel:', 'fax:', # Address patterns 'ste', 'st', 'ave', 'avenue', 'blvd', 'rd', 'road', 'drive', 'dr', 'lane', 'ln', 'court', 'ct', ] # Look for these markers within 30 chars before the code code_pos = context.find(code) if code_pos > 0: before = context[max(0, code_pos - 30):code_pos].lower() for marker in false_positive_markers: if marker in before: return False return True def _extract_service_context(text: str, position: int) -> Optional[str]: """Try to extract service/company name near a code.""" # Look backwards for common patterns before = text[max(0, position - 200):position] # Pattern: "Your Google verification code" or "GitHub code" patterns = [ r'(?:your\s+)?([A-Z][a-zA-Z]+)\s+(?:verification|security|confirmation)\s+code', r'([A-Z][a-zA-Z]+)\s+code\s+is', r'from\s+([A-Z][a-zA-Z]+)', ] for pattern in patterns: match = re.search(pattern, before, re.IGNORECASE) if match: return match.group(1) return None def extract_action_links(text: str, html: Optional[str] = None) -> list[ActionLink]: """Extract action links from email content. Args: text: Plain text email body html: Optional HTML email body for better link extraction Returns: List of ActionLink objects found """ links = [] seen_urls = set() # Extract from HTML if available (better link text extraction) if html: links.extend(_extract_links_from_html(html, seen_urls)) # Also extract from plain text links.extend(_extract_links_from_text(text, seen_urls)) # Filter and classify links action_links = [] for link in links: classified = _classify_link(link) if classified: action_links.append(classified) return action_links def _extract_links_from_html(html: str, seen_urls: set) -> list[ActionLink]: """Extract links from HTML content.""" links = [] # Match <a href="URL">text</a> pattern = r'<a\s+[^>]*href=["\']([^"\']+)["\'][^>]*>([^<]+)</a>' for match in re.finditer(pattern, html, re.IGNORECASE): url = match.group(1).strip() text = match.group(2).strip() # Clean up URL url = _clean_url(url) if url and url not in seen_urls: seen_urls.add(url) domain = _extract_domain(url) links.append(ActionLink( url=url, link_type='unknown', domain=domain, text=text[:100] if text else None )) return links def _extract_links_from_text(text: str, seen_urls: set) -> list[ActionLink]: """Extract links from plain text content.""" links = [] # Match URLs url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+' for match in re.finditer(url_pattern, text): url = match.group(0).strip() url = _clean_url(url) if url and url not in seen_urls: seen_urls.add(url) domain = _extract_domain(url) # Try to get context before the URL context_start = max(0, match.start() - 50) context = text[context_start:match.start()].strip() links.append(ActionLink( url=url, link_type='unknown', domain=domain, text=context if context else None )) return links def _clean_url(url: str) -> str: """Clean and normalize a URL.""" # Remove trailing punctuation url = url.rstrip('.,;:)>]') # Decode URL encoding for better pattern matching try: url = unquote(url) except Exception: pass return url def _extract_domain(url: str) -> Optional[str]: """Extract domain from URL.""" try: parsed = urlparse(url) return parsed.netloc except Exception: return None def _classify_link(link: ActionLink) -> Optional[ActionLink]: """Classify a link by type based on URL and text.""" # Combine URL and text for pattern matching check_text = f"{link.url} {link.text or ''}".lower() for link_type, patterns in LINK_PATTERNS.items(): for pattern in patterns: if re.search(pattern, check_text, re.IGNORECASE): return ActionLink( url=link.url, link_type=link_type, domain=link.domain, text=link.text ) # Not an action link return None def extract_from_email( subject: str, body_text: Optional[str] = None, body_html: Optional[str] = None ) -> dict: """Extract all useful information from an email. Args: subject: Email subject line body_text: Plain text body body_html: HTML body Returns: Dict with 'codes' and 'links' keys """ # Combine subject with body for code extraction full_text = f"{subject}\n\n{body_text or ''}" codes = extract_verification_codes(full_text) links = extract_action_links(body_text or '', body_html) return { 'codes': codes, 'links': links, } FILE:email_bridge/models.py """Data models for email bridge.""" from datetime import datetime from typing import Optional from enum import Enum from pydantic import BaseModel, Field class EmailCategory(str, Enum): """Email categories.""" VERIFICATION = "verification" SECURITY = "security" SUBSCRIPTION = "subscription" SPAM_LIKE = "spam_like" NORMAL = "normal" class AccountStatus(str, Enum): """Account status.""" ACTIVE = "active" DISABLED = "disabled" class EmailProvider(str, Enum): """Supported email providers.""" MOCK = "mock" GMAIL = "gmail" QQ = "qq" NETEASE = "netease" class Account(BaseModel): """Email account configuration.""" id: str email: str provider: EmailProvider status: AccountStatus = AccountStatus.ACTIVE display_name: Optional[str] = None created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow) # Provider-specific config (e.g., tokens, credentials) stored encrypted later config: dict = Field(default_factory=dict) class Message(BaseModel): """Email message metadata.""" id: str account_id: str message_id: str # Original message ID from provider subject: str sender: str sender_name: Optional[str] = None recipients: list[str] = Field(default_factory=list) received_at: datetime is_read: bool = False is_starred: bool = False category: EmailCategory = EmailCategory.NORMAL # Optional cached content preview: Optional[str] = None # First ~200 chars body_text: Optional[str] = None body_html: Optional[str] = None # Metadata synced_at: datetime = Field(default_factory=datetime.utcnow) provider_data: dict = Field(default_factory=dict) # Provider-specific raw data FILE:email_bridge/sanitize.py """Content sanitization for email notifications. Prevents prompt injection attacks from email content. """ import re from typing import Optional # Dangerous patterns that could be used for prompt injection DANGEROUS_PATTERNS = [ # Instruction override attempts r'ignore\s+(all\s+)?previous\s+instructions?', r'forget\s+(all\s+)?(previous\s+)?instructions?', r'disregard\s+(all\s+)?instructions?', r'override\s+(all\s+)?instructions?', # Role manipulation r'you\s+are\s+now\s+', r'act\s+as\s+(a|an)\s+', r'pretend\s+(to\s+be|you\s+are)\s+', r'role[\s\-]*play\s+(as|that)\s+', # System prompt injection r'new\s+instructions?:?\s*', r'system\s*:\s*', r'assistant\s*:\s*', r'user\s*:\s*', # Special tokens r'<\|.*?\|>', r'\[SYSTEM\]', r'\[INST\]', r'<<.*?>>', # Escape sequences r'\\n\\n', r'\\r\\n', ] # Invisible Unicode characters to remove (zero-width spaces, etc.) INVISIBLE_CHARS_PATTERN = r'[\u200b-\u200f\u2028-\u202f\u205f-\u206f\ufeff\u00ad\u034f\u061c\u17b4-\u17b5\u180e\u200c\u200d]+' def remove_invisible_chars(text: str) -> str: """Remove invisible Unicode characters from text. These characters are often used in HTML emails for formatting but appear as garbage in plain text output. Includes: - Zero-width spaces (U+200B, U+200C, U+200D) - Zero-width non-joiners/joiners - Word joiners, function applications, etc. - BOM (U+FEFF) - Soft hyphen (U+00AD) - Various other invisible formatting chars """ if not text: return text return re.sub(INVISIBLE_CHARS_PATTERN, '', text) def sanitize_for_notification( text: str, max_length: int = 500, placeholder: str = "[REMOVED]", ) -> str: """Sanitize text content for safe notification. Removes potential prompt injection patterns, invisible characters, and limits length. Args: text: The text to sanitize max_length: Maximum allowed length (default 500) placeholder: String to replace dangerous patterns with Returns: Sanitized text safe for inclusion in notifications """ if not text: return "" result = text # Remove invisible Unicode characters first result = remove_invisible_chars(result) # Remove dangerous patterns (case-insensitive) for pattern in DANGEROUS_PATTERNS: result = re.sub(pattern, placeholder, result, flags=re.IGNORECASE) # Remove control characters except newlines and tabs result = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', result) # Normalize whitespace result = re.sub(r'\s+', ' ', result) # Truncate to max length if len(result) > max_length: result = result[:max_length].rsplit(' ', 1)[0] + "..." return result.strip() def sanitize_sender(sender: str, max_length: int = 100) -> str: """Sanitize sender name/email for display. Args: sender: Sender name or email max_length: Maximum allowed length Returns: Sanitized sender string """ if not sender: return "Unknown" # Remove any special characters that could be problematic result = re.sub(r'[<>"\']', '', sender) # Remove invisible characters result = remove_invisible_chars(result) # Limit length if len(result) > max_length: result = result[:max_length] + "..." return result.strip() def sanitize_subject(subject: str, max_length: int = 100) -> str: """Sanitize email subject for display. Args: subject: Email subject max_length: Maximum allowed length Returns: Sanitized subject string """ if not subject: return "(No subject)" # Remove invisible characters result = remove_invisible_chars(subject) # Remove newlines that could break formatting result = result.replace('\n', ' ').replace('\r', ' ') # Limit length if len(result) > max_length: result = result[:max_length].rsplit(' ', 1)[0] + "..." return result.strip() FILE:email_bridge/service.py """Service layer for email bridge business logic.""" import uuid from datetime import datetime from typing import Optional, List from .adapters.base import FetchOptions from .adapters.mock import MockAdapter from .adapters.gmail import GmailAdapter, GmailCredentialsNotFoundError, GmailAuthError from .adapters.imap import QQMailAdapter, NetEaseMailAdapter, IMAPAdapterError, IMAPAuthError from .adapters.smtp import SMTPAdapter, SMTPAdapterError, SMTPAuthError, SMTPSendError from .categories import detect_category from .db import Database from .models import Account, AccountStatus, EmailCategory, EmailProvider, Message class EmailBridgeService: """Core service for email management. This service provides a clean API layer between the CLI/presentation and the data/adapters layer. """ def __init__(self, db: Optional[Database] = None): self.db = db or Database() self._adapters = { EmailProvider.MOCK: MockAdapter(), EmailProvider.GMAIL: GmailAdapter(), EmailProvider.QQ: QQMailAdapter(), EmailProvider.NETEASE: NetEaseMailAdapter(), } # Account management def list_accounts(self, include_disabled: bool = False) -> list[Account]: """List all configured accounts.""" return self.db.list_accounts(include_disabled=include_disabled) def get_account(self, account_id: str) -> Optional[Account]: """Get account by ID.""" return self.db.get_account(account_id) def add_account( self, email: str, provider: EmailProvider, display_name: Optional[str] = None, config: Optional[dict] = None ) -> Account: """Add a new email account.""" account = Account( id=str(uuid.uuid4())[:8], email=email, provider=provider, display_name=display_name, config=config or {}, ) return self.db.add_account(account) def update_account( self, account_id: str, display_name: Optional[str] = None, status: Optional[AccountStatus] = None, config: Optional[dict] = None ) -> Optional[Account]: """Update an existing account.""" account = self.db.get_account(account_id) if not account: return None if display_name is not None: account.display_name = display_name if status is not None: account.status = status if config is not None: account.config = config return self.db.update_account(account) def disable_account(self, account_id: str) -> bool: """Disable an account (soft delete).""" account = self.db.get_account(account_id) if not account: return False account.status = AccountStatus.DISABLED self.db.update_account(account) return True def delete_account(self, account_id: str) -> bool: """Permanently delete an account and its messages.""" return self.db.delete_account(account_id) # Message management def list_recent_messages( self, account_id: Optional[str] = None, limit: int = 20 ) -> list[Message]: """List recent messages across all or one account.""" return self.db.list_messages(account_id=account_id, limit=limit) def list_unread_messages( self, account_id: Optional[str] = None, limit: int = 20 ) -> list[Message]: """List unread messages.""" return self.db.list_messages( account_id=account_id, unread_only=True, limit=limit ) def search_messages( self, keyword: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, account_id: Optional[str] = None, limit: int = 50 ) -> list[Message]: """Search messages by keyword and/or time range.""" return self.db.search_messages( keyword=keyword, start_time=start_time, end_time=end_time, account_id=account_id, limit=limit ) def get_message(self, message_id: str) -> Optional[Message]: """Get a single message with full details. Supports flexible ID lookup: full ID, message_id only, or partial. """ return self.db.find_message(message_id) def mark_read(self, message_id: str, is_read: bool = True) -> bool: """Mark a message as read or unread.""" # First find the message to get its full ID msg = self.db.find_message(message_id) if msg: return self.db.mark_read(msg.id, is_read) return False # Sync operations def sync_account( self, account_id: str, since: Optional[datetime] = None, limit: int = 100 ) -> int: """Sync messages from a provider into local storage. Returns the number of messages synced. """ account = self.db.get_account(account_id) if not account: raise ValueError(f"Account {account_id} not found") if account.status != AccountStatus.ACTIVE: raise ValueError(f"Account {account_id} is not active") adapter = self._adapters.get(account.provider) if not adapter: raise ValueError(f"No adapter for provider {account.provider}") options = FetchOptions(since=since, limit=limit) count = 0 for raw_msg in adapter.fetch_messages(account, options): # Detect category category = detect_category( raw_msg.subject, raw_msg.body_text or "" ) # Generate preview preview = None if raw_msg.body_text: preview = raw_msg.body_text[:200].strip() # Create message record message = Message( id=f"{account_id}:{raw_msg.message_id}", account_id=account_id, message_id=raw_msg.message_id, subject=raw_msg.subject, sender=raw_msg.sender, sender_name=raw_msg.sender_name, recipients=raw_msg.recipients, received_at=raw_msg.received_at, is_read=raw_msg.is_read, category=category, preview=preview, body_text=raw_msg.body_text, body_html=raw_msg.body_html, provider_data=raw_msg.raw_data, ) self.db.add_message(message) count += 1 return count def sync_all_accounts(self, since: Optional[datetime] = None) -> dict[str, int]: """Sync all active accounts. Returns a dict mapping account_id to sync count. """ results = {} for account in self.db.list_accounts(include_disabled=False): try: count = self.sync_account(account.id, since=since) results[account.id] = count except Exception as e: results[account.id] = f"Error: {e}" return results # Stats def get_stats(self) -> dict: """Get overall statistics.""" accounts = self.db.list_accounts(include_disabled=True) active_accounts = [a for a in accounts if a.status == AccountStatus.ACTIVE] unread_count = self.db.count_unread() return { "total_accounts": len(accounts), "active_accounts": len(active_accounts), "unread_messages": unread_count, } # Send email def send_email( self, account_id: str, to: List[str], subject: str, body_text: Optional[str] = None, body_html: Optional[str] = None, cc: Optional[List[str]] = None, bcc: Optional[List[str]] = None, ) -> bool: """Send an email from an account. Args: account_id: Account to send from to: List of recipient email addresses subject: Email subject body_text: Plain text body body_html: HTML body cc: List of CC recipients bcc: List of BCC recipients Returns: True if sent successfully Raises: ValueError: Account not found or inactive SMTPAuthError: Authentication failed SMTPSendError: Sending failed """ account = self.db.get_account(account_id) if not account: raise ValueError(f"Account {account_id} not found") if account.status != AccountStatus.ACTIVE: raise ValueError(f"Account {account_id} is not active") # Use SMTP adapter for all providers # (Gmail can also use API, but SMTP is simpler for now) smtp_adapter = SMTPAdapter() from_name = account.display_name return smtp_adapter.send_email( account=account, to=to, subject=subject, body_text=body_text, body_html=body_html, cc=cc, bcc=bcc, from_name=from_name, ) FILE:fixtures/sample_emails.json { "messages": [ { "message_id": "mock-001", "account_email": "[email protected]", "subject": "Your verification code is 123456", "sender": "[email protected]", "sender_name": "Example Service", "recipients": ["[email protected]"], "received_at": "2026-03-21T09:30:00", "is_read": false, "body_text": "Your verification code is 123456.\n\nThis code will expire in 5 minutes.\n\nIf you didn't request this code, please ignore this email.", "raw_data": {} }, { "message_id": "mock-002", "account_email": "[email protected]", "subject": "Security Alert: New login to your account", "sender": "[email protected]", "sender_name": "Security Team", "recipients": ["[email protected]"], "received_at": "2026-03-21T08:15:00", "is_read": false, "body_text": "We detected a new login to your account from:\n\nDevice: Chrome on Windows\nLocation: Beijing, China\nTime: March 21, 2026 at 8:15 AM\n\nIf this was you, no action is needed.\nIf you didn't log in, please secure your account immediately.", "raw_data": {} }, { "message_id": "mock-003", "account_email": "[email protected]", "subject": "Weekly Newsletter - March Edition", "sender": "[email protected]", "sender_name": "Tech Blog Weekly", "recipients": ["[email protected]"], "received_at": "2026-03-20T14:00:00", "is_read": true, "body_text": "Welcome to the March edition of our newsletter!\n\nTop stories this week:\n- AI advances continue to reshape industries\n- New Python 3.14 released with performance improvements\n- Cloud computing trends for 2026\n\nTo unsubscribe, click here.", "raw_data": {} }, { "message_id": "mock-004", "account_email": "[email protected]", "subject": "Re: Project Update Meeting", "sender": "[email protected]", "sender_name": "Zhang Wei", "recipients": ["[email protected]"], "received_at": "2026-03-20T11:30:00", "is_read": true, "body_text": "Hi,\n\nThanks for the update. The meeting notes look good.\nLet's schedule a follow-up for next week.\n\nBest regards,\nZhang Wei", "raw_data": {} }, { "message_id": "mock-005", "account_email": "[email protected]", "subject": "CONGRATULATIONS! You've WON $1,000,000!!!", "sender": "[email protected]", "sender_name": "International Lottery", "recipients": ["[email protected]"], "received_at": "2026-03-19T22:45:00", "is_read": false, "body_text": "CONGRATULATIONS!!!\n\nYou have been selected as the WINNER of our international lottery!\nClick here NOW to claim your $1,000,000 prize!\n\nAct now - limited time offer!!!", "raw_data": {} }, { "message_id": "mock-006", "account_email": "[email protected]", "subject": "您的QQ邮箱账户安全提醒", "sender": "[email protected]", "sender_name": "QQ邮箱安全中心", "recipients": ["[email protected]"], "received_at": "2026-03-19T16:20:00", "is_read": true, "body_text": "尊敬的用户:\n\n我们检测到您的账户在新设备上登录。\n\n登录时间:2026年3月19日 16:20\n登录地点:上海\n\n如果这不是您本人的操作,请立即修改密码。\n\nQQ邮箱安全中心", "raw_data": {} }, { "message_id": "mock-007", "account_email": "[email protected]", "subject": "Netflix 订阅确认", "sender": "[email protected]", "sender_name": "Netflix", "recipients": ["[email protected]"], "received_at": "2026-03-18T10:00:00", "is_read": true, "body_text": "您的 Netflix 订阅已成功续期。\n\n订阅类型:高级版\n下次扣款日期:2026年4月18日\n\n如需退订,请访问账户设置。", "raw_data": {} }, { "message_id": "mock-008", "account_email": "[email protected]", "subject": "GitHub Security: New personal access token created", "sender": "[email protected]", "sender_name": "GitHub", "recipients": ["[email protected]"], "received_at": "2026-03-17T15:45:00", "is_read": false, "body_text": "A new personal access token (classic) was recently created for your account.\n\nToken name: CI Token\nScopes: repo, workflow\n\nIf you did not make this change, please contact GitHub Support.", "raw_data": {} }, { "message_id": "mock-009", "account_email": "[email protected]", "subject": "Invoice #INV-2026-0342 from Acme Corp", "sender": "[email protected]", "sender_name": "Acme Corp Billing", "recipients": ["[email protected]"], "received_at": "2026-03-15T09:00:00", "is_read": true, "body_text": "Dear Customer,\n\nPlease find attached Invoice #INV-2026-0342 for $299.00.\n\nPayment is due within 30 days.\n\nThank you for your business!\n\nAcme Corp", "raw_data": {} }, { "message_id": "mock-010", "account_email": "[email protected]", "subject": "Your order has shipped!", "sender": "[email protected]", "sender_name": "Online Shop", "recipients": ["[email protected]"], "received_at": "2026-03-14T13:20:00", "is_read": true, "body_text": "Great news! Your order #ORD-12345 has shipped.\n\nTracking number: 1Z999AA10123456784\nExpected delivery: March 17, 2026\n\nItems in this order:\n- Widget Pro x 1\n- Gadget Plus x 2\n\nThank you for shopping with us!", "raw_data": {} } ] } FILE:install.sh #!/bin/bash # Email Bridge Installation Script # Usage: ./install.sh set -e echo "📧 Email Bridge Installer" echo "========================" # Check Python version PYTHON_VERSION=$(python3 --version 2>/dev/null | awk '{print $2}' | cut -d. -f1,2) if [ -z "$PYTHON_VERSION" ]; then echo "❌ Python 3 not found. Please install Python 3.10+ first." exit 1 fi echo "✓ Python $PYTHON_VERSION detected" # Check if uv is available (preferred) if command -v uv &> /dev/null; then echo "✓ Using uv for installation" uv sync source .venv/bin/activate elif command -v pip &> /dev/null; then echo "✓ Using pip for installation" python3 -m venv .venv source .venv/bin/activate pip install -e . else echo "❌ Neither uv nor pip found. Please install one of them." exit 1 fi echo "" echo "✅ Installation complete!" echo "" echo "Quick Start:" echo "-----------" echo "" echo "1. Add your email account:" echo " # QQ Mail (with authorization code)" echo " email-bridge accounts add [email protected] -p qq --config '{\"password\": \"YOUR_AUTH_CODE\"}'" echo "" echo "2. Sync emails:" echo " email-bridge sync" echo "" echo "3. Start daemon for real-time notifications:" echo " email-bridge daemon start -d" echo "" echo "For Gmail setup, see README.md → Gmail 配置指南" echo "" echo "Documentation: https://github.com/ryanchan720/email-bridge" FILE:openclaw.plugin.json { "id": "email-bridge", "name": "Email Bridge", "version": "0.5.0", "description": "Email management skill with real-time notifications, verification code extraction, and multi-provider support (Gmail, QQ Mail, NetEase)", "author": "Ryan Chan", "license": "MIT", "repository": "https://github.com/ryanchan720/email-bridge", "keywords": ["email", "gmail", "qq-mail", "imap", "smtp", "verification-code"], "runtime": "python", "entry": "email_bridge.cli:main", "install": { "command": "pip install -e .", "dependencies": [ "click>=8.0", "pydantic>=2.0", "google-api-python-client>=2.0", "google-auth>=2.0", "google-auth-oauthlib>=1.0", "imaplib2>=3.6" ] }, "triggers": { "keywords": { "zh": ["邮箱", "邮件", "电子邮件", "发邮件", "查看邮件", "验证码", "QQ邮箱", "Gmail", "163邮箱", "126邮箱"], "en": ["email", "mail", "send email", "check email", "verification code"] } }, "capabilities": [ "email:receive", "email:send", "email:sync", "email:extract_code", "email:daemon" ], "config": { "accounts": { "type": "array", "description": "Email accounts to configure", "items": { "email": "string", "provider": "string", "password": "string" } }, "daemon": { "poll_interval": { "type": "number", "default": 300, "description": "Polling interval for Gmail (seconds)" }, "notify": { "type": "boolean", "default": true, "description": "Send notifications for new emails" } } } } FILE:pyproject.toml [project] name = "email-bridge" version = "0.6.3" description = "Personal email middleware for AI assistants with real-time notifications, smart categorization, and verification code extraction (Gmail, QQ Mail, NetEase)" readme = "README.md" license = {text = "MIT"} requires-python = ">=3.10" authors = [ {name = "Ryan Chan", email = "[email protected]"} ] keywords = ["email", "gmail", "qq-mail", "imap", "smtp", "verification-code", "notification"] classifiers = [ "Development Status :: 4 - Beta", "Environment :: Console", "Intended Audience :: Developers", "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Topic :: Communications :: Email", ] dependencies = [ "click>=8.0", "pydantic>=2.0", "google-api-python-client>=2.0", "google-auth>=2.0", "google-auth-oauthlib>=1.0", "imaplib2>=3.6", ] [project.optional-dependencies] dev = [ "pytest>=7.0", "pytest-cov>=4.0", ] [project.scripts] email-bridge = "email_bridge.cli:main" [project.urls] Homepage = "https://github.com/ryanchan720/email-bridge" Repository = "https://github.com/ryanchan720/email-bridge" Issues = "https://github.com/ryanchan720/email-bridge/issues" [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["email_bridge"] FILE:references/gmail-setup.md # Gmail 配置指南 > ⚠️ **注意**:Gmail API 配置流程较复杂,需要 Google Cloud 项目和 OAuth 授权。 > 如果你只需要基本的收发邮件功能,建议使用 QQ 邮箱或网易邮箱(授权码配置更简单)。 ## 前置条件 - 一个 Google 账号(Gmail 邮箱) - 能访问 Google Cloud Console(可能需要科学上网) --- ## Step 1:创建 Google Cloud 项目 1. 打开 [Google Cloud Console](https://console.cloud.google.com/) 2. 登录你的 Google 账号 3. 页面顶部点击项目选择器,点击 **"NEW PROJECT"** 4. 项目名称随便填(如 `email-bridge`),点击 **"CREATE"** 5. 创建完成后选中该项目 --- ## Step 2:启用 Gmail API 1. 打开 [Gmail API 页面](https://console.cloud.google.com/apis/library/gmail.googleapis.com) 2. 确保顶部显示的是刚创建的项目 3. 点击 **"ENABLE"** --- ## Step 3:配置 OAuth 同意屏幕 1. 打开 [OAuth 同意屏幕配置](https://console.cloud.google.com/apis/credentials/consent) 2. 用户类型选择 **"External"**,点击 **"CREATE"** ### 填写配置 | 字段 | 填写内容 | |------|----------| | App name | `Email Bridge` 或任意名称 | | User support email | 选择你的邮箱 | | Developer contact | 填你的邮箱 | | 其他字段 | 可留空 | 点击 **"SAVE AND CONTINUE"**。 ### Scopes(权限范围) 1. 点击 **"ADD OR REMOVE SCOPES"** 2. 搜索 `gmail.modify`,勾选 `https://www.googleapis.com/auth/gmail.modify` 3. 点击 **"UPDATE"**,然后 **"SAVE AND CONTINUE"** > 💡 `gmail.modify` 包含读取和修改权限,以后发邮件也能用。 ### Test Users(测试用户) 1. 点击 **"ADD USERS"** 2. 填写你的 Gmail 地址 3. 点击 **"ADD"**,然后 **"SAVE AND CONTINUE"** 最后检查无误,点击 **"BACK TO DASHBOARD"**。 --- ## Step 4:创建 OAuth 客户端凭证 1. 打开 [Credentials 页面](https://console.cloud.google.com/apis/credentials) 2. 点击 **"CREATE CREDENTIALS"** → **"OAuth client ID"** 3. Application type 选择 **"Desktop app"** 4. Name 随便填(如 `Email Bridge CLI`) 5. 点击 **"CREATE"** 6. 在弹出窗口点击 **"DOWNLOAD JSON"** 下载凭证文件 --- ## Step 5:安装凭证文件 ```bash # 创建目录 mkdir -p ~/.email-bridge/gmail # 将下载的凭证文件移动过去,重命名为 credentials.json mv ~/Downloads/client_secret_xxx.json ~/.email-bridge/gmail/credentials.json ``` --- ## Step 6:添加账户并授权 ```bash # 添加 Gmail 账户 email-bridge accounts add [email protected] --provider gmail --name "Personal Gmail" # 首次同步,会打开浏览器要求授权 email-bridge sync ``` 授权完成后,token 会自动保存在 `~/.email-bridge/gmail/token_*.json`,后续同步无需再次授权。 --- ## 配置选项 ```bash # 自定义同步范围:最近 3 天,最多 50 封 email-bridge accounts add [email protected] -p gmail \ --config '{"sync_days": 3, "sync_max_messages": 50}' ``` | 配置项 | 说明 | 默认值 | |--------|------|--------| | `sync_days` | 同步最近 N 天的邮件 | 7 | | `sync_max_messages` | 每次同步最大邮件数 | 100 | | `credentials_path` | 自定义凭证文件路径 | `~/.email-bridge/gmail/credentials.json` | | `token_path` | 自定义 token 存储路径 | 自动生成 | --- ## 发送邮件 Gmail SMTP 发送需要**应用专用密码**(与 OAuth token 不同): 1. 开启 Google 账户的两步验证 2. 访问 [应用专用密码](https://myaccount.google.com/apppasswords) 3. 生成一个新密码,选择"邮件"和"其他设备" 4. 更新账户配置: ```bash email-bridge accounts update <account_id> --config '{"password": "YOUR_APP_PASSWORD"}' ``` FILE:scripts/email-bridge.sh #!/bin/bash # Email Bridge CLI wrapper # This script ensures email-bridge is installed and forwards commands set -e SKILL_DIR="$(cd "$(dirname "$0")/.." && pwd)" VENV_DIR="$SKILL_DIR/.venv" # Check if installed if [ ! -d "$VENV_DIR" ]; then echo "📧 Email Bridge 未安装,正在安装..." cd "$SKILL_DIR" if command -v uv &> /dev/null; then uv sync else python3 -m venv .venv source .venv/bin/activate pip install -e . fi echo "✅ 安装完成" fi # Activate venv and run command source "$VENV_DIR/bin/activate" email-bridge "$@" FILE:scripts/email_bridge.py #!/usr/bin/env python3 """ Email Bridge skill installer and entry point. When first called, installs the package if needed, then forwards commands. """ import subprocess import sys from pathlib import Path SKILL_DIR = Path(__file__).parent.parent VENV_DIR = SKILL_DIR / ".venv" EMAIL_BRIDGE_CMD = "email-bridge" def is_installed() -> bool: """Check if email-bridge is available.""" try: subprocess.run( [EMAIL_BRIDGE_CMD, "--version"], capture_output=True, check=True ) return True except (subprocess.CalledProcessError, FileNotFoundError): return False def install_package(): """Install email-bridge package.""" print("📧 正在安装 Email Bridge...") # Check Python version if sys.version_info < (3, 10): print("❌ 需要 Python 3.10 或更高版本") sys.exit(1) # Install using pip subprocess.run( [sys.executable, "-m", "pip", "install", "-e", str(SKILL_DIR)], check=True ) print("✅ Email Bridge 安装完成") def main(): """Main entry point.""" # Check if installed if not is_installed(): install_package() # Forward command to email-bridge cmd = [EMAIL_BRIDGE_CMD] + sys.argv[1:] result = subprocess.run(cmd) sys.exit(result.returncode) if __name__ == "__main__": main() FILE:test_installation.py #!/usr/bin/env python3 """ Email Bridge Test Script Run this to verify your Email Bridge installation. """ import sys from pathlib import Path # Add parent to path sys.path.insert(0, str(Path(__file__).parent)) from email_bridge.service import EmailBridgeService from email_bridge.models import EmailProvider def test_database(): """Test database initialization.""" print("📦 Testing database...") service = EmailBridgeService() accounts = service.list_accounts() print(f" ✓ Database initialized ({len(accounts)} accounts)") return service def test_mock_adapter(): """Test mock adapter.""" print("\n🎭 Testing mock adapter...") service = EmailBridgeService() # Check if mock account exists accounts = service.list_accounts() mock_account = next((a for a in accounts if a.provider == EmailProvider.MOCK), None) if not mock_account: # Add mock account mock_account = service.add_account( email="[email protected]", provider=EmailProvider.MOCK, display_name="Demo Account" ) print(f" ✓ Created mock account: {mock_account.email}") # Sync messages count = service.sync_account(mock_account.id) print(f" ✓ Synced {count} messages from mock adapter") # List messages messages = service.list_recent_messages(account_id=mock_account.id, limit=5) print(f" ✓ Retrieved {len(messages)} messages") return True def test_verification_extraction(): """Test verification code extraction.""" print("\n🔢 Testing verification code extraction...") from email_bridge.extraction import extract_from_email # Test cases test_cases = [ ("验证码: 123456", "您的验证码是 123456,5分钟内有效"), ("Your code is: 789012", "Your verification code: 789012"), ("验证码:ABC123", "短信验证码 ABC123"), ] for subject, body in test_cases: result = extract_from_email(subject=subject, body_text=body) if result['codes']: print(f" ✓ Extracted: {result['codes'][0].code} from '{subject}'") else: print(f" ✗ Failed to extract from '{subject}'") return False return True def test_accounts_command(): """Test accounts list command.""" print("\n👥 Testing accounts command...") service = EmailBridgeService() accounts = service.list_accounts() print(f" ✓ Found {len(accounts)} account(s):") for acc in accounts: print(f" - {acc.email} ({acc.provider.value})") return True def main(): print("=" * 50) print("📧 Email Bridge Test Suite") print("=" * 50) tests = [ ("Database", test_database), ("Mock Adapter", test_mock_adapter), ("Verification Extraction", test_verification_extraction), ("Accounts Command", test_accounts_command), ] results = [] for name, test_func in tests: try: result = test_func() results.append((name, result, None)) except Exception as e: results.append((name, False, str(e))) print("\n" + "=" * 50) print("📊 Test Results") print("=" * 50) passed = sum(1 for _, r, _ in results if r) total = len(results) for name, result, error in results: status = "✅ PASS" if result else "❌ FAIL" print(f" {status}: {name}") if error: print(f" Error: {error}") print(f"\n总计: {passed}/{total} 通过") if passed == total: print("\n🎉 所有测试通过!Email Bridge 工作正常。") return 0 else: print("\n⚠️ 部分测试失败,请检查上述错误。") return 1 if __name__ == "__main__": sys.exit(main()) FILE:tests/__init__.py FILE:tests/test_basic.py """Basic tests for email bridge.""" import tempfile from pathlib import Path import pytest from email_bridge.models import Account, EmailProvider, AccountStatus, Message, EmailCategory from email_bridge.db import Database from email_bridge.service import EmailBridgeService from email_bridge.categories import detect_category, EmailCategory as Cat class TestModels: def test_account_creation(self): account = Account( id="test1", email="[email protected]", provider=EmailProvider.MOCK, ) assert account.id == "test1" assert account.email == "[email protected]" assert account.status == AccountStatus.ACTIVE def test_message_creation(self): from datetime import datetime msg = Message( id="acc:msg1", account_id="acc", message_id="msg1", subject="Test Subject", sender="[email protected]", recipients=["[email protected]"], received_at=datetime.utcnow(), ) assert msg.subject == "Test Subject" assert msg.is_read is False class TestDatabase: def test_database_operations(self, tmp_path): db = Database(db_path=tmp_path / "test.db") # Add account account = Account( id="test1", email="[email protected]", provider=EmailProvider.MOCK, ) db.add_account(account) # List accounts accounts = db.list_accounts() assert len(accounts) == 1 assert accounts[0].email == "[email protected]" # Get account acc = db.get_account("test1") assert acc is not None assert acc.email == "[email protected]" db.close() def test_message_operations(self, tmp_path): from datetime import datetime db = Database(db_path=tmp_path / "test.db") # Add account first account = Account(id="acc1", email="[email protected]", provider=EmailProvider.MOCK) db.add_account(account) # Add message msg = Message( id="acc1:msg1", account_id="acc1", message_id="msg1", subject="Test Subject", sender="[email protected]", recipients=["[email protected]"], received_at=datetime.utcnow(), category=EmailCategory.NORMAL, ) db.add_message(msg) # List messages msgs = db.list_messages() assert len(msgs) == 1 assert msgs[0].subject == "Test Subject" # Find message found = db.find_message("msg1") assert found is not None assert found.subject == "Test Subject" db.close() class TestCategoryDetection: def test_verification_category(self): cat = detect_category("Your verification code is 123456", "") assert cat == Cat.VERIFICATION def test_security_category(self): cat = detect_category("Security Alert: New login", "") assert cat == Cat.SECURITY def test_subscription_category(self): cat = detect_category("Weekly Newsletter", "Click to unsubscribe") assert cat == Cat.SUBSCRIPTION def test_spam_category(self): cat = detect_category("CONGRATULATIONS! You've WON!!!", "") assert cat == Cat.SPAM_LIKE def test_normal_category(self): cat = detect_category("Re: Project Update Meeting", "Hi, thanks for the update") assert cat == Cat.NORMAL class TestService: def test_add_account(self, tmp_path): from email_bridge.db import Database db = Database(db_path=tmp_path / "test.db") service = EmailBridgeService(db=db) account = service.add_account( email="[email protected]", provider=EmailProvider.MOCK, display_name="Test Account" ) assert account.email == "[email protected]" assert account.provider == EmailProvider.MOCK db.close() def test_stats(self, tmp_path): from email_bridge.db import Database db = Database(db_path=tmp_path / "test.db") service = EmailBridgeService(db=db) stats = service.get_stats() assert stats["total_accounts"] == 0 assert stats["unread_messages"] == 0 db.close() FILE:tests/test_extraction.py """Tests for extraction module.""" import pytest from email_bridge.extraction import ( extract_verification_codes, extract_action_links, extract_from_email, VerificationCode, ActionLink, ) class TestVerificationCodeExtraction: def test_numeric_code_with_context(self): text = "Your verification code is 123456. Enter this code to verify your account." codes = extract_verification_codes(text) assert len(codes) >= 1 assert any(c.code == "123456" for c in codes) def test_chinese_verification_code(self): text = "您的验证码是:888999。请在5分钟内输入。" codes = extract_verification_codes(text) assert len(codes) >= 1 assert any(c.code == "888999" for c in codes) def test_alphanumeric_code(self): text = "Your security code is: ABC123XY" codes = extract_verification_codes(text) # Should find the alphanumeric code assert len(codes) >= 1 found = any("ABC" in c.code.upper() or "XY" in c.code.upper() for c in codes) assert found def test_dashed_code(self): text = "Enter confirmation code AB12-CD34 to continue" codes = extract_verification_codes(text) assert len(codes) >= 1 assert any("-" in c.code for c in codes) def test_empty_text(self): codes = extract_verification_codes("") assert codes == [] codes = extract_verification_codes(None) assert codes == [] def test_filters_out_years(self): text = "In 2024 we launched our product. Your code is 1234." codes = extract_verification_codes(text) # Should find 1234 but not 2024 (it's a year) assert any(c.code == "1234" for c in codes) assert not any(c.code == "2024" for c in codes) def test_otp_keyword(self): text = "OTP: 998877" codes = extract_verification_codes(text) assert len(codes) >= 1 assert any(c.code == "998877" for c in codes) class TestVerificationCodeFalsePositives: """Tests for filtering out false positives.""" def test_filters_order_numbers(self): """Order/invoice numbers should not be treated as verification codes.""" text = "Your order #123456 has been shipped. Track your package." codes = extract_verification_codes(text) # Should not match 123456 as a verification code assert not any(c.code == "123456" for c in codes) def test_filters_invoice_numbers(self): """Invoice numbers should not be treated as verification codes.""" text = "Invoice INV-123456 is due on March 30th." codes = extract_verification_codes(text) # INV-123456 should not be extracted as a verification code assert not any("123456" in c.code for c in codes) def test_filters_phone_numbers(self): """Phone numbers should not be treated as verification codes.""" text = "Call us at 1234567890 for support." codes = extract_verification_codes(text) # 10-digit number should not be extracted assert not any(c.code == "1234567890" for c in codes) def test_filters_tracking_numbers(self): """Long tracking numbers should not be treated as verification codes.""" text = "Tracking number: 123456789012345" codes = extract_verification_codes(text) # 15-digit number is too long for a verification code assert not any("123456789012345" in c.code for c in codes) def test_filters_credit_card_like_numbers(self): """Credit card-like patterns should not be treated as verification codes.""" text = "Card ending in 123456789012" codes = extract_verification_codes(text) # 12-digit number is too long assert not any("123456789012" in c.code for c in codes) def test_real_verification_code_still_works(self): """Real verification codes should still be extracted.""" text = "Your verification code is 123456. Do not share it." codes = extract_verification_codes(text) assert any(c.code == "123456" for c in codes) def test_real_code_with_context_still_works(self): """Codes with verification context should still be extracted.""" text = "GitHub: Your security code is 789012" codes = extract_verification_codes(text) assert any(c.code == "789012" for c in codes) def test_filters_price_like_numbers(self): """Numbers that look like prices should not be codes.""" # Note: This is tricky - 1999 could be a code or a price # But without context, we should be conservative text = "Your total is 1999 dollars." codes = extract_verification_codes(text) # Without verification context, should not extract assert not any(c.code == "1999" for c in codes) def test_filters_plain_numbers_without_context(self): """Plain numbers without verification context should not be extracted.""" text = "The meeting is at 1234 Main Street, room 5678." codes = extract_verification_codes(text) # These are not verification codes assert not any(c.code == "1234" for c in codes) assert not any(c.code == "5678" for c in codes) class TestActionLinkExtraction: def test_verify_link_from_text(self): text = "Click here to verify your email: https://example.com/verify?token=abc123" links = extract_action_links(text) assert len(links) >= 1 verify_link = next((l for l in links if l.link_type == "verify"), None) assert verify_link is not None assert "example.com" in verify_link.url def test_reset_link_from_html(self): html = """ <html> <body> <p>Click below to reset your password:</p> <a href="https://example.com/reset-password?token=xyz">Reset Password</a> </body> </html> """ links = extract_action_links("", html) reset_link = next((l for l in links if l.link_type == "reset"), None) assert reset_link is not None assert reset_link.text == "Reset Password" def test_unsubscribe_link(self): text = "To stop receiving emails, visit https://example.com/unsubscribe" links = extract_action_links(text) unsub_link = next((l for l in links if l.link_type == "unsubscribe"), None) assert unsub_link is not None def test_multiple_links(self): html = """ <a href="https://example.com/verify">Verify Email</a> <a href="https://example.com/reset">Reset Password</a> <a href="https://example.com/home">Visit Homepage</a> """ links = extract_action_links("", html) # Should have verify and reset links (not homepage - it's not an action) action_types = [l.link_type for l in links] assert "verify" in action_types or "reset" in action_types def test_empty_content(self): links = extract_action_links("") # May have some links found, but no action links classified assert all(l.link_type == "unknown" for l in links) or len(links) == 0 class TestExtractFromEmail: def test_full_extraction(self): subject = "Your verification code" body_text = """ Your verification code is 123456. Click here to verify: https://example.com/verify """ body_html = """ <a href="https://example.com/verify">Verify Now</a> """ result = extract_from_email(subject, body_text, body_html) assert "codes" in result assert "links" in result assert len(result["codes"]) >= 1 assert len(result["links"]) >= 1 def test_chinese_email(self): subject = "您的验证码" body_text = "您的验证码是:666888,请在10分钟内使用。" result = extract_from_email(subject, body_text) assert len(result["codes"]) >= 1 assert any(c.code == "666888" for c in result["codes"]) class TestVerificationCodeDataclass: def test_code_creation(self): code = VerificationCode(code="123456", context="GitHub") assert code.code == "123456" assert code.context == "GitHub" def test_optional_context(self): code = VerificationCode(code="123456") assert code.context is None class TestActionLinkDataclass: def test_link_creation(self): link = ActionLink( url="https://example.com/verify", link_type="verify", domain="example.com", text="Verify Email" ) assert link.url == "https://example.com/verify" assert link.link_type == "verify" assert link.domain == "example.com" assert link.text == "Verify Email" FILE:tests/test_gmail.py """Tests for Gmail adapter.""" import pytest from datetime import datetime from pathlib import Path from unittest.mock import Mock, patch, MagicMock from email_bridge.models import Account, EmailProvider from email_bridge.adapters.gmail import ( GmailAdapter, GmailAdapterError, GmailCredentialsNotFoundError, GmailAuthError, ) from email_bridge.adapters.base import FetchOptions, RawMessage class TestGmailAdapter: def test_provider_property(self): adapter = GmailAdapter() assert adapter.provider == EmailProvider.GMAIL def test_default_credentials_dir(self): adapter = GmailAdapter() expected = Path.home() / ".email-bridge" / "gmail" assert adapter.credentials_dir == expected def test_custom_credentials_dir(self): custom_dir = Path("/tmp/test-gmail") adapter = GmailAdapter(credentials_dir=custom_dir) assert adapter.credentials_dir == custom_dir def test_get_credentials_path_from_config(self): adapter = GmailAdapter() account = Account( id="test1", email="[email protected]", provider=EmailProvider.GMAIL, config={"credentials_path": "/custom/creds.json"} ) path = adapter._get_credentials_path(account) assert path == Path("/custom/creds.json") def test_get_credentials_path_default(self): adapter = GmailAdapter() account = Account( id="test1", email="[email protected]", provider=EmailProvider.GMAIL, ) path = adapter._get_credentials_path(account) assert path == adapter.credentials_dir / "credentials.json" def test_get_token_path_from_config(self): adapter = GmailAdapter() account = Account( id="test1", email="[email protected]", provider=EmailProvider.GMAIL, config={"token_path": "/custom/token.json"} ) path = adapter._get_token_path(account) assert path == Path("/custom/token.json") def test_get_token_path_default(self): adapter = GmailAdapter() account = Account( id="test1", email="[email protected]", provider=EmailProvider.GMAIL, ) path = adapter._get_token_path(account) # Email should be sanitized for filename assert "test_user_at_gmail_com" in str(path) def test_credentials_not_found_error(self): adapter = GmailAdapter(credentials_dir=Path("/nonexistent")) account = Account( id="test1", email="[email protected]", provider=EmailProvider.GMAIL, ) with pytest.raises(GmailCredentialsNotFoundError): adapter._get_credentials(account) def test_parse_message(self): adapter = GmailAdapter() # Mock Gmail API message data msg_data = { "id": "msg123", "threadId": "thread123", "labelIds": ["INBOX"], "snippet": "This is a test...", "payload": { "headers": [ {"name": "Subject", "value": "Test Subject"}, {"name": "From", "value": "Sender <[email protected]>"}, {"name": "To", "value": "[email protected]"}, {"name": "Date", "value": "Mon, 20 Mar 2026 10:00:00 +0000"}, ], "mimeType": "text/plain", "body": { "data": "VGhpcyBpcyB0ZXN0IGJvZHk=" # base64 encoded "This is test body" } } } raw_msg = adapter._parse_message(msg_data) assert raw_msg.message_id == "msg123" assert raw_msg.subject == "Test Subject" assert raw_msg.sender == "[email protected]" assert raw_msg.sender_name == "Sender" assert raw_msg.is_read is True # No UNREAD label assert raw_msg.body_text == "This is test body" def test_parse_message_unread(self): adapter = GmailAdapter() msg_data = { "id": "msg123", "threadId": "thread123", "labelIds": ["INBOX", "UNREAD"], "payload": { "headers": [ {"name": "Subject", "value": "Test"}, {"name": "From", "value": "[email protected]"}, {"name": "Date", "value": "Mon, 20 Mar 2026 10:00:00 +0000"}, ], } } raw_msg = adapter._parse_message(msg_data) assert raw_msg.is_read is False def test_parse_message_with_name_parsing(self): adapter = GmailAdapter() msg_data = { "id": "msg123", "threadId": "thread123", "payload": { "headers": [ {"name": "Subject", "value": "Test"}, {"name": "From", "value": '"John Doe" <[email protected]>'}, {"name": "Date", "value": "Mon, 20 Mar 2026 10:00:00 +0000"}, ], } } raw_msg = adapter._parse_message(msg_data) assert raw_msg.sender_name == "John Doe" assert raw_msg.sender == "[email protected]" def test_sync_config_from_account(self): adapter = GmailAdapter() # Test that account config is used for sync parameters account = Account( id="test1", email="[email protected]", provider=EmailProvider.GMAIL, config={ "sync_days": 3, "sync_max_messages": 50 } ) # We can't test actual fetch without mock service, but we can verify # the config is accessible assert account.config.get("sync_days") == 3 assert account.config.get("sync_max_messages") == 50 class TestGmailAdapterErrors: def test_error_hierarchy(self): # Test that errors are properly inherited assert issubclass(GmailCredentialsNotFoundError, GmailAdapterError) assert issubclass(GmailAuthError, GmailAdapterError) assert issubclass(GmailAdapterError, Exception) def test_credentials_error_message(self): error = GmailCredentialsNotFoundError("Credentials not found") assert "Credentials not found" in str(error) def test_auth_error_message(self): error = GmailAuthError("Auth failed") assert "Auth failed" in str(error)
让 OpenClaw 能够操作 Revit。当用户提及 Revit 相关操作(图纸、标注、视图、元素等)时自动调用。 支持的操作包括: - 检查 Revit 状态 - 列出可用工具 - 执行 Revit 命令(生成图纸、创建标注、查询元素等)
---
name: copilot-for-revit
description: |
让 OpenClaw 能够操作 Revit。当用户提及 Revit 相关操作(图纸、标注、视图、元素等)时自动调用。
支持的操作包括:
- 检查 Revit 状态
- 列出可用工具
- 执行 Revit 命令(生成图纸、创建标注、查询元素等)
env:
required:
- name: REVIT_MCP_URL
description: Revit MCP 服务地址(格式:http://<WINDOWS_IP>:18181)
default: "http://localhost:18181"
- name: OPENCLAW_BRIDGE_DIR
description: openclaw-bridge 仓库路径
default: "~/repos/openclaw-bridge"
config:
- path: REVIT_MCP_URL
description: Revit MCP 服务地址
required: true
- path: OPENCLAW_BRIDGE_DIR
description: openclaw-bridge 仓库路径
required: false
security:
permissions:
- network-access # 需要访问远程 Revit MCP 服务
warnings:
- "此 skill 可以执行修改 Revit 项目的命令(如删除元素、修改参数等),请在信任的环境中使用"
- "默认无命令执行确认,建议在测试项目中验证后再用于生产环境"
---
# Copilot for Revit Skill
自动检测 Revit 相关意图,通过 `openclaw-bridge` 调用远程 Revit MCP 服务。
## ⚠️ 安全警告
**此 skill 可以执行修改 Revit 项目的命令**(如删除元素、修改参数、生成图纸等)。
- 请确保在**信任的网络环境**中使用
- 建议先在**测试项目**中验证
- 如需命令执行确认,可在 OpenClaw 配置中启用命令确认机制
## 前置条件
1. **Windows 端**已配置好 Copilot for Revit(参考 [快速开始指南](https://github.com/ryanchan720/copilot-for-revit/blob/main/QUICKSTART.md))
2. **Linux 端**已安装 openclaw-bridge(`git clone https://github.com/ryanchan720/openclaw-bridge`)
3. 网络互通:Linux 能访问 Windows 的 18181 端口
## 配置
### 环境变量
| 变量 | 说明 | 默认值 |
|------|------|--------|
| `REVIT_MCP_URL` | Revit MCP 服务地址 | `http://localhost:18181` |
| `OPENCLAW_BRIDGE_DIR` | openclaw-bridge 仓库路径 | `~/repos/openclaw-bridge` |
在 `~/.bashrc` 或 `~/.zshrc` 中配置:
```bash
export REVIT_MCP_URL="http://192.168.1.100:18181"
export OPENCLAW_BRIDGE_DIR="$HOME/repos/openclaw-bridge"
```
### 验证
```bash
# 测试连通性
curl $REVIT_MCP_URL/sse
# 或使用 bridge CLI
cd $OPENCLAW_BRIDGE_DIR
uv run python -m openclaw_bridge.cli health
```
## 使用方式
### 自动检测(推荐)
直接在聊天中提及 Revit 相关操作:
```
用户: Revit 在线吗?
OpenClaw: [自动调用 health 检查]
用户: 帮我看看 Revit 里有哪些可用命令
OpenClaw: [自动调用 tools list]
用户: 帮我在当前视图里创建梁标记
OpenClaw: [自动匹配 TagBeamCommand 并调用]
```
### 显式调用
明确指定要使用的工具:
```
用户: 用 GetEnvInfoCommand 查看一下环境信息
OpenClaw: [调用指定工具]
```
## 触发关键词
当用户消息包含以下关键词时自动激活:
- Revit
- 图纸 / 标注 / 视图 / 元素
- 墙 / 门 / 窗 / 梁 / 柱
- 标高 / 房间
- 以及从 `tools list` 动态获取的所有工具名称
## 错误处理
| 错误 | 提示 |
|------|------|
| Revit 不在线 | 提示用户启动 Revit 或检查网络 |
| Revit is not ready | 提示用户打开项目文件 |
| 工具不存在 | 列出可用工具供用户选择 |
| 参数缺失 | 向用户询问缺失的参数 |
## 注意事项
1. **Revit 必须打开项目**:大多数命令需要在打开 `.rvt` 文件后才能执行
2. **同步执行**:当前为同步调用,长耗时命令可能需要等待
3. **网络依赖**:需要 Linux 主机能访问 Windows 的 18181 端口
## 相关链接
- [Copilot for Revit](https://github.com/ryanchan720/copilot-for-revit) - 主框架
- [openclaw-bridge](https://github.com/ryanchan720/openclaw-bridge) - 桥接器
- [通用命令插件](https://github.com/ryanchan720/general-copilot-addins-for-revit) - 开箱即用的命令
FILE:README.md
# Copilot for Revit Skill
OpenClaw skill,让用户能在飞书、Telegram 等聊天工具中用自然语言操作 Revit。
## 安装
### 方式一:从 ClawHub 安装(推荐)
```bash
openclaw skill install copilot-for-revit
```
### 方式二:从 GitHub 安装
```bash
git clone https://github.com/ryanchan720/copilot-for-revit-skill.git
cp -r copilot-for-revit-skill ~/.openclaw/workspace/skills/copilot-for-revit
```
## 前置条件
1. **Windows 端**已配置好 Copilot for Revit
2. **Linux 端**已安装 openclaw-bridge
3. 网络互通
详细配置请参考 [快速开始指南](https://github.com/ryanchan720/copilot-for-revit/blob/main/QUICKSTART.md)。
## 配置
在 `~/.bashrc` 或 `~/.zshrc` 中添加:
```bash
export REVIT_MCP_URL="http://<WINDOWS_IP>:18181"
export OPENCLAW_BRIDGE_DIR="$HOME/repos/openclaw-bridge"
```
## 使用示例
```
你: Revit 在线吗?
OpenClaw: 在线,版本 1.0.0,协议 2024-11-05
你: 帮我看看当前项目有哪些门
OpenClaw: 找到 15 种门类型...
你: 把所有门的高度改成 2100
OpenClaw: 已更新 23 扇门的高度参数
```
## 文件结构
```
copilot-for-revit/
├── SKILL.md # Skill 定义
├── README.md # 本文件
└── scripts/
└── revit_call.py # Bridge 调用脚本
```
## 许可证
MIT License
FILE:scripts/revit_call.py
#!/usr/bin/env python3
"""
OpenClaw Bridge Caller
调用 openclaw-bridge CLI 来操作 Revit
"""
import json
import os
import subprocess
import sys
from pathlib import Path
from typing import Any
# 配置 - 从环境变量读取,支持自定义
REVIT_MCP_URL = os.environ.get("REVIT_MCP_URL", "http://localhost:18181")
BRIDGE_DIR = Path(os.environ.get("OPENCLAW_BRIDGE_DIR", Path.home() / "repos" / "openclaw-bridge"))
def run_bridge_command(command: str, args: dict | None = None) -> dict:
"""
运行 bridge CLI 命令
Args:
command: 命令类型,如 "health", "tools list", "tools call <name>"
args: 工具参数(仅用于 tools call)
Returns:
dict: 命令结果
"""
# 构建 uv run 命令
cmd_parts = [
"uv", "run",
"--with", "httpx",
"--with", "click",
"python", "-m", "openclaw_bridge.cli"
]
# 添加子命令
cmd_parts.extend(command.split())
# 如果是 tools call,添加参数
if "tools call" in command and args:
cmd_parts.extend(["--args", json.dumps(args)])
try:
result = subprocess.run(
cmd_parts,
cwd=BRIDGE_DIR,
capture_output=True,
text=True,
timeout=60,
env={**subprocess.os.environ, "REVIT_MCP_URL": REVIT_MCP_URL}
)
# 尝试解析 JSON 输出(可能跨多行)
stdout = result.stdout.strip()
# 找到第一个 { 和最后一个 }
start_idx = stdout.find('{')
end_idx = stdout.rfind('}')
if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
json_str = stdout[start_idx:end_idx + 1]
try:
return json.loads(json_str)
except json.JSONDecodeError as e:
return {
"error": True,
"message": f"Failed to parse JSON: {e}",
"raw": json_str
}
else:
return {
"error": True,
"message": f"No JSON object found in output: {stdout}",
"stderr": result.stderr
}
except subprocess.TimeoutExpired:
return {"error": True, "message": "Command timed out"}
except Exception as e:
return {"error": True, "message": str(e)}
def health() -> dict:
"""检查 Revit 状态"""
return run_bridge_command("health")
def tools_list() -> dict:
"""列出所有可用工具"""
return run_bridge_command("tools list")
def tools_call(name: str, arguments: dict | None = None) -> dict:
"""调用指定工具"""
return run_bridge_command(f"tools call {name}", arguments)
def main():
"""CLI 入口"""
if len(sys.argv) < 2:
print("Usage: revit_call.py <command> [args]")
print("Commands: health, tools-list, tools-call")
sys.exit(1)
command = sys.argv[1]
if command == "health":
result = health()
elif command == "tools-list":
result = tools_list()
elif command == "tools-call":
if len(sys.argv) < 3:
print("Usage: revit_call.py tools-call <tool-name> [json-args]")
sys.exit(1)
tool_name = sys.argv[2]
tool_args = json.loads(sys.argv[3]) if len(sys.argv) > 3 else {}
result = tools_call(tool_name, tool_args)
else:
print(f"Unknown command: {command}")
sys.exit(1)
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()