
ml-data-pipeline-architecture skill

/plugins/devops-tools/skills/ml-data-pipeline-architecture

This skill helps optimize ML data pipelines with Polars, Arrow, and ClickHouse patterns for memory-efficient, lazy, zero-copy processing.

npx playbooks add skill terrylica/cc-skills --skill ml-data-pipeline-architecture

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
---
name: ml-data-pipeline-architecture
description: Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse. TRIGGERS - data pipeline, polars vs pandas, arrow format, clickhouse ml, efficient loading, zero-copy, memory optimization.
---

# ML Data Pipeline Architecture

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.

**ADR**: [2026-01-22-polars-preference-hook](/docs/adr/2026-01-22-polars-preference-hook.md) (efficiency preferences framework)

> **Note**: A PreToolUse hook enforces Polars preference. To use Pandas, add `# polars-exception: <reason>` at file top.
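
For example, a module that genuinely needs Pandas can opt out with the marker at the very top of the file (a minimal sketch; the marker format comes from the note above, and the reason text is hypothetical):

```python
# polars-exception: legacy scikit-learn pipeline expects pandas DataFrames
import pandas as pd
```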

## When to Use This Skill

Use this skill when:

- Deciding between Polars and Pandas for a data pipeline
- Optimizing memory usage with zero-copy Arrow patterns
- Loading data from ClickHouse into PyTorch DataLoaders
- Implementing lazy evaluation for large datasets
- Migrating existing Pandas code to Polars

---

## 1. Decision Tree: Polars vs Pandas

```
Dataset size?
├─ < 1M rows → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows → Use Polars (required for memory efficiency)

Operations?
├─ Simple transforms → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins → Polars with lazy evaluation
└─ Streaming/chunked → Polars scan_* functions

Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source → Arrow stream → Polars (optimal)
```
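
When the dataset size is not known up front, a cheap lazy scan can supply the row count that drives the first branch of the tree (a sketch; `raw_data.parquet` is a hypothetical path, and the threshold mirrors the tree above):

```python
import polars as pl

def parquet_row_count(path: str) -> int:
    """Row count via a lazy scan, without materializing the data columns."""
    return pl.scan_parquet(path).select(pl.len()).collect().item()

use_polars = parquet_row_count("raw_data.parquet") >= 1_000_000
```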

---

## 2. Zero-Copy Pipeline Architecture

### The Problem with Pandas

```python
# BAD: 3 memory copies
df = pd.read_sql(query, conn)     # Copy 1: DB → pandas
X = df[features].values           # Copy 2: pandas → numpy
tensor = torch.from_numpy(X)      # Copy 3: numpy → tensor
# Peak memory: 3x data size
```

### The Solution with Arrow

```python
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch

client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars")  # Arrow table straight from ClickHouse
df = pl.from_arrow(arrow_table)                          # Zero-copy view
X = df.select(features).to_numpy()                       # Single allocation
tensor = torch.from_numpy(X)                             # View (no copy)
# Peak memory: 1.2x data size
```

---

## 3. ClickHouse Integration Patterns

### Pattern A: Arrow Stream (Recommended)

```python
def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    # clickhouse_connect's query_arrow() already requests the Arrow format,
    # so pass the bare query without a FORMAT clause
    arrow_table = client.query_arrow(query)
    return pl.from_arrow(arrow_table)

# Usage
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
```

### Pattern B: Polars Native (Simpler)

```python
# Polars can also read directly from databases via a driver (ConnectorX/ADBC);
# check the Polars docs for ClickHouse URI support and version requirements
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db"
)
```

### Pattern C: Parquet Export (Batch Jobs)

```python
# For reproducible batch processing. INTO OUTFILE is resolved client-side,
# so run the export with the clickhouse-client CLI rather than the Python client:
#   clickhouse-client --query "SELECT * FROM bars INTO OUTFILE 'data.parquet' FORMAT Parquet"
df = pl.scan_parquet("data.parquet")  # Lazy scan; only the touched columns/rows are read
```

---

## 4. PyTorch DataLoader Integration

### Minimal Change Pattern

```python
import pandas as pd
import polars as pl
import torch
from torch.utils.data import TensorDataset, DataLoader

# Accept both pandas and polars
def prepare_data(df, features: list[str], target: str) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)  # one-time conversion at the boundary

    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()

    return (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float(),
    )

X, y = prepare_data(df, features, target)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)
```

### Custom PolarsDataset (Large Data)

```python
class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)
        x = torch.tensor([row[f][0].as_py() for f in self.features], dtype=torch.float32)
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y
```
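
Wiring the dataset into a standard `DataLoader` (a usage sketch; `df`, `features`, and `target` are assumed to be defined as in the sections above):

```python
dataset = PolarsDataset(df, features=features, target=target)
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)

for x_batch, y_batch in loader:
    ...  # forward/backward pass
```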

---

## 5. Lazy Evaluation Patterns

### Pipeline Composition

```python
# Define transformations lazily (no computation yet)
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        pl.col("close").pct_change().alias("returns"),
        pl.col("volume").log().alias("log_volume"),
    ])
    .select(["timestamp", *features, target])  # keep timestamp for the split below
)

# Execute only when needed
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
```
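
Since `collect()` is called twice on the shared `pipeline` above, the scan and feature expressions are evaluated twice. A sketch using `pl.collect_all()`, which executes several lazy frames together and can reuse the common sub-plan:

```python
train_df, test_df = pl.collect_all([
    pipeline.filter(pl.col("timestamp") < split_date),
    pipeline.filter(pl.col("timestamp") >= split_date),
])
```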

### Streaming for Large Files

```python
# Process the file in chunks (never materializes the full file)
def process_large_file(path: str, chunk_size: int = 100_000):
    lf = pl.scan_parquet(path)
    n_rows = lf.select(pl.len()).collect().item()  # row count from metadata

    for offset in range(0, n_rows, chunk_size):
        batch = lf.slice(offset, chunk_size).collect()  # only this slice is read
        features = compute_features(batch)
        yield features.to_numpy()
```

---

## 6. Schema Validation

### Pydantic for Config

```python
from pydantic import BaseModel, field_validator

class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v):
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v
```
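
A quick usage check (the feature and target names here are hypothetical; only the three required names come from the validator above):

```python
config = FeatureConfig(
    features=["returns_vs", "momentum_z", "atr_pct", "log_volume"],
    target="fwd_return",
)

# A missing required feature surfaces as a pydantic ValidationError:
try:
    FeatureConfig(features=["returns_vs"], target="fwd_return")
except Exception as exc:
    print(exc)  # reports: Missing required features: {...}
```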

### DataFrame Schema Validation

```python
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )
```
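
Called at each pipeline boundary, for example right after the ClickHouse load (a usage sketch reusing `query_arrow()` and the `config` object from above):

```python
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
validate_schema(df, required=config.features + [config.target], stage="post-load")
```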

---

## 7. Performance Benchmarks

| Operation      | Pandas | Polars | Speedup |
| -------------- | ------ | ------ | ------- |
| Read CSV (1GB) | 45s    | 4s     | 11x     |
| Filter rows    | 2.1s   | 0.4s   | 5x      |
| Group-by agg   | 3.8s   | 0.3s   | 13x     |
| Sort           | 5.2s   | 0.4s   | 13x     |
| Memory peak    | 10GB   | 2.5GB  | 4x      |

_Benchmark: 50M rows, 20 columns, MacBook M2_
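
These figures are workload- and hardware-dependent. A minimal harness along these lines can reproduce the group-by comparison on your own data (a sketch; `data.parquet`, `group_key`, and `value` are hypothetical):

```python
import time

import pandas as pd
import polars as pl

def timed(label: str, fn) -> None:
    start = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - start:.2f}s")

pdf = pd.read_parquet("data.parquet")
pldf = pl.read_parquet("data.parquet")

timed("pandas group-by", lambda: pdf.groupby("group_key")["value"].mean())
timed("polars group-by", lambda: pldf.group_by("group_key").agg(pl.col("value").mean()))
```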

---

## 8. Migration Checklist

### Phase 1: Add Arrow Support

- [ ] Add `polars = "<version>"` to dependencies (see [PyPI](https://pypi.org/project/polars/))
- [ ] Implement `query_arrow()` in data client
- [ ] Verify zero-copy with a memory profiler (see the sketch below)
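
A rough way to confirm the load stays near the expected footprint is to watch process RSS around the Arrow path (a sketch assuming `psutil` is installed and `client`/`features` come from the earlier sections; acceptable growth depends on your data size):

```python
import os
import psutil

proc = psutil.Process(os.getpid())
rss_before = proc.memory_info().rss

arrow_table = client.query_arrow("SELECT * FROM bars")
df = pl.from_arrow(arrow_table)        # should add little RSS (zero-copy view)
X = df.select(features).to_numpy()     # roughly one data-sized allocation

print(f"RSS growth: {(proc.memory_info().rss - rss_before) / 1e6:.1f} MB")
```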

### Phase 2: Polars at Entry Points

- [ ] Add `pl.from_pandas()` wrapper at trainer entry
- [ ] Update `prepare_sequences()` to accept both types
- [ ] Add schema validation after conversion

### Phase 3: Full Lazy Evaluation

- [ ] Convert file reads to `pl.scan_*`
- [ ] Compose transformations lazily
- [ ] Call `.collect()` only before `.to_numpy()`

---

## 9. Anti-Patterns to Avoid

### DON'T: Mix APIs Unnecessarily

```python
# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas()  # Why?
```

### DON'T: Collect Too Early

```python
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect()  # Full load
filtered = df.filter(...)  # After the fact

# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()
```

### DON'T: Ignore Memory Pressure

```python
# BAD: Loads entire file eagerly
df = pl.read_parquet("huge_file.parquet")

# GOOD: Scan lazily and process in slices (see process_large_file in section 5)
lf = pl.scan_parquet("huge_file.parquet")
n_rows = lf.select(pl.len()).collect().item()
for offset in range(0, n_rows, 100_000):
    process(lf.slice(offset, 100_000).collect())
```

---

## References

- [Polars User Guide](https://docs.pola.rs/)
- [Polars Migration Guide](https://docs.pola.rs/user-guide/migration/pandas/)
- [Apache Arrow Python](https://arrow.apache.org/docs/python/)
- [ClickHouse Python Client](https://clickhouse.com/docs/integrations/python)
- [PyTorch Data Loading](https://pytorch.org/tutorials/beginner/data_loading_tutorial.html)
- [Polars Preference Hook ADR](/docs/adr/2026-01-22-polars-preference-hook.md)

---

## Troubleshooting

| Issue                       | Cause                            | Solution                                             |
| --------------------------- | -------------------------------- | ---------------------------------------------------- |
| Memory spike during load    | Collecting too early             | Use lazy evaluation, call collect() only when needed |
| Arrow conversion fails      | Unsupported data type            | Check for object columns, convert to native types    |
| ClickHouse connection error | Wrong port or credentials        | Verify host:8123 (HTTP) or host:9000 (native)        |
| Zero-copy not working       | Intermediate pandas conversion   | Remove to_pandas() calls, stay in Arrow/Polars       |
| Polars hook blocking code   | Pandas used without exception    | Add `# polars-exception: reason` comment at file top |
| Slow group-by operations    | Using pandas for large datasets  | Migrate to Polars for 5-10x speedup                  |
| Schema validation failure   | Column names case-sensitive      | Verify exact column names from source                |
| PyTorch DataLoader OOM      | Loading full dataset into memory | Use PolarsDataset with Arrow backing for lazy access |
| Parquet scan performance    | Not using predicate pushdown     | Add filters before collect() for lazy evaluation     |
| Type mismatch in tensor     | Float64 vs Float32 mismatch      | Explicitly cast with .cast(pl.Float32) before numpy  |
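
For the dtype row above, casting in Polars before the NumPy conversion keeps everything in float32 end to end (a small sketch, assuming `df`, `features`, `pl`, and `torch` from the earlier sections):

```python
X = df.select(pl.col(features).cast(pl.Float32)).to_numpy()
tensor = torch.from_numpy(X)  # already float32, no extra .float() copy
```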

## Overview

This skill captures proven architecture patterns for efficient ML data pipelines using Polars, Apache Arrow, and ClickHouse. It explains decision criteria, zero-copy data flows, lazy evaluation, and scalable PyTorch integration to reduce memory use and speed up ETL and training. The guidance is practical, migration-ready, and focused on production constraints.

## How this skill works

The skill describes when to prefer Polars over Pandas and how to wire Arrow-backed transfers from ClickHouse into Polars with minimal copies. The patterns cover Arrow stream ingestion, lazy scan APIs, Parquet batch flows, and a memory-efficient bridge into PyTorch DataLoaders. It also covers schema validation, benchmarks, migration steps, and anti-patterns that flag common traps.

## When to use it

- Choosing Polars vs Pandas based on dataset size and operation type
- Building zero-copy pipelines from ClickHouse into ML training
- Optimizing memory and throughput for datasets above ~1M rows
- Converting batch jobs to lazy, predicate-pushed pipelines
- Feeding large datasets into PyTorch without loading the full dataset into RAM

## Best practices

- Prefer Polars for >1M rows and heavy group-by or join workloads
- Use ClickHouse → Arrow stream → `pl.from_arrow()` for zero-copy ingestion
- Compose transformations with `pl.scan_*` and call `.collect()` only at the end
- Validate schemas early and fail fast with clear error messages
- Avoid round-tripping to pandas; keep data in Arrow/Polars to prevent extra copies

## Example use cases

- Stream large ClickHouse result sets into a Polars-backed training pipeline at ~1.2x peak memory
- Migrate a Pandas-based ETL to Polars using Arrow transfer and lazy scans to cut runtime by 5-10x
- Serve batches to PyTorch using a PolarsDataset that slices an Arrow table for per-row access
- Export reproducible batch inputs to Parquet, process lazily, and apply predicate pushdown
- Implement schema-driven feature validation with Pydantic before model training

## FAQ

**When should I still use Pandas?**

Use Pandas for small datasets (<1M rows), or when you rely on a Pandas-only ecosystem such as certain scikit-learn utilities.

**How do I ensure zero-copy from ClickHouse to tensors?**

Stream the results as Arrow, convert to Polars with `pl.from_arrow()`, select the feature columns, then call `.to_numpy()` followed by `torch.from_numpy()` to avoid intermediate copies.

**What are the common causes of Arrow conversion failures?**

Unsupported object-typed columns or mixed types; convert those columns to native types before Arrow serialization.