home / skills / pluginagentmarketplace / custom-plugin-python / asyncio-programming

asyncio-programming skill

/skills/asyncio-programming

This skill helps you master asynchronous programming with asyncio to write concurrent, efficient I/O-bound code and build scalable async applications.

npx playbooks add skill pluginagentmarketplace/custom-plugin-python --skill asyncio-programming

Review the files below or copy the command above to add this skill to your agents.

Files (4)
SKILL.md
9.3 KB
---
name: Asyncio Programming
description: Master asynchronous programming with asyncio, async/await, concurrent operations, and async frameworks
version: "2.1.0"
sasmp_version: "1.3.0"
bonded_agent: 05-async-concurrency
bond_type: PRIMARY_BOND

# Skill Configuration
retry_strategy: exponential_backoff
observability:
  logging: true
  metrics: task_completion_rate
---

# Asyncio Programming

## Overview

Master asynchronous programming in Python with asyncio. Learn to write concurrent code that efficiently handles I/O-bound operations, build async web applications, and understand the async/await paradigm.

## Learning Objectives

- Understand asynchronous programming concepts
- Write async functions with async/await syntax
- Manage concurrent operations with asyncio
- Build async web applications
- Handle async I/O operations efficiently
- Debug and test async code

## Core Topics

### 1. Async/Await Basics
- Understanding coroutines
- async/await syntax
- Event loop fundamentals
- Running async functions
- Async vs sync execution
- Common pitfalls

**Code Example:**
```python
import asyncio
import time

# Synchronous version (slow)
def fetch_data_sync(url):
    print(f"Fetching {url}...")
    time.sleep(2)  # Simulating network delay
    return f"Data from {url}"

def main_sync():
    urls = ['url1', 'url2', 'url3']
    results = []
    for url in urls:
        data = fetch_data_sync(url)
        results.append(data)
    return results

# Takes 6 seconds (2 * 3)
start = time.time()
main_sync()
print(f"Sync took: {time.time() - start:.2f}s")

# Asynchronous version (fast)
async def fetch_data_async(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Non-blocking sleep
    return f"Data from {url}"

async def main_async():
    urls = ['url1', 'url2', 'url3']
    # Create tasks and run concurrently
    tasks = [fetch_data_async(url) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

# Takes 2 seconds (concurrent execution)
start = time.time()
asyncio.run(main_async())
print(f"Async took: {time.time() - start:.2f}s")
```

### 2. Asyncio Tasks & Coroutines
- Creating and managing tasks
- asyncio.gather() vs asyncio.wait()
- Task cancellation
- Task groups (Python 3.11+)
- Exception handling in tasks
- Timeouts

**Code Example:**
```python
import asyncio

async def process_item(item_id, delay):
    print(f"Processing item {item_id}")
    await asyncio.sleep(delay)
    if item_id == 3:
        raise ValueError(f"Item {item_id} failed!")
    return f"Result {item_id}"

async def main():
    # Method 1: gather (returns results in order)
    tasks = [
        process_item(1, 1),
        process_item(2, 2),
        process_item(3, 1),
    ]
    try:
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Task {i} failed: {result}")
            else:
                print(f"Task {i} result: {result}")
    except Exception as e:
        print(f"Error: {e}")

    # Method 2: wait (returns done/pending sets)
    tasks = [
        asyncio.create_task(process_item(i, i))
        for i in range(1, 4)
    ]
    done, pending = await asyncio.wait(tasks, timeout=2.5)

    print(f"Completed: {len(done)}, Pending: {len(pending)}")

    # Cancel pending tasks
    for task in pending:
        task.cancel()

    # Method 3: Task groups (Python 3.11+)
    async with asyncio.TaskGroup() as tg:
        for i in range(1, 4):
            tg.create_task(process_item(i, 1))
    # All tasks completed or exception raised

asyncio.run(main())
```

### 3. Async I/O Operations
- Async file operations (aiofiles)
- Async HTTP requests (aiohttp)
- Async database operations (asyncpg, motor)
- Async messaging (aio-pika)
- Streams and protocols

**Code Example:**
```python
import asyncio
import aiohttp
import aiofiles
from typing import List

# Async HTTP requests
async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def fetch_multiple_urls(urls: List[str]):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

# Async file operations
async def read_file_async(filepath):
    async with aiofiles.open(filepath, 'r') as f:
        content = await f.read()
        return content

async def write_file_async(filepath, content):
    async with aiofiles.open(filepath, 'w') as f:
        await f.write(content)

# Async database operations (example with asyncpg)
import asyncpg

async def fetch_users():
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='localhost'
    )
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return rows
    finally:
        await conn.close()

# Usage
async def main():
    # Fetch URLs concurrently
    urls = [
        'https://api.example.com/data1',
        'https://api.example.com/data2',
        'https://api.example.com/data3',
    ]
    results = await fetch_multiple_urls(urls)

    # Read/write files
    content = await read_file_async('input.txt')
    await write_file_async('output.txt', content.upper())

    # Database operations
    users = await fetch_users()
    print(f"Found {len(users)} users")

asyncio.run(main())
```

### 4. Async Web Frameworks
- FastAPI async routes
- aiohttp web server
- WebSocket handling
- Background tasks
- Middleware and dependencies

**Code Example:**
```python
# FastAPI async example
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

# Async route
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # Async database call
    user = await fetch_user_from_db(user_id)
    return user

# Background task
async def send_notification(email: str, message: str):
    await asyncio.sleep(2)  # Simulate email sending
    print(f"Sent email to {email}: {message}")

@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    # Process order synchronously
    order_id = save_order(order_data)

    # Send notification in background
    background_tasks.add_task(
        send_notification,
        order_data['customer_email'],
        f"Order #{order_id} created"
    )

    return {"order_id": order_id}

# aiohttp web server
from aiohttp import web

async def handle_request(request):
    name = request.match_info.get('name', 'Anonymous')
    await asyncio.sleep(1)  # Async operation
    return web.json_response({'message': f'Hello {name}'})

app = web.Application()
app.add_routes([web.get('/{name}', handle_request)])

# WebSocket example
async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == web.WSMsgType.ERROR:
            print(f'Error: {ws.exception()}')

    return ws

app.add_routes([web.get('/ws', websocket_handler)])
```

## Hands-On Practice

### Project 1: Async Web Scraper
Build a concurrent web scraper with rate limiting.

**Requirements:**
- Scrape multiple websites concurrently
- Implement rate limiting
- Handle errors gracefully
- Save results to async database
- Progress tracking
- Retry failed requests

**Key Skills:** aiohttp, async I/O, error handling

### Project 2: Real-time Chat Server
Create a WebSocket-based chat application.

**Requirements:**
- WebSocket server with aiohttp
- Multiple chat rooms
- User authentication
- Message broadcasting
- Connection management
- Message history persistence

**Key Skills:** WebSockets, async server, state management

### Project 3: Async Task Queue
Build a distributed task processing system.

**Requirements:**
- Task queue with Redis/RabbitMQ
- Worker pool management
- Task prioritization
- Result caching
- Progress monitoring
- Graceful shutdown

**Key Skills:** Message queues, concurrent workers, cleanup

## Assessment Criteria

- [ ] Understand async/await semantics
- [ ] Write efficient concurrent code
- [ ] Handle async exceptions properly
- [ ] Use asyncio tasks effectively
- [ ] Build async web applications
- [ ] Debug async code
- [ ] Manage async resources (cleanup)

## Resources

### Official Documentation
- [asyncio Docs](https://docs.python.org/3/library/asyncio.html) - Official documentation
- [aiohttp](https://docs.aiohttp.org/) - Async HTTP client/server
- [FastAPI](https://fastapi.tiangolo.com/) - Modern async web framework

### Learning Platforms
- [Using Asyncio in Python](https://realpython.com/async-io-python/) - Real Python guide
- [Python Concurrency](https://learning.oreilly.com/library/view/using-asyncio-in/9781492075325/) - O'Reilly book
- [Asyncio Recipes](https://www.amazon.com/Python-Asyncio-Recipes-Caleb-Hattingh/dp/1484244001) - Practical examples

### Tools
- [aiohttp](https://docs.aiohttp.org/) - Async HTTP
- [aiofiles](https://github.com/Tinche/aiofiles) - Async file I/O
- [asyncpg](https://magicstack.github.io/asyncpg/) - Async PostgreSQL
- [aiodebug](https://github.com/mosquito/aiodebug) - Async debugging

## Next Steps

After mastering asyncio, explore:
- **Multiprocessing** - CPU-bound parallelism
- **Celery** - Distributed task queue
- **gRPC** - Async RPC framework
- **Kafka** - Async event streaming

Overview

This skill teaches asynchronous programming in Python using asyncio, async/await, and related async libraries. It focuses on writing concurrent I/O-bound code, building async web services, and managing tasks, timeouts, and errors for reliable production systems. Practical examples and hands-on projects reinforce concepts for real-world use.

How this skill works

The curriculum inspects core asyncio primitives: coroutines, the event loop, tasks, and TaskGroup patterns. It demonstrates async I/O with aiohttp, aiofiles, and async database drivers, and shows how to structure async web apps (FastAPI, aiohttp) and WebSocket services. Each topic includes examples for creating, running, cancelling, and debugging concurrent work, plus project-based exercises to apply the techniques.

When to use it

  • When your application is I/O-bound and needs high concurrency with low thread overhead
  • When building async web APIs, WebSocket servers, or background workers
  • When performing concurrent file, HTTP, or database operations
  • When you need non-blocking integrations with message brokers or external services
  • When reducing latency by running many independent network calls concurrently

Best practices

  • Favor async libraries end-to-end to avoid blocking the event loop (use aiohttp, aiofiles, asyncpg)
  • Use asyncio.create_task and TaskGroup to manage background work and ensure proper cancellation/cleanup
  • Prefer asyncio.gather(return_exceptions=True) or wait for structured error handling in concurrent tasks
  • Apply timeouts and retries around external calls to avoid stuck tasks
  • Test and debug async flows with dedicated tools (aiodebug) and write integration tests that run the event loop

Example use cases

  • Concurrent web scraper that respects rate limits and retries failed requests (aiohttp + async DB)
  • Realtime chat server using WebSockets and connection management with aiohttp or FastAPI
  • Async task queue with Redis/RabbitMQ workers, graceful shutdown, and progress tracking
  • High-throughput API gateway that proxies many backend services concurrently
  • Background jobs and notification senders that run without blocking request handlers

FAQ

Will asyncio speed up CPU-bound code?

No. asyncio improves concurrency for I/O-bound tasks. For CPU-bound work use multiprocessing or offload work to worker processes or thread pools.

How do I avoid blocking the event loop?

Never call blocking I/O or long-running synchronous code directly. Use async libraries, run blocking calls in an executor, or move them to separate processes.