home / skills / benchflow-ai / skillsbench / asyncio

asyncio skill

/registry/terminal_bench_2.0/full_batch_reviewed/terminal_bench_2_0_cancel-async-tasks/environment/skills/asyncio

This skill helps you implement Python asyncio patterns for concurrent I/O tasks, improving responsiveness and scalability with async/await and event loops.

npx playbooks add skill benchflow-ai/skillsbench --skill asyncio

Review the files below or copy the command above to add this skill to your agents.

Files (3)
SKILL.md
41.6 KB
---
name: asyncio
description: Python asyncio - Modern concurrent programming with async/await, event loops, tasks, coroutines, primitives, aiohttp, and FastAPI async patterns
version: 1.0.0
category: toolchain
author: Claude MPM Team
license: MIT
progressive_disclosure:
  entry_point:
    summary: "Async/await concurrency: coroutines, tasks, event loops, async HTTP (aiohttp), async DB, FastAPI async, locks, queues, error handling"
    when_to_use: "Concurrent I/O operations, HTTP clients/servers, database access, WebSocket handling, parallel API calls, background tasks, real-time systems"
    quick_start: "1. Define async def functions 2. Await async operations 3. Run with asyncio.run() 4. Use asyncio.gather() for parallel tasks 5. Handle errors with try/except"
context_limit: 700
tags:
  - asyncio
  - async
  - concurrency
  - coroutines
  - aiohttp
  - fastapi
  - async-database
  - event-loop
  - tasks
requires_tools: []
---

# Python asyncio - Async/Await Concurrency

## Overview

Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.

**Key Features**:
- async/await syntax for readable concurrent code
- Event loop for managing concurrent operations
- Tasks for running multiple coroutines concurrently
- Primitives: locks, semaphores, events, queues
- HTTP client/server with aiohttp
- Database async support (asyncpg, aiomysql, motor)
- FastAPI async endpoints
- WebSocket support
- Background task management

**Installation**:
```bash
# asyncio is built-in (Python 3.7+)

# Async HTTP client
pip install aiohttp

# Async HTTP requests (alternative)
pip install httpx

# Async database drivers
pip install asyncpg aiomysql motor  # PostgreSQL, MySQL, MongoDB

# FastAPI with async support
pip install fastapi uvicorn[standard]

# Async testing
pip install pytest-asyncio
```

## Basic Async/Await Patterns

### 1. Simple Async Function

```python
import asyncio

async def hello():
    """Basic async function (coroutine)."""
    print("Hello")
    await asyncio.sleep(1)  # Async sleep (non-blocking)
    print("World")
    return "Done"

# Run async function
result = asyncio.run(hello())
print(result)  # "Done"
```

**Key Points**:
- `async def` defines a coroutine function
- `await` suspends execution until awaitable completes
- `asyncio.run()` is the entry point for async programs
- Coroutines must be awaited or scheduled

### 2. Multiple Concurrent Tasks

```python
import asyncio
import time

async def task(name, duration):
    """Simulate async task."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently."""
    start = time.time()

    # Sequential (slow) - 6 seconds total
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)

    # Concurrent (fast) - 3 seconds total
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )

    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
# Output: Total time: 3.00s (tasks ran concurrently)
```

### 3. Task Creation and Management

```python
import asyncio

async def background_task(name):
    """Long-running background task."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # Create task (starts immediately)
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))

    # Do other work while tasks run
    print("Main: doing other work")
    await asyncio.sleep(2)

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())
```

### 4. Error Handling in Async Code

```python
import asyncio

async def risky_operation(fail=False):
    """Operation that might fail."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    # Individual try/except
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"

    # Gather with error handling
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True  # Return exceptions instead of raising
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())
```

## Event Loop Fundamentals

### 1. Event Loop Lifecycle

```python
import asyncio

# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # Creates loop, runs main, closes loop

# Manual loop management (advanced use cases)
async def manual_example():
    loop = asyncio.get_event_loop()

    # Schedule coroutine
    task = loop.create_task(some_coroutine())

    # Schedule callback
    loop.call_later(5, callback_function)

    # Run until complete
    result = await task

    return result

# Get current event loop
async def get_current_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")

    # Schedule callback in event loop
    loop.call_soon(lambda: print("Callback executed"))

    await asyncio.sleep(0)  # Let callback execute
```

### 2. Loop Scheduling and Callbacks

```python
import asyncio
from datetime import datetime

def callback(name, loop):
    """Callback function (not async)."""
    print(f"{datetime.now()}: {name} callback executed")

    # Stop loop after callback
    # loop.stop()

async def schedule_callbacks():
    loop = asyncio.get_running_loop()

    # Schedule immediate callback
    loop.call_soon(callback, "Immediate", loop)

    # Schedule callback after delay
    loop.call_later(2, callback, "Delayed 2s", loop)

    # Schedule callback at specific time
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)

    # Wait for callbacks to execute
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())
```

### 3. Running Blocking Code

```python
import asyncio
import time

def blocking_io():
    """CPU-intensive or blocking I/O operation."""
    print("Blocking operation started")
    time.sleep(2)  # Blocks thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in thread pool."""
    loop = asyncio.get_running_loop()

    # Run in default executor (thread pool)
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io
    )

    print(f"Result: {result}")

# Run blocking operations concurrently
async def concurrent_blocking():
    loop = asyncio.get_running_loop()

    # These run in thread pool, don't block event loop
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )

    print(f"All results: {results}")

asyncio.run(concurrent_blocking())
```

## Asyncio Primitives

### 1. Locks for Mutual Exclusion

```python
import asyncio

# Shared resource
counter = 0
lock = asyncio.Lock()

async def increment_with_lock(name):
    """Increment counter with lock protection."""
    global counter

    async with lock:
        # Critical section - only one task at a time
        print(f"{name}: acquired lock")
        current = counter
        await asyncio.sleep(0.1)  # Simulate processing
        counter = current + 1
        print(f"{name}: released lock, counter={counter}")

async def increment_without_lock(name):
    """Increment without lock - race condition!"""
    global counter

    current = counter
    await asyncio.sleep(0.1)  # Race condition window
    counter = current + 1
    print(f"{name}: counter={counter}")

async def test_locks():
    global counter

    # Without lock (race condition)
    counter = 0
    await asyncio.gather(
        increment_without_lock("Task-1"),
        increment_without_lock("Task-2"),
        increment_without_lock("Task-3")
    )
    print(f"Without lock: {counter}")  # Often wrong (< 3)

    # With lock (correct)
    counter = 0
    await asyncio.gather(
        increment_with_lock("Task-1"),
        increment_with_lock("Task-2"),
        increment_with_lock("Task-3")
    )
    print(f"With lock: {counter}")  # Always 3

asyncio.run(test_locks())
```

### 2. Semaphores for Resource Limiting

```python
import asyncio

# Limit concurrent operations
semaphore = asyncio.Semaphore(2)  # Max 2 concurrent

async def limited_operation(name):
    """Operation limited by semaphore."""
    print(f"{name}: waiting for semaphore")

    async with semaphore:
        print(f"{name}: acquired semaphore")
        await asyncio.sleep(2)  # Simulate work
        print(f"{name}: releasing semaphore")

async def test_semaphore():
    # Create 5 tasks, but only 2 run concurrently
    await asyncio.gather(
        limited_operation("Task-1"),
        limited_operation("Task-2"),
        limited_operation("Task-3"),
        limited_operation("Task-4"),
        limited_operation("Task-5")
    )

asyncio.run(test_semaphore())
# Only 2 tasks hold semaphore at any time
```

### 3. Events for Signaling

```python
import asyncio

event = asyncio.Event()

async def waiter(name):
    """Wait for event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Block until event is set
    print(f"{name}: event received!")

async def setter():
    """Set event after delay."""
    await asyncio.sleep(2)
    print("Setter: setting event")
    event.set()  # Wake up all waiters

async def test_event():
    # Create waiters
    await asyncio.gather(
        waiter("Waiter-1"),
        waiter("Waiter-2"),
        waiter("Waiter-3"),
        setter()
    )

asyncio.run(test_event())
```

### 4. Queues for Task Distribution

```python
import asyncio
import random

async def producer(queue, name):
    """Produce items and add to queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name}: produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

    # Signal completion
    await queue.put(None)

async def consumer(queue, name):
    """Consume items from queue."""
    while True:
        item = await queue.get()  # Block until item available

        if item is None:  # Shutdown signal
            queue.task_done()
            break

        print(f"{name}: consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def test_queue():
    queue = asyncio.Queue(maxsize=10)

    # Create producers and consumers
    await asyncio.gather(
        producer(queue, "Producer-1"),
        producer(queue, "Producer-2"),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
        consumer(queue, "Consumer-3")
    )

    # Wait for all items to be processed
    await queue.join()
    print("All tasks complete")

asyncio.run(test_queue())
```

### 5. Condition Variables

```python
import asyncio

condition = asyncio.Condition()
items = []

async def consumer(name):
    """Wait for items to be available."""
    async with condition:
        # Wait until items are available
        await condition.wait_for(lambda: len(items) > 0)

        item = items.pop(0)
        print(f"{name}: consumed {item}")

async def producer(name):
    """Add items and notify consumers."""
    async with condition:
        item = f"{name}-item"
        items.append(item)
        print(f"{name}: produced {item}")

        # Notify one waiting consumer
        condition.notify(n=1)
        # Or notify all: condition.notify_all()

async def test_condition():
    await asyncio.gather(
        consumer("Consumer-1"),
        consumer("Consumer-2"),
        producer("Producer-1"),
        producer("Producer-2")
    )

asyncio.run(test_condition())
```

## Async HTTP with aiohttp

### 1. Basic HTTP Client

```python
import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch single URL."""
    async with session.get(url) as response:
        status = response.status
        text = await response.text()
        return {"url": url, "status": status, "length": len(text)}

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
    ]

    async with aiohttp.ClientSession() as session:
        # Concurrent requests
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")

asyncio.run(fetch_multiple_urls())
```

### 2. HTTP Client with Error Handling

```python
import asyncio
import aiohttp
from typing import Dict, Any

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch URL with retry logic."""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # Raise for 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}

        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}

            # Exponential backoff
            await asyncio.sleep(2 ** attempt)

        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}

            await asyncio.sleep(2 ** attempt)

async def parallel_api_calls():
    """Make parallel API calls with error handling."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # Will fail
        "https://httpbin.org/delay/1",
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # Don't stop on errors
        )

        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url}: Exception - {result}")
            elif result["success"]:
                print(f"{url}: Success")
            else:
                print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())
```

### 3. HTTP Server with aiohttp

```python
from aiohttp import web
import asyncio

async def handle_hello(request):
    """Simple GET handler."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST handler with JSON body."""
    data = await request.json()

    # Simulate async processing
    await asyncio.sleep(1)

    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """Streaming response."""
    response = web.StreamResponse()
    await response.prepare(request)

    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)

    await response.write_eof()
    return response

# Create application
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)

# Run server
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)
```

### 4. WebSocket Client

```python
import asyncio
import aiohttp

async def websocket_client():
    """Connect to WebSocket server."""
    url = "wss://echo.websocket.org"

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send messages
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")

                    if msg.data == "close":
                        await ws.close()
                        break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())
```

## Async Database Operations

### 1. PostgreSQL with asyncpg

```python
import asyncio
import asyncpg

async def database_operations():
    """Async PostgreSQL operations."""
    # Create connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )

    try:
        # Acquire connection from pool
        async with pool.acquire() as conn:
            # Execute query
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )

            for row in rows:
                print(f"User: {row['name']} ({row['email']})")

            # Insert data
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "[email protected]"
            )

            # Transaction
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")

    finally:
        await pool.close()

asyncio.run(database_operations())
```

### 2. MongoDB with motor

```python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def mongodb_operations():
    """Async MongoDB operations."""
    # Create client
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    try:
        # Insert document
        result = await collection.insert_one({
            "name": "Alice",
            "email": "[email protected]",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")

        # Find documents
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")

        # Update document
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )

        # Aggregation pipeline
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")

    finally:
        client.close()

asyncio.run(mongodb_operations())
```

### 3. Connection Pool Pattern

```python
import asyncio
import asyncpg
from typing import Optional

class DatabasePool:
    """Async database connection pool manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """Execute query."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch multiple rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Fetch single row."""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

# Usage
async def use_pool():
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()

    try:
        # Execute operations
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())
```

## FastAPI Async Patterns

### 1. Async Endpoints

```python
from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Endpoint with async delay."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Fetch data from external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Make parallel API calls."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )

        return {
            "results": [r.json() for r in responses]
        }
```

### 2. Background Tasks

```python
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate sending email."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)  # Simulate slow email service
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Send notification in background."""
    # Add task to background
    background_tasks.add_task(send_email, email, message)

    # Return immediately
    return {"status": "notification queued"}

# Alternative: manual task creation
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Create background task manually."""
    asyncio.create_task(send_email(email, message))
    return {"status": "notification queued"}
```

### 3. Async Dependencies

```python
from fastapi import FastAPI, Depends
import asyncpg

app = FastAPI()

# Database pool (global)
db_pool = None

async def get_db():
    """Dependency: database connection."""
    async with db_pool.acquire() as conn:
        yield conn

@app.on_event("startup")
async def startup():
    """Initialize database pool on startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """Close database pool on shutdown."""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """Get user with async database dependency."""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return dict(user)
```

### 4. WebSocket Endpoints

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

# Active connections
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint."""
    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            # Broadcast to all connections
            for connection in active_connections:
                await connection.send_text(f"Broadcast: {data}")

    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")

# Background task to send periodic updates
async def broadcast_updates():
    """Send periodic updates to all clients."""
    while True:
        await asyncio.sleep(5)

        for connection in active_connections:
            try:
                await connection.send_text("Periodic update")
            except:
                active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """Start background update task."""
    asyncio.create_task(broadcast_updates())
```

## Common Patterns and Best Practices

### 1. Timeout Handling

```python
import asyncio

async def slow_operation():
    """Slow operation."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run operation with timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")

    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())
```

### 2. Cancellation Handling

```python
import asyncio

async def cancellable_task():
    """Task that can be cancelled."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)

        return "Complete"

    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise  # Re-raise to propagate cancellation

async def cancel_example():
    """Example of task cancellation."""
    task = asyncio.create_task(cancellable_task())

    # Let it run for a bit
    await asyncio.sleep(3)

    # Cancel the task
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())
```

### 3. Resource Cleanup with Context Managers

```python
import asyncio

class AsyncResource:
    """Async context manager for resource management."""

    async def __aenter__(self):
        """Async setup."""
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async setup
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async cleanup."""
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async cleanup
        self.connection = None

async def use_resource():
    """Use async resource."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # Resource automatically cleaned up

asyncio.run(use_resource())
```

### 4. Debouncing and Throttling

```python
import asyncio
from datetime import datetime

class Debouncer:
    """Debounce async function calls."""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def call(self, func, *args, **kwargs):
        """Debounced function call."""
        # Cancel previous task
        if self.task:
            self.task.cancel()

        # Create new task
        async def delayed_call():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(delayed_call())

async def api_call(query: str):
    """Simulated API call."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Example of debouncing."""
    debouncer = Debouncer(delay=1.0)

    # Rapid calls - only last one executes
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")

    # Wait for debounced call
    await asyncio.sleep(2)

asyncio.run(debounce_example())
# Output: Only "query3" API call executes
```

### 5. Rate Limiting

```python
import asyncio
from datetime import datetime

class RateLimiter:
    """Limit rate of async operations."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []

    async def __aenter__(self):
        """Acquire rate limit slot."""
        await self.semaphore.acquire()

        now = asyncio.get_event_loop().time()

        # Remove old calls outside period
        self.calls = [t for t in self.calls if now - t < self.period]

        if len(self.calls) >= self.max_calls:
            # Wait until oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """Release semaphore."""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """Operation with rate limiting."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """Example of rate limiting."""
    # Max 3 calls per 2 seconds
    limiter = RateLimiter(max_calls=3, period=2.0)

    # Try to make 6 calls
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
```

## Debugging Async Code

### 1. Enable Debug Mode

```python
import asyncio
import logging

# Enable asyncio debug mode
asyncio.run(main(), debug=True)

# Or set environment variable:
# PYTHONASYNCIODEBUG=1 python script.py

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")
```

### 2. Detect Blocking Code

```python
import asyncio
import time

async def problematic_code():
    """Code with blocking operation."""
    print("Starting")

    # BAD: Blocking sleep
    time.sleep(2)  # This blocks the event loop!

    print("Complete")

# Run with debug mode to detect blocking
asyncio.run(problematic_code(), debug=True)
# Warning: Executing <Task> took 2.001 seconds
```

### 3. Track Pending Tasks

```python
import asyncio

async def track_tasks():
    """Track all pending tasks."""
    # Get all tasks
    tasks = asyncio.all_tasks()

    print(f"Total tasks: {len(tasks)}")
    for task in tasks:
        print(f"  - {task.get_name()}: {task}")

        # Check if task is done
        if task.done():
            try:
                result = task.result()
                print(f"    Result: {result}")
            except Exception as e:
                print(f"    Exception: {e}")

# Create some tasks
async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")

    await task2
    task1.cancel()

asyncio.run(main())
```

## Testing Async Code

### 1. pytest-asyncio Setup

```python
# test_async.py
import pytest
import asyncio

# Mark test as async
@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

# Async fixture
@pytest.fixture
async def async_client():
    """Async test fixture."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Test using async fixture."""
    result = await async_client.fetch_data()
    assert result is not None
```

### 2. Mocking Async Functions

```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Test with async mock."""
    # Create async mock
    mock_func = AsyncMock(return_value="mocked result")

    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """Test with patched async function."""
    mock_async.return_value = {"status": "success"}

    result = await some_function_that_calls_async()

    assert result["status"] == "success"
    mock_async.assert_called_once()
```

## Performance Optimization

### 1. Use asyncio.gather() for Parallelism

```python
import asyncio
import time

async def slow_task(n):
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """Parallel execution."""
    start = time.time()

    # Sequential (slow) - 5 seconds
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)

    # Parallel (fast) - 1 second
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])

    elapsed = time.time() - start
    print(f"Time: {elapsed:.2f}s, Results: {results}")

asyncio.run(optimized())
```

### 2. Connection Pooling

```python
import asyncio
import aiohttp

# BAD: Create new session for each request
async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()

# GOOD: Reuse session with connection pool
async def good_pattern():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get("https://httpbin.org/json")
            for i in range(10)
        ]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            await response.json()
```

### 3. Avoid Blocking Operations

```python
import asyncio

# BAD: Blocking I/O in async function
async def bad_file_read():
    with open("large_file.txt") as f:  # Blocks event loop!
        data = f.read()
    return data

# GOOD: Use async file I/O or run in executor
async def good_file_read():
    loop = asyncio.get_running_loop()

    # Run blocking operation in thread pool
    data = await loop.run_in_executor(
        None,
        lambda: open("large_file.txt").read()
    )
    return data

# BETTER: Use aiofiles for async file I/O
import aiofiles

async def better_file_read():
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data
```

## Common Pitfalls

### ❌ Anti-Pattern 1: Not Awaiting Coroutines

```python
# WRONG
async def bad():
    result = async_function()  # Returns coroutine, doesn't execute!
    print(result)  # Prints: <coroutine object>

# CORRECT
async def good():
    result = await async_function()  # Actually executes
    print(result)
```

### ❌ Anti-Pattern 2: Blocking the Event Loop

```python
# WRONG
import time

async def bad():
    time.sleep(5)  # Blocks entire event loop!

# CORRECT
async def good():
    await asyncio.sleep(5)  # Non-blocking
```

### ❌ Anti-Pattern 3: Not Handling Cancellation

```python
# WRONG
async def bad():
    await asyncio.sleep(10)
    # No cleanup if cancelled

# CORRECT
async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup resources
        await cleanup()
        raise  # Re-raise to propagate
```

### ❌ Anti-Pattern 4: Creating Event Loop Incorrectly

```python
# WRONG (Python 3.7+)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# CORRECT (Python 3.7+)
asyncio.run(main())
```

### ❌ Anti-Pattern 5: Not Closing Resources

```python
# WRONG
async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # Session never closed - resource leak!

# CORRECT
async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        # Session automatically closed
```

## Best Practices

1. **Use asyncio.run()** for entry point (Python 3.7+)
2. **Always await coroutines** - don't forget await
3. **Use async context managers** for resource cleanup
4. **Connection pooling** for HTTP and database clients
5. **Handle CancelledError** for graceful shutdown
6. **Use asyncio.gather()** for parallel operations
7. **Avoid blocking operations** in async functions
8. **Use timeouts** to prevent hanging operations
9. **Debug mode** during development to catch issues
10. **Test async code** with pytest-asyncio

## Quick Reference

### Common Commands

```bash
# Run async script
python script.py

# Run with debug mode
PYTHONASYNCIODEBUG=1 python script.py

# Run tests
pytest -v --asyncio-mode=auto

# Install async dependencies
pip install aiohttp asyncpg motor pytest-asyncio
```

### Essential Imports

```python
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any
```

## Resources

- **Official Documentation**: https://docs.python.org/3/library/asyncio.html
- **aiohttp**: https://docs.aiohttp.org/
- **asyncpg**: https://magicstack.github.io/asyncpg/
- **FastAPI Async**: https://fastapi.tiangolo.com/async/
- **pytest-asyncio**: https://pytest-asyncio.readthedocs.io/

## Related Skills

When using asyncio, consider these complementary skills:

- **fastapi-local-dev**: FastAPI async server patterns and production deployment
- **pytest**: Testing async code with pytest-asyncio and fixtures
- **systematic-debugging**: Debugging async race conditions and deadlocks

### Quick FastAPI Async Patterns (Inlined for Standalone Use)

```python
# FastAPI async endpoint pattern
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

# Async database setup
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async database query
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Background tasks with asyncio
@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # Non-blocking background task
    asyncio.create_task(send_email_async(email))
    return {"status": "email queued"}
```

### Quick pytest-asyncio Patterns (Inlined for Standalone Use)

```python
# Testing async functions with pytest
import pytest
import pytest_asyncio
from httpx import AsyncClient

# Async test fixture
@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

# Async test function
@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

# Testing concurrent operations
@pytest.mark.asyncio
async def test_concurrent_requests():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Run 10 requests concurrently
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

# Mock async dependencies
@pytest_asyncio.fixture
async def mock_db():
    # Setup mock database
    db = AsyncMock()
    yield db
    # Cleanup
```

### Quick Async Debugging Reference (Inlined for Standalone Use)

**Common Async Pitfalls:**

1. **Blocking the Event Loop**
   ```python
   # ❌ WRONG: Blocking call in async function
   async def bad_function():
       time.sleep(5)  # Blocks entire event loop!
       return "done"

   # ✅ CORRECT: Use asyncio.sleep
   async def good_function():
       await asyncio.sleep(5)  # Releases event loop
       return "done"
   ```

2. **Debugging Race Conditions**
   ```python
   # Add logging to track execution order
   import logging
   logging.basicConfig(level=logging.DEBUG)

   async def debug_task(name):
       logging.debug(f"{name}: Starting")
       await asyncio.sleep(1)
       logging.debug(f"{name}: Finished")
       return name

   # Run with detailed tracing
   asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True)
   ```

3. **Deadlock Detection**
   ```python
   # Use timeout to detect deadlocks
   try:
       result = await asyncio.wait_for(some_async_function(), timeout=5.0)
   except asyncio.TimeoutError:
       logging.error("Deadlock detected: operation timed out")
       # Investigate what's blocking
   ```

4. **Inspecting Running Tasks**
   ```python
   # Check all pending tasks
   tasks = asyncio.all_tasks()
   for task in tasks:
       print(f"Task: {task.get_name()}, Done: {task.done()}")
       if not task.done():
           print(f"  Current coro: {task.get_coro()}")
   ```

[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]

---

**Python Version Compatibility:** This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.

Overview

This skill teaches modern Python concurrency using asyncio: async/await, event loops, tasks, and synchronization primitives. It covers real-world patterns for I/O-bound workloads, async HTTP with aiohttp, async database drivers, and FastAPI async endpoints. The content emphasizes practical examples for writing non-blocking, maintainable code.

How this skill works

The skill explains how to define coroutines with async/await, run them with asyncio.run(), and schedule concurrent work using tasks and asyncio.gather(). It demonstrates event loop management, running blocking work in executors, and using primitives like Lock, Semaphore, Event, Condition, and Queue to coordinate tasks. The guide also shows async HTTP clients with aiohttp, retry and timeout patterns, and integrating async patterns into web frameworks like FastAPI.

When to use it

  • When handling many I/O-bound operations (HTTP, DB, WebSocket) concurrently
  • When you need non-blocking, single-threaded concurrency without threads/processes
  • When building async HTTP clients or servers with aiohttp or FastAPI
  • When coordinating background tasks, producers/consumers, or rate-limited resources
  • When implementing retry/timeout strategies for network calls

Best practices

  • Prefer asyncio.run() for top-level entry points and avoid creating multiple event loops in a process
  • Use asyncio.create_task() for background work and await task results or handle cancellations
  • Protect shared state with asyncio.Lock or use asyncio.Queue for producer/consumer patterns
  • Limit concurrent external calls with asyncio.Semaphore to avoid resource overload
  • Use run_in_executor for CPU-bound or blocking calls to keep the loop responsive
  • Use return_exceptions=True with gather when you need to collect partial results and handle failures explicitly

Example use cases

  • Fetch hundreds of URLs concurrently with aiohttp and exponential backoff for retries
  • Implement a FastAPI application with async endpoints that query async database drivers (asyncpg, motor)
  • Create producer/consumer pipelines using asyncio.Queue for streaming data processing
  • Run background monitoring tasks with asyncio.create_task() while the main service handles requests
  • Throttle API calls with semaphore to respect rate limits while maintaining concurrency

FAQ

Is asyncio only for network I/O?

No. asyncio is best for I/O-bound work (network, file, DB) but can also coordinate tasks that await on other coroutines; use run_in_executor for CPU-bound functions.

How do I handle exceptions across many concurrent tasks?

Use asyncio.gather(..., return_exceptions=True) to collect results and exceptions, or wrap each task in try/except and propagate errors or cancellations explicitly.