home / skills / markus41 / claude / fastapi-background
This skill helps you implement background task processing in FastAPI using ARQ, Celery, and Dramatiq for scalable async jobs.
npx playbooks add skill markus41/claude --skill fastapi-backgroundReview the files below or copy the command above to add this skill to your agents.
---
name: FastAPI Background Tasks
description: This skill should be used when the user asks to "create background task", "add async job", "implement task queue", "schedule periodic task", "use Celery", "use ARQ", "process async", or mentions background processing, task queues, job scheduling, workers, or async jobs. Provides multiple task queue framework patterns.
version: 0.1.0
---
# FastAPI Background Task Processing
This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.
## ARQ (Async Redis Queue) - Recommended
### Installation
```bash
pip install arq
```
### Configuration
```python
# app/workers/config.py
from arq.connections import RedisSettings
from app.config import get_settings
settings = get_settings()
class WorkerSettings:
redis_settings = RedisSettings(
host=settings.redis_host,
port=settings.redis_port,
password=settings.redis_password,
database=1 # Separate from cache
)
# Job settings
max_jobs = 10
job_timeout = 300 # 5 minutes
keep_result = 3600 # 1 hour
queue_name = "default"
# Cron jobs
cron_jobs = []
```
### Task Definitions
```python
# app/workers/tasks.py
from arq import cron
from typing import Dict, Any
import asyncio
async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str):
"""Send email asynchronously."""
email_service = ctx.get("email_service")
await email_service.send(to=to, subject=subject, body=body)
return {"status": "sent", "to": to}
async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str):
"""Process uploaded file (resize, convert, etc.)."""
storage = ctx.get("storage")
file_data = await storage.get(file_id)
# Process file
processed = await process_file(file_data)
# Save processed file
await storage.put(f"processed/{file_id}", processed)
return {"status": "processed", "file_id": file_id}
async def cleanup_expired(ctx: Dict[str, Any]):
"""Periodic cleanup of expired data."""
db = ctx.get("db")
result = await db.delete_expired()
return {"deleted": result.deleted_count}
# Cron job example
@cron(hour=2, minute=0) # Run at 2 AM daily
async def daily_report(ctx: Dict[str, Any]):
"""Generate daily report."""
report_service = ctx.get("report_service")
await report_service.generate_daily()
```
### Worker Entry Point
```python
# app/workers/main.py
from arq import create_pool
from arq.connections import RedisSettings
from app.workers.config import WorkerSettings
from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report
from app.infrastructure.database import init_database
from app.services.email import EmailService
async def startup(ctx: Dict[str, Any]):
"""Worker startup - initialize services."""
await init_database()
ctx["email_service"] = EmailService()
ctx["db"] = get_db()
async def shutdown(ctx: Dict[str, Any]):
"""Worker shutdown - cleanup."""
await close_database()
class WorkerSettings(WorkerSettings):
functions = [send_email, process_upload, cleanup_expired]
cron_jobs = [daily_report]
on_startup = startup
on_shutdown = shutdown
# Run with: arq app.workers.main.WorkerSettings
```
### Enqueueing Tasks from FastAPI
```python
# app/dependencies.py
from arq import ArqRedis, create_pool
from arq.connections import RedisSettings
async def get_task_queue() -> ArqRedis:
return await create_pool(RedisSettings())
# app/routes/users.py
from fastapi import Depends
from arq import ArqRedis
@router.post("/users/{user_id}/welcome")
async def send_welcome_email(
user_id: str,
queue: ArqRedis = Depends(get_task_queue)
):
user = await get_user(user_id)
# Enqueue background task
job = await queue.enqueue_job(
"send_email",
to=user.email,
subject="Welcome!",
body="Thanks for signing up."
)
return {"job_id": job.job_id, "status": "queued"}
@router.post("/uploads")
async def upload_file(
file: UploadFile,
user: User = Depends(get_current_user),
queue: ArqRedis = Depends(get_task_queue)
):
# Save file
file_id = await save_file(file)
# Enqueue processing
await queue.enqueue_job(
"process_upload",
file_id=file_id,
user_id=str(user.id),
_defer_by=5 # Delay 5 seconds
)
return {"file_id": file_id, "status": "processing"}
```
## Celery (Battle-Tested)
### Configuration
```python
# app/workers/celery_app.py
from celery import Celery
from app.config import get_settings
settings = get_settings()
celery_app = Celery(
"worker",
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
include=["app.workers.celery_tasks"]
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=300,
worker_prefetch_multiplier=1,
)
# Periodic tasks (Celery Beat)
celery_app.conf.beat_schedule = {
"cleanup-every-hour": {
"task": "app.workers.celery_tasks.cleanup_expired",
"schedule": 3600.0,
},
"daily-report": {
"task": "app.workers.celery_tasks.generate_daily_report",
"schedule": crontab(hour=2, minute=0),
},
}
```
### Celery Tasks
```python
# app/workers/celery_tasks.py
from app.workers.celery_app import celery_app
import asyncio
def run_async(coro):
"""Helper to run async code in sync Celery tasks."""
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
@celery_app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str):
try:
run_async(_send_email_async(to, subject, body))
return {"status": "sent", "to": to}
except Exception as exc:
self.retry(exc=exc, countdown=60)
@celery_app.task
def process_upload(file_id: str, user_id: str):
run_async(_process_upload_async(file_id, user_id))
return {"status": "processed", "file_id": file_id}
```
## Dramatiq (Modern Celery Alternative)
### Configuration
```python
# app/workers/dramatiq_app.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
redis_broker = RedisBroker(url="redis://localhost:6379/0")
result_backend = RedisBackend(url="redis://localhost:6379/1")
redis_broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(redis_broker)
```
### Dramatiq Tasks
```python
# app/workers/dramatiq_tasks.py
import dramatiq
@dramatiq.actor(max_retries=3, min_backoff=1000)
def send_email(to: str, subject: str, body: str):
# Sync implementation
return {"status": "sent", "to": to}
@dramatiq.actor(time_limit=300000) # 5 min timeout
def process_upload(file_id: str, user_id: str):
return {"status": "processed", "file_id": file_id}
```
## FastAPI Built-in Background Tasks
For simple fire-and-forget tasks (no persistence):
```python
from fastapi import BackgroundTasks
async def write_log(message: str):
with open("log.txt", "a") as f:
f.write(f"{message}\n")
@router.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, message)
return {"status": "logged"}
```
## Additional Resources
### Reference Files
For detailed patterns:
- **`references/arq-advanced.md`** - ARQ advanced patterns, retries, priorities
- **`references/celery-patterns.md`** - Celery best practices, chains, groups
- **`references/monitoring.md`** - Flower, task monitoring
### Example Files
Working examples in `examples/`:
- **`examples/arq_worker.py`** - Complete ARQ worker
- **`examples/celery_app.py`** - Celery configuration
- **`examples/task_service.py`** - Task enqueueing service
This skill provides practical patterns for adding background processing to FastAPI apps using multiple frameworks: ARQ (recommended for async), Celery, Dramatiq, and FastAPI's built-in BackgroundTasks. It explains configuration, task definitions, worker entry points, and enqueueing from request handlers. The goal is to help you pick the right pattern and implement resilient, scalable async jobs and periodic tasks.
It outlines three recommended approaches: ARQ for native asyncio workers, Celery for a battle-tested distributed task system, and Dramatiq as a modern alternative. For each approach the skill shows configuration, task function examples, worker startup/shutdown hooks, and how to enqueue jobs from FastAPI. It also includes a simple fire-and-forget example using FastAPI's BackgroundTasks for lightweight tasks without persistence.
Which task system should I choose for asyncio apps?
Use ARQ—it's designed for asyncio, supports cron jobs, and avoids running async code inside sync workers.
When is FastAPI BackgroundTasks sufficient?
Use it for simple fire-and-forget tasks that don't need persistence, retries, or separate worker processes; not for reliability or long-running jobs.