@clawhub-oblio-falootin-ac5a03eb14
---
name: sql-memory
description: "Semantic memory layer for OpenClaw agents. Use when: (1) persisting agent memories with importance scoring, (2) hierarchical memory rollups (daily→weekly→monthly→yearly), (3) queuing tasks for agents, (4) logging activity and audit trails, (5) managing knowledge bases with semantic search. Provides remember/recall/search/queue_task/log_event APIs. Built on sql-connector for reliable parameterized SQL execution."
---
# SQL Memory Skill
> Semantic memory layer for OpenClaw agents
## Overview
Provides agent-friendly memory operations: remember, recall, search, forget, plus task queue management, knowledge indexing, activity logging, and hierarchical memory rollups. All operations go through the SQL Connector skill for reliable, parameterized SQL execution.
See `scripts/sql_memory.py` for full implementation.
## Dependencies
- **sql-connector** — provides the underlying database connection and query execution
## Quick Start
```python
from sql_memory import SQLMemory, get_memory
mem = get_memory('cloud')
# Remember something
mem.remember('facts', 'vex_timezone', 'VeX is in EST/EDT timezone', importance=7)
# Recall it
entry = mem.recall('facts', 'vex_timezone')
# Search across all memories
results = mem.search_memories('timezone')
# Queue a task
mem.queue_task('nlp_agent', 'analyze_document', '{"doc": "..."}', priority=3)
# Log an event
mem.log_event('training_complete', 'nlp_agent', 'Finished training cycle 42')
# Store knowledge
mem.store_knowledge('stamps', 'inverted_jenny', 'Rare 1918 misprint...', 'catalog')
```
## Schema
All tables live in the `memory` schema (SQL Server database):
| Table | Purpose |
|-------|---------|
| `memory.Memories` | Long-term curated memories with importance scoring |
| `memory.TaskQueue` | Task queue for agent work items |
| `memory.ActivityLog` | Event/activity logging for audit trail |
| `memory.KnowledgeIndex` | Domain-specific knowledge store |
| `memory.Sessions` | Session tracking for agents |
## Memory Rollups
Hierarchical consolidation keeps memories fresh and relevant:
```
Daily memories → Weekly rollup (Sundays 3AM)
Weekly rollups → Monthly rollup (1st of month)
Monthly → Quarterly (Jan/Apr/Jul/Oct)
Quarterly → Yearly (Jan 1st)
```
Each rollup:
1. Summarizes source entries
2. Creates a consolidated entry with back-references
3. Reduces importance of source entries
4. Tags sources as `rolled_up`
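
The four rollup steps above can be sketched in plain Python. This is a minimal illustration, not the code in `sql_memory.py`: the `Memory` dataclass, the `roll_up` function, the joined-text "summary", and the rule of lowering importance by one with a floor of 1 are all assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class Memory:
    id: int
    content: str
    importance: int = 5
    tags: list = field(default_factory=list)
    sources: list = field(default_factory=list)  # back-references to rolled-up ids


def roll_up(entries, new_id, label):
    """Consolidate `entries` into one Memory and demote the sources."""
    # 1. Summarize source entries (joined text stands in for a real summary).
    summary = f"{label}: " + "; ".join(e.content for e in entries)
    # 2. Create a consolidated entry that back-references its sources.
    rollup = Memory(
        id=new_id,
        content=summary,
        importance=max(e.importance for e in entries),
        sources=[e.id for e in entries],
    )
    for e in entries:
        # 3. Reduce importance of each source (assumed: by one, floor of 1).
        e.importance = max(1, e.importance - 1)
        # 4. Tag the source as rolled up.
        if "rolled_up" not in e.tags:
            e.tags.append("rolled_up")
    return rollup


daily = [Memory(1, "Fixed login bug", 5), Memory(2, "Chose pymssql driver", 7)]
weekly = roll_up(daily, new_id=3, label="Week 10")
```

The consolidated entry inherits the highest source importance here so that a rollup never ranks below the memories it replaces; that choice is also an assumption of the sketch.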
### Importance Scale
| Level | Meaning | Example |
|-------|---------|---------|
| 1-2 | Ephemeral, archive | Old workspace file |
| 3-4 | Context, nice-to-know | Debug notes |
| 5-6 | Standard operational | Task completion |
| 7-8 | Important milestone | Architecture decision |
| 9 | Critical | System design choice |
| 10 | Permanent | Core identity/values |
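
A small helper can keep callers honest about the scale. The sketch below copies the bands from the table; the names `IMPORTANCE_BANDS` and `importance_label` are hypothetical and are not part of the skill's API.

```python
# Upper bound of each band and its meaning, copied from the table above.
IMPORTANCE_BANDS = [
    (2, "Ephemeral, archive"),
    (4, "Context, nice-to-know"),
    (6, "Standard operational"),
    (8, "Important milestone"),
    (9, "Critical"),
    (10, "Permanent"),
]


def importance_label(level: int) -> str:
    """Return the band name for an importance level from 1 to 10."""
    if not 1 <= level <= 10:
        raise ValueError(f"importance must be 1-10, got {level}")
    for upper, label in IMPORTANCE_BANDS:
        if level <= upper:
            return label
    raise AssertionError("unreachable: every level 1-10 falls in a band")
```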
## API Reference
### Memory Operations
| Method | Description | Example |
|--------|-------------|---------|
| `remember(cat, key, content, importance, tags)` | Store a memory | `mem.remember('facts', 'name', 'Oblio', 7)` |
| `recall(cat, key)` | Retrieve a memory | `mem.recall('facts', 'name')` |
| `search_memories(query, limit)` | Semantic search | `mem.search_memories('timezone', limit=5)` |
| `forget(cat, key)` | Delete a memory | `mem.forget('facts', 'name')` |
### Task Queue
| Method | Description |
|--------|-------------|
| `queue_task(agent, type, payload, priority)` | Add a task |
| `claim_task(id)` | Mark task as processing |
| `complete_task(id, result)` | Mark task as completed |
| `fail_task(id, error, retries, max)` | Fail with retry logic |
### Activity Logging
| Method | Description |
|--------|-------------|
| `log_event(type, agent, detail, extra)` | Log an activity |
| `get_recent_activity(hours, agent)` | Query recent events |
## Configuration
Uses the same environment variables as sql-connector:
```
SQL_CLOUD_SERVER=sql5112.site4now.net
SQL_CLOUD_DATABASE=db_99ba1f_memory4oblio
SQL_CLOUD_USER=...
SQL_CLOUD_PASSWORD=...
SQL_LOCAL_SERVER=10.0.0.110
SQL_LOCAL_DATABASE=Oblio_Memories
SQL_LOCAL_USER=sa
SQL_LOCAL_PASSWORD=...
```
## Architecture
```
┌──────────────────┐
│ Agents │ ← OblioAgent subclasses
├──────────────────┤
│ SQLMemory │ ← Semantic operations (remember/recall/queue/log)
├──────────────────┤
│ SQLConnector │ ← Generic SQL execution (retry, parameterized, logging)
├──────────────────┤
│ pymssql (TDS) │ ← Native SQL Server driver
└──────────────────┘
```
## License
MIT
FILE:README.md
# clawbot-sql-memory
> ⚠️ **ALPHA — Use at your own risk.** Functional and in active use, but API may change. We'll lock the API after 30 days of community feedback. Open issues freely — this improves with use.
SQL Server-based persistent memory for OpenClaw agents. Provides semantic memory, task queuing, activity logging, todo management, and hierarchical rollups (daily → weekly → monthly → yearly).
## Requirements
- SQL Server 2019+ (or Azure SQL, site4now, etc.)
- [clawbot-sql-connector](https://github.com/VeXHarbinger/clawbot-sql-connector) — install first
- `pymssql` and `python-dotenv`
## Step 1: Create the Schema
Before installing this skill, create the required tables in your SQL Server database. You can do this in one of two ways:
### Option A — Run the setup script (recommended)
```bash
python3 setup_schema.py
```
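This connects using your `.env` credentials (see Step 2) and creates the schema and tables automatically, skipping any that already exist. It uses the `local` profile by default; pass `--cloud` to use the `cloud` profile. The script imports sql-connector, so install that first (see Step 3). Run it once before first use.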
### Option B — Manual SQL (paste into SSMS, Azure Data Studio, or sqlcmd)
```sql
-- Run against your target database
CREATE SCHEMA memory;
GO
CREATE TABLE memory.Memories (
id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT newid(),
category NVARCHAR(100) NOT NULL,
[key] NVARCHAR(255) NOT NULL,
content NVARCHAR(MAX) NOT NULL,
importance INT DEFAULT 3,
tags NVARCHAR(500) DEFAULT '',
status NVARCHAR(50) DEFAULT 'active',
created_at DATETIME2 DEFAULT GETUTCDATE(),
updated_at DATETIME2 DEFAULT GETUTCDATE()
);
CREATE TABLE memory.TaskQueue (
id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT newid(),
agent NVARCHAR(100) NOT NULL,
task_type NVARCHAR(100) NOT NULL,
payload NVARCHAR(MAX) DEFAULT '',
priority INT DEFAULT 5,
status NVARCHAR(50) DEFAULT 'pending',
retries INT DEFAULT 0,
model_hint NVARCHAR(100) DEFAULT '',
created_at DATETIME2 DEFAULT GETUTCDATE(),
updated_at DATETIME2 DEFAULT GETUTCDATE(),
claimed_at DATETIME2 NULL,
completed_at DATETIME2 NULL,
error NVARCHAR(MAX) DEFAULT ''
);
CREATE TABLE memory.ActivityLog (
id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT newid(),
event_type NVARCHAR(100) NOT NULL,
agent NVARCHAR(100) DEFAULT '',
description NVARCHAR(MAX) DEFAULT '',
metadata NVARCHAR(MAX) DEFAULT '',
importance INT DEFAULT 3,
created_at DATETIME2 DEFAULT GETUTCDATE()
);
CREATE TABLE memory.Sessions (
id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT newid(),
session_key NVARCHAR(255) NOT NULL,
agent NVARCHAR(100) DEFAULT '',
status NVARCHAR(50) DEFAULT 'active',
metadata NVARCHAR(MAX) DEFAULT '',
started_at DATETIME2 DEFAULT GETUTCDATE(),
ended_at DATETIME2 NULL
);
CREATE TABLE memory.KnowledgeIndex (
id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT newid(),
domain NVARCHAR(100) NOT NULL,
[key] NVARCHAR(255) NOT NULL,
content NVARCHAR(MAX) NOT NULL,
source NVARCHAR(255) DEFAULT '',
tags NVARCHAR(500) DEFAULT '',
created_at DATETIME2 DEFAULT GETUTCDATE()
);
CREATE TABLE memory.Todos (
id UNIQUEIDENTIFIER PRIMARY KEY DEFAULT newid(),
title NVARCHAR(500) NOT NULL,
description NVARCHAR(MAX) DEFAULT '',
priority INT DEFAULT 3,
status NVARCHAR(50) DEFAULT 'open',
tags NVARCHAR(500) DEFAULT '',
created_at DATETIME2 DEFAULT GETUTCDATE(),
updated_at DATETIME2 DEFAULT GETUTCDATE(),
closed_at DATETIME2 NULL
);
GO
```
## Step 2: Configure .env
Backend configuration uses a simple naming pattern. Add these to your `.env`:
```env
# Local SQL Server
SQL_local_server=10.0.0.110
SQL_local_port=1433
SQL_local_database=YourDatabase
SQL_local_user=your_user
SQL_local_password=your_password
# Cloud SQL Server (Azure / site4now / etc.)
SQL_cloud_server=yourserver.database.windows.net
SQL_cloud_port=1433
SQL_cloud_database=your_cloud_db
SQL_cloud_user=your_cloud_user
SQL_cloud_password=your_cloud_password
# Add more backends using the same pattern: SQL_<backend>_server, SQL_<backend>_database, etc.
```
See [clawbot-sql-connector README](https://github.com/VeXHarbinger/clawbot-sql-connector#env-setup) for more details on backend naming.
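
To make the naming pattern concrete, here is a rough sketch of how one backend's settings could be collected from the environment. It is not the connector's actual loader: `load_backend_config` and `FIELDS` are hypothetical names, the case-insensitive key matching is an assumption, and the port falls back to 1433 (the SQL Server default) when unset.

```python
import os

FIELDS = ("server", "port", "database", "user", "password")


def load_backend_config(backend, env=None):
    """Collect SQL_<backend>_<field> settings for one backend."""
    env = os.environ if env is None else env
    # Compare keys case-insensitively (an assumption of this sketch).
    lowered = {k.lower(): v for k, v in env.items()}
    prefix = f"sql_{backend.lower()}_"
    config = {f: lowered[prefix + f] for f in FIELDS if prefix + f in lowered}
    missing = [f for f in ("server", "database", "user", "password") if f not in config]
    if missing:
        raise KeyError(f"backend '{backend}' is missing: {', '.join(missing)}")
    config["port"] = int(config.get("port", 1433))
    return config


# A plain dict stands in for the process environment in this example.
example_env = {
    "SQL_local_server": "10.0.0.110",
    "SQL_local_port": "1433",
    "SQL_local_database": "YourDatabase",
    "SQL_local_user": "your_user",
    "SQL_local_password": "your_password",
}
local = load_backend_config("local", example_env)
```

Failing fast on a missing field gives a clearer error than a connection timeout later on.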
## Step 3: Install
```bash
clawhub install sql-connector # dependency — install first
clawhub install sql-memory
```
## Quick Start
```python
from sql_memory import SQLMemory, get_memory
mem = get_memory('cloud') # or 'local'
# Store a memory
mem.remember('facts', 'user_timezone', 'User is in EST/EDT', importance=7, tags='user,prefs')
# Recall it
entry = mem.recall('facts', 'user_timezone')
print(entry) # → 'User is in EST/EDT'
# Search across all memories
results = mem.search_memories('timezone')
# Queue a task for an agent
task_id = mem.queue_task('my_agent', 'process_data', payload='{"source":"api"}', priority=3)
# Log an event
mem.log_event(event_type='task_started', agent='my_agent', description='Processing began')
# Todos
todo_id = mem.add_todo('Fix the login bug', priority=2, tags='bug,auth')
mem.complete_todo(todo_id)
# Connectivity check
mem.ping() # → True
```
## API Reference
### Memory
| Method | Description |
|---|---|
| `remember(category, key, content, importance=3, tags='')` | Store or update a memory entry |
| `recall(category, key)` | Retrieve most recent active entry |
| `search_memories(query, limit=10)` | Full-text search across all memories |
| `get_recent(category, limit=10)` | Most recent entries in a category |
| `forget(category, key)` | Mark entry as inactive |
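
The table implies three behaviors worth spelling out: `remember` updates an existing entry instead of duplicating it, `recall` only sees active entries, and `forget` is a soft delete. The toy class below mimics those semantics with a Python list. `InMemoryStore` is a hypothetical stand-in written for this example and says nothing about how `SQLMemory` is implemented.

```python
from datetime import datetime, timezone


class InMemoryStore:
    """Toy stand-in for SQLMemory that keeps rows in a list, not SQL Server."""

    def __init__(self):
        self.rows = []

    def remember(self, category, key, content, importance=3, tags=""):
        now = datetime.now(timezone.utc)
        for row in self.rows:
            if (row["category"], row["key"], row["status"]) == (category, key, "active"):
                # Update the existing active entry in place.
                row.update(content=content, importance=importance, tags=tags, updated_at=now)
                return row
        row = dict(category=category, key=key, content=content, importance=importance,
                   tags=tags, status="active", created_at=now, updated_at=now)
        self.rows.append(row)
        return row

    def recall(self, category, key):
        active = [r for r in self.rows
                  if (r["category"], r["key"], r["status"]) == (category, key, "active")]
        # Most recent active entry wins; None when nothing matches.
        return max(active, key=lambda r: r["updated_at"])["content"] if active else None

    def forget(self, category, key):
        # Soft delete: flip the status and keep the row for auditing.
        for row in self.rows:
            if (row["category"], row["key"]) == (category, key):
                row["status"] = "inactive"


mem = InMemoryStore()
mem.remember("facts", "user_timezone", "User is in EST", importance=7)
mem.remember("facts", "user_timezone", "User is in EST/EDT", importance=7)  # update
before = mem.recall("facts", "user_timezone")
mem.forget("facts", "user_timezone")
after = mem.recall("facts", "user_timezone")
```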
### Task Queue
| Method | Description |
|---|---|
| `queue_task(agent, task_type, payload='', priority=5, model_hint='')` | Add a task |
| `get_pending_tasks(agent=None, limit=10)` | Fetch pending tasks |
| `complete_task(task_id, result='')` | Mark task complete |
| `fail_task(task_id, error='')` | Mark task failed |
### Todos
| Method | Description |
|---|---|
| `add_todo(title, description='', priority=3, tags='')` | Create a todo |
| `complete_todo(todo_id)` | Mark complete |
| `update_todo(todo_id, **kwargs)` | Update fields |
| `delete_todo(todo_id)` | Hard delete |
### Activity Logging
| Method | Description |
|---|---|
| `log_event(event_type, agent='', description='', metadata='', importance=3)` | Write to ActivityLog |
## Memory Rollup Schedule
Hierarchical compression keeps long-term memory manageable:
```
Daily entries → rolled up weekly (every Sunday)
Weekly → monthly (1st of month)
Monthly → yearly (January 1st)
```
Each rollup preserves source references for traceability.
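
As an illustration of the schedule, the function below reports which rollups would fall due on a given calendar date. The name `rollups_due` is hypothetical, and the sketch only covers the date rules listed here, not the time of day or the rollup work itself.

```python
from datetime import date


def rollups_due(day: date) -> list:
    """Return which rollups the schedule above would trigger on `day`."""
    due = []
    if day.weekday() == 6:  # Monday is 0, so Sunday is 6
        due.append("weekly")
    if day.day == 1:
        due.append("monthly")
    if day.month == 1 and day.day == 1:
        due.append("yearly")
    return due
```

For example, `rollups_due(date(2026, 3, 1))` returns `['weekly', 'monthly']` because that date is both a Sunday and the first of the month.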
## Design Principles
- **UTC everywhere** — all timestamps use `GETUTCDATE()` in SQL, `datetime.now(timezone.utc)` in Python
- **Parameterized only** — no f-string SQL, ever; the connector layer enforces this
- **Importance scale** — 1–10: `3`=routine, `5`=significant, `7`=strategic, `10`=permanent facts
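
The first two principles can be demonstrated without a SQL Server instance. The sketch below uses Python's built-in `sqlite3` purely as a stand-in: it takes `?` placeholders, whereas the pymssql queries in `setup_schema.py` use `%s`. The table layout and the `mem_key` column name are simplifications for the example.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Memories (category TEXT, mem_key TEXT, content TEXT, created_at TEXT)"
)

user_input = "x'; DROP TABLE Memories; --"  # hostile value, stored as plain data
now_utc = datetime.now(timezone.utc).isoformat()  # timezone-aware UTC timestamp

# Values travel separately from the SQL text, so they are never parsed as SQL.
conn.execute(
    "INSERT INTO Memories (category, mem_key, content, created_at) VALUES (?, ?, ?, ?)",
    ("facts", "note", user_input, now_utc),
)
row = conn.execute(
    "SELECT content, created_at FROM Memories WHERE category = ? AND mem_key = ?",
    ("facts", "note"),
).fetchone()
```

Because the hostile string is bound as a parameter, it comes back unchanged in `row[0]` and the table survives.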
## Related
- [clawbot-sql-connector](https://github.com/VeXHarbinger/clawbot-sql-connector) — the transport layer this builds on
- [oblio-heart-and-soul](https://github.com/VeXHarbinger/oblio-heart-and-soul) — full reference implementation
## Community
Alpha software — your feedback shapes the v1 API. Open issues for broken installs, schema questions, or API suggestions.
## License
MIT
FILE:knowledge-base/INFRASTRUCTURE.md
# Infrastructure & Application Framework
_Oblio System Architecture — Documented 2026-03-09_
---
## Core Principles
### 1. Task Decomposition Pattern
**Never execute tasks blindly.** Understand the goal first, then break into recursive subtasks.
**Pattern:**
```
GOAL (macro understanding)
↓
SUBTASK 1 (macro+micro context) → Agent chooses decomposition level
SUBTASK 2 (macro+micro context) → Agent executes or breaks down further
SUBTASK 3 (macro+micro context) → Recursive until atomic
```
**Example:**
- GOAL: "Enable dashboard reporting"
- MACRO: Users need to see system state (reports, queue, logs)
- SUBTASKS:
  - Backend 1: Cache layer (SQL data + TTL)
  - Backend 2: REST endpoints (/api/report, /api/queue, /api/logs)
  - Frontend 1: Report view rendering
  - Frontend 2: Queue view rendering
  - Frontend 3: Logs view rendering
  - QA: Integration tests
**Benefits:**
- Agents see the big picture → smarter decisions
- Parallel execution → efficiency
- Recursive decomposition → handles complexity naturally
- No "what do I do next?" bottleneck
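
The decomposition pattern maps naturally onto a small tree structure. The sketch below is only an illustration: the `Task` dataclass and its `atomic_tasks` method are hypothetical names, and the grouping into "Backend" and "Frontend" nodes is added to show one extra level of recursion.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    name: str
    macro: str  # the goal-level context every subtask carries
    subtasks: list = field(default_factory=list)

    def atomic_tasks(self):
        """Recurse until tasks have no subtasks, then yield those leaves."""
        if not self.subtasks:
            yield self
            return
        for sub in self.subtasks:
            yield from sub.atomic_tasks()


macro = "Users need to see system state (reports, queue, logs)"
goal = Task("Enable dashboard reporting", macro, [
    Task("Backend", macro, [
        Task("Cache layer", macro),
        Task("REST endpoints", macro),
    ]),
    Task("Frontend", macro, [
        Task("Report view", macro),
        Task("Queue view", macro),
        Task("Logs view", macro),
    ]),
    Task("Integration tests", macro),
])
leaves = [t.name for t in goal.atomic_tasks()]
```

Every leaf keeps the same `macro` string, which is the point of the pattern: an agent working on "Cache layer" still knows why the cache exists.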
---
## Architecture Layers
### Layer 1: Data (SQL Memory)
**Primary Backend:** Cloud (SQL5112.site4now.net)
```
Database: db_99ba1f_memory4oblio
Schema: memory.*
```
**Tables:**
- `memory.Memories` — Facts, decisions, learnings (domain, key, content)
- `memory.Sessions` — Session snapshots for continuity
- `memory.TaskQueue` — Work queue (agent, task_type, status, priority)
- `memory.KnowledgeIndex` — Knowledge base (domain, topic, summary)
- `memory.ActivityLog` — Agent activity (agent, event_type, description)
**Module:** `infrastructure/sql_memory.py`
- Dual-backend support (local fallback, cloud primary)
- 15+ methods: remember, recall, queue_task, complete_task, log_event, etc.
- Automatic retry + graceful degradation
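
One way to picture "cloud primary, local fallback" with retries is the wrapper below. It is a sketch under assumptions, not the module's code: `call_with_fallback` is a hypothetical name, `ConnectionError` stands in for whatever the driver raises, and three attempts mirrors the retry default mentioned for agents.

```python
def call_with_fallback(primary, fallback, attempts=3):
    """Try `primary` up to `attempts` times, then degrade to `fallback`."""
    for _ in range(attempts):
        try:
            return primary()
        except ConnectionError:
            continue  # treat as transient and retry
    # Primary exhausted its retries: degrade to the fallback backend.
    return fallback()


calls = {"cloud": 0}


def cloud_query():
    calls["cloud"] += 1
    raise ConnectionError("cloud timeout")


def local_query():
    return "result from local"


result = call_with_fallback(cloud_query, local_query)
```

A production version would usually add a delay between attempts and log each failure rather than swallowing it.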
**Configuration:**
```python
# All agents default to cloud
db = SQLMemory('cloud')
```
---
### Layer 2: Agent Framework
**Base Class:** `infrastructure/agent_base.py` → `OblioAgent`
**Features:**
- SQL memory integration (auto-logging to ActivityLog)
- Model selection via model_router.py
- Structured logging (file + DB)
- Graceful error handling with retry (3 attempts default)
- Ollama inference (text, chat, vision, embeddings)
- Reporting framework (per-agent metrics)
**Standard Methods:**
```python
class MyAgent(OblioAgent):
    agent_name = "my_agent"
    task_types = ["my_task_type"]
    backend = "cloud"  # MUST be "cloud"

    def run_task(self, task: dict) -> str:
        # Run one task, return summary
        pass

# Usage
agent = MyAgent()
agent.run_once() # Process all pending tasks once
agent.run_loop(interval=60) # Loop continuously
```
**Logging:**
```python
# Automatic to logs/my_agent.log
self.log.info("message")
# Written to memory.ActivityLog via log_activity()
self.log_activity("event_type", "description", "metadata")
```
---
### Layer 3: Task Queue & Dispatch
**Queue System:** `infrastructure/sql_memory.py` + `agents/agent_dispatcher.py`
**Task Structure:**
```json
{
  "id": 47,
  "agent": "backend",
  "task_type": "dashboard_cache_layer",
  "payload": {
    "macro": "Agents need reliable, cached access to SQL data",
    "component": "util/dashboard_cache.py",
    "methods": ["get_latest_report", "get_queue_status", "get_recent_logs"],
    "ttl_seconds": 30,
    "tests_required": true
  },
  "priority": 9,
  "status": "pending",
  "created_at": "2026-03-09T06:40:00Z"
}
```
**Task Lifecycle:**
1. Queue → Pending
2. Agent claims → Processing
3. Agent executes → Completed (with result) OR Failed (with error)
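
The lifecycle above is a small state machine, sketched here in plain Python. The class and method names are hypothetical, and so is the retry rule (a failed task returns to `pending` until an assumed `max_retries` budget is spent); the real `fail_task` may behave differently.

```python
class TaskStateError(Exception):
    """Raised when a transition is attempted from the wrong state."""


class Task:
    def __init__(self, task_id, max_retries=3):
        self.id = task_id
        self.status = "pending"
        self.retries = 0
        self.max_retries = max_retries  # assumed limit, not from the source
        self.result = None
        self.error = ""

    def _require(self, expected):
        if self.status != expected:
            raise TaskStateError(f"task {self.id} is {self.status}, expected {expected}")

    def claim(self):
        self._require("pending")
        self.status = "processing"

    def complete(self, result=""):
        self._require("processing")
        self.status, self.result = "completed", result

    def fail(self, error=""):
        self._require("processing")
        self.retries += 1
        self.error = error
        # Requeue until the retry budget is spent, then fail for good.
        self.status = "pending" if self.retries < self.max_retries else "failed"


task = Task(47, max_retries=2)
task.claim()
task.fail("timeout")        # first failure: back to pending
task.claim()
task.fail("timeout again")  # second failure: budget spent
```

Guarding each transition with `_require` means a task cannot be completed twice or claimed while another agent is processing it.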
**Dispatcher:** Routes tasks by type to specialized agents or handles directly
- Handles: github_setup, github_clone, github_checkin, ui_fix, security_test
- Scheduled: Every 3 hours (cron: `0 */3 * * *`)
---
### Layer 4: AI Models
**Primary:** Ollama (local to DEAUS 10.0.0.110:11434)
**Available Models:**
- `gemma3:4b` — Text generation (default, free)
- `codellama:7b` — Code analysis + generation
- `mistral:7b` — General purpose LLM
- `llava` — Vision (images)
- `moondream` — Lightweight vision
- `nomic-embed-text` — Embeddings
**Model Router:** `infrastructure/model_router.py`
- Selects model by task_type + budget tier
- Routing logic: `select_model(task_type, budget, **kwargs)`
- Falls back to cloud APIs (with approval) if needed
**Usage:**
```python
# Name a model explicitly
result = self.ollama_generate(prompt="...", model="gemma3:4b")
# Or let the router choose
model = self.get_model(task_type="code_analysis", budget="free")
result = self.ollama_generate(prompt="...", model=model)
```
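
For a sense of what task-type routing might look like, here is a minimal sketch. Only the `select_model(task_type, budget, **kwargs)` signature comes from the notes above; the routing table, the task-type names, and the handling of non-free budgets are guesses made for illustration and do not describe `model_router.py`.

```python
# Hypothetical routing table built from the model list above.
FREE_MODELS = {
    "code_analysis": "codellama:7b",
    "vision": "llava",
    "embedding": "nomic-embed-text",
}
DEFAULT_MODEL = "gemma3:4b"


def select_model(task_type, budget="free", **kwargs):
    """Pick a local model for `task_type`; an explicit `model` kwarg wins."""
    if "model" in kwargs:
        return kwargs["model"]
    if budget != "free":
        # Cloud fallback needs approval per the notes above; not modeled here.
        raise NotImplementedError("paid tiers are outside this sketch")
    return FREE_MODELS.get(task_type, DEFAULT_MODEL)
```

Unknown task types fall through to the default text model, so adding a new agent does not require touching the table first.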
---
### Layer 5: Specialized Agents
**Current Agents:**
| Agent | Purpose | Cron | Type |
|-------|---------|------|------|
| `agent_stamps.py` | Stamp identification + cataloging | 0 */2 * * * | Background |
| `agent_facs.py` | FACS micro-expression training | 0 2 * * * | Background |
| `agent_nlp.py` | NLP document processing | 30 2 * * * | Background |
| `agent_security.py` | Security audits (11 checks) | 30 3 * * * | Background |
| `agent_dispatcher.py` | General task router | 0 */3 * * * | Background |
| `agent_idle.py` | Background task scheduling | */15 * * * * | Background |
| `agent_reporter.py` | Daily reports (4:20 AM/PM) | 20 4,16 * * * | Reporting |
---
## Development Standards
### Solid Design Paradigm
**Requirements:**
1. **Every function MUST have a unit test**
   - Write the test first (TDD)
   - Test covers happy path + error cases
   - Never ship untested code
2. **No silent failures**
   - All errors logged with severity (INFO, WARN, ERROR, CRITICAL)
   - Errors surface in dashboard alerts
   - Retry logic for transient failures
3. **SOLID Principles**
   - Single Responsibility: One job per class/function
   - Open/Closed: Open for extension, closed for modification
   - Liskov Substitution: Agents interchangeable
   - Interface Segregation: Minimal required interfaces
   - Dependency Inversion: Depend on abstractions (sql_memory), not concrete implementations
4. **DRY (Don't Repeat Yourself)**
   - Shared code → `infrastructure/` modules
   - Agents inherit from OblioAgent base class
   - Configuration in `.env` (not hardcoded)
5. **Composition over Inheritance**
   - Agents compose behaviors (memory, models, logging)
   - Not deep inheritance hierarchies
   - Mixins for orthogonal concerns
### GitHub Workflow
**Repositories:**
- `VeXHarbinger/AI-UI` — Dashboard application
- `VeXHarbinger/sequel-memory-skill` — SQL memory skill
- `VeXHarbinger/clawbot-sql-memory` — Memory infrastructure
- `Oblio-Falootin/*` — Infrastructure/tooling (not for sharing)
**Branch Strategy:**
```
main (stable, tagged releases)
↑
development (next release, PR-based)
↑
feature/* (individual features, merged via PR)
```
**PR Requirements:**
- [ ] Tests pass (100% of changed code)
- [ ] Code review approved
- [ ] CI pipeline green
- [ ] Documentation updated
**Initial Checkin Checklist:**
- [ ] README.md (what, how, why)
- [ ] ARCHITECTURE.md (design decisions, module breakdown)
- [ ] tests/README.md (how to run tests)
- [ ] tests/test_*.py (unit tests for all functions)
- [ ] .gitignore (Python: .pyc, __pycache__, .env, venv/)
- [ ] .github/workflows/test.yml (CI: run pytest on push)
---
## Cron Schedule
**Execution Times:**
```
*/5 min → inbox_monitor (file watcher keepalive)
*/15 min → agent_idle (background scheduling when CPU < 15%)
*/30 min → task-monitor.sh (check for stuck tasks)
0 1 * * * → agent_github (GitHub monitoring)
15 1 * * * → agent_git_sync (repo sync)
0 2 * * * → agent_facs (FACS training)
30 2 * * * → agent_nlp (NLP training)
0 3 * * * → agent_lightsound (background task)
30 3 * * * → agent_security (security audit)
0 3,15 * * * → db_backup (SQL backup + rotate)
0 */2 * * * → agent_stamps (stamp processing)
20 4,16 * * * → agent_reporter (daily report)
0 */3 * * * → agent_dispatcher (task dispatch)
```
---
## Testing Framework
**Structure:**
```
tests/
__init__.py
conftest.py # Pytest fixtures (mock DB, etc.)
test_dispatcher.py # Task dispatcher tests
test_agent_base.py # Base agent tests
test_sql_memory.py # Memory module tests
test_model_router.py # Model selection tests
test_ui_endpoints.py # Dashboard API tests
test_dashboard_*.py # UI component tests
```
**Requirements:**
- **Coverage:** ≥ 80% of production code
- **Test types:**
  - Unit: Test functions in isolation (mock dependencies)
  - Integration: Test components together
  - End-to-end: Test full workflows
- **Naming:** `test_<function>_<scenario>.py`
- **Assertions:** Clear, specific (not just `assertTrue`)
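
A unit test that follows these rules might look like the sketch below. The helper `count_pending` is hypothetical and exists only to give the test something to exercise; the `db.query(sql, params)` call shape matches how `setup_schema.py` uses the connector, while the tuple-per-row result format is an assumption.

```python
import unittest
from unittest.mock import MagicMock


def count_pending(db):
    """Return how many tasks are pending (hypothetical helper under test)."""
    rows = db.query(
        "SELECT COUNT(*) FROM memory.TaskQueue WHERE status = %s", ("pending",)
    )
    return rows[0][0] if rows else 0


class CountPendingTests(unittest.TestCase):
    def test_count_pending_returns_count(self):
        db = MagicMock()  # mock connector: no database is touched
        db.query.return_value = [(4,)]
        self.assertEqual(count_pending(db), 4)
        db.query.assert_called_once()

    def test_count_pending_empty_result(self):
        db = MagicMock()
        db.query.return_value = []
        self.assertEqual(count_pending(db), 0)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(CountPendingTests)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Both tests use `assertEqual` on a concrete value rather than a bare `assertTrue`, and the error-path case (an empty result) is covered alongside the happy path.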
**Run Tests:**
```bash
python -m unittest discover tests/ -v
# or
python -m pytest tests/ -v --cov=.
```
---
## Quality Gates
**Before merging to main:**
1. [ ] All tests pass (100% of changed code)
2. [ ] No new warnings/errors in logs
3. [ ] Code review approved (peer or lead)
4. [ ] Documentation updated
5. [ ] Commit messages clear + descriptive
**Before deploying:**
1. [ ] Tag release (v0.1.0, v1.2.3, etc.)
2. [ ] Update CHANGELOG.md
3. [ ] CI pipeline green
4. [ ] Manual testing in staging
5. [ ] Rollback plan documented
---
## Monitoring & Debugging
**Logs Location:** `/home/oblio/.openclaw/workspace/logs/`
- `agent_*.log` — Per-agent logs
- `sql_memory.log` — SQL query log
- `dispatcher.log` — Task dispatcher
- `report-*.md` — Daily reports
**Memory Location:** SQL `memory.*` tables
- Query agent activity: `SELECT * FROM memory.ActivityLog ORDER BY created_at DESC`
- Query task queue: `SELECT * FROM memory.TaskQueue WHERE status='pending'`
- Query memories: `SELECT * FROM memory.Memories WHERE category='....'`
**Dashboard:** http://localhost:3000
- Report: Latest 4:20 AM/PM summary
- Queue: Pending + in-progress tasks
- Logs: Recent agent activity
---
## Deployment Checklist
**First Time Setup:**
```bash
# 1. Clone repo
git clone https://github.com/VeXHarbinger/clawbot-sql-memory.git
# 2. Install dependencies
pip install -r requirements.txt
# 3. Configure .env
cp .env.example .env
# Edit .env with cloud DB credentials
# 4. Run tests
python -m unittest discover tests/ -v
# 5. Start agents
crontab -e # Add cron jobs
# or manually: python agents/agent_dispatcher.py
```
**Verification:**
```bash
# Check cloud DB connection
python -c "from sql_memory import SQLMemory; db = SQLMemory('cloud'); print(db.execute('SELECT COUNT(*) FROM memory.TaskQueue'))"
# Start UI
cd ui && npm start
# Monitor logs
tail -f logs/*.log
```
---
## Troubleshooting
**Problem:** "No pending tasks"
- Check: `SELECT * FROM memory.TaskQueue WHERE status='pending'`
- Check agent filter: `agent='dispatcher'` and task_type matches
**Problem:** "Cloud DB connection timeout"
- Check `.env` credentials
- Check firewall (site4now accessible from your network)
- Try local backend as fallback
**Problem:** "Ollama not responding"
- Check DEAUS is reachable: `curl http://10.0.0.110:11434`
- Check model available: `ollama list`
- Pull model: `ollama pull gemma3:4b`
**Problem:** "Tests failing with SQL errors"
- Mock DB in tests (don't hit cloud during tests)
- Use `infrastructure/agent_base.py` test fixtures
- Check `.env` not loaded in test environment
---
## Next Steps & Continuous Improvement
1. **Monitor queue** — Periodically check pending tasks
2. **Review logs** — Look for WARN/ERROR patterns
3. **Optimize bottlenecks** — Profile slow tasks
4. **Add tests** — If test coverage < 80%, add tests
5. **Refactor** — Clean up tech debt before new features
6. **Document decisions** — Update ARCHITECTURE.md as design evolves
---
_Last updated: 2026-03-09 | Framework version: 1.0_
FILE:knowledge-base/database_structure.md
# Database Structure Documentation
_Last Updated: 2026-03-10_
The database forms the backbone for system memory, task delegation, and knowledge retention. Here's an overview of its schema and usage principles:
## Core Principles
1. **Persistence:** All significant memories and logs are stored in the database to ensure they persist across sessions.
2. **Scalability:** Modular schemas support scaling for new agents and workflows.
3. **Maintainability:** Data is well-documented for ease of use and debugging.
4. **Longevity:** The database is designed to last "forever" as Oblio evolves.
## Schemas & Tables
### `memory` Schema
Holds all data related to knowledge retention and agent operations.
| **Table** | **Purpose** |
|----------------------|-----------------------------------------------------------------------------|
| `ActivityLog` | Tracks agent activities, including logs of completed tasks and operations. |
| `AgentState` | Stores metadata about active agents, their readiness, and current status. |
| `ContextSnapshot` | Holds context dumps for session continuity or debugging. |
| `DecisionLog` | Records decisions made by agents and their justification. |
| `KnowledgeIndex` | Index of key knowledge and their associated domains. |
| `Memories` | Central memory store for facts, patterns, and persistent insights. |
| `PersonaLog` | Tracks persona evolution, including changes in Oblio's personality. |
| `Sessions` | Metadata about chat and interaction history. |
| `SessionLog` | Detailed logs of session-specific activity. |
| `TaskQueue` | Contains queued, pending, processing, and completed tasks. |
### Usage Guidelines
1. Store **everything** meaningful—memories, tasks, logs—to ensure Oblio's long-term growth and reliability.
2. Never overwrite existing data unless explicitly stated (e.g., updates to task statuses).
3. Regularly audit schemas for optimization (e.g., indexes, stored procedures).
### Future Directions
- Add detailed stored procedures for transaction standardization (reduce ad-hoc queries).
- Integrate encrypted storage for sensitive data.
- Explore replication for redundancy and high availability.
FILE:setup_schema.py
"""
setup_schema.py — Create the memory.* schema for clawbot-sql-memory
Run once before first use:
python3 setup_schema.py # uses 'local' profile
python3 setup_schema.py --cloud # uses 'cloud' profile
Requires .env with SQL credentials. See README.md for format.
"""
import argparse
import sys
import os
sys.path.insert(0, os.path.dirname(__file__))
try:
    from sql_connector import get_connector
except ImportError:
    print("ERROR: sql-connector not found. Install it first:")
    print(" clawhub install sql-connector")
    sys.exit(1)
SCHEMA_SQL = [
("CREATE SCHEMA memory", "memory schema"),
("""
CREATE TABLE memory.Memories (
id INT IDENTITY(1,1) PRIMARY KEY,
category NVARCHAR(100) NOT NULL,
[key] NVARCHAR(255) NOT NULL,
content NVARCHAR(MAX) NOT NULL,
importance INT DEFAULT 3,
tags NVARCHAR(500) DEFAULT '',
status NVARCHAR(50) DEFAULT 'active',
created_at DATETIME2 DEFAULT GETUTCDATE(),
updated_at DATETIME2 DEFAULT GETUTCDATE()
)
""", "memory.Memories"),
("""
CREATE TABLE memory.TaskQueue (
id INT IDENTITY(1,1) PRIMARY KEY,
agent NVARCHAR(100) NOT NULL,
task_type NVARCHAR(100) NOT NULL,
payload NVARCHAR(MAX) DEFAULT '',
priority INT DEFAULT 5,
status NVARCHAR(50) DEFAULT 'pending',
retries INT DEFAULT 0,
model_hint NVARCHAR(100) DEFAULT '',
created_at DATETIME2 DEFAULT GETUTCDATE(),
updated_at DATETIME2 DEFAULT GETUTCDATE(),
claimed_at DATETIME2 NULL,
completed_at DATETIME2 NULL,
error NVARCHAR(MAX) DEFAULT ''
)
""", "memory.TaskQueue"),
("""
CREATE TABLE memory.ActivityLog (
id INT IDENTITY(1,1) PRIMARY KEY,
event_type NVARCHAR(100) NOT NULL,
agent NVARCHAR(100) DEFAULT '',
description NVARCHAR(MAX) DEFAULT '',
metadata NVARCHAR(MAX) DEFAULT '',
importance INT DEFAULT 3,
created_at DATETIME2 DEFAULT GETUTCDATE()
)
""", "memory.ActivityLog"),
("""
CREATE TABLE memory.Sessions (
id INT IDENTITY(1,1) PRIMARY KEY,
session_key NVARCHAR(255) NOT NULL,
agent NVARCHAR(100) DEFAULT '',
status NVARCHAR(50) DEFAULT 'active',
metadata NVARCHAR(MAX) DEFAULT '',
started_at DATETIME2 DEFAULT GETUTCDATE(),
ended_at DATETIME2 NULL
)
""", "memory.Sessions"),
("""
CREATE TABLE memory.KnowledgeIndex (
id INT IDENTITY(1,1) PRIMARY KEY,
domain NVARCHAR(100) NOT NULL,
[key] NVARCHAR(255) NOT NULL,
content NVARCHAR(MAX) NOT NULL,
source NVARCHAR(255) DEFAULT '',
tags NVARCHAR(500) DEFAULT '',
created_at DATETIME2 DEFAULT GETUTCDATE()
)
""", "memory.KnowledgeIndex"),
("""
CREATE TABLE memory.Todos (
id INT IDENTITY(1,1) PRIMARY KEY,
title NVARCHAR(500) NOT NULL,
description NVARCHAR(MAX) DEFAULT '',
priority INT DEFAULT 3,
status NVARCHAR(50) DEFAULT 'open',
tags NVARCHAR(500) DEFAULT '',
created_at DATETIME2 DEFAULT GETUTCDATE(),
updated_at DATETIME2 DEFAULT GETUTCDATE(),
closed_at DATETIME2 NULL
)
""", "memory.Todos"),
]
def table_exists(db, table_name):
    """Return True if 'schema.table' already exists in the database."""
    schema, name = table_name.split('.')
    rows = db.query(
        "SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s",
        (schema, name)
    )
    return len(rows) > 0


def schema_exists(db, schema_name):
    """Return True if the named schema already exists."""
    rows = db.query(
        "SELECT 1 FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = %s",
        (schema_name,)
    )
    return len(rows) > 0


def run(profile='local'):
    """Create the memory schema and tables, skipping any that already exist."""
    print(f"\nclawbot-sql-memory schema setup - profile: {profile}\n")
    db = get_connector(profile)
    if not db.ping():
        print("ERROR: Cannot connect to SQL Server. Check your .env credentials.")
        sys.exit(1)
    print("✅ Connected to SQL Server\n")
    for sql, label in SCHEMA_SQL:
        # The first entry creates the schema; every other label is a table name.
        if label == "memory schema":
            if schema_exists(db, 'memory'):
                print(f" SKIP {label} (already exists)")
                continue
        else:
            if table_exists(db, label):
                print(f" SKIP {label} (already exists)")
                continue
        ok = db.execute(sql.strip())
        if ok:
            print(f" CREATE {label}")
        else:
            print(f" ERROR {label} - check logs")
    print("\nSchema setup complete.\n")
    print("Next step: configure your .env and run:")
    print(" python3 -c \"from sql_memory import get_memory; print(get_memory('local').ping())\"")


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Create memory.* schema for clawbot-sql-memory')
    parser.add_argument('--cloud', action='store_true', help='Use cloud SQL profile')
    args = parser.parse_args()
    run('cloud' if args.cloud else 'local')
FILE:sql-memory/SKILL.md
---
name: sql-memory
version: 2.1.0-alpha
status: alpha
description: "Semantic memory layer for OpenClaw agents. Use when: (1) persisting agent memories with importance scoring, (2) hierarchical memory rollups (daily→weekly→monthly→yearly), (3) queuing tasks for agents, (4) logging activity and audit trails, (5) managing knowledge bases with semantic search. Provides remember/recall/search/queue_task/log_event/add_todo APIs. Built on sql-connector. Requires SQL Server schema setup — see README. ALPHA: use at your own risk, API may change."
---
# SQL Memory Skill
> Semantic memory layer for OpenClaw agents
## Overview
Provides agent-friendly memory operations: remember, recall, search, forget, plus task queue management, knowledge indexing, activity logging, and hierarchical memory rollups. All operations go through the SQL Connector skill for reliable, parameterized SQL execution.
See `sql_memory.py` for full implementation.
## Dependencies
- **sql-connector** — provides the underlying database connection and query execution
## Quick Start
```python
from sql_memory import SQLMemory, get_memory
mem = get_memory('cloud')
# Remember something
mem.remember('facts', 'vex_timezone', 'VeX is in EST/EDT timezone', importance=7)
# Recall it
entry = mem.recall('facts', 'vex_timezone')
# Search across all memories
results = mem.search_memories('timezone')
# Queue a task
mem.queue_task('nlp_agent', 'analyze_document', '{"doc": "..."}', priority=3)
# Log an event
mem.log_event('training_complete', 'nlp_agent', 'Finished training cycle 42')
# Store knowledge
mem.store_knowledge('stamps', 'inverted_jenny', 'Rare 1918 misprint...', 'catalog')
```
## Schema
All tables live in the `memory` schema (SQL Server database):
| Table | Purpose |
|-------|---------|
| `memory.Memories` | Long-term curated memories with importance scoring |
| `memory.TaskQueue` | Task queue for agent work items |
| `memory.ActivityLog` | Event/activity logging for audit trail |
| `memory.KnowledgeIndex` | Domain-specific knowledge store |
| `memory.Sessions` | Session tracking for agents |
## Memory Rollups
Hierarchical consolidation keeps memories fresh and relevant:
```
Daily memories → Weekly rollup (Sundays 3AM)
Weekly rollups → Monthly rollup (1st of month)
Monthly → Quarterly (Jan/Apr/Jul/Oct)
Quarterly → Yearly (Jan 1st)
```
Each rollup:
1. Summarizes source entries
2. Creates a consolidated entry with back-references
3. Reduces importance of source entries
4. Tags sources as `rolled_up`
### Importance Scale
| Level | Meaning | Example |
|-------|---------|---------|
| 1-2 | Ephemeral, archive | Old workspace file |
| 3-4 | Context, nice-to-know | Debug notes |
| 5-6 | Standard operational | Task completion |
| 7-8 | Important milestone | Architecture decision |
| 9 | Critical | System design choice |
| 10 | Permanent | Core identity/values |
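As a small sketch of how the scale could be applied in code, the helper below maps a score to the meaning listed in the table. The function name `importance_label` is hypothetical and is not part of `sql_memory.py`.

```python
def importance_label(level: int) -> str:
    """Map a 1-10 importance score to the meaning from the table above."""
    if not 1 <= level <= 10:
        raise ValueError(f"importance must be between 1 and 10, got {level}")
    # (upper bound of band, meaning), in ascending order
    bands = [
        (2, "Ephemeral, archive"),
        (4, "Context, nice-to-know"),
        (6, "Standard operational"),
        (8, "Important milestone"),
        (9, "Critical"),
        (10, "Permanent"),
    ]
    for upper, label in bands:
        if level <= upper:
            return label
    raise AssertionError("unreachable: level already validated")


print(importance_label(7))
```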
## API Reference
### Memory Operations
| Method | Description | Example |
|--------|-------------|---------|
| `remember(cat, key, content, importance, tags)` | Store or update a memory (upsert on category + key) | `mem.remember('facts', 'name', 'Oblio', 7)` |
| `recall(cat, key)` | Retrieve a memory's content, or `None` if absent | `mem.recall('facts', 'name')` |
| `search_memories(keyword, limit)` | Keyword search (SQL `LIKE`) over content, tags, and key | `mem.search_memories('timezone', limit=5)` |
| `forget(cat, key)` | Soft-delete a memory (sets `is_active=0`) | `mem.forget('facts', 'name')` |
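In `sql_memory.py`, `remember` upserts on category and key, `forget` only clears `is_active`, and `search_memories` matches with `LIKE` and orders by importance. The sketch below reproduces those semantics against an in-memory SQLite database so they can be tried without a SQL Server instance. `MiniMemory` is a hypothetical stand-in: the real skill issues a T-SQL `MERGE` through sql-connector, and SQLite's `LIKE` behaviour may differ from your server's collation.

```python
import sqlite3
from typing import List, Optional


class MiniMemory:
    """Hypothetical SQLite stand-in that mimics the documented semantics."""

    def __init__(self) -> None:
        self.db = sqlite3.connect(":memory:")
        self.db.execute(
            "CREATE TABLE memories ("
            " category TEXT, key_name TEXT, content TEXT,"
            " importance INTEGER, tags TEXT, is_active INTEGER DEFAULT 1)"
        )

    def remember(self, category: str, key: str, content: str,
                 importance: int = 3, tags: str = "") -> None:
        # Upsert: update the active row for (category, key), else insert one.
        cur = self.db.execute(
            "UPDATE memories SET content=?, importance=?, tags=?"
            " WHERE category=? AND key_name=? AND is_active=1",
            (content, importance, tags, category, key),
        )
        if cur.rowcount == 0:
            self.db.execute(
                "INSERT INTO memories"
                " (category, key_name, content, importance, tags)"
                " VALUES (?, ?, ?, ?, ?)",
                (category, key, content, importance, tags),
            )

    def recall(self, category: str, key: str) -> Optional[str]:
        row = self.db.execute(
            "SELECT content FROM memories"
            " WHERE category=? AND key_name=? AND is_active=1",
            (category, key),
        ).fetchone()
        return row[0] if row else None

    def search_memories(self, keyword: str, limit: int = 20) -> List[str]:
        like = f"%{keyword}%"
        rows = self.db.execute(
            "SELECT key_name FROM memories WHERE is_active=1"
            " AND (content LIKE ? OR tags LIKE ? OR key_name LIKE ?)"
            " ORDER BY importance DESC LIMIT ?",
            (like, like, like, limit),
        ).fetchall()
        return [r[0] for r in rows]

    def forget(self, category: str, key: str) -> None:
        # Soft delete: the row stays in the table but is no longer returned.
        self.db.execute(
            "UPDATE memories SET is_active=0 WHERE category=? AND key_name=?",
            (category, key),
        )


mem = MiniMemory()
mem.remember("facts", "vex_timezone", "VeX is in EST/EDT timezone", importance=7)
mem.remember("facts", "vex_timezone", "VeX is in US Eastern time", importance=7)
print(mem.recall("facts", "vex_timezone"))   # second call replaced the first
print(mem.search_memories("timezone"))       # matches on key_name
mem.forget("facts", "vex_timezone")
print(mem.recall("facts", "vex_timezone"))   # soft-deleted rows are hidden
```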
### Task Queue
| Method | Description |
|--------|-------------|
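| `queue_task(agent, type, payload, priority)` | Add a pending task and return its id; `priority` is an int or a name (`critical`, `high`, `medium`, `low`, `free`) |
| `claim_task(id)` | Mark task as processing |
| `complete_task(id, result)` | Mark task as completed |
| `fail_task(id, error, retries, max)` | Re-queue as `pending` while `retries < max`, otherwise mark `failed` |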
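Two small rules in `sql_memory.py` govern the queue: priority names are translated to numbers (lower runs first), and a failed task returns to `pending` until its retry count reaches the limit. The sketch below isolates both rules as pure functions. The mapping and the comparison are taken from the source, while the function names `normalize_priority` and `status_after_failure` are hypothetical.

```python
from typing import Union

# Priority names accepted by queue_task(); lower numbers are more urgent.
PRIORITY_NAMES = {"critical": 1, "high": 2, "medium": 5, "low": 7, "free": 9}


def normalize_priority(priority: Union[int, str]) -> int:
    """Convert a priority name or number to an int; unknown names map to 5."""
    if isinstance(priority, str):
        return PRIORITY_NAMES.get(priority.lower(), 5)
    return int(priority)


def status_after_failure(retry_count: int, max_retries: int = 3) -> str:
    """Return the status fail_task() assigns for a given retry count."""
    return "pending" if retry_count < max_retries else "failed"


print(normalize_priority("High"))
for attempt in range(4):
    print(attempt, status_after_failure(attempt))
```

Note that `fail_task` relies on the caller to pass the task's current retry count; the database only increments the stored counter.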
### Activity Logging
| Method | Description |
|--------|-------------|
| `log_event(type, agent, detail, extra)` | Log an activity |
| `get_recent_activity(hours, agent)` | Query recent events |
## Configuration
Uses the same environment variables as sql-connector:
```
SQL_CLOUD_SERVER=sql5112.site4now.net
SQL_CLOUD_DATABASE=db_99ba1f_memory4oblio
SQL_CLOUD_USER=...
SQL_CLOUD_PASSWORD=...
SQL_LOCAL_SERVER=10.0.0.110
SQL_LOCAL_DATABASE=Oblio_Memories
SQL_LOCAL_USER=sa
SQL_LOCAL_PASSWORD=...
```
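The variables follow a `SQL_<BACKEND>_<FIELD>` pattern, so a backend name such as `cloud` or `local` selects one group. As a rough sketch of that lookup, the helper below collects one group into a dict and reports anything missing. How sql-connector actually reads these values is not shown here, so `load_backend_config` and its return shape are assumptions for illustration only.

```python
import os
from typing import Dict, Mapping, Optional


def load_backend_config(backend: str,
                        env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect SQL_<BACKEND>_* settings into a dict; raise if any is missing."""
    env = os.environ if env is None else env
    prefix = f"SQL_{backend.upper()}_"
    fields = ("SERVER", "DATABASE", "USER", "PASSWORD")
    missing = [prefix + name for name in fields if not env.get(prefix + name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {name.lower(): env[prefix + name] for name in fields}


# Example values only; pass nothing to read from the real environment.
example_env = {
    "SQL_LOCAL_SERVER": "10.0.0.110",
    "SQL_LOCAL_DATABASE": "Oblio_Memories",
    "SQL_LOCAL_USER": "sa",
    "SQL_LOCAL_PASSWORD": "example-only",
}
cfg = load_backend_config("local", example_env)
print(cfg["server"], cfg["database"])
```

Failing early with the list of missing names tends to be easier to debug than a driver-level connection error.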
## Architecture
```
┌──────────────────┐
│ Agents │ ← OblioAgent subclasses
├──────────────────┤
│ SQLMemory │ ← Semantic operations (remember/recall/queue/log)
├──────────────────┤
│ SQLConnector │ ← Generic SQL execution (retry, parameterized, logging)
├──────────────────┤
│ pymssql (TDS) │ ← Native SQL Server driver
└──────────────────┘
```
## License
MIT
FILE:sql-memory/sql_memory.py
#!/usr/bin/env python3
"""
sql_memory.py — Oblio SQL Memory Module (v2.0)
===============================================
Semantic memory layer for OpenClaw agents. All operations go through
SQLConnector v2 (pymssql, parameterised, sealed API) — no subprocess/sqlcmd.
Supports two backends:
- 'cloud' → site4now hosted (SQL5112.site4now.net) — default
- 'local' → SQL Server on DEAUS (10.0.0.110)
Backward-compatible with v1.x callers:
- SQLMemory('cloud') — works as before
- get_memory('cloud') — singleton factory preserved
- mem.remember / recall / search / queue_task / log_event — all preserved
- mem.execute(raw_sql) — preserved as legacy passthrough (returns bool)
- mem.execute_scalar(sql) — preserved, returns Any
New in v2.0:
- Transport: pymssql native driver (no sqlcmd subprocess)
- UTC timestamps everywhere (datetime.now(timezone.utc))
- Parameterised queries throughout — no string interpolation
- execute_via_file() preserved; it now delegates to execute(), since the file-based workaround is no longer needed
- _parse_table() kept for any callers using raw output parsing (no-op path)
Usage:
from sql_memory import SQLMemory, get_memory
mem = get_memory('cloud')
mem.remember('facts', 'sky_color', 'The sky is blue', importance=3)
result = mem.recall('facts', 'sky_color')
"""
import os
import sys
import json
import logging
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any
# ── Find and load .env ────────────────────────────────────────────────────────
import pathlib as _pathlib
def _find_env() -> Optional[str]:
p = _pathlib.Path(os.path.abspath(__file__)).parent
for _ in range(5):
c = p / '.env'
if c.exists():
return str(c)
p = p.parent
return None
try:
from dotenv import load_dotenv
_env = _find_env()
if _env:
load_dotenv(_env, override=True)
except ImportError:
_env = _find_env()
if _env:
with open(_env) as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
k, v = line.split('=', 1)
os.environ[k.strip()] = v.strip().strip('"').strip("'")
# ── Import connector (handles both skill install path and infrastructure path) ─
def _import_connector():
"""Find and import SQLConnector from wherever it's installed."""
# Try adjacent skill install first (workspace/skills/sql-connector/scripts/)
skill_scripts_path = os.path.join(os.path.dirname(__file__), '..', '..', 'sql-connector', 'scripts')
skill_path = os.path.join(os.path.dirname(__file__), '..', 'sql-connector')
infra_path = os.path.join(os.path.dirname(__file__), '..', 'infrastructure')
for p in [skill_scripts_path, skill_path, infra_path, os.path.dirname(__file__)]:
abs_p = os.path.abspath(p)
if os.path.exists(os.path.join(abs_p, 'sql_connector.py')):
if abs_p not in sys.path:
sys.path.insert(0, abs_p)
from sql_connector import get_connector, SQLConnector
return get_connector, SQLConnector
raise ImportError("sql_connector.py not found. Install the sql-connector skill first.")
get_connector, SQLConnector = _import_connector()
# ── Logger ────────────────────────────────────────────────────────────────────
LOG_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'logs')
os.makedirs(LOG_DIR, exist_ok=True)
_log = logging.getLogger('sql_memory')
if not _log.handlers:
_log.setLevel(logging.INFO)
_fh = logging.FileHandler(os.path.join(LOG_DIR, 'sql_dbo.log'))
_fh.setFormatter(logging.Formatter('%(asctime)s [sql_memory] %(levelname)s %(message)s'))
_fh.formatter.converter = __import__('time').gmtime # UTC timestamps in log file
_log.addHandler(_fh)
# ── SQLMemory ─────────────────────────────────────────────────────────────────
class SQLMemory:
"""
Unified SQL memory interface for Oblio agents.
Wraps SQLConnector — all queries are parameterised, no string interpolation.
Args:
backend: 'cloud' (default) or 'local'
"""
def __init__(self, backend: str = 'cloud') -> None:
self.backend = backend
self._db = get_connector(backend)
_log.info(f"SQLMemory v2.0 initialized (backend={backend})")
# ── Low-level passthrough (v1.x compatibility) ────────────────────────────
def execute(self, query: str, timeout: int = 30) -> bool:
"""
Legacy passthrough for raw SQL. Returns bool (v2) instead of stdout string (v1).
Callers that checked 'error' in result.lower() should switch to checking False.
NOTE: Raw SQL here bypasses parameterisation — migrate to mem.* methods for new code.
"""
return self._db.execute(query)
def execute_scalar(self, query: str) -> Optional[Any]:
"""Execute a raw query via scalar() and return the first value. No parameters are bound here, so pass only trusted SQL."""
return self._db.scalar(query)
def execute_via_file(self, query: str, timeout: int = 30) -> bool:
"""v1.x large-payload workaround — now just delegates to execute() since pymssql has no arg-length limit."""
return self._db.execute(query)
def execute_rows(self, query: str) -> List[str]:
"""Execute query and return rows as list of strings (v1.x compat)."""
rows = self._db.query(query)
return [str(list(r.values())) for r in rows]
def ping(self) -> bool:
return self._db.ping()
# ── Memory Operations ─────────────────────────────────────────────────────
def remember(self, category: str, key: str, content: str,
importance: int = 3, tags: str = '') -> bool:
"""
Store or update a memory. Upserts by category + key_name.
importance: 1-10 (10 = permanent)
"""
now = datetime.now(timezone.utc)
ok = self._db.execute("""
MERGE memory.Memories AS target
USING (SELECT %s AS category, %s AS key_name) AS source
ON target.category = source.category
AND target.key_name = source.key_name
AND target.is_active = 1
WHEN MATCHED THEN
UPDATE SET content=%s, importance=%s, tags=%s, updated_at=%s
WHEN NOT MATCHED THEN
INSERT (category, key_name, content, importance, tags, source, is_active, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, 'sql_memory', 1, %s, %s);
""", (category, key, content, importance, tags, now,
category, key, content, importance, tags, now, now))
_log.info(f"remember({category}/{key}) → {'ok' if ok else 'failed'}")
return ok
def recall(self, category: str, key: str) -> Optional[str]:
"""Retrieve a specific memory's content. Returns content string or None."""
rows = self._db.query(
"SELECT content FROM memory.Memories WHERE category=%s AND key_name=%s AND is_active=1",
(category, key)
)
return rows[0]['content'] if rows else None
def recall_recent(self, n: int = 10) -> List[Dict[str, Any]]:
"""Return the N most recently updated memories across all categories."""
return self._db.query("""
SELECT TOP (%s) category, key_name, content, importance, tags,
CONVERT(varchar, ISNULL(updated_at, created_at), 120) AS ts
FROM memory.Memories WHERE is_active=1
ORDER BY ISNULL(updated_at, created_at) DESC
""", (n,))
def search_memories(self, keyword: str, limit: int = 20) -> List[Dict[str, Any]]:
"""Substring (LIKE) search across content, tags, and key_name. Does not use SQL Server full-text indexing."""
like = f'%{keyword}%'
return self._db.query("""
SELECT TOP (%s) category, key_name, content, importance, tags
FROM memory.Memories
WHERE is_active=1
AND (content LIKE %s OR tags LIKE %s OR key_name LIKE %s)
ORDER BY importance DESC, ISNULL(updated_at, created_at) DESC
""", (limit, like, like, like))
def forget(self, category: str, key: str) -> bool:
"""Soft-delete a memory (set is_active=0)."""
ok = self._db.execute(
"UPDATE memory.Memories SET is_active=0 WHERE category=%s AND key_name=%s",
(category, key)
)
_log.info(f"forget({category}/{key})")
return ok
# ── Activity Log ──────────────────────────────────────────────────────────
def log_event(self, event_type: str, agent: str, description: str,
metadata: str = '', importance: int = 3) -> bool:
"""Write an event to the activity log (logged_at set by DB default)."""
return self._db.execute("""
INSERT INTO memory.ActivityLog (event_type, agent, description, metadata, importance)
VALUES (%s, %s, %s, %s, %s)
""", (event_type, agent, description, metadata, importance))
def get_recent_activity(self, since_hours: int = 24,
agent: Optional[str] = None) -> List[Dict]:
"""Get recent activity log entries."""
col = 'logged_at' # actual column name on cloud schema
if agent:
return self._db.query(f"""
SELECT event_type, agent, description,
CONVERT(varchar, {col}, 120) AS ts
FROM memory.ActivityLog
WHERE {col} >= DATEADD(HOUR, -%s, GETUTCDATE())
AND agent=%s
ORDER BY {col} DESC
""", (since_hours, agent))
return self._db.query(f"""
SELECT event_type, agent, description,
CONVERT(varchar, {col}, 120) AS ts
FROM memory.ActivityLog
WHERE {col} >= DATEADD(HOUR, -%s, GETUTCDATE())
ORDER BY {col} DESC
""", (since_hours,))
# ── Task Queue ────────────────────────────────────────────────────────────
def queue_task(self, agent: str, task_type: str, payload: str = '{}',
priority: Any = 5) -> Optional[str]:
"""Insert a new task. Priority can be int or string name."""
_pmap = {'critical': 1, 'high': 2, 'medium': 5, 'low': 7, 'free': 9}
if isinstance(priority, str):
priority = _pmap.get(priority.lower(), 5)
priority = int(priority)
now = datetime.now(timezone.utc)
ok = self._db.execute("""
INSERT INTO memory.TaskQueue (agent, task_type, payload, priority, status, created_at)
VALUES (%s, %s, %s, %s, 'pending', %s)
""", (agent, task_type, payload, priority, now))
if not ok:
return None
tid = self._db.scalar("""
SELECT TOP 1 id FROM memory.TaskQueue
WHERE agent=%s AND task_type=%s AND status='pending'
ORDER BY created_at DESC
""", (agent, task_type))
_log.info(f"queue_task({agent}/{task_type}) → id={tid}")
return str(tid) if tid else None
def get_pending_tasks(self, agent: str, task_types: List[str],
limit: int = 10) -> List[Dict]:
"""Fetch pending tasks for an agent, ordered by priority then age."""
if not task_types:
return []
placeholders = ','.join(['%s'] * len(task_types))
return self._db.query(f"""
SELECT TOP (%s) id, task_type, payload, priority, retry_count
FROM memory.TaskQueue
WHERE agent=%s AND task_type IN ({placeholders}) AND status='pending'
ORDER BY priority ASC, created_at ASC
""", (limit, agent, *task_types))
def claim_task(self, task_id: Any) -> bool:
"""Mark a task as processing."""
return self._db.execute("""
UPDATE memory.TaskQueue
SET status='processing', started_at=%s
WHERE id=%s
""", (datetime.now(timezone.utc), int(task_id)))
def complete_task(self, task_id: Any, result: str = '') -> bool:
"""Mark a task as completed."""
ok = self._db.execute("""
UPDATE memory.TaskQueue
SET status='completed', completed_at=%s, error_log=%s
WHERE id=%s
""", (datetime.now(timezone.utc), result[:500], int(task_id)))
_log.info(f"complete_task({task_id})")
return ok
def fail_task(self, task_id: Any, error: str, retry_count: int = 0,
max_retries: int = 3) -> bool:
"""Fail or re-queue a task based on retry count."""
new_status = 'pending' if retry_count < max_retries else 'failed'
ok = self._db.execute("""
UPDATE memory.TaskQueue
SET status=%s, retry_count=retry_count+1, error_log=%s
WHERE id=%s
""", (new_status, error[:800], int(task_id)))
_log.info(f"fail_task({task_id}) → {new_status}")
return ok
def get_completed_tasks(self, since_hours: int = 24,
agent: Optional[str] = None) -> List[Dict]:
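"""Get tasks with status 'completed' or 'failed' whose completed_at falls within the window. fail_task() does not set completed_at, so failed tasks are excluded unless it was set elsewhere."""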
if agent:
return self._db.query("""
SELECT id, agent, task_type, status,
CONVERT(varchar, completed_at, 120) AS ts
FROM memory.TaskQueue
WHERE status IN ('completed','failed')
AND completed_at >= DATEADD(HOUR, -%s, GETUTCDATE())
AND agent=%s
ORDER BY completed_at DESC
""", (since_hours, agent))
return self._db.query("""
SELECT id, agent, task_type, status,
CONVERT(varchar, completed_at, 120) AS ts
FROM memory.TaskQueue
WHERE status IN ('completed','failed')
AND completed_at >= DATEADD(HOUR, -%s, GETUTCDATE())
ORDER BY completed_at DESC
""", (since_hours,))
# ── Knowledge Index ───────────────────────────────────────────────────────
def store_knowledge(self, domain: str, topic: str, summary: str = '',
file_path: str = '', tags: str = '') -> bool:
"""Store or update a knowledge entry. Upserts by domain + topic."""
now = datetime.now(timezone.utc)
return self._db.execute("""
MERGE memory.KnowledgeIndex AS target
USING (SELECT %s AS domain, %s AS topic) AS source
ON target.domain = source.domain AND target.topic = source.topic
WHEN MATCHED THEN
UPDATE SET summary=%s, file_path=%s,
last_trained=%s, training_count=ISNULL(training_count,0)+1
WHEN NOT MATCHED THEN
INSERT (domain, topic, summary, file_path, last_trained, training_count, created_at)
VALUES (%s, %s, %s, %s, %s, 1, %s);
""", (domain, topic, summary, file_path, now,
domain, topic, summary, file_path, now, now))
def search_knowledge(self, domain: str, keyword: str = '') -> List[Dict]:
"""Search knowledge entries by domain and optional keyword."""
if keyword:
like = f'%{keyword}%'
return self._db.query("""
SELECT domain, topic, summary, file_path, training_count,
CONVERT(varchar, last_trained, 120) AS ts
FROM memory.KnowledgeIndex
WHERE domain=%s AND (topic LIKE %s OR summary LIKE %s)
ORDER BY last_trained DESC
""", (domain, like, like))
return self._db.query("""
SELECT domain, topic, summary, file_path, training_count,
CONVERT(varchar, last_trained, 120) AS ts
FROM memory.KnowledgeIndex
WHERE domain=%s ORDER BY last_trained DESC
""", (domain,))
def get_recent_knowledge(self, n: int = 10) -> List[Dict]:
"""Get the N most recently updated knowledge entries."""
return self._db.query("""
SELECT TOP (%s) domain, topic, summary,
CONVERT(varchar, last_trained, 120) AS ts
FROM memory.KnowledgeIndex ORDER BY last_trained DESC
""", (n,))
# ── Sessions ──────────────────────────────────────────────────────────────
def get_session_context(self, session_id: str) -> Optional[Dict]:
"""Load session context from the database."""
rows = self._db.query(
"SELECT session_key, channel, summary, token_count FROM memory.Sessions WHERE session_key=%s",
(session_id,)
)
if not rows:
return None
row = rows[0]
try:
row['context'] = json.loads(row.get('summary') or '{}')
except (json.JSONDecodeError, TypeError):
row['context'] = {}
return row
def save_session_context(self, session_id: str, context_data: Dict,
channel: str = 'agent', token_count: int = 0) -> bool:
"""Persist session context as JSON in memory.Sessions."""
ctx_json = json.dumps(context_data, default=str)
now = datetime.now(timezone.utc)
return self._db.execute("""
MERGE memory.Sessions AS target
USING (SELECT %s AS session_key) AS source
ON target.session_key = source.session_key
WHEN MATCHED THEN
UPDATE SET summary=%s, token_count=%s, ended_at=%s
WHEN NOT MATCHED THEN
INSERT (session_key, channel, summary, token_count, started_at)
VALUES (%s, %s, %s, %s, %s);
""", (session_id, ctx_json, token_count, now,
session_id, channel, ctx_json, token_count, now))
# ── Todos ─────────────────────────────────────────────────────────────────
def add_todo(self, title: str, project: str = '', priority: int = 5,
tags: str = '', due_date=None) -> Optional[int]:
"""Insert a new todo item. Returns the new todo id."""
now = datetime.now(timezone.utc)
return self._db.scalar("""
INSERT INTO memory.Todos (title, project, priority, status, tags, due_date, created_at)
OUTPUT INSERTED.id
VALUES (%s, %s, %s, 'open', %s, %s, %s)
""", (title, project, priority, tags, due_date, now))
def complete_todo(self, todo_id: int, status: str = 'done') -> bool:
"""Mark a todo as done/completed."""
now = datetime.now(timezone.utc)
return self._db.execute("""
UPDATE memory.Todos SET status=%s, completed_at=%s WHERE id=%s
""", (status, now, todo_id))
def update_todo(self, todo_id: int, **fields) -> bool:
"""Update arbitrary todo fields. Allowed: title, project, priority, status, tags, due_date."""
allowed = {'title', 'project', 'priority', 'status', 'tags', 'due_date'}
updates = {k: v for k, v in fields.items() if k in allowed}
if not updates:
return False
set_clause = ', '.join(f'{k}=%s' for k in updates)
params = tuple(updates.values()) + (todo_id,)  # tuple, matching the other calls (pymssql expects a tuple or dict)
return self._db.execute(
f'UPDATE memory.Todos SET {set_clause} WHERE id=%s', params
)
def delete_todo(self, todo_id: int) -> bool:
"""Hard-delete a todo. Prefer complete_todo() for audit trail."""
return self._db.execute('DELETE FROM memory.Todos WHERE id=%s', (todo_id,))
# ── Utility ───────────────────────────────────────────────────────────────
def _parse_table(self, raw_output: str, columns: List[str]) -> List[Dict]:
"""
v1.x compat stub. v2 query() returns list[dict] directly.
Kept so any code that calls _parse_table on raw output still runs.
Returns empty list — callers should migrate to query().
"""
_log.debug("_parse_table() called — migrate caller to use query() directly")
return []
def ensure_schema(self) -> bool:
"""Create memory schema + core tables if they don't exist."""
self._db.execute("IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name='memory') EXEC('CREATE SCHEMA memory')")
tables = {
'Memories': """CREATE TABLE memory.Memories (
id BIGINT IDENTITY(1,1) PRIMARY KEY, category NVARCHAR(100) NOT NULL,
key_name NVARCHAR(255), content NVARCHAR(MAX) NOT NULL,
importance TINYINT DEFAULT 3, tags NVARCHAR(500), source NVARCHAR(255),
created_at DATETIME2 DEFAULT GETUTCDATE(), updated_at DATETIME2 DEFAULT GETUTCDATE(),
expires_at DATETIME2, is_active BIT DEFAULT 1)""",
'Sessions': """CREATE TABLE memory.Sessions (
id BIGINT IDENTITY(1,1) PRIMARY KEY, session_key NVARCHAR(255),
channel NVARCHAR(100), started_at DATETIME2 DEFAULT GETUTCDATE(),
ended_at DATETIME2, summary NVARCHAR(MAX), token_count INT DEFAULT 0)""",
'TaskQueue': """CREATE TABLE memory.TaskQueue (
id BIGINT IDENTITY(1,1) PRIMARY KEY, agent NVARCHAR(100) NOT NULL,
task_type NVARCHAR(100) NOT NULL, payload NVARCHAR(MAX),
priority TINYINT DEFAULT 5, status NVARCHAR(50) DEFAULT 'pending',
created_at DATETIME2 DEFAULT GETUTCDATE(), started_at DATETIME2,
completed_at DATETIME2, error_log NVARCHAR(MAX), retry_count TINYINT DEFAULT 0)""",
'KnowledgeIndex': """CREATE TABLE memory.KnowledgeIndex (
id BIGINT IDENTITY(1,1) PRIMARY KEY, domain NVARCHAR(100) NOT NULL,
topic NVARCHAR(255) NOT NULL, file_path NVARCHAR(1000),
summary NVARCHAR(MAX), last_trained DATETIME2, training_count INT DEFAULT 0,
created_at DATETIME2 DEFAULT GETUTCDATE())""",
'ActivityLog': """CREATE TABLE memory.ActivityLog (
id BIGINT IDENTITY(1,1) PRIMARY KEY, event_type NVARCHAR(100) NOT NULL,
agent NVARCHAR(100), description NVARCHAR(MAX), metadata NVARCHAR(MAX),
importance TINYINT DEFAULT 3, logged_at DATETIME2 DEFAULT GETUTCDATE())""",
}
for name, ddl in tables.items():
self._db.execute(f"""
IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA='memory' AND TABLE_NAME='{name}')
BEGIN {ddl} END
""")
_log.info(f"ensure_schema: {name} OK")
return True
# ── Module-level factory (singleton) ─────────────────────────────────────────
_instances: Dict[str, SQLMemory] = {}
def get_memory(backend: str = 'cloud') -> SQLMemory:
"""Get or create a SQLMemory instance. Singleton per backend."""
if backend not in _instances:
_instances[backend] = SQLMemory(backend)
return _instances[backend]
# ── Self-test ─────────────────────────────────────────────────────────────────
if __name__ == '__main__':
print("sql_memory v2.0 — self-test")
passed = failed = 0
def t(name, fn):
global passed, failed
try:
r = fn()
print(f" ✅ {name}: {r!r:.60}" if r is not None else f" ✅ {name}: ok")
passed += 1
except Exception as e:
print(f" ❌ {name}: {e}")
failed += 1
mem = get_memory('cloud')
t("ping", mem.ping)
t("remember", lambda: mem.remember('test', '_v2_test', 'v2 self-test', importance=1, tags='test'))
t("recall", lambda: mem.recall('test', '_v2_test'))
t("search", lambda: mem.search_memories('v2 self-test'))
t("log_event", lambda: mem.log_event('selftest', 'sql_memory', 'v2 test'))
t("queue_task", lambda: mem.queue_task('test_agent', '_v2_task', '{}', priority=1))
t("forget", lambda: mem.forget('test', '_v2_test'))
print(f"\n{passed} passed, {failed} failed")
sys.exit(1 if failed else 0)
FILE:tests/test_queue_daemon.py
#!/usr/bin/env python3
"""
test_queue_daemon.py — Unit tests for queue daemon routing and task processing
"""
import os
import sys
import unittest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'agents'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'infrastructure'))
class TestAgentRegistry(unittest.TestCase):
"""Test that agent registry is properly configured."""
def test_registry_has_entries(self):
from queue_daemon import AGENT_REGISTRY
self.assertGreater(len(AGENT_REGISTRY), 5)
def test_registry_values_are_tuples(self):
from queue_daemon import AGENT_REGISTRY
for name, val in AGENT_REGISTRY.items():
self.assertIsInstance(val, tuple, f"{name} should map to (module, class) tuple")
self.assertEqual(len(val), 2, f"{name} tuple should have 2 elements")
class TestStubAgent(unittest.TestCase):
"""Test the fallback stub agent."""
def test_stub_handles_any_task(self):
from queue_daemon import _get_stub_agent
stub = _get_stub_agent()
result = stub.run_task({
'id': '999', 'agent': 'unknown', 'task_type': 'unknown_type'
})
self.assertIn('Stub', result)
def test_stub_singleton(self):
from queue_daemon import _get_stub_agent
a = _get_stub_agent()
b = _get_stub_agent()
self.assertIs(a, b)
class TestResolveAgent(unittest.TestCase):
"""Test agent resolution logic."""
def test_resolve_registered_agent(self):
from queue_daemon import resolve_agent_for_task
agent = resolve_agent_for_task('dispatcher', 'github_setup')
self.assertIsNotNone(agent)
def test_resolve_unknown_agent_returns_stub(self):
from queue_daemon import resolve_agent_for_task, _StubAgent
agent = resolve_agent_for_task('totally_fake_agent', 'fake_task')
self.assertIsInstance(agent, _StubAgent)
class TestFetchParsing(unittest.TestCase):
"""Test that fetch_next_task returns proper structure."""
def test_fetch_returns_none_or_dict(self):
from sql_memory import get_memory
from queue_daemon import fetch_next_task
mem = get_memory('cloud')
result = fetch_next_task(mem)
self.assertTrue(result is None or isinstance(result, dict))
if __name__ == '__main__':
unittest.main(verbosity=2)
FILE:tests/test_queue_verifier.py
#!/usr/bin/env python3
"""
Unit tests for infrastructure/queue_verifier.py
Auto-generated stub — 2026-03-09
BEST PRACTICES:
- One test per function/behavior
- Arrange → Act → Assert pattern
- Mock all external dependencies (SQL, Ollama, filesystem)
- Test happy path + edge cases + error conditions
- Use pytest fixtures for reusable setup
- All tests must be independent (no shared state)
"""
import pytest
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch, call
# ── Path setup ──────────────────────────────────────────────────────────────
WORKSPACE = Path(__file__).parent.parent
sys.path.insert(0, str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE / "infrastructure"))
# ── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
def mock_sql():
"""Mock SQLMemory to prevent real DB calls in tests."""
with patch("infrastructure.sql_memory.SQLMemory") as mock:
instance = mock.return_value
instance.queue_task.return_value = '1'  # queue_task returns the new task id as a string
instance.log_event.return_value = True
instance.get_pending_tasks.return_value = []
yield instance
@pytest.fixture
def mock_ollama():
"""Mock Ollama API calls."""
with patch("urllib.request.urlopen") as mock:
import json
mock.return_value.__enter__.return_value.read.return_value = \
json.dumps({"response": "Mock Ollama response"}).encode()
yield mock
# ── Tests for infrastructure/queue_verifier.py ──────────────────────────────────────────────────────
class TestQueueVerifier:
"""Test suite for queue_verifier."""
def test___init__(self, mock_sql, mock_ollama):
"""
Test: __init__()
Source line: 17
TODO: Add test docstring
"""
# TODO: Implement test for __init__
# Arrange
# ... set up test data ...
# Act
# result = __init__('test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test___init___handles_errors(self, mock_sql):
"""Test error handling in __init__()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_get_pending_count(self, mock_sql, mock_ollama):
"""
Test: get_pending_count()
Source line: 20
Docstring: Get count of pending tasks
"""
# TODO: Implement test for get_pending_count
# Arrange
# ... set up test data ...
# Act
# result = get_pending_count()
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_get_pending_count_handles_errors(self, mock_sql):
"""Test error handling in get_pending_count()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_get_pending_by_agent(self, mock_sql, mock_ollama):
"""
Test: get_pending_by_agent()
Source line: 35
Docstring: Get pending tasks grouped by agent
"""
# TODO: Implement test for get_pending_by_agent
# Arrange
# ... set up test data ...
# Act
# result = get_pending_by_agent()
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_get_pending_by_agent_handles_errors(self, mock_sql):
"""Test error handling in get_pending_by_agent()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_mark_completed(self, mock_sql, mock_ollama):
"""
Test: mark_completed()
Source line: 45
Docstring: Mark tasks as completed
"""
# TODO: Implement test for mark_completed
# Arrange
# ... set up test data ...
# Act
# result = mark_completed('test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_mark_completed_handles_errors(self, mock_sql):
"""Test error handling in mark_completed()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_get_failed_tasks(self, mock_sql, mock_ollama):
"""
Test: get_failed_tasks()
Source line: 61
Docstring: Get tasks that failed recently
"""
# TODO: Implement test for get_failed_tasks
# Arrange
# ... set up test data ...
# Act
# result = get_failed_tasks('test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_get_failed_tasks_handles_errors(self, mock_sql):
"""Test error handling in get_failed_tasks()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_retry_failed(self, mock_sql, mock_ollama):
"""
Test: retry_failed()
Source line: 71
Docstring: Retry a failed task if under retry limit
"""
# TODO: Implement test for retry_failed
# Arrange
# ... set up test data ...
# Act
# result = retry_failed('test', 'test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_retry_failed_handles_errors(self, mock_sql):
"""Test error handling in retry_failed()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
FILE:tests/test_sql_memory.py
#!/usr/bin/env python3
"""
test_sql_memory.py — Unit tests for SQLMemory
Covers: connection, CRUD, task queue, knowledge, error handling
"""
import os
import sys
import unittest
from unittest.mock import patch, MagicMock
from datetime import datetime
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'infrastructure'))
from sql_memory import SQLMemory, _esc, get_memory
class TestEscaping(unittest.TestCase):
"""Test SQL string escaping."""
def test_single_quotes_escaped(self):
self.assertEqual(_esc("it's"), "it''s")
def test_double_quotes_preserved(self):
self.assertEqual(_esc('say "hello"'), 'say "hello"')
def test_max_length_truncation(self):
long = "x" * 5000
result = _esc(long, max_len=100)
self.assertEqual(len(result), 100)
def test_empty_string(self):
self.assertEqual(_esc(""), "")
def test_none_handling(self):
self.assertEqual(_esc(None), "")
def test_numeric_coercion(self):
self.assertEqual(_esc(42), "42")
class TestSQLMemoryConnection(unittest.TestCase):
"""Test connection and ping."""
def test_get_memory_cloud(self):
mem = get_memory('cloud')
self.assertIsInstance(mem, SQLMemory)
def test_ping(self):
mem = get_memory('cloud')
result = mem.ping()
self.assertIsInstance(result, bool)
def test_ping_succeeds_with_valid_creds(self):
mem = get_memory('cloud')
self.assertTrue(mem.ping(), "Ping should succeed with valid credentials")
class TestMemoryCRUD(unittest.TestCase):
"""Test remember/recall/search/forget cycle."""
def setUp(self):
self.mem = get_memory('cloud')
self.test_key = f"test_{datetime.now().strftime('%Y%m%d%H%M%S')}"
def test_remember_and_recall(self):
self.mem.remember('test', self.test_key, 'test content', importance=1, tags='test')
result = self.mem.recall('test', self.test_key)
self.assertIsNotNone(result)
self.assertIn('test content', str(result))
def test_recall_nonexistent(self):
result = self.mem.recall('test', 'nonexistent_key_xyz_99999')
self.assertIsNone(result)
def test_search_memories(self):
self.mem.remember('test', self.test_key, 'searchable content xyz', importance=1, tags='test')
results = self.mem.search_memories('searchable content xyz')
self.assertIsInstance(results, list)
def test_forget(self):
self.mem.remember('test', self.test_key, 'to be forgotten', importance=1, tags='test')
result = self.mem.forget('test', self.test_key)
self.assertIsInstance(result, bool)
def tearDown(self):
# Cleanup test entries
self.mem.execute(f"DELETE FROM memory.Memories WHERE key_name='{self.test_key}'")
class TestTaskQueue(unittest.TestCase):
"""Test task queue operations."""
def setUp(self):
self.mem = get_memory('cloud')
self.test_task_type = f"test_task_{datetime.now().strftime('%H%M%S')}"
def test_queue_and_claim(self):
self.mem.queue_task('test_agent', self.test_task_type, '{}', priority=9)
# Verify it was queued
out = self.mem.execute_scalar(
f"SELECT TOP 1 id FROM memory.TaskQueue WHERE task_type='{self.test_task_type}' AND status='pending'"
)
self.assertIsNotNone(out, "queued task should be visible as pending")
def test_complete_task(self):
self.mem.queue_task('test_agent', self.test_task_type, '{}', priority=9)
out = self.mem.execute_scalar(
f"SELECT TOP 1 id FROM memory.TaskQueue WHERE task_type='{self.test_task_type}' AND status='pending'"
)
if out is not None and str(out).strip().isdigit():
tid = str(out).strip()
self.mem.claim_task(tid)
result = self.mem.complete_task(tid, 'test result')
self.assertTrue(result)
def test_fail_task_retry(self):
self.mem.queue_task('test_agent', self.test_task_type, '{}', priority=9)
out = self.mem.execute_scalar(
f"SELECT TOP 1 id FROM memory.TaskQueue WHERE task_type='{self.test_task_type}' AND status='pending'"
)
if out is not None and str(out).strip().isdigit():
tid = str(out).strip()
self.mem.claim_task(tid)
self.mem.fail_task(tid, 'test error', 0, 3)
# Should be back to pending with retry_count incremented
status = self.mem.execute_scalar(
f"SELECT status FROM memory.TaskQueue WHERE id={tid}"
)
self.assertIn('pending', str(status))
def tearDown(self):
self.mem.execute(
f"DELETE FROM memory.TaskQueue WHERE task_type='{self.test_task_type}'"
)
class TestActivityLog(unittest.TestCase):
"""Test event logging."""
def setUp(self):
self.mem = get_memory('cloud')
def test_log_event(self):
self.mem.log_event('test_event', 'test_agent', 'unit test log entry')
results = self.mem.get_recent_activity(since_hours=1, agent='test_agent')
self.assertIsInstance(results, list)
class TestKnowledge(unittest.TestCase):
"""Test knowledge store operations."""
def setUp(self):
self.mem = get_memory('cloud')
self.test_topic = f"test_topic_{datetime.now().strftime('%H%M%S')}"
def test_store_and_search_knowledge(self):
self.mem.store_knowledge('test_domain', self.test_topic, 'test summary', 'test_source')
results = self.mem.search_knowledge('test_domain', self.test_topic)
self.assertIsInstance(results, list)
def tearDown(self):
self.mem.execute(
f"DELETE FROM memory.KnowledgeIndex WHERE topic='{self.test_topic}'"
)
class TestEdgeCases(unittest.TestCase):
"""Test error handling and edge cases."""
def test_execute_invalid_sql(self):
mem = get_memory('cloud')
result = mem.execute("SELECT * FROM nonexistent_table_xyz")
# Should not crash, may return error text
self.assertIsInstance(result, str)
def test_remember_with_special_chars(self):
mem = get_memory('cloud')
key = f"special_test_{datetime.now().strftime('%H%M%S')}"
mem.remember('test', key, "Content with 'quotes' and \"doubles\" and <html>", importance=1)
result = mem.recall('test', key)
# Cleanup
mem.execute(f"DELETE FROM memory.Memories WHERE key_name='{key}'")
def test_very_long_content(self):
mem = get_memory('cloud')
key = f"long_test_{datetime.now().strftime('%H%M%S')}"
long_content = "x" * 5000
mem.remember('test', key, long_content, importance=1)
# Should truncate gracefully
mem.execute(f"DELETE FROM memory.Memories WHERE key_name='{key}'")
if __name__ == '__main__':
unittest.main(verbosity=2)
FILE:tests/test_todo.py
#!/usr/bin/env python3
"""
Unit tests for infrastructure/todo.py
Auto-generated stub — 2026-03-09
BEST PRACTICES:
- One test per function/behavior
- Arrange → Act → Assert pattern
- Mock all external dependencies (SQL, Ollama, filesystem)
- Test happy path + edge cases + error conditions
- Use pytest fixtures for reusable setup
- All tests must be independent (no shared state)
"""
import pytest
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch, call
# ── Path setup ──────────────────────────────────────────────────────────────
WORKSPACE = Path(__file__).parent.parent
sys.path.insert(0, str(WORKSPACE))
sys.path.insert(0, str(WORKSPACE / "infrastructure"))
# ── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture
def mock_sql():
"""Mock SQLMemory to prevent real DB calls in tests."""
with patch("infrastructure.sql_memory.SQLMemory") as mock:
instance = mock.return_value
instance.queue_task.return_value = True
instance.log_event.return_value = True
instance.get_pending_tasks.return_value = []
yield instance
@pytest.fixture
def mock_ollama():
"""Mock Ollama API calls."""
with patch("urllib.request.urlopen") as mock:
import json
mock.return_value.__enter__.return_value.read.return_value = \
json.dumps({"response": "Mock Ollama response"}).encode()
yield mock
# ── Tests for infrastructure/todo.py ──────────────────────────────────────────────────────
class TestTodo:
"""Test suite for todo."""
def test___init__(self, mock_sql, mock_ollama):
"""
Test: __init__()
Source line: 37
TODO: Add test docstring
"""
# TODO: Implement test for __init__
# Arrange
# ... set up test data ...
# Act
# result = __init__('test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test___init___handles_errors(self, mock_sql):
"""Test error handling in __init__()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_add_task(self, mock_sql, mock_ollama):
"""
Test: add_task()
Source line: 40
Docstring: Add a new TODO item.
"""
# TODO: Implement test for add_task
# Arrange
# ... set up test data ...
# Act
# result = add_task('test', 'test', 'test', 'test', 'test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_add_task_handles_errors(self, mock_sql):
"""Test error handling in add_task()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_list_by_priority(self, mock_sql, mock_ollama):
"""
Test: list_by_priority()
Source line: 59
Docstring: Print all TODOs organized by priority.
"""
# TODO: Implement test for list_by_priority
# Arrange
# ... set up test data ...
# Act
# result = list_by_priority()
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_list_by_priority_handles_errors(self, mock_sql):
"""Test error handling in list_by_priority()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_get_report(self, mock_sql, mock_ollama):
"""
Test: get_report()
Source line: 93
Docstring: Generate text report of TODOs.
"""
# TODO: Implement test for get_report
# Arrange
# ... set up test data ...
# Act
# result = get_report('test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_get_report_handles_errors(self, mock_sql):
"""Test error handling in get_report()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_claim_task(self, mock_sql, mock_ollama):
"""
Test: claim_task()
Source line: 120
Docstring: Claim a task to work on.
"""
# TODO: Implement test for claim_task
# Arrange
# ... set up test data ...
# Act
# result = claim_task('test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_claim_task_handles_errors(self, mock_sql):
"""Test error handling in claim_task()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_complete_task(self, mock_sql, mock_ollama):
"""
Test: complete_task()
Source line: 125
Docstring: Mark task complete.
"""
# TODO: Implement test for complete_task
# Arrange
# ... set up test data ...
# Act
# result = complete_task('test', 'test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_complete_task_handles_errors(self, mock_sql):
"""Test error handling in complete_task()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
def test_fail_task(self, mock_sql, mock_ollama):
"""
Test: fail_task()
Source line: 132
Docstring: Mark task failed.
"""
# TODO: Implement test for fail_task
# Arrange
# ... set up test data ...
# Act
# result = fail_task('test', 'test')
# Assert
# assert result is not None
pytest.skip("STUB — implement me")
def test_fail_task_handles_errors(self, mock_sql):
"""Test error handling in fail_task()."""
# TODO: Test error conditions (bad input, network failure, etc.)
pytest.skip("STUB — implement me")
Generic SQL Server connectivity for OpenClaw agents. Use when: (1) executing parameterized queries against SQL Server, (2) building repository layers that ne...
---
name: sql-connector
version: 2.1.0-alpha
status: alpha
description: "Generic SQL Server connectivity for OpenClaw agents. Use when: (1) executing parameterized queries against SQL Server, (2) building repository layers that need a sealed, retry-capable SQL transport, (3) any agent that needs reliable MSSQL access without subprocess/sqlcmd. Provides execute/query/scalar APIs via pymssql with automatic retry, one connection per call, and structured error handling. ALPHA: use at your own risk, API may change."
---
# SQL Connector Skill
> Generic SQL Server connectivity for OpenClaw agents — pymssql transport
## Overview
Provides a reusable, sealed SQL Server connection layer with automatic retry, parameterized queries, and structured error handling. Built on **pymssql** (native TDS driver — no sqlcmd required).
## Installation
```bash
clawhub install sql-connector
```
## Quick Start
```python
from sql_connector import get_connector
db = get_connector('cloud') # or 'local'
# Execute (INSERT/UPDATE/DELETE)
ok = db.execute("INSERT INTO memory.Logs (msg) VALUES (%s)", ("hello",))
# Query (SELECT → list of dicts)
rows = db.query("SELECT id, name FROM memory.Memories WHERE category=%s", ("facts",))
# Scalar (single value)
count = db.scalar("SELECT COUNT(*) FROM memory.TaskQueue WHERE status='pending'")
```
## Environment Variables
```
SQL_CLOUD_SERVER=sql5112.site4now.net
SQL_CLOUD_DATABASE=db_99ba1f_memory4oblio
SQL_CLOUD_USER=...
SQL_CLOUD_PASSWORD=...
SQL_LOCAL_SERVER=10.0.0.110
SQL_LOCAL_DATABASE=Oblio_Memories
SQL_LOCAL_USER=sa
SQL_LOCAL_PASSWORD=...
```
## Architecture
```
SQLConnector (ABC, _SealCoreMethods metaclass)
execute() / query() ← SEALED: parameterized only, no override
scalar() ← parameterized, not sealed
MSSQLConnector (pymssql, TDS 7.4)
└── get_connector(backend) factory
```
`execute()` and `query()` are sealed by metaclass — subclasses cannot override them, enforcing parameterized-only access.
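The sealing idea can be sketched in isolation. The following is a minimal illustration, assuming a metaclass that inspects each new class body and rejects redefinitions of protected names. `BaseConnector`, `Repo`, and `try_override` are hypothetical names used only for this example; they are not part of the skill.

```python
import abc

SEALED = frozenset({"execute", "query"})  # method names to protect


class SealCoreMethods(abc.ABCMeta):
    """Reject any subclass that redefines a sealed method of BaseConnector."""

    def __new__(mcs, name, bases, namespace):
        for method in SEALED:
            if method not in namespace:
                continue
            # Walk every ancestor; BaseConnector itself has no such ancestor,
            # so defining the sealed methods there is allowed.
            for base in bases:
                for ancestor in base.__mro__:
                    if ancestor.__name__ == "BaseConnector" and method in vars(ancestor):
                        raise TypeError(f"{name}: '{method}()' is sealed")
        return super().__new__(mcs, name, bases, namespace)


class BaseConnector(abc.ABC, metaclass=SealCoreMethods):
    def execute(self, sql, params=()):
        return True

    def query(self, sql, params=()):
        return []


class Repo(BaseConnector):
    # Adding new methods on top of the sealed ones is fine.
    def count_pending(self):
        return len(self.query("SELECT 1 WHERE 1 = %s", (0,)))


def try_override():
    """Return the TypeError message raised when a subclass redefines query()."""
    try:
        class Bad(BaseConnector):
            def query(self, sql, params=()):
                return ["bypassed"]
    except TypeError as exc:
        return str(exc)
    return None
```

The check runs at class-definition time, so an attempted override fails on import rather than at the first query.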
## License
MIT
FILE:README.md
# clawbot-sql-connector
> ⚠️ **ALPHA — Use at your own risk.** API is functional and tested but may change. We're actively using this in production and will stabilize the API after 30 days of community feedback. Please open issues for anything that breaks.
A sealed, retry-capable SQL Server connector for OpenClaw agents. Built on **pymssql** — no `sqlcmd` or `mssql-tools` required.
## Features
- `get_connector('cloud')` / `get_connector('local')` factory
- Abstract base (`SQLConnector`) with `_SealCoreMethods` metaclass: `execute()` and `query()` cannot be overridden in subclasses, keeping all queries parameterized
- Automatic retry with a fixed delay between attempts (`MAX_RETRIES = 3`, `RETRY_DELAY = 2.0` seconds by default)
- `execute()` — INSERT/UPDATE/DELETE, returns bool
- `query()` — SELECT, returns list of dicts
- `scalar()` — single value (e.g. `INSERTED.id`)
- `ping()` — connectivity check
- Environment-based credentials via `.env` — nothing hardcoded
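
The retry behaviour in `sql_connector.py` is a loop over `MAX_RETRIES` attempts with a constant `RETRY_DELAY` sleep between them. A minimal standalone sketch of that pattern follows. `run_with_retry` and `flaky` are hypothetical names for illustration, and the injectable `sleep` parameter is an assumption added so the example runs without waiting.

```python
import time


def run_with_retry(operation, max_retries=3, delay=2.0, sleep=time.sleep):
    """Call operation() up to max_retries times; return (ok, result)."""
    for attempt in range(max_retries):
        try:
            return True, operation()
        except Exception:
            # Sleep only between attempts, not after the final failure.
            if attempt < max_retries - 1:
                sleep(delay)
    return False, None


calls = {"n": 0}


def flaky():
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "done"


waits = []  # records each delay instead of actually sleeping
outcome = run_with_retry(flaky, sleep=waits.append)
```

Because failures are reported as a return value rather than an exception, callers should check the result, in the same way callers of `execute()` check its bool.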
## Requirements
```bash
pip install pymssql python-dotenv
```
> **Note:** `pymssql` bundles its own TDS driver. No `sqlcmd`, no ODBC drivers, no `mssql-tools` needed.
## Installation
```bash
clawhub install sql-connector
```
## .env Setup
Backend configuration uses a simple naming pattern, `SQL_<BACKEND>_<FIELD>`. The connector reads the upper-case names (for example `SQL_CLOUD_SERVER`), so use upper case in your `.env`:
```env
# Local instance
SQL_LOCAL_SERVER=10.0.0.110
SQL_LOCAL_PORT=1433
SQL_LOCAL_DATABASE=YourDatabase
SQL_LOCAL_USER=your_user
SQL_LOCAL_PASSWORD=your_password
# Cloud instance (Azure / site4now / etc.)
SQL_CLOUD_SERVER=yourserver.database.windows.net
SQL_CLOUD_PORT=1433
SQL_CLOUD_DATABASE=your_cloud_db
SQL_CLOUD_USER=your_cloud_user
SQL_CLOUD_PASSWORD=your_cloud_password
# Further backends follow the same pattern:
# SQL_<BACKEND>_SERVER, SQL_<BACKEND>_DATABASE, SQL_<BACKEND>_USER, SQL_<BACKEND>_PASSWORD
SQL_STAGING_SERVER=staging.database.windows.net
SQL_STAGING_PORT=1433
SQL_STAGING_DATABASE=staging_db
SQL_STAGING_USER=staging_user
SQL_STAGING_PASSWORD=staging_password
```
Then connect:
```python
db = get_connector('local') # Uses SQL_LOCAL_* vars
db = get_connector('cloud') # Uses SQL_CLOUD_* vars
db = get_connector('staging') # Uses SQL_STAGING_* vars, once 'staging' is registered in _BACKENDS
```
**To add a new backend:** add its env vars to `.env` following the pattern and register an entry for it in `_BACKENDS` in `sql_connector.py`. As shipped, only `local` and `cloud` are registered, and `get_connector()` raises `ValueError` for any other name.
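
For illustration only, the naming pattern could be resolved by a small lookup helper like the one below. `backend_config` is a hypothetical function, not part of `sql_connector.py`, which reads its variables into a fixed `_BACKENDS` dict at import time. The example passes a plain dict in place of the real environment so it runs without a `.env` file.

```python
import os


def backend_config(name, env=None):
    """Build a config dict from SQL_<NAME>_* variables (hypothetical helper)."""
    env = os.environ if env is None else env
    prefix = f"SQL_{name.upper()}_"
    server = env.get(prefix + "SERVER", "")
    if not server:
        raise ValueError(f"No {prefix}SERVER set for backend '{name}'")
    return {
        "server": server,
        "port": int(env.get(prefix + "PORT", "1433")),  # default SQL Server port
        "database": env.get(prefix + "DATABASE", ""),
        "user": env.get(prefix + "USER", ""),
        "password": env.get(prefix + "PASSWORD", ""),
    }


# A stand-in for the process environment, using the staging values above.
example_env = {
    "SQL_STAGING_SERVER": "staging.database.windows.net",
    "SQL_STAGING_DATABASE": "staging_db",
    "SQL_STAGING_USER": "staging_user",
    "SQL_STAGING_PASSWORD": "staging_password",
}
staging = backend_config("staging", example_env)
```

Failing fast on a missing server name avoids attempting a connection with an empty host, which would otherwise surface only after the retries are exhausted.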
## Quick Start
```python
from sql_connector import get_connector
db = get_connector('cloud') # or 'local'
# INSERT / UPDATE / DELETE
ok = db.execute(
"INSERT INTO memory.Logs (category, msg) VALUES (%s, %s)",
("info", "hello world")
)
# SELECT → list of dicts
rows = db.query(
"SELECT id, content FROM memory.Memories WHERE category = %s",
("facts",)
)
# Single value
count = db.scalar("SELECT COUNT(*) FROM memory.TaskQueue WHERE status = %s", ("pending",))
# Connectivity check
if db.ping():
print("connected")
```
## Architecture
```
SQLConnector (ABC, _SealCoreMethods metaclass)
├── execute() / query() ← SEALED: parameterized only, no overrides
├── scalar() / ping()
├── _connect() ← abstract
└── MSSQLConnector (pymssql) ← concrete implementation
```
Extend by subclassing `MSSQLConnector` to add domain-specific repository methods. See [clawbot-sql-memory](https://github.com/VeXHarbinger/clawbot-sql-memory) for an example.
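
A repository layer in this style adds named domain methods and leaves `query()` untouched. The sketch below assumes that shape. `StubConnector` is a stand-in for `MSSQLConnector` so the example runs without a database, and `MemoryRepository` and `by_category` are hypothetical names.

```python
class StubConnector:
    """Stand-in for MSSQLConnector: records calls and returns canned rows."""

    def __init__(self, rows):
        self._rows = rows
        self.calls = []

    def query(self, sql, params=()):
        self.calls.append((sql, params))
        return list(self._rows)


class MemoryRepository(StubConnector):
    """Hypothetical repository: domain methods built on the inherited query()."""

    def by_category(self, category):
        # The category travels as a bound parameter, never inside the SQL text.
        return self.query(
            "SELECT id, content FROM memory.Memories WHERE category = %s",
            (category,),
        )


repo = MemoryRepository([{"id": 1, "content": "VeX is in EST/EDT timezone"}])
facts = repo.by_category("facts")
```

In real use the base class would be `MSSQLConnector`, and the repository would be constructed with a backend name instead of canned rows.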
## Security Note
The connector's API takes a SQL string plus a params tuple, and the metaclass seals `execute()` and `query()` so subclasses cannot replace them. Parameterization still depends on the caller: never build SQL from user input with f-strings or string concatenation, because the connector cannot detect a query string that was interpolated before being passed in.
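
The difference between interpolated and bound input can be shown with a small runnable example. It uses the standard-library `sqlite3` module as a stand-in database, so the placeholder is `?` rather than pymssql's `%s`; the table and values are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE memories (key_name TEXT, content TEXT)")
conn.executemany(
    "INSERT INTO memories VALUES (?, ?)",
    [("a", "first"), ("b", "second")],
)

user_input = "a' OR '1'='1"

# Unsafe: the input becomes part of the SQL text and widens the WHERE clause.
unsafe_rows = conn.execute(
    f"SELECT content FROM memories WHERE key_name = '{user_input}'"
).fetchall()

# Safe: the input is bound as a value, so it only matches a literal key.
safe_rows = conn.execute(
    "SELECT content FROM memories WHERE key_name = ?", (user_input,)
).fetchall()
```

The interpolated query returns every row, while the bound query returns none, because no key equals the literal input string.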
## Related
- [clawbot-sql-memory](https://github.com/VeXHarbinger/clawbot-sql-memory) — Semantic memory layer built on this connector
- [oblio-heart-and-soul](https://github.com/VeXHarbinger/oblio-heart-and-soul) — Full agent system reference implementation
## Community
Found a bug? Have an improvement? Open an issue — this is alpha and community feedback shapes the v1 API.
## License
MIT
FILE:sql_connector.py
#!/usr/bin/env python3
"""
sql_connector.py — Generic SQL Server Connector (v2.0)
=======================================================
Reusable, driver-native SQL Server connectivity for OpenClaw agents.
Transport: pymssql (native TDS driver — no subprocess, no sqlcmd dependency).
Security model:
- SQLConnector is abstract (ABC) — cannot be instantiated directly
- execute() and query() are sealed via metaclass — subclasses cannot bypass them
- All queries must use parameterised binding (%s placeholders)
- No string interpolation in execute/query — enforced by design
- Credentials loaded from environment only
Upgrade path from v1.x (sqlcmd-based):
- API is backward-compatible: from_env(), execute(), query(), ping() all preserved
- execute() now returns bool (success/failure) instead of raw stdout string
- query() now returns list[dict] directly (no columns arg needed)
- execute_scalar() preserved, returns Any instead of Optional[str]
- New: scalar() method for single-value queries
Usage:
from sql_connector import MSSQLConnector, get_connector
db = get_connector('cloud')
rows = db.query("SELECT id, name FROM memory.Memories WHERE category=%s", ('facts',))
ok = db.execute("UPDATE memory.Memories SET importance=%s WHERE id=%s", (5, 42))
val = db.scalar("SELECT COUNT(*) FROM memory.TaskQueue WHERE status=%s", ('pending',))
"""
from __future__ import annotations
import abc
import logging
import os
import time
from typing import Any
import pymssql
from dotenv import load_dotenv
# Walk up from this file to find .env (handles install into skills/ subdir)
import pathlib as _pathlib
def _find_env() -> str | None:
p = _pathlib.Path(os.path.abspath(__file__)).parent
for _ in range(5):
c = p / '.env'
if c.exists():
return str(c)
p = p.parent
return None
_env = _find_env()
if _env:
load_dotenv(_env, override=True)
_log = logging.getLogger(__name__)
# ── Backend configuration ─────────────────────────────────────────────────────
_BACKENDS: dict[str, dict[str, Any]] = {
'local': {
'server': os.getenv('SQL_SERVER', os.getenv('SQL_LOCAL_SERVER', '10.0.0.110')),
'port': int(os.getenv('SQL_PORT', os.getenv('SQL_LOCAL_PORT', '1433'))),
'database': os.getenv('SQL_DATABASE', os.getenv('SQL_LOCAL_DATABASE', 'Oblio_Memories')),
'user': os.getenv('SQL_USER', os.getenv('SQL_LOCAL_USER', 'oblio')),
'password': os.getenv('SQL_PASSWORD', os.getenv('SQL_LOCAL_PASSWORD', '')),
},
'cloud': {
'server': os.getenv('SQL_CLOUD_SERVER', ''),
'port': int(os.getenv('SQL_CLOUD_PORT', '1433')),
'database': os.getenv('SQL_CLOUD_DATABASE', ''),
'user': os.getenv('SQL_CLOUD_USER', ''),
'password': os.getenv('SQL_CLOUD_PASSWORD', ''),
},
}
# ── Metaclass: seal execute/query against subclass override ──────────────────
_SEALED = frozenset({'execute', 'query'})
class _SealCoreMethods(abc.ABCMeta):
"""Prevent any subclass from overriding execute() or query()."""
def __new__(mcs, name, bases, namespace):
for method in _SEALED:
if method in namespace:
for base in bases:
for ancestor in getattr(base, '__mro__', []):
if method in vars(ancestor) and getattr(ancestor, '__name__', '') == 'SQLConnector':
raise TypeError(
f"{name}: '{method}()' is sealed and cannot be overridden. "
"Add domain logic in a repository subclass instead."
)
return super().__new__(mcs, name, bases, namespace)
# ── Custom exceptions ─────────────────────────────────────────────────────────
class SQLConnectorError(Exception):
"""Base connector error."""
class SQLConnectionError(SQLConnectorError):
"""Connection-level failure (retry-eligible)."""
class SQLQueryError(SQLConnectorError):
"""Query execution failure (do not retry)."""
# ── Abstract base ─────────────────────────────────────────────────────────────
class SQLConnector(abc.ABC, metaclass=_SealCoreMethods):
"""
Abstract SQL connector.
Concrete subclasses must implement _connect().
execute() and query() are sealed — extend via repository subclasses.
"""
MAX_RETRIES: int = 3
RETRY_DELAY: float = 2.0
def __init__(self, backend: str = 'cloud') -> None:
if backend not in _BACKENDS:
raise ValueError(f"Unknown backend '{backend}'. Options: {list(_BACKENDS)}")
self._backend = backend
self._cfg = _BACKENDS[backend]
@classmethod
def from_env(cls, profile: str = 'cloud', **kwargs) -> 'SQLConnector':
"""Create connector from environment variables (v1.x compat)."""
if profile not in _BACKENDS:
raise SQLConnectionError(f"Unknown profile '{profile}'")
instance = cls.__new__(cls)
SQLConnector.__init__(instance, profile)
return instance
@abc.abstractmethod
def _connect(self) -> Any:
"""Return an open DB-API 2.0 connection."""
# ── Sealed public API ─────────────────────────────────────────────────────
def execute(self, sql: str, params: tuple = ()) -> bool:
"""
Run INSERT / UPDATE / DELETE with parameterised binding.
Returns True on success, False on failure after retries.
"""
for attempt in range(self.MAX_RETRIES):
try:
with self._connect() as conn:
with conn.cursor() as cur:
cur.execute(sql, params)
conn.commit()
return True
except Exception as exc:
_log.warning("execute attempt %d/%d: %s", attempt + 1, self.MAX_RETRIES, exc)
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY)
_log.error("execute failed after %d attempts", self.MAX_RETRIES)
return False
def query(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
"""
Run SELECT with parameterised binding. Returns list[dict].
"""
for attempt in range(self.MAX_RETRIES):
try:
with self._connect() as conn:
with conn.cursor(as_dict=True) as cur:
cur.execute(sql, params)
return cur.fetchall() or []
except Exception as exc:
_log.warning("query attempt %d/%d: %s", attempt + 1, self.MAX_RETRIES, exc)
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY)
_log.error("query failed after %d attempts", self.MAX_RETRIES)
return []
def scalar(self, sql: str, params: tuple = ()) -> Any:
"""Return first column of first row, or None. Tuple cursor avoids unnamed-column issues."""
for attempt in range(self.MAX_RETRIES):
try:
with self._connect() as conn:
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
return row[0] if row else None
except Exception as exc:
_log.warning("scalar attempt %d/%d: %s", attempt + 1, self.MAX_RETRIES, exc)
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY)
return None
def execute_scalar(self, sql: str, params: tuple = ()) -> Any:
"""Alias for scalar() — v1.x compatibility."""
return self.scalar(sql, params)
def ping(self) -> bool:
"""Test connectivity. Returns True if reachable."""
try:
return self.scalar("SELECT 1") == 1
except Exception:
return False
@property
def backend(self) -> str:
return self._backend
# ── Concrete: Microsoft SQL Server via pymssql ────────────────────────────────
class MSSQLConnector(SQLConnector):
"""
SQL Server connector using pymssql (native TDS, no sqlcmd dependency).
One connection per call — pymssql is not thread-safe with shared connections.
"""
def _connect(self) -> Any:
cfg = self._cfg
return pymssql.connect(
server=cfg['server'],
port=cfg['port'],
user=cfg['user'],
password=cfg['password'],
database=cfg['database'],
timeout=30,
login_timeout=10,
tds_version='7.4',
)
# ── Factory ───────────────────────────────────────────────────────────────────
def get_connector(backend: str = 'cloud') -> SQLConnector:
"""
Factory: returns the appropriate SQLConnector for the given backend.
Add new database types here without changing callers.
"""
return MSSQLConnector(backend)
# ── Self-test ─────────────────────────────────────────────────────────────────
if __name__ == '__main__':
import sys
print("sql_connector v2.0 — self-test")
for profile in ['cloud', 'local']:
try:
db = get_connector(profile)
ok = db.ping()
print(f" {profile}: {'✅ connected' if ok else '⚠️ ping returned False'}")
except Exception as e:
print(f" {profile}: ❌ {e}")
sys.exit(0)