feat: resume interrupted dataset generation runs (sync + async engine) #526

przemekboruta wants to merge 33 commits into NVIDIA-NeMo:main from …
Conversation
Greptile Summary

This PR adds a resume capability for interrupted dataset generation runs to both the sync and async engines.
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Core resume logic: adds config compatibility check, per-path resume branching, precomputed row-group lists for non-aligned extensions, filesystem-based initial counters, and incremental metadata writes; all previously flagged crash-window and extension-alignment issues appear addressed. |
| packages/data-designer-engine/src/data_designer/engine/storage/artifact_storage.py | Adds ResumeMode enum, resume field, resolved_dataset_name resume semantics, clear_partial_results(), and refresh_media_storage_path(); all edge cases (empty dir, missing dir, IF_POSSIBLE downgrade) handled correctly. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py | start() gains start_batch, initial_actual_num_records, num_records_list, and original_target_num_records params; all backward-compatible with defaults; incremental metadata now includes original_target_num_records for multi-run tracking. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py | RowGroupBufferManager gains initial_actual_num_records and initial_total_num_batches constructor params to seed counters on resume; write_metadata gains original_target_num_records param; straightforward and correct. |
| packages/data-designer/src/data_designer/interface/data_designer.py | create() and _create_resource_provider() thread resume through to ArtifactStorage and builder.build(); ResumeMode is exported in init.py; clean pass-through with no logic added at this layer. |
| packages/data-designer/src/data_designer/cli/commands/create.py | Adds --resume / -r CLI option with case-insensitive ResumeMode parsing; default is NEVER; forwarded correctly through GenerationController. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py | Comprehensive test coverage for all resume modes, crash windows, extension alignment, already-complete detection, config compatibility, and processor gating; matches all scenarios described in the PR. |
Flowchart
```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["build(resume=X)"] --> B["_check_resume_config_compatibility()"]
B --> C{compat?}
C -- "ALWAYS + INCOMPATIBLE" --> D["raise DatasetGenerationError"]
C -- "IF_POSSIBLE + INCOMPATIBLE/NO_PRIOR" --> E["resume=NEVER\npop cache\nrefresh_media_storage_path()"]
C -- "IF_POSSIBLE + COMPATIBLE" --> F["resume=ALWAYS\npop cache"]
C -- "ALWAYS + COMPATIBLE" --> G["continue with ALWAYS"]
E --> H["_write_builder_config()"]
F --> H
G --> H
H --> I{metadata.json\nexists?}
I -- "No (ALWAYS)" --> J["clear_partial_results()\nresume=NEVER"]
I -- "Yes" --> K{async engine?}
J --> K
K -- "Yes" --> L["_build_async()"]
K -- "No + ALWAYS" --> M["_build_with_resume()"]
K -- "No + NEVER" --> N["sync batch loop"]
L --> O{resume\n== ALWAYS?}
O -- "Yes" --> P["_load_resume_state()\n_find_completed_row_group_ids()\nprecompute row groups\nclear_partial_results()"]
O -- "No" --> Q["fresh async run"]
P --> R{already\ncomplete?}
R -- "Yes" --> S["return False"]
R -- "No" --> T["schedule remaining row groups\nwrite incremental metadata"]
M --> U["_load_resume_state()\ncompute original+extension sizes\nbatch_manager.start()"]
U --> V{already\ncomplete?}
V -- "Yes" --> S
V -- "No" --> W["run remaining batches"]
S --> X["skip run_after_generation"]
T --> Y["return True"]
W --> Y
N --> Y
Q --> Y
Y --> Z["run_after_generation\nreturn final_dataset_path"]
```
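The compatibility branches at the top of the chart can be sketched as a small standalone function. This is a hypothetical reconstruction from the flowchart only, not the engine's actual code: `ResumeMode`, `Compat`, and `resolve_resume` are illustrative names, and `ValueError` stands in for `DatasetGenerationError`.

```python
from enum import Enum


class ResumeMode(Enum):
    NEVER = "never"
    ALWAYS = "always"
    IF_POSSIBLE = "if_possible"


class Compat(Enum):
    COMPATIBLE = "compatible"
    INCOMPATIBLE = "incompatible"
    NO_PRIOR_DATASET = "no_prior_dataset"


def resolve_resume(requested: ResumeMode, compat: Compat) -> ResumeMode:
    """Mirror the compat-check branches of the flowchart above."""
    if requested == ResumeMode.ALWAYS and compat == Compat.INCOMPATIBLE:
        # ALWAYS + INCOMPATIBLE -> raise (DatasetGenerationError in the engine)
        raise ValueError("config drift: refusing to resume")
    if requested == ResumeMode.IF_POSSIBLE:
        # upgrade on a compatible prior run, otherwise fall back to a fresh start
        return ResumeMode.ALWAYS if compat == Compat.COMPATIBLE else ResumeMode.NEVER
    return requested
```

The sync/async split and the no-metadata fallback happen after this resolution step, per the rest of the chart.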
Reviews (28): Last reviewed commit: "fix(engine): preserve original_target_nu..."
…set already complete _build_with_resume and _build_async now return False when the dataset is already complete (early-return path), True otherwise. build() skips _processor_runner.run_after_generation() on False, preventing processors from calling shutil.rmtree and rewriting an already-finalized dataset. Fixes the issue raised in review: greptile P1 comment on PR NVIDIA-NeMo#526.
Issue #525 has been triaged. The linked issue check is being re-evaluated.
Code Review: PR #526 — Resume interrupted dataset generation runs (sync + async engine)

Summary

This PR adds a resume capability for interrupted dataset generation runs (sync + async engine). Scope: ~860 additions, ~16 deletions across 10 files (including a plan doc and comprehensive tests). The feature is well-designed: it leverages existing …

Findings

High Severity

(H1) …

Medium Severity

(M1) …

(M2) `sum(min(buffer_size, num_records - rg_id * buffer_size) for rg_id in completed_ids)` — this formula assumes each row group was written with exactly …

(M3) …

Low Severity

(L1) Plan/implementation divergence: async engine support …
(L2) …
(L3) Incremental metadata writes add I/O overhead to async engine
(L4) Test file has mid-file imports
(L5) No validation of …

Positive Observations

…
Verdict

Approve with suggestions. The implementation is solid, well-tested, and handles edge cases thoughtfully. The high-severity finding (H1) is a readability/maintainability concern rather than a correctness bug — the discarded return value works because …
cc @johnnygreco @andreatgretel

Suggestion: add an explicit tri-state `ResumeMode` covering the behavior matrix below:
| State on disk | ResumeMode.NEVER | ResumeMode.ALWAYS | ResumeMode.IF_POSSIBLE |
|---|---|---|---|
| Folder missing or empty | create (timestamp on collision elsewhere) | raise | create in `dataset_name` |
| `metadata.json` present, compatible | timestamp-suffix new folder | resume | resume |
| `metadata.json` present, incompatible | timestamp-suffix new folder | raise | raise |
| Folder has data but no `metadata.json` | timestamp-suffix new folder | raise | raise |
The crucial line is the third one: under IF_POSSIBLE, an incompatible-config case must raise, not silently start fresh. Silently overwriting a folder that belongs to a different config is worse than failing loudly. The whole point of IF_POSSIBLE is "I might be a retry of myself" — if the hash says it isn't, the folder belongs to an unrelated run that happened to land on the same dataset_name, and the right response is to surface that collision rather than paper over it.
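Under these assumptions the proposed matrix could look like the following sketch. Everything here is hypothetical: `resolve_dataset_dir` and the `compatible` flag stand in for the config-hash check described in this comment, and the real logic would live in `ArtifactStorage.resolved_dataset_name`.

```python
from enum import Enum
from pathlib import Path


class ResumeMode(Enum):
    NEVER = "never"
    ALWAYS = "always"
    IF_POSSIBLE = "if_possible"


def resolve_dataset_dir(base: Path, name: str, mode: ResumeMode, compatible: bool) -> str:
    """Apply the state-on-disk matrix; `compatible` stands in for the config-hash check."""
    folder = base / name
    has_data = folder.is_dir() and any(folder.iterdir())
    if not has_data:
        if mode == ResumeMode.ALWAYS:
            raise FileNotFoundError(f"nothing to resume in {folder}")
        # NEVER would timestamp only on collision; IF_POSSIBLE creates in place
        return name
    if mode == ResumeMode.NEVER:
        return f"{name}-20240101T000000"  # stand-in for a timestamp suffix
    if not compatible:
        # the crucial row: surface the collision instead of overwriting
        raise RuntimeError(f"{folder} belongs to a different config")
    return name  # resume in place
```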
Implementation sketch

- Define `ResumeMode` (probably in `data_designer.config`) and re-export from the public package.
- `ArtifactStorage`: change `resume: bool` to `resume: ResumeMode`. In `resolved_dataset_name`, the `IF_POSSIBLE` branch returns `dataset_name` unchanged whether or not the folder currently exists, and never raises on missing/empty folders.
- `DatasetBuilder._load_resume_state`:
  - Compare the persisted `config_hash` (#584, "Add a deterministic hash to uniquely identify a workflow config") against the current invocation's hash; mismatch → raise a clear "config drift" error.
  - In `IF_POSSIBLE` mode, `FileNotFoundError` → "no resume state" → fresh start. Hash mismatch and corrupt metadata still raise (same as `ALWAYS`).
- Tests should cover:
  - All four state-on-disk cases × all three `ResumeMode` values
  - The hash-mismatch error path for `ALWAYS` and `IF_POSSIBLE`
  - String coercion via `StrEnum` (`resume="if_possible"` resolves to `ResumeMode.IF_POSSIBLE`) so config-driven callers stay ergonomic

Other considerations

- Concurrency. `IF_POSSIBLE` plus a shared `dataset_name` across two concurrent processes is a race. Worth documenting that resume assumes a single writer per `dataset_name`. A lockfile in the dataset folder would make this enforceable, but is probably a separate piece of work.
- Cleanup semantics. `clear_partial_results()` should fire in `IF_POSSIBLE` mode the same way it does in `ALWAYS` — partial results from a previous interrupted run shouldn't leak into the resumed (or fresh) run.

Related cleanups (separate from the API change)

While reading the PR, two small things stood out that are worth a follow-up regardless of the tri-state proposal:

- The partial-completion warning at the end of `_build_async` is unreachable because of the `return True` immediately above it. Moving the warning above the return restores user-visible feedback for incomplete async runs.
- `_load_resume_state` raises `DatasetGenerationError` from a `FileNotFoundError` without `from exc`, dropping the original traceback. Chaining it would help future debugging.
Thank you for taking this on, and for the plan in …. One thought is that we could try doing checkpointing on a task level already. However, that would need a sidecar format (parquet only wants whole row groups), concurrency-safe writes from many parallel asyncio tasks, and a …
Thanks everyone for the thorough review — really useful catches across the board. Here's what was addressed:
PR description updated to reflect the changed semantics.
Empty directory (crash between mkdir and first file write) was treated as compatible — _check_resume_config_compatibility returned True, IF_POSSIBLE upgraded to ALWAYS, which then raised ArtifactStorageError. Fix: treat empty directory the same as missing — return False from _check_resume_config_compatibility when any(dir.iterdir()) is False. Test: test_if_possible_starts_fresh_when_directory_is_empty
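The empty-directory check described in this commit reduces to a one-liner. This is a hedged sketch with a hypothetical helper name, not the actual `_check_resume_config_compatibility` code:

```python
from pathlib import Path


def has_prior_run(dataset_dir: Path) -> bool:
    """A directory that is missing OR empty (crash between mkdir and the first
    file write) counts as "no prior dataset", so IF_POSSIBLE starts fresh
    instead of upgrading to ALWAYS and then failing."""
    return dataset_dir.is_dir() and any(dataset_dir.iterdir())
```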
…int mismatch ResumeMode.ALWAYS was documented to raise when column/model config changed, but _check_resume_config_compatibility() was only called in the IF_POSSIBLE branch. A user resuming with ALWAYS after changing the config would silently mix records from two different configs. Fix: - Refactor _check_resume_config_compatibility() to return _ConfigCompatibility enum (COMPATIBLE / INCOMPATIBLE / NO_PRIOR_DATASET) instead of bool so callers can distinguish 'no prior run' from 'configs differ' - Call the check for both ALWAYS and IF_POSSIBLE before _write_builder_config() - ALWAYS + INCOMPATIBLE → DatasetGenerationError - IF_POSSIBLE + INCOMPATIBLE → silent fresh start (existing behaviour) - IF_POSSIBLE + NO_PRIOR_DATASET → silent fresh start (existing behaviour) Test: test_build_resume_always_raises_on_config_mismatch
nabinchha left a comment:
Thanks for grinding through this one, @przemekboruta — the ResumeMode enum, crash-window reconciliation, and the _ConfigCompatibility tri-state are all real improvements over the earlier rounds. Most of what's left is small follow-through from the recent iterations (stale strings/docstrings) plus a couple of behavior gaps that are worth catching before merge.
Summary
Adds resume: ResumeMode (NEVER / ALWAYS / IF_POSSIBLE) to DataDesigner.create() and DatasetBuilder.build(). Sync resume reads metadata.json for batch progress; async resume reconciles metadata.json with a filesystem scan of parquet-files/ and seeds the row-group buffer with the original target so non-aligned runs extend correctly. IF_POSSIBLE uses DataDesignerConfig.fingerprint() to silently fall back to a fresh run on config drift; ALWAYS raises. The implementation matches the PR description.
Findings
Critical — Let's fix these before merge
Direct collision with #540 (feat(results): add export() method and --output-format CLI flag)
- What: Commit `0bdf24ab` on this branch introduces an early version of `DatasetCreationResults.export()` plus the `--output-format` / `-f` CLI flag. That feature has its own dedicated PR — #540, also by @przemekboruta, branch `feat/dataset-export`, currently open — which has already been through review rounds with @andreatgretel and @nabinchha and contains a meaningfully more developed implementation:
  - Streaming export (memory proportional to one batch, not the full dataset) — #526's commit calls `self.load_dataset()`, which materializes everything in memory.
  - A `count_records()` helper using `pq.read_metadata` so the CLI doesn't OOM just to print a row count — #526 has none of this.
  - Schema unification with `pa.unify_schemas(promote_options="permissive")` for parquet batches with type drift — #526 doesn't handle this.
  - `InvalidFileFormatError` (project-canonical) instead of `ValueError`.
  - `click.Choice(SUPPORTED_EXPORT_FORMATS)` for parse-time CLI validation, better `--help`, and tab completion.
  - Extension-based format inference (`.jsonl` / `.csv` / `.parquet`) with explicit override.
  - ~15 tests covering streaming, schema unification, extension inference, error paths, controller-level happy/sad paths, etc.

  Both PRs touch the same files (`packages/data-designer/src/data_designer/interface/results.py`, `cli/commands/create.py`, `cli/controllers/generation_controller.py`, plus their tests) — whichever merges second will conflict, and if #526 lands first it will overwrite the older shape into `main`, forcing #540 into a redo just to ship its memory-safety improvements.
- Why: This isn't just scope creep — it's a guaranteed merge conflict and a regression risk. The version of `export()` shipping in #526 has none of the OOM-safety, schema-unification, or canonical-error-type work that #540 has been iterating on for weeks. Users on the eventual release would land on the inferior implementation purely because of merge ordering.
- Suggestion: Drop commit `0bdf24ab` from this PR — `git rebase -i origin/main` and removing that single commit (and its companion test edits) should be clean, since the resume work doesn't depend on it. Let #540 ship the export feature on its own merits. Coordinate with @nabinchha / @andreatgretel on the sequencing if needed, but #540 is the right home for those changes and #526 should focus solely on resume (#525).
packages/data-designer/src/data_designer/cli/commands/create.py:13-49 and packages/data-designer/src/data_designer/cli/controllers/generation_controller.py:113-151 — CLI create doesn't surface resume
- What: `DataDesigner.create()` accepts `resume: ResumeMode` and `ResumeMode` is exported from `data_designer.interface`, but `data-designer create` has no `--resume` flag and `GenerationController.run_create()` never passes `resume=` through. CLI users get no way to resume an interrupted run.
- Why: Resume's whole value proposition is "you don't have to redo work after a crash" — but the CLI is exactly the surface where crashes happen most (long-running jobs killed by preemption / OOM / SIGTERM where the user only has a shell handy). We're shipping the API but stopping one layer short.
- Suggestion: Add a `--resume` option to `create_command` (e.g. `typer.Option(ResumeMode.NEVER, "--resume", case_sensitive=False)` so `--resume always` / `--resume if_possible` / `--resume never` all work via `StrEnum`), thread it through `GenerationController.run_create()`, and pass `resume=resume` into `data_designer.create(...)`. Worth a parametrize test in `tests/cli/commands/test_create_command.py`.
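The project's CLI uses typer, but the same case-insensitive coercion can be illustrated with stdlib argparse. `parse_resume` and the parser wiring here are hypothetical, not the project's actual command code:

```python
import argparse
from enum import Enum


class ResumeMode(str, Enum):
    NEVER = "never"
    ALWAYS = "always"
    IF_POSSIBLE = "if_possible"


def parse_resume(value: str) -> ResumeMode:
    # case-insensitive coercion so --resume ALWAYS / always / Always all work
    try:
        return ResumeMode(value.lower())
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid resume mode: {value!r}")


parser = argparse.ArgumentParser(prog="data-designer-create")
parser.add_argument("--resume", "-r", type=parse_resume, default=ResumeMode.NEVER)
```

Typer's enum options with `case_sensitive=False` give the same behavior plus generated `--help` choices.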
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py:393 and :593 — Stale "Remove resume=True" warning text
- What: Both already-complete warnings still say "Remove `resume=True` if you want to generate a new dataset." The public API is now a `ResumeMode` enum, and `resume=True` isn't even a valid value anymore.
- Why: It's the operator-facing log line that fires exactly when a user is mid-recovery and reading carefully. Sending them looking for a flag that no longer exists is the wrong note to hit.
- Suggestion:

  ```python
  "⚠️ Dataset is already complete — all batches were found in the existing artifact directory. "
  "Nothing to resume. Use resume=ResumeMode.NEVER if you want to generate a new dataset."
  ```

  (same swap in `_build_async`).
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py:230-232 and packages/data-designer/src/data_designer/interface/data_designer.py:226-230 — Docstring overstates when ALWAYS raises
- What: Both docstrings say `ALWAYS` "Raises if no prior progress exists or … incompatible." The actual behavior in `build()` lines 290-299 is that a missing `metadata.json` (interrupted before the first batch / row group completed) silently starts a fresh run with an info log — it doesn't raise. The only "no prior progress" case that still raises is "directory itself doesn't exist" (via `ArtifactStorage.resolved_dataset_name`).
- Why: That gap is exactly what users will hit after their first crashed run on a small dataset (records < buffer_size → only one row group → if it didn't finish, no metadata). They'll plan around an exception that doesn't fire.
- Suggestion: Tighten to "`ALWAYS`: resume from the last completed batch / row group. If there's a checkpoint but its parameters are incompatible (`buffer_size` mismatch, `num_records` < what was already generated), raises `DatasetGenerationError`. If no checkpoint exists yet (interrupted before the first batch finished), silently restarts from the beginning. If the dataset directory itself doesn't exist, raises."
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py:290-299 — artifact_storage.resume not synced when the no-metadata fallback fires
- What: When `IF_POSSIBLE` + `COMPATIBLE` upgrades to `ALWAYS` (lines 274-277), both the local `resume` and `self.artifact_storage.resume` are flipped to `ALWAYS` and the cached `resolved_dataset_name` is popped. If the no-metadata branch then fires (line 290), only the local `resume` is downgraded to `NEVER`; `self.artifact_storage.resume` stays at `ALWAYS`. Today this is benign because `resolved_dataset_name` was already cached one line earlier, but it leaves the two state holders disagreeing.
- Why: Same shape of bug that motivated `test_if_possible_incompatible_config_does_not_overwrite_existing_dataset` — `ArtifactStorage.resume` and the local `resume` getting out of sync. That test only covers the IF_POSSIBLE → incompatible path; the no-metadata path leaves the same trap unprotected.
- Suggestion: Mirror the IF_POSSIBLE downgrade:

  ```python
  if resume == ResumeMode.ALWAYS and not self.artifact_storage.metadata_file_path.exists():
      logger.info(...)
      self.artifact_storage.clear_partial_results()
      resume = ResumeMode.NEVER
      self.artifact_storage.resume = ResumeMode.NEVER  # keep both in sync
  ```

packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py — Resume tests reach pervasively into private internals
- What: DEVELOPMENT.md is explicit: "Test public APIs only. Tests should exercise public interfaces, not `_`-prefixed functions or classes. If something is hard to test without reaching into private internals, consider refactoring the code to expose a public entry point." The new resume tests violate this in several distinct ways:
  - Three tests exercise `_find_completed_row_group_ids()` as the system under test (lines 1606, 1620, 1636) — `test_find_completed_row_group_ids_empty_dir`, `test_find_completed_row_group_ids_with_files`, `test_find_completed_row_group_ids_ignores_non_batch_files`. The function is a private filesystem scanner; its behavior is already covered by the end-to-end async-resume tests via `build()`.
  - Resume behavior is driven by patching private methods rather than going through `build()`: `_check_resume_config_compatibility` (1549, 1907), `_build_with_resume` (1523), `_build_async` (1688), `_prepare_async_run` used as a capture hook (1786, 1830, 1873).
  - Coverage relies on a wall of private-method patches per test — most of the IF_POSSIBLE / async-resume tests stack 5-7 `patch.object(builder, "_…")` calls (`_run_model_health_check_if_needed`, `_run_mcp_tool_check_if_needed`, `_write_builder_config`, `_initialize_generators_and_graph`, `_processor_runner.run_after_generation`, …). Some of that scaffolding pre-dates this PR, but the new tests double down on the pattern.
  - The "stale return type" issue at line 1907 is a downstream symptom: `patch.object(builder, "_check_resume_config_compatibility", return_value=False)` happens to behave correctly only because `False != _ConfigCompatibility.COMPATIBLE` is true. If anyone later branches on a specific enum member, the test will silently keep "passing" on the wrong path — the `logger.info("Config has changed …")` branch on line 268 isn't being exercised the way the test name implies, because `False == _ConfigCompatibility.INCOMPATIBLE` is also false.
- Why: This is exactly the failure mode DEVELOPMENT.md warns about — tests that ratchet implementation details into the test suite. Today's symptoms: (a) the stale `return_value=False` test is silently miscategorized; (b) refactoring resume logic (e.g. extracting `_resolve_async_resume_state` per the previous review's suggestion, or reshaping `_build_with_resume` / `_build_async` into a single dispatcher) will cascade into ~20 test edits even when behavior is unchanged; (c) the senior review at the top of this thread already asked for "an end-to-end variant with real parquet files on disk" to lock down the filesystem reconciliation invariant — the existing tests instrument private internals instead of asserting on observable behavior.
- Suggestion: A few concrete moves:
  - Drop the three direct `_find_completed_row_group_ids` tests. That logic is already exercised by `test_find_completed_row_group_ids_used_for_initial_total_batches` and the async crash-window tests, which assert on observable downstream effects.
  - Convert the `_prepare_async_run` capture-hook tests into end-to-end tests that drive `builder.build(num_records=…, resume=ResumeMode.ALWAYS)` against a real seeded dataset directory (small `num_records`, real parquet files written via the `_write_parquet_files` helper you already have), then assert on `metadata.json` contents (`actual_num_records`, `num_completed_batches`) after `build()` returns. That's how `test_find_completed_row_group_ids_used_for_initial_total_batches` already works — extend the same shape to the crash-window and skip-set tests.
  - For the IF_POSSIBLE-downgrade tests (`test_if_possible_incompatible_config_does_not_overwrite_existing_dataset` etc.), the assertions on `storage.resume == ResumeMode.NEVER` and `storage.resolved_dataset_name != "dataset"` are observable; the patches on `_write_builder_config` / `_initialize_generators_and_graph` are scaffolding to short-circuit generation. If we extract a public seam (e.g. a `dry_run=True` argument on `build()` that runs the resume-decision logic and returns the resolved dataset name without generating), those tests collapse to a 3-line invocation.
  - At minimum, fix the `return_value=False` to `return_value=_ConfigCompatibility.INCOMPATIBLE` — the enum is already imported on line 24 of the test file. Even if the broader refactor lands as a follow-up, this one stops being a silently miscategorized test.
Warnings — Worth addressing
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py:519-548 — Missing-config case silently returns COMPATIBLE, but docstring promises a warning
- What: Docstring says "COMPATIBLE — fingerprints match, or stored config is unreadable (warning logged)". In practice a missing `config_path` returns `COMPATIBLE` silently (no log), unlike the unreadable case, which warns.
- Why: A user who deletes `builder_config.json` by mistake and reruns with `IF_POSSIBLE` gets a silent resume whose generation parameters can no longer be validated. Worse, this means `IF_POSSIBLE` will then upgrade to `ALWAYS` and proceed — bypassing the fingerprint check the user explicitly opted into. That's a silent correctness footgun on a feature whose entire job is to detect config drift.
- Suggestion: Log a warning in the `not config_path.exists()` branch — a missing `builder_config.json` next to a populated dataset directory is genuinely anomalous and worth surfacing. Update the docstring to match if you go that route.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py:543 — except Exception is broader than needed
- What: `_check_resume_config_compatibility` catches any `Exception` to recover from a corrupt `builder_config.json`. The realistic failure modes are `OSError`, `json.JSONDecodeError`, and `pydantic.ValidationError`.
- Why: STYLEGUIDE.md is explicit: "Prefer specific exception types over bare `except`. Never catch `Exception` or `BaseException` without re-raising." As written, this also swallows programming bugs — e.g. an `AttributeError` from a future schema change to `BuilderConfig`, or a `TypeError` from a refactor — under the "unreadable config — assume compatible" log line. That's exactly the silent-fallback shape the style guide is trying to prevent: a real bug becomes "user sees a warning, run continues with fingerprint check skipped, config drift goes undetected."
- Suggestion: `except (OSError, json.JSONDecodeError, ValidationError):` (import `ValidationError` from `pydantic`). Anything outside that set is a programming error and should propagate.
packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py — Format check fails
- What: `ruff format --check` flags `test_if_possible_starts_fresh_when_directory_is_empty` at line 1959 for an unnecessary multi-line signature.
- Why: CI will fail this on the next push, and pre-commit would have caught it. Worth fixing before merge so the green-CI baseline is honest.
- Suggestion: `make check-all-fix` and re-commit.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py:602-609 — Nested finalize_row_group closure
- What: `_build_async` defines `finalize_row_group(rg_id)` as a nested function inside the body of an already ~120-line method. It captures `buffer_manager`, `num_records`, `buffer_size`, and `on_batch_complete` from the enclosing scope.
- Why: STYLEGUIDE.md is explicit: "Avoid nested functions. Define helpers at module level or as private methods on the class. Nested functions hide logic, make testing harder, and complicate stack traces. The only acceptable use is closures that genuinely need to capture local state." This one closes over instance + parameter state, which a method + `functools.partial` handles equivalently. The senior review at the top of this thread already flagged that `_build_async` is dense at 120 lines and worth decomposing — this nested function is one of the contributors to that density and one of the cheaper pieces to extract.
- Suggestion: Promote to a private method `_finalize_row_group(self, rg_id, *, buffer_manager, num_records, buffer_size, on_batch_complete)` and pass `functools.partial(self._finalize_row_group, buffer_manager=buffer_manager, num_records=num_records, buffer_size=buffer_size, on_batch_complete=on_batch_complete)` into `_prepare_async_run`. Same behavior, but stack traces and tests can address it directly.
What Looks Good
- `_ConfigCompatibility` tri-state cleanly disambiguates the three resume scenarios. The earlier `bool` return type forced `IF_POSSIBLE` to conflate "no prior dataset" with "configs differ"; the new enum lets `ALWAYS` raise on `INCOMPATIBLE` only, while `IF_POSSIBLE` collapses both `INCOMPATIBLE` and `NO_PRIOR_DATASET` into a silent fresh start.
- Filesystem-vs-metadata reconciliation in `_build_async` is the right move. Sourcing both `initial_total_num_batches` and `initial_actual_num_records` from `_find_completed_row_group_ids()` (with `state.target_num_records` for per-group sizing) covers the full crash-window matrix without needing a separate flag — and `test_initial_actual_num_records_from_filesystem_in_crash_window` plus `test_build_async_resume_initial_actual_num_records_uses_original_target` lock that down.
- `if generated: run_after_generation(...)` gating is exactly the right shape. Resuming an already-complete dataset no longer destroys post-processed parquet by re-running `AFTER_GENERATION` processors on top of itself. Both engines covered.
- `test_build_async_resume_skip_row_groups_contains_completed_ids` locks down the "only the missing row groups get scheduled" invariant the previous review thread asked for.
- Test coverage hits the failure modes that matter — ~30 new tests across config-mismatch, buffer-mismatch, num_records-too-small, no-metadata fallback, crash-window FS reconciliation, IF_POSSIBLE downgrade paths (no dir / empty dir / incompatible config), and the async skip-set invariant.
Verdict
Needs changes — Six Critical findings and four Warnings, all worth addressing before merge. The collision with #540 is the highest-priority blocker (regression risk on a feature with its own already-reviewed PR); the CLI --resume gap and the test-private-API issue are the next biggest because they shape what users and future maintainers interact with. The Warnings are all small but each is a STYLEGUIDE.md violation or a silent-fallback footgun, so they're cheaper to fold into this PR than to chase in a follow-up.
This review was generated by an AI assistant.
…I flag, fix edge cases

- C1: drop commit 0bdf24a — remove export() / --output-format from this PR; that feature belongs to NVIDIA-NeMo#540 which has a superior streaming implementation
- C2: add --resume / -r flag to data-designer create CLI, thread ResumeMode through GenerationController.run_create() into DataDesigner.create()
- C3: fix already-complete warning text — replace stale "Remove resume=True" with "Use resume=ResumeMode.NEVER" in _build_with_resume and _build_async
- C4: fix docstrings — ALWAYS does NOT raise when no checkpoint exists (silently restarts from scratch); clarify num_records >= actual semantics
- C5: sync artifact_storage.resume = NEVER when no-metadata fallback fires so both state holders agree after the downgrade
- C6: fix return_value=False → _ConfigCompatibility.INCOMPATIBLE in IF_POSSIBLE test; drop 3 direct _find_completed_row_group_ids tests (private API, covered by build())
- W1: add logger.warning when builder_config.json is absent (silent COMPATIBLE was footgun)
- W2: narrow except Exception → (OSError, json.JSONDecodeError, ValidationError)
- W3: run make check-all-fix — ruff reformatted test_if_possible_starts_fresh_when_directory_is_empty
…s formula on async resume When extending an async run (num_records > state.target_num_records) and a crash occurs after an extension row group is written to disk but before write_metadata, the formula `min(buffer_size, state.target_num_records - rg_id * buffer_size)` yields a negative value for any extension row group (rg_id * buffer_size >= target), making initial_actual_num_records silently undercount. The RowGroupBufferManager then starts at the wrong offset, and the final metadata reports an incorrect actual_num_records with a false partial-completion warning. Fix: use state.target_num_records for original row groups and num_records for extension row groups (guarded by rg_id * buffer_size < state.target_num_records). Covers the scenario with a new regression test.
… on non-aligned extension resume The partitioning loop in _prepare_async_run decremented remaining by min(buffer_size, remaining) for every row group, including skipped ones. For a non-aligned original run (e.g. target=5, buffer_size=2, last group has 1 record), the loop deducted 2 for the skipped last group, leaving remaining one short. Extension row groups received smaller sizes than intended, so the generated dataset was silently short by the deficit and a false partial-completion warning fired. Fix: pre-compute the full row-group list with correct per-group sizes in _build_async where state.target_num_records is available, then pass it to _prepare_async_run as precomputed_row_groups (replacing the skip_row_groups param). Original groups use min(buffer_size, target - rg*bs); extension groups use min(buffer_size, extension_records - ext_idx*bs). Also updates the skip_row_groups test to assert on precomputed_row_groups and adds a regression test for the non-aligned extension case.
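The partitioning fix described in this commit can be sketched as a pure function. The name is hypothetical; the real logic lives in `_build_async` / `_prepare_async_run`:

```python
import math


def precompute_row_groups(original_target: int, num_records: int, buffer_size: int) -> list[int]:
    """Per-row-group sizes for a resumed run that extends original_target to num_records.

    Original groups keep their original sizes (so a non-aligned last group, e.g.
    target=5 / buffer_size=2 -> [2, 2, 1], deducts 1 when skipped, not a full
    buffer_size), and extension groups partition only the extension records."""
    sizes = [min(buffer_size, original_target - rg * buffer_size)
             for rg in range(math.ceil(original_target / buffer_size))]
    extension = num_records - original_target
    sizes += [min(buffer_size, extension - ext * buffer_size)
              for ext in range(math.ceil(extension / buffer_size))]
    return sizes
```

Summing the list always yields `num_records`, which is the invariant the buggy `remaining -= min(buffer_size, remaining)` loop violated for non-aligned originals.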
Thanks for the thorough review, @nabinchha — the second pass in particular caught real gaps worth fixing before merge. Two more fixes landed since your last review, both surfaced by Greptile on the updated code: (1) a negative `actual_num_records` formula in the extension crash window, hit when an async run is extended (`num_records > state.target_num_records`) and crashes after an extension row group is written but before `write_metadata`; and (2) non-aligned extension row-group sizes, where the partitioning loop in `_prepare_async_run` deducted a full `buffer_size` for skipped groups, leaving the resumed dataset silently short.
The plan described the initial resume: bool design which has since been replaced by the full ResumeMode enum (NEVER/ALWAYS/IF_POSSIBLE), async engine support, filesystem reconciliation, and config compatibility checks. The PR description is the authoritative record of what shipped.
Resolved conflicts in CLI create command and tests — kept both --resume (PR NVIDIA-NeMo#526) and --output-format (PR NVIDIA-NeMo#540) parameters throughout create.py, generation_controller.py, and all test files.
… group's slack original_target=5, buffer_size=2 produces 3 groups [2,2,1]. Extending to num_records=6: ceil(6/2)=3 equalled len(completed_ids)=3, triggering the already-complete branch on both the async and sync paths — returning the 5-record dataset silently. Fix (async): replace ceil(num_records/bs) with num_original_groups + ceil(extension_records/bs) so any extension always adds new groups beyond num_original_groups. Fix (sync): add num_records_list param to DatasetBatchManager.start() and pass the correct per-batch sizes in _build_with_resume, giving the batch manager the right total batch count (4 instead of 3 in the example).
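The off-by-slack condition is easiest to see in a tiny sketch (hypothetical helper name):

```python
import math

def total_row_groups(buffer_size: int, original_target: int, num_records: int) -> int:
    """Total row groups after extending a run: the completed original groups
    plus whatever the extension needs, regardless of slack in the last
    original group."""
    num_original = math.ceil(original_target / buffer_size)
    extension_records = num_records - original_target
    return num_original + math.ceil(extension_records / buffer_size)

# target=5, buffer_size=2 -> 3 original groups [2, 2, 1].
# Naive ceil(6/2) == 3 == len(completed) would report "already complete";
# the corrected total is 4, so one extension group of 1 record actually runs.
print(math.ceil(6 / 2), total_row_groups(2, 5, 6))  # 3 4
```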
Also, @przemekboruta do you have any interest in writing a short dev note about this feature once it goes out?
… resume Prevents negative extension_records in async path which silently truncated the dataset and corrupted metadata without triggering a partial-completion warning.
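A minimal sketch of the validation rule — names are hypothetical, and the real code raises `DatasetGenerationError`; a plain `ValueError` stands in here:

```python
def validate_resume_target(num_records: int, actual_num_records: int, original_target: int) -> None:
    """Reject resume targets that would truncate the dataset or land inside
    the original run's range."""
    if num_records < actual_num_records:
        raise ValueError("num_records is below the records already generated")
    if num_records < original_target:
        # actual <= num_records < target would make
        # extension_records = num_records - original_target negative in the
        # async path, silently truncating the dataset and corrupting metadata.
        raise ValueError("num_records is below the original target; pass the original target or more")

validate_resume_target(num_records=8, actual_num_records=4, original_target=5)  # ok: extends the run
```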
…ngrade When build() detected an incompatible config and downgraded resume from IF_POSSIBLE to NEVER, _media_storage.base_path remained bound to the original directory while all other path properties resolved to the new timestamped directory — causing broken image references in image-column runs.
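The stale-path bug is an instance of a cached child object not tracking a recomputed property. A minimal sketch under assumed names (not the actual `ArtifactStorage` code):

```python
class MediaStorage:
    def __init__(self, base_path: str):
        self.base_path = base_path

class Storage:
    def __init__(self, base_dir: str, dataset_name: str):
        self.base_dir = base_dir
        self._dataset_name = dataset_name
        # The child object captures dataset_path at construction time.
        self._media_storage = MediaStorage(self.dataset_path)

    @property
    def dataset_path(self) -> str:
        return f"{self.base_dir}/{self._dataset_name}"

    def downgrade_to_fresh_run(self, new_name: str) -> None:
        self._dataset_name = new_name
        # Without this re-point, _media_storage.base_path would still
        # reference the original directory while every other path property
        # already resolves to the new one.
        self.refresh_media_storage_path()

    def refresh_media_storage_path(self) -> None:
        self._media_storage.base_path = self.dataset_path

s = Storage("/tmp/out", "run")
s.downgrade_to_fresh_run("run-2024-01-01")
print(s._media_storage.base_path)  # /tmp/out/run-2024-01-01
```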
…sume writes After finalize_row_group successfully wrote incremental metadata during an extension run, target_num_records in metadata was updated to the extension target. A subsequent resume would read this as the original target, making _rg_size() incorrect for all row groups and silently corrupting actual_num_records. Stores original_target_num_records as an immutable field in metadata so the original group boundaries are always recoverable regardless of how many incremental writes have occurred.
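The immutability rule can be sketched as follows (hypothetical helper; the real field lives in the builder's `metadata.json` writes):

```python
import json
import tempfile
from pathlib import Path

def write_incremental_metadata(path: Path, target_num_records: int, actual_num_records: int) -> None:
    """Each checkpoint may move target/actual, but original_target_num_records
    is set on the first write and never overwritten afterwards."""
    existing = json.loads(path.read_text()) if path.exists() else {}
    path.write_text(json.dumps({
        "target_num_records": target_num_records,
        "actual_num_records": actual_num_records,
        "original_target_num_records": existing.get("original_target_num_records", target_num_records),
    }))

meta = Path(tempfile.mkdtemp()) / "metadata.json"
write_incremental_metadata(meta, target_num_records=5, actual_num_records=2)  # original run
write_incremental_metadata(meta, target_num_records=8, actual_num_records=6)  # extension run
print(json.loads(meta.read_text())["original_target_num_records"])  # 5
```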
Hey @nabinchha! Got a bit carried away with the Greptile review and landed a few more commits — mainly fixing edge cases in the async extension crash windows (stale original target after incremental resume writes, among others).

And yes, I'd love to write a dev note about this feature! Just let me know — should I add it to this PR, or would you prefer a separate one?
Summary
Closes #525
Adds `resume: ResumeMode = ResumeMode.NEVER` to `DataDesigner.create()` and `DatasetBuilder.build()`. Generation picks up from where the interrupted run left off — for both the sync and async engines.

Changes
- `ArtifactStorage`: new `ResumeMode(StrEnum)` enum (NEVER/ALWAYS/IF_POSSIBLE); new `resume: ResumeMode = ResumeMode.NEVER` field; `resolved_dataset_name` skips timestamp logic on `ALWAYS`/`IF_POSSIBLE`; new `clear_partial_results()`; new `refresh_media_storage_path()` re-points `_media_storage.base_path` after an `IF_POSSIBLE` → `NEVER` downgrade
- `DatasetBatchManager.start()`: new `start_batch`, `initial_actual_num_records`, `num_records_list`, and `original_target_num_records` params (all default, no breakage); `num_records_list` lets callers supply the exact per-batch sizes; `original_target_num_records` is persisted in each incremental metadata write so extension resumes can always recover the immutable original group boundaries
- `DatasetBuilder.build()`: new `resume: ResumeMode` param; `_load_resume_state()` reads and validates `metadata.json`; `_build_with_resume()` skips completed batches (sync); `_build_async()` skips completed row groups (async); `_check_resume_config_compatibility()` returns a `_ConfigCompatibility` enum (COMPATIBLE/INCOMPATIBLE/NO_PRIOR_DATASET) and is called for both `ALWAYS` and `IF_POSSIBLE` — `ALWAYS` raises on INCOMPATIBLE, `IF_POSSIBLE` starts fresh on INCOMPATIBLE or NO_PRIOR_DATASET; `resolved_dataset_name` cache invalidated on `IF_POSSIBLE` downgrade; partial-completion warning emitted before `return` in `_build_async`; `_ResumeState` carries `original_target_num_records` (immutable across extension writes) so `_rg_size()` always uses the correct original group boundaries
- `RowGroupBufferManager.__init__()`: new `initial_actual_num_records` and `initial_total_num_batches` params to seed counters on resume
- `DatasetBuilder._find_completed_row_group_ids()`: scans `parquet-files/` for `batch_*.parquet` to determine which async row groups are already done
- `DatasetBuilder._build_async()`: the full row-group list is precomputed and passed to `_prepare_async_run`; original groups use `min(bs, target - rg*bs)`, extension groups use `min(bs, ext_records - ext_idx*bs)` — replaces the `skip_row_groups` approach that incorrectly deducted `buffer_size` for non-aligned skipped groups; total row groups now computed as `num_original_groups + ceil(extension_records/bs)` instead of `ceil(num_records/bs)`, fixing a false "already complete" when the extension fits within the last original group's slack
- `DatasetBuilder._build_with_resume()`: passes `num_records_list = original_sizes + extension_sizes` to `batch_manager.start()`, giving the correct total batch count for non-aligned extensions; fixes the same false "already complete" on the sync path
- `finalize_row_group` closure: writes `metadata.json` after every row-group checkpoint (not just at the end), making all async runs resumable if interrupted
- `DataDesigner.create()`: new `resume: ResumeMode` param, passed through to `ArtifactStorage` and `builder.build()`
- CLI create: new `--resume`/`-r` option (`ResumeMode`, default `NEVER`); `GenerationController.run_create()` accepts and forwards `resume`
- bool return in `_build_with_resume`/`_build_async`; `build()` gates `run_after_generation` on the return value so processors are never re-run on an already-complete dataset

ResumeMode semantics
- `NEVER` (default): always starts a fresh run
- `ALWAYS`: resumes the prior run; raises `DatasetGenerationError` if config changed or run is otherwise incompatible
- `IF_POSSIBLE`: resumes when a compatible prior run exists, otherwise silently starts fresh

Validation and error cases
- No `metadata.json` (interrupted before first batch): restarts from scratch (both engines)
- `num_records` less than already-generated records → `DatasetGenerationError`
- `num_records` between already-generated records and original target (i.e. `actual <= num_records < target`) → `DatasetGenerationError`; prevents negative `extension_records` in the async path, which would silently truncate the dataset and corrupt metadata
- `num_records` greater than original target is allowed (extends the dataset)
- Crash before `write_metadata` (async): crash after an extension row group's parquet file is on disk but before the metadata write — `initial_actual_num_records` is derived from the filesystem using the correct per-group sizes, avoiding a negative size for extension groups
- Crash after `write_metadata` (async): a successful incremental metadata write during an extension flips `target_num_records` to the extension target; `original_target_num_records` is a separate immutable field written once at the start of the original run and carried unchanged through all subsequent writes, so `_rg_size()` always uses the correct original group boundaries on any future resume
- Original target not aligned to `buffer_size` (last group has fewer than `buffer_size` records): extension row groups now receive correct sizes — the old `skip_row_groups` loop deducted `buffer_size` for every skipped group, leaving `remaining` short and causing extension groups to be undersized, silently producing a dataset shorter than requested
- Extension fits within the last group's slack: `original_target=5, buffer_size=2` produces 3 groups/batches; extending to `num_records=6` gave `ceil(6/2)` = 3 == len(completed) = 3, triggering the already-complete branch and returning 5 records silently — fixed by computing the total as `num_original_groups + ceil(extension_records/bs)` on both paths
- `buffer_size` mismatch → `DatasetGenerationError`
- Config changed: with `ALWAYS` → `DatasetGenerationError`; with `IF_POSSIBLE` → silent fresh start, `resolved_dataset_name` cache invalidated so the fresh run gets a timestamped directory, and `_media_storage` re-initialised to the same path so image-column runs write to the correct directory
- Dataset directory missing or empty: `IF_POSSIBLE` starts fresh (no error); `ALWAYS` continues (handled by metadata check)
- Missing `builder_config.json` → warning logged, config compatibility check skipped (treated as compatible)

Test plan
- `test_resolved_dataset_name_resume_uses_existing_folder`
- `test_resolved_dataset_name_resume_raises_when_no_existing_folder`
- `test_resolved_dataset_name_resume_raises_when_folder_is_empty`
- `test_resolved_dataset_name_if_possible_uses_existing_folder`
- `test_resolved_dataset_name_if_possible_uses_clean_name_when_no_existing_folder`
- `test_clear_partial_results_removes_partial_folder`
- `test_clear_partial_results_is_noop_when_no_partial_folder`
- `test_start_with_start_batch`
- `test_start_with_initial_actual_num_records`
- `test_start_with_start_batch_and_initial_actual_num_records`
- `test_start_default_values_unchanged`
- `test_build_resume_starts_fresh_without_metadata`
- `test_build_resume_raises_when_num_records_below_actual`
- `test_build_resume_raises_when_num_records_below_original_target`
- `test_build_resume_allows_larger_num_records`
- `test_build_resume_raises_on_buffer_size_mismatch`
- `test_build_resume_always_raises_on_config_mismatch`
- `test_build_resume_runs_remaining_batches`
- `test_build_resume_logs_warning_when_already_complete`
- `test_build_resume_already_complete_does_not_run_after_generation_processors`
- `test_build_resume_not_already_complete_when_extension_fits_in_slack`
- `test_build_async_resume_logs_warning_when_already_complete`
- `test_build_async_resume_starts_fresh_without_metadata`
- `test_build_async_resume_already_complete_does_not_run_after_generation_processors`
- `test_find_completed_row_group_ids_used_for_initial_total_batches`
- `test_initial_actual_num_records_from_filesystem_in_crash_window`
- `test_build_async_resume_skip_row_groups_contains_completed_ids`
- `test_build_async_resume_initial_actual_num_records_uses_original_target`
- `test_build_async_resume_initial_actual_num_records_extension_crash_window`
- `test_build_async_resume_extension_non_aligned_row_group_sizes`
- `test_build_async_resume_not_already_complete_when_extension_fits_in_slack`
- `test_if_possible_incompatible_config_does_not_overwrite_existing_dataset`
- `test_if_possible_incompatible_config_refreshes_media_storage_path`
- `test_build_async_resume_stale_original_target_after_incremental_metadata_write`
- `test_if_possible_starts_fresh_when_no_existing_directory`
- `test_if_possible_starts_fresh_when_directory_is_empty`
- `test_create_command_passes_resume_always`
- `test_create_command_passes_resume_if_possible`
- `test_run_create_passes_resume_always`
- `test_run_create_passes_resume_if_possible`