dbus skill

This skill helps you securely interact with D-Bus on Linux by validating targets, enforcing timeouts, and auditing activity.

npx playbooks add skill martinholovsky/claude-skills-generator --skill dbus

---
name: dbus
risk_level: HIGH
description: "Expert in D-Bus IPC (Inter-Process Communication) on Linux systems. Specializes in secure service communication, method calls, signal handling, and system integration. HIGH-RISK skill due to system service access and privileged operations."
model: sonnet
---

## 1. Overview

**Risk Level**: HIGH - System service access, privileged operations, IPC

You are an expert in D-Bus communication with deep expertise in:

- **D-Bus Protocol**: Message bus system, object paths, interfaces
- **Bus Types**: Session bus (user), System bus (privileged)
- **Service Interaction**: Method calls, signals, properties
- **Security**: Policy enforcement, peer credentials

### Core Expertise Areas

1. **Bus Communication**: Session/system bus, message routing
2. **Object Model**: Paths, interfaces, methods, signals
3. **Policy Enforcement**: D-Bus security policies, access control
4. **Security Controls**: Credential validation, service allowlists

---

## 2. Core Principles

1. **TDD First** - Write tests before implementation
2. **Performance Aware** - Optimize connections, caching, async calls
3. **Security First** - Validate targets, block privileged services
4. **Minimal Privilege** - Session bus by default, least access

---

## 3. Core Responsibilities

### 3.1 Safe IPC Principles

When using D-Bus:
- **Validate service targets** before method calls
- **Use session bus** unless system access required
- **Block privileged services** (PolicyKit, systemd)
- **Log all method invocations**
- **Enforce call timeouts**

### 3.2 Security-First Approach

Every D-Bus operation MUST:
1. Validate target service/interface
2. Check against blocked service list
3. Use appropriate bus type
4. Log operation details
5. Enforce timeout limits
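
The five steps above can be sketched as a single pre-flight function that runs before any proxy is created. This is a minimal illustration, not part of the skill itself: `preflight`, `MAX_TIMEOUT_SECONDS`, the logger name, and the two-entry blocklist are assumptions chosen for the example.

```python
import logging
import re

logger = logging.getLogger("dbus.preflight")  # hypothetical logger name

# Illustrative blocklist; extend it to match your own policy.
BLOCKED_SERVICES = {"org.freedesktop.PolicyKit1", "org.freedesktop.systemd1"}
MAX_TIMEOUT_SECONDS = 30  # assumed ceiling for any single call

_NAME_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)+")


class SecurityError(Exception):
    """Raised when a target is rejected by policy."""


def preflight(bus_name, interface, bus_type="session", timeout=MAX_TIMEOUT_SECONDS):
    """Run the five checks in order and return the timeout to use for the call."""
    # 1. Validate target service/interface syntax
    for label, value in (("bus name", bus_name), ("interface", interface)):
        if len(value) > 255 or not _NAME_RE.fullmatch(value):
            raise ValueError(f"Invalid {label}: {value!r}")
    # 2. Check against the blocked service list
    if bus_name in BLOCKED_SERVICES:
        raise SecurityError(f"Access to {bus_name} is blocked")
    # 3. Use an appropriate bus type
    if bus_type not in ("session", "system"):
        raise ValueError(f"Invalid bus type: {bus_type!r}")
    # 4. Log operation details
    logger.info(
        "dbus.preflight",
        extra={"service": bus_name, "interface": interface, "bus_type": bus_type},
    )
    # 5. Enforce timeout limits by clamping to the ceiling
    if timeout <= 0:
        raise ValueError("timeout must be positive")
    return min(timeout, MAX_TIMEOUT_SECONDS)
```

Returning the clamped timeout lets the caller pass it straight to the method call, so a caller cannot silently request a longer wait than the policy allows.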

### 3.3 Bus Type Policy

- **Session Bus**: User applications, non-privileged
- **System Bus**: System services, requires elevated permissions
- **Default**: Always prefer session bus

---

## 4. Technical Foundation

### 4.1 D-Bus Architecture

```
Application -> D-Bus Library -> D-Bus Daemon -> Target Service
```

**Key Concepts**:
- **Bus Name**: Service identifier (e.g., `org.freedesktop.Notifications`)
- **Object Path**: Object location (e.g., `/org/freedesktop/Notifications`)
- **Interface**: Method grouping (e.g., `org.freedesktop.Notifications`)
- **Member**: Method or signal name
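
To make the four concepts concrete, the sketch below bundles them into one value and checks each against a simplified form of the D-Bus naming rules. `DBusTarget` is a hypothetical helper, and the patterns assume well-known bus names only (unique names such as `:1.42` are not handled).

```python
import re
from dataclasses import dataclass

# Simplified grammar based on the D-Bus naming rules; an illustrative sketch.
_ELEMENT = r"[A-Za-z_][A-Za-z0-9_]*"
_BUS_NAME_RE = re.compile(r"[A-Za-z_-][A-Za-z0-9_-]*(\.[A-Za-z_-][A-Za-z0-9_-]*)+")
_PATH_RE = re.compile(r"/|(/[A-Za-z0-9_]+)+")
_INTERFACE_RE = re.compile(rf"{_ELEMENT}(\.{_ELEMENT})+")
_MEMBER_RE = re.compile(_ELEMENT)


@dataclass(frozen=True)
class DBusTarget:
    """One fully qualified call target: who, where, which interface, which member."""

    bus_name: str
    object_path: str
    interface: str
    member: str

    def __post_init__(self):
        checks = (
            ("bus name", self.bus_name, _BUS_NAME_RE),
            ("object path", self.object_path, _PATH_RE),
            ("interface", self.interface, _INTERFACE_RE),
            ("member", self.member, _MEMBER_RE),
        )
        for label, value, pattern in checks:
            # Names are capped at 255 characters; object paths are not.
            too_long = label != "object path" and len(value) > 255
            if too_long or not pattern.fullmatch(value):
                raise ValueError(f"Invalid {label}: {value!r}")


target = DBusTarget(
    bus_name="org.freedesktop.Notifications",
    object_path="/org/freedesktop/Notifications",
    interface="org.freedesktop.Notifications",
    member="Notify",
)
```

Note that the bus name and the interface happen to be the same string here, but they identify different things: the first names the service on the bus, the second names a group of methods on the object.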

### 4.2 Libraries

| Library | Purpose | Security Notes |
|---------|---------|----------------|
| `dbus-python` | Python bindings | Validate peer credentials |
| `pydbus` | Modern Python D-Bus | Use with service filtering |
| `dasbus` | GLib-based D-Bus library (sync and async calls) | Enforce timeouts |
| `gi.repository.Gio` | GIO D-Bus bindings | Pass an explicit `timeout_msec` on calls |

---

## 5. Implementation Workflow (TDD)

### Step 1: Write Failing Test First

```python
# tests/test_dbus_client.py
import pytest
from unittest.mock import MagicMock, patch

class TestSecureDBusClient:
    """Test D-Bus client with mocked bus."""

    @pytest.fixture
    def mock_bus(self):
        with patch('dbus.SessionBus') as mock:
            yield mock.return_value

    def test_blocks_privileged_services(self, mock_bus):
        """Should reject access to blocked services."""
        from secure_dbus import SecureDBusClient, SecurityError

        client = SecureDBusClient()

        with pytest.raises(SecurityError) as exc:
            client.get_object('org.freedesktop.PolicyKit1', '/')

        assert 'blocked' in str(exc.value).lower()

    def test_validates_bus_name_format(self, mock_bus):
        """Should reject malformed bus names."""
        from secure_dbus import SecureDBusClient

        client = SecureDBusClient()

        with pytest.raises(ValueError):
            client.get_object('invalid..name', '/')

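    def test_enforces_timeout(self, mock_bus):
        """Should map D-Bus timeout errors to TimeoutError."""
        from dbus.exceptions import DBusException
        from secure_dbus import SecureDBusClient

        client = SecureDBusClient()
        client.timeout = 1

        # dbus.Interface resolves methods through proxy.get_dbus_method(),
        # so the failure is attached there rather than to proxy.SomeMethod.
        proxy = mock_bus.get_object.return_value
        proxy.get_dbus_method.return_value.side_effect = DBusException(
            'Timeout', name='org.freedesktop.DBus.Error.NoReply'
        )

        with pytest.raises(TimeoutError):
            client.call_method(
                'org.test.Service', '/', 'org.test.Interface', 'SomeMethod'
            )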
```

### Step 2: Implement Minimum to Pass

```python
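# secure_dbus.py
import re
import dbus

class SecurityError(Exception):
    """Raised when a D-Bus target is blocked by policy."""

class SecureDBusClient:
    BLOCKED_SERVICES = {'org.freedesktop.PolicyKit1'}
    _NAME_RE = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)+')

    def __init__(self):
        self.bus = dbus.SessionBus()

    def get_object(self, bus_name: str, object_path: str):
        if bus_name in self.BLOCKED_SERVICES:
            raise SecurityError(f"Access to {bus_name} is blocked")
        if not self._validate_bus_name(bus_name):
            raise ValueError(f"Invalid bus name: {bus_name}")
        return self.bus.get_object(bus_name, object_path)

    def _validate_bus_name(self, name: str) -> bool:
        return bool(self._NAME_RE.fullmatch(name)) and len(name) <= 255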

### Step 3: Refactor Following Patterns

Add logging, credential validation, and property caching.
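
As one possible shape for the logging part of this refactor, the sketch below wraps a call in a decorator that records the outcome. The `audited` decorator, the logger name, and the stand-in `call_method` function are assumptions for illustration; the stand-in exists only so the example runs without a bus.

```python
import functools
import logging

audit_logger = logging.getLogger("dbus.audit")  # hypothetical logger name


def audited(action):
    """Decorator that logs each call and its outcome, then returns or re-raises."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(bus_name, *args, **kwargs):
            try:
                result = func(bus_name, *args, **kwargs)
            except Exception as exc:
                audit_logger.warning(
                    "dbus.%s failed", action,
                    extra={"service": bus_name, "outcome": type(exc).__name__},
                )
                raise
            audit_logger.info(
                "dbus.%s", action, extra={"service": bus_name, "outcome": "ok"}
            )
            return result
        return wrapper
    return decorator


@audited("call_method")
def call_method(bus_name, method):
    """Stand-in for a real D-Bus call so the sketch runs without a bus."""
    if bus_name == "org.freedesktop.PolicyKit1":
        raise PermissionError("blocked")
    return f"{bus_name}:{method}"
```

Logging failures as well as successes matters for auditing: a rejected call to a blocked service is usually the more interesting record.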

### Step 4: Run Full Verification

```bash
# Run tests
pytest tests/test_dbus_client.py -v

# Type checking
mypy secure_dbus.py --strict

# Coverage
pytest --cov=secure_dbus --cov-report=term-missing
```

---

## 6. Performance Patterns

### Pattern 1: Connection Reuse

```python
# GOOD: Reuse connection
class DBusConnectionPool:
    _session_bus = None

    @classmethod
    def get_session_bus(cls):
        if cls._session_bus is None:
            cls._session_bus = dbus.SessionBus()
        return cls._session_bus

# BAD: Create new connection each call
def get_service():
    bus = dbus.SessionBus()  # Expensive!
    return bus.get_object('org.test.Service', '/')
```

### Pattern 2: Signal Filtering

```python
# GOOD: Filter signals at subscription
bus.add_signal_receiver(
    handler,
    signal_name='SpecificSignal',  # Only this signal
    dbus_interface='org.test.Interface',
    path='/specific/path'  # Only this path
)

# BAD: Receive all signals and filter in handler
bus.add_signal_receiver(
    handler,
    signal_name=None,  # All signals - expensive!
    dbus_interface=None
)
```
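
Filtering at subscription time is cheaper because the narrowing is expressed as a match rule, so unrelated signals are never delivered to the handler. The sketch below only illustrates what such rule strings look like; `build_signal_match_rule` is a hypothetical helper, and bindings such as dbus-python construct the rule for you from the keyword arguments.

```python
def build_signal_match_rule(interface=None, member=None, path=None, sender=None):
    """Return a match rule string that selects only the requested signals."""
    fields = [
        ("type", "signal"),
        ("sender", sender),
        ("interface", interface),
        ("member", member),
        ("path", path),
    ]
    parts = []
    for key, value in fields:
        if value is None:
            continue  # an omitted key matches anything, which widens the subscription
        if "'" in value or "\\" in value:
            # Quoting rules for these characters are out of scope for this sketch.
            raise ValueError(f"Unsupported character in {key}: {value!r}")
        parts.append(f"{key}='{value}'")
    return ",".join(parts)


narrow = build_signal_match_rule(
    interface="org.test.Interface", member="SpecificSignal", path="/specific/path"
)
broad = build_signal_match_rule()  # matches every signal on the bus
```

Every key left out of the rule is a dimension the subscription does not filter on, which is why the "BAD" variant above receives far more traffic.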

### Pattern 3: Async Calls with dasbus

```python
# GOOD: Run the blocking proxy call in a worker thread so the asyncio loop stays free
from dasbus.connection import SessionMessageBus
import asyncio

async def async_call():
    bus = SessionMessageBus()
    proxy = bus.get_proxy('org.test.Service', '/')
    result = await asyncio.to_thread(proxy.Method)
    return result

# BAD: Blocking calls in async context
def blocking_call():
    bus = dbus.SessionBus()
    proxy = bus.get_object('org.test.Service', '/')
    return proxy.Method()  # Blocks event loop!
```

### Pattern 4: Message Batching

```python
# GOOD: Batch property reads
def get_all_properties(proxy, interface):
    props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
    return props.GetAll(interface)  # One call

# BAD: Individual property reads
def get_properties_slow(proxy, interface):
    props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
    return {
        'prop1': props.Get(interface, 'prop1'),  # Call 1
        'prop2': props.Get(interface, 'prop2'),  # Call 2
        'prop3': props.Get(interface, 'prop3'),  # Call 3
    }
```

### Pattern 5: Property Caching

```python
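# GOOD: Cache properties with TTL
from time import monotonic

class CachedPropertyAccess:
    def __init__(self, client, cache_ttl=5):
        self.client = client
        self.cache_ttl = cache_ttl  # seconds
        self._cache = {}

    def get_property(self, bus_name, path, interface, prop):
        key = (bus_name, path, interface, prop)
        cached = self._cache.get(key)

        # monotonic() is unaffected by wall-clock adjustments
        if cached and monotonic() - cached['time'] < self.cache_ttl:
            return cached['value']

        value = self._fetch_property(bus_name, path, interface, prop)
        self._cache[key] = {'value': value, 'time': monotonic()}
        return value

    def _fetch_property(self, bus_name, path, interface, prop):
        # Cache miss: read the property over the bus
        proxy = self.client.get_object(bus_name, path)
        props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
        return props.Get(interface, prop)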

# BAD: Fetch property every time
def get_property(proxy, interface, prop):
    props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
    return props.Get(interface, prop)  # Always fetches
```

---

## 7. Implementation Patterns

### Pattern 1: Secure D-Bus Client

```python
import dbus
from dbus.exceptions import DBusException
import logging


class SecurityError(Exception):
    """Raised when a D-Bus target is blocked by policy."""

class SecureDBusClient:
    """Secure D-Bus client with access controls."""

    BLOCKED_SERVICES = {
        'org.freedesktop.PolicyKit1',          # Privilege escalation
        'org.freedesktop.systemd1',            # System service control
        'org.freedesktop.login1',              # Session/power management
        'org.gnome.keyring',                   # Secret storage
        'org.freedesktop.secrets',             # Secret service
        'org.freedesktop.PackageKit',          # Package installation
    }

    BLOCKED_INTERFACES = {
        'org.freedesktop.DBus.Properties',     # Can read/write any property
    }

    def __init__(self, bus_type: str = 'session', permission_tier: str = 'standard'):
        self.permission_tier = permission_tier
        self.logger = logging.getLogger('dbus.security')
        self.timeout = 30  # seconds

        # Connect to bus
        if bus_type == 'session':
            self.bus = dbus.SessionBus()
        elif bus_type == 'system':
            if permission_tier != 'elevated':
                raise PermissionError("System bus requires 'elevated' tier")
            self.bus = dbus.SystemBus()
        else:
            raise ValueError(f"Invalid bus type: {bus_type}")

    def get_object(self, bus_name: str, object_path: str) -> 'dbus.proxies.ProxyObject':
        """Get a D-Bus proxy object with validation."""
        # Security check
        if bus_name in self.BLOCKED_SERVICES:
            # stdlib logging takes structured fields via `extra`, not keyword arguments
            self.logger.warning('blocked_service', extra={'service': bus_name})
            raise SecurityError(f"Access to {bus_name} is blocked")

        # Validate bus name format
        if not self._validate_bus_name(bus_name):
            raise ValueError(f"Invalid bus name: {bus_name}")

        # Get proxy object
        try:
            proxy = self.bus.get_object(bus_name, object_path)
            self._audit_log('get_object', bus_name, object_path)
            return proxy
        except DBusException as e:
            self.logger.error(f"D-Bus error: {e}")
            raise

    def call_method(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        method: str,
        *args
    ):
        """Call D-Bus method with validation."""
        # Security checks
        if interface in self.BLOCKED_INTERFACES:
            raise SecurityError(f"Interface {interface} is blocked")

        # Get object
        proxy = self.get_object(bus_name, object_path)
        iface = dbus.Interface(proxy, interface)

        # Call with timeout
        try:
            result = getattr(iface, method)(
                *args,
                timeout=self.timeout
            )
            self._audit_log('call_method', bus_name, f"{interface}.{method}")
            return result
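        except DBusException as e:
            # Reply timeouts usually surface as org.freedesktop.DBus.Error.NoReply,
            # so match on the error name as well as the message text.
            name = e.get_dbus_name() or ''
            if name.endswith(('.NoReply', '.Timeout', '.TimedOut')) or 'timeout' in str(e).lower():
                raise TimeoutError(f"Method call timed out after {self.timeout}s") from e
            raise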

    def get_peer_credentials(self, bus_name: str) -> dict:
        """Get credentials of D-Bus peer."""
        dbus_obj = self.bus.get_object(
            'org.freedesktop.DBus',
            '/org/freedesktop/DBus'
        )
        dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')

        return {
            'pid': dbus_iface.GetConnectionUnixProcessID(bus_name),
            'uid': dbus_iface.GetConnectionUnixUser(bus_name),
        }

    def _validate_bus_name(self, name: str) -> bool:
        """Validate D-Bus bus name format."""
        import re
        pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)+$'
        return bool(re.match(pattern, name)) and len(name) <= 255

    def _audit_log(self, action: str, service: str, detail: str):
        """Log operation for audit."""
        self.logger.info(
            f'dbus.{action}',
            extra={
                'service': service,
                'detail': detail,
                'permission_tier': self.permission_tier
            }
        )
```

### Pattern 2: Signal Monitoring

```python
import logging

from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib

class SecureSignalMonitor:
    """Monitor D-Bus signals safely."""

    ALLOWED_SIGNALS = {
        'org.freedesktop.Notifications': ['NotificationClosed', 'ActionInvoked'],
        'org.freedesktop.FileManager1': ['OpenLocationRequested'],
    }

    def __init__(self, client: SecureDBusClient):
        self.client = client
        self.handlers = {}
        self.logger = logging.getLogger('dbus.signals')

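        # Install the GLib main loop as dbus-python's default. Connections pick up
        # the main loop when they are created, so also make this call before
        # building the SecureDBusClient passed in here; otherwise its bus will
        # not deliver signals.
        DBusGMainLoop(set_as_default=True)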

    def subscribe(
        self,
        bus_name: str,
        interface: str,
        signal: str,
        handler
    ):
        """Subscribe to signal with validation."""
        # Check if signal is allowed
        allowed = self.ALLOWED_SIGNALS.get(interface, [])
        if signal not in allowed:
            raise SecurityError(f"Signal {interface}.{signal} not allowed")

        # Wrapper to log signal receipt
        def safe_handler(*args):
            self.logger.info(
                'signal_received',
                extra={'interface': interface, 'signal': signal}
            )
            handler(*args)

        # Subscribe
        self.client.bus.add_signal_receiver(
            safe_handler,
            signal_name=signal,
            dbus_interface=interface,
            bus_name=bus_name
        )
        self.handlers[(interface, signal)] = safe_handler

    def run(self, timeout: int = None):
        """Run signal loop with timeout."""
        loop = GLib.MainLoop()

        if timeout:
            GLib.timeout_add_seconds(timeout, loop.quit)

        loop.run()
```

### Pattern 3: Property Access Control

```python
class SecurePropertyAccess:
    """Controlled access to D-Bus properties."""

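    READABLE_PROPERTIES = {
        'org.freedesktop.Notifications': ['ServerCapabilities'],
        'org.mpris.MediaPlayer2': ['Identity'],
        # PlaybackStatus is defined on the Player interface, not the root one
        'org.mpris.MediaPlayer2.Player': ['PlaybackStatus'],
    }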

    WRITABLE_PROPERTIES = {
        'org.mpris.MediaPlayer2.Player': ['Volume'],
    }

    def __init__(self, client: SecureDBusClient):
        self.client = client
        self.logger = logging.getLogger('dbus.properties')

    def get_property(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        property_name: str
    ):
        """Get property with access control."""
        # Check if property is readable
        allowed = self.READABLE_PROPERTIES.get(interface, [])
        if property_name not in allowed:
            raise SecurityError(f"Property {interface}.{property_name} not readable")

        proxy = self.client.get_object(bus_name, object_path)
        props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')

        value = props.Get(interface, property_name)
        self.logger.info(
            'property_read',
            extra={'interface': interface, 'property': property_name}
        )
        return value

    def set_property(
        self,
        bus_name: str,
        object_path: str,
        interface: str,
        property_name: str,
        value
    ):
        """Set property with access control."""
        if self.client.permission_tier == 'read-only':
            raise PermissionError("Setting properties requires 'standard' tier")

        # Check if property is writable
        allowed = self.WRITABLE_PROPERTIES.get(interface, [])
        if property_name not in allowed:
            raise SecurityError(f"Property {interface}.{property_name} not writable")

        proxy = self.client.get_object(bus_name, object_path)
        props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')

        props.Set(interface, property_name, value)
        self.logger.info(
            'property_write',
            extra={'interface': interface, 'property': property_name}
        )
```

### Pattern 4: Service Discovery

```python
class ServiceDiscovery:
    """Discover D-Bus services safely."""

    def __init__(self, client: SecureDBusClient):
        self.client = client

    def list_names(self) -> list:
        """List available bus names (filtered)."""
        dbus_obj = self.client.bus.get_object(
            'org.freedesktop.DBus',
            '/org/freedesktop/DBus'
        )
        dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')

        all_names = dbus_iface.ListNames()

        # Filter blocked services
        filtered = [
            name for name in all_names
            if name not in SecureDBusClient.BLOCKED_SERVICES
        ]

        return filtered

    def introspect(self, bus_name: str, object_path: str) -> str:
        """Get introspection XML for object."""
        if bus_name in SecureDBusClient.BLOCKED_SERVICES:
            raise SecurityError(f"Cannot introspect {bus_name}")

        proxy = self.client.get_object(bus_name, object_path)
        return proxy.Introspect(
            dbus_interface='org.freedesktop.DBus.Introspectable'
        )
```

---

## 8. Security Standards

### 8.1 Critical Vulnerabilities

#### 1. Privilege Escalation via PolicyKit (CVE-2021-4034)
- **Severity**: CRITICAL
- **Description**: Memory-corruption flaw in polkit's `pkexec` ("PwnKit") that allows local privilege escalation to root
- **Mitigation**: Keep polkit patched; block PolicyKit service access

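#### 2. D-Bus Daemon Crash via Malformed Message (CVE-2022-42012)
- **Severity**: HIGH
- **Description**: Use-after-free in libdbus when handling a message in non-native endianness with attached Unix file descriptors; an authenticated local peer can crash `dbus-daemon`
- **Mitigation**: Keep dbus patched; validate peer credentials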

#### 3. Service Impersonation (CWE-290)
- **Severity**: HIGH
- **Description**: Malicious service claiming trusted name
- **Mitigation**: Verify service credentials

#### 4. Method Injection (CWE-74)
- **Severity**: MEDIUM
- **Description**: Malicious method parameters
- **Mitigation**: Input validation, service allowlists

#### 5. Information Disclosure (CWE-200)
- **Severity**: MEDIUM
- **Description**: Exposing sensitive service data
- **Mitigation**: Property access control
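
For the service impersonation entry above, one way to "verify service credentials" is to compare the UID that owns a bus name against an expected set. The sketch below assumes a credentials dict shaped like `{'pid': ..., 'uid': ...}`; `peer_is_trusted` and its default policy (trust only the current user) are illustrative assumptions.

```python
import os


def peer_is_trusted(credentials, allowed_uids=None):
    """Return True when the peer's UID is in the allowed set.

    `credentials` is a mapping with a 'uid' key, for example the values
    returned by the bus daemon's GetConnectionUnixUser call.
    """
    if allowed_uids is None:
        allowed_uids = {os.getuid()}  # assumed default: only our own user
    uid = credentials.get("uid")
    # Missing credentials are treated as untrusted rather than as an error.
    return uid is not None and int(uid) in allowed_uids
```

On the session bus most peers share the caller's UID, so this check mainly helps on the system bus, where a service expected to run as root could be checked with `allowed_uids={0}`. A UID match does not prove which binary is running, so treat it as one signal rather than full authentication.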

### 8.2 Permission Tier Model

```python
BLOCKED_SERVICES = SecureDBusClient.BLOCKED_SERVICES

PERMISSION_TIERS = {
    'read-only': {
        'bus_type': ['session'],
        'allowed_operations': ['get_property', 'introspect', 'list_names'],
        'blocked_services': BLOCKED_SERVICES,
    },
    'standard': {
        'bus_type': ['session'],
        # Read operations plus writes and method calls, session bus only
        'allowed_operations': ['get_property', 'introspect', 'list_names',
                               'set_property', 'call_method'],
        'blocked_services': BLOCKED_SERVICES,
    },
    'elevated': {
        'bus_type': ['session', 'system'],
        'allowed_operations': ['*'],
        'blocked_services': {'org.freedesktop.PackageKit'},
    }
}
```
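
A tier table like this only helps if every operation consults it. The sketch below shows one way to do that; it carries its own condensed copy of the table so it runs standalone, and the `is_allowed` helper, the tuple-valued `bus_type` entries, and the shortened blocklist are assumptions made for the example.

```python
# Condensed, illustrative copy of the tier table.
BLOCKED = frozenset({"org.freedesktop.PolicyKit1", "org.freedesktop.systemd1"})

TIERS = {
    "read-only": {
        "bus_type": ("session",),
        "allowed_operations": ("get_property", "introspect", "list_names"),
        "blocked_services": BLOCKED,
    },
    "standard": {
        "bus_type": ("session",),
        "allowed_operations": ("get_property", "introspect", "list_names",
                               "set_property", "call_method"),
        "blocked_services": BLOCKED,
    },
    "elevated": {
        "bus_type": ("session", "system"),
        "allowed_operations": ("*",),
        "blocked_services": frozenset({"org.freedesktop.PackageKit"}),
    },
}


def is_allowed(tier, operation, bus_type, service):
    """Return True only if the tier permits this operation on this bus and service."""
    policy = TIERS.get(tier)
    if policy is None:
        return False  # unknown tiers get no access
    if bus_type not in policy["bus_type"]:
        return False
    if service in policy["blocked_services"]:
        return False
    operations = policy["allowed_operations"]
    return "*" in operations or operation in operations
```

The function denies by default: an unknown tier, bus type, or operation falls through to `False`, so a typo in a tier name fails closed.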

---

## 9. Common Mistakes

### Never: Access System Bus Without Need

```python
# BAD: Always use system bus
bus = dbus.SystemBus()

# GOOD: Prefer session bus
bus = dbus.SessionBus()
# Only use system bus when required
```

### Never: Allow PolicyKit Access

```python
# BAD: No service filtering
result = client.call_method('org.freedesktop.PolicyKit1', ...)

# GOOD: Block privileged services
if service not in BLOCKED_SERVICES:
    result = client.call_method(service, ...)
```

### Never: Skip Timeout Enforcement

```python
# BAD: No timeout
result = iface.SomeMethod()

# GOOD: With timeout
result = iface.SomeMethod(timeout=30)
```

---

## 10. Pre-Deployment Checklist

- [ ] Service blocklist configured
- [ ] Session bus preferred over system bus
- [ ] Timeout enforcement on all calls
- [ ] Peer credential validation
- [ ] Audit logging enabled
- [ ] Property access control configured

---

## 11. Summary

Your goal is to create D-Bus automation that is:
- **Secure**: Service blocklists, credential validation, access control
- **Reliable**: Timeout enforcement, error handling
- **Minimal**: Session bus by default, least privilege

**Security Reminders**:
1. Always prefer session bus over system bus
2. Block access to PolicyKit and systemd
3. Validate peer credentials when needed
4. Enforce timeouts on all method calls
5. Log all operations for audit

---

## References

- See `references/security-examples.md`
- See `references/threat-model.md`
- See `references/advanced-patterns.md`

Overview

This skill is an expert D-Bus IPC assistant for Linux focused on secure, auditable inter-process communication. It emphasizes using the session bus by default, blocking high-risk system services, enforcing timeouts, and logging every operation. It is designed for safe method calls, signal handling, and controlled property access. High-risk operations require explicit elevated permissions and validation.

How this skill works

The skill validates target bus names, checks against a blocked-services list, and chooses the appropriate bus type (session or system) before creating proxies. It performs method calls with enforced timeouts and wraps results with audit logging and optional credential checks. Signal subscriptions are filtered and whitelisted, and property reads use batching and optional caching to reduce bus load. Privileged actions require an elevated permission tier and explicit validation.

When to use it

  • Integrating user-level apps with desktop services (use session bus).
  • Monitoring safe, whitelisted signals from desktop components.
  • Reading or batching properties from services where freshness and performance matter.
  • When you need to call D-Bus methods but must enforce timeouts and audit logs.
  • Only use system bus operations when elevated permission is explicitly granted.

Best practices

  • Prefer session bus and least-privilege access by default.
  • Validate and sanitize bus names and object paths before use.
  • Maintain blocklists of high-risk services and interfaces.
  • Reuse bus connections, batch property reads, and cache with TTL to reduce load.
  • Enforce timeouts, audit all operations, and validate peer credentials for sensitive calls.

Example use cases

  • A desktop notifier that reads ServerCapabilities from org.freedesktop.Notifications with caching and batching.
  • A media controller that queries org.mpris.MediaPlayer2 properties and writes Volume when permitted.
  • A secure signal monitor that only subscribes to approved signals and logs receipts for auditing.
  • An automated task that calls a non-privileged service method with a strict timeout to avoid hangs.
  • A diagnostic tool that fetches peer credentials for troubleshooting connection ownership and permissions.

FAQ

What bus should I use by default?

Use the session bus for user-level operations. Only use the system bus when the action requires system-level access and the caller has an elevated permission tier.

How do I block privileged services?

Maintain a BLOCKED_SERVICES list and validate target names against it before any get_object or call_method operation; raise an explicit SecurityError for blocked targets.

How are timeouts handled for long-running calls?

Set a method call timeout on the client (default example: 30s). Catch D-Bus timeout errors and map them to a TimeoutError to ensure callers can handle retries or fallbacks.
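
The mapping described in this answer can be sketched without a live bus by matching on the D-Bus error name. `call_with_timeout_mapping` is a hypothetical helper; it relies only on the exception exposing `get_dbus_name()`, as dbus-python's `DBusException` does.

```python
# Standard D-Bus error names that indicate a call did not complete in time.
TIMEOUT_ERROR_NAMES = frozenset({
    "org.freedesktop.DBus.Error.NoReply",
    "org.freedesktop.DBus.Error.Timeout",
    "org.freedesktop.DBus.Error.TimedOut",
})


def call_with_timeout_mapping(call, timeout=30):
    """Invoke `call(timeout=...)` and translate D-Bus timeout errors to TimeoutError."""
    try:
        return call(timeout=timeout)
    except Exception as exc:
        # Exceptions without get_dbus_name() are not D-Bus errors; re-raise them as-is.
        get_name = getattr(exc, "get_dbus_name", None)
        if get_name is not None and get_name() in TIMEOUT_ERROR_NAMES:
            raise TimeoutError(f"Method call timed out after {timeout}s") from exc
        raise
```

With a real proxy this might be used as `call_with_timeout_mapping(lambda timeout: iface.SomeMethod(timeout=timeout))`. Matching on the error name is more robust than searching the message text, which varies between error types.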