model-deployment skill

This is most likely a fork of the model-deployment skill from benchflow-ai
npx playbooks add skill aj-geddes/useful-ai-prompts --skill model-deployment

---
name: Model Deployment
description: Deploy machine learning models to production using Flask, FastAPI, Docker, cloud platforms (AWS, GCP, Azure), and model serving frameworks
---

# Model Deployment

## Overview

Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.

## When to Use

- When productionizing trained models for real-world inference and predictions
- When building REST APIs or web services for model serving
- When scaling predictions to serve multiple users or applications
- When deploying models to cloud platforms, edge devices, or containers
- When implementing CI/CD pipelines for ML model updates
- When creating batch processing systems for large-scale predictions

## Deployment Approaches

- **REST APIs**: Flask, FastAPI for synchronous inference
- **Batch Processing**: Scheduled jobs for large-scale predictions
- **Real-time Streaming**: Kafka, Spark Streaming for continuous data
- **Serverless**: AWS Lambda, Google Cloud Functions
- **Edge Deployment**: TensorFlow Lite, ONNX Runtime for edge devices
- **Model Serving**: TensorFlow Serving, Seldon Core, BentoML
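
As a rough illustration of the batch-processing approach, the sketch below scores records in fixed-size chunks so memory use stays bounded. The `score_chunk` function is a hypothetical stand-in for a real model call, and `chunk_size` is an assumed tuning parameter rather than anything prescribed by this skill.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List

Row = List[float]


def chunked(rows: Iterable[Row], chunk_size: int) -> Iterator[List[Row]]:
    """Yield successive lists of at most chunk_size rows."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def run_batch(rows: Iterable[Row],
              score: Callable[[List[Row]], List[int]],
              chunk_size: int = 1000) -> List[int]:
    """Score all rows chunk by chunk and collect the predictions in order."""
    results: List[int] = []
    for chunk in chunked(rows, chunk_size):
        results.extend(score(chunk))
    return results


def score_chunk(chunk: List[Row]) -> List[int]:
    """Hypothetical stand-in for model.predict: class 1 if the row sum is positive."""
    return [1 if sum(row) > 0 else 0 for row in chunk]


rows = [[1.0, -2.0], [3.0, 0.5], [-1.0, -1.0], [0.2, 0.1], [0.0, 0.0]]
predictions = run_batch(rows, score_chunk, chunk_size=2)
print(predictions)
```

In a scheduled job, `rows` would typically be a generator reading from a file or database cursor, so only one chunk is held in memory at a time.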

## Key Considerations

- **Model Format**: Pickle, SavedModel, ONNX, PMML
- **Scalability**: Load balancing, auto-scaling
- **Latency**: Response time requirements
- **Monitoring**: Model drift, performance metrics
- **Versioning**: Multiple model versions in production
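
One way to run multiple model versions side by side is weighted routing, where a small share of traffic goes to a new version. The sketch below is a minimal illustration under assumed names: `pick_version`, the version labels `v1`/`v2`, and the 90/10 split are all hypothetical. Hashing the request ID keeps the assignment stable for repeated requests.

```python
import hashlib
from typing import Dict


def pick_version(request_id: str, weights: Dict[str, float]) -> str:
    """Deterministically map a request ID to a model version by traffic weight."""
    # Ignore versions with no traffic share; sort for a stable ordering.
    items = [(v, w) for v, w in sorted(weights.items()) if w > 0]
    if not items:
        raise ValueError("at least one version needs a positive weight")
    total = sum(w for _, w in items)

    # Map the first 8 bytes of the hash to a point in [0, 1).
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    point = int.from_bytes(digest[:8], "big") / 2**64

    cumulative = 0.0
    for version, weight in items:
        cumulative += weight / total
        if point < cumulative:
            return version
    return items[-1][0]  # guard against floating-point rounding


weights = {"v1": 0.9, "v2": 0.1}  # hypothetical canary split
assignments = [pick_version(f"req-{i}", weights) for i in range(10_000)]
v2_share = assignments.count("v2") / len(assignments)
print(f"share routed to v2: {v2_share:.3f}")
```

Rolling back then amounts to setting the new version's weight to zero, which sends all traffic to the remaining version without redeploying.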

## Python Implementation

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification
import joblib

# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field

# Model registry
import mlflow
import mlflow.sklearn

# Logging, timing, and type hints
import logging
import time
from typing import List, Dict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Train and Save Model ===")

# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'

joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)

print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")

# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")

class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)

            self.predictions_count += len(X)

            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }

# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)

# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

class PredictionRequest(BaseModel):
    # Pydantic v2 expects `examples` (a list); the model above takes 20 features per row
    features: List[List[float]] = Field(..., examples=[[[0.0] * 20]])

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
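    try:
        result = predictor.predict(request.features)
        return result
    except ValueError as e:
        # Malformed input, e.g. the wrong number of features per row
        raise HTTPException(status_code=400, detail=str(e))
    except Exception:
        # Anything else is a server-side failure; details are already logged by the predictor
        raise HTTPException(status_code=500, detail="Prediction failed")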

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)

    result = predictor.predict(all_features)
    background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
    return result

@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }

# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")

dockerfile_content = '''FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.pkl .
COPY scaler.pkl .
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

print("Dockerfile content:")
print(dockerfile_content)

# 5. Requirements file
print("\n=== 5. Requirements.txt ===")

requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""

print("Requirements:")
print(requirements)

# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")

docker_compose = '''version: '3.8'

services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
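      # python:3.9-slim does not ship curl, so probe with the Python interpreter instead
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]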
      interval: 10s
      timeout: 5s
      retries: 3

  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"

  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin  # demo value only; override in real deployments
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''

print("Docker Compose content:")
print(docker_compose)

# 7. Testing the API
print("\n=== 7. Testing the API ===")

def test_predictor():
    # Test single prediction
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                     1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]

    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")

    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")

    # Batch predictions
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")

test_predictor()

# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")

# Log model to MLflow
with mlflow.start_run():
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
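    # Log a measured value rather than a hard-coded one; this is training accuracy,
    # so evaluate on a held-out set before relying on it
    mlflow.log_metric("train_accuracy", model.score(X_scaled, y))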

    model_uri = "runs:/" + mlflow.active_run().info.run_id + "/model"
    print(f"Model logged to MLflow: {model_uri}")

# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")

class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

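    def log_prediction(self, features, prediction, latency):
        # latency is in seconds; keep the raw value so get_stats() can aggregate it
        self.latencies.append(latency)
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        # Need the 100 most recent predictions plus at least one older one to compare against
        if len(self.predictions) <= 100:
            return {'drift_detected': False}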

        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
        recent_mean = np.mean(recent_predictions)

        drift = abs(recent_mean - historical_mean) > 0.1

        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}

        return {
            'avg_latency_ms': np.mean(self.latencies) * 1000,
            'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
            'total_predictions': len(self.predictions)
        }

monitor = ModelMonitor()

print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")
```
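
The `/predict` endpoint above relies on scikit-learn to reject badly shaped input. A small pre-check can return a clearer error before the model is touched. The sketch below is one possible approach, not part of the original script: `validate_features` and `EXPECTED_FEATURES` are hypothetical names, and the count of 20 mirrors `n_features=20` from the training step.

```python
import math
from typing import List, Sequence

EXPECTED_FEATURES = 20  # assumption: matches n_features=20 used when training


def validate_features(features: Sequence[Sequence[float]],
                      n_features: int = EXPECTED_FEATURES) -> List[List[float]]:
    """Return features as lists of floats, or raise ValueError with a precise message."""
    if not features:
        raise ValueError("features must contain at least one row")
    cleaned: List[List[float]] = []
    for i, row in enumerate(features):
        if len(row) != n_features:
            raise ValueError(f"row {i} has {len(row)} features, expected {n_features}")
        values = [float(v) for v in row]
        if any(math.isnan(v) or math.isinf(v) for v in values):
            raise ValueError(f"row {i} contains NaN or infinite values")
        cleaned.append(values)
    return cleaned


ok = validate_features([[1] * 20, [0.5] * 20])
print(len(ok), "rows accepted")

try:
    validate_features([[1.0, 2.0, 3.0]])
except ValueError as exc:
    print("rejected:", exc)
```

Calling a helper like this at the top of `ModelPredictor.predict` would let the API map the resulting `ValueError` to a client error with a message that names the offending row.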

## Deployment Checklist

- Model format and serialization
- Input/output validation
- Error handling and logging
- Authentication and security
- Rate limiting and throttling
- Health check endpoints
- Monitoring and alerting
- Version management
- Rollback procedures
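
For the rate limiting item, a token bucket is one common technique. The sketch below is a minimal, single-process illustration: `TokenBucket` and its parameters are hypothetical names, and the injectable `clock` exists only so the behavior can be demonstrated without sleeping. It is not thread-safe as written.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow bursts up to `capacity` requests, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        if rate <= 0 or capacity < 1:
            raise ValueError("rate must be positive and capacity at least 1")
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def allow(self) -> bool:
        """Consume one token if available; return False when the caller should be throttled."""
        now = self.clock()
        elapsed = now - self.updated
        self.updated = now
        # Refill based on elapsed time, never exceeding capacity.
        self.tokens = min(float(self.capacity), self.tokens + elapsed * self.rate)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Demonstration with a fake clock so the timing is deterministic.
fake_now = [0.0]
bucket = TokenBucket(rate=2.0, capacity=3, clock=lambda: fake_now[0])

burst = [bucket.allow() for _ in range(4)]       # fourth call exceeds the burst size
fake_now[0] += 1.0                               # one second passes: two tokens refill
after_wait = [bucket.allow() for _ in range(3)]
print(burst, after_wait)
```

In an API, a rejected call would usually translate to an HTTP 429 response. Deployments with several workers or replicas need shared state (for example in a cache or gateway) rather than an in-process bucket.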

## Cloud Deployment Options

- **AWS**: SageMaker, Lambda, EC2
- **GCP**: Vertex AI, Cloud Run, App Engine
- **Azure**: Machine Learning, App Service
- **Kubernetes**: Self-managed clusters (on-premises or cloud) or managed services such as EKS, GKE, and AKS

## Performance Optimization

- Model quantization for smaller size
- Caching predictions
- Batch processing
- GPU acceleration
- Request batching and connection pooling
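
Prediction caching can be sketched with the standard library's `functools.lru_cache`. This is an illustrative example only: `slow_model` is a hypothetical stand-in for an expensive model call, and the cache size of 1024 is an arbitrary assumption. Caching like this only helps when the model is deterministic and identical inputs recur.

```python
from functools import lru_cache
from typing import Sequence, Tuple

call_count = 0  # tracks how often the underlying model actually runs


def slow_model(features: Tuple[float, ...]) -> int:
    """Hypothetical stand-in for an expensive model call."""
    global call_count
    call_count += 1
    return 1 if sum(features) > 0 else 0


@lru_cache(maxsize=1024)
def _cached_predict(features: Tuple[float, ...]) -> int:
    return slow_model(features)


def predict_cached(features: Sequence[float]) -> int:
    # Lists are unhashable, so convert to a tuple before hitting the cache.
    return _cached_predict(tuple(features))


first = predict_cached([0.5, 1.5])
second = predict_cached([0.5, 1.5])   # served from the cache
third = predict_cached([-1.0, 0.5])   # new input, model runs again
info = _cached_predict.cache_info()
print(first, second, third, call_count, info.hits, info.misses)
```

When a new model version is deployed, the cache should be emptied with `_cached_predict.cache_clear()` so stale predictions are not served.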

## Deliverables

- Deployed model endpoint
- API documentation
- Docker configuration
- Monitoring dashboard
- Deployment guide
- Performance benchmarks
- Scaling recommendations
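
For the performance benchmarks deliverable, a small latency harness is often enough to get started. The sketch below uses only the standard library; `benchmark` and its `runs` and `warmup` parameters are hypothetical, and the lambda is a placeholder for a real prediction call.

```python
import statistics
import time
from typing import Callable, Dict


def benchmark(fn: Callable[[], object], runs: int = 200, warmup: int = 10) -> Dict[str, float]:
    """Time repeated calls to fn and report mean and percentile latencies in milliseconds."""
    if runs < 2:
        raise ValueError("runs must be at least 2")
    for _ in range(warmup):
        fn()  # warm caches before measuring

    latencies_ms = []
    for _ in range(runs):
        start = time.perf_counter()
        fn()
        latencies_ms.append((time.perf_counter() - start) * 1000)

    # quantiles(n=100) returns 99 cut points; index k-1 is the k-th percentile.
    cuts = statistics.quantiles(latencies_ms, n=100)
    return {
        "runs": runs,
        "mean_ms": statistics.fmean(latencies_ms),
        "p50_ms": cuts[49],
        "p95_ms": cuts[94],
        "p99_ms": cuts[98],
    }


# Placeholder workload; replace with a call such as a model's predict method.
report = benchmark(lambda: sum(range(1000)), runs=50, warmup=2)
print(report)
```

Numbers from a harness like this reflect in-process latency only. End-to-end figures also need to include serialization and network time, measured against the deployed endpoint.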