zero-trust-architecture skill

This skill helps you implement zero trust security across cloud-native apps with identity, device, and location checks and continuous verification.

npx playbooks add skill aj-geddes/useful-ai-prompts --skill zero-trust-architecture

SKILL.md
---
name: zero-trust-architecture
description: Implement Zero Trust security model with identity verification, microsegmentation, least privilege access, and continuous monitoring. Use when building secure cloud-native applications.
---

# Zero Trust Architecture

## Overview

Implement a comprehensive Zero Trust security architecture based on the "never trust, always verify" principle, with identity-centric security, microsegmentation, and continuous verification.

## When to Use

- Cloud-native applications
- Microservices architecture
- Remote workforce security
- API security
- Multi-cloud deployments
- Legacy modernization
- Compliance requirements

## Implementation Examples

### 1. **Zero Trust Gateway**

```javascript
// zero-trust-gateway.js
const jwt = require('jsonwebtoken');

class ZeroTrustGateway {
  constructor() {
    this.identityProvider = process.env.IDENTITY_PROVIDER_URL;
    this.deviceRegistry = new Map();
    this.sessionContext = new Map();
  }

  /**
   * Verify identity - Who are you?
   */
  async verifyIdentity(token) {
    try {
      // Verify JWT token
      const decoded = jwt.verify(token, process.env.JWT_PUBLIC_KEY, {
        algorithms: ['RS256']
      });

      // Check token hasn't been revoked
      const revoked = await this.checkTokenRevocation(decoded.jti);
      if (revoked) {
        throw new Error('Token has been revoked');
      }

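      return {
        valid: true,
        userId: decoded.sub,
        roles: decoded.roles,
        permissions: decoded.permissions,
        // Assumes the identity provider sets the `amr` claim (RFC 8176).
        // The middleware reads `mfaUsed` when it calculates the risk score.
        mfaUsed: Array.isArray(decoded.amr) && decoded.amr.includes('mfa')
      };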
    } catch (error) {
      return { valid: false, error: error.message };
    }
  }

  /**
   * Verify device - What device are you using?
   */
  async verifyDevice(deviceId, deviceFingerprint) {
    const registered = this.deviceRegistry.get(deviceId);

    if (!registered) {
      return {
        trusted: false,
        reason: 'Device not registered'
      };
    }

    // Check device fingerprint matches
    if (registered.fingerprint !== deviceFingerprint) {
      return {
        trusted: false,
        reason: 'Device fingerprint mismatch'
      };
    }

    // Check device compliance
    const compliance = await this.checkDeviceCompliance(deviceId);

    return {
      trusted: compliance.compliant,
      reason: compliance.reason,
      riskScore: compliance.riskScore
    };
  }

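  /**
   * Verify location - Where are you?
   * `userId` keys the location history: impossible travel is a property of a
   * user, not of an IP address. Falls back to the IP when no user is given.
   */
  async verifyLocation(ip, expectedCountry, userId) {
    try {
      // Get geolocation data
      const geoData = await this.getGeoLocation(ip);

      // Check for impossible travel
      const lastLocation = this.getLastKnownLocation(userId ?? ip);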
      if (lastLocation) {
        const impossibleTravel = this.detectImpossibleTravel(
          lastLocation,
          geoData,
          Date.now() - lastLocation.timestamp
        );

        if (impossibleTravel) {
          return {
            valid: false,
            reason: 'Impossible travel detected',
            riskScore: 9
          };
        }
      }

      // Check against allowed locations
      if (expectedCountry && geoData.country !== expectedCountry) {
        return {
          valid: false,
          reason: 'Unexpected location',
          riskScore: 7
        };
      }

      return {
        valid: true,
        location: geoData,
        riskScore: 1
      };
    } catch (error) {
      return {
        valid: false,
        reason: 'Location verification failed',
        riskScore: 5
      };
    }
  }

  /**
   * Verify authorization - What can you access?
   */
  async verifyAuthorization(userId, resource, action, context) {
    // Get user permissions
    const user = await this.getUserPermissions(userId);

    // Check direct permissions
    if (this.hasPermission(user, resource, action)) {
      return { authorized: true, reason: 'Direct permission' };
    }

    // Check role-based permissions
    for (const role of user.roles) {
      if (this.hasRolePermission(role, resource, action)) {
        return { authorized: true, reason: `Role: ${role}` };
      }
    }

    // Check attribute-based policies
    const abacResult = await this.evaluateABAC(user, resource, action, context);
    if (abacResult.allowed) {
      return { authorized: true, reason: 'ABAC policy' };
    }

    return {
      authorized: false,
      reason: 'Insufficient permissions'
    };
  }

  /**
   * Calculate risk score
   */
  calculateRiskScore(factors) {
    let score = 0;

    // Identity factors
    if (!factors.mfaUsed) score += 3;
    if (factors.newDevice) score += 2;

    // Location factors
    if (factors.unusualLocation) score += 3;
    if (factors.vpnDetected) score += 1;

    // Behavior factors
    if (factors.unusualTime) score += 2;
    if (factors.rapidRequests) score += 2;

    // Device factors
    if (!factors.deviceCompliant) score += 4;
    if (factors.jailbroken) score += 5;

    return Math.min(score, 10);
  }

  /**
   * Continuous verification middleware
   */
  middleware() {
    return async (req, res, next) => {
      const startTime = Date.now();

      try {
        // Extract authentication token
        const token = req.headers.authorization?.replace('Bearer ', '');
        if (!token) {
          return res.status(401).json({
            error: 'unauthorized',
            message: 'No authentication token provided'
          });
        }

        // Step 1: Verify identity
        const identity = await this.verifyIdentity(token);
        if (!identity.valid) {
          return res.status(401).json({
            error: 'unauthorized',
            message: 'Invalid identity'
          });
        }

        // Step 2: Verify device
        const deviceId = req.headers['x-device-id'];
        const deviceFingerprint = req.headers['x-device-fingerprint'];

        // Requests without device headers are not rejected here, but they are
        // scored as an unverified device instead of being silently trusted.
        let device = { trusted: false, reason: 'No device information provided' };
        if (deviceId && deviceFingerprint) {
          device = await this.verifyDevice(deviceId, deviceFingerprint);
          if (!device.trusted) {
            return res.status(403).json({
              error: 'forbidden',
              message: device.reason
            });
          }
        }

        // Step 3: Verify location (history is keyed by user when supported)
        const location = await this.verifyLocation(req.ip, undefined, identity.userId);
        if (!location.valid) {
          // Require step-up authentication
          return res.status(403).json({
            error: 'forbidden',
            message: 'Additional authentication required',
            requiresStepUp: true
          });
        }

        // Step 4: Calculate risk score
        const riskScore = this.calculateRiskScore({
          mfaUsed: identity.mfaUsed,
          newDevice: !deviceId,
          unusualLocation: location.riskScore > 5,
          deviceCompliant: device.trusted
        });

        // Step 5: Verify authorization
        const authorization = await this.verifyAuthorization(
          identity.userId,
          req.path,
          req.method,
          {
            ip: req.ip,
            riskScore,
            time: new Date()
          }
        );

        if (!authorization.authorized) {
          return res.status(403).json({
            error: 'forbidden',
            message: authorization.reason
          });
        }

        // Add context to request
        req.zeroTrust = {
          userId: identity.userId,
          roles: identity.roles,
          riskScore,
          verificationTime: Date.now() - startTime
        };

        // Log access
        this.logAccess(req, identity, riskScore);

        next();
      } catch (error) {
        console.error('Zero Trust verification failed:', error);
        return res.status(500).json({
          error: 'internal_error',
          message: 'Security verification failed'
        });
      }
    };
  }

  async checkTokenRevocation(jti) {
    // Check against revocation list
    return false;
  }

  async checkDeviceCompliance(deviceId) {
    // Check device meets security requirements
    return {
      compliant: true,
      reason: 'Device meets requirements',
      riskScore: 1
    };
  }

  async getGeoLocation(ip) {
    // Get geolocation from IP
    return {
      country: 'US',
      city: 'San Francisco',
      lat: 37.7749,
      lon: -122.4194
    };
  }

  getLastKnownLocation(ip) {
    return null;
  }

  detectImpossibleTravel(lastLocation, currentLocation, timeDiff) {
    // Calculate if travel is physically possible
    return false;
  }

  async getUserPermissions(userId) {
    // Fetch user permissions
    return {
      roles: ['user'],
      permissions: []
    };
  }

  hasPermission(user, resource, action) {
    return false;
  }

  hasRolePermission(role, resource, action) {
    return false;
  }

  async evaluateABAC(user, resource, action, context) {
    return { allowed: false };
  }

  logAccess(req, identity, riskScore) {
    console.log({
      timestamp: new Date().toISOString(),
      userId: identity.userId,
      resource: req.path,
      method: req.method,
      riskScore,
      ip: req.ip
    });
  }
}

// Express setup
const express = require('express');
const app = express();

const ztGateway = new ZeroTrustGateway();

// Apply Zero Trust middleware
app.use(ztGateway.middleware());

// Protected endpoint
app.get('/api/sensitive-data', (req, res) => {
  res.json({
    message: 'Access granted',
    riskScore: req.zeroTrust.riskScore
  });
});

module.exports = ZeroTrustGateway;
```
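
The `detectImpossibleTravel` stub in the gateway always returns `false`. One way to fill it is sketched below in Python, to match the policy engine example later in this document. It computes the great-circle distance with the haversine formula and flags the request when the implied speed exceeds a threshold. The names `haversine_km` and `is_impossible_travel`, the 900 km/h speed limit, and the 50 km jitter allowance are illustrative assumptions, not values from the skill.

```python
import math

EARTH_RADIUS_KM = 6371.0
MAX_PLAUSIBLE_SPEED_KMH = 900.0  # assumption: roughly airliner cruising speed
GEOIP_JITTER_KM = 50.0           # assumption: ignore small GeoIP inaccuracies


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points in kilometres."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def is_impossible_travel(last, current, elapsed_ms,
                         max_speed_kmh=MAX_PLAUSIBLE_SPEED_KMH):
    """Return True when moving between the two locations in the elapsed
    time would require a speed above max_speed_kmh.

    `last` and `current` are dicts with `lat` and `lon` keys, the shape
    returned by the gateway's getGeoLocation stub.
    """
    distance = haversine_km(last["lat"], last["lon"],
                            current["lat"], current["lon"])
    if distance < GEOIP_JITTER_KM:
        return False
    hours = elapsed_ms / 3_600_000
    if hours <= 0:
        return True  # a large jump with no elapsed time is never plausible
    return distance / hours > max_speed_kmh
```

The elapsed time is taken in milliseconds because the gateway passes `Date.now() - lastLocation.timestamp`.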

### 2. **Service Mesh - Microsegmentation**

```yaml
# istio-zero-trust.yaml
# Istio configuration for Zero Trust microsegmentation

apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: production
spec:
  mtls:
    mode: STRICT  # Require mutual TLS

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: deny-all
  namespace: production
spec:
  {} # Deny all by default

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: allow-frontend-to-backend
  namespace: production
spec:
  selector:
    matchLabels:
      app: backend
  action: ALLOW
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/production/sa/frontend"]
    to:
    - operation:
        methods: ["GET", "POST"]
        paths: ["/api/*"]

---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: allow-backend-to-database
  namespace: production
spec:
  selector:
    matchLabels:
      app: database
  action: ALLOW
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/production/sa/backend"]
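    to:
    - operation:
        # Database traffic is plain TCP, so match on the port. HTTP-only
        # fields such as `methods` never match TCP traffic in an ALLOW rule.
        ports: ["5432"]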

---
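# JWT authentication
# Note: RequestAuthentication only rejects requests that carry an invalid token.
# Requests with no token still pass unless an AuthorizationPolicy also
# requires a request principal.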
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: production
spec:
  selector:
    matchLabels:
      app: backend
  jwtRules:
  - issuer: "https://auth.example.com"
    jwksUri: "https://auth.example.com/.well-known/jwks.json"
    audiences:
    - "api.example.com"

---
# Network policy - additional defense layer
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: backend-network-policy
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: backend
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: frontend
    ports:
    - protocol: TCP
      port: 8080
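  egress:
  - to:
    - podSelector:
        matchLabels:
          app: database
    ports:
    - protocol: TCP
      port: 5432
  # Restricting egress also blocks DNS lookups. This rule assumes cluster DNS
  # runs in kube-system; adjust it for your cluster. Pods with an Istio
  # sidecar also need egress to the Istio control plane.
  - to:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: kube-system
    ports:
    - protocol: UDP
      port: 53
    - protocol: TCP
      port: 53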
```
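
The deny-by-default behaviour of the policies above can be modelled in a few lines, which is handy for unit-testing an intended allow list before applying it to a cluster. This is a toy model under stated assumptions, not Istio's matcher: `AllowRule` and `is_allowed` are hypothetical names, and paths are matched with shell-style globs from Python's `fnmatch` module.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import FrozenSet, List


@dataclass(frozen=True)
class AllowRule:
    principal: str           # caller identity, e.g. a service account
    methods: FrozenSet[str]  # permitted HTTP methods
    path_glob: str           # shell-style glob, e.g. "/api/*"


def is_allowed(rules: List[AllowRule], principal: str,
               method: str, path: str) -> bool:
    """Deny by default: a request passes only if at least one rule
    matches the principal, the method, and the path."""
    return any(
        rule.principal == principal
        and method in rule.methods
        and fnmatchcase(path, rule.path_glob)
        for rule in rules
    )


# Mirrors the allow-frontend-to-backend policy above.
BACKEND_RULES = [
    AllowRule(
        principal="cluster.local/ns/production/sa/frontend",
        methods=frozenset({"GET", "POST"}),
        path_glob="/api/*",
    ),
]
```

An empty rule list rejects everything, which corresponds to the `deny-all` policy with an empty `spec`.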

### 3. **Python Zero Trust Policy Engine**

```python
# zero_trust_policy.py
from dataclasses import dataclass
from typing import List, Dict, Any
from datetime import datetime

@dataclass
class ZeroTrustContext:
    user_id: str
    device_id: str
    location: Dict[str, Any]
    risk_score: int
    timestamp: datetime

class ZeroTrustPolicy:
    def __init__(self):
        self.policies = self.load_policies()

    def load_policies(self) -> List[Dict]:
        """Load Zero Trust policies"""
        return [
            {
                'name': 'high_risk_block',
                'condition': lambda ctx: ctx.risk_score >= 8,
                'action': 'deny',
                'reason': 'High risk score'
            },
            {
                'name': 'require_mfa',
                'condition': lambda ctx: ctx.risk_score >= 5,
                'action': 'step_up_auth',
                'reason': 'Elevated risk requires MFA'
            },
            {
                'name': 'untrusted_device',
                'condition': lambda ctx: not self.is_device_trusted(ctx.device_id),
                'action': 'deny',
                'reason': 'Untrusted device'
            },
            {
                'name': 'unusual_location',
                'condition': lambda ctx: self.is_unusual_location(ctx),
                'action': 'step_up_auth',
                'reason': 'Unusual location detected'
            }
        ]

    def evaluate(self, context: ZeroTrustContext, resource: str, action: str) -> Dict:
        """Evaluate Zero Trust policies"""
        for policy in self.policies:
            if policy['condition'](context):
                return {
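                    # Only an explicit 'allow' grants access. 'step_up_auth'
                    # must block the request until the user re-authenticates.
                    'allowed': policy['action'] == 'allow',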
                    'action': policy['action'],
                    'reason': policy['reason'],
                    'policy': policy['name']
                }

        # Check resource-specific permissions
        if self.has_permission(context.user_id, resource, action):
            return {
                'allowed': True,
                'action': 'allow',
                'reason': 'User has required permissions'
            }

        return {
            'allowed': False,
            'action': 'deny',
            'reason': 'No matching policy allows access'
        }

    def is_device_trusted(self, device_id: str) -> bool:
        # Check device trust status
        return True

    def is_unusual_location(self, context: ZeroTrustContext) -> bool:
        # Check if location is unusual for user
        return False

    def has_permission(self, user_id: str, resource: str, action: str) -> bool:
        # Check user permissions
        return False

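# Flask integration
from datetime import timezone
from flask import Flask, request, jsonify, g
from functools import wraps

app = Flask(__name__)
zt_policy = ZeroTrustPolicy()

def zero_trust_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # g.user_id is expected to be set by an earlier authentication step
        # (for example a before_request hook). Reject the request if it is not.
        user_id = getattr(g, 'user_id', None)
        if user_id is None:
            return jsonify({'error': 'unauthorized'}), 401

        # Build Zero Trust context
        context = ZeroTrustContext(
            user_id=user_id,
            device_id=request.headers.get('X-Device-ID', 'unknown'),
            location={
                'ip': request.remote_addr,
                'country': 'US'  # Placeholder; look this up from GeoIP
            },
            risk_score=calculate_risk_score(request),
            timestamp=datetime.now(timezone.utc)
        )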

        # Evaluate policies
        result = zt_policy.evaluate(
            context,
            request.path,
            request.method
        )

        if not result['allowed']:
            return jsonify({
                'error': 'forbidden',
                'reason': result['reason'],
                'action_required': result['action']
            }), 403

        return f(*args, **kwargs)

    return decorated_function

def calculate_risk_score(request) -> int:
    """Calculate risk score based on request context"""
    score = 0

    # Check for suspicious patterns
    if not request.headers.get('X-Device-ID'):
        score += 2

    # Add more risk factors
    return min(score, 10)

@app.route('/api/sensitive', methods=['GET'])
@zero_trust_required
def get_sensitive_data():
    return jsonify({'data': 'sensitive information'})
```
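
The `is_unusual_location` stub above always returns `False`. A minimal way to give it a baseline is to remember the countries recently seen for each user and treat anything outside that set as unusual. The sketch below assumes an in-memory store; the class name `LocationHistory` and the `max_entries` window are hypothetical, and a real deployment would persist the history.

```python
from collections import defaultdict, deque
from typing import Deque, Dict


class LocationHistory:
    """Keeps the most recent countries seen per user (in memory only)."""

    def __init__(self, max_entries: int = 20):
        self._seen: Dict[str, Deque[str]] = defaultdict(
            lambda: deque(maxlen=max_entries)
        )

    def record(self, user_id: str, country: str) -> None:
        """Store a country after a successful, verified login."""
        self._seen[user_id].append(country)

    def is_unusual(self, user_id: str, country: str) -> bool:
        """True when the country is absent from the user's recent history.

        A user with no history has no baseline, so the first sighting is
        treated as unusual and would trigger step-up authentication.
        """
        history = self._seen.get(user_id)
        if not history:
            return True
        return country not in history
```

Treating a user without history as unusual is a conservative design choice: it costs one extra authentication prompt on first login and avoids trusting a location that has never been verified.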

## Zero Trust Principles

1. **Verify Explicitly**: Authenticate and authorize every request
2. **Least Privilege**: Grant only the minimal access required
3. **Assume Breach**: Limit the blast radius of any compromise
4. **Microsegmentation**: Isolate workloads so each service reaches only what it needs
5. **Continuous Monitoring**: Evaluate security signals in real time
6. **Device Trust**: Verify device compliance
7. **Data Protection**: Encrypt data in transit and at rest

## Implementation Layers

- **Identity Layer**: Strong authentication (MFA, SSO)
- **Device Layer**: Device compliance, certificates
- **Network Layer**: Microsegmentation, mTLS
- **Application Layer**: API gateway, authorization
- **Data Layer**: Encryption, DLP

## Zero Trust Maturity Model

1. **Traditional**: Perimeter-based security
2. **Advanced**: Some segmentation
3. **Optimal**: Full Zero Trust
4. **Progressive**: Continuous improvement

## Best Practices

### ✅ DO
- Verify every request
- Implement MFA everywhere
- Use microsegmentation
- Monitor continuously
- Encrypt all communications
- Implement least privilege
- Log all access
- Audit regularly

### ❌ DON'T
- Trust network location
- Use implicit trust
- Skip device verification
- Allow lateral movement
- Use static credentials

## Resources

- [NIST Zero Trust Architecture](https://csrc.nist.gov/publications/detail/sp/800-207/final)
- [Google BeyondCorp](https://cloud.google.com/beyondcorp)
- [Zero Trust Implementation Guide](https://www.microsoft.com/en-us/security/business/zero-trust)

Overview

This skill implements a Zero Trust security architecture for cloud-native applications. It focuses on identity verification, device and location checks, microsegmentation, least-privilege authorization, and continuous risk-based monitoring. Use it to harden microservices, APIs, and remote access in multi-cloud or hybrid environments.

How this skill works

The skill inspects identity tokens, device fingerprints, IP geolocation, and runtime context to compute a risk score and enforce access decisions. It combines identity verification, device compliance checks, location and behavior signals, and policy evaluation (RBAC/ABAC) to allow, deny, or require step-up authentication. It also demonstrates microsegmentation patterns for service mesh and network policy enforcement to limit lateral movement.

When to use it

  • Protect cloud-native APIs and microservices
  • Enforce least-privilege in multi-tenant environments
  • Harden remote workforce and BYOD access
  • Apply risk-based step-up authentication
  • Implement microsegmentation with a service mesh
  • Meet regulatory and compliance controls

Best practices

  • Center controls on identity: use short-lived tokens, strong MFA, and token revocation
  • Apply microsegmentation: deny-by-default and allow explicit service-to-service paths
  • Use layered signals: combine device posture, location, behavior, and token claims for risk scoring
  • Make policies auditable and testable: version policies and simulate decisions before enforcement
  • Fail securely: default to deny on verification failures and log context for investigation
  • Automate telemetry: collect access logs and risk metrics for continuous improvement
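
The "fail securely" practice above can be sketched as a decorator that turns any exception raised by a verification check into a logged deny decision. The names `fail_closed` and `check_device`, and the shape of the decision dictionary, are assumptions chosen to resemble the policy engine's return value.

```python
import functools
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger("zero_trust")

Decision = Dict[str, Any]


def fail_closed(check: Callable[..., Decision]) -> Callable[..., Decision]:
    """Wrap a verification check so that any error results in a deny."""

    @functools.wraps(check)
    def wrapper(*args: Any, **kwargs: Any) -> Decision:
        try:
            return check(*args, **kwargs)
        except Exception:
            # Log the full traceback for investigation, then deny.
            logger.exception("verification check %s failed", check.__name__)
            return {
                "allowed": False,
                "action": "deny",
                "reason": "Verification failed",
            }

    return wrapper


@fail_closed
def check_device(device_id: str) -> Decision:
    # Hypothetical check: simulates an unreachable compliance backend.
    if device_id == "unreachable":
        raise ConnectionError("compliance service unavailable")
    return {"allowed": True, "action": "allow", "reason": "Device compliant"}
```

With this wrapper, an outage in a dependency blocks access instead of silently granting it, and the log entry keeps the context needed to investigate.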

Example use cases

  • Zero Trust middleware that verifies JWTs, device posture, location, and authorizes requests before routing to services
  • Istio/Service Mesh configuration enforcing mTLS, a deny-all default, and explicit service-to-service allow rules
  • Policy engine that evaluates risk scores and returns allow/deny/step-up actions for API gateways
  • Protecting sensitive endpoints with step-up MFA when risk thresholds are met
  • Segmenting backend, frontend, and database traffic with network policies and authorization policies

FAQ

How do I calculate a meaningful risk score?

Combine identity signals (MFA, token age), device posture (compliance, jailbreak), location anomalies, and behavior (rate, time). Weight each factor by its impact, cap the total, and map score ranges to clear actions (allow, step-up, deny).
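
As a rough illustration of that answer, the sketch below sums weighted signals, caps the total, and maps the score to an action. The weights mirror the gateway's `calculateRiskScore` and the thresholds mirror the policy engine (step-up at 5, deny at 8). The signal names and the functions `risk_score` and `decide` are hypothetical.

```python
from typing import Dict

# Weights taken from the gateway example; tune them to your own threat model.
WEIGHTS: Dict[str, int] = {
    "no_mfa": 3,
    "new_device": 2,
    "unusual_location": 3,
    "vpn_detected": 1,
    "unusual_time": 2,
    "rapid_requests": 2,
    "device_noncompliant": 4,
    "jailbroken": 5,
}
MAX_SCORE = 10
STEP_UP_THRESHOLD = 5
DENY_THRESHOLD = 8


def risk_score(signals: Dict[str, bool]) -> int:
    """Sum the weights of all active signals, capped at MAX_SCORE."""
    total = sum(
        weight for name, weight in WEIGHTS.items() if signals.get(name, False)
    )
    return min(total, MAX_SCORE)


def decide(score: int) -> str:
    """Map a score to one of: allow, step_up_auth, deny."""
    if score >= DENY_THRESHOLD:
        return "deny"
    if score >= STEP_UP_THRESHOLD:
        return "step_up_auth"
    return "allow"
```

Keeping the weights and thresholds in named constants makes them easy to version and review alongside the rest of the policy.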

Where should Zero Trust policies run?

Run enforcement as close to the protected resource as possible: at the API gateway or sidecar proxy for incoming requests, and in the service mesh for service-to-service traffic. Keep policy decisions in a centralized policy engine so they stay consistent and auditable.