correlation-tracing skill

/skills/correlation-tracing

This skill helps implement correlation IDs and distributed tracing across microservices to improve observability, debugging, and request-flow analysis.

npx playbooks add skill aj-geddes/useful-ai-prompts --skill correlation-tracing

---
name: correlation-tracing
description: Implement distributed tracing with correlation IDs, trace propagation, and span tracking across microservices. Use when debugging distributed systems, monitoring request flows, or implementing observability.
---

# Correlation & Distributed Tracing

## Overview

Implement correlation IDs and distributed tracing to track requests across multiple services and understand system behavior.

## When to Use

- Microservices architectures
- Debugging distributed systems
- Performance monitoring
- Request flow visualization
- Error tracking across services
- Dependency analysis
- Latency optimization

## Implementation Examples

### 1. **Correlation ID Middleware (Express)**

```typescript
import express from 'express';
import { v4 as uuidv4 } from 'uuid';

// Async local storage for context
import { AsyncLocalStorage } from 'async_hooks';

const traceContext = new AsyncLocalStorage<Map<string, any>>();

interface TraceContext {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  serviceName: string;
}

function correlationMiddleware(serviceName: string) {
  return (
    req: express.Request,
    res: express.Response,
    next: express.NextFunction
  ) => {
    // Extract or generate trace ID
    const traceId = req.headers['x-trace-id'] as string || uuidv4();
    const parentSpanId = req.headers['x-span-id'] as string;
    const spanId = uuidv4();

    // Set context
    const context = new Map<string, any>();
    context.set('traceId', traceId);
    context.set('spanId', spanId);
    context.set('parentSpanId', parentSpanId);
    context.set('serviceName', serviceName);

    // Inject trace headers
    res.setHeader('X-Trace-Id', traceId);
    res.setHeader('X-Span-Id', spanId);

    // Run in context
    traceContext.run(context, () => {
      next();
    });
  };
}

// Helper to get current context
function getTraceContext(): TraceContext | null {
  const context = traceContext.getStore();
  if (!context) return null;

  return {
    traceId: context.get('traceId'),
    spanId: context.get('spanId'),
    parentSpanId: context.get('parentSpanId'),
    serviceName: context.get('serviceName')
  };
}

// Enhanced logger with trace context
class TracedLogger {
  log(level: string, message: string, data?: any): void {
    const context = getTraceContext();

    const logEntry = {
      level,
      message,
      ...data,
      ...context,
      timestamp: new Date().toISOString()
    };

    console.log(JSON.stringify(logEntry));
  }

  info(message: string, data?: any): void {
    this.log('info', message, data);
  }

  error(message: string, data?: any): void {
    this.log('error', message, data);
  }

  warn(message: string, data?: any): void {
    this.log('warn', message, data);
  }
}

const logger = new TracedLogger();

// HTTP client with trace propagation
async function tracedFetch(
  url: string,
  options: RequestInit = {}
): Promise<Response> {
  const context = getTraceContext();

  const headers = new Headers(options.headers);

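  if (context) {
    headers.set('X-Trace-Id', context.traceId);
    // The receiving middleware reads X-Span-Id as the parent of its own span
    headers.set('X-Span-Id', context.spanId);
    // Forward this service's own parent only when one exists
    if (context.parentSpanId) {
      headers.set('X-Parent-Span-Id', context.parentSpanId);
    }
  }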

  const startTime = Date.now();

  try {
    const response = await fetch(url, {
      ...options,
      headers
    });

    const duration = Date.now() - startTime;

    logger.info('HTTP request completed', {
      method: options.method || 'GET',
      url,
      statusCode: response.status,
      duration
    });

    return response;
  } catch (error) {
    const duration = Date.now() - startTime;

    logger.error('HTTP request failed', {
      method: options.method || 'GET',
      url,
      error: (error as Error).message,
      duration
    });

    throw error;
  }
}

// Usage
const app = express();

app.use(correlationMiddleware('api-service'));

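app.get('/api/users/:id', async (req, res, next) => {
  try {
    logger.info('Fetching user', { userId: req.params.id });

    // Call another service with trace propagation
    const response = await tracedFetch(
      `http://user-service/users/${req.params.id}`
    );

    const data = await response.json();

    logger.info('User fetched successfully');

    res.json(data);
  } catch (error) {
    // Express 4 does not catch rejections from async handlers, so forward them
    logger.error('Failed to fetch user', { error: (error as Error).message });
    next(error);
  }
});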
```

### 2. **OpenTelemetry Integration**

```typescript
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { Resource } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';

// Configure OpenTelemetry
const sdk = new NodeSDK({
  resource: new Resource({
    [SemanticResourceAttributes.SERVICE_NAME]: 'my-service',
    [SemanticResourceAttributes.SERVICE_VERSION]: '1.0.0',
  }),
  traceExporter: new JaegerExporter({
    endpoint: 'http://localhost:14268/api/traces',
  }),
  instrumentations: [
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-http': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-express': {
        enabled: true,
      },
      '@opentelemetry/instrumentation-pg': {
        enabled: true,
      },
    }),
  ],
});

sdk.start();

// Custom spans
import { context, trace, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('my-service');

async function processOrder(orderId: string) {
  const span = tracer.startSpan('process_order');

  span.setAttribute('order.id', orderId);

  // Child spans take their parent from a Context passed as the third argument;
  // SpanOptions has no `parent` field in @opentelemetry/api 1.x
  const parentContext = trace.setSpan(context.active(), span);

  try {
    // Validate order
    const validateSpan = tracer.startSpan('validate_order', undefined, parentContext);
    try {
      await validateOrder(orderId);
      validateSpan.setStatus({ code: SpanStatusCode.OK });
    } finally {
      validateSpan.end(); // end the child span even if validation throws
    }

    // Process payment
    const paymentSpan = tracer.startSpan('process_payment', undefined, parentContext);
    try {
      await processPayment(orderId);
      paymentSpan.setStatus({ code: SpanStatusCode.OK });
    } finally {
      paymentSpan.end(); // end the child span even if payment throws
    }

    span.setStatus({ code: SpanStatusCode.OK });
  } catch (error) {
    span.setStatus({
      code: SpanStatusCode.ERROR,
      message: (error as Error).message,
    });
    span.recordException(error as Error);
    throw error;
  } finally {
    span.end();
  }
}

async function validateOrder(orderId: string) {
  // Validation logic
}

async function processPayment(orderId: string) {
  // Payment logic
}
```

### 3. **Python Distributed Tracing**

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from flask import Flask
import requests

# Setup tracing
resource = Resource.create({"service.name": "python-service"})
trace.set_tracer_provider(TracerProvider(resource=resource))

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Auto-instrument Flask and requests
app = Flask(__name__)
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()

tracer = trace.get_tracer(__name__)

@app.route('/api/orders/<order_id>')
def get_order(order_id):
    # Current span is automatically created by FlaskInstrumentor

    with tracer.start_as_current_span("fetch_order_details") as span:
        span.set_attribute("order.id", order_id)

        # Fetch from database
        with tracer.start_as_current_span("database_query"):
            order = fetch_order_from_db(order_id)

        # Call another service (automatically traced)
        with tracer.start_as_current_span("fetch_user_details"):
            user = requests.get(
                f"http://user-service/users/{order['user_id']}"
            ).json()

        return {
            "order": order,
            "user": user
        }

def fetch_order_from_db(order_id):
    # Database logic
    return {"id": order_id, "user_id": "user123"}

if __name__ == '__main__':
    app.run(port=5000)
```

### 4. **Manual Trace Propagation**

```typescript
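// Reuses getTraceContext() from example 1 to read the current request's context
import { v4 as uuidv4 } from 'uuid';

interface Span {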
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  name: string;
  serviceName: string;
  startTime: number;
  endTime?: number;
  duration?: number;
  tags: Record<string, any>;
  logs: Array<{ timestamp: number; message: string; fields?: any }>;
  status: 'ok' | 'error';
}

class DistributedTracer {
  private spans: Span[] = [];

  startSpan(
    name: string,
    parentSpanId?: string
  ): Span {
    const context = getTraceContext();

    const span: Span = {
      traceId: context?.traceId || uuidv4(),
      spanId: uuidv4(),
      // Without an explicit parent, attach to the current request span
      parentSpanId: parentSpanId || context?.spanId,
      name,
      serviceName: context?.serviceName || 'unknown',
      startTime: Date.now(),
      tags: {},
      logs: [],
      status: 'ok'
    };

    this.spans.push(span);
    return span;
  }

  endSpan(span: Span): void {
    span.endTime = Date.now();
    span.duration = span.endTime - span.startTime;

    // Send to tracing backend
    this.reportSpan(span);
  }

  setTag(span: Span, key: string, value: any): void {
    span.tags[key] = value;
  }

  logEvent(span: Span, message: string, fields?: any): void {
    span.logs.push({
      timestamp: Date.now(),
      message,
      fields
    });
  }

  setError(span: Span, error: Error): void {
    span.status = 'error';
    span.tags['error'] = true;
    span.tags['error.message'] = error.message;
    span.tags['error.stack'] = error.stack;
  }

  private async reportSpan(span: Span): Promise<void> {
    // Send to Jaeger, Zipkin, or other backend
    console.log('Reporting span:', JSON.stringify(span, null, 2));

    // In production:
    // await fetch('http://tracing-collector/api/spans', {
    //   method: 'POST',
    //   headers: { 'Content-Type': 'application/json' },
    //   body: JSON.stringify(span)
    // });
  }

  getAllSpans(): Span[] {
    return this.spans;
  }

  getTrace(traceId: string): Span[] {
    return this.spans.filter(s => s.traceId === traceId);
  }
}

const tracer = new DistributedTracer();

// Usage
async function handleRequest() {
  const span = tracer.startSpan('handle_request');

  tracer.setTag(span, 'http.method', 'GET');
  tracer.setTag(span, 'http.url', '/api/users/123');

  try {
    // Database operation
    const dbSpan = tracer.startSpan('database_query', span.spanId);
    tracer.setTag(dbSpan, 'db.type', 'postgresql');
    tracer.setTag(dbSpan, 'db.statement', 'SELECT * FROM users WHERE id = $1');

    await queryDatabase();

    tracer.endSpan(dbSpan);

    // External API call
    const apiSpan = tracer.startSpan('external_api_call', span.spanId);
    tracer.setTag(apiSpan, 'http.url', 'https://api.example.com/data');

    await callExternalAPI();

    tracer.endSpan(apiSpan);

    tracer.logEvent(span, 'Request completed successfully');
    tracer.endSpan(span);
  } catch (error) {
    tracer.setError(span, error as Error);
    tracer.logEvent(span, 'Request failed', { error: (error as Error).message });
    tracer.endSpan(span);
    throw error;
  }
}

async function queryDatabase() {
  await new Promise(resolve => setTimeout(resolve, 100));
}

async function callExternalAPI() {
  await new Promise(resolve => setTimeout(resolve, 200));
}
```
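
Once spans have been collected (for example through `getTrace(traceId)` above), the request flow can be reconstructed by linking each span to its parent. The following is a minimal sketch of that idea, not part of the original tracer: `FlatSpan`, `SpanNode`, `buildSpanTree`, and `renderSpanTree` are hypothetical names, and only the fields needed for the tree are modelled.

```typescript
// Hypothetical minimal span shape: only the fields needed to rebuild the tree.
interface FlatSpan {
  spanId: string;
  parentSpanId?: string;
  name: string;
  startTime: number;
}

interface SpanNode extends FlatSpan {
  children: SpanNode[];
}

// Link spans to their parents and return the roots. A span whose parent is not
// in the input (for example, recorded by another service) is treated as a root.
function buildSpanTree(spans: FlatSpan[]): SpanNode[] {
  const nodes = new Map<string, SpanNode>();
  spans.forEach(span => nodes.set(span.spanId, { ...span, children: [] }));

  const roots: SpanNode[] = [];
  nodes.forEach(node => {
    const parent = node.parentSpanId ? nodes.get(node.parentSpanId) : undefined;
    if (parent) {
      parent.children.push(node);
    } else {
      roots.push(node);
    }
  });

  // Order siblings by start time so the tree reads chronologically.
  const sortByStart = (list: SpanNode[]): void => {
    list.sort((a, b) => a.startTime - b.startTime);
    list.forEach(node => sortByStart(node.children));
  };
  sortByStart(roots);
  return roots;
}

// Render the tree as indented lines, two spaces per level.
function renderSpanTree(nodes: SpanNode[], depth = 0): string[] {
  const lines: string[] = [];
  for (const node of nodes) {
    lines.push(`${'  '.repeat(depth)}${node.name}`);
    lines.push(...renderSpanTree(node.children, depth + 1));
  }
  return lines;
}
```

Calling `renderSpanTree(buildSpanTree(spans)).join('\n')` on the spans of one trace gives a quick text view of which operation called which.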

## Best Practices

### ✅ DO
- Generate a trace ID at each entry point when the request does not already carry one
- Propagate trace context on every call to another service
- Include the trace and span IDs in every log entry
- Use structured (JSON) logging so logs can be joined to traces
- Set span attributes that identify the operation (for example `order.id`)
- Sample traces in high-traffic systems
- Monitor the overhead of trace collection

### ❌ DON'T
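- Drop trace headers on calls to other services
- Log without correlation context
- Create a span for every function call
- Store sensitive data (credentials, tokens, personal data) in spans
- Block request handling on trace reporting
- Leave errors unrecorded on spans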
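
One way to sample traces without coordination between services is to derive the decision from the trace ID itself, so every service that sees the same ID reaches the same answer. The sketch below assumes trace IDs start with at least eight random hexadecimal digits (true for the UUID v4 values used in the examples above); `shouldSample` is a hypothetical helper, not a library API.

```typescript
// Decide whether to keep a trace based only on its ID and a target ratio.
// Assumption: the first 8 hex digits of the ID are uniformly random.
function shouldSample(traceId: string, ratio: number): boolean {
  if (ratio >= 1) return true;
  if (ratio <= 0) return false;

  // Interpret the first 8 hex digits (dashes removed) as a 32-bit number.
  const prefix = traceId.replace(/-/g, '').slice(0, 8);
  const bucket = parseInt(prefix, 16);

  // Assumption: IDs that do not start with hex digits are never sampled.
  if (Number.isNaN(bucket)) return false;

  // Keep the trace when its bucket falls in the lowest `ratio` share of the range.
  return bucket < ratio * 0x100000000;
}
```

With `ratio` set to `0.1`, roughly one trace in ten is kept, and a downstream service calling `shouldSample` with the propagated trace ID makes the same choice as the entry point.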

## Trace Headers

```
X-Trace-Id: trace identifier
X-Span-Id: current span
X-Parent-Span-Id: parent span
X-Sampled: sampling decision
```
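
The four headers above can be read and written with a pair of small helpers. This is a sketch under stated assumptions: `PropagatedTrace`, `extractTrace`, and `injectTrace` are hypothetical names, the ID generator is passed in rather than fixed, and the `X-Sampled` encoding (`1`/`0`) is an assumption because the skill does not define one.

```typescript
// Hypothetical shape for the values carried by the four headers listed above.
interface PropagatedTrace {
  traceId: string;
  spanId: string;
  parentSpanId?: string;
  sampled: boolean;
}

// Node lower-cases incoming header names and may return repeated headers as arrays.
type IncomingHeaders = Record<string, string | string[] | undefined>;

function firstValue(value: string | string[] | undefined): string | undefined {
  return Array.isArray(value) ? value[0] : value;
}

// Build the context for a new server-side span from the incoming headers.
// `newId` is injected so any ID generator (for example uuidv4) can be used.
function extractTrace(headers: IncomingHeaders, newId: () => string): PropagatedTrace {
  const sampledHeader = firstValue(headers['x-sampled']);
  return {
    traceId: firstValue(headers['x-trace-id']) || newId(),
    spanId: newId(),
    // The caller's span becomes the parent of the span created here.
    parentSpanId: firstValue(headers['x-span-id']) || undefined,
    // Assumption: '0' or 'false' means "do not sample"; anything else,
    // including a missing header, means "sample".
    sampled: sampledHeader !== '0' && sampledHeader !== 'false',
  };
}

// Produce the headers to attach to an outbound request.
function injectTrace(trace: PropagatedTrace): Record<string, string> {
  const headers: Record<string, string> = {
    'X-Trace-Id': trace.traceId,
    'X-Span-Id': trace.spanId,
    'X-Sampled': trace.sampled ? '1' : '0',
  };
  if (trace.parentSpanId) {
    headers['X-Parent-Span-Id'] = trace.parentSpanId;
  }
  return headers;
}
```

Forwarding the sampling decision this way lets downstream services skip span reporting for traces the entry point chose not to keep.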

## Resources

- [OpenTelemetry](https://opentelemetry.io/)
- [Jaeger Tracing](https://www.jaegertracing.io/)
- [Zipkin](https://zipkin.io/)

Overview

This skill implements distributed tracing with correlation IDs, trace propagation, and span tracking across microservices. It provides middleware, tracing helpers, and examples for OpenTelemetry and manual tracer implementations. Use it to make request flows observable and to connect logs, metrics, and traces for debugging and performance analysis.

How this skill works

At service entry points the skill generates or extracts a trace ID and span IDs, stores them in context, and injects trace headers into outgoing requests. It offers middleware for HTTP frameworks, an enhanced logger that attaches correlation context to log entries, and helper functions or SDK integrations to start/end spans and report them to backends like Jaeger. Optional OpenTelemetry examples show automated instrumentation and custom spans for complex operations.

When to use it

  • Microservices architectures where requests cross service boundaries
  • Debugging distributed failures and tracing error propagation
  • Monitoring request latency and identifying hotspots
  • Visualizing request flows in a tracing backend
  • Ensuring logs include correlation context for investigation

Best practices

  • Generate trace IDs at service entry points and propagate them with headers
  • Include correlation IDs in all logs using structured JSON logging
  • Set meaningful span names and attributes for important operations
  • Sample traces in high-traffic systems and monitor reporting overhead
  • Avoid storing sensitive data in spans and do not block on trace reporting
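
The last practice above can be sketched in two small pieces: a filter that masks sensitive attributes before they reach a span, and a reporter that only buffers on the request path. Both are illustrative assumptions rather than part of the skill: the key pattern in `SENSITIVE_KEY` is a hypothetical deny-list, and `BufferedReporter` takes whatever `send` function your collector requires.

```typescript
// Hypothetical deny-list; adjust to the fields your services actually handle.
const SENSITIVE_KEY = /(password|secret|token|authorization|cookie|api[-_]?key)/i;

// Return a copy of the attributes with sensitive values masked.
function redactAttributes(attrs: Record<string, unknown>): Record<string, unknown> {
  const safe: Record<string, unknown> = {};
  Object.keys(attrs).forEach(key => {
    safe[key] = SENSITIVE_KEY.test(key) ? '[REDACTED]' : attrs[key];
  });
  return safe;
}

// Buffers items in memory and hands them to `send` in batches.
class BufferedReporter<T> {
  private queue: T[] = [];
  private readonly send: (batch: T[]) => Promise<void>;
  private readonly maxBatch: number;

  constructor(send: (batch: T[]) => Promise<void>, maxBatch = 50) {
    this.send = send;
    this.maxBatch = maxBatch;
  }

  // Called on the request path: pushes to memory and never awaits I/O.
  enqueue(item: T): void {
    this.queue.push(item);
    if (this.queue.length >= this.maxBatch) {
      void this.flush(); // fire and forget
    }
  }

  // Sends everything buffered so far; failures are swallowed so that
  // tracing problems never surface as request errors.
  async flush(): Promise<void> {
    if (this.queue.length === 0) return;
    const batch = this.queue;
    this.queue = [];
    try {
      await this.send(batch);
    } catch {
      // Assumption: dropping a failed batch is acceptable for trace data.
    }
  }
}
```

A service would typically also call `flush()` on a timer and during shutdown so that a partially filled batch is not lost.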

Example use cases

  • Attach middleware to an Express API to generate X-Trace-Id and X-Span-Id headers
  • Use OpenTelemetry NodeSDK to auto-instrument HTTP, Express, and DB calls and export to Jaeger
  • Instrument a Flask service with OpenTelemetry and propagate traces via requests to downstream services
  • Implement a manual tracer to start/end spans, add tags, log events, and report spans to a collector
  • Enhance application logs with trace context so alerts link back to full traces for faster triage

FAQ

Which headers should I use to propagate trace context?

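The examples in this skill use custom headers (X-Trace-Id, X-Span-Id, X-Parent-Span-Id, and X-Sampled); these are a convention, not a standard. For interoperability with other tools, prefer the W3C Trace Context headers (traceparent and tracestate), which OpenTelemetry propagates by default.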
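
W3C Trace Context carries the trace ID, the caller's span ID, and the sampling flag in a single `traceparent` header of the form `version-traceid-parentid-flags`. The sketch below parses and formats that four-field layout only; `parseTraceparent` and `formatTraceparent` are hypothetical helper names, and `tracestate` is not handled. Note that W3C trace IDs are 32 hexadecimal digits and span IDs 16, so the dashed UUIDs used in the examples above would need a different generator.

```typescript
interface TraceParent {
  version: string;  // two hex digits, currently '00'
  traceId: string;  // 32 lowercase hex digits
  parentId: string; // 16 lowercase hex digits (the caller's span ID)
  sampled: boolean; // lowest bit of the trace-flags field
}

const TRACEPARENT = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/;

// Parse a traceparent header; returns null when the value is malformed.
function parseTraceparent(header: string): TraceParent | null {
  const match = TRACEPARENT.exec(header.trim());
  if (!match) return null;

  const [, version, traceId, parentId, flags] = match;

  // Version 'ff' and all-zero IDs are invalid.
  if (version === 'ff' || /^0+$/.test(traceId) || /^0+$/.test(parentId)) {
    return null;
  }

  return {
    version,
    traceId,
    parentId,
    sampled: (parseInt(flags, 16) & 1) === 1,
  };
}

// Format a version-00 traceparent header for an outbound request.
function formatTraceparent(traceId: string, spanId: string, sampled: boolean): string {
  return `00-${traceId}-${spanId}-${sampled ? '01' : '00'}`;
}
```

In practice the OpenTelemetry SDKs shown earlier handle this header for you; a hand-written parser is mainly useful for services that cannot take the SDK dependency.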

Should I instrument every function with spans?

No. Focus on meaningful operations (HTTP requests, DB queries, external calls). Avoid too many short-lived spans and use sampling in high-traffic paths.