model-deployment skill

This skill helps you deploy machine learning models to production with robust APIs, scalable serving, and cloud integration for reliable inference.

npx playbooks add skill benchflow-ai/skillsbench --skill model-deployment

Review the files below or copy the command above to add this skill to your agents.

Files (1): SKILL.md (10.5 KB)
---
name: Model Deployment
description: Deploy machine learning models to production using Flask, FastAPI, Docker, cloud platforms (AWS, GCP, Azure), and model serving frameworks
---

# Model Deployment

## Overview

Model deployment is the process of taking a trained machine learning model and making it available for production use through APIs, web services, or batch processing systems.

## When to Use

- When productionizing trained models for real-world inference and predictions
- When building REST APIs or web services for model serving
- When scaling predictions to serve multiple users or applications
- When deploying models to cloud platforms, edge devices, or containers
- When implementing CI/CD pipelines for ML model updates
- When creating batch processing systems for large-scale predictions

## Deployment Approaches

- **REST APIs**: Flask, FastAPI for synchronous inference
- **Batch Processing**: Scheduled jobs for large-scale predictions
- **Real-time Streaming**: Kafka, Spark Streaming for continuous data
- **Serverless**: AWS Lambda, Google Cloud Functions (see the handler sketch after this list)
- **Edge Deployment**: TensorFlow Lite, ONNX for edge devices
- **Model Serving**: TensorFlow Serving, Seldon Core, BentoML
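
To make the serverless option concrete, here is a minimal sketch of an AWS Lambda-style handler. The `model.joblib` filename and the request body shape are illustrative assumptions, not files produced by this skill.

```python
# Minimal serverless inference sketch (AWS Lambda-style handler).
# Assumes the serialized model ships in the deployment package as "model.joblib".
import json

import joblib
import numpy as np

# Load once at import time so warm invocations reuse the model
_model = joblib.load("model.joblib")


def lambda_handler(event, context):
    """Handle an API Gateway proxy event whose JSON body is {"features": [[...]]}."""
    body = json.loads(event.get("body") or "{}")
    features = np.asarray(body["features"], dtype=float)
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"predictions": _model.predict(features).tolist()}),
    }
```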

## Key Considerations

- **Model Format**: Pickle, SavedModel, ONNX, PMML (see the ONNX export sketch after this list)
- **Scalability**: Load balancing, auto-scaling
- **Latency**: Response time requirements
- **Monitoring**: Model drift, performance metrics
- **Versioning**: Multiple model versions in production
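
As a sketch of the model-format consideration, the snippet below exports the fitted scikit-learn model to ONNX. It assumes the optional `skl2onnx` package is installed and that the training step in the implementation below has already saved the 20-feature model to `/tmp/model.pkl`.

```python
# Sketch: export the fitted scikit-learn model to ONNX for portable inference.
# Assumes skl2onnx is installed and /tmp/model.pkl was saved by the training step below.
import joblib
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

model = joblib.load("/tmp/model.pkl")
initial_types = [("input", FloatTensorType([None, 20]))]  # batches of 20-feature rows
onnx_model = convert_sklearn(model, initial_types=initial_types)

with open("/tmp/model.onnx", "wb") as f:
    f.write(onnx_model.SerializeToString())
```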

## Python Implementation

```python
import numpy as np
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_classification

# FastAPI for REST API
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
import uvicorn  # ASGI server (run with: uvicorn app:app)

# For model registry and serving
import mlflow
import mlflow.sklearn

# Logging and utilities
import logging
import time
from typing import List, Dict

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("=== 1. Train and Save Model ===")

# Create dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train model
model = RandomForestClassifier(n_estimators=100, max_depth=10, random_state=42)
model.fit(X_scaled, y)

# Save model and preprocessing
model_path = '/tmp/model.pkl'
scaler_path = '/tmp/scaler.pkl'

joblib.dump(model, model_path)
joblib.dump(scaler, scaler_path)

print(f"Model saved to {model_path}")
print(f"Scaler saved to {scaler_path}")

# 2. Model Serving Class
print("\n=== 2. Model Serving Class ===")

class ModelPredictor:
    def __init__(self, model_path, scaler_path):
        self.model = joblib.load(model_path)
        self.scaler = joblib.load(scaler_path)
        self.load_time = time.time()
        self.predictions_count = 0
        logger.info("Model loaded successfully")

    def predict(self, features: List[List[float]]) -> Dict:
        try:
            X = np.array(features)
            X_scaled = self.scaler.transform(X)
            predictions = self.model.predict(X_scaled)
            probabilities = self.model.predict_proba(X_scaled)

            self.predictions_count += len(X)

            return {
                'predictions': predictions.tolist(),
                'probabilities': probabilities.tolist(),
                'count': len(X),
                'timestamp': time.time()
            }
        except Exception as e:
            logger.error(f"Prediction error: {str(e)}")
            raise

    def health_check(self) -> Dict:
        return {
            'status': 'healthy',
            'uptime': time.time() - self.load_time,
            'predictions': self.predictions_count
        }

# Initialize predictor
predictor = ModelPredictor(model_path, scaler_path)

# 3. FastAPI Application
print("\n=== 3. FastAPI Application ===")

app = FastAPI(
    title="ML Model API",
    description="Production ML model serving API",
    version="1.0.0"
)

class PredictionRequest(BaseModel):
    features: List[List[float]] = Field(..., json_schema_extra={"example": [[0.0] * 20]})

class PredictionResponse(BaseModel):
    predictions: List[int]
    probabilities: List[List[float]]
    count: int
    timestamp: float

class HealthResponse(BaseModel):
    status: str
    uptime: float
    predictions: int

@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check endpoint"""
    return predictor.health_check()

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make predictions"""
    try:
        result = predictor.predict(request.features)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/predict-batch")
async def predict_batch(requests: List[PredictionRequest], background_tasks: BackgroundTasks):
    """Batch prediction with background processing"""
    all_features = []
    for req in requests:
        all_features.extend(req.features)

    result = predictor.predict(all_features)
    background_tasks.add_task(logger.info, f"Batch prediction processed: {result['count']} samples")
    return result

@app.get("/stats")
async def get_stats():
    """Get model statistics"""
    return {
        'model_type': type(predictor.model).__name__,
        'n_estimators': predictor.model.n_estimators,
        'max_depth': predictor.model.max_depth,
        'feature_importance': predictor.model.feature_importances_.tolist(),
        'total_predictions': predictor.predictions_count
    }

# 4. Dockerfile template
print("\n=== 4. Dockerfile Template ===")

dockerfile_content = '''FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY model.pkl .
COPY scaler.pkl .
COPY app.py .

EXPOSE 8000

CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
'''

print("Dockerfile content:")
print(dockerfile_content)

# 5. Requirements file
print("\n=== 5. Requirements.txt ===")

requirements = """fastapi==0.104.1
uvicorn[standard]==0.24.0
numpy==1.24.0
pandas==2.1.0
scikit-learn==1.3.2
joblib==1.3.2
pydantic==2.5.0
mlflow==2.8.1
"""

print("Requirements:")
print(requirements)

# 6. Docker Compose for deployment
print("\n=== 6. Docker Compose Template ===")

docker_compose = '''version: '3.8'

services:
  ml-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - LOG_LEVEL=info
      - WORKERS=4
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  ml-monitor:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"

  ml-dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    volumes:
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards
'''

print("Docker Compose content:")
print(docker_compose)

# 7. Testing the API
print("\n=== 7. Testing the API ===")

def test_predictor():
    # Test single prediction
    test_features = [[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0,
                     1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, 9.1, 10.1]]

    result = predictor.predict(test_features)
    print(f"Prediction result: {result}")

    # Health check
    health = predictor.health_check()
    print(f"Health status: {health}")

    # Batch predictions
    batch_features = [
        [1.0] * 20,
        [2.0] * 20,
        [3.0] * 20,
    ]
    batch_result = predictor.predict(batch_features)
    print(f"Batch prediction: {batch_result['count']} samples processed")

test_predictor()

# 8. Model versioning and registry
print("\n=== 8. Model Registry with MLflow ===")

# Log model to MLflow
with mlflow.start_run() as run:
    mlflow.sklearn.log_model(model, "model")
    mlflow.log_param("max_depth", 10)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_metric("accuracy", 0.95)

    model_uri = f"runs:/{run.info.run_id}/model"
    print(f"Model logged to MLflow: {model_uri}")

# 9. Deployment monitoring code
print("\n=== 9. Monitoring Setup ===")

class ModelMonitor:
    def __init__(self):
        self.predictions = []
        self.latencies = []

    def log_prediction(self, features, prediction, latency):
        self.latencies.append(latency)
        self.predictions.append({
            'timestamp': time.time(),
            'features_mean': np.mean(features),
            'prediction': prediction,
            'latency_ms': latency * 1000
        })

    def check_model_drift(self):
        if len(self.predictions) <= 100:
            return {'drift_detected': False}

        recent_predictions = [p['prediction'] for p in self.predictions[-100:]]
        historical_mean = np.mean([p['prediction'] for p in self.predictions[:-100]])
        recent_mean = np.mean(recent_predictions)

        drift = abs(recent_mean - historical_mean) > 0.1

        return {
            'drift_detected': drift,
            'historical_mean': float(historical_mean),
            'recent_mean': float(recent_mean),
            'threshold': 0.1
        }

    def get_stats(self):
        if not self.latencies:
            return {}

        return {
            'avg_latency_ms': np.mean(self.latencies) * 1000,
            'p95_latency_ms': np.percentile(self.latencies, 95) * 1000,
            'p99_latency_ms': np.percentile(self.latencies, 99) * 1000,
            'total_predictions': len(self.predictions)
        }

monitor = ModelMonitor()

print("\nDeployment setup completed!")
print("To run FastAPI server: uvicorn app:app --reload")
```
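
Once the service is running (`uvicorn app:app --reload`), the endpoints can be exercised from any HTTP client. Below is a minimal sketch using `requests`, assuming the server listens on `localhost:8000`.

```python
# Sketch: exercise the running service from an HTTP client.
# Assumes the API from SKILL.md is listening on localhost:8000.
import requests

payload = {"features": [[0.1] * 20]}  # one row with the expected 20 features

health = requests.get("http://localhost:8000/health", timeout=5)
print(health.json())  # {"status": "healthy", "uptime": ..., "predictions": ...}

resp = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json()["predictions"])
```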

## Deployment Checklist

- Model format and serialization
- Input/output validation
- Error handling and logging
- Authentication and security (see the API-key sketch after this list)
- Rate limiting and throttling
- Health check endpoints
- Monitoring and alerting
- Version management
- Rollback procedures
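
For the authentication item above, the sketch below adds an API-key check to a FastAPI route. This is illustrative only; the `X-API-Key` header name and `API_KEY` environment variable are assumptions, not part of the skill's files.

```python
# Sketch: API-key authentication for FastAPI routes.
# The X-API-Key header name and API_KEY environment variable are illustrative assumptions.
import os
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


def require_api_key(api_key: Optional[str] = Depends(api_key_header)) -> str:
    expected = os.environ.get("API_KEY")
    if not expected or api_key != expected:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return api_key


app = FastAPI()


@app.get("/secure-stats", dependencies=[Depends(require_api_key)])
async def secure_stats():
    # Replace with real model statistics in the actual service
    return {"status": "authorized"}
```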

## Cloud Deployment Options

- **AWS**: SageMaker, Lambda, EC2 (see the endpoint invocation sketch after this list)
- **GCP**: Vertex AI, Cloud Run, App Engine
- **Azure**: Machine Learning, App Service
- **Kubernetes**: Self-managed on-premises
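
As an example of calling a model hosted on a managed service, the sketch below invokes a SageMaker real-time endpoint with `boto3`. The endpoint name is an illustrative assumption, and credentials come from the standard AWS configuration.

```python
# Sketch: call a model hosted on a SageMaker real-time endpoint.
# Assumes boto3 can find AWS credentials; "ml-endpoint" is an illustrative
# endpoint name, not one created by this skill.
import json

import boto3

runtime = boto3.client("sagemaker-runtime")

payload = {"features": [[0.1] * 20]}
response = runtime.invoke_endpoint(
    EndpointName="ml-endpoint",
    ContentType="application/json",
    Body=json.dumps(payload),
)
print(json.loads(response["Body"].read()))
```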

## Performance Optimization

- Model quantization for smaller size
- Caching predictions (see the sketch after this list)
- Batch processing
- GPU acceleration
- Request pooling
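
As a sketch of the caching idea, the snippet below memoizes repeated single-row predictions with `functools.lru_cache`. It assumes the artifacts saved by the training step in SKILL.md (`/tmp/model.pkl`, `/tmp/scaler.pkl`) and that feature rows are passed as tuples so they are hashable.

```python
# Sketch: memoize repeated single-row predictions with an in-process cache.
# Assumes /tmp/model.pkl and /tmp/scaler.pkl were saved by the training step above;
# rows must be passed as tuples so they are hashable.
from functools import lru_cache

import joblib
import numpy as np

model = joblib.load("/tmp/model.pkl")
scaler = joblib.load("/tmp/scaler.pkl")


@lru_cache(maxsize=4096)
def cached_predict(row: tuple) -> int:
    X = scaler.transform(np.asarray(row, dtype=float).reshape(1, -1))
    return int(model.predict(X)[0])


# Identical rows hit the cache instead of the model
label = cached_predict(tuple([0.5] * 20))
```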

## Deliverables

- Deployed model endpoint
- API documentation
- Docker configuration
- Monitoring dashboard
- Deployment guide
- Performance benchmarks
- Scaling recommendations

Overview

This skill packages and deploys trained machine learning models to production using frameworks like Flask and FastAPI, containerization with Docker, and cloud platforms (AWS, GCP, Azure). It includes model serving patterns, monitoring, versioning, and templates for CI/CD and container orchestration. The outcome is a reliable, testable endpoint with observability and rollback support.

How this skill works

The skill trains or loads a serialized model, wraps it in a lightweight inference service (FastAPI/Flask) with input validation and health endpoints, and provides Docker and Docker Compose templates to containerize the service. It integrates model serving tools (MLflow, TensorFlow Serving, BentoML), monitoring hooks, and deployment patterns for cloud services or Kubernetes, plus scripts for testing, logging, and basic model registry operations.
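
Because the skill names Flask alongside FastAPI but SKILL.md only shows FastAPI, here is a minimal Flask sketch of the same service. It is illustrative, not a shipped file, and assumes `model.pkl` and `scaler.pkl` sit next to the app.

```python
# Minimal Flask sketch of the inference service (illustrative, not a shipped file).
# Assumes model.pkl and scaler.pkl sit next to this app.
import joblib
import numpy as np
from flask import Flask, jsonify, request

app = Flask(__name__)
model = joblib.load("model.pkl")
scaler = joblib.load("scaler.pkl")


@app.route("/health")
def health():
    return jsonify({"status": "healthy"})


@app.route("/predict", methods=["POST"])
def predict():
    features = request.get_json(force=True)["features"]
    X = scaler.transform(np.asarray(features, dtype=float))
    return jsonify({"predictions": model.predict(X).tolist()})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
```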

When to use it

  • You need a production API for synchronous inference or batch endpoints
  • To containerize and run models on cloud VMs, managed services, or Kubernetes
  • When implementing CI/CD and model versioning for repeatable deployments
  • To add monitoring, health checks, and drift detection in production
  • When optimizing latency, scaling, or GPU acceleration for inference

Best practices

  • Serialize models with portable formats (SavedModel, ONNX, joblib) and store preprocessing artifacts with the model
  • Expose health and metrics endpoints and instrument latencies, request counts, and prediction distributions (see the metrics sketch after this list)
  • Validate inputs and return clear error codes; add rate limiting and authentication for public endpoints
  • Use container images and Compose/Kubernetes manifests for reproducible deployments and horizontal scaling
  • Implement model versioning and a rollback plan; log model metadata to a registry (MLflow or similar)
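
For the metrics bullet above, here is a hedged sketch of a Prometheus-style /metrics endpoint in FastAPI. It assumes the optional prometheus-client package is installed; the metric names are illustrative.

```python
# Sketch: expose Prometheus metrics from the FastAPI service.
# Assumes prometheus-client is installed; metric names are illustrative.
import time

from fastapi import FastAPI, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

PREDICTIONS_TOTAL = Counter("predictions_total", "Total prediction requests served")
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Prediction latency in seconds")

app = FastAPI()


@app.post("/predict")
async def predict(payload: dict):
    start = time.perf_counter()
    # ... call the actual model here ...
    PREDICTIONS_TOTAL.inc()
    PREDICTION_LATENCY.observe(time.perf_counter() - start)
    return {"ok": True}


@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
```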

Example use cases

  • Expose a scikit-learn classifier as a REST API with FastAPI and Docker for web applications
  • Deploy a TensorFlow model to TensorFlow Serving behind a load balancer for real-time inference
  • Create scheduled batch prediction jobs for large datasets using containerized workers
  • Push model versions to MLflow registry and automate production rollout via CI/CD pipelines
  • Run model endpoints on Cloud Run, SageMaker, or Vertex AI with Prometheus/Grafana monitoring

FAQ

How do I choose between FastAPI and Flask?

Use FastAPI for type-driven validation, async support, and higher throughput; Flask is fine for simple synchronous services or when migrating legacy code.

What model format should I use for portability?

SavedModel or ONNX are best for portability across runtimes; joblib/pickle is fine for internal Python-only deployments but is less portable and carries code-execution risks when loading untrusted files.
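
As a short sketch, an exported ONNX model can be loaded with ONNX Runtime; this assumes `onnxruntime` is installed and that the graph's first input accepts float32 rows of 20 features, as in the export sketch earlier.

```python
# Sketch: run inference on an exported ONNX model with ONNX Runtime.
# Assumes onnxruntime is installed and model.onnx expects float32 rows of 20 features.
import numpy as np
import onnxruntime as ort

session = ort.InferenceSession("model.onnx", providers=["CPUExecutionProvider"])
input_name = session.get_inputs()[0].name

X = np.random.rand(2, 20).astype(np.float32)
outputs = session.run(None, {input_name: X})
print(outputs[0])  # first output: predicted labels from the sklearn-converted graph
```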