From ce295cd006384d9a1415ab3db90fb3f99756aedf Mon Sep 17 00:00:00 2001
From: Micah Villmow <4211002+mvillmow@users.noreply.github.com>
Date: Wed, 13 May 2026 20:02:07 -0700
Subject: [PATCH] test(integration): add real-NATS pytest fixture (#173)

Adds a session-scoped pytest fixture that spins up a real `nats-server`
process with JetStream enabled on an ephemeral port, plus one integration
test that exercises `jetstream-consumer` end-to-end: publish synthetic
hi.tasks.completed / hi.agents.created events, run subscribe_loop in a
background thread, and assert that _render_metrics() reflects the consumed
events and computed task latency.

Mirrors ProjectScylla's tests/integration/nats/conftest.py shape and
Charybdis's REQUIRES_NATS ctest gate: the module is skipped when either
`nats-server` is not on PATH or `nats-py` is not installed, so existing
CI (which lacks both) stays green and the test only fires on machines
provisioned for integration runs.

Registers `integration` and `requires_nats` pytest markers in
pyproject.toml so `--strict-markers` accepts them.

Closes #173

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 pyproject.toml                                |   4 +
 tests/integration/__init__.py                 |   0
 tests/integration/conftest.py                 | 163 ++++++++++++++++
 .../test_jetstream_consumer_real_nats.py      | 179 ++++++++++++++++++
 4 files changed, 346 insertions(+)
 create mode 100644 tests/integration/__init__.py
 create mode 100644 tests/integration/conftest.py
 create mode 100644 tests/integration/test_jetstream_consumer_real_nats.py

diff --git a/pyproject.toml b/pyproject.toml
index 899fb7c..2e9cc99 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,6 +2,10 @@
 testpaths = ["tests"]
 pythonpath = ["."]
 addopts = "-v --strict-markers --cov=exporter --cov-report=term-missing --cov-report=html --cov-report=xml"
+markers = [
+    "integration: integration tests that exercise real external dependencies (slower; opt-in via -m integration)",
+    "requires_nats: integration tests that require a real `nats-server` binary on PATH (mirrors Charybdis's REQUIRES_NATS gate)",
+]
 
 [tool.coverage.run]
 source = ["exporter"]
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..6101dc6
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,163 @@
+"""Shared fixtures for integration tests that need a real NATS server.
+
+Spins up a `nats-server` subprocess with JetStream enabled on an ephemeral
+port and yields a connection URL. Tests are skipped (per their own
+`pytest.mark.skipif`) when `nats-server` is not on PATH.
+
+Mirrors the pattern used by ProjectScylla's `tests/integration/nats/conftest.py`
+and Charybdis's `REQUIRES_NATS` ctest gate so contributors can recognize the
+shape across the HomericIntelligence ecosystem.
+"""
+from __future__ import annotations
+
+import asyncio
+import json
+import socket
+import subprocess
+import tempfile
+import time
+from collections.abc import Generator
+from typing import Any
+
+import pytest
+
+
+def _find_free_port() -> int:
+    """Find a free TCP port by binding to port 0."""
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+        s.bind(("127.0.0.1", 0))
+        port: int = s.getsockname()[1]
+    return port
+
+
+def _wait_for_port(port: int, host: str = "127.0.0.1", timeout: float = 5.0) -> None:
+    """Block until a TCP port is accepting connections."""
+    deadline = time.monotonic() + timeout
+    while time.monotonic() < deadline:
+        try:
+            with socket.create_connection((host, port), timeout=0.5):
+                return
+        except OSError:
+            time.sleep(0.2)
+    raise TimeoutError(
+        f"nats-server did not start on {host}:{port} within {timeout}s"
+    )
+
+
+class NATSPublisher:
+    """Synchronous wrapper around nats-py for publishing JetStream messages.
+
+    Manages its own asyncio event loop so tests stay synchronous.
+    """
+
+    def __init__(
+        self,
+        url: str,
+        streams: tuple[tuple[str, list[str]], ...] = (
+            ("hi_agents", ["hi.agents.>"]),
+            ("hi_tasks", ["hi.tasks.>"]),
+        ),
+    ) -> None:
+        """Initialize the publisher and ensure all configured streams exist."""
+        self._url = url
+        self._streams = streams
+        self._loop = asyncio.new_event_loop()
+        self._nc: Any = None
+        self._js: Any = None
+        try:
+            self._setup()
+        except Exception:
+            # Do not leak the event loop (or a half-open connection) when
+            # connecting or creating a stream fails.
+            self.close()
+            raise
+
+    def _setup(self) -> None:
+        """Connect to NATS and create configured JetStream streams."""
+        import nats as nats_client
+
+        self._nc = self._loop.run_until_complete(nats_client.connect(self._url))
+        self._js = self._nc.jetstream()
+        for name, subjects in self._streams:
+            self._loop.run_until_complete(
+                self._js.add_stream(name=name, subjects=subjects)
+            )
+
+    def publish_json(self, subject: str, payload: dict[str, Any]) -> None:
+        """Publish a JSON-encoded message to the given subject."""
+        self._loop.run_until_complete(
+            self._js.publish(subject, json.dumps(payload).encode())
+        )
+
+    def purge_all(self) -> None:
+        """Purge every configured stream so tests start with a clean slate."""
+        for name, _ in self._streams:
+            self._loop.run_until_complete(self._js.purge_stream(name))
+
+    def close(self) -> None:
+        """Drain the connection and close the event loop.
+
+        The loop is closed even when draining fails.
+        """
+        try:
+            if self._nc is not None:
+                self._loop.run_until_complete(self._nc.drain())
+        finally:
+            self._loop.close()
+
+
+@pytest.fixture(scope="session")
+def nats_port() -> int:
+    """Return an ephemeral TCP port for nats-server."""
+    return _find_free_port()
+
+
+@pytest.fixture(scope="session")
+def nats_url(nats_port: int) -> Generator[str, None, None]:
+    """Start a real nats-server process with JetStream and yield its URL.
+
+    The subprocess is terminated in teardown. Tests that depend on this
+    fixture should carry a
+    `pytest.mark.skipif(shutil.which("nats-server") is None, reason=...)`
+    mark so the suite remains green on hosts without the binary.
+    """
+    with tempfile.TemporaryDirectory() as store_dir:
+        proc = subprocess.Popen(
+            [
+                "nats-server",
+                "-p",
+                str(nats_port),
+                "-js",
+                "-sd",
+                store_dir,
+            ],
+            # Nothing reads the server's output, so discard it: an unread
+            # PIPE can fill up and block the server mid-session.
+            stdout=subprocess.DEVNULL,
+            stderr=subprocess.DEVNULL,
+        )
+        try:
+            _wait_for_port(nats_port)
+            yield f"nats://127.0.0.1:{nats_port}"
+        finally:
+            proc.terminate()
+            try:
+                proc.wait(timeout=10)
+            except subprocess.TimeoutExpired:
+                proc.kill()
+                proc.wait(timeout=5)
+
+
+@pytest.fixture()
+def publisher(nats_url: str) -> Generator[NATSPublisher, None, None]:
+    """Yield a NATSPublisher connected to the test server.
+
+    Creates `hi_agents` and `hi_tasks` streams, purges them so each test
+    starts empty, and tears down the connection after the test.
+    """
+    pub = NATSPublisher(nats_url)
+    try:
+        pub.purge_all()
+        yield pub
+    finally:
+        pub.close()
diff --git a/tests/integration/test_jetstream_consumer_real_nats.py b/tests/integration/test_jetstream_consumer_real_nats.py
new file mode 100644
index 0000000..2b752ee
--- /dev/null
+++ b/tests/integration/test_jetstream_consumer_real_nats.py
@@ -0,0 +1,179 @@
+"""Integration test for jetstream-consumer against a real nats-server.
+
+Spins up a real NATS server (JetStream enabled) via the session-scoped
+fixture in `conftest.py`, runs the consumer's `subscribe_loop` in a
+background thread, publishes synthetic `hi.tasks.completed` and
+`hi.agents.created` events, and asserts that `/metrics` reflects them.
+
+Mirrors Charybdis's `REQUIRES_NATS` ctest gate semantically: when the
+`nats-server` binary is not on PATH the entire module is skipped.
+"""
+from __future__ import annotations
+
+import asyncio
+import importlib.util
+import shutil
+import sys
+import threading
+import time
+import types
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+
+from .conftest import NATSPublisher
+
+
+def _nats_py_installed() -> bool:
+    """Return True iff the real `nats-py` package is importable on disk.
+
+    Unit tests inject a `MagicMock` stub at `sys.modules["nats"]`; we
+    explicitly bypass that cache so this probe reflects what is actually
+    installed in the environment.
+    """
+    saved = {k: sys.modules.pop(k) for k in list(sys.modules) if k == "nats" or k.startswith("nats.")}
+    try:
+        return importlib.util.find_spec("nats") is not None
+    finally:
+        sys.modules.update(saved)
+
+
+_nats_py_available = _nats_py_installed()
+
+pytestmark = [
+    pytest.mark.integration,
+    pytest.mark.requires_nats,
+    pytest.mark.skipif(
+        shutil.which("nats-server") is None,
+        reason="nats-server not in PATH",
+    ),
+    pytest.mark.skipif(
+        not _nats_py_available,
+        reason="nats-py not installed",
+    ),
+]
+
+
+# ---------------------------------------------------------------------------
+# Unit tests cache a `MagicMock` stub at `sys.modules["nats"]`. Integration
+# tests need the real `nats` package (it must be installed for them to run
+# at all), so the helper below evicts a stub that is still cached from a
+# prior test in the same session; the next `import nats` then loads the
+# real client.
+# ---------------------------------------------------------------------------
+
+def _ensure_real_nats_loaded() -> None:
+    """Drop any unit-test stub so we get the real installed `nats` package."""
+    nats_mod = sys.modules.get("nats")
+    if nats_mod is not None and isinstance(getattr(nats_mod, "connect", None), MagicMock):
+        # A unit-test stub is loaded. Purge so the real package re-imports.
+        for key in [k for k in sys.modules if k == "nats" or k.startswith("nats.")]:
+            del sys.modules[key]
+
+
+@pytest.fixture(scope="module")
+def consumer_module() -> types.ModuleType:
+    """Load `jetstream-consumer/consumer.py` against the real nats-py.
+
+    Unit tests in `tests/test_jetstream_consumer.py` install a `MagicMock`
+    stub at module scope; we explicitly purge it here so this integration
+    module loads the real client.
+    """
+    _ensure_real_nats_loaded()
+    path = Path(__file__).resolve().parents[2] / "jetstream-consumer" / "consumer.py"
+    spec = importlib.util.spec_from_file_location("consumer_integration", path)
+    assert spec is not None and spec.loader is not None
+    mod = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(mod)
+    return mod
+
+
+def _reset_consumer_state(consumer: types.ModuleType) -> None:
+    """Clear the consumer's module-global metric state before a test."""
+    with consumer._lock:
+        consumer._event_counts.clear()
+        consumer._last_seq.clear()
+        consumer._latency_accum.clear()
+        consumer._scrape_ts = 0.0
+        consumer._connected = 0
+
+
+def test_subscribe_loop_records_published_events(
+    consumer_module: types.ModuleType,
+    nats_url: str,
+    publisher: NATSPublisher,
+    monkeypatch: pytest.MonkeyPatch,
+) -> None:
+    """End-to-end: publish events → subscribe_loop consumes → metrics reflect them.
+
+    This exercises connect → pull_subscribe → fetch → ack → render_metrics
+    against a real nats-server, the path that unit tests cannot cover
+    because they replace `nats` with a `MagicMock`.
+    """
+    consumer = consumer_module
+    _reset_consumer_state(consumer)
+    monkeypatch.setattr(consumer, "NATS_URL", nats_url)
+    monkeypatch.setattr(consumer, "FETCH_TIMEOUT", 0.25)
+    monkeypatch.setattr(consumer, "FETCH_BATCH", 10)
+
+    # Publish before starting the consumer so messages are queued in
+    # JetStream and the durable consumer picks them up on first fetch.
+    publisher.publish_json(
+        "hi.tasks.completed",
+        {"status": "completed", "created_at": 100.0, "completed_at": 100.5},
+    )
+    publisher.publish_json("hi.agents.created", {"agent_id": "alpha"})
+
+    stop_event_box: dict[str, asyncio.Event] = {}
+    loop_box: dict[str, asyncio.AbstractEventLoop] = {}
+    ready = threading.Event()
+
+    def run_subscriber() -> None:
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop_box["loop"] = loop
+        stop_event = asyncio.Event()
+        stop_event_box["stop"] = stop_event
+        ready.set()
+        try:
+            loop.run_until_complete(consumer.subscribe_loop(stop_event))
+        finally:
+            loop.close()
+
+    t = threading.Thread(target=run_subscriber, daemon=True)
+    t.start()
+    assert ready.wait(timeout=5.0), "subscriber thread failed to initialise"
+
+    try:
+        # Wait up to 15s for both events to be consumed and acked.
+        deadline = time.monotonic() + 15.0
+        while time.monotonic() < deadline:
+            with consumer._lock:
+                tasks_count = consumer._event_counts.get(
+                    ("hi.tasks", "completed"), 0
+                )
+                agents_count = consumer._event_counts.get(
+                    ("hi.agents", "created"), 0
+                )
+                connected = consumer._connected
+            if tasks_count >= 1 and agents_count >= 1 and connected == 1:
+                break
+            time.sleep(0.2)
+
+        metrics = consumer._render_metrics()
+    finally:
+        loop = loop_box["loop"]
+        stop_event = stop_event_box["stop"]
+        loop.call_soon_threadsafe(stop_event.set)
+        t.join(timeout=10.0)
+        assert not t.is_alive(), "subscriber thread failed to shut down"
+
+    # Verify metric state reflects what we published.
+    assert "hi_jetstream_consumer_connected 1" in metrics
+    assert 'subject_prefix="hi.tasks"' in metrics
+    assert 'event_type="completed"' in metrics
+    assert 'subject_prefix="hi.agents"' in metrics
+    assert 'event_type="created"' in metrics
+    # Latency for the completed task: 100.5 - 100.0 = 0.5s
+    assert 'hi_jetstream_task_latency_seconds{status="completed"} 0.5' in metrics