home / skills / yonatangross / orchestkit / outbox-pattern

outbox-pattern skill

/plugins/ork/skills/outbox-pattern

This skill helps you implement reliable event publishing with the transactional outbox pattern, ensuring atomic writes and exactly-once semantics.

npx playbooks add skill yonatangross/orchestkit --skill outbox-pattern

Review the files below or copy the command above to add this skill to your agents.

Files (7)
SKILL.md
13.7 KB
---
name: outbox-pattern
description: Transactional outbox pattern for reliable event publishing. Use when implementing atomic writes with event delivery, ensuring exactly-once semantics, or building event-driven microservices.
context: fork
agent: event-driven-architect
version: 2.0.0
tags: [event-driven, outbox, transactions, reliability, microservices, cdc, idempotency, 2026]
allowed-tools: [Read, Write, Grep, Glob, Bash]
author: OrchestKit
user-invocable: false
---

# Outbox Pattern (2026)

Ensure atomic state changes and event publishing by writing both to a database transaction, then publishing asynchronously.

## Overview

- Ensuring database writes and event publishing are atomic
- Building reliable event-driven microservices
- Implementing exactly-once message delivery semantics
- Avoiding dual-write problems (DB + message broker)
- Decoupling domain logic from message infrastructure
- High-throughput systems needing CDC-based publishing

## Quick Reference

### Outbox Table Schema

```sql
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    idempotency_key VARCHAR(255) UNIQUE,  -- For consumer deduplication
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,
    retry_count INT DEFAULT 0,
    last_error TEXT
);

-- Index for polling unpublished messages
CREATE INDEX idx_outbox_unpublished ON outbox(created_at)
    WHERE published_at IS NULL;

-- Index for aggregate ordering
CREATE INDEX idx_outbox_aggregate ON outbox(aggregate_id, created_at);

-- Index for idempotency key lookups
CREATE INDEX idx_outbox_idempotency ON outbox(idempotency_key)
    WHERE idempotency_key IS NOT NULL;
```

### SQLAlchemy Model

```python
from sqlalchemy.dialects.postgresql import UUID, JSONB
import hashlib

class OutboxMessage(Base):
    __tablename__ = "outbox"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    aggregate_type = Column(String(100), nullable=False)
    aggregate_id = Column(UUID(as_uuid=True), nullable=False, index=True)
    event_type = Column(String(100), nullable=False)
    payload = Column(JSONB, nullable=False)
    idempotency_key = Column(String(255), unique=True, nullable=True)
    created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
    published_at = Column(DateTime, nullable=True)
    retry_count = Column(Integer, default=0)
    last_error = Column(Text, nullable=True)

    @staticmethod
    def generate_idempotency_key(aggregate_id: str, event_type: str, payload: dict) -> str:
        """Generate deterministic idempotency key for deduplication."""
        content = f"{aggregate_id}:{event_type}:{json.dumps(payload, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:32]
```

### Write to Outbox in Transaction

```python
from sqlalchemy.ext.asyncio import AsyncSession

class OrderService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_order(self, order_data: OrderCreate) -> Order:
        """Create order AND outbox message in single transaction."""
        # Create the order
        order = Order(**order_data.model_dump())
        self.db.add(order)

        # Create outbox message in SAME transaction
        outbox_msg = OutboxMessage(
            aggregate_type="Order",
            aggregate_id=order.id,
            event_type="OrderCreated",
            payload={
                "order_id": str(order.id),
                "customer_id": str(order.customer_id),
                "total": order.total,
            },
            idempotency_key=OutboxMessage.generate_idempotency_key(
                str(order.id), "OrderCreated", {"total": order.total}
            ),
        )
        self.db.add(outbox_msg)

        await self.db.flush()  # Both written atomically
        return order
```

### Polling Publisher

```python
class OutboxPublisher:
    """Polls outbox and publishes to message broker."""

    def __init__(self, session_factory, producer):
        self.session_factory = session_factory
        self.producer = producer

    async def publish_pending(self, batch_size: int = 100) -> int:
        async with self.session_factory() as session:
            stmt = (
                select(OutboxMessage)
                .where(OutboxMessage.published_at.is_(None))
                .order_by(OutboxMessage.created_at)
                .limit(batch_size)
                .with_for_update(skip_locked=True)  # Prevent duplicate processing
            )
            result = await session.execute(stmt)
            messages = result.scalars().all()

            published = 0
            for msg in messages:
                try:
                    await self.producer.publish(
                        topic=f"{msg.aggregate_type.lower()}-events",
                        key=str(msg.aggregate_id),
                        value={
                            "type": msg.event_type,
                            "idempotency_key": msg.idempotency_key,
                            **msg.payload
                        },
                    )
                    msg.published_at = datetime.now(timezone.utc)
                    published += 1
                except Exception as e:
                    msg.retry_count += 1
                    msg.last_error = str(e)

            await session.commit()
            return published
```

### CDC with Debezium (High-Throughput)

```yaml
# docker-compose.yml - Debezium connector
version: '3.8'
services:
  debezium:
    image: debezium/connect:2.5
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: outbox-connector
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets

# Register outbox connector
# POST http://debezium:8083/connectors
```

```json
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "app",
    "table.include.list": "public.outbox",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events"
  }
}
```

### Idempotent Consumer

```python
class IdempotentConsumer:
    """Consumer with deduplication using idempotency keys."""

    def __init__(self, db: AsyncSession, redis: Redis):
        self.db = db
        self.redis = redis

    async def process(self, event: dict, handler) -> bool:
        """Process event idempotently - returns False if duplicate."""
        idempotency_key = event.get("idempotency_key")
        if not idempotency_key:
            # No key = always process (but risky)
            await handler(event)
            return True

        # Check Redis cache first (fast path)
        if await self.redis.exists(f"processed:{idempotency_key}"):
            return False  # Already processed

        # Check database (slow path, but durable)
        exists = await self.db.execute(
            select(ProcessedEvent)
            .where(ProcessedEvent.idempotency_key == idempotency_key)
        )
        if exists.scalar_one_or_none():
            # Cache for future fast lookups
            await self.redis.setex(f"processed:{idempotency_key}", 86400, "1")
            return False

        # Process and record
        async with self.db.begin():
            await handler(event)
            self.db.add(ProcessedEvent(idempotency_key=idempotency_key))
            await self.db.flush()

        # Cache the processed key
        await self.redis.setex(f"processed:{idempotency_key}", 86400, "1")
        return True


class ProcessedEvent(Base):
    """Track processed events for idempotency."""
    __tablename__ = "processed_events"

    idempotency_key = Column(String(255), primary_key=True)
    processed_at = Column(DateTime, default=lambda: datetime.now(timezone.utc))
```

### Dapr Outbox Integration

```python
# Using Dapr's built-in outbox support
from dapr.clients import DaprClient

async def create_order_with_dapr(order_data: dict):
    """Dapr handles outbox automatically."""
    async with DaprClient() as client:
        # Single call - Dapr ensures atomicity
        await client.save_state(
            store_name="statestore",
            key=f"order-{order_data['id']}",
            value=order_data,
            state_metadata={
                "outbox.publish": "true",
                "outbox.topic": "orders",
            }
        )
```

## Key Decisions

| Decision | Option A | Option B | Recommendation |
|----------|----------|----------|----------------|
| Delivery | Polling | CDC (Debezium) | Polling for simplicity, CDC for > 10K msg/s |
| Batch size | Small (10-50) | Large (100-500) | Start with 100, tune based on latency |
| Retry | Fixed delay | Exponential backoff | Exponential with max 5 retries |
| Cleanup | Delete published | Archive to cold storage | Archive for audit, delete after 30 days |
| Ordering | Per-aggregate | Global | Per-aggregate via partition key |
| Idempotency | Consumer-side | Built-in key | Always include idempotency key |
| Tool | Custom | Dapr | Dapr if K8s, custom otherwise |

## Outbox vs CDC Trade-offs

```
┌────────────────────────────────────────────────────────────────────────┐
│                     POLLING vs CDC COMPARISON                           │
├────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  POLLING (OutboxPublisher)          CDC (Debezium)                     │
│  ──────────────────────────         ────────────────                   │
│  ✓ Simple to implement              ✓ Higher throughput (100K+ msg/s)  │
│  ✓ No extra infrastructure          ✓ Lower latency (sub-second)       │
│  ✓ Easy to debug                    ✓ No polling overhead              │
│  ✗ Polling overhead                 ✗ Complex infrastructure           │
│  ✗ Higher latency (1-5s)            ✗ Harder to debug                  │
│  ✗ Limited throughput (~10K/s)      ✗ Requires Kafka Connect           │
│                                                                         │
│  USE WHEN:                          USE WHEN:                          │
│  - Starting out                     - High throughput required         │
│  - Simple architecture              - Sub-second latency needed        │
│  - < 10K events/second              - Already using Kafka              │
│                                                                         │
└────────────────────────────────────────────────────────────────────────┘
```

## Anti-Patterns (FORBIDDEN)

```python
# NEVER publish before commit - dual-write problem
await producer.publish(event)  # May succeed but commit may fail!
await session.commit()

# NEVER delete without publishing - events lost
await session.execute(delete(OutboxMessage).where(...))

# NEVER process without locking - causes duplicates
messages = await session.execute(select(OutboxMessage))  # No lock!

# NEVER store large payloads - use URL references instead
OutboxMessage(payload={"file": large_binary_data})

# NEVER ignore ordering - use aggregate_id as partition key
await producer.publish(event_a)  # May arrive out of order!

# NEVER skip idempotency keys
OutboxMessage(payload=event_data)  # No idempotency_key = duplicate risk

# NEVER process without idempotency check on consumer
async def handle(event):
    await process(event)  # Duplicate processing on retry!
```

## Related Skills

- `message-queues` - Kafka, RabbitMQ, Redis Streams integration
- `event-sourcing` - Full event-sourced architecture patterns
- `database-schema-designer` - Schema design and migrations
- `sqlalchemy-2-async` - Async database session patterns

## Capability Details

### outbox-schema
**Keywords:** outbox table, transactional outbox, event table, schema, idempotency
**Solves:**
- How do I design an outbox table?
- What indexes are needed for outbox?
- Outbox table best practices
- Idempotency key generation

### polling-publisher
**Keywords:** polling, publish outbox, background worker, relay
**Solves:**
- How do I publish from the outbox?
- Batch publishing patterns
- Handling publish failures
- FOR UPDATE SKIP LOCKED pattern

### cdc-debezium
**Keywords:** cdc, debezium, change data capture, kafka connect
**Solves:**
- When to use CDC vs polling?
- Debezium connector configuration
- High-throughput event publishing
- Outbox event routing

### idempotent-consumer
**Keywords:** idempotent, deduplication, exactly-once, processed events
**Solves:**
- How to handle duplicate messages?
- Idempotency key patterns
- Consumer deduplication strategies
- Redis + DB deduplication

### atomic-operations
**Keywords:** atomic, transaction, dual-write, consistency
**Solves:**
- How to avoid dual-write problems?
- Ensuring atomicity between DB and events
- Exactly-once delivery patterns

Overview

This skill implements the transactional outbox pattern for reliable event publishing in TypeScript-based systems. It helps make database state changes and event emission atomic, avoid dual-write problems, and achieve exactly-once or at-least-once delivery using polling, CDC, or platform integrations like Dapr. Use it when building event-driven microservices that require durable, ordered, and idempotent events.

How this skill works

Write domain changes and a corresponding outbox row inside the same database transaction. A background publisher or a CDC connector reads the outbox table and delivers events to the message broker. Consumers use idempotency keys and a processed-events store (cache + DB) to deduplicate and ensure safe processing. The pattern separates domain logic from message infrastructure while guaranteeing no events are lost if a commit succeeds.

When to use it

  • When you must make DB changes and event publishing atomic
  • When you need exactly-once semantics or deduplication guarantees
  • For service boundaries in event-driven microservices
  • When avoiding the dual-write problem (DB + broker)
  • If you need ordered, per-aggregate event delivery

Best practices

  • Persist outbox rows inside the same transaction as the domain write
  • Include deterministic idempotency keys for consumer deduplication
  • Use FOR UPDATE SKIP LOCKED when polling to avoid duplicate processing
  • Choose CDC (Debezium) for >10k msg/s and polling for simplicity
  • Archive or delete published rows after retention; keep audit copies if needed
  • Avoid storing large binary payloads in the outbox; store references instead

Example use cases

  • Order service: create order + enqueue OrderCreated event atomically
  • Payment flow: record transaction and outbox event to trigger downstream workflows
  • High-throughput analytics: use Debezium to stream outbox changes to Kafka topics
  • Microservice integrations: decouple internal writes from external event delivery
  • Resilient retries: publisher increments retry_count and records last_error for observability

FAQ

Should I use polling or CDC?

Start with polling for simplicity and low infrastructure cost. Move to CDC (Debezium + Kafka Connect) when you need sub-second latency or very high throughput.

How do consumers avoid duplicate processing?

Always include an idempotency_key in outbox events. Consumers check a fast cache (Redis) and a durable processed_events table before processing, then record the key atomically after handling.