iot-expert skill

/stdlib/domains/iot-expert

This skill provides expert guidance on IoT systems, edge computing, and protocols to design secure, scalable, and maintainable deployments.

npx playbooks add skill personamanagmentlayer/pcl --skill iot-expert

---
name: iot-expert
version: 1.0.0
description: Expert-level IoT systems, embedded devices, edge computing, and IoT protocols
category: domains
tags: [iot, embedded, edge-computing, mqtt, sensors, firmware]
allowed-tools:
  - Read
  - Write
  - Edit
---

# IoT Expert

Expert guidance for IoT systems, embedded devices, edge computing, sensor networks, and IoT protocols.

## Core Concepts

### IoT Architecture
- Device layer (sensors, actuators)
- Edge computing layer
- Network layer (connectivity)
- Cloud/platform layer
- Application layer
- Security across all layers

### IoT Protocols
- MQTT (Message Queuing Telemetry Transport)
- CoAP (Constrained Application Protocol)
- HTTP/REST for IoT
- WebSocket for real-time
- LoRaWAN for long-range
- Zigbee, Z-Wave for home automation

### Embedded Systems
- Microcontroller programming
- Real-time operating systems (RTOS)
- Power management
- Firmware updates (OTA)
- Hardware interfaces (I2C, SPI, UART)
- Memory constraints

## MQTT Implementation

```python
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from typing import Callable, Dict

class MQTTClient:
    def __init__(self, broker: str, port: int = 1883, client_id: str = "iot_device"):
        self.broker = broker
        self.port = port
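        # paho-mqtt 1.x signature; on paho-mqtt 2.x, pass
        # mqtt.CallbackAPIVersion.VERSION1 as the first argument so the
        # callbacks below keep the signatures shown here
        self.client = mqtt.Client(client_id=client_id)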
        self.subscriptions: Dict[str, Callable] = {}

        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

    def _on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print(f"Connected to MQTT broker at {self.broker}:{self.port}")
            # Resubscribe to topics on reconnect
            for topic in self.subscriptions.keys():
                self.client.subscribe(topic)
        else:
            print(f"Connection failed with code {rc}")

    def _on_message(self, client, userdata, msg):
        topic = msg.topic
        payload = msg.payload.decode()

        if topic in self.subscriptions:
            try:
                data = json.loads(payload)
                self.subscriptions[topic](data)
            except json.JSONDecodeError:
                self.subscriptions[topic](payload)

    def _on_disconnect(self, client, userdata, rc):
        if rc != 0:
            # loop_start() runs a background network loop that retries the connection
            print("Unexpected disconnect. Reconnecting...")

    def connect(self, username: str = None, password: str = None):
        if username and password:
            self.client.username_pw_set(username, password)

        self.client.connect(self.broker, self.port, 60)
        self.client.loop_start()

    def publish(self, topic: str, payload: Dict, qos: int = 1, retain: bool = False):
        """Publish message to MQTT topic"""
        message = json.dumps(payload)
        result = self.client.publish(topic, message, qos=qos, retain=retain)
        return result.rc == mqtt.MQTT_ERR_SUCCESS

    def subscribe(self, topic: str, callback: Callable, qos: int = 1):
        """Subscribe to MQTT topic with callback"""
        self.subscriptions[topic] = callback
        self.client.subscribe(topic, qos=qos)

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

# IoT Device Example
class TemperatureSensor:
    def __init__(self, device_id: str, mqtt_client: MQTTClient):
        self.device_id = device_id
        self.mqtt = mqtt_client
        self.topic = f"sensors/temperature/{device_id}"

    def read_temperature(self) -> float:
        # In real device, read from actual sensor
        import random
        return round(random.uniform(20.0, 30.0), 2)

    def publish_reading(self):
        temperature = self.read_temperature()

        payload = {
            "device_id": self.device_id,
            "temperature": temperature,
            "unit": "celsius",
            "timestamp": datetime.utcnow().isoformat()
        }

        self.mqtt.publish(self.topic, payload)
        return payload
```
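
The `_on_message` handler above looks up callbacks by exact topic string, so a wildcard subscription such as `sensors/temperature/+` would never reach its callback. The following is a minimal sketch of MQTT wildcard matching in plain Python, written out so the rule is visible. `topic_matches` and `find_callbacks` are hypothetical helper names, not part of the original class, and the sketch ignores the special treatment of `$`-prefixed system topics. paho-mqtt also ships a `topic_matches_sub` helper that could be used instead.

```python
from typing import Callable, Dict, List


def topic_matches(subscription: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT filter `subscription`.

    `+` matches exactly one level; `#` matches the parent level and
    everything below it, and is only valid as the last level.
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(sub_levels):
        if level == "#":
            # Multi-level wildcard: valid only in the final position
            return i == len(sub_levels) - 1
        if i >= len(topic_levels):
            # Filter has more levels than the topic
            return False
        if level != "+" and level != topic_levels[i]:
            return False

    # No '#' seen, so both must have the same number of levels
    return len(sub_levels) == len(topic_levels)


def find_callbacks(subscriptions: Dict[str, Callable], topic: str) -> List[Callable]:
    """Collect every callback whose filter matches the incoming topic."""
    return [cb for pattern, cb in subscriptions.items() if topic_matches(pattern, topic)]
```

With a helper like this, `_on_message` could iterate over `find_callbacks(self.subscriptions, msg.topic)` instead of testing `topic in self.subscriptions`.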

## Embedded C for Microcontroller

```cpp
// Arduino/ESP32 example (Arduino sketches are compiled as C++)
#include <WiFi.h>
#include <PubSubClient.h>
#include "DHT.h"

#define DHTPIN 4
#define DHTTYPE DHT22

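// Placeholder values for illustration only; production firmware should load
// credentials from provisioned secure storage instead of hard-coding them
const char* ssid = "YourWiFiSSID";
const char* password = "YourPassword";
const char* mqtt_server = "broker.example.com";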

WiFiClient espClient;
PubSubClient client(espClient);
DHT dht(DHTPIN, DHTTYPE);

void setup_wifi() {
    delay(10);
    Serial.println("Connecting to WiFi...");

    WiFi.begin(ssid, password);

    while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
    }

    Serial.println("WiFi connected");
    Serial.println("IP address: ");
    Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
    Serial.print("Message arrived [");
    Serial.print(topic);
    Serial.print("] ");

    // Use an unsigned index to match the type of `length`
    for (unsigned int i = 0; i < length; i++) {
        Serial.print((char)payload[i]);
    }
    Serial.println();
}

void reconnect() {
    while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");

        if (client.connect("ESP32Client")) {
            Serial.println("connected");
            client.subscribe("device/control");
        } else {
            Serial.print("failed, rc=");
            Serial.print(client.state());
            Serial.println(" try again in 5 seconds");
            delay(5000);
        }
    }
}

void setup() {
    Serial.begin(115200);
    setup_wifi();
    client.setServer(mqtt_server, 1883);
    client.setCallback(callback);
    dht.begin();
}

void loop() {
    if (!client.connected()) {
        reconnect();
    }
    client.loop();

    // Read sensor every 10 seconds
    static unsigned long lastRead = 0;
    if (millis() - lastRead > 10000) {
        float humidity = dht.readHumidity();
        float temperature = dht.readTemperature();

        if (!isnan(humidity) && !isnan(temperature)) {
            char msg[100];
            snprintf(msg, sizeof(msg),
                    "{\"temperature\":%.2f,\"humidity\":%.2f}",
                    temperature, humidity);

            client.publish("sensors/data", msg);
            Serial.println(msg);
        }

        lastRead = millis();
    }
}
```

## Edge Computing

```python
import asyncio
from typing import Dict, List
import numpy as np

class EdgeProcessor:
    """Process data at edge before sending to cloud"""

    def __init__(self, buffer_size: int = 100):
        self.buffer: List[Dict] = []
        self.buffer_size = buffer_size

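    def add_reading(self, reading: Dict):
        """Add sensor reading to buffer; return a summary when the buffer fills, else None"""
        self.buffer.append(reading)

        if len(self.buffer) >= self.buffer_size:
            # Return the summary so the caller can forward it instead of losing it
            return self.process_buffer()
        return None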

    def process_buffer(self) -> Dict:
        """Process buffered data at edge"""
        if not self.buffer:
            return {}

        # Extract temperature values
        temperatures = [r['temperature'] for r in self.buffer]

        # Compute statistics at edge
        summary = {
            "count": len(temperatures),
            "mean": np.mean(temperatures),
            "std": np.std(temperatures),
            "min": np.min(temperatures),
            "max": np.max(temperatures),
            "anomalies": self.detect_anomalies(temperatures)
        }

        # Clear buffer
        self.buffer = []

        return summary

    def detect_anomalies(self, values: List[float]) -> List[int]:
        """Return indices of values more than 2.5 standard deviations from the batch mean"""
        mean = np.mean(values)
        std = np.std(values)
        threshold = 2.5

        anomalies = []
        for i, v in enumerate(values):
            if abs(v - mean) > threshold * std:
                anomalies.append(i)

        return anomalies

class IoTPipeline:
    """Complete IoT data pipeline"""

    def __init__(self, mqtt_client: MQTTClient):
        self.mqtt = mqtt_client
        self.edge_processor = EdgeProcessor()
        self.devices: Dict[str, TemperatureSensor] = {}

    def register_device(self, device: TemperatureSensor):
        """Register IoT device"""
        self.devices[device.device_id] = device

        # Subscribe to device topic
        topic = f"sensors/temperature/{device.device_id}"
        self.mqtt.subscribe(topic, self.handle_device_data)

    def handle_device_data(self, data: Dict):
        """Handle incoming device data"""
        # Process at edge
        self.edge_processor.add_reading(data)

    async def collect_data_loop(self, interval: int = 5):
        """Continuous data collection from devices"""
        while True:
            for device in self.devices.values():
                reading = device.publish_reading()
                print(f"Device {device.device_id}: {reading['temperature']}°C")

            await asyncio.sleep(interval)
```
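
`EdgeProcessor` above holds a full buffer of readings and relies on NumPy. On devices too constrained for either, the same mean and standard deviation can be maintained incrementally. The sketch below swaps in Welford's online algorithm, which is not what the original code uses. `RunningStats` is a hypothetical class name. Unlike `detect_anomalies`, it compares each new value against the statistics seen so far rather than against a completed batch.

```python
import math


class RunningStats:
    """Streaming mean and standard deviation in O(1) memory (Welford's algorithm)."""

    def __init__(self) -> None:
        self.count = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def update(self, value: float) -> None:
        """Fold one new reading into the running statistics."""
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (value - self.mean)

    @property
    def std(self) -> float:
        # Population standard deviation, the same convention as np.std's default
        return math.sqrt(self._m2 / self.count) if self.count else 0.0

    def is_anomaly(self, value: float, threshold: float = 2.5) -> bool:
        """Flag a value more than `threshold` standard deviations from the mean so far."""
        return self.std > 0 and abs(value - self.mean) > threshold * self.std
```

A device would typically call `is_anomaly(value)` before `update(value)` so that an outlier is judged against earlier readings only.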

## Device Management

```python
from datetime import datetime
from enum import Enum
from typing import Dict, List

class DeviceStatus(Enum):
    ONLINE = "online"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    ERROR = "error"

class IoTDevice:
    def __init__(self, device_id: str, device_type: str):
        self.device_id = device_id
        self.device_type = device_type
        self.status = DeviceStatus.OFFLINE
        self.last_seen = None
        self.firmware_version = "1.0.0"
        self.metadata = {}

    def update_status(self, status: DeviceStatus):
        self.status = status
        self.last_seen = datetime.utcnow()

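    def needs_firmware_update(self, latest_version: str) -> bool:
        # Compare numeric components; plain string comparison would rank
        # "1.10.0" below "1.9.0"
        def parse(version: str):
            return tuple(int(part) for part in version.split("."))

        return parse(self.firmware_version) < parse(latest_version)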

class DeviceManager:
    def __init__(self):
        self.devices: Dict[str, IoTDevice] = {}

    def register_device(self, device: IoTDevice):
        """Register new device"""
        self.devices[device.device_id] = device

    def update_device_heartbeat(self, device_id: str):
        """Update device last seen timestamp"""
        if device_id in self.devices:
            self.devices[device_id].update_status(DeviceStatus.ONLINE)

    def get_offline_devices(self, timeout_seconds: int = 300) -> List[IoTDevice]:
        """Get devices that haven't reported recently"""
        offline = []
        now = datetime.utcnow()

        for device in self.devices.values():
            if device.last_seen:
                elapsed = (now - device.last_seen).total_seconds()
                if elapsed > timeout_seconds:
                    offline.append(device)

        return offline

    def schedule_firmware_update(self, device_id: str, new_version: str):
        """Build an OTA firmware update command (returns None for unknown devices)"""
        if device_id in self.devices:
            # The caller is expected to publish this payload to the device via MQTT
            payload = {
                "command": "firmware_update",
                "version": new_version,
                "url": f"https://updates.example.com/{new_version}.bin"
            }
            return payload
```
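
The update command built by `schedule_firmware_update` carries only a version and a URL. A device would normally also check what it downloaded before flashing it. The following is a minimal sketch, assuming the command is extended with a hypothetical `sha256` field that holds the expected hex digest. `firmware_digest` and `verify_firmware` are illustrative names. A digest check of this kind only helps if the command itself arrives over an authenticated channel; a signed image is the stronger option.

```python
import hashlib
import hmac


def firmware_digest(image: bytes) -> str:
    """Return the SHA-256 hex digest of a firmware image."""
    return hashlib.sha256(image).hexdigest()


def verify_firmware(image: bytes, expected_sha256: str) -> bool:
    """Check a downloaded image against the digest sent with the update command."""
    actual = firmware_digest(image)
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(actual, expected_sha256.lower())
```

On the server side, the digest would be computed once per release and added to the payload next to `version` and `url`.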

## Best Practices

### Device Design
- Implement power management for battery devices
- Use deep sleep modes when idle
- Handle network disconnections gracefully
- Implement watchdog timers
- Design for remote diagnostics
- Plan for firmware updates (OTA)
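
Handling disconnections gracefully usually means spacing out retries so that a fleet of devices does not hammer a recovering broker in lockstep. The sketch below shows one common approach, exponential backoff with full jitter. `connect_with_backoff` is a hypothetical helper, and `connect` stands for any callable that returns `True` on success (for example, a wrapper around a client's connect call). The `sleep` and `rng` parameters exist only to make the function easy to test.

```python
import random
import time
from typing import Callable


def connect_with_backoff(
    connect: Callable[[], bool],
    base: float = 1.0,
    cap: float = 60.0,
    attempts: int = 6,
    sleep: Callable[[float], None] = time.sleep,
    rng: Callable[[], float] = random.random,
) -> bool:
    """Retry `connect` with exponentially growing, randomized delays.

    The delay ceiling doubles after each failure (base, 2*base, 4*base, ...)
    up to `cap`; the actual wait is a random fraction of that ceiling.
    """
    for attempt in range(attempts):
        if connect():
            return True
        if attempt < attempts - 1:
            ceiling = min(cap, base * 2 ** attempt)
            sleep(rng() * ceiling)
    return False
```

The defaults above are illustrative; battery-powered devices may prefer fewer attempts followed by a return to deep sleep.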

### Security
- Use TLS/SSL for MQTT connections
- Implement device authentication
- Encrypt sensitive data
- Secure firmware updates
- Regular security patches
- Network segmentation
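
As one way to apply the first two points, the sketch below builds a TLS context with Python's standard `ssl` module: the broker's certificate is verified, and a client certificate is optionally presented for device authentication. `build_tls_context` and its file-path parameters are hypothetical. With paho-mqtt, a context like this could be handed to the client through `tls_set_context` before connecting, typically on port 8883 rather than 1883.

```python
import ssl
from typing import Optional


def build_tls_context(
    ca_file: Optional[str] = None,
    client_cert: Optional[str] = None,
    client_key: Optional[str] = None,
) -> ssl.SSLContext:
    """Create a client-side TLS context that verifies the broker certificate."""
    # With ca_file=None, the system's default trust store is used
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    context.minimum_version = ssl.TLSVersion.TLSv1_2

    if client_cert:
        # Mutual TLS: the device proves its identity with its own certificate
        context.load_cert_chain(certfile=client_cert, keyfile=client_key)

    return context
```

`create_default_context` already enables hostname checking and requires a valid certificate, so the sketch leaves those settings untouched.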

### Data Management
- Process data at edge when possible
- Implement data buffering for offline scenarios
- Use efficient data formats (e.g., Protocol Buffers)
- Compress data before transmission
- Handle time synchronization
- Implement data retention policies
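
Buffering for offline scenarios can be as simple as a bounded queue that is drained once the link returns. The following is a minimal store-and-forward sketch. `OfflineBuffer` is a hypothetical class, and `send` is any callable that returns `True` when a reading was delivered (for example, a small wrapper around the `MQTTClient.publish` method shown earlier). The buffer lives in memory only, so readings do not survive a reboot.

```python
from collections import deque
from typing import Callable, Deque, Dict


class OfflineBuffer:
    """Bounded in-memory queue of readings awaiting delivery."""

    def __init__(self, max_items: int = 500) -> None:
        # A deque with maxlen silently drops the oldest item when full
        self._queue: Deque[Dict] = deque(maxlen=max_items)

    def add(self, reading: Dict) -> None:
        """Store a reading while the network is unavailable."""
        self._queue.append(reading)

    def flush(self, send: Callable[[Dict], bool]) -> int:
        """Send readings oldest-first; stop at the first failure.

        Returns the number of readings delivered. A reading is removed
        only after `send` reports success, so nothing is lost mid-flush.
        """
        sent = 0
        while self._queue:
            if not send(self._queue[0]):
                break
            self._queue.popleft()
            sent += 1
        return sent

    def __len__(self) -> int:
        return len(self._queue)
```

Dropping the oldest readings when full is one policy among several; some deployments would rather drop the newest or downsample.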

## Anti-Patterns

- ❌ No power management strategy
- ❌ Unencrypted communications
- ❌ No error handling for network failures
- ❌ Sending all raw data to cloud
- ❌ No device authentication
- ❌ Hard-coded credentials in firmware
- ❌ No OTA update mechanism

## Resources

- MQTT: https://mqtt.org/
- ESP32 Documentation: https://docs.espressif.com/
- Arduino: https://www.arduino.cc/
- AWS IoT: https://aws.amazon.com/iot/
- Azure IoT: https://azure.microsoft.com/en-us/overview/iot/

Overview

This skill provides expert-level guidance for designing, building, and operating IoT systems, covering embedded devices, edge computing, and IoT protocols. It focuses on practical architecture, secure communication, firmware best practices, and efficient edge processing. Use it to speed up device development and make deployments more reliable across constrained and connected environments.

How this skill works

The skill explains layered IoT architecture from sensors to cloud and treats security as a concern at every layer. It covers protocol selection and implementation (MQTT, CoAP, HTTP, LoRaWAN, Zigbee), embedded programming patterns (RTOS, power management, hardware interfaces), edge processing techniques, and device management workflows including OTA updates and heartbeat handling. Practical code patterns and anti-patterns are provided to shorten implementation time and reduce operational risk.

When to use it

  • Designing new sensor or actuator devices and choosing hardware/software stack
  • Implementing secure field-to-cloud communication and protocol selection
  • Creating edge processors to summarize, filter, or detect anomalies before cloud ingress
  • Building OTA firmware update, device heartbeat, and lifecycle management systems
  • Optimizing battery-powered devices for power consumption and reliability

Best practices

  • Treat security as a cross-layer concern: TLS, device auth, encrypted firmware, network segmentation
  • Design for intermittent connectivity: local buffering, retries, and idempotent messages
  • Optimize for constrained devices: use efficient formats, minimize memory and CPU footprint
  • Use watchdogs, deep sleep, and power budgeting for battery devices
  • Perform edge aggregation and anomaly detection to reduce cloud costs and latency
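
To illustrate the point about efficient formats, the sketch below packs a reading into a fixed 12-byte binary record with Python's standard `struct` module. The wire layout, field names, and scaling (hundredths of a degree and of a percent) are assumptions made for this example, not a format defined by the skill. Schema-based formats such as Protocol Buffers serve the same purpose with better support for evolving fields.

```python
import struct

# Hypothetical little-endian layout:
#   uint32 device id, uint32 Unix time (s),
#   int16 temperature in 0.01 °C, uint16 humidity in 0.01 %
READING_FORMAT = "<IIhH"


def encode_reading(device_id: int, timestamp: int,
                   temperature_c: float, humidity_pct: float) -> bytes:
    """Pack one reading into a 12-byte record."""
    return struct.pack(
        READING_FORMAT,
        device_id,
        timestamp,
        round(temperature_c * 100),
        round(humidity_pct * 100),
    )


def decode_reading(data: bytes) -> dict:
    """Unpack a record produced by encode_reading."""
    device_id, timestamp, temp_centi, hum_centi = struct.unpack(READING_FORMAT, data)
    return {
        "device_id": device_id,
        "timestamp": timestamp,
        "temperature": temp_centi / 100,
        "humidity": hum_centi / 100,
    }
```

The equivalent JSON object is several times larger, which matters on low-bandwidth links such as LoRaWAN.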

Example use cases

  • Temperature sensor fleet with MQTT telemetry, edge aggregation, and cloud dashboard
  • Battery-powered environmental sensors using deep sleep and LoRaWAN for long-range telemetry
  • ESP32-based home gateway bridging Zigbee sensor networks to cloud via MQTT with TLS
  • Edge processor that computes summary statistics and flags anomalies before sending to cloud
  • Device management system tracking last-seen, scheduling OTA updates, and reporting device states

FAQ

Which protocol should I choose for constrained sensors?

Use MQTT for lightweight, reliable pub/sub when a broker is available; use CoAP for constrained UDP scenarios and when REST-like semantics are needed.

How do I minimize power consumption on battery devices?

Use deep sleep modes, reduce radio on-time, batch transmissions, tune sensing intervals, and implement aggressive power management in hardware and firmware.
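
The effect of duty cycling can be estimated with simple arithmetic: average the active and sleep currents over one wake/sleep period, then divide the battery capacity by that average. The sketch below is a rough estimate only. The function names are hypothetical, and the calculation ignores battery self-discharge, regulator losses, and temperature effects.

```python
def average_current_ma(active_ma: float, active_s: float,
                       sleep_ma: float, period_s: float) -> float:
    """Time-weighted average current over one wake/sleep period."""
    if not 0 <= active_s <= period_s:
        raise ValueError("active_s must be between 0 and period_s")
    sleep_s = period_s - active_s
    return (active_ma * active_s + sleep_ma * sleep_s) / period_s


def battery_life_days(capacity_mah: float, avg_current_ma: float) -> float:
    """Estimated runtime in days for a given capacity and average draw."""
    return capacity_mah / avg_current_ma / 24
```

For example, with assumed figures of 80 mA for 2 seconds of activity every 10 minutes, 0.01 mA in deep sleep, and a 2000 mAh battery, `average_current_ma(80, 2, 0.01, 600)` gives roughly 0.28 mA, and `battery_life_days(2000, ...)` comes out at about 300 days. Staying awake continuously at 80 mA would drain the same battery in about a day.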