Skip to content

Commit 3461ab3

Browse files
committed
feat: RabbitMQ (pika) transport
1 parent 90f2a7d commit 3461ab3

8 files changed

Lines changed: 251 additions & 18 deletions

File tree

.github/workflows/ci.yml

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,15 @@ jobs:
4545
--health-interval 5s
4646
--health-timeout 3s
4747
--health-retries 10
48+
rabbitmq:
49+
image: rabbitmq:3
50+
ports:
51+
- 5672:5672
52+
options: >-
53+
--health-cmd "rabbitmq-diagnostics -q ping"
54+
--health-interval 10s
55+
--health-timeout 5s
56+
--health-retries 15
4857
steps:
4958
- uses: actions/checkout@v4
5059

@@ -53,12 +62,13 @@ jobs:
5362
with:
5463
python-version: '3.12'
5564

56-
- name: Install (with redis extra)
65+
- name: Install (with redis + amqp extras)
5766
run: |
5867
python -m pip install --upgrade pip
59-
pip install -e ".[redis,dev]"
68+
pip install -e ".[redis,amqp,dev]"
6069
61-
- name: Run tests (Redis transport included)
70+
- name: Run tests (Redis + RabbitMQ transports included)
6271
env:
6372
BABELQUEUE_TEST_REDIS: redis://localhost:6379/0
73+
BABELQUEUE_TEST_AMQP: amqp://guest:guest@localhost:5672/
6474
run: pytest

CHANGELOG.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,23 @@ The envelope wire format is versioned separately by `meta.schema_version`
99

1010
## [Unreleased]
1111

12+
### Added
13+
- **RabbitMQ transport** (`PikaTransport`, `amqp://`): durable queue, persistent
14+
delivery, `basic_get` + manual ack, and the contract AMQP properties (`type`=URN,
15+
`correlation_id`=trace_id, `x-schema-version`/`x-source-lang`/`x-attempts`).
16+
Optional `[amqp]` extra (lazy `pika` import) — the core stays zero-dep.
17+
18+
## [0.2.0] - 2026-06-06
19+
1220
### Added
1321
- **Runtime**`BabelQueue(broker_url=...)` app with a `@app.handler("urn:...")`
1422
decorator, `publish()`, and a `consume()` / `run()` loop. Routes by URN over the
1523
canonical envelope; `attempts`-based retry → opt-in dead-letter queue;
1624
`on_unknown_urn` strategies (`fail`/`delete`/`release`/`dead_letter`).
1725
- **Transports** — a pluggable `Transport` abstraction with `InMemoryTransport`
1826
(`memory://`, for tests/local) and `RedisTransport` (`redis://`, reliable-queue
19-
pattern via `BLMOVE` + a processing list). Redis client is an optional extra
20-
(`pip install "babelqueue[redis]"`), imported lazily — the core stays zero-dep.
27+
pattern via `BLMOVE` + a processing list). Redis client is an optional `[redis]`
28+
extra, imported lazily — the core stays zero-dep.
2129

2230
## [0.1.0] - 2026-06-06
2331

@@ -36,5 +44,6 @@ The envelope wire format is versioned separately by `meta.schema_version`
3644
- Pre-1.0: the public API may change before the `1.0.0` tag.
3745
- The core has **zero runtime dependencies** (standard library only); Python `>=3.9`.
3846

39-
[Unreleased]: https://github.com/BabelQueue/babelqueue-python/compare/v0.1.0...HEAD
47+
[Unreleased]: https://github.com/BabelQueue/babelqueue-python/compare/v0.2.0...HEAD
48+
[0.2.0]: https://github.com/BabelQueue/babelqueue-python/compare/v0.1.0...v0.2.0
4049
[0.1.0]: https://github.com/BabelQueue/babelqueue-python/releases/tag/v0.1.0

README.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,19 @@ dlq = dead_letter.annotate(envelope, "failed", "orders", attempts=3, error="boom
8282

8383
## Runtime — produce & consume
8484

85-
For an end-to-end app, use `BabelQueue` with a broker. Redis support comes via an
86-
extra:
85+
For an end-to-end app, use `BabelQueue` with a broker. Broker clients come via
86+
extras:
8787

8888
```bash
89-
pip install "babelqueue[redis]"
89+
pip install "babelqueue[redis]" # redis://
90+
pip install "babelqueue[amqp]" # amqp:// (RabbitMQ)
9091
```
9192

9293
```python
9394
from babelqueue import BabelQueue
9495

9596
app = BabelQueue("redis://localhost:6379/0", queue="orders")
97+
# or: BabelQueue("amqp://guest:guest@localhost:5672/", queue="orders")
9698

9799
@app.handler("urn:babel:orders:created")
98100
def on_order_created(data, meta): # AI/ML, data processing, anything
@@ -112,16 +114,17 @@ app.run() # consume forever (Ctrl-C to stop)
112114
- **Retry & dead-letter:** failures are retried up to `max_attempts` (bumping the
113115
envelope's `attempts`); enable `dead_letter=True` to quarantine exhausted
114116
messages on `<queue>.dlq`. `on_unknown_urn` = `fail` | `delete` | `release` | `dead_letter`.
115-
- **Transports:** `redis://` (reliable-queue pattern) and `memory://` (in-process,
116-
great for tests/local). Bring your own by passing `transport=...`.
117+
- **Transports:** `redis://` (reliable-queue pattern), `amqp://` (RabbitMQ via
118+
`pika`, with the contract AMQP properties) and `memory://` (in-process, great for
119+
tests/local). Bring your own by passing `transport=...`.
117120

118-
> RabbitMQ (`pika`) and **Celery** / **Django** adapters are the next iterations.
121+
> **Celery** / **Django** adapters are the next iterations.
119122
120123
## What's here
121124

122125
The codec/contracts/dead-letter (zero-dep core) **and** the `BabelQueue` runtime
123-
above (in-memory built in; Redis via the `[redis]` extra). For framework
124-
integration, the Celery and Django adapters are planned.
126+
above (in-memory built in; Redis via `[redis]`, RabbitMQ via `[amqp]`). For
127+
framework integration, the Celery and Django adapters are planned.
125128

126129
## Testing
127130

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "babelqueue"
7-
version = "0.2.0"
7+
version = "0.3.0"
88
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
99
readme = "README.md"
1010
requires-python = ">=3.9"

src/babelqueue/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from .routing import UnknownUrnStrategy
2020
from .transport import InMemoryTransport, ReceivedMessage, Transport
2121

22-
__version__ = "0.2.0"
22+
__version__ = "0.3.0"
2323

2424
__all__ = [
2525
"BabelQueue",

src/babelqueue/pika_transport.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
"""RabbitMQ transport over AMQP 0-9-1. Requires the ``amqp`` extra:
2+
3+
pip install "babelqueue[amqp]"
4+
5+
Producing publishes the envelope to a durable queue with persistent delivery and
6+
the AMQP properties that are part of the cross-language contract (``type`` = URN,
7+
``correlation_id`` = trace_id, ``message_id`` = meta.id, ``x-schema-version`` /
8+
``x-source-lang`` / ``x-attempts`` headers) — so a Go/PHP consumer can route on
9+
``properties.type`` without parsing the body. Consuming uses ``basic_get`` + manual
10+
ack (at-least-once), matching the PHP RabbitMQ driver.
11+
12+
Connection is lazy; it (re)connects on first use and after a drop.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
import json
18+
from typing import Any, Dict, Optional
19+
20+
from .transport import ReceivedMessage, Transport
21+
22+
23+
class PikaTransport(Transport):
24+
def __init__(self, url: str) -> None:
25+
try:
26+
import pika
27+
except ImportError as exc: # pragma: no cover - import guard
28+
raise ImportError(
29+
"PikaTransport requires the 'pika' package. Install with "
30+
'pip install "babelqueue[amqp]".'
31+
) from exc
32+
33+
self._pika = pika
34+
self._url = url
35+
self._connection: Any = None
36+
self._channel: Any = None
37+
self._declared: set[str] = set()
38+
39+
# -- connection / topology ---------------------------------------------
40+
41+
def _chan(self) -> Any:
42+
if self._connection is None or self._connection.is_closed:
43+
self._connection = self._pika.BlockingConnection(self._pika.URLParameters(self._url))
44+
self._channel = None
45+
self._declared.clear()
46+
if self._channel is None or self._channel.is_closed:
47+
self._channel = self._connection.channel()
48+
return self._channel
49+
50+
def _declare(self, queue: str) -> None:
51+
if queue not in self._declared:
52+
self._chan().queue_declare(queue=queue, durable=True)
53+
self._declared.add(queue)
54+
55+
def _properties(self, body: str) -> Any:
56+
"""AMQP properties derived from the envelope (part of the wire contract)."""
57+
try:
58+
envelope: Dict[str, Any] = json.loads(body)
59+
except (ValueError, TypeError):
60+
return self._pika.BasicProperties(content_type="application/json", delivery_mode=2)
61+
62+
meta = envelope.get("meta") or {}
63+
headers = {
64+
"x-schema-version": meta.get("schema_version"),
65+
"x-source-lang": meta.get("lang"),
66+
"x-attempts": envelope.get("attempts", 0),
67+
}
68+
return self._pika.BasicProperties(
69+
content_type="application/json",
70+
content_encoding="utf-8",
71+
delivery_mode=2, # persistent
72+
message_id=meta.get("id"),
73+
correlation_id=envelope.get("trace_id"),
74+
type=envelope.get("job"),
75+
app_id="babelqueue",
76+
headers={k: v for k, v in headers.items() if v is not None},
77+
)
78+
79+
# -- Transport ----------------------------------------------------------
80+
81+
def publish(self, queue: str, body: str) -> None:
82+
self._declare(queue)
83+
self._chan().basic_publish(
84+
exchange="",
85+
routing_key=queue,
86+
body=body.encode("utf-8"),
87+
properties=self._properties(body),
88+
)
89+
90+
def pop(self, queue: str, timeout: float = 1.0) -> Optional[ReceivedMessage]:
91+
self._declare(queue)
92+
method, _props, body = self._chan().basic_get(queue=queue, auto_ack=False)
93+
if method is None:
94+
# Nothing ready — sleep (heartbeat-safe) so the caller doesn't busy-loop.
95+
if timeout and timeout > 0:
96+
self._connection.sleep(timeout)
97+
return None
98+
text = body.decode("utf-8") if isinstance(body, (bytes, bytearray)) else str(body)
99+
return ReceivedMessage(body=text, queue=queue, handle=method.delivery_tag)
100+
101+
def ack(self, message: ReceivedMessage) -> None:
102+
self._chan().basic_ack(delivery_tag=message.handle)
103+
104+
def close(self) -> None: # pragma: no cover
105+
try:
106+
if self._connection is not None and self._connection.is_open:
107+
self._connection.close()
108+
except Exception:
109+
pass

src/babelqueue/transport.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,12 @@ def make_transport(broker_url: str) -> Transport:
7575
from .redis_transport import RedisTransport
7676

7777
return RedisTransport(broker_url)
78+
if scheme in ("amqp", "amqps"):
79+
from .pika_transport import PikaTransport
80+
81+
return PikaTransport(broker_url)
7882

7983
raise BabelQueueError(
80-
f"Unsupported broker scheme {scheme!r}. Use 'memory://' or 'redis://', "
81-
"or pass your own Transport via BabelQueue(transport=...)."
84+
f"Unsupported broker scheme {scheme!r}. Use 'memory://', 'redis://' or "
85+
"'amqp://', or pass your own Transport via BabelQueue(transport=...)."
8286
)

tests/test_pika_transport.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""Integration tests for the RabbitMQ (pika) transport.
2+
3+
Skipped unless a broker is reachable (the `pika` package installed and a broker at
4+
``BABELQUEUE_TEST_AMQP`` / localhost). The CI ``integration`` job runs these
5+
against a RabbitMQ service; locally they skip cleanly.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import os
11+
import unittest
12+
import uuid
13+
14+
try:
15+
import pika as _pika
16+
except ImportError: # pragma: no cover
17+
_pika = None
18+
19+
from babelqueue import BabelQueue, EnvelopeCodec
20+
21+
AMQP_URL = os.environ.get("BABELQUEUE_TEST_AMQP", "amqp://guest:guest@localhost:5672/")
22+
23+
24+
def _amqp_available() -> bool:
25+
if _pika is None:
26+
return False
27+
try:
28+
conn = _pika.BlockingConnection(_pika.URLParameters(AMQP_URL))
29+
conn.close()
30+
return True
31+
except Exception: # pragma: no cover - connection failure
32+
return False
33+
34+
35+
@unittest.skipUnless(_amqp_available(), f"no reachable RabbitMQ at {AMQP_URL}")
36+
class PikaTransportTest(unittest.TestCase):
37+
def setUp(self) -> None:
38+
self.queue = f"bqtest-{uuid.uuid4().hex}"
39+
self.conn = _pika.BlockingConnection(_pika.URLParameters(AMQP_URL))
40+
self.ctl = self.conn.channel()
41+
42+
def tearDown(self) -> None:
43+
for q in (self.queue, f"{self.queue}.dlq"):
44+
try:
45+
self.ctl.queue_delete(queue=q)
46+
except Exception:
47+
pass
48+
self.conn.close()
49+
50+
def _depth(self, queue: str) -> int:
51+
method = self.ctl.queue_declare(queue=queue, durable=True, passive=True)
52+
return method.method.message_count
53+
54+
def test_publish_consume_round_trip_and_ack(self) -> None:
55+
app = BabelQueue(AMQP_URL, queue=self.queue)
56+
seen = {}
57+
58+
@app.handler("urn:babel:orders:created")
59+
def handle(data, meta): # noqa: ANN001
60+
seen.update(data)
61+
62+
app.publish("urn:babel:orders:created", {"order_id": 42})
63+
processed = app.consume(max_messages=1, timeout=3)
64+
65+
self.assertEqual(processed, 1)
66+
self.assertEqual(seen, {"order_id": 42})
67+
self.assertEqual(self._depth(self.queue), 0) # acked
68+
69+
def test_publish_sets_contract_amqp_properties(self) -> None:
70+
app = BabelQueue(AMQP_URL, queue=self.queue)
71+
app.publish("urn:babel:orders:created", {"order_id": 1}, trace_id="trace-amqp")
72+
73+
method, props, body = self.ctl.basic_get(queue=self.queue, auto_ack=True)
74+
self.assertIsNotNone(method)
75+
self.assertEqual(props.type, "urn:babel:orders:created") # route on properties.type
76+
self.assertEqual(props.correlation_id, "trace-amqp") # trace_id
77+
self.assertEqual(props.content_type, "application/json")
78+
self.assertEqual(props.delivery_mode, 2) # persistent
79+
self.assertEqual(props.app_id, "babelqueue")
80+
81+
def test_failure_dead_letters(self) -> None:
82+
app = BabelQueue(AMQP_URL, queue=self.queue, max_attempts=1, dead_letter=True)
83+
84+
@app.handler("urn:babel:orders:created")
85+
def handle(data, meta): # noqa: ANN001
86+
raise RuntimeError("boom")
87+
88+
app.publish("urn:babel:orders:created", {"order_id": 1})
89+
app.consume(max_messages=2, timeout=3)
90+
91+
self.assertEqual(self._depth(f"{self.queue}.dlq"), 1)
92+
_m, _p, body = self.ctl.basic_get(queue=f"{self.queue}.dlq", auto_ack=True)
93+
env = EnvelopeCodec.decode(body.decode("utf-8"))
94+
self.assertEqual(env["dead_letter"]["reason"], "failed")
95+
96+
97+
if __name__ == "__main__":
98+
unittest.main()

0 commit comments

Comments
 (0)