postgres-job-queue skill

This skill helps you manage a PostgreSQL-based job queue with priority, batch claiming, and progress tracking for reliable background work.

npx playbooks add skill openclaw/skills --skill postgres-job-queue

---
name: postgres-job-queue
model: standard
description: PostgreSQL-based job queue with priority scheduling, batch claiming, and progress tracking. Use when building job queues without external dependencies. Triggers on PostgreSQL job queue, background jobs, task queue, priority queue, SKIP LOCKED.
---

# PostgreSQL Job Queue

Production-ready job queue using PostgreSQL with priority scheduling, batch claiming, and progress tracking.

---

## When to Use

- Need job queue but want to avoid Redis/RabbitMQ dependencies
- Jobs need priority-based scheduling
- Long-running jobs need progress visibility
- Jobs should survive service restarts

---

## Schema Design

```sql
CREATE TABLE jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_type VARCHAR(50) NOT NULL,
    priority INT NOT NULL DEFAULT 100,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    data JSONB NOT NULL DEFAULT '{}',
    
    -- Progress tracking
    progress INT DEFAULT 0,
    current_stage VARCHAR(100),
    events_count INT DEFAULT 0,
    
    -- Worker tracking
    worker_id VARCHAR(100),
    claimed_at TIMESTAMPTZ,
    
    -- Timing
    created_at TIMESTAMPTZ DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    
    -- Retry handling
    attempts INT DEFAULT 0,
    max_attempts INT DEFAULT 3,
    last_error TEXT,
    
    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'claimed', 'running', 'completed', 'failed', 'cancelled')
    )
);

-- Critical: Partial index for fast claiming
CREATE INDEX idx_jobs_claimable ON jobs (priority DESC, created_at ASC) 
    WHERE status = 'pending';
CREATE INDEX idx_jobs_worker ON jobs (worker_id) 
    WHERE status IN ('claimed', 'running');
```

---

## Batch Claiming with SKIP LOCKED

```sql
CREATE OR REPLACE FUNCTION claim_job_batch(
    p_worker_id VARCHAR(100),
    p_job_types VARCHAR(50)[],
    p_batch_size INT DEFAULT 10
) RETURNS SETOF jobs AS $$
BEGIN
    RETURN QUERY
    WITH claimable AS (
        SELECT id
        FROM jobs
        WHERE status = 'pending'
          AND job_type = ANY(p_job_types)
          AND attempts < max_attempts
        ORDER BY priority DESC, created_at ASC
        LIMIT p_batch_size
        FOR UPDATE SKIP LOCKED  -- Critical: skip locked rows
    ),
    claimed AS (
        UPDATE jobs
        SET status = 'claimed',
            worker_id = p_worker_id,
            claimed_at = NOW(),
            attempts = attempts + 1
        WHERE id IN (SELECT id FROM claimable)
        RETURNING *
    )
    SELECT * FROM claimed;
END;
$$ LANGUAGE plpgsql;
```
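
To make the selection rules in `claim_job_batch` concrete, here is a minimal in-memory sketch in Go of which rows a single, uncontended call would pick. It only models the `WHERE`, `ORDER BY`, and `LIMIT` clauses; the `job` struct, `claimOrder`, and the sample rows are illustrative names that are not part of this skill, and no locking is modeled.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// job holds only the columns that claim_job_batch filters and sorts on.
type job struct {
	ID          string
	Type        string
	Priority    int
	Status      string
	Attempts    int
	MaxAttempts int
	CreatedAt   time.Time
}

// claimOrder returns the IDs one uncontended claim would select:
// pending rows of a wanted type with attempts left, highest priority
// first, oldest first within a priority, capped at batchSize.
func claimOrder(jobs []job, types []string, batchSize int) []string {
	wanted := make(map[string]bool, len(types))
	for _, t := range types {
		wanted[t] = true
	}

	var eligible []job
	for _, j := range jobs {
		if j.Status == "pending" && wanted[j.Type] && j.Attempts < j.MaxAttempts {
			eligible = append(eligible, j)
		}
	}

	// ORDER BY priority DESC, created_at ASC
	sort.SliceStable(eligible, func(a, b int) bool {
		if eligible[a].Priority != eligible[b].Priority {
			return eligible[a].Priority > eligible[b].Priority
		}
		return eligible[a].CreatedAt.Before(eligible[b].CreatedAt)
	})

	// LIMIT p_batch_size
	if len(eligible) > batchSize {
		eligible = eligible[:batchSize]
	}

	ids := make([]string, 0, len(eligible))
	for _, j := range eligible {
		ids = append(ids, j.ID)
	}
	return ids
}

// sampleJobs builds made-up rows; field order matches the job struct.
func sampleJobs() []job {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return []job{
		{"backfill", "sync", 30, "pending", 0, 3, t0},
		{"discovered-old", "sync", 100, "pending", 0, 3, t0.Add(1 * time.Minute)},
		{"explicit", "sync", 150, "pending", 1, 3, t0.Add(2 * time.Minute)},
		{"discovered-new", "sync", 100, "pending", 0, 3, t0.Add(3 * time.Minute)},
		{"exhausted", "sync", 150, "pending", 3, 3, t0},  // no attempts left
		{"other-type", "email", 150, "pending", 0, 3, t0}, // type not requested
		{"in-flight", "sync", 150, "running", 1, 3, t0},   // not pending
	}
}

func main() {
	fmt.Println(claimOrder(sampleJobs(), []string{"sync"}, 3))
}
```

Note that the SQL function returns rows from the `UPDATE ... RETURNING` clause, so callers that care about processing order within a batch may want to sort the returned jobs themselves.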

---

## Go Implementation

```go
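import (
    "context"

    "github.com/google/uuid"
    "github.com/jackc/pgx/v5/pgxpool" // assumes pgx v5
)

// Priority levels: higher values are claimed first.
const (
    PriorityExplicit   = 150 // User-requested
    PriorityDiscovered = 100 // System-discovered
    PriorityBackfill   = 30  // Background backfills
)

// Job carries the claimed columns a handler needs. The skill does not define
// this type, so the field set here is an assumption; extend it as required.
type Job struct {
    ID       uuid.UUID
    Type     string
    Priority int
    Data     []byte // raw JSONB payload
    Attempts int
}

type JobQueue struct {
    db       *pgxpool.Pool // the pool type lives in pgxpool, not in pgx
    workerID string
}

func (q *JobQueue) Claim(ctx context.Context, types []string, batchSize int) ([]Job, error) {
    // Name the columns explicitly so the Scan destinations match the result set.
    rows, err := q.db.Query(ctx,
        `SELECT id, job_type, priority, data, attempts
         FROM claim_job_batch($1, $2, $3)`,
        q.workerID, types, batchSize,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var jobs []Job
    for rows.Next() {
        var job Job
        // Scan fills one destination per column; it cannot fill a whole struct.
        if err := rows.Scan(&job.ID, &job.Type, &job.Priority, &job.Data, &job.Attempts); err != nil {
            return nil, err
        }
        jobs = append(jobs, job)
    }
    // Report errors that ended iteration early.
    if err := rows.Err(); err != nil {
        return nil, err
    }
    return jobs, nil
}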

func (q *JobQueue) Complete(ctx context.Context, jobID uuid.UUID) error {
    _, err := q.db.Exec(ctx, `
        UPDATE jobs 
        SET status = 'completed',
            progress = 100,
            completed_at = NOW()
        WHERE id = $1`,
        jobID,
    )
    return err
}

func (q *JobQueue) Fail(ctx context.Context, jobID uuid.UUID, errMsg string) error {
    _, err := q.db.Exec(ctx, `
        UPDATE jobs 
        SET status = CASE 
                WHEN attempts >= max_attempts THEN 'failed' 
                ELSE 'pending' 
            END,
            last_error = $2,
            worker_id = NULL,
            claimed_at = NULL
        WHERE id = $1`,
        jobID, errMsg,
    )
    return err
}
```
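
The methods above still need a loop that drives them. The following is a minimal sketch of one claim-and-settle pass, written against a small interface so it can run without a database. The `Queue` interface, `Handler` type, `ProcessBatch`, and the in-memory `memQueue` are hypothetical names introduced for illustration, and job IDs are plain strings here to keep the example stdlib-only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Job is trimmed to what a handler needs; the real row has more columns.
type Job struct {
	ID   string
	Type string
}

// Queue is the subset of queue operations the worker depends on
// (hypothetical interface mirroring Claim, Complete, and Fail).
type Queue interface {
	Claim(ctx context.Context, types []string, batchSize int) ([]Job, error)
	Complete(ctx context.Context, jobID string) error
	Fail(ctx context.Context, jobID string, errMsg string) error
}

// Handler runs the actual work for one job.
type Handler func(ctx context.Context, job Job) error

// ProcessBatch claims one batch and settles every job in it: handler
// success calls Complete, handler failure calls Fail with the error text.
func ProcessBatch(ctx context.Context, q Queue, types []string, batchSize int, handle Handler) (int, int, error) {
	completed, failed := 0, 0
	jobs, err := q.Claim(ctx, types, batchSize)
	if err != nil {
		return 0, 0, fmt.Errorf("claim: %w", err)
	}
	for _, job := range jobs {
		if herr := handle(ctx, job); herr != nil {
			if err := q.Fail(ctx, job.ID, herr.Error()); err != nil {
				return completed, failed, fmt.Errorf("fail %s: %w", job.ID, err)
			}
			failed++
			continue
		}
		if err := q.Complete(ctx, job.ID); err != nil {
			return completed, failed, fmt.Errorf("complete %s: %w", job.ID, err)
		}
		completed++
	}
	return completed, failed, nil
}

// memQueue is an in-memory stand-in for the PostgreSQL-backed queue.
type memQueue struct {
	pending []Job
	status  map[string]string
}

func (m *memQueue) Claim(_ context.Context, _ []string, n int) ([]Job, error) {
	if n > len(m.pending) {
		n = len(m.pending)
	}
	batch := m.pending[:n]
	m.pending = m.pending[n:]
	return batch, nil
}

func (m *memQueue) Complete(_ context.Context, id string) error {
	m.status[id] = "completed"
	return nil
}

func (m *memQueue) Fail(_ context.Context, id, _ string) error {
	m.status[id] = "pending" // attempts remain in this demo
	return nil
}

// runDemo processes three jobs, one of which fails in the handler.
func runDemo() (int, int) {
	q := &memQueue{
		pending: []Job{{"a", "resize"}, {"b", "resize"}, {"c", "resize"}},
		status:  map[string]string{},
	}
	handle := func(_ context.Context, j Job) error {
		if j.ID == "b" {
			return errors.New("boom")
		}
		return nil
	}
	completed, failed, err := ProcessBatch(context.Background(), q, []string{"resize"}, 10, handle)
	if err != nil {
		panic(err)
	}
	return completed, failed
}

func main() {
	completed, failed := runDemo()
	fmt.Printf("completed=%d failed=%d\n", completed, failed)
}
```

A real worker would call `ProcessBatch` in a loop and sleep briefly when a claim returns no jobs, since PostgreSQL does not push new rows to pollers.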

---

## Stale Job Recovery

```go
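// RecoverStaleJobs releases jobs whose claim is older than timeout.
// Jobs with attempts left go back to 'pending'; exhausted jobs become 'failed'
// so they do not stay in 'claimed' or 'running' forever.
func (q *JobQueue) RecoverStaleJobs(ctx context.Context, timeout time.Duration) (int, error) {
    result, err := q.db.Exec(ctx, `
        UPDATE jobs
        SET status = CASE
                WHEN attempts >= max_attempts THEN 'failed'
                ELSE 'pending'
            END,
            worker_id = NULL,
            claimed_at = NULL
        WHERE status IN ('claimed', 'running')
          AND claimed_at < NOW() - make_interval(secs => $1)`,
        timeout.Seconds(), // float64 seconds; avoids relying on PostgreSQL parsing Go's "5m0s" format
    )
    if err != nil {
        return 0, err
    }
    return int(result.RowsAffected()), nil
}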
```
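
Recovery only helps if something calls it on a schedule. Below is a minimal sketch of a periodic runner built on `time.Ticker`; the `StaleRecoverer` interface, `RunRecovery`, and the fake used in the demo are hypothetical names for illustration, and the interval and timeout values are arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// StaleRecoverer is the one method the runner needs (hypothetical
// interface matching the RecoverStaleJobs signature).
type StaleRecoverer interface {
	RecoverStaleJobs(ctx context.Context, timeout time.Duration) (int, error)
}

// RunRecovery calls RecoverStaleJobs every `every` until ctx is cancelled
// and returns the total number of jobs it released.
func RunRecovery(ctx context.Context, r StaleRecoverer, every, timeout time.Duration) int {
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	total := 0
	for {
		select {
		case <-ctx.Done():
			return total
		case <-ticker.C:
			// select picks randomly when both channels are ready,
			// so re-check cancellation before doing more work.
			if ctx.Err() != nil {
				return total
			}
			n, err := r.RecoverStaleJobs(ctx, timeout)
			if err != nil {
				log.Printf("stale job recovery failed: %v", err)
				continue // keep the loop alive; try again next tick
			}
			total += n
		}
	}
}

// fakeRecoverer pretends to release two jobs per call and cancels the
// context on its stopAfter-th call so the demo terminates.
type fakeRecoverer struct {
	calls     int
	stopAfter int
	cancel    context.CancelFunc
}

func (f *fakeRecoverer) RecoverStaleJobs(context.Context, time.Duration) (int, error) {
	f.calls++
	if f.calls == f.stopAfter {
		f.cancel()
	}
	return 2, nil
}

// runDemo runs the loop for three ticks and returns the released total.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	r := &fakeRecoverer{stopAfter: 3, cancel: cancel}
	return RunRecovery(ctx, r, time.Millisecond, 5*time.Minute)
}

func main() {
	fmt.Println("jobs released:", runDemo())
}
```

The timeout should comfortably exceed the longest expected job duration, because the recovery query keys off `claimed_at` and would otherwise release jobs that are still being worked on.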

---

## Decision Tree

| Scenario | Approach |
|----------|----------|
| Need guaranteed delivery | PostgreSQL queue |
| Need sub-ms latency | Use Redis instead |
| < 1000 jobs/sec | PostgreSQL is fine |
| > 10000 jobs/sec | Add Redis layer |
| Need strict ordering | Single worker per type |

---

## Related Skills

- **Related:** [service-layer-architecture](../service-layer-architecture/) — Service patterns for job handlers
- **Related:** [realtime/dual-stream-architecture](../../realtime/dual-stream-architecture/) — Event publishing from jobs

---

## NEVER Do

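- **NEVER use SELECT then UPDATE as separate statements**: two workers can read the same pending row before either updates it. Claim in a single statement with FOR UPDATE SKIP LOCKED.
- **NEVER claim without SKIP LOCKED**: with plain FOR UPDATE, workers wait on each other's row locks, so claiming serializes and lock-order conflicts can deadlock.
- **NEVER store large payloads**: keep `data` small and store references to the payload instead.
- **NEVER forget the partial index**: without it, the claim query has to filter and sort through non-pending rows as the table grows.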
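
As an illustration of the "references only" rule, here is a minimal sketch of building a job's `data` value that points at a blob stored elsewhere and rejects oversized payloads. The `MediaJobData` type, its field names, the example URL, and the 8 KiB budget are all assumptions chosen for the example, not values defined by this skill.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxDataBytes is an assumed budget for the JSONB data column.
const maxDataBytes = 8 * 1024

// MediaJobData references the media file instead of embedding it.
type MediaJobData struct {
	ObjectURL string `json:"object_url"` // where the payload actually lives
	SizeBytes int64  `json:"size_bytes"` // size of the referenced object
}

// EncodeJobData marshals v for the data column and rejects values that
// exceed the budget, which usually means a payload was embedded.
func EncodeJobData(v any) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", fmt.Errorf("encode job data: %w", err)
	}
	if len(b) > maxDataBytes {
		return "", fmt.Errorf("job data is %d bytes, limit is %d: store a reference instead", len(b), maxDataBytes)
	}
	return string(b), nil
}

func main() {
	data, err := EncodeJobData(MediaJobData{
		ObjectURL: "s3://media-bucket/uploads/video.mp4",
		SizeBytes: 734003200,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(data)
}
```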

Overview

This skill implements a production-ready PostgreSQL-based job queue with priority scheduling, batch claiming using SKIP LOCKED, and progress tracking. It is designed for services that prefer to avoid external queue brokers while retaining reliable delivery and visibility into long-running tasks. The implementation includes schema design, claim/complete/fail operations, stale-job recovery, and indexing guidance for performance.

How this skill works

Jobs are stored in a jobs table with fields for priority, status, progress, worker ownership, attempts, and timestamps. Workers claim batches via a PL/pgSQL function that selects pending jobs ordered by priority (highest first) and then created_at (oldest first), using FOR UPDATE SKIP LOCKED so that concurrent workers never claim the same row. The claim function increments attempts; workers then mark each job completed, or report a failure that returns it to pending until max_attempts is reached, at which point it is marked failed. A recovery routine resets stale claimed or running jobs to pending when a worker times out.
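
The lifecycle described above can be summarized as a small state machine. The sketch below is one reading of it in Go, inferred from the claim, complete, fail, and recovery operations; the transition table (in particular the transitions into cancelled) is an assumption, since the skill itself only constrains the set of status values.

```go
package main

import "fmt"

// Status mirrors the values allowed by the valid_status CHECK constraint.
type Status string

const (
	Pending   Status = "pending"
	Claimed   Status = "claimed"
	Running   Status = "running"
	Completed Status = "completed"
	Failed    Status = "failed"
	Cancelled Status = "cancelled"
)

// transitions lists the moves implied by the queue operations.
// Assumption: cancellation is allowed from any non-terminal state.
var transitions = map[Status][]Status{
	Pending: {Claimed, Cancelled},                            // claim
	Claimed: {Running, Completed, Pending, Failed, Cancelled}, // start, complete, fail, recover
	Running: {Completed, Pending, Failed, Cancelled},          // complete, fail, recover
	// Completed, Failed, and Cancelled are terminal: no outgoing moves.
}

// CanTransition reports whether a job may move from one status to another.
func CanTransition(from, to Status) bool {
	for _, next := range transitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(Pending, Claimed))   // a worker claims the job
	fmt.Println(CanTransition(Running, Pending))   // failure or stale recovery with attempts left
	fmt.Println(CanTransition(Completed, Pending)) // terminal jobs are never re-queued
}
```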

When to use it

  • You need a job queue but want to avoid Redis or RabbitMQ dependencies.
  • Jobs require priority-based scheduling (strict ordering additionally needs a single worker per job type).
  • Long-running tasks need progress and stage visibility.
  • Jobs must survive service restarts and be recoverable.
  • Workload is moderate (under ~1000 jobs/sec) where Postgres is sufficient.

Best practices

  • Use a partial index on pending jobs (priority DESC, created_at ASC) to make claiming fast.
  • Always claim rows with FOR UPDATE SKIP LOCKED so workers neither double-claim a job nor wait on each other's row locks.
  • Store large payloads outside the jobs table and keep job data lightweight (store references).
  • Increment attempts when claiming and honor max_attempts to avoid infinite retries.
  • Implement stale-job recovery to reset claimed jobs after a timeout rather than leaving them stuck in the claimed or running state.

Example use cases

  • Background processing for web services where adding Redis is undesirable.
  • Priority-driven task runners (e.g., user-triggered vs. background backfills).
  • Long-running media processing jobs that report progress and stages.
  • Service architectures that require guaranteed delivery with transactional visibility.
  • Small-to-medium scale batch processing where single-node Postgres is available.

FAQ

How do workers avoid claiming the same job concurrently?

The claim function uses SELECT ... FOR UPDATE SKIP LOCKED in a CTE so locked rows are skipped and only one worker can claim each job.
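
For intuition, the skip-instead-of-wait behavior can be imitated in-process. The sketch below is an analogy only, not PostgreSQL: each row gets a `sync.Mutex` standing in for its row lock, and `TryLock` (Go 1.18+) plays the role of SKIP LOCKED. The `row` type, `claimBatch`, and `simulate` are hypothetical names for this demonstration.

```go
package main

import (
	"fmt"
	"sync"
)

// row stands in for one jobs row; mu plays the role of its row lock.
type row struct {
	mu     sync.Mutex
	status string
	claims int // how many times this row was claimed
}

// claimBatch imitates one claim transaction: walk rows in order, skip any
// row another worker holds, lock up to batchSize pending rows, mark them
// claimed, then release the locks (the "commit").
func claimBatch(rows []*row, batchSize int) int {
	var locked []*row
	for _, r := range rows {
		if len(locked) == batchSize {
			break
		}
		if !r.mu.TryLock() {
			continue // held by another worker: skip, do not wait
		}
		if r.status != "pending" {
			r.mu.Unlock()
			continue
		}
		locked = append(locked, r)
	}
	for _, r := range locked {
		r.status = "claimed"
		r.claims++
	}
	for _, r := range locked {
		r.mu.Unlock()
	}
	return len(locked)
}

// simulate runs concurrent workers until nothing is claimable and returns
// the total number of claims and the most times any one row was claimed.
func simulate(workers, jobs, batchSize int) (int, int) {
	rows := make([]*row, jobs)
	for i := range rows {
		rows[i] = &row{status: "pending"}
	}

	counts := make([]int, workers) // one slot per worker, no sharing
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for {
				n := claimBatch(rows, batchSize)
				if n == 0 {
					return
				}
				counts[w] += n
			}
		}(w)
	}
	wg.Wait()

	total, maxClaims := 0, 0
	for _, c := range counts {
		total += c
	}
	for _, r := range rows {
		if r.claims > maxClaims {
			maxClaims = r.claims
		}
	}
	return total, maxClaims
}

func main() {
	total, maxClaims := simulate(4, 20, 3)
	fmt.Printf("claims=%d max claims per job=%d\n", total, maxClaims)
}
```

Every job ends up claimed exactly once, and no worker ever blocks on a row another worker is in the middle of claiming.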

What happens when a worker crashes while processing a job?

A recovery routine resets jobs that have been claimed or running for longer than a configured timeout, and that still have attempts remaining, back to pending and clears worker_id so they can be reclaimed.