
This skill helps you build reliable ML data pipelines by designing collection, validation, processing, and storage strategies for quality data.

npx playbooks add skill doanchienthangdev/omgkit --skill data-eng

---
name: data-engineering
description: ML data engineering covering data pipelines, data quality, collection strategies, storage, and versioning for machine learning systems.
---

# Data Engineering for ML

Building robust data infrastructure for ML systems.

## Data Pipeline Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                   ML DATA PIPELINE                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  COLLECTION  →  VALIDATION  →  PROCESSING  →  STORAGE       │
│     ↓              ↓              ↓              ↓          │
│  Sources      Schema Check    Transform      Data Lake      │
│  APIs         Quality Check   Normalize      Feature Store  │
│  DBs          Statistics      Encode         Model Registry │
│                                                              │
└─────────────────────────────────────────────────────────────┘
```
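The four stages in the diagram can be sketched as plain functions chained in order. This is a minimal illustration rather than the skill's implementation: the function names, the record shape (dicts with `user_id` and `age`), and the in-memory `storage` list are all hypothetical.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]
Stage = Callable[[List[Record]], List[Record]]

def collect(_: List[Record]) -> List[Record]:
    # Hypothetical source: hard-coded rows stand in for a DB/API/file read.
    return [{"user_id": 1, "age": 34}, {"user_id": 2, "age": None}]

def validate(records: List[Record]) -> List[Record]:
    # Drop rows that fail a simple null check on "age".
    return [r for r in records if r["age"] is not None]

def process(records: List[Record]) -> List[Record]:
    # Example transform: scale age to [0, 1], assuming a 0-150 range.
    return [{**r, "age_scaled": r["age"] / 150} for r in records]

storage: List[Record] = []  # stands in for a data lake or feature store

def store(records: List[Record]) -> List[Record]:
    storage.extend(records)
    return records

def run_pipeline(stages: List[Stage]) -> List[Record]:
    # Feed each stage's output into the next one.
    data: List[Record] = []
    for stage in stages:
        data = stage(data)
    return data

result = run_pipeline([collect, validate, process, store])
```

Keeping each stage as a function with the same signature makes it easy to insert an extra validation step between any two stages.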

## Data Collection

```python
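from dataclasses import dataclass
from typing import Dict, List

import pandas as pd
import requests

@dataclass
class DataSource:
    name: str
    type: str  # database, api, file, stream
    connection: Dict

class DataCollector:
    def __init__(self, sources: List[DataSource]):
        self.sources = sources

    def collect(self, source_name: str) -> pd.DataFrame:
        source = next((s for s in self.sources if s.name == source_name), None)
        if source is None:
            raise KeyError(f"Unknown data source: {source_name}")

        if source.type == "database":
            # connection holds a SQL query and an open DB connection/engine
            return pd.read_sql(source.connection["query"],
                               source.connection["conn"])
        elif source.type == "api":
            # Assumes the endpoint returns a JSON array of records;
            # the 30-second timeout is an arbitrary example value
            response = requests.get(source.connection["url"], timeout=30)
            response.raise_for_status()
            return pd.DataFrame(response.json())
        elif source.type == "file":
            return pd.read_parquet(source.connection["path"])

        # "stream" sources are not handled in this example
        raise ValueError(f"Unsupported source type: {source.type}")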
```

## Data Quality

```python
import great_expectations as ge
import pandas as pd

def validate_data(df: pd.DataFrame, expectations_path: str) -> bool:
    # Uses the legacy ge.from_pandas API; newer Great Expectations
    # releases use a data-context workflow instead.
    # expectations_path is not used here: the checks are defined inline.
    ge_df = ge.from_pandas(df)

    checks = [
        # Schema validation
        lambda: ge_df.expect_column_to_exist("user_id"),
        lambda: ge_df.expect_column_values_to_not_be_null("user_id"),
        lambda: ge_df.expect_column_values_to_be_unique("user_id"),
        # Value validation
        lambda: ge_df.expect_column_values_to_be_between(
            "age", min_value=0, max_value=150
        ),
        # Statistical validation
        lambda: ge_df.expect_column_mean_to_be_between(
            "purchase_amount", min_value=0, max_value=10000
        ),
    ]

    # Return False at the first failed check instead of raising AssertionError
    return all(check().success for check in checks)
```

## Data Versioning

```python
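# DVC for data versioning
# dvc init
# dvc add data/processed/

import subprocess

import dvc.api

# Resolve the remote storage URL of a specific version
# (get_url returns a URL; it does not download or load the data)
data_url = dvc.api.get_url(
    path='data/processed/train.parquet',
    repo='https://github.com/org/repo',
    rev='v1.2.0'
)

# Track changes
def version_data(data_path: str, message: str):
    # dvc add writes "<path>.dvc"; strip a trailing slash so the name is right
    dvc_file = f"{data_path.rstrip('/')}.dvc"
    # check=True stops the sequence if any command fails
    subprocess.run(["dvc", "add", data_path], check=True)
    subprocess.run(["git", "add", dvc_file], check=True)
    subprocess.run(["git", "commit", "-m", message], check=True)
    subprocess.run(["dvc", "push"], check=True)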
```
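DVC identifies tracked data by a hash of its contents, which is what makes a pinned revision reproducible. The sketch below illustrates that idea with the standard library only. The `file_digest` helper and the temporary file are hypothetical and not part of DVC's API, and SHA-256 is used purely for illustration; it is not necessarily the hash DVC uses.

```python
import hashlib
import tempfile
from pathlib import Path

def file_digest(path: Path, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

with tempfile.TemporaryDirectory() as tmp:
    data_file = Path(tmp) / "train.csv"

    data_file.write_text("user_id,age\n1,34\n")
    v1 = file_digest(data_file)

    data_file.write_text("user_id,age\n1,35\n")  # one value changed
    v2 = file_digest(data_file)

# Any change to the contents yields a different identifier.
print(v1 != v2)  # True
```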

## Data Storage Patterns

| Pattern | Use Case | Technology |
|---------|----------|------------|
| Data Lake | Raw storage | S3, GCS, ADLS |
| Data Warehouse | Analytics | Snowflake, BigQuery |
| Feature Store | ML features | Feast, Tecton |
| Vector Store | Embeddings | Pinecone, Weaviate |
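
For the data lake row, one common convention is to partition raw files by source and ingestion date so that downstream jobs read only what they need. The sketch below builds such object keys. The `raw/` prefix, the `source=`/`dt=` layout, and the `lake_key` helper are assumptions for illustration, not something this skill prescribes.

```python
from datetime import date
from pathlib import PurePosixPath

def lake_key(source: str, day: date, filename: str) -> str:
    """Build a Hive-style partitioned key, e.g. for an S3 or GCS bucket."""
    return str(
        PurePosixPath("raw")
        / f"source={source}"
        / f"dt={day.isoformat()}"
        / filename
    )

key = lake_key("clickstream", date(2024, 1, 15), "part-0000.parquet")
print(key)  # raw/source=clickstream/dt=2024-01-15/part-0000.parquet
```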

## Commands
- `/omgdata:collect` - Data collection
- `/omgdata:validate` - Data validation
- `/omgdata:version` - Version data

## Best Practices

1. Validate data at every stage
2. Version all data assets
3. Document data schemas
4. Monitor data quality metrics
5. Implement data lineage tracking
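
Lineage tracking (item 5) can start as a small record written alongside each pipeline step, capturing which inputs produced which output. This is a minimal sketch: the `LineageRecord` fields and the example dataset names are hypothetical, and dedicated lineage tools capture far more.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import List

@dataclass
class LineageRecord:
    output: str
    inputs: List[str]
    transform: str
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

def upstream_of(records: List[LineageRecord], target: str) -> List[str]:
    """Walk the records backwards to list every dataset `target` depends on."""
    by_output = {r.output: r for r in records}
    seen: List[str] = []
    stack = [target]
    while stack:
        rec = by_output.get(stack.pop())
        if rec is None:
            continue  # a raw source with no recorded producer
        for name in rec.inputs:
            if name not in seen:
                seen.append(name)
                stack.append(name)
    return seen

records = [
    LineageRecord("clean/users", ["raw/users"], "drop_nulls"),
    LineageRecord("features/users", ["clean/users", "raw/events"], "aggregate"),
]
deps = upstream_of(records, "features/users")
print(json.dumps(asdict(records[0]), indent=2))
```

Here `deps` contains `clean/users`, `raw/events`, and `raw/users`, which is the set of datasets to re-check if `features/users` looks wrong.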

Overview

This skill covers ML data engineering focused on building reliable data pipelines, enforcing data quality, designing collection strategies, and managing storage and versioning for machine learning systems. It explains practical patterns for ingesting, validating, transforming, and persisting data for production ML. The material targets practitioners who need reproducible data flows and robust feature delivery for models.

How this skill works

The skill walks through each stage of an ML data pipeline: collection, validation, processing, and storage. It describes concrete components such as source connectors (databases, APIs, files, streams), schema and statistical checks, transformation steps, and persistent stores like data lakes, warehouses, feature stores, and vector databases. It also covers data versioning workflows using DVC and Git to ensure reproducibility and traceability.

When to use it

  • When you need repeatable, auditable data ingestion for training and inference.
  • When data quality issues are causing model drift or production failures.
  • When multiple teams must share consistent feature definitions and datasets.
  • When you need to version, roll back, or reproduce training datasets.
  • When designing storage strategy to balance cost, latency, and analytical needs.

Best practices

  • Validate data at every pipeline stage with schema, null checks, and statistical tests.
  • Version all significant data artifacts and track changes with DVC + Git.
  • Document schemas and feature contracts so consumers agree on expectations.
  • Monitor data quality metrics and set alerts for anomalies or distribution shifts.
  • Use appropriate storage patterns: data lake for raw, warehouse for analytics, feature store for ML features.
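
One way to turn "alert on distribution shifts" into a concrete check is the Population Stability Index (PSI), which compares the binned proportions of a reference sample and a new sample. The sketch below uses only the standard library. The bin edges, the `eps` smoothing, and the 0.2 alert threshold are assumptions to tune for your data, not fixed rules.

```python
import math
from bisect import bisect_right
from typing import List, Sequence

def proportions(values: Sequence[float], edges: Sequence[float]) -> List[float]:
    """Share of values in each bin; len(edges) + 1 bins, including both tails."""
    counts = [0] * (len(edges) + 1)
    for v in values:
        counts[bisect_right(edges, v)] += 1
    return [c / len(values) for c in counts]

def psi(expected: Sequence[float], actual: Sequence[float],
        edges: Sequence[float], eps: float = 1e-6) -> float:
    e = proportions(expected, edges)
    a = proportions(actual, edges)
    # eps avoids log(0) and division by zero for empty bins.
    return sum((ai - ei) * math.log((ai + eps) / (ei + eps))
               for ei, ai in zip(e, a))

reference = [20, 25, 30, 35, 40, 45, 50, 55]
same = list(reference)
shifted = [v + 30 for v in reference]
edges = [30, 40, 50]

ALERT_THRESHOLD = 0.2  # assumed value; tune per feature

print(psi(reference, same, edges))                       # 0.0
print(psi(reference, shifted, edges) > ALERT_THRESHOLD)  # True
```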

Example use cases

  • Ingesting user clickstream from API and stream sources, validating schema, and storing raw parquet in S3.
  • Running nightly transforms to normalize features, then registering them in a feature store for model training.
  • Using DVC to pin a training dataset version so experiments are reproducible across environments.
  • Validating incoming customer records with Great Expectations checks before they reach the model pipeline.
  • Loading embeddings into a vector store (Pinecone/Weaviate) to serve low-latency similarity search in production.
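
The last use case relies on similarity search over embeddings. Vector stores such as Pinecone or Weaviate use specialised indexes to do this at scale; the brute-force sketch below only illustrates the underlying cosine-similarity lookup. The `top_k` helper and the toy 3-dimensional vectors are hypothetical.

```python
import math
from typing import Dict, List, Sequence, Tuple

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k(query: Sequence[float], index: Dict[str, Sequence[float]],
          k: int = 2) -> List[Tuple[str, float]]:
    # Score every stored vector, then keep the k most similar.
    scored = [(key, cosine(query, vec)) for key, vec in index.items()]
    return sorted(scored, key=lambda kv: kv[1], reverse=True)[:k]

index = {
    "doc_a": [1.0, 0.0, 0.0],
    "doc_b": [0.9, 0.1, 0.0],
    "doc_c": [0.0, 0.0, 1.0],
}
hits = top_k([1.0, 0.0, 0.0], index, k=2)
print([key for key, _ in hits])  # ['doc_a', 'doc_b']
```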

FAQ

How should I choose between a data lake and a data warehouse?

Use a data lake for cost-effective raw storage and flexible schema-on-read; use a warehouse for structured analytics, high-performance SQL queries, and governed reporting.

What core checks should I include in automated validation?

Include presence and uniqueness of keys, null checks, range/value constraints, and basic statistical checks (means, distributions) to catch drift and breaking changes.
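
The checks listed above can be sketched without any validation library. This is an illustrative stand-in, not Great Expectations: the `run_checks` helper, the column names, and the bounds are hypothetical.

```python
from statistics import mean
from typing import Dict, List, Optional

Row = Dict[str, Optional[float]]

def run_checks(rows: List[Row]) -> Dict[str, bool]:
    ids = [r.get("user_id") for r in rows]
    ages = [r.get("age") for r in rows]
    amounts = [r["purchase_amount"] for r in rows
               if r.get("purchase_amount") is not None]
    return {
        "user_id_present": all("user_id" in r for r in rows),
        "user_id_not_null": all(i is not None for i in ids),
        "user_id_unique": len(set(ids)) == len(ids),
        "age_in_range": all(a is not None and 0 <= a <= 150 for a in ages),
        "amount_mean_in_range": bool(amounts) and 0 <= mean(amounts) <= 10_000,
    }

good = [
    {"user_id": 1, "age": 34, "purchase_amount": 20.0},
    {"user_id": 2, "age": 51, "purchase_amount": 35.5},
]
# Add a row with a duplicate key and an out-of-range age.
bad = good + [{"user_id": 2, "age": 200, "purchase_amount": 10.0}]

report = run_checks(bad)
failed = [name for name, ok in report.items() if not ok]
print(failed)  # ['user_id_unique', 'age_in_range']
```

Returning a named report rather than a single boolean makes it easier to log which check failed and to alert on specific ones.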