diff --git a/jetstream-consumer/consumer.py b/jetstream-consumer/consumer.py index 5437a05..d31eae2 100644 --- a/jetstream-consumer/consumer.py +++ b/jetstream-consumer/consumer.py @@ -30,6 +30,14 @@ RETRY_INTERVAL = int(os.environ.get("RETRY_INTERVAL", "30")) DURABLE_NAME = "argus-jetstream-consumer" +# Streams the consumer requires. {stream_name: [subject_filters]} +# If pull_subscribe raises NotFoundError because a stream does not yet exist, +# the consumer will attempt to create it with these subject filters. +REQUIRED_STREAMS: dict[str, list[str]] = { + "hi_agents": ["hi.agents.>"], + "hi_tasks": ["hi.tasks.>"], +} + # ── Shared metrics state (guarded by _lock) ──────────────────────────────── _lock = threading.Lock() @@ -147,6 +155,37 @@ def _run_http_server(port: int) -> None: # ── JetStream subscription loop ──────────────────────────────────────────── +async def _pull_subscribe_with_auto_create( + js: Any, + subject: str, + stream: str, + durable: str, +) -> Any: + """Call ``js.pull_subscribe`` and, on :class:`NotFoundError`, create the + expected stream and retry once. + + nats-py raises ``NotFoundError`` when ``pull_subscribe`` is asked to bind to + a stream that does not exist on the JetStream server. The previous + behaviour was to surface this as a generic exception and fall back to the + outer retry loop, which would never make progress until an operator + manually created the stream. This wrapper attempts an automatic recovery + by creating the stream with the subject filter we are about to subscribe + to, then retrying ``pull_subscribe`` exactly once. If the second attempt + still fails, the exception propagates so the outer loop can log it. + """ + try: + return await js.pull_subscribe(subject, durable=durable, stream=stream) + except NotFoundError: + subjects = REQUIRED_STREAMS.get(stream, [subject]) + log.warning( + "JetStream stream %r not found; auto-creating with subjects %s", + stream, + subjects, + ) + await js.add_stream(name=stream, subjects=subjects) + return await js.pull_subscribe(subject, durable=durable, stream=stream) + + async def _fetch_loop( sub: Any, stream: str, @@ -193,15 +232,17 @@ async def subscribe_loop(stop_event: asyncio.Event) -> None: ) js = nc.jetstream() - sub_agents = await js.pull_subscribe( + sub_agents = await _pull_subscribe_with_auto_create( + js, "hi.agents.>", - durable=DURABLE_NAME, stream="hi_agents", + durable=DURABLE_NAME, ) - sub_tasks = await js.pull_subscribe( + sub_tasks = await _pull_subscribe_with_auto_create( + js, "hi.tasks.>", - durable=DURABLE_NAME, stream="hi_tasks", + durable=DURABLE_NAME, ) with _lock: diff --git a/tests/test_jetstream_consumer.py b/tests/test_jetstream_consumer.py index d6631f5..4b586e6 100644 --- a/tests/test_jetstream_consumer.py +++ b/tests/test_jetstream_consumer.py @@ -11,7 +11,7 @@ from pathlib import Path import types import urllib.request -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock import pytest @@ -259,6 +259,112 @@ def test_zero_count_latency_renders_zero(self, consumer): assert 'hi_jetstream_task_latency_seconds{status="failed"} 0.0' in output +# --------------------------------------------------------------------------- +# _pull_subscribe_with_auto_create +# --------------------------------------------------------------------------- + +class TestPullSubscribeWithAutoCreate: + """Verify the auto-create-on-NotFoundError fallback.""" + + def _run(self, coro): + import asyncio + + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + def test_returns_subscription_when_stream_exists(self, consumer): + js = MagicMock() + expected_sub = object() + js.pull_subscribe = AsyncMock(return_value=expected_sub) + js.add_stream = AsyncMock() + + result = self._run( + consumer._pull_subscribe_with_auto_create( + js, "hi.agents.>", stream="hi_agents", durable="d", + ) + ) + + assert result is expected_sub + js.pull_subscribe.assert_awaited_once_with( + "hi.agents.>", durable="d", stream="hi_agents", + ) + js.add_stream.assert_not_called() + + def test_creates_stream_and_retries_on_not_found(self, consumer): + from nats.js.errors import NotFoundError + + expected_sub = object() + js = MagicMock() + js.pull_subscribe = AsyncMock(side_effect=[NotFoundError(), expected_sub]) + js.add_stream = AsyncMock() + + result = self._run( + consumer._pull_subscribe_with_auto_create( + js, "hi.agents.>", stream="hi_agents", durable="d", + ) + ) + + assert result is expected_sub + assert js.pull_subscribe.await_count == 2 + js.add_stream.assert_awaited_once_with( + name="hi_agents", subjects=["hi.agents.>"], + ) + + def test_uses_subject_when_stream_not_in_registry(self, consumer): + from nats.js.errors import NotFoundError + + expected_sub = object() + js = MagicMock() + js.pull_subscribe = AsyncMock(side_effect=[NotFoundError(), expected_sub]) + js.add_stream = AsyncMock() + + result = self._run( + consumer._pull_subscribe_with_auto_create( + js, "hi.custom.>", stream="hi_custom", durable="d", + ) + ) + + assert result is expected_sub + js.add_stream.assert_awaited_once_with( + name="hi_custom", subjects=["hi.custom.>"], + ) + + def test_propagates_second_failure(self, consumer): + from nats.js.errors import NotFoundError + + js = MagicMock() + js.pull_subscribe = AsyncMock(side_effect=[NotFoundError(), NotFoundError()]) + js.add_stream = AsyncMock() + + with pytest.raises(NotFoundError): + self._run( + consumer._pull_subscribe_with_auto_create( + js, "hi.agents.>", stream="hi_agents", durable="d", + ) + ) + assert js.pull_subscribe.await_count == 2 + js.add_stream.assert_awaited_once() + + def test_propagates_add_stream_failure(self, consumer): + from nats.js.errors import NotFoundError + + js = MagicMock() + js.pull_subscribe = AsyncMock(side_effect=NotFoundError()) + js.add_stream = AsyncMock(side_effect=RuntimeError("permission denied")) + + with pytest.raises(RuntimeError, match="permission denied"): + self._run( + consumer._pull_subscribe_with_auto_create( + js, "hi.agents.>", stream="hi_agents", durable="d", + ) + ) + # pull_subscribe should only have been called once before add_stream blew up. + assert js.pull_subscribe.await_count == 1 + + # --------------------------------------------------------------------------- # HTTP handler # ---------------------------------------------------------------------------