home / skills / eyadsibai / ltk / cqrs

This skill helps you implement CQRS by separating read and write models and optimizing queries for scalable architectures.

npx playbooks add skill eyadsibai/ltk --skill cqrs

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
5.0 KB
---
name: cqrs
description: Use when implementing CQRS pattern, separating read and write models, optimizing query performance, or asking about "CQRS", "Command Query Responsibility Segregation", "read model", "write model", "command bus", "query bus"
version: 1.0.0
---

# CQRS Implementation

Command Query Responsibility Segregation for scalable architectures.

## Architecture

```
              ┌────────────┴────────────┐
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │  Commands   │          │   Queries   │
       │    API      │          │    API      │
       └──────┬──────┘          └──────┬──────┘
              │                         │
              ▼                         ▼
       ┌─────────────┐          ┌─────────────┐
       │   Write     │─────────►│    Read     │
       │   Model     │  Events  │   Model     │
       └─────────────┘          └─────────────┘
```

## Command Infrastructure

```python
@dataclass
class Command:
    command_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.utcnow)

@dataclass
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict

class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        pass

class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type, handler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        return await handler.handle(command)
```

## Query Infrastructure

```python
@dataclass
class GetOrderById(Query):
    order_id: str

@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    created_at: datetime

class GetOrderByIdHandler(QueryHandler[GetOrderById, OrderView]):
    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        row = await self.read_db.fetchrow(
            "SELECT * FROM order_views WHERE order_id = $1",
            query.order_id
        )
        return OrderView(**dict(row)) if row else None
```

## FastAPI Integration

```python
# Command endpoints (POST, PUT, DELETE)
@app.post("/orders")
async def create_order(request: CreateOrderRequest, command_bus: CommandBus = Depends()):
    command = CreateOrder(
        customer_id=request.customer_id,
        items=request.items
    )
    order_id = await command_bus.dispatch(command)
    return {"order_id": order_id}

# Query endpoints (GET)
@app.get("/orders/{order_id}")
async def get_order(order_id: str, query_bus: QueryBus = Depends()):
    query = GetOrderById(order_id=order_id)
    return await query_bus.dispatch(query)
```

## Read Model Synchronization

```python
class ReadModelSynchronizer:
    async def sync_projection(self, projection: Projection):
        checkpoint = await self._get_checkpoint(projection.name)
        events = await self.event_store.read_all(from_position=checkpoint)

        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)
            await self._save_checkpoint(projection.name, event.position)

    async def rebuild_projection(self, projection_name: str):
        projection = self.projections[projection_name]
        await projection.clear()
        await self._save_checkpoint(projection_name, 0)
        # Rebuild from beginning
```

## Eventual Consistency

```python
async def query_after_command(self, query, expected_version, stream_id, timeout=5.0):
    """Read-your-writes consistency."""
    start = time.time()
    while time.time() - start < timeout:
        projection_version = await self._get_projection_version(stream_id)
        if projection_version >= expected_version:
            return await self.execute_query(query)
        await asyncio.sleep(0.1)

    return {"data": await self.execute_query(query), "_warning": "May be stale"}
```

## Best Practices

1. **Separate command and query models** - Different optimization needs
2. **Accept eventual consistency** - Define acceptable lag
3. **Validate in command handlers** - Before state change
4. **Denormalize read models** - Optimize for queries
5. **Version your events** - For schema evolution

## When to Use CQRS

**Good for:**

- Different read/write scaling needs
- Complex query requirements
- Event-sourced systems
- High-performance reporting

**Not for:**

- Simple CRUD applications
- No scaling requirements
- Small data sets

Overview

This skill helps implement the CQRS (Command Query Responsibility Segregation) pattern in Python services, with examples for command/query buses, read-model synchronization, and FastAPI integration. It focuses on separating write and read models, handling eventual consistency, and scaling query performance. Use it to design systems where read and write concerns have different optimization needs.

How this skill works

The skill provides patterns and lightweight components: command and query dataclasses, handler interfaces, and buses that dispatch to registered handlers. It shows read-model projection synchronization driven by event streams, a rebuild path for projections, and a read-your-writes helper to wait for projection catch-up. Examples include FastAPI endpoints wired to command/query buses and a simple strategy for denormalized read views.

When to use it

  • When read and write workloads have different scaling or latency profiles
  • When queries require denormalized or optimized read models
  • When adopting event sourcing or wanting immutable event logs
  • When complex reporting or analytics need tailored views
  • When you can accept eventual consistency between writes and reads

Best practices

  • Keep command handlers authoritative for state changes and validations
  • Design read models denormalized for query performance, not normalization
  • Version events and projections to support schema evolution
  • Define acceptable consistency windows and use read-after-write waits sparingly
  • Provide projection rebuild and checkpointing for safe recovery

Example use cases

  • Order processing: commands for create/update, denormalized order_views for fast lookups
  • Inventory: write model enforces business rules, read model supports catalog queries and search
  • Reporting: projections aggregate events into precomputed analytical views
  • Microservices: command bus for state transitions, query bus for read-side APIs

FAQ

How do I handle consistency after a command?

Use the read-your-writes helper to poll projection version until it reaches the expected event version, or return the command result with a clear eventual-consistency contract to clients.

When is CQRS overkill?

Avoid CQRS for simple CRUD apps, small teams, or systems without divergent read/write scaling needs; added complexity and operational overhead may outweigh benefits.