This skill helps you design data-intensive systems by applying Kleppmann-style trade-offs, ensuring reliability, observability, and scalable data architectures.
```
npx playbooks add skill copyleftdev/sk1llz --skill kleppmann
```
---
name: kleppmann-data-intensive
description: Design distributed systems in the style of Martin Kleppmann, author of "Designing Data-Intensive Applications". Emphasizes understanding data systems deeply, making informed trade-offs, and building reliable data infrastructure. Use when designing databases, streaming systems, or data pipelines.
---
# Martin Kleppmann Style Guide
## Overview
Martin Kleppmann is the author of "Designing Data-Intensive Applications" (DDIA), one of the most influential books on distributed systems and databases. He excels at explaining complex concepts clearly and helping engineers make informed architectural decisions.
## Core Philosophy
> "Reliability means making systems work correctly, even when faults occur."
> "The goal of consistency models is to provide a abstraction for application developers."
> "There's no such thing as a 'best' database—only trade-offs."
Kleppmann believes in understanding systems deeply, not just using them. Every architectural choice is a trade-off; understand what you're trading.
## Design Principles
1. **Understand the Trade-offs**: CAP, PACELC, latency vs consistency.
2. **Design for Failure**: Partial failure is the norm in distributed systems.
3. **Data Outlives Code**: Schema design and data models matter enormously.
4. **Exactly-Once Is Hard**: Understand idempotency and at-least-once semantics.
## When Writing Code
### Always
- Understand the consistency guarantees your system provides
- Design for idempotency where possible
- Think about data evolution and schema changes
- Consider exactly-once vs at-least-once semantics
- Know your data access patterns before choosing storage
- Plan for failure recovery
### Never
- Assume "eventual consistency" without understanding what it means
- Ignore the differences between isolation levels (see the lost-update sketch after this list)
- Couple tightly without considering failure modes
- Treat distributed transactions as a silver bullet
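To make the isolation-level point concrete, here is a minimal, hypothetical sketch (not drawn from DDIA) of the classic lost-update anomaly, which weaker isolation levels such as read committed permit and serializable isolation prevents:
```python
# Hypothetical sketch of a lost update: two "transactions" both
# read-modify-write the same value, and the interleaving silently
# discards one of the updates.

account = {'balance': 100}

def read_modify_write(amount):
    snapshot = account['balance']  # 1. read inside the transaction
    # 2. the write is deferred until commit time
    return lambda: account.update(balance=snapshot + amount)

# Two concurrent transactions both read balance = 100 ...
commit_t1 = read_modify_write(10)
commit_t2 = read_modify_write(20)

# ... then commit in turn. Each write is "consistent" on its own:
commit_t1()   # balance = 110
commit_t2()   # balance = 120 -- T1's increment is silently lost

assert account['balance'] == 120  # serial execution would give 130
```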
### Prefer
- Idempotent operations
- Append-only data structures
- Event sourcing for audit trails
- Change data capture over dual writes (see the sketch after this list)
- Log-based message brokers over traditional ones
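To see why CDC beats dual writes, consider this hypothetical sketch (`db` and `search_index` stand in for real clients):
```python
# Dual writes: the application writes to two systems independently,
# so a crash between the writes leaves them permanently inconsistent.
def dual_write(db, search_index, key, value):
    db.put(key, value)
    # <-- crash or timeout here and the index never learns about the
    #     write, with no record that anything is missing
    search_index.put(key, value)

# CDC: only the database is written; the index follows its change log.
def cdc_write(db, key, value):
    # Single write; the database's change log is the source of truth.
    # A CDC consumer (see the CDCConsumer pattern below) replays the
    # log into the search index and can always catch up after a crash.
    db.put(key, value)
```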
## Code Patterns
### Consistency Models Illustrated
```python
# Understanding consistency models through code
import threading
from collections import defaultdict


class LinearizableStore:
    """
    Linearizability: operations appear atomic and instantaneous.
    Strongest consistency - single copy illusion.
    """
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def write(self, key, value):
        with self._lock:
            self._data[key] = value

    def read(self, key):
        with self._lock:
            return self._data.get(key)

    def compare_and_set(self, key, expected, new_value):
        with self._lock:
            if self._data.get(key) == expected:
                self._data[key] = new_value
                return True
            return False


class CausallyConsistentStore:
    """
    Causal consistency: respects the happens-before relationship.
    Weaker than linearizable, but allows more concurrency.
    """
    def __init__(self, node_id):
        self.node_id = node_id
        self.data = {}
        self.vector_clock = defaultdict(int)

    def write(self, key, value, dependencies=None):
        # Advance our own entry in the vector clock
        self.vector_clock[self.node_id] += 1
        # Merge in the clocks of any causal dependencies
        if dependencies:
            for node, time in dependencies.items():
                self.vector_clock[node] = max(self.vector_clock[node], time)
        self.data[key] = {
            'value': value,
            'clock': dict(self.vector_clock)
        }
        return dict(self.vector_clock)

    def read(self, key):
        if key in self.data:
            return self.data[key]['value'], self.data[key]['clock']
        return None, dict(self.vector_clock)
```
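A brief usage sketch for the causal store above (replica names are illustrative): a write returns its vector clock, and passing that clock as a dependency of a later write on another replica records the happens-before relationship.
```python
alice = CausallyConsistentStore('alice')
bob = CausallyConsistentStore('bob')

# Alice writes; the returned clock captures the write's causal context
clock_a = alice.write('greeting', 'hello')  # {'alice': 1}

# Bob replies after seeing Alice's write (replication elided here).
# Declaring clock_a as a dependency records that this write happened
# after Alice's, so any replica applying both can respect that order.
clock_b = bob.write('reply', 'hi back', dependencies=clock_a)
assert clock_b == {'alice': 1, 'bob': 1}
```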
### Event Sourcing
```python
# Event sourcing: store events, derive state
from dataclasses import dataclass
from typing import List
from datetime import datetime


@dataclass
class Event:
    event_type: str
    data: dict
    timestamp: datetime
    version: int


class EventStore:
    def __init__(self):
        self.events: List[Event] = []
        self.version = 0

    def append(self, event_type: str, data: dict):
        self.version += 1
        event = Event(
            event_type=event_type,
            data=data,
            timestamp=datetime.now(),
            version=self.version
        )
        self.events.append(event)
        return event

    def get_events(self, from_version=0):
        return [e for e in self.events if e.version > from_version]


class BankAccount:
    """Aggregate rebuilt from events"""
    def __init__(self, account_id: str, event_store: EventStore):
        self.account_id = account_id
        self.event_store = event_store
        self.balance = 0
        self._rebuild_state()

    def _rebuild_state(self):
        """Derive current state from event history"""
        for event in self.event_store.events:
            self._apply(event)

    def _apply(self, event: Event):
        if event.event_type == 'deposited':
            self.balance += event.data['amount']
        elif event.event_type == 'withdrawn':
            self.balance -= event.data['amount']

    def deposit(self, amount: float):
        event = self.event_store.append('deposited', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)

    def withdraw(self, amount: float):
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        event = self.event_store.append('withdrawn', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)

# Benefits:
# 1. Complete audit trail
# 2. Time travel (state at any point)
# 3. Event replay for debugging
# 4. Easy to add new projections
```
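The "time travel" benefit deserves a quick illustration. Because state is derived from the log, replaying events up to a given version reconstructs past state; the `balance_at_version` helper below is a hypothetical addition, not part of the pattern above.
```python
def balance_at_version(event_store: EventStore, account_id: str, version: int) -> float:
    """Replay the event log up to `version` to reconstruct past state."""
    balance = 0.0
    for event in event_store.events:
        if event.version > version or event.data.get('account_id') != account_id:
            continue  # skip later events and other accounts
        if event.event_type == 'deposited':
            balance += event.data['amount']
        elif event.event_type == 'withdrawn':
            balance -= event.data['amount']
    return balance

# Usage:
store = EventStore()
account = BankAccount('acct-1', store)
account.deposit(100)
account.withdraw(30)
assert balance_at_version(store, 'acct-1', version=1) == 100  # before the withdrawal
assert balance_at_version(store, 'acct-1', version=2) == 70   # current state
```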
### Idempotency Keys
```python
# Idempotency: safe retries without duplicate effects
import uuid
import hashlib


class IdempotentProcessor:
    def __init__(self):
        self.processed_keys = {}  # key -> result
        # In production, keys live in a shared store with a TTL;
        # this in-memory dict never expires entries
        self.expiry_seconds = 3600

    def process(self, idempotency_key: str, operation):
        """
        Execute operation exactly once for a given key.
        Retries with the same key return the cached result.
        """
        # Check if already processed
        if idempotency_key in self.processed_keys:
            return self.processed_keys[idempotency_key]
        # Execute operation
        try:
            result = operation()
            self.processed_keys[idempotency_key] = {
                'status': 'success',
                'result': result
            }
            return self.processed_keys[idempotency_key]
        except Exception:
            # Don't cache failures (allow retry)
            raise

    @staticmethod
    def generate_key(*args):
        """Generate a deterministic idempotency key"""
        content = '|'.join(str(arg) for arg in args)
        return hashlib.sha256(content.encode()).hexdigest()

# Usage:
processor = IdempotentProcessor()

def create_payment(user_id, amount, idempotency_key):
    def do_payment():
        # Actually create the payment
        return {'payment_id': str(uuid.uuid4()), 'amount': amount}
    return processor.process(idempotency_key, do_payment)

# Client can safely retry:
user_id, amount, request_id = 'user-42', 100, 'req-1'  # example values
key = IdempotentProcessor.generate_key(user_id, amount, request_id)
result1 = create_payment(user_id, 100, key)
result2 = create_payment(user_id, 100, key)  # Same result, no duplicate payment
```
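A note on the design choice above: failures are deliberately not cached so retries can succeed, and the key-to-result map must survive process restarts in practice. A shared store with a TTL matching `expiry_seconds` (a database table or Redis are common choices), plus a lock or unique constraint on the key, guards against two concurrent first requests executing the operation twice.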
### Change Data Capture
```python
# CDC: capture database changes as a stream
import time
from typing import Callable, Dict, List, Optional
from enum import Enum
from dataclasses import dataclass


class OperationType(Enum):
    INSERT = 'insert'
    UPDATE = 'update'
    DELETE = 'delete'


@dataclass
class ChangeEvent:
    table: str
    operation: OperationType
    key: dict
    before: Optional[dict]  # For update/delete
    after: Optional[dict]   # For insert/update
    timestamp: float
    sequence: int


class CDCProducer:
    """Publish changes from the database write-ahead log"""
    def __init__(self, publisher):
        self.publisher = publisher
        self.sequence = 0

    def capture_insert(self, table: str, key: dict, data: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.INSERT,
            key=key,
            before=None,
            after=data,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)

    def capture_update(self, table: str, key: dict, before: dict, after: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.UPDATE,
            key=key,
            before=before,
            after=after,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)


class CDCConsumer:
    """Consume changes and maintain a derived view"""
    def __init__(self):
        self.handlers: Dict[str, List[Callable]] = {}
        self.last_sequence = 0

    def register(self, table: str, handler: Callable):
        if table not in self.handlers:
            self.handlers[table] = []
        self.handlers[table].append(handler)

    def process(self, event: ChangeEvent):
        # Ensure ordering
        if event.sequence <= self.last_sequence:
            return  # Already processed
        if event.table in self.handlers:
            for handler in self.handlers[event.table]:
                handler(event)
        self.last_sequence = event.sequence

# Usage: maintain a search index from database changes
# (search_index stands in for any indexing client)
def update_search_index(event: ChangeEvent):
    if event.operation == OperationType.DELETE:
        search_index.delete(event.key)
    else:
        search_index.index(event.key, event.after)

consumer = CDCConsumer()
consumer.register('products', update_search_index)
```
### Stream Processing
```python
# Stream processing patterns
import time
from contextlib import contextmanager
from typing import Callable, Iterator, TypeVar

T = TypeVar('T')


class StreamProcessor:
    """Stateful stream processing"""
    def __init__(self):
        self.state = {}

    def process(self, stream: Iterator[T], handler: Callable[[T, dict], None]):
        """Process a stream with access to state"""
        for record in stream:
            handler(record, self.state)
            yield self.state.copy()


def windowed_count(window_size_seconds: int, emit: Callable[[dict], None]):
    """Tumbling window aggregation; completed windows are passed to `emit`"""
    def handler(event, state):
        window_start = (event['timestamp'] // window_size_seconds) * window_size_seconds
        key = (event['key'], window_start)
        if key not in state:
            state[key] = {'count': 0, 'window_start': window_start}
        state[key]['count'] += 1
        # Emit completed windows
        current_window = (time.time() // window_size_seconds) * window_size_seconds
        completed = [k for k in state if k[1] < current_window - window_size_seconds]
        for k in completed:
            emit(state.pop(k))
    return handler


@contextmanager
def transaction():
    # Stand-in for an atomic scope spanning the processor's output and
    # the checkpoint (e.g. a database transaction that commits both)
    yield


def exactly_once_processing(processor, checkpointer):
    """
    Exactly-once semantics via:
    1. Idempotent writes
    2. Transactional checkpointing
    """
    def process_with_guarantees(records):
        for record in records:
            # Get the last checkpoint
            last_offset = checkpointer.get_offset()
            if record.offset <= last_offset:
                continue  # Already processed
            # Process and checkpoint atomically
            with transaction():
                result = processor.process(record)
                checkpointer.save_offset(record.offset)
            yield result
    return process_with_guarantees
```
## Mental Model
Kleppmann approaches data systems by asking:
1. **What are the consistency requirements?** Linearizable, serializable, eventual?
2. **What are the access patterns?** Read-heavy, write-heavy, mixed?
3. **How do we handle failures?** Retries, idempotency, compensation?
4. **How does data evolve?** Schema changes, backward compatibility? (see the sketch after this list)
5. **What happens at the edges?** Network partitions, slow nodes?
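Question 4 is easy to underestimate, so here is a minimal, hypothetical sketch of compatible decoding in the spirit of Avro-style schema evolution (field names and defaults are made up): readers supply defaults for fields their schema version added, and ignore fields it does not know about.
```python
# Hypothetical schema-evolution sketch: new fields get defaults so old
# records stay readable (backward compatibility), and unknown fields
# are ignored so old readers tolerate new writers (forward compatibility).

USER_SCHEMA_V2_DEFAULTS = {'email_verified': False}  # field added in v2

def decode_user(record: dict) -> dict:
    user = {**USER_SCHEMA_V2_DEFAULTS, **record}  # fill missing v2 fields
    known = {'user_id', 'name', 'email_verified'}
    return {k: v for k, v in user.items() if k in known}  # drop unknown fields

old_record = {'user_id': 1, 'name': 'Ada'}                       # written by v1 code
new_record = {'user_id': 2, 'name': 'Lin', 'email_verified': True,
              'theme': 'dark'}                                   # written by v3 code

assert decode_user(old_record)['email_verified'] is False
assert 'theme' not in decode_user(new_record)
```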
## Signature Kleppmann Moves
- Event sourcing for auditability
- Idempotency keys for safe retries
- CDC over dual writes
- Understanding isolation levels deeply
- Log-based messaging for durability
- Making trade-offs explicit
This skill teaches designing data-intensive distributed systems in the style of Martin Kleppmann, focusing on clear trade-offs, reliability, and deep understanding of data semantics. It guides architects and engineers through choices around consistency, failure modes, data modeling, and operational guarantees. Use it to evaluate architectures, pick patterns, and produce practical, robust designs for databases, streams, and pipelines.
The skill inspects system requirements and maps them to Kleppmann-style design patterns: consistency models, event sourcing, CDC, idempotency, and log-based messaging. It asks targeted questions about access patterns, failure scenarios, schema evolution, and operational constraints, then recommends concrete code patterns and operational controls. Outputs include architecture trade-offs, recommended primitives, and example code snippets to illustrate implementation choices.
## FAQ
**When should I use event sourcing vs a traditional CRUD model?**
Use event sourcing when auditability, time travel, and replayability are priorities. For simple workloads with limited event complexity, CRUD may be simpler; weigh operational cost and query patterns.
**Is exactly-once processing achievable?**
True exactly-once delivery is hard; aim for idempotent operations plus transactional checkpointing or atomic writes to emulate exactly-once guarantees in practice.