home / skills / personamanagmentlayer / pcl / microservices-expert

microservices-expert skill

/stdlib/api/microservices-expert

This skill guides scalable microservices design with patterns, service mesh, and distributed transactions to improve resilience and agility.

npx playbooks add skill personamanagmentlayer/pcl --skill microservices-expert

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
13.7 KB
---
name: microservices-expert
version: 1.0.0
description: Expert-level microservices architecture, patterns, service mesh, and distributed systems
category: api
tags: [microservices, distributed-systems, service-mesh, architecture]
allowed-tools:
  - Read
  - Write
  - Edit
---

# Microservices Expert

Expert guidance for microservices architecture, design patterns, service communication, and distributed system challenges.

## Core Concepts

### Microservices Principles
- Single responsibility per service
- Independently deployable
- Decentralized data management
- Infrastructure automation
- Design for failure
- Evolutionary design

### Architecture Patterns
- API Gateway
- Service Discovery
- Circuit Breaker
- Saga Pattern
- Event Sourcing
- CQRS

### Communication
- Synchronous (HTTP/REST, gRPC)
- Asynchronous (Message queues, Events)
- Service mesh
- API composition
- Backend for Frontend (BFF)

## Service Design

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asyncio

# Individual Microservice
app = FastAPI(title="Order Service", version="1.0.0")

class Order(BaseModel):
    id: str
    user_id: str
    items: List[dict]
    total: float
    status: str

class OrderService:
    def __init__(self, inventory_url: str, payment_url: str):
        self.inventory_url = inventory_url
        self.payment_url = payment_url
        self.client = httpx.AsyncClient()

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def check_inventory(self, items: List[dict]) -> bool:
        """Check inventory availability with circuit breaker"""
        try:
            response = await self.client.post(
                f"{self.inventory_url}/check",
                json={"items": items},
                timeout=5.0
            )
            return response.json()["available"]
        except Exception as e:
            print(f"Inventory service error: {e}")
            raise

    @circuit(failure_threshold=5, recovery_timeout=60)
    async def process_payment(self, user_id: str, amount: float) -> dict:
        """Process payment with circuit breaker"""
        try:
            response = await self.client.post(
                f"{self.payment_url}/charge",
                json={"user_id": user_id, "amount": amount},
                timeout=10.0
            )
            return response.json()
        except Exception as e:
            print(f"Payment service error: {e}")
            raise

    async def create_order(self, order: Order) -> Order:
        """Create order with coordination"""
        # 1. Check inventory
        inventory_available = await self.check_inventory(order.items)
        if not inventory_available:
            raise HTTPException(400, "Items not available")

        # 2. Process payment
        payment = await self.process_payment(order.user_id, order.total)
        if payment["status"] != "success":
            raise HTTPException(400, "Payment failed")

        # 3. Reserve inventory
        await self.reserve_inventory(order.items)

        # 4. Create order record
        order.status = "confirmed"
        await self.save_order(order)

        return order

@app.post("/orders", response_model=Order)
async def create_order(order: Order):
    service = OrderService(
        inventory_url="http://inventory-service",
        payment_url="http://payment-service"
    )
    return await service.create_order(order)
```

## Saga Pattern (Distributed Transactions)

```python
from enum import Enum
from typing import List, Callable
import asyncio

class SagaStep:
    def __init__(self, action: Callable, compensation: Callable):
        self.action = action
        self.compensation = compensation

class SagaOrchestrator:
    """Orchestrate distributed transactions using Saga pattern"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, action: Callable, compensation: Callable):
        """Add a step to the saga"""
        self.steps.append(SagaStep(action, compensation))

    async def execute(self) -> bool:
        """Execute saga with compensation on failure"""
        try:
            # Execute all steps
            for step in self.steps:
                result = await step.action()
                self.completed_steps.append(step)

                if not result:
                    await self.compensate()
                    return False

            return True

        except Exception as e:
            print(f"Saga failed: {e}")
            await self.compensate()
            return False

    async def compensate(self):
        """Rollback completed steps"""
        # Execute compensations in reverse order
        for step in reversed(self.completed_steps):
            try:
                await step.compensation()
            except Exception as e:
                print(f"Compensation failed: {e}")

# Example: Order Saga
class OrderSaga:
    async def create_order_with_saga(self, order_data: dict):
        saga = SagaOrchestrator()

        # Step 1: Reserve inventory
        saga.add_step(
            action=lambda: self.reserve_inventory(order_data["items"]),
            compensation=lambda: self.release_inventory(order_data["items"])
        )

        # Step 2: Process payment
        saga.add_step(
            action=lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
            compensation=lambda: self.refund_payment(order_data["user_id"], order_data["total"])
        )

        # Step 3: Create order
        saga.add_step(
            action=lambda: self.create_order_record(order_data),
            compensation=lambda: self.delete_order_record(order_data["id"])
        )

        # Execute saga
        success = await saga.execute()

        if success:
            await self.send_confirmation(order_data["user_id"])
            return {"status": "success", "order_id": order_data["id"]}
        else:
            return {"status": "failed", "message": "Order creation failed"}
```

## Service Discovery

```python
import consul
from typing import List, Optional
import random

class ServiceRegistry:
    """Service discovery using Consul"""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, service_name: str, service_id: str,
                        host: str, port: int, tags: List[str] = None):
        """Register service with Consul"""
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=host,
            port=port,
            tags=tags or [],
            check=consul.Check.http(
                f"http://{host}:{port}/health",
                interval="10s",
                timeout="5s"
            )
        )

    def deregister_service(self, service_id: str):
        """Deregister service"""
        self.consul.agent.service.deregister(service_id)

    def discover_service(self, service_name: str) -> Optional[dict]:
        """Discover healthy service instance"""
        _, services = self.consul.health.service(service_name, passing=True)

        if not services:
            return None

        # Load balance: random selection
        service = random.choice(services)

        return {
            "id": service["Service"]["ID"],
            "address": service["Service"]["Address"],
            "port": service["Service"]["Port"],
            "tags": service["Service"]["Tags"]
        }

    def get_all_services(self, service_name: str) -> List[dict]:
        """Get all healthy instances of a service"""
        _, services = self.consul.health.service(service_name, passing=True)

        return [
            {
                "id": s["Service"]["ID"],
                "address": s["Service"]["Address"],
                "port": s["Service"]["Port"]
            }
            for s in services
        ]
```

## API Gateway

```python
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt

app = FastAPI(title="API Gateway")

class APIGateway:
    """API Gateway for routing and cross-cutting concerns"""

    def __init__(self):
        self.service_registry = ServiceRegistry()
        self.client = httpx.AsyncClient()

    async def route_request(self, service: str, path: str,
                           method: str, **kwargs) -> Response:
        """Route request to appropriate microservice"""
        # Discover service
        service_info = self.service_registry.discover_service(service)

        if not service_info:
            return Response(
                content={"error": "Service unavailable"},
                status_code=503
            )

        # Build URL
        url = f"http://{service_info['address']}:{service_info['port']}{path}"

        # Forward request
        response = await self.client.request(method, url, **kwargs)

        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=dict(response.headers)
        )

    async def authenticate(self, token: str) -> Optional[dict]:
        """Centralized authentication"""
        try:
            payload = jwt.decode(token, "secret", algorithms=["HS256"])
            return payload
        except jwt.JWTError:
            return None

    async def rate_limit(self, client_id: str) -> bool:
        """Centralized rate limiting"""
        # Implementation using Redis
        pass

# Gateway endpoints
@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_route(service: str, path: str, request: Request):
    gateway = APIGateway()

    # Authentication
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    user = await gateway.authenticate(token)

    if not user:
        return Response(content={"error": "Unauthorized"}, status_code=401)

    # Rate limiting
    if not await gateway.rate_limit(user["id"]):
        return Response(content={"error": "Rate limit exceeded"}, status_code=429)

    # Route request
    return await gateway.route_request(
        service=service,
        path=f"/{path}",
        method=request.method,
        headers=dict(request.headers),
        content=await request.body()
    )
```

## Event-Driven Architecture

```python
import pika
import json
from typing import Callable, Dict
import asyncio

class EventBus:
    """Message broker for event-driven communication"""

    def __init__(self, rabbitmq_url: str):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.handlers: Dict[str, Callable] = {}

    def publish_event(self, event_type: str, data: dict):
        """Publish event to all subscribers"""
        self.channel.exchange_declare(
            exchange='events',
            exchange_type='topic',
            durable=True
        )

        message = json.dumps({
            "event_type": event_type,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

        self.channel.basic_publish(
            exchange='events',
            routing_key=event_type,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2  # persistent
            )
        )

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to event type"""
        self.handlers[event_type] = handler

        # Declare queue
        queue_name = f"{event_type}_queue"
        self.channel.queue_declare(queue=queue_name, durable=True)

        # Bind queue to exchange
        self.channel.queue_bind(
            queue=queue_name,
            exchange='events',
            routing_key=event_type
        )

        # Start consuming
        def callback(ch, method, properties, body):
            message = json.loads(body)
            handler(message["data"])
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback
        )

    def start_consuming(self):
        """Start consuming events"""
        self.channel.start_consuming()

# Usage Example
event_bus = EventBus("amqp://localhost")

# Service A publishes event
event_bus.publish_event("order.created", {
    "order_id": "12345",
    "user_id": "user_1",
    "total": 99.99
})

# Service B subscribes to event
def handle_order_created(data):
    print(f"Processing order: {data['order_id']}")
    # Send email, update inventory, etc.

event_bus.subscribe("order.created", handle_order_created)
```

## Best Practices

### Design
- Keep services small and focused
- Design for failure (circuit breakers, retries)
- Use asynchronous communication when possible
- Implement proper service boundaries
- Avoid distributed monoliths
- Use API versioning
- Implement health checks

### Data Management
- Database per service
- Use eventual consistency
- Implement saga pattern for distributed transactions
- Use event sourcing for audit trails
- Cache aggressively
- Avoid distributed joins

### Operations
- Implement distributed tracing
- Centralized logging
- Monitor service health
- Automate deployments
- Use service mesh for cross-cutting concerns
- Implement feature flags
- Practice chaos engineering

## Anti-Patterns

❌ Distributed monolith
❌ Shared database between services
❌ Synchronous communication everywhere
❌ No service versioning
❌ Tight coupling between services
❌ No circuit breakers
❌ Missing distributed tracing

## Resources

- Microservices Patterns: https://microservices.io/patterns/
- Building Microservices (book)
- Service Mesh: https://istio.io/
- Consul: https://www.consul.io/
- RabbitMQ: https://www.rabbitmq.com/

Overview

This skill provides expert-level guidance for designing, building, and operating microservices and distributed systems. It focuses on architecture patterns, service communication, transactional strategies, and operational concerns to help teams deliver resilient, scalable systems. Practical patterns and anti-patterns are emphasized for immediate application.

How this skill works

The skill inspects architecture choices and recommends patterns like API Gateway, service discovery, circuit breakers, sagas, CQRS, and event sourcing. It evaluates communication strategies (synchronous vs asynchronous), data ownership, and operational controls such as tracing, health checks, and service mesh. Recommendations include code-level coordination patterns and pragmatic trade-offs for reliability and consistency.

When to use it

  • Designing new systems that must scale independently and deploy services separately
  • Refactoring a monolith into loosely coupled services with clear boundaries
  • Implementing distributed transactions or eventual consistency across services
  • Introducing observability, resilience, or a service mesh into production
  • Choosing between synchronous APIs and event-driven async flows

Best practices

  • Give each service a single responsibility and its own data store
  • Design for failure: use circuit breakers, timeouts, retries, and bulkheads
  • Prefer asynchronous events for decoupling and scalability where possible
  • Apply Saga pattern or compensating actions for distributed transactions
  • Centralize cross-cutting concerns via API Gateway or service mesh
  • Automate deployments, health checks, tracing, and centralized logging

Example use cases

  • Order processing: coordinate inventory, payment, and order record using a saga
  • API Gateway: centralize auth, rate limiting, and routing to backend services
  • Service discovery: register and discover instances with Consul for dynamic routing
  • Event-driven workflows: publish domain events for downstream processing and integration
  • Resilience hardening: add circuit breakers and retries around external service calls

FAQ

When should I choose event-driven architecture over synchronous APIs?

Use event-driven approaches when you need loose coupling, higher resilience, and eventual consistency. Prefer synchronous APIs when you need immediate consistency or low-latency request/response semantics.

How do I handle distributed transactions across services?

Avoid two-phase commits. Implement the Saga pattern with compensating actions or use event sourcing and CQRS to achieve eventual consistency while keeping services autonomous.