
confluent-kafka-connect skill

/plugins/specweave-confluent/skills/confluent-kafka-connect

This skill helps you design and troubleshoot Kafka Connect pipelines with source and sink connectors, Debezium CDC, and SMT transformations.

This is most likely a fork of the sw-confluent-kafka-connect skill from openclaw.
npx playbooks add skill anton-abyzov/specweave --skill confluent-kafka-connect

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
10.8 KB
---
name: confluent-kafka-connect
description: Kafka Connect integration expert. Covers source and sink connectors, JDBC, Elasticsearch, S3, Debezium CDC, SMT (Single Message Transforms), connector configuration, and data pipeline patterns. Activates for kafka connect, connectors, source connector, sink connector, jdbc connector, debezium, smt, data pipeline, cdc.
---

# Confluent Kafka Connect Skill

Expert knowledge of Kafka Connect for building data pipelines with source and sink connectors.

## What I Know

### Connector Types

**Source Connectors** (External System → Kafka):
- JDBC Source: Databases → Kafka
- Debezium: CDC (MySQL, PostgreSQL, MongoDB) → Kafka
- S3 Source: AWS S3 files → Kafka
- File Source: Local files → Kafka

**Sink Connectors** (Kafka → External System):
- JDBC Sink: Kafka → Databases
- Elasticsearch Sink: Kafka → Elasticsearch
- S3 Sink: Kafka → AWS S3
- HDFS Sink: Kafka → Hadoop HDFS

**Single Message Transforms (SMTs)**:
- Field operations: Insert, Mask, Replace, TimestampConverter
- Routing: RegexRouter, TimestampRouter
- Filtering: Filter, Predicates

## When to Use This Skill

Activate me when you need help with:
- Connector setup ("Configure JDBC connector")
- CDC patterns ("Debezium MySQL CDC")
- Data pipelines ("Stream database changes to Kafka")
- SMT transforms ("Mask sensitive fields")
- Connector troubleshooting ("Connector task failed")

## Common Patterns

### Pattern 1: JDBC Source (Database → Kafka)

**Use Case**: Stream database table changes to Kafka

**Configuration**:
```json
{
  "name": "jdbc-source-users",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-",
    "table.whitelist": "users,orders",
    "poll.interval.ms": "5000"
  }
}
```

**Modes**:
- `incrementing`: Track by auto-increment ID
- `timestamp`: Track by timestamp column
- `timestamp+incrementing`: Both (most reliable)
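For example, the `timestamp+incrementing` mode above needs both a timestamp column and an incrementing column. A minimal sketch of the extra properties to merge into the connector `config` block, assuming the table has `updated_at` and `id` columns (both column names are placeholders):

```json
{
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "updated_at",
  "incrementing.column.name": "id",
  "validate.non.null": "true"
}
```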

### Pattern 2: Debezium CDC (MySQL → Kafka)

**Use Case**: Capture all database changes (INSERT/UPDATE/DELETE)

**Configuration**:
```json
{
  "name": "debezium-mysql-cdc",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "1",
    "database.server.name": "mysql",
    "database.include.list": "mydb",
    "table.include.list": "mydb.users,mydb.orders",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.mydb"
  }
}
```

**Output Format** (Debezium Envelope):
```json
{
  "before": null,
  "after": {
    "id": 1,
    "name": "John Doe",
    "email": "[email protected]"
  },
  "source": {
    "version": "1.9.0",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1620000000000,
    "snapshot": "false",
    "db": "mydb",
    "table": "users",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 12345,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",  // c=CREATE, u=UPDATE, d=DELETE, r=READ
  "ts_ms": 1620000000000
}
```
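If downstream consumers only need the post-change row rather than the full envelope, Debezium ships an `ExtractNewRecordState` SMT that unwraps it. A minimal sketch to add to the connector config above (exact option names vary between Debezium versions):

```json
{
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false"
}
```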

### Pattern 3: JDBC Sink (Kafka → Database)

**Use Case**: Write Kafka events to PostgreSQL

**Configuration**:
```json
{
  "name": "jdbc-sink-enriched-orders",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "3",
    "topics": "enriched-orders",
    "connection.url": "jdbc:postgresql://localhost:5432/analytics",
    "connection.user": "postgres",
    "connection.password": "password",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "order_id",
    "table.name.format": "orders_${topic}"
  }
}
```

**Insert Modes**:
- `insert`: Append only (fails on duplicate)
- `update`: Update only (requires PK)
- `upsert`: INSERT or UPDATE (recommended)
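To propagate deletes (for example, Debezium tombstones) into the target table, the sink also supports `delete.enabled`, which requires the primary key to come from the record key rather than the value. A sketch under that assumption, reusing the `order_id` key from the example above:

```json
{
  "insert.mode": "upsert",
  "delete.enabled": "true",
  "pk.mode": "record_key",
  "pk.fields": "order_id"
}
```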

### Pattern 4: S3 Sink (Kafka → AWS S3)

**Use Case**: Archive Kafka topics to S3

**Configuration**:
```json
{
  "name": "s3-sink-events",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "user-events,order-events",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-kafka-archive",
    "s3.part.size": "5242880",
    "flush.size": "1000",
    "rotate.interval.ms": "60000",
    "rotate.schedule.interval.ms": "3600000",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "US",
    "timestamp.extractor": "Record"
  }
}
```

**Partitioning** (S3 folder structure):
```
s3://my-kafka-archive/
  topics/user-events/year=2025/month=01/day=15/hour=10/
    user-events+0+0000000000.json
    user-events+0+0000001000.json
  topics/order-events/year=2025/month=01/day=15/hour=10/
    order-events+0+0000000000.json
```

### Pattern 5: Elasticsearch Sink (Kafka → Elasticsearch)

**Use Case**: Index Kafka events for search

**Configuration**:
```json
{
  "name": "elasticsearch-sink-logs",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "application-logs",
    "connection.url": "http://localhost:9200",
    "connection.username": "elastic",
    "connection.password": "password",
    "key.ignore": "true",
    "schema.ignore": "true",
    "type.name": "_doc",
    "index.write.wait_for_active_shards": "1"
  }
}
```

## Single Message Transforms (SMTs)

### Transform 1: Mask Sensitive Fields

**Use Case**: Hide email/phone in Kafka topics

**Configuration**:
```json
{
  "transforms": "maskEmail",
  "transforms.maskEmail.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskEmail.fields": "email,phone"
}
```

**Before**:
```json
{"id": 1, "name": "John", "email": "[email protected]", "phone": "555-1234"}
```

**After**:
```json
{"id": 1, "name": "John", "email": null, "phone": null}
```

### Transform 2: Add Timestamp

**Use Case**: Add processing timestamp to all messages

**Configuration**:
```json
{
  "transforms": "insertTimestamp",
  "transforms.insertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTimestamp.timestamp.field": "processed_at"
}
```

### Transform 3: Conditional Topic Routing

**Use Case**: Route records from the `orders` topic to a dedicated topic (the predicate matches on the topic name; built-in predicates cannot inspect field values)

**Configuration**:
```json
{
  "transforms": "routeByValue",
  "transforms.routeByValue.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.routeByValue.regex": "(.*)",
  "transforms.routeByValue.replacement": "$1-high-value",
  "transforms.routeByValue.predicate": "isHighValue",
  "predicates": "isHighValue",
  "predicates.isHighValue.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.isHighValue.pattern": "orders"
}
```
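Routing or filtering on actual payload values requires a custom predicate or stream processing (Kafka Streams, ksqlDB). For conditions the built-in predicates do support, the `Filter` transform drops matching records entirely; a minimal sketch that discards tombstone (null-value) records:

```json
{
  "transforms": "dropTombstones",
  "transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropTombstones.predicate": "isTombstone",
  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}
```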

### Transform 4: Flatten Nested JSON

**Use Case**: Flatten nested structures for JDBC sink

**Configuration**:
```json
{
  "transforms": "flatten",
  "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.flatten.delimiter": "_"
}
```

**Before**:
```json
{
  "user": {
    "id": 1,
    "profile": {
      "name": "John",
      "email": "[email protected]"
    }
  }
}
```

**After**:
```json
{
  "user_id": 1,
  "user_profile_name": "John",
  "user_profile_email": "[email protected]"
}
```

## Best Practices

### 1. Use Idempotent Connectors

✅ **DO**:
```json
// JDBC Sink with upsert mode
{
  "insert.mode": "upsert",
  "pk.mode": "record_value",
  "pk.fields": "id"
}
```

❌ **DON'T**:
```json
// WRONG: insert mode (duplicates on restart!)
{
  "insert.mode": "insert"
}
```

### 2. Monitor Connector Status

```bash
# Check connector status
curl http://localhost:8083/connectors/jdbc-source-users/status

# Check task status
curl http://localhost:8083/connectors/jdbc-source-users/tasks/0/status
```
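To spot unhealthy connectors at a glance, recent Connect versions accept an `expand=status` query parameter that returns every connector's status in one call. A sketch that assumes `jq` is installed:

```bash
# List all connectors with status, then surface anything not RUNNING
curl -s "http://localhost:8083/connectors?expand=status" | \
  jq 'to_entries[] | select(.value.status.connector.state != "RUNNING") | .key'
```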

### 3. Use Schema Registry

✅ **DO**:
```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
```
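If a pipeline carries plain JSON rather than Avro, the converter must be told not to expect an embedded schema, otherwise deserialization fails complaining about missing `schema` and `payload` fields. A common baseline for schemaless JSON topics:

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```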

### 4. Configure Error Handling

```json
{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "dlq-jdbc-sink",
  "errors.deadletterqueue.context.headers.enable": "true"
}
```
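Failed records land on the dead-letter topic with the failure context written into record headers. A sketch for inspecting them with the console consumer (header printing requires a reasonably recent Kafka CLI):

```bash
# Read the DLQ and print the error-context headers written by Connect
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dlq-jdbc-sink --from-beginning \
  --property print.headers=true
```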

## Connector Management

### Deploy Connector

```bash
# Create connector via REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @jdbc-source.json

# Update connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/config \
  -H "Content-Type: application/json" \
  -d @jdbc-source.json
```

### Monitor Connectors

```bash
# List all connectors
curl http://localhost:8083/connectors

# Get connector info
curl http://localhost:8083/connectors/jdbc-source-users

# Get connector status
curl http://localhost:8083/connectors/jdbc-source-users/status

# Get connector tasks
curl http://localhost:8083/connectors/jdbc-source-users/tasks
```

### Pause/Resume Connectors

```bash
# Pause connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/pause

# Resume connector
curl -X PUT http://localhost:8083/connectors/jdbc-source-users/resume

# Restart connector
curl -X POST http://localhost:8083/connectors/jdbc-source-users/restart

# Restart task
curl -X POST http://localhost:8083/connectors/jdbc-source-users/tasks/0/restart
```
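Removing a connector uses the same REST API. Deleting stops its tasks and removes the configuration; source-connector offsets stay behind in the offsets topic, so re-creating it under the same name resumes where it left off:

```bash
# Delete a connector and its tasks
curl -X DELETE http://localhost:8083/connectors/jdbc-source-users
```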

## Common Issues & Solutions

### Issue 1: Connector Task Failed

**Symptoms**: Task state = FAILED

**Solutions**:
1. Check connector logs: `docker logs connect-worker`
2. Validate the configuration (the endpoint expects a PUT with the connector config as the request body): `curl -X PUT -H "Content-Type: application/json" -d @config.json http://localhost:8083/connector-plugins/<class>/config/validate`
3. Restart task: `curl -X POST .../tasks/0/restart`
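The status endpoint also includes the failed task's stack trace in its `trace` field, which usually points at the root cause. A sketch assuming `jq` is installed:

```bash
# Print the stack trace of any FAILED task for this connector
curl -s http://localhost:8083/connectors/jdbc-source-users/status | \
  jq -r '.tasks[] | select(.state == "FAILED") | .trace'
```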

### Issue 2: Schema Evolution Error

**Error**: `Incompatible schema detected`

**Solution**: Enable auto-evolution:
```json
{
  "auto.create": "true",
  "auto.evolve": "true"
}
```

### Issue 3: JDBC Connection Errors

**Error**: `Could not get JDBC connection`

**Solution**: Tune connection retries and backoff, and lower `tasks.max` if the database limits concurrent connections:
```json
{
  "connection.attempts": "3",
  "connection.backoff.ms": "10000"
}
```

## References

- Kafka Connect Documentation: https://kafka.apache.org/documentation/#connect
- Confluent Hub: https://www.confluent.io/hub/
- Debezium Documentation: https://debezium.io/documentation/
- Transform Reference: https://kafka.apache.org/documentation/#connect_transforms

---

**Invoke me when you need Kafka Connect, connectors, CDC, or data pipeline expertise!**

Overview

This skill is a Kafka Connect integration expert for designing, configuring, and troubleshooting source and sink connectors, SMTs, and CDC pipelines. It focuses on practical connector patterns (JDBC, Debezium, S3, Elasticsearch), data flow best practices, and operational tasks for production systems. Use it to accelerate connector adoption, avoid common pitfalls, and harden pipelines for reliability and observability.

How this skill works

I inspect connector goals, recommend configuration snippets, and explain transform and error-handling options. I map common pipeline patterns (database→Kafka, Kafka→S3, Kafka→Elasticsearch) to connector settings and SMT chains and provide operational commands for deployment, status checks, pause/resume, and task restarts. I also diagnose common failures and propose corrective actions.

When to use it

  • Setting up a JDBC source or sink to stream relational data
  • Implementing Debezium CDC for reliable change capture
  • Archiving topics to S3 or indexing to Elasticsearch
  • Applying SMTs to mask, route, flatten, or timestamp records
  • Troubleshooting connector tasks, schema or connection errors

Best practices

  • Prefer idempotent write modes (upsert, pk-based) for sinks to avoid duplicates
  • Use Schema Registry and Avro/Protobuf converters for strong schema management
  • Configure errors.tolerance and a dead-letter queue for resilient pipelines
  • Monitor connector and task status via the REST API and collect worker logs
  • Cap tasks.max and tune JDBC connection attempt/backoff settings to avoid exhausting database connections

Example use cases

  • Stream user and order tables from PostgreSQL to Kafka using the JDBC source connector in timestamp+incrementing mode
  • Capture MySQL row-level changes with Debezium and publish change envelopes to topics for downstream processing
  • Archive high-volume topics into S3 with time-based partitioning and periodic rotations for cost-effective retention
  • Index application logs into Elasticsearch using the Elasticsearch sink and schema-ignore mode for flexible documents
  • Mask PII fields with MaskField SMT and add processed_at timestamps with InsertField before sinking to analytics

FAQ

How do I reduce duplicate rows when writing to a database sink?

Use upsert insert.mode with pk.mode set to record_value and define pk.fields so the connector performs idempotent writes.

What should I check when a connector task is FAILED?

Review worker logs, call the REST status endpoint for the connector and task, validate connector config via the plugin validate API, then restart the failed task.