diff --git a/nemo_retriever/src/nemo_retriever/graph/executor.py b/nemo_retriever/src/nemo_retriever/graph/executor.py index 14a323ab08..b4639383bc 100644 --- a/nemo_retriever/src/nemo_retriever/graph/executor.py +++ b/nemo_retriever/src/nemo_retriever/graph/executor.py @@ -380,4 +380,4 @@ def ingest(self, data: Any, **kwargs: Any) -> Any: **overrides, ) - return ds.to_pandas() + return ds.materialize() diff --git a/nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py b/nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py index eaf82131ff..b84dedb0c0 100644 --- a/nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py +++ b/nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py @@ -365,6 +365,24 @@ def _force_cpu_only(node_name: str) -> None: store_override["concurrency"] = (1, store_workers, 1) if store_workers > 1 else 1 store_override["num_cpus"] = DEFAULT_STORE_CPUS_PER_ACTOR + # IngestVdbOperator opens the LanceDB table with ``mode="overwrite"`` on + # its first batch and ``table.add(...)`` on every subsequent batch. With + # concurrency > 1, multiple actors would each treat their own first batch + # as the overwrite and clobber each other's writes. + overrides.setdefault(IngestVdbOperator.__name__, {})["concurrency"] = 1 + + # Coalesce Ray Data blocks into ~1000-row LanceDB writes. Every + # ``table.add(rows)`` commits a new entry to Lance's table-version + # manifest, and Lance reads the manifest linearly on every commit *and* + # every table reopen — so commit latency grows with the count of prior + # commits. At the default ~32-row block size coming out of + # ``StreamingRepartition`` this would be ~80k commits for a bo767-sized + # corpus (~80k rows); batching to ~1000 cuts that ~30× to ~3k commits and + # keeps per-commit time roughly flat. Higher values further reduce commit + # count but trade off per-batch memory in the writer actor — callers can + # raise the value via ``node_overrides``; we only set a default here. 
+ overrides[IngestVdbOperator.__name__].setdefault("batch_size", 1000) + return overrides diff --git a/nemo_retriever/src/nemo_retriever/graph_ingestor.py b/nemo_retriever/src/nemo_retriever/graph_ingestor.py index a8127a2d38..8c323eb6dc 100644 --- a/nemo_retriever/src/nemo_retriever/graph_ingestor.py +++ b/nemo_retriever/src/nemo_retriever/graph_ingestor.py @@ -34,6 +34,7 @@ from nemo_retriever.graph import InprocessExecutor, RayDataExecutor from nemo_retriever.graph.ingestor_runtime import batch_tuning_to_node_overrides, build_graph from nemo_retriever.ingestor import ingestor +from nemo_retriever.vdb.operators import _construct_vdb from nemo_retriever.params import ( ASRParams, AudioChunkParams, @@ -557,8 +558,23 @@ def ingest(self, params: Any = None, **kwargs: Any) -> Any: result = executor.ingest(self._documents) self._raise_for_stage_errors(result) + self._finalize_vdb_upload() return result + def _finalize_vdb_upload(self) -> None: + """Build the VDB search index once after the graph has finished + writing. + + ``IngestVdbOperator`` streams per-batch writes during the run; the + index build is a one-shot operation that doesn't need any of the + graph's row data. 
+ """ + params = self._vdb_upload_params + if params is None: + return + vdb = _construct_vdb(vdb_op=params.vdb_op, vdb_kwargs=params.vdb_kwargs) + vdb.build_index() + # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ diff --git a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py index 2cbda3a2ce..114f46f766 100644 --- a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py +++ b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py @@ -73,6 +73,7 @@ VideoFrameTextDedupParams, ) from nemo_retriever.params.models import BatchTuningParams +from nemo_retriever.pipeline.ingest_result import IngestResult from nemo_retriever.utils.input_files import resolve_input_patterns from nemo_retriever.utils.remote_auth import resolve_remote_api_key @@ -577,31 +578,26 @@ def _build_ingestor( return ingestor -def _collect_results(run_mode: str, result: Any) -> tuple[list[dict[str, Any]], Any, float, int]: - """Materialize the graph result into a list of records + DataFrame. +def _collect_results(run_mode: str, result: Any) -> tuple["IngestResult", float, int]: + """Wrap the graph result into a streaming-friendly IngestResult. - Ingest may return a ``pandas.DataFrame`` (in-process or after - ``ray.data.Dataset.to_pandas()`` in the executor), a ``ray.data.Dataset``, - or a :class:`~nemo_retriever.service_ingestor.ServiceIngestResult` (service - mode); normalize to a consistent ``(records, DataFrame, secs, units)`` tuple. + Ingest may return a ``pandas.DataFrame`` (in-process), a ``ray.data.Dataset`` + (batch mode), or a ``nemo_retriever.service_ingestor.ServiceIngestResult`` + (service mode); each is mapped to the matching ``IngestResult`` constructor without + ever pulling the full corpus onto the driver as a single pandas DataFrame. - Returns ``(records, result_df, ray_download_secs, num_input_units)``. 
+ Returns ``(result_handle, ray_download_secs, num_input_units)``. """ if run_mode == "service": - records = list(result) - result_df = pd.DataFrame(records) if records else pd.DataFrame() - num_units = getattr(result, "total_pages", 0) or len(records) - return records, result_df, 0.0, num_units + handle = IngestResult.from_service(result) + return handle, 0.0, handle.unique_source_count() if isinstance(result, pd.DataFrame): - result_df = result + handle = IngestResult.from_dataframe(result) else: - result_df = result.to_pandas() - records = result_df.to_dict("records") - ray_download_time = 0.0 - - return records, result_df, float(ray_download_time), _count_input_units(result_df) + handle = IngestResult.from_dataset(result) + return handle, 0.0, handle.unique_source_count() def _count_uploadable_vdb_records(records: list[dict[str, Any]]) -> int: @@ -1411,7 +1407,8 @@ def run( ingest_start = time.perf_counter() raw_result = ingestor.ingest() ingestion_only_total_time = time.perf_counter() - ingest_start - ingest_local_results, result_df, ray_download_time, num_rows = _collect_results(run_mode, raw_result) + ingest_result, ray_download_time, num_rows = _collect_results(run_mode, raw_result) + total_row_count = ingest_result.row_count() if run_mode == "service": # The service writes embeddings to LanceDB server-side during @@ -1421,14 +1418,14 @@ def run( logger.info( "Service-mode ingestion complete (%d results from %d input(s), %.1fs). 
" "VDB writes are handled server-side.", - len(ingest_local_results), + total_row_count, num_rows, ingestion_only_total_time, ) - uploadable_vdb_records = len(ingest_local_results) + uploadable_vdb_records = total_row_count vdb_upload_time = 0.0 else: - uploadable_vdb_records = _count_uploadable_vdb_records(ingest_local_results) + uploadable_vdb_records = ingest_result.count_uploadable_vdb_records() vdb_upload_time = 0.0 if uploadable_vdb_records == 0: logger.warning( @@ -1440,26 +1437,26 @@ def run( "Prepared %s uploadable VDB records (%s graph rows) for in-graph upload to %s " "(row conversion count, not backend-confirmed writes; see VDB/operator logs for persistence).", uploadable_vdb_records, - len(ingest_local_results), + total_row_count, resolved_vdb_op, ) if save_intermediate is not None: out_dir = Path(save_intermediate).expanduser().resolve() out_dir.mkdir(parents=True, exist_ok=True) + # Streaming write: Ray Data produces a directory of per-block + # parquet files (one file in inprocess/service mode). Existing + # readers in nemo_retriever already handle the directory form. 
out_path = out_dir / "extraction.parquet" - result_df.to_parquet(out_path, index=False) + ingest_result.write_parquet_dir(out_path) logger.info("Wrote extraction Parquet for intermediate use: %s", out_path) if detection_summary_file is not None: - from nemo_retriever.utils.detection_summary import ( - collect_detection_summary_from_df, - write_detection_summary, - ) + from nemo_retriever.utils.detection_summary import write_detection_summary write_detection_summary( Path(detection_summary_file), - collect_detection_summary_from_df(result_df), + ingest_result.detection_summary(), ) if uploadable_vdb_records == 0 and run_mode != "service": @@ -1494,7 +1491,7 @@ def run( "input_path": str(Path(input_path).resolve()), "input_pages": int(num_rows), "num_pages": int(num_rows), - "num_rows": int(len(result_df.index)), + "num_rows": int(total_row_count), "ingestion_only_secs": float(ingestion_only_total_time), "ray_download_secs": float(ray_download_time), "vdb_upload_secs": float(vdb_upload_time), @@ -1572,7 +1569,7 @@ def run( "input_path": str(Path(input_path).resolve()), "input_pages": int(num_rows), "num_pages": int(num_rows), - "num_rows": int(len(result_df.index)), + "num_rows": int(total_row_count), "ingestion_only_secs": float(ingestion_only_total_time), "ray_download_secs": float(ray_download_time), "vdb_upload_secs": float(vdb_upload_time), @@ -1600,7 +1597,7 @@ def run( "input_path": str(Path(input_path).resolve()), "input_pages": int(num_rows), "num_pages": int(num_rows), - "num_rows": int(len(result_df.index)), + "num_rows": int(total_row_count), "ingestion_only_secs": float(ingestion_only_total_time), "ray_download_secs": float(ray_download_time), "vdb_upload_secs": float(vdb_upload_time), diff --git a/nemo_retriever/src/nemo_retriever/pipeline/ingest_result.py b/nemo_retriever/src/nemo_retriever/pipeline/ingest_result.py new file mode 100644 index 0000000000..f7826b779f --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/pipeline/ingest_result.py @@ -0,0 
+1,205 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Streaming-friendly accessors for graph-pipeline ingest output. + +:class:`IngestResult` wraps the ingest output and exposes only accessors +that can be served by streaming/per-block reads: + +- ``row_count()`` — uses ``Dataset.count()`` or ``len(df.index)`` +- ``unique_source_count()`` — uses ``Dataset.unique(col)`` or ``Series.nunique`` +- ``iter_records()`` — streams rows one at a time +- ``count_uploadable_vdb_records()`` — streams in batches, sums per-batch +- ``detection_summary()`` — feeds streaming rows into ``compute_detection_summary`` +- ``write_parquet_dir(out_dir)`` — Ray Data writes per-block files into the dir; + pandas-backed mode writes a single file inside the same dir for symmetry. + +Three constructors map cleanly to the three ingest run modes: + +- ``IngestResult.from_dataset(ds)`` — batch mode (Ray Dataset). +- ``IngestResult.from_dataframe(df)`` — inprocess mode (single pandas DataFrame). +- ``IngestResult.from_service(records)`` — service mode (list of records from SSE). +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable, Iterator, Optional + +import pandas as pd + +from nemo_retriever.vdb.operators import VDB_UPLOADABLE_COLUMN + +logger = logging.getLogger(__name__) + + +_VDB_RECORD_SCAN_BATCH = 1024 + + +@dataclass +class IngestResult: + """Streaming-friendly handle over an ingest result. + + Construct via the ``from_*`` classmethods rather than calling this directly. + Exactly one of ``_dataframe``, ``_dataset``, ``_service_records`` is set. 
+ """ + + _dataframe: Optional[pd.DataFrame] = None + _dataset: Optional[Any] = None + _service_records: Optional[list[dict[str, Any]]] = None + + @classmethod + def from_dataframe(cls, df: pd.DataFrame) -> "IngestResult": + return cls(_dataframe=df) + + @classmethod + def from_dataset(cls, dataset: Any) -> "IngestResult": + return cls(_dataset=dataset) + + @classmethod + def from_service(cls, records: Iterable[dict[str, Any]]) -> "IngestResult": + return cls(_service_records=list(records)) + + # ------------------------------------------------------------------ counts + + def row_count(self) -> int: + if self._dataset is not None: + return int(self._dataset.count()) + if self._dataframe is not None: + return int(len(self._dataframe.index)) + assert self._service_records is not None + return len(self._service_records) + + def unique_source_count(self) -> int: + """Number of distinct input units (used by run-summary reporting).""" + if self._service_records is not None: + # SSE results don't carry source-side columns today; the row count + # is the closest available proxy for input units. 
+ return len(self._service_records) + if self._dataset is not None: + for col in ("source_id", "source_path"): + try: + return len(self._dataset.unique(col)) + except (KeyError, ValueError): + continue + return int(self._dataset.count()) + assert self._dataframe is not None + df = self._dataframe + if "source_id" in df.columns: + return int(df["source_id"].nunique()) + if "source_path" in df.columns: + return int(df["source_path"].nunique()) + return int(len(df.index)) + + # ----------------------------------------------------------------- streams + + def iter_records(self) -> Iterator[dict[str, Any]]: + if self._dataset is not None: + yield from self._dataset.iter_rows() + return + if self._dataframe is not None: + yield from self._dataframe.to_dict("records") + return + assert self._service_records is not None + yield from self._service_records + + # ------------------------------------------------------- derived accessors + + def count_uploadable_vdb_records(self) -> int: + """Count rows that produced a client-VDB record. + + Two branches in this method: + + - **Precomputed branch** (batch-mode result): ``IngestVdbOperator`` + already projected the heavy embedding/metadata payload away and + emitted a ``_vdb_uploadable`` boolean column per row. Summing that + Series is the cheapest possible answer and keeps Ray object store + + driver memory bounded. + + - **Re-derive branch** (inprocess result or raw DataFrame without the + flag): walk the rows and re-run ``to_client_vdb_records`` to decide + uploadability per row. This is more expensive per row but only runs + in modes that already materialize the corpus on the driver, so the + cost is bounded by the on-driver row count. + + Service mode doesn't reach this method — ``pipeline/__main__.py`` uses + the SSE row count directly for the same reporting field. 
+ """ + from nemo_retriever.vdb.records import to_client_vdb_records + + total = 0 + # Iterate as pandas batches so the ``_vdb_uploadable`` column check + # is a single Series sum per batch. + if self._dataset is not None: + batches = self._dataset.iter_batches(batch_format="pandas", batch_size=_VDB_RECORD_SCAN_BATCH) + elif self._dataframe is not None: + batches = iter([self._dataframe]) + else: + assert self._service_records is not None + batches = iter([pd.DataFrame(self._service_records)]) + + for batch_df in batches: + if VDB_UPLOADABLE_COLUMN in batch_df.columns: + total += int(batch_df[VDB_UPLOADABLE_COLUMN].sum()) + else: + total += sum(len(group) for group in to_client_vdb_records(batch_df.to_dict("records"))) + return total + + def detection_summary(self) -> dict[str, Any]: + from nemo_retriever.utils.detection_summary import ( + compute_detection_summary, + iter_dataframe_rows, + ) + + # compute_detection_summary wants (page_key, meta, row_dict) tuples. + # For DataFrame backing reuse the existing helper (it handles JSON- + # string metadata); for dataset/service stream the same shape from + # individual row dicts. 
+ if self._dataframe is not None: + return compute_detection_summary(iter_dataframe_rows(self._dataframe)) + + def _streaming_rows() -> Iterator[tuple[Any, dict[str, Any], dict[str, Any]]]: + for row in self.iter_records(): + path = str(row.get("path") or row.get("source_id") or "") + try: + page_number = int(row.get("page_number", -1)) + except (TypeError, ValueError): + page_number = -1 + meta = row.get("metadata") + if isinstance(meta, str): + try: + meta = json.loads(meta) + except json.JSONDecodeError as exc: + logger.debug( + "Could not parse metadata JSON for row %r: %s; treating as empty.", + path, + exc, + ) + meta = {} + if not isinstance(meta, dict): + meta = {} + yield (path, page_number), meta, row + + return compute_detection_summary(_streaming_rows()) + + def write_parquet_dir(self, out_dir: Path | str) -> Path: + """Write the result as a directory of per-block parquet files. + + Returns the output directory. Dataset-backed results use + ``Dataset.write_parquet``; pandas-backed results write a single + ``part-00000.parquet`` file inside ``out_dir`` so the on-disk shape + is the same regardless of run mode. + """ + out_path = Path(out_dir).expanduser().resolve() + if self._dataset is not None: + self._dataset.write_parquet(str(out_path)) + return out_path + + out_path.mkdir(parents=True, exist_ok=True) + df = self._dataframe if self._dataframe is not None else pd.DataFrame(self._service_records or []) + df.to_parquet(out_path / "part-00000.parquet", index=False) + return out_path diff --git a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py index b1097117b4..3eb278bbc6 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py @@ -1,4 +1,7 @@ +from __future__ import annotations + from abc import ABC, abstractmethod +from typing import Any """Abstract Vector Database (VDB) operator API. 
@@ -236,3 +239,34 @@ def reindex(self, records: list, **kwargs): - implementation-specific result """ pass + + def append(self, records: list[list[dict[str, Any]]], *, overwrite: bool) -> bool: + """Stream-friendly write entry point used by ``IngestVdbOperator``. + + Implementations should write ``records`` to the underlying table without + building search indexes; ``build_index`` is invoked separately once all + writes are complete. When ``overwrite`` is ``True`` the implementation + must drop any existing table before writing the first batch; subsequent + calls with ``overwrite=False`` must append. + + Returns ``True`` if any rows were committed to the backing store; returns + ``False`` if the implementation early-returned (e.g. all records failed + per-row validation and there were no rows to write). The streaming + operator uses this signal to keep ``overwrite=True`` until a write + actually lands — otherwise an "empty" first batch would silently flip + the handshake to ``overwrite=False`` against a non-existent table. + """ + raise NotImplementedError( + f"{type(self).__name__} does not implement streaming append(); " + "the operator's IngestVdbOperator path requires it." + ) + + def build_index(self) -> None: + """Build vector indexes on the underlying table. + + Called by ``GraphIngestor._finalize_vdb_upload`` exactly once after the streaming + writes from ``IngestVdbOperator`` are finished. Default is a no-op for + backends that do not have an explicit "build index after data is in" + step. 
+ """ + return None diff --git a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py index 74f6b509d4..4731ca236e 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py @@ -325,21 +325,17 @@ def __init__( on_bad_vectors: str = "drop", fill_value: float = 0.0, validate_vector_length: bool = True, - build_index: bool | None = None, + create_index: bool = True, **kwargs, ): - create_index = kwargs.pop("create_index", None) - if build_index is None: - build_index = True if create_index is None else bool(create_index) - elif create_index is not None and bool(create_index) != bool(build_index): - raise ValueError("Pass only one index toggle: build_index or create_index.") - if int(vector_dim) <= 0: raise ValueError(f"vector_dim must be positive; got {vector_dim}") self.uri = uri or "lancedb" self.overwrite = bool(overwrite) self.table_name = table_name - self.build_index = bool(build_index) + # Stored under a private name so it doesn't shadow the ``build_index()`` + # streaming-finalize method on this class / the VDB base. + self._build_index_on_run = bool(create_index) self.index_type = index_type self.metric = metric self.num_partitions = num_partitions @@ -350,6 +346,11 @@ def __init__( self.on_bad_vectors = _normalize_on_bad_vectors(on_bad_vectors) self.fill_value = float(fill_value) self.validate_vector_length = bool(validate_vector_length) + # Cached lance connection + table handle reused across streaming + # ``append`` calls. The IngestVdbOperator is pinned to concurrency=1 + # in ingestor_runtime so there is exactly one writer per actor. 
+ self._cached_db: Any = None + self._cached_table: Any = None super().__init__(**kwargs) def create_index(self, records=None, table_name: str = "nv-ingest", **kwargs): @@ -514,7 +515,7 @@ def write_to_index( def run(self, records): """Orchestrate index creation and data ingestion.""" table = self.create_index(records=records, table_name=self.table_name) - if self.build_index: + if self._build_index_on_run: self.write_to_index( records, table=table, @@ -526,9 +527,127 @@ def run(self, records): fts_language=self.fts_language, ) else: - logger.info("Skipping LanceDB index creation for table %r because build_index=False.", self.table_name) + logger.info("Skipping LanceDB index creation for table %r because create_index=False.", self.table_name) return records + def _lancedb_schema(self) -> "pa.Schema": + return pa.schema( + [ + pa.field("vector", pa.list_(pa.float32(), self.vector_dim)), + pa.field("text", pa.string()), + pa.field("metadata", pa.string()), + pa.field("source", pa.string()), + ] + ) + + def append(self, records: list[list[dict[str, Any]]], *, overwrite: bool) -> bool: + """Streaming write entry point for ``IngestVdbOperator``. + + Writes ``records`` to the LanceDB table without building any search + index. ``build_index`` does that once, after all batches are in. + + Semantics: + - ``overwrite=True``: drop any existing table and create it from this + batch's rows + the LanceDB schema. + - ``overwrite=False``: open the existing table and ``table.add`` the + rows. (Callers are responsible for ensuring an overwrite call landed + first; ``IngestVdbOperator`` does this by pinning concurrency=1 and + tracking a first-batch flag.) + + Returns ``True`` when rows were committed to the table, ``False`` when + every record was filtered out by ``_create_lancedb_results`` (so no + table was created or modified). The operator uses this to keep + ``overwrite=True`` until a write actually lands. 
+ + The lance connection and table handle are cached on ``self`` so the + per-batch path is just ``table.add(rows)`` plus the underlying lance + version commit — no repeated ``lancedb.connect()`` or ``open_table()`` + (which becomes expensive once the table accumulates many versions). + """ + if self.validate_vector_length and self.on_bad_vectors != "error": + expected_dim: int | None = self.vector_dim + else: + expected_dim = None + + rows, counts = _create_lancedb_results(records, expected_dim=expected_dim) + if not rows: + return False + + if self._cached_db is None: + self._cached_db = lancedb.connect(uri=self.uri) + + if overwrite: + create_kwargs: dict[str, Any] = { + "data": rows, + "schema": self._lancedb_schema(), + "mode": "overwrite", + "on_bad_vectors": self.on_bad_vectors, + } + if self.on_bad_vectors == "fill": + create_kwargs["fill_value"] = self.fill_value + t0 = time.perf_counter() + self._cached_table = self._cached_db.create_table(self.table_name, **create_kwargs) + _record_timing( + "lancedb.create_table", + time.perf_counter() - t0, + {"rows": len(rows), **counts}, + ) + else: + if self._cached_table is None: + self._cached_table = self._cached_db.open_table(self.table_name) + # Mirror create_index()'s append-path warning so streaming and + # legacy paths give the same heads-up about non-deduplicating + # re-runs. Emit once per operator instance, the first time we + # open a pre-existing table for append. + existing_rows = int(self._cached_table.count_rows()) + if existing_rows: + logger.warning( + "Appending to existing LanceDB table %r at %s " + "(existing_rows=%d). 
Append mode does not deduplicate; " + "rerunning the same inputs will duplicate rows.", + self.table_name, + self.uri, + existing_rows, + ) + t0 = time.perf_counter() + self._cached_table.add(rows) + _record_timing( + "lancedb.append", + time.perf_counter() - t0, + {"rows": len(rows), **counts}, + ) + + return True + + def build_index(self) -> None: + """Build vector indexes on the populated table.""" + if not self._build_index_on_run: + logger.info( + "Skipping LanceDB index build for table %r because create_index=False.", + self.table_name, + ) + return + db = lancedb.connect(uri=self.uri) + try: + table = db.open_table(self.table_name) + except (FileNotFoundError, ValueError): + # Empty corpus: no batch produced rows, so no table was ever created. + logger.warning( + "LanceDB.build_index: table %r does not exist; skipping index build.", + self.table_name, + ) + return + self.write_to_index( + records=None, + table=table, + index_type=self.index_type, + metric=self.metric, + num_partitions=self.num_partitions, + num_sub_vectors=self.num_sub_vectors, + hybrid=self.hybrid, + fts_language=self.fts_language, + ) + def retrieval(self, vectors, **kwargs): """Search LanceDB with precomputed query vectors. 
diff --git a/nemo_retriever/src/nemo_retriever/vdb/operators.py b/nemo_retriever/src/nemo_retriever/vdb/operators.py index caf5c87710..301415e67e 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/operators.py +++ b/nemo_retriever/src/nemo_retriever/vdb/operators.py @@ -14,7 +14,11 @@ from nemo_retriever.vdb.factory import get_vdb_op_cls from nemo_retriever.graph.abstract_operator import AbstractOperator -from nemo_retriever.vdb.records import normalize_retrieval_results, to_client_vdb_records +from nemo_retriever.vdb.records import ( + _client_record_from_graph_row, + normalize_retrieval_results, + to_client_vdb_records, +) from nemo_retriever.vdb.sidecar_metadata import ( apply_sidecar_metadata_to_client_batches, build_sidecar_lookup, @@ -23,6 +27,32 @@ ) +#: Per-row boolean column emitted by ``IngestVdbOperator`` indicating whether +#: this row produced a client-VDB record. Read by +#: ``IngestResult.count_uploadable_vdb_records`` so the count survives the +#: projection (which drops the embedding/text that the validity check needed). +VDB_UPLOADABLE_COLUMN = "_vdb_uploadable" + + +#: Whitelist of accounting columns kept in the IngestVdbOperator output. +#: Every other column (embedding floats, nested metadata, full chunk text, +#: raw PDF bytes, page images, …) is dropped before the row travels through +#: the downstream global-batch barrier — a whitelist beats a blacklist here +#: because any future schema addition is heavy-by-default and would otherwise +#: silently re-fill plasma + driver memory at the end of the run. +#: +#: - ``source_id`` / ``source_path`` / ``path``: driver-side ``unique_source_count`` +#: and detection-summary key. +#: - ``page_number``: detection-summary key. +#: - ``VDB_UPLOADABLE_COLUMN``: per-row uploadable flag (see above). 
+_ACCOUNTING_COLUMNS = frozenset({"source_id", "source_path", "path", "page_number", VDB_UPLOADABLE_COLUMN}) + + +def _project_to_accounting_columns(df: "pd.DataFrame") -> "pd.DataFrame": + keep = [col for col in df.columns if col in _ACCOUNTING_COLUMNS] + return df[keep] if len(keep) != len(df.columns) else df + + def _construct_vdb( *, vdb: VDB | None = None, @@ -84,11 +114,17 @@ def query_vectors_from_embedded_dataframe(df: pd.DataFrame) -> list[list[float]] class IngestVdbOperator(AbstractOperator): - """Upload already-embedded graph output through an nv-ingest-client VDB.""" + """Stream already-embedded graph output into a VDB. + + Each call to `process` writes a single Ray Data block (in batch mode) or + the full DataFrame (inprocess mode) via `VDB.append`. The first call uses + ``overwrite=True`` to drop any pre-existing table; subsequent calls append. + Index construction is deferred to `GraphIngestor._finalize_vdb_upload`. + """ - #: Ray batch mode: repartition to one block and one ``map_batches`` call so - #: ``VDB.run`` sees the full dataset once (matches historical post-graph upload). - REQUIRES_GLOBAL_BATCH: bool = True + #: No global-batch repartition: peak memory is bounded by the batch size, + #: not the corpus. Requires concurrency=1 (pinned in ``ingestor_runtime``). + REQUIRES_GLOBAL_BATCH: bool = False def __init__( self, @@ -111,14 +147,34 @@ def __init__( sidecar["meta_fields"], ) self._vdb = _construct_vdb(vdb=vdb, vdb_op=vdb_op, vdb_kwargs=clean_kwargs) + self._wrote_first_batch = False def preprocess(self, data: Any, **kwargs: Any) -> Any: return data def process(self, data: Any, **kwargs: Any) -> Any: - # Compatibility shim: graph_pipeline emits flat embedded rows, while - # nv-ingest-client VDB.run still expects nested NV-Ingest records. - records = to_client_vdb_records(data) + # graph_pipeline emits flat embedded rows; nv-ingest-client VDBs expect + # the nested record shape from ``to_client_vdb_records``. 
+ if isinstance(data, pd.DataFrame): + # Compute per-row validity in lock-step with record conversion so + # we can emit a small ``_vdb_uploadable`` flag column. Without + # this, the downstream global-batch barrier accumulates the full + # embedded payload in Ray object store and starves the upstream + # pipeline. + valid_records: list[dict[str, Any]] = [] + uploadable_mask: list[bool] = [] + for row in data.to_dict("records"): + rec = _client_record_from_graph_row(row) + if rec is None: + uploadable_mask.append(False) + else: + valid_records.append(rec) + uploadable_mask.append(True) + records: list[list[dict[str, Any]]] = [valid_records] if valid_records else [] + else: + records = to_client_vdb_records(data) + uploadable_mask = [] + if self._sidecar_spec is not None and self._sidecar_lookup is not None: records = apply_sidecar_metadata_to_client_batches( records, @@ -127,7 +183,24 @@ def process(self, data: Any, **kwargs: Any) -> Any: join_key=self._sidecar_spec["meta_join_key"], ) if records and any(batch for batch in records): - self._vdb.run(records) + # ``--append`` mode (``vdb.overwrite=False``) must preserve any table + # left behind by an earlier run. Only the first batch of an + # ``overwrite=True`` run is allowed to drop the existing table; every + # other batch — and every batch in append mode — uses ``table.add``. + vdb_overwrite = bool(getattr(self._vdb, "overwrite", True)) + overwrite_this_batch = vdb_overwrite and not self._wrote_first_batch + # ``append`` returns False when the VDB filtered every record out + # (e.g. wrong embedding length) and didn't actually write. In that + # case the next batch must keep ``overwrite=True`` — otherwise we'd + # call ``table.add`` on a table that was never created. 
+ if self._vdb.append(records, overwrite=overwrite_this_batch): + self._wrote_first_batch = True + + if isinstance(data, pd.DataFrame): + # ``.assign`` returns a shallow-copied frame, so the operator's + # input ``data`` is not mutated and the projection result is a + # fresh DataFrame with the uploadable flag attached. + return _project_to_accounting_columns(data).assign(**{VDB_UPLOADABLE_COLUMN: uploadable_mask}) return data def postprocess(self, data: Any, **kwargs: Any) -> Any: diff --git a/nemo_retriever/tests/test_graph_pipeline_cli.py b/nemo_retriever/tests/test_graph_pipeline_cli.py index 42ca2633fd..6fe1d1e1a4 100644 --- a/nemo_retriever/tests/test_graph_pipeline_cli.py +++ b/nemo_retriever/tests/test_graph_pipeline_cli.py @@ -20,6 +20,9 @@ class _FakeDataset: + """Empty-corpus Ray Dataset stand-in supporting the streaming surface + used by :class:`IngestResult`.""" + def materialize(self): return self @@ -29,6 +32,23 @@ def take_all(self): def to_pandas(self): return pd.DataFrame() + def count(self) -> int: + return 0 + + def unique(self, column: str): + raise KeyError(column) + + def iter_rows(self): + return iter(()) + + def iter_batches(self, *, batch_format="pandas", batch_size=None): + return iter(()) + + def write_parquet(self, path: str) -> None: + from pathlib import Path as _P + + _P(path).mkdir(parents=True, exist_ok=True) + def groupby(self, _key): class _FakeGrouped: @staticmethod @@ -311,7 +331,10 @@ def test_graph_pipeline_cli_routes_beir_mode_to_evaluator(tmp_path, monkeypatch) fake_ingestor = _FakeIngestor() monkeypatch.setattr(pipeline_main, "GraphIngestor", lambda *args, **kwargs: fake_ingestor) - monkeypatch.setattr(pipeline_main, "_count_uploadable_vdb_records", lambda _records: 1) + monkeypatch.setattr( + "nemo_retriever.pipeline.ingest_result.IngestResult.count_uploadable_vdb_records", + lambda self: 1, + ) monkeypatch.setattr(detection_summary_module, "print_run_summary", lambda *args, **kwargs: None) class _FakeTable: @@ -391,7 
+414,10 @@ def open_table(self, _name): monkeypatch.setitem(sys.modules, "lancedb", SimpleNamespace(connect=lambda _uri: _FakeDb())) monkeypatch.setattr(model_module, "resolve_embed_model", lambda _name: "fake-embed-model") - monkeypatch.setattr(pipeline_main, "_count_uploadable_vdb_records", lambda _records: 1) + monkeypatch.setattr( + "nemo_retriever.pipeline.ingest_result.IngestResult.count_uploadable_vdb_records", + lambda self: 1, + ) monkeypatch.setattr( pipeline_main, "_run_evaluation", diff --git a/nemo_retriever/tests/test_ingest_plans.py b/nemo_retriever/tests/test_ingest_plans.py index aaf520fcc0..8ca855dc48 100644 --- a/nemo_retriever/tests/test_ingest_plans.py +++ b/nemo_retriever/tests/test_ingest_plans.py @@ -123,6 +123,9 @@ def test_build_graph_inserts_ingest_vdb_before_webhook() -> None: break node = node.children[0] + # Streaming write is the last VDB stage; the index build runs as a + # driver-side hook (``GraphIngestor._finalize_vdb_upload``) after the + # graph completes, not as another Ray Data operator. assert names[-2] == "IngestVdbOperator" assert names[-1] == "WebhookNotifyOperator" diff --git a/nemo_retriever/tests/test_ingest_result.py b/nemo_retriever/tests/test_ingest_result.py new file mode 100644 index 0000000000..08edec8ab3 --- /dev/null +++ b/nemo_retriever/tests/test_ingest_result.py @@ -0,0 +1,121 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Two load-bearing invariants for :class:`IngestResult`. + +The DataFrame backing's basic behaviour is exercised indirectly via +``TestCollectResults`` in ``test_pipeline_helpers.py`` (which constructs an +``IngestResult.from_dataframe`` and calls ``row_count`` / ``iter_records`` / +``unique_source_count`` through ``_collect_results``); the end-to-end +multimodal-PDF integration test covers the real Ray-Dataset path through the +production CLI. 
The two tests below pin the bits unique to this module: + +1. A dataset-backed result must never call ``Dataset.to_pandas()`` — that is + the entire point of the driver-side memory fix; if any accessor regresses + to a full-corpus pull, the guard-rail fake raises. +2. The uploadable count reads the precomputed ``_vdb_uploadable`` flag column when present. +""" + +from __future__ import annotations + +from typing import Any, Iterable + +import pandas as pd + +from nemo_retriever.pipeline.ingest_result import IngestResult + + +def _rows() -> list[dict[str, Any]]: + """Three uploadable rows spanning two distinct sources.""" + return [ + { + "source_id": "doc-a.pdf", + "text": "alpha", + "text_embeddings_1b_v2": {"embedding": [0.1] * 2048}, + "metadata": {"content_metadata": {"type": "text"}}, + "page_number": 1, + }, + { + "source_id": "doc-a.pdf", + "text": "beta", + "text_embeddings_1b_v2": {"embedding": [0.2] * 2048}, + "metadata": {"content_metadata": {"type": "text"}}, + "page_number": 2, + }, + { + "source_id": "doc-b.pdf", + "text": "gamma", + "text_embeddings_1b_v2": {"embedding": [0.3] * 2048}, + "metadata": {"content_metadata": {"type": "text"}}, + "page_number": 1, + }, + ] + + +class _GuardRailDataset: + """Ray-Dataset-shaped fake whose ``to_pandas`` raises so an accidental + full-corpus pull is caught by the test rather than slipping into prod.""" + + def __init__(self, rows: list[dict[str, Any]]) -> None: + self._rows = rows + + def count(self) -> int: + return len(self._rows) + + def iter_rows(self) -> Iterable[dict[str, Any]]: + yield from self._rows + + def iter_batches(self, *, batch_format: str = "pandas", batch_size: int | None = None): + assert batch_format == "pandas" + size = batch_size or len(self._rows) or 1 + for start in range(0, len(self._rows), size): + yield pd.DataFrame(self._rows[start : start + size]) + + def unique(self, column: str) -> list[Any]: + seen: list[Any] = [] + for row in self._rows: + v = row.get(column) + if v not in seen: +
seen.append(v) + return seen + + def write_parquet(self, path: str) -> None: # pragma: no cover - not exercised here + from pathlib import Path as _P + + _P(path).mkdir(parents=True, exist_ok=True) + + def to_pandas(self) -> pd.DataFrame: # pragma: no cover - guard rail + raise AssertionError( + "IngestResult must not call Dataset.to_pandas(); that defeats the " "driver-side memory fix." + ) + + +def test_dataset_backed_result_streams_without_pulling_to_driver() -> None: + """The whole point of this refactor: every IngestResult accessor must be + serveable from streaming Ray Dataset primitives, never from ``to_pandas``.""" + result = IngestResult.from_dataset(_GuardRailDataset(_rows())) + + assert result.row_count() == 3 + assert result.unique_source_count() == 2 # doc-a (×2), doc-b + assert sum(1 for _ in result.iter_records()) == 3 + assert result.count_uploadable_vdb_records() == 3 + # If any accessor above had called Dataset.to_pandas(), the fake would + # have raised and this assertion would never be reached. + + +def test_dataset_uses_precomputed_uploadable_flag_when_present() -> None: + """IngestVdbOperator emits a ``_vdb_uploadable`` boolean column in batch + mode after projecting away the heavy embedding/metadata columns. 
The + streaming uploadable count must read that flag instead of trying to + re-derive it from records that no longer carry an embedding.""" + + rows = [ + {"source_id": "a", "_vdb_uploadable": True}, + {"source_id": "a", "_vdb_uploadable": False}, + {"source_id": "b", "_vdb_uploadable": True}, + {"source_id": "b", "_vdb_uploadable": True}, + ] + result = IngestResult.from_dataset(_GuardRailDataset(rows)) + + assert result.count_uploadable_vdb_records() == 3 diff --git a/nemo_retriever/tests/test_lancedb_streaming_append.py b/nemo_retriever/tests/test_lancedb_streaming_append.py new file mode 100644 index 0000000000..8adab593bc --- /dev/null +++ b/nemo_retriever/tests/test_lancedb_streaming_append.py @@ -0,0 +1,85 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Streaming round-trip for ``LanceDB.append`` + ``LanceDB.build_index``. + +One direct test pins two invariants that together describe the streaming +contract used by ``IngestVdbOperator`` and ``GraphIngestor._finalize_vdb_upload``: + +1. **Semantics** — first ``append(overwrite=True)`` creates the table, + subsequent ``append(overwrite=False)`` calls add rows, ``build_index`` + then builds the vector index on the populated table. +2. **Connection caching** — every per-batch ``lancedb.connect()`` + + ``open_table()`` re-scans Lance's manifest, which grows with every prior + commit; the operator must cache the handle so repeated appends pay only + the commit cost. (This was the throughput killer pre-cache.) + +The end-to-end ingest+retrieve integration test covers the full pipeline; this +keeps a fast regression net around both invariants without the multi-minute +pipeline cost. 
+""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import lancedb + +from nemo_retriever.vdb.lancedb import LanceDB + + +VECTOR_DIM = 8 + + +def _records(*texts: str) -> list[list[dict[str, Any]]]: + inner = [ + { + "document_type": "text", + "metadata": { + "embedding": [0.1 + 0.01 * i] * VECTOR_DIM, + "content": text, + "content_metadata": {"type": "text", "page_number": i + 1}, + "source_metadata": {"source_id": f"/tmp/doc-{i}.pdf"}, + }, + } + for i, text in enumerate(texts) + ] + return [inner] + + +def test_streaming_append_round_trip_with_cached_connection(tmp_path: Path, monkeypatch) -> None: + from nemo_retriever.vdb import lancedb as lancedb_module + + connect_count = 0 + real_connect = lancedb_module.lancedb.connect + + def _counting_connect(*args, **kwargs): + nonlocal connect_count + connect_count += 1 + return real_connect(*args, **kwargs) + + monkeypatch.setattr(lancedb_module.lancedb, "connect", _counting_connect) + + vdb = LanceDB(uri=str(tmp_path / "db"), table_name="stream", vector_dim=VECTOR_DIM, hybrid=False) + + vdb.append(_records("a", "b"), overwrite=True) + connect_count_after_first_append = connect_count + vdb.append(_records("c"), overwrite=False) + vdb.append(_records("d"), overwrite=False) + vdb.append(_records("e"), overwrite=False) + + # Three follow-up appends must reuse the cached connection — that's the + # whole point of the cache, and a regression here would re-introduce the + # per-batch manifest re-scan throughput collapse. 
+ assert connect_count == connect_count_after_first_append, ( + f"3 streaming appends opened " f"{connect_count - connect_count_after_first_append} extra connections" + ) + + vdb.build_index() + + table = lancedb.connect(uri=vdb.uri).open_table(vdb.table_name) + assert table.count_rows() == 5 + indexed_columns = {idx.columns[0] if idx.columns else "" for idx in table.list_indices()} + assert "vector" in indexed_columns diff --git a/nemo_retriever/tests/test_lancedb_write_policy.py b/nemo_retriever/tests/test_lancedb_write_policy.py index 0e37ae884d..d46473e614 100644 --- a/nemo_retriever/tests/test_lancedb_write_policy.py +++ b/nemo_retriever/tests/test_lancedb_write_policy.py @@ -74,8 +74,13 @@ def test_append_incompatible_schema_raises_clear_error(tmp_path: Path) -> None: def test_create_index_kwarg_disables_index_build_without_shadowing_method(tmp_path: Path) -> None: op = LanceDB(uri=str(tmp_path), table_name="t", vector_dim=2, create_index=False) + # ``create_index`` and ``build_index`` must remain callable methods — the + # streaming flow (``IngestVdbOperator`` → ``vdb.append`` → ``vdb.build_index``) + # invokes ``build_index()`` after all batches land. The constructor kwarg is + # stored on a private attribute so it can't shadow either method. 
assert callable(op.create_index) - assert op.build_index is False + assert callable(op.build_index) + assert op._build_index_on_run is False def fail_if_called(*_args, **_kwargs) -> None: raise AssertionError("write_to_index should not be called when create_index=False") diff --git a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py index 8213405eea..f00e08a12c 100644 --- a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py +++ b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py @@ -17,6 +17,8 @@ class FakeVDB(VDB): def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.run_calls: list[Any] = [] + self.append_calls: list[tuple[Any, bool]] = [] + self.build_index_calls: int = 0 self.retrieval_calls: list[tuple[Any, dict[str, Any]]] = [] def create_index(self, **kwargs: Any) -> None: @@ -47,6 +49,13 @@ def run(self, records: Any) -> dict[str, Any]: self.run_calls.append(records) return {"records": records} + def append(self, records: Any, *, overwrite: bool) -> bool: + self.append_calls.append((records, overwrite)) + return True + + def build_index(self) -> None: + self.build_index_calls += 1 + def _graph_rows() -> list[dict[str, Any]]: return [ @@ -67,19 +76,6 @@ def _graph_rows() -> list[dict[str, Any]]: ] -def test_process_returns_original_graph_rows_and_delegates_converted_records_to_run() -> None: - data = _graph_rows() - vdb = FakeVDB() - operator = IngestVdbOperator(vdb=vdb) - - assert operator.preprocess(data) is data - assert operator.process(data) is data - assert operator.postprocess(data) is data - assert vdb.run_calls[0][0][0]["document_type"] == "text" - assert vdb.run_calls[0][0][0]["metadata"]["content"] == "first chunk" - assert vdb.run_calls[0][0][0]["metadata"]["embedding"] == [0.1] * 2048 - - def test_vdb_op_constructs_client_vdb(monkeypatch: pytest.MonkeyPatch) -> None: constructed_kwargs: dict[str, Any] = {} @@ -114,23 +110,26 @@ def 
test_ingest_operator_converts_graph_rows_to_client_vdb_records() -> None: assert operator(data) is data - assert vdb.run_calls == [ - [ + assert vdb.append_calls == [ + ( [ - { - "document_type": "text", - "metadata": { - "embedding": [0.1] * 2048, - "content": "graph chunk", - "content_metadata": {"page_number": 7}, - "source_metadata": { - "source_id": "/tmp/doc-a.pdf", - "source_name": "doc-a.pdf", + [ + { + "document_type": "text", + "metadata": { + "embedding": [0.1] * 2048, + "content": "graph chunk", + "content_metadata": {"page_number": 7}, + "source_metadata": { + "source_id": "/tmp/doc-a.pdf", + "source_name": "doc-a.pdf", + }, }, - }, - } - ] - ] + } + ] + ], + True, + ) ] @@ -167,3 +166,100 @@ def test_constructor_requires_exactly_one_vdb_source() -> None: with pytest.raises(ValueError, match="Pass either vdb or vdb_op"): IngestVdbOperator(vdb=FakeVDB(), vdb_op="lancedb") + + +def test_ingest_operator_streams_with_overwrite_flag_flip() -> None: + """First batch overwrites the table; subsequent batches append. 
This handshake + is the entire streaming contract — without concurrency=1 it'd be impossible to + keep coherent, and without the flag flip we'd either rewrite per batch or + never overwrite at all.""" + vdb = FakeVDB() + operator = IngestVdbOperator(vdb=vdb) + + operator.process(_graph_rows()) + operator.process(_graph_rows()) + operator.process(_graph_rows()) + + overwrites = [overwrite for _records, overwrite in vdb.append_calls] + assert overwrites == [True, False, False] + assert vdb.run_calls == [] # legacy global-batch path must not be invoked + + +def test_finalize_vdb_upload_builds_index_when_params_set(monkeypatch: pytest.MonkeyPatch) -> None: + """``GraphIngestor`` finalizes a configured VDB upload by constructing the + backend once and calling ``build_index()`` — replacing the old downstream + ``VdbBuildIndexOperator`` Ray-Data barrier with a driver-side hook so the + graph itself can stay fully streaming end-to-end.""" + from nemo_retriever.graph_ingestor import GraphIngestor + from nemo_retriever.params import VdbUploadParams + + constructed: list[dict[str, Any]] = [] + indexed: list[FakeVDB] = [] + + class _CountingVDB(FakeVDB): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + constructed.append(kwargs) + + def build_index(self) -> None: + super().build_index() + indexed.append(self) + + monkeypatch.setattr(vdb_operator_module, "get_vdb_op_cls", lambda _name: _CountingVDB) + + ingestor = GraphIngestor(run_mode="inprocess", documents=[]) + ingestor.vdb_upload(VdbUploadParams(vdb_op="fake", vdb_kwargs={"uri": "/tmp/x", "table_name": "t"})) + ingestor._finalize_vdb_upload() + + assert len(constructed) == 1 + assert constructed[0] == {"uri": "/tmp/x", "table_name": "t"} + assert len(indexed) == 1 and indexed[0].build_index_calls == 1 + + +def test_dataframe_output_keeps_only_whitelisted_columns_and_marks_uploadable() -> None: + """When the operator receives a pandas DataFrame (real Ray Data path), the + returned block must keep 
ONLY a small whitelist of accounting columns — + anything else (embeddings, metadata blobs, raw PDF bytes, page images, + extracted text) accumulates in plasma at the downstream global-batch + barrier and dominates the driver-side materialization. A whitelist also + survives future schema additions: any newly-added heavy column gets + dropped by default.""" + import pandas as pd + + rows = pd.DataFrame( + [ + # Valid row: text + embedding present. + { + "text": "alpha", + "text_embeddings_1b_v2": {"embedding": [0.1] * 2048}, + "source_id": "doc-a.pdf", + "source_path": "/tmp/doc-a.pdf", + "path": "/tmp/doc-a.pdf", + "page_number": 1, + "metadata": {"content_metadata": {"type": "text"}}, + "bytes": b"x" * 1024, # heavy column not in any blacklist + "image_data": "base64..." * 100, # another heavy column + }, + # Invalid row: text but no embedding. + { + "text": "beta", + "text_embeddings_1b_v2": None, + "source_id": "doc-a.pdf", + "source_path": "/tmp/doc-a.pdf", + "path": "/tmp/doc-a.pdf", + "page_number": 2, + "metadata": {"content_metadata": {"type": "text"}}, + "bytes": b"y" * 1024, + "image_data": "base64..." * 100, + }, + ] + ) + + operator = IngestVdbOperator(vdb=FakeVDB()) + out = operator(rows) + + # Only the accounting whitelist + the _vdb_uploadable flag must remain. + expected = {"source_id", "source_path", "path", "page_number", "_vdb_uploadable"} + assert set(out.columns) == expected + # First row uploaded, second row dropped (no embedding). 
+ assert list(out["_vdb_uploadable"]) == [True, False] diff --git a/nemo_retriever/tests/test_pipeline_graph.py b/nemo_retriever/tests/test_pipeline_graph.py index 2ffe39a185..be77cc032e 100644 --- a/nemo_retriever/tests/test_pipeline_graph.py +++ b/nemo_retriever/tests/test_pipeline_graph.py @@ -926,7 +926,9 @@ def _fake_read_binary_files(paths, include_paths=True): executor = RayDataExecutor(Graph()) result = executor.ingest([str(tmp_path / "**" / "*.pdf")]) - assert isinstance(result, pd.DataFrame) + # The executor now returns the materialized Ray Dataset (cheap on the + # driver — block refs only) instead of forcing a ``to_pandas()`` pull. + assert isinstance(result, _FakeDataset) assert captured["paths"] == [str(pdf_path)] assert captured["include_paths"] is True diff --git a/nemo_retriever/tests/test_pipeline_helpers.py b/nemo_retriever/tests/test_pipeline_helpers.py index 93d377e3dc..2dfcdfae0b 100644 --- a/nemo_retriever/tests/test_pipeline_helpers.py +++ b/nemo_retriever/tests/test_pipeline_helpers.py @@ -247,34 +247,30 @@ def test_build_embed_params_forwards_remote_and_modality_flags() -> None: class TestCollectResults: - """Ingest returns a DataFrame (``ingestor.ingest()`` → ``ds.to_pandas()``); _collect_results consumes it.""" + """``_collect_results`` returns an ``IngestResult`` plus download time + unit count.""" - def test_batch_mode_accepts_ingest_dataframe(self): + def test_batch_mode_wraps_dataframe_input(self): rows = [ {"source_id": "a", "text": "hello"}, {"source_id": "a", "text": "world"}, {"source_id": "b", "text": "!"}, ] - # Same shape as the graph executor return after ``Dataset.to_pandas()``. + # Pre-fix ingest could return a DataFrame even in batch mode (after the + # historical ``ds.to_pandas()``). The streaming refactor still accepts it. 
result_df = pd.DataFrame(rows) - records, df, download_time, num_units = _collect_results("batch", result_df) + handle, download_time, num_units = _collect_results("batch", result_df) - assert records == rows - assert df is result_df - assert isinstance(df, pd.DataFrame) - assert list(df.columns) == ["source_id", "text"] - assert len(df) == 3 + assert list(handle.iter_records()) == rows + assert handle.row_count() == 3 # ``source_id`` has two distinct values → that is the unit count. assert num_units == 2 assert download_time >= 0.0 def test_batch_mode_handles_empty_result(self): - result_df = pd.DataFrame() - records, df, download_time, num_units = _collect_results("batch", result_df) - assert records == [] - assert df.empty - # Empty DataFrame has no columns → falls through to len(df.index) == 0. + handle, download_time, num_units = _collect_results("batch", pd.DataFrame()) + assert handle.row_count() == 0 + # Empty DataFrame has no columns → falls through to row count == 0. assert num_units == 0 assert download_time >= 0.0 @@ -285,11 +281,9 @@ def test_inprocess_mode_accepts_dataframe_directly(self): ] df_in = pd.DataFrame(rows) - records, df_out, download_time, num_units = _collect_results("inprocess", df_in) + handle, download_time, num_units = _collect_results("inprocess", df_in) - # The DataFrame is passed through unchanged (same object). - assert df_out is df_in - assert records == rows + assert list(handle.iter_records()) == rows # inprocess mode never incurs Ray download time. 
assert download_time == 0.0 assert num_units == 2 @@ -298,10 +292,9 @@ def test_inprocess_mode_accepts_dataframe_directly(self): def test_collect_results_accepts_inprocess_dataframe() -> None: df_in = pd.DataFrame([{"source_path": "/a.pdf"}, {"source_path": "/b.pdf"}]) - records, df_out, download_time, num_units = _collect_results("inprocess", df_in) + handle, download_time, num_units = _collect_results("inprocess", df_in) - assert df_out is df_in - assert records == [{"source_path": "/a.pdf"}, {"source_path": "/b.pdf"}] + assert list(handle.iter_records()) == [{"source_path": "/a.pdf"}, {"source_path": "/b.pdf"}] assert download_time == 0.0 assert num_units == 2 diff --git a/nemo_retriever/tests/test_sidecar_metadata.py b/nemo_retriever/tests/test_sidecar_metadata.py index 4f06a4f2de..665d76519c 100644 --- a/nemo_retriever/tests/test_sidecar_metadata.py +++ b/nemo_retriever/tests/test_sidecar_metadata.py @@ -25,6 +25,8 @@ class _FakeVDB: def __init__(self, **kwargs: Any) -> None: self.kwargs = kwargs self.run_calls: list[Any] = [] + self.append_calls: list[tuple[Any, bool]] = [] + self.build_index_calls: int = 0 def create_index(self, **kwargs: Any) -> None: return None @@ -38,6 +40,12 @@ def retrieval(self, vectors: list, **kwargs: Any) -> list: def run(self, records: Any) -> None: self.run_calls.append(records) + def append(self, records: Any, *, overwrite: bool) -> None: + self.append_calls.append((records, overwrite)) + + def build_index(self) -> None: + self.build_index_calls += 1 + def test_normalize_sidecar_cell_value_list_and_dict_no_raise() -> None: assert normalize_sidecar_cell_value([1, 2]) == [1, 2] @@ -104,8 +112,9 @@ def test_apply_sidecar_merges_into_content_metadata(tmp_path: Path) -> None: assert cm["type"] == "text" -def test_ingest_vdb_operator_marks_global_batch_for_ray() -> None: - assert IngestVdbOperator.REQUIRES_GLOBAL_BATCH is True +def test_ingest_vdb_operator_streams_without_global_batch() -> None: + """Streaming append path keeps 
per-actor memory bounded by the batch size.""" + assert IngestVdbOperator.REQUIRES_GLOBAL_BATCH is False def test_vdb_upload_params_triplet_validation() -> None: @@ -157,8 +166,9 @@ def fake_get_vdb_op_cls(vdb_op: str) -> type[_FakeVDB]: ] operator.process(data) vdb = operator._vdb - assert vdb.run_calls - rec = vdb.run_calls[0][0][0] + assert vdb.append_calls + records, _overwrite = vdb.append_calls[0] + rec = records[0][0] assert rec["metadata"]["content_metadata"].get("meta_a") == "zeta"