From fb49b1f7f4934f36c6a022a6a5ae6d1f5b321662 Mon Sep 17 00:00:00 2001 From: dlaptii Date: Mon, 18 May 2026 11:59:52 +0300 Subject: [PATCH 1/9] Update embeddings for table and column by id --- .../tabular_fetch_embeddings_operator.py | 205 +++++++++++++++++- .../graph/tabular_schema_extract_operator.py | 31 ++- .../tabular_data/ingestion/embeddings.py | 122 ----------- .../tabular_data/ingestion/extract_data.py | 16 +- .../tabular_data/ingestion/write_to_graph.py | 8 +- .../src/nemo_retriever/vdb/adt_vdb.py | 75 ++++++- .../src/nemo_retriever/vdb/lancedb.py | 137 ++++++++++++ .../src/nemo_retriever/vdb/operators.py | 56 +++++ .../tests/test_nv_ingest_vdb_operator.py | 3 + 9 files changed, 502 insertions(+), 151 deletions(-) delete mode 100644 nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py diff --git a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py index ecfceb0fbc..0a91b5c03e 100644 --- a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py @@ -2,28 +2,41 @@ # All rights reserved. # SPDX-License-Identifier: Apache-2.0 -"""Graph operator: fetch tabular entity descriptions from Neo4j into an embedding-ready DataFrame.""" +"""Graph operator: turn ``(tables_df, columns_df)`` into embedding-ready rows.""" from __future__ import annotations -from typing import Any +from typing import Any, Iterable import pandas as pd from nemo_retriever.graph.abstract_operator import AbstractOperator from nemo_retriever.graph.cpu_operator import CPUOperator +from nemo_retriever.tabular_data.ingestion.model.reserved_words import Labels class TabularFetchEmbeddingsOp(AbstractOperator, CPUOperator): - """Fetch all tabular entity descriptions from Neo4j into an embedding-ready DataFrame. + """Build an embedding-ready DataFrame from ``(tables_df, columns_df)``. - This operator ignores its input — it always queries Neo4j directly and - returns a fresh DataFrame with columns: - ``text``, ``_embed_modality``, ``path``, ``page_number``, ``metadata``. + Expected input: a 2-tuple ``(tables_df, columns_df)``. Both DataFrames + carry the post-ingest UUIDs of the Table/Column nodes written to Neo4j; + ``tables_df`` is keyed by ``id`` (table UUID) with at least + ``table_name``, ``table_schema`` and ``description`` columns, and + ``columns_df`` carries one row per column with ``id``, ``table_name``, + ``column_name``, ``data_type``, ``description`` and ``sample_values``. + Multiple schemas can be concatenated into the same pair — the + ``table_schema`` column on each table row keeps them distinguishable. - The output schema matches the format produced by the unstructured pipeline, - so the standard :class:`~nemo_retriever.text_embed.operators._BatchEmbedActor` - can be chained directly after this operator. + Output columns: ``text, _embed_modality, path, page_number, metadata``. + Two row types are produced: + + * one ``Table`` row per table, whose ``text`` joins the table description + with a compact list of its columns; and + * one ``Column`` row per column. + + The text templates match the previous Neo4j-derived format, so the rest + of the pipeline (``_BatchEmbedActor`` → ``IngestVdbOperator``) keeps + working untouched. """ def __init__( @@ -39,9 +52,179 @@ def preprocess(self, data: Any, **kwargs: Any) -> Any: return data def process(self, data: Any, **kwargs: Any) -> pd.DataFrame: - from nemo_retriever.tabular_data.ingestion.embeddings import fetch_tabular_embedding_dataframe + if not (isinstance(data, tuple) and len(data) == 2): + raise TypeError(f"Expected (tables_df, columns_df) tuple, got {type(data).__name__}") - return fetch_tabular_embedding_dataframe(database_name=self._database_name) + tables_df, columns_df = data + rows = list(self._build_rows(tables_df, columns_df)) + if rows: + return pd.DataFrame(rows) + return pd.DataFrame(columns=["text", "_embed_modality", "path", "page_number", "metadata"]) def postprocess(self, data: Any, **kwargs: Any) -> Any: return data + + def _build_rows(self, tables_df: pd.DataFrame, columns_df: pd.DataFrame) -> Iterable[dict[str, Any]]: + # Index columns by (schema, table_name) so duplicate table names across + # different schemas (e.g. two "users" tables in two schemas) don't get + # their columns merged into one bucket — which would both bloat each + # table's text and double-emit every column row per duplicate. + columns_by_table: dict[tuple[str, str], list[Any]] = {} + for _, col in columns_df.iterrows(): + key = ( + str(col.get("table_schema", "")).lower(), + str(col.get("table_name", "")).lower(), + ) + columns_by_table.setdefault(key, []).append(col) + + rows: list[dict[str, Any]] = [] + for _, table in tables_df.iterrows(): + table_id = str(table.get("id", "")) + table_name = str(table.get("table_name", "")) + table_description = "" if pd.isna(v := table.get("description")) else str(v).strip() + schema_name = str(table.get("table_schema", "")) + columns = columns_by_table.get((schema_name.lower(), table_name.lower()), []) + + table_text = _create_table_text( + table_name=table_name, + table_description=table_description, + columns=columns, + schema_name=schema_name, + database_name=self._database_name, + ) + rows.append( + _create_row( + text=table_text, + node_id=table_id, + label=Labels.TABLE, + name=table_name, + schema_name=schema_name, + database_name=self._database_name, + ) + ) + + for column in columns: + column_id = str(column.get("id", "")) + column_name = str(column.get("column_name", "")) + data_type = "" if pd.isna(v := column.get("data_type")) else str(v).strip() + column_description = "" if pd.isna(v := column.get("description")) else str(v).strip() + sample_values = (column.get("sample_values") or [])[:5] + column_text = _create_column_text( + column_name=column_name, + column_description=column_description, + data_type=data_type, + sample_values=sample_values, + schema_name=schema_name, + table_name=table_name, + database_name=self._database_name, + ) + rows.append( + _create_row( + text=column_text, + node_id=column_id, + label=Labels.COLUMN, + name=column_name, + schema_name=schema_name, + database_name=self._database_name, + ) + ) + return rows + + +# ── Helpers ────────────────────────────────────────────────────────────────── + + +def _create_table_text( + *, + table_name: str, + table_description: str, + columns: list[Any], + schema_name: str, + database_name: str, +) -> str: + """Build the embedding text for a Table node. + + Returns just the text string; the caller is responsible for wrapping it + in an embed-row dict via :func:`_create_row`. + """ + text = f"db_name: {database_name}" f", schema_name: {schema_name}" f", table_name: {table_name}" + if table_description: + text += f", table_description: {table_description}" + + column_pieces: list[str] = [] + for column in columns: + column_name = column.get("column_name", "") + data_type = "" if pd.isna(v := column.get("data_type")) else str(v).strip() + piece = f"{{name: {column_name}, data_type: {data_type}" + + column_description = "" if pd.isna(v := column.get("description")) else str(v).strip() + if column_description: + piece += f", description: {column_description}" + piece += "}" + column_pieces.append(piece) + + text += f", columns: {','.join(column_pieces)}" + return text + + +def _create_column_text( + *, + column_name: str, + column_description: str, + data_type: str, + sample_values: list[Any], + table_name: str, + schema_name: str, + database_name: str, +) -> str: + """Build the embedding text for a Column node. + + Returns just the text string; the caller is responsible for wrapping it + in an embed-row dict via :func:`_create_row`. + """ + text = ( + f"db_name: {database_name}" + f", schema_name: {schema_name}" + f", table_name: {table_name}" + f", column_name: {column_name}" + f", data_type: {data_type}" + ) + if column_description: + text += f", column_description: {column_description}" + if len(sample_values) > 0: + text += f", sample_values: {', '.join(str(x) for x in sample_values)}" + return text + + +def _create_row( + *, + text: str, + node_id: str | None, + label: str, + name: str, + schema_name: str, + database_name: str, +) -> dict[str, Any]: + path = f"neo4j:{node_id}" if node_id else "neo4j:unknown" + # Nest tabular identifiers under content_metadata so they survive the + # IngestVdbOperator → LanceDB write path (which only persists + # content_metadata + source_metadata into the table's metadata column). + # Top-level copies are kept for any in-memory consumer of this DataFrame. + tabular_fields = { + "id": node_id, + "label": label, + "name": name, + "source_path": path, + "schema_name": schema_name, + "database_name": database_name, + } + return { + "text": text.strip(), + "_embed_modality": "text", + "path": path, + "page_number": -1, + "metadata": { + **tabular_fields, + "content_metadata": dict(tabular_fields), + }, + } \ No newline at end of file diff --git a/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py b/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py index 439cef9ea2..8b4031e186 100644 --- a/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py @@ -24,9 +24,15 @@ class TabularSchemaExtractOp(AbstractOperator, CPUOperator): connector stored in *tabular_params*. 2. Write the extracted entities as graph nodes and relationships into Neo4j. - The operator produces an empty DataFrame as output so it can be chained - with downstream operators (e.g. :class:`TabularFetchEmbeddingsOp`) via - ``>>``. All meaningful state lives in Neo4j after this step. + The operator returns ``(tables_df, columns_df)`` — concatenated across + every ingested :class:`Schema` — carrying the post-ingest UUIDs written + to Neo4j. The per-row ``table_schema`` column keeps schemas + distinguishable. Downstream operators (notably + :class:`TabularFetchEmbeddingsOp`) can build embedding text directly + from this pair without a Neo4j round-trip. + + Returns ``(empty_df, empty_df)`` when there is nothing to ingest, so + the chain still flows. """ def __init__( @@ -43,15 +49,26 @@ def preprocess(self, data: Any, **kwargs: Any) -> TabularExtractParams | None: return data return self._tabular_params - def process(self, data: TabularExtractParams | None, **kwargs: Any) -> pd.DataFrame: + def process(self, data: TabularExtractParams | None, **kwargs: Any) -> tuple[pd.DataFrame, pd.DataFrame]: from nemo_retriever.tabular_data.ingestion.extract_data import ( extract_tabular_db_data, store_relational_db_in_neo4j, ) + empty = (pd.DataFrame(), pd.DataFrame()) + if data is None or data.connector is None: + return empty + schema_data = extract_tabular_db_data(params=data) - store_relational_db_in_neo4j(data=schema_data, dialect=data.connector.dialect) - return pd.DataFrame() + schemas = store_relational_db_in_neo4j(data=schema_data, dialect=data.connector.dialect) or {} + if not schemas: + return empty + + tables = [s.tables_df for s in schemas.values() if s.tables_df is not None] + columns = [s.columns_df for s in schemas.values() if s.columns_df is not None] + tables_df = pd.concat(tables, ignore_index=True) if tables else pd.DataFrame() + columns_df = pd.concat(columns, ignore_index=True) if columns else pd.DataFrame() + return tables_df, columns_df def postprocess(self, data: Any, **kwargs: Any) -> Any: - return data + return data \ No newline at end of file diff --git a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py deleted file mode 100644 index a5ec9d72be..0000000000 --- a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py +++ /dev/null @@ -1,122 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. -# All rights reserved. -# SPDX-License-Identifier: Apache-2.0 - -from typing import List - -import pandas as pd - -from nemo_retriever.tabular_data.neo4j import get_neo4j_conn -from nemo_retriever.tabular_data.ingestion.model.reserved_words import Edges, Labels - - -def query_neo4j_tables_for_embedding(database_name: str) -> List[dict]: - """Run the Neo4j query for tables not yet info_embedded; return list of doc dicts.""" - neo4j_conn = get_neo4j_conn() - query = f"""MATCH (d:{Labels.DB}{{name: $database_name}})-[:{Edges.CONTAINS}]-> - (s:{Labels.SCHEMA})-[:{Edges.CONTAINS}]->(t:{Labels.TABLE}) - MATCH (t)-[:{Edges.CONTAINS}]->(c:{Labels.COLUMN}) - WITH d, s, t, collect( - "{{name: " + c.name + ", data_type: " + c.data_type + - CASE WHEN c.description IS NOT NULL AND trim(c.description) <> '' - THEN ", description: " + c.description ELSE "" END + - "}}") as columns - RETURN collect({{ - text: "schema_name: " + s.name + - ", table_name: " + t.name + - CASE WHEN t.description IS NOT NULL AND trim(t.description) <> '' - THEN ", table_description: " + t.description ELSE "" END + - ", columns: " + apoc.text.join(columns, ' '), - name: t.name, label: labels(t)[0], id: t.id - }}) as docs - """ - result = neo4j_conn.query_read(query, parameters={"database_name": database_name}) - if not result: - return [] - return result[0].get("docs") or [] - - -def query_neo4j_columns_for_embedding(database_name: str) -> List[dict]: - """Return one doc per ``Column`` node for embedding (distinct from table-level rows).""" - neo4j_conn = get_neo4j_conn() - query = f""" - MATCH (d:{Labels.DB}{{name: $database_name}})-[:{Edges.CONTAINS}]->(s:{Labels.SCHEMA}) - -[:{Edges.CONTAINS}]->(t:{Labels.TABLE}) - -[:{Edges.CONTAINS}]->(c:{Labels.COLUMN}) - - WITH d, s, t, c, - CASE - WHEN c.description IS NOT NULL AND trim(toString(c.description)) <> '' - THEN ', column_description: ' + toString(c.description) - ELSE '' - END AS column_desc, - CASE - WHEN c.sample_values IS NOT NULL AND size(c.sample_values) > 0 - THEN ', sample_values: ' + apoc.text.join([x IN c.sample_values[..5] | toString(x)], ', ') - ELSE '' - END AS sample_vals - - RETURN collect({{ - text: 'db_name: ' + d.name + - ', schema_name: ' + s.name + - ', table_name: ' + t.name + - ', column_name: ' + c.name + - ', data_type: ' + coalesce(toString(c.data_type), '') + - column_desc + - sample_vals, - name: c.name, - label: labels(c)[0], - id: c.id - }}) AS docs - """ - result = neo4j_conn.query_read(query, parameters={"database_name": database_name}) - if not result: - return [] - return result[0].get("docs") or [] - - -def fetch_tabular_embedding_dataframe(database_name: str) -> pd.DataFrame: - """Fetch all tabular entity docs from Neo4j and return a DataFrame ready for embedding. - - Each row has: text, _embed_modality, path, page_number, metadata - (id, label, name, source_path) — matching the format produced by the - unstructured pipeline so run_pipeline_tasks_on_df works without changes. - """ - _empty = pd.DataFrame(columns=["text", "_embed_modality", "path", "page_number", "metadata"]) - table_docs = query_neo4j_tables_for_embedding(database_name=database_name) - column_docs = query_neo4j_columns_for_embedding(database_name=database_name) - docs = list(table_docs) + list(column_docs) - if not docs: - return _empty - - rows = [] - for item in docs: - text = (item.get("text") or "").strip() - node_id = item.get("id") - label = item.get("label", "") - name = item.get("name", "") - path = f"neo4j:{node_id}" if node_id is not None else "neo4j:unknown" - # Nest tabular identifiers under content_metadata so they survive the - # IngestVdbOperator → LanceDB write path (which only persists - # content_metadata + source_metadata into the table's metadata column). - # Top-level copies are kept for any in-memory consumer of this DataFrame. - tabular_fields = { - "id": node_id, - "label": label, - "name": name, - "source_path": path, - "database_name": database_name, - } - rows.append( - { - "text": text, - "_embed_modality": "text", - "path": path, - "page_number": -1, - "metadata": { - **tabular_fields, - "content_metadata": dict(tabular_fields), - }, - } - ) - return pd.DataFrame(rows) diff --git a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py index ea0e39129a..b5745b86c9 100644 --- a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py +++ b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py @@ -60,19 +60,23 @@ def store_relational_db_in_neo4j(data, dialect: str, num_workers: int = 4): Args: data: Data dict returned by extract_tabular_db_data(). dialect: SQL dialect used by the connector (e.g. "sqlite", "duckdb", "snowflake"). - neo4j_conn: Active Neo4jConnectionManager instance (unused directly here; - populate_tabular_data uses its own DAL connection, but - accepted for API consistency with the other ingest steps). + num_workers: Worker count forwarded to populate_tabular_data. + + Returns: + ``{schema_name_lower: Schema}`` dict produced during ingestion, so + callers can recover the post-ingest ``tables_df`` / ``columns_df`` + (with the UUIDs assigned to each Table/Column node) without a + round-trip back to Neo4j. Returns ``{}`` when *data* is empty. """ if not data: - return + return {} from nemo_retriever.tabular_data.ingestion.write_to_graph import ( populate_tabular_data, ) - populate_tabular_data( + return populate_tabular_data( data, num_workers=num_workers, dialect=dialect, - ) + ) \ No newline at end of file diff --git a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py index b0907e35c1..81e9a3fe57 100644 --- a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py +++ b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py @@ -38,23 +38,23 @@ def populate_tabular_data(data, num_workers, dialect): if tables_df is None or tables_df.empty: logger.warning("No tables found in source database; skipping graph population.") - return + return {} database = data["database_name"] logger.info(f"Started parsing db {database}.") - all_schemas = {} all_schemas = populate_db(tables_df, columns_df, database, num_workers) if "fks" in data: populate_fks(fks=data["fks"], database_name=database) + if "pks" in data: populate_pks(pks=data["pks"], database_name=database) if "queries" in data: populate_queries(all_schemas, data["queries"], num_workers, dialect) - return [] + return all_schemas def populate_db(tables_df, columns_df, database, num_workers): @@ -139,4 +139,4 @@ def populate_pks(pks, database_name: str): def _update_schema(schema, latest_timestamp): update_diff_from_existing_schema(schema, latest_timestamp) - logger.info(f"Updated schema {schema.get_schema_name()} to db.") + logger.info(f"Updated schema {schema.get_schema_name()} to db.") \ No newline at end of file diff --git a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py index b1097117b4..67c781e5dc 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py @@ -192,6 +192,79 @@ def retrieval(self, queries: list, **kwargs): """ pass + @abstractmethod + def upsert(self, records: list, **kwargs): + """Incrementally merge a batch of records into the target table/index. + + ``upsert`` exists as a separate abstract entry point from + :meth:`write_to_index` because it has fundamentally different + semantics. Where ``write_to_index`` is an *append* (or full + ingest) operation, ``upsert`` is a **stable-key merge**: + + * Rows whose key value already exists in the target table are + **updated in place** — all stored columns (including the dense + vector) are replaced with the values from ``records``. + * Rows whose key value is absent are **inserted**. + * Rows that already exist in the target but are *not* referenced + by ``records`` are **left untouched**. ``upsert`` MUST NOT + delete rows; tombstoning entities that have disappeared + upstream is intentionally out of scope and belongs in a + separate code path. + + This contract makes ``upsert`` suitable for incremental metadata + patches and partial re-ingests where the caller knows the stable + identity of the rows it is changing but does not want to rebuild + the whole index. + + Implementations are expected to: + + * Validate / transform records the same way :meth:`write_to_index` + does (e.g. enforce the embedding dimension, apply the + ``on_bad_vectors`` policy), so that an upserted row is + indistinguishable from one written via the full-ingest path. + * Drop rows whose ``key`` value is empty or ``None`` — an empty + merge key has no stable identity and would otherwise collapse + unrelated rows together. Skipped rows should be logged. + * Create the target table/index on the fly when it does not yet + exist (e.g. a metadata patch lands before the first full + ingest), if the backend supports it. Race-tolerance is + recommended: if a parallel writer wins the create, fall back + to opening the existing table and performing the merge. + * Avoid building heavy secondary structures (e.g. IVF/HNSW + vector indexes, FTS indexes) on the upsert path: incremental + batches are typically too small to train such indexes + meaningfully. Defer index builds to the next full + :meth:`write_to_index` / :meth:`create_index` call. + + Parameters: + - records (list): NV-Ingest-shaped batches (typically a list of + lists of record dicts) to merge into the target. The shape + mirrors what :meth:`write_to_index` accepts. + - table_name (str, optional): override the operator's configured + target table/index name for this call. When ``None``, the + implementation should use its default target. + - key (str, optional): name of the column used as the stable + merge key. Defaults to ``"id"``. Rows missing this column + (or with an empty value) should be skipped. + + Returns: + - implementation-specific result describing what happened + (typical fields include the number of rows merged, the + number of rows skipped for missing keys, and whether the + target table had to be created on the fly). Concrete + implementations should document the exact return shape. + + Backends that genuinely cannot support stable-key merges should + override this method and raise :class:`NotImplementedError` + explicitly so that :class:`UpsertVdbOperator` (and any other + caller) fails fast with a clear message instead of silently + no-oping or duplicating rows. + """ + raise NotImplementedError( + f"{type(self).__name__} does not implement upsert(); " + "incremental stable-key merges are not supported by this VDB backend." + ) + @abstractmethod def run(self, records): """Main entry point used by the NV-Ingest pipeline. @@ -235,4 +308,4 @@ def reindex(self, records: list, **kwargs): Returns: - implementation-specific result """ - pass + pass \ No newline at end of file diff --git a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py index 74f6b509d4..7870dc46d7 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py @@ -119,6 +119,10 @@ def _lancedb_arrow_schema(vector_dim: int) -> pa.Schema: pa.field("text", pa.string()), pa.field("metadata", pa.string()), pa.field("source", pa.string()), + # Stable entity id (e.g. Neo4j node UUID for tabular ingest). + # Empty string when the upstream payload has no stable identity; + # used as the merge key by ``LanceDB.upsert``. + pa.field("id", pa.string()), ] ) @@ -275,12 +279,21 @@ def _create_lancedb_results( logger.debug(f"No text found for entity: {source_name} page: {pg_num} type: {doc_type}") continue + # Promote a stable entity id (e.g. Neo4j node UUID for tabular + # ingest) to a top-level column so ``LanceDB.upsert`` can target a + # single row. Empty string means "no stable identity for this row". + row_id = content_meta.get("id") if isinstance(content_meta, dict) else None + if row_id is None and isinstance(metadata, dict): + row_id = metadata.get("id") + row_id_str = str(row_id) if row_id is not None else "" + lancedb_rows.append( { "vector": embedding, "text": text, "metadata": _json_str(content_meta), "source": _json_str(metadata.get("source_metadata", {})), + "id": row_id_str, } ) accepted += 1 @@ -529,6 +542,130 @@ def run(self, records): logger.info("Skipping LanceDB index creation for table %r because build_index=False.", self.table_name) return records + def upsert( + self, + records, + table_name: str | None = None, + key: str = "id", + ) -> dict[str, int | bool]: + """Merge-insert ``records`` into a LanceDB table on ``key``. + + Stable-key incremental update: + + * Rows matching an existing row by ``key`` are **updated in place** + (all columns, including ``vector``, are replaced). + * Rows whose ``key`` is absent in the table are **inserted**. + * Rows already in the table that are *not* referenced are **left + untouched** — this method never deletes. + + If the target table does not yet exist (e.g. an incremental update + lands before the first full ingest), it is created on the fly with + the same Arrow schema as :meth:`create_index`. The create branch is + race-tolerant: if a parallel writer wins ``create_table``, we + re-open and fall through to ``merge_insert``. + + Vector / FTS indexes are intentionally **not** built here: + incremental upserts typically carry only one or two rows and IVF + training needs at least two. Indexes will be (re)built by the next + full :meth:`run` / :meth:`write_to_index` call. + + Rows whose ``key`` value is empty/``None`` are skipped — an empty + merge key has no stable identity and would otherwise collapse + unrelated rows together. + + Returns the row counts dict from :func:`_create_lancedb_results` + plus: ``upserted``, ``inserted_via_create``, ``skipped_no_key``, + ``created_table``. + """ + target_name = table_name or self.table_name + connect_start = time.perf_counter() + db = lancedb.connect(uri=self.uri) + _record_timing("lancedb.connect", time.perf_counter() - connect_start) + + if self.validate_vector_length and self.on_bad_vectors != "error": + expected_dim: int | None = self.vector_dim + else: + expected_dim = None + + rows, counts = _create_lancedb_results(records or [], expected_dim=expected_dim) + + rows_with_key = [r for r in rows if r.get(key)] + skipped_no_key = len(rows) - len(rows_with_key) + if skipped_no_key: + logger.warning( + "LanceDB.upsert: dropping %d row(s) with empty %r — cannot upsert without a stable key.", + skipped_no_key, + key, + ) + + counts["upserted"] = 0 + counts["inserted_via_create"] = 0 + counts["skipped_no_key"] = skipped_no_key + counts["created_table"] = False + + if not rows_with_key: + logger.info("LanceDB.upsert: nothing to write to table %r.", target_name) + return counts + + try: + table = db.open_table(target_name) + except (ValueError, FileNotFoundError) as exc: + if isinstance(exc, ValueError) and not _is_missing_lancedb_table_error(exc): + raise + logger.info( + "LanceDB.upsert: target table %r not found at uri=%r (%s); " + "creating it on the fly from %d row(s). Indexes will be built " + "by the next full ingest.", + target_name, + self.uri, + exc, + len(rows_with_key), + ) + schema = _lancedb_arrow_schema(self.vector_dim) + create_kwargs: dict[str, Any] = { + "data": rows_with_key, + "schema": schema, + "mode": "create", + "on_bad_vectors": self.on_bad_vectors, + } + if self.on_bad_vectors == "fill": + create_kwargs["fill_value"] = self.fill_value + try: + create_start = time.perf_counter() + table = db.create_table(target_name, **create_kwargs) + _record_timing( + "lancedb.upsert_create_table", + time.perf_counter() - create_start, + {"rows": len(rows_with_key), "table": target_name}, + ) + counts["inserted_via_create"] = len(rows_with_key) + counts["created_table"] = True + return counts + except (ValueError, FileExistsError) as race_exc: + logger.info( + "LanceDB.upsert: race on create_table(%r) (%s); " + "falling back to merge_insert on the existing table.", + target_name, + race_exc, + ) + table = db.open_table(target_name) + + upsert_start = time.perf_counter() + ( + table.merge_insert(key) + .when_matched_update_all() + .when_not_matched_insert_all() + .execute(rows_with_key) + ) + _record_timing( + "lancedb.upsert", + time.perf_counter() - upsert_start, + {"rows": len(rows_with_key), "table": target_name}, + ) + + counts["upserted"] = len(rows_with_key) + return counts + def retrieval(self, vectors, **kwargs): """Search LanceDB with precomputed query vectors. diff --git a/nemo_retriever/src/nemo_retriever/vdb/operators.py b/nemo_retriever/src/nemo_retriever/vdb/operators.py index caf5c87710..5b75efc178 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/operators.py +++ b/nemo_retriever/src/nemo_retriever/vdb/operators.py @@ -134,6 +134,62 @@ def postprocess(self, data: Any, **kwargs: Any) -> Any: return data +class UpsertVdbOperator(IngestVdbOperator): + """Incrementally update an existing VDB table on a stable row key. + + Unlike :class:`IngestVdbOperator` (which orchestrates create_index + + write_to_index, optionally overwriting the whole table), this operator + calls ``vdb.upsert(records, ...)`` so that only rows whose ``key`` is in + ``records`` are touched. Rows in the table that are not referenced are + left untouched, and existing rows that match by ``key`` are replaced. + + The underlying VDB implementation must override + :meth:`~nemo_retriever.vdb.adt_vdb.VDB.upsert` with a real + stable-key merge; currently this is implemented by + :class:`~nemo_retriever.vdb.lancedb.LanceDB`. ``VDB.upsert`` itself + raises :class:`NotImplementedError`, so backends that have not + overridden it are detected at construction time and fail fast rather + than silently no-oping at runtime. + """ + + def __init__( + self, + *, + vdb: VDB | None = None, + vdb_op: str | None = None, + vdb_kwargs: dict[str, Any] | None = None, + key: str = "id", + table_name: str | None = None, + ) -> None: + super().__init__(vdb=vdb, vdb_op=vdb_op, vdb_kwargs=vdb_kwargs) + # ``upsert`` is part of the abstract VDB contract, but the base + # class provides a NotImplementedError stub for backends that + # cannot support stable-key merges. Treat a not-overridden stub + # as "unsupported" so misuse surfaces here instead of at the + # first write. + if getattr(type(self._vdb), "upsert", None) is VDB.upsert: + raise NotImplementedError( + f"VDB backend {type(self._vdb).__name__!r} does not implement upsert(); " + "only LanceDB is currently supported for incremental updates." + ) + self._key = key + self._table_name = table_name + + def process(self, data: Any, **kwargs: Any) -> Any: + records = to_client_vdb_records(data) + if self._sidecar_spec is not None and self._sidecar_lookup is not None: + records = apply_sidecar_metadata_to_client_batches( + records, + lookup=self._sidecar_lookup, + meta_fields=self._sidecar_spec["meta_fields"], + join_key=self._sidecar_spec["meta_join_key"], + ) + if records and any(batch for batch in records): + self._vdb.upsert(records, table_name=self._table_name, key=self._key) + return data + + + class RetrieveVdbOperator(AbstractOperator): """Retrieve hits from an nv-ingest-client VDB using precomputed query vectors.""" diff --git a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py index 8213405eea..1c2800b585 100644 --- a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py +++ b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py @@ -46,6 +46,9 @@ def retrieval(self, vectors: list, **kwargs: Any) -> list[list[dict[str, Any]]]: def run(self, records: Any) -> dict[str, Any]: self.run_calls.append(records) return {"records": records} + + def upsert(self, records: list, **kwargs: Any) -> None: + return None def _graph_rows() -> list[dict[str, Any]]: From 637e56160fc342af5320dbf6a1f59747034bb97d Mon Sep 17 00:00:00 2001 From: dlaptii Date: Mon, 18 May 2026 13:20:31 +0300 Subject: [PATCH 2/9] Fall back to Neo4j fetch when tabular tuple is unavailable - TabularFetchEmbeddingsOp: replace TypeError on non-tuple input with a call to fetch_tabular_embedding_dataframe so the pipeline can still produce embedding rows from Neo4j when upstream did not pass (tables_df, columns_df). - Add nemo_retriever.tabular_data.ingestion.embeddings module that queries Neo4j for Table/Column docs and returns an embedding-ready DataFrame matching the unstructured pipeline format. - lancedb: drop stale comment in _create_lancedb_results. --- .../tabular_fetch_embeddings_operator.py | 3 +- .../tabular_data/ingestion/embeddings.py | 122 ++++++++++++++++++ .../src/nemo_retriever/vdb/lancedb.py | 3 - 3 files changed, 124 insertions(+), 4 deletions(-) create mode 100644 nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py diff --git a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py index 0a91b5c03e..c43f812a56 100644 --- a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py @@ -12,6 +12,7 @@ from nemo_retriever.graph.abstract_operator import AbstractOperator from nemo_retriever.graph.cpu_operator import CPUOperator +from nemo_retriever.tabular_data.ingestion.embeddings import fetch_tabular_embedding_dataframe from nemo_retriever.tabular_data.ingestion.model.reserved_words import Labels @@ -53,7 +54,7 @@ def preprocess(self, data: Any, **kwargs: Any) -> Any: def process(self, data: Any, **kwargs: Any) -> pd.DataFrame: if not (isinstance(data, tuple) and len(data) == 2): - raise TypeError(f"Expected (tables_df, columns_df) tuple, got {type(data).__name__}") + return fetch_tabular_embedding_dataframe(database_name=self._database_name) tables_df, columns_df = data rows = list(self._build_rows(tables_df, columns_df)) diff --git a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py new file mode 100644 index 0000000000..d8cfb3c1ea --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py @@ -0,0 +1,122 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from typing import List + +import pandas as pd + +from nemo_retriever.tabular_data.neo4j import get_neo4j_conn +from nemo_retriever.tabular_data.ingestion.model.reserved_words import Edges, Labels + + +def query_neo4j_tables_for_embedding(database_name: str) -> List[dict]: + """Run the Neo4j query for tables not yet info_embedded; return list of doc dicts.""" + neo4j_conn = get_neo4j_conn() + query = f"""MATCH (d:{Labels.DB}{{name: $database_name}})-[:{Edges.CONTAINS}]-> + (s:{Labels.SCHEMA})-[:{Edges.CONTAINS}]->(t:{Labels.TABLE}) + MATCH (t)-[:{Edges.CONTAINS}]->(c:{Labels.COLUMN}) + WITH d, s, t, collect( + "{{name: " + c.name + ", data_type: " + c.data_type + + CASE WHEN c.description IS NOT NULL AND trim(c.description) <> '' + THEN ", description: " + c.description ELSE "" END + + "}}") as columns + RETURN collect({{ + text: "schema_name: " + s.name + + ", table_name: " + t.name + + CASE WHEN t.description IS NOT NULL AND trim(t.description) <> '' + THEN ", table_description: " + t.description ELSE "" END + + ", columns: " + apoc.text.join(columns, ' '), + name: t.name, label: labels(t)[0], id: t.id + }}) as docs + """ + result = neo4j_conn.query_read(query, parameters={"database_name": database_name}) + if not result: + return [] + return result[0].get("docs") or [] + + +def query_neo4j_columns_for_embedding(database_name: str) -> List[dict]: + """Return one doc per ``Column`` node for embedding (distinct from table-level rows).""" + neo4j_conn = get_neo4j_conn() + query = f""" + MATCH (d:{Labels.DB}{{name: $database_name}})-[:{Edges.CONTAINS}]->(s:{Labels.SCHEMA}) + -[:{Edges.CONTAINS}]->(t:{Labels.TABLE}) + -[:{Edges.CONTAINS}]->(c:{Labels.COLUMN}) + + WITH d, s, t, c, + CASE + WHEN c.description IS NOT NULL AND trim(toString(c.description)) <> '' + THEN ', column_description: ' + toString(c.description) + ELSE '' + END AS column_desc, + CASE + WHEN c.sample_values IS NOT NULL AND size(c.sample_values) > 0 + THEN ', sample_values: ' + apoc.text.join([x IN c.sample_values[..5] | toString(x)], ', ') + ELSE '' + END AS sample_vals + + RETURN collect({{ + text: 'db_name: ' + d.name + + ', schema_name: ' + s.name + + ', table_name: ' + t.name + + ', column_name: ' + c.name + + ', data_type: ' + coalesce(toString(c.data_type), '') + + column_desc + + sample_vals, + name: c.name, + label: labels(c)[0], + id: c.id + }}) AS docs + """ + result = neo4j_conn.query_read(query, parameters={"database_name": database_name}) + if not result: + return [] + return result[0].get("docs") or [] + + +def fetch_tabular_embedding_dataframe(database_name: str) -> pd.DataFrame: + """Fetch all tabular entity docs from Neo4j and return a DataFrame ready for embedding. + + Each row has: text, _embed_modality, path, page_number, metadata + (id, label, name, source_path) — matching the format produced by the + unstructured pipeline so run_pipeline_tasks_on_df works without changes. + """ + _empty = pd.DataFrame(columns=["text", "_embed_modality", "path", "page_number", "metadata"]) + table_docs = query_neo4j_tables_for_embedding(database_name=database_name) + column_docs = query_neo4j_columns_for_embedding(database_name=database_name) + docs = list(table_docs) + list(column_docs) + if not docs: + return _empty + + rows = [] + for item in docs: + text = (item.get("text") or "").strip() + node_id = item.get("id") + label = item.get("label", "") + name = item.get("name", "") + path = f"neo4j:{node_id}" if node_id is not None else "neo4j:unknown" + # Nest tabular identifiers under content_metadata so they survive the + # IngestVdbOperator → LanceDB write path (which only persists + # content_metadata + source_metadata into the table's metadata column). + # Top-level copies are kept for any in-memory consumer of this DataFrame. + tabular_fields = { + "id": node_id, + "label": label, + "name": name, + "source_path": path, + "database_name": database_name, + } + rows.append( + { + "text": text, + "_embed_modality": "text", + "path": path, + "page_number": -1, + "metadata": { + **tabular_fields, + "content_metadata": dict(tabular_fields), + }, + } + ) + return pd.DataFrame(rows) \ No newline at end of file diff --git a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py index 7870dc46d7..63bc851d2b 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py @@ -279,9 +279,6 @@ def _create_lancedb_results( logger.debug(f"No text found for entity: {source_name} page: {pg_num} type: {doc_type}") continue - # Promote a stable entity id (e.g. Neo4j node UUID for tabular - # ingest) to a top-level column so ``LanceDB.upsert`` can target a - # single row. Empty string means "no stable identity for this row". row_id = content_meta.get("id") if isinstance(content_meta, dict) else None if row_id is None and isinstance(metadata, dict): row_id = metadata.get("id") From 004fecd2acfb2333df936d607340bbc55e4e6ce6 Mon Sep 17 00:00:00 2001 From: dlaptii Date: Tue, 19 May 2026 11:05:47 +0300 Subject: [PATCH 3/9] resolve comments --- .../tabular_fetch_embeddings_operator.py | 9 +++++++++ .../src/nemo_retriever/vdb/adt_vdb.py | 19 ++++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py index c43f812a56..a7cc172907 100644 --- a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py @@ -6,6 +6,7 @@ from __future__ import annotations +import logging from typing import Any, Iterable import pandas as pd @@ -15,6 +16,8 @@ from nemo_retriever.tabular_data.ingestion.embeddings import fetch_tabular_embedding_dataframe from nemo_retriever.tabular_data.ingestion.model.reserved_words import Labels +logger = logging.getLogger(__name__) + class TabularFetchEmbeddingsOp(AbstractOperator, CPUOperator): """Build an embedding-ready DataFrame from ``(tables_df, columns_df)``. @@ -54,6 +57,12 @@ def preprocess(self, data: Any, **kwargs: Any) -> Any: def process(self, data: Any, **kwargs: Any) -> pd.DataFrame: if not (isinstance(data, tuple) and len(data) == 2): + logger.warning( + "TabularFetchEmbeddingsOp received no (tables_df, columns_df) input " + "for database %r (got %s); falling back to fetching embeddings from the database.", + self._database_name, + type(data).__name__, + ) return fetch_tabular_embedding_dataframe(database_name=self._database_name) tables_df, columns_df = data diff --git a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py index 67c781e5dc..1f42145959 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from typing import Any """Abstract Vector Database (VDB) operator API. @@ -192,11 +193,23 @@ def retrieval(self, queries: list, **kwargs): """ pass - @abstractmethod - def upsert(self, records: list, **kwargs): + def upsert(self, records: list, **kwargs: Any) -> dict[str, Any]: """Incrementally merge a batch of records into the target table/index. - ``upsert`` exists as a separate abstract entry point from + Note: this method is intentionally **not** decorated with + :func:`abc.abstractmethod`. Marking it abstract would cause + Python's ABC machinery to refuse instantiation of any concrete + :class:`VDB` subclass that does not override ``upsert`` — which + would in turn make the early-detection guard in + :class:`~nemo_retriever.vdb.operators.UpsertVdbOperator` (which + compares ``type(self._vdb).upsert is VDB.upsert``) permanently + unreachable, since instantiation would already have failed. + The default body below raises :class:`NotImplementedError` so + backends that have not implemented stable-key merges fail fast + and visibly at the first ``upsert`` call (and are caught by the + operator-level guard at construction time). + + ``upsert`` exists as a separate entry point from :meth:`write_to_index` because it has fundamentally different semantics. Where ``write_to_index`` is an *append* (or full ingest) operation, ``upsert`` is a **stable-key merge**: From 200e2adaba6bfb53f5b4c0f38fe0a3a156d9f2aa Mon Sep 17 00:00:00 2001 From: dlaptii Date: Tue, 19 May 2026 11:21:32 +0300 Subject: [PATCH 4/9] fix test --- .../tests/test_nv_ingest_vdb_operator.py | 140 +++++++++++++++++- 1 file changed, 137 insertions(+), 3 deletions(-) diff --git a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py index 1c2800b585..554d407437 100644 --- a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py +++ b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py @@ -6,11 +6,13 @@ from typing import Any +import pandas as pd import pytest from nemo_retriever.vdb.adt_vdb import VDB from nemo_retriever.vdb import IngestVdbOperator, RetrieveVdbOperator from nemo_retriever.vdb import operators as vdb_operator_module +from nemo_retriever.vdb.operators import UpsertVdbOperator class FakeVDB(VDB): @@ -18,6 +20,9 @@ def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.run_calls: list[Any] = [] self.retrieval_calls: list[tuple[Any, dict[str, Any]]] = [] + # Per-call (records, kwargs) captures so UpsertVdbOperator tests can + # assert that ``key`` / ``table_name`` are forwarded correctly. + self.upsert_calls: list[tuple[Any, dict[str, Any]]] = [] def create_index(self, **kwargs: Any) -> None: return None @@ -46,9 +51,10 @@ def retrieval(self, vectors: list, **kwargs: Any) -> list[list[dict[str, Any]]]: def run(self, records: Any) -> dict[str, Any]: self.run_calls.append(records) return {"records": records} - - def upsert(self, records: list, **kwargs: Any) -> None: - return None + + def upsert(self, records: list, **kwargs: Any) -> dict[str, Any]: + self.upsert_calls.append((records, dict(kwargs))) + return {"upserted": sum(len(b) for b in records), "skipped_no_key": 0, "created_table": False} def _graph_rows() -> list[dict[str, Any]]: @@ -170,3 +176,131 @@ def test_constructor_requires_exactly_one_vdb_source() -> None: with pytest.raises(ValueError, match="Pass either vdb or vdb_op"): IngestVdbOperator(vdb=FakeVDB(), vdb_op="lancedb") + + +# ────────────────────────────────────────────────────────────────────────────── +# UpsertVdbOperator +# ────────────────────────────────────────────────────────────────────────────── + + +class _StubUpsertVDB(VDB): + """VDB subclass that intentionally does NOT override ``upsert``. + + Used to exercise the construction-time guard in + :class:`UpsertVdbOperator.__init__`, which compares + ``type(self._vdb).upsert is VDB.upsert`` to detect backends that + inherit the base-class ``NotImplementedError`` stub. + + Note: this class being instantiable at all is itself a regression + check — :meth:`VDB.upsert` must NOT be decorated with + ``@abstractmethod``; otherwise ABC machinery would reject this class + before the operator-level guard could run, making the guard dead code. + """ + + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + + def create_index(self, **kwargs: Any) -> None: + return None + + def write_to_index(self, records: list, **kwargs: Any) -> None: + return None + + def retrieval(self, queries: list, **kwargs: Any) -> list[list[dict[str, Any]]]: + return [] + + def run(self, records: Any) -> None: + return None + + +def test_upsert_operator_rejects_vdb_without_upsert_override() -> None: + """Backends inheriting the ``VDB.upsert`` stub fail fast at construction.""" + stub = _StubUpsertVDB() + # Sanity-check the precondition the guard relies on: the subclass really + # is using the inherited stub, not its own implementation. If this ever + # fails, the guard's identity comparison would silently never fire. + assert type(stub).upsert is VDB.upsert + + with pytest.raises(NotImplementedError, match=r"does not implement upsert"): + UpsertVdbOperator(vdb=stub) + + +def test_upsert_operator_delegates_records_with_configured_key_and_table_name() -> None: + """Happy path: nv-ingest-converted records reach ``vdb.upsert`` with the configured key/table.""" + vdb = FakeVDB() + operator = UpsertVdbOperator(vdb=vdb, key="entity_id", table_name="entities") + + data = [ + { + "text": "graph chunk", + "text_embeddings_1b_v2": {"embedding": [0.1] * 2048}, + "source_id": "/tmp/doc-a.pdf", + "page_number": 7, + } + ] + + assert operator(data) is data + + assert vdb.run_calls == [] + assert len(vdb.upsert_calls) == 1 + call_records, call_kwargs = vdb.upsert_calls[0] + assert call_kwargs == {"table_name": "entities", "key": "entity_id"} + # The records that reach the backend must already be in nv-ingest-client + # shape (same conversion IngestVdbOperator performs), not the flat graph rows. + assert call_records == [ + [ + { + "document_type": "text", + "metadata": { + "embedding": [0.1] * 2048, + "content": "graph chunk", + "content_metadata": {"page_number": 7}, + "source_metadata": { + "source_id": "/tmp/doc-a.pdf", + "source_name": "doc-a.pdf", + }, + }, + } + ] + ] + + +def test_upsert_operator_merges_sidecar_metadata_into_records_before_upsert() -> None: + """Sidecar kwargs are split out from ``vdb_kwargs`` and applied before delegation.""" + vdb = FakeVDB() + meta_df = pd.DataFrame( + { + "source_id": ["/tmp/doc-a.pdf"], + "category": ["legal"], + } + ) + operator = UpsertVdbOperator( + vdb=vdb, + vdb_kwargs={ + "meta_dataframe": meta_df, + "meta_source_field": "source_id", + "meta_fields": ["category"], + "meta_join_key": "source_id", + }, + key="id", + table_name="my_table", + ) + + data = [ + { + "text": "graph chunk", + "text_embeddings_1b_v2": {"embedding": [0.1] * 2048}, + "source_id": "/tmp/doc-a.pdf", + "page_number": 7, + } + ] + + assert operator.process(data) is data + + assert len(vdb.upsert_calls) == 1 + call_records, call_kwargs = vdb.upsert_calls[0] + assert call_kwargs == {"table_name": "my_table", "key": "id"} + merged_content_meta = call_records[0][0]["metadata"]["content_metadata"] + # Sidecar column merged in alongside the per-row ``page_number``. + assert merged_content_meta["category"] == "legal" + assert merged_content_meta["page_number"] == 7 From 2b3ca75b891bb7c9729ec267c3d6ac8c7ea9c18e Mon Sep 17 00:00:00 2001 From: dlaptii Date: Tue, 19 May 2026 15:37:02 +0300 Subject: [PATCH 5/9] fix --- .cursorignore | 27 +++++++++++++++++++ .../tabular_fetch_embeddings_operator.py | 2 +- .../graph/tabular_schema_extract_operator.py | 2 +- .../tabular_data/ingestion/embeddings.py | 2 +- .../tabular_data/ingestion/extract_data.py | 2 +- .../tabular_data/ingestion/write_to_graph.py | 2 +- .../src/nemo_retriever/vdb/adt_vdb.py | 2 +- .../src/nemo_retriever/vdb/lancedb.py | 7 +---- .../src/nemo_retriever/vdb/operators.py | 1 - .../tests/test_nv_ingest_vdb_operator.py | 2 -- spider2 | 1 + 11 files changed, 35 insertions(+), 15 deletions(-) create mode 100644 .cursorignore create mode 160000 spider2 diff --git a/.cursorignore b/.cursorignore new file mode 100644 index 0000000000..86b87757b7 --- /dev/null +++ b/.cursorignore @@ -0,0 +1,27 @@ +# Large Database and Data Folders (The main culprits) +lancedb/ +data/ +datasets/ +models/ +checkpoints/ + +# Virtual Environments and Caches +.venv/ +venv/ +__pycache__/ +.pytest_cache/ +.cache/ + +# AI Agent & Third-Party Indexing Folders +.agents/ +.claude/ +.greptile/ +.github/ +.devcontainer/ + +# Build and Documentation Artifacts +docs/ +developer_docs/ +site/ +dist/ +build/ diff --git a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py index a7cc172907..6db5240f30 100644 --- a/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py @@ -237,4 +237,4 @@ def _create_row( **tabular_fields, "content_metadata": dict(tabular_fields), }, - } \ No newline at end of file + } diff --git a/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py b/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py index 8b4031e186..eca68d435d 100644 --- a/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py @@ -71,4 +71,4 @@ def process(self, data: TabularExtractParams | None, **kwargs: Any) -> tuple[pd. return tables_df, columns_df def postprocess(self, data: Any, **kwargs: Any) -> Any: - return data \ No newline at end of file + return data diff --git a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py index d8cfb3c1ea..a5ec9d72be 100644 --- a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py +++ b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/embeddings.py @@ -119,4 +119,4 @@ def fetch_tabular_embedding_dataframe(database_name: str) -> pd.DataFrame: }, } ) - return pd.DataFrame(rows) \ No newline at end of file + return pd.DataFrame(rows) diff --git a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py index b5745b86c9..7c3a3e49d5 100644 --- a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py +++ b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py @@ -79,4 +79,4 @@ def store_relational_db_in_neo4j(data, dialect: str, num_workers: int = 4): data, num_workers=num_workers, dialect=dialect, - ) \ No newline at end of file + ) diff --git a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py index 81e9a3fe57..af2b917ba0 100644 --- a/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py +++ b/nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py @@ -139,4 +139,4 @@ def populate_pks(pks, database_name: str): def _update_schema(schema, latest_timestamp): update_diff_from_existing_schema(schema, latest_timestamp) - logger.info(f"Updated schema {schema.get_schema_name()} to db.") \ No newline at end of file + logger.info(f"Updated schema {schema.get_schema_name()} to db.") diff --git a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py index 1f42145959..6d3e9083c1 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py @@ -321,4 +321,4 @@ def reindex(self, records: list, **kwargs): Returns: - implementation-specific result """ - pass \ No newline at end of file + pass diff --git a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py index 63bc851d2b..03eea63bc0 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py @@ -648,12 +648,7 @@ def upsert( table = db.open_table(target_name) upsert_start = time.perf_counter() - ( - table.merge_insert(key) - .when_matched_update_all() - .when_not_matched_insert_all() - .execute(rows_with_key) - ) + (table.merge_insert(key).when_matched_update_all().when_not_matched_insert_all().execute(rows_with_key)) _record_timing( "lancedb.upsert", time.perf_counter() - upsert_start, diff --git a/nemo_retriever/src/nemo_retriever/vdb/operators.py b/nemo_retriever/src/nemo_retriever/vdb/operators.py index 5b75efc178..5cb995d483 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/operators.py +++ b/nemo_retriever/src/nemo_retriever/vdb/operators.py @@ -189,7 +189,6 @@ def process(self, data: Any, **kwargs: Any) -> Any: return data - class RetrieveVdbOperator(AbstractOperator): """Retrieve hits from an nv-ingest-client VDB using precomputed query vectors.""" diff --git a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py index 554d407437..6052351023 100644 --- a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py +++ b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py @@ -20,8 +20,6 @@ def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.run_calls: list[Any] = [] self.retrieval_calls: list[tuple[Any, dict[str, Any]]] = [] - # Per-call (records, kwargs) captures so UpsertVdbOperator tests can - # assert that ``key`` / ``table_name`` are forwarded correctly. self.upsert_calls: list[tuple[Any, dict[str, Any]]] = [] def create_index(self, **kwargs: Any) -> None: diff --git a/spider2 b/spider2 new file mode 160000 index 0000000000..01a4c67c1e --- /dev/null +++ b/spider2 @@ -0,0 +1 @@ +Subproject commit 01a4c67c1e3f6ab9032716b050a927abbb245f65 From b68d7bd3a288a11fd52328f3efb6fe375a61091d Mon Sep 17 00:00:00 2001 From: dlaptii Date: Tue, 19 May 2026 15:44:11 +0300 Subject: [PATCH 6/9] fix --- .cursorignore | 27 --------------------------- spider2 | 1 - 2 files changed, 28 deletions(-) delete mode 100644 .cursorignore delete mode 160000 spider2 diff --git a/.cursorignore b/.cursorignore deleted file mode 100644 index 86b87757b7..0000000000 --- a/.cursorignore +++ /dev/null @@ -1,27 +0,0 @@ -# Large Database and Data Folders (The main culprits) -lancedb/ -data/ -datasets/ -models/ -checkpoints/ - -# Virtual Environments and Caches -.venv/ -venv/ -__pycache__/ -.pytest_cache/ -.cache/ - -# AI Agent & Third-Party Indexing Folders -.agents/ -.claude/ -.greptile/ -.github/ -.devcontainer/ - -# Build and Documentation Artifacts -docs/ -developer_docs/ -site/ -dist/ -build/ diff --git a/spider2 b/spider2 deleted file mode 160000 index 01a4c67c1e..0000000000 --- a/spider2 +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 01a4c67c1e3f6ab9032716b050a927abbb245f65 From 729a9d4a4c80d2dcf620fc34a28b4af8dc6f6496 Mon Sep 17 00:00:00 2001 From: dlaptii Date: Thu, 21 May 2026 13:35:48 +0300 Subject: [PATCH 7/9] refactor(vdb): rename upsert to put with strict update-only semantics Rename `VDB.upsert` / `UpsertVdbOperator` to `VDB.put` / `PutVdbOperator` and tighten the contract to an in-place replace: missing keys and rows not already present in the target table now raise `KeyError` instead of being inserted, and `put` no longer creates tables on the fly. Updates the LanceDB implementation and the operator tests accordingly. --- .../src/nemo_retriever/vdb/adt_vdb.py | 76 +++++----- .../src/nemo_retriever/vdb/lancedb.py | 132 +++++++----------- .../src/nemo_retriever/vdb/operators.py | 29 ++-- .../tests/test_nv_ingest_vdb_operator.py | 52 +++---- 4 files changed, 128 insertions(+), 161 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py index 47a5fbb4a7..d99378638d 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py @@ -134,89 +134,87 @@ def retrieval(self, queries: list, **kwargs): """ pass - def upsert(self, records: list, **kwargs: Any) -> dict[str, Any]: - """Incrementally merge a batch of records into the target table/index. + def put(self, records: list, **kwargs: Any) -> dict[str, Any]: + """Replace a batch of existing rows in the target table/index. Note: this method is intentionally **not** decorated with :func:`abc.abstractmethod`. Marking it abstract would cause Python's ABC machinery to refuse instantiation of any concrete - :class:`VDB` subclass that does not override ``upsert`` — which + :class:`VDB` subclass that does not override ``put`` — which would in turn make the early-detection guard in - :class:`~nemo_retriever.vdb.operators.UpsertVdbOperator` (which - compares ``type(self._vdb).upsert is VDB.upsert``) permanently + :class:`~nemo_retriever.vdb.operators.PutVdbOperator` (which + compares ``type(self._vdb).put is VDB.put``) permanently unreachable, since instantiation would already have failed. The default body below raises :class:`NotImplementedError` so - backends that have not implemented stable-key merges fail fast - and visibly at the first ``upsert`` call (and are caught by the + backends that have not implemented stable-key puts fail fast + and visibly at the first ``put`` call (and are caught by the operator-level guard at construction time). - ``upsert`` exists as a separate entry point from + ``put`` exists as a separate entry point from :meth:`write_to_index` because it has fundamentally different semantics. Where ``write_to_index`` is an *append* (or full - ingest) operation, ``upsert`` is a **stable-key merge**: + ingest) operation, ``put`` is a **strict in-place replace**: * Rows whose key value already exists in the target table are **updated in place** — all stored columns (including the dense vector) are replaced with the values from ``records``. - * Rows whose key value is absent are **inserted**. + * Rows whose key value does not exist in the target table + MUST raise :class:`KeyError`. ``put`` MUST NOT insert new + rows; ingestion of new rows belongs in + :meth:`write_to_index` / :meth:`run`. + * Rows whose key value is empty or ``None`` MUST raise + :class:`KeyError` — a put has no stable identity to target + without a key. * Rows that already exist in the target but are *not* referenced - by ``records`` are **left untouched**. ``upsert`` MUST NOT - delete rows; tombstoning entities that have disappeared - upstream is intentionally out of scope and belongs in a - separate code path. + by ``records`` are **left untouched**. ``put`` MUST NOT + delete rows. - This contract makes ``upsert`` suitable for incremental metadata - patches and partial re-ingests where the caller knows the stable - identity of the rows it is changing but does not want to rebuild - the whole index. + This contract makes ``put`` suitable for in-place metadata + patches where the caller knows the exact set of existing rows + it wants to change and would rather fail loudly than silently + no-op or duplicate data. Implementations are expected to: * Validate / transform records the same way :meth:`write_to_index` does (e.g. enforce the embedding dimension, apply the - ``on_bad_vectors`` policy), so that an upserted row is + ``on_bad_vectors`` policy), so that a put row is indistinguishable from one written via the full-ingest path. - * Drop rows whose ``key`` value is empty or ``None`` — an empty - merge key has no stable identity and would otherwise collapse - unrelated rows together. Skipped rows should be logged. - * Create the target table/index on the fly when it does not yet - exist (e.g. a metadata patch lands before the first full - ingest), if the backend supports it. Race-tolerance is - recommended: if a parallel writer wins the create, fall back - to opening the existing table and performing the merge. + * Raise :class:`FileNotFoundError` (or an equivalent) when the + target table does not yet exist. ``put`` MUST NOT create + tables on the fly. * Avoid building heavy secondary structures (e.g. IVF/HNSW - vector indexes, FTS indexes) on the upsert path: incremental + vector indexes, FTS indexes) on the put path: incremental batches are typically too small to train such indexes meaningfully. Defer index builds to the next full :meth:`write_to_index` / :meth:`create_index` call. Parameters: - records (list): NV-Ingest-shaped batches (typically a list of - lists of record dicts) to merge into the target. The shape + lists of record dicts) to put into the target. The shape mirrors what :meth:`write_to_index` accepts. - table_name (str, optional): override the operator's configured target table/index name for this call. When ``None``, the implementation should use its default target. - key (str, optional): name of the column used as the stable - merge key. Defaults to ``"id"``. Rows missing this column - (or with an empty value) should be skipped. + put key. Defaults to ``"id"``. Rows missing this column + (or with an empty value) MUST raise :class:`KeyError`. Returns: - implementation-specific result describing what happened - (typical fields include the number of rows merged, the - number of rows skipped for missing keys, and whether the - target table had to be created on the fly). Concrete - implementations should document the exact return shape. + (typical fields include the number of rows put). + Concrete implementations should document the exact return + shape. - Backends that genuinely cannot support stable-key merges should + Backends that genuinely cannot support stable-key puts should override this method and raise :class:`NotImplementedError` - explicitly so that :class:`UpsertVdbOperator` (and any other + explicitly so that :class:`PutVdbOperator` (and any other caller) fails fast with a clear message instead of silently no-oping or duplicating rows. """ raise NotImplementedError( - f"{type(self).__name__} does not implement upsert(); " - "incremental stable-key merges are not supported by this VDB backend." + f"{type(self).__name__} does not implement put(); " + "in-place stable-key puts are not supported by this VDB backend." ) @abstractmethod diff --git a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py index 853a8546bb..f43a3a1d55 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/lancedb.py +++ b/nemo_retriever/src/nemo_retriever/vdb/lancedb.py @@ -12,6 +12,7 @@ import lancedb import pyarrow as pa +import pyarrow.compute as pc from nemo_retriever.vdb.adt_vdb import VDB @@ -119,9 +120,6 @@ def _lancedb_arrow_schema(vector_dim: int) -> pa.Schema: pa.field("text", pa.string()), pa.field("metadata", pa.string()), pa.field("source", pa.string()), - # Stable entity id (e.g. Neo4j node UUID for tabular ingest). - # Empty string when the upstream payload has no stable identity; - # used as the merge key by ``LanceDB.upsert``. pa.field("id", pa.string()), ] ) @@ -539,40 +537,35 @@ def run(self, records): logger.info("Skipping LanceDB index creation for table %r because build_index=False.", self.table_name) return records - def upsert( + def put( self, records, table_name: str | None = None, key: str = "id", - ) -> dict[str, int | bool]: - """Merge-insert ``records`` into a LanceDB table on ``key``. + ) -> dict[str, int]: + """Replace existing rows of a LanceDB table in place, keyed by ``key``. - Stable-key incremental update: + Strict update-only semantics: * Rows matching an existing row by ``key`` are **updated in place** (all columns, including ``vector``, are replaced). - * Rows whose ``key`` is absent in the table are **inserted**. + * Rows whose ``key`` value is missing/empty raise :class:`KeyError` + — a put operation has no stable identity to target without a key. + * Rows whose ``key`` value does not match any row currently in the + table raise :class:`KeyError` — ``put`` never inserts new rows. * Rows already in the table that are *not* referenced are **left - untouched** — this method never deletes. + untouched** — ``put`` never deletes. - If the target table does not yet exist (e.g. an incremental update - lands before the first full ingest), it is created on the fly with - the same Arrow schema as :meth:`create_index`. The create branch is - race-tolerant: if a parallel writer wins ``create_table``, we - re-open and fall through to ``merge_insert``. + If the target table does not exist, :class:`FileNotFoundError` is + raised; ``put`` will not create tables on the fly. - Vector / FTS indexes are intentionally **not** built here: - incremental upserts typically carry only one or two rows and IVF - training needs at least two. Indexes will be (re)built by the next - full :meth:`run` / :meth:`write_to_index` call. - - Rows whose ``key`` value is empty/``None`` are skipped — an empty - merge key has no stable identity and would otherwise collapse - unrelated rows together. + Vector / FTS indexes are intentionally **not** rebuilt here: + incremental puts typically carry only a handful of rows. Indexes + will be (re)built by the next full :meth:`run` / + :meth:`write_to_index` call. Returns the row counts dict from :func:`_create_lancedb_results` - plus: ``upserted``, ``inserted_via_create``, ``skipped_no_key``, - ``created_table``. + plus: ``put``. """ target_name = table_name or self.table_name connect_start = time.perf_counter() @@ -585,77 +578,52 @@ def upsert( expected_dim = None rows, counts = _create_lancedb_results(records or [], expected_dim=expected_dim) + counts["put"] = 0 - rows_with_key = [r for r in rows if r.get(key)] - skipped_no_key = len(rows) - len(rows_with_key) - if skipped_no_key: - logger.warning( - "LanceDB.upsert: dropping %d row(s) with empty %r — cannot upsert without a stable key.", - skipped_no_key, - key, - ) - - counts["upserted"] = 0 - counts["inserted_via_create"] = 0 - counts["skipped_no_key"] = skipped_no_key - counts["created_table"] = False - - if not rows_with_key: - logger.info("LanceDB.upsert: nothing to write to table %r.", target_name) + if not rows: + logger.info("LanceDB.put: nothing to put into table %r.", target_name) return counts + rows_missing_key = [r for r in rows if not r.get(key)] + if rows_missing_key: + raise KeyError( + f"LanceDB.put: {len(rows_missing_key)} row(s) have an empty {key!r} value; " + "put() requires a stable id for every row." + ) + try: table = db.open_table(target_name) except (ValueError, FileNotFoundError) as exc: if isinstance(exc, ValueError) and not _is_missing_lancedb_table_error(exc): raise - logger.info( - "LanceDB.upsert: target table %r not found at uri=%r (%s); " - "creating it on the fly from %d row(s). Indexes will be built " - "by the next full ingest.", - target_name, - self.uri, - exc, - len(rows_with_key), + raise FileNotFoundError( + f"LanceDB.put: table {target_name!r} not found at uri={self.uri!r}; " + "put() only updates existing rows and will not create tables." + ) from exc + + input_ids = [r[key] for r in rows] + unique_input_ids = list(dict.fromkeys(input_ids)) + + filter_expr = pc.field(key).isin(pa.array(unique_input_ids, type=pa.string())) + existing_arrow = table.to_lance().to_table(columns=[key], filter=filter_expr) + existing_ids = set(existing_arrow.column(key).to_pylist()) + + missing_ids = [i for i in unique_input_ids if i not in existing_ids] + if missing_ids: + raise KeyError( + f"LanceDB.put: row(s) with {key}={missing_ids!r} not found in table " + f"{target_name!r}; put() only updates existing rows." ) - schema = _lancedb_arrow_schema(self.vector_dim) - create_kwargs: dict[str, Any] = { - "data": rows_with_key, - "schema": schema, - "mode": "create", - "on_bad_vectors": self.on_bad_vectors, - } - if self.on_bad_vectors == "fill": - create_kwargs["fill_value"] = self.fill_value - try: - create_start = time.perf_counter() - table = db.create_table(target_name, **create_kwargs) - _record_timing( - "lancedb.upsert_create_table", - time.perf_counter() - create_start, - {"rows": len(rows_with_key), "table": target_name}, - ) - counts["inserted_via_create"] = len(rows_with_key) - counts["created_table"] = True - return counts - except (ValueError, FileExistsError) as race_exc: - logger.info( - "LanceDB.upsert: race on create_table(%r) (%s); " - "falling back to merge_insert on the existing table.", - target_name, - race_exc, - ) - table = db.open_table(target_name) - upsert_start = time.perf_counter() - (table.merge_insert(key).when_matched_update_all().when_not_matched_insert_all().execute(rows_with_key)) + put_start = time.perf_counter() + table.merge_insert(key).when_matched_update_all().execute(rows) _record_timing( - "lancedb.upsert", - time.perf_counter() - upsert_start, - {"rows": len(rows_with_key), "table": target_name}, + "lancedb.put", + time.perf_counter() - put_start, + {"rows": len(rows), "table": target_name}, ) - counts["upserted"] = len(rows_with_key) + counts["put"] = len(rows) return counts def retrieval(self, vectors, **kwargs): diff --git a/nemo_retriever/src/nemo_retriever/vdb/operators.py b/nemo_retriever/src/nemo_retriever/vdb/operators.py index a07e581986..53db729517 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/operators.py +++ b/nemo_retriever/src/nemo_retriever/vdb/operators.py @@ -134,19 +134,21 @@ def postprocess(self, data: Any, **kwargs: Any) -> Any: return data -class UpsertVdbOperator(IngestVdbOperator): - """Incrementally update an existing VDB table on a stable row key. +class PutVdbOperator(IngestVdbOperator): + """Replace existing rows of a VDB table in place on a stable row key. Unlike :class:`IngestVdbOperator` (which orchestrates create_index + write_to_index, optionally overwriting the whole table), this operator - calls ``vdb.upsert(records, ...)`` so that only rows whose ``key`` is in - ``records`` are touched. Rows in the table that are not referenced are - left untouched, and existing rows that match by ``key`` are replaced. + calls ``vdb.put(records, ...)`` so that only rows whose ``key`` is in + ``records`` are touched. Existing rows that match by ``key`` are + replaced; rows in ``records`` whose ``key`` is not already present in + the table raise :class:`KeyError` (``put`` never inserts new rows), + and rows in the table that are not referenced are left untouched. The underlying VDB implementation must override - :meth:`~nemo_retriever.vdb.adt_vdb.VDB.upsert` with a real - stable-key merge; currently this is implemented by - :class:`~nemo_retriever.vdb.lancedb.LanceDB`. ``VDB.upsert`` itself + :meth:`~nemo_retriever.vdb.adt_vdb.VDB.put` with a real + stable-key in-place replace; currently this is implemented by + :class:`~nemo_retriever.vdb.lancedb.LanceDB`. ``VDB.put`` itself raises :class:`NotImplementedError`, so backends that have not overridden it are detected at construction time and fail fast rather than silently no-oping at runtime. @@ -162,15 +164,14 @@ def __init__( table_name: str | None = None, ) -> None: super().__init__(vdb=vdb, vdb_op=vdb_op, vdb_kwargs=vdb_kwargs) - # ``upsert`` is part of the abstract VDB contract, but the base + # ``put`` is part of the abstract VDB contract, but the base # class provides a NotImplementedError stub for backends that - # cannot support stable-key merges. Treat a not-overridden stub + # cannot support stable-key puts. Treat a not-overridden stub # as "unsupported" so misuse surfaces here instead of at the # first write. - if getattr(type(self._vdb), "upsert", None) is VDB.upsert: + if getattr(type(self._vdb), "put", None) is VDB.put: raise NotImplementedError( - f"VDB backend {type(self._vdb).__name__!r} does not implement upsert(); " - "only LanceDB is currently supported for incremental updates." + f"VDB backend {type(self._vdb).__name__!r} does not implement put(); " ) self._key = key self._table_name = table_name @@ -185,7 +186,7 @@ def process(self, data: Any, **kwargs: Any) -> Any: join_key=self._sidecar_spec["meta_join_key"], ) if records and any(batch for batch in records): - self._vdb.upsert(records, table_name=self._table_name, key=self._key) + self._vdb.put(records, table_name=self._table_name, key=self._key) return data diff --git a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py index 9182696471..70c1f5bf43 100644 --- a/nemo_retriever/tests/test_nv_ingest_vdb_operator.py +++ b/nemo_retriever/tests/test_nv_ingest_vdb_operator.py @@ -12,7 +12,7 @@ from nemo_retriever.vdb.adt_vdb import VDB from nemo_retriever.vdb import IngestVdbOperator, RetrieveVdbOperator from nemo_retriever.vdb import operators as vdb_operator_module -from nemo_retriever.vdb.operators import UpsertVdbOperator +from nemo_retriever.vdb.operators import PutVdbOperator class FakeVDB(VDB): @@ -20,7 +20,7 @@ def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self.run_calls: list[Any] = [] self.retrieval_calls: list[tuple[Any, dict[str, Any]]] = [] - self.upsert_calls: list[tuple[Any, dict[str, Any]]] = [] + self.put_calls: list[tuple[Any, dict[str, Any]]] = [] def create_index(self, **kwargs: Any) -> None: return None @@ -50,9 +50,9 @@ def run(self, records: Any) -> dict[str, Any]: self.run_calls.append(records) return {"records": records} - def upsert(self, records: list, **kwargs: Any) -> dict[str, Any]: - self.upsert_calls.append((records, dict(kwargs))) - return {"upserted": sum(len(b) for b in records), "skipped_no_key": 0, "created_table": False} + def put(self, records: list, **kwargs: Any) -> dict[str, Any]: + self.put_calls.append((records, dict(kwargs))) + return {"put": sum(len(b) for b in records)} def _graph_rows() -> list[dict[str, Any]]: @@ -177,20 +177,20 @@ def test_constructor_requires_exactly_one_vdb_source() -> None: # ────────────────────────────────────────────────────────────────────────────── -# UpsertVdbOperator +# PutVdbOperator # ────────────────────────────────────────────────────────────────────────────── -class _StubUpsertVDB(VDB): - """VDB subclass that intentionally does NOT override ``upsert``. +class _StubPutVDB(VDB): + """VDB subclass that intentionally does NOT override ``put``. Used to exercise the construction-time guard in - :class:`UpsertVdbOperator.__init__`, which compares - ``type(self._vdb).upsert is VDB.upsert`` to detect backends that + :class:`PutVdbOperator.__init__`, which compares + ``type(self._vdb).put is VDB.put`` to detect backends that inherit the base-class ``NotImplementedError`` stub. Note: this class being instantiable at all is itself a regression - check — :meth:`VDB.upsert` must NOT be decorated with + check — :meth:`VDB.put` must NOT be decorated with ``@abstractmethod``; otherwise ABC machinery would reject this class before the operator-level guard could run, making the guard dead code. """ @@ -211,22 +211,22 @@ def run(self, records: Any) -> None: return None -def test_upsert_operator_rejects_vdb_without_upsert_override() -> None: - """Backends inheriting the ``VDB.upsert`` stub fail fast at construction.""" - stub = _StubUpsertVDB() +def test_put_operator_rejects_vdb_without_put_override() -> None: + """Backends inheriting the ``VDB.put`` stub fail fast at construction.""" + stub = _StubPutVDB() # Sanity-check the precondition the guard relies on: the subclass really # is using the inherited stub, not its own implementation. If this ever # fails, the guard's identity comparison would silently never fire. - assert type(stub).upsert is VDB.upsert + assert type(stub).put is VDB.put - with pytest.raises(NotImplementedError, match=r"does not implement upsert"): - UpsertVdbOperator(vdb=stub) + with pytest.raises(NotImplementedError, match=r"does not implement put"): + PutVdbOperator(vdb=stub) -def test_upsert_operator_delegates_records_with_configured_key_and_table_name() -> None: - """Happy path: nv-ingest-converted records reach ``vdb.upsert`` with the configured key/table.""" +def test_put_operator_delegates_records_with_configured_key_and_table_name() -> None: + """Happy path: nv-ingest-converted records reach ``vdb.put`` with the configured key/table.""" vdb = FakeVDB() - operator = UpsertVdbOperator(vdb=vdb, key="entity_id", table_name="entities") + operator = PutVdbOperator(vdb=vdb, key="entity_id", table_name="entities") data = [ { @@ -240,8 +240,8 @@ def test_upsert_operator_delegates_records_with_configured_key_and_table_name() assert operator(data) is data assert vdb.run_calls == [] - assert len(vdb.upsert_calls) == 1 - call_records, call_kwargs = vdb.upsert_calls[0] + assert len(vdb.put_calls) == 1 + call_records, call_kwargs = vdb.put_calls[0] assert call_kwargs == {"table_name": "entities", "key": "entity_id"} # The records that reach the backend must already be in nv-ingest-client # shape (same conversion IngestVdbOperator performs), not the flat graph rows. @@ -263,7 +263,7 @@ def test_upsert_operator_delegates_records_with_configured_key_and_table_name() ] -def test_upsert_operator_merges_sidecar_metadata_into_records_before_upsert() -> None: +def test_put_operator_merges_sidecar_metadata_into_records_before_put() -> None: """Sidecar kwargs are split out from ``vdb_kwargs`` and applied before delegation.""" vdb = FakeVDB() meta_df = pd.DataFrame( @@ -272,7 +272,7 @@ def test_upsert_operator_merges_sidecar_metadata_into_records_before_upsert() -> "category": ["legal"], } ) - operator = UpsertVdbOperator( + operator = PutVdbOperator( vdb=vdb, vdb_kwargs={ "meta_dataframe": meta_df, @@ -295,8 +295,8 @@ def test_upsert_operator_merges_sidecar_metadata_into_records_before_upsert() -> assert operator.process(data) is data - assert len(vdb.upsert_calls) == 1 - call_records, call_kwargs = vdb.upsert_calls[0] + assert len(vdb.put_calls) == 1 + call_records, call_kwargs = vdb.put_calls[0] assert call_kwargs == {"table_name": "my_table", "key": "id"} merged_content_meta = call_records[0][0]["metadata"]["content_metadata"] # Sidecar column merged in alongside the per-row ``page_number``. From c4dc2dc1124b684623004c088673285658b9b133 Mon Sep 17 00:00:00 2001 From: dlaptii Date: Sun, 24 May 2026 09:44:44 +0300 Subject: [PATCH 8/9] fix(vdb): export PutVdbOperator from package init Add PutVdbOperator to the public API alongside IngestVdbOperator and RetrieveVdbOperator so callers using the standard package import path (`from nemo_retriever.vdb import PutVdbOperator`) can access it. --- nemo_retriever/src/nemo_retriever/vdb/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nemo_retriever/src/nemo_retriever/vdb/__init__.py b/nemo_retriever/src/nemo_retriever/vdb/__init__.py index 2c03d7a669..cd4c4f3548 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/__init__.py +++ b/nemo_retriever/src/nemo_retriever/vdb/__init__.py @@ -6,7 +6,7 @@ from nemo_retriever.vdb.adt_vdb import VDB from nemo_retriever.vdb.factory import get_vdb_op_cls -from nemo_retriever.vdb.operators import IngestVdbOperator, RetrieveVdbOperator +from nemo_retriever.vdb.operators import IngestVdbOperator, PutVdbOperator, RetrieveVdbOperator from nemo_retriever.vdb.records import RetrievalHit, normalize_retrieval_results, to_client_vdb_records from nemo_retriever.vdb.sidecar_metadata import ( apply_sidecar_metadata_to_client_batches, @@ -22,6 +22,7 @@ "VDB", "get_vdb_op_cls", "IngestVdbOperator", + "PutVdbOperator", "RetrievalHit", "RetrieveVdbOperator", "normalize_retrieval_results", From 1ebe6e6a8806eaf20982775e9ee4c20f87ca01b4 Mon Sep 17 00:00:00 2001 From: dlaptii Date: Sun, 24 May 2026 11:28:25 +0300 Subject: [PATCH 9/9] style(vdb): apply pre-commit formatting to PutVdbOperator Collapse the multi-line NotImplementedError raise into a single line to match what the pre-commit hook produces, unblocking CI. --- nemo_retriever/src/nemo_retriever/vdb/operators.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nemo_retriever/src/nemo_retriever/vdb/operators.py b/nemo_retriever/src/nemo_retriever/vdb/operators.py index 53db729517..807ae7736b 100644 --- a/nemo_retriever/src/nemo_retriever/vdb/operators.py +++ b/nemo_retriever/src/nemo_retriever/vdb/operators.py @@ -170,9 +170,7 @@ def __init__( # as "unsupported" so misuse surfaces here instead of at the # first write. if getattr(type(self._vdb), "put", None) is VDB.put: - raise NotImplementedError( - f"VDB backend {type(self._vdb).__name__!r} does not implement put(); " - ) + raise NotImplementedError(f"VDB backend {type(self._vdb).__name__!r} does not implement put(); ") self._key = key self._table_name = table_name