KPouianou/GigQ

GigQ

Lightweight SQLite Job Queue

GigQ is a Python job queue backed by SQLite. It fits teams and projects that have outgrown a raw for loop with try/except, but don't want to run Redis or any other broker. Define functions, enqueue them, and run one or more workers on any machine that can reach the database file.

from gigq import task, JobQueue, Worker

@task()
def greet(name="world"):
    return f"Hello, {name}!"

queue = JobQueue("jobs.db")
greet.submit(queue, name="Alice")
Worker("jobs.db").start()

You get retries with backoff, crash recovery (stuck work is reclaimed), and queryable status and results from the same DB the workers use.
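
The retry schedule can be pictured with a small sketch. This is illustrative only: capped exponential backoff is the usual shape of delays between attempts, and the base and cap values below are made-up examples, not GigQ's actual defaults.

```python
# Illustrative only: capped exponential backoff, the typical shape of
# retry delays. The base (2 s) and cap (60 s) are example values, not
# GigQ's documented defaults.
def backoff(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Delay in seconds before retry number `attempt` (1-based)."""
    return min(cap, base * (2 ** (attempt - 1)))

print([backoff(n) for n in range(1, 7)])
# [2.0, 4.0, 8.0, 16.0, 32.0, 60.0] — doubling until the cap kicks in
```

The cap keeps a job that fails many times from pushing its next attempt arbitrarily far into the future.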

When to use GigQ

Good fit

  • Local or single-server automation, ETL, scraping batches, ML prep, or any scriptable work you want off the request path
  • A few concurrent worker threads or processes against one SQLite file
  • You are fine storing queue state in a file next to your app

Not the right tool

  • Multi-datacenter orchestration, strict global ordering, or millions of jobs per second
  • When you already run Redis/Kafka/SQS and need their ecosystem

Workflows and parent_results

Wire @task functions into a Workflow, declare dependencies, and submit once. Dependent tasks can declare a parent_results argument; GigQ injects a dict of parent job id → deserialized result so fan-out and fan-in steps can pass data without going through job parameters.

from gigq import task, JobQueue, Workflow, Worker

@task()
def source():
    return {"items": [1, 2, 3]}

@task()
def branch_a(parent_results):
    n = next(iter(parent_results.values()))["items"]
    return {"branch": "a", "len": len(n)}

@task()
def branch_b(parent_results):
    n = next(iter(parent_results.values()))["items"]
    return {"branch": "b", "len": len(n)}

@task()
def merge(parent_results):
    return {"combined": list(parent_results.values())}

queue = JobQueue("jobs.db")
wf = Workflow("fan")
s = wf.add_task(source)
a = wf.add_task(branch_a, depends_on=[s])
b = wf.add_task(branch_b, depends_on=[s])
wf.add_task(merge, depends_on=[a, b])
wf.submit_all(queue)
Worker("jobs.db").start()

Installation

pip install gigq

CLI

The gigq command uses --db (default gigq.db) and a subcommand:

| Command | Purpose |
| --- | --- |
| gigq worker | Run a worker; add --concurrency N for threaded workers |
| gigq list | List jobs; optional --status pending (etc.) |
| gigq status <id> --show-result | Inspect a job and its result |
| gigq stats | Aggregate counts by status |
| gigq submit | Enqueue by import path module.function |
| gigq cancel <id> | Cancel a pending job |
| gigq requeue <id> | Requeue a failed or timed-out job |
| gigq clear | Delete completed and cancelled jobs |
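
Taken together, a session might look like the following. The database path and import path are placeholders, not output from a real run:

```shell
# Enqueue a function by import path, then inspect the queue.
# myapp.tasks.build_report is a placeholder import path.
gigq --db jobs.db submit myapp.tasks.build_report
gigq --db jobs.db list --status pending
gigq --db jobs.db stats

# Run a threaded worker against the same database file.
gigq --db jobs.db worker --concurrency 4
```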

How it works

Jobs, dependencies, and results live in SQLite tables (jobs, job_executions). Workers claim work in transactions so only one worker runs a given job at a time; multiple threads or processes coordinate through the database file (WAL mode is enabled for less contention). Retries and timeouts are enforced in the worker loop.
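
The claim step can be sketched with nothing but the standard library. This is an illustration of the pattern, not GigQ's actual schema or code — the table and column names below are invented. A conditional UPDATE inside a transaction flips a job from pending to running atomically, so two workers can never claim the same row:

```python
# Sketch of transactional job claiming over SQLite (invented schema,
# not GigQ's). A real on-disk queue would also enable WAL mode
# ("PRAGMA journal_mode=WAL") to reduce reader/writer contention;
# in-memory databases do not support WAL, so it is omitted here.
import sqlite3

def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, status TEXT)")
    conn.executemany("INSERT INTO jobs (status) VALUES (?)", [("pending",)] * 3)
    conn.commit()
    return conn

def claim_one(conn: sqlite3.Connection):
    """Atomically claim the oldest pending job; return its id or None."""
    with conn:  # commits on success, rolls back on error
        row = conn.execute(
            "SELECT id FROM jobs WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # The status check in WHERE makes the claim conditional: if another
        # worker grabbed this row first, rowcount is 0 and we claim nothing.
        cur = conn.execute(
            "UPDATE jobs SET status = 'running' WHERE id = ? AND status = 'pending'",
            (row[0],),
        )
        return row[0] if cur.rowcount == 1 else None

conn = make_db()
claimed = [claim_one(conn) for _ in range(4)]
print(claimed)  # [1, 2, 3, None] — the fourth claim finds nothing pending
```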

Examples

| Example | Description |
| --- | --- |
| examples/parallel_tasks.py | Fan-out workers plus a fan-in task using parent_results |
| examples/data_pipeline.py | Linear pipeline: generate → transform → format via parent_results |
| examples/hyperparameter_tuning.py | scikit-learn search with sequential vs parallel workers and crash recovery |

Integrations

Documentation

Full guides and API reference: https://kpouianou.github.io/GigQ/

License

This project is licensed under the MIT License — see LICENSE.
