
This skill helps you scale data pipelines with Dask to process data larger than memory through parallel DataFrames, arrays, and workflows.

npx playbooks add skill microck/ordinary-claude-skills --skill dask

---
name: dask
description: "Parallel/distributed computing. Scale pandas/NumPy beyond memory, parallel DataFrames/Arrays, multi-file processing, task graphs, for larger-than-RAM datasets and parallel workflows."
---

# Dask

## Overview

Dask is a Python library for parallel and distributed computing that enables three critical capabilities:
- **Larger-than-memory execution** on single machines for data exceeding available RAM
- **Parallel processing** for improved computational speed across multiple cores
- **Distributed computation** supporting terabyte-scale datasets across multiple machines

Dask scales from laptops (processing ~100 GiB) to clusters (processing ~100 TiB) while maintaining familiar Python APIs.

## When to Use This Skill

Use this skill to:
- Process datasets that exceed available RAM
- Scale pandas or NumPy operations to larger datasets
- Parallelize computations for performance improvements
- Process multiple files efficiently (CSVs, Parquet, JSON, text logs)
- Build custom parallel workflows with task dependencies
- Distribute workloads across multiple cores or machines

## Core Capabilities

Dask provides five main components, each suited to different use cases:

### 1. DataFrames - Parallel Pandas Operations

**Purpose**: Scale pandas operations to larger datasets through parallel processing.

**When to Use**:
- Tabular data exceeds available RAM
- Need to process multiple CSV/Parquet files together
- Pandas operations are slow and need parallelization
- Scaling from pandas prototype to production

**Reference Documentation**: For comprehensive guidance on Dask DataFrames, refer to `references/dataframes.md` which includes:
- Reading data (single files, multiple files, glob patterns)
- Common operations (filtering, groupby, joins, aggregations)
- Custom operations with `map_partitions`
- Performance optimization tips
- Common patterns (ETL, time series, multi-file processing)

**Quick Example**:
```python
import dask.dataframe as dd

# Read multiple files as single DataFrame
ddf = dd.read_csv('data/2024-*.csv')

# Operations are lazy until compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category')['value'].mean().compute()
```

**Key Points**:
- Operations are lazy (build task graph) until `.compute()` called
- Use `map_partitions` for efficient custom operations
- Convert to DataFrame early when working with structured data from other sources

### 2. Arrays - Parallel NumPy Operations

**Purpose**: Extend NumPy capabilities to datasets larger than memory using blocked algorithms.

**When to Use**:
- Arrays exceed available RAM
- NumPy operations need parallelization
- Working with scientific datasets (HDF5, Zarr, NetCDF)
- Need parallel linear algebra or array operations

**Reference Documentation**: For comprehensive guidance on Dask Arrays, refer to `references/arrays.md` which includes:
- Creating arrays (from NumPy, random, from disk)
- Chunking strategies and optimization
- Common operations (arithmetic, reductions, linear algebra)
- Custom operations with `map_blocks`
- Integration with HDF5, Zarr, and XArray

**Quick Example**:
```python
import dask.array as da

# Create large array with chunks (3500 x 3500 float64 chunks are ~98 MB each)
x = da.random.random((100000, 100000), chunks=(3500, 3500))

# Operations are lazy
y = x + 100
z = y.mean(axis=0)

# Compute result
result = z.compute()
```

**Key Points**:
- Chunk size is critical (aim for ~100 MB per chunk)
- Operations work on chunks in parallel
- Rechunk data when needed for efficient operations
- Use `map_blocks` for operations not available in Dask
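
As a rough sketch of the chunking points above, the snippet below estimates bytes per chunk, rechunks into larger blocks, and applies a block-wise function. The array is deliberately small so it runs quickly, and the helper name `scale` is an assumption for the example.

```python
import math
import dask.array as da

x = da.ones((400, 400), chunks=(100, 100))  # float64, 4 x 4 blocks

# Bytes per chunk = elements per chunk * bytes per element
chunk_bytes = math.prod(x.chunksize) * x.dtype.itemsize

# Rechunk into fewer, larger blocks
coarser = x.rechunk((200, 200))

def scale(block):
    # Hypothetical helper: receives one NumPy array per block
    return block * 2

total = coarser.map_blocks(scale).sum().compute()
```

The same arithmetic applied to a real array indicates whether its chunks are near the ~100 MB target.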

### 3. Bags - Parallel Processing of Unstructured Data

**Purpose**: Process unstructured or semi-structured data (text, JSON, logs) with functional operations.

**When to Use**:
- Processing text files, logs, or JSON records
- Data cleaning and ETL before structured analysis
- Working with Python objects that don't fit array/dataframe formats
- Need memory-efficient streaming processing

**Reference Documentation**: For comprehensive guidance on Dask Bags, refer to `references/bags.md` which includes:
- Reading text and JSON files
- Functional operations (map, filter, fold, groupby)
- Converting to DataFrames
- Common patterns (log analysis, JSON processing, text processing)
- Performance considerations

**Quick Example**:
```python
import dask.bag as db
import json

# Read and parse JSON files
bag = db.read_text('logs/*.json').map(json.loads)

# Filter and transform
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})

# Convert to DataFrame for analysis
ddf = processed.to_dataframe()
```

**Key Points**:
- Use for initial data cleaning, then convert to DataFrame/Array
- Use `foldby` instead of `groupby` for better performance
- Operations are streaming and memory-efficient
- Convert to structured formats (DataFrame) for complex operations

### 4. Futures - Task-Based Parallelization

**Purpose**: Build custom parallel workflows with fine-grained control over task execution and dependencies.

**When to Use**:
- Building dynamic, evolving workflows
- Need immediate task execution (not lazy)
- Computations depend on runtime conditions
- Implementing custom parallel algorithms
- Need stateful computations

**Reference Documentation**: For comprehensive guidance on Dask Futures, refer to `references/futures.md` which includes:
- Setting up distributed client
- Submitting tasks and working with futures
- Task dependencies and data movement
- Advanced coordination (queues, locks, events, actors)
- Common patterns (parameter sweeps, dynamic tasks, iterative algorithms)

**Quick Example**:
```python
from dask.distributed import Client

client = Client()  # Create local cluster

# Submit tasks (executes immediately)
def process(x):
    return x ** 2

futures = client.map(process, range(100))

# Gather results
results = client.gather(futures)

client.close()
```

**Key Points**:
- Requires distributed client (even for single machine)
- Tasks execute immediately when submitted
- Pre-scatter large data to avoid repeated transfers
- ~1ms overhead per task (not suitable for millions of tiny tasks)
- Use actors for stateful workflows
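
The sketch below illustrates immediate execution and task dependencies with futures. It assumes the `distributed` package is installed. `processes=False` and `dashboard_address=None` are choices made for this example to keep everything in one process without a dashboard, and the function names are hypothetical.

```python
from dask.distributed import Client

def increment(x):
    return x + 1

def double(x):
    return x * 2

# Thread-based local cluster in this process, dashboard disabled
with Client(processes=False, dashboard_address=None) as client:
    first = client.submit(increment, 10)   # starts running right away
    second = client.submit(double, first)  # takes a future as input, no manual wait
    squares = client.map(double, range(5))

    chained = second.result()              # blocks until the chain finishes
    mapped = client.gather(squares)
```

Passing a future as an argument lets the scheduler track the dependency, and the intermediate value stays on the workers.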

### 5. Schedulers - Execution Backends

**Purpose**: Control how and where Dask tasks execute (threads, processes, distributed).

**When to Choose Scheduler**:
- **Threads** (default): NumPy/Pandas operations, GIL-releasing libraries, shared memory benefit
- **Processes**: Pure Python code, text processing, GIL-bound operations
- **Synchronous**: Debugging with pdb, profiling, understanding errors
- **Distributed**: Need dashboard, multi-machine clusters, advanced features

**Reference Documentation**: For comprehensive guidance on Dask Schedulers, refer to `references/schedulers.md` which includes:
- Detailed scheduler descriptions and characteristics
- Configuration methods (global, context manager, per-compute)
- Performance considerations and overhead
- Common patterns and troubleshooting
- Thread configuration for optimal performance

**Quick Example**:
```python
import dask
import dask.bag as db
import dask.dataframe as dd
from dask.distributed import Client

# Note: python_function, problematic_computation, and computation are
# placeholders for your own function and Dask collections.

# Use threads for DataFrame (default, good for numeric)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute()  # Uses threads

# Use processes for Python-heavy work
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')

# Use synchronous for debugging (scoped so the setting does not leak)
with dask.config.set(scheduler='synchronous'):
    result3 = problematic_computation.compute()  # Can use pdb

# Use distributed for monitoring and scaling
client = Client()
result4 = computation.compute()  # Uses distributed with dashboard
```

**Key Points**:
- Threads: Lowest overhead (~50 µs/task), best for numeric work
- Processes: Avoids GIL (~10 ms/task), best for Python work
- Distributed: Monitoring dashboard (~1 ms/task), scales to clusters
- Can switch schedulers per computation or globally
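
As a small sketch of switching schedulers, the snippet below runs the same lazy computation with a per-call setting and inside a context manager. The array is a toy example chosen so it runs instantly.

```python
import dask
import dask.array as da

total = da.arange(10, chunks=5).sum()  # lazy: nothing has run yet

# Per-computation: pass the scheduler to compute()
per_call = total.compute(scheduler="threads")

# Scoped: applies only inside the with-block
with dask.config.set(scheduler="synchronous"):
    in_context = total.compute()

# Global (not executed here): dask.config.set(scheduler="threads")
```

The result is the same in every case. Only where and how the tasks execute changes.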

## Best Practices

For comprehensive performance optimization guidance, memory management strategies, and common pitfalls to avoid, refer to `references/best-practices.md`. Key principles include:

### Start with Simpler Solutions
Before using Dask, explore:
- Better algorithms
- Efficient file formats (Parquet instead of CSV)
- Compiled code (Numba, Cython)
- Data sampling

### Critical Performance Rules

**1. Don't Load Data Locally Then Hand to Dask**
```python
import pandas as pd
import dask.dataframe as dd

# Wrong: Loads all data in memory first
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)

# Correct: Let Dask handle loading
ddf = dd.read_csv('large.csv')
```

**2. Avoid Repeated compute() Calls**
```python
import dask

# dask_computation and items are placeholders for your own lazy work

# Wrong: Each compute is separate
for item in items:
    result = dask_computation(item).compute()

# Correct: Single compute for all
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
```
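
One reason a single `dask.compute` call helps is that shared intermediate tasks run only once. The sketch below illustrates this with `dask.delayed`. The function name and the `load_calls` counter are assumptions for the example, and the synchronous scheduler keeps the counter in the same process.

```python
import dask

load_calls = []

@dask.delayed
def load_numbers():
    # Stands in for an expensive shared step, such as reading a file
    load_calls.append("loaded")
    return list(range(10))

numbers = load_numbers()
total = dask.delayed(sum)(numbers)
largest = dask.delayed(max)(numbers)

# Both results depend on `numbers`; one compute call shares that task
total_value, largest_value = dask.compute(total, largest, scheduler="synchronous")
```

Calling `total.compute()` and `largest.compute()` separately would instead run the shared loading step once per call.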

**3. Don't Build Excessively Large Task Graphs**
- Increase chunk sizes if millions of tasks
- Use `map_partitions`/`map_blocks` to fuse operations
- Check task graph size: `len(ddf.__dask_graph__())`
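
To illustrate how chunk size drives graph size, the sketch below builds the same computation with fine and coarse chunks and compares task counts. Exact counts vary between Dask versions, so only the relative size is meaningful. The helper `graph_size` is hypothetical.

```python
import dask.array as da

def graph_size(chunk_length):
    # Same computation each time, differing only in chunk length
    x = da.ones(10_000, chunks=chunk_length)
    y = (x + 1).sum()
    return len(y.__dask_graph__())

fine = graph_size(10)       # 1,000 chunks
coarse = graph_size(1_000)  # 10 chunks
```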

**4. Choose Appropriate Chunk Sizes**
- Target: ~100 MB per chunk, small enough that roughly 10 chunks per core fit in worker memory
- Too large: Memory overflow
- Too small: Scheduling overhead
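
The chunk-size target can be turned into concrete numbers with a little arithmetic. The sketch below assumes square `float64` chunks and a hypothetical 8-core worker, and it reads the 10-chunks-per-core guideline as a rough memory budget rather than a hard rule.

```python
import math

TARGET_CHUNK_BYTES = 100_000_000  # ~100 MB target per chunk
ITEMSIZE = 8                      # bytes per float64 element

# Largest square chunk edge that stays under the target
side = math.isqrt(TARGET_CHUNK_BYTES // ITEMSIZE)
chunk_bytes = side * side * ITEMSIZE

cores = 8  # hypothetical worker size
working_set_bytes = 10 * cores * chunk_bytes  # about 8 GB for this example
```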

**5. Use the Dashboard**
```python
from dask.distributed import Client
client = Client()
print(client.dashboard_link)  # Monitor performance, identify bottlenecks
```

## Common Workflow Patterns

### ETL Pipeline
```python
import dask.dataframe as dd

# Extract: Read data
ddf = dd.read_csv('raw_data/*.csv')

# Transform: Clean and process
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])

# Load: Aggregate and save
# Select the column before agg() so the result has flat column names ('sum', 'mean')
summary = ddf.groupby('category')['amount'].agg(['sum', 'mean'])
summary.to_parquet('output/summary.parquet')
```

### Unstructured to Structured Pipeline
```python
import dask.bag as db
import json

# Start with Bag for unstructured data
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')

# Convert to DataFrame for structured analysis
ddf = bag.to_dataframe()
# Assumes each record carries a numeric 'value' field
result = ddf.groupby('category')['value'].mean().compute()
```

### Large-Scale Array Computation
```python
import dask.array as da

# Load or create large array
x = da.from_zarr('large_dataset.zarr')

# Process in chunks
normalized = (x - x.mean()) / x.std()

# Save result
da.to_zarr(normalized, 'normalized.zarr')
```

### Custom Parallel Workflow
```python
from dask.distributed import Client

client = Client()

# Scatter large dataset once
data = client.scatter(large_dataset)

# Process in parallel with dependencies
futures = []
for param in parameters:
    future = client.submit(process, data, param)
    futures.append(future)

# Gather results
results = client.gather(futures)
```

## Selecting the Right Component

Use this decision guide to choose the appropriate Dask component:

**Data Type**:
- Tabular data → **DataFrames**
- Numeric arrays → **Arrays**
- Text/JSON/logs → **Bags** (then convert to DataFrame)
- Custom Python objects → **Bags** or **Futures**

**Operation Type**:
- Standard pandas operations → **DataFrames**
- Standard NumPy operations → **Arrays**
- Custom parallel tasks → **Futures**
- Text processing/ETL → **Bags**

**Control Level**:
- High-level, automatic → **DataFrames/Arrays**
- Low-level, manual → **Futures**

**Workflow Type**:
- Static computation graph → **DataFrames/Arrays/Bags**
- Dynamic, evolving → **Futures**

## Integration Considerations

### File Formats
- **Efficient**: Parquet, HDF5, Zarr (columnar, compressed, parallel-friendly)
- **Compatible but slower**: CSV (use for initial ingestion only)
- **For Arrays**: HDF5, Zarr, NetCDF

### Conversion Between Collections
```python
# Bag → DataFrame
ddf = bag.to_dataframe()

# DataFrame → Array (for numeric data)
arr = ddf.to_dask_array(lengths=True)

# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
```

### With Other Libraries
- **XArray**: Wraps Dask arrays with labeled dimensions (geospatial, imaging)
- **Dask-ML**: Machine learning with scikit-learn compatible APIs
- **Distributed**: Advanced cluster management and monitoring

## Debugging and Development

### Iterative Development Workflow

1. **Test on small data with synchronous scheduler**:
```python
import dask

dask.config.set(scheduler='synchronous')
result = computation.compute()  # Can use pdb, easy debugging
```

2. **Validate with threads on sample**:
```python
sample = ddf.head(1000)  # Returns a small pandas DataFrame from the first partition
# Test logic on the sample, then scale to the full dataset
```

3. **Scale with distributed for monitoring**:
```python
from dask.distributed import Client
client = Client()
print(client.dashboard_link)  # Monitor performance
result = computation.compute()
```

### Common Issues

**Memory Errors**:
- Decrease chunk sizes
- Use `persist()` strategically and delete when done
- Check for memory leaks in custom functions

**Slow Start**:
- Task graph too large (increase chunk sizes)
- Use `map_partitions` or `map_blocks` to reduce tasks

**Poor Parallelization**:
- Chunks too large (increase number of partitions)
- Using threads with Python code (switch to processes)
- Data dependencies preventing parallelism

## Reference Files

All reference documentation files can be read as needed for detailed information:

- `references/dataframes.md` - Complete Dask DataFrame guide
- `references/arrays.md` - Complete Dask Array guide
- `references/bags.md` - Complete Dask Bag guide
- `references/futures.md` - Complete Dask Futures and distributed computing guide
- `references/schedulers.md` - Complete scheduler selection and configuration guide
- `references/best-practices.md` - Comprehensive performance optimization and troubleshooting

Load these files when users need detailed information about specific Dask components, operations, or patterns beyond the quick guidance provided here.

Overview

This skill provides a concise guide to using Dask for parallel and distributed computing in Python. It explains when to scale pandas/NumPy beyond memory, how to process multi-file datasets in parallel, and how to build task-based workflows across cores or clusters. The focus is practical: DataFrame/Array/Bag APIs, Futures for custom tasks, and scheduler choices for performance.

How this skill works

Dask exposes familiar pandas and NumPy-style collections that build lazy task graphs. Operations are composed into tasks and executed when .compute() is called or when futures are submitted to a distributed client. Work is partitioned into chunks that run in parallel using thread, process, or distributed schedulers and can scale from a laptop to a multi-node cluster.
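
A minimal sketch of the lazy model described above, using `dask.delayed`. The `parse` function and the `executed` list are assumptions for the example, and the synchronous scheduler is used so the side effects stay in one process.

```python
import dask

executed = []

@dask.delayed
def parse(value):
    # Records when the task actually runs
    executed.append(value)
    return int(value)

# Building the graph does not run any task
parsed = [parse(text) for text in ["1", "2", "3"]]
total = dask.delayed(sum)(parsed)
ran_before_compute = list(executed)

# compute() walks the graph and executes the tasks
result = total.compute(scheduler="synchronous")
```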

When to use it

  • When datasets exceed available RAM and need out-of-core processing
  • To parallelize slow pandas or NumPy workloads across cores or machines
  • For efficient multi-file ingestion (CSV, Parquet, JSON, logs)
  • When building custom parallel workflows with dependencies or state
  • To scale scientific array computations with HDF5/Zarr/NetCDF

Best practices

  • Let Dask read data directly (avoid loading into pandas first)
  • Aim for ~100 MB per chunk and roughly 10 chunks per core in memory
  • Minimize repeated compute() calls; compute many tasks together
  • Use map_partitions/map_blocks to fuse operations and shrink graphs
  • Use the distributed dashboard to monitor memory and task bottlenecks

Example use cases

  • ETL pipeline: read many CSVs with dask.dataframe, clean, aggregate, and write Parquet
  • Large array processing: load a Zarr dataset with dask.array, normalize, and save results
  • Log processing: parse JSON logs with dask.bag, filter, then convert to DataFrame for analysis
  • Parameter sweep: scatter large shared data and run client.submit for many parameter combinations
  • Prototype-to-production: start with pandas locally, then scale to Dask DataFrames for larger data

FAQ

When should I choose Dask DataFrame vs Dask Array vs Bag?

Use DataFrame for tabular data, Array for large numeric tensors and chunked linear algebra, and Bag for unstructured text/JSON or object streams you’ll later convert to structured formats.

How do I pick a scheduler?

Use threads for numeric libraries that release the GIL, processes for Python-heavy tasks, synchronous for debugging, and distributed for monitoring and multi-machine scaling.