home / skills / markus41 / claude / fastapi-background

fastapi-background skill

/plugins/fastapi-backend/skills/fastapi-background

This skill helps you implement background task processing in FastAPI using ARQ, Celery, and Dramatiq for scalable async jobs.

npx playbooks add skill markus41/claude --skill fastapi-background

Review the files below or copy the command above to add this skill to your agents.

Files (1)
SKILL.md
7.8 KB
---
name: FastAPI Background Tasks
description: This skill should be used when the user asks to "create background task", "add async job", "implement task queue", "schedule periodic task", "use Celery", "use ARQ", "process async", or mentions background processing, task queues, job scheduling, workers, or async jobs. Provides multiple task queue framework patterns.
version: 0.1.0
---

# FastAPI Background Task Processing

This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.

## ARQ (Async Redis Queue) - Recommended

### Installation

```bash
pip install arq
```

### Configuration

```python
# app/workers/config.py
from arq.connections import RedisSettings
from app.config import get_settings

settings = get_settings()

class WorkerSettings:
    redis_settings = RedisSettings(
        host=settings.redis_host,
        port=settings.redis_port,
        password=settings.redis_password,
        database=1  # Separate from cache
    )

    # Job settings
    max_jobs = 10
    job_timeout = 300  # 5 minutes
    keep_result = 3600  # 1 hour
    queue_name = "default"

    # Cron jobs
    cron_jobs = []
```

### Task Definitions

```python
# app/workers/tasks.py
from arq import cron
from typing import Dict, Any
import asyncio

async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str):
    """Send email asynchronously."""
    email_service = ctx.get("email_service")
    await email_service.send(to=to, subject=subject, body=body)
    return {"status": "sent", "to": to}

async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str):
    """Process uploaded file (resize, convert, etc.)."""
    storage = ctx.get("storage")
    file_data = await storage.get(file_id)

    # Process file
    processed = await process_file(file_data)

    # Save processed file
    await storage.put(f"processed/{file_id}", processed)

    return {"status": "processed", "file_id": file_id}

async def cleanup_expired(ctx: Dict[str, Any]):
    """Periodic cleanup of expired data."""
    db = ctx.get("db")
    result = await db.delete_expired()
    return {"deleted": result.deleted_count}

# Cron job example
@cron(hour=2, minute=0)  # Run at 2 AM daily
async def daily_report(ctx: Dict[str, Any]):
    """Generate daily report."""
    report_service = ctx.get("report_service")
    await report_service.generate_daily()
```

### Worker Entry Point

```python
# app/workers/main.py
from arq import create_pool
from arq.connections import RedisSettings
from app.workers.config import WorkerSettings
from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report
from app.infrastructure.database import init_database
from app.services.email import EmailService

async def startup(ctx: Dict[str, Any]):
    """Worker startup - initialize services."""
    await init_database()
    ctx["email_service"] = EmailService()
    ctx["db"] = get_db()

async def shutdown(ctx: Dict[str, Any]):
    """Worker shutdown - cleanup."""
    await close_database()

class WorkerSettings(WorkerSettings):
    functions = [send_email, process_upload, cleanup_expired]
    cron_jobs = [daily_report]
    on_startup = startup
    on_shutdown = shutdown

# Run with: arq app.workers.main.WorkerSettings
```

### Enqueueing Tasks from FastAPI

```python
# app/dependencies.py
from arq import ArqRedis, create_pool
from arq.connections import RedisSettings

async def get_task_queue() -> ArqRedis:
    return await create_pool(RedisSettings())

# app/routes/users.py
from fastapi import Depends
from arq import ArqRedis

@router.post("/users/{user_id}/welcome")
async def send_welcome_email(
    user_id: str,
    queue: ArqRedis = Depends(get_task_queue)
):
    user = await get_user(user_id)

    # Enqueue background task
    job = await queue.enqueue_job(
        "send_email",
        to=user.email,
        subject="Welcome!",
        body="Thanks for signing up."
    )

    return {"job_id": job.job_id, "status": "queued"}

@router.post("/uploads")
async def upload_file(
    file: UploadFile,
    user: User = Depends(get_current_user),
    queue: ArqRedis = Depends(get_task_queue)
):
    # Save file
    file_id = await save_file(file)

    # Enqueue processing
    await queue.enqueue_job(
        "process_upload",
        file_id=file_id,
        user_id=str(user.id),
        _defer_by=5  # Delay 5 seconds
    )

    return {"file_id": file_id, "status": "processing"}
```

## Celery (Battle-Tested)

### Configuration

```python
# app/workers/celery_app.py
from celery import Celery
from app.config import get_settings

settings = get_settings()

celery_app = Celery(
    "worker",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["app.workers.celery_tasks"]
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,
    worker_prefetch_multiplier=1,
)

# Periodic tasks (Celery Beat)
celery_app.conf.beat_schedule = {
    "cleanup-every-hour": {
        "task": "app.workers.celery_tasks.cleanup_expired",
        "schedule": 3600.0,
    },
    "daily-report": {
        "task": "app.workers.celery_tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),
    },
}
```

### Celery Tasks

```python
# app/workers/celery_tasks.py
from app.workers.celery_app import celery_app
import asyncio

def run_async(coro):
    """Helper to run async code in sync Celery tasks."""
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro)

@celery_app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str):
    try:
        run_async(_send_email_async(to, subject, body))
        return {"status": "sent", "to": to}
    except Exception as exc:
        self.retry(exc=exc, countdown=60)

@celery_app.task
def process_upload(file_id: str, user_id: str):
    run_async(_process_upload_async(file_id, user_id))
    return {"status": "processed", "file_id": file_id}
```

## Dramatiq (Modern Celery Alternative)

### Configuration

```python
# app/workers/dramatiq_app.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

redis_broker = RedisBroker(url="redis://localhost:6379/0")
result_backend = RedisBackend(url="redis://localhost:6379/1")

redis_broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(redis_broker)
```

### Dramatiq Tasks

```python
# app/workers/dramatiq_tasks.py
import dramatiq

@dramatiq.actor(max_retries=3, min_backoff=1000)
def send_email(to: str, subject: str, body: str):
    # Sync implementation
    return {"status": "sent", "to": to}

@dramatiq.actor(time_limit=300000)  # 5 min timeout
def process_upload(file_id: str, user_id: str):
    return {"status": "processed", "file_id": file_id}
```

## FastAPI Built-in Background Tasks

For simple fire-and-forget tasks (no persistence):

```python
from fastapi import BackgroundTasks

async def write_log(message: str):
    with open("log.txt", "a") as f:
        f.write(f"{message}\n")

@router.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, message)
    return {"status": "logged"}
```

## Additional Resources

### Reference Files

For detailed patterns:
- **`references/arq-advanced.md`** - ARQ advanced patterns, retries, priorities
- **`references/celery-patterns.md`** - Celery best practices, chains, groups
- **`references/monitoring.md`** - Flower, task monitoring

### Example Files

Working examples in `examples/`:
- **`examples/arq_worker.py`** - Complete ARQ worker
- **`examples/celery_app.py`** - Celery configuration
- **`examples/task_service.py`** - Task enqueueing service

Overview

This skill provides practical patterns for adding background processing to FastAPI apps using multiple frameworks: ARQ (recommended for async), Celery, Dramatiq, and FastAPI's built-in BackgroundTasks. It explains configuration, task definitions, worker entry points, and enqueueing from request handlers. The goal is to help you pick the right pattern and implement resilient, scalable async jobs and periodic tasks.

How this skill works

It outlines three recommended approaches: ARQ for native asyncio workers, Celery for a battle-tested distributed task system, and Dramatiq as a modern alternative. For each approach the skill shows configuration, task function examples, worker startup/shutdown hooks, and how to enqueue jobs from FastAPI. It also includes a simple fire-and-forget example using FastAPI's BackgroundTasks for lightweight tasks without persistence.

When to use it

  • You need true async workers that run asyncio coroutines (use ARQ).
  • You require mature features, complex workflows, result backends or Celery integrations.
  • You want a lightweight modern alternative with simple Redis-backed actors (Dramatiq).
  • You only need quick fire-and-forget tasks with no persistence (FastAPI BackgroundTasks).
  • You need periodic jobs or cron-style scheduling for maintenance tasks.

Best practices

  • Prefer ARQ when your app code is asyncio-native to avoid sync bridging and blocking.
  • Keep worker responsibilities small and idempotent; persist state in DB or storage rather than relying on in-memory state.
  • Use separate Redis databases for queues, results, and cache to avoid interference.
  • Configure sensible timeouts, retries, and max concurrency to prevent runaway jobs.
  • Expose enqueueing through dependency-injected queue clients in FastAPI to keep routes simple and testable.

Example use cases

  • Send welcome or transactional emails asynchronously after user signup using ARQ or Celery.
  • Process uploaded files (resize, transcode) in background workers and update records when complete.
  • Run nightly or hourly cleanup and reporting tasks via ARQ cron jobs or Celery Beat.
  • Implement retryable long-running jobs with result backends for status tracking.
  • Quick logging or telemetry writes using FastAPI BackgroundTasks for minimal latency.

FAQ

Which task system should I choose for asyncio apps?

Use ARQ—it's designed for asyncio, supports cron jobs, and avoids running async code inside sync workers.

When is FastAPI BackgroundTasks sufficient?

Use it for simple fire-and-forget tasks that don't need persistence, retries, or separate worker processes; not for reliability or long-running jobs.