Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
testpaths = ["tests"]
pythonpath = ["."]
addopts = "-v --strict-markers --cov=exporter --cov-report=term-missing --cov-report=html --cov-report=xml"
markers = [
"integration: integration tests that exercise real external dependencies (slower; opt-in via -m integration)",
"requires_nats: integration tests that require a real `nats-server` binary on PATH (mirrors Charybdis's REQUIRES_NATS gate)",
]

[tool.coverage.run]
source = ["exporter"]
Expand Down
Empty file added tests/integration/__init__.py
Empty file.
149 changes: 149 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Shared fixtures for integration tests that need a real NATS server.

Spins up a `nats-server` subprocess with JetStream enabled on an ephemeral
port and yields a connection URL. Tests are skipped (per their own
`pytest.mark.skipif`) when `nats-server` is not on PATH.

Mirrors the pattern used by ProjectScylla's `tests/integration/nats/conftest.py`
and Charybdis's `REQUIRES_NATS` ctest gate so contributors can recognize the
shape across the HomericIntelligence ecosystem.
"""
from __future__ import annotations

import asyncio
import json
import socket
import subprocess
import tempfile
import time
from collections.abc import Generator
from typing import Any

import pytest


def _find_free_port() -> int:
"""Find a free TCP port by binding to port 0."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
port: int = s.getsockname()[1]
return port


def _wait_for_port(port: int, host: str = "127.0.0.1", timeout: float = 5.0) -> None:
"""Block until a TCP port is accepting connections."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
with socket.create_connection((host, port), timeout=0.5):
return
except OSError:
time.sleep(0.2)
raise TimeoutError(
f"nats-server did not start on {host}:{port} within {timeout}s"
)


class NATSPublisher:
    """Synchronous wrapper around nats-py for publishing JetStream messages.

    Manages its own asyncio event loop so tests stay synchronous.
    """

    def __init__(
        self,
        url: str,
        streams: tuple[tuple[str, list[str]], ...] = (
            ("hi_agents", ["hi.agents.>"]),
            ("hi_tasks", ["hi.tasks.>"]),
        ),
    ) -> None:
        """Initialize the publisher and ensure all configured streams exist.

        Args:
            url: NATS server URL to connect to.
            streams: ``(stream_name, subjects)`` pairs to create on connect.
        """
        self._url = url
        self._streams = streams
        self._loop = asyncio.new_event_loop()
        self._nc: Any = None
        self._js: Any = None
        try:
            self._setup()
        except Exception:
            # Don't leak the freshly created event loop (and any half-open
            # connection) when connect/stream creation fails mid-way.
            self.close()
            raise

    def _setup(self) -> None:
        """Connect to NATS and create configured JetStream streams."""
        # Imported lazily so merely collecting this module does not require
        # nats-py; the fixture layer gates on its availability.
        import nats as nats_client

        self._nc = self._loop.run_until_complete(nats_client.connect(self._url))
        self._js = self._nc.jetstream()
        for name, subjects in self._streams:
            self._loop.run_until_complete(
                self._js.add_stream(name=name, subjects=subjects)
            )

    def publish_json(self, subject: str, payload: dict[str, Any]) -> None:
        """Publish a JSON-encoded message to the given subject."""
        self._loop.run_until_complete(
            self._js.publish(subject, json.dumps(payload).encode())
        )

    def purge_all(self) -> None:
        """Purge every configured stream so tests start with a clean slate."""
        for name, _ in self._streams:
            self._loop.run_until_complete(self._js.purge_stream(name))

    def close(self) -> None:
        """Drain the connection (if any) and always close the event loop.

        Safe to call more than once: the loop close is idempotent-guarded
        and the connection handle is cleared after a successful drain.
        """
        try:
            if self._nc is not None and not self._loop.is_closed():
                self._loop.run_until_complete(self._nc.drain())
                self._nc = None
        finally:
            if not self._loop.is_closed():
                self._loop.close()


@pytest.fixture(scope="session")
def nats_port() -> int:
    """Session-scoped ephemeral TCP port for the test nats-server."""
    chosen = _find_free_port()
    return chosen


@pytest.fixture(scope="session")
def nats_url(nats_port: int) -> Generator[str, None, None]:
    """Start a real nats-server process with JetStream and yield its URL.

    The subprocess is terminated in teardown. Tests that depend on this
    fixture should `pytest.mark.skipif(shutil.which("nats-server") is None)`
    so the suite remains green on hosts without the binary.
    """
    with tempfile.TemporaryDirectory() as store_dir:
        proc = subprocess.Popen(
            [
                "nats-server",
                "-p",
                str(nats_port),
                "-js",
                "-sd",
                store_dir,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        try:
            _wait_for_port(nats_port)
            yield f"nats://127.0.0.1:{nats_port}"
        finally:
            proc.terminate()
            try:
                # communicate() both waits for exit AND drains the PIPEs,
                # so a chatty server can't block on a full pipe buffer and
                # the descriptors are closed (no ResourceWarning).
                proc.communicate(timeout=10)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.communicate(timeout=5)


@pytest.fixture()
def publisher(nats_url: str) -> Generator[NATSPublisher, None, None]:
    """Yield a NATSPublisher connected to the test server.

    Creates `hi_agents` and `hi_tasks` streams and tears down the
    connection after the test. `purge_all()` runs inside the try block so
    a purge failure still closes the connection and event loop.
    """
    pub = NATSPublisher(nats_url)
    try:
        pub.purge_all()
        yield pub
    finally:
        pub.close()
179 changes: 179 additions & 0 deletions tests/integration/test_jetstream_consumer_real_nats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Integration test for jetstream-consumer against a real nats-server.

Spins up a real NATS server (JetStream enabled) via the session-scoped
fixture in `conftest.py`, runs the consumer's `subscribe_loop` in a
background thread, publishes synthetic `hi.tasks.completed` and
`hi.agents.created` events, and asserts that `/metrics` reflects them.

Mirrors Charybdis's `REQUIRES_NATS` ctest gate semantically: when the
`nats-server` binary is not on PATH the entire module is skipped.
"""
from __future__ import annotations

import asyncio
import importlib.util
import shutil
import sys
import threading
import time
import types
from pathlib import Path
from unittest.mock import MagicMock

import pytest

from .conftest import NATSPublisher


def _nats_py_installed() -> bool:
"""Return True iff the real `nats-py` package is importable on disk.

Unit tests inject a `MagicMock` stub at `sys.modules["nats"]`; we
explicitly bypass that cache so this probe reflects what is actually
installed in the environment.
"""
saved = {k: sys.modules.pop(k) for k in list(sys.modules) if k == "nats" or k.startswith("nats.")}
try:
return importlib.util.find_spec("nats") is not None
finally:
sys.modules.update(saved)


# Probe once at import time so the skipif marker below sees a plain bool.
_nats_py_available = _nats_py_installed()

# Applied to every test in this module: opt-in integration markers plus
# environment gates. The suite skips (rather than fails) when either the
# `nats-server` binary or the real nats-py client is unavailable.
pytestmark = [
    pytest.mark.integration,
    pytest.mark.requires_nats,
    pytest.mark.skipif(
        shutil.which("nats-server") is None,
        reason="nats-server not in PATH",
    ),
    pytest.mark.skipif(
        not _nats_py_available,
        reason="nats-py not installed",
    ),
]


# ---------------------------------------------------------------------------
# Stub `nats.errors` / `nats.js.errors` only if they are not provided by an
# installed nats-py. The publisher fixture imports the real `nats` package
# (it must be installed for integration tests anyway), so we just guard
# against the case where the unit-test stub is still cached from a prior
# test in the same session.
# ---------------------------------------------------------------------------

def _ensure_real_nats_loaded() -> None:
"""Drop any unit-test stub so we get the real installed `nats` package."""
nats_mod = sys.modules.get("nats")
if nats_mod is not None and isinstance(getattr(nats_mod, "connect", None), MagicMock):
# A unit-test stub is loaded. Purge so the real package re-imports.
for key in [k for k in sys.modules if k == "nats" or k.startswith("nats.")]:
del sys.modules[key]


@pytest.fixture(scope="module")
def consumer_module() -> types.ModuleType:
    """Load `jetstream-consumer/consumer.py` against the real nats-py.

    Unit tests in `tests/test_jetstream_consumer.py` install a `MagicMock`
    stub at module scope; we explicitly purge it here so this integration
    module loads the real client.
    """
    _ensure_real_nats_loaded()
    consumer_path = (
        Path(__file__).resolve().parents[2] / "jetstream-consumer" / "consumer.py"
    )
    spec = importlib.util.spec_from_file_location("consumer_integration", consumer_path)
    assert spec is not None
    assert spec.loader is not None
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def _reset_consumer_state(consumer: types.ModuleType) -> None:
"""Clear the consumer's module-global metric state before a test."""
with consumer._lock:
consumer._event_counts.clear()
consumer._last_seq.clear()
consumer._latency_accum.clear()
consumer._scrape_ts = 0.0
consumer._connected = 0


def test_subscribe_loop_records_published_events(
    consumer_module: types.ModuleType,
    nats_url: str,
    publisher: NATSPublisher,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """End-to-end: publish events → subscribe_loop consumes → metrics reflect them.

    This exercises connect → pull_subscribe → fetch → ack → render_metrics
    against a real nats-server, the path that unit tests cannot cover
    because they replace `nats` with a `MagicMock`.
    """
    consumer = consumer_module
    _reset_consumer_state(consumer)
    monkeypatch.setattr(consumer, "NATS_URL", nats_url)
    monkeypatch.setattr(consumer, "FETCH_TIMEOUT", 0.25)
    monkeypatch.setattr(consumer, "FETCH_BATCH", 10)

    # Publish before starting the consumer so messages are queued in
    # JetStream and the durable consumer picks them up on first fetch.
    publisher.publish_json(
        "hi.tasks.completed",
        {"status": "completed", "created_at": 100.0, "completed_at": 100.5},
    )
    publisher.publish_json("hi.agents.created", {"agent_id": "alpha"})

    # Boxes hand the thread-local loop/event back to the main thread for
    # the shutdown phase; `ready` guarantees both are populated first.
    stop_event_box: dict[str, asyncio.Event] = {}
    loop_box: dict[str, asyncio.AbstractEventLoop] = {}
    ready = threading.Event()

    def run_subscriber() -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop_box["loop"] = loop
        stop_event = asyncio.Event()
        stop_event_box["stop"] = stop_event
        ready.set()
        try:
            loop.run_until_complete(consumer.subscribe_loop(stop_event))
        finally:
            loop.close()

    t = threading.Thread(target=run_subscriber, daemon=True)
    t.start()
    assert ready.wait(timeout=5.0), "subscriber thread failed to initialise"

    try:
        # Wait up to 15s for both events to be consumed and acked.
        deadline = time.monotonic() + 15.0
        while time.monotonic() < deadline:
            with consumer._lock:
                tasks_count = consumer._event_counts.get(
                    ("hi.tasks", "completed"), 0
                )
                agents_count = consumer._event_counts.get(
                    ("hi.agents", "created"), 0
                )
                connected = consumer._connected
            if tasks_count >= 1 and agents_count >= 1 and connected == 1:
                break
            time.sleep(0.2)

        metrics = consumer._render_metrics()
    finally:
        loop = loop_box["loop"]
        stop_event = stop_event_box["stop"]
        # If the subscriber crashed, its loop is already closed and
        # call_soon_threadsafe would raise RuntimeError here, masking the
        # original assertion failure — guard the shutdown signal.
        if not loop.is_closed():
            try:
                loop.call_soon_threadsafe(stop_event.set)
            except RuntimeError:
                pass
        t.join(timeout=10.0)
        assert not t.is_alive(), "subscriber thread failed to shut down"

    # Verify metric state reflects what we published.
    assert "hi_jetstream_consumer_connected 1" in metrics
    assert 'subject_prefix="hi.tasks"' in metrics
    assert 'event_type="completed"' in metrics
    assert 'subject_prefix="hi.agents"' in metrics
    assert 'event_type="created"' in metrics
    # Latency for the completed task: 100.5 - 100.0 = 0.5s
    assert 'hi_jetstream_task_latency_seconds{status="completed"} 0.5' in metrics
Loading