home / skills / amnadtaowsoam / cerebraskills / multi-cloud-iot

multi-cloud-iot skill

/76-iot-infrastructure/multi-cloud-iot

This skill helps design and optimize multi-cloud IoT infrastructures for redundancy, compliance, and cost efficiency across providers.

npx playbooks add skill amnadtaowsoam/cerebraskills --skill multi-cloud-iot

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
40.4 KB
---
name: Multi-Cloud IoT Strategy
description: Designing and implementing multi-cloud IoT infrastructure for redundancy, compliance, and optimization
---

# Multi-Cloud IoT Strategy

## Current Level: Expert (Enterprise Scale)

## Domain: IoT Infrastructure
## Skill ID: 89

---

## Executive Summary

Multi-Cloud IoT Strategy enables deployment of IoT infrastructure across multiple cloud providers to achieve redundancy, compliance with data residency requirements, cost optimization, and avoidance of vendor lock-in. This approach is essential for enterprise IoT deployments requiring high availability, geographic distribution, and regulatory compliance.

### Strategic Necessity

- **Redundancy**: Eliminate single points of failure
- **Compliance**: Meet data residency requirements
- **Cost Optimization**: Leverage competitive pricing
- **Vendor Independence**: Avoid lock-in
- **Performance**: Deploy closer to users/devices

---

## Technical Deep Dive

### Multi-Cloud Architecture

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    Multi-Cloud IoT Architecture                            │
│                                                                              │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐                  │
│  │   AWS       │    │   Azure     │    │   GCP       │                  │
│  │   Region    │    │   Region    │    │   Region    │                  │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘                  │
│         │                   │                   │                           │
│         ▼                   ▼                   ▼                           │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │                    Multi-Cloud Layer                             │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │   │
│  │  │  IoT     │  │  Data    │  │  Compute │  │  Storage │            │   │
│  │  │  Core    │  │  Lake    │  │  Layer   │  │  Layer   │            │   │
│  │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘            │   │
│  └───────┼────────────┼────────────┼────────────┼────────────────────┘   │
│          │            │            │            │                         │
│          ▼            ▼            ▼            ▼                         │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │                    Orchestration Layer                             │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │   │
│  │  │  Service │  │  API     │  │  Event   │  │  Config  │            │   │
│  │  │  Mesh    │  │  Gateway │  │  Bus     │  │  Mgmt    │            │   │
│  │  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘            │   │
│  └───────┼────────────┼────────────┼────────────┼────────────────────┘   │
│          │            │            │            │                         │
│          ▼            ▼            ▼            ▼                         │
│  ┌──────────────────────────────────────────────────────────────────────┐   │
│  │                    Edge & Device Layer                            │   │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐            │   │
│  │  │  Edge    │  │  Edge    │  │  IoT     │  │  IoT     │            │   │
│  │  │  Servers │  │  Gateways│  │  Devices │  │  Sensors │            │   │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘            │   │
│  └──────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────┘
```

### Multi-Cloud Provider Abstraction

```python
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
import logging

logger = logging.getLogger(__name__)

class CloudProvider(Enum):
    """Cloud providers"""
    AWS = "aws"
    AZURE = "azure"
    GCP = "gcp"

class ServiceType(Enum):
    """Service types"""
    IOT_CORE = "iot_core"
    DATA_LAKE = "data_lake"
    COMPUTE = "compute"
    STORAGE = "storage"
    DATABASE = "database"
    MESSAGING = "messaging"
    ANALYTICS = "analytics"

@dataclass
class CloudService:
    """Cloud service configuration"""
    provider: CloudProvider
    service_type: ServiceType
    region: str
    configuration: Dict[str, Any]
    cost_estimate: float

class MultiCloudOrchestrator:
    """Multi-cloud orchestrator for IoT infrastructure"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.providers = self._initialize_providers()
        self.service_registry = {}
        self.cost_tracker = CostTracker(config['cost'])
        self.compliance_manager = ComplianceManager(config['compliance'])
        
    def _initialize_providers(self) -> Dict[CloudProvider, Any]:
        """Initialize cloud provider clients"""
        providers = {}
        
        for provider_config in self.config['providers']:
            provider = CloudProvider(provider_config['name'])
            
            if provider == CloudProvider.AWS:
                providers[provider] = AWSProvider(provider_config)
            elif provider == CloudProvider.AZURE:
                providers[provider] = AzureProvider(provider_config)
            elif provider == CloudProvider.GCP:
                providers[provider] = GCPProvider(provider_config)
            else:
                raise ValueError(f"Unknown provider: {provider}")
        
        return providers
    
    async def deploy_service(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any],
        preferred_providers: Optional[List[CloudProvider]] = None
    ) -> CloudService:
        """Deploy service to optimal cloud provider"""
        logger.info(f"Deploying {service_type.value} service...")
        
        # Determine optimal provider
        provider = await self._select_provider(
            service_type,
            configuration,
            preferred_providers
        )
        
        # Deploy service
        logger.info(f"Deploying to {provider.value}...")
        service = await self.providers[provider].deploy_service(
            service_type,
            configuration
        )
        
        # Register service
        self.service_registry[service.service_id] = service
        
        # Track cost
        await self.cost_tracker.register_service(service)
        
        # Check compliance
        await self.compliance_manager.verify_compliance(service)
        
        logger.info(f"Service deployed: {service.service_id}")
        
        return service
    
    async def _select_provider(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any],
        preferred_providers: Optional[List[CloudProvider]]
    ) -> CloudProvider:
        """Select optimal provider for service"""
        candidates = preferred_providers or list(self.providers.keys())
        
        # Score each provider
        scores = {}
        for provider in candidates:
            score = await self._score_provider(
                provider,
                service_type,
                configuration
            )
            scores[provider] = score
        
        # Select highest scoring provider
        optimal_provider = max(scores, key=scores.get)
        
        logger.info(
            f"Selected {optimal_provider.value} with score {scores[optimal_provider]}"
        )
        
        return optimal_provider
    
    async def _score_provider(
        self,
        provider: CloudProvider,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> float:
        """Score provider for service deployment"""
        score = 0.0
        
        # Cost score (30%)
        cost_score = await self._get_cost_score(
            provider,
            service_type,
            configuration
        )
        score += 0.3 * cost_score
        
        # Performance score (25%)
        perf_score = await self._get_performance_score(
            provider,
            service_type,
            configuration
        )
        score += 0.25 * perf_score
        
        # Compliance score (25%)
        compliance_score = await self._get_compliance_score(
            provider,
            service_type,
            configuration
        )
        score += 0.25 * compliance_score
        
        # Reliability score (20%)
        reliability_score = await self._get_reliability_score(provider)
        score += 0.2 * reliability_score
        
        return score
    
    async def _get_cost_score(
        self,
        provider: CloudProvider,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> float:
        """Get cost score for provider"""
        # Get cost estimate
        cost = await self.providers[provider].estimate_cost(
            service_type,
            configuration
        )
        
        # Normalize score (lower cost = higher score)
        max_cost = 10000.0  # Normalization factor
        score = 1.0 - (cost / max_cost)
        
        return max(score, 0.0)
    
    async def _get_performance_score(
        self,
        provider: CloudProvider,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> float:
        """Get performance score for provider"""
        # Get latency and throughput metrics
        metrics = await self.providers[provider].get_performance_metrics(
            service_type
        )
        
        # Calculate score based on latency
        latency_score = 1.0 - (metrics['latency_ms'] / 1000.0)
        
        # Calculate score based on throughput
        throughput_score = min(metrics['throughput_mbps'] / 1000.0, 1.0)
        
        # Average scores
        score = (latency_score + throughput_score) / 2.0
        
        return score
    
    async def _get_compliance_score(
        self,
        provider: CloudProvider,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> float:
        """Get compliance score for provider"""
        # Check data residency requirements
        if 'data_residency' in configuration:
            required_region = configuration['data_residency']
            provider_regions = await self.providers[provider].get_regions()
            
            if required_region in provider_regions:
                return 1.0
            else:
                return 0.0
        
        # Check certifications
        certifications = await self.providers[provider].get_certifications()
        required_certs = self.config['compliance']['required_certifications']
        
        matching_certs = set(certifications) & set(required_certs)
        score = len(matching_certs) / len(required_certs)
        
        return score
    
    async def _get_reliability_score(self, provider: CloudProvider) -> float:
        """Get reliability score for provider"""
        # Get uptime SLA
        sla = await self.providers[provider].get_sla()
        
        # Convert to score (99.9% = 0.999)
        score = sla / 100.0
        
        return score
    
    async def migrate_service(
        self,
        service_id: str,
        target_provider: CloudProvider
    ) -> CloudService:
        """Migrate service to different provider"""
        logger.info(f"Migrating service {service_id} to {target_provider.value}...")
        
        if service_id not in self.service_registry:
            raise ValueError(f"Service not found: {service_id}")
        
        source_service = self.service_registry[service_id]
        source_provider = source_service.provider
        
        # Step 1: Create service on target provider
        logger.info("Creating service on target provider...")
        target_service = await self.providers[target_provider].deploy_service(
            source_service.service_type,
            source_service.configuration
        )
        
        # Step 2: Migrate data
        logger.info("Migrating data...")
        await self._migrate_data(source_service, target_service)
        
        # Step 3: Switch traffic
        logger.info("Switching traffic...")
        await self._switch_traffic(source_service, target_service)
        
        # Step 4: Verify migration
        logger.info("Verifying migration...")
        await self._verify_migration(target_service)
        
        # Step 5: Cleanup source
        logger.info("Cleaning up source service...")
        await self.providers[source_provider].delete_service(service_id)
        
        # Update registry
        del self.service_registry[service_id]
        self.service_registry[target_service.service_id] = target_service
        
        logger.info(f"Migration completed: {service_id} -> {target_service.service_id}")
        
        return target_service
    
    async def _migrate_data(
        self,
        source_service: CloudService,
        target_service: CloudService
    ):
        """Migrate data between providers"""
        # Implementation depends on service type
        if source_service.service_type == ServiceType.STORAGE:
            await self._migrate_storage_data(source_service, target_service)
        elif source_service.service_type == ServiceType.DATABASE:
            await self._migrate_database_data(source_service, target_service)
        elif source_service.service_type == ServiceType.DATA_LAKE:
            await self._migrate_datalake_data(source_service, target_service)
    
    async def _switch_traffic(
        self,
        source_service: CloudService,
        target_service: CloudService
    ):
        """Switch traffic from source to target"""
        # Update DNS records
        # Update load balancer configuration
        # Update service mesh routes
        pass
    
    async def _verify_migration(
        self,
        service: CloudService
    ):
        """Verify migration was successful"""
        # Check service health
        # Verify data integrity
        # Test functionality
        pass

class AWSProvider:
    """AWS provider implementation"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.client = self._initialize_client()
    
    def _initialize_client(self):
        """Initialize AWS client"""
        import boto3
        return boto3.client('iot-data', region_name=self.config['region'])
    
    async def deploy_service(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> CloudService:
        """Deploy service on AWS"""
        if service_type == ServiceType.IOT_CORE:
            return await self._deploy_iot_core(configuration)
        elif service_type == ServiceType.DATA_LAKE:
            return await self._deploy_data_lake(configuration)
        elif service_type == ServiceType.COMPUTE:
            return await self._deploy_compute(configuration)
        else:
            raise ValueError(f"Unsupported service type: {service_type}")
    
    async def _deploy_iot_core(
        self,
        configuration: Dict[str, Any]
    ) -> CloudService:
        """Deploy AWS IoT Core"""
        # Create IoT thing
        # Create certificates
        # Create policies
        # Create rules
        pass
    
    async def estimate_cost(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> float:
        """Estimate cost for service"""
        # Use AWS pricing API or cost calculator
        return 100.0  # Placeholder
    
    async def get_performance_metrics(
        self,
        service_type: ServiceType
    ) -> Dict[str, float]:
        """Get performance metrics"""
        return {
            'latency_ms': 50.0,
            'throughput_mbps': 1000.0
        }
    
    async def get_regions(self) -> List[str]:
        """Get available regions"""
        return [
            'us-east-1',
            'us-west-2',
            'eu-west-1',
            'ap-southeast-1'
        ]
    
    async def get_certifications(self) -> List[str]:
        """Get provider certifications"""
        return [
            'SOC 2 Type II',
            'ISO 27001',
            'GDPR',
            'HIPAA'
        ]
    
    async def get_sla(self) -> float:
        """Get service level agreement"""
        return 99.9  # 99.9% uptime

class AzureProvider:
    """Azure provider implementation"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.client = self._initialize_client()
    
    def _initialize_client(self):
        """Initialize Azure client"""
        from azure.iot.hub import IoTHubRegistryManager
        return IoTHubRegistryManager.from_connection_string(
            self.config['connection_string']
        )
    
    async def deploy_service(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> CloudService:
        """Deploy service on Azure"""
        if service_type == ServiceType.IOT_CORE:
            return await self._deploy_iot_hub(configuration)
        elif service_type == ServiceType.DATA_LAKE:
            return await self._deploy_data_lake(configuration)
        else:
            raise ValueError(f"Unsupported service type: {service_type}")
    
    async def _deploy_iot_hub(
        self,
        configuration: Dict[str, Any]
    ) -> CloudService:
        """Deploy Azure IoT Hub"""
        # Create IoT Hub
        # Create device identities
        # Create endpoints
        pass
    
    async def estimate_cost(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> float:
        """Estimate cost for service"""
        # Use Azure pricing calculator
        return 120.0  # Placeholder
    
    async def get_performance_metrics(
        self,
        service_type: ServiceType
    ) -> Dict[str, float]:
        """Get performance metrics"""
        return {
            'latency_ms': 60.0,
            'throughput_mbps': 900.0
        }
    
    async def get_regions(self) -> List[str]:
        """Get available regions"""
        return [
            'eastus',
            'westus2',
            'westeurope',
            'southeastasia'
        ]
    
    async def get_certifications(self) -> List[str]:
        """Get provider certifications"""
        return [
            'SOC 2 Type II',
            'ISO 27001',
            'GDPR',
            'HIPAA'
        ]
    
    async def get_sla(self) -> float:
        """Get service level agreement"""
        return 99.95  # 99.95% uptime

class GCPProvider:
    """GCP provider implementation"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.client = self._initialize_client()
    
    def _initialize_client(self):
        """Initialize GCP client"""
        from google.cloud import iot_v1
        return iot_v1.DeviceManagerClient()
    
    async def deploy_service(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> CloudService:
        """Deploy service on GCP"""
        if service_type == ServiceType.IOT_CORE:
            return await self._deploy_iot_core(configuration)
        elif service_type == ServiceType.DATA_LAKE:
            return await self._deploy_data_lake(configuration)
        else:
            raise ValueError(f"Unsupported service type: {service_type}")
    
    async def _deploy_iot_core(
        self,
        configuration: Dict[str, Any]
    ) -> CloudService:
        """Deploy GCP IoT Core"""
        # Create device registry
        # Create devices
        # Create gateways
        pass
    
    async def estimate_cost(
        self,
        service_type: ServiceType,
        configuration: Dict[str, Any]
    ) -> float:
        """Estimate cost for service"""
        # Use GCP pricing calculator
        return 110.0  # Placeholder
    
    async def get_performance_metrics(
        self,
        service_type: ServiceType
    ) -> Dict[str, float]:
        """Get performance metrics"""
        return {
            'latency_ms': 55.0,
            'throughput_mbps': 950.0
        }
    
    async def get_regions(self) -> List[str]:
        """Get available regions"""
        return [
            'us-central1',
            'us-west1',
            'europe-west1',
            'asia-southeast1'
        ]
    
    async def get_certifications(self) -> List[str]:
        """Get provider certifications"""
        return [
            'SOC 2 Type II',
            'ISO 27001',
            'GDPR',
            'HIPAA'
        ]
    
    async def get_sla(self) -> float:
        """Get service level agreement"""
        return 99.95  # 99.95% uptime

class CostTracker:
    """Cost tracker for multi-cloud infrastructure"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.service_costs = {}
    
    async def register_service(self, service: CloudService):
        """Register service for cost tracking"""
        self.service_costs[service.service_id] = {
            'service': service,
            'estimated_cost': service.cost_estimate,
            'actual_cost': 0.0,
            'registered_at': asyncio.get_event_loop().time()
        }
    
    async def get_total_cost(self) -> float:
        """Get total cost across all providers"""
        return sum(
            cost_info['actual_cost'] or cost_info['estimated_cost']
            for cost_info in self.service_costs.values()
        )
    
    async def get_cost_by_provider(self) -> Dict[CloudProvider, float]:
        """Get cost breakdown by provider"""
        provider_costs = {}
        
        for cost_info in self.service_costs.values():
            provider = cost_info['service'].provider
            cost = cost_info['actual_cost'] or cost_info['estimated_cost']
            
            if provider not in provider_costs:
                provider_costs[provider] = 0.0
            
            provider_costs[provider] += cost
        
        return provider_costs

class ComplianceManager:
    """Compliance manager for multi-cloud infrastructure"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.compliance_rules = config['rules']
    
    async def verify_compliance(self, service: CloudService):
        """Verify service compliance"""
        # Check data residency
        # Check certifications
        # Check security requirements
        pass
```

### Cross-Cloud Data Replication

```python
class DataReplicationManager:
    """Cross-cloud data replication manager"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.providers = self._initialize_providers()
        
    async def setup_replication(
        self,
        source: CloudService,
        destination: CloudService,
        replication_policy: Dict[str, Any]
    ):
        """Setup data replication between providers"""
        logger.info(
            f"Setting up replication from {source.provider.value} "
            f"to {destination.provider.value}"
        )
        
        # Step 1: Create replication pipeline
        pipeline = await self._create_replication_pipeline(
            source,
            destination,
            replication_policy
        )
        
        # Step 2: Configure change data capture
        await self._configure_cdc(source, pipeline)
        
        # Step 3: Setup conflict resolution
        await self._setup_conflict_resolution(
            source,
            destination,
            replication_policy
        )
        
        # Step 4: Monitor replication
        await self._monitor_replication(pipeline)
        
        logger.info("Replication setup completed")
    
    async def _create_replication_pipeline(
        self,
        source: CloudService,
        destination: CloudService,
        policy: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Create replication pipeline"""
        # Create data pipeline
        # Configure transformations
        # Setup error handling
        pass
    
    async def _configure_cdc(
        self,
        source: CloudService,
        pipeline: Dict[str, Any]
    ):
        """Configure change data capture"""
        # Setup CDC on source
        # Configure event streaming
        # Connect to pipeline
        pass
    
    async def _setup_conflict_resolution(
        self,
        source: CloudService,
        destination: CloudService,
        policy: Dict[str, Any]
    ):
        """Setup conflict resolution strategy"""
        # Define conflict detection
        # Configure resolution rules
        # Setup notification system
        pass
    
    async def _monitor_replication(self, pipeline: Dict[str, Any]):
        """Monitor replication pipeline"""
        # Track replication lag
        # Monitor error rates
        # Alert on failures
        pass
```

---

## Tooling & Tech Stack

### Multi-Cloud Tools
- **Terraform**: Multi-cloud IaC
- **Pulumi**: Multi-cloud IaC
- **Crossplane**: Multi-cloud control plane
- **Karmada**: Multi-cluster orchestration

### Service Mesh
- **Istio**: Service mesh
- **Linkerd**: Lightweight service mesh
- **Consul Connect**: Service mesh
- **AWS App Mesh**: AWS service mesh

### Data Replication
- **Apache Kafka**: Event streaming
- **Apache Pulsar**: Event streaming
- **AWS DMS**: Database migration
- **Azure Data Factory**: Data integration

### Monitoring
- **Prometheus**: Metrics collection
- **Grafana**: Visualization
- **Jaeger**: Distributed tracing
- **ELK Stack**: Logging

---

## Configuration Essentials

### Multi-Cloud Configuration

```yaml
# config/multi-cloud.yaml
providers:
  - name: "aws"
    type: "aws"
    region: "us-west-2"
    credentials:
      access_key_id: "${AWS_ACCESS_KEY_ID}"
      secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    enabled: true
    priority: 1
  
  - name: "azure"
    type: "azure"
    region: "westus2"
    credentials:
      tenant_id: "${AZURE_TENANT_ID}"
      client_id: "${AZURE_CLIENT_ID}"
      client_secret: "${AZURE_CLIENT_SECRET}"
    enabled: true
    priority: 2
  
  - name: "gcp"
    type: "gcp"
    region: "us-central1"
    credentials:
      project_id: "${GCP_PROJECT_ID}"
      key_file: "${GCP_KEY_FILE}"
    enabled: true
    priority: 3

services:
  iot_core:
    type: "iot_core"
    redundancy: 2  # Deploy to 2 providers
    data_residency: "us-west-2"
    preferred_providers: ["aws", "azure"]
  
  data_lake:
    type: "data_lake"
    redundancy: 3  # Deploy to 3 providers
    data_residency: "us-west-2"
    preferred_providers: ["aws", "azure", "gcp"]
  
  compute:
    type: "compute"
    redundancy: 2
    data_residency: null
    preferred_providers: ["aws", "gcp"]

compliance:
  required_certifications:
    - "SOC 2 Type II"
    - "ISO 27001"
    - "GDPR"
  
  data_residency:
    us: ["us-west-2", "westus2", "us-central1"]
    eu: ["eu-west-1", "westeurope", "europe-west1"]
    asia: ["ap-southeast-1", "southeastasia", "asia-southeast1"]

cost:
  budget: 10000  # Monthly budget in USD
  alert_threshold: 0.8  # Alert at 80% of budget
  optimization_enabled: true

monitoring:
  enabled: true
  metrics_interval: 60  # seconds
  alert_channels:
    - type: "slack"
      webhook: "${SLACK_WEBHOOK_URL}"
    - type: "email"
      recipients: ["[email protected]"]
```

---

## Code Examples

### Good: Complete Multi-Cloud Deployment

```python
# multi_cloud/deploy.py
import asyncio
import logging
from typing import Dict, Any

from multi_cloud.orchestrator import MultiCloudOrchestrator
from multi_cloud.providers import CloudProvider, ServiceType

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def deploy_multi_cloud_iot():
    """Deploy IoT infrastructure across multiple clouds"""
    logger.info("=" * 60)
    logger.info("Multi-Cloud IoT Deployment")
    logger.info("=" * 60)
    
    # Load configuration
    config = load_config('config/multi-cloud.yaml')
    
    # Create orchestrator
    orchestrator = MultiCloudOrchestrator(config)
    
    # Deploy IoT Core services
    logger.info("\n" + "=" * 60)
    logger.info("Deploying IoT Core Services")
    logger.info("=" * 60)
    
    iot_config = {
        'device_count': 10000,
        'message_rate': 100,  # messages per second
        'retention_days': 30,
        'data_residency': 'us-west-2'
    }
    
    iot_services = []
    for i in range(config['services']['iot_core']['redundancy']):
        service = await orchestrator.deploy_service(
            ServiceType.IOT_CORE,
            iot_config,
            config['services']['iot_core']['preferred_providers']
        )
        iot_services.append(service)
        logger.info(f"Deployed IoT Core service: {service.service_id}")
    
    # Deploy Data Lake services
    logger.info("\n" + "=" * 60)
    logger.info("Deploying Data Lake Services")
    logger.info("=" * 60)
    
    datalake_config = {
        'storage_size_tb': 100,
        'ingestion_rate_mbps': 1000,
        'retention_days': 365,
        'data_residency': 'us-west-2'
    }
    
    datalake_services = []
    for i in range(config['services']['data_lake']['redundancy']):
        service = await orchestrator.deploy_service(
            ServiceType.DATA_LAKE,
            datalake_config,
            config['services']['data_lake']['preferred_providers']
        )
        datalake_services.append(service)
        logger.info(f"Deployed Data Lake service: {service.service_id}")
    
    # Deploy Compute services
    logger.info("\n" + "=" * 60)
    logger.info("Deploying Compute Services")
    logger.info("=" * 60)
    
    compute_config = {
        'instance_type': 't3.large',
        'instance_count': 10,
        'auto_scaling': True,
        'min_instances': 5,
        'max_instances': 20
    }
    
    compute_services = []
    for i in range(config['services']['compute']['redundancy']):
        service = await orchestrator.deploy_service(
            ServiceType.COMPUTE,
            compute_config,
            config['services']['compute']['preferred_providers']
        )
        compute_services.append(service)
        logger.info(f"Deployed Compute service: {service.service_id}")
    
    # Setup cross-cloud replication
    logger.info("\n" + "=" * 60)
    logger.info("Setting Up Cross-Cloud Replication")
    logger.info("=" * 60)
    
    replication_manager = DataReplicationManager(config)
    
    for i, source_service in enumerate(datalake_services):
        target_service = datalake_services[(i + 1) % len(datalake_services)]
        
        await replication_manager.setup_replication(
            source_service,
            target_service,
            {
                'mode': 'active-active',
                'conflict_resolution': 'last-write-wins',
                'latency_threshold_ms': 1000
            }
        )
        
        logger.info(
            f"Setup replication: {source_service.provider.value} -> "
            f"{target_service.provider.value}"
        )
    
    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("Deployment Summary")
    logger.info("=" * 60)
    logger.info(f"IoT Core Services: {len(iot_services)}")
    logger.info(f"Data Lake Services: {len(datalake_services)}")
    logger.info(f"Compute Services: {len(compute_services)}")
    
    # Get cost estimate
    total_cost = await orchestrator.cost_tracker.get_total_cost()
    cost_by_provider = await orchestrator.cost_tracker.get_cost_by_provider()
    
    logger.info(f"\nTotal Estimated Cost: ${total_cost:.2f}/month")
    logger.info("Cost by Provider:")
    for provider, cost in cost_by_provider.items():
        logger.info(f"  {provider.value}: ${cost:.2f}/month")

def load_config(filename: str) -> Dict[str, Any]:
    """Load configuration from file"""
    import yaml
    with open(filename, 'r') as f:
        return yaml.safe_load(f)

async def main():
    """Main entry point"""
    await deploy_multi_cloud_iot()

if __name__ == "__main__":
    asyncio.run(main())
```

### Bad: Anti-pattern Example

```python
# BAD: No provider abstraction
async def bad_deployment():
    # Hardcoded to AWS
    deploy_aws_iot_core()

# BAD: No cost optimization
async def bad_deployment():
    # Deploy to most expensive provider
    deploy_to_azure()

# BAD: No redundancy
async def bad_deployment():
    # Single point of failure
    deploy_to_single_provider()

# BAD: No compliance checking
async def bad_deployment():
    # Ignore data residency
    deploy_to_any_provider()

# BAD: No monitoring
async def bad_deployment():
    # Deploy without monitoring
    deploy_service()
```

---

## Standards, Compliance & Security

### Industry Standards
- **Multi-Cloud Best Practices**: Multi-cloud deployment patterns
- **Data Residency**: Data sovereignty requirements
- **Security Standards**: CIS benchmarks
- **Compliance**: SOC 2, ISO 27001, GDPR

### Security Best Practices
- **Encryption**: End-to-end encryption
- **Identity Management**: Centralized IAM
- **Network Security**: Multi-cloud networking
- **Audit Logging**: Cross-cloud logging

### Compliance Requirements
- **Data Residency**: Meet regional requirements
- **Certifications**: Maintain all required certifications
- **Audit Trail**: Complete audit log
- **Data Protection**: GDPR/HIPAA compliance

---

## Quick Start

### 1. Install Dependencies

```bash
pip install boto3 azure-iot-hub google-cloud-iot
pip install terraform pulumi
```

### 2. Configure Providers

```yaml
# config/providers.yaml
aws:
  region: us-west-2
  access_key_id: ${AWS_ACCESS_KEY_ID}
  secret_access_key: ${AWS_SECRET_ACCESS_KEY}

azure:
  region: westus2
  tenant_id: ${AZURE_TENANT_ID}
  client_id: ${AZURE_CLIENT_ID}
  client_secret: ${AZURE_CLIENT_SECRET}

gcp:
  region: us-central1
  project_id: ${GCP_PROJECT_ID}
  key_file: ${GCP_KEY_FILE}
```

### 3. Deploy Services

```python
from multi_cloud.orchestrator import MultiCloudOrchestrator

config = load_config('config/providers.yaml')
orchestrator = MultiCloudOrchestrator(config)

service = await orchestrator.deploy_service(
    ServiceType.IOT_CORE,
    {'device_count': 1000}
)
```

### 4. Monitor Deployment

```bash
# Check service status
python -m multi_cloud.status

# View cost breakdown
python -m multi_cloud.costs
```

---

## Production Checklist

### Multi-Cloud Setup
- [ ] All providers configured
- [ ] Credentials secured
- [ ] Network connectivity established
- [ ] DNS configured
- [ ] Load balancing configured

### Service Deployment
- [ ] Services deployed to multiple providers
- [ ] Replication configured
- [ ] Failover tested
- [ ] Monitoring enabled
- [ ] Alerting configured

### Compliance
- [ ] Data residency verified
- [ ] Certifications maintained
- [ ] Audit logging enabled
- [ ] Security scan passed
- [ ] Documentation complete

### Operations
- [ ] Runbooks documented
- [ ] Team trained
- [ ] Support plan in place
- [ ] Incident response ready
- [ ] Disaster recovery tested

---

## Anti-patterns

### ❌ Avoid These Practices

1. **No Provider Abstraction**
   ```python
   # BAD: No abstraction
   deploy_aws_iot_core()
   ```

2. **No Cost Optimization**
   ```python
   # BAD: No cost optimization
   deploy_to_most_expensive_provider()
   ```

3. **No Redundancy**
   ```python
   # BAD: No redundancy
   deploy_to_single_provider()
   ```

4. **No Compliance Checking**
   ```python
   # BAD: No compliance checking
   deploy_to_any_provider()
   ```

5. **No Monitoring**
   ```python
   # BAD: No monitoring
   deploy_service()
   ```

### ✅ Follow These Practices

1. **Provider Abstraction**
   ```python
   # GOOD: Provider abstraction
   orchestrator = MultiCloudOrchestrator(config)
   service = await orchestrator.deploy_service(type, config)
   ```

2. **Cost Optimization**
   ```python
   # GOOD: Cost optimization
   provider = await select_cheapest_provider(type, config)
   ```

3. **Redundancy**
   ```python
   # GOOD: Redundancy
   for provider in providers:
       await deploy_service(type, config, provider)
   ```

4. **Compliance Checking**
   ```python
   # GOOD: Compliance checking
   if verify_compliance(provider, requirements):
       await deploy_service(type, config, provider)
   ```

5. **Monitoring**
   ```python
   # GOOD: Monitoring
   await deploy_service(type, config)
   await monitor_service(service)
   ```

---

## Unit Economics & KPIs

### Development Costs
- **Initial Setup**: 80-120 hours
- **Provider Integration**: 60-100 hours
- **Testing & Validation**: 40-60 hours
- **Total**: 180-280 hours

### Operational Costs
- **Multi-Cloud Infrastructure**: $1000-5000/month
- **Data Transfer**: $200-1000/month
- **Monitoring**: $100-300/month
- **Support**: 20-40 hours/month

### ROI Metrics
- **Cost Savings**: 20-40% vs single cloud
- **Availability**: 99.99% vs 99.9%
- **Vendor Lock-in**: Eliminated
- **Compliance**: 100% met

### KPI Targets
- **Deployment Time**: < 30 minutes
- **Failover Time**: < 5 minutes
- **Cost Variance**: < 10%
- **Compliance**: 100%
- **Availability**: > 99.99%

---

## Integration Points / Related Skills

### Upstream Skills
- **86. Advanced IaC IoT**: Infrastructure provisioning
- **87. Chaos Engineering IoT**: Resilience testing
- **88. GitOps IoT Infrastructure**: GitOps implementation

### Parallel Skills
- **90. Disaster Recovery IoT**: DR planning
- **73. Differential OTA Updates**: OTA deployment
- **74. Atomic AB Partitioning**: Firmware updates
- **75. Fleet Campaign Management**: Fleet management

### Downstream Skills
- **14. Monitoring and Observability**: Metrics and tracing
- **24. Security Practices**: Infrastructure security
- **81. SaaS FinOps Pricing**: Cost optimization
- **84. Compliance AI Governance**: Compliance

### Cross-Domain Skills
- **15. DevOps Infrastructure**: CI/CD pipelines
- **59. Architecture Decision**: Architecture decisions
- **64. Meta Standards**: Coding standards
- **72. Metacognitive Skill Architect**: System design

---

## References & Resources

### Documentation
- [AWS IoT Documentation](https://docs.aws.amazon.com/iot/)
- [Azure IoT Documentation](https://docs.microsoft.com/en-us/azure/iot-fundamentals/)
- [Google Cloud IoT Documentation](https://cloud.google.com/iot/docs)
- [Terraform Multi-Cloud](https://www.terraform.io/docs/cloud/)

### Best Practices
- [Multi-Cloud Best Practices](https://www.gartner.com/en/information-technology/insights/multicloud-strategies)
- [AWS Multi-Cloud](https://aws.amazon.com/multi-cloud/)
- [Azure Multi-Cloud](https://azure.microsoft.com/en-us/solutions/multi-cloud/)

### Tools & Libraries
- [Terraform](https://www.terraform.io/)
- [Pulumi](https://www.pulumi.com/)
- [Crossplane](https://crossplane.io/)
- [Karmada](https://karmada.io/)

Overview

This skill designs and implements multi-cloud IoT infrastructure for enterprise-scale deployments to achieve redundancy, regulatory compliance, cost optimization, and vendor independence. It provides patterns, orchestration logic, and migration routines that place IoT core, data, compute, and edge services across AWS, Azure, and GCP. The focus is practical: deploy, score, migrate, and verify services across providers while tracking cost and compliance.

How this skill works

The orchestrator abstracts cloud providers and scores them by cost, performance, compliance, and reliability to pick an optimal target for each service. It automates deployment, registers services, tracks costs, verifies compliance, and supports live migrations with data transfer and traffic switching. Scoring and selection are configurable, enabling policy-driven placement (data residency, certifications, SLAs) and preferred-provider overrides.

When to use it

  • Enterprise IoT rollouts requiring geographic redundancy and high availability
  • When data residency or regulatory certifications require region-specific placement
  • To avoid vendor lock-in and enable negotiation-driven cost savings
  • When you need to route workloads to the cloud with best latency or throughput
  • During cloud exit, consolidation, or proactive disaster recovery planning

Best practices

  • Define clear placement policies: residency, certifications, cost thresholds, and SLAs
  • Instrument providers with consistent metrics and realistic cost models before scoring
  • Start with hybrid proofs-of-concept: core services in two clouds before scaling
  • Automate testing: health checks, traffic switch drills, and rollback plans
  • Treat data migrations as first-class: plan transfer windows, encryption, and validation

Example use cases

  • Deploy IoT telemetry ingestion in regional clouds to meet local data residency rules
  • Automatically place analytics pipelines on the provider with the lowest projected cost
  • Migrate an edge-related database from one cloud to another with minimal downtime
  • Configure service mesh and API gateway across clouds for consistent device connectivity
  • Run failover drills that switch traffic between providers to validate continuity

FAQ

How does provider scoring handle conflicting requirements?

Scoring is weighted by policy; you prioritize compliance, cost, or performance and the orchestrator returns the highest-scoring provider given those weights. Use preferred provider lists to enforce hard constraints.

What is required to migrate stateful services with minimal downtime?

Plan phased migrations: provision target service, replicate data continuously, perform cutover with DNS/load-balancer updates, verify health, then decommission source. Automate verification and rollback steps.