Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5110d63
docs: add implementation plan for resume mechanism
przemekboruta Apr 13, 2026
6afb638
feat(storage): add resume flag and clear_partial_results()
przemekboruta Apr 13, 2026
699f510
feat(batch-manager): add start_batch param to start()
przemekboruta Apr 13, 2026
866df6b
feat(builder): implement resume logic in DatasetBuilder
przemekboruta Apr 13, 2026
e0b22d5
feat(interface): expose resume on DataDesigner.create()
przemekboruta Apr 13, 2026
4b514b2
test: add tests for resume mechanism
przemekboruta Apr 13, 2026
1db497a
feat(builder): extend resume to async engine (DATA_DESIGNER_ASYNC_ENG…
przemekboruta Apr 13, 2026
812d7df
fix(builder): skip after-generation processors when resume finds data…
przemekboruta Apr 13, 2026
4054610
fix(builder): use filesystem count for initial_total_num_batches on a…
przemekboruta Apr 13, 2026
0bdf24a
feat(results): add export() method and --output-format CLI flag
przemekboruta Apr 13, 2026
4401e4b
fix(builder): handle resume when metadata.json missing (interrupted b…
przemekboruta Apr 14, 2026
c2d0a77
docs(interface): fix resume docstring — async engine is supported
przemekboruta Apr 14, 2026
4ffd7f3
fix(builder): derive initial_actual_num_records from filesystem in as…
przemekboruta Apr 14, 2026
b89f1a1
feat(resume): replace resume: bool with ResumeMode enum (NEVER/ALWAYS…
przemekboruta Apr 30, 2026
5a99f59
fix(resume): invalidate resolved_dataset_name cache when IF_POSSIBLE …
przemekboruta May 1, 2026
f69b3e7
fix(builder): move partial-completion warning before return in _build…
przemekboruta May 1, 2026
4daf48b
fix(builder): IF_POSSIBLE now starts fresh when no dataset directory …
przemekboruta May 4, 2026
69c3e55
fix(builder): use original target_num_records in async resume record …
przemekboruta May 4, 2026
487def2
fix(builder): IF_POSSIBLE starts fresh on empty dataset directory
przemekboruta May 4, 2026
a08d9cc
fix(builder): ALWAYS raises DatasetGenerationError on config fingerpr…
przemekboruta May 4, 2026
75fda14
Merge branch 'main' into main
nabinchha May 5, 2026
e7c0f95
fix(resume): address nabinchha review — drop export collision, add CL…
przemekboruta May 6, 2026
d38ac96
Merge origin/main into main
przemekboruta May 6, 2026
02821b8
fix(builder): replace stdlib StrEnum with project compat shim for Pyt…
przemekboruta May 6, 2026
b8c633c
fix(builder): guard extension row groups in initial_actual_num_record…
przemekboruta May 6, 2026
0fef8d4
fix(builder): pre-compute row-group list in _build_async to fix sizes…
przemekboruta May 6, 2026
dad57b8
chore: remove stale implementation plan for #525
przemekboruta May 7, 2026
1a32cac
Merge origin/main into feat/dataset-export
przemekboruta May 7, 2026
60b9d6b
fix(engine): fix false 'already complete' when extension fits in last…
przemekboruta May 7, 2026
cd85e97
fix(engine): raise error when num_records is below original target on…
przemekboruta May 8, 2026
031c689
Merge branch 'main' into main
przemekboruta May 8, 2026
729ecf2
fix(storage): refresh MediaStorage path after IF_POSSIBLE → NEVER dow…
przemekboruta May 8, 2026
e64ec03
fix(engine): preserve original_target_num_records across extension re…
przemekboruta May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def __init__(self, artifact_storage: ArtifactStorage):
self._num_records_list: list[int] | None = None
self._buffer_size: int | None = None
self._actual_num_records: int = 0
self._original_target_num_records: int | None = None
self.artifact_storage = artifact_storage

@property
Expand Down Expand Up @@ -87,9 +88,11 @@ def finish_batch(self, on_complete: Callable[[Path], None] | None = None) -> Pat
self._actual_num_records += len(self._buffer)
final_file_path = self.artifact_storage.move_partial_result_to_final_file_path(self._current_batch_number)

target = sum(self.num_records_list)
self.artifact_storage.write_metadata(
{
"target_num_records": sum(self.num_records_list),
"target_num_records": target,
"original_target_num_records": self._original_target_num_records or target,
"actual_num_records": self._actual_num_records,
"total_num_batches": self.num_batches,
"buffer_size": self._buffer_size,
Expand Down Expand Up @@ -158,17 +161,32 @@ def reset(self, delete_files: bool = False) -> None:
except OSError as e:
raise DatasetBatchManagementError(f"🛑 Failed to delete directory {dir_path}: {e}")

def start(
    self,
    *,
    num_records: int,
    buffer_size: int,
    start_batch: int = 0,
    initial_actual_num_records: int = 0,
    num_records_list: list[int] | None = None,
    original_target_num_records: int | None = None,
) -> None:
    """Initialize (or re-initialize) batch bookkeeping for a generation run.

    Args:
        num_records: Total number of records to generate this run. Must be positive.
        buffer_size: Records per batch. Must be positive.
        start_batch: Batch number to begin at; non-zero when resuming a prior run.
        initial_actual_num_records: Records already persisted by a prior run
            (seed for the running total when resuming).
        num_records_list: Explicit per-batch sizes. When given, it overrides the
            even split derived from num_records/buffer_size — used on resume so
            the remaining batches match the original plan.
        original_target_num_records: The target of the very first run, preserved
            across extension/resume so metadata reports it correctly.

    Raises:
        DatasetBatchManagementError: If num_records or buffer_size is not positive.
    """
    if num_records <= 0:
        raise DatasetBatchManagementError("🛑 num_records must be positive.")
    if buffer_size <= 0:
        raise DatasetBatchManagementError("🛑 buffer_size must be positive.")

    self._buffer_size = buffer_size
    self._original_target_num_records = original_target_num_records
    if num_records_list is not None:
        # Resume path: copy so later mutation of the caller's list cannot leak in.
        self._num_records_list = list(num_records_list)
    else:
        # Fresh run: full-sized batches plus one trailing partial batch if needed.
        self._num_records_list = [buffer_size] * (num_records // buffer_size)
        if remaining_records := num_records % buffer_size:
            self._num_records_list.append(remaining_records)
    self.reset()
    # Set counters AFTER reset() so a resume's starting point is not wiped.
    self._current_batch_number = start_batch
    self._actual_num_records = initial_actual_num_records
def write(self) -> Path | None:
"""Write the current batch to a parquet file.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,18 @@ class RowGroupBufferManager:
exclusively by the async scheduler.
"""

def __init__(
    self,
    artifact_storage: ArtifactStorage,
    initial_actual_num_records: int = 0,
    initial_total_num_batches: int = 0,
) -> None:
    """Create a buffer manager, optionally seeded with counts from a prior run.

    Args:
        artifact_storage: Storage backend used for checkpointing row groups.
        initial_actual_num_records: Records already written by an interrupted
            run; seeds the running total when resuming.
        initial_total_num_batches: Batches already completed by an interrupted
            run; seeds the batch counter when resuming.
    """
    # row_group -> in-memory rows awaiting checkpoint
    self._buffers: dict[int, list[dict]] = {}
    # row_group -> allocated size (number of rows)
    self._row_group_sizes: dict[int, int] = {}
    # row_group -> indices of rows dropped before checkpointing
    self._dropped: dict[int, set[int]] = {}
    self._artifact_storage = artifact_storage
    # Counters start non-zero when resuming so final metadata stays accurate.
    self._actual_num_records: int = initial_actual_num_records
    self._total_num_batches: int = initial_total_num_batches

def init_row_group(self, row_group: int, size: int) -> None:
"""Allocate a buffer for *row_group* with *size* empty rows."""
Expand Down Expand Up @@ -129,11 +134,14 @@ def checkpoint_row_group(

self.free_row_group(row_group)

def write_metadata(self, target_num_records: int, buffer_size: int) -> None:
def write_metadata(
self, target_num_records: int, buffer_size: int, original_target_num_records: int | None = None
) -> None:
"""Write final metadata after all row groups are checkpointed."""
self._artifact_storage.write_metadata(
{
"target_num_records": target_num_records,
"original_target_num_records": original_target_num_records or target_num_records,
"actual_num_records": self._actual_num_records,
"total_num_batches": self._total_num_batches,
"buffer_size": buffer_size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ class BatchStage(StrEnum):
PROCESSORS_OUTPUTS = "processors_outputs_path"


class ResumeMode(StrEnum):
NEVER = "never"
ALWAYS = "always"
IF_POSSIBLE = "if_possible"


class ArtifactStorage(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

Expand All @@ -47,6 +53,7 @@ class ArtifactStorage(BaseModel):
partial_results_folder_name: str = "tmp-partial-parquet-files"
dropped_columns_folder_name: str = "dropped-columns-parquet-files"
processors_outputs_folder_name: str = PROCESSORS_OUTPUTS_FOLDER_NAME
resume: ResumeMode = ResumeMode.NEVER
_media_storage: MediaStorage = PrivateAttr(default=None)

@property
def resolved_dataset_name(self) -> str:
    """Resolve the directory name this session writes to.

    A non-empty existing directory is reused when resuming; otherwise the
    session is redirected to a timestamp-suffixed name so old data is kept.
    ResumeMode.ALWAYS with nothing on disk to resume from is an error.
    """
    candidate = self.artifact_path / self.dataset_name
    has_prior_data = candidate.exists() and any(candidate.iterdir())
    if not has_prior_data:
        # A hard "always resume" request cannot be honored without prior data.
        if self.resume == ResumeMode.ALWAYS:
            raise ArtifactStorageError(
                f"🛑 Cannot resume: no existing dataset found at {str(candidate)!r}. "
                "Run without resume=ResumeMode.ALWAYS to start a new generation."
            )
        return self.dataset_name
    if self.resume in (ResumeMode.ALWAYS, ResumeMode.IF_POSSIBLE):
        # Resuming: continue writing into the existing directory.
        return self.dataset_name
    # Not resuming: leave the old data alone and divert this session.
    renamed = f"{self.dataset_name}_{datetime.now().strftime('%m-%d-%Y_%H%M%S')}"
    logger.info(
        f"📂 Dataset path {str(candidate)!r} already exists. Dataset from this session"
        f"\n\t\t will be saved to {str(self.artifact_path / renamed)!r} instead."
    )
    return renamed

@property
Expand Down Expand Up @@ -144,6 +158,16 @@ def set_media_storage_mode(self, mode: StorageMode) -> None:
"""
self._media_storage.mode = mode

def refresh_media_storage_path(self) -> None:
    """Re-point MediaStorage to the current base_dataset_path.

    Must be called after popping the resolved_dataset_name cache so that
    _media_storage.base_path and .images_dir reflect the updated directory.
    """
    new_base = self.base_dataset_path
    # Capture the images subdirectory name BEFORE overwriting images_dir.
    images_name = self._media_storage.images_dir.name
    self._media_storage.base_path = new_base
    self._media_storage.images_dir = new_base / images_name

@staticmethod
def mkdir_if_needed(path: Path | str) -> Path:
"""Create the directory if it does not exist."""
Expand Down Expand Up @@ -204,6 +228,11 @@ def load_dataset_with_dropped_columns(self) -> pd.DataFrame:
df = lazy.pd.concat([df, df_dropped], axis=1)
return df

def clear_partial_results(self) -> None:
    """Remove any in-flight partial results left over from an interrupted run."""
    leftovers = self.partial_results_path
    # No-op when there is nothing to clean up.
    if leftovers.exists():
        shutil.rmtree(leftovers)

def move_partial_result_to_final_file_path(self, batch_number: int) -> Path:
partial_result_path = self.create_batch_file_path(batch_number, batch_stage=BatchStage.PARTIAL_RESULT)
if not partial_result_path.exists():
Expand Down
Loading
Loading