Replace ingest input-type routing with manifest branches #2095
Greptile Summary
Replaces the root CLI
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/ingest_manifest.py | New module: manifest planner that classifies input paths by file family and emits deterministic extraction branch plans. Logic is clean, dataclasses are frozen, and branch ordering follows _BRANCH_SPECS definition order. |
| nemo_retriever/src/nemo_retriever/branch_extraction.py | New module: ExtractionBranchExecutor runs manifest-planned branches and unions outputs. Core logic is sound but five public module-level helpers lack docstrings, and the Ray schema short-circuit in normalize_ray_branch_datasets relies on an undocumented assumption. |
| nemo_retriever/src/nemo_retriever/graph_ingestor.py | Central orchestrator updated to route through manifest branches when extraction_mode is None. Branch/single/explicit paths are cleanly separated; image_only dedup-skip logic correctly handles single-branch manifests. |
| nemo_retriever/src/nemo_retriever/params/models.py | Removes table_structure_workers, table_structure_batch_size, table_structure_cpus_per_actor, and gpu_table_structure from BatchTuningParams without deprecation, breaking existing callers that configure table-structure concurrency tuning. |
| nemo_retriever/src/nemo_retriever/adapters/cli/sdk_workflow.py | Removes input_type, table_output_format, local_ingest_embed_backend, and all *_gpus_per_actor parameters from ingest_documents without a deprecation cycle; existing callers using these kwargs will receive TypeError at runtime. |
| nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py | Adds build_post_extract_graph and simplifies _append_ordered_transform_stages; hardcoded reshape_content_before_embed=True in the new helper is unnecessarily broad for text/HTML-only multi-branch cases. |
| nemo_retriever/src/nemo_retriever/graph/multi_type_extract_operator.py | Extension sets hardcoded instead of derived from INPUT_TYPE_EXTENSIONS; AUDIO_EXTENSIONS is a strict subset, creating divergence risk on the extraction_mode="auto" fallback path. |
| nemo_retriever/src/nemo_retriever/graph/executor.py | Cleanly splits ingest (materializes) from build_dataset (lazy Ray dataset) to support branch union in batch mode. |
| nemo_retriever/tests/test_ingest_manifest.py | New test file with 266 lines covering manifest building, branch planning, branch resolution defaults, and edge cases. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["GraphIngestor.ingest()"] --> B{extraction_mode set?}
B -- "Yes (explicit)" --> C["_resolve_effective_extraction_inputs()\nvalidate + pass through"]
B -- "No (auto)" --> D["build_input_manifest()\nclassify by extension"]
D --> E{How many families?}
E -- "1 family" --> F["_resolve_branch_extraction_inputs()\nsingle branch"]
E -- "N > 1 families" --> G["ExtractionBranchExecutor\n(manifest branches)"]
C --> H["_execute_single_graph()"]
F --> H
G --> I["Per-branch:\nbuild_graph(extraction_only)\nbuild_dataset (lazy)"]
I --> J["normalize_ray_branch_datasets\npad missing columns"]
J --> K["Dataset.union()"]
K --> L["build_post_extract_graph()\ndedup / caption / embed / store / vdb"]
L --> M["RayDataExecutor.ingest()\n→ to_pandas()"]
H --> N["build_graph(full)\nRayDataExecutor.ingest()\nor InprocessExecutor.ingest()"]
N --> O["_raise_for_stage_errors()"]
M --> O
O --> P["Result DataFrame"]
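The routing decision in the flowchart can be sketched roughly as below. This is a minimal illustration under stated assumptions, not the PR's implementation: `FAMILY_BY_EXTENSION`, `classify_paths`, and `choose_route` are hypothetical names, and the extension table is a made-up subset rather than the project's real mapping.

```python
from collections import defaultdict
from pathlib import Path

# Hypothetical extension table for illustration only; the real mapping lives in the PR.
FAMILY_BY_EXTENSION = {
    ".pdf": "pdf",
    ".txt": "text",
    ".html": "html",
    ".png": "image",
    ".mp3": "audio",
}


def classify_paths(paths):
    """Group input paths by file family, keyed in first-seen order."""
    families = defaultdict(list)
    for path in paths:
        suffix = Path(path).suffix.lower()
        family = FAMILY_BY_EXTENSION.get(suffix)
        if family is None:
            raise ValueError(f"unsupported extension: {suffix!r}")
        families[family].append(path)
    return dict(families)


def choose_route(paths, extraction_mode=None):
    """Mirror the flowchart: explicit mode, single branch, or branch union."""
    if extraction_mode is not None:
        # An explicit mode is validated and passed through to a single graph.
        return "single_graph"
    families = classify_paths(paths)
    # More than one family means per-branch extraction followed by a union.
    return "branch_union" if len(families) > 1 else "single_graph"
```

With this sketch, a PDF-only input list stays on the single-graph path, which is the property the PR description cares about for PDF-only throughput.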
Prompt To Fix All With AI
Fix the following 5 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 5
nemo_retriever/src/nemo_retriever/params/models.py:252-255
**Breaking change: public `BatchTuningParams` fields removed**
`table_structure_workers`, `table_structure_batch_size`, `table_structure_cpus_per_actor`, and `gpu_table_structure` are removed from the public Pydantic model with no deprecation cycle. Users who currently pass any of these fields — whether constructing `BatchTuningParams(table_structure_workers=4, ...)` directly or loading from YAML — will get a Pydantic validation error (strict models) or silently lose their table-structure concurrency tuning (permissive models), with no migration path offered.
### Issue 2 of 5
nemo_retriever/src/nemo_retriever/adapters/cli/sdk_workflow.py:177-200
**`ingest_documents` public API broken without deprecation**
This PR removes `input_type`, `table_output_format`, `local_ingest_embed_backend`, `page_elements_gpus_per_actor`, `ocr_gpus_per_actor`, `embed_gpus_per_actor`, and the entire `table_structure_*` batch-tuning group from the `ingest_documents` function signature. Any existing caller that passes these keyword arguments — library users, harness scripts, CI pipelines — will hit a `TypeError: unexpected keyword argument`. Notably, removing `table_output_format="markdown"` also silently drops the `use_table_structure=True` injection it controlled, with no replacement surface exposed through this function.
### Issue 3 of 5
nemo_retriever/src/nemo_retriever/graph/multi_type_extract_operator.py:52-62
**Hardcoded extension sets diverge from `INPUT_TYPE_EXTENSIONS`**
`PDF_EXTENSIONS`, `TEXT_EXTENSIONS`, `HTML_EXTENSIONS`, `AUDIO_EXTENSIONS`, and `VIDEO_EXTENSIONS` are now hardcoded literals instead of derived from `INPUT_TYPE_EXTENSIONS`. Concretely, `AUDIO_EXTENSIONS = {".mp3", ".wav"}` is a subset — if `INPUT_TYPE_EXTENSIONS["audio"]` includes other formats (e.g., `.flac`, `.ogg`), the manifest planner routes those files to the audio branch, but `MultiTypeExtractOperator` (the `extraction_mode="auto"` compatibility path) will call `_unsupported_extension_message` on them. Any future extension added to `INPUT_TYPE_EXTENSIONS` must now be duplicated here.
### Issue 4 of 5
nemo_retriever/src/nemo_retriever/branch_extraction.py:218-294
**Public module functions missing docstrings**
`merge_node_overrides`, `concat_dataframes`, `normalize_ray_branch_datasets`, `ray_dataset_columns`, and `format_post_stage_summary` are all public module-level functions with no docstrings. Per the project's `docstrings-public-interface` rule, public functions must document their parameters, return values, and any exceptions raised. `normalize_ray_branch_datasets` in particular has a non-obvious short-circuit (when schema discovery is missing, it skips normalisation entirely to avoid pre-running extraction); this assumption should be captured in a docstring rather than only in a comment.
### Issue 5 of 5
nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py:536-567
**`build_post_extract_graph` hardcodes `reshape_content_before_embed=True`**
The new `build_post_extract_graph` helper always passes `reshape_content_before_embed=True`. For multi-branch ingest that includes only text or HTML families, this unconditionally inserts the content-reshape UDF before embed even though those branch outputs don't carry `page_elements_v3` or image columns. The reshape is additive and won't corrupt output, but it adds a spurious Ray Data stage and UDF overhead that `build_graph` avoids by gating on `extraction_mode in {"pdf", "image", "auto"}`. Consider deriving the flag from whether any branch has `family in {"pdf", "image"}`.
Reviews (1): Last reviewed commit: "Replace ingest input-type routing with m..."
    detect_workers: Optional[int] = None
    page_elements_cpus_per_actor: float = 1
    ocr_cpus_per_actor: float = 1
    table_structure_workers: Optional[int] = None
    table_structure_batch_size: Optional[int] = None
    table_structure_cpus_per_actor: float = 1
    embed_workers: Optional[int] = None
Breaking change: public BatchTuningParams fields removed
table_structure_workers, table_structure_batch_size, table_structure_cpus_per_actor, and gpu_table_structure are removed from the public Pydantic model with no deprecation cycle. Users who currently pass any of these fields — whether constructing BatchTuningParams(table_structure_workers=4, ...) directly or loading from YAML — will get a Pydantic validation error (strict models) or silently lose their table-structure concurrency tuning (permissive models), with no migration path offered.
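One possible mitigation, assuming the maintainers want a deprecation cycle rather than a hard removal, is to strip the removed keys from raw configuration before the model is constructed and warn once per key. The helper name `strip_removed_fields` is hypothetical, and where it would be called (for example, in a pre-validation hook or in the YAML loader) is an assumption not covered by the diff.

```python
import warnings

# Field names taken from the review comment above.
REMOVED_TABLE_STRUCTURE_FIELDS = (
    "table_structure_workers",
    "table_structure_batch_size",
    "table_structure_cpus_per_actor",
    "gpu_table_structure",
)


def strip_removed_fields(raw):
    """Return a copy of ``raw`` without removed fields, warning for each one found."""
    cleaned = dict(raw)
    for name in REMOVED_TABLE_STRUCTURE_FIELDS:
        if name in cleaned:
            cleaned.pop(name)
            warnings.warn(
                f"{name!r} is deprecated and ignored",
                DeprecationWarning,
                stacklevel=2,
            )
    return cleaned
```

Callers loading old YAML would then see a warning instead of a validation error or a silent drop.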
@@ -337,42 +189,35 @@ def ingest_documents(
    ocr_lang: OcrLangValue | None = None,
    graphic_elements_invoke_url: str | None = None,
    table_structure_invoke_url: str | None = None,
    table_output_format: TableOutputFormatValue | None = None,
    embed_invoke_url: str | None = None,
    embed_model_name: str | None = None,
    local_ingest_embed_backend: LocalIngestEmbedBackendValue | None = None,
    pdf_extract_workers: int | None = None,
    pdf_extract_batch_size: int | None = None,
    pdf_extract_cpus_per_task: float | None = None,
    page_elements_workers: int | None = None,
    page_elements_batch_size: int | None = None,
    page_elements_cpus_per_actor: float | None = None,
    page_elements_gpus_per_actor: float | None = None,
    ocr_workers: int | None = None,
ingest_documents public API broken without deprecation
This PR removes input_type, table_output_format, local_ingest_embed_backend, page_elements_gpus_per_actor, ocr_gpus_per_actor, embed_gpus_per_actor, and the entire table_structure_* batch-tuning group from the ingest_documents function signature. Any existing caller that passes these keyword arguments — library users, harness scripts, CI pipelines — will hit a TypeError: unexpected keyword argument. Notably, removing table_output_format="markdown" also silently drops the use_table_structure=True injection it controlled, with no replacement surface exposed through this function.
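A deprecation shim for removed keyword arguments could look like the sketch below. It is an assumption-laden illustration: `accept_removed_kwargs` and `ingest_documents_stub` are hypothetical names, and the stub stands in for the real `ingest_documents`, whose full signature is not reproduced here.

```python
import functools
import warnings


def accept_removed_kwargs(*removed):
    """Decorator: drop the named keyword arguments with a DeprecationWarning."""

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for name in removed:
                if name in kwargs:
                    kwargs.pop(name)
                    warnings.warn(
                        f"{func.__name__}() no longer uses {name!r}; the value is ignored",
                        DeprecationWarning,
                        stacklevel=2,
                    )
            return func(*args, **kwargs)

        return wrapper

    return decorate


# Hypothetical stand-in for the real function.
@accept_removed_kwargs("input_type", "local_ingest_embed_backend")
def ingest_documents_stub(documents, *, embed_model_name=None):
    return {"documents": list(documents), "embed_model_name": embed_model_name}
```

Ignoring a value is only appropriate where behavior is otherwise unchanged. An argument such as `table_output_format`, which the comment says controlled the `use_table_structure=True` injection, would need to be mapped to a replacement rather than dropped.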
from nemo_retriever.video import dedup_video_frames
from nemo_retriever.video import video_asr_audio_chunk_params
from nemo_retriever.graph.designer import designer_component
from nemo_retriever.utils.input_files import INPUT_TYPE_EXTENSIONS
from nemo_retriever.utils.ray_resource_hueristics import gather_local_resources

logger = logging.getLogger(__name__)

# Define file type mappings
PDF_EXTENSIONS = INPUT_TYPE_EXTENSIONS["pdf"] | INPUT_TYPE_EXTENSIONS["doc"]
TEXT_EXTENSIONS = INPUT_TYPE_EXTENSIONS["txt"]
HTML_EXTENSIONS = INPUT_TYPE_EXTENSIONS["html"]
AUDIO_EXTENSIONS = INPUT_TYPE_EXTENSIONS["audio"]
IMAGE_EXTENSIONS = INPUT_TYPE_EXTENSIONS["image"]
VIDEO_EXTENSIONS = INPUT_TYPE_EXTENSIONS["video"]
DEFAULT_AUDIO_SPLIT_INTERVAL = 500000
DEFAULT_VIDEO_FRAME_FPS = 0.5
PDF_EXTENSIONS = {".pdf", ".docx", ".pptx"}
TEXT_EXTENSIONS = {".txt"}
HTML_EXTENSIONS = {".html"}
Hardcoded extension sets diverge from INPUT_TYPE_EXTENSIONS
PDF_EXTENSIONS, TEXT_EXTENSIONS, HTML_EXTENSIONS, AUDIO_EXTENSIONS, and VIDEO_EXTENSIONS are now hardcoded literals instead of derived from INPUT_TYPE_EXTENSIONS. Concretely, AUDIO_EXTENSIONS = {".mp3", ".wav"} is a subset — if INPUT_TYPE_EXTENSIONS["audio"] includes other formats (e.g., .flac, .ogg), the manifest planner routes those files to the audio branch, but MultiTypeExtractOperator (the extraction_mode="auto" compatibility path) will call _unsupported_extension_message on them. Any future extension added to INPUT_TYPE_EXTENSIONS must now be duplicated here.
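The divergence risk can be illustrated with a small sketch. The `INPUT_TYPE_EXTENSIONS` contents here are a hypothetical stand-in (the real mapping may differ), and `missing_extensions` is an illustrative helper, not something in the codebase.

```python
# Hypothetical stand-in for INPUT_TYPE_EXTENSIONS; the real contents may differ.
INPUT_TYPE_EXTENSIONS = {
    "pdf": frozenset({".pdf"}),
    "doc": frozenset({".docx", ".pptx"}),
    "audio": frozenset({".mp3", ".wav", ".flac"}),
}

# Derived sets pick up new extensions automatically.
PDF_EXTENSIONS = INPUT_TYPE_EXTENSIONS["pdf"] | INPUT_TYPE_EXTENSIONS["doc"]
AUDIO_EXTENSIONS = INPUT_TYPE_EXTENSIONS["audio"]

# A hardcoded copy, as in the diff, can silently fall behind the source of truth.
HARDCODED_AUDIO_EXTENSIONS = frozenset({".mp3", ".wav"})


def missing_extensions(hardcoded, source):
    """Return extensions present in the source of truth but absent from a copy."""
    return sorted(set(source) - set(hardcoded))
```

If the hardcoded literals are intentional, a check along the lines of `missing_extensions` could run as a unit test so that a new entry in the shared mapping fails loudly instead of reaching `_unsupported_extension_message` at runtime.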
def merge_node_overrides(
    derived_overrides: dict[str, dict[str, Any]],
    explicit_overrides: dict[str, dict[str, Any]],
) -> dict[str, dict[str, Any]]:
    merged_overrides: dict[str, dict[str, Any]] = {}
    for node_name in set(derived_overrides) | set(explicit_overrides):
        merged_overrides[node_name] = {
            **derived_overrides.get(node_name, {}),
            **explicit_overrides.get(node_name, {}),
        }
    return merged_overrides


def concat_dataframes(frames: list[Any]) -> Any:
    import pandas as pd

    if not frames:
        return pd.DataFrame(columns=["bytes", "path"])
    columns: list[str] = []
    seen: set[str] = set()
    for frame in frames:
        for column in frame.columns:
            if column not in seen:
                columns.append(column)
                seen.add(column)
    normalized = [frame.reindex(columns=columns) for frame in frames]
    return pd.concat(normalized, ignore_index=True, sort=False)


def normalize_ray_branch_datasets(branch_datasets: list[Any]) -> list[Any]:
    columns: list[str] = []
    seen: set[str] = set()
    for dataset in branch_datasets:
        dataset_columns = ray_dataset_columns(dataset)
        if not dataset_columns:
            # Avoid eager schema discovery: Ray computes missing schemas by
            # executing a limit=1 plan, which pre-runs extraction branches.
            return branch_datasets
        for column in dataset_columns:
            if column not in seen:
                columns.append(column)
                seen.add(column)
    if not columns:
        return branch_datasets
    stable_columns = tuple(columns)
    return [
        dataset.map_batches(
            ensure_pandas_columns,
            batch_format="pandas",
            fn_kwargs={"columns": stable_columns},
        )
        for dataset in branch_datasets
    ]


def ray_dataset_columns(dataset: Any) -> tuple[str, ...]:
    try:
        schema = dataset.schema(fetch_if_missing=False)
    except TypeError:
        schema = dataset.schema()
    if schema is None:
        return ()
    names = getattr(schema, "names", None)
    if callable(names):
        names = names()
    if names is None:
        base_schema = getattr(schema, "base_schema", None)
        names = getattr(base_schema, "names", None) if base_schema is not None else None
        if callable(names):
            names = names()
    if names is None:
        return ()
    return tuple(str(name) for name in names)


def format_post_stage_summary(post_extract_order: tuple[str, ...]) -> str:
    return ", ".join(post_extract_order) if post_extract_order else "none"
Public module functions missing docstrings
merge_node_overrides, concat_dataframes, normalize_ray_branch_datasets, ray_dataset_columns, and format_post_stage_summary are all public module-level functions with no docstrings. Per the project's docstrings-public-interface rule, public functions must document their parameters, return values, and any exceptions raised. normalize_ray_branch_datasets in particular has a non-obvious short-circuit (when schema discovery is missing, it skips normalisation entirely to avoid pre-running extraction); this assumption should be captured in a docstring rather than only in a comment.
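As an illustration of the kind of docstring the rule asks for, here is `merge_node_overrides` from the diff with documentation added. The body is unchanged from the diff; the docstring wording is only a reading of what the code does, and the authors' intended semantics for "derived" versus "explicit" may be more specific.

```python
from typing import Any


def merge_node_overrides(
    derived_overrides: dict[str, dict[str, Any]],
    explicit_overrides: dict[str, dict[str, Any]],
) -> dict[str, dict[str, Any]]:
    """Merge two per-node override mappings, letting explicit values win.

    Args:
        derived_overrides: Lower-precedence overrides, keyed by node name.
        explicit_overrides: Higher-precedence overrides, keyed by node name.

    Returns:
        A new mapping with one entry per node name found in either input.
        For keys present in both inputs, the explicit value is kept.
        Neither input mapping is mutated.
    """
    merged_overrides: dict[str, dict[str, Any]] = {}
    for node_name in set(derived_overrides) | set(explicit_overrides):
        merged_overrides[node_name] = {
            **derived_overrides.get(node_name, {}),
            **explicit_overrides.get(node_name, {}),
        }
    return merged_overrides
```

The same pattern would apply to `normalize_ray_branch_datasets`, where the docstring is the natural place to state that normalisation is skipped when any branch schema is not yet known.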
    return graph


def build_post_extract_graph(
    *,
    dedup_params: Any | None = None,
    embed_params: Any | None = None,
    caption_params: Any | None = None,
    store_params: Any | None = None,
    vdb_upload_params: VdbUploadParams | None = None,
    webhook_params: Any | None = None,
    stage_order: tuple[str, ...] = (),
) -> Graph:
    """Build only the common stages that run after extraction branch union."""

    return _append_ordered_transform_stages(
        Graph(),
        dedup_params=dedup_params,
        caption_params=caption_params,
        store_params=store_params,
        embed_params=embed_params,
        vdb_upload_params=vdb_upload_params,
        webhook_params=webhook_params,
        stage_order=stage_order,
        supports_dedup=True,
        reshape_content_before_embed=True,
    )


def build_graph(
    *,
    execution_plan: IngestExecutionPlan | None = None,
build_post_extract_graph hardcodes reshape_content_before_embed=True
The new build_post_extract_graph helper always passes reshape_content_before_embed=True. For multi-branch ingest that includes only text or HTML families, this unconditionally inserts the content-reshape UDF before embed even though those branch outputs don't carry page_elements_v3 or image columns. The reshape is additive and won't corrupt output, but it adds a spurious Ray Data stage and UDF overhead that build_graph avoids by gating on extraction_mode in {"pdf", "image", "auto"}. Consider deriving the flag from whether any branch has family in {"pdf", "image"}.
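The suggested gating could be sketched as a small predicate over the planned branch families. `needs_content_reshape` and `RESHAPE_FAMILIES` are hypothetical names, and how the family names are read off the branch plan is an assumption; the result would be passed as `reshape_content_before_embed` instead of the literal `True`.

```python
from collections.abc import Iterable

# Families named in the review suggestion above.
RESHAPE_FAMILIES = frozenset({"pdf", "image"})


def needs_content_reshape(branch_families: Iterable[str]) -> bool:
    """Return True when at least one planned branch belongs to a reshape family."""
    return any(family in RESHAPE_FAMILIES for family in branch_families)
```

A text plus HTML manifest would then skip the reshape stage, while any manifest that includes a PDF or image branch keeps it.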
Summary
Replaces PR 2068's root CLI input-type routing with manifest-planned extraction branches inside `GraphIngestor`.
This keeps `retriever ingest` as the simple auto-routing entrypoint, preserves explicit SDK and `retriever pipeline run` typed paths, and keeps PDF-only ingest on the dedicated PDF graph.
Motivation
The BMP/TIFF release bug is a silent-success data-loss issue: direct BMP/TIFF ingest can return rows, but without `page_image`, detections, or OCR text.
PR 2068 fixed the BMP/TIFF behavior functionally, but its routing shape was too broad for the release path. In particular, default ingest could be routed through multi-type dispatch instead of the dedicated PDF graph, creating avoidable PDF-only performance risk.
This PR replaces that approach with a manifest planner:
Changes
`retriever ingest --input-type`.
`extraction_mode="auto"` as compatibility fallback to `MultiTypeExtractOperator`.
`PDFExtractionCPUActor`.
Validation
Focused tests:
Smoke/perf validation:
`page_image.image_b64` and detections or OCR text.
`MultiTypeExtract`.
`MultiTypeExtract`
Notes
This PR intentionally excludes #2089's single-GPU embed resource tuning. That tuning exists on `main` and materially improves PDF-only bo767 throughput, but this release PR is scoped to replacing PR 2068's routing architecture and fixing the BMP/TIFF ingest bug.
Checklist