Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions jetstream-consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@
RETRY_INTERVAL = int(os.environ.get("RETRY_INTERVAL", "30"))
DURABLE_NAME = "argus-jetstream-consumer"

# Streams the consumer requires. {stream_name: [subject_filters]}
# If pull_subscribe raises NotFoundError because a stream does not yet exist,
# the consumer will attempt to create it with these subject filters.
REQUIRED_STREAMS: dict[str, list[str]] = {
"hi_agents": ["hi.agents.>"],
"hi_tasks": ["hi.tasks.>"],
}

# ── Shared metrics state (guarded by _lock) ────────────────────────────────
_lock = threading.Lock()

Expand Down Expand Up @@ -147,6 +155,37 @@ def _run_http_server(port: int) -> None:

# ── JetStream subscription loop ────────────────────────────────────────────

async def _pull_subscribe_with_auto_create(
js: Any,
subject: str,
stream: str,
durable: str,
) -> Any:
"""Call ``js.pull_subscribe`` and, on :class:`NotFoundError`, create the
expected stream and retry once.

nats-py raises ``NotFoundError`` when ``pull_subscribe`` is asked to bind to
a stream that does not exist on the JetStream server. The previous
behaviour was to surface this as a generic exception and fall back to the
outer retry loop, which would never make progress until an operator
manually created the stream. This wrapper attempts an automatic recovery
by creating the stream with the subject filter we are about to subscribe
to, then retrying ``pull_subscribe`` exactly once. If the second attempt
still fails, the exception propagates so the outer loop can log it.
"""
try:
return await js.pull_subscribe(subject, durable=durable, stream=stream)
except NotFoundError:
subjects = REQUIRED_STREAMS.get(stream, [subject])
log.warning(
"JetStream stream %r not found; auto-creating with subjects %s",
stream,
subjects,
)
await js.add_stream(name=stream, subjects=subjects)
return await js.pull_subscribe(subject, durable=durable, stream=stream)


async def _fetch_loop(
sub: Any,
stream: str,
Expand Down Expand Up @@ -193,15 +232,17 @@ async def subscribe_loop(stop_event: asyncio.Event) -> None:
)
js = nc.jetstream()

sub_agents = await js.pull_subscribe(
sub_agents = await _pull_subscribe_with_auto_create(
js,
"hi.agents.>",
durable=DURABLE_NAME,
stream="hi_agents",
durable=DURABLE_NAME,
)
sub_tasks = await js.pull_subscribe(
sub_tasks = await _pull_subscribe_with_auto_create(
js,
"hi.tasks.>",
durable=DURABLE_NAME,
stream="hi_tasks",
durable=DURABLE_NAME,
)

with _lock:
Expand Down
108 changes: 107 additions & 1 deletion tests/test_jetstream_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pathlib import Path
import types
import urllib.request
from unittest.mock import MagicMock
from unittest.mock import AsyncMock, MagicMock

import pytest

Expand Down Expand Up @@ -259,6 +259,112 @@ def test_zero_count_latency_renders_zero(self, consumer):
assert 'hi_jetstream_task_latency_seconds{status="failed"} 0.0' in output


# ---------------------------------------------------------------------------
# _pull_subscribe_with_auto_create
# ---------------------------------------------------------------------------

class TestPullSubscribeWithAutoCreate:
"""Verify the auto-create-on-NotFoundError fallback."""

def _run(self, coro):
import asyncio

loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()

def test_returns_subscription_when_stream_exists(self, consumer):
js = MagicMock()
expected_sub = object()
js.pull_subscribe = AsyncMock(return_value=expected_sub)
js.add_stream = AsyncMock()

result = self._run(
consumer._pull_subscribe_with_auto_create(
js, "hi.agents.>", stream="hi_agents", durable="d",
)
)

assert result is expected_sub
js.pull_subscribe.assert_awaited_once_with(
"hi.agents.>", durable="d", stream="hi_agents",
)
js.add_stream.assert_not_called()

def test_creates_stream_and_retries_on_not_found(self, consumer):
from nats.js.errors import NotFoundError

expected_sub = object()
js = MagicMock()
js.pull_subscribe = AsyncMock(side_effect=[NotFoundError(), expected_sub])
js.add_stream = AsyncMock()

result = self._run(
consumer._pull_subscribe_with_auto_create(
js, "hi.agents.>", stream="hi_agents", durable="d",
)
)

assert result is expected_sub
assert js.pull_subscribe.await_count == 2
js.add_stream.assert_awaited_once_with(
name="hi_agents", subjects=["hi.agents.>"],
)

def test_uses_subject_when_stream_not_in_registry(self, consumer):
from nats.js.errors import NotFoundError

expected_sub = object()
js = MagicMock()
js.pull_subscribe = AsyncMock(side_effect=[NotFoundError(), expected_sub])
js.add_stream = AsyncMock()

result = self._run(
consumer._pull_subscribe_with_auto_create(
js, "hi.custom.>", stream="hi_custom", durable="d",
)
)

assert result is expected_sub
js.add_stream.assert_awaited_once_with(
name="hi_custom", subjects=["hi.custom.>"],
)

def test_propagates_second_failure(self, consumer):
from nats.js.errors import NotFoundError

js = MagicMock()
js.pull_subscribe = AsyncMock(side_effect=[NotFoundError(), NotFoundError()])
js.add_stream = AsyncMock()

with pytest.raises(NotFoundError):
self._run(
consumer._pull_subscribe_with_auto_create(
js, "hi.agents.>", stream="hi_agents", durable="d",
)
)
assert js.pull_subscribe.await_count == 2
js.add_stream.assert_awaited_once()

def test_propagates_add_stream_failure(self, consumer):
from nats.js.errors import NotFoundError

js = MagicMock()
js.pull_subscribe = AsyncMock(side_effect=NotFoundError())
js.add_stream = AsyncMock(side_effect=RuntimeError("permission denied"))

with pytest.raises(RuntimeError, match="permission denied"):
self._run(
consumer._pull_subscribe_with_auto_create(
js, "hi.agents.>", stream="hi_agents", durable="d",
)
)
# pull_subscribe should only have been called once before add_stream blew up.
assert js.pull_subscribe.await_count == 1


# ---------------------------------------------------------------------------
# HTTP handler
# ---------------------------------------------------------------------------
Expand Down
Loading