home / skills / ancoleman / ai-design-components / using-message-queues

using-message-queues skill

/skills/using-message-queues

This skill helps you design and implement asynchronous messaging using brokers like Kafka, RabbitMQ, and Redis to improve reliability and scalability.

npx playbooks add skill ancoleman/ai-design-components --skill using-message-queues

Review the files below or copy the command above to add this skill to your agents.

Files (20)
SKILL.md
11.4 KB
---
name: using-message-queues
description: Async communication patterns using message brokers and task queues. Use when building event-driven systems, background job processing, or service decoupling. Covers Kafka (event streaming), RabbitMQ (complex routing), NATS (cloud-native), Redis Streams, Celery (Python), BullMQ (TypeScript), Temporal (workflows), and event sourcing patterns.
---

# Message Queues

Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.

## When to Use This Skill

Use message queues when:
- **Long-running operations** block HTTP requests (report generation, video processing)
- **Service decoupling** required (microservices, event-driven architecture)
- **Guaranteed delivery** needed (payment processing, order fulfillment)
- **Event streaming** for analytics (log aggregation, metrics pipelines)
- **Workflow orchestration** for complex processes (multi-step sagas, human-in-the-loop)
- **Background job processing** (email sending, image resizing)

## Broker Selection Decision Tree

Choose message broker based on primary need:

### Event Streaming / Log Aggregation
**→ Apache Kafka**
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing

### Simple Background Jobs
**→ Task Queues**
- **Python** → Celery + Redis
- **TypeScript** → BullMQ + Redis
- **Go** → Asynq + Redis
- Use: Email sending, report generation, webhooks

### Complex Workflows / Sagas
**→ Temporal**
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration

### Request-Reply / RPC Patterns
**→ NATS**
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control

### Complex Message Routing
**→ RabbitMQ**
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub

### Already Using Redis
**→ Redis Streams**
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues

## Performance Comparison

| Broker | Throughput | Latency (p99) | Best For |
|--------|-----------|---------------|----------|
| **Kafka** | 500K-1M msg/s | 10-50ms | Event streaming |
| **NATS JetStream** | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| **RabbitMQ** | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| **Redis Streams** | 100K+ msg/s | Sub-ms | Simple queues, caching |

## Quick Start Examples

### Kafka Producer/Consumer (Python)
See `examples/kafka-python/` for working code.

```python
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is not None:
        process_order(msg.value())
```

### Celery Background Jobs (Python)
See `examples/celery-image-processing/` for full implementation.

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379')

@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)
```

### BullMQ Job Processing (TypeScript)
See `examples/bullmq-webhook-processor/` for full implementation.

```typescript
import { Queue, Worker } from 'bullmq'

const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })
```

### Temporal Workflow Orchestration
See `examples/temporal-order-saga/` for saga pattern implementation.

```python
from temporalio import workflow, activity
from datetime import timedelta

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )

        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )

        return f"Order {order_id} completed"
```

## Core Patterns

### Event Naming Convention
Use: `Domain.Entity.Action.Version`

Examples:
- `order.created.v1`
- `user.profile.updated.v2`
- `payment.failed.v1`

### Event Schema Structure
```json
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
```

### Dead Letter Queue Pattern
Route failed messages to dead letter queue (DLQ) after max retries:

```python
@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)
```

### Idempotency for Exactly-Once Processing
```python
@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}

    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}
```

## Frontend Integration

### Job Status Updates via SSE
```python
# FastAPI endpoint for real-time job status
@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)

            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break

            await asyncio.sleep(0.5)

    return EventSourceResponse(event_generator())
```

### React Component
```typescript
export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)

    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })

    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })

    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}
```

## Detailed Guides

For comprehensive documentation, see reference files:

### Broker-Specific Guides
- **Kafka**: See `references/kafka.md` for partitioning, consumer groups, exactly-once semantics
- **RabbitMQ**: See `references/rabbitmq.md` for exchanges, bindings, routing patterns
- **NATS**: See `references/nats.md` for JetStream, request-reply patterns
- **Redis Streams**: See `references/redis-streams.md` for consumer groups, acknowledgments

### Task Queue Guides
- **Celery**: See `references/celery.md` for periodic tasks, canvas (workflows), monitoring
- **BullMQ**: See `references/bullmq.md` for job prioritization, flows, Bull Board monitoring
- **Temporal**: See `references/temporal-workflows.md` for saga patterns, signals, queries

### Pattern Guides
- **Event Patterns**: See `references/event-patterns.md` for event sourcing, CQRS, outbox pattern

## Common Anti-Patterns to Avoid

### 1. Synchronous API for Long Operations
```python
# ❌ BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report

# ✅ GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}
```

### 2. Non-Idempotent Consumers
```python
# ❌ BAD: Processes duplicates
@app.task
def send_email(email: str):
    send_email_service(email)  # Sends twice if retried!

# ✅ GOOD: Idempotent with deduplication
@app.task
def send_email(email: str, idempotency_key: str):
    if redis.exists(f"sent:{idempotency_key}"):
        return "already_sent"
    send_email_service(email)
    redis.setex(f"sent:{idempotency_key}", 86400, "1")
```

### 3. Ignoring Dead Letter Queues
```python
# ❌ BAD: Failed messages lost forever
@app.task(max_retries=3)
def risky_task(data):
    process(data)  # If all retries fail, data disappears

# ✅ GOOD: DLQ for manual inspection
@app.task(max_retries=3)
def risky_task(data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= 3:
            send_to_dlq(data, str(e))
        raise
```

### 4. Using Kafka for Request-Reply
```python
# ❌ BAD: Kafka is not designed for RPC
def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # How to correlate response? Kafka is asynchronous!

# ✅ GOOD: Use NATS request-reply or HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())
```

## Library Recommendations

### Context7 Research

**Confluent Kafka (Python)**
- Context7 ID: `/confluentinc/confluent-kafka-python`
- Trust Score: 68.8/100
- Code Snippets: 192+
- Production-ready Python Kafka client

**Temporal**
- Context7 ID: `/websites/temporal_io`
- Trust Score: 80.9/100
- Code Snippets: 3,769+
- Workflow orchestration for durable execution

### Installation

**Python:**
```bash
pip install confluent-kafka celery[redis] temporalio aio-pika redis
```

**TypeScript/Node.js:**
```bash
npm install kafkajs bullmq @temporalio/client amqplib ioredis
```

**Rust:**
```bash
cargo add rdkafka lapin async-nats redis
```

**Go:**
```bash
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
```

## Utilities

Use scripts for setup automation:

- **Kafka setup**: Run `python scripts/kafka_producer_consumer.py` for test utilities
- **Schema validation**: Run `python scripts/validate_message_schema.py` to validate event schemas

## Related Skills

- **api-patterns**: API design for async job submission
- **realtime-sync**: WebSocket/SSE for job status updates
- **feedback**: Toast notifications for job completion
- **databases-***: Persistent storage for event logs
- **observability**: Tracing and metrics for queue operations

Overview

This skill teaches asynchronous communication using message brokers and task queues for event-driven systems, background jobs, and service decoupling. It compares brokers (Kafka, RabbitMQ, NATS, Redis Streams), task queues (Celery, BullMQ), and orchestration tools (Temporal), and provides concrete examples and patterns for production use. The content focuses on broker selection, event schema, idempotency, DLQs, and frontend integration for job status.

How this skill works

The skill inspects use cases (throughput, latency, routing, durability) and maps them to appropriate brokers and libraries. It provides code examples for producers/consumers, task workers, and workflows, plus patterns like event naming, schema shape, DLQs, and idempotency. It also includes frontend patterns (SSE/React) for real-time job status and practical anti-patterns to avoid.

When to use it

  • Long-running operations that must not block HTTP requests (reports, media processing)
  • Decoupling microservices or implementing event-driven architectures and CQRS
  • Guaranteed delivery or replayable event streams for payments, orders, and analytics
  • Background job processing for emails, image resizing, webhooks
  • Complex workflows, sagas, and human-in-the-loop orchestration
  • Request-reply/RPC with low-latency microservices or IoT command/control

Best practices

  • Choose broker by primary need: Kafka for streaming, RabbitMQ for complex routing, NATS for RPC, Redis Streams for simple queues, Temporal for durable workflows
  • Design event names and schemas clearly (Domain.Entity.Action.Version) and include metadata for tracing and correlation
  • Make consumers idempotent and use an idempotency key or dedupe store (Redis) to ensure exactly-once semantics where needed
  • Implement dead letter queues and monitoring to surface persistent failures for manual inspection
  • Avoid synchronous heavy work in HTTP handlers: enqueue tasks and expose status endpoints or SSE for client updates
  • Use retries with exponential backoff and classify recoverable vs unrecoverable errors to route failures appropriately

Example use cases

  • High-throughput analytics pipeline with Kafka for long-term retention and event sourcing
  • Background image processing using Celery + Redis or BullMQ for TypeScript systems
  • Order processing saga implemented with Temporal to coordinate inventory, payment, and compensating actions
  • Microservice RPC and sub-ms commands using NATS request-reply
  • Notification or simple job queue on existing Redis infrastructure via Redis Streams

FAQ

Which broker should I pick for low-latency RPC?

Use NATS for sub-millisecond request-reply; avoid Kafka for RPC patterns.

How do I prevent duplicate processing?

Use idempotency keys stored in a fast store (Redis) and design consumers to check/record processing state before side effects.

When should I use Temporal instead of a task queue?

Choose Temporal for multi-step durable workflows, sagas, human approvals, or when execution must survive restarts and long waits.