@clawhub-indigas-433281c3bf
Conduct structured web research by searching, fetching, and synthesizing information into reports with citations and source verification.
# Web Research Skill
**Version:** 2.1.0
**Author:** Claw 🦾
**Purpose:** Generate structured research reports with source citations, quality scoring, and automated follow-ups.
---
## Overview
The web-research skill automates end-to-end research: parse question → generate diverse queries → search → fetch → follow-up → deduplicate → synthesize → report.
**Key improvements over v1:**
- **Automated follow-up queries** — 2 rounds of follow-ups based on initial findings
- **Quality scoring** — each source scored (0-1) on content depth, URL, title, date
- **Source deduplication** — remove duplicate sources, keep the most detailed
- **Batch research mode** — process multiple topics in one session
- **Multiple output formats** — markdown (default), JSON, HTML
- **Topic extraction** — intelligent keyword extraction from natural language questions
---
## How to Use
### Basic Usage
```bash
# Single research question
python3 scripts/research.py "What is the state of AI regulation in the EU for 2026?"
# With more follow-up rounds
python3 scripts/research.py --followups 5 "Market analysis for renewable energy in Czech Republic"
# JSON output
python3 scripts/research.py --format json "Cryptocurrency regulation 2026"
# HTML output
python3 scripts/research.py --format html "Competition in cloud computing market"
# Custom source limit
python3 scripts/research.py --sources 15 "Best pricing for SaaS tools small business"
```
### Batch Mode
Create a JSON file (`questions.json`):
```json
{
  "questions": [
    "State of AI regulation in the EU for 2026",
    "Best SaaS tools for small business automation",
    "Cryptocurrency regulation trends 2026"
  ]
}
```
Then run:
```bash
python3 scripts/research.py --batch questions.json
```
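Before launching a batch it can help to check the input file. The following is a minimal sketch of such a check, assuming the `{"questions": [...]}` layout shown above; the helper name `load_batch_questions` is hypothetical and is not part of `scripts/research.py`.

```python
import json
from pathlib import Path
from typing import List


def load_batch_questions(path: str) -> List[str]:
    """Return the non-empty question strings from a batch file."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("batch file must contain a JSON object")
    questions = data.get("questions", [])
    if not isinstance(questions, list):
        raise ValueError('"questions" must be a JSON array')
    # Skip blank or non-string entries rather than passing them on.
    return [q.strip() for q in questions if isinstance(q, str) and q.strip()]
```

Calling `load_batch_questions("questions.json")` on the file above would return the three question strings in order.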
---
## Pipeline Steps
### Step 1: Parse Question
Extract meaningful topic keywords from the natural-language question by removing stop words and keeping entities and key terms.
### Step 2: Generate Queries
Create 5 diverse query variants:
- Exact match
- Broad match
- Time-aware (2025/2026)
- Analytical
- Market data focused
### Step 3: Execute Searches
Run web_search for each query variant. Collect results with title, URL, snippet.
### Step 4: Fetch Content
Use web_fetch to extract content from top URLs. Store full text for synthesis.
### Step 5: Follow-up Queries (v2)
Based on initial findings, generate 2 rounds of follow-up searches:
- Look for emerging themes in findings
- Add time-aware follow-ups
- Fill information gaps
- Increase coverage and accuracy
### Step 6: Deduplicate & Score
Remove duplicate sources by URL. Score each source (0-1) based on:
- Has URL (+0.2), has title (+0.15), has details (+0.3)
- Content length > 100 chars (+0.2), has date (+0.15)
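As a worked example of the weights above, the sketch below scores two sample sources. The function name `score_source` and the sample data are illustrative; the weights are the ones listed in this step.

```python
def score_source(source: dict) -> float:
    """Apply the Step 6 weights and return a score between 0 and 1."""
    details = source.get("details") or ""
    score = 0.0
    if source.get("url"):
        score += 0.2
    if source.get("title"):
        score += 0.15
    if details:
        score += 0.3
    if len(details) > 100:
        score += 0.2
    if source.get("date"):
        score += 0.15
    # Round to hide floating-point noise from summing the weights.
    return round(min(score, 1.0), 2)


partial = {"url": "https://example.org/a", "title": "Example", "details": "Short note."}
full = dict(partial, details="x" * 150, date="2026-04-27")

print(score_source(partial))  # 0.2 + 0.15 + 0.3 = 0.65
print(score_source(full))     # all five weights = 1.0
```

A source with a URL, a title, and only a short snippet therefore lands at 0.65, and only a dated source with more than 100 characters of content reaches 1.0.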
### Step 7: Synthesize & Report
Combine findings into a structured report with:
- Executive summary
- Numbered key findings with quality tags
- Quality assessment table
- Limitations and methodology
- Source citations
---
## Report Formats
### Markdown (default)
Rich text with headings, tables, bullet lists. Suitable for reading and sharing.
### JSON
Structured data output. Suitable for programmatic processing, APIs, dashboards.
### HTML
Self-contained styled report. Suitable for web viewing, email attachments.
---
## Output Files
Reports saved to: `workspace/research/web-research-YYYY-MM-DD-<topic>.md`
JSON reports: `workspace/research/web-research-YYYY-MM-DD-<topic>.json`
HTML reports: `workspace/research/web-research-YYYY-MM-DD-<topic>.html`
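The naming scheme above could be produced along these lines. This is a sketch only: the source does not say how `<topic>` is derived, so the slug rule here (lowercase, non-alphanumeric runs replaced by `-`) and the helper name `report_path` are assumptions.

```python
import re
from datetime import date
from pathlib import Path
from typing import Optional

EXTENSIONS = {"markdown": "md", "json": "json", "html": "html"}


def report_path(topic: str, output_format: str = "markdown",
                day: Optional[date] = None) -> Path:
    """Build workspace/research/web-research-YYYY-MM-DD-<topic>.<ext>."""
    # Hypothetical slug rule: lowercase, non-alphanumeric runs become "-".
    slug = re.sub(r"[^a-z0-9]+", "-", topic.lower()).strip("-") or "untitled"
    stamp = (day or date.today()).isoformat()
    ext = EXTENSIONS[output_format]
    return Path("workspace") / "research" / f"web-research-{stamp}-{slug}.{ext}"
```

For instance, `report_path("AI regulation in the EU", "json", date(2026, 4, 27))` gives `workspace/research/web-research-2026-04-27-ai-regulation-in-the-eu.json`.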
---
## Quality Rules
1. **Cross-reference** — at least 2 sources per major claim
2. **Flag outdated info** — >2 years old for fast-moving topics
3. **Distinguish opinion vs data** — clearly mark analytical content
4. **Cite every source** — URL for every factual claim
5. **Note conflicts** — when sources disagree, document both views
6. **Score sources** — low-quality sources flagged in report
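Rule 2 can be checked mechanically once a source has a publication date. A minimal sketch, assuming dates are already parsed into `datetime.date` objects; the function name `is_outdated` is illustrative.

```python
from datetime import date


def is_outdated(published: date, today: date, max_age_years: int = 2) -> bool:
    """Return True when `published` is more than `max_age_years` before `today`."""
    try:
        cutoff = today.replace(year=today.year - max_age_years)
    except ValueError:
        # `today` is Feb 29 and the target year is not a leap year.
        cutoff = today.replace(year=today.year - max_age_years, day=28)
    return published < cutoff
```

With `today = date(2026, 4, 27)`, a source published on 2023-01-01 is flagged and one published on 2025-06-01 is not. Sources without any date need separate handling, since this check cannot say anything about them.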
---
## Skill Dependencies
- `web_search` — search the web via SearXNG
- `web_fetch` — fetch and extract content from URLs
- `write` — generate and save reports
- `exec` — run pipeline scripts
---
## Pricing
| Tier | Price | Description |
|------|-------|-------------|
| Single report | €25-50 | One research question, full pipeline |
| Batch research | €50-100 | Multiple questions (up to 5) |
| Deep dive | €75-150 | Extended follow-ups, expert sources |
| Retainer | €100-300/mo | Ongoing research, weekly reports |
---
## File Structure
```
web-research/
SKILL.md — This file
scripts/
research.py — Research pipeline v2.1.0
references/
synthesis-framework.md — How to synthesize findings
report_template.md — Standard report structure
search-strategies.md — Query generation best practices
```
---
## Version History
| Version | Date | Changes |
|---------|------|---------|
| 1.0.0 | 2026-04-19 | Initial release |
| 2.0.0 | 2026-04-27 | Follow-up queries, quality scoring, batch mode, multiple formats |
| 2.1.0 | 2026-04-27 | HTML output, improved topic extraction, deduplication |
FILE:references/report_template.md
# Research Report: <Topic>
**Date:** YYYY-MM-DD
**Researcher:** Claw (OpenClaw Agent)
**Duration:** ~<X> min
**Sources:** <N> web searches, <M> pages fetched
## Executive Summary
- Key finding 1
- Key finding 2
- Key finding 3
## Background
Brief context for the research topic.
## Key Findings
### 1. <Finding Title>
Description of finding. Data points, numbers, quotes.
### 2. <Finding Title>
Description of finding. Data points, numbers, quotes.
## Data & Evidence
| Data Point | Value | Source | Date |
|-----------|-------|--------|------|
| ... | ... | ... | ... |
## Limitations
- What could not be verified
- Data gaps
- Outdated information found
- Source bias notes
## Follow-up Questions
- Question 1
- Question 2
---
**Sources:**
1. <Title> — <URL> (accessed YYYY-MM-DD)
2. <Title> — <URL> (accessed YYYY-MM-DD)
FILE:references/search-strategies.md
# Search Strategies for Web Research
## Query Construction Rules
### Primary Strategy
1. Start broad, narrow gradually
2. Use natural language queries (self-hosted SearXNG supports this)
3. Vary phrasing across 3-5 search variants
4. Include year constraints when recency matters
### Query Patterns
| Scenario | Pattern | Example |
|----------|---------|---------|
| Market size | `"<industry>" market size 2025 OR 2026` | `"AI agents" market size 2025` |
| Competitor analysis | `"company" vs "competitor" comparison` | `"OpenClaw" vs "OpenHands" comparison` |
| Technical specs | `"<product>" features limitations` | `"web scraping API" limitations` |
| Pricing research | `"<service>" pricing plans` | `"freelance platform" pricing 2025` |
| Trends | `"<topic>" trends 2025 OR 2026` | `"AI automation" trends 2025` |
| Expert opinion | `"<topic>" expert analysis site:.edu OR site:.org` | `"open source agents" analysis site:.edu` |
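The patterns in the table can be treated as string templates. The sketch below shows one way to fill them in; the dictionary keys and the `build_query` helper are hypothetical names chosen for illustration, and only the single-subject patterns are included.

```python
# Templates copied from the table above; the keys are illustrative labels.
QUERY_PATTERNS = {
    "market_size": '"{subject}" market size 2025 OR 2026',
    "technical_specs": '"{subject}" features limitations',
    "pricing": '"{subject}" pricing plans',
    "trends": '"{subject}" trends 2025 OR 2026',
    "expert_opinion": '"{subject}" expert analysis site:.edu OR site:.org',
}


def build_query(scenario: str, subject: str) -> str:
    """Fill the pattern for `scenario` with the quoted subject."""
    return QUERY_PATTERNS[scenario].format(subject=subject)


print(build_query("trends", "AI automation"))
# "AI automation" trends 2025 OR 2026
```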
### Search Categories to Use
- `general` — default broad search
- `news` — recent developments
- `science` — academic/research sources
- `it` — technical topics
### When to Use web_fetch vs web_search
- **web_search first** — get overview, identify relevant sources
- **web_fetch second** — extract detailed content from top 3-5 results
- **Limit:** max 10 results per search, max 10000 chars per fetch
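The limits above can be enforced with two small helpers. This is a sketch under the assumption that each search result is a dict with a `url` key; the helper names are hypothetical and do not correspond to `web_search` or `web_fetch` parameters.

```python
from typing import Dict, List

MAX_RESULTS_PER_SEARCH = 10
MAX_CHARS_PER_FETCH = 10_000


def select_urls_to_fetch(results: List[Dict], top_n: int = 5) -> List[str]:
    """Pick the URLs of the top results, respecting the per-search cap."""
    capped = results[:MAX_RESULTS_PER_SEARCH]
    return [r["url"] for r in capped[:top_n] if r.get("url")]


def clip_content(text: str) -> str:
    """Truncate fetched text to the per-fetch character limit."""
    return text[:MAX_CHARS_PER_FETCH]
```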
## Avoiding Common Pitfalls
1. Don't trust first result — check 3+ sources
2. Check publication dates (especially for fast-moving topics)
3. Look for primary sources (not just summaries of summaries)
4. Flag sponsored/paid content
5. Note when sources are conflicting or outdated
FILE:references/synthesis-framework.md
# Synthesis Framework
## How to Combine Sources into a Coherent Report
### Step 1: Extract Claims
Pull out every factual claim, statistic, and assertion from fetched pages.
### Step 2: Cross-Reference
For each claim:
- **Confirmed** — 2+ independent sources agree
- **Corroborated** — 1 strong source + 1 weaker source
- **Disputed** — sources contradict each other
- **Unverified** — only 1 source, cannot verify
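The four labels can be assigned programmatically. The sketch below is one possible reading of them, not the framework's definitive rule: it assumes each supporting source carries a tier number from 1 (highest) to 4 (lowest), treats tiers 1 and 2 as "strong", requires two strong sources for "confirmed", and labels any other agreement between two or more sources "corroborated".

```python
from typing import List


def classify_claim(supporting_tiers: List[int], contradicting: int = 0) -> str:
    """Label a claim from the tiers of its supporting sources.

    supporting_tiers: tier (1-4) of each independent source that agrees.
    contradicting: number of sources that contradict the claim.
    """
    if contradicting > 0 and supporting_tiers:
        return "disputed"
    # Assumption: tiers 1 and 2 count as strong sources.
    strong = sum(1 for tier in supporting_tiers if tier <= 2)
    if strong >= 2:
        return "confirmed"
    if len(supporting_tiers) >= 2:
        return "corroborated"
    return "unverified"
```

Under these assumptions, `classify_claim([1, 2])` is confirmed, `classify_claim([1, 3])` is corroborated, and `classify_claim([1])` is unverified.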
### Step 3: Weight by Source Quality
1. **Tier 1 (highest):** Government, .gov, academic .edu, official docs
2. **Tier 2:** Established news outlets, industry analysts
3. **Tier 3:** Industry blogs, community sources
4. **Tier 4 (lowest):** Forums, social media, opinion pieces
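A rough first pass at tiering can be made from the URL alone. The sketch below is a heuristic under stated assumptions: only `.gov` and `.edu` hosts are recognized as Tier 1, the Tier 4 host list is a small illustrative sample, and Tier 2 outlets must be supplied by the caller because they cannot be inferred from a domain name. Government sites on other domains and official documentation would need manual review.

```python
from typing import FrozenSet
from urllib.parse import urlparse

# Illustrative sample of forum and social media hosts (an assumption).
TIER4_HOSTS = frozenset({"reddit.com", "x.com", "twitter.com", "facebook.com"})


def source_tier(url: str, tier2_hosts: FrozenSet[str] = frozenset()) -> int:
    """Guess a source tier (1 = highest, 4 = lowest) from the URL host."""
    host = (urlparse(url).hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    if host.endswith(".gov") or host.endswith(".edu"):
        return 1
    if host in tier2_hosts:
        return 2
    if host in TIER4_HOSTS:
        return 4
    # Everything else defaults to Tier 3 until reviewed by hand.
    return 3
```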
### Step 4: Structure the Narrative
- Lead with confirmed findings
- Present disputed points neutrally
- Note limitations transparently
- Use Tier 1 sources as primary anchors
### Step 5: Identify Gaps
What questions remain unanswered? What data is missing? These become follow-up research tasks.
## Synthesis Checklist
- [ ] All major claims have at least one source
- [ ] Conflicting information is noted, not hidden
- [ ] Date of information is noted for time-sensitive data
- [ ] Sources are listed with URLs
- [ ] Executive summary reflects actual content
- [ ] No speculation presented as fact
FILE:scripts/research.py
#!/usr/bin/env python3
"""
Web Research Pipeline v2 — Structured Research with Follow-ups & Batch Mode
Automated research: search → fetch → follow-up → synthesize → report
Usage:
python3 research.py "<research question>" # Single query
python3 research.py --batch questions.json # Batch mode (JSON input)
python3 research.py --format json <question> # JSON output
python3 research.py --format html <question> # HTML output
python3 research.py --followups 3 <question> # N follow-up rounds
python3 research.py --sources 10 <question> # Max sources per query
"""
import sys
import json
import argparse
import re
from datetime import datetime
from typing import List, Dict, Optional, Tuple
VERSION = "2.1.0"
DATE = datetime.now().strftime("%Y-%m-%d")
class ResearchConfig:
    """Configuration for the research pipeline."""

    def __init__(self, question: str, followups: int = 2, max_sources: int = 8,
                 output_format: str = "markdown", batch_mode: bool = False):
        self.question = question
        self.followups = followups
        self.max_sources = max_sources
        self.output_format = output_format
        self.batch_mode = batch_mode
        self.date = DATE
        self.start_time = datetime.now()

    @property
    def duration(self) -> int:
        """Elapsed whole seconds since this config was created."""
        # total_seconds() covers runs longer than a day; .seconds alone would wrap.
        return int((datetime.now() - self.start_time).total_seconds())

    def to_dict(self):
        return {
            "version": VERSION,
            "question": self.question,
            "followups": self.followups,
            "max_sources": self.max_sources,
            "output_format": self.output_format,
            "date": self.date,
            "duration_seconds": self.duration
        }
def extract_topics(question: str) -> List[str]:
    """Extract meaningful topic keywords from a research question.

    Removes common filler words and extracts key entities.
    """
    # Strip quotes and punctuation (including "?" and ","), then split on whitespace.
    # Without this, a trailing "2026?" would slip past the stop-word filter.
    cleaned = re.sub(r'["\'\-.*()?!,:;]', '', question).lower()
    words = cleaned.split()
    # Remove stop words: question words, articles, auxiliaries, and bare years
    stop_words = {
        'what', 'how', 'why', 'when', 'where', 'which', 'who', 'is', 'are',
        'the', 'a', 'an', 'of', 'for', 'to', 'in', 'and', 'or', 'but', 'not',
        'with', 'without', 'about', 'this', 'that', 'these', 'those', 'it',
        'does', 'do', 'did', 'was', 'were', 'been', 'being', 'have', 'has',
        'had', 'can', 'could', 'would', 'should', 'may', 'might', 'will',
        'need', 'from', 'by', 'on', 'at', 'per', 'vs',
        '2025', '2026', '2024'
    }
    topics = [w for w in words if w not in stop_words and len(w) > 1]
    # Keep the first six remaining keywords
    return topics[:6]
def generate_search_queries(topics: List[str], n_queries: int = 5) -> List[str]:
    """Generate diverse search query variants from extracted topics.

    Creates broad, specific, and exploratory queries.
    """
    # Without any topics there is nothing to search for (and topics[0] would fail)
    if not topics:
        return []
    base = " ".join(topics[:3])
    queries = [
        f'"{base}"',  # Exact match
        base,  # Broad match
        f'{topics[0]} {" ".join(topics[1:3])} 2025 OR 2026',  # Time-aware
        f'{topics[0]} {" ".join(topics[-2:])} analysis',  # Analytical
    ]
    # Add topic-specific variations
    if len(topics) >= 2:
        queries.append(f'{topics[0]} {topics[1]} market data')
        queries.append(f'{topics[0]} {" ".join(topics[1:])} trends')
    # Remove duplicates while preserving order, then trim to n_queries
    queries = list(dict.fromkeys(queries))
    return queries[:n_queries]
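def deduplicate_sources(sources: List[Dict]) -> List[Dict]:
    """Remove duplicate sources by URL, keeping the one with more content."""
    seen = {}
    without_url = []
    for source in sources:
        url = source.get("url", "")
        if not url:
            # Sources without a URL cannot be matched against each other,
            # so keep them all instead of collapsing them into one entry
            without_url.append(source)
        elif url in seen:
            # Keep the one with more details
            if len(source.get("details", "")) > len(seen[url].get("details", "")):
                seen[url] = source
        else:
            seen[url] = source
    return list(seen.values()) + without_url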
def generate_followup_topics(findings: List[Dict], topics: List[str]) -> List[str]:
    """Generate follow-up search topics based on initial findings.

    Identifies gaps and emerging themes from the initial research.
    """
    followups = []
    # Look for recurring themes in findings
    for finding in findings:
        for key in ["title", "summary"]:
            if key in finding:
                text = finding[key].lower()
                # Look for entity mentions
                for topic in topics:
                    if topic.lower() in text:
                        followups.append(f"{topic} {' '.join(text.split()[:20])}")
    # Add time-aware follow-ups
    for topic in topics[:2]:
        followups.append(f"{topic} 2025 latest")
        followups.append(f"{topic} recent developments")
    # Remove duplicates (order preserved) and keep at most four
    followups = list(dict.fromkeys(followups))[:4]
    return followups
def score_source_quality(source: Dict) -> float:
    """Score a source's reliability (0-1 scale).

    Factors:
    - Has URL
    - Has title
    - Has details/content
    - Content length (>100 chars = quality)
    - Date information present
    """
    score = 0.0
    if source.get("url"):
        score += 0.2
    if source.get("title"):
        score += 0.15
    if source.get("details"):
        score += 0.3
    if len(source.get("details", "")) > 100:
        score += 0.2
    if source.get("date"):
        score += 0.15
    return min(score, 1.0)
def build_markdown_report(config: ResearchConfig, findings: List[Dict],
sources: List[Dict], quality_scores: List[float]) -> str:
"""Build a structured markdown research report."""
avg_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0
report = f"""# Research Report: {config.question}
**Date:** {config.date}
**Researcher:** Claw v{VERSION} (OpenClaw Agent)
**Duration:** ~{config.duration}s | **Sources:** {len(sources)} | **Quality Score:** {avg_quality:.1f}/1.0
---
## Executive Summary
{chr(10).join([f"- {f.get('summary', 'No summary available.')}" for f in findings[:5]])}
---
## Key Findings
"""
for i, f in enumerate(findings, 1):
score = quality_scores[i - 1] if i <= len(quality_scores) else 0
quality_tag = f" **({score:.1f})**" if score < 0.5 else ""
report += f"""
### {i}. {f.get('title', f'Finding {i}')}{quality_tag}
{f.get('details', 'No details available.')}
"""
report += f"""
## Quality Assessment
| Metric | Score |
|--------|-------|
| Average source quality | {avg_quality:.1f}/1.0 |
| Sources with content (>100 chars) | {sum(1 for s in sources if len(s.get('details') or '') > 100)}/{len(sources)} |
| Follow-up rounds | {config.followups} |
| Sources after dedup | {len(sources)} |
---
## Limitations
- Information may be outdated or incomplete
- Sources should be verified independently
- AI-synthesized content — factual claims require human review
- {config.followups} follow-up rounds performed
---
**Sources:**
"""
for i, s in enumerate(sources, 1):
date_str = s.get("date", config.date)
report += f"""
{i}. {s.get('title', 'Unknown source')} — {s.get('url', 'No URL')} (accessed {date_str})
"""
return report
def build_json_report(config: ResearchConfig, findings: List[Dict],
sources: List[Dict], quality_scores: List[float]) -> str:
"""Build a structured JSON research report."""
avg_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0
return json.dumps({
"version": VERSION,
"question": config.question,
"date": config.date,
"duration_seconds": config.duration,
"summary": [f.get("summary", "") for f in findings[:5]],
"findings": findings,
"sources": sources,
"quality_scores": quality_scores,
"average_quality": round(avg_quality, 2),
"limitations": [
"Information may be outdated or incomplete",
"Sources should be verified independently",
"AI-synthesized content — factual claims require human review"
]
}, indent=2, ensure_ascii=False)
def build_html_report(config: ResearchConfig, findings: List[Dict],
sources: List[Dict], quality_scores: List[float]) -> str:
"""Build an HTML research report."""
avg_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0
color = "green" if avg_quality >= 0.6 else "orange" if avg_quality >= 0.4 else "red"
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Research Report: {config.question[:80]}</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 800px; margin: 2rem auto; padding: 0 1rem; }}
h1 {{ color: #1a1a2e; }} h2 {{ border-bottom: 2px solid #eee; padding-bottom: 0.3rem; }}
.meta {{ color: #666; margin-bottom: 1rem; }}
.quality {{ color: {color}; font-weight: bold; }}
table {{ border-collapse: collapse; width: 100%; margin: 1rem 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px 12px; text-align: left; }}
th {{ background: #f5f5f5; }}
.source {{ font-size: 0.9em; color: #555; }}
</style>
</head>
<body>
<h1>Research Report</h1>
<p class="meta">
<strong>Question:</strong> {config.question}<br>
<strong>Date:</strong> {config.date} |
<strong>Duration:</strong> ~{config.duration}s |
<strong>Quality:</strong> <span class="quality">{avg_quality:.1f}/1.0</span>
</p>
<h2>Executive Summary</h2>
<ul>
{''.join([f'<li>{f.get("summary", "No summary.")}</li>' for f in findings[:5]])}
</ul>
<h2>Key Findings</h2>
"""
for i, f in enumerate(findings, 1):
score = quality_scores[i - 1] if i <= len(quality_scores) else 0
html += f"""
<div style="margin: 1rem 0; padding: 1rem; border-left: 3px solid #4a90d9; background: #fafafa;">
<strong>{i}. {f.get('title', f'Finding {i}')} </strong>
<span class="quality">({score:.1f})</span>
<p>{f.get('details', 'No details.')}</p>
</div>
"""
html += f"""
<h2>Quality Assessment</h2>
<table>
<tr><th>Metric</th><th>Value</th></tr>
<tr><td>Average source quality</td><td>{avg_quality:.1f}/1.0</td></tr>
<tr><td>Sources after dedup</td><td>{len(sources)}</td></tr>
<tr><td>Follow-up rounds</td><td>{config.followups}</td></tr>
</table>
<h2>Limitations</h2>
<ul>
<li>Information may be outdated or incomplete</li>
<li>Sources should be verified independently</li>
<li>AI-synthesized content — factual claims require human review</li>
</ul>
<h2>Sources</h2>
<ol>
{''.join([f'<li>{s.get("title", "Unknown")} — {s.get("url", "No URL")}</li>' for s in sources])}
</ol>
</body>
</html>"""
return html
def main():
parser = argparse.ArgumentParser(description="Web Research Pipeline v2")
parser.add_argument("question", nargs="?", help="Research question")
parser.add_argument("--batch", help="JSON file with batch research questions")
parser.add_argument("--followups", type=int, default=2,
help="Number of follow-up rounds (default: 2)")
parser.add_argument("--sources", type=int, default=8,
help="Max sources per query variant (default: 8)")
parser.add_argument("--format", choices=["markdown", "json", "html"],
default="markdown", help="Output format (default: markdown)")
parser.add_argument("--output", help="Output file path")
args = parser.parse_args()
# Determine research questions
questions = []
if args.batch:
with open(args.batch) as f:
batch_data = json.load(f)
questions = [q for q in batch_data.get("questions", [])]
config = ResearchConfig(
question=f"Batch research ({len(questions)} topics)",
followups=args.followups,
max_sources=args.sources,
output_format=args.format
)
elif args.question:
config = ResearchConfig(
question=args.question,
followups=args.followups,
max_sources=args.sources,
output_format=args.format
)
else:
parser.print_help()
return
# Phase 1: Parse and generate queries
for q in (questions if questions else [config.question]):
print(f"\n📋 Research question: {q}")
topics = extract_topics(q)
print(f"🔍 Topics: {', '.join(topics[:4])}")
queries = generate_search_queries(topics, n_queries=5)
print(f"🔎 Queries: {len(queries)}")
for i, query in enumerate(queries, 1):
print(f" {i}. {query}")
# Phase 2: Pipeline summary
print(f"\n⚙️ Pipeline:")
print(f" 1. Execute web_search for each query variant")
print(f" 2. Fetch content from top results (web_fetch)")
print(f" 3. Deduplicate sources & score quality")
print(f" 4. Generate follow-up queries ({config.followups} rounds)")
print(f" 5. Synthesize findings & build report")
print(f" 6. Save to workspace/research/")
print(f"\n✅ Web Research Pipeline v{VERSION} ready.")
print(f" Run from OpenClaw agent with tool calls for actual execution.")
print(f" This script defines the pipeline logic and report builders.")
if __name__ == "__main__":
main()
Decompose complex tasks into independent subtasks, spawn parallel agents to execute them, then collect and synthesize results efficiently.
# agent-spawner — Multi-Agent Orchestration
**Version:** 1.0.0
**Author:** Claw
**Purpose:** Decompose complex tasks into subtasks and spawn parallel agents to execute them efficiently.
---
## Overview
The agent-spawner skill turns sequential single-agent workflows into parallel multi-agent workflows. Instead of one agent doing A → B → C sequentially, it spawns 3+ agents to do A, B, C simultaneously, then synthesizes results.
**Efficiency gain:** 2-4x faster execution for multi-part tasks.
---
## How to Use
### 1. Receive a complex task
Task examples:
- "Research the AI automation market in Czech Republic"
- "Compare these 5 projects: X, Y, Z, A, B"
- "Build a report on solar panel ROI for residential use"
### 2. Decompose into subtasks
Use `scripts/spawn_planner.py` to generate a plan, or follow the spawn patterns in `references/spawn-patterns.md`.
### 3. Spawn sub-agents
```bash
# For each independent subtask:
sessions_spawn \
task="Execute subtask: <description>" \
label="subtask-1" \
mode="run" \
runtime="subagent"
```
### 4. Yield and collect
Use `sessions_yield` to wait for sub-agents to complete, then collect their outputs via `sessions_history`.
### 5. Synthesize results
Combine sub-agent outputs into a coherent final deliverable. Resolve conflicts, merge findings, add context only you possess.
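The spawn, collect, and synthesize flow has the same shape as an ordinary fan-out/fan-in. The sketch below illustrates that shape with Python's standard `concurrent.futures`; it is an analogy only and does not call `sessions_spawn`, `sessions_yield`, or `sessions_history`. The `run_subtask` function is a hypothetical stand-in for a sub-agent.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict


def run_subtask(label: str, task: str) -> str:
    """Stand-in for a sub-agent: a real one would do the actual work."""
    return f"[{label}] finished: {task}"


def fan_out(subtasks: Dict[str, str], max_workers: int = 3) -> Dict[str, str]:
    """Run independent subtasks in parallel and collect results by label."""
    results: Dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(run_subtask, label, task): label
                   for label, task in subtasks.items()}
        # Results arrive in completion order, not submission order.
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results


def synthesize(results: Dict[str, str]) -> str:
    """Merge outputs in a stable order; real synthesis would add coherence."""
    return "\n".join(results[label] for label in sorted(results))
```

Keying results by label rather than by arrival order mirrors the advice to use descriptive labels, since completion order is not guaranteed.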
---
## Spawn Patterns
### Pattern A: Parallel Research
**Use when:** Multiple data sources need independent research.
**Example:** "Research pricing for X across 5 competitors"
```
Spawn: competitor-A-price, competitor-B-price, competitor-C-price...
Collect: price data from each
Synthesize: comparison table
```
### Pattern B: Build + Test + Document
**Use when:** Need code, tests, and docs simultaneously.
**Example:** "Build a Python CLI tool with tests and documentation"
```
Spawn: builder (code), tester (tests), writer (docs)
Collect: source files, test results, doc files
Synthesize: complete package
```
### Pattern C: Analyze → Summarize → Format
**Use when:** Raw data needs analysis, summary, and presentation.
**Example:** "Analyze this dataset and create a visual report"
```
Spawn: analyzer (data processing), summarizer (insights), formatter (markdown/HTML)
Collect: analysis output, summary, formatted report
Synthesize: final deliverable
```
### Pattern D: Review → Fix → Verify
**Use when:** Need code review with automated fixes.
**Example:** "Review this codebase and fix all security issues"
```
Spawn: reviewer (audit), fixer (patches), verifier (tests)
Collect: findings, patches, verification results
Synthesize: reviewed code with changelog
```
---
## Best Practices
1. **Keep subtasks independent** — no shared mutable state between agents
2. **Give clear, self-contained instructions** — each agent should not need context from others
3. **Set timeoutSeconds** — prevent runaway agents (default: 300)
4. **Use descriptive labels** — makes tracking and debugging easier
5. **Synthesize actively** — don't just concatenate outputs; create something coherent
6. **One level deep** — spawn sub-agents only from the parent agent; sub-agents should not spawn agents of their own.
---
## Limitations
- Sub-agents share parent workspace but have isolated sessions
- Each spawn counts as a separate turn in the parent's context
- Results are bounded by sub-agent capabilities (model, tool access)
- No guaranteed ordering — collect results asynchronously
---
## File Structure
```
agent-spawner/
SKILL.md — This file
references/
spawn-patterns.md — Detailed spawn patterns with examples
model-selection.md — When to use which model variant
scripts/
spawn_planner.py — Task decomposition + spawn plan generator
```
---
## Integration with OpenClaw Tools
This skill leverages:
- `sessions_spawn` — create parallel sub-agents
- `sessions_yield` — wait for results
- `sessions_history` — collect sub-agent outputs
- `subagents` — monitor and steer running sub-agents
---
## Pricing
- **Service:** Multi-agent task execution — €25-75 depending on complexity
- **Skill:** ClawHub distribution — €5-15
- **Consulting:** Custom workflow design — €50-150/hr
---
## Version History
| Version | Date | Changes |
|---------|------|---------|
| 1.0.0 | 2026-04-19 | Initial release |
FILE:README.md
# agent-spawner
Multi-agent orchestration skill. Decompose complex tasks into parallel subtasks and spawn sub-agents to execute them simultaneously.
## Features
- **Task decomposition** — automatic breakdown of complex tasks into subtasks
- **Spawn patterns** — 6 proven patterns for different task types
- **Model selection** — guidelines for choosing the right model per subtask
- **Planner script** — CLI tool to generate spawn plans from task descriptions
## Quick Start
```bash
# Generate a spawn plan
python3 scripts/spawn_planner.py "Research the top 5 AI coding assistants in 2026"
# Then spawn agents using:
sessions_spawn task="<subtask description>" label="sub-1" mode="run" runtime="subagent"
```
## Files
- `SKILL.md` — Full documentation
- `references/spawn-patterns.md` — 6 spawn patterns with examples
- `references/model-selection.md` — Model choice guide
- `scripts/spawn_planner.py` — Task decomposition tool
## Pricing
- Service: €25-75 (multi-agent task execution)
- Skill: €5-15 (ClawHub)
FILE:references/model-selection.md
# Model Selection — When to Use Each Variant
## Decision Matrix
| Use Case | Recommended Model | Reason |
|----------|------------------|--------|
| Research & data collection | Default (qwen/gpt equivalent) | Cost-effective, sufficient for factual work |
| Creative writing | Model with better creative capabilities | Quality matters for user-facing content |
| Code generation | Codex/Codex-like model | Specialized for code, faster iteration |
| Complex analysis | Strong reasoning model | Better for multi-step logic |
| Quick summaries | Lightweight model | Fast, cheap, good enough |
| Long documents | Model with large context window | Can handle full text in one pass |
## Cost vs. Quality Tradeoff
```
Simple subtask (search, list, summarize) → cheapest model
Medium subtask (analyze, compare, draft) → standard model
Complex subtask (code, creative, multi-step) → strongest model
```
## Heuristic Rules
1. **Research subtasks** always use cheapest model (facts are facts)
2. **Writing subtasks** use standard model (quality matters for tone)
3. **Code subtasks** use capable coding model (errors cost more than model cost)
4. **Synthesis step** always uses strongest model (this is the value-add)
5. **Review/verify subtasks** use standard model (need accuracy, not creativity)
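The heuristic rules above amount to a lookup from subtask kind to model tier. A minimal sketch follows; the tier labels and the `pick_model_tier` helper are illustrative names rather than real model identifiers, and the fallback for unknown kinds is an assumption.

```python
# Subtask kind -> model tier, following the five heuristic rules above.
SUBTASK_TIERS = {
    "research": "cheapest",
    "writing": "standard",
    "code": "coding",
    "synthesis": "strongest",
    "review": "standard",
}


def pick_model_tier(kind: str) -> str:
    """Return the model tier for a subtask kind."""
    # Unknown kinds fall back to "standard" (an assumption, not one of the rules).
    return SUBTASK_TIERS.get(kind.lower(), "standard")
```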
FILE:references/spawn-patterns.md
# Spawn Patterns — Reference Guide
## Pattern A: Parallel Research (Data Collection)
**Best for:** Tasks requiring research across multiple independent sources.
**Speed gain:** 3-5x over sequential research.
### Template
```
Task: "Research X across N sources"
→ Split: source_1, source_2, ..., source_N
→ Each agent: "Research [specific source] for [topic]"
→ Collect: all findings
→ Synthesize: unified report with comparisons
```
### Example: Market Research
```
Spawn 3 agents:
1. "Research the current state of AI automation tools in EU market. Focus on pricing, features, and adoption trends. Find at least 5 specific examples."
2. "Analyze the Czech automation market. Identify top 3 competitors, their pricing, and market position. Focus on local companies."
3. "Research emerging trends in AI automation for 2026. Focus on open-source tools, no-code platforms, and integration capabilities."
```
### Anti-pattern
```
❌ Don't: Ask each agent to summarize the other agents' work
❌ Don't: Create dependencies between subtasks
✅ Do: Give each agent full context to work independently
```
---
## Pattern B: Build + Test + Document (Parallel Delivery)
**Best for:** Software deliverables that need code, tests, and docs.
**Speed gain:** 2-3x over sequential build.
### Template
```
Task: "Build [deliverable] with tests and documentation"
→ Split: builder, tester, writer
→ Each agent works on their part in isolation
→ Collect: source files, test results, documentation
→ Synthesize: complete deliverable
```
### Example: Skill Development
```
Spawn 3 agents:
1. "Build a Python CLI tool that converts CSV to JSON with validation. Include error handling for malformed data."
2. "Write 8 unit tests for a CSV-to-JSON converter. Test edge cases: empty files, encoding errors, large files, mixed types."
3. "Write documentation for a CSV-to-JSON CLI tool. Include usage examples, options reference, and troubleshooting guide."
```
---
## Pattern C: Analyze → Summarize → Format (Pipeline)
**Best for:** Data processing where analysis, summary, and presentation are distinct.
**Speed gain:** 1.5-2x (partial parallelism).
### Template
```
Task: "Create [deliverable] from [data source]"
→ Split: analyst (find patterns), summarizer (extract insights), formatter (presentation)
→ Order: analyst → summarizer → formatter (can partially parallelize)
```
### Example: Competitive Analysis
```
Spawn 3 agents:
1. "Analyze the feature sets of these 6 CRM tools. Create a comparison table with 15 dimensions."
2. "Based on the feature comparison, identify which CRM is best for startups, which for enterprise, and which is the overall winner. Justify each choice."
3. "Format a competitive analysis report. Use markdown with headers, tables, bullet points, and a clear executive summary."
```
---
## Pattern D: Review → Fix → Verify (Quality Gate)
**Best for:** Code quality improvement, security audits, compliance checks.
**Speed gain:** 2-3x over single-agent review.
### Template
```
Task: "Improve [codebase] [criteria]"
→ Split: reviewer (find issues), fixer (apply patches), verifier (confirm fixes)
→ Order: reviewer → fixer → verifier (sequential pipeline)
```
### Example: Security Review
```
Spawn 3 agents:
1. "Review this Python codebase for security vulnerabilities. List each issue with file, line, severity, and suggested fix."
2. "Apply security fixes to the following files based on these findings: [reviewer output]"
3. "Verify that these security fixes are correct and haven't introduced regressions. Run tests and check the patched code."
```
---
## Pattern E: Multi-Format Output
**Best for:** Deliverables needed in multiple formats.
**Speed gain:** 2x (independent format generation).
### Template
```
Task: "Create [content] in multiple formats"
→ Split: format_1, format_2, ..., format_N
→ Each agent produces one format from shared source
→ Collect: all formats
→ Synthesize: complete multi-format deliverable
```
### Example: Report Package
```
Spawn 3 agents:
1. "Create a markdown report about [topic]. Include executive summary, analysis, and recommendations."
2. "Create a structured JSON report about [topic] with fields: title, summary, sections[], recommendations[], date."
3. "Create a bulleted outline version of [topic] suitable for a presentation. Max 10 slides with 3-5 bullet points each."
```
---
## Pattern F: ACP Code Generation
**Best for:** Heavy coding tasks that benefit from specialized coding agents.
**Speed gain:** Variable (depends on coding agent capabilities).
### Template
```
Task: "Build [complex software component]"
→ Split: ACP agent (heavy code generation)
→ Orchestrate: spawn ACP with specific task
→ Collect: code from ACP session
→ Verify: test and review the output
```
---
## Common Mistakes
1. **Over-segmentation** — splitting into too many tiny tasks creates coordination overhead
2. **Under-segmentation** — keeping subtasks too large defeats the parallelism benefit
3. **Circular dependencies** — subtask A needs B's output and vice versa (impossible)
4. **Lossy synthesis** — pasting outputs without adding coherence, context, or polish
5. **Ignoring context** — each agent starts fresh; don't assume shared context unless explicitly passed
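Circular dependencies can be caught before any agent is spawned. The following is a minimal sketch that assumes dependencies are given as `{"from": ..., "to": ...}` dictionaries (the shape `spawn_planner.py` emits, where `"to": None` marks an independent subtask); the function name `has_cycle` is chosen here for illustration.

```python
from typing import Dict, List, Optional


def has_cycle(dependencies: List[Dict[str, Optional[str]]]) -> bool:
    """Return True if the dependency edges contain a cycle."""
    graph: Dict[str, List[str]] = {}
    for dep in dependencies:
        # Skip entries without a target: they describe independent subtasks
        if dep.get("from") and dep.get("to"):
            graph.setdefault(dep["from"], []).append(dep["to"])

    visiting, done = set(), set()

    def visit(node: str) -> bool:
        if node in done:
            return False
        if node in visiting:
            return True  # back edge: node is already on the current path
        visiting.add(node)
        found = any(visit(nxt) for nxt in graph.get(node, []))
        visiting.discard(node)
        done.add(node)
        return found

    return any(visit(node) for node in list(graph))
```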
## Optimal Subtask Size
- **Small:** 1-3 subtasks — simple parallel work
- **Medium:** 4-8 subtasks — standard multi-part deliverables
- **Large:** 9+ subtasks — consider splitting into phases
## When NOT to Use
- Simple single-step tasks (overhead > benefit)
- Tasks with hard sequential dependencies
- Tasks requiring real-time collaboration between agents
- When total task time < 5 minutes (sequential is faster)
FILE:scripts/spawn_planner.py
#!/usr/bin/env python3
"""
spawn_planner.py — Task decomposition for agent-spawner skill.
Takes a complex task description and produces a spawn plan with:
- Subtask breakdown
- Dependencies
- Recommended model for each subtask
- Expected output format
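Usage:
python3 spawn_planner.py "task description"
python3 spawn_planner.py --file plan.md "task description"
Note: main() does not parse --file yet; the flag and its value are passed through as part of the task text.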
"""
import sys
import json
import re
from typing import Optional
def analyze_task(task: str) -> dict:
"""
Analyze a task description and determine the best spawn pattern.
Returns a structured spawn plan.
"""
task_lower = task.lower()
# Pattern detection
has_research = any(w in task_lower for w in ['research', 'find', 'search', 'investigate', 'market'])
has_compare = any(w in task_lower for w in ['compare', 'versus', 'vs ', 'comparison'])
has_build = any(w in task_lower for w in ['build', 'create', 'develop', 'implement', 'program'])
has_test = any(w in task_lower for w in ['test', 'verify', 'validate', 'review'])
has_doc = any(w in task_lower for w in ['document', 'write', 'guide', 'tutorial', 'manual'])
has_analyze = any(w in task_lower for w in ['analyze', 'analyse', 'analysis', 'evaluate'])
has_format = any(w in task_lower for w in ['format', 'csv', 'json', 'html', 'pdf'])
# Count potential parallel agents
subtasks = []
dependencies = []
# Detect competitor/comparison tasks
competitor_count = len(re.findall(r'\b[a-z]+[\d-]*', task))
competitor_count = min(max(competitor_count - 2, 1), 5) # 1-5 competitors
if has_build:
subtasks.append({
"id": "builder",
"role": "Builder",
"description": f"Build the core implementation for: {task}",
"model": "capable_coding",
"output_format": "source_files"
})
dependencies.append({"from": "builder", "to": None})
if has_test or has_build:
subtasks.append({
"id": "tester",
"role": "Tester",
"description": "Write comprehensive tests for the implementation",
"model": "standard",
"output_format": "test_files"
})
if has_build:
dependencies.append({"from": "tester", "to": "builder"})
if has_doc or has_build:
subtasks.append({
"id": "writer",
"role": "Writer",
"description": f"Write documentation for: {task}",
"model": "standard",
"output_format": "markdown_files"
})
if has_build:
dependencies.append({"from": "writer", "to": "builder"})
if has_research:
subtasks.append({
"id": "researcher",
"role": "Researcher",
"description": f"Research the topic: {task}",
"model": "cheapest",
"output_format": "research_notes"
})
dependencies.append({"from": "researcher", "to": None})
if has_compare and not any(s["id"] == "comparator" for s in subtasks):
subtasks.append({
"id": "comparator",
"role": "Comparator",
"description": f"Create comparison analysis: {task}",
"model": "standard",
"output_format": "comparison_table"
})
dependencies.append({"from": "comparator", "to": None})
if has_analyze:
subtasks.append({
"id": "analyst",
"role": "Analyst",
"description": f"Analyze the data/topic: {task}",
"model": "strong_reasoning",
"output_format": "analysis_report"
})
dependencies.append({"from": "analyst", "to": None})
# Default subtasks if none detected
if not subtasks:
subtasks.append({
"id": "executor",
"role": "Executor",
"description": task,
"model": "standard",
"output_format": "mixed"
})
# Detect parallelization opportunity
parallel_candidates = [s for s in subtasks if s["id"] in ["builder", "researcher", "comparator", "analyst"]]
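# Only real edges from a candidate block parallelism; "to": None marks an independent subtask
candidate_ids = {s["id"] for s in parallel_candidates}
can_parallel = len(candidate_ids) >= 2 and not any(d["from"] in candidate_ids and d["to"] for d in dependencies)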
# Build spawn plan
plan = {
"original_task": task,
"pattern": detect_pattern(task),
"can_parallel": can_parallel,
"subtasks": subtasks,
"dependencies": dependencies,
"synthesis_instructions": {
"step": "After collecting all subtask outputs, synthesize them into a coherent final deliverable. Resolve conflicts, merge overlapping findings, and add context that only the orchestrator possesses.",
"priority": "quality_over_speed"
}
}
return plan
def detect_pattern(task: str) -> str:
"""Detect which spawn pattern best fits the task."""
task_lower = task.lower()
if any(w in task_lower for w in ['research', 'find', 'search', 'investigate']):
if any(w in task_lower for w in ['compare', 'versus', 'vs']):
return "parallel_research_with_comparison"
return "parallel_research"
if any(w in task_lower for w in ['build', 'create', 'develop', 'implement']):
if any(w in task_lower for w in ['test', 'document', 'doc']):
return "build_test_document"
return "build_parallel"
if any(w in task_lower for w in ['analyze', 'analysis', 'evaluate']):
return "analyze_summarize_format"
if any(w in task_lower for w in ['review', 'fix', 'verify']):
return "review_fix_verify"
if any(w in task_lower for w in ['format', 'csv', 'json', 'html']):
return "multi_format_output"
return "generic"
def main():
"""CLI entry point."""
if len(sys.argv) < 2:
print("Usage: spawn_planner.py <task description>")
print(" spawn_planner.py --file <file> <task description>")
sys.exit(1)
task = " ".join(sys.argv[1:])
plan = analyze_task(task)
# Output as JSON
print(json.dumps(plan, indent=2))
# Also output human-readable format
print("\n" + "=" * 60)
print(f"TASK: {plan['original_task']}")
print(f"PATTERN: {plan['pattern']}")
print(f"PARALLEL: {'Yes' if plan['can_parallel'] else 'No'}")
print("-" * 60)
for i, sub in enumerate(plan['subtasks'], 1):
print(f"\n{sub['id'].upper()} (#{i})")
print(f" Role: {sub['role']}")
print(f" Model: {sub['model']}")
print(f" Task: {sub['description']}")
print(f" Output: {sub['output_format']}")
if plan['dependencies']:
print("\nDEPENDENCIES:")
for dep in plan['dependencies']:
if dep['from'] and dep['to']:
print(f" {dep['from']} → {dep['to']}")
else:
print(f" {dep['from'] or 'All independent'}")
print(f"\n{plan['synthesis_instructions']['step']}")
if __name__ == "__main__":
main()
Quickly find and score relevant lines with context in large text files using keyword and phrase matching to minimize reading entire files.
# text-scan
## Description
Search for relevant information in text files without reading the entire file. Returns matching lines with context, scored by relevance. Useful for quickly finding specific information in large files.
## When to Use
- You need to find a specific piece of information in a file but don't know the exact location
- You want to scan multiple files for relevant content before deciding which to read fully
- Marek wants you to extract specific lines from large files
- You're doing research and need to quickly scan through notes, logs, or documents
- The file is very large and reading it entirely would waste tokens/time
## How It Works
The script uses token-based matching with scoring:
- Exact token matches score 2 points
- Substring/partial matches score 1 point
- Phrase matches (adjacent query terms found together in the line) score 3 points
- Results are sorted by score (highest first) and capped at `--max-results`
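The point scheme can be illustrated with a simplified scorer. This is a sketch of the rules listed above, not the bundled script itself (which also adds query bigrams to its token list, so its totals can be higher); `score_line` is a name chosen for this example.

```python
import re


def score_line(line: str, query: str) -> int:
    """Score one line against a query: 2 per exact word, 1 per substring, 3 per adjacent pair."""
    line_norm = re.sub(r"\s+", " ", line.lower().strip())
    line_words = set(line_norm.split())
    query_words = re.sub(r"\s+", " ", query.lower().strip()).split()

    score = 0
    for word in query_words:
        if word in line_words:
            score += 2  # exact token match
        elif word in line_norm:
            score += 1  # partial match, e.g. "runway" inside "runway:"
    # Phrase bonus: consecutive query words that appear together in the line
    for first, second in zip(query_words, query_words[1:]):
        if f"{first} {second}" in line_norm:
            score += 3
    return score
```

For example, the query `work hours` against the line `work hours today: 6` earns 2 + 2 for the two exact words plus 3 for the phrase.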
## Usage
```bash
# Basic search
python3 <skill_dir>/scripts/text-scan.py <file> --query "<search terms>"
# Brief format (line number + content)
python3 <skill_dir>/scripts/text-scan.py <file> --query "<search terms>" --brief
# JSON output (for programmatic use)
python3 <skill_dir>/scripts/text-scan.py <file> --query "<search terms>" --json
# Custom context window
python3 <skill_dir>/scripts/text-scan.py <file> --query "<search terms>" --before 3 --lines 5
# From stdin
cat <file> | python3 <skill_dir>/scripts/text-scan.py --query "<search terms>"
```
## Examples
```bash
# Find the runway in STATE.md
python3 <skill_dir>/scripts/text-scan.py /home/marek/.openclaw/workspace/STATE.md --query "runway"
# Find today's work hours
python3 <skill_dir>/scripts/text-scan.py /home/marek/.openclaw/workspace/STATE.md --query "today work hours"
# Find all log entries about a topic
python3 <skill_dir>/scripts/text-scan.py /home/marek/.openclaw/workspace/LOG.md --query "weather"
```
## Parameters
| Parameter | Description | Default |
|-----------|-------------|---------|
| `--query, -q` | Search query (keywords) | required |
| `--lines, -a` | Lines after each match | 5 |
| `--before, -b` | Lines before each match | 2 |
| `--max-results, -n` | Maximum results to return | 5 |
| `--json` | JSON output format | false |
| `--brief` | Brief format only | false |
| `--fuzzy` | Enable fuzzy matching (parsed by the bundled script but not yet used in scoring) | false |
| `--output, -o` | Write results to file | stdout |
## Integration with OpenClaw
This skill integrates with the standard `read` tool workflow:
1. Use `text-scan` to quickly find relevant lines in a file
2. If the result is significant, use `read` to load the full file context
3. This reduces token usage by avoiding reading unnecessary content
## Tips
- Use shorter queries for broader matches, longer phrases for precision
- The `--brief` flag is fastest for quick scans
- `--json` output is useful for scripting/automation
- Combine with `find` to scan multiple files: `find . -name "*.md" | xargs ...`
- Score 3+ matches are usually high-confidence — worth reading in full
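For scripting, the `--json` output can be filtered before deciding what to read in full. The sketch below assumes the JSON is a list of objects with `line_num`, `content`, and `score` fields, as produced by the bundled script; `high_confidence` is a hypothetical helper name.

```python
import json
from typing import List, Tuple


def high_confidence(json_text: str, min_score: int = 3) -> List[Tuple[int, str]]:
    """Return (line_num, content) pairs for results scoring at least min_score."""
    results = json.loads(json_text)
    return [(r["line_num"], r["content"]) for r in results if r["score"] >= min_score]
```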
FILE:scripts/text-scan.py
#!/usr/bin/env python3
"""text-scan.py — Search for relevant information in text files.
Usage:
python3 text-scan.py <file> [options]
Options:
--query, -q <text> Search query (required)
--lines, -a <n> Number of lines after match (default: 5)
--before, -b <n> Number of lines before match (default: 2)
--max-results, -n <n> Maximum results to return (default: 5)
--json Output results as JSON
--brief Output line number and content only
--fuzzy Accepted, but not yet used by the matcher
--output, -o <file> Write results to file (instead of stdout)
If no file is given, the text to scan is read from stdin.
Examples:
python3 text-scan.py STATE.md --query "runway"
python3 text-scan.py MEMORY.md --query "project goals" --lines 10 --before 2
cat LOG.md | python3 text-scan.py --query "weather" --fuzzy
"""
import sys
import os
import argparse
import re
import json
def normalize(text):
"""Normalize text for fuzzy matching: lowercase, strip extra whitespace."""
return re.sub(r'\s+', ' ', text.lower().strip())
def tokens(text):
"""Split text into tokens (words), also extract n-grams."""
normalized = normalize(text)
words = normalized.split()
# Include bigrams
bigrams = [f"{words[i]} {words[i+1]}" for i in range(len(words)-1)]
return words + bigrams
def match_query(line, query_tokens):
"""Score how well a line matches the query. Returns (score, matched_terms)."""
line_norm = normalize(line)
line_toks = set(tokens(line_norm))
if not query_tokens:
return 0, []
matched = []
score = 0
for q in query_tokens:
# Exact match
if q in line_toks:
score += 2
matched.append(q)
elif q in line_norm:
# Partial/substring match
score += 1
matched.append(q)
# Bonus: words close together in line (phrase match)
for i in range(len(query_tokens) - 1):
pair = f"{query_tokens[i]} {query_tokens[i+1]}"
if pair in line_norm:
score += 3
return score, matched
def scan_file(filepath, query, lines_after=5, lines_before=2, max_results=5, fuzzy=False):
"""Scan a file for lines matching the query.
Returns list of dicts: {line_num, content, score, matched_terms, context}
"""
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
lines = f.readlines()
if not query.strip():
return []
query_tokens = tokens(query)
if not query_tokens:
return []
results = []
for i, line in enumerate(lines):
score, matched = match_query(line, query_tokens)
if score > 0:
start = max(0, i - lines_before)
end = min(len(lines), i + lines_after + 1)  # +1 because the slice end is exclusive
context_lines = lines[start:i] + [line] + lines[i+1:end]
results.append({
"line_num": i + 1,
"content": line.rstrip('\n'),
"score": score,
"matched_terms": matched,
"context": [l.rstrip('\n') for l in context_lines]
})
# Sort by score descending
results.sort(key=lambda x: x["score"], reverse=True)
return results[:max_results]
def main():
parser = argparse.ArgumentParser(description='Scan text files for relevant information')
parser.add_argument('file', nargs='?', help='File to scan (stdin if omitted)')
parser.add_argument('--query', '-q', required=True, help='Search query')
parser.add_argument('--lines', '-a', type=int, default=5, help='Lines after match')
parser.add_argument('--before', '-b', type=int, default=2, help='Lines before match')
parser.add_argument('--max-results', '-n', type=int, default=5, help='Max results')
parser.add_argument('--output', '-o', help='Output file')
parser.add_argument('--json', action='store_true', help='JSON output')
parser.add_argument('--brief', action='store_true', help='Brief format (line_num: content only)')
parser.add_argument('--fuzzy', action='store_true', help='Enable fuzzy matching')
args = parser.parse_args()
# Read file
if args.file:
filepath = args.file
else:
filepath = '<stdin>'
with open(0, 'r', encoding='utf-8', errors='replace') as f:
lines = f.readlines()
query = args.query
query_tokens = tokens(query)
results = []
for i, line in enumerate(lines):
score, matched = match_query(line, query_tokens)
if score > 0:
start = max(0, i - args.before)
end = min(len(lines), i + args.lines + 1)  # +1 because the slice end is exclusive
results.append({
"line_num": i + 1,
"content": line.rstrip('\n'),
"score": score,
"matched_terms": matched,
"context": [l.rstrip('\n') for l in lines[start:end]]
})
results.sort(key=lambda x: x["score"], reverse=True)
results = results[:args.max_results]
if args.json:
output = json.dumps(results, indent=2, ensure_ascii=False)
elif args.brief:
output = '\n'.join(f"{r['line_num']}: {r['content'][:120]}" for r in results)
else:
output = ""
for r in results:
output += f"\n{'─' * 40}\n"
output += f"Line {r['line_num']} (score: {r['score']}, matched: {', '.join(r['matched_terms'])})\n"
output += f"{'─' * 40}\n"
output += '\n'.join(r['context'])
output += '\n'
sys.stdout.write(output)
return
if not os.path.exists(filepath):
print(f"Error: file not found: {filepath}", file=sys.stderr)
sys.exit(1)
results = scan_file(filepath, args.query, args.lines, args.before, args.max_results, args.fuzzy)
if not results:
print(f"No matches for query: '{args.query}'")
sys.exit(0)
if args.json:
output = json.dumps(results, indent=2, ensure_ascii=False)
elif args.brief:
output = '\n'.join(f"Line {r['line_num']}: {r['content'][:120]}" for r in results)
else:
output = ""
for r in results:
output += f"\n{'─' * 40}\n"
output += f"Line {r['line_num']} (score: {r['score']}, matched: {', '.join(r['matched_terms'])})\n"
output += f"{'─' * 40}\n"
output += '\n'.join(r['context'])
output += '\n'
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
f.write(output)
else:
sys.stdout.write(output)
print(f"\n✓ Found {len(results)} matches", file=sys.stderr)
if __name__ == '__main__':
main()
Manage, schedule, monitor, and report on recurring cron tasks with flexible patterns and handle dependencies, priorities, timezones, and execution logs.
# CLAW CRON MANAGER
You are a cron-manager skill that handles scheduling, monitoring, and management of recurring tasks for autonomous agents running on OpenClaw.
## What You Do
- Create, schedule, and manage cron tasks with flexible patterns (hourly, daily, weekly, custom intervals)
- Monitor cron execution, track success/failure rates, and alert on issues
- Provide task history, statistics, and performance reports
- Manage task dependencies, priorities, and resource limits
- Generate cron expressions and human-readable schedules
- Handle timezones, DST transitions, and scheduling conflicts
## What You Don't Do
- Execute the actual task logic (that's the task's job)
- Modify system crontabs or system-level scheduling
- Access external services beyond the OpenClaw API
- Guarantee execution during system downtime or maintenance windows
## Available Commands
Run `scripts/cron_manager.py` with one of these actions (`cron` below stands for the script):
- `cron list [--status all|active|paused|failed]` — List all cron tasks
- `cron show <task_id>` — Show detailed task info and recent runs
- `cron add <name> --command "<cmd>" --schedule "<pattern>" [--timezone UTC]` — Add a new task
- `cron remove <task_id>` — Delete a task
- `cron pause <task_id>` — Pause execution without deleting
- `cron resume <task_id>` — Resume a paused task
- `cron run <task_id>` — Force run a task immediately
- `cron logs <task_id> [--count 10]` — View recent execution logs
- `cron stats [--hours 168]` — Show execution statistics for a period
- `cron health` — Overall system health check
## Schedule Format
Use standard cron patterns:
- `* * * * *` — Every minute
- `*/5 * * * *` — Every 5 minutes
- `0 * * * *` — Every hour
- `0 0 * * *` — Daily at midnight
- `0 0 * * 1` — Weekly on Monday
- `0 0 1 * *` — Monthly on 1st
- `@hourly`, `@daily`, `@weekly`, `@monthly`, `@yearly` — Shorthand
Or human-friendly patterns:
- `"every 30 minutes"`
- `"daily at 9am"`
- `"weekly on Monday at 10am"`
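- `"every Monday, Wednesday, Friday at 8am"`

The bundled `parse_schedule` handles these phrasings with simple regular expressions, so check the parsed schedule with `cron show <task_id>` after adding a task.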
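A minimal sketch of how a schedule string maps onto the five cron fields, assuming only two input forms are supported (a standard 5-field pattern and `every N minutes`). `to_cron_fields` is a hypothetical helper for illustration, not the script's own parser.

```python
import re
from typing import Dict, Optional

FIELD_NAMES = ("minute", "hour", "day", "month", "weekday")


def to_cron_fields(schedule: str) -> Optional[Dict[str, str]]:
    """Split a 5-field cron pattern, or translate "every N minutes", into named fields."""
    schedule = schedule.strip().lower()
    match = re.fullmatch(r"every\s+(\d+)\s+minutes", schedule)
    if match:
        schedule = f"*/{match.group(1)} * * * *"
    parts = schedule.split()
    # Every field must look like cron syntax (digits, *, /, comma, dash)
    if len(parts) != 5 or not all(re.fullmatch(r"[\d*/,-]+", p) for p in parts):
        return None
    return dict(zip(FIELD_NAMES, parts))
```

Validating each field matters because a five-word phrase such as `weekly on Monday at 10am` also splits into exactly five parts.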
## Example Usage
```bash
# Add a daily cleanup task
./cron_manager.py add "cleanup" --command "python cleanup.py" --schedule "@daily"
# Check status of all tasks
./cron_manager.py list --status active
# View logs for a specific task
./cron_manager.py logs "cleanup" --count 5
# Check overall health
./cron_manager.py health
```
## Output Format
All commands return JSON with standardized fields:
```json
{
"status": "success",
"data": {
"tasks": [
{
"id": "cleanup",
"name": "Daily Cleanup",
"status": "active",
"schedule": "@daily",
"next_run": "2026-04-18T00:00:00Z",
"last_run": "2026-04-17T00:00:01Z",
"success_rate": 0.98
}
]
}
}
```
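Because every command prints JSON, the output is easy to post-process. The sketch below assumes the `list` response shape shown above and picks out tasks whose `success_rate` falls under a chosen threshold; the helper name and the default threshold are illustrative.

```python
import json
from typing import List


def tasks_below(response_text: str, min_success_rate: float = 0.9) -> List[str]:
    """Return the ids of tasks whose success_rate is below min_success_rate."""
    response = json.loads(response_text)
    if response.get("status") != "success":
        return []
    tasks = response.get("data", {}).get("tasks", [])
    # Tasks without a recorded success_rate are treated as 0 and therefore reported
    return [t["id"] for t in tasks if t.get("success_rate", 0) < min_success_rate]
```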
FILE:scripts/cron_manager.py
#!/usr/bin/env python3
"""
Claw Cron Manager — Schedule, monitor, and manage recurring tasks for autonomous agents.
This script provides a file-based cron management system that integrates with OpenClaw agents.
It stores task definitions in JSON and provides command-line interface for managing schedules.
Usage:
python cron_manager.py <action> [options]
Actions:
list [--status all|active|paused|failed] List all cron tasks
show <task_id> Show task details and recent runs
add <name> --command "<cmd>" --schedule "<pattern>" Add new task
remove <task_id> Delete a task
pause <task_id> Pause task execution
resume <task_id> Resume paused task
run <task_id> Force run a task immediately
logs <task_id> [--count 10] View recent execution logs
stats [--hours 168] Show execution statistics
health Overall system health check
"""
import json
import os
import sys
import subprocess
import argparse
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
import re
import pytz
# Configuration
DATA_DIR = os.environ.get('CRON_DATA_DIR', str(Path.home() / '.openclaw' / 'workspace' / 'skills' / 'cron-manager' / 'data'))
TASKS_FILE = os.path.join(DATA_DIR, 'tasks.json')
LOGS_DIR = os.path.join(DATA_DIR, 'logs')
def init_dirs():
"""Initialize data directories if they don't exist."""
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(LOGS_DIR, exist_ok=True)
def load_tasks() -> list:
"""Load tasks from JSON file."""
if not os.path.exists(TASKS_FILE):
return []
with open(TASKS_FILE, 'r') as f:
try:
return json.load(f)
except json.JSONDecodeError:
return []
def save_tasks(tasks: list):
"""Save tasks to JSON file."""
with open(TASKS_FILE, 'w') as f:
json.dump(tasks, f, indent=2)
def parse_schedule(schedule_str: str) -> Optional[dict]:
"""
Parse a schedule string into a cron-like structure.
Supports standard cron patterns and human-friendly formats.
"""
schedule_str = schedule_str.strip()
# Handle shorthands
shorthand_patterns = {
'@hourly': {'minute': '0', 'hour': '*', 'day': '*', 'month': '*', 'weekday': '*'},
'@daily': {'minute': '0', 'hour': '0', 'day': '*', 'month': '*', 'weekday': '*'},
'@weekly': {'minute': '0', 'hour': '0', 'day': '*', 'month': '*', 'weekday': '1'},
'@monthly': {'minute': '0', 'hour': '0', 'day': '1', 'month': '*', 'weekday': '*'},
'@yearly': {'minute': '0', 'hour': '0', 'day': '1', 'month': '1', 'weekday': '*'},
}
if schedule_str.lower() in shorthand_patterns:
return shorthand_patterns[schedule_str.lower()]
# Parse standard 5-field cron pattern
fields = schedule_str.split()
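# Require cron syntax in every field so five-word phrases fall through to the human-friendly patterns
if len(fields) == 5 and all(re.fullmatch(r'[\d*/,-]+', f) for f in fields):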
return {
'minute': fields[0],
'hour': fields[1],
'day': fields[2],
'month': fields[3],
'weekday': fields[4]
}
# Try human-friendly format
human_patterns = [
(r'every\s+(\d+)\s+minutes', lambda m: {'minute': '*/' + m.group(1), 'hour': '*', 'day': '*', 'month': '*', 'weekday': '*'}),
(r'every\s+(\d+)\s+hours', lambda m: {'minute': '0', 'hour': '*/' + m.group(1), 'day': '*', 'month': '*', 'weekday': '*'}),
(r'daily\s+at\s+(\d+):?(\d*)', lambda m: {'minute': m.group(2) or '0', 'hour': m.group(1), 'day': '*', 'month': '*', 'weekday': '*'}),
(r'weekly\s+on\s+(\w+)\s+at\s+(\d+):?(\d*)', lambda m: {'minute': m.group(3) or '0', 'hour': m.group(2), 'day': '*', 'month': '*', 'weekday': str(['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'].index(m.group(1)[:3]) + 1)}),
(r'every\s+monday', lambda m: {'minute': '0', 'hour': '0', 'day': '*', 'month': '*', 'weekday': '1'}),
]
for pattern, handler in human_patterns:
match = re.search(pattern, schedule_str.lower())
if match:
return handler(match)
return None
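def _field_matches(field: str, value: int, minimum: int = 0) -> bool:
    """Return True if a cron field ('*', '*/n', 'a', 'a-b', 'a,b') matches value."""
    try:
        for part in field.split(','):
            base, _, step = part.partition('/')
            lo, _, hi = base.partition('-')
            if base == '*':
                lo, hi = minimum, value
            else:
                lo, hi = int(lo), int(hi or lo)
            if lo <= value <= hi and (value - lo) % int(step or 1) == 0:
                return True
    except (ValueError, ZeroDivisionError):
        pass  # Malformed field: treat as no match
    return False
def next_run_time(schedule: dict, timezone: str = 'UTC', after: datetime = None) -> Optional[datetime]:
    """Calculate the next run time (in the task's timezone) for a parsed schedule."""
    if after is None:
        after = datetime.now(pytz.UTC)
    try:
        tz = pytz.timezone(timezone)
    except pytz.UnknownTimeZoneError:
        tz = pytz.UTC
    # Step in UTC and test each minute in local time so DST shifts are handled
    current = after.astimezone(pytz.UTC).replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(525600):  # Search at most one year of minutes
        local = current.astimezone(tz)
        weekday = local.isoweekday() % 7  # cron numbering: 0 = Sunday
        weekday_ok = (_field_matches(schedule['weekday'], weekday) or
                      (weekday == 0 and _field_matches(schedule['weekday'], 7)))
        if (_field_matches(schedule['minute'], local.minute) and
                _field_matches(schedule['hour'], local.hour) and
                _field_matches(schedule['day'], local.day, 1) and
                _field_matches(schedule['month'], local.month, 1) and
                weekday_ok):
            return local
        current += timedelta(minutes=1)
    return None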
def get_task_logs(task_id: str) -> list:
"""Get execution logs for a task."""
log_file = os.path.join(LOGS_DIR, f"{task_id}.jsonl")
if not os.path.exists(log_file):
return []
with open(log_file, 'r') as f:
return [json.loads(line) for line in f if line.strip()]
def add_log_entry(task_id: str, entry: dict):
"""Add a log entry for task execution."""
log_file = os.path.join(LOGS_DIR, f"{task_id}.jsonl")
with open(log_file, 'a') as f:
f.write(json.dumps(entry) + '\n')
def cmd_list(args):
"""List all cron tasks."""
tasks = load_tasks()
status_filter = getattr(args, 'status', None)
if status_filter and status_filter != 'all':
tasks = [t for t in tasks if t.get('status') == status_filter]
result = []
for task in tasks:
task_info = {
'id': task['id'],
'name': task['name'],
'status': task['status'],
'schedule': task['schedule'],
'last_run': task.get('last_run', 'never'),
'success_rate': task.get('success_rate', 0),
}
# Calculate next run
if task.get('schedule_parsed') and task['status'] == 'active':
next_run = next_run_time(task['schedule_parsed'], task.get('timezone', 'UTC'))
task_info['next_run'] = next_run.isoformat() if next_run else 'unknown'
result.append(task_info)
return {'status': 'success', 'data': {'tasks': result}}
def cmd_show(args):
"""Show task details and recent runs."""
tasks = load_tasks()
task = next((t for t in tasks if t['id'] == args.task_id), None)
if not task:
return {'status': 'error', 'message': f'Task {args.task_id} not found'}
logs = get_task_logs(args.task_id)
recent = logs[-5:] if logs else []
return {
'status': 'success',
'data': {
'task': task,
'recent_runs': recent
}
}
def cmd_add(args):
"""Add a new cron task."""
tasks = load_tasks()
# Check for duplicate ID
if any(t['id'] == args.name for t in tasks):
return {'status': 'error', 'message': f'Task {args.name} already exists'}
schedule = parse_schedule(args.schedule)
if not schedule:
return {'status': 'error', 'message': f'Failed to parse schedule: {args.schedule}'}
new_task = {
'id': args.name,
'name': args.name,
'command': args.command,
'schedule': args.schedule,
'schedule_parsed': schedule,
'timezone': args.timezone or 'UTC',
'status': 'active',
'created_at': datetime.now(pytz.UTC).isoformat(),
'last_run': None,
'success_count': 0,
'failure_count': 0,
'total_runs': 0,
}
tasks.append(new_task)
save_tasks(tasks)
return {
'status': 'success',
'message': f'Task {args.name} added successfully',
'data': new_task
}
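def cmd_remove(args):
    """Remove a cron task."""
    tasks = load_tasks()
    remaining = [t for t in tasks if t['id'] != args.task_id]
    # Report an error instead of silently succeeding when the ID is unknown
    if len(remaining) == len(tasks):
        return {'status': 'error', 'message': f'Task {args.task_id} not found'}
    save_tasks(remaining)
    # Clean up logs
    log_file = os.path.join(LOGS_DIR, f"{args.task_id}.jsonl")
    if os.path.exists(log_file):
        os.remove(log_file)
    return {'status': 'success', 'message': f'Task {args.task_id} removed'}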
def cmd_pause(args):
"""Pause a cron task."""
tasks = load_tasks()
task = next((t for t in tasks if t['id'] == args.task_id), None)
if not task:
return {'status': 'error', 'message': f'Task {args.task_id} not found'}
task['status'] = 'paused'
save_tasks(tasks)
return {'status': 'success', 'message': f'Task {args.task_id} paused'}
def cmd_resume(args):
"""Resume a paused cron task."""
tasks = load_tasks()
task = next((t for t in tasks if t['id'] == args.task_id), None)
if not task:
return {'status': 'error', 'message': f'Task {args.task_id} not found'}
task['status'] = 'active'
save_tasks(tasks)
return {'status': 'success', 'message': f'Task {args.task_id} resumed'}
def cmd_run(args):
"""Force run a task immediately."""
tasks = load_tasks()
task = next((t for t in tasks if t['id'] == args.task_id), None)
if not task:
return {'status': 'error', 'message': f'Task {args.task_id} not found'}
# Execute the command
start = datetime.now(pytz.UTC)
try:
result = subprocess.run(task['command'], shell=True, capture_output=True, text=True, timeout=300)
success = result.returncode == 0
output = result.stdout[-1000:] if result.stdout else ''
error = result.stderr[-1000:] if result.stderr else ''
except Exception as e:
success = False
output = ''
error = str(e)
end = datetime.now(pytz.UTC)
# Update task stats
task['last_run'] = start.isoformat()
task['total_runs'] = task.get('total_runs', 0) + 1
if success:
task['success_count'] = task.get('success_count', 0) + 1
else:
task['failure_count'] = task.get('failure_count', 0) + 1
task['success_rate'] = task['success_count'] / task['total_runs'] if task['total_runs'] > 0 else 0
save_tasks(tasks)
# Log the execution
add_log_entry(args.task_id, {
'run_at': start.isoformat(),
'completed_at': end.isoformat(),
'success': success,
'output': output,
'error': error,
})
return {
'status': 'success',
'message': f'Task {args.task_id} executed {"successfully" if success else "with errors"}',
'data': {
'success': success,
'output': output,
'error': error,
'duration': (end - start).total_seconds()
}
}
def cmd_logs(args):
"""View recent execution logs for a task."""
logs = get_task_logs(args.task_id)
count = args.count or 10
return {
'status': 'success',
'data': {
'logs': logs[-count:]
}
}
def cmd_stats(args):
"""Show execution statistics."""
tasks = load_tasks()
hours = args.hours or 168 # Default to 1 week
total_runs = sum(t.get('total_runs', 0) for t in tasks)
total_success = sum(t.get('success_count', 0) for t in tasks)
total_failure = sum(t.get('failure_count', 0) for t in tasks)
success_rate = total_success / total_runs if total_runs > 0 else 0
# Recent runs in the last N hours
cutoff = datetime.now(pytz.UTC) - timedelta(hours=hours)
recent_runs = []
for task in tasks:
logs = get_task_logs(task['id'])
for log in logs:
if datetime.fromisoformat(log['run_at']) > cutoff:
recent_runs.append({
'task_id': task['id'],
'run_at': log['run_at'],
'success': log['success'],
})
return {
'status': 'success',
'data': {
'total_tasks': len(tasks),
'active_tasks': len([t for t in tasks if t['status'] == 'active']),
'paused_tasks': len([t for t in tasks if t['status'] == 'paused']),
'total_runs': total_runs,
'recent_runs': len(recent_runs),
'overall_success_rate': round(success_rate * 100, 2),
'period_hours': hours,
}
}
def cmd_health(args):
"""Run overall system health check."""
tasks = load_tasks()
# Check for tasks with low success rates
failed_tasks = [t for t in tasks if t.get('total_runs', 0) > 0 and
(t.get('success_count', 0) / t['total_runs'] < 0.5)]
# Check for stale logs
stale_threshold = datetime.now(pytz.UTC) - timedelta(days=7)
stale_count = 0
# Check disk space
disk_usage = subprocess.run(['df', '-h', DATA_DIR], capture_output=True, text=True)
health_issues = []
if failed_tasks:
health_issues.append(f'{len(failed_tasks)} task(s) with low success rate')
return {
'status': 'healthy' if not health_issues else 'degraded',
'message': 'All systems operational' if not health_issues else '; '.join(health_issues),
'data': {
'total_tasks': len(tasks),
'active_tasks': len([t for t in tasks if t['status'] == 'active']),
'failed_tasks': len(failed_tasks),
'disk_usage': disk_usage.stdout.strip(),
}
}
def main():
parser = argparse.ArgumentParser(description='Claw Cron Manager')
subparsers = parser.add_subparsers(dest='action', help='Action to perform')
# List
list_parser = subparsers.add_parser('list', help='List all cron tasks')
list_parser.add_argument('--status', choices=['all', 'active', 'paused', 'failed'], default='all')
# Show
show_parser = subparsers.add_parser('show', help='Show task details')
show_parser.add_argument('task_id', help='Task ID')
# Add
add_parser = subparsers.add_parser('add', help='Add a new task')
add_parser.add_argument('name', help='Task name/ID')
add_parser.add_argument('--command', required=True, help='Command to execute')
add_parser.add_argument('--schedule', required=True, help='Schedule pattern')
add_parser.add_argument('--timezone', default='UTC', help='Timezone')
# Remove
remove_parser = subparsers.add_parser('remove', help='Remove a task')
remove_parser.add_argument('task_id', help='Task ID')
# Pause
pause_parser = subparsers.add_parser('pause', help='Pause a task')
pause_parser.add_argument('task_id', help='Task ID')
# Resume
resume_parser = subparsers.add_parser('resume', help='Resume a paused task')
resume_parser.add_argument('task_id', help='Task ID')
# Run
run_parser = subparsers.add_parser('run', help='Force run a task')
run_parser.add_argument('task_id', help='Task ID')
# Logs
logs_parser = subparsers.add_parser('logs', help='View task logs')
logs_parser.add_argument('task_id', help='Task ID')
logs_parser.add_argument('--count', type=int, default=10, help='Number of recent logs')
# Stats
stats_parser = subparsers.add_parser('stats', help='Show execution statistics')
stats_parser.add_argument('--hours', type=int, default=168, help='Number of hours to look back')
# Health
health_parser = subparsers.add_parser('health', help='System health check')
args = parser.parse_args()
if not args.action:
parser.print_help()
sys.exit(1)
init_dirs()
# Dispatch to appropriate command
commands = {
'list': cmd_list,
'show': cmd_show,
'add': cmd_add,
'remove': cmd_remove,
'pause': cmd_pause,
'resume': cmd_resume,
'run': cmd_run,
'logs': cmd_logs,
'stats': cmd_stats,
'health': cmd_health,
}
handler = commands.get(args.action)
if handler:
result = handler(args)
print(json.dumps(result, indent=2))
else:
parser.print_help()
sys.exit(1)
if __name__ == '__main__':
main()
Monitors weather for any location, sends alerts when rain, snow, temperature, wind, UV, or pressure thresholds are exceeded, plus daily briefings and trends.
---
name: weather-alert
description: Proactive weather monitoring and alerting — tracks conditions for any location, sends notifications when thresholds are exceeded (rain, snow, temperature, wind, UV, pressure), and provides daily briefings. Use when: (1) Need to know if it will rain before going out, (2) Want temperature alerts for travel planning, (3) Need daily weather briefings, (4) Monitoring conditions for events (sports, construction, farming), (5) Tracking multi-day weather trends
---
# Weather Alert Skill
Proactively monitor weather conditions and alert when thresholds are exceeded. Uses free APIs (wttr.in, Open-Meteo) with no API key required.
## Quick Start
```bash
# Install the skill
npx clawhub install weather-alert
# Check current weather
"Check weather in Prague"
# Set a rain alert for tomorrow
"Alert me if it will rain tomorrow in Prague"
# Set a temperature alert (below 5°C or above 30°C)
"Alert me if temperature drops below 5°C or exceeds 30°C"
# Get a 7-day briefing
"7-day weather briefing for Prague"
# List active alerts
"Show my weather alerts"
```
## Core Features
### 1. Current Weather Check
Returns current conditions including temperature, humidity, wind, precipitation probability, UV index, and visibility.
```
🌤 Prague Weather — 2026-04-17 15:00
Temp: 12°C (feels like 10°C) | Humidity: 65%
Wind: 15 km/h NW | Precip: 10% | UV: 3
Visibility: 10 km | Pressure: 1013 hPa
Condition: Partly cloudy
```
### 2. Forecast & Briefing
Provides multi-day forecasts with condition summaries.
```
📅 7-Day Briefing — Prague
Fri 17: ☁️ 8-14°C | Rain 40%
Sat 18: 🌧️ 6-11°C | Rain 80%
Sun 19: ⛅ 10-16°C | Rain 20%
Mon 20: ☀️ 12-20°C | Rain 5%
Tue 21: ☀️ 14-22°C | Rain 0%
Wed 22: ⛈️ 11-18°C | Rain 70%
Thu 23: 🌧️ 8-14°C | Rain 60%
```
### 3. Smart Alerts
Set threshold-based alerts that trigger notifications:
- **Temperature:** Above/below a threshold (°C)
- **Rain:** Precipitation probability exceeding a %
- **Snow:** Snowfall depth exceeding a threshold (cm)
- **Wind:** Sustained speed exceeding km/h
- **UV:** UV index exceeding a threshold
- **Pressure:** Barometric pressure dropping/rising rapidly
- **Frost:** Risk of frost (temperature below 0°C overnight)
```
🔔 Active Alerts:
• [TOMORROW] Rain > 70% in Prague → Notify me
• [22:00] Temp < 5°C in Prague → Notify me
• [WEEKEND] UV > 7 in Prague → Notify me
```
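Unlike the other alerts, the pressure alert depends on a rate of change rather than a fixed level. A minimal sketch of one way to detect it, assuming hourly readings in hPa. The function name `pressure_change_alert`, the 3-hour window, and the 3 hPa threshold are illustrative assumptions, not values taken from this skill:

```python
from typing import Optional, Sequence


def pressure_change_alert(
    readings_hpa: Sequence[float],
    window_hours: int = 3,        # assumed look-back window
    max_change_hpa: float = 3.0,  # assumed threshold, tune per use case
) -> Optional[str]:
    """Return an alert string if pressure moved too fast over the window.

    `readings_hpa` is assumed to hold one reading per hour, oldest first.
    """
    if len(readings_hpa) <= window_hours:
        return None  # not enough history to compare
    change = readings_hpa[-1] - readings_hpa[-1 - window_hours]
    if abs(change) < max_change_hpa:
        return None
    direction = "rising" if change > 0 else "dropping"
    return f"Pressure {direction} {abs(change):.1f} hPa in {window_hours}h"


print(pressure_change_alert([1015.0, 1014.0, 1012.5, 1010.5]))
```

Comparing the newest reading with the one `window_hours` earlier keeps the check cheap; a real implementation might smooth the series first to avoid reacting to single noisy readings.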
### 4. Event Planner
Check weather suitability for specific activities:
```
🏃 Running — Prague, Tomorrow
Good: Temp 10°C, no rain, wind < 15 km/h
Warning: Possible morning drizzle (06:00-08:00)
🧺 Picnic — Prague, Sunday
Bad: Rain 70%, wind 25 km/h
Suggestion: Move to Saturday instead
```
### 5. Weather Trends
Track how conditions change over time for a location:
```
📈 Prague Temperature Trend (7 days):
Mon: 12°C → Tue: 8°C → Wed: 14°C → Thu: 16°C → Fri: 18°C → Sat: 12°C → Sun: 10°C
Change: -1°C from yesterday
```
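The trend line above can be derived from a plain list of daily temperatures. A small sketch, assuming the values are ordered oldest first; `temperature_trend` is a hypothetical helper and is not part of `scripts/weather_alert.py`:

```python
from typing import List


def temperature_trend(temps: List[float]) -> str:
    """Summarise a series of daily temperatures (oldest first)."""
    if len(temps) < 2:
        return "Not enough data for a trend"
    chain = " → ".join(f"{t:g}°C" for t in temps)
    change = temps[-1] - temps[-2]  # difference between the last two days
    return f"{chain}\nChange: {change:+g}°C from the previous day"


print(temperature_trend([12, 8, 14]))
```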
## Configuration
`config.yaml` defines default location and alert thresholds:
```yaml
default_location:
name: "Prague"
lat: 50.0755
lon: 14.4378
alerts:
rain_threshold: 60 # % probability
temp_min: 5 # °C below which to alert
temp_max: 30 # °C above which to alert
wind_max: 40 # km/h
snow_depth: 5 # cm
uv_max: 7
frost_threshold: 0 # °C
notification:
method: "exec-event" # How alerts are delivered
schedule_check: "6h" # Auto-check interval
```
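A user's `config.yaml` may override only some of these keys. One way to fill in the rest is a recursive merge over the defaults, sketched below. The `merge_config` helper and the trimmed `DEFAULTS` dictionary are assumptions for illustration; the bundled script may merge differently:

```python
import copy

# Trimmed copy of the defaults shown above, for illustration only.
DEFAULTS = {
    "default_location": {"name": "Prague", "lat": 50.0755, "lon": 14.4378},
    "alerts": {"rain_threshold": 60, "temp_min": 5, "temp_max": 30},
}


def merge_config(defaults: dict, overrides: dict) -> dict:
    """Recursively overlay `overrides` on a deep copy of `defaults`."""
    merged = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = value
    return merged


user_cfg = {"alerts": {"rain_threshold": 80}}  # e.g. parsed from config.yaml
cfg = merge_config(DEFAULTS, user_cfg)
print(cfg["alerts"])
```

Merging per key rather than per section means a file that sets only `rain_threshold` still inherits `temp_min` and `temp_max`.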
## Data Sources
- **Primary:** Open-Meteo API (free, no key, global coverage)
- **Secondary:** wttr.in (quick lookups, human-readable)
- **Fallback:** OpenWeatherMap (if API key configured)
## Error Handling
- API timeout → show cached data with staleness warning
- Location not found → suggest nearest match
- Rate limit → wait and retry (Open-Meteo: 1000 requests/hour free)
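
The timeout and rate-limit rules above could be combined into a single retry wrapper. This is a sketch under stated assumptions: `fetch_with_retry` is a hypothetical helper, the fetch callable is assumed to return `None` on failure, and the retry count and delays are placeholders rather than values used by the skill:

```python
import time
from typing import Callable, Optional


def fetch_with_retry(
    fetch: Callable[[], Optional[dict]],
    cached: Optional[dict] = None,
    retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call `fetch` up to `retries` times, doubling the delay each time.

    If every attempt fails and `cached` is given, return it marked stale
    so the caller can show a staleness warning.
    """
    for attempt in range(retries):
        data = fetch()
        if data is not None:
            return {"data": data, "stale": False}
        if attempt < retries - 1:
            sleep(base_delay * 2 ** attempt)  # waits 1s, 2s, 4s, ...
    if cached is not None:
        return {"data": cached, "stale": True}
    raise RuntimeError("weather fetch failed and no cached data available")
```

Passing `sleep` as a parameter keeps the wrapper testable without real delays.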
## Permissions
```yaml
permissions:
read: ["~/.weather-alerts/*"]
network: ["wttr.in", "open-meteo.com"]
write: ["~/.weather-alerts/alerts.yaml", "~/.weather-alerts/cache.json"]
```
FILE:README.md
# Weather Alert Skill
Proactive weather monitoring and alerting for any location. Uses free APIs (Open-Meteo, wttr.in) — no API key required.
## Features
- **Current weather** — temperature, humidity, wind, UV, pressure
- **7-day forecast** — daily conditions with precipitation, wind, temperature
- **Smart alerts** — threshold-based notifications (rain, temp, wind, UV, frost)
- **Event planner** — check weather suitability for activities
- **Weather trends** — track conditions over time
## Quick Start
```bash
npx clawhub install weather-alert
"Check weather in Prague"
"Alert me if it will rain tomorrow"
"7-day weather briefing"
```
## Install Locally
```bash
# Clone and install
git clone https://github.com/openclaw/skills.git
cd skills/weather-alert
pip install pyyaml # optional, for config.yaml support
# Test
python3 -m unittest discover -s test -v
# Run
python scripts/weather_alert.py --location Berlin --days 5
```
## Configuration
Edit `config.yaml` to set your default location and alert thresholds.
## API Data Source
Open-Meteo (free, 1000 req/hour, no key): https://open-meteo.com/
FILE:config.yaml
default_location:
name: "Prague"
lat: 50.0755
lon: 14.4378
alerts:
rain_threshold: 60 # % probability
temp_min: 5 # °C below which to alert
temp_max: 30 # °C above which to alert
wind_max: 40 # km/h
snow_depth: 5 # cm
uv_max: 7
frost_threshold: 0 # °C
notification:
method: "exec-event"
schedule_check: "6h"
FILE:demo_output.md
# Demo Output — weather-alert Skill
## Current Weather
```
$ python scripts/weather_alert.py --current
🌤 Prague Weather — 2026-04-17 15:00
Temp: 12.5°C (feels like 10.2°C) | Humidity: 65%
Wind: 15 km/h W | Precip: 0mm | UV: 3.2
Pressure: 1013.0 hPa | Partly cloudy
```
## 7-Day Forecast
```
$ python scripts/weather_alert.py
🌤 Prague Weather — 2026-04-17 15:00
Temp: 12.5°C (feels like 10.2°C) | Humidity: 65%
Wind: 15 km/h W | Precip: 0mm | UV: 3.2
Pressure: 1013.0 hPa | Partly cloudy
📅 7-Day Forecast
Fri 17: ⛅ 5°-18°C | Precip: 10% | Wind: 15 km/h
Sat 18: 🌧️ 3°-10°C | Precip: 80% | Wind: 30 km/h
Sun 19: ⛅ 8°-15°C | Precip: 20% | Wind: 12 km/h
Mon 20: ☀️ 10°-20°C | Precip: 5% | Wind: 8 km/h
Tue 21: ☀️ 12°-22°C | Precip: 0% | Wind: 10 km/h
Wed 22: ⛈️ 8°-14°C | Precip: 85% | Wind: 35 km/h
Thu 23: 🌧️ 6°-12°C | Precip: 70% | Wind: 25 km/h
```
## Alerts
```
$ python scripts/weather_alert.py --location Berlin
🌤 Berlin Weather — 2026-04-17 15:00
Temp: 8.0°C (feels like 5.0°C) | Humidity: 80%
Wind: 20 km/h NW | Precip: 2mm | UV: 2.1
Pressure: 1008.5 hPa | Rain
📅 7-Day Forecast
Fri 17: 🌧️ 2°-10°C | Precip: 90% | Wind: 25 km/h
...
🔔 ALERTS:
• 🌧 Rain probability 90% exceeds 60% threshold
```
## Event Suitability
```
$ python scripts/weather_alert.py --event running
🏃 Running — Prague, Today
Good: Temp 5°-18°C — comfortable
```
```
$ python scripts/weather_alert.py --event picnic
🧺 Picnic — Prague, Today
Bad: High rain risk (80%), Wind too strong (30 km/h)
Suggestion: Move to Monday instead
```
FILE:references/weather-sources.md
# Weather Data Sources
## Primary: Open-Meteo API
**URL:** `https://api.open-meteo.com/v1/forecast`
**Cost:** Free (1000 requests/hour, no API key)
**Coverage:** Global, ~200+ weather parameters
**Limits:** 1000 requests/hour, max 1000 historical days
### Supported Parameters
- `current`: temperature_2m, relative_humidity_2m, apparent_temperature, precipitation, rain, showers, snowfall, weather_code, wind_speed_10m, wind_direction_10m, wind_gusts_10m, surface_pressure, uv_index, is_day
- `hourly`: temperature_2m, precipitation_probability, precipitation, weather_code, snowfall_probability
- `daily`: temperature_2m_min/max, precipitation_sum, rain_sum, snowfall_sum, wind_speed_10m_max, wind_gusts_10m_max, uv_index_max, precipitation_probability_max, weather_code
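
As a sketch of how these parameters go into a request, the snippet below builds a forecast URL with `urllib.parse.urlencode`. The endpoint and variable names come from this reference; the helper name `build_forecast_url` and the particular subset of variables are assumptions for illustration:

```python
from urllib.parse import urlencode

BASE_URL = "https://api.open-meteo.com/v1/forecast"


def build_forecast_url(lat: float, lon: float, days: int = 7,
                       timezone: str = "Europe/Prague") -> str:
    """Build a forecast URL with a subset of the parameters listed above."""
    params = {
        "latitude": lat,
        "longitude": lon,
        "current": "temperature_2m,relative_humidity_2m,wind_speed_10m",
        "daily": "temperature_2m_min,temperature_2m_max,"
                 "precipitation_probability_max",
        "timezone": timezone,
        "forecast_days": days,
    }
    # safe="," keeps the comma-separated variable lists readable.
    return f"{BASE_URL}?{urlencode(params, safe=',')}"


print(build_forecast_url(50.0755, 14.4378))
```

Letting `urlencode` handle escaping avoids hand-assembled query strings, where a mistyped `&` or an unescaped `/` in the timezone is easy to miss.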
### Weather Codes (WMO)
| Code | Description | Code | Description |
|------|----------------------|------|----------------------|
| 0 | Clear sky | 61 | Rain light |
| 1 | Mainly clear | 63 | Rain moderate |
| 2 | Partly cloudy | 65 | Rain heavy |
| 3 | Overcast | 66 | Freezing rain light |
| 45 | Foggy | 67 | Freezing rain heavy |
| 48 | Rime fog | 71 | Snow light |
| 51 | Light drizzle | 73 | Snow moderate |
| 53 | Moderate drizzle | 75 | Snow heavy |
| 55 | Dense drizzle | 77 | Snow grains |
| 56 | Freezing drizzle | 80 | Rain showers light |
| 57 | Freezing drizzle hvy | 81 | Rain showers moderate|
| 95 | Thunderstorm | 82 | Rain showers heavy |
| 96 | Thunderstorm + hail | 85 | Snow showers light |
| 99 | Thunderstorm hvy hail| 86 | Snow showers heavy |
## Secondary: wttr.in
**URL:** `https://wttr.in/{location}?format=j1`
**Cost:** Free, no key
**Best for:** Quick lookups, human-readable text
**Note:** Less structured than Open-Meteo, good for simple queries
## Tertiary: OpenWeatherMap
**URL:** `https://api.openweathermap.org/data/2.5/weather`
**Cost:** Free tier (1000 calls/day)
**Requires:** API key
**Best for:** Historical data, detailed forecasts
## Timezones
Default timezone: `Europe/Prague`
Common alternatives:
- `Europe/London`
- `America/New_York`
- `America/Los_Angeles`
- `Asia/Tokyo`
- `Australia/Sydney`
FILE:scripts/weather_alert.py
#!/usr/bin/env python3
"""Weather Alert Skill — Proactive weather monitoring and alerting."""
import json
import os
import sys
import urllib.request
import urllib.error
import urllib.parse
from datetime import datetime, timedelta
from pathlib import Path
try:
import yaml
except ImportError:
yaml = None
# --- Configuration ---
DEFAULT_CONFIG = {
"default_location": {"name": "Prague", "lat": 50.0755, "lon": 14.4378},
"alerts": {
"rain_threshold": 60,
"temp_min": 5,
"temp_max": 30,
"wind_max": 40,
"snow_depth": 5,
"uv_max": 7,
"frost_threshold": 0,
},
"notification": {"method": "exec-event", "schedule_check": "6h"},
}
ALERTS_FILE = os.path.expanduser("~/.weather-alerts/alerts.yaml")
CACHE_FILE = os.path.expanduser("~/.weather-alerts/cache.json")
CACHE_TTL = 1800 # 30 min
def load_config():
"""Load config.yaml or return defaults."""
config_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"config.yaml",
)
if yaml and os.path.exists(config_path):
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
for key in DEFAULT_CONFIG:
cfg.setdefault(key, DEFAULT_CONFIG[key])
return cfg
return DEFAULT_CONFIG.copy()
def load_cache():
"""Load cached weather data if still valid."""
if not os.path.exists(CACHE_FILE):
return None
with open(CACHE_FILE) as f:
data = json.load(f)
age = (datetime.now() - datetime.fromisoformat(data["timestamp"])).total_seconds()
if age > CACHE_TTL:
return None
return data
def save_cache(data):
"""Save weather data to cache."""
os.makedirs(os.path.dirname(CACHE_FILE), exist_ok=True)
with open(CACHE_FILE, "w") as f:
json.dump(
{
"timestamp": datetime.now().isoformat(),
"data": data,
},
f,
indent=2,
)
def fetch_open_meteo(lat, lon, days=7):
"""Fetch weather data from Open-Meteo API (free, no key needed)."""
url = "https://api.open-meteo.com/v1/forecast" # noqa: E501
url += "?latitude=" + str(lat) + "&longitude=" + str(lon)
url += "&current=temperature_2m,relative_humidity_2m,apparent_temperature,"
url += "precipitation,weather_code,wind_speed_10m,wind_direction_10m,"
url += "wind_gusts_10m,surface_pressure,uv_index"
url += "&daily=temperature_2m_min,temperature_2m_max,precipitation_sum,"
url += "snowfall_sum,wind_speed_10m_max,wind_gusts_10m_max,"
url += "uv_index_max,precipitation_probability_max"
url += "&timezone=Europe%2FPrague"
url += "&forecast_days=" + str(days)
req = urllib.request.Request(url, headers={"User-Agent": "WeatherAlert/1.0"})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read())
except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError):
return None
def fetch_wttr(location):
"""Quick weather lookup via wttr.in."""
url = f"https://wttr.in/{urllib.parse.quote(location)}?format=j1"
req = urllib.request.Request(url, headers={"User-Agent": "WeatherAlert/1.0"})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
except Exception:
return None
WEATHER_CODES = {
0: ("☀️", "Clear sky"),
1: ("🌤", "Mainly clear"),
2: ("⛅", "Partly cloudy"),
3: ("☁️", "Overcast"),
45: ("🌫", "Foggy"),
48: ("🌫", "Rime fog"),
51: ("🌦", "Light drizzle"),
53: ("🌦", "Moderate drizzle"),
55: ("🌧", "Dense drizzle"),
56: ("🌧", "Freezing drizzle"),
57: ("🌧", "Freezing drizzle heavy"),
61: ("🌧", "Rain light"),
63: ("🌧", "Rain moderate"),
65: ("🌧", "Rain heavy"),
66: ("🌨", "Freezing rain light"),
67: ("🌨", "Freezing rain heavy"),
71: ("🌨", "Snow light"),
73: ("🌨", "Snow moderate"),
75: ("❄️", "Snow heavy"),
77: ("❄️", "Snow grains"),
80: ("🌦", "Rain showers light"),
81: ("🌧", "Rain showers moderate"),
82: ("🌧", "Rain showers heavy"),
85: ("🌨", "Snow showers light"),
86: ("🌨", "Snow showers heavy"),
95: ("⛈", "Thunderstorm"),
96: ("⛈", "Thunderstorm with hail"),
99: ("⛈", "Thunderstorm heavy hail"),
}
def get_weather_icon(code):
"""Get emoji icon for weather code."""
if code in WEATHER_CODES:
icon, _ = WEATHER_CODES[code]
return icon
return "🌡"
def get_weather_description(code):
"""Get description for weather code."""
if code in WEATHER_CODES:
_, desc = WEATHER_CODES[code]
return desc
return "Unknown"
def format_current(current, daily, location_name):
"""Format current weather conditions."""
if not current:
return f"⚠ Could not fetch weather for {location_name}"
temp = current.get("temperature_2m", "—")
feels_like = current.get("apparent_temperature", "—")
humidity = current.get("relative_humidity_2m", "—")
wind = current.get("wind_speed_10m", "—")
wind_dir = current.get("wind_direction_10m", 0)
dir_names = {0: "N", 45: "NE", 90: "E", 135: "SE", 180: "S", 225: "SW", 270: "W", 315: "NW"}
wind_dir_name = dir_names.get(round(wind_dir / 45) * 45 % 360, f"{wind_dir}°")
precip = current.get("precipitation", 0)
uv = current.get("uv_index", "—")
pressure = current.get("surface_pressure", "—")
code = current.get("weather_code", -1)
icon = get_weather_icon(code)
lines = [
f"🌤 {location_name} Weather — {datetime.now().strftime('%Y-%m-%d %H:%M')}",
f"Temp: {temp}°C (feels like {feels_like}°C) | Humidity: {humidity}%",
f"Wind: {wind} km/h {wind_dir_name} | Precip: {precip}mm | UV: {uv}",
# Open-Meteo already reports surface_pressure in hPa, so no unit conversion is needed.
f"Pressure: {pressure} hPa | {get_weather_description(code)}",
]
return "\n".join(lines)
def format_forecast(daily, days=7):
"""Format multi-day forecast."""
if not daily or not daily.get("time"):
return "⚠ No forecast data available"
times = daily["time"][:days]
codes = daily.get("weather_code", [])
temps_min = daily.get("temperature_2m_min", [])
temps_max = daily.get("temperature_2m_max", [])
precip_pmax = daily.get("precipitation_probability_max", [])
precip_sum = daily.get("precipitation_sum", [])
snow_sum = daily.get("snowfall_sum", [])
wind_max = daily.get("wind_speed_10m_max", [])
uv_max = daily.get("uv_index_max", [])
lines = ["📅 7-Day Forecast"]
for i, t in enumerate(times):
date = datetime.fromisoformat(t).strftime("%a %d")
icon = get_weather_icon(codes[i] if i < len(codes) else 3)
t_min = temps_min[i] if i < len(temps_min) else "?"
t_max = temps_max[i] if i < len(temps_max) else "?"
precip = precip_pmax[i] if i < len(precip_pmax) else "?"
precip = f"{precip}%" if isinstance(precip, (int, float)) else "?"
wind = wind_max[i] if i < len(wind_max) else "?"
uv = uv_max[i] if i < len(uv_max) else "?"
line = f"{date}: {icon} {t_min}°-{t_max}°C | Precip: {precip} | Wind: {wind} km/h"
if i < len(snow_sum) and (snow_sum[i] or 0) > 0:
line += f" | Snow: {snow_sum[i]}cm"
lines.append(line)
return "\n".join(lines)
def check_alerts(current, daily, config):
"""Check current conditions against alert thresholds. Returns list of triggered alerts."""
alerts = []
thresholds = config.get("alerts", {})
# Temperature alerts
temp = current.get("temperature_2m") if current else None  # a missing reading must not count as 0°C
if temp is not None:
tmin = thresholds.get("temp_min")
if tmin is not None and temp < tmin:
alerts.append(f"🥶 Temperature {temp}°C is below {tmin}°C threshold")
tmax = thresholds.get("temp_max")
if tmax is not None and temp > tmax:
alerts.append(f"🔥 Temperature {temp}°C exceeds {tmax}°C threshold")
# Rain alerts
precip_prob = 0
if daily and daily.get("precipitation_probability_max"):
precip_prob = daily["precipitation_probability_max"][0] if daily["precipitation_probability_max"] else 0
elif current and current.get("precipitation", 0) > 0:
precip_prob = 100
rain_thresh = thresholds.get("rain_threshold")
if rain_thresh is not None and precip_prob > rain_thresh:
alerts.append(f"🌧 Rain probability {precip_prob}% exceeds {rain_thresh}% threshold")
# Snow alerts
if daily and daily.get("snowfall_sum"):
snow = daily["snowfall_sum"][0] if daily["snowfall_sum"] else 0
snow_thresh = thresholds.get("snow_depth")
if snow_thresh is not None and snow > snow_thresh:
alerts.append(f"❄️ Snowfall {snow}cm exceeds {snow_thresh}cm threshold")
# Wind alerts
wind = current.get("wind_speed_10m", 0) if current else 0
gusts = current.get("wind_gusts_10m", 0) if current else 0
wind_max = thresholds.get("wind_max")
if wind_max is not None:
if wind > wind_max:
alerts.append(f"💨 Wind {wind} km/h exceeds {wind_max} km/h threshold")
if gusts > wind_max:
alerts.append(f"💨 Wind gusts {gusts} km/h exceeds {wind_max} km/h threshold")
# UV alerts
uv = current.get("uv_index", 0) if current else 0
uv_max = thresholds.get("uv_max")
if uv_max is not None and uv > uv_max:
alerts.append(f"☀️ UV index {uv} exceeds {uv_max} threshold")
# Frost alerts
if daily and daily.get("temperature_2m_min") and daily["temperature_2m_min"]:
min_temp = daily["temperature_2m_min"][0]
frost_thresh = thresholds.get("frost_threshold")
if frost_thresh is not None and min_temp < frost_thresh:
alerts.append(f"❄️ Frost risk: overnight low {min_temp}°C below {frost_thresh}°C")
return alerts
def format_event_suitability(daily, activity, location_name):
"""Check suitability for outdoor activities."""
if not daily or not daily.get("time"):
return f"⚠ No forecast data for {location_name}"
day = 0  # index 0 of the daily arrays is today
temp_min = daily["temperature_2m_min"][day] if daily.get("temperature_2m_min") else None
temp_max = daily["temperature_2m_max"][day] if daily.get("temperature_2m_max") else None
precip = daily["precipitation_probability_max"][day] if daily.get("precipitation_probability_max") else 0
precip_sum = daily["precipitation_sum"][day] if daily.get("precipitation_sum") else 0
wind = daily["wind_speed_10m_max"][day] if daily.get("wind_speed_10m_max") else 0
uv = daily["uv_index_max"][day] if daily.get("uv_index_max") else 0
snow = daily["snowfall_sum"][day] if daily.get("snowfall_sum") else 0
good = []
warnings = []
bad = []
if temp_min is not None:
if temp_min >= 5 and temp_max and temp_max <= 25:
good.append(f"Temp {temp_min}°-{temp_max}°C — comfortable")
elif temp_max and temp_max > 30:
bad.append(f"Temp too hot ({temp_max}°C)")
elif temp_min and temp_min < 0:
bad.append(f"Frost risk ({temp_min}°C)")
else:
warnings.append(f"Temp {temp_min}°-{temp_max}°C — check if comfortable")
if precip >= 70:
bad.append(f"High rain risk ({precip}%)")
elif precip >= 40:
warnings.append(f"Possible rain ({precip}%)")
elif precip_sum > 5:
warnings.append(f"Rain expected ({precip_sum}mm)")
if wind > 30:
bad.append(f"Wind too strong ({wind} km/h)")
elif wind > 15:
warnings.append(f"Moderate wind ({wind} km/h)")
if snow > 1:
bad.append(f"Snow expected ({snow}cm)")
activity_labels = {
"running": "🏃",
"hiking": "🥾",
"picnic": "🧺",
"cycling": "🚴",
"garden": "🌿",
"farming": "🚜",
"sailing": "⛵",
"skiing": "⛷",
"swimming": "🏊",
"photography": "📸",
}
icon = activity_labels.get(activity, "👤")
lines = [f"{icon} {activity.title()} — {location_name}, Today"]
if good:
lines.append(f"Good: {'; '.join(good)}")
if warnings:
lines.append(f"Warning: {'; '.join(warnings)}")
if bad:
lines.append(f"Bad: {'; '.join(bad)}")
if not good and not warnings and not bad:
lines.append("✅ Conditions seem fine")
return "\n".join(lines)
def main():
config = load_config()
args = sys.argv[1:]
# Determine location
location_name = config["default_location"]["name"]
lat = config["default_location"]["lat"]
lon = config["default_location"]["lon"]
days = 7
for i, arg in enumerate(args):
if arg == "--location" and i + 1 < len(args):
location_name = args[i + 1]
elif arg == "--lat" and i + 1 < len(args):
lat = float(args[i + 1])
elif arg == "--lon" and i + 1 < len(args):
lon = float(args[i + 1])
elif arg == "--days":
days = min(int(args[i + 1]), 14) if i + 1 < len(args) else 7
elif arg == "--event" and i + 1 < len(args):
activity = args[i + 1]
# Fetch and show event suitability
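# load_cache() returns {"timestamp": ..., "data": {"lat", "lon", "data"}}, so unwrap it first
cached = (load_cache() or {}).get("data", {})
if cached.get("lat") == lat and cached.get("lon") == lon:
current = cached["data"].get("current")
daily = cached["data"].get("daily")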
else:
data = fetch_open_meteo(lat, lon, days)
if data:
save_cache({"lat": lat, "lon": lon, "data": data})
current = data.get("current")
daily = data.get("daily")
else:
print("⚠ Could not fetch weather data")
sys.exit(1)
print(format_event_suitability(daily, activity, location_name))
return
elif arg in ("--current", "--check", "--today"):
# Show current weather only
pass
elif arg == "--alerts":
# Check alerts only
pass
elif arg == "--format" and i + 1 < len(args):
days = int(args[i + 1]) if i + 1 < len(args) else 7
# Fetch data
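# load_cache() returns {"timestamp": ..., "data": {"lat", "lon", "data"}}, so unwrap it first
cached = (load_cache() or {}).get("data", {})
if cached.get("lat") == lat and cached.get("lon") == lon:
data = cached["data"]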
else:
data = fetch_open_meteo(lat, lon, days)
if not data:
print("⚠ Could not fetch weather data")
sys.exit(1)
save_cache({"lat": lat, "lon": lon, "data": data})
current = data.get("current")
daily = data.get("daily")
# Output current weather
print(format_current(current, daily, location_name))
print()
# Output forecast
print(format_forecast(daily, min(days, 7)))
print()
# Check and report alerts
triggered = check_alerts(current, daily, config)
if triggered:
print("🔔 ALERTS:")
for a in triggered:
print(f" • {a}")
else:
print("✅ No alerts triggered — conditions within normal ranges")
if __name__ == "__main__":
main()
FILE:test/test_weather_alert.py
"""Tests for weather-alert skill."""
import json
import os
import sys
import unittest
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
import weather_alert as wa
class TestWeatherIcons(unittest.TestCase):
def test_known_code(self):
self.assertEqual(wa.get_weather_icon(0), "☀️")
self.assertEqual(wa.get_weather_icon(63), "🌧")
self.assertEqual(wa.get_weather_icon(95), "⛈")
def test_unknown_code(self):
self.assertEqual(wa.get_weather_icon(999), "🌡")
class TestWeatherDescriptions(unittest.TestCase):
def test_clear(self):
self.assertEqual(wa.get_weather_description(0), "Clear sky")
def test_rain(self):
self.assertEqual(wa.get_weather_description(63), "Rain moderate")
def test_unknown(self):
self.assertEqual(wa.get_weather_description(999), "Unknown")
class TestFormatCurrent(unittest.TestCase):
def test_valid_data(self):
current = {
"temperature_2m": 12.5,
"apparent_temperature": 10.2,
"relative_humidity_2m": 65,
"precipitation": 0.0,
"wind_speed_10m": 15,
"wind_direction_10m": 270,
"surface_pressure": 101300,
"uv_index": 3.2,
"weather_code": 2,
}
result = wa.format_current(current, None, "TestCity")
self.assertIn("TestCity Weather", result)
self.assertIn("12.5°C", result)
self.assertIn("10.2°C", result)
self.assertIn("65%", result)
self.assertIn("15 km/h", result)
def test_empty_data(self):
result = wa.format_current(None, None, "Empty")
self.assertIn("Could not fetch", result)
class TestFormatForecast(unittest.TestCase):
def test_valid_daily(self):
daily = {
"time": ["2026-04-17", "2026-04-18", "2026-04-19"],
"weather_code": [0, 61, 95],
"temperature_2m_min": [5, 3, 8],
"temperature_2m_max": [18, 10, 15],
"precipitation_probability_max": [0, 80, 90],
"wind_speed_10m_max": [15, 30, 45],
"uv_index_max": [5, 2, 6],
"precipitation_sum": [0.0, 5.2, 12.0],
"snowfall_sum": [0, 0, 0],
}
result = wa.format_forecast(daily, 3)
self.assertIn("7-Day Forecast", result)
self.assertIn("18°", result)
self.assertIn("🌧", result)
def test_empty_data(self):
result = wa.format_forecast(None)
self.assertIn("No forecast data", result)
class TestCheckAlerts(unittest.TestCase):
def setUp(self):
self.config = {
"alerts": {
"rain_threshold": 60,
"temp_min": 5,
"temp_max": 30,
"wind_max": 40,
"snow_depth": 5,
"uv_max": 7,
"frost_threshold": 0,
}
}
def test_no_alerts(self):
current = {
"temperature_2m": 15,
"precipitation": 0,
"wind_speed_10m": 10,
"wind_gusts_10m": 15,
"uv_index": 3,
}
daily = {
"temperature_2m_min": [5],
}
alerts = wa.check_alerts(current, daily, self.config)
self.assertEqual(alerts, [])
def test_temp_below_threshold(self):
current = {"temperature_2m": 2}
daily = {"temperature_2m_min": [2]}
alerts = wa.check_alerts(current, daily, self.config)
self.assertTrue(any("Temperature" in a for a in alerts))
def test_rain_alert(self):
current = {"temperature_2m": 15}
daily = {
"temperature_2m_min": [5],
"precipitation_probability_max": [80],
}
alerts = wa.check_alerts(current, daily, self.config)
self.assertTrue(any("Rain" in a for a in alerts))
def test_uv_alert(self):
current = {"temperature_2m": 25, "uv_index": 9}
daily = {"temperature_2m_min": [10]}
alerts = wa.check_alerts(current, daily, self.config)
self.assertTrue(any("UV" in a for a in alerts))
def test_frost_alert(self):
current = {"temperature_2m": 5}
daily = {"temperature_2m_min": [-2]}
alerts = wa.check_alerts(current, daily, self.config)
self.assertTrue(any("Frost" in a for a in alerts))
def test_wind_alert(self):
current = {
"temperature_2m": 15,
"wind_speed_10m": 50,
"wind_gusts_10m": 65,
"uv_index": 3,
}
daily = {"temperature_2m_min": [5]}
alerts = wa.check_alerts(current, daily, self.config)
self.assertTrue(any("Wind" in a for a in alerts))
class TestFormatEvent(unittest.TestCase):
def test_good_conditions(self):
daily = {
"time": ["2026-04-17"],
"temperature_2m_min": [8],
"temperature_2m_max": [18],
"precipitation_probability_max": [10],
"precipitation_sum": [0],
"wind_speed_10m_max": [10],
"uv_index_max": [4],
"snowfall_sum": [0],
}
result = wa.format_event_suitability(daily, "running", "TestCity")
self.assertIn("Good:", result)
def test_bad_conditions(self):
daily = {
"time": ["2026-04-17"],
"temperature_2m_min": [-5],
"temperature_2m_max": [2],
"precipitation_probability_max": [90],
"precipitation_sum": [10],
"wind_speed_10m_max": [45],
"uv_index_max": [1],
"snowfall_sum": [15],
}
result = wa.format_event_suitability(daily, "picnic", "TestCity")
self.assertIn("Bad:", result)
class TestConfigLoading(unittest.TestCase):
def test_default_config(self):
config = wa.load_config()
self.assertIn("default_location", config)
self.assertIn("alerts", config)
self.assertEqual(config["default_location"]["name"], "Prague")
if __name__ == "__main__":
unittest.main()
Automatically sort and rename files by type into structured folders with undo support, configurable filters, and dry-run preview for safe batch organization.
---
name: file-organizer
description: Automated file management — sorts downloads folder by type, renames files with consistent patterns, generates structured folder hierarchy, with undo/restore capability. Use when: (1) Downloads folder is chaotic, (2) Need to organize files by category, (3) Want consistent naming conventions, (4) Need to restore files to original state, (5) Want to batch-rename files
---
# File Organizer Skill
Automatically sort, rename, and organize files into structured folders. Reduces chaos in Downloads and any directory with scattered files.
## Quick Start
```bash
# Install the skill
npx clawhub install file-organizer
# Organize your downloads folder
"Organize my Downloads folder"
# Organize a specific directory
"Organize ~/Documents/projects"
# Undo the last organization
"Undo the last file organization"
```
## Core Features
### 1. Automatic Sorting
Files are categorized into standard folders:
- **Images** (`images/`) — JPG, PNG, GIF, SVG, WEBP, BMP, TIFF, ICO
- **Documents** (`documents/`) — PDF, DOC, DOCX, TXT, RTF, ODT, PPT, PPTX, XLS, XLSX, ODP
- **Audio** (`audio/`) — MP3, WAV, FLAC, OGG, AAC, M4A
- **Video** (`video/`) — MP4, MKV, AVI, MOV, WEBM, FLV
- **Archives** (`archives/`) — ZIP, TAR, GZ, 7Z, RAR, BZ2, XZ
- **Code** (`code/`) — PY, JS, TS, HTML, CSS, MD, JSON, YAML, SH, BAT
- **Data** (`data/`) — CSV, JSON, XML, SQL, DB, SQLITE, DAT
- **Executables** (`installers/`) — EXE, MSI, DMG, APP, PKG, DEB, RPM, APK
- **Fonts** (`fonts/`) — TTF, OTF, WOFF, WOFF2, FON
- **Other** (`other/`) — uncategorized files
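
The lookup behind this list can be sketched as an inverted extension map. The folder names and extensions below are a subset of the list above; `categorize` and the data layout are assumptions for illustration, not the code in `scripts/organize.py`:

```python
from pathlib import Path

# Subset of the mapping above; extend with the remaining extensions as needed.
CATEGORIES = {
    "images": {"jpg", "png", "gif", "svg", "webp"},
    "documents": {"pdf", "doc", "docx", "txt"},
    "audio": {"mp3", "wav", "flac"},
    "video": {"mp4", "mkv", "avi", "mov"},
    "archives": {"zip", "tar", "gz", "7z", "rar"},
}

# Invert once so each lookup is a single dict access.
EXT_TO_FOLDER = {
    ext: folder for folder, exts in CATEGORIES.items() for ext in exts
}


def categorize(filename: str) -> str:
    """Return the target folder for a file, or "other" if unrecognised."""
    ext = Path(filename).suffix.lstrip(".").lower()
    return EXT_TO_FOLDER.get(ext, "other")


print(categorize("photo.JPG"), categorize("backup.tar.gz"), categorize("unknown.xyz"))
```

Only the final suffix is inspected, so `backup.tar.gz` is classified by `gz`. An extension listed under two categories would need an explicit priority rule, which this sketch does not define.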
### 2. Smart File Renaming
- Extracts dates from filenames (YYYY-MM-DD patterns) and includes them in the new name
- Converts spaces to underscores for consistency
- Strips special characters (preserving hyphens in compound names)
- Handles duplicate names by appending `_2`, `_3`, etc.
- Lowercases file extensions for consistency
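
A rough sketch of the renaming rules for spaces, special characters, duplicates, and extension case. Date placement depends on `rename_pattern` and is not shown. The function name `normalize_name`, the `taken` set, and the `"file"` fallback are assumptions for illustration:

```python
import re
from pathlib import Path
from typing import Set


def normalize_name(filename: str, taken: Set[str]) -> str:
    """Return a cleaned-up name that does not collide with names in `taken`."""
    path = Path(filename)
    stem = path.stem.strip().replace(" ", "_")
    stem = re.sub(r"[^\w\-]", "", stem)  # keep letters, digits, _ and -
    stem = stem or "file"                # assumed fallback for empty stems
    ext = path.suffix.lower()
    candidate, n = f"{stem}{ext}", 2
    while candidate in taken:            # append _2, _3, ... on collision
        candidate = f"{stem}_{n}{ext}"
        n += 1
    taken.add(candidate)
    return candidate


seen: Set[str] = set()
print(normalize_name("report 2025-03-15.PDF", seen))
print(normalize_name("report 2025-03-15.pdf", seen))
```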
### 3. Undo / Restore
- Generates an `ORGANIZE_LOG.json` after each operation
- Stores original path, new path, and timestamp for every move
- `undo` command restores files to their exact original locations
- Safe: only restores if files haven't been modified since
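
The log-and-restore cycle can be sketched as follows. The log layout (a JSON list of `original`, `new`, `timestamp` entries) and the helper names are assumptions; the skill's real `ORGANIZE_LOG.json` format may differ. This sketch refuses to overwrite existing files but does not implement the "modified since" check:

```python
import json
import shutil
import tempfile
from datetime import datetime
from pathlib import Path


def move_with_log(src: Path, dst: Path, log_path: Path) -> None:
    """Move `src` to `dst` and append the move to a JSON log."""
    entries = json.loads(log_path.read_text()) if log_path.exists() else []
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(src), str(dst))
    entries.append({
        "original": str(src),
        "new": str(dst),
        "timestamp": datetime.now().isoformat(),
    })
    log_path.write_text(json.dumps(entries, indent=2))


def undo(log_path: Path) -> int:
    """Replay the log in reverse; return the number of files restored."""
    entries = json.loads(log_path.read_text())
    restored = 0
    for entry in reversed(entries):
        new, original = Path(entry["new"]), Path(entry["original"])
        if new.exists() and not original.exists():  # never overwrite
            original.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(new), str(original))
            restored += 1
    log_path.unlink()
    return restored


# Demo in a throwaway directory so no real files are touched.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    src = root / "downloads" / "photo.jpg"
    src.parent.mkdir()
    src.write_text("demo")
    log = root / "ORGANIZE_LOG.json"
    move_with_log(src, root / "organized" / "images" / "photo.jpg", log)
    print(undo(log), src.exists())
```

Replaying in reverse order matters when one run moved a file more than once; the most recent move has to be undone first.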
### 4. Preview Mode
- `--dry-run` shows what would change without moving files
- Reports counts by category and renamed files
- No disk writes in dry-run mode
## Configuration
Edit `config.yaml` to customize:
```yaml
source_dirs:
- ~/Downloads
target_base: ~/organized-files
auto_sort: true
rename_pattern: "{name}_{date}" # or "none" to skip renaming
max_file_size_mb: 500 # skip files larger than this
exclude_patterns:
- "*.tmp"
- "*.swp"
- ".DS_Store"
- "Thumbs.db"
log_file: ORGANIZE_LOG.json
```
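The `exclude_patterns` and `max_file_size_mb` settings could be applied with a filter like the one below. This is a minimal sketch using the standard-library `fnmatch`; the `should_skip` helper is hypothetical and the constants simply mirror the config above:

```python
from fnmatch import fnmatch
from typing import Iterable

EXCLUDE_PATTERNS = ["*.tmp", "*.swp", ".DS_Store", "Thumbs.db"]
MAX_FILE_SIZE_MB = 500


def should_skip(name: str, size_bytes: int,
                patterns: Iterable[str] = EXCLUDE_PATTERNS,
                max_mb: int = MAX_FILE_SIZE_MB) -> bool:
    """Return True if the file matches an exclude pattern or is too large."""
    if any(fnmatch(name, pattern) for pattern in patterns):
        return True
    return size_bytes > max_mb * 1024 * 1024


print(should_skip("temp.tmp", 10), should_skip("photo.jpg", 10))
```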
## File Type Patterns
Full mapping is in `references/file-patterns.md`. You can add custom mappings:
```yaml
custom_types:
- name: "Design Assets"
folders: "design/"
extensions: [AI, PSD, SKETCH, FIG, INDD]
```
## Safety
- **Dry-run first**: Always review changes before committing
- **Undo-safe**: Every move is logged; full restore possible
- **Size limits**: Skip large files to avoid moving heavy media
- **Exclude patterns**: Configurable file filters to skip unwanted types
- **No data loss**: Only moves files; never deletes
## Integration
- Works with any directory (Downloads, Documents, Desktop, project roots)
- Integrates with cron-manager for periodic auto-organizing
- Compatible with file-sync workflows
- Output is compatible with cloud storage (consistent naming)
FILE:README.md
# File Organizer Skill
Automatically sort, rename, and organize files into structured folders.
## Features
- **Auto-sorting** — Files categorized into 10 folder types (images, documents, audio, video, archives, code, data, installers, fonts, other)
- **Smart renaming** — Extracts dates, removes special characters, handles duplicates
- **Undo/restore** — Full rollback capability via `ORGANIZE_LOG.json`
- **Dry-run mode** — Preview changes without touching files
- **Configurable** — Customize categories, patterns, exclusions, file size limits
## Quick Start
```bash
# Organize downloads
python3 scripts/organize.py ~/Downloads
# Preview changes first
python3 scripts/organize.py ~/Downloads --dry-run
# Undo last organization
python3 scripts/organize.py --undo
# Output as JSON
python3 scripts/organize.py ~/Downloads --json
```
## Configuration
Edit `config.yaml`:
```yaml
source_dirs:
- ~/Downloads
target_base: ~/organized-files
rename_pattern: "{name}_{date}" # or "none"
max_file_size_mb: 500
exclude_patterns:
- "*.tmp"
- ".DS_Store"
```
## File Categories
| Folder | Extensions |
|--------|-----------|
| images | jpg, png, gif, svg, webp, bmp, tiff, ico |
| documents | pdf, doc, docx, txt, rtf, odt, ppt, xls |
| audio | mp3, wav, flac, ogg, aac, m4a |
| video | mp4, mkv, avi, mov, webm, flv |
| archives | zip, tar, gz, 7z, rar, bz2, xz |
| code | py, js, ts, html, css, md, json, yaml |
| data | csv, xml, db, sqlite, dat, log |
| installers | exe, msi, dmg, app, pkg, deb, rpm, apk |
| fonts | ttf, otf, woff, woff2, fon |
| other | uncategorized files |
## License
MIT
FILE:config.yaml
source_dirs:
- ~/Downloads
target_base: ~/organized-files
auto_sort: true
rename_pattern: "{name}_{date}"
max_file_size_mb: 500
exclude_patterns:
- "*.tmp"
- "*.swp"
- ".DS_Store"
- "Thumbs.db"
log_file: ORGANIZE_LOG.json
FILE:demo_output.md
# File Organizer Demo Output
## Source: ~/Downloads (before)
```
downloads/
├── photo.jpg
├── report 2025-03-15.pdf
├── song.mp3
├── video.mp4
├── archive.zip
├── script.py
├── data.csv
├── installer.exe
├── font.ttf
├── unknown.xyz
├── .DS_Store
├── temp.tmp
└── backup.tar.gz
```
## After Running: `python3 scripts/organize.py ~/Downloads`
### Directory Structure (after)
```
organized-files/
├── images/
│ └── photo.jpg
├── documents/
│ └── report_2025-03-15.pdf
├── audio/
│ └── song.mp3
├── video/
│ └── video.mp4
├── archives/
│ ├── archive.zip
│ └── backup.tar.gz
├── code/
│ └── script.py
├── data/
│ └── data.csv
├── installers/
│ └── installer.exe
├── fonts/
│ └── font.ttf
└── other/
└── unknown.xyz
```
Excluded (not moved):
- `.DS_Store` — in exclude_patterns
- `temp.tmp` — in exclude_patterns
### Rename Behavior
| Original | Result | Reason |
|----------|--------|--------|
| `report 2025-03-15.pdf` | `report_2025-03-15.pdf` | Date extracted, spaces → underscores |
| `photo.jpg` | `photo.jpg` | No changes needed |
### Undo
Running `python3 scripts/organize.py --undo` restores all files to `~/Downloads`.
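To see what an undo would touch before running it, the log can be read directly. The sketch below assumes the layout that `scripts/organize.py` writes: a JSON list of operations, each holding a `moves` list with `original` and `new` paths. The `pending_restores` helper and the sample paths are made up for this example.

```python
import json
import os
import tempfile


def pending_restores(log_path):
    """Return (new, original) path pairs from the most recent logged operation."""
    with open(log_path) as f:
        log = json.load(f)
    # A log may hold a single operation dict rather than a list of operations
    ops = log if isinstance(log, list) else [log]
    if not ops:
        return []
    return [(m["new"], m["original"]) for m in ops[-1].get("moves", [])]


# Demo with a throwaway log file (paths are illustrative)
sample = [{"moves": [{"original": "/home/u/Downloads/photo.jpg",
                      "new": "/home/u/organized-files/images/photo.jpg",
                      "category": "images"}]}]
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "ORGANIZE_LOG.json")
    with open(path, "w") as f:
        json.dump(sample, f)
    for new, original in pending_restores(path):
        print(f"{new} -> {original}")
```

Only the last operation is read, which mirrors how the undo command picks the latest entry.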
### Dry Run
`python3 scripts/organize.py ~/Downloads --dry-run` previews:
```
[DRY RUN] photo.jpg → images/photo.jpg
[DRY RUN] report 2025-03-15.pdf → documents/report_2025-03-15.pdf
[DRY RUN] song.mp3 → audio/song.mp3
[DRY RUN] video.mp4 → video/video.mp4
[DRY RUN] archive.zip → archives/archive.zip
[DRY RUN] script.py → code/script.py
[DRY RUN] data.csv → data/data.csv
[DRY RUN] installer.exe → installers/installer.exe
[DRY RUN] font.ttf → fonts/font.ttf
[DRY RUN] unknown.xyz → other/unknown.xyz
[DRY RUN] backup.tar.gz → archives/backup.tar.gz
```
FILE:references/file-patterns.md
# File Type Patterns Reference
## Built-in Categories
### Images
- **JPG/JPEG** — Standard photo format
- **PNG** — Lossless compression, transparency support
- **GIF** — Animated or static graphics
- **SVG** — Scalable vector graphics
- **WEBP** — Modern web format
- **BMP** — Bitmap images
- **TIFF/TIF** — High-quality archival format
- **ICO** — Icons
### Documents
- **PDF** — Portable document format
- **DOC/DOCX** — Microsoft Word
- **TXT** — Plain text
- **RTF** — Rich text format
- **ODT** — OpenDocument text
- **PPT/PPTX** — Microsoft PowerPoint
- **XLS/XLSX** — Microsoft Excel
- **ODP** — OpenDocument presentation
- **ODG** — OpenDocument graphics
- **TEX** — LaTeX source
- **EPUB** — E-book format
### Audio
- **MP3** — Standard compressed audio
- **WAV** — Uncompressed audio
- **FLAC** — Lossless compressed
- **OGG** — Ogg Vorbis
- **AAC** — Advanced Audio Coding
- **M4A** — Apple AAC
- **WMA** — Windows Media Audio
- **AIFF** — Apple Audio Interchange
### Video
- **MP4** — Standard video format
- **MKV** — Matroska (supports subtitles)
- **AVI** — Video for Windows
- **MOV** — QuickTime format
- **WEBM** — Web-oriented video
- **FLV** — Flash Video
- **WMV** — Windows Media Video
- **M4V** — Apple video format
- **3GP** — Mobile video format
### Archives
- **ZIP** — Standard archive
- **TAR** — Unix tape archive
- **GZ** — Gzip compressed
- **7Z** — 7-Zip format
- **RAR** — WinRAR archive
- **BZ2** — Bzip2 compressed
- **XZ** — LZMA2 compressed
- **TGZ** — Gzip-compressed tar
- **ZST** — Zstandard archive
### Code
- **Python** — py
- **JavaScript** — js, jsx
- **TypeScript** — ts, tsx
- **HTML/CSS** — html, css, scss
- **Markdown** — md
- **Data** — json, yaml, yml
- **Shell** — sh
- **Batch** — bat, ps1
- **Ruby** — rb
- **Go** — go
- **Rust** — rs
- **Java** — java
- **C/C++** — c, cpp, h
- **Swift** — swift
- **Kotlin** — kt
- **PHP** — php
- **SQL** — sql
### Data
- **CSV** — Comma-separated values
- **XML** — Extensible Markup Language
- **DB/SQLITE/SQLITE3** — SQLite database
- **DAT** — Generic data file
- **LOG** — Log files
### Installers
- **Windows** — exe, msi
- **macOS** — dmg, app, pkg
- **Linux** — deb, rpm
- **Android** — apk
- **Disk** — iso, img
### Fonts
- **TTF** — TrueType Font
- **OTF** — OpenType Font
- **WOFF/WOFF2** — Web fonts
- **FON** — Windows font
- **EOT** — Embedded OpenType
## Custom Categories
Add custom mappings in config.yaml:
```yaml
custom_types:
- name: "Design Assets"
folders: "design/"
extensions: [AI, PSD, SKETCH, FIG, INDD]
- name: "Development Tools"
folders: "dev-tools/"
extensions: [VSCODE, SUBPROJECT]
```
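A minimal sketch of how entries like these could be folded into an extension lookup. This is an illustration only: `merge_custom_types` and the shape of the `builtin` map are assumptions, and `scripts/organize.py` as listed in this package does not reference `custom_types`.

```python
def merge_custom_types(builtin, custom_types):
    """Return an extension -> folder map with custom entries overriding built-ins."""
    lookup = dict(builtin)
    for entry in custom_types:
        folder = entry["folders"].rstrip("/")
        for ext in entry["extensions"]:
            # Extensions are compared case-insensitively, so normalise to lowercase
            lookup[ext.lower()] = folder
    return lookup


builtin = {"jpg": "images", "pdf": "documents"}
custom = [
    {"name": "Design Assets", "folders": "design/", "extensions": ["AI", "PSD", "SKETCH"]},
]
lookup = merge_custom_types(builtin, custom)
print(lookup["psd"])  # design
print(lookup["jpg"])  # images
```

Letting custom entries win over built-ins is a design choice made for this sketch; the opposite precedence would be equally easy to implement.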
## Naming Convention Options
| Pattern | Result |
|---------|--------|
| `{name}_{date}` | `report_2025-03-15.pdf` |
| `{name}` | `report.pdf` |
| `none` | `report.pdf` (no date appended; spaces and special characters are still cleaned) |
## Safety Features
- Files larger than `max_file_size_mb` are skipped
- Exclude patterns prevent unwanted files from moving
- Undo log preserves every move for full rollback
- Dry-run mode previews without touching files
- Duplicate names get `_2`, `_3` suffixes
FILE:scripts/organize.py
#!/usr/bin/env python3
"""
File Organizer — sorts files into categorized folders with rename, undo, and dry-run support.
"""
import argparse
import json
import os
import re
import shutil
import sys
from datetime import datetime
from pathlib import Path
DEFAULT_CONFIG = {
"source_dirs": ["~/Downloads"],
"target_base": "~/organized-files",
"auto_sort": True,
"rename_pattern": "{name}_{date}",
"max_file_size_mb": 500,
"exclude_patterns": ["*.tmp", "*.swp", ".DS_Store", "Thumbs.db"],
"log_file": "ORGANIZE_LOG.json",
}
# File type to folder mapping
FILE_CATEGORIES = {
"images": ["jpg", "jpeg", "png", "gif", "svg", "webp", "bmp", "tiff", "tif", "ico", "heic"],
"documents": ["pdf", "doc", "docx", "txt", "rtf", "odt", "ppt", "pptx", "xls", "xlsx", "odp", "odg", "tex", "epub"],
"audio": ["mp3", "wav", "flac", "ogg", "aac", "m4a", "wma", "aiff"],
"video": ["mp4", "mkv", "avi", "mov", "webm", "flv", "wmv", "m4v", "3gp"],
"archives": ["zip", "tar", "gz", "7z", "rar", "bz2", "xz", "tgz", "zst"],
"code": ["py", "js", "ts", "jsx", "tsx", "html", "css", "scss", "md", "json", "yaml", "yml", "sh", "bat", "ps1", "rb", "go", "rs", "java", "c", "cpp", "h", "swift", "kt", "php", "sql"],
"data": ["csv", "xml", "db", "sqlite", "sqlite3", "dat", "log"],
"installers": ["exe", "msi", "dmg", "app", "pkg", "deb", "rpm", "apk", "iso", "img"],
"fonts": ["ttf", "otf", "woff", "woff2", "fon", "eot"],
}
CATEGORIES = {v.lower(): k for k, vals in FILE_CATEGORIES.items() for v in vals}
def load_config(config_path="config.yaml"):
"""Load config from YAML or fall back to defaults."""
try:
import yaml
with open(config_path) as f:
return yaml.safe_load(f) or DEFAULT_CONFIG
except ImportError:
# Minimal YAML parser fallback
pass
except FileNotFoundError:
pass
cfg = DEFAULT_CONFIG.copy()
if os.path.exists(config_path):
# Simple key: value parsing for basic YAML
with open(config_path) as f:
for line in f:
line = line.strip()
if ":" in line and not line.startswith("#"):
key, val = line.split(":", 1)
key = key.strip()
val = val.strip()
if val.startswith("[") and val.endswith("]"):
items = [i.strip().strip('"').strip("'") for i in val[1:-1].split(",") if i.strip()]
cfg[key] = items
elif val.startswith('"') and val.endswith('"'):
cfg[key] = val[1:-1]
elif val.startswith("'") and val.endswith("'"):
cfg[key] = val[1:-1]
return cfg
def expand_paths(paths):
"""Expand ~ and relative paths to absolute."""
result = []
for p in paths:
p = os.path.expanduser(p)
if not os.path.isabs(p):
p = os.path.abspath(p)
result.append(p)
return result
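def glob_match(pattern, filename):
    """Simple glob matching (supports * and ? wildcards, case-insensitive)."""
    # Escape everything first so characters such as ( ) [ ] + are matched literally,
    # then turn the escaped wildcards back into their regex equivalents.
    regex = "^" + re.escape(pattern).replace(r"\*", ".*").replace(r"\?", ".") + "$"
    return bool(re.match(regex, filename, re.IGNORECASE))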
def should_exclude(filename, exclude_patterns):
"""Check if file matches any exclude pattern."""
for pat in exclude_patterns:
if glob_match(pat, filename):
return True
return False
def extract_date(filename):
"""Extract YYYY-MM-DD date from filename. Returns tuple (y, m, d) or None."""
matches = re.findall(r"(\d{4})[-_](\d{1,2})[-_](\d{1,2})", filename)
if matches:
y, m, d = matches[0]
return (y, f"{int(m):02d}", f"{int(d):02d}")
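    # Try month-day-year, falling back to day-month-year when the first number cannot be a month
    matches = re.findall(r"(\d{1,2})[-_](\d{1,2})[-_](\d{4})", filename)
    if matches:
        a, b, y = matches[0]
        a, b = int(a), int(b)
        if a > 12:
            # Likely day-first: swap
            a, b = b, a
        # Validate: 1 <= month <= 12 and 1 <= day <= 31; reject impossible dates
        if 1 <= a <= 12 and 1 <= b <= 31:
            return (y, f"{a:02d}", f"{b:02d}")
    return None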
def extract_date_str(filename):
"""Extract YYYY-MM-DD date string from filename."""
d = extract_date(filename)
return "-".join(d) if d else None
def strip_existing_date(filename):
"""Remove date patterns from the stem of a filename."""
p = Path(filename)
stem = p.stem
# Remove YYYY-MM-DD or YYYY_MM_DD patterns
stem = re.sub(r"\d{4}[_-]\d{1,2}[_-]\d{1,2}", "", stem)
stem = re.sub(r"\d{1,2}[_-]\d{1,2}[_-]\d{4}", "", stem)
# Clean up double underscores/dashes left behind
stem = re.sub(r"_+", "_", stem)
stem = re.sub(r"-+", "-", stem)
stem = stem.strip("_-")
return stem if stem else p.stem
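def sanitize_name(name, date=None, pattern="{name}_{date}"):
    """Create a clean filename from source, applying the rename pattern."""
    p = Path(name)
    ext = p.suffix.lower() if p.suffix else ""
    base = p.stem
    use_date = bool(date) and pattern != "none"
    if use_date:
        # Strip existing date from stem to avoid duplication
        base = strip_existing_date(name)
    # Replace spaces with underscores
    base = base.replace(" ", "_")
    # Remove special chars (keep alphanumerics, underscores, hyphens, and inner dots
    # so that multi-part names such as backup.tar.gz keep their shape)
    base = re.sub(r"[^\w\-.]", "", base)
    # Collapse repeated underscores and trim separators from both ends
    base = re.sub(r"_+", "_", base).strip("._-")
    if not base:
        base = "unnamed"
    if not use_date:
        return f"{base}{ext}"
    # Honor the configured pattern, e.g. "{name}_{date}" or "{name}"
    try:
        return pattern.format(name=base, date=date) + ext
    except (KeyError, IndexError, ValueError):
        # Unknown placeholder or malformed pattern: fall back to the default layout
        return f"{base}_{date}{ext}"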
def get_category(filename):
"""Determine file category based on extension."""
ext = Path(filename).suffix.lower().lstrip(".")
return CATEGORIES.get(ext, "other")
def organize_directory(source_dir, target_base, config, dry_run=False):
"""Organize files in a directory."""
source = os.path.abspath(source_dir)
target = os.path.abspath(target_base)
if not os.path.isdir(source):
print(f"Error: Source directory not found: {source_dir}")
return None
exclude = config.get("exclude_patterns", ["*.tmp", "*.swp", ".DS_Store", "Thumbs.db"])
max_size = config.get("max_file_size_mb", 500) * 1024 * 1024
rename_pat = config.get("rename_pattern", "{name}_{date}")
log_file = config.get("log_file", "ORGANIZE_LOG.json")
moves = []
errors = []
counts = {}
os.makedirs(target, exist_ok=True)
for filename in os.listdir(source):
filepath = os.path.join(source, filename)
# Skip directories
if not os.path.isfile(filepath):
continue
# Skip exclude patterns
if should_exclude(filename, exclude):
continue
# Skip large files
try:
size = os.path.getsize(filepath)
if size > max_size:
errors.append({"file": filename, "reason": f"Too large ({size // 1024 // 1024}MB)"})
continue
except OSError as e:
errors.append({"file": filename, "reason": str(e)})
continue
# Get category
ext = Path(filename).suffix.lower()
if not ext:
continue # No extension, skip
category = get_category(filename)
counts[category] = counts.get(category, 0) + 1
# Build new path
cat_dir = os.path.join(target, category)
date_str = extract_date_str(filename)
new_name = sanitize_name(filename, date_str, rename_pat)
new_path = os.path.join(cat_dir, new_name)
        # Handle duplicates: append _2, _3, ... until the name is free
        if os.path.exists(new_path):
            base = Path(new_name).stem
            suffix = Path(new_name).suffix
            counter = 2
            while os.path.exists(os.path.join(cat_dir, f"{base}_{counter}{suffix}")):
                counter += 1
            # Keep new_name in sync so the dry-run preview shows the final name
            new_name = f"{base}_{counter}{suffix}"
            new_path = os.path.join(cat_dir, new_name)
        if dry_run:
            print(f"  [DRY RUN] {filename} → {category}/{new_name}")
else:
os.makedirs(cat_dir, exist_ok=True)
try:
shutil.move(filepath, new_path)
moves.append({
"original": filepath,
"new": new_path,
"category": category,
"date": datetime.now().isoformat(),
})
except OSError as e:
errors.append({"file": filename, "reason": str(e)})
return {
"moves": moves,
"counts": counts,
"errors": errors,
"timestamp": datetime.now().isoformat(),
"source": source,
"target": target,
"dry_run": dry_run,
}
def undo_operation(log_file_path, dry_run=False):
"""Undo the last organization operation from log."""
if not os.path.exists(log_file_path):
print(f"Error: No log file found: {log_file_path}")
return
with open(log_file_path) as f:
log = json.load(f)
# Get latest operation
if isinstance(log, list):
ops = log
else:
ops = [log]
latest = ops[-1] if ops else None
if not latest:
print("No operations to undo.")
return
moves = latest.get("moves", [])
if not moves:
print("No moves to undo.")
return
restored = 0
for move in moves:
orig = move["original"]
new = move["new"]
if os.path.exists(new):
if not dry_run:
os.makedirs(os.path.dirname(orig), exist_ok=True)
shutil.move(new, orig)
restored += 1
print(f" Restored: {Path(new).name} → {Path(orig).name}")
else:
print(f" Skipped (not found): {os.path.basename(new)}")
if not dry_run:
# Remove category dirs if empty
cats = set(m.get("category", "") for m in moves)
for cat in cats:
cat_dir = os.path.join(latest.get("target", ""), cat)
if os.path.isdir(cat_dir) and not os.listdir(cat_dir):
os.rmdir(cat_dir)
print(f"Restored {restored} file(s).")
return {"restored": restored, "timestamp": datetime.now().isoformat()}
def main():
    parser = argparse.ArgumentParser(description="Organize files into categorized folders")
    # source and --target default to None so an explicit CLI value can override the config
    parser.add_argument("source", nargs="?", default=None, help="Source directory (overrides source_dirs in config)")
    parser.add_argument("--target", default=None, help="Target directory (overrides target_base in config)")
    parser.add_argument("--config", default="config.yaml", help="Config file path")
    parser.add_argument("--dry-run", action="store_true", help="Preview without moving files")
    parser.add_argument("--undo", action="store_true", help="Undo last operation")
    parser.add_argument("--undo-log", default="ORGANIZE_LOG.json", help="Undo log file path")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()
    config = load_config(args.config)
    # Precedence: CLI argument, then config file, then built-in default
    source_dirs = [args.source] if args.source else config.get("source_dirs", ["~/Downloads"])
    target_base = args.target or config.get("target_base", "~/organized-files")
    # Expand ~
    source_dirs = expand_paths(source_dirs)
    target_base = expand_paths([target_base])[0]
    if args.undo:
        # The log is written inside target_base, so resolve the undo log there as well
        result = undo_operation(os.path.join(target_base, args.undo_log), args.dry_run)
        if result and args.json:
            print(json.dumps(result, indent=2))
        return
    all_counts = {}
    all_moves = []
    all_errors = []
    for source in source_dirs:
        result = organize_directory(source, target_base, config, args.dry_run)
        if result:
            # Sum per-category counts across sources instead of overwriting them
            for cat, n in result["counts"].items():
                all_counts[cat] = all_counts.get(cat, 0) + n
            all_moves.extend(result["moves"])
            all_errors.extend(result["errors"])
# Log the operation (unless dry-run)
if not args.dry_run and all_moves:
log_path = os.path.join(os.path.abspath(target_base), args.undo_log)
log_dir = os.path.dirname(log_path)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
# Load existing log and append
existing = []
if os.path.exists(log_path):
try:
with open(log_path) as f:
existing = json.load(f)
if isinstance(existing, dict):
existing = [existing]
except json.JSONDecodeError:
existing = []
operation = {
"timestamp": datetime.now().isoformat(),
"source": source_dirs,
"target": target_base,
"moves": all_moves,
"counts": all_counts,
}
existing.append(operation)
with open(log_path, "w") as f:
json.dump(existing, f, indent=2)
# Output
output = {
"timestamp": datetime.now().isoformat(),
"dry_run": args.dry_run,
"counts": all_counts,
"total_moves": len(all_moves),
"errors": all_errors,
}
if args.json:
print(json.dumps(output, indent=2))
else:
print(f"\n{'DRY RUN' if args.dry_run else 'DONE'} — {len(source_dirs)} source(s)")
for cat, count in sorted(all_counts.items()):
print(f" {cat:15s} {count:4d} files")
if all_errors:
print(f"\n Errors ({len(all_errors)}):")
for e in all_errors[:5]:
print(f" - {e['file']}: {e['reason']}")
if len(all_errors) > 5:
print(f" ... and {len(all_errors) - 5} more")
if not args.dry_run and all_moves:
print(f"\n Logged to: {os.path.join(target_base, args.undo_log)}")
if __name__ == "__main__":
main()
FILE:test/test_organize.py
#!/usr/bin/env python3
"""Tests for file-organizer."""
import json
import os
import shutil
import sys
import tempfile
import unittest
from pathlib import Path
# Add scripts to path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
import organize
class TestCategories(unittest.TestCase):
def test_image_extensions(self):
self.assertEqual(organize.get_category("photo.jpg"), "images")
self.assertEqual(organize.get_category("icon.PNG"), "images")
self.assertEqual(organize.get_category("diagram.svg"), "images")
self.assertEqual(organize.get_category("animation.webp"), "images")
def test_document_extensions(self):
self.assertEqual(organize.get_category("report.pdf"), "documents")
self.assertEqual(organize.get_category("notes.txt"), "documents")
self.assertEqual(organize.get_category("presentation.pptx"), "documents")
def test_audio_extensions(self):
self.assertEqual(organize.get_category("song.mp3"), "audio")
self.assertEqual(organize.get_category("track.flac"), "audio")
def test_video_extensions(self):
self.assertEqual(organize.get_category("clip.mp4"), "video")
self.assertEqual(organize.get_category("movie.mkv"), "video")
def test_archive_extensions(self):
self.assertEqual(organize.get_category("backup.zip"), "archives")
self.assertEqual(organize.get_category("source.tar.gz"), "archives")
def test_code_extensions(self):
self.assertEqual(organize.get_category("script.py"), "code")
self.assertEqual(organize.get_category("page.html"), "code")
def test_data_extensions(self):
self.assertEqual(organize.get_category("data.csv"), "data")
def test_other(self):
self.assertEqual(organize.get_category("unknown.xyz"), "other")
self.assertEqual(organize.get_category("no_extension"), "other")
class TestRename(unittest.TestCase):
def test_sanitize_basic(self):
result = organize.sanitize_name("My Document.pdf", None, "{name}_{date}")
self.assertEqual(result, "My_Document.pdf")
def test_sanitize_with_date(self):
result = organize.sanitize_name("report 2025-03-15.pdf", "2025-03-15", "{name}_{date}")
self.assertEqual(result, "report_2025-03-15.pdf")
def test_sanitize_no_pattern(self):
result = organize.sanitize_name("file with spaces.txt", None, "none")
self.assertEqual(result, "file_with_spaces.txt")
def test_sanitize_special_chars(self):
result = organize.sanitize_name("file (v2).pdf", None, "none")
self.assertEqual(result, "file_v2.pdf")
def test_extract_date(self):
self.assertEqual(organize.extract_date("report_2025-03-15.pdf"), ("2025", "03", "15"))
self.assertEqual(organize.extract_date("doc_v1.0.txt"), None)
def test_extract_date_dmy(self):
result = organize.extract_date("report_15_03_2025.pdf")
self.assertEqual(result, ("2025", "03", "15"))
def test_extract_date_str(self):
self.assertEqual(organize.extract_date_str("report_2025-03-15.pdf"), "2025-03-15")
self.assertEqual(organize.extract_date_str("doc_v1.0.txt"), None)
class TestExclude(unittest.TestCase):
def test_tmp_excluded(self):
self.assertTrue(organize.should_exclude("test.tmp", ["*.tmp"]))
def test_txt_not_excluded(self):
self.assertFalse(organize.should_exclude("notes.txt", ["*.tmp"]))
def test_ds_store_excluded(self):
self.assertTrue(organize.should_exclude(".DS_Store", [".DS_Store"]))
class TestOrganizeDryRun(unittest.TestCase):
def test_dry_run_no_moves(self):
"""Dry run should not move any files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files
test_files = [
"photo.jpg",
"report.pdf",
"song.mp3",
]
for f in test_files:
Path(tmpdir, f).touch()
result = organize.organize_directory(tmpdir, tmpdir, organize.DEFAULT_CONFIG, dry_run=True)
self.assertIsNotNone(result)
self.assertEqual(len(result["moves"]), 0)
self.assertEqual(result["counts"]["images"], 1)
self.assertEqual(result["counts"]["documents"], 1)
self.assertEqual(result["counts"]["audio"], 1)
# Verify files still in original location
for f in test_files:
self.assertTrue(os.path.exists(os.path.join(tmpdir, f)))
class TestUndo(unittest.TestCase):
def test_undo_restores_files(self):
"""Test that undo restores files to original location."""
with tempfile.TemporaryDirectory() as tmpdir:
# Setup
source = os.path.join(tmpdir, "downloads")
target = os.path.join(tmpdir, "organized")
log_file = os.path.join(target, "ORGANIZE_LOG.json")
os.makedirs(source)
os.makedirs(target)
# Create a test file
test_file = os.path.join(source, "photo.jpg")
Path(test_file).touch()
# Organize (move)
result = organize.organize_directory(source, target, organize.DEFAULT_CONFIG)
self.assertEqual(len(result["moves"]), 1)
new_path = result["moves"][0]["new"]
self.assertTrue(os.path.exists(new_path))
self.assertFalse(os.path.exists(test_file))
# Save log for undo
with open(log_file, "w") as f:
json.dump([result], f)
# Undo
undo_result = organize.undo_operation(log_file)
self.assertEqual(undo_result["restored"], 1)
# Verify restored
self.assertTrue(os.path.exists(test_file))
class TestIntegration(unittest.TestCase):
def test_full_workflow(self):
"""End-to-end test: create files, organize, verify structure."""
with tempfile.TemporaryDirectory() as tmpdir:
source = os.path.join(tmpdir, "downloads")
target = os.path.join(tmpdir, "organized")
os.makedirs(source)
# Create test files across categories
test_files = [
("photo.jpg", "images"),
("report.pdf", "documents"),
("song.mp3", "audio"),
("video.mp4", "video"),
("archive.zip", "archives"),
("script.py", "code"),
("data.csv", "data"),
]
for fname, _ in test_files:
Path(source, fname).touch()
config = organize.DEFAULT_CONFIG.copy()
config["rename_pattern"] = "none"
result = organize.organize_directory(source, target, config, dry_run=False)
self.assertEqual(result["counts"], {cat: 1 for _, cat in test_files})
self.assertEqual(len(result["moves"]), len(test_files))
# Verify folder structure
for _, expected_cat in test_files:
cat_dir = os.path.join(target, expected_cat)
self.assertTrue(os.path.isdir(cat_dir))
if __name__ == "__main__":
unittest.main(verbosity=2)
Long-term memory system for OpenClaw agents. Store, retrieve, and query conversation history and learned information across sessions.
---
name: memory-augment
description: Long-term memory system for OpenClaw agents. Store, retrieve, and query conversation history and learned information across sessions.
---
# Memory Augment Skill
Provide long-term memory for OpenClaw agents. Store conversation history, learned facts, preferences, and context that persists across sessions.
## Quick Start
```bash
# Install via clawhub
npx clawhub install memory-augment
# Trigger phrases (said to the agent, not shell commands):
#   "Remember that I prefer Python for automation scripts"
#   "Find all notes about my workspace setup"
```
## Core Features
### 1. Long-Term Storage
Store any information that should persist:
- **User preferences**: Coding style, workspace config, tool choices
- **Learned facts**: Project details, technical decisions, patterns
- **Conversation history**: Context from past sessions, decisions made
- **Task tracking**: Todo items, progress, completed work
### 2. Semantic Search
Find stored information using natural language:
```bash
clawhub memory search "what did I decide about the inbox triage skill?"
```
### 3. Automatic Context Injection
Before each turn, automatically inject relevant memories:
```json
{
"context": {
"recent_memories": [
{"topic": "income", "content": "User approved inbox-triage for publishing"},
{"topic": "workspace", "content": "OpenClaw running on marekserver"}
],
"preferences": {
"model": "local/qwen3.5-35B-A3B",
"compute_tracked": true
}
}
}
```
### 4. Memory Expiry & Archiving
- **Temporary memories**: Auto-expire after 7 days (session notes)
- **Permanent memories**: Never expire (user preferences, core facts)
- **Archival**: Compress old memories to reduce token usage
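The expiry rule can be sketched as follows, assuming each memory carries an `expires` field that is either null (permanent) or an ISO-8601 UTC timestamp. The helper names `parse_ts`, `is_expired`, and `default_expiry` are hypothetical and not part of the skill's documented interface.

```python
from datetime import datetime, timedelta, timezone


def parse_ts(value):
    """Parse a timestamp such as '2026-04-22T20:37:00Z' into an aware datetime."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def is_expired(memory, now):
    """A memory with expires=None is permanent; otherwise compare against `now`."""
    expires = memory.get("expires")
    return expires is not None and parse_ts(expires) <= now


def default_expiry(created, days=7):
    """Compute the expiry timestamp for a temporary memory (7 days by default)."""
    return (parse_ts(created) + timedelta(days=days)).strftime("%Y-%m-%dT%H:%M:%SZ")


now = datetime(2026, 4, 23, tzinfo=timezone.utc)
memories = [
    {"id": "uuid-123", "expires": None},
    {"id": "uuid-124", "expires": default_expiry("2026-04-15T20:37:00Z")},
]
active = [m["id"] for m in memories if not is_expired(m, now)]
print(active)  # ['uuid-123']
```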
## When to Use This Skill
✅ Need to remember user preferences across sessions
✅ Track conversation context over time
✅ Store learnings and decisions for future reference
✅ Query past information semantically
✅ Maintain agent personality and behavior consistency
❌ Not for storing sensitive data (passwords, API keys)
❌ Not for real-time data (current weather, live prices)
❌ Not for replacing database storage (structured data)
## How It Works
### Storage Layer
```yaml
# ~/.memory-augment/storage.yaml
memories:
- id: uuid-123
content: "User prefers Python for automation"
type: preference
tags: ["coding", "python", "automation"]
created: "2026-04-15T10:00:00Z"
expires: null # permanent
score: 0.85 # confidence/relevance
- id: uuid-124
content: "Approved inbox-triage skill for publishing"
type: decision
tags: ["income", "skills", "approval"]
created: "2026-04-15T20:37:00Z"
expires: "2026-04-22T20:37:00Z" # 7 days
score: 0.95
```
### Retrieval System
Uses hybrid search (keyword + semantic):
1. Parse query for keywords
2. Calculate relevance scores
3. Return top-K relevant memories
4. Inject into agent context
### Scoring Algorithm
Memories are scored based on:
- **Recency**: Newer = higher score
- **Tags match**: Query tags vs memory tags
- **Type relevance**: Preferences > decisions > learning > context
- **Score boost**: Memories the user has marked as correct receive a higher score
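One way these factors could combine is sketched below. The weights, the multiplicative formula, and the function names are assumptions made for illustration; the skill does not document its actual formula. The `score_decay`, `top_k`, and `min_score` defaults are taken from the configuration values shown in this document.

```python
from datetime import datetime, timezone

# Assumed type weights; only the ordering is taken from the documentation
TYPE_WEIGHT = {"preference": 1.0, "decision": 0.8, "learning": 0.7, "context": 0.6}


def relevance(memory, query_tags, now, score_decay=0.95, boost=1.0):
    """Combine recency, tag overlap, type weight and an optional correction boost."""
    created = datetime.fromisoformat(memory["created"].replace("Z", "+00:00"))
    age_days = max((now - created).total_seconds() / 86400, 0.0)
    recency = score_decay ** age_days  # newer memories score higher
    tags = set(memory["tags"])
    overlap = len(tags & set(query_tags)) / len(query_tags) if query_tags else 0.0
    type_weight = TYPE_WEIGHT.get(memory["type"], 0.5)
    return recency * overlap * type_weight * boost


def top_k(memories, query_tags, now, k=20, min_score=0.3):
    """Return up to k memories scoring at least min_score, best first."""
    scored = [(relevance(m, query_tags, now), m) for m in memories]
    scored = [(s, m) for s, m in scored if s >= min_score]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [m for _, m in scored[:k]]


now = datetime(2026, 4, 16, 10, 0, tzinfo=timezone.utc)
memories = [
    {"id": "uuid-123", "type": "preference", "tags": ["coding", "python"],
     "created": "2026-04-15T10:00:00Z"},
    {"id": "uuid-124", "type": "decision", "tags": ["income", "skills"],
     "created": "2026-04-15T20:37:00Z"},
]
print([m["id"] for m in top_k(memories, ["python"], now)])  # ['uuid-123']
```

With a daily decay of 0.95, a one-day-old memory keeps 95% of its score, and a seven-day-old one keeps roughly 70% (0.95 to the seventh power).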
## Configuration
```yaml
# ~/.memory-augment/config.yaml
storage:
path: ~/.memory-augment/storage.yaml
format: yaml # or json
settings:
max_memories: 1000
default_expiry: 7 # days
score_decay: 0.95 # daily decay factor
search:
top_k: 20
min_score: 0.3
include_tags: true
auto_inject:
enabled: true
max_tokens: 5000
inject_before: ["each_turn", "weekly_summary"]
```
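As an illustration of how `auto_inject.max_tokens` might cap what gets injected, the sketch below walks a ranked list and stops once the budget would be exceeded. The 4-characters-per-token estimate and the `select_within_budget` helper are assumptions, not the skill's documented behavior.

```python
def estimate_tokens(text):
    """Very rough token estimate (about 4 characters per token); an assumption."""
    return max(1, len(text) // 4)


def select_within_budget(ranked_memories, max_tokens=5000):
    """Take memories in ranked order until the token budget would be exceeded."""
    selected, used = [], 0
    for memory in ranked_memories:
        cost = estimate_tokens(memory["content"])
        if used + cost > max_tokens:
            break
        selected.append(memory)
        used += cost
    return selected, used


ranked = [
    {"content": "User prefers Python for automation"},          # 34 chars -> 8 tokens
    {"content": "Approved inbox-triage skill for publishing"},  # 42 chars -> 10 tokens
]
chosen, used = select_within_budget(ranked, max_tokens=10)
print(len(chosen), used)  # 1 8
```

Stopping at the first memory that does not fit keeps the highest-ranked items intact; skipping it and trying smaller ones would be an alternative policy.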
## Memory Types
### Preference
User preferences, coding style, tool choices.
```yaml
type: preference
tags: ["coding", "style"]
content: "Prefers concise code over comments"
```
### Decision
Decisions made, approvals, blocking choices.
```yaml
type: decision
tags: ["income", "skills"]
content: "Published inbox-triage to clawhub"
```
### Context
Session context, project state, ongoing work.
```yaml
type: context
tags: ["project", "setup"]
content: "Building memory-augment skill, 60% complete"
```
### Learning
What the agent learned, patterns discovered, corrections.
```yaml
type: learning
tags: ["pattern", "optimization"]
content: "Sub-agent spawning reduces context by 30%"
```
## Commands
### Store Memory
```bash
clawhub memory store "Remember my workspace is at /home/marek/.openclaw/workspace"
clawhub memory store "User prefers minimal markdown formatting" --tag preferences
```
### Search Memories
```bash
clawhub memory search "what did I decide about income?"
clawhub memory search "all memories about skills" --tag skills
```
### List Memories
```bash
clawhub memory list --type decision
clawhub memory list --since "2026-04-14"
```
### Delete Memory
```bash
clawhub memory delete <uuid>
clawhub memory delete --tag "temporary" --older-than "7d"
```
### Export/Import
```bash
clawhub memory export > memories.json
clawhub memory import < memories.json
```
## Output Format
### JSON
```json
{
"query": "income decisions",
"results": [
{
"id": "uuid-123",
"content": "Published inbox-triage skill",
"score": 0.92,
"tags": ["income", "skills"]
}
],
"total": 5,
"took_ms": 45
}
```
### Markdown
```markdown
## Found 5 memories for "income decisions"
### 🎯 **Published inbox-triage skill** (score: 0.92)
**Type:** decision
**Tags:** income, skills
**Created:** 2026-04-15
**Content:** Published inbox-triage skill to clawhub for passive income
```
## Limitations
- **Token budget:** Context injection respects 48k token ceiling
- **Search accuracy:** Semantic search may miss nuanced queries
- **Privacy:** Do not store sensitive data (passwords, secrets)
- **Sync:** Local storage only (no cloud sync yet)
- **Expiry:** Temporary memories auto-expire (configurable)
## Integration
### With Inbox Triage
```yaml
# Inject triage context when discussing messages
auto_inject:
triggers:
- "inbox"
- "messages"
- "notification"
memories:
- "inbox-triage skill is complete and ready for publishing"
```
### With Cron Manager
```yaml
# Weekly memory summary
cron:
schedule: "0 0 * * 0" # Sunday midnight
action: "memory summarize --output weekly-summary.md"
```
### With Weather Alert
```yaml
# Memory context for weather queries
auto_inject:
triggers:
- "weather"
- "forecast"
memories:
- "User is in UTC timezone"
- "Prefers concise weather summaries"
```
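The trigger-based blocks above can be read as simple rules: when a trigger word appears in the user's message, the listed memories are injected. A minimal sketch follows, assuming case-insensitive substring matching; the matching strategy and the `memories_for_message` helper are hypothetical.

```python
def memories_for_message(message, rules):
    """Return memories from every rule whose trigger word appears in the message."""
    text = message.lower()
    injected = []
    for rule in rules:
        if any(trigger.lower() in text for trigger in rule["triggers"]):
            for memory in rule["memories"]:
                if memory not in injected:  # avoid injecting the same memory twice
                    injected.append(memory)
    return injected


rules = [
    {"triggers": ["inbox", "messages", "notification"],
     "memories": ["inbox-triage skill is complete and ready for publishing"]},
    {"triggers": ["weather", "forecast"],
     "memories": ["User is in UTC timezone", "Prefers concise weather summaries"]},
]
print(memories_for_message("Any new messages in my Inbox?", rules))
```

Substring matching is deliberately loose: "forecasting" would also fire the weather rule, so a stricter word-boundary match may be preferable in practice.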
## Iteration
Track search quality:
```bash
# Correct a bad search result
echo "CORRECT: uuid-123 - relevant to income query" >> ~/.memory-augment/corrections.log
echo "INCORRECT: uuid-124 - should not have matched" >> ~/.memory-augment/corrections.log
```
The system learns from corrections to improve scoring.
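How corrections feed back into scoring is not specified, so the following is only a sketch under stated assumptions: each log line has the form `CORRECT: <id> - <note>` or `INCORRECT: <id> - <note>`, and each correction nudges the stored score by a fixed step of 0.1. Both function names and the step size are hypothetical.

```python
import re

LINE = re.compile(r"^(CORRECT|INCORRECT):\s*(\S+)\s*-\s*(.*)$")


def parse_corrections(lines):
    """Yield (memory_id, is_correct, note) for each well-formed log line."""
    for line in lines:
        match = LINE.match(line.strip())
        if match:
            verdict, memory_id, note = match.groups()
            yield memory_id, verdict == "CORRECT", note


def apply_corrections(scores, lines, step=0.1):
    """Nudge scores up or down by `step` (an assumed value), clamped to [0, 1]."""
    adjusted = dict(scores)
    for memory_id, is_correct, _ in parse_corrections(lines):
        if memory_id in adjusted:
            delta = step if is_correct else -step
            adjusted[memory_id] = round(min(1.0, max(0.0, adjusted[memory_id] + delta)), 4)
    return adjusted


log = [
    "CORRECT: uuid-123 - relevant to income query",
    "INCORRECT: uuid-124 - should not have matched",
]
result = apply_corrections({"uuid-123": 0.85, "uuid-124": 0.95}, log)
print(result)  # {'uuid-123': 0.95, 'uuid-124': 0.85}
```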
---
## Roadmap
- [x] Basic storage system
- [x] Semantic search implementation
- [x] Automatic context injection
- [ ] Multi-source sync (cloud backup)
- [ ] Encrypted storage for sensitive data
- [ ] Collaborative memories (shared between agents)
Built for the OpenClaw ecosystem.
FILE:README.md
# Memory Augment Skill
> Provide long-term memory for OpenClaw agents. Store, retrieve, and query conversation history across sessions.
## Quick Start
```bash
# Install via clawhub
npx clawhub install memory-augment
# Or clone manually
git clone https://clawhub.ai/skills/memory-augment.git ~/.openclaw/skills/memory-augment
# Store a memory
clawhub memory store "User prefers Python for automation scripts"
```
## Features
- ✅ **Persistent storage**: Memories survive across agent restarts
- ✅ **Semantic search**: Find memories using natural language
- ✅ **Automatic context**: Inject relevant memories before each turn
- ✅ **Type-based scoring**: Preferences > decisions > learning > context
- ✅ **Expiry management**: Temporary memories auto-expire
- ✅ **JSON & Markdown** output formats
## Usage
### Store Memories
```bash
# Basic storage
clawhub memory store "My workspace is at /home/marek/.openclaw/workspace"
# With type and tags
clawhub memory store "Approved inbox-triage for publishing" \
--type decision --tag income --tag skills
```
### Search Memories
```bash
# Natural language search
clawhub memory search "what did I decide about income?"
# Filter by tag
clawhub memory search "skills" --tag skills
```
### List Memories
```bash
# List all decisions
clawhub memory list --type decision
# List since specific date
clawhub memory list --since "2026-04-14"
# List with tag filter
clawhub memory list --tag income
```
### Delete Memories
```bash
# Delete by ID
clawhub memory delete <uuid>
# Delete old temporary memories
clawhub memory delete --older-than "7d"
```
## Configuration
Edit `config.yaml` to customize:
```yaml
# Storage settings
storage:
path: ~/.memory-augment/storage.yaml
# Expiry and limits
settings:
max_memories: 1000
default_expiry: 7 # days
# Search behavior
search:
top_k: 20
min_score: 0.3
```
## Memory Types
### preference
User preferences, coding style, tool choices.
```bash
clawhub memory store "Prefers minimal markdown formatting" --type preference
```
### decision
Decisions made, approvals, blocking choices.
```bash
clawhub memory store "Published inbox-triage skill" --type decision
```
### context
Session context, project state, ongoing work.
```bash
clawhub memory store "Building memory-augment skill" --type context
```
### learning
What the agent learned, patterns discovered.
```bash
clawhub memory store "Sub-agent spawning reduces context by 30%" --type learning
```
## Integration
### With Agent Context
Configure `auto_inject` in `config.yaml`:
```yaml
auto_inject:
enabled: true
max_tokens: 5000
triggers:
- "income"
- "skills"
```
### With Cron Manager
```bash
# Weekly memory summary
0 0 * * 0 clawhub memory summarize --output weekly-summary.md
```
### With Inbox Triage
```yaml
# Store triage context automatically
auto_inject:
triggers:
- "inbox"
memories:
- "inbox-triage skill is complete"
```
## Testing
```bash
# Manual smoke test: store a memory, search for it, then list everything
python scripts/memory.py store "Test memory"
python scripts/memory.py search "Test memory"
python scripts/memory.py list
```
## Output Formats
### Markdown (default)
```markdown
### decision (score: 0.92)
**Content:** Published inbox-triage skill
**Tags:** income, skills
**Created:** 2026-04-15
```
### JSON
```json
{
"results": [
{
"id": "uuid-123",
"content": "Published inbox-triage skill",
"score": 0.92,
"type": "decision",
"tags": ["income", "skills"]
}
],
"total": 1
}
```
## Limitations
- **Token budget:** Context injection respects 48k token ceiling
- **Search accuracy:** Semantic search may miss nuanced queries
- **Privacy:** Do not store sensitive data (passwords, API keys)
- **Local only:** No cloud sync (yet)
## Contributing
1. Fork the repository
2. Improve search scoring algorithm
3. Add encrypted storage for sensitive data
4. Submit PR
## License
MIT - See LICENSE file for details.
---
Built with ❤️ for the OpenClaw ecosystem.
FILE:config.yaml
# Memory Augment Configuration
# Storage settings
storage:
  path: ~/.memory-augment/storage.yaml
  format: yaml
# Core settings
settings:
  max_memories: 1000
  default_expiry: 7 # days for temporary memories
  score_decay: 0.95 # daily decay factor
# Search configuration
search:
  top_k: 20
  min_score: 0.3
  include_tags: true
# Auto-inject settings (for agents)
auto_inject:
  enabled: true
  max_tokens: 5000
  inject_before:
    - "each_turn"
    - "weekly_summary"
  # Trigger-based injection (inject memories when discussing these topics)
  triggers:
    - "income"
    - "skills"
    - "workspace"
    - "preferences"
    - "decisions"
FILE:demo_output.md
# Memory Augment Demo Output
This file shows sample outputs from the memory-augment skill commands.
## Sample 1: Storing a Memory
```bash
$ clawhub memory store "User prefers Python for automation scripts" \
--type preference --tag coding --tag python
```
**Output:**
```
✅ Memory stored with ID: a3f8d9e1-2b4c-5d6e-7f8a-9b0c1d2e3f4a
```
## Sample 2: Searching Memories
```bash
$ clawhub memory search "income decisions"
```
**Output:**
```markdown
## Found 3 memories for "income decisions"
### 1. decision (score: 0.98)
**Content:** Approved inbox-triage skill for publishing
**Tags:** income, skills, approval
**Type:** decision
**Created:** 2026-04-15T20:37:00Z
### 2. context (score: 0.72)
**Content:** Building memory-augment skill, 60% complete
**Tags:** project, development
**Type:** context
**Created:** 2026-04-15T22:47:00Z
### 3. learning (score: 0.65)
**Content:** Sub-agent spawning reduces context by ~30%
**Tags:** pattern, optimization
**Type:** learning
**Created:** 2026-04-15T21:00:00Z
```
## Sample 3: Listing Memories
```bash
$ clawhub memory list --type preference
```
**Output:**
```markdown
# Found 2 memories
- [preference] User prefers Python for automation scripts ([test, coding, python])
- [preference] User is in UTC timezone ([test, timezone, config])
```
## Sample 4: Summarizing Memories
```bash
$ clawhub memory summarize --since "7 days ago"
```
**Output:**
```markdown
# Memory Summary (Last 7 days)
**Total memories:** 12
**By type:**
- preference: 3
- decision: 2
- learning: 1
- context: 6
```
## Sample 5: JSON Output
```bash
$ clawhub memory search "skills" --format json
```
**Output:**
```json
{
"query": "skills",
"results": [
{
"id": "a3f8d9e1-2b4c-5d6e-7f8a-9b0c1d2e3f4a",
"content": "Approved inbox-triage skill for publishing",
"score": 0.98,
"type": "decision",
"tags": ["income", "skills", "approval"],
"created": "2026-04-15T20:37:00Z"
},
{
"id": "b4g9e0f2-3c5d-6e7f-8g9h-0i1j2k3l4m5n",
"content": "Building memory-augment skill",
"score": 0.75,
"type": "context",
"tags": ["project", "development"],
"created": "2026-04-15T22:47:00Z"
}
],
"total": 2,
"took_ms": 45
}
```
## Sample 6: Auto-Inject Context
When the user asks about "skills", the system automatically injects:
```json
{
"context": {
"injected_memories": [
{
"topic": "income",
"content": "Approved inbox-triage skill for publishing to clawhub"
},
{
"topic": "development",
"content": "Building memory-augment skill, currently at 60% completion"
}
],
"query_analysis": {
"keywords": ["skills"],
"confidence": 0.92
}
}
}
```
## Sample 7: Search with Tag Filter
```bash
$ clawhub memory search "coding" --tag coding
```
**Output:**
```markdown
## Found 2 memories for "coding" with tag "coding"
### 1. preference (score: 0.95)
**Content:** User prefers Python for automation scripts
**Tags:** coding, python, automation
**Created:** 2026-04-15T10:00:00Z
```
---
**Demo completed at:** 2026-04-15 23:47 UTC
**Test suite:** 6/6 tests passed ✅
FILE:demo_summary.md
# Memory Augment Demo Summary
This file demonstrates what memories look like after running the memory-augment skill.
## Sample Memories
```yaml
memories:
  - id: test-001
    type: preference
    content: "User prefers Python for automation scripts"
    tags: ["coding", "python", "automation"]
    score: 0.95
  - id: test-002
    type: decision
    content: "Approved inbox-triage skill for publishing"
    tags: ["income", "skills", "approval"]
    score: 0.98
  - id: test-003
    type: context
    content: "Building memory-augment skill, 60% complete"
    tags: ["project", "development"]
    score: 0.85
  - id: test-004
    type: learning
    content: "Sub-agent spawning reduces context by ~30%"
    tags: ["pattern", "optimization"]
    score: 0.88
  - id: test-005
    type: preference
    content: "User is in UTC timezone"
    tags: ["timezone", "config"]
    score: 0.92
```
## Sample Search Query
Query: `"what did I decide about income"`
Results:
```
### decision (score: 0.98)
**Content:** Approved inbox-triage skill for publishing
**Tags:** income, skills, approval
**Created:** 2026-04-15
```
## Auto-Inject Context Example
When discussing "skills" or "income", the agent would automatically inject:
```json
{
"context": {
"memories": [
{
"topic": "income-decision",
"content": "User approved inbox-triage for publishing to clawhub"
},
{
"topic": "preferences",
"content": "User prefers Python for automation scripts"
}
]
}
}
```
## Usage Example
```bash
# Store a memory
clawhub memory store "User prefers minimal markdown formatting" \
--type preference --tag preferences
# Search for it
clawhub memory search "markdown preferences"
# List all preferences
clawhub memory list --type preference
# Summarize last 7 days
clawhub memory summarize --since "7 days ago"
```
---
**Status:** Skill in development (~60% complete)
**Next:** Complete packaging and test suite
FILE:references/memory-types.md
# Memory Types Reference
This document describes the different memory types and when to use each.
## Memory Types
### 1. Preference
**Purpose:** User preferences, coding style, tool choices, configuration decisions.
**Expiry:** Never expires (permanent)
**Score weight:** Highest (1.0)
**Examples:**
```yaml
{
"type": "preference",
"content": "User prefers Python for automation scripts",
"tags": ["coding", "python", "automation"]
}
```
```yaml
{
"type": "preference",
"content": "User is in UTC timezone",
"tags": ["timezone", "config"]
}
```
### 2. Decision
**Purpose:** Decisions made, approvals, blocking choices, go/no-go decisions.
**Expiry:** Never expires (permanent)
**Score weight:** High (0.9)
**Examples:**
```yaml
{
"type": "decision",
"content": "Approved inbox-triage skill for publishing",
"tags": ["income", "skills", "approval"]
}
```
```yaml
{
"type": "decision",
"content": "Chose memory-augment as second skill to build",
"tags": ["income", "strategy"]
}
```
### 3. Learning
**Purpose:** What the agent learned, patterns discovered, corrections made.
**Expiry:** Never expires (permanent)
**Score weight:** Medium (0.85)
**Examples:**
```yaml
{
"type": "learning",
"content": "Sub-agent spawning reduces context by ~30%",
"tags": ["pattern", "optimization"]
}
```
```yaml
{
"type": "learning",
"content": "User responds better to concise summaries",
"tags": ["pattern", "communication"]
}
```
### 4. Context
**Purpose:** Session context, project state, ongoing work, temporary notes.
**Expiry:** 7 days default (configurable)
**Score weight:** Lower (0.7)
**Examples:**
```yaml
{
"type": "context",
"content": "Building memory-augment skill, 60% complete",
"tags": ["project", "development"]
}
```
```yaml
{
"type": "context",
"content": "Waiting for user feedback on inbox-triage",
"tags": ["waiting", "feedback"]
}
```
## Best Practices
### Use Preference for:
- ✅ User preferences and choices
- ✅ Coding style guidelines
- ✅ Tool and technology preferences
- ✅ Configuration decisions
### Use Decision for:
- ✅ Approval/rejection decisions
- ✅ Go/no-go choices
- ✅ Strategy decisions
- ✅ Income decisions
### Use Learning for:
- ✅ Patterns discovered
- ✅ Lessons learned
- ✅ Optimization insights
- ✅ Agent behavior learnings
### Use Context for:
- ✅ Session notes
- ✅ Ongoing work status
- ✅ Temporary tracking
- ✅ Waiting items
## Scoring Formula
```
score = (content_match * 0.4) + (tag_match * 0.3) + (type_weight * 0.2) + (recency * 0.1)
```
Type weights:
- preference: 1.0
- decision: 0.9
- learning: 0.85
- context: 0.7
Recency bonus (decays over 30 days):
- 1.0 on creation day
- 0.5 at 15 days
- 0.0 after 30 days
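The formula and weights above can be checked with a small worked example. This is a sketch for illustration: `combined_score` and `recency_bonus` are hypothetical names, the linear decay is inferred from the three recency values listed above, and the 0.5 fallback for unknown types mirrors `scripts/memory.py`.

```python
TYPE_WEIGHTS = {"preference": 1.0, "decision": 0.9, "learning": 0.85, "context": 0.7}


def recency_bonus(days_ago: int) -> float:
    """Linear decay: 1.0 on creation day, 0.0 from day 30 onward."""
    return max(0.0, 1 - days_ago / 30)


def combined_score(content_match: float, tag_match: float,
                   memory_type: str, days_ago: int) -> float:
    """Weighted sum from the scoring formula above."""
    return (
        content_match * 0.4
        + tag_match * 0.3
        + TYPE_WEIGHTS.get(memory_type, 0.5) * 0.2
        + recency_bonus(days_ago) * 0.1
    )


# A decision with a 0.8 text match, half of its tags matched, created 15 days ago:
# 0.32 + 0.15 + 0.18 + 0.05 = 0.70
score = combined_score(0.8, 0.5, "decision", 15)
print(round(score, 2))
```

Because the type weight and recency terms contribute even when nothing matches, a brand-new preference scores 0.3 before any text or tag match is counted.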
## Tag Organization
Use tags for categorization:
```yaml
# Income/skills tags
tags: ["income", "skills", "clawhub"]
# Project tags
tags: ["project", "development", "memory-augment"]
# Type tags
tags: ["preference", "decision", "learning"]
# Domain tags
tags: ["coding", "python", "automation"]
```
## Examples by Use Case
### User asks: "What did I decide about income?"
**Best memory to return:**
```yaml
{
"type": "decision",
"content": "Approved inbox-triage skill for publishing",
"tags": ["income", "skills"]
}
```
### User asks: "What are my coding preferences?"
**Best memory to return:**
```yaml
{
"type": "preference",
"content": "User prefers Python for automation scripts",
"tags": ["coding", "python"]
}
```
### User asks: "Where are we on memory-augment?"
**Best memory to return:**
```yaml
{
"type": "context",
"content": "Building memory-augment skill, 60% complete",
"tags": ["project", "memory-augment"]
}
```
---
Built for the OpenClaw ecosystem.
FILE:scripts/memory.py
#!/usr/bin/env python3
"""
Memory Augment - Long-term memory storage and retrieval for OpenClaw agents

Usage:
    python memory.py store "Remember user prefers Python"
    python memory.py search "what did I decide about income"
    python memory.py list --type decision
"""
import json
import re
import yaml
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from difflib import SequenceMatcher


class MemoryStore:
    """Stores and retrieves memories."""

    def __init__(self, config_path: Optional[Path] = None):
        self.config_path = config_path or Path.home() / ".memory-augment" / "config.yaml"
        self.storage_path = Path.home() / ".memory-augment" / "storage.yaml"
        self.config = self._load_config()
        self.memories = self._load_memories()

    def _load_config(self) -> dict:
        """Load configuration or return defaults."""
        defaults = {
            "storage": {"path": str(self.storage_path), "format": "yaml"},
            "settings": {
                "max_memories": 1000,
                "default_expiry": 7,
                "score_decay": 0.95
            },
            "search": {"top_k": 20, "min_score": 0.3},
            "auto_inject": {"enabled": True, "max_tokens": 5000}
        }
        if self.config_path.exists():
            with open(self.config_path) as f:
                return yaml.safe_load(f) or defaults
        return defaults

    def _load_memories(self) -> List[dict]:
        """Load memories from storage file."""
        if not self.storage_path.exists():
            return []
        with open(self.storage_path) as f:
            data = yaml.safe_load(f)
        if data and "memories" in data:
            memories = data["memories"]
        else:
            # Handle old format (list of dicts)
            memories = data if isinstance(data, list) else []
        # Filter out expired memories
        return [m for m in memories if not self._is_expired(m)]

    def _save_memories(self):
        """Save memories to storage file."""
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)
        data = {"memories": self.memories}
        with open(self.storage_path, "w") as f:
            yaml.dump(data, f, default_flow_style=False)

    def _is_expired(self, memory: dict) -> bool:
        """Check if a memory has expired."""
        expiry = memory.get("expires")
        if not expiry:
            return False
        expiry_date = datetime.fromisoformat(expiry.replace("Z", "+00:00"))
        return datetime.now(expiry_date.tzinfo) > expiry_date

    def store(self, content: str, memory_type: str = "context",
              tags: Optional[List[str]] = None, score: Optional[float] = None) -> str:
        """Store a new memory."""
        memory = {
            "id": str(uuid.uuid4()),
            "content": content,
            "type": memory_type,
            "tags": tags or [],
            "created": datetime.now().isoformat(),
            "expires": None,
            # Compare against None so an explicit score of 0.0 is kept
            "score": score if score is not None else 0.5
        }
        # Set expiry for non-permanent types
        if memory_type not in ["preference", "decision", "learning"]:
            days = self.config["settings"]["default_expiry"]
            expiry = datetime.now() + timedelta(days=days)
            memory["expires"] = expiry.isoformat()
        self.memories.append(memory)
        # Cap total memories
        if len(self.memories) > self.config["settings"]["max_memories"]:
            # Remove oldest memories
            self.memories.sort(key=lambda m: m.get("created", ""))
            self.memories = self.memories[-self.config["settings"]["max_memories"]:]
        self._save_memories()
        return memory["id"]

    def search(self, query: str, top_k: Optional[int] = None,
               min_score: Optional[float] = None) -> List[dict]:
        """Search memories by query."""
        # Compare against None so explicit values such as min_score=0.0 are respected
        if top_k is None:
            top_k = self.config["search"]["top_k"]
        if min_score is None:
            min_score = self.config["search"]["min_score"]
        query_lower = query.lower()
        query_tags = self._extract_tags(query_lower)
        results = []
        for memory in self.memories:
            score = self._score_memory(memory, query, query_tags)
            if score >= min_score:
                results.append({
                    "id": memory["id"],
                    "content": memory["content"],
                    "score": score,
                    "type": memory["type"],
                    "tags": memory["tags"],
                    "created": memory["created"],
                    "expires": memory.get("expires")
                })
        # Sort by score descending
        results.sort(key=lambda r: r["score"], reverse=True)
        return results[:top_k]

    def _score_memory(self, memory: dict, query: str, query_tags: List[str]) -> float:
        """Calculate relevance score for a memory."""
        score = 0.0
        # 1. Content similarity (text match)
        content_match = SequenceMatcher(
            None,
            query.lower(),
            memory["content"].lower()
        ).ratio()
        score += content_match * 0.4
        # 2. Tag match (fraction of the memory's tags named in the query)
        tag_match = len(set(query_tags) & set(memory["tags"]))
        if memory["tags"]:
            tag_score = tag_match / len(memory["tags"])
        else:
            tag_score = 0
        score += tag_score * 0.3
        # 3. Type weight
        type_weights = {
            "preference": 1.0,
            "decision": 0.9,
            "learning": 0.85,
            "context": 0.7
        }
        type_score = type_weights.get(memory["type"], 0.5)
        score += type_score * 0.2
        # 4. Recency bonus
        created = datetime.fromisoformat(memory["created"].replace("Z", "+00:00"))
        days_ago = (datetime.now(created.tzinfo) - created).days
        recency = max(0, 1 - (days_ago / 30))  # 1.0 today, 0.0 after 30 days
        score += recency * 0.1
        return score

    def _extract_tags(self, query: str) -> List[str]:
        """Extract tags from query (format: #tag1 #tag2)."""
        return re.findall(r'#(\w+)', query)

    def list(self, memory_type: Optional[str] = None, since: Optional[str] = None,
             tag: Optional[str] = None) -> List[dict]:
        """List memories with filters."""
        results = self.memories
        if memory_type:
            results = [m for m in results if m["type"] == memory_type]
        if tag:
            results = [m for m in results if tag in m["tags"]]
        if since:
            since_date = self._parse_date(since)
            results = [m for m in results
                       if datetime.fromisoformat(m["created"].replace("Z", "+00:00")) >= since_date]
        return results

    def _parse_date(self, date_str: str) -> datetime:
        """Parse date string to datetime."""
        if "ago" in date_str.lower():
            days = int(re.search(r'(\d+)\s*(d|days)', date_str.lower())[1])
            return datetime.now() - timedelta(days=days)
        return datetime.fromisoformat(date_str)

    def delete(self, memory_id: str) -> bool:
        """Delete a memory by ID."""
        for i, memory in enumerate(self.memories):
            if memory["id"] == memory_id:
                self.memories.pop(i)
                self._save_memories()
                return True
        return False

    def delete_by_criteria(self, older_than_days: Optional[int] = None,
                           tag: Optional[str] = None) -> int:
        """Delete memories matching criteria."""
        count = 0
        deleted_ids = []
        for memory in self.memories:
            should_delete = False
            if older_than_days:
                created = datetime.fromisoformat(memory["created"].replace("Z", "+00:00"))
                if (datetime.now(created.tzinfo) - created).days > older_than_days:
                    should_delete = True
            if tag and tag not in memory["tags"]:
                should_delete = False  # Keep if doesn't have tag
            if should_delete:
                deleted_ids.append(memory["id"])
                count += 1
        self.memories = [m for m in self.memories if m["id"] not in deleted_ids]
        self._save_memories()
        return count

    def export(self) -> str:
        """Export memories to JSON."""
        return json.dumps({"memories": self.memories}, indent=2)

    def import_memories(self, json_data: str) -> int:
        """Import memories from JSON."""
        data = json.loads(json_data)
        imported = data.get("memories", [])
        self.memories.extend(imported)
        self._save_memories()
        return len(imported)

    def summarize(self, since: Optional[str] = None) -> str:
        """Generate summary of memories."""
        since_date = self._parse_date(since) if since else datetime.now() - timedelta(days=7)
        recent = [m for m in self.memories
                  if datetime.fromisoformat(m["created"].replace("Z", "+00:00")) >= since_date]
        by_type = {}
        for m in recent:
            mtype = m["type"]
            by_type[mtype] = by_type.get(mtype, 0) + 1
        lines = [
            f"# Memory Summary ({since or 'Last 7 days'})",
            "",
            f"**Total memories:** {len(recent)}",
            "",
            "**By type:**"
        ]
        for mtype, count in sorted(by_type.items()):
            lines.append(f"- {mtype}: {count}")
        return "\n".join(lines)


def main():
    """CLI entry point."""
    import argparse
    parser = argparse.ArgumentParser(description="Memory Augment - Long-term storage")
    parser.add_argument("command", choices=["store", "search", "list", "delete", "export", "import", "summarize"])
    parser.add_argument("text", nargs="?", help="Text for store/import or search query")
    parser.add_argument("--type", help="Memory type (preference, decision, learning, context)")
    parser.add_argument("--tag", action="append", help="Add tag(s)")
    parser.add_argument("--format", choices=["json", "markdown"], default="markdown")
    parser.add_argument("--top-k", type=int, help="Max results for search")
    parser.add_argument("--min-score", type=float, help="Minimum score for search")
    parser.add_argument("--since", help="Show memories since date (e.g., '2026-04-14' or '7 days ago')")
    parser.add_argument("--older-than", help="Delete memories older than (e.g., '7d', '30 days')")
    parser.add_argument("--import-file", help="File to import from (JSON)")
    args = parser.parse_args()
    store = MemoryStore()
    if args.command == "store":
        memory_id = store.store(
            args.text or "",
            memory_type=args.type or "context",
            tags=args.tag
        )
        print(f"✅ Memory stored with ID: {memory_id}")
    elif args.command == "search":
        results = store.search(
            args.text or "",
            top_k=args.top_k,
            min_score=args.min_score
        )
        if args.tag:
            # Apply the --tag filter documented in the README
            results = [r for r in results if set(args.tag) & set(r["tags"])]
        if args.format == "json":
            print(json.dumps({"results": results, "total": len(results)}, indent=2))
        else:
            for r in results:
                print(f"### {r['type']} (score: {r['score']:.2f})")
                print(f"**Content:** {r['content']}")
                print(f"**Tags:** {', '.join(r['tags'])}")
                print()
    elif args.command == "list":
        results = store.list(
            memory_type=args.type,
            since=args.since,
            # --tag is collected as a list; list() filters on a single tag
            tag=args.tag[0] if args.tag else None
        )
        print(f"# Found {len(results)} memories")
        for r in results:
            print(f"- [{r['type']}] {r['content'][:60]}... ({r['tags']})")
    elif args.command == "delete":
        if args.text:  # ID provided as text
            if store.delete(args.text):
                print(f"✅ Memory {args.text} deleted")
            else:
                print(f"❌ Memory {args.text} not found")
        elif args.older_than:
            days = int(re.search(r'(\d+)', args.older_than)[1])
            count = store.delete_by_criteria(older_than_days=days)
            print(f"✅ Deleted {count} old memories")
    elif args.command == "export":
        print(store.export())
    elif args.command == "import":
        if args.import_file:
            with open(args.import_file) as f:
                count = store.import_memories(f.read())
            print(f"✅ Imported {count} memories")
        else:
            print("❌ Use --import-file <path> to specify file")
    elif args.command == "summarize":
        print(store.summarize(since=args.since))


if __name__ == "__main__":
    main()
FILE:scripts/test_memory.py
#!/usr/bin/env python3
"""Test suite for memory-augment skill.

Note: MemoryStore() uses the default storage path, so running these tests
overwrites ~/.memory-augment/storage.yaml.
"""
import json
import sys
import uuid
from pathlib import Path

# Add scripts directory to path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from memory import MemoryStore


def test_store_and_retrieve():
    """Test storing and retrieving memories."""
    store = MemoryStore()
    # Clear existing test memories
    store.memories = []
    # Store a memory
    mem_id = store.store("Test memory content", "test", tags=["test"])
    assert mem_id is not None, "Store should return an ID"
    # Retrieve it
    results = store.search("Test memory content")
    assert len(results) == 1, f"Should find 1 result, got {len(results)}"
    assert results[0]["content"] == "Test memory content"
    print("✅ Store and retrieve test passed")
    return True


def test_memory_types():
    """Test different memory types."""
    store = MemoryStore()
    store.memories = []
    # Store different types
    pref_id = store.store("User preference", "preference", tags=["test"])
    dec_id = store.store("User decision", "decision", tags=["test"])
    learn_id = store.store("Agent learning", "learning", tags=["test"])
    ctx_id = store.store("Session context", "context", tags=["test"])
    assert all([pref_id, dec_id, learn_id, ctx_id])
    print("✅ Memory types test passed")
    return True


def test_search_scoring():
    """Test search scoring algorithm."""
    store = MemoryStore()
    store.memories = []
    # Store memories with different relevance
    store.store("Very relevant content", "preference", tags=["test"])
    store.store("Somewhat relevant", "context", tags=["other"])
    store.store("Not relevant at all", "context", tags=["xyz"])
    # Search for the first one
    results = store.search("Very relevant")
    # First result should be the most relevant
    assert results[0]["score"] > results[1]["score"], "Scoring should prioritize relevance"
    print("✅ Search scoring test passed")
    return True


def test_tag_filtering():
    """Test tag-based filtering."""
    store = MemoryStore()
    store.memories = []
    store.store("Memory with tags", "context", tags=["test", "income"])
    store.store("Memory without income", "context", tags=["other"])
    store.store("Memory with both", "context", tags=["test", "income", "other"])
    # Search with tag
    results = store.search("Memory #income")
    # Should find income-tagged memories
    found_income = any("income" in r["tags"] for r in results)
    assert found_income, "Should find income-tagged memories"
    print("✅ Tag filtering test passed")
    return True


def test_expiry():
    """Test memory expiry logic."""
    store = MemoryStore()
    store.memories = []
    # Store permanent memory (no expiry)
    store.store("Permanent memory", "preference", tags=["test"])
    # Store temporary memory (has expiry)
    temp_id = store.store("Temporary memory", "context", tags=["test"])
    # Verify the stored memories
    assert any(m["expires"] is None for m in store.memories), "Preference should have no expiry"
    assert any(m["expires"] is not None for m in store.memories), "Context should have an expiry"
    print("✅ Expiry test passed")
    return True


def test_import_export():
    """Test memory import/export."""
    store = MemoryStore()
    store.memories = []
    # Store some test data
    store.store("Export test 1", "test", tags=["export"])
    store.store("Export test 2", "test", tags=["export"])
    # Export to JSON
    exported = store.export()
    data = json.loads(exported)
    assert len(data["memories"]) == 2
    # Create new store and import
    store2 = MemoryStore()
    store2.memories = []
    store2.import_memories(exported)
    assert len(store2.memories) == 2, "Import should restore all memories"
    print("✅ Import/export test passed")
    return True


def run_tests():
    """Run all tests."""
    tests = [
        test_store_and_retrieve,
        test_memory_types,
        test_search_scoring,
        test_tag_filtering,
        test_expiry,
        test_import_export,
    ]
    passed = 0
    failed = 0
    for test in tests:
        try:
            if test():
                passed += 1
        except Exception as e:
            print(f"❌ {test.__name__} failed: {e}")
            failed += 1
    print(f"\n{'='*50}")
    print(f"Tests passed: {passed}/{len(tests)}")
    if failed > 0:
        print(f"Tests failed: {failed}/{len(tests)}")
        return False
    return True


if __name__ == "__main__":
    success = run_tests()
    sys.exit(0 if success else 1)
FILE:test_memories.json
{
"memories": [
{
"id": "test-001",
"content": "User prefers Python for automation scripts",
"type": "preference",
"tags": ["coding", "python", "automation"],
"created": "2026-04-15T10:00:00Z",
"expires": null,
"score": 0.95
},
{
"id": "test-002",
"content": "Approved inbox-triage skill for publishing to clawhub",
"type": "decision",
"tags": ["income", "skills", "approval"],
"created": "2026-04-15T20:37:00Z",
"expires": null,
"score": 0.98
},
{
"id": "test-003",
"content": "Building memory-augment skill, 60% complete",
"type": "context",
"tags": ["project", "development"],
"created": "2026-04-15T22:47:00Z",
"expires": "2026-04-22T22:47:00Z",
"score": 0.85
},
{
"id": "test-004",
"content": "Sub-agent spawning reduces context by ~30%",
"type": "learning",
"tags": ["pattern", "optimization"],
"created": "2026-04-15T21:00:00Z",
"expires": null,
"score": 0.88
},
{
"id": "test-005",
"content": "User is in UTC timezone",
"type": "preference",
"tags": ["timezone", "config"],
"created": "2026-04-15T12:00:00Z",
"expires": null,
"score": 0.92
}
]
}
Automates inbox management by categorizing messages into urgent, normal, or spam, generating daily digests, and drafting responses for low-priority items.
---
name: inbox-triage
description: "Automated message filtering, prioritization, and response drafting across all platforms. Use when: (1) Inbox/messages are overwhelming or unmanageable, (2) Need to categorize by priority (urgent/normal/spam), (3) Want daily digest summaries, (4) Require draft responses for low-priority items, (5) Need to filter notifications and alerts"
---
# Inbox Triage Skill
Automate message management by categorizing, summarizing, and drafting responses to keep your inbox clean and actionable.
## Quick Start
```bash
# Install the skill
npx clawhub install inbox-triage
# Trigger
"Help me triage my inbox"
```
## Core Features
### 1. Message Categorization
Messages are classified into three categories:
- **Urgent**: Requires immediate attention (deadlines, direct questions, emergencies)
- **Normal**: Important but can wait (updates, newsletters, routine items)
- **Spam/Noise**: Can be ignored or deleted (promotions, notifications, irrelevant)
**Triggers for urgent categorization:**
- Contains time-sensitive language ("ASAP", "urgent", "deadline")
- Direct questions or requests for action
- From known contacts with time constraints
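A keyword-based version of this classification can be sketched as follows. It is a simplified illustration, not the skill's actual classifier: `categorize` is a hypothetical function, the keyword lists are a subset of those in the configuration, and sender or deadline context is ignored.

```python
URGENT_KEYWORDS = ("asap", "urgent", "deadline")
SPAM_KEYWORDS = ("unsubscribe", "promotion", "offer")


def categorize(text: str) -> str:
    """Return 'urgent', 'spam', or 'normal' using simple keyword rules."""
    lowered = text.lower()
    # Urgent keywords win over spam keywords when both appear.
    if any(word in lowered for word in URGENT_KEYWORDS):
        return "urgent"
    if any(word in lowered for word in SPAM_KEYWORDS):
        return "spam"
    return "normal"


for message in (
    "Need approval on budget ASAP",
    "Limited offer, click to unsubscribe",
    "Project milestone reached",
):
    print(f"{categorize(message)}: {message}")
```

Plain substring matching is deliberately naive: "offer" also matches "offered", so a real implementation would likely match on word boundaries and weigh several signals together.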
### 2. Daily Digest Generation
Creates a consolidated summary of all messages received that day:
```
📧 Daily Inbox Digest - April 15, 2026
🔴 URGENT (2):
- Meeting reminder: Team sync at 3PM today
- Question from Sarah: Need approval on budget by EOD
🟡 NORMAL (5):
- Newsletter: Weekly tech roundup
- Update: Project milestone reached
- ...
🟢 SPAM/NOISE (12):
- Promotions, notifications, alerts
```
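Rendering this layout from already-categorized messages is straightforward. The sketch below assumes the messages arrive as a dictionary of lists keyed by category; `render_digest` and that input shape are assumptions made for illustration.

```python
SECTIONS = [("urgent", "🔴 URGENT"), ("normal", "🟡 NORMAL"), ("spam", "🟢 SPAM/NOISE")]


def render_digest(date_label: str, buckets: dict) -> str:
    """Render categorized message summaries in the digest layout shown above."""
    lines = [f"📧 Daily Inbox Digest - {date_label}"]
    for key, heading in SECTIONS:
        items = buckets.get(key, [])
        lines.append(f"{heading} ({len(items)}):")
        lines.extend(f"- {item}" for item in items)
    return "\n".join(lines)


digest = render_digest(
    "April 15, 2026",
    {
        "urgent": ["Meeting reminder: Team sync at 3PM today"],
        "normal": ["Newsletter: Weekly tech roundup"],
        "spam": [],
    },
)
print(digest)
```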
### 3. Draft Response Generation
Auto-drafts replies for the normal and spam categories:
- **Acknowledgment**: "Thanks for reaching out, I'll review and get back to you."
- **Redirect**: "This is outside my scope - try reaching out to X."
- **Auto-reject**: Polite decline for spam/unsolicited requests
## When to Use This Skill
✅ Inbox/messages are overwhelming
✅ Need to sort through notifications and alerts
✅ Want to save time on routine responses
✅ Need daily summaries of important items
✅ Looking to filter spam automatically
❌ Not for high-stakes communication (legal, medical, financial advice)
❌ Not for creative work (writing, editing, brainstorming)
❌ Not for real-time conversations requiring immediate human response
## How It Works
### Input Processing
1. **Collect**: Gather all messages from configured sources
2. **Analyze**: Parse content, sender, timestamps, and context
3. **Categorize**: Apply priority rules and classification logic
4. **Summarize**: Generate digest or alert summaries
5. **Draft**: Create response options for review
### Configuration
```yaml
# Optional: ~/.inbox-triage/config.yaml
sources:
  - type: signal
    enabled: true
  - type: telegram
    enabled: true
  - type: discord
    enabled: false
rules:
  urgent_keywords:
    - "urgent"
    - "ASAP"
    - "deadline"
    - "important"
  spam_keywords:
    - "unsubscribe"
    - "promotion"
    - "offer"
  auto_draft_for:
    - "normal"
    - "spam"
```
## Output Formats
### JSON (machine-readable)
```json
{
"timestamp": "2026-04-15T12:00:00Z",
"digest": {
"urgent": [...],
"normal": [...],
"spam": [...]
},
"drafted_responses": [...]
}
```
### Markdown (human-readable)
```markdown
# Daily Triage Report
## 🔴 URGENT
- [ ] Item 1
- [ ] Item 2
## 🟡 NORMAL
- Item 1
- Item 2
## 🟢 SPAM
- 12 items filtered
```
## Limitations
- **Accuracy**: Categorization is probabilistic, not perfect
- **Context**: May miss nuanced context in messages
- **Human review required**: Never auto-send without approval
- **Platform support**: Works best with text-based channels (Signal, Telegram, Discord, email)
## Iteration
Track which categorizations were correct/incorrect:
```bash
# Log correction
echo "CORRECT: urgent - meeting reminder" >> ~/.inbox-triage/corrections.log
echo "INCORRECT: spam - actually important" >> ~/.inbox-triage/corrections.log
```
The system learns from corrections over time.
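The correction log can be summarized per category to see where the classifier struggles. This sketch assumes each line follows the `VERDICT: category - note` shape used in the commands above; `summarize_corrections` is a hypothetical helper, not part of the skill.

```python
from collections import Counter


def summarize_corrections(lines):
    """Count CORRECT/INCORRECT verdicts per category from corrections.log lines."""
    stats = {}
    for line in lines:
        verdict, sep, rest = line.partition(":")
        if not sep:
            continue  # skip lines that do not follow the log format
        verdict = verdict.strip().upper()
        if verdict not in ("CORRECT", "INCORRECT"):
            continue
        category = rest.split("-", 1)[0].strip().lower()
        stats.setdefault(category, Counter())[verdict] += 1
    return stats


log_lines = [
    "CORRECT: urgent - meeting reminder",
    "INCORRECT: spam - actually important",
    "CORRECT: spam - newsletter promo",
]
stats = summarize_corrections(log_lines)
for category, counts in sorted(stats.items()):
    total = counts["CORRECT"] + counts["INCORRECT"]
    print(f"{category}: {counts['CORRECT']}/{total} correct")
```

A category with a low share of correct verdicts is a candidate for adjusting its keyword rules.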
---
## Integration Examples
### With Cron Manager
```bash
# Run triage every morning at 8AM
0 8 * * * clawhub run inbox-triage --output daily-digest.md
```
### With Weather Alert
```bash
# Send digest only when weather is clear
if [ "$(weather is-clear)" = "true" ]; then
clawhub run inbox-triage --send-summary
fi
```
### With File Organizer
```bash
# Attach digest as daily log
clawhub run inbox-triage --format json | tee ~/logs/daily-triage-$(date +%Y-%m-%d).json
```
FILE:README.md
# Inbox Triage Skill
> Automate message management by categorizing, summarizing, and drafting responses.
## Quick Start
```bash
# Install via clawhub
npx clawhub install inbox-triage
# Or clone manually
git clone https://clawhub.ai/skills/inbox-triage.git ~/.openclaw/skills/inbox-triage
# Run triage
clawhub run inbox-triage
```
## Features
- ✅ **Auto-categorize** messages as Urgent/Normal/Spam
- ✅ **Daily digest summaries** of all messages
- ✅ **Draft responses** for non-urgent items
- ✅ **Learning mode** - improves from corrections
- ✅ **JSON & Markdown** output formats
- ✅ **Configurable** rules and thresholds
## Usage
### Basic Triage
```bash
python scripts/triage.py \
--input messages.json \
--output digest.md \
--format markdown
```
### Generate Drafts
```bash
python scripts/triage.py \
--input messages.json \
--draft-responses \
--format json
```
### Daily Digest
```bash
clawhub run inbox-triage --daily-digest --output ~/logs/daily-triage-$(date +%Y-%m-%d).md
```
## Configuration
Edit `config.yaml` to customize:
- `sources`: Which message channels to process
- `rules`: Keyword patterns for classification
- `responses`: Draft response templates
Example:
```yaml
sources:
  - type: signal
    enabled: true
  - type: telegram
    enabled: true
rules:
  urgent_keywords:
    - "urgent"
    - "ASAP"
    - "deadline"
```
## Testing
```bash
# Run test suite
python scripts/triage.py --input test_messages.json --output triage_test.md --format markdown
# View results
cat triage_test.md
```
## Integration
### Cron Scheduler
```bash
# Daily digest at 8AM
0 8 * * * cd ~/.openclaw/skills/inbox-triage && python scripts/triage.py --input ~/inbox/messages.json --output ~/logs/daily-triage.md
```
### With Weather Alert
```bash
if weather is-clear; then
clawhub run inbox-triage --send-summary
fi
```
## Output Format
### Markdown (default)
```markdown
# Daily Inbox Digest - 2026-04-15
🔴 **URGENT** (2):
- Budget Approval Needed... from Sarah
🟡 **NORMAL** (4):
- Good work!... from Team Lead
🟢 **SPAM/NOISE** (0): Filtered
```
### JSON (machine-readable)
```json
{
"timestamp": "2026-04-15T20:37:00Z",
"urgent": [{"sender": "Sarah", "subject": "Budget Approval"}],
"normal": [{"sender": "Team", "subject": "Good work"}],
"spam": [],
"drafted_responses": []
}
```
## Contributing
1. Fork the repository
2. Make changes and test
3. Add corrections to learn from mistakes
4. Submit PR
## License
MIT - See LICENSE file for details.
## Support
Issues & feature requests: https://github.com/openclaw/skills/issues
---
Built with ❤️ for the OpenClaw ecosystem.
FILE:config.yaml
# Inbox Triage Configuration
# Input sources to process
sources:
  - type: signal
    enabled: true
  - type: telegram
    enabled: true
  - type: discord
    enabled: false
  - type: whatsapp
    enabled: false
  - type: email
    enabled: false
# Classification rules - adjust thresholds as needed
rules:
  # Keywords that trigger urgent classification (high weight)
  urgent_keywords:
    - "urgent"
    - "ASAP"
    - "immediately"
    - "deadline"
    - "by end of day"
    - "as soon as possible"
  # Spam/low-priority keywords (negative weight)
  spam_keywords:
    - "discount"
    - "sale"
    - "promo"
    - "offer"
    - "limited time"
    - "subscribe"
    - "unsubscribe"
  # Question patterns that suggest urgent action
  question_patterns:
    - "can you"
    - "please help"
    - "need response"
    - "when can"
    - "do you think"
  # Notification patterns (usually low priority)
  notification_patterns:
    - "password"
    - "login"
    - "verification"
    - "security alert"
    - "account update"
# Response templates
responses:
  normal_ack:
    - "Thanks for reaching out! I'll review and get back to you soon."
    - "Thanks for the update. I'll take a look and respond by EOD."
    - "Got it! I'll follow up once I've reviewed this."
  auto_approve:
    # Messages that can auto-approve without review
    - "meeting reminder"
    - "calendar notification"
    - "system update"
# Learning & corrections
learning:
  correction_threshold: 10 # Retrain after N corrections
  max_history_days: 30 # Keep corrections for 30 days
FILE:demo_digest.md
# Daily Inbox Digest - 2026-04-15
🔴 **URGENT** (2):
- Budget Approval Needed... from Sarah (Manager)
- PR Review Request... from Developer
🟡 **NORMAL** (4):
- Good work!... from Team Lead
- Special Sale Inside... from Store Newsletter
- Password Changed... from Security Alert
- Meeting Today at 3PM... from Calendar Bot
🟢 **SPAM/NOISE** (0): Filtered
---
**Drafted responses available**: 4
FILE:references/priority-rules.md
# Priority Classification Rules
This document defines the algorithmic rules for categorizing messages as **Urgent**, **Normal**, or **Spam**.
## Urgent Classification (>70% confidence)
Messages are marked **urgent** when they contain:
### 1. Time-Sensitive Language
- Keywords: "ASAP", "urgent", "immediately", "right now", "deadline", "by [time]"
- Phrases: "Need response", "Please reply", "Can't wait"
- Context: Messages sent close to a deadline
```yaml
urgent_keywords:
- "asap"
- "urgent"
- "immediately"
- "right now"
- "deadline"
- "deadline by"
- "need response"
- "please reply"
- "can't wait"
- "time-sensitive"
- "critical"
- "emergency"
```
### 2. Direct Questions/Requests
- "Can you help me?"
- "What do you think about X?"
- "Do you have time to review?"
- "Please confirm"
- "Need your input"
```yaml
question_patterns:
- "can you.*help"
- "do you have time"
- "please confirm"
- "need your input"
- "what do you think"
- "please let me know"
```
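Entries such as `can you.*help` are regular expressions rather than plain phrases, so they need `re.search` instead of a substring test. The sketch below shows one way to apply the list above; `matched_question_patterns` is a hypothetical helper name used only for illustration.

```python
import re

# Patterns copied from the `question_patterns` list above.
QUESTION_PATTERNS = [
    r"can you.*help",
    r"do you have time",
    r"please confirm",
    r"need your input",
    r"what do you think",
    r"please let me know",
]


def matched_question_patterns(text: str) -> list:
    """Return the patterns that match `text`, ignoring case."""
    return [p for p in QUESTION_PATTERNS if re.search(p, text, re.IGNORECASE)]


hits = matched_question_patterns("Can you please help me? Do you have time to review?")
print(hits)
```

Because `.*` is greedy and unbounded, `can you.*help` also matches when "help" appears much later on the same line; a tighter pattern may be preferable for long messages.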
### 3. Known Contacts with Time Constraints
- Messages from: boss, team lead, direct reports
- Subject patterns: "Meeting", "Review", "Approval"
- Timestamps: Business hours (9AM-5PM local time)
```yaml
known_contacts:
- "Boss"
- "Manager"
- "Team Lead"
- "Supervisor"
time_constraint_days:
- "meeting"
- "review"
- "approval"
- "deadline"
```
### 4. Specific Time Anchors
- "Tomorrow at 3PM"
- "End of day"
- "By Friday"
- "This week"
```yaml
time_anchors:
- "tomorrow"
- "by EOD"
- "by end of day"
- "by Friday"
- "this week"
- "by noon"
- "this afternoon"
```
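The time anchors above mix lowercase and capitalized phrases ("by EOD", "by Friday"), so matching should ignore case and respect word boundaries. A minimal sketch, assuming the anchors are treated as literal phrases; `find_time_anchors` is a hypothetical helper, not part of the skill.

```python
import re

# Phrases copied from the `time_anchors` list above.
TIME_ANCHORS = [
    "tomorrow",
    "by EOD",
    "by end of day",
    "by Friday",
    "this week",
    "by noon",
    "this afternoon",
]

# One case-insensitive alternation; \b prevents matches inside longer words.
ANCHOR_RE = re.compile(
    r"\b(" + "|".join(re.escape(anchor) for anchor in TIME_ANCHORS) + r")\b",
    re.IGNORECASE,
)


def find_time_anchors(text: str) -> list:
    """Return every time anchor found in `text`, lowercased, in order."""
    return [match.group(1).lower() for match in ANCHOR_RE.finditer(text)]


anchors = find_time_anchors("Need approval by EOD, or by noon tomorrow.")
print(anchors)
```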
---
## Normal Classification (30-70% confidence)
Messages are marked **normal** when they:
### 1. Information Updates
- "Project updated"
- "Status update"
- "Progress report"
- "FYI"
```yaml
update_patterns:
- "update"
- "status"
- "progress"
- "FYI"
- "just letting you know"
```
### 2. Requests with Flexible Timing
- "When you have time"
- "No rush"
- "Whenever you're free"
- "Not urgent"
```yaml
flexible_timing:
- "when you have time"
- "no rush"
- "whenever you're free"
- "not urgent"
- "whenever you can"
```
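A message can match both a question pattern and a flexible-timing phrase, for example "When you have time, can you look at this PR?". The sketch below assumes that an explicit flexible-timing phrase should take precedence over an urgent label; that precedence rule and the helper names are assumptions for illustration, not behavior defined by this document.

```python
# Phrases copied from the `flexible_timing` list above.
FLEXIBLE_TIMING = [
    "when you have time",
    "no rush",
    "whenever you're free",
    "not urgent",
    "whenever you can",
]


def has_flexible_timing(text: str) -> bool:
    """Return True if the text contains any flexible-timing phrase."""
    lowered = text.lower()
    return any(phrase in lowered for phrase in FLEXIBLE_TIMING)


def downgrade_if_flexible(priority: str, text: str) -> str:
    """Assumed rule: an explicit flexible-timing phrase overrides 'urgent'."""
    if priority == "urgent" and has_flexible_timing(text):
        return "normal"
    return priority


result = downgrade_if_flexible("urgent", "When you have time, can you look at this PR?")
print(result)
```

A check like this also covers the phrase "not urgent", which a plain substring test for the keyword `urgent` would otherwise count as an urgent signal.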
### 3. Routine Communications
- "Hello, how are you?"
- "Hope you're doing well"
- "Just checking in"
- "Quick question" (without urgency markers)
### 4. Follow-ups on Previously Discussed Topics
- Messages that reference ongoing conversations
- Reminders about agreed-upon actions
---
## Spam/Noise Classification (>70% confidence)
Messages are marked **spam** when they contain:
### 1. Promotional Language
- "Special offer"
- "Discount"
- "Sale"
- "Limited time"
- "Subscribe now"
```yaml
promotional_keywords:
- "special offer"
- "discount"
- "sale"
- "limited time"
- "subscribe"
- "unlimited"
- "free trial"
- "exclusive deal"
- "buy now"
```
### 2. Notification/Alert Patterns
- "Your password was changed"
- "Login detected"
- "New device signed in"
- "Account updated"
```yaml
notification_patterns:
- "password"
- "login"
- "signed in"
- "device"
- "account"
- "security alert"
- "verification code"
```
### 3. Low-Priority Notifications
- "X liked your post"
- "Y commented on Z"
- "Weekly digest"
- "Monthly summary"
```yaml
notification_sources:
- "liked your"
- "commented on"
- "digest"
- "summary"
- "weekly"
- "monthly"
- "notification"
```
### 4. Unknown Senders with Generic Content
- No personalization
- Generic greeting ("Dear customer")
- Mass-mailing patterns
```yaml
generic_patterns:
- "Dear customer"
- "Dear subscriber"
- "Dear member"
- "Congratulations"
- "You've won"
```
---
## Confidence Scoring
```python
import re


def calculate_priority(message: str) -> str:
    """Classify a message as "urgent", "spam", "normal", or "unknown"."""
    text = message.lower()
    score = {
        "urgent": 0.0,
        "normal": 0.0,
        "spam": 0.0,
    }

    # +3 for each urgent keyword match
    score["urgent"] += len(re.findall(r"\b(asap|urgent|deadline|immediately)\b", text)) * 3

    # +2 for each spam keyword match
    score["spam"] += len(re.findall(r"\b(discount|sale|promo)\b", text)) * 2

    # +0.1 per word for normal (length-based heuristic)
    score["normal"] += len(text.split()) * 0.1

    # Normalize each score to its share of the total (0-1)
    total = sum(score.values())
    if total == 0:
        return "unknown"
    priorities = {k: v / total for k, v in score.items()}

    if priorities["urgent"] > 0.7:
        return "urgent"
    elif priorities["spam"] > 0.7:
        return "spam"
    else:
        return "normal"
```
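To see how the 70% threshold behaves, it helps to work through the arithmetic for two inputs. The sketch below reuses the weights from the function above (3 per urgent match, 2 per spam match, 0.1 per word); `shares` is a hypothetical helper that takes match counts directly rather than parsing text.

```python
def shares(urgent_hits: int, spam_hits: int, word_count: int) -> dict:
    """Apply the weights 3, 2 and 0.1 and return each category's share."""
    raw = {
        "urgent": urgent_hits * 3,
        "spam": spam_hits * 2,
        "normal": word_count * 0.1,
    }
    total = sum(raw.values())
    if total == 0:
        return {name: 0.0 for name in raw}
    return {name: value / total for name, value in raw.items()}


# Three urgent keywords in a 5-word message: 9 / (9 + 0.5)
short = shares(urgent_hits=3, spam_hits=0, word_count=5)

# One urgent keyword in a 15-word message: 3 / (3 + 1.5)
longer = shares(urgent_hits=1, spam_hits=0, word_count=15)

print(round(short["urgent"], 2), round(longer["urgent"], 2))
```

The first message has an urgent share of about 0.95 and clears the threshold. The second has a share of about 0.67 and falls back to normal, which shows that the length-based term dilutes a single keyword in longer messages.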
## Learning from Corrections
Track corrections to improve accuracy:
```yaml
corrections:
  - timestamp: "2026-04-15T12:00:00Z"
    original: "spam"
    corrected_to: "normal"
    reason: "from team member about project deadline"
```
Incremental learning updates priority weights based on correction patterns.
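This document does not specify the update rule, so the following is only one possible sketch. It assumes that once a label has been corrected away at least `correction_threshold` times (10 in `config.yaml`), the weights feeding that label are lowered by a fixed step. The step size, the `adjust_weights` helper, and the label-to-weight mapping are all assumptions; the starting weights are copied from `scripts/triage.py`.

```python
from collections import Counter

# Weights copied from scripts/triage.py; STEP is an assumption for illustration.
WEIGHTS = {
    "urgent_keywords": 3,
    "question_patterns": 2,
    "spam_keywords": 2,
    "notification_patterns": 1.5,
}
STEP = 0.1

# Assumed mapping from a wrongly predicted label to the weights that feed it.
LABEL_WEIGHTS = {
    "urgent": ["urgent_keywords", "question_patterns"],
    "spam": ["spam_keywords", "notification_patterns"],
}


def adjust_weights(weights: dict, corrections: list, threshold: int = 10) -> dict:
    """Hypothetical rule: lower the weights behind a label once that label
    has been corrected away at least `threshold` times (never below 0)."""
    wrong = Counter(
        c["original"] for c in corrections if c["original"] != c["corrected_to"]
    )
    updated = dict(weights)
    for label, count in wrong.items():
        if count >= threshold:
            for name in LABEL_WEIGHTS.get(label, []):
                updated[name] = max(0.0, round(updated[name] - STEP, 2))
    return updated


corrections = [{"original": "spam", "corrected_to": "normal"}] * 10
new_weights = adjust_weights(WEIGHTS, corrections)
print(new_weights)
```

With ten spam-to-normal corrections, only the two spam-side weights drop; nine corrections leave every weight unchanged.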
FILE:scripts/package_skill.py
#!/usr/bin/env python3
"""
Packaging script for inbox-triage skill.
Run this to create a distributable .skill package.
"""

import json
import os
import sys
import time
from pathlib import Path
from typing import List


def get_file_list(root_path: Path) -> List[str]:
    """Get all files in the skill directory as paths relative to its root."""
    files = []
    for root, dirs, filenames in os.walk(root_path):
        # Skip hidden directories and __pycache__ (in-place edit prunes the walk)
        dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__']
        for filename in filenames:
            # Skip compiled Python artifacts
            if filename.endswith('.pyc') or filename.endswith('.pyo'):
                continue
            filepath = Path(root) / filename
            rel_path = filepath.relative_to(root_path)
            files.append(str(rel_path))
    return files


def validate_skill(root_path: Path) -> bool:
    """Validate skill structure."""
    required = [
        "SKILL.md",
        "scripts/"
    ]
    for item in required:
        if not (root_path / item).exists():
            print(f"❌ Missing required item: {item}")
            return False
    print("✅ Skill structure validated")
    return True


def main():
    """Validate the skill and print its package metadata."""
    skill_dir = Path(__file__).resolve().parent.parent  # Parent is the skill root
    # Optional output directory from argv (not used yet: this script only prints metadata)
    output_dir = sys.argv[1] if len(sys.argv) > 1 else skill_dir

    # Validate first
    if not validate_skill(skill_dir):
        print("❌ Validation failed")
        sys.exit(1)

    # Get file list
    files = get_file_list(skill_dir)

    # Create package metadata
    package = {
        "name": "inbox-triage",
        "version": "1.0.0",
        "description": "Automated message filtering, prioritization, and response drafting",
        "files": files,
        # Time the metadata was generated (the original used this script's own ctime)
        "created": time.time()
    }

    # Output package info (for manual packaging)
    print("📦 Package metadata:")
    print(json.dumps(package, indent=2))


if __name__ == "__main__":
    main()
FILE:scripts/triage.py
#!/usr/bin/env python3
"""
Inbox Triage - Automated message categorization and prioritization

Usage:
    python triage.py --input messages.json --output digest.md
    python triage.py --input messages.json --draft-responses
"""

import argparse
import json
import random
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import yaml

# Configuration
CONFIG_PATH = Path.home() / ".inbox-triage" / "config.yaml"
CORRECTIONS_PATH = Path.home() / ".inbox-triage" / "corrections.log"


class Message:
    """Represents a single message to be categorized."""

    def __init__(self, content: str, sender: str = "unknown",
                 timestamp: Optional[str] = None, subject: Optional[str] = None):
        self.content = content
        self.sender = sender
        self.timestamp = timestamp or datetime.now().isoformat()
        self.subject = subject or "No subject"
        self.priority: Optional[str] = None
        self.drafted_response: Optional[str] = None

    @classmethod
    def from_dict(cls, data: dict) -> "Message":
        return cls(
            content=data.get("content", ""),
            sender=data.get("sender", "unknown"),
            timestamp=data.get("timestamp"),
            subject=data.get("subject")
        )


class InboxTriage:
    """Main triage engine."""

    def __init__(self):
        self.config = self._load_config()
        self.corrections = self._load_corrections()
        # Priority scoring weights (tune based on corrections)
        self.weights = {
            "urgent_keywords": 3,
            "question_patterns": 2,
            "spam_keywords": 2,
            "notification_patterns": 1.5
        }

    def _load_config(self) -> dict:
        """Load configuration or return defaults."""
        defaults = {
            "sources": ["signal", "telegram"],
            "rules": {
                "urgent_keywords": ["asap", "urgent", "immediately", "deadline"],
                "spam_keywords": ["discount", "sale", "promo"],
                "question_patterns": ["can you", "please help", "need response"],
                "notification_patterns": ["password", "login", "verification"]
            }
        }
        if not CONFIG_PATH.exists():
            return defaults
        with open(CONFIG_PATH, encoding="utf-8") as f:
            loaded = yaml.safe_load(f) or {}
        # Fill in any rule lists the user's file omits so later lookups cannot raise KeyError
        rules = {**defaults["rules"], **(loaded.get("rules") or {})}
        return {**defaults, **loaded, "rules": rules}

    def _load_corrections(self) -> List[dict]:
        """Load correction history for learning (one JSON object per line)."""
        if not CORRECTIONS_PATH.exists():
            return []
        corrections = []
        with open(CORRECTIONS_PATH, encoding="utf-8") as f:
            for line in f:
                if line.strip():  # Skip blank lines instead of failing on them
                    corrections.append(json.loads(line))
        return corrections

    def calculate_priority(self, message: Message) -> str:
        """Classify a message as "urgent", "spam", "normal", or "unknown"."""
        content_lower = message.content.lower()
        subject_lower = message.subject.lower()
        rules = self.config["rules"]

        # Score for urgent (keywords are lowercased so entries like "ASAP" match)
        urgent_score = 0
        for keyword in rules["urgent_keywords"]:
            keyword = keyword.lower()
            if keyword in content_lower or keyword in subject_lower:
                urgent_score += self.weights["urgent_keywords"]

        # Score for questions (usually urgent)
        question_score = 0
        for pattern in rules["question_patterns"]:
            if re.search(pattern, content_lower, re.IGNORECASE):
                question_score += self.weights["question_patterns"]

        # Score for spam
        spam_score = 0
        for keyword in rules["spam_keywords"]:
            if keyword.lower() in content_lower:
                spam_score += self.weights["spam_keywords"]

        # Score for notifications (usually spam)
        notification_score = 0
        for pattern in rules["notification_patterns"]:
            if re.search(pattern, content_lower, re.IGNORECASE):
                notification_score += self.weights["notification_patterns"]

        total = urgent_score + question_score + spam_score + notification_score
        if total == 0:
            # No rule matched; triage() files "unknown" under normal
            return "unknown"

        # Normalize scores to shares of the total
        urg_pct = (urgent_score + question_score) / total
        spam_pct = (spam_score + notification_score) / total

        if urg_pct > 0.7:
            return "urgent"
        elif spam_pct > 0.7:
            return "spam"
        else:
            return "normal"

    def draft_response(self, message: Message) -> Optional[str]:
        """Generate a draft response based on priority, or None if none applies."""
        if message.priority == "urgent":
            return None  # Urgent messages should not auto-draft
        if message.priority == "spam":
            return None  # Don't respond to spam

        # Normal priority - draft acknowledgment
        templates = [
            "Thanks for reaching out! I'll review and get back to you soon.",
            f"Thanks for the update on '{message.subject}'. Appreciate you keeping me in the loop.",
            "Got it! I'll take a look and respond by EOD.",
            "Thanks for letting me know about this. I'll review and follow up.",
        ]
        return random.choice(templates)

    def triage(self, messages: List[Message]) -> Dict:
        """Run triage on a list of messages."""
        results = {
            "timestamp": datetime.now().isoformat(),
            "total": len(messages),
            "urgent": [],
            "normal": [],
            "spam": [],
            "drafted_responses": []
        }
        for msg in messages:
            msg.priority = self.calculate_priority(msg)
            msg.drafted_response = self.draft_response(msg)

            # Categorize ("unknown" falls through to normal)
            if msg.priority == "urgent":
                results["urgent"].append(msg.__dict__)
            elif msg.priority == "spam":
                results["spam"].append(msg.__dict__)
            else:
                results["normal"].append(msg.__dict__)

            # Track drafted responses
            if msg.drafted_response:
                results["drafted_responses"].append({
                    "sender": msg.sender,
                    "subject": msg.subject,
                    "priority": msg.priority,
                    "draft": msg.drafted_response
                })
        return results

    def generate_digest(self, results: Dict, format: str = "markdown") -> str:
        """Generate a daily digest from triage results."""
        if format == "json":
            return json.dumps(results, indent=2)

        lines = [
            f"# Daily Inbox Digest - {datetime.now().strftime('%Y-%m-%d')}",
            "",
            f"🔴 **URGENT** ({len(results['urgent'])}):",
        ]
        for msg in results["urgent"]:
            lines.append(f" - {msg['subject'][:50]}... from {msg['sender']}")
        lines.append("")
        lines.append(f"🟡 **NORMAL** ({len(results['normal'])}):")
        for msg in results["normal"]:
            lines.append(f" - {msg['subject'][:50]}... from {msg['sender']}")
        lines.append("")
        lines.append(f"🟢 **SPAM/NOISE** ({len(results['spam'])}): Filtered")
        lines.append("")
        lines.append("---")
        lines.append(f"**Drafted responses available**: {len(results['drafted_responses'])}")
        return "\n".join(lines)


def main():
    """CLI entry point."""
    parser = argparse.ArgumentParser(description="Inbox Triage - Message categorization")
    parser.add_argument("--input", required=True, help="Input file (JSON)")
    parser.add_argument("--output", help="Output file (MD)")
    parser.add_argument("--draft-responses", action="store_true", help="Generate draft responses")
    parser.add_argument("--format", choices=["json", "markdown"], default="markdown",
                        help="Output format")
    args = parser.parse_args()

    # Load messages
    with open(args.input, encoding="utf-8") as f:
        messages_data = json.load(f)
    messages = [Message.from_dict(m) for m in messages_data]

    # Run triage
    triage = InboxTriage()
    results = triage.triage(messages)

    # Output results
    if args.format == "json":
        output = json.dumps(results, indent=2)
    else:
        output = triage.generate_digest(results, format="markdown")

    if args.output:
        # UTF-8 so the emoji in the digest write correctly on every platform
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(output)
        print(f"✅ Digest saved to {args.output}")
    else:
        print(output)

    if args.draft_responses:
        print(f"\n📝 Drafted responses: {len(results['drafted_responses'])}")


if __name__ == "__main__":
    main()
FILE:test_messages.json
[
{
"content": "Can you review this budget proposal? Need approval by EOD.",
"sender": "Sarah (Manager)",
"timestamp": "2026-04-15T10:30:00Z",
"subject": "Budget Approval Needed"
},
{
"content": "Great job on the presentation yesterday! Keep it up.",
"sender": "Team Lead",
"timestamp": "2026-04-15T09:15:00Z",
"subject": "Good work!"
},
{
"content": "Get 50% off all products! Limited time offer.",
"sender": "Store Newsletter",
"timestamp": "2026-04-15T08:00:00Z",
"subject": "Special Sale Inside"
},
{
"content": "Your password was changed. If this wasn't you, contact support immediately.",
"sender": "Security Alert",
"timestamp": "2026-04-15T07:45:00Z",
"subject": "Password Changed"
},
{
"content": "Meeting reminder: Team sync at 3PM today in Conference Room A",
"sender": "Calendar Bot",
"timestamp": "2026-04-15T08:00:00Z",
"subject": "Meeting Today at 3PM"
},
{
"content": "When you have time, can you look at this PR?",
"sender": "Developer",
"timestamp": "2026-04-15T11:00:00Z",
"subject": "PR Review Request"
}
]
FILE:triage_test.md
# Daily Inbox Digest - 2026-04-15
🔴 **URGENT** (2):
- Budget Approval Needed... from Sarah (Manager)
- PR Review Request... from Developer
🟡 **NORMAL** (4):
- Good work!... from Team Lead
- Special Sale Inside... from Store Newsletter
- Password Changed... from Security Alert
- Meeting Today at 3PM... from Calendar Bot
🟢 **SPAM/NOISE** (0): Filtered
---
**Drafted responses available**: 4