diff --git a/README.md b/README.md index d60a57f..6a92315 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,7 @@ kubeflow-mcp serve \ --instruction-tier full \ # full | compact | minimal --transport stdio \ # stdio | http | sse --auth-token SECRET \ # bearer token for HTTP auth (dev/staging) + --otel-endpoint URL \ # OTLP HTTP endpoint (optional tracing) --log-level INFO \ # DEBUG | INFO | WARNING | ERROR --log-format console \ # console | json (auto-detected if omitted) --no-banner # suppress startup banner @@ -165,6 +166,26 @@ kubeflow-mcp agent \ +## Observability + +OpenTelemetry tracing is optional and can be enabled without changing tool code. + +- Install optional dependencies: `pip install ".[otel]"` +- Enable tracing with CLI flag or env var: + +```bash +kubeflow-mcp serve --otel-endpoint http://localhost:4318 +# or +export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 +kubeflow-mcp serve +``` + +Each tool invocation emits a span with attributes: +`tool.name`, `tool.args_preview`, `tool.success`, `tool.duration_ms`, `kubeflow.persona`, and `correlation_id`. + +> **Note:** `kubeflow-mcp agent --otel-endpoint ...` emits spans under a separate +> `kubeflow-mcp-agent` service in Jaeger, distinct from the `kubeflow-mcp` server spans. + ## Development ```bash diff --git a/kubeflow_mcp/cli.py b/kubeflow_mcp/cli.py index c88b524..e977b21 100644 --- a/kubeflow_mcp/cli.py +++ b/kubeflow_mcp/cli.py @@ -94,6 +94,12 @@ def cli() -> None: "Falls back to KUBEFLOW_MCP_AUTH_TOKEN env var, config file. " "Ignored for stdio transport.", ) +@click.option( + "--otel-endpoint", + default=None, + help="OpenTelemetry OTLP HTTP endpoint for tracing. " + "Falls back to OTEL_EXPORTER_OTLP_ENDPOINT env var, config file.", +) def serve( clients: str | None, persona: str | None, @@ -104,6 +110,7 @@ def serve( instruction_tier: str | None, no_banner: bool, auth_token: str | None, + otel_endpoint: str | None, ) -> None: """Start the MCP server. 
@@ -116,6 +123,7 @@ def serve( from kubeflow_mcp.core.logging import setup_logging from kubeflow_mcp.core.resilience import configure_circuit_breaker from kubeflow_mcp.core.server import configure_resilience, create_server + from kubeflow_mcp.core.telemetry import setup_tracing cfg = load_config() @@ -128,8 +136,11 @@ def serve( if auth_token: cfg.auth.auth_token = auth_token + if otel_endpoint: + cfg.observability.otel_endpoint = otel_endpoint logger = setup_logging(level=log_level, format=log_format) + tracing_enabled = setup_tracing(endpoint=cfg.observability.otel_endpoint) logger.info( "Starting kubeflow-mcp", extra={ @@ -138,6 +149,7 @@ def serve( "transport": transport, "mode": mode, "instruction_tier": instruction_tier, + "tracing_enabled": tracing_enabled, }, ) diff --git a/kubeflow_mcp/cli_test.py b/kubeflow_mcp/cli_test.py index 968a58c..4dfea7b 100644 --- a/kubeflow_mcp/cli_test.py +++ b/kubeflow_mcp/cli_test.py @@ -97,6 +97,7 @@ def _make_serve_mocks(config=None): mock_load_config = MagicMock(return_value=config) mock_build_auth_provider = MagicMock(return_value=None) mock_configure_circuit_breaker = MagicMock() + mock_setup_tracing = MagicMock(return_value=False) fake_server_mod = MagicMock() fake_server_mod.create_server = mock_create_server @@ -109,6 +110,8 @@ def _make_serve_mocks(config=None): fake_auth_mod.build_auth_provider = mock_build_auth_provider fake_resilience_mod = MagicMock() fake_resilience_mod.configure_circuit_breaker = mock_configure_circuit_breaker + fake_telemetry_mod = MagicMock() + fake_telemetry_mod.setup_tracing = mock_setup_tracing modules_patch = { "kubeflow_mcp.core.server": fake_server_mod, @@ -116,6 +119,7 @@ def _make_serve_mocks(config=None): "kubeflow_mcp.core.config": fake_config_mod, "kubeflow_mcp.core.auth": fake_auth_mod, "kubeflow_mcp.core.resilience": fake_resilience_mod, + "kubeflow_mcp.core.telemetry": fake_telemetry_mod, } return mock_server, mock_create_server, modules_patch @@ -401,3 +405,40 @@ def 
test_serve_default_shows_banner(): _, kwargs = mock_server.run.call_args assert kwargs.get("show_banner") is True + + +def test_serve_calls_setup_tracing_with_config_endpoint(): + from kubeflow_mcp.core.config import ObservabilityConfig + + config = _make_default_config() + config.observability = ObservabilityConfig(otel_endpoint="http://otel-collector:4318/v1/traces") + mock_server, _, modules_patch = _make_serve_mocks(config=config) + fake_telemetry_mod = modules_patch["kubeflow_mcp.core.telemetry"] + + with patch.dict(sys.modules, modules_patch): + runner = CliRunner() + runner.invoke(cli, ["serve"]) + + fake_telemetry_mod.setup_tracing.assert_called_once_with( + endpoint="http://otel-collector:4318/v1/traces" + ) + + +def test_serve_otel_endpoint_cli_overrides_config(): + from kubeflow_mcp.core.config import ObservabilityConfig + + config = _make_default_config() + config.observability = ObservabilityConfig(otel_endpoint="http://old-endpoint:4318/v1/traces") + mock_server, _, modules_patch = _make_serve_mocks(config=config) + fake_telemetry_mod = modules_patch["kubeflow_mcp.core.telemetry"] + + with patch.dict(sys.modules, modules_patch): + runner = CliRunner() + runner.invoke( + cli, + ["serve", "--otel-endpoint", "http://new-endpoint:4318/v1/traces"], + ) + + fake_telemetry_mod.setup_tracing.assert_called_once_with( + endpoint="http://new-endpoint:4318/v1/traces" + ) diff --git a/kubeflow_mcp/core/config.py b/kubeflow_mcp/core/config.py index 84c8fc3..8627eb7 100644 --- a/kubeflow_mcp/core/config.py +++ b/kubeflow_mcp/core/config.py @@ -50,6 +50,9 @@ level: INFO format: json + observability: + otel_endpoint: http://localhost:4318 + Namespace restrictions are enforced via ``~/.kf-mcp-policy.yaml`` (``policy.namespaces``), not through server config. 
""" @@ -135,6 +138,12 @@ class LoggingConfig(BaseModel): format: str | None = Field(default=None) +class ObservabilityConfig(BaseModel): + """Observability configuration.""" + + otel_endpoint: str | None = Field(default=None) + + class Config(BaseModel): """Root configuration.""" @@ -144,6 +153,7 @@ class Config(BaseModel): trainer: TrainerConfig = Field(default_factory=TrainerConfig) optimizer: OptimizerConfig = Field(default_factory=OptimizerConfig) logging: LoggingConfig = Field(default_factory=LoggingConfig) + observability: ObservabilityConfig = Field(default_factory=ObservabilityConfig) def _find_config_file() -> Path | None: @@ -229,6 +239,14 @@ def load_config(config_path: Path | None = None) -> Config: format=os.getenv("LOG_FORMAT", logging_file.get("format")), ) + observability_file = file_config.get("observability", {}) + observability = ObservabilityConfig( + otel_endpoint=os.getenv( + "OTEL_EXPORTER_OTLP_ENDPOINT", + observability_file.get("otel_endpoint"), + ) + ) + # Build client-specific configs trainer_file = file_config.get("trainer", {}) trainer = TrainerConfig( @@ -283,6 +301,7 @@ def load_config(config_path: Path | None = None) -> Config: trainer=trainer, optimizer=optimizer, logging=logging_config, + observability=observability, ) diff --git a/kubeflow_mcp/core/logging.py b/kubeflow_mcp/core/logging.py index fc3f397..a317718 100644 --- a/kubeflow_mcp/core/logging.py +++ b/kubeflow_mcp/core/logging.py @@ -49,7 +49,7 @@ def format(self, record: logging.LogRecord) -> str: if ctx is not None: log_dict["context"] = ctx - extra_keys = {"audit", "tool", "parameters", "success", "duration_ms"} + extra_keys = {"audit", "tool", "parameters", "success", "duration_ms", "tracing_enabled"} for key in extra_keys: if hasattr(record, key): log_dict[key] = getattr(record, key) diff --git a/kubeflow_mcp/core/server.py b/kubeflow_mcp/core/server.py index b6c4a9d..b041d72 100644 --- a/kubeflow_mcp/core/server.py +++ b/kubeflow_mcp/core/server.py @@ -21,6 +21,7 @@ 
def _args_preview(masked: Any, limit: int = 300) -> str:
    """Render already-masked tool arguments as a short string for a span attribute.

    Args:
        masked: Tool keyword arguments after ``mask_sensitive_data``.
        limit: Maximum number of characters kept from the rendered text.

    Returns:
        The JSON rendering truncated to ``limit`` characters, or the ``repr``
        rendering (same truncation) when the value cannot be JSON-encoded.
    """
    try:
        rendered = json.dumps(masked, default=str)
    except (TypeError, ValueError):
        # ``default=str`` only covers values: non-string dict keys raise TypeError and
        # circular references raise ValueError. A preview must never fail the tool call.
        rendered = repr(masked)
    return rendered[:limit]


def _audit_wrap(tool_func):
    """Wrap a tool function with tracing, rate limiting, circuit breaking, audit logging, and response metadata.

    Every invocation runs inside a ``tool:<name>`` span carrying ``tool.name``,
    ``kubeflow.persona``, ``correlation_id``, ``tool.args_preview``,
    ``tool.success`` and ``tool.duration_ms``.

    Args:
        tool_func: The tool callable; it is invoked with keyword arguments only.

    Returns:
        A keyword-only wrapper. It returns an error dict when the call is rate
        limited or the circuit breaker is open, otherwise the tool result passed
        through ``_inject_meta``. Exceptions raised by the tool are logged,
        recorded on the span and the breaker, and re-raised.
    """
    tracer = get_tracer("kubeflow_mcp.tools")

    @functools.wraps(tool_func)
    def wrapper(**kwargs):
        tool_name = tool_func.__name__
        cid = with_correlation_id()
        persona = get_effective_persona()
        start = time.monotonic()
        # SpanKind is None when the OpenTelemetry API is not installed.
        span_kwargs: dict[str, Any] = {}
        if SpanKind is not None:
            span_kwargs["kind"] = SpanKind.CLIENT

        with tracer.start_as_current_span(f"tool:{tool_name}", **span_kwargs) as span:

            def finish_span(success: bool) -> int:
                """Stamp outcome and elapsed milliseconds on the span; return the duration."""
                elapsed_ms = int((time.monotonic() - start) * 1000)
                span.set_attribute("tool.success", success)
                span.set_attribute("tool.duration_ms", elapsed_ms)
                return elapsed_ms

            span.set_attribute("tool.name", tool_name)
            # NOTE(review): assumes get_effective_persona() returns a str - confirm it
            # can never be None, which is not a valid span attribute value.
            span.set_attribute("kubeflow.persona", persona)
            span.set_attribute("correlation_id", cid)
            masked = mask_sensitive_data(kwargs) if kwargs else {}
            span.set_attribute("tool.args_preview", _args_preview(masked))

            if _rate_limiter is not None and not _rate_limiter.acquire():
                finish_span(False)
                logger.warning("rate_limited", extra={"tool": tool_name})
                return {
                    "error": "Rate limit exceeded. Retry after a brief pause.",
                    "error_code": ErrorCode.RATE_LIMITED,
                }

            breaker = get_breaker(tool_name)
            if not breaker.can_execute():
                finish_span(False)
                logger.warning("circuit_open", extra={"tool": tool_name})
                return {
                    "error": f"Circuit breaker open for '{tool_name}' — K8s API may be degraded. Retries automatically after recovery timeout.",
                    "error_code": ErrorCode.CIRCUIT_OPEN,
                }

            try:
                result = tool_func(**kwargs)
                # A dict result carrying "error" or "error_code" is a failed call;
                # any non-dict result counts as success.
                is_success = (
                    "error_code" not in result and "error" not in result
                    if isinstance(result, dict)
                    else True
                )
                duration_ms = finish_span(is_success)
                if is_success:
                    breaker.record_success()
                elif is_infrastructure_error(result):
                    # Only infrastructure errors count against the breaker.
                    breaker.record_failure()

                logger.info(
                    "tool_call",
                    extra={
                        "audit": True,
                        "correlation_id": cid,
                        "tool": tool_name,
                        "parameters": masked,
                        "success": is_success,
                        "duration_ms": duration_ms,
                    },
                )
                return _inject_meta(result, tool_name)
            except Exception as exc:
                duration_ms = finish_span(False)
                breaker.record_failure()
                # _StatusCode is None when the OpenTelemetry API is not installed.
                if _StatusCode is not None:
                    span.set_status(_Status(_StatusCode.ERROR, str(exc)))
                logger.error(
                    "tool_call_failed",
                    extra={
                        "audit": True,
                        "correlation_id": cid,
                        "tool": tool_name,
                        "parameters": masked,
                        "success": False,
                        "duration_ms": duration_ms,
                    },
                    exc_info=True,
                )
                raise

    return wrapper
logger = logging.getLogger(__name__)

# OpenTelemetry is an optional extra: when any of these imports fail, every
# helper below degrades to a no-op instead of raising.
try:
    from opentelemetry import trace as _otel_trace
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.resources import Resource
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor

    _OTEL_AVAILABLE = True
except ImportError:
    _OTEL_AVAILABLE = False


class _NoopSpan:
    """Span stand-in whose methods accept their arguments and do nothing."""

    def set_attribute(self, key: str, value: Any) -> None:
        return None

    def record_exception(self, exception: BaseException, **kwargs: Any) -> None:
        return None

    def set_status(self, status: Any, description: str | None = None) -> None:
        return None


class _NoopTracer:
    """Tracer stand-in returned by ``get_tracer`` when OpenTelemetry is not installed."""

    @contextmanager
    def start_as_current_span(
        self, name: str, **kwargs: Any
    ) -> Generator[_NoopSpan, None, None]:
        """Yield a fresh ``_NoopSpan``; ``name`` and ``kwargs`` are ignored."""
        yield _NoopSpan()


_NOOP_TRACER = _NoopTracer()
# Process-wide setup state; written only while holding ``_setup_lock``.
_tracing_initialized = False
_configured_endpoint: str | None = None
_setup_lock = threading.Lock()

_INVALID_ENDPOINT_MESSAGE = (
    "Invalid OpenTelemetry endpoint. Use a full HTTP(S) URL, "
    "for example: http://localhost:4318"
)


def _normalize_and_validate_endpoint(endpoint: str | None) -> str:
    """Normalize and validate an OTLP HTTP endpoint.

    Args:
        endpoint: Raw endpoint value; ``None`` and blank strings mean "disabled".

    Returns:
        ``""`` when no endpoint is configured, otherwise the stripped URL. When
        the URL has no path (or only ``/``) the path becomes ``/v1/traces``;
        any query string or fragment is kept after the path.

    Raises:
        ValueError: If the value is not an ``http``/``https`` URL with a host.
    """
    normalized_endpoint = endpoint.strip() if endpoint is not None else ""
    if not normalized_endpoint:
        return ""

    try:
        parsed = urlparse(normalized_endpoint)
    except ValueError as err:
        # urlparse itself rejects some inputs (e.g. an unbalanced IPv6 bracket);
        # report them with the same message as every other invalid endpoint.
        raise ValueError(_INVALID_ENDPOINT_MESSAGE) from err
    if parsed.scheme not in {"http", "https"} or not parsed.netloc:
        raise ValueError(_INVALID_ENDPOINT_MESSAGE)

    # Append the standard OTLP trace path when only a base URL is provided.
    # Rewrite the path component rather than the raw string, so that
    # "http://host:4318?x=1" becomes ".../v1/traces?x=1" and not "...?x=1/v1/traces".
    if parsed.path in ("", "/"):
        normalized_endpoint = parsed._replace(path="/v1/traces").geturl()
    return normalized_endpoint


def setup_tracing(endpoint: str | None, service_name: str = "kubeflow-mcp") -> bool:
    """Configure OpenTelemetry tracing.

    The first successful call wins: later calls return True without
    reconfiguring and log a warning when they name a different endpoint.

    Args:
        endpoint: OTLP HTTP endpoint; ``None`` or blank disables tracing.
        service_name: ``service.name`` resource attribute. It is applied only
            when this function creates the tracer provider, not when a
            processor is attached to an already-installed provider.

    Returns:
        True when tracing is configured, False when disabled/unavailable.

    Raises:
        ValueError: If ``endpoint`` is set but is not a valid HTTP(S) URL. This
            check runs before the check for installed OpenTelemetry packages.
    """
    global _configured_endpoint, _tracing_initialized

    normalized_endpoint = _normalize_and_validate_endpoint(endpoint)
    if not normalized_endpoint:
        return False

    if not _OTEL_AVAILABLE:
        logger.warning(
            "OpenTelemetry endpoint configured but OTel packages are not installed. "
            "Install with: pip install '.[otel]'"
        )
        return False

    with _setup_lock:
        if _tracing_initialized:
            if _configured_endpoint and _configured_endpoint != normalized_endpoint:
                logger.warning(
                    "Tracing already initialized for endpoint '%s'; ignoring new endpoint '%s'.",
                    _configured_endpoint,
                    normalized_endpoint,
                )
            return True

        exporter = OTLPSpanExporter(endpoint=normalized_endpoint)
        processor = BatchSpanProcessor(exporter)
        current_provider = _otel_trace.get_tracer_provider()

        if hasattr(current_provider, "add_span_processor"):
            # A provider that accepts processors is already installed: attach to it
            # instead of replacing the global provider.
            current_provider.add_span_processor(processor)
        else:
            resource = Resource.create({"service.name": service_name})
            provider = TracerProvider(resource=resource)
            provider.add_span_processor(processor)
            _otel_trace.set_tracer_provider(provider)
            # Shut the provider we created down at interpreter exit.
            atexit.register(provider.shutdown)

        _tracing_initialized = True
        _configured_endpoint = normalized_endpoint
        return True


def get_tracer(name: str = "kubeflow_mcp") -> Any:
    """Return OpenTelemetry tracer or no-op tracer when OTel is unavailable.

    Args:
        name: Instrumentation name passed to ``opentelemetry.trace.get_tracer``.
    """
    if not _OTEL_AVAILABLE:
        return _NOOP_TRACER
    return _otel_trace.get_tracer(name)
class _FakeSpan:
    """Span test double that records what the code under test writes to it."""

    def __init__(self) -> None:
        self.attributes: dict[str, object] = {}
        self.exceptions: list[BaseException] = []
        # Holds whatever first positional argument set_status() received.
        self.status_code: object | None = None
        self.status_description: str | None = None

    def set_attribute(self, key: str, value: object) -> None:
        self.attributes[key] = value

    def record_exception(self, exception: BaseException, **kwargs: object) -> None:
        # Accept (and ignore) keyword arguments, matching _NoopSpan.record_exception
        # in kubeflow_mcp.core.telemetry, so a keyword call does not fail in the fake.
        self.exceptions.append(exception)

    def set_status(self, code: object, description: str | None = None) -> None:
        self.status_code = code
        self.status_description = description


class _FakeTracer:
    """Tracer test double that always yields one pre-built span."""

    def __init__(self, span: _FakeSpan) -> None:
        self._span = span
        self.last_span_name: str | None = None
        self.last_span_kwargs: dict[str, object] = {}

    @contextmanager
    def start_as_current_span(self, name: str, **kwargs):
        """Remember the span name and keyword arguments, then yield the span."""
        self.last_span_name = name
        self.last_span_kwargs = kwargs
        yield self._span


class _FakeBreaker:
    """Circuit-breaker test double with a fixed verdict and outcome counters."""

    def __init__(self, can_execute: bool = True) -> None:
        self._can_execute = can_execute
        self.successes = 0
        self.failures = 0

    def can_execute(self) -> bool:
        return self._can_execute

    def record_success(self) -> None:
        self.successes += 1

    def record_failure(self) -> None:
        self.failures += 1
pytest.MonkeyPatch) -> None: + monkeypatch.setattr(telemetry, "_OTEL_AVAILABLE", False) + tracer = telemetry.get_tracer("test") + with tracer.start_as_current_span("span") as span: + span.set_attribute("k", "v") + span.record_exception(ValueError("boom")) + + +def test_setup_tracing_noop_when_otel_unavailable(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(telemetry, "_OTEL_AVAILABLE", False) + monkeypatch.setattr(telemetry, "_tracing_initialized", False) + assert telemetry.setup_tracing("http://collector:4318/v1/traces") is False + + +def test_setup_tracing_configures_provider_when_available(monkeypatch: pytest.MonkeyPatch) -> None: + calls: dict[str, object] = {} + + class _FakeProvider: + def __init__(self, resource: object) -> None: + calls["resource"] = resource + self._processors: list[object] = [] + + def add_span_processor(self, processor: object) -> None: + self._processors.append(processor) + calls["processor"] = processor + + def shutdown(self) -> None: + pass + + class _FakeResource: + @staticmethod + def create(data: dict[str, str]) -> dict[str, str]: + return data + + class _FakeBatchProcessor: + def __init__(self, exporter: object) -> None: + self.exporter = exporter + + class _FakeExporter: + def __init__(self, endpoint: str) -> None: + self.endpoint = endpoint + calls["endpoint"] = endpoint + + fake_trace = SimpleNamespace() + + def _set_tracer_provider(provider: object) -> None: + calls["provider"] = provider + + def _get_tracer(name: str) -> str: + return f"tracer:{name}" + + def _get_tracer_provider() -> object: + return object() + + fake_trace.set_tracer_provider = _set_tracer_provider + fake_trace.get_tracer = _get_tracer + fake_trace.get_tracer_provider = _get_tracer_provider + + monkeypatch.setattr(telemetry, "_OTEL_AVAILABLE", True) + monkeypatch.setattr(telemetry, "_tracing_initialized", False) + monkeypatch.setattr(telemetry, "_configured_endpoint", None, raising=False) + monkeypatch.setattr(telemetry, "Resource", 
_FakeResource, raising=False) + monkeypatch.setattr(telemetry, "TracerProvider", _FakeProvider, raising=False) + monkeypatch.setattr(telemetry, "BatchSpanProcessor", _FakeBatchProcessor, raising=False) + monkeypatch.setattr(telemetry, "OTLPSpanExporter", _FakeExporter, raising=False) + monkeypatch.setattr(telemetry, "_otel_trace", fake_trace, raising=False) + + assert telemetry.setup_tracing("http://collector:4318/v1/traces", "kubeflow-mcp") is True + assert calls["endpoint"] == "http://collector:4318/v1/traces" + assert calls["resource"] == {"service.name": "kubeflow-mcp"} + assert telemetry.get_tracer("unit") == "tracer:unit" + + +def test_setup_tracing_treats_whitespace_endpoint_as_disabled( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(telemetry, "_OTEL_AVAILABLE", True) + monkeypatch.setattr(telemetry, "_tracing_initialized", False) + monkeypatch.setattr(telemetry, "_configured_endpoint", None, raising=False) + assert telemetry.setup_tracing(" ") is False + + +def test_setup_tracing_rejects_invalid_endpoint() -> None: + with pytest.raises(ValueError, match="Invalid OpenTelemetry endpoint"): + telemetry.setup_tracing("localhost:4318/v1/traces") + + +def test_setup_tracing_reuses_existing_provider(monkeypatch: pytest.MonkeyPatch) -> None: + calls: dict[str, object] = {} + + class _ExistingProvider: + def __init__(self) -> None: + self.processors: list[object] = [] + + def add_span_processor(self, processor: object) -> None: + self.processors.append(processor) + calls["processor"] = processor + + class _FakeBatchProcessor: + def __init__(self, exporter: object) -> None: + self.exporter = exporter + + class _FakeExporter: + def __init__(self, endpoint: str) -> None: + calls["endpoint"] = endpoint + + provider = _ExistingProvider() + fake_trace = SimpleNamespace() + fake_trace.get_tracer_provider = lambda: provider + fake_trace.get_tracer = lambda _name: "tracer" + fake_trace.set_tracer_provider = lambda _provider: calls.update({"set_called": 
True}) + + monkeypatch.setattr(telemetry, "_OTEL_AVAILABLE", True) + monkeypatch.setattr(telemetry, "_tracing_initialized", False) + monkeypatch.setattr(telemetry, "_configured_endpoint", None, raising=False) + monkeypatch.setattr(telemetry, "BatchSpanProcessor", _FakeBatchProcessor, raising=False) + monkeypatch.setattr(telemetry, "OTLPSpanExporter", _FakeExporter, raising=False) + monkeypatch.setattr(telemetry, "_otel_trace", fake_trace, raising=False) + + assert telemetry.setup_tracing("http://collector:4318/v1/traces", "kubeflow-mcp") is True + assert calls["endpoint"] == "http://collector:4318/v1/traces" + assert "processor" in calls + assert "set_called" not in calls + + +def test_audit_wrap_sets_span_attributes_on_success(monkeypatch: pytest.MonkeyPatch) -> None: + import kubeflow_mcp.core.server as server_mod + + span = _FakeSpan() + tracer = _FakeTracer(span) + breaker = _FakeBreaker() + monkeypatch.setattr(server_mod, "_rate_limiter", None) + monkeypatch.setattr(server_mod, "with_correlation_id", lambda: "cid-123") + monkeypatch.setattr(server_mod, "get_effective_persona", lambda: "ml-engineer") + monkeypatch.setattr(server_mod, "get_tracer", lambda _name: tracer) + monkeypatch.setattr(server_mod, "get_breaker", lambda _tool: breaker) + + def sample_tool(**_kwargs): + return {"ok": True} + + wrapped = _audit_wrap(sample_tool) + wrapped() + + assert span.attributes["tool.name"] == "sample_tool" + assert tracer.last_span_name == "tool:sample_tool" + assert span.attributes["correlation_id"] == "cid-123" + assert span.attributes["kubeflow.persona"] == "ml-engineer" + assert span.attributes["tool.success"] is True + assert "tool.duration_ms" in span.attributes + assert span.attributes["tool.args_preview"] == "{}" + assert breaker.successes == 1 + # Verify SpanKind.CLIENT is passed when OTel is available + from kubeflow_mcp.core.server import SpanKind as _SpanKind + + if _SpanKind is not None: + assert tracer.last_span_kwargs.get("kind") == _SpanKind.CLIENT + + 
+def test_audit_wrap_records_exception_on_failure(monkeypatch: pytest.MonkeyPatch) -> None: + import kubeflow_mcp.core.server as server_mod + + span = _FakeSpan() + breaker = _FakeBreaker() + monkeypatch.setattr(server_mod, "_rate_limiter", None) + monkeypatch.setattr(server_mod, "with_correlation_id", lambda: "cid-123") + monkeypatch.setattr(server_mod, "get_effective_persona", lambda: "readonly") + monkeypatch.setattr(server_mod, "get_tracer", lambda _name: _FakeTracer(span)) + monkeypatch.setattr(server_mod, "get_breaker", lambda _tool: breaker) + + def failing_tool(**_kwargs): + raise RuntimeError("boom") + + wrapped = _audit_wrap(failing_tool) + with pytest.raises(RuntimeError, match="boom"): + wrapped() + + assert span.attributes["tool.success"] is False + assert "tool.duration_ms" in span.attributes + assert breaker.failures == 1 + # record_exception is no longer called manually; start_as_current_span + # auto-records it. Instead, set_status(ERROR) must be called. + assert len(span.exceptions) == 0 + from kubeflow_mcp.core.server import _StatusCode + + if _StatusCode is not None: + status = span.status_code + assert status.status_code == _StatusCode.ERROR + assert status.description == "boom" + + +def test_audit_wrap_circuit_breaker_open(monkeypatch: pytest.MonkeyPatch) -> None: + """Circuit-open path sets tool.success=False on the span before the early return.""" + import kubeflow_mcp.core.server as server_mod + + span = _FakeSpan() + tracer = _FakeTracer(span) + breaker = _FakeBreaker(can_execute=False) + monkeypatch.setattr(server_mod, "_rate_limiter", None) + monkeypatch.setattr(server_mod, "with_correlation_id", lambda: "cid-456") + monkeypatch.setattr(server_mod, "get_effective_persona", lambda: "readonly") + monkeypatch.setattr(server_mod, "get_tracer", lambda _name: tracer) + monkeypatch.setattr(server_mod, "get_breaker", lambda _tool: breaker) + + def sample_tool(**_kwargs): + return {"ok": True} + + wrapped = _audit_wrap(sample_tool) + result = 
wrapped() + + assert span.attributes["tool.success"] is False + assert "tool.duration_ms" in span.attributes + assert "error" in result + assert result["error_code"] == "CIRCUIT_OPEN" diff --git a/uv.lock b/uv.lock index 2a025fb..d0bd0a5 100644 --- a/uv.lock +++ b/uv.lock @@ -1,13 +1,16 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'emscripten'", "python_full_version >= '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'emscripten'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'emscripten'", + "python_full_version == '3.12.*' and sys_platform == 'emscripten'", + "python_full_version == '3.13.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.12.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", "python_full_version == '3.11.*' and sys_platform == 'win32'", "python_full_version == '3.11.*' and sys_platform == 'emscripten'", "python_full_version == '3.11.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", @@ -852,9 +855,12 @@ resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'emscripten'", "python_full_version >= '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", - 
"python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'emscripten'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'emscripten'", + "python_full_version == '3.12.*' and sys_platform == 'emscripten'", + "python_full_version == '3.13.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.12.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", "python_full_version == '3.11.*' and sys_platform == 'win32'", "python_full_version == '3.11.*' and sys_platform == 'emscripten'", "python_full_version == '3.11.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", @@ -1518,6 +1524,10 @@ docs = [ hub = [ { name = "kubeflow", extra = ["hub"] }, ] +otel = [ + { name = "opentelemetry-exporter-otlp-proto-http" }, + { name = "opentelemetry-sdk" }, +] spark = [ { name = "kubeflow", extra = ["spark"] }, ] @@ -1533,6 +1543,8 @@ requires-dist = [ { name = "kubeflow", extras = ["hub"], marker = "extra == 'hub'", specifier = ">=0.4.0" }, { name = "kubeflow", extras = ["spark"], marker = "extra == 'all'", specifier = ">=0.4.0" }, { name = "kubeflow", extras = ["spark"], marker = "extra == 'spark'", specifier = ">=0.4.0" }, + { name = "opentelemetry-exporter-otlp-proto-http", marker = "extra == 'otel'", specifier = ">=1.25.0" }, + { name = "opentelemetry-sdk", marker = "extra == 'otel'", specifier = ">=1.25.0" }, { name = "pip-audit", marker = "extra == 'dev'", specifier = ">=2.7" }, { name = "pydantic", specifier = ">=2.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" }, @@ -1544,7 +1556,7 @@ requires-dist = [ { name = "sphinx-copybutton", marker = "extra == 'docs'", specifier = ">=0.5" }, { name = 
"sphinx-design", marker = "extra == 'docs'", specifier = ">=0.5" }, ] -provides-extras = ["hub", "spark", "all", "dev", "docs"] +provides-extras = ["hub", "spark", "all", "dev", "docs", "otel"] [[package]] name = "kubeflow-spark-api" @@ -2040,9 +2052,12 @@ resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'emscripten'", "python_full_version >= '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'emscripten'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'emscripten'", + "python_full_version == '3.12.*' and sys_platform == 'emscripten'", + "python_full_version == '3.13.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.12.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", "python_full_version == '3.11.*' and sys_platform == 'win32'", "python_full_version == '3.11.*' and sys_platform == 'emscripten'", "python_full_version == '3.11.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", @@ -2145,15 +2160,84 @@ wheels = [ [[package]] name = "opentelemetry-api" -version = "1.41.0" +version = "1.41.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "importlib-metadata" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/47/8e/3778a7e87801d994869a9396b9fc2a289e5f9be91ff54a27d41eace494b0/opentelemetry_api-1.41.0.tar.gz", hash = 
"sha256:9421d911326ec12dee8bc933f7839090cad7a3f13fcfb0f9e82f8174dc003c09", size = 71416, upload-time = "2026-04-09T14:38:34.544Z" } +sdist = { url = "https://files.pythonhosted.org/packages/fa/fc/b7564cbef36601aef0d6c9bc01f7badb64be8e862c2e1c3c5c3b43b53e4f/opentelemetry_api-1.41.1.tar.gz", hash = "sha256:0ad1814d73b875f84494387dae86ce0b12c68556331ce6ce8fe789197c949621", size = 71416, upload-time = "2026-04-24T13:15:38.262Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/29/59/3e7118ed140f76b0982ba4321bdaed1997a0473f9720de2d10788a577033/opentelemetry_api-1.41.1-py3-none-any.whl", hash = "sha256:a22df900e75c76dc08440710e51f52f1aa6b451b429298896023e60db5b3139f", size = 69007, upload-time = "2026-04-24T13:15:15.662Z" }, +] + +[[package]] +name = "opentelemetry-exporter-otlp-proto-common" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-proto" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ae/fa/f9e3bd3c4d692b3ce9a2880a167d1f79681a1bea11f00d5bf76adc03e6ea/opentelemetry_exporter_otlp_proto_common-1.41.1.tar.gz", hash = "sha256:0e253156ea9c36b0bd3d2440c5c9ba7dd1f3fb64ba7a08fc85fbac536b56e1fb", size = 20409, upload-time = "2026-04-24T13:15:40.924Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/29/48/bce76d3ea772b609757e9bc844e02ab408a6446609bf74fb562062ba6b71/opentelemetry_exporter_otlp_proto_common-1.41.1-py3-none-any.whl", hash = "sha256:10da74dad6a49344b9b7b21b6182e3060373a235fde1528616d5f01f92e66aa9", size = 18366, upload-time = "2026-04-24T13:15:18.917Z" }, +] + +[[package]] +name = "opentelemetry-exporter-otlp-proto-http" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "googleapis-common-protos" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "requests" }, + { name = 
"typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/33/5b/9d3c7f70cca10136ba82a81e738dee626c8e7fc61c6887ea9a58bf34c606/opentelemetry_exporter_otlp_proto_http-1.41.1.tar.gz", hash = "sha256:4747a9604c8550ab38c6fd6180e2fcb80de3267060bef2c306bad3cb443302bc", size = 24139, upload-time = "2026-04-24T13:15:42.977Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ba/4d/ef07ff2fc630849f2080ae0ae73a61f67257905b7ac79066640bfa0c5739/opentelemetry_exporter_otlp_proto_http-1.41.1-py3-none-any.whl", hash = "sha256:1a21e8f49c7a946d935551e90947d6c3eb39236723c6624401da0f33d68edcb4", size = 22673, upload-time = "2026-04-24T13:15:21.313Z" }, +] + +[[package]] +name = "opentelemetry-proto" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "protobuf" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/99/e8/633c6d8a9c8840338b105907e55c32d3da1983abab5e52f899f72a82c3d1/opentelemetry_proto-1.41.1.tar.gz", hash = "sha256:4b9d2eb631237ea43b80e16c073af438554e32bc7e9e3f8ca4a9582f900020e5", size = 45670, upload-time = "2026-04-24T13:15:49.768Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e4/1e/5cd77035e3e82070e2265a63a760f715aacd3cb16dddc7efee913f297fcc/opentelemetry_proto-1.41.1-py3-none-any.whl", hash = "sha256:0496713b804d127a4147e32849fbaf5683fac8ee98550e8e7679cd706c289720", size = 72076, upload-time = "2026-04-24T13:15:32.542Z" }, +] + +[[package]] +name = "opentelemetry-sdk" +version = "1.41.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/58/d0/54ee30dab82fb0acda23d144502771ff76ef8728459c83c3e89ef9fb1825/opentelemetry_sdk-1.41.1.tar.gz", hash = "sha256:724b615e1215b5aeacda0abb8a6a8922c9a1853068948bd0bd225a56d0c792e6", size = 230180, upload-time = 
"2026-04-24T13:15:50.991Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b4/e7/a1420b698aad018e1cf60fdbaaccbe49021fb415e2a0d81c242f4c518f54/opentelemetry_sdk-1.41.1-py3-none-any.whl", hash = "sha256:edee379c126c1bce952b0c812b48fe8ff35b30df0eecf17e98afa4d598b7d85d", size = 180213, upload-time = "2026-04-24T13:15:33.767Z" }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/9e/de/911ac9e309052aca1b20b2d5549d3db45d1011e1a610e552c6ccdd1b64f8/opentelemetry_semantic_conventions-0.62b1.tar.gz", hash = "sha256:c5cc6e04a7f8c7cdd30be2ed81499fa4e75bfbd52c9cb70d40af1f9cd3619802", size = 145750, upload-time = "2026-04-24T13:15:52.236Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/58/ee/99ab786653b3bda9c37ade7e24a7b607a1b1f696063172768417539d876d/opentelemetry_api-1.41.0-py3-none-any.whl", hash = "sha256:0e77c806e6a89c9e4f8d372034622f3e1418a11bdbe1c80a50b3d3397ad0fa4f", size = 69007, upload-time = "2026-04-09T14:38:11.833Z" }, + { url = "https://files.pythonhosted.org/packages/eb/a6/83dc2ab6fa397ee66fba04fe2e74bdf7be3b3870005359ceb7689103c058/opentelemetry_semantic_conventions-0.62b1-py3-none-any.whl", hash = "sha256:cf506938103d331fbb78eded0d9788095f7fd59016f2bda813c3324e5a74a93c", size = 231620, upload-time = "2026-04-24T13:15:35.454Z" }, ] [[package]] @@ -2246,9 +2330,12 @@ resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'emscripten'", "python_full_version >= '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'emscripten'", - 
"python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'emscripten'", + "python_full_version == '3.12.*' and sys_platform == 'emscripten'", + "python_full_version == '3.13.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.12.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", "python_full_version == '3.11.*' and sys_platform == 'win32'", "python_full_version == '3.11.*' and sys_platform == 'emscripten'", "python_full_version == '3.11.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", @@ -3387,9 +3474,12 @@ resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'emscripten'", "python_full_version >= '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'emscripten'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'emscripten'", + "python_full_version == '3.12.*' and sys_platform == 'emscripten'", + "python_full_version == '3.13.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.12.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", ] dependencies = [ { name = "alabaster", marker = "python_full_version >= '3.12'" }, @@ -3466,9 +3556,12 @@ 
resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'emscripten'", "python_full_version >= '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'win32'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform == 'emscripten'", - "python_full_version >= '3.12' and python_full_version < '3.14' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'win32'", + "python_full_version == '3.12.*' and sys_platform == 'win32'", + "python_full_version == '3.13.*' and sys_platform == 'emscripten'", + "python_full_version == '3.12.*' and sys_platform == 'emscripten'", + "python_full_version == '3.13.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", + "python_full_version == '3.12.*' and sys_platform != 'emscripten' and sys_platform != 'win32'", "python_full_version == '3.11.*' and sys_platform == 'win32'", "python_full_version == '3.11.*' and sys_platform == 'emscripten'", "python_full_version == '3.11.*' and sys_platform != 'emscripten' and sys_platform != 'win32'",