diff --git a/nemo_retriever/pyproject.toml b/nemo_retriever/pyproject.toml index a768ae7ad4..e20141fff4 100644 --- a/nemo_retriever/pyproject.toml +++ b/nemo_retriever/pyproject.toml @@ -66,6 +66,10 @@ dependencies = [ "langchain-nvidia-ai-endpoints>=0.3.0", # Default VDB solution "lancedb", + # OpenTelemetry — API only, ~70 KB pure-Python. When no provider is + # registered every instrumentation point is a near-zero-cost no-op. + # The SDK + exporters live behind the ``[otel]`` extra below. + "opentelemetry-api>=1.40.0", ] [project.optional-dependencies] @@ -142,6 +146,22 @@ benchmarks = [ llm = [ "litellm>=1.40.0", ] + +# ── OpenTelemetry SDK + exporters + auto-instrumentations ─────────────────── +# Install this when you want NeMo Retriever to *emit* traces and metrics +# rather than just defer to a host application's provider. Service mode +# pulls these in by default; library users only need them for one-line +# observability bootstrapping (``OTELConfig`` + ``configure``). +otel = [ + "opentelemetry-api>=1.40.0", + "opentelemetry-sdk>=1.40.0", + "opentelemetry-exporter-otlp>=1.40.0", + "opentelemetry-instrumentation-fastapi>=0.55b0", + "opentelemetry-instrumentation-httpx>=0.55b0", + "opentelemetry-instrumentation-requests>=0.55b0", + "opentelemetry-instrumentation-sqlite3>=0.55b0", + "opentelemetry-instrumentation-logging>=0.55b0", +] dev = [ "build>=1.2.2", "pytest>=8.0.2", @@ -149,7 +169,7 @@ dev = [ # ── Convenience: full install ───────────────────────────────────────────────── all = [ - "nemo_retriever[local,multimedia,tabular,benchmarks,llm]", + "nemo_retriever[local,multimedia,tabular,benchmarks,llm,otel]", ] [project.scripts] diff --git a/nemo_retriever/src/nemo_retriever/graph/executor.py b/nemo_retriever/src/nemo_retriever/graph/executor.py index 14a323ab08..937c46d88d 100644 --- a/nemo_retriever/src/nemo_retriever/graph/executor.py +++ b/nemo_retriever/src/nemo_retriever/graph/executor.py @@ -15,6 +15,9 @@ from nemo_retriever.graph.gpu_operator import GPUOperator from nemo_retriever.graph.pipeline_graph import Graph, Node from nemo_retriever.graph.operator_resolution import resolve_graph +from nemo_retriever.observability.propagate import inject_current_context +from nemo_retriever.observability.ray_integration import RayOperatorSpanWrapper, collect_otel_env +from nemo_retriever.observability.spans import operator_span from nemo_retriever.utils.hf_cache import collect_hf_runtime_env from nemo_retriever.utils.input_files import ( _is_explicit_glob_path, @@ -131,12 +134,26 @@ def ingest(self, data: Any, **kwargs: Any) -> Any: if self._show_progress and tqdm is not None: pbar = tqdm(operators, desc="Pipeline stages", unit="stage") - for name, op in pbar: + for index, (name, op) in enumerate(pbar): pbar.set_postfix_str(name) - df = op.run(df) + with operator_span( + name, + run_mode="inprocess", + operator_class=type(op).__name__, + operator_index=index, + batch_size=len(df) if df is not None else None, + ): + df = op.run(df) else: - for _name, op in operators: - df = op.run(df) + for index, (name, op) in enumerate(operators): + with operator_span( + name, + run_mode="inprocess", + operator_class=type(op).__name__, + operator_index=index, + batch_size=len(df) if df is not None else None, + ): + df = op.run(df) return df @@ -249,6 +266,7 @@ def ingest(self, data: Any, **kwargs: Any) -> Any: } ray_env_vars.update(collect_hf_runtime_env()) ray_env_vars.update(collect_remote_auth_runtime_env()) + ray_env_vars.update(collect_otel_env()) os.environ["HF_HUB_OFFLINE"] = ray_env_vars["HF_HUB_OFFLINE"] runtime_env = {"env_vars": ray_env_vars} ray.init( @@ -272,8 +290,13 @@ def ingest(self, data: Any, **kwargs: Any) -> Any: ds = rd.read_binary_files(input_paths, include_paths=True) except FileNotFoundError as exc: raise_input_path_not_found(input_paths or [], exc) + + # Snapshot the driver's pipeline_span so actor operator spans can + # attach as children rather than become orphan roots. + otel_parent_carrier = inject_current_context() + nodes = self._linearize(resolved_graph) - for node in nodes: + for node_index, node in enumerate(nodes): overrides = dict(self._node_overrides.get(node.name, {})) target_num_rows_per_block = overrides.pop("target_num_rows_per_block", None) batch_size = overrides.pop("batch_size", self._default_batch_size) @@ -366,17 +389,20 @@ def ingest(self, data: Any, **kwargs: Any) -> Any: elif target_num_rows_per_block is not None and int(target_num_rows_per_block) > 0: ds = ds.repartition(target_num_rows_per_block=int(target_num_rows_per_block)) - # Pass the operator class directly to map_batches with - # fn_constructor_kwargs for deferred construction on workers. - # AbstractOperator.__call__ delegates to run(), so each stage - # executes the full preprocess -> process -> postprocess chain. + wrapped_kwargs = { + "_otel_operator_class": node.operator_class, + "_otel_node_name": node.name, + "_otel_node_index": node_index, + "_otel_parent_carrier": otel_parent_carrier or None, + "_otel_operator_kwargs": dict(node.operator_kwargs or {}), + } ds = ds.map_batches( - node.operator_class, + RayOperatorSpanWrapper, batch_size=batch_size, batch_format=batch_format, num_cpus=num_cpus, num_gpus=num_gpus, - fn_constructor_kwargs=node.operator_kwargs, + fn_constructor_kwargs=wrapped_kwargs, **overrides, ) diff --git a/nemo_retriever/src/nemo_retriever/graph_ingestor.py b/nemo_retriever/src/nemo_retriever/graph_ingestor.py index a8f340af62..214fdbd966 100644 --- a/nemo_retriever/src/nemo_retriever/graph_ingestor.py +++ b/nemo_retriever/src/nemo_retriever/graph_ingestor.py @@ -33,6 +33,9 @@ from nemo_retriever.graph import InprocessExecutor, RayDataExecutor from nemo_retriever.graph.ingestor_runtime import batch_tuning_to_node_overrides, build_graph from nemo_retriever.ingestor import ingestor +from nemo_retriever.observability import OTELConfig +from nemo_retriever.observability.configure import apply_otel_env_defaults, configure as _configure_otel +from nemo_retriever.observability.spans import pipeline_span from nemo_retriever.params import ( ASRParams, AudioChunkParams, @@ -177,6 +180,11 @@ class GraphIngestor(ingestor): ``"raise"`` raises when explicitly configured remote NIM stages report row-level errors. ``"collect"`` returns partial results with the stage error payloads preserved. + otel + Optional :class:`~nemo_retriever.observability.OTELConfig` for + one-shot SDK setup. ``None`` (default) means defer to whatever + provider the host application has already registered globally — + no spans are emitted unless the host has wired up OTEL. """ RUN_MODE = "graph" @@ -196,6 +204,7 @@ def __init__( node_overrides: Optional[Dict[str, Dict[str, Any]]] = None, show_progress: bool = True, error_policy: str = "raise", + otel: Optional[OTELConfig] = None, ) -> None: super().__init__(documents=documents) if run_mode not in {"batch", "inprocess"}: @@ -214,6 +223,18 @@ def __init__( self._show_progress = show_progress self._error_policy = error_policy self._rd_dataset: Any = None + # Seed env defaults BEFORE _configure_otel (the requests instrumentor + # reads OTEL_PYTHON_REQUESTS_EXCLUDED_URLS at instrument-time) and + # BEFORE the executor calls collect_otel_env (so Ray actors inherit + # the same service name + exclude list via runtime_env). + apply_otel_env_defaults(default_service_name=f"nemo-retriever-{run_mode}") + if otel is not None: + # Register the shutdown callback at atexit so library-mode + # callers flush the BatchSpanProcessor queue without needing + # to invoke it manually. Idempotent on the SDK side. + import atexit + + atexit.register(_configure_otel(otel)) # Pipeline configuration accumulated by fluent methods self._extraction_mode: str = "pdf" @@ -436,6 +457,15 @@ def ingest(self, params: Any = None, **kwargs: Any) -> Any: post_extract_order = tuple(s for s in self._stage_order if s != "extract") + with pipeline_span( + self._run_mode, + extraction_mode=self._extraction_mode, + stages=post_extract_order, + ): + return self._run_pipeline(post_extract_order) + + def _run_pipeline(self, post_extract_order: tuple[str, ...]) -> Any: + """Build the executor and run the pipeline.""" if self._run_mode == "batch": import ray diff --git a/nemo_retriever/src/nemo_retriever/ingestor.py b/nemo_retriever/src/nemo_retriever/ingestor.py index c80c767c79..a6cdb187e7 100644 --- a/nemo_retriever/src/nemo_retriever/ingestor.py +++ b/nemo_retriever/src/nemo_retriever/ingestor.py @@ -18,6 +18,7 @@ from io import BytesIO from typing import Any, Dict, List, Optional, Tuple, Union +from nemo_retriever.observability import OTELConfig from nemo_retriever.params import CaptionParams from nemo_retriever.params import DedupParams from nemo_retriever.params import EmbedParams @@ -44,10 +45,17 @@ def create_ingestor( *, run_mode: RunMode = "inprocess", params: IngestorCreateParams | None = None, + otel: OTELConfig | None = None, **kwargs: Any, ) -> "Ingestor": """ Graph-only ingestion factory. + + Pass ``otel=OTELConfig(...)`` to install global OpenTelemetry providers + when the host application has not already configured them. When + ``otel`` is ``None`` (default), instrumentation defers to whatever + provider is already registered — meaning embedded library use inside + an OTEL-instrumented host needs no extra wiring. """ merged = _merge_params(params, kwargs) if isinstance(merged, IngestorCreateParams): @@ -80,6 +88,7 @@ def create_ingestor( debug=parsed.debug, allow_no_gpu=parsed.allow_no_gpu, error_policy=parsed.error_policy, + otel=otel, ) diff --git a/nemo_retriever/src/nemo_retriever/nim/probe.py b/nemo_retriever/src/nemo_retriever/nim/probe.py index 2b3a2226a3..328f368405 100644 --- a/nemo_retriever/src/nemo_retriever/nim/probe.py +++ b/nemo_retriever/src/nemo_retriever/nim/probe.py @@ -8,13 +8,61 @@ import logging import time import urllib.parse +from contextlib import contextmanager from typing import Optional import requests +from opentelemetry import trace as _trace_api + +from nemo_retriever.observability import attributes as _otel_attrs +from nemo_retriever.observability.tracer import get_tracer + logger = logging.getLogger(__name__) +@contextmanager +def _probe_span(name: str, kind: str, url: str): + """Open a span around one NIM probe. + + Status / detail are filled in *after* the probe finishes (the caller + sets ``probe_status`` on the span before exit). When no provider is + registered the span is a no-op. + """ + tracer = get_tracer() + attrs = { + _otel_attrs.NIM_KIND: kind, + _otel_attrs.NIM_NAME: name, + _otel_attrs.NIM_URL: url, + } + with tracer.start_as_current_span( + "nim.probe", + attributes=attrs, + record_exception=False, + set_status_on_exception=False, + ) as span: + yield span + + +def _finalize_probe_span(span, status: str, error: str | None = None) -> None: + """Tag *span* with its terminal ``probe_status``, optionally with an error.""" + span.set_attribute(_otel_attrs.NIM_PROBE_STATUS, status) + if error is not None: + span.set_status(_trace_api.Status(_trace_api.StatusCode.ERROR, error)) + + +def _kind_from_name(name: str) -> str: + """Normalize the canonical NIM role passed via ``name`` to a snake_case kind label. + + Callers pass roles like ``"ocr"``, ``"page-elements"``, ``"graphic-elements"``, + ``"table-structure"``, ``"embed"`` — the prefix arg holds the actor class + name, not the role, so we cannot derive kind from it. + """ + if not name: + return "unknown" + return name.replace("-", "_").lower() + + @dataclasses.dataclass(frozen=True) class ProbeResult: """Outcome of a single NIM endpoint probe.""" @@ -67,63 +115,70 @@ def probe_endpoint( """ parsed = urllib.parse.urlparse(url) health_url = f"{parsed.scheme}://{parsed.netloc}/v1/health/ready" + nim_kind = _kind_from_name(name) # Step 1: unauthenticated health check - try: - t0 = time.perf_counter() - resp = requests.get(health_url, timeout=timeout) - elapsed_ms = (time.perf_counter() - t0) * 1000 - logger.info( - "%s: %s endpoint %s responded %d in %.0fms", - prefix, - name, - health_url, - resp.status_code, - elapsed_ms, - ) - if resp.ok: - _probe_results.append(ProbeResult(url=health_url, name=name, prefix=prefix, status="ok")) - return - except requests.ConnectionError: - logger.warning( - "%s: %s endpoint %s is UNREACHABLE (connection refused). " - "Processing will stall until this endpoint becomes available.", - prefix, - name, - health_url, - ) - _probe_results.append( - ProbeResult( - url=health_url, - name=name, - prefix=prefix, - status="unreachable", - detail=f"{name} endpoint {health_url} is UNREACHABLE (connection refused). " + with _probe_span(name=name, kind=nim_kind, url=health_url) as span: + try: + t0 = time.perf_counter() + resp = requests.get(health_url, timeout=timeout) + elapsed_ms = (time.perf_counter() - t0) * 1000 + logger.info( + "%s: %s endpoint %s responded %d in %.0fms", + prefix, + name, + health_url, + resp.status_code, + elapsed_ms, + ) + span.set_attribute("http.status_code", int(resp.status_code)) + if resp.ok: + _finalize_probe_span(span, "ok") + _probe_results.append(ProbeResult(url=health_url, name=name, prefix=prefix, status="ok")) + return + except requests.ConnectionError: + logger.warning( + "%s: %s endpoint %s is UNREACHABLE (connection refused). " "Processing will stall until this endpoint becomes available.", + prefix, + name, + health_url, ) - ) - return - except requests.Timeout: - logger.warning( - "%s: %s endpoint %s timed out after %.1fs. " "The endpoint may be overloaded or not ready.", - prefix, - name, - health_url, - timeout, - ) - _probe_results.append( - ProbeResult( - url=health_url, - name=name, - prefix=prefix, - status="timeout", - detail=f"{name} endpoint {health_url} timed out after {timeout:.1f}s. " - "The endpoint may be overloaded or not ready.", + _finalize_probe_span(span, "unreachable", "connection refused") + _probe_results.append( + ProbeResult( + url=health_url, + name=name, + prefix=prefix, + status="unreachable", + detail=f"{name} endpoint {health_url} is UNREACHABLE (connection refused). " + "Processing will stall until this endpoint becomes available.", + ) ) - ) - return - except Exception as exc: - logger.debug("%s: %s endpoint probe %s failed: %s", prefix, name, health_url, exc) + return + except requests.Timeout: + logger.warning( + "%s: %s endpoint %s timed out after %.1fs. " "The endpoint may be overloaded or not ready.", + prefix, + name, + health_url, + timeout, + ) + _finalize_probe_span(span, "timeout", f"timeout after {timeout:.1f}s") + _probe_results.append( + ProbeResult( + url=health_url, + name=name, + prefix=prefix, + status="timeout", + detail=f"{name} endpoint {health_url} timed out after {timeout:.1f}s. " + "The endpoint may be overloaded or not ready.", + ) + ) + return + except Exception as exc: + _finalize_probe_span(span, "error", type(exc).__name__) + logger.debug("%s: %s endpoint probe %s failed: %s", prefix, name, health_url, exc) # Step 2: authenticated probe of the actual endpoint URL. # Only reached when the health path returned non-2xx (e.g. 404 on @@ -141,57 +196,64 @@ def probe_endpoint( target = post_url or url body = post_body if post_body is not None else {} headers = {"Authorization": f"Bearer {api_key}"} - try: - t0 = time.perf_counter() - resp = requests.post(target, headers=headers, json=body, timeout=timeout) - elapsed_ms = (time.perf_counter() - t0) * 1000 - logger.info( - "%s: %s endpoint %s responded %d in %.0fms", - prefix, - name, - target, - resp.status_code, - elapsed_ms, - ) - if resp.status_code in (401, 403): - raise RuntimeError( - f"{prefix}: authentication failed for {name} endpoint {target} " - f"(HTTP {resp.status_code}) — verify the API key is valid." + with _probe_span(name=name, kind=nim_kind, url=target) as span: + try: + t0 = time.perf_counter() + resp = requests.post(target, headers=headers, json=body, timeout=timeout) + elapsed_ms = (time.perf_counter() - t0) * 1000 + logger.info( + "%s: %s endpoint %s responded %d in %.0fms", + prefix, + name, + target, + resp.status_code, + elapsed_ms, + ) + span.set_attribute("http.status_code", int(resp.status_code)) + if resp.status_code in (401, 403): + _finalize_probe_span(span, "auth_failed", f"http {resp.status_code}") + raise RuntimeError( + f"{prefix}: authentication failed for {name} endpoint {target} " + f"(HTTP {resp.status_code}) — verify the API key is valid." + ) + _finalize_probe_span(span, "ok") + except RuntimeError: + raise + except requests.ConnectionError: + logger.warning( + "%s: %s endpoint %s is UNREACHABLE (connection refused).", + prefix, + name, + target, + ) + _finalize_probe_span(span, "unreachable", "connection refused") + _probe_results.append( + ProbeResult( + url=target, + name=name, + prefix=prefix, + status="unreachable", + detail=f"{name} endpoint {target} is UNREACHABLE (connection refused).", + ) ) - except RuntimeError: - raise - except requests.ConnectionError: - logger.warning( - "%s: %s endpoint %s is UNREACHABLE (connection refused).", - prefix, - name, - target, - ) - _probe_results.append( - ProbeResult( - url=target, - name=name, - prefix=prefix, - status="unreachable", - detail=f"{name} endpoint {target} is UNREACHABLE (connection refused).", + except requests.Timeout: + logger.warning( + "%s: %s endpoint %s timed out after %.1fs.", + prefix, + name, + target, + timeout, ) - ) - except requests.Timeout: - logger.warning( - "%s: %s endpoint %s timed out after %.1fs.", - prefix, - name, - target, - timeout, - ) - _probe_results.append( - ProbeResult( - url=target, - name=name, - prefix=prefix, - status="timeout", - detail=f"{name} endpoint {target} timed out after {timeout:.1f}s.", + _finalize_probe_span(span, "timeout", f"timeout after {timeout:.1f}s") + _probe_results.append( + ProbeResult( + url=target, + name=name, + prefix=prefix, + status="timeout", + detail=f"{name} endpoint {target} timed out after {timeout:.1f}s.", + ) ) - ) - except Exception as exc: - logger.debug("%s: %s endpoint probe %s failed: %s", prefix, name, target, exc) + except Exception as exc: + _finalize_probe_span(span, "error", type(exc).__name__) + logger.debug("%s: %s endpoint probe %s failed: %s", prefix, name, target, exc) diff --git a/nemo_retriever/src/nemo_retriever/observability/__init__.py b/nemo_retriever/src/nemo_retriever/observability/__init__.py new file mode 100644 index 0000000000..1bdce59afc --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/observability/__init__.py @@ -0,0 +1,82 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""OpenTelemetry observability primitives for NeMo Retriever. + +The library exposes a thin, API-only wrapper over OpenTelemetry so it +can be imported safely whether or not the user has installed the +``[otel]`` extra. When no ``TracerProvider`` / ``MeterProvider`` is +registered globally — the default — every instrumentation point is a +near-zero-cost no-op. + +Two usage patterns: + +* **Embedded library mode**: the host application configures + OpenTelemetry once at startup; NeMo Retriever's spans and metrics + inherit that provider automatically. No code change required on + our side. +* **Service mode**: call :func:`configure` (or pass an :class:`OTELConfig` + to the relevant factory) to install global providers wired to an OTLP + collector. +""" + +from __future__ import annotations + +from nemo_retriever.observability import attributes +from nemo_retriever.observability.configure import OTELConfig, apply_otel_env_defaults, configure +from nemo_retriever.observability.propagate import extract_context, inject_current_context +from nemo_retriever.observability.ray_integration import RayOperatorSpanWrapper, collect_otel_env +from nemo_retriever.observability.spans import ( + nim_kind_for_operator, + operator_span, + pipeline_span, + tag_current_span, + tag_document, + tag_job, + tag_upload, +) +from nemo_retriever.observability.tracer import ( + documents_failed_counter, + documents_processed_counter, + get_meter, + get_tracer, + jobs_completed_counter, + jobs_failed_counter, + jobs_submitted_counter, + operator_duration_histogram, + pages_completed_counter, + pages_failed_counter, + reset_instrument_cache, + safe_add, +) + +__all__ = [ + "OTELConfig", + "RayOperatorSpanWrapper", + "apply_otel_env_defaults", + "attributes", + "collect_otel_env", + "configure", + "documents_failed_counter", + "documents_processed_counter", + "extract_context", + "get_meter", + "get_tracer", + "inject_current_context", + "jobs_completed_counter", + "jobs_failed_counter", + "jobs_submitted_counter", + "nim_kind_for_operator", + "operator_duration_histogram", + "operator_span", + "pages_completed_counter", + "pages_failed_counter", + "pipeline_span", + "reset_instrument_cache", + "safe_add", + "tag_current_span", + "tag_document", + "tag_job", + "tag_upload", +] diff --git a/nemo_retriever/src/nemo_retriever/observability/attributes.py b/nemo_retriever/src/nemo_retriever/observability/attributes.py new file mode 100644 index 0000000000..a4a2215b0b --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/observability/attributes.py @@ -0,0 +1,52 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Span and metric attribute name constants used by NeMo Retriever instrumentation.""" + +from __future__ import annotations + +# Run mode: "inprocess", "batch", or "service". +RUN_MODE = "nemo_retriever.run_mode" + +# Operator metadata. +OPERATOR_NAME = "nemo_retriever.operator.name" +OPERATOR_CLASS = "nemo_retriever.operator.class" +OPERATOR_INDEX = "nemo_retriever.operator.index" + +# Batch metadata. +BATCH_SIZE = "nemo_retriever.batch.size" +BATCH_OUTPUT_ROWS = "nemo_retriever.batch.output_rows" +BATCH_JOB_COUNT = "nemo_retriever.batch.job_count" +BATCH_DOCUMENT_COUNT = "nemo_retriever.batch.document_count" + +# Pipeline metadata. +PIPELINE_STAGES = "nemo_retriever.pipeline.stages" +EXTRACTION_MODE = "nemo_retriever.extraction_mode" + +# Per-request correlation in service mode. +REQUEST_ID = "nemo_retriever.request_id" + +# Document/job correlation. +DOCUMENT_ID = "nemo_retriever.document_id" +JOB_ID = "nemo_retriever.job_id" +FILENAME = "nemo_retriever.filename" +TOTAL_PAGES = "nemo_retriever.total_pages" + +# Worker process identity (service mode cross-process spans). +WORKER_PID = "nemo_retriever.worker.pid" + +# Error classification on failed operator spans. +ERROR_CLASS = "nemo_retriever.error.class" + +# Status label used on document counters: "ok", "failed", "cancelled". +STATUS = "nemo_retriever.status" + +# NIM endpoint metadata. +NIM_KIND = "nemo_retriever.nim.kind" +NIM_NAME = "nemo_retriever.nim.name" +NIM_URL = "nemo_retriever.nim.url" +NIM_PROBE_STATUS = "nemo_retriever.nim.probe_status" + +# Instrumentation scope name (passed to ``get_tracer`` / ``get_meter``). +INSTRUMENTATION_SCOPE = "nemo_retriever" diff --git a/nemo_retriever/src/nemo_retriever/observability/configure.py b/nemo_retriever/src/nemo_retriever/observability/configure.py new file mode 100644 index 0000000000..7783c3b8e8 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/observability/configure.py @@ -0,0 +1,261 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Optional one-shot OpenTelemetry SDK setup helper. + +SDK + exporter packages live behind the ``[otel]`` extra and are imported +lazily so ``import nemo_retriever`` never pulls them in. Calling +:func:`configure` is opt-in — hosts that already configured OpenTelemetry +can skip it and the existing global providers are used as-is. +""" + +from __future__ import annotations + +import dataclasses +import logging +import os +from typing import Callable, Literal, Sequence + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class OTELConfig: + """User-facing OpenTelemetry SDK configuration. + + All fields are optional. Standard ``OTEL_*`` environment variables + take precedence inside the SDK itself, so values supplied here are + fallbacks when the corresponding env var is unset. + """ + + enabled: bool = True + service_name: str = "nemo-retriever" + endpoint: str | None = None + exporter: Literal["otlp", "console", "none"] = "otlp" + sampling_ratio: float = 1.0 + auto_instrument: bool = True + resource_attributes: dict[str, str] | None = None + + +def configure(config: OTELConfig | None = None) -> Callable[[], None]: + """Install global tracer + meter providers and return a shutdown callback. + + When *config* is ``None``, defaults are used. When ``enabled`` is + false, the call is a no-op and the returned shutdown callback does + nothing. Callers should invoke the shutdown callback at process + exit to flush in-flight spans/metrics through the exporter. + """ + cfg = config or OTELConfig() + if not cfg.enabled: + return _noop_shutdown + + try: + return _configure_sdk(cfg) + except ImportError as exc: + logger.warning( + "OpenTelemetry SDK not installed (%s); pipeline will run " + "without tracing. Install with `pip install nemo-retriever[otel]`.", + exc, + ) + return _noop_shutdown + + +def _noop_shutdown() -> None: + return None + + +_HF_HUB_URL_PATTERN = r"huggingface\.co" + + +def apply_otel_env_defaults(*, default_service_name: str) -> None: + """Seed ``OTEL_*`` env vars with sensible defaults without clobbering + values the caller has already set. + + Must run BEFORE :func:`configure` (the ``requests`` instrumentor reads + ``OTEL_PYTHON_REQUESTS_EXCLUDED_URLS`` at instrument-time) and BEFORE + :func:`collect_otel_env` (so Ray actors inherit the same values via + runtime_env propagation). + + Currently sets: + + * ``OTEL_SERVICE_NAME`` — falls back to *default_service_name* when + unset, preventing the SDK's ``"unknown_service"`` placeholder from + surfacing in the collector. + * ``OTEL_PYTHON_REQUESTS_EXCLUDED_URLS`` — appends a HuggingFace Hub + regex so HEAD probes for optional config files (which legitimately + return 404) don't show up as ``STATUS_CODE_ERROR`` spans. Any + caller-supplied patterns are preserved. + """ + os.environ.setdefault("OTEL_SERVICE_NAME", default_service_name) + + existing = os.environ.get("OTEL_PYTHON_REQUESTS_EXCLUDED_URLS", "").strip() + if _HF_HUB_URL_PATTERN not in existing: + os.environ["OTEL_PYTHON_REQUESTS_EXCLUDED_URLS"] = ( + f"{existing},{_HF_HUB_URL_PATTERN}" if existing else _HF_HUB_URL_PATTERN + ) + + +def _configure_sdk(cfg: OTELConfig) -> Callable[[], None]: + """Lazy SDK import + provider wiring; runs only when ``cfg.enabled``.""" + from opentelemetry import metrics, trace + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.resources import Resource + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio + + resource_attrs: dict[str, str] = {"service.name": cfg.service_name} + if cfg.resource_attributes: + resource_attrs.update(cfg.resource_attributes) + resource = Resource.create(resource_attrs) + + sampler = ParentBasedTraceIdRatio(max(0.0, min(cfg.sampling_ratio, 1.0))) + tracer_provider = TracerProvider(resource=resource, sampler=sampler) + + span_exporter = _build_span_exporter(cfg) + metric_reader = _build_metric_reader(cfg) + + if span_exporter is not None: + tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter)) + + trace.set_tracer_provider(tracer_provider) + + if metric_reader is not None: + meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + else: + meter_provider = MeterProvider(resource=resource) + metrics.set_meter_provider(meter_provider) + + from nemo_retriever.observability.tracer import reset_instrument_cache + + reset_instrument_cache() + + if cfg.auto_instrument: + _install_auto_instrumentations() + + logger.info( + "OpenTelemetry configured: service=%s exporter=%s endpoint=%s sampling=%.3f", + cfg.service_name, + cfg.exporter, + cfg.endpoint or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", ""), + cfg.sampling_ratio, + ) + + def _shutdown() -> None: + try: + tracer_provider.shutdown() + except Exception: # noqa: BLE001 — best effort + logger.exception("Failed to shut down tracer provider cleanly") + try: + meter_provider.shutdown() + except Exception: # noqa: BLE001 — best effort + logger.exception("Failed to shut down meter provider cleanly") + + return _shutdown + + +def _otlp_use_http(endpoint: str | None) -> bool: + """Decide whether to use the OTLP/HTTP exporter or the gRPC one. + + OTLP supports both protocols and both use ``http://``-prefixed + endpoints, so URL scheme alone is *not* a reliable signal. Use: + + * ``OTEL_EXPORTER_OTLP_PROTOCOL`` env var when set + (``grpc`` | ``http/protobuf`` | ``http/json``); + * the canonical port as a fallback (``:4318`` → HTTP, anything else + including ``:4317`` → gRPC); + * gRPC otherwise (matches the SDK's own default). + """ + proto = os.environ.get("OTEL_EXPORTER_OTLP_PROTOCOL", "").strip().lower() + if proto.startswith("http"): + return True + if proto == "grpc": + return False + if endpoint: + if ( + ":4318" in endpoint + or endpoint.rstrip("/").endswith("/v1/traces") + or endpoint.rstrip("/").endswith("/v1/metrics") + ): + return True + return False + + +def _build_span_exporter(cfg: OTELConfig): # type: ignore[no-untyped-def] + if cfg.exporter == "none": + return None + if cfg.exporter == "console": + from opentelemetry.sdk.trace.export import ConsoleSpanExporter + + return ConsoleSpanExporter() + # Default: OTLP. Endpoint resolves from cfg.endpoint, then env vars. + endpoint = ( + cfg.endpoint + or os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") + or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + ) + return _build_otlp_span_exporter(endpoint) + + +def _build_otlp_span_exporter(endpoint: str | None): # type: ignore[no-untyped-def] + if _otlp_use_http(endpoint): + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HTTPExporter + + return HTTPExporter(endpoint=endpoint) if endpoint else HTTPExporter() + from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCExporter + + return GRPCExporter(endpoint=endpoint) if endpoint else GRPCExporter() + + +def _build_metric_reader(cfg: OTELConfig): # type: ignore[no-untyped-def] + if cfg.exporter == "none": + return None + if cfg.exporter == "console": + from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader + + return PeriodicExportingMetricReader(ConsoleMetricExporter()) + + endpoint = ( + cfg.endpoint + or os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT") + or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT") + ) + + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + + if _otlp_use_http(endpoint): + from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as HTTPMetric + + return PeriodicExportingMetricReader(HTTPMetric(endpoint=endpoint) if endpoint else HTTPMetric()) + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter as GRPCMetric + + return PeriodicExportingMetricReader(GRPCMetric(endpoint=endpoint) if endpoint else GRPCMetric()) + + +_AUTO_INSTRUMENTORS: Sequence[tuple[str, str]] = ( + ("opentelemetry.instrumentation.httpx", "HTTPXClientInstrumentor"), + ("opentelemetry.instrumentation.requests", "RequestsInstrumentor"), + ("opentelemetry.instrumentation.sqlite3", "SQLite3Instrumentor"), + ("opentelemetry.instrumentation.logging", "LoggingInstrumentor"), +) + + +def _install_auto_instrumentations() -> None: + """Install best-effort auto-instrumentations from the optional extra. + + Each instrumentor is independent: a missing module simply means that + integration is skipped. Already-installed instrumentors are no-ops. + """ + for module_name, class_name in _AUTO_INSTRUMENTORS: + try: + module = __import__(module_name, fromlist=[class_name]) + except ImportError: + continue + instrumentor_cls = getattr(module, class_name, None) + if instrumentor_cls is None: + continue + try: + instrumentor_cls().instrument() + except Exception: # noqa: BLE001 — never break startup over a probe + logger.exception("Failed to install %s.%s", module_name, class_name) diff --git a/nemo_retriever/src/nemo_retriever/observability/propagate.py b/nemo_retriever/src/nemo_retriever/observability/propagate.py new file mode 100644 index 0000000000..dc33158650 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/observability/propagate.py @@ -0,0 +1,27 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from typing import Any, Mapping + +from opentelemetry import context as otel_context +from opentelemetry import trace as _trace_api +from opentelemetry.propagate import extract, inject + + +def inject_current_context() -> dict[str, str]: + """Serialise the active span context to a carrier dict; ``{}`` when none.""" + if not _trace_api.get_current_span().get_span_context().is_valid: + return {} + carrier: dict[str, str] = {} + inject(carrier) + return carrier + + +def extract_context(carrier: Mapping[str, Any] | None) -> Any: + """Rehydrate an OTEL ``Context`` from a carrier; empty ``Context`` when missing.""" + if not carrier: + return otel_context.Context() + return extract(carrier) diff --git a/nemo_retriever/src/nemo_retriever/observability/ray_integration.py b/nemo_retriever/src/nemo_retriever/observability/ray_integration.py new file mode 100644 index 0000000000..eafc6ce293 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/observability/ray_integration.py @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Per-operator span emission from inside Ray Data map-batches actors. + +Ray actors do not see the driver's active OpenTelemetry context, and the +map-batches API provides no built-in hook for wrapping each batch call. +The span wrapper class in this module solves these problems. It +constructs the real operator inside the actor and wraps every batch in +an operator span parented to the driver's pipeline span via a W3C trace +context carrier. Actor-side provider bootstrap is handled separately by +opentelemetry-distro, which reads the standard OTEL environment +variables forwarded through Ray's runtime environment. +""" + +from __future__ import annotations + +import os +from typing import Any, Mapping + +from nemo_retriever.observability.propagate import extract_context +from nemo_retriever.observability.spans import operator_span + + +# Broad prefix so custom OTEL_* settings (sampler, propagator, log level) +# all flow through to actors. +_OTEL_ENV_PREFIXES = ("OTEL_",) + + +def collect_otel_env() -> dict[str, str]: + """Snapshot ``OTEL_*`` env vars for merging into Ray ``runtime_env.env_vars``.""" + return {k: v for k, v in os.environ.items() if any(k.startswith(p) for p in _OTEL_ENV_PREFIXES)} + + +class RayOperatorSpanWrapper: + """Wraps each Ray map-batches call in an operator span. + + Deliberately not an AbstractOperator subclass: map-batches only needs + a callable, and inheritance would pull in unwanted constructor and + run-method side effects. + """ + + def __init__( + self, + *, + _otel_operator_class: type, + _otel_node_name: str, + _otel_node_index: int, + _otel_parent_carrier: Mapping[str, str] | None = None, + _otel_operator_kwargs: Mapping[str, Any] | None = None, + ) -> None: + self._operator = _otel_operator_class(**dict(_otel_operator_kwargs or {})) + self._operator_class_name = _otel_operator_class.__name__ + self._node_name = _otel_node_name + self._node_index = int(_otel_node_index) + # Extract once at construction instead of re-parsing per batch. + self._parent_ctx = extract_context(_otel_parent_carrier or {}) + + def __call__(self, data: Any, **kwargs: Any) -> Any: + batch_size = len(data) if hasattr(data, "__len__") else None + with operator_span( + self._node_name, + run_mode="batch", + operator_class=self._operator_class_name, + operator_index=self._node_index, + batch_size=batch_size, + parent_context=self._parent_ctx, + ): + return self._operator(data, **kwargs) diff --git a/nemo_retriever/src/nemo_retriever/observability/spans.py b/nemo_retriever/src/nemo_retriever/observability/spans.py new file mode 100644 index 0000000000..bff22c7df2 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/observability/spans.py @@ -0,0 +1,213 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Reusable span and metric context managers for the pipeline seam. + +Centralizing the span and metric schema here keeps attribute keys, span +names, and metric labels identical across the three call sites that +exercise the seam (the inprocess executor, the service-mode worker +subprocess, and the public ingest entry point), so a single +OpenTelemetry collector view works for both library and service +deployments. +""" + +from __future__ import annotations + +import time +from contextlib import contextmanager +from functools import lru_cache +from typing import Any, Iterator, Mapping + +from opentelemetry import trace as _trace_api + +from nemo_retriever.observability import attributes as A +from nemo_retriever.observability.tracer import ( + documents_failed_counter, + documents_processed_counter, + get_tracer, + operator_duration_histogram, + safe_add, +) +from nemo_retriever.params.models import RunMode + + +def tag_current_span(**attrs: Any) -> None: + """Set attributes on the active span, skipping any that are None. + + Safe to call when no span is recording; the call is a no-op. + """ + span = _trace_api.get_current_span() + if not span.is_recording(): + return + filtered = {k: v for k, v in attrs.items() if v is not None} + if not filtered: + return + try: + span.set_attributes(filtered) + except Exception: # noqa: BLE001 — telemetry must never raise + pass + + +def tag_job(job_id: str | None) -> None: + """Tag the active span with the job id.""" + if job_id is not None: + tag_current_span(**{A.JOB_ID: job_id}) + + +def tag_document(document_id: str | None) -> None: + """Tag the active span with the document id.""" + if document_id is not None: + tag_current_span(**{A.DOCUMENT_ID: document_id}) + + +def tag_upload(*, filename: str | None = None, total_pages: int | None = None) -> None: + """Tag the active span with the upload's filename and page count.""" + tag_current_span(**{A.FILENAME: filename, A.TOTAL_PAGES: total_pages}) + + +# Operator class name substring to canonical NIM kind. Lets traces be +# filtered by role rather than URL or port, which matters when the same +# NIM image is reachable on different ports across deployments. Order +# matters: more specific substrings come first. +_NIM_KIND_RULES: tuple[tuple[str, str], ...] = ( + ("PageElement", "page_elements"), + ("GraphicElements", "graphic_elements"), + ("TableStructure", "table_structure"), + ("OCR", "ocr"), + ("BatchEmbed", "embed"), + ("Embed", "embed"), + ("Caption", "caption"), + ("NemotronParse", "nemotron_parse"), + ("ASR", "asr"), + ("Rerank", "rerank"), +) + + +@lru_cache(maxsize=128) +def nim_kind_for_operator(class_name: str) -> str | None: + """Return the canonical NIM kind for an operator class name, or None. + + Cached because operator_span calls this once per operator per batch + on the hot path, and the operator class set is small and stable. + """ + for needle, kind in _NIM_KIND_RULES: + if needle in class_name: + return kind + return None + + +@contextmanager +def pipeline_span( + run_mode: RunMode, + *, + extraction_mode: str | None = None, + stages: Mapping[str, Any] | tuple[str, ...] | list[str] | None = None, + extra_attrs: Mapping[str, Any] | None = None, +) -> Iterator[_trace_api.Span]: + """Open a root span around an end-to-end pipeline execution. + + Wraps the ingest call so a single trace view shows the full request, + operator chain, and outbound NIM call tree. Safe to use whether or + not a global tracer provider is set; with no provider the span is a + no-op. + """ + tracer = get_tracer() + attrs: dict[str, Any] = {A.RUN_MODE: run_mode} + if extraction_mode is not None: + attrs[A.EXTRACTION_MODE] = extraction_mode + if stages is not None: + if isinstance(stages, (list, tuple)): + attrs[A.PIPELINE_STAGES] = list(stages) + else: + attrs[A.PIPELINE_STAGES] = list(stages.keys()) + if extra_attrs: + attrs.update(extra_attrs) + + with tracer.start_as_current_span( + "ingestor.ingest", + attributes=attrs, + record_exception=True, + set_status_on_exception=True, + ) as span: + yield span + + +@contextmanager +def operator_span( + name: str, + run_mode: RunMode, + *, + operator_class: str | None = None, + operator_index: int | None = None, + batch_size: int | None = None, + extra_attrs: Mapping[str, Any] | None = None, + parent_context: Any = None, +) -> Iterator[_trace_api.Span]: + """Open a span and record duration and throughput metrics for one operator call. + + On normal exit, records a histogram observation for the operator's + wall-clock latency and increments the documents-processed counter by + the batch size when one is supplied. On exception, records the + exception on the span, flips the duration label to failed, and + increments the documents-failed counter by the batch size (or one + when no batch size is given), tagged with the exception class name. + + The parent_context argument attaches the span to an explicit parent, + used in Ray batch mode where the actor process has no active context + and needs to inherit the driver's pipeline span. + """ + tracer = get_tracer() + span_attrs: dict[str, Any] = { + A.OPERATOR_NAME: name, + A.RUN_MODE: run_mode, + } + if operator_class is not None: + span_attrs[A.OPERATOR_CLASS] = operator_class + kind = nim_kind_for_operator(operator_class) + if kind is not None: + span_attrs[A.NIM_KIND] = kind + if operator_index is not None: + span_attrs[A.OPERATOR_INDEX] = operator_index + if batch_size is not None: + span_attrs[A.BATCH_SIZE] = batch_size + if extra_attrs: + span_attrs.update(extra_attrs) + + metric_attrs = {A.OPERATOR_NAME: name, A.RUN_MODE: run_mode} + + t0 = time.monotonic() + error_class: str | None = None + try: + with tracer.start_as_current_span( + f"operator.{name}", + context=parent_context, + attributes=span_attrs, + record_exception=True, + set_status_on_exception=True, + ) as span: + yield span + except BaseException as exc: # noqa: BLE001 — re-raised immediately; we only tag the metric + error_class = type(exc).__name__ + raise + finally: + elapsed_ms = (time.monotonic() - t0) * 1000.0 + status = "ok" if error_class is None else "failed" + try: + operator_duration_histogram().record(elapsed_ms, attributes={**metric_attrs, A.STATUS: status}) + except Exception: # noqa: BLE001 — instrumentation must never raise + pass + + if error_class is None: + if batch_size: + safe_add( + documents_processed_counter(), + batch_size, + {**metric_attrs, A.STATUS: "ok"}, + ) + else: + safe_add( + documents_failed_counter(), + batch_size or 1, + {**metric_attrs, A.ERROR_CLASS: error_class}, + ) diff --git a/nemo_retriever/src/nemo_retriever/observability/tracer.py b/nemo_retriever/src/nemo_retriever/observability/tracer.py new file mode 100644 index 0000000000..fbddc2a3d0 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/observability/tracer.py @@ -0,0 +1,135 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Tracer, meter, and instrument accessors built on the OpenTelemetry API. + +Depends only on the OpenTelemetry API package. When no provider is +registered the returned tracer and meter are no-ops with negligible +overhead, so this module is safe to import unconditionally from library +code. +""" + +from __future__ import annotations + +from functools import lru_cache +from importlib.metadata import PackageNotFoundError, version +from typing import TYPE_CHECKING, Any, Mapping + +from opentelemetry import metrics, trace + +from nemo_retriever.observability.attributes import INSTRUMENTATION_SCOPE + +if TYPE_CHECKING: + from opentelemetry.metrics import Counter, Histogram, Meter + from opentelemetry.trace import Tracer + +try: + _NR_VERSION: str | None = version("nemo-retriever") +except PackageNotFoundError: + _NR_VERSION = None + + +@lru_cache(maxsize=1) +def get_tracer() -> "Tracer": + """Return the NeMo Retriever tracer for the active global provider.""" + return trace.get_tracer(INSTRUMENTATION_SCOPE, _NR_VERSION) + + +@lru_cache(maxsize=1) +def get_meter() -> "Meter": + """Return the NeMo Retriever meter for the active global provider.""" + return metrics.get_meter(INSTRUMENTATION_SCOPE, _NR_VERSION) + + +@lru_cache(maxsize=None) +def _counter(name: str, description: str, unit: str = "1") -> "Counter": + return get_meter().create_counter(name=name, unit=unit, description=description) + + +@lru_cache(maxsize=None) +def _histogram(name: str, description: str, unit: str) -> "Histogram": + return get_meter().create_histogram(name=name, unit=unit, description=description) + + +def reset_instrument_cache() -> None: + """Clear cached tracer, meter, and instruments. Used by tests that swap providers.""" + get_tracer.cache_clear() + get_meter.cache_clear() + _counter.cache_clear() + _histogram.cache_clear() + + +# Named instruments. Each is a thin alias so the cache key and description +# stay colocated with the call site. + + +def operator_duration_histogram() -> "Histogram": + return _histogram( + "nemo_retriever.operator.duration_ms", + "Wall-clock duration of a single AbstractOperator.run() call.", + "ms", + ) + + +def documents_processed_counter() -> "Counter": + return _counter( + "nemo_retriever.documents.processed", + "Number of documents that flowed through an operator.", + ) + + +def documents_failed_counter() -> "Counter": + return _counter( + "nemo_retriever.documents.failed", + "Number of operator invocations that raised an exception.", + ) + + +def jobs_submitted_counter() -> "Counter": + return _counter( + "nemo_retriever.jobs.submitted", + "Number of ingest jobs accepted by the service (one increment per job_id).", + ) + + +def jobs_completed_counter() -> "Counter": + return _counter( + "nemo_retriever.jobs.completed", + "Number of jobs that reached terminal status=complete.", + ) + + +def jobs_failed_counter() -> "Counter": + return _counter( + "nemo_retriever.jobs.failed", + "Number of jobs that ended in status=failed or status=cancelled.", + ) + + +def pages_completed_counter() -> "Counter": + return _counter( + "nemo_retriever.pages.completed", + "Number of pages whose pipeline run finished successfully.", + ) + + +def pages_failed_counter() -> "Counter": + return _counter( + "nemo_retriever.pages.failed", + "Number of pages whose pipeline run terminally failed (any stage).", + ) + + +def safe_add(counter: "Counter", amount: int = 1, attributes: Mapping[str, Any] | None = None) -> None: + """Add to a counter without ever raising. Use on pipeline hot paths. + + Telemetry must never break ingestion: a misconfigured exporter or + metric instrument bug should not propagate as an unhandled + exception. Use this from call sites where the increment is purely + observational. + """ + try: + counter.add(amount, attributes=attributes) + except Exception: # noqa: BLE001 — telemetry must never raise + pass diff --git a/nemo_retriever/src/nemo_retriever/page_elements/cpu_actor.py b/nemo_retriever/src/nemo_retriever/page_elements/cpu_actor.py index d934ada89d..7f7d2e769f 100644 --- a/nemo_retriever/src/nemo_retriever/page_elements/cpu_actor.py +++ b/nemo_retriever/src/nemo_retriever/page_elements/cpu_actor.py @@ -11,6 +11,7 @@ from nemo_retriever.graph.abstract_operator import AbstractOperator from nemo_retriever.graph.cpu_operator import CPUOperator from nemo_retriever.nim.nim import NIMClient +from nemo_retriever.nim.probe import probe_endpoint from nemo_retriever.page_elements.shared import _error_payload, detect_page_elements_v3 @@ -37,6 +38,12 @@ def __init__(self, **detect_kwargs: Any) -> None: self._nim_client = NIMClient( max_pool_workers=int(self.detect_kwargs.get("remote_max_pool_workers", 24)), ) + probe_endpoint( + invoke_url, + name="page-elements", + prefix="PageElementDetectionCPUActor", + api_key=str(self.detect_kwargs.get("api_key") or "") or None, + ) def preprocess(self, data: Any, **kwargs: Any) -> Any: return data diff --git a/nemo_retriever/src/nemo_retriever/service/app.py b/nemo_retriever/src/nemo_retriever/service/app.py index 7d6acf1dcf..c939acc9e8 100644 --- a/nemo_retriever/src/nemo_retriever/service/app.py +++ b/nemo_retriever/src/nemo_retriever/service/app.py @@ -23,7 +23,9 @@ from pathlib import Path -from nemo_retriever.service.config import ServiceConfig +from nemo_retriever.observability import OTELConfig, attributes as otel_attrs, tag_current_span +from nemo_retriever.observability.configure import apply_otel_env_defaults, configure as configure_otel +from nemo_retriever.service.config import OTELServiceConfig, ServiceConfig from nemo_retriever.service.db.engine import DatabaseEngine from nemo_retriever.service.db.repository import Repository from nemo_retriever.service.event_bus import EventBus @@ -93,6 +95,14 @@ def _resolve_spool_root(config: ServiceConfig) -> Path: return Path(config.database.path).resolve().parent / "spool" +def _to_otel_config(cfg: OTELServiceConfig) -> OTELConfig: + """Convert the YAML-driven service config into the library's :class:`OTELConfig`.""" + data = cfg.model_dump() + if not data.get("resource_attributes"): + data["resource_attributes"] = None + return OTELConfig(**data) + + @asynccontextmanager async def _lifespan(app: FastAPI) -> AsyncIterator[None]: """Startup / shutdown lifecycle for the service.""" @@ -193,6 +203,12 @@ async def _lifespan(app: FastAPI) -> AsyncIterator[None]: ) pool.shutdown() db_engine.close() + otel_shutdown = getattr(app.state, "otel_shutdown", None) + if otel_shutdown is not None: + try: + otel_shutdown() + except Exception: # noqa: BLE001 — best-effort flush on the way out + logger.exception("OpenTelemetry shutdown raised; ignoring") logger.info("Retriever service stopped") @@ -202,12 +218,18 @@ class _RequestIdMiddleware(BaseHTTPMiddleware): Routers read ``request.state.request_id`` and pass it to :func:`record_event` so all provenance events from one HTTP call can be correlated in the event log. + + Also tags the active OTEL span with ``nemo_retriever.request_id`` + so traces and the SQLite event log can be cross-referenced — when + no provider is registered the call is a no-op on the + ``NonRecordingSpan``. """ async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response: - request.state.request_id = uuid.uuid4().hex - response = await call_next(request) - return response + request_id = uuid.uuid4().hex + request.state.request_id = request_id + tag_current_span(**{otel_attrs.REQUEST_ID: request_id}) + return await call_next(request) def create_app(config: ServiceConfig) -> FastAPI: @@ -215,6 +237,14 @@ def create_app(config: ServiceConfig) -> FastAPI: _configure_logging(config) _apply_resource_limits(config) + # OTEL setup must run before lifespan: FastAPIInstrumentor adds a + # middleware, and Starlette locks the middleware stack once lifespan begins. + # Env defaults precede configure_otel so the requests instrumentor sees + # the HF exclude list at instrument-time and worker subprocesses inherit + # the service name from os.environ. + apply_otel_env_defaults(default_service_name=config.otel.service_name) + app_state_otel_shutdown = configure_otel(_to_otel_config(config.otel)) + app = FastAPI( title="Retriever Service", description="Low-latency document ingestion service powered by nemo-retriever", @@ -223,6 +253,7 @@ def create_app(config: ServiceConfig) -> FastAPI: lifespan=_lifespan, ) app.state.config = config + app.state.otel_shutdown = app_state_otel_shutdown app.add_middleware(_RequestIdMiddleware) @@ -238,6 +269,40 @@ def create_app(config: ServiceConfig) -> FastAPI: else: logger.info("Bearer-token authentication DISABLED (no api_token configured)") + # FastAPI instrumentation must register AFTER app-level middlewares so the + # OTEL HTTP span ends up outermost — _RequestIdMiddleware then runs inside + # it and its tag_current_span call attaches to the inbound HTTP span. + if config.otel.enabled: + try: + from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor + + # Patterns match the full URL via re.search, so anchor with $ + # (not ^) against the terminal id segment. CLI polls + # /v1/ingest/{job,status}/ every 2s — without exclusion a + # 200-sec job produces ~100 spans of poll noise. Override via + # OTEL_PYTHON_FASTAPI_EXCLUDED_URLS. + default_excludes = ",".join( + [ + "/v1/health", + "/metrics", + r"/v1/ingest/job/[^/]+$", + r"/v1/ingest/status/[^/]+$", + ] + ) + FastAPIInstrumentor.instrument_app( + app, + excluded_urls=default_excludes, + ) + logger.info( + "FastAPI auto-instrumentation enabled " + "(excluding health/metrics and per-job/per-document status-poll endpoints)" + ) + except ImportError: + logger.info( + "OTEL enabled but opentelemetry-instrumentation-fastapi is missing — " + "install nemo-retriever[otel] to capture HTTP spans" + ) + from nemo_retriever.service.routers import events, ingest, internal, metrics, query, rerank, stream, system app.include_router(events.router, prefix="/v1") diff --git a/nemo_retriever/src/nemo_retriever/service/cli.py b/nemo_retriever/src/nemo_retriever/service/cli.py index 8387004f05..f867166112 100644 --- a/nemo_retriever/src/nemo_retriever/service/cli.py +++ b/nemo_retriever/src/nemo_retriever/service/cli.py @@ -61,6 +61,16 @@ def start( None, "--gpu-devices", help="Comma-separated GPU device IDs (overrides YAML)." ), db_path: Optional[str] = typer.Option(None, "--db-path", help="SQLite database path (overrides YAML)."), + results_dir: Optional[str] = typer.Option( + None, + "--results-dir", + help="Directory for per-job page-result JSON files (overrides YAML).", + ), + lancedb_uri: Optional[str] = typer.Option( + None, + "--lancedb-uri", + help="LanceDB directory URI used by /v1/query (overrides YAML).", + ), api_token: Optional[str] = typer.Option( None, "--api-token", @@ -75,6 +85,26 @@ def start( "--drain-timeout-s", help="Seconds to wait for in-flight batches to finish on shutdown (overrides YAML).", ), + otel: Optional[bool] = typer.Option( + None, + "--otel/--no-otel", + help="Enable OpenTelemetry tracing/metrics export (overrides YAML).", + ), + otel_endpoint: Optional[str] = typer.Option( + None, + "--otel-endpoint", + help="OTLP collector endpoint, e.g. http://localhost:4317 (overrides YAML / $OTEL_EXPORTER_OTLP_ENDPOINT).", + ), + otel_service_name: Optional[str] = typer.Option( + None, + "--otel-service-name", + help="OTEL service.name resource attribute (overrides YAML).", + ), + otel_exporter: Optional[str] = typer.Option( + None, + "--otel-exporter", + help="One of otlp (default), console (stdout for debugging), or none.", + ), ) -> None: """Start the retriever ingest web server.""" import uvicorn @@ -108,10 +138,22 @@ def start( overrides["resources.gpu_devices"] = [d.strip() for d in gpu_devices.split(",") if d.strip()] if db_path is not None: overrides["database.path"] = db_path + if results_dir is not None: + overrides["processing.results_dir"] = results_dir + if lancedb_uri is not None: + overrides["vector_store.lancedb_uri"] = lancedb_uri if api_token is not None: overrides["auth.api_token"] = api_token if drain_timeout_s is not None: overrides["drain.timeout_s"] = drain_timeout_s + if otel is not None: + overrides["otel.enabled"] = otel + if otel_endpoint is not None: + overrides["otel.endpoint"] = otel_endpoint + if otel_service_name is not None: + overrides["otel.service_name"] = otel_service_name + if otel_exporter is not None: + overrides["otel.exporter"] = otel_exporter cfg = load_config(config_path=str(config) if config else None, overrides=overrides or None) diff --git a/nemo_retriever/src/nemo_retriever/service/config.py b/nemo_retriever/src/nemo_retriever/service/config.py index e67d4000ec..fe4d05cc67 100644 --- a/nemo_retriever/src/nemo_retriever/service/config.py +++ b/nemo_retriever/src/nemo_retriever/service/config.py @@ -137,6 +137,41 @@ class VectorStoreConfig(BaseModel): embedding_model: str = "nvidia/llama-nemotron-embed-1b-v2" +class OTELServiceConfig(BaseModel): + """OpenTelemetry tracing/metrics export for the service. + + When ``enabled`` is true, the service installs global ``TracerProvider`` + and ``MeterProvider`` instances at startup and instruments FastAPI, + httpx/requests, and SQLite via the corresponding ``opentelemetry- + instrumentation-*`` packages (install them via the ``[otel]`` extra). + + Standard ``OTEL_*`` environment variables — populated by Helm's + ``otelEnvVars`` block — take precedence over fields supplied here, so + a Kubernetes deployment can centralise endpoint/service-name config + in one place. + """ + + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + service_name: str = "nemo-retriever-service" + # OTLP collector endpoint. ``null`` falls back to the + # ``OTEL_EXPORTER_OTLP_ENDPOINT`` environment variable. + endpoint: str | None = None + # ``otlp`` (gRPC or HTTP, picked from endpoint scheme), ``console`` + # (stdout — useful for local debugging), or ``none`` (disable export + # while still collecting in-process spans for tests). + exporter: Literal["otlp", "console", "none"] = "otlp" + # Parent-based ratio sampler. 1.0 = always sample, 0.0 = never. + sampling_ratio: float = 1.0 + # Auto-instrument outbound httpx/requests, sqlite3, and logging. + auto_instrument: bool = True + # Free-form resource attribute dict appended to the SDK Resource — + # use for ``deployment.environment`` style labels not driven by + # ``OTEL_RESOURCE_ATTRIBUTES``. + resource_attributes: dict[str, str] = Field(default_factory=dict) + + class EventBusConfig(BaseModel): """SSE event-bus back-pressure policy. @@ -194,6 +229,7 @@ class ServiceConfig(BaseModel): event_bus: EventBusConfig = Field(default_factory=EventBusConfig) vector_store: VectorStoreConfig = Field(default_factory=VectorStoreConfig) reranker: RerankerConfig = Field(default_factory=RerankerConfig) + otel: OTELServiceConfig = Field(default_factory=OTELServiceConfig) def _bundled_yaml_path() -> Path: diff --git a/nemo_retriever/src/nemo_retriever/service/processing/pool.py b/nemo_retriever/src/nemo_retriever/service/processing/pool.py index ef2ce036ac..d88d5075d4 100644 --- a/nemo_retriever/src/nemo_retriever/service/processing/pool.py +++ b/nemo_retriever/src/nemo_retriever/service/processing/pool.py @@ -50,6 +50,19 @@ import pandas as pd +from nemo_retriever.observability import OTELConfig +from nemo_retriever.observability import attributes as otel_attrs +from nemo_retriever.observability.configure import configure as configure_otel +from nemo_retriever.observability.propagate import extract_context, inject_current_context +from nemo_retriever.observability.spans import operator_span +from nemo_retriever.observability.tracer import ( + get_tracer, + jobs_completed_counter, + jobs_failed_counter, + pages_completed_counter, + pages_failed_counter, + safe_add, +) from nemo_retriever.service.config import NimEndpointsConfig, ServiceConfig from nemo_retriever.service.db.engine import DatabaseEngine from nemo_retriever.service.db.repository import Repository @@ -320,6 +333,7 @@ def _build_operator_chain( _worker_chain: list[tuple[str, Any]] | None = None _worker_db_path: str | None = None +_worker_otel_shutdown: typing.Callable[[], None] | None = None def _record_probe_events(db_path: str) -> None: @@ -360,6 +374,7 @@ def _worker_initializer( nim_config_queue: multiprocessing.Queue, # type: ignore[type-arg] fallback_nim_config: dict[str, Any], vector_store_config: dict[str, Any] | None = None, + otel_config: dict[str, Any] | None = None, ) -> None: """Called exactly once per worker process by ProcessPoolExecutor. @@ -367,8 +382,14 @@ def _worker_initializer( multi-endpoint URLs are distributed round-robin. If the queue is empty (more processes than configs — shouldn't happen) the fallback config is used so the worker still functions. + + The ``otel_config`` dict (when supplied) installs a fresh + ``TracerProvider`` / ``MeterProvider`` inside the worker subprocess. + A subprocess does NOT inherit the parent's in-memory provider state, + so each worker must initialise its own — without this, spans built + inside ``_run_pipeline_batch`` would silently no-op. """ - global _worker_chain, _worker_db_path + global _worker_chain, _worker_db_path, _worker_otel_shutdown # Workers should not handle SIGINT — the main process owns shutdown. # Without this, Ctrl+C causes noisy KeyboardInterrupt tracebacks from @@ -382,6 +403,17 @@ def _worker_initializer( except ImportError: pass + if otel_config is not None: + try: + _worker_otel_shutdown = configure_otel(OTELConfig(**otel_config)) + if _worker_otel_shutdown is not None: + import atexit + + atexit.register(_worker_otel_shutdown) + except Exception: # noqa: BLE001 — never block worker startup on telemetry + logger.exception("[pid %d] Worker OTEL configure failed", os.getpid()) + _worker_otel_shutdown = None + _worker_db_path = db_path try: nim_config_dict = nim_config_queue.get_nowait() @@ -417,6 +449,7 @@ class PageDescriptor: job_id: str | None = None page_number: int = 1 spool_path: str | None = None + trace_context: dict[str, str] | None = None @dataclasses.dataclass @@ -443,6 +476,16 @@ class WorkerResult: page_contents: list[dict[str, Any]] = dataclasses.field(default_factory=list) +@dataclasses.dataclass +class _JobProgress: + """Per-job result counters + terminal-fired flag (pool-internal).""" + + seen: int = 0 + failed: int = 0 + expected: int | None = None + fired: bool = False + + @dataclasses.dataclass class BatchWorkerResult: """Picklable result returned from a worker process to the main process. @@ -476,6 +519,79 @@ def _run_pipeline_batch( using the ``_page_document_id`` provenance column. """ pid = os.getpid() + + span_attrs: dict[str, Any] = { + otel_attrs.RUN_MODE: "service", + otel_attrs.WORKER_PID: pid, + otel_attrs.BATCH_SIZE: len(page_descriptors), + } + + job_ids = {p.get("job_id") for p in page_descriptors if p.get("job_id")} + doc_ids = {p.get("document_id") for p in page_descriptors if p.get("document_id")} + if len(job_ids) == 1: + span_attrs[otel_attrs.JOB_ID] = next(iter(job_ids)) + if len(doc_ids) == 1: + span_attrs[otel_attrs.DOCUMENT_ID] = next(iter(doc_ids)) + span_attrs[otel_attrs.BATCH_JOB_COUNT] = len(job_ids) + span_attrs[otel_attrs.BATCH_DOCUMENT_COUNT] = len(doc_ids) + + parent_ctx, links = _resolve_batch_span_parents(page_descriptors) + if links: + span_attrs["nemo_retriever.batch.link_count"] = len(links) + + with get_tracer().start_as_current_span( + "pool.run_pipeline_batch", + context=parent_ctx, + links=links or None, + attributes=span_attrs, + record_exception=True, + set_status_on_exception=True, + ): + return _run_pipeline_batch_impl(page_descriptors, db_path=db_path) + + +def _resolve_batch_span_parents( + page_descriptors: list[dict[str, Any]], +) -> tuple[Any, list[Any]]: + """Pick a parent ``Context`` and ``Link`` list from per-page descriptors. + + Returns ``(parent_ctx, links)``. Pages that share the same + ``trace_id`` collapse to a single Link/Context (no point duplicating). + When no per-page carrier yields a valid context (e.g. OTEL was not + configured at submit time) the batch span becomes a fresh root. + """ + from opentelemetry import trace as _trace_api + + seen_trace_ids: set[int] = set() + contexts_in_order: list[Any] = [] + for d in page_descriptors: + carrier = d.get("trace_context") if isinstance(d, dict) else None + if not carrier: + continue + ctx = extract_context(carrier) + span = _trace_api.get_current_span(ctx) + sc = span.get_span_context() + if not sc.is_valid: + continue + if sc.trace_id in seen_trace_ids: + continue + seen_trace_ids.add(sc.trace_id) + contexts_in_order.append((ctx, sc)) + + if not contexts_in_order: + return extract_context(None), [] + + parent_ctx = contexts_in_order[0][0] + links = [_trace_api.Link(sc) for _ctx, sc in contexts_in_order[1:]] + return parent_ctx, links + + +def _run_pipeline_batch_impl( + page_descriptors: list[dict[str, Any]], + *, + db_path: str, +) -> BatchWorkerResult: + pid = os.getpid() engine = DatabaseEngine(db_path) repo = Repository(engine) @@ -604,13 +720,20 @@ def _run_pipeline_batch( failed_stage: str | None = None failed_exc: BaseException | None = None - for stage_name, op in _worker_chain: + for stage_index, (stage_name, op) in enumerate(_worker_chain): try: - df = op.run(df) - if df is None: - raise RuntimeError( - f"Operator '{stage_name}' ({type(op).__name__}) returned None instead of a DataFrame" - ) + with operator_span( + stage_name, + run_mode="service", + operator_class=type(op).__name__, + operator_index=stage_index, + batch_size=len(df) if df is not None else None, + ): + df = op.run(df) + if df is None: + raise RuntimeError( + f"Operator '{stage_name}' ({type(op).__name__}) returned None instead of a DataFrame" + ) except BaseException as exc: failed_stage = stage_name failed_exc = exc @@ -1244,8 +1367,8 @@ def __init__( self._draining = threading.Event() # In-memory job page counters for immediate SSE job_complete # detection without waiting for the DB writer thread. - self._job_page_counts: dict[str, int] = {} - self._job_page_counts_lock = threading.Lock() + self._job_progress: dict[str, _JobProgress] = {} + self._job_progress_lock = threading.Lock() # Background spool sweeper (started in start()). self._spool_cleanup_task: asyncio.Task | None = None @@ -1284,12 +1407,19 @@ def start(self) -> None: if self._config.vector_store is not None: vs_dict = self._config.vector_store.model_dump() + otel_dict: dict[str, Any] | None = None + otel_cfg = getattr(self._config, "otel", None) + if otel_cfg is not None and otel_cfg.enabled: + otel_dict = otel_cfg.model_dump() + if not otel_dict.get("resource_attributes"): + otel_dict["resource_attributes"] = None + logger.info("Initialising %d worker(s) — building operator chains", self._num_workers) self._executor = ProcessPoolExecutor( max_workers=self._num_workers, mp_context=ctx, initializer=_worker_initializer, - initargs=(db_path, nim_config_queue, fallback_nim_config, vs_dict), + initargs=(db_path, nim_config_queue, fallback_nim_config, vs_dict, otel_dict), ) warmup_futures = [self._executor.submit(os.getpid) for _ in range(self._num_workers)] @@ -1640,7 +1770,7 @@ def _publish_cancelled(self, page: dict[str, Any]) -> None: ) if job_id: - self._handle_job_completion(job_id) + self._handle_job_completion(job_id, success=False) # ------------------------------------------------------------------ # Result callback (runs in main process on a callback thread) @@ -1680,6 +1810,37 @@ def _handle_single_result(self, result: WorkerResult) -> None: doc_id = result.document_id job_id = result.job_id + # Per-page lifecycle counter — distinct from the per-operator + # ``documents.processed`` counter, which fires per stage. This + # one increments exactly once per page so dashboards can simply + # graph the rate. + if result.success: + safe_add(pages_completed_counter(), 1) + else: + safe_add( + pages_failed_counter(), + 1, + {otel_attrs.ERROR_CLASS: result.failure_type or "unknown"}, + ) + + if result.success: + logger.info( + "[page complete] doc=%s job=%s page=%s duration_ms=%.0f", + doc_id[:8], + (job_id or "")[:8], + result.page_number, + result.processing_duration_ms, + ) + else: + logger.warning( + "[page failed] doc=%s job=%s page=%s failure_type=%s error=%s", + doc_id[:8], + (job_id or "")[:8], + result.page_number, + result.failure_type, + (result.error_message or "")[:160], + ) + if result.success: page_complete_payload: dict[str, Any] = { "event": "page_complete", @@ -1783,39 +1944,89 @@ def _handle_single_result(self, result: WorkerResult) -> None: self._db_writer.enqueue(write_item) if job_id: - self._handle_job_completion(job_id) + self._handle_job_completion(job_id, success=result.success) - def _handle_job_completion(self, job_id: str) -> None: - """Track page completion in-memory and publish job_complete SSE immediately. + def expect_results(self, job_id: str, n: int) -> None: + """Override the expected ``WorkerResult`` count for *job_id*. - Uses an in-memory counter so the SSE fires as soon as the last - page result arrives, without waiting for the DB writer thread. + Required for whole-job uploads where ``job.total_pages`` is the + PDF page count, not the number of input documents. """ - with self._job_page_counts_lock: - self._job_page_counts[job_id] = self._job_page_counts.get(job_id, 0) + 1 - pages_completed = self._job_page_counts[job_id] + if n <= 0: + return + with self._job_progress_lock: + self._job_progress.setdefault(job_id, _JobProgress()).expected = n + + def _handle_job_completion(self, job_id: str, *, success: bool = True) -> None: + """Track per-result state; fire ``job_complete`` SSE + lifecycle + counter exactly once when ``seen >= expected``.""" + with self._job_progress_lock: + progress = self._job_progress.setdefault(job_id, _JobProgress()) + if progress.fired: + return + progress.seen += 1 + if not success: + progress.failed += 1 + # Snapshot inside the lock so the SSE payload reflects the + # result count at *this* call's threshold crossing, not + # whatever extra results other threads stack on while we + # release the lock to do the DB lookup below. + seen, failed, expected = progress.seen, progress.failed, progress.expected + # Fast path: ``expect_results`` declared a known target. + # Skip the DB lookup for the N-1 non-terminal results. + if expected is not None and seen < expected: + return repo = Repository(self._db_engine) job = repo.get_job(job_id) if job is None: return - if pages_completed >= job.total_pages and job.total_pages > 0: - with self._job_page_counts_lock: - self._job_page_counts.pop(job_id, None) - logger.info("[job %s] All %d pages of %s complete", job_id[:8], job.total_pages, job.filename) - asyncio.run_coroutine_threadsafe( - self._event_bus.publish( - job_id, - { - "event": "job_complete", - "job_id": job_id, - "filename": job.filename, - "total_pages": job.total_pages, - }, - ), - self._event_loop, + with self._job_progress_lock: + progress = self._job_progress.get(job_id) + if progress is None or progress.fired: + return + target = expected if expected is not None else job.total_pages + if target <= 0 or seen < target: + return + progress.fired = True + self._job_progress.pop(job_id, None) + + any_success = failed < seen + if any_success: + logger.info( + "[job complete] job=%s filename=%s total_pages=%d failed_results=%d", + job_id[:8], + job.filename, + job.total_pages, + failed, ) + safe_add(jobs_completed_counter(), 1) + else: + # Skip the natural-terminal counter when the cancel route + # already fired ``jobs.failed`` — avoids double-counting. + if not self.is_job_cancelled(job_id): + logger.warning( + "[job failed] job=%s filename=%s total_pages=%d reason=all_pages_failed", + job_id[:8], + job.filename, + job.total_pages, + ) + safe_add(jobs_failed_counter(), 1, {otel_attrs.STATUS: "failed"}) + + asyncio.run_coroutine_threadsafe( + self._event_bus.publish( + job_id, + { + "event": "job_complete", + "job_id": job_id, + "filename": job.filename, + "total_pages": job.total_pages, + "failed_pages": failed, + }, + ), + self._event_loop, + ) # ------------------------------------------------------------------ # Submit (public API — enqueues into the batch buffer) @@ -1854,6 +2065,8 @@ def try_submit( if self.is_job_cancelled(job_id): return False + trace_context = inject_current_context() or None + # When the bytes have been spooled, do NOT also pickle them into # the descriptor — the worker will read them from disk. descriptor = PageDescriptor( @@ -1864,5 +2077,6 @@ def try_submit( job_id=job_id, page_number=page_number, spool_path=spool_path, + trace_context=trace_context, ) return self._buffer.enqueue(dataclasses.asdict(descriptor)) diff --git a/nemo_retriever/src/nemo_retriever/service/retriever-service.yaml b/nemo_retriever/src/nemo_retriever/service/retriever-service.yaml index fb3ca30634..70e0b11eb8 100644 --- a/nemo_retriever/src/nemo_retriever/service/retriever-service.yaml +++ b/nemo_retriever/src/nemo_retriever/service/retriever-service.yaml @@ -116,6 +116,28 @@ vector_store: refine_factor: 10 embedding_model: "nvidia/llama-nemotron-embed-1b-v2" +# OpenTelemetry tracing/metrics export. When enabled, the service +# installs global TracerProvider and MeterProvider instances at startup +# and instruments FastAPI + outbound httpx/requests + SQLite. Install +# the optional dependencies with `pip install nemo-retriever[otel]`. +# +# Standard OTEL_* environment variables (typically injected by Helm via +# the `otelEnvVars` block) override values supplied here, so prod +# deployments centralise endpoint/service-name config in one place. +otel: + enabled: false + service_name: "nemo-retriever-service" + # Override OTLP collector endpoint. null = read OTEL_EXPORTER_OTLP_ENDPOINT. + endpoint: null + # otlp (default), console (stdout, debugging), or none. + exporter: "otlp" + # Parent-based trace ratio sampler. 1.0 = always sample, 0.0 = never. + sampling_ratio: 1.0 + # Auto-instrument outbound httpx/requests, sqlite3, and logging. + auto_instrument: true + # Extra resource attributes appended to the SDK Resource. + resource_attributes: {} + # SSE event-bus back-pressure policy. # # - drop_low_priority (default): silently shed page_complete events diff --git a/nemo_retriever/src/nemo_retriever/service/routers/ingest.py b/nemo_retriever/src/nemo_retriever/service/routers/ingest.py index 2685286011..b848ac699d 100644 --- a/nemo_retriever/src/nemo_retriever/service/routers/ingest.py +++ b/nemo_retriever/src/nemo_retriever/service/routers/ingest.py @@ -15,6 +15,9 @@ from fastapi import APIRouter, File, Form, HTTPException, Query, Request, UploadFile from fastapi.responses import JSONResponse +from nemo_retriever.observability import attributes as otel_attrs +from nemo_retriever.observability.spans import tag_document, tag_job, tag_upload +from nemo_retriever.observability.tracer import jobs_failed_counter, jobs_submitted_counter, safe_add from nemo_retriever.service.db.repository import Repository from nemo_retriever.service.event_logger import record_event from nemo_retriever.service.failure_types import EventCategory @@ -125,6 +128,8 @@ async def ingest_document( except (json.JSONDecodeError, Exception) as exc: raise HTTPException(status_code=400, detail=f"Invalid metadata JSON: {exc}") + tag_job(meta.job_id) + file_bytes = await file.read() content_sha256 = hashlib.sha256(file_bytes).hexdigest() @@ -132,6 +137,7 @@ async def ingest_document( existing = repo.get_document_by_sha(content_sha256) if existing is not None: + tag_document(existing.id) logger.info("Duplicate document detected (sha=%s), returning existing record", content_sha256[:12]) record_event( repo, @@ -214,6 +220,7 @@ async def ingest_document( filename = meta.filename or file.filename or "unknown" content_type = meta.content_type or file.content_type or "application/octet-stream" + tag_upload(filename=filename, total_pages=meta.total_pages) # Handle job creation / update when the client pre-splits pages job_id = meta.job_id @@ -228,6 +235,13 @@ async def ingest_document( ) repo.insert_job(job) repo.update_job_status(job_id, ProcessingStatus.PROCESSING) + safe_add(jobs_submitted_counter(), 1, {"endpoint": "/v1/ingest"}) + logger.info( + "[job submitted] job=%s filename=%s total_pages=%d", + job_id[:8], + filename, + meta.total_pages or 0, + ) repo.increment_job_pages_submitted(job_id) logger.info("[job %s] Received page %s/%s of %s", job_id[:8], meta.page_number, meta.total_pages, filename) @@ -242,6 +256,7 @@ async def ingest_document( metadata_json=json.dumps(meta.metadata), ) doc, lost_race = repo.insert_document_or_get(new_doc) + tag_document(doc.id) if lost_race: # Another concurrent request inserted the same SHA between our # get_document_by_sha() check above and the INSERT here. Treat @@ -402,6 +417,7 @@ async def get_ingest_status( request: Request, document_id: str, ) -> IngestStatus | IngestComplete: + tag_document(document_id) repo: Repository = request.app.state.repository doc = await asyncio.to_thread(repo.get_document, document_id) if doc is None: @@ -509,6 +525,7 @@ async def get_job_status( request: Request, job_id: str, ) -> JobStatus: + tag_job(job_id) repo: Repository = request.app.state.repository job = await asyncio.to_thread(repo.get_job, job_id) if job is None: @@ -553,6 +570,7 @@ async def get_job_page( ``/v1/ingest/job/{job_id}/results`` to fetch all output rows for every input page in one call. """ + tag_job(job_id) from pathlib import Path repo: Repository = request.app.state.repository @@ -598,6 +616,7 @@ async def get_job_results( Results are read from the filesystem at ``{results_dir}/{job_id}/``. Each JSON file corresponds to one output row produced by the pipeline. """ + tag_job(job_id) from pathlib import Path repo: Repository = request.app.state.repository @@ -844,13 +863,27 @@ async def ingest_whole_job( total_pages = 1 job_id = meta.job_id or uuid.uuid4().hex + tag_job(job_id) + tag_upload(filename=filename, total_pages=total_pages) job = Job(id=job_id, filename=filename, content_sha256="", total_pages=total_pages) + inserted = False try: repo.insert_job(job) repo.update_job_status(job_id, ProcessingStatus.PROCESSING) + inserted = True except Exception: # noqa: BLE001 — job_id may already exist pass + if inserted: + safe_add(jobs_submitted_counter(), 1, {"endpoint": "/v1/ingest/job"}) + logger.info("[job submitted] job=%s filename=%s total_pages=%d", job_id[:8], filename, total_pages) + # Whole-job upload produces exactly one ``WorkerResult`` (the entire + # PDF goes through the pipeline as a single input document). Tell + # the pool so ``jobs.completed`` / ``jobs.failed`` fires once the + # single result returns — without this override, terminal detection + # compares against ``total_pages`` (the PDF page count) and never + # fires for whole-job uploads. + pool.expect_results(job_id, 1) ok, payload = await asyncio.to_thread( _enqueue_one_page, @@ -963,13 +996,22 @@ async def ingest_batch( raise HTTPException(status_code=400, detail=f"Invalid metadata JSON: {exc}") job_id = meta.job_id or uuid.uuid4().hex + tag_job(job_id) + tag_upload(filename=meta.filename or "batch", total_pages=len(files)) if not meta.job_id: job = Job(id=job_id, filename=meta.filename or "batch", content_sha256="", total_pages=len(files)) + inserted = False try: await asyncio.to_thread(repo.insert_job, job) await asyncio.to_thread(repo.update_job_status, job_id, ProcessingStatus.PROCESSING) - except Exception: # noqa: BLE001 + inserted = True + except Exception: # noqa: BLE001 — job_id may already exist pass + if inserted: + safe_add(jobs_submitted_counter(), 1, {"endpoint": "/v1/ingest/batch"}) + logger.info( + "[job submitted] job=%s filename=%s total_pages=%d", job_id[:8], meta.filename or "batch", len(files) + ) accepted: list[IngestAccepted] = [] rejected: list[dict] = [] @@ -1015,6 +1057,7 @@ async def cancel_job( completion, but any subsequent batches that contain pages of the cancelled job are skipped before they're dispatched. """ + tag_job(job_id) repo: Repository = request.app.state.repository pool = request.app.state.processing_pool @@ -1023,6 +1066,15 @@ async def cancel_job( raise HTTPException(status_code=404, detail=f"Job {job_id} not found") pool.cancel_job(job_id) + if not info.get("already_terminal"): + # One increment per job cancellation; don't double-count when + # the user cancels a job that's already in a terminal state. + safe_add(jobs_failed_counter(), 1, {otel_attrs.STATUS: "cancelled"}) + logger.info( + "[job failed] job=%s reason=cancelled docs_cancelled=%d", + job_id[:8], + int(info.get("documents_cancelled", 0)), + ) job = await asyncio.to_thread(repo.get_job, job_id) return JobCancelResponse( @@ -1057,6 +1109,7 @@ async def delete_job( Pass ``?force=true`` to delete a queued or in-flight job (the pipeline will discard any remaining buffered pages silently). """ + tag_job(job_id) repo: Repository = request.app.state.repository job = await asyncio.to_thread(repo.get_job, job_id) if job is None: @@ -1144,6 +1197,7 @@ async def update_job( ``pages_completed`` counter is reset to 0 and every document's status is set back to ``queued``. """ + tag_job(job_id) if action != "requeue": raise HTTPException(status_code=400, detail=f"Unknown action '{action}'. Supported: requeue") diff --git a/nemo_retriever/src/nemo_retriever/text_embed/cpu_operator.py b/nemo_retriever/src/nemo_retriever/text_embed/cpu_operator.py index 2511b865c9..4c59c263d4 100644 --- a/nemo_retriever/src/nemo_retriever/text_embed/cpu_operator.py +++ b/nemo_retriever/src/nemo_retriever/text_embed/cpu_operator.py @@ -34,7 +34,7 @@ def __init__(self, params: EmbedParams) -> None: self._model = None api_key = self._kwargs.get("api_key") - if endpoint and api_key: + if endpoint: # Probe the /embeddings path with a model-name-only body — auth is # checked before body validation so a bad key returns 401 without # triggering inference. A valid key with an empty input returns 400. diff --git a/nemo_retriever/tests/test_observability.py b/nemo_retriever/tests/test_observability.py new file mode 100644 index 0000000000..96ea9d2efb --- /dev/null +++ b/nemo_retriever/tests/test_observability.py @@ -0,0 +1,249 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Minimal essential tests for the OpenTelemetry observability module. + +Each kept test guards a contract that has no other coverage: + +* ``test_operator_span_success`` / ``test_operator_span_failure`` — + happy path + exception path span/metric emission, including the + ``nim.kind`` derivation and ``error.class`` label. +* ``test_run_pipeline_batch_inherits_per_page_context`` — service mode + worker span parents under the inbound HTTP span via the W3C carrier + captured at ``try_submit`` time. +* ``test_ray_operator_span_wrapper_links_to_driver`` — Ray batch mode + actor span inherits the driver's ``pipeline_span``. +""" + +from __future__ import annotations + +from typing import Iterator + +import pandas as pd +import pytest +from opentelemetry import metrics, trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from nemo_retriever.graph.abstract_operator import AbstractOperator +from nemo_retriever.observability import ( + attributes as A, + inject_current_context, + operator_span, + pipeline_span, + reset_instrument_cache, +) + + +# OTEL Python allows ``set_*_provider`` only once per process — install +# SDK providers a single time and reset the in-memory exporter/reader +# state between tests. +_SPAN_EXPORTER = InMemorySpanExporter() +_METRIC_READER: InMemoryMetricReader | None = None +_PROVIDERS_INSTALLED = False + + +def _install_session_providers() -> InMemoryMetricReader: + global _METRIC_READER, _PROVIDERS_INSTALLED + if _PROVIDERS_INSTALLED: + assert _METRIC_READER is not None + return _METRIC_READER + + tp = trace.get_tracer_provider() + provider = tp if isinstance(tp, TracerProvider) else TracerProvider() + if not isinstance(tp, TracerProvider): + trace.set_tracer_provider(provider) + provider.add_span_processor(SimpleSpanProcessor(_SPAN_EXPORTER)) + + _METRIC_READER = InMemoryMetricReader() + existing = metrics.get_meter_provider() + if not isinstance(existing, MeterProvider): + metrics.set_meter_provider(MeterProvider(metric_readers=[_METRIC_READER])) + else: + raise RuntimeError("A non-test MeterProvider was already installed; in-memory reader cannot attach.") + _PROVIDERS_INSTALLED = True + return _METRIC_READER + + +@pytest.fixture +def otel_capture() -> Iterator[tuple[InMemorySpanExporter, InMemoryMetricReader]]: + reader = _install_session_providers() + reset_instrument_cache() + _SPAN_EXPORTER.clear() + yield _SPAN_EXPORTER, reader + + +def _finished_spans(exporter: InMemorySpanExporter): + return list(exporter.get_finished_spans()) + + +def _metric_names(metric_reader: InMemoryMetricReader) -> set[str]: + names: set[str] = set() + data = metric_reader.get_metrics_data() + if data is None: + return names + for resource_metrics in data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + names.add(metric.name) + return names + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_operator_span_success(otel_capture) -> None: + """Happy path: operator_span emits one span + records duration histogram + processed counter.""" + span_exporter, metric_reader = otel_capture + + with operator_span( + "FooOp", + run_mode="inprocess", + operator_class="OCRV2Actor", # implies nim.kind=ocr + operator_index=0, + batch_size=12, + ): + pass + + spans = _finished_spans(span_exporter) + assert len(spans) == 1 + span = spans[0] + assert span.name == "operator.FooOp" + assert span.attributes[A.OPERATOR_NAME] == "FooOp" + assert span.attributes[A.RUN_MODE] == "inprocess" + assert span.attributes[A.BATCH_SIZE] == 12 + # nim.kind derived from operator_class via the substring lookup — + # protects the canonical role mapping used by Zipkin filters. + assert span.attributes[A.NIM_KIND] == "ocr" + + names = _metric_names(metric_reader) + assert "nemo_retriever.operator.duration_ms" in names + assert "nemo_retriever.documents.processed" in names + + +def test_operator_span_failure(otel_capture) -> None: + """Exception path: span records the exception + ``documents.failed`` + counter fires with ``error.class`` attribute equal to the exception + type's ``__name__``.""" + span_exporter, metric_reader = otel_capture + + class _Boom(RuntimeError): + pass + + with pytest.raises(_Boom): + with operator_span("BarOp", run_mode="service", batch_size=4): + raise _Boom("boom") + + spans = _finished_spans(span_exporter) + assert len(spans) == 1 + assert any(event.name == "exception" for event in spans[0].events) + + data = metric_reader.get_metrics_data() + saw_failed_with_error_class = False + for resource_metrics in data.resource_metrics: + for scope_metrics in resource_metrics.scope_metrics: + for metric in scope_metrics.metrics: + if metric.name != "nemo_retriever.documents.failed": + continue + for point in metric.data.data_points: + if point.attributes.get(A.ERROR_CLASS) == "_Boom": + saw_failed_with_error_class = True + assert saw_failed_with_error_class + + +def test_run_pipeline_batch_inherits_per_page_context(otel_capture, monkeypatch) -> None: + """Service-mode worker batch span must parent under the inbound HTTP + span via per-page ``trace_context`` carriers captured at submit time. + Without this the trace tree splits across process boundaries. + """ + span_exporter, _ = otel_capture + + from nemo_retriever.service.processing import pool as pool_mod + + class _FakeRepo: + def update_document_status(self, *_a, **_kw): + return None + + def get_document(self, *_a, **_kw): + return None + + def get_job(self, *_a, **_kw): + return None + + monkeypatch.setattr(pool_mod, "DatabaseEngine", lambda *_a, **_kw: object()) + monkeypatch.setattr(pool_mod, "Repository", lambda _e: _FakeRepo()) + monkeypatch.setattr(pool_mod, "_worker_chain", []) + + with pipeline_span("service") as parent: + parent_trace_id = parent.get_span_context().trace_id + carrier = inject_current_context() + + descriptors = [ + { + "document_id": f"doc-{i}", + "content_sha256": "h", + "file_bytes": b"x", + "filename": "x.pdf", + "job_id": "job-1", + "page_number": i, + "spool_path": None, + "trace_context": carrier, + } + for i in range(3) + ] + pool_mod._run_pipeline_batch(descriptors, db_path="/tmp/unused.db") + + worker_spans = [s for s in _finished_spans(span_exporter) if s.name == "pool.run_pipeline_batch"] + assert len(worker_spans) == 1 + span = worker_spans[0] + assert span.context.trace_id == parent_trace_id + # Uniform batch → singular correlation attrs populated for Zipkin filters. + assert span.attributes[A.JOB_ID] == "job-1" + assert span.attributes[A.BATCH_DOCUMENT_COUNT] == 3 + + +def test_ray_operator_span_wrapper_links_to_driver(otel_capture) -> None: + """Ray-actor-side wrapper must produce an ``operator.`` span + parented to the driver's ``pipeline_span`` via the W3C carrier — the + whole point of the Ray batch instrumentation.""" + span_exporter, _ = otel_capture + + from nemo_retriever.observability import RayOperatorSpanWrapper + + class _AddOneOp(AbstractOperator): + def preprocess(self, d, **kw): + return d + + def process(self, d, **kw): + d = d.copy() + d["v"] = d["v"] + 1 + return d + + def postprocess(self, d, **kw): + return d + + with pipeline_span("batch") as parent: + parent_trace_id = parent.get_span_context().trace_id + carrier = inject_current_context() + + wrapper = RayOperatorSpanWrapper( + _otel_operator_class=_AddOneOp, + _otel_node_name="AddOne", + _otel_node_index=2, + _otel_parent_carrier=carrier, + ) + result = wrapper(pd.DataFrame({"v": [10, 20, 30]})) + assert list(result["v"]) == [11, 21, 31] + + op_spans = [s for s in _finished_spans(span_exporter) if s.name == "operator.AddOne"] + assert len(op_spans) == 1 + span = op_spans[0] + assert span.context.trace_id == parent_trace_id + assert span.attributes[A.RUN_MODE] == "batch" + assert span.attributes[A.BATCH_SIZE] == 3 diff --git a/nemo_retriever/uv.lock b/nemo_retriever/uv.lock index 3b646caf7e..a1d05f8db4 100644 --- a/nemo_retriever/uv.lock +++ b/nemo_retriever/uv.lock @@ -268,6 +268,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/9f/64/2e54428beba8d9992aa478bb8f6de9e4ecaa5f8f513bcfd567ed7fb0262d/apscheduler-3.11.2-py3-none-any.whl", hash = "sha256:ce005177f741409db4e4dd40a7431b76feb856b9dd69d57e0da49d6715bfd26d", size = 64439, upload-time = "2025-12-22T00:39:33.303Z" }, ] +[[package]] +name = "asgiref" +version = "3.11.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/63/40/f03da1264ae8f7cfdbf9146542e5e7e8100a4c66ab48e791df9a03d3f6c0/asgiref-3.11.1.tar.gz", hash = "sha256:5f184dc43b7e763efe848065441eac62229c9f7b0475f41f80e207a114eda4ce", size = 38550, upload-time = "2026-02-03T13:30:14.33Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5c/0a/a72d10ed65068e115044937873362e6e32fab1b7dce0046aeb224682c989/asgiref-3.11.1-py3-none-any.whl", hash = "sha256:e8667a091e69529631969fd45dc268fa79b99c92c5fcdda727757e52146ec133", size = 24345, upload-time = "2026-02-03T13:30:13.039Z" }, +] + [[package]] name = "astor" version = "0.8.1" @@ -2366,6 +2375,7 @@ dependencies = [ { name = "markitdown" }, { name = "nltk" }, { name = "numpy" }, + { name = "opentelemetry-api" }, { name = "pandas" }, { name = "pillow" }, { name = "prometheus-fastapi-instrumentator" }, @@ -2410,6 +2420,14 @@ all = [ { name = "neo4j" }, { name = "nvidia-ml-py" }, { name = "open-clip-torch" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp" }, + { name = "opentelemetry-instrumentation-fastapi" }, + { name = "opentelemetry-instrumentation-httpx" }, + { name = "opentelemetry-instrumentation-logging" }, + { name = "opentelemetry-instrumentation-requests" }, + { name = "opentelemetry-instrumentation-sqlite3" }, + { name = "opentelemetry-sdk" }, { name = "psutil" }, { name = "scikit-learn" }, { name = "scipy" }, @@ -2472,6 +2490,16 @@ multimedia = [ { name = "scipy" }, { name = "soundfile" }, ] +otel = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp" }, + { name = "opentelemetry-instrumentation-fastapi" }, + { name = "opentelemetry-instrumentation-httpx" }, + { name = "opentelemetry-instrumentation-logging" }, + { name = "opentelemetry-instrumentation-requests" }, + { name = "opentelemetry-instrumentation-sqlite3" }, + { name = "opentelemetry-sdk" }, +] tabular = [ { name = "duckdb" }, { name = "duckdb-engine" }, @@ -2509,7 +2537,7 @@ requires-dist = [ { name = "librosa", marker = "extra == 'multimedia'", specifier = ">=0.10.2" }, { name = "litellm", marker = "extra == 'llm'", specifier = ">=1.40.0" }, { name = "markitdown" }, - { name = "nemo-retriever", extras = ["benchmarks", "llm", "local", "multimedia", "tabular"], marker = "extra == 'all'" }, + { name = "nemo-retriever", extras = ["benchmarks", "llm", "local", "multimedia", "otel", "tabular"], marker = "extra == 'all'" }, { name = "nemotron-graphic-elements-v1", marker = "extra == 'local'", specifier = ">=0.dev0", index = "https://test.pypi.org/simple/" }, { name = "nemotron-ocr", marker = "(platform_machine == 'aarch64' and sys_platform == 'linux' and extra == 'local') or (platform_machine == 'x86_64' and sys_platform == 'linux' and extra == 'local')", specifier = ">=2.0.0.dev0,<2.0.0a0", index = "https://test.pypi.org/simple/" }, { name = "nemotron-page-elements-v3", marker = "extra == 'local'", specifier = ">=0.dev0", index = "https://test.pypi.org/simple/" }, @@ -2519,6 +2547,15 @@ requires-dist = [ { name = "numpy", specifier = ">=1.26.0" }, { name = "nvidia-ml-py", marker = "extra == 'local'" }, { name = "open-clip-torch", marker = "extra == 'benchmarks'", specifier = "==3.2.0" }, + { name = "opentelemetry-api", specifier = ">=1.40.0" }, + { name = "opentelemetry-api", marker = "extra == 'otel'", specifier = ">=1.40.0" }, + { name = "opentelemetry-exporter-otlp", marker = "extra == 'otel'", specifier = ">=1.40.0" }, + { name = "opentelemetry-instrumentation-fastapi", marker = "extra == 'otel'", specifier = ">=0.55b0" }, + { name = "opentelemetry-instrumentation-httpx", marker = "extra == 'otel'", specifier = ">=0.55b0" }, + { name = "opentelemetry-instrumentation-logging", marker = "extra == 'otel'", specifier = ">=0.55b0" }, + { name = "opentelemetry-instrumentation-requests", marker = "extra == 'otel'", specifier = ">=0.55b0" }, + { name = "opentelemetry-instrumentation-sqlite3", marker = "extra == 'otel'", specifier = ">=0.55b0" }, + { name = "opentelemetry-sdk", marker = "extra == 'otel'", specifier = ">=1.40.0" }, { name = "pandas", specifier = ">=2.0,<3" }, { name = "pillow", specifier = "==12.2.0" }, { name = "prometheus-fastapi-instrumentator", specifier = ">=7.0,<8" }, @@ -2553,7 +2590,7 @@ requires-dist = [ { name = "vllm", marker = "platform_machine != 'aarch64' and platform_machine != 'x86_64' and sys_platform == 'linux' and extra == 'local'", specifier = "==0.17.0" }, { name = "vllm", marker = "platform_machine == 'x86_64' and sys_platform == 'linux' and extra == 'local'", url = "https://github.com/vllm-project/vllm/releases/download/v0.17.0/vllm-0.17.0+cu130-cp38-abi3-manylinux_2_35_x86_64.whl" }, ] -provides-extras = ["local", "multimedia", "tabular", "benchmarks", "llm", "dev", "all"] +provides-extras = ["local", "multimedia", "tabular", "benchmarks", "llm", "otel", "dev", "all"] [[package]] name = "nemotron-graphic-elements-v1" @@ -3139,8 +3176,8 @@ name = "opentelemetry-exporter-otlp" version = "1.41.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "opentelemetry-exporter-otlp-proto-grpc", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-exporter-otlp-proto-http", marker = "sys_platform == 'linux'" }, + { name = "opentelemetry-exporter-otlp-proto-grpc" }, + { name = "opentelemetry-exporter-otlp-proto-http" }, ] sdist = { url = "https://files.pythonhosted.org/packages/42/84/d55baf8e1a222f40282956083e67de9fa92d5fa451108df4839505fa2a24/opentelemetry_exporter_otlp-1.41.1.tar.gz", hash = "sha256:299a2f0541ca175df186f5ac58fd5db177ba1e9b72b0826049062f750d55b47f", size = 6152, upload-time = "2026-04-24T13:15:40.006Z" } wheels = [ @@ -3152,7 +3189,7 @@ name = "opentelemetry-exporter-otlp-proto-common" version = "1.41.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "opentelemetry-proto", marker = "sys_platform == 'linux'" }, + { name = "opentelemetry-proto" }, ] sdist = { url = "https://files.pythonhosted.org/packages/ae/fa/f9e3bd3c4d692b3ce9a2880a167d1f79681a1bea11f00d5bf76adc03e6ea/opentelemetry_exporter_otlp_proto_common-1.41.1.tar.gz", hash = "sha256:0e253156ea9c36b0bd3d2440c5c9ba7dd1f3fb64ba7a08fc85fbac536b56e1fb", size = 20409, upload-time = "2026-04-24T13:15:40.924Z" } wheels = [ @@ -3164,13 +3201,13 @@ name = "opentelemetry-exporter-otlp-proto-grpc" version = "1.41.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "googleapis-common-protos", marker = "sys_platform == 'linux'" }, - { name = "grpcio", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-api", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-exporter-otlp-proto-common", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-proto", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-sdk", marker = "sys_platform == 'linux'" }, - { name = "typing-extensions", marker = "sys_platform == 'linux'" }, + { name = "googleapis-common-protos" }, + { name = "grpcio" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "typing-extensions" }, ] sdist = { url = "https://files.pythonhosted.org/packages/1e/9b/e4503060b8695579dbaad187dc8cef4554188de68748c88060599b77489e/opentelemetry_exporter_otlp_proto_grpc-1.41.1.tar.gz", hash = "sha256:b05df8fa1333dc9a3fda36b676b96b5095ab6016d3f0c3296d430d629ba1443b", size = 25755, upload-time = "2026-04-24T13:15:41.93Z" } wheels = [ @@ -3182,13 +3219,13 @@ name = "opentelemetry-exporter-otlp-proto-http" version = "1.41.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "googleapis-common-protos", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-api", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-exporter-otlp-proto-common", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-proto", marker = "sys_platform == 'linux'" }, - { name = "opentelemetry-sdk", marker = "sys_platform == 'linux'" }, - { name = "requests", marker = "sys_platform == 'linux'" }, - { name = "typing-extensions", marker = "sys_platform == 'linux'" }, + { name = "googleapis-common-protos" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-exporter-otlp-proto-common" }, + { name = "opentelemetry-proto" }, + { name = "opentelemetry-sdk" }, + { name = "requests" }, + { name = "typing-extensions" }, ] sdist = { url = "https://files.pythonhosted.org/packages/33/5b/9d3c7f70cca10136ba82a81e738dee626c8e7fc61c6887ea9a58bf34c606/opentelemetry_exporter_otlp_proto_http-1.41.1.tar.gz", hash = "sha256:4747a9604c8550ab38c6fd6180e2fcb80de3267060bef2c306bad3cb443302bc", size = 24139, upload-time = "2026-04-24T13:15:42.977Z" } wheels = [ @@ -3209,6 +3246,126 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c8/d2/ee4002b88e20c59fae52bed008a63c6f7eff7d498f302032f6b0434a6de7/opentelemetry_exporter_prometheus-0.62b1-py3-none-any.whl", hash = "sha256:7a0b8a6402e107e1f93e38f074a668797e1103936b189561959531a67ffeba55", size = 13278, upload-time = "2026-04-24T13:15:22.485Z" }, ] +[[package]] +name = "opentelemetry-instrumentation" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "packaging" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/52/cb/0523b92c112a6cc70be43724343dc45225d3af134419844d7879a07755d4/opentelemetry_instrumentation-0.62b1.tar.gz", hash = "sha256:90e92a905ba4f84db06ac3aec96701df6c079b2d66e9379f8739f0a1bdcc7f45", size = 34043, upload-time = "2026-04-24T13:22:31.997Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4d/0f/45adbaea1f81b847cffdcee4f4b5f89297e42facf7fac78c7aaac4c38e75/opentelemetry_instrumentation-0.62b1-py3-none-any.whl", hash = "sha256:976fc6e640f2006599e97429c949e622c108d0c17c2059347d1e6c93c707f257", size = 34163, upload-time = "2026-04-24T13:21:31.722Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-asgi" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "asgiref" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/54/43/b2f0703ff46718ff7b17d7fbf8e9d7f20e26a23c7c325092dd762d09cf9d/opentelemetry_instrumentation_asgi-0.62b1.tar.gz", hash = "sha256:7cf5f5d5c493bbb1edd2bd6d51fa879d964e94048904017258a32ffa47329310", size = 26781, upload-time = "2026-04-24T13:22:37.158Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/41/968c1fe12fb90abffca6620e65d4af91451c02ecca8f74a17a62cac490de/opentelemetry_instrumentation_asgi-0.62b1-py3-none-any.whl", hash = "sha256:b7f89be48528512619bd54fa2459f72afb1695ba71d7024d382ad96d467e7fa8", size = 17011, upload-time = "2026-04-24T13:21:38.006Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-dbapi" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1b/f4/8a8ad1d9477dc8e4f5d28648ad51cb9f110a40a25744b2f16f1c5e2f524a/opentelemetry_instrumentation_dbapi-0.62b1.tar.gz", hash = "sha256:82755a0f013980b2c000a10452fb91ab5135b937a8afe37c45f34e62ecc7a2b8", size = 16920, upload-time = "2026-04-24T13:22:47.762Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a5/5c/8f2bb672f167746c32c820170cb6b177243a45736f97ecc8b479dcddaece/opentelemetry_instrumentation_dbapi-0.62b1-py3-none-any.whl", hash = "sha256:dd2cc4085e832a708835218833b839a88ef0ace66851d14debefe6b2ef363a3c", size = 14204, upload-time = "2026-04-24T13:21:50.204Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-fastapi" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-instrumentation-asgi" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/77/38/91780475a25370b6d483afbaed3e1e170459d6351c5f7c08d66b65e2172e/opentelemetry_instrumentation_fastapi-0.62b1.tar.gz", hash = "sha256:b377d4ba32868fb1ff0f64da3fcdd3aa154d698fc83d65f5d380ea21bf31ee19", size = 25054, upload-time = "2026-04-24T13:22:50.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/8c/6f/602e4081d3fe82731aff7e3e9c2f1662d85701841d6dc25f16a1874e11cd/opentelemetry_instrumentation_fastapi-0.62b1-py3-none-any.whl", hash = "sha256:93fa9cc4f315819aee5f4fceb6196c1e5b0fbd789c5520c631de228bd3e5285b", size = 13484, upload-time = "2026-04-24T13:21:54.538Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-httpx" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/33/cb/7a418e69c7dad281803529cb4f6de1b747d802cca44c38032668690b4836/opentelemetry_instrumentation_httpx-0.62b1.tar.gz", hash = "sha256:a1fac9bcc3a6ef5996a7990563f1af0798468b2c146de535fd598369383fba7e", size = 24181, upload-time = "2026-04-24T13:22:52.124Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c7/e0/eca824e9492ccec00e055bdd243aeda8eb7c5eda746d98af4d7a2d97ecf3/opentelemetry_instrumentation_httpx-0.62b1-py3-none-any.whl", hash = "sha256:88614015df451d61bc7e73f22524e6f223611f80b6caad2f6bdcbe05fa0df653", size = 17201, upload-time = "2026-04-24T13:21:58.072Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-logging" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/3b/25/a30e0160cb3654bb63936be16d8ffe5f4a658d10bec0d5509cca3c74f103/opentelemetry_instrumentation_logging-0.62b1.tar.gz", hash = "sha256:997359d29ce06cb3768677387469431d0484b2450b5c35d7f02361431d3de338", size = 18969, upload-time = "2026-04-24T13:22:54.275Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/47/e4/216d1c7ff9c10815a8587ecbca0b570596921f001d1e2c2903c6f19e2e90/opentelemetry_instrumentation_logging-0.62b1-py3-none-any.whl", hash = "sha256:969330216d1ae02f4e10f1a030566ae758114caead020817192e6a02c6d1a0e1", size = 17488, upload-time = "2026-04-24T13:22:00.726Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-requests" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-http" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/85/03/eb26e1c65fd776206b759955d33151ee39ee0e7ac8dd80c97385c321a8d0/opentelemetry_instrumentation_requests-0.62b1.tar.gz", hash = "sha256:67a79c4b67e2192445c1cf03d62126fa623065688d8bd1a9f87f858b0e5f0286", size = 18401, upload-time = "2026-04-24T13:23:02.291Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d8/5a/cddb1f93bd17d6cfc99038a0aaa9af3d6bf455dedfe24d0ba4e13c1d83f7/opentelemetry_instrumentation_requests-0.62b1-py3-none-any.whl", hash = "sha256:ca348f2f51b715c21e86d82106d784f7069ae849c3e636ab37e34dc0ba510b8c", size = 14208, upload-time = "2026-04-24T13:22:13.89Z" }, +] + +[[package]] +name = "opentelemetry-instrumentation-sqlite3" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-instrumentation" }, + { name = "opentelemetry-instrumentation-dbapi" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/22/17/3294134715ebc458fd8668b41c0a93e1cec025c41bd2642fa976af763f39/opentelemetry_instrumentation_sqlite3-0.62b1.tar.gz", hash = "sha256:14ba18aed35a01e1d89bb814e49f7d6dd90bb68b2034ccf047801ca4659dba27", size = 7961, upload-time = "2026-04-24T13:23:03.793Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0e/02/aa91082f2074e341cbf880500ee086bab9046c58c5061413e232ce035064/opentelemetry_instrumentation_sqlite3-0.62b1-py3-none-any.whl", hash = "sha256:3ab0d614bab9956126ce81a22315bf58d56d653a65675a66e8b525f880ec3016", size = 9342, upload-time = "2026-04-24T13:22:15.961Z" }, +] + [[package]] name = "opentelemetry-proto" version = "1.41.1" @@ -3261,6 +3418,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/55/22/41fb05f1dc5fda2c468e05a41814c20859016c85117b66c8a257cae814f6/opentelemetry_semantic_conventions_ai-0.5.1-py3-none-any.whl", hash = "sha256:25aeb22bd261543b4898a73824026d96770e5351209c7d07a0b1314762b1f6e4", size = 11250, upload-time = "2026-03-26T14:20:37.108Z" }, ] +[[package]] +name = "opentelemetry-util-http" +version = "0.62b1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/24/1b/aa71b63e18d30a8384036b9937f40f7618f8030a7aa213155fb54f6f2b47/opentelemetry_util_http-0.62b1.tar.gz", hash = "sha256:adf6facbb89aef8f8bc566e2f04624942ba08a7b678b3479a91051a8f4dc70a3", size = 11393, upload-time = "2026-04-24T13:23:12.994Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/5d/85/a9d9d32161c1ced61346267db4c9702da54f81ec5dc88214bc65c23f4e9d/opentelemetry_util_http-0.62b1-py3-none-any.whl", hash = "sha256:c57e8a6c19fc422c288e6074e882f506f85030b69b7376182f74f9257b9261f0", size = 9295, upload-time = "2026-04-24T13:22:28.078Z" }, +] + [[package]] name = "orjson" version = "3.11.9"