This skill helps you scale pandas and NumPy workflows beyond memory by using Dask for parallel and distributed computing across cores and clusters.

npx playbooks add skill eyadsibai/ltk --skill dask

---
name: dask
description: Use when "Dask", "parallel computing", "distributed computing", "larger than memory", or asking about "parallel pandas", "parallel numpy", "out-of-core", "multi-file processing", "cluster computing", "lazy evaluation dataframe"
version: 1.0.0
---

# Dask Parallel and Distributed Computing

Scale pandas/NumPy workflows beyond memory and across clusters.

## When to Use

- Datasets exceed available RAM
- Need to parallelize pandas or NumPy operations
- Processing multiple files efficiently (CSVs, Parquet)
- Building custom parallel workflows
- Distributing workloads across multiple cores/machines

---

## Dask Collections

| Collection | Like | Use Case |
|------------|------|----------|
| **DataFrame** | pandas | Tabular data, CSV/Parquet |
| **Array** | NumPy | Numerical arrays, matrices |
| **Bag** | list | Unstructured data, JSON logs |
| **Delayed** | Custom | Arbitrary Python functions |

**Key concept**: All collections are lazy. Computation happens only when you call `.compute()` or another executing call such as `.persist()` or a writer like `.to_parquet()`.

---

## Lazy Evaluation

| Function | Behavior | Use |
|----------|----------|-----|
| `dd.read_csv()` | Lazy load | Large CSVs |
| `dd.read_parquet()` | Lazy load | Large Parquet |
| Operations | Build graph | Chain transforms |
| `.compute()` | Execute | Get final result |

**Key concept**: Dask builds a task graph of operations, optimizes it, then executes in parallel. Call `.compute()` once at the end, not after every operation.
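
A minimal sketch of the "build the graph, compute once" idea, using a random Dask array as stand-in data (the shapes and variable names are illustrative). Two results share the intermediate `centered`, and a single `dask.compute()` call evaluates both so the shared work is not repeated.

```python
import dask
import dask.array as da

# Building the expression is cheap: no random numbers are generated yet.
x = da.random.random((1000, 1000), chunks=(250, 250))
centered = x - x.mean(axis=0)

# Two lazy results that both depend on `centered`.
col_std = centered.std(axis=0)
max_abs = da.absolute(centered).max()

# One call evaluates both graphs together and reuses shared tasks.
col_std_val, max_abs_val = dask.compute(col_std, max_abs)
```

Calling `col_std.compute()` and `max_abs.compute()` separately would also work, but each call would regenerate and re-center the data.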

---

## Schedulers

| Scheduler | Best For | Start |
|-----------|----------|-------|
| **threaded** | NumPy/Pandas (releases GIL) | Default for DataFrame, Array, Delayed |
| **processes** | Pure Python (GIL bound) | `scheduler='processes'` |
| **synchronous** | Debugging | `scheduler='synchronous'` |
| **distributed** | Monitoring, scaling, clusters | `Client()` |
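
The sketch below shows two ways a single-machine scheduler might be selected: per call through the `scheduler=` keyword, and for a block through `dask.config.set`. The string name for the threaded scheduler is `"threads"`. The array and variable names are illustrative, and the distributed lines are left as comments because they need the separate `distributed` package.

```python
import dask
import dask.array as da

x = da.arange(1_000_000, chunks=100_000, dtype="int64")
total = (x ** 2).sum()

# Per-call selection.
threaded_result = total.compute(scheduler="threads")
debug_result = total.compute(scheduler="synchronous")  # single thread, easy to step through

# Block-scoped selection.
with dask.config.set(scheduler="synchronous"):
    scoped_result = total.compute()

# Distributed scheduler (assumes the `distributed` package is installed):
# from dask.distributed import Client
# client = Client()  # starts a local cluster; later compute() calls use it
```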

### Distributed Scheduler

| Feature | Benefit |
|---------|---------|
| Dashboard | Real-time progress monitoring |
| Cluster scaling | Add/remove workers |
| Fault tolerance | Retry failed tasks |
| Worker resources | Memory management |

---

## Chunking Concepts

### DataFrame Partitions

| Concept | Description |
|---------|-------------|
| **Partition** | Subset of rows (like a mini DataFrame) |
| **npartitions** | Number of partitions |
| **divisions** | Index boundaries between partitions |

### Array Chunks

| Concept | Description |
|---------|-------------|
| **Chunk** | Subset of array (n-dimensional block) |
| **chunks** | Tuple of chunk sizes per dimension |
| **Optimal size** | ~100 MB per chunk |

**Key concept**: Chunk size is critical. Too small = scheduling overhead. Too large = memory issues. Target ~100 MB.
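
To make the ~100 MB target concrete, here is a small sketch of the arithmetic for a `float64` array. The shape and chunk sizes are arbitrary choices for the example. The array is never computed, so no large allocation takes place.

```python
import dask.array as da

# 24_000 x 24_000 float64 values would be about 4.6 GB if materialized.
x = da.zeros((24_000, 24_000), chunks=(3_000, 4_000), dtype="float64")

# Bytes per chunk: rows * cols * 8 bytes = 96 MB, close to the ~100 MB target.
chunk_bytes = 3_000 * 4_000 * x.dtype.itemsize

blocks = x.numblocks  # number of chunks along each dimension

# Halving each chunk dimension gives 4x as many chunks of 24 MB each,
# which means more tasks and more scheduling overhead.
small = x.rechunk((1_500, 2_000))
small_blocks = small.numblocks
```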

---

## DataFrame Operations

### Supported (parallel)

| Category | Operations |
|----------|------------|
| **Selection** | Boolean-mask filtering (`ddf[mask]`), `loc`, column selection |
| **Aggregation** | `groupby`, `sum`, `mean`, `count` |
| **Transforms** | `apply` (row-wise), `map_partitions` |
| **Joins** | `merge`, `join` (shuffles data) |
| **I/O** | `read_csv`, `read_parquet`, `to_parquet` |

### Avoid or Use Carefully

| Operation | Issue | Alternative |
|-----------|-------|-------------|
| `iterrows` | Kills parallelism | `map_partitions` |
| `apply(axis=1)` | Slow | `map_partitions` |
| Repeated `compute()` | Inefficient | Single `compute()` at end |
| `sort_values` | Expensive shuffle | Avoid if possible |

---

## Common Patterns

### ETL Pipeline

1. `dd.read_*` (lazy load)
2. Chain filters and transforms
3. Single `.compute()` or `.to_parquet()`

### Multi-File Processing

| Pattern | Description |
|---------|-------------|
| Glob patterns | `dd.read_csv('data/*.csv')` |
| Partition per file | Natural parallelism |
| Output partitioned | `to_parquet('output/')` |

### Custom Operations

| Method | Use Case |
|--------|----------|
| `map_partitions` | Apply function to each partition |
| `map_blocks` | Apply function to each array block |
| `delayed` | Wrap arbitrary Python functions |
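
A short sketch of `delayed` and `map_blocks` follows. The `load` and `summarize` functions are hypothetical stand-ins for independent, expensive tasks.

```python
import dask
import dask.array as da

@dask.delayed
def load(i):
    # Stand-in for an independent task such as reading one file.
    return list(range(i))

@dask.delayed
def summarize(parts):
    # Receives the concrete results of the delayed inputs.
    return sum(len(p) for p in parts)

parts = [load(i) for i in range(5)]      # five independent lazy tasks
total_len = summarize(parts).compute()   # lengths 0 + 1 + 2 + 3 + 4

# map_blocks applies a NumPy-level function to each chunk of an array.
x = da.arange(8, chunks=4)
doubled = x.map_blocks(lambda block: block * 2, dtype=x.dtype).compute()
```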

---

## Best Practices

| Practice | Why |
|----------|-----|
| Don't load with pandas first | Let Dask read the files lazily |
| Single compute() at end | Avoid redundant computation |
| Use Parquet | Faster than CSV, columnar |
| Match partition to files | One partition per file |
| Check task graph size | `len(ddf.__dask_graph__())` < 100k |
| Use distributed for diagnostics | Dashboard shows progress |
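
One way to check task graph size, sketched here on a small Dask array rather than a DataFrame (the array and its chunking are illustrative):

```python
import dask.array as da

# 1_000 x 1_000 array split into 10 x 10 = 100 chunks.
x = da.ones((1_000, 1_000), chunks=(100, 100))
y = (x + 1).sum()

# Number of tasks in the (unoptimized) graph behind `y`.
n_tasks = len(y.__dask_graph__())
```

Each chunk contributes several tasks (creation, addition, partial sum), so the count grows quickly as chunks get smaller.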

---

## Common Pitfalls

| Pitfall | Solution |
|---------|----------|
| Loading with pandas first | Use `dd.read_*` directly |
| compute() in loops | Collect all, single compute() |
| Too many partitions | Repartition to ~100 MB each |
| Memory errors | Reduce chunk size, add workers |
| Slow shuffles | Avoid sorts/joins when possible |
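
The "compute() in loops" pitfall and its fix might look like this; the array and the loop body are placeholders.

```python
import dask
import dask.array as da

x = da.arange(1_000, chunks=100)

# Pitfall: one scheduler run per iteration.
# slow = [(x * k).sum().compute() for k in range(3)]

# Preferred: collect the lazy results, then evaluate them together.
lazy = [(x * k).sum() for k in range(3)]
results = dask.compute(*lazy)  # returns a tuple, one value per input
```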

---

## vs Alternatives

| Tool | Best For | Trade-off |
|------|----------|-----------|
| **Dask** | Scale pandas/NumPy, clusters | Setup complexity |
| **Polars** | Fast in-memory | Must fit in RAM |
| **Vaex** | Out-of-core single machine | Limited operations |
| **Spark** | Enterprise, SQL-heavy | Infrastructure |

## Resources

- Docs: <https://docs.dask.org/>
- Best Practices: <https://docs.dask.org/en/stable/best-practices.html>
- Examples: <https://examples.dask.org/>

Overview

This skill explains how to use Dask to scale pandas and NumPy workflows beyond single-machine memory and across clusters. It focuses on lazy, parallel collections (DataFrame, Array, Bag, Delayed) and practical patterns for processing large, multi-file datasets. The goal is clear guidance for building efficient, out-of-core and distributed pipelines.

How this skill works

Dask represents computations as lazy task graphs built from high-level collections. You construct operations (read, filter, transform) which Dask optimizes and then executes in parallel when you call .compute() or write results to disk. Dask can run with local threaded or process schedulers or use the distributed Client for cluster management, monitoring, and scaling. Chunking (partitions or array chunks) determines memory and scheduling behavior and should be tuned for ~100 MB per chunk.

When to use it

  • Datasets exceed available RAM or are larger-than-memory
  • Parallelize pandas or NumPy code without rewriting core logic
  • Process many files efficiently (CSV, Parquet) with natural partitioning
  • Build custom parallel workflows or wrap Python functions for distributed execution
  • Use a cluster for scaling, monitoring, and fault-tolerant execution

Best practices

  • Use dd.read_* and lazy APIs — don’t load with pandas first
  • Design a pipeline of lazy transforms and call a single .compute() or to_parquet() at the end
  • Prefer Parquet over CSV for speed and columnar access
  • Target ~100 MB per partition/chunk to balance overhead and memory
  • Use map_partitions/map_blocks or delayed for custom operations instead of iterrows or apply(axis=1)
  • Use the distributed scheduler and dashboard for monitoring, debugging, and scaling

Example use cases

  • ETL of multi-gigabyte logs: dd.read_csv('logs/*.csv'), filter, transform, to_parquet('out/')
  • Out-of-core numerical analysis: convert NumPy workflows to dask.array and run with threaded scheduler
  • Parallel joins and aggregations on partitioned Parquet data using Dask DataFrame
  • Wrap legacy functions with dask.delayed to parallelize independent tasks across workers
  • Build a cluster to scale model training preprocessing across many machines and inspect progress in the dashboard

FAQ

How do I pick partition or chunk size?

Aim for roughly 100 MB per partition or chunk. If tasks are too small you’ll see scheduling overhead; if too large you risk memory errors. Repartition or rechunk to adjust.

When should I use the distributed scheduler?

Use it when you need monitoring, scaling across machines, fault tolerance, or to manage worker resources. The local threaded/process schedulers are fine for single-machine use.