Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion nemo_retriever/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ dependencies = [
"langchain-nvidia-ai-endpoints>=0.3.0",
# Default VDB solution
"lancedb",
# OpenTelemetry — API only, ~70 KB pure-Python. When no provider is
# registered every instrumentation point is a near-zero-cost no-op.
# The SDK + exporters live behind the ``[otel]`` extra below.
"opentelemetry-api>=1.40.0",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -142,14 +146,30 @@ benchmarks = [
llm = [
"litellm>=1.40.0",
]

# ── OpenTelemetry SDK + exporters + auto-instrumentations ───────────────────
# Install this when you want NeMo Retriever to *emit* traces and metrics
# rather than just defer to a host application's provider. Service mode
# pulls these in by default; library users only need them for one-line
# observability bootstrapping (``OTELConfig`` + ``configure``).
otel = [
"opentelemetry-api>=1.40.0",
"opentelemetry-sdk>=1.40.0",
"opentelemetry-exporter-otlp>=1.40.0",
"opentelemetry-instrumentation-fastapi>=0.55b0",
"opentelemetry-instrumentation-httpx>=0.55b0",
"opentelemetry-instrumentation-requests>=0.55b0",
"opentelemetry-instrumentation-sqlite3>=0.55b0",
"opentelemetry-instrumentation-logging>=0.55b0",
]
dev = [
"build>=1.2.2",
"pytest>=8.0.2",
]

# ── Convenience: full install ─────────────────────────────────────────────────
all = [
"nemo_retriever[local,multimedia,tabular,benchmarks,llm]",
"nemo_retriever[local,multimedia,tabular,benchmarks,llm,otel]",
]

[project.scripts]
Expand Down
48 changes: 37 additions & 11 deletions nemo_retriever/src/nemo_retriever/graph/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
from nemo_retriever.graph.gpu_operator import GPUOperator
from nemo_retriever.graph.pipeline_graph import Graph, Node
from nemo_retriever.graph.operator_resolution import resolve_graph
from nemo_retriever.observability.propagate import inject_current_context
from nemo_retriever.observability.ray_integration import RayOperatorSpanWrapper, collect_otel_env
from nemo_retriever.observability.spans import operator_span
from nemo_retriever.utils.hf_cache import collect_hf_runtime_env
from nemo_retriever.utils.input_files import (
_is_explicit_glob_path,
Expand Down Expand Up @@ -131,12 +134,26 @@ def ingest(self, data: Any, **kwargs: Any) -> Any:

if self._show_progress and tqdm is not None:
pbar = tqdm(operators, desc="Pipeline stages", unit="stage")
for name, op in pbar:
for index, (name, op) in enumerate(pbar):
pbar.set_postfix_str(name)
df = op.run(df)
with operator_span(
name,
run_mode="inprocess",
operator_class=type(op).__name__,
operator_index=index,
batch_size=len(df) if df is not None else None,
):
df = op.run(df)
else:
for _name, op in operators:
df = op.run(df)
for index, (name, op) in enumerate(operators):
with operator_span(
name,
run_mode="inprocess",
operator_class=type(op).__name__,
operator_index=index,
batch_size=len(df) if df is not None else None,
):
df = op.run(df)

return df

Expand Down Expand Up @@ -249,6 +266,7 @@ def ingest(self, data: Any, **kwargs: Any) -> Any:
}
ray_env_vars.update(collect_hf_runtime_env())
ray_env_vars.update(collect_remote_auth_runtime_env())
ray_env_vars.update(collect_otel_env())
os.environ["HF_HUB_OFFLINE"] = ray_env_vars["HF_HUB_OFFLINE"]
runtime_env = {"env_vars": ray_env_vars}
ray.init(
Expand All @@ -272,8 +290,13 @@ def ingest(self, data: Any, **kwargs: Any) -> Any:
ds = rd.read_binary_files(input_paths, include_paths=True)
except FileNotFoundError as exc:
raise_input_path_not_found(input_paths or [], exc)

# Snapshot the driver's current context (the active pipeline_span) into a
# carrier so that operator spans created inside Ray actors attach as
# children rather than becoming orphan roots.
otel_parent_carrier = inject_current_context()

nodes = self._linearize(resolved_graph)
for node in nodes:
for node_index, node in enumerate(nodes):
overrides = dict(self._node_overrides.get(node.name, {}))
target_num_rows_per_block = overrides.pop("target_num_rows_per_block", None)
batch_size = overrides.pop("batch_size", self._default_batch_size)
Expand Down Expand Up @@ -366,17 +389,20 @@ def ingest(self, data: Any, **kwargs: Any) -> Any:
elif target_num_rows_per_block is not None and int(target_num_rows_per_block) > 0:
ds = ds.repartition(target_num_rows_per_block=int(target_num_rows_per_block))

# Pass the operator class directly to map_batches with
# fn_constructor_kwargs for deferred construction on workers.
# AbstractOperator.__call__ delegates to run(), so each stage
# executes the full preprocess -> process -> postprocess chain.
wrapped_kwargs = {
"_otel_operator_class": node.operator_class,
"_otel_node_name": node.name,
"_otel_node_index": node_index,
"_otel_parent_carrier": otel_parent_carrier or None,
"_otel_operator_kwargs": dict(node.operator_kwargs or {}),
}
ds = ds.map_batches(
node.operator_class,
RayOperatorSpanWrapper,
batch_size=batch_size,
batch_format=batch_format,
num_cpus=num_cpus,
num_gpus=num_gpus,
fn_constructor_kwargs=node.operator_kwargs,
fn_constructor_kwargs=wrapped_kwargs,
**overrides,
)

Expand Down
30 changes: 30 additions & 0 deletions nemo_retriever/src/nemo_retriever/graph_ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from nemo_retriever.graph import InprocessExecutor, RayDataExecutor
from nemo_retriever.graph.ingestor_runtime import batch_tuning_to_node_overrides, build_graph
from nemo_retriever.ingestor import ingestor
from nemo_retriever.observability import OTELConfig
from nemo_retriever.observability.configure import apply_otel_env_defaults, configure as _configure_otel
from nemo_retriever.observability.spans import pipeline_span
from nemo_retriever.params import (
ASRParams,
AudioChunkParams,
Expand Down Expand Up @@ -177,6 +180,11 @@ class GraphIngestor(ingestor):
``"raise"`` raises when explicitly configured remote NIM stages report
row-level errors. ``"collect"`` returns partial results with the stage
error payloads preserved.
otel
Optional :class:`~nemo_retriever.observability.OTELConfig` for
one-shot SDK setup. ``None`` (default) means defer to whatever
provider the host application has already registered globally —
no spans are emitted unless the host has wired up OTEL.
"""

RUN_MODE = "graph"
Expand All @@ -196,6 +204,7 @@ def __init__(
node_overrides: Optional[Dict[str, Dict[str, Any]]] = None,
show_progress: bool = True,
error_policy: str = "raise",
otel: Optional[OTELConfig] = None,
) -> None:
super().__init__(documents=documents)
if run_mode not in {"batch", "inprocess"}:
Expand All @@ -214,6 +223,18 @@ def __init__(
self._show_progress = show_progress
self._error_policy = error_policy
self._rd_dataset: Any = None
# Seed env defaults BEFORE _configure_otel (the requests instrumentor
# reads OTEL_PYTHON_REQUESTS_EXCLUDED_URLS at instrument-time) and
# BEFORE the executor calls collect_otel_env (so Ray actors inherit
# the same service name + exclude list via runtime_env).
apply_otel_env_defaults(default_service_name=f"nemo-retriever-{run_mode}")
if otel is not None:
# _configure_otel returns a shutdown callback; register it with
# atexit so library-mode callers flush the BatchSpanProcessor
# queue without having to call the callback manually. Idempotent
# on the SDK side.
import atexit

atexit.register(_configure_otel(otel))

# Pipeline configuration accumulated by fluent methods
self._extraction_mode: str = "pdf"
Expand Down Expand Up @@ -436,6 +457,15 @@ def ingest(self, params: Any = None, **kwargs: Any) -> Any:

post_extract_order = tuple(s for s in self._stage_order if s != "extract")

with pipeline_span(
self._run_mode,
extraction_mode=self._extraction_mode,
stages=post_extract_order,
):
return self._run_pipeline(post_extract_order)

def _run_pipeline(self, post_extract_order: tuple[str, ...]) -> Any:
"""Build the executor and run the pipeline."""
if self._run_mode == "batch":
import ray

Expand Down
9 changes: 9 additions & 0 deletions nemo_retriever/src/nemo_retriever/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple, Union

from nemo_retriever.observability import OTELConfig
from nemo_retriever.params import CaptionParams
from nemo_retriever.params import DedupParams
from nemo_retriever.params import EmbedParams
Expand All @@ -44,10 +45,17 @@ def create_ingestor(
*,
run_mode: RunMode = "inprocess",
params: IngestorCreateParams | None = None,
otel: OTELConfig | None = None,
**kwargs: Any,
) -> "Ingestor":
"""
Graph-only ingestion factory.

Pass ``otel=OTELConfig(...)`` to install global OpenTelemetry providers
when the host application has not already configured them. When
``otel`` is ``None`` (default), instrumentation defers to whatever
provider is already registered — meaning embedded library use inside
an OTEL-instrumented host needs no extra wiring.
"""
merged = _merge_params(params, kwargs)
if isinstance(merged, IngestorCreateParams):
Expand Down Expand Up @@ -80,6 +88,7 @@ def create_ingestor(
debug=parsed.debug,
allow_no_gpu=parsed.allow_no_gpu,
error_policy=parsed.error_policy,
otel=otel,
)


Expand Down
Loading
Loading