Skip to content

Sachini066/pgqueue

Repository files navigation

pgqueue

NestJS PostgreSQL TypeScript License: MIT

Lightweight PostgreSQL-backed job queue. No Redis or RabbitMQ needed - just your existing Postgres database. Uses FOR UPDATE SKIP LOCKED for safe concurrent processing.

Why pgqueue?

Most applications already have PostgreSQL. Adding Redis or RabbitMQ just for background jobs means more infrastructure to manage. pgqueue gives you reliable job processing using only Postgres, with features you'd expect from dedicated queue systems.

Features

  • SKIP LOCKED - Concurrent workers safely dequeue without conflicts
  • Priority Levels - Critical jobs processed first (0-20 scale)
  • Delayed/Scheduled Jobs - Process at a specific time or after a delay
  • Retry with Exponential Backoff - Automatic retries: 2s, 4s, 8s, 16s...
  • Dead Letter Queue - Failed jobs preserved for inspection and manual retry
  • Queue Statistics - Real-time counts by queue and status
  • REST API + Swagger - Full CRUD for job management
  • Docker Ready - docker-compose with PostgreSQL included

Quick Start

git clone https://github.com/Sachini066/pgqueue.git
cd pgqueue
cp .env.example .env
docker compose up -d

API at http://localhost:3000/docs

How It Works

The SKIP LOCKED Pattern

-- Multiple workers can run this concurrently without conflicts
UPDATE jobs SET status = 'running'
WHERE id = (
  SELECT id FROM jobs
  WHERE queue = 'emails' AND status = 'pending' AND run_at <= NOW()
  ORDER BY priority DESC, run_at ASC
  FOR UPDATE SKIP LOCKED  -- Skip rows locked by other workers
  LIMIT 1
)
RETURNING *;

This single query atomically selects AND claims the next job, while other workers skip already-claimed rows.

Job Lifecycle

pending → running → completed
                  ↘ failed → pending (retry with backoff)
                            ↘ dead (max retries exceeded)

API Endpoints

Method Endpoint Description
POST /jobs Create a new job
GET /jobs/:id Get job by ID
GET /jobs?queue=name Get queue statistics
GET /jobs/dead-letter/list List dead letter jobs
POST /jobs/dead-letter/:id/retry Retry a dead job
POST /jobs/maintenance/purge Purge old completed jobs

Create a Job

curl -X POST http://localhost:3000/jobs \
  -H 'Content-Type: application/json' \
  -d '{
    "queue": "email-notifications",
    "payload": { "to": "user@example.com", "subject": "Welcome" },
    "priority": 10,
    "maxRetries": 5
  }'

Create a Delayed Job

curl -X POST http://localhost:3000/jobs \
  -H 'Content-Type: application/json' \
  -d '{
    "queue": "reminders",
    "payload": { "message": "Follow up" },
    "delayMs": 3600000
  }'

Register Job Handlers

// In your module or main.ts
const queueService = app.get(QueueService);

queueService.registerHandler('email-notifications', async (job) => {
  const { to, subject } = job.payload;
  await sendEmail(to, subject);
});

queueService.registerHandler('image-processing', async (job) => {
  await resizeImage(job.payload.imageUrl);
}, 2000); // Poll every 2 seconds

Project Structure

src/
├── queue/
│   ├── dto/              # Input validation
│   ├── interfaces/       # Job types and interfaces
│   ├── queue.controller.ts   # REST endpoints
│   ├── queue.module.ts
│   ├── queue.repository.ts   # Raw SQL with SKIP LOCKED
│   ├── queue.service.ts      # Business logic
│   └── queue.worker.ts       # Polling and job execution
├── app.module.ts
└── main.ts

Testing

# Start test database
docker compose up -d db

# Run tests
npm test

Environment Variables

Variable Default Description
DB_HOST localhost PostgreSQL host
DB_PORT 5432 PostgreSQL port
DB_USERNAME postgres Database user
DB_PASSWORD postgres Database password
DB_NAME pgqueue Database name
PORT 3000 Application port

License

MIT

About

Lightweight PostgreSQL-backed job queue using SKIP LOCKED for concurrent worker safety

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors