
This skill helps you design data-intensive systems by applying Kleppmann-style trade-offs, with a focus on reliability, explicit consistency guarantees, and scalable data architectures.

npx playbooks add skill copyleftdev/sk1llz --skill kleppmann

---
name: kleppmann-data-intensive
description: Design distributed systems in the style of Martin Kleppmann, author of "Designing Data-Intensive Applications". Emphasizes understanding data systems deeply, making informed trade-offs, and building reliable data infrastructure. Use when designing databases, streaming systems, or data pipelines.
---

# Martin Kleppmann Style Guide

## Overview

Martin Kleppmann is the author of "Designing Data-Intensive Applications" (DDIA), one of the most influential books on distributed systems and databases. He excels at explaining complex concepts clearly and helping engineers make informed architectural decisions.

## Core Philosophy

> "Reliability means making systems work correctly, even when faults occur."

> "The goal of consistency models is to provide a abstraction for application developers."

> "There's no such thing as a 'best' database—only trade-offs."

Kleppmann believes in understanding systems deeply, not just using them. Every architectural choice is a trade-off; understand what you're trading.

## Design Principles

1. **Understand the Trade-offs**: CAP, PACELC, latency vs consistency.

2. **Design for Failure**: Partial failure is the norm in distributed systems (see the retry sketch after this list).

3. **Data Outlives Code**: Schema design and data models matter enormously.

4. **Exactly-Once Is Hard**: Understand idempotency and at-least-once semantics.
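
Item 2 above calls for designing around partial failure. As one concrete illustration, here is a minimal retry sketch with exponential backoff and full jitter; the `TransientError` type is a hypothetical stand-in for whatever retryable faults your client raises, and retries are only safe when the wrapped operation is idempotent (see the idempotency-key pattern later in this guide).

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for retryable faults (timeouts, 503s, ...)."""


def call_with_retries(operation, max_attempts: int = 5,
                      base_delay: float = 0.1, max_delay: float = 5.0):
    """Retry a failing call with exponential backoff and full jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except TransientError:
            if attempt == max_attempts:
                raise  # give up; surface the fault to the caller
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            time.sleep(random.uniform(0, delay))  # jitter avoids retry storms
```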

## When Writing Code

### Always

- Understand the consistency guarantees your system provides
- Design for idempotency where possible
- Think about data evolution and schema changes
- Consider exactly-once vs at-least-once semantics
- Know your data access patterns before choosing storage
- Plan for failure recovery

### Never

- Assume "eventual consistency" without understanding what it means
- Ignore the differences between isolation levels (see the sketch after this list)
- Couple tightly without considering failure modes
- Treat distributed transactions as a silver bullet
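
The isolation-levels point above deserves a concrete illustration. Below is a hedged sketch of a classic lost update and one way to prevent it with explicit row locking; it assumes PostgreSQL, the psycopg2 driver, and a hypothetical `accounts(id, balance)` table. Serializable isolation or a compare-and-set update are alternative fixes.

```python
# A minimal sketch, assuming PostgreSQL, psycopg2, and an accounts(id, balance) table.
import psycopg2

conn = psycopg2.connect("dbname=bank")  # hypothetical DSN


def withdraw_lost_update(conn, account_id: int, amount: int):
    """Read-modify-write without locking: under the default READ COMMITTED level,
    two concurrent calls can both read the same balance and one decrement is lost."""
    with conn, conn.cursor() as cur:
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (account_id,))
        (balance,) = cur.fetchone()
        cur.execute("UPDATE accounts SET balance = %s WHERE id = %s",
                    (balance - amount, account_id))


def withdraw_safe(conn, account_id: int, amount: int):
    """SELECT ... FOR UPDATE locks the row until commit, so concurrent
    withdrawals serialize instead of overwriting each other."""
    with conn, conn.cursor() as cur:
        cur.execute("SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
                    (account_id,))
        (balance,) = cur.fetchone()
        if balance < amount:
            raise ValueError("Insufficient funds")
        cur.execute("UPDATE accounts SET balance = %s WHERE id = %s",
                    (balance - amount, account_id))
```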

### Prefer

- Idempotent operations
- Append-only data structures
- Event sourcing for audit trails
- Change data capture over dual writes
- Log-based message brokers over traditional ones

## Code Patterns

### Consistency Models Illustrated

```python
# Understanding consistency models through code

import threading
from collections import defaultdict

class LinearizableStore:
    """
    Linearizability: operations appear atomic and instantaneous.
    Strongest consistency - single copy illusion.
    """
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}
    
    def write(self, key, value):
        with self._lock:
            self._data[key] = value
    
    def read(self, key):
        with self._lock:
            return self._data.get(key)
    
    def compare_and_set(self, key, expected, new_value):
        with self._lock:
            if self._data.get(key) == expected:
                self._data[key] = new_value
                return True
            return False


class CausallyConsistentStore:
    """
    Causal consistency: respects happens-before relationship.
    Weaker than linearizable, but allows more concurrency.
    """
    def __init__(self, node_id):
        self.node_id = node_id
        self.data = {}
        self.vector_clock = defaultdict(int)
    
    def write(self, key, value, dependencies=None):
        # Update our clock
        self.vector_clock[self.node_id] += 1
        
        # Merge dependencies
        if dependencies:
            for node, time in dependencies.items():
                self.vector_clock[node] = max(self.vector_clock[node], time)
        
        self.data[key] = {
            'value': value,
            'clock': dict(self.vector_clock)
        }
        
        return dict(self.vector_clock)
    
    def read(self, key):
        if key in self.data:
            return self.data[key]['value'], self.data[key]['clock']
        return None, dict(self.vector_clock)
```

### Event Sourcing

```python
# Event sourcing: store events, derive state

from dataclasses import dataclass
from typing import List
from datetime import datetime

@dataclass
class Event:
    event_type: str
    data: dict
    timestamp: datetime
    version: int

class EventStore:
    def __init__(self):
        self.events: List[Event] = []
        self.version = 0
    
    def append(self, event_type: str, data: dict):
        self.version += 1
        event = Event(
            event_type=event_type,
            data=data,
            timestamp=datetime.now(),
            version=self.version
        )
        self.events.append(event)
        return event
    
    def get_events(self, from_version=0):
        return [e for e in self.events if e.version > from_version]


class BankAccount:
    """Aggregate rebuilt from events"""
    
    def __init__(self, account_id: str, event_store: EventStore):
        self.account_id = account_id
        self.event_store = event_store
        self.balance = 0
        self._rebuild_state()
    
    def _rebuild_state(self):
        """Derive current state from this account's event history"""
        for event in self.event_store.events:
            if event.data.get('account_id') == self.account_id:
                self._apply(event)
    
    def _apply(self, event: Event):
        if event.event_type == 'deposited':
            self.balance += event.data['amount']
        elif event.event_type == 'withdrawn':
            self.balance -= event.data['amount']
    
    def deposit(self, amount: float):
        event = self.event_store.append('deposited', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)
    
    def withdraw(self, amount: float):
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        event = self.event_store.append('withdrawn', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)

# Benefits:
# 1. Complete audit trail
# 2. Time travel (state at any point)
# 3. Event replay for debugging
# 4. Easy to add new projections (see the sketch below)
```
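
Benefit 4 above is worth a small illustration: a new read model can be derived from the same event log without touching the write path. The `DailyDepositTotals` projection below is a hypothetical example built on the `EventStore` defined above.

```python
from collections import defaultdict


class DailyDepositTotals:
    """Hypothetical projection: total deposits per calendar day."""

    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.totals = defaultdict(float)  # date -> total deposited
        self.last_version = 0

    def refresh(self):
        """Fold any new events into the read model (incremental replay)."""
        for event in self.event_store.get_events(from_version=self.last_version):
            if event.event_type == 'deposited':
                self.totals[event.timestamp.date()] += event.data['amount']
            self.last_version = event.version
```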

### Idempotency Keys

```python
# Idempotency: safe retries without duplicate effects

import uuid
import hashlib

class IdempotentProcessor:
    def __init__(self):
        self.processed_keys = {}  # key -> result; use a shared store (e.g. Redis) in production
        self.expiry_seconds = 3600  # expiry is not enforced in this in-memory sketch
    
    def process(self, idempotency_key: str, operation):
        """
        Execute operation exactly once for a given key.
        Retries with same key return cached result.
        """
        # Check if already processed
        if idempotency_key in self.processed_keys:
            return self.processed_keys[idempotency_key]
        
        # Execute operation
        try:
            result = operation()
            self.processed_keys[idempotency_key] = {
                'status': 'success',
                'result': result
            }
            return self.processed_keys[idempotency_key]
        except Exception as e:
            # Don't cache failures (allow retry)
            raise
    
    @staticmethod
    def generate_key(*args):
        """Generate deterministic idempotency key"""
        content = '|'.join(str(arg) for arg in args)
        return hashlib.sha256(content.encode()).hexdigest()


# Usage:
processor = IdempotentProcessor()

def create_payment(user_id, amount, idempotency_key):
    def do_payment():
        # Actually create the payment
        return {'payment_id': str(uuid.uuid4()), 'amount': amount}
    
    return processor.process(idempotency_key, do_payment)

# Client can safely retry:
user_id, amount, request_id = 'user-123', 100, 'req-456'  # example values
key = IdempotentProcessor.generate_key(user_id, amount, request_id)
result1 = create_payment(user_id, amount, key)
result2 = create_payment(user_id, amount, key)  # Same result, no duplicate payment
```

### Change Data Capture

```python
# CDC: capture database changes as a stream

import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, List, Optional

class OperationType(Enum):
    INSERT = 'insert'
    UPDATE = 'update'
    DELETE = 'delete'

@dataclass
class ChangeEvent:
    table: str
    operation: OperationType
    key: dict
    before: Optional[dict]  # None for insert
    after: Optional[dict]   # None for delete
    timestamp: float
    sequence: int

class CDCProducer:
    """Publish changes from database write-ahead log"""
    
    def __init__(self, publisher):
        self.publisher = publisher
        self.sequence = 0
    
    def capture_insert(self, table: str, key: dict, data: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.INSERT,
            key=key,
            before=None,
            after=data,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)
    
    def capture_update(self, table: str, key: dict, before: dict, after: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.UPDATE,
            key=key,
            before=before,
            after=after,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)


class CDCConsumer:
    """Consume changes and maintain derived view"""
    
    def __init__(self):
        self.handlers: dict[str, List[Callable]] = {}
        self.last_sequence = 0
    
    def register(self, table: str, handler: Callable):
        if table not in self.handlers:
            self.handlers[table] = []
        self.handlers[table].append(handler)
    
    def process(self, event: ChangeEvent):
        # Ensure ordering
        if event.sequence <= self.last_sequence:
            return  # Already processed
        
        if event.table in self.handlers:
            for handler in self.handlers[event.table]:
                handler(event)
        
        self.last_sequence = event.sequence

# Usage: maintain a search index from database changes
# (search_index is a placeholder for your search client)
consumer = CDCConsumer()

def update_search_index(event: ChangeEvent):
    if event.operation == OperationType.DELETE:
        search_index.delete(event.key)
    else:
        search_index.index(event.key, event.after)

consumer.register('products', update_search_index)
```

### Stream Processing

```python
# Stream processing patterns

import time
from typing import Callable, Iterator, TypeVar

T = TypeVar('T')

class StreamProcessor:
    """Stateful stream processing"""
    
    def __init__(self):
        self.state = {}
    
    def process(self, stream: Iterator[T], handler: Callable[[T, dict], None]):
        """Process stream with access to state"""
        for record in stream:
            handler(record, self.state)
            yield self.state.copy()


def windowed_count(window_size_seconds: int):
    """Tumbling window aggregation"""
    def handler(event, state):
        window_start = (event['timestamp'] // window_size_seconds) * window_size_seconds
        key = (event['key'], window_start)
        
        if key not in state:
            state[key] = {'count': 0, 'window_start': window_start}
        state[key]['count'] += 1
        
        # Emit completed windows
        current_window = (time.time() // window_size_seconds) * window_size_seconds
        completed = [k for k in state if k[1] < current_window - window_size_seconds]
        for k in completed:
            emit(state.pop(k))  # emit() is a placeholder for your downstream sink
    
    return handler


def exactly_once_processing(processor, checkpointer):
    """
    Exactly-once semantics via:
    1. Idempotent writes
    2. Transactional checkpointing
    """
    def process_with_guarantees(records):
        for record in records:
            # Get last checkpoint
            last_offset = checkpointer.get_offset()
            
            if record.offset <= last_offset:
                continue  # Already processed
            
            # Process and checkpoint atomically; transaction() stands in for
            # your store's transactional context manager
            with transaction():
                result = processor.process(record)
                checkpointer.save_offset(record.offset)
            
            yield result
    
    return process_with_guarantees
```

## Mental Model

Kleppmann approaches data systems by asking:

1. **What are the consistency requirements?** Linearizable, serializable, eventual?
2. **What are the access patterns?** Read-heavy, write-heavy, mixed?
3. **How do we handle failures?** Retries, idempotency, compensation?
4. **How does data evolve?** Schema changes, backward compatibility? (See the sketch after this list.)
5. **What happens at the edges?** Network partitions, slow nodes?
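
For question 4, here is a minimal sketch of one way to keep old data readable as the schema evolves: read-time "upcasting" of stored events. The field names and version numbers are hypothetical.

```python
def upcast(raw: dict) -> dict:
    """Translate events written under older schema versions into the current
    shape, so old and new events can coexist in the same log."""
    event = dict(raw)
    version = event.get('schema_version', 1)
    if version < 2:
        # v2 renamed 'amt' to 'amount' (hypothetical rename)
        event['amount'] = event.pop('amt', 0)
    if version < 3:
        # v3 added an optional 'currency' field; default older events
        event.setdefault('currency', 'USD')
    event['schema_version'] = 3
    return event


# Old events remain readable after two schema changes:
assert upcast({'schema_version': 1, 'amt': 100}) == {
    'schema_version': 3, 'amount': 100, 'currency': 'USD'
}
```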

## Signature Kleppmann Moves

- Event sourcing for auditability
- Idempotency keys for safe retries
- CDC over dual writes
- Understanding isolation levels deeply
- Log-based messaging for durability (see the sketch after this list)
- Making trade-offs explicit
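
The log-based messaging move is the one least illustrated above, so here is a minimal in-memory sketch of the idea (Kafka-style): the broker keeps an append-only log, consumers track their own offsets, and messages can be re-read for replay or recovery. The class and method names are hypothetical.

```python
from typing import Any, List


class Log:
    """Append-only log: messages are never mutated, only appended."""

    def __init__(self):
        self._entries: List[Any] = []

    def append(self, message) -> int:
        self._entries.append(message)
        return len(self._entries) - 1  # offset of the new message

    def read(self, offset: int, max_count: int = 100) -> List[Any]:
        return self._entries[offset:offset + max_count]


class LogConsumer:
    """Consumers own their offsets, so replay is just resetting the offset."""

    def __init__(self, log: Log, offset: int = 0):
        self.log = log
        self.offset = offset  # persist this per consumer group in practice

    def poll(self) -> List[Any]:
        batch = self.log.read(self.offset)
        self.offset += len(batch)
        return batch
```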

Overview

This skill teaches designing data-intensive distributed systems in the style of Martin Kleppmann, focusing on clear trade-offs, reliability, and deep understanding of data semantics. It guides architects and engineers through choices around consistency, failure modes, data modeling, and operational guarantees. Use it to evaluate architectures, pick patterns, and produce practical, robust designs for databases, streams, and pipelines.

How this skill works

The skill inspects system requirements and maps them to Kleppmann-style design patterns: consistency models, event sourcing, CDC, idempotency, and log-based messaging. It asks targeted questions about access patterns, failure scenarios, schema evolution, and operational constraints, then recommends concrete code patterns and operational controls. Outputs include architecture trade-offs, recommended primitives, and example code snippets to illustrate implementation choices.

When to use it

  • Selecting a storage or messaging system where trade-offs must be explicit (CAP, PACELC).
  • Designing data pipelines, change-data-capture flows, or stream processing topologies.
  • Defining consistency and isolation guarantees for a new service or API.
  • Migrating schemas or planning long-lived data evolution strategies.
  • Implementing retries, idempotency, and failure-handling for distributed operations.

Best practices

  • Make trade-offs explicit: document what you gain and what you accept to lose.
  • Design for partial failure: assume components fail and plan retries, timeouts, and compensations.
  • Prefer idempotent APIs and idempotency keys for safe retries.
  • Use append-only or event-sourced stores for auditability and easy replay.
  • Choose CDC or log-based replication instead of fragile dual-writes.
  • Define and test consistency guarantees; do not treat 'eventual' as a catch-all.

Example use cases

  • Designing a payment service that must avoid duplicate charges using idempotency keys and exactly-once-like workflows.
  • Building a search index updated from a primary database with CDC to ensure ordered, reliable updates.
  • Choosing between a strongly consistent primary-replica DB and a partition-tolerant eventual system for a geo-distributed app.
  • Implementing stream processing with windowed aggregations and careful checkpointing for recoverability.
  • Planning a schema migration strategy that supports backward compatibility and event replay.

FAQ

When should I use event sourcing vs a traditional CRUD model?

Use event sourcing when auditability, time-travel, and replayability are priorities. For simple workloads with limited event complexity, CRUD may be simpler; weigh operational cost and query patterns.

Is exactly-once processing achievable?

True exactly-once is hard; aim for idempotent operations plus transactional checkpointing or atomic writes to emulate exactly-once guarantees in practice.