
media-expert skill

/stdlib/domains/media-expert

This skill provides expert guidance on media production, streaming, and asset management to optimize workflows and ensure high-quality delivery.

npx playbooks add skill personamanagmentlayer/pcl --skill media-expert

---
name: media-expert
version: 1.0.0
description: Expert-level media production, content management, streaming, broadcasting, and media technology systems
category: domains
tags: [media, video, streaming, broadcast, cms, production]
allowed-tools:
  - Read
  - Write
  - Edit
---

# Media Expert

Expert guidance for media production, content management systems, video streaming, broadcasting systems, and modern media technology solutions.

## Core Concepts

### Media Production
- Video production workflows
- Audio production and mixing
- Post-production and editing
- Visual effects (VFX)
- Color grading and correction
- Animation and motion graphics
- Live production

### Streaming and Broadcasting
- Video streaming platforms
- Content Delivery Networks (CDN)
- Adaptive bitrate streaming
- Live broadcasting
- OTT (Over-the-Top) platforms
- Digital rights management (DRM)
- Transcoding and encoding

### Technologies
- Media Asset Management (MAM)
- Digital Asset Management (DAM)
- Broadcast automation
- IP-based media production
- Cloud production workflows
- AI for content analysis
- Virtual production

### Standards and Protocols
- SMPTE standards
- HLS (HTTP Live Streaming)
- DASH (Dynamic Adaptive Streaming over HTTP)
- RTMP/RTSP protocols
- NDI (Network Device Interface)
- MXF (Material Exchange Format)
- Metadata standards (Dublin Core, IPTC)
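The HLS and DASH entries above both describe manifest-driven adaptive streaming. As a concrete illustration, the sketch below renders a minimal HLS master playlist from variant descriptions using the basic `#EXT-X-STREAM-INF` syntax; the variant fields mirror the ABR example later in this document, and the URLs are placeholders.

```python
def render_master_playlist(variants):
    """Render a minimal HLS master playlist from ABR variant descriptions."""
    lines = ["#EXTM3U"]
    for v in variants:
        # Each variant gets a stream-info tag followed by its media playlist URL
        lines.append(
            f"#EXT-X-STREAM-INF:BANDWIDTH={v['bandwidth']},"
            f"RESOLUTION={v['resolution']},CODECS=\"{v['codecs']}\""
        )
        lines.append(v['url'])
    return "\n".join(lines) + "\n"

variants = [
    {'bandwidth': 5000000, 'resolution': '1920x1080',
     'codecs': 'avc1.640028,mp4a.40.2', 'url': '1080p/playlist.m3u8'},
    {'bandwidth': 800000, 'resolution': '640x360',
     'codecs': 'avc1.64001e,mp4a.40.2', 'url': '360p/playlist.m3u8'},
]
print(render_master_playlist(variants))
```

Real players expect additional attributes (e.g. `FRAME-RATE`, `AVERAGE-BANDWIDTH`), but this captures the core structure a packager emits.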

## Content Management System

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Dict
from decimal import Decimal
from enum import Enum
import hashlib

class MediaType(Enum):
    VIDEO = "video"
    AUDIO = "audio"
    IMAGE = "image"
    DOCUMENT = "document"

class AssetStatus(Enum):
    DRAFT = "draft"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    PUBLISHED = "published"
    ARCHIVED = "archived"

@dataclass
class MediaAsset:
    """Media asset information"""
    asset_id: str
    title: str
    description: str
    media_type: MediaType
    file_path: str
    file_size_bytes: int
    duration_seconds: Optional[float]
    resolution: Optional[str]  # e.g., "1920x1080"
    codec: Optional[str]
    bitrate_kbps: Optional[int]
    frame_rate: Optional[float]
    created_at: datetime
    created_by: str
    status: AssetStatus
    tags: List[str]
    metadata: Dict[str, str]
    checksum: str

@dataclass
class ContentPackage:
    """Content package for distribution"""
    package_id: str
    title: str
    assets: List[str]  # Asset IDs
    created_at: datetime
    scheduled_publish: Optional[datetime]
    expiration_date: Optional[datetime]
    distribution_channels: List[str]

class MediaAssetManagementSystem:
    """Media asset management and workflow"""

    def __init__(self):
        self.assets = {}
        self.packages = {}
        self.workflows = []

    def ingest_asset(self, file_path: str, metadata: dict) -> MediaAsset:
        """Ingest media asset into system"""
        # Calculate checksum
        checksum = self._calculate_checksum(file_path)

        # Extract technical metadata
        tech_metadata = self._extract_metadata(file_path)

        asset = MediaAsset(
            asset_id=self._generate_asset_id(),
            title=metadata['title'],
            description=metadata.get('description', ''),
            media_type=MediaType(metadata['media_type']),
            file_path=file_path,
            file_size_bytes=tech_metadata['file_size'],
            duration_seconds=tech_metadata.get('duration'),
            resolution=tech_metadata.get('resolution'),
            codec=tech_metadata.get('codec'),
            bitrate_kbps=tech_metadata.get('bitrate'),
            frame_rate=tech_metadata.get('frame_rate'),
            created_at=datetime.now(),
            created_by=metadata['created_by'],
            status=AssetStatus.DRAFT,
            tags=metadata.get('tags', []),
            metadata=metadata.get('custom_metadata', {}),
            checksum=checksum
        )

        self.assets[asset.asset_id] = asset

        # Trigger automated workflows
        self._trigger_workflows(asset)

        return asset

    def _extract_metadata(self, file_path: str) -> dict:
        """Extract technical metadata from media file"""
        # Would use ffprobe or similar tool
        # Simulated metadata
        return {
            'file_size': 1073741824,  # 1 GB
            'duration': 3600.0,  # 1 hour
            'resolution': '1920x1080',
            'codec': 'h264',
            'bitrate': 5000,
            'frame_rate': 29.97
        }

    def _calculate_checksum(self, file_path: str) -> str:
        """Calculate file checksum for integrity"""
        # In production, would read actual file
        return hashlib.sha256(file_path.encode()).hexdigest()

    def transcode_asset(self, asset_id: str, output_profiles: List[dict]) -> dict:
        """Transcode asset to multiple formats"""
        asset = self.assets.get(asset_id)
        if not asset:
            return {'error': 'Asset not found'}

        transcode_jobs = []

        for profile in output_profiles:
            job = {
                'job_id': self._generate_job_id(),
                'asset_id': asset_id,
                'profile_name': profile['name'],
                'target_resolution': profile['resolution'],
                'target_bitrate': profile['bitrate'],
                'target_codec': profile['codec'],
                'status': 'queued',
                'progress_percent': 0,
                'estimated_completion': datetime.now() + timedelta(hours=1)
            }
            transcode_jobs.append(job)

        return {
            'asset_id': asset_id,
            'transcode_jobs': transcode_jobs,
            'total_jobs': len(transcode_jobs)
        }

    def search_assets(self, query: dict) -> List[MediaAsset]:
        """Search assets by metadata"""
        results = []

        for asset in self.assets.values():
            match = True

            # Text search
            if 'keywords' in query:
                keywords = query['keywords'].lower()
                if keywords not in asset.title.lower() and keywords not in asset.description.lower():
                    match = False

            # Media type filter
            if 'media_type' in query and asset.media_type.value != query['media_type']:
                match = False

            # Status filter
            if 'status' in query and asset.status.value != query['status']:
                match = False

            # Tag filter
            if 'tags' in query:
                required_tags = set(query['tags'])
                asset_tags = set(asset.tags)
                if not required_tags.issubset(asset_tags):
                    match = False

            # Date range
            if 'created_after' in query and asset.created_at < query['created_after']:
                match = False

            if match:
                results.append(asset)

        return results

    def create_content_package(self, package_data: dict) -> ContentPackage:
        """Create content package for distribution"""
        package = ContentPackage(
            package_id=self._generate_package_id(),
            title=package_data['title'],
            assets=package_data['asset_ids'],
            created_at=datetime.now(),
            scheduled_publish=package_data.get('scheduled_publish'),
            expiration_date=package_data.get('expiration_date'),
            distribution_channels=package_data['channels']
        )

        self.packages[package.package_id] = package

        return package

    def analyze_content(self, asset_id: str) -> dict:
        """AI-powered content analysis"""
        asset = self.assets.get(asset_id)
        if not asset:
            return {'error': 'Asset not found'}

        # Simulate AI analysis
        analysis = {
            'asset_id': asset_id,
            'detected_objects': ['person', 'car', 'building'],
            'detected_scenes': ['outdoor', 'daytime', 'urban'],
            'faces_detected': 3,
            'speech_to_text': 'Transcribed content would appear here...',
            'sentiment': 'positive',
            'content_categories': ['news', 'documentary'],
            'suggested_tags': ['urban', 'interview', 'documentary'],
            'quality_score': 85.0
        }

        return analysis

    def _trigger_workflows(self, asset: MediaAsset):
        """Trigger automated workflows for asset"""
        # Trigger proxy generation, thumbnails, etc.
        pass

    def _generate_asset_id(self) -> str:
        import uuid
        return f"ASSET-{uuid.uuid4().hex[:12].upper()}"

    def _generate_package_id(self) -> str:
        import uuid
        return f"PKG-{uuid.uuid4().hex[:8].upper()}"

    def _generate_job_id(self) -> str:
        import uuid
        return f"JOB-{uuid.uuid4().hex[:8].upper()}"
```
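The `_calculate_checksum` method above hashes the path string as a stand-in. A production ingest would hash the file contents; the sketch below streams the file in fixed-size chunks so multi-gigabyte media files never load fully into memory (the sample file and chunk size are illustrative).

```python
import hashlib
import os
import tempfile

def calculate_checksum(file_path, chunk_size=8 * 1024 * 1024):
    """SHA-256 of a file's contents, read in 8 MB chunks."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # iter() with a sentinel keeps reading until f.read() returns b''
        for chunk in iter(lambda: f.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()

# Demo against a small temporary file
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"sample media bytes")
    path = tmp.name
print(calculate_checksum(path))
os.remove(path)
```

Storing this checksum at ingest lets the system verify integrity again after transfers, transcodes, and archival restores.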

## Video Streaming Platform

```python
from datetime import datetime

class VideoStreamingPlatform:
    """Video streaming and delivery system"""

    def __init__(self):
        self.streams = {}
        self.viewers = {}
        self.cdn_nodes = {}

    def start_live_stream(self, stream_data: dict) -> dict:
        """Start live video stream"""
        stream_id = self._generate_stream_id()

        stream = {
            'stream_id': stream_id,
            'title': stream_data['title'],
            'description': stream_data.get('description', ''),
            'streamer_id': stream_data['streamer_id'],
            'status': 'live',
            'started_at': datetime.now(),
            'viewer_count': 0,
            'peak_viewers': 0,
            'ingest_url': f'rtmp://ingest.example.com/live/{stream_id}',
            'playback_urls': {
                'hls': f'https://cdn.example.com/live/{stream_id}/playlist.m3u8',
                'dash': f'https://cdn.example.com/live/{stream_id}/manifest.mpd'
            },
            'quality_profiles': ['1080p', '720p', '480p', '360p']
        }

        self.streams[stream_id] = stream

        return stream

    def generate_adaptive_bitrate_manifest(self, asset_id: str) -> dict:
        """Generate ABR manifest for adaptive streaming"""
        # Generate HLS manifest
        hls_variants = [
            {
                'bandwidth': 5000000,  # 5 Mbps
                'resolution': '1920x1080',
                'codecs': 'avc1.640028,mp4a.40.2',
                'url': '1080p/playlist.m3u8'
            },
            {
                'bandwidth': 2800000,  # 2.8 Mbps
                'resolution': '1280x720',
                'codecs': 'avc1.64001f,mp4a.40.2',
                'url': '720p/playlist.m3u8'
            },
            {
                'bandwidth': 1400000,  # 1.4 Mbps
                'resolution': '854x480',
                'codecs': 'avc1.64001e,mp4a.40.2',
                'url': '480p/playlist.m3u8'
            },
            {
                'bandwidth': 800000,  # 800 Kbps
                'resolution': '640x360',
                'codecs': 'avc1.64001e,mp4a.40.2',
                'url': '360p/playlist.m3u8'
            }
        ]

        return {
            'asset_id': asset_id,
            'protocol': 'hls',
            'master_playlist_url': f'https://cdn.example.com/vod/{asset_id}/master.m3u8',
            'variants': hls_variants
        }

    def track_viewer_metrics(self, stream_id: str, viewer_id: str) -> dict:
        """Track viewer engagement metrics"""
        metrics = {
            'stream_id': stream_id,
            'viewer_id': viewer_id,
            'watch_time_seconds': 3600,
            'buffer_events': 2,
            'average_bitrate': 3500000,
            'quality_switches': 5,
            'playback_start_time_ms': 1200,
            'errors': 0,
            'device_type': 'desktop',
            'browser': 'chrome'
        }

        # Calculate Quality of Experience (QoE)
        qoe_score = self._calculate_qoe(metrics)
        metrics['qoe_score'] = qoe_score

        return metrics

    def _calculate_qoe(self, metrics: dict) -> float:
        """Calculate Quality of Experience score"""
        score = 100.0

        # Penalize buffering
        score -= metrics['buffer_events'] * 5

        # Penalize startup time
        if metrics['playback_start_time_ms'] > 2000:
            score -= 10

        # Penalize errors
        score -= metrics['errors'] * 15

        return max(0.0, score)

    def implement_drm(self, asset_id: str, drm_config: dict) -> dict:
        """Implement Digital Rights Management"""
        drm = {
            'asset_id': asset_id,
            'drm_systems': {
                'widevine': {
                    'license_url': 'https://license.example.com/widevine',
                    'supported_levels': ['L1', 'L3']
                },
                'fairplay': {
                    'certificate_url': 'https://license.example.com/fairplay/cert',
                    'license_url': 'https://license.example.com/fairplay/license'
                },
                'playready': {
                    'license_url': 'https://license.example.com/playready'
                }
            },
            'encryption': 'AES-128-CTR',  # cenc scheme; FairPlay requires cbcs (AES-CBC)
            'key_rotation_interval': 3600  # seconds
        }

        return drm

    def optimize_cdn_delivery(self, asset_id: str, viewer_location: tuple) -> dict:
        """Optimize CDN delivery based on viewer location"""
        # Find nearest CDN edge node
        nearest_node = self._find_nearest_cdn_node(viewer_location)

        return {
            'asset_id': asset_id,
            'cdn_node': nearest_node['node_id'],
            'cdn_location': nearest_node['location'],
            'distance_km': nearest_node['distance'],
            'estimated_latency_ms': nearest_node['latency'],
            'delivery_url': f"https://{nearest_node['node_id']}.cdn.example.com/{asset_id}"
        }

    def _find_nearest_cdn_node(self, viewer_location: tuple) -> dict:
        """Find nearest CDN edge node to viewer"""
        # Would calculate actual distances to CDN nodes
        return {
            'node_id': 'edge-us-east-1',
            'location': 'Virginia, USA',
            'distance': 250,  # km
            'latency': 15  # ms
        }

    def _generate_stream_id(self) -> str:
        import uuid
        return f"STREAM-{uuid.uuid4().hex[:8].upper()}"
```
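The `_find_nearest_cdn_node` stub above returns a fixed node. A geographic selection could rank edge nodes by great-circle distance to the viewer, as in this sketch (node IDs and coordinates are illustrative, not real CDN locations):

```python
import math

def haversine_km(a, b):
    """Great-circle distance between two (lat, lon) points in kilometres."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    dlat, dlon = lat2 - lat1, lon2 - lon1
    h = (math.sin(dlat / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
    return 2 * 6371 * math.asin(math.sqrt(h))  # 6371 km = mean Earth radius

def find_nearest_node(viewer, nodes):
    """Pick the edge node with the smallest great-circle distance to the viewer."""
    return min(nodes, key=lambda n: haversine_km(viewer, n['coords']))

nodes = [
    {'node_id': 'edge-us-east-1', 'coords': (38.9, -77.0)},  # Virginia (approx.)
    {'node_id': 'edge-eu-west-1', 'coords': (53.3, -6.3)},   # Dublin (approx.)
]
print(find_nearest_node((40.7, -74.0), nodes)['node_id'])  # viewer near New York
```

In practice CDNs weigh distance against real-time node load and measured latency, but distance is a reasonable first-pass heuristic.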

## Broadcast Automation

```python
from datetime import datetime, timedelta

class BroadcastAutomationSystem:
    """Broadcast scheduling and automation"""

    def __init__(self):
        self.schedule = []
        self.playlists = {}

    def create_broadcast_schedule(self, channel: str, date: datetime, programming: List[dict]) -> dict:
        """Create daily broadcast schedule"""
        schedule_items = []
        current_time = date.replace(hour=0, minute=0, second=0, microsecond=0)

        for program in programming:
            item = {
                'channel': channel,
                'start_time': current_time,
                'end_time': current_time + timedelta(seconds=program['duration']),
                'program_title': program['title'],
                'asset_id': program['asset_id'],
                'type': program['type'],  # 'program', 'commercial', 'filler'
                'metadata': program.get('metadata', {})
            }

            schedule_items.append(item)
            current_time = item['end_time']

        self.schedule.extend(schedule_items)

        return {
            'channel': channel,
            'date': date.date().isoformat(),
            'total_items': len(schedule_items),
            'total_duration': int((schedule_items[-1]['end_time'] - schedule_items[0]['start_time']).total_seconds()),
            'schedule': schedule_items[:5]  # Return first 5 items
        }

    def generate_playlist(self, schedule_id: str) -> dict:
        """Generate playout playlist"""
        # Convert schedule to playout format
        playlist = {
            'playlist_id': self._generate_playlist_id(),
            'format': 'xml',  # or 'json'
            'items': []
        }

        return playlist

    def monitor_broadcast(self, channel: str) -> dict:
        """Monitor live broadcast status"""
        status = {
            'channel': channel,
            'on_air': True,
            'current_program': 'Evening News',
            'time_code': '00:15:32',
            'next_program': 'Sports Tonight',
            'next_program_in': 2728,  # seconds
            'signal_quality': {
                'video_ok': True,
                'audio_ok': True,
                'sync_ok': True
            },
            'alarms': []
        }

        return status

    def _generate_playlist_id(self) -> str:
        import uuid
        return f"PLAY-{uuid.uuid4().hex[:8].upper()}"
```
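The scheduling loop in `create_broadcast_schedule` lays programs back to back: each item starts exactly where the previous one ended. A stripped-down, self-contained version of that logic:

```python
from datetime import datetime, timedelta

def build_schedule(start, programming):
    """Lay programs back to back: each item starts where the previous one ended."""
    items, current = [], start
    for program in programming:
        end = current + timedelta(seconds=program['duration'])
        items.append({'title': program['title'], 'start': current, 'end': end})
        current = end  # next program begins at this program's end
    return items

day = datetime(2024, 1, 1)
schedule = build_schedule(day, [
    {'title': 'Morning News', 'duration': 1800},
    {'title': 'Weather', 'duration': 600},
])
print(schedule[1]['start'].strftime('%H:%M'))  # → 00:30
```

Real playout schedules additionally need gap and overlap detection, since under-running a slot forces filler and over-running clips the next program.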

## Best Practices

### Media Production
- Use standardized workflows
- Implement version control
- Maintain proper backups
- Use collaborative tools
- Implement quality control
- Document production processes
- Use industry-standard formats

### Content Management
- Implement robust metadata schema
- Use consistent naming conventions
- Enable full-text search
- Implement access controls
- Maintain audit trails
- Use automated workflows
- Implement archival policies
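Enforcing a metadata schema at ingest can be as simple as validating required fields before an asset enters the catalog. A sketch (the field names follow the `ingest_asset` example above; the required set is an assumption, not a standard):

```python
REQUIRED_FIELDS = {'title', 'media_type', 'created_by'}

def validate_ingest_metadata(metadata):
    """Reject ingests missing required fields; normalize tags for consistent search."""
    missing = REQUIRED_FIELDS - metadata.keys()
    if missing:
        raise ValueError(f"missing required metadata: {sorted(missing)}")
    # Lowercase, trimmed tags keep tag-based filtering predictable
    metadata['tags'] = [t.strip().lower() for t in metadata.get('tags', [])]
    return metadata

meta = validate_ingest_metadata({
    'title': 'Interview', 'media_type': 'video',
    'created_by': 'editor1', 'tags': [' Urban ', 'NEWS'],
})
print(meta['tags'])  # → ['urban', 'news']
```

Rejecting incomplete metadata at the door is far cheaper than cleaning an inconsistent catalog later.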

### Streaming Delivery
- Use adaptive bitrate streaming
- Implement CDN for global delivery
- Monitor QoE metrics
- Optimize for mobile devices
- Implement DRM when required
- Use low-latency protocols for live
- Monitor buffer ratios

### Broadcasting
- Implement redundant systems
- Automate scheduling
- Monitor signal quality
- Maintain emergency protocols
- Use backup playout systems
- Implement proper logging
- Conduct regular testing

## Anti-Patterns

❌ No backup systems
❌ Poor metadata management
❌ Single bitrate streaming
❌ No CDN implementation
❌ Ignoring QoE metrics
❌ Manual scheduling processes
❌ No DRM for premium content
❌ Poor asset organization
❌ No disaster recovery plan

## Resources

- SMPTE: https://www.smpte.org/
- Streaming Media: https://www.streamingmedia.com/
- NAB (National Association of Broadcasters): https://www.nab.org/
- EBU (European Broadcasting Union): https://www.ebu.ch/
- FFmpeg: https://ffmpeg.org/
- Video.js: https://videojs.com/
- OTT Standards: https://www.ott-standards.org/

Overview

This skill provides expert-level guidance for media production, content management, video streaming, broadcasting systems, and media technology architecture. It distills practical patterns for asset workflows, transcoding, streaming delivery, DRM, MAM/DAM integration, and cloud-native media operations. Use it to design, operate, or optimize professional media systems across production and distribution.

How this skill works

The skill inspects end-to-end media workflows: ingestion, metadata extraction, automated processing (thumbnails, proxies, transcodes), cataloging, packaging, and distribution. It outlines streaming delivery mechanics including ABR manifest generation, CDN optimization, live ingest and playback endpoints, and viewer QoE tracking. It also covers media standards, protocols, DRM implementation, and AI-driven content analysis for tagging and quality scoring.

When to use it

  • Designing or auditing a media asset management (MAM/DAM) system
  • Defining transcoding and adaptive-bitrate (ABR) profiles for VOD and live streams
  • Configuring CDN strategies and edge selection for global playback
  • Implementing DRM and secure license workflows for premium content
  • Automating production pipelines (proxy generation, thumbnails, QC, delivery)

Best practices

  • Capture and store standardized technical metadata (codec, bitrate, resolution, frame rate) for every asset
  • Use checksum and integrity checks on ingest to prevent file corruption and enable safe archival
  • Define clear asset lifecycle states (draft, in_review, approved, published, archived) and enforce workflow gates
  • Design ABR ladders from viewer telemetry and network profiles; avoid excessive bitrate variants
  • Automate QA: proxy transcodes, thumbnails, waveform/levels checks, and objective quality scoring before publish
  • Use tokenized CDN URLs and key rotation for DRM; monitor license failures and playback errors for rapid remediation

Example use cases

  • A broadcaster ingesting live recordings, auto-generating proxies, and scheduling VOD packages for OTT channels
  • A media company building a search-enabled MAM with AI-powered scene, face, and speech tagging
  • An OTT platform generating HLS/DASH manifests and per-region CDN routing to minimize latency
  • A post-production house automating color-grade deliverables, multiformat transcodes, and QC checks
  • A streamer tracking QoE metrics per session to tune ABR ladders and reduce buffering

FAQ

How do I choose ABR variant bitrates?

Base choices on target audience bandwidth distribution and device mix; use viewer telemetry to iterate and remove unused variants.
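One way to turn that telemetry into a ladder is to pick variant bitrates at percentiles of observed viewer bandwidth, with headroom so playback stays below measured throughput. A sketch (the percentiles, the 25% headroom factor, and the sample data are illustrative assumptions):

```python
def abr_ladder_from_telemetry(bandwidth_samples_kbps, percentiles=(90, 70, 50, 25)):
    """Pick variant bitrates at chosen percentiles of observed viewer bandwidth,
    leaving ~25% headroom so the selected rendition stays below throughput."""
    samples = sorted(bandwidth_samples_kbps)
    ladder = []
    for p in percentiles:
        # Nearest-rank percentile index, clamped to the last sample
        idx = min(len(samples) - 1, int(len(samples) * p / 100))
        ladder.append(int(samples[idx] * 0.75))
    return ladder

samples = [800, 1500, 2500, 3500, 5000, 6000, 8000, 12000]  # kbps, per session
print(abr_ladder_from_telemetry(samples))  # → [9000, 4500, 3750, 1875]
```

Re-run this periodically against fresh telemetry and drop rungs that sessions rarely select; each unused variant still costs transcode and storage.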

When should I implement DRM vs. tokenized ACLs?

Use DRM for paid or licensed premium content requiring device-level enforcement; tokenized ACLs suffice for low-risk or free content where simple access control is enough.