Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,45 @@
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Graph operator: fetch tabular entity descriptions from Neo4j into an embedding-ready DataFrame."""
"""Graph operator: turn ``(tables_df, columns_df)`` into embedding-ready rows."""

from __future__ import annotations

from typing import Any
import logging
from typing import Any, Iterable

import pandas as pd

from nemo_retriever.graph.abstract_operator import AbstractOperator
from nemo_retriever.graph.cpu_operator import CPUOperator
from nemo_retriever.tabular_data.ingestion.embeddings import fetch_tabular_embedding_dataframe
from nemo_retriever.tabular_data.ingestion.model.reserved_words import Labels

logger = logging.getLogger(__name__)


class TabularFetchEmbeddingsOp(AbstractOperator, CPUOperator):
"""Fetch all tabular entity descriptions from Neo4j into an embedding-ready DataFrame.
"""Build an embedding-ready DataFrame from ``(tables_df, columns_df)``.

Expected input: a 2-tuple ``(tables_df, columns_df)``. Both DataFrames
carry the post-ingest UUIDs of the Table/Column nodes written to Neo4j;
``tables_df`` is keyed by ``id`` (table UUID) with at least
``table_name``, ``table_schema`` and ``description`` columns, and
``columns_df`` carries one row per column with ``id``, ``table_name``,
``column_name``, ``data_type``, ``description`` and ``sample_values``.
Multiple schemas can be concatenated into the same pair — the
``table_schema`` column on each table row keeps them distinguishable.

Output columns: ``text, _embed_modality, path, page_number, metadata``.
Two row types are produced:

This operator ignores its input — it always queries Neo4j directly and
returns a fresh DataFrame with columns:
``text``, ``_embed_modality``, ``path``, ``page_number``, ``metadata``.
* one ``Table`` row per table, whose ``text`` joins the table description
with a compact list of its columns; and
* one ``Column`` row per column.

The output schema matches the format produced by the unstructured pipeline,
so the standard :class:`~nemo_retriever.text_embed.operators._BatchEmbedActor`
can be chained directly after this operator.
The text templates match the previous Neo4j-derived format, so the rest
of the pipeline (``_BatchEmbedActor`` → ``IngestVdbOperator``) keeps
working untouched.
"""

def __init__(
Expand All @@ -39,9 +56,185 @@ def preprocess(self, data: Any, **kwargs: Any) -> Any:
return data

def process(self, data: Any, **kwargs: Any) -> pd.DataFrame:
from nemo_retriever.tabular_data.ingestion.embeddings import fetch_tabular_embedding_dataframe
if not (isinstance(data, tuple) and len(data) == 2):
logger.warning(
"TabularFetchEmbeddingsOp received no (tables_df, columns_df) input "
"for database %r (got %s); falling back to fetching embeddings from the database.",
self._database_name,
type(data).__name__,
)
return fetch_tabular_embedding_dataframe(database_name=self._database_name)
Comment thread
yuvalshkolar marked this conversation as resolved.

return fetch_tabular_embedding_dataframe(database_name=self._database_name)
tables_df, columns_df = data
rows = list(self._build_rows(tables_df, columns_df))
if rows:
return pd.DataFrame(rows)
return pd.DataFrame(columns=["text", "_embed_modality", "path", "page_number", "metadata"])

def postprocess(self, data: Any, **kwargs: Any) -> Any:
    """Return *data* unchanged; no post-processing is applied at this stage."""
    return data

def _build_rows(self, tables_df: pd.DataFrame, columns_df: pd.DataFrame) -> Iterable[dict[str, Any]]:
    """Build the embed rows for every table and each of its columns.

    For each row of ``tables_df`` one ``Table`` row is produced, immediately
    followed by one ``Column`` row for every entry of ``columns_df`` that
    shares the table's (case-insensitive) ``(table_schema, table_name)`` key.

    Args:
        tables_df: One row per table. The ``id``, ``table_name``,
            ``table_schema`` and ``description`` fields are read; a missing
            field falls back to an empty string.
        columns_df: One row per column. The ``id``, ``table_schema``,
            ``table_name``, ``column_name``, ``data_type``, ``description``
            and ``sample_values`` fields are read.

    Returns:
        A list of row dicts (as built by ``_create_row``), in table order.
        Empty when ``tables_df`` has no rows.
    """
    # Index columns by (schema, table_name) so duplicate table names across
    # different schemas (e.g. two "users" tables in two schemas) don't get
    # their columns merged into one bucket — which would both bloat each
    # table's text and double-emit every column row per duplicate.
    # NOTE(review): a column row without ``table_schema`` is keyed under ""
    # and will not match a table whose schema is non-empty — confirm that
    # upstream always populates ``table_schema`` on ``columns_df``.
    columns_by_table: dict[tuple[str, str], list[Any]] = {}
    for _, col in columns_df.iterrows():
        key = (
            str(col.get("table_schema", "")).lower(),
            str(col.get("table_name", "")).lower(),
        )
        columns_by_table.setdefault(key, []).append(col)

    rows: list[dict[str, Any]] = []
    for _, table in tables_df.iterrows():
        table_id = str(table.get("id", ""))
        table_name = str(table.get("table_name", ""))
        table_description = "" if pd.isna(v := table.get("description")) else str(v).strip()
        schema_name = str(table.get("table_schema", ""))
        columns = columns_by_table.get((schema_name.lower(), table_name.lower()), [])

        table_text = _create_table_text(
            table_name=table_name,
            table_description=table_description,
            columns=columns,
            schema_name=schema_name,
            database_name=self._database_name,
        )
        rows.append(
            _create_row(
                text=table_text,
                node_id=table_id,
                label=Labels.TABLE,
                name=table_name,
                schema_name=schema_name,
                database_name=self._database_name,
            )
        )

        for column in columns:
            column_id = str(column.get("id", ""))
            column_name = str(column.get("column_name", ""))
            data_type = "" if pd.isna(v := column.get("data_type")) else str(v).strip()
            column_description = "" if pd.isna(v := column.get("description")) else str(v).strip()

            # ``sample_values`` may arrive as a list, an array, None or NaN
            # (e.g. after concatenating frames with missing cells). A bare
            # ``value or []`` breaks on NaN (truthy float, not sliceable) and
            # on multi-element arrays (ambiguous truth value), so normalise
            # explicitly and keep at most the first five samples.
            raw_samples = column.get("sample_values")
            if pd.api.types.is_list_like(raw_samples):
                sample_values = list(raw_samples)[:5]
            elif pd.isna(raw_samples) or not raw_samples:
                sample_values = []
            else:
                # A lone scalar (e.g. a single string) is one sample, not a
                # sequence of characters.
                sample_values = [raw_samples]

            column_text = _create_column_text(
                column_name=column_name,
                column_description=column_description,
                data_type=data_type,
                sample_values=sample_values,
                schema_name=schema_name,
                table_name=table_name,
                database_name=self._database_name,
            )
            rows.append(
                _create_row(
                    text=column_text,
                    node_id=column_id,
                    label=Labels.COLUMN,
                    name=column_name,
                    schema_name=schema_name,
                    database_name=self._database_name,
                )
            )
    return rows


# ── Helpers ──────────────────────────────────────────────────────────────────


def _create_table_text(
*,
table_name: str,
table_description: str,
columns: list[Any],
schema_name: str,
database_name: str,
) -> str:
"""Build the embedding text for a Table node.

Returns just the text string; the caller is responsible for wrapping it
in an embed-row dict via :func:`_create_row`.
"""
text = f"db_name: {database_name}" f", schema_name: {schema_name}" f", table_name: {table_name}"
if table_description:
text += f", table_description: {table_description}"

column_pieces: list[str] = []
for column in columns:
column_name = column.get("column_name", "")
data_type = "" if pd.isna(v := column.get("data_type")) else str(v).strip()
piece = f"{{name: {column_name}, data_type: {data_type}"

column_description = "" if pd.isna(v := column.get("description")) else str(v).strip()
if column_description:
piece += f", description: {column_description}"
piece += "}"
column_pieces.append(piece)

text += f", columns: {','.join(column_pieces)}"
return text


def _create_column_text(
*,
column_name: str,
column_description: str,
data_type: str,
sample_values: list[Any],
table_name: str,
schema_name: str,
database_name: str,
) -> str:
"""Build the embedding text for a Column node.

Returns just the text string; the caller is responsible for wrapping it
in an embed-row dict via :func:`_create_row`.
"""
text = (
f"db_name: {database_name}"
f", schema_name: {schema_name}"
f", table_name: {table_name}"
f", column_name: {column_name}"
f", data_type: {data_type}"
)
if column_description:
text += f", column_description: {column_description}"
if len(sample_values) > 0:
text += f", sample_values: {', '.join(str(x) for x in sample_values)}"
return text


def _create_row(
*,
text: str,
node_id: str | None,
label: str,
name: str,
schema_name: str,
database_name: str,
) -> dict[str, Any]:
path = f"neo4j:{node_id}" if node_id else "neo4j:unknown"
# Nest tabular identifiers under content_metadata so they survive the
# IngestVdbOperator → LanceDB write path (which only persists
# content_metadata + source_metadata into the table's metadata column).
# Top-level copies are kept for any in-memory consumer of this DataFrame.
tabular_fields = {
"id": node_id,
"label": label,
"name": name,
"source_path": path,
"schema_name": schema_name,
"database_name": database_name,
}
return {
"text": text.strip(),
"_embed_modality": "text",
"path": path,
"page_number": -1,
"metadata": {
**tabular_fields,
"content_metadata": dict(tabular_fields),
},
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@ class TabularSchemaExtractOp(AbstractOperator, CPUOperator):
connector stored in *tabular_params*.
2. Write the extracted entities as graph nodes and relationships into Neo4j.

The operator produces an empty DataFrame as output so it can be chained
with downstream operators (e.g. :class:`TabularFetchEmbeddingsOp`) via
``>>``. All meaningful state lives in Neo4j after this step.
The operator returns ``(tables_df, columns_df)`` — concatenated across
every ingested :class:`Schema` — carrying the post-ingest UUIDs written
to Neo4j. The per-row ``table_schema`` column keeps schemas
distinguishable. Downstream operators (notably
:class:`TabularFetchEmbeddingsOp`) can build embedding text directly
from this pair without a Neo4j round-trip.

Returns ``(empty_df, empty_df)`` when there is nothing to ingest, so
the chain still flows.
"""

def __init__(
Expand All @@ -43,15 +49,26 @@ def preprocess(self, data: Any, **kwargs: Any) -> TabularExtractParams | None:
return data
return self._tabular_params

def process(self, data: TabularExtractParams | None, **kwargs: Any) -> pd.DataFrame:
def process(self, data: TabularExtractParams | None, **kwargs: Any) -> tuple[pd.DataFrame, pd.DataFrame]:
from nemo_retriever.tabular_data.ingestion.extract_data import (
extract_tabular_db_data,
store_relational_db_in_neo4j,
)

empty = (pd.DataFrame(), pd.DataFrame())
if data is None or data.connector is None:
return empty

schema_data = extract_tabular_db_data(params=data)
store_relational_db_in_neo4j(data=schema_data, dialect=data.connector.dialect)
return pd.DataFrame()
schemas = store_relational_db_in_neo4j(data=schema_data, dialect=data.connector.dialect) or {}
if not schemas:
return empty

tables = [s.tables_df for s in schemas.values() if s.tables_df is not None]
columns = [s.columns_df for s in schemas.values() if s.columns_df is not None]
tables_df = pd.concat(tables, ignore_index=True) if tables else pd.DataFrame()
columns_df = pd.concat(columns, ignore_index=True) if columns else pd.DataFrame()
return tables_df, columns_df

def postprocess(self, data: Any, **kwargs: Any) -> Any:
return data
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,22 @@ def store_relational_db_in_neo4j(data, dialect: str, num_workers: int = 4):
Args:
data: Data dict returned by extract_tabular_db_data().
dialect: SQL dialect used by the connector (e.g. "sqlite", "duckdb", "snowflake").
neo4j_conn: Active Neo4jConnectionManager instance (unused directly here;
populate_tabular_data uses its own DAL connection, but
accepted for API consistency with the other ingest steps).
num_workers: Worker count forwarded to populate_tabular_data.

Returns:
``{schema_name_lower: Schema}`` dict produced during ingestion, so
callers can recover the post-ingest ``tables_df`` / ``columns_df``
(with the UUIDs assigned to each Table/Column node) without a
round-trip back to Neo4j. Returns ``{}`` when *data* is empty.
"""
if not data:
return
return {}

from nemo_retriever.tabular_data.ingestion.write_to_graph import (
populate_tabular_data,
)

populate_tabular_data(
return populate_tabular_data(
data,
num_workers=num_workers,
dialect=dialect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ def populate_tabular_data(data, num_workers, dialect):

if tables_df is None or tables_df.empty:
logger.warning("No tables found in source database; skipping graph population.")
return
return {}

database = data["database_name"]
logger.info(f"Started parsing db {database}.")

all_schemas = {}
all_schemas = populate_db(tables_df, columns_df, database, num_workers)

if "fks" in data:
populate_fks(fks=data["fks"], database_name=database)

if "pks" in data:
populate_pks(pks=data["pks"], database_name=database)

if "queries" in data:
populate_queries(all_schemas, data["queries"], num_workers, dialect)

return []
return all_schemas
Comment on lines 46 to +57
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Existing tables get wrong UUIDs on re-ingestion, causing LanceDB duplicates

In the DB-update path (when existing_db_id is not None and loaded), populate_db returns schemas whose tables_df["id"] entries for already-existing tables still hold the freshly generated UUIDs produced by the Schema constructor — they are never backfilled with the original Neo4j node IDs. Only columns_to_update (columns in the intersection with changed properties) get their id replaced with the graph UUID inside accumulate_updated_column; no equivalent correction happens for existing tables.

As a result, TabularSchemaExtractOp now propagates these wrong IDs into tables_df, TabularFetchEmbeddingsOp embeds them as the id field, and UpsertVdbOperator uses them as the LanceDB merge key. Because LanceDB holds the original UUIDs from the first ingest, the upsert path inserts new rows for every already-existing table instead of updating the existing entries, creating duplicates on every re-ingestion run.

The existing-db path in populate_db needs to backfill new_schema.tables_df["id"] (and consistently, columns_df["id"]) from existing_schema before returning, so the UUIDs in the returned schemas match what Neo4j and LanceDB already know.

Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py
Line: 46-57

Comment:
**Existing tables get wrong UUIDs on re-ingestion, causing LanceDB duplicates**

In the DB-update path (when `existing_db_id is not None and loaded`), `populate_db` returns `schemas` whose `tables_df["id"]` entries for **already-existing tables** still hold the freshly generated UUIDs produced by the `Schema` constructor — they are never backfilled with the original Neo4j node IDs. Only `columns_to_update` (columns in the intersection with changed properties) get their `id` replaced with the graph UUID inside `accumulate_updated_column`; no equivalent correction happens for existing tables.

As a result, `TabularSchemaExtractOp` now propagates these wrong IDs into `tables_df`, `TabularFetchEmbeddingsOp` embeds them as the `id` field, and `UpsertVdbOperator` uses them as the LanceDB merge key. Because LanceDB holds the original UUIDs from the first ingest, the upsert path inserts **new rows** for every already-existing table instead of updating the existing entries, creating duplicates on every re-ingestion run.

The existing-db path in `populate_db` needs to backfill `new_schema.tables_df["id"]` (and consistently, `columns_df["id"]`) from `existing_schema` before returning, so the UUIDs in the returned `schemas` match what Neo4j and LanceDB already know.

How can I resolve this? If you propose a fix, please make it concise.



def populate_db(tables_df, columns_df, database, num_workers):
Expand Down
3 changes: 2 additions & 1 deletion nemo_retriever/src/nemo_retriever/vdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from nemo_retriever.vdb.adt_vdb import VDB
from nemo_retriever.vdb.factory import get_vdb_op_cls
from nemo_retriever.vdb.operators import IngestVdbOperator, RetrieveVdbOperator
from nemo_retriever.vdb.operators import IngestVdbOperator, PutVdbOperator, RetrieveVdbOperator
from nemo_retriever.vdb.records import RetrievalHit, normalize_retrieval_results, to_client_vdb_records
from nemo_retriever.vdb.sidecar_metadata import (
apply_sidecar_metadata_to_client_batches,
Expand All @@ -22,6 +22,7 @@
"VDB",
"get_vdb_op_cls",
"IngestVdbOperator",
"PutVdbOperator",
"RetrievalHit",
"RetrieveVdbOperator",
"normalize_retrieval_results",
Expand Down
Loading
Loading