home / skills / chunkytortoise / enterprisehub / condition-based-waiting

This skill helps you resolve race conditions in tests by waiting for actual conditions instead of fixed delays.

npx playbooks add skill chunkytortoise/enterprisehub --skill condition-based-waiting

Review the files below or copy the command above to add this skill to your agents.

Files (3)
SKILL.md
13.8 KB
---
name: Condition-Based Waiting
description: This skill should be used when dealing with "race conditions in tests", "Redis timing issues", "WebSocket connection timing", "async test failures", "flaky tests due to timing", "test synchronization problems", or when tests fail intermittently due to timing issues.
version: 1.0.0
---

# Condition-Based Waiting: Fixing Race Conditions in Tests

## Overview

This skill provides systematic approaches to fixing race conditions and timing issues in tests, particularly common with Redis connections, WebSocket interactions, database operations, and async workflows.

## When to Use This Skill

Use this skill when you encounter:
- **Intermittent test failures** that pass on retry
- **Redis connection timing issues** in async tests
- **WebSocket connection race conditions**
- **Database transaction timing problems**
- **API response timing inconsistencies**
- **File system operation delays**
- **Async/await synchronization issues**

## Core Principles

### 1. Replace Sleep with Condition Waiting
```python
# ❌ BAD: Fixed delays are unreliable
import time
time.sleep(2)  # Arbitrary delay

# ✅ GOOD: Wait for actual condition
import asyncio
from typing import Callable, Any

async def wait_for_condition(
    condition: Callable[[], bool],
    timeout: float = 10.0,
    poll_interval: float = 0.1,
    error_message: str = "Condition not met"
) -> None:
    """Wait for a condition to become true within timeout."""
    start_time = asyncio.get_event_loop().time()

    while True:
        if condition():
            return

        if asyncio.get_event_loop().time() - start_time > timeout:
            raise TimeoutError(error_message)

        await asyncio.sleep(poll_interval)
```

### 2. Redis Connection Readiness
```python
# ✅ GOOD: Proper Redis connection waiting
import redis.asyncio as redis
import pytest

@pytest.fixture
async def redis_client():
    client = redis.Redis(host='localhost', port=6379, decode_responses=True)

    # Wait for Redis to be ready
    await wait_for_condition(
        condition=lambda: asyncio.create_task(ping_redis(client)),
        timeout=30.0,
        error_message="Redis not available within 30 seconds"
    )

    yield client
    await client.close()

async def ping_redis(client: redis.Redis) -> bool:
    """Check if Redis is responsive."""
    try:
        await client.ping()
        return True
    except Exception:
        return False
```

### 3. WebSocket Connection Synchronization
```python
# ✅ GOOD: WebSocket state verification
import websockets
import json

class WebSocketTestClient:
    def __init__(self, uri: str):
        self.uri = uri
        self.websocket = None
        self.connected = False
        self.messages = []

    async def connect(self):
        """Connect and wait for readiness."""
        self.websocket = await websockets.connect(self.uri)
        self.connected = True

        # Wait for initial handshake/auth response
        await self.wait_for_message_count(1, timeout=5.0)

    async def wait_for_message_count(self, count: int, timeout: float = 10.0):
        """Wait for specific number of messages."""
        await wait_for_condition(
            condition=lambda: len(self.messages) >= count,
            timeout=timeout,
            error_message=f"Expected {count} messages, got {len(self.messages)}"
        )

    async def wait_for_message_containing(self, text: str, timeout: float = 10.0):
        """Wait for message containing specific text."""
        await wait_for_condition(
            condition=lambda: any(text in msg for msg in self.messages),
            timeout=timeout,
            error_message=f"No message containing '{text}' received"
        )
```

### 4. Database Transaction Consistency
```python
# ✅ GOOD: Database state verification
import asyncpg
from typing import Optional

class DatabaseTestHelper:
    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    async def wait_for_record(
        self,
        table: str,
        conditions: dict,
        timeout: float = 10.0
    ) -> Optional[dict]:
        """Wait for a database record to appear."""
        async def check_record():
            async with self.pool.acquire() as conn:
                query = f"SELECT * FROM {table} WHERE " + " AND ".join(
                    f"{k} = ${i+1}" for i, k in enumerate(conditions.keys())
                )
                result = await conn.fetchrow(query, *conditions.values())
                return result is not None

        await wait_for_condition(
            condition=check_record,
            timeout=timeout,
            error_message=f"Record not found in {table} with {conditions}"
        )

        # Fetch the actual record
        async with self.pool.acquire() as conn:
            query = f"SELECT * FROM {table} WHERE " + " AND ".join(
                f"{k} = ${i+1}" for i, k in enumerate(conditions.keys())
            )
            return await conn.fetchrow(query, *conditions.values())
```

### 5. File System Operation Synchronization
```python
# ✅ GOOD: File system waiting
import os
import aiofiles
from pathlib import Path

async def wait_for_file(
    file_path: Path,
    timeout: float = 10.0,
    min_size: int = 0
) -> None:
    """Wait for file to exist and optionally reach minimum size."""
    await wait_for_condition(
        condition=lambda: (
            file_path.exists() and
            file_path.stat().st_size >= min_size
        ),
        timeout=timeout,
        error_message=f"File {file_path} not ready within {timeout}s"
    )

async def wait_for_file_content(
    file_path: Path,
    expected_content: str,
    timeout: float = 10.0
) -> None:
    """Wait for file to contain specific content."""
    async def check_content():
        try:
            async with aiofiles.open(file_path, 'r') as f:
                content = await f.read()
                return expected_content in content
        except (FileNotFoundError, PermissionError):
            return False

    await wait_for_condition(
        condition=check_content,
        timeout=timeout,
        error_message=f"File {file_path} doesn't contain expected content"
    )
```

## Integration Patterns

### 1. Test Fixture with Built-in Waiting
```python
@pytest.fixture
async def system_ready():
    """Comprehensive system readiness check."""
    # Wait for Redis
    redis_client = redis.Redis(decode_responses=True)
    await wait_for_condition(
        condition=lambda: asyncio.create_task(ping_redis(redis_client)),
        timeout=30.0
    )

    # Wait for Database
    db_pool = await asyncpg.create_pool("postgresql://localhost/testdb")
    await wait_for_condition(
        condition=lambda: asyncio.create_task(ping_database(db_pool)),
        timeout=30.0
    )

    # Wait for HTTP service
    await wait_for_condition(
        condition=lambda: asyncio.create_task(ping_http_service("http://localhost:8000/health")),
        timeout=60.0
    )

    yield {
        'redis': redis_client,
        'db': db_pool
    }

    # Cleanup
    await redis_client.close()
    await db_pool.close()
```

### 2. Smart Retry Mechanisms
```python
import functools
from typing import TypeVar, Callable, Any

T = TypeVar('T')

def retry_on_condition(
    max_retries: int = 3,
    delay: float = 1.0,
    backoff_multiplier: float = 2.0,
    retry_exceptions: tuple = (ConnectionError, TimeoutError)
):
    """Decorator for retrying with exponential backoff."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            current_delay = delay

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions as e:
                    if attempt == max_retries:
                        raise
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff_multiplier

        return wrapper
    return decorator

# Usage:
@retry_on_condition(max_retries=3, delay=0.5)
async def flaky_api_call():
    """API call that might fail due to timing."""
    async with aiohttp.ClientSession() as session:
        async with session.get("http://api.example.com/data") as response:
            return await response.json()
```

## Project-Specific Patterns

### For EnterpriseHub GHL Real Estate AI

#### 1. Streamlit Component Testing
```python
# ✅ Streamlit app readiness
async def wait_for_streamlit_app(
    port: int = 8501,
    timeout: float = 30.0
) -> None:
    """Wait for Streamlit app to be ready."""
    url = f"http://localhost:{port}"

    async def check_app():
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return response.status == 200
        except aiohttp.ClientError:
            return False

    await wait_for_condition(
        condition=check_app,
        timeout=timeout,
        error_message=f"Streamlit app not ready on port {port}"
    )
```

#### 2. AI Model Loading Synchronization
```python
# ✅ Wait for AI model initialization
async def wait_for_model_ready(
    model_service,
    timeout: float = 120.0
) -> None:
    """Wait for AI model to be loaded and ready."""
    await wait_for_condition(
        condition=lambda: model_service.is_ready(),
        timeout=timeout,
        poll_interval=2.0,
        error_message="AI model not ready within timeout"
    )
```

#### 3. GHL API Synchronization
```python
# ✅ Wait for GHL webhook processing
async def wait_for_webhook_processed(
    webhook_id: str,
    webhook_service,
    timeout: float = 30.0
) -> None:
    """Wait for GHL webhook to be processed."""
    await wait_for_condition(
        condition=lambda: webhook_service.is_processed(webhook_id),
        timeout=timeout,
        error_message=f"Webhook {webhook_id} not processed"
    )
```

## Best Practices

### 1. Proper Timeout Configuration
```python
# Environment-specific timeouts
TIMEOUTS = {
    'test': {
        'redis_connection': 5.0,
        'db_transaction': 10.0,
        'api_response': 15.0,
        'websocket_connect': 5.0
    },
    'ci': {
        'redis_connection': 30.0,
        'db_transaction': 60.0,
        'api_response': 45.0,
        'websocket_connect': 30.0
    },
    'local': {
        'redis_connection': 10.0,
        'db_transaction': 30.0,
        'api_response': 30.0,
        'websocket_connect': 10.0
    }
}

def get_timeout(operation: str) -> float:
    """Get environment-appropriate timeout."""
    env = os.getenv('TEST_ENV', 'local')
    return TIMEOUTS.get(env, TIMEOUTS['local']).get(operation, 30.0)
```

### 2. Comprehensive Logging
```python
import logging

async def wait_for_condition_with_logging(
    condition: Callable[[], bool],
    timeout: float,
    operation_name: str,
    poll_interval: float = 0.1
) -> None:
    """Wait for condition with detailed logging."""
    logger = logging.getLogger(__name__)
    logger.info(f"Starting wait for: {operation_name}")

    start_time = asyncio.get_event_loop().time()
    attempt = 0

    while True:
        attempt += 1

        if condition():
            elapsed = asyncio.get_event_loop().time() - start_time
            logger.info(f"Condition met for {operation_name} after {elapsed:.2f}s ({attempt} attempts)")
            return

        elapsed = asyncio.get_event_loop().time() - start_time
        if elapsed > timeout:
            logger.error(f"Timeout waiting for {operation_name} after {elapsed:.2f}s ({attempt} attempts)")
            raise TimeoutError(f"{operation_name} not ready within {timeout}s")

        if attempt % 50 == 0:  # Log every 5 seconds with 0.1s poll interval
            logger.debug(f"Still waiting for {operation_name} ({elapsed:.1f}s elapsed)")

        await asyncio.sleep(poll_interval)
```

### 3. Health Check Patterns
```python
class HealthChecker:
    """Centralized health checking for all services."""

    def __init__(self):
        self.checks = {}

    def register_check(self, name: str, check_func: Callable[[], bool]):
        """Register a health check."""
        self.checks[name] = check_func

    async def wait_for_all_healthy(self, timeout: float = 60.0):
        """Wait for all registered services to be healthy."""
        for name, check_func in self.checks.items():
            await wait_for_condition(
                condition=check_func,
                timeout=timeout,
                error_message=f"Service {name} not healthy"
            )

    async def get_health_status(self) -> dict:
        """Get current health status of all services."""
        status = {}
        for name, check_func in self.checks.items():
            try:
                status[name] = check_func()
            except Exception as e:
                status[name] = False
                logging.warning(f"Health check failed for {name}: {e}")
        return status

# Usage in tests:
@pytest.fixture(scope="session")
async def healthy_system():
    health_checker = HealthChecker()

    # Register all service health checks
    health_checker.register_check("redis", lambda: ping_redis())
    health_checker.register_check("database", lambda: ping_database())
    health_checker.register_check("api", lambda: ping_api())

    # Wait for everything to be ready
    await health_checker.wait_for_all_healthy(timeout=120.0)

    yield health_checker
```

## Common Pitfalls to Avoid

1. **Don't use fixed sleep() calls** - Always wait for actual conditions
2. **Don't ignore cleanup** - Always close connections after tests
3. **Don't use infinite loops** - Always include timeouts
4. **Don't ignore logging** - Log wait operations for debugging
5. **Don't assume instant readiness** - Services need time to initialize

## Integration with Existing Tests

This skill works seamlessly with the existing test-driven-development skill and enhances the verification-before-completion quality gates by providing robust timing synchronization patterns.

Overview

This skill provides practical, reusable patterns to eliminate race conditions and timing-related flakiness in tests. It focuses on replacing brittle fixed sleeps with condition-based waiting, and offers helpers for Redis, WebSockets, databases, file I/O, and service health checks. The goal is predictable tests that wait for actual readiness rather than arbitrary delays.

How this skill works

The skill supplies a small set of async wait-for-condition primitives and higher-level helpers that poll until a predicate becomes true or a timeout elapses. It integrates with test fixtures and retry decorators so you can wait for Redis pings, WebSocket handshakes, database records, file writes, or external HTTP health checks. Logging, environment-aware timeouts, and exponential backoff patterns make waits observable and configurable for local and CI environments.

When to use it

  • Intermittent test failures that pass on retry
  • Async tests with Redis connection timing issues
  • WebSocket connection handshakes and message ordering problems
  • Database transactions or eventual consistency delays
  • File system operations that complete asynchronously

Best practices

  • Replace fixed time.sleep calls with condition-based waiting and explicit timeouts
  • Tune timeouts per environment (local vs CI) and expose them via env vars
  • Log wait progress and reasons for timeouts to make failures actionable
  • Always close and clean up connections in fixtures to avoid cascading flakiness
  • Avoid infinite loops—use sensible poll intervals and overall timeouts

Example use cases

  • A pytest fixture that waits for Redis, Postgres, and an HTTP service before running integration tests
  • An async WebSocket test client that waits for handshake messages and content before asserting
  • A database helper that waits for a specific record to appear after a background job
  • A file-watcher test that waits for a file to exist and reach a minimum size or contain expected content
  • A retry decorator for flaky API calls with exponential backoff in test helpers

FAQ

Why not just increase sleep durations to avoid flakiness?

Fixed sleeps mask the root cause, slow tests unnecessarily, and still can fail under load or CI variability. Condition-based waits only delay as long as needed and fail fast with diagnostic messages.

How do I choose poll intervals and timeouts?

Use short poll intervals (e.g. 0.05–0.2s) for responsiveness and set timeouts based on environment: smaller locally, larger in CI. Make them configurable via environment variables.