
This skill helps you implement production-ready observability for FastAPI with structured logging, metrics, and tracing.

npx playbooks add skill markus41/claude --skill fastapi-observability

Review the files below or copy the command above to add this skill to your agents.

---
name: FastAPI Observability
description: This skill should be used when the user asks to "add logging", "implement metrics", "add tracing", "configure Prometheus", "setup OpenTelemetry", "add health checks", "monitor API", or mentions observability, APM, monitoring, structured logging, distributed tracing, or Grafana. Provides comprehensive observability patterns.
version: 0.1.0
---

# FastAPI Observability

This skill provides production-ready observability patterns including structured logging, Prometheus metrics, and OpenTelemetry tracing.

## Structured Logging

### Configuration with structlog

```python
# app/core/logging.py
import structlog
import logging

def setup_logging(log_level: str = "INFO", json_logs: bool = True):
    """Configure structured logging."""

    # Shared processors
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # JSON format for production
        processors = shared_processors + [
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer()
        ]
    else:
        # Console format for development
        processors = shared_processors + [
            structlog.dev.ConsoleRenderer()
        ]

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

def get_logger(name: str | None = None) -> structlog.BoundLogger:
    return structlog.get_logger(name)
```

### Request Logging Middleware

```python
# app/middleware/logging.py
import time
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import structlog

logger = structlog.get_logger()

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = str(uuid.uuid4())
        start_time = time.perf_counter()

        # Bind context for all logs in this request
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else None
        )

        # Log and re-raise on failure so errors still appear in access logs
        try:
            response = await call_next(request)
        except Exception:
            duration_ms = (time.perf_counter() - start_time) * 1000
            logger.exception("request_failed", duration_ms=round(duration_ms, 2))
            raise

        # Add request ID to response headers
        response.headers["X-Request-ID"] = request_id

        # Calculate duration
        duration_ms = (time.perf_counter() - start_time) * 1000

        # Log request completion
        logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
            content_length=response.headers.get("content-length")
        )

        return response
```

### Application Logging

```python
from app.core.logging import get_logger

logger = get_logger(__name__)

async def create_user(data: UserCreate) -> User:
    logger.info("creating_user", email=data.email)

    try:
        user = await User(**data.model_dump()).insert()
        logger.info("user_created", user_id=str(user.id))
        return user
    except Exception as e:
        logger.error("user_creation_failed", error=str(e), email=data.email)
        raise
```

## Prometheus Metrics

### Setup with prometheus-fastapi-instrumentator

```python
# app/core/metrics.py
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Counter, Histogram, Gauge

# Custom metrics
REQUEST_COUNT = Counter(
    "app_requests_total",
    "Total request count",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "app_request_latency_seconds",
    "Request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

ACTIVE_REQUESTS = Gauge(
    "app_active_requests",
    "Number of active requests"
)

DB_QUERY_LATENCY = Histogram(
    "app_db_query_latency_seconds",
    "Database query latency",
    ["operation", "collection"]
)

CACHE_HITS = Counter(
    "app_cache_hits_total",
    "Cache hit count",
    ["cache_name"]
)

CACHE_MISSES = Counter(
    "app_cache_misses_total",
    "Cache miss count",
    ["cache_name"]
)

def setup_metrics(app):
    """Setup Prometheus metrics instrumentation."""
    Instrumentator().instrument(app).expose(app, endpoint="/metrics")
```

### Custom Metric Decorators

```python
import time
from functools import wraps

def track_db_query(operation: str, collection: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start
                DB_QUERY_LATENCY.labels(
                    operation=operation,
                    collection=collection
                ).observe(duration)
        return wrapper
    return decorator

def track_cache(cache_name: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)
            if result is not None:
                CACHE_HITS.labels(cache_name=cache_name).inc()
            else:
                CACHE_MISSES.labels(cache_name=cache_name).inc()
            return result
        return wrapper
    return decorator

# Usage
class UserRepository:
    @track_db_query("find", "users")
    async def find_by_id(self, user_id: str):
        return await User.get(user_id)

    @track_cache("users")
    async def get_cached(self, user_id: str):
        return await cache.get(f"user:{user_id}")
```

## OpenTelemetry Tracing

### Setup

```python
# app/core/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource

def setup_tracing(app, service_name: str, otlp_endpoint: str):
    """Setup OpenTelemetry tracing."""

    # Create resource
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
    })

    # Setup tracer provider
    provider = TracerProvider(resource=resource)

    # Add OTLP exporter
    otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)

    # Instrument FastAPI
    FastAPIInstrumentor.instrument_app(app)

    # Instrument HTTP client
    HTTPXClientInstrumentor().instrument()

    # Instrument Redis
    RedisInstrumentor().instrument()

def get_tracer(name: str) -> trace.Tracer:
    return trace.get_tracer(name)
```

### Custom Spans

```python
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        try:
            # Validate order
            with tracer.start_as_current_span("validate_order"):
                order = await validate_order(order_id)
                span.set_attribute("order.total", order.total)

            # Process payment
            with tracer.start_as_current_span("process_payment"):
                payment = await process_payment(order)
                span.set_attribute("payment.id", payment.id)

            # Update inventory
            with tracer.start_as_current_span("update_inventory"):
                await update_inventory(order.items)

            span.set_status(Status(StatusCode.OK))
            return order

        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise
```

## Health Check Endpoints

```python
# app/routes/health.py
import json
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Dict

from fastapi import APIRouter, Depends, Response

# App-specific dependencies -- adjust these imports to your project layout
from app.core.cache import RedisCache, get_cache
from app.db import Database, get_db

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

router = APIRouter(tags=["Health"])

@router.get("/health")
async def health() -> Dict[str, str]:
    """Kubernetes liveness probe."""
    return {"status": "ok"}

@router.get("/health/ready")
async def ready(
    db: Database = Depends(get_db),
    cache: RedisCache = Depends(get_cache)
) -> Response:
    """Kubernetes readiness probe with dependency checks."""
    checks = {}
    status = HealthStatus.HEALTHY

    # MongoDB check
    try:
        start = time.perf_counter()
        await db.command("ping")
        latency = (time.perf_counter() - start) * 1000
        checks["mongodb"] = {"status": "ok", "latency_ms": round(latency, 2)}
    except Exception as e:
        checks["mongodb"] = {"status": "error", "error": str(e)}
        status = HealthStatus.UNHEALTHY

    # Redis check
    try:
        start = time.perf_counter()
        await cache.client.ping()
        latency = (time.perf_counter() - start) * 1000
        checks["redis"] = {"status": "ok", "latency_ms": round(latency, 2)}
    except Exception as e:
        checks["redis"] = {"status": "error", "error": str(e)}
        status = HealthStatus.DEGRADED  # Cache failure = degraded

    response_data = {
        "status": status.value,
        "checks": checks,
        "timestamp": datetime.utcnow().isoformat()
    }

    status_code = 200 if status != HealthStatus.UNHEALTHY else 503
    return Response(
        content=json.dumps(response_data),
        status_code=status_code,
        media_type="application/json"
    )

@router.get("/health/live")
async def live() -> Dict[str, str]:
    """Simple liveness check."""
    return {"status": "alive"}
```

## Application Integration

```python
from fastapi import FastAPI
from app.core.logging import setup_logging
from app.core.metrics import setup_metrics
from app.core.tracing import setup_tracing

def create_app() -> FastAPI:
    # Setup logging first
    setup_logging(
        log_level=settings.log_level,
        json_logs=settings.environment == "production"
    )

    app = FastAPI(title="API Service")

    # Setup metrics
    setup_metrics(app)

    # Setup tracing
    if settings.otlp_endpoint:
        setup_tracing(
            app,
            service_name="api-service",
            otlp_endpoint=settings.otlp_endpoint
        )

    # Add middleware
    app.add_middleware(RequestLoggingMiddleware)

    return app
```

## Additional Resources

### Reference Files

For detailed configuration:
- **`references/grafana-dashboards.md`** - Grafana dashboard JSON
- **`references/alerting.md`** - Prometheus alerting rules
- **`references/elk-setup.md`** - Elasticsearch/Kibana log aggregation

### Example Files

Working examples in `examples/`:
- **`examples/logging_config.py`** - Complete logging setup
- **`examples/metrics_middleware.py`** - Custom metrics middleware
- **`examples/tracing_service.py`** - Service with tracing

Overview

This skill provides a production-ready observability blueprint for FastAPI applications, covering structured logging, Prometheus metrics, OpenTelemetry tracing, and health checks. It delivers concrete code patterns, middleware, and integration guidance to make APIs measurable, debuggable, and monitorable in production.

How this skill works

The skill supplies reusable modules and middleware: structured logging with context propagation and request IDs, Prometheus metrics instrumentation and custom metric decorators, and OpenTelemetry tracing with automatic instrumentation and manual spans. It also includes health and readiness endpoints that run dependency checks and return appropriate status codes for liveness and readiness probes.

When to use it

  • Add structured, JSON-ready logs with per-request context and request IDs.
  • Expose Prometheus metrics and add custom counters, histograms, and gauges.
  • Instrument FastAPI, HTTP clients, and Redis with OpenTelemetry and export spans to OTLP.
  • Implement Kubernetes liveness and readiness probes with dependency checks.
  • Set up observability during production rollout, incident response, or SRE onboarding.

Best practices

  • Initialize logging before app creation so all components inherit structured logging configuration.
  • Use context-bound request IDs and bind them to logs and response headers for traceability.
  • Expose /metrics on a secure, internal path and use Prometheus scrape configs to limit exposure.
  • Instrument both automatic (framework) and manual spans to capture business-critical operations.
  • Keep health checks fast and idempotent; classify failures as degraded vs unhealthy for correct probe behavior.
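The degraded-vs-unhealthy classification in the last bullet can be sketched as a pure function; the check names and the criticality map below are illustrative, not part of the skill:

```python
from enum import Enum

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

# Which dependencies are critical is an application decision;
# this mapping is an example (database required, cache optional).
CRITICAL = {"mongodb": True, "redis": False}

def classify(checks: dict[str, bool]) -> tuple[HealthStatus, int]:
    """Map per-dependency results to an overall status and HTTP code."""
    status = HealthStatus.HEALTHY
    for name, ok in checks.items():
        if ok:
            continue
        # Unknown dependencies default to critical (fail safe)
        if CRITICAL.get(name, True):
            return HealthStatus.UNHEALTHY, 503
        status = HealthStatus.DEGRADED
    return status, 200
```

A cache outage then yields `(HealthStatus.DEGRADED, 200)`, so Kubernetes keeps routing traffic, while a database outage returns 503 and removes the pod from rotation.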

Example use cases

  • Add request logging middleware that binds request_id, method, path, and client_ip and appends X-Request-ID to responses.
  • Instrument DB and cache operations with decorators that observe latency and increment hit/miss counters.
  • Configure OpenTelemetry with an OTLP exporter, instrument FastAPI, HTTPX, and Redis, and create manual spans for multi-step business flows.
  • Expose /health, /health/ready, and /health/live endpoints to support Kubernetes liveness/readiness probes.
  • Create Grafana dashboards and Prometheus alerting rules using exported metrics and provided dashboard templates.

FAQ

Do I need separate instrumentation for development and production?

Yes. Use console-readable logs and reduced sampling locally, and enable JSON logs, full metrics, and OTLP export in production.
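One way to keep the split in a single place is a small settings helper; the field names and sample rates here are assumptions, not part of the skill's API:

```python
# Illustrative per-environment observability settings.
def observability_config(environment: str) -> dict:
    if environment == "production":
        # JSON logs for aggregation, reduced trace sampling for cost
        return {"json_logs": True, "log_level": "INFO", "trace_sample_rate": 0.1}
    # Development: human-readable console logs, verbose level, full sampling
    return {"json_logs": False, "log_level": "DEBUG", "trace_sample_rate": 1.0}
```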

How do I correlate logs and traces?

Bind the request ID into structured logs and set it as a trace/span attribute. Ensure your tracing exporter and log aggregator index the same request or trace IDs.
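A minimal standard-library sketch of the same idea, independent of structlog: stash the request ID in a `ContextVar` and inject it into every record with a `logging.Filter` (structlog's contextvars processors do this for you):

```python
import contextvars
import io
import logging

request_id_var = contextvars.ContextVar("request_id", default="-")

class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto every log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True

def build_logger(stream) -> logging.Logger:
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(request_id)s %(message)s"))
    handler.addFilter(RequestIdFilter())
    logger = logging.getLogger("correlated")
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    return logger

# Demo: bind an ID (as middleware would), log, inspect the output
buf = io.StringIO()
logger = build_logger(buf)
request_id_var.set("abc123")
logger.info("payment_processed")
# buf now holds "abc123 payment_processed\n"
```

Because `ContextVar` is async-safe, concurrent requests each see their own ID, which is what makes the log/trace correlation reliable under load.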

Where should /metrics be exposed?

Expose /metrics on an internal network interface or behind a sidecar/proxy. Configure Prometheus to scrape it rather than making it publicly accessible.
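One way to enforce that in the application itself is a source-network allow-list; a minimal sketch using only the standard library (the CIDR ranges are placeholders for your scrape network):

```python
import ipaddress

# Placeholder networks -- replace with the ranges Prometheus scrapes from.
ALLOWED_NETS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("127.0.0.0/8"),
]

def scrape_allowed(client_ip: str) -> bool:
    """Return True if this client may read /metrics."""
    addr = ipaddress.ip_address(client_ip)
    return any(addr in net for net in ALLOWED_NETS)
```

In FastAPI this could be wrapped in a dependency that raises `HTTPException(status_code=403)` when `scrape_allowed(request.client.host)` is False, though a network-level restriction or sidecar proxy is usually the stronger boundary.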