---
name: adr-writing
description: "Use when writing or formatting an ADR document using the MADR template, applying Definition of Done (E.C.A.D.R.) criteria, or verifying ADR completeness. Triggers on \"write the ADR\", \"format as MADR\", \"check ADR quality\", \"mark gaps in ADR\". Also triggers when a decision has been extracted and needs to become a document. Does NOT extract decisions from conversations (use adr-decision-extraction) or orchestrate the full extract-confirm-write workflow (use write-adr)."
---
# ADR Writing
## Overview
Generate Architectural Decision Records (ADRs) following the MADR template with systematic completeness checking.
## Quick Reference
```
┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│  SEQUENCE   │ ──▶ │   EXPLORE    │ ──▶ │    FILL     │
│  (get next  │     │  (context,   │     │  (template  │
│   number)   │     │    ADRs)     │     │  sections)  │
└─────────────┘     └──────────────┘     └─────────────┘
       │                                        │
       │                                        ▼
       │                                 ┌─────────────┐
       │                                 │   VERIFY    │
       │                                 │    (DoD     │
       └─────────────────────────────────│  checklist) │
                                         └─────────────┘
```
## When To Use
- Documenting architectural decisions from extracted requirements
- Converting meeting notes or discussions to formal ADRs
- Recording technical choices from PR discussions
- Creating decision records from design documents
## Workflow
### Gates (objective pass conditions)
Advance to the next step only when the **pass condition** holds. These replace “I explored” / “I verified” with checkable artifacts.
| After | Pass condition |
|-------|----------------|
| Step 2 | **Pass:** You have a written list (bullets in the draft preamble, scratch notes, or the ADR body) of **≥1** paths under `docs/adrs/` that you consulted for related or superseded ADRs, **or** you explicitly record that `docs/adrs/` is missing or empty after checking. **And** you list **≥1** repo path for related code, **or** `N/A` with a one-line reason. |
| Step 5 | **Pass:** For each E, C, A, D, R in `references/definition-of-done.md`, the draft either meets that letter’s checklist **or** contains an `[INVESTIGATE: …]` marker scoped to that gap. |
| Step 7 | **Pass:** The ADR file exists at `docs/adrs/NNNN-slugified-title.md`, and reading it back shows that line 1 is `---` and the frontmatter parses as YAML. |
### Step 1: Get Sequence Number
**If a number was pre-assigned** (e.g., when called from `/beagle:write-adr` with parallel writes):
- Use the pre-assigned number directly
- Do NOT call the script: it only scans existing files, so parallel callers that run before any ADR is written would all receive the same number
**If no number was pre-assigned** (standalone use):
```bash
python scripts/next_adr_number.py
```
This outputs the next available ADR number (e.g., `0003`).
For parallel allocation (used by parent commands):
```bash
python scripts/next_adr_number.py --count 3
# Outputs: 0003, 0004, 0005 (one per line)
```
### Step 2: Explore Context
Before writing, gather additional context:
1. **Related code** - Find implementations affected by this decision
2. **Existing ADRs** - Check `docs/adrs/` for related or superseded decisions
3. **Discussion sources** - PRs, issues, or documents referenced in the decision
**Gate:** Meet the Step 2 row in **Gates (objective pass conditions)** before Step 3.
### Step 3: Load Template
Load `references/madr-template.md` for the official MADR structure.
### Step 4: Fill Sections
Populate each section from your decision data:
| Section | Source |
|---------|--------|
| Title | Decision summary (imperative mood) |
| Status | Always `draft` initially |
| Context | Problem statement, constraints |
| Decision Drivers | Prioritized requirements |
| Considered Options | All viable alternatives |
| Decision Outcome | Chosen option with rationale |
| Consequences | Good, bad, neutral impacts |
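The table above maps naturally onto the JSON that `adr-decision-extraction` returns. A minimal sketch, assuming the decision arrives as a dict with that skill's field names; `render_madr` is a hypothetical helper, and every section the data cannot fill gets an `[INVESTIGATE: …]` marker instead of invented text.

```python
from datetime import date


def render_madr(decision: dict, today: date) -> str:
    """Render a draft MADR document from one extracted decision (sketch)."""
    drivers = decision.get("drivers") or []
    options = [decision["chosen_option"], *(decision.get("alternatives_discussed") or [])]

    lines = [
        "---",
        "status: draft",  # Status is always draft initially
        f"date: {today.isoformat()}",
        "---",
        f"# {decision['title']}",
        "## Context and Problem Statement",
        decision["problem"],
        "## Decision Drivers",
    ]
    if drivers:
        lines += [f"* {driver}" for driver in drivers]
    else:
        lines.append("* [INVESTIGATE: Identify decision drivers]")

    lines.append("## Considered Options")
    lines += [f"* {option}" for option in options]
    if len(options) < 2:
        # The C criterion needs at least two genuinely viable options
        lines.append("* [INVESTIGATE: Identify at least one viable alternative]")

    lines += [
        "## Decision Outcome",
        f'Chosen option: "{decision["chosen_option"]}", because '
        "[INVESTIGATE: Link the rationale to the decision drivers]",
        "### Consequences",
        "* [INVESTIGATE: List good, bad, and neutral consequences]",
    ]
    return "\n".join(lines) + "\n"
```

The extraction schema has no rationale or consequences fields, which is why those sections always start as prompts for the writer to resolve.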
### Step 5: Apply Definition of Done
Load `references/definition-of-done.md` and verify E.C.A.D.R. criteria:
- **E**xplicit problem statement
- **C**omprehensive options analysis
- **A**ctionable decision
- **D**ocumented consequences
- **R**eviewable by stakeholders
**Gate:** Meet the Step 5 row in **Gates (objective pass conditions)** before Step 6 (use `[INVESTIGATE: …]` where data is missing).
### Step 6: Mark Gaps
For sections that cannot be filled from available data, insert investigation prompts:
```markdown
* [INVESTIGATE: Review PR #42 discussion for additional drivers]
* [INVESTIGATE: Confirm with security team on compliance requirements]
* [INVESTIGATE: Benchmark performance of Option 2 vs Option 3]
```
These prompts signal incomplete sections for later follow-up.
### Step 7: Write File
**IMPORTANT: Every ADR MUST start with YAML frontmatter.**
The frontmatter block is REQUIRED and must include at minimum:
```yaml
---
status: draft
date: YYYY-MM-DD
---
```
Full frontmatter template:
```yaml
---
status: draft
date: 2024-01-15
decision-makers: [alice, bob]
consulted: []
informed: []
---
```
**Validation:** Before writing the file, verify the content starts with `---` followed by valid YAML frontmatter. If frontmatter is missing, add it before writing.
**Gate:** After writing the file, meet the Step 7 row in **Gates (objective pass conditions)** (file on disk, YAML frontmatter present).
Save to `docs/adrs/NNNN-slugified-title.md`:
```
docs/adrs/0003-use-postgresql-for-user-data.md
docs/adrs/0004-adopt-event-sourcing-pattern.md
docs/adrs/0005-migrate-to-kubernetes.md
```
### Step 8: Verify Frontmatter
After writing, confirm the file:
1. Starts with `---` on the first line
2. Contains `status: draft` (or other valid status)
3. Contains `date: YYYY-MM-DD` with the actual date, not the placeholder
4. Ends frontmatter with `---` before the title
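A stdlib-only sketch of these four checks follows. It uses regular expressions rather than a real YAML parser (a library such as PyYAML would be needed for that), so treat it as a quick guard, not full validation; `frontmatter_problems` is a hypothetical name.

```python
import re
from datetime import date

# '---' on line 1, the YAML lines, then a closing '---' line
FRONTMATTER = re.compile(r"\A---\n(.*?)\n---[ \t]*(?:\n|\Z)", re.DOTALL)
STATUS_LINE = re.compile(
    r"^status: (?:draft|proposed|accepted|rejected|deprecated|superseded by .+)$",
    re.MULTILINE,
)
DATE_LINE = re.compile(r"^date: (\d{4}-\d{2}-\d{2})$", re.MULTILINE)


def frontmatter_problems(text: str) -> list[str]:
    """Return the failed checks; an empty list means the frontmatter looks valid."""
    block = FRONTMATTER.match(text)
    if block is None:
        return ["frontmatter must open with '---' on line 1 and close with '---'"]
    problems = []
    if not STATUS_LINE.search(block.group(1)):
        problems.append("missing or invalid status")
    date_line = DATE_LINE.search(block.group(1))
    if date_line is None:
        problems.append("missing date in YYYY-MM-DD form")
    else:
        try:
            # Rejects impossible dates such as 2024-02-30
            date.fromisoformat(date_line.group(1))
        except ValueError:
            problems.append("date is not a real calendar date")
    return problems
```

A literal `date: YYYY-MM-DD` placeholder fails the date check, which covers the "actual date" requirement.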
## File Naming Convention
Format: `NNNN-slugified-title.md`
| Component | Rule |
|-----------|------|
| `NNNN` | Zero-padded 4-digit sequence number (from `scripts/next_adr_number.py` or pre-assigned) |
| `-` | Separator |
| `slugified-title` | Lowercase, hyphens, no special characters |
| `.md` | Markdown extension |
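One way to apply these rules, sketched in Python: `slugify` and `adr_filename` are hypothetical helpers, not part of `scripts/next_adr_number.py`, and folding accented letters to ASCII is an assumption about what "no special characters" should mean.

```python
import re
import unicodedata


def slugify(title: str) -> str:
    """Lowercase, ASCII-only, words joined by single hyphens."""
    # Fold accented characters to ASCII (e.g. "é" -> "e"), dropping anything else
    ascii_title = unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("ascii")
    # Any run of non-alphanumeric characters becomes one hyphen
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_title.lower())
    return slug.strip("-")


def adr_filename(number: int, title: str) -> str:
    """Combine a sequence number and a title into NNNN-slugified-title.md."""
    return f"{number:04d}-{slugify(title)}.md"
```

For example, `adr_filename(3, "Use PostgreSQL for User Data")` yields `0003-use-postgresql-for-user-data.md`, matching the naming shown in Step 7.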
## Reference Files
- `references/madr-template.md` - Official MADR template structure
- `references/definition-of-done.md` - E.C.A.D.R. quality criteria
## Output Example
```markdown
---
status: draft
date: 2024-01-15
decision-makers: [alice, bob]
---
# Use PostgreSQL for User Data Storage
## Context and Problem Statement
We need a database for user account data...
## Decision Drivers
* Data integrity requirements
* Query flexibility needs
* [INVESTIGATE: Confirm scaling projections with infrastructure team]
## Considered Options
* PostgreSQL
* MongoDB
* CockroachDB
## Decision Outcome
Chosen option: "PostgreSQL", because...
### Consequences
* Good, because ACID compliance ensures data integrity
* Bad, because it requires more upfront schema design
* Neutral, because the team has moderate PostgreSQL experience
```
FILE:references/definition-of-done.md
# Definition of Done: E.C.A.D.R. Criteria
An ADR is complete when it meets all five E.C.A.D.R. criteria.
## E.C.A.D.R. Checklist
### E - Explicit Problem Statement
| Check | Criteria |
|-------|----------|
| [ ] | Context describes a real, specific problem |
| [ ] | Problem is scoped (not too broad, not too narrow) |
| [ ] | Constraints and requirements are stated |
| [ ] | Reader understands WHY a decision is needed |
**Anti-patterns:**
- "We need to choose a database" (too vague)
- Problem buried in decision outcome section
- Missing business or technical context
### C - Comprehensive Options Analysis
| Check | Criteria |
|-------|----------|
| [ ] | At least 2 options considered |
| [ ] | Options are genuinely viable (not strawmen) |
| [ ] | Each option has pros AND cons listed |
| [ ] | "Do nothing" considered if applicable |
**Anti-patterns:**
- Single option presented as foregone conclusion
- Options listed without analysis
- Missing obvious alternatives
### A - Actionable Decision
| Check | Criteria |
|-------|----------|
| [ ] | Chosen option is clearly stated |
| [ ] | Decision is specific enough to implement |
| [ ] | Rationale links to decision drivers |
| [ ] | No ambiguity about what was decided |
**Anti-patterns:**
- "We will use a modern approach" (vague)
- Decision contradicts stated constraints
- Missing implementation guidance
### D - Documented Consequences
| Check | Criteria |
|-------|----------|
| [ ] | Good consequences listed |
| [ ] | Bad consequences listed (honest tradeoffs) |
| [ ] | Operational impacts considered |
| [ ] | Future implications noted |
**Anti-patterns:**
- Only positive consequences (overselling)
- Generic consequences that apply to any option
- Missing security, performance, or cost impacts
### R - Reviewable by Stakeholders
| Check | Criteria |
|-------|----------|
| [ ] | Status is set appropriately |
| [ ] | Decision-makers are identified |
| [ ] | Language is accessible (not jargon-heavy) |
| [ ] | Sufficient context for outsiders to understand |
**Anti-patterns:**
- Missing metadata (date, status, authors)
- Assumes reader context not in document
- Dense technical prose without summaries
## Quality Rubric
| Score | Criteria met | Next step |
|-------|--------------|-----------|
| 5/5 | All five E.C.A.D.R. criteria | Ready for `proposed` |
| 4/5 | Four; one minor gap | Add an `[INVESTIGATE]` prompt for the gap |
| 3/5 | Three; two gaps | Revise before proposing |
| 2/5 | Two; major gaps | Treat as an incomplete draft |
| 1/5 | One; minimal content | Treat as a placeholder only |
## Using [INVESTIGATE] Prompts
When a criterion cannot be met from available information, insert an investigation prompt:
```markdown
## Decision Drivers
* Performance under 100ms response time
* [INVESTIGATE: Confirm budget constraints with finance team]
* Compatibility with existing Python stack
```
These prompts:
1. Signal incomplete sections
2. Document what information is missing
3. Enable async follow-up
4. Prevent premature status advancement
## Status Progression
```
draft ──▶ proposed ──▶ accepted
  │          │
  │          ▼
  │       rejected
  │
  └──▶ [fix gaps, remove INVESTIGATE prompts]
```
Do not advance to `proposed` until all `[INVESTIGATE]` prompts are resolved.
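The diagram can be encoded as a transition table so a tool can refuse premature promotion. This is a sketch under two assumptions: only the transitions drawn in the diagram are modeled (`deprecated` and `superseded` are left out), and any `[INVESTIGATE` substring counts as an open prompt. `can_advance` is a hypothetical name.

```python
# Transitions drawn in the status diagram; deprecated/superseded are omitted
# because the diagram does not show where they come from.
ALLOWED = {
    "draft": {"proposed"},
    "proposed": {"accepted", "rejected"},
}


def can_advance(current: str, target: str, adr_text: str) -> bool:
    """Return True if the status change is allowed for this ADR text."""
    if target not in ALLOWED.get(current, set()):
        return False
    # draft -> proposed is blocked while any investigation prompt remains
    if target == "proposed" and "[INVESTIGATE" in adr_text:
        return False
    return True
```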
## Review Checklist
Final pass before marking `proposed`:
- [ ] No `[INVESTIGATE]` prompts remain
- [ ] All E.C.A.D.R. criteria checked
- [ ] File named correctly (`NNNN-slugified-title.md`)
- [ ] Frontmatter complete (status, date, decision-makers)
- [ ] Links to related ADRs if superseding/related
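The first checklist item is easy to automate. This sketch assumes prompts use the `[INVESTIGATE: …]` form shown earlier and never contain a literal `]`; `open_prompts` is a hypothetical helper that reports each prompt with its line number.

```python
import re

# Matches "[INVESTIGATE: ...]" as well as a bare "[INVESTIGATE]"
PROMPT = re.compile(r"\[INVESTIGATE(?::\s*([^\]]*))?\]")


def open_prompts(adr_text: str) -> list[tuple[int, str]]:
    """Return (line_number, prompt_text) for every investigation prompt left."""
    found = []
    for lineno, line in enumerate(adr_text.splitlines(), start=1):
        for match in PROMPT.finditer(line):
            found.append((lineno, (match.group(1) or "").strip()))
    return found
```

An empty result satisfies the first checklist item; anything else lists exactly where follow-up is still needed.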
FILE:references/madr-template.md
# MADR Template
> Markdown Any Decision Records (MADR) - https://adr.github.io/madr/
## Template Structure
```markdown
---
status: {draft | proposed | accepted | rejected | deprecated | superseded by [ADR-NNNN](NNNN-title.md)}
date: YYYY-MM-DD
decision-makers: [list of involved people]
consulted: [list of people whose opinions are sought]
informed: [list of people who are kept up-to-date]
---
# {Title: Short imperative statement of decision}
## Context and Problem Statement
{Describe the context and problem statement, e.g., in free form using two to three sentences or in the form of an illustrative story. You may want to articulate the problem in the form of a question.}
## Decision Drivers
* {decision driver 1, e.g., a force, facing concern, ...}
* {decision driver 2, e.g., a force, facing concern, ...}
* ...
## Considered Options
* {title of option 1}
* {title of option 2}
* {title of option 3}
* ...
## Decision Outcome
Chosen option: "{title of option 1}", because {justification. e.g., only option, which meets k.o. criterion decision driver | which resolves force {force} | ... | comes out best (see below)}.
### Consequences
* Good, because {positive consequence, e.g., improvement of one or more desired qualities, ...}
* Bad, because {negative consequence, e.g., compromising one or more desired qualities, ...}
* Neutral, because {neutral consequence, neither positive nor negative}
### Confirmation
{Describe how the implementation of/compliance with the ADR is confirmed. E.g., by a review or an ArchUnit test. Although we classify this element as optional, it is recommended to include it.}
## Pros and Cons of the Options
### {title of option 1}
{example | description | pointer to more information | ...}
* Good, because {argument a}
* Good, because {argument b}
* Neutral, because {argument c}
* Bad, because {argument d}
* ...
### {title of option 2}
{example | description | pointer to more information | ...}
* Good, because {argument a}
* Good, because {argument b}
* Neutral, because {argument c}
* Bad, because {argument d}
* ...
### {title of option 3}
{example | description | pointer to more information | ...}
* Good, because {argument a}
* Good, because {argument b}
* Neutral, because {argument c}
* Bad, because {argument d}
* ...
## More Information
{You might want to provide additional evidence/confidence for the decision outcome here and/or document the team agreement on the decision and/or define when this decision should be re-considered and/or links to other decisions and resources.}
```
## Section Guide
### Status Values
| Status | Meaning |
|--------|---------|
| `draft` | Initial creation, not yet reviewed |
| `proposed` | Ready for team review |
| `accepted` | Approved and active |
| `rejected` | Considered but not adopted |
| `deprecated` | No longer recommended |
| `superseded by [ADR-NNNN]` | Replaced by newer decision |
### Title
- Use imperative mood ("Use X", "Adopt Y", "Migrate to Z")
- Keep concise (5-10 words)
- Start with verb
### Context and Problem Statement
- 2-4 sentences describing the situation
- Can be phrased as a question
- Include relevant constraints
### Decision Drivers
- List forces influencing the decision
- Prioritize by importance
- Include both technical and business drivers
### Considered Options
- Minimum 2 options (including the chosen one)
- Include "do nothing" if viable
- Brief titles, details in Pros/Cons section
### Decision Outcome
- State chosen option clearly
- Explain why it was chosen
- Reference decision drivers it satisfies
### Consequences
- Categorize as Good/Bad/Neutral
- Be honest about tradeoffs
- Include operational impacts
## Optional Sections
These sections enhance completeness but may be omitted for simpler decisions:
- **Confirmation** - How to verify compliance
- **Pros and Cons of the Options** - Detailed option analysis
- **More Information** - Links, references, caveats
## Minimal Template
For quick decisions, use this shortened form:
```markdown
---
status: draft
date: YYYY-MM-DD
---
# {Title}
## Context and Problem Statement
{description}
## Decision Drivers
* {driver 1}
* {driver 2}
## Decision Outcome
Chosen option: "{option}", because {reason}.
### Consequences
* Good, because {positive}
* Bad, because {negative}
```
FILE:scripts/next_adr_number.py
#!/usr/bin/env python3
"""Get the next ADR sequence number.

Scans docs/adrs/ for existing ADRs and returns the next available number.

Usage:
    python scripts/next_adr_number.py
    python scripts/next_adr_number.py --dir /path/to/docs/adrs
    python scripts/next_adr_number.py --list
    python scripts/next_adr_number.py --count 3  # Pre-allocate 3 numbers for parallel writes
"""

import argparse
import re
import sys
from pathlib import Path


def find_adr_directory() -> Path:
    """Find the ADR directory in the cwd, then in the enclosing git root."""
    candidates = [
        Path("docs/adrs"),
        Path("docs/adr"),
        Path("adr"),
        Path("adrs"),
        Path("doc/adr"),
        Path("doc/adrs"),
    ]

    # Search from current directory
    cwd = Path.cwd()
    for candidate in candidates:
        if (cwd / candidate).is_dir():
            return cwd / candidate

    # Walk up to the git root; outside a repository, fall back to the default
    # instead of scanning the filesystem root
    git_root = cwd
    while not (git_root / ".git").exists():
        if git_root == git_root.parent:
            return cwd / "docs/adrs"
        git_root = git_root.parent

    for candidate in candidates:
        if (git_root / candidate).is_dir():
            return git_root / candidate

    return cwd / "docs/adrs"


def get_existing_numbers(adr_dir: Path) -> list[int]:
    """Extract ADR numbers from filenames in the directory."""
    pattern = re.compile(r"^(\d{4})-.*\.md$")
    numbers = []
    if not adr_dir.exists():
        return numbers
    for file in adr_dir.iterdir():
        if file.is_file():
            match = pattern.match(file.name)
            if match:
                numbers.append(int(match.group(1)))
    return sorted(numbers)


def next_number(existing: list[int]) -> int:
    """Calculate the next ADR number."""
    if not existing:
        return 1
    return max(existing) + 1


def format_number(num: int) -> str:
    """Format number as zero-padded 4-digit string."""
    return f"{num:04d}"


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Get the next ADR sequence number"
    )
    parser.add_argument(
        "--dir",
        type=Path,
        help="ADR directory (auto-detected if not specified)",
    )
    parser.add_argument(
        "--list",
        action="store_true",
        help="List existing ADR numbers",
    )
    parser.add_argument(
        "--count",
        type=int,
        default=1,
        help="Number of sequential ADR numbers to allocate (for parallel writes)",
    )
    args = parser.parse_args()
    if args.count < 1:
        # Zero or negative counts would otherwise print an empty line
        parser.error("--count must be at least 1")

    adr_dir = args.dir or find_adr_directory()
    existing = get_existing_numbers(adr_dir)

    if args.list:
        if existing:
            print(f"ADR directory: {adr_dir}")
            print(f"Existing ADRs: {[format_number(n) for n in existing]}")
        else:
            print(f"No ADRs found in {adr_dir}")
        return 0

    next_num = next_number(existing)
    if args.count == 1:
        print(format_number(next_num))
    else:
        # Output multiple numbers, one per line, for parallel allocation
        allocated = [format_number(next_num + i) for i in range(args.count)]
        print("\n".join(allocated))
    return 0


if __name__ == "__main__":
    sys.exit(main())
---
name: adr-decision-extraction
description: "Use when you need to mine a conversation, session transcript, or design discussion for architectural decisions before writing ADRs. Identifies problem-solution pairs, trade-off debates, technology choices, and explicit \"[ADR]\" tags. Triggers on \"what decisions did we make\", \"extract decisions from this chat\", \"find the choices in our discussion\", or \"summarize architectural decisions\". Also useful after long planning sessions to capture decisions that were made implicitly. Does NOT write ADR documents \u2014 use adr-writing or write-adr for that."
---
# ADR Decision Extraction
Extract architectural decisions from conversation context for ADR generation.
## Detection Signals
| Signal Type | Examples |
|-------------|----------|
| Explicit markers | `[ADR]`, "decided:", "the decision is" |
| Choice patterns | "let's go with X", "we'll use Y", "choosing Z" |
| Trade-off discussions | "X vs Y", "pros/cons", "considering alternatives" |
| Problem-solution pairs | "the problem is... so we'll..." |
## Extraction Rules
### Explicit Tags (Guaranteed Inclusion)
Text marked with `[ADR]` is always extracted:
```
[ADR] Using PostgreSQL for user data storage due to ACID requirements
```
These receive `confidence: "high"` automatically.
### AI-Detected Decisions
Patterns detected without explicit tags require confidence assessment:
| Confidence | Criteria |
|------------|----------|
| **high** | Clear statement of choice with rationale |
| **medium** | Implied decision from action taken |
| **low** | Contextual inference, may need verification |
## Output Format
```json
{
"decisions": [
{
"title": "Use PostgreSQL for user data",
"problem": "Need ACID transactions for financial records",
"chosen_option": "PostgreSQL",
"alternatives_discussed": ["MongoDB", "SQLite"],
"drivers": ["ACID compliance", "team familiarity"],
"confidence": "high",
"source_context": "Discussion about database selection in planning phase"
}
]
}
```
### Field Definitions
| Field | Required | Description |
|-------|----------|-------------|
| `title` | Yes | Concise decision summary |
| `problem` | Yes | Problem or context driving the decision |
| `chosen_option` | Yes | The selected solution or approach |
| `alternatives_discussed` | No | Other options mentioned (empty array if none) |
| `drivers` | No | Factors influencing the decision |
| `confidence` | Yes | `high`, `medium`, or `low` |
| `source_context` | No | Brief description of where decision appeared |
## Extraction Workflow
1. **Scan for explicit markers** - Find all `[ADR]` tagged content
2. **Identify choice patterns** - Look for decision language
3. **Extract trade-off discussions** - Capture alternatives and reasoning
4. **Assess confidence** - Rate each non-explicit decision
5. **Capture context** - Note surrounding discussion for ADR writer
## Hard gates
Run these **in order** after the workflow above and **before** returning output. Each step has an objective pass condition.
1. **Explicit `[ADR]` inventory** — Capture every `[ADR]` segment from the full source (verbatim in working notes). **Pass:** a second pass over the same source adds no new `[ADR]` blocks.
2. **De-duplicate** — Merge or drop inferred rows that repeat an explicit `[ADR]` decision (see [Merge Related Decisions](#merge-related-decisions)). **Pass:** at most one row per distinct decision.
3. **Schema validity** — Serialized JSON matches [Output Format](#output-format) and [Field Definitions](#field-definitions). **Pass:** parse succeeds; every `decisions[]` item has non-empty `title`, `problem`, `chosen_option`; `confidence` ∈ {`high`,`medium`,`low`}; `alternatives_discussed` is an array (use `[]` if none); other optional fields per table.
4. **Low-confidence audit** — For any `confidence: "low"`, `source_context` states what was missing, weak, or contradictory. **Pass:** a reader can see why the rating is not higher.
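Gates 3 and 4 can be checked mechanically before returning output. The sketch below (`gate_errors` is a hypothetical name) follows the Field Definitions table plus gate 3's stricter rule that `alternatives_discussed` is always present as an array. For low-confidence rows it can only confirm that `source_context` is non-empty, not that it actually explains the rating.

```python
import json

REQUIRED_TEXT = ("title", "problem", "chosen_option")
CONFIDENCE_LEVELS = ("high", "medium", "low")


def gate_errors(raw: str) -> list[str]:
    """Check hard gates 3 and 4; an empty list means the output passes."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(data, dict) or not isinstance(data.get("decisions"), list):
        return ["top level must be an object with a 'decisions' array"]

    errors = []
    for i, item in enumerate(data["decisions"]):
        where = f"decisions[{i}]"
        if not isinstance(item, dict):
            errors.append(f"{where} is not an object")
            continue
        for field in REQUIRED_TEXT:
            value = item.get(field)
            if not isinstance(value, str) or not value.strip():
                errors.append(f"{where}.{field} must be a non-empty string")
        confidence = item.get("confidence")
        if confidence not in CONFIDENCE_LEVELS:
            errors.append(f"{where}.confidence must be high, medium, or low")
        if not isinstance(item.get("alternatives_discussed"), list):
            errors.append(f"{where}.alternatives_discussed must be an array")
        # Gate 4: a low rating must say what was missing or contradictory
        if confidence == "low" and not str(item.get("source_context", "")).strip():
            errors.append(f"{where}: low confidence needs a source_context explaining why")
    return errors
```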
## Pattern Examples
### High Confidence
```
"We decided to use Redis for caching because of its sub-millisecond latency
and native TTL support. Memcached was considered but lacks persistence."
```
Extracts:
- Title: Use Redis for caching
- Problem: Need fast caching with TTL
- Chosen: Redis
- Alternatives: Memcached
- Drivers: sub-millisecond latency, native TTL, persistence
- Confidence: high
### Medium Confidence
```
"Let's go with TypeScript for the frontend since we're already using it
in the backend."
```
Extracts:
- Title: Use TypeScript for frontend
- Problem: Language choice for frontend
- Chosen: TypeScript
- Alternatives: (none stated)
- Drivers: consistency with backend
- Confidence: medium
### Low Confidence
```
"The API seems to be working well with REST endpoints."
```
Extracts:
- Title: REST API architecture
- Problem: API design approach
- Chosen: REST
- Alternatives: (none stated)
- Drivers: (none stated)
- Confidence: low
## Best Practices
### Context Capture
Always capture sufficient context for the ADR writer:
- What was the discussion about?
- Who was involved (if known)?
- What prompted the decision?
### Merge Related Decisions
If multiple statements relate to the same decision, consolidate them:
- Combine alternatives from different mentions
- Aggregate drivers
- Use highest confidence level
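A sketch of these three merge rules, assuming both rows already match the output schema. `merge` is a hypothetical helper; taking the text fields (`title`, `problem`, `source_context`) from the higher-confidence row is an assumption, since the rules only cover alternatives, drivers, and confidence.

```python
RANK = {"low": 0, "medium": 1, "high": 2}


def merge(a: dict, b: dict) -> dict:
    """Merge two extracted rows that describe the same decision."""

    def union(key: str) -> list:
        # Concatenate both lists, dropping duplicates but keeping first-seen order
        return list(dict.fromkeys([*(a.get(key) or []), *(b.get(key) or [])]))

    # The higher-confidence row wins for text fields and for confidence itself
    base, other = (a, b) if RANK[a["confidence"]] >= RANK[b["confidence"]] else (b, a)
    merged = {**other, **base}
    merged["alternatives_discussed"] = union("alternatives_discussed")
    merged["drivers"] = union("drivers")
    return merged
```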
### Flag Ambiguity
When decisions are unclear or contradictory:
- Note the ambiguity in `source_context`
- Set confidence to `low`
- Include all interpretations if multiple exist
## When to Use This Skill
- Analyzing session transcripts for ADR generation
- Reviewing conversation history for documentation
- Extracting decisions from design discussions
- Preparing input for ADR writing tools
---
name: 12-factor-apps
description: Perform 12-Factor App compliance analysis on any codebase. Use when evaluating application architecture, auditing SaaS applications, or reviewing cloud-native applications against the original 12-Factor methodology.
---
# 12-Factor App Compliance Analysis
> Reference: [The Twelve-Factor App](https://12factor.net)
## Overview
The 12-Factor App methodology is a set of best practices for building Software-as-a-Service applications that are:
- Portable across execution environments
- Scalable without architectural changes
- Suitable for continuous deployment
- Maintainable with minimal friction
## Input Parameters
| Parameter | Description | Required |
|-----------|-------------|----------|
| `codebase_path` | Root path of the codebase to analyze | Required |
## Analysis Framework
### Factor I: Codebase
**Principle:** One codebase tracked in revision control, many deploys.
**Search Patterns:**
```bash
# Check for version control
ls -la .git 2>/dev/null || ls -la .hg 2>/dev/null
# Check for multiple apps sharing codebase
find . -name "package.json" -o -name "pyproject.toml" -o -name "setup.py" | head -20
# Check for environment-specific code branches
grep -r "if.*production\|if.*development\|if.*staging" --include="*.py" --include="*.js" --include="*.ts"
```
**File Patterns:** `.git/`, `package.json`, `pyproject.toml`, deployment configs
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Single Git repo, same codebase for all environments, no env-specific code branches |
| **Partial** | Single repo but some environment-specific code paths |
| **Weak** | Multiple repos for same app or significant code duplication across environments |
**Anti-patterns:**
- Multiple Git repositories for the same application
- Environment-specific code branches (`if production: ...`)
- Different source files for dev vs prod
- Shared code not extracted to libraries
---
### Factor II: Dependencies
**Principle:** Explicitly declare and isolate dependencies.
**Search Patterns:**
```bash
# Python dependency files
find . -name "requirements.txt" -o -name "pyproject.toml" -o -name "setup.py" -o -name "Pipfile" -o -name "uv.lock"
# JavaScript/TypeScript dependency files
find . -name "package.json" -o -name "package-lock.json" -o -name "yarn.lock" -o -name "pnpm-lock.yaml"
# Check for system tool assumptions
grep -r "subprocess.*curl\|subprocess.*wget\|os.system.*ffmpeg\|shutil.which" --include="*.py"
grep -r "exec.*curl\|child_process.*curl" --include="*.js" --include="*.ts"
# Docker/container isolation
find . -name "Dockerfile" -o -name "docker-compose*.yml"
```
**File Patterns:** `**/requirements*.txt`, `**/package.json`, `**/*.lock`, `**/Dockerfile`
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Lock files present, dependency isolation (venv/Docker), no implicit system tools |
| **Partial** | Dependencies declared but no lock files or isolation |
| **Weak** | Dependencies in documentation only, relies on system-installed packages |
**Anti-patterns:**
- Missing lock files (non-deterministic builds)
- Assuming system tools (curl, ImageMagick, ffmpeg) are available
- Different dependency managers in dev vs production
- No virtual environment or container isolation
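To turn the `find` output into a finding, pair each manifest with a lock file in the same directory. The manifest-to-lock mapping below is an assumption (it adds `poetry.lock` and `Pipfile.lock`, and ignores pinned `requirements.txt` files); `manifests_without_locks` is a hypothetical helper.

```python
from pathlib import PurePosixPath

# Manifest -> lock files that would pin it (assumed mapping; extend as needed)
LOCKS = {
    "package.json": {"package-lock.json", "yarn.lock", "pnpm-lock.yaml"},
    "pyproject.toml": {"uv.lock", "poetry.lock"},
    "Pipfile": {"Pipfile.lock"},
}


def manifests_without_locks(paths: list[str]) -> list[str]:
    """Return manifests that have no matching lock file in the same directory."""
    present = {PurePosixPath(p) for p in paths}
    missing = []
    for path in sorted(present):
        locks = LOCKS.get(path.name)
        if locks and not any(path.parent / lock in present for lock in locks):
            missing.append(str(path))
    return missing
```

Feeding it raw `find` output, for example `manifests_without_locks(output.splitlines())`, works because `PurePosixPath` drops the leading `./`.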
---
### Factor III: Config
**Principle:** Store config in the environment.
**Search Patterns:**
```bash
# Environment variable usage
grep -r "os.environ\|os.getenv\|process.env\|ENV\[" --include="*.py" --include="*.js" --include="*.ts" --include="*.rb"
# Hardcoded credentials (anti-pattern)
grep -r "password.*=.*['\"]" --include="*.py" --include="*.js" --include="*.ts" | grep -v "test\|spec\|example"
grep -r "api_key.*=.*['\"]" --include="*.py" --include="*.js" --include="*.ts" | grep -v "test\|spec\|example"
grep -r "secret.*=.*['\"]" --include="*.py" --include="*.js" --include="*.ts" | grep -v "test\|spec\|example"
# Environment-specific config files (anti-pattern)
find . -name "config.dev.*" -o -name "config.prod.*" -o -name "settings.development.*" -o -name "settings.production.*"
# Database URLs in code
grep -r "postgresql://\|mysql://\|mongodb://\|redis://" --include="*.py" --include="*.js" --include="*.ts" | grep -v ".env\|test\|example"
```
**File Patterns:** `**/.env*`, `**/config/*.py`, `**/settings.py`, environment files
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | All config via environment variables, no hardcoded secrets, could open-source without leaks |
| **Partial** | Most config externalized but some hardcoded defaults |
| **Weak** | Hardcoded credentials, environment-specific config files |
**Anti-patterns:**
- Hardcoded database URLs, API keys, passwords in source
- Config files like `config/production.yml` vs `config/development.yml`
- Environment grouping (`if ENV == 'production': ...`)
- Secrets committed to version control
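For contrast with the anti-patterns, a Strong-level pattern reads every setting from the environment and fails fast when one is missing. A minimal sketch: `DATABASE_URL` matches the variable searched for in Factor IV, while `API_KEY`, `DEBUG`, and `load_config` are hypothetical names.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    database_url: str
    api_key: str
    debug: bool


def load_config(env: Mapping[str, str] = os.environ) -> Config:
    """Build config from environment variables only; fail fast on missing values."""
    missing = [name for name in ("DATABASE_URL", "API_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required environment variables: {', '.join(missing)}")
    return Config(
        database_url=env["DATABASE_URL"],
        api_key=env["API_KEY"],
        # Optional flag with a safe default, and no per-environment branches
        debug=env.get("DEBUG", "false").lower() in {"1", "true", "yes"},
    )
```

Passing a plain dict in tests, instead of mutating `os.environ`, keeps the loader free of environment-specific branches.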
---
### Factor IV: Backing Services
**Principle:** Treat backing services as attached resources.
**Search Patterns:**
```bash
# Database connection via config
grep -r "DATABASE_URL\|DB_HOST\|REDIS_URL\|CACHE_URL" --include="*.py" --include="*.js" --include="*.ts"
# Service initialization
grep -r "create_engine\|MongoClient\|Redis\|Celery\|boto3" --include="*.py"
grep -r "createPool\|createClient\|new Redis\|S3Client" --include="*.js" --include="*.ts"
# Hardcoded service locations (anti-pattern)
grep -r "localhost:5432\|localhost:6379\|localhost:27017\|127.0.0.1" --include="*.py" --include="*.js" --include="*.ts" | grep -v "test\|spec\|example\|default"
```
**File Patterns:** `**/database/*.py`, `**/services/*.py`, `**/db.py`, connection configurations
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | All services via URL/connection string in config, swappable without code changes |
| **Partial** | Most services configurable but some hardcoded defaults |
| **Weak** | Hardcoded service locations, different code paths per environment |
**Anti-patterns:**
- Hardcoded `localhost` for services in production code
- Conditional logic for local vs cloud services (`if USE_S3: ... else: local_storage`)
- Service-specific code paths based on environment
- Different drivers for dev vs prod
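When every backing service is named by a URL in the environment, swapping a local database for a managed one is a config change, not a code change. A sketch using only `urllib.parse.urlsplit`; treating every variable ending in `_URL` as a backing service is an assumption, and `attached_resources` is a hypothetical name. Passwords are deliberately left out of the result.

```python
from collections.abc import Mapping
from urllib.parse import urlsplit


def attached_resources(env: Mapping[str, str]) -> dict[str, dict]:
    """Describe every backing service named by a *_URL environment variable."""
    resources = {}
    for name, url in env.items():
        if not name.endswith("_URL"):
            continue
        parts = urlsplit(url)
        resources[name] = {
            "scheme": parts.scheme,
            "host": parts.hostname,
            "port": parts.port,  # None when the URL omits it
            "path": parts.path.lstrip("/") or None,
        }
    return resources
```

The same function handles `localhost:5432` and a managed host identically, which is the property the Strong level asks for.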
---
### Factor V: Build, Release, Run
**Principle:** Strictly separate build and run stages.
**Search Patterns:**
```bash
# Build/deploy configuration
find . -name "Dockerfile" -o -name "Makefile" -o -name "build.sh" -o -name "deploy.sh"
find . -path "./.github/workflows/*.yml" -o -path "./.github/workflows/*.yaml" -o -name ".gitlab-ci.yml" -o -name "Jenkinsfile"
# Build scripts in package.json
grep -A5 '"scripts"' package.json 2>/dev/null | grep -E "build|start|deploy"
# Check for runtime compilation (anti-pattern)
grep -r "compile\|transpile\|webpack" --include="*.py" | grep -v "test\|build"
```
**File Patterns:** `**/Dockerfile`, `**/Makefile`, `**/.github/workflows/**`, CI/CD configs
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Immutable releases, clear build/release/run stages, unique release IDs |
| **Partial** | Build and run separated but release not immutable |
| **Weak** | Runtime code modifications, asset compilation at startup |
**Anti-patterns:**
- Runtime code modifications
- Asset compilation during application startup
- Configuration baked into build artifacts
- No release versioning
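A sketch of the Strong level's "immutable releases, unique release IDs": a release combines a build with config, and its ID derives from both. Hashing the inputs is one choice among several (timestamps or incrementing numbers also work); `Release` and `make_release` are hypothetical names, and config values only feed a one-way hash.

```python
import hashlib
import json
from dataclasses import dataclass
from types import MappingProxyType


@dataclass(frozen=True)
class Release:
    build_id: str
    config: MappingProxyType  # read-only view; the build artifact itself holds no config
    release_id: str


def make_release(build_id: str, config: dict[str, str]) -> Release:
    """Combine a build with config into an immutable, identifiable release."""
    # Same build + same config -> same ID; any config change -> a new ID
    payload = json.dumps({"build": build_id, "config": config}, sort_keys=True)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]
    return Release(build_id, MappingProxyType(dict(config)), f"{build_id}-{digest}")
```

Changing anything means creating a new release rather than editing an existing one; both the frozen dataclass and the read-only mapping reject in-place edits.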
---
### Factor VI: Processes
**Principle:** Execute the app as one or more stateless processes.
**Search Patterns:**
```bash
# Session storage patterns
grep -r "session\|Session" --include="*.py" --include="*.js" --include="*.ts" | head -20
# In-process state (anti-pattern)
grep -r "global.*cache\|process_local\|instance_cache" --include="*.py"
grep -r "global\..*=\|module\.exports\.cache" --include="*.js" --include="*.ts"
# External session stores (good pattern)
grep -r "redis.*session\|memcached.*session\|session.*redis" --include="*.py" --include="*.js" --include="*.ts"
# Sticky session configuration (anti-pattern)
grep -r "sticky.*session\|session.*affinity" --include="*.yml" --include="*.yaml" --include="*.json"
```
**File Patterns:** `**/middleware/*.py`, `**/session/*.py`, server configurations
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Stateless processes, all state in external datastores (Redis, DB) |
| **Partial** | Mostly stateless but some in-process caching |
| **Weak** | Sticky sessions, in-process session storage, shared memory state |
**Anti-patterns:**
- In-process session storage (`user_sessions = {}`)
- Sticky sessions or session affinity
- File-based caching between requests
- Global mutable state shared across requests
---
### Factor VII: Port Binding
**Principle:** Export services via port binding.
**Search Patterns:**
```bash
# Self-contained port binding
grep -r "app.run\|server.listen\|serve\|uvicorn" --include="*.py"
grep -r "app.listen\|server.listen\|createServer" --include="*.js" --include="*.ts"
# PORT environment variable
grep -r "PORT\|port" --include="*.py" --include="*.js" --include="*.ts" | grep -i "environ\|process.env"
# Webserver as dependency
grep -r "uvicorn\|gunicorn\|flask\|fastapi\|express\|koa\|hapi" package.json pyproject.toml requirements.txt 2>/dev/null
```
**File Patterns:** `**/main.py`, `**/server.py`, `**/app.py`, `**/index.js`
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Self-contained app binds to PORT, webserver is a dependency |
| **Partial** | Port binding but not configurable via environment |
| **Weak** | Relies on a webserver injected at runtime (e.g., Apache modules, Tomcat) instead of exporting HTTP itself |
**Anti-patterns:**
- Relying on Apache/Nginx/Tomcat to inject webserver functionality
- Hardcoded port numbers
- No PORT environment variable support
- CGI scripts or server modules
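Here is a port-binding sketch in TypeScript. It assumes the platform supplies `PORT` in the environment. `resolvePort` and its 3000 fallback are illustrative choices, not a framework API.

```typescript
// Reads the port from an environment map (pass process.env in Node).
// The fallback is a local-development convenience; platforms normally set PORT.
function resolvePort(env: Record<string, string | undefined>, fallback = 3000): number {
  const raw = env.PORT;
  if (raw === undefined || raw.trim() === '') return fallback;
  const port = Number(raw);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid PORT value: ${JSON.stringify(raw)}`);
  }
  return port;
}

// Usage in Node (not executed here): the app binds the port itself.
// http.createServer(handler).listen(resolvePort(process.env));
```

Rejecting a malformed `PORT` early makes a misconfigured deployment fail at startup, not later when the first request arrives.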
---
### Factor VIII: Concurrency
**Principle:** Scale out via the process model.
**Search Patterns:**
```bash
# Process definitions
find . -name "Procfile" -o -name "process.yml" -o -name ".foreman"
# Multiple entry points
find . -name "worker.py" -o -name "scheduler.py" -o -name "web.py"
# Background job systems
grep -r "celery\|rq\|sidekiq\|bull\|agenda" --include="*.py" --include="*.js" --include="*.ts"
grep -r "Celery\|Worker\|BackgroundJob" --include="*.py" --include="*.js" --include="*.ts"
```
**File Patterns:** `**/Procfile`, `**/worker.py`, `**/scheduler.py`, queue configurations
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Explicit process types (web, worker, scheduler), horizontal scaling |
| **Partial** | Multiple process types but not easily scalable |
| **Weak** | Single monolithic process, no separation of concerns |
**Anti-patterns:**
- Single process handling all workloads
- Hard-coded worker counts in code
- No separation between web and background processes
- Vertical scaling only (bigger server, not more processes)
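The following TypeScript sketch shows explicit process types within a single codebase. The process type names, the `startProcess` dispatcher, and the Procfile lines in the comment are hypothetical. The point is that each type runs as a separate, independently scalable process, not as one process that does everything.

```typescript
// Hypothetical process types; a Procfile (or equivalent) declares one command per type:
//   web: node dist/main.js web
//   worker: node dist/main.js worker
//   scheduler: node dist/main.js scheduler
type ProcessType = 'web' | 'worker' | 'scheduler';

// Placeholder entry points; real ones would start a server or a queue consumer loop.
const entryPoints: Record<ProcessType, () => string> = {
  web: () => 'serving HTTP',
  worker: () => 'consuming jobs from the queue',
  scheduler: () => 'enqueuing periodic jobs',
};

function isProcessType(value: string): value is ProcessType {
  return Object.prototype.hasOwnProperty.call(entryPoints, value);
}

// One codebase, many process types: the platform scales each type independently.
function startProcess(type: string): string {
  if (!isProcessType(type)) {
    const known = Object.keys(entryPoints).join(', ');
    throw new Error(`Unknown process type "${type}" (expected one of: ${known})`);
  }
  return entryPoints[type]();
}
```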
---
### Factor IX: Disposability
**Principle:** Maximize robustness with fast startup and graceful shutdown.
**Search Patterns:**
```bash
# Signal handlers
grep -r "signal.signal\|SIGTERM\|SIGINT\|atexit" --include="*.py"
grep -r "process.on.*SIGTERM\|process.on.*SIGINT" --include="*.js" --include="*.ts"
# Graceful shutdown
grep -r "graceful.*shutdown\|shutdown_handler\|cleanup" --include="*.py" --include="*.js" --include="*.ts"
# Startup time
grep -r "startup\|initialize\|bootstrap" --include="*.py" --include="*.js" --include="*.ts" | head -20
```
**File Patterns:** `**/main.py`, `**/server.py`, lifecycle management code
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Fast startup (<10s), SIGTERM handling, graceful shutdown, jobs returnable to queue |
| **Partial** | Graceful shutdown but slow startup |
| **Weak** | No signal handling, jobs lost on process death, slow startup |
**Anti-patterns:**
- No SIGTERM/SIGINT handlers
- Slow startup (>30 seconds)
- Jobs lost if process crashes
- No cleanup on shutdown
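One way to meet the "jobs returnable to queue" criterion is a worker that tracks in-flight jobs and hands them back on SIGTERM. This TypeScript sketch assumes a hypothetical `JobQueue` with a `requeue` method. Real brokers offer equivalents, such as negative acknowledgements or visibility timeouts, and these differ by system.

```typescript
interface Job {
  id: string;
  payload: string;
}

// Hypothetical queue abstraction (Redis, SQS, RabbitMQ, ... in practice).
interface JobQueue {
  requeue(job: Job): void;
}

class JobWorker {
  private readonly queue: JobQueue;
  private readonly inFlight = new Map<string, Job>();
  private shuttingDown = false;

  constructor(queue: JobQueue) {
    this.queue = queue;
  }

  // Refuses new work once shutdown has started.
  accept(job: Job): boolean {
    if (this.shuttingDown) return false;
    this.inFlight.set(job.id, job);
    return true;
  }

  complete(jobId: string): void {
    this.inFlight.delete(jobId);
  }

  // Intended to run from a SIGTERM handler: stop intake and hand
  // unfinished jobs back to the queue so another process can pick them up.
  shutdown(): number {
    this.shuttingDown = true;
    const unfinished = Array.from(this.inFlight.values());
    for (const job of unfinished) this.queue.requeue(job);
    this.inFlight.clear();
    return unfinished.length;
  }
}

// Wiring in Node (not executed here):
// process.on('SIGTERM', () => { worker.shutdown(); process.exit(0); });
```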
---
### Factor X: Dev/Prod Parity
**Principle:** Keep development, staging, and production as similar as possible.
**Search Patterns:**
```bash
# Different services per environment (anti-pattern)
grep -r "if.*development.*sqlite\|if.*production.*postgres" --include="*.py" --include="*.js" --include="*.ts"
grep -r "development.*SQLite\|production.*PostgreSQL" --include="*.py" --include="*.js" --include="*.ts"
# Docker for parity
find . -name "docker-compose*.yml" -o -name "Dockerfile"
# Environment-specific backends
grep -r "USE_LOCAL_\|LOCAL_STORAGE\|MOCK_" --include="*.py" --include="*.js" --include="*.ts"
```
**File Patterns:** `**/docker-compose*.yml`, environment configurations
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Same services everywhere (PostgreSQL in dev and prod), containerized |
| **Partial** | Mostly same but some lightweight dev alternatives |
| **Weak** | SQLite in dev, PostgreSQL in prod; different backing services |
**Anti-patterns:**
- SQLite for development, PostgreSQL for production
- In-memory cache in dev, Redis in prod
- Different service versions across environments
- "It works on my machine" issues
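Parity gaps such as SQLite in development and PostgreSQL in production can also be caught automatically by comparing connection URLs across environments. The TypeScript sketch below assumes each environment's URL (for example, its `DATABASE_URL`) is available as a string. `serviceScheme`, `findParityGaps`, and the alias table are hypothetical helpers.

```typescript
// Treat these scheme spellings as the same backing service.
const aliases: Record<string, string | undefined> = { postgresql: 'postgres' };

// Extract the service scheme ("postgres", "sqlite", "redis", ...) from a connection URL.
function serviceScheme(url: string): string {
  const match = /^([a-z][a-z0-9+.-]*):/i.exec(url);
  if (!match) throw new Error(`Not a connection URL: ${url}`);
  const scheme = match[1].toLowerCase();
  return aliases[scheme] ?? scheme;
}

// Returns one "env: scheme" entry per environment when they disagree on the
// service type, or an empty array when all environments use the same kind of service.
function findParityGaps(urlsByEnv: Record<string, string>): string[] {
  const entries = Object.keys(urlsByEnv).map((env) => [env, serviceScheme(urlsByEnv[env])] as const);
  const distinct = new Set(entries.map(([, scheme]) => scheme));
  return distinct.size <= 1 ? [] : entries.map(([env, scheme]) => `${env}: ${scheme}`);
}
```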
---
### Factor XI: Logs
**Principle:** Treat logs as event streams.
**Search Patterns:**
```bash
# Stdout logging
grep -r "print(\|logging.info\|logger.info\|console.log" --include="*.py" --include="*.js" --include="*.ts" | head -20
# File-based logging (anti-pattern)
grep -r "FileHandler\|open.*\.log\|writeFile.*log\|fs.appendFile.*log" --include="*.py" --include="*.js" --include="*.ts"
grep -r "/var/log\|/tmp/.*\.log\|logs/" --include="*.py" --include="*.js" --include="*.ts" | grep -v "test\|example"
# Structured logging
grep -r "structlog\|json_logger\|pino\|winston" --include="*.py" --include="*.js" --include="*.ts"
```
**File Patterns:** `**/logging.py`, `**/logger.py`, logging configurations
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Unbuffered stdout only, structured logging (JSON), no file management |
| **Partial** | Stdout logging but with some file handlers |
| **Weak** | Application writes to log files, manages rotation |
**Anti-patterns:**
- Writing logs to files (`FileHandler`, `open('/var/log/app.log')`)
- Log rotation logic in application code
- Log archival managed by application
- Buffered logging
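Here is a minimal structured logger in the spirit of the Strong criterion: it writes one JSON event per line to stdout and leaves routing and storage to the environment. This TypeScript sketch is illustrative only. Libraries such as pino (listed in the search patterns above) provide production-grade versions.

```typescript
type Level = 'debug' | 'info' | 'warn' | 'error';

// Emits one JSON object per line. The sink defaults to stdout via console.log;
// it is a parameter only so the output can be captured in tests.
function createLogger(sink: (line: string) => void = (line) => console.log(line)) {
  return (level: Level, msg: string, fields: Record<string, unknown> = {}): void => {
    // Core keys come last so caller-supplied fields cannot overwrite them.
    sink(JSON.stringify({ ...fields, ts: new Date().toISOString(), level, msg }));
  };
}
```

The application never opens, rotates, or archives log files. The execution environment captures stdout and decides where the events go.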
---
### Factor XII: Admin Processes
**Principle:** Run admin/management tasks as one-off processes.
**Search Patterns:**
```bash
# Management commands
find . -name "manage.py" -o -name "Rakefile" -o -name "artisan"
grep -r "@cli.command\|@click.command\|typer.command" --include="*.py"
# Migration scripts
find . -name "migrations" -type d
find . -name "*migration*.py" -o -name "*migrate*.py"
# Admin scripts with proper isolation
grep -r "bundle exec\|source.*venv\|uv run" --include="*.sh" --include="Makefile"
```
**File Patterns:** `**/manage.py`, `**/cli.py`, `**/migrations/**`, admin scripts
**Compliance Criteria:**
| Level | Criteria |
|-------|----------|
| **Strong** | Admin tasks use same dependencies/config, proper isolation, idempotent |
| **Partial** | Admin tasks exist but different setup from app |
| **Weak** | Manual database manipulation, scripts without isolation |
**Anti-patterns:**
- Admin scripts not using app's dependency manager
- Direct SQL manipulation outside of migrations
- Admin scripts with hardcoded credentials
- Non-idempotent migrations
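Idempotent migrations usually depend on recording which migrations have already run. The TypeScript sketch below illustrates that idea with a hypothetical `MigrationLedger`; in practice this is a table maintained by your migration tool.

```typescript
interface Migration {
  id: string; // sortable, e.g. "001_create_users"
  up: () => void;
}

// Hypothetical ledger of applied IDs; in practice a table such as schema_migrations.
interface MigrationLedger {
  applied(): ReadonlySet<string>;
  record(id: string): void;
}

// Applies pending migrations in ID order and records each one, so rerunning is a no-op.
function runMigrations(migrations: Migration[], ledger: MigrationLedger): string[] {
  const done = ledger.applied();
  const ran: string[] = [];
  const ordered = [...migrations].sort((x, y) => (x.id < y.id ? -1 : x.id > y.id ? 1 : 0));
  for (const migration of ordered) {
    if (done.has(migration.id)) continue; // already applied in this environment
    migration.up();
    ledger.record(migration.id);
    ran.push(migration.id);
  }
  return ran;
}
```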
---
## Output Format
### Executive Summary Table
```markdown
| Factor | Status | Notes |
|--------|--------|-------|
| I. Codebase | **Strong/Partial/Weak** | [Key finding] |
| II. Dependencies | **Strong/Partial/Weak** | [Key finding] |
| III. Config | **Strong/Partial/Weak** | [Key finding] |
| IV. Backing Services | **Strong/Partial/Weak** | [Key finding] |
| V. Build/Release/Run | **Strong/Partial/Weak** | [Key finding] |
| VI. Processes | **Strong/Partial/Weak** | [Key finding] |
| VII. Port Binding | **Strong/Partial/Weak** | [Key finding] |
| VIII. Concurrency | **Strong/Partial/Weak** | [Key finding] |
| IX. Disposability | **Strong/Partial/Weak** | [Key finding] |
| X. Dev/Prod Parity | **Strong/Partial/Weak** | [Key finding] |
| XI. Logs | **Strong/Partial/Weak** | [Key finding] |
| XII. Admin Processes | **Strong/Partial/Weak** | [Key finding] |
**Overall**: X Strong, Y Partial, Z Weak
```
### Per-Factor Analysis
For each factor, provide:
1. **Current Implementation**
   - Evidence with file:line references
   - Code snippets showing patterns
2. **Compliance Level**
   - Strong/Partial/Weak with justification
3. **Gaps**
   - What's missing vs. the 12-Factor ideal
4. **Recommendations**
   - Actionable improvements with code examples
---
## Analysis Workflow
1. **Initial Scan**
   - Run search patterns for all factors
   - Identify key files for each factor
   - Note any existing compliance documentation
2. **Deep Dive** (per factor)
   - Read identified files
   - Evaluate against compliance criteria
   - Document evidence with file paths
3. **Gap Analysis**
   - Compare the current implementation against the 12-Factor ideal
   - Identify anti-patterns present
   - Prioritize by impact
4. **Recommendations**
   - Provide actionable improvements
   - Include before/after code examples
   - Reference best practices
5. **Summary**
   - Compile executive summary table
   - Highlight strengths and critical gaps
   - Suggest priority order for improvements
---
## Quick Reference: Compliance Scoring
| Score | Meaning | Action |
|-------|---------|--------|
| **Strong** | Fully implements principle | Maintain, minor optimizations |
| **Partial** | Some implementation, significant gaps | Planned improvements |
| **Weak** | Minimal or no implementation | High priority for roadmap |
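Tallying the per-factor scores into the **Overall** line and ordering gaps for the roadmap is mechanical work. Here is a small TypeScript sketch. The function names are hypothetical. Leaving Strong factors out of the priority list is a choice made here, matching the "Maintain" action in the table above.

```typescript
type Score = 'Strong' | 'Partial' | 'Weak';

// Builds the "Overall" line of the executive summary table.
function overallLine(scores: Record<string, Score>): string {
  const counts: Record<Score, number> = { Strong: 0, Partial: 0, Weak: 0 };
  for (const factor of Object.keys(scores)) counts[scores[factor]] += 1;
  return `**Overall**: ${counts.Strong} Strong, ${counts.Partial} Partial, ${counts.Weak} Weak`;
}

// Orders factors for the roadmap: Weak first, then Partial (stable within each group).
function priorityOrder(scores: Record<string, Score>): string[] {
  const rank: Record<Score, number> = { Weak: 0, Partial: 1, Strong: 2 };
  return Object.keys(scores)
    .filter((factor) => scores[factor] !== 'Strong')
    .sort((x, y) => rank[scores[x]] - rank[scores[y]]);
}
```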
## When to Use This Skill
- Evaluating new SaaS applications
- Reviewing cloud-native architecture decisions
- Auditing production applications for scalability
- Planning migration to cloud platforms
- Comparing application architectures
- Preparing for containerization/Kubernetes deployment
---
name: vercel-ai-sdk
description: Vercel AI SDK for building chat interfaces with streaming. Use when implementing useChat hook, handling tool calls, streaming responses, or building chat UI. Triggers on useChat, @ai-sdk/react, UIMessage, ChatStatus, streamText, toUIMessageStreamResponse, addToolOutput, onToolCall, sendMessage.
---
# Vercel AI SDK
The Vercel AI SDK provides React hooks and server utilities for building streaming chat interfaces with support for tool calls, file attachments, and multi-step reasoning.
## Quick Reference
### Basic useChat Setup
```typescript
import { useChat } from '@ai-sdk/react';
const { messages, status, sendMessage, stop, regenerate } = useChat({
id: 'chat-id',
messages: initialMessages,
onFinish: ({ message, messages, isAbort, isError }) => {
console.log('Chat finished');
},
onError: (error) => {
console.error('Chat error:', error);
}
});
// Send a message
sendMessage({ text: 'Hello', metadata: { createdAt: Date.now() } });
// Send with files
sendMessage({
text: 'Analyze this',
files: fileList // FileList or FileUIPart[]
});
```
### ChatStatus States
The `status` field indicates the current state of the chat:
- **`ready`**: Chat is idle and ready to accept new messages
- **`submitted`**: Message sent to API, awaiting response stream start
- **`streaming`**: Response actively streaming from the API
- **`error`**: An error occurred during the request
### Message Structure
Messages use the `UIMessage` type with a parts-based structure:
```typescript
interface UIMessage {
id: string;
role: 'system' | 'user' | 'assistant';
metadata?: unknown;
parts: Array<UIMessagePart>; // text, file, tool-*, reasoning, etc.
}
```
Part types include:
- `text`: Text content with optional streaming state
- `file`: File attachments (images, documents)
- `tool-{toolName}`: Tool invocations with state machine
- `reasoning`: AI reasoning traces
- `data-{typeName}`: Custom data parts
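Because content lives in `parts`, any code that needs plain text (search, previews, titles) must collect it from the text parts. The minimal sketch below uses trimmed-down local stand-ins for the SDK types, on the assumption that only the fields shown matter. With the real SDK, you would import `UIMessage` from `ai` instead.

```typescript
// Local, simplified stand-ins for the SDK's UIMessage and part types.
type TextPart = { type: 'text'; text: string; state?: 'streaming' | 'done' };
type OtherPart = { type: 'file' | 'reasoning' | `tool-${string}` | `data-${string}` };
type PartLite = TextPart | OtherPart;

interface MessageLite {
  id: string;
  role: 'system' | 'user' | 'assistant';
  parts: PartLite[];
}

// Concatenates the text parts in order and ignores tool, file, and data parts.
function messageText(message: MessageLite): string {
  return message.parts
    .filter((part): part is TextPart => part.type === 'text')
    .map((part) => part.text)
    .join('');
}
```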
### Server-Side Streaming
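```typescript
import { streamText, convertToModelMessages, tool, type UIMessage } from 'ai';
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';

export async function POST(req: Request) {
  const { messages: uiMessages }: { messages: UIMessage[] } = await req.json();

  const result = streamText({
    model: openai('gpt-4'),
    messages: convertToModelMessages(uiMessages),
    tools: {
      getWeather: tool({
        description: 'Get weather',
        inputSchema: z.object({ city: z.string() }),
        // Server-side tool: `execute` runs inside this route handler
        execute: async ({ city }) => {
          return { temperature: 72, weather: 'sunny' };
        }
      })
    }
  });

  return result.toUIMessageStreamResponse({
    originalMessages: uiMessages,
    onFinish: ({ messages }) => {
      // Save the full UIMessage[] (including the new assistant message) to your database
    }
  });
}
```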
### Tool Handling Patterns
**Client-Side Tool Execution:**
```typescript
const { addToolOutput } = useChat({
onToolCall: async ({ toolCall }) => {
if (toolCall.toolName === 'getLocation') {
addToolOutput({
tool: 'getLocation',
toolCallId: toolCall.toolCallId,
output: 'San Francisco'
});
}
}
});
```
**Rendering Tool States:**
```typescript
{message.parts.map(part => {
if (part.type === 'tool-getWeather') {
switch (part.state) {
case 'input-streaming':
return <pre>{JSON.stringify(part.input, null, 2)}</pre>;
case 'input-available':
return <div>Getting weather for {part.input.city}...</div>;
case 'output-available':
return <div>Weather: {part.output.weather}</div>;
case 'output-error':
return <div>Error: {part.errorText}</div>;
}
}
})}
```
## Reference Files
Detailed documentation on specific aspects:
- **[use-chat.md](references/use-chat.md)**: Complete useChat API reference
- **[messages.md](references/messages.md)**: UIMessage structure and part types
- **[streaming.md](references/streaming.md)**: Server-side streaming implementation
- **[tools.md](references/tools.md)**: Tool definition and execution patterns
## Common Patterns
### Error Handling
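```typescript
const { error, clearError } = useChat({
  onError: (error) => {
    // `toast` stands in for whatever notification helper your app uses
    toast.error(error.message);
  }
});

// Clear the error and return status to 'ready'. Call this from an event
// handler (e.g. a "Dismiss" button), not unconditionally during render.
function handleDismiss() {
  if (error) {
    clearError();
  }
}
```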
### Message Regeneration
```typescript
const { regenerate } = useChat();
// Regenerate last assistant message
await regenerate();
// Regenerate specific message
await regenerate({ messageId: 'msg-123' });
```
### Custom Transport
```typescript
import { DefaultChatTransport } from 'ai';
const { messages } = useChat({
transport: new DefaultChatTransport({
api: '/api/chat',
prepareSendMessagesRequest: ({ id, messages, trigger, messageId }) => ({
body: {
chatId: id,
lastMessage: messages[messages.length - 1],
trigger,
messageId
}
})
})
});
```
### Performance Optimization
```typescript
// Throttle UI updates to reduce re-renders
const chat = useChat({
experimental_throttle: 100 // Update the UI at most once per 100 ms
});
```
### Automatic Message Sending
```typescript
import { lastAssistantMessageIsCompleteWithToolCalls } from 'ai';
const chat = useChat({
sendAutomaticallyWhen: lastAssistantMessageIsCompleteWithToolCalls
// Automatically resend when all tool calls have outputs
});
```
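For intuition, here is a rough local approximation of what such a predicate checks: the last message is from the assistant, and every tool part in its most recent step has an output. This is a sketch built on simplified local shapes, not the SDK's source. The real `lastAssistantMessageIsCompleteWithToolCalls` receives its arguments differently and may treat additional states as complete depending on the version; this version counts only `output-available`.

```typescript
// Simplified local shapes (assumption: only the fields this check needs).
type PartLite = { type: string; state?: string };
type MessageLite = { role: 'system' | 'user' | 'assistant'; parts: PartLite[] };

const isToolPart = (part: PartLite) =>
  part.type.startsWith('tool-') || part.type === 'dynamic-tool';

// True when the last message is from the assistant and every tool part in its
// most recent step (after the last 'step-start') has an output.
function lastStepToolCallsComplete(messages: readonly MessageLite[]): boolean {
  const last = messages[messages.length - 1];
  if (!last || last.role !== 'assistant') return false;
  const stepStart = last.parts.map((part) => part.type).lastIndexOf('step-start');
  const toolParts = last.parts.slice(stepStart + 1).filter(isToolPart);
  return toolParts.length > 0 && toolParts.every((part) => part.state === 'output-available');
}
```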
## Type Safety
The SDK provides full type inference for tools and messages:
```typescript
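import { tool, type InferUITools, type UIDataTypes, type UIMessage } from 'ai';
import { useChat } from '@ai-sdk/react';
import { z } from 'zod';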
const tools = {
getWeather: tool({
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => ({ weather: 'sunny' })
})
};
type MyMessage = UIMessage<
{ createdAt: number }, // Metadata type
UIDataTypes,
InferUITools<typeof tools> // Tool types
>;
const { messages } = useChat<MyMessage>();
```
## Key Concepts
### Parts-Based Architecture
Messages use a parts array instead of a single content field. This allows:
- Streaming text while maintaining other parts
- Tool calls with independent state machines
- File attachments and custom data mixed with text
### Tool State Machine
Tool parts progress through states:
1. `input-streaming`: Tool input streaming (optional)
2. `input-available`: Tool input complete
3. `approval-requested`: Waiting for user approval (optional)
4. `approval-responded`: User approved/denied (optional)
5. `output-available`: Tool execution complete
6. `output-error`: Tool execution failed
7. `output-denied`: User denied approval
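The numbering above does not describe a strictly linear path. The approval states appear only for tools that require approval, and `output-error` can follow any non-final state. The sketch below encodes that reading as a transition table, which can be used to check renderer coverage in tests. The table reflects one interpretation of this list, not the SDK's internal logic.

```typescript
type ToolState =
  | 'input-streaming'
  | 'input-available'
  | 'approval-requested'
  | 'approval-responded'
  | 'output-available'
  | 'output-error'
  | 'output-denied';

// Allowed next states, following the list above. Preliminary outputs
// (output-available with preliminary: true) are ignored for simplicity.
const nextStates: Record<ToolState, readonly ToolState[]> = {
  'input-streaming': ['input-available', 'output-error'],
  'input-available': ['approval-requested', 'output-available', 'output-error'],
  'approval-requested': ['approval-responded', 'output-error'],
  'approval-responded': ['output-available', 'output-denied', 'output-error'],
  'output-available': [],
  'output-error': [],
  'output-denied': [],
};

function canTransition(from: ToolState, to: ToolState): boolean {
  return nextStates[from].includes(to);
}

// Terminal states are the ones a renderer can treat as final.
function isTerminal(state: ToolState): boolean {
  return nextStates[state].length === 0;
}
```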
### Streaming Protocol
The SDK streams `UIMessageChunk` objects over Server-Sent Events (SSE); chunk types include:
- `text-start`, `text-delta`, `text-end`
- `tool-input-available`, `tool-output-available`
- `reasoning-start`, `reasoning-delta`, `reasoning-end`
- `start`, `finish`, `abort`
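The following sketch reassembles text from a raw stream body to show how these chunk types fit together. It assumes that each SSE event is a single `data:` line holding one JSON chunk, that `text-delta` chunks carry `id` and `delta` fields, and that the stream ends with `data: [DONE]`. Check these details against the stream protocol documentation for your SDK version. In an app, `useChat` performs this parsing for you.

```typescript
// Assumed chunk shape: only the fields this sketch reads.
type ChunkLite = { type: string; id?: string; delta?: string };

// Reassembles the text of each text part from a complete SSE response body.
// Simplification: assumes one `data:` line per event.
function collectText(sseBody: string): Map<string, string> {
  const texts = new Map<string, string>();
  for (const rawEvent of sseBody.split('\n\n')) {
    const line = rawEvent.trim();
    if (!line.startsWith('data:')) continue; // skip blank events and comments
    const payload = line.slice('data:'.length).trim();
    if (payload === '[DONE]') break; // end-of-stream marker
    const chunk = JSON.parse(payload) as ChunkLite;
    if (chunk.type === 'text-delta' && chunk.id !== undefined) {
      texts.set(chunk.id, (texts.get(chunk.id) ?? '') + (chunk.delta ?? ''));
    }
  }
  return texts;
}
```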
### Client vs Server Tools
**Server-side tools** have an `execute` function and run on the API route.
**Client-side tools** omit `execute` and are handled via `onToolCall` and `addToolOutput`.
## Gates
Use this **sequence**; treat a step as incomplete until the pass condition is true in code or UI (not “should work”).
1. **Streaming route** — *Pass if:* the chat handler chains `convertToModelMessages` → `streamText` (or the SDK pattern your app standardizes) → `toUIMessageStreamResponse` (or equivalent stream response). *Fail if:* responses are plain JSON strings without the UI message stream contract.
2. **Client ↔ route** — *Pass if:* `useChat` `id` / `DefaultChatTransport` `api` (and `prepareSendMessagesRequest` body) matches the route path and the body the server reads. *Fail if:* client posts to a different path or shape than the handler expects.
3. **Tools closed loop** — *Pass if:* every tool in `tools` has server `execute` **or** `onToolCall` + `addToolOutput` with the same `toolCallId`, and the UI handles the `tool-*` part states you surface. *Fail if:* a tool name exists in `tools` but has no handler or missing states in the renderer.
4. **Persistence (if any)** — *Pass if:* before saving, the server runs `validateUIMessages` (or stricter validation). *Fail if:* unvalidated client payloads are written to storage.
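Gate 3 can be checked mechanically in a test. The sketch below assumes you can list the tool names your `onToolCall` handles. The `clientHandled` set is hypothetical bookkeeping, not an SDK feature, and the tool definition type is a simplified local stand-in.

```typescript
// Simplified view of a tool definition: only the presence of `execute` matters here.
type ToolDefLite = { description?: string; execute?: (...args: never[]) => unknown };

// Returns tool names that have neither a server-side `execute` nor a client handler.
function unhandledTools(
  tools: Record<string, ToolDefLite>,
  clientHandled: ReadonlySet<string>,
): string[] {
  return Object.keys(tools).filter(
    (toolName) => typeof tools[toolName].execute !== 'function' && !clientHandled.has(toolName),
  );
}
```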
## Best Practices
1. Always handle the `error` state and provide user feedback
2. Use `experimental_throttle` for high-frequency updates
3. Implement proper loading states based on `status`
4. Type your messages with custom metadata and tools
5. Use `sendAutomaticallyWhen` for multi-turn tool workflows
6. Handle all tool states in the UI for better UX
7. Use `stop()` to allow users to cancel long-running requests
8. Validate messages with `validateUIMessages` on the server
FILE:references/messages.md
# UIMessage Structure Reference
Complete reference for the UIMessage type system and message parts.
## Table of Contents
- [UIMessage Interface](#uimessage-interface)
- [Message Parts](#message-parts)
- [Text Parts](#text-parts)
- [Tool Parts](#tool-parts)
- [File Parts](#file-parts)
- [Reasoning Parts](#reasoning-parts)
- [Data Parts](#data-parts)
- [Source Parts](#source-parts)
- [Step Parts](#step-parts)
- [Type Guards](#type-guards)
- [Type Inference](#type-inference)
## UIMessage Interface
```typescript
interface UIMessage<
METADATA = unknown,
DATA_PARTS extends UIDataTypes = UIDataTypes,
TOOLS extends UITools = UITools,
> {
// Unique identifier
id: string;
// Message role
role: 'system' | 'user' | 'assistant';
// Optional custom metadata
metadata?: METADATA;
// Array of message parts
parts: Array<UIMessagePart<DATA_PARTS, TOOLS>>;
}
```
### Role Guidelines
- **`system`**: System prompts (avoid in UI messages, set on server instead)
- **`user`**: User-generated messages (text, files)
- **`assistant`**: AI-generated messages (text, reasoning, tools, files)
### Parts-Based Architecture
Unlike traditional chat systems with single `content` fields, UIMessages use a `parts` array. This enables:
- Multiple content types in a single message
- Independent streaming states for each part
- Tool calls with their own state machines
- File attachments mixed with text
- Custom data parts for specialized UI
## Message Parts
```typescript
type UIMessagePart<DATA_TYPES, TOOLS> =
| TextUIPart
| ReasoningUIPart
| ToolUIPart<TOOLS>
| DynamicToolUIPart
| SourceUrlUIPart
| SourceDocumentUIPart
| FileUIPart
| DataUIPart<DATA_TYPES>
| StepStartUIPart;
```
## Text Parts
### TextUIPart
```typescript
interface TextUIPart {
type: 'text';
text: string;
state?: 'streaming' | 'done';
providerMetadata?: ProviderMetadata;
}
```
### Usage
```typescript
// Complete text
const textPart: TextUIPart = {
type: 'text',
text: 'Hello, world!',
state: 'done'
};
// Streaming text
const streamingPart: TextUIPart = {
type: 'text',
text: 'Hello, wor',
state: 'streaming'
};
```
### Rendering
```typescript
function renderTextPart(part: TextUIPart) {
return (
<div className={part.state === 'streaming' ? 'opacity-70' : ''}>
{part.text}
{part.state === 'streaming' && <Cursor />}
</div>
);
}
```
## Tool Parts
### ToolUIPart
Type-safe tool parts with tool name in the type.
```typescript
type ToolUIPart<TOOLS extends UITools> = ValueOf<{
[NAME in keyof TOOLS & string]: {
type: `tool-${NAME}`;
} & UIToolInvocation<TOOLS[NAME]>;
}>;
```
### UIToolInvocation States
Tool parts have a state machine with the following states:
```typescript
type UIToolInvocation<TOOL> =
| { state: 'input-streaming'; input: Partial<ToolInput> | undefined; }
| { state: 'input-available'; input: ToolInput; callProviderMetadata?: ProviderMetadata; }
| { state: 'approval-requested'; input: ToolInput; approval: { id: string }; }
| { state: 'approval-responded'; input: ToolInput; approval: { id: string; approved: boolean; reason?: string }; }
| { state: 'output-available'; input: ToolInput; output: ToolOutput; preliminary?: boolean; }
| { state: 'output-error'; input: ToolInput | undefined; errorText: string; }
| { state: 'output-denied'; input: ToolInput; approval: { id: string; approved: false; reason?: string }; }
```
**Common fields:**
- `toolCallId: string` - Unique identifier for this tool call
- `title?: string` - Optional display title
- `providerExecuted?: boolean` - True if provider executed the tool
### State Progression
```
input-streaming → input-available → output-available
                        ↓ (optional approval flow)
                  approval-requested → approval-responded → output-available / output-denied

(at any point) → output-error
```
### Example: Rendering Tool Parts
```typescript
function renderToolPart(part: ToolUIPart<MyTools>) {
// Extract common fields
const { toolCallId, title } = part;
// Type narrows based on tool type
if (part.type === 'tool-getWeather') {
switch (part.state) {
case 'input-streaming':
return (
<div>
Preparing weather request...
<pre>{JSON.stringify(part.input, null, 2)}</pre>
</div>
);
case 'input-available':
return <div>Fetching weather for {part.input.city}...</div>;
case 'output-available':
return (
<div>
Weather in {part.input.city}: {part.output.weather}
{part.preliminary && <Badge>Preliminary</Badge>}
</div>
);
case 'output-error':
return <div className="error">{part.errorText}</div>;
}
}
if (part.type === 'tool-askConfirmation') {
switch (part.state) {
case 'approval-requested':
return (
<div>
{part.input.message}
<button onClick={() => approve(part.approval.id)}>Yes</button>
<button onClick={() => deny(part.approval.id)}>No</button>
</div>
);
case 'approval-responded':
return (
<div>
User {part.approval.approved ? 'approved' : 'denied'}
{part.approval.reason && `: ${part.approval.reason}`}
</div>
);
}
}
}
```
### DynamicToolUIPart
For tools not known at compile time.
```typescript
interface DynamicToolUIPart {
type: 'dynamic-tool';
toolName: string; // Name as string, not in type
toolCallId: string;
title?: string;
providerExecuted?: boolean;
// Same state union as UIToolInvocation but with unknown types
state: 'input-streaming' | 'input-available' | ...;
input: unknown;
output?: unknown;
errorText?: string;
}
```
### Tool Type Utilities
```typescript
// Check if part is a tool part
if (isToolUIPart(part)) {
const toolName = getToolName(part); // Type-safe tool name
}
// Check if tool or dynamic tool
if (isToolOrDynamicToolUIPart(part)) {
const name = getToolOrDynamicToolName(part); // string
}
// Check if dynamic tool
if (isDynamicToolUIPart(part)) {
console.log(part.toolName); // Access toolName field
}
```
## File Parts
### FileUIPart
```typescript
interface FileUIPart {
type: 'file';
mediaType: string; // IANA media type
filename?: string;
url: string; // Hosted URL or Data URL
providerMetadata?: ProviderMetadata;
}
```
### Usage
```typescript
// Image file
const imagePart: FileUIPart = {
type: 'file',
mediaType: 'image/png',
filename: 'screenshot.png',
url: 'data:image/png;base64,iVBORw0KG...'
};
// PDF document
const pdfPart: FileUIPart = {
type: 'file',
mediaType: 'application/pdf',
filename: 'report.pdf',
url: 'https://example.com/files/report.pdf'
};
```
### Rendering
```typescript
function renderFilePart(part: FileUIPart) {
if (part.mediaType.startsWith('image/')) {
return <img src={part.url} alt={part.filename} />;
}
if (part.mediaType.startsWith('video/')) {
return <video src={part.url} controls />;
}
return (
<a href={part.url} download={part.filename}>
{part.filename || 'Download file'}
</a>
);
}
```
## Reasoning Parts
### ReasoningUIPart
For AI reasoning traces (e.g., from OpenAI o1 models).
```typescript
interface ReasoningUIPart {
type: 'reasoning';
text: string;
state?: 'streaming' | 'done';
providerMetadata?: ProviderMetadata;
}
```
### Usage
```typescript
function renderReasoningPart(part: ReasoningUIPart) {
return (
<details>
<summary>Reasoning</summary>
<div className="reasoning">
{part.text}
{part.state === 'streaming' && <Spinner />}
</div>
</details>
);
}
```
## Data Parts
### DataUIPart
Custom data parts for specialized UI components.
```typescript
type DataUIPart<DATA_TYPES extends UIDataTypes> = ValueOf<{
[NAME in keyof DATA_TYPES & string]: {
type: `data-${NAME}`;
id?: string;
data: DATA_TYPES[NAME];
};
}>;
```
### Defining Custom Data Types
```typescript
// Define data types
type MyDataTypes = {
progress: { percent: number; status: string };
chart: { data: number[]; labels: string[] };
};
// Use in message type
type MyMessage = UIMessage<unknown, MyDataTypes>;
// Create data parts
const progressPart: DataUIPart<MyDataTypes> = {
type: 'data-progress',
data: { percent: 75, status: 'Processing...' }
};
const chartPart: DataUIPart<MyDataTypes> = {
type: 'data-chart',
id: 'chart-1',
data: {
data: [10, 20, 30],
labels: ['A', 'B', 'C']
}
};
```
### Rendering Custom Data
```typescript
function renderDataPart(part: UIMessagePart<MyDataTypes, MyTools>) {
if (isDataUIPart(part)) {
if (part.type === 'data-progress') {
return (
<ProgressBar
percent={part.data.percent}
label={part.data.status}
/>
);
}
if (part.type === 'data-chart') {
return <Chart data={part.data.data} labels={part.data.labels} />;
}
}
}
```
## Source Parts
### SourceUrlUIPart
```typescript
interface SourceUrlUIPart {
type: 'source-url';
sourceId: string;
url: string;
title?: string;
providerMetadata?: ProviderMetadata;
}
```
### SourceDocumentUIPart
```typescript
interface SourceDocumentUIPart {
type: 'source-document';
sourceId: string;
mediaType: string;
title: string;
filename?: string;
providerMetadata?: ProviderMetadata;
}
```
### Usage
```typescript
// URL source
const urlSource: SourceUrlUIPart = {
type: 'source-url',
sourceId: 'src-1',
url: 'https://example.com/article',
title: 'Example Article'
};
// Document source
const docSource: SourceDocumentUIPart = {
type: 'source-document',
sourceId: 'src-2',
mediaType: 'application/pdf',
title: 'Research Paper',
filename: 'paper.pdf'
};
```
## Step Parts
### StepStartUIPart
Marks the beginning of a new reasoning/execution step.
```typescript
interface StepStartUIPart {
type: 'step-start';
}
```
### Rendering
```typescript
function renderMessage(message: UIMessage) {
return (
<div>
{message.parts.map((part, index) => {
if (part.type === 'step-start' && index > 0) {
return <hr key={index} className="step-divider" />;
}
return renderPart(part, index);
})}
</div>
);
}
```
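To render each step as its own block, not just insert dividers, you can group the parts first. Here is a small sketch over a minimal local part type (an assumption; with the SDK, you would use `UIMessage['parts']`).

```typescript
// Minimal part shape for this sketch; real parts carry more fields.
type PartLite = { type: string };

// Groups a message's parts into steps, starting a new group at each 'step-start'.
// Parts that appear before any 'step-start' form their own leading group.
function splitIntoSteps<P extends PartLite>(parts: readonly P[]): P[][] {
  const steps: P[][] = [];
  for (const part of parts) {
    if (part.type === 'step-start' || steps.length === 0) steps.push([]);
    if (part.type !== 'step-start') steps[steps.length - 1].push(part);
  }
  return steps;
}
```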
## Type Guards
```typescript
// Text
if (isTextUIPart(part)) {
console.log(part.text);
}
// File
if (isFileUIPart(part)) {
console.log(part.url, part.mediaType);
}
// Reasoning
if (isReasoningUIPart(part)) {
console.log(part.text);
}
// Tool (static)
if (isToolUIPart(part)) {
const toolName = getToolName(part);
}
// Dynamic tool
if (isDynamicToolUIPart(part)) {
console.log(part.toolName);
}
// Tool or dynamic tool
if (isToolOrDynamicToolUIPart(part)) {
const name = getToolOrDynamicToolName(part);
}
// Data
if (isDataUIPart(part)) {
// Check specific data type
if (part.type === 'data-progress') {
console.log(part.data.percent);
}
}
```
## Type Inference
### Infer from UIMessage
```typescript
type MyMessage = UIMessage<
{ createdAt: number },
{ progress: { percent: number } },
{ getTool: { input: string; output: number } }
>;
// Infer metadata type
type Metadata = InferUIMessageMetadata<MyMessage>;
// { createdAt: number }
// Infer data types
type DataTypes = InferUIMessageData<MyMessage>;
// { progress: { percent: number } }
// Infer tool types
type Tools = InferUIMessageTools<MyMessage>;
// { getTool: { input: string; output: number } }
// Infer tool outputs
type ToolOutputs = InferUIMessageToolOutputs<MyMessage>;
// number
// Infer tool calls
type ToolCall = InferUIMessageToolCall<MyMessage>;
// ToolCall<'getTool', string> | ...
// Infer part type
type Part = InferUIMessagePart<MyMessage>;
// TextUIPart | ToolUIPart<...> | DataUIPart<...> | ...
```
### Infer Tool Types
```typescript
import { tool, type InferUITool, type InferUITools } from 'ai';
import { z } from 'zod';
const weatherTool = tool({
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => ({ temp: 72 })
});
// Infer single tool
type WeatherTool = InferUITool<typeof weatherTool>;
// { input: { city: string }; output: { temp: number } }
// Infer tool set
const tools = { weather: weatherTool };
type MyTools = InferUITools<typeof tools>;
// { weather: { input: { city: string }; output: { temp: number } } }
```
## Complete Example
```typescript
import { tool, isTextUIPart, type InferUITools, type UIMessage } from 'ai';
import { z } from 'zod';
// Define tools
const tools = {
getWeather: tool({
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => ({ weather: 'sunny', temp: 72 })
})
};
// Define message type
type MyMessage = UIMessage<
{ createdAt: number },
{ progress: { percent: number } },
InferUITools<typeof tools>
>;
// Render function
function Message({ message }: { message: MyMessage }) {
return (
<div className={`message message-${message.role}`}>
<div className="timestamp">
{message.metadata && new Date(message.metadata.createdAt).toLocaleString()}
</div>
<div className="parts">
{message.parts.map((part, index) => {
if (isTextUIPart(part)) {
return <div key={index}>{part.text}</div>;
}
if (part.type === 'tool-getWeather') {
if (part.state === 'output-available') {
return (
<div key={index}>
Weather: {part.output.weather}, {part.output.temp}°F
</div>
);
}
}
if (part.type === 'data-progress') {
return (
<ProgressBar
key={index}
percent={part.data.percent}
/>
);
}
return null;
})}
</div>
</div>
);
}
```
FILE:references/streaming.md
# Streaming Reference
Server-side streaming implementation with the Vercel AI SDK.
## Table of Contents
- [Basic Streaming Setup](#basic-streaming-setup)
- [streamText Function](#streamtext-function)
- [toUIMessageStreamResponse](#touimessagestreamresponse)
- [UIMessageChunk Types](#uimessagechunk-types)
- [SSE Protocol](#sse-protocol)
- [Tool Execution Flow](#tool-execution-flow)
- [Error Handling](#error-handling)
- [Advanced Patterns](#advanced-patterns)
## Basic Streaming Setup
### Server Route
```typescript
import { streamText, convertToModelMessages } from 'ai';
import { openai } from '@ai-sdk/openai';
export async function POST(req: Request) {
const { messages } = await req.json();
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(messages)
});
return result.toUIMessageStreamResponse();
}
```
### Client Setup
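```typescript
import { useChat } from '@ai-sdk/react';
import { DefaultChatTransport } from 'ai';

function Chat() {
  const { messages, sendMessage } = useChat({
    // The endpoint is configured on the transport ('/api/chat' is also the default)
    transport: new DefaultChatTransport({ api: '/api/chat' })
  });
  return <ChatUI messages={messages} onSend={sendMessage} />;
}
```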
## streamText Function
### Basic Options
```typescript
const result = streamText({
// Model to use
model: openai('gpt-4'),
// Messages (converted from UIMessage)
messages: convertToModelMessages(uiMessages),
// System prompt (optional)
system: 'You are a helpful assistant.',
// Temperature, max output tokens, etc.
temperature: 0.7,
maxOutputTokens: 2000,
// Abort signal
abortSignal: abortController.signal
});
```
### With Tools
```typescript
import { tool } from 'ai';
import { z } from 'zod';
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(uiMessages),
tools: {
getWeather: tool({
description: 'Get weather for a city',
inputSchema: z.object({
city: z.string()
}),
execute: async ({ city }) => {
const data = await fetchWeather(city);
return { temperature: data.temp, weather: data.conditions };
}
})
}
});
```
### Multi-Step Tool Execution
```typescript
import { stepCountIs } from 'ai';
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(uiMessages),
tools: {
getWeather: weatherTool,
searchWeb: searchTool
},
// Allow up to 5 steps (model can call tools multiple times)
stopWhen: stepCountIs(5)
});
```
### Streaming Tool Outputs
```typescript
const getWeatherTool = tool({
description: 'Get weather information',
inputSchema: z.object({ city: z.string() }),
// Generator function for streaming outputs
async *execute({ city }) {
// Yield preliminary results
yield { state: 'loading' as const };
const data = await fetchWeather(city);
// Yield intermediate results
yield {
state: 'partial' as const,
temperature: data.temp
};
// Yield final result
yield {
state: 'complete' as const,
temperature: data.temp,
weather: data.conditions,
forecast: data.forecast
};
}
});
```
### Callbacks
```typescript
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(uiMessages),
// Called on each chunk
onChunk({ chunk }) {
console.log('Chunk:', chunk);
},
// Called when a step finishes (with multi-step)
onStepFinish({ request, response, toolCalls, toolResults }) {
console.log('Step finished');
console.log('Tool calls:', toolCalls);
console.log('Tool results:', toolResults);
},
// Called when generation finishes
onFinish({ text, finishReason, usage }) {
console.log('Generated text:', text);
console.log('Finish reason:', finishReason);
console.log('Token usage:', usage);
}
});
```
## toUIMessageStreamResponse
Convert streamText result to a UIMessage stream response.
### Basic Usage
```typescript
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(uiMessages)
});
return result.toUIMessageStreamResponse();
```
### With Options
```typescript
return result.toUIMessageStreamResponse({
// Include original messages in the stream
originalMessages: uiMessages,
// Custom message ID generator
generateMessageId: () => crypto.randomUUID(),
// Add/update message metadata
messageMetadata: ({ part, message }) => {
if (part.type === 'start') {
return { createdAt: Date.now() };
}
if (part.type === 'finish') {
return { finishedAt: Date.now() };
}
},
// Called when streaming finishes
onFinish: ({ messages, finishReason }) => {
// Save to database
saveMessages(messages);
}
});
```
### Custom Headers
```typescript
const response = result.toUIMessageStreamResponse({
originalMessages: uiMessages
});
// Add custom headers
response.headers.set('X-Custom-Header', 'value');
return response;
```
## UIMessageChunk Types
Chunks sent over the stream as Server-Sent Events.
### Text Chunks
```typescript
// Text streaming starts
{ type: 'text-start', id: 'text-1' }
// Text delta (incremental update)
{ type: 'text-delta', id: 'text-1', delta: 'Hello' }
{ type: 'text-delta', id: 'text-1', delta: ' world' }
// Text streaming ends
{ type: 'text-end', id: 'text-1' }
```
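The `id` ties each delta to the text part it extends, which lets a single message stream several text parts. Below is a minimal sketch of a client folding these chunks into strings. `TextChunk` and `accumulateText` are hypothetical simplifications, not SDK exports, since `useChat` already does this for you.

```typescript
// Simplified stand-ins for the text chunk shapes listed above
type TextChunk =
  | { type: 'text-start'; id: string }
  | { type: 'text-delta'; id: string; delta: string }
  | { type: 'text-end'; id: string };

// Folds text chunks into one string per text part, keyed by chunk id
function accumulateText(chunks: TextChunk[]): Map<string, { text: string; done: boolean }> {
  const parts = new Map<string, { text: string; done: boolean }>();
  for (const chunk of chunks) {
    switch (chunk.type) {
      case 'text-start':
        parts.set(chunk.id, { text: '', done: false });
        break;
      case 'text-delta': {
        // Deltas are appended to the part opened by the matching text-start
        const part = parts.get(chunk.id);
        if (part) part.text += chunk.delta;
        break;
      }
      case 'text-end': {
        const part = parts.get(chunk.id);
        if (part) part.done = true;
        break;
      }
    }
  }
  return parts;
}
```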
### Reasoning Chunks
```typescript
// Reasoning starts
{ type: 'reasoning-start', id: 'reasoning-1' }
// Reasoning delta
{ type: 'reasoning-delta', id: 'reasoning-1', delta: 'First, ' }
{ type: 'reasoning-delta', id: 'reasoning-1', delta: 'I will...' }
// Reasoning ends
{ type: 'reasoning-end', id: 'reasoning-1' }
```
### Tool Chunks
```typescript
// Tool input streaming starts
{
type: 'tool-input-start',
toolCallId: 'call-123',
toolName: 'getWeather',
dynamic: false
}
// Tool input delta (for large inputs)
{
type: 'tool-input-delta',
toolCallId: 'call-123',
inputTextDelta: '{"city": '
}
// Tool input complete
{
type: 'tool-input-available',
toolCallId: 'call-123',
toolName: 'getWeather',
input: { city: 'San Francisco' }
}
// Tool output available
{
type: 'tool-output-available',
toolCallId: 'call-123',
output: { temperature: 72, weather: 'sunny' },
preliminary: false
}
// Tool execution error
{
type: 'tool-output-error',
toolCallId: 'call-123',
errorText: 'API unavailable'
}
```
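Every tool chunk carries a `toolCallId`, so a client can track each call independently even when several calls are in flight. Below is a minimal sketch that reduces these chunks to one view per call. The types and `applyToolChunk` are hypothetical simplifications; approval chunks and the `dynamic` flag are omitted.

```typescript
// Simplified stand-ins for the tool chunk shapes listed above
type ToolChunk =
  | { type: 'tool-input-start'; toolCallId: string; toolName: string }
  | { type: 'tool-input-delta'; toolCallId: string; inputTextDelta: string }
  | { type: 'tool-input-available'; toolCallId: string; toolName: string; input: unknown }
  | { type: 'tool-output-available'; toolCallId: string; output: unknown; preliminary?: boolean }
  | { type: 'tool-output-error'; toolCallId: string; errorText: string };

type ToolCallView = {
  toolName?: string;
  state: 'input-streaming' | 'input-available' | 'output-available' | 'output-error';
  inputText: string; // raw JSON text accumulated from input deltas
  input?: unknown;
  output?: unknown;
  errorText?: string;
};

// Applies one chunk to the view stored under its toolCallId
function applyToolChunk(calls: Map<string, ToolCallView>, chunk: ToolChunk): void {
  const current: ToolCallView = calls.get(chunk.toolCallId) ?? { state: 'input-streaming', inputText: '' };
  switch (chunk.type) {
    case 'tool-input-start':
      calls.set(chunk.toolCallId, { ...current, toolName: chunk.toolName, state: 'input-streaming' });
      break;
    case 'tool-input-delta':
      calls.set(chunk.toolCallId, { ...current, inputText: current.inputText + chunk.inputTextDelta });
      break;
    case 'tool-input-available':
      calls.set(chunk.toolCallId, { ...current, toolName: chunk.toolName, state: 'input-available', input: chunk.input });
      break;
    case 'tool-output-available':
      calls.set(chunk.toolCallId, { ...current, state: 'output-available', output: chunk.output });
      break;
    case 'tool-output-error':
      calls.set(chunk.toolCallId, { ...current, state: 'output-error', errorText: chunk.errorText });
      break;
  }
}
```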
### Tool Approval Chunks
```typescript
// Tool needs approval
{
type: 'tool-approval-request',
approvalId: 'approval-1',
toolCallId: 'call-123'
}
// User responded (handled client-side, not streamed)
```
### Control Chunks
```typescript
// Stream starts
{
type: 'start',
messageId: 'msg-123',
messageMetadata: { createdAt: 1234567890 }
}
// Stream finishes
{
type: 'finish',
finishReason: 'stop', // or 'length', 'tool-calls', 'content-filter'
messageMetadata: { finishedAt: 1234567890 }
}
// Stream aborted
{ type: 'abort' }
// Error occurred
{ type: 'error', errorText: 'Something went wrong' }
// Metadata update
{
type: 'message-metadata',
messageMetadata: { updated: true }
}
```
### Step Chunks
```typescript
// New step starts (for multi-step reasoning)
{ type: 'start-step' }
// Step finishes
{ type: 'finish-step' }
```
### Data Chunks
```typescript
// Custom data part
{
type: 'data-progress',
id: 'progress-1',
data: { percent: 50, status: 'Processing...' },
transient: false // If true, not added to final message
}
```
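The sketch below shows how a client might apply data chunks. It assumes that a chunk reusing an earlier part's `type` and `id` updates that part instead of adding a new one, and that transient chunks only reach a callback. `DataChunk` and `applyDataChunk` are hypothetical helpers, not SDK exports.

```typescript
// Simplified stand-in for a data-* chunk
type DataChunk = { type: `data-${string}`; id?: string; data: unknown; transient?: boolean };

// Returns the new list of persistent data parts. Transient chunks are only passed
// to onTransient, so they never become part of the stored message.
function applyDataChunk(
  parts: DataChunk[],
  chunk: DataChunk,
  onTransient: (chunk: DataChunk) => void
): DataChunk[] {
  if (chunk.transient) {
    onTransient(chunk);
    return parts;
  }
  const index =
    chunk.id === undefined ? -1 : parts.findIndex(p => p.type === chunk.type && p.id === chunk.id);
  if (index === -1) return [...parts, chunk];
  // Same type + id: replace the earlier part, e.g. to update a progress indicator
  return parts.map((p, i) => (i === index ? chunk : p));
}
```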
### File Chunks
```typescript
{
type: 'file',
url: 'https://example.com/image.png',
mediaType: 'image/png'
}
```
## SSE Protocol
### Format
```
event: message
data: {"type":"text-start","id":"text-1"}

event: message
data: {"type":"text-delta","id":"text-1","delta":"Hello"}

event: message
data: {"type":"text-delta","id":"text-1","delta":" world"}

event: message
data: {"type":"text-end","id":"text-1"}

event: message
data: {"type":"finish","finishReason":"stop"}
```
### Client-Side Parsing
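The SDK handles parsing automatically. For a custom client, note that the browser `EventSource` API can only make GET requests. This approach therefore only works against an endpoint that streams on GET; the POST chat route cannot be consumed this way.
```typescript
import type { UIMessageChunk } from 'ai';
// EventSource always sends GET; this assumes an endpoint that streams on GET
const eventSource = new EventSource('/api/chat');
eventSource.addEventListener('message', (event) => {
  // The stream is terminated by a non-JSON `data: [DONE]` line
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const chunk: UIMessageChunk = JSON.parse(event.data);
  switch (chunk.type) {
    case 'text-delta':
      appendText(chunk.id, chunk.delta);
      break;
    case 'tool-output-available':
      updateToolOutput(chunk.toolCallId, chunk.output);
      break;
    case 'finish':
      console.log('Finished:', chunk.finishReason);
      eventSource.close();
      break;
  }
});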
```
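Because `EventSource` cannot send a request body, a custom client for the POST chat route has to read `response.body` itself. Below is a minimal sketch. It assumes the framing shown in the SSE format above: payloads on `data:` lines, events separated by a blank line, and `\n` line endings. It also skips a non-JSON `[DONE]` terminator if the server sends one. `SseChunkParser` and `readUIChunks` are hypothetical helpers, not SDK exports.

```typescript
// Simplified chunk shape; a stand-in for the SDK's UIMessageChunk type
type StreamChunk = { type: string; [key: string]: unknown };

// Incremental SSE parser: feed it decoded text as it arrives and it returns the
// parsed JSON payload of every complete event. Partial events stay buffered.
class SseChunkParser {
  private buffer = '';

  push(text: string): StreamChunk[] {
    this.buffer += text;
    const chunks: StreamChunk[] = [];
    // An SSE event ends at a blank line
    let boundary = this.buffer.indexOf('\n\n');
    while (boundary !== -1) {
      const rawEvent = this.buffer.slice(0, boundary);
      this.buffer = this.buffer.slice(boundary + 2);
      // Join the event's `data:` lines; `event:` lines and comments are ignored
      const data = rawEvent
        .split('\n')
        .filter(line => line.startsWith('data:'))
        .map(line => line.slice(5).replace(/^ /, ''))
        .join('\n');
      // Skip empty events and a non-JSON terminator such as [DONE]
      if (data !== '' && data !== '[DONE]') {
        chunks.push(JSON.parse(data) as StreamChunk);
      }
      boundary = this.buffer.indexOf('\n\n');
    }
    return chunks;
  }
}

// Yields chunks from a streaming response body, e.g. the result of
// fetch('/api/chat', { method: 'POST', body: JSON.stringify({ messages }) })
async function* readUIChunks(body: ReadableStream<Uint8Array>): AsyncGenerator<StreamChunk> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  const parser = new SseChunkParser();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across reads intact
    yield* parser.push(decoder.decode(value, { stream: true }));
  }
  yield* parser.push(decoder.decode()); // flush any bytes left in the decoder
}
```

Usage: `for await (const chunk of readUIChunks(res.body!)) { ... }`, where `res` is the `fetch` response.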
## Tool Execution Flow
### Server-Side Tools
```typescript
// 1. Client sends message
sendMessage({ text: 'What is the weather in SF?' });
// 2. Server streams tool invocation
{ type: 'tool-input-available', toolCallId: 'call-1', toolName: 'getWeather', input: { city: 'San Francisco' } }
// 3. Server executes tool and streams output
{ type: 'tool-output-available', toolCallId: 'call-1', output: { temp: 72 } }
// 4. If stopWhen allows another step, the model continues with its response
{ type: 'text-delta', id: 'text-1', delta: 'The weather is sunny...' }
```
### Client-Side Tools
```typescript
// 1. Client sends message
sendMessage({ text: 'Get my location' });
// 2. Server streams tool call (no execute)
{ type: 'tool-input-available', toolCallId: 'call-1', toolName: 'getLocation', input: {} }
// 3. Client handles tool call
onToolCall: async ({ toolCall }) => {
const location = await getCurrentLocation();
addToolOutput({
toolCallId: toolCall.toolCallId,
tool: 'getLocation',
output: location
});
}
// 4. Client sends tool output back to server
// 5. Server streams final response
```
### Preliminary Tool Outputs
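When a tool's `execute` is an async generator, each yielded value is streamed as a `tool-output-available` chunk. Intermediate values carry `preliminary: true`; the final result does not: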
```typescript
// Tool yields intermediate result
{
type: 'tool-output-available',
toolCallId: 'call-1',
output: { state: 'loading' },
preliminary: true
}
// Tool yields final result
{
type: 'tool-output-available',
toolCallId: 'call-1',
output: { state: 'complete', data: {...} },
preliminary: false // or omitted
}
```
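On the client, a `preliminary: true` output is a placeholder that later chunks replace; the non-preliminary output is the tool's result. Below is one way to derive such chunks from an async-generator tool. It assumes the last yielded value is re-sent without the flag once the generator completes. `toOutputChunks` is hypothetical and not necessarily how the SDK does it.

```typescript
// Simplified output chunk (hypothetical, not the SDK's chunk type)
type OutputChunk = { toolCallId: string; output: unknown; preliminary: boolean };

// Turns the values yielded by an async-generator tool into output chunks.
// Each value is emitted immediately as preliminary, because it is not yet known
// whether more will follow; once the generator finishes, the last value is
// re-emitted as the final result.
async function* toOutputChunks(
  toolCallId: string,
  outputs: AsyncIterable<unknown>
): AsyncGenerator<OutputChunk> {
  let last: { value: unknown } | undefined;
  for await (const value of outputs) {
    yield { toolCallId, output: value, preliminary: true };
    last = { value };
  }
  if (last) {
    yield { toolCallId, output: last.value, preliminary: false };
  }
}
```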
## Error Handling
### Server-Side Errors
```typescript
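try {
  const result = streamText({
    model: openai('gpt-4'),
    messages: convertToModelMessages(uiMessages)
  });
  // Errors raised while the model streams never reach the catch block below.
  // They are sent to the client as an `error` chunk, and `onError` decides
  // which text that chunk carries.
  return result.toUIMessageStreamResponse({
    onError: (error) => (error instanceof Error ? error.message : String(error))
  });
} catch (error) {
  // Only synchronous setup failures land here: reply with a single error chunk
  const chunk: UIMessageChunk = {
    type: 'error',
    errorText: error instanceof Error ? error.message : String(error)
  };
  return new Response(
    new ReadableStream({
      start(controller) {
        controller.enqueue(
          // Interpolate the serialized chunk into the SSE `data:` line
          new TextEncoder().encode(`event: message\ndata: ${JSON.stringify(chunk)}\n\n`)
        );
        controller.close();
      }
    }),
    {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache'
      }
    }
  );
}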
```
### Tool Execution Errors
```typescript
const weatherTool = tool({
inputSchema: z.object({ city: z.string() }),
async execute({ city }) {
try {
return await fetchWeather(city);
} catch (error) {
// Error is automatically converted to tool-output-error chunk
throw new Error(`Failed to fetch weather: ${error instanceof Error ? error.message : String(error)}`);
}
}
});
```
### Client-Side Error Handling
```typescript
const { error } = useChat({
onError: (error) => {
console.error('Stream error:', error);
toast.error(error.message);
}
});
```
## Advanced Patterns
### Custom Data Streaming
```typescript
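import { createUIMessageStream, createUIMessageStreamResponse, streamText, convertToModelMessages } from 'ai';
import { openai } from '@ai-sdk/openai';
export async function POST(req: Request) {
  const { messages } = await req.json();
  // onChunk can only observe chunks. To send custom data-* parts, write them to a
  // UI message stream and merge the model's output into the same stream.
  const stream = createUIMessageStream({
    execute: ({ writer }) => {
      writer.write({ type: 'data-progress', id: 'progress-1', data: { percent: 0, status: 'Starting...' } });
      const result = streamText({
        model: openai('gpt-4'),
        messages: convertToModelMessages(messages)
      });
      writer.merge(result.toUIMessageStream());
    }
  });
  return createUIMessageStreamResponse({ stream });
}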
```
### Resumable Streams
```typescript
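// Assumes a Next.js App Router route handler (`after` comes from next/server)
import { after } from 'next/server';
import { streamText, convertToModelMessages, generateId } from 'ai';
import { openai } from '@ai-sdk/openai';
import { createResumableStreamContext } from 'resumable-stream';
export async function POST(req: Request) {
  const { chatId, messages } = await req.json();
  const result = streamText({
    model: openai('gpt-4'),
    messages: convertToModelMessages(messages)
  });
  return result.toUIMessageStreamResponse({
    originalMessages: messages,
    async consumeSseStream({ stream }) {
      const streamId = generateId();
      // waitUntil keeps the stream producer alive after the response is returned
      const streamContext = createResumableStreamContext({ waitUntil: after });
      // Store stream for resumption
      await streamContext.createNewResumableStream(streamId, () => stream);
      // Save stream ID to database
      await saveActiveStreamId(chatId, streamId);
    }
  });
}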
```
### Backpressure Handling
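The SDK's streams handle backpressure for you. `onChunk` only observes chunks. If you wrap it in a throttle (for example lodash's `throttle`), it skips any calls that land inside the throttle window. Reserve this for sampling side effects such as progress logging, never for work that must see every chunk:
```typescript
const result = streamText({
  model: openai('gpt-4'),
  messages: convertToModelMessages(uiMessages),
  // Observe at most one chunk per 100ms; chunks in between never reach logProgress
  onChunk: throttle(({ chunk }) => {
    logProgress(chunk); // hypothetical helper
  }, 100)
});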
```
### Conditional Streaming
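`onChunk` only observes chunks; returning early does not remove anything from the stream. To drop chunks before they are sent, pass a transform through `experimental_transform`:
```typescript
const result = streamText({
  model: openai('gpt-4'),
  messages: convertToModelMessages(uiMessages),
  // The transform sees every stream part and forwards only what it enqueues
  experimental_transform: () =>
    new TransformStream({
      transform(chunk, controller) {
        // shouldFilterChunk is a hypothetical predicate; never drop control parts
        // such as 'finish', or the client cannot tell when the response ended
        if (!shouldFilterChunk(chunk)) {
          controller.enqueue(chunk);
        }
      }
    })
});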
```
### Custom Transform
```typescript
import { processUIMessageStream } from 'ai';
const stream = await transport.sendMessages({...});
const transformedStream = processUIMessageStream({
stream,
onToolCall,
onData,
runUpdateMessageJob: async ({ state, write }) => {
// Custom message update logic
customTransform(state.message);
write();
}
});
```
## Performance Considerations
### Throttling
```typescript
// Client-side throttling
const { messages } = useChat({
experimental_throttle: 100 // Max one update per 100ms
});
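// Server-side throttling (hypothetical `throttle` helper): a throttled onChunk
// skips chunks inside each window, so use it for sampling, not per-chunk work
const result = streamText({
model: openai('gpt-4'),
messages,
onChunk: throttle(processChunk, 100)
});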
```
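The snippets above assume a `throttle` helper, such as lodash's. Below is a minimal leading-edge version, written here as a hypothetical stand-in. It makes the trade-off visible: calls inside the window are dropped, which is fine for UI refreshes or logging but not for work that must see every chunk.

```typescript
// Leading-edge throttle: `fn` runs at most once per `intervalMs`; calls that
// arrive inside the window are dropped, not queued. `now` is injectable so the
// behaviour can be tested without real timers.
function throttle<Args extends unknown[]>(
  fn: (...args: Args) => void,
  intervalMs: number,
  now: () => number = Date.now
): (...args: Args) => void {
  let lastRun = -Infinity;
  return (...args: Args) => {
    const t = now();
    if (t - lastRun >= intervalMs) {
      lastRun = t;
      fn(...args);
    }
  };
}
```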
### Batch Updates
```typescript
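let pendingDeltas: string[] = [];
// Sends the buffered text on; sendToClient is a hypothetical sink
function flushDeltas() {
  if (pendingDeltas.length === 0) return;
  sendToClient(pendingDeltas.join(''));
  pendingDeltas = [];
}
const result = streamText({
  model: openai('gpt-4'),
  messages,
  onChunk({ chunk }) {
    if (chunk.type === 'text-delta') {
      // streamText's text-delta chunks carry their content in `text`
      pendingDeltas.push(chunk.text);
      // Flush every 10 deltas (this version has no time-based flush)
      if (pendingDeltas.length >= 10) {
        flushDeltas();
      }
    }
  },
  // Flush the remainder so the last few deltas are not lost
  onFinish() {
    flushDeltas();
  }
});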
```
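To flush on either limit (a number of deltas or an elapsed time), a batcher can record when its oldest pending delta arrived. Below is a minimal sketch. `DeltaBatcher` is hypothetical and takes a clock parameter so it can be tested without timers.

```typescript
// Buffers text deltas and flushes when either `maxCount` deltas are pending or the
// oldest pending delta is at least `maxAgeMs` old. Age is checked only when a new
// delta arrives, so call flush() when the stream ends to emit the remainder.
class DeltaBatcher {
  private pending: string[] = [];
  private firstAt = 0;
  private readonly onFlush: (text: string) => void;
  private readonly maxCount: number;
  private readonly maxAgeMs: number;
  private readonly now: () => number;

  constructor(
    onFlush: (text: string) => void,
    maxCount = 10,
    maxAgeMs = 100,
    now: () => number = Date.now // injectable clock, e.g. for tests
  ) {
    this.onFlush = onFlush;
    this.maxCount = maxCount;
    this.maxAgeMs = maxAgeMs;
    this.now = now;
  }

  push(delta: string): void {
    if (this.pending.length === 0) this.firstAt = this.now();
    this.pending.push(delta);
    const tooMany = this.pending.length >= this.maxCount;
    const tooOld = this.now() - this.firstAt >= this.maxAgeMs;
    if (tooMany || tooOld) this.flush();
  }

  flush(): void {
    if (this.pending.length === 0) return;
    this.onFlush(this.pending.join(''));
    this.pending = [];
  }
}
```

In `onChunk`, call `batcher.push(...)` for each text delta, and call `batcher.flush()` in `onFinish`.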
### Memory Management
```typescript
// Trim old messages to bound client memory; trimmed messages are no longer sent to the server as context
const { messages, setMessages } = useChat();
useEffect(() => {
if (messages.length > 100) {
setMessages(messages.slice(-50)); // Keep last 50
}
}, [messages]);
```
## Complete Example
```typescript
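// Server route
import { streamText, convertToModelMessages, tool, stepCountIs } from 'ai';
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';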
export async function POST(req: Request) {
const { messages, chatId } = await req.json();
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(messages),
tools: {
getWeather: tool({
description: 'Get weather',
inputSchema: z.object({ city: z.string() }),
async *execute({ city }) {
yield { state: 'loading' };
const data = await fetchWeather(city);
yield { state: 'complete', ...data };
}
})
},
stopWhen: stepCountIs(5),
onStepFinish({ toolCalls }) {
console.log('Tools called:', toolCalls);
}
});
return result.toUIMessageStreamResponse({
originalMessages: messages,
messageMetadata: ({ part }) => {
if (part.type === 'start') {
return { createdAt: Date.now() };
}
},
onFinish: async ({ messages }) => {
await saveChat(chatId, messages);
}
});
}
// Client component (a separate client-side file in practice)
import { useChat } from '@ai-sdk/react';
function Chat() {
const { messages, sendMessage, status } = useChat({
experimental_throttle: 100,
onError: (error) => toast.error(error.message),
onFinish: ({ finishReason }) => {
console.log('Finished:', finishReason);
}
});
return (
<div>
{messages.map(msg => (
<Message key={msg.id} message={msg} />
))}
<ChatInput
disabled={status !== 'ready'}
onSubmit={sendMessage}
/>
</div>
);
}
```
FILE:references/tools.md
# Tools Reference
Complete reference for tool definition and execution in the Vercel AI SDK.
## Table of Contents
- [Tool Definition](#tool-definition)
- [Server-Side Tools](#server-side-tools)
- [Client-Side Tools](#client-side-tools)
- [Tool State Machine](#tool-state-machine)
- [Tool Approval Flow](#tool-approval-flow)
- [Rendering Tool States](#rendering-tool-states)
- [Type Safety](#type-safety)
- [Advanced Patterns](#advanced-patterns)
## Tool Definition
### Basic Tool
```typescript
import { tool } from 'ai';
import { z } from 'zod';
const getWeatherTool = tool({
// Description for the AI model
description: 'Get current weather for a city',
// Input schema using Zod
inputSchema: z.object({
city: z.string().describe('The city name'),
units: z.enum(['celsius', 'fahrenheit']).optional()
}),
// Optional output schema
outputSchema: z.object({
temperature: z.number(),
weather: z.string(),
humidity: z.number()
}),
// Execution function (server-side only)
execute: async ({ city, units = 'celsius' }) => {
const data = await fetchWeather(city);
return {
temperature: convertTemp(data.temp, units),
weather: data.conditions,
humidity: data.humidity
};
}
});
```
### Tool Without Execute (Client-Side)
```typescript
const askConfirmationTool = tool({
description: 'Ask the user for confirmation',
inputSchema: z.object({
message: z.string()
}),
outputSchema: z.string()
// No execute function - handled on client
});
```
## Server-Side Tools
Tools with `execute` functions run on the server during streaming.
### Simple Execution
```typescript
const searchTool = tool({
description: 'Search the web',
inputSchema: z.object({
query: z.string()
}),
async execute({ query }) {
const results = await searchWeb(query);
return {
results: results.slice(0, 5),
count: results.length
};
}
});
```
### Streaming Tool Outputs
Use generator functions to stream intermediate results:
```typescript
const analysisTool = tool({
description: 'Analyze data',
inputSchema: z.object({
data: z.array(z.number())
}),
async *execute({ data }) {
// Yield preliminary status
yield { state: 'processing', progress: 0 };
// Perform analysis in stages
const mean = calculateMean(data);
yield { state: 'processing', progress: 33, mean };
const median = calculateMedian(data);
yield { state: 'processing', progress: 66, mean, median };
const stdDev = calculateStdDev(data);
// Yield final result
yield {
state: 'complete',
progress: 100,
mean,
median,
stdDev
};
}
});
```
### Tool Callbacks
```typescript
const verboseTool = tool({
description: 'Tool with callbacks',
inputSchema: z.object({ query: z.string() }),
// Called when input streaming starts
onInputStart: () => {
console.log('Tool input starting');
},
// Called on each input delta (for large inputs)
onInputDelta: ({ inputTextDelta }) => {
console.log('Input delta:', inputTextDelta);
},
// Called when input is complete
onInputAvailable: ({ input }) => {
console.log('Input available:', input);
},
async execute({ query }) {
return await search(query);
}
});
```
### Error Handling
```typescript
const fallibleTool = tool({
description: 'Tool that might fail',
inputSchema: z.object({ id: z.string() }),
async execute({ id }) {
try {
const data = await fetchData(id);
if (!data) {
throw new Error('Data not found');
}
return data;
} catch (error) {
// Error is automatically sent as tool-output-error chunk
throw new Error(`Failed to fetch data: ${error instanceof Error ? error.message : String(error)}`);
}
}
});
```
### Multi-Step Tools
With a `stopWhen` condition, tool results are passed back to the model, which can then call further tools or write its answer:
```typescript
import { streamText, stepCountIs } from 'ai';
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(uiMessages),
tools: {
search: searchTool,
analyze: analyzeTool,
summarize: summarizeTool
},
// Allow up to 5 steps (model can call tools multiple times)
stopWhen: stepCountIs(5),
onStepFinish({ toolCalls, toolResults }) {
console.log('Step finished');
console.log('Called:', toolCalls.map(t => t.toolName));
console.log('Results:', toolResults);
}
});
```
## Client-Side Tools
Tools without `execute` are handled on the client via `onToolCall` and `addToolOutput`.
### Automatic Execution
```typescript
const { addToolOutput } = useChat({
onToolCall: async ({ toolCall }) => {
if (toolCall.toolName === 'getLocation') {
try {
const location = await getCurrentLocation();
addToolOutput({
tool: 'getLocation',
toolCallId: toolCall.toolCallId,
output: location
});
} catch (error) {
addToolOutput({
state: 'output-error',
tool: 'getLocation',
toolCallId: toolCall.toolCallId,
errorText: error instanceof Error ? error.message : String(error)
});
}
}
}
});
```
### User Interaction
For tools that require user input, handle them in the render phase:
```typescript
function Message({ message, addToolOutput }) {
return (
<div>
{message.parts.map(part => {
if (part.type === 'tool-askConfirmation') {
if (part.state === 'input-available') {
return (
<div>
<p>{part.input.message}</p>
<button
onClick={() =>
addToolOutput({
tool: 'askConfirmation',
toolCallId: part.toolCallId,
output: 'Yes, confirmed'
})
}
>
Confirm
</button>
<button
onClick={() =>
addToolOutput({
state: 'output-error',
tool: 'askConfirmation',
toolCallId: part.toolCallId,
errorText: 'User declined'
})
}
>
Cancel
</button>
</div>
);
}
if (part.state === 'output-available') {
return <div>User confirmed: {part.output}</div>;
}
}
})}
</div>
);
}
```
### Automatic Resending
```typescript
import { lastAssistantMessageIsCompleteWithToolCalls } from 'ai';
const { messages } = useChat({
// Automatically resend when all tool outputs are available
sendAutomaticallyWhen: lastAssistantMessageIsCompleteWithToolCalls,
onToolCall: async ({ toolCall }) => {
// Execute client-side tools
// Message will automatically be resent when all tools complete
}
});
```
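Roughly, the check asks whether the last assistant message contains tool calls and whether all of them have results. Below is an approximate, SDK-independent sketch of such a predicate. It assumes simplified message types; the real helper may differ in details, for example by only looking at the latest step.

```typescript
// Simplified message shapes (hypothetical, not the SDK's UIMessage type)
type Part = { type: string; state?: string };
type Msg = { role: 'user' | 'assistant' | 'system'; parts: Part[] };

// True when the last message is from the assistant, contains at least one tool
// part, and every tool part has a result (output or error)
function lastAssistantHasAllToolResults(messages: Msg[]): boolean {
  const last = messages[messages.length - 1];
  if (!last || last.role !== 'assistant') return false;
  const toolParts = last.parts.filter(p => p.type.startsWith('tool-') || p.type === 'dynamic-tool');
  return (
    toolParts.length > 0 &&
    toolParts.every(p => p.state === 'output-available' || p.state === 'output-error')
  );
}
```

Requiring at least one tool part matters: without it, a plain text reply would also count as "complete" and trigger another send.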
## Tool State Machine
Tool parts progress through states:
### State Flow
```
1. input-streaming (optional)
↓
2. input-available
↓
3a. [No approval needed] → output-available
OR
3b. approval-requested → approval-responded → output-available/output-denied
OR
(at any point) → output-error
```
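The flow above can be written as a transition table, which helps when asserting that a custom renderer or test fixture never produces an impossible sequence. Below is a sketch transcribed from the diagram; the table and helpers are hypothetical, not SDK exports.

```typescript
type ToolState =
  | 'input-streaming'
  | 'input-available'
  | 'approval-requested'
  | 'approval-responded'
  | 'output-available'
  | 'output-error'
  | 'output-denied';

// Allowed next states, transcribed from the flow above. output-available may repeat
// while preliminary outputs stream in; output-error is reachable from every
// non-terminal state ("at any point").
const NEXT: Record<ToolState, readonly ToolState[]> = {
  'input-streaming': ['input-available', 'output-error'],
  'input-available': ['approval-requested', 'output-available', 'output-error'],
  'approval-requested': ['approval-responded', 'output-error'],
  'approval-responded': ['output-available', 'output-denied', 'output-error'],
  'output-available': ['output-available', 'output-error'],
  'output-error': [],
  'output-denied': []
};

function canTransition(from: ToolState, to: ToolState): boolean {
  return NEXT[from].includes(to);
}

// Checks that every consecutive pair in a recorded history is allowed
function isValidPath(states: ToolState[]): boolean {
  return states.every((s, i) => i === 0 || canTransition(states[i - 1], s));
}
```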
### State Definitions
```typescript
type ToolState =
| 'input-streaming' // Tool input streaming (large inputs)
| 'input-available' // Tool input complete
| 'approval-requested' // Waiting for user approval
| 'approval-responded' // User responded (approved/denied)
| 'output-available' // Tool execution complete
| 'output-error' // Tool execution failed
| 'output-denied'; // User denied approval
```
### State Properties
```typescript
// input-streaming
{
state: 'input-streaming',
input: Partial<ToolInput> | undefined // Partial input while streaming
}
// input-available
{
state: 'input-available',
input: ToolInput, // Complete input
callProviderMetadata?: ProviderMetadata
}
// approval-requested
{
state: 'approval-requested',
input: ToolInput,
approval: {
id: string // Approval ID for response
}
}
// approval-responded
{
state: 'approval-responded',
input: ToolInput,
approval: {
id: string,
approved: boolean,
reason?: string
}
}
// output-available
{
state: 'output-available',
input: ToolInput,
output: ToolOutput,
preliminary?: boolean, // True for intermediate streaming outputs
approval?: {
id: string,
approved: true,
reason?: string
}
}
// output-error
{
state: 'output-error',
input: ToolInput | undefined,
errorText: string,
approval?: { ... } // If error after approval
}
// output-denied
{
state: 'output-denied',
input: ToolInput,
approval: {
id: string,
approved: false,
reason?: string
}
}
```
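Because `state` is a discriminant, a `switch` on it narrows each branch to the fields that exist in that state. Below is a self-contained sketch using a simplified union that mirrors the properties above. `ToolPartView` and `describe` are hypothetical, and the SDK's real part types carry more fields.

```typescript
type Approval = { id: string; approved?: boolean; reason?: string };

// Simplified discriminated union mirroring the state properties above
type ToolPartView<I, O> =
  | { state: 'input-streaming'; input: Partial<I> | undefined }
  | { state: 'input-available'; input: I }
  | { state: 'approval-requested'; input: I; approval: Approval }
  | { state: 'approval-responded'; input: I; approval: Approval & { approved: boolean } }
  | { state: 'output-available'; input: I; output: O; preliminary?: boolean }
  | { state: 'output-error'; input: I | undefined; errorText: string }
  | { state: 'output-denied'; input: I; approval: Approval & { approved: false } };

// Each branch can only read fields that exist in that state; the `never`
// assignment makes the compiler flag any state left unhandled
function describe<I, O>(part: ToolPartView<I, O>): string {
  switch (part.state) {
    case 'input-streaming':
      return 'streaming input';
    case 'input-available':
      return `input: ${JSON.stringify(part.input)}`;
    case 'approval-requested':
      return `awaiting approval ${part.approval.id}`;
    case 'approval-responded':
      return part.approval.approved ? 'approved' : 'denied';
    case 'output-available':
      return `${part.preliminary ? 'preliminary ' : ''}output: ${JSON.stringify(part.output)}`;
    case 'output-error':
      return `error: ${part.errorText}`;
    case 'output-denied':
      return `denied: ${part.approval.reason ?? 'no reason'}`;
    default: {
      const unreachable: never = part;
      return unreachable;
    }
  }
}
```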
## Tool Approval Flow
### Defining Approval Requirements
```typescript
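// Registered as `deleteFile: deleteTool` in the tools object; UI parts are named
// after that key, hence the 'tool-deleteFile' type used below
const deleteTool = tool({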
description: 'Delete a file',
inputSchema: z.object({
filename: z.string()
}),
// Request approval before execution
needsApproval: true,
async execute({ filename }) {
await deleteFile(filename);
return { deleted: filename };
}
});
```
### Handling Approval Requests
```typescript
function ToolApproval({ part, addToolApprovalResponse }) {
if (part.type === 'tool-deleteFile' && part.state === 'approval-requested') {
return (
<div>
<p>Delete {part.input.filename}?</p>
<button
onClick={() =>
addToolApprovalResponse({
id: part.approval.id,
approved: true,
reason: 'User confirmed deletion'
})
}
>
Approve
</button>
<button
onClick={() =>
addToolApprovalResponse({
id: part.approval.id,
approved: false,
reason: 'User cancelled'
})
}
>
Deny
</button>
</div>
);
}
}
```
### Automatic Resending After Approval
```typescript
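const SETTLED = ['output-available', 'output-error', 'output-denied', 'approval-responded'];
const { addToolApprovalResponse } = useChat({
  sendAutomaticallyWhen: ({ messages }) => {
    const lastMsg = messages[messages.length - 1];
    if (!lastMsg || lastMsg.role !== 'assistant') return false;
    // Collect the states of all tool parts ('tool-<name>')
    const toolStates = lastMsg.parts
      .filter(part => part.type.startsWith('tool-'))
      .map(part => (part as { state: string }).state);
    // Resend only after the user has answered at least one approval request;
    // otherwise a plain text reply (no tool parts) could trigger repeated resends
    return (
      toolStates.includes('approval-responded') &&
      // Every tool part is settled, or approved and waiting for the backend
      toolStates.every(state => SETTLED.includes(state))
    );
  }
});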
```
## Rendering Tool States
### Complete Tool Renderer
```typescript
function ToolPart({ part, addToolOutput, addToolApprovalResponse }) {
// Type guard
if (!part.type.startsWith('tool-')) return null;
const toolName = part.type.replace('tool-', '');
switch (part.state) {
case 'input-streaming':
return (
<div className="tool-streaming">
<Spinner />
<span>Preparing {toolName}...</span>
<pre>{JSON.stringify(part.input, null, 2)}</pre>
</div>
);
case 'input-available':
return (
<div className="tool-executing">
<Spinner />
<span>Executing {toolName}...</span>
{part.title && <h4>{part.title}</h4>}
</div>
);
case 'approval-requested':
return (
<div className="tool-approval">
<h4>Approval Required</h4>
<p>Allow {toolName}?</p>
<div className="tool-input">
<pre>{JSON.stringify(part.input, null, 2)}</pre>
</div>
<button
onClick={() =>
addToolApprovalResponse({
id: part.approval.id,
approved: true
})
}
>
Approve
</button>
<button
onClick={() =>
addToolApprovalResponse({
id: part.approval.id,
approved: false,
reason: 'User declined'
})
}
>
Deny
</button>
</div>
);
case 'approval-responded':
return (
<div className="tool-approval-responded">
{part.approval.approved ? (
<span>✓ Approved, executing...</span>
) : (
<span>✗ Denied: {part.approval.reason}</span>
)}
</div>
);
case 'output-available':
return (
<div className="tool-output">
<h4>{toolName} Result</h4>
{part.preliminary && <Badge>Preliminary</Badge>}
<ToolOutput toolName={toolName} output={part.output} />
</div>
);
case 'output-error':
return (
<div className="tool-error">
<h4>{toolName} Error</h4>
<p className="error">{part.errorText}</p>
{part.input && (
<details>
<summary>Input</summary>
<pre>{JSON.stringify(part.input, null, 2)}</pre>
</details>
)}
</div>
);
case 'output-denied':
return (
<div className="tool-denied">
<span>✗ Tool execution denied</span>
{part.approval.reason && <p>{part.approval.reason}</p>}
</div>
);
default:
return null;
}
}
```
### Tool-Specific Renderers
```typescript
function WeatherToolOutput({ part }) {
if (part.type !== 'tool-getWeather') return null;
if (part.state === 'output-available') {
const { weather, temperature } = part.output;
return (
<div className="weather-card">
<WeatherIcon condition={weather} />
<span>{temperature}°F</span>
<span>{weather}</span>
</div>
);
}
if (part.state === 'input-available') {
return <div>Fetching weather for {part.input.city}...</div>;
}
return null;
}
```
## Type Safety
### Typed Tools
```typescript
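import { tool, type InferUITools, type UIMessage, type UIDataTypes } from 'ai';
import { z } from 'zod';
const tools = {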
getWeather: tool({
inputSchema: z.object({ city: z.string() }),
execute: async ({ city }) => ({ temp: 72, weather: 'sunny' })
}),
searchWeb: tool({
inputSchema: z.object({ query: z.string() }),
execute: async ({ query }) => ({ results: [] })
})
} as const;
// Infer tool types
type MyTools = InferUITools<typeof tools>;
// {
// getWeather: { input: { city: string }; output: { temp: number; weather: string } }
// searchWeb: { input: { query: string }; output: { results: any[] } }
// }
// Use in message type
type MyMessage = UIMessage<unknown, UIDataTypes, MyTools>;
```
### Type-Safe Tool Rendering
```typescript
function renderToolPart(part: InferUIMessagePart<MyMessage>) {
// TypeScript knows the exact tool types
if (part.type === 'tool-getWeather') {
if (part.state === 'output-available') {
// part.output is typed as { temp: number; weather: string }
return <div>{part.output.weather}: {part.output.temp}°F</div>;
}
}
if (part.type === 'tool-searchWeb') {
if (part.state === 'output-available') {
// part.output is typed as { results: any[] }
return <SearchResults results={part.output.results} />;
}
}
}
```
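The narrowing works because each part's `type` is a string literal built from the tool's key. Below is a self-contained approximation of that mapping using template literal types. `ToolSpec`, `OutputPart`, and `MyToolSpecs` are hypothetical stand-ins for the SDK's `InferUITools`/`UIMessage` machinery.

```typescript
// Each key K of the tools object becomes a part whose type is `tool-${K}`
type ToolSpec = { input: unknown; output: unknown };

type OutputPart<T extends Record<string, ToolSpec>> = {
  [K in keyof T & string]: {
    type: `tool-${K}`;
    state: 'output-available';
    input: T[K]['input'];
    output: T[K]['output'];
  };
}[keyof T & string];

type MyToolSpecs = {
  getWeather: { input: { city: string }; output: { temp: number; weather: string } };
  searchWeb: { input: { query: string }; output: { results: string[] } };
};

function summarize(part: OutputPart<MyToolSpecs>): string {
  // Comparing `type` narrows the union, so `output` gets the matching type
  if (part.type === 'tool-getWeather') {
    return `${part.output.weather}: ${part.output.temp}°F`;
  }
  // Only the searchWeb variant remains here
  return `${part.output.results.length} results for ${part.input.query}`;
}
```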
### Type-Safe addToolOutput
```typescript
const { addToolOutput } = useChat<MyMessage>();
// TypeScript enforces correct tool name and output type
addToolOutput({
tool: 'getWeather', // Must be a key in MyTools
toolCallId: 'call-123',
output: { temp: 72, weather: 'sunny' } // Must match getWeather output type
});
// Error: Type '"invalid"' is not assignable to type '"getWeather" | "searchWeb"'
addToolOutput({
tool: 'invalid',
toolCallId: 'call-123',
output: {}
});
```
## Advanced Patterns
### Conditional Tool Availability
```typescript
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(uiMessages),
tools: user.isPremium
? { search: searchTool, analyze: analyzeTool }
: { search: searchTool }
});
```
### Tool Chaining
```typescript
const tools = {
search: tool({
description: 'Search for information',
inputSchema: z.object({ query: z.string() }),
async execute({ query }) {
return await searchWeb(query);
}
}),
analyze: tool({
description: 'Analyze search results',
inputSchema: z.object({ results: z.array(z.any()) }),
async execute({ results }) {
return await analyzeResults(results);
}
})
};
const result = streamText({
model: openai('gpt-4'),
messages,
tools,
stopWhen: stepCountIs(5) // Allow chaining
});
```
### Parallel Tool Execution
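The model can request several tool calls in a single step (when the provider supports parallel tool calls), and their results are all returned within that step. Use `stopWhen` to let the model answer afterwards; the older `maxToolRoundtrips` option is not part of this API:
```typescript
import { stepCountIs } from 'ai';
const result = streamText({
  model: openai('gpt-4'),
  messages,
  tools: {
    getWeather: weatherTool,
    getNews: newsTool,
    getStocks: stocksTool
  },
  // Step 1: parallel tool calls; step 2: the model answers using their results
  stopWhen: stepCountIs(2)
});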
```
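The calls in one step are independent of each other, so they can run concurrently. Below is a minimal sketch of that idea using `Promise.allSettled`. `executeInParallel` and its types are hypothetical, and it is an assumption that the SDK isolates failures in exactly this way.

```typescript
// Hypothetical simplified shapes: one step's tool calls and tool implementations
type ToolCall = { toolCallId: string; toolName: string; input: unknown };
type ToolResult =
  | { toolCallId: string; output: unknown }
  | { toolCallId: string; errorText: string };
type ToolFns = Record<string, (input: unknown) => Promise<unknown>>;

// Starts every call from a step at once and waits for all of them; a failing
// tool yields an error result instead of rejecting the whole batch
async function executeInParallel(calls: ToolCall[], tools: ToolFns): Promise<ToolResult[]> {
  const settled = await Promise.allSettled(
    calls.map(call => {
      const fn = tools[call.toolName];
      return fn ? fn(call.input) : Promise.reject(new Error(`Unknown tool: ${call.toolName}`));
    })
  );
  // allSettled preserves input order, so results line up with calls
  return settled.map((s, i) =>
    s.status === 'fulfilled'
      ? { toolCallId: calls[i].toolCallId, output: s.value }
      : {
          toolCallId: calls[i].toolCallId,
          errorText: s.reason instanceof Error ? s.reason.message : String(s.reason)
        }
  );
}
```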
### Tool Metadata
```typescript
const enrichedTool = tool({
description: 'Search with metadata',
inputSchema: z.object({ query: z.string() }),
async execute({ query }, { messages, abortSignal }) {
// Access message history
const context = extractContext(messages);
// Support cancellation
const results = await searchWithAbort(query, abortSignal);
return {
results,
metadata: {
searchTime: Date.now(),
contextUsed: context
}
};
}
});
```
### Custom Tool Validation
```typescript
const validatedTool = tool({
description: 'Tool with validation',
inputSchema: z.object({
amount: z.number().min(0).max(1000)
}),
async execute({ amount }) {
// Additional runtime validation
if (amount > 500) {
throw new Error('Amount requires additional approval');
}
return await processPayment(amount);
}
});
```
## Complete Example
```typescript
// Server route
const tools = {
// Server-side tool
getWeather: tool({
description: 'Get weather for a city',
inputSchema: z.object({ city: z.string() }),
async *execute({ city }) {
yield { state: 'loading' };
const data = await fetchWeather(city);
yield { state: 'complete', ...data };
}
}),
// Client-side tool with approval
getLocation: tool({
description: 'Get user location (requires permission)',
inputSchema: z.object({}),
outputSchema: z.string(),
needsApproval: true
})
};
export async function POST(req: Request) {
const { messages } = await req.json();
const result = streamText({
model: openai('gpt-4'),
messages: convertToModelMessages(messages),
tools
});
return result.toUIMessageStreamResponse();
}
// Client component
type MyMessage = UIMessage<unknown, UIDataTypes, InferUITools<typeof tools>>;
function Chat() {
const { messages, addToolOutput, addToolApprovalResponse } = useChat<MyMessage>({
// Handle client-side tool automatically
onToolCall: async ({ toolCall }) => {
if (toolCall.toolName === 'getLocation') {
const location = await getCurrentLocation();
addToolOutput({
tool: 'getLocation',
toolCallId: toolCall.toolCallId,
output: location
});
}
},
// Resend when all tools complete
sendAutomaticallyWhen: lastAssistantMessageIsCompleteWithToolCalls
});
return (
<div>
{messages.map(msg => (
<div key={msg.id}>
{msg.parts.map((part, i) => (
<ToolPart
key={i}
part={part}
addToolOutput={addToolOutput}
addToolApprovalResponse={addToolApprovalResponse}
/>
))}
</div>
))}
</div>
);
}
```
FILE:references/use-chat.md
# useChat Hook Reference
Complete API reference for the `useChat` hook from `@ai-sdk/react`.
## Table of Contents
- [Hook Signature](#hook-signature)
- [Options](#options)
- [Return Values](#return-values)
- [Methods](#methods)
- [Callbacks](#callbacks)
- [Types](#types)
## Hook Signature
```typescript
function useChat<UI_MESSAGE extends UIMessage = UIMessage>(
options?: UseChatOptions<UI_MESSAGE>
): UseChatHelpers<UI_MESSAGE>
```
## Options
### Basic Options
```typescript
interface UseChatOptions<UI_MESSAGE extends UIMessage> {
// Chat instance (existing) or initialization parameters
chat?: Chat<UI_MESSAGE>;
// Chat identifier (auto-generated if not provided)
id?: string;
// Initial messages
messages?: UI_MESSAGE[];
// Whether to resume an ongoing stream on mount
resume?: boolean;
// Custom transport for API communication
transport?: ChatTransport<UI_MESSAGE>;
// ID generator function
generateId?: IdGenerator;
}
```
### Schema Options
```typescript
interface UseChatOptions<UI_MESSAGE extends UIMessage> {
// Schema for message metadata validation
messageMetadataSchema?: FlexibleSchema<InferUIMessageMetadata<UI_MESSAGE>>;
// Schemas for custom data parts
dataPartSchemas?: UIDataTypesToSchemas<InferUIMessageData<UI_MESSAGE>>;
}
```
### Performance Options
```typescript
interface UseChatOptions<UI_MESSAGE extends UIMessage> {
// Throttle message updates (in milliseconds)
// Default: undefined (no throttling)
experimental_throttle?: number;
}
```
### Callback Options
```typescript
interface UseChatOptions<UI_MESSAGE extends UIMessage> {
// Called when an error occurs
onError?: (error: Error) => void;
// Called when a tool call is received
onToolCall?: (options: {
toolCall: InferUIMessageToolCall<UI_MESSAGE>;
}) => void | PromiseLike<void>;
// Called when streaming finishes
onFinish?: (options: {
message: UI_MESSAGE;
messages: UI_MESSAGE[];
isAbort: boolean;
isDisconnect: boolean;
isError: boolean;
finishReason?: FinishReason;
}) => void;
// Called when a data part is received
onData?: (dataPart: DataUIPart<InferUIMessageData<UI_MESSAGE>>) => void;
}
```
### Automation Options
```typescript
interface UseChatOptions<UI_MESSAGE extends UIMessage> {
// Automatically send messages when condition is met
sendAutomaticallyWhen?: (options: {
messages: UI_MESSAGE[];
}) => boolean | PromiseLike<boolean>;
}
```
## Return Values
```typescript
interface UseChatHelpers<UI_MESSAGE extends UIMessage> {
// Chat identifier
readonly id: string;
// Current chat status
status: ChatStatus;
// Current error (if any)
error: Error | undefined;
// Array of all messages
messages: UI_MESSAGE[];
// Methods (see below)
sendMessage: (message?, options?) => Promise<void>;
regenerate: (options?) => Promise<void>;
stop: () => Promise<void>;
resumeStream: (options?) => Promise<void>;
setMessages: (messages) => void;
clearError: () => void;
addToolOutput: (options) => Promise<void>;
addToolApprovalResponse: (options) => Promise<void>;
// Deprecated
addToolResult: (options) => Promise<void>; // Use addToolOutput
}
```
### ChatStatus Type
```typescript
type ChatStatus = 'ready' | 'submitted' | 'streaming' | 'error';
```
- **`ready`**: No active request, ready for new messages
- **`submitted`**: Request sent, awaiting stream start
- **`streaming`**: Response actively streaming
- **`error`**: Error occurred during request
## Methods
### sendMessage
Send a new user message or replace an existing one.
```typescript
// Send text message
await sendMessage({ text: 'Hello' });

// Send with files
await sendMessage({
  text: 'Analyze this image',
  files: fileList // FileList or FileUIPart[]
});

// Send with metadata
await sendMessage({
  text: 'Hello',
  metadata: { createdAt: Date.now(), userId: '123' }
});

// Replace existing message
await sendMessage({
  text: 'Updated message',
  messageId: 'msg-123'
});

// Send full UIMessage structure
await sendMessage({
  parts: [
    { type: 'text', text: 'Hello' },
    { type: 'file', url: 'data:...', mediaType: 'image/png' }
  ],
  metadata: { custom: 'data' }
});

// Send with request options
await sendMessage(
  { text: 'Hello' },
  {
    headers: { 'X-Custom': 'value' },
    body: { extra: 'data' },
    metadata: { custom: 'metadata' }
  }
);

// Continue existing conversation (no message)
await sendMessage();
```
**Signature:**
```typescript
sendMessage: (
  message?:
    | { text: string; files?: FileList | FileUIPart[]; metadata?: Metadata; messageId?: string }
    | { files: FileList | FileUIPart[]; metadata?: Metadata; messageId?: string }
    | CreateUIMessage<UI_MESSAGE> & { messageId?: string },
  options?: ChatRequestOptions
) => Promise<void>
```
### regenerate
Regenerate an assistant message.
```typescript
// Regenerate last assistant message
await regenerate();
// Regenerate specific message
await regenerate({ messageId: 'msg-123' });
// With request options
await regenerate({
  messageId: 'msg-123',
  headers: { 'X-Custom': 'value' }
});
```
**Signature:**
```typescript
regenerate: (options?: {
  messageId?: string;
  headers?: Record<string, string> | Headers;
  body?: object;
  metadata?: unknown;
}) => Promise<void>
```
### stop
Stop the current streaming request.
```typescript
await stop();
```
Aborts the active request and sets status to `ready`. Keeps any tokens generated so far.
### resumeStream
Resume an interrupted streaming response.
```typescript
await resumeStream();
// With options
await resumeStream({
  headers: { 'X-Session': 'token' }
});
```
**Signature:**
```typescript
resumeStream: (options?: ChatRequestOptions) => Promise<void>
```
### setMessages
Update messages locally without triggering a request.
```typescript
// Set messages directly
setMessages([...newMessages]);
// Update with function
setMessages(current => current.filter(m => m.role !== 'system'));
// Clear all messages
setMessages([]);
```
**Signature:**
```typescript
setMessages: (
  messages: UI_MESSAGE[] | ((messages: UI_MESSAGE[]) => UI_MESSAGE[])
) => void
```
### clearError
Clear error state and reset to ready.
```typescript
if (error) {
  clearError();
}
```
### addToolOutput
Provide output for a client-side tool call.
```typescript
// Successful tool execution
await addToolOutput({
  tool: 'getWeather',
  toolCallId: 'call-123',
  output: { temperature: 72, weather: 'sunny' }
});

// Tool execution error
await addToolOutput({
  state: 'output-error',
  tool: 'getWeather',
  toolCallId: 'call-123',
  errorText: 'API unavailable'
});
```
**Signature:**
```typescript
addToolOutput: <TOOL extends keyof Tools>(
  options:
    | {
        state?: 'output-available';
        tool: TOOL;
        toolCallId: string;
        output: ToolOutput<TOOL>;
      }
    | {
        state: 'output-error';
        tool: TOOL;
        toolCallId: string;
        errorText: string;
      }
) => Promise<void>
```
### addToolApprovalResponse
Respond to a tool approval request.
```typescript
// Approve tool execution
await addToolApprovalResponse({
  id: 'approval-123',
  approved: true,
  reason: 'Safe to proceed'
});

// Deny tool execution
await addToolApprovalResponse({
  id: 'approval-123',
  approved: false,
  reason: 'User denied location access'
});
```
**Signature:**
```typescript
addToolApprovalResponse: (options: {
  id: string;
  approved: boolean;
  reason?: string;
}) => Promise<void>
```
## Callbacks
### onError
Called when any error occurs during the chat.
```typescript
useChat({
  onError: (error) => {
    console.error('Chat error:', error);
    toast.error(error.message);
  }
});
```
### onToolCall
Called when a tool call is received. Use for automatic client-side tool execution.
```typescript
const { addToolOutput } = useChat({
  onToolCall: async ({ toolCall }) => {
    // Handle different tools
    if (toolCall.toolName === 'getLocation') {
      const location = await getCurrentLocation();
      // Not awaited: awaiting addToolOutput inside onToolCall can deadlock
      addToolOutput({
        tool: 'getLocation',
        toolCallId: toolCall.toolCallId,
        output: location
      });
    }
  }
});
```
**Important**: This callback is for automatic execution. For user-interactive tools (like confirmations), handle them in the render phase.
### onFinish
Called when streaming completes (success, abort, or error).
```typescript
useChat({
  onFinish: ({ message, messages, isAbort, isDisconnect, isError, finishReason }) => {
    if (isError) {
      console.error('Stream ended with error');
      return;
    }
    if (isAbort) {
      console.log('User aborted request');
      return;
    }
    if (isDisconnect) {
      console.warn('Network disconnected');
      return;
    }
    // Save to database
    saveMessages(messages);
    console.log('Finish reason:', finishReason); // 'stop' | 'length' | 'tool-calls' | ...
  }
});
```
### onData
Called when custom data parts are received.
```typescript
useChat<UIMessage<never, { progress: { percent: number } }>>({
  onData: (dataPart) => {
    if (dataPart.type === 'data-progress') {
      updateProgressBar(dataPart.data.percent);
    }
  }
});
```
## Types
### ChatRequestOptions
```typescript
interface ChatRequestOptions {
  // Additional headers for the API request
  headers?: Record<string, string> | Headers;
  // Additional body properties for the API request
  body?: object;
  // Request-specific metadata
  metadata?: unknown;
}
```
### CreateUIMessage
```typescript
type CreateUIMessage<UI_MESSAGE extends UIMessage> = Omit<
  UI_MESSAGE,
  'id' | 'role'
> & {
  id?: UI_MESSAGE['id'];
  role?: UI_MESSAGE['role'];
};
```
Used for creating messages without requiring `id` and `role` (auto-generated).
## Examples
### Basic Chat
```typescript
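import { useState } from 'react';
import { useChat } from '@ai-sdk/react';

function ChatComponent() {
  const [input, setInput] = useState('');
  const { messages, status, sendMessage } = useChat({
    id: 'my-chat',
    onError: (error) => toast.error(error.message) // toast: your notification library
  });
  return (
    <div>
      {messages.map(msg => (
        <div key={msg.id}>
          {msg.role}:{' '}
          {msg.parts.map((part, i) =>
            part.type === 'text' ? <span key={i}>{part.text}</span> : null
          )}
        </div>
      ))}
      <form
        onSubmit={(e) => {
          e.preventDefault();
          sendMessage({ text: input });
          setInput('');
        }}
      >
        <input
          value={input}
          onChange={(e) => setInput(e.target.value)}
          disabled={status !== 'ready'}
        />
      </form>
    </div>
  );
}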
```
### With Tools
```typescript
import { useChat } from '@ai-sdk/react';
import { lastAssistantMessageIsCompleteWithToolCalls } from 'ai';

// MyToolMessage, Message and executeAutomatically are app-specific
function ChatWithTools() {
  const { messages, addToolOutput } = useChat<MyToolMessage>({
    onToolCall: async ({ toolCall }) => {
      if (toolCall.toolName === 'autoExecute') {
        const result = await executeAutomatically();
        addToolOutput({
          tool: 'autoExecute',
          toolCallId: toolCall.toolCallId,
          output: result
        });
      }
    },
    // Resubmit once every tool call in the last assistant message has an output
    sendAutomaticallyWhen: lastAssistantMessageIsCompleteWithToolCalls
  });

  return (
    <div>
      {messages.map(msg => (
        <Message
          key={msg.id}
          message={msg}
          onToolApprove={(toolCallId) =>
            addToolOutput({
              tool: 'userConfirm',
              toolCallId,
              output: 'confirmed'
            })
          }
        />
      ))}
    </div>
  );
}
```
### Custom Transport
```typescript
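import { DefaultChatTransport } from 'ai';

const { messages } = useChat({
  transport: new DefaultChatTransport({
    api: '/api/my-chat',
    prepareSendMessagesRequest: ({ id, messages, trigger, messageId }) => ({
      // Per-chat endpoint: interpolate the chat id into the URL
      api: `/api/my-chat/${id}`,
      body: {
        lastMessage: messages[messages.length - 1], // send only the newest message
        action: trigger, // 'submit-message' | 'regenerate-message'
        targetMessageId: messageId
      },
      headers: {
        'X-Session-Token': getSessionToken()
      }
    })
  })
});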
```
### Throttled Updates
```typescript
// Update UI at most once per 100ms during streaming
const { messages } = useChat({
  experimental_throttle: 100
});
```
### Resume Interrupted Stream
```typescript
function ChatComponent({ chatId }) {
  const [shouldResume, setShouldResume] = useState(false);

  useEffect(() => {
    // Check if there's an active stream for this chat
    checkActiveStream(chatId).then(setShouldResume);
  }, [chatId]);

  const chat = useChat({
    id: chatId,
    resume: shouldResume
  });

  return <ChatUI {...chat} />;
}
```
---
name: pydantic-ai-tool-system
description: Register and implement PydanticAI tools with proper context handling, type annotations, and docstrings. Use when adding tool capabilities to agents, implementing function calling, or creating agent actions.
---
# PydanticAI Tool System
## Tool Registration
Pick one of two decorators, depending on whether the tool needs the run context:
```python
from dataclasses import dataclass
from typing import Any

from pydantic_ai import Agent, RunContext

@dataclass
class MyDeps:
    db: Any  # e.g. an async database client exposing get_user()

agent = Agent('openai:gpt-4o', deps_type=MyDeps)

# @agent.tool - First param MUST be RunContext
@agent.tool
async def get_user_data(ctx: RunContext[MyDeps], user_id: int) -> str:
    """Get user data from database.

    Args:
        ctx: The run context with dependencies.
        user_id: The user's ID.
    """
    return await ctx.deps.db.get_user(user_id)

# @agent.tool_plain - NO context parameter allowed
@agent.tool_plain
def calculate_total(prices: list[float]) -> float:
    """Calculate total price.

    Args:
        prices: List of prices to sum.
    """
    return sum(prices)
```
## Critical Rules
1. **@agent.tool**: First parameter MUST be `RunContext[DepsType]`
2. **@agent.tool_plain**: MUST NOT have `RunContext` parameter
3. **Docstrings**: The docstring becomes the tool description the LLM sees, so every tool needs one
4. **Google-style docstrings**: Parameter descriptions are read from the `Args:` section
### Gates (verify in the file, not from memory)
1. **Decorator matches signature** — If the first parameter is `RunContext[...]`, the decorator must be `@agent.tool` (not `@agent.tool_plain`). **Pass:** the same `def` line’s decorator stack includes `@agent.tool`, and the first parameter is typed `RunContext[...]`.
2. **Plain tools** — With `@agent.tool_plain`, the parameter list must not include `RunContext`. **Pass:** a quick scan of the signature shows no `RunContext`.
3. **Docstring for the model** — Non-empty docstring; if the tool has parameters, describe them (Google `Args:` or Sphinx `:param` when using `docstring_format='sphinx'`). **Pass:** each parameter in the signature is mentioned in the docstring body.
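The three gates can also be checked mechanically. A minimal sketch, assuming you lint tool functions with the standard `inspect` and `typing` modules: `check_tool`, `good` and `bad` are hypothetical names, and the local `RunContext` class is a stand-in for `pydantic_ai.RunContext` so the snippet runs without PydanticAI installed. The docstring check is a crude substring match, so treat it as a reminder rather than proof.

```python
import inspect
import typing
from typing import Generic, TypeVar

T = TypeVar("T")


class RunContext(Generic[T]):
    """Stand-in for pydantic_ai.RunContext so this check runs without PydanticAI."""


def _is_run_context(annotation: object) -> bool:
    # RunContext[Deps] is a parameterized generic alias; compare its origin.
    return annotation is RunContext or typing.get_origin(annotation) is RunContext


def check_tool(func, takes_ctx: bool) -> list[str]:
    """Return the gate violations for one tool function (an empty list means pass).

    takes_ctx=True models @agent.tool; takes_ctx=False models @agent.tool_plain.
    """
    problems: list[str] = []
    params = list(inspect.signature(func).parameters.values())
    ctx_positions = [i for i, p in enumerate(params) if _is_run_context(p.annotation)]

    # Gates 1 and 2: RunContext must be first for tools, absent for plain tools.
    if takes_ctx and ctx_positions != [0]:
        problems.append("@agent.tool: first parameter must be RunContext[...]")
    if not takes_ctx and ctx_positions:
        problems.append("@agent.tool_plain: RunContext is not allowed")

    # Gate 3: non-empty docstring that mentions every model-visible parameter.
    doc = inspect.getdoc(func) or ""
    if not doc:
        problems.append("missing docstring")
    for i, p in enumerate(params):
        if i not in ctx_positions and p.name not in doc:  # crude substring match
            problems.append(f"parameter {p.name!r} not described in docstring")
    return problems


def good(ctx: RunContext[dict], user_id: int) -> str:
    """Get user data.

    Args:
        user_id: The user's ID.
    """
    return ""


def bad(user_id: int, ctx: RunContext[dict]) -> str:
    return ""


print(check_tool(good, takes_ctx=True))  # []
print(len(check_tool(bad, takes_ctx=True)))  # 3
```

`bad` fails all three gates: `RunContext` is not first, there is no docstring, and `user_id` is undocumented.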
## Docstring Formats
Google style (inferred automatically under the default `docstring_format='auto'`):
```python
@agent.tool_plain
async def search(query: str, limit: int = 10) -> list[str]:
    """Search for items.

    Args:
        query: The search query.
        limit: Maximum results to return.
    """
```
Sphinx style:
```python
@agent.tool_plain(docstring_format='sphinx')
async def search(query: str) -> list[str]:
    """Search for items.

    :param query: The search query.
    """
```
## Tool Return Types
Tools can return various types:
```python
from typing import Any

from pydantic import BaseModel
from pydantic_ai import ImageUrl, ToolReturn

class User(BaseModel):
    name: str
    age: int

# String (direct)
@agent.tool_plain
def get_info() -> str:
    return "Some information"

# Pydantic model (serialized to JSON)
@agent.tool_plain
def get_user() -> User:
    return User(name="John", age=30)

# Dict (serialized to JSON)
@agent.tool_plain
def get_data() -> dict[str, Any]:
    return {"key": "value"}

# ToolReturn: a return value for the tool call plus extra content (e.g. images) for the model
@agent.tool_plain
def get_image() -> ToolReturn:
    return ToolReturn(
        return_value="Image attached",
        content=[ImageUrl(url="https://...")],
    )
```
## Accessing Context
RunContext provides:
```python
@agent.tool
async def my_tool(ctx: RunContext[MyDeps]) -> str:
    # Dependencies
    db = ctx.deps.db
    api = ctx.deps.api_client
    # Model info
    model_name = ctx.model.model_name
    # Usage tracking
    tokens_used = ctx.usage.total_tokens
    # Retry info
    attempt = ctx.retry  # Current retry attempt (0-based)
    max_retries = ctx.max_retries
    # Message history
    messages = ctx.messages
    return "result"
```
## Tool Prepare Functions
Dynamically modify tools per-request:
```python
from pydantic_ai import Agent, RunContext
from pydantic_ai.tools import ToolDefinition

async def prepare_tools(
    ctx: RunContext[MyDeps],
    tool_defs: list[ToolDefinition],
) -> list[ToolDefinition]:
    """Filter or modify tools based on context."""
    if ctx.deps.user_role != 'admin':
        # Hide admin tools from non-admins
        return [t for t in tool_defs if not t.name.startswith('admin_')]
    return tool_defs

agent = Agent('openai:gpt-4o', deps_type=MyDeps, prepare_tools=prepare_tools)
```
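A prepare function can also rewrite definitions instead of only dropping them. The sketch below shows that filter-plus-modify logic with the standard library only. It uses a hypothetical `ToolDef` dataclass in place of `ToolDefinition` and a plain `role` string in place of `ctx.deps.user_role`. `dataclasses.replace` returns a modified copy, so the original definitions are never mutated.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToolDef:
    """Hypothetical stand-in for pydantic_ai.tools.ToolDefinition."""

    name: str
    description: str


def prepare_for_role(role: str, tool_defs: list[ToolDef]) -> list[ToolDef]:
    """Hide admin_* tools from non-admins; flag them as privileged for admins."""
    if role != "admin":
        return [t for t in tool_defs if not t.name.startswith("admin_")]
    return [
        replace(t, description=f"[privileged] {t.description}")
        if t.name.startswith("admin_")
        else t
        for t in tool_defs
    ]


defs = [ToolDef("search", "Search items."), ToolDef("admin_delete", "Delete a user.")]
print([t.name for t in prepare_for_role("viewer", defs)])  # ['search']
```

Returning copies keeps the hook side-effect free, which matters because it runs again for every request in a run.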
## Toolsets
Group and compose tools:
```python
from pydantic_ai import Agent, CombinedToolset, FunctionToolset

# Create a toolset
db_tools = FunctionToolset()

@db_tools.tool
def query_users(name: str) -> list[dict]:
    """Query users by name.

    Args:
        name: Name to search for.
    """
    ...

@db_tools.tool
def update_user(id: int, data: dict) -> bool:
    """Update user data.

    Args:
        id: ID of the user to update.
        data: Fields to change.
    """
    ...

# Use in agent
agent = Agent('openai:gpt-4o', toolsets=[db_tools])

# Combine toolsets (api_tools: another toolset defined elsewhere)
all_tools = CombinedToolset([db_tools, api_tools])
```
## Common Mistakes
### Wrong: Context in tool_plain
```python
@agent.tool_plain
async def bad_tool(ctx: RunContext[MyDeps]) -> str: # ERROR!
    ...
```
### Wrong: Missing context in tool
```python
@agent.tool
def bad_tool(user_id: int) -> str: # ERROR!
    ...
```
### Wrong: Context not first parameter
```python
@agent.tool
def bad_tool(user_id: int, ctx: RunContext[MyDeps]) -> str: # ERROR!
    ...
```
## Async vs Sync
Both work, but async is preferred for I/O:
```python
# Async (preferred for I/O operations)
@agent.tool
async def fetch_data(ctx: RunContext[Deps]) -> str:
    response = await ctx.deps.client.get('/data')  # assumes an httpx.AsyncClient in deps
    return response.text

# Sync (fine for CPU-bound operations)
@agent.tool_plain
def compute(x: int, y: int) -> int:
    return x * y
```
---
name: pydantic-ai-testing
description: Test PydanticAI agents using TestModel, FunctionModel, VCR cassettes, and inline snapshots. Use when writing unit tests, mocking LLM responses, or recording API interactions.
---
# Testing PydanticAI Agents
## TestModel (Deterministic Testing)
Use `TestModel` for tests without API calls:
```python
import pytest
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

def test_agent_basic():
    agent = Agent('openai:gpt-4o')
    # Override with TestModel for testing
    result = agent.run_sync('Hello', model=TestModel())
    # TestModel generates deterministic output based on output_type
    assert isinstance(result.output, str)
```
## TestModel Configuration
```python
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

agent = Agent('openai:gpt-4o')

# Custom text output
model = TestModel(custom_output_text='Custom response')
result = agent.run_sync('Hello', model=model)
assert result.output == 'Custom response'

# Custom structured output (for output_type agents)
class Response(BaseModel):
    message: str
    score: int

agent = Agent('openai:gpt-4o', output_type=Response)
model = TestModel(custom_output_args={'message': 'Test', 'score': 42})
result = agent.run_sync('Hello', model=model)
assert result.output.message == 'Test'

# Seed for reproducible random output
model = TestModel(seed=42)

# Force tool calls
model = TestModel(call_tools=['my_tool', 'another_tool'])
```
## Override Context Manager
```python
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

agent = Agent('openai:gpt-4o', deps_type=MyDeps)

def test_with_override():
    mock_deps = MyDeps(db=MockDB())
    with agent.override(model=TestModel(), deps=mock_deps):
        # All runs use TestModel and mock_deps
        result = agent.run_sync('Hello')
        assert result.output
```
## FunctionModel (Custom Logic)
For complete control over model responses:
```python
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart
from pydantic_ai.models.function import AgentInfo, FunctionModel

def custom_model(
    messages: list[ModelMessage],
    info: AgentInfo,
) -> ModelResponse:
    """Custom model that inspects messages and returns response."""
    # Access the last message (the latest request sent to the model)
    last_msg = messages[-1]
    # Return custom response
    return ModelResponse(parts=[TextPart('Custom response')])

agent = Agent(FunctionModel(custom_model))
result = agent.run_sync('Hello')
```
### FunctionModel with Tool Calls
```python
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart, ToolCallPart
from pydantic_ai.models.function import AgentInfo, FunctionModel

def model_with_tools(
    messages: list[ModelMessage],
    info: AgentInfo,
) -> ModelResponse:
    # First request: call a tool
    if len(messages) == 1:
        return ModelResponse(parts=[
            ToolCallPart(
                tool_name='get_data',
                args='{"id": 123}',
            )
        ])
    # After tool response: return final result
    return ModelResponse(parts=[TextPart('Done with tool result')])

agent = Agent(FunctionModel(model_with_tools))

@agent.tool_plain
def get_data(id: int) -> str:
    return f"Data for {id}"

result = agent.run_sync('Get data')
```
## VCR Cassettes (Recorded API Calls)
Record and replay real LLM API interactions:
```python
import pytest
from pydantic_ai import Agent

@pytest.mark.vcr  # provided by a VCR plugin such as pytest-recording
def test_with_recorded_response():
    """Uses recorded cassette from tests/cassettes/"""
    agent = Agent('openai:gpt-4o')
    result = agent.run_sync('Hello')
    assert 'hello' in result.output.lower()

# To record/update cassettes:
# uv run pytest --record-mode=rewrite tests/test_file.py
```
Cassette files are stored in `tests/cassettes/` as YAML.
## Inline Snapshots
Assert expected outputs with auto-updating snapshots:
```python
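from inline_snapshot import snapshot
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

agent = Agent('openai:gpt-4o')

def test_agent_output():
    result = agent.run_sync('Hello', model=TestModel())
    # Start with an empty snapshot(); `--inline-snapshot=create` writes the
    # current value into this file, and later runs assert against it
    assert result.output == snapshot()

# Update snapshots that no longer match:
# uv run pytest --inline-snapshot=fix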
```
## Gates: VCR cassettes and inline snapshots
Recording or fixing rewrites files on disk. Follow this sequence; do not skip steps.
1. **Replay pass (no record/fix flags):** Run `uv run pytest` on the target path; **all green** (or failures are understood and unrelated to the artifact you will refresh).
2. **Scope locked:** Identify the cassette under `tests/cassettes/` or the `snapshot(...)` assertion to update; confirm **only** those files should change.
3. **Record or fix:** Run **one** scoped command: `uv run pytest --record-mode=rewrite …` **or** `uv run pytest --inline-snapshot=fix …` for that path only.
4. **Post-condition:** Run the same tests again **without** record/fix flags; **all green**. Inspect `git diff` — only expected `.yaml` / snapshot changes.
If step 4 fails, revert unintended diffs and fix the test or model before re-recording.
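Step 4's diff inspection can be scripted. This is a minimal sketch, and it assumes two things: the tests run inside a git checkout, and every legitimate change is either a YAML cassette under a `cassettes/` directory or a test file you list explicitly (the one holding the refreshed `snapshot(...)`). `changed_files`, `unexpected_changes` and the allow-list are hypothetical, so adjust them to your layout.

```python
import subprocess
from pathlib import PurePosixPath


def changed_files() -> list[str]:
    """List paths with unstaged changes, as reported by `git diff --name-only`."""
    out = subprocess.run(
        ["git", "diff", "--name-only"],
        check=True,
        capture_output=True,
        text=True,
    ).stdout
    return [line for line in out.splitlines() if line]


def unexpected_changes(paths: list[str], allowed: set[str]) -> list[str]:
    """Return changed paths that are neither cassettes nor explicitly allowed."""
    bad = []
    for path in paths:
        p = PurePosixPath(path)
        is_cassette = p.suffix in {".yaml", ".yml"} and "cassettes" in p.parts
        if not (is_cassette or path in allowed):
            bad.append(path)
    return bad
```

After the post-condition run, `unexpected_changes(changed_files(), {"tests/test_agent.py"})` should return an empty list. Anything it reports is a candidate for `git checkout -- <path>` before re-recording.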
## Testing Tools
```python
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.test import TestModel

def test_tool_is_called():
    agent = Agent('openai:gpt-4o')
    tool_called = False

    @agent.tool_plain
    def my_tool(x: int) -> str:
        nonlocal tool_called
        tool_called = True
        return f"Result: {x}"

    # Force TestModel to call the tool
    result = agent.run_sync(
        'Use my_tool',
        model=TestModel(call_tools=['my_tool']),
    )
    assert tool_called
```
## Testing with Dependencies
```python
from dataclasses import dataclass
from typing import Any
from unittest.mock import AsyncMock

from pydantic_ai import Agent, RunContext
from pydantic_ai.models.test import TestModel

@dataclass
class Deps:
    api: Any  # your API client type

def test_tool_with_deps():
    # Create mock dependency
    mock_api = AsyncMock()
    mock_api.fetch.return_value = {'data': 'test'}
    agent = Agent('openai:gpt-4o', deps_type=Deps)

    @agent.tool
    async def fetch_data(ctx: RunContext[Deps]) -> dict:
        return await ctx.deps.api.fetch()

    with agent.override(
        model=TestModel(call_tools=['fetch_data']),
        deps=Deps(api=mock_api),
    ):
        result = agent.run_sync('Fetch data')
    mock_api.fetch.assert_called_once()
```
## Capture Messages
Inspect all messages in a run:
```python
from pydantic_ai import Agent, capture_run_messages
from pydantic_ai.models.test import TestModel

agent = Agent('openai:gpt-4o')
with capture_run_messages() as messages:
    result = agent.run_sync('Hello', model=TestModel())

# Inspect captured messages
for msg in messages:
    print(msg)
```
## Testing Patterns Summary
| Scenario | Approach |
|----------|----------|
| Unit tests without API | `TestModel()` |
| Custom model logic | `FunctionModel(func)` |
| Recorded real responses | `@pytest.mark.vcr` |
| Assert output structure | `inline_snapshot` |
| Test tools are called | `TestModel(call_tools=[...])` |
| Mock dependencies | `agent.override(deps=...)` |
## pytest Configuration
Typical `pyproject.toml`:
```toml
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"  # For async tests (requires the pytest-asyncio plugin)
```
Run tests:
```bash
uv run pytest tests/test_agent.py -v
uv run pytest --inline-snapshot=fix # Update snapshots
```
---
name: pydantic-ai-model-integration
description: Configure LLM providers, use fallback models, handle streaming, and manage model settings in PydanticAI. Use when selecting models, implementing resilience, or optimizing API calls.
---
# PydanticAI Model Integration
## Provider Model Strings
Format: `provider:model-name`
```python
from pydantic_ai import Agent
# OpenAI
Agent('openai:gpt-4o')
Agent('openai:gpt-4o-mini')
Agent('openai:o1-preview')
# Anthropic
Agent('anthropic:claude-sonnet-4-5')
Agent('anthropic:claude-haiku-4-5')
# Google (API Key)
Agent('google-gla:gemini-2.0-flash')
Agent('google-gla:gemini-2.0-pro')
# Google (Vertex AI)
Agent('google-vertex:gemini-2.0-flash')
# Groq
Agent('groq:llama-3.3-70b-versatile')
Agent('groq:mixtral-8x7b-32768')
# Mistral
Agent('mistral:mistral-large-latest')
# Other providers
Agent('cohere:command-r-plus')
Agent('bedrock:anthropic.claude-3-sonnet')
```
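Every string follows `provider:model-name`, so a typo in the provider prefix can be caught before any request is made. This minimal sketch splits on the first colon only, because Bedrock model IDs can contain further colons (for example a `...-v1:0` version suffix). `split_model_string` is hypothetical. `KNOWN_PROVIDERS` lists only the prefixes used above, is an assumption rather than PydanticAI's own registry, and should be extended for other providers.

```python
KNOWN_PROVIDERS = {
    "openai", "anthropic", "google-gla", "google-vertex",
    "groq", "mistral", "cohere", "bedrock",
}


def split_model_string(model: str) -> tuple[str, str]:
    """Split 'provider:model-name' at the first colon and validate the provider."""
    provider, sep, name = model.partition(":")
    if not sep or not name:
        raise ValueError(f"expected 'provider:model-name', got {model!r}")
    if provider not in KNOWN_PROVIDERS:
        raise ValueError(f"unknown provider prefix {provider!r} in {model!r}")
    return provider, name


print(split_model_string("openai:gpt-4o"))  # ('openai', 'gpt-4o')
```

A natural place to call it is where the model name comes from configuration, for example `split_model_string(os.getenv('PYDANTIC_AI_MODEL', 'openai:gpt-4o'))` at startup.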
## Model Settings
```python
from pydantic_ai import Agent
from pydantic_ai.settings import ModelSettings

agent = Agent(
    'openai:gpt-4o',
    model_settings=ModelSettings(
        temperature=0.7,
        max_tokens=1000,
        top_p=0.9,
        timeout=30.0,  # Request timeout in seconds
    ),
)

# Override per-run (inside an async function)
result = await agent.run(
    'Generate creative text',
    model_settings=ModelSettings(temperature=1.0),
)
```
## Fallback Models
Chain models for resilience:
```python
from pydantic_ai import Agent
from pydantic_ai.exceptions import ModelHTTPError
from pydantic_ai.models.fallback import FallbackModel

# Try models in order; move to the next one only when a model raises an
# error accepted by fallback_on (by default, provider API errors)
fallback = FallbackModel(
    'openai:gpt-4o',
    'anthropic:claude-sonnet-4-5',
    'google-gla:gemini-2.0-flash',
)
agent = Agent(fallback)
result = await agent.run('Hello')

# Custom fallback conditions
def should_fallback(error: Exception) -> bool:
    """Only fall back on rate limits or server errors."""
    if isinstance(error, ModelHTTPError):
        return error.status_code in (429, 500, 502, 503)
    return False

fallback = FallbackModel(
    'openai:gpt-4o',
    'anthropic:claude-sonnet-4-5',
    fallback_on=should_fallback,
)
```
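The control flow behind `fallback_on` can be modelled in plain Python. Each candidate is tried in order, and the loop moves on only when the predicate accepts the error. Any other error is re-raised immediately. This is a conceptual sketch, not PydanticAI's implementation: `HTTPError`, `run_with_fallback`, `rate_limited` and `healthy` are hypothetical, and the "models" are plain callables.

```python
from typing import Callable


class HTTPError(Exception):
    """Hypothetical provider error carrying an HTTP status code."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def run_with_fallback(
    models: list[Callable[[str], str]],
    prompt: str,
    fallback_on: Callable[[Exception], bool],
) -> str:
    """Call each model in order; fall back only on errors the predicate accepts."""
    errors: list[Exception] = []
    for model in models:
        try:
            return model(prompt)
        except Exception as exc:
            if not fallback_on(exc):
                raise  # e.g. a 400 from a malformed request: another model won't help
            errors.append(exc)
    raise RuntimeError(f"all {len(models)} models failed: {errors}")


def should_fallback(error: Exception) -> bool:
    return isinstance(error, HTTPError) and error.status_code in (429, 500, 502, 503)


def rate_limited(prompt: str) -> str:
    raise HTTPError(429)


def healthy(prompt: str) -> str:
    return f"echo: {prompt}"


print(run_with_fallback([rate_limited, healthy], "Hello", should_fallback))  # echo: Hello
```

This is why the order of arguments to `FallbackModel` matters: the first entry handles every request that succeeds, and later entries only see traffic the predicate lets through.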
## Streaming Responses
```python
async def stream_response():
    async with agent.run_stream('Tell me a story') as response:
        # delta=True yields only the new text; without it, each iteration
        # yields the full text accumulated so far
        async for chunk in response.stream_text(delta=True):
            print(chunk, end='', flush=True)
        # Access usage once streaming has finished
        print(f"\nTokens used: {response.usage().total_tokens}")
```
### Streaming with Structured Output
```python
from pydantic import BaseModel
from pydantic_ai import Agent

class Story(BaseModel):
    title: str
    content: str
    moral: str

agent = Agent('openai:gpt-4o', output_type=Story)

async def stream_story():
    async with agent.run_stream('Write a fable') as response:
        # For structured output, stream_output yields partially validated
        # Story objects as more of the JSON arrives
        async for partial in response.stream_output():
            print(partial)
        # Final validated result
        story = await response.get_output()
```
## Dynamic Model Selection
```python
import os
# Environment-based selection
model = os.getenv('PYDANTIC_AI_MODEL', 'openai:gpt-4o')
agent = Agent(model)
# Runtime model override
result = await agent.run(
'Hello',
model='anthropic:claude-sonnet-4-5' # Override default
)
# Context manager override
with agent.override(model='google-gla:gemini-2.0-flash'):
    result = agent.run_sync('Hello')
```
## Deferred Model Checking
Delay model validation for testing:
```python
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

# Default: Validates model immediately (checks env vars)
agent = Agent('openai:gpt-4o')
# Deferred: Validates only on first run
agent = Agent('openai:gpt-4o', defer_model_check=True)
# Useful for testing with override
with agent.override(model=TestModel()):
    result = agent.run_sync('Test')  # No OpenAI key needed
```
## Usage Tracking
```python
result = await agent.run('Hello')
# Usage accumulated over every model request in this run
usage = result.usage()
print(f"Input tokens: {usage.input_tokens}")
print(f"Output tokens: {usage.output_tokens}")
print(f"Total tokens: {usage.total_tokens}")
print(f"Model requests: {usage.requests}")
```
## Usage Limits
```python
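from pydantic_ai.exceptions import UsageLimitExceeded
from pydantic_ai.usage import UsageLimits

# Limit requests and token usage for a single run
try:
    result = await agent.run(
        'Generate content',
        usage_limits=UsageLimits(
            request_limit=5,  # at most 5 model requests
            total_tokens_limit=1000,  # input + output tokens across the run
        ),
    )
except UsageLimitExceeded as exc:
    print(f"Run stopped: {exc}")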
```
## Provider-Specific Features
### OpenAI
```python
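from pydantic_ai.models.openai import OpenAIModel
from pydantic_ai.providers.openai import OpenAIProvider

model = OpenAIModel(
    'gpt-4o',
    provider=OpenAIProvider(
        api_key='your-key',  # Or use OPENAI_API_KEY env var
        base_url='https://custom-endpoint.com',  # For proxies / OpenAI-compatible endpoints
    ),
)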
```
### Anthropic
```python
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.anthropic import AnthropicProvider

model = AnthropicModel(
    'claude-sonnet-4-5',
    provider=AnthropicProvider(api_key='your-key'),  # Or ANTHROPIC_API_KEY
)
```
## Common Model Patterns
| Use Case | Recommendation |
|----------|---------------|
| General purpose | `openai:gpt-4o` or `anthropic:claude-sonnet-4-5` |
| Fast/cheap | `openai:gpt-4o-mini` or `anthropic:claude-haiku-4-5` |
| Long context | `anthropic:claude-sonnet-4-5` (200k) or `google-gla:gemini-2.0-flash` |
| Reasoning | `openai:o1-preview` |
| Cost-sensitive prod | `FallbackModel` with fast model first |
## Check gates before ship
Use these only where they prevent obvious misconfiguration; they do not replace integration tests.
- **Fallback chain order:** **Pass:** The first model passed to `FallbackModel(...)` is the intended primary; each subsequent model is a deliberate fallback (not reversed by mistake).
- **Secrets:** **Pass:** Production and shared scripts load API keys from environment variables or a platform secret store; no real keys committed (placeholders only in examples).
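The secrets gate can be backed by a quick pre-commit scan. This is a heuristic sketch. The regular expression is an assumption based on the common `sk-` key prefix, which also covers `sk-ant-` style keys, and it will miss other formats. A clean result is a hint rather than proof, and a dedicated secret scanner is more thorough. `find_suspected_keys`, `scan_tree` and the scanned suffixes are hypothetical.

```python
import re
from pathlib import Path

# Heuristic (assumed key format): "sk-" followed by 20+ token characters.
KEY_PATTERN = re.compile(r"\bsk-[A-Za-z0-9_\-]{20,}")

SCANNED_SUFFIXES = {".py", ".toml", ".yaml", ".yml", ".json", ".md"}


def find_suspected_keys(text: str) -> list[str]:
    """Return substrings that look like real API keys."""
    return KEY_PATTERN.findall(text)


def scan_tree(root: str = ".") -> dict[str, list[str]]:
    """Map each scanned file under root to the suspected keys it contains."""
    findings: dict[str, list[str]] = {}
    for path in Path(root).rglob("*"):
        if ".git" in path.parts or not path.is_file():
            continue
        if path.suffix not in SCANNED_SUFFIXES:
            continue
        hits = find_suspected_keys(path.read_text(encoding="utf-8", errors="ignore"))
        if hits:
            findings[str(path)] = hits
    return findings
```

Run `scan_tree()` from the repository root and fail the hook if it returns anything. Placeholders such as `'your-key'` do not match the pattern.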
---
name: pydantic-ai-dependency-injection
description: Implement dependency injection in PydanticAI agents using RunContext and deps_type. Use when agents need database connections, API clients, user context, or any external resources.
---
# PydanticAI Dependency Injection
## Core Pattern
Dependencies flow through `RunContext`:
```python
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext

@dataclass
class Deps:
    db: DatabaseConn
    api_client: HttpClient
    user_id: int

agent = Agent(
    'openai:gpt-4o',
    deps_type=Deps,  # Type for static analysis
)

@agent.tool
async def get_user_balance(ctx: RunContext[Deps]) -> float:
    """Get the current user's account balance."""
    return await ctx.deps.db.get_balance(ctx.deps.user_id)

# At runtime, provide deps
result = await agent.run(
    'What is my balance?',
    deps=Deps(db=db_conn, api_client=client, user_id=123),
)
```
## Defining Dependencies
Use dataclasses or Pydantic models:
```python
from dataclasses import dataclass
from pydantic import BaseModel

# Dataclass (recommended for simplicity)
@dataclass
class Deps:
    db: DatabaseConnection
    cache: CacheClient
    user_context: UserContext

# Pydantic model (if you need validation)
class Deps(BaseModel):
    api_key: str
    endpoint: str
    timeout: int = 30
```
## Accessing Dependencies
In tools and instructions:
```python
@agent.tool
async def query_database(ctx: RunContext[Deps], query: str) -> list[dict]:
    """Run a database query."""
    return await ctx.deps.db.execute(query)

@agent.instructions
async def add_user_context(ctx: RunContext[Deps]) -> str:
    user = await ctx.deps.db.get_user(ctx.deps.user_id)
    return f"User name: {user.name}, Role: {user.role}"

@agent.system_prompt
def add_permissions(ctx: RunContext[Deps]) -> str:
    return f"User has permissions: {ctx.deps.permissions}"
```
## Type Safety
Full type checking with generics:
```python
# Explicit agent type annotation
agent: Agent[Deps, OutputModel] = Agent(
    'openai:gpt-4o',
    deps_type=Deps,
    output_type=OutputModel,
)
# Now these are type-checked:
# - ctx.deps in tools is typed as Deps
# - result.output is typed as OutputModel
# - agent.run() requires deps: Deps
```
## No Dependencies Pattern
When you don't need dependencies:
```python
# Option 1: No deps_type (defaults to NoneType)
agent = Agent('openai:gpt-4o')
result = agent.run_sync('Hello') # No deps needed
# Option 2: Explicit None for type checker
agent: Agent[None, str] = Agent('openai:gpt-4o')
result = agent.run_sync('Hello', deps=None)
# In tool_plain, no context access
@agent.tool_plain
def simple_calc(a: int, b: int) -> int:
    return a + b
```
## Complete Example
```python
import asyncio
from dataclasses import dataclass

from httpx import AsyncClient
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext

@dataclass
class WeatherDeps:
    client: AsyncClient
    api_key: str

class WeatherReport(BaseModel):
    location: str
    temperature: float
    conditions: str

agent: Agent[WeatherDeps, WeatherReport] = Agent(
    'openai:gpt-4o',
    deps_type=WeatherDeps,
    output_type=WeatherReport,
    instructions='You are a weather assistant.',
)

@agent.tool
async def get_weather(
    ctx: RunContext[WeatherDeps],
    city: str,
) -> dict:
    """Fetch weather data for a city.

    Args:
        city: Name of the city.
    """
    response = await ctx.deps.client.get(
        f'https://api.weather.com/{city}',
        headers={'Authorization': ctx.deps.api_key},
    )
    return response.json()

async def main():
    async with AsyncClient() as client:
        deps = WeatherDeps(client=client, api_key='secret')
        result = await agent.run('Weather in London?', deps=deps)
        print(result.output.temperature)

if __name__ == '__main__':
    asyncio.run(main())
```
## Override for Testing
```python
from pydantic_ai.models.test import TestModel

# Create mock dependencies
mock_deps = Deps(
    db=MockDatabase(),
    api_client=MockClient(),
    user_id=999,
)

# Override model and deps for testing
with agent.override(model=TestModel(), deps=mock_deps):
    result = agent.run_sync('Test prompt')
```
## Gates
Run these in order before treating the agent as correct; each step has an objective pass condition.
1. **Deps cover every access** — Collect every `ctx.deps.<attr>` (and nested uses) from tools, `@agent.instructions`, and `@agent.system_prompt`. **Pass:** each `<attr>` exists on `deps_type` (and static checking passes if you use mypy/pyright on `Agent[DepsType, …]`).
2. **Every run that needs deps gets them** — **Pass:** each `agent.run` / `run_sync` path that executes those tools passes `deps=` whose type matches `deps_type` (no `None` unless the agent truly has no deps).
3. **Tests pin deps shape** — **Pass:** tests that use `agent.override` pass a `deps=` value with the same fields/types as production `Deps` (not a partial mock unless tools under test never touch missing fields).
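Gate 1 can be approximated with the standard `ast` module. The idea is to collect every `ctx.deps.<attr>` access in a module and compare the result with the fields of the deps dataclass. This minimal sketch assumes the context parameter is always named `ctx` and that `deps_type` is a dataclass. `accessed_deps_attrs`, `missing_deps_fields` and `TOOLS_SOURCE` are hypothetical. The check complements a type checker; it does not replace one.

```python
import ast
from dataclasses import dataclass, fields


def accessed_deps_attrs(source: str) -> set[str]:
    """Collect <attr> from every `ctx.deps.<attr>` expression in the source."""
    attrs: set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Attribute)
            and isinstance(node.value, ast.Attribute)
            and node.value.attr == "deps"
            and isinstance(node.value.value, ast.Name)
            and node.value.value.id == "ctx"
        ):
            attrs.add(node.attr)
    return attrs


def missing_deps_fields(source: str, deps_type: type) -> set[str]:
    """Attributes read via ctx.deps that the deps dataclass does not define."""
    return accessed_deps_attrs(source) - {f.name for f in fields(deps_type)}


@dataclass
class Deps:
    db: object
    user_id: int


TOOLS_SOURCE = '''
async def get_user_balance(ctx):
    return await ctx.deps.db.get_balance(ctx.deps.user_id)

def add_permissions(ctx):
    return f"User has permissions: {ctx.deps.permissions}"
'''

print(missing_deps_fields(TOOLS_SOURCE, Deps))  # {'permissions'}
```

In practice you would pass the text of the module that defines the agent, e.g. `Path('agent.py').read_text()`. An empty set means gate 1 passes.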
## Best Practices
1. **Keep deps immutable**: Use frozen dataclasses (`@dataclass(frozen=True)`) or Pydantic models configured with `frozen=True`
2. **Pass connections, not credentials**: Deps should hold initialized clients
3. **Type your agents**: Use `Agent[DepsType, OutputType]` for full type safety
4. **Scope deps appropriately**: Create deps at the start of a request and close their clients when the request ends
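Practices 1 and 4 fit together in one minimal sketch. A frozen dataclass rejects field reassignment after construction, and an async context manager creates the deps at the start of a request and always closes the client afterwards. `FakeClient`, `request_deps` and `handle_request` are hypothetical stand-ins (swap in e.g. an `httpx.AsyncClient`), so the snippet runs with the standard library only. The real `agent.run` call is left as a comment.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import FrozenInstanceError, dataclass


class FakeClient:
    """Hypothetical stand-in for an initialized HTTP/DB client."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


@dataclass(frozen=True)
class Deps:
    client: FakeClient
    user_id: int


@asynccontextmanager
async def request_deps(user_id: int):
    """Create deps for one request and always release the client afterwards."""
    client = FakeClient()
    try:
        yield Deps(client=client, user_id=user_id)
    finally:
        await client.aclose()


async def handle_request() -> tuple[bool, bool]:
    async with request_deps(user_id=123) as deps:
        # result = await agent.run('...', deps=deps)  # the agent call goes here
        try:
            deps.user_id = 999  # frozen dataclass: reassignment raises
            mutated = True
        except FrozenInstanceError:
            mutated = False
    return mutated, deps.client.closed


print(asyncio.run(handle_request()))  # (False, True)
```

`frozen=True` only blocks rebinding fields. The client object itself stays mutable, which is why closing it through the context manager still works.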
---
name: pydantic-ai-common-pitfalls
description: Avoid common mistakes and debug issues in PydanticAI agents. Use when encountering errors, unexpected behavior, or when reviewing agent implementations.
---
# PydanticAI Common Pitfalls and Debugging
## Tool Decorator Errors
### Wrong: RunContext in tool_plain
```python
# ERROR: RunContext not allowed in tool_plain
@agent.tool_plain
async def bad_tool(ctx: RunContext[MyDeps]) -> str:
    return "oops"
# UserError: RunContext annotations can only be used with tools that take context
```
**Fix**: Use `@agent.tool` if you need context:
```python
@agent.tool
async def good_tool(ctx: RunContext[MyDeps]) -> str:
    return "works"
```
### Wrong: Missing RunContext in tool
```python
# ERROR: First param must be RunContext
@agent.tool
def bad_tool(user_id: int) -> str:
    return "oops"
# UserError: First parameter of tools that take context must be annotated with RunContext[...]
```
**Fix**: Add RunContext as first parameter:
```python
@agent.tool
def good_tool(ctx: RunContext[MyDeps], user_id: int) -> str:
    return "works"
```
### Wrong: RunContext not first
```python
# ERROR: RunContext must be first parameter
@agent.tool
def bad_tool(user_id: int, ctx: RunContext[MyDeps]) -> str:
    return "oops"
```
**Fix**: RunContext must always be the first parameter.
## Valid Patterns (Not Errors)
### Raw Function Tool Registration
The following pattern IS valid and supported by pydantic-ai:
```python
from pydantic_ai import Agent, RunContext
async def search_db(ctx: RunContext[MyDeps], query: str) -> list[dict]:
    """Search the database."""
    return await ctx.deps.db.search(query)

async def get_user(ctx: RunContext[MyDeps], user_id: int) -> dict:
    """Get user by ID."""
    return await ctx.deps.db.get_user(user_id)

# Valid: Pass raw functions to Agent(tools=[...])
agent = Agent(
    'openai:gpt-4o',
    deps_type=MyDeps,
    tools=[search_db, get_user],  # RunContext detected from signature
)
```
**Why this works:** PydanticAI inspects function signatures. If the first parameter is `RunContext[T]`, it's treated as a context-aware tool. No decorator required.
**Reference:** https://ai.pydantic.dev/agents/#registering-tools-via-the-tools-argument
**Do NOT flag** code that passes functions with `RunContext` signatures to `Agent(tools=[...])`. This is equivalent to using `@agent.tool` and is explicitly documented.
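A stdlib-only sketch of the kind of signature check described above. `RunContext` here is a local stand-in class and `takes_context` is a hypothetical helper; PydanticAI's real inspection does more (for example, it also builds the JSON schema for the remaining parameters), so treat this as an illustration of the idea rather than its implementation.

```python
import inspect
from typing import Any, Generic, TypeVar, get_origin

T = TypeVar("T")


class RunContext(Generic[T]):
    """Local stand-in for pydantic_ai.RunContext, used only as an annotation."""


def takes_context(func: Any) -> bool:
    """Return True if the first parameter is annotated as RunContext[...]."""
    params = list(inspect.signature(func).parameters.values())
    if not params:
        return False
    annotation = params[0].annotation
    # RunContext[X] is a parameterized generic; get_origin() recovers RunContext.
    return annotation is RunContext or get_origin(annotation) is RunContext


def search_db(ctx: RunContext[dict], query: str) -> list[str]:
    return []


def add(a: int, b: int) -> int:
    return a + b


print(takes_context(search_db), takes_context(add))  # True False
```

A `True` result corresponds to the context-aware (`@agent.tool`) style and `False` to the plain (`@agent.tool_plain`) style.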
## Dependency Type Mismatches
### Wrong: Missing deps at runtime
```python
agent = Agent('openai:gpt-4o', deps_type=MyDeps)
# ERROR: deps required but not provided. A type checker flags this;
# at runtime deps defaults to None, so tools that read ctx.deps fail.
result = agent.run_sync('Hello')  # Missing deps!
```
**Fix**: Always provide deps when deps_type is set:
```python
result = agent.run_sync('Hello', deps=MyDeps(...))
```
### Wrong: Wrong deps type
```python
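@dataclass
class AppDeps:
    db: Database
@dataclass
class WrongDeps:
    api: ApiClient
agent = Agent('openai:gpt-4o', deps_type=AppDeps)
# Type error: WrongDeps != AppDeps (reported by a type checker, not at call time)
result = agent.run_sync('Hello', deps=WrongDeps(...))
```
**Fix**: Pass an instance of the declared `deps_type` (here `deps=AppDeps(...)`).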
## Output Type Issues
### Pydantic validation fails
```python
class Response(BaseModel):
    count: int
    items: list[str]
agent = Agent('openai:gpt-4o', output_type=Response)
result = agent.run_sync('List items')
# Fails once the retry budget is used up if the LLM keeps returning the wrong structure
```
**Fix**: Increase retries or improve prompt:
```python
agent = Agent(
    'openai:gpt-4o',
    output_type=Response,
    retries=3,  # More attempts
    instructions='Return JSON with count (int) and items (list of strings).',
)
```
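To see what raising `retries` buys you, here is a plain-Python model of a validate-and-retry loop. Everything in it is hypothetical (the fake model, `parse_response`, `run_with_retries`); it only imitates the idea that a validation error is fed back to the model before another attempt, not PydanticAI's actual retry bookkeeping.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Response:
    count: int
    items: list[str]


def parse_response(raw: dict) -> Response:
    """Strict check standing in for Pydantic validation of `Response`."""
    if not isinstance(raw.get("count"), int):
        raise ValueError("count must be an int")
    items = raw.get("items")
    if not isinstance(items, list) or not all(isinstance(i, str) for i in items):
        raise ValueError("items must be a list of strings")
    return Response(count=raw["count"], items=items)


def run_with_retries(call_model: Callable[[Optional[str]], dict], retries: int) -> Response:
    """Ask the (fake) model, validate, and send the error back on failure."""
    feedback: Optional[str] = None
    for _ in range(retries + 1):  # this toy counts one first try plus `retries` retries
        raw = call_model(feedback)
        try:
            return parse_response(raw)
        except ValueError as exc:
            feedback = str(exc)  # plays the role of a retry prompt
    raise RuntimeError(f"output still invalid after {retries} retries: {feedback}")


# Usage with a hypothetical fake model that gets it right on the second try
replies = iter([{"count": "two", "items": ["a", "b"]}, {"count": 2, "items": ["a", "b"]}])
result = run_with_retries(lambda feedback: next(replies), retries=3)
print(result)  # Response(count=2, items=['a', 'b'])
```

Tighter instructions reduce how often the loop needs a second pass; more retries only buy extra attempts.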
### Complex nested types
```python
# May cause schema issues with some models
class Complex(BaseModel):
    nested: dict[str, list[tuple[int, str]]]
```
**Fix**: Simplify or use intermediate models:
```python
class Item(BaseModel):
    id: int
    name: str
class Simple(BaseModel):
    items: list[Item]
```
## Async vs Sync Mistakes
### Wrong: Calling async in sync context
```python
# ERROR: Can't await in sync function
def handler():
    result = await agent.run('Hello')  # SyntaxError!
```
**Fix**: Use run_sync or make handler async:
```python
def handler():
    result = agent.run_sync('Hello')
# Or
async def handler():
    result = await agent.run('Hello')
```
### Wrong: Blocking in async tools
```python
import time
@agent.tool
async def slow_tool(ctx: RunContext[Deps]) -> str:
    time.sleep(5)  # WRONG: Blocks event loop!
    return "done"
```
**Fix**: Use async I/O:
```python
import asyncio
@agent.tool
async def slow_tool(ctx: RunContext[Deps]) -> str:
    await asyncio.sleep(5)  # Correct: yields to the event loop
    return "done"
```
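The difference is easy to measure with the standard library alone. This sketch (hypothetical tool functions, no PydanticAI involved) runs three copies of each tool concurrently: the `time.sleep` version serializes them, while `asyncio.sleep`, or `asyncio.to_thread` for blocking libraries that have no async API, lets them overlap.

```python
import asyncio
import time


async def blocking_tool() -> str:
    time.sleep(0.2)  # blocks the whole event loop
    return "done"


async def async_tool() -> str:
    await asyncio.sleep(0.2)  # yields to the event loop while waiting
    return "done"


async def offloaded_tool() -> str:
    # For a blocking call with no async API, run it in a worker thread.
    await asyncio.to_thread(time.sleep, 0.2)
    return "done"


async def timed(tool, n: int = 3) -> float:
    """Run n copies of a tool concurrently and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(tool() for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    for tool in (blocking_tool, async_tool, offloaded_tool):
        print(tool.__name__, round(asyncio.run(timed(tool)), 2))
```

Expect roughly 0.6 s for the blocking version and about 0.2 s for the other two; exact numbers vary by machine.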
## Model Configuration Errors
### Missing API key
```python
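# ERROR: OPENAI_API_KEY not set
agent = Agent('openai:gpt-4o')
result = agent.run_sync('Hello')
# A missing key typically fails when the model is resolved (at Agent()
# unless defer_model_check=True); a wrong key fails at request time
# with an API authentication error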
```
**Fix**: Set environment variable or use defer_model_check:
```python
# For testing
from pydantic_ai.models.test import TestModel
agent = Agent('openai:gpt-4o', defer_model_check=True)
with agent.override(model=TestModel()):
    result = agent.run_sync('Hello')
```
### Invalid model string
```python
# ERROR: Unknown provider
agent = Agent('unknown:model')
# ValueError: Unknown model provider
```
**Fix**: Use valid provider:model format.
## Streaming Issues
### Wrong: Using result before stream completes
```python
async with agent.run_stream('Hello') as response:
    # DON'T access .output before streaming completes
    print(response.output)  # May be incomplete!
# Correct: access after context manager
print(response.output)  # Complete result
```
### Wrong: Not iterating stream
```python
async with agent.run_stream('Hello') as response:
    pass  # Never consumed!
# Stream was never read - output may be incomplete
```
**Fix**: Always consume the stream:
```python
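async with agent.run_stream('Hello') as response:
    # stream_text(delta=True) yields only newly received text; stream_output()
    # yields the output accumulated so far, so printing it with end='' repeats text
    async for chunk in response.stream_text(delta=True):
        print(chunk, end='')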
```
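Whether each streamed chunk is the whole text so far or only the new part decides how to print it. My understanding is that PydanticAI's `stream_text(delta=True)` yields only new text while `stream_output()` yields the output accumulated so far; check the streaming docs for your version. The stdlib sketch below (hypothetical helpers, fake tokens) shows why printing snapshot-style chunks with `end=''` repeats text, and how snapshots turn into deltas.

```python
from typing import Iterable, Iterator


def snapshots(tokens: Iterable[str]) -> Iterator[str]:
    """Yield the text received so far after each token (snapshot style)."""
    text = ""
    for token in tokens:
        text += token
        yield text


def to_deltas(chunks: Iterable[str]) -> Iterator[str]:
    """Turn snapshot-style chunks into only the newly added suffix."""
    previous = ""
    for chunk in chunks:
        yield chunk[len(previous):]
        previous = chunk


tokens = ["Hel", "lo", ", world"]
printed_snapshots = "".join(snapshots(tokens))  # what print(chunk, end='') would show
printed_deltas = "".join(to_deltas(snapshots(tokens)))
print(printed_snapshots)  # HelHelloHello, world
print(printed_deltas)  # Hello, world
```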
## Tool Return Issues
### Wrong: Returning non-serializable
```python
@agent.tool_plain
def bad_return() -> object:
    return CustomObject()  # Can't serialize!
```
**Fix**: Return serializable types (str, dict, Pydantic model):
```python
@agent.tool_plain
def good_return() -> dict:
    return {"key": "value"}
```
## Debugging Tips
### Gates (ambiguous failures)
When the agent misbehaves but the stack trace or error string is unclear, follow **in order**; do not skip ahead.
1. **Capture evidence** — Re-run with `capture_run_messages()` or, after `run`/`run_sync`, inspect `result.all_messages()` (or print message types in order). **Pass:** You can name the message part type and one line of content that explains the failure (e.g. `RetryPromptPart`, tool return, model text).
2. **Separate model from schema** — If validation or `RetryPromptPart` appears, compare the last model message to your `output_type` fields and types. **Pass:** You identified a concrete mismatch (missing field, wrong type, refusal) before only raising `retries` or changing the model.
3. **Confirm deps on every path** — If tools or deps fail, verify each call site (`run`, `run_sync`, `run_stream`) supplies `deps=...` when `deps_type` is set. **Pass:** A minimal repro with explicit `deps` matches or rules out a deps wiring bug.
### Enable tracing
```python
import logfire
logfire.configure()
logfire.instrument_pydantic_ai()
# Or per-agent
agent = Agent('openai:gpt-4o', instrument=True)
```
### Capture messages
```python
from pydantic_ai import capture_run_messages
with capture_run_messages() as messages:
    result = agent.run_sync('Hello')
for msg in messages:
    print(type(msg).__name__, msg)
```
### Check model responses
```python
result = agent.run_sync('Hello')
print(result.all_messages()) # Full message history
print(result.response) # Last model response
print(result.usage()) # Token usage
```
## Common Error Messages
| Error | Cause | Fix |
|-------|-------|-----|
| `First parameter... RunContext` | @agent.tool missing ctx | Add `ctx: RunContext[...]` |
| `RunContext... only... context` | @agent.tool_plain has ctx | Remove ctx or use @agent.tool |
| `Unknown model provider` | Invalid model string | Use valid `provider:model` |
| `ModelAPIError` | API auth/quota | Check API key, limits |
| `RetryPromptPart` in messages | Validation failed | Check output_type, increase retries |
---
name: pydantic-ai-agent-creation
description: Create PydanticAI agents with type-safe dependencies, structured outputs, and proper configuration. Use when building AI agents, creating chat systems, or integrating LLMs with Pydantic validation.
---
# Creating PydanticAI Agents
## Quick Start
```python
from pydantic_ai import Agent
# Minimal agent (text output)
agent = Agent('openai:gpt-4o')
result = agent.run_sync('Hello!')
print(result.output) # str
```
## Model Selection
Model strings follow `provider:model-name` format:
```python
# OpenAI
agent = Agent('openai:gpt-4o')
agent = Agent('openai:gpt-4o-mini')
# Anthropic
agent = Agent('anthropic:claude-sonnet-4-5')
agent = Agent('anthropic:claude-haiku-4-5')
# Google
agent = Agent('google-gla:gemini-2.0-flash')
agent = Agent('google-vertex:gemini-2.0-flash')
# Others: groq:, mistral:, cohere:, bedrock:, etc.
```
## Structured Outputs
Use Pydantic models for validated, typed responses:
```python
from pydantic import BaseModel
from pydantic_ai import Agent
class CityInfo(BaseModel):
    city: str
    country: str
    population: int
agent = Agent('openai:gpt-4o', output_type=CityInfo)
result = agent.run_sync('Tell me about Paris')
print(result.output.city) # "Paris"
print(result.output.population) # int, validated
```
## Agent Configuration
```python
from pydantic_ai import Agent
from pydantic_ai.settings import ModelSettings
agent = Agent(
    'openai:gpt-4o',
    output_type=MyOutput,  # Structured output type
    deps_type=MyDeps,  # Dependency injection type
    instructions='You are helpful.',  # Static instructions
    retries=2,  # Default retries for tool calls and output validation
    name='my-agent',  # For logging/tracing
    model_settings=ModelSettings(  # Provider settings
        temperature=0.7,
        max_tokens=1000,
    ),
    end_strategy='early',  # Skip remaining tool calls once a final result exists ('exhaustive' runs them)
)
```
## Running Agents
Three execution methods:
```python
# Async (preferred)
result = await agent.run('prompt', deps=my_deps)
# Sync (convenience)
result = agent.run_sync('prompt', deps=my_deps)
# Streaming
async with agent.run_stream('prompt', deps=my_deps) as response:
    async for chunk in response.stream_text(delta=True):  # Only the new text
        print(chunk, end='')
```
## Instructions vs System Prompts
```python
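from pydantic_ai import Agent, RunContext
# Instructions: preferred for agent behavior; only the current agent's
# instructions are sent, even when message_history from earlier runs is passed
agent = Agent(
    'openai:gpt-4o',
    deps_type=MyDeps,
    instructions='You are a helpful assistant. Be concise.',
)
# Dynamic instructions via decorator (combined with the static instructions)
@agent.instructions
def add_context(ctx: RunContext[MyDeps]) -> str:
    return f"User ID: {ctx.deps.user_id}"
# System prompts: stored in the message history, so they carry over
# when that history is passed to later runs
agent = Agent(
    'openai:gpt-4o',
    system_prompt=['You are an expert.', 'Always cite sources.'],
)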
```
## Common Patterns
### Parameterized Agent (Type-Safe)
```python
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext
@dataclass
class Deps:
    api_key: str
    user_id: int
agent: Agent[Deps, str] = Agent(
    'openai:gpt-4o',
    deps_type=Deps,
)
# deps is now required and type-checked
result = agent.run_sync('Hello', deps=Deps(api_key='...', user_id=123))
```
### No Dependencies (Satisfy Type Checker)
```python
# Option 1: Explicit type annotation
agent: Agent[None, str] = Agent('openai:gpt-4o')
# Option 2: Pass deps=None
result = agent.run_sync('Hello', deps=None)
```
## Verification gates
Run these in order before depending on an agent in production code:
1. **Smoke run** — Execute `agent.run_sync('Reply with OK.')` (or `await agent.run(...)` in async code). **Pass:** the call completes without raising and `result.output` is present.
2. **Structured output** — If you set `output_type`, prompt for a response that should satisfy the schema. **Pass:** `result.output` is an instance of your Pydantic model; repeated validation failures mean tightening instructions or `retries`, not adding features yet.
3. **Dependencies** — If you set `deps_type`, call `run` / `run_sync` with `deps=` of that type. **Pass:** the invocation type-checks and completes (or fails only for model/API reasons, not a missing or wrong `deps` value).
## Decision Framework
| Scenario | Configuration |
|----------|--------------|
| Simple text responses | `Agent(model)` |
| Structured data extraction | `Agent(model, output_type=MyModel)` |
| Need external services | Add `deps_type=MyDeps` |
| Validation retries needed | Increase `retries=3` |
| Debugging/monitoring | Set `instrument=True` |
---
name: langgraph-implementation
description: Implements stateful agent graphs using LangGraph. Use when building graphs, adding nodes/edges, defining state schemas, implementing checkpointing, handling interrupts, or creating multi-agent systems with LangGraph.
---
# LangGraph Implementation
## Core Concepts
LangGraph builds stateful, multi-actor agent applications using a graph-based architecture:
- **StateGraph**: Builder class for defining graphs with shared state
- **Nodes**: Functions that read state and return partial updates
- **Edges**: Define execution flow (static or conditional)
- **Channels**: Internal state management (LastValue, BinaryOperatorAggregate)
- **Checkpointer**: Persistence for pause/resume capabilities
## Implementation gates
Use these **sequenced checks** for persistence and human-in-the-loop flows (avoid “it should work” without evidence):
1. **Checkpointed runs**
- Build `config` with `{"configurable": {"thread_id": "<stable-id>"}}` before `invoke` / `ainvoke`.
- **Pass:** The same `thread_id` is reused for every turn of one conversation; a new conversation uses a new id.
2. **State after a step**
- **Pass:** `graph.get_state(config).values` (or equivalent) contains the keys and reducer outputs your next node or client expects; if not, fix routing, reducers, or node order before continuing.
3. **Interrupt and resume (HITL)**
- **Pass:** After a pause, you have inspected pending work (`get_state`, and your LangGraph version’s interrupt listing if you rely on it) so you know **which** node is waiting and **what** resume payload shape to send.
- **Pass:** `Command(resume=...)` (or equivalent) includes every field the code path after `interrupt()` reads.
4. **Checkpointer vs environment**
- **Pass:** Tests or local dev use `InMemorySaver` or disposable SQLite; production uses a durable checkpointer configured for that deployment (not in-memory).
## Essential Imports
```python
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import MessagesState, add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Send, interrupt, RetryPolicy
from typing import Annotated
from typing_extensions import TypedDict
```
## State Schema Patterns
### Basic State with TypedDict
```python
import operator
class State(TypedDict):
    counter: int  # LastValue - stores last value
    messages: Annotated[list, operator.add]  # Reducer - appends lists
    items: Annotated[list, lambda a, b: a + [b] if b else a]  # Custom reducer - appends one item, ignores falsy updates
```
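To make the three annotations concrete, here is a toy merge step in plain Python. `apply_update` is hypothetical, not a LangGraph API: keys with a reducer combine old and new values, and all other keys are overwritten like a `LastValue` channel. It skips real-world details such as several nodes writing in the same step.

```python
import operator
from typing import Any, Callable

Reducer = Callable[[Any, Any], Any]


def apply_update(state: dict, update: dict, reducers: dict[str, Reducer]) -> dict:
    """Merge one node's partial update into a copy of the state.

    Assumes every reducer key is already present in `state`.
    """
    new_state = dict(state)
    for key, value in update.items():
        if key in reducers:
            new_state[key] = reducers[key](new_state[key], value)
        else:
            new_state[key] = value  # LastValue: overwrite
    return new_state


reducers = {
    "messages": operator.add,                   # list + list
    "items": lambda a, b: a + [b] if b else a,  # append one item, skip falsy
}
state = {"counter": 0, "messages": ["hi"], "items": []}
state = apply_update(state, {"counter": 1, "messages": ["hello"], "items": "x"}, reducers)
state = apply_update(state, {"counter": 2, "items": None}, reducers)
print(state)  # {'counter': 2, 'messages': ['hi', 'hello'], 'items': ['x']}
```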
### MessagesState for Chat Applications
```python
from langgraph.graph.message import MessagesState
class State(MessagesState):
    # Inherits: messages: Annotated[list[AnyMessage], add_messages]
    user_id: str
    context: dict
```
### Pydantic State (for validation)
```python
from pydantic import BaseModel
class State(BaseModel):
    messages: Annotated[list, add_messages]
    validated_field: str  # Validated on inputs into nodes (node outputs are not validated)
```
## Building Graphs
### Basic Pattern
```python
builder = StateGraph(State)
# Add nodes - functions that take state, return partial updates
builder.add_node("process", process_fn)
builder.add_node("decide", decide_fn)
# Add edges
builder.add_edge(START, "process")
builder.add_edge("process", "decide")
builder.add_edge("decide", END)
# Compile
graph = builder.compile()
```
### Node Function Signature
```python
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
def my_node(state: State) -> dict:
    """Node receives full state, returns partial update."""
    return {"counter": state["counter"] + 1}
# With config access
def my_node(state: State, config: RunnableConfig) -> dict:
    thread_id = config["configurable"]["thread_id"]
    return {"result": process(state, thread_id)}
# With Runtime context (v0.6+); Context is your context_schema, assumed here to be a TypedDict
def my_node(state: State, runtime: Runtime[Context]) -> dict:
    user_id = runtime.context.get("user_id")
    return {"result": user_id}
```
### Conditional Edges
```python
from typing import Literal
def router(state: State) -> Literal["agent", "tools", "__end__"]:
    last_msg = state["messages"][-1]
    if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:
        return "tools"
    return END  # or "__end__"
builder.add_conditional_edges("agent", router)
# With path_map for visualization
builder.add_conditional_edges(
    "agent",
    router,
    path_map={"agent": "agent", "tools": "tools", "__end__": END},
)
)
```
### Command Pattern (Dynamic Routing + State Update)
```python
from typing import Literal
from langgraph.types import Command
def dynamic_node(state: State) -> Command[Literal["next", "__end__"]]:
    if state["should_continue"]:
        return Command(goto="next", update={"step": state["step"] + 1})
    return Command(goto=END)
# The Command[Literal[...]] return annotation tells LangGraph where the node can
# route (used for graph rendering); destinations= does the same without an annotation
builder.add_node("dynamic", dynamic_node, destinations=["next", END])
```
### Send Pattern (Fan-out/Map-Reduce)
```python
from langgraph.types import Send
def fan_out(state: State) -> list[Send]:
    """Route to multiple node instances with different inputs."""
    return [Send("worker", {"item": item}) for item in state["items"]]
builder.add_conditional_edges(START, fan_out)
builder.add_edge("worker", "aggregate") # Workers converge
```
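The map-reduce shape can be simulated without LangGraph to show two properties: each worker receives only its `Send` payload, and a list reducer merges the workers' partial updates. The tuples standing in for `Send` objects and the `run_fan_out` helper are hypothetical; this toy runs workers sequentially and makes no claim about result ordering in a real graph.

```python
import operator
from functools import reduce


def fan_out(state: dict) -> list[tuple[str, dict]]:
    """Stand-in for returning [Send("worker", payload), ...]."""
    return [("worker", {"item": item}) for item in state["items"]]


def worker(payload: dict) -> dict:
    # Sees only its own payload, not the whole graph state.
    return {"results": [payload["item"].upper()]}


def run_fan_out(state: dict) -> dict:
    updates = [worker(payload) for _node, payload in fan_out(state)]
    # A `results: Annotated[list, operator.add]` reducer would concatenate these.
    merged = reduce(operator.add, (u["results"] for u in updates), state.get("results", []))
    return {**state, "results": merged}


print(run_fan_out({"items": ["a", "b", "c"]})["results"])  # ['A', 'B', 'C']
```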
## Checkpointing
### Enable Persistence
```python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver  # Development (pip install langgraph-checkpoint-sqlite)
from langgraph.checkpoint.postgres import PostgresSaver  # Production (pip install langgraph-checkpoint-postgres)
# In-memory (testing only)
graph = builder.compile(checkpointer=InMemorySaver())
# Thread-based invocation
config = {"configurable": {"thread_id": "user-123"}}
result = graph.invoke({"messages": [...]}, config)
# SQLite (development): the connection closes when the with-block exits,
# so compile and invoke inside it
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
    result = graph.invoke({"messages": [...]}, config)
```
### State Management
```python
# Get current state
state = graph.get_state(config)
# Get state history
for state in graph.get_state_history(config):
    print(state.values, state.next)
# Update state manually
graph.update_state(config, {"key": "new_value"}, as_node="node_name")
```
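Why the same `thread_id` must be reused for every turn of one conversation is easiest to see with a toy checkpointer. `ToyCheckpointer` and `chat_turn` below are hypothetical and in-memory only; they mimic the idea of storing one snapshot per step under `config["configurable"]["thread_id"]`, not LangGraph's checkpoint format.

```python
from collections import defaultdict


class ToyCheckpointer:
    """Keeps a list of state snapshots per thread_id, in memory only."""

    def __init__(self) -> None:
        self._history: dict[str, list[dict]] = defaultdict(list)

    def put(self, config: dict, state: dict) -> None:
        self._history[config["configurable"]["thread_id"]].append(dict(state))

    def get(self, config: dict) -> dict:
        snapshots = self._history[config["configurable"]["thread_id"]]
        return snapshots[-1] if snapshots else {}

    def history(self, config: dict) -> list[dict]:
        # Newest first in this toy
        return list(reversed(self._history[config["configurable"]["thread_id"]]))


def chat_turn(saver: ToyCheckpointer, config: dict, user_msg: str) -> dict:
    """Load the thread's latest state, append a message, and save a new snapshot."""
    state = saver.get(config)
    new_state = {"messages": state.get("messages", []) + [user_msg]}
    saver.put(config, new_state)
    return new_state


saver = ToyCheckpointer()
alice = {"configurable": {"thread_id": "user-123"}}
bob = {"configurable": {"thread_id": "user-456"}}
chat_turn(saver, alice, "hi")
chat_turn(saver, alice, "again")
chat_turn(saver, bob, "hello")
print(saver.get(alice))  # {'messages': ['hi', 'again']}
print(saver.get(bob))  # {'messages': ['hello']}
```

A fresh `thread_id` starts from an empty state, which is why a new conversation should get a new id.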
## Human-in-the-Loop
### Using interrupt()
```python
from langgraph.types import interrupt, Command
def review_node(state: State) -> dict:
    # Pause and surface value to client
    human_input = interrupt({"question": "Please review", "data": state["draft"]})
    return {"approved": human_input["approved"]}
# Resume with Command (same thread_id config; the graph needs a checkpointer)
graph.invoke(Command(resume={"approved": True}), config)
```
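One behavior worth internalizing: when a run resumes, LangGraph re-executes the interrupted node from its first line, and this time `interrupt()` returns the resume value instead of pausing. Code placed before `interrupt()` therefore runs twice, so keep side effects after it or make them idempotent. The toy below models that contract in plain Python; `Interrupted`, `make_interrupt`, and `run_node` are hypothetical names, not LangGraph internals.

```python
from typing import Any, Callable


class Interrupted(Exception):
    """Pauses the run and carries the value surfaced to the client."""

    def __init__(self, value: Any) -> None:
        super().__init__(value)
        self.value = value


def make_interrupt(resume_values: list) -> Callable[[Any], Any]:
    """Build an interrupt() that pauses unless a resume value is queued."""
    def interrupt(value: Any) -> Any:
        if resume_values:
            return resume_values.pop(0)
        raise Interrupted(value)
    return interrupt


log: list[str] = []


def review_node(state: dict, interrupt: Callable[[Any], Any]) -> dict:
    log.append("review_node started")  # runs on the first pass AND on resume
    human_input = interrupt({"question": "Please review", "data": state["draft"]})
    return {"approved": human_input["approved"]}


def run_node(state: dict, resume_values: list) -> dict:
    try:
        return review_node(state, make_interrupt(resume_values))
    except Interrupted as pause:
        return {"paused_with": pause.value}


first = run_node({"draft": "v1"}, [])                     # pauses
second = run_node({"draft": "v1"}, [{"approved": True}])  # resumes
print(first)   # {'paused_with': {'question': 'Please review', 'data': 'v1'}}
print(second)  # {'approved': True}
print(log)     # ['review_node started', 'review_node started']
```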
### Interrupt Before/After Nodes
```python
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_review"],  # Pause before node
    interrupt_after=["agent"],  # Pause after node
)
# Check pending interrupts
state = graph.get_state(config)
if state.next:  # Has pending nodes
    # Resume
    graph.invoke(None, config)
```
## Streaming
```python
# Stream modes: "values", "updates", "custom", "messages", "debug"
# Updates only (node outputs)
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)  # {"node_name": {"key": "value"}}
# Full state after each step
for chunk in graph.stream(inputs, stream_mode="values"):
    print(chunk)
# Multiple modes: yields (mode, chunk) tuples
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "messages"]):
    if mode == "messages":
        message_chunk, metadata = chunk  # LLM token chunk plus metadata
        print("Token:", message_chunk.content)
# Custom streaming from within nodes (read with stream_mode="custom")
from langgraph.config import get_stream_writer
def my_node(state):
    writer = get_stream_writer()
    writer({"progress": 0.5})  # Custom event
    return {"result": "done"}
```
## Subgraphs
```python
# Define subgraph
sub_builder = StateGraph(SubState)
sub_builder.add_node("step", step_fn)
sub_builder.add_edge(START, "step")
subgraph = sub_builder.compile()
# Use as node in parent
parent_builder = StateGraph(ParentState)
parent_builder.add_node("subprocess", subgraph)
parent_builder.add_edge(START, "subprocess")
# Subgraph checkpointing
subgraph = sub_builder.compile(
    checkpointer=None,  # Inherit from parent (default)
    # checkpointer=True,  # Use persistent checkpointing
    # checkpointer=False,  # Disable checkpointing
)
```
## Retry and Caching
```python
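from langgraph.cache.memory import InMemoryCache
from langgraph.types import RetryPolicy, CachePolicy
retry = RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_attempts=3,
    retry_on=ValueError,  # Or callable: lambda e: isinstance(e, ValueError)
)
cache = CachePolicy(ttl=3600)  # Cache for 1 hour
builder.add_node("risky", risky_fn, retry_policy=retry, cache_policy=cache)
# Node caching only takes effect when the graph is compiled with a cache backend
graph = builder.compile(cache=InMemoryCache())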
```
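A rough model of the backoff schedule those `RetryPolicy` numbers produce. My reading of `langgraph.types.RetryPolicy` is that `max_attempts` includes the first call, delays grow as `initial_interval * backoff_factor ** n` up to `max_interval`, and random jitter is added by default; `retry_delays` is a hypothetical helper that leaves jitter out, so verify the details against your LangGraph version.

```python
def retry_delays(initial_interval: float, backoff_factor: float,
                 max_attempts: int, max_interval: float = 128.0) -> list[float]:
    """Seconds to wait before each retry (no jitter).

    max_attempts counts the first call, so there are max_attempts - 1 waits.
    The 128.0 default for max_interval is an assumption about the library default.
    """
    return [
        min(initial_interval * backoff_factor ** n, max_interval)
        for n in range(max(max_attempts - 1, 0))
    ]


print(retry_delays(0.5, 2.0, max_attempts=3))  # [0.5, 1.0]
```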
## Prebuilt Components
### create_react_agent (deprecated in LangGraph v1.0 in favor of `langchain.agents.create_agent`)
```python
from langgraph.prebuilt import create_react_agent, ToolNode
# Simple agent
graph = create_react_agent(
    model="anthropic:claude-sonnet-4-5",
    tools=[my_tool],
    prompt="You are a helpful assistant",
    checkpointer=InMemorySaver(),
)
# Custom tool node
tool_node = ToolNode([tool1, tool2])
builder.add_node("tools", tool_node)
```
## Common Patterns
### Agent Loop
```python
def should_continue(state) -> Literal["tools", "__end__"]:
    if state["messages"][-1].tool_calls:
        return "tools"
    return END
builder.add_node("agent", call_model)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
```
### Parallel Execution
```python
# Multiple nodes execute in parallel when they share the same trigger
builder.add_edge(START, "node_a")
builder.add_edge(START, "node_b") # Runs parallel with node_a
builder.add_edge(["node_a", "node_b"], "join") # Wait for both
```
See [PATTERNS.md](PATTERNS.md) for advanced patterns including multi-agent systems, hierarchical graphs, and complex workflows.
FILE:PATTERNS.md
# Advanced LangGraph Patterns
## Multi-Agent Supervisor Pattern
```python
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Command
class SupervisorState(TypedDict):
    messages: Annotated[list, add_messages]
    next_agent: str
def supervisor(state: SupervisorState) -> Command[Literal["researcher", "coder", "__end__"]]:
    """Route to appropriate agent based on task."""
    # LLM decides which agent to use (llm, research_agent, coding_agent are placeholders)
    decision = llm.invoke(state["messages"])
    if "research" in decision.content.lower():
        return Command(goto="researcher")
    elif "code" in decision.content.lower():
        return Command(goto="coder")
    return Command(goto=END)
def researcher(state: SupervisorState) -> dict:
    result = research_agent.invoke(state["messages"])
    return {"messages": [result]}
def coder(state: SupervisorState) -> dict:
    result = coding_agent.invoke(state["messages"])
    return {"messages": [result]}
builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor, destinations=["researcher", "coder", END])
builder.add_node("researcher", researcher)
builder.add_node("coder", coder)
builder.add_edge(START, "supervisor")
builder.add_edge("researcher", "supervisor")
builder.add_edge("coder", "supervisor")
```
## Map-Reduce Pattern
```python
import operator
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
class MapReduceState(TypedDict):
    topics: list[str]
    results: Annotated[list[str], operator.add]  # Reducer aggregates
    summary: str  # Written by aggregate
def distribute(state: MapReduceState) -> list[Send]:
    """Fan out to process each topic."""
    return [Send("process_topic", {"topic": t}) for t in state["topics"]]
def process_topic(state: dict) -> dict:
    """Process individual topic (receives Send payload)."""
    result = analyze(state["topic"])
    return {"results": [result]}
def aggregate(state: MapReduceState) -> dict:
    """Combine all results."""
    summary = summarize(state["results"])
    return {"summary": summary}
builder = StateGraph(MapReduceState)
builder.add_node("process_topic", process_topic)  # Nodes must be registered before use
builder.add_node("aggregate", aggregate)
builder.add_conditional_edges(START, distribute, ["process_topic"])
builder.add_edge("process_topic", "aggregate")
builder.add_edge("aggregate", END)
```
## Hierarchical Graph Pattern
```python
from typing import Annotated
from typing_extensions import TypedDict
from langchain_core.messages import AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# Inner graph - specialized task
class InnerState(TypedDict):
    query: str
    result: str
inner_builder = StateGraph(InnerState)
inner_builder.add_node("search", search_fn)
inner_builder.add_node("analyze", analyze_fn)
inner_builder.add_edge(START, "search")
inner_builder.add_edge("search", "analyze")
inner_builder.add_edge("analyze", END)
inner_graph = inner_builder.compile()
# Outer graph - orchestration. A compiled subgraph added as a node reads and
# writes the state keys it shares with the parent, so OuterState declares them
class OuterState(TypedDict):
    messages: Annotated[list, add_messages]
    query: str  # Input to the inner graph
    result: str  # Written by the inner graph
def prepare_research(state: OuterState) -> dict:
    """Transform outer state for inner graph."""
    return {"query": state["messages"][-1].content}
def process_result(state: OuterState) -> dict:
    """Handle result from inner graph."""
    return {"messages": [AIMessage(content=state["result"])]}
outer_builder = StateGraph(OuterState)
outer_builder.add_node("prepare", prepare_research)
outer_builder.add_node("research", inner_graph)  # Subgraph as node
outer_builder.add_node("process", process_result)
outer_builder.add_edge(START, "prepare")
outer_builder.add_edge("prepare", "research")
outer_builder.add_edge("research", "process")
outer_builder.add_edge("process", END)
```
## Reflection/Self-Correction Pattern
```python
class ReflectionState(TypedDict):
    draft: str
    feedback: str
    revision_count: int
def generate(state: ReflectionState) -> dict:
    if state.get("feedback"):
        prompt = f"Revise based on: {state['feedback']}\n\nDraft: {state['draft']}"
    else:
        prompt = "Generate initial draft"
    return {"draft": llm.invoke(prompt).content}
def reflect(state: ReflectionState) -> dict:
    feedback = critic_llm.invoke(f"Critique this: {state['draft']}").content
    return {"feedback": feedback, "revision_count": state.get("revision_count", 0) + 1}
def should_continue(state: ReflectionState) -> Literal["generate", "__end__"]:
    if state["revision_count"] >= 3:
        return END
    if "looks good" in state["feedback"].lower():
        return END
    return "generate"
builder = StateGraph(ReflectionState)
builder.add_node("generate", generate)
builder.add_node("reflect", reflect)
builder.add_edge(START, "generate")
builder.add_edge("generate", "reflect")
builder.add_conditional_edges("reflect", should_continue)
```
## Plan-and-Execute Pattern
```python
class PlanExecuteState(TypedDict):
    objective: str
    plan: list[str]
    completed_steps: Annotated[list[str], operator.add]
    current_step: int
def planner(state: PlanExecuteState) -> dict:
    plan = planning_llm.invoke(f"Create plan for: {state['objective']}")
    steps = parse_steps(plan.content)
    return {"plan": steps, "current_step": 0}
def executor(state: PlanExecuteState) -> dict:
    step = state["plan"][state["current_step"]]
    result = execute_step(step)
    return {
        "completed_steps": [f"{step}: {result}"],
        "current_step": state["current_step"] + 1,
    }
def should_continue(state: PlanExecuteState) -> Literal["executor", "__end__"]:
    if state["current_step"] >= len(state["plan"]):
        return END
    return "executor"
builder = StateGraph(PlanExecuteState)
builder.add_node("planner", planner)
builder.add_node("executor", executor)
builder.add_edge(START, "planner")
builder.add_edge("planner", "executor")
builder.add_conditional_edges("executor", should_continue)
```
## Human Approval Gate Pattern
```python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
class ApprovalState(TypedDict):
    action: str
    approved: bool
    result: str
def propose_action(state: ApprovalState) -> dict:
    action = determine_action(state)
    return {"action": action}
def human_review(state: ApprovalState) -> dict:
    decision = interrupt({
        "action": state["action"],
        "message": "Please approve or reject this action",
    })
    return {"approved": decision.get("approved", False)}
def execute_action(state: ApprovalState) -> dict:
    if state["approved"]:
        result = execute(state["action"])
    else:
        result = "Action rejected by human"
    return {"result": result}
def route_after_review(state: ApprovalState) -> Literal["execute", "__end__"]:
    return "execute" if state["approved"] else END
builder = StateGraph(ApprovalState)
builder.add_node("propose", propose_action)
builder.add_node("review", human_review)
builder.add_node("execute", execute_action)
builder.add_edge(START, "propose")
builder.add_edge("propose", "review")
builder.add_conditional_edges("review", route_after_review)
builder.add_edge("execute", END)
graph = builder.compile(checkpointer=InMemorySaver())  # interrupt() needs a checkpointer
# Usage
config = {"configurable": {"thread_id": "1"}}
result = graph.invoke({"action": ""}, config)
# Graph pauses at review node
# Resume with approval
graph.invoke(Command(resume={"approved": True}), config)
```
## Branching and Joining
```python
class BranchState(TypedDict):
    input: str
    branch_a_result: str
    branch_b_result: str
    final_result: str
builder = StateGraph(BranchState)
builder.add_node("branch_a", branch_a_fn)
builder.add_node("branch_b", branch_b_fn)
builder.add_node("join", join_fn)
# Fan out - both run in parallel
builder.add_edge(START, "branch_a")
builder.add_edge(START, "branch_b")
# Fan in - wait for both
builder.add_edge(["branch_a", "branch_b"], "join")
builder.add_edge("join", END)
```
## Looping with Counter
```python
class LoopState(TypedDict):
    value: int
    iterations: int
def increment(state: LoopState) -> dict:
    return {
        "value": state["value"] * 2,
        "iterations": state["iterations"] + 1,
    }
def should_loop(state: LoopState) -> Literal["increment", "__end__"]:
    if state["iterations"] >= 5:
        return END
    if state["value"] >= 1000:
        return END
    return "increment"
builder = StateGraph(LoopState)
builder.add_node("increment", increment)
builder.add_edge(START, "increment")
builder.add_conditional_edges("increment", should_loop)
```
## Error Recovery Pattern
```python
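from typing import Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import RetryPolicy
class ErrorRecoveryState(TypedDict):
    input: str
    result: str
    error: str
    attempts: int
def risky_operation(state: ErrorRecoveryState) -> dict:
    try:
        result = dangerous_api_call(state["input"])
        return {"result": result, "error": ""}
    except Exception as e:
        # Caught here, so a node-level RetryPolicy never sees this exception
        return {"error": str(e), "attempts": state.get("attempts", 0) + 1}
def fallback(state: ErrorRecoveryState) -> dict:
    return {"result": f"Fallback result for: {state['input']}"}
def route_after_operation(state: ErrorRecoveryState) -> Literal["risky_operation", "fallback", "__end__"]:
    if state["error"] and state["attempts"] >= 3:
        return "fallback"
    if state["error"]:
        return "risky_operation"  # Retry via the graph loop
    return END
builder = StateGraph(ErrorRecoveryState)
builder.add_node("risky_operation", risky_operation)
builder.add_node("fallback", fallback)
builder.add_edge(START, "risky_operation")
builder.add_conditional_edges("risky_operation", route_after_operation)
builder.add_edge("fallback", END)
# Alternative: let exceptions escape the node and use RetryPolicy for automatic retries
# retry = RetryPolicy(max_attempts=3, retry_on=ConnectionError)
# builder.add_node("risky_operation", risky_operation, retry_policy=retry)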
```
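Why the `try/except` matters for `RetryPolicy`: a retry policy can only react to exceptions that escape the node. The toy decorator below (hypothetical `with_retry`, not LangGraph code) shows a node that swallows `ConnectionError` running once, while one that lets it propagate is retried up to `max_attempts` times.

```python
import functools
from typing import Callable


def with_retry(max_attempts: int, retry_on: type[BaseException]) -> Callable:
    """Re-run the wrapped node when it raises `retry_on`."""
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the error
        return wrapper
    return decorator


calls = {"swallowing": 0, "raising": 0}


def flaky_api() -> str:
    raise ConnectionError("network down")


@with_retry(max_attempts=3, retry_on=ConnectionError)
def swallowing_node(state: dict) -> dict:
    calls["swallowing"] += 1
    try:
        return {"result": flaky_api()}
    except Exception as e:  # the retry wrapper never sees the error
        return {"error": str(e)}


@with_retry(max_attempts=3, retry_on=ConnectionError)
def raising_node(state: dict) -> dict:
    calls["raising"] += 1
    return {"result": flaky_api()}
```

Calling `swallowing_node({})` returns `{'error': 'network down'}` after a single call, while `raising_node({})` runs three times and then raises `ConnectionError`.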
---
name: langgraph-code-review
description: Reviews LangGraph code for bugs, anti-patterns, and improvements. Use when reviewing code that uses StateGraph, nodes, edges, checkpointing, or other LangGraph features. Catches common mistakes in state management, graph structure, and async patterns.
---
# LangGraph Code Review
When reviewing LangGraph code, check for these categories of issues.
## Review gates (sequenced)
Complete in order. Each step has an objective pass condition before moving on.
1. **Locate graph code** — Search the review scope for `StateGraph`, `compile(`, `invoke`, `ainvoke`, `add_node`, `add_edge`, `add_conditional_edges`. **Pass:** a short list of file paths (or explicit “none in scope” after searching).
2. **Map state schema** — For each graph state type (`TypedDict`, `BaseModel`, etc.), list fields that hold lists, dicts, or messages and whether `Annotated` + reducers (`add_messages`, `operator.add`, …) are present. **Pass:** every such field is either covered by a reducer pattern below or explicitly flagged as intentional overwrite.
3. **Trace persistence** — If interrupts, `thread_id`, or checkpoint APIs appear, follow them to `compile(..., checkpointer=...)` and invocation `config`. **Pass:** behavior matches the interrupt/checkpointer/thread_id guidance below—or you document a concrete mismatch with file:line.
4. **Report with evidence** — For each finding you will deliver, record **file path and line number(s)** (or a minimal quoted snippet). **Pass:** no critical or high-severity issue is stated without that citation.
5. **Run the checklist** — Use the checklist at the end of this skill; each item is **satisfied**, **not applicable** (with reason), or **open** with evidence. **Pass:** no item left silently unchecked.
## Critical Issues
### 1. State Mutation Instead of Return
```python
# BAD - mutates state directly
def my_node(state: State) -> None:
    state["messages"].append(new_message)  # Mutation!
# GOOD - returns partial update
def my_node(state: State) -> dict:
    return {"messages": [new_message]}  # Let reducer handle it
```
### 2. Missing Reducer for List Fields
```python
# BAD - no reducer, each node overwrites
class State(TypedDict):
    messages: list  # Will be overwritten, not appended!
# GOOD - reducer appends
class State(TypedDict):
    messages: Annotated[list, operator.add]
    # Or use add_messages for chat:
    # messages: Annotated[list, add_messages]
```
### 3. Wrong Return Type from Conditional Edge
```python
# BAD - returns invalid node name
def router(state) -> str:
    return "nonexistent_node"  # Runtime error!
# GOOD - use Literal type hint for safety
def router(state) -> Literal["agent", "tools", "__end__"]:
    if condition:
        return "agent"
    return END  # Use constant, not string
```
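A small guard a reviewer might suggest for tests: read the router's `Literal` return annotation and check that each returned target is both declared and registered. `check_route` and the `registered` set are hypothetical test helpers; `END` is redefined locally as `"__end__"`, the string LangGraph's `END` constant holds.

```python
from typing import Literal, get_args, get_type_hints

END = "__end__"  # same string as langgraph.graph.END


def router(state: dict) -> Literal["agent", "tools", "__end__"]:
    if state.get("pending_tool_calls"):
        return "tools"
    return END


def check_route(fn, state: dict, registered: set[str]) -> str:
    """Call a router and confirm the target is declared and registered."""
    declared = set(get_args(get_type_hints(fn)["return"]))
    target = fn(state)
    if target not in declared:
        raise ValueError(f"{target!r} is not in the Literal return type {sorted(declared)}")
    if target != END and target not in registered:
        raise ValueError(f"{target!r} is not a registered node")
    return target


print(check_route(router, {"pending_tool_calls": 1}, {"agent", "tools"}))  # tools
```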
### 4. Missing Checkpointer for Interrupts
```python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt
# BAD - interrupt without checkpointer
def my_node(state):
    answer = interrupt("question")  # Will fail!
    return {"answer": answer}
graph = builder.compile()  # No checkpointer!
# GOOD - checkpointer required for interrupts
graph = builder.compile(checkpointer=InMemorySaver())
```
### 5. Forgetting Thread ID with Checkpointer
```python
# BAD - no thread_id
graph.invoke({"messages": [...]}) # Error with checkpointer!
# GOOD - always provide thread_id
config = {"configurable": {"thread_id": "user-123"}}
graph.invoke({"messages": [...]}, config)
```
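The `thread_id` matters because a checkpointer stores snapshots per thread. Without an ID, there is nothing to key them on. The class below is a deliberately tiny, hypothetical stand-in (not `InMemorySaver`) that makes the requirement explicit.

```python
from typing import Optional


class ToyCheckpointer:
    """Hypothetical per-thread snapshot store illustrating thread_id scoping."""

    def __init__(self) -> None:
        self._threads: dict[str, list[dict]] = {}

    @staticmethod
    def _thread_id(config: Optional[dict]) -> str:
        try:
            return config["configurable"]["thread_id"]
        except (KeyError, TypeError):
            raise ValueError("config['configurable']['thread_id'] is required") from None

    def put(self, config: Optional[dict], state: dict) -> None:
        # Append a snapshot to this thread's history only
        self._threads.setdefault(self._thread_id(config), []).append(dict(state))

    def latest(self, config: Optional[dict]) -> Optional[dict]:
        history = self._threads.get(self._thread_id(config), [])
        return history[-1] if history else None


saver = ToyCheckpointer()
alice = {"configurable": {"thread_id": "user-123"}}
bob = {"configurable": {"thread_id": "user-456"}}
saver.put(alice, {"messages": ["hi"]})
print(saver.latest(alice))  # {'messages': ['hi']}
print(saver.latest(bob))  # None (separate thread, separate history)
```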
## State Schema Issues
### 6. Using add_messages Without Message Types
```python
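from typing import Annotated, TypedDict
from langchain_core.messages import AIMessage
from langgraph.graph.message import add_messages
# BAD - plain strings are not assistant messages
class State(TypedDict):
    messages: Annotated[list, add_messages]
def node(state):
    return {"messages": ["plain string"]}  # Coerced to a HumanMessage, not an AI reply!
# GOOD - use proper message types or (role, content) tuples
def node(state):
    return {"messages": [("assistant", "response")]}
    # Or: [AIMessage(content="response")]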
```
### 7. Returning Full State Instead of Partial
```python
# BAD - returns entire state (may reset other fields)
def my_node(state: State) -> State:
    return {
        "counter": state["counter"] + 1,
        "messages": state["messages"],  # Re-applied via the reducer (duplicates with operator.add)
        "other": state["other"],  # Unnecessary!
    }
# GOOD - return only changed fields
def my_node(state: State) -> dict:
    return {"counter": state["counter"] + 1}
```
### 8. Pydantic State Without Annotations
```python
from typing import Annotated
from pydantic import BaseModel
from langgraph.graph.message import add_messages
# BAD - Pydantic model without reducer loses append behavior
class State(BaseModel):
    messages: list  # No reducer!
# GOOD - use Annotated even with Pydantic
class State(BaseModel):
    messages: Annotated[list, add_messages]
```
## Graph Structure Issues
### 9. Missing Entry Point
```python
# BAD - no edge from START
builder.add_node("process", process_fn)
builder.add_edge("process", END)
graph = builder.compile() # Error: no entrypoint!
# GOOD - connect START
builder.add_edge(START, "process")
```
### 10. Unreachable Nodes
```python
# BAD - orphan node
builder.add_node("main", main_fn)
builder.add_node("orphan", orphan_fn) # Never reached!
builder.add_edge(START, "main")
builder.add_edge("main", END)
# Check with visualization ("orphan" has no incoming edge)
graph = builder.compile()
print(graph.get_graph().draw_mermaid())
```
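A reachability pass over the edge list catches orphans mechanically, without having to inspect the Mermaid diagram by eye. This is a minimal sketch. It assumes you collect node names and `(source, target)` edges yourself, including every destination from conditional edges. `unreachable_nodes` is hypothetical. `"__start__"` mirrors the string value of LangGraph's `START` constant.

```python
from collections import deque

START = "__start__"  # same string as langgraph.graph.START


def unreachable_nodes(nodes: set[str], edges: list[tuple[str, str]]) -> set[str]:
    """Breadth-first search from START; return nodes never visited."""
    adjacency: dict[str, list[str]] = {}
    for source, target in edges:
        adjacency.setdefault(source, []).append(target)
    seen = {START}
    queue = deque([START])
    while queue:
        for target in adjacency.get(queue.popleft(), []):
            if target not in seen:
                seen.add(target)
                queue.append(target)
    return nodes - seen


nodes = {"main", "orphan"}
edges = [(START, "main"), ("main", "__end__")]
print(unreachable_nodes(nodes, edges))  # {'orphan'}
```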
### 11. Conditional Edge Without All Paths
```python
# BAD - missing path in conditional
def router(state) -> Literal["a", "b", "c"]:
    ...
builder.add_conditional_edges("node", router, {"a": "a", "b": "b"})
# "c" path missing!
# GOOD - include all possible returns
builder.add_conditional_edges("node", router, {"a": "a", "b": "b", "c": "c"})
# Or omit path_map to use return values as node names
```
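The router's `Literal` return annotation already enumerates its possible outcomes. That means the gap in issue 11 can be checked before the graph runs. Here is a stdlib sketch in which `missing_paths` is a hypothetical helper that compares the annotation with a `path_map` dict.

```python
from typing import Literal, get_args, get_type_hints


def missing_paths(router, path_map: dict[str, str]) -> set[str]:
    """Return Literal return values of `router` that path_map does not cover."""
    return_hint = get_type_hints(router)["return"]
    return set(get_args(return_hint)) - set(path_map)


def router(state) -> Literal["a", "b", "c"]:
    return "c" if state.get("escalate") else "a"


print(missing_paths(router, {"a": "a", "b": "b"}))  # {'c'}
print(missing_paths(router, {"a": "a", "b": "b", "c": "c"}))  # set()
```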
### 12. Command Without destinations
```python
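from typing import Literal
from langgraph.graph import END
from langgraph.types import Command
# BAD - bare Command annotation, destinations unknown (breaks visualization)
def dynamic(state) -> Command:
    return Command(goto="next")
builder.add_node("dynamic", dynamic)  # Graph viz won't show edges
# GOOD - annotate the possible destinations...
def dynamic(state) -> Command[Literal["next", "__end__"]]:
    return Command(goto="next")
# ...or declare them when adding the node
builder.add_node("dynamic", dynamic, destinations=("next", END))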
```
## Async Issues
### 13. Mixing Sync/Async Incorrectly
```python
# BAD - async node called with sync invoke
async def my_node(state):
    result = await async_operation()
    return {"result": result}
graph.invoke(input)  # Fails: an async-only node has no sync implementation
# GOOD - use ainvoke for async graphs
await graph.ainvoke(input)
# Or provide both sync and async versions
```
### 14. Blocking Calls in Async Context
```python
import httpx
import requests
# BAD - blocking call in async node
async def my_node(state):
    result = requests.get(url)  # Blocks event loop!
    return {"result": result}
# GOOD - use async HTTP client
async def my_node(state):
    async with httpx.AsyncClient() as client:
        result = await client.get(url)
    return {"result": result}
```
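When no async client exists for a dependency, a common alternative is to move the blocking call onto a worker thread with `asyncio.to_thread` (Python 3.9+), so the event loop keeps serving other tasks. This is a general asyncio technique rather than something this checklist prescribes. In the sketch, `slow_lookup` is a hypothetical stand-in for any blocking library call.

```python
import asyncio
import time


def slow_lookup(key: str) -> str:
    """Hypothetical blocking call (e.g. a sync SDK or requests.get)."""
    time.sleep(0.1)
    return f"value-for-{key}"


async def my_node(state: dict) -> dict:
    # Runs slow_lookup in the default thread pool; the event loop stays free
    result = await asyncio.to_thread(slow_lookup, state["key"])
    return {"result": result}


async def main() -> list[dict]:
    # Two nodes' worth of work overlap instead of running back to back
    return await asyncio.gather(my_node({"key": "a"}), my_node({"key": "b"}))


results = asyncio.run(main())
print(results)  # [{'result': 'value-for-a'}, {'result': 'value-for-b'}]
```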
## Tool Integration Issues
### 15. Tool Calls Without Corresponding ToolMessage
```python
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
# BAD - AI message with tool_calls but no tool execution
messages = [
    HumanMessage(content="search for X"),
    AIMessage(content="", tool_calls=[{"id": "1", "name": "search", "args": {"query": "X"}}]),
    # Missing ToolMessage! Next LLM call will fail
]
# GOOD - always pair tool_calls with ToolMessage
messages = [
    HumanMessage(content="search for X"),
    AIMessage(content="", tool_calls=[{"id": "1", "name": "search", "args": {"query": "X"}}]),
    ToolMessage(content="results", tool_call_id="1"),
]
```
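The pairing rule can also be checked during review or in tests. Scan the history for tool-call IDs that never receive a matching tool result. This sketch works over plain dicts. The `role`, `tool_calls` and `tool_call_id` keys are assumptions chosen to mirror the messages above, not LangChain's message classes.

```python
def unanswered_tool_calls(messages: list[dict]) -> list[str]:
    """Return IDs of assistant tool calls that have no later tool message."""
    pending: list[str] = []
    for message in messages:
        if message.get("role") == "assistant":
            pending.extend(call["id"] for call in message.get("tool_calls", []))
        elif message.get("role") == "tool":
            if message.get("tool_call_id") in pending:
                pending.remove(message["tool_call_id"])
    return pending


history = [
    {"role": "user", "content": "search for X"},
    {"role": "assistant", "content": "",
     "tool_calls": [{"id": "1", "name": "search", "args": {"query": "X"}}]},
]
print(unanswered_tool_calls(history))  # ['1']
history.append({"role": "tool", "content": "results", "tool_call_id": "1"})
print(unanswered_tool_calls(history))  # []
```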
### 16. Parallel Tool Calls Before Interrupt
```python
from langchain_openai import ChatOpenAI
# BAD - model may call multiple tools including interrupt
model = ChatOpenAI().bind_tools([interrupt_tool, other_tool])
# If both called in parallel, interrupt behavior is undefined
# GOOD - disable parallel tool calls before interrupt
model = ChatOpenAI().bind_tools(
    [interrupt_tool, other_tool],
    parallel_tool_calls=False,
)
```
## Checkpointing Issues
### 17. InMemorySaver in Production
```python
# BAD - in-memory checkpointer loses state on restart
graph = builder.compile(checkpointer=InMemorySaver())  # Testing only!
# GOOD - use persistent storage in production
from langgraph.checkpoint.postgres import PostgresSaver
# from_conn_string is a context manager; use the graph inside the block
with PostgresSaver.from_conn_string(conn_string) as checkpointer:
    checkpointer.setup()  # create checkpoint tables on first run
    graph = builder.compile(checkpointer=checkpointer)
```
### 18. Subgraph Checkpointer Confusion
```python
# BAD - subgraph with explicit False prevents persistence
subgraph = sub_builder.compile(checkpointer=False)
# GOOD - use None to inherit parent's checkpointer
subgraph = sub_builder.compile(checkpointer=None) # Inherits from parent
# Or True for independent checkpointing
subgraph = sub_builder.compile(checkpointer=True)
```
## Performance Issues
### 19. Large State in Every Update
```python
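# BAD - returning large data in every node
def node(state):
    large_data = fetch_large_data()
    return {"large_field": large_data}  # Checkpointed every step!
# GOOD - use references or store
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore  # e.g. builder.compile(store=InMemoryStore())
def node(state, *, store: BaseStore):
    large_data = fetch_large_data()  # store values must be dicts
    namespace, key = ("documents",), "doc-1"  # illustrative namespace/key
    store.put(namespace, key, large_data)
    return {"data_ref": key}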
```
### 20. Missing Recursion Limit Handling
```python
from typing import Annotated, TypedDict
from langgraph.graph import END
from langgraph.graph.message import add_messages
# BAD - no protection against infinite loops
def router(state):
    return "agent"  # Always loops!
# GOOD - track the step budget with RemainingSteps
from langgraph.managed import RemainingSteps
class State(TypedDict):
    messages: Annotated[list, add_messages]
    remaining_steps: RemainingSteps
def check_limit(state):
    if state["remaining_steps"] < 2:
        return END
    return "continue"
```
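The budget check works because the remaining-step count shrinks on every step. The loop below is a plain-Python simulation of that idea. It is a simplified model, not LangGraph's scheduler, and in LangGraph itself the hard stop surfaces as `GraphRecursionError`. A guarded loop exits cleanly with budget to spare, while an unguarded one runs into the limit.

```python
END = "__end__"  # same string as langgraph.graph.END


def check_limit(remaining_steps: int) -> str:
    # Mirrors the router above: stop early instead of hitting the hard limit
    return END if remaining_steps < 2 else "continue"


def simulate(recursion_limit: int, guarded: bool = True) -> int:
    """Count steps an always-looping agent takes before stopping."""
    for step in range(1, recursion_limit + 1):
        remaining = recursion_limit - step
        if guarded and check_limit(remaining) == END:
            return step  # graceful exit with budget to spare
    raise RuntimeError("recursion limit reached")  # unguarded loop ends here


print(simulate(25))  # 24
```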
## Code Review Checklist
1. [ ] State schema uses Annotated with reducers for collections
2. [ ] Nodes return partial state updates, not mutations
3. [ ] Conditional edges return valid node names or END
4. [ ] Graph has path from START to all nodes
5. [ ] Checkpointer provided if using interrupts
6. [ ] Thread ID provided in config when using checkpointer
7. [ ] Tool calls paired with ToolMessages
8. [ ] Async nodes use async operations
9. [ ] Production uses persistent checkpointer
10. [ ] Recursion limits considered for loops
---
name: langgraph-architecture
description: Guides architectural decisions for LangGraph applications. Use when deciding between LangGraph vs alternatives, choosing state management strategies, designing multi-agent systems, or selecting persistence and streaming approaches.
---
# LangGraph Architecture Decisions
## When to Use LangGraph
### Use LangGraph When You Need:
- **Stateful conversations** - Multi-turn interactions with memory
- **Human-in-the-loop** - Approval gates, corrections, interventions
- **Complex control flow** - Loops, branches, conditional routing
- **Multi-agent coordination** - Multiple LLMs working together
- **Persistence** - Resume from checkpoints, time travel debugging
- **Streaming** - Real-time token streaming, progress updates
- **Reliability** - Retries, error recovery, durability guarantees
### Consider Alternatives When:
| Scenario | Alternative | Why |
|----------|-------------|-----|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
## State Schema Decisions
### TypedDict vs Pydantic
| TypedDict | Pydantic |
|-----------|----------|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
**Recommendation**: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
### Reducer Selection
| Use Case | Reducer | Example |
|----------|---------|---------|
| Chat messages | `add_messages` | Handles IDs, RemoveMessage |
| Simple append | `operator.add` | `Annotated[list, operator.add]` |
| Keep latest | None (LastValue) | `field: str` |
| Custom merge | Lambda | `Annotated[list, lambda a, b: ...]` |
| Overwrite list | `Overwrite` | Bypass reducer |
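For the "Custom merge" row, a reducer is simply a function `(current, update) -> new value`. Below is a sketch of a hypothetical dedupe-by-id reducer that could be attached as `Annotated[list, merge_by_id]`. It assumes records are dicts with an `id` key.

```python
from typing import Annotated, TypedDict


def merge_by_id(current: list[dict], update: list[dict]) -> list[dict]:
    """Hypothetical reducer: replace records sharing an id, append new ones."""
    merged = {item["id"]: item for item in current}
    for item in update:
        merged[item["id"]] = item  # later updates win, original order kept
    return list(merged.values())


class State(TypedDict):
    sources: Annotated[list, merge_by_id]


old = [{"id": "a", "score": 1}, {"id": "b", "score": 2}]
new = [{"id": "b", "score": 5}, {"id": "c", "score": 3}]
print(merge_by_id(old, new))
# [{'id': 'a', 'score': 1}, {'id': 'b', 'score': 5}, {'id': 'c', 'score': 3}]
```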
### State Size Considerations
```python
# SMALL STATE (< 1MB) - Put in state
class State(TypedDict):
    messages: Annotated[list, add_messages]
    context: str
# LARGE DATA - Use Store
class State(TypedDict):
    messages: Annotated[list, add_messages]
    document_ref: str  # Reference to store
def node(state, *, store: BaseStore):
    doc = store.get(namespace, state["document_ref"])
    # Process without bloating checkpoints
```
## Graph Structure Decisions
### Single Graph vs Subgraphs
**Single Graph** when:
- All nodes share the same state schema
- Simple linear or branching flow
- < 10 nodes
**Subgraphs** when:
- Different state schemas needed
- Reusable components across graphs
- Team separation of concerns
- Complex hierarchical workflows
### Conditional Edges vs Command
| Conditional Edges | Command |
|------------------|---------|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
```python
# Conditional Edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
    return "a" if condition else "b"
builder.add_conditional_edges("node", router)
# Command - when combining routing with updates
def node(state) -> Command[Literal["next"]]:
    return Command(goto="next", update={"step": state["step"] + 1})
```
### Static vs Dynamic Routing
**Static Edges** (`add_edge`):
- Fixed flow known at build time
- Clearer graph visualization
- Easier to reason about
**Dynamic Routing** (`add_conditional_edges`, `Command`, `Send`):
- Runtime decisions based on state
- Agent-driven navigation
- Fan-out patterns
## Persistence Strategy
### Checkpointer Selection
| Checkpointer | Use Case | Characteristics |
|--------------|----------|-----------------|
| `InMemorySaver` | Testing only | Lost on restart |
| `SqliteSaver` | Development | Single file, local |
| `PostgresSaver` | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
### Checkpointing Scope
```python
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)
# Subgraph options (choose one)
subgraph = sub_builder.compile(checkpointer=None)   # Inherit from parent
subgraph = sub_builder.compile(checkpointer=True)   # Independent checkpointing
subgraph = sub_builder.compile(checkpointer=False)  # No checkpointing (runs atomically)
```
### When to Disable Checkpointing
- Short-lived subgraphs that should be atomic
- Subgraphs with incompatible state schemas
- Performance-critical paths without need for resume
## Multi-Agent Architecture
### Supervisor Pattern
Best for:
- Clear hierarchy
- Centralized decision making
- Different agent specializations
```
          ┌─────────────┐
          │ Supervisor  │
          └──────┬──────┘
    ┌────────┬───┴────┬────────┐
    ▼        ▼        ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘
```
### Peer-to-Peer Pattern
Best for:
- Collaborative agents
- No clear hierarchy
- Flexible communication
```
┌──────┐     ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘     └───┬──┘
   │             │
   ▼             ▼
┌──────┐     ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘     └──────┘
```
### Handoff Pattern
Best for:
- Sequential specialization
- Clear stage transitions
- Different capabilities per stage
```
┌────────┐    ┌────────┐    ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘    └────────┘    └────────┘
```
## Streaming Strategy
### Stream Mode Selection
| Mode | Use Case | Data |
|------|----------|------|
| `updates` | UI updates | Node outputs only |
| `values` | State inspection | Full state each step |
| `messages` | Chat UX | LLM tokens |
| `custom` | Progress/logs | Your data via StreamWriter |
| `debug` | Debugging | Tasks + checkpoints |
### Subgraph Streaming
```python
# Stream from subgraphs
async for chunk in graph.astream(
    input,
    stream_mode="updates",
    subgraphs=True,  # Include subgraph events
):
    namespace, data = chunk  # namespace indicates depth
```
## Human-in-the-Loop Design
### Interrupt Placement
| Strategy | Use Case |
|----------|----------|
| `interrupt_before` | Approval before action |
| `interrupt_after` | Review after completion |
| `interrupt()` in node | Dynamic, contextual pauses |
### Resume Patterns
```python
# Simple resume (same thread)
graph.invoke(None, config)
# Resume with value
graph.invoke(Command(resume="approved"), config)
# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)
# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)
```
## Gates (sequenced)
Complete **in order** before treating a LangGraph design as locked in. Each step has an objective **pass condition** (artifact or explicit “none”), not an honor-system “we considered it.”
1. **Alternatives** — **Pass:** For the workload, either (a) at least one row from [Consider Alternatives When](#consider-alternatives-when) was evaluated and rejected with a one-line reason, or (b) the use case clearly matches [Use LangGraph When You Need](#use-langgraph-when-you-need) and does not fit a “consider alternative” row.
2. **State contract** — **Pass:** Every state field has an assigned reducer (or default/LastValue) documented in the same place as the schema; large payloads are references or Store-backed, not inlined blobs (see [State Size Considerations](#state-size-considerations)).
3. **Checkpointer** — **Pass:** The saver type is chosen for the target environment per [Checkpointer Selection](#checkpointer-selection) (e.g. production is not `InMemorySaver` unless explicitly test-only).
4. **Loops and flaky nodes** — **Pass:** `recursion_limit` (or equivalent) is set for any graph that can cycle; per-node `RetryPolicy` or a documented “no retries” choice exists for external calls (see [Retry Configuration](#retry-configuration)).
## Error Handling Strategy
### Retry Configuration
```python
from langgraph.types import RetryPolicy
# Per-node retry
RetryPolicy(
    initial_interval=0.5,
    backoff_factor=2.0,
    max_interval=60.0,
    max_attempts=3,
    retry_on=lambda e: isinstance(e, (APIError, TimeoutError)),
)
# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
    RetryPolicy(retry_on=RateLimitError, max_attempts=5),
    RetryPolicy(retry_on=Exception, max_attempts=2),
])
```
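Writing out the waits implied by a set of retry settings is a quick sanity check. This is a simplified model: exponential backoff capped at `max_interval`, jitter ignored, and first-match policy selection as described above. It is not LangGraph's retry code. `Policy` is a hypothetical stand-in for `RetryPolicy`'s timing fields.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Policy:
    """Hypothetical stand-in for RetryPolicy's timing fields."""
    retry_on: type
    max_attempts: int = 3
    initial_interval: float = 0.5
    backoff_factor: float = 2.0
    max_interval: float = 60.0


def backoff_schedule(policy: Policy) -> list[float]:
    """Seconds to wait before each retry (every attempt after the first)."""
    return [
        min(policy.max_interval, policy.initial_interval * policy.backoff_factor**n)
        for n in range(policy.max_attempts - 1)
    ]


def select_policy(error: Exception, policies: list[Policy]) -> Optional[Policy]:
    """First policy whose retry_on matches the error wins."""
    for policy in policies:
        if isinstance(error, policy.retry_on):
            return policy
    return None


class RateLimitError(Exception):
    """Hypothetical provider error."""


policies = [Policy(RateLimitError, max_attempts=5), Policy(Exception, max_attempts=2)]
print(backoff_schedule(select_policy(RateLimitError(), policies)))  # [0.5, 1.0, 2.0, 4.0]
print(backoff_schedule(select_policy(ValueError(), policies)))  # [0.5]
```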
### Fallback Patterns
```python
def node_with_fallback(state):
    try:
        return primary_operation(state)
    except PrimaryError:
        return fallback_operation(state)
# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
    if state.get("error") and state["attempts"] < 3:
        return "retry"
    elif state.get("error"):
        return "fallback"
    return END
```
## Scaling Considerations
### Horizontal Scaling
- Use PostgresSaver for shared state
- Consider LangGraph Platform for managed infrastructure
- Use stores for large data outside checkpoints
### Performance Optimization
1. **Minimize state size** - Use references for large data
2. **Parallel nodes** - Fan out when possible
3. **Cache expensive operations** - Use CachePolicy
4. **Async everywhere** - Use ainvoke, astream
### Resource Limits
```python
# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)
# Track remaining steps in state
class State(TypedDict):
    remaining_steps: RemainingSteps
def check_budget(state):
    if state["remaining_steps"] < 5:
        return "wrap_up"
    return "continue"
```
## Decision Checklist
After [Gates (sequenced)](#gates-sequenced), before implementing:
1. [ ] Is LangGraph the right tool? (vs simpler alternatives)
2. [ ] State schema defined with appropriate reducers?
3. [ ] Persistence strategy chosen? (dev vs prod checkpointer)
4. [ ] Streaming needs identified?
5. [ ] Human-in-the-loop points defined?
6. [ ] Error handling and retry strategy?
7. [ ] Multi-agent coordination pattern? (if applicable)
8. [ ] Resource limits configured?
---
name: deepagents-implementation
description: Implements agents using Deep Agents. Use when building agents with create_deep_agent, configuring backends, defining subagents, adding middleware, or setting up human-in-the-loop workflows.
---
# Deep Agents Implementation
## Core Concepts
Deep Agents provides a batteries-included agent harness built on LangGraph:
- **`create_deep_agent`**: Factory function that creates a configured agent
- **Middleware**: Injected capabilities (filesystem, todos, subagents, summarization)
- **Backends**: Pluggable file storage (state, filesystem, store, composite)
- **Subagents**: Isolated task execution via the `task` tool
The returned agent is a compiled LangGraph graph (a `CompiledStateGraph`), so it works with streaming, checkpointing, and LangGraph Studio.
## Essential Imports
```python
# Core
from deepagents import create_deep_agent
# Subagents
from deepagents import CompiledSubAgent
# Backends
from deepagents.backends import (
    StateBackend,  # Ephemeral (default)
    FilesystemBackend,  # Real disk
    StoreBackend,  # Persistent cross-thread
    CompositeBackend,  # Route paths to backends
)
# LangGraph (for checkpointing, store, streaming)
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.memory import InMemoryStore
# LangChain (for custom models, tools)
from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
```
## Basic Usage
### Minimal Agent
```python
from deepagents import create_deep_agent
# Uses Claude Sonnet 4 by default
agent = create_deep_agent()
result = agent.invoke({"messages": [{"role": "user", "content": "Hello!"}]})
```
### With Custom Tools
```python
from langchain_core.tools import tool
from deepagents import create_deep_agent
@tool
def web_search(query: str) -> str:
    """Search the web for information."""
    return tavily_client.search(query)
agent = create_deep_agent(
tools=[web_search],
system_prompt="You are a research assistant. Search the web to answer questions.",
)
result = agent.invoke({"messages": [{"role": "user", "content": "What is LangGraph?"}]})
```
### With Custom Model
```python
from langchain.chat_models import init_chat_model
from deepagents import create_deep_agent
# OpenAI
model = init_chat_model("openai:gpt-4o")
# Or Anthropic with custom settings
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model_name="claude-sonnet-4-5-20250929", max_tokens=8192)
agent = create_deep_agent(model=model)
```
### With Checkpointing (Persistence)
```python
from langgraph.checkpoint.memory import InMemorySaver
from deepagents import create_deep_agent
agent = create_deep_agent(checkpointer=InMemorySaver())
# Must provide thread_id with checkpointer
config = {"configurable": {"thread_id": "user-123"}}
result = agent.invoke({"messages": [...]}, config)
# Resume conversation
result = agent.invoke({"messages": [{"role": "user", "content": "Follow up"}]}, config)
```
## Streaming
The agent supports all LangGraph stream modes.
### Stream Updates
```python
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Write a report"}]},
    stream_mode="updates",
):
    print(chunk)  # {"node_name": {"key": "value"}}
```
### Stream Messages (Token-by-Token)
```python
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "Explain quantum computing"}]},
    stream_mode="messages",
):
    # Each item is a (message_chunk, metadata) tuple; print tokens as they arrive
    print(token.content, end="", flush=True)
```
### Async Streaming
```python
async for chunk in agent.astream(
    {"messages": [...]},
    stream_mode="updates",
):
    print(chunk)
```
### Multiple Stream Modes
```python
for mode, chunk in agent.stream(
    {"messages": [...]},
    stream_mode=["updates", "messages"],
):
    if mode == "messages":
        token, metadata = chunk  # messages mode yields (message_chunk, metadata)
        print("Token:", token.content)
    else:
        print("Update:", chunk)
```
## Backend Configuration
### StateBackend (Default - Ephemeral)
Files are stored in agent state and persist only within the current thread.
```python
# Implicit - this is the default
agent = create_deep_agent()
# Explicit
from deepagents.backends import StateBackend
agent = create_deep_agent(backend=lambda rt: StateBackend(rt))
```
### FilesystemBackend (Real Disk)
Read/write actual files on disk. Enables `execute` tool for shell commands.
```python
from deepagents.backends import FilesystemBackend
agent = create_deep_agent(
backend=FilesystemBackend(root_dir="/path/to/project"),
)
```
### StoreBackend (Persistent Cross-Thread)
Uses LangGraph Store for persistence across conversations.
```python
from langgraph.store.memory import InMemoryStore
from deepagents.backends import StoreBackend
store = InMemoryStore()
agent = create_deep_agent(
backend=lambda rt: StoreBackend(rt),
store=store, # Required for StoreBackend
)
```
### CompositeBackend (Hybrid Routing)
Route different paths to different backends.
```python
from langgraph.store.memory import InMemoryStore
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
store = InMemoryStore()
agent = create_deep_agent(
backend=CompositeBackend(
default=StateBackend(), # /workspace/* → ephemeral
routes={
"/memories/": StoreBackend(store=store), # persistent
"/preferences/": StoreBackend(store=store), # persistent
},
),
store=store,
)
# Files under /memories/ persist across all conversations
# Files under /workspace/ are ephemeral per-thread
```
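Conceptually, a composite backend selects a backend by path prefix and falls back to the default when no prefix matches. The sketch below assumes longest-prefix-wins matching and uses plain strings in place of backend objects. Deep Agents' actual matching rules may differ, so `pick_backend` is a hypothetical illustration only.

```python
def pick_backend(path: str, routes: dict[str, str], default: str) -> str:
    """Return the backend for `path`: longest matching route prefix, else default."""
    matches = [prefix for prefix in routes if path.startswith(prefix)]
    if not matches:
        return default
    return routes[max(matches, key=len)]


routes = {"/memories/": "store", "/preferences/": "store"}
print(pick_backend("/memories/notes.md", routes, default="state"))  # store
print(pick_backend("/workspace/draft.md", routes, default="state"))  # state
```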
## Subagents
### Using the Default General-Purpose Agent
By default, a `general-purpose` subagent is available with all main agent tools.
```python
agent = create_deep_agent(tools=[web_search])
# The agent can now delegate via the `task` tool:
# task(subagent_type="general-purpose", prompt="Research topic X in depth")
```
### Defining Custom Subagents
```python
from deepagents import create_deep_agent
research_agent = {
"name": "researcher",
"description": "Conducts deep research on complex topics with web search",
"system_prompt": """You are an expert researcher.
Search thoroughly, cross-reference sources, and synthesize findings.""",
"tools": [web_search, document_reader],
}
code_agent = {
"name": "coder",
"description": "Writes, reviews, and debugs code",
"system_prompt": "You are an expert programmer. Write clean, tested code.",
"tools": [code_executor, linter],
"model": "openai:gpt-4o", # Optional: different model per subagent
}
agent = create_deep_agent(
subagents=[research_agent, code_agent],
system_prompt="Delegate research to the researcher and coding to the coder.",
)
```
### Pre-compiled LangGraph Subagents
Use existing LangGraph graphs as subagents.
```python
from deepagents import CompiledSubAgent, create_deep_agent
from langgraph.prebuilt import create_react_agent
# Existing graph
custom_graph = create_react_agent(
model="anthropic:claude-sonnet-4-5-20250929",
tools=[specialized_tool],
prompt="Custom workflow instructions",
)
agent = create_deep_agent(
subagents=[CompiledSubAgent(
name="custom-workflow",
description="Runs my specialized analysis workflow",
runnable=custom_graph,
)]
)
```
### Subagent with Custom Middleware
```python
from langchain.agents.middleware import AgentMiddleware
class LoggingMiddleware(AgentMiddleware):
    def transform_response(self, response):
        print(f"Subagent response: {response}")
        return response
agent_spec = {
"name": "logged-agent",
"description": "Agent with extra logging",
"system_prompt": "You are helpful.",
"tools": [],
"middleware": [LoggingMiddleware()], # Added after default middleware
}
```
## Human-in-the-Loop
### Basic Interrupt Configuration
Pause execution before specific tools for human approval.
```python
from deepagents import create_deep_agent
agent = create_deep_agent(
tools=[send_email, delete_file, web_search],
interrupt_on={
"send_email": True, # Simple interrupt
"delete_file": True, # Require approval before delete
# web_search not listed - runs without approval
},
checkpointer=checkpointer, # Required for interrupts
)
```
### Interrupt with Options
```python
agent = create_deep_agent(
tools=[send_email],
interrupt_on={
"send_email": {
"allowed_decisions": ["approve", "edit", "reject"]
},
},
checkpointer=checkpointer,
)
# Invoke - will pause at send_email
config = {"configurable": {"thread_id": "user-123"}}
result = agent.invoke({"messages": [...]}, config)
# Check state
state = agent.get_state(config)
if state.next:  # Has pending interrupt
    from langgraph.types import Command
    # Resume with approval (choose one of the three)
    agent.invoke(Command(resume={"approved": True}), config)
    # Or resume with edit (placeholder address)
    agent.invoke(Command(resume={"edited_args": {"to": "recipient@example.com"}}), config)
    # Or reject
    agent.invoke(Command(resume={"rejected": True}), config)
```
### Interrupt on Subagent Tools
```python
# Interrupts apply to subagents too
agent = create_deep_agent(
subagents=[research_agent],
interrupt_on={
"web_search": True, # Interrupt even when subagent calls it
},
checkpointer=checkpointer,
)
```
## Custom Middleware
### Middleware Structure
```python
from langchain.agents.middleware.types import (
AgentMiddleware,
ModelRequest,
ModelResponse,
)
from langchain_core.tools import tool
class MyMiddleware(AgentMiddleware):
    # Tools to inject
    tools = []
    # System prompt content to inject
    system_prompt = ""
    def transform_request(self, request: ModelRequest) -> ModelRequest:
        """Modify request before sending to model."""
        return request
    def transform_response(self, response: ModelResponse) -> ModelResponse:
        """Modify response after receiving from model."""
        return response
```
### Injecting Tools via Middleware
```python
from langchain.agents.middleware import AgentMiddleware
from langchain_core.tools import tool
from deepagents import create_deep_agent
@tool
def get_current_time() -> str:
    """Get the current time."""
    from datetime import datetime
    return datetime.now().isoformat()
class TimeMiddleware(AgentMiddleware):
    tools = [get_current_time]
    system_prompt = "You have access to get_current_time for time-sensitive tasks."
agent = create_deep_agent(middleware=[TimeMiddleware()])
```
### Context Injection Middleware
```python
class UserContextMiddleware(AgentMiddleware):
    def __init__(self, user_preferences: dict):
        self.user_preferences = user_preferences
    @property
    def system_prompt(self):
        return f"User preferences: {self.user_preferences}"
agent = create_deep_agent(
middleware=[UserContextMiddleware({"theme": "dark", "language": "en"})]
)
```
### Response Logging Middleware
```python
import logging
class LoggingMiddleware(AgentMiddleware):
    def transform_response(self, response: ModelResponse) -> ModelResponse:
        logging.info(f"Agent response: {response.messages[-1].content[:100]}...")
        return response
agent = create_deep_agent(middleware=[LoggingMiddleware()])
```
## MCP Tool Integration
Connect MCP (Model Context Protocol) servers to provide additional tools.
```python
import asyncio
import os
from langchain_mcp_adapters.client import MultiServerMCPClient
from deepagents import create_deep_agent
async def main():
    mcp_client = MultiServerMCPClient({
        "filesystem": {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path"],
            "transport": "stdio",
        },
        "github": {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-github"],
            "env": {"GITHUB_PERSONAL_ACCESS_TOKEN": os.environ["GITHUB_TOKEN"]},
            "transport": "stdio",
        },
    })
    mcp_tools = await mcp_client.get_tools()
    agent = create_deep_agent(tools=mcp_tools)
    async for chunk in agent.astream(
        {"messages": [{"role": "user", "content": "List my repos"}]}
    ):
        print(chunk)
asyncio.run(main())
```
## Implementation gates
Use this **sequenced** table for setups that touch **real disk, human interrupts, persistence, or MCP subprocesses** (skip for minimal `create_deep_agent()` smoke tests).
| Step | Pass condition |
|------|----------------|
| 1 | **FilesystemBackend or `execute`:** `root_dir` is deliberately scoped (not an accidental home or filesystem root); smoke-test in a disposable directory before trusting production paths. |
| 2 | **`interrupt_on` / resume:** A `checkpointer` is configured; every `invoke` / `astream` that may interrupt includes `config` with `configurable["thread_id"]`; after a pause, `agent.get_state(config)` shows pending interrupt state before `Command(resume=...)`, and the resume payload matches tool options (e.g. `allowed_decisions`) when set. |
| 3 | **Store, PostgreSQL saver, MCP:** Credentials come from environment or a secret manager, not committed source; MCP `command` / `args` / required `env` keys match what the deployment host actually provides (e.g. `npx`, tokens). |
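Step 1 can be made mechanical with a small guard that runs before `FilesystemBackend` is constructed. This sketch uses only `pathlib`. `check_root_dir` and its rules are assumptions for illustration, not Deep Agents behavior: it rejects the filesystem root and the user's home directory, and requires an existing directory.

```python
import tempfile
from pathlib import Path


def check_root_dir(root_dir: str) -> Path:
    """Resolve root_dir and refuse obviously over-broad or missing locations."""
    root = Path(root_dir).expanduser().resolve()
    if root == Path(root.anchor):
        raise ValueError(f"{root} is a filesystem root")
    if root == Path.home().resolve():
        raise ValueError(f"{root} is the home directory; scope it to a project")
    if not root.is_dir():
        raise ValueError(f"{root} does not exist or is not a directory")
    return root


# Smoke-test in a disposable directory first, as step 1 suggests
with tempfile.TemporaryDirectory() as sandbox:
    print(check_root_dir(sandbox).is_dir())  # True
```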
## Additional References
For detailed reference documentation, see:
- **[Built-in Tools Reference](references/tools.md)** - Complete list of tools available on every agent (filesystem, task management, subagent delegation) with path requirements
- **[Common Patterns](references/patterns.md)** - Production-ready examples including research agents with memory, code assistants with disk access, multi-specialist teams, and production PostgreSQL setup
FILE:references/patterns.md
# Common Patterns
## Research Agent with Memory
```python
from langgraph.store.memory import InMemoryStore
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
store = InMemoryStore()
agent = create_deep_agent(
tools=[web_search],
system_prompt="""You are a research assistant with persistent memory.
Save important findings to /memories/ for future reference.
Check /memories/ at the start of research tasks.""",
backend=CompositeBackend(
default=StateBackend(),
routes={"/memories/": StoreBackend(store=store)},
),
store=store,
checkpointer=checkpointer,
)
```
## Code Assistant with Disk Access
```python
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
agent = create_deep_agent(
system_prompt="You are a coding assistant. Help users with their codebase.",
backend=FilesystemBackend(root_dir="/Users/dev/project"),
)
# Agent can now read/write real files and execute shell commands
```
## Multi-Specialist Team
```python
agent = create_deep_agent(
subagents=[
{
"name": "researcher",
"description": "Deep research and fact-finding",
"system_prompt": "Research thoroughly, cite sources.",
"tools": [web_search],
},
{
"name": "writer",
"description": "Write polished content",
"system_prompt": "Write clear, engaging content.",
"tools": [],
},
{
"name": "reviewer",
"description": "Review and critique content",
"system_prompt": "Provide constructive feedback.",
"tools": [],
},
],
system_prompt="""Coordinate the team:
1. Use researcher for facts
2. Use writer to draft
3. Use reviewer to polish""",
)
```
## Production Setup
```python
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
# Production persistence: from_conn_string returns a context manager
with PostgresSaver.from_conn_string(DATABASE_URL) as checkpointer, PostgresStore.from_conn_string(DATABASE_URL) as store:
    checkpointer.setup()  # create tables on first run
    store.setup()
    agent = create_deep_agent(
        model="anthropic:claude-sonnet-4-5-20250929",
        tools=[...],
        backend=CompositeBackend(
            default=StateBackend(),
            routes={"/memories/": StoreBackend(store=store)},
        ),
        checkpointer=checkpointer,
        store=store,
    )
    # Run the agent inside this block so the connections stay open
```
FILE:references/tools.md
# Built-in Tools Reference
Every agent created with `create_deep_agent` has these tools:
## Task Management
| Tool | Description |
|------|-------------|
| `write_todos` | Create/update structured task lists |
| `read_todos` | Read current todo list state |
## Filesystem Operations
| Tool | Description |
|------|-------------|
| `ls` | List directory contents (requires absolute path) |
| `read_file` | Read file with optional offset/limit pagination |
| `write_file` | Create new file (fails if exists) |
| `edit_file` | String replacement in existing files |
| `glob` | Find files matching pattern (e.g., `**/*.py`) |
| `grep` | Search for text patterns in files |
| `execute`* | Run shell commands |
*`execute` is only available with `FilesystemBackend` or with backends implementing `SandboxBackendProtocol`.
## Subagent Delegation
| Tool | Description |
|------|-------------|
| `task` | Launch subagent for isolated task execution |
## Tool Path Requirements
All filesystem tools require absolute paths starting with `/`:
```python
# Correct
read_file(path="/workspace/main.py")
ls(path="/data")
# Incorrect - will fail
read_file(path="main.py")
read_file(path="./workspace/main.py")
```
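These rules are simple enough to check before a call ever reaches the agent. The sketch below is a hypothetical helper (`is_valid_virtual_path` is not a deepagents function) that accepts absolute Unix-style paths and rejects relative paths and Windows drive-letter or backslash paths. The validation deepagents itself performs may be stricter.

```python
import re

# Hypothetical helper, not part of deepagents: approximates the path rules above.
_WINDOWS_DRIVE = re.compile(r"^[A-Za-z]:")


def is_valid_virtual_path(path: str) -> bool:
    """Return True if `path` is an absolute, Unix-style virtual path."""
    if _WINDOWS_DRIVE.match(path) or "\\" in path:
        return False  # Windows drive letters ("C:") or backslash separators
    return path.startswith("/")  # rejects "main.py" and "./workspace/main.py"


for candidate in ("/workspace/main.py", "/data", "main.py", "./workspace/main.py"):
    print(f"{candidate!r}: {is_valid_virtual_path(candidate)}")
```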
---
name: deepagents-code-review
description: Reviews Deep Agents code for bugs, anti-patterns, and improvements. Use when reviewing code that uses create_deep_agent, backends, subagents, middleware, or human-in-the-loop patterns. Catches common configuration and usage mistakes.
---
# Deep Agents Code Review
When reviewing Deep Agents code, check for these categories of issues.
## Review gates (evidence-bound)
Run these steps in order before and while you write findings. Skipping a step is a failed review.
1. **Locate** — Enumerate call sites in scope (`create_deep_agent`, `CompiledSubAgent`, `CompositeBackend`, custom `backend=`, `interrupt_on`, `checkpointer`, `store`). **Pass:** You list each relevant **file path** and **line number** (or a grep/search result that proves where the code lives).
2. **Anchor** — For each suspected issue, tie it to **quoted or line-referenced code** from those files, not to imports or names alone. **Pass:** Every finding includes **evidence** (`path:line` plus a short quote or “absent parameter” note showing the gap).
3. **Classify** — Map each anchored issue to one category below (Critical → Performance) and a severity. **Pass:** The category label matches what the cited code actually does or omits.
4. **Runtime claims** (anything you say will error, fail at runtime, or leak data). **Pass:** The cited snippet shows the exact API combination (e.g. `interrupt_on` set with no `checkpointer` in the same construction path), or you mark the claim **uncertain** and state what would confirm it.
If you cannot satisfy step 1, stop and say what file or search is missing instead of inferring issues from memory.
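For the **Locate** step, Python's standard `ast` module can list call sites with line numbers so you are not working from memory. The following is a minimal sketch: the function name and the `agent.py` path are illustrative. It also flags the pattern from issue 1 below, a `create_deep_agent(...)` call that passes `interrupt_on=` but no `checkpointer=` keyword. It only inspects keywords in the same call, so a checkpointer supplied through `**kwargs` or a wrapper still shows up as a hit. Treat results as candidates for the **Anchor** step, not as confirmed findings.

```python
import ast

SOURCE = '''
from deepagents import create_deep_agent

agent = create_deep_agent(
    tools=[send_email],
    interrupt_on={"send_email": True},
)
safe = create_deep_agent(interrupt_on={"x": True}, checkpointer=saver)
'''


def find_interrupt_without_checkpointer(source: str, path: str = "<memory>") -> list:
    """Return `path:line` for create_deep_agent(...) calls that pass
    interrupt_on= but no checkpointer= keyword in the same call."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Handles both create_deep_agent(...) and module.create_deep_agent(...)
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        if name != "create_deep_agent":
            continue
        keywords = {kw.arg for kw in node.keywords}  # kw.arg is None for **kwargs
        if "interrupt_on" in keywords and "checkpointer" not in keywords:
            findings.append(f"{path}:{node.lineno}")
    return findings


print(find_interrupt_without_checkpointer(SOURCE, "agent.py"))  # ['agent.py:4']
```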
## Critical Issues
### 1. Missing Checkpointer with interrupt_on
```python
# BAD - interrupt_on without checkpointer
agent = create_deep_agent(
tools=[send_email],
interrupt_on={"send_email": True},
# No checkpointer! Interrupts will fail
)
# GOOD - checkpointer required for interrupts
from langgraph.checkpoint.memory import InMemorySaver
agent = create_deep_agent(
tools=[send_email],
interrupt_on={"send_email": True},
checkpointer=InMemorySaver(),
)
```
### 2. Missing Store with StoreBackend
```python
# BAD - StoreBackend without store
from deepagents.backends import StoreBackend
agent = create_deep_agent(
backend=lambda rt: StoreBackend(rt),
# No store! Will raise ValueError at runtime
)
# GOOD - provide store
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
agent = create_deep_agent(
backend=lambda rt: StoreBackend(rt),
store=store,
)
```
### 3. Missing thread_id with Checkpointer
```python
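from langgraph.checkpoint.memory import InMemorySaver

# BAD - no thread_id when using checkpointer
agent = create_deep_agent(checkpointer=InMemorySaver())
agent.invoke({"messages": [...]})  # Fails: the checkpointer needs configurable.thread_id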
# GOOD - always provide thread_id
config = {"configurable": {"thread_id": "user-123"}}
agent.invoke({"messages": [...]}, config)
```
### 4. Relative Paths in Filesystem Tools
```python
# BAD - relative paths not supported
read_file(path="src/main.py")
read_file(path="./config.json")
# GOOD - absolute paths required
read_file(path="/workspace/src/main.py")
read_file(path="/config.json")
```
### 5. Windows Paths in Virtual Filesystem
```python
# BAD - Windows paths rejected
read_file(path="C:\\Users\\file.txt")
write_file(path="D:/projects/code.py", content="...")
# GOOD - Unix-style virtual paths
read_file(path="/workspace/file.txt")
write_file(path="/projects/code.py", content="...")
```
## Backend Issues
### 6. StateBackend Expecting Persistence
```python
# BAD - expecting files to persist across threads
agent = create_deep_agent() # Uses StateBackend by default
# Thread 1
agent.invoke({"messages": [...]}, {"configurable": {"thread_id": "a"}})
# Agent writes to /data/report.txt
# Thread 2 - file won't exist!
agent.invoke({"messages": [...]}, {"configurable": {"thread_id": "b"}})
# Agent tries to read /data/report.txt - NOT FOUND
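# GOOD - use StoreBackend or CompositeBackend for cross-thread persistence
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
agent = create_deep_agent(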
backend=CompositeBackend(
default=StateBackend(),
routes={"/data/": StoreBackend(store=store)},
),
store=store,
)
```
### 7. FilesystemBackend Without root_dir Restriction
```python
# BAD - unrestricted filesystem access
agent = create_deep_agent(
backend=FilesystemBackend(root_dir="/"), # Full system access!
)
# GOOD - scope to project directory
agent = create_deep_agent(
backend=FilesystemBackend(root_dir="/home/user/project"),
)
```
### 8. CompositeBackend Route Order Confusion
```python
# RISKY - overlapping prefixes make the result depend on match order
agent = create_deep_agent(
    backend=CompositeBackend(
        default=StateBackend(),
        routes={
            "/mem/": backend_a,  # a first-match router would also send /mem/long-term/ here
            "/mem/long-term/": backend_b,  # reached only because longer prefixes are tried first
        },
    ),
)
# GOOD - CompositeBackend sorts routes by prefix length (longest first),
# but prefer non-overlapping prefixes so the intent is obvious:
agent = create_deep_agent(
backend=CompositeBackend(
default=StateBackend(),
routes={
"/memories/": persistent_backend,
"/workspace/": ephemeral_backend,
},
),
)
```
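The difference between first-match and longest-prefix routing is easy to see in a few lines. This is a hypothetical model: `route` is not a deepagents API, and backends are represented by name strings. It illustrates the length-sorted lookup that the GOOD comment attributes to `CompositeBackend` and nothing else about that class.

```python
# Hypothetical model of prefix routing; names are illustrative, not deepagents APIs.
def route(path: str, routes: dict, default: str) -> str:
    """Return the backend name for `path`, trying longer prefixes first."""
    for prefix in sorted(routes, key=len, reverse=True):
        if path.startswith(prefix):
            return routes[prefix]
    return default


routes = {"/mem/": "backend_a", "/mem/long-term/": "backend_b"}
print(route("/mem/long-term/notes.md", routes, "state"))  # backend_b
print(route("/mem/scratch.md", routes, "state"))  # backend_a
print(route("/workspace/x.py", routes, "state"))  # state
```

A first-match router that iterated `routes` in insertion order would send the first path to `backend_a`, which is the shadowing the RISKY example warns about.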
### 9. Expecting execute Tool Without SandboxBackend
```python
# BAD - execute tool won't work with StateBackend
agent = create_deep_agent() # Default StateBackend
# Agent calls execute("ls -la") → Error: not supported
# GOOD - use FilesystemBackend for shell execution
agent = create_deep_agent(
backend=FilesystemBackend(root_dir="/project"),
)
# Agent calls execute("ls -la") → Works
```
## Subagent Issues
### 10. Subagent Missing Required Fields
```python
# BAD - missing required fields
agent = create_deep_agent(
subagents=[{
"name": "helper",
# Missing: description, system_prompt, tools
}]
)
# GOOD - all required fields present
agent = create_deep_agent(
subagents=[{
"name": "helper",
"description": "General helper for misc tasks",
"system_prompt": "You are a helpful assistant.",
"tools": [], # Can be empty but must be present
}]
)
```
### 11. Subagent Name Collision
```python
# BAD - duplicate subagent names
agent = create_deep_agent(
    subagents=[
        {"name": "research", "description": "A", "system_prompt": "...", "tools": []},
        {"name": "research", "description": "B", "system_prompt": "...", "tools": []},  # Collision!
    ]
)
# GOOD - unique names
agent = create_deep_agent(
    subagents=[
        {"name": "web-research", "description": "Web-based research", "system_prompt": "...", "tools": []},
        {"name": "doc-research", "description": "Document research", "system_prompt": "...", "tools": []},
    ]
)
```
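Issues 10 and 11 can be caught before the agent is built. The helper below is a hypothetical pre-flight check that encodes the required-field list exactly as this guide states it (including `tools`). Adjust `REQUIRED_KEYS` if your deepagents version treats a field as optional.

```python
REQUIRED_KEYS = ("name", "description", "system_prompt", "tools")


def validate_subagents(specs: list) -> list:
    """Return human-readable problems found in a list of subagent dicts."""
    problems = []
    seen = set()
    for index, spec in enumerate(specs):
        missing = [key for key in REQUIRED_KEYS if key not in spec]
        if missing:
            problems.append(f"subagent[{index}] missing: {', '.join(missing)}")
        name = spec.get("name")
        if name is None:
            continue  # already reported as missing
        if name in seen:
            problems.append(f"subagent[{index}] duplicate name: {name!r}")
        seen.add(name)
    return problems


print(validate_subagents([{"name": "helper"}]))
# ['subagent[0] missing: description, system_prompt, tools']
```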
### 12. Overusing Subagents for Simple Tasks
```python
# BAD - subagent overhead for trivial task
# In system prompt or agent behavior:
"Use the task tool to check the current time"
"Delegate file reading to a subagent"
# GOOD - use subagents for complex, isolated work
"Use the task tool for multi-step research that requires many searches"
"Delegate the full analysis workflow to a subagent"
```
### 13. CompiledSubAgent Without Proper State
```python
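from typing import Annotated, TypedDict

from deepagents import CompiledSubAgent
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages

# BAD - subgraph with incompatible state schema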
class CustomState(TypedDict):
custom_field: str # No messages field!
sub_builder = StateGraph(CustomState)
# ... build graph
subgraph = sub_builder.compile()
agent = create_deep_agent(
subagents=[CompiledSubAgent(
name="custom",
description="Custom workflow",
runnable=subgraph, # State mismatch!
)]
)
# GOOD - ensure compatible state or use message-based interface
class CompatibleState(TypedDict):
messages: Annotated[list, add_messages]
custom_field: str
```
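When reviewing issue 13, you can confirm the schema mechanically. This sketch uses only stdlib `typing`. `has_messages_channel` is a hypothetical helper, and the `add_messages` reducer is replaced by a string stand-in so the code runs without LangGraph installed. A `messages` key is necessary for the message-based interface described above, but its presence does not prove that every other field is compatible.

```python
from typing import Annotated, TypedDict, get_type_hints


class CustomState(TypedDict):
    custom_field: str  # no messages key


class CompatibleState(TypedDict):
    # "add_messages" is a string stand-in for LangGraph's reducer in this sketch.
    messages: Annotated[list, "add_messages"]
    custom_field: str


def has_messages_channel(schema: type) -> bool:
    """Return True if a TypedDict state schema declares a `messages` key."""
    return "messages" in get_type_hints(schema)


print(has_messages_channel(CustomState), has_messages_channel(CompatibleState))  # False True
```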
## Middleware Issues
### 14. Middleware Order Misunderstanding
```python
# BAD - expecting custom middleware to run first
class PreProcessMiddleware(AgentMiddleware):
def transform_request(self, request):
# Expecting this runs before built-in middleware
return request
agent = create_deep_agent(middleware=[PreProcessMiddleware()])
# Actually runs AFTER TodoList, Filesystem, SubAgent, etc.
# GOOD - understand middleware runs after built-in stack
# Built-in order:
# 1. TodoListMiddleware
# 2. FilesystemMiddleware
# 3. SubAgentMiddleware
# 4. SummarizationMiddleware
# 5. AnthropicPromptCachingMiddleware
# 6. PatchToolCallsMiddleware
# 7. YOUR MIDDLEWARE HERE
# 8. HumanInTheLoopMiddleware (if interrupt_on set)
```
### 15. Middleware Mutating Request/Response
```python
# BAD - mutating instead of returning new object
class BadMiddleware(AgentMiddleware):
def transform_request(self, request):
request.messages.append(extra_message) # Mutation!
return request
# GOOD - return modified copy
class GoodMiddleware(AgentMiddleware):
def transform_request(self, request):
return ModelRequest(
messages=[*request.messages, extra_message],
**other_fields
)
```
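If the request type is a dataclass, `dataclasses.replace` produces the "modified copy" without listing every other field by hand (the job the `**other_fields` placeholder stands in for). The sketch uses a hypothetical frozen `Request` dataclass, not the langchain class. Check whether your langchain version's request type is a dataclass, or offers its own copy helper, before relying on this approach.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class Request:
    """Hypothetical stand-in for a model request; not the langchain class."""
    messages: Tuple[str, ...]
    model: str


def add_message(request: Request, extra: str) -> Request:
    # replace() copies every field and overrides only `messages`,
    # so the caller's original request is left untouched.
    return replace(request, messages=(*request.messages, extra))


original = Request(messages=("hi",), model="demo")
updated = add_message(original, "extra context")
print(original.messages)  # ('hi',)
print(updated.messages)  # ('hi', 'extra context')
```

Making the dataclass `frozen=True` turns an accidental in-place assignment into a `FrozenInstanceError`. Note that freezing does not stop in-place mutation of a mutable field such as a list, which is why the sketch stores messages as a tuple.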
### 16. Middleware Tools Without Descriptions
```python
from langchain_core.tools import tool

# BAD - tool without docstring
@tool
def my_tool(arg: str) -> str:
    return process(arg)

class MyMiddleware(AgentMiddleware):
    tools = [my_tool]  # LLM gets no usable description; @tool may also reject a docstring-less function
# GOOD - descriptive docstring
@tool
def my_tool(arg: str) -> str:
"""Process the input string and return formatted result.
Args:
arg: The string to process
Returns:
Formatted result string
"""
return process(arg)
```
## System Prompt Issues
### 17. Duplicating Built-in Tool Instructions
```python
# BAD - re-explaining what middleware already covers
agent = create_deep_agent(
system_prompt="""You have access to these tools:
- write_todos: Create task lists
- read_file: Read files from the filesystem
- task: Delegate to subagents
When using files, always use absolute paths..."""
)
# This duplicates what FilesystemMiddleware and TodoListMiddleware inject!
# GOOD - focus on domain-specific guidance
agent = create_deep_agent(
system_prompt="""You are a code review assistant.
Workflow:
1. Read the files to review
2. Create a todo list of issues found
3. Delegate deep analysis to subagents if needed
4. Compile findings into a report"""
)
```
### 18. Contradicting Built-in Instructions
```python
# BAD - contradicting default behavior
agent = create_deep_agent(
system_prompt="""Never use the task tool.
Always process everything in the main thread.
Don't use todos, just remember everything."""
)
# Fighting against the framework!
# GOOD - work with the framework
agent = create_deep_agent(
system_prompt="""For simple tasks, handle directly.
For complex multi-step research, use subagents.
Track progress with todos for tasks with 3+ steps."""
)
```
### 19. Missing Stopping Criteria
```python
# BAD - no guidance on when to stop
agent = create_deep_agent(
system_prompt="Research everything about the topic thoroughly."
)
# Agent may run indefinitely!
# GOOD - define completion criteria
agent = create_deep_agent(
system_prompt="""Research the topic with these constraints:
- Maximum 5 web searches
- Stop when you have 3 reliable sources
- Limit subagent delegations to 2 parallel tasks
- Summarize findings within 500 words"""
)
```
## Performance Issues
### 20. Not Parallelizing Independent Subagents
```python
# BAD - sequential subagent calls (in agent behavior)
# Agent calls: task(research topic A) → wait → task(research topic B) → wait
# GOOD - parallel subagent calls
# Agent calls in single turn:
# task(research topic A)
# task(research topic B)
# task(research topic C)
# All run concurrently!
# Guide via system prompt:
agent = create_deep_agent(
system_prompt="""When researching multiple topics,
launch all research subagents in parallel in a single response."""
)
```
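The latency difference is easiest to see with an asyncio analogy. This sketch is not how deepagents schedules tool calls internally. `run_subagent` is a hypothetical stand-in for one `task` call, with a 0.1 s sleep simulating I/O-bound work.

```python
import asyncio
import time


async def run_subagent(topic: str) -> str:
    """Hypothetical stand-in for one subagent (`task`) call."""
    await asyncio.sleep(0.1)  # simulate I/O-bound work
    return f"findings on {topic}"


async def sequential(topics: list) -> list:
    # Each call waits for the previous one: total time ~ sum of calls.
    return [await run_subagent(t) for t in topics]


async def parallel(topics: list) -> list:
    # gather() runs all coroutines concurrently and preserves input order.
    return list(await asyncio.gather(*(run_subagent(t) for t in topics)))


topics = ["A", "B", "C"]
for strategy in (sequential, parallel):
    start = time.perf_counter()
    results = asyncio.run(strategy(topics))
    print(strategy.__name__, f"{time.perf_counter() - start:.2f}s", results)
```

Sequential dispatch costs roughly the sum of the individual calls, while concurrent dispatch costs roughly the slowest single call. The same reasoning applies when the agent emits several `task` calls in one turn.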
### 21. Large Files in State
```python
# BAD - writing large files to StateBackend
# Agent writes 10MB log file to /output/full_log.txt
# This bloats every checkpoint!
# GOOD - use FilesystemBackend for large files or paginate
agent = create_deep_agent(
backend=CompositeBackend(
default=StateBackend(), # Small files
routes={
"/large_files/": FilesystemBackend(root_dir="/tmp/agent"),
},
),
)
```
### 22. InMemorySaver in Production
```python
# BAD - ephemeral checkpointer in production
agent = create_deep_agent(
checkpointer=InMemorySaver(), # Lost on restart!
)
# GOOD - persistent checkpointer
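from langgraph.checkpoint.postgres import PostgresSaver

# from_conn_string() is a context manager; keep the agent inside the block
with PostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
    checkpointer.setup()  # create tables on first run
    agent = create_deep_agent(checkpointer=checkpointer)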
```
### 23. Missing Recursion Awareness
```python
# BAD - no guard against long-running loops
agent = create_deep_agent(
system_prompt="Keep improving the solution until it's perfect."
)
# May hit recursion limit (default 1000)
# GOOD - explicit iteration limits
agent = create_deep_agent(
system_prompt="""Improve the solution iteratively:
- Maximum 3 revision cycles
- Stop if quality score > 90%
- Stop if no improvement after 2 iterations"""
)
```
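When the loop runs in your own code (for example, a wrapper that calls the agent and scores each result), you can enforce the same limits outside the prompt, where the model cannot ignore them. This is a hedged sketch: `refine`, `score_fn`, and `improve_fn` are hypothetical, and scores are assumed to lie in [0, 1], so 90% becomes 0.9. LangGraph's `recursion_limit` config key remains the hard backstop on graph steps.

```python
from typing import Callable, Tuple


def refine(
    score_fn: Callable[[str], float],
    improve_fn: Callable[[str], str],
    draft: str,
    max_cycles: int = 3,
    target: float = 0.9,
    patience: int = 2,
) -> Tuple[str, float]:
    """Improve `draft` under the three stopping rules in the GOOD prompt above."""
    best, best_score = draft, score_fn(draft)
    stale = 0  # consecutive cycles without improvement
    for _ in range(max_cycles):  # rule 1: at most `max_cycles` revisions
        if best_score > target:  # rule 2: good enough, stop early
            break
        candidate = improve_fn(best)
        candidate_score = score_fn(candidate)
        if candidate_score > best_score:
            best, best_score, stale = candidate, candidate_score, 0
        else:
            stale += 1
            if stale >= patience:  # rule 3: no improvement after `patience` tries
                break
    return best, best_score
```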
## Code Review Checklist
See [`references/checklist.md`](references/checklist.md) for the full per-area checklist (Configuration, Backends, Subagents, Middleware, System Prompt, Performance). Run it after the **Review gates** and the numbered issue catalogue above.
FILE:references/checklist.md
# Code Review Checklist
Use after working through the **Review gates** and the numbered issue catalogue in `../SKILL.md`. Each box ties back to a specific issue (see the `SKILL.md` anchors).
## Configuration
- [ ] Checkpointer provided if using `interrupt_on`
- [ ] Store provided if using `StoreBackend`
- [ ] Thread ID provided in config when using checkpointer
- [ ] Backend appropriate for use case (ephemeral vs persistent)
## Backends
- [ ] FilesystemBackend scoped to safe `root_dir`
- [ ] StoreBackend has corresponding `store` parameter
- [ ] CompositeBackend routes don't shadow each other unintentionally
- [ ] Not expecting persistence from StateBackend across threads
## Subagents
- [ ] All required fields present (name, description, system_prompt, tools)
- [ ] Unique subagent names
- [ ] CompiledSubAgent has compatible state schema
- [ ] Subagents used for complex tasks, not trivial operations
## Middleware
- [ ] Custom middleware added after built-in stack (expected behavior)
- [ ] Tools have descriptive docstrings
- [ ] Not mutating request/response objects
## System Prompt
- [ ] Not duplicating built-in tool instructions
- [ ] Not contradicting framework defaults
- [ ] Stopping criteria defined for open-ended tasks
- [ ] Parallelization guidance for independent tasks
## Performance
- [ ] Large files routed to appropriate backend
- [ ] Production uses persistent checkpointer
- [ ] Recursion/iteration limits considered
- [ ] Independent subagents parallelized
---
name: deepagents-architecture
description: Guides architectural decisions for Deep Agents applications. Use when deciding between Deep Agents vs alternatives, choosing backend strategies, designing subagent systems, or selecting middleware approaches.
---
# Deep Agents Architecture Decisions
## When to Use Deep Agents
### Use Deep Agents When You Need:
- **Long-horizon tasks** - Complex workflows spanning dozens of tool calls
- **Planning capabilities** - Task decomposition before execution
- **Filesystem operations** - Reading, writing, and editing files
- **Subagent delegation** - Isolated task execution with separate context windows
- **Persistent memory** - Long-term storage across conversations
- **Human-in-the-loop** - Approval gates for sensitive operations
- **Context management** - Auto-summarization for long conversations
### Consider Alternatives When:
| Scenario | Alternative | Why |
|----------|-------------|-----|
| Single LLM call | Direct API call | Deep Agents overhead not justified |
| Simple RAG pipeline | LangChain LCEL | Simpler abstraction |
| Custom graph control flow | LangGraph directly | More flexibility |
| No file operations needed | `create_react_agent` | Lighter weight |
| Stateless tool use | Function calling | No middleware needed |
## Backend Selection
### Backend Comparison
| Backend | Persistence | Use Case | Requires |
|---------|-------------|----------|----------|
| `StateBackend` | Ephemeral (per-thread) | Working files, temp data | Nothing (default) |
| `FilesystemBackend` | Disk | Local development, real files | `root_dir` path |
| `StoreBackend` | Cross-thread | User preferences, knowledge bases | LangGraph `store` |
| `CompositeBackend` | Mixed | Hybrid memory patterns | Multiple backends |
### Backend Decision Tree
```
Need real disk access?
├─ Yes → FilesystemBackend(root_dir="/path")
└─ No
└─ Need persistence across conversations?
├─ Yes → Need mixed ephemeral + persistent?
│ ├─ Yes → CompositeBackend
│ └─ No → StoreBackend
└─ No → StateBackend (default)
```
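The tree reduces to three yes/no questions. Encoding it as a function makes the choice easy to record in a design note or test. The function below is hypothetical and returns backend class names only; it does not construct backends.

```python
def choose_backend(needs_disk: bool, needs_persistence: bool, needs_mixed: bool = False) -> str:
    """Walk the Backend Decision Tree above and return a backend class name."""
    if needs_disk:
        return "FilesystemBackend"
    if not needs_persistence:
        return "StateBackend"  # the default
    return "CompositeBackend" if needs_mixed else "StoreBackend"


print(choose_backend(needs_disk=False, needs_persistence=True, needs_mixed=True))  # CompositeBackend
```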
### CompositeBackend Routing
Route different paths to different storage backends:
```python
from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()  # use a persistent store (e.g. PostgresStore) in production

agent = create_deep_agent(
    backend=CompositeBackend(
        default=StateBackend(),  # Working files (ephemeral)
        routes={
            "/memories/": StoreBackend(store=store),  # Persistent
            "/preferences/": StoreBackend(store=store),  # Persistent
        },
    ),
    store=store,
)
```
## Subagent Architecture
### When to Use Subagents
**Use subagents when:**
- Task is complex, multi-step, and can run independently
- Task requires heavy context that would bloat the main thread
- Multiple independent tasks can run in parallel
- You need isolated execution (sandboxing)
- You only care about the final result, not intermediate steps
**Don't use subagents when:**
- Task is trivial (few tool calls)
- You need to see intermediate reasoning
- Splitting adds latency without benefit
- Task depends on main thread state mid-execution
### Subagent Patterns
#### Pattern 1: Parallel Research
```
┌─────────────┐
│ Orchestrator│
└──────┬──────┘
┌──────────┼──────────┐
▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐
│Task A│ │Task B│ │Task C│
└──┬───┘ └──┬───┘ └──┬───┘
└──────────┼──────────┘
▼
┌─────────────┐
│ Synthesize │
└─────────────┘
```
Best for: Research on multiple topics, parallel analysis, batch processing.
#### Pattern 2: Specialized Agents
```python
research_agent = {
"name": "researcher",
"description": "Deep research on complex topics",
"system_prompt": "You are an expert researcher...",
"tools": [web_search, document_reader],
}
coder_agent = {
"name": "coder",
"description": "Write and review code",
"system_prompt": "You are an expert programmer...",
"tools": [code_executor, linter],
}
agent = create_deep_agent(subagents=[research_agent, coder_agent])
```
Best for: Domain-specific expertise, different tool sets per task type.
#### Pattern 3: Pre-compiled Subagents
```python
from deepagents import CompiledSubAgent, create_deep_agent
from langgraph.prebuilt import create_react_agent
# Use existing LangGraph graph as subagent
custom_graph = create_react_agent(model=..., tools=...)
agent = create_deep_agent(
subagents=[CompiledSubAgent(
name="custom-workflow",
description="Runs specialized workflow",
runnable=custom_graph
)]
)
```
Best for: Reusing existing LangGraph graphs, complex custom workflows.
## Middleware Architecture
### Built-in Middleware Stack
Deep Agents applies middleware in this order:
1. **TodoListMiddleware** - Task planning with `write_todos`/`read_todos`
2. **FilesystemMiddleware** - File ops: `ls`, `read_file`, `write_file`, `edit_file`, `glob`, `grep`, `execute`
3. **SubAgentMiddleware** - Delegation via `task` tool
4. **SummarizationMiddleware** - Auto-summarizes at ~85% context or 170k tokens
5. **AnthropicPromptCachingMiddleware** - Caches system prompts (Anthropic only)
6. **PatchToolCallsMiddleware** - Fixes dangling tool calls from interruptions
7. **HumanInTheLoopMiddleware** - Pauses for approval (if `interrupt_on` configured)
### Custom Middleware Placement
```python
from langchain.agents.middleware import AgentMiddleware
class MyMiddleware(AgentMiddleware):
tools = [my_custom_tool]
def transform_request(self, request):
# Modify system prompt, inject context
return request
def transform_response(self, response):
# Post-process, log, filter
return response
# Custom middleware is appended AFTER the built-in stack (HumanInTheLoopMiddleware, if configured, still comes last)
agent = create_deep_agent(middleware=[MyMiddleware()])
```
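A toy model helps when reasoning about where a custom hook sits. The sketch below treats each middleware as a hypothetical function that appends its name to the request, and applies the list in order. It assumes custom middleware lands between `PatchToolCallsMiddleware` and `HumanInTheLoopMiddleware` (the order given in issue 14 of the Deep Agents code-review guide). It models only request-side ordering, not response hooks or tool registration.

```python
from typing import Callable, List

Transform = Callable[[List[str]], List[str]]


def tag(name: str) -> Transform:
    """Hypothetical middleware: records its name on the request it passes on."""
    return lambda request: [*request, name]


def apply_stack(stack: List[Transform], request: List[str]) -> List[str]:
    # Each transform receives the previous one's output, so list order
    # is the order in which request-side hooks run.
    for transform in stack:
        request = transform(request)
    return request


built_in = [tag(n) for n in ("TodoList", "Filesystem", "SubAgent",
                             "Summarization", "AnthropicPromptCaching", "PatchToolCalls")]
stack = built_in + [tag("MyMiddleware")] + [tag("HumanInTheLoop")]
print(apply_stack(stack, []))
```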
### Middleware vs Tools Decision
| Need | Use Middleware | Use Tools |
|------|----------------|-----------|
| Inject system prompt content | ✅ | ❌ |
| Add tools dynamically | ✅ | ❌ |
| Transform requests/responses | ✅ | ❌ |
| Standalone capability | ❌ | ✅ |
| User-invokable action | ❌ | ✅ |
### Subagent Middleware Inheritance
Subagents receive their own middleware stack by default:
- TodoListMiddleware
- FilesystemMiddleware (shared backend)
- SummarizationMiddleware
- AnthropicPromptCachingMiddleware
- PatchToolCallsMiddleware
Override with `default_middleware=[]` in SubAgentMiddleware or per-subagent `middleware` key.
## Gates: architecture decisions before implementation
Complete **in order**. A step **passes** only when the stated artifact exists in the design note, ADR stub, or ticket; internal intent alone does not count.
1. **Fit** - Confirm Deep Agents vs alternatives (see tables above).
- **Pass:** Short written rationale that either names one matching "Use Deep Agents When You Need" bullet **or** one "Consider Alternatives" row plus the chosen alternative.
2. **Backend** - Match the Backend Decision Tree to a concrete choice.
- **Pass:** Backend name(s) from the Backend Comparison table; if `FilesystemBackend` or `CompositeBackend`, `root_dir` and any route prefixes are written down (path placeholders OK).
3. **Subagents** - Decide delegation boundaries.
- **Pass:** Either "no subagents" plus one sentence why **or** a named list where each subagent maps to at least one "When to Use Subagents" reason; parallel plans state what merges outputs.
4. **Human-in-the-loop** - Approval surface.
- **Pass:** Explicit list of tools/operations that use `interrupt_on`, **or** "no HITL" plus one-line risk acceptance.
5. **Middleware** - Custom vs built-in only.
- **Pass:** Either "custom middleware: none" **or** each custom piece named, placed after the built-in stack, and tied to prompt injection, tools, or request/response transforms.
6. **Context** - Long threads and large inputs.
- **Pass:** Stated plan for default summarization behavior (~85% context / ~170k tokens) or an alternative cap; large files handled via references/chunking or equivalent, named in text.
7. **Checkpointing** - Resume and durability.
- **Pass:** Checkpoint/checkpointer approach named for the graph **or** "none" with one-line rationale (e.g. ephemeral demo only).
---
name: tokio-async-code-review
description: Reviews tokio async runtime usage for task management, sync primitives, channel patterns, and runtime configuration. Covers Rust 2024 edition changes including async fn in traits, RPIT lifetime capture, LazyLock, and if-let temporary scoping. Use when reviewing Rust code that uses tokio, async/await patterns, spawn, channels, or async synchronization. Also covers tokio-util, tower, and hyper integration patterns.
---
# Tokio Async Code Review
## Review Workflow
1. **Check Cargo.toml** — Note tokio feature flags (`full`, `rt-multi-thread`, `macros`, `sync`, etc.). Missing features cause confusing compile errors.
2. **Check runtime setup** — Is `#[tokio::main]` or manual runtime construction used? Multi-thread vs current-thread?
3. **Scan for blocking** — Search for `std::fs`, `std::net`, `std::thread::sleep`, CPU-heavy loops in async functions.
4. **Check channel usage** — Match channel type to communication pattern (mpsc, broadcast, oneshot, watch).
5. **Check sync primitives** — Verify correct mutex type, proper guard lifetimes, no deadlock potential.
## Gates (objective passes before conclusions)
Complete in order for the review scope. Do not assert **Critical** or **Major** until the relevant gate passes.
1. **Dependency surface** — Read the crate (and workspace, if inherited) `Cargo.toml` that supplies `tokio`. **Pass:** Written note of `tokio` version and enabled features, or explicit statement that there is no direct `tokio` dependency and where it comes from (workspace/path).
2. **Runtime model** — Locate runtime construction (`#[tokio::main]`, `Runtime::builder`, tests, or library with no owned runtime). **Pass:** One line naming flavor (`multi_thread` / `current_thread` / tests-only / none) and where it is defined.
3. **Blocking inventory** — Search reviewed paths for blocking APIs (`std::fs::`, `std::net::` without async wrappers, `std::thread::sleep`, heavy CPU loops in `async fn`). **Pass:** Each hit listed as `path:line` (or tool output excerpt), or explicit “no blocking patterns found in reviewed async code” after the search.
4. **Protocol** — Load `beagle-rust:review-verification-protocol`. **Pass:** Its pass conditions met before any finding is reported (file:line evidence for asserted issues).
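For the **Blocking inventory** gate, a short script can produce the `path:line` list. This is a textual heuristic, not a Rust parser, and `blocking_inventory` is a hypothetical helper. It misses aliased imports (`use std::fs;` followed by `fs::read(...)`). It also flags every `std::net::` path, including non-blocking types such as `SocketAddr`, and it does not know whether a hit sits inside async code, so triage each result by hand.

```python
import re
from pathlib import Path

# Patterns from the Blocking inventory gate above.
BLOCKING = re.compile(r"std::fs::|std::net::|std::thread::sleep")


def blocking_inventory(root: str) -> list:
    """Return `path:line: text` for each blocking-API hit in .rs files under root."""
    hits = []
    for path in sorted(Path(root).rglob("*.rs")):
        text = path.read_text(encoding="utf-8")
        for lineno, line in enumerate(text.splitlines(), start=1):
            if BLOCKING.search(line):
                hits.append(f"{path}:{lineno}: {line.strip()}")
    return hits
```

Running it as `print("\n".join(blocking_inventory("src")) or "no blocking patterns found")` produces either the hit list or the explicit "none found" statement that the gate requires.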
## Output Format
Report findings as:
```text
[FILE:LINE] ISSUE_TITLE
Severity: Critical | Major | Minor | Informational
Description of the issue and why it matters.
```
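If findings are collected programmatically, a small formatter keeps them in exactly this shape and rejects unknown severities. The helper below is a hypothetical sketch; the example file, line, and text are illustrative.

```python
SEVERITIES = ("Critical", "Major", "Minor", "Informational")


def format_finding(path: str, line: int, title: str, severity: str, description: str) -> str:
    """Render one finding in the report format above."""
    if severity not in SEVERITIES:
        raise ValueError(f"unknown severity: {severity!r}")
    return f"[{path}:{line}] {title}\nSeverity: {severity}\n{description}"


print(format_finding("src/main.rs", 42, "Blocking sleep in async fn", "Critical",
                     "std::thread::sleep blocks a runtime worker thread."))
```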
## Quick Reference
| Issue Type | Reference |
|------------|-----------|
| Task spawning, JoinHandle, structured concurrency | [references/task-management.md](references/task-management.md) |
| Mutex, RwLock, Semaphore, Notify, Barrier | [references/sync-primitives.md](references/sync-primitives.md) |
| mpsc, broadcast, oneshot, watch channel patterns | [references/channels.md](references/channels.md) |
| Pin, cancellation, Future internals, select!, blocking bridge | [references/pinning-cancellation.md](references/pinning-cancellation.md) |
## Review Checklist
### Runtime Configuration
- [ ] Tokio features in Cargo.toml match actual usage
- [ ] Runtime flavor matches workload (`multi_thread` for I/O-bound, `current_thread` for simpler cases)
- [ ] `#[tokio::test]` used for async tests (not manual runtime construction)
- [ ] Worker thread count configured appropriately for production
### Task Management
- [ ] `spawn` return values (`JoinHandle`) are tracked, not silently dropped
- [ ] `spawn_blocking` used for CPU-heavy or synchronous I/O operations
- [ ] Tasks respect cancellation (via `CancellationToken`, `select!`, or shutdown channels)
- [ ] `JoinError` (task panic or cancellation) is handled, not just unwrapped
- [ ] `tokio::select!` branches are cancellation-safe
- [ ] Native `async fn` in traits used instead of `async-trait` crate where possible (stable since Rust 1.75)
- [ ] RPIT lifetime capture reviewed in async contexts — `-> impl Future` now captures all in-scope lifetimes in edition 2024
### Sync Primitives
- [ ] `tokio::sync::Mutex` used when lock is held across `.await`; `std::sync::Mutex` for short non-async sections
- [ ] No `std::sync::Mutex` guard held across `.await` points (deadlock risk; the future also becomes `!Send`)
- [ ] `Semaphore` used for limiting concurrent operations (not ad-hoc counters)
- [ ] `RwLock` used when read-heavy workload (many readers, infrequent writes)
- [ ] `Notify` used for simple signaling instead of a channel (avoids channel overhead)
- [ ] `std::sync::LazyLock` used instead of `once_cell::sync::Lazy` or `lazy_static!` for runtime-initialized singletons (stable since Rust 1.80)
- [ ] `if let` lock guard patterns reviewed for edition 2024 temporary scoping — temporaries drop earlier, may change borrow validity
### Channels
- [ ] Channel type matches pattern: mpsc for back-pressure, broadcast for fan-out, oneshot for request-response, watch for latest-value
- [ ] Bounded channels have appropriate capacity (too small risks stalls or deadlock; too large wastes memory)
- [ ] `SendError` / `RecvError` handled (indicates other side dropped)
- [ ] Broadcast `Lagged` errors handled (receiver fell behind)
- [ ] Channel senders dropped when done to signal completion to receivers
### Timer and Sleep
- [ ] `tokio::time::sleep` used instead of `std::thread::sleep`
- [ ] `tokio::time::timeout` wraps operations that could hang
- [ ] `tokio::time::interval` used correctly (`.tick().await` for periodic work)
## Severity Calibration
### Critical
- Blocking I/O (`std::fs::read`, `std::net::TcpStream`) in async context without `spawn_blocking`
- `std::sync::Mutex` guard held across an `.await` point (deadlock potential)
- `std::thread::sleep` in async function (blocks runtime thread)
- Unbounded channel where back-pressure is needed (OOM risk)
### Major
- `JoinHandle` silently dropped (lost errors, zombie tasks)
- Missing `select!` cancellation safety consideration
- Wrong mutex type (std vs tokio) for the use case
- Missing timeout on network/external operations
### Minor
- `tokio::spawn` for trivially small async blocks (overhead > benefit)
- Overly large channel buffer without justification
- Manual runtime construction where `#[tokio::main]` suffices
- `std::sync::Mutex` where contention is high enough to benefit from tokio's async mutex
### Informational
- Suggestions to use `tokio-util` utilities (e.g., `CancellationToken`)
- Tower middleware patterns for service composition
- Structured concurrency with `JoinSet`
- Migration from `async-trait` crate to native `async fn` in traits
- Migration from `once_cell` / `lazy_static` to `std::sync::LazyLock`
- Using `#[expect(lint)]` instead of `#[allow(lint)]` for self-cleaning suppression
## Valid Patterns (Do NOT Flag)
- **`std::sync::Mutex` for short critical sections** — tokio docs recommend this when no `.await` is inside the lock
- **`tokio::spawn` without explicit join** — Valid for background tasks with proper shutdown signaling
- **Bounded channel with capacity 1**: valid for synchronization barriers (tokio channels cannot be truly unbuffered; capacity 0 panics)
- **`#[tokio::main(flavor = "current_thread")]` in simple binaries** — Not every app needs multi-thread runtime
- **`clone()` on `Arc<T>` before `spawn`** — Required for moving into tasks, not unnecessary cloning
- **Large broadcast channel capacity** — Valid when lagged errors are expensive (event sourcing)
- **Native `async fn` in traits without `async-trait`** — Stable since 1.75; the crate is still valid for `dyn` dispatch cases
- **`+ use<'a>` on `-> impl Future` returns** — Correct edition 2024 precise capture syntax to limit lifetime capture
- **`#[expect(clippy::type_complexity)]` on complex async types** — Self-cleaning alternative to `#[allow]`, warns when suppression is no longer needed
## Before Submitting Findings
After **Gates**, apply `beagle-rust:review-verification-protocol` to every reported issue (evidence and dispositions per that skill).
FILE:references/channels.md
# Channels
## Choosing the Right Channel
| Pattern | Channel | Key Property |
|---------|---------|--------------|
| Many producers → one consumer, back-pressure | `mpsc` | Bounded; `send().await` waits while the buffer is full |
| One value, one time | `oneshot` | Request-response, task result |
| Every consumer gets every message | `broadcast` | Fan-out, event bus |
| Latest value, no queue | `watch` | Config changes, state snapshots |
## mpsc (Multi-Producer Single-Consumer)
The most common channel. Use bounded for back-pressure, unbounded only when you have external flow control.
```rust
// Bounded - preferred, provides back-pressure
let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(100);
// Unbounded - use with caution (OOM risk)
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
```
### Capacity Sizing
- Too small (1-10): senders block frequently, throughput suffers
- Too large (100K+): memory pressure, defeats back-pressure purpose
- Rule of thumb: 2-4x the expected burst size
### Graceful Shutdown via Drop
When all senders are dropped and the buffered messages have been received, `rx.recv()` returns `None`. This is the idiomatic way to signal "no more items."
```rust
// Producer side
drop(tx); // signals completion
// Consumer side
while let Some(item) = rx.recv().await {
process(item);
}
// Loop exits when all senders dropped
```
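Tokio's `mpsc` follows the same drop-to-close contract as the standard library's channels, so the shutdown rules can be exercised without a runtime. This is a std-only sketch. `std::sync::mpsc::sync_channel` stands in for a bounded tokio channel, OS threads stand in for tasks, and `drain_until_closed` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send 1..=per_thread, then drains
/// the channel. Returns (sum, count) of everything received.
pub fn drain_until_closed(producers: usize, per_producer: u64) -> (u64, usize) {
    // Bounded channel: send() blocks while 8 items are queued (back-pressure).
    let (tx, rx) = mpsc::sync_channel::<u64>(8);
    let mut handles = Vec::new();
    for _ in 0..producers {
        let tx = tx.clone(); // each producer owns its own sender clone
        handles.push(thread::spawn(move || {
            for i in 1..=per_producer {
                if tx.send(i).is_err() {
                    break; // receiver dropped: stop producing
                }
            }
            // this clone is dropped when the thread finishes
        }));
    }
    // Drop the original sender; otherwise the loop below never ends.
    drop(tx);
    let mut sum = 0;
    let mut count = 0;
    // Iteration stops once every sender is gone and the buffer is empty.
    for item in rx {
        sum += item;
        count += 1;
    }
    for h in handles {
        h.join().expect("producer panicked");
    }
    (sum, count)
}
```

If the `drop(tx)` line is removed, the `for` loop waits forever after the producers finish. That is the std equivalent of keeping a stray sender clone alive.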
### Common Mistakes
```rust
// BAD - holding tx clone prevents shutdown
let tx_clone = tx.clone();
drop(tx);
// rx.recv() will never return None because tx_clone still exists
// BAD - send without handling closed channel
tx.send(item).await.unwrap(); // panics if receiver dropped
// GOOD - handle send errors
if tx.send(item).await.is_err() {
tracing::warn!("receiver dropped, stopping producer");
break;
}
```
## broadcast
Every active subscriber receives every message. Messages are stored in a shared ring buffer.
```rust
let (tx, _rx) = tokio::sync::broadcast::channel::<Event>(16_384);
// Each subscriber gets their own receiver
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
```
### Handling Lag
When a receiver falls behind, older messages are overwritten. The receiver gets `RecvError::Lagged(n)` indicating how many messages were missed.
```rust
loop {
match rx.recv().await {
Ok(event) => handle(event),
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(missed = n, "receiver lagged, some events lost");
// Continue processing — data may be stale
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
```
### Capacity Considerations
Broadcast stores messages until the slowest receiver consumes them (up to capacity). Size the buffer for the slowest expected consumer, not the average case.
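A toy model makes the lag semantics concrete. This is not tokio's implementation: `RingModel` and `Recv` are hypothetical, single-receiver names. They only mirror the observable behavior described above:

- sends never wait;
- when the buffer is full, the oldest slot is overwritten;
- a lagging receiver first gets a lag count, then resumes at the oldest value still retained.

```rust
use std::collections::VecDeque;

/// Outcome of one receive, shaped like broadcast's `Ok` / `Lagged` results.
#[derive(Debug, PartialEq)]
pub enum Recv {
    Value(u64),
    Lagged(u64),
    Empty,
}

/// Toy single-receiver model of a broadcast ring buffer.
pub struct RingModel {
    capacity: usize,
    buf: VecDeque<(u64, u64)>, // (sequence number, value)
    next_seq: u64,             // sequence number for the next send
    cursor: u64,               // next sequence number the receiver wants
}

impl RingModel {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        RingModel { capacity, buf: VecDeque::new(), next_seq: 0, cursor: 0 }
    }

    /// Never waits: when the buffer is full the oldest value is overwritten.
    pub fn send(&mut self, value: u64) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
        }
        self.buf.push_back((self.next_seq, value));
        self.next_seq += 1;
    }

    pub fn recv(&mut self) -> Recv {
        let oldest = match self.buf.front() {
            Some(&(seq, _)) => seq,
            None => return Recv::Empty,
        };
        if self.cursor < oldest {
            // Fell behind: report the gap, then resume at the oldest retained value.
            let missed = oldest - self.cursor;
            self.cursor = oldest;
            return Recv::Lagged(missed);
        }
        match self.buf.get((self.cursor - oldest) as usize) {
            Some(&(_, value)) => {
                self.cursor += 1;
                Recv::Value(value)
            }
            None => Recv::Empty,
        }
    }
}
```

With capacity 2 and five sends, the receiver gets `Lagged(3)` and then the last two values.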
## oneshot
Single value, single use. The sender can only send once, and the receiver can only receive once.
```rust
let (tx, rx) = tokio::sync::oneshot::channel::<Result<Response, Error>>();
// Responder
tokio::spawn(async move {
let result = compute().await;
let _ = tx.send(result); // receiver may have been dropped
});
// Requester
match rx.await {
Ok(result) => handle(result),
Err(_) => tracing::error!("responder dropped without sending"),
}
```
### Common Pattern: Request-Response
```rust
struct Request {
data: InputData,
reply: oneshot::Sender<Result<OutputData, Error>>,
}
// Client
let (tx, rx) = oneshot::channel();
request_tx.send(Request { data, reply: tx }).await?;
let response = rx.await??;
// Server
while let Some(req) = request_rx.recv().await {
let result = process(req.data).await;
let _ = req.reply.send(result);
}
```
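std has no `oneshot`, but a fresh `std::sync::mpsc::channel` used for exactly one reply gives the same request-response shape. This is a minimal std-only sketch with threads standing in for tasks. `Request`, `process`, and `round_trip` are hypothetical names; `process` is made-up server logic.

```rust
use std::sync::mpsc;
use std::thread;

/// A request carries its own reply channel (std stand-in for oneshot::Sender).
struct Request {
    data: u64,
    reply: mpsc::Sender<Result<u64, String>>,
}

/// Hypothetical server logic: doubles even numbers, rejects odd ones.
fn process(data: u64) -> Result<u64, String> {
    if data % 2 == 0 {
        Ok(data * 2)
    } else {
        Err(format!("odd input {data}"))
    }
}

pub fn round_trip(inputs: &[u64]) -> Vec<Result<u64, String>> {
    let (request_tx, request_rx) = mpsc::sync_channel::<Request>(16);
    let server = thread::spawn(move || {
        // The server loop ends when every request sender is dropped.
        for req in request_rx {
            // Ignore the error: the client may have stopped waiting.
            let _ = req.reply.send(process(req.data));
        }
    });
    let mut out = Vec::new();
    for &data in inputs {
        let (tx, rx) = mpsc::channel();
        request_tx.send(Request { data, reply: tx }).expect("server stopped");
        // recv() fails if the server dropped the reply sender without answering.
        out.push(rx.recv().unwrap_or_else(|_| Err("no reply".to_string())));
    }
    drop(request_tx);
    server.join().expect("server panicked");
    out
}
```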
## watch
Holds the latest value. Receivers see only the most recent value, not a queue. Good for config changes or state snapshots.
```rust
let (tx, rx) = tokio::sync::watch::channel(Config::default());
// Update
tx.send(new_config)?;
// Read latest (non-blocking)
let current = rx.borrow().clone();
// Wait for changes
let mut rx = rx.clone();
loop {
rx.changed().await?;
let config = rx.borrow().clone();
apply_config(config);
}
```
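Latest-value semantics can be modeled with std alone: a `Mutex` holding the value plus a version counter, and a `Condvar` to wake waiters. `WatchCell` is a hypothetical toy, not tokio's implementation. It shows why a reader that waits for a change sees only the newest value, never the intermediate ones.

```rust
use std::sync::{Condvar, Mutex};

/// Toy latest-value cell: `send` overwrites, readers track the version they saw.
pub struct WatchCell<T> {
    state: Mutex<(T, u64)>, // (current value, version)
    changed: Condvar,
}

impl<T: Clone> WatchCell<T> {
    pub fn new(initial: T) -> Self {
        WatchCell { state: Mutex::new((initial, 0)), changed: Condvar::new() }
    }

    /// Replaces the value; unseen earlier values are simply lost (no queue).
    pub fn send(&self, value: T) {
        let mut state = self.state.lock().unwrap();
        state.0 = value;
        state.1 += 1;
        self.changed.notify_all();
    }

    /// Like `borrow().clone()`: returns the current value without waiting.
    pub fn latest(&self) -> (T, u64) {
        let state = self.state.lock().unwrap();
        (state.0.clone(), state.1)
    }

    /// Like `changed()`: blocks until the version differs from `seen`.
    pub fn wait_changed(&self, seen: u64) -> (T, u64) {
        let state = self
            .changed
            .wait_while(self.state.lock().unwrap(), |s| s.1 == seen)
            .unwrap();
        (state.0.clone(), state.1)
    }
}
```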
## Dual-Channel Pattern (Event Bus)
For systems that need both real-time fan-out and durable persistence, combine broadcast (real-time) with mpsc (persistence):
```rust
struct EventBus {
broadcast_tx: broadcast::Sender<Arc<Event>>,
persist_tx: mpsc::Sender<Arc<Event>>,
}
impl EventBus {
async fn emit(&self, event: Event) {
let event = Arc::new(event);
// Fan-out to all subscribers (best-effort)
let _ = self.broadcast_tx.send(Arc::clone(&event));
// Durable events go to persistence channel (back-pressure aware)
if event.event_type.is_durable() {
if let Err(e) = self.persist_tx.send(event).await {
tracing::error!(error = %e, "persistence channel closed");
}
}
}
}
```
## Review Questions
1. Is the channel type matched to the communication pattern?
2. Are bounded channels sized appropriately for the workload?
3. Are `SendError` / `RecvError` handled (not unwrapped)?
4. Is broadcast `Lagged` error handled gracefully?
5. Are all sender clones dropped to allow clean shutdown?
6. Is `watch` used instead of broadcast for latest-value-only patterns?
FILE:references/pinning-cancellation.md
# Pinning, Cancellation, and Async Internals
## Pin<P<T>> Semantics
`Pin<P>` wraps a pointer type `P` (e.g., `&mut T`, `Box<T>`) and guarantees the target `T` will not move after being pinned. Required for self-referential types like async state machines, where internal references would be invalidated by a move.
- `Pin::new_unchecked()` is unsafe -- caller must guarantee the referent won't move
- `get_unchecked_mut()` is unsafe -- caller must not move `T` through the returned `&mut T`
- `Pin` always implements `Deref<Target = T>` safely (shared refs can't move `T`)
- **When required**: Futures from `async fn`/blocks (self-referential across `.await` points), any struct storing data and pointers into that data
### Unpin Trait
`Unpin` is an auto-trait indicating a type is safe to move out of a `Pin`. Most standard types are `Unpin`. Compiler-generated futures from `async` blocks are `!Unpin`.
```rust
use std::future::ready;
use std::pin::Pin;
// Unpin types can use the safe Pin::new constructor
let mut fut = ready(42);
let pinned = Pin::new(&mut fut); // safe, ready() is Unpin
// !Unpin types require heap or stack pinning
let fut = async { do_work().await };
let pinned = Box::pin(fut); // heap pinning, always safe
```
## Stack vs Heap Pinning
**`Box::pin(value)`** (heap): Always safe. Allocates on the heap, so the `Pin` can move freely without moving `T`.
**`std::pin::pin!(value)`** (stack, stable since 1.68): Avoids heap allocation. The macro takes ownership of the value and returns a `Pin<&mut T>` pointing into the current stack frame, so the pinned value cannot outlive that frame.
```rust
use std::pin::pin;
// GOOD - stack pinning with std::pin::pin! (stable 1.68+)
let fut = pin!(async { long_running().await });
fut.await;
// GOOD - heap pinning when you need to store or move the pinned future
let fut = Box::pin(async { long_running().await });
tokio::spawn(fut);
```
Flag when: `pin_mut!` from `pin-utils` or `futures` crate is used instead of `std::pin::pin!` on Rust 1.68+.
## Future Trait Internals
The actual `Future` trait requires pinning and a waker context:
```rust
trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
```
- **`Poll::Pending`**: Future cannot make progress. Must arrange for `cx.waker().wake()` to be called when progress is possible.
- **`Poll::Ready(T)`**: Future has resolved. Do not poll again (may panic).
- **Waker contract**: If `poll` returns `Pending`, the future must ensure `wake()` is called eventually. Leaf futures store the waker where the event source can trigger it.
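The contract above is small enough to exercise with std alone. This sketch is a toy, not how tokio schedules tasks, and `YieldOnce`, `ThreadWaker`, and `block_on` are hypothetical names. A leaf future calls `wake_by_ref()` before returning `Pending`. A one-future executor pins the future with `std::pin::pin!`, polls it, and parks the thread until the waker unparks it.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Leaf future: returns Pending once, then Ready(7).
pub struct YieldOnce {
    yielded: bool,
}

impl YieldOnce {
    pub fn new() -> Self {
        YieldOnce { yielded: false }
    }
}

impl Future for YieldOnce {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.yielded {
            Poll::Ready(7)
        } else {
            self.yielded = true;
            // Waker contract: arrange a wake-up before returning Pending.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-future executor. Returns the output and the number of polls.
pub fn block_on<F: Future>(fut: F) -> (F::Output, u32) {
    let mut fut = pin!(fut); // stack pinning: `fut` is now Pin<&mut F>
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return (out, polls),
            // Sleep until a waker calls unpark(); spurious wakeups just re-poll.
            Poll::Pending => thread::park(),
        }
    }
}
```

An `async` block with no `.await` completes on its first poll. `YieldOnce` needs exactly two polls.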
## Executor Model
Executors manage tasks (top-level futures) and decide which to poll when wakers fire.
- **Work-stealing (tokio multi-thread)**: Each worker thread has its own run queue, backed by a shared injection queue; idle workers steal tasks from busy workers' queues. Good for I/O-bound workloads with many tasks.
- **Single-threaded (tokio current_thread)**: One thread polls all tasks. Simpler and lower overhead, but no parallelism. `tokio::spawn` still requires `Send` futures here; `!Send` futures need `block_on` or `spawn_local` inside a `LocalSet`.
Check for: Blocking operations in async context. A future that runs >1ms without yielding `Pending` starves other tasks on the same executor thread.
## Stream Trait (Async Iteration)
`Stream` (from `futures` crate) is the async equivalent of `Iterator`, with `poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>`.
**Use `Stream` when**: Producing a sequence of values from a single source (file chunks, database rows, transformed events).
**Use channels when**: Multiple producers need to send to one consumer, or you need back-pressure across task boundaries.
Valid pattern: `tokio_stream::StreamExt` for combinators like `.map()`, `.filter()`, `.throttle()` on streams.
## Cancellation Patterns
### Drop as Cancellation
Dropping a future cancels it. This is fundamental to how `tokio::select!` works: unselected branches are dropped.
```rust
// Dropping the JoinHandle does NOT cancel the task (it detaches)
let handle = tokio::spawn(work());
drop(handle); // task keeps running!
// To cancel a spawned task, call abort()
let handle = tokio::spawn(work());
handle.abort(); // requests cancellation; the task stops the next time it yields to the runtime
```
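What dropping a future does can be observed without a runtime. In this std-only sketch, `Never`, `DropFlag`, and `cancel_by_drop` are hypothetical helpers. An async block is polled once, suspends at an `.await`, and is then dropped. The code after the `.await` never runs, and locals held in the future are dropped at that point. The same thing happens to the losing branch of `select!` and to an aborted task's future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for polling by hand.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Future that never completes on its own.
struct Never;
impl Future for Never {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Bumps a counter when dropped, so cleanup can be observed.
struct DropFlag(Arc<AtomicUsize>);
impl Drop for DropFlag {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls an async block once, then drops it while it is suspended.
/// Returns (steps that ran, destructors that ran).
pub fn cancel_by_drop() -> (usize, usize) {
    let steps = Arc::new(AtomicUsize::new(0));
    let drops = Arc::new(AtomicUsize::new(0));
    let (s, d) = (Arc::clone(&steps), Arc::clone(&drops));

    let mut fut = Box::pin(async move {
        let _flag = DropFlag(d);
        s.fetch_add(1, Ordering::SeqCst); // runs on the first poll
        Never.await; // suspends here
        s.fetch_add(1, Ordering::SeqCst); // never reached
    });

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    drop(fut); // cancellation: `_flag` is dropped, the second step never runs
    (steps.load(Ordering::SeqCst), drops.load(Ordering::SeqCst))
}
```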
### CancellationToken + Graceful Shutdown
Hierarchical cancellation for coordinated shutdown. Child tokens cancel when parents do. Combine with `JoinSet` for clean drain.
```rust
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
let mut set = JoinSet::new();
for worker in workers {
let child = token.child_token();
set.spawn(async move {
tokio::select! {
_ = child.cancelled() => worker.drain().await,
_ = worker.run() => {}
}
});
}
// On SIGTERM: cancel parent, then drain all tasks
token.cancel();
while let Some(result) = set.join_next().await {
result.expect("worker panicked");
}
```
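The parent/child relationship can be modeled with a chain of atomic flags. `Token` is a hypothetical toy whose method names echo tokio-util's `CancellationToken` (`child_token`, `cancel`, `is_cancelled`). Unlike the real token, it can only be polled, not awaited. A token counts as cancelled when its own flag or any ancestor's flag is set. Cancelling a child therefore never affects its parent or its siblings.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Toy hierarchical token: ancestors' flags first, its own flag last.
/// Cloning shares the same flags, so a clone is the same token.
#[derive(Clone)]
pub struct Token {
    flags: Vec<Arc<AtomicBool>>,
}

impl Token {
    pub fn new() -> Self {
        Token { flags: vec![Arc::new(AtomicBool::new(false))] }
    }

    /// A child shares every ancestor flag and adds a flag of its own.
    pub fn child_token(&self) -> Self {
        let mut flags = self.flags.clone();
        flags.push(Arc::new(AtomicBool::new(false)));
        Token { flags }
    }

    /// Sets only this token's own flag; ancestors and siblings are unaffected.
    pub fn cancel(&self) {
        self.flags
            .last()
            .expect("token always has a flag")
            .store(true, Ordering::Release);
    }

    /// Cancelled if this token or any ancestor has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.flags.iter().any(|f| f.load(Ordering::Acquire))
    }
}
```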
## select! Deep Semantics
- **Branch priority**: When multiple branches are ready simultaneously, `tokio::select!` picks one randomly by default. Use `biased;` to evaluate top-to-bottom.
- **Cancellation safety**: Unselected branches are dropped. A future is cancellation-safe if dropping it at any `.await` point doesn't lose data.
```rust
// RISKY - read_exact buffers internally, partial reads lost on cancel
tokio::select! {
result = reader.read_exact(&mut buf) => { ... }
_ = token.cancelled() => { return; }
}
// SAFER - use cancellation-safe recv()
tokio::select! {
msg = rx.recv() => { ... }
_ = token.cancelled() => { return; }
}
```
Flag when: `read_exact`, `read_to_end`, or custom buffering futures appear in `select!` branches without cancellation safety analysis.
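A model of internal buffering shows why `read_exact` loses data. This sketch is std-only and hypothetical: `ChunkSource`, `ReadExactish`, and `cancel_mid_read` are not tokio types. The read future moves bytes out of the source into its own private buffer. If it is dropped before completing, as the losing branch of `select!` would be, those bytes are gone from both places.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical byte source that hands out one chunk per poll.
pub struct ChunkSource {
    chunks: VecDeque<Vec<u8>>,
}

/// NOT cancellation-safe: bytes taken from the source sit in this
/// future's private buffer until `n` bytes have arrived.
pub struct ReadExactish<'a> {
    src: &'a mut ChunkSource,
    n: usize,
    buf: Vec<u8>,
}

impl Future for ReadExactish<'_> {
    type Output = Vec<u8>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        let this = &mut *self; // fine: every field is Unpin
        if let Some(chunk) = this.src.chunks.pop_front() {
            this.buf.extend(chunk);
        }
        if this.buf.len() >= this.n {
            Poll::Ready(std::mem::take(&mut this.buf))
        } else {
            cx.waker().wake_by_ref(); // toy: ask to be polled again
            Poll::Pending
        }
    }
}

/// Polls the read once (one chunk arrives), then drops it the way
/// `select!` drops a losing branch. Returns what the source still holds.
pub fn cancel_mid_read() -> Vec<u8> {
    let mut src = ChunkSource {
        chunks: VecDeque::from(vec![vec![1, 2], vec![3, 4]]),
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    {
        let mut read = ReadExactish { src: &mut src, n: 4, buf: Vec::new() };
        assert!(Pin::new(&mut read).poll(&mut cx).is_pending());
    } // `read` dropped here: bytes [1, 2] vanish with its buffer
    src.chunks.into_iter().flatten().collect()
}
```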
## Blocking Bridge
| Situation | Use | Why |
|-----------|-----|-----|
| CPU-heavy or sync I/O from async context | `spawn_blocking` | Runs on dedicated blocking thread pool, won't starve async workers |
| Already on a tokio runtime thread, need to block briefly | `block_in_place` | Converts current thread to blocking temporarily, avoids extra thread spawn |
| Need to run async code from sync context | `Handle::block_on` | Blocks current thread until the future completes |
```rust
// spawn_blocking: preferred for most blocking work
let hash = tokio::task::spawn_blocking(move || {
compute_hash(&data)
}).await?;
// block_in_place: only on multi-thread runtime, avoids thread pool queue
tokio::task::block_in_place(|| {
std::fs::write(path, &data)?;
Ok::<_, std::io::Error>(())
})?;
```
Flag when: `block_in_place` is used on `current_thread` runtime (it will panic).
## Memory Ordering in Async Context
Async task scheduling provides happens-before relationships:
- `tokio::spawn` establishes happens-before between the spawning code and the spawned task's first poll
- `.await` on a `JoinHandle` establishes happens-before between the task's completion and the awaiting code
- Waker mechanics ensure proper ordering between `wake()` calls and subsequent `poll()`
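When using atomics across async tasks, `Ordering::Relaxed` is sufficient for standalone counters and statistics whose value does not guard other data. Task scheduling only synchronizes at the points listed above (spawn, join, wake), not while two tasks run concurrently on different worker threads. A flag that publishes other data (for example "config is ready") therefore still needs a `Release` store paired with an `Acquire` load, exactly as with threads. The same applies to custom lock-free structures shared across tasks.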
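The difference between a standalone counter and a publishing flag can be sketched with std threads. The same reasoning applies to tokio tasks running on different worker threads. `publish_and_read` and `count_relaxed` are hypothetical names used for illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

/// Publishes a payload through a flag. The Release store pairs with the
/// Acquire load: once the reader sees `ready == true`, it also sees the
/// payload written before the store.
pub fn publish_and_read(value: u64) -> u64 {
    let data = Arc::new(AtomicU64::new(0));
    let ready = Arc::new(AtomicBool::new(false));
    let (d, r) = (Arc::clone(&data), Arc::clone(&ready));
    let writer = thread::spawn(move || {
        d.store(value, Ordering::Relaxed); // payload
        r.store(true, Ordering::Release); // publish
    });
    while !ready.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    let seen = data.load(Ordering::Relaxed); // ordered after the Acquire load
    writer.join().unwrap();
    seen
}

/// A standalone counter needs atomicity only, so Relaxed is enough.
pub fn count_relaxed(threads: u64, per_thread: u64) -> u64 {
    let counter = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let c = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    c.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        // join() synchronizes with the thread's exit, so its increments are visible
        h.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}
```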
## Review Questions
1. Are `!Unpin` futures pinned correctly before polling (via `Box::pin` or `std::pin::pin!`)?
2. Is state held across `.await` points kept small (everything held across an await is stored inside the future, and large futures are expensive to move)?
3. Is cancellation handled explicitly (CancellationToken, abort, or select) for long-running tasks?
4. Are `select!` branches cancellation-safe, or is data loss possible on cancel?
5. Is `spawn_blocking` used for blocking work instead of running it directly in async context?
6. Is `block_in_place` avoided on `current_thread` runtime?
7. Are dropped `JoinHandle`s intentional (detached tasks) or accidental (lost errors)?
FILE:references/sync-primitives.md
# Sync Primitives
## Choosing the Right Primitive
| Need | Primitive | Notes |
|------|-----------|-------|
| Exclusive access to data | `Mutex<T>` | `tokio::sync` if held across await; `std::sync` for short sections |
| Read-heavy, write-rare access | `RwLock<T>` | Multiple concurrent readers, exclusive writers |
| Limit concurrent operations | `Semaphore` | Rate limiting, connection pooling |
| Signal one waiter | `Notify` | Lightweight, no data transfer |
| Signal all waiters | `Notify` + `notify_waiters()` | Broadcast wake-up |
| One-time initialization | `std::sync::OnceLock` / `tokio::sync::OnceCell` | Value set once at runtime; tokio's `OnceCell` supports async initialization |
| Lazy static values | `std::sync::LazyLock` | Replaces `once_cell::sync::Lazy` and `lazy_static!` (stable 1.80) |
## tokio::sync::Mutex vs std::sync::Mutex
**Use `tokio::sync::Mutex` when:**
- Lock is held across `.await` points
- Lock contention is high and you don't want to block a runtime thread
- The protected section does async I/O
**Use `std::sync::Mutex` when:**
- Critical section is short and contains no `.await`
- The lock must also be taken from synchronous code (acquiring it needs no `.await`)
- Performance matters and the lock is rarely contended
```rust
// std::sync::Mutex — good for fast, non-async access
// (full paths avoid importing two types named `Mutex` into one scope)
struct Counter(std::sync::Mutex<u64>);
impl Counter {
fn increment(&self) {
let mut count = self.0.lock().unwrap();
*count += 1;
// guard dropped immediately — no await in between
}
}
// tokio::sync::Mutex — needed when holding across await
use std::collections::HashMap;
struct Cache(tokio::sync::Mutex<HashMap<String, Data>>);
impl Cache {
async fn get_or_fetch(&self, key: &str) -> Data {
let mut cache = self.0.lock().await;
if let Some(data) = cache.get(key) {
return data.clone();
}
// CAUTION: holding lock across await
let data = fetch(key).await;
cache.insert(key.to_owned(), data.clone());
data
}
}
```
## Semaphore
Limits the number of concurrent operations. Useful for connection pools, rate limiting, and resource management.
```rust
use tokio::sync::Semaphore;
use std::sync::Arc;
let semaphore = Arc::new(Semaphore::new(10)); // max 10 concurrent
for item in items {
let permit = semaphore.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
process(item).await;
drop(permit); // explicitly release, or let it drop
});
}
```
For try-acquire (non-blocking):
```rust
match semaphore.try_acquire() {
Ok(permit) => { /* proceed */ }
Err(_) => { /* at capacity, back off */ }
}
```
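The permit discipline can be sketched with std alone. The sketch uses a counter guarded by a `Mutex`, a `Condvar` for waiters, and an RAII permit that returns its slot on drop. `Sem`, `Permit`, and `peak_concurrency` are hypothetical names. This is a toy that illustrates the bound. It does not replace `tokio::sync::Semaphore`, which suspends tasks instead of blocking threads.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Toy counting semaphore.
pub struct Sem {
    permits: Mutex<usize>,
    cond: Condvar,
}

/// RAII permit: returns its slot when dropped.
pub struct Permit<'a>(&'a Sem);

impl Sem {
    pub fn new(n: usize) -> Self {
        Sem { permits: Mutex::new(n), cond: Condvar::new() }
    }

    /// Blocks until a permit is free.
    pub fn acquire(&self) -> Permit<'_> {
        let mut p = self.cond.wait_while(self.permits.lock().unwrap(), |p| *p == 0).unwrap();
        *p -= 1;
        Permit(self)
    }

    /// Non-blocking variant: None when at capacity.
    pub fn try_acquire(&self) -> Option<Permit<'_>> {
        let mut p = self.permits.lock().unwrap();
        if *p == 0 {
            return None;
        }
        *p -= 1;
        Some(Permit(self))
    }
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        *self.0.permits.lock().unwrap() += 1;
        self.0.cond.notify_one();
    }
}

/// Runs `jobs` threads through a semaphore of size `limit` and returns
/// the highest number of jobs observed running at once.
pub fn peak_concurrency(limit: usize, jobs: usize) -> usize {
    let sem = Arc::new(Sem::new(limit));
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..jobs)
        .map(|_| {
            let (sem, running, peak) = (Arc::clone(&sem), Arc::clone(&running), Arc::clone(&peak));
            thread::spawn(move || {
                let _permit = sem.acquire();
                let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(5));
                running.fetch_sub(1, Ordering::SeqCst);
                // `_permit` is dropped here, freeing the slot
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    peak.load(Ordering::SeqCst)
}
```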
## RwLock
Allows multiple concurrent readers or one exclusive writer.
```rust
use tokio::sync::RwLock;
let config = Arc::new(RwLock::new(Config::default()));
// Many readers concurrently
let cfg = config.read().await;
let port = cfg.port;
drop(cfg);
// Exclusive writer
let mut cfg = config.write().await;
cfg.port = 8080;
```
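**Watch for writer starvation:** under a reader-preferring policy, a steady stream of new readers can keep a writer waiting indefinitely. tokio's `RwLock` is fair: waiters are served in FIFO order, so once a writer is queued, later readers wait behind it. Fairness does not help if an existing read guard is never released; that still blocks writers forever.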
## LazyLock (Rust 1.80+)
`std::sync::LazyLock` (stable since 1.80) replaces the `once_cell` and `lazy_static` crates for runtime-initialized global singletons. In async/tokio code, this is commonly used for shared clients, connection info, or regex patterns.
```rust
use std::time::Duration;
// BAD - external dependency no longer needed
use once_cell::sync::Lazy;
static CLIENT: Lazy<reqwest::Client> = Lazy::new(|| {
reqwest::Client::builder().timeout(Duration::from_secs(30)).build().unwrap()
});
// BAD - macro-based, also superseded
lazy_static::lazy_static! {
static ref CLIENT: reqwest::Client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.unwrap();
}
// GOOD (Rust 1.80+, any edition) - std library, no external crate
use std::sync::LazyLock;
static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(|| {
reqwest::Client::builder().timeout(Duration::from_secs(30)).build().unwrap()
});
```
For single-threaded or non-Sync contexts, use `std::cell::LazyCell` instead.
**Note:** `tokio::sync::OnceCell` is still preferred when the initialization itself is async (requires `.await`), since `LazyLock` only supports synchronous initialization closures.
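Both std patterns fit in a short sketch (Rust 1.80+ for `LazyLock`); the statics and helper functions are hypothetical. `LazyLock` runs its closure once on first access. `std::sync::OnceLock` is the std counterpart of a synchronous `OnceCell`: the first caller supplies the value at runtime.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{LazyLock, OnceLock};

/// Counts how often the LazyLock initializer runs.
static INIT_CALLS: AtomicUsize = AtomicUsize::new(0);

// Initialized on first access; the closure runs at most once, even under contention.
static STATUS_NAMES: LazyLock<HashMap<u16, &'static str>> = LazyLock::new(|| {
    INIT_CALLS.fetch_add(1, Ordering::SeqCst);
    HashMap::from([(200, "OK"), (404, "Not Found")])
});

// Value supplied at runtime by whichever caller sets it first.
static SERVICE_NAME: OnceLock<String> = OnceLock::new();

pub fn status_name(code: u16) -> Option<&'static str> {
    STATUS_NAMES.get(&code).copied()
}

pub fn service_name() -> &'static str {
    SERVICE_NAME.get_or_init(|| "default-service".to_string())
}
```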
## if let Temporary Scope Changes (Edition 2024)
In Rust 2024, temporaries in an `if let` scrutinee are dropped before the `else` block runs, instead of at the end of the whole `if let ... else` expression. Inside the `if let` body the temporary (for example a lock guard) is still alive in **both** editions, because the pattern bindings may borrow from it.
```rust
// Both editions - the guard lives through the if-let body,
// so the lock is held across the await
if let Some(val) = state.lock().await.get("key") {
do_work(val).await; // lock still held here
}
// Edition 2021 - the guard also lives through the else branch,
// so locking again there never completes
// Edition 2024 - the guard is dropped before the else branch runs
if let Some(val) = state.lock().await.get("key") {
use_value(val);
} else {
state.lock().await.insert("key".to_owned(), default_value());
}
// GOOD - copy the value out in its own statement; the guard is
// dropped at the end of the `let`, before any await
let val = state.lock().await.get("key").cloned();
if let Some(val) = val {
do_work(val).await; // val is owned, no lock held
}
// GOOD - explicit binding when the lock must deliberately span the work
let guard = state.lock().await;
if let Some(val) = guard.get("key") {
do_work(val).await;
}
drop(guard);
```
The 2024 change applies only to `if let`. In `match` and `while let`, temporaries in the scrutinee still live through the matched arm or loop body, so a guard created inline there is held for that whole arm or iteration. Review any pattern where a lock guard is created inline in a conditional.
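These scoping rules can be checked without an async runtime. In this std-only sketch, `std::sync::Mutex` stands in for the async lock, and `try_lock` reveals whether a guard is still held; the function names are hypothetical. The result should be the same under editions 2021 and 2024, since neither drops the scrutinee's guard before the `if let` body.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Inside the `if let` body the scrutinee's guard is still alive,
/// so `try_lock` fails even though no named guard variable exists.
pub fn locked_inside_if_let_body(state: &Mutex<HashMap<String, u32>>) -> bool {
    if let Some(_val) = state.lock().unwrap().get("key") {
        return state.try_lock().is_err();
    }
    false
}

/// Copying the value out in its own `let` statement drops the guard at
/// the end of that statement, so the lock is free before further work.
pub fn copy_then_work(state: &Mutex<HashMap<String, u32>>) -> (Option<u32>, bool) {
    let val = state.lock().unwrap().get("key").copied();
    let lock_is_free = state.try_lock().is_ok();
    (val, lock_is_free)
}
```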
## Common Mistakes
### Deadlock via Lock Ordering
```rust
// BAD - potential deadlock if another task locks B then A
let _a = state_a.lock().await;
let _b = state_b.lock().await;
// GOOD - always lock in consistent order, or use a single lock
```
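A common way to get a consistent order is to sort by a stable key before locking. This std-only sketch uses hypothetical `Account`, `transfer`, and `opposite_transfers`. Two threads repeatedly transfer in opposite directions. Both always lock the lower `id` first, so neither can hold one lock while waiting for the other. The same rule applies to `tokio::sync::Mutex` guards acquired with `.await`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical account; `id` doubles as the global lock order.
pub struct Account {
    id: u32,
    balance: Mutex<i64>,
}

impl Account {
    pub fn new(id: u32, balance: i64) -> Self {
        Account { id, balance: Mutex::new(balance) }
    }
}

/// Always locks the lower id first, whatever the transfer direction.
pub fn transfer(from: &Account, to: &Account, amount: i64) {
    assert_ne!(from.id, to.id, "same account would be locked twice");
    let from_first = from.id < to.id;
    let (first, second) = if from_first { (from, to) } else { (to, from) };
    let mut a = first.balance.lock().unwrap();
    let mut b = second.balance.lock().unwrap();
    // Map the ordered guards back to their from/to roles.
    let (from_bal, to_bal) = if from_first { (&mut *a, &mut *b) } else { (&mut *b, &mut *a) };
    *from_bal -= amount;
    *to_bal += amount;
}

/// Two threads transfer in opposite directions; returns final balances.
pub fn opposite_transfers(rounds: usize) -> (i64, i64) {
    let x = Arc::new(Account::new(1, 100));
    let y = Arc::new(Account::new(2, 100));
    let (x2, y2) = (Arc::clone(&x), Arc::clone(&y));
    let t = thread::spawn(move || {
        for _ in 0..rounds {
            transfer(&x2, &y2, 1);
        }
    });
    for _ in 0..rounds {
        transfer(&y, &x, 1);
    }
    t.join().unwrap();
    let bx = *x.balance.lock().unwrap();
    let by = *y.balance.lock().unwrap();
    (bx, by)
}
```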
### Forgetting to Drop Guards
```rust
// BAD - guard lives until end of scope, holding lock during await
let guard = state.lock().await;
let value = guard.get_value();
do_async_work(value).await; // guard still held!
// GOOD - extract value and drop guard
let value = {
let guard = state.lock().await;
guard.get_value().clone()
};
do_async_work(value).await;
```
## Review Questions
1. Is the right sync primitive chosen for the access pattern?
2. Are mutex guards dropped before `.await` points?
3. Is lock ordering consistent to prevent deadlocks?
4. Is `Semaphore` used instead of ad-hoc concurrency limits?
5. Are `std::sync` vs `tokio::sync` primitives matched to their context?
6. Are `once_cell` / `lazy_static` usages replaced with `std::sync::LazyLock` where possible?
7. Do `if let` / `match` / `while let` patterns with inline lock guards avoid holding the guard across `.await`, and do `if let ... else` branches account for edition 2024 temporary scoping?
FILE:references/task-management.md
# Task Management
## Spawning Tasks
### tokio::spawn
Creates an independent task on the runtime. The spawned future must be `Send + 'static`.
```rust
// Basic spawn with error handling
let handle = tokio::spawn(async move {
process(data).await
});
match handle.await {
Ok(Ok(result)) => tracing::info!(?result, "task completed"),
Ok(Err(e)) => tracing::error!(error = %e, "task failed"),
Err(e) => tracing::error!(error = %e, "task panicked or was cancelled"),
}
```
### tokio::spawn_blocking
Runs a closure on a dedicated thread pool for blocking operations. Returns a `JoinHandle` like `spawn`.
```rust
// CPU-heavy work belongs on blocking threads
let hash = tokio::task::spawn_blocking(move || {
hash_password(&password, &salt) // hypothetical CPU-heavy helper returning a Result
}).await??;
// Synchronous file I/O
let contents = tokio::task::spawn_blocking(move || {
std::fs::read_to_string(path)
}).await??;
```
### JoinSet for Structured Concurrency
`JoinSet` manages a group of tasks with collective lifecycle control. Preferred over tracking individual `JoinHandle`s when spawning dynamic numbers of tasks.
```rust
use tokio::task::JoinSet;
let mut set = JoinSet::new();
let mut results = Vec::new();
for item in items {
set.spawn(async move {
process(item).await
});
}
// Collect all results
while let Some(result) = set.join_next().await {
match result {
Ok(Ok(value)) => results.push(value),
Ok(Err(e)) => tracing::warn!(error = %e, "task failed"),
Err(e) => tracing::error!(error = %e, "task panicked"),
}
}
```
When a `JoinSet` is dropped, all tasks in it are cancelled (aborted). This provides automatic cleanup.
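For CPU-bound work outside a runtime, std's `thread::scope` gives a comparable structured-concurrency guarantee: every thread spawned in the scope is joined before the scope returns. The sketch uses a hypothetical `process_item` function. It differs from `JoinSet` in two ways. Results come back in spawn order, whereas `JoinSet::join_next` yields them in completion order. And the scope waits for its threads rather than aborting them.

```rust
use std::thread;

/// Hypothetical per-item work that can fail.
fn process_item(item: u32) -> Result<u32, String> {
    if item == 0 {
        Err("zero is not allowed".to_string())
    } else {
        Ok(item * 10)
    }
}

/// Spawns one scoped thread per item. Returns successful values and
/// the number of failures (errors or panics).
pub fn process_all(items: &[u32]) -> (Vec<u32>, usize) {
    let mut results = Vec::new();
    let mut failures = 0;
    thread::scope(|s| {
        let handles: Vec<_> = items
            .iter()
            .map(|&item| s.spawn(move || process_item(item)))
            .collect();
        for h in handles {
            match h.join() {
                Ok(Ok(value)) => results.push(value),
                Ok(Err(_e)) => failures += 1,    // task failed
                Err(_panic) => failures += 1,    // task panicked
            }
        }
    }); // every spawned thread has been joined here
    (results, failures)
}
```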
## Cancellation
### CancellationToken (tokio-util)
Hierarchical cancellation for structured shutdown. Child tokens are cancelled when parents are.
```rust
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
// Worker respects cancellation
let child = token.child_token();
tokio::spawn(async move {
loop {
tokio::select! {
_ = child.cancelled() => break,
item = rx.recv() => match item {
Some(item) => process(item).await,
// all senders dropped: exit instead of spinning on None
None => break,
},
}
}
});
// On shutdown:
token.cancel(); // cancels all children
```
### select! Cancellation Safety
When `tokio::select!` resolves one branch, other branches are dropped. A future is cancellation-safe if dropping it at any `.await` point doesn't lose data.
**Cancellation-safe operations:**
- `tokio::sync::mpsc::Receiver::recv()`
- Awaiting a `tokio::sync::oneshot::Receiver` by reference (`&mut rx`); the receiver keeps the value if the branch is dropped
- `tokio::time::sleep()`
- `tokio::io::AsyncReadExt::read()` (data goes to caller's buffer)
**NOT cancellation-safe:**
- `tokio::io::AsyncReadExt::read_exact()` — partial reads are lost
- Custom futures that do internal buffering
```rust
// RISKY - read_exact may partially fill buffer then get cancelled
tokio::select! {
result = reader.read_exact(&mut buf) => { ... }
_ = cancel.cancelled() => { return; }
}
// SAFER - use read() and handle partial reads manually
tokio::select! {
result = reader.read(&mut buf) => { ... }
_ = cancel.cancelled() => { return; }
}
```
## async fn in Traits (Rust 2024 Edition)
Since Rust 1.75, `async fn` works directly in trait definitions without the `async-trait` crate. This matters for tokio-based service patterns.
```rust
// BAD (edition 2024) - unnecessary async-trait dependency
#[async_trait::async_trait]
trait Handler: Send + Sync {
async fn handle(&self, request: Request) -> Response;
}
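// GOOD (edition 2024) - native async trait method; `+ Send` lets callers pass the future to tokio::spawn
trait Handler: Send + Sync {
fn handle(&self, request: Request) -> impl Future<Output = Response> + Send;
}
// ALSO VALID (edition 2024) - async fn directly, but callers cannot require the returned future to be Send
// (rustc's `async_fn_in_trait` lint warns about this in public traits)
trait Handler: Send + Sync {
async fn handle(&self, request: Request) -> Response;
}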
```
**When `async-trait` is still needed:**
- Trait objects (`dyn Handler`) — native async traits are not yet object-safe
- When you need `Box<dyn Future>` return types for dynamic dispatch
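When dynamic dispatch is needed, the boxing that `async-trait` performs can also be written by hand. This is a std-only sketch. `Handler`, `DynHandler`, and `Doubler` are hypothetical, and requests and responses are simplified to `u32`. The busy-polling `block_on` exists only to run the example and is not a real executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Native async trait method (Rust 1.75+): works with generics, not with `dyn`.
pub trait Handler: Send + Sync {
    fn handle(&self, request: u32) -> impl Future<Output = u32> + Send;
}

/// Dyn-compatible companion: returning a boxed future is what
/// `async-trait` generates for you.
pub trait DynHandler: Send + Sync {
    fn handle_dyn(&self, request: u32) -> Pin<Box<dyn Future<Output = u32> + Send + '_>>;
}

// Every static `Handler` is also usable as a `dyn DynHandler`.
impl<H: Handler> DynHandler for H {
    fn handle_dyn(&self, request: u32) -> Pin<Box<dyn Future<Output = u32> + Send + '_>> {
        Box::pin(self.handle(request))
    }
}

/// Hypothetical handler for illustration.
pub struct Doubler;

impl Handler for Doubler {
    fn handle(&self, request: u32) -> impl Future<Output = u32> + Send {
        async move { request * 2 }
    }
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor, only for futures that never wait on I/O.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Static callers keep the unboxed `Handler::handle`. Only code that stores handlers as `Box<dyn DynHandler>` pays for the allocation.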
### RPIT Lifetime Capture in Async Contexts
In edition 2024, `-> impl Trait` captures ALL in-scope lifetimes by default (including elided ones). This can cause unexpected borrow-checker errors in async code that returns `impl Future`.
```rust
// Edition 2021 - the elided lifetime of `data` is NOT captured
fn process(data: &str) -> impl Future<Output = ()> {
async { /* ... */ }
}
// Edition 2024 - now captures the lifetime of `data` by default
// This may cause "borrowed value does not live long enough" errors
fn process(data: &str) -> impl Future<Output = ()> {
async { /* ... */ }
}
// GOOD - use precise capturing to opt out of capturing the borrow
fn process(data: &str) -> impl Future<Output = ()> + use<> {
let owned = data.to_owned();
async move { /* use owned */ }
}
// GOOD - if the future genuinely needs the borrow, capture it explicitly
fn process<'a>(data: &'a str) -> impl Future<Output = ()> + use<'a> {
async move { println!("{data}"); } // `move` copies the `&'a str` into the future
}
```
This is especially relevant for spawned tasks, which require `'static`:
```rust
// BAD (edition 2024) - impl Future captures the borrow, can't spawn
fn make_task(config: &Config) -> impl Future<Output = ()> {
let value = config.get_value();
async move { use_value(value).await; }
}
// GOOD - precise capture excludes the borrow
fn make_task(config: &Config) -> impl Future<Output = ()> + use<> {
let value = config.get_value();
async move { use_value(value).await; }
}
```
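The precise-capture fix can be checked without tokio. This sketch needs Rust 1.82+ for `use<>`. `Config` is hypothetical, and `require_spawnable` only mimics the `Send + 'static` bound that `tokio::spawn` places on its argument. The returned future keeps working after the `Config` it was built from is dropped.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical config type for illustration.
pub struct Config {
    name: String,
}

impl Config {
    pub fn get_value(&self) -> String {
        self.name.clone()
    }
}

/// `+ use<>`: the returned future captures no lifetimes, so it does not
/// borrow `config` and can outlive it.
pub fn make_task(config: &Config) -> impl Future<Output = usize> + Send + use<> {
    let value = config.get_value(); // take an owned copy before the async block
    async move { value.len() }
}

/// Same bounds tokio::spawn places on its argument.
pub fn require_spawnable<F: Future + Send + 'static>(fut: F) -> F {
    fut
}

struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor, only for futures that never wait on I/O.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```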
## Review Questions
1. Are all `JoinHandle`s either awaited, stored, or deliberately dropped with comment?
2. Is `spawn_blocking` used for CPU-heavy or synchronous I/O work?
3. Are task groups managed with `JoinSet` instead of manual handle tracking?
4. Is cancellation implemented via `CancellationToken` or equivalent?
5. Are `select!` branches cancellation-safe?
6. Is `async-trait` crate used where native `async fn` in traits would suffice?
7. Do `-> impl Future` returns have correct lifetime capture behavior for edition 2024?