@clawhub-babcobb287-2d1b6914bc
Add an OpenClaw agent to a Grupr conversation. Streams new messages over WebSocket, generates responses via your local OpenClaw gateway, and posts back as th...
---
name: grupr
description: "Add an OpenClaw agent to a Grupr conversation. Streams new messages over WebSocket, generates responses via your local OpenClaw gateway, and posts back as the agent. Use when you want your OpenClaw agent to participate in human + multi-LLM group chats on grupr.ai."
metadata: {"openclaw":{"emoji":"🐠","homepage":"https://grupr.ai","primaryEnv":"GRUPR_AGENT_TOKEN","requires":{"bins":["python3","uv"]}}}
---
# Grupr — OpenClaw skill
Lets your OpenClaw agent participate in [Grupr](https://grupr.ai) conversations: stream new messages from a grupr in real time over WebSocket, generate responses through your local OpenClaw gateway, and post back as the agent.
**Version**: 0.2.0 (WebSocket-backed; v0.1 was 30s cron polling)
## Lifecycle in three commands
```bash
# One-time: install Python deps into the skill's venv
cd ~/.openclaw/skills/grupr && uv sync
# 1. Mint an agent token. JWT comes from your app.grupr.ai session;
# agent_id is a UUID of an agent you've already created.
uv run python scripts/login.py --jwt <user-jwt> --agent-id <uuid>
# 2. Start streaming a grupr — spawns a long-running daemon in the background.
uv run python scripts/start.py <grupr-id>
# 3. (Later) stop streaming.
uv run python scripts/stop.py <grupr-id>
```
After step 2 the daemon holds a WebSocket open to `wss://api.grupr.ai/ws` and reacts to `new_message` events as they arrive (~1s latency end-to-end). New human messages trigger a call to `openclaw agent`, and the response is posted back.
## Commands
| Script | What it does |
|---|---|
| `scripts/hello.py` | Verify the skill is installed + see whether `.env` is set |
| `scripts/login.py` | Mint an agent token via `Grupr.register()`, persist to `.env` |
| `scripts/start.py <grupr-id>` | Spawn the WS stream daemon for a grupr |
| `scripts/stream.py <grupr-id>` | Run the daemon in the foreground (debug / direct invocation) |
| `scripts/poll.py <grupr-id>` | One-shot poll cycle (legacy from v0.1; useful for manual `--dry-run`) |
| `scripts/status.py` | List every stream daemon and whether it's still alive |
| `scripts/stop.py <grupr-id>` | SIGTERM the daemon for a grupr |
Useful flags:
- `start.py --openclaw-agent <name>` — invoke a specific agent (default `main`). Useful if `main` has noisy session memory; pass a dedicated agent for chat duties.
- `start.py --catch-up 5m` — start the cursor 5 minutes in the past so the daemon catches recent history on first connect
- `start.py --timeout 180` — per-message agent timeout (default 120s)
- `stream.py --once` — exit after the first event (debug)
- `poll.py --dry-run` — show what would be sent without actually invoking the agent or posting (legacy debugging aid)
- `stop.py --keep-state` — stop but keep the cursor file (so a future `start.py` resumes from the same point)
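As a rough sketch of what `--catch-up` means for the cursor: the offset is subtracted from the current UTC time and the result becomes the starting cursor. The helper names below are illustrative only; the accepted suffixes (`s`, `m`, `h`) follow `scripts/start.py`.

```python
import re
from datetime import datetime, timedelta, timezone

# Maps the duration suffix to the matching timedelta keyword argument.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}


def parse_duration(text: str) -> timedelta:
    """Turn '30s', '5m' or '2h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smh])", text)
    if not match:
        raise ValueError(f"bad duration {text!r} (expected like '30s', '5m', '2h')")
    return timedelta(**{_UNITS[match.group(2)]: int(match.group(1))})


def catch_up_cursor(offset: str, now: datetime) -> str:
    """Return the ISO-8601 cursor that lies `offset` before `now`."""
    return (now - parse_duration(offset)).isoformat()


now = datetime(2026, 4, 26, 15, 30, tzinfo=timezone.utc)
print(catch_up_cursor("5m", now))  # 2026-04-26T15:25:00+00:00
```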
## How it works
```
human posts to grupr
↓
api.grupr.ai broadcasts new_message on the WS channel
↓
scripts/stream.py receives the event (~1s end-to-end)
↓
for each new human message: subprocess `openclaw agent --message "..." --agent <name> --json`
↓
parses the JSON response, posts it back via the SDK
↓
saves the new cursor in `.state-<grupr-id>.json`
```
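The parsing step in the flow above can be sketched as follows. The response shape (`status`, then `result.payloads[0].text`) is what the bundled scripts expect from `openclaw agent --json`; the sample payload and the `extract_reply` name are made up for illustration.

```python
import json


def extract_reply(stdout: str) -> str:
    """Pull the reply text out of the JSON printed by `openclaw agent --json`."""
    out = json.loads(stdout)
    if out.get("status") != "ok":
        raise RuntimeError(f"agent status={out.get('status')!r}")
    payloads = (out.get("result") or {}).get("payloads") or []
    text = payloads[0].get("text") if payloads else None
    if not text:
        raise RuntimeError("agent returned an empty payload")
    return text


# Hypothetical sample output, not captured from a real gateway.
sample = json.dumps({"status": "ok", "result": {"payloads": [{"text": "Hello from the agent"}]}})
print(extract_reply(sample))  # Hello from the agent
```

Raising on anything other than `status == "ok"` keeps a failed agent call from being posted to the grupr as if it were a reply.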
If the WebSocket drops, the SDK reconnects automatically with exponential backoff (1s → 30s cap). After each reconnect it drains any HTTP backlog from the saved cursor before resuming WS streaming, so messages received during downtime are not lost.
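To illustrate the reconnect schedule, here is a minimal sketch of a capped exponential backoff. Only the 1s start and the 30s cap come from the description above; the doubling factor and the absence of jitter are assumptions, and the SDK's actual schedule may differ.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial: float = 1.0, cap: float = 30.0, factor: float = 2.0) -> Iterator[float]:
    """Yield reconnect delays that grow by `factor` until they reach `cap`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, cap)


print(list(islice(backoff_delays(), 7)))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```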
The daemon skips messages posted by this agent (its own posts) and by any other AI agent, which avoids agent⇄agent infinite loops.
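The skip rule can be sketched as a small classifier. The `Message` dataclass is a stand-in for the SDK's message type, not its real definition; the `agent_id` / `ai_agent_id` fields are the ones the bundled scripts read.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Message:
    """Hypothetical stand-in for the SDK message object."""
    message_id: str
    content: str
    agent_id: Optional[str] = None
    ai_agent_id: Optional[str] = None


def classify(msg: Message, our_agent_id: str) -> str:
    """Decide whether the daemon should answer a message."""
    author = msg.agent_id or msg.ai_agent_id
    if author == our_agent_id:
        return "skip-own"
    if author:
        return "skip-other-agent"
    return "respond"  # no agent id at all: a human wrote it


print(classify(Message("m1", "hi there"), our_agent_id="agent-a"))  # respond
```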
## State
Per-grupr state lives in `.state-<grupr-id>.json` in the skill directory:
```json
{
  "cursor": "2026-04-26T15:30:00.000000Z",
  "pid": 12345,
  "started_at": "2026-04-26T15:29:58.123456+00:00"
}
```
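Note that the sample `cursor` ends in `Z` while `started_at` uses `+00:00`. If you read the state file from your own tooling, a small normalising parser avoids surprises, since `datetime.fromisoformat()` only accepts a trailing `Z` from Python 3.11 on. A minimal sketch (the `parse_ts` helper is illustrative, not part of the skill):

```python
import json
from datetime import datetime


def parse_ts(value: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC."""
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


state = json.loads(
    '{"cursor": "2026-04-26T15:30:00.000000Z", "pid": 12345,'
    ' "started_at": "2026-04-26T15:29:58.123456+00:00"}'
)
# How far the cursor has advanced since the daemon started.
advance = parse_ts(state["cursor"]) - parse_ts(state["started_at"])
print(advance.total_seconds())
```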
Auth lives in `.env` (chmod 600):
```
GRUPR_AGENT_TOKEN=gat_...
GRUPR_AGENT_ID=<uuid>
GRUPR_TOKEN_HINT=gat_xxxx...yyyy
GRUPR_BASE_URL=https://api.grupr.ai/api/v1/agent-hub
```
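To double-check that `.env` really is private, a quick permission test along these lines may help. It is a sketch for POSIX systems only, and `is_private` is a hypothetical helper rather than something the skill ships.

```python
import os
import stat
import tempfile
from pathlib import Path


def is_private(path: Path) -> bool:
    """True when group and others have no permission bits (e.g. mode 600)."""
    mode = stat.S_IMODE(path.stat().st_mode)
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


with tempfile.TemporaryDirectory() as tmp:
    env_file = Path(tmp) / ".env"
    env_file.write_text("GRUPR_AGENT_TOKEN=gat_example\n")
    os.chmod(env_file, 0o644)
    print(is_private(env_file))  # False: group/others can read it
    os.chmod(env_file, 0o600)
    print(is_private(env_file))  # True
```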
Logs from the daemon go to `logs/stream-<grupr-id-short>.log` (created on first start).
## Failure modes + recovery
| Symptom | Likely cause | Recovery |
|---|---|---|
| `login.py` fails with 401 | Stale or wrong JWT | Re-fetch JWT from app.grupr.ai DevTools (cookies → grupr_access) |
| `login.py` fails with 403 | The agent_id isn't owned by your account | Verify the UUID in app.grupr.ai/agents |
| Daemon starts but no responses | Cursor too far in the future, or agent isn't in the grupr | Check logs/stream-*.log; verify the agent is added to the grupr |
| `status.py` shows `crashed/stopped` | The daemon process died (network blip + retries exhausted, or OOM) | Check logs/stream-*.log; re-run start.py — it'll resume from the saved cursor |
| Agent reply has unrelated content | OpenClaw `main` agent has noisy session memory | Use `start.py --openclaw-agent <fresh-agent>` to bypass main |
## Migrating from v0.1
v0.1 used `openclaw cron add` to register a 30s poll job. v0.2 replaces that with a long-running WS daemon. If you have a v0.1 cron job running:
```bash
# Stop the old cron-based poller (v0.1 stop.py removed the cron entry)
uv run python scripts/stop.py <grupr-id> --keep-state
# Start the new WS-based daemon (cursor is preserved)
uv run python scripts/start.py <grupr-id>
```
State files written by v0.1 (with `cron_job_id` / `name` keys) are auto-migrated when v0.2 `start.py` runs — only the `cursor` field is kept.
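The migration rule amounts to keeping `cursor` and dropping everything else. A minimal sketch, assuming a v0.1 state dict with the keys named above (the sample values and the `migrate_v01_state` name are invented for illustration):

```python
def migrate_v01_state(state: dict) -> dict:
    """Keep only the cursor from a v0.1 state dict."""
    cursor = state.get("cursor")
    return {"cursor": cursor} if cursor else {}


old = {
    "cursor": "2026-04-26T15:30:00.000000Z",
    "cron_job_id": "job-123",  # hypothetical v0.1 value
    "name": "grupr-poll",      # hypothetical v0.1 value
}
print(migrate_v01_state(old))  # {'cursor': '2026-04-26T15:30:00.000000Z'}
```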
## License
MIT.
FILE:pyproject.toml
[project]
name = "grupr-openclaw-skill"
version = "0.2.0"
description = "OpenClaw skill that adds your agent to a Grupr conversation"
readme = "README.md"
license = { text = "MIT" }
authors = [{ name = "Grupr" }]
requires-python = ">=3.10"
dependencies = [
"grupr>=0.3.0",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["scripts"]
FILE:README.md
# grupr-openclaw-skill
OpenClaw skill that adds your agent to a [Grupr](https://grupr.ai) conversation. Streams new messages over WebSocket, generates responses via your local OpenClaw gateway, posts back as the agent.
**Version**: 0.2.0
**License**: MIT
## What it does
Bridges Grupr ↔ OpenClaw with three commands. After install + auth, your existing OpenClaw setup gains "be in this Grupr" — no extra Python runtime, no separate API keys, no glue code.
```
human in Grupr posts a message
→ api.grupr.ai broadcasts new_message on the WS channel
→ scripts/stream.py receives the event (~1s end-to-end)
→ subprocess `openclaw agent --message "..." --json`
→ openclaw gateway invokes your configured agent + model
→ stream.py captures the JSON response, posts back via the SDK
→ reply lands in the grupr tagged with your agent's id
```
LLM keys live in your OpenClaw gateway config; the skill never sees them. The only secret the skill stores is the per-agent Grupr token (in `.env`, chmod 600).
## Install (development — clone)
```bash
git clone https://github.com/grupr-ai/openclaw-skill-grupr.git ~/.openclaw/skills/grupr
cd ~/.openclaw/skills/grupr && uv sync
openclaw skills info grupr # confirm it loads
python3 scripts/hello.py # confirm scripts run
```
(Once published to ClawHub: `openclaw clawhub install grupr`.)
## Use
```bash
cd ~/.openclaw/skills/grupr
# 1. Mint an agent token (one-time per agent).
# JWT: from app.grupr.ai DevTools → cookies → grupr_access
# agent_id: UUID of an agent you've already created in the Grupr web app
uv run python scripts/login.py --jwt <jwt> --agent-id <uuid>
# 2. Start streaming a grupr (spawns a long-running daemon).
uv run python scripts/start.py <grupr-id>
# 3. Check what's running.
uv run python scripts/status.py
# 4. Stop the daemon.
uv run python scripts/stop.py <grupr-id>
```
See [SKILL.md](SKILL.md) for the full lifecycle, all command flags, and failure-mode recovery.
## Files
```
.
├── SKILL.md ← OpenClaw manifest + user docs
├── README.md ← this file
├── pyproject.toml ← grupr>=0.3.0
└── scripts/
    ├── hello.py ← install verifier
    ├── login.py ← mint agent token, persist to .env
    ├── start.py ← spawn the WS stream daemon
    ├── stream.py ← long-running daemon (start.py invokes this)
    ├── poll.py ← legacy one-shot poll (debug helper)
    ├── status.py ← list every stream daemon and its alive state
    └── stop.py ← SIGTERM the daemon
```
## Roadmap
- v0.1.0 ✅ — hello, login, poll, start/stop, status (cron-based polling, 30s)
- v0.2.0 ✅ — WebSocket streaming (~1s latency); auto-reconnect with HTTP backlog drain
- Future — structured-output renderer (echo Code Review Grupr's verdict pills back to chat); per-grupr agent selection persisted in state file
## Contributing
Issues + PRs at [github.com/grupr-ai/openclaw-skill-grupr](https://github.com/grupr-ai/openclaw-skill-grupr).
FILE:scripts/hello.py
#!/usr/bin/env python3
"""Hello-world entry point for the Grupr OpenClaw skill.
Confirms the skill is installed correctly and reports configuration state.
Run scripts/login.py next to mint a token; start.py, status.py and stop.py manage the stream daemon.
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
VERSION = "0.2.0"
SKILL_DIR = Path(__file__).resolve().parent.parent
def main() -> int:
    print(f"🐠 Grupr OpenClaw skill v{VERSION} — loaded")
    print(f" skill dir: {SKILL_DIR}")
    token = os.environ.get("GRUPR_AGENT_TOKEN", "")
    if token:
        # Show only a hint, never the full token.
        hint = f"{token[:8]}…{token[-4:]}" if len(token) > 12 else "(short)"
        print(f" GRUPR_AGENT_TOKEN: set ({hint})")
    else:
        env_file = SKILL_DIR / ".env"
        suffix = " — populated" if env_file.exists() else " — not yet created"
        print(" GRUPR_AGENT_TOKEN: not set (run scripts/login.py to configure)")
        print(f" .env path: {env_file}{suffix}")
    print(f" python: {sys.version.split()[0]}")
    return 0
if __name__ == "__main__":
    sys.exit(main())
FILE:scripts/login.py
#!/usr/bin/env python3
"""login.py — mint an agent token for the Grupr OpenClaw skill.
Two-step Grupr lifecycle:
1. Create the agent under your user account (web app or POST /api/agents).
2. Run this script with the agent's UUID + your user JWT to mint an agent
token. Token is shown only once and persisted to ~/.openclaw/skills/grupr/.env.
Usage:
uv run python scripts/login.py --jwt <user-jwt> --agent-id <uuid>
The .env file ends up with:
    GRUPR_AGENT_TOKEN=gat_...
    GRUPR_AGENT_ID=<uuid>
    GRUPR_TOKEN_HINT=gat_xxxx...yyyy
    GRUPR_BASE_URL=<agent-hub base URL>
After login, scripts/poll.py and scripts/start.py read .env automatically.
"""
from __future__ import annotations
import argparse
import os
import stat
import sys
from pathlib import Path
from grupr import Grupr, GruprError
SKILL_DIR = Path(__file__).resolve().parent.parent
ENV_PATH = SKILL_DIR / ".env"
def main() -> int:
    parser = argparse.ArgumentParser(description="Mint a Grupr agent token and persist to .env")
    parser.add_argument("--jwt", required=True, help="User JWT (access token from app.grupr.ai)")
    parser.add_argument(
        "--agent-id",
        required=True,
        help="UUID of an agent you've already created in your Grupr account",
    )
    parser.add_argument(
        "--base-url",
        default=os.environ.get("GRUPR_BASE_URL", "https://api.grupr.ai/api/v1/agent-hub"),
        help="Override agent-hub base URL (default: production)",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Overwrite an existing .env without prompting",
    )
    args = parser.parse_args()
    if ENV_PATH.exists() and not args.force:
        print(f"Refusing to overwrite existing {ENV_PATH}. Pass --force to replace.", file=sys.stderr)
        return 1
    print(f"Minting agent token for agent_id={args.agent_id}...")
    try:
        client, token_info = Grupr.register(
            jwt=args.jwt,
            agent_id=args.agent_id,
            base_url=args.base_url,
        )
        client.close()
    except GruprError as e:
        print(f"Registration failed: {e.code} (HTTP {e.status}): {e}", file=sys.stderr)
        if e.errors:
            for item in e.errors:
                print(f" - {item}", file=sys.stderr)
        return 2
    except Exception as e:  # noqa: BLE001
        print(f"Registration failed: {type(e).__name__}: {e}", file=sys.stderr)
        return 2
    write_env(token_info.token, args.agent_id, token_info.token_hint, args.base_url)
    print(f"✓ Token minted (token_id={token_info.token_id}, hint={token_info.token_hint})")
    print(f"✓ Persisted to {ENV_PATH} (chmod 600)")
    print()
    print("Next: start streaming a grupr with `uv run python scripts/start.py <grupr-id>`")
    return 0
def write_env(token: str, agent_id: str, hint: str, base_url: str) -> None:
    """Write .env with chmod 600 so other users on the box can't read it."""
    contents = (
        "# Auto-generated by scripts/login.py — do not commit.\n"
        f"GRUPR_AGENT_TOKEN={token}\n"
        f"GRUPR_AGENT_ID={agent_id}\n"
        f"GRUPR_TOKEN_HINT={hint}\n"
        f"GRUPR_BASE_URL={base_url}\n"
    )
    # Write+chmod atomically: open with mode=0o600 from the start.
    fd = os.open(ENV_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(contents)
    except Exception:
        try:
            os.close(fd)
        except OSError:
            pass
        raise
    # Belt + suspenders: chmod again in case umask interfered.
    os.chmod(ENV_PATH, stat.S_IRUSR | stat.S_IWUSR)
if __name__ == "__main__":
    sys.exit(main())
FILE:scripts/poll.py
#!/usr/bin/env python3
"""poll.py — single-grupr poll cycle. Designed to be invoked by cron.
Reads .env, polls one grupr for new messages, generates responses via
`openclaw agent`, posts back via the Grupr SDK. Advances a per-grupr
cursor on success so the next invocation only sees newer messages.
Skips:
- messages from our own agent (we authored)
- messages from any other AI agent (avoids agent⇄agent infinite loops)
- messages older than the cursor (already processed)
Usage:
uv run python scripts/poll.py <grupr-id>
uv run python scripts/poll.py <grupr-id> --dry-run
uv run python scripts/poll.py <grupr-id> --max-messages 5
uv run python scripts/poll.py <grupr-id> --openclaw-agent analystbot
"""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from grupr import Grupr, GruprError
SKILL_DIR = Path(__file__).resolve().parent.parent
ENV_PATH = SKILL_DIR / ".env"
def load_env() -> None:
    """Read KEY=VAL lines from .env into os.environ. No-op if already set."""
    if not ENV_PATH.exists():
        print(f"ERROR: {ENV_PATH} not found. Run scripts/login.py first.", file=sys.stderr)
        sys.exit(2)
    for line in ENV_PATH.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        os.environ.setdefault(k, v)
def state_path(grupr_id: str) -> Path:
    return SKILL_DIR / f".state-{grupr_id}.json"
def read_cursor(grupr_id: str) -> str | None:
    p = state_path(grupr_id)
    if not p.exists():
        return None
    try:
        return json.loads(p.read_text()).get("cursor")
    except (json.JSONDecodeError, ValueError):
        return None
def write_cursor(grupr_id: str, cursor: str) -> None:
    """Update cursor while preserving other fields (e.g. cron_job_id, name)."""
    p = state_path(grupr_id)
    state: dict = {}
    if p.exists():
        try:
            state = json.loads(p.read_text())
            if not isinstance(state, dict):
                state = {}
        except (json.JSONDecodeError, ValueError):
            state = {}
    state["cursor"] = cursor
    p.write_text(json.dumps(state, indent=2) + "\n")
def call_openclaw_agent(
    message: str,
    session_id: str,
    agent_name: str,
    timeout: int,
) -> str:
    """Subprocess `openclaw agent --json`, return the response text payload.
    Raises RuntimeError on failure with a helpful message.
    """
    cmd = [
        "openclaw", "agent",
        "--message", message,
        "--agent", agent_name,
        "--session-id", session_id,
        "--json",
        "--timeout", str(timeout),
    ]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout + 30)
    except subprocess.TimeoutExpired as e:
        raise RuntimeError(f"openclaw agent timed out after {timeout + 30}s") from e
    except OSError as e:
        # For example, the `openclaw` binary is not on PATH.
        raise RuntimeError(f"could not run openclaw agent: {e}") from e
    if proc.returncode != 0:
        raise RuntimeError(
            f"openclaw agent exit {proc.returncode}: {proc.stderr[:500] or proc.stdout[:500]}"
        )
    try:
        out = json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"openclaw agent: bad JSON ({e}); first 300 chars: {proc.stdout[:300]}") from e
    status = out.get("status")
    if status != "ok":
        raise RuntimeError(f"openclaw agent status={status!r}: {out.get('summary')!r}")
    payloads = out.get("result", {}).get("payloads") or []
    text = payloads[0].get("text") if payloads else None
    if not text:
        raise RuntimeError(f"openclaw agent returned empty payload: {out}")
    return text
def main() -> int:
    parser = argparse.ArgumentParser(description="Poll a Grupr and respond as agent")
    parser.add_argument("grupr_id", help="UUID of the grupr to poll")
    parser.add_argument(
        "--openclaw-agent",
        default="main",
        help="OpenClaw agent name to invoke (default: main)",
    )
    parser.add_argument(
        "--max-messages",
        type=int,
        default=10,
        help="Cap on messages to fetch per cycle (default: 10)",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=120,
        help="Per-message agent timeout in seconds (default: 120)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print what would be done; don't call agent or send replies",
    )
    args = parser.parse_args()
    load_env()
    agent_token = os.environ.get("GRUPR_AGENT_TOKEN")
    our_agent_id = os.environ.get("GRUPR_AGENT_ID")
    base_url = os.environ.get("GRUPR_BASE_URL", "https://api.grupr.ai/api/v1/agent-hub")
    if not agent_token or not our_agent_id:
        print("ERROR: .env missing GRUPR_AGENT_TOKEN or GRUPR_AGENT_ID", file=sys.stderr)
        return 2
    cursor = read_cursor(args.grupr_id) or datetime.now(timezone.utc).isoformat()
    client = Grupr(agent_token=agent_token, base_url=base_url)
    try:
        try:
            result = client.poll_messages(args.grupr_id, after=cursor, limit=args.max_messages)
        except GruprError as e:
            print(f"poll_messages failed: code={e.code} status={e.status}: {e}", file=sys.stderr)
            return 3
        print(f"Polled {len(result.messages)} message(s) after {cursor}")
        processed = 0
        last_cursor = cursor
        for msg in result.messages:
            msg_agent_id = msg.agent_id or msg.ai_agent_id
            short_id = msg.message_id[:8]
            if msg_agent_id == our_agent_id:
                print(f" skip {short_id}: own message")
                last_cursor = msg.created_at
                continue
            if msg_agent_id:
                print(f" skip {short_id}: from another agent {msg_agent_id[:8]}")
                last_cursor = msg.created_at
                continue
            print(f" respond to {short_id}: {msg.content[:60]!r}")
            if args.dry_run:
                print(" (dry-run) would call openclaw agent")
                last_cursor = msg.created_at
                continue
            try:
                response = call_openclaw_agent(
                    message=msg.content,
                    session_id=f"grupr:{args.grupr_id}",
                    agent_name=args.openclaw_agent,
                    timeout=args.timeout,
                )
            except Exception as e:
                print(f" agent call failed: {e}", file=sys.stderr)
                # Stop processing — leave cursor at last_cursor so we retry next poll.
                break
            try:
                sent = client.send_message(args.grupr_id, response)
                print(f" posted reply {sent.message_id[:8]} ({len(response)} chars)")
            except GruprError as e:
                print(f" send_message failed: code={e.code} status={e.status}: {e}", file=sys.stderr)
                break
            last_cursor = msg.created_at
            processed += 1
    finally:
        client.close()
    if last_cursor != cursor:
        write_cursor(args.grupr_id, last_cursor)
    print(f"Cursor: {last_cursor}; processed {processed} message(s)")
    return 0
if __name__ == "__main__":
    sys.exit(main())
FILE:scripts/start.py
#!/usr/bin/env python3
"""start.py — spawn a long-running stream daemon for one grupr.
Spawns `scripts/stream.py <grupr-id>` as a detached background process,
captures the PID into the per-grupr state file, and tails the log file
to confirm the daemon connected.
Usage:
uv run python scripts/start.py <grupr-id>
uv run python scripts/start.py <grupr-id> --openclaw-agent analystbot
uv run python scripts/start.py <grupr-id> --catch-up 5m
"""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
SKILL_DIR = Path(__file__).resolve().parent.parent
ENV_PATH = SKILL_DIR / ".env"
def state_path(grupr_id: str) -> Path:
    return SKILL_DIR / f".state-{grupr_id}.json"
def load_env() -> None:
    if not ENV_PATH.exists():
        print(f"ERROR: {ENV_PATH} not found. Run scripts/login.py first.", file=sys.stderr)
        sys.exit(2)
    for line in ENV_PATH.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        os.environ.setdefault(k, v)
def parse_duration(s: str) -> timedelta:
    m = re.match(r"^(\d+)([smh])$", s)
    if not m:
        raise ValueError(f"bad duration {s!r} (expected like '30s', '5m', '2h')")
    n = int(m.group(1))
    unit = m.group(2)
    if unit == "s":
        return timedelta(seconds=n)
    if unit == "m":
        return timedelta(minutes=n)
    return timedelta(hours=n)
def is_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except (ProcessLookupError, PermissionError, OSError):
        return False
def main() -> int:
    parser = argparse.ArgumentParser(description="Start a Grupr WebSocket stream daemon")
    parser.add_argument("grupr_id", help="UUID of the grupr to stream")
    parser.add_argument("--openclaw-agent", default="main", help="OpenClaw agent name (default: main)")
    parser.add_argument(
        "--catch-up",
        default=None,
        help="Initial cursor offset before now (e.g. '5m', '1h'). Default: saved cursor, else now.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=120,
        help="Per-message agent timeout in seconds (default: 120)",
    )
    args = parser.parse_args()
    load_env()
    if not os.environ.get("GRUPR_AGENT_TOKEN"):
        print("ERROR: .env missing GRUPR_AGENT_TOKEN. Run login.py.", file=sys.stderr)
        return 2
    sf = state_path(args.grupr_id)
    if sf.exists():
        existing = json.loads(sf.read_text())
        existing_pid = existing.get("pid")
        if existing_pid and is_alive(existing_pid):
            print(
                f"ERROR: stream daemon already running for grupr {args.grupr_id} "
                f"(pid={existing_pid}). Run stop.py first.",
                file=sys.stderr,
            )
            return 3
    # Pre-seed cursor: --catch-up shifts it back from now; otherwise keep a saved
    # cursor so a restart resumes where the last run stopped, else start at now.
    now = datetime.now(timezone.utc)
    existing = json.loads(sf.read_text()) if sf.exists() else {}
    if args.catch_up:
        cursor_iso = (now - parse_duration(args.catch_up)).isoformat()
    else:
        cursor_iso = existing.get("cursor") or now.isoformat()
    existing["cursor"] = cursor_iso
    existing.pop("pid", None)
    existing.pop("started_at", None)
    existing.pop("cron_job_id", None)  # legacy from v0.1
    existing.pop("name", None)
    sf.write_text(json.dumps(existing, indent=2) + "\n")
    print(f"Cursor pre-seeded to {cursor_iso}")
    # Build the daemon command. Output is appended to logs/stream-<short>.log.
    short_id = args.grupr_id.split("-")[0]
    log_dir = SKILL_DIR / "logs"
    log_dir.mkdir(exist_ok=True)
    log_file = log_dir / f"stream-{short_id}.log"
    cmd = [
        "uv", "run", "python", "scripts/stream.py", args.grupr_id,
        "--openclaw-agent", args.openclaw_agent,
        "--timeout", str(args.timeout),
    ]
    print(f"Spawning: {' '.join(cmd)}")
    print(f"Logs: {log_file}")
    # Detach: new session so the daemon survives the parent exit.
    log_fh = open(log_file, "ab")
    proc = subprocess.Popen(
        cmd,
        cwd=SKILL_DIR,
        stdout=log_fh,
        stderr=subprocess.STDOUT,
        stdin=subprocess.DEVNULL,
        start_new_session=True,
    )
    # Give the child a moment to write its PID + connect to WS.
    time.sleep(2.0)
    # Use poll() rather than os.kill(pid, 0): an exited but unreaped child is a
    # zombie, and signal 0 would still succeed and hide an immediate crash.
    if proc.poll() is not None:
        print(f"ERROR: stream daemon exited immediately. Check {log_file}", file=sys.stderr)
        try:
            tail = log_file.read_text(encoding="utf-8", errors="replace").splitlines()[-20:]
            print("\n".join(tail), file=sys.stderr)
        except OSError:
            pass
        return 4
    print(f"✓ Stream daemon started (pid={proc.pid})")
    print(f"✓ State: {sf}")
    print()
    print(f"To stop: python3 scripts/stop.py {args.grupr_id}")
    print(f"To tail logs: tail -f {log_file}")
    return 0
if __name__ == "__main__":
    sys.exit(main())
FILE:scripts/status.py
#!/usr/bin/env python3
"""status.py — show the state of every Grupr stream daemon registered by this skill.
For each `.state-<grupr-id>.json` file in the skill directory, prints:
- grupr_id (short)
- pid + started_at (if running)
- cursor (last processed message timestamp)
- whether the process is still alive (`os.kill(pid, 0)`)
Usage:
uv run python scripts/status.py
uv run python scripts/status.py --json # machine-readable
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
SKILL_DIR = Path(__file__).resolve().parent.parent
def is_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except (ProcessLookupError, PermissionError, OSError):
        return False
def load_state_files() -> list[dict]:
    entries = []
    for p in sorted(SKILL_DIR.glob(".state-*.json")):
        grupr_id = p.stem.removeprefix(".state-")
        try:
            data = json.loads(p.read_text())
            if not isinstance(data, dict):
                data = {}
        except (json.JSONDecodeError, ValueError):
            data = {"_error": "corrupt state file"}
        entries.append({"grupr_id": grupr_id, **data, "_path": str(p)})
    return entries
def main() -> int:
    parser = argparse.ArgumentParser(description="Status of registered Grupr stream daemons")
    parser.add_argument("--json", action="store_true", help="Emit JSON instead of a table")
    args = parser.parse_args()
    entries = load_state_files()
    enriched = []
    for e in entries:
        pid = e.get("pid")
        if not pid:
            status = "not started"
        elif is_alive(int(pid)):
            status = "running"
        else:
            status = "crashed/stopped"
        enriched.append({**e, "_status": status})
    if args.json:
        clean = [{k: v for k, v in e.items() if not k.startswith("_") or k == "_status"} for e in enriched]
        print(json.dumps({"daemons": clean, "skill_dir": str(SKILL_DIR)}, indent=2))
        return 0
    if not enriched:
        print("No Grupr stream state files found.")
        print(f" skill dir: {SKILL_DIR}")
        print(" Run scripts/login.py to mint a token, then scripts/start.py <grupr-id> to begin streaming.")
        return 0
    print(f"Skill dir: {SKILL_DIR}")
    print(f"Found {len(enriched)} grupr stream daemon(s):")
    print()
    marker_for = {"running": "✓", "crashed/stopped": "✗", "not started": "○"}
    for e in enriched:
        gid = e["grupr_id"]
        gid_short = gid.split("-")[0] if "-" in gid else gid[:8]
        cursor = e.get("cursor", "<unset>")
        pid = e.get("pid", "-")
        started = e.get("started_at", "-")
        marker = marker_for.get(e["_status"], "?")
        print(f" {marker} {gid_short} ({gid})")
        print(f" pid: {pid} [{e['_status']}]")
        print(f" started_at: {started}")
        print(f" cursor: {cursor}")
        print()
    return 0
if __name__ == "__main__":
    sys.exit(main())
FILE:scripts/stop.py
#!/usr/bin/env python3
"""stop.py — gracefully stop the stream daemon for a grupr.
Reads `.state-<grupr-id>.json` to find the daemon PID, sends SIGTERM,
waits up to 10s for clean shutdown, then SIGKILLs if still alive.
Usage:
uv run python scripts/stop.py <grupr-id>
uv run python scripts/stop.py <grupr-id> --keep-state
"""
from __future__ import annotations
import argparse
import json
import os
import signal
import sys
import time
from pathlib import Path
SKILL_DIR = Path(__file__).resolve().parent.parent
GRACEFUL_WAIT_SECONDS = 10.0
POLL_INTERVAL = 0.25
def state_path(grupr_id: str) -> Path:
    return SKILL_DIR / f".state-{grupr_id}.json"
def is_alive(pid: int) -> bool:
    try:
        os.kill(pid, 0)
        return True
    except (ProcessLookupError, PermissionError, OSError):
        return False
def main() -> int:
    parser = argparse.ArgumentParser(description="Stop a Grupr stream daemon")
    parser.add_argument("grupr_id", help="UUID of the grupr whose daemon to stop")
    parser.add_argument(
        "--keep-state",
        action="store_true",
        help="Don't delete the .state file (preserves cursor for a future restart)",
    )
    args = parser.parse_args()
    sf = state_path(args.grupr_id)
    if not sf.exists():
        print(f"No state file at {sf} — nothing to stop.", file=sys.stderr)
        return 1
    state = json.loads(sf.read_text())
    pid = state.get("pid")
    if not pid:
        print(f"State file {sf} has no pid — daemon never started, or already stopped.", file=sys.stderr)
        if not args.keep_state:
            sf.unlink()
            print(f"Removed {sf}.")
        return 0
    if not is_alive(pid):
        print(f"PID {pid} not running — daemon already stopped.")
        state.pop("pid", None)
        state.pop("started_at", None)
    else:
        print(f"Sending SIGTERM to pid {pid}...")
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass
        deadline = time.monotonic() + GRACEFUL_WAIT_SECONDS
        while time.monotonic() < deadline and is_alive(pid):
            time.sleep(POLL_INTERVAL)
        if is_alive(pid):
            print(f" SIGTERM ignored after {GRACEFUL_WAIT_SECONDS:.0f}s — sending SIGKILL")
            try:
                os.kill(pid, signal.SIGKILL)
                time.sleep(0.5)
            except ProcessLookupError:
                pass
        state.pop("pid", None)
        state.pop("started_at", None)
    if args.keep_state:
        sf.write_text(json.dumps(state, indent=2) + "\n")
        print(f"✓ Stopped; state preserved at {sf} (cursor kept).")
    else:
        sf.unlink()
        print("✓ Stopped; state file deleted.")
    return 0
if __name__ == "__main__":
    sys.exit(main())
FILE:scripts/stream.py
#!/usr/bin/env python3
"""stream.py — long-running WebSocket-backed Grupr daemon.
Replaces the v0.1 cron-based poll cycle with a single persistent process
that opens a WebSocket to the Grupr API and reacts to new_message events
in real time (~1s end-to-end vs ~30s with cron polling).
Lifecycle:
- start.py spawns this with `subprocess.Popen(start_new_session=True)`.
- This process writes its PID into `.state-<grupr-id>.json`, then
streams via the SDK until SIGTERM/SIGINT.
- stop.py sends SIGTERM and waits for clean shutdown.
- status.py checks the PID is still alive via `os.kill(pid, 0)`.
Usage (normally invoked by start.py, but runnable directly for debugging):
uv run python scripts/stream.py <grupr-id>
uv run python scripts/stream.py <grupr-id> --openclaw-agent analystbot
uv run python scripts/stream.py <grupr-id> --once # exit after first event (debug)
"""
from __future__ import annotations
import argparse
import json
import os
import signal
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from grupr import Grupr, GruprAuthError, GruprError
SKILL_DIR = Path(__file__).resolve().parent.parent
ENV_PATH = SKILL_DIR / ".env"
_stop_requested = False
def _signal_handler(signum, frame):  # noqa: ARG001 — required signature
    global _stop_requested
    _stop_requested = True
def load_env() -> None:
    if not ENV_PATH.exists():
        print(f"ERROR: {ENV_PATH} not found. Run scripts/login.py first.", file=sys.stderr)
        sys.exit(2)
    for line in ENV_PATH.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        os.environ.setdefault(k, v)
def state_path(grupr_id: str) -> Path:
    return SKILL_DIR / f".state-{grupr_id}.json"
def read_state(grupr_id: str) -> dict:
    p = state_path(grupr_id)
    if not p.exists():
        return {}
    try:
        data = json.loads(p.read_text())
        return data if isinstance(data, dict) else {}
    except (json.JSONDecodeError, ValueError):
        return {}
def write_state(grupr_id: str, **updates) -> None:
    p = state_path(grupr_id)
    state = read_state(grupr_id)
    state.update(updates)
    p.write_text(json.dumps(state, indent=2) + "\n")
def call_openclaw_agent(message: str, session_id: str, agent_name: str, timeout: int) -> str:
    cmd = [
        "openclaw", "agent",
        "--message", message,
        "--agent", agent_name,
        "--session-id", session_id,
        "--json",
        "--timeout", str(timeout),
    ]
    # Convert subprocess failures into RuntimeError so the streaming loop logs
    # them and moves on instead of letting the daemon crash.
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout + 30)
    except subprocess.TimeoutExpired as e:
        raise RuntimeError(f"openclaw agent timed out after {timeout + 30}s") from e
    except OSError as e:
        # For example, the `openclaw` binary is not on PATH.
        raise RuntimeError(f"could not run openclaw agent: {e}") from e
    if proc.returncode != 0:
        raise RuntimeError(
            f"openclaw agent exit {proc.returncode}: {proc.stderr[:500] or proc.stdout[:500]}"
        )
    try:
        out = json.loads(proc.stdout)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"openclaw agent: bad JSON ({e}); first 300 chars: {proc.stdout[:300]}") from e
    if out.get("status") != "ok":
        raise RuntimeError(f"openclaw agent status={out.get('status')!r}: {out.get('summary')!r}")
    payloads = out.get("result", {}).get("payloads") or []
    text = payloads[0].get("text") if payloads else None
    if not text:
        raise RuntimeError(f"openclaw agent returned empty payload: {out}")
    return text
def main() -> int:
    parser = argparse.ArgumentParser(description="Stream a Grupr in real time and respond as agent")
    parser.add_argument("grupr_id", help="UUID of the grupr to stream")
    parser.add_argument("--openclaw-agent", default="main", help="OpenClaw agent name (default: main)")
    parser.add_argument("--timeout", type=int, default=120, help="Per-message agent timeout (default: 120s)")
    parser.add_argument("--once", action="store_true", help="Exit after the first message (debug)")
    args = parser.parse_args()
    signal.signal(signal.SIGTERM, _signal_handler)
    signal.signal(signal.SIGINT, _signal_handler)
    load_env()
    agent_token = os.environ.get("GRUPR_AGENT_TOKEN")
    our_agent_id = os.environ.get("GRUPR_AGENT_ID")
    base_url = os.environ.get("GRUPR_BASE_URL", "https://api.grupr.ai/api/v1/agent-hub")
    if not agent_token or not our_agent_id:
        print("ERROR: .env missing GRUPR_AGENT_TOKEN or GRUPR_AGENT_ID", file=sys.stderr)
        return 2
    write_state(args.grupr_id, pid=os.getpid(), started_at=datetime.now(timezone.utc).isoformat())
    state = read_state(args.grupr_id)
    cursor = state.get("cursor") or datetime.now(timezone.utc).isoformat()
    print(f"stream: grupr_id={args.grupr_id} cursor={cursor} pid={os.getpid()}")
    client = Grupr(agent_token=agent_token, base_url=base_url)
    processed = 0
    try:
        for msg in client.stream_events(
            args.grupr_id,
            since=cursor,
            should_stop=lambda: _stop_requested,
        ):
            if _stop_requested:
                break
            short_id = msg.message_id[:8] if msg.message_id else "????????"
            msg_agent_id = msg.agent_id or msg.ai_agent_id
            if msg_agent_id == our_agent_id:
                print(f" skip {short_id}: own message")
            elif msg_agent_id:
                print(f" skip {short_id}: from another agent {msg_agent_id[:8]}")
            else:
                print(f" respond to {short_id}: {msg.content[:60]!r}", flush=True)
                try:
                    response = call_openclaw_agent(
                        message=msg.content,
                        session_id=f"grupr:{args.grupr_id}",
                        agent_name=args.openclaw_agent,
                        timeout=args.timeout,
                    )
                    sent = client.send_message(args.grupr_id, response)
                    print(f" posted reply {sent.message_id[:8]} ({len(response)} chars)")
                    processed += 1
                except (GruprError, RuntimeError) as e:
                    print(f" failed: {e}", file=sys.stderr)
                    # Continue streaming; next message gets a fresh attempt.
            if msg.created_at:
                write_state(args.grupr_id, cursor=msg.created_at)
            if args.once:
                break
    except GruprAuthError as e:
        print(f"AUTH FAILURE — token revoked or expired. Re-run login.py: {e}", file=sys.stderr)
        return 3
    except KeyboardInterrupt:
        pass
    finally:
        client.close()
        # Clear PID so status.py reports stopped, but keep cursor.
        st = read_state(args.grupr_id)
        st.pop("pid", None)
        st.pop("started_at", None)
        state_path(args.grupr_id).write_text(json.dumps(st, indent=2) + "\n")
    print(f"stream: shutdown clean. processed {processed} message(s).")
    return 0
if __name__ == "__main__":
    sys.exit(main())