Skip to content

Commit c918115

Browse files
committed
feat: Celery + Django adapters (v0.5.0)
1 parent 3fdce97 commit c918115

13 files changed

Lines changed: 414 additions & 10 deletions

File tree

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ jobs:
2424
with:
2525
python-version: ${{ matrix.python }}
2626

27-
- name: Install
27+
- name: Install (with Celery + Django adapters)
2828
run: |
2929
python -m pip install --upgrade pip
30-
pip install -e ".[dev]"
30+
pip install -e ".[dev,celery,django]"
3131
3232
- name: Run tests
3333
run: pytest

CHANGELOG.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,19 @@ The envelope wire format is versioned separately by `meta.schema_version`
99

1010
## [Unreleased]
1111

12+
## [0.5.0] - 2026-06-06
13+
14+
### Added
15+
- **Celery adapter** (`babelqueue.celery`, `[celery]` extra) — `from_celery(app)`
16+
builds a `BabelQueue` runtime on a Celery app's broker, and `install_worker(app)`
17+
registers a Celery worker bootstep that drains URN-routed polyglot messages in a
18+
background thread alongside Celery's own consumer.
19+
- **Django adapter** (`babelqueue.django`, `[django]` extra) — settings-driven
20+
`BABELQUEUE` config, `get_app()` / `publish()` shortcuts, and a
21+
`manage.py babelqueue_worker` management command. Add `"babelqueue.django"` to
22+
`INSTALLED_APPS`.
23+
- Both adapters lazy-import their framework, so the core stays dependency-free.
24+
1225
## [0.4.0] - 2026-06-06
1326

1427
### Added
@@ -55,7 +68,8 @@ The envelope wire format is versioned separately by `meta.schema_version`
5568
- Pre-1.0: the public API may change before the `1.0.0` tag.
5669
- The core has **zero runtime dependencies** (standard library only); Python `>=3.9`.
5770

58-
[Unreleased]: https://github.com/BabelQueue/babelqueue-python/compare/v0.4.0...HEAD
71+
[Unreleased]: https://github.com/BabelQueue/babelqueue-python/compare/v0.5.0...HEAD
72+
[0.5.0]: https://github.com/BabelQueue/babelqueue-python/compare/v0.4.0...v0.5.0
5973
[0.4.0]: https://github.com/BabelQueue/babelqueue-python/compare/v0.3.0...v0.4.0
6074
[0.3.0]: https://github.com/BabelQueue/babelqueue-python/compare/v0.2.0...v0.3.0
6175
[0.2.0]: https://github.com/BabelQueue/babelqueue-python/compare/v0.1.0...v0.2.0

README.md

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,51 @@ app.run() # consume forever (Ctrl-C to stop)
118118
`pika`, with the contract AMQP properties) and `memory://` (in-process, great for
119119
tests/local). Bring your own by passing `transport=...`.
120120

121-
> **Celery** / **Django** adapters are the next iterations.
121+
## Framework adapters — Celery & Django
122+
123+
**Celery** (`pip install "babelqueue[celery]"`) — reuse your Celery app's broker for
124+
polyglot interop, and consume inbound messages as a Celery worker bootstep:
125+
126+
```python
127+
from babelqueue.celery import from_celery, install_worker
128+
129+
bq = from_celery(celery_app, queue="orders") # runtime on Celery's broker
130+
bq.publish("urn:babel:orders:created", {"order_id": 1042})
131+
132+
@bq.handler("urn:babel:orders:created")
133+
def on_created(data, meta): ...
134+
135+
install_worker(celery_app, bq) # `celery worker` also drains URN messages
136+
```
137+
138+
**Django** (`pip install "babelqueue[django]"`) — add `"babelqueue.django"` to
139+
`INSTALLED_APPS` and configure a `BABELQUEUE` dict:
140+
141+
```python
142+
# settings.py
143+
BABELQUEUE = {"broker_url": "redis://localhost:6379/0", "queue": "orders", "dead_letter": True}
144+
```
145+
146+
```python
147+
from babelqueue.django import publish, get_app
148+
149+
publish("urn:babel:orders:created", {"order_id": 1042}) # in a view / signal
150+
151+
@get_app().handler("urn:babel:orders:created") # register handlers at startup
152+
def on_created(data, meta): ...
153+
```
154+
155+
```bash
156+
python manage.py babelqueue_worker --queue orders # run the consumer
157+
```
122158

123159
## What's here
124160

125-
The codec/contracts/dead-letter (zero-dep core) **and** the `BabelQueue` runtime
126-
above (in-memory built in; Redis via `[redis]`, RabbitMQ via `[amqp]`). For
127-
framework integration, the Celery and Django adapters are planned.
161+
The codec/contracts/dead-letter (zero-dep core), the `BabelQueue` runtime
162+
(in-memory built in; Redis via `[redis]`, RabbitMQ via `[amqp]`), and framework
163+
adapters for **Celery** (`[celery]`) and **Django** (`[django]`). Every layer
164+
speaks the one canonical envelope, so it interoperates with the PHP/Laravel,
165+
Symfony, Go, Node and .NET SDKs.
128166

129167
## Testing
130168

pyproject.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "babelqueue"
7-
version = "0.4.0"
7+
version = "0.5.0"
88
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
99
readme = "README.md"
1010
requires-python = ">=3.9"
@@ -28,9 +28,11 @@ classifiers = [
2828
dependencies = []
2929

3030
[project.optional-dependencies]
31-
# Planned runtime / adapters — standard, zero-heavy-dep drivers.
31+
# Optional runtime drivers + framework adapters — standard, zero-heavy-dep.
3232
redis = ["redis>=4"]
3333
amqp = ["pika>=1.3"]
34+
celery = ["celery>=5"]
35+
django = ["django>=4.2"]
3436
dev = ["pytest>=7"]
3537

3638
[project.urls]

src/babelqueue/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from .routing import UnknownUrnStrategy
2020
from .transport import InMemoryTransport, ReceivedMessage, Transport
2121

22-
__version__ = "0.4.0"
22+
__version__ = "0.5.0"
2323

2424
__all__ = [
2525
"BabelQueue",

src/babelqueue/celery.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""Celery integration. Requires the ``celery`` extra:
2+
3+
pip install "babelqueue[celery]"
4+
5+
A Celery app already configures a broker (Redis/RabbitMQ). :func:`from_celery`
6+
builds a :class:`~babelqueue.BabelQueue` runtime on that *same* broker, so a
7+
Celery-based service produces and consumes the canonical polyglot envelope
8+
alongside its Celery tasks — interoperating with the PHP/Laravel, Go, Node, ...
9+
SDKs. :func:`install_worker` runs that consumer as a Celery worker *bootstep* (a
10+
daemon thread started on ``celery worker``), so one process handles both Celery
11+
tasks and inbound polyglot messages.
12+
13+
``celery`` is imported lazily, so the core stays dependency-free.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import threading
19+
from typing import Any, Optional
20+
21+
from .app import BabelQueue
22+
from .exceptions import BabelQueueError
23+
24+
25+
def broker_url(celery_app: Any) -> str:
26+
"""Extract the broker URL from a Celery app (supports old/new config keys)."""
27+
conf = getattr(celery_app, "conf", None)
28+
url = None
29+
if conf is not None:
30+
url = getattr(conf, "broker_url", None)
31+
if not url and hasattr(conf, "get"):
32+
url = conf.get("broker_url") or conf.get("BROKER_URL")
33+
if not url:
34+
raise BabelQueueError(
35+
"The Celery app has no broker configured; set broker_url before calling from_celery()."
36+
)
37+
return str(url)
38+
39+
40+
def from_celery(celery_app: Any, **kwargs: Any) -> BabelQueue:
41+
"""Build a :class:`~babelqueue.BabelQueue` runtime on the Celery app's broker.
42+
43+
Extra keyword arguments are forwarded to ``BabelQueue`` (``queue``,
44+
``max_attempts``, ``dead_letter``, ``on_unknown_urn``, ...).
45+
"""
46+
return BabelQueue(broker_url(celery_app), **kwargs)
47+
48+
49+
def install_worker(
50+
celery_app: Any,
51+
babel_app: Optional[BabelQueue] = None,
52+
*,
53+
queue: Optional[str] = None,
54+
**kwargs: Any,
55+
) -> type:
56+
"""Register a Celery worker bootstep that consumes BabelQueue messages.
57+
58+
When a ``celery worker`` boots, the step starts a daemon thread running the
59+
BabelQueue consumer loop (URN routing, retry → dead-letter). If ``babel_app``
60+
is omitted it is built with :func:`from_celery`. Returns the bootstep class.
61+
"""
62+
from celery import bootsteps # lazy: only needed for this integration
63+
64+
app = babel_app if babel_app is not None else from_celery(celery_app, **kwargs)
65+
66+
class BabelQueueConsumerStep(bootsteps.StartStopStep):
67+
"""Runs the BabelQueue consumer loop alongside Celery's own consumer."""
68+
69+
def __init__(self, parent: Any, **options: Any) -> None:
70+
super().__init__(parent, **options)
71+
self._thread: Optional[threading.Thread] = None
72+
self._stop = threading.Event()
73+
74+
def start(self, parent: Any) -> None:
75+
def loop() -> None:
76+
while not self._stop.is_set():
77+
app.consume(queue, max_messages=1, timeout=1.0)
78+
79+
self._thread = threading.Thread(
80+
target=loop, name="babelqueue-consumer", daemon=True
81+
)
82+
self._thread.start()
83+
84+
def stop(self, parent: Any) -> None:
85+
self._stop.set()
86+
87+
celery_app.steps["worker"].add(BabelQueueConsumerStep)
88+
return BabelQueueConsumerStep

src/babelqueue/django/__init__.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
"""Django integration. Requires the ``django`` extra:
2+
3+
pip install "babelqueue[django]"
4+
5+
Add ``"babelqueue.django"`` to ``INSTALLED_APPS`` and configure a ``BABELQUEUE``
6+
settings dict::
7+
8+
BABELQUEUE = {
9+
"broker_url": "redis://localhost:6379/0",
10+
"queue": "orders",
11+
"max_attempts": 3,
12+
"dead_letter": True,
13+
}
14+
15+
Then publish from views/signals with :func:`publish`, register handlers on
16+
:func:`get_app`, and run the consumer with ``python manage.py babelqueue_worker``.
17+
The runtime is the shared :class:`~babelqueue.BabelQueue`, so messages interoperate
18+
with the PHP/Laravel, Go, Node, ... SDKs. ``django`` is imported lazily.
19+
"""
20+
21+
from __future__ import annotations
22+
23+
from typing import Any, Dict, Mapping, Optional
24+
25+
from ..app import BabelQueue
26+
27+
# Keys (besides broker_url) forwarded to the BabelQueue constructor.
28+
_APP_KWARGS = frozenset(
29+
{
30+
"queue",
31+
"on_unknown_urn",
32+
"max_attempts",
33+
"dead_letter",
34+
"dead_letter_queue",
35+
"dead_letter_suffix",
36+
"transport",
37+
}
38+
)
39+
40+
_app: Optional[BabelQueue] = None
41+
42+
43+
def _build() -> BabelQueue:
44+
from django.conf import settings # lazy
45+
46+
raw: Mapping[str, Any] = getattr(settings, "BABELQUEUE", {}) or {}
47+
kwargs: Dict[str, Any] = {k: v for k, v in raw.items() if k in _APP_KWARGS}
48+
broker = raw.get("broker_url", "memory://")
49+
return BabelQueue(broker, **kwargs)
50+
51+
52+
def get_app() -> BabelQueue:
53+
"""Return the process-wide :class:`~babelqueue.BabelQueue`, built from
54+
``settings.BABELQUEUE`` on first use."""
55+
global _app
56+
if _app is None:
57+
_app = _build()
58+
return _app
59+
60+
61+
def publish(urn: str, data: Mapping[str, Any], **kwargs: Any) -> str:
62+
"""Publish a message through the configured app; returns its id (``meta.id``)."""
63+
return get_app().publish(urn, dict(data), **kwargs)
64+
65+
66+
def reset() -> None:
67+
"""Drop the cached app so the next :func:`get_app` rebuilds it (tests / settings reload)."""
68+
global _app
69+
_app = None
70+
71+
72+
__all__ = ["get_app", "publish", "reset"]

src/babelqueue/django/apps.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from __future__ import annotations
2+
3+
from django.apps import AppConfig
4+
5+
6+
class BabelQueueConfig(AppConfig):
7+
"""Django app config for the BabelQueue adapter."""
8+
9+
name = "babelqueue.django"
10+
label = "babelqueue"
11+
verbose_name = "BabelQueue"

src/babelqueue/django/management/__init__.py

Whitespace-only changes.

src/babelqueue/django/management/commands/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)