Streaming VDB writes#2041
Conversation
Greptile SummaryThis PR converts the LanceDB upload step from a memory-intensive global-batch write (collect entire embedded corpus → single worker → reshape → driver pull) to a streaming per-block write, achieving a 70% peak-memory reduction and 17% wall-clock speedup on the benchmark host.
|
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/vdb/lancedb.py | Adds append() and build_index() streaming methods; removes legacy build_index constructor param but doesn't guard against it landing in **kwargs and shadowing the new method via VDB.__dict__.update. |
| nemo_retriever/src/nemo_retriever/vdb/operators.py | Switches IngestVdbOperator from global-batch VDB.run() to per-batch VDB.append(); adds overwrite-flag handshake, accounting-column projection, and _vdb_uploadable flag column. |
| nemo_retriever/src/nemo_retriever/pipeline/ingest_result.py | New module wrapping ingest output in a streaming-friendly accessor; correctly avoids Dataset.to_pandas() for all public accessors. |
| nemo_retriever/src/nemo_retriever/graph_ingestor.py | Adds _finalize_vdb_upload driver-side hook that constructs a fresh VDB instance and calls build_index() once after the graph finishes streaming writes. |
| nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py | Pins IngestVdbOperator concurrency to 1 (required for overwrite-flag correctness) and sets a default batch_size=1000 to reduce Lance manifest commit count. |
| nemo_retriever/src/nemo_retriever/graph/executor.py | Replaces ds.to_pandas() with ds.materialize(), keeping block refs on the driver instead of pulling the full corpus into a single DataFrame. |
| nemo_retriever/tests/test_lancedb_streaming_append.py | New integration test pinning the streaming round-trip and connection-caching invariants against a real lancedb instance. |
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
nemo_retriever/src/nemo_retriever/vdb/lancedb.py:354
**Legacy `build_index` kwarg still shadows the new `build_index()` method**
`VDB.__init__` performs `self.__dict__.update(kwargs)`. Any unknown kwarg surviving to that call becomes an instance attribute with the same name. The old `LanceDB` constructor accepted `build_index: bool | None = None` as an explicit parameter; the new one removes it — but leaves no `kwargs.pop("build_index", None)` guard before `super().__init__(**kwargs)`.
Consequence: a caller using the old API (`LanceDB(build_index=False)`) passes that key through `**kwargs` → `VDB.__init__(build_index=False)` → `self.build_index = False`. Python's attribute lookup then resolves the instance dict before the class, so `vdb.build_index` is the boolean `False` and `GraphIngestor._finalize_vdb_upload()` raises `TypeError: 'bool' object is not callable` on `vdb.build_index()`.
Reviews (4): Last reviewed commit: "fix test" | Re-trigger Greptile
Description
Users reported batch mode ingest chewing up 400 GB+. Profiling on bo767 reproduced a 200 GB peak.
The upload step was the bottleneck. It waited for the whole embedded corpus, then handed it to one worker that held it through several reshape passes simultaneously. The driver also pulled a full copy back at the end. Add in stale workers still holding their old memory, and you hit the huge memory peak.
This PR fixes the issue by making the upload streaming. Each block writes itself to LanceDB. The first block creates the table, and all the rest only append. The search index is built once on the driver after the graph finishes.
Measured on bo767, same host:
MemAvailabledeltaChecklist