@clawhub-alirezarezvani-9164a8924b
This skill should be used when the user asks to "design system architecture", "evaluate microservices vs monolith", "create architecture diagrams", "analyze...
---
name: "senior-architect"
description: This skill should be used when the user asks to "design system architecture", "evaluate microservices vs monolith", "create architecture diagrams", "analyze dependencies", "choose a database", "plan for scalability", "make technical decisions", or "review system design". Use for architecture decision records (ADRs), tech stack evaluation, system design reviews, dependency analysis, and generating architecture diagrams in Mermaid, PlantUML, or ASCII format.
---
# Senior Architect
Architecture design and analysis tools for making informed technical decisions.
## Table of Contents
- [Quick Start](#quick-start)
- [Tools Overview](#tools-overview)
- [Architecture Diagram Generator](#1-architecture-diagram-generator)
- [Dependency Analyzer](#2-dependency-analyzer)
- [Project Architect](#3-project-architect)
- [Decision Workflows](#decision-workflows)
- [Database Selection](#database-selection-workflow)
- [Architecture Pattern Selection](#architecture-pattern-selection-workflow)
- [Monolith vs Microservices](#monolith-vs-microservices-decision)
- [Reference Documentation](#reference-documentation)
- [Tech Stack Coverage](#tech-stack-coverage)
- [Common Commands](#common-commands)
---
## Quick Start
```bash
# Generate architecture diagram from project
python scripts/architecture_diagram_generator.py ./my-project --format mermaid
# Analyze dependencies for issues
python scripts/dependency_analyzer.py ./my-project --output json
# Get architecture assessment
python scripts/project_architect.py ./my-project --verbose
```
---
## Tools Overview
### 1. Architecture Diagram Generator
Generates architecture diagrams from project structure in multiple formats.
**Solves:** "I need to visualize my system architecture for documentation or team discussion"
**Input:** Project directory path
**Output:** Diagram code (Mermaid, PlantUML, or ASCII)
**Supported diagram types:**
- `component` - Shows modules and their relationships
- `layer` - Shows architectural layers (presentation, business, data)
- `deployment` - Shows deployment topology
**Usage:**
```bash
# Mermaid format (default)
python scripts/architecture_diagram_generator.py ./project --format mermaid --type component
# PlantUML format
python scripts/architecture_diagram_generator.py ./project --format plantuml --type layer
# ASCII format (terminal-friendly)
python scripts/architecture_diagram_generator.py ./project --format ascii
# Save to file
python scripts/architecture_diagram_generator.py ./project -o architecture.md
```
**Example output (Mermaid):**
```mermaid
graph TD
A[API Gateway] --> B[Auth Service]
A --> C[User Service]
B --> D[(PostgreSQL)]
C --> D
```
---
### 2. Dependency Analyzer
Analyzes project dependencies for coupling, circular dependencies, and outdated packages.
**Solves:** "I need to understand my dependency tree and identify potential issues"
**Input:** Project directory path
**Output:** Analysis report (JSON or human-readable)
**Analyzes:**
- Dependency tree (direct and transitive)
- Circular dependencies between modules
- Coupling score (0-100)
- Outdated packages
**Supported package managers:**
- npm/yarn (`package.json`)
- Python (`requirements.txt`, `pyproject.toml`)
- Go (`go.mod`)
- Rust (`Cargo.toml`)
**Usage:**
```bash
# Human-readable report
python scripts/dependency_analyzer.py ./project
# JSON output for CI/CD integration
python scripts/dependency_analyzer.py ./project --output json
# Check only for circular dependencies
python scripts/dependency_analyzer.py ./project --check circular
# Verbose mode with recommendations
python scripts/dependency_analyzer.py ./project --verbose
```
**Example output:**
```
Dependency Analysis Report
==========================
Total dependencies: 47 (32 direct, 15 transitive)
Coupling score: 72/100 (moderate)
Issues found:
- CIRCULAR: auth → user → permissions → auth
- OUTDATED: lodash 4.17.15 → 4.17.21 (security)
Recommendations:
1. Extract shared interface to break circular dependency
2. Update lodash to fix CVE-2020-8203
```
---
### 3. Project Architect
Analyzes project structure and detects architectural patterns, code smells, and improvement opportunities.
**Solves:** "I want to understand the current architecture and identify areas for improvement"
**Input:** Project directory path
**Output:** Architecture assessment report
**Detects:**
- Architectural patterns (MVC, layered, hexagonal, microservices indicators)
- Code organization issues (god classes, mixed concerns)
- Layer violations
- Missing architectural components
**Usage:**
```bash
# Full assessment
python scripts/project_architect.py ./project
# Verbose with detailed recommendations
python scripts/project_architect.py ./project --verbose
# JSON output
python scripts/project_architect.py ./project --output json
# Check specific aspect
python scripts/project_architect.py ./project --check layers
```
**Example output:**
```
Architecture Assessment
=======================
Detected pattern: Layered Architecture (confidence: 85%)
Structure analysis:
✓ controllers/ - Presentation layer detected
✓ services/ - Business logic layer detected
✓ repositories/ - Data access layer detected
⚠ models/ - Mixed domain and DTOs
Issues:
- LARGE FILE: UserService.ts (1,847 lines) - consider splitting
- MIXED CONCERNS: PaymentController contains business logic
Recommendations:
1. Split UserService into focused services
2. Move business logic from controllers to services
3. Separate domain models from DTOs
```
---
## Decision Workflows
### Database Selection Workflow
Use when choosing a database for a new project or migrating existing data.
**Step 1: Identify data characteristics**
| Characteristic | Points to SQL | Points to NoSQL |
|----------------|---------------|-----------------|
| Structured with relationships | ✓ | |
| ACID transactions required | ✓ | |
| Flexible/evolving schema | | ✓ |
| Document-oriented data | | ✓ |
| Time-series data | | ✓ (specialized) |
**Step 2: Evaluate scale requirements**
- <1M records, single region → PostgreSQL or MySQL
- 1M-100M records, read-heavy → PostgreSQL with read replicas
- >100M records, global distribution → CockroachDB, Spanner, or DynamoDB
- High write throughput (>10K/sec) → Cassandra or ScyllaDB
**Step 3: Check consistency requirements**
- Strong consistency required → SQL or CockroachDB
- Eventual consistency acceptable → DynamoDB, Cassandra, MongoDB
**Step 4: Document decision**
Create an ADR (Architecture Decision Record) with:
- Context and requirements
- Options considered
- Decision and rationale
- Trade-offs accepted
**Quick reference:**
```
PostgreSQL → Default choice for most applications
MongoDB → Document store, flexible schema
Redis → Caching, sessions, real-time features
DynamoDB → Serverless, auto-scaling, AWS-native
TimescaleDB → Time-series data with SQL interface
```
---
### Architecture Pattern Selection Workflow
Use when designing a new system or refactoring existing architecture.
**Step 1: Assess team and project size**
| Team Size | Recommended Starting Point |
|-----------|---------------------------|
| 1-3 developers | Modular monolith |
| 4-10 developers | Modular monolith or service-oriented |
| 10+ developers | Consider microservices |
**Step 2: Evaluate deployment requirements**
- Single deployment unit acceptable → Monolith
- Independent scaling needed → Microservices
- Mixed (some services scale differently) → Hybrid
**Step 3: Consider data boundaries**
- Shared database acceptable → Monolith or modular monolith
- Strict data isolation required → Microservices with separate DBs
- Event-driven communication fits → Event-sourcing/CQRS
**Step 4: Match pattern to requirements**
| Requirement | Recommended Pattern |
|-------------|-------------------|
| Rapid MVP development | Modular Monolith |
| Independent team deployment | Microservices |
| Complex domain logic | Domain-Driven Design |
| High read/write ratio difference | CQRS |
| Audit trail required | Event Sourcing |
| Third-party integrations | Hexagonal/Ports & Adapters |
See `references/architecture_patterns.md` for detailed pattern descriptions.
---
### Monolith vs Microservices Decision
**Choose Monolith when:**
- [ ] Team is small (<10 developers)
- [ ] Domain boundaries are unclear
- [ ] Rapid iteration is priority
- [ ] Operational complexity must be minimized
- [ ] Shared database is acceptable
**Choose Microservices when:**
- [ ] Teams can own services end-to-end
- [ ] Independent deployment is critical
- [ ] Different scaling requirements per component
- [ ] Technology diversity is needed
- [ ] Domain boundaries are well understood
**Hybrid approach:**
Start with a modular monolith. Extract services only when:
1. A module has significantly different scaling needs
2. A team needs independent deployment
3. Technology constraints require separation
---
## Reference Documentation
Load these files for detailed information:
| File | Contains | Load when user asks about |
|------|----------|--------------------------|
| `references/architecture_patterns.md` | 9 architecture patterns with trade-offs, code examples, and when to use | "which pattern?", "microservices vs monolith", "event-driven", "CQRS" |
| `references/system_design_workflows.md` | 6 step-by-step workflows for system design tasks | "how to design?", "capacity planning", "API design", "migration" |
| `references/tech_decision_guide.md` | Decision matrices for technology choices | "which database?", "which framework?", "which cloud?", "which cache?" |
---
## Tech Stack Coverage
**Languages:** TypeScript, JavaScript, Python, Go, Swift, Kotlin, Rust
**Frontend:** React, Next.js, Vue, Angular, React Native, Flutter
**Backend:** Node.js, Express, FastAPI, Go, GraphQL, REST
**Databases:** PostgreSQL, MySQL, MongoDB, Redis, DynamoDB, Cassandra
**Infrastructure:** Docker, Kubernetes, Terraform, AWS, GCP, Azure
**CI/CD:** GitHub Actions, GitLab CI, CircleCI, Jenkins
---
## Common Commands
```bash
# Architecture visualization
python scripts/architecture_diagram_generator.py . --format mermaid
python scripts/architecture_diagram_generator.py . --format plantuml
python scripts/architecture_diagram_generator.py . --format ascii
# Dependency analysis
python scripts/dependency_analyzer.py . --verbose
python scripts/dependency_analyzer.py . --check circular
python scripts/dependency_analyzer.py . --output json
# Architecture assessment
python scripts/project_architect.py . --verbose
python scripts/project_architect.py . --check layers
python scripts/project_architect.py . --output json
```
---
## Getting Help
1. Run any script with `--help` for usage information
2. Check reference documentation for detailed patterns and workflows
3. Use `--verbose` flag for detailed explanations and recommendations
FILE:references/architecture_patterns.md
# Architecture Patterns Reference
Detailed guide to software architecture patterns with trade-offs and implementation guidance.
## Patterns Index
1. [Monolithic Architecture](#1-monolithic-architecture)
2. [Modular Monolith](#2-modular-monolith)
3. [Microservices Architecture](#3-microservices-architecture)
4. [Event-Driven Architecture](#4-event-driven-architecture)
5. [CQRS (Command Query Responsibility Segregation)](#5-cqrs)
6. [Event Sourcing](#6-event-sourcing)
7. [Hexagonal Architecture (Ports & Adapters)](#7-hexagonal-architecture)
8. [Clean Architecture](#8-clean-architecture)
9. [API Gateway Pattern](#9-api-gateway-pattern)
---
## 1. Monolithic Architecture
**Problem it solves:** Need to build and deploy a complete application as a single unit with minimal operational complexity.
**When to use:**
- Small team (1-5 developers)
- MVP or early-stage product
- Simple domain with clear boundaries
- Deployment simplicity is priority
**When NOT to use:**
- Multiple teams need independent deployment
- Parts of system have vastly different scaling needs
- Technology diversity is required
**Trade-offs:**
| Pros | Cons |
|------|------|
| Simple deployment | Scaling is all-or-nothing |
| Easy debugging | Large codebase becomes unwieldy |
| No network latency between components | Single point of failure |
| Simple testing | Technology lock-in |
**Structure example:**
```
monolith/
├── src/
│ ├── controllers/ # HTTP handlers
│ ├── services/ # Business logic
│ ├── repositories/ # Data access
│ ├── models/ # Domain entities
│ └── utils/ # Shared utilities
├── tests/
└── package.json
```
---
## 2. Modular Monolith
**Problem it solves:** Need monolith simplicity but with clear boundaries that enable future extraction to services.
**When to use:**
- Medium team (5-15 developers)
- Domain boundaries are becoming clearer
- Want option to extract services later
- Need better code organization than traditional monolith
**When NOT to use:**
- Already need independent deployment
- Teams can't coordinate releases
**Trade-offs:**
| Pros | Cons |
|------|------|
| Clear module boundaries | Still single deployment |
| Easier to extract services later | Requires discipline to maintain boundaries |
| Single database simplifies transactions | Can drift back to coupled monolith |
| Team ownership of modules | |
**Structure example:**
```
modular-monolith/
├── modules/
│ ├── users/
│ │ ├── api/ # Public interface
│ │ ├── internal/ # Implementation
│ │ └── index.ts # Module exports
│ ├── orders/
│ │ ├── api/
│ │ ├── internal/
│ │ └── index.ts
│ └── payments/
├── shared/ # Cross-cutting concerns
└── main.ts
```
**Key rule:** Modules communicate only through their public API, never by importing internal files.
---
## 3. Microservices Architecture
**Problem it solves:** Need independent deployment, scaling, and technology choices for different parts of the system.
**When to use:**
- Large team (15+ developers) organized around business capabilities
- Different parts need different scaling
- Independent deployment is critical
- Technology diversity is beneficial
**When NOT to use:**
- Small team that can't handle operational complexity
- Domain boundaries are unclear
- Distributed transactions are common requirement
- Network latency is unacceptable
**Trade-offs:**
| Pros | Cons |
|------|------|
| Independent deployment | Network complexity |
| Independent scaling | Distributed system challenges |
| Technology flexibility | Operational overhead |
| Team autonomy | Data consistency challenges |
| Fault isolation | Testing complexity |
**Structure example:**
```
microservices/
├── services/
│ ├── user-service/
│ │ ├── src/
│ │ ├── Dockerfile
│ │ └── package.json
│ ├── order-service/
│ └── payment-service/
├── api-gateway/
├── infrastructure/
│ ├── kubernetes/
│ └── terraform/
└── docker-compose.yml
```
**Communication patterns:**
- Synchronous: REST, gRPC
- Asynchronous: Message queues (RabbitMQ, Kafka)
---
## 4. Event-Driven Architecture
**Problem it solves:** Need loose coupling between components that react to business events asynchronously.
**When to use:**
- Components need loose coupling
- Audit trail of all changes is valuable
- Real-time reactions to events
- Multiple consumers for same events
**When NOT to use:**
- Simple CRUD operations
- Synchronous responses required
- Team unfamiliar with async patterns
- Debugging simplicity is priority
**Trade-offs:**
| Pros | Cons |
|------|------|
| Loose coupling | Eventual consistency |
| Scalability | Debugging complexity |
| Audit trail built-in | Message ordering challenges |
| Easy to add new consumers | Infrastructure complexity |
**Event structure example:**
```typescript
interface DomainEvent {
eventId: string;
eventType: string;
aggregateId: string;
timestamp: Date;
payload: Record<string, unknown>;
metadata: {
correlationId: string;
causationId: string;
};
}
// Example event
const orderCreated: DomainEvent = {
eventId: "evt-123",
eventType: "OrderCreated",
aggregateId: "order-456",
timestamp: new Date(),
payload: {
customerId: "cust-789",
items: [...],
total: 99.99
},
metadata: {
correlationId: "req-001",
causationId: "cmd-create-order"
}
};
```
---
## 5. CQRS
**Problem it solves:** Read and write workloads have different requirements and need to be optimized separately.
**When to use:**
- Read/write ratio is heavily skewed (10:1 or more)
- Read and write models differ significantly
- Complex queries that don't map to write model
- Different scaling needs for reads vs writes
**When NOT to use:**
- Simple CRUD with balanced reads/writes
- Read and write models are nearly identical
- Team unfamiliar with pattern
- Added complexity isn't justified
**Trade-offs:**
| Pros | Cons |
|------|------|
| Optimized read models | Eventual consistency between models |
| Independent scaling | Complexity |
| Simplified queries | Synchronization logic |
| Better performance | More code to maintain |
**Structure example:**
```typescript
// Write side (Commands)
interface CreateOrderCommand {
customerId: string;
items: OrderItem[];
}
class OrderCommandHandler {
async handle(cmd: CreateOrderCommand): Promise<void> {
const order = Order.create(cmd);
await this.repository.save(order);
await this.eventBus.publish(order.events);
}
}
// Read side (Queries)
interface OrderSummaryQuery {
customerId: string;
dateRange: DateRange;
}
class OrderQueryHandler {
async handle(query: OrderSummaryQuery): Promise<OrderSummary[]> {
// Query optimized read model (denormalized)
return this.readDb.query(`
SELECT * FROM order_summaries
WHERE customer_id = ? AND created_at BETWEEN ? AND ?
`, [query.customerId, query.dateRange.start, query.dateRange.end]);
}
}
```
---
## 6. Event Sourcing
**Problem it solves:** Need complete audit trail and ability to reconstruct state at any point in time.
**When to use:**
- Audit trail is regulatory requirement
- Need to answer "how did we get here?"
- Complex domain with undo/redo requirements
- Debugging production issues requires history
**When NOT to use:**
- Simple CRUD applications
- No audit requirements
- Team unfamiliar with pattern
- Reporting on current state is primary need
**Trade-offs:**
| Pros | Cons |
|------|------|
| Complete audit trail | Storage grows indefinitely |
| Time-travel debugging | Query complexity |
| Natural fit for event-driven | Learning curve |
| Enables CQRS | Eventual consistency |
**Implementation example:**
```typescript
// Events
type OrderEvent =
| { type: 'OrderCreated'; customerId: string; items: Item[] }
| { type: 'ItemAdded'; itemId: string; quantity: number }
| { type: 'OrderShipped'; trackingNumber: string };
// Aggregate rebuilt from events
class Order {
private state: OrderState;
static fromEvents(events: OrderEvent[]): Order {
const order = new Order();
events.forEach(event => order.apply(event));
return order;
}
private apply(event: OrderEvent): void {
switch (event.type) {
case 'OrderCreated':
this.state = { status: 'created', items: event.items };
break;
case 'ItemAdded':
this.state.items.push({ id: event.itemId, qty: event.quantity });
break;
case 'OrderShipped':
this.state.status = 'shipped';
this.state.trackingNumber = event.trackingNumber;
break;
}
}
}
```
---
## 7. Hexagonal Architecture
**Problem it solves:** Need to isolate business logic from external concerns (databases, APIs, UI) for testability and flexibility.
**When to use:**
- Business logic is complex and valuable
- Multiple interfaces to same domain (API, CLI, events)
- Testability is priority
- External systems may change
**When NOT to use:**
- Simple CRUD with no business logic
- Single interface to domain
- Overhead isn't justified
**Trade-offs:**
| Pros | Cons |
|------|------|
| Business logic isolation | More abstractions |
| Highly testable | Initial setup overhead |
| External systems are swappable | Can be over-engineered |
| Clear boundaries | Learning curve |
**Structure example:**
```
hexagonal/
├── domain/ # Business logic (no external deps)
│ ├── entities/
│ ├── services/
│ └── ports/ # Interfaces (what domain needs)
│ ├── OrderRepository.ts
│ └── PaymentGateway.ts
├── adapters/ # Implementations
│ ├── persistence/ # Database adapters
│ │ └── PostgresOrderRepository.ts
│ ├── payment/ # External service adapters
│ │ └── StripePaymentGateway.ts
│ └── api/ # HTTP adapters
│ └── OrderController.ts
└── config/ # Wiring it all together
```
---
## 8. Clean Architecture
**Problem it solves:** Need clear dependency rules where business logic doesn't depend on frameworks or external systems.
**When to use:**
- Long-lived applications that will outlive frameworks
- Business logic is the core value
- Team discipline to maintain boundaries
- Multiple delivery mechanisms (web, mobile, CLI)
**When NOT to use:**
- Short-lived projects
- Framework-centric applications
- Simple CRUD operations
**Trade-offs:**
| Pros | Cons |
|------|------|
| Framework independence | More code |
| Testable business logic | Can feel over-engineered |
| Clear dependency direction | Learning curve |
| Flexible delivery mechanisms | Initial setup cost |
**Dependency rule:** Dependencies point inward. Inner circles know nothing about outer circles.
```
┌─────────────────────────────────────────┐
│ Frameworks & Drivers │
│ ┌─────────────────────────────────┐ │
│ │ Interface Adapters │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ Application Layer │ │ │
│ │ │ ┌─────────────────┐ │ │ │
│ │ │ │ Entities │ │ │ │
│ │ │ │ (Domain Logic) │ │ │ │
│ │ │ └─────────────────┘ │ │ │
│ │ └─────────────────────────┘ │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
```
---
## 9. API Gateway Pattern
**Problem it solves:** Need single entry point for clients that routes to multiple backend services.
**When to use:**
- Multiple backend services
- Cross-cutting concerns (auth, rate limiting, logging)
- Different clients need different APIs
- Service aggregation needed
**When NOT to use:**
- Single backend service
- Simplicity is priority
- Team can't maintain gateway
**Trade-offs:**
| Pros | Cons |
|------|------|
| Single entry point | Single point of failure |
| Cross-cutting concerns centralized | Additional latency |
| Backend service abstraction | Complexity |
| Client-specific APIs | Can become bottleneck |
**Responsibilities:**
```
┌─────────────────────────────────────┐
│ API Gateway │
├─────────────────────────────────────┤
│ • Authentication/Authorization │
│ • Rate limiting │
│ • Request/Response transformation │
│ • Load balancing │
│ • Circuit breaking │
│ • Caching │
│ • Logging/Monitoring │
└─────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐
│Svc A│ │Svc B│ │Svc C│
└─────┘ └─────┘ └─────┘
```
---
## Pattern Selection Quick Reference
| If you need... | Consider... |
|----------------|-------------|
| Simplicity, small team | Monolith |
| Clear boundaries, future flexibility | Modular Monolith |
| Independent deployment/scaling | Microservices |
| Loose coupling, async processing | Event-Driven |
| Separate read/write optimization | CQRS |
| Complete audit trail | Event Sourcing |
| Testable, swappable externals | Hexagonal |
| Framework independence | Clean Architecture |
| Single entry point, multiple services | API Gateway |
FILE:references/system_design_workflows.md
# System Design Workflows
Step-by-step workflows for common system design tasks.
## Workflows Index
1. [System Design Interview Approach](#1-system-design-interview-approach)
2. [Capacity Planning Workflow](#2-capacity-planning-workflow)
3. [API Design Workflow](#3-api-design-workflow)
4. [Database Schema Design](#4-database-schema-design-workflow)
5. [Scalability Assessment](#5-scalability-assessment-workflow)
6. [Migration Planning](#6-migration-planning-workflow)
---
## 1. System Design Interview Approach
Use when designing a system from scratch or explaining architecture decisions.
### Step 1: Clarify Requirements (3-5 minutes)
**Functional requirements:**
- What are the core features?
- Who are the users?
- What actions can users take?
**Non-functional requirements:**
- Expected scale (users, requests/sec, data size)
- Latency requirements
- Availability requirements (99.9%? 99.99%?)
- Consistency requirements (strong? eventual?)
**Example questions to ask:**
```
- How many users? Daily active users?
- Read/write ratio?
- Data retention period?
- Geographic distribution?
- Peak vs average load?
```
### Step 2: Estimate Scale (2-3 minutes)
**Calculate key metrics:**
```
Users: 10M monthly active users
DAU: 1M daily active users
Requests: 100 req/user/day = 100M req/day
= 1,200 req/sec (avg)
= 3,600 req/sec (peak, 3x)
Storage: 1KB/request × 100M = 100GB/day
= 36TB/year
Bandwidth: 100GB/day = 1.2 MB/sec (avg)
```
### Step 3: Design High-Level Architecture (5-10 minutes)
**Start with basic components:**
```
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Client │────▶│ API │────▶│ Database │
└──────────┘ └──────────┘ └──────────┘
```
**Add components as needed:**
- Load balancer for traffic distribution
- Cache for read-heavy workloads
- CDN for static content
- Message queue for async processing
- Search index for complex queries
### Step 4: Deep Dive into Components (10-15 minutes)
**For each major component, discuss:**
- Why this technology choice?
- How does it handle failures?
- How does it scale?
- What are the trade-offs?
### Step 5: Address Bottlenecks (5 minutes)
**Common bottlenecks:**
- Database read/write capacity
- Network bandwidth
- Single points of failure
- Hot spots in data distribution
**Solutions:**
- Caching (Redis, Memcached)
- Database sharding
- Read replicas
- CDN for static content
- Async processing for non-critical paths
---
## 2. Capacity Planning Workflow
Use when estimating infrastructure requirements for a new system or feature.
### Step 1: Gather Requirements
| Metric | Current | 6 months | 1 year |
|--------|---------|----------|--------|
| Monthly active users | | | |
| Peak concurrent users | | | |
| Requests per second | | | |
| Data storage (GB) | | | |
| Bandwidth (Mbps) | | | |
### Step 2: Calculate Compute Requirements
**Web/API servers:**
```
Peak RPS: 3,600
Requests per server: 500 (conservative)
Servers needed: 3,600 / 500 = 8 servers
With redundancy (N+2): 10 servers
```
**CPU estimation:**
```
Per request: 50ms CPU time
Peak RPS: 3,600
CPU cores: 3,600 × 0.05 = 180 cores
With headroom (70% target utilization):
180 / 0.7 = 257 cores
= 32 servers × 8 cores
```
### Step 3: Calculate Storage Requirements
**Database storage:**
```
Records per day: 100,000
Record size: 2KB
Daily growth: 200MB
With indexes (2x): 400MB/day
Retention (1 year): 146GB
With replication (3x): 438GB
```
**File storage:**
```
Files per day: 10,000
Average file size: 500KB
Daily growth: 5GB
Retention (1 year): 1.8TB
```
### Step 4: Calculate Network Requirements
**Bandwidth:**
```
Response size: 10KB average
Peak RPS: 3,600
Outbound: 3,600 × 10KB = 36MB/s = 288 Mbps
With headroom (50%): 432 Mbps ≈ 500 Mbps connection
```
### Step 5: Document and Review
**Create capacity plan document:**
- Current requirements
- Growth projections
- Infrastructure recommendations
- Cost estimates
- Review triggers (when to re-evaluate)
---
## 3. API Design Workflow
Use when designing new APIs or refactoring existing ones.
### Step 1: Identify Resources
**List the nouns in your domain:**
```
E-commerce example:
- Users
- Products
- Orders
- Payments
- Reviews
```
### Step 2: Define Operations
**Map CRUD to HTTP methods:**
| Operation | HTTP Method | URL Pattern |
|-----------|-------------|-------------|
| List | GET | /resources |
| Get one | GET | /resources/{id} |
| Create | POST | /resources |
| Update | PUT/PATCH | /resources/{id} |
| Delete | DELETE | /resources/{id} |
### Step 3: Design Request/Response Formats
**Request example:**
```json
POST /api/v1/orders
Content-Type: application/json
{
"customer_id": "cust-123",
"items": [
{"product_id": "prod-456", "quantity": 2}
],
"shipping_address": {
"street": "123 Main St",
"city": "San Francisco",
"state": "CA",
"zip": "94102"
}
}
```
**Response example:**
```json
HTTP/1.1 201 Created
Content-Type: application/json
{
"id": "ord-789",
"status": "pending",
"customer_id": "cust-123",
"items": [...],
"total": 99.99,
"created_at": "2024-01-15T10:30:00Z",
"_links": {
"self": "/api/v1/orders/ord-789",
"customer": "/api/v1/customers/cust-123"
}
}
```
### Step 4: Handle Errors Consistently
**Error response format:**
```json
HTTP/1.1 400 Bad Request
Content-Type: application/json
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid request parameters",
"details": [
{
"field": "quantity",
"message": "must be greater than 0"
}
]
},
"request_id": "req-abc123"
}
```
**Standard error codes:**
| HTTP Status | Use Case |
|-------------|----------|
| 400 | Validation errors |
| 401 | Authentication required |
| 403 | Permission denied |
| 404 | Resource not found |
| 409 | Conflict (duplicate, etc.) |
| 429 | Rate limit exceeded |
| 500 | Internal server error |
### Step 5: Document the API
**Include:**
- Authentication method
- Base URL and versioning
- Endpoints with examples
- Error codes and meanings
- Rate limits
- Pagination format
---
## 4. Database Schema Design Workflow
Use when designing a new database or major schema changes.
### Step 1: Identify Entities
**List the things you need to store:**
```
E-commerce:
- User (id, email, name, created_at)
- Product (id, name, price, stock)
- Order (id, user_id, status, total)
- OrderItem (id, order_id, product_id, quantity, price)
```
### Step 2: Define Relationships
**Relationship types:**
```
User ──1:N──▶ Order (one user, many orders)
Order ──1:N──▶ OrderItem (one order, many items)
Product ──1:N──▶ OrderItem (one product, many order items)
```
### Step 3: Choose Primary Keys
**Options:**
| Type | Pros | Cons |
|------|------|------|
| Auto-increment | Simple, ordered | Not distributed-friendly |
| UUID | Globally unique | Larger, random |
| ULID | Globally unique, sortable | Larger |
### Step 4: Add Indexes
**Index selection rules:**
```sql
-- Index columns used in WHERE clauses
CREATE INDEX idx_orders_user_id ON orders(user_id);
-- Index columns used in JOINs
CREATE INDEX idx_order_items_order_id ON order_items(order_id);
-- Index columns used in ORDER BY with WHERE
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Consider composite indexes for common queries
-- Query: SELECT * FROM orders WHERE user_id = ? AND status = 'active'
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
```
### Step 5: Plan for Scale
**Partitioning strategies:**
```sql
-- Partition by date (time-series data)
CREATE TABLE events (
id BIGINT,
created_at TIMESTAMP,
data JSONB
) PARTITION BY RANGE (created_at);
-- Partition by hash (distribute evenly)
CREATE TABLE users (
id BIGINT,
email VARCHAR(255)
) PARTITION BY HASH (id);
```
**Sharding considerations:**
- Shard key selection (user_id, tenant_id, etc.)
- Cross-shard query limitations
- Rebalancing strategy
---
## 5. Scalability Assessment Workflow
Use when evaluating if current architecture can handle growth.
### Step 1: Profile Current System
**Metrics to collect:**
```
Current load:
- Average requests/sec: ___
- Peak requests/sec: ___
- Average latency: ___ ms
- P99 latency: ___ ms
- Error rate: ___%
Resource utilization:
- CPU: ___%
- Memory: ___%
- Disk I/O: ___%
- Network: ___%
```
### Step 2: Identify Bottlenecks
**Check each layer:**
| Layer | Bottleneck Signs |
|-------|------------------|
| Web servers | High CPU, connection limits |
| Application | Slow requests, thread pool exhaustion |
| Database | Slow queries, lock contention |
| Cache | High miss rate, memory pressure |
| Network | Bandwidth saturation, latency |
### Step 3: Load Test
**Test scenarios:**
```
1. Baseline: Current production load
2. 2x load: Expected growth in 6 months
3. 5x load: Stress test
4. Spike: Sudden 10x for 5 minutes
```
**Tools:**
- k6, Locust, JMeter for HTTP
- pgbench for PostgreSQL
- redis-benchmark for Redis
### Step 4: Identify Scaling Strategy
**Vertical scaling (scale up):**
- Add more CPU, memory, disk
- Simpler but has limits
- Use when: Single server can handle more
**Horizontal scaling (scale out):**
- Add more servers
- Requires stateless design
- Use when: Need linear scaling
### Step 5: Create Scaling Plan
**Document:**
```
Trigger: When average CPU > 70% for 15 minutes
Action:
1. Add 2 more web servers
2. Update load balancer
3. Verify health checks pass
Rollback:
1. Remove added servers
2. Update load balancer
3. Investigate issue
```
---
## 6. Migration Planning Workflow
Use when migrating to new infrastructure, database, or architecture.
### Step 1: Assess Current State
**Document:**
- Current architecture diagram
- Data volumes
- Dependencies
- Integration points
- Performance baselines
### Step 2: Define Target State
**Document:**
- New architecture diagram
- Technology changes
- Expected improvements
- Success criteria
### Step 3: Plan Migration Strategy
**Strategies:**
| Strategy | Risk | Downtime | Complexity |
|----------|------|----------|------------|
| Big bang | High | Yes | Low |
| Blue-green | Medium | Minimal | Medium |
| Canary | Low | None | High |
| Strangler fig | Low | None | High |
**Strangler fig pattern (recommended for large systems):**
```
1. Add facade in front of old system
2. Route small percentage of traffic to new system
3. Gradually increase traffic to new system
4. Retire old system when 100% migrated
```
### Step 4: Create Rollback Plan
**For each step, define:**
```
Step: Migrate user service to new database
Rollback trigger:
- Error rate > 1%
- Latency > 500ms P99
- Data inconsistency detected
Rollback steps:
1. Route traffic back to old database
2. Sync any new data back
3. Investigate root cause
Rollback time estimate: 15 minutes
```
### Step 5: Execute with Checkpoints
**Migration checklist:**
```
□ Backup current system
□ Verify backup restoration works
□ Deploy new infrastructure
□ Run smoke tests on new system
□ Migrate small percentage (1%)
□ Monitor for 24 hours
□ Increase to 10%
□ Monitor for 24 hours
□ Increase to 50%
□ Monitor for 24 hours
□ Complete migration (100%)
□ Decommission old system
□ Document lessons learned
```
---
## Quick Reference
| Task | Start Here |
|------|------------|
| New system design | [System Design Interview Approach](#1-system-design-interview-approach) |
| Infrastructure sizing | [Capacity Planning](#2-capacity-planning-workflow) |
| New API | [API Design](#3-api-design-workflow) |
| Database design | [Database Schema Design](#4-database-schema-design-workflow) |
| Handle growth | [Scalability Assessment](#5-scalability-assessment-workflow) |
| System migration | [Migration Planning](#6-migration-planning-workflow) |
FILE:references/tech_decision_guide.md
# Technology Decision Guide
Decision frameworks and comparison matrices for common technology choices.
## Decision Frameworks Index
1. [Database Selection](#1-database-selection)
2. [Caching Strategy](#2-caching-strategy)
3. [Message Queue Selection](#3-message-queue-selection)
4. [Authentication Strategy](#4-authentication-strategy)
5. [Frontend Framework Selection](#5-frontend-framework-selection)
6. [Cloud Provider Selection](#6-cloud-provider-selection)
7. [API Style Selection](#7-api-style-selection)
---
## 1. Database Selection
### SQL vs NoSQL Decision Matrix
| Factor | Choose SQL | Choose NoSQL |
|--------|-----------|--------------|
| Data relationships | Complex, many-to-many | Simple, denormalized OK |
| Schema | Well-defined, stable | Evolving, flexible |
| Transactions | ACID required | Eventual consistency OK |
| Query patterns | Complex joins, aggregations | Key-value, document lookups |
| Scale | Vertical (some horizontal) | Horizontal first |
| Team expertise | Strong SQL skills | Document/KV experience |
### Database Type Selection
**Relational (SQL):**
| Database | Best For | Avoid When |
|----------|----------|------------|
| PostgreSQL | General purpose, JSON support, extensions | Simple key-value only |
| MySQL | Web applications, read-heavy | Complex queries, JSON-heavy |
| SQLite | Embedded, development, small apps | Concurrent writes, scale |
**Document (NoSQL):**
| Database | Best For | Avoid When |
|----------|----------|------------|
| MongoDB | Flexible schema, rapid iteration | Complex transactions |
| CouchDB | Offline-first, sync required | High throughput |
**Key-Value:**
| Database | Best For | Avoid When |
|----------|----------|------------|
| Redis | Caching, sessions, real-time | Persistence critical |
| DynamoDB | Serverless, auto-scaling | Complex queries |
**Wide-Column:**
| Database | Best For | Avoid When |
|----------|----------|------------|
| Cassandra | Write-heavy, time-series | Complex queries, small scale |
| ScyllaDB | Cassandra alternative, performance | Small datasets |
**Time-Series:**
| Database | Best For | Avoid When |
|----------|----------|------------|
| TimescaleDB | Time-series with SQL | Non-time-series data |
| InfluxDB | Metrics, monitoring | Relational queries |
**Search:**
| Database | Best For | Avoid When |
|----------|----------|------------|
| Elasticsearch | Full-text search, logs | Primary data store |
| Meilisearch | Simple search, fast setup | Complex analytics |
### Quick Decision Flow
```
Start
│
├─ Need ACID transactions? ──Yes──► PostgreSQL/MySQL
│
├─ Flexible schema needed? ──Yes──► MongoDB
│
├─ Write-heavy (>50K/sec)? ──Yes──► Cassandra/ScyllaDB
│
├─ Key-value access only? ──Yes──► Redis/DynamoDB
│
├─ Time-series data? ──Yes──► TimescaleDB/InfluxDB
│
├─ Full-text search? ──Yes──► Elasticsearch
│
└─ Default ──────────────────────► PostgreSQL
```
---
## 2. Caching Strategy
### Cache Type Selection
| Type | Use Case | Invalidation | Complexity |
|------|----------|--------------|------------|
| Read-through | Frequent reads, tolerance for stale | On write/TTL | Low |
| Write-through | Data consistency critical | Automatic | Medium |
| Write-behind | High write throughput | Async | High |
| Cache-aside | Fine-grained control | Application | Medium |
### Cache Technology Selection
| Technology | Best For | Limitations |
|------------|----------|-------------|
| Redis | General purpose, data structures | Memory cost |
| Memcached | Simple key-value, high throughput | No persistence |
| CDN (CloudFront, Fastly) | Static assets, edge caching | Dynamic content |
| Application cache | Per-instance, small data | Not distributed |
### Cache Patterns
**Cache-Aside (Lazy Loading):**
```
Read:
1. Check cache
2. If miss, read from DB
3. Store in cache
4. Return data
Write:
1. Write to DB
2. Invalidate cache
```
**Write-Through:**
```
Write:
1. Write to cache
2. Cache writes to DB
3. Return success
Read:
1. Read from cache (always hit)
```
**TTL Guidelines:**
| Data Type | Suggested TTL |
|-----------|---------------|
| User sessions | 24-48 hours |
| API responses | 1-5 minutes |
| Static content | 24 hours - 1 week |
| Database queries | 5-60 minutes |
| Feature flags | 1-5 minutes |
---
## 3. Message Queue Selection
### Queue Technology Comparison
| Feature | RabbitMQ | Kafka | SQS | Redis Streams |
|---------|----------|-------|-----|---------------|
| Throughput | Medium (10K/s) | Very High (100K+/s) | Medium | High |
| Ordering | Per-queue | Per-partition | FIFO optional | Per-stream |
| Durability | Configurable | Strong | Strong | Configurable |
| Replay | No | Yes | No | Yes |
| Complexity | Medium | High | Low | Low |
| Cost | Self-hosted | Self-hosted | Pay-per-use | Self-hosted |
### Decision Matrix
| Requirement | Recommendation |
|-------------|----------------|
| Simple task queue | SQS or Redis |
| Event streaming | Kafka |
| Complex routing | RabbitMQ |
| Log aggregation | Kafka |
| Serverless integration | SQS |
| Real-time analytics | Kafka |
| Request/reply pattern | RabbitMQ |
### When to Use Each
**RabbitMQ:**
- Complex routing logic (topic, fanout, headers)
- Request/reply patterns
- Priority queues
- Message acknowledgment critical
**Kafka:**
- Event sourcing
- High throughput requirements (>50K messages/sec)
- Message replay needed
- Stream processing
- Log aggregation
**SQS:**
- AWS-native applications
- Simple queue semantics
- Serverless architectures
- Don't want to manage infrastructure
**Redis Streams:**
- Already using Redis
- Moderate throughput
- Simple streaming needs
- Real-time features
---
## 4. Authentication Strategy
### Method Selection
| Method | Best For | Avoid When |
|--------|----------|------------|
| Session-based | Traditional web apps, server-rendered | Mobile apps, microservices |
| JWT | SPAs, mobile apps, microservices | Need immediate revocation |
| OAuth 2.0 | Third-party access, social login | Internal-only apps |
| API Keys | Server-to-server, simple auth | User authentication |
| mTLS | Service mesh, high security | Public APIs |
### JWT vs Sessions
| Factor | JWT | Sessions |
|--------|-----|----------|
| Scalability | Stateless, easy to scale | Requires session store |
| Revocation | Difficult (need blocklist) | Immediate |
| Payload | Can contain claims | Server-side only |
| Security | Token in client | Server-controlled |
| Mobile friendly | Yes | Requires cookies |
### OAuth 2.0 Flow Selection
| Flow | Use Case |
|------|----------|
| Authorization Code | Web apps with backend |
| Authorization Code + PKCE | SPAs, mobile apps |
| Client Credentials | Machine-to-machine |
| Device Code | Smart TVs, CLI tools |
**Avoid:** Implicit flow (deprecated), Resource Owner Password (legacy only)
### Token Lifetimes
| Token Type | Suggested Lifetime |
|------------|-------------------|
| Access token | 15-60 minutes |
| Refresh token | 7-30 days |
| API key | No expiry (rotate quarterly) |
| Session | 24 hours - 7 days |
---
## 5. Frontend Framework Selection
### Framework Comparison
| Factor | React | Vue | Angular | Svelte |
|--------|-------|-----|---------|--------|
| Learning curve | Medium | Low | High | Low |
| Ecosystem | Largest | Large | Complete | Growing |
| Performance | Good | Good | Good | Excellent |
| Bundle size | Medium | Small | Large | Smallest |
| TypeScript | Good | Good | Native | Good |
| Job market | Largest | Growing | Enterprise | Niche |
### Decision Matrix
| Requirement | Recommendation |
|-------------|----------------|
| Large team, enterprise | Angular |
| Startup, rapid iteration | React or Vue |
| Performance critical | Svelte or Solid |
| Existing React team | React |
| Progressive enhancement | Vue or Svelte |
| Component library needed | React (most options) |
### Meta-Framework Selection
| Framework | Best For |
|-----------|----------|
| Next.js (React) | Full-stack React, SSR/SSG |
| Nuxt (Vue) | Full-stack Vue, SSR/SSG |
| SvelteKit | Full-stack Svelte |
| Remix | Data-heavy React apps |
| Astro | Content sites, multi-framework |
### When to Use SSR vs SPA vs SSG
| Rendering | Use When |
|-----------|----------|
| SSR | SEO critical, dynamic content, auth-gated |
| SPA | Internal tools, highly interactive, no SEO |
| SSG | Content sites, blogs, documentation |
| ISR | Mix of static and dynamic |
---
## 6. Cloud Provider Selection
### Provider Comparison
| Factor | AWS | GCP | Azure |
|--------|-----|-----|-------|
| Market share | Largest | Growing | Enterprise strong |
| Service breadth | Most comprehensive | Strong ML/data | Best Microsoft integration |
| Pricing | Complex, volume discounts | Simpler, sustained use | EA discounts |
| Kubernetes | EKS | GKE (best managed) | AKS |
| Serverless | Lambda (mature) | Cloud Functions | Azure Functions |
| Database | RDS, DynamoDB | Cloud SQL, Spanner | SQL, Cosmos |
### Decision Factors
| If You Need | Consider |
|-------------|----------|
| Microsoft ecosystem | Azure |
| Best Kubernetes experience | GCP |
| Widest service selection | AWS |
| Machine learning focus | GCP or AWS |
| Government compliance | AWS GovCloud or Azure Gov |
| Startup credits | All offer programs |
### Multi-Cloud Considerations
**Go multi-cloud when:**
- Regulatory requirements mandate it
- Specific service (e.g., GCP BigQuery) is best-in-class
- Negotiating leverage with vendors
**Stay single-cloud when:**
- Team is small
- Want to minimize complexity
- Deep integration needed
### Service Mapping
| Need | AWS | GCP | Azure |
|------|-----|-----|-------|
| Compute | EC2 | Compute Engine | Virtual Machines |
| Containers | ECS, EKS | GKE, Cloud Run | AKS, Container Apps |
| Serverless | Lambda | Cloud Functions | Azure Functions |
| Object Storage | S3 | Cloud Storage | Blob Storage |
| SQL Database | RDS | Cloud SQL | Azure SQL |
| NoSQL | DynamoDB | Firestore | Cosmos DB |
| CDN | CloudFront | Cloud CDN | Azure CDN |
| DNS | Route 53 | Cloud DNS | Azure DNS |
---
## 7. API Style Selection
### REST vs GraphQL vs gRPC
| Factor | REST | GraphQL | gRPC |
|--------|------|---------|------|
| Use case | General purpose | Flexible queries | Microservices |
| Learning curve | Low | Medium | High |
| Over-fetching | Common | Solved | N/A |
| Caching | HTTP native | Complex | Custom |
| Browser support | Native | Native | Limited |
| Tooling | Mature | Growing | Strong |
| Performance | Good | Good | Excellent |
### Decision Matrix
| Requirement | Recommendation |
|-------------|----------------|
| Public API | REST |
| Mobile apps with varied needs | GraphQL |
| Microservices communication | gRPC |
| Real-time updates | GraphQL subscriptions or WebSocket |
| File uploads | REST |
| Internal services only | gRPC |
| Third-party developers | REST + OpenAPI |
### When to Choose Each
**Choose REST when:**
- Building public APIs
- Need HTTP caching
- Simple CRUD operations
- Team experienced with REST
**Choose GraphQL when:**
- Multiple clients with different data needs
- Rapid frontend iteration
- Complex, nested data relationships
- Want to reduce API calls
**Choose gRPC when:**
- Service-to-service communication
- Performance critical
- Streaming required
- Strong typing important
### API Versioning Strategies
| Strategy | Pros | Cons |
|----------|------|------|
| URL path (`/v1/`) | Clear, easy to implement | URL pollution |
| Query param (`?version=1`) | Flexible | Easy to miss |
| Header (`Accept-Version: 1`) | Clean URLs | Less discoverable |
| No versioning (evolve) | Simple | Breaking changes risky |
**Recommendation:** URL path versioning for public APIs, header versioning for internal.
---
## Quick Reference
| Decision | Default Choice | Alternative When |
|----------|----------------|------------------|
| Database | PostgreSQL | Scale/flexibility → MongoDB, DynamoDB |
| Cache | Redis | Simple needs → Memcached |
| Queue | SQS (AWS) / RabbitMQ | Event streaming → Kafka |
| Auth | JWT + Refresh | Traditional web → Sessions |
| Frontend | React + Next.js | Simplicity → Vue, Performance → Svelte |
| Cloud | AWS | Microsoft shop → Azure, ML-first → GCP |
| API | REST | Mobile flexibility → GraphQL, Internal → gRPC |
FILE:scripts/architecture_diagram_generator.py
#!/usr/bin/env python3
"""
Architecture Diagram Generator
Generates architecture diagrams from project structure in multiple formats:
- Mermaid (default)
- PlantUML
- ASCII
Supports diagram types:
- component: Shows modules and their relationships
- layer: Shows architectural layers
- deployment: Shows deployment topology
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict
class ProjectScanner:
"""Scans project structure to detect components and relationships."""
# Common architectural layer patterns
LAYER_PATTERNS = {
'presentation': ['controller', 'handler', 'view', 'page', 'component', 'ui'],
'api': ['api', 'route', 'endpoint', 'rest', 'graphql'],
'business': ['service', 'usecase', 'domain', 'logic', 'core'],
'data': ['repository', 'dao', 'model', 'entity', 'schema', 'migration'],
'infrastructure': ['config', 'util', 'helper', 'middleware', 'plugin'],
}
# File patterns for different technologies
TECH_PATTERNS = {
'react': ['jsx', 'tsx', 'package.json'],
'vue': ['vue', 'nuxt.config'],
'angular': ['component.ts', 'module.ts', 'angular.json'],
'node': ['package.json', 'express', 'fastify'],
'python': ['requirements.txt', 'pyproject.toml', 'setup.py'],
'go': ['go.mod', 'go.sum'],
'rust': ['Cargo.toml'],
'java': ['pom.xml', 'build.gradle'],
'docker': ['Dockerfile', 'docker-compose'],
'kubernetes': ['deployment.yaml', 'service.yaml', 'k8s'],
}
def __init__(self, project_path: Path):
self.project_path = project_path
self.components: Dict[str, Dict] = {}
self.relationships: List[Tuple[str, str, str]] = [] # (from, to, type)
self.layers: Dict[str, List[str]] = defaultdict(list)
self.technologies: Set[str] = set()
self.external_deps: Set[str] = set()
def scan(self) -> Dict:
"""Scan the project and return structure information."""
self._scan_directories()
self._detect_technologies()
self._detect_relationships()
self._classify_layers()
return {
'components': self.components,
'relationships': self.relationships,
'layers': dict(self.layers),
'technologies': list(self.technologies),
'external_deps': list(self.external_deps),
}
def _scan_directories(self):
"""Scan directory structure for components."""
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', '.nuxt', 'coverage', '.pytest_cache'}
for item in self.project_path.iterdir():
if item.is_dir() and item.name not in ignore_dirs and not item.name.startswith('.'):
component_info = self._analyze_directory(item)
if component_info['files'] > 0:
self.components[item.name] = component_info
def _analyze_directory(self, dir_path: Path) -> Dict:
"""Analyze a directory to understand its role."""
files = list(dir_path.rglob('*'))
code_files = [f for f in files if f.is_file() and f.suffix in
['.py', '.js', '.ts', '.jsx', '.tsx', '.go', '.rs', '.java', '.vue']]
# Count imports/dependencies within the directory
imports = set()
for f in code_files[:50]: # Limit to avoid large projects
imports.update(self._extract_imports(f))
return {
'path': str(dir_path.relative_to(self.project_path)),
'files': len(code_files),
'imports': list(imports)[:20], # Top 20 imports
'type': self._guess_component_type(dir_path.name),
}
def _extract_imports(self, file_path: Path) -> Set[str]:
"""Extract import statements from a file."""
imports = set()
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
# Python imports
py_imports = re.findall(r'^(?:from|import)\s+([\w.]+)', content, re.MULTILINE)
imports.update(py_imports)
# JS/TS imports
js_imports = re.findall(r'(?:import|require)\s*\(?[\'"]([^\'"\s]+)[\'"]', content)
imports.update(js_imports)
# Go imports
go_imports = re.findall(r'import\s+(?:\(\s*)?["\']([^"\']+)["\']', content)
imports.update(go_imports)
except Exception:
pass
return imports
def _guess_component_type(self, name: str) -> str:
"""Guess component type from directory name."""
name_lower = name.lower()
for layer, patterns in self.LAYER_PATTERNS.items():
for pattern in patterns:
if pattern in name_lower:
return layer
return 'unknown'
def _detect_technologies(self):
"""Detect technologies used in the project."""
for tech, patterns in self.TECH_PATTERNS.items():
for pattern in patterns:
matches = list(self.project_path.rglob(f'*{pattern}*'))
if matches:
self.technologies.add(tech)
break
# Detect external dependencies from package files
self._parse_package_json()
self._parse_requirements_txt()
self._parse_go_mod()
def _parse_package_json(self):
"""Parse package.json for dependencies."""
pkg_path = self.project_path / 'package.json'
if pkg_path.exists():
try:
data = json.loads(pkg_path.read_text())
deps = list(data.get('dependencies', {}).keys())[:10]
self.external_deps.update(deps)
except Exception:
pass
def _parse_requirements_txt(self):
"""Parse requirements.txt for dependencies."""
req_path = self.project_path / 'requirements.txt'
if req_path.exists():
try:
content = req_path.read_text()
deps = re.findall(r'^([a-zA-Z0-9_-]+)', content, re.MULTILINE)[:10]
self.external_deps.update(deps)
except Exception:
pass
def _parse_go_mod(self):
"""Parse go.mod for dependencies."""
mod_path = self.project_path / 'go.mod'
if mod_path.exists():
try:
content = mod_path.read_text()
deps = re.findall(r'^\s+([^\s]+)\s+v', content, re.MULTILINE)[:10]
self.external_deps.update([d.split('/')[-1] for d in deps])
except Exception:
pass
def _detect_relationships(self):
"""Detect relationships between components."""
component_names = set(self.components.keys())
for comp_name, comp_info in self.components.items():
for imp in comp_info.get('imports', []):
# Check if import references another component
for other_comp in component_names:
if other_comp != comp_name and other_comp.lower() in imp.lower():
self.relationships.append((comp_name, other_comp, 'uses'))
def _classify_layers(self):
"""Classify components into architectural layers."""
for comp_name, comp_info in self.components.items():
layer = comp_info.get('type', 'unknown')
if layer != 'unknown':
self.layers[layer].append(comp_name)
else:
self.layers['other'].append(comp_name)
class DiagramGenerator:
"""Base class for diagram generators."""
def __init__(self, scan_result: Dict):
self.components = scan_result['components']
self.relationships = scan_result['relationships']
self.layers = scan_result['layers']
self.technologies = scan_result['technologies']
self.external_deps = scan_result['external_deps']
def generate(self, diagram_type: str) -> str:
"""Generate diagram based on type."""
if diagram_type == 'component':
return self._generate_component_diagram()
elif diagram_type == 'layer':
return self._generate_layer_diagram()
elif diagram_type == 'deployment':
return self._generate_deployment_diagram()
else:
return self._generate_component_diagram()
def _generate_component_diagram(self) -> str:
raise NotImplementedError
def _generate_layer_diagram(self) -> str:
raise NotImplementedError
def _generate_deployment_diagram(self) -> str:
raise NotImplementedError
class MermaidGenerator(DiagramGenerator):
"""Generate Mermaid diagrams."""
def _generate_component_diagram(self) -> str:
lines = ['graph TD']
# Add components
for name, info in self.components.items():
safe_name = self._safe_id(name)
file_count = info.get('files', 0)
lines.append(f' {safe_name}["{name}<br/>{file_count} files"]')
# Add relationships
seen = set()
for src, dst, rel_type in self.relationships:
key = (src, dst)
if key not in seen:
seen.add(key)
lines.append(f' {self._safe_id(src)} --> {self._safe_id(dst)}')
# Add external dependencies if any
if self.external_deps:
lines.append('')
lines.append(' subgraph External')
for dep in list(self.external_deps)[:5]:
safe_dep = self._safe_id(dep)
lines.append(f' {safe_dep}(("{dep}"))')
lines.append(' end')
return '\n'.join(lines)
def _generate_layer_diagram(self) -> str:
lines = ['graph TB']
layer_order = ['presentation', 'api', 'business', 'data', 'infrastructure', 'other']
for layer in layer_order:
components = self.layers.get(layer, [])
if components:
lines.append(f' subgraph {layer.title()} Layer')
for comp in components:
safe_comp = self._safe_id(comp)
lines.append(f' {safe_comp}["{comp}"]')
lines.append(' end')
lines.append('')
# Add layer relationships (top-down)
prev_layer = None
for layer in layer_order:
if self.layers.get(layer):
if prev_layer and self.layers.get(prev_layer):
first_prev = self._safe_id(self.layers[prev_layer][0])
first_curr = self._safe_id(self.layers[layer][0])
lines.append(f' {first_prev} -.-> {first_curr}')
prev_layer = layer
return '\n'.join(lines)
def _generate_deployment_diagram(self) -> str:
lines = ['graph LR']
# Client
lines.append(' subgraph Client')
lines.append(' browser["Browser/Mobile"]')
lines.append(' end')
lines.append('')
# Determine if we have typical deployment components
has_api = any('api' in t for t in self.technologies)
has_docker = 'docker' in self.technologies
has_k8s = 'kubernetes' in self.technologies
# Application tier
lines.append(' subgraph Application')
if has_k8s:
lines.append(' k8s["Kubernetes Cluster"]')
elif has_docker:
lines.append(' docker["Docker Container"]')
else:
lines.append(' app["Application Server"]')
lines.append(' end')
lines.append('')
# Data tier
lines.append(' subgraph Data')
lines.append(' db[("Database")]')
if self.external_deps:
lines.append(' cache[("Cache")]')
lines.append(' end')
lines.append('')
# Connections
if has_k8s:
lines.append(' browser --> k8s')
lines.append(' k8s --> db')
elif has_docker:
lines.append(' browser --> docker')
lines.append(' docker --> db')
else:
lines.append(' browser --> app')
lines.append(' app --> db')
return '\n'.join(lines)
def _safe_id(self, name: str) -> str:
"""Convert name to safe Mermaid ID."""
return re.sub(r'[^a-zA-Z0-9]', '_', name)
class PlantUMLGenerator(DiagramGenerator):
"""Generate PlantUML diagrams."""
def _generate_component_diagram(self) -> str:
lines = ['@startuml', 'skinparam componentStyle rectangle', '']
# Add components
for name, info in self.components.items():
file_count = info.get('files', 0)
lines.append(f'component "{name}\\n({file_count} files)" as {self._safe_id(name)}')
lines.append('')
# Add relationships
seen = set()
for src, dst, rel_type in self.relationships:
key = (src, dst)
if key not in seen:
seen.add(key)
lines.append(f'{self._safe_id(src)} --> {self._safe_id(dst)}')
# External dependencies
if self.external_deps:
lines.append('')
lines.append('package "External Dependencies" {')
for dep in list(self.external_deps)[:5]:
lines.append(f' [{dep}]')
lines.append('}')
lines.append('')
lines.append('@enduml')
return '\n'.join(lines)
def _generate_layer_diagram(self) -> str:
lines = ['@startuml', 'skinparam packageStyle rectangle', '']
layer_order = ['presentation', 'api', 'business', 'data', 'infrastructure', 'other']
for layer in layer_order:
components = self.layers.get(layer, [])
if components:
lines.append(f'package "{layer.title()} Layer" {{')
for comp in components:
lines.append(f' [{comp}]')
lines.append('}')
lines.append('')
lines.append('@enduml')
return '\n'.join(lines)
def _generate_deployment_diagram(self) -> str:
lines = ['@startuml', '']
lines.append('node "Client" {')
lines.append(' [Browser/Mobile] as browser')
lines.append('}')
lines.append('')
has_docker = 'docker' in self.technologies
has_k8s = 'kubernetes' in self.technologies
lines.append('node "Application Server" {')
if has_k8s:
lines.append(' [Kubernetes Cluster] as app')
elif has_docker:
lines.append(' [Docker Container] as app')
else:
lines.append(' [Application] as app')
lines.append('}')
lines.append('')
lines.append('database "Data Store" {')
lines.append(' [Database] as db')
lines.append('}')
lines.append('')
lines.append('browser --> app')
lines.append('app --> db')
lines.append('')
lines.append('@enduml')
return '\n'.join(lines)
def _safe_id(self, name: str) -> str:
"""Convert name to safe PlantUML ID."""
return re.sub(r'[^a-zA-Z0-9]', '_', name)
class ASCIIGenerator(DiagramGenerator):
"""Generate ASCII diagrams."""
def _generate_component_diagram(self) -> str:
lines = []
lines.append('=' * 60)
lines.append('COMPONENT DIAGRAM')
lines.append('=' * 60)
lines.append('')
# Components
lines.append('Components:')
lines.append('-' * 40)
for name, info in self.components.items():
file_count = info.get('files', 0)
comp_type = info.get('type', 'unknown')
lines.append(f' [{name}]')
lines.append(f' Files: {file_count}')
lines.append(f' Type: {comp_type}')
lines.append('')
# Relationships
if self.relationships:
lines.append('Relationships:')
lines.append('-' * 40)
seen = set()
for src, dst, rel_type in self.relationships:
key = (src, dst)
if key not in seen:
seen.add(key)
lines.append(f' {src} --> {dst}')
lines.append('')
# External dependencies
if self.external_deps:
lines.append('External Dependencies:')
lines.append('-' * 40)
for dep in list(self.external_deps)[:10]:
lines.append(f' - {dep}')
lines.append('')
lines.append('=' * 60)
return '\n'.join(lines)
def _generate_layer_diagram(self) -> str:
lines = []
lines.append('=' * 60)
lines.append('LAYERED ARCHITECTURE')
lines.append('=' * 60)
lines.append('')
layer_order = ['presentation', 'api', 'business', 'data', 'infrastructure', 'other']
for layer in layer_order:
components = self.layers.get(layer, [])
if components:
lines.append(f'+{"-" * 56}+')
lines.append(f'| {layer.upper():^54} |')
lines.append(f'+{"-" * 56}+')
for comp in components:
lines.append(f'| [{comp:^48}] |')
lines.append(f'+{"-" * 56}+')
lines.append(' |')
lines.append(' v')
# Remove last arrow
if lines[-2:] == [' |', ' v']:
lines = lines[:-2]
lines.append('')
lines.append('=' * 60)
return '\n'.join(lines)
def _generate_deployment_diagram(self) -> str:
lines = []
lines.append('=' * 60)
lines.append('DEPLOYMENT DIAGRAM')
lines.append('=' * 60)
lines.append('')
has_docker = 'docker' in self.technologies
has_k8s = 'kubernetes' in self.technologies
# Client tier
lines.append('+----------------------+')
lines.append('| CLIENT |')
lines.append('| [Browser/Mobile] |')
lines.append('+----------+-----------+')
lines.append(' |')
lines.append(' v')
# Application tier
lines.append('+----------------------+')
lines.append('| APPLICATION |')
if has_k8s:
lines.append('| [Kubernetes Cluster] |')
elif has_docker:
lines.append('| [Docker Container] |')
else:
lines.append('| [App Server] |')
lines.append('+----------+-----------+')
lines.append(' |')
lines.append(' v')
# Data tier
lines.append('+----------------------+')
lines.append('| DATA |')
lines.append('| [(Database)] |')
lines.append('+----------------------+')
lines.append('')
# Technologies detected
if self.technologies:
lines.append('Technologies detected:')
lines.append('-' * 40)
for tech in sorted(self.technologies):
lines.append(f' - {tech}')
lines.append('')
lines.append('=' * 60)
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser(
description='Generate architecture diagrams from project structure',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s ./my-project --format mermaid
%(prog)s ./my-project --format plantuml --type layer
%(prog)s ./my-project --format ascii -o architecture.txt
Diagram types:
component - Shows modules and their relationships (default)
layer - Shows architectural layers
deployment - Shows deployment topology
Output formats:
mermaid - Mermaid.js format (default)
plantuml - PlantUML format
ascii - ASCII art format
'''
)
parser.add_argument(
'project_path',
help='Path to the project directory'
)
parser.add_argument(
'--format', '-f',
choices=['mermaid', 'plantuml', 'ascii'],
default='mermaid',
help='Output format (default: mermaid)'
)
parser.add_argument(
'--type', '-t',
choices=['component', 'layer', 'deployment'],
default='component',
help='Diagram type (default: component)'
)
parser.add_argument(
'--output', '-o',
help='Output file path (prints to stdout if not specified)'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output raw scan results as JSON'
)
args = parser.parse_args()
project_path = Path(args.project_path).resolve()
if not project_path.exists():
print(f"Error: Project path does not exist: {project_path}", file=sys.stderr)
sys.exit(1)
if not project_path.is_dir():
print(f"Error: Project path is not a directory: {project_path}", file=sys.stderr)
sys.exit(1)
if args.verbose:
print(f"Scanning project: {project_path}")
# Scan project
scanner = ProjectScanner(project_path)
scan_result = scanner.scan()
if args.verbose:
print(f"Found {len(scan_result['components'])} components")
print(f"Found {len(scan_result['relationships'])} relationships")
print(f"Technologies: {', '.join(scan_result['technologies']) or 'none detected'}")
# Output raw JSON if requested
if args.json:
output = json.dumps(scan_result, indent=2)
if args.output:
Path(args.output).write_text(output)
print(f"Results written to {args.output}")
else:
print(output)
return
# Generate diagram
generators = {
'mermaid': MermaidGenerator,
'plantuml': PlantUMLGenerator,
'ascii': ASCIIGenerator,
}
generator = generators[args.format](scan_result)
diagram = generator.generate(args.type)
# Output
if args.output:
Path(args.output).write_text(diagram)
print(f"Diagram written to {args.output}")
else:
print(diagram)
if __name__ == '__main__':
main()
FILE:scripts/dependency_analyzer.py
#!/usr/bin/env python3
"""
Dependency Analyzer
Analyzes project dependencies for:
- Dependency tree (direct and transitive)
- Circular dependencies between modules
- Coupling score (0-100)
- Outdated packages (basic detection)
Supports:
- npm/yarn (package.json)
- Python (requirements.txt, pyproject.toml)
- Go (go.mod)
- Rust (Cargo.toml)
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict
class DependencyAnalyzer:
"""Analyzes project dependencies and module coupling."""
def __init__(self, project_path: Path, verbose: bool = False):
self.project_path = project_path
self.verbose = verbose
# Results
self.direct_deps: Dict[str, str] = {} # name -> version
self.dev_deps: Dict[str, str] = {}
self.internal_modules: Dict[str, Set[str]] = defaultdict(set) # module -> imports
self.circular_deps: List[List[str]] = []
self.coupling_score: float = 0
self.issues: List[Dict] = []
self.recommendations: List[str] = []
self.package_manager: Optional[str] = None
def analyze(self) -> Dict:
"""Run full dependency analysis."""
self._detect_package_manager()
self._parse_dependencies()
self._scan_internal_modules()
self._detect_circular_dependencies()
self._calculate_coupling_score()
self._generate_recommendations()
return self._build_report()
def _detect_package_manager(self):
"""Detect which package manager is used."""
if (self.project_path / 'package.json').exists():
self.package_manager = 'npm'
elif (self.project_path / 'requirements.txt').exists():
self.package_manager = 'pip'
elif (self.project_path / 'pyproject.toml').exists():
self.package_manager = 'poetry'
elif (self.project_path / 'go.mod').exists():
self.package_manager = 'go'
elif (self.project_path / 'Cargo.toml').exists():
self.package_manager = 'cargo'
else:
self.package_manager = 'unknown'
if self.verbose:
print(f"Detected package manager: {self.package_manager}")
def _parse_dependencies(self):
"""Parse dependencies based on detected package manager."""
parsers = {
'npm': self._parse_npm,
'pip': self._parse_pip,
'poetry': self._parse_poetry,
'go': self._parse_go,
'cargo': self._parse_cargo,
}
parser = parsers.get(self.package_manager)
if parser:
parser()
def _parse_npm(self):
"""Parse package.json for npm dependencies."""
pkg_path = self.project_path / 'package.json'
try:
data = json.loads(pkg_path.read_text())
# Direct dependencies
for name, version in data.get('dependencies', {}).items():
self.direct_deps[name] = self._clean_version(version)
# Dev dependencies
for name, version in data.get('devDependencies', {}).items():
self.dev_deps[name] = self._clean_version(version)
if self.verbose:
print(f"Found {len(self.direct_deps)} direct deps, "
f"{len(self.dev_deps)} dev deps")
except Exception as e:
self.issues.append({
'type': 'parse_error',
'severity': 'error',
'message': f"Failed to parse package.json: {e}"
})
def _parse_pip(self):
"""Parse requirements.txt for Python dependencies."""
req_path = self.project_path / 'requirements.txt'
try:
content = req_path.read_text()
for line in content.strip().split('\n'):
line = line.strip()
if not line or line.startswith('#') or line.startswith('-'):
continue
# Parse name and version
match = re.match(r'^([a-zA-Z0-9_-]+)(?:[=<>!~]+(.+))?', line)
if match:
name = match.group(1)
version = match.group(2) or 'any'
self.direct_deps[name] = version
if self.verbose:
print(f"Found {len(self.direct_deps)} dependencies")
except Exception as e:
self.issues.append({
'type': 'parse_error',
'severity': 'error',
'message': f"Failed to parse requirements.txt: {e}"
})
def _parse_poetry(self):
"""Parse pyproject.toml for Poetry dependencies."""
toml_path = self.project_path / 'pyproject.toml'
try:
content = toml_path.read_text()
# Simple TOML parsing for dependencies section
in_deps = False
in_dev_deps = False
for line in content.split('\n'):
line = line.strip()
if line == '[tool.poetry.dependencies]':
in_deps = True
in_dev_deps = False
continue
elif line == '[tool.poetry.dev-dependencies]' or \
line == '[tool.poetry.group.dev.dependencies]':
in_deps = False
in_dev_deps = True
continue
elif line.startswith('['):
in_deps = False
in_dev_deps = False
continue
if (in_deps or in_dev_deps) and '=' in line:
match = re.match(r'^([a-zA-Z0-9_-]+)\s*=\s*["\']?([^"\']+)', line)
if match:
name = match.group(1)
version = match.group(2)
if name != 'python':
if in_deps:
self.direct_deps[name] = version
else:
self.dev_deps[name] = version
if self.verbose:
print(f"Found {len(self.direct_deps)} direct deps, "
f"{len(self.dev_deps)} dev deps")
except Exception as e:
self.issues.append({
'type': 'parse_error',
'severity': 'error',
'message': f"Failed to parse pyproject.toml: {e}"
})
def _parse_go(self):
"""Parse go.mod for Go dependencies."""
mod_path = self.project_path / 'go.mod'
try:
content = mod_path.read_text()
# Find require block
in_require = False
for line in content.split('\n'):
line = line.strip()
if line.startswith('require ('):
in_require = True
continue
elif line == ')' and in_require:
in_require = False
continue
elif line.startswith('require ') and '(' not in line:
# Single-line require
match = re.match(r'require\s+([^\s]+)\s+([^\s]+)', line)
if match:
self.direct_deps[match.group(1)] = match.group(2)
continue
if in_require:
match = re.match(r'([^\s]+)\s+([^\s]+)', line)
if match:
self.direct_deps[match.group(1)] = match.group(2)
if self.verbose:
print(f"Found {len(self.direct_deps)} dependencies")
except Exception as e:
self.issues.append({
'type': 'parse_error',
'severity': 'error',
'message': f"Failed to parse go.mod: {e}"
})
def _parse_cargo(self):
"""Parse Cargo.toml for Rust dependencies."""
cargo_path = self.project_path / 'Cargo.toml'
try:
content = cargo_path.read_text()
in_deps = False
in_dev_deps = False
for line in content.split('\n'):
line = line.strip()
if line == '[dependencies]':
in_deps = True
in_dev_deps = False
continue
elif line == '[dev-dependencies]':
in_deps = False
in_dev_deps = True
continue
elif line.startswith('['):
in_deps = False
in_dev_deps = False
continue
if (in_deps or in_dev_deps) and '=' in line:
match = re.match(r'^([a-zA-Z0-9_-]+)\s*=\s*["\']?([^"\']+)', line)
if match:
name = match.group(1)
version = match.group(2)
if in_deps:
self.direct_deps[name] = version
else:
self.dev_deps[name] = version
if self.verbose:
print(f"Found {len(self.direct_deps)} direct deps, "
f"{len(self.dev_deps)} dev deps")
except Exception as e:
self.issues.append({
'type': 'parse_error',
'severity': 'error',
'message': f"Failed to parse Cargo.toml: {e}"
})
def _clean_version(self, version: str) -> str:
"""Clean version string."""
return version.lstrip('^~>=<!')
def _scan_internal_modules(self):
"""Scan internal module imports for coupling analysis."""
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', 'coverage'}
# Find all code files
extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.go', '.rs']
for ext in extensions:
for file_path in self.project_path.rglob(f'*{ext}'):
# Skip ignored directories
if any(ignored in file_path.parts for ignored in ignore_dirs):
continue
# Get module name (directory relative to project root)
try:
rel_path = file_path.relative_to(self.project_path)
module = rel_path.parts[0] if len(rel_path.parts) > 1 else 'root'
# Extract imports
imports = self._extract_imports(file_path)
self.internal_modules[module].update(imports)
except Exception:
continue
if self.verbose:
print(f"Scanned {len(self.internal_modules)} internal modules")
def _extract_imports(self, file_path: Path) -> Set[str]:
"""Extract import statements from a file."""
imports = set()
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
# Python imports
for match in re.finditer(r'^(?:from|import)\s+([\w.]+)', content, re.MULTILINE):
imports.add(match.group(1).split('.')[0])
# JS/TS imports
for match in re.finditer(r'(?:import|require)\s*\(?[\'"]([^\'"\s]+)[\'"]', content):
imp = match.group(1)
if imp.startswith('.') or imp.startswith('@/') or imp.startswith('~/'):
# Relative import - extract first path component
parts = imp.lstrip('./~@').split('/')
if parts:
imports.add(parts[0])
except Exception:
pass
return imports
def _detect_circular_dependencies(self):
"""Detect circular dependencies between internal modules."""
# Build dependency graph
graph = defaultdict(set)
modules = set(self.internal_modules.keys())
for module, imports in self.internal_modules.items():
for imp in imports:
# Check if import is an internal module
for internal_module in modules:
if internal_module.lower() in imp.lower() and internal_module != module:
graph[module].add(internal_module)
# Find cycles using DFS
visited = set()
rec_stack = set()
cycles = []
def find_cycles(node: str, path: List[str]):
visited.add(node)
rec_stack.add(node)
path.append(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
find_cycles(neighbor, path)
elif neighbor in rec_stack:
# Found cycle
cycle_start = path.index(neighbor)
cycle = path[cycle_start:] + [neighbor]
if cycle not in cycles:
cycles.append(cycle)
path.pop()
rec_stack.remove(node)
for module in modules:
if module not in visited:
find_cycles(module, [])
self.circular_deps = cycles
if cycles:
for cycle in cycles:
self.issues.append({
'type': 'circular_dependency',
'severity': 'warning',
'message': f"Circular dependency: {' -> '.join(cycle)}"
})
if self.verbose:
print(f"Found {len(self.circular_deps)} circular dependencies")
def _calculate_coupling_score(self):
"""Calculate coupling score (0-100, lower is better)."""
if not self.internal_modules:
self.coupling_score = 0
return
# Count connections between modules
total_modules = len(self.internal_modules)
total_connections = 0
modules = set(self.internal_modules.keys())
for module, imports in self.internal_modules.items():
for imp in imports:
for internal_module in modules:
if internal_module.lower() in imp.lower() and internal_module != module:
total_connections += 1
# Max possible connections (complete graph)
max_connections = total_modules * (total_modules - 1) if total_modules > 1 else 1
# Coupling score as percentage of max connections
self.coupling_score = min(100, int((total_connections / max_connections) * 100))
# Add penalty for circular dependencies
self.coupling_score = min(100, self.coupling_score + len(self.circular_deps) * 10)
if self.verbose:
print(f"Coupling score: {self.coupling_score}/100")
def _generate_recommendations(self):
"""Generate actionable recommendations."""
# Circular dependency recommendations
if self.circular_deps:
self.recommendations.append(
"Extract shared interfaces or create a common module to break circular dependencies"
)
# High coupling recommendations
if self.coupling_score > 70:
self.recommendations.append(
"High coupling detected. Consider applying SOLID principles and "
"introducing abstraction layers"
)
# Too many dependencies
if len(self.direct_deps) > 50:
self.recommendations.append(
f"Large dependency count ({len(self.direct_deps)}). "
"Review for unused dependencies and consider bundle size impact"
)
# Check for known problematic packages (simplified check)
problematic = {
'lodash': 'Consider lodash-es or native methods for smaller bundle',
'moment': 'Consider day.js or date-fns for smaller bundle',
'request': 'Deprecated. Use axios, node-fetch, or native fetch',
}
for pkg, suggestion in problematic.items():
if pkg in self.direct_deps:
self.recommendations.append(f"{pkg}: {suggestion}")
def _build_report(self) -> Dict:
"""Build the analysis report."""
return {
'project_path': str(self.project_path),
'package_manager': self.package_manager,
'summary': {
'direct_dependencies': len(self.direct_deps),
'dev_dependencies': len(self.dev_deps),
'internal_modules': len(self.internal_modules),
'coupling_score': self.coupling_score,
'circular_dependencies': len(self.circular_deps),
'issues': len(self.issues),
},
'dependencies': {
'direct': self.direct_deps,
'dev': self.dev_deps,
},
'internal_modules': {k: list(v) for k, v in self.internal_modules.items()},
'circular_dependencies': self.circular_deps,
'issues': self.issues,
'recommendations': self.recommendations,
}
def print_human_report(report: Dict):
"""Print human-readable report."""
print("\n" + "=" * 60)
print("DEPENDENCY ANALYSIS REPORT")
print("=" * 60)
print(f"\nProject: {report['project_path']}")
print(f"Package Manager: {report['package_manager']}")
summary = report['summary']
print("\n--- Summary ---")
print(f"Direct dependencies: {summary['direct_dependencies']}")
print(f"Dev dependencies: {summary['dev_dependencies']}")
print(f"Internal modules: {summary['internal_modules']}")
print(f"Coupling score: {summary['coupling_score']}/100 ", end='')
if summary['coupling_score'] < 30:
print("(low - good)")
elif summary['coupling_score'] < 70:
print("(moderate)")
else:
print("(high - consider refactoring)")
if report['circular_dependencies']:
print(f"\n--- Circular Dependencies ({len(report['circular_dependencies'])}) ---")
for cycle in report['circular_dependencies']:
print(f" {' -> '.join(cycle)}")
if report['issues']:
print(f"\n--- Issues ({len(report['issues'])}) ---")
for issue in report['issues']:
severity = issue['severity'].upper()
print(f" [{severity}] {issue['message']}")
if report['recommendations']:
print(f"\n--- Recommendations ---")
for i, rec in enumerate(report['recommendations'], 1):
print(f" {i}. {rec}")
# Show top dependencies
deps = report['dependencies']['direct']
if deps:
print(f"\n--- Top Dependencies (of {len(deps)}) ---")
for name, version in list(deps.items())[:10]:
print(f" {name}: {version}")
if len(deps) > 10:
print(f" ... and {len(deps) - 10} more")
print("\n" + "=" * 60)
def main():
parser = argparse.ArgumentParser(
description='Analyze project dependencies and module coupling',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s ./my-project
%(prog)s ./my-project --output json
%(prog)s ./my-project --check circular
%(prog)s ./my-project --verbose
Supported package managers:
- npm/yarn (package.json)
- pip (requirements.txt)
- poetry (pyproject.toml)
- go (go.mod)
- cargo (Cargo.toml)
'''
)
parser.add_argument(
'project_path',
help='Path to the project directory'
)
parser.add_argument(
'--output', '-o',
choices=['human', 'json'],
default='human',
help='Output format (default: human)'
)
parser.add_argument(
'--check',
choices=['all', 'circular', 'coupling'],
default='all',
help='What to check (default: all)'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--save', '-s',
help='Save report to file'
)
args = parser.parse_args()
project_path = Path(args.project_path).resolve()
if not project_path.exists():
print(f"Error: Project path does not exist: {project_path}", file=sys.stderr)
sys.exit(1)
if not project_path.is_dir():
print(f"Error: Project path is not a directory: {project_path}", file=sys.stderr)
sys.exit(1)
# Run analysis
analyzer = DependencyAnalyzer(project_path, verbose=args.verbose)
report = analyzer.analyze()
# Filter report based on --check option
if args.check == 'circular':
if report['circular_dependencies']:
print("Circular dependencies found:")
for cycle in report['circular_dependencies']:
print(f" {' -> '.join(cycle)}")
sys.exit(1)
else:
print("No circular dependencies found.")
sys.exit(0)
elif args.check == 'coupling':
score = report['summary']['coupling_score']
print(f"Coupling score: {score}/100")
if score > 70:
print("WARNING: High coupling detected")
sys.exit(1)
sys.exit(0)
# Output report
if args.output == 'json':
output = json.dumps(report, indent=2)
if args.save:
Path(args.save).write_text(output)
print(f"Report saved to {args.save}")
else:
print(output)
else:
print_human_report(report)
if args.save:
Path(args.save).write_text(json.dumps(report, indent=2))
print(f"\nJSON report saved to {args.save}")
if __name__ == '__main__':
main()
FILE:scripts/project_architect.py
#!/usr/bin/env python3
"""
Project Architect
Analyzes project structure and detects:
- Architectural patterns (MVC, layered, hexagonal, microservices)
- Code organization issues (god classes, mixed concerns)
- Layer violations
- Missing architectural components
Provides architecture assessment and improvement recommendations.
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from typing import Dict, List, Set, Tuple, Optional
from collections import defaultdict
class PatternDetector:
"""Detects architectural patterns in a project."""
# Pattern signatures
PATTERNS = {
'layered': {
'indicators': ['controller', 'service', 'repository', 'dao', 'model', 'entity'],
'structure': ['controllers', 'services', 'repositories', 'models'],
'weight': 0,
},
'mvc': {
'indicators': ['model', 'view', 'controller'],
'structure': ['models', 'views', 'controllers'],
'weight': 0,
},
'hexagonal': {
'indicators': ['port', 'adapter', 'domain', 'infrastructure', 'application'],
'structure': ['ports', 'adapters', 'domain', 'infrastructure'],
'weight': 0,
},
'clean': {
'indicators': ['entity', 'usecase', 'interface', 'framework', 'adapter'],
'structure': ['entities', 'usecases', 'interfaces', 'frameworks'],
'weight': 0,
},
'microservices': {
'indicators': ['service', 'api', 'gateway', 'docker', 'kubernetes'],
'structure': ['services', 'api-gateway', 'docker-compose'],
'weight': 0,
},
'modular_monolith': {
'indicators': ['module', 'feature', 'bounded'],
'structure': ['modules', 'features'],
'weight': 0,
},
'feature_based': {
'indicators': ['feature', 'component', 'page'],
'structure': ['features', 'components', 'pages'],
'weight': 0,
},
}
# Layer definitions for violation detection
LAYER_HIERARCHY = {
'presentation': ['controller', 'handler', 'view', 'page', 'component', 'ui', 'route'],
'application': ['service', 'usecase', 'application', 'facade'],
'domain': ['domain', 'entity', 'model', 'aggregate', 'valueobject'],
'infrastructure': ['repository', 'dao', 'adapter', 'gateway', 'client', 'config'],
}
LAYER_ORDER = ['presentation', 'application', 'domain', 'infrastructure']
def __init__(self, project_path: Path):
self.project_path = project_path
self.directories: Set[str] = set()
self.files: Dict[str, List[str]] = defaultdict(list) # dir -> files
self.detected_pattern: Optional[str] = None
self.confidence: float = 0
self.layer_assignments: Dict[str, str] = {} # dir -> layer
def scan(self) -> Dict:
"""Scan project and detect patterns."""
self._scan_structure()
self._detect_pattern()
self._assign_layers()
return {
'detected_pattern': self.detected_pattern,
'confidence': self.confidence,
'directories': list(self.directories),
'layer_assignments': self.layer_assignments,
'pattern_scores': {p: d['weight'] for p, d in self.PATTERNS.items()},
}
def _scan_structure(self):
"""Scan directory structure."""
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', 'coverage', '.pytest_cache'}
for item in self.project_path.iterdir():
if item.is_dir() and item.name not in ignore_dirs and not item.name.startswith('.'):
self.directories.add(item.name.lower())
# Scan files in directory
try:
for f in item.rglob('*'):
if f.is_file():
self.files[item.name.lower()].append(f.name.lower())
except PermissionError:
pass
def _detect_pattern(self):
"""Detect the primary architectural pattern."""
for pattern, config in self.PATTERNS.items():
score = 0
# Check directory structure
for struct in config['structure']:
if struct.lower() in self.directories:
score += 2
# Check indicator presence in directory names
for indicator in config['indicators']:
for dir_name in self.directories:
if indicator in dir_name:
score += 1
# Check file patterns
all_files = [f for files in self.files.values() for f in files]
for indicator in config['indicators']:
matching_files = sum(1 for f in all_files if indicator in f)
score += min(matching_files // 5, 3) # Cap contribution
config['weight'] = score
# Find best match
best_pattern = max(self.PATTERNS.items(), key=lambda x: x[1]['weight'])
if best_pattern[1]['weight'] > 3:
self.detected_pattern = best_pattern[0]
max_possible = len(best_pattern[1]['structure']) * 2 + len(best_pattern[1]['indicators']) * 2
self.confidence = min(100, int((best_pattern[1]['weight'] / max(max_possible, 1)) * 100))
else:
self.detected_pattern = 'unstructured'
self.confidence = 0
def _assign_layers(self):
"""Assign directories to architectural layers."""
for dir_name in self.directories:
for layer, indicators in self.LAYER_HIERARCHY.items():
for indicator in indicators:
if indicator in dir_name:
self.layer_assignments[dir_name] = layer
break
if dir_name in self.layer_assignments:
break
if dir_name not in self.layer_assignments:
self.layer_assignments[dir_name] = 'unknown'
class CodeAnalyzer:
"""Analyzes code for architectural issues."""
# Thresholds
MAX_FILE_LINES = 500
MAX_CLASS_LINES = 300
MAX_FUNCTION_LINES = 50
MAX_IMPORTS_PER_FILE = 30
def __init__(self, project_path: Path, verbose: bool = False):
self.project_path = project_path
self.verbose = verbose
self.issues: List[Dict] = []
self.metrics: Dict = {}
def analyze(self) -> Dict:
"""Run code analysis."""
self._analyze_file_sizes()
self._analyze_imports()
self._detect_god_classes()
self._check_naming_conventions()
return {
'issues': self.issues,
'metrics': self.metrics,
}
def _analyze_file_sizes(self):
"""Check for oversized files."""
extensions = ['.py', '.js', '.ts', '.jsx', '.tsx', '.go', '.rs', '.java']
large_files = []
total_lines = 0
file_count = 0
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', 'coverage'}
for ext in extensions:
for file_path in self.project_path.rglob(f'*{ext}'):
if any(ignored in file_path.parts for ignored in ignore_dirs):
continue
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
lines = len(content.split('\n'))
total_lines += lines
file_count += 1
if lines > self.MAX_FILE_LINES:
large_files.append({
'path': str(file_path.relative_to(self.project_path)),
'lines': lines,
})
self.issues.append({
'type': 'large_file',
'severity': 'warning',
'file': str(file_path.relative_to(self.project_path)),
'message': f"File has {lines} lines (threshold: {self.MAX_FILE_LINES})",
'suggestion': "Consider splitting into smaller, focused modules",
})
except Exception:
pass
self.metrics['total_lines'] = total_lines
self.metrics['file_count'] = file_count
self.metrics['avg_file_lines'] = total_lines // file_count if file_count > 0 else 0
self.metrics['large_files'] = large_files
def _analyze_imports(self):
"""Analyze import patterns."""
extensions = ['.py', '.js', '.ts', '.jsx', '.tsx']
high_import_files = []
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', 'coverage'}
for ext in extensions:
for file_path in self.project_path.rglob(f'*{ext}'):
if any(ignored in file_path.parts for ignored in ignore_dirs):
continue
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
# Count imports
py_imports = len(re.findall(r'^(?:from|import)\s+', content, re.MULTILINE))
js_imports = len(re.findall(r'^import\s+', content, re.MULTILINE))
imports = py_imports + js_imports
if imports > self.MAX_IMPORTS_PER_FILE:
high_import_files.append({
'path': str(file_path.relative_to(self.project_path)),
'imports': imports,
})
self.issues.append({
'type': 'high_imports',
'severity': 'info',
'file': str(file_path.relative_to(self.project_path)),
'message': f"File has {imports} imports (threshold: {self.MAX_IMPORTS_PER_FILE})",
'suggestion': "Consider if all imports are necessary or if the file has too many responsibilities",
})
except Exception:
pass
self.metrics['high_import_files'] = high_import_files
def _detect_god_classes(self):
"""Detect potential god classes (oversized classes)."""
extensions = ['.py', '.js', '.ts', '.java']
god_classes = []
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', 'coverage'}
for ext in extensions:
for file_path in self.project_path.rglob(f'*{ext}'):
if any(ignored in file_path.parts for ignored in ignore_dirs):
continue
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
lines = content.split('\n')
# Simple class detection
class_pattern = r'^\s*(?:export\s+)?(?:abstract\s+)?class\s+(\w+)'
in_class = False
class_name = None
class_start = 0
brace_count = 0
for i, line in enumerate(lines):
match = re.match(class_pattern, line)
if match:
if in_class and class_name:
# End previous class
class_lines = i - class_start
if class_lines > self.MAX_CLASS_LINES:
god_classes.append({
'file': str(file_path.relative_to(self.project_path)),
'class': class_name,
'lines': class_lines,
})
class_name = match.group(1)
class_start = i
in_class = True
# Check last class
if in_class and class_name:
class_lines = len(lines) - class_start
if class_lines > self.MAX_CLASS_LINES:
god_classes.append({
'file': str(file_path.relative_to(self.project_path)),
'class': class_name,
'lines': class_lines,
})
self.issues.append({
'type': 'god_class',
'severity': 'warning',
'file': str(file_path.relative_to(self.project_path)),
'message': f"Class '{class_name}' has ~{class_lines} lines (threshold: {self.MAX_CLASS_LINES})",
'suggestion': "Consider applying Single Responsibility Principle and splitting into smaller classes",
})
except Exception:
pass
self.metrics['god_classes'] = god_classes
def _check_naming_conventions(self):
"""Check for naming convention issues."""
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', 'coverage'}
naming_issues = []
# Check directory naming
for dir_path in self.project_path.rglob('*'):
if not dir_path.is_dir():
continue
if any(ignored in dir_path.parts for ignored in ignore_dirs):
continue
dir_name = dir_path.name
# Check for mixed case in directories (should be kebab-case or snake_case)
if re.search(r'[A-Z]', dir_name) and '-' not in dir_name and '_' not in dir_name:
rel_path = str(dir_path.relative_to(self.project_path))
if len(rel_path.split('/')) <= 3: # Only check top-level dirs
naming_issues.append({
'type': 'directory',
'path': rel_path,
'issue': 'PascalCase directory name',
})
if naming_issues:
self.issues.append({
'type': 'naming_convention',
'severity': 'info',
'message': f"Found {len(naming_issues)} naming convention inconsistencies",
'details': naming_issues[:5], # Show first 5
})
self.metrics['naming_issues'] = naming_issues
class LayerViolationDetector:
"""Detects architectural layer violations."""
LAYER_ORDER = ['presentation', 'application', 'domain', 'infrastructure']
# Valid dependency directions (key can depend on values)
VALID_DEPENDENCIES = {
'presentation': ['application', 'domain'],
'application': ['domain', 'infrastructure'],
'domain': [], # Domain should not depend on other layers
'infrastructure': ['domain'],
}
def __init__(self, project_path: Path, layer_assignments: Dict[str, str]):
self.project_path = project_path
self.layer_assignments = layer_assignments
self.violations: List[Dict] = []
def detect(self) -> List[Dict]:
"""Detect layer violations."""
self._analyze_imports()
return self.violations
def _analyze_imports(self):
"""Analyze imports for layer violations."""
extensions = ['.py', '.js', '.ts', '.jsx', '.tsx']
ignore_dirs = {'.git', 'node_modules', '__pycache__', '.venv', 'venv',
'dist', 'build', '.next', 'coverage'}
for ext in extensions:
for file_path in self.project_path.rglob(f'*{ext}'):
if any(ignored in file_path.parts for ignored in ignore_dirs):
continue
try:
rel_path = file_path.relative_to(self.project_path)
if len(rel_path.parts) < 2:
continue
source_dir = rel_path.parts[0].lower()
source_layer = self.layer_assignments.get(source_dir)
if not source_layer or source_layer == 'unknown':
continue
# Extract imports
content = file_path.read_text(encoding='utf-8', errors='ignore')
imports = self._extract_imports(content)
# Check each import for layer violations
for imp in imports:
target_dir = self._get_import_directory(imp)
if not target_dir:
continue
target_layer = self.layer_assignments.get(target_dir.lower())
if not target_layer or target_layer == 'unknown':
continue
if self._is_violation(source_layer, target_layer):
self.violations.append({
'type': 'layer_violation',
'severity': 'warning',
'file': str(rel_path),
'source_layer': source_layer,
'target_layer': target_layer,
'import': imp,
'message': f"{source_layer} layer should not depend on {target_layer} layer",
})
except Exception:
pass
def _extract_imports(self, content: str) -> List[str]:
"""Extract import statements."""
imports = []
# Python imports
imports.extend(re.findall(r'^(?:from|import)\s+([\w.]+)', content, re.MULTILINE))
# JS/TS imports
imports.extend(re.findall(r'(?:import|require)\s*\(?[\'"]([^\'"\s]+)[\'"]', content))
return imports
def _get_import_directory(self, imp: str) -> Optional[str]:
"""Get the directory from an import path."""
# Handle relative imports
if imp.startswith('.'):
return None # Skip relative imports
parts = imp.replace('@/', '').replace('~/', '').split('/')
if parts:
return parts[0].split('.')[0]
return None
def _is_violation(self, source_layer: str, target_layer: str) -> bool:
"""Check if the dependency is a violation."""
if source_layer == target_layer:
return False
valid_deps = self.VALID_DEPENDENCIES.get(source_layer, [])
return target_layer not in valid_deps and target_layer != source_layer
class ProjectArchitect:
"""Main class that orchestrates architecture analysis."""
def __init__(self, project_path: Path, verbose: bool = False):
self.project_path = project_path
self.verbose = verbose
def analyze(self) -> Dict:
"""Run full architecture analysis."""
if self.verbose:
print(f"Analyzing project: {self.project_path}")
# Pattern detection
pattern_detector = PatternDetector(self.project_path)
pattern_result = pattern_detector.scan()
if self.verbose:
print(f"Detected pattern: {pattern_result['detected_pattern']} "
f"(confidence: {pattern_result['confidence']}%)")
# Code analysis
code_analyzer = CodeAnalyzer(self.project_path, self.verbose)
code_result = code_analyzer.analyze()
if self.verbose:
print(f"Found {len(code_result['issues'])} code issues")
# Layer violation detection
violation_detector = LayerViolationDetector(
self.project_path,
pattern_result['layer_assignments']
)
violations = violation_detector.detect()
if self.verbose:
print(f"Found {len(violations)} layer violations")
# Generate recommendations
recommendations = self._generate_recommendations(
pattern_result, code_result, violations
)
return {
'project_path': str(self.project_path),
'architecture': {
'detected_pattern': pattern_result['detected_pattern'],
'confidence': pattern_result['confidence'],
'layer_assignments': pattern_result['layer_assignments'],
'pattern_scores': pattern_result['pattern_scores'],
},
'structure': {
'directories': pattern_result['directories'],
},
'code_quality': {
'metrics': code_result['metrics'],
'issues': code_result['issues'],
},
'layer_violations': violations,
'recommendations': recommendations,
'summary': {
'pattern': pattern_result['detected_pattern'],
'confidence': pattern_result['confidence'],
'total_issues': len(code_result['issues']) + len(violations),
'code_issues': len(code_result['issues']),
'layer_violations': len(violations),
},
}
def _generate_recommendations(self, pattern_result: Dict, code_result: Dict,
violations: List[Dict]) -> List[str]:
"""Generate actionable recommendations."""
recommendations = []
# Pattern recommendations
pattern = pattern_result['detected_pattern']
confidence = pattern_result['confidence']
if pattern == 'unstructured' or confidence < 30:
recommendations.append(
"Consider adopting a clear architectural pattern (Layered, Clean, or Hexagonal) "
"to improve code organization and maintainability"
)
# Layer violation recommendations
if violations:
recommendations.append(
f"Fix {len(violations)} layer violation(s) to maintain proper separation of concerns. "
"Dependencies should flow from presentation → application → domain ← infrastructure"
)
# God class recommendations
god_classes = code_result['metrics'].get('god_classes', [])
if god_classes:
recommendations.append(
f"Split {len(god_classes)} large class(es) into smaller, focused classes "
"following the Single Responsibility Principle"
)
# Large file recommendations
large_files = code_result['metrics'].get('large_files', [])
if large_files:
recommendations.append(
f"Consider refactoring {len(large_files)} large file(s) into smaller modules"
)
# Missing layer recommendations
assigned_layers = set(pattern_result['layer_assignments'].values())
if pattern in ['layered', 'clean', 'hexagonal']:
expected_layers = {'presentation', 'application', 'domain', 'infrastructure'}
missing = expected_layers - assigned_layers - {'unknown'}
if missing:
recommendations.append(
f"Consider adding missing architectural layer(s): {', '.join(missing)}"
)
return recommendations
def print_human_report(report: Dict):
"""Print human-readable report."""
print("\n" + "=" * 60)
print("ARCHITECTURE ASSESSMENT")
print("=" * 60)
print(f"\nProject: {report['project_path']}")
arch = report['architecture']
print(f"\n--- Architecture Pattern ---")
print(f"Detected: {arch['detected_pattern'].replace('_', ' ').title()}")
print(f"Confidence: {arch['confidence']}%")
if arch['layer_assignments']:
print(f"\nLayer Assignments:")
for dir_name, layer in sorted(arch['layer_assignments'].items()):
if layer != 'unknown':
status = "OK"
else:
status = "?"
print(f" {status} {dir_name:20} -> {layer}")
summary = report['summary']
print(f"\n--- Summary ---")
print(f"Total issues: {summary['total_issues']}")
print(f" Code issues: {summary['code_issues']}")
print(f" Layer violations: {summary['layer_violations']}")
if report['code_quality']['issues']:
print(f"\n--- Code Issues ---")
for issue in report['code_quality']['issues'][:10]:
severity = issue['severity'].upper()
print(f" [{severity}] {issue.get('file', 'N/A')}")
print(f" {issue['message']}")
if 'suggestion' in issue:
print(f" Suggestion: {issue['suggestion']}")
if report['layer_violations']:
print(f"\n--- Layer Violations ---")
for v in report['layer_violations'][:5]:
print(f" {v['file']}")
print(f" {v['message']}")
if report['recommendations']:
print(f"\n--- Recommendations ---")
for i, rec in enumerate(report['recommendations'], 1):
print(f" {i}. {rec}")
metrics = report['code_quality']['metrics']
print(f"\n--- Metrics ---")
print(f" Total lines: {metrics.get('total_lines', 'N/A')}")
print(f" File count: {metrics.get('file_count', 'N/A')}")
print(f" Avg lines/file: {metrics.get('avg_file_lines', 'N/A')}")
print("\n" + "=" * 60)
def main():
parser = argparse.ArgumentParser(
description='Analyze project architecture and detect patterns and issues',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s ./my-project
%(prog)s ./my-project --verbose
%(prog)s ./my-project --output json
%(prog)s ./my-project --check layers
Detects:
- Architectural patterns (Layered, MVC, Hexagonal, Clean, Microservices)
- Code organization issues (large files, god classes)
- Layer violations (incorrect dependencies between layers)
- Missing architectural components
'''
)
parser.add_argument(
'project_path',
help='Path to the project directory'
)
parser.add_argument(
'--output', '-o',
choices=['human', 'json'],
default='human',
help='Output format (default: human)'
)
parser.add_argument(
'--check',
choices=['all', 'pattern', 'layers', 'code'],
default='all',
help='What to check (default: all)'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--save', '-s',
help='Save report to file'
)
args = parser.parse_args()
project_path = Path(args.project_path).resolve()
if not project_path.exists():
print(f"Error: Project path does not exist: {project_path}", file=sys.stderr)
sys.exit(1)
if not project_path.is_dir():
print(f"Error: Project path is not a directory: {project_path}", file=sys.stderr)
sys.exit(1)
# Run analysis
architect = ProjectArchitect(project_path, verbose=args.verbose)
report = architect.analyze()
# Handle specific checks
if args.check == 'pattern':
arch = report['architecture']
print(f"Pattern: {arch['detected_pattern']} (confidence: {arch['confidence']}%)")
sys.exit(0)
elif args.check == 'layers':
violations = report['layer_violations']
if violations:
print(f"Found {len(violations)} layer violation(s):")
for v in violations:
print(f" {v['file']}: {v['message']}")
sys.exit(1)
else:
print("No layer violations found.")
sys.exit(0)
elif args.check == 'code':
issues = report['code_quality']['issues']
if issues:
print(f"Found {len(issues)} code issue(s):")
for issue in issues[:10]:
print(f" [{issue['severity'].upper()}] {issue['message']}")
sys.exit(1 if any(i['severity'] == 'warning' for i in issues) else 0)
else:
print("No code issues found.")
sys.exit(0)
# Output report
if args.output == 'json':
output = json.dumps(report, indent=2)
if args.save:
Path(args.save).write_text(output)
print(f"Report saved to {args.save}")
else:
print(output)
else:
print_human_report(report)
if args.save:
Path(args.save).write_text(json.dumps(report, indent=2))
print(f"\nJSON report saved to {args.save}")
if __name__ == '__main__':
main()
Comprehensive DevOps skill for CI/CD, infrastructure automation, containerization, and cloud platforms (AWS, GCP, Azure). Includes pipeline setup, infrastruc...
---
name: "senior-devops"
description: Comprehensive DevOps skill for CI/CD, infrastructure automation, containerization, and cloud platforms (AWS, GCP, Azure). Includes pipeline setup, infrastructure as code, deployment automation, and monitoring. Use when setting up pipelines, deploying applications, managing infrastructure, implementing monitoring, or optimizing deployment processes.
---
# Senior Devops
Complete toolkit for senior devops with modern tools and best practices.
## Quick Start
### Main Capabilities
This skill provides three core capabilities through automated scripts:
```bash
# Script 1: Pipeline Generator — scaffolds CI/CD pipelines for GitHub Actions or CircleCI
python scripts/pipeline_generator.py ./app --platform=github --stages=build,test,deploy
# Script 2: Terraform Scaffolder — generates and validates IaC modules for AWS/GCP/Azure
python scripts/terraform_scaffolder.py ./infra --provider=aws --module=ecs-service --verbose
# Script 3: Deployment Manager — orchestrates container deployments with rollback support
python scripts/deployment_manager.py deploy --env=production --image=app:1.2.3 --strategy=blue-green
```
## Core Capabilities
### 1. Pipeline Generator
Scaffolds CI/CD pipeline configurations for GitHub Actions or CircleCI, with stages for build, test, security scan, and deploy.
**Example — GitHub Actions workflow:**
```yaml
# .github/workflows/ci.yml
name: CI/CD Pipeline
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
build-and-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: '20'
cache: 'npm'
- run: npm ci
- run: npm run lint
- run: npm test -- --coverage
- name: Upload coverage
uses: codecov/codecov-action@v4
build-docker:
needs: build-and-test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build and push image
uses: docker/build-push-action@v5
with:
push: { github.ref == 'refs/heads/main'}
tags: ghcr.io/{ github.repository}:{ github.sha}
deploy:
needs: build-docker
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Deploy to ECS
run: |
aws ecs update-service \
--cluster production \
--service app-service \
--force-new-deployment
```
**Usage:**
```bash
python scripts/pipeline_generator.py <project-path> --platform=github|circleci --stages=build,test,deploy
```
### 2. Terraform Scaffolder
Generates, validates, and plans Terraform modules. Enforces consistent module structure and runs `terraform validate` + `terraform plan` before any apply.
**Example — AWS ECS service module:**
```hcl
# modules/ecs-service/main.tf
resource "aws_ecs_task_definition" "app" {
family = var.service_name
requires_compatibilities = ["FARGATE"]
network_mode = "awsvpc"
cpu = var.cpu
memory = var.memory
container_definitions = jsonencode([{
name = var.service_name
image = var.container_image
essential = true
portMappings = [{
containerPort = var.container_port
protocol = "tcp"
}]
environment = [for k, v in var.env_vars : { name = k, value = v }]
logConfiguration = {
logDriver = "awslogs"
options = {
awslogs-group = "/ecs/var.service_name"
awslogs-region = var.aws_region
awslogs-stream-prefix = "ecs"
}
}
}])
}
resource "aws_ecs_service" "app" {
name = var.service_name
cluster = var.cluster_id
task_definition = aws_ecs_task_definition.app.arn
desired_count = var.desired_count
launch_type = "FARGATE"
network_configuration {
subnets = var.private_subnet_ids
security_groups = [aws_security_group.app.id]
assign_public_ip = false
}
load_balancer {
target_group_arn = aws_lb_target_group.app.arn
container_name = var.service_name
container_port = var.container_port
}
}
```
**Usage:**
```bash
python scripts/terraform_scaffolder.py <target-path> --provider=aws|gcp|azure --module=ecs-service|gke-deployment|aks-service [--verbose]
```
### 3. Deployment Manager
Orchestrates deployments with blue/green or rolling strategies, health-check gates, and automatic rollback on failure.
**Example — Kubernetes blue/green deployment (blue-slot specific elements):**
```yaml
# k8s/deployment-blue.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: app-blue
labels:
app: myapp
slot: blue # slot label distinguishes blue from green
spec:
replicas: 3
selector:
matchLabels:
app: myapp
slot: blue
template:
metadata:
labels:
app: myapp
slot: blue
spec:
containers:
- name: app
image: ghcr.io/org/app:1.2.3
readinessProbe: # gate: pod must pass before traffic switches
httpGet:
path: /healthz
port: 8080
initialDelaySeconds: 10
periodSeconds: 5
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
```
**Usage:**
```bash
python scripts/deployment_manager.py deploy \
--env=staging|production \
--image=app:1.2.3 \
--strategy=blue-green|rolling \
--health-check-url=https://app.example.com/healthz
python scripts/deployment_manager.py rollback --env=production --to-version=1.2.2
python scripts/deployment_manager.py --analyze --env=production # audit current state
```
## Resources
- Pattern Reference: `references/cicd_pipeline_guide.md` — detailed CI/CD patterns, best practices, anti-patterns
- Workflow Guide: `references/infrastructure_as_code.md` — IaC step-by-step processes, optimization, troubleshooting
- Technical Guide: `references/deployment_strategies.md` — deployment strategy configs, security considerations, scalability
- Tool Scripts: `scripts/` directory
## Development Workflow
### 1. Infrastructure Changes (Terraform)
```bash
# Scaffold or update module
python scripts/terraform_scaffolder.py ./infra --provider=aws --module=ecs-service --verbose
# Validate and plan — review diff before applying
terraform -chdir=infra init
terraform -chdir=infra validate
terraform -chdir=infra plan -out=tfplan
# Apply only after plan review
terraform -chdir=infra apply tfplan
# Verify resources are healthy
aws ecs describe-services --cluster production --services app-service \
--query 'services[0].{Status:status,Running:runningCount,Desired:desiredCount}'
```
### 2. Application Deployment
```bash
# Generate or update pipeline config
python scripts/pipeline_generator.py . --platform=github --stages=build,test,security,deploy
# Build and tag image
docker build -t ghcr.io/org/app:$(git rev-parse --short HEAD) .
docker push ghcr.io/org/app:$(git rev-parse --short HEAD)
# Deploy with health-check gate
python scripts/deployment_manager.py deploy \
--env=production \
--image=app:$(git rev-parse --short HEAD) \
--strategy=blue-green \
--health-check-url=https://app.example.com/healthz
# Verify pods are running
kubectl get pods -n production -l app=myapp
kubectl rollout status deployment/app-blue -n production
# Switch traffic after verification
kubectl patch service app-svc -n production \
-p '{"spec":{"selector":{"slot":"blue"}}}'
```
### 3. Rollback Procedure
```bash
# Immediate rollback via deployment manager
python scripts/deployment_manager.py rollback --env=production --to-version=1.2.2
# Or via kubectl
kubectl rollout undo deployment/app -n production
kubectl rollout status deployment/app -n production
# Verify rollback succeeded
kubectl get pods -n production -l app=myapp
curl -sf https://app.example.com/healthz || echo "ROLLBACK FAILED — escalate"
```
## Troubleshooting
Check the comprehensive troubleshooting section in `references/deployment_strategies.md`.
FILE:references/cicd_pipeline_guide.md
# Cicd Pipeline Guide
## Overview
This reference guide provides comprehensive information for senior devops.
## Patterns and Practices
### Pattern 1: Best Practice Implementation
**Description:**
Detailed explanation of the pattern.
**When to Use:**
- Scenario 1
- Scenario 2
- Scenario 3
**Implementation:**
```typescript
// Example code implementation
export class Example {
// Implementation details
}
```
**Benefits:**
- Benefit 1
- Benefit 2
- Benefit 3
**Trade-offs:**
- Consider 1
- Consider 2
- Consider 3
### Pattern 2: Advanced Technique
**Description:**
Another important pattern for senior devops.
**Implementation:**
```typescript
// Advanced example
async function advancedExample() {
// Code here
}
```
## Guidelines
### Code Organization
- Clear structure
- Logical separation
- Consistent naming
- Proper documentation
### Performance Considerations
- Optimization strategies
- Bottleneck identification
- Monitoring approaches
- Scaling techniques
### Security Best Practices
- Input validation
- Authentication
- Authorization
- Data protection
## Common Patterns
### Pattern A
Implementation details and examples.
### Pattern B
Implementation details and examples.
### Pattern C
Implementation details and examples.
## Anti-Patterns to Avoid
### Anti-Pattern 1
What not to do and why.
### Anti-Pattern 2
What not to do and why.
## Tools and Resources
### Recommended Tools
- Tool 1: Purpose
- Tool 2: Purpose
- Tool 3: Purpose
### Further Reading
- Resource 1
- Resource 2
- Resource 3
## Conclusion
Key takeaways for using this reference guide effectively.
FILE:references/deployment_strategies.md
# Deployment Strategies
## Overview
This reference guide provides comprehensive information for senior devops.
## Patterns and Practices
### Pattern 1: Best Practice Implementation
**Description:**
Detailed explanation of the pattern.
**When to Use:**
- Scenario 1
- Scenario 2
- Scenario 3
**Implementation:**
```typescript
// Example code implementation
export class Example {
// Implementation details
}
```
**Benefits:**
- Benefit 1
- Benefit 2
- Benefit 3
**Trade-offs:**
- Consider 1
- Consider 2
- Consider 3
### Pattern 2: Advanced Technique
**Description:**
Another important pattern for senior devops.
**Implementation:**
```typescript
// Advanced example
async function advancedExample() {
// Code here
}
```
## Guidelines
### Code Organization
- Clear structure
- Logical separation
- Consistent naming
- Proper documentation
### Performance Considerations
- Optimization strategies
- Bottleneck identification
- Monitoring approaches
- Scaling techniques
### Security Best Practices
- Input validation
- Authentication
- Authorization
- Data protection
## Common Patterns
### Pattern A
Implementation details and examples.
### Pattern B
Implementation details and examples.
### Pattern C
Implementation details and examples.
## Anti-Patterns to Avoid
### Anti-Pattern 1
What not to do and why.
### Anti-Pattern 2
What not to do and why.
## Tools and Resources
### Recommended Tools
- Tool 1: Purpose
- Tool 2: Purpose
- Tool 3: Purpose
### Further Reading
- Resource 1
- Resource 2
- Resource 3
## Conclusion
Key takeaways for using this reference guide effectively.
FILE:references/infrastructure_as_code.md
# Infrastructure As Code
## Overview
This reference guide provides comprehensive information for senior devops.
## Patterns and Practices
### Pattern 1: Best Practice Implementation
**Description:**
Detailed explanation of the pattern.
**When to Use:**
- Scenario 1
- Scenario 2
- Scenario 3
**Implementation:**
```typescript
// Example code implementation
export class Example {
// Implementation details
}
```
**Benefits:**
- Benefit 1
- Benefit 2
- Benefit 3
**Trade-offs:**
- Consider 1
- Consider 2
- Consider 3
### Pattern 2: Advanced Technique
**Description:**
Another important pattern for senior devops.
**Implementation:**
```typescript
// Advanced example
async function advancedExample() {
// Code here
}
```
## Guidelines
### Code Organization
- Clear structure
- Logical separation
- Consistent naming
- Proper documentation
### Performance Considerations
- Optimization strategies
- Bottleneck identification
- Monitoring approaches
- Scaling techniques
### Security Best Practices
- Input validation
- Authentication
- Authorization
- Data protection
## Common Patterns
### Pattern A
Implementation details and examples.
### Pattern B
Implementation details and examples.
### Pattern C
Implementation details and examples.
## Anti-Patterns to Avoid
### Anti-Pattern 1
What not to do and why.
### Anti-Pattern 2
What not to do and why.
## Tools and Resources
### Recommended Tools
- Tool 1: Purpose
- Tool 2: Purpose
- Tool 3: Purpose
### Further Reading
- Resource 1
- Resource 2
- Resource 3
## Conclusion
Key takeaways for using this reference guide effectively.
FILE:scripts/deployment_manager.py
#!/usr/bin/env python3
"""
Deployment Manager
Automated tool for senior devops tasks
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional
class DeploymentManager:
"""Main class for deployment manager functionality"""
def __init__(self, target_path: str, verbose: bool = False):
self.target_path = Path(target_path)
self.verbose = verbose
self.results = {}
def run(self) -> Dict:
"""Execute the main functionality"""
print(f"🚀 Running {self.__class__.__name__}...")
print(f"📁 Target: {self.target_path}")
try:
self.validate_target()
self.analyze()
self.generate_report()
print("✅ Completed successfully!")
return self.results
except Exception as e:
print(f"❌ Error: {e}")
sys.exit(1)
def validate_target(self):
"""Validate the target path exists and is accessible"""
if not self.target_path.exists():
raise ValueError(f"Target path does not exist: {self.target_path}")
if self.verbose:
print(f"✓ Target validated: {self.target_path}")
def analyze(self):
"""Perform the main analysis or operation"""
if self.verbose:
print("📊 Analyzing...")
# Main logic here
self.results['status'] = 'success'
self.results['target'] = str(self.target_path)
self.results['findings'] = []
# Add analysis results
if self.verbose:
print(f"✓ Analysis complete: {len(self.results.get('findings', []))} findings")
def generate_report(self):
"""Generate and display the report"""
print("\n" + "="*50)
print("REPORT")
print("="*50)
print(f"Target: {self.results.get('target')}")
print(f"Status: {self.results.get('status')}")
print(f"Findings: {len(self.results.get('findings', []))}")
print("="*50 + "\n")
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Deployment Manager"
)
parser.add_argument(
'target',
help='Target path to analyze or process'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output results as JSON'
)
parser.add_argument(
'--output', '-o',
help='Output file path'
)
args = parser.parse_args()
tool = DeploymentManager(
args.target,
verbose=args.verbose
)
results = tool.run()
if args.json:
output = json.dumps(results, indent=2)
if args.output:
with open(args.output, 'w') as f:
f.write(output)
print(f"Results written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()
FILE:scripts/pipeline_generator.py
#!/usr/bin/env python3
"""
Pipeline Generator
Automated tool for senior devops tasks
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional
class PipelineGenerator:
"""Main class for pipeline generator functionality"""
def __init__(self, target_path: str, verbose: bool = False):
self.target_path = Path(target_path)
self.verbose = verbose
self.results = {}
def run(self) -> Dict:
"""Execute the main functionality"""
print(f"🚀 Running {self.__class__.__name__}...")
print(f"📁 Target: {self.target_path}")
try:
self.validate_target()
self.analyze()
self.generate_report()
print("✅ Completed successfully!")
return self.results
except Exception as e:
print(f"❌ Error: {e}")
sys.exit(1)
def validate_target(self):
"""Validate the target path exists and is accessible"""
if not self.target_path.exists():
raise ValueError(f"Target path does not exist: {self.target_path}")
if self.verbose:
print(f"✓ Target validated: {self.target_path}")
def analyze(self):
"""Perform the main analysis or operation"""
if self.verbose:
print("📊 Analyzing...")
# Main logic here
self.results['status'] = 'success'
self.results['target'] = str(self.target_path)
self.results['findings'] = []
# Add analysis results
if self.verbose:
print(f"✓ Analysis complete: {len(self.results.get('findings', []))} findings")
def generate_report(self):
"""Generate and display the report"""
print("\n" + "="*50)
print("REPORT")
print("="*50)
print(f"Target: {self.results.get('target')}")
print(f"Status: {self.results.get('status')}")
print(f"Findings: {len(self.results.get('findings', []))}")
print("="*50 + "\n")
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Pipeline Generator"
)
parser.add_argument(
'target',
help='Target path to analyze or process'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output results as JSON'
)
parser.add_argument(
'--output', '-o',
help='Output file path'
)
args = parser.parse_args()
tool = PipelineGenerator(
args.target,
verbose=args.verbose
)
results = tool.run()
if args.json:
output = json.dumps(results, indent=2)
if args.output:
with open(args.output, 'w') as f:
f.write(output)
print(f"Results written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()
FILE:scripts/terraform_scaffolder.py
#!/usr/bin/env python3
"""
Terraform Scaffolder
Automated tool for senior devops tasks
"""
import os
import sys
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional
class TerraformScaffolder:
"""Main class for terraform scaffolder functionality"""
def __init__(self, target_path: str, verbose: bool = False):
self.target_path = Path(target_path)
self.verbose = verbose
self.results = {}
def run(self) -> Dict:
"""Execute the main functionality"""
print(f"🚀 Running {self.__class__.__name__}...")
print(f"📁 Target: {self.target_path}")
try:
self.validate_target()
self.analyze()
self.generate_report()
print("✅ Completed successfully!")
return self.results
except Exception as e:
print(f"❌ Error: {e}")
sys.exit(1)
def validate_target(self):
"""Validate the target path exists and is accessible"""
if not self.target_path.exists():
raise ValueError(f"Target path does not exist: {self.target_path}")
if self.verbose:
print(f"✓ Target validated: {self.target_path}")
def analyze(self):
"""Perform the main analysis or operation"""
if self.verbose:
print("📊 Analyzing...")
# Main logic here
self.results['status'] = 'success'
self.results['target'] = str(self.target_path)
self.results['findings'] = []
# Add analysis results
if self.verbose:
print(f"✓ Analysis complete: {len(self.results.get('findings', []))} findings")
def generate_report(self):
"""Generate and display the report"""
print("\n" + "="*50)
print("REPORT")
print("="*50)
print(f"Target: {self.results.get('target')}")
print(f"Status: {self.results.get('status')}")
print(f"Findings: {len(self.results.get('findings', []))}")
print("="*50 + "\n")
def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Terraform Scaffolder"
)
parser.add_argument(
'target',
help='Target path to analyze or process'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output results as JSON'
)
parser.add_argument(
'--output', '-o',
help='Output file path'
)
args = parser.parse_args()
tool = TerraformScaffolder(
args.target,
verbose=args.verbose
)
results = tool.run()
if args.json:
output = json.dumps(results, indent=2)
if args.output:
with open(args.output, 'w') as f:
f.write(output)
print(f"Results written to {args.output}")
else:
print(output)
if __name__ == '__main__':
main()
Designs and implements backend systems including REST APIs, microservices, database architectures, authentication flows, and security hardening. Use when the...
---
name: "senior-backend"
description: Designs and implements backend systems including REST APIs, microservices, database architectures, authentication flows, and security hardening. Use when the user asks to "design REST APIs", "optimize database queries", "implement authentication", "build microservices", "review backend code", "set up GraphQL", "handle database migrations", or "load test APIs". Covers Node.js/Express/Fastify development, PostgreSQL optimization, API security, and backend architecture patterns.
---
# Senior Backend Engineer
Backend development patterns, API design, database optimization, and security practices.
---
## Quick Start
```bash
# Generate API routes from OpenAPI spec
python scripts/api_scaffolder.py openapi.yaml --framework express --output src/routes/
# Analyze database schema and generate migrations
python scripts/database_migration_tool.py --connection postgres://localhost/mydb --analyze
# Load test an API endpoint
python scripts/api_load_tester.py https://api.example.com/users --concurrency 50 --duration 30
```
---
## Tools Overview
### 1. API Scaffolder
Generates API route handlers, middleware, and OpenAPI specifications from schema definitions.
**Input:** OpenAPI spec (YAML/JSON) or database schema
**Output:** Route handlers, validation middleware, TypeScript types
**Usage:**
```bash
# Generate Express routes from OpenAPI spec
python scripts/api_scaffolder.py openapi.yaml --framework express --output src/routes/
# Output: Generated 12 route handlers, validation middleware, and TypeScript types
# Generate from database schema
python scripts/api_scaffolder.py --from-db postgres://localhost/mydb --output src/routes/
# Generate OpenAPI spec from existing routes
python scripts/api_scaffolder.py src/routes/ --generate-spec --output openapi.yaml
```
**Supported Frameworks:**
- Express.js (`--framework express`)
- Fastify (`--framework fastify`)
- Koa (`--framework koa`)
---
### 2. Database Migration Tool
Analyzes database schemas, detects changes, and generates migration files with rollback support.
**Input:** Database connection string or schema files
**Output:** Migration files, schema diff report, optimization suggestions
**Usage:**
```bash
# Analyze current schema and suggest optimizations
python scripts/database_migration_tool.py --connection postgres://localhost/mydb --analyze
# Output: Missing indexes, N+1 query risks, and suggested migration files
# Generate migration from schema diff
python scripts/database_migration_tool.py --connection postgres://localhost/mydb \
--compare schema/v2.sql --output migrations/
# Dry-run a migration
python scripts/database_migration_tool.py --connection postgres://localhost/mydb \
--migrate migrations/20240115_add_user_indexes.sql --dry-run
```
---
### 3. API Load Tester
Performs HTTP load testing with configurable concurrency, measuring latency percentiles and throughput.
**Input:** API endpoint URL and test configuration
**Output:** Performance report with latency distribution, error rates, throughput metrics
**Usage:**
```bash
# Basic load test
python scripts/api_load_tester.py https://api.example.com/users --concurrency 50 --duration 30
# Output: Throughput (req/sec), latency percentiles (P50/P95/P99), error counts, and scaling recommendations
# Test with custom headers and body
python scripts/api_load_tester.py https://api.example.com/orders \
--method POST \
--header "Authorization: Bearer token123" \
--body '{"product_id": 1, "quantity": 2}' \
--concurrency 100 \
--duration 60
# Compare two endpoints
python scripts/api_load_tester.py https://api.example.com/v1/users https://api.example.com/v2/users \
--compare --concurrency 50 --duration 30
```
---
## Backend Development Workflows
### API Design Workflow
Use when designing a new API or refactoring existing endpoints.
**Step 1: Define resources and operations**
```yaml
# openapi.yaml
openapi: 3.0.3
info:
title: User Service API
version: 1.0.0
paths:
/users:
get:
summary: List users
parameters:
- name: "limit"
in: query
schema:
type: integer
default: 20
post:
summary: Create user
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateUser'
```
**Step 2: Generate route scaffolding**
```bash
python scripts/api_scaffolder.py openapi.yaml --framework express --output src/routes/
```
**Step 3: Implement business logic**
```typescript
// src/routes/users.ts (generated, then customized)
export const createUser = async (req: Request, res: Response) => {
const { email, name } = req.body;
// Add business logic
const user = await userService.create({ email, name });
res.status(201).json(user);
};
```
**Step 4: Add validation middleware**
```bash
# Validation is auto-generated from OpenAPI schema
# src/middleware/validators.ts includes:
# - Request body validation
# - Query parameter validation
# - Path parameter validation
```
**Step 5: Generate updated OpenAPI spec**
```bash
python scripts/api_scaffolder.py src/routes/ --generate-spec --output openapi.yaml
```
---
### Database Optimization Workflow
Use when queries are slow or database performance needs improvement.
**Step 1: Analyze current performance**
```bash
python scripts/database_migration_tool.py --connection $DATABASE_URL --analyze
```
**Step 2: Identify slow queries**
```sql
-- Check query execution plans
EXPLAIN ANALYZE SELECT * FROM orders
WHERE user_id = 123
ORDER BY created_at DESC
LIMIT 10;
-- Look for: Seq Scan (bad), Index Scan (good)
```
**Step 3: Generate index migrations**
```bash
python scripts/database_migration_tool.py --connection $DATABASE_URL \
--suggest-indexes --output migrations/
```
**Step 4: Test migration (dry-run)**
```bash
python scripts/database_migration_tool.py --connection $DATABASE_URL \
--migrate migrations/add_indexes.sql --dry-run
```
**Step 5: Apply and verify**
```bash
# Apply migration
python scripts/database_migration_tool.py --connection $DATABASE_URL \
--migrate migrations/add_indexes.sql
# Verify improvement
python scripts/database_migration_tool.py --connection $DATABASE_URL --analyze
```
---
### Security Hardening Workflow
Use when preparing an API for production or after a security review.
**Step 1: Review authentication setup**
```typescript
// Verify JWT configuration
const jwtConfig = {
secret: process.env.JWT_SECRET, // Must be from env, never hardcoded
expiresIn: '1h', // Short-lived tokens
algorithm: 'RS256' // Prefer asymmetric
};
```
**Step 2: Add rate limiting**
```typescript
import rateLimit from 'express-rate-limit';
const apiLimiter = rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 100, // 100 requests per window
standardHeaders: true,
legacyHeaders: false,
});
app.use('/api/', apiLimiter);
```
**Step 3: Validate all inputs**
```typescript
import { z } from 'zod';
const CreateUserSchema = z.object({
email: z.string().email().max(255),
name: "zstringmin1max100"
age: z.number().int().positive().optional()
});
// Use in route handler
const data = CreateUserSchema.parse(req.body);
```
**Step 4: Load test with attack patterns**
```bash
# Test rate limiting
python scripts/api_load_tester.py https://api.example.com/login \
--concurrency 200 --duration 10 --expect-rate-limit
# Test input validation
python scripts/api_load_tester.py https://api.example.com/users \
--method POST \
--body '{"email": "not-an-email"}' \
--expect-status 400
```
**Step 5: Review security headers**
```typescript
import helmet from 'helmet';
app.use(helmet({
contentSecurityPolicy: true,
crossOriginEmbedderPolicy: true,
crossOriginOpenerPolicy: true,
crossOriginResourcePolicy: true,
hsts: { maxAge: 31536000, includeSubDomains: true },
}));
```
---
## Reference Documentation
| File | Contains | Use When |
|------|----------|----------|
| `references/api_design_patterns.md` | REST vs GraphQL, versioning, error handling, pagination | Designing new APIs |
| `references/database_optimization_guide.md` | Indexing strategies, query optimization, N+1 solutions | Fixing slow queries |
| `references/backend_security_practices.md` | OWASP Top 10, auth patterns, input validation | Security hardening |
---
## Common Patterns Quick Reference
### REST API Response Format
```json
{
"data": { "id": 1, "name": "John" },
"meta": { "requestId": "abc-123" }
}
```
### Error Response Format
```json
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid email format",
"details": [{ "field": "email", "message": "must be valid email" }]
},
"meta": { "requestId": "abc-123" }
}
```
### HTTP Status Codes
| Code | Use Case |
|------|----------|
| 200 | Success (GET, PUT, PATCH) |
| 201 | Created (POST) |
| 204 | No Content (DELETE) |
| 400 | Validation error |
| 401 | Authentication required |
| 403 | Permission denied |
| 404 | Resource not found |
| 429 | Rate limit exceeded |
| 500 | Internal server error |
### Database Index Strategy
```sql
-- Single column (equality lookups)
CREATE INDEX idx_users_email ON users(email);
-- Composite (multi-column queries)
CREATE INDEX idx_orders_user_status ON orders(user_id, status);
-- Partial (filtered queries)
CREATE INDEX idx_orders_active ON orders(created_at) WHERE status = 'active';
-- Covering (avoid table lookup)
CREATE INDEX idx_users_email_name ON users(email) INCLUDE (name);
```
---
## Common Commands
```bash
# API Development
python scripts/api_scaffolder.py openapi.yaml --framework express
python scripts/api_scaffolder.py src/routes/ --generate-spec
# Database Operations
python scripts/database_migration_tool.py --connection $DATABASE_URL --analyze
python scripts/database_migration_tool.py --connection $DATABASE_URL --migrate file.sql
# Performance Testing
python scripts/api_load_tester.py https://api.example.com/endpoint --concurrency 50
python scripts/api_load_tester.py https://api.example.com/endpoint --compare baseline.json
```
FILE:references/api_design_patterns.md
# API Design Patterns
Concrete patterns for REST and GraphQL API design with examples.
## Patterns Index
1. [REST vs GraphQL Decision](#1-rest-vs-graphql-decision)
2. [Resource Naming Conventions](#2-resource-naming-conventions)
3. [API Versioning Strategies](#3-api-versioning-strategies)
4. [Error Handling Patterns](#4-error-handling-patterns)
5. [Pagination Patterns](#5-pagination-patterns)
6. [Authentication Patterns](#6-authentication-patterns)
7. [Rate Limiting Design](#7-rate-limiting-design)
8. [Idempotency Patterns](#8-idempotency-patterns)
---
## 1. REST vs GraphQL Decision
### When to Use REST
| Scenario | Why REST |
|----------|----------|
| Simple CRUD operations | Less complexity, widely understood |
| Public APIs | Better caching, easier documentation |
| File uploads/downloads | Native HTTP support |
| Microservices communication | Simpler service-to-service calls |
| Caching is critical | HTTP caching built-in |
### When to Use GraphQL
| Scenario | Why GraphQL |
|----------|-------------|
| Mobile apps with bandwidth constraints | Request only needed fields |
| Complex nested data | Single request for related data |
| Rapidly changing frontend requirements | Frontend-driven queries |
| Multiple client types | Each client queries what it needs |
| Real-time subscriptions needed | Built-in subscription support |
### Hybrid Approach
```
┌─────────────────────────────────────────────────────┐
│ API Gateway │
├─────────────────────────────────────────────────────┤
│ /api/v1/* → REST (Public API, webhooks) │
│ /graphql → GraphQL (Mobile apps, dashboards) │
│ /files/* → REST (File uploads/downloads) │
└─────────────────────────────────────────────────────┘
```
---
## 2. Resource Naming Conventions
### REST Endpoint Patterns
```
# Collections (plural nouns)
GET /users # List users
POST /users # Create user
GET /users/{id} # Get user
PUT /users/{id} # Replace user
PATCH /users/{id} # Update user
DELETE /users/{id} # Delete user
# Nested resources
GET /users/{id}/orders # User's orders
POST /users/{id}/orders # Create order for user
GET /users/{id}/orders/{orderId} # Specific order
# Actions (when CRUD doesn't fit)
POST /users/{id}/activate # Activate user
POST /orders/{id}/cancel # Cancel order
POST /payments/{id}/refund # Refund payment
# Filtering, sorting, pagination
GET /users?status=active&sort=-created_at&limit=20&offset=40
GET /orders?user_id=123&status=pending
```
### Naming Rules
| Rule | Good | Bad |
|------|------|-----|
| Use plural nouns | `/users` | `/user` |
| Use lowercase | `/user-profiles` | `/userProfiles` |
| Use hyphens | `/order-items` | `/order_items` |
| No verbs in URLs | `POST /orders` | `POST /createOrder` |
| No file extensions | `/users/123` | `/users/123.json` |
---
## 3. API Versioning Strategies
### Strategy Comparison
| Strategy | Example | Pros | Cons |
|----------|---------|------|------|
| URL Path | `/api/v1/users` | Explicit, easy routing | URL changes |
| Header | `Accept: application/vnd.api+json;version=1` | Clean URLs | Hidden version |
| Query Param | `/users?version=1` | Easy to test | Pollutes query string |
### Recommended: URL Path Versioning
```typescript
// Express routing
import v1Routes from './routes/v1';
import v2Routes from './routes/v2';
app.use('/api/v1', v1Routes);
app.use('/api/v2', v2Routes);
```
### Deprecation Strategy
```typescript
// Add deprecation headers
app.use('/api/v1', (req, res, next) => {
res.set('Deprecation', 'true');
res.set('Sunset', 'Sat, 01 Jun 2025 00:00:00 GMT');
res.set('Link', '</api/v2>; rel="successor-version"');
next();
}, v1Routes);
```
### Breaking vs Non-Breaking Changes
**Non-breaking (safe):**
- Adding new endpoints
- Adding optional fields
- Adding new enum values at end
**Breaking (requires new version):**
- Removing endpoints or fields
- Renaming fields
- Changing field types
- Changing required/optional status
---
## 4. Error Handling Patterns
### Standard Error Response Format
```json
{
"error": {
"code": "VALIDATION_ERROR",
"message": "Request validation failed",
"details": [
{
"field": "email",
"code": "INVALID_FORMAT",
"message": "Must be a valid email address"
},
{
"field": "age",
"code": "OUT_OF_RANGE",
"message": "Must be between 18 and 120"
}
],
"documentation_url": "https://api.example.com/docs/errors#validation"
},
"meta": {
"request_id": "req_abc123",
"timestamp": "2024-01-15T10:30:00Z"
}
}
```
### Error Codes by Category
```typescript
// Client errors (4xx)
const ClientErrors = {
VALIDATION_ERROR: 400,
INVALID_JSON: 400,
AUTHENTICATION_REQUIRED: 401,
INVALID_TOKEN: 401,
TOKEN_EXPIRED: 401,
PERMISSION_DENIED: 403,
RESOURCE_NOT_FOUND: 404,
METHOD_NOT_ALLOWED: 405,
CONFLICT: 409,
RATE_LIMIT_EXCEEDED: 429,
};
// Server errors (5xx)
const ServerErrors = {
INTERNAL_ERROR: 500,
DATABASE_ERROR: 500,
EXTERNAL_SERVICE_ERROR: 502,
SERVICE_UNAVAILABLE: 503,
};
```
### Error Handler Implementation
```typescript
// Express error handler
interface ApiError extends Error {
code: string;
statusCode: number;
details?: Array<{ field: string; message: string }>;
}
const errorHandler: ErrorRequestHandler = (err: ApiError, req, res, next) => {
const statusCode = err.statusCode || 500;
const code = err.code || 'INTERNAL_ERROR';
// Log server errors
if (statusCode >= 500) {
logger.error({ err, requestId: req.id }, 'Server error');
}
res.status(statusCode).json({
error: {
code,
message: statusCode >= 500 ? 'An unexpected error occurred' : err.message,
details: err.details,
...(process.env.NODE_ENV === 'development' && { stack: err.stack }),
},
meta: {
request_id: req.id,
timestamp: new Date().toISOString(),
},
});
};
```
---
## 5. Pagination Patterns
### Offset-Based Pagination
```
GET /users?limit=20&offset=40
Response:
{
"data": [...],
"pagination": {
"total": 1250,
"limit": 20,
"offset": 40,
"has_more": true
}
}
```
**Pros:** Simple, supports random access
**Cons:** Inconsistent with concurrent inserts/deletes
### Cursor-Based Pagination
```
GET /users?limit=20&cursor=eyJpZCI6MTIzfQ==
Response:
{
"data": [...],
"pagination": {
"limit": 20,
"next_cursor": "eyJpZCI6MTQzfQ==",
"prev_cursor": "eyJpZCI6MTIzfQ==",
"has_more": true
}
}
```
**Pros:** Consistent with real-time data, efficient
**Cons:** No random access, cursor encoding required
### Implementation Example
```typescript
// Cursor-based pagination
interface CursorPagination {
limit: number;
cursor?: string;
direction?: 'forward' | 'backward';
}
async function paginatedQuery<T>(
query: QueryBuilder,
{ limit, cursor, direction = 'forward' }: CursorPagination
): Promise<{ data: T[]; nextCursor?: string; hasMore: boolean }> {
// Decode cursor
const decoded = cursor ? JSON.parse(Buffer.from(cursor, 'base64').toString()) : null;
// Apply cursor condition
if (decoded) {
query = direction === 'forward'
? query.where('id', '>', decoded.id)
: query.where('id', '<', decoded.id);
}
// Fetch one extra to check if more exist
const results = await query.limit(limit + 1).orderBy('id', direction === 'forward' ? 'asc' : 'desc');
const hasMore = results.length > limit;
const data = hasMore ? results.slice(0, -1) : results;
// Encode next cursor
const nextCursor = hasMore
? Buffer.from(JSON.stringify({ id: data[data.length - 1].id })).toString('base64')
: undefined;
return { data, nextCursor, hasMore };
}
```
---
## 6. Authentication Patterns
### JWT Authentication Flow
```
┌──────────┐ 1. Login ┌──────────┐
│ Client │ ──────────────────▶ │ Server │
└──────────┘ └──────────┘
│
2. Return JWT │
◀────────────────────────────────────────
{access_token, refresh_token} │
│
3. API Request │
───────────────────────────────────────▶
Authorization: Bearer {token} │
│
4. Validate & Respond │
◀────────────────────────────────────────
```
### JWT Implementation
```typescript
import jwt from 'jsonwebtoken';
interface TokenPayload {
userId: string;
email: string;
roles: string[];
}
// Generate tokens
function generateTokens(user: User): { accessToken: string; refreshToken: string } {
const payload: TokenPayload = {
userId: user.id,
email: user.email,
roles: user.roles,
};
const accessToken = jwt.sign(payload, process.env.JWT_SECRET!, {
expiresIn: '15m',
algorithm: 'RS256',
});
const refreshToken = jwt.sign(
{ userId: user.id, tokenVersion: user.tokenVersion },
process.env.JWT_REFRESH_SECRET!,
{ expiresIn: '7d', algorithm: 'RS256' }
);
return { accessToken, refreshToken };
}
// Middleware
const authenticate: RequestHandler = async (req, res, next) => {
const authHeader = req.headers.authorization;
if (!authHeader?.startsWith('Bearer ')) {
return res.status(401).json({ error: { code: 'AUTHENTICATION_REQUIRED' } });
}
try {
const token = authHeader.slice(7);
const payload = jwt.verify(token, process.env.JWT_SECRET!) as TokenPayload;
req.user = payload;
next();
} catch (err) {
if (err instanceof jwt.TokenExpiredError) {
return res.status(401).json({ error: { code: 'TOKEN_EXPIRED' } });
}
return res.status(401).json({ error: { code: 'INVALID_TOKEN' } });
}
};
```
### API Key Authentication (Service-to-Service)
```typescript
// API key middleware
const apiKeyAuth: RequestHandler = async (req, res, next) => {
const apiKey = req.headers['x-api-key'] as string;
if (!apiKey) {
return res.status(401).json({ error: { code: 'API_KEY_REQUIRED' } });
}
// Hash and lookup (never store plain API keys)
const hashedKey = crypto.createHash('sha256').update(apiKey).digest('hex');
const client = await db.apiClients.findByHashedKey(hashedKey);
if (!client || !client.isActive) {
return res.status(401).json({ error: { code: 'INVALID_API_KEY' } });
}
req.apiClient = client;
next();
};
```
---
## 7. Rate Limiting Design
### Rate Limit Headers
```
HTTP/1.1 200 OK
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 95
X-RateLimit-Reset: 1705312800
Retry-After: 60
```
### Tiered Rate Limits
```typescript
const rateLimits = {
anonymous: { requests: 60, window: '1m' },
authenticated: { requests: 1000, window: '1h' },
premium: { requests: 10000, window: '1h' },
};
// Implementation with Redis
import { RateLimiterRedis } from 'rate-limiter-flexible';
const createRateLimiter = (tier: keyof typeof rateLimits) => {
const config = rateLimits[tier];
return new RateLimiterRedis({
storeClient: redisClient,
keyPrefix: `ratelimit:tier`,
points: config.requests,
duration: parseDuration(config.window),
});
};
```
### Rate Limit Response
```json
{
"error": {
"code": "RATE_LIMIT_EXCEEDED",
"message": "Too many requests",
"details": {
"limit": 100,
"window": "1 minute",
"retry_after": 45
}
}
}
```
---
## 8. Idempotency Patterns
### Idempotency Key Header
```
POST /payments
Idempotency-Key: payment_abc123_attempt1
Content-Type: application/json
{
"amount": 1000,
"currency": "USD"
}
```
### Implementation
```typescript
const idempotencyMiddleware: RequestHandler = async (req, res, next) => {
const idempotencyKey = req.headers['idempotency-key'] as string;
if (!idempotencyKey) {
return next(); // Optional for some endpoints
}
// Check for existing response
const cached = await redis.get(`idempotency:idempotencyKey`);
if (cached) {
const { statusCode, body } = JSON.parse(cached);
return res.status(statusCode).json(body);
}
// Store response after processing
const originalJson = res.json.bind(res);
res.json = (body: any) => {
redis.setex(
`idempotency:idempotencyKey`,
86400, // 24 hours
JSON.stringify({ statusCode: res.statusCode, body })
);
return originalJson(body);
};
next();
};
```
---
## Quick Reference: HTTP Methods
| Method | Idempotent | Safe | Cacheable | Request Body |
|--------|------------|------|-----------|--------------|
| GET | Yes | Yes | Yes | No |
| HEAD | Yes | Yes | Yes | No |
| POST | No | No | Conditional | Yes |
| PUT | Yes | No | No | Yes |
| PATCH | No | No | No | Yes |
| DELETE | Yes | No | No | Optional |
| OPTIONS | Yes | Yes | No | No |
FILE:references/backend_security_practices.md
# Backend Security Practices
Security patterns and OWASP Top 10 mitigations for Node.js/Express applications.
## Guide Index
1. [OWASP Top 10 Mitigations](#1-owasp-top-10-mitigations)
2. [Input Validation](#2-input-validation)
3. [SQL Injection Prevention](#3-sql-injection-prevention)
4. [XSS Prevention](#4-xss-prevention)
5. [Authentication Security](#5-authentication-security)
6. [Authorization Patterns](#6-authorization-patterns)
7. [Security Headers](#7-security-headers)
8. [Secrets Management](#8-secrets-management)
9. [Logging and Monitoring](#9-logging-and-monitoring)
---
## 1. OWASP Top 10 Mitigations
### A01: Broken Access Control
```typescript
// BAD: Direct object reference
app.get('/users/:id/profile', async (req, res) => {
const user = await db.users.findById(req.params.id);
res.json(user); // Anyone can access any user!
});
// GOOD: Verify ownership
app.get('/users/:id/profile', authenticate, async (req, res) => {
const userId = req.params.id;
// Verify user can only access their own data
if (req.user.id !== userId && !req.user.roles.includes('admin')) {
return res.status(403).json({ error: { code: 'FORBIDDEN' } });
}
const user = await db.users.findById(userId);
res.json(user);
});
```
### A02: Cryptographic Failures
```typescript
// BAD: Weak hashing
const hash = crypto.createHash('md5').update(password).digest('hex');
// GOOD: bcrypt with appropriate cost factor
import bcrypt from 'bcrypt';
const SALT_ROUNDS = 12; // Adjust based on hardware
async function hashPassword(password: string): Promise<string> {
return bcrypt.hash(password, SALT_ROUNDS);
}
async function verifyPassword(password: string, hash: string): Promise<boolean> {
return bcrypt.compare(password, hash);
}
```
### A03: Injection
```typescript
// BAD: String concatenation in SQL
const query = `SELECT * FROM users WHERE email = 'email'`;
// GOOD: Parameterized queries
const result = await db.query(
'SELECT * FROM users WHERE email = $1',
[email]
);
```
### A04: Insecure Design
```typescript
// BAD: No rate limiting on sensitive operations
app.post('/forgot-password', async (req, res) => {
await sendResetEmail(req.body.email);
res.json({ message: 'If email exists, reset link sent' });
});
// GOOD: Rate limit + consistent response time
import rateLimit from 'express-rate-limit';
const passwordResetLimiter = rateLimit({
windowMs: 15 * 60 * 1000,
max: 3, // 3 attempts per 15 minutes
skipSuccessfulRequests: false,
});
app.post('/forgot-password', passwordResetLimiter, async (req, res) => {
const startTime = Date.now();
try {
const user = await db.users.findByEmail(req.body.email);
if (user) {
await sendResetEmail(user.email);
}
} catch (err) {
logger.error(err);
}
// Consistent response time prevents timing attacks
const elapsed = Date.now() - startTime;
const minDelay = 500;
if (elapsed < minDelay) {
await sleep(minDelay - elapsed);
}
// Same response regardless of email existence
res.json({ message: 'If email exists, reset link sent' });
});
```
### A05: Security Misconfiguration
```typescript
// BAD: Detailed errors in production
app.use((err, req, res, next) => {
res.status(500).json({
error: err.message,
stack: err.stack, // Exposes internals!
});
});
// GOOD: Environment-aware error handling
app.use((err: Error, req: Request, res: Response, next: NextFunction) => {
const requestId = req.id;
// Always log full error internally
logger.error({ err, requestId }, 'Unhandled error');
// Return safe response
res.status(500).json({
error: {
code: 'INTERNAL_ERROR',
message: process.env.NODE_ENV === 'development'
? err.message
: 'An unexpected error occurred',
requestId,
},
});
});
```
### A06: Vulnerable Components
```bash
# Check for vulnerabilities
npm audit
# Fix automatically where possible
npm audit fix
# Check specific package
npm audit --package-lock-only
# Use Snyk for deeper analysis
npx snyk test
```
```typescript
// Automated dependency updates (package.json)
{
"scripts": {
"security:audit": "npm audit --audit-level=high",
"security:check": "snyk test",
"preinstall": "npm audit"
}
}
```
### A07: Authentication Failures
```typescript
// BAD: Weak session management
app.post('/login', async (req, res) => {
const user = await authenticate(req.body);
req.session.userId = user.id; // Session fixation risk
res.json({ success: true });
});
// GOOD: Regenerate session on authentication
app.post('/login', async (req, res) => {
const user = await authenticate(req.body);
// Regenerate session to prevent fixation
req.session.regenerate((err) => {
if (err) return next(err);
req.session.userId = user.id;
req.session.createdAt = Date.now();
req.session.save((err) => {
if (err) return next(err);
res.json({ success: true });
});
});
});
```
### A08: Software and Data Integrity Failures
```typescript
// Verify webhook signatures (e.g., Stripe)
import Stripe from 'stripe';
app.post('/webhooks/stripe',
express.raw({ type: 'application/json' }),
async (req, res) => {
const sig = req.headers['stripe-signature'] as string;
const endpointSecret = process.env.STRIPE_WEBHOOK_SECRET!;
let event: Stripe.Event;
try {
event = stripe.webhooks.constructEvent(
req.body,
sig,
endpointSecret
);
} catch (err) {
logger.warn({ err }, 'Webhook signature verification failed');
return res.status(400).json({ error: 'Invalid signature' });
}
// Process verified event
await handleStripeEvent(event);
res.json({ received: true });
}
);
```
### A09: Security Logging Failures
```typescript
// Comprehensive security logging
import pino from 'pino';
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
redact: ['req.headers.authorization', 'req.body.password'], // Redact sensitive
});
// Log security events
function logSecurityEvent(event: {
type: 'LOGIN_SUCCESS' | 'LOGIN_FAILURE' | 'ACCESS_DENIED' | 'SUSPICIOUS_ACTIVITY';
userId?: string;
ip: string;
userAgent: string;
details?: Record<string, unknown>;
}) {
logger.info({
security: true,
...event,
timestamp: new Date().toISOString(),
}, `Security event: event.type`);
}
// Usage
app.post('/login', async (req, res) => {
try {
const user = await authenticate(req.body);
logSecurityEvent({
type: 'LOGIN_SUCCESS',
userId: user.id,
ip: req.ip,
userAgent: req.headers['user-agent'] || '',
});
// ...
} catch (err) {
logSecurityEvent({
type: 'LOGIN_FAILURE',
ip: req.ip,
userAgent: req.headers['user-agent'] || '',
details: { email: req.body.email },
});
// ...
}
});
```
### A10: Server-Side Request Forgery (SSRF)
```typescript
// BAD: Unvalidated URL fetch
app.post('/fetch-url', async (req, res) => {
const response = await fetch(req.body.url); // SSRF vulnerability!
res.json({ data: await response.text() });
});
// GOOD: URL allowlist and validation
import { URL } from 'url';
const ALLOWED_HOSTS = ['api.example.com', 'cdn.example.com'];
function isAllowedUrl(urlString: string): boolean {
try {
const url = new URL(urlString);
// Block internal IPs
const blockedPatterns = [
/^localhost$/i,
/^127\./,
/^10\./,
/^172\.(1[6-9]|2[0-9]|3[0-1])\./,
/^192\.168\./,
/^0\./,
/^169\.254\./,
/^\[::1\]$/,
/^metadata\.google\.internal$/,
/^169\.254\.169\.254$/,
];
if (blockedPatterns.some(p => p.test(url.hostname))) {
return false;
}
// Only allow HTTPS
if (url.protocol !== 'https:') {
return false;
}
// Check allowlist
return ALLOWED_HOSTS.includes(url.hostname);
} catch {
return false;
}
}
app.post('/fetch-url', async (req, res) => {
const { url } = req.body;
if (!isAllowedUrl(url)) {
return res.status(400).json({ error: { code: 'INVALID_URL' } });
}
const response = await fetch(url, {
timeout: 5000,
follow: 0, // Don't follow redirects
});
res.json({ data: await response.text() });
});
```
---
## 2. Input Validation
### Schema Validation with Zod
```typescript
import { z } from 'zod';
// Define schemas
const CreateUserSchema = z.object({
email: z.string().email().max(255).toLowerCase(),
password: z.string()
.min(8, 'Password must be at least 8 characters')
.max(72, 'Password must be at most 72 characters') // bcrypt limit
.regex(/[A-Z]/, 'Password must contain uppercase letter')
.regex(/[a-z]/, 'Password must contain lowercase letter')
.regex(/[0-9]/, 'Password must contain number'),
name: z.string().min(1).max(100).trim(),
age: z.number().int().min(18).max(120).optional(),
});
const PaginationSchema = z.object({
limit: z.coerce.number().int().min(1).max(100).default(20),
offset: z.coerce.number().int().min(0).default(0),
sort: z.enum(['asc', 'desc']).default('desc'),
});
// Validation middleware
function validate<T>(schema: z.ZodSchema<T>) {
return (req: Request, res: Response, next: NextFunction) => {
const result = schema.safeParse(req.body);
if (!result.success) {
const details = result.error.errors.map(err => ({
field: err.path.join('.'),
code: err.code,
message: err.message,
}));
return res.status(400).json({
error: {
code: 'VALIDATION_ERROR',
message: 'Request validation failed',
details,
},
});
}
req.body = result.data;
next();
};
}
// Usage
app.post('/users', validate(CreateUserSchema), async (req, res) => {
// req.body is now typed and validated
const user = await userService.create(req.body);
res.status(201).json(user);
});
```
### Sanitization
```typescript
import DOMPurify from 'isomorphic-dompurify';
import xss from 'xss';
// HTML sanitization for rich text fields
function sanitizeHtml(dirty: string): string {
return DOMPurify.sanitize(dirty, {
ALLOWED_TAGS: ['b', 'i', 'em', 'strong', 'a', 'p', 'br'],
ALLOWED_ATTR: ['href'],
});
}
// Plain text sanitization (strip all HTML)
function sanitizePlainText(dirty: string): string {
return xss(dirty, {
whiteList: {},
stripIgnoreTag: true,
stripIgnoreTagBody: ['script'],
});
}
// File path sanitization
import path from 'path';
function sanitizePath(userPath: string, baseDir: string): string | null {
const resolved = path.resolve(baseDir, userPath);
// Prevent directory traversal
if (!resolved.startsWith(baseDir)) {
return null;
}
return resolved;
}
```
---
## 3. SQL Injection Prevention
### Parameterized Queries
```typescript
// BAD: String interpolation
const email = "'; DROP TABLE users; --";
db.query(`SELECT * FROM users WHERE email = 'email'`);
// GOOD: Parameterized query (pg)
const result = await db.query(
'SELECT * FROM users WHERE email = $1',
[email]
);
// GOOD: Parameterized query (mysql2)
const [rows] = await connection.execute(
'SELECT * FROM users WHERE email = ?',
[email]
);
```
### Query Builders
```typescript
// Using Knex.js
const users = await knex('users')
.where('email', email) // Automatically parameterized
.andWhere('status', 'active')
.select('id', 'name', 'email');
// Dynamic WHERE with safe column names
const ALLOWED_COLUMNS = ['name', 'email', 'created_at'] as const;
function buildUserQuery(filters: Record<string, string>) {
let query = knex('users').select('id', 'name', 'email');
for (const [column, value] of Object.entries(filters)) {
// Validate column name against allowlist
if (ALLOWED_COLUMNS.includes(column as any)) {
query = query.where(column, value);
}
}
return query;
}
```
### ORM Safety
```typescript
// Prisma (safe by default)
const user = await prisma.user.findUnique({
where: { email }, // Automatically escaped
});
// TypeORM (safe by default)
const user = await userRepository.findOne({
where: { email }, // Automatically escaped
});
// DANGER: Raw queries still require parameterization
// BAD
await prisma.$queryRawUnsafe(`SELECT * FROM users WHERE email = 'email'`);
// GOOD
await prisma.$queryRaw`SELECT * FROM users WHERE email = email`;
```
---
## 4. XSS Prevention
### Output Encoding
```typescript
// Server-side template rendering (EJS)
// In template: <%= userInput %> (escaped)
// NOT: <%- userInput %> (raw, dangerous)
// Manual HTML encoding
function escapeHtml(str: string): string {
return str
.replace(/&/g, '&')
.replace(/</g, '<')
.replace(/>/g, '>')
.replace(/"/g, '"')
.replace(/'/g, ''');
}
// JSON response (automatically safe in modern frameworks)
res.json({ message: userInput }); // JSON.stringify escapes by default
```
### Content Security Policy
```typescript
import helmet from 'helmet';
app.use(helmet.contentSecurityPolicy({
directives: {
defaultSrc: ["'self'"],
scriptSrc: ["'self'", "'strict-dynamic'"],
styleSrc: ["'self'", "'unsafe-inline'"], // Consider using nonces
imgSrc: ["'self'", "data:", "https:"],
fontSrc: ["'self'"],
objectSrc: ["'none'"],
frameAncestors: ["'none'"],
baseUri: ["'self'"],
formAction: ["'self'"],
upgradeInsecureRequests: [],
},
}));
```
### API Response Safety
```typescript
// Set correct Content-Type for JSON APIs
app.use((req, res, next) => {
res.setHeader('Content-Type', 'application/json; charset=utf-8');
res.setHeader('X-Content-Type-Options', 'nosniff');
next();
});
// Disable JSONP (if not needed)
// Don't implement callback parameter handling
// Safe JSON response
res.json({
data: sanitizedData,
// Never reflect raw user input
});
```
---
## 5. Authentication Security
### Password Storage
```typescript
import bcrypt from 'bcrypt';
import { randomBytes } from 'crypto';
const SALT_ROUNDS = 12;
async function hashPassword(password: string): Promise<string> {
return bcrypt.hash(password, SALT_ROUNDS);
}
async function verifyPassword(password: string, hash: string): Promise<boolean> {
return bcrypt.compare(password, hash);
}
// For password reset tokens
function generateSecureToken(): string {
return randomBytes(32).toString('hex');
}
// Token expiration (store in DB)
interface PasswordResetToken {
token: string; // Hashed
userId: string;
expiresAt: Date; // 1 hour from creation
}
```
### JWT Best Practices
```typescript
import jwt from 'jsonwebtoken';
// Use asymmetric keys in production
const PRIVATE_KEY = process.env.JWT_PRIVATE_KEY!;
const PUBLIC_KEY = process.env.JWT_PUBLIC_KEY!;
interface AccessTokenPayload {
sub: string; // User ID
email: string;
roles: string[];
iat: number;
exp: number;
}
function generateAccessToken(user: User): string {
const payload: Omit<AccessTokenPayload, 'iat' | 'exp'> = {
sub: user.id,
email: user.email,
roles: user.roles,
};
return jwt.sign(payload, PRIVATE_KEY, {
algorithm: 'RS256',
expiresIn: '15m',
issuer: 'api.example.com',
audience: 'example.com',
});
}
function verifyAccessToken(token: string): AccessTokenPayload {
return jwt.verify(token, PUBLIC_KEY, {
algorithms: ['RS256'],
issuer: 'api.example.com',
audience: 'example.com',
}) as AccessTokenPayload;
}
// Refresh tokens should be stored in DB and rotated
interface RefreshToken {
id: string;
token: string; // Hashed
userId: string;
expiresAt: Date;
family: string; // For rotation detection
isRevoked: boolean;
}
```
### Session Management
```typescript
import session from 'express-session';
import RedisStore from 'connect-redis';
import { createClient } from 'redis';
const redisClient = createClient({ url: process.env.REDIS_URL });
app.use(session({
store: new RedisStore({ client: redisClient }),
name: 'sessionId', // Don't use default 'connect.sid'
secret: process.env.SESSION_SECRET!,
resave: false,
saveUninitialized: false,
cookie: {
secure: process.env.NODE_ENV === 'production',
httpOnly: true,
sameSite: 'strict',
maxAge: 24 * 60 * 60 * 1000, // 24 hours
domain: process.env.COOKIE_DOMAIN,
},
}));
// Regenerate session on privilege change
async function elevateSession(req: Request): Promise<void> {
return new Promise((resolve, reject) => {
const userId = req.session.userId;
req.session.regenerate((err) => {
if (err) return reject(err);
req.session.userId = userId;
req.session.elevated = true;
req.session.elevatedAt = Date.now();
resolve();
});
});
}
```
---
## 6. Authorization Patterns
### Role-Based Access Control (RBAC)
```typescript
type Role = 'user' | 'moderator' | 'admin';
type Permission = 'read:users' | 'write:users' | 'delete:users' | 'read:admin';
const ROLE_PERMISSIONS: Record<Role, Permission[]> = {
user: ['read:users'],
moderator: ['read:users', 'write:users'],
admin: ['read:users', 'write:users', 'delete:users', 'read:admin'],
};
function hasPermission(userRoles: Role[], required: Permission): boolean {
return userRoles.some(role =>
ROLE_PERMISSIONS[role]?.includes(required)
);
}
// Middleware
function requirePermission(permission: Permission) {
return (req: Request, res: Response, next: NextFunction) => {
if (!hasPermission(req.user.roles, permission)) {
return res.status(403).json({
error: { code: 'FORBIDDEN', message: 'Insufficient permissions' },
});
}
next();
};
}
// Usage
app.delete('/users/:id',
authenticate,
requirePermission('delete:users'),
deleteUserHandler
);
```
### Attribute-Based Access Control (ABAC)
```typescript
interface AccessContext {
user: { id: string; roles: string[]; department: string };
resource: { ownerId: string; department: string; sensitivity: string };
action: 'read' | 'write' | 'delete';
environment: { time: Date; ip: string };
}
interface Policy {
name: string;
condition: (ctx: AccessContext) => boolean;
}
const policies: Policy[] = [
{
name: 'owner-full-access',
condition: (ctx) => ctx.resource.ownerId === ctx.user.id,
},
{
name: 'same-department-read',
condition: (ctx) =>
ctx.action === 'read' &&
ctx.resource.department === ctx.user.department,
},
{
name: 'admin-override',
condition: (ctx) => ctx.user.roles.includes('admin'),
},
{
name: 'no-sensitive-outside-hours',
condition: (ctx) => {
const hour = ctx.environment.time.getHours();
return ctx.resource.sensitivity !== 'high' || (hour >= 9 && hour <= 17);
},
},
];
function evaluateAccess(ctx: AccessContext): boolean {
return policies.some(policy => policy.condition(ctx));
}
```
---
## 7. Security Headers
### Complete Helmet Configuration
```typescript
import helmet from 'helmet';
app.use(helmet({
// Content Security Policy
contentSecurityPolicy: {
directives: {
defaultSrc: ["'self'"],
scriptSrc: ["'self'"],
styleSrc: ["'self'", "'unsafe-inline'"],
imgSrc: ["'self'", "data:", "https:"],
connectSrc: ["'self'", "https://api.example.com"],
fontSrc: ["'self'"],
objectSrc: ["'none'"],
mediaSrc: ["'none'"],
frameSrc: ["'none'"],
},
},
// Strict Transport Security
hsts: {
maxAge: 31536000,
includeSubDomains: true,
preload: true,
},
// Prevent clickjacking
frameguard: { action: 'deny' },
// Prevent MIME sniffing
noSniff: true,
// XSS filter (legacy browsers)
xssFilter: true,
// Hide X-Powered-By
hidePoweredBy: true,
// Referrer policy
referrerPolicy: { policy: 'strict-origin-when-cross-origin' },
// Cross-origin policies
crossOriginEmbedderPolicy: false, // Enable if using SharedArrayBuffer
crossOriginOpenerPolicy: { policy: 'same-origin' },
crossOriginResourcePolicy: { policy: 'same-origin' },
}));
// CORS configuration
import cors from 'cors';
app.use(cors({
origin: ['https://example.com', 'https://app.example.com'],
methods: ['GET', 'POST', 'PUT', 'DELETE', 'PATCH'],
allowedHeaders: ['Content-Type', 'Authorization'],
credentials: true,
maxAge: 86400, // 24 hours
}));
```
### Header Reference
| Header | Purpose | Value |
|--------|---------|-------|
| `Strict-Transport-Security` | Force HTTPS | `max-age=31536000; includeSubDomains; preload` |
| `Content-Security-Policy` | Prevent XSS | See above |
| `X-Content-Type-Options` | Prevent MIME sniffing | `nosniff` |
| `X-Frame-Options` | Prevent clickjacking | `DENY` |
| `Referrer-Policy` | Control referrer info | `strict-origin-when-cross-origin` |
| `Permissions-Policy` | Feature restrictions | `geolocation=(), microphone=()` |
---
## 8. Secrets Management
### Environment Variables
```typescript
// config/secrets.ts
import { z } from 'zod';
const SecretsSchema = z.object({
DATABASE_URL: z.string().url(),
JWT_SECRET: z.string().min(32),
JWT_PRIVATE_KEY: z.string(),
JWT_PUBLIC_KEY: z.string(),
REDIS_URL: z.string().url(),
STRIPE_SECRET_KEY: z.string().startsWith('sk_'),
STRIPE_WEBHOOK_SECRET: z.string().startsWith('whsec_'),
});
// Validate on startup
export const secrets = SecretsSchema.parse(process.env);
// NEVER log secrets
console.log('Config loaded:', {
database: secrets.DATABASE_URL.replace(/\/\/.*@/, '//***@'),
redis: 'configured',
stripe: 'configured',
});
```
### Secret Rotation
```typescript
// Support multiple keys during rotation
const JWT_SECRETS = [
process.env.JWT_SECRET_CURRENT!,
process.env.JWT_SECRET_PREVIOUS!, // Keep for grace period
].filter(Boolean);
function verifyTokenWithRotation(token: string): TokenPayload | null {
for (const secret of JWT_SECRETS) {
try {
return jwt.verify(token, secret) as TokenPayload;
} catch {
continue;
}
}
return null;
}
```
### Vault Integration
```typescript
import Vault from 'node-vault';
const vault = Vault({
endpoint: process.env.VAULT_ADDR,
token: process.env.VAULT_TOKEN,
});
async function getSecret(path: string): Promise<string> {
const result = await vault.read(`secret/data/path`);
return result.data.data.value;
}
// Cache secrets with TTL
const secretsCache = new Map<string, { value: string; expiresAt: number }>();
const CACHE_TTL = 5 * 60 * 1000; // 5 minutes
async function getCachedSecret(path: string): Promise<string> {
const cached = secretsCache.get(path);
if (cached && cached.expiresAt > Date.now()) {
return cached.value;
}
const value = await getSecret(path);
secretsCache.set(path, { value, expiresAt: Date.now() + CACHE_TTL });
return value;
}
```
---
## 9. Logging and Monitoring
### Security Event Logging
```typescript
import pino from 'pino';
const logger = pino({
level: 'info',
redact: {
paths: [
'req.headers.authorization',
'req.headers.cookie',
'req.body.password',
'req.body.token',
'*.password',
'*.secret',
'*.apiKey',
],
censor: '[REDACTED]',
},
});
// Security event types
type SecurityEventType =
| 'AUTH_SUCCESS'
| 'AUTH_FAILURE'
| 'AUTH_LOCKOUT'
| 'PASSWORD_CHANGED'
| 'PASSWORD_RESET_REQUEST'
| 'PERMISSION_DENIED'
| 'RATE_LIMIT_EXCEEDED'
| 'SUSPICIOUS_ACTIVITY'
| 'TOKEN_REVOKED';
interface SecurityEvent {
type: SecurityEventType;
userId?: string;
ip: string;
userAgent: string;
path: string;
details?: Record<string, unknown>;
}
function logSecurityEvent(event: SecurityEvent): void {
logger.info({
security: true,
...event,
timestamp: new Date().toISOString(),
}, `Security: event.type`);
}
```
### Request Logging
```typescript
import pinoHttp from 'pino-http';
app.use(pinoHttp({
logger,
genReqId: (req) => req.headers['x-request-id'] || crypto.randomUUID(),
serializers: {
req: (req) => ({
id: req.id,
method: req.method,
url: req.url,
remoteAddress: req.remoteAddress,
// Don't log headers by default (may contain sensitive data)
}),
res: (res) => ({
statusCode: res.statusCode,
}),
},
customLogLevel: (req, res, err) => {
if (res.statusCode >= 500 || err) return 'error';
if (res.statusCode >= 400) return 'warn';
return 'info';
},
}));
```
### Alerting Thresholds
| Metric | Warning | Critical |
|--------|---------|----------|
| Failed logins per IP (15 min) | > 5 | > 10 |
| Failed logins per account (1 hour) | > 3 | > 5 |
| 403 responses per IP (5 min) | > 10 | > 50 |
| 500 errors (5 min) | > 5 | > 20 |
| Request rate per IP (1 min) | > 100 | > 500 |
---
## Quick Reference: Security Checklist
### Authentication
- [ ] bcrypt with cost >= 12 for password hashing
- [ ] JWT with RS256, short expiry (15-30 min)
- [ ] Refresh token rotation with family detection
- [ ] Session regeneration on login
- [ ] Secure cookie flags (httpOnly, secure, sameSite)
### Input Validation
- [ ] Schema validation on all inputs (Zod)
- [ ] Parameterized queries (never string concat)
- [ ] File path sanitization
- [ ] Content-Type validation
### Headers
- [ ] Strict-Transport-Security
- [ ] Content-Security-Policy
- [ ] X-Content-Type-Options: nosniff
- [ ] X-Frame-Options: DENY
- [ ] CORS with specific origins
### Logging
- [ ] Redact sensitive fields
- [ ] Log security events
- [ ] Include request IDs
- [ ] Alert on anomalies
### Dependencies
- [ ] npm audit in CI
- [ ] Automated dependency updates
- [ ] Lock file committed
FILE:references/database_optimization_guide.md
# Database Optimization Guide
Practical strategies for PostgreSQL query optimization, indexing, and performance tuning.
## Guide Index
1. [Query Analysis with EXPLAIN](#1-query-analysis-with-explain)
2. [Indexing Strategies](#2-indexing-strategies)
3. [N+1 Query Problem](#3-n1-query-problem)
4. [Connection Pooling](#4-connection-pooling)
5. [Query Optimization Patterns](#5-query-optimization-patterns)
6. [Database Migrations](#6-database-migrations)
7. [Monitoring and Alerting](#7-monitoring-and-alerting)
---
## 1. Query Analysis with EXPLAIN
### Basic EXPLAIN Usage
```sql
-- Show query plan
EXPLAIN SELECT * FROM orders WHERE user_id = 123;
-- Show plan with actual execution times
EXPLAIN ANALYZE SELECT * FROM orders WHERE user_id = 123;
-- Show buffers and I/O statistics
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM orders WHERE user_id = 123;
```
### Reading EXPLAIN Output
```
QUERY PLAN
---------------------------------------------------------------------------
Index Scan using idx_orders_user_id on orders (cost=0.43..8.45 rows=10 width=120)
Index Cond: (user_id = 123)
Buffers: shared hit=3
Planning Time: 0.152 ms
Execution Time: 0.089 ms
```
**Key metrics:**
- `cost`: Estimated cost (startup..total)
- `rows`: Estimated row count
- `width`: Average row size in bytes
- `actual time`: Real execution time (with ANALYZE)
- `Buffers: shared hit`: Pages read from cache
### Scan Types (Best to Worst)
| Scan Type | Description | Performance |
|-----------|-------------|-------------|
| Index Only Scan | Data from index alone | Best |
| Index Scan | Index lookup + heap fetch | Good |
| Bitmap Index Scan | Multiple index conditions | Good |
| Index Scan + Filter | Index + row filtering | Okay |
| Seq Scan (small table) | Full table scan | Okay |
| Seq Scan (large table) | Full table scan | Bad |
| Nested Loop (large) | O(n*m) join | Very Bad |
### Warning Signs
```sql
-- BAD: Sequential scan on large table
Seq Scan on orders (cost=0.00..1854231.00 rows=50000000 width=120)
Filter: (status = 'pending')
Rows Removed by Filter: 49500000
-- BAD: Nested loop with high iterations
Nested Loop (cost=0.43..2847593.20 rows=12500000 width=240)
-> Seq Scan on users (cost=0.00..1250.00 rows=50000 width=120)
-> Index Scan on orders (cost=0.43..45.73 rows=250 width=120)
Index Cond: (orders.user_id = users.id)
```
---
## 2. Indexing Strategies
### Index Types
```sql
-- B-tree (default, most common)
CREATE INDEX idx_users_email ON users(email);
-- Hash (equality only, rarely better than B-tree)
CREATE INDEX idx_users_id_hash ON users USING hash(id);
-- GIN (arrays, JSONB, full-text search)
CREATE INDEX idx_products_tags ON products USING gin(tags);
CREATE INDEX idx_users_data ON users USING gin(metadata jsonb_path_ops);
-- GiST (geometric, range types, full-text)
CREATE INDEX idx_locations_point ON locations USING gist(coordinates);
```
### Composite Indexes
```sql
-- Order matters! Column with = first, then range/sort
CREATE INDEX idx_orders_user_status_date
ON orders(user_id, status, created_at DESC);
-- This index supports:
-- WHERE user_id = ?
-- WHERE user_id = ? AND status = ?
-- WHERE user_id = ? AND status = ? ORDER BY created_at DESC
-- WHERE user_id = ? ORDER BY created_at DESC
-- This index does NOT efficiently support:
-- WHERE status = ? (user_id not in query)
-- WHERE created_at > ? (leftmost column not in query)
```
### Partial Indexes
```sql
-- Index only active users (smaller, faster)
CREATE INDEX idx_users_active_email
ON users(email)
WHERE status = 'active';
-- Index only recent orders
CREATE INDEX idx_orders_recent
ON orders(created_at DESC)
WHERE created_at > CURRENT_DATE - INTERVAL '90 days';
-- Index only unprocessed items
CREATE INDEX idx_queue_pending
ON job_queue(priority DESC, created_at)
WHERE processed_at IS NULL;
```
### Covering Indexes (Index-Only Scans)
```sql
-- Include non-indexed columns to avoid heap lookup
CREATE INDEX idx_users_email_covering
ON users(email)
INCLUDE (name, created_at);
-- Query can be satisfied from index alone
SELECT name, created_at FROM users WHERE email = '[email protected]';
-- Result: Index Only Scan
```
### Index Maintenance
```sql
-- Check index usage
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
ORDER BY idx_scan ASC;
-- Find unused indexes (candidates for removal)
SELECT indexrelid::regclass as index,
relid::regclass as table,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE idx_scan = 0
AND indexrelid NOT IN (SELECT conindid FROM pg_constraint);
-- Rebuild bloated indexes
REINDEX INDEX CONCURRENTLY idx_orders_user_id;
```
---
## 3. N+1 Query Problem
### The Problem
```typescript
// BAD: N+1 queries
const users = await db.query('SELECT * FROM users LIMIT 100');
for (const user of users) {
// This runs 100 times!
const orders = await db.query(
'SELECT * FROM orders WHERE user_id = $1',
[user.id]
);
user.orders = orders;
}
// Total queries: 1 + 100 = 101
```
### Solution 1: JOIN
```typescript
// GOOD: Single query with JOIN
const usersWithOrders = await db.query(`
SELECT u.*, o.id as order_id, o.total, o.status
FROM users u
LEFT JOIN orders o ON o.user_id = u.id
LIMIT 100
`);
// Total queries: 1
```
### Solution 2: Batch Loading (DataLoader pattern)
```typescript
// GOOD: Two queries with batch loading
const users = await db.query('SELECT * FROM users LIMIT 100');
const userIds = users.map(u => u.id);
const orders = await db.query(
'SELECT * FROM orders WHERE user_id = ANY($1)',
[userIds]
);
// Group orders by user_id
const ordersByUser = groupBy(orders, 'user_id');
users.forEach(user => {
user.orders = ordersByUser[user.id] || [];
});
// Total queries: 2
```
### Solution 3: ORM Eager Loading
```typescript
// Prisma
const users = await prisma.user.findMany({
take: 100,
include: { orders: true }
});
// TypeORM
const users = await userRepository.find({
take: 100,
relations: ['orders']
});
// Sequelize
const users = await User.findAll({
limit: 100,
include: [{ model: Order }]
});
```
### Detecting N+1 in Production
```typescript
// Query logging middleware
let queryCount = 0;
const originalQuery = db.query;
db.query = async (...args) => {
queryCount++;
if (queryCount > 10) {
console.warn(`High query count: queryCount in single request`);
console.trace();
}
return originalQuery.apply(db, args);
};
```
---
## 4. Connection Pooling
### Why Pooling Matters
```
Without pooling:
Request → Create connection → Query → Close connection
(50-100ms overhead)
With pooling:
Request → Get connection from pool → Query → Return to pool
(0-1ms overhead)
```
### pg-pool Configuration
```typescript
import { Pool } from 'pg';
const pool = new Pool({
host: process.env.DB_HOST,
port: 5432,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
// Pool settings
min: 5, // Minimum connections
max: 20, // Maximum connections
idleTimeoutMillis: 30000, // Close idle connections after 30s
connectionTimeoutMillis: 5000, // Fail if can't connect in 5s
// Statement timeout (cancel long queries)
statement_timeout: 30000,
});
// Health check
pool.on('error', (err, client) => {
console.error('Unexpected pool error', err);
});
```
### Pool Sizing Formula
```
Optimal connections = (CPU cores * 2) + effective_spindle_count
For SSD with 4 cores:
connections = (4 * 2) + 1 = 9
For multiple app servers:
connections_per_server = total_connections / num_servers
```
### PgBouncer for High Scale
```ini
# pgbouncer.ini
[databases]
mydb = host=localhost port=5432 dbname=mydb
[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 20
reserve_pool_size = 5
```
---
## 5. Query Optimization Patterns
### Pagination Optimization
```sql
-- BAD: OFFSET is slow for large values
SELECT * FROM orders ORDER BY created_at DESC LIMIT 20 OFFSET 10000;
-- Must scan 10,020 rows, discard 10,000
-- GOOD: Cursor-based pagination
SELECT * FROM orders
WHERE created_at < '2024-01-15T10:00:00Z'
ORDER BY created_at DESC
LIMIT 20;
-- Only scans 20 rows
```
### Batch Updates
```sql
-- BAD: Individual updates
UPDATE orders SET status = 'shipped' WHERE id = 1;
UPDATE orders SET status = 'shipped' WHERE id = 2;
-- ...repeat 1000 times
-- GOOD: Batch update
UPDATE orders
SET status = 'shipped'
WHERE id = ANY(ARRAY[1, 2, 3, ...1000]);
-- GOOD: Update from values
UPDATE orders o
SET status = v.new_status
FROM (VALUES
(1, 'shipped'),
(2, 'delivered'),
(3, 'cancelled')
) AS v(id, new_status)
WHERE o.id = v.id;
```
### Avoiding SELECT *
```sql
-- BAD: Fetches all columns including large text/blob
SELECT * FROM articles WHERE published = true;
-- GOOD: Only fetch needed columns
SELECT id, title, summary, author_id, published_at
FROM articles
WHERE published = true;
```
### Using EXISTS vs IN
```sql
-- For checking existence, EXISTS is often faster
-- BAD
SELECT * FROM users
WHERE id IN (SELECT user_id FROM orders WHERE total > 1000);
-- GOOD (for large subquery results)
SELECT * FROM users u
WHERE EXISTS (
SELECT 1 FROM orders o
WHERE o.user_id = u.id AND o.total > 1000
);
```
### Materialized Views for Complex Aggregations
```sql
-- Create materialized view for expensive aggregations
CREATE MATERIALIZED VIEW daily_sales_summary AS
SELECT
date_trunc('day', created_at) as date,
product_id,
COUNT(*) as order_count,
SUM(quantity) as total_quantity,
SUM(total) as total_revenue
FROM orders
GROUP BY date_trunc('day', created_at), product_id;
-- Create index on materialized view
CREATE INDEX idx_daily_sales_date ON daily_sales_summary(date);
-- Refresh periodically
REFRESH MATERIALIZED VIEW CONCURRENTLY daily_sales_summary;
```
---
## 6. Database Migrations
### Migration Best Practices
```sql
-- Always include rollback
-- migrations/20240115_001_add_user_status.sql
-- UP
ALTER TABLE users ADD COLUMN status VARCHAR(20) DEFAULT 'active';
CREATE INDEX CONCURRENTLY idx_users_status ON users(status);
-- DOWN (in separate file or comment)
DROP INDEX CONCURRENTLY IF EXISTS idx_users_status;
ALTER TABLE users DROP COLUMN IF EXISTS status;
```
### Safe Column Addition
```sql
-- SAFE: Add nullable column (no table rewrite)
ALTER TABLE users ADD COLUMN phone VARCHAR(20);
-- SAFE: Add column with volatile default (PG 11+)
ALTER TABLE users ADD COLUMN created_at TIMESTAMP DEFAULT NOW();
-- UNSAFE: Add column with constant default (table rewrite before PG 11)
-- ALTER TABLE users ADD COLUMN score INTEGER DEFAULT 0;
-- SAFE alternative for constant default:
ALTER TABLE users ADD COLUMN score INTEGER;
UPDATE users SET score = 0 WHERE score IS NULL;
ALTER TABLE users ALTER COLUMN score SET DEFAULT 0;
ALTER TABLE users ALTER COLUMN score SET NOT NULL;
```
### Safe Index Creation
```sql
-- UNSAFE: Locks table
CREATE INDEX idx_orders_user ON orders(user_id);
-- SAFE: Non-blocking
CREATE INDEX CONCURRENTLY idx_orders_user ON orders(user_id);
-- Note: CONCURRENTLY cannot run in a transaction
```
### Safe Column Removal
```sql
-- Step 1: Stop writing to column (application change)
-- Step 2: Wait for all deployments
-- Step 3: Drop column
ALTER TABLE users DROP COLUMN IF EXISTS legacy_field;
```
---
## 7. Monitoring and Alerting
### Key Metrics to Monitor
```sql
-- Active connections
SELECT count(*) FROM pg_stat_activity WHERE state = 'active';
-- Connection by state
SELECT state, count(*)
FROM pg_stat_activity
GROUP BY state;
-- Long-running queries
SELECT
pid,
now() - pg_stat_activity.query_start AS duration,
query,
state
FROM pg_stat_activity
WHERE (now() - pg_stat_activity.query_start) > interval '5 minutes'
AND state != 'idle';
-- Table bloat
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as total_size,
pg_size_pretty(pg_relation_size(schemaname||'.'||tablename)) as table_size,
pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) as index_size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
LIMIT 10;
```
### pg_stat_statements for Query Analysis
```sql
-- Enable extension
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
-- Find slowest queries
SELECT
round(total_exec_time::numeric, 2) as total_time_ms,
calls,
round(mean_exec_time::numeric, 2) as avg_time_ms,
round((100 * total_exec_time / sum(total_exec_time) over())::numeric, 2) as percentage,
query
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;
-- Find most frequent queries
SELECT
calls,
round(total_exec_time::numeric, 2) as total_time_ms,
round(mean_exec_time::numeric, 2) as avg_time_ms,
query
FROM pg_stat_statements
ORDER BY calls DESC
LIMIT 10;
```
### Alert Thresholds
| Metric | Warning | Critical |
|--------|---------|----------|
| Connection usage | > 70% | > 90% |
| Query time P95 | > 500ms | > 2s |
| Replication lag | > 30s | > 5m |
| Disk usage | > 70% | > 85% |
| Cache hit ratio | < 95% | < 90% |
---
## Quick Reference: PostgreSQL Commands
```sql
-- Check table sizes
SELECT pg_size_pretty(pg_total_relation_size('orders'));
-- Check index sizes
SELECT pg_size_pretty(pg_indexes_size('orders'));
-- Kill a query
SELECT pg_cancel_backend(pid); -- Graceful
SELECT pg_terminate_backend(pid); -- Force
-- Check locks
SELECT * FROM pg_locks WHERE granted = false;
-- Vacuum analyze (update statistics)
VACUUM ANALYZE orders;
-- Check autovacuum status
SELECT * FROM pg_stat_user_tables WHERE relname = 'orders';
```
FILE:scripts/api_load_tester.py
#!/usr/bin/env python3
"""
API Load Tester
Performs HTTP load testing with configurable concurrency, measuring latency
percentiles, throughput, and error rates.
Usage:
python api_load_tester.py https://api.example.com/users --concurrency 50 --duration 30
python api_load_tester.py https://api.example.com/orders --method POST --body '{"item": 1}'
python api_load_tester.py https://api.example.com/v1/users https://api.example.com/v2/users --compare
"""
import os
import sys
import json
import argparse
import time
import statistics
import threading
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional, Tuple
from datetime import datetime
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from urllib.parse import urlparse
import ssl
@dataclass
class RequestResult:
"""Result of a single HTTP request."""
success: bool
status_code: int
latency_ms: float
error: Optional[str] = None
response_size: int = 0
@dataclass
class LoadTestResults:
"""Aggregated load test results."""
target_url: str
method: str
duration_seconds: float
concurrency: int
total_requests: int
successful_requests: int
failed_requests: int
requests_per_second: float
# Latency metrics (milliseconds)
latency_min: float
latency_max: float
latency_avg: float
latency_p50: float
latency_p90: float
latency_p95: float
latency_p99: float
latency_stddev: float
# Error breakdown
errors_by_type: Dict[str, int] = field(default_factory=dict)
# Transfer metrics
total_bytes_received: int = 0
throughput_mbps: float = 0.0
def success_rate(self) -> float:
"""Calculate success rate percentage."""
if self.total_requests == 0:
return 0.0
return (self.successful_requests / self.total_requests) * 100
def calculate_percentile(data: List[float], percentile: float) -> float:
"""Calculate percentile from sorted data."""
if not data:
return 0.0
k = (len(data) - 1) * (percentile / 100)
f = int(k)
c = f + 1 if f + 1 < len(data) else f
return data[f] + (data[c] - data[f]) * (k - f)
class HTTPClient:
"""HTTP client with configurable settings."""
def __init__(self, timeout: float = 30.0, headers: Optional[Dict[str, str]] = None,
verify_ssl: bool = True):
self.timeout = timeout
self.headers = headers or {}
self.verify_ssl = verify_ssl
# Create SSL context
if not verify_ssl:
self.ssl_context = ssl.create_default_context()
self.ssl_context.check_hostname = False
self.ssl_context.verify_mode = ssl.CERT_NONE
else:
self.ssl_context = None
def request(self, url: str, method: str = 'GET', body: Optional[bytes] = None) -> RequestResult:
"""Execute HTTP request and return result."""
start_time = time.perf_counter()
try:
request = Request(url, data=body, method=method)
# Add headers
for key, value in self.headers.items():
request.add_header(key, value)
# Add content-type for POST/PUT
if body and method in ['POST', 'PUT', 'PATCH']:
if 'Content-Type' not in self.headers:
request.add_header('Content-Type', 'application/json')
# Execute request
with urlopen(request, timeout=self.timeout, context=self.ssl_context) as response:
response_data = response.read()
elapsed = (time.perf_counter() - start_time) * 1000
return RequestResult(
success=True,
status_code=response.status,
latency_ms=elapsed,
response_size=len(response_data),
)
except HTTPError as e:
elapsed = (time.perf_counter() - start_time) * 1000
return RequestResult(
success=False,
status_code=e.code,
latency_ms=elapsed,
error=f"HTTP {e.code}: {e.reason}",
)
except URLError as e:
elapsed = (time.perf_counter() - start_time) * 1000
return RequestResult(
success=False,
status_code=0,
latency_ms=elapsed,
error=f"Connection error: {str(e.reason)}",
)
except TimeoutError:
elapsed = (time.perf_counter() - start_time) * 1000
return RequestResult(
success=False,
status_code=0,
latency_ms=elapsed,
error="Connection timeout",
)
except Exception as e:
elapsed = (time.perf_counter() - start_time) * 1000
return RequestResult(
success=False,
status_code=0,
latency_ms=elapsed,
error=str(e),
)
class LoadTester:
"""HTTP load testing engine."""
def __init__(self, url: str, method: str = 'GET', body: Optional[str] = None,
headers: Optional[Dict[str, str]] = None, concurrency: int = 10,
duration: float = 10.0, timeout: float = 30.0, verify_ssl: bool = True):
self.url = url
self.method = method.upper()
self.body = body.encode() if body else None
self.headers = headers or {}
self.concurrency = concurrency
self.duration = duration
self.timeout = timeout
self.verify_ssl = verify_ssl
self.results: List[RequestResult] = []
self.stop_event = threading.Event()
self.results_lock = threading.Lock()
def run(self) -> LoadTestResults:
"""Execute load test and return results."""
print(f"Load Testing: {self.url}")
print(f"Method: {self.method}")
print(f"Concurrency: {self.concurrency}")
print(f"Duration: {self.duration}s")
print("-" * 50)
self.results = []
self.stop_event.clear()
start_time = time.time()
# Start worker threads
with ThreadPoolExecutor(max_workers=self.concurrency) as executor:
futures = []
for _ in range(self.concurrency):
future = executor.submit(self._worker)
futures.append(future)
# Wait for duration
time.sleep(self.duration)
self.stop_event.set()
# Wait for workers to finish
for future in as_completed(futures):
try:
future.result()
except Exception as e:
print(f"Worker error: {e}")
elapsed_time = time.time() - start_time
return self._aggregate_results(elapsed_time)
def _worker(self):
"""Worker thread that continuously sends requests."""
client = HTTPClient(
timeout=self.timeout,
headers=self.headers,
verify_ssl=self.verify_ssl,
)
while not self.stop_event.is_set():
result = client.request(self.url, self.method, self.body)
with self.results_lock:
self.results.append(result)
def _aggregate_results(self, elapsed_time: float) -> LoadTestResults:
"""Aggregate individual results into summary."""
if not self.results:
return LoadTestResults(
target_url=self.url,
method=self.method,
duration_seconds=elapsed_time,
concurrency=self.concurrency,
total_requests=0,
successful_requests=0,
failed_requests=0,
requests_per_second=0,
latency_min=0,
latency_max=0,
latency_avg=0,
latency_p50=0,
latency_p90=0,
latency_p95=0,
latency_p99=0,
latency_stddev=0,
)
# Separate successful and failed
successful = [r for r in self.results if r.success]
failed = [r for r in self.results if not r.success]
# Latency calculations (from successful requests)
latencies = sorted([r.latency_ms for r in successful]) if successful else [0]
# Error breakdown
errors_by_type: Dict[str, int] = {}
for r in failed:
error_type = r.error or 'Unknown'
errors_by_type[error_type] = errors_by_type.get(error_type, 0) + 1
# Calculate throughput
total_bytes = sum(r.response_size for r in successful)
throughput_mbps = (total_bytes * 8) / (elapsed_time * 1_000_000) if elapsed_time > 0 else 0
return LoadTestResults(
target_url=self.url,
method=self.method,
duration_seconds=elapsed_time,
concurrency=self.concurrency,
total_requests=len(self.results),
successful_requests=len(successful),
failed_requests=len(failed),
requests_per_second=len(self.results) / elapsed_time if elapsed_time > 0 else 0,
latency_min=min(latencies),
latency_max=max(latencies),
latency_avg=statistics.mean(latencies) if latencies else 0,
latency_p50=calculate_percentile(latencies, 50),
latency_p90=calculate_percentile(latencies, 90),
latency_p95=calculate_percentile(latencies, 95),
latency_p99=calculate_percentile(latencies, 99),
latency_stddev=statistics.stdev(latencies) if len(latencies) > 1 else 0,
errors_by_type=errors_by_type,
total_bytes_received=total_bytes,
throughput_mbps=throughput_mbps,
)
def print_results(results: LoadTestResults, verbose: bool = False):
"""Print formatted load test results."""
print("\n" + "=" * 60)
print("LOAD TEST RESULTS")
print("=" * 60)
print(f"\nTarget: {results.target_url}")
print(f"Method: {results.method}")
print(f"Duration: {results.duration_seconds:.1f}s")
print(f"Concurrency: {results.concurrency}")
print(f"\nTHROUGHPUT:")
print(f" Total requests: {results.total_requests:,}")
print(f" Requests/sec: {results.requests_per_second:.1f}")
print(f" Successful: {results.successful_requests:,} ({results.success_rate():.1f}%)")
print(f" Failed: {results.failed_requests:,}")
print(f"\nLATENCY (ms):")
print(f" Min: {results.latency_min:.1f}")
print(f" Avg: {results.latency_avg:.1f}")
print(f" P50: {results.latency_p50:.1f}")
print(f" P90: {results.latency_p90:.1f}")
print(f" P95: {results.latency_p95:.1f}")
print(f" P99: {results.latency_p99:.1f}")
print(f" Max: {results.latency_max:.1f}")
print(f" StdDev: {results.latency_stddev:.1f}")
if results.errors_by_type:
print(f"\nERRORS:")
for error_type, count in sorted(results.errors_by_type.items(), key=lambda x: -x[1]):
print(f" {error_type}: {count}")
if verbose:
print(f"\nTRANSFER:")
print(f" Total bytes: {results.total_bytes_received:,}")
print(f" Throughput: {results.throughput_mbps:.2f} Mbps")
# Recommendations
print(f"\nRECOMMENDATIONS:")
if results.latency_p99 > 500:
print(f" Warning: P99 latency ({results.latency_p99:.0f}ms) exceeds 500ms")
print(f" Consider: Connection pooling, query optimization, caching")
if results.latency_p95 > 200:
print(f" Warning: P95 latency ({results.latency_p95:.0f}ms) exceeds 200ms target")
if results.success_rate() < 99.0:
print(f" Warning: Success rate ({results.success_rate():.1f}%) below 99%")
print(f" Check server capacity and error logs")
if results.latency_stddev > results.latency_avg:
print(f" Warning: High latency variance (stddev > avg)")
print(f" Indicates inconsistent performance")
if results.success_rate() >= 99.0 and results.latency_p95 <= 200:
print(f" Performance looks good for this load level")
print("=" * 60)
def compare_results(results1: LoadTestResults, results2: LoadTestResults):
"""Compare two load test results."""
print("\n" + "=" * 60)
print("COMPARISON RESULTS")
print("=" * 60)
print(f"\n{'Metric':<25} {'Endpoint 1':<15} {'Endpoint 2':<15} {'Diff':<15}")
print("-" * 70)
# Helper to format diff
def diff_str(v1: float, v2: float, lower_better: bool = True) -> str:
if v1 == 0:
return "N/A"
diff_pct = ((v2 - v1) / v1) * 100
symbol = "-" if (diff_pct < 0) == lower_better else "+"
color_good = diff_pct < 0 if lower_better else diff_pct > 0
return f"{symbol}{abs(diff_pct):.1f}%"
metrics = [
("Requests/sec", results1.requests_per_second, results2.requests_per_second, False),
("Success rate (%)", results1.success_rate(), results2.success_rate(), False),
("Latency Avg (ms)", results1.latency_avg, results2.latency_avg, True),
("Latency P50 (ms)", results1.latency_p50, results2.latency_p50, True),
("Latency P90 (ms)", results1.latency_p90, results2.latency_p90, True),
("Latency P95 (ms)", results1.latency_p95, results2.latency_p95, True),
("Latency P99 (ms)", results1.latency_p99, results2.latency_p99, True),
]
for name, v1, v2, lower_better in metrics:
print(f"{name:<25} {v1:<15.1f} {v2:<15.1f} {diff_str(v1, v2, lower_better):<15}")
print("-" * 70)
# Summary
print(f"\nEndpoint 1: {results1.target_url}")
print(f"Endpoint 2: {results2.target_url}")
# Determine winner
score1, score2 = 0, 0
if results1.requests_per_second > results2.requests_per_second:
score1 += 1
else:
score2 += 1
if results1.latency_p95 < results2.latency_p95:
score1 += 1
else:
score2 += 1
if results1.success_rate() > results2.success_rate():
score1 += 1
else:
score2 += 1
print(f"\nOverall: {'Endpoint 1' if score1 > score2 else 'Endpoint 2'} performs better")
print("=" * 60)
class APILoadTester:
"""Main load tester class with CLI integration."""
def __init__(self, urls: List[str], method: str = 'GET', body: Optional[str] = None,
headers: Optional[Dict[str, str]] = None, concurrency: int = 10,
duration: float = 10.0, timeout: float = 30.0, compare: bool = False,
verbose: bool = False, verify_ssl: bool = True):
self.urls = urls
self.method = method
self.body = body
self.headers = headers or {}
self.concurrency = concurrency
self.duration = duration
self.timeout = timeout
self.compare = compare
self.verbose = verbose
self.verify_ssl = verify_ssl
def run(self) -> Dict:
"""Execute load test(s) and return results."""
results = []
for url in self.urls:
tester = LoadTester(
url=url,
method=self.method,
body=self.body,
headers=self.headers,
concurrency=self.concurrency,
duration=self.duration,
timeout=self.timeout,
verify_ssl=self.verify_ssl,
)
result = tester.run()
results.append(result)
if not self.compare:
print_results(result, self.verbose)
if self.compare and len(results) >= 2:
compare_results(results[0], results[1])
return {
'status': 'success',
'results': [asdict(r) for r in results],
}
def parse_headers(header_args: Optional[List[str]]) -> Dict[str, str]:
"""Parse header arguments into dictionary."""
headers = {}
if header_args:
for h in header_args:
if ':' in h:
key, value = h.split(':', 1)
headers[key.strip()] = value.strip()
return headers
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description='HTTP load testing tool',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s https://api.example.com/users --concurrency 50 --duration 30
%(prog)s https://api.example.com/orders --method POST --body '{"item": 1}'
%(prog)s https://api.example.com/v1 https://api.example.com/v2 --compare
%(prog)s https://api.example.com/health --header "Authorization: Bearer token"
'''
)
parser.add_argument(
'urls',
nargs='+',
help='URL(s) to test'
)
parser.add_argument(
'--method', '-m',
default='GET',
choices=['GET', 'POST', 'PUT', 'PATCH', 'DELETE'],
help='HTTP method (default: GET)'
)
parser.add_argument(
'--body', '-b',
help='Request body (JSON string)'
)
parser.add_argument(
'--header', '-H',
action='append',
dest='headers',
help='HTTP header (format: "Name: Value")'
)
parser.add_argument(
'--concurrency', '-c',
type=int,
default=10,
help='Number of concurrent requests (default: 10)'
)
parser.add_argument(
'--duration', '-d',
type=float,
default=10.0,
help='Test duration in seconds (default: 10)'
)
parser.add_argument(
'--timeout', '-t',
type=float,
default=30.0,
help='Request timeout in seconds (default: 30)'
)
parser.add_argument(
'--compare',
action='store_true',
help='Compare two endpoints (requires two URLs)'
)
parser.add_argument(
'--no-verify-ssl',
action='store_true',
help='Disable SSL certificate verification'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output results as JSON'
)
parser.add_argument(
'--output', '-o',
help='Output file path for results'
)
args = parser.parse_args()
# Validate
if args.compare and len(args.urls) < 2:
print("Error: --compare requires two URLs", file=sys.stderr)
sys.exit(1)
# Parse headers
headers = parse_headers(args.headers)
try:
tester = APILoadTester(
urls=args.urls,
method=args.method,
body=args.body,
headers=headers,
concurrency=args.concurrency,
duration=args.duration,
timeout=args.timeout,
compare=args.compare,
verbose=args.verbose,
verify_ssl=not args.no_verify_ssl,
)
results = tester.run()
if args.json:
output = json.dumps(results, indent=2)
if args.output:
with open(args.output, 'w') as f:
f.write(output)
print(f"\nResults written to: {args.output}")
else:
print(output)
elif args.output:
with open(args.output, 'w') as f:
json.dump(results, f, indent=2)
print(f"\nResults written to: {args.output}")
except KeyboardInterrupt:
print("\nTest interrupted by user")
sys.exit(1)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
FILE:scripts/api_scaffolder.py
#!/usr/bin/env python3
"""
API Scaffolder
Generates Express.js route handlers, validation middleware, and TypeScript types
from OpenAPI specifications (YAML/JSON).
Usage:
python api_scaffolder.py openapi.yaml --output src/routes/
python api_scaffolder.py openapi.json --framework fastify --output src/
python api_scaffolder.py spec.yaml --types-only --output src/types/
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
def load_yaml_as_json(content: str) -> Dict:
"""Parse YAML content without PyYAML dependency (basic subset)."""
lines = content.split('\n')
result = {}
stack = [(result, -1)]
current_key = None
in_array = False
array_indent = -1
for line in lines:
stripped = line.lstrip()
if not stripped or stripped.startswith('#'):
continue
indent = len(line) - len(stripped)
# Pop stack until we find the right level
while len(stack) > 1 and stack[-1][1] >= indent:
stack.pop()
current_obj = stack[-1][0]
if stripped.startswith('- '):
# Array item
value = stripped[2:].strip()
if isinstance(current_obj, list):
if ':' in value:
# Object in array
key, val = value.split(':', 1)
new_obj = {key.strip(): val.strip().strip('"').strip("'")}
current_obj.append(new_obj)
stack.append((new_obj, indent))
else:
current_obj.append(value.strip('"').strip("'"))
elif ':' in stripped:
key, value = stripped.split(':', 1)
key = key.strip()
value = value.strip()
if value == '':
# Check next line for array or object
new_obj = {}
current_obj[key] = new_obj
stack.append((new_obj, indent))
elif value.startswith('[') and value.endswith(']'):
# Inline array
items = value[1:-1].split(',')
current_obj[key] = [i.strip().strip('"').strip("'") for i in items if i.strip()]
else:
# Simple value
value = value.strip('"').strip("'")
if value.lower() == 'true':
value = True
elif value.lower() == 'false':
value = False
elif value.isdigit():
value = int(value)
current_obj[key] = value
return result
def load_spec(spec_path: Path) -> Dict:
"""Load OpenAPI spec from YAML or JSON file."""
content = spec_path.read_text()
if spec_path.suffix in ['.yaml', '.yml']:
try:
import yaml
return yaml.safe_load(content)
except ImportError:
# Fallback to basic YAML parser
return load_yaml_as_json(content)
else:
return json.loads(content)
def openapi_type_to_ts(schema: Dict) -> str:
"""Convert OpenAPI schema type to TypeScript type."""
if not schema:
return 'unknown'
if '$ref' in schema:
ref = schema['$ref']
return ref.split('/')[-1]
type_map = {
'string': 'string',
'integer': 'number',
'number': 'number',
'boolean': 'boolean',
'object': 'Record<string, unknown>',
'array': 'unknown[]',
}
schema_type = schema.get('type', 'unknown')
if schema_type == 'array':
items = schema.get('items', {})
item_type = openapi_type_to_ts(items)
return f'{item_type}[]'
if schema_type == 'object':
properties = schema.get('properties', {})
if properties:
props = []
required = schema.get('required', [])
for name, prop in properties.items():
ts_type = openapi_type_to_ts(prop)
optional = '?' if name not in required else ''
props.append(f' {name}{optional}: {ts_type};')
return '{\n' + '\n'.join(props) + '\n}'
return 'Record<string, unknown>'
if 'enum' in schema:
values = ' | '.join(f"'{v}'" for v in schema['enum'])
return values
return type_map.get(schema_type, 'unknown')
def generate_zod_schema(schema: Dict, name: str) -> str:
"""Generate Zod validation schema from OpenAPI schema."""
if not schema:
return f'export const {name}Schema = z.unknown();'
def schema_to_zod(s: Dict) -> str:
if '$ref' in s:
ref_name = s['$ref'].split('/')[-1]
return f'{ref_name}Schema'
s_type = s.get('type', 'unknown')
if s_type == 'string':
zod = 'z.string()'
if 'minLength' in s:
zod += f'.min({s["minLength"]})'
if 'maxLength' in s:
zod += f'.max({s["maxLength"]})'
if 'pattern' in s:
zod += f'.regex(/{s["pattern"]}/)'
if s.get('format') == 'email':
zod += '.email()'
if s.get('format') == 'uuid':
zod += '.uuid()'
if 'enum' in s:
values = ', '.join(f"'{v}'" for v in s['enum'])
return f'z.enum([{values}])'
return zod
if s_type == 'integer':
zod = 'z.number().int()'
if 'minimum' in s:
zod += f'.min({s["minimum"]})'
if 'maximum' in s:
zod += f'.max({s["maximum"]})'
return zod
if s_type == 'number':
zod = 'z.number()'
if 'minimum' in s:
zod += f'.min({s["minimum"]})'
if 'maximum' in s:
zod += f'.max({s["maximum"]})'
return zod
if s_type == 'boolean':
return 'z.boolean()'
if s_type == 'array':
items_zod = schema_to_zod(s.get('items', {}))
return f'z.array({items_zod})'
if s_type == 'object':
properties = s.get('properties', {})
required = s.get('required', [])
if not properties:
return 'z.record(z.unknown())'
props = []
for prop_name, prop_schema in properties.items():
prop_zod = schema_to_zod(prop_schema)
if prop_name not in required:
prop_zod += '.optional()'
props.append(f' {prop_name}: {prop_zod},')
return 'z.object({\n' + '\n'.join(props) + '\n})'
return 'z.unknown()'
return f'export const {name}Schema = {schema_to_zod(schema)};'
def to_camel_case(s: str) -> str:
"""Convert string to camelCase."""
s = re.sub(r'[^a-zA-Z0-9]', ' ', s)
words = s.split()
if not words:
return s
return words[0].lower() + ''.join(w.capitalize() for w in words[1:])
def to_pascal_case(s: str) -> str:
"""Convert string to PascalCase."""
s = re.sub(r'[^a-zA-Z0-9]', ' ', s)
return ''.join(w.capitalize() for w in s.split())
def extract_path_params(path: str) -> List[str]:
"""Extract path parameters from OpenAPI path."""
return re.findall(r'\{(\w+)\}', path)
def openapi_path_to_express(path: str) -> str:
"""Convert OpenAPI path to Express path format."""
return re.sub(r'\{(\w+)\}', r':\1', path)
class APIScaffolder:
"""Generate Express.js routes from OpenAPI specification."""
SUPPORTED_FRAMEWORKS = ['express', 'fastify', 'koa']
def __init__(self, spec_path: str, output_dir: str, framework: str = 'express',
types_only: bool = False, verbose: bool = False):
self.spec_path = Path(spec_path)
self.output_dir = Path(output_dir)
self.framework = framework
self.types_only = types_only
self.verbose = verbose
self.spec: Dict = {}
self.generated_files: List[str] = []
def run(self) -> Dict:
"""Execute scaffolding process."""
print(f"API Scaffolder - {self.framework.capitalize()}")
print(f"Spec: {self.spec_path}")
print(f"Output: {self.output_dir}")
print("-" * 50)
self.validate()
self.load_spec()
self.ensure_output_dir()
if self.types_only:
self.generate_types()
else:
self.generate_types()
self.generate_validators()
self.generate_routes()
self.generate_index()
return {
'status': 'success',
'spec': str(self.spec_path),
'output': str(self.output_dir),
'framework': self.framework,
'generated_files': self.generated_files,
'routes_count': len(self.get_operations()),
'types_count': len(self.get_schemas()),
}
def validate(self):
"""Validate inputs."""
if not self.spec_path.exists():
raise FileNotFoundError(f"Spec file not found: {self.spec_path}")
if self.framework not in self.SUPPORTED_FRAMEWORKS:
raise ValueError(f"Unsupported framework: {self.framework}")
def load_spec(self):
"""Load and parse OpenAPI specification."""
self.spec = load_spec(self.spec_path)
if self.verbose:
title = self.spec.get('info', {}).get('title', 'Unknown')
version = self.spec.get('info', {}).get('version', '0.0.0')
print(f"Loaded: {title} v{version}")
def ensure_output_dir(self):
"""Create output directory if needed."""
self.output_dir.mkdir(parents=True, exist_ok=True)
def get_schemas(self) -> Dict:
"""Get component schemas from spec."""
return self.spec.get('components', {}).get('schemas', {})
def get_operations(self) -> List[Dict]:
"""Extract all operations from spec."""
operations = []
paths = self.spec.get('paths', {})
for path, methods in paths.items():
if not isinstance(methods, dict):
continue
for method, details in methods.items():
if method.lower() not in ['get', 'post', 'put', 'patch', 'delete']:
continue
if not isinstance(details, dict):
continue
op_id = details.get('operationId', f'{method}_{path}'.replace('/', '_'))
operations.append({
'path': path,
'method': method.lower(),
'operation_id': op_id,
'summary': details.get('summary', ''),
'parameters': details.get('parameters', []),
'request_body': details.get('requestBody', {}),
'responses': details.get('responses', {}),
'tags': details.get('tags', ['default']),
})
return operations
def generate_types(self):
"""Generate TypeScript type definitions."""
schemas = self.get_schemas()
lines = [
'// Auto-generated TypeScript types',
f'// Generated from: {self.spec_path.name}',
f'// Date: {datetime.now().isoformat()}',
'',
]
for name, schema in schemas.items():
ts_type = openapi_type_to_ts(schema)
if ts_type.startswith('{'):
lines.append(f'export interface {name} {ts_type}')
else:
lines.append(f'export type {name} = {ts_type};')
lines.append('')
# Generate request/response types from operations
for op in self.get_operations():
op_name = to_pascal_case(op['operation_id'])
# Request body type
req_body = op.get('request_body', {})
if req_body:
content = req_body.get('content', {})
json_content = content.get('application/json', {})
schema = json_content.get('schema', {})
if schema and '$ref' not in schema:
ts_type = openapi_type_to_ts(schema)
lines.append(f'export interface {op_name}Request {ts_type}')
lines.append('')
# Response type (200 response)
responses = op.get('responses', {})
success_resp = responses.get('200', responses.get('201', {}))
if success_resp:
content = success_resp.get('content', {})
json_content = content.get('application/json', {})
schema = json_content.get('schema', {})
if schema and '$ref' not in schema:
ts_type = openapi_type_to_ts(schema)
lines.append(f'export interface {op_name}Response {ts_type}')
lines.append('')
types_file = self.output_dir / 'types.ts'
types_file.write_text('\n'.join(lines))
self.generated_files.append(str(types_file))
print(f" Generated: {types_file}")
def generate_validators(self):
"""Generate Zod validation schemas."""
schemas = self.get_schemas()
lines = [
"import { z } from 'zod';",
'',
'// Auto-generated Zod validation schemas',
f'// Generated from: {self.spec_path.name}',
'',
]
for name, schema in schemas.items():
zod_schema = generate_zod_schema(schema, name)
lines.append(zod_schema)
lines.append(f'export type {name} = z.infer<typeof {name}Schema>;')
lines.append('')
# Generate validation middleware
lines.extend([
'// Validation middleware factory',
'import { Request, Response, NextFunction } from "express";',
'',
'export function validate<T>(schema: z.ZodSchema<T>) {',
' return (req: Request, res: Response, next: NextFunction) => {',
' const result = schema.safeParse(req.body);',
' if (!result.success) {',
' return res.status(400).json({',
' error: {',
' code: "VALIDATION_ERROR",',
' message: "Request validation failed",',
' details: result.error.errors.map(e => ({',
' field: e.path.join("."),',
' message: e.message,',
' })),',
' },',
' });',
' }',
' req.body = result.data;',
' next();',
' };',
'}',
])
validators_file = self.output_dir / 'validators.ts'
validators_file.write_text('\n'.join(lines))
self.generated_files.append(str(validators_file))
print(f" Generated: {validators_file}")
def generate_routes(self):
"""Generate route handlers."""
operations = self.get_operations()
# Group by tag
routes_by_tag: Dict[str, List[Dict]] = {}
for op in operations:
tag = op['tags'][0] if op['tags'] else 'default'
if tag not in routes_by_tag:
routes_by_tag[tag] = []
routes_by_tag[tag].append(op)
# Generate a route file per tag
for tag, ops in routes_by_tag.items():
self.generate_route_file(tag, ops)
def generate_route_file(self, tag: str, operations: List[Dict]):
"""Generate a single route file."""
tag_name = to_camel_case(tag)
lines = [
"import { Router, Request, Response, NextFunction } from 'express';",
"import { validate } from './validators';",
"import * as schemas from './validators';",
'',
f'const router = Router();',
'',
]
for op in operations:
method = op['method']
path = openapi_path_to_express(op['path'])
handler_name = to_camel_case(op['operation_id'])
summary = op.get('summary', '')
# Check if has request body
req_body = op.get('request_body', {})
has_body = bool(req_body.get('content', {}).get('application/json'))
# Find schema reference
schema_ref = None
if has_body:
content = req_body.get('content', {}).get('application/json', {})
schema = content.get('schema', {})
if '$ref' in schema:
schema_ref = schema['$ref'].split('/')[-1]
lines.append(f'/**')
if summary:
lines.append(f' * {summary}')
lines.append(f' * {method.upper()} {op["path"]}')
lines.append(f' */')
middleware = ''
if schema_ref:
middleware = f'validate(schemas.{schema_ref}Schema), '
lines.append(f"router.{method}('{path}', {middleware}async (req: Request, res: Response, next: NextFunction) => {{")
lines.append(' try {')
# Extract path params
path_params = extract_path_params(op['path'])
if path_params:
lines.append(f" const {{ {', '.join(path_params)} }} = req.params;")
lines.append('')
lines.append(f' // TODO: Implement {handler_name}')
lines.append('')
# Default response based on method
if method == 'post':
lines.append(" res.status(201).json({ message: 'Created' });")
elif method == 'delete':
lines.append(" res.status(204).send();")
else:
lines.append(" res.json({ message: 'OK' });")
lines.append(' } catch (err) {')
lines.append(' next(err);')
lines.append(' }')
lines.append('});')
lines.append('')
lines.append(f'export default router;')
route_file = self.output_dir / f'{tag_name}.routes.ts'
route_file.write_text('\n'.join(lines))
self.generated_files.append(str(route_file))
print(f" Generated: {route_file} ({len(operations)} handlers)")
def generate_index(self):
"""Generate index file that combines all routes."""
operations = self.get_operations()
# Get unique tags
tags = set()
for op in operations:
tag = op['tags'][0] if op['tags'] else 'default'
tags.add(tag)
lines = [
"import { Router } from 'express';",
'',
]
for tag in sorted(tags):
tag_name = to_camel_case(tag)
lines.append(f"import {tag_name}Routes from './{tag_name}.routes';")
lines.extend([
'',
'const router = Router();',
'',
])
for tag in sorted(tags):
tag_name = to_camel_case(tag)
# Use tag as base path
base_path = '/' + tag.lower().replace(' ', '-')
lines.append(f"router.use('{base_path}', {tag_name}Routes);")
lines.extend([
'',
'export default router;',
])
index_file = self.output_dir / 'index.ts'
index_file.write_text('\n'.join(lines))
self.generated_files.append(str(index_file))
print(f" Generated: {index_file}")
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description='Generate Express.js routes from OpenAPI specification',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s openapi.yaml --output src/routes/
%(prog)s spec.json --framework fastify --output src/api/
%(prog)s openapi.yaml --types-only --output src/types/
'''
)
parser.add_argument(
'spec',
help='Path to OpenAPI specification (YAML or JSON)'
)
parser.add_argument(
'--output', '-o',
default='./generated',
help='Output directory (default: ./generated)'
)
parser.add_argument(
'--framework', '-f',
choices=['express', 'fastify', 'koa'],
default='express',
help='Target framework (default: express)'
)
parser.add_argument(
'--types-only',
action='store_true',
help='Generate only TypeScript types'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output results as JSON'
)
args = parser.parse_args()
try:
scaffolder = APIScaffolder(
spec_path=args.spec,
output_dir=args.output,
framework=args.framework,
types_only=args.types_only,
verbose=args.verbose,
)
results = scaffolder.run()
print("-" * 50)
print(f"Generated {results['routes_count']} route handlers")
print(f"Generated {results['types_count']} type definitions")
print(f"Output: {results['output']}")
if args.json:
print(json.dumps(results, indent=2))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
FILE:scripts/database_migration_tool.py
#!/usr/bin/env python3
"""
Database Migration Tool
Analyzes SQL schema files, detects potential issues, suggests indexes,
and generates migration scripts with rollback support.
Usage:
python database_migration_tool.py schema.sql --analyze
python database_migration_tool.py old.sql --compare new.sql --output migrations/
python database_migration_tool.py schema.sql --suggest-indexes
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from datetime import datetime
from dataclasses import dataclass, field, asdict
@dataclass
class Column:
"""Database column definition."""
name: str
data_type: str
nullable: bool = True
default: Optional[str] = None
primary_key: bool = False
unique: bool = False
references: Optional[str] = None
@dataclass
class Index:
"""Database index definition."""
name: str
table: str
columns: List[str]
unique: bool = False
partial: Optional[str] = None
@dataclass
class Table:
"""Database table definition."""
name: str
columns: Dict[str, Column] = field(default_factory=dict)
indexes: List[Index] = field(default_factory=list)
primary_key: List[str] = field(default_factory=list)
foreign_keys: List[Dict] = field(default_factory=list)
@dataclass
class Issue:
"""Schema issue or recommendation."""
severity: str # 'error', 'warning', 'info'
category: str # 'index', 'naming', 'type', 'constraint'
table: str
message: str
suggestion: Optional[str] = None
class SQLParser:
"""Parse SQL DDL statements."""
# Common patterns
CREATE_TABLE_PATTERN = re.compile(
r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?["`]?(\w+)["`]?\s*\((.*?)\)\s*;',
re.IGNORECASE | re.DOTALL
)
CREATE_INDEX_PATTERN = re.compile(
r'CREATE\s+(UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?["`]?(\w+)["`]?\s+'
r'ON\s+["`]?(\w+)["`]?\s*\(([^)]+)\)(?:\s+WHERE\s+(.+?))?;',
re.IGNORECASE | re.DOTALL
)
COLUMN_PATTERN = re.compile(
r'["`]?(\w+)["`]?\s+' # Column name
r'(\w+(?:\s*\([^)]+\))?)' # Data type
r'([^,]*)', # Constraints
re.IGNORECASE
)
FK_PATTERN = re.compile(
r'FOREIGN\s+KEY\s*\(["`]?(\w+)["`]?\)\s+'
r'REFERENCES\s+["`]?(\w+)["`]?\s*\(["`]?(\w+)["`]?\)',
re.IGNORECASE
)
def parse(self, sql: str) -> Dict[str, Table]:
"""Parse SQL and return table definitions."""
tables = {}
# Parse CREATE TABLE statements
for match in self.CREATE_TABLE_PATTERN.finditer(sql):
table_name = match.group(1)
body = match.group(2)
table = self._parse_table_body(table_name, body)
tables[table_name] = table
# Parse CREATE INDEX statements
for match in self.CREATE_INDEX_PATTERN.finditer(sql):
unique = bool(match.group(1))
index_name = match.group(2)
table_name = match.group(3)
columns = [c.strip().strip('"`') for c in match.group(4).split(',')]
where_clause = match.group(5)
index = Index(
name=index_name,
table=table_name,
columns=columns,
unique=unique,
partial=where_clause.strip() if where_clause else None
)
if table_name in tables:
tables[table_name].indexes.append(index)
return tables
def _parse_table_body(self, table_name: str, body: str) -> Table:
"""Parse table body (columns, constraints)."""
table = Table(name=table_name)
# Split by comma, but respect parentheses
parts = self._split_by_comma(body)
for part in parts:
part = part.strip()
# Skip empty parts
if not part:
continue
# Check for PRIMARY KEY constraint
if part.upper().startswith('PRIMARY KEY'):
pk_match = re.search(r'PRIMARY\s+KEY\s*\(([^)]+)\)', part, re.IGNORECASE)
if pk_match:
cols = [c.strip().strip('"`') for c in pk_match.group(1).split(',')]
table.primary_key = cols
# Check for FOREIGN KEY constraint
elif part.upper().startswith('FOREIGN KEY'):
fk_match = self.FK_PATTERN.search(part)
if fk_match:
table.foreign_keys.append({
'column': fk_match.group(1),
'ref_table': fk_match.group(2),
'ref_column': fk_match.group(3),
})
# Check for CONSTRAINT
elif part.upper().startswith('CONSTRAINT'):
# Handle named constraints
if 'PRIMARY KEY' in part.upper():
pk_match = re.search(r'PRIMARY\s+KEY\s*\(([^)]+)\)', part, re.IGNORECASE)
if pk_match:
cols = [c.strip().strip('"`') for c in pk_match.group(1).split(',')]
table.primary_key = cols
elif 'FOREIGN KEY' in part.upper():
fk_match = self.FK_PATTERN.search(part)
if fk_match:
table.foreign_keys.append({
'column': fk_match.group(1),
'ref_table': fk_match.group(2),
'ref_column': fk_match.group(3),
})
# Regular column definition
else:
col_match = self.COLUMN_PATTERN.match(part)
if col_match:
col_name = col_match.group(1)
col_type = col_match.group(2)
constraints = col_match.group(3).upper() if col_match.group(3) else ''
column = Column(
name=col_name,
data_type=col_type.upper(),
nullable='NOT NULL' not in constraints,
primary_key='PRIMARY KEY' in constraints,
unique='UNIQUE' in constraints,
)
# Extract default value
default_match = re.search(r'DEFAULT\s+(\S+)', constraints, re.IGNORECASE)
if default_match:
column.default = default_match.group(1)
# Extract references
ref_match = re.search(
r'REFERENCES\s+["`]?(\w+)["`]?\s*\(["`]?(\w+)["`]?\)',
constraints,
re.IGNORECASE
)
if ref_match:
column.references = f"{ref_match.group(1)}({ref_match.group(2)})"
table.foreign_keys.append({
'column': col_name,
'ref_table': ref_match.group(1),
'ref_column': ref_match.group(2),
})
if column.primary_key and col_name not in table.primary_key:
table.primary_key.append(col_name)
table.columns[col_name] = column
return table
def _split_by_comma(self, s: str) -> List[str]:
"""Split string by comma, respecting parentheses."""
parts = []
current = []
depth = 0
for char in s:
if char == '(':
depth += 1
elif char == ')':
depth -= 1
elif char == ',' and depth == 0:
parts.append(''.join(current))
current = []
continue
current.append(char)
if current:
parts.append(''.join(current))
return parts
class SchemaAnalyzer:
"""Analyze database schema for issues and optimizations."""
# Columns that typically need indexes (foreign keys)
FK_COLUMN_PATTERNS = ['_id', 'Id', '_ID']
# Columns that typically need indexes for filtering
FILTER_COLUMN_PATTERNS = ['status', 'state', 'type', 'category', 'active', 'enabled', 'deleted']
# Columns that typically need indexes for sorting/ordering
SORT_COLUMN_PATTERNS = ['created_at', 'updated_at', 'date', 'timestamp', 'order', 'position']
def __init__(self, tables: Dict[str, Table]):
self.tables = tables
self.issues: List[Issue] = []
def analyze(self) -> List[Issue]:
"""Run all analysis checks."""
self.issues = []
for table_name, table in self.tables.items():
self._check_naming_conventions(table)
self._check_primary_key(table)
self._check_foreign_key_indexes(table)
self._check_common_filter_columns(table)
self._check_timestamp_columns(table)
self._check_data_types(table)
return self.issues
def _check_naming_conventions(self, table: Table):
"""Check table and column naming conventions."""
# Table name should be lowercase
if table.name != table.name.lower():
self.issues.append(Issue(
severity='warning',
category='naming',
table=table.name,
message=f"Table name '{table.name}' should be lowercase",
suggestion=f"Rename to '{table.name.lower()}'"
))
# Table name should be plural (basic check)
if not table.name.endswith('s') and not table.name.endswith('es'):
self.issues.append(Issue(
severity='info',
category='naming',
table=table.name,
message=f"Table name '{table.name}' should typically be plural",
))
for col_name, col in table.columns.items():
# Column names should be lowercase with underscores
if col_name != col_name.lower():
self.issues.append(Issue(
severity='warning',
category='naming',
table=table.name,
message=f"Column '{col_name}' should use snake_case",
suggestion=f"Rename to '{self._to_snake_case(col_name)}'"
))
def _check_primary_key(self, table: Table):
"""Check for missing primary key."""
if not table.primary_key:
self.issues.append(Issue(
severity='error',
category='constraint',
table=table.name,
message=f"Table '{table.name}' has no primary key",
suggestion="Add a primary key column (e.g., 'id SERIAL PRIMARY KEY')"
))
def _check_foreign_key_indexes(self, table: Table):
"""Check that foreign key columns have indexes."""
indexed_columns = set()
for index in table.indexes:
indexed_columns.update(index.columns)
# Primary key columns are implicitly indexed
indexed_columns.update(table.primary_key)
for fk in table.foreign_keys:
fk_col = fk['column']
if fk_col not in indexed_columns:
self.issues.append(Issue(
severity='warning',
category='index',
table=table.name,
message=f"Foreign key column '{fk_col}' is not indexed",
suggestion=f"CREATE INDEX idx_{table.name}_{fk_col} ON {table.name}({fk_col});"
))
# Also check columns that look like foreign keys but aren't declared
for col_name in table.columns:
if any(col_name.endswith(pattern) for pattern in self.FK_COLUMN_PATTERNS):
if col_name not in indexed_columns:
# Check if it's actually a declared FK
is_declared_fk = any(fk['column'] == col_name for fk in table.foreign_keys)
if not is_declared_fk:
self.issues.append(Issue(
severity='info',
category='index',
table=table.name,
message=f"Column '{col_name}' looks like a foreign key but has no index",
suggestion=f"CREATE INDEX idx_{table.name}_{col_name} ON {table.name}({col_name});"
))
def _check_common_filter_columns(self, table: Table):
"""Check for indexes on commonly filtered columns."""
indexed_columns = set()
for index in table.indexes:
indexed_columns.update(index.columns)
indexed_columns.update(table.primary_key)
for col_name in table.columns:
col_lower = col_name.lower()
if any(pattern in col_lower for pattern in self.FILTER_COLUMN_PATTERNS):
if col_name not in indexed_columns:
self.issues.append(Issue(
severity='info',
category='index',
table=table.name,
message=f"Column '{col_name}' is commonly used for filtering but has no index",
suggestion=f"CREATE INDEX idx_{table.name}_{col_name} ON {table.name}({col_name});"
))
def _check_timestamp_columns(self, table: Table):
"""Check for indexes on timestamp columns used for sorting."""
has_created_at = 'created_at' in table.columns
has_updated_at = 'updated_at' in table.columns
if not has_created_at:
self.issues.append(Issue(
severity='info',
category='convention',
table=table.name,
message=f"Table '{table.name}' has no 'created_at' column",
suggestion="Consider adding: created_at TIMESTAMP DEFAULT NOW()"
))
if not has_updated_at:
self.issues.append(Issue(
severity='info',
category='convention',
table=table.name,
message=f"Table '{table.name}' has no 'updated_at' column",
suggestion="Consider adding: updated_at TIMESTAMP DEFAULT NOW()"
))
def _check_data_types(self, table: Table):
"""Check for potential data type issues."""
for col_name, col in table.columns.items():
dtype = col.data_type.upper()
# Check for VARCHAR without length
if 'VARCHAR' in dtype and '(' not in dtype:
self.issues.append(Issue(
severity='warning',
category='type',
table=table.name,
message=f"Column '{col_name}' uses VARCHAR without length",
suggestion="Specify a maximum length, e.g., VARCHAR(255)"
))
# Check for FLOAT/DOUBLE for monetary values
if 'FLOAT' in dtype or 'DOUBLE' in dtype:
if 'price' in col_name.lower() or 'amount' in col_name.lower() or 'total' in col_name.lower():
self.issues.append(Issue(
severity='warning',
category='type',
table=table.name,
message=f"Column '{col_name}' uses floating point for monetary value",
suggestion="Use DECIMAL or NUMERIC for monetary values"
))
# Check for TEXT columns that might benefit from length limits
if dtype == 'TEXT':
if 'email' in col_name.lower() or 'url' in col_name.lower():
self.issues.append(Issue(
severity='info',
category='type',
table=table.name,
message=f"Column '{col_name}' uses TEXT but might benefit from VARCHAR",
suggestion=f"Consider VARCHAR(255) for {col_name}"
))
def _to_snake_case(self, name: str) -> str:
"""Convert name to snake_case."""
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
class MigrationGenerator:
"""Generate migration scripts from schema differences."""
def __init__(self, old_tables: Dict[str, Table], new_tables: Dict[str, Table]):
self.old_tables = old_tables
self.new_tables = new_tables
def generate(self) -> Tuple[str, str]:
"""Generate UP and DOWN migration scripts."""
up_statements = []
down_statements = []
# Find new tables
for table_name, table in self.new_tables.items():
if table_name not in self.old_tables:
up_statements.append(self._generate_create_table(table))
down_statements.append(f"DROP TABLE IF EXISTS {table_name};")
# Find removed tables
for table_name, table in self.old_tables.items():
if table_name not in self.new_tables:
up_statements.append(f"DROP TABLE IF EXISTS {table_name};")
down_statements.append(self._generate_create_table(table))
# Find modified tables
for table_name in set(self.old_tables.keys()) & set(self.new_tables.keys()):
old_table = self.old_tables[table_name]
new_table = self.new_tables[table_name]
up, down = self._compare_tables(old_table, new_table)
up_statements.extend(up)
down_statements.extend(down)
up_sql = '\n\n'.join(up_statements) if up_statements else '-- No changes'
down_sql = '\n\n'.join(down_statements) if down_statements else '-- No changes'
return up_sql, down_sql
def _generate_create_table(self, table: Table) -> str:
"""Generate CREATE TABLE statement."""
lines = [f"CREATE TABLE {table.name} ("]
col_defs = []
for col_name, col in table.columns.items():
col_def = f" {col_name} {col.data_type}"
if not col.nullable:
col_def += " NOT NULL"
if col.default:
col_def += f" DEFAULT {col.default}"
if col.primary_key and len(table.primary_key) == 1:
col_def += " PRIMARY KEY"
if col.unique:
col_def += " UNIQUE"
col_defs.append(col_def)
# Add composite primary key
if len(table.primary_key) > 1:
pk_cols = ', '.join(table.primary_key)
col_defs.append(f" PRIMARY KEY ({pk_cols})")
# Add foreign keys
for fk in table.foreign_keys:
col_defs.append(
f" FOREIGN KEY ({fk['column']}) REFERENCES {fk['ref_table']}({fk['ref_column']})"
)
lines.append(',\n'.join(col_defs))
lines.append(");")
return '\n'.join(lines)
def _compare_tables(self, old: Table, new: Table) -> Tuple[List[str], List[str]]:
"""Compare two tables and generate ALTER statements."""
up = []
down = []
# New columns
for col_name, col in new.columns.items():
if col_name not in old.columns:
up.append(f"ALTER TABLE {new.name} ADD COLUMN {col_name} {col.data_type}"
+ (" NOT NULL" if not col.nullable else "")
+ (f" DEFAULT {col.default}" if col.default else "") + ";")
down.append(f"ALTER TABLE {new.name} DROP COLUMN IF EXISTS {col_name};")
# Removed columns
for col_name, col in old.columns.items():
if col_name not in new.columns:
up.append(f"ALTER TABLE {old.name} DROP COLUMN IF EXISTS {col_name};")
down.append(f"ALTER TABLE {old.name} ADD COLUMN {col_name} {col.data_type}"
+ (" NOT NULL" if not col.nullable else "")
+ (f" DEFAULT {col.default}" if col.default else "") + ";")
# Modified columns (type changes)
for col_name in set(old.columns.keys()) & set(new.columns.keys()):
old_col = old.columns[col_name]
new_col = new.columns[col_name]
if old_col.data_type != new_col.data_type:
up.append(f"ALTER TABLE {new.name} ALTER COLUMN {col_name} TYPE {new_col.data_type};")
down.append(f"ALTER TABLE {old.name} ALTER COLUMN {col_name} TYPE {old_col.data_type};")
# New indexes
old_index_names = {idx.name for idx in old.indexes}
for idx in new.indexes:
if idx.name not in old_index_names:
unique = "UNIQUE " if idx.unique else ""
cols = ', '.join(idx.columns)
where = f" WHERE {idx.partial}" if idx.partial else ""
up.append(f"CREATE {unique}INDEX CONCURRENTLY {idx.name} ON {idx.table}({cols}){where};")
down.append(f"DROP INDEX IF EXISTS {idx.name};")
# Removed indexes
new_index_names = {idx.name for idx in new.indexes}
for idx in old.indexes:
if idx.name not in new_index_names:
unique = "UNIQUE " if idx.unique else ""
cols = ', '.join(idx.columns)
where = f" WHERE {idx.partial}" if idx.partial else ""
up.append(f"DROP INDEX IF EXISTS {idx.name};")
down.append(f"CREATE {unique}INDEX {idx.name} ON {idx.table}({cols}){where};")
return up, down
class DatabaseMigrationTool:
"""Main tool for database migration analysis."""
def __init__(self, schema_path: str, compare_path: Optional[str] = None,
output_dir: Optional[str] = None, verbose: bool = False):
self.schema_path = Path(schema_path)
self.compare_path = Path(compare_path) if compare_path else None
self.output_dir = Path(output_dir) if output_dir else None
self.verbose = verbose
self.parser = SQLParser()
def run(self, mode: str = 'analyze') -> Dict:
"""Execute the tool in specified mode."""
print(f"Database Migration Tool")
print(f"Schema: {self.schema_path}")
print("-" * 50)
if not self.schema_path.exists():
raise FileNotFoundError(f"Schema file not found: {self.schema_path}")
schema_sql = self.schema_path.read_text()
tables = self.parser.parse(schema_sql)
if self.verbose:
print(f"Parsed {len(tables)} tables")
if mode == 'analyze':
return self._analyze(tables)
elif mode == 'compare':
return self._compare(tables)
elif mode == 'suggest-indexes':
return self._suggest_indexes(tables)
else:
raise ValueError(f"Unknown mode: {mode}")
def _analyze(self, tables: Dict[str, Table]) -> Dict:
"""Analyze schema for issues."""
analyzer = SchemaAnalyzer(tables)
issues = analyzer.analyze()
# Group by severity
errors = [i for i in issues if i.severity == 'error']
warnings = [i for i in issues if i.severity == 'warning']
infos = [i for i in issues if i.severity == 'info']
print(f"\nAnalysis Results:")
print(f" Tables: {len(tables)}")
print(f" Errors: {len(errors)}")
print(f" Warnings: {len(warnings)}")
print(f" Suggestions: {len(infos)}")
if errors:
print(f"\nERRORS:")
for issue in errors:
print(f" [{issue.table}] {issue.message}")
if issue.suggestion:
print(f" Suggestion: {issue.suggestion}")
if warnings:
print(f"\nWARNINGS:")
for issue in warnings:
print(f" [{issue.table}] {issue.message}")
if issue.suggestion:
print(f" Suggestion: {issue.suggestion}")
if self.verbose and infos:
print(f"\nSUGGESTIONS:")
for issue in infos:
print(f" [{issue.table}] {issue.message}")
if issue.suggestion:
print(f" {issue.suggestion}")
return {
'status': 'success',
'tables_count': len(tables),
'issues': {
'errors': len(errors),
'warnings': len(warnings),
'suggestions': len(infos),
},
'issues_detail': [asdict(i) for i in issues],
}
def _compare(self, old_tables: Dict[str, Table]) -> Dict:
"""Compare two schemas and generate migration."""
if not self.compare_path:
raise ValueError("Compare path required for compare mode")
if not self.compare_path.exists():
raise FileNotFoundError(f"Compare file not found: {self.compare_path}")
new_sql = self.compare_path.read_text()
new_tables = self.parser.parse(new_sql)
generator = MigrationGenerator(old_tables, new_tables)
up_sql, down_sql = generator.generate()
print(f"\nComparing schemas:")
print(f" Old: {self.schema_path}")
print(f" New: {self.compare_path}")
# Calculate changes
added_tables = set(new_tables.keys()) - set(old_tables.keys())
removed_tables = set(old_tables.keys()) - set(new_tables.keys())
print(f"\nChanges detected:")
print(f" Added tables: {len(added_tables)}")
print(f" Removed tables: {len(removed_tables)}")
if self.output_dir:
self.output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
up_file = self.output_dir / f"{timestamp}_migration.sql"
down_file = self.output_dir / f"{timestamp}_migration_rollback.sql"
up_file.write_text(f"-- Migration: {self.schema_path} -> {self.compare_path}\n"
f"-- Generated: {datetime.now().isoformat()}\n\n"
f"BEGIN;\n\n{up_sql}\n\nCOMMIT;\n")
down_file.write_text(f"-- Rollback for migration {timestamp}\n"
f"-- Generated: {datetime.now().isoformat()}\n\n"
f"BEGIN;\n\n{down_sql}\n\nCOMMIT;\n")
print(f"\nGenerated files:")
print(f" Migration: {up_file}")
print(f" Rollback: {down_file}")
else:
print(f"\n--- UP MIGRATION ---")
print(up_sql)
print(f"\n--- DOWN MIGRATION ---")
print(down_sql)
return {
'status': 'success',
'added_tables': list(added_tables),
'removed_tables': list(removed_tables),
'up_sql': up_sql,
'down_sql': down_sql,
}
def _suggest_indexes(self, tables: Dict[str, Table]) -> Dict:
"""Generate index suggestions."""
suggestions = []
for table_name, table in tables.items():
# Get existing indexed columns
indexed = set()
for idx in table.indexes:
indexed.update(idx.columns)
indexed.update(table.primary_key)
# Suggest indexes for foreign keys
for fk in table.foreign_keys:
if fk['column'] not in indexed:
suggestions.append({
'table': table_name,
'column': fk['column'],
'reason': 'Foreign key',
'sql': f"CREATE INDEX idx_{table_name}_{fk['column']} ON {table_name}({fk['column']});"
})
# Suggest indexes for common patterns
for col_name in table.columns:
if col_name in indexed:
continue
col_lower = col_name.lower()
# Foreign key pattern
if col_name.endswith('_id') and col_name not in indexed:
suggestions.append({
'table': table_name,
'column': col_name,
'reason': 'Likely foreign key',
'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name});"
})
# Status/type columns
elif col_lower in ['status', 'state', 'type', 'category']:
suggestions.append({
'table': table_name,
'column': col_name,
'reason': 'Common filter column',
'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name});"
})
# Timestamp columns
elif col_lower in ['created_at', 'updated_at']:
suggestions.append({
'table': table_name,
'column': col_name,
'reason': 'Common sort column',
'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name} DESC);"
})
print(f"\nIndex Suggestions ({len(suggestions)} found):")
for s in suggestions:
print(f"\n [{s['table']}.{s['column']}] {s['reason']}")
print(f" {s['sql']}")
if self.output_dir:
self.output_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = self.output_dir / f"{timestamp}_add_indexes.sql"
lines = [
f"-- Suggested indexes",
f"-- Generated: {datetime.now().isoformat()}",
"",
]
for s in suggestions:
lines.append(f"-- {s['table']}.{s['column']}: {s['reason']}")
lines.append(s['sql'])
lines.append("")
output_file.write_text('\n'.join(lines))
print(f"\nWritten to: {output_file}")
return {
'status': 'success',
'suggestions_count': len(suggestions),
'suggestions': suggestions,
}
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description='Analyze SQL schemas and generate migrations',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s schema.sql --analyze
%(prog)s old.sql --compare new.sql --output migrations/
%(prog)s schema.sql --suggest-indexes --output migrations/
'''
)
parser.add_argument(
'schema',
help='Path to SQL schema file'
)
parser.add_argument(
'--analyze',
action='store_true',
help='Analyze schema for issues and optimizations'
)
parser.add_argument(
'--compare',
metavar='FILE',
help='Compare with another schema file and generate migration'
)
parser.add_argument(
'--suggest-indexes',
action='store_true',
help='Generate index suggestions'
)
parser.add_argument(
'--output', '-o',
help='Output directory for generated files'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output results as JSON'
)
args = parser.parse_args()
# Determine mode
if args.compare:
mode = 'compare'
elif args.suggest_indexes:
mode = 'suggest-indexes'
else:
mode = 'analyze'
try:
tool = DatabaseMigrationTool(
schema_path=args.schema,
compare_path=args.compare,
output_dir=args.output,
verbose=args.verbose,
)
results = tool.run(mode=mode)
if args.json:
print(json.dumps(results, indent=2))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()