home / skills / bobmatnyc / claude-mpm-skills / asyncio

This is most likely a fork of the asyncio skill from benchflow-ai
npx playbooks add skill bobmatnyc/claude-mpm-skills --skill asyncio

Review the files below or copy the command above to add this skill to your agents.

Files (2)
SKILL.md
41.6 KB
---
name: asyncio
description: Python asyncio - Modern concurrent programming with async/await, event loops, tasks, coroutines, primitives, aiohttp, and FastAPI async patterns
version: 1.0.0
category: toolchain
author: Claude MPM Team
license: MIT
progressive_disclosure:
  entry_point:
    summary: "Async/await concurrency: coroutines, tasks, event loops, async HTTP (aiohttp), async DB, FastAPI async, locks, queues, error handling"
    when_to_use: "Concurrent I/O operations, HTTP clients/servers, database access, WebSocket handling, parallel API calls, background tasks, real-time systems"
    quick_start: "1. Define async def functions 2. Await async operations 3. Run with asyncio.run() 4. Use asyncio.gather() for parallel tasks 5. Handle errors with try/except"
context_limit: 700
tags:
  - asyncio
  - async
  - concurrency
  - coroutines
  - aiohttp
  - fastapi
  - async-database
  - event-loop
  - tasks
requires_tools: []
---

# Python asyncio - Async/Await Concurrency

## Overview

Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.

**Key Features**:
- async/await syntax for readable concurrent code
- Event loop for managing concurrent operations
- Tasks for running multiple coroutines concurrently
- Primitives: locks, semaphores, events, queues
- HTTP client/server with aiohttp
- Database async support (asyncpg, aiomysql, motor)
- FastAPI async endpoints
- WebSocket support
- Background task management

**Installation**:
```bash
# asyncio is built-in (Python 3.7+)

# Async HTTP client
pip install aiohttp

# Async HTTP requests (alternative)
pip install httpx

# Async database drivers
pip install asyncpg aiomysql motor  # PostgreSQL, MySQL, MongoDB

# FastAPI with async support
pip install fastapi uvicorn[standard]

# Async testing
pip install pytest-asyncio
```

## Basic Async/Await Patterns

### 1. Simple Async Function

```python
import asyncio

async def hello():
    """Basic async function (coroutine)."""
    print("Hello")
    await asyncio.sleep(1)  # Async sleep (non-blocking)
    print("World")
    return "Done"

# Run async function
result = asyncio.run(hello())
print(result)  # "Done"
```

**Key Points**:
- `async def` defines a coroutine function
- `await` suspends execution until awaitable completes
- `asyncio.run()` is the entry point for async programs
- Coroutines must be awaited or scheduled

### 2. Multiple Concurrent Tasks

```python
import asyncio
import time

async def task(name, duration):
    """Simulate async task."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently."""
    start = time.time()

    # Sequential (slow) - 6 seconds total
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)

    # Concurrent (fast) - 3 seconds total
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )

    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
# Output: Total time: 3.00s (tasks ran concurrently)
```

### 3. Task Creation and Management

```python
import asyncio

async def background_task(name):
    """Long-running background task."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # Create task (starts immediately)
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))

    # Do other work while tasks run
    print("Main: doing other work")
    await asyncio.sleep(2)

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())
```

### 4. Error Handling in Async Code

```python
import asyncio

async def risky_operation(fail=False):
    """Operation that might fail."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    # Individual try/except
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"

    # Gather with error handling
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True  # Return exceptions instead of raising
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())
```

## Event Loop Fundamentals

### 1. Event Loop Lifecycle

```python
import asyncio

# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # Creates loop, runs main, closes loop

# Manual loop management (advanced use cases)
async def manual_example():
    loop = asyncio.get_event_loop()

    # Schedule coroutine
    task = loop.create_task(some_coroutine())

    # Schedule callback
    loop.call_later(5, callback_function)

    # Run until complete
    result = await task

    return result

# Get current event loop
async def get_current_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")

    # Schedule callback in event loop
    loop.call_soon(lambda: print("Callback executed"))

    await asyncio.sleep(0)  # Let callback execute
```

### 2. Loop Scheduling and Callbacks

```python
import asyncio
from datetime import datetime

def callback(name, loop):
    """Callback function (not async)."""
    print(f"{datetime.now()}: {name} callback executed")

    # Stop loop after callback
    # loop.stop()

async def schedule_callbacks():
    loop = asyncio.get_running_loop()

    # Schedule immediate callback
    loop.call_soon(callback, "Immediate", loop)

    # Schedule callback after delay
    loop.call_later(2, callback, "Delayed 2s", loop)

    # Schedule callback at specific time
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)

    # Wait for callbacks to execute
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())
```

### 3. Running Blocking Code

```python
import asyncio
import time

def blocking_io():
    """CPU-intensive or blocking I/O operation."""
    print("Blocking operation started")
    time.sleep(2)  # Blocks thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in thread pool."""
    loop = asyncio.get_running_loop()

    # Run in default executor (thread pool)
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io
    )

    print(f"Result: {result}")

# Run blocking operations concurrently
async def concurrent_blocking():
    loop = asyncio.get_running_loop()

    # These run in thread pool, don't block event loop
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )

    print(f"All results: {results}")

asyncio.run(concurrent_blocking())
```

## Asyncio Primitives

### 1. Locks for Mutual Exclusion

```python
import asyncio

# Shared resource
counter = 0
lock = asyncio.Lock()

async def increment_with_lock(name):
    """Increment counter with lock protection."""
    global counter

    async with lock:
        # Critical section - only one task at a time
        print(f"{name}: acquired lock")
        current = counter
        await asyncio.sleep(0.1)  # Simulate processing
        counter = current + 1
        print(f"{name}: released lock, counter={counter}")

async def increment_without_lock(name):
    """Increment without lock - race condition!"""
    global counter

    current = counter
    await asyncio.sleep(0.1)  # Race condition window
    counter = current + 1
    print(f"{name}: counter={counter}")

async def test_locks():
    global counter

    # Without lock (race condition)
    counter = 0
    await asyncio.gather(
        increment_without_lock("Task-1"),
        increment_without_lock("Task-2"),
        increment_without_lock("Task-3")
    )
    print(f"Without lock: {counter}")  # Often wrong (< 3)

    # With lock (correct)
    counter = 0
    await asyncio.gather(
        increment_with_lock("Task-1"),
        increment_with_lock("Task-2"),
        increment_with_lock("Task-3")
    )
    print(f"With lock: {counter}")  # Always 3

asyncio.run(test_locks())
```

### 2. Semaphores for Resource Limiting

```python
import asyncio

# Limit concurrent operations
semaphore = asyncio.Semaphore(2)  # Max 2 concurrent

async def limited_operation(name):
    """Operation limited by semaphore."""
    print(f"{name}: waiting for semaphore")

    async with semaphore:
        print(f"{name}: acquired semaphore")
        await asyncio.sleep(2)  # Simulate work
        print(f"{name}: releasing semaphore")

async def test_semaphore():
    # Create 5 tasks, but only 2 run concurrently
    await asyncio.gather(
        limited_operation("Task-1"),
        limited_operation("Task-2"),
        limited_operation("Task-3"),
        limited_operation("Task-4"),
        limited_operation("Task-5")
    )

asyncio.run(test_semaphore())
# Only 2 tasks hold semaphore at any time
```

### 3. Events for Signaling

```python
import asyncio

event = asyncio.Event()

async def waiter(name):
    """Wait for event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Block until event is set
    print(f"{name}: event received!")

async def setter():
    """Set event after delay."""
    await asyncio.sleep(2)
    print("Setter: setting event")
    event.set()  # Wake up all waiters

async def test_event():
    # Create waiters
    await asyncio.gather(
        waiter("Waiter-1"),
        waiter("Waiter-2"),
        waiter("Waiter-3"),
        setter()
    )

asyncio.run(test_event())
```

### 4. Queues for Task Distribution

```python
import asyncio
import random

async def producer(queue, name):
    """Produce items and add to queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name}: produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

    # Signal completion
    await queue.put(None)

async def consumer(queue, name):
    """Consume items from queue."""
    while True:
        item = await queue.get()  # Block until item available

        if item is None:  # Shutdown signal
            queue.task_done()
            break

        print(f"{name}: consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def test_queue():
    queue = asyncio.Queue(maxsize=10)

    # Create producers and consumers
    await asyncio.gather(
        producer(queue, "Producer-1"),
        producer(queue, "Producer-2"),
        consumer(queue, "Consumer-1"),
        consumer(queue, "Consumer-2"),
        consumer(queue, "Consumer-3")
    )

    # Wait for all items to be processed
    await queue.join()
    print("All tasks complete")

asyncio.run(test_queue())
```

### 5. Condition Variables

```python
import asyncio

condition = asyncio.Condition()
items = []

async def consumer(name):
    """Wait for items to be available."""
    async with condition:
        # Wait until items are available
        await condition.wait_for(lambda: len(items) > 0)

        item = items.pop(0)
        print(f"{name}: consumed {item}")

async def producer(name):
    """Add items and notify consumers."""
    async with condition:
        item = f"{name}-item"
        items.append(item)
        print(f"{name}: produced {item}")

        # Notify one waiting consumer
        condition.notify(n=1)
        # Or notify all: condition.notify_all()

async def test_condition():
    await asyncio.gather(
        consumer("Consumer-1"),
        consumer("Consumer-2"),
        producer("Producer-1"),
        producer("Producer-2")
    )

asyncio.run(test_condition())
```

## Async HTTP with aiohttp

### 1. Basic HTTP Client

```python
import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch single URL."""
    async with session.get(url) as response:
        status = response.status
        text = await response.text()
        return {"url": url, "status": status, "length": len(text)}

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
    ]

    async with aiohttp.ClientSession() as session:
        # Concurrent requests
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")

asyncio.run(fetch_multiple_urls())
```

### 2. HTTP Client with Error Handling

```python
import asyncio
import aiohttp
from typing import Dict, Any

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch URL with retry logic."""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # Raise for 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}

        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}

            # Exponential backoff
            await asyncio.sleep(2 ** attempt)

        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}

            await asyncio.sleep(2 ** attempt)

async def parallel_api_calls():
    """Make parallel API calls with error handling."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # Will fail
        "https://httpbin.org/delay/1",
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # Don't stop on errors
        )

        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url}: Exception - {result}")
            elif result["success"]:
                print(f"{url}: Success")
            else:
                print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())
```

### 3. HTTP Server with aiohttp

```python
from aiohttp import web
import asyncio

async def handle_hello(request):
    """Simple GET handler."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST handler with JSON body."""
    data = await request.json()

    # Simulate async processing
    await asyncio.sleep(1)

    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """Streaming response."""
    response = web.StreamResponse()
    await response.prepare(request)

    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)

    await response.write_eof()
    return response

# Create application
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)

# Run server
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)
```

### 4. WebSocket Client

```python
import asyncio
import aiohttp

async def websocket_client():
    """Connect to WebSocket server."""
    url = "wss://echo.websocket.org"

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send messages
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")

                    if msg.data == "close":
                        await ws.close()
                        break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())
```

## Async Database Operations

### 1. PostgreSQL with asyncpg

```python
import asyncio
import asyncpg

async def database_operations():
    """Async PostgreSQL operations."""
    # Create connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )

    try:
        # Acquire connection from pool
        async with pool.acquire() as conn:
            # Execute query
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )

            for row in rows:
                print(f"User: {row['name']} ({row['email']})")

            # Insert data
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "[email protected]"
            )

            # Transaction
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")

    finally:
        await pool.close()

asyncio.run(database_operations())
```

### 2. MongoDB with motor

```python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def mongodb_operations():
    """Async MongoDB operations."""
    # Create client
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    try:
        # Insert document
        result = await collection.insert_one({
            "name": "Alice",
            "email": "[email protected]",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")

        # Find documents
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")

        # Update document
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )

        # Aggregation pipeline
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")

    finally:
        client.close()

asyncio.run(mongodb_operations())
```

### 3. Connection Pool Pattern

```python
import asyncio
import asyncpg
from typing import Optional

class DatabasePool:
    """Async database connection pool manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """Execute query."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch multiple rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Fetch single row."""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

# Usage
async def use_pool():
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()

    try:
        # Execute operations
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())
```

## FastAPI Async Patterns

### 1. Async Endpoints

```python
from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Endpoint with async delay."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Fetch data from external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Make parallel API calls."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )

        return {
            "results": [r.json() for r in responses]
        }
```

### 2. Background Tasks

```python
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate sending email."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)  # Simulate slow email service
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Send notification in background."""
    # Add task to background
    background_tasks.add_task(send_email, email, message)

    # Return immediately
    return {"status": "notification queued"}

# Alternative: manual task creation
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Create background task manually."""
    asyncio.create_task(send_email(email, message))
    return {"status": "notification queued"}
```

### 3. Async Dependencies

```python
from fastapi import FastAPI, Depends
import asyncpg

app = FastAPI()

# Database pool (global)
db_pool = None

async def get_db():
    """Dependency: database connection."""
    async with db_pool.acquire() as conn:
        yield conn

@app.on_event("startup")
async def startup():
    """Initialize database pool on startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """Close database pool on shutdown."""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """Get user with async database dependency."""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1",
        user_id
    )

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return dict(user)
```

### 4. WebSocket Endpoints

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

# Active connections
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint."""
    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            # Broadcast to all connections
            for connection in active_connections:
                await connection.send_text(f"Broadcast: {data}")

    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")

# Background task to send periodic updates
async def broadcast_updates():
    """Send periodic updates to all clients."""
    while True:
        await asyncio.sleep(5)

        for connection in active_connections:
            try:
                await connection.send_text("Periodic update")
            except:
                active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """Start background update task."""
    asyncio.create_task(broadcast_updates())
```

## Common Patterns and Best Practices

### 1. Timeout Handling

```python
import asyncio

async def slow_operation():
    """Slow operation."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run operation with timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")

    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())
```

### 2. Cancellation Handling

```python
import asyncio

async def cancellable_task():
    """Task that can be cancelled."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)

        return "Complete"

    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise  # Re-raise to propagate cancellation

async def cancel_example():
    """Example of task cancellation."""
    task = asyncio.create_task(cancellable_task())

    # Let it run for a bit
    await asyncio.sleep(3)

    # Cancel the task
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())
```

### 3. Resource Cleanup with Context Managers

```python
import asyncio

class AsyncResource:
    """Async context manager for resource management."""

    async def __aenter__(self):
        """Async setup."""
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async setup
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async cleanup."""
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async cleanup
        self.connection = None

async def use_resource():
    """Use async resource."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # Resource automatically cleaned up

asyncio.run(use_resource())
```

### 4. Debouncing and Throttling

```python
import asyncio
from datetime import datetime

class Debouncer:
    """Debounce async function calls."""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def call(self, func, *args, **kwargs):
        """Debounced function call."""
        # Cancel previous task
        if self.task:
            self.task.cancel()

        # Create new task
        async def delayed_call():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(delayed_call())

async def api_call(query: str):
    """Simulated API call."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Example of debouncing."""
    debouncer = Debouncer(delay=1.0)

    # Rapid calls - only last one executes
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")

    # Wait for debounced call
    await asyncio.sleep(2)

asyncio.run(debounce_example())
# Output: Only "query3" API call executes
```

### 5. Rate Limiting

```python
import asyncio
from datetime import datetime

class RateLimiter:
    """Limit rate of async operations."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []

    async def __aenter__(self):
        """Acquire rate limit slot."""
        await self.semaphore.acquire()

        now = asyncio.get_event_loop().time()

        # Remove old calls outside period
        self.calls = [t for t in self.calls if now - t < self.period]

        if len(self.calls) >= self.max_calls:
            # Wait until oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """Release semaphore."""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """Operation with rate limiting."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """Example of rate limiting."""
    # Max 3 calls per 2 seconds
    limiter = RateLimiter(max_calls=3, period=2.0)

    # Try to make 6 calls
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
```

## Debugging Async Code

### 1. Enable Debug Mode

```python
import asyncio
import logging

# Enable asyncio debug mode
asyncio.run(main(), debug=True)

# Or set environment variable:
# PYTHONASYNCIODEBUG=1 python script.py

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")
```

### 2. Detect Blocking Code

```python
import asyncio
import time

async def problematic_code():
    """Code with blocking operation."""
    print("Starting")

    # BAD: Blocking sleep
    time.sleep(2)  # This blocks the event loop!

    print("Complete")

# Run with debug mode to detect blocking
asyncio.run(problematic_code(), debug=True)
# Warning: Executing <Task> took 2.001 seconds
```

### 3. Track Pending Tasks

```python
import asyncio

async def track_tasks():
    """Track all pending tasks."""
    # Get all tasks
    tasks = asyncio.all_tasks()

    print(f"Total tasks: {len(tasks)}")
    for task in tasks:
        print(f"  - {task.get_name()}: {task}")

        # Check if task is done
        if task.done():
            try:
                result = task.result()
                print(f"    Result: {result}")
            except Exception as e:
                print(f"    Exception: {e}")

# Create some tasks
async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")

    await task2
    task1.cancel()

asyncio.run(main())
```

## Testing Async Code

### 1. pytest-asyncio Setup

```python
# test_async.py
import pytest
import asyncio

# Mark test as async
@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

# Async fixture
@pytest.fixture
async def async_client():
    """Async test fixture."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Test using async fixture."""
    result = await async_client.fetch_data()
    assert result is not None
```

### 2. Mocking Async Functions

```python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Test with async mock."""
    # Create async mock
    mock_func = AsyncMock(return_value="mocked result")

    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """Test with patched async function."""
    mock_async.return_value = {"status": "success"}

    result = await some_function_that_calls_async()

    assert result["status"] == "success"
    mock_async.assert_called_once()
```

## Performance Optimization

### 1. Use asyncio.gather() for Parallelism

```python
import asyncio
import time

async def slow_task(n):
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """Parallel execution."""
    start = time.time()

    # Sequential (slow) - 5 seconds
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)

    # Parallel (fast) - 1 second
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])

    elapsed = time.time() - start
    print(f"Time: {elapsed:.2f}s, Results: {results}")

asyncio.run(optimized())
```

### 2. Connection Pooling

```python
import asyncio
import aiohttp

# BAD: Create new session for each request
async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()

# GOOD: Reuse session with connection pool
async def good_pattern():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get("https://httpbin.org/json")
            for i in range(10)
        ]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            await response.json()
```

### 3. Avoid Blocking Operations

```python
import asyncio

# BAD: Blocking I/O in async function
async def bad_file_read():
    with open("large_file.txt") as f:  # Blocks event loop!
        data = f.read()
    return data

# GOOD: Use async file I/O or run in executor
async def good_file_read():
    loop = asyncio.get_running_loop()

    # Run blocking operation in thread pool
    data = await loop.run_in_executor(
        None,
        lambda: open("large_file.txt").read()
    )
    return data

# BETTER: Use aiofiles for async file I/O
import aiofiles

async def better_file_read():
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data
```

## Common Pitfalls

### ❌ Anti-Pattern 1: Not Awaiting Coroutines

```python
# WRONG
async def bad():
    result = async_function()  # Returns coroutine, doesn't execute!
    print(result)  # Prints: <coroutine object>

# CORRECT
async def good():
    result = await async_function()  # Actually executes
    print(result)
```

### ❌ Anti-Pattern 2: Blocking the Event Loop

```python
# WRONG
import time

async def bad():
    time.sleep(5)  # Blocks entire event loop!

# CORRECT
async def good():
    await asyncio.sleep(5)  # Non-blocking
```

### ❌ Anti-Pattern 3: Not Handling Cancellation

```python
# WRONG
async def bad():
    await asyncio.sleep(10)
    # No cleanup if cancelled

# CORRECT
async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup resources
        await cleanup()
        raise  # Re-raise to propagate
```

### ❌ Anti-Pattern 4: Creating Event Loop Incorrectly

```python
# WRONG (Python 3.7+)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# CORRECT (Python 3.7+)
asyncio.run(main())
```

### ❌ Anti-Pattern 5: Not Closing Resources

```python
# WRONG
async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # Session never closed - resource leak!

# CORRECT
async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        # Session automatically closed
```

## Best Practices

1. **Use asyncio.run()** for entry point (Python 3.7+)
2. **Always await coroutines** - don't forget await
3. **Use async context managers** for resource cleanup
4. **Connection pooling** for HTTP and database clients
5. **Handle CancelledError** for graceful shutdown
6. **Use asyncio.gather()** for parallel operations
7. **Avoid blocking operations** in async functions
8. **Use timeouts** to prevent hanging operations
9. **Debug mode** during development to catch issues
10. **Test async code** with pytest-asyncio

## Quick Reference

### Common Commands

```bash
# Run async script
python script.py

# Run with debug mode
PYTHONASYNCIODEBUG=1 python script.py

# Run tests
pytest -v --asyncio-mode=auto

# Install async dependencies
pip install aiohttp asyncpg motor pytest-asyncio
```

### Essential Imports

```python
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any
```

## Resources

- **Official Documentation**: https://docs.python.org/3/library/asyncio.html
- **aiohttp**: https://docs.aiohttp.org/
- **asyncpg**: https://magicstack.github.io/asyncpg/
- **FastAPI Async**: https://fastapi.tiangolo.com/async/
- **pytest-asyncio**: https://pytest-asyncio.readthedocs.io/

## Related Skills

When using asyncio, consider these complementary skills:

- **fastapi-local-dev**: FastAPI async server patterns and production deployment
- **pytest**: Testing async code with pytest-asyncio and fixtures
- **systematic-debugging**: Debugging async race conditions and deadlocks

### Quick FastAPI Async Patterns (Inlined for Standalone Use)

```python
# FastAPI async endpoint pattern
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

# Async database setup
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async database query
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

# Background tasks with asyncio
@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # Non-blocking background task
    asyncio.create_task(send_email_async(email))
    return {"status": "email queued"}
```

### Quick pytest-asyncio Patterns (Inlined for Standalone Use)

```python
# Testing async functions with pytest
import pytest
import pytest_asyncio
from httpx import AsyncClient

# Async test fixture
@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

# Async test function
@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

# Testing concurrent operations
@pytest.mark.asyncio
async def test_concurrent_requests():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Run 10 requests concurrently
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

# Mock async dependencies
@pytest_asyncio.fixture
async def mock_db():
    # Setup mock database
    db = AsyncMock()
    yield db
    # Cleanup
```

### Quick Async Debugging Reference (Inlined for Standalone Use)

**Common Async Pitfalls:**

1. **Blocking the Event Loop**
   ```python
   # ❌ WRONG: Blocking call in async function
   async def bad_function():
       time.sleep(5)  # Blocks entire event loop!
       return "done"

   # ✅ CORRECT: Use asyncio.sleep
   async def good_function():
       await asyncio.sleep(5)  # Releases event loop
       return "done"
   ```

2. **Debugging Race Conditions**
   ```python
   # Add logging to track execution order
   import logging
   logging.basicConfig(level=logging.DEBUG)

   async def debug_task(name):
       logging.debug(f"{name}: Starting")
       await asyncio.sleep(1)
       logging.debug(f"{name}: Finished")
       return name

   # Run with detailed tracing
   asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True)
   ```

3. **Deadlock Detection**
   ```python
   # Use timeout to detect deadlocks
   try:
       result = await asyncio.wait_for(some_async_function(), timeout=5.0)
   except asyncio.TimeoutError:
       logging.error("Deadlock detected: operation timed out")
       # Investigate what's blocking
   ```

4. **Inspecting Running Tasks**
   ```python
   # Check all pending tasks
   tasks = asyncio.all_tasks()
   for task in tasks:
       print(f"Task: {task.get_name()}, Done: {task.done()}")
       if not task.done():
           print(f"  Current coro: {task.get_coro()}")
   ```

[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]

---

**Python Version Compatibility:** This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.