@clawhub-baiyunrei2025-409ef7262b
---
name: autoresearch
description: |
Autonomous AI research skill for running automated neural network experiments. This skill should be used when the user wants to set up autonomous AI research experiments, run automated neural network training, conduct autonomous machine learning research, or let AI agents experiment with model architectures and hyperparameters. Based on Andrej Karpathy's autoresearch project, this skill enables AI agents to autonomously modify training code, run experiments, evaluate results, and iteratively improve models. Use when: (1) Setting up autonomous research experiments, (2) Running automated neural network training, (3) Conducting AI-driven research optimization, (4) Experimenting with model architectures and hyperparameters, (5) Implementing autonomous research loops, or (6) When the user mentions "autonomous research", "AI experiments", "automated training", "neural network optimization", or "autoresearch".
---
# Autoresearch Skill
This skill enables autonomous AI research experiments based on Andrej Karpathy's [autoresearch](https://github.com/karpathy/autoresearch) project. It allows AI agents to autonomously modify neural network training code, run experiments, evaluate results, and iteratively improve models.
## Core Concept
The idea: give an AI agent a small but real LLM training setup and let it experiment autonomously. The agent modifies the code, trains for 5 minutes, checks if the result improved, keeps or discards, and repeats. You can leave it running overnight and wake up to a log of experiments and (hopefully) a better model.
## Key Files
The project has three core files:
1. **`prepare.py`** — Fixed constants, one-time data prep (downloads training data, trains a BPE tokenizer), and runtime utilities (dataloader, evaluation). **Not modified**.
2. **`train.py`** — The single file the agent edits. Contains the full GPT model, optimizer (Muon + AdamW), and training loop. Everything is fair game: architecture, hyperparameters, optimizer, batch size, etc. **This file is edited and iterated on by the agent**.
3. **`program.md`** — Baseline instructions for the agent. **This file is edited and iterated on by the human**.
## Requirements
- Single NVIDIA GPU (tested on H100)
- Python 3.10+
- [uv](https://docs.astral.sh/uv/) package manager
## Quick Start Workflow
### Phase 1: Initial Setup
1. **Clone the repository** (if not already done):
```bash
git clone https://github.com/karpathy/autoresearch.git
cd autoresearch
```
2. **Install dependencies**:
```bash
uv sync
```
3. **Prepare data** (one-time setup):
```bash
uv run prepare.py
```
### Phase 2: Experiment Setup
1. **Agree on a run tag** (e.g., based on date like `mar20`)
2. **Create a new branch**:
```bash
git checkout -b autoresearch/<tag>
```
3. **Initialize results file**:
```bash
printf 'commit\tval_bpb\tmemory_gb\tstatus\tdescription\n' > results.tsv
```
### Phase 3: Autonomous Experimentation Loop
The agent follows this loop indefinitely:
```
LOOP FOREVER:
1. Look at current git state
2. Modify train.py with experimental idea
3. git commit
4. Run experiment: uv run train.py > run.log 2>&1
5. Extract results: grep "^val_bpb:\|^peak_vram_mb:" run.log
6. If crash → analyze logs and fix or mark as crash
7. Record results in results.tsv
8. If improved → keep commit
9. If equal or worse → git reset to the previous commit
```
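Steps 6 to 9 amount to a three-way decision. A minimal sketch, assuming the best score so far is tracked in a variable; `classify_run` and its arguments are hypothetical names, not part of the repository:

```python
from typing import Optional


def classify_run(best_bpb: Optional[float], val_bpb: Optional[float]) -> str:
    """Return the results.tsv status for one finished experiment.

    best_bpb: lowest val_bpb kept so far (None before the baseline run).
    val_bpb:  value parsed from run.log (None if the run crashed).
    """
    if val_bpb is None:
        return "crash"    # no val_bpb line was found in run.log
    if best_bpb is None or val_bpb < best_bpb:
        return "keep"     # strictly lower counts as an improvement
    return "discard"      # equal or worse: reset the commit


print(classify_run(0.9979, 0.9932))
```

Treating an equal score as `discard` follows the loop description in `program.md`; the simplicity criterion there may justify keeping an equal score when the change removes code.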
## Key Metrics
- **val_bpb** (validation bits per byte) — Lower is better, vocab-size-independent
- **Training time** — Fixed 5-minute budget per experiment
- **Peak VRAM** — Reported as `peak_vram_mb` (in MB) in the run summary; logged to `results.tsv` in GB (divide by 1024)
- **Status** — `keep`, `discard`, or `crash`
## Constraints
### What the agent CAN do:
- Modify `train.py` (architecture, optimizer, hyperparameters, training loop, etc.)
- Experiment with different model configurations
- Run training experiments autonomously
### What the agent CANNOT do:
- Modify `prepare.py` (read-only)
- Install new packages or add dependencies
- Modify the evaluation harness
## Quality Criteria
1. **Simplicity**: Simpler solutions are preferred over complex ones
2. **Performance**: Lower val_bpb is better
3. **Memory**: VRAM usage should be reasonable
4. **Stability**: Code must run without crashing
## Output Format
Each experiment produces a summary:
```
---
val_bpb: 0.997900
training_seconds: 300.1
total_seconds: 325.9
peak_vram_mb: 45060.2
mfu_percent: 39.80
total_tokens_M: 499.6
num_steps: 953
num_params_M: 50.3
depth: 8
```
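As an alternative to `grep`, the summary can be read into a dictionary. This is a sketch under the assumption that every metric sits on its own `key: value` line, as in the sample above; `parse_summary` is a hypothetical helper, not something the repository provides:

```python
import re

# One metric per line: a word, a colon, spaces or tabs, then a single value.
SUMMARY_LINE = re.compile(r"^(\w+):[ \t]+(\S+)[ \t]*$", re.MULTILINE)


def parse_summary(log_text: str) -> dict:
    """Collect `key: value` lines from a run log into a dict of floats."""
    metrics = {}
    for key, value in SUMMARY_LINE.findall(log_text):
        try:
            metrics[key] = float(value)
        except ValueError:
            pass  # skip lines whose value is not numeric
    return metrics


sample = """---
val_bpb: 0.997900
peak_vram_mb: 45060.2
depth: 8
"""
metrics = parse_summary(sample)
print(metrics["val_bpb"], metrics["peak_vram_mb"])
```

An empty result plays the same role as empty `grep` output: the run produced no summary and most likely crashed.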
## Results Logging
Results are logged to `results.tsv` (tab-separated):
```
commit val_bpb memory_gb status description
a1b2c3d 0.997900 44.0 keep baseline
b2c3d4e 0.993200 44.2 keep increase LR to 0.04
c3d4e5f 1.005000 44.0 discard switch to GeLU activation
d4e5f6g 0.000000 0.0 crash double model width (OOM)
```
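A row in this format can be built from the run summary as sketched below. The MB to GB conversion (divide by 1024) and the number formats follow `program.md`; `format_result_row` is a hypothetical helper name:

```python
def format_result_row(commit, val_bpb, peak_vram_mb, status, description):
    """Build one tab-separated results.tsv line (without a trailing newline)."""
    memory_gb = peak_vram_mb / 1024  # the summary reports MB, the TSV stores GB
    # A tab inside the description would shift the columns, so replace it.
    description = description.replace("\t", " ")
    return f"{commit[:7]}\t{val_bpb:.6f}\t{memory_gb:.1f}\t{status}\t{description}"


row = format_result_row("a1b2c3d", 0.9979, 45060.2, "keep", "baseline")
print(row)
```

For a crash, pass `0.0` for both `val_bpb` and `peak_vram_mb`, which yields the `0.000000` and `0.0` placeholders shown in the example table.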
## Autonomous Operation
**CRITICAL**: Once the experiment loop begins, the agent operates autonomously:
- Do NOT pause to ask the human if you should continue
- Do NOT ask "should I keep going?" or "is this a good stopping point?"
- Continue working indefinitely until manually stopped
- If out of ideas, think harder: read papers, re-analyze code, try radical changes
## Use Cases
1. **Overnight experiments**: Leave running while sleeping, wake up to results
2. **Architecture search**: Automatically explore model architectures
3. **Hyperparameter optimization**: Find optimal training parameters
4. **Research automation**: Reduce manual experimentation effort
## Troubleshooting
### Common Issues:
1. **GPU not available**: Check CUDA installation and GPU drivers
2. **uv not installed**: Install the uv package manager (`curl -LsSf https://astral.sh/uv/install.sh | sh`)
3. **Data not prepared**: Run `uv run prepare.py`
4. **Out of memory**: Reduce model size or batch size
### Error Handling:
- Crashes are logged as `crash` status
- Analyze logs with `tail -n 50 run.log`
- Fix simple issues and retry, skip fundamentally broken ideas
## Best Practices
1. **Start with baseline**: Always run unmodified code first
2. **Incremental changes**: Make small, focused modifications
3. **Document experiments**: Clear descriptions in results.tsv
4. **Monitor progress**: Regularly check results and trends
5. **Balance exploration/exploitation**: Mix radical ideas with incremental improvements
## Integration with Agent Teams
This skill can be combined with the `agent-teams-playbook` skill for:
- Multi-agent research coordination
- Parallel experimentation
- Specialized roles (architect, optimizer, evaluator)
- Distributed research workflows
## References
- Original repository: https://github.com/karpathy/autoresearch
- Nanochat implementation: https://github.com/karpathy/nanochat
- Project announcement: https://x.com/karpathy/status/2029701092347630069
- "Dummy's Guide": https://x.com/hooeem/status/2030720614752039185
FILE:prepare.py
"""
One-time data preparation for autoresearch experiments.
Downloads data shards and trains a BPE tokenizer.

Usage:
    python prepare.py                   # default prep (10 training shards + val shard, then tokenizer)
    python prepare.py --num-shards 8    # download only 8 training shards (for testing)
    python prepare.py --num-shards -1   # download all training shards

Data and tokenizer are stored in ~/.cache/autoresearch/.
"""
import os
import sys
import time
import math
import argparse
import pickle
from multiprocessing import Pool
import requests
import pyarrow.parquet as pq
import rustbpe
import tiktoken
import torch
# ---------------------------------------------------------------------------
# Constants (fixed, do not modify)
# ---------------------------------------------------------------------------
MAX_SEQ_LEN = 2048 # context length
TIME_BUDGET = 300 # training time budget in seconds (5 minutes)
EVAL_TOKENS = 40 * 524288 # number of tokens for val eval
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
CACHE_DIR = os.path.join(os.path.expanduser("~"), ".cache", "autoresearch")
DATA_DIR = os.path.join(CACHE_DIR, "data")
TOKENIZER_DIR = os.path.join(CACHE_DIR, "tokenizer")
BASE_URL = "https://huggingface.co/datasets/karpathy/climbmix-400b-shuffle/resolve/main"
MAX_SHARD = 6542 # the last datashard is shard_06542.parquet
VAL_SHARD = MAX_SHARD # pinned validation shard (shard_06542)
VAL_FILENAME = f"shard_{VAL_SHARD:05d}.parquet"
VOCAB_SIZE = 8192
# BPE split pattern (GPT-4 style, with \p{N}{1,2} instead of {1,3})
SPLIT_PATTERN = r"""'(?i:[sdmt]|ll|ve|re)|[^\r\n\p{L}\p{N}]?+\p{L}+|\p{N}{1,2}| ?[^\s\p{L}\p{N}]++[\r\n]*|\s*[\r\n]|\s+(?!\S)|\s+"""
SPECIAL_TOKENS = [f"<|reserved_{i}|>" for i in range(4)]
BOS_TOKEN = "<|reserved_0|>"
# ---------------------------------------------------------------------------
# Data download
# ---------------------------------------------------------------------------
def download_single_shard(index):
    """Download one parquet shard with retries. Returns True on success."""
    filename = f"shard_{index:05d}.parquet"
    filepath = os.path.join(DATA_DIR, filename)
    if os.path.exists(filepath):
        return True
    url = f"{BASE_URL}/{filename}"
    max_attempts = 5
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()
            temp_path = filepath + ".tmp"
            with open(temp_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
            os.rename(temp_path, filepath)
            print(f" Downloaded {filename}")
            return True
        except (requests.RequestException, IOError) as e:
            print(f" Attempt {attempt}/{max_attempts} failed for {filename}: {e}")
            for path in [filepath + ".tmp", filepath]:
                if os.path.exists(path):
                    try:
                        os.remove(path)
                    except OSError:
                        pass
            if attempt < max_attempts:
                time.sleep(2 ** attempt)
    return False


def download_data(num_shards, download_workers=8):
    """Download training shards + pinned validation shard."""
    os.makedirs(DATA_DIR, exist_ok=True)
    num_train = min(num_shards, MAX_SHARD)
    ids = list(range(num_train))
    if VAL_SHARD not in ids:
        ids.append(VAL_SHARD)
    # Count what's already downloaded
    existing = sum(1 for i in ids if os.path.exists(os.path.join(DATA_DIR, f"shard_{i:05d}.parquet")))
    if existing == len(ids):
        print(f"Data: all {len(ids)} shards already downloaded at {DATA_DIR}")
        return
    needed = len(ids) - existing
    print(f"Data: downloading {needed} shards ({existing} already exist)...")
    workers = max(1, min(download_workers, needed))
    with Pool(processes=workers) as pool:
        results = pool.map(download_single_shard, ids)
    ok = sum(1 for r in results if r)
    print(f"Data: {ok}/{len(ids)} shards ready at {DATA_DIR}")

# ---------------------------------------------------------------------------
# Tokenizer training
# ---------------------------------------------------------------------------
def list_parquet_files():
    """Return sorted list of parquet file paths in the data directory."""
    files = sorted(f for f in os.listdir(DATA_DIR) if f.endswith(".parquet") and not f.endswith(".tmp"))
    return [os.path.join(DATA_DIR, f) for f in files]


def text_iterator(max_chars=1_000_000_000, doc_cap=10_000):
    """Yield documents from training split (all shards except pinned val shard)."""
    parquet_paths = [p for p in list_parquet_files() if not p.endswith(VAL_FILENAME)]
    nchars = 0
    for filepath in parquet_paths:
        pf = pq.ParquetFile(filepath)
        for rg_idx in range(pf.num_row_groups):
            rg = pf.read_row_group(rg_idx)
            for text in rg.column("text").to_pylist():
                doc = text[:doc_cap] if len(text) > doc_cap else text
                nchars += len(doc)
                yield doc
                if nchars >= max_chars:
                    return


def train_tokenizer():
    """Train BPE tokenizer using rustbpe, save as tiktoken pickle."""
    tokenizer_pkl = os.path.join(TOKENIZER_DIR, "tokenizer.pkl")
    token_bytes_path = os.path.join(TOKENIZER_DIR, "token_bytes.pt")
    if os.path.exists(tokenizer_pkl) and os.path.exists(token_bytes_path):
        print(f"Tokenizer: already trained at {TOKENIZER_DIR}")
        return
    os.makedirs(TOKENIZER_DIR, exist_ok=True)
    parquet_files = list_parquet_files()
    if len(parquet_files) < 2:
        print("Tokenizer: need at least 2 data shards (1 train + 1 val). Download more data first.")
        sys.exit(1)
    # --- Train with rustbpe ---
    print("Tokenizer: training BPE tokenizer...")
    t0 = time.time()
    tokenizer = rustbpe.Tokenizer()
    vocab_size_no_special = VOCAB_SIZE - len(SPECIAL_TOKENS)
    tokenizer.train_from_iterator(text_iterator(), vocab_size_no_special, pattern=SPLIT_PATTERN)
    # Build tiktoken encoding from trained merges
    pattern = tokenizer.get_pattern()
    mergeable_ranks = {bytes(k): v for k, v in tokenizer.get_mergeable_ranks()}
    tokens_offset = len(mergeable_ranks)
    special_tokens = {name: tokens_offset + i for i, name in enumerate(SPECIAL_TOKENS)}
    enc = tiktoken.Encoding(
        name="rustbpe",
        pat_str=pattern,
        mergeable_ranks=mergeable_ranks,
        special_tokens=special_tokens,
    )
    # Save tokenizer
    with open(tokenizer_pkl, "wb") as f:
        pickle.dump(enc, f)
    t1 = time.time()
    print(f"Tokenizer: trained in {t1 - t0:.1f}s, saved to {tokenizer_pkl}")
    # --- Build token_bytes lookup for BPB evaluation ---
    print("Tokenizer: building token_bytes lookup...")
    special_set = set(SPECIAL_TOKENS)
    token_bytes_list = []
    for token_id in range(enc.n_vocab):
        token_str = enc.decode([token_id])
        if token_str in special_set:
            token_bytes_list.append(0)
        else:
            token_bytes_list.append(len(token_str.encode("utf-8")))
    token_bytes_tensor = torch.tensor(token_bytes_list, dtype=torch.int32)
    torch.save(token_bytes_tensor, token_bytes_path)
    print(f"Tokenizer: saved token_bytes to {token_bytes_path}")
    # Sanity check
    test = "Hello world! Numbers: 123. Unicode: 你好"
    encoded = enc.encode_ordinary(test)
    decoded = enc.decode(encoded)
    assert decoded == test, f"Tokenizer roundtrip failed: {test!r} -> {decoded!r}"
    print(f"Tokenizer: sanity check passed (vocab_size={enc.n_vocab})")

# ---------------------------------------------------------------------------
# Runtime utilities (imported by train.py)
# ---------------------------------------------------------------------------
class Tokenizer:
    """Minimal tokenizer wrapper. Training is handled above."""

    def __init__(self, enc):
        self.enc = enc
        self.bos_token_id = enc.encode_single_token(BOS_TOKEN)

    @classmethod
    def from_directory(cls, tokenizer_dir=TOKENIZER_DIR):
        with open(os.path.join(tokenizer_dir, "tokenizer.pkl"), "rb") as f:
            enc = pickle.load(f)
        return cls(enc)

    def get_vocab_size(self):
        return self.enc.n_vocab

    def get_bos_token_id(self):
        return self.bos_token_id

    def encode(self, text, prepend=None, num_threads=8):
        if prepend is not None:
            prepend_id = prepend if isinstance(prepend, int) else self.enc.encode_single_token(prepend)
        if isinstance(text, str):
            ids = self.enc.encode_ordinary(text)
            if prepend is not None:
                ids.insert(0, prepend_id)
        elif isinstance(text, list):
            ids = self.enc.encode_ordinary_batch(text, num_threads=num_threads)
            if prepend is not None:
                for row in ids:
                    row.insert(0, prepend_id)
        else:
            raise ValueError(f"Invalid input type: {type(text)}")
        return ids

    def decode(self, ids):
        return self.enc.decode(ids)


def get_token_bytes(device="cpu"):
    path = os.path.join(TOKENIZER_DIR, "token_bytes.pt")
    with open(path, "rb") as f:
        return torch.load(f, map_location=device)

def _document_batches(split, tokenizer_batch_size=128):
    """Infinite iterator over document batches from parquet files."""
    parquet_paths = list_parquet_files()
    assert len(parquet_paths) > 0, "No parquet files found. Run prepare.py first."
    val_path = os.path.join(DATA_DIR, VAL_FILENAME)
    if split == "train":
        parquet_paths = [p for p in parquet_paths if p != val_path]
        assert len(parquet_paths) > 0, "No training shards found."
    else:
        parquet_paths = [val_path]
    epoch = 1
    while True:
        for filepath in parquet_paths:
            pf = pq.ParquetFile(filepath)
            for rg_idx in range(pf.num_row_groups):
                rg = pf.read_row_group(rg_idx)
                batch = rg.column('text').to_pylist()
                for i in range(0, len(batch), tokenizer_batch_size):
                    yield batch[i:i+tokenizer_batch_size], epoch
        epoch += 1


def make_dataloader(tokenizer, B, T, split, buffer_size=1000):
    """
    BOS-aligned dataloader with best-fit packing.
    Every row starts with BOS. Documents packed using best-fit to minimize cropping.
    When no document fits remaining space, crops shortest doc to fill exactly.
    100% utilization (no padding).
    """
    assert split in ["train", "val"]
    row_capacity = T + 1
    batches = _document_batches(split)
    bos_token = tokenizer.get_bos_token_id()
    doc_buffer = []
    epoch = 1

    def refill_buffer():
        nonlocal epoch
        doc_batch, epoch = next(batches)
        token_lists = tokenizer.encode(doc_batch, prepend=bos_token)
        doc_buffer.extend(token_lists)

    # Pre-allocate buffers: [inputs (B*T) | targets (B*T)]
    row_buffer = torch.empty((B, row_capacity), dtype=torch.long)
    cpu_buffer = torch.empty(2 * B * T, dtype=torch.long, pin_memory=True)
    gpu_buffer = torch.empty(2 * B * T, dtype=torch.long, device="cuda")
    cpu_inputs = cpu_buffer[:B * T].view(B, T)
    cpu_targets = cpu_buffer[B * T:].view(B, T)
    inputs = gpu_buffer[:B * T].view(B, T)
    targets = gpu_buffer[B * T:].view(B, T)
    while True:
        for row_idx in range(B):
            pos = 0
            while pos < row_capacity:
                while len(doc_buffer) < buffer_size:
                    refill_buffer()
                remaining = row_capacity - pos
                # Find largest doc that fits entirely
                best_idx = -1
                best_len = 0
                for i, doc in enumerate(doc_buffer):
                    doc_len = len(doc)
                    if doc_len <= remaining and doc_len > best_len:
                        best_idx = i
                        best_len = doc_len
                if best_idx >= 0:
                    doc = doc_buffer.pop(best_idx)
                    row_buffer[row_idx, pos:pos + len(doc)] = torch.tensor(doc, dtype=torch.long)
                    pos += len(doc)
                else:
                    # No doc fits — crop shortest to fill remaining
                    shortest_idx = min(range(len(doc_buffer)), key=lambda i: len(doc_buffer[i]))
                    doc = doc_buffer.pop(shortest_idx)
                    row_buffer[row_idx, pos:pos + remaining] = torch.tensor(doc[:remaining], dtype=torch.long)
                    pos += remaining
        cpu_inputs.copy_(row_buffer[:, :-1])
        cpu_targets.copy_(row_buffer[:, 1:])
        gpu_buffer.copy_(cpu_buffer, non_blocking=True)
        yield inputs, targets, epoch

# ---------------------------------------------------------------------------
# Evaluation (DO NOT CHANGE — this is the fixed metric)
# ---------------------------------------------------------------------------
@torch.no_grad()
def evaluate_bpb(model, tokenizer, batch_size):
    """
    Bits per byte (BPB): vocab size-independent evaluation metric.
    Sums per-token cross-entropy (in nats), sums target byte lengths,
    then converts nats/byte to bits/byte. Special tokens (byte length 0)
    are excluded from both sums.
    Uses fixed MAX_SEQ_LEN so results are comparable across configs.
    """
    token_bytes = get_token_bytes(device="cuda")
    val_loader = make_dataloader(tokenizer, batch_size, MAX_SEQ_LEN, "val")
    steps = EVAL_TOKENS // (batch_size * MAX_SEQ_LEN)
    total_nats = 0.0
    total_bytes = 0
    for _ in range(steps):
        x, y, _ = next(val_loader)
        loss_flat = model(x, y, reduction='none').view(-1)
        y_flat = y.view(-1)
        nbytes = token_bytes[y_flat]
        mask = nbytes > 0
        total_nats += (loss_flat * mask).sum().item()
        total_bytes += nbytes.sum().item()
    return total_nats / (math.log(2) * total_bytes)

# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Prepare data and tokenizer for autoresearch")
    parser.add_argument("--num-shards", type=int, default=10, help="Number of training shards to download (-1 = all). Val shard is always pinned.")
    parser.add_argument("--download-workers", type=int, default=8, help="Number of parallel download workers")
    args = parser.parse_args()
    num_shards = MAX_SHARD if args.num_shards == -1 else args.num_shards
    print(f"Cache directory: {CACHE_DIR}")
    print()
    # Step 1: Download data
    download_data(num_shards, download_workers=args.download_workers)
    print()
    # Step 2: Train tokenizer
    train_tokenizer()
    print()
    print("Done! Ready to train.")
FILE:program.md
# autoresearch
This is an experiment to have the LLM do its own research.
## Setup
To set up a new experiment, work with the user to:
1. **Agree on a run tag**: propose a tag based on today's date (e.g. `mar5`). The branch `autoresearch/<tag>` must not already exist — this is a fresh run.
2. **Create the branch**: `git checkout -b autoresearch/<tag>` from current master.
3. **Read the in-scope files**: The repo is small. Read these files for full context:
- `README.md` — repository context.
- `prepare.py` — fixed constants, data prep, tokenizer, dataloader, evaluation. Do not modify.
- `train.py` — the file you modify. Model architecture, optimizer, training loop.
4. **Verify data exists**: Check that `~/.cache/autoresearch/` contains data shards and a tokenizer. If not, tell the human to run `uv run prepare.py`.
5. **Initialize results.tsv**: Create `results.tsv` with just the header row. The baseline will be recorded after the first run.
6. **Confirm and go**: Confirm setup looks good.
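The check in step 4 can be scripted. The sketch below assumes the cache layout that `prepare.py` creates (`data/shard_*.parquet`, `tokenizer/tokenizer.pkl`, `tokenizer/token_bytes.pt`); `data_ready` is a hypothetical helper, not a function in the repository:

```python
from pathlib import Path


def data_ready(cache_dir: Path) -> bool:
    """True if the cache holds >= 2 parquet shards and both tokenizer files."""
    shards = list((cache_dir / "data").glob("shard_*.parquet"))
    tokenizer_dir = cache_dir / "tokenizer"
    has_tokenizer = (
        (tokenizer_dir / "tokenizer.pkl").is_file()
        and (tokenizer_dir / "token_bytes.pt").is_file()
    )
    # prepare.py needs at least one training shard plus the validation shard.
    return len(shards) >= 2 and has_tokenizer


print(data_ready(Path.home() / ".cache" / "autoresearch"))
```

If this prints `False`, tell the human to run `uv run prepare.py` before continuing.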
Once you get confirmation, kick off the experimentation.
## Experimentation
Each experiment runs on a single GPU. The training script runs for a **fixed time budget of 5 minutes** (wall clock training time, excluding startup/compilation). You launch it simply as: `uv run train.py`.
**What you CAN do:**
- Modify `train.py` — this is the only file you edit. Everything is fair game: model architecture, optimizer, hyperparameters, training loop, batch size, model size, etc.
**What you CANNOT do:**
- Modify `prepare.py`. It is read-only. It contains the fixed evaluation, data loading, tokenizer, and training constants (time budget, sequence length, etc).
- Install new packages or add dependencies. You can only use what's already in `pyproject.toml`.
- Modify the evaluation harness. The `evaluate_bpb` function in `prepare.py` is the ground truth metric.
**The goal is simple: get the lowest val_bpb.** Since the time budget is fixed, you don't need to worry about training time — it's always 5 minutes. Everything is fair game: change the architecture, the optimizer, the hyperparameters, the batch size, the model size. The only constraint is that the code runs without crashing and finishes within the time budget.
**VRAM** is a soft constraint. Some increase is acceptable for meaningful val_bpb gains, but it should not blow up dramatically.
**Simplicity criterion**: All else being equal, simpler is better. A small improvement that adds ugly complexity is not worth it. Conversely, removing something and getting equal or better results is a great outcome — that's a simplification win. When evaluating whether to keep a change, weigh the complexity cost against the improvement magnitude. A 0.001 val_bpb improvement that adds 20 lines of hacky code? Probably not worth it. A 0.001 val_bpb improvement from deleting code? Definitely keep. An improvement of ~0 but much simpler code? Keep.
**The first run**: Your very first run should always be to establish the baseline, so you will run the training script as is.
## Output format
Once the script finishes it prints a summary like this:
```
---
val_bpb: 0.997900
training_seconds: 300.1
total_seconds: 325.9
peak_vram_mb: 45060.2
mfu_percent: 39.80
total_tokens_M: 499.6
num_steps: 953
num_params_M: 50.3
depth: 8
```
Note that the script is configured to always stop after 5 minutes of training, so the exact numbers depend on the hardware it runs on. You can extract the key metric from the log file:
```
grep "^val_bpb:" run.log
```
## Logging results
When an experiment is done, log it to `results.tsv` (tab-separated, NOT comma-separated — commas break in descriptions).
The TSV has a header row and 5 columns:
```
commit val_bpb memory_gb status description
```
1. git commit hash (short, 7 chars)
2. val_bpb achieved (e.g. 1.234567) — use 0.000000 for crashes
3. peak memory in GB, round to .1f (e.g. 12.3 — divide peak_vram_mb by 1024) — use 0.0 for crashes
4. status: `keep`, `discard`, or `crash`
5. short text description of what this experiment tried
Example:
```
commit val_bpb memory_gb status description
a1b2c3d 0.997900 44.0 keep baseline
b2c3d4e 0.993200 44.2 keep increase LR to 0.04
c3d4e5f 1.005000 44.0 discard switch to GeLU activation
d4e5f6g 0.000000 0.0 crash double model width (OOM)
```
## The experiment loop
The experiment runs on a dedicated branch (e.g. `autoresearch/mar5` or `autoresearch/mar5-gpu0`).
LOOP FOREVER:
1. Look at the git state: the current branch/commit we're on
2. Tune `train.py` with an experimental idea by directly hacking the code.
3. git commit
4. Run the experiment: `uv run train.py > run.log 2>&1` (redirect everything — do NOT use tee or let output flood your context)
5. Read out the results: `grep "^val_bpb:\|^peak_vram_mb:" run.log`
6. If the grep output is empty, the run crashed. Run `tail -n 50 run.log` to read the Python stack trace and attempt a fix. If you can't get things to work after more than a few attempts, give up.
7. Record the results in the tsv (NOTE: do not commit the results.tsv file, leave it untracked by git)
8. If val_bpb improved (lower), you "advance" the branch, keeping the git commit
9. If val_bpb is equal or worse, you git reset back to where you started
The idea is that you are a completely autonomous researcher trying things out. If they work, keep. If they don't, discard. And you're advancing the branch so that you can iterate. If you feel like you're getting stuck in some way, you can rewind but you should probably do this very very sparingly (if ever).
**Timeout**: Each experiment should take ~5 minutes total (+ a few seconds for startup and eval overhead). If a run exceeds 10 minutes, kill it and treat it as a failure (discard and revert).
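One way to enforce the 10-minute limit is the `timeout` argument of Python's `subprocess.run`. This is only a sketch: `run_with_timeout` is a hypothetical helper, and it kills only the direct child process, so a launcher such as `uv run` that starts its own child may need extra process-group handling.

```python
import os
import subprocess
import sys
import tempfile


def run_with_timeout(cmd, log_path, timeout_s=600):
    """Run cmd with stdout and stderr redirected to log_path.

    Returns True if the process exited on its own and False if it was
    killed for exceeding timeout_s. Exit codes are not inspected here.
    """
    with open(log_path, "w") as log:
        try:
            subprocess.run(cmd, stdout=log, stderr=subprocess.STDOUT, timeout=timeout_s)
            return True
        except subprocess.TimeoutExpired:
            return False  # subprocess.run kills the child before raising


# Stand-in command so the sketch runs anywhere; the real call would be
# run_with_timeout(["uv", "run", "train.py"], "run.log").
demo_log = os.path.join(tempfile.gettempdir(), "autoresearch_demo.log")
finished = run_with_timeout([sys.executable, "-c", "print('val_bpb: 1.0')"], demo_log, timeout_s=60)
print(finished)
```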
**Crashes**: If a run crashes (OOM, a bug, etc.), use your judgment. If it's something dumb and easy to fix (e.g. a typo, a missing import), fix it and re-run. If the idea itself is fundamentally broken, just skip it, log "crash" as the status in the tsv, and move on.
**NEVER STOP**: Once the experiment loop has begun (after the initial setup), do NOT pause to ask the human if you should continue. Do NOT ask "should I keep going?" or "is this a good stopping point?". The human might be asleep, or gone from a computer and expects you to continue working *indefinitely* until you are manually stopped. You are autonomous. If you run out of ideas, think harder — read papers referenced in the code, re-read the in-scope files for new angles, try combining previous near-misses, try more radical architectural changes. The loop runs until the human interrupts you, period.
As an example use case, a user might leave you running while they sleep. If each experiment takes you ~5 minutes then you can run approx 12/hour, for a total of about 100 over the duration of the average human sleep. The user then wakes up to experimental results, all completed by you while they slept!
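The arithmetic behind those figures, as a quick sketch. The 8-hour night is an assumption (the text only says "average human sleep"), and the second estimate reuses `total_seconds` from the sample summary above, which includes startup and evaluation overhead:

```python
TIME_BUDGET_S = 300      # fixed training budget from prepare.py
SLEEP_HOURS = 8          # assumed length of a night, not stated in the text
SAMPLE_TOTAL_S = 325.9   # total_seconds from the sample summary

# Upper bound: ignores startup, compilation, and evaluation overhead.
runs_per_hour = 3600 // TIME_BUDGET_S
runs_overnight = runs_per_hour * SLEEP_HOURS

# Estimate that includes the per-run overhead seen in the sample summary.
runs_overnight_with_overhead = int(SLEEP_HOURS * 3600 / SAMPLE_TOTAL_S)

print(runs_per_hour, runs_overnight, runs_overnight_with_overhead)  # 12 96 88
```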
FILE:pyproject.toml
[project]
name = "autoresearch"
version = "0.1.0"
description = "Autonomous pretraining research swarm"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"kernels>=0.11.7",
"matplotlib>=3.10.8",
"numpy>=2.2.6",
"pandas>=2.3.3",
"pyarrow>=21.0.0",
"requests>=2.32.0",
"rustbpe>=0.1.0",
"tiktoken>=0.11.0",
"torch==2.9.1",
]
[tool.uv.sources]
torch = [
{ index = "pytorch-cu128" },
]
[[tool.uv.index]]
name = "pytorch-cu128"
url = "https://download.pytorch.org/whl/cu128"
explicit = true
FILE:README.md
# autoresearch

*One day, frontier AI research used to be done by meat computers in between eating, sleeping, having other fun, and synchronizing once in a while using sound wave interconnect in the ritual of "group meeting". That era is long gone. Research is now entirely the domain of autonomous swarms of AI agents running across compute cluster megastructures in the skies. The agents claim that we are now in the 10,205th generation of the code base, in any case no one could tell if that's right or wrong as the "code" is now a self-modifying binary that has grown beyond human comprehension. This repo is the story of how it all began. -@karpathy, March 2026*.
The idea: give an AI agent a small but real LLM training setup and let it experiment autonomously overnight. It modifies the code, trains for 5 minutes, checks if the result improved, keeps or discards, and repeats. You wake up in the morning to a log of experiments and (hopefully) a better model. The training code here is a simplified single-GPU implementation of [nanochat](https://github.com/karpathy/nanochat). The core idea is that you're not touching any of the Python files like you normally would as a researcher. Instead, you are programming the `program.md` Markdown files that provide context to the AI agents and set up your autonomous research org. The default `program.md` in this repo is intentionally kept as a bare bones baseline, though it's obvious how one would iterate on it over time to find the "research org code" that achieves the fastest research progress, how you'd add more agents to the mix, etc. A bit more context on this project is here in this [tweet](https://x.com/karpathy/status/2029701092347630069).
## How it works
The repo is deliberately kept small and only really has three files that matter:
- **`prepare.py`** — fixed constants, one-time data prep (downloads training data, trains a BPE tokenizer), and runtime utilities (dataloader, evaluation). Not modified.
- **`train.py`** — the single file the agent edits. Contains the full GPT model, optimizer (Muon + AdamW), and training loop. Everything is fair game: architecture, hyperparameters, optimizer, batch size, etc. **This file is edited and iterated on by the agent**.
- **`program.md`** — baseline instructions for one agent. Point your agent here and let it go. **This file is edited and iterated on by the human**.
By design, training runs for a **fixed 5-minute time budget** (wall clock, excluding startup/compilation), regardless of the details of your compute. The metric is **val_bpb** (validation bits per byte) — lower is better, and vocab-size-independent so architectural changes are fairly compared.
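To make the metric concrete, here is a pure-Python sketch of the same computation that `evaluate_bpb` in `prepare.py` performs on tensors: sum the per-token loss in nats, sum the UTF-8 byte lengths of the target tokens, and divide by ln 2 to convert nats to bits. The function name `bits_per_byte` and the input numbers are made up for illustration.

```python
import math


def bits_per_byte(token_nats, token_bytes):
    """Convert per-token cross-entropy (nats) into bits per byte.

    token_nats:  loss of each target token, in nats.
    token_bytes: UTF-8 length of each target token; 0 marks a special token,
                 which is excluded from both sums.
    """
    total_nats = sum(n for n, b in zip(token_nats, token_bytes) if b > 0)
    total_bytes = sum(token_bytes)
    return total_nats / (math.log(2) * total_bytes)


# Three ordinary tokens and one special token (byte length 0).
nats = [2.0, 1.0, 3.0, 5.0]
nbytes = [4, 2, 6, 0]
print(round(bits_per_byte(nats, nbytes), 4))  # 0.7213
```

Because the denominator counts bytes rather than tokens, a tokenizer with a different vocabulary size changes how the text is split but not the total number of bytes, which is what keeps the metric comparable across configurations.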
If you are new to neural networks, this ["Dummy's Guide"](https://x.com/hooeem/status/2030720614752039185) looks pretty good for a lot more context.
## Quick start
**Requirements:** A single NVIDIA GPU (tested on H100), Python 3.10+, [uv](https://docs.astral.sh/uv/).
```bash
# 1. Install uv project manager (if you don't already have it)
curl -LsSf https://astral.sh/uv/install.sh | sh
# 2. Install dependencies
uv sync
# 3. Download data and train tokenizer (one-time, ~2 min)
uv run prepare.py
# 4. Manually run a single training experiment (~5 min)
uv run train.py
```
If the above commands all work ok, your setup is working and you can go into autonomous research mode.
## Running the agent
Simply spin up your Claude/Codex or whatever you want in this repo (and disable all permissions), then you can prompt something like:
```
Hi have a look at program.md and let's kick off a new experiment! let's do the setup first.
```
The `program.md` file is essentially a super lightweight "skill".
## Project structure
```
prepare.py — constants, data prep + runtime utilities (do not modify)
train.py — model, optimizer, training loop (agent modifies this)
program.md — agent instructions
pyproject.toml — dependencies
```
## Design choices
- **Single file to modify.** The agent only touches `train.py`. This keeps the scope manageable and diffs reviewable.
- **Fixed time budget.** Training always runs for exactly 5 minutes, regardless of your specific platform. This means you can expect approximately 12 experiments per hour and approximately 100 experiments while you sleep. This design decision has two upsides. First, it makes experiments directly comparable regardless of what the agent changes (model size, batch size, architecture, etc.). Second, it means that autoresearch will find the best model for your platform within that time budget. The downside is that your runs (and results) are not comparable to those of other people running on other compute platforms.
- **Self-contained.** No external dependencies beyond PyTorch and a few small packages. No distributed training, no complex configs. One GPU, one file, one metric.
## Platform support
This code currently requires that you have a single NVIDIA GPU. In principle it is quite possible to support CPU, MPS and other platforms but this would also bloat the code. I'm not 100% sure that I want to take this on personally right now. People can reference (or have their agents reference) the full/parent nanochat repository, which has wider platform support and shows the various solutions (e.g. a fallback implementation for the Flash Attention 3 kernels, generic device support, autodetection, etc.). Feel free to create forks or discussions for other platforms; I'm happy to link to them here in the README in the notable forks section.
Seeing as there seems to be a lot of interest in tinkering with autoresearch on much smaller compute platforms than an H100, here are a few extra words. If you're going to try running autoresearch on smaller computers (MacBooks etc.), I'd recommend one of the forks below. On top of this, here are some recommendations for aspiring forks on how to tune the defaults for much smaller models:
1. To get half-decent results I'd use a dataset with a lot less entropy, e.g. this [TinyStories dataset](https://huggingface.co/datasets/karpathy/tinystories-gpt4-clean). These are GPT-4 generated short stories. Because the data is a lot narrower in scope, you will see reasonable results with a lot smaller models (if you try to sample from them after training).
2. You might experiment with decreasing `vocab_size`, e.g. from 8192 down to 4096, 2048, 1024, or even a simple byte-level tokenizer with 256 possible byte values after UTF-8 encoding.
3. In `prepare.py`, you'll want to lower `MAX_SEQ_LEN` a lot, depending on the computer even down to 256 etc. As you lower `MAX_SEQ_LEN`, you may want to experiment with increasing `DEVICE_BATCH_SIZE` in `train.py` slightly to compensate. The number of tokens per fwd/bwd pass is the product of these two.
4. Also in `prepare.py`, you'll want to decrease `EVAL_TOKENS` so that your validation loss is evaluated on a lot less data.
5. In `train.py`, the primary single knob that controls model complexity is `DEPTH` (default 8 here). A lot of variables are just functions of it, so try lowering it, e.g. down to 4.
6. You'll most likely want to use a `WINDOW_PATTERN` of just "L", because "SSSL" uses an alternating banded attention pattern that may be very inefficient for you. Try it.
7. You'll want to lower `TOTAL_BATCH_SIZE` a lot, but keep it a power of 2, e.g. down to `2**14` (~16K) or so; it's hard to tell.
I think these would be the reasonable hyperparameters to play with. Ask your favorite coding agent for help and copy-paste it this guide, along with the full source code.
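To see how these knobs interact, here is a minimal sketch that mirrors the arithmetic in `train.py` (the rounding in `build_model_config` and the gradient accumulation computation). The small-platform values used below (`depth=4`, `max_seq_len=256`, `device_batch_size=16`, `total_batch_size=2**14`) are illustrative assumptions, not tested recommendations, and `derive_shapes` is a hypothetical helper name:

```python
ASPECT_RATIO = 64  # default in train.py: model_dim = depth * ASPECT_RATIO
HEAD_DIM = 128     # default in train.py: target head dimension


def derive_shapes(depth, max_seq_len, device_batch_size, total_batch_size):
    """Reproduce the derived quantities train.py computes from its knobs."""
    # model_dim is depth * ASPECT_RATIO, rounded up to a multiple of HEAD_DIM
    base_dim = depth * ASPECT_RATIO
    model_dim = ((base_dim + HEAD_DIM - 1) // HEAD_DIM) * HEAD_DIM
    num_heads = model_dim // HEAD_DIM
    # tokens per forward/backward pass is the product of these two
    tokens_per_fwdbwd = device_batch_size * max_seq_len
    # train.py asserts this divisibility before computing accumulation steps
    assert total_batch_size % tokens_per_fwdbwd == 0
    return {
        "model_dim": model_dim,
        "num_heads": num_heads,
        "tokens_per_fwdbwd": tokens_per_fwdbwd,
        "grad_accum_steps": total_batch_size // tokens_per_fwdbwd,
    }


print(derive_shapes(depth=4, max_seq_len=256,
                    device_batch_size=16, total_batch_size=2**14))
# {'model_dim': 256, 'num_heads': 2, 'tokens_per_fwdbwd': 4096, 'grad_accum_steps': 4}
```

If the assertion fails, `TOTAL_BATCH_SIZE` is not a multiple of `DEVICE_BATCH_SIZE * MAX_SEQ_LEN`, which is why keeping all three at powers of 2 is convenient.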
## Notable forks
- [miolini/autoresearch-macos](https://github.com/miolini/autoresearch-macos) (macOS)
- [trevin-creator/autoresearch-mlx](https://github.com/trevin-creator/autoresearch-mlx) (macOS)
- [jsegov/autoresearch-win-rtx](https://github.com/jsegov/autoresearch-win-rtx) (Windows)
- [andyluo7/autoresearch](https://github.com/andyluo7/autoresearch) (AMD)
## License
MIT
FILE:train.py
"""
Autoresearch pretraining script. Single-GPU, single-file.
Cherry-picked and simplified from nanochat.
Usage: uv run train.py
"""
import os
os.environ["PYTORCH_ALLOC_CONF"] = "expandable_segments:True"
os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"
import gc
import math
import time
from dataclasses import dataclass, asdict
import torch
import torch.nn as nn
import torch.nn.functional as F
from kernels import get_kernel
cap = torch.cuda.get_device_capability()
# varunneal's FA3 is Hopper only, use kernels-community on non-Hopper GPUs
repo = "varunneal/flash-attention-3" if cap == (9, 0) else "kernels-community/flash-attn3"
fa3 = get_kernel(repo).flash_attn_interface
from prepare import MAX_SEQ_LEN, TIME_BUDGET, Tokenizer, make_dataloader, evaluate_bpb
# ---------------------------------------------------------------------------
# GPT Model
# ---------------------------------------------------------------------------
@dataclass
class GPTConfig:
    sequence_len: int = 2048
    vocab_size: int = 32768
    n_layer: int = 12
    n_head: int = 6
    n_kv_head: int = 6
    n_embd: int = 768
    window_pattern: str = "SSSL"
def norm(x):
    return F.rms_norm(x, (x.size(-1),))
def has_ve(layer_idx, n_layer):
    """Returns True if layer should have Value Embedding (alternating, last always included)."""
    return layer_idx % 2 == (n_layer - 1) % 2
def apply_rotary_emb(x, cos, sin):
    assert x.ndim == 4
    d = x.shape[3] // 2
    x1, x2 = x[..., :d], x[..., d:]
    y1 = x1 * cos + x2 * sin
    y2 = x1 * (-sin) + x2 * cos
    return torch.cat([y1, y2], 3)
class CausalSelfAttention(nn.Module):
    def __init__(self, config, layer_idx):
        super().__init__()
        self.n_head = config.n_head
        self.n_kv_head = config.n_kv_head
        self.n_embd = config.n_embd
        self.head_dim = self.n_embd // self.n_head
        assert self.n_embd % self.n_head == 0
        assert self.n_kv_head <= self.n_head and self.n_head % self.n_kv_head == 0
        self.c_q = nn.Linear(self.n_embd, self.n_head * self.head_dim, bias=False)
        self.c_k = nn.Linear(self.n_embd, self.n_kv_head * self.head_dim, bias=False)
        self.c_v = nn.Linear(self.n_embd, self.n_kv_head * self.head_dim, bias=False)
        self.c_proj = nn.Linear(self.n_embd, self.n_embd, bias=False)
        self.ve_gate_channels = 32
        self.ve_gate = nn.Linear(self.ve_gate_channels, self.n_kv_head, bias=False) if has_ve(layer_idx, config.n_layer) else None
    def forward(self, x, ve, cos_sin, window_size):
        B, T, C = x.size()
        q = self.c_q(x).view(B, T, self.n_head, self.head_dim)
        k = self.c_k(x).view(B, T, self.n_kv_head, self.head_dim)
        v = self.c_v(x).view(B, T, self.n_kv_head, self.head_dim)
        # Value residual (ResFormer): mix in value embedding with input-dependent gate per head
        if ve is not None:
            ve = ve.view(B, T, self.n_kv_head, self.head_dim)
            gate = 2 * torch.sigmoid(self.ve_gate(x[..., :self.ve_gate_channels]))
            v = v + gate.unsqueeze(-1) * ve
        cos, sin = cos_sin
        q, k = apply_rotary_emb(q, cos, sin), apply_rotary_emb(k, cos, sin)
        q, k = norm(q), norm(k)
        y = fa3.flash_attn_func(q, k, v, causal=True, window_size=window_size)
        y = y.contiguous().view(B, T, -1)
        y = self.c_proj(y)
        return y
class MLP(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=False)
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=False)
    def forward(self, x):
        x = self.c_fc(x)
        x = F.relu(x).square()
        x = self.c_proj(x)
        return x
class Block(nn.Module):
    def __init__(self, config, layer_idx):
        super().__init__()
        self.attn = CausalSelfAttention(config, layer_idx)
        self.mlp = MLP(config)
    def forward(self, x, ve, cos_sin, window_size):
        x = x + self.attn(norm(x), ve, cos_sin, window_size)
        x = x + self.mlp(norm(x))
        return x
class GPT(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.window_sizes = self._compute_window_sizes(config)
        self.transformer = nn.ModuleDict({
            "wte": nn.Embedding(config.vocab_size, config.n_embd),
            "h": nn.ModuleList([Block(config, i) for i in range(config.n_layer)]),
        })
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.resid_lambdas = nn.Parameter(torch.ones(config.n_layer))
        self.x0_lambdas = nn.Parameter(torch.zeros(config.n_layer))
        # Value embeddings
        head_dim = config.n_embd // config.n_head
        kv_dim = config.n_kv_head * head_dim
        self.value_embeds = nn.ModuleDict({
            str(i): nn.Embedding(config.vocab_size, kv_dim)
            for i in range(config.n_layer) if has_ve(i, config.n_layer)
        })
        # Rotary embeddings
        self.rotary_seq_len = config.sequence_len * 10
        cos, sin = self._precompute_rotary_embeddings(self.rotary_seq_len, head_dim)
        self.register_buffer("cos", cos, persistent=False)
        self.register_buffer("sin", sin, persistent=False)
    @torch.no_grad()
    def init_weights(self):
        # Embedding and unembedding
        torch.nn.init.normal_(self.transformer.wte.weight, mean=0.0, std=1.0)
        torch.nn.init.normal_(self.lm_head.weight, mean=0.0, std=0.001)
        # Transformer blocks
        n_embd = self.config.n_embd
        s = 3**0.5 * n_embd**-0.5
        for block in self.transformer.h:
            torch.nn.init.uniform_(block.attn.c_q.weight, -s, s)
            torch.nn.init.uniform_(block.attn.c_k.weight, -s, s)
            torch.nn.init.uniform_(block.attn.c_v.weight, -s, s)
            torch.nn.init.zeros_(block.attn.c_proj.weight)
            torch.nn.init.uniform_(block.mlp.c_fc.weight, -s, s)
            torch.nn.init.zeros_(block.mlp.c_proj.weight)
        # Per-layer scalars
        self.resid_lambdas.fill_(1.0)
        self.x0_lambdas.fill_(0.1)
        # Value embeddings
        for ve in self.value_embeds.values():
            torch.nn.init.uniform_(ve.weight, -s, s)
        # Gate weights init to zero (sigmoid(0)=0.5, scaled by 2 -> 1.0 = neutral)
        for block in self.transformer.h:
            if block.attn.ve_gate is not None:
                torch.nn.init.zeros_(block.attn.ve_gate.weight)
        # Rotary embeddings
        head_dim = self.config.n_embd // self.config.n_head
        cos, sin = self._precompute_rotary_embeddings(self.rotary_seq_len, head_dim)
        self.cos, self.sin = cos, sin
        # Cast embeddings to bf16
        self.transformer.wte.to(dtype=torch.bfloat16)
        for ve in self.value_embeds.values():
            ve.to(dtype=torch.bfloat16)
    def _precompute_rotary_embeddings(self, seq_len, head_dim, base=10000, device=None):
        if device is None:
            device = self.transformer.wte.weight.device
        channel_range = torch.arange(0, head_dim, 2, dtype=torch.float32, device=device)
        inv_freq = 1.0 / (base ** (channel_range / head_dim))
        t = torch.arange(seq_len, dtype=torch.float32, device=device)
        freqs = torch.outer(t, inv_freq)
        cos, sin = freqs.cos(), freqs.sin()
        cos, sin = cos.bfloat16(), sin.bfloat16()
        cos, sin = cos[None, :, None, :], sin[None, :, None, :]
        return cos, sin
    def _compute_window_sizes(self, config):
        pattern = config.window_pattern.upper()
        assert all(c in "SL" for c in pattern)
        long_window = config.sequence_len
        short_window = long_window // 2
        char_to_window = {"L": (long_window, 0), "S": (short_window, 0)}
        window_sizes = []
        for layer_idx in range(config.n_layer):
            char = pattern[layer_idx % len(pattern)]
            window_sizes.append(char_to_window[char])
        window_sizes[-1] = (long_window, 0)
        return window_sizes
    def estimate_flops(self):
        """Estimated FLOPs per token (forward + backward)."""
        nparams = sum(p.numel() for p in self.parameters())
        value_embeds_numel = sum(ve.weight.numel() for ve in self.value_embeds.values())
        nparams_exclude = (self.transformer.wte.weight.numel() + value_embeds_numel +
                           self.resid_lambdas.numel() + self.x0_lambdas.numel())
        h = self.config.n_head
        q = self.config.n_embd // self.config.n_head
        t = self.config.sequence_len
        attn_flops = 0
        for window_size in self.window_sizes:
            window = window_size[0]
            effective_seq = t if window < 0 else min(window, t)
            attn_flops += 12 * h * q * effective_seq
        return 6 * (nparams - nparams_exclude) + attn_flops
    def num_scaling_params(self):
        wte = sum(p.numel() for p in self.transformer.wte.parameters())
        value_embeds = sum(p.numel() for p in self.value_embeds.parameters())
        lm_head = sum(p.numel() for p in self.lm_head.parameters())
        transformer_matrices = sum(p.numel() for p in self.transformer.h.parameters())
        scalars = self.resid_lambdas.numel() + self.x0_lambdas.numel()
        total = wte + value_embeds + lm_head + transformer_matrices + scalars
        return {
            'wte': wte, 'value_embeds': value_embeds, 'lm_head': lm_head,
            'transformer_matrices': transformer_matrices, 'scalars': scalars, 'total': total,
        }
    def setup_optimizer(self, unembedding_lr=0.004, embedding_lr=0.2, matrix_lr=0.02,
                        weight_decay=0.0, adam_betas=(0.8, 0.95), scalar_lr=0.5):
        model_dim = self.config.n_embd
        matrix_params = list(self.transformer.h.parameters())
        value_embeds_params = list(self.value_embeds.parameters())
        embedding_params = list(self.transformer.wte.parameters())
        lm_head_params = list(self.lm_head.parameters())
        resid_params = [self.resid_lambdas]
        x0_params = [self.x0_lambdas]
        assert len(list(self.parameters())) == (len(matrix_params) + len(embedding_params) +
            len(lm_head_params) + len(value_embeds_params) + len(resid_params) + len(x0_params))
        # Scale LR ∝ 1/√dmodel (tuned at 768 dim)
        dmodel_lr_scale = (model_dim / 768) ** -0.5
        print(f"Scaling AdamW LRs by 1/sqrt({model_dim}/768) = {dmodel_lr_scale:.6f}")
        param_groups = [
            dict(kind='adamw', params=lm_head_params, lr=unembedding_lr * dmodel_lr_scale, betas=adam_betas, eps=1e-10, weight_decay=0.0),
            dict(kind='adamw', params=embedding_params, lr=embedding_lr * dmodel_lr_scale, betas=adam_betas, eps=1e-10, weight_decay=0.0),
            dict(kind='adamw', params=value_embeds_params, lr=embedding_lr * dmodel_lr_scale, betas=adam_betas, eps=1e-10, weight_decay=0.0),
            dict(kind='adamw', params=resid_params, lr=scalar_lr * 0.01, betas=adam_betas, eps=1e-10, weight_decay=0.0),
            dict(kind='adamw', params=x0_params, lr=scalar_lr, betas=(0.96, 0.95), eps=1e-10, weight_decay=0.0),
        ]
        for shape in sorted({p.shape for p in matrix_params}):
            group_params = [p for p in matrix_params if p.shape == shape]
            param_groups.append(dict(
                kind='muon', params=group_params, lr=matrix_lr,
                momentum=0.95, ns_steps=5, beta2=0.95, weight_decay=weight_decay,
            ))
        optimizer = MuonAdamW(param_groups)
        for group in optimizer.param_groups:
            group["initial_lr"] = group["lr"]
        return optimizer
    def forward(self, idx, targets=None, reduction='mean'):
        B, T = idx.size()
        assert T <= self.cos.size(1)
        cos_sin = self.cos[:, :T], self.sin[:, :T]
        x = self.transformer.wte(idx)
        x = norm(x)
        x0 = x
        for i, block in enumerate(self.transformer.h):
            x = self.resid_lambdas[i] * x + self.x0_lambdas[i] * x0
            ve = self.value_embeds[str(i)](idx) if str(i) in self.value_embeds else None
            x = block(x, ve, cos_sin, self.window_sizes[i])
        x = norm(x)
        softcap = 15
        logits = self.lm_head(x)
        logits = logits.float()
        logits = softcap * torch.tanh(logits / softcap)
        if targets is not None:
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1),
                                   ignore_index=-1, reduction=reduction)
            return loss
        return logits
# ---------------------------------------------------------------------------
# Optimizer (MuonAdamW, single GPU only)
# ---------------------------------------------------------------------------
polar_express_coeffs = [
    (8.156554524902461, -22.48329292557795, 15.878769915207462),
    (4.042929935166739, -2.808917465908714, 0.5000178451051316),
    (3.8916678022926607, -2.772484153217685, 0.5060648178503393),
    (3.285753657755655, -2.3681294933425376, 0.46449024233003106),
    (2.3465413258596377, -1.7097828382687081, 0.42323551169305323),
]
@torch.compile(dynamic=False, fullgraph=True)
def adamw_step_fused(p, grad, exp_avg, exp_avg_sq, step_t, lr_t, beta1_t, beta2_t, eps_t, wd_t):
    p.mul_(1 - lr_t * wd_t)
    exp_avg.lerp_(grad, 1 - beta1_t)
    exp_avg_sq.lerp_(grad.square(), 1 - beta2_t)
    bias1 = 1 - beta1_t ** step_t
    bias2 = 1 - beta2_t ** step_t
    denom = (exp_avg_sq / bias2).sqrt() + eps_t
    step_size = lr_t / bias1
    p.add_(exp_avg / denom, alpha=-step_size)
@torch.compile(dynamic=False, fullgraph=True)
def muon_step_fused(stacked_grads, stacked_params, momentum_buffer, second_momentum_buffer,
                    momentum_t, lr_t, wd_t, beta2_t, ns_steps, red_dim):
    # Nesterov momentum
    momentum = momentum_t.to(stacked_grads.dtype)
    momentum_buffer.lerp_(stacked_grads, 1 - momentum)
    g = stacked_grads.lerp_(momentum_buffer, momentum)
    # Polar express orthogonalization
    X = g.bfloat16()
    X = X / (X.norm(dim=(-2, -1), keepdim=True) * 1.02 + 1e-6)
    if g.size(-2) > g.size(-1):
        for a, b, c in polar_express_coeffs[:ns_steps]:
            A = X.mT @ X
            B = b * A + c * (A @ A)
            X = a * X + X @ B
    else:
        for a, b, c in polar_express_coeffs[:ns_steps]:
            A = X @ X.mT
            B = b * A + c * (A @ A)
            X = a * X + B @ X
    g = X
    # NorMuon variance reduction
    beta2 = beta2_t.to(g.dtype)
    v_mean = g.float().square().mean(dim=red_dim, keepdim=True)
    red_dim_size = g.size(red_dim)
    v_norm_sq = v_mean.sum(dim=(-2, -1), keepdim=True) * red_dim_size
    v_norm = v_norm_sq.sqrt()
    second_momentum_buffer.lerp_(v_mean.to(dtype=second_momentum_buffer.dtype), 1 - beta2)
    step_size = second_momentum_buffer.clamp_min(1e-10).rsqrt()
    scaled_sq_sum = (v_mean * red_dim_size) * step_size.float().square()
    v_norm_new = scaled_sq_sum.sum(dim=(-2, -1), keepdim=True).sqrt()
    final_scale = step_size * (v_norm / v_norm_new.clamp_min(1e-10))
    g = g * final_scale.to(g.dtype)
    # Cautious weight decay + parameter update
    lr = lr_t.to(g.dtype)
    wd = wd_t.to(g.dtype)
    mask = (g * stacked_params) >= 0
    stacked_params.sub_(lr * g + lr * wd * stacked_params * mask)
class MuonAdamW(torch.optim.Optimizer):
    """Combined optimizer: Muon for 2D matrix params, AdamW for others."""
    def __init__(self, param_groups):
        super().__init__(param_groups, defaults={})
        # 0-D CPU tensors to avoid torch.compile recompilation when values change
        self._adamw_step_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._adamw_lr_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._adamw_beta1_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._adamw_beta2_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._adamw_eps_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._adamw_wd_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._muon_momentum_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._muon_lr_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._muon_wd_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
        self._muon_beta2_t = torch.tensor(0.0, dtype=torch.float32, device="cpu")
    def _step_adamw(self, group):
        for p in group['params']:
            if p.grad is None:
                continue
            grad = p.grad
            state = self.state[p]
            if not state:
                state['step'] = 0
                state['exp_avg'] = torch.zeros_like(p)
                state['exp_avg_sq'] = torch.zeros_like(p)
            state['step'] += 1
            self._adamw_step_t.fill_(state['step'])
            self._adamw_lr_t.fill_(group['lr'])
            self._adamw_beta1_t.fill_(group['betas'][0])
            self._adamw_beta2_t.fill_(group['betas'][1])
            self._adamw_eps_t.fill_(group['eps'])
            self._adamw_wd_t.fill_(group['weight_decay'])
            adamw_step_fused(p, grad, state['exp_avg'], state['exp_avg_sq'],
                             self._adamw_step_t, self._adamw_lr_t, self._adamw_beta1_t,
                             self._adamw_beta2_t, self._adamw_eps_t, self._adamw_wd_t)
    def _step_muon(self, group):
        params = group['params']
        if not params:
            return
        p = params[0]
        state = self.state[p]
        num_params = len(params)
        shape, device, dtype = p.shape, p.device, p.dtype
        if "momentum_buffer" not in state:
            state["momentum_buffer"] = torch.zeros(num_params, *shape, dtype=dtype, device=device)
        if "second_momentum_buffer" not in state:
            state_shape = (num_params, shape[-2], 1) if shape[-2] >= shape[-1] else (num_params, 1, shape[-1])
            state["second_momentum_buffer"] = torch.zeros(state_shape, dtype=dtype, device=device)
        red_dim = -1 if shape[-2] >= shape[-1] else -2
        stacked_grads = torch.stack([p.grad for p in params])
        stacked_params = torch.stack(params)
        self._muon_momentum_t.fill_(group["momentum"])
        self._muon_beta2_t.fill_(group["beta2"] if group["beta2"] is not None else 0.0)
        self._muon_lr_t.fill_(group["lr"] * max(1.0, shape[-2] / shape[-1])**0.5)
        self._muon_wd_t.fill_(group["weight_decay"])
        muon_step_fused(stacked_grads, stacked_params,
                        state["momentum_buffer"], state["second_momentum_buffer"],
                        self._muon_momentum_t, self._muon_lr_t, self._muon_wd_t,
                        self._muon_beta2_t, group["ns_steps"], red_dim)
        torch._foreach_copy_(params, list(stacked_params.unbind(0)))
    @torch.no_grad()
    def step(self):
        for group in self.param_groups:
            if group['kind'] == 'adamw':
                self._step_adamw(group)
            elif group['kind'] == 'muon':
                self._step_muon(group)
# ---------------------------------------------------------------------------
# Hyperparameters (edit these directly, no CLI flags needed)
# ---------------------------------------------------------------------------
# Model architecture
ASPECT_RATIO = 64 # model_dim = depth * ASPECT_RATIO
HEAD_DIM = 128 # target head dimension for attention
WINDOW_PATTERN = "SSSL" # sliding window pattern: L=full, S=half context
# Optimization
TOTAL_BATCH_SIZE = 2**19 # ~524K tokens per optimizer step
EMBEDDING_LR = 0.6 # learning rate for token embeddings (Adam)
UNEMBEDDING_LR = 0.004 # learning rate for lm_head (Adam)
MATRIX_LR = 0.04 # learning rate for matrix parameters (Muon)
SCALAR_LR = 0.5 # learning rate for per-layer scalars (Adam)
WEIGHT_DECAY = 0.2 # cautious weight decay for Muon
ADAM_BETAS = (0.8, 0.95) # Adam beta1, beta2
WARMUP_RATIO = 0.0 # fraction of time budget for LR warmup
WARMDOWN_RATIO = 0.5 # fraction of time budget for LR warmdown
FINAL_LR_FRAC = 0.0 # final LR as fraction of initial
# Model size
DEPTH = 8 # number of transformer layers
DEVICE_BATCH_SIZE = 128 # per-device batch size (reduce if OOM)
# ---------------------------------------------------------------------------
# Setup: tokenizer, model, optimizer, dataloader
# ---------------------------------------------------------------------------
t_start = time.time()
torch.manual_seed(42)
torch.cuda.manual_seed(42)
torch.set_float32_matmul_precision("high")
device = torch.device("cuda")
autocast_ctx = torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16)
H100_BF16_PEAK_FLOPS = 989.5e12
tokenizer = Tokenizer.from_directory()
vocab_size = tokenizer.get_vocab_size()
print(f"Vocab size: {vocab_size:,}")
def build_model_config(depth):
    base_dim = depth * ASPECT_RATIO
    model_dim = ((base_dim + HEAD_DIM - 1) // HEAD_DIM) * HEAD_DIM
    num_heads = model_dim // HEAD_DIM
    return GPTConfig(
        sequence_len=MAX_SEQ_LEN, vocab_size=vocab_size,
        n_layer=depth, n_head=num_heads, n_kv_head=num_heads, n_embd=model_dim,
        window_pattern=WINDOW_PATTERN,
    )
config = build_model_config(DEPTH)
print(f"Model config: {asdict(config)}")
with torch.device("meta"):
    model = GPT(config)
model.to_empty(device=device)
model.init_weights()
param_counts = model.num_scaling_params()
print("Parameter counts:")
for key, value in param_counts.items():
    print(f" {key:24s}: {value:,}")
num_params = param_counts['total']
num_flops_per_token = model.estimate_flops()
print(f"Estimated FLOPs per token: {num_flops_per_token:e}")
tokens_per_fwdbwd = DEVICE_BATCH_SIZE * MAX_SEQ_LEN
assert TOTAL_BATCH_SIZE % tokens_per_fwdbwd == 0
grad_accum_steps = TOTAL_BATCH_SIZE // tokens_per_fwdbwd
optimizer = model.setup_optimizer(
    unembedding_lr=UNEMBEDDING_LR,
    embedding_lr=EMBEDDING_LR,
    scalar_lr=SCALAR_LR,
    adam_betas=ADAM_BETAS,
    matrix_lr=MATRIX_LR,
    weight_decay=WEIGHT_DECAY,
)
model = torch.compile(model, dynamic=False)
train_loader = make_dataloader(tokenizer, DEVICE_BATCH_SIZE, MAX_SEQ_LEN, "train")
x, y, epoch = next(train_loader) # prefetch first batch
print(f"Time budget: {TIME_BUDGET}s")
print(f"Gradient accumulation steps: {grad_accum_steps}")
# Schedules (all based on progress = training_time / TIME_BUDGET)
def get_lr_multiplier(progress):
    if progress < WARMUP_RATIO:
        return progress / WARMUP_RATIO if WARMUP_RATIO > 0 else 1.0
    elif progress < 1.0 - WARMDOWN_RATIO:
        return 1.0
    else:
        cooldown = (1.0 - progress) / WARMDOWN_RATIO
        return cooldown * 1.0 + (1 - cooldown) * FINAL_LR_FRAC
def get_muon_momentum(step):
    frac = min(step / 300, 1)
    return (1 - frac) * 0.85 + frac * 0.95
def get_weight_decay(progress):
    return WEIGHT_DECAY * (1 - progress)
# ---------------------------------------------------------------------------
# Training loop
# ---------------------------------------------------------------------------
t_start_training = time.time()
smooth_train_loss = 0
total_training_time = 0
step = 0
while True:
    torch.cuda.synchronize()
    t0 = time.time()
    for micro_step in range(grad_accum_steps):
        with autocast_ctx:
            loss = model(x, y)
        train_loss = loss.detach()
        loss = loss / grad_accum_steps
        loss.backward()
        x, y, epoch = next(train_loader)
    # Progress and schedules
    progress = min(total_training_time / TIME_BUDGET, 1.0)
    lrm = get_lr_multiplier(progress)
    muon_momentum = get_muon_momentum(step)
    muon_weight_decay = get_weight_decay(progress)
    for group in optimizer.param_groups:
        group["lr"] = group["initial_lr"] * lrm
        if group['kind'] == 'muon':
            group["momentum"] = muon_momentum
            group["weight_decay"] = muon_weight_decay
    optimizer.step()
    model.zero_grad(set_to_none=True)
    train_loss_f = train_loss.item()
    # Fast fail: abort if loss is exploding or NaN
    if math.isnan(train_loss_f) or train_loss_f > 100:
        print("FAIL")
        exit(1)
    torch.cuda.synchronize()
    t1 = time.time()
    dt = t1 - t0
    if step > 10:
        total_training_time += dt
    # Logging
    ema_beta = 0.9
    smooth_train_loss = ema_beta * smooth_train_loss + (1 - ema_beta) * train_loss_f
    debiased_smooth_loss = smooth_train_loss / (1 - ema_beta**(step + 1))
    pct_done = 100 * progress
    tok_per_sec = int(TOTAL_BATCH_SIZE / dt)
    mfu = 100 * num_flops_per_token * TOTAL_BATCH_SIZE / dt / H100_BF16_PEAK_FLOPS
    remaining = max(0, TIME_BUDGET - total_training_time)
    print(f"\rstep {step:05d} ({pct_done:.1f}%) | loss: {debiased_smooth_loss:.6f} | lrm: {lrm:.2f} | dt: {dt*1000:.0f}ms | tok/sec: {tok_per_sec:,} | mfu: {mfu:.1f}% | epoch: {epoch} | remaining: {remaining:.0f}s ", end="", flush=True)
    # GC management (Python's GC causes ~500ms stalls)
    if step == 0:
        gc.collect()
        gc.freeze()
        gc.disable()
    elif (step + 1) % 5000 == 0:
        gc.collect()
    step += 1
    # Time's up, but only stop after warmup steps so we don't count compilation
    if step > 10 and total_training_time >= TIME_BUDGET:
        break
print() # newline after \r training log
total_tokens = step * TOTAL_BATCH_SIZE
# Final eval
model.eval()
with autocast_ctx:
    val_bpb = evaluate_bpb(model, tokenizer, DEVICE_BATCH_SIZE)
# Final summary
t_end = time.time()
startup_time = t_start_training - t_start
steady_state_mfu = 100 * num_flops_per_token * TOTAL_BATCH_SIZE * (step - 10) / total_training_time / H100_BF16_PEAK_FLOPS if total_training_time > 0 else 0
peak_vram_mb = torch.cuda.max_memory_allocated() / 1024 / 1024
print("---")
print(f"val_bpb: {val_bpb:.6f}")
print(f"training_seconds: {total_training_time:.1f}")
print(f"total_seconds: {t_end - t_start:.1f}")
print(f"peak_vram_mb: {peak_vram_mb:.1f}")
print(f"mfu_percent: {steady_state_mfu:.2f}")
print(f"total_tokens_M: {total_tokens / 1e6:.1f}")
print(f"num_steps: {step}")
print(f"num_params_M: {num_params / 1e6:.1f}")
print(f"depth: {DEPTH}")
Process and manage Microsoft Word (.docx) and WPS documents for creation, editing, format conversion, text extraction, analysis, troubleshooting, and batch o...
---
name: office-docs
description: Comprehensive document processing for Microsoft Word (.docx) and WPS Office files. Use when Codex needs to work with professional documents for: (1) Creating new documents, (2) Modifying or editing content, (3) Converting between formats, (4) Extracting text and metadata, (5) Troubleshooting document issues, (6) Batch processing documents, or any other Office document tasks.
---
# Office Documents Skill
This skill provides comprehensive tools and workflows for working with Microsoft Word (.docx) and WPS Office documents. It covers creation, editing, conversion, analysis, and troubleshooting of professional documents.
## Quick Start
### Basic Operations
**Read document content:**
```python
# Use python-docx for .docx files
from docx import Document
doc = Document('document.docx')
text = '\n'.join([paragraph.text for paragraph in doc.paragraphs])
```
**Create new document:**
```python
from docx import Document
from docx.shared import Inches
doc = Document()
doc.add_heading('Document Title', 0)
doc.add_paragraph('This is a new paragraph.')
doc.save('new_document.docx')
```
### Common Tasks
1. **Text extraction** - See [TEXT_EXTRACTION.md](references/TEXT_EXTRACTION.md)
2. **Format conversion** - See [CONVERSION.md](references/CONVERSION.md)
3. **Document analysis** - See [ANALYSIS.md](references/ANALYSIS.md)
4. **Troubleshooting** - See [TROUBLESHOOTING.md](references/TROUBLESHOOTING.md)
## Core Tools and Libraries
### Python Libraries
**For .docx files:**
- `python-docx` - Primary library for reading/writing .docx
- `docx2txt` - Simple text extraction
- `docxcompose` - Advanced document composition
- `docx-mailmerge` - Mail merge functionality
**For WPS files:**
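- Avoid `pywps`: that PyPI package implements the OGC Web Processing Service and is unrelated to WPS Office documents
- Conversion to .docx first recommended (e.g. with LibreOffice, see below)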
**For format conversion:**
- `pandoc` - Universal document converter
- `libreoffice` - Office suite for conversion
- `unoconv` - Universal office converter
### Command Line Tools
**Document conversion:**
```bash
# Convert .docx to PDF
libreoffice --headless --convert-to pdf document.docx
# Convert .docx to text
pandoc document.docx -o document.txt
# Batch convert WPS to .docx
for file in *.wps; do libreoffice --headless --convert-to docx "$file"; done
```
**Document analysis:**
```bash
# Extract metadata
exiftool document.docx
# Identify the file type (a quick sanity check, not a full integrity test)
file document.docx
```
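If `exiftool` is not installed, the core metadata can also be read with the Python standard library, since a .docx file is a ZIP archive that normally stores its core properties in `docProps/core.xml`. A minimal sketch, assuming the file follows that layout; the function name `read_core_properties` is illustrative and not one of this skill's scripts:

```python
import zipfile
import xml.etree.ElementTree as ET

# XML namespaces used by the core-properties part of an OOXML package
NS = {
    "cp": "http://schemas.openxmlformats.org/package/2006/metadata/core-properties",
    "dc": "http://purl.org/dc/elements/1.1/",
    "dcterms": "http://purl.org/dc/terms/",
}


def read_core_properties(path):
    """Return title/creator/created/modified from a .docx, or {} if absent."""
    with zipfile.ZipFile(path) as zf:
        if "docProps/core.xml" not in zf.namelist():
            return {}
        root = ET.fromstring(zf.read("docProps/core.xml"))
    fields = {
        "title": "dc:title",
        "creator": "dc:creator",
        "created": "dcterms:created",
        "modified": "dcterms:modified",
    }
    props = {}
    for name, tag in fields.items():
        node = root.find(tag, NS)
        # Skip properties that are missing or empty
        if node is not None and node.text:
            props[name] = node.text
    return props
```

Calling `read_core_properties('document.docx')` returns a plain dict, so it can be merged into other statistics or written out as JSON.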
## Workflows
### 1. Document Creation Workflow
When creating new documents:
1. **Choose template** - Start from template or create from scratch
2. **Add structure** - Headings, paragraphs, lists
3. **Apply formatting** - Styles, fonts, spacing
4. **Add elements** - Tables, images, hyperlinks
5. **Finalize** - Page setup, headers/footers, save
See [CREATION.md](references/CREATION.md) for detailed patterns.
### 2. Document Editing Workflow
When modifying existing documents:
1. **Backup original** - Always create backup first
2. **Analyze structure** - Understand document layout
3. **Make changes** - Edit content, update formatting
4. **Preserve formatting** - Maintain original styles
5. **Validate** - Check for corruption, save new version
See [EDITING.md](references/EDITING.md) for detailed patterns.
### 3. Conversion Workflow
When converting between formats:
1. **Identify source format** - .docx, .wps, .doc, .rtf, etc.
2. **Choose conversion tool** - Based on format and requirements
3. **Convert** - With appropriate options
4. **Verify** - Check content preservation
5. **Clean up** - Remove temporary files
See [CONVERSION.md](references/CONVERSION.md) for detailed patterns.
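The "choose conversion tool" step can be sketched as a small dispatcher that builds the command line without running it. This is a minimal sketch under assumptions: it only covers the two tools shown earlier (`pandoc` for .docx to text, `libreoffice --headless --convert-to` for everything else), and `build_conversion_command` is a hypothetical helper, not one of this skill's scripts:

```python
from pathlib import Path


def build_conversion_command(source, target_format, outdir="."):
    """Return an argv list for converting `source`; nothing is executed here."""
    source = Path(source)
    if target_format == "txt" and source.suffix.lower() == ".docx":
        # pandoc reads .docx directly and writes plain text
        output = Path(outdir) / (source.stem + ".txt")
        return ["pandoc", str(source), "-o", str(output)]
    # Fall back to LibreOffice for the remaining formats (.wps, .doc, pdf output, ...)
    return ["libreoffice", "--headless", "--convert-to", target_format,
            "--outdir", str(outdir), str(source)]


print(build_conversion_command("report.docx", "txt"))
print(build_conversion_command("notes.wps", "docx", outdir="converted"))
```

The returned list can be passed to `subprocess.run(cmd, check=True)`; keeping command construction separate makes it easy to log or dry-run a batch before converting anything.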
## Common Issues and Solutions
### 1. Corrupted Documents
**Symptoms:** Won't open, error messages, missing content
**Solutions:**
- Try opening in different application
- Use recovery mode in Word/WPS
- Extract content with `python-docx` ignoring errors
- Convert to different format and back
See [TROUBLESHOOTING.md](references/TROUBLESHOOTING.md#corruption) for detailed recovery procedures.
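Before attempting recovery, it can help to check whether the file is still a readable ZIP container, since a .docx is a ZIP archive whose main body lives in `word/document.xml`. A minimal sketch using only the standard library; `check_docx_integrity` is an illustrative name, and passing these checks does not guarantee that Word or WPS will open the file:

```python
import zipfile


def check_docx_integrity(path):
    """Return a list of problems found; an empty list means the basic checks passed."""
    if not zipfile.is_zipfile(path):
        return ["not a valid ZIP archive"]
    problems = []
    with zipfile.ZipFile(path) as zf:
        # testzip() returns the name of the first member with a bad CRC, or None
        bad_member = zf.testzip()
        if bad_member is not None:
            problems.append(f"corrupt member: {bad_member}")
        if "word/document.xml" not in zf.namelist():
            problems.append("missing word/document.xml")
    return problems
```

An empty result suggests the container is intact and the problem is more likely in the document XML or formatting; a non-empty result points toward the recovery or conversion routes listed above.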
### 2. Formatting Issues
**Symptoms:** Wrong fonts, broken layout, missing styles
**Solutions:**
- Check style definitions
- Verify font availability
- Use template-based approach
- Simplify complex formatting
### 3. Compatibility Problems
**Symptoms:** Different appearance in Word vs WPS, missing features
**Solutions:**
- Stick to common features
- Test in both applications
- Use standard formats
- Provide alternative versions
## Advanced Features
### Document Automation
**Batch processing:**
```python
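import os
from docx import Document

def process_single_document(doc_path):
    # Placeholder: replace with the per-document work you need
    doc = Document(doc_path)
    print(f"{doc_path}: {len(doc.paragraphs)} paragraphs")

def process_documents(folder_path):
    for filename in os.listdir(folder_path):
        if filename.endswith('.docx'):
            doc_path = os.path.join(folder_path, filename)
            process_single_document(doc_path)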
```
**Template-based generation:**
```python
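from docx import Document

def generate_from_template(template_path, data):
    doc = Document(template_path)
    # Replace {{ key }} placeholders with data.
    # Note: assigning paragraph.text collapses the paragraph into a single run,
    # so character-level formatting inside that paragraph is lost.
    for paragraph in doc.paragraphs:
        for key, value in data.items():
            placeholder = f'{{{{ {key} }}}}'
            if placeholder in paragraph.text:
                paragraph.text = paragraph.text.replace(placeholder, str(value))
    return doc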
```
### Document Analysis
**Extract statistics:**
```python
from docx import Document

def analyze_document(doc_path):
    doc = Document(doc_path)
    stats = {
        'paragraphs': len(doc.paragraphs),
        'tables': len(doc.tables),
        'images': len(doc.inline_shapes),
        'sections': len(doc.sections),
        'styles': len(doc.styles)
    }
    return stats
```
**Check formatting consistency:**
```python
def check_formatting(doc):
    issues = []
    for i, para in enumerate(doc.paragraphs):
        if para.style.name == 'Normal' and para.text.strip():
            # Check for inconsistent formatting
            if len(para.runs) > 1:
                issues.append(f"Paragraph {i}: Multiple runs in Normal style")
    return issues
```
## Best Practices
### 1. Always Backup
```python
import shutil

def backup_document(filepath):
    backup_path = filepath + '.backup'
    # copy2 also preserves file metadata such as modification time
    shutil.copy2(filepath, backup_path)
    return backup_path
```
### 2. Use Version Control
- Save incremental versions
- Use descriptive filenames
- Document changes made
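One way to save incremental versions is to pick the next unused numbered filename before writing. A minimal sketch, assuming a `_v1`, `_v2`, ... suffix convention (this naming scheme and the helper name `next_version_path` are illustrative choices, not something the skill prescribes):

```python
from pathlib import Path


def next_version_path(filepath):
    """Return the first unused path of the form name_v1.ext, name_v2.ext, ..."""
    path = Path(filepath)
    version = 1
    while True:
        candidate = path.with_name(f"{path.stem}_v{version}{path.suffix}")
        if not candidate.exists():
            return candidate
        version += 1
```

For example, `doc.save(str(next_version_path('report.docx')))` would write `report_v1.docx` the first time and `report_v2.docx` the next, leaving the original untouched.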
### 3. Test Thoroughly
- Test in target application
- Verify all content preserved
- Check formatting integrity
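To check that content was preserved, one option is to diff the paragraph text before and after an edit. The sketch below works on plain lists of strings, so it does not depend on any document library; the name `content_diff` is hypothetical.

```python
import difflib

def content_diff(before_paragraphs, after_paragraphs):
    """Return unified-diff lines between two lists of paragraph strings.

    An empty list means the text content is identical.
    """
    return list(
        difflib.unified_diff(
            before_paragraphs,
            after_paragraphs,
            fromfile="before",
            tofile="after",
            lineterm="",
        )
    )
```

With python-docx, the inputs can be built as `[p.text for p in Document(path).paragraphs]` for the original and the edited file. This only compares text, so formatting changes still need a visual check in the target application.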
### 4. Handle Errors Gracefully
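```python
from docx import Document

def open_document(filepath):
    """Open a .docx file, falling back to plain-text extraction on failure."""
    try:
        return Document(filepath)
    except Exception as e:
        print(f"Error opening {filepath}: {e}")
        # Try alternative methods; extract_text_fallback must be supplied by you
        return extract_text_fallback(filepath)
```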
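The snippet above calls `extract_text_fallback`, which is not defined in this skill. A minimal sketch of what such a fallback could look like is shown here, assuming the file is still a readable ZIP container: a .docx file is a ZIP archive whose main text lives in `word/document.xml`, so the text can be pulled out with the standard library alone. If the ZIP container itself is damaged, this approach fails as well.

```python
import zipfile
import xml.etree.ElementTree as ET

# WordprocessingML namespace used by elements in word/document.xml
W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"

def extract_text_fallback(filepath):
    """Best-effort text extraction that reads word/document.xml directly.

    Returns one line per non-empty paragraph. Headers, footers, and
    footnotes live in other archive members and are not read here.
    """
    with zipfile.ZipFile(filepath) as archive:
        xml_bytes = archive.read("word/document.xml")
    root = ET.fromstring(xml_bytes)
    paragraphs = []
    for para in root.iter(f"{{{W_NS}}}p"):
        # Join the text of every <w:t> element inside this paragraph
        text = "".join(node.text or "" for node in para.iter(f"{{{W_NS}}}t"))
        if text.strip():
            paragraphs.append(text)
    return "\n".join(paragraphs)
```

Paragraphs nested inside text boxes may appear twice with this simple traversal, which is usually acceptable for a recovery path.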
## Reference Files
For detailed information on specific topics, consult these reference files:
- [TEXT_EXTRACTION.md](references/TEXT_EXTRACTION.md) - Text extraction methods and patterns
- [CONVERSION.md](references/CONVERSION.md) - Format conversion guides
- [ANALYSIS.md](references/ANALYSIS.md) - Document analysis techniques
- [TROUBLESHOOTING.md](references/TROUBLESHOOTING.md) - Common issues and solutions
- [CREATION.md](references/CREATION.md) - Document creation patterns
- [EDITING.md](references/EDITING.md) - Document editing workflows
- [AUTOMATION.md](references/AUTOMATION.md) - Automation scripts and templates
## Scripts
Available scripts in the `scripts/` directory:
- `extract_text.py` - Extract text from .docx files
- `convert_format.py` - Convert between document formats
- `batch_process.py` - Process multiple documents
- `document_stats.py` - Generate document statistics
- `repair_document.py` - Attempt to repair corrupted documents
Run scripts with appropriate parameters:
```bash
python scripts/extract_text.py input.docx output.txt
```
## Getting Help
If you encounter issues not covered in this skill:
1. Check the relevant reference file
2. Search for specific error messages
3. Try alternative approaches
4. Consider converting to a simpler format
Remember: When in doubt, create a backup and work on a copy.
FILE:references/TEXT_EXTRACTION.md
# Text Extraction from Office Documents
This guide covers various methods for extracting text from Microsoft Word (.docx) and WPS Office documents.
## Python Methods
### 1. Using python-docx (Recommended for .docx)
**Basic text extraction:**
```python
from docx import Document

def extract_text_docx(filepath):
    """Extract all text from a .docx file."""
    doc = Document(filepath)
    text_parts = []
    # Extract paragraphs
    for paragraph in doc.paragraphs:
        if paragraph.text.strip():
            text_parts.append(paragraph.text)
    # Extract tables
    for table in doc.tables:
        for row in table.rows:
            row_text = []
            for cell in row.cells:
                if cell.text.strip():
                    row_text.append(cell.text)
            if row_text:
                text_parts.append(' | '.join(row_text))
    # Extract headers and footers
    for section in doc.sections:
        for header in section.header.paragraphs:
            if header.text.strip():
                text_parts.append(header.text)
        for footer in section.footer.paragraphs:
            if footer.text.strip():
                text_parts.append(footer.text)
    return '\n'.join(text_parts)
```
Automate web browsing with navigation, form filling, clicking, screenshots, data extraction, and testing using Chrome via OpenClaw browser tool.
# Agent Browser Skill
## Description
Enhanced browser automation for OpenClaw agents with advanced navigation, screenshot, and interaction capabilities.
## When to Use
Use this skill when:
- Automating web browsing tasks
- Taking screenshots of web pages
- Filling forms and clicking buttons
- Extracting data from websites
- Testing web applications
- Navigating complex web flows
## Prerequisites
- OpenClaw browser tool must be enabled
- Chrome or Chromium browser installed
- Internet connection for web access
## Examples
### Basic Navigation
```bash
# Navigate to a website
openclaw browser open --url "https://example.com"
# Take a screenshot
openclaw browser snapshot --url "https://example.com" --output screenshot.png
```
### Form Interaction
```bash
# Fill a form
openclaw browser act --url "https://forms.example.com" --kind fill --fields '{"name": "John", "email": "john@example.com"}'
# Click a button
openclaw browser act --url "https://example.com" --kind click --selector "button.submit"
```
### Data Extraction
```bash
# Extract page content
openclaw browser snapshot --url "https://news.example.com" --maxChars 5000
# Monitor page changes
openclaw browser act --url "https://status.example.com" --kind wait --textGone "Loading..."
```
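When these commands are driven from a script rather than typed by hand, building the argument list programmatically avoids shell-quoting mistakes, especially for JSON values such as `--fields`. The following is a sketch only: the helper names are hypothetical, and the flags it emits are simply whatever keyword arguments are passed in, mirroring the examples above rather than a verified OpenClaw CLI reference.

```python
import json
import subprocess

def build_browser_command(subcommand, **options):
    """Build an argv list for `openclaw browser <subcommand>`.

    True becomes a bare flag, None/False are skipped, dicts and lists are
    JSON-encoded, and everything else is converted with str().
    """
    argv = ["openclaw", "browser", subcommand]
    for key, value in options.items():
        if value is None or value is False:
            continue
        flag = f"--{key}"
        if value is True:
            argv.append(flag)
        elif isinstance(value, (dict, list)):
            argv.extend([flag, json.dumps(value)])
        else:
            argv.extend([flag, str(value)])
    return argv

def run_browser_command(subcommand, timeout=60, **options):
    """Run the command without a shell and return its stdout."""
    result = subprocess.run(
        build_browser_command(subcommand, **options),
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return result.stdout
```

For example, `build_browser_command("snapshot", url="https://news.example.com", maxChars=5000)` yields the same arguments as the first data-extraction command above, passed as a list so no quoting is needed.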
## Integration with OpenClaw
This skill enhances the native OpenClaw browser tool with:
1. **Simplified commands** - Easier syntax for common tasks
2. **Error handling** - Better recovery from failures
3. **Performance optimization** - Faster page loads and interactions
4. **Accessibility support** - Better element detection
## Safety Notes
- Only automate public websites
- Respect robots.txt and terms of service
- Avoid excessive requests to prevent IP blocking
- Use delays between actions to mimic human behavior
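Two of these notes, honoring robots.txt and spacing out requests, can be enforced in code rather than left to discipline. The sketch below uses Python's standard `urllib.robotparser`; the function names, the default delay, and the injected `action` callable (whatever performs the actual browser step) are assumptions made for illustration.

```python
import time
from urllib.robotparser import RobotFileParser

def make_robots_checker(robots_txt, user_agent="*"):
    """Return a function url -> bool based on the given robots.txt text."""
    parser = RobotFileParser()
    parser.parse(robots_txt.splitlines())
    return lambda url: parser.can_fetch(user_agent, url)

def visit_politely(urls, is_allowed, action, delay_seconds=2.0, sleep=time.sleep):
    """Run action(url) for each allowed URL, pausing between visits.

    Disallowed URLs are skipped. `sleep` is injectable so the pacing can be
    tested without actually waiting.
    """
    results = {}
    first = True
    for url in urls:
        if not is_allowed(url):
            continue
        if not first:
            sleep(delay_seconds)
        results[url] = action(url)
        first = False
    return results
```

In practice the robots.txt text would be downloaded once per site and the checker reused for every URL on that host.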
## Troubleshooting
### Common Issues
1. **Browser not starting**: Check if Chrome is installed
2. **Element not found**: Try different selectors or wait for page load
3. **Timeout errors**: Increase timeout values for slow pages
4. **Permission denied**: Ensure OpenClaw has necessary permissions
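For issues 2 and 3, retrying with a growing delay is often enough to get past a slow page load. The helper below is a generic sketch, not part of OpenClaw: `retry` and its parameters are hypothetical names, and `action` is any zero-argument callable that raises on failure.

```python
import time

def retry(action, attempts=3, base_delay=1.0, retry_on=(Exception,), sleep=time.sleep):
    """Call action() until it succeeds, doubling the wait after each failure.

    Re-raises the last exception once `attempts` calls have failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except retry_on:
            if attempt == attempts:
                raise
            # Waits base_delay, then 2x, 4x, ... between attempts
            sleep(base_delay * 2 ** (attempt - 1))
```

Narrowing `retry_on` to the specific timeout or not-found error keeps genuine bugs, such as a mistyped selector, from being retried silently.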
### Debug Tips
```bash
# Enable verbose logging
openclaw browser open --url "https://example.com" --verbose
# Check browser status
openclaw browser status
```
## References
- [OpenClaw Browser Documentation](https://docs.openclaw.ai/tools/browser)
- [Playwright Automation Guide](https://playwright.dev/docs/automation)
- [Web Scraping Best Practices](https://docs.openclaw.ai/automation/web-scraping)
FILE:scripts/browser-automation.js
#!/usr/bin/env node
/**
* Agent Browser Automation Script
* Provides enhanced browser automation for OpenClaw agents
*/
const { exec } = require('child_process');
const { promisify } = require('util');
const execAsync = promisify(exec);
class BrowserAutomation {
  constructor() {
    this.browserCommands = {
      open: 'openclaw browser open',
      snapshot: 'openclaw browser snapshot',
      navigate: 'openclaw browser navigate',
      act: 'openclaw browser act',
      status: 'openclaw browser status'
    };
  }

  /**
   * Execute a browser command
   */
  async executeCommand(command, args = {}) {
    try {
      let cmd = this.browserCommands[command];
      if (!cmd) {
        throw new Error(`Unknown browser command: ${command}`);
      }
      // Add arguments: booleans become bare flags, objects are JSON-encoded
      for (const [key, value] of Object.entries(args)) {
        if (value !== undefined && value !== null) {
          if (typeof value === 'boolean' && value) {
            cmd += ` --${key}`;
          } else if (typeof value === 'string' || typeof value === 'number') {
            cmd += ` --${key} "${value}"`;
          } else if (typeof value === 'object') {
            cmd += ` --${key} '${JSON.stringify(value)}'`;
          }
        }
      }
      console.log(`Executing: ${cmd}`);
      const { stdout, stderr } = await execAsync(cmd);
      if (stderr) {
        console.warn('Warning:', stderr);
      }
      return stdout;
    } catch (error) {
      console.error('Error executing browser command:', error.message);
      throw error;
    }
  }

  /**
   * Open a URL in browser
   */
  async openUrl(url, options = {}) {
    return this.executeCommand('open', {
      url,
      ...options
    });
  }

  /**
   * Take a screenshot of a page
   */
  async takeScreenshot(url, outputPath, options = {}) {
    return this.executeCommand('snapshot', {
      url,
      output: outputPath,
      ...options
    });
  }

  /**
   * Navigate to a URL
   */
  async navigate(url, options = {}) {
    return this.executeCommand('navigate', {
      url,
      ...options
    });
  }

  /**
   * Perform an action on the page
   */
  async performAction(action, options = {}) {
    return this.executeCommand('act', {
      kind: action,
      ...options
    });
  }

  /**
   * Check browser status
   */
  async getStatus() {
    return this.executeCommand('status');
  }

  /**
   * Extract text from a page
   */
  async extractText(url, maxChars = 5000) {
    return this.executeCommand('snapshot', {
      url,
      maxChars,
      mode: 'efficient'
    });
  }

  /**
   * Fill a form
   */
  async fillForm(url, formData, options = {}) {
    return this.executeCommand('act', {
      url,
      kind: 'fill',
      fields: formData,
      ...options
    });
  }

  /**
   * Click an element
   */
  async clickElement(url, selector, options = {}) {
    return this.executeCommand('act', {
      url,
      kind: 'click',
      selector,
      ...options
    });
  }
}

// CLI interface
if (require.main === module) {
  const automation = new BrowserAutomation();
  const args = process.argv.slice(2);
  if (args.length === 0) {
    console.log(`
Agent Browser Automation CLI
Usage:
  node browser-automation.js <command> [options]
Commands:
  open <url>                 Open a URL
  screenshot <url> <output>  Take screenshot
  navigate <url>             Navigate to URL
  status                     Check browser status
  extract <url> [maxChars]   Extract page text
  click <url> <selector>     Click an element
  fill <url> <json>          Fill a form
Examples:
  node browser-automation.js open "https://example.com"
  node browser-automation.js screenshot "https://example.com" screenshot.png
  node browser-automation.js extract "https://news.com" 3000
`);
    process.exit(0);
  }
  const command = args[0];
  switch (command) {
    case 'open':
      automation.openUrl(args[1]).then(console.log).catch(console.error);
      break;
    case 'screenshot':
      automation.takeScreenshot(args[1], args[2]).then(console.log).catch(console.error);
      break;
    case 'navigate':
      automation.navigate(args[1]).then(console.log).catch(console.error);
      break;
    case 'status':
      automation.getStatus().then(console.log).catch(console.error);
      break;
    case 'extract':
      // maxChars is an optional positional argument; default to 5000
      automation.extractText(args[1], Number(args[2]) || 5000).then(console.log).catch(console.error);
      break;
    case 'click':
      automation.clickElement(args[1], args[2]).then(console.log).catch(console.error);
      break;
    case 'fill':
      // The form data is passed as a JSON string
      automation.fillForm(args[1], JSON.parse(args[2])).then(console.log).catch(console.error);
      break;
    default:
      console.error(`Unknown command: ${command}`);
      process.exit(1);
  }
}
module.exports = BrowserAutomation;