
This skill helps you design and optimize data pipelines, warehouses, and ETL/ELT processes using modern data stack practices.

npx playbooks add skill eyadsibai/ltk --skill data-engineering

---
name: data-engineering
description: Use when "data pipelines", "ETL", "data warehousing", "data lakes", or asking about "Airflow", "Spark", "dbt", "Snowflake", "BigQuery", "data modeling"
version: 1.0.0
---

<!-- Adapted from: claude-skills/engineering-team/senior-data-engineer -->

# Data Engineering Guide

Data pipelines, warehousing, and modern data stack.

## When to Use

- Building data pipelines
- Designing data warehouses
- Implementing ETL/ELT processes
- Setting up data lakes
- Optimizing data infrastructure

## Modern Data Stack

### Components

```
Sources → Ingestion → Storage → Transform → Serve → Consume
```

| Layer | Tools |
|-------|-------|
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |

## Data Pipeline Patterns

### Batch Processing

```python
# Airflow DAG example (Airflow 2.4+; older releases use schedule_interval)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    # Extract from source
    pass


def transform():
    # Transform data
    pass


def load():
    # Load to warehouse
    pass


with DAG(
    dag_id='daily_etl',
    schedule='0 6 * * *',            # daily at 06:00 (UTC unless configured otherwise)
    start_date=datetime(2024, 1, 1),
    catchup=False,                   # do not backfill every run since start_date
) as dag:
    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = PythonOperator(task_id='load', python_callable=load)

    # Run the three steps strictly in order
    extract_task >> transform_task >> load_task
```

### Streaming Processing

```python
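# Kafka consumer example (kafka-python)
import json

from kafka import KafkaConsumer


def process_event(event):
    # Placeholder: validate, transform, and write the event downstream
    print(event)


consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='events-etl',           # assumed group name; lets offsets be committed
    auto_offset_reset='earliest',    # read from the oldest message on first run
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    process_event(message.value)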
```

## dbt Patterns

### Model Structure

```
models/
├── staging/           # 1:1 with source
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/      # Business logic
│   └── int_order_items.sql
└── marts/             # Final models
    ├── dim_customers.sql
    └── fct_orders.sql
```

### Example Model

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('int_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
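-- >= re-reads the latest loaded day so late rows are not missed; unique_key deduplicates on merge
where o.order_date >= (select max(order_date) from {{ this }})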
{% endif %}
group by 1, 2, 3
```
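
The incremental pattern in the model above can be sketched outside dbt as a high-water mark plus a merge on the unique key. This is a minimal plain-Python illustration, not how dbt implements it; `merge_incremental` and the sample rows are hypothetical. It filters with `>=` so rows that share the latest loaded date are reprocessed, which is safe because the merge overwrites by key.

```python
from datetime import date


def merge_incremental(target, source, unique_key="order_id", cursor="order_date"):
    """Merge source rows into target (a dict of key -> row) incrementally."""
    # High-water mark: latest cursor value already loaded (None on the first run).
    high_water = max((row[cursor] for row in target.values()), default=None)
    for row in source:
        # First run loads everything; later runs only rows at or after the mark.
        if high_water is None or row[cursor] >= high_water:
            target[row[unique_key]] = row  # insert, or overwrite the same key
    return target


# Hypothetical sample data
target = {}
day1 = [{"order_id": 1, "order_date": date(2024, 1, 1), "order_total": 10.0}]
day2 = day1 + [{"order_id": 2, "order_date": date(2024, 1, 2), "order_total": 25.0}]

merge_incremental(target, day1)
merge_incremental(target, day2)
merge_incremental(target, day2)  # re-running does not duplicate rows
print(sorted(target))
```

Because rows are keyed by `order_id`, running the same batch twice leaves the target unchanged.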

## Data Modeling

### Dimensional Modeling

```
Fact Tables (events/transactions)
├── fct_orders
├── fct_page_views
└── fct_transactions

Dimension Tables (context)
├── dim_customers
├── dim_products
├── dim_dates
└── dim_locations
```

### Star Schema

```
        dim_customers
              │
dim_dates ── fct_orders ── dim_products
              │
        dim_locations
```
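
To make the star shape concrete, the sketch below builds a tiny version of it in an in-memory SQLite database and runs one typical query: measures come from the fact table, labels come from the dimensions. Only two of the four dimensions are included, and the `region` and `category` columns and all sample rows are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_customers (customer_id INTEGER PRIMARY KEY, region TEXT);
    CREATE TABLE dim_products  (product_id  INTEGER PRIMARY KEY, category TEXT);
    CREATE TABLE fct_orders (
        order_id    INTEGER PRIMARY KEY,
        customer_id INTEGER REFERENCES dim_customers (customer_id),
        product_id  INTEGER REFERENCES dim_products (product_id),
        order_total REAL
    );
    INSERT INTO dim_customers VALUES (1, 'EMEA'), (2, 'APAC');
    INSERT INTO dim_products  VALUES (10, 'books'), (11, 'games');
    INSERT INTO fct_orders VALUES
        (100, 1, 10, 20.0), (101, 1, 11, 30.0), (102, 2, 10, 5.0);
""")

# The fact table holds measures and foreign keys; each dimension is one join away.
rows = conn.execute("""
    SELECT c.region, p.category, SUM(f.order_total) AS revenue
    FROM fct_orders f
    JOIN dim_customers c ON c.customer_id = f.customer_id
    JOIN dim_products  p ON p.product_id  = f.product_id
    GROUP BY c.region, p.category
    ORDER BY c.region, p.category
""").fetchall()
print(rows)
```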

## Data Quality

### Validation Rules

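```yaml
# dbt tests (declared in a YAML properties file, e.g. schema.yml)
version: 2

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - positive_value  # custom generic test; not built into dbt
```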

### Quality Metrics

| Metric | Description |
|--------|-------------|
| Completeness | % non-null values |
| Uniqueness | % distinct values |
| Timeliness | Data freshness |
| Accuracy | Matches source |
| Consistency | Across systems |
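
The first two metrics in the table are simple ratios. A minimal sketch of how they might be computed over rows held as dicts follows; the function names, the treatment of empty input, and the sample data are assumptions, and in practice these checks usually run as SQL in the warehouse.

```python
def completeness(rows, column):
    """Share of rows where column is present and not None (0.0 to 1.0)."""
    if not rows:
        return 1.0  # assumption: an empty table counts as complete
    non_null = sum(1 for row in rows if row.get(column) is not None)
    return non_null / len(rows)


def uniqueness(rows, column):
    """Share of non-null values in column that are distinct (0.0 to 1.0)."""
    values = [row[column] for row in rows if row.get(column) is not None]
    if not values:
        return 1.0  # assumption: no values means nothing is duplicated
    return len(set(values)) / len(values)


# Hypothetical sample: one null customer_id, one duplicated order_id
orders = [
    {"order_id": 1, "customer_id": "a"},
    {"order_id": 2, "customer_id": None},
    {"order_id": 2, "customer_id": "b"},
    {"order_id": 4, "customer_id": "b"},
]
print(completeness(orders, "customer_id"), uniqueness(orders, "order_id"))
```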

## Performance Optimization

### Partitioning

```sql
-- BigQuery partitioned table (DATE() assumes order_date is a TIMESTAMP or DATETIME)
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders
```

### Query Optimization

| Technique | Impact |
|-----------|--------|
| Partitioning | Reduce scanned data |
| Clustering | Improve filter speed |
| Materialization | Pre-compute joins |
| Caching | Reduce repeat queries |
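
Why partitioning reduces scanned data can be shown with a toy model: rows are bucketed by date, and a query with a date filter reads only the matching buckets. This is an illustrative sketch, not how any warehouse stores data; `partition_by_date`, `scan`, and the generated rows are hypothetical.

```python
from collections import defaultdict
from datetime import date


def partition_by_date(rows, column="order_date"):
    """Group rows into one bucket per date, mimicking daily partitions."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[column]].append(row)
    return partitions


def scan(partitions, start, end):
    """Read only partitions dated within [start, end].

    Returns the matching rows and the number of rows actually read.
    """
    scanned = 0
    result = []
    for day, bucket in partitions.items():
        if start <= day <= end:  # partition pruning: other days are never read
            scanned += len(bucket)
            result.extend(bucket)
    return result, scanned


# 100 hypothetical orders spread evenly over ten days
rows = [{"order_id": i, "order_date": date(2024, 1, 1 + i % 10)} for i in range(100)]
partitions = partition_by_date(rows)
result, scanned = scan(partitions, date(2024, 1, 1), date(2024, 1, 2))
print(f"read {scanned} of {len(rows)} rows")
```

Without the date buckets, the same filter would have to examine all 100 rows.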

## Monitoring

### Pipeline Metrics

| Metric | Alert Threshold |
|--------|-----------------|
| Runtime | >2x normal |
| Row count | ±20% variance |
| Freshness | >SLA |
| Failures | Any failure |
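
The thresholds in the table translate directly into a small check that could run after each pipeline execution. The sketch below is one possible shape; `check_run`, its parameters, and the sample numbers are assumptions, and the baselines ("normal" runtime and row count) are presumed to come from run history.

```python
from datetime import timedelta


def check_run(runtime, row_count, data_age, failed, *,
              normal_runtime, normal_rows, freshness_sla):
    """Return the names of the alerts a run would trigger."""
    alerts = []
    if failed:
        alerts.append("failure")                      # any failure alerts
    if runtime > 2 * normal_runtime:
        alerts.append("runtime")                      # more than 2x normal
    if normal_rows and abs(row_count - normal_rows) / normal_rows > 0.20:
        alerts.append("row_count")                    # outside +/-20%
    if data_age > freshness_sla:
        alerts.append("freshness")                    # older than the SLA
    return alerts


alerts = check_run(
    runtime=timedelta(minutes=50),
    row_count=700,
    data_age=timedelta(hours=3),
    failed=False,
    normal_runtime=timedelta(minutes=20),
    normal_rows=1000,
    freshness_sla=timedelta(hours=6),
)
print(alerts)
```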

### Data Observability

```yaml
# Illustrative pseudo-config in the style of Monte Carlo or Elementary; not either tool's exact syntax
monitors:
  - table: fct_orders
    tests:
      - freshness:
          threshold: 6 hours
      - volume:
          threshold: 10%
      - schema_change: true
```
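
A schema-change monitor boils down to comparing the columns a table is expected to have with the columns it has now. The following is a rough sketch of that comparison, assuming schemas are available as `{column: type}` mappings; `schema_diff` and the sample schemas are hypothetical and do not reflect any tool's API.

```python
def schema_diff(expected, actual):
    """Compare two {column: type} mappings and report what changed."""
    shared = set(expected) & set(actual)
    return {
        "added": sorted(set(actual) - set(expected)),
        "removed": sorted(set(expected) - set(actual)),
        "retyped": sorted(col for col in shared if expected[col] != actual[col]),
    }


# Hypothetical snapshots of fct_orders taken on two different days
expected = {"order_id": "INT64", "order_total": "NUMERIC", "order_date": "DATE"}
actual = {"order_id": "INT64", "order_total": "FLOAT64", "coupon_code": "STRING"}

diff = schema_diff(expected, actual)
print(diff)
```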

## Best Practices

### Pipeline Design

- Idempotent operations
- Incremental processing
- Clear data lineage
- Automated testing
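
One common way to make a load idempotent is to overwrite a whole partition in a single transaction: delete the rows for the run date, then insert the fresh batch. The sketch below shows this with SQLite so it runs anywhere; `load_partition`, the table layout, and the sample batch are assumptions for illustration.

```python
import sqlite3


def load_partition(conn, run_date, rows):
    """Replace all rows for run_date atomically (delete, then insert)."""
    with conn:  # commits on success, rolls back if anything raises
        conn.execute("DELETE FROM fct_orders WHERE order_date = ?", (run_date,))
        conn.executemany(
            "INSERT INTO fct_orders (order_id, order_date, order_total) "
            "VALUES (?, ?, ?)",
            [(r["order_id"], run_date, r["order_total"]) for r in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fct_orders (order_id INTEGER, order_date TEXT, order_total REAL)"
)

batch = [{"order_id": 1, "order_total": 10.0}, {"order_id": 2, "order_total": 25.0}]
load_partition(conn, "2024-01-01", batch)
load_partition(conn, "2024-01-01", batch)  # retry or backfill of the same day

count = conn.execute("SELECT COUNT(*) FROM fct_orders").fetchone()[0]
print(count)
```

A retried or backfilled run for the same date leaves the table in the same state as a single run, which is what makes scheduler retries safe.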

### Data Governance

- Document all models
- Track data lineage
- Implement access controls
- Version control SQL

### Cost Management

- Monitor query costs
- Use partitioning
- Schedule off-peak
- Archive old data

Overview

This skill helps design, build, and operate data pipelines, warehouses, and lakes across modern data stacks. It covers ingestion, storage, transformation, orchestration, serving, and observability using tools like Airflow, Spark, dbt, Snowflake, and BigQuery. Use it to translate business needs into reliable, maintainable ETL/ELT systems.

How this skill works

The skill covers common pipeline patterns (batch and streaming), data modeling approaches (dimensional modeling and star schemas), and transform frameworks (dbt and Spark). It provides practical examples for orchestration, partitioning, query optimization, testing, and monitoring to ensure data quality and performance. It also recommends governance and cost-management practices for production systems.

When to use it

  • Designing end-to-end data pipelines (ingest → transform → serve)
  • Implementing ETL/ELT with Airflow, dbt, or Spark
  • Modeling data warehouses and defining fact/dimension tables
  • Setting up data lakes, partitioning, and storage strategies
  • Building monitoring and data quality checks for production pipelines

Best practices

  • Make operations idempotent and prefer incremental processing to limit cost and runtime
  • Organize dbt models into staging → intermediate → marts for clarity and lineage
  • Apply partitioning and clustering to reduce scanned data and speed filters
  • Automate tests and freshness checks; fail fast on schema drift or large volume variance
  • Document models, track lineage, keep SQL under version control, and enforce access controls

Example use cases

  • Create a daily Airflow DAG that extracts source data, runs dbt transforms, and loads marts to BigQuery
  • Design a star-schema warehouse with fct_orders and dim_customers to support BI dashboards
  • Implement streaming consumers for event data with Kafka and near-real-time transformation into a serving table
  • Add dbt tests and Monte Carlo/Elementary monitors for freshness, volume, and schema changes
  • Optimize a large table by partitioning on order_date and clustering by customer_id to lower query costs

FAQ

What pipeline pattern should I pick: batch or streaming?

Choose batch for periodic, high-throughput workloads where a delay of minutes to hours between an event and its appearance in the warehouse is acceptable. Use streaming when you need low-latency event processing or real-time analytics.

How do I keep costs under control on cloud warehouses?

Monitor query usage, use partitioning and clustering, materialize heavy joins, schedule expensive workloads off-peak, and archive old partitions or cold data to cheaper storage.