home / skills / thebushidocollective / han / python-async-patterns
This skill helps you master Python asynchronous programming with asyncio and concurrent futures, enabling efficient I/O-bound and CPU-bound task patterns.
npx playbooks add skill thebushidocollective/han --skill python-async-patternsReview the files below or copy the command above to add this skill to your agents.
---
name: python-async-patterns
user-invocable: false
description: Master Python asynchronous programming with asyncio, async/await,
and concurrent.futures. Use for async code and concurrency patterns.
allowed-tools:
- Bash
- Read
---
# Python Async Patterns
Master asynchronous programming in Python using asyncio, async/await syntax,
and concurrent execution patterns for I/O-bound and CPU-bound tasks.
## Basic Async/Await
**Core async syntax:**
```python
import asyncio
# Define async function with async def
async def fetch_data(url: str) -> str:
print(f"Fetching {url}...")
await asyncio.sleep(1) # Simulate I/O operation
return f"Data from {url}"
# Call async function
async def main() -> None:
result = await fetch_data("https://api.example.com")
print(result)
# Run async function
asyncio.run(main())
```
**Multiple concurrent operations:**
```python
import asyncio
async def fetch_url(url: str) -> str:
await asyncio.sleep(1)
return f"Content from {url}"
async def main() -> None:
# Run concurrently with gather
results = await asyncio.gather(
fetch_url("https://example.com/1"),
fetch_url("https://example.com/2"),
fetch_url("https://example.com/3")
)
for result in results:
print(result)
asyncio.run(main())
```
## asyncio.create_task
**Creating and managing tasks:**
```python
import asyncio
async def process_item(item: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"Processed {item}"
async def main() -> None:
# Create tasks for concurrent execution
task1 = asyncio.create_task(process_item("A", 2.0))
task2 = asyncio.create_task(process_item("B", 1.0))
task3 = asyncio.create_task(process_item("C", 1.5))
# Do other work while tasks run
print("Tasks started")
# Wait for tasks to complete
result1 = await task1
result2 = await task2
result3 = await task3
print(result1, result2, result3)
asyncio.run(main())
```
**Task with name and context:**
```python
import asyncio
async def background_task(name: str) -> None:
print(f"Task {name} starting")
await asyncio.sleep(2)
print(f"Task {name} completed")
async def main() -> None:
# Create named task
task = asyncio.create_task(
background_task("worker"),
name="background-worker"
)
# Check task status
print(f"Task name: {task.get_name()}")
print(f"Task done: {task.done()}")
await task
asyncio.run(main())
```
## asyncio.gather vs asyncio.wait
**Using gather for results:**
```python
import asyncio
async def fetch(n: int) -> int:
await asyncio.sleep(1)
return n * 2
async def main() -> None:
# gather returns results in order
results = await asyncio.gather(
fetch(1),
fetch(2),
fetch(3)
)
print(results) # [2, 4, 6]
# Return exceptions instead of raising
results = await asyncio.gather(
fetch(1),
fetch(2),
return_exceptions=True
)
asyncio.run(main())
```
**Using wait for more control:**
```python
import asyncio
async def worker(n: int) -> int:
await asyncio.sleep(n)
return n
async def main() -> None:
tasks = [
asyncio.create_task(worker(1)),
asyncio.create_task(worker(2)),
asyncio.create_task(worker(3))
]
# Wait for first task to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"Done: {len(done)}, Pending: {len(pending)}")
# Cancel pending tasks
for task in pending:
task.cancel()
# Wait for all with timeout
done, pending = await asyncio.wait(
tasks,
timeout=2.0
)
asyncio.run(main())
```
## Async Context Managers
**Creating async context managers:**
```python
import asyncio
from typing import AsyncIterator
class AsyncResource:
async def __aenter__(self) -> "AsyncResource":
print("Acquiring resource")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
print("Releasing resource")
await asyncio.sleep(0.1)
async def query(self) -> str:
return "data"
async def main() -> None:
async with AsyncResource() as resource:
result = await resource.query()
print(result)
asyncio.run(main())
```
**Using asynccontextmanager decorator:**
```python
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
# Setup
print(f"Connecting to {url}")
await asyncio.sleep(0.1)
conn = f"connection-{url}"
try:
yield conn
finally:
# Teardown
print(f"Closing connection to {url}")
await asyncio.sleep(0.1)
async def main() -> None:
async with get_connection("localhost") as conn:
print(f"Using {conn}")
asyncio.run(main())
```
## Async Iterators
**Creating async iterators:**
```python
import asyncio
from typing import AsyncIterator
class AsyncRange:
def __init__(self, count: int) -> None:
self.count = count
def __aiter__(self) -> AsyncIterator[int]:
return self
async def __anext__(self) -> int:
if self.count <= 0:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.count -= 1
return self.count
async def main() -> None:
async for i in AsyncRange(5):
print(i)
asyncio.run(main())
```
**Async generator functions:**
```python
import asyncio
from typing import AsyncIterator
async def async_range(count: int) -> AsyncIterator[int]:
for i in range(count):
await asyncio.sleep(0.1)
yield i
async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
for url in urls:
await asyncio.sleep(0.5)
yield f"Page content from {url}"
async def main() -> None:
async for num in async_range(5):
print(num)
urls = ["url1", "url2", "url3"]
async for page in fetch_pages(urls):
print(page)
asyncio.run(main())
```
## Async Queues
**Producer-consumer pattern with Queue:**
```python
import asyncio
from asyncio import Queue
async def producer(queue: Queue[int], n: int) -> None:
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(i)
print(f"Produced {i}")
await queue.put(None) # Sentinel value
async def consumer(queue: Queue[int], name: str) -> None:
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
await asyncio.sleep(0.2)
print(f"Consumer {name} processed {item}")
queue.task_done()
async def main() -> None:
queue: Queue[int] = Queue(maxsize=5)
# Start producer and consumers
prod = asyncio.create_task(producer(queue, 10))
cons1 = asyncio.create_task(consumer(queue, "A"))
cons2 = asyncio.create_task(consumer(queue, "B"))
await prod
await queue.join() # Wait for all tasks to be processed
# Signal consumers to exit
await queue.put(None)
await queue.put(None)
await cons1
await cons2
asyncio.run(main())
```
## Semaphore and Lock
**Limiting concurrent operations:**
```python
import asyncio
async def fetch_with_limit(
url: str,
semaphore: asyncio.Semaphore
) -> str:
async with semaphore:
print(f"Fetching {url}")
await asyncio.sleep(1)
return f"Data from {url}"
async def main() -> None:
# Limit to 3 concurrent operations
semaphore = asyncio.Semaphore(3)
urls = [f"https://example.com/{i}" for i in range(10)]
tasks = [
fetch_with_limit(url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} URLs")
asyncio.run(main())
```
**Using Lock for mutual exclusion:**
```python
import asyncio
class Counter:
def __init__(self) -> None:
self.value = 0
self.lock = asyncio.Lock()
async def increment(self) -> None:
async with self.lock:
# Critical section
current = self.value
await asyncio.sleep(0.01)
self.value = current + 1
async def main() -> None:
counter = Counter()
# Run increments concurrently
await asyncio.gather(*[
counter.increment()
for _ in range(100)
])
print(f"Final value: {counter.value}") # Should be 100
asyncio.run(main())
```
## Timeouts and Cancellation
**Using timeout:**
```python
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(5)
return "completed"
async def main() -> None:
# Timeout after 2 seconds
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=2.0
)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(main())
```
**Handling cancellation:**
```python
import asyncio
async def cancellable_task() -> None:
try:
while True:
print("Working...")
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
# Cleanup
raise
async def main() -> None:
task = asyncio.create_task(cancellable_task())
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task cancellation confirmed")
asyncio.run(main())
```
## Event Loop Management
**Direct event loop control:**
```python
import asyncio
async def task1() -> None:
print("Task 1")
async def task2() -> None:
print("Task 2")
# Create new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# Schedule callbacks
loop.call_soon(lambda: print("Callback"))
# Schedule delayed callback
loop.call_later(1.0, lambda: print("Delayed"))
# Run coroutine
loop.run_until_complete(task1())
# Run multiple tasks
loop.run_until_complete(
asyncio.gather(task1(), task2())
)
finally:
loop.close()
```
## concurrent.futures
**ThreadPoolExecutor for I/O-bound tasks:**
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
def blocking_io_task(n: int) -> int:
print(f"Task {n} starting")
time.sleep(2) # Blocking I/O
return n * 2
async def main() -> None:
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=3) as executor:
# Run blocking function in thread pool
tasks = [
loop.run_in_executor(executor, blocking_io_task, i)
for i in range(5)
]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
```
**ProcessPoolExecutor for CPU-bound tasks:**
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_intensive_task(n: int) -> int:
# CPU-intensive computation
result = sum(i * i for i in range(n))
return result
async def main() -> None:
loop = asyncio.get_event_loop()
with ProcessPoolExecutor(max_workers=4) as executor:
# Run CPU-bound function in process pool
tasks = [
loop.run_in_executor(
executor,
cpu_intensive_task,
10_000_000
)
for _ in range(4)
]
results = await asyncio.gather(*tasks)
print(f"Completed {len(results)} tasks")
asyncio.run(main())
```
## Async HTTP with aiohttp
**Making async HTTP requests:**
```python
import asyncio
import aiohttp
async def fetch_url(
session: aiohttp.ClientSession,
url: str
) -> str:
async with session.get(url) as response:
return await response.text()
async def main() -> None:
async with aiohttp.ClientSession() as session:
urls = [
"https://example.com/1",
"https://example.com/2",
"https://example.com/3"
]
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"Fetched {len(results)} pages")
asyncio.run(main())
```
## Error Handling
**Handling exceptions in async code:**
```python
import asyncio
async def failing_task() -> None:
await asyncio.sleep(1)
raise ValueError("Task failed")
async def main() -> None:
# Handle exception in single task
try:
await failing_task()
except ValueError as e:
print(f"Caught: {e}")
# Handle exceptions with gather
results = await asyncio.gather(
failing_task(),
failing_task(),
return_exceptions=True
)
for result in results:
if isinstance(result, Exception):
print(f"Task failed: {result}")
asyncio.run(main())
```
## When to Use This Skill
Use python-async-patterns when you need to:
- Handle multiple I/O operations concurrently (API calls, database queries)
- Build async web servers or clients
- Process data streams asynchronously
- Implement producer-consumer patterns with async queues
- Run blocking I/O operations without blocking the event loop
- Create async context managers for resource management
- Implement async iterators for streaming data
- Control concurrency with semaphores and locks
- Handle timeouts and cancellation in async operations
- Mix CPU-bound and I/O-bound operations efficiently
## Best Practices
- Use asyncio.run() for the main entry point
- Create tasks with asyncio.create_task() for concurrent execution
- Use gather() when you need all results
- Use wait() when you need fine-grained control
- Always handle CancelledError in long-running tasks
- Use semaphores to limit concurrent operations
- Prefer async context managers for resource management
- Use asyncio.Queue for producer-consumer patterns
- Run blocking I/O in thread pool with run_in_executor()
- Run CPU-bound tasks in process pool
- Set appropriate timeouts for network operations
- Use structured concurrency patterns (nurseries)
## Common Pitfalls
- Forgetting to await coroutines (creates coroutine object, doesn't run)
- Blocking the event loop with CPU-intensive work
- Not handling task cancellation properly
- Using time.sleep() instead of asyncio.sleep()
- Creating too many concurrent tasks without limits
- Not closing resources properly in async context
- Mixing blocking and async code incorrectly
- Not handling exceptions in background tasks
- Forgetting to call task_done() with Queue
- Using global event loop instead of asyncio.run()
## Resources
- [asyncio Documentation](https://docs.python.org/3/library/asyncio.html)
- [aiohttp Documentation](https://docs.aiohttp.org/)
- [PEP 492 - Coroutines](https://peps.python.org/pep-0492/)
- [PEP 525 - Async Generators](https://peps.python.org/pep-0525/)
- [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)
This skill teaches practical Python asynchronous programming using asyncio, async/await, and concurrent.futures to handle I/O-bound and CPU-bound workloads. It focuses on concrete patterns: task creation, synchronization primitives, async iterators and context managers, queues, timeouts, and integration with thread/process pools. Use it to make networked, concurrent, and streaming code correct and efficient.
The skill presents reusable async patterns and small, focused examples that show how to create and manage asyncio tasks, gather or wait for results, and handle cancellations and timeouts. It demonstrates resource management with async context managers, producer-consumer flows with asyncio.Queue, and how to offload blocking or CPU-heavy work via ThreadPoolExecutor and ProcessPoolExecutor. Each pattern includes recommended practices for safety and performance.
Can I run blocking code in asyncio?
Yes—use loop.run_in_executor() to run blocking functions in a ThreadPoolExecutor or ProcessPoolExecutor so the event loop stays responsive.
When should I use gather vs wait?
Use gather when you want all results returned in order; use wait when you need control over which tasks complete first, timeouts, or to cancel pending tasks.