home / skills / thebushidocollective / han / python-async-patterns

This skill helps you master Python asynchronous programming with asyncio and concurrent futures, enabling efficient I/O-bound and CPU-bound task patterns.

npx playbooks add skill thebushidocollective/han --skill python-async-patterns

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
14.6 KB
---
name: python-async-patterns
user-invocable: false
description: Master Python asynchronous programming with asyncio, async/await,
  and concurrent.futures. Use for async code and concurrency patterns.
allowed-tools:
  - Bash
  - Read
---

# Python Async Patterns

Master asynchronous programming in Python using asyncio, async/await syntax,
and concurrent execution patterns for I/O-bound and CPU-bound tasks.

## Basic Async/Await

**Core async syntax:**

```python
import asyncio

# Define async function with async def
async def fetch_data(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # Simulate I/O operation
    return f"Data from {url}"

# Call async function
async def main() -> None:
    result = await fetch_data("https://api.example.com")
    print(result)

# Run async function
asyncio.run(main())
```

**Multiple concurrent operations:**

```python
import asyncio

async def fetch_url(url: str) -> str:
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main() -> None:
    # Run concurrently with gather
    results = await asyncio.gather(
        fetch_url("https://example.com/1"),
        fetch_url("https://example.com/2"),
        fetch_url("https://example.com/3")
    )
    for result in results:
        print(result)

asyncio.run(main())
```

## asyncio.create_task

**Creating and managing tasks:**

```python
import asyncio

async def process_item(item: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Processed {item}"

async def main() -> None:
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(process_item("A", 2.0))
    task2 = asyncio.create_task(process_item("B", 1.0))
    task3 = asyncio.create_task(process_item("C", 1.5))

    # Do other work while tasks run
    print("Tasks started")

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(result1, result2, result3)

asyncio.run(main())
```

**Task with name and context:**

```python
import asyncio

async def background_task(name: str) -> None:
    print(f"Task {name} starting")
    await asyncio.sleep(2)
    print(f"Task {name} completed")

async def main() -> None:
    # Create named task
    task = asyncio.create_task(
        background_task("worker"),
        name="background-worker"
    )

    # Check task status
    print(f"Task name: {task.get_name()}")
    print(f"Task done: {task.done()}")

    await task

asyncio.run(main())
```

## asyncio.gather vs asyncio.wait

**Using gather for results:**

```python
import asyncio

async def fetch(n: int) -> int:
    await asyncio.sleep(1)
    return n * 2

async def main() -> None:
    # gather returns results in order
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3)
    )
    print(results)  # [2, 4, 6]

    # Return exceptions instead of raising
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        return_exceptions=True
    )

asyncio.run(main())
```

**Using wait for more control:**

```python
import asyncio

async def worker(n: int) -> int:
    await asyncio.sleep(n)
    return n

async def main() -> None:
    tasks = [
        asyncio.create_task(worker(1)),
        asyncio.create_task(worker(2)),
        asyncio.create_task(worker(3))
    ]

    # Wait for first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    print(f"Done: {len(done)}, Pending: {len(pending)}")

    # Cancel pending tasks
    for task in pending:
        task.cancel()

    # Wait for all with timeout
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0
    )

asyncio.run(main())
```

## Async Context Managers

**Creating async context managers:**

```python
import asyncio
from typing import AsyncIterator

class AsyncResource:
    async def __aenter__(self) -> "AsyncResource":
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def query(self) -> str:
        return "data"

async def main() -> None:
    async with AsyncResource() as resource:
        result = await resource.query()
        print(result)

asyncio.run(main())
```

**Using asynccontextmanager decorator:**

```python
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
    # Setup
    print(f"Connecting to {url}")
    await asyncio.sleep(0.1)
    conn = f"connection-{url}"

    try:
        yield conn
    finally:
        # Teardown
        print(f"Closing connection to {url}")
        await asyncio.sleep(0.1)

async def main() -> None:
    async with get_connection("localhost") as conn:
        print(f"Using {conn}")

asyncio.run(main())
```

## Async Iterators

**Creating async iterators:**

```python
import asyncio
from typing import AsyncIterator

class AsyncRange:
    def __init__(self, count: int) -> None:
        self.count = count

    def __aiter__(self) -> AsyncIterator[int]:
        return self

    async def __anext__(self) -> int:
        if self.count <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.count -= 1
        return self.count

async def main() -> None:
    async for i in AsyncRange(5):
        print(i)

asyncio.run(main())
```

**Async generator functions:**

```python
import asyncio
from typing import AsyncIterator

async def async_range(count: int) -> AsyncIterator[int]:
    for i in range(count):
        await asyncio.sleep(0.1)
        yield i

async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
    for url in urls:
        await asyncio.sleep(0.5)
        yield f"Page content from {url}"

async def main() -> None:
    async for num in async_range(5):
        print(num)

    urls = ["url1", "url2", "url3"]
    async for page in fetch_pages(urls):
        print(page)

asyncio.run(main())
```

## Async Queues

**Producer-consumer pattern with Queue:**

```python
import asyncio
from asyncio import Queue

async def producer(queue: Queue[int], n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f"Produced {i}")
    await queue.put(None)  # Sentinel value

async def consumer(queue: Queue[int], name: str) -> None:
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(0.2)
        print(f"Consumer {name} processed {item}")
        queue.task_done()

async def main() -> None:
    queue: Queue[int] = Queue(maxsize=5)

    # Start producer and consumers
    prod = asyncio.create_task(producer(queue, 10))
    cons1 = asyncio.create_task(consumer(queue, "A"))
    cons2 = asyncio.create_task(consumer(queue, "B"))

    await prod
    await queue.join()  # Wait for all tasks to be processed

    # Signal consumers to exit
    await queue.put(None)
    await queue.put(None)

    await cons1
    await cons2

asyncio.run(main())
```

## Semaphore and Lock

**Limiting concurrent operations:**

```python
import asyncio

async def fetch_with_limit(
    url: str,
    semaphore: asyncio.Semaphore
) -> str:
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

async def main() -> None:
    # Limit to 3 concurrent operations
    semaphore = asyncio.Semaphore(3)

    urls = [f"https://example.com/{i}" for i in range(10)]

    tasks = [
        fetch_with_limit(url, semaphore)
        for url in urls
    ]

    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
```

**Using Lock for mutual exclusion:**

```python
import asyncio

class Counter:
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self) -> None:
        async with self.lock:
            # Critical section
            current = self.value
            await asyncio.sleep(0.01)
            self.value = current + 1

async def main() -> None:
    counter = Counter()

    # Run increments concurrently
    await asyncio.gather(*[
        counter.increment()
        for _ in range(100)
    ])

    print(f"Final value: {counter.value}")  # Should be 100

asyncio.run(main())
```

## Timeouts and Cancellation

**Using timeout:**

```python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(5)
    return "completed"

async def main() -> None:
    # Timeout after 2 seconds
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
```

**Handling cancellation:**

```python
import asyncio

async def cancellable_task() -> None:
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise

async def main() -> None:
    task = asyncio.create_task(cancellable_task())

    await asyncio.sleep(3)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())
```

## Event Loop Management

**Direct event loop control:**

```python
import asyncio

async def task1() -> None:
    print("Task 1")

async def task2() -> None:
    print("Task 2")

# Create new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    # Schedule callbacks
    loop.call_soon(lambda: print("Callback"))

    # Schedule delayed callback
    loop.call_later(1.0, lambda: print("Delayed"))

    # Run coroutine
    loop.run_until_complete(task1())

    # Run multiple tasks
    loop.run_until_complete(
        asyncio.gather(task1(), task2())
    )
finally:
    loop.close()
```

## concurrent.futures

**ThreadPoolExecutor for I/O-bound tasks:**

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io_task(n: int) -> int:
    print(f"Task {n} starting")
    time.sleep(2)  # Blocking I/O
    return n * 2

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Run blocking function in thread pool
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(5)
        ]

        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())
```

**ProcessPoolExecutor for CPU-bound tasks:**

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n: int) -> int:
    # CPU-intensive computation
    result = sum(i * i for i in range(n))
    return result

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ProcessPoolExecutor(max_workers=4) as executor:
        # Run CPU-bound function in process pool
        tasks = [
            loop.run_in_executor(
                executor,
                cpu_intensive_task,
                10_000_000
            )
            for _ in range(4)
        ]

        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())
```

## Async HTTP with aiohttp

**Making async HTTP requests:**

```python
import asyncio
import aiohttp

async def fetch_url(
    session: aiohttp.ClientSession,
    url: str
) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        urls = [
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3"
        ]

        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        print(f"Fetched {len(results)} pages")

asyncio.run(main())
```

## Error Handling

**Handling exceptions in async code:**

```python
import asyncio

async def failing_task() -> None:
    await asyncio.sleep(1)
    raise ValueError("Task failed")

async def main() -> None:
    # Handle exception in single task
    try:
        await failing_task()
    except ValueError as e:
        print(f"Caught: {e}")

    # Handle exceptions with gather
    results = await asyncio.gather(
        failing_task(),
        failing_task(),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")

asyncio.run(main())
```

## When to Use This Skill

Use python-async-patterns when you need to:

- Handle multiple I/O operations concurrently (API calls, database queries)
- Build async web servers or clients
- Process data streams asynchronously
- Implement producer-consumer patterns with async queues
- Run blocking I/O operations without blocking the event loop
- Create async context managers for resource management
- Implement async iterators for streaming data
- Control concurrency with semaphores and locks
- Handle timeouts and cancellation in async operations
- Mix CPU-bound and I/O-bound operations efficiently

## Best Practices

- Use asyncio.run() for the main entry point
- Create tasks with asyncio.create_task() for concurrent execution
- Use gather() when you need all results
- Use wait() when you need fine-grained control
- Always handle CancelledError in long-running tasks
- Use semaphores to limit concurrent operations
- Prefer async context managers for resource management
- Use asyncio.Queue for producer-consumer patterns
- Run blocking I/O in thread pool with run_in_executor()
- Run CPU-bound tasks in process pool
- Set appropriate timeouts for network operations
- Use structured concurrency patterns (nurseries)

## Common Pitfalls

- Forgetting to await coroutines (creates coroutine object, doesn't run)
- Blocking the event loop with CPU-intensive work
- Not handling task cancellation properly
- Using time.sleep() instead of asyncio.sleep()
- Creating too many concurrent tasks without limits
- Not closing resources properly in async context
- Mixing blocking and async code incorrectly
- Not handling exceptions in background tasks
- Forgetting to call task_done() with Queue
- Using global event loop instead of asyncio.run()

## Resources

- [asyncio Documentation](https://docs.python.org/3/library/asyncio.html)
- [aiohttp Documentation](https://docs.aiohttp.org/)
- [PEP 492 - Coroutines](https://peps.python.org/pep-0492/)
- [PEP 525 - Async Generators](https://peps.python.org/pep-0525/)
- [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)

Overview

This skill teaches practical Python asynchronous programming using asyncio, async/await, and concurrent.futures to handle I/O-bound and CPU-bound workloads. It focuses on concrete patterns: task creation, synchronization primitives, async iterators and context managers, queues, timeouts, and integration with thread/process pools. Use it to make networked, concurrent, and streaming code correct and efficient.

How this skill works

The skill presents reusable async patterns and small, focused examples that show how to create and manage asyncio tasks, gather or wait for results, and handle cancellations and timeouts. It demonstrates resource management with async context managers, producer-consumer flows with asyncio.Queue, and how to offload blocking or CPU-heavy work via ThreadPoolExecutor and ProcessPoolExecutor. Each pattern includes recommended practices for safety and performance.

When to use it

  • When you need concurrent I/O like HTTP requests or database queries
  • Building async servers, clients, or streaming pipelines
  • Coordinating producer-consumer workflows or backpressure handling
  • Offloading blocking I/O to threads or CPU work to processes
  • Managing task lifecycle, timeouts, and graceful cancellation
  • Implementing async iterators or context-managed resources

Best practices

  • Use asyncio.run() as the main entry point and avoid mixing event loops
  • Create tasks with asyncio.create_task() for background work and await them when needed
  • Prefer asyncio.gather() for ordered results and asyncio.wait() for fine-grained control
  • Handle asyncio.CancelledError in long-running tasks for proper cleanup
  • Use asyncio.Semaphore/Lock to limit concurrency and protect critical sections
  • Run blocking I/O with run_in_executor() and CPU-bound work with ProcessPoolExecutor; set timeouts for network calls

Example use cases

  • Parallel HTTP requests with aiohttp and asyncio.gather to fetch many pages
  • Producer-consumer pipeline using asyncio.Queue for streaming data processing
  • Limiting concurrency with asyncio.Semaphore when crawling or calling rate-limited APIs
  • Offloading blocking file or library calls to ThreadPoolExecutor with loop.run_in_executor()
  • Running CPU-heavy computations in ProcessPoolExecutor while keeping the event loop responsive
  • Implementing async context managers for connections or pooled resources

FAQ

Can I run blocking code in asyncio?

Yes—use loop.run_in_executor() to run blocking functions in a ThreadPoolExecutor or ProcessPoolExecutor so the event loop stays responsive.

When should I use gather vs wait?

Use gather when you want all results returned in order; use wait when you need control over which tasks complete first, timeouts, or to cancel pending tasks.