This skill enables building scalable event-driven systems with event sourcing, CQRS, and messaging patterns built on queues and projections.

```shell
npx playbooks add skill doanchienthangdev/omgkit --skill event-driven-architecture
```

---
name: event-driven-architecture
description: Event sourcing, CQRS, and message queue patterns with RabbitMQ and Kafka for distributed systems
category: backend
triggers:
  - event driven
  - event sourcing
  - cqrs
  - message queue
  - rabbitmq
  - kafka
  - saga pattern
---

# Event-Driven Architecture

Implement **event-driven systems** with event sourcing, CQRS, and message queues. This skill covers distributed patterns for scalable, resilient applications.

## Purpose

Build loosely coupled, scalable systems:

- Implement event sourcing for audit trails
- Apply CQRS for read/write optimization
- Use message queues for async processing
- Handle distributed transactions with sagas
- Ensure eventual consistency
- Build replay and recovery capabilities

## Features

### 1. Event Sourcing

```typescript
// Event definitions
interface DomainEvent {
  eventId: string;
  eventType: string;
  aggregateId: string;
  aggregateType: string;
  timestamp: Date;
  version: number;
  data: Record<string, any>;
  metadata: {
    userId?: string;
    correlationId?: string;
    causationId?: string;
  };
}

// Order aggregate events
type OrderEvent =
  | { type: 'OrderCreated'; data: { customerId: string; items: OrderItem[] } }
  | { type: 'OrderItemAdded'; data: { item: OrderItem } }
  | { type: 'OrderItemRemoved'; data: { itemId: string } }
  | { type: 'OrderSubmitted'; data: { submittedAt: Date } }
  | { type: 'PaymentReceived'; data: { paymentId: string; amount: number } }
  | { type: 'OrderShipped'; data: { trackingNumber: string; carrier: string } }
  | { type: 'OrderDelivered'; data: { deliveredAt: Date } }
  | { type: 'OrderCancelled'; data: { reason: string } };

// Event store
class EventStore {
  async append(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    // Optimistic concurrency check (for full safety, this read and the
    // appends below should share one transaction, or be backed by a
    // unique (aggregateId, version) index)
    const currentVersion = await this.getVersion(aggregateId);

    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but found ${currentVersion}`
      );
    }

    // Append events atomically
    await db.$transaction(async (tx) => {
      for (let i = 0; i < events.length; i++) {
        await tx.event.create({
          data: {
            ...events[i],
            version: expectedVersion + i + 1,
          },
        });
      }
    });

    // Publish to event bus. Publishing after the commit is a dual write:
    // a crash here leaves stored events unpublished. A transactional
    // outbox table avoids this.
    for (const event of events) {
      await eventBus.publish(event);
    }
  }

  async getEvents(
    aggregateId: string,
    fromVersion?: number
  ): Promise<DomainEvent[]> {
    return db.event.findMany({
      where: {
        aggregateId,
        version: fromVersion ? { gt: fromVersion } : undefined,
      },
      orderBy: { version: 'asc' },
    });
  }

  async getVersion(aggregateId: string): Promise<number> {
    const lastEvent = await db.event.findFirst({
      where: { aggregateId },
      orderBy: { version: 'desc' },
    });

    return lastEvent?.version ?? 0;
  }
}

// Aggregate with event sourcing
class OrderAggregate {
  private state!: OrderState; // assigned when OrderCreated is applied
  private version: number = 0;
  private uncommittedEvents: OrderEvent[] = [];

  constructor(private readonly id: string) {}

  static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate(id);
    const events = await eventStore.getEvents(id);

    for (const event of events) {
      // Map the stored envelope back to a domain event before applying
      aggregate.apply({ type: event.eventType, data: event.data } as OrderEvent, false);
    }

    return aggregate;
  }

  // Command handlers
  create(customerId: string, items: OrderItem[]): void {
    if (this.state) {
      throw new Error('Order already exists');
    }

    this.applyChange({
      type: 'OrderCreated',
      data: { customerId, items },
    });
  }

  addItem(item: OrderItem): void {
    this.ensureState(['draft']);

    this.applyChange({
      type: 'OrderItemAdded',
      data: { item },
    });
  }

  submit(): void {
    this.ensureState(['draft']);

    if (this.state.items.length === 0) {
      throw new Error('Cannot submit empty order');
    }

    this.applyChange({
      type: 'OrderSubmitted',
      data: { submittedAt: new Date() },
    });
  }

  // Event application
  private apply(event: OrderEvent, isNew: boolean): void {
    switch (event.type) {
      case 'OrderCreated':
        this.state = {
          status: 'draft',
          customerId: event.data.customerId,
          items: event.data.items,
          total: this.calculateTotal(event.data.items),
        };
        break;

      case 'OrderItemAdded':
        this.state.items.push(event.data.item);
        this.state.total = this.calculateTotal(this.state.items);
        break;

      case 'OrderSubmitted':
        this.state.status = 'submitted';
        this.state.submittedAt = event.data.submittedAt;
        break;

      // ... other event handlers
    }

    this.version++;

    if (isNew) {
      this.uncommittedEvents.push(event);
    }
  }

  private applyChange(event: OrderEvent): void {
    this.apply(event, true);
  }

  private ensureState(allowed: string[]): void {
    if (!this.state || !allowed.includes(this.state.status)) {
      throw new Error(`Invalid order state: ${this.state?.status ?? 'none'}`);
    }
  }

  // Assumes OrderItem has price and quantity fields
  private calculateTotal(items: OrderItem[]): number {
    return items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  }

  async save(eventStore: EventStore): Promise<void> {
    const domainEvents = this.uncommittedEvents.map((e, i) => ({
      eventId: uuid(),
      eventType: e.type,
      aggregateId: this.id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version - this.uncommittedEvents.length + i + 1,
      data: e.data,
      metadata: {},
    }));

    await eventStore.append(
      this.id,
      domainEvents,
      this.version - this.uncommittedEvents.length
    );

    this.uncommittedEvents = [];
  }
}
```

### 2. CQRS Pattern

```typescript
// Command side (writes)
interface Command {
  type: string;
  payload: any;
  metadata: {
    userId: string;
    timestamp: Date;
    correlationId: string;
  };
}

class CommandBus {
  private handlers = new Map<string, CommandHandler>();

  register(commandType: string, handler: CommandHandler): void {
    this.handlers.set(commandType, handler);
  }

  async dispatch(command: Command): Promise<void> {
    const handler = this.handlers.get(command.type);

    if (!handler) {
      throw new Error(`No handler for command: ${command.type}`);
    }

    await handler.handle(command);
  }
}

// Command handler
class CreateOrderHandler implements CommandHandler {
  constructor(
    private eventStore: EventStore,
    private orderRepository: OrderRepository
  ) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    const order = new OrderAggregate(uuid());
    order.create(command.payload.customerId, command.payload.items);
    await order.save(this.eventStore);
  }
}

// Query side (reads)
interface Query {
  type: string;
  params: any;
}

class QueryBus {
  private handlers = new Map<string, QueryHandler>();

  register(queryType: string, handler: QueryHandler): void {
    this.handlers.set(queryType, handler);
  }

  async execute<T>(query: Query): Promise<T> {
    const handler = this.handlers.get(query.type);

    if (!handler) {
      throw new Error(`No handler for query: ${query.type}`);
    }

    return handler.handle(query);
  }
}

// Read model projection
class OrderReadModel {
  async project(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await db.orderView.create({
          data: {
            id: event.aggregateId,
            customerId: event.data.customerId,
            status: 'draft',
            itemCount: event.data.items.length,
            // OrderCreated carries items, not a total; derive it here
            // (assumes OrderItem has price and quantity fields)
            total: event.data.items.reduce(
              (sum: number, item: OrderItem) => sum + item.price * item.quantity,
              0
            ),
            createdAt: event.timestamp,
          },
        });
        break;

      case 'OrderSubmitted':
        await db.orderView.update({
          where: { id: event.aggregateId },
          data: {
            status: 'submitted',
            submittedAt: event.data.submittedAt,
          },
        });
        break;

      case 'OrderShipped':
        await db.orderView.update({
          where: { id: event.aggregateId },
          data: {
            status: 'shipped',
            trackingNumber: event.data.trackingNumber,
          },
        });
        break;
    }
  }

  // Rebuild projection from events
  async rebuild(): Promise<void> {
    // Clear existing read model
    await db.orderView.deleteMany();

    // Replay all events (assumes the event store exposes a global event stream)
    const events = await eventStore.getAllEvents();

    for (const event of events) {
      await this.project(event);
    }
  }
}
```

### 3. Message Queues with RabbitMQ

```typescript
import amqp from 'amqplib';

class RabbitMQBroker {
  private connection: amqp.Connection;
  private channel: amqp.Channel;

  async connect(): Promise<void> {
    this.connection = await amqp.connect(process.env.RABBITMQ_URL!);
    this.channel = await this.connection.createChannel();

    // Setup exchanges
    await this.channel.assertExchange('events', 'topic', { durable: true });
    await this.channel.assertExchange('commands', 'direct', { durable: true });
    await this.channel.assertExchange('dlx', 'fanout', { durable: true });
  }

  async publish(exchange: string, routingKey: string, message: any): Promise<void> {
    const content = Buffer.from(JSON.stringify(message));

    this.channel.publish(exchange, routingKey, content, {
      persistent: true,
      contentType: 'application/json',
      messageId: uuid(),
      timestamp: Date.now(),
    });
  }

  async subscribe(
    queue: string,
    exchange: string,
    routingKey: string,
    handler: (message: any) => Promise<void>
  ): Promise<void> {
    // Setup queue with dead letter exchange
    await this.channel.assertQueue(queue, {
      durable: true,
      deadLetterExchange: 'dlx',
      deadLetterRoutingKey: `${queue}.dlq`,
    });

    await this.channel.bindQueue(queue, exchange, routingKey);

    // Limit unacknowledged deliveries per consumer to apply backpressure
    await this.channel.prefetch(10);

    // Consume messages
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const content = JSON.parse(msg.content.toString());
        await handler(content);
        this.channel.ack(msg);
      } catch (error) {
        console.error('Message processing failed:', error);

        // Retry or dead-letter
        const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;

        if (retryCount < 3) {
          // Retry with exponential backoff. Note: this in-process timer is
          // lost if the service restarts before it fires; a TTL + DLX delay
          // queue or the delayed-message plugin is more durable.
          setTimeout(() => {
            this.channel.publish(exchange, routingKey, msg.content, {
              ...msg.properties,
              headers: {
                ...msg.properties.headers,
                'x-retry-count': retryCount,
              },
            });
            this.channel.ack(msg);
          }, Math.pow(2, retryCount) * 1000);
        } else {
          // Send to dead letter queue
          this.channel.reject(msg, false);
        }
      }
    });
  }
}

// Event publishing
class EventPublisher {
  constructor(private broker: RabbitMQBroker) {}

  async publish(event: DomainEvent): Promise<void> {
    const routingKey = `${event.aggregateType}.${event.eventType}`;
    await this.broker.publish('events', routingKey, event);
  }
}

// Event consumer
class OrderEventConsumer {
  constructor(
    private broker: RabbitMQBroker,
    private readModel: OrderReadModel
  ) {}

  async start(): Promise<void> {
    await this.broker.subscribe(
      'order-projector',
      'events',
      'Order.*',
      async (event) => {
        await this.readModel.project(event);
      }
    );
  }
}
```

### 4. Saga Pattern for Distributed Transactions

```typescript
// Saga orchestrator
interface SagaStep {
  name: string;
  execute: (context: SagaContext) => Promise<void>;
  compensate: (context: SagaContext) => Promise<void>;
}

class SagaOrchestrator {
  private steps: SagaStep[] = [];
  private executedSteps: SagaStep[] = [];

  addStep(step: SagaStep): this {
    this.steps.push(step);
    return this;
  }

  async execute(context: SagaContext): Promise<void> {
    this.executedSteps = []; // reset so the orchestrator can be reused

    try {
      for (const step of this.steps) {
        console.log(`Executing step: ${step.name}`);
        await step.execute(context);
        this.executedSteps.push(step);
      }
    } catch (error) {
      console.error('Saga failed, compensating...', error);
      await this.compensate(context);
      throw error;
    }
  }

  private async compensate(context: SagaContext): Promise<void> {
    // Execute compensation in reverse order (copy first: reverse() mutates)
    for (const step of [...this.executedSteps].reverse()) {
      try {
        console.log(`Compensating step: ${step.name}`);
        await step.compensate(context);
      } catch (error) {
        console.error(`Compensation failed for ${step.name}:`, error);
        // Log for manual intervention
        await this.logCompensationFailure(step, context, error);
      }
    }
  }

  private async logCompensationFailure(
    step: SagaStep,
    context: SagaContext,
    error: unknown
  ): Promise<void> {
    // Persist enough detail for an operator to complete compensation manually
    console.error('MANUAL INTERVENTION REQUIRED', { step: step.name, context, error });
  }
}

// Order saga example
const createOrderSaga = new SagaOrchestrator()
  .addStep({
    name: 'Reserve Inventory',
    execute: async (ctx) => {
      const reservation = await inventoryService.reserve(ctx.items);
      ctx.reservationId = reservation.id;
    },
    compensate: async (ctx) => {
      if (ctx.reservationId) {
        await inventoryService.releaseReservation(ctx.reservationId);
      }
    },
  })
  .addStep({
    name: 'Process Payment',
    execute: async (ctx) => {
      const payment = await paymentService.charge(ctx.customerId, ctx.total);
      ctx.paymentId = payment.id;
    },
    compensate: async (ctx) => {
      if (ctx.paymentId) {
        await paymentService.refund(ctx.paymentId);
      }
    },
  })
  .addStep({
    name: 'Create Order',
    execute: async (ctx) => {
      const order = await orderService.create({
        customerId: ctx.customerId,
        items: ctx.items,
        paymentId: ctx.paymentId,
        reservationId: ctx.reservationId,
      });
      ctx.orderId = order.id;
    },
    compensate: async (ctx) => {
      if (ctx.orderId) {
        await orderService.cancel(ctx.orderId);
      }
    },
  })
  .addStep({
    name: 'Send Confirmation',
    execute: async (ctx) => {
      await notificationService.sendOrderConfirmation(ctx.orderId);
    },
    compensate: async (ctx) => {
      // No compensation needed for notifications
    },
  });

// Execute saga
async function handleCreateOrder(command: CreateOrderCommand): Promise<void> {
  const context: SagaContext = {
    customerId: command.customerId,
    items: command.items,
    total: calculateTotal(command.items),
  };

  await createOrderSaga.execute(context);
}
```

### 5. Kafka Streaming

```typescript
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';

// Outgoing message shape used by this service
interface KafkaMessage {
  key: string;
  value: unknown;
  headers?: Record<string, string>;
  partition?: number;
}

class KafkaService {
  private kafka: Kafka;
  private producer!: Producer;
  private consumers: Map<string, Consumer> = new Map();

  constructor() {
    this.kafka = new Kafka({
      clientId: process.env.SERVICE_NAME,
      brokers: (process.env.KAFKA_BROKERS || '').split(','),
    });
  }

  async connect(): Promise<void> {
    this.producer = this.kafka.producer({
      idempotent: true,
      // kafkajs requires maxInFlightRequests <= 1 when idempotent is enabled
      maxInFlightRequests: 1,
    });

    await this.producer.connect();
  }

  async publish(topic: string, messages: KafkaMessage[]): Promise<void> {
    await this.producer.send({
      topic,
      messages: messages.map(m => ({
        key: m.key,
        value: JSON.stringify(m.value),
        headers: m.headers,
        partition: m.partition,
      })),
    });
  }

  async subscribe(
    groupId: string,
    topics: string[],
    handler: (payload: EachMessagePayload) => Promise<void>
  ): Promise<void> {
    const consumer = this.kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });

    await consumer.connect();
    await consumer.subscribe({ topics, fromBeginning: false });

    await consumer.run({
      eachMessage: async (payload) => {
        try {
          await handler(payload);
        } catch (error) {
          console.error('Message processing failed:', error);
          // Implement retry/DLQ logic
        }
      },
    });

    this.consumers.set(groupId, consumer);
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
    for (const consumer of this.consumers.values()) {
      await consumer.disconnect();
    }
  }
}

// Stream processing
class OrderStreamProcessor {
  constructor(private kafka: KafkaService) {}

  async start(): Promise<void> {
    await this.kafka.subscribe(
      'order-processor',
      ['order-events'],
      async ({ topic, partition, message }) => {
        const event = JSON.parse(message.value?.toString() || '{}');

        switch (event.type) {
          case 'OrderCreated':
            await this.handleOrderCreated(event);
            break;
          case 'OrderCompleted':
            await this.handleOrderCompleted(event);
            break;
        }
      }
    );
  }

  private async handleOrderCreated(event: any): Promise<void> {
    // Update analytics
    await analyticsService.recordOrder(event.data);

    // Trigger downstream processes
    await this.kafka.publish('inventory-commands', [{
      key: event.aggregateId,
      value: {
        type: 'ReserveInventory',
        orderId: event.aggregateId,
        items: event.data.items,
      },
    }]);
  }
}
```

## Use Cases

### 1. Order Processing System

```typescript
// Complete order workflow
async function processOrder(orderId: string): Promise<void> {
  const saga = new SagaOrchestrator()
    .addStep(reserveInventoryStep)
    .addStep(processPaymentStep)
    .addStep(createShipmentStep)
    .addStep(sendNotificationStep);

  await saga.execute({ orderId });
}
```

### 2. Real-time Analytics

```typescript
// Stream aggregation
await kafka.subscribe(
  'analytics-aggregator',
  ['order-events'],
  async ({ message }) => {
    const event = JSON.parse(message.value?.toString() || '{}');
    await updateDailySales(event.data.total);
    await updateProductMetrics(event.data.items);
  }
);
```

## Best Practices

### Do's

- **Design events as facts** - Immutable, past-tense naming
- **Implement idempotent handlers** - Handle duplicates gracefully
- **Plan for event versioning** - Schema evolution
- **Use dead letter queues** - Handle failures
- **Monitor queue depths** - Alert on backlogs
- **Test with chaos** - Simulate failures
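
The "idempotent handlers" rule above can be sketched as a handler that deduplicates by event id. This is a minimal illustration: the in-memory `processed` set stands in for a durable store (e.g. a table with a unique index on the event id), and the `IncomingEvent` shape is illustrative.

```typescript
interface IncomingEvent {
  eventId: string;
  type: string;
  data: Record<string, unknown>;
}

class IdempotentHandler {
  // Stand-in for a durable store of processed event ids
  private processed = new Set<string>();
  public handledCount = 0;

  handle(event: IncomingEvent): boolean {
    // At-least-once delivery means duplicates will arrive; skip them
    if (this.processed.has(event.eventId)) return false;
    this.processed.add(event.eventId);
    this.handledCount++;
    return true;
  }
}
```

Delivering the same event twice runs the side effect exactly once; in production the "seen" check and the side effect should commit in the same transaction.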

### Don'ts

- Don't couple services through shared databases
- Don't ignore message ordering requirements
- Don't skip compensation logic
- Don't forget about exactly-once semantics
- Don't over-engineer for simple use cases
- Don't ignore backpressure
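
As a minimal illustration of the backpressure point, a consumer can track in-flight work and pause intake past a threshold. `InFlightLimiter` is a hypothetical helper; with kafkajs, the `shouldPause` signal would drive `consumer.pause()` and `consumer.resume()`, and with RabbitMQ the equivalent lever is `channel.prefetch()`.

```typescript
class InFlightLimiter {
  private inFlight = 0;

  constructor(private readonly max: number) {}

  get shouldPause(): boolean {
    // Pause intake once the configured concurrency limit is reached
    return this.inFlight >= this.max;
  }

  start(): void {
    this.inFlight++;
  }

  done(): void {
    this.inFlight = Math.max(0, this.inFlight - 1);
  }
}
```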

## Related Skills

- **redis** - Pub/sub and caching
- **real-time-systems** - WebSocket integration
- **backend-development** - Service architecture

## Reference Resources

- [Event Sourcing Pattern](https://martinfowler.com/eaaDev/EventSourcing.html)
- [CQRS Pattern](https://martinfowler.com/bliki/CQRS.html)
- [RabbitMQ Documentation](https://www.rabbitmq.com/documentation.html)
- [Apache Kafka Documentation](https://kafka.apache.org/documentation/)

## Overview

This skill implements event-driven architecture patterns for JavaScript systems, focusing on event sourcing, CQRS, and reliable message queues using RabbitMQ and Kafka concepts. It provides primitives for durable event stores, read-model projections, command/query buses, message brokers, and saga-based distributed transactions. The goal is to help build scalable, loosely coupled services with replayable audit trails and eventual consistency.

## How This Skill Works

Event sourcing persists all domain events to an append-only event store and rehydrates aggregates by replaying events. CQRS separates command handling (writes) from read models (projections): writes are processed via a CommandBus, while reads are served from denormalized views built by projecting events. Message brokers publish and subscribe to events and commands with retry and dead-letter handling, while sagas orchestrate multi-step distributed transactions with compensating actions on failure.

## When to Use It

- When you need full auditability and the ability to replay state changes
- When read and write workloads have very different performance or scaling needs
- For complex distributed flows that require coordination and compensation
- When eventual consistency is acceptable and isolation across services is required
- To decouple services and enable async, resilient communication

## Best Practices

- Define clear, versioned domain events and include metadata (correlationId, causationId, userId)
- Use optimistic concurrency on event append to detect conflicting updates
- Keep projections idempotent and provide a rebuild process for read models
- Use exponential backoff, retry limits, and dead-letter queues for resilient message processing
- Design sagas with explicit compensation logic and persistent saga state for recovery
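
The idempotent-projection advice above can be sketched by tracking the last applied version per aggregate, so replays and redeliveries become no-ops. This is a minimal in-memory illustration; a real projection would persist `lastApplied` alongside the view.

```typescript
interface VersionedEvent {
  aggregateId: string;
  version: number;
  eventType: string;
}

class VersionTrackingProjection {
  private lastApplied = new Map<string, number>();
  public applied: VersionedEvent[] = [];

  project(event: VersionedEvent): void {
    const last = this.lastApplied.get(event.aggregateId) ?? 0;
    // Re-delivered or replayed events at or below the last version are no-ops
    if (event.version <= last) return;
    this.applied.push(event);
    this.lastApplied.set(event.aggregateId, event.version);
  }
}
```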

## Example Use Cases

- E-commerce order lifecycle: event-sourced orders, read models for storefront queries, and a saga to coordinate inventory, payment, and shipping
- Financial ledger where immutable event streams provide regulatory audit trails and replayability
- Microservices that publish domain events to RabbitMQ/Kafka for asynchronous integration and eventual consistency
- High-throughput systems separating heavy write logic from optimized read views for dashboards and analytics
## FAQ

**How do I handle schema or event version changes?**

Version events explicitly, migrate projections via replay or transformation layers, and keep backward-compatible deserializers when possible.

**When should I use a saga vs. a two-phase commit?**

Prefer sagas for cross-service, long-running processes where distributed locking or synchronous transactions are impractical; use two-phase commit only when strict atomicity across services is mandatory and supported.
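
The schema-versioning answer above can be sketched as a small upcaster applied at read time. The field names (`customerName`, a v2 `customer` object) are hypothetical, chosen only to show the v1-to-v2 transformation.

```typescript
interface StoredEvent {
  schemaVersion: number;
  data: Record<string, any>;
}

// v1 OrderCreated stored a flat customerName; v2 nests it under customer.name
function upcastOrderCreated(event: StoredEvent): StoredEvent {
  if (event.schemaVersion >= 2) return event; // already current: pass through
  const { customerName, ...rest } = event.data;
  return {
    schemaVersion: 2,
    data: { ...rest, customer: { name: customerName } },
  };
}
```

Running every loaded event through such upcasters keeps aggregates and projections coded against only the latest schema.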