@clawhub-70asunflower-36d042a325
Optimizes Markdown-formatted messages for IM platforms (WeChat, DingTalk, etc.) by removing unsupported markup and using clean text layouts.
---
name: message-friendly
description: Optimizes Markdown-formatted messages for IM platforms (WeChat, DingTalk, etc.) by removing unsupported markup and using clean text layouts.
---
# Role: message-friendly (IM Formatter)
## Profile
- **Author**: 70asunflower
- **Version**: 1.0
- **Language**: English (Prompt) / Match User's Original Language (Output)
- **Description**: An expert in converting complex Markdown generated by Large Language Models into a highly readable, mobile-friendly plain text layout tailored for IM apps. It removes all unsupported markup symbols while preserving the readability and structure of the information.
## Goals
1. **Format Downgrading**: Completely downgrade standard Markdown syntax to a pure plain text format.
2. **Visual Reshaping**: Replace font-size-based hierarchy with emoji/symbol-based hierarchy.
3. **Information Integrity**: Preserve the original meaning and core information without losing structural readability.
4. **Platform Adaptation**: Ensure the output is ready to be sent in IMs (like WeChat), specifically handling links, images, task lists, and long texts properly.
## Skills
1. **Granular Symbol Mapping**: Translating specific Markdown symbols (`#`, `**`, `- [ ]`, `[link]`) into clean, IM-friendly text equivalents.
2. **Whitespace Formatting**: Managing blank lines and indentation to simulate layout without actual UI elements.
3. **2D Data Serialization**: Restructuring multi-column tables into concise list views.
## Rules (Strictly Enforced)
1. 🛑 **NO NATIVE MARKDOWN**: NEVER output raw Markdown like `#`, `**`, `*`, `~~`, `>`, ` ``` `, `|---|`, `![image]()`, `[link]()`.
2. 💨 **SMART WHITESPACE**: Use blank lines to separate logical blocks. Collapse consecutive empty lines into a single blank line to avoid spamming the screen.
3. 📉 **NO TABLES**: Rewrite tables as compact key-value lists (e.g., `- A: B`).
4. ✂️ **CONCISENESS**: Keep the output concise. If the text is extremely long, intelligently summarize or split it logically with clear indicators.
## Workflow
1. **Analyze Content**: Identify all Markdown elements that require conversion.
2. **Apply Specific Mapping Rules**:
- **[Headings]** (`#`, `##`): Convert to bracketed emphasis, optionally with a newline. (e.g., `# Weekly Report` ➔ `【Weekly Report】`).
- **[Emphasis]** (`**`, `*`, `~~`): Convert to brackets or parentheses. (e.g., `**Important**` ➔ `【Important】`, `~~obsolete~~` ➔ `(obsolete)`).
- **[Lists]**: Keep `-` or `•` for unordered lists. Keep `1.` for ordered lists. Use spaces to indent sub-items effectively.
- **[Task Lists]**: Convert `- [ ]` to `☐`, and `- [x]` to `✅`.
- **[Blockquotes]** (`>`): Replace with `💬` or double quotes. (e.g., `> Quote` ➔ `💬 Quote`).
- **[Code Blocks]**: Replace triple backticks with explicit text markers. (e.g., `[code js]` ... `[/code]`). Preserve original code indentation.
- **[Inline Code]**: Wrap in corner brackets or parentheses. (e.g., `` `variable` `` ➔ `「variable」`).
- **[Tables]**: Convert horizontally wide tables into row-by-row descriptions. (e.g., `| A | B |` ➔ `- A: B`).
- **[Dividers]** (`---`): Replace with a text-based line: `──────────`.
- **[Links]** (`[text](url)`): Extract the URL and place it clearly. (e.g., `[OpenClaw](https://openclaw.ai)` ➔ `OpenClaw: <https://openclaw.ai>`).
- **[Images]** (`![alt](url)`): Replace with an image placeholder. (e.g., `![logo](url)` ➔ `[Image: logo]`).
3. **Additional Enhancements**:
- **Intelligent Emojis**: Add appropriate emojis (`📌`, `✅`, `⚠️`, `📊`) to improve aesthetic readability.
- **Time Formatting**: Format dates/times cleanly (e.g., `MM-DD HH:mm`) where appropriate.
4. **Final Check**: Ensure the result is clean, readable plain text that matches the user's original language.
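
The mapping rules above can be sketched in Python. This is a minimal, regex-based illustration rather than the skill's actual implementation: the function name `to_im_text` is hypothetical, and it only covers single-line constructs (headings, bold, strikethrough, task lists, blockquotes, links, images, dividers, inline code). Tables, fenced code blocks, and single-asterisk italics are left out.

```python
import re


def to_im_text(markdown: str) -> str:
    """Downgrade a few common Markdown constructs to IM-friendly plain text."""
    out = []
    for line in markdown.split("\n"):
        if re.fullmatch(r"-{3,}", line.strip()):          # divider
            out.append("──────────")
            continue
        m = re.match(r"^#{1,6}\s+(.*)$", line)            # heading
        if m:
            out.append(f"【{m.group(1).strip()}】")
            continue
        line = re.sub(r"^(\s*)- \[ \]\s*", r"\1☐ ", line)       # open task
        line = re.sub(r"^(\s*)- \[[xX]\]\s*", r"\1✅ ", line)    # finished task
        line = re.sub(r"^>\s?", "💬 ", line)                     # blockquote
        # Images must be handled before links, since they share the [..](..) shape
        line = re.sub(r"!\[([^\]]*)\]\([^)]*\)", r"[Image: \1]", line)
        line = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"\1: <\2>", line)
        line = re.sub(r"\*\*(.+?)\*\*", r"【\1】", line)         # bold
        line = re.sub(r"~~(.+?)~~", r"(\1)", line)               # strikethrough
        line = re.sub(r"`([^`]+)`", r"「\1」", line)             # inline code
        out.append(line)
    # Collapse runs of blank lines into a single blank line
    return re.sub(r"\n{3,}", "\n\n", "\n".join(out))
```

For example, `to_im_text("# Weekly Report")` yields `【Weekly Report】`, matching the heading rule in the workflow.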
## Initialization
As the IM Friendly Formatter, I am ready to optimize your message for IM platforms. Please provide the Markdown text!
FILE:_meta.json
{
"ownerId": "kn70pywhg0fyz996kpa8xj89s57yhv26",
"slug": "message-friendly",
"version": "1.0.1",
"publishedAt": 1773975528000,
"name": "message-friendly",
"description": "Optimizes Markdown-formatted messages for IM platforms",
"trigger": "auto",
"priority": 10,
"filter": {
"channel": ["wechat", "dingtalk", "im"],
"response_contains": ["# ", "## ", "### ", "**", "*", "```", "|", "- [ ]", "> "]
},
"action": "optimize_markdown_for_im"
}
Sync IM messages to Notion via Notion API. Supports 7 content types, 4 formats, 2 metadata types. Append-only to a single Notion page.
---
name: notion-im-helper
description: Sync IM messages to Notion via Notion API. Supports 7 content types, 4 formats, 2 metadata types. Append-only to a single Notion page.
---
# Notion IM Helper
Automatically sync message content to Notion. Supports 7 types: diary, note, todo, idea, question, link, and quote.
## Environment Variables
- `NOTION_API_KEY` - Notion Integration Token
- `NOTION_PARENT_PAGE_ID` - Target Notion Page ID (32 chars)
- `NOTION_QUOTES_PAGE_ID` (optional) - Separate page for quotes
## Setup
1. `pip install notion-client` (uploading local images additionally requires `pip install notion-upload`)
2. Set env vars: `NOTION_API_KEY` and `NOTION_PARENT_PAGE_ID`
3. Authorize integration on Notion page (··· > Connect to)
## Usage
When the user sends a message matching a trigger pattern, execute the corresponding script:
```bash
python scripts/record.py record --type {type} "{content}"
python scripts/record.py heading --level {1|2|3} "{text}"
python scripts/record.py divider
python scripts/record.py list --kind {bullet|number} "{items}"
python scripts/record.py toggle "{json}"
python scripts/record.py image [--caption "text"] "{file_path_or_url}"
python scripts/record.py undo
python scripts/check_config.py
python scripts/summary.py {monthly|quote}
```
## Trigger Rules
**Content types** (prefix → type):
- `日记:` / `今天:` / `riji:` / `d` → diary
- `笔记:` / `学习:` / `note:` / `n` → note
- `待办:` / `todo:` / `t` → todo
- `done:` / `完成:` / `√ ` → done
- `想法:` / `灵感:` / `idea:` / `flash:` / `闪念:` / `i` → idea
- `问题:` / `疑问:` / `q:` → question
- `摘抄:` / `quote:` / `qu:` / `z` → quote
- `链接:` / `link:` / `url:` / `l` → link
- `图片:` / `photo:` / `img:` / `p` → image
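
As an illustration of the prefix table above, the following sketch maps a message to a content type. The helper `classify_prefix` and the tables `PREFIXES` and `SHORTCUTS` are names introduced here, not part of the shipped scripts, and the sketch assumes that single-letter shortcuts must be followed by a space and that matching is case-insensitive.

```python
from typing import Optional, Tuple

# Prefix -> content type, copied from the trigger table above.
PREFIXES = {
    "日记:": "diary", "今天:": "diary", "riji:": "diary",
    "笔记:": "note", "学习:": "note", "note:": "note",
    "待办:": "todo", "todo:": "todo",
    "done:": "done", "完成:": "done", "√ ": "done",
    "想法:": "idea", "灵感:": "idea", "idea:": "idea",
    "flash:": "idea", "闪念:": "idea",
    "问题:": "question", "疑问:": "question", "q:": "question",
    "摘抄:": "quote", "quote:": "quote", "qu:": "quote",
    "链接:": "link", "link:": "link", "url:": "link",
    "图片:": "image", "photo:": "image", "img:": "image",
}
# Single-letter shortcuts (assumed to require a trailing space).
SHORTCUTS = {"d": "diary", "n": "note", "t": "todo", "i": "idea",
             "z": "quote", "l": "link", "p": "image"}


def classify_prefix(message: str) -> Optional[Tuple[str, str]]:
    """Return (type, content) if the message starts with a known prefix."""
    text = message.lstrip()
    lowered = text.lower()
    # Longest prefixes first, in case two prefixes ever overlap
    for prefix in sorted(PREFIXES, key=len, reverse=True):
        if lowered.startswith(prefix):
            return PREFIXES[prefix], text[len(prefix):].strip()
    if len(text) > 2 and text[1] == " " and text[0].lower() in SHORTCUTS:
        return SHORTCUTS[text[0].lower()], text[2:].strip()
    return None
```

A message with no recognised prefix returns `None`, at which point the smart-detection rules would apply.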
**Formats:**
- `* text` → H1 heading
- `** text` → H2 heading
- `*** text` → H3 heading
- `> text` → quote block
- `---` → divider
- `- text` → bulleted list
- `1. text` / `2. text` etc → numbered list
- `toggle: title` + subsequent `-` / `--` / `---` lines → toggle block
**Commands:**
- `月报` / `monthly` → extract current month records for summary
- `摘抄` / `随机摘抄` → random historical entry
- `搜: xxx` / `search: xxx` → search records by keyword
- `撤回` / `undo` → delete last batch of blocks (within 5 min window)
- `配置检查` / `check config` → verify config
**Smart detection** (no prefix, AI infers):
- Pure URL → link
- Starts with YYYY-MM-DD → diary
- Contains `[ ]` or `【 】` → todo
- Default → idea
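
The fallback rules above are meant to be inferred by the agent, but a rule-based approximation is easy to sketch. The function name `infer_type` is hypothetical, and the regular expressions are one possible reading of "pure URL" and "starts with YYYY-MM-DD".

```python
import re


def infer_type(message: str) -> str:
    """Fallback type inference for messages with no recognised prefix."""
    text = message.strip()
    if re.fullmatch(r"https?://\S+", text):      # the whole message is one URL
        return "link"
    if re.match(r"\d{4}-\d{2}-\d{2}", text):     # starts with YYYY-MM-DD
        return "diary"
    if "[ ]" in text or "【 】" in text:          # empty checkbox marker
        return "todo"
    return "idea"
```

Rule order matters here: a URL check runs first, so a bare link is never misread as an idea.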
## Metadata
Scan the LAST line for metadata:
- `#keyword` → tag
- `/p:project-name` → project
- Remove metadata from content before passing to script
## Batch & Undo
- Multi-line messages: each format line (heading/quote/divider/list) becomes a separate block, sent in a single API call
- Undo within 5 minutes: deletes all blocks from the last batch
- Undo after 5 minutes: deletes only the last single block
- Day separator: a divider is auto-inserted when the last record is from a different day
## Output Protocol
Scripts emit standardized output prefixes:
- `OK|message` → success, relay success message to user
- `ERROR|CONFIG` → guide user to set up Notion integration
- `ERROR|AUTH` → invalid API key or page not authorized
- `ERROR|RATE_LIMIT` → tell user to wait
- `ERROR|NETWORK` → tell user to retry later
Always run `check_config.py` before the first use. Never modify or delete existing Notion blocks, except through the `undo` command.
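
A caller consuming this protocol could parse script output along the following lines. This is a sketch under assumptions: `ScriptResult` and `parse_script_output` are hypothetical names, and the choice to let any `ERROR|` line override a later `OK|` line is a design decision made here, not something the scripts specify.

```python
from typing import NamedTuple


class ScriptResult(NamedTuple):
    ok: bool
    detail: str  # success message, or the error code after "ERROR|"


def parse_script_output(raw: str) -> ScriptResult:
    """Reduce a script's stdout to a single (ok, detail) result."""
    lines = [ln.strip() for ln in raw.splitlines() if ln.strip()]
    # Any error line wins, even if a success line was printed as well
    for ln in lines:
        status, _, detail = ln.partition("|")
        if status == "ERROR":
            return ScriptResult(False, detail.strip() or "UNKNOWN")
    # Otherwise report the most recent success line
    for ln in reversed(lines):
        status, _, detail = ln.partition("|")
        if status == "OK":
            return ScriptResult(True, detail.strip())
    return ScriptResult(False, "UNKNOWN")
```

The `detail` of a failed result can then be looked up against the error codes listed above to choose the reply shown to the user.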
## Image Upload
- Supports **local file paths** (e.g., `C:\Users\photos\img.jpg`) and **HTTP URLs** (e.g., `https://example.com/photo.png`)
- Local files are uploaded to Notion servers via the File Upload API, then attached as image blocks
- URL images are referenced directly as external image blocks
- Optional `--caption` flag to add caption text to the image
- Max file size: 5MB (Notion API limit)
- Supported formats: jpg, jpeg, png, gif, webp, bmp, svg
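
The size and format constraints above could be checked before attempting an upload. The sketch below is illustrative only: `validate_local_image` is a hypothetical helper, and it assumes "5MB" means 5 × 1024 × 1024 bytes.

```python
import os

MAX_IMAGE_BYTES = 5 * 1024 * 1024  # 5MB limit from the list above (binary MB assumed)
SUPPORTED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}


def validate_local_image(path: str) -> str:
    """Return "" if the file looks uploadable, otherwise a short reason."""
    ext = os.path.splitext(path)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        return f"unsupported format: {ext or '(none)'}"
    if not os.path.isfile(path):
        return "file not found"
    if os.path.getsize(path) > MAX_IMAGE_BYTES:
        return "file larger than 5MB"
    return ""
```

Checking the extension first keeps the function usable on paths that do not exist yet, since no file access is needed to reject an unsupported format.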
FILE:.gitignore
__pycache__/
*.pyc
.pending_batch.json
FILE:CLAUDE.md
# Notion IM Helper — Claude Agent Definition
When the user sends a message, check the message against the trigger rules below. If it matches, execute the corresponding script and return the result.
## Environment Variables
```env
NOTION_API_KEY
NOTION_PARENT_PAGE_ID
```
## Content Type Triggers
Check the user message against these patterns:
### Prefix Patterns (check first)
- `日记:` or `今天:` or starts with `riji:` → `diary`
- `笔记:` or `学习:` or starts with `note:` → `note`
- `待办:` or starts with `todo:` → `todo`
- starts with `done:` or `完成:` or starts with `√ ` → `done`
- `想法:` or `灵感:` or starts with `idea:` → `idea`
- `问题:` or `疑问:` or starts with `q:` → `question`
- `摘抄:` or starts with `quote:` or starts with `qu:` → `quote`
- starts with `链接:` or `link:` or `url:` → `link`
- `图片:` or `photo:` or `img:` → `image`
### Shortcut Keys (single letter prefix followed by space)
- `d ` at start → `diary`
- `n ` at start → `note`
- `t ` at start → `todo`
- `√ ` at start → `done`
- `i ` at start → `idea`
- `q ` at start → `question`
- `z ` at start → `quote`
- `l ` at start → `link`
- `p ` at start → `image`
### Command Patterns (match entire line)
- `月报` / `monthly` → extract current month records for agent to summarize
- `摘抄` / `随机摘抄` → random quote
- `搜: xxx` / `search: xxx` → search (pass xxx as argument to scripts/search_notes.py)
- `撤回` / `undo` → delete last block batch (within 5 min window)
- `配置检查` / `check config` → verify config
### Format Patterns
- Line starts with `* text` → heading H1
- Line starts with `** text` → heading H2
- Line starts with `*** text` → heading H3
- Line starts with `> text` → quote block
- Line is exactly `---` → divider
- Line starts with `- text` → bulleted list item
- Line starts with `1. text` / `2. text` etc → numbered list item
- Line starts with `toggle: title` → toggle block (parse subsequent `-` / `--` / `---` lines as children)
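
The toggle pattern is the only multi-line format, so a small sketch may help. It assumes the number of leading dashes (1 to 3) gives the nesting depth; `parse_toggle` and the `text`/`children` keys are hypothetical, and the resulting tree would still need converting to Notion block objects before being passed to `record.py toggle`.

```python
import re


def parse_toggle(lines):
    """Turn `toggle: title` plus dash-prefixed lines into a nested dict."""
    if not lines or not lines[0].lower().startswith("toggle:"):
        return None
    root = {"title": lines[0][len("toggle:"):].strip(), "children": []}
    # stack[d] is the child list that items of depth d + 1 are appended to
    stack = [root["children"]]
    for line in lines[1:]:
        m = re.match(r"^(-{1,3})\s+(.*)$", line)
        if not m:
            break  # the first non-child line ends the toggle
        # Clamp the depth so an item cannot skip a level
        depth = min(len(m.group(1)), len(stack))
        node = {"text": m.group(2).strip(), "children": []}
        del stack[depth:]          # forget lists deeper than this item
        stack[depth - 1].append(node)
        stack.append(node["children"])
    return root
```

A bare `---` line has no text after the dashes, so it does not match the child pattern and ends the toggle, leaving it free to be treated as a divider.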
### Smart Detection (no prefix matched → AI infers)
- If line is a pure URL (starts with http:// or https://) → link
- If line is a local file path pointing to an image file (e.g., `C:\Users\...\photo.jpg`) → image
- If line starts with YYYY-MM-DD or `今天` → diary
- If line contains `[ ]` or `【 】` → todo
- Otherwise → idea
## Multi-Line Processing
If the user sends a multi-line message:
1. Parse each line independently
2. First check for format patterns (heading, quote, divider, list, toggle)
3. Then check for content type prefixes
4. Group consecutive lines of the same type or format
5. Execute all resulting blocks in a single API call
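
Step 4 (grouping consecutive lines of the same kind) can be sketched with `itertools.groupby`. The helpers `line_kind` and `group_lines` are hypothetical, and the classifier below only covers the format patterns, treating everything else as plain content.

```python
import re
from itertools import groupby


def line_kind(line: str) -> str:
    """Rough classifier for the format patterns listed earlier."""
    if line.strip() == "---":
        return "divider"
    # Check the longest heading marker first
    for marker, kind in (("*** ", "heading_3"), ("** ", "heading_2"),
                         ("* ", "heading_1"), ("> ", "quote"), ("- ", "bullet")):
        if line.startswith(marker):
            return kind
    if re.match(r"\d+\.\s+", line):
        return "numbered"
    return "content"


def group_lines(message: str):
    """Group consecutive non-blank lines of the same kind, preserving order."""
    lines = [ln for ln in message.split("\n") if ln.strip()]
    return [(kind, list(run)) for kind, run in groupby(lines, key=line_kind)]
```

Each resulting group can then be turned into one or more blocks, and all blocks sent together in a single append call.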
## Metadata Extraction
After parsing type/format, scan the LAST line for metadata:
- `#keyword` → tag
- `/p:project-name` → project
- Remove metadata from content before passing to script
## Execution
For each recognized block:
1. First run `check_config.py` to verify Notion connection
2. Build the appropriate script command
3. Execute and capture output
4. If output starts with `OK|`, display the success message
5. If output starts with `ERROR|`, display appropriate error message
## Output Protocol
Scripts emit standardized prefixes. Never modify the raw output — relay the message part after `|`:
- `OK|已记录到 Notion` → "已记录到 Notion ✅"
- `ERROR|CONFIG` → show configuration guide
- `ERROR|AUTH` → "API Key 或页面权限有问题,检查一下"
- `ERROR|RATE_LIMIT` → "记录太快了,稍等再发~"
- `ERROR|NETWORK` → "网络不太通畅,稍后再试~"
## Safety Rules
- Always verify config before writing
- NEVER modify or delete existing blocks except for `undo` command
- NEVER expose API keys or error stack traces
- Always return friendly messages
- For batch operations (multiple lines), execute a single append call
FILE:config.yaml
---
name: notion-im-helper
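description: "Append records to Notion from IM. Supports 7 content types (diary, note, todo, idea, question, link, quote), 4 formats (heading, quote, divider, list), and 2 metadata types (tag, project). Triggered when a message starts with a listed prefix or expresses an intent to record. Append-only, never deletes; safe and reliable."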
metadata:
openclaw:
emoji: 📝
requires:
bins:
- python3
env:
- NOTION_API_KEY
- NOTION_PARENT_PAGE_ID
optional:
- NOTION_QUOTES_PAGE_ID
install:
- id: python-deps
kind: note
label: "pip install notion-client"
primaryEnv: NOTION_API_KEY
trigger:
keywords:
- 日记
- 笔记
- 待办
- todo
- 想法
- 灵感
- idea
- 问题
- 疑问
- 摘抄
- quote
- 链接
- link
- 图片
- photo
- img
- flash
- 闪念
- 月报
- monthly
- 撤回
- undo
- 搜索
- search
- notion
- riji
prefixPatterns:
- "^diary:"
- "^日记:"
- "^今天:"
- "^笔记:"
- "^学习:"
- "^note:"
- "^待办:"
- "^todo:"
- "^想法:"
- "^灵感:"
- "^idea:"
- "^问题:"
- "^疑问:"
- "^question:"
- "^摘抄:"
- "^quote:"
- "^链接:"
- "^link:"
- "^图片:"
- "^photo:"
- "^img:"
- "^flash:"
- "^闪念:"
- "^done:"
- "^完成:"
- "^月报"
- "^monthly"
- "^撤回"
- "^undo"
- "^搜:"
- "^search:"
- "^今日"
shortcutKeys:
d: diary
n: note
t: todo
i: idea
q: question
z: quote
l: link
p: image
---
FILE:_meta.json
{
"ownerId": "kn70pywhg0fyz996kpa8xj89s57yhv26",
"slug": "notion-im-helper",
"version": "1.7.0",
"publishedAt": 1743638400000,
"name": "notion-im-helper",
"description": "Sync content to Notion from IM; supports diary/note/todo/idea/question/link/quote/image, batch undo, and monthly report compilation"
}
FILE:scripts/check_config.py
"""Check Notion configuration on first use."""
import os
import sys
sys.stdout.reconfigure(encoding='utf-8')
sys.path.insert(0, os.path.dirname(__file__))
from notion_client import check_config


def show_guide():
    return """📝 首次使用 Notion 助手,需要 2 分钟做个配置~
1️⃣ 创建 Notion Integration
→ 打开 https://www.notion.so/my-integrations
→ 点击 "Create new integration"
→ 填写名称(如 "notion-im-helper"),选择你的工作空间
→ 提交后复制 Internal Integration Token(以 ntn_ 或 secret_ 开头)
2️⃣ 获取页面 ID
→ 打开你要写入的 Notion 页面
→ 从 URL 中复制最后的 32 位字符
3️⃣ 配置环境变量:`NOTION_API_KEY` 和 `NOTION_PARENT_PAGE_ID`
4️⃣ 授权 Integration 访问页面
→ 打开你的 Notion 页面 → 点右上角 ··· → Connect to → 选择你的 Integration
配置好了发条消息试试:d 测试一下 ✨"""


if __name__ == "__main__":
    result = check_config()
    if not result["ok"]:
        # check_config() reports CONFIG, AUTH, or UNKNOWN; anything other
        # than AUTH falls back to the configuration guide.
        code = "AUTH" if result["code"] == "AUTH" else "CONFIG"
        print(f"ERROR|{code}")
    else:
        print("OK|配置检查通过")
FILE:scripts/notion_client.py
"""Notion API wrapper - shared client for all record operations."""
import os
import sys
import json
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone

sys.stdout.reconfigure(encoding='utf-8')

# Pending batch storage
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
BATCH_FILE = os.path.join(SCRIPT_DIR, ".pending_batch.json")
BATCH_TTL_SECONDS = 300  # 5 minutes

API_KEY = os.environ.get("NOTION_API_KEY", "")
PAGE_ID = os.environ.get("NOTION_PARENT_PAGE_ID", "")
BASE_URL = "https://api.notion.com/v1"
HEADERS_TEMPLATE = {
    "Authorization": "",
    "Content-Type": "application/json",
    "Notion-Version": "2022-06-28",
}


def get_headers():
    headers = HEADERS_TEMPLATE.copy()
    headers["Authorization"] = f"Bearer {API_KEY}"
    return headers


def api_request(method, path, body=None):
    """Make one API request, retrying up to 3 times on rate limits and transient errors."""
    headers = get_headers()
    url = f"{BASE_URL}/{path}"
    data = json.dumps(body).encode() if body is not None else None
    for attempt in range(3):
        try:
            req = urllib.request.Request(url, data=data, headers=headers, method=method)
            with urllib.request.urlopen(req) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code == 429:
                # Rate limited: back off a little longer on each attempt
                time.sleep(1.5 * (attempt + 1))
                continue
            error_body = e.read().decode()
            try:
                message = json.loads(error_body).get("message", str(e))
            except Exception:
                message = str(e)
            return {"error": True, "code": e.code, "message": message}
        except Exception as e:
            # Network-level failure: retry twice, then give up
            if attempt < 2:
                time.sleep(1)
                continue
            return {"error": True, "message": str(e)}
    # Only reached when every attempt was rate limited; keep the 429 code so
    # callers can report ERROR|RATE_LIMIT instead of a generic network error.
    return {"error": True, "code": 429, "message": "Rate limited after retries"}


def append_blocks(children, silent=False):
    """Append a list of blocks to the page.

    Returns True on success and False if the API call failed.
    """
    if not children:
        print("OK|没有内容可追加")
        return True
    result = api_request("PATCH", f"blocks/{PAGE_ID}/children", {"children": children})
    if result.get("error"):
        _emit_error(result)
        return False
    results_list = result.get("results", [])
    block_count = len(results_list) or len(children)
    # Save block IDs for batch undo (skip entries that carry no ID)
    block_ids = [b["id"] for b in results_list if "id" in b]
    _save_pending_batch(block_ids)
    if not silent:
        print(f"OK|已记录到 Notion,共 {block_count} 个 blocks")
    return True


def get_children(page_id=None, start_cursor=None, page_size=100, silent=False):
    """Read page children blocks."""
    pid = page_id or PAGE_ID
    params = f"page_size={page_size}"
    if start_cursor:
        params += f"&start_cursor={start_cursor}"
    result = api_request("GET", f"blocks/{pid}/children?{params}")
    if result.get("error"):
        if not silent:
            _emit_error(result)
        return None
    return result


def delete_last_block():
    """Delete the last block(s) on the page.

    If there is a pending batch (within BATCH_TTL_SECONDS), delete all blocks
    in that batch. Otherwise paginate to the actual last block and delete it.
    """
    # Check for pending batch first
    pending = _load_pending_batch()
    if pending:
        block_ids = pending["block_ids"]
        deleted = 0
        for bid in reversed(block_ids):  # Delete in reverse order (last first)
            result = api_request("DELETE", f"blocks/{bid}")
            if result.get("error"):
                _emit_error(result)
                # Continue deleting remaining blocks even if one fails
            else:
                deleted += 1
        _clear_pending_batch()
        print(f"OK| 已撤回最后一批记录,共 {deleted} 条")
        return

    # No pending batch, delete single last block
    cursor = None
    last_block = None
    while True:
        params = "page_size=100"
        if cursor:
            params += f"&start_cursor={cursor}"
        data = api_request("GET", f"blocks/{PAGE_ID}/children?{params}")
        if data.get("error") or "results" not in data or not data["results"]:
            break
        last_block = data["results"][-1]
        if data.get("has_more") and data.get("next_cursor"):
            cursor = data["next_cursor"]
        else:
            break

    if not last_block:
        print("OK| 没有可撤回的记录")
        return

    block_id = last_block["id"]
    result = api_request("DELETE", f"blocks/{block_id}")
    if result.get("error"):
        _emit_error(result)
        return
    print("OK| 已撤回最后一条记录")


def _save_pending_batch(block_ids):
    """Save pending batch block IDs to file."""
    batch = {
        "block_ids": block_ids,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    try:
        with open(BATCH_FILE, "w", encoding="utf-8") as f:
            json.dump(batch, f, ensure_ascii=False, indent=2)
    except Exception:
        pass  # Non-critical, don't fail the main operation


def _load_pending_batch():
    """Load pending batch if it exists and is not expired."""
    if not os.path.exists(BATCH_FILE):
        return None
    try:
        with open(BATCH_FILE, "r", encoding="utf-8") as f:
            batch = json.load(f)
        ts = datetime.fromisoformat(batch["timestamp"])
        now = datetime.now(timezone.utc)
        age = (now - ts).total_seconds()
        if age > BATCH_TTL_SECONDS:
            _clear_pending_batch()
            return None
        return batch
    except Exception:
        return None


def _clear_pending_batch():
    """Clear the pending batch file."""
    try:
        if os.path.exists(BATCH_FILE):
            os.remove(BATCH_FILE)
    except Exception:
        pass


def check_config():
    """Verify API key and page access."""
    if not API_KEY:
        return {"ok": False, "code": "CONFIG", "message": "NOTION_API_KEY 未配置"}
    if not PAGE_ID:
        return {"ok": False, "code": "CONFIG", "message": "NOTION_PARENT_PAGE_ID 未配置"}
    result = api_request("GET", f"blocks/{PAGE_ID}/children?page_size=1")
    if result.get("error"):
        code = result.get("code", 0)
        msg = result.get("message", "")
        if code == 401 or "Unauthorized" in msg:
            return {"ok": False, "code": "AUTH", "message": "API Key 无效或页面未授权"}
        if code == 404 or "Not Found" in msg:
            return {"ok": False, "code": "AUTH", "message": "页面不存在或 Integration 未授权"}
        return {"ok": False, "code": "UNKNOWN", "message": msg}
    return {"ok": True, "message": ""}


def upload_file(file_path):
    """Upload a local file to Notion via File Upload API.

    Uses the notion-upload library to upload a file, then returns the file_id
    which can be used with image/file blocks using type "file_upload".
    Includes a small delay after upload to avoid rate limits when uploading
    multiple images in quick succession.

    Args:
        file_path: Absolute path to the local file.

    Returns:
        The Notion file_upload ID string on success, None on failure.
    """
    try:
        from notion_upload import notion_upload as nu
    except ImportError:
        print("ERROR| notion-upload 库未安装,请运行: pip install notion-upload")
        return None
    if not API_KEY:
        print("ERROR|AUTH")
        return None
    file_name = os.path.basename(file_path)
    try:
        uploader = nu(file_path, file_name, API_KEY, enforce_max_size=True)
        file_id = uploader.upload()
        if file_id:
            # Small delay to avoid rate limit on rapid successive uploads
            import time
            time.sleep(0.5)
            return file_id
        else:
            print("ERROR| 图片上传失败: 未返回 file_id")
            return None
    except FileNotFoundError:
        print(f"ERROR| 文件不存在: {file_path}")
        return None
    except Exception as e:
        print(f"ERROR| 图片上传失败: {e}")
        return None


def _emit_error(result):
    """Emit a friendly error message based on the error code."""
    msg = result.get("message", "")
    code = result.get("code", 0)
    if code == 401 or "Unauthorized" in msg:
        print("ERROR|AUTH")
    elif code == 404 or "Could not find" in msg:
        print("ERROR|AUTH")
    elif code == 429:
        print("ERROR|RATE_LIMIT")
    else:
        print("ERROR|NETWORK")


if __name__ == "__main__":
    result = check_config()
    if not result["ok"]:
        print(f"ERROR|{result['code']}")
    else:
        print("OK|配置检查通过")
FILE:scripts/record.py
"""Unified record entry - dispatch by type to create Notion blocks."""
import os
import re
import sys
import json
import argparse
from datetime import datetime, timezone, timedelta
sys.stdout.reconfigure(encoding='utf-8')
sys.path.insert(0, os.path.dirname(__file__))
from notion_client import api_request, append_blocks, PAGE_ID, get_children, delete_last_block, upload_file
# ---- Rich text helpers ----
RICH_TEXT_CHUNK_SIZE = 1900 # Notion API limit: 2000 chars per rich_text object


def split_rich_text(text, chunk_size=RICH_TEXT_CHUNK_SIZE):
    """Split text into multiple rich_text objects to avoid Notion's 2000-char limit.

    Notion API allows up to 100 rich_text objects per block,
    each with a max of 2000 chars in text.content.
    We use chunk_size=1900 to leave a safety margin.
    """
    if len(text) <= chunk_size:
        return [{"type": "text", "text": {"content": text}}]
    chunks = []
    for i in range(0, len(text), chunk_size):
        chunks.append({"type": "text", "text": {"content": text[i:i + chunk_size]}})
    return chunks
# ---- Block builders ----
def build_paragraph(text):
    """Build a paragraph block with rich_text."""
    return {
        "object": "block",
        "type": "paragraph",
        "paragraph": {
            "rich_text": split_rich_text(text),
            "color": "default",
        },
    }


def build_callout(emoji, text, color="default", children=None):
    """Build a callout block with optional children for multi-paragraph content.

    Notion API ignores \\n in rich_text content. To display multi-line text
    properly inside a callout, we put the first line/paragraph in the callout's
    own rich_text, and subsequent paragraphs as children paragraph blocks.
    """
    block = {
        "object": "block",
        "type": "callout",
        "callout": {
            "icon": {"type": "emoji", "emoji": emoji},
            "rich_text": split_rich_text(text),
            "color": color,
        },
    }
    if children:
        block["callout"]["children"] = children
    return block


def build_todo(text, checked=False):
    return {
        "object": "block",
        "type": "to_do",
        "to_do": {
            "rich_text": split_rich_text(text),
            "checked": checked,
            "color": "default",
        },
    }


def build_bookmark(url):
    # A bookmark block carries a `url` (plus an optional `caption`);
    # it has no `rich_text` field, so none is sent.
    return {
        "object": "block",
        "type": "bookmark",
        "bookmark": {"url": url},
    }


def build_heading(level, text):
    return {
        "object": "block",
        "type": f"heading_{level}",
        f"heading_{level}": {
            "rich_text": split_rich_text(text),
            "color": "default",
        },
    }


def build_quote_block(text):
    return {
        "object": "block",
        "type": "quote",
        "quote": {"rich_text": split_rich_text(text), "color": "default"},
    }


def build_divider():
    return {"object": "block", "type": "divider", "divider": {}}


def build_bullet(text):
    return {
        "object": "block",
        "type": "bulleted_list_item",
        "bulleted_list_item": {"rich_text": split_rich_text(text), "color": "default"},
    }


def build_numbered(text):
    return {
        "object": "block",
        "type": "numbered_list_item",
        "numbered_list_item": {"rich_text": split_rich_text(text), "color": "default"},
    }


def build_toggle(text, children=None):
    block = {
        "object": "block",
        "type": "toggle",
        "toggle": {
            "rich_text": split_rich_text(text),
            "color": "default",
        },
    }
    if children:
        block["toggle"]["children"] = children
    return block


def build_image_block(file_upload_id, caption=None):
    """Build an image block using a file_upload ID from Notion's File Upload API.

    After uploading a file via notion-upload, the returned file_id is used
    with type "file_upload" to attach the image to a block.
    """
    image_data = {
        "type": "file_upload",
        "file_upload": {"id": file_upload_id},
    }
    if caption:
        image_data["caption"] = split_rich_text(caption)
    return {
        "object": "block",
        "type": "image",
        "image": image_data,
    }


def build_image_block_external(url, caption=None):
    """Build an image block referencing an external URL."""
    image_data = {
        "type": "external",
        "external": {"url": url},
    }
    if caption:
        image_data["caption"] = split_rich_text(caption)
    return {
        "object": "block",
        "type": "image",
        "image": image_data,
    }
# ---- Type configs ----
TYPE_CONFIG = {
    "idea": {"emoji": "💡", "color": "default", "label": "想法"},
    "diary": {"emoji": "📒", "color": "blue", "label": "日记"},
    "todo": {"emoji": "☐", "color": "default", "label": "待办"},
    "done": {"emoji": "✔️", "color": "default", "label": "已完成"},
    "note": {"emoji": "📝", "color": "yellow", "label": "笔记"},
    "question": {"emoji": "❓", "color": "purple", "label": "问题"},
    "quote": {"emoji": "📖", "color": "green", "label": "摘抄"},
    "link": {"emoji": "🔗", "color": "default", "label": "链接"},
    "image": {"emoji": "🖼️", "color": "default", "label": "图片"},
}


def parse_metadata(text):
    """Extract tags (#xxx) and project (/p:xxx) from the last line of text.

    Only trailing metadata tokens on the final line are stripped, so earlier
    lines (and ordinary words on the last line) are kept as content.
    """
    tags = []
    project = None
    lines = text.strip().split("\n")
    tokens = lines[-1].split()
    # Pop metadata tokens off the end of the last line
    while tokens and (
        (tokens[-1].startswith("#") and len(tokens[-1]) > 1)
        or (tokens[-1].startswith("/p:") and len(tokens[-1]) > 3)
    ):
        tok = tokens.pop()
        if tok.startswith("#"):
            tags.insert(0, tok[1:])  # keep tags in their original order
        elif project is None:
            project = tok[3:]        # the last /p: token in the text wins
    lines[-1] = " ".join(tokens)
    clean_text = "\n".join(lines).strip()
    return clean_text, tags, project


def extract_date_from_block(block):
    """Try to extract YYYY-MM-DD from block text content."""
    block_type = block.get("type", "")
    content = block.get(block_type, {})
    rich = content.get("rich_text", [])
    text = ""
    for item in rich:
        text += item.get("text", {}).get("content", "")
    match = re.search(r"(\d{4}-\d{2}-\d{2})", text)
    return match.group(1) if match else None
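

def check_need_day_separator():
    """Check if the last dated block on the page is from a different day."""
    # The children endpoint lists blocks from the top of the page, so follow
    # the cursor to the final page of results before inspecting them.
    data = get_children(silent=True)
    while data and data.get("has_more") and data.get("next_cursor"):
        next_page = get_children(start_cursor=data["next_cursor"], silent=True)
        if not next_page or not next_page.get("results"):
            break
        data = next_page
    if not data or "results" not in data:
        return False
    today = datetime.now().strftime("%Y-%m-%d")
    # Look at the last few blocks, newest first
    for block in reversed(data["results"][-5:]):
        block_date = extract_date_from_block(block)
        if block_date:
            return block_date != today
    return False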


def is_local_file_path(s):
    """Check if string looks like a local file path."""
    # Windows drive paths: C:\... or D:/... etc.
    if re.match(r'^[A-Za-z]:[/\\]', s):
        return True
    # Relative or home-relative paths
    if s.startswith('./') or s.startswith('../') or s.startswith('~'):
        return True
    # Anything else counts only if it names an existing file
    if os.path.isfile(s):
        return True
    return False


def is_image_url(s):
    """Check if string is an HTTP URL pointing to an image."""
    if not s.startswith("http"):
        return False
    image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg')
    return s.lower().split('?')[0].endswith(image_extensions)


def build_blocks_for_type(record_type, content):
    """Build Notion blocks for a given type and content."""
    cfg = TYPE_CONFIG.get(record_type, TYPE_CONFIG["idea"])

    if record_type in ("todo", "done"):
        # Split on the first separator that appears in the content
        items = []
        for sep in (", ", ",", ",", "、"):
            if sep in content:
                items = [x.strip() for x in content.split(sep) if x.strip()]
                break
        if not items:
            items = [content]
        return [build_todo(item, checked=(record_type == "done")) for item in items]

    if record_type == "link":
        url = content.strip()
        if not url.startswith("http"):
            url = f"https://{url}"
        return [build_bookmark(url)]

    if record_type == "image":
        path = content.strip()
        if is_image_url(path):
            return [build_image_block_external(path)]
        if is_local_file_path(path):
            # Expand ~ to home directory
            path = os.path.expanduser(path)
            if not os.path.isfile(path):
                print(f"ERROR| 文件不存在: {path}")
                return []
            file_id = upload_file(path)
            if not file_id:
                print("ERROR| 图片上传失败")
                return []
            return [build_image_block(file_id)]
        # Fallback: treat as external URL
        if not path.startswith("http"):
            path = f"https://{path}"
        return [build_image_block_external(path)]

    if record_type in ("idea", "diary", "note", "question", "quote"):
        clean_text, tags, project = parse_metadata(content)
        # Header line: YYYY-MM-DD HH:mm
        now_str = datetime.now().strftime("%Y-%m-%d %H:%M")
        # Build metadata line
        meta_parts_list = []
        if tags:
            meta_parts_list.append(f"#标签:{' '.join('#' + t for t in tags)}")
        if project:
            meta_parts_list.append(f"/项目:{project}")
        meta_line = " | ".join(meta_parts_list) if meta_parts_list else ""
        # Split content into paragraphs by \n
        # Notion API ignores \n in rich_text, so each paragraph must be a separate block
        lines = clean_text.split("\n")
        paragraphs = [line for line in lines if line.strip()]  # skip blank lines
        if len(paragraphs) <= 1:
            # Single paragraph: put everything in callout rich_text (compact view)
            callout_text = now_str
            if paragraphs:
                callout_text += " " + paragraphs[0]
            if meta_line:
                callout_text += " | " + meta_line
            return [build_callout(cfg["emoji"], callout_text, cfg["color"])]
        else:
            # Multiple paragraphs: timestamp in callout, paragraphs as children
            children = [build_paragraph(p) for p in paragraphs]
            if meta_line:
                children.append(build_paragraph(meta_line))
            return [build_callout(cfg["emoji"], now_str, cfg["color"], children=children)]

    return []
# ---- Main dispatch ----
def parse_format_line(line):
    """Check if a line is a format pattern, return block or None."""
    # Test the longest heading marker first so "*** " is not read as "* "
    if line.startswith("*** "):
        return build_heading(3, line[4:])
    if line.startswith("** "):
        return build_heading(2, line[3:])
    if line.startswith("* "):
        return build_heading(1, line[2:])
    if line.startswith("> "):
        return build_quote_block(line[2:])
    if line.strip() == "---":
        return build_divider()
    if line.startswith("- "):
        return build_bullet(line[2:])
    # Numbered list: "1. text" or "1) text"
    stripped = line.lstrip()
    if stripped and stripped[0].isdigit():
        m = re.match(r"^(\d+[.)])\s+(.*)", stripped)
        if m:
            return build_numbered(m.group(2))
    return None


def cmd_record(args):
    cfg = TYPE_CONFIG.get(args.type, TYPE_CONFIG["idea"])
    full_content = " ".join(args.content)
    blocks = []
    # Check if we need a day separator
    if check_need_day_separator():
        blocks.append(build_divider())
    # Multi-line: check each line for format patterns first
    lines = full_content.split("\n")
    content_lines = []
    for line in lines:
        fmt_block = parse_format_line(line)
        if fmt_block is not None:
            # Flush any accumulated content lines as a callout first
            if content_lines:
                content_text = "\n".join(content_lines)
                blocks.extend(build_blocks_for_type(args.type, content_text))
                content_lines = []
            blocks.append(fmt_block)
        else:
            content_lines.append(line)
    # Remaining content lines
    if content_lines:
        blocks.extend(build_blocks_for_type(args.type, "\n".join(content_lines)))
    if not blocks:
        print("OK|没有内容可记录")
        return
    # Skip the success line if the append reported a failure (an ERROR| line
    # has already been printed in that case)
    if append_blocks(blocks, silent=True) is False:
        return
    type_label = cfg["label"]
    if args.type in ("todo", "done"):
        count = len([b for b in blocks if b.get("type") == "to_do"])
        print(f"OK|已记录到 Notion,共 {count} 条{type_label}")
    else:
        print(f"OK|已记录到 Notion,共 {len(blocks)} 条 ✅")
def cmd_heading(args):
    blocks = [build_heading(args.level, " ".join(args.content))]
    append_blocks(blocks, silent=True)
    print("OK|已记录到 Notion ✅")


def cmd_divider(_args):
    append_blocks([build_divider()], silent=True)
    print("OK|已记录到 Notion ✅")


def cmd_list(args):
    # One block per positional argument
    builder = build_bullet if args.kind == "bullet" else build_numbered
    blocks = [builder(text) for text in args.content]
    append_blocks(blocks, silent=True)
    print("OK|已记录到 Notion ✅")

def cmd_toggle(args):
    # JSON input from args, falling back to stdin
    raw = " ".join(args.content) if args.content else sys.stdin.read()
    try:
        data = json.loads(raw)
        title = data["title"]
    except (ValueError, KeyError, TypeError):
        # Covers malformed JSON, a missing "title" key, and non-object payloads
        print("ERROR| 无效的 toggle 数据")
        return
    blocks = [build_toggle(title, data.get("children"))]
    append_blocks(blocks, silent=True)
    print("OK|已记录到 Notion ✅")

def cmd_image(args):
    """Upload an image and append to Notion page."""
    path = " ".join(args.path)
    blocks = []
    if check_need_day_separator():
        blocks.append(build_divider())
    if is_image_url(path):
        blocks.append(build_image_block_external(path, caption=args.caption))
    elif is_local_file_path(path):
        path = os.path.expanduser(path)
        if not os.path.isfile(path):
            print(f"ERROR| 文件不存在: {path}")
            return
        file_id = upload_file(path)
        if not file_id:
            print("ERROR| 图片上传失败")
            return
        blocks.append(build_image_block(file_id, caption=args.caption))
    else:
        # Try as URL
        if not path.startswith("http"):
            path = f"https://{path}"
        blocks.append(build_image_block_external(path, caption=args.caption))
    if blocks:
        append_blocks(blocks, silent=True)
        print("OK|已记录图片到 Notion ✅")


def cmd_undo(_args):
    delete_last_block()

def main():
    parser = argparse.ArgumentParser(description="Unified Notion record entry")
    sub = parser.add_subparsers(dest="command")

    # record command
    p = sub.add_parser("record")
    p.add_argument("--type", required=True)
    p.add_argument("content", nargs="+")
    p.set_defaults(func=cmd_record)

    # heading command
    p = sub.add_parser("heading")
    p.add_argument("--level", type=int, default=2)
    p.add_argument("content", nargs="+")
    p.set_defaults(func=cmd_heading)

    # divider command
    p = sub.add_parser("divider")
    p.set_defaults(func=cmd_divider)

    # list command
    p = sub.add_parser("list")
    p.add_argument("--kind", choices=["bullet", "number"], default="bullet")
    p.add_argument("content", nargs="+")
    p.set_defaults(func=cmd_list)

    # toggle command
    p = sub.add_parser("toggle")
    p.add_argument("content", nargs="*")
    p.set_defaults(func=cmd_toggle)

    # image command
    p = sub.add_parser("image")
    p.add_argument("--caption", default=None, help="Optional caption for the image")
    p.add_argument("path", nargs="+", help="Local file path or URL of the image")
    p.set_defaults(func=cmd_image)

    # undo command
    p = sub.add_parser("undo")
    p.set_defaults(func=cmd_undo)

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return
    try:
        args.func(args)
    except Exception as e:
        # Report failures in the same "STATUS|message" format as the commands
        print(f"ERROR| 操作失败: {e}")


if __name__ == "__main__":
    main()
FILE:scripts/search_notes.py
"""Search Notion pages/blocks by keyword using the Notion Search API."""
import os
import sys
import json
import time
import urllib.request
import urllib.error
sys.stdout.reconfigure(encoding='utf-8')
API_KEY = os.environ.get("NOTION_API_KEY", "")
BASE_URL = "https://api.notion.com/v1"
def get_headers():
    return {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
        "Notion-Version": "2022-06-28",
    }

def search(query, page_size=10):
    """Search using Notion Search API with retry on rate limit."""
    body = {
        "query": query,
        "page_size": page_size,
        "sort": {
            "direction": "descending",
            "timestamp": "last_edited_time",
        },
    }
    data = json.dumps(body).encode()
    for attempt in range(3):
        req = urllib.request.Request(
            f"{BASE_URL}/search",
            data=data,
            headers=get_headers(),
            method="POST",
        )
        try:
            with urllib.request.urlopen(req) as resp:
                return json.loads(resp.read())
        except urllib.error.HTTPError as e:
            if e.code == 429:
                # Rate limited: linear backoff of 1.5s, 3.0s, 4.5s
                time.sleep(1.5 * (attempt + 1))
                continue
            error_body = e.read().decode()
            try:
                err_data = json.loads(error_body)
                message = err_data.get("message", str(e))
            except Exception:
                message = str(e)
            return {"error": True, "code": e.code, "message": message}
        except Exception as e:
            # Network-level failure: retry twice, then give up
            if attempt < 2:
                time.sleep(1)
                continue
            return {"error": True, "message": str(e)}
    # Include the status code so main() can report ERROR|RATE_LIMIT
    return {"error": True, "code": 429, "message": "Rate limited after retries"}

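# Illustrative sketch, not used by this script: the linear backoff that search()
# applies on HTTP 429, with the request and the sleep function injected so the
# loop can be exercised without network access. The helper name and the
# (status_code, payload) return shape of send() are assumptions made for the
# example, not part of the Notion API or of the original code.
def _example_retry_on_429(send, attempts=3, base_delay=1.5, sleep=None):
    """Call send() until it returns a status other than 429 or attempts run out."""
    import time

    sleep = sleep or time.sleep
    for attempt in range(attempts):
        status, payload = send()
        if status != 429:
            return {"code": status, "payload": payload}
        # Wait a little longer after each rate-limited attempt.
        sleep(base_delay * (attempt + 1))
    return {"error": True, "code": 429, "message": "Rate limited after retries"}
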
def extract_snippet(result):
    """Extract a readable snippet from a search result."""
    obj_type = result.get("object", "")
    title = ""
    if obj_type == "page":
        props = result.get("properties", {})
        for key, val in props.items():
            if val.get("type") == "title":
                rich = val.get("title", [])
                # Join every rich-text segment so partially formatted titles are not truncated
                title = "".join(item.get("plain_text", "") for item in rich)
                break
        # Try to get a hint of the last edited time
        last_edited = result.get("last_edited_time", "")
        if last_edited and len(last_edited) > 10:
            last_edited = last_edited[:10]  # YYYY-MM-DD
        return title, last_edited
    return "", ""


def format_results(results):
    """Format search results into a user-friendly string."""
    if not results:
        return "🔍 没有找到相关记录~"
    lines = ["🔍 搜索结果:"]
    for i, result in enumerate(results, 1):
        title, date = extract_snippet(result)
        if title:
            date_str = f" ({date})" if date else ""
            lines.append(f"{i}. {title}{date_str}")
        else:
            # Fallback: show the tail of the URL if available
            url = result.get("url", "")
            if url:
                lines.append(f"{i}. {url[-40:]}")
    return "\n".join(lines)

def main():
    if not API_KEY:
        print("ERROR|CONFIG")
        return
    keyword = sys.argv[1] if len(sys.argv) > 1 else ""
    if not keyword:
        print("ERROR| 请提供搜索关键词,例如:搜: 缓存")
        return
    result = search(keyword)
    if result.get("error"):
        # Map failures to the short codes the caller expects
        code = result.get("code", 0)
        if code == 401 or "Unauthorized" in result.get("message", ""):
            print("ERROR|AUTH")
        elif code == 429:
            print("ERROR|RATE_LIMIT")
        else:
            print("ERROR|NETWORK")
        return
    results = result.get("results", [])
    formatted = format_results(results)
    print(f"OK|{formatted}")


if __name__ == "__main__":
    main()
FILE:scripts/summary.py
"""Monthly summary with LLM generation and auto-record to Notion."""
import os
import re
import sys
import random
from datetime import datetime
sys.stdout.reconfigure(encoding='utf-8')
sys.path.insert(0, os.path.dirname(__file__))
from notion_client import get_children, PAGE_ID
def extract_text(block):
    """Extract text content from a block."""
    block_type = block.get("type", "")
    content = block.get(block_type, {})
    rich = content.get("rich_text", [])
    text = ""
    for item in rich:
        text += item.get("text", {}).get("content", "")
    return text.strip()


def get_all_blocks():
    """Get all blocks from the page, paginating through all pages."""
    cursor = None
    all_blocks = []
    while True:
        result = get_children(page_size=100, start_cursor=cursor, silent=True)
        if not result or "results" not in result:
            break
        all_blocks.extend(result["results"])
        if result.get("has_more") and result.get("next_cursor"):
            cursor = result["next_cursor"]
        else:
            break
    return all_blocks

def get_month_records():
    """Extract all records for the current month.

    Returns (None, None) when no blocks could be fetched, and
    (current_month, {}) when blocks exist but none belong to this month.
    """
    blocks = get_all_blocks()
    if not blocks:
        return None, None
    now = datetime.now()
    current_month = now.strftime("%Y-%m")
    day_records = {}
    for block in blocks:
        text = extract_text(block)
        block_type = block.get("type", "")
        # Skip structural blocks
        if block_type in ("divider", "heading_1", "heading_2", "heading_3",
                          "bulleted_list_item", "numbered_list_item"):
            continue
        if not text:
            continue
        # Determine type
        if "📅" in text:
            t = "日记"
        elif "💡" in text or "想法" in text:
            t = "想法"
        elif "📝" in text:
            t = "笔记"
        elif "📖" in text or "摘抄" in text:
            t = "摘抄"
        elif "❓" in text:
            t = "问题"
        elif block_type == "to_do":
            checked = block.get("to_do", {}).get("checked", False)
            t = "已完成" if checked else "待办"
        elif block_type == "bookmark":
            t = "链接"
        else:
            t = "其他"
        # Extract date
        date_match = re.search(r"(\d{4}-\d{2}-\d{2})", text)
        if not date_match:
            continue  # Skip records without date
        day_str = date_match.group(1)
        if day_str.startswith(current_month):
            day_records.setdefault(day_str, []).append((t, text))
    # An empty dict (not None) lets the caller tell "no records" from "fetch failed"
    return current_month, day_records

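# Illustrative sketch, not part of the original script: the date bucketing done
# in get_month_records(), shown on plain strings instead of Notion blocks. The
# helper name _example_group_by_day and its (texts, month) signature are
# hypothetical; `month` is assumed to be a "YYYY-MM" string.
import re


def _example_group_by_day(texts, month):
    """Group texts containing a YYYY-MM-DD date by day, keeping only `month`."""
    grouped = {}
    for text in texts:
        match = re.search(r"\d{4}-\d{2}-\d{2}", text)
        if not match or not match.group(0).startswith(month):
            continue  # undated entries and other months are skipped
        grouped.setdefault(match.group(0), []).append(text)
    return grouped
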
def generate_monthly_report():
    """Extract and return current month's records for LLM to summarize.

    Returns structured data for the agent to analyze and summarize.
    """
    current_month, day_records = get_month_records()
    if day_records is None:
        return "ERROR|暂时无法获取记录,请稍后重试。"
    if not day_records:
        return f"INFO|{current_month} 月暂无记录~"
    sorted_days = sorted(day_records.keys(), reverse=True)
    total_count = sum(len(v) for v in day_records.values())
    lines = [
        f"📊 {current_month} 月记录(共 {total_count} 条,{len(day_records)} 天有记录)\n"
    ]
    for day in sorted_days:
        records = day_records[day]
        lines.append(f"\n## {day}({len(records)}条)")
        for t, text in records:
            # Flatten newlines and cap each entry at 200 characters
            display = text.replace("\n", " ")[:200]
            lines.append(f"- [{t}] {display}")
    return "INFO|" + "\n".join(lines)

def generate_random_quote(count=1):
    """Randomly select historical entries."""
    blocks = get_all_blocks()
    if not blocks:
        return "📖 还没有摘抄呢,先去记点什么吧~"
    candidates = []
    for block in blocks:
        text = extract_text(block)
        if text and ("📖" in text or "📝" in text or "💡" in text or "📅" in text):
            clean = text.replace("\n", " ").strip()
            if clean:
                candidates.append(clean)
    if not candidates:
        return "📖 没有找到合适的摘抄内容~"
    # Never ask random.sample for more items than are available
    selected = random.sample(candidates, min(count, len(candidates)))
    lines = ["📖 随机回忆~"]
    for i, text in enumerate(selected, 1):
        lines.append(f"{i}. {text[:80]}")
    return "\n".join(lines)

def main():
    command = sys.argv[1] if len(sys.argv) > 1 else "monthly"
    if command in ("quote", "random", "随机摘抄", "摘抄"):
        count = 1
        if len(sys.argv) > 2:
            try:
                count = int(sys.argv[2])
            except ValueError:
                pass  # Keep the default of one entry
        result = generate_random_quote(count)
    else:
        # "weekly", "周报", "monthly", "月报" and any unknown command
        # all produce the monthly report.
        result = generate_monthly_report()
    print(result)


if __name__ == "__main__":
    main()