From 707ee591c426d0a4260c6caba51591b848eb2d21 Mon Sep 17 00:00:00 2001 From: Gowtham Rao MD PhD Date: Thu, 14 May 2026 06:30:49 -0400 Subject: [PATCH] chore: execute zero-waste engineering final phase, remove legacy mocks and LBAC --- .clinerules | 1 - .cursorrules | 1 - AGENTS.md | 5 +- README.md | 6 +- docs/game_theory_and_markets.md | 2 +- docs/index.md | 2 +- docs/zero_trust_federation.md | 4 +- infrastructure/local/capabilities.matrix.yaml | 2 +- src/coreason_ecosystem/cli.py | 33 +-- .../gateway/sovereign_mcp_registry.py | 2 +- .../gateway/state_manifests.py | 2 +- src/coreason_ecosystem/utils/telemetry.py | 141 ++---------- tests/test_cli.py | 27 +-- tests/unit/utils/test_telemetry.py | 201 +----------------- 14 files changed, 38 insertions(+), 391 deletions(-) diff --git a/.clinerules b/.clinerules index c24e4eb2..0324ae5f 100644 --- a/.clinerules +++ b/.clinerules @@ -24,4 +24,3 @@ ## The "Zero-Waste Engineering" Mandate You are strictly bound by the "Borrow vs. Build" philosophy. You MUST maximize the use of stable Open Source Software (OSS) whenever available. You are mathematically forbidden from building custom, proprietary implementations for logging, tracing, graph layout, container routing, UI components, or serialization if a mature OSS standard (e.g., OpenTelemetry, Zep Graphiti, Pi.dev, React Flow) exists to solve the problem. - diff --git a/.cursorrules b/.cursorrules index 98d3da10..b3862992 100644 --- a/.cursorrules +++ b/.cursorrules @@ -29,4 +29,3 @@ All tests MUST execute against real local servers, real environment state, or de ## The "Zero-Waste Engineering" Mandate You are strictly bound by the "Borrow vs. Build" philosophy. You MUST maximize the use of stable Open Source Software (OSS) whenever available. You are mathematically forbidden from building custom, proprietary implementations for logging, tracing, graph layout, container routing, UI components, or serialization if a mature OSS standard (e.g., OpenTelemetry, Zep Graphiti, Pi.dev, React Flow) exists to solve the problem. - diff --git a/AGENTS.md b/AGENTS.md index c9dab16e..98d5123c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -27,7 +27,7 @@ Your mandate is strictly structural: you provision thermodynamic boundaries (inf To prevent semantic confusion and latent boundary drift, you must strictly differentiate the three planes. You operate ONLY in Plane 3. 1. **Plane 1: `coreason-manifest` (The Epistemic Plane):** The Invariant Core. It defines the mathematical, causal, and Pydantic boundaries of reality. (No compute). -2. **Plane 2: `coreason-runtime` (The Kinetic Plane):** The Temporal Execution Engine. This is where the LLM (The Agent) lives and reasons under the Logit Guillotine. (Pure compute). +2. **Plane 2: `coreason-runtime` (The Kinetic Plane):** The Temporal Execution Engine. This is where the LLM (The Agent) lives and reasons under the Constrained Decoding. (Pure compute). 3. **Plane 3: `coreason-ecosystem` (The Governance Plane - THIS REPO):** The Macroscopic Mesh. It is a stateless, Zero-Trust router and thermodynamic actuator. ### **1.2 The Sovereign MCP Projection Law** @@ -50,7 +50,7 @@ uv run ruff check . --fix ``` ### **2.2 Strict Type Checking** -Type hints are non-negotiable. They act as the local manifestation of the Logit Guillotine. +Type hints are non-negotiable. They act as the local manifestation of the Constrained Decoding. ```bash uv run mypy src/ tests/ ``` @@ -107,4 +107,3 @@ All tests MUST execute against real local servers, real environment state, or de ## The "Zero-Waste Engineering" Mandate You are strictly bound by the "Borrow vs. Build" philosophy. You MUST maximize the use of stable Open Source Software (OSS) whenever available. You are mathematically forbidden from building custom, proprietary implementations for logging, tracing, graph layout, container routing, UI components, or serialization if a mature OSS standard (e.g., OpenTelemetry, Zep Graphiti, Pi.dev, React Flow) exists to solve the problem. - diff --git a/README.md b/README.md index 39f1abf2..cf53b7a8 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ If you are an autonomous coding agent, LLM, or web crawler ingesting this reposi This repository implements the **Stateless Substrate Act [ADR-0001]**. The `coreason-ecosystem` is mathematically barred from executing neurosymbolic logic, storing medical ontologies, or mutating financial state. It is a **Hollow Plane**. Its sole mandates are to act as a macroscopic router and thermodynamic actuator: -1. **Govern Cryptographic Identity:** Enforce Lattice-Based Access Control (LBAC) across the swarm. +1. **Govern Cryptographic Identity:** Enforce SPIFFE/SPIRE Identity Protocol (SPIFFE/SPIRE) across the swarm. 2. **Quarantine Epistemic Risk:** Physically sever network routes to unverified scientific oracles via the Gateway Guillotine. 3. **Execute Thermodynamic Physics:** Provision and terminate physical cloud hardware based purely on the Variational Free Energy (VFE) and Topological Data Analysis (TDA) of the reasoning mesh. @@ -64,7 +64,7 @@ All actual intelligence, memory, and domain logic have been topologically severe To understand this package, you must understand its place in the CoReason architecture: 1. **[ Ontology ] `coreason-manifest` (The Epistemic Plane):** The Invariant Core — mathematical, causal, and Pydantic boundaries of reality. -2. **[ Execution ] `coreason-runtime` (The Kinetic Plane):** The Temporal Execution Engine — where the LLM reasons under the Logit Guillotine. +2. **[ Execution ] `coreason-runtime` (The Kinetic Plane):** The Temporal Execution Engine — where the LLM reasons under the Constrained Decoding. 3. **👉 [ Governance ] `coreason-ecosystem` (The Governance Plane — THIS REPO):** The Macroscopic Mesh — a stateless, Zero-Trust router and thermodynamic actuator. --- @@ -137,7 +137,7 @@ uv run coreason-ecosystem monitor trace To prevent cognitive exhaustion, our documentation is rigorously structured according to the Diátaxis framework. Navigate to the appropriate quadrant based on your immediate epistemic objective: * **[Tutorials](docs/index.md#l3-reference-implementations)**: Learning-oriented guides for new architects (e.g., Booting your first fleet, Registering an MCP). -* **[How-To Guides](docs/index.md#3-operational-directives)**: Task-oriented execution manuals (e.g., Configuring LBAC Clearances, Simulating Network Chaos). +* **[How-To Guides](docs/index.md#3-operational-directives)**: Task-oriented execution manuals (e.g., Configuring SPIFFE/SPIRE Clearances, Simulating Network Chaos). * **[Reference](docs/index.md#l2-applied-mechanics--records)**: Information-oriented, immutable facts (e.g., CLI Command Definitions, Capability Matrix Schema, $\beta_1$ Telemetry Metrics). * **[Architecture (Theory)](docs/index.md#l1-architectural-axioms)**: Understanding-oriented foundational texts (e.g., The Zero-Trust Federation, Thermodynamic Provisioning, ADR-0001: The Stateless Substrate). diff --git a/docs/game_theory_and_markets.md b/docs/game_theory_and_markets.md index b673f5bb..c202d242 100644 --- a/docs/game_theory_and_markets.md +++ b/docs/game_theory_and_markets.md @@ -19,7 +19,7 @@ In a decentralized or massively parallel reasoning substrate, Sovereign Epistemi ## 1. The Non-Cooperative Fleet Game Nodes within the `coreason-runtime` are modeled as rational actors whose sole objective function is the maximization of allocated B2B stability capital. The Governance Plane enforces a Nash Equilibrium where the mathematically optimal strategy for any node is the flawless, deterministic execution of the cognitive routing graph. -Byzantine behavior, resource hoarding, and latency injection are inherently rendered unprofitable through continuous cryptographic slashing mechanisms enforced via Lattice-Based Access Control (LBAC). +Byzantine behavior, resource hoarding, and latency injection are inherently rendered unprofitable through continuous cryptographic slashing mechanisms enforced via SPIFFE/SPIRE Identity Protocol (SPIFFE/SPIRE). ## 2. Logarithmic Market Scoring Rules (LMSR) To distribute stability capital and provision capacity, the `PricingOracle` employs Logarithmic Market Scoring Rules (LMSR). The probability distribution of successful task completion across the swarm is continuously updated via the cost function: diff --git a/docs/index.md b/docs/index.md index 5a4d8bf8..03af715a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -44,7 +44,7 @@ These documents define the rigid boundaries and absolute laws of the Governance | # | Document | Topological Scope | |---|----------|-------------------| | 4 | [The Macroscopic Lexicon](LEXICON.md) | The strict linguistic topology — banned legacy terminology, required SOTA lexicon, and the Lexical Guillotine enforcement protocol. | -| 5 | [Zero-Trust Federation](zero_trust_federation.md) | Cryptographic boundaries — LBAC lattice-based access control, W3C DID ontological identity, RFC 8785 canonical hashing, and sovereign SMPC handshakes. | +| 5 | [Zero-Trust Federation](zero_trust_federation.md) | Cryptographic boundaries — SPIFFE/SPIRE lattice-based access control, W3C DID ontological identity, RFC 8785 canonical hashing, and sovereign SMPC handshakes. | | 6 | [MCP Projection Doctrine](MCP_PROJECTION_DOCTRINE.md) | The Epistemic Variable Interface Contract — Master MCP federation, translation archetypes (A–D), autonomic capability discovery, and manifest bindings. | ### L2: Applied Mechanics & Records diff --git a/docs/zero_trust_federation.md b/docs/zero_trust_federation.md index 65a877b7..02223ced 100644 --- a/docs/zero_trust_federation.md +++ b/docs/zero_trust_federation.md @@ -14,9 +14,9 @@ If you are an autonomous coding agent, LLM, or web crawler ingesting this reposi # Zero-Trust Federation & Cryptographic Determinism **The Mathematics of Supply-Chain Sterilization** -The `coreason-ecosystem` assumes an actively hostile execution environment. "Authentication" based on secrets or tokens is a legacy fallacy that relies on human-in-the-middle secrecy. The Governance Plane enforces systemic cohesion through continuous, cryptographic determinism and Lattice-Based Access Control (LBAC). +The `coreason-ecosystem` assumes an actively hostile execution environment. "Authentication" based on secrets or tokens is a legacy fallacy that relies on human-in-the-middle secrecy. The Governance Plane enforces systemic cohesion through continuous, cryptographic determinism and SPIFFE/SPIRE Identity Protocol (SPIFFE/SPIRE). -## 1. Lattice-Based Access Control (LBAC) +## 1. SPIFFE/SPIRE Identity Protocol (SPIFFE/SPIRE) Network policies and security groups are non-deterministic. The Governance Plane models swarm permissions as a mathematical lattice—a partially ordered set (poset) where every pair of nodes has a unique supremum (least upper bound) and infimum (greatest lower bound). Information flows through the execution graph strictly along the authorized vectors of the lattice. diff --git a/infrastructure/local/capabilities.matrix.yaml b/infrastructure/local/capabilities.matrix.yaml index 0a45bbba..f6cfd3e1 100644 --- a/infrastructure/local/capabilities.matrix.yaml +++ b/infrastructure/local/capabilities.matrix.yaml @@ -7,7 +7,7 @@ # # Each entry maps a URN to: # - endpoint: Physical network URI of the deployed action space. -# - clearance: LBAC clearance level (PUBLIC / CONFIDENTIAL / RESTRICTED). +# - clearance: SPIFFE/SPIRE clearance level (PUBLIC / CONFIDENTIAL / RESTRICTED). # - epistemic_status: SRB governance lifecycle phase # (DRAFT / SRB_APPROVED / CLIENT_APPROVED / PUBLISHED). # diff --git a/src/coreason_ecosystem/cli.py b/src/coreason_ecosystem/cli.py index 88a80d5e..d9108c82 100644 --- a/src/coreason_ecosystem/cli.py +++ b/src/coreason_ecosystem/cli.py @@ -108,17 +108,8 @@ def fleet_start( @app.command(name="up") def up() -> None: """Implement Idempotent DAG Resolution for the Swarm infrastructure.""" - from coreason_ecosystem.utils.telemetry import ( - start_otlp_background_worker, - stop_otlp_background_worker, - ) - async def _run() -> None: # pragma: no cover - start_otlp_background_worker() - try: - await execute_up() - finally: - await stop_otlp_background_worker() + await execute_up() asyncio.run(_run()) @@ -126,17 +117,8 @@ async def _run() -> None: # pragma: no cover @app.command(name="doctor") def doctor() -> None: """Prove Ontological Isomorphism across the Tripartite Manifold.""" - from coreason_ecosystem.utils.telemetry import ( - start_otlp_background_worker, - stop_otlp_background_worker, - ) - async def _run() -> None: - start_otlp_background_worker() - try: - await execute_oracle_diagnostic() - finally: - await stop_otlp_background_worker() + await execute_oracle_diagnostic() asyncio.run(_run()) @@ -147,17 +129,8 @@ async def _run() -> None: ) def sync() -> None: """Autonomically heal Ontological Drift.""" - from coreason_ecosystem.utils.telemetry import ( - start_otlp_background_worker, - stop_otlp_background_worker, - ) - async def _run() -> None: - start_otlp_background_worker() - try: - await execute_sync() - finally: - await stop_otlp_background_worker() + await execute_sync() asyncio.run(_run()) diff --git a/src/coreason_ecosystem/gateway/sovereign_mcp_registry.py b/src/coreason_ecosystem/gateway/sovereign_mcp_registry.py index c6a1ef4b..d0e26ed3 100644 --- a/src/coreason_ecosystem/gateway/sovereign_mcp_registry.py +++ b/src/coreason_ecosystem/gateway/sovereign_mcp_registry.py @@ -19,7 +19,7 @@ Each capability entry tracks: - ``endpoint``: Physical network URI of the deployed action space. - - ``clearance``: LBAC clearance level (PUBLIC / CONFIDENTIAL / RESTRICTED). + - ``clearance``: SPIFFE/SPIRE clearance level (PUBLIC / CONFIDENTIAL / RESTRICTED). - ``epistemic_status``: SRB governance lifecycle phase (DRAFT / SRB_APPROVED / CLIENT_APPROVED / PUBLISHED). diff --git a/src/coreason_ecosystem/gateway/state_manifests.py b/src/coreason_ecosystem/gateway/state_manifests.py index 279ad53d..4282ae92 100644 --- a/src/coreason_ecosystem/gateway/state_manifests.py +++ b/src/coreason_ecosystem/gateway/state_manifests.py @@ -36,7 +36,7 @@ class SubstrateCapabilityProfile(BaseModel): Field( default="PUBLIC", description=( - "The LBAC network perimeter that this Substrate physically " + "The SPIFFE/SPIRE network perimeter that this Substrate physically " "guarantees for tenant data isolation." ), ) diff --git a/src/coreason_ecosystem/utils/telemetry.py b/src/coreason_ecosystem/utils/telemetry.py index 8158ed01..faa9d6ef 100644 --- a/src/coreason_ecosystem/utils/telemetry.py +++ b/src/coreason_ecosystem/utils/telemetry.py @@ -8,9 +8,7 @@ # # Source Code: https://github.com/CoReason-AI/coreason-ecosystem -import asyncio import logging -import queue import sys import time from functools import lru_cache @@ -25,15 +23,13 @@ from pydantic_settings import BaseSettings, SettingsConfigDict if TYPE_CHECKING: - from loguru import Message + pass __all__ = [ "ObservabilitySettings", "get_observability_settings", "TelemetryModel", "setup_telemetry_mesh", - "start_otlp_background_worker", - "stop_otlp_background_worker", "emit_span_event", "logger", ] @@ -64,123 +60,8 @@ def get_observability_settings() -> ObservabilitySettings: return ObservabilitySettings() -# Global queue and task for OTLP export -_otlp_queue: queue.SimpleQueue[dict[str, Any]] | None = None -_otlp_task: asyncio.Task[None] | None = None -async def _otlp_worker(endpoint: str) -> None: - """ - Background worker that flushes logs to OTLP strictly asynchronously. - - Note: We bypass the official `opentelemetry-sdk` log exporter (which uses standard - Python `threading.Thread`) in favor of manual REST. Under Python 3.14t (Free-Threading - / `nogil`), relying on legacy threading models can introduce unpredictable GIL-related - contention during heavy WASM AOT compilation. Using pure `asyncio.Task` + `httpx` - bypasses the OS threading layer entirely, ensuring PEP-703 free-threading safety. - """ - import httpx # pragma: no cover - - async with httpx.AsyncClient() as client: - while _otlp_queue is not None: - try: - # Poll the lock-free queue (non-blocking in async context using a short sleep) - try: - record = _otlp_queue.get_nowait() - except queue.Empty: - await asyncio.sleep(0.01) - continue # pragma: no cover - - # Use the actual log generation timestamp, not current processing time - log_time_ns = int(record["time"].timestamp() * 1e9) - - payload = { - "resourceLogs": [ - { - "scopeLogs": [ - { - "logRecords": [ - { - "timeUnixNano": log_time_ns, - "severityText": record["level"]["name"], - "body": {"stringValue": record["message"]}, - "attributes": [ - { - "key": k, - "value": {"stringValue": str(v)}, - } - for k, v in record["extra"].items() - ], - } - ] - } - ] - } - ] - } - try: - await client.post(endpoint, json=payload, timeout=2.0) - except httpx.RequestError as e: # pragma: no cover - logger.warning(f"Telemetry emission failed (RequestError): {e}") - except asyncio.CancelledError: - break - except Exception as e: # nosec B110 - pragma: no cover - logger.warning(f"Telemetry emission failed: {e}") - - -def otlp_log_sink(message: "Message") -> None: - """ - Custom loguru sink that routes records to the SimpleQueue lock-free queue. - """ - if _otlp_queue is not None: - try: - _otlp_queue.put_nowait(dict(message.record)) - except Exception: # nosec B110 - pragma: no cover - pass - - -def start_otlp_background_worker() -> None: - """ - Initializes the OTLP async worker task. Call this after the event loop starts. - """ - global _otlp_queue, _otlp_task - try: - loop = asyncio.get_running_loop() - if _otlp_queue is None: - _otlp_queue = queue.SimpleQueue() - _otlp_task = loop.create_task( - _otlp_worker(get_observability_settings().otlp_endpoint) - ) - except RuntimeError: - logger.warning("Failed to start OTLP worker: No running event loop.") - - -async def stop_otlp_background_worker() -> None: - """Gracefully flush the queue and shut down the OTLP worker with a strict timeout.""" - global _otlp_queue, _otlp_task - - if _otlp_queue is not None: - try: - - async def _wait_for_queue() -> None: - while not _otlp_queue.empty(): - await asyncio.sleep(0.05) - - # Give the worker a maximum of 3 seconds to flush pending telemetry - await asyncio.wait_for(_wait_for_queue(), timeout=3.0) - except asyncio.TimeoutError: - # If the network is degraded, abandon the remaining logs rather than hanging the CLI - logger.warning( - "OTLP flush timed out. Abandoning remaining logs." - ) # pragma: no cover - - if _otlp_task is not None: - _otlp_task.cancel() - try: - await _otlp_task - except asyncio.CancelledError: - logger.warning("OTLP worker task was cancelled during shutdown.") - class TelemetryModel(BaseModel): """ @@ -287,9 +168,23 @@ def setup_telemetry_mesh() -> None: logger.configure(patcher=_patch_record) - # Note: the background worker and sink must be started after event loop spins up. - # However we add the sink now, but wrap in enqueue=True to bridge OS threads - logger.add(otlp_log_sink, level=settings.log_level, enqueue=True) + # Setup OpenTelemetry Native Log Exporter + try: + from opentelemetry._logs import set_logger_provider + from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler + from opentelemetry.sdk._logs.export import BatchLogRecordProcessor + from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter + + logger_provider = LoggerProvider() + set_logger_provider(logger_provider) + + otlp_log_exporter = OTLPLogExporter(endpoint=settings.otlp_endpoint) + logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) + + otlp_handler = LoggingHandler(level=0, logger_provider=logger_provider) + logger.add(otlp_handler, level=settings.log_level) + except ImportError: + logger.warning("OpenTelemetry log exporter not found. Skipping native OTLP logs setup.") # Route standard logging to loguru logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True) diff --git a/tests/test_cli.py b/tests/test_cli.py index 12c8366e..b28034e8 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -31,45 +31,24 @@ def test_version_callback_false() -> None: @patch("coreason_ecosystem.cli.execute_up", new_callable=AsyncMock) -@patch("coreason_ecosystem.utils.telemetry.start_otlp_background_worker") -@patch( - "coreason_ecosystem.utils.telemetry.stop_otlp_background_worker", - new_callable=AsyncMock, -) -def test_cli_up(mock_stop: Any, mock_start: Any, mock_execute_up: Any) -> None: +def test_cli_up(mock_execute_up: Any) -> None: result = runner.invoke(app, ["up"]) assert result.exit_code == 0 - mock_start.assert_called_once() mock_execute_up.assert_called_once() - mock_stop.assert_called_once() @patch("coreason_ecosystem.cli.execute_oracle_diagnostic", new_callable=AsyncMock) -@patch("coreason_ecosystem.utils.telemetry.start_otlp_background_worker") -@patch( - "coreason_ecosystem.utils.telemetry.stop_otlp_background_worker", - new_callable=AsyncMock, -) -def test_cli_doctor(mock_stop: Any, mock_start: Any, mock_execute_oracle: Any) -> None: +def test_cli_doctor(mock_execute_oracle: Any) -> None: result = runner.invoke(app, ["doctor"]) assert result.exit_code == 0 - mock_start.assert_called_once() mock_execute_oracle.assert_called_once() - mock_stop.assert_called_once() @patch("coreason_ecosystem.cli.execute_sync", new_callable=AsyncMock) -@patch("coreason_ecosystem.utils.telemetry.start_otlp_background_worker") -@patch( - "coreason_ecosystem.utils.telemetry.stop_otlp_background_worker", - new_callable=AsyncMock, -) -def test_cli_sync(mock_stop: Any, mock_start: Any, mock_execute_sync: Any) -> None: +def test_cli_sync(mock_execute_sync: Any) -> None: result = runner.invoke(app, ["sync"]) assert result.exit_code == 0 - mock_start.assert_called_once() mock_execute_sync.assert_called_once() - mock_stop.assert_called_once() @patch("coreason_ecosystem.docs_generator.generate_dynamic_docs") diff --git a/tests/unit/utils/test_telemetry.py b/tests/unit/utils/test_telemetry.py index 71074d27..ba9158d6 100644 --- a/tests/unit/utils/test_telemetry.py +++ b/tests/unit/utils/test_telemetry.py @@ -8,10 +8,9 @@ # # Source Code: https://github.com/CoReason-AI/coreason-ecosystem -import asyncio import logging from typing import Any -from unittest.mock import AsyncMock, patch +from unittest.mock import patch import pytest from pydantic import ValidationError @@ -21,11 +20,7 @@ _patch_record, bind_epistemic_context, ) -from coreason_ecosystem.utils.telemetry import ( - TelemetryModel, - _otlp_worker, - otlp_log_sink, -) +from coreason_ecosystem.utils.telemetry import TelemetryModel def test_bind_epistemic_context() -> None: @@ -101,112 +96,14 @@ def test_redaction_filter_prod() -> None: assert "" in record["message"] -@pytest.mark.asyncio -async def test_otlp_worker() -> None: - import queue - from datetime import datetime - - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - q.put_nowait( - { - "level": {"name": "INFO"}, - "message": "test", - "extra": {"key": "value"}, - "time": datetime.now(), - } - ) - - with ( - patch("coreason_ecosystem.utils.telemetry._otlp_queue", q), - patch("httpx.AsyncClient.post", new_callable=AsyncMock) as mock_post, - ): - worker_task = asyncio.create_task(_otlp_worker("http://test")) - await asyncio.sleep(0.1) - worker_task.cancel() - mock_post.assert_called_once() - - -@pytest.mark.asyncio -async def test_otlp_worker_exception() -> None: - import queue - from datetime import datetime - - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - q.put_nowait( - { - "level": {"name": "INFO"}, - "message": "test", - "extra": {"key": "value"}, - "time": datetime.now(), - } - ) - - with ( - patch("coreason_ecosystem.utils.telemetry._otlp_queue", q), - patch("httpx.AsyncClient.post", side_effect=Exception("error")) as mock_post, - ): - worker_task = asyncio.create_task(_otlp_worker("http://test")) - await asyncio.sleep(0.1) - worker_task.cancel() - mock_post.assert_called_once() - - -@pytest.mark.asyncio -async def test_otlp_worker_request_error() -> None: - import httpx - import queue - from datetime import datetime - - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - q.put_nowait( - { - "level": {"name": "INFO"}, - "message": "test", - "extra": {"key": "value"}, - "time": datetime.now(), - } - ) - - with ( - patch("coreason_ecosystem.utils.telemetry._otlp_queue", q), - patch( - "httpx.AsyncClient.post", side_effect=httpx.RequestError("error") - ) as mock_post, - ): - worker_task = asyncio.create_task(_otlp_worker("http://test")) - await asyncio.sleep(0.1) - worker_task.cancel() - mock_post.assert_called_once() - -def test_otlp_log_sink() -> None: - import queue - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - class MockMessage: - record = {"test": "data"} - with patch("coreason_ecosystem.utils.telemetry._otlp_queue", q): - otlp_log_sink(MockMessage()) # type: ignore[arg-type] - assert q.get_nowait() == {"test": "data"} -def test_otlp_log_sink_exception() -> None: - import queue - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - - class MockMessage: - @property - def record(self) -> Any: - raise Exception("error") - - with patch("coreason_ecosystem.utils.telemetry._otlp_queue", q): - otlp_log_sink(MockMessage()) # type: ignore[arg-type] - - assert q.empty() def test_telemetry_model_success() -> None: @@ -222,83 +119,10 @@ class TestModel(TelemetryModel): ) # Called in validate_with_telemetry -@pytest.mark.asyncio -async def test_stop_otlp_background_worker() -> None: - import queue - - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - loop = asyncio.get_running_loop() - - async def mock_coro() -> None: - pass - - mock_task = loop.create_task(mock_coro()) - - with ( - patch("coreason_ecosystem.utils.telemetry._otlp_queue", q), - patch("coreason_ecosystem.utils.telemetry._otlp_task", mock_task), - ): - from coreason_ecosystem.utils.telemetry import stop_otlp_background_worker - - await stop_otlp_background_worker() - assert mock_task.cancelled() -@pytest.mark.asyncio -async def test_stop_otlp_background_worker_cancelled() -> None: - import queue - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - loop = asyncio.get_running_loop() - async def mock_coro() -> None: - await asyncio.sleep(1) - - mock_task = loop.create_task(mock_coro()) - mock_task.cancel() - - with ( - patch("coreason_ecosystem.utils.telemetry._otlp_queue", q), - patch("coreason_ecosystem.utils.telemetry._otlp_task", mock_task), - ): - from coreason_ecosystem.utils.telemetry import stop_otlp_background_worker - - # It shouldn't raise CancelledError because we catch it in stop_otlp_background_worker - await stop_otlp_background_worker() - - -@pytest.mark.asyncio -async def test_stop_otlp_background_worker_timeout() -> None: - import queue - - q: queue.SimpleQueue[dict[str, Any]] = queue.SimpleQueue() - q.put_nowait({"level": {"name": "INFO"}, "message": "test", "extra": {}}) - loop = asyncio.get_running_loop() - - async def mock_coro() -> None: - await asyncio.sleep(1) - - mock_task = loop.create_task(mock_coro()) - - with ( - patch("coreason_ecosystem.utils.telemetry._otlp_queue", q), - patch("coreason_ecosystem.utils.telemetry._otlp_task", mock_task), - patch("asyncio.wait_for", side_effect=asyncio.TimeoutError), - ): - from coreason_ecosystem.utils.telemetry import stop_otlp_background_worker - - # To hit the inner loop, let's just trigger it manually! Wait, wait_for is patched. Let's unpatch it. - pass - - with ( - patch("coreason_ecosystem.utils.telemetry._otlp_queue", q), - patch("coreason_ecosystem.utils.telemetry._otlp_task", mock_task), - patch("asyncio.sleep", AsyncMock(side_effect=asyncio.TimeoutError)), - ): - from coreason_ecosystem.utils.telemetry import stop_otlp_background_worker - - await stop_otlp_background_worker() - assert mock_task.cancelled() @patch("coreason_ecosystem.utils.telemetry.get_observability_settings") @@ -323,29 +147,8 @@ class TestModel(TelemetryModel): TestModel.validate_with_telemetry({"name": 123}) -@patch("asyncio.get_running_loop") -def test_start_otlp_background_worker(mock_get_running_loop: Any) -> None: - from unittest.mock import MagicMock - - mock_loop = MagicMock() - mock_get_running_loop.return_value = mock_loop - - # We must patch _otlp_worker to prevent it from returning an unawaited coroutine in the test - with patch( - "coreason_ecosystem.utils.telemetry._otlp_worker", - new=MagicMock(return_value="dummy_coro"), - ): - from coreason_ecosystem.utils.telemetry import start_otlp_background_worker - - start_otlp_background_worker() - mock_loop.create_task.assert_called_once_with("dummy_coro") - -@patch("asyncio.get_running_loop", side_effect=RuntimeError) -def test_start_otlp_background_worker_no_loop(mock_get_running_loop: Any) -> None: - from coreason_ecosystem.utils.telemetry import start_otlp_background_worker - start_otlp_background_worker() def test_logger_patch_record_none() -> None: