ml-pipeline-automation skill

This skill helps you build end-to-end ML pipelines with automated data processing, training, validation, and deployment across Airflow, Kubeflow, and Jenkins.

npx playbooks add skill aj-geddes/useful-ai-prompts --skill ml-pipeline-automation

---
name: ML Pipeline Automation
description: Build end-to-end ML pipelines with automated data processing, training, validation, and deployment using Airflow, Kubeflow, and Jenkins
---

# ML Pipeline Automation

ML pipeline automation orchestrates the entire machine learning workflow from data ingestion through model deployment, ensuring reproducibility, scalability, and reliability.

## Pipeline Components

- **Data Ingestion**: Collecting data from multiple sources
- **Data Processing**: Cleaning, transformation, feature engineering
- **Model Training**: Training and hyperparameter tuning
- **Validation**: Cross-validation and testing
- **Deployment**: Moving models to production
- **Monitoring**: Tracking performance metrics

## Orchestration Platforms

- **Apache Airflow**: Workflow scheduling with DAGs
- **Kubeflow**: Kubernetes-native ML workflows
- **Jenkins**: CI/CD for ML pipelines
- **Prefect**: Modern data flow orchestration
- **Dagster**: Asset-driven orchestration

## Python Implementation

```python
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import joblib
import logging
from datetime import datetime
import json
import os

# Airflow imports
from airflow import DAG
from airflow.operators.python import PythonOperator

# MLflow for tracking
import mlflow
import mlflow.sklearn

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Modular Pipeline Functions ===")

# Data ingestion
def ingest_data(**context):
    """Ingest and load data"""
    logger.info("Starting data ingestion...")

    X, y = make_classification(n_samples=2000, n_features=30,
                              n_informative=20, random_state=42)
    data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
    data['target'] = y

    # Save to disk
    data_path = '/tmp/raw_data.csv'
    data.to_csv(data_path, index=False)

    context['task_instance'].xcom_push(key='data_path', value=data_path)
    logger.info(f"Data ingested: {len(data)} rows")
    return {'status': 'success', 'samples': len(data)}

# Data processing
def process_data(**context):
    """Clean and preprocess data"""
    logger.info("Starting data processing...")

    # Get data path from previous task
    task_instance = context['task_instance']
    data_path = task_instance.xcom_pull(key='data_path', task_ids='ingest_data')

    data = pd.read_csv(data_path)

    # Handle missing values
    data = data.fillna(data.mean())

    # Remove duplicates
    data = data.drop_duplicates()

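    # Remove outliers with the 1.5 * IQR rule. The target column is excluded:
    # on an imbalanced binary target the rule could drop the minority class.
    numeric_cols = [c for c in data.select_dtypes(include=[np.number]).columns
                    if c != 'target']
    for col in numeric_cols: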
        Q1 = data[col].quantile(0.25)
        Q3 = data[col].quantile(0.75)
        IQR = Q3 - Q1
        data = data[(data[col] >= Q1 - 1.5 * IQR) & (data[col] <= Q3 + 1.5 * IQR)]

    processed_path = '/tmp/processed_data.csv'
    data.to_csv(processed_path, index=False)

    task_instance.xcom_push(key='processed_path', value=processed_path)
    logger.info(f"Data processed: {len(data)} rows after cleaning")
    return {'status': 'success', 'rows_remaining': len(data)}

# Feature engineering
def engineer_features(**context):
    """Create new features"""
    logger.info("Starting feature engineering...")

    task_instance = context['task_instance']
    processed_path = task_instance.xcom_pull(key='processed_path', task_ids='process_data')

    data = pd.read_csv(processed_path)

    # Create interaction features
    feature_cols = [col for col in data.columns if col.startswith('feature_')]
    for i in range(min(5, len(feature_cols))):
        for j in range(i+1, min(6, len(feature_cols))):
            data[f'interaction_{i}_{j}'] = data[feature_cols[i]] * data[feature_cols[j]]

    # Create polynomial features
    for col in feature_cols[:5]:
        data[f'{col}_squared'] = data[col] ** 2

    engineered_path = '/tmp/engineered_data.csv'
    data.to_csv(engineered_path, index=False)

    task_instance.xcom_push(key='engineered_path', value=engineered_path)
    logger.info(f"Features engineered: {len(data.columns)} total features")
    return {'status': 'success', 'features': len(data.columns)}

# Train model
def train_model(**context):
    """Train ML model"""
    logger.info("Starting model training...")

    task_instance = context['task_instance']
    engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')

    data = pd.read_csv(engineered_path)

    X = data.drop('target', axis=1)
    y = data['target']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Train model
    model = RandomForestClassifier(n_estimators=100, max_depth=15, random_state=42)
    model.fit(X_train_scaled, y_train)

    # Evaluate
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Save model
    model_path = '/tmp/model.pkl'
    scaler_path = '/tmp/scaler.pkl'
    joblib.dump(model, model_path)
    joblib.dump(scaler, scaler_path)

    task_instance.xcom_push(key='model_path', value=model_path)
    task_instance.xcom_push(key='scaler_path', value=scaler_path)

    # Log to MLflow
    with mlflow.start_run():
        mlflow.log_param('n_estimators', 100)
        mlflow.log_param('max_depth', 15)
        mlflow.log_metric('accuracy', accuracy)
        mlflow.log_metric('f1_score', f1)
        mlflow.sklearn.log_model(model, 'model')

    logger.info(f"Model trained: Accuracy={accuracy:.4f}, F1={f1:.4f}")
    return {'status': 'success', 'accuracy': accuracy, 'f1_score': f1}

# Validate model
def validate_model(**context):
    """Validate model performance"""
    logger.info("Starting model validation...")

    task_instance = context['task_instance']
    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    engineered_path = task_instance.xcom_pull(key='engineered_path', task_ids='engineer_features')

    model = joblib.load(model_path)
    data = pd.read_csv(engineered_path)

    X = data.drop('target', axis=1)
    y = data['target']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')
    scaler = joblib.load(scaler_path)
    X_test_scaled = scaler.transform(X_test)

    # Validate
    y_pred = model.predict(X_test_scaled)
    accuracy = accuracy_score(y_test, y_pred)

    validation_result = {
        'status': 'success' if accuracy > 0.85 else 'failed',
        'accuracy': accuracy,
        'threshold': 0.85,
        'timestamp': datetime.now().isoformat()
    }

    task_instance.xcom_push(key='validation_result', value=json.dumps(validation_result))

    logger.info(f"Validation result: {validation_result}")
    return validation_result

# Deploy model
def deploy_model(**context):
    """Deploy validated model"""
    logger.info("Starting model deployment...")

    task_instance = context['task_instance']
    validation_result = json.loads(task_instance.xcom_pull(
        key='validation_result', task_ids='validate_model'))

    if validation_result['status'] != 'success':
        logger.warning("Validation failed, deployment skipped")
        return {'status': 'skipped', 'reason': 'validation_failed'}

    model_path = task_instance.xcom_pull(key='model_path', task_ids='train_model')
    scaler_path = task_instance.xcom_pull(key='scaler_path', task_ids='train_model')

    # Simulate deployment
    deploy_path = '/tmp/deployed_model/'
    os.makedirs(deploy_path, exist_ok=True)

    import shutil
    shutil.copy(model_path, os.path.join(deploy_path, 'model.pkl'))
    shutil.copy(scaler_path, os.path.join(deploy_path, 'scaler.pkl'))

    logger.info(f"Model deployed to {deploy_path}")
    return {'status': 'success', 'deploy_path': deploy_path}

# 2. Airflow DAG Definition
print("\n=== 2. Airflow DAG ===")

dag_definition = '''
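# The task callables (ingest_data, process_data, ...) must be importable
# from the DAG file, for example from a shared module on the scheduler's path.
from airflow import DAG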
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'ml_pipeline_dag',
    default_args=default_args,
    description='End-to-end ML pipeline',
    schedule='0 2 * * *',  # Daily at 2 AM; Airflow < 2.4 uses schedule_interval instead
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=ingest_data,
    )

    # Task 2: Process Data
    process = PythonOperator(
        task_id='process_data',
        python_callable=process_data,
    )

    # Task 3: Engineer Features
    engineer = PythonOperator(
        task_id='engineer_features',
        python_callable=engineer_features,
    )

    # Task 4: Train Model
    train = PythonOperator(
        task_id='train_model',
        python_callable=train_model,
    )

    # Task 5: Validate Model
    validate = PythonOperator(
        task_id='validate_model',
        python_callable=validate_model,
    )

    # Task 6: Deploy Model
    deploy = PythonOperator(
        task_id='deploy_model',
        python_callable=deploy_model,
    )

    # Define dependencies
    ingest >> process >> engineer >> train >> validate >> deploy
'''

print("Airflow DAG defined with 6 tasks")

# 3. Pipeline execution summary
print("\n=== 3. Pipeline Execution ===")

class PipelineOrchestrator:
    def __init__(self):
        self.execution_log = []
        self.start_time = None
        self.end_time = None

    def run_pipeline(self):
        self.start_time = datetime.now()
        logger.info("Starting ML pipeline execution")

        try:
            # Execute pipeline tasks
            result1 = ingest_data(task_instance=self)
            self.execution_log.append(('ingest_data', result1))

            result2 = process_data(task_instance=self)
            self.execution_log.append(('process_data', result2))

            result3 = engineer_features(task_instance=self)
            self.execution_log.append(('engineer_features', result3))

            result4 = train_model(task_instance=self)
            self.execution_log.append(('train_model', result4))

            result5 = validate_model(task_instance=self)
            self.execution_log.append(('validate_model', result5))

            result6 = deploy_model(task_instance=self)
            self.execution_log.append(('deploy_model', result6))

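            logger.info("Pipeline execution completed successfully")

        except Exception:
            # logger.exception records the traceback along with the message
            logger.exception("Pipeline execution failed")
        finally:
            # Record the end time on failure too, so the summary reports a duration
            self.end_time = datetime.now()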

    def xcom_push(self, key, value):
        if not hasattr(self, 'xcom_storage'):
            self.xcom_storage = {}
        self.xcom_storage[key] = value

    def xcom_pull(self, key, task_ids):
        if hasattr(self, 'xcom_storage') and key in self.xcom_storage:
            return self.xcom_storage[key]
        return None

    def get_summary(self):
        duration = (self.end_time - self.start_time).total_seconds() if self.end_time else 0
        return {
            'start_time': self.start_time.isoformat() if self.start_time else None,
            'end_time': self.end_time.isoformat() if self.end_time else None,
            'duration_seconds': duration,
            'tasks_executed': len(self.execution_log),
            'execution_log': self.execution_log
        }

# Execute the pipeline only when run as a script, not when the module is imported
if __name__ == "__main__":
    orchestrator = PipelineOrchestrator()
    orchestrator.run_pipeline()
    summary = orchestrator.get_summary()

    print("\n=== Pipeline Summary ===")
    for key, value in summary.items():
        if key != 'execution_log':
            print(f"{key}: {value}")

    print("\nTask Execution Log:")
    for task_name, result in summary['execution_log']:
        print(f"  {task_name}: {result}")

    print("\nML pipeline automation setup completed!")
```

## Pipeline Best Practices

- **Modularity**: Each step should be independent
- **Idempotency**: Tasks should be safely repeatable
- **Error Handling**: Graceful degradation and alerting
- **Versioning**: Track data, code, and model versions
- **Monitoring**: Track execution metrics and logs
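
One way to make a task idempotent is to write its output to a path derived from the run identifier and to publish the file with an atomic rename. The sketch below illustrates that idea with the standard library only. The function name `write_output_idempotently`, the `records.json` file name, and the directory layout are assumptions made for illustration, not part of the pipeline above.

```python
import json
import os
import tempfile
from pathlib import Path


def write_output_idempotently(base_dir, run_id, records):
    """Write one run's output to a deterministic path, atomically.

    Re-running with the same run_id overwrites the same file instead of
    creating duplicates, and readers never observe a half-written file.
    (Hypothetical helper; names and layout are illustrative.)
    """
    out_dir = Path(base_dir) / run_id
    out_dir.mkdir(parents=True, exist_ok=True)
    final_path = out_dir / "records.json"

    # Write to a temporary file in the same directory, then rename it.
    # os.replace is atomic when source and target share a filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp_file:
            json.dump(records, tmp_file)
        os.replace(tmp_name, final_path)
    except BaseException:
        # Remove the partial file so a retry starts from a clean state.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
    return final_path
```

Calling the function twice with the same `run_id` leaves exactly one output file, which is the property a retried task needs.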

## Scheduling Strategies

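- **Daily**: Retrain on a fixed daily schedule when new data arrives every day
- **Weekly**: For pipelines whose feature engineering or training is too expensive to run daily
- **On-demand**: Triggered by data updates
- **Real-time**: Continuous or micro-batch updates for streaming applications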
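
For the on-demand strategy, a simple way to detect a data update is to fingerprint the input file and compare it with the fingerprint recorded on the previous run. This is a minimal sketch under stated assumptions: `file_fingerprint`, `should_retrain`, and the state-file convention are hypothetical names, and a single local file stands in for the real data source.

```python
import hashlib
from pathlib import Path


def file_fingerprint(path, chunk_size=1 << 20):
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def should_retrain(data_path, state_path):
    """Return True when the data changed since the last recorded fingerprint.

    The new fingerprint is recorded as a side effect, so a second call on
    unchanged data returns False. (Hypothetical helper for illustration.)
    """
    current = file_fingerprint(data_path)
    state_file = Path(state_path)
    previous = state_file.read_text().strip() if state_file.exists() else None
    if current == previous:
        return False
    state_file.write_text(current)
    return True
```

A production version would probably record the fingerprint only after training succeeds, so that a failed run is retried on the next check.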

## Deliverables

- Automated pipeline DAG
- Task dependency graph
- Execution logs and monitoring
- Performance metrics
- Rollback procedures
- Documentation
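
A rollback procedure is easier to deliver when each deployment keeps its own version directory and a small pointer file records which version is live. The sketch below is one possible layout, not the skill's prescribed method: `deploy_version`, `rollback`, the `versions/` directory, and `current.json` are all hypothetical names.

```python
import json
import shutil
from pathlib import Path


def deploy_version(deploy_root, version, model_file):
    """Copy a model artifact into its own version directory and mark it current."""
    root = Path(deploy_root)
    version_dir = root / "versions" / version
    version_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy(model_file, version_dir / "model.pkl")

    # The pointer file keeps the live version plus the deployment history.
    pointer = root / "current.json"
    history = json.loads(pointer.read_text())["history"] if pointer.exists() else []
    if not history or history[-1] != version:
        history.append(version)
    pointer.write_text(json.dumps({"current": version, "history": history}))
    return version_dir


def rollback(deploy_root):
    """Point 'current' back at the previously deployed version."""
    pointer = Path(deploy_root) / "current.json"
    state = json.loads(pointer.read_text())
    if len(state["history"]) < 2:
        raise RuntimeError("No earlier version to roll back to")
    state["history"].pop()
    state["current"] = state["history"][-1]
    pointer.write_text(json.dumps(state))
    return state["current"]
```

Because earlier version directories are never overwritten, rolling back only rewrites the pointer file and does not need to retrain or re-copy anything.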

Overview

This skill automates end-to-end ML workflows: data ingestion, processing, feature engineering, training, validation, and deployment. The included implementation is an Airflow DAG with MLflow tracking; the same task structure is intended to carry over to Kubeflow and Jenkins for reproducible, scalable, and repeatable delivery of models to production.

How this skill works

The skill defines modular pipeline tasks that ingest synthetic or real data, clean and engineer features, train and validate models, and deploy validated artifacts. Tasks pass file paths and validation results to each other through XCom (or an XCom-like store when run locally), log parameters and metrics to MLflow, and are assembled into DAGs for scheduled or on-demand execution on Airflow, Kubeflow Pipelines, or CI/CD tools like Jenkins.

When to use it

  • When you need a repeatable production pipeline for training and deploying models
  • When teams require scheduled retraining, validation, and automated deployment
  • When you want to integrate ML tracking (MLflow) and model artifact versioning
  • For workflows that must run on Kubernetes or be orchestrated with Airflow or Jenkins
  • When you need end-to-end visibility: logs, metrics, and execution summaries

Best practices

  • Keep tasks modular and idempotent so each step can be retried safely
  • Version data, code, and models; log experiments and parameters with MLflow
  • Add robust error handling, alerts, and graceful fallback paths
  • Test small components locally before wiring them into production DAGs
  • Monitor pipeline health, model drift, and runtime metrics continuously

Example use cases

  • Daily retraining pipeline for a classification model with a scheduled Airflow DAG
  • On-demand model rebuild triggered by new data arrival via Jenkins or webhook
  • Kubeflow pipeline for scalable training and hyperparameter sweeps on Kubernetes
  • CI/CD flow to validate model quality thresholds before automated deployment
  • Batch feature engineering and preprocessing with automated cleanup and artifact storage

FAQ

Can I run the pipeline locally before deploying to Airflow or Kubeflow?

Yes. Each task is modular and can be executed sequentially in a local orchestrator or unit tests to validate behavior before production deployment.
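
As an illustration of local testing, a task that only uses `xcom_push` and `xcom_pull` can be exercised with an in-memory stand-in for the Airflow task instance. The sketch below assumes that calling convention; `FakeTaskInstance` and the `count_rows` task are hypothetical and exist only to keep the example self-contained.

```python
class FakeTaskInstance:
    """In-memory stand-in for the xcom_push/xcom_pull calls the tasks use."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, key, task_ids=None):
        # task_ids is accepted for signature compatibility but ignored here.
        return self._store.get(key)


def count_rows(**context):
    """Hypothetical task: read a row list from XCom and push its length."""
    ti = context["task_instance"]
    rows = ti.xcom_pull(key="rows", task_ids="ingest_data")
    ti.xcom_push(key="row_count", value=len(rows))
    return {"status": "success", "rows": len(rows)}


def test_count_rows():
    """Run the task against the fake and check both return value and XCom."""
    ti = FakeTaskInstance()
    ti.xcom_push(key="rows", value=[1, 2, 3])
    result = count_rows(task_instance=ti)
    assert result == {"status": "success", "rows": 3}
    assert ti.xcom_pull(key="row_count") == 3
```

The same pattern applies to the real task functions: pass the fake as `task_instance`, seed the keys the task pulls, and assert on what it pushes.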

How are model quality gates enforced before deployment?

Validation tasks compute metrics and compare them to thresholds; deployment tasks read validation results and skip deployment if quality gates fail.
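
An alternative to returning a "skipped" status is to raise an exception when a gate fails. In Airflow, with the default trigger rule, a failed task prevents its downstream tasks from running, so the failure is also visible in the orchestrator. The following is a minimal sketch of such a gate; `QualityGateError` and `enforce_quality_gate` are hypothetical names, and the thresholds are treated as minimum acceptable values.

```python
class QualityGateError(Exception):
    """Raised when one or more metrics fall below their thresholds."""


def enforce_quality_gate(metrics, thresholds):
    """Raise QualityGateError unless every metric meets its minimum threshold.

    A metric that is missing from `metrics` counts as a failure.
    """
    failures = {}
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None or value < minimum:
            failures[name] = {"value": value, "minimum": minimum}
    if failures:
        raise QualityGateError(f"Quality gate failed: {failures}")
    return True
```

For example, `enforce_quality_gate({"accuracy": 0.9}, {"accuracy": 0.85})` returns `True`, while an accuracy of 0.8 against the same threshold raises `QualityGateError`.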