
python-async-patterns skill


This skill helps you implement Python asyncio patterns for concurrent programming, improving reliability, performance, and readability in async applications.

npx playbooks add skill 0xdarkmatter/claude-mods --skill python-async-patterns

Review the files below or copy the command above to add this skill to your agents.

Files (10)
SKILL.md
4.2 KB
---
name: python-async-patterns
description: "Python asyncio patterns for concurrent programming. Triggers on: asyncio, async, await, coroutine, gather, semaphore, TaskGroup, event loop, aiohttp, concurrent."
compatibility: "Python 3.10+ recommended. Some patterns require 3.11+ (TaskGroup, timeout)."
allowed-tools: "Read Write"
depends-on: [python-typing-patterns]
related-skills: [python-fastapi-patterns, python-observability-patterns]
---

# Python Async Patterns

Asyncio patterns for concurrent Python programming.

## Core Concepts

```python
import asyncio

import aiohttp

# Coroutine (must be awaited)
async def fetch(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Entry point
async def main():
    result = await fetch("https://example.com")
    return result

asyncio.run(main())
```

## Pattern 1: Concurrent with gather

```python
async def fetch_one(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as response:
        return await response.text()

async def fetch_all(urls: list[str]) -> list[str | BaseException]:
    """Fetch multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        # return_exceptions=True keeps one failure from cancelling the rest
        return await asyncio.gather(*tasks, return_exceptions=True)
```
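
With `return_exceptions=True`, failures come back as exception objects inside the result list instead of being raised. A minimal sketch of separating successes from failures (the reporting logic is illustrative):

```python
async def report(urls: list[str]) -> None:
    results = await fetch_all(urls)
    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            print(f"{url} failed: {result!r}")      # exception captured by gather
        else:
            print(f"{url} returned {len(result)} characters")
```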

## Pattern 2: Bounded Concurrency

```python
async def fetch_with_limit(urls: list[str], limit: int = 10) -> list[str]:
    """Limit concurrent requests."""
    semaphore = asyncio.Semaphore(limit)

    async with aiohttp.ClientSession() as session:
        async def bounded_fetch(url: str) -> str:
            async with semaphore:  # at most `limit` requests in flight
                return await fetch_one(session, url)

        return await asyncio.gather(*[bounded_fetch(url) for url in urls])
```

## Pattern 3: TaskGroup (Python 3.11+)

```python
async def process_items(items):
    """Structured concurrency with automatic cleanup."""
    async with asyncio.TaskGroup() as tg:
        for item in items:
            tg.create_task(process_one(item))
    # All tasks complete here, or exception raised
```
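
When any child task fails, the TaskGroup cancels its remaining siblings and re-raises the failures wrapped in an `ExceptionGroup`, which `except*` (also 3.11+) can unpack. A sketch, reusing the hypothetical `process_one` from above:

```python
async def process_items_safely(items):
    try:
        async with asyncio.TaskGroup() as tg:
            for item in items:
                tg.create_task(process_one(item))
    except* ValueError as group:
        # group.exceptions holds every ValueError raised by the child tasks
        for exc in group.exceptions:
            print(f"item failed: {exc}")
```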

## Pattern 4: Timeout

```python
async def with_timeout():
    try:
        async with asyncio.timeout(5.0):  # Python 3.11+
            result = await slow_operation()
    except asyncio.TimeoutError:
        result = None
    return result
```
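
On Python 3.10 and earlier, `asyncio.wait_for` gives the same behaviour without the context-manager form; a sketch assuming the same hypothetical `slow_operation`:

```python
async def with_timeout_pre_311():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        result = None
    return result
```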

## Critical Warnings

```python
# WRONG - blocks event loop
async def bad():
    time.sleep(5)         # Never use time.sleep!
    requests.get(url)     # Blocking I/O!

# CORRECT
async def good():
    await asyncio.sleep(5)
    async with aiohttp.ClientSession() as s:
        async with s.get(url) as response:
            await response.text()
```
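
When no async equivalent exists, the blocking call can be pushed onto a thread pool instead of rewriting it; a minimal sketch using `run_in_executor` (the `requests` call stands in for any blocking I/O):

```python
import asyncio
import requests  # blocking HTTP client, only ever called inside the executor

async def fetch_blocking(url: str) -> str:
    loop = asyncio.get_running_loop()
    # Runs in the default ThreadPoolExecutor; the event loop stays responsive
    response = await loop.run_in_executor(None, requests.get, url)
    return response.text

# Python 3.9+: asyncio.to_thread(requests.get, url) is shorthand for the same idea
```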

```python
# WRONG - orphaned task
async def bad():
    asyncio.create_task(work())  # May be garbage collected!

# CORRECT - keep reference
async def good():
    task = asyncio.create_task(work())
    await task
```
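
For true fire-and-forget work where awaiting immediately is not an option, the usual workaround is to hold a strong reference in a set and drop it when the task finishes; a minimal sketch of that pattern:

```python
background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                          # strong reference keeps the task alive
    task.add_done_callback(background_tasks.discard)    # drop the reference once it finishes
    return task
```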

## Quick Reference

| Pattern | Use Case |
|---------|----------|
| `gather(*tasks)` | Multiple independent operations |
| `Semaphore(n)` | Rate limiting, resource constraints |
| `TaskGroup()` | Structured concurrency (3.11+) |
| `Queue()` | Producer-consumer |
| `timeout(s)` | Timeout wrapper (3.11+) |
| `Lock()` | Shared mutable state |
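
`Queue` and `Lock` are listed above but not shown elsewhere in this file; a minimal producer-consumer sketch with `asyncio.Queue` (queue size, worker count, and the item handling are illustrative):

```python
async def producer(queue: asyncio.Queue[int], n: int) -> None:
    for i in range(n):
        await queue.put(i)                # blocks when the queue is full

async def consumer(queue: asyncio.Queue[int]) -> None:
    while True:
        item = await queue.get()
        try:
            print(f"processed {item}")
        finally:
            queue.task_done()             # lets queue.join() track completion

async def pipeline() -> None:
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=100)
    workers = [asyncio.create_task(consumer(queue)) for _ in range(5)]
    await producer(queue, 1_000)
    await queue.join()                    # wait until every item is processed
    for worker in workers:
        worker.cancel()                   # consumers loop forever; stop them explicitly
```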

## Async Context Manager

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_connection():
    conn = await create_connection()
    try:
        yield conn
    finally:
        await conn.close()
```
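
Usage reads like any other `async with` block; `create_connection` above and `conn.fetch` below are stand-ins for whatever your driver actually provides:

```python
async def query_users():
    async with managed_connection() as conn:
        return await conn.fetch("SELECT * FROM users")   # illustrative driver call
```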

## Additional Resources

For detailed patterns, load:
- `./references/concurrency-patterns.md` - Queue, Lock, producer-consumer
- `./references/aiohttp-patterns.md` - HTTP client/server patterns
- `./references/mixing-sync-async.md` - run_in_executor, thread pools
- `./references/debugging-async.md` - Debug mode, profiling, finding issues
- `./references/production-patterns.md` - Graceful shutdown, health checks, signal handling
- `./references/error-handling.md` - Retry with backoff, circuit breakers, partial failures
- `./references/performance.md` - uvloop, connection pooling, buffer sizing

## Scripts

- `./scripts/find-blocking-calls.sh` - Scan code for blocking calls in async functions

## Assets

- `./assets/async-project-template.py` - Production-ready async app skeleton

---

## See Also

**Prerequisites:**
- `python-typing-patterns` - Type hints for async functions

**Related Skills:**
- `python-fastapi-patterns` - Async web APIs
- `python-observability-patterns` - Async logging and tracing
- `python-database-patterns` - Async database access

Overview

This skill provides practical asyncio patterns for concurrent Python programming, focused on real-world usage of coroutines, gathering, bounded concurrency, TaskGroup, timeouts, and aiohttp. It aims to help developers write safe, efficient async code and avoid common pitfalls like blocking the event loop or orphaned tasks.

How this skill works

The skill inspects code patterns and offers idiomatic implementations: concurrent execution with asyncio.gather, bounded concurrency via Semaphore, structured concurrency with TaskGroup (Python 3.11+), and timeout handling with asyncio.timeout. It highlights correct async I/O (aiohttp), async context managers, and anti-patterns such as blocking calls and lost tasks, plus references and scripts for deeper checks.

When to use it

  • Implement concurrent HTTP clients or other I/O-bound workloads.
  • Limit parallelism to avoid resource exhaustion (rate limits, DB connections).
  • Migrate sync code to async or detect blocking calls in async functions.
  • Adopt structured concurrency and timeouts for robust task lifecycles.
  • Build producer-consumer pipelines or background workers with Queue and TaskGroup.

Best practices

  • Never use blocking calls (time.sleep, requests) inside async functions—use awaitable alternatives or run_in_executor.
  • Keep references to create_task results or use TaskGroup to avoid orphaned tasks and ensure cleanup.
  • Use Semaphore or a bounded worker pool to control parallelism and protect external services.
  • Prefer asyncio.timeout and TaskGroup (3.11+) for clear timeouts and structured concurrency.
  • Use async context managers for resource acquisition and ensure proper closing of connections.

Example use cases

  • Fetch hundreds of URLs concurrently while limiting concurrency to avoid rate limits using Semaphore + aiohttp.
  • Process a queue of jobs with a TaskGroup ensuring all tasks finish or exceptions propagate cleanly.
  • Wrap slow operations with asyncio.timeout to implement request-level timeouts and fallback handling.
  • Scan a codebase for blocking calls in async functions using the provided script before deploying to production.
  • Create a reusable async context manager for pooled connections and safe cleanup.

FAQ

When should I use asyncio.gather vs TaskGroup?

Use asyncio.gather for simple independent tasks; use TaskGroup for structured concurrency when you want automatic cancellation, clearer lifecycle management, and better exception propagation (Python 3.11+).

How do I avoid blocking the event loop?

Replace blocking calls with awaitable equivalents (asyncio.sleep, aiohttp), or move CPU-bound and blocking I/O work onto thread or process pools via run_in_executor or concurrent.futures. Scan code with the provided script to find offending calls.

How do I limit concurrent requests?

Use asyncio.Semaphore to bound concurrency or implement a worker pool that consumes from an asyncio.Queue to control parallelism and resource usage.