Update embeddings for table and column by id (with verified) #2049
DinaLaptii wants to merge 11 commits into
Greptile Summary
This PR eliminates the Neo4j round-trip in the tabular embedding pipeline by threading
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py | Major refactor: operator now consumes (tables_df, columns_df) from upstream instead of querying Neo4j; adds schema-keyed column bucketing and _build_rows row builder. Logic appears correct but lacks unit tests. |
| nemo_retriever/src/nemo_retriever/graph/tabular_schema_extract_operator.py | Return type changed from pd.DataFrame to tuple[pd.DataFrame, pd.DataFrame]; now passes post-ingest UUIDs downstream without a Neo4j round-trip. Missing unit tests for the new tuple output. |
| nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py | store_relational_db_in_neo4j now propagates the schemas dict from populate_tabular_data. Missing return type annotation on the changed function signature. |
| nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py | populate_tabular_data now returns all_schemas dict instead of []. Dead assignment (all_schemas = {}) removed cleanly. Missing return type annotation. |
| nemo_retriever/src/nemo_retriever/vdb/adt_vdb.py | Adds non-abstract put() with NotImplementedError stub; design rationale for omitting @abstractmethod is documented. Base class signature annotated correctly. |
| nemo_retriever/src/nemo_retriever/vdb/lancedb.py | Adds id field to schema, extracts row_id from content/source metadata, and implements put() with strict update-only semantics via merge_insert. records parameter missing type annotation. |
| nemo_retriever/src/nemo_retriever/vdb/operators.py | Adds PutVdbOperator extending IngestVdbOperator; constructor guard correctly detects un-overridden put() stubs. Sidecar path reused from parent cleanly. |
| nemo_retriever/src/nemo_retriever/vdb/__init__.py | PutVdbOperator added to public exports and __all__. Straightforward additive change. |
| nemo_retriever/tests/test_nv_ingest_vdb_operator.py | Adds three meaningful PutVdbOperator tests: guard-rejects-stub, happy-path delegation, and sidecar-merge path. _StubPutVDB correctly verifies VDB.put is not @abstractmethod. |
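
The table mentions a non-abstract `put()` stub on the base class and a constructor guard that rejects backends which never override it. A minimal sketch of that pattern follows, assuming nothing about the real classes beyond what the table states: `StubVDB`, `InMemoryVDB`, `overrides_put`, and `PutOperator` are hypothetical names used only for illustration.

```python
from typing import Any


class VDB:
    """Hypothetical stand-in for the base class; only put() is modelled."""

    def put(self, records: list, **kwargs: Any) -> Any:
        # Non-abstract stub: subclasses that predate put() stay instantiable.
        raise NotImplementedError(f"{type(self).__name__} does not implement put()")


class StubVDB(VDB):
    """Backend that inherits the stub unchanged."""


class InMemoryVDB(VDB):
    """Backend that provides its own put()."""

    def put(self, records: list, **kwargs: Any) -> int:
        return len(records)


def overrides_put(vdb: VDB) -> bool:
    # The attribute looked up on the class is the base function object
    # only when no subclass in the MRO has replaced it.
    return type(vdb).put is not VDB.put


class PutOperator:
    """Hypothetical operator that fails fast on un-overridden stubs."""

    def __init__(self, vdb: VDB) -> None:
        if not overrides_put(vdb):
            raise TypeError(f"{type(vdb).__name__} must override put()")
        self._vdb = vdb
```

Checking at construction time surfaces a misconfigured backend before any records are processed, rather than as a `NotImplementedError` in the middle of a run.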
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[TabularSchemaExtractOp.process] --> B[store_relational_db_in_neo4j]
B --> C[populate_tabular_data returns schemas dict]
C --> D[Concat tables_df + columns_df with UUIDs]
D --> E[Return tuple to downstream]
E --> F[TabularFetchEmbeddingsOp.process]
F --> G{Input is 2-tuple?}
G -- Yes --> H[_build_rows: schema-keyed column buckets]
G -- No --> I[Fallback: fetch from database]
H --> J[DataFrame with text + metadata]
I --> J
J --> K[Embedding step]
K --> L[PutVdbOperator.process]
L --> M[LanceDB.put: validate all rows exist]
M --> N[merge_insert when_matched_update_all]
Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
nemo_retriever/src/nemo_retriever/vdb/lancedb.py:540-545
**Missing type annotation on `records` parameter**
`LanceDB.put()` drops the `records: list` annotation that the base class declares (`VDB.put(self, records: list, **kwargs: Any)`). Any type checker will treat `records` as `Any` here, defeating the coverage the base-class annotation was meant to provide. Add `records: list` to match the parent signature.
### Issue 2 of 3
nemo_retriever/src/nemo_retriever/tabular_data/ingestion/extract_data.py:57
**Missing return type annotation on `store_relational_db_in_neo4j` and `populate_tabular_data`**
Both functions now return a meaningful `dict[str, Any]` (the `{schema_name_lower: Schema}` map) instead of nothing, but neither has a return type annotation. This affects `store_relational_db_in_neo4j` here and `populate_tabular_data` in `write_to_graph.py`. Without annotations, type checkers can't validate how callers use the returned dict, and the contract is invisible to readers. Add `-> dict[str, Any]` (or the concrete `Schema` type if it's exported) to both signatures.
### Issue 3 of 3
nemo_retriever/src/nemo_retriever/graph/tabular_fetch_embeddings_operator.py:63-79
**No unit tests for the new operator logic in `TabularFetchEmbeddingsOp` or `TabularSchemaExtractOp`**
`TabularFetchEmbeddingsOp.process()` gained a large new code path — including tuple unpacking, the `_build_rows` row-builder, schema-keyed column bucketing, `pd.isna` guards, and the fallback to fetching from the DB. `TabularSchemaExtractOp.process()` changed its return type from `pd.DataFrame` to `tuple[pd.DataFrame, pd.DataFrame]`. Neither operator has tests in the PR. `test_tabular_pipeline.py` covers extraction helpers but not the operator layer. The column-bucketing logic (unique `table_schema + table_name` keys), the empty-input sentinel, and the fallback-to-DB path are all meaningful edge cases that belong in tests.
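
As a rough illustration of the kind of tests Issue 3 asks for, the sketch below exercises schema-keyed bucketing and the empty-input case. It is an assumption-laden stand-in: `bucket_columns` is a hypothetical helper written with plain dicts, not the PR's `_build_rows`, and real tests would feed pandas DataFrames to the operators.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Column = Dict[str, str]


def bucket_columns(columns: List[Column]) -> Dict[Tuple[str, str], List[Column]]:
    """Group column records by (table_schema, table_name). Hypothetical helper."""
    buckets: Dict[Tuple[str, str], List[Column]] = defaultdict(list)
    for col in columns:
        buckets[(col["table_schema"], col["table_name"])].append(col)
    return dict(buckets)


def test_same_table_name_in_two_schemas_stays_separate() -> None:
    columns = [
        {"table_schema": "sales", "table_name": "orders", "name": "id"},
        {"table_schema": "audit", "table_name": "orders", "name": "event"},
        {"table_schema": "sales", "table_name": "orders", "name": "total"},
    ]
    buckets = bucket_columns(columns)
    # Keying on the table name alone would merge these two buckets.
    assert [c["name"] for c in buckets[("sales", "orders")]] == ["id", "total"]
    assert [c["name"] for c in buckets[("audit", "orders")]] == ["event"]


def test_empty_input_yields_no_buckets() -> None:
    assert bucket_columns([]) == {}
```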
Reviews (10): Last reviewed commit: "style(vdb): apply pre-commit formatting ..."
    if records and any(batch for batch in records):
        self._vdb.upsert(records, table_name=self._table_name, key=self._key)
    return data
The return value from vdb.upsert() carries the per-call counts (upserted, skipped_no_key, created_table, etc.) that are documented in LanceDB.upsert. Silently discarding them makes it impossible to observe incremental-update behaviour without attaching a debugger. At minimum the result should be logged at DEBUG level.
    - if records and any(batch for batch in records):
    -     self._vdb.upsert(records, table_name=self._table_name, key=self._key)
    - return data
    + if records and any(batch for batch in records):
    +     result = self._vdb.upsert(records, table_name=self._table_name, key=self._key)
    +     logger.debug("UpsertVdbOperator: upsert result=%s", result)
    + return data
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/vdb/operators.py
Line: 187-189
Comment:
The return value from `vdb.upsert()` carries the per-call counts (`upserted`, `skipped_no_key`, `created_table`, etc.) that are documented in `LanceDB.upsert`. Silently discarding them makes it impossible to observe incremental-update behaviour without attaching a debugger. At minimum the result should be logged at DEBUG level.
```suggestion
if records and any(batch for batch in records):
    result = self._vdb.upsert(records, table_name=self._table_name, key=self._key)
    logger.debug("UpsertVdbOperator: upsert result=%s", result)
return data
```
How can I resolve this? If you propose a fix, please make it concise.
Not sure that I should add logs here.
- TabularFetchEmbeddingsOp: replace TypeError on non-tuple input with a call to fetch_tabular_embedding_dataframe so the pipeline can still produce embedding rows from Neo4j when upstream did not pass (tables_df, columns_df).
- Add nemo_retriever.tabular_data.ingestion.embeddings module that queries Neo4j for Table/Column docs and returns an embedding-ready DataFrame matching the unstructured pipeline format.
- lancedb: drop stale comment in _create_lancedb_results.
    pa.field("source", pa.string()),
    # Stable entity id (e.g. Neo4j node UUID for tabular ingest).
    # Empty string when the upstream payload has no stable identity;
    # used as the merge key by ``LanceDB.upsert``.
    pa.field("id", pa.string()),
Schema migration required for existing LanceDB tables
Adding "id" to _lancedb_arrow_schema means every call to create_index with overwrite=False (the default) will hit _validate_append_schema, which raises ValueError: "is missing required field 'id'; use overwrite=True to replace the table." for any table created before this PR. Any existing deployment that does not pass overwrite=True will fail on its next ingestion run. There is no migration path or backward-compat guard provided here.
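
One possible shape for a backward-compat guard is sketched below, under loud assumptions: `plan_append` is a hypothetical helper, the field lists are illustrative, and nothing here calls LanceDB or mirrors the real `_validate_append_schema`. The idea is to treat a missing field as recoverable when a safe default exists (the schema comment already defines the empty string as the "no stable identity" value for `id`) and to keep the hard failure for everything else.

```python
from typing import Dict, List


def plan_append(
    existing_fields: List[str],
    required_fields: List[str],
    backfill_defaults: Dict[str, str],
) -> Dict[str, str]:
    """Return {field: default} for fields to backfill before appending.

    Hypothetical helper: raises ValueError when a missing field has no default.
    """
    missing = [f for f in required_fields if f not in existing_fields]
    unfillable = [f for f in missing if f not in backfill_defaults]
    if unfillable:
        raise ValueError(
            f"table is missing required field(s) {unfillable}; "
            "use overwrite=True to replace the table."
        )
    return {f: backfill_defaults[f] for f in missing}


# A table created before the change lacks "id"; the plan says to backfill it
# with the empty string instead of failing the ingestion run.
plan = plan_append(
    existing_fields=["vector", "text", "source"],
    required_fields=["vector", "text", "source", "id"],
    backfill_defaults={"id": ""},
)
```

Rows backfilled with an empty `id` still have no usable merge key, so a guard like this would only keep appends working; it would not make old rows updatable by id.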
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/vdb/lancedb.py
Line: 121-125
Comment:
**Schema migration required for existing LanceDB tables**
Adding `"id"` to `_lancedb_arrow_schema` means every call to `create_index` with `overwrite=False` (the default) will hit `_validate_append_schema`, which raises `ValueError: "is missing required field 'id'; use overwrite=True to replace the table."` for any table created before this PR. Any existing deployment that does not pass `overwrite=True` will fail on its next ingestion run. There is no migration path or backward-compat guard provided here.
How can I resolve this? If you propose a fix, please make it concise.
It can be an issue. We must check it.
      all_schemas = populate_db(tables_df, columns_df, database, num_workers)

      if "fks" in data:
          populate_fks(fks=data["fks"], database_name=database)

      if "pks" in data:
          populate_pks(pks=data["pks"], database_name=database)

      if "queries" in data:
          populate_queries(all_schemas, data["queries"], num_workers, dialect)

    - return []
    + return all_schemas
Existing tables get wrong UUIDs on re-ingestion, causing LanceDB duplicates
In the DB-update path (when existing_db_id is not None and loaded), populate_db returns schemas whose tables_df["id"] entries for already-existing tables still hold the freshly generated UUIDs produced by the Schema constructor — they are never backfilled with the original Neo4j node IDs. Only columns_to_update (columns in the intersection with changed properties) get their id replaced with the graph UUID inside accumulate_updated_column; no equivalent correction happens for existing tables.
As a result, TabularSchemaExtractOp now propagates these wrong IDs into tables_df, TabularFetchEmbeddingsOp embeds them as the id field, and UpsertVdbOperator uses them as the LanceDB merge key. Because LanceDB holds the original UUIDs from the first ingest, the upsert path inserts new rows for every already-existing table instead of updating the existing entries, creating duplicates on every re-ingestion run.
The existing-db path in populate_db needs to backfill new_schema.tables_df["id"] (and consistently, columns_df["id"]) from existing_schema before returning, so the UUIDs in the returned schemas match what Neo4j and LanceDB already know.
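
The backfill described above can be sketched as follows. This is an illustration only: `backfill_ids` is a hypothetical helper operating on lists of dicts, and the choice of `(table_schema, table_name)` as the identity key is an assumption; the PR's `Schema` objects hold DataFrames and may identify entities differently.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, str]


def backfill_ids(
    new_rows: List[Row],
    existing_rows: List[Row],
    key_fields: Sequence[str],
) -> List[Row]:
    """Reuse stored ids for entities that already exist; keep fresh ids otherwise."""
    known: Dict[Tuple[str, ...], str] = {
        tuple(row[k] for k in key_fields): row["id"] for row in existing_rows
    }
    result = []
    for row in new_rows:
        key = tuple(row[k] for k in key_fields)
        # Fall back to the freshly generated id only for genuinely new entities.
        result.append({**row, "id": known.get(key, row["id"])})
    return result


existing = [{"table_schema": "sales", "table_name": "orders", "id": "old-1"}]
fresh = [
    {"table_schema": "sales", "table_name": "orders", "id": "new-1"},
    {"table_schema": "sales", "table_name": "customers", "id": "new-2"},
]
aligned = backfill_ids(fresh, existing, key_fields=("table_schema", "table_name"))
```

After the backfill, `orders` carries the id the graph and the vector store already know, so a key-based update touches the existing row, while `customers` keeps its new id.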
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/tabular_data/ingestion/write_to_graph.py
Line: 46-57
Comment:
**Existing tables get wrong UUIDs on re-ingestion, causing LanceDB duplicates**
In the DB-update path (when `existing_db_id is not None and loaded`), `populate_db` returns `schemas` whose `tables_df["id"]` entries for **already-existing tables** still hold the freshly generated UUIDs produced by the `Schema` constructor — they are never backfilled with the original Neo4j node IDs. Only `columns_to_update` (columns in the intersection with changed properties) get their `id` replaced with the graph UUID inside `accumulate_updated_column`; no equivalent correction happens for existing tables.
As a result, `TabularSchemaExtractOp` now propagates these wrong IDs into `tables_df`, `TabularFetchEmbeddingsOp` embeds them as the `id` field, and `UpsertVdbOperator` uses them as the LanceDB merge key. Because LanceDB holds the original UUIDs from the first ingest, the upsert path inserts **new rows** for every already-existing table instead of updating the existing entries, creating duplicates on every re-ingestion run.
The existing-db path in `populate_db` needs to backfill `new_schema.tables_df["id"]` (and consistently, `columns_df["id"]`) from `existing_schema` before returning, so the UUIDs in the returned `schemas` match what Neo4j and LanceDB already know.
How can I resolve this? If you propose a fix, please make it concise.
…into feature/update-emmdedding-for-table-and-column-by-id
Rename `VDB.upsert` / `UpsertVdbOperator` to `VDB.put` / `PutVdbOperator` and tighten the contract to an in-place replace: missing keys and rows not already present in the target table now raise `KeyError` instead of being inserted, and `put` no longer creates tables on the fly. Updates the LanceDB implementation and the operator tests accordingly.
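
The tightened contract can be illustrated with a small in-memory stand-in. This is a sketch under stated assumptions, not the LanceDB implementation (which, per the flowchart, validates that all rows exist and then uses `merge_insert`): `InMemoryTable` is a hypothetical class backed by a dict.

```python
from typing import Dict, List


class InMemoryTable:
    """Hypothetical table with update-only put(): never inserts, never creates."""

    def __init__(self, rows: List[Dict[str, str]], key: str = "id") -> None:
        self._key = key
        self._rows = {row[key]: dict(row) for row in rows}

    def put(self, records: List[Dict[str, str]]) -> int:
        # Validate the whole batch first so a bad record changes nothing.
        for rec in records:
            rec_key = rec.get(self._key)
            if not rec_key:
                raise KeyError(f"record has no {self._key!r}")
            if rec_key not in self._rows:
                raise KeyError(f"no existing row with {self._key}={rec_key!r}")
        for rec in records:
            self._rows[rec[self._key]] = dict(rec)
        return len(records)

    def get(self, key: str) -> Dict[str, str]:
        return self._rows[key]


table = InMemoryTable([{"id": "t1", "text": "old description"}])
updated = table.put([{"id": "t1", "text": "new description"}])
```

Validating before writing keeps a batch all-or-nothing: a single unknown id rejects the entire call instead of leaving the table partially updated.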
Add PutVdbOperator to the public API alongside IngestVdbOperator and RetrieveVdbOperator so callers using the standard package import path (`from nemo_retriever.vdb import PutVdbOperator`) can access it.
Collapse the multi-line NotImplementedError raise into a single line to match what the pre-commit hook produces, unblocking CI.