This skill guides scalable microservices design with patterns, service mesh, and distributed transactions to improve resilience and agility.
`npx playbooks add skill personamanagmentlayer/pcl --skill microservices-expert`
Copy the command above to add this skill to your agents.
---
name: microservices-expert
version: 1.0.0
description: Expert-level microservices architecture, patterns, service mesh, and distributed systems
category: api
tags: [microservices, distributed-systems, service-mesh, architecture]
allowed-tools:
- Read
- Write
- Edit
---
# Microservices Expert
Expert guidance for microservices architecture, design patterns, service communication, and distributed system challenges.
## Core Concepts
### Microservices Principles
- Single responsibility per service
- Independently deployable
- Decentralized data management
- Infrastructure automation
- Design for failure
- Evolutionary design
### Architecture Patterns
- API Gateway
- Service Discovery
- Circuit Breaker
- Saga Pattern
- Event Sourcing
- CQRS
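Event Sourcing and CQRS are listed above without an example. The following is a minimal in-memory sketch, assuming a hypothetical bank-account domain (`Event`, `EventStore`, `withdraw` and the other names are illustrative). The write side never stores current state. Instead, it rebuilds state by replaying an append-only event log before validating a command. The read side (the CQRS query model) is a separate projection built from the same events.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class Event:
    # Hypothetical event shape: aggregate id, event kind, amount
    account_id: str
    kind: str  # "Deposited" or "Withdrawn"
    amount: int


@dataclass
class EventStore:
    # Append-only log: events are never updated or deleted
    events: List[Event] = field(default_factory=list)

    def append(self, event: Event) -> None:
        self.events.append(event)

    def stream(self, account_id: str) -> List[Event]:
        return [e for e in self.events if e.account_id == account_id]


def _delta(e: Event) -> int:
    return e.amount if e.kind == "Deposited" else -e.amount


def current_balance(store: EventStore, account_id: str) -> int:
    """Command side: rebuild state by replaying the aggregate's events."""
    return sum(_delta(e) for e in store.stream(account_id))


def withdraw(store: EventStore, account_id: str, amount: int) -> None:
    """Validate the command against replayed state, then record a new event."""
    if current_balance(store, account_id) < amount:
        raise ValueError("insufficient funds")
    store.append(Event(account_id, "Withdrawn", amount))


def project_balances(store: EventStore) -> Dict[str, int]:
    """Query side (CQRS read model): a denormalized view built from events."""
    view: Dict[str, int] = {}
    for e in store.events:
        view[e.account_id] = view.get(e.account_id, 0) + _delta(e)
    return view
```

In a real system the projection is usually updated asynchronously from the event stream, so the read model is eventually consistent with the log.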
### Communication
- Synchronous (HTTP/REST, gRPC)
- Asynchronous (Message queues, Events)
- Service mesh
- API composition
- Backend for Frontend (BFF)
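In API composition, a composer (often the gateway or a BFF) calls several services concurrently and merges their answers into one response. The sketch below uses stubbed coroutines in place of real HTTP calls. `fetch_user`, `fetch_orders`, `fetch_recommendations` and `user_dashboard` are hypothetical names, and the recommendation stub fails on purpose to show graceful degradation.

```python
import asyncio
from typing import Any, Dict, List


async def fetch_user(user_id: str) -> Dict[str, Any]:
    await asyncio.sleep(0.01)  # stands in for a call to the user service
    return {"id": user_id, "name": "Ada"}


async def fetch_orders(user_id: str) -> List[Dict[str, Any]]:
    await asyncio.sleep(0.01)  # stands in for a call to the order service
    return [{"order_id": "o-1", "total": 42.0}]


async def fetch_recommendations(user_id: str) -> List[str]:
    await asyncio.sleep(0.01)  # simulated failing dependency
    raise TimeoutError("recommendation service timed out")


async def user_dashboard(user_id: str) -> Dict[str, Any]:
    """Fan out concurrently, then compose one response for the client."""
    user, orders, recs = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_recommendations(user_id),
        return_exceptions=True,  # a failed call comes back as an exception object
    )
    if isinstance(user, BaseException):
        raise user  # the user record is essential: fail the whole request
    return {
        "user": user,
        # Optional parts degrade to empty results instead of failing the page
        "orders": [] if isinstance(orders, BaseException) else orders,
        "recommendations": [] if isinstance(recs, BaseException) else recs,
        "degraded": any(isinstance(r, BaseException) for r in (orders, recs)),
    }
```

Running `asyncio.run(user_dashboard("user_1"))` returns the user and orders with an empty recommendations list and `degraded` set to `True`. In practice each call would also be wrapped in `asyncio.wait_for` so one slow service cannot stall the composed response.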
## Service Design
```python
import logging
from typing import List

import httpx
from circuitbreaker import circuit
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

logger = logging.getLogger("order-service")

# Individual Microservice
app = FastAPI(title="Order Service", version="1.0.0")


class Order(BaseModel):
    id: str
    user_id: str
    items: List[dict]
    total: float
    status: str = "pending"


class OrderService:
    def __init__(self, inventory_url: str, payment_url: str):
        self.inventory_url = inventory_url
        self.payment_url = payment_url
        # One shared client per process so connections are reused across requests
        self.client = httpx.AsyncClient()

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def check_inventory(self, items: List[dict]) -> bool:
        """Check inventory availability with circuit breaker"""
        try:
            response = await self.client.post(
                f"{self.inventory_url}/check",
                json={"items": items},
                timeout=5.0,
            )
            # Treat 4xx/5xx as failures so they count toward opening the circuit
            response.raise_for_status()
            return response.json()["available"]
        except Exception as e:
            logger.warning("Inventory service error: %s", e)
            raise

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def process_payment(self, user_id: str, amount: float) -> dict:
        """Process payment with circuit breaker"""
        try:
            response = await self.client.post(
                f"{self.payment_url}/charge",
                json={"user_id": user_id, "amount": amount},
                timeout=10.0,
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.warning("Payment service error: %s", e)
            raise

    async def reserve_inventory(self, items: List[dict]) -> None:
        # Assumed endpoint: the inventory service's API is not specified here
        response = await self.client.post(
            f"{self.inventory_url}/reserve", json={"items": items}, timeout=5.0
        )
        response.raise_for_status()

    async def save_order(self, order: Order) -> None:
        # Placeholder: persist to this service's own database (database per service)
        pass

    async def create_order(self, order: Order) -> Order:
        """Create order with coordination"""
        # 1. Check inventory
        if not await self.check_inventory(order.items):
            raise HTTPException(400, "Items not available")
        # 2. Reserve inventory before charging, so a customer is never
        #    charged for stock that could not be held
        await self.reserve_inventory(order.items)
        # 3. Process payment (a failure here should release the reservation;
        #    see the Saga pattern below for compensating actions)
        payment = await self.process_payment(order.user_id, order.total)
        if payment["status"] != "success":
            raise HTTPException(400, "Payment failed")
        # 4. Create order record
        order.status = "confirmed"
        await self.save_order(order)
        return order


# Created once at startup instead of once per request
order_service = OrderService(
    inventory_url="http://inventory-service",
    payment_url="http://payment-service",
)


@app.post("/orders", response_model=Order)
async def create_order(order: Order):
    return await order_service.create_order(order)
```
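The `circuitbreaker` decorator above hides the state machine it runs. To make that behavior concrete, here is a minimal hand-rolled breaker. It is a synchronous sketch, not that library's implementation, and the injectable `clock` parameter is an assumption added for testability. In the CLOSED state it passes calls through and counts consecutive failures. After `failure_threshold` failures it goes OPEN and fails fast. Once `recovery_timeout` seconds have passed it goes HALF_OPEN and lets one trial call decide whether to close again.

```python
import time
from typing import Any, Callable


class CircuitOpenError(RuntimeError):
    """Raised instead of calling the dependency while the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.state = "CLOSED"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "OPEN":
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("failing fast: dependency marked unhealthy")
            self.state = "HALF_OPEN"  # timeout elapsed: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        # Any success closes the circuit and resets the failure count
        self.state = "CLOSED"
        self.failures = 0
        return result

    def _record_failure(self) -> None:
        self.failures += 1
        # A failed trial call in HALF_OPEN re-opens the circuit immediately
        if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
```

Failing fast while OPEN is the point of the pattern: callers get an immediate error instead of tying up connections and threads waiting on a service that is already known to be unhealthy.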
## Saga Pattern (Distributed Transactions)
```python
import logging
from typing import Awaitable, Callable, List

logger = logging.getLogger("saga")


class SagaStep:
    def __init__(self, action: Callable[[], Awaitable[bool]],
                 compensation: Callable[[], Awaitable[None]]):
        self.action = action
        self.compensation = compensation


class SagaOrchestrator:
    """Orchestrate distributed transactions using Saga pattern"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, action: Callable[[], Awaitable[bool]],
                 compensation: Callable[[], Awaitable[None]]):
        """Add a step to the saga"""
        self.steps.append(SagaStep(action, compensation))

    async def execute(self) -> bool:
        """Execute saga with compensation on failure"""
        self.completed_steps = []
        try:
            for step in self.steps:
                result = await step.action()
                if not result:
                    # The failed step did not complete, so it is not compensated
                    await self.compensate()
                    return False
                self.completed_steps.append(step)
            return True
        except Exception as e:
            logger.error("Saga failed: %s", e)
            await self.compensate()
            return False

    async def compensate(self):
        """Roll back completed steps"""
        # Execute compensations in reverse order
        for step in reversed(self.completed_steps):
            try:
                await step.compensation()
            except Exception as e:
                # In production, persist and retry failed compensations
                # instead of only logging them
                logger.error("Compensation failed: %s", e)


# Example: Order Saga
class OrderSaga:
    # reserve_inventory, release_inventory, charge_payment, refund_payment,
    # create_order_record, delete_order_record and send_confirmation are
    # async methods (not shown) that call the owning services; each action
    # returns a truthy value on success.
    async def create_order_with_saga(self, order_data: dict):
        saga = SagaOrchestrator()
        # Step 1: Reserve inventory
        saga.add_step(
            action=lambda: self.reserve_inventory(order_data["items"]),
            compensation=lambda: self.release_inventory(order_data["items"]),
        )
        # Step 2: Process payment
        saga.add_step(
            action=lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
            compensation=lambda: self.refund_payment(order_data["user_id"], order_data["total"]),
        )
        # Step 3: Create order
        saga.add_step(
            action=lambda: self.create_order_record(order_data),
            compensation=lambda: self.delete_order_record(order_data["id"]),
        )
        # Execute saga
        success = await saga.execute()
        if success:
            await self.send_confirmation(order_data["user_id"])
            return {"status": "success", "order_id": order_data["id"]}
        return {"status": "failed", "message": "Order creation failed"}
```
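To see the compensation order concretely, here is a self-contained, function-style variant of the orchestration idea above run against in-memory fakes. All names are hypothetical. The payment step fails, so only the already-completed inventory reservation is undone. The refund never runs because the charge never succeeded, and the order record is never created.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[], Awaitable[bool]], Callable[[], Awaitable[None]]]


async def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            ok = await action()
        except Exception:
            ok = False
        if not ok:
            log.append(f"{name} failed")
            for done_name, _, compensate in reversed(completed):
                await compensate()
                log.append(f"compensated {done_name}")
            return False
        completed.append(step)  # only successful steps are ever compensated
        log.append(f"{name} ok")
    return True


async def demo() -> Tuple[bool, List[str], Dict[str, int]]:
    log: List[str] = []
    stock = {"sku-1": 5}  # in-memory stand-in for the inventory service

    async def reserve() -> bool:
        stock["sku-1"] -= 1
        return True

    async def release() -> None:
        stock["sku-1"] += 1

    async def charge() -> bool:
        raise ConnectionError("payment service unavailable")

    async def refund() -> None:
        log.append("refund ran")  # must not run: the charge never succeeded

    async def create_order() -> bool:
        log.append("order created")
        return True

    async def delete_order() -> None:
        log.append("order deleted")

    ok = await run_saga([
        ("reserve_inventory", reserve, release),
        ("charge_payment", charge, refund),
        ("create_order", create_order, delete_order),
    ], log)
    return ok, log, stock
```

`asyncio.run(demo())` returns `False`, a log of `reserve_inventory ok`, `charge_payment failed`, `compensated reserve_inventory`, and the stock back at 5.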
## Service Discovery
```python
import random
from typing import List, Optional

import consul  # python-consul


class ServiceRegistry:
    """Service discovery using Consul"""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, service_name: str, service_id: str,
                         host: str, port: int, tags: Optional[List[str]] = None):
        """Register service with Consul, including an HTTP health check"""
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=host,
            port=port,
            tags=tags or [],
            check=consul.Check.http(
                f"http://{host}:{port}/health",
                interval="10s",
                timeout="5s",
            ),
        )

    def deregister_service(self, service_id: str):
        """Deregister service"""
        self.consul.agent.service.deregister(service_id)

    @staticmethod
    def _to_instance(entry: dict) -> dict:
        svc = entry["Service"]
        return {
            "id": svc["ID"],
            # An empty service address means "use the node's address"
            "address": svc["Address"] or entry["Node"]["Address"],
            "port": svc["Port"],
            "tags": svc["Tags"],
        }

    def discover_service(self, service_name: str) -> Optional[dict]:
        """Discover one healthy service instance"""
        instances = self.get_all_services(service_name)
        if not instances:
            return None
        # Load balance: random selection
        return random.choice(instances)

    def get_all_services(self, service_name: str) -> List[dict]:
        """Get all healthy instances of a service (passing checks only)"""
        _, services = self.consul.health.service(service_name, passing=True)
        return [self._to_instance(s) for s in services]
```
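`discover_service` picks a healthy instance at random. Round-robin is a common alternative that spreads requests evenly across instances. The following is a minimal sketch, assuming instance dicts with the same `address`/`port` keys that `get_all_services` returns. `RoundRobinBalancer` is a hypothetical helper that keeps a per-service counter in process memory.

```python
import itertools
from typing import Dict, List, Optional


class RoundRobinBalancer:
    """Client-side load balancing over the currently healthy instances."""

    def __init__(self) -> None:
        self._counters: Dict[str, itertools.count] = {}

    def pick(self, service_name: str, instances: List[dict]) -> Optional[str]:
        if not instances:
            return None
        counter = self._counters.setdefault(service_name, itertools.count())
        # Sort so the rotation is stable even if discovery returns a new order
        ordered = sorted(instances, key=lambda s: (s["address"], s["port"]))
        chosen = ordered[next(counter) % len(ordered)]
        return f"http://{chosen['address']}:{chosen['port']}"
```

In use, the instance list would come from `registry.get_all_services("inventory-service")` on each call (or from a short-lived cache of it). Instances that fail their Consul health check then drop out of the rotation automatically.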
## API Gateway
```python
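import os
from typing import Optional

import httpx
import jwt  # PyJWT
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse

# ServiceRegistry is the Consul-backed registry from the Service Discovery section
app = FastAPI(title="API Gateway")

# Hop-by-hop and length/encoding headers must not be copied between connections
EXCLUDED_HEADERS = {
    "host", "connection", "keep-alive", "transfer-encoding",
    "content-length", "content-encoding", "te", "trailer", "upgrade",
    "proxy-authorization", "proxy-authenticate",
}
JWT_SECRET = os.environ.get("JWT_SECRET", "change-me")  # never hard-code secrets


def _filter_headers(headers) -> dict:
    return {k: v for k, v in headers.items() if k.lower() not in EXCLUDED_HEADERS}


class APIGateway:
    """API Gateway for routing and cross-cutting concerns"""

    def __init__(self):
        self.service_registry = ServiceRegistry()
        self.client = httpx.AsyncClient()

    async def route_request(self, service: str, path: str,
                            method: str, **kwargs) -> Response:
        """Route request to appropriate microservice"""
        # Discover service (python-consul is blocking; consider a thread pool)
        service_info = self.service_registry.discover_service(service)
        if not service_info:
            return JSONResponse({"error": "Service unavailable"}, status_code=503)
        # Build URL
        url = f"http://{service_info['address']}:{service_info['port']}{path}"
        # Forward request
        response = await self.client.request(method, url, **kwargs)
        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=_filter_headers(response.headers),
        )

    async def authenticate(self, token: str) -> Optional[dict]:
        """Centralized authentication"""
        try:
            return jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return None

    async def rate_limit(self, client_id: str) -> bool:
        """Centralized rate limiting: return True if the request may proceed"""
        # Placeholder: implement with Redis; allows everything until then
        return True


# One gateway (and HTTP connection pool) per process, not per request
gateway = APIGateway()


# Gateway endpoints
@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_route(service: str, path: str, request: Request):
    # Authentication
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    user = await gateway.authenticate(token)
    if not user:
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    # Rate limiting (assumes the token's standard "sub" claim identifies the caller)
    if not await gateway.rate_limit(user.get("sub", "")):
        return JSONResponse({"error": "Rate limit exceeded"}, status_code=429)
    # Route request, forwarding query parameters and the body
    return await gateway.route_request(
        service=service,
        path=f"/{path}",
        method=request.method,
        params=request.query_params.multi_items(),
        headers=_filter_headers(request.headers),
        content=await request.body(),
    )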
```
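The gateway's `rate_limit` is left as a Redis stub. A common algorithm behind such limiters is the token bucket. Each client gets up to `capacity` tokens that refill at `rate` tokens per second, and each request spends one token. The following is an in-memory, single-process sketch. `TokenBucketLimiter` and its parameters are hypothetical, and a gateway running as several instances would keep the bucket state in a shared store such as Redis.

```python
import time
from typing import Callable, Dict, Tuple


class TokenBucketLimiter:
    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.clock = clock
        # client_id -> (tokens remaining, time of last update)
        self._buckets: Dict[str, Tuple[float, float]] = {}

    def allow(self, client_id: str) -> bool:
        now = self.clock()
        tokens, last = self._buckets.get(client_id, (float(self.capacity), now))
        # Refill for the elapsed time, never above capacity
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        if tokens >= 1:
            self._buckets[client_id] = (tokens - 1, now)
            return True
        self._buckets[client_id] = (tokens, now)
        return False
```

Wiring it in would mean creating one limiter in `APIGateway.__init__` and having `rate_limit` return `self.limiter.allow(client_id)`. The `capacity` lets short bursts through, while `rate` bounds the sustained request rate per client.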
## Event-Driven Architecture
```python
import json
from datetime import datetime, timezone
from typing import Callable, Dict

import pika


class EventBus:
    """Message broker for event-driven communication (RabbitMQ topic exchange)"""

    def __init__(self, rabbitmq_url: str):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.handlers: Dict[str, Callable] = {}
        # Declare once up front so publishers and subscribers can both use it
        self.channel.exchange_declare(
            exchange="events",
            exchange_type="topic",
            durable=True,
        )

    def publish_event(self, event_type: str, data: dict):
        """Publish event to every queue bound to this event type"""
        message = json.dumps({
            "event_type": event_type,
            "data": data,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
        self.channel.basic_publish(
            exchange="events",
            routing_key=event_type,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2  # persistent
            ),
        )

    def subscribe(self, event_type: str, handler: Callable, service_name: str):
        """Subscribe a service to an event type"""
        self.handlers[event_type] = handler
        # One durable queue per (service, event type): each subscribing service
        # gets its own copy, while instances of the same service share the work
        queue_name = f"{service_name}.{event_type}"
        self.channel.queue_declare(queue=queue_name, durable=True)
        # Bind queue to exchange
        self.channel.queue_bind(
            queue=queue_name,
            exchange="events",
            routing_key=event_type,
        )

        def callback(ch, method, properties, body):
            message = json.loads(body)
            handler(message["data"])
            # Ack only after the handler succeeds (at-least-once delivery)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
        )

    def start_consuming(self):
        """Start consuming events (blocks the calling thread)"""
        self.channel.start_consuming()


# Usage Example (in practice, publisher and subscriber are separate services)
event_bus = EventBus("amqp://localhost")


# Service B subscribes first: a topic exchange drops messages
# that match no bound queue
def handle_order_created(data):
    print(f"Processing order: {data['order_id']}")
    # Send email, update inventory, etc.


# "notification-service" is an illustrative subscriber name
event_bus.subscribe("order.created", handle_order_created,
                    service_name="notification-service")

# Service A publishes event
event_bus.publish_event("order.created", {
    "order_id": "12345",
    "user_id": "user_1",
    "total": 99.99,
})

event_bus.start_consuming()
```
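The `events` exchange is a topic exchange, so a queue binding can use wildcards. In RabbitMQ's topic routing, `*` matches exactly one dot-separated word and `#` matches zero or more words. RabbitMQ does this matching itself. The `topic_matches` function below is only an illustrative re-implementation of those rules, showing which routing keys a binding would receive.

```python
from typing import List


def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key would reach a queue bound with binding_key."""

    def match(pattern: List[str], words: List[str]) -> bool:
        if not pattern:
            return not words
        head, rest = pattern[0], pattern[1:]
        if head == "#":
            # '#' either matches zero words, or consumes one word and stays active
            return match(rest, words) or (bool(words) and match(pattern, words[1:]))
        if not words:
            return False
        # '*' matches exactly one word; anything else must match literally
        return (head == "*" or head == words[0]) and match(rest, words[1:])

    return match(binding_key.split("."), routing_key.split("."))
```

For example, a service bound with `order.*` would receive `order.created` and `order.cancelled` but not `order.item.added`. A binding of `order.#` would receive all three.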
## Best Practices
### Design
- Keep services small and focused
- Design for failure (circuit breakers, retries)
- Use asynchronous communication when possible
- Implement proper service boundaries
- Avoid distributed monoliths
- Use API versioning
- Implement health checks
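For the "retries" half of designing for failure, a common approach is capped exponential backoff with full jitter. The random delay keeps many clients from retrying a recovering service in lockstep. The following is a minimal async sketch in which `retry_async` and its parameters are hypothetical. It should only wrap idempotent operations, or calls that carry an idempotency key, because retrying a non-idempotent request such as a payment charge may execute it twice.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    op: Callable[[], Awaitable[T]],
    attempts: int = 4,
    base_delay: float = 0.1,
    max_delay: float = 2.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Retry op on transient errors with capped exponential backoff and full jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts - 1):
        try:
            return await op()
        except retry_on:
            # Sleep a random time in [0, min(max_delay, base_delay * 2^attempt)]
            cap = min(max_delay, base_delay * (2 ** attempt))
            await asyncio.sleep(random.uniform(0, cap))
    return await op()  # final attempt: any error propagates to the caller
```

Errors not listed in `retry_on` (for example a 400-style validation error) propagate immediately, since retrying them cannot succeed.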
### Data Management
- Database per service
- Accept eventual consistency across service boundaries
- Implement saga pattern for distributed transactions
- Use event sourcing for audit trails
- Cache aggressively
- Avoid distributed joins
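For caching data owned by another service, a common shape is cache-aside with a time-to-live. Reads go to the cache first and fall back to the owning service on a miss, and the TTL bounds how stale a value can get. The following is a single-process sketch in which `TTLCache` is a hypothetical name. Across many instances a shared cache such as Redis would typically be used, and the TTL is the knob that trades freshness against load on the owning service.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._data: Dict[str, Tuple[float, Any]] = {}  # key -> (expires_at, value)

    def get_or_load(self, key: str, loader: Callable[[], Any]) -> Any:
        """Cache-aside: return a fresh cached value, else call loader and cache it."""
        now = self.clock()
        hit = self._data.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]
        value = loader()  # e.g. an HTTP call to the service that owns the data
        self._data[key] = (now + self.ttl, value)
        return value

    def invalidate(self, key: str) -> None:
        """Drop an entry early, e.g. on a hypothetical 'product.updated' event."""
        self._data.pop(key, None)
```

Pairing the TTL with event-driven invalidation keeps reads cheap while still reacting quickly when the owning service announces a change.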
### Operations
- Implement distributed tracing
- Centralized logging
- Monitor service health
- Automate deployments
- Use service mesh for cross-cutting concerns
- Implement feature flags
- Practice chaos engineering
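Distributed tracing only works if every service forwards a trace context on its outbound calls. The W3C Trace Context `traceparent` header for version `00` has the form `00-<trace-id>-<parent-id>-<flags>`, with a 32-hex-digit trace id, a 16-hex-digit parent id and a 2-hex-digit flags field. The following is a minimal sketch of continuing an incoming trace or starting a new one. In practice an OpenTelemetry SDK handles this, and `parse_traceparent` and `outbound_traceparent` are hypothetical helpers that accept only version `00`.

```python
import re
import secrets
from typing import Mapping, Optional, Tuple

_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(value: Optional[str]) -> Optional[Tuple[str, str, str]]:
    """Return (trace_id, parent_span_id, flags), or None if absent or invalid."""
    if not value:
        return None
    m = _TRACEPARENT.match(value)
    if not m:
        return None
    trace_id, span_id, flags = m.groups()
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None  # all-zero ids are invalid
    return trace_id, span_id, flags


def outbound_traceparent(incoming_headers: Mapping[str, str]) -> str:
    """Header for a downstream call: keep the trace id, use a new span id."""
    parsed = parse_traceparent(incoming_headers.get("traceparent"))
    if parsed:
        trace_id, _, flags = parsed
    else:
        trace_id, flags = secrets.token_hex(16), "01"  # new trace, "sampled" flag
    span_id = secrets.token_hex(8)  # this service's span, parent of the callee's
    return f"00-{trace_id}-{span_id}-{flags}"
```

Because every hop keeps the same trace id, a tracing backend can stitch spans from the gateway, order, inventory and payment services into one end-to-end request timeline.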
## Anti-Patterns
❌ Distributed monolith
❌ Shared database between services
❌ Synchronous communication everywhere
❌ No service versioning
❌ Tight coupling between services
❌ No circuit breakers
❌ Missing distributed tracing
## Resources
- Microservices Patterns: https://microservices.io/patterns/
- Building Microservices by Sam Newman (O'Reilly)
- Service Mesh: https://istio.io/
- Consul: https://www.consul.io/
- RabbitMQ: https://www.rabbitmq.com/
This skill provides expert-level guidance for designing, building, and operating microservices and distributed systems. It focuses on architecture patterns, service communication, transactional strategies, and operational concerns to help teams deliver resilient, scalable systems. Practical patterns and anti-patterns are emphasized for immediate application.
The skill inspects architecture choices and recommends patterns like API Gateway, service discovery, circuit breakers, sagas, CQRS, and event sourcing. It evaluates communication strategies (synchronous vs asynchronous), data ownership, and operational controls such as tracing, health checks, and service mesh. Recommendations include code-level coordination patterns and pragmatic trade-offs for reliability and consistency.
**When should I choose event-driven architecture over synchronous APIs?**
Use event-driven approaches when you need loose coupling and higher resilience and can tolerate eventual consistency. Prefer synchronous APIs when the caller needs an immediate, consistent answer or low-latency request/response semantics.
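**How do I handle distributed transactions across services?**
Avoid two-phase commit (2PC) across services. It couples every participant to a coordinator and can leave them blocked while that coordinator is unavailable. Instead, implement the Saga pattern with compensating actions, or use event sourcing and CQRS to reach eventual consistency while keeping services autonomous.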