
background-jobs skill

This skill helps you implement reliable background tasks and job queues with ARQ or Celery on Redis, keeping request handlers responsive and improving throughput.

npx playbooks add skill yonatangross/orchestkit --skill background-jobs

SKILL.md
---
name: background-jobs
description: Async task processing with Celery, ARQ, and Redis for Python backends. Use when implementing background tasks, job queues, workers, scheduled jobs, or periodic task processing.
context: fork
agent: data-pipeline-engineer
version: 1.0.0
tags: [background-jobs, celery, arq, redis, async, python, 2026]
author: OrchestKit
user-invocable: false
---

# Background Job Patterns

Offload long-running tasks with async job queues.

## Overview

Use a background job queue for:

- Long-running tasks (report generation, data processing)
- Email/notification sending
- Scheduled/periodic tasks
- Webhook processing
- Data export/import pipelines
- Non-LLM async operations (use LangGraph for LLM workflows)

## Tool Selection

| Tool | Language | Best For | Complexity |
|------|----------|----------|------------|
| ARQ | Python (async) | FastAPI, simple jobs | Low |
| Celery | Python | Complex workflows, enterprise | High |
| RQ | Python | Simple Redis queues | Low |
| Dramatiq | Python | Reliable messaging | Medium |

## ARQ (Async Redis Queue)

### Setup

```python
# backend/app/workers/arq_worker.py
import httpx
from arq.connections import RedisSettings

# create_db_pool and the task coroutines (send_email, generate_report,
# process_webhook) are application code; the tasks are defined below.

async def startup(ctx: dict):
    """Initialize worker resources."""
    ctx["db"] = await create_db_pool()
    ctx["http"] = httpx.AsyncClient()

async def shutdown(ctx: dict):
    """Cleanup worker resources."""
    await ctx["db"].close()
    await ctx["http"].aclose()

class WorkerSettings:
    redis_settings = RedisSettings(host="redis", port=6379)
    functions = [
        send_email,
        generate_report,
        process_webhook,
    ]
    on_startup = startup
    on_shutdown = shutdown
    max_jobs = 10
    job_timeout = 300  # 5 minutes
```

### Task Definition

```python
# ARQ tasks are plain async functions; the worker context dict is always the
# first argument. SENDGRID_KEY and render_pdf are application-level helpers.

async def send_email(
    ctx: dict,
    to: str,
    subject: str,
    body: str,
) -> dict:
    """Send email task."""
    http = ctx["http"]
    response = await http.post(
        "https://api.sendgrid.com/v3/mail/send",
        json={"to": to, "subject": subject, "html": body},
        headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
    )
    return {"status": response.status_code, "to": to}

async def generate_report(
    ctx: dict,
    report_id: str,
    format: str = "pdf",
) -> dict:
    """Generate report asynchronously."""
    db = ctx["db"]
    data = await db.fetch_report_data(report_id)
    pdf_bytes = await render_pdf(data)
    await db.save_report_file(report_id, pdf_bytes)
    return {"report_id": report_id, "size": len(pdf_bytes)}
```

### Enqueue from FastAPI

```python
from arq import create_pool
from arq.connections import ArqRedis, RedisSettings
from arq.jobs import Job, JobStatus
from fastapi import Depends

# Dependency. In production, create the pool once at startup (e.g. in a
# lifespan handler) and reuse it instead of opening one per request.
async def get_arq_pool() -> ArqRedis:
    return await create_pool(RedisSettings(host="redis"))

@router.post("/api/v1/reports")
async def create_report(
    data: ReportRequest,
    arq: ArqRedis = Depends(get_arq_pool),
):
    report = await service.create_report(data)

    # Enqueue background job
    job = await arq.enqueue_job(
        "generate_report",
        report.id,
        format=data.format,
    )

    return {"report_id": report.id, "job_id": job.job_id}

@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(
    job_id: str,
    arq: ArqRedis = Depends(get_arq_pool),
):
    job = Job(job_id, arq)
    status = await job.status()
    result = await job.result() if status == JobStatus.complete else None
    return {"job_id": job_id, "status": status, "result": result}
```
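
### Retries

ARQ retries a job when it raises `arq.Retry`; the current attempt number is available as `ctx["job_try"]`. A minimal sketch, assuming the worker context from the setup above and a hypothetical webhook payload with `callback_url` and `body` fields:

```python
from arq import Retry

async def process_webhook(ctx: dict, payload: dict) -> dict:
    """Deliver a webhook, retrying with an increasing delay on failure."""
    http = ctx["http"]
    response = await http.post(payload["callback_url"], json=payload["body"])
    if response.status_code >= 500:
        # Re-enqueue with back-off: 10s, 20s, 30s, ... until the worker's max_tries.
        raise Retry(defer=ctx["job_try"] * 10)
    return {"delivered": True, "status": response.status_code}
```

Per-function limits (`timeout`, `max_tries`, `keep_result`) can be set by wrapping a task with `arq.func(...)` in `WorkerSettings.functions`.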

## Celery (Enterprise)

### Setup

```python
# backend/app/workers/celery_app.py
from celery import Celery

celery_app = Celery(
    "orchestkit",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    task_track_started=True,
    task_time_limit=600,  # 10 minutes hard limit
    task_soft_time_limit=540,  # 9 minutes soft limit
    worker_prefetch_multiplier=1,  # Fair distribution
    task_acks_late=True,  # Acknowledge after completion
    task_reject_on_worker_lost=True,
)
```

### Task Definition

```python
import requests
from celery import shared_task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# SENDGRID_KEY and the report helpers (fetch_report_data, render_pdf,
# save_report) are application-level names assumed to be defined elsewhere.

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def send_email(self, to: str, subject: str, body: str) -> dict:
    """Send email with automatic retry."""
    try:
        response = requests.post(
            "https://api.sendgrid.com/v3/mail/send",
            json={"to": to, "subject": subject, "html": body},
            headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
            timeout=30,
        )
        response.raise_for_status()
        return {"status": "sent", "to": to}
    except Exception as exc:
        logger.error(f"Email failed: {exc}")
        raise self.retry(exc=exc)

@shared_task(bind=True)
def generate_report(self, report_id: str) -> dict:
    """Long-running report generation."""
    self.update_state(state="PROGRESS", meta={"step": "fetching"})
    data = fetch_report_data(report_id)

    self.update_state(state="PROGRESS", meta={"step": "rendering"})
    pdf = render_pdf(data)

    self.update_state(state="PROGRESS", meta={"step": "saving"})
    save_report(report_id, pdf)

    return {"report_id": report_id, "size": len(pdf)}
```

### Chains and Groups

```python
from celery import chain, group, chord

# Sequential execution
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)
result = workflow.apply_async()

# Parallel execution
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
result = parallel.apply_async()

# Parallel with callback
chord_workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s(),
)
result = chord_workflow.apply_async()
```

### Periodic Tasks (Celery Beat)

```python
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "app.workers.tasks.cleanup_sessions",
        "schedule": crontab(minute=0, hour="*/6"),  # Every 6 hours
    },
    "generate-daily-report": {
        "task": "app.workers.tasks.daily_report",
        "schedule": crontab(minute=0, hour=2),  # 2 AM daily
    },
    "sync-external-data": {
        "task": "app.workers.tasks.sync_data",
        "schedule": 300.0,  # Every 5 minutes
    },
}
```
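
ARQ covers the same need with `cron_jobs` on the worker settings. A minimal sketch, assuming a hypothetical `cleanup_sessions` coroutine and the Redis host used earlier:

```python
from arq import cron
from arq.connections import RedisSettings

async def cleanup_sessions(ctx: dict) -> int:
    """Hypothetical periodic cleanup using the shared DB pool."""
    return await ctx["db"].delete_expired_sessions()

class WorkerSettings:
    redis_settings = RedisSettings(host="redis", port=6379)
    cron_jobs = [
        cron(cleanup_sessions, hour={0, 6, 12, 18}, minute=0),  # every 6 hours
    ]
```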

## FastAPI Integration

```python
from fastapi import BackgroundTasks

@router.post("/api/v1/users")
async def create_user(
    data: UserCreate,
    background_tasks: BackgroundTasks,
):
    user = await service.create_user(data)

    # Simple background task (in-process)
    background_tasks.add_task(send_welcome_email, user.email)

    return user

# For distributed tasks, use ARQ/Celery
@router.post("/api/v1/exports")
async def create_export(
    data: ExportRequest,
    arq: ArqRedis = Depends(get_arq_pool),
):
    job = await arq.enqueue_job("export_data", data.model_dump())  # .dict() on Pydantic v1
    return {"job_id": job.job_id}
```

## Job Status Tracking

```python
from enum import Enum

from celery.result import AsyncResult

class JobStatus(Enum):
    PENDING = "pending"
    STARTED = "started"
    PROGRESS = "progress"
    SUCCESS = "success"
    FAILURE = "failure"
    REVOKED = "revoked"

@router.get("/api/v1/jobs/{job_id}")
async def get_job(job_id: str):
    # Celery
    result = AsyncResult(job_id, app=celery_app)
    return {
        "job_id": job_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
        "progress": result.info if result.status == "PROGRESS" else None,
    }
```

## Anti-Patterns (FORBIDDEN)

```python
# NEVER run long tasks synchronously
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest):
    pdf = await generate_pdf(data)  # Blocks for minutes!
    return pdf

# NEVER lose jobs on failure
@shared_task
def risky_task():
    do_work()  # No retry, no error handling

# NEVER store large results in Redis
@shared_task
def process_file(file_id: str) -> bytes:
    return large_file_bytes  # Store in S3/DB instead!

# NEVER use BackgroundTasks for distributed work
background_tasks.add_task(long_running_job)  # Lost if server restarts
```
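
The fix for the large-result case is to write the artifact to object storage and return only a reference. A minimal sketch with boto3, assuming a hypothetical `my-reports-bucket` and `render_pdf_for` helper:

```python
import boto3
from celery import shared_task

s3 = boto3.client("s3")

@shared_task
def process_file(file_id: str) -> dict:
    """Render the file, upload it to S3, and return only a small reference."""
    pdf_bytes = render_pdf_for(file_id)  # hypothetical application helper
    key = f"reports/{file_id}.pdf"
    s3.put_object(Bucket="my-reports-bucket", Key=key, Body=pdf_bytes)
    # Only the pointer passes through the Celery result backend.
    return {"file_id": file_id, "s3_key": key, "size": len(pdf_bytes)}
```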

## Key Decisions

| Decision | Recommendation |
|----------|----------------|
| Simple async jobs | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| Quick in-process work | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |
| Result storage | Redis for status, S3/DB for data |
| Retry strategy | Exponential backoff with jitter |

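The retry-strategy row maps directly onto Celery task options. A minimal sketch, assuming a hypothetical `sync_external_data` task and `pull_and_store` helper:

```python
from celery import shared_task

@shared_task(
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,        # exponential delays: 1s, 2s, 4s, 8s, ...
    retry_backoff_max=600,     # cap each delay at 10 minutes
    retry_jitter=True,         # randomize delays to avoid thundering herds
    retry_kwargs={"max_retries": 5},
)
def sync_external_data(source_id: str) -> dict:
    """Sync one external source; transient network errors are retried automatically."""
    return pull_and_store(source_id)  # hypothetical application helper
```
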
## Related Skills

- `langgraph-checkpoints` - LLM workflow persistence
- `resilience-patterns` - Retry and fallback
- `observability-monitoring` - Job metrics

## Capability Details

### arq-tasks
**Keywords:** arq, async queue, redis queue, background task
**Solves:**
- How to run async background tasks in FastAPI?
- Simple Redis job queue

### celery-tasks
**Keywords:** celery, task queue, distributed tasks, worker
**Solves:**
- Enterprise task queue
- Complex job workflows

### celery-workflows
**Keywords:** chain, group, chord, celery workflow
**Solves:**
- Sequential task execution
- Parallel task processing

### periodic-tasks
**Keywords:** periodic, scheduled, cron, celery beat
**Solves:**
- Run tasks on schedule
- Cron-like job scheduling

Overview

This skill provides production-ready patterns for implementing background job processing in Python backends using ARQ and Celery with Redis. It explains when to pick lightweight async queues versus enterprise-grade workers, and includes examples for FastAPI integration, job status tracking, and periodic scheduling. The guidance focuses on reliable retries, result storage decisions, and anti-patterns to avoid lost or blocking work.

How this skill works

The skill inspects common background job needs—long-running tasks, email delivery, data pipelines, webhook processing, and scheduled jobs—and maps them to ARQ or Celery depending on complexity. It shows worker setup, task definitions, enqueueing from FastAPI, and monitoring job status using Redis and Celery backends. It also documents chains, groups, and chords for complex workflows and provides examples for periodic tasks and resource cleanup.

When to use it

  • Offload long-running operations (report generation, exports) to avoid blocking requests
  • Send emails, push notifications, or process webhooks asynchronously
  • Run scheduled or periodic maintenance jobs (cron-style)
  • Implement complex multi-step workflows with dependencies and parallelism
  • Use a lightweight async queue (ARQ) for FastAPI-native async tasks

Best practices

  • Choose ARQ for native async FastAPI workloads and simple Redis-backed jobs
  • Use Celery for enterprise workflows that need chains, chords, and advanced retry semantics
  • Store large artifacts (files, PDFs) in S3 or a database; keep Redis for status and small metadata
  • Use exponential backoff with jitter for retries and set sensible soft/hard time limits
  • Enable task_acks_late and task_reject_on_worker_lost so jobs are re-queued rather than lost when a worker dies

Example use cases

  • Enqueue PDF report generation from a FastAPI endpoint and return job_id for polling
  • Send transactional emails with automatic retries and failure logging using Celery
  • Run parallel data-processing chunks with a Celery group and aggregate results with a chord
  • Schedule nightly data sync or cleanup jobs with Celery Beat crontab entries
  • Use ARQ worker context for shared resources (DB pools, HTTP clients) and limit max_jobs/job_timeout

FAQ

When should I prefer ARQ over Celery?

Use ARQ when your app is async-first (FastAPI), needs a low-complexity Redis queue, and you want native async task functions with minimal configuration.

Where should I store large task results?

Avoid storing big blobs in Redis. Persist files and large outputs to S3 or a database and store only references and status in Redis or the task backend.