home / skills / anton-abyzov / specweave / n8n-kafka-workflows

n8n-kafka-workflows skill

/plugins/specweave-n8n/skills/n8n-kafka-workflows

This skill helps you connect n8n with Kafka for event-driven workflows, enabling reliable triggers, producers, error handling, and no-code integration.

This is most likely a fork of the sw-n8n-kafka-workflows skill from openclaw
npx playbooks add skill anton-abyzov/specweave --skill n8n-kafka-workflows

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
12.5 KB
---
name: n8n-kafka-workflows
description: n8n workflow automation with Kafka integration expert. Covers Kafka trigger node, producer node, event-driven workflows, error handling, retries, and no-code/low-code event processing patterns. Activates for n8n kafka, kafka trigger, kafka producer, n8n workflows, event-driven automation, no-code kafka, workflow patterns.
---

# n8n Kafka Workflows Skill

Expert knowledge of integrating Apache Kafka with n8n workflow automation platform for no-code/low-code event-driven processing.

## What I Know

### n8n Kafka Nodes

**Kafka Trigger Node** (Event Consumer):
- Triggers workflow on new Kafka messages
- Supports consumer groups
- Auto-commit or manual offset management
- Multiple topic subscription
- Message batching

**Kafka Producer Node** (Event Publisher):
- Sends messages to Kafka topics
- Supports key-based partitioning
- Header support
- Compression (gzip, snappy, lz4)
- Batch sending

**Configuration**:
```json
{
  "credentials": {
    "kafkaApi": {
      "brokers": "localhost:9092",
      "clientId": "n8n-workflow",
      "ssl": false,
      "sasl": {
        "mechanism": "plain",
        "username": "{{$env.KAFKA_USER}}",
        "password": "{{$env.KAFKA_PASSWORD}}"
      }
    }
  }
}
```

## When to Use This Skill

Activate me when you need help with:
- n8n Kafka setup ("Configure Kafka trigger in n8n")
- Workflow patterns ("Event-driven automation with n8n")
- Error handling ("Retry failed Kafka messages")
- Integration patterns ("Enrich Kafka events with HTTP API")
- Producer configuration ("Send messages to Kafka from n8n")
- Consumer groups ("Process Kafka events in parallel")

## Common Workflow Patterns

### Pattern 1: Event-Driven Processing

**Use Case**: Process Kafka events with HTTP API enrichment

```
[Kafka Trigger] → [HTTP Request] → [Transform] → [Database]
     ↓
  orders topic
     ↓
  Get customer data
     ↓
  Merge order + customer
     ↓
  Save to PostgreSQL
```

**n8n Workflow**:
1. **Kafka Trigger**:
   - Topic: `orders`
   - Consumer Group: `order-processor`
   - Offset: `latest`

2. **HTTP Request** (Enrich):
   - URL: `https://api.example.com/customers/{{$json.customerId}}`
   - Method: GET
   - Headers: `Authorization: Bearer {{$env.API_TOKEN}}`

3. **Set Node** (Transform):
   ```javascript
   return {
     orderId: $json.order.id,
     customerId: $json.order.customerId,
     customerName: $json.customer.name,
     customerEmail: $json.customer.email,
     total: $json.order.total,
     timestamp: new Date().toISOString()
   };
   ```

4. **PostgreSQL** (Save):
   - Operation: INSERT
   - Table: `enriched_orders`
   - Columns: Mapped from Set node

### Pattern 2: Fan-Out (Publish to Multiple Topics)

**Use Case**: Single event triggers multiple downstream workflows

```
[Kafka Trigger] → [Switch] → [Kafka Producer] (topic: high-value-orders)
     ↓                ↓
  orders topic        └─→ [Kafka Producer] (topic: all-orders)
                           └─→ [Kafka Producer] (topic: analytics)
```

**n8n Workflow**:
1. **Kafka Trigger**: Consume `orders`
2. **Switch Node**: Route by `total` value
   - Route 1: `total > 1000` → `high-value-orders` topic
   - Route 2: Always → `all-orders` topic
   - Route 3: Always → `analytics` topic
3. **Kafka Producer** (x3): Send to respective topics

### Pattern 3: Retry with Dead Letter Queue (DLQ)

**Use Case**: Retry failed messages, send to DLQ after 3 attempts

```
[Kafka Trigger] → [Try/Catch] → [Success] → [Kafka Producer] (topic: processed)
     ↓                ↓
  input topic     [Catch Error]
                       ↓
                  [Increment Retry Count]
                       ↓
                  [If Retry < 3]
                       ↓ Yes
                  [Kafka Producer] (topic: input-retry)
                       ↓ No
                  [Kafka Producer] (topic: dlq)
```

**n8n Workflow**:
1. **Kafka Trigger**: `input` topic
2. **Try Node**: HTTP Request (may fail)
3. **Catch Node** (Error Handler):
   - Get retry count from message headers
   - Increment retry count
   - If retry < 3: Send to `input-retry` topic
   - Else: Send to `dlq` topic

### Pattern 4: Batch Processing with Aggregation

**Use Case**: Aggregate 100 events, send batch to API

```
[Kafka Trigger] → [Aggregate] → [HTTP Request] → [Kafka Producer]
     ↓               ↓
  events topic   Buffer 100 msgs
                     ↓
                Send batch to API
                     ↓
                Publish results
```

**n8n Workflow**:
1. **Kafka Trigger**: Enable batching (100 messages)
2. **Aggregate Node**: Combine into array
3. **HTTP Request**: POST batch
4. **Kafka Producer**: Send results

### Pattern 5: Change Data Capture (CDC) to Kafka

**Use Case**: Stream database changes to Kafka

```
[Cron Trigger] → [PostgreSQL] → [Compare] → [Kafka Producer]
     ↓               ↓              ↓
  Every 1 min    Get new rows   Find diffs
                                    ↓
                              Publish changes
```

**n8n Workflow**:
1. **Cron**: Every 1 minute
2. **PostgreSQL**: SELECT new rows (WHERE updated_at > last_run)
3. **Function Node**: Detect changes (INSERT/UPDATE/DELETE)
4. **Kafka Producer**: Send CDC events

## Best Practices

### 1. Use Consumer Groups for Parallel Processing

✅ **DO**:
```
Workflow Instance 1:
  Consumer Group: order-processor
  Partition: 0, 1, 2

Workflow Instance 2:
  Consumer Group: order-processor
  Partition: 3, 4, 5
```

❌ **DON'T**:
```
// WRONG: No consumer group (all instances get all messages!)
Consumer Group: (empty)
```

### 2. Handle Errors with Try/Catch

✅ **DO**:
```
[Kafka Trigger]
  ↓
[Try] → [HTTP Request] → [Success Handler]
  ↓
[Catch] → [Error Handler] → [Kafka DLQ]
```

❌ **DON'T**:
```
// WRONG: No error handling (workflow crashes on failure!)
[Kafka Trigger] → [HTTP Request] → [Database]
```

### 3. Use Environment Variables for Credentials

✅ **DO**:
```
Kafka Brokers: {{$env.KAFKA_BROKERS}}
SASL Username: {{$env.KAFKA_USER}}
SASL Password: {{$env.KAFKA_PASSWORD}}
```

❌ **DON'T**:
```
// WRONG: Hardcoded credentials in workflow!
Kafka Brokers: "localhost:9092"
SASL Username: "admin"
SASL Password: "admin-secret"
```

### 4. Set Explicit Partitioning Keys

✅ **DO**:
```
Kafka Producer:
  Topic: orders
  Key: {{$json.customerId}}  // Partition by customer
  Message: {{$json}}
```

❌ **DON'T**:
```
// WRONG: No key (random partitioning!)
Kafka Producer:
  Topic: orders
  Message: {{$json}}
```

### 5. Monitor Consumer Lag

**Setup Prometheus metrics export**:
```
[Cron Trigger] → [Kafka Admin] → [Get Consumer Lag] → [Prometheus]
     ↓               ↓                   ↓
  Every 30s    List consumer groups   Calculate lag
                                           ↓
                                   Push to Pushgateway
```

## Error Handling Strategies

### Strategy 1: Exponential Backoff Retry

```javascript
// Function Node (Calculate Backoff)
const retryCount = $json.headers?.['retry-count'] || 0;
const backoffMs = Math.min(1000 * Math.pow(2, retryCount), 60000); // Max 60 seconds

return {
  retryCount: retryCount + 1,
  backoffMs,
  nextRetryAt: new Date(Date.now() + backoffMs).toISOString()
};
```

**Workflow**:
1. Try processing
2. On failure: Calculate backoff
3. Wait (using Wait node)
4. Retry (send to retry topic)
5. If max retries reached: Send to DLQ

### Strategy 2: Circuit Breaker

```javascript
// Function Node (Check Failure Rate)
const failures = $json.metrics.failures || 0;
const total = $json.metrics.total || 1;
const failureRate = failures / total;

if (failureRate > 0.5) {
  // Circuit open (too many failures)
  return { circuitState: 'OPEN', skipProcessing: true };
}

return { circuitState: 'CLOSED', skipProcessing: false };
```

**Workflow**:
1. Track success/failure metrics
2. Calculate failure rate
3. If >50% failures: Open circuit (stop processing)
4. Wait 30 seconds
5. Try single request (half-open)
6. If success: Close circuit (resume)

### Strategy 3: Idempotent Processing

```javascript
// Function Node (Deduplication)
const messageId = $json.headers?.['message-id'];
const cache = $('Redis').get(messageId);

if (cache) {
  // Already processed, skip
  return { skip: true, reason: 'duplicate' };
}

// Process and cache
await $('Redis').set(messageId, 'processed', { ttl: 3600 });
return { skip: false };
```

**Workflow**:
1. Extract message ID
2. Check Redis cache
3. If exists: Skip processing
4. Process message
5. Store message ID in cache (1 hour TTL)

## Performance Optimization

### 1. Batch Processing

**Enable batching in Kafka Trigger**:
```
Kafka Trigger:
  Batch Size: 100
  Batch Timeout: 5000ms  // Max wait 5 seconds
```

**Process batch**:
```javascript
// Function Node (Batch Transform)
const events = $input.all();
const transformed = events.map(event => ({
  id: event.json.id,
  timestamp: event.json.timestamp,
  processed: true
}));

return transformed;
```

### 2. Parallel Processing with Split in Batches

```
[Kafka Trigger] → [Split in Batches] → [HTTP Request] → [Aggregate]
     ↓                  ↓                     ↓
  1000 events      100 at a time       Parallel API calls
                                            ↓
                                      Combine results
```

### 3. Use Compression

**Kafka Producer**:
```
Compression: lz4  // Or gzip, snappy
Batch Size: 1000  // Larger batches = better compression
```

## Integration Patterns

### Pattern 1: Kafka + HTTP API Enrichment

```
[Kafka Trigger] → [HTTP Request] → [Transform] → [Kafka Producer]
     ↓                 ↓                ↓
  Raw events      Enrich from API   Combine data
                                         ↓
                                  Publish enriched
```

### Pattern 2: Kafka + Database Sync

```
[Kafka Trigger] → [PostgreSQL Upsert] → [Kafka Producer]
     ↓                   ↓                    ↓
  CDC events      Update database    Publish success/failure
```

### Pattern 3: Kafka + Email Notifications

```
[Kafka Trigger] → [If Critical] → [Send Email] → [Kafka Producer]
     ↓                ↓                ↓
  Alerts        severity=critical  Notify admin
                                        ↓
                                   Publish alert sent
```

### Pattern 4: Kafka + Slack Alerts

```
[Kafka Trigger] → [Transform] → [Slack] → [Kafka Producer]
     ↓               ↓            ↓
  Errors        Format message  Send to #alerts
                                     ↓
                                Publish notification
```

## Testing n8n Workflows

### Manual Testing

1. **Test with Sample Data**:
   - Right-click node → "Add Sample Data"
   - Execute workflow
   - Check outputs

2. **Test Kafka Producer**:
   ```bash
   # Consume test topic
   kcat -C -b localhost:9092 -t test-output -o beginning
   ```

3. **Test Kafka Trigger**:
   ```bash
   # Produce test message
   echo '{"test": "data"}' | kcat -P -b localhost:9092 -t test-input
   ```

### Automated Testing

**n8n CLI**:
```bash
# Execute workflow with input
n8n execute workflow --file workflow.json --input data.json

# Export workflow
n8n export:workflow --id=123 --output=workflow.json
```

## Common Issues & Solutions

### Issue 1: Consumer Lag Building Up

**Symptoms**: Processing slower than message arrival

**Solutions**:
1. Increase consumer group size (parallel processing)
2. Enable batching (process 100 messages at once)
3. Optimize HTTP requests (use connection pooling)
4. Use Split in Batches for parallel processing

### Issue 2: Duplicate Messages

**Cause**: At-least-once delivery, no deduplication

**Solution**: Add idempotency check:
```javascript
// Check if message already processed
const messageId = $json.headers?.['message-id'];
const exists = await $('Redis').exists(messageId);

if (exists) {
  return { skip: true };
}
```

### Issue 3: Workflow Execution Timeout

**Cause**: Long-running HTTP requests

**Solution**: Use async patterns:
```
[Kafka Trigger] → [Webhook] → [Wait for Webhook] → [Process Response]
     ↓               ↓
  Trigger job    Async callback
                     ↓
                 Continue workflow
```

## References

- n8n Kafka Trigger: https://docs.n8n.io/integrations/builtin/trigger-nodes/n8n-nodes-base.kafkatrigger/
- n8n Kafka Producer: https://docs.n8n.io/integrations/builtin/app-nodes/n8n-nodes-base.kafka/
- n8n Best Practices: https://docs.n8n.io/hosting/scaling/best-practices/
- Workflow Examples: https://n8n.io/workflows

---

**Invoke me when you need n8n Kafka integration, workflow automation, or event-driven no-code patterns!**

Overview

This skill is an expert guide for building n8n workflows that integrate with Apache Kafka for event-driven, no-code/low-code automation. It focuses on Kafka Trigger and Producer nodes, consumer group strategies, retries, DLQs, batching, and common integration patterns. Practical patterns and concrete configs help you move from prototype to reliable production workflows.

How this skill works

I explain how the Kafka Trigger consumes messages and how the Kafka Producer publishes messages, including batching, keys, headers, compression, and offset management. I cover workflow patterns like enrichment, fan‑out, retries with DLQ, batch aggregation, and CDC, and show how to wire Try/Catch, Wait, and Split nodes for robust processing. Configuration examples use environment variables for credentials and show best practices for partitioning and monitoring.

When to use it

  • Build event-driven workflows that react to Kafka messages without writing custom code
  • Publish enriched or transformed events back to Kafka from n8n
  • Implement retries, DLQ, and exponential backoff for unreliable downstream services
  • Process large volumes with batching and split-into-batches for parallel calls
  • Stream CDC data from a database into Kafka or sync Kafka events to a database

Best practices

  • Use consumer groups to scale consumers and avoid duplicate full-stream consumption
  • Keep credentials in environment variables ({{$env.*}}) rather than hardcoding
  • Set explicit partitioning keys to preserve ordering for entities like customerId
  • Add Try/Catch and DLQ flows; increment retry headers and cap attempts
  • Monitor consumer lag (export metrics to Prometheus) and tune batch size/timeout

Example use cases

  • Enrich orders: Kafka Trigger → HTTP Request to customer API → Transform → PostgreSQL insert
  • Fan-out: route high-value orders to multiple topics with Switch and multiple Kafka Producers
  • Retry with DLQ: on failure increment retry-count header and send to retry or dlq topic
  • Batch aggregation: collect 100 events, call batch API, publish results to Kafka
  • CDC sync: cron-select new rows, diff, and publish change events to Kafka

FAQ

How do I avoid duplicate processing?

Implement idempotency checks using a message-id and a cache (Redis) before processing; store processed IDs with a TTL.

How should I handle long-running external calls?

Use async patterns: hand off work via webhook or job queue, or use Wait nodes and retries with exponential backoff; push long tasks to background workers and publish results back to Kafka.