
Reproq Django πŸš€


Deterministic Background Tasks for Django 6.0+ powered by Go.

Reproq is a production-grade tasks backend that combines the ease of Django with the performance and reliability of the Reproq Worker, a high-performance execution engine written in Go.


🀝 Relationship with Reproq Worker

Reproq is split into two specialized components:

  1. Reproq Django (this repo): Task definition, enqueueing, results, and the Admin dashboard.
  2. Reproq Worker: A Go binary that claims and executes tasks from Postgres.

Key Features

  • Postgres-Only: Uses SKIP LOCKED for high-performance claiming.
  • Deterministic: Each task has a spec_hash for deduplication.
  • Django Native: Implements the Django 6.0 Tasks API.
  • Periodic Tasks: Built-in scheduler with Django models.
  • Monitoring: TaskRuns + Workers in Django Admin.

βœ… Compatibility

  • Django: 6.x (Django>=6.0,<7.0)
  • Python: 3.12+ (dev/CI uses 3.12.x)
  • Reproq Worker: Latest release (install/upgrade via python manage.py reproq install / upgrade)

Use python manage.py reproq doctor to verify schema, worker binary, and DSN alignment.


⚑ Quickstart

1. Install

uv pip install reproq-django
# or: pip install reproq-django

2. Configure Settings

Add reproq_django to your INSTALLED_APPS and configure the TASKS backend:

INSTALLED_APPS = [
    ...,
    "reproq_django",
]

TASKS = {
    "default": {
        "BACKEND": "reproq_django.backend.ReproqBackend",
        "OPTIONS": {
            "DEDUP_ACTIVE": True,
            "TIMEOUT_SECONDS": 900,
            "MAX_ATTEMPTS": 3,
        }
    }
}

3. Define a Task

Use the Django 6.0 Tasks API with a @task decorator:

from django.tasks import task

@task(queue_name="default", priority=0)
def send_welcome_email(user_id: int) -> str:
    # business logic here
    return f"Email sent to {user_id}"

4. Bootstrap (Recommended)

Bootstrap writes a config file, installs the worker binary, and runs both migration steps:

python manage.py reproq init

Use --format toml, --skip-install, --skip-migrate, or --skip-worker-migrate if you need a lighter touch.

5. Install the Worker (Standalone)

If you want only the worker binary:

python manage.py reproq install

This command detects your OS and architecture and fetches the correct pre-built binary from GitHub. No Go installation required!

6. Run Migrations

python manage.py reproq migrate-worker
python manage.py migrate

Note: migrate-worker applies necessary Postgres optimizations (indexes, extensions) that Django migrations cannot handle. It also backfills task_path in batches and ensures the task_runs_task_path_not_empty check exists; new installs validate it immediately, while older installs can validate it later if desired.

7. Start the Worker

python manage.py reproq worker

8. (Optional) Schedule Periodic Tasks

You have two options (choose one):

Option A: Run Beat (one process per database)

python manage.py reproq beat

Option B: Use Postgres-native scheduling (pg_cron)

python manage.py reproq pg-cron --install

pg_cron requires the Postgres extension to be enabled (see docs/deployment.md).


🧰 Management Commands

Run python manage.py reproq <subcommand> to manage the worker and day-to-day ops. Full reference (examples + exit codes): docs/cli.md.

  • Bootstrap: init writes reproq.yaml/reproq.toml, installs the worker, and runs migrations.
  • Config: config --explain prints the effective config and its precedence.
  • Doctor: doctor --strict validates DSN, schema, worker binary, and allowlist.
  • Upgrade: upgrade fetches the latest worker release and optionally runs migrate-worker.
  • Allowlist: allowlist --write --config reproq.yaml populates allowed_task_modules.
  • Ops: status/stats, logs --id <result_id>, cancel --id <result_id>.

βœ… Why Reproq (vs Celery, RQ, Huey)

Reproq is built for teams who want deterministic background tasks without adding Redis or RabbitMQ.

  • No extra broker: Uses Postgres only; no Redis/RabbitMQ to provision.
  • Deterministic deduping: Identical tasks can be coalesced safely.
  • Django-native: Implements the Django 6.0 Tasks API end-to-end.
  • Operationally lean: One database + one Go worker binary.

If you need complex routing, multi-broker support, or huge existing Celery ecosystems, Celery may still fit better. Reproq prioritizes clarity, determinism, and low operational overhead.


πŸ“š API Reference

Reproq fully implements the Django 6.0 Tasks API.

Defining Tasks

Use the standard @task decorator. Reproq respects queue_name and priority.

from django.tasks import task

@task(queue_name="high-priority", priority=100)
def send_welcome_email(user_id):
    # logic here
    return f"Email sent to {user_id}"

You can also annotate a task with a concurrency limit:

from reproq_django.concurrency import limits_concurrency

@limits_concurrency(lambda user_id: f"user:{user_id}", to=1)
@task(queue_name="high-priority", priority=100)
def send_welcome_email(user_id):
    return f"Email sent to {user_id}"

Enqueuing Tasks

Use .enqueue() to dispatch tasks. Reproq supports additional arguments via kwargs.

# Standard Enqueue
result = send_welcome_email.enqueue(123)

# Scheduled Execution (run_after)
from datetime import timedelta
result = send_welcome_email.using(run_after=timedelta(minutes=10)).enqueue(123)

# Concurrency Control (lock_key)
# Ensure only one task with this key runs at a time
result = send_welcome_email.enqueue(123, lock_key="user_123_sync")

# Concurrency Control (limit per key)
# Allow up to N concurrent tasks sharing a key
result = send_welcome_email.enqueue(
    123,
    concurrency_key="user:123",
    concurrency_limit=2,
)

Supported enqueue kwargs (Reproq extensions):

  • run_after: datetime or timedelta. Delays execution.
  • lock_key: str. Prevents multiple tasks with the same key from being in RUNNING state simultaneously.
  • priority: int. Overrides the task's default priority for this enqueue only.
  • concurrency_key: str or callable. Limits concurrent tasks sharing the key.
  • concurrency_limit: int. Maximum concurrent tasks for the key (0 disables).

Reserved kwargs: run_after, lock_key, priority, concurrency_key, and concurrency_limit are treated as scheduling metadata and are removed from task kwargs. If your task needs parameters with these names, rename them.

Note on Priority: Task priority is set at definition time via @task(priority=...) and can be overridden per call via enqueue(priority=...).

Async Contexts (ASGI)

If you are in an async view or task producer, use aenqueue() to avoid blocking:

result = await send_welcome_email.aenqueue(123)

In sync (WSGI) code, continue to use enqueue().

Task Context + Metadata

If your task needs context (result id, attempt, metadata), use takes_context=True.

from django.tasks import task

@task(takes_context=True)
def process_upload(context, upload_id):
    context.metadata["stage"] = "starting"
    context.save_metadata()
    # work...
    context.metadata["stage"] = "done"
    context.save_metadata()

Metadata is stored in task_runs.metadata_json and surfaced in task results.

Bulk Enqueuing

For high-throughput scenarios, use bulk_enqueue to insert thousands of tasks in a single query.

from django.tasks import tasks
from datetime import timedelta

backend = tasks["default"]
jobs = []

for i in range(1000):
    # (task_func, args, kwargs)
    jobs.append((
        send_welcome_email,
        (i,),
        {"lock_key": f"user_{i}", "run_after": timedelta(seconds=i)}
    ))

backend.bulk_enqueue(jobs)

⏰ Periodic Tasks

Reproq stores schedules in the PeriodicTask model. You must run exactly one scheduler per database:

  • Beat: python manage.py reproq beat
  • pg_cron: python manage.py reproq pg-cron --install (Postgres-native)

If you choose pg_cron, run reproq pg-cron --install after every deploy or migration to keep schedules in sync. On managed platforms, prefer --if-supported so deploys do not fail if pg_cron is unavailable.

If you cannot run a long-lived beat process (low-memory environments), run the scheduler as a one-shot command from crontab every minute. The two lines below are equivalent (schedule is an alias for beat --once); pick one:

* * * * * /path/to/venv/bin/python manage.py reproq beat --once
* * * * * /path/to/venv/bin/python manage.py reproq schedule

Create a Schedule (Admin or ORM)

You can manage schedules in the Django Admin under "Reproq Django" or via code.

from django.utils import timezone
from reproq_django.models import PeriodicTask

PeriodicTask.objects.update_or_create(
    name="Nightly cleanup",
    defaults={
        "cron_expr": "0 2 * * *",
        "task_path": "myapp.tasks.nightly_cleanup",
        "queue_name": "maintenance",
        "next_run_at": timezone.now(),
        "enabled": True,
    },
)

Seed Schedules in Code (post_migrate)

This pattern keeps schedules in sync across environments without migrations.

from django.apps import AppConfig
from django.db import connections
from django.db.models.signals import post_migrate
from django.utils import timezone
from reproq_django.models import PeriodicTask

def _setup_periodic_tasks(**kwargs):
    using = kwargs.get("using")
    connection = connections[using]
    if "periodic_tasks" not in connection.introspection.table_names():
        return

    PeriodicTask.objects.update_or_create(
        name="Nightly cleanup",
        defaults={
            "cron_expr": "0 2 * * *",
            "task_path": "myapp.tasks.nightly_cleanup",
            "queue_name": "maintenance",
            "next_run_at": timezone.now(),
            "enabled": True,
        },
    )

class MyAppConfig(AppConfig):
    name = "myapp"

    def ready(self) -> None:
        post_migrate.connect(_setup_periodic_tasks, sender=self)

Run Now (Ad Hoc)

Use the ORM to force a schedule to run, or enqueue the task directly.

from django.utils import timezone
from reproq_django.models import PeriodicTask
from myapp.tasks import nightly_cleanup

PeriodicTask.objects.filter(name="Nightly cleanup").update(
    next_run_at=timezone.now()
)

# Or bypass the schedule
nightly_cleanup.enqueue()

Notes

  • cron_expr uses standard 5-field cron syntax: min hour day month weekday.
  • task_path must be the full Python import path for the task.
  • queue_name is optional; when set, the worker must listen on that queue.
  • payload_json should use the {"args": [...], "kwargs": {...}} envelope, with values encoded the same way as task enqueues (e.g., timedeltas/models are supported).
  • Ensure the task module is allowlisted when ALLOWED_TASK_MODULES is used.
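The payload envelope from the notes above can be sketched as a plain helper. This is illustrative only; real enqueues also encode rich types such as timedeltas and model instances:

```python
import json

def build_payload(args=(), kwargs=None):
    # Build the {"args": [...], "kwargs": {...}} envelope for payload_json.
    return json.dumps({"args": list(args), "kwargs": dict(kwargs or {})})

payload = build_payload((42,), {"dry_run": True})
```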

Recurring Tasks in Code

Use the @recurring decorator to keep periodic schedules in source control.

from django.tasks import task
from reproq_django.recurring import recurring

@recurring(schedule="0 9 * * *", key="daily_report", args=(42,))
@task(queue_name="maintenance")
def send_report(account_id):
    ...

Sync code-defined schedules with:

python manage.py reproq sync-recurring

By default, recurring tasks auto-sync on post_migrate. Set REPROQ_RECURRING_AUTOSYNC=False to disable.


βš™οΈ Configuration

Configure Reproq behavior via the TASKS setting in settings.py.

from datetime import timedelta

TASKS = {
    "default": {
        "BACKEND": "reproq_django.backend.ReproqBackend",
        "OPTIONS": {
            # Deduplication (Default: True)
            # If True, enqueuing a task with the exact same arguments as a 
            # READY/RUNNING task will return the existing result_id.
            "DEDUP_ACTIVE": True,

            # Execution Timeout (Default: 900)
            # Max seconds a task can run before being killed by the worker.
            "TIMEOUT_SECONDS": 900,

            # Retry Limit (Default: 3)
            # Max number of attempts for a task.
            "MAX_ATTEMPTS": 3,

            # Expiry (Optional)
            # If set, tasks not picked up by this time will be marked expired.
            "EXPIRES_IN": timedelta(hours=24),
            
            # Provenance (Optional)
            # Metadata stored with the task for auditing.
            "CODE_REF": "git-sha-or-version",
            "PIP_LOCK_HASH": "hash-of-dependencies",
        }
    }
}
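For intuition, the deterministic spec_hash that powers DEDUP_ACTIVE can be thought of as a hash of a canonical task spec. This is a sketch, not the backend's actual algorithm; the field names and serialization here are assumptions:

```python
import hashlib
import json

def spec_hash(task_path, args, kwargs):
    # Canonical JSON (sorted keys, no whitespace) makes the hash stable
    # across processes, so identical enqueues produce identical hashes.
    spec = json.dumps(
        {"task": task_path, "args": list(args), "kwargs": kwargs},
        sort_keys=True, separators=(",", ":"),
    )
    return hashlib.sha256(spec.encode()).hexdigest()
```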

Worker config files (reproq.yaml/reproq.toml) are optional. Precedence is: defaults < config file < env vars < CLI flags. --dsn overrides DATABASE_URL, and DATABASE_URL is optional when flags or a config file are provided.

Multi-Database Queues

Route specific queues to different Django database aliases.

DATABASES = {
    "default": {...},
    "queues": {...},
}

REPROQ_QUEUE_DATABASES = {
    "high-priority": "queues",
    "bulk-*": "queues",  # glob patterns supported
}

Use REPROQ_DEFAULT_DB_ALIAS to change the fallback alias (defaults to "default"). When multiple queue databases are configured, task result IDs are prefixed with the database alias (queues:123). Set REPROQ_RESULT_ID_WITH_ALIAS=False to force legacy IDs. To route all Reproq tables to a non-default alias in ORM reads/writes (admin, health checks, etc.), add the router:

DATABASE_ROUTERS = ["reproq_django.db_router.ReproqRouter"]
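A sketch of how glob-based queue routing could resolve a database alias. Reproq's actual resolution logic may differ; exact-match-wins ordering is an assumption here:

```python
from fnmatch import fnmatch

REPROQ_QUEUE_DATABASES = {
    "high-priority": "queues",
    "bulk-*": "queues",  # glob patterns supported
}

def db_alias_for_queue(queue_name, mapping=REPROQ_QUEUE_DATABASES, default="default"):
    # Exact matches win; then glob patterns; then the fallback alias
    # (what REPROQ_DEFAULT_DB_ALIAS would control).
    if queue_name in mapping:
        return mapping[queue_name]
    for pattern, alias in mapping.items():
        if fnmatch(queue_name, pattern):
            return alias
    return default
```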

Run one worker/beat per database:

python manage.py reproq worker --database queues --queues high-priority,bulk-1
python manage.py reproq beat --database queues

Use REPROQ_STATS_DATABASES=["*"] to aggregate stats across all queue databases.


πŸ§ͺ Development

We standardize on Python 3.12.x for local development.

bash scripts/dev_bootstrap.sh
uv run pytest

Always run tests with uv run pytest so dependencies and settings stay consistent.


β›“ Workflows (Chains & Groups)

Reproq supports complex task dependencies.

Chains (Sequential)

Execute tasks one after another. If a task fails, the chain stops.

from reproq_django.workflows import chain

# task_a -> task_b -> task_c
c = chain(
    (task_a, (1,), {}),
    task_b, # no args
    (task_c, (), {"param": "val"})
)
results = c.enqueue()
# results[0] is READY, results[1..] are WAITING

Groups (Parallel)

Execute tasks in parallel.

from reproq_django.workflows import group

g = group(
    (resize_image, ("img1.jpg",), {}),
    (resize_image, ("img2.jpg",), {}),
)
results = g.enqueue()

Chords (Group + Callback)

Run a callback once a group finishes.

from reproq_django.workflows import chord

group_results, callback_result = chord(
    (resize_image, ("img1.jpg",), {}),
    (resize_image, ("img2.jpg",), {}),
    callback=notify_done,
).enqueue()

The callback runs only after all group tasks succeed. If any group task fails, the workflow is marked failed and the callback does not run.


πŸ” Retries & Backoff

Retries are managed by the Go worker. When a task fails and attempts remain, it is re-queued with an exponential backoff:

  • Base delay: 30s
  • Backoff: 2^attempt (attempt starts at 1)
  • Cap: 1 hour

The worker updates run_after on the failed task, and the backend will only claim it after that timestamp.
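Under one plausible reading of the rules above (delay = base * 2^attempt, capped at one hour, with attempt starting at 1), the retry delays work out as:

```python
def retry_delay_seconds(attempt, base=30, cap=3600):
    # attempt=1 -> 60s, attempt=2 -> 120s, ..., capped at 3600s.
    # Illustrative only; the exact formula lives in the Go worker.
    return min(base * 2 ** attempt, cap)
```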


🚦 Rate Limiting

Reproq Worker enforces token bucket limits stored in the rate_limits table. You can manage these from Django Admin or via the worker CLI.

Keys:

  • queue:<queue_name> limits a specific queue.
  • task:<task_path> limits a specific task (overrides queue/global).
  • global is a fallback when no task/queue limit exists.

Example:

reproq limit set --key queue:default --rate 5 --burst 10

Defaults: global rate limiting is disabled until you set a positive rate.
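For intuition, the --rate/--burst semantics correspond to a classic token bucket: rate tokens refill per second, up to burst capacity. This is an illustrative model, not the worker's Go implementation:

```python
import time

class TokenBucket:
    def __init__(self, rate, burst):
        self.rate = rate      # tokens refilled per second
        self.burst = burst    # maximum stored tokens
        self.tokens = burst
        self.last = time.monotonic()

    def allow(self):
        # Refill based on elapsed time, then spend one token if available.
        now = time.monotonic()
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```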


🧰 Worker CLI Ops

The Go worker binary includes operational commands you can run directly:

# Request cancellation of a running task
reproq cancel --dsn "..." --id 12345

# Inspect failed tasks
reproq triage list --dsn "..." --limit 50

πŸ–₯ Management Commands

The python manage.py reproq command is your Swiss Army knife.

  • init: Bootstraps Reproq in the current project.
  • worker: Starts the Go worker. Flags: --config, --concurrency (default 10), --queues, --allowed-task-modules, --logs-dir, --payload-mode, --metrics-port, --metrics-addr, --metrics-auth-token, --metrics-allow-cidrs, --metrics-tls-cert, --metrics-tls-key, --metrics-tls-client-ca, --database. Auto-configures the allow-list when unset (unless a config file is used).
  • beat: Starts the scheduler. Flags: --config, --interval (default 30s), --once, --database.
  • schedule: Enqueues due periodic tasks once and exits (alias for beat --once).
  • pg-cron: Syncs Postgres-native schedules. Flags: --install (default), --remove, --prefix, --dry-run, --database.
  • install: Downloads/builds the worker binary.
  • upgrade: Upgrades the worker binary and optionally runs migrate-worker.
  • migrate-worker: Applies essential SQL schema optimizations (indexes, extensions).
  • check: Validates binary path, DB connection, and schema health.
  • doctor: Validates DSN, schema, worker binary, and allowlist; --strict fails on warnings.
  • config: Prints effective worker/beat config; use --explain for precedence.
  • allowlist: Prints ALLOWED_TASK_MODULES or writes them to a config file with --write.
  • sync-recurring: Syncs code-defined recurring schedules into periodic_tasks.
  • pause-queue: Pauses a queue (prevents new claims).
  • resume-queue: Resumes a paused queue.
  • logs: Prints logs for a task run using logs_uri (supports aliased result IDs).
  • cancel: Requests cancellation of a task run by result ID (supports aliased result IDs).
  • reclaim: Requeues or fails tasks with expired leases (--database / --all-databases).
  • prune-workers: Deletes workers not seen recently (--database / --all-databases).
  • prune-successful: Deletes successful task runs older than a cutoff (--database / --all-databases).
  • prune: Deletes task runs by status/age (--database / --all-databases).
  • stats / status: Shows task counts by status and active workers (--database / --all-databases).
  • systemd: Generates systemd service files for production.
  • stress-test: Enqueues dummy tasks for benchmarking.

πŸ“Š Stats API (JSON)

When you include reproq_django.urls in your project, a GET to /stats/ (relative to wherever you mount the URLs, e.g. /reproq/stats/) returns JSON task counts, per-queue task counts, worker records, and periodic task schedules. Access is granted to staff sessions, a signed TUI JWT, or METRICS_AUTH_TOKEN as a bearer token. Additional fields include worker_health, queue_controls, scheduler, and per-database rollups when REPROQ_STATS_DATABASES is set.
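A minimal sketch of calling the stats endpoint programmatically. The mount prefix https://example.com/reproq is an assumption; adjust it to wherever you include the URLs:

```python
from urllib.request import Request

def stats_request(base_url, token):
    # Builds an authenticated GET request for the stats endpoint.
    # The bearer token is METRICS_AUTH_TOKEN (or a signed TUI JWT).
    return Request(
        f"{base_url.rstrip('/')}/stats/",
        headers={"Authorization": f"Bearer {token}"},
    )

req = stats_request("https://example.com/reproq", "my-metrics-token")
# urllib.request.urlopen(req) would then return the JSON payload.
```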

🧭 TUI Integration

Set METRICS_AUTH_TOKEN to enable the TUI login flow and sign TUI JWTs. This token is also accepted as a bearer token for /reproq/stats/ and is forwarded to the worker metrics proxy endpoints. If you do not want to expose SSE, set REPROQ_TUI_DISABLE_EVENTS=1 to omit the /reproq/tui/events/ stream from the TUI config payload. If METRICS_AUTH_TOKEN is unset, the TUI auth endpoints return 404.

Set LOW_MEMORY_MODE=1 to disable the worker proxy endpoints (/reproq/tui/metrics/, /reproq/tui/healthz/, /reproq/tui/events/). The endpoints return a 503 with a short hint while low-memory mode is enabled, and events are omitted from the TUI config payload.

Set REPROQ_MEMORY_LOG_INTERVAL=60s to emit periodic memory logs from the Django process (includes RSS on Linux).

Pass the TUI JWT in the Authorization: Bearer ... header when accessing these endpoints programmatically.


🧾 Worker/Beat Config Files

The Go worker/beat support YAML/TOML config files. manage.py reproq worker and manage.py reproq beat will load a config file when --config or REPROQ_CONFIG is set. If no worker/beat flags are provided, they also look for reproq.yaml, reproq.yml, reproq.toml, .reproq.yaml, .reproq.yml, or .reproq.toml in the current working directory. CLI flags override config values; environment variables override config values too. --dsn always overrides DATABASE_URL, and DATABASE_URL is optional when a config file or flags are provided.

See reproq.example.yaml and reproq.example.toml for full templates.

Queue selection uses --queues (comma-separated). The legacy --queue flag remains for compatibility but is deprecated.


βš–οΈ Scaling: Workers vs Concurrency

  • Increase concurrency (--concurrency): more goroutines in a single worker process; best for I/O-heavy tasks with minimal overhead.
  • Run multiple workers: separate processes/hosts; best for CPU-heavy workloads and fault isolation.

Rule of thumb: start with 1-2 workers per host and tune --concurrency to available CPU cores and workload type.


πŸ” Admin Dashboard

Reproq integrates deeply with the Django Admin.

  • Task Runs: View all tasks. Filter by status, queue, or lease state.
  • Actions:
    • Replay: Select tasks to re-enqueue them (creates a new copy).
    • Retry Failed: Reset failed tasks to READY.
    • Cancel: Request cancellation of running/ready tasks.
  • Workers: Monitor active worker nodes, their concurrency, and last heartbeat.
  • Periodic Tasks: Create, enable/disable, and edit cron schedules. Set next_run_at to run a job immediately.
  • Status Note: Non-standard statuses like WAITING or CANCELLED map to PENDING/CANCELLED when supported by Django's TaskResultStatus. The original value is always available via raw_status.

πŸš€ Production Deployment

Recommended Setup (Systemd)

Generate service files to run the worker and beat processes as background daemons.

python manage.py reproq systemd --user myuser --concurrency 20

This generates reproq-worker.service and reproq-beat.service. Copy them to /etc/systemd/system/ and enable them. You can pass metrics flags (for example --metrics-addr 127.0.0.1:9090) or use --env-file to load METRICS_AUTH_TOKEN and METRICS_ALLOW_CIDRS.

To use cron-style scheduling instead of a persistent beat process, generate a timer:

python manage.py reproq systemd --schedule --schedule-on-calendar "*-*-* *:*:00"

Set REPROQ_SCHEDULER_MODE=cron in your EnvironmentFile and enable the timer to ensure only one scheduler runs.

Env Vars

The Go worker relies on standard environment variables:

  • DATABASE_URL: postgres://user:pass@host:5432/db (optional if you provide --dsn or a config file)
  • WORKER_ID: (Optional) Unique name for the node.
  • REPROQ_WORKER_BIN: (Optional) Path to the binary if not using manage.py reproq install.
  • REPROQ_CONFIG: (Optional) Path to a YAML/TOML worker/beat config file.
  • ALLOWED_TASK_MODULES: (Optional) Comma-separated task module allow-list for the worker. If unset, manage.py reproq worker auto-configures it from discovered task modules.
  • REPROQ_LOGS_DIR: (Optional) Directory to persist worker stdout/stderr logs (updates task_runs.logs_uri).
  • REPROQ_MEMORY_LOG_INTERVAL: (Optional) Log memory stats at the given interval (e.g., 60s).
  • METRICS_AUTH_TOKEN: (Optional) Bearer token for /metrics, /healthz, and /events.
  • METRICS_ALLOW_CIDRS: (Optional) Comma-separated IP/CIDR allow-list for metrics/health.
  • METRICS_TLS_CERT: (Optional) TLS certificate path for health/metrics.
  • METRICS_TLS_KEY: (Optional) TLS private key path for health/metrics.
  • METRICS_TLS_CLIENT_CA: (Optional) Client CA bundle to require mTLS for health/metrics.

If DATABASE_URL is not set, manage.py reproq worker derives a DSN from settings.DATABASES["default"].
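A sketch of what that derivation looks like (simplified; the real command also handles engine checks and connection options such as sslmode):

```python
def dsn_from_django_db(db):
    # Assemble a postgres:// DSN from a settings.DATABASES-style dict.
    user = db.get("USER", "")
    password = db.get("PASSWORD", "")
    auth = f"{user}:{password}@" if user else ""
    host = db.get("HOST") or "localhost"
    port = db.get("PORT") or 5432
    return f"postgres://{auth}{host}:{port}/{db['NAME']}"
```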

Worker binary resolution order:

  1. REPROQ_WORKER_BIN (setting or env)
  2. ./.reproq/bin/reproq (installed by reproq install)
  3. reproq_django/bin/reproq (packaged fallback)
  4. PATH

🀝 Contributing

Reproq is split into two repos:

  • Reproq Django: This repo (Python/Django logic).
  • Reproq Worker: The Go execution engine.

Issues and PRs are welcome in both!

License

Apache License 2.0. See LICENSE.
