
This skill helps you write robust async code by applying safe_gather patterns for fail-fast, timeouts, and clean cancellation.

npx playbooks add skill gigaverse-app/skillet --skill async-patterns

---
name: async-patterns
description: Use when writing async code, using asyncio.gather, handling concurrent operations, or when user mentions "asyncio", "async", "await", "gather", "concurrent", "parallel tasks", "fail-fast", "timeout", "race condition", "cancellation".
---

# Async Patterns

## Rules Summary

1. Prefer `safe_gather` for all new async code - it provides fail-fast, timeout support, and cleaner cancellation
2. Use `safe_gather` with `return_exceptions=True` for partial-results patterns (better than asyncio.gather due to timeout support)
3. Migrating existing cleanup/shutdown code to `safe_gather` is optional (low priority, both work fine)
4. Use `asyncio.wait` with `FIRST_COMPLETED` only for incremental processing patterns
5. Always consider timeout and cancellation behavior when choosing async primitives
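
Rule 4 refers to incremental processing, where each result is handled as soon as its task finishes instead of waiting for the whole batch. A minimal sketch of that loop, assuming a hypothetical `fetch` coroutine as the workload (`process_incrementally` is likewise an illustrative name, not part of this skill):

```python
import asyncio


async def fetch(item: int) -> int:
    # Hypothetical workload: larger items take longer to finish.
    await asyncio.sleep(0.05 * item)
    return item * 10


async def process_incrementally(items: list[int]) -> list[int]:
    """Handle each result as soon as its task completes."""
    pending = {asyncio.create_task(fetch(i)) for i in items}
    handled: list[int] = []
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                # task.result() re-raises if the task failed.
                handled.append(task.result())
    finally:
        # On early exit (error or cancellation), do not leak running tasks.
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)
    return handled


# Usage: asyncio.run(process_incrementally([3, 1, 2]))
```

Results are appended in completion order rather than input order, which is the main reason to prefer this loop over a gather-style call.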

---

## safe_gather vs asyncio.gather

### Default Choice: safe_gather

**`safe_gather` is the better default for new code** because it provides:
- Fail-fast cancellation (when not using `return_exceptions=True`)
- Timeout support with automatic cleanup
- Cleaner cancellation handling
- Same behavior as `asyncio.gather` when using `return_exceptions=True`
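
The fail-fast point matters because plain `asyncio.gather` propagates the first exception to the caller but leaves the other awaitables running. The sketch below demonstrates that documented behavior using only the standard library; the coroutine names are illustrative.

```python
import asyncio


async def gather_leaves_siblings_running() -> bool:
    """Return True if the slow task is still running after gather raises."""

    async def fails_fast() -> None:
        await asyncio.sleep(0.01)
        raise ValueError("boom")

    async def slow() -> str:
        await asyncio.sleep(0.5)
        return "finished"

    slow_task = asyncio.create_task(slow())
    try:
        await asyncio.gather(fails_fast(), slow_task)
    except ValueError:
        pass  # gather has raised, but it did not cancel slow_task

    still_running = not slow_task.done()

    # Clean up manually; a fail-fast wrapper does this for you.
    slow_task.cancel()
    await asyncio.gather(slow_task, return_exceptions=True)
    return still_running


# Usage: asyncio.run(gather_leaves_siblings_running())
```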

### Implementation

If you don't have `safe_gather` in your codebase, here's a reference implementation:

```python
import asyncio
from typing import Any, Awaitable

async def safe_gather(
    *aws: Awaitable[Any],
    return_exceptions: bool = False,
    timeout: float | None = None,
) -> list[Any]:
    """
    Gather awaitables with fail-fast behavior and optional timeout.

    Unlike asyncio.gather:
    - Cancels remaining tasks on first exception (unless return_exceptions=True)
    - Supports timeout with automatic cleanup; raises asyncio.TimeoutError
      when the timeout expires, even with return_exceptions=True
    - Handles cancellation more gracefully
    """
    if timeout is not None:
        return await asyncio.wait_for(
            _safe_gather_impl(*aws, return_exceptions=return_exceptions),
            timeout=timeout,
        )
    return await _safe_gather_impl(*aws, return_exceptions=return_exceptions)


async def _safe_gather_impl(
    *aws: Awaitable[Any],
    return_exceptions: bool = False,
) -> list[Any]:
    # ensure_future accepts coroutines as well as existing Tasks/Futures;
    # create_task would raise TypeError for the task lists used in cleanup code.
    tasks = [asyncio.ensure_future(aw) for aw in aws]

    if return_exceptions:
        return await asyncio.gather(*tasks, return_exceptions=True)

    try:
        return await asyncio.gather(*tasks)
    except (Exception, asyncio.CancelledError):
        # Cancel remaining tasks on failure. CancelledError is listed explicitly
        # (it is a BaseException since Python 3.8) so that a timeout or outer
        # cancellation also waits for every task to finish cancelling.
        for task in tasks:
            if not task.done():
                task.cancel()
        # Wait for cancellation to complete
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
```

### Pattern: All tasks must succeed (fail-fast)

```python
# Initialization - all workers must start
await safe_gather(*[worker.pre_run() for worker in workers])

# Data fetching - need all pieces
channel_info, front_row, participants = await safe_gather(
    fetch_channel_info(channel_id),
    fetch_front_row(channel_id),
    fetch_participants(channel_id),
)

# Validation - all must pass
await safe_gather(*validation_tasks)

# With timeout (raises asyncio.TimeoutError and cancels the tasks if exceeded)
results = await safe_gather(*tasks, timeout=30.0)
```
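
To make the caller's side of the fail-fast pattern concrete, the sketch below shows which exceptions surface and what happens to sibling tasks. It carries a condensed fail-fast copy of the reference implementation only so that the snippet runs on its own; `slow`, `failing`, and `demo` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Optional


async def safe_gather(*aws: Awaitable[Any], timeout: Optional[float] = None) -> list[Any]:
    # Condensed fail-fast variant of the reference implementation, repeated
    # here only so that this snippet is self-contained.
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    try:
        return await asyncio.wait_for(asyncio.gather(*tasks), timeout)
    except (Exception, asyncio.CancelledError):
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise


async def slow(name: str, cancelled: list[str]) -> str:
    try:
        await asyncio.sleep(60)
        return name
    except asyncio.CancelledError:
        cancelled.append(name)  # record that fail-fast or the timeout reached us
        raise


async def failing() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("worker failed to start")


async def demo() -> dict[str, Any]:
    cancelled: list[str] = []
    outcome: dict[str, Any] = {}

    try:
        await safe_gather(slow("a", cancelled), failing())
    except ValueError as exc:
        # The original exception propagates; "a" has already been cancelled.
        outcome["fail_fast"] = str(exc)

    try:
        await safe_gather(slow("b", cancelled), timeout=0.05)
    except asyncio.TimeoutError:
        outcome["timed_out"] = True

    outcome["cancelled"] = cancelled
    return outcome


# Usage: asyncio.run(demo())
```

Callers therefore need to handle both the task's own exception type and `asyncio.TimeoutError` whenever a timeout is set.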

### Pattern: Partial results acceptable

```python
# Use safe_gather with return_exceptions=True
# Benefits: timeout support + cleaner cancellation vs asyncio.gather
results = await safe_gather(*batch_tasks, return_exceptions=True)
for result in results:
    if isinstance(result, BaseException):  # CancelledError is not an Exception subclass
        logger.error(f"Task failed: {result}")
    else:
        process(result)

# With timeout: if it expires, asyncio.TimeoutError is raised and the
# results of tasks that already finished are not returned
results = await safe_gather(*batch_tasks, return_exceptions=True, timeout=30.0)
```
```
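
With the reference implementation, an expired timeout raises `asyncio.TimeoutError` and discards whatever had already finished. If finished results should survive the deadline, one option is `asyncio.wait` with a `timeout`. The helper below, `gather_partial`, is a hypothetical sketch of that idea and not part of `safe_gather`.

```python
import asyncio
from typing import Any, Awaitable


async def gather_partial(*aws: Awaitable[Any], timeout: float) -> list[Any]:
    """Return one result or exception per awaitable, in input order.

    Awaitables that miss the deadline are cancelled and reported as
    asyncio.TimeoutError instead of discarding the finished results.
    """
    tasks = [asyncio.ensure_future(aw) for aw in aws]
    if not tasks:
        return []  # asyncio.wait rejects an empty collection
    try:
        done, pending = await asyncio.wait(tasks, timeout=timeout)
    except asyncio.CancelledError:
        # asyncio.wait does not cancel its tasks when the caller is cancelled.
        for task in tasks:
            task.cancel()
        raise
    for task in pending:
        task.cancel()
    if pending:
        # Let cancelled tasks run their cleanup before returning.
        await asyncio.gather(*pending, return_exceptions=True)

    results: list[Any] = []
    for task in tasks:
        if task in pending:
            results.append(asyncio.TimeoutError())
        elif task.cancelled():
            results.append(asyncio.CancelledError())
        else:
            exc = task.exception()
            results.append(exc if exc is not None else task.result())
    return results


# Usage: results = await gather_partial(*batch_tasks, timeout=30.0)
```

The trade-off is that the caller gets a mixed list and must check every entry, whereas the timeout in `safe_gather` gives a single all-or-nothing signal.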

### Pattern: Cleanup/shutdown

```python
# Prefer safe_gather (better cancellation handling)
await safe_gather(*cancelled_tasks, return_exceptions=True)

# With timeout for cleanup
await safe_gather(
    service1.shutdown(),
    service2.shutdown(),
    service3.shutdown(),
    return_exceptions=True,
    timeout=10.0  # Don't wait forever for cleanup
)

# OK to keep existing asyncio.gather (migration is optional, both work)
await asyncio.gather(*cancelled_tasks, return_exceptions=True)
```
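
Because the reference implementation enforces `timeout` through `asyncio.wait_for`, an expired timeout raises `asyncio.TimeoutError` even when `return_exceptions=True`, so shutdown code should be ready to catch it. The sketch below uses the equivalent standard-library calls directly so that it runs standalone; `Service` and `shutdown_all` are hypothetical names.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class Service:
    """Hypothetical service; `delay` simulates how long shutdown takes."""

    def __init__(self, name: str, delay: float) -> None:
        self.name = name
        self.delay = delay
        self.stopped = False

    async def shutdown(self) -> None:
        await asyncio.sleep(self.delay)
        self.stopped = True


async def shutdown_all(services: list[Service], timeout: float) -> bool:
    """Return True if every service shut down cleanly within `timeout`."""
    try:
        results = await asyncio.wait_for(
            asyncio.gather(
                *(s.shutdown() for s in services), return_exceptions=True
            ),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # wait_for has already cancelled the unfinished shutdown() calls.
        logger.warning("shutdown exceeded %.1fs; remaining calls cancelled", timeout)
        return False

    ok = True
    for service, result in zip(services, results):
        if isinstance(result, BaseException):
            logger.error("shutdown of %s failed: %r", service.name, result)
            ok = False
    return ok


# Usage: asyncio.run(shutdown_all([Service("db", 0.1)], timeout=10.0))
```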

### When to Keep asyncio.gather

**Keep it only in existing code, to avoid churn:**
- Existing cleanup/shutdown code that already works
- Migration is low priority: with `return_exceptions=True` and no timeout, `asyncio.gather` and `safe_gather` behave the same
- Focus migration efforts on new code and high-value patterns

## Migration Decision Tree

```
Is this new code you're writing?
├─ Yes -> Use safe_gather
└─ No (existing code)
   └─ Is it cleanup/shutdown with return_exceptions=True?
      ├─ Yes -> Keep asyncio.gather (optional to migrate)
      └─ No -> Evaluate migration benefit
         └─ Would fail-fast or timeout help?
            ├─ Yes -> Migrate to safe_gather
            └─ No -> Low priority, either is fine
```

## Common Patterns

### Initialization Pattern
```python
# Old
await asyncio.gather(*[worker.pre_run() for worker in workers])

# New
await safe_gather(*[worker.pre_run() for worker in workers])
```

### Tuple Unpacking Pattern
```python
# Old
first, last = await asyncio.gather(
    get_first_item(id),
    get_last_item(id),
)

# New
first, last = await safe_gather(
    get_first_item(id),
    get_last_item(id),
)
```

### Cleanup Pattern (migration optional; change only if you want a timeout)
```python
# Correct - keep as-is
await asyncio.gather(*self._running_tasks, return_exceptions=True)

# Or upgrade if you want timeout
await safe_gather(*self._running_tasks, return_exceptions=True, timeout=10.0)
```

## Key Principles

1. **Fail-fast by default**: If one task fails, cancel the rest immediately
2. **Always consider timeout**: Long-running operations should have timeouts
3. **Clean cancellation**: Always handle CancelledError properly
4. **Partial results when appropriate**: Use `return_exceptions=True` for batch operations
5. **Don't wait forever**: Especially for cleanup/shutdown operations
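
Principle 3 can be illustrated with a small sketch: a task that is cancelled runs its cleanup and then re-raises `asyncio.CancelledError`, so the canceller sees it as cancelled rather than as finished normally. The `worker` and `run_and_cancel` names are illustrative only.

```python
import asyncio


async def worker(log: list[str]) -> None:
    log.append("started")
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # re-raise; swallowing it would hide the cancellation
    finally:
        # Runs on success, failure, and cancellation alike.
        log.append("cleaned up")


async def run_and_cancel() -> tuple[list[str], bool]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # yield once so the worker reaches its first await
    task.cancel()
    # Wait for the cancellation to finish without raising here.
    await asyncio.gather(task, return_exceptions=True)
    return log, task.cancelled()


# Usage: asyncio.run(run_and_cancel())
```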

Overview

This skill helps you choose and apply safe async concurrency patterns when writing asyncio code. It promotes a drop-in safe_gather primitive that adds fail-fast behavior, timeout support, and cleaner cancellation compared to asyncio.gather. Use it to make concurrent code more robust, predictable, and easier to reason about.

How this skill works

The skill recommends safe_gather for new async code. safe_gather wraps each awaitable in a task, cancels the remaining tasks on the first error (unless return_exceptions=True), and optionally enforces a timeout with automatic cleanup. For partial-result batches, use safe_gather(return_exceptions=True) to collect both results and exceptions while still supporting timeouts. Reserve asyncio.wait(FIRST_COMPLETED) for incremental-processing patterns.

When to use it

  • When writing new async code that spawns multiple coroutines concurrently
  • When you need fail-fast semantics so one error cancels the rest
  • When you want timeout support that cleans up remaining tasks
  • When running batch work where some failures are acceptable (use return_exceptions=True)
  • When handling shutdown/cleanup and you want explicit time limits for graceful termination

Best practices

  • Default to safe_gather for new concurrent operations to get fail-fast and timeout behavior
  • Use return_exceptions=True with safe_gather for partial-result patterns and inspect exceptions per-task
  • Always consider and set reasonable timeouts for long-running or shutdown operations
  • Handle asyncio.CancelledError and perform necessary cleanup after cancellations
  • Use asyncio.wait(FIRST_COMPLETED) only for incremental processing, where you handle each task as soon as it finishes

Example use cases

  • Start many worker initializations and require all to succeed: await safe_gather(*init_tasks)
  • Fetch multiple independent resources in parallel with a global timeout: await safe_gather(*fetches, timeout=30.0)
  • Run a batch where some failures are allowed: results = await safe_gather(*batch, return_exceptions=True) and log errors
  • Shutdown multiple services with a short timeout to avoid blocking shutdown: await safe_gather(service1.shutdown(), service2.shutdown(), return_exceptions=True, timeout=10.0)
  • Migrate an initialization pattern from asyncio.gather to safe_gather to gain fail-fast semantics

FAQ

Why prefer safe_gather over asyncio.gather?

safe_gather cancels the remaining tasks on the first failure (unless you opt into return_exceptions=True), supports timeouts with cleanup, and handles cancellation more cleanly. With return_exceptions=True and no timeout, it behaves like asyncio.gather.

Should I refactor all existing asyncio.gather calls to safe_gather?

No. Prioritize new code and high-value migrations. Keep existing cleanup/shutdown calls that already use return_exceptions=True unless you need timeout or improved cancellation handling.