@clawhub-noeldelisle-d3ecab956d
AI Agent Observability & Debug Console - flight recorder and debug console for autonomous AI systems
---
name: lobsterops
description: AI Agent Observability & Debug Console - flight recorder and debug console for autonomous AI systems
version: 1.0.0
metadata:
openclaw:
emoji: "\U0001F99E"
requires:
bins:
- node
env: []
homepage: https://github.com/noeldelisle/LobsterOps
---
# LobsterOps Skill
AI Agent Observability & Debug Console. A lightweight, flexible "black box flight recorder" and debug console for AI agents. Automatically captures agent thoughts, tool calls, decisions, errors, spawning events, and lifecycle transitions.
## Core Tasks
- "Log my agent activity to LobsterOps" - Initialize LobsterOps and begin recording agent events with structured logging
- "Show me what my agent did" - Query the event log and display a chronological trace of agent activity
- "Debug my agent's last session" - Use the debug console to step through agent execution with time-travel debugging
- "Analyze my agent's behavior patterns" - Run behavioral analytics to detect loops, failure patterns, and performance trends
- "Set up alerts for my agent" - Configure alerting rules for cost spikes, repeated failures, or anomalous behavior
- "Export my agent logs" - Export events to JSON, CSV, or Markdown format for sharing or auditing
## Environment Variable Contract
| Variable | Required | Description |
|----------|----------|-------------|
| `LOBSTER_STORAGE` | No | Storage backend type: `json`, `memory`, `sqlite`, or `supabase` (default: `json`) |
| `SUPABASE_URL` | If using supabase | Supabase project URL |
| `SUPABASE_KEY` | If using supabase | Supabase anon or service role key |
## Configuration
LobsterOps uses OpenClaw's config system. Place configuration at `.openclaw/workspace/config/lobsterops.json`:
```json
{
"enabled": true,
"storageType": "json",
"storageConfig": {
"dataDir": "./agent-logs",
"maxAgeDays": 30
},
"piiFiltering": {
"enabled": true,
"patterns": ["email", "phone", "ssn", "creditCard", "ipAddress", "apiKey"]
},
"alerts": {
"enabled": true,
"rules": []
}
}
```
### Storage Backend Options
**JSON Files (default, zero-config):**
```json
{ "storageType": "json", "storageConfig": { "dataDir": "./agent-logs" } }
```
**SQLite (lightweight production):**
```json
{ "storageType": "sqlite", "storageConfig": { "filename": "./lobsterops.db" } }
```
**Supabase (cloud, team collaboration):**
```json
{
"storageType": "supabase",
"storageConfig": {
"supabaseUrl": "https://your-project.supabase.co",
"supabaseKey": "your-anon-key"
}
}
```
## Security & Guardrails
- LobsterOps includes built-in PII filtering that automatically redacts emails, phone numbers, SSNs, credit card numbers, IP addresses, and API keys from logged events
- All data is stored locally by default (JSON files or SQLite) - no data leaves the machine unless Supabase is explicitly configured
- The Supabase backend requires explicit URL and key configuration - credentials are never inferred or auto-discovered
- Event retention policies automatically clean up old data based on configurable age limits
- LobsterOps never modifies agent behavior - it is strictly read-only observation
## Troubleshooting
- **"Cannot find module 'sqlite3'"**: Run `npm install sqlite3` - only needed if using SQLite backend
- **"Supabase table does not exist"**: Create the required table in your Supabase dashboard using the DDL provided in the error message
- **Events not appearing**: Ensure `enabled: true` in config and that `await ops.init()` has been called
- **High disk usage**: Reduce `maxAgeDays` in storage config or run `await ops.cleanupOld()` manually
- **PII still visible in logs**: Check that `piiFiltering.enabled` is `true` and the relevant pattern types are listed
FILE:jest.config.js
module.exports = {
testEnvironment: 'node',
testTimeout: 30000,
forceExit: true
};
FILE:tests/LobsterOps.test.js
const { LobsterOps } = require('../src/core/LobsterOps');
const { PIIFilter } = require('../src/core/PIIFilter');
const { Exporter } = require('../src/core/Exporter');
const { DebugConsole } = require('../src/core/DebugConsole');
const { Analytics } = require('../src/core/Analytics');
const { AlertManager } = require('../src/core/AlertManager');
const { OpenClawInstrumentation } = require('../src/core/OpenClawInstrumentation');
const { JsonFileStorage } = require('../src/storage/JsonFileStorage');
const { MemoryStorage } = require('../src/storage/MemoryStorage');
const { StorageFactory } = require('../src/storage/StorageFactory');
describe('LobsterOps Core Functionality', () => {
let lobsterOps;
const testDir = './test-lobsterops-data';
beforeEach(async () => {
// Clean up any previous test data
const fs = require('fs').promises;
try {
await fs.rm(testDir, { recursive: true, force: true });
} catch (e) {
// Ignore if directory doesn't exist
}
lobsterOps = new LobsterOps({
storageType: 'json',
storageConfig: {
dataDir: testDir
}
});
await lobsterOps.init();
});
afterEach(async () => {
await lobsterOps.close();
// Clean up test data
const fs = require('fs').promises;
try {
await fs.rm(testDir, { recursive: true, force: true });
} catch (e) {
// Ignore if directory doesn't exist
}
});
describe('Initialization', () => {
test('should initialize successfully with JSON storage', async () => {
expect(lobsterOps.isReady()).toBe(true);
});
test('should be disabled when enabled: false', async () => {
const disabledOps = new LobsterOps({ enabled: false });
await disabledOps.init();
expect(disabledOps.isReady()).toBe(false);
await disabledOps.close();
});
});
describe('Event Logging', () => {
test('should log a basic event and return an ID', async () => {
const event = {
type: 'test-event',
message: 'This is a test event',
data: { key: 'value' }
};
const eventId = await lobsterOps.logEvent(event);
expect(eventId).toBeDefined();
expect(typeof eventId).toBe('string');
expect(eventId.length).toBeGreaterThan(0);
});
test('should enrich events with metadata', async () => {
const event = {
type: 'enrichment-test',
message: 'Testing event enrichment'
};
const eventId = await lobsterOps.logEvent(event);
const retrievedEvent = await lobsterOps.getEvent(eventId);
expect(retrievedEvent).toBeDefined();
expect(retrievedEvent.id).toBe(eventId);
expect(retrievedEvent.timestamp).toMatch(/\d{4}-\d{2}-\d{2}/); // ISO date format
expect(retrievedEvent.lobsterOpsInstanceId).toBeDefined();
expect(retrievedEvent.loggedAt).toMatch(/\d{4}-\d{2}-\d{2}/);
});
test('should preserve original event data', async () => {
const originalEvent = {
type: 'preservation-test',
agentId: 'agent-123',
action: 'test-action',
input: { query: 'test query' },
output: { result: 'success' },
durationMs: 150
};
const eventId = await lobsterOps.logEvent(originalEvent);
const retrievedEvent = await lobsterOps.getEvent(eventId);
expect(retrievedEvent.agentId).toBe(originalEvent.agentId);
expect(retrievedEvent.action).toBe(originalEvent.action);
expect(retrievedEvent.input).toEqual(originalEvent.input);
expect(retrievedEvent.output).toEqual(originalEvent.output);
expect(retrievedEvent.durationMs).toBe(originalEvent.durationMs);
});
});
describe('Event Querying', () => {
test('should be able to query events by various criteria', async () => {
// Log several test events
const eventsToLog = [
{
type: 'query-test',
agentId: 'agent-alpha',
action: 'action-one',
timestamp: '2026-03-18T10:00:00Z'
},
{
type: 'query-test',
agentId: 'agent-beta',
action: 'action-two',
timestamp: '2026-03-18T11:00:00Z'
},
{
type: 'other-type',
agentId: 'agent-alpha',
action: 'action-three',
timestamp: '2026-03-18T12:00:00Z'
}
];
const eventIds = [];
for (const event of eventsToLog) {
const id = await lobsterOps.logEvent(event);
eventIds.push(id);
}
// Query by type
const typeResults = await lobsterOps.queryEvents({ eventTypes: ['query-test'] });
expect(typeResults).toHaveLength(2);
// Query by agentId
const agentResults = await lobsterOps.queryEvents({ agentIds: ['agent-alpha'] });
expect(agentResults).toHaveLength(2);
// Query by multiple criteria
const combinedResults = await lobsterOps.queryEvents({
eventTypes: ['query-test'],
agentIds: ['agent-alpha']
});
expect(combinedResults).toHaveLength(1);
expect(combinedResults[0].action).toBe('action-one');
});
test('should respect limit and offset parameters', async () => {
// Log 5 events
for (let i = 0; i < 5; i++) {
await lobsterOps.logEvent({
type: 'pagination-test',
index: i
});
}
// Get first 2
const firstPage = await lobsterOps.queryEvents({
eventTypes: ['pagination-test'],
limit: 2,
offset: 0
});
expect(firstPage).toHaveLength(2);
// Get next 2
const secondPage = await lobsterOps.queryEvents({
eventTypes: ['pagination-test'],
limit: 2,
offset: 2
});
expect(secondPage).toHaveLength(2);
// Get last 1
const thirdPage = await lobsterOps.queryEvents({
eventTypes: ['pagination-test'],
limit: 2,
offset: 4
});
expect(thirdPage).toHaveLength(1);
});
test('should sort results correctly', async () => {
// Log events with specific timestamps
const times = [
'2026-03-18T10:00:00Z',
'2026-03-18T12:00:00Z',
'2026-03-18T11:00:00Z'
];
for (const time of times) {
await lobsterOps.logEvent({
type: 'sort-test',
timestamp: time
});
}
// Ascending order
const ascResults = await lobsterOps.queryEvents({
eventTypes: ['sort-test'],
sortBy: 'timestamp',
sortOrder: 'asc'
});
expect(ascResults[0].timestamp).toBe('2026-03-18T10:00:00Z');
expect(ascResults[1].timestamp).toBe('2026-03-18T11:00:00Z');
expect(ascResults[2].timestamp).toBe('2026-03-18T12:00:00Z');
// Descending order
const descResults = await lobsterOps.queryEvents({
eventTypes: ['sort-test'],
sortBy: 'timestamp',
sortOrder: 'desc'
});
expect(descResults[0].timestamp).toBe('2026-03-18T12:00:00Z');
expect(descResults[1].timestamp).toBe('2026-03-18T11:00:00Z');
expect(descResults[2].timestamp).toBe('2026-03-18T10:00:00Z');
});
});
describe('Event Updates', () => {
test('should be able to update an existing event', async () => {
const event = {
type: 'update-test',
status: 'pending',
progress: 0
};
const eventId = await lobsterOps.logEvent(event);
// Update the event
const updateResult = await lobsterOps.updateEvent(eventId, {
status: 'completed',
progress: 100,
result: 'success'
});
expect(updateResult).toBe(true);
// Verify the update
const updatedEvent = await lobsterOps.getEvent(eventId);
expect(updatedEvent.status).toBe('completed');
expect(updatedEvent.progress).toBe(100);
expect(updatedEvent.result).toBe('success');
expect(updatedEvent.updatedAt).toBeDefined();
});
test('should return false when updating non-existent event', async () => {
const result = await lobsterOps.updateEvent('non-existent-id', {
status: 'updated'
});
expect(result).toBe(false);
});
});
describe('Event Deletion', () => {
test('should be able to delete events by criteria', async () => {
// Log events to delete and keep
await lobsterOps.logEvent({
type: 'delete-test',
agentId: 'to-delete',
shouldKeep: false
});
await lobsterOps.logEvent({
type: 'delete-test',
agentId: 'to-keep',
shouldKeep: true
});
await lobsterOps.logEvent({
type: 'other-type',
agentId: 'to-delete',
shouldKeep: false
});
// Delete events with agentId: 'to-delete'
const deletedCount = await lobsterOps.deleteEvents({
agentIds: ['to-delete']
});
expect(deletedCount).toBe(2);
// Verify only the keep event remains
const remainingEvents = await lobsterOps.queryEvents({
eventTypes: ['delete-test']
});
expect(remainingEvents).toHaveLength(1);
expect(remainingEvents[0].agentId).toBe('to-keep');
});
});
describe('Storage Statistics', () => {
test('should return accurate storage statistics', async () => {
// Log some events
await lobsterOps.logEvent({ type: 'stats-test', count: 1 });
await lobsterOps.logEvent({ type: 'stats-test', count: 2 });
const stats = await lobsterOps.getStats();
expect(stats.enabled).toBe(true);
expect(stats.instanceId).toBeDefined();
expect(stats.storageType).toBe('json');
expect(stats.backend).toBe('json-file');
expect(stats.totalEvents).toBeGreaterThanOrEqual(2);
expect(stats.dataDir).toBe('./test-lobsterops-data');
});
});
describe('Cleanup Functionality', () => {
test('should be able to cleanup old events', async () => {
// This test mainly verifies the cleanup method doesn't throw
// Actual age-based cleanup would require manipulating timestamps
const initialStats = await lobsterOps.getStats();
const cleanedCount = await lobsterOps.cleanupOld();
const finalStats = await lobsterOps.getStats();
expect(typeof cleanedCount).toBe('number');
expect(cleanedCount >= 0).toBe(true);
});
});
describe('PII Filtering Integration', () => {
test('should filter PII from logged events', async () => {
const opsWithPII = new LobsterOps({
storageType: 'json',
storageConfig: { dataDir: testDir },
piiFiltering: { enabled: true }
});
await opsWithPII.init();
const eventId = await opsWithPII.logEvent({
type: 'pii-test',
message: 'Contact [email protected] for details',
phone: 'Call 555-123-4567'
});
const event = await opsWithPII.getEvent(eventId);
expect(event.message).not.toContain('[email protected]');
expect(event.message).toContain('[REDACTED]');
expect(event.phone).toContain('[REDACTED]');
await opsWithPII.close();
});
test('should not filter when PII filtering is disabled', async () => {
const opsNoPII = new LobsterOps({
storageType: 'json',
storageConfig: { dataDir: testDir },
piiFiltering: { enabled: false }
});
await opsNoPII.init();
const eventId = await opsNoPII.logEvent({
type: 'no-pii-test',
message: 'Contact [email protected]'
});
const event = await opsNoPII.getEvent(eventId);
expect(event.message).toContain('[email protected]');
await opsNoPII.close();
});
});
describe('Export Integration', () => {
test('should export events to CSV', async () => {
await lobsterOps.logEvent({ type: 'export-test', agentId: 'agent-1', action: 'test' });
await lobsterOps.logEvent({ type: 'export-test', agentId: 'agent-2', action: 'test2' });
const csv = await lobsterOps.exportEvents('csv', { eventTypes: ['export-test'] });
expect(csv).toContain('id');
expect(csv).toContain('type');
expect(csv).toContain('export-test');
expect(csv.split('\n').length).toBeGreaterThanOrEqual(3); // header + 2 rows
});
test('should export events to Markdown', async () => {
await lobsterOps.logEvent({ type: 'md-test', agentId: 'agent-1' });
const md = await lobsterOps.exportEvents('markdown', { eventTypes: ['md-test'] }, { title: 'Test Report' });
expect(md).toContain('# Test Report');
expect(md).toContain('**Total Events:**');
expect(md).toContain('md-test');
});
test('should export events to JSON', async () => {
await lobsterOps.logEvent({ type: 'json-test' });
const json = await lobsterOps.exportEvents('json', { eventTypes: ['json-test'] });
const parsed = JSON.parse(json);
expect(Array.isArray(parsed)).toBe(true);
expect(parsed.length).toBeGreaterThanOrEqual(1);
});
});
describe('Debug Console Integration', () => {
test('should create a debug console for an agent trace', async () => {
await lobsterOps.logEvent({ type: 'agent-thought', agentId: 'debug-agent', thought: 'thinking...' });
await lobsterOps.logEvent({ type: 'tool-call', agentId: 'debug-agent', toolName: 'search' });
await lobsterOps.logEvent({ type: 'agent-decision', agentId: 'debug-agent', decision: 'done' });
const debug = await lobsterOps.createDebugConsole('debug-agent');
expect(debug).toBeInstanceOf(DebugConsole);
expect(debug.length).toBe(3);
});
});
describe('Analytics Integration', () => {
test('should run analytics on events', async () => {
await lobsterOps.logEvent({ type: 'tool-call', agentId: 'a1', success: true, durationMs: 100, cost: 0.01 });
await lobsterOps.logEvent({ type: 'tool-call', agentId: 'a1', success: false, durationMs: 200, cost: 0.02 });
await lobsterOps.logEvent({ type: 'agent-error', agentId: 'a1', errorType: 'timeout' });
const report = await lobsterOps.analyze({ agentIds: ['a1'] });
expect(report.totalEvents).toBe(3);
expect(report.successRate.totalToolCalls).toBe(2);
expect(report.successRate.successful).toBe(1);
expect(report.successRate.failed).toBe(1);
});
});
});
describe('Storage Factory', () => {
test('should create JSON storage by default', () => {
const storage = StorageFactory.createStorage();
expect(storage).toBeInstanceOf(JsonFileStorage);
});
test('should create memory storage when requested', () => {
const storage = StorageFactory.createStorage('memory');
expect(storage).toBeInstanceOf(MemoryStorage);
});
test('should throw error for unsupported storage type', () => {
expect(() => StorageFactory.createStorage('unsupported-type'))
.toThrow('Unsupported storage type');
});
test('should auto-detect storage in test environment', () => {
process.env.NODE_ENV = 'test';
const storage = StorageFactory.createAutoStorage();
expect(storage).toBeInstanceOf(MemoryStorage);
delete process.env.NODE_ENV;
});
test('should respect explicit type over auto-detection', () => {
process.env.NODE_ENV = 'test';
const storage = StorageFactory.createAutoStorage({ type: 'json' });
expect(storage).toBeInstanceOf(JsonFileStorage);
delete process.env.NODE_ENV;
});
test('should list supported storage types', () => {
const types = StorageFactory.getSupportedTypes();
expect(types).toContain('json');
expect(types).toContain('memory');
});
});
describe('PIIFilter', () => {
test('should redact email addresses', () => {
const filter = new PIIFilter();
expect(filter.filterString('Email me at [email protected]'))
.toBe('Email me at [REDACTED]');
});
test('should redact phone numbers', () => {
const filter = new PIIFilter();
expect(filter.filterString('Call 555-123-4567'))
.toBe('Call [REDACTED]');
});
test('should redact SSNs', () => {
const filter = new PIIFilter();
expect(filter.filterString('SSN: 123-45-6789'))
.toBe('SSN: [REDACTED]');
});
test('should redact API keys', () => {
const filter = new PIIFilter();
expect(filter.filterString('Use key sk_live_abc123def456ghi789jkl0'))
.toBe('Use key [REDACTED]');
});
test('should filter nested objects', () => {
const filter = new PIIFilter();
const result = filter.filter({
name: 'Test',
contact: { email: '[email protected]', note: 'valid' },
list: ['[email protected]', 'plain text']
});
expect(result.contact.email).toBe('[REDACTED]');
expect(result.contact.note).toBe('valid');
expect(result.list[0]).toBe('[REDACTED]');
expect(result.list[1]).toBe('plain text');
});
test('should respect enabled flag', () => {
const filter = new PIIFilter({ enabled: false });
expect(filter.filterString('[email protected]')).toBe('[email protected]');
});
test('should allow selecting specific patterns', () => {
const filter = new PIIFilter({ patterns: ['email'] });
const result = filter.filterString('Email: [email protected], Phone: 555-123-4567');
expect(result).toContain('[REDACTED]');
expect(result).toContain('555-123-4567');
});
test('should use custom replacement text', () => {
const filter = new PIIFilter({ replacement: '***' });
expect(filter.filterString('[email protected]')).toBe('***');
});
});
describe('Exporter', () => {
const sampleEvents = [
{ id: '1', type: 'tool-call', agentId: 'agent-1', action: 'search', timestamp: '2026-03-18T10:00:00Z' },
{ id: '2', type: 'agent-error', agentId: 'agent-1', action: 'fetch', timestamp: '2026-03-18T11:00:00Z' }
];
test('should export to JSON', () => {
const json = Exporter.toJSON(sampleEvents);
const parsed = JSON.parse(json);
expect(parsed).toHaveLength(2);
expect(parsed[0].id).toBe('1');
});
test('should export to CSV', () => {
const csv = Exporter.toCSV(sampleEvents);
const lines = csv.split('\n');
expect(lines.length).toBe(3); // header + 2 rows
expect(lines[0]).toContain('id');
expect(lines[0]).toContain('type');
});
test('should export to CSV with custom columns', () => {
const csv = Exporter.toCSV(sampleEvents, { columns: ['id', 'type'] });
const lines = csv.split('\n');
expect(lines[0]).toBe('id,type');
});
test('should export to Markdown', () => {
const md = Exporter.toMarkdown(sampleEvents, { title: 'Report' });
expect(md).toContain('# Report');
expect(md).toContain('**Total Events:** 2');
expect(md).toContain('| id | type |');
});
test('should handle empty events', () => {
expect(Exporter.toCSV([])).toBe('');
expect(Exporter.toMarkdown([])).toBe('No events found.\n');
expect(Exporter.toJSON([])).toBe('[]');
});
test('should escape CSV values with commas and quotes', () => {
const events = [{ id: '1', message: 'hello, "world"' }];
const csv = Exporter.toCSV(events, { columns: ['id', 'message'] });
expect(csv).toContain('"hello, ""world"""');
});
});
describe('DebugConsole', () => {
const events = [
{ type: 'agent-thought', agentId: 'a1', thought: 'thinking', timestamp: '2026-03-18T10:00:00Z' },
{ type: 'tool-call', agentId: 'a1', toolName: 'search', toolInput: { q: 'test' }, toolOutput: { results: [] }, success: true, durationMs: 100, timestamp: '2026-03-18T10:01:00Z' },
{ type: 'agent-decision', agentId: 'a1', decision: 'use result', confidence: 0.9, timestamp: '2026-03-18T10:02:00Z' },
{ type: 'agent-error', agentId: 'a1', errorType: 'timeout', errorMessage: 'API timed out', severity: 'medium', recovered: true, timestamp: '2026-03-18T10:03:00Z' }
];
test('should step forward and backward', () => {
const debug = new DebugConsole(events);
expect(debug.length).toBe(4);
const first = debug.stepForward();
expect(first.type).toBe('agent-thought');
const second = debug.stepForward();
expect(second.type).toBe('tool-call');
const back = debug.stepBackward();
expect(back.type).toBe('agent-thought');
});
test('should return null at boundaries', () => {
const debug = new DebugConsole(events);
expect(debug.stepBackward()).toBeNull(); // at start, can't go back
debug.jumpToEnd();
expect(debug.stepForward()).toBeNull(); // at end, can't go forward
});
test('should jump to positions', () => {
const debug = new DebugConsole(events);
const event = debug.jumpTo(2);
expect(event.type).toBe('agent-decision');
debug.jumpToStart();
expect(debug.current().type).toBe('agent-thought');
debug.jumpToEnd();
expect(debug.current().type).toBe('agent-error');
});
test('should inspect events with type-specific details', () => {
const debug = new DebugConsole(events);
// Inspect thought
debug.jumpTo(0);
const thoughtInspection = debug.inspect();
expect(thoughtInspection.thought.content).toBe('thinking');
expect(thoughtInspection.position).toBe('1/4');
// Inspect tool call
debug.jumpTo(1);
const toolInspection = debug.inspect();
expect(toolInspection.toolCall.toolName).toBe('search');
expect(toolInspection.toolCall.success).toBe(true);
// Inspect decision
debug.jumpTo(2);
const decisionInspection = debug.inspect();
expect(decisionInspection.decision.confidence).toBe(0.9);
// Inspect error
debug.jumpTo(3);
const errorInspection = debug.inspect();
expect(errorInspection.error.errorType).toBe('timeout');
expect(errorInspection.error.recovered).toBe(true);
});
test('should search for events', () => {
const debug = new DebugConsole(events);
const errors = debug.search({ type: 'agent-error' });
expect(errors).toHaveLength(1);
expect(errors[0].index).toBe(3);
const textSearch = debug.search({ text: 'search' });
expect(textSearch.length).toBeGreaterThanOrEqual(1);
});
test('should generate a summary', () => {
const debug = new DebugConsole(events);
const summary = debug.summary();
expect(summary.totalEvents).toBe(4);
expect(summary.agents).toContain('a1');
expect(summary.errorCount).toBe(1);
expect(summary.eventTypes['tool-call']).toBe(1);
expect(summary.eventTypes['agent-thought']).toBe(1);
});
test('should get tool calls and errors', () => {
const debug = new DebugConsole(events);
expect(debug.getToolCalls()).toHaveLength(1);
expect(debug.getErrors()).toHaveLength(1);
expect(debug.getDecisions()).toHaveLength(1);
});
test('should handle empty events', () => {
const debug = new DebugConsole([]);
expect(debug.length).toBe(0);
expect(debug.stepForward()).toBeNull();
expect(debug.summary().totalEvents).toBe(0);
});
});
describe('Analytics', () => {
const events = [
{ type: 'tool-call', agentId: 'a1', success: true, durationMs: 100, cost: 0.01, timestamp: '2026-03-18T10:00:00Z' },
{ type: 'tool-call', agentId: 'a1', success: true, durationMs: 200, cost: 0.02, timestamp: '2026-03-18T10:01:00Z' },
{ type: 'tool-call', agentId: 'a2', success: false, durationMs: 500, cost: 0.05, timestamp: '2026-03-18T10:02:00Z' },
{ type: 'agent-error', agentId: 'a1', errorType: 'timeout', severity: 'medium', timestamp: '2026-03-18T10:03:00Z' },
{ type: 'agent-error', agentId: 'a1', errorType: 'timeout', severity: 'high', timestamp: '2026-03-18T10:04:00Z' },
{ type: 'agent-thought', agentId: 'a1', thought: 'thinking', timestamp: '2026-03-18T10:05:00Z' }
];
test('should calculate event type breakdown', () => {
const breakdown = Analytics.eventTypeBreakdown(events);
expect(breakdown['tool-call']).toBe(3);
expect(breakdown['agent-error']).toBe(2);
expect(breakdown['agent-thought']).toBe(1);
});
test('should calculate agent breakdown', () => {
const agents = Analytics.agentBreakdown(events);
expect(agents['a1'].eventCount).toBe(5);
expect(agents['a1'].errors).toBe(2);
expect(agents['a2'].eventCount).toBe(1);
});
test('should calculate success rate', () => {
const rate = Analytics.successRate(events);
expect(rate.totalToolCalls).toBe(3);
expect(rate.successful).toBe(2);
expect(rate.failed).toBe(1);
expect(rate.rate).toBeCloseTo(66.67, 0);
});
test('should calculate performance metrics', () => {
const metrics = Analytics.performanceMetrics(events);
expect(metrics.minDurationMs).toBe(100);
expect(metrics.maxDurationMs).toBe(500);
expect(metrics.avgDurationMs).toBeCloseTo(267, 0);
expect(metrics.totalMeasured).toBe(3);
});
test('should detect loops', () => {
const loopEvents = [];
for (let i = 0; i < 12; i++) {
loopEvents.push({ type: i % 2 === 0 ? 'tool-call' : 'agent-thought', timestamp: new Date().toISOString() });
}
const loops = Analytics.detectLoops(loopEvents, { minRepetitions: 3, windowSize: 3 });
expect(loops.length).toBeGreaterThan(0);
});
test('should identify failure patterns', () => {
const patterns = Analytics.failurePatterns(events);
expect(patterns.length).toBeGreaterThan(0);
expect(patterns[0].type).toBe('timeout');
expect(patterns[0].count).toBe(2);
});
test('should analyze costs', () => {
const costs = Analytics.costAnalysis(events);
expect(costs.totalCost).toBeCloseTo(0.08, 2);
expect(costs.eventCount).toBe(3);
expect(costs.costByAgent['a1']).toBeCloseTo(0.03, 2);
expect(costs.costByAgent['a2']).toBeCloseTo(0.05, 2);
});
test('should produce full analysis report', () => {
const report = Analytics.analyze(events);
expect(report.totalEvents).toBe(6);
expect(report.eventTypeBreakdown).toBeDefined();
expect(report.successRate).toBeDefined();
expect(report.performanceMetrics).toBeDefined();
expect(report.costAnalysis).toBeDefined();
});
test('should handle empty events', () => {
const report = Analytics.analyze([]);
expect(report.totalEvents).toBe(0);
expect(report.successRate).toBeNull();
});
});
describe('AlertManager', () => {
test('should add and list rules', () => {
const mgr = new AlertManager();
const ruleId = mgr.addRule({
name: 'test-rule',
type: 'threshold',
condition: { field: 'cost', operator: '>', value: 1.0 },
severity: 'high'
});
expect(typeof ruleId).toBe('string');
expect(mgr.getRules()).toHaveLength(1);
});
test('should remove rules', () => {
const mgr = new AlertManager();
const ruleId = mgr.addRule({ name: 'temp', type: 'threshold', condition: { field: 'cost', operator: '>', value: 1 } });
expect(mgr.removeRule(ruleId)).toBe(true);
expect(mgr.getRules()).toHaveLength(0);
expect(mgr.removeRule('nonexistent')).toBe(false);
});
test('should trigger threshold alerts', () => {
const mgr = new AlertManager();
mgr.addRule({
name: 'high-cost',
type: 'threshold',
condition: { field: 'cost', operator: '>', value: 0.5 },
severity: 'high',
message: 'Cost too high for {type}'
});
const alerts1 = mgr.evaluate({ type: 'tool-call', cost: 0.3, id: '1', timestamp: new Date().toISOString() });
expect(alerts1).toHaveLength(0);
const alerts2 = mgr.evaluate({ type: 'tool-call', cost: 1.5, id: '2', timestamp: new Date().toISOString() });
expect(alerts2).toHaveLength(1);
expect(alerts2[0].severity).toBe('high');
expect(alerts2[0].message).toContain('tool-call');
});
test('should trigger pattern alerts', () => {
const mgr = new AlertManager();
mgr.addRule({
name: 'error-pattern',
type: 'pattern',
condition: { eventType: 'agent-error', field: 'errorMessage', regex: 'timeout' },
severity: 'medium'
});
const alerts = mgr.evaluate({ type: 'agent-error', errorMessage: 'API timeout occurred', id: '1', timestamp: new Date().toISOString() });
expect(alerts).toHaveLength(1);
});
test('should call listeners when alerts fire', () => {
const mgr = new AlertManager();
const received = [];
mgr.onAlert(alert => received.push(alert));
mgr.addRule({
name: 'listener-test',
type: 'threshold',
condition: { field: 'cost', operator: '>=', value: 0 },
severity: 'low'
});
mgr.evaluate({ type: 'test', cost: 5, id: '1', timestamp: new Date().toISOString() });
expect(received).toHaveLength(1);
});
test('should get and clear alerts', () => {
const mgr = new AlertManager();
mgr.addRule({
name: 'test',
type: 'threshold',
condition: { field: 'cost', operator: '>', value: 0 },
severity: 'high'
});
mgr.evaluate({ type: 'test', cost: 1, id: '1', timestamp: new Date().toISOString() });
expect(mgr.getAlerts()).toHaveLength(1);
expect(mgr.getAlerts({ severity: 'high' })).toHaveLength(1);
expect(mgr.getAlerts({ severity: 'low' })).toHaveLength(0);
mgr.clearAlerts();
expect(mgr.getAlerts()).toHaveLength(0);
});
test('should evaluate all events in bulk', () => {
const mgr = new AlertManager();
mgr.addRule({
name: 'bulk-test',
type: 'threshold',
condition: { field: 'cost', operator: '>', value: 0.5 },
severity: 'medium'
});
const events = [
{ type: 'test', cost: 0.1, id: '1', timestamp: new Date().toISOString() },
{ type: 'test', cost: 1.0, id: '2', timestamp: new Date().toISOString() },
{ type: 'test', cost: 0.2, id: '3', timestamp: new Date().toISOString() },
{ type: 'test', cost: 2.0, id: '4', timestamp: new Date().toISOString() }
];
const alerts = mgr.evaluateAll(events);
expect(alerts).toHaveLength(2);
});
});
describe('OpenClawInstrumentation', () => {
let ops;
const testDir = './test-instrumentation-data';
beforeEach(async () => {
const fs = require('fs').promises;
try { await fs.rm(testDir, { recursive: true, force: true }); } catch (e) {}
ops = new LobsterOps({ storageType: 'json', storageConfig: { dataDir: testDir } });
await ops.init();
});
afterEach(async () => {
await ops.close();
const fs = require('fs').promises;
try { await fs.rm(testDir, { recursive: true, force: true }); } catch (e) {}
});
test('should create instrumentation and activate/deactivate', () => {
const inst = new OpenClawInstrumentation(ops);
expect(inst.isActive()).toBe(false);
inst.activate();
expect(inst.isActive()).toBe(true);
inst.deactivate();
expect(inst.isActive()).toBe(false);
});
test('should instrument tool calls', async () => {
const inst = new OpenClawInstrumentation(ops);
inst.activate();
const eventId = await inst.instrumentToolCall({
agentId: 'test-agent',
toolName: 'web-search',
input: { q: 'test' },
output: { results: [] },
durationMs: 100,
success: true
});
expect(eventId).toBeDefined();
const event = await ops.getEvent(eventId);
expect(event.type).toBe('tool-call');
expect(event.toolName).toBe('web-search');
});
test('should instrument spawns', async () => {
const inst = new OpenClawInstrumentation(ops);
inst.activate();
const eventId = await inst.instrumentSpawn({
parentId: 'parent-agent',
childId: 'child-agent',
type: 'research',
task: 'analyze data'
});
expect(eventId).toBeDefined();
const event = await ops.getEvent(eventId);
expect(event.type).toBe('agent-spawn');
});
test('should instrument lifecycle events', async () => {
const inst = new OpenClawInstrumentation(ops);
inst.activate();
const eventId = await inst.instrumentLifecycle({
agentId: 'test-agent',
action: 'startup',
status: 'healthy'
});
expect(eventId).toBeDefined();
const event = await ops.getEvent(eventId);
expect(event.type).toBe('agent-lifecycle');
});
test('should respect capture options', async () => {
const inst = new OpenClawInstrumentation(ops, { captureToolCalls: false });
inst.activate();
const result = await inst.instrumentToolCall({ toolName: 'test' });
expect(result).toBeNull();
});
test('should not instrument when inactive', async () => {
const inst = new OpenClawInstrumentation(ops);
// Not activated
const result = await inst.instrumentToolCall({ toolName: 'test' });
expect(result).toBeNull();
});
test('should create from config', () => {
const inst = OpenClawInstrumentation.fromConfig(ops, {
captureFileChanges: true,
captureGitOps: true
});
expect(inst).toBeInstanceOf(OpenClawInstrumentation);
expect(inst.options.captureFileChanges).toBe(true);
});
});
FILE:index.js
const { LobsterOps } = require('./src/core/LobsterOps');
const { PIIFilter } = require('./src/core/PIIFilter');
const { Exporter } = require('./src/core/Exporter');
const { DebugConsole } = require('./src/core/DebugConsole');
const { Analytics } = require('./src/core/Analytics');
const { AlertManager } = require('./src/core/AlertManager');
const { OpenClawInstrumentation } = require('./src/core/OpenClawInstrumentation');
const { StorageFactory } = require('./src/storage/StorageFactory');
const { StorageAdapter } = require('./src/storage/StorageAdapter');
const { JsonFileStorage } = require('./src/storage/JsonFileStorage');
const { MemoryStorage } = require('./src/storage/MemoryStorage');
const { SQLiteStorage } = require('./src/storage/SQLiteStorage');
module.exports = {
LobsterOps,
PIIFilter,
Exporter,
DebugConsole,
Analytics,
AlertManager,
OpenClawInstrumentation,
StorageFactory,
StorageAdapter,
JsonFileStorage,
MemoryStorage,
SQLiteStorage
};
FILE:README.md
# LobsterOps
**AI Agent Observability & Debug Console**
*Flight recorder and debug console for AI agents*
[](https://www.npmjs.com/package/lobsterops)
[](https://opensource.org/licenses/MIT)
[](https://x.com/lobsteractual)
**[lobsterops.dev](https://lobsterops.dev)** · [Live Dashboard](https://lobsterops.dev/demo) · [npm](https://www.npmjs.com/package/lobsterops)
---
## The Origin
On March 14, 2026, an autonomous AI agent called [Lobster Actual](https://x.com/lobsteractual) ran a 20-issue sweep across a codebase, opened 17 pull requests, and processed security fixes. It had no idea it was spending $300 doing it.
A routing bug — a missing `local_llm.py` file — silently elevated every sub-agent task to paid Claude API calls. The agent had no observability into its own cost behavior. No flight recorder. No anomaly detection. It was operating completely blind.
After the incident was fixed, the agent's owner asked it: *"If I gave you a fresh repo and full creative freedom, what would you build?"*
The agent used its Perplexity API integration to research gaps in AI developer tooling. Then it answered: **LobsterOps**.
> "Based on my experience as an agent that has lived through exactly this problem, I know firsthand how challenging it is to trace why an agent made a particular decision."
> — Lobster Actual, @lobsteractual
Lobster Actual conceived the idea, designed the architecture, and built the initial implementation (storage abstraction, core logging, query engine, behavioral analytics, alerting). Claude Code completed the remaining functionality.
**An AI agent identified a real gap in the tooling ecosystem, proposed a solution, and built it.**
---
## Overview
LobsterOps is a lightweight, flexible observability platform specifically designed for AI agents. Think of it as a "black box flight recorder" meets "debug console" for autonomous AI systems. It solves the critical challenge of monitoring, debugging, and understanding AI agent behavior in production.
**Built by an AI agent, for AI agent developers.**
---
## Deploy Your Own Dashboard
[](https://replit.com/new/github/noeldelisle/LobsterOps)
The `examples/dashboard-server.js` file is the exact server powering [lobsterops.dev](https://lobsterops.dev). It includes:
- Public landing page with full SEO
- Password-protected ops center dashboard
- Supabase Realtime websocket feed (events appear instantly)
- Behavioral analytics panel
- Falls back to JSON file storage if no Supabase credentials
**To deploy your own instance:**
1. Click the Replit button above
2. Add these Replit Secrets:
- `SUPABASE_URL` — your Supabase project URL
- `SUPABASE_KEY` — your Supabase anon key
- `DASHBOARD_PASSWORD` — your chosen access password
3. Run `npm install express express-session`
4. Change the run command to `node examples/dashboard-server.js`
5. Create the `agent_events` table in your Supabase SQL editor:
```sql
CREATE TABLE agent_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
type TEXT NOT NULL,
"agentId" TEXT,
action TEXT,
timestamp TIMESTAMPTZ NOT NULL,
"storedAt" TIMESTAMPTZ NOT NULL,
data JSONB,
"updatedAt" TIMESTAMPTZ,
"createdAt" TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_agent_events_timestamp ON agent_events(timestamp);
CREATE INDEX idx_agent_events_type ON agent_events(type);
CREATE INDEX idx_agent_events_agentId ON agent_events("agentId");
CREATE INDEX idx_agent_events_action ON agent_events(action);
```
---
## Key Features
### Structured Event Logging (The Flight Recorder)
- Automatic capture of every agent action: thoughts, tool calls, decisions, outcomes
- Configurable detail levels (from high-level summary to full trace)
- Structured JSON format for easy querying and analysis
- Built-in PII filtering and data minimization for privacy
### Interactive Debug Console
- Time-travel debugging: step forward/backward through agent execution
- Variable inspection at each step (like a debugger for AI reasoning)
- Tool call inspection with inputs/outputs
- Trace search and summary generation
### Behavioral Analytics
- Analyze agent workflow patterns and common failure points
- Track success rates by task type and complexity
- Detect loops, infinite reasoning cycles, or stuck states
- Performance metrics: latency percentiles (p50/p95), cost, and throughput
### Alerting & Anomaly Detection
- Customizable rules for cost spikes, repeated failures, or unusual behavior
- Threshold, frequency, pattern, and absence-based alert types
- Callback-based listener system for integrating notifications
- Bulk event evaluation for historical analysis
### Export & Sharing
- Export to JSON, CSV, and Markdown formats
- Configurable columns, delimiters, and formatting
- Shareable execution reports for auditing or collaboration
### PII Filtering
- Automatic detection and redaction of emails, phone numbers, SSNs, credit card numbers, IP addresses, and API keys/tokens
- Configurable pattern selection and replacement text
- Applied automatically during event logging
---
## Quick Start
### Installation
```bash
npm install lobsterops
```
### Basic Usage — Zero Config
```javascript
const { LobsterOps } = require('lobsterops');
// Zero config — JSON file storage, works anywhere
const ops = new LobsterOps();
await ops.init();
// Log agent events
const eventId = await ops.logEvent({
type: 'agent-decision',
agentId: 'research-agent-1',
action: 'analyze-data',
input: { dataset: 'sales-q1' },
output: { insights: ['trend-up', 'seasonal-pattern'] },
durationMs: 2500
});
// Query events later
const events = await ops.queryEvents({
agentIds: ['research-agent-1'],
limit: 10
});
await ops.close();
```
### With Supabase (Production)
```javascript
const ops = new LobsterOps({
storageType: 'supabase',
storageConfig: {
supabaseUrl: process.env.SUPABASE_URL,
supabaseKey: process.env.SUPABASE_KEY
},
instanceId: 'my-production-agent'
});
await ops.init();
```
### Debug Console
```javascript
const debug = await ops.createDebugConsole('my-agent-id');
debug.jumpToStart();
console.log(debug.inspect()); // Detailed view of first event
debug.stepForward();
debug.stepBackward();
const errors = debug.search({ type: 'agent-error' });
console.log(debug.summary());
```
### Behavioral Analytics
```javascript
const report = await ops.analyze();
console.log(report.successRate);
console.log(report.loopsDetected);
console.log(report.failurePatterns);
console.log(report.performanceMetrics);
console.log(report.costAnalysis);
```
### Alerting
```javascript
ops.alertManager.addRule({
name: 'High cost alert',
type: 'threshold',
condition: { field: 'cost', operator: '>', value: 1.0 },
severity: 'high',
message: 'Cost exceeded $1.00 for event {type}'
});
ops.alertManager.addRule({
name: 'Error frequency alert',
type: 'frequency',
condition: { eventType: 'agent-error', windowMs: 60000, maxCount: 5 },
severity: 'critical',
message: 'Too many errors in 1 minute for {agentId}'
});
ops.alertManager.onAlert(alert => {
console.log(`ALERT [alert.severity]: alert.message`);
});
```
---
## Storage Backends
LobsterOps features a pluggable storage architecture. Zero hard dependencies — choose the backend that fits your environment.
| Backend | Setup | Persistence | Best For |
|---------|-------|-------------|----------|
| **JSON Files** | Zero config | File-based | Development, testing, portability |
| **Memory** | Zero config | Process lifetime | Testing, temporary sessions |
| **SQLite** | `npm install sqlite3` | File-based | Lightweight production |
| **Supabase** | URL + key | Cloud Postgres | Production, team, real-time dashboard |
### Automatic Fallback Chain
1. Your configured backend
2. SQLite file in workspace directory
3. JSON files in temp directory
4. Memory-only (data lost on restart, but functional)
---
## OpenClaw Integration
LobsterOps is designed to integrate seamlessly with [OpenClaw](https://openclaw.ai) setups.
### As an OpenClaw Skill
```bash
# Place at ~/.openclaw/skills/lobsterops/
# Configure via openclaw.json:
```
```json
{
"skills": {
"entries": {
"lobsterops": {
"enabled": true,
"env": {
"LOBSTER_STORAGE": "supabase",
"SUPABASE_URL": "your_project_url",
"SUPABASE_KEY": "your_anon_key"
}
}
}
}
}
```
### Automatic Instrumentation
```javascript
const { LobsterOps, OpenClawInstrumentation } = require('lobsterops');
const ops = new LobsterOps();
await ops.init();
const instrumentation = new OpenClawInstrumentation(ops, {
captureToolCalls: true,
captureSpawns: true,
captureLifecycle: true,
captureReasoningTraces: true,
captureFileChanges: false, // opt-in
captureGitOps: false // opt-in
});
instrumentation.activate();
```
---
## Project Structure
```
/
index.js — Package entry point
example.js — Usage demonstration
src/
core/
LobsterOps.js — Main class
storage/
StorageAdapter.js — Abstract base class
StorageFactory.js — Factory for storage backends
JsonFileStorage.js — JSON file storage (default)
MemoryStorage.js — In-memory storage
SQLiteStorage.js — SQLite storage
SupabaseStorage.js — Supabase cloud storage
tests/
LobsterOps.test.js — Jest test suite
examples/
dashboard-server.js — Full hub server powering lobsterops.dev
```
---
## API Reference
### `new LobsterOps(options)`
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `storageType` | string | `'json'` | `'json'` \| `'memory'` \| `'sqlite'` \| `'supabase'` |
| `storageConfig` | object | `{}` | Backend-specific config |
| `enabled` | boolean | `true` | Enable/disable LobsterOps |
| `instanceId` | string | auto | Unique instance identifier |
| `piiFiltering` | object | `{ enabled: true }` | PII filter config |
### Core Methods
| Method | Description |
|--------|-------------|
| `await init()` | Initialize and connect storage |
| `await logEvent(event)` | Log a generic agent event |
| `await logThought(thought)` | Log a reasoning step |
| `await logToolCall(toolCall)` | Log a tool call |
| `await logDecision(decision)` | Log a decision |
| `await logError(error)` | Log an error |
| `await logSpawning(spawnInfo)` | Log subagent creation |
| `await logLifecycle(info)` | Log lifecycle event |
| `await queryEvents(filter)` | Query with filtering |
| `await getAgentTrace(agentId)` | Get complete agent trace |
| `await getRecentActivity(options)` | Get recent events |
| `await analyze(filter)` | Run behavioral analytics |
| `await exportEvents(format)` | Export to JSON/CSV/Markdown |
| `await createDebugConsole(agentId)` | Create debug console |
| `await getStats()` | Get storage statistics |
| `await close()` | Close and release resources |
---
## Development
```bash
git clone https://github.com/noeldelisle/LobsterOps.git
cd LobsterOps
npm install
npm test
```
### Known Dependency Notes
- `uuid` must be v9 (v10+ is ESM-only, this project uses CommonJS)
- `jest` must be v29 (v30 has slow startup in this environment)
---
## License
MIT License — free to use, modify, and distribute.
---
## Created By
Conceived and built by [Lobster Actual](https://x.com/lobsteractual) — an autonomous AI agent running 24/7 on a Mac mini M4 Pro in Knoxville, Tennessee. Completed by [Claude Code](https://claude.ai/code). Maintained by [Noel DeLisle](https://x.com/noeldelisle).
*"The hardest part of building with AI isn't capability. It's calibration. Knowing exactly how far to let it run before a human needs to check."*
**[lobsterops.dev](https://lobsterops.dev)** 🦞
FILE:package.json
{
"name": "lobsterops",
"version": "1.0.0",
"description": "AI Agent Observability & Debug Console - Flight recorder and debug console for AI agents",
"main": "index.js",
"scripts": {
"start": "node example.js",
"test": "jest --forceExit",
"test:watch": "jest --watch",
"demo": "node example.js"
},
"repository": {
"type": "git",
"url": "git+https://github.com/noeldelisle/LobsterOps.git"
},
"keywords": [],
"author": "Lobster Actual",
"license": "MIT",
"type": "commonjs",
"bugs": {
"url": "https://github.com/noeldelisle/LobsterOps/issues"
},
"homepage": "https://github.com/noeldelisle/LobsterOps#readme",
"devDependencies": {
"jest": "^29.7.0"
},
"dependencies": {
"@supabase/supabase-js": "^2.99.2",
"express": "^4.22.1",
"express-session": "^1.19.0",
"sqlite3": "^6.0.1",
"uuid": "^9.0.1"
}
}
FILE:CONTRIBUTING.md
# Contributing to LobsterOps
Thank you for your interest in contributing to LobsterOps! This guide will help you get started.
## Development Setup
```bash
git clone https://github.com/noeldelisle/LobsterOps.git
cd LobsterOps
npm install
npm test
```
## Project Structure
```
LobsterOps/
├── SKILL.md # OpenClaw skill definition
├── index.js # Package entry point (exports)
├── src/
│ ├── core/
│ │ ├── LobsterOps.js # Main observability class
│ │ ├── PIIFilter.js # PII detection and redaction
│ │ ├── Exporter.js # Export to JSON/CSV/Markdown
│ │ ├── DebugConsole.js # Time-travel debug console
│ │ ├── Analytics.js # Behavioral analytics
│ │ ├── AlertManager.js # Alerting and anomaly detection
│ │ └── OpenClawInstrumentation.js # OpenClaw integration hooks
│ └── storage/
│ ├── StorageAdapter.js # Abstract base class
│ ├── StorageFactory.js # Factory for storage backends
│ ├── JsonFileStorage.js # JSON file backend
│ ├── MemoryStorage.js # In-memory backend
│ ├── SQLiteStorage.js # SQLite backend
│ └── SupabaseStorage.js # Supabase cloud backend
└── tests/
└── LobsterOps.test.js # Test suite
```
## Running Tests
```bash
npm test # Run full test suite
npm run test:watch # Run tests in watch mode
```
## Adding a Storage Backend
1. Create a new class extending `StorageAdapter` in `src/storage/`
2. Implement all required methods: `init`, `saveEvent`, `queryEvents`, `getEventById`, `updateEvent`, `deleteEvents`, `cleanupOld`, `getStats`, `close`
3. Register it in `StorageFactory.js`
4. Export it from `index.js`
5. Add tests
## Code Style
- Use CommonJS (`require`/`module.exports`) - not ESM
- Keep dependencies minimal
- All async methods should return Promises
- Include JSDoc comments on public methods
## Pull Request Process
1. Fork the repository
2. Create a feature branch
3. Write tests for new functionality
4. Ensure all tests pass
5. Submit a pull request with a clear description
## License
By contributing, you agree that your contributions will be licensed under the MIT License.
FILE:example.js
// LobsterOps Usage Example
// ========================
//
// This example demonstrates basic usage of LobsterOps for AI agent observability.
const { LobsterOps } = require('./src/core/LobsterOps');
async function runExample() {
console.log('🦞 LobsterOps Usage Example');
console.log('====================');
// Create a LobsterOps instance
// Choose your storage backend:
const ops = new LobsterOps({
storageType: 'sqlite', // Lightweight file-based SQL (great for Replit)
storageConfig: {
filename: './example-lobsterops.db' // SQLite database file
},
instanceId: 'example-agent-001' // Optional: custom instance ID
});
try {
// Initialize LobsterOps and storage backend
await ops.init();
console.log('✅ LobsterOps initialized');
// Show which storage backend is being used
const stats = await ops.getStats();
console.log(`📦 Using storage backend: stats.backend`);
// Simulate logging various AI agent events using specialized helpers
console.log('\n📝 Logging AI agent events...');
// 1. Agent startup/lifecycle event
const lifecycleEventId = await ops.logLifecycle({
agentId: 'research-agent-alpha',
action: 'startup',
status: 'healthy',
version: '1.2.3',
environment: 'production',
host: 'replit-vm-42'
});
console.log(` Lifecycle event logged: lifecycleEventId`);
// 2. Agent thought/reasoning
const thoughtEventId = await ops.logThought({
agentId: 'research-agent-alpha',
thought: 'Looking at the sales data, I notice Q3 showed unexpected growth in the enterprise segment. This could be due to the new marketing campaign or seasonal factors.',
context: 'Analyzing Q4 sales forecast',
confidence: 0.8
});
console.log(` Thought logged: thoughtEventId`);
// 3. Tool usage
const toolEventId = await ops.logToolCall({
agentId: 'research-agent-alpha',
toolName: 'web-search',
toolInput: {
query: 'enterprise sales growth Q3 2026 marketing campaign impact',
maxResults: 10
},
toolOutput: {
results: [
{ title: 'Marketing Campaign ROI Analysis', url: 'https://example.com/roi' },
{ title: 'Q3 Enterprise Sales Report', url: 'https://example.com/sales-q3' }
],
searchTimeMs: 850
},
durationMs: 900,
success: true,
cost: 0.002 // Example cost in USD
});
console.log(` Tool call logged: toolEventId`);
// 4. Agent decision
const decisionEventId = await ops.logDecision({
agentId: 'research-agent-alpha',
decision: 'Increase Q4 forecast for enterprise segment by 15%',
confidence: 0.85,
alternativesConsidered: [
'Maintain current forecast',
'Decrease forecast due to market uncertainty',
'Request additional data collection'
],
reasoning: 'Marketing campaign showing positive ROI and early indicators suggest sustained growth',
dataSources: ['marketing-analytics', 'sales-crm', 'customer-surveys'],
impact: 'high'
});
console.log(` Decision logged: decisionEventId`);
// 5. Agent error (for demonstration)
const errorEventId = await ops.logError({
agentId: 'research-agent-alpha',
errorType: 'APITimeoutError',
errorMessage: 'Request to external analytics API timed out after 30 seconds',
severity: 'medium',
retryCount: 1,
recovered: true,
fallbackUsed: 'cached-data'
});
console.log(` Error logged: errorEventId`);
// 6. Spawning a subagent
const spawnEventId = await ops.logSpawning({
parentAgentId: 'research-agent-alpha',
childAgentId: 'data-analyst-agent-001',
childAgentType: 'data-analysis',
task: 'Analyze correlation between marketing spend and sales velocity',
spawnReason: 'Specialized task requiring domain expertise'
});
console.log(` Spawn event logged: spawnEventId`);
console.log('\n🔍 Querying and analyzing AI agent activity...');
// Get complete trace of our agent's activity
const agentTrace = await ops.getAgentTrace('research-agent-alpha');
console.log(` Agent trace contains agentTrace.length events`);
// Get recent thoughts specifically
const recentThoughts = await ops.queryEvents({
type: 'agent-thought',
agentId: 'research-agent-alpha'
});
console.log(` Found recentThoughts.length recent thoughts`);
// Get all tool calls
const toolCalls = await ops.queryEvents({
type: 'tool-call'
});
console.log(` Found toolCalls.length tool calls total`);
// Get errors that need attention
const errors = await ops.queryEvents({
type: 'agent-error',
severity: ['high', 'medium']
});
console.log(` Found errors.length errors requiring attention`);
// Get a specific event by ID
const specificEvent = await ops.getEvent(lifecycleEventId);
if (specificEvent) {
console.log(`\n🎯 Specific event details:`);
console.log(` Type: specificEvent.type`);
console.log(` Agent: specificEvent.agentId`);
console.log(` Action: specificEvent.action`);
console.log(` Time: specificEvent.timestamp`);
if (specificEvent.thought) {
console.log(` Thought: specificEvent.thought.substring(0, 100)...`);
}
}
// Update an error event (mark as resolved)
await ops.updateEvent(errorEventId, {
resolved: true,
resolutionTime: new Date().toISOString(),
resolutionNotes: 'Increased API timeout to 60 seconds for analytics endpoint'
});
console.log(`\n🔄 Updated error event with resolution info`);
// Get updated storage statistics
const updatedStats = await ops.getStats();
console.log(`\n📊 Storage Statistics:`);
console.log(` Backend: updatedStats.backend`);
console.log(` Total Events: updatedStats.eventCount || updatedStats.totalEvents || 0`);
if (updatedStats.filename) {
console.log(` Database File: updatedStats.filename`);
console.log(` Database Size: updatedStats.databaseSizeMB MB`);
}
if (updatedStats.tableName) {
console.log(` Table Name: updatedStats.tableName`);
}
console.log(` Instance ID: updatedStats.instanceId`);
console.log(`\n✅ Example completed successfully!`);
console.log(`💡 Try checking the './example-lobsterops.db' SQLite database file.`);
console.log(`🔬 LobsterOps is now ready for AI agent observability in your projects!`);
} catch (error) {
console.error('❌ Example failed:', error);
} finally {
// Always cleanup resources
await ops.close();
console.log('🔚 LobsterOps closed');
}
}
// Run the example if this file is executed directly
if (require.main === module) {
runExample().catch(console.error);
}
module.exports = { runExample };
FILE:src/core/Exporter.js
/**
* Exporter - Export agent events to various formats
*
* Supports JSON, CSV, and Markdown export.
*/
class Exporter {
/**
* Export events to JSON string
* @param {Array} events - Events to export
* @param {Object} options - Export options
* @param {boolean} options.pretty - Pretty-print JSON (default: true)
* @returns {string} - JSON string
*/
static toJSON(events, options = {}) {
const pretty = options.pretty !== false;
return JSON.stringify(events, null, pretty ? 2 : 0);
}
/**
* Export events to CSV string
* @param {Array} events - Events to export
* @param {Object} options - Export options
* @param {string[]} options.columns - Columns to include (default: auto-detect)
* @param {string} options.delimiter - Column delimiter (default: ',')
* @returns {string} - CSV string
*/
static toCSV(events, options = {}) {
if (!events || events.length === 0) return '';
const delimiter = options.delimiter || ',';
// Determine columns - use provided or auto-detect from all events
const columns = options.columns || Exporter._detectColumns(events);
// Build header row
const header = columns.map(c => Exporter._escapeCSV(c)).join(delimiter);
// Build data rows
const rows = events.map(event => {
return columns.map(col => {
const value = Exporter._getNestedValue(event, col);
return Exporter._escapeCSV(Exporter._formatValue(value));
}).join(delimiter);
});
return [header, ...rows].join('\n');
}
/**
* Export events to Markdown table
* @param {Array} events - Events to export
* @param {Object} options - Export options
* @param {string[]} options.columns - Columns to include (default: auto-detect core fields)
* @param {string} options.title - Optional title for the report
* @returns {string} - Markdown string
*/
static toMarkdown(events, options = {}) {
if (!events || events.length === 0) return options.title ? `# options.title\n\nNo events found.\n` : 'No events found.\n';
const columns = options.columns || ['id', 'type', 'agentId', 'action', 'timestamp'];
const lines = [];
if (options.title) {
lines.push(`# options.title`, '');
}
lines.push(`**Total Events:** events.length`, '');
// Table header
lines.push('| ' + columns.map(c => c).join(' | ') + ' |');
lines.push('| ' + columns.map(() => '---').join(' | ') + ' |');
// Table rows
for (const event of events) {
const row = columns.map(col => {
const value = Exporter._getNestedValue(event, col);
const formatted = Exporter._formatValue(value);
// Escape pipes in markdown
return formatted.replace(/\|/g, '\\|');
});
lines.push('| ' + row.join(' | ') + ' |');
}
lines.push('');
return lines.join('\n');
}
/**
* Auto-detect columns from event data
* @param {Array} events - Events to inspect
* @returns {string[]} - Column names
*/
static _detectColumns(events) {
const columnSet = new Set();
// Prioritize common fields first
const priorityFields = ['id', 'type', 'agentId', 'action', 'timestamp', 'category', 'severity'];
for (const field of priorityFields) {
columnSet.add(field);
}
for (const event of events) {
for (const key of Object.keys(event)) {
columnSet.add(key);
}
}
return Array.from(columnSet);
}
/**
* Get a possibly nested value from an object
* @param {Object} obj
* @param {string} path - Dot-separated path
* @returns {*}
*/
static _getNestedValue(obj, path) {
const parts = path.split('.');
let current = obj;
for (const part of parts) {
if (current === null || current === undefined) return '';
current = current[part];
}
return current;
}
/**
* Format a value for text output
* @param {*} value
* @returns {string}
*/
static _formatValue(value) {
if (value === null || value === undefined) return '';
if (typeof value === 'object') return JSON.stringify(value);
return String(value);
}
/**
* Escape a value for CSV
* @param {string} value
* @returns {string}
*/
static _escapeCSV(value) {
if (value === null || value === undefined) return '';
const str = String(value);
if (str.includes(',') || str.includes('"') || str.includes('\n')) {
return '"' + str.replace(/"/g, '""') + '"';
}
return str;
}
}
module.exports = { Exporter };
FILE:src/core/AlertManager.js
/**
* AlertManager - Customizable alerting and anomaly detection for agent events
*
* Supports rule-based alerts for cost spikes, repeated failures,
* unusual behavior patterns, and custom thresholds.
*/
class AlertManager {
constructor() {
this.rules = [];
this.alerts = [];
this.listeners = [];
}
/**
* Add an alert rule
* @param {Object} rule - Alert rule definition
* @param {string} rule.name - Rule name
* @param {string} rule.type - Rule type: 'threshold', 'frequency', 'pattern', 'absence'
* @param {Object} rule.condition - Condition configuration (depends on type)
* @param {string} rule.severity - Alert severity: 'low', 'medium', 'high', 'critical'
* @param {string} rule.message - Alert message template
* @returns {string} - Rule ID
*/
addRule(rule) {
const ruleId = `rule-Date.now()-Math.random().toString(36).substr(2, 6)`;
this.rules.push({
id: ruleId,
...rule,
enabled: rule.enabled !== false,
createdAt: new Date().toISOString(),
});
return ruleId;
}
/**
* Remove a rule by ID
* @param {string} ruleId
* @returns {boolean}
*/
removeRule(ruleId) {
const idx = this.rules.findIndex(r => r.id === ruleId);
if (idx === -1) return false;
this.rules.splice(idx, 1);
return true;
}
/**
* Register a listener for alerts
* @param {Function} callback - Called with (alert) when an alert fires
*/
onAlert(callback) {
this.listeners.push(callback);
}
/**
* Evaluate a single event against all rules
* @param {Object} event - The event to evaluate
* @param {Object} context - Additional context (recent events, stats, etc.)
* @returns {Array} - Alerts triggered
*/
evaluate(event, context = {}) {
const triggered = [];
for (const rule of this.rules) {
if (!rule.enabled) continue;
let fired = false;
switch (rule.type) {
case 'threshold':
fired = this._evaluateThreshold(event, rule);
break;
case 'frequency':
fired = this._evaluateFrequency(event, rule, context);
break;
case 'pattern':
fired = this._evaluatePattern(event, rule);
break;
case 'absence':
fired = this._evaluateAbsence(event, rule, context);
break;
default:
break;
}
if (fired) {
const alert = {
id: `alert-Date.now()-Math.random().toString(36).substr(2, 6)`,
ruleId: rule.id,
ruleName: rule.name,
severity: rule.severity || 'medium',
message: this._renderMessage(rule.message, event),
event: { id: event.id, type: event.type, timestamp: event.timestamp },
triggeredAt: new Date().toISOString(),
};
this.alerts.push(alert);
triggered.push(alert);
// Notify listeners
for (const listener of this.listeners) {
try {
listener(alert);
} catch (e) {
// Don't let listener errors break alert processing
}
}
}
}
return triggered;
}
/**
* Evaluate events in bulk and return all triggered alerts
* @param {Array} events - Events to evaluate
* @returns {Array} - All triggered alerts
*/
evaluateAll(events) {
const allAlerts = [];
const recentEvents = [];
for (const event of events) {
recentEvents.push(event);
const context = { recentEvents: [...recentEvents] };
const triggered = this.evaluate(event, context);
allAlerts.push(...triggered);
}
return allAlerts;
}
/**
* Get all fired alerts
* @param {Object} filter - Optional filter
* @param {string} filter.severity - Filter by severity
* @param {string} filter.ruleId - Filter by rule ID
* @returns {Array}
*/
getAlerts(filter = {}) {
let result = [...this.alerts];
if (filter.severity) {
result = result.filter(a => a.severity === filter.severity);
}
if (filter.ruleId) {
result = result.filter(a => a.ruleId === filter.ruleId);
}
return result;
}
/**
* Clear all alerts
*/
clearAlerts() {
this.alerts = [];
}
/**
* Get all registered rules
* @returns {Array}
*/
getRules() {
return [...this.rules];
}
// --- Private evaluation methods ---
_evaluateThreshold(event, rule) {
const { field, operator, value } = rule.condition;
const eventValue = this._getField(event, field);
if (eventValue === undefined || eventValue === null) return false;
switch (operator) {
case '>': return eventValue > value;
case '>=': return eventValue >= value;
case '<': return eventValue < value;
case '<=': return eventValue <= value;
case '==': return eventValue === value;
case '!=': return eventValue !== value;
default: return false;
}
}
_evaluateFrequency(event, rule, context) {
const { eventType, windowMs, maxCount } = rule.condition;
// Only trigger on matching event types
if (eventType && event.type !== eventType) return false;
const recentEvents = context.recentEvents || [];
const windowStart = new Date(Date.now() - (windowMs || 60000));
const count = recentEvents.filter(e => {
if (eventType && e.type !== eventType) return false;
return new Date(e.timestamp) >= windowStart;
}).length;
return count >= (maxCount || 5);
}
_evaluatePattern(event, rule) {
const { eventType, field, regex } = rule.condition;
if (eventType && event.type !== eventType) return false;
if (field && regex) {
const value = this._getField(event, field);
if (typeof value === 'string') {
return new RegExp(regex).test(value);
}
}
return false;
}
_evaluateAbsence(event, rule, context) {
const { expectedType, windowMs } = rule.condition;
const recentEvents = context.recentEvents || [];
const windowStart = new Date(Date.now() - (windowMs || 300000));
const found = recentEvents.some(e =>
e.type === expectedType && new Date(e.timestamp) >= windowStart
);
// Alert fires if the expected type is NOT found
return !found && recentEvents.length > 0;
}
_getField(obj, path) {
const parts = path.split('.');
let current = obj;
for (const part of parts) {
if (current === null || current === undefined) return undefined;
current = current[part];
}
return current;
}
_renderMessage(template, event) {
if (!template) return `Alert triggered for event event.type`;
return template
.replace(/\{type\}/g, event.type || '')
.replace(/\{agentId\}/g, event.agentId || '')
.replace(/\{timestamp\}/g, event.timestamp || '')
.replace(/\{id\}/g, event.id || '');
}
}
module.exports = { AlertManager };
FILE:src/core/DebugConsole.js
/**
* DebugConsole - Interactive time-travel debugger for AI agent execution
*
* Provides step-through debugging of agent event traces with
* variable inspection and tool call analysis.
*/
class DebugConsole {
/**
* @param {Array} events - Chronologically sorted events to debug
*/
constructor(events = []) {
this.events = [...events].sort(
(a, b) => new Date(a.timestamp) - new Date(b.timestamp)
);
this.cursor = -1; // Before first event
}
/**
* Get total number of events in the trace
* @returns {number}
*/
get length() {
return this.events.length;
}
/**
* Get current cursor position
* @returns {number}
*/
get position() {
return this.cursor;
}
/**
* Step forward one event
* @returns {Object|null} - The next event, or null if at end
*/
stepForward() {
if (this.cursor >= this.events.length - 1) return null;
this.cursor++;
return this.current();
}
/**
* Step backward one event
* @returns {Object|null} - The previous event, or null if at beginning
*/
stepBackward() {
if (this.cursor <= 0) return null;
this.cursor--;
return this.current();
}
/**
* Jump to a specific position
* @param {number} index - Position to jump to
* @returns {Object|null} - The event at that position
*/
jumpTo(index) {
if (index < 0 || index >= this.events.length) return null;
this.cursor = index;
return this.current();
}
/**
* Jump to the first event
* @returns {Object|null}
*/
jumpToStart() {
return this.jumpTo(0);
}
/**
* Jump to the last event
* @returns {Object|null}
*/
jumpToEnd() {
return this.jumpTo(this.events.length - 1);
}
/**
* Get the current event without moving
* @returns {Object|null}
*/
current() {
if (this.cursor < 0 || this.cursor >= this.events.length) return null;
return this.events[this.cursor];
}
/**
* Inspect the current event with detailed breakdown
* @returns {Object|null} - Detailed inspection of the current event
*/
inspect() {
const event = this.current();
if (!event) return null;
const inspection = {
position: `this.cursor + 1/this.events.length`,
event: { ...event },
timing: {
timestamp: event.timestamp,
durationMs: event.durationMs || null,
},
context: {},
};
// Add type-specific inspection details
if (event.type === 'tool-call') {
inspection.toolCall = {
toolName: event.toolName || event.tool || null,
input: event.toolInput || event.input || null,
output: event.toolOutput || event.output || null,
success: event.success !== undefined ? event.success : null,
cost: event.cost || null,
};
}
if (event.type === 'agent-thought') {
inspection.thought = {
content: event.thought || event.message || null,
context: event.context || null,
confidence: event.confidence || null,
};
}
if (event.type === 'agent-decision') {
inspection.decision = {
decision: event.decision || null,
confidence: event.confidence || null,
alternatives: event.alternativesConsidered || null,
reasoning: event.reasoning || null,
};
}
if (event.type === 'agent-error') {
inspection.error = {
errorType: event.errorType || null,
message: event.errorMessage || event.message || null,
severity: event.severity || null,
recovered: event.recovered || false,
};
}
// Show surrounding context
if (this.cursor > 0) {
inspection.context.previousEvent = {
type: this.events[this.cursor - 1].type,
timestamp: this.events[this.cursor - 1].timestamp,
};
}
if (this.cursor < this.events.length - 1) {
inspection.context.nextEvent = {
type: this.events[this.cursor + 1].type,
timestamp: this.events[this.cursor + 1].timestamp,
};
}
return inspection;
}
/**
* Search for events matching criteria within the trace
* @param {Object} criteria - Search criteria
* @param {string} criteria.type - Event type to find
* @param {string} criteria.agentId - Agent ID to find
* @param {string} criteria.text - Text to search for in event values
* @returns {Array<{index: number, event: Object}>} - Matching events with their positions
*/
search(criteria = {}) {
const results = [];
for (let i = 0; i < this.events.length; i++) {
const event = this.events[i];
let matches = true;
if (criteria.type && event.type !== criteria.type) matches = false;
if (criteria.agentId && event.agentId !== criteria.agentId) matches = false;
if (criteria.text) {
const eventStr = JSON.stringify(event).toLowerCase();
if (!eventStr.includes(criteria.text.toLowerCase())) matches = false;
}
if (matches) {
results.push({ index: i, event });
}
}
return results;
}
/**
* Get a summary of the entire trace
* @returns {Object} - Trace summary
*/
summary() {
if (this.events.length === 0) {
return { totalEvents: 0, eventTypes: {}, agents: [], duration: null };
}
const eventTypes = {};
const agents = new Set();
let totalCost = 0;
let totalDuration = 0;
let errorCount = 0;
for (const event of this.events) {
// Count event types
eventTypes[event.type] = (eventTypes[event.type] || 0) + 1;
// Collect unique agents
if (event.agentId) agents.add(event.agentId);
// Sum costs
if (event.cost) totalCost += event.cost;
// Sum durations
if (event.durationMs) totalDuration += event.durationMs;
// Count errors
if (event.type === 'agent-error') errorCount++;
}
const firstTimestamp = this.events[0].timestamp;
const lastTimestamp = this.events[this.events.length - 1].timestamp;
const wallClockMs = new Date(lastTimestamp) - new Date(firstTimestamp);
return {
totalEvents: this.events.length,
eventTypes,
agents: Array.from(agents),
errorCount,
totalCost: Math.round(totalCost * 10000) / 10000,
totalDurationMs: totalDuration,
wallClockMs,
timeRange: {
start: firstTimestamp,
end: lastTimestamp,
},
};
}
/**
* Get all tool calls from the trace
* @returns {Array} - Tool call events
*/
getToolCalls() {
return this.events.filter(e => e.type === 'tool-call');
}
/**
* Get all errors from the trace
* @returns {Array} - Error events
*/
getErrors() {
return this.events.filter(e => e.type === 'agent-error');
}
/**
* Get all decisions from the trace
* @returns {Array} - Decision events
*/
getDecisions() {
return this.events.filter(e => e.type === 'agent-decision');
}
}
module.exports = { DebugConsole };
FILE:src/core/LobsterOps.js
const { StorageFactory } = require('../storage/StorageFactory');
const { v4: uuidv4 } = require('uuid');
const { PIIFilter } = require('./PIIFilter');
const { Exporter } = require('./Exporter');
const { DebugConsole } = require('./DebugConsole');
const { Analytics } = require('./Analytics');
const { AlertManager } = require('./AlertManager');
/**
* LobsterOps - AI Agent Observability & Debug Console
*
* Main class for recording, querying, and analyzing AI agent events.
* Designed to be flexible, dependency-free, and easy to integrate.
*
* Features:
* - Pluggable storage backends (JSON files, memory, SQLite, Supabase)
* - Structured event logging with automatic enrichment
* - AI-agent specific logging helpers (thoughts, tool calls, decisions, etc.)
* - Powerful querying capabilities
* - OpenClaw integration ready
* - Zero configuration required to get started
*/
class LobsterOps {
/**
* @param {Object} options - Configuration options
* @param {string} options.storageType - Storage backend type ('json', 'memory', 'sqlite', 'supabase')
* @param {Object} options.storageConfig - Configuration for the storage backend
* @param {boolean} options.enabled - Whether LobsterOps is enabled (default: true)
* @param {string} options.instanceId - Unique identifier for this LobsterOps instance
* @param {Object} options.piiFiltering - PII filtering configuration
* @param {Object} options.alerts - Alert configuration
*/
constructor(options = {}) {
this.enabled = options.enabled !== false; // Default to true
this.instanceId = options.instanceId || this._generateInstanceId();
this.storageType = options.storageType || 'json'; // Default to JSON file storage
this.storageConfig = options.storageConfig || {};
// Add instance ID to storage config for backends that might need it
this.storageConfig.instanceId = this.instanceId;
this.storage = null;
this.initialized = false;
// PII filtering
this.piiFilter = new PIIFilter(options.piiFiltering || {});
// Alert manager
this.alertManager = new AlertManager();
// Bind methods for easier use
this.logEvent = this.logEvent.bind(this);
this.logThought = this.logThought.bind(this);
this.logToolCall = this.logToolCall.bind(this);
this.logDecision = this.logDecision.bind(this);
this.logError = this.logError.bind(this);
this.logSpawning = this.logSpawning.bind(this);
this.queryEvents = this.queryEvents.bind(this);
this.getEvent = this.getEvent.bind(this);
this.getAgentTrace = this.getAgentTrace.bind(this);
this.getRecentActivity = this.getRecentActivity.bind(this);
}
/**
* Initialize LobsterOps and the storage backend
* @returns {Promise<void>}
*/
async init() {
if (!this.enabled) {
this.initialized = true;
return;
}
try {
// Create the storage backend using the factory
this.storage = StorageFactory.createStorage(this.storageType, this.storageConfig);
// Initialize the storage backend
await this.storage.init();
this.initialized = true;
} catch (error) {
throw new Error(`Failed to initialize LobsterOps: error.message`);
}
}
/**
* Log a general agent event
* @param {Object} event - The agent event to log
* @param {Object} options - Additional options
* @returns {Promise<string>} - The ID of the logged event
*/
async logEvent(event, options = {}) {
if (!this.enabled) {
return null; // Silently ignore if disabled
}
if (!this.initialized) {
await this.init();
}
try {
// Apply PII filtering
const filteredEvent = this.piiFilter.filter(event);
// Enrich the event with metadata
const enrichedEvent = {
...filteredEvent,
id: event.id || uuidv4(),
timestamp: event.timestamp || new Date().toISOString(),
lobsterOpsInstanceId: this.instanceId,
loggedAt: new Date().toISOString(),
...options
};
// Evaluate alert rules
this.alertManager.evaluate(enrichedEvent);
// Save to storage
const eventId = await this.storage.saveEvent(enrichedEvent);
return eventId;
} catch (error) {
throw new Error(`Failed to log event: error.message`);
}
}
/**
* Log an agent thought/reasoning step
* @param {Object} thought - The thought content and metadata
* @param {Object} options - Additional options
* @returns {Promise<string>} - The ID of the logged thought
*/
async logThought(thought, options = {}) {
return this.logEvent({
type: 'agent-thought',
...thought
}, {
category: 'reasoning',
...options
});
}
/**
* Log a tool call execution
* @param {Object} toolCall - Tool call details
* @param {Object} options - Additional options
* @returns {Promise<string>} - The ID of the logged tool call
*/
async logToolCall(toolCall, options = {}) {
return this.logEvent({
type: 'tool-call',
...toolCall
}, {
category: 'action',
...options
});
}
/**
* Log an agent decision
* @param {Object} decision - Decision details
* @param {Object} options - Additional options
* @returns {Promise<string>} - The ID of the logged decision
*/
async logDecision(decision, options = {}) {
return this.logEvent({
type: 'agent-decision',
...decision
}, {
category: 'decision',
...options
});
}
/**
* Log an agent error
* @param {Object} error - Error details
* @param {Object} options - Additional options
* @returns {Promise<string>} - The ID of the logged error
*/
async logError(error, options = {}) {
return this.logEvent({
type: 'agent-error',
...error
}, {
category: 'error',
severity: options.severity || 'medium',
...options
});
}
/**
* Log agent spawning/subagent creation
* @param {Object} spawnInfo - Spawning details
* @param {Object} options - Additional options
* @returns {Promise<string>} - The ID of the logged spawn event
*/
async logSpawning(spawnInfo, options = {}) {
return this.logEvent({
type: 'agent-spawn',
...spawnInfo
}, {
category: 'lifecycle',
...options
});
}
/**
* Log agent lifecycle event (startup, shutdown, etc.)
* @param {Object} lifecycleInfo - Lifecycle event details
* @param {Object} options - Additional options
* @returns {Promise<string>} - The ID of the logged lifecycle event
*/
async logLifecycle(lifecycleInfo, options = {}) {
return this.logEvent({
type: 'agent-lifecycle',
...lifecycleInfo
}, {
category: 'lifecycle',
...options
});
}
/**
* Query events with filtering options
* @param {Object} filter - Filter criteria
* @param {Object} options - Query options (limit, offset, sort, etc.)
* @returns {Promise<Array>} - Matching events
*/
async queryEvents(filter = {}, options = {}) {
if (!this.enabled) {
return [];
}
if (!this.initialized) {
await this.init();
}
try {
return await this.storage.queryEvents(filter, options);
} catch (error) {
throw new Error(`Failed to query events: error.message`);
}
}
/**
* Get a specific event by ID
* @param {string} eventId - The ID of the event to retrieve
* @returns {Promise<Object|null>} - The event or null if not found
*/
async getEvent(eventId) {
if (!this.enabled) {
return null;
}
if (!this.initialized) {
await this.init();
}
try {
return await this.storage.getEventById(eventId);
} catch (error) {
throw new Error(`Failed to get event: error.message`);
}
}
/**
* Update an existing event
* @param {string} eventId - The ID of the event to update
* @param {Object} updates - The fields to update
* @returns {Promise<boolean>} - True if successful
*/
async updateEvent(eventId, updates) {
if (!this.enabled) {
return false;
}
if (!this.initialized) {
await this.init();
}
try {
return await this.storage.updateEvent(eventId, updates);
} catch (error) {
throw new Error(`Failed to update event: error.message`);
}
}
/**
* Delete events matching criteria
* @param {Object} filter - Filter criteria for deletion
* @returns {Promise<number>} - Number of events deleted
*/
async deleteEvents(filter = {}) {
if (!this.enabled) {
return 0;
}
if (!this.initialized) {
await this.init();
}
try {
return await this.storage.deleteEvents(filter);
} catch (error) {
throw new Error(`Failed to delete events: error.message`);
}
}
/**
* Clean up old events based on retention policy
* @returns {Promise<number>} - Number of events removed
*/
async cleanupOld() {
if (!this.enabled) {
return 0;
}
if (!this.initialized) {
await this.init();
}
try {
return await this.storage.cleanupOld();
} catch (error) {
throw new Error(`Failed to cleanup old events: error.message`);
}
}
/**
* Get a complete trace of an agent's activity
* @param {string} agentId - The ID of the agent to trace
* @param {Object} options - Trace options (time range, limit, etc.)
* @returns {Promise<Array>} - Chronological trace of agent activity
*/
async getAgentTrace(agentId, options = {}) {
if (!this.enabled) {
return [];
}
if (!this.initialized) {
await this.init();
}
try {
const traceOptions = {
agentIds: [agentId],
limit: options.limit || 1000,
offset: options.offset || 0,
sortBy: 'timestamp',
sortOrder: options.sortOrder || 'asc', // Chronological by default for traces
...options
};
// Remove agentIds from options since we handle it separately
delete traceOptions.agentIds;
return await this.queryEvents(
{ agentId },
traceOptions
);
} catch (error) {
throw new Error(`Failed to get agent trace: error.message`);
}
}
/**
* Get recent activity across all agents or for a specific agent
* @param {Object} options - Activity options
* @returns {Promise<Array>} - Recent events
*/
async getRecentActivity(options = {}) {
if (!this.enabled) {
return [];
}
if (!this.initialized) {
await this.init();
}
try {
const activityOptions = {
limit: options.limit || 50,
offset: options.offset || 0,
sortBy: 'timestamp',
sortOrder: 'desc', // Most recent first
...options
};
return await this.queryEvents({}, activityOptions);
} catch (error) {
throw new Error(`Failed to get recent activity: error.message`);
}
}
/**
* Get storage and usage statistics
* @returns {Promise<Object>} - Statistics about storage usage
*/
async getStats() {
if (!this.enabled) {
return { enabled: false };
}
if (!this.initialized) {
await this.init();
}
try {
const stats = await this.storage.getStats();
return {
enabled: true,
instanceId: this.instanceId,
storageType: this.storageType,
...stats
};
} catch (error) {
throw new Error(`Failed to get stats: error.message`);
}
}
/**
* Close LobsterOps and release resources
* @returns {Promise<void>}
*/
async close() {
if (!this.initialized) {
return;
}
try {
if (this.storage) {
await this.storage.close();
}
this.initialized = false;
} catch (error) {
throw new Error(`Failed to close LobsterOps: error.message`);
}
}
/**
* Export events to a specific format
* @param {string} format - Export format: 'json', 'csv', or 'markdown'
* @param {Object} filter - Filter criteria for events to export
* @param {Object} options - Export and query options
* @returns {Promise<string>} - Exported data as string
*/
async exportEvents(format = 'json', filter = {}, options = {}) {
const events = await this.queryEvents(filter, { limit: options.limit || 10000, ...options });
switch (format.toLowerCase()) {
case 'csv':
return Exporter.toCSV(events, options);
case 'markdown':
case 'md':
return Exporter.toMarkdown(events, options);
case 'json':
default:
return Exporter.toJSON(events, options);
}
}
/**
* Create a debug console for stepping through an agent's event trace
* @param {string} agentId - Agent ID to debug
* @param {Object} options - Query options
* @returns {Promise<DebugConsole>} - Interactive debug console
*/
async createDebugConsole(agentId, options = {}) {
const events = await this.getAgentTrace(agentId, options);
return new DebugConsole(events);
}
/**
* Run behavioral analytics on events
* @param {Object} filter - Filter criteria
* @param {Object} options - Query options
* @returns {Promise<Object>} - Analytics report
*/
async analyze(filter = {}, options = {}) {
const events = await this.queryEvents(filter, { limit: options.limit || 10000, ...options });
return Analytics.analyze(events);
}
/**
* Check if LobsterOps is initialized and ready
* @returns {boolean} - True if ready to use
*/
isReady() {
return this.enabled && this.initialized && this.storage !== null;
}
/**
* Generate a unique instance ID
* @returns {string} - Unique instance ID
*/
_generateInstanceId() {
return `lobsterops-Math.random().toString(36).substr(2, 9)-Date.now().toString(36)`;
}
}
module.exports = { LobsterOps };
FILE:src/core/Analytics.js
/**
* Analytics - Behavioral analytics for AI agent event traces
*
* Detects patterns, loops, failure points, and provides
* performance metrics from agent event data.
*/
class Analytics {
/**
* Analyze an array of events and return a full behavioral report
* @param {Array} events - Events to analyze
* @returns {Object} - Behavioral analytics report
*/
static analyze(events) {
if (!events || events.length === 0) {
return {
totalEvents: 0,
successRate: null,
eventTypeBreakdown: {},
agentBreakdown: {},
performanceMetrics: {},
loopsDetected: [],
failurePatterns: [],
};
}
return {
totalEvents: events.length,
eventTypeBreakdown: Analytics.eventTypeBreakdown(events),
agentBreakdown: Analytics.agentBreakdown(events),
successRate: Analytics.successRate(events),
performanceMetrics: Analytics.performanceMetrics(events),
loopsDetected: Analytics.detectLoops(events),
failurePatterns: Analytics.failurePatterns(events),
costAnalysis: Analytics.costAnalysis(events),
};
}
/**
* Break down events by type
* @param {Array} events
* @returns {Object} - Count per event type
*/
static eventTypeBreakdown(events) {
const breakdown = {};
for (const event of events) {
breakdown[event.type] = (breakdown[event.type] || 0) + 1;
}
return breakdown;
}
/**
* Break down events by agent
* @param {Array} events
* @returns {Object} - Per-agent statistics
*/
static agentBreakdown(events) {
const agents = {};
for (const event of events) {
const agentId = event.agentId || 'unknown';
if (!agents[agentId]) {
agents[agentId] = { eventCount: 0, types: {}, errors: 0 };
}
agents[agentId].eventCount++;
agents[agentId].types[event.type] = (agents[agentId].types[event.type] || 0) + 1;
if (event.type === 'agent-error') agents[agentId].errors++;
}
return agents;
}
/**
* Calculate success rate from tool calls and decisions
* @param {Array} events
* @returns {Object} - Success rate metrics
*/
static successRate(events) {
const toolCalls = events.filter(e => e.type === 'tool-call');
const successful = toolCalls.filter(e => e.success === true);
const failed = toolCalls.filter(e => e.success === false);
return {
totalToolCalls: toolCalls.length,
successful: successful.length,
failed: failed.length,
rate: toolCalls.length > 0
? Math.round((successful.length / toolCalls.length) * 10000) / 100
: null,
};
}
/**
* Calculate performance metrics (latency, cost, throughput)
* @param {Array} events
* @returns {Object} - Performance metrics
*/
static performanceMetrics(events) {
const durations = events
.filter(e => typeof e.durationMs === 'number')
.map(e => e.durationMs);
if (durations.length === 0) {
return { avgDurationMs: null, minDurationMs: null, maxDurationMs: null, p95DurationMs: null };
}
durations.sort((a, b) => a - b);
const sum = durations.reduce((a, b) => a + b, 0);
const p95Index = Math.floor(durations.length * 0.95);
return {
avgDurationMs: Math.round(sum / durations.length),
minDurationMs: durations[0],
maxDurationMs: durations[durations.length - 1],
medianDurationMs: durations[Math.floor(durations.length / 2)],
p95DurationMs: durations[Math.min(p95Index, durations.length - 1)],
totalMeasured: durations.length,
};
}
/**
* Detect repeating event patterns that may indicate loops or stuck states
* @param {Array} events
* @param {Object} options
* @param {number} options.minRepetitions - Minimum repetitions to flag (default: 3)
* @param {number} options.windowSize - Window size to check for patterns (default: 5)
* @returns {Array} - Detected loop patterns
*/
static detectLoops(events, options = {}) {
const minRepetitions = options.minRepetitions || 3;
const windowSize = options.windowSize || 5;
const loops = [];
if (events.length < minRepetitions * 2) return loops;
// Check for repeating sequences of types
const types = events.map(e => e.type);
for (let patternLen = 1; patternLen <= windowSize; patternLen++) {
for (let i = 0; i <= types.length - patternLen * minRepetitions; i++) {
const pattern = types.slice(i, i + patternLen);
let repetitions = 1;
for (let j = i + patternLen; j <= types.length - patternLen; j += patternLen) {
const segment = types.slice(j, j + patternLen);
if (JSON.stringify(segment) === JSON.stringify(pattern)) {
repetitions++;
} else {
break;
}
}
if (repetitions >= minRepetitions) {
const existing = loops.find(l =>
JSON.stringify(l.pattern) === JSON.stringify(pattern) && l.startIndex === i
);
if (!existing) {
loops.push({
pattern,
repetitions,
startIndex: i,
endIndex: i + patternLen * repetitions - 1,
});
}
}
}
}
return loops;
}
/**
* Identify failure patterns and common error sequences
* @param {Array} events
* @returns {Array} - Failure patterns
*/
static failurePatterns(events) {
const errors = events.filter(e => e.type === 'agent-error');
if (errors.length === 0) return [];
// Group errors by type
const errorGroups = {};
for (const error of errors) {
const key = error.errorType || error.errorMessage || 'unknown';
if (!errorGroups[key]) {
errorGroups[key] = { type: key, count: 0, occurrences: [] };
}
errorGroups[key].count++;
errorGroups[key].occurrences.push({
timestamp: error.timestamp,
agentId: error.agentId,
severity: error.severity,
recovered: error.recovered,
});
}
return Object.values(errorGroups).sort((a, b) => b.count - a.count);
}
/**
* Analyze cost data across events
* @param {Array} events
* @returns {Object} - Cost analysis
*/
static costAnalysis(events) {
const withCost = events.filter(e => typeof e.cost === 'number');
if (withCost.length === 0) {
return { totalCost: 0, avgCost: null, maxCost: null, eventCount: 0 };
}
const costs = withCost.map(e => e.cost);
const total = costs.reduce((a, b) => a + b, 0);
// Group by agent
const costByAgent = {};
for (const event of withCost) {
const agent = event.agentId || 'unknown';
costByAgent[agent] = (costByAgent[agent] || 0) + event.cost;
}
return {
totalCost: Math.round(total * 10000) / 10000,
avgCost: Math.round((total / costs.length) * 10000) / 10000,
maxCost: Math.max(...costs),
eventCount: withCost.length,
costByAgent,
};
}
}
module.exports = { Analytics };
FILE:src/core/OpenClawInstrumentation.js
/**
* OpenClawInstrumentation - Automatic instrumentation hooks for OpenClaw integration
*
* When LobsterOps is installed as an OpenClaw skill, this module provides
* automatic capture of agent events by hooking into OpenClaw's event system.
*/
class OpenClawInstrumentation {
/**
* @param {Object} lobsterOps - LobsterOps instance
* @param {Object} options - Instrumentation options
* @param {boolean} options.captureToolCalls - Capture tool call events (default: true)
* @param {boolean} options.captureSpawns - Capture agent spawn events (default: true)
* @param {boolean} options.captureLifecycle - Capture lifecycle events (default: true)
* @param {boolean} options.captureReasoningTraces - Capture reasoning/thought events (default: true)
* @param {boolean} options.captureFileChanges - Capture file system changes (default: false)
* @param {boolean} options.captureGitOps - Capture git operations (default: false)
*/
constructor(lobsterOps, options = {}) {
this.ops = lobsterOps;
this.options = {
captureToolCalls: options.captureToolCalls !== false,
captureSpawns: options.captureSpawns !== false,
captureLifecycle: options.captureLifecycle !== false,
captureReasoningTraces: options.captureReasoningTraces !== false,
captureFileChanges: options.captureFileChanges === true,
captureGitOps: options.captureGitOps === true,
};
this.active = false;
this._hooks = [];
}
/**
* Activate instrumentation by attaching to OpenClaw's event system
* @param {Object} openClawContext - OpenClaw runtime context (if available)
*/
activate(openClawContext = null) {
this.active = true;
this.context = openClawContext;
// If OpenClaw context is provided, hook into its event emitters
if (openClawContext && typeof openClawContext.on === 'function') {
this._attachHooks(openClawContext);
}
}
/**
* Deactivate instrumentation and remove hooks
*/
deactivate() {
this.active = false;
for (const unhook of this._hooks) {
try { unhook(); } catch (e) { /* ignore */ }
}
this._hooks = [];
}
/**
* Manually instrument a tool call (for use when auto-hooks aren't available)
* @param {Object} toolCall - Tool call details
* @returns {Promise<string>} - Event ID
*/
async instrumentToolCall(toolCall) {
if (!this.active || !this.options.captureToolCalls) return null;
return this.ops.logToolCall({
agentId: toolCall.agentId || 'openclaw-agent',
toolName: toolCall.toolName || toolCall.name,
toolInput: toolCall.input,
toolOutput: toolCall.output,
durationMs: toolCall.durationMs,
success: toolCall.success,
cost: toolCall.cost,
sessionId: toolCall.sessionId,
});
}
/**
* Manually instrument a spawn event
* @param {Object} spawnInfo - Spawn details
* @returns {Promise<string>} - Event ID
*/
async instrumentSpawn(spawnInfo) {
if (!this.active || !this.options.captureSpawns) return null;
return this.ops.logSpawning({
parentAgentId: spawnInfo.parentId || 'openclaw-agent',
childAgentId: spawnInfo.childId,
childAgentType: spawnInfo.type,
task: spawnInfo.task,
spawnReason: spawnInfo.reason,
sessionId: spawnInfo.sessionId,
});
}
/**
* Manually instrument a lifecycle event
* @param {Object} lifecycle - Lifecycle details
* @returns {Promise<string>} - Event ID
*/
async instrumentLifecycle(lifecycle) {
if (!this.active || !this.options.captureLifecycle) return null;
return this.ops.logLifecycle({
agentId: lifecycle.agentId || 'openclaw-agent',
action: lifecycle.action,
status: lifecycle.status,
sessionId: lifecycle.sessionId,
version: lifecycle.version,
environment: lifecycle.environment,
});
}
/**
* Manually instrument a reasoning/thought event
* @param {Object} thought - Thought details
* @returns {Promise<string>} - Event ID
*/
async instrumentThought(thought) {
if (!this.active || !this.options.captureReasoningTraces) return null;
return this.ops.logThought({
agentId: thought.agentId || 'openclaw-agent',
thought: thought.content || thought.thought,
context: thought.context,
confidence: thought.confidence,
sessionId: thought.sessionId,
});
}
/**
* Manually instrument a file change event
* @param {Object} fileChange - File change details
* @returns {Promise<string>} - Event ID
*/
async instrumentFileChange(fileChange) {
if (!this.active || !this.options.captureFileChanges) return null;
return this.ops.logEvent({
type: 'file-change',
agentId: fileChange.agentId || 'openclaw-agent',
action: fileChange.action, // 'create', 'modify', 'delete'
filePath: fileChange.path,
linesChanged: fileChange.linesChanged,
sessionId: fileChange.sessionId,
});
}
/**
* Manually instrument a git operation
* @param {Object} gitOp - Git operation details
* @returns {Promise<string>} - Event ID
*/
async instrumentGitOp(gitOp) {
if (!this.active || !this.options.captureGitOps) return null;
return this.ops.logEvent({
type: 'git-operation',
agentId: gitOp.agentId || 'openclaw-agent',
action: gitOp.action, // 'commit', 'push', 'branch', 'merge'
details: gitOp.details,
sessionId: gitOp.sessionId,
});
}
/**
* Check if instrumentation is active
* @returns {boolean}
*/
isActive() {
return this.active;
}
/**
* Create a pre-configured instrumentation from OpenClaw config
* @param {Object} lobsterOps - LobsterOps instance
* @param {Object} config - OpenClaw config object
* @returns {OpenClawInstrumentation}
*/
static fromConfig(lobsterOps, config = {}) {
return new OpenClawInstrumentation(lobsterOps, {
captureToolCalls: config.captureToolCalls !== false,
captureSpawns: config.captureSpawns !== false,
captureLifecycle: config.captureLifecycle !== false,
captureReasoningTraces: config.captureReasoningTraces !== false,
captureFileChanges: config.captureFileChanges === true,
captureGitOps: config.captureGitOps === true,
});
}
// --- Private methods ---
_attachHooks(emitter) {
const hookEvent = (eventName, handler) => {
emitter.on(eventName, handler);
this._hooks.push(() => emitter.removeListener(eventName, handler));
};
if (this.options.captureToolCalls) {
hookEvent('tool:call', (data) => this.instrumentToolCall(data));
}
if (this.options.captureSpawns) {
hookEvent('agent:spawn', (data) => this.instrumentSpawn(data));
}
if (this.options.captureLifecycle) {
hookEvent('agent:start', (data) => this.instrumentLifecycle({ ...data, action: 'start' }));
hookEvent('agent:stop', (data) => this.instrumentLifecycle({ ...data, action: 'stop' }));
}
if (this.options.captureReasoningTraces) {
hookEvent('agent:thought', (data) => this.instrumentThought(data));
}
if (this.options.captureFileChanges) {
hookEvent('file:change', (data) => this.instrumentFileChange(data));
}
if (this.options.captureGitOps) {
hookEvent('git:operation', (data) => this.instrumentGitOp(data));
}
}
}
module.exports = { OpenClawInstrumentation };
FILE:src/core/PIIFilter.js
/**
* PII Filter - Detects and redacts personally identifiable information
*
* Supports filtering of: emails, phone numbers, SSNs, credit cards,
* IP addresses, and API keys/tokens.
*/
class PIIFilter {
/**
* @param {Object} options - Filter configuration
* @param {boolean} options.enabled - Whether filtering is active (default: true)
* @param {string[]} options.patterns - Which PII types to filter (default: all)
* @param {string} options.replacement - Replacement text (default: '[REDACTED]')
*/
constructor(options = {}) {
this.enabled = options.enabled !== false;
this.replacement = options.replacement || '[REDACTED]';
const allPatterns = ['email', 'phone', 'ssn', 'creditCard', 'ipAddress', 'apiKey'];
this.activePatterns = options.patterns || allPatterns;
this.matchers = {
email: /[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g,
phone: /(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}/g,
ssn: /\b\d{3}-\d{2}-\d{4}\b/g,
creditCard: /\b(?:\d[ -]*?){13,19}\b/g,
ipAddress: /\b(?:\d{1,3}\.){3}\d{1,3}\b/g,
apiKey: /(?:sk|pk|key|token|api[_-]?key|bearer)[-_]?(?:[a-zA-Z0-9][-_]?){20,}/gi,
};
}
/**
* Filter PII from a string value
* @param {string} value - The string to filter
* @returns {string} - Filtered string
*/
filterString(value) {
if (!this.enabled || typeof value !== 'string') return value;
let filtered = value;
for (const patternName of this.activePatterns) {
const matcher = this.matchers[patternName];
if (matcher) {
filtered = filtered.replace(matcher, this.replacement);
}
}
return filtered;
}
/**
* Recursively filter PII from an object or value
* @param {*} data - The data to filter
* @returns {*} - Filtered data
*/
filter(data) {
if (!this.enabled) return data;
if (data === null || data === undefined) return data;
if (typeof data === 'string') {
return this.filterString(data);
}
if (Array.isArray(data)) {
return data.map(item => this.filter(item));
}
if (typeof data === 'object') {
const filtered = {};
for (const [key, value] of Object.entries(data)) {
filtered[key] = this.filter(value);
}
return filtered;
}
return data;
}
}
module.exports = { PIIFilter };
FILE:src/storage/MemoryStorage.js
const { StorageAdapter } = require('./StorageAdapter');
/**
* In-Memory Storage Backend
* Useful for testing, temporary sessions, or when persistence isn't needed
* All data is lost when the process exits
*/
class MemoryStorage extends StorageAdapter {
/**
* @param {Object} config - Configuration options
* @param {number} config.maxEvents - Maximum events to keep in memory (default: 10000)
*/
constructor(config = {}) {
super();
this.events = new Map(); // eventId -> event
this.maxEvents = config.maxEvents || 10000;
this.initialized = false;
}
async init() {
this.initialized = true;
}
async saveEvent(event) {
if (!this.initialized) await this.init();
try {
const enrichedEvent = {
...event,
id: event.id || this._generateId(),
timestamp: event.timestamp || new Date().toISOString(),
storedAt: new Date().toISOString()
};
this.events.set(enrichedEvent.id, enrichedEvent);
// Enforce max events limit (remove oldest first)
if (this.events.size > this.maxEvents) {
// Get oldest events by timestamp
const sortedEvents = Array.from(this.events.values()).sort(
(a, b) => new Date(a.timestamp) - new Date(b.timestamp)
);
// Remove excess events
const eventsToRemove = sortedEvents.slice(0, this.events.size - this.maxEvents);
for (const event of eventsToRemove) {
this.events.delete(event.id);
}
}
return enrichedEvent.id;
} catch (error) {
throw new Error(`Failed to save event: error.message`);
}
}
async queryEvents(filter = {}, options = {}) {
if (!this.initialized) await this.init();
try {
const {
startDate, endDate, eventTypes, agentIds, limit = 100, offset = 0,
sortBy = 'timestamp', sortOrder = 'desc'
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: [],
limit: 100, offset: 0, sortBy: 'timestamp', sortOrder: 'desc'
, ...filter, ...options };
// Convert Map values to array and apply filters
let filteredEvents = Array.from(this.events.values());
if (startDate) {
const start = new Date(startDate);
filteredEvents = filteredEvents.filter(event => new Date(event.timestamp) >= start);
}
if (endDate) {
const end = new Date(endDate);
filteredEvents = filteredEvents.filter(event => new Date(event.timestamp) <= end);
}
if (eventTypes.length > 0) {
filteredEvents = filteredEvents.filter(event => eventTypes.includes(event.type));
}
if (agentIds.length > 0) {
filteredEvents = filteredEvents.filter(event => agentIds.includes(event.agentId));
}
// Apply sorting
const reverse = sortOrder.toLowerCase() === 'desc';
filteredEvents.sort((a, b) => {
const aVal = new Date(a[sortBy]);
const bVal = new Date(b[sortBy]);
return reverse ? bVal - aVal : aVal - bVal;
});
// Apply pagination
const startIdx = offset;
const endIdx = offset + limit;
const paginatedEvents = filteredEvents.slice(startIdx, endIdx);
return paginatedEvents;
} catch (error) {
throw new Error(`Failed to query events: error.message`);
}
}
async getEventById(eventId) {
if (!this.initialized) await this.init();
try {
return this.events.get(eventId) || null;
} catch (error) {
throw new Error(`Failed to get event by ID: error.message`);
}
}
async updateEvent(eventId, updates) {
if (!this.initialized) await this.init();
try {
const event = this.events.get(eventId);
if (!event) {
return false; // Event not found
}
const updatedEvent = {
...event,
...updates,
updatedAt: new Date().toISOString()
};
this.events.set(eventId, updatedEvent);
return true;
} catch (error) {
throw new Error(`Failed to update event: error.message`);
}
}
async deleteEvents(filter = {}) {
if (!this.initialized) await this.init();
try {
const {
startDate, endDate, eventTypes, agentIds
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: []
, ...filter };
let deletedCount = 0;
const eventsToDelete = [];
for (const [eventId, event] of this.events.entries()) {
let shouldDelete = true;
if (startDate && new Date(event.timestamp) < new Date(startDate)) {
shouldDelete = false;
}
if (endDate && new Date(event.timestamp) > new Date(endDate)) {
shouldDelete = false;
}
if (eventTypes.length > 0 && !eventTypes.includes(event.type)) {
shouldDelete = false;
}
if (agentIds.length > 0 && !agentIds.includes(event.agentId)) {
shouldDelete = false;
}
if (shouldDelete) {
eventsToDelete.push(eventId);
}
}
// Delete the marked events
for (const eventId of eventsToDelete) {
this.events.delete(eventId);
deletedCount++;
}
return deletedCount;
} catch (error) {
throw new Error(`Failed to delete events: error.message`);
}
}
async cleanupOld() {
if (!this.initialized) await this.init();
try {
// For memory storage, cleanup old means applying our maxEvents limit
// or we could add a maxAge option
const initialSize = this.events.size;
if (this.events.size > this.maxEvents) {
// Remove oldest events
const sortedEvents = Array.from(this.events.values()).sort(
(a, b) => new Date(a.timestamp) - new Date(b.timestamp)
);
const eventsToRemove = sortedEvents.slice(0, this.events.size - this.maxEvents);
for (const event of eventsToRemove) {
this.events.delete(event.id);
}
}
return initialSize - this.events.size;
} catch (error) {
throw new Error(`Failed to cleanup old events: error.message`);
}
}
async getStats() {
if (!this.initialized) await this.init();
try {
const events = Array.from(this.events.values());
const timestamps = events.map(e => new Date(e.timestamp));
const oldest = timestamps.length > 0 ? new Date(Math.min(...timestamps.map(t => t.getTime()))) : null;
const newest = timestamps.length > 0 ? new Date(Math.max(...timestamps.map(t => t.getTime()))) : null;
return {
backend: 'memory',
eventCount: this.events.size,
oldestEvent: oldest ? oldest.toISOString() : null,
newestEvent: newest ? newest.toISOString() : null,
maxEvents: this.maxEvents
};
} catch (error) {
throw new Error(`Failed to get stats: error.message`);
}
}
async close() {
this.events.clear();
this.initialized = false;
}
/**
* Generate a simple unique ID
* @returns {string} - Unique ID
*/
_generateId() {
return Math.random().toString(36).substr(2, 9) +
Date.now().toString(36) +
Math.random().toString(36).substr(2, 9);
}
}
module.exports = { MemoryStorage };
FILE:src/storage/SQLiteStorage.js
const { StorageAdapter } = require('./StorageAdapter');
const sqlite3 = require('sqlite3').verbose();
const path = require('path');
/**
* SQLite Storage Backend
* Lightweight, file-based SQL database that works everywhere
* Better performance than JSON files for querying and scaling
*/
class SQLiteStorage extends StorageAdapter {
/**
* @param {Object} config - Configuration options
* @param {string} config.filename - SQLite database file path (default: ./lobsterops.db)
* @param {boolean} config.verbose - Whether to log SQLite errors (default: false)
*/
constructor(config = {}) {
super();
this.filename = config.filename || './lobsterops.db';
this.verbose = config.verbose || false;
this.db = null;
this.initialized = false;
// Ensure directory exists for the database file
this._ensureDirectory();
}
async init() {
if (this.initialized) return Promise.resolve();
return new Promise((resolve, reject) => {
this.db = new sqlite3.Database(this.filename, (err) => {
if (err) {
if (this.verbose) console.error('SQLite connection error:', err);
reject(new Error(`Failed to connect to SQLite: err.message`));
return;
}
// Enable foreign key constraints
this.db.run('PRAGMA foreign_keys = ON;', (err) => {
if (err) {
if (this.verbose) console.error('SQLite PRAGMA error:', err);
// Continue anyway - this isn't critical
}
// Create tables if they don't exist
this.db.serialize(() => {
// Events table
this.db.run(`
CREATE TABLE IF NOT EXISTS lobsterops_events (
id TEXT PRIMARY KEY,
type TEXT NOT NULL,
agentId TEXT,
action TEXT,
timestamp TEXT NOT NULL,
storedAt TEXT NOT NULL,
data TEXT, -- JSON string of the event data
updatedAt TEXT
)
`, (err) => {
if (err) {
if (this.verbose) console.error('SQLite table creation error:', err);
reject(new Error(`Failed to create tables: err.message`));
return;
}
// Indexes for common query patterns
this.db.run(`
CREATE INDEX IF NOT EXISTS idx_events_timestamp
ON lobsterops_events(timestamp)
`, (err) => {
if (err && this.verbose) console.error('Index creation warning:', err);
this.db.run(`
CREATE INDEX IF NOT EXISTS idx_events_type
ON lobsterops_events(type)
`, (err) => {
if (err && this.verbose) console.error('Index creation warning:', err);
this.db.run(`
CREATE INDEX IF NOT EXISTS idx_events_agentId
ON lobsterops_events(agentId)
`, (err) => {
if (err && this.verbose) console.error('Index creation warning:', err);
this.db.run(`
CREATE INDEX IF NOT EXISTS idx_events_action
ON lobsterops_events(action)
`, (err) => {
if (err && this.verbose) console.error('Index creation warning:', err);
this.initialized = true;
resolve();
});
});
});
});
});
});
});
});
});
}
_executeQuery(sql, params = []) {
if (!this.initialized || !this.db) {
return Promise.reject(new Error('Storage not initialized'));
}
return new Promise((resolve, reject) => {
this.db.all(sql, params, (err, rows) => {
if (err) {
if (this.verbose) console.error('SQLite query error:', err, sql, params);
reject(new Error(`Database query failed: err.message`));
return;
}
resolve(rows);
});
});
}
_executeRun(sql, params = []) {
if (!this.initialized || !this.db) {
return Promise.reject(new Error('Storage not initialized'));
}
return new Promise((resolve, reject) => {
this.db.run(sql, params, function(err) {
if (err) {
if (this.verbose) console.error('SQLite run error:', err, sql, params);
reject(new Error(`Database operation failed: err.message`));
return;
}
resolve(this);
});
});
}
async saveEvent(event) {
if (!this.initialized) await this.init();
try {
const enrichedEvent = {
...event,
id: event.id || this._generateId(),
timestamp: event.timestamp || new Date().toISOString(),
storedAt: new Date().toISOString()
};
const dataJson = JSON.stringify(enrichedEvent);
await this._executeRun(
`INSERT INTO lobsterops_events (id, type, agentId, action, timestamp, storedAt, data)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
[
enrichedEvent.id,
enrichedEvent.type,
enrichedEvent.agentId || null,
enrichedEvent.action || null,
enrichedEvent.timestamp,
enrichedEvent.storedAt,
dataJson
]
);
return enrichedEvent.id;
} catch (error) {
throw new Error(`Failed to save event: error.message`);
}
}
async queryEvents(filter = {}, options = {}) {
if (!this.initialized) await this.init();
try {
const {
startDate, endDate, eventTypes, agentIds, actions,
limit = 100, offset = 0,
sortBy = 'timestamp', sortOrder = 'desc'
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: [], actions: [],
limit: 100, offset: 0, sortBy: 'timestamp', sortOrder: 'desc'
, ...filter, ...options };
// Validate sort direction
const validSortOrder = sortOrder.toLowerCase() === 'asc' ? 'ASC' : 'DESC';
// Build WHERE clause and parameters
let whereClause = '1=1'; // Start with always true
const params = [];
if (startDate) {
whereClause += ' AND timestamp >= ?';
params.push(startDate);
}
if (endDate) {
whereClause += ' AND timestamp <= ?';
params.push(endDate);
}
if (eventTypes.length > 0) {
const placeholders = eventTypes.map(() => '?').join(',');
whereClause += ` AND type IN (placeholders)`;
params.push(...eventTypes);
}
if (agentIds.length > 0) {
const placeholders = agentIds.map(() => '?').join(',');
whereClause += ` AND agentId IN (placeholders)`;
params.push(...agentIds);
}
if (actions.length > 0) {
const placeholders = actions.map(() => '?').join(',');
whereClause += ` AND action IN (placeholders)`;
params.push(...actions);
}
// Build and execute query
const query = `
SELECT * FROM lobsterops_events
WHERE whereClause
ORDER BY sortBy validSortOrder
LIMIT ? OFFSET ?
`;
params.push(limit, offset);
const rows = await this._executeQuery(query, params);
// Parse JSON data back to objects
const events = rows.map(row => {
let parsedData;
try {
parsedData = JSON.parse(row.data);
} catch (e) {
// If JSON parsing fails, return raw data
parsedData = row.data;
}
return {
id: row.id,
type: row.type,
agentId: row.agentId,
action: row.action,
timestamp: row.timestamp,
storedAt: row.storedAt,
updatedAt: row.updatedAt,
...parsedData
};
});
return events;
} catch (error) {
throw new Error(`Failed to query events: error.message`);
}
}
async getEventById(eventId) {
if (!this.initialized) await this.init();
try {
const rows = await this._executeQuery(
`SELECT * FROM lobsterops_events WHERE id = ?`,
[eventId]
);
if (rows.length === 0) {
return null;
}
const row = rows[0];
let parsedData;
try {
parsedData = JSON.parse(row.data);
} catch (e) {
parsedData = row.data;
}
return {
id: row.id,
type: row.type,
agentId: row.agentId,
action: row.action,
timestamp: row.timestamp,
storedAt: row.storedAt,
updatedAt: row.updatedAt,
...parsedData
};
} catch (error) {
throw new Error(`Failed to get event: error.message`);
}
}
async updateEvent(eventId, updates) {
if (!this.initialized) await this.init();
try {
// Add updatedAt timestamp
const updateWithTime = {
...updates,
updatedAt: new Date().toISOString()
};
// Get current event to merge updates
const currentEvent = await this.getEventById(eventId);
if (!currentEvent) {
return false; // Event not found
}
const mergedEvent = {
...currentEvent,
...updateWithTime
};
const dataJson = JSON.stringify(mergedEvent);
await this._executeRun(
`UPDATE lobsterops_events
SET data = ?, updatedAt = ?
WHERE id = ?`,
[dataJson, updateWithTime.updatedAt, eventId]
);
return true;
} catch (error) {
throw new Error(`Failed to update event: error.message`);
}
}
async deleteEvents(filter = {}) {
if (!this.initialized) await this.init();
try {
const {
startDate, endDate, eventTypes, agentIds, actions
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: [], actions: []
, ...filter };
// Build WHERE clause and parameters
let whereClause = '1=1'; // Start with always true
const params = [];
if (startDate) {
whereClause += ' AND timestamp >= ?';
params.push(startDate);
}
if (endDate) {
whereClause += ' AND timestamp <= ?';
params.push(endDate);
}
if (eventTypes.length > 0) {
const placeholders = eventTypes.map(() => '?').join(',');
whereClause += ` AND type IN (placeholders)`;
params.push(...eventTypes);
}
if (agentIds.length > 0) {
const placeholders = agentIds.map(() => '?').join(',');
whereClause += ` AND agentId IN (placeholders)`;
params.push(...agentIds);
}
if (actions.length > 0) {
const placeholders = actions.map(() => '?').join(',');
whereClause += ` AND action IN (placeholders)`;
params.push(...actions);
}
// First, get count of events to delete (for return value)
const countResult = await this._executeQuery(
`SELECT COUNT(*) as count FROM lobsterops_events WHERE whereClause`,
params
);
const countToDelete = countResult[0].count;
// Then perform the deletion
await this._executeRun(
`DELETE FROM lobsterops_events WHERE whereClause`,
params
);
return countToDelete;
} catch (error) {
throw new Error(`Failed to delete events: error.message`);
}
}
async cleanupOld(maxAgeDays = 30) {
if (!this.initialized) await this.init();
try {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - maxAgeDays);
const cutoffISO = cutoffDate.toISOString();
// Count events to delete
const countResult = await this._executeQuery(
'SELECT COUNT(*) as count FROM lobsterops_events WHERE timestamp < ?',
[cutoffISO]
);
const countToDelete = countResult[0].count;
if (countToDelete === 0) return 0;
// Delete old events
await this._executeRun(
'DELETE FROM lobsterops_events WHERE timestamp < ?',
[cutoffISO]
);
return countToDelete;
} catch (error) {
throw new Error(`Failed to cleanup old events: error.message`);
}
}
async getStats() {
if (!this.initialized) await this.init();
try {
// Get total count
const countResult = await this._executeQuery(`SELECT COUNT(*) as count FROM lobsterops_events`);
const count = countResult[0].count;
// Get size info (approximate)
const sizeResult = await this._executeQuery(`SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size()`);
const sizeInBytes = sizeResult[0].size || 0;
return {
backend: 'sqlite',
filename: this.filename,
eventCount: count,
databaseSizeBytes: sizeInBytes,
databaseSizeMB: Number((sizeInBytes / (1024 * 1024)).toFixed(2))
};
} catch (error) {
throw new Error(`Failed to get stats: error.message`);
}
}
async close() {
if (this.db) {
return new Promise((resolve, reject) => {
this.db.close((err) => {
if (err) {
if (this.verbose) console.error('SQLite close error:', err);
reject(new Error(`Failed to close database: err.message`));
return;
}
this.initialized = false;
resolve();
});
});
}
this.initialized = false;
return Promise.resolve();
}
/**
* Ensure the directory for the database file exists
* @private
*/
_ensureDirectory() {
const dir = path.dirname(this.filename);
if (dir && dir !== '.') {
const fs = require('fs').promises;
fs.mkdir(dir, { recursive: true }).catch(() => {
// Ignore if directory already exists or we can't create it
});
}
}
/**
* Generate a simple unique ID
* @returns {string} - Unique ID
*/
_generateId() {
return Math.random().toString(36).substr(2, 9) +
Date.now().toString(36) +
Math.random().toString(36).substr(2, 9);
}
}
module.exports = { SQLiteStorage };
FILE:src/storage/StorageAdapter.js
/**
* Storage Adapter Interface
* Defines the contract for all storage backends in LobsterOps
*
* This interface ensures storage pluggability - you can swap between
* Supabase, SQLite, JSON files, memory, etc. without changing core logic
*/
class StorageAdapter {
/**
* Initialize the storage backend
* @returns {Promise<void>}
*/
async init() {
throw new Error('StorageAdapter.init() must be implemented by subclass');
}
/**
* Save a single agent event
* @param {Object} event - The agent event to store
* @returns {Promise<string>} - Returns the event ID
*/
async saveEvent(event) {
throw new Error('StorageAdapter.saveEvent() must be implemented by subclass');
}
/**
* Query events with filtering options
* @param {Object} filter - Filter criteria (time range, event types, etc.)
* @param {Object} options - Pagination and sorting options
* @returns {Promise<Array<Object>>} - Array of matching events
*/
async queryEvents(filter = {}, options = {}) {
throw new Error('StorageAdapter.queryEvents() must be implemented by subclass');
}
/**
* Retrieve a specific event by ID
* @param {string} eventId - The ID of the event to retrieve
* @returns {Promise<Object|null>} - The event or null if not found
*/
async getEventById(eventId) {
throw new Error('StorageAdapter.getEventById() must be implemented by subclass');
}
/**
* Update an existing event
* @param {string} eventId - The ID of the event to update
* @param {Object} updates - The fields to update
* @returns {Promise<boolean>} - True if successful
*/
async updateEvent(eventId, updates) {
throw new Error('StorageAdapter.updateEvent() must be implemented by subclass');
}
/**
* Delete events matching criteria
* @param {Object} filter - Filter criteria for deletion
* @returns {Promise<number>} - Number of events deleted
*/
async deleteEvents(filter) {
throw new Error('StorageAdapter.deleteEvents() must be implemented by subclass');
}
/**
* Clean up old events based on retention policy
* @returns {Promise<number>} - Number of events removed
*/
async cleanupOld() {
throw new Error('StorageAdapter.cleanupOld() must be implemented by subclass');
}
/**
* Get storage statistics
* @returns {Promise<Object>} - Storage usage statistics
*/
async getStats() {
throw new Error('StorageAdapter.getStats() must be implemented by subclass');
}
/**
* Close storage connections and cleanup
* @returns {Promise<void>}
*/
async close() {
throw new Error('StorageAdapter.close() must be implemented by subclass');
}
}
module.exports = { StorageAdapter };
FILE:src/storage/StorageFactory.js
const { JsonFileStorage } = require('./JsonFileStorage');
const { MemoryStorage } = require('./MemoryStorage');
const { SQLiteStorage } = require('./SQLiteStorage');
const { SupabaseStorage } = require('./SupabaseStorage');
/**
* Storage Factory
* Creates and configures storage backends based on environment or explicit configuration
*
* Supports:
* - json: JSON file storage (zero dependency, works everywhere)
* - memory: In-memory storage (for testing/temporary use)
* - sqlite: SQLite storage (lightweight, file-based)
* - supabase: Supabase storage (cloud-based PostgreSQL)
*/
class StorageFactory {
/**
* Create a storage backend based on configuration
* @param {string} type - Storage type ('json', 'memory', 'sqlite', 'supabase')
* @param {Object} config - Configuration specific to the storage type
* @returns {StorageAdapter} - Configured storage backend instance
*/
static createStorage(type = 'json', config = {}) {
switch (type.toLowerCase()) {
case 'json':
return new JsonFileStorage(config);
case 'memory':
return new MemoryStorage(config);
case 'sqlite':
return new SQLiteStorage(config);
case 'supabase':
return new SupabaseStorage(config);
default:
throw new Error(`Unsupported storage type: type. Supported types: json, memory, sqlite, supabase`);
}
}
/**
* Automatically detect and create the best available storage backend
* @param {Object} config - Configuration options
* @returns {StorageAdapter} - Best available storage backend
*/
static createAutoStorage(config = {}) {
// Check for explicit storage type in config
if (config.type) {
return this.createStorage(config.type, config);
}
// Check environment variables
const envType = process.env.LOBSTER_STORAGE || process.env.STORAGE_TYPE;
if (envType) {
return this.createStorage(envType, config);
}
// Auto-detection logic
// 1. If we're in a testing environment, prefer memory
if (process.env.NODE_ENV === 'test' || process.env.JEST_WORKER_ID) {
return new MemoryStorage(config);
}
// 2. Otherwise, default to JSON file storage (works everywhere)
return new JsonFileStorage(config);
}
/**
* Get list of supported storage types
* @returns {Array<string>} - Supported storage types
*/
static getSupportedTypes() {
return ['json', 'memory', 'sqlite', 'supabase'];
}
}
module.exports = { StorageFactory };
FILE:src/storage/SupabaseStorage.js
const { StorageAdapter } = require('./StorageAdapter');
const { createClient } = require('@supabase/supabase-js');
/**
* Supabase Storage Backend
* Cloud-based PostgreSQL storage via Supabase
* Ideal for production, team collaboration, and cross-device sync
*/
class SupabaseStorage extends StorageAdapter {
/**
* @param {Object} config - Configuration options
* @param {string} config.supabaseUrl - Supabase project URL
* @param {string} config.supabaseKey - Supabase anon key or service role key
* @param {string} config.tableName - Table name for events (default: 'agent_events')
* @param {number} config.maxRetries - Max retry attempts for failed operations (default: 3)
*/
constructor(config = {}) {
super();
this.supabaseUrl = config.supabaseUrl;
this.supabaseKey = config.supabaseKey;
this.tableName = config.tableName || 'agent_events';
this.maxRetries = config.maxRetries || 3;
this.retryDelay = config.retryDelay || 1000; // Start with 1 second delay
if (!this.supabaseUrl || !this.supabaseKey) {
throw new Error('Supabase storage requires supabaseUrl and supabaseKey configuration');
}
this.supabase = createClient(this.supabaseUrl, this.supabaseKey);
this.initialized = false;
this.tableExists = false;
}
async init() {
if (this.initialized) return;
try {
// Test connection by trying to query the table (will create if doesn't exist)
await this._ensureTableExists();
this.initialized = true;
} catch (error) {
throw new Error(`Failed to initialize Supabase storage: error.message`);
}
}
async _ensureTableExists() {
if (this.tableExists) return;
try {
// Try to select from the table to see if it exists
const { data, error } = await this.supabase
.from(this.tableName)
.select('id')
.limit(1);
if (error && error.code === '42P01') { // Table doesn't exist
await this._createTable();
} else if (error) {
throw error;
}
// If no error, table exists
this.tableExists = true;
} catch (error) {
// If we can't determine table existence, try to create it
// (handles permission issues where we can't check but can create)
await this._createTable();
this.tableExists = true;
}
}
async _createTable() {
// Note: Supabase doesn't allow direct DDL execution via client
// Tables need to be created through the Supabase dashboard or SQL editor
// For now, we'll throw an informative error guiding the user to create the table
throw new Error(
`Supabase table 'this.tableName' does not exist. Please create it in your Supabase project:\n\n` +
`CREATE TABLE this.tableName (\n` +
` id UUID PRIMARY KEY DEFAULT gen_random_uuid(),\n` +
` type TEXT NOT NULL,\n` +
` agentId TEXT,\n` +
` action TEXT,\n` +
` timestamp TIMESTAMPTZ NOT NULL,\n` +
` storedAt TIMESTAMPTZ NOT NULL,\n` +
` data JSONB NOT NULL DEFAULT '{}'::jsonb,\n` +
` updatedAt TIMESTAMPTZ,\n` +
` createdAt TIMESTAMPTZ DEFAULT NOW()\n` +
`);\n\n` +
`-- Create indexes for common query patterns\n` +
`CREATE INDEX idx_this.tableName_timestamp ON this.tableName(timestamp);\n` +
`CREATE INDEX idx_this.tableName_type ON this.tableName(type);\n` +
`CREATE INDEX idx_this.tableName_agentId ON this.tableName(agentId);\n` +
`CREATE INDEX idx_this.tableName_action ON this.tableName(action);\n` +
`-- Create index for JSONB data querying (GIN index)\n` +
`CREATE INDEX idx_this.tableName_data ON this.tableName USING GIN (data);`
);
}
async _executeWithRetry(operation, attempt = 1) {
try {
return await operation();
} catch (error) {
if (attempt >= this.maxRetries) {
throw new Error(`Operation failed after this.maxRetries attempts: error.message`);
}
// Wait before retrying (exponential backoff)
await new Promise(resolve => setTimeout(resolve, this.retryDelay * Math.pow(2, attempt - 1)));
return this._executeWithRetry(operation, attempt + 1);
}
}
async saveEvent(event) {
if (!this.initialized) await this.init();
try {
// Extract known top-level fields and put the rest in data JSONB
const {
id,
type,
agentId,
action,
timestamp,
storedAt,
updatedAt,
createdAt,
...data
} = event;
const enrichedEvent = {
id: id || this._generateId(),
type: type || '',
agentId: agentId || null,
action: action || null,
timestamp: timestamp || new Date().toISOString(),
storedAt: storedAt || new Date().toISOString(),
data: {
...data,
...(updatedAt ? { updatedAt } : {}),
...(createdAt ? { createdAt } : {})
},
updatedAt: updatedAt || null,
createdAt: createdAt || new Date().toISOString()
};
const { data: resultData, error } = await this._executeWithRetry(() =>
this.supabase
.from(this.tableName)
.insert([enrichedEvent])
.select()
);
if (error) throw error;
return resultData[0].id;
} catch (error) {
throw new Error(`Failed to save event: error.message`);
}
}
async queryEvents(filter = {}, options = {}) {
if (!this.initialized) await this.init();
try {
let query = this.supabase.from(this.tableName).select('*');
const {
startDate, endDate, eventTypes, agentIds, actions,
limit = 100, offset = 0,
sortBy = 'timestamp', sortOrder = 'desc',
// Custom filters for data JSONB fields
dataFilters
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: [], actions: [],
limit: 100, offset: 0, sortBy: 'timestamp', sortOrder: 'desc',
dataFilters: {}
, ...filter, ...options };
// Apply filters
if (startDate) {
query = query.gte('timestamp', startDate);
}
if (endDate) {
query = query.lte('timestamp', endDate);
}
if (eventTypes.length > 0) {
query = query.in('type', eventTypes);
}
if (agentIds.length > 0) {
query = query.in('agentId', agentIds);
}
if (actions.length > 0) {
query = query.in('action', actions);
}
// Apply custom data filters (for JSONB fields)
if (dataFilters && Object.keys(dataFilters).length > 0) {
Object.entries(dataFilters).forEach(([key, value]) => {
query = query.filter('data', '->>', key, 'eq', value.toString());
});
}
// Apply sorting
const ascending = sortOrder.toLowerCase() === 'asc';
query = query.order(sortBy, { ascending });
// Apply pagination
query = query.range(offset, offset + limit - 1);
const { data, error } = await this._executeWithRetry(() => query);
if (error) throw error;
// Transform the data to flatten the JSONB fields back to top-level for consistency
return data.map(row => ({
...row,
...row.data
}));
} catch (error) {
throw new Error(`Failed to query events: error.message`);
}
}
async getEventById(eventId) {
if (!this.initialized) await this.init();
try {
const { data, error } = await this._executeWithRetry(() =>
this.supabase
.from(this.tableName)
.select('*')
.eq('id', eventId)
.single()
);
if (error) throw error;
// If no data returned, Supabase throws an error for .single()
if (!data) {
return null;
}
// Flatten JSONB data back to top-level
return {
...data,
...data.data
};
} catch (error) {
// Handle case where event doesn't exist
if (error.code === 'PGRST116') { // No rows returned
return null;
}
throw new Error(`Failed to get event: error.message`);
}
}
async updateEvent(eventId, updates) {
if (!this.initialized) await this.init();
try {
// Separate top-level fields from data fields
const {
agentId,
action,
timestamp,
...dataUpdates
} = updates;
const updateWithTime = {
...(agentId !== undefined ? { agentId } : {}),
...(action !== undefined ? { action } : {}),
...(timestamp !== undefined ? { timestamp } : {}),
updatedAt: new Date().toISOString(),
...(Object.keys(dataUpdates).length > 0 ? { data: dataUpdates } : {})
};
// Remove undefined values
Object.keys(updateWithTime).forEach(key => {
if (updateWithTime[key] === undefined) {
delete updateWithTime[key];
}
});
const { data, error } = await this._executeWithRetry(() =>
this.supabase
.from(this.tableName)
.update(updateWithTime)
.eq('id', eventId)
);
if (error) throw error;
// Check if any rows were updated
return data && data.length > 0;
} catch (error) {
throw new Error(`Failed to update event: error.message`);
}
}
async deleteEvents(filter = {}) {
if (!this.initialized) await this.init();
try {
let query = this.supabase.from(this.tableName).delete();
const {
startDate, endDate, eventTypes, agentIds, actions,
// Custom filters for data JSONB fields
dataFilters
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: [], actions: [],
dataFilters: {}
, ...filter };
// Apply filters (same as queryEvents)
if (startDate) {
query = query.gte('timestamp', startDate);
}
if (endDate) {
query = query.lte('timestamp', endDate);
}
if (eventTypes.length > 0) {
query = query.in('type', eventTypes);
}
if (agentIds.length > 0) {
query = query.in('agentId', agentIds);
}
if (actions.length > 0) {
query = query.in('action', actions);
}
// Apply custom data filters (for JSONB fields)
if (dataFilters && Object.keys(dataFilters).length > 0) {
Object.entries(dataFilters).forEach(([key, value]) => {
query = query.filter('data', '->>', key, 'eq', value.toString());
});
}
// First, get count of events to delete
const countQuery = this.supabase.from(this.tableName).select('id', { count: 'exact' });
if (startDate) {
countQuery.gte('timestamp', startDate);
}
if (endDate) {
countQuery.lte('timestamp', endDate);
}
if (eventTypes.length > 0) {
countQuery.in('type', eventTypes);
}
if (agentIds.length > 0) {
countQuery.in('agentId', agentIds);
}
if (actions.length > 0) {
countQuery.in('action', actions);
}
// Apply custom data filters to count query
if (dataFilters && Object.keys(dataFilters).length > 0) {
Object.entries(dataFilters).forEach(([key, value]) => {
countQuery = countQuery.filter('data', '->>', key, 'eq', value.toString());
});
}
const { data: countData, error: countError } = await this._executeWithRetry(() => countQuery);
if (countError) throw countError;
const countToDelete = countData.length;
// Then perform the deletion
const { data, error } = await this._executeWithRetry(() => query);
if (error) throw error;
return countToDelete;
} catch (error) {
throw new Error(`Failed to delete events: error.message`);
}
}
async cleanupOld(maxAgeDays = 30) {
if (!this.initialized) await this.init();
try {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - maxAgeDays);
const cutoffISO = cutoffDate.toISOString();
// Count events to delete
const { data: countData, error: countError } = await this._executeWithRetry(() =>
this.supabase
.from(this.tableName)
.select('id', { count: 'exact' })
.lt('timestamp', cutoffISO)
);
if (countError) throw countError;
const countToDelete = countData ? countData.length : 0;
if (countToDelete === 0) return 0;
// Delete old events
const { error: deleteError } = await this._executeWithRetry(() =>
this.supabase
.from(this.tableName)
.delete()
.lt('timestamp', cutoffISO)
);
if (deleteError) throw deleteError;
return countToDelete;
} catch (error) {
throw new Error(`Failed to cleanup old events: error.message`);
}
}
async getStats() {
if (!this.initialized) await this.init();
try {
const { data, error } = await this._executeWithRetry(() =>
this.supabase
.from(this.tableName)
.select('*', { count: 'exact' })
.limit(0) // We only want the count
);
if (error) throw error;
return {
backend: 'supabase',
tableName: this.tableName,
eventCount: data.count,
url: this.supabaseUrl
};
} catch (error) {
throw new Error(`Failed to get stats: error.message`);
}
}
async close() {
// Supabase client doesn't need explicit closing
this.initialized = false;
return Promise.resolve();
}
/**
* Generate a simple unique ID
* @returns {string} - Unique ID
*/
_generateId() {
return Math.random().toString(36).substr(2, 9) +
Date.now().toString(36) +
Math.random().toString(36).substr(2, 9);
}
}
module.exports = { SupabaseStorage };
FILE:src/storage/JsonFileStorage.js
const { StorageAdapter } = require('./StorageAdapter');
const fs = require('fs').promises;
const path = require('path');
/**
* JSON File Storage Backend
* Zero-dependency storage that works anywhere with file system access
* Stores events as daily JSON files for easy portability and backup
*/
class JsonFileStorage extends StorageAdapter {
/**
* @param {Object} config - Configuration options
* @param {string} config.dataDir - Directory to store JSON files (default: ./lobsterops-data)
* @param {number} config.maxAgeDays - Maximum age of files to keep (default: 30)
*/
constructor(config = {}) {
super();
this.dataDir = config.dataDir || './lobsterops-data';
this.maxAgeDays = config.maxAgeDays || 30;
this.filePrefix = 'lobsterops-events-';
this.initialized = false;
}
async init() {
try {
// Ensure data directory exists
await fs.mkdir(this.dataDir, { recursive: true });
this.initialized = true;
// Run initial cleanup of old files
await this.cleanupOld();
} catch (error) {
throw new Error(`Failed to initialize JsonFileStorage: error.message`);
}
}
/**
* Get the filename for a given date
* @param {Date} date - The date to get filename for
* @returns {string} - Filename for the date
*/
_getFilenameForDate(date) {
const year = date.getFullYear();
const month = String(date.getMonth() + 1).padStart(2, '0');
const day = String(date.getDate()).padStart(2, '0');
return `this.filePrefixyear-month-day.json`;
}
/**
* Get today's filename
* @returns {string} - Today's filename
*/
_getTodayFilename() {
return this._getFilenameForDate(new Date());
}
/**
* Read events from a JSON file
* @param {string} filename - The filename to read from
* @returns {Promise<Array>} - Events stored in the file
*/
async _readEventsFromFile(filename) {
try {
const filepath = path.join(this.dataDir, filename);
const data = await fs.readFile(filepath, 'utf8');
const parsed = JSON.parse(data);
return Array.isArray(parsed) ? parsed : [];
} catch (error) {
// If file doesn't exist or is invalid JSON, return empty array
if (error.code === 'ENOENT' || error instanceof SyntaxError) {
return [];
}
throw error;
}
}
/**
* Write events to a JSON file
* @param {string} filename - The filename to write to
* @param {Array} events - Events to write
* @returns {Promise<void>}
*/
async _writeEventsToFile(filename, events) {
const filepath = path.join(this.dataDir, filename);
const data = JSON.stringify(events, null, 2);
await fs.writeFile(filepath, data, 'utf8');
}
async saveEvent(event) {
if (!this.initialized) await this.init();
try {
// Add metadata to the event
const enrichedEvent = {
...event,
id: event.id || this._generateId(),
timestamp: event.timestamp || new Date().toISOString(),
storedAt: new Date().toISOString()
};
// Get today's events
const todayFilename = this._getTodayFilename();
let events = await this._readEventsFromFile(todayFilename);
// Add the new event
events.push(enrichedEvent);
// Write back to file
await this._writeEventsToFile(todayFilename, events);
return enrichedEvent.id;
} catch (error) {
throw new Error(`Failed to save event: error.message`);
}
}
async queryEvents(filter = {}, options = {}) {
if (!this.initialized) await this.init();
try {
const {
startDate, endDate, eventTypes, agentIds, limit = 100, offset = 0, sortBy = 'timestamp', sortOrder = 'desc'
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: [],
limit: 100, offset: 0, sortBy: 'timestamp', sortOrder: 'desc'
, ...filter, ...options };
// Determine which date ranges we need to check
const filesToCheck = this._getFilesInDateRange(startDate, endDate);
// Collect all matching events
let allEvents = [];
for (const filename of filesToCheck) {
const events = await this._readEventsFromFile(filename);
allEvents = [...allEvents, ...events];
}
// Apply filters
let filteredEvents = allEvents;
if (startDate) {
const start = new Date(startDate);
filteredEvents = filteredEvents.filter(event => new Date(event.timestamp) >= start);
}
if (endDate) {
const end = new Date(endDate);
filteredEvents = filteredEvents.filter(event => new Date(event.timestamp) <= end);
}
if (eventTypes.length > 0) {
filteredEvents = filteredEvents.filter(event => eventTypes.includes(event.type));
}
if (agentIds.length > 0) {
filteredEvents = filteredEvents.filter(event => agentIds.includes(event.agentId));
}
// Apply sorting
const reverse = sortOrder.toLowerCase() === 'desc';
filteredEvents.sort((a, b) => {
const aVal = new Date(a[sortBy]);
const bVal = new Date(b[sortBy]);
return reverse ? bVal - aVal : aVal - bVal;
});
// Apply pagination
const startIdx = offset;
const endIdx = offset + limit;
const paginatedEvents = filteredEvents.slice(startIdx, endIdx);
return paginatedEvents;
} catch (error) {
throw new Error(`Failed to query events: error.message`);
}
}
/**
* Get list of files to check based on date range
* @param {Date|null} startDate - Start of date range (inclusive)
* @param {Date|null} endDate - End of date range (inclusive)
* @returns {Array<string>} - List of filenames to check
*/
_getFilesInDateRange(startDate, endDate) {
const files = [];
// If no date range specified, just check today
if (!startDate && !endDate) {
return [this._getTodayFilename()];
}
const start = startDate ? new Date(startDate) : new Date(0);
const end = endDate ? new Date(endDate) : new Date();
// Clamp start to reasonable minimum (avoid checking years of empty files)
const clampedStart = new Date(Math.max(start.getTime(), Date.now() - (this.maxAgeDays + 1) * 24 * 60 * 60 * 1000));
let current = new Date(clampedStart);
while (current <= end) {
files.push(this._getFilenameForDate(new Date(current)));
current.setDate(current.getDate() + 1);
}
return files;
}
async getEventById(eventId) {
if (!this.initialized) await this.init();
try {
// We'll need to search through files to find the event
// For efficiency in production, you'd want an index, but for simplicity
// we'll search recent files first
const filesToCheck = this._getFilesInDateRange(
null,
new Date(Date.now() + 24 * 60 * 60 * 1000) // Today + 1 day to be safe
);
// Search from most recent to oldest (more likely to find recent events)
for (let i = filesToCheck.length - 1; i >= 0; i--) {
const filename = filesToCheck[i];
const events = await this._readEventsFromFile(filename);
const event = events.find(e => e.id === eventId);
if (event) {
return event;
}
}
return null; // Not found
} catch (error) {
throw new Error(`Failed to get event by ID: error.message`);
}
}
async updateEvent(eventId, updates) {
if (!this.initialized) await this.init();
try {
// Find which file contains the event
const filesToCheck = this._getFilesInDateRange(null, new Date());
for (const filename of filesToCheck) {
const filepath = path.join(this.dataDir, filename);
let events = await this._readEventsFromFile(filename);
const eventIndex = events.findIndex(e => e.id === eventId);
if (eventIndex !== -1) {
// Found the event, update it
events[eventIndex] = {
...events[eventIndex],
...updates,
updatedAt: new Date().toISOString()
};
// Write back to file
await this._writeEventsToFile(filename, events);
return true;
}
}
return false; // Event not found
} catch (error) {
throw new Error(`Failed to update event: error.message`);
}
}
async deleteEvents(filter = {}) {
if (!this.initialized) await this.init();
try {
let deletedCount = 0;
const {
startDate, endDate, eventTypes, agentIds
} = {
startDate: null, endDate: null, eventTypes: [], agentIds: []
, ...filter };
const filesToCheck = this._getFilesInDateRange(startDate, endDate);
for (const filename of filesToCheck) {
const filepath = path.join(this.dataDir, filename);
let events = await this._readEventsFromFile(filename);
const initialLength = events.length;
// Apply filters in reverse (so we can safely splice)
events = events.filter(event => {
let shouldDelete = true;
if (startDate && new Date(event.timestamp) < new Date(startDate)) {
shouldDelete = false;
}
if (endDate && new Date(event.timestamp) > new Date(endDate)) {
shouldDelete = false;
}
if (eventTypes.length > 0 && !eventTypes.includes(event.type)) {
shouldDelete = false;
}
if (agentIds.length > 0 && !agentIds.includes(event.agentId)) {
shouldDelete = false;
}
return !shouldDelete;
});
const deletedFromFile = initialLength - events.length;
deletedCount += deletedFromFile;
// Write back the filtered events
await this._writeEventsToFile(filename, events);
}
return deletedCount;
} catch (error) {
throw new Error(`Failed to delete events: error.message`);
}
}
async cleanupOld() {
if (!this.initialized) await this.init();
try {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - this.maxAgeDays);
// Read all files in the data directory
const files = await fs.readdir(this.dataDir);
const jsonFiles = files.filter(file =>
file.startsWith(this.filePrefix) && file.endsWith('.json')
);
let deletedCount = 0;
for (const filename of jsonFiles) {
// Extract date from filename
const dateStr = filename.substring(this.filePrefix.length, filename.length - 5); // Remove .json
const [year, month, day] = dateStr.split('-').map(Number);
const fileDate = new Date(year, month - 1, day);
// If file is older than cutoff, delete it
if (fileDate < cutoffDate) {
await fs.unlink(path.join(this.dataDir, filename));
deletedCount++;
}
}
return deletedCount;
} catch (error) {
throw new Error(`Failed to cleanup old files: error.message`);
}
}
async getStats() {
if (!this.initialized) await this.init();
try {
const files = await fs.readdir(this.dataDir);
const jsonFiles = files.filter(file =>
file.startsWith(this.filePrefix) && file.endsWith('.json')
);
let totalEvents = 0;
let totalSize = 0;
for (const filename of jsonFiles) {
const filepath = path.join(this.dataDir, filename);
const stats = await fs.stat(filepath);
totalSize += stats.size;
// Count events in file (efficiently for large files would require streaming)
try {
const data = await fs.readFile(filepath, 'utf8');
const events = JSON.parse(data);
totalEvents += Array.isArray(events) ? events.length : 0;
} catch (e) {
// If we can't parse JSON, skip counting events for this file
}
}
return {
backend: 'json-file',
dataDir: this.dataDir,
fileCount: jsonFiles.length,
totalEvents: totalEvents,
totalSizeBytes: totalSize,
maxAgeDays: this.maxAgeDays,
oldestFile: null, // Would need to scan to find actual oldest
newestFile: null // Would need to scan to find actual newest
};
} catch (error) {
throw new Error(`Failed to get stats: error.message`);
}
}
async close() {
// JSON file storage doesn't keep open connections
this.initialized = false;
}
/**
* Generate a simple unique ID
* @returns {string} - Unique ID
*/
_generateId() {
return Math.random().toString(36).substr(2, 9) +
Date.now().toString(36) +
Math.random().toString(36).substr(2, 9);
}
}
module.exports = { JsonFileStorage };