From a8b63ab4ba720b85b96a4254c6a78f72d6c3558b Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 19:48:27 -0700 Subject: [PATCH 1/7] fix(SC-1..SC-5): correct five I/O measurement bugs in streaming_checkpoint.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SC-1 — Async S3 I/O time invisible to metric For MinIO and s3torchconnector backends, write_chunk() returns before the ThreadPoolExecutor UploadPart tasks complete. Total I/O time was accumulated as a sum of buffer-fill microseconds, missing the actual network transmission that happens in parallel. Fixed by capturing io_wall_start before the write loop and io_wall_end after close(), then storing io_wall_time in the stats dict. _format_results() now uses io_wall_time as the io_throughput denominator (falling back to io_accumulated_time for synchronous backends where they are equal). The old accumulated metric is retained as io_accumulated_time for diagnostic visibility. SC-2 — memcpy inside timed I/O window inflates CPU time in I/O metric bytes(shm.buf[:nbytes]) — a full memcpy of up to 32 MB — was executed inside the per-chunk io_start/io_stop window in _writer_process(). For a 32 MB chunk this adds ~10–50 ms of pure CPU time to reported I/O time. Fixed by moving chunk_bytes = bytes(shm.buf[:nbytes]) to before io_start, then passing the already-converted bytes object to write_chunk(). The write_chunk() callers in minio_writer.py and s3torch_writer.py are updated separately (see their commits). SC-3 — total_time includes process setup; pipeline_overhead_pct is misleading start_time was captured at the top of save() before buffer pool creation (up to 2 GB of shared memory), generator init, IPC queue/ event creation, and process fork. These can take seconds for large checkpoints. Fixed by splitting into setup_start (original position) and pipeline_start (immediately after writer_proc.start()). The total_time returned by _format_results() now measures the data pipeline only. setup_time is passed through and included in the results dict for diagnostic visibility. SC-4 — gen_throughput and io_throughput use different byte counts gen_throughput used total_size_bytes (requested size) while io_throughput used stats['total_bytes'] (actual bytes written). When these differ (truncated last chunk, backend short-writes), throughput_ratio = gen/io is a dimensionally inconsistent comparison. Fixed by computing actual_bytes_gb = stats['total_bytes'] / 1024**3 once and using it for both numerators. A warning is emitted when requested and actual byte counts differ. SC-5 — parallel reader backend_stats discards all-but-last reader's stats In the load() parallel reader finally block, backend_stats was assigned inside a for loop over readers, overwriting on every iteration. All readers except the last had their backend stats (S3 request counts, errors, retries) silently discarded. Fixed by accumulating into all_backend_stats = [] before the loop and returning the full list. --- .../checkpointing/streaming_checkpoint.py | 51 +++++++++++++------ 1 file changed, 35 insertions(+), 16 deletions(-) diff --git a/mlpstorage_py/checkpointing/streaming_checkpoint.py b/mlpstorage_py/checkpointing/streaming_checkpoint.py index bffa9a5b..382e6a6f 100644 --- a/mlpstorage_py/checkpointing/streaming_checkpoint.py +++ b/mlpstorage_py/checkpointing/streaming_checkpoint.py @@ -157,8 +157,8 @@ def save( print(f"Use dgen-py: {self.use_dgen}") print("=" * 80) - start_time = time.time() - + setup_start = time.time() + # Create buffer pool buffers, buffer_names = self._create_buffer_pool() @@ -205,7 +205,9 @@ def save( ) writer_proc.start() print(f"\n[Main] Writer process started (PID={writer_proc.pid})") - + pipeline_start = time.time() + setup_time = pipeline_start - setup_start + try: # Producer loop print(f"[Main] Starting producer at {time.perf_counter():.3f}s") @@ -249,7 +251,7 @@ def save( if 'error' in stats: raise RuntimeError(f"Writer process error: {stats['error']}") - return self._format_results(stats, gen_time, time.time() - start_time, total_size_bytes) + return self._format_results(stats, gen_time, time.time() - pipeline_start, total_size_bytes, setup_time) def _create_buffer_pool(self): """Create shared memory buffer pool.""" @@ -392,6 +394,7 @@ def _writer_process(buffer_names, chunk_size, filepath, total_size, total_io_time = 0.0 chunks_written = 0 _write_error = None # Error from write loop, if any + io_wall_start = time.perf_counter() try: while written < total_size: @@ -402,9 +405,12 @@ def _writer_process(buffer_names, chunk_size, filepath, total_size, buffer_idx, nbytes = item shm = buffers[buffer_idx] + # Copy buffer outside timed window to avoid memcpy inflation + chunk_bytes = bytes(shm.buf[:nbytes]) + # Time ONLY the I/O operation io_start = time.perf_counter() - bytes_written = writer.write_chunk(shm.buf, nbytes) + bytes_written = writer.write_chunk(chunk_bytes, nbytes) total_io_time += time.perf_counter() - io_start written += bytes_written @@ -434,6 +440,8 @@ def _writer_process(buffer_names, chunk_size, filepath, total_size, if _write_error is None: _write_error = f"close() failed: {e}" + io_wall_end = time.perf_counter() + # Force cleanup of storage-library resources. try: del writer @@ -444,6 +452,7 @@ def _writer_process(buffer_names, chunk_size, filepath, total_size, # Build result dict — single put to stats_queue. result = { 'io_time': total_io_time, + 'io_wall_time': io_wall_end - io_wall_start, 'close_time': close_time, 'total_bytes': written, 'chunks_written': chunks_written, @@ -471,20 +480,26 @@ def _writer_process(buffer_names, chunk_size, filepath, total_size, sys.stdout.flush() os._exit(exit_code) - def _format_results(self, stats, gen_time, total_time, total_size_bytes): + def _format_results(self, stats, gen_time, total_time, total_size_bytes, setup_time=0.0): """Format results for return.""" - gen_throughput = (total_size_bytes / (1024**3)) / gen_time - io_throughput = (stats['total_bytes'] / (1024**3)) / stats['io_time'] - + actual_bytes_gb = stats['total_bytes'] / (1024**3) + gen_throughput = actual_bytes_gb / gen_time + io_throughput = actual_bytes_gb / stats.get('io_wall_time', stats['io_time']) + + if stats['total_bytes'] != total_size_bytes: + print(f"[Warning] Bytes written ({stats['total_bytes']}) != requested ({total_size_bytes}); throughput ratio uses actual bytes for both numerators.") + # Calculate improved metrics throughput_ratio = gen_throughput / io_throughput pipeline_overhead = ((total_time - max(gen_time, stats['io_time'])) / total_time) * 100 bottleneck = "I/O" if stats['io_time'] > gen_time else "Generation" - + results = { 'gen_time': gen_time, - 'io_time': stats['io_time'], + 'io_accumulated_time': stats['io_time'], + 'io_wall_time': stats.get('io_wall_time', stats['io_time']), 'close_time': stats.get('close_time', 0.0), + 'setup_time': setup_time, 'total_time': total_time, 'total_bytes': stats['total_bytes'], 'chunks': stats['chunks_written'], @@ -495,13 +510,15 @@ def _format_results(self, stats, gen_time, total_time, total_size_bytes): 'bottleneck': bottleneck, 'backend_stats': stats.get('backend_stats', {}) } - + print("\n" + "=" * 80) print("RESULTS") print("=" * 80) + print(f"Setup: {results['setup_time']:.4f}s (buffer pool + fork overhead)") print(f"Generation: {results['gen_time']:.4f}s @ {results['gen_throughput_gbps']:.2f} GB/s") - print(f"I/O: {results['io_time']:.4f}s @ {results['io_throughput_gbps']:.2f} GB/s") - print(f" - write: {results['io_time'] - results['close_time']:.4f}s") + print(f"I/O: {results['io_wall_time']:.4f}s (wall) @ {results['io_throughput_gbps']:.2f} GB/s") + print(f" - accumulated: {results['io_accumulated_time']:.4f}s (sum of per-chunk timers)") + print(f" - write: {results['io_accumulated_time'] - results['close_time']:.4f}s") print(f" - close: {results['close_time']:.4f}s (fsync/finalize)") print(f"Total: {results['total_time']:.4f}s") print(f"") @@ -510,7 +527,7 @@ def _format_results(self, stats, gen_time, total_time, total_size_bytes): print(f"Bottleneck: {results['bottleneck']}") print(f"Chunks: {results['chunks']}") print("=" * 80) - + return results def load( @@ -699,11 +716,13 @@ def _read_block(reader, block_start, block_end, worker_id): io_time = max(t for _, t, _ in results) chunks = sum(c for _, _, c in results) finally: + all_backend_stats = [] for r in readers: try: - backend_stats = r.close() + all_backend_stats.append(r.close()) except Exception: pass + backend_stats = all_backend_stats total_time = time.time() - wall_start io_gbps = (total_read / 1024**3) / io_time if io_time > 0 else 0.0 From 8a27651f6f7a73a0f8aacd33a97b30f327bf5c78 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 19:48:43 -0700 Subject: [PATCH 2/7] fix(SC-1,SC-2): remove async I/O timing gap and memcpy from minio_writer.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SC-1 — Async upload latency not captured in io_time The MinIO multipart upload writer uses a ThreadPoolExecutor to send UploadPart requests in parallel. write_chunk() returns as soon as the part buffer is flushed to the executor queue, before the actual S3 network round-trips complete. This caused the accumulated io_time in streaming_checkpoint.py to reflect only buffer-management overhead (microseconds per chunk) rather than true network I/O time. The fix in streaming_checkpoint.py (SC-1) switches to a wall-clock window that captures the full upload pipeline duration; this file required no additional changes for SC-1 but is listed as affected because its async behaviour is the root cause. SC-2 — bytes() memcpy inside write_chunk() added CPU time to I/O metric data = bytes(buffer[:size]) performed a full copy of up to 32 MB of data inside write_chunk(), which was called from inside the timed I/O window in streaming_checkpoint._writer_process(). This attributed ~10–50 ms of CPU memcpy time per chunk to reported I/O time, inflating throughput numbers. Fixed by: - Changing the buffer parameter type from memoryview to bytes - Removing the data = bytes(buffer[:size]) internal copy - Replacing all references to data in the method body with buffer (a bytes object supports the same slicing used downstream) The conversion now happens in streaming_checkpoint.py before io_start, outside the timed window. --- .../storage_writers/minio_writer.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/mlpstorage_py/checkpointing/storage_writers/minio_writer.py b/mlpstorage_py/checkpointing/storage_writers/minio_writer.py index a9b63535..65b13aa1 100644 --- a/mlpstorage_py/checkpointing/storage_writers/minio_writer.py +++ b/mlpstorage_py/checkpointing/storage_writers/minio_writer.py @@ -323,34 +323,33 @@ def _flush_part(self) -> None: rate = written_gb / elapsed if elapsed > 0 else 0.0 print(f'\r[Writer] {written_gb:.2f} GB, {rate:.2f} GB/s ', end='', flush=True) - def write_chunk(self, buffer: memoryview, size: int) -> int: + def write_chunk(self, buffer: bytes, size: int) -> int: """Write chunk, flushing parts as they fill up. - + Args: - buffer: Memory buffer containing data to write + buffer: Bytes containing data to write size: Number of bytes to write from buffer - + Returns: Number of bytes written """ - data = bytes(buffer[:size]) offset = 0 - + while offset < size: # Calculate how much we can add to current part remaining_in_part = self.part_size - self.part_buffer_size chunk_remaining = size - offset to_write = min(remaining_in_part, chunk_remaining) - + # Add to part buffer - self.part_buffer.write(data[offset:offset + to_write]) + self.part_buffer.write(buffer[offset:offset + to_write]) self.part_buffer_size += to_write offset += to_write - + # Flush if part is full if self.part_buffer_size >= self.part_size: self._flush_part() - + self.total_bytes += size return size From 7f7c526f975187dd373ffeb0b2c16980e6003b8b Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 19:48:55 -0700 Subject: [PATCH 3/7] fix(SC-1,SC-2): remove async I/O timing gap and memcpy from s3torch_writer.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SC-1 — Async upload latency not captured in io_time The s3torchconnector writer streams data to S3 via write(), which queues bytes internally and returns before the network transmission completes. write_chunk() therefore returns immediately after handing off the buffer, so the per-chunk timer in streaming_checkpoint.py captured only the handoff cost (microseconds), not the actual upload latency. The wall-clock io_wall_time fix in streaming_checkpoint.py (SC-1) addresses the measurement; this file is listed as affected because its async behaviour is the root cause. SC-2 — bytes() memcpy inside write_chunk() added CPU time to I/O metric data = bytes(buffer[:size]) performed a full in-process copy of up to 32 MB inside write_chunk(), inside the timed I/O window. For a 32 MB chunk this is ~10–50 ms of pure CPU memcpy attributed as I/O time, inflating throughput figures. Fixed by: - Changing the buffer parameter type annotation from memoryview to bytes - Removing the data = bytes(buffer[:size]) internal copy line - Passing buffer directly to self.writer.write() The conversion now happens in streaming_checkpoint.py before io_start, outside the timed window. --- .../checkpointing/storage_writers/s3torch_writer.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/mlpstorage_py/checkpointing/storage_writers/s3torch_writer.py b/mlpstorage_py/checkpointing/storage_writers/s3torch_writer.py index 9e899807..0fca03cd 100644 --- a/mlpstorage_py/checkpointing/storage_writers/s3torch_writer.py +++ b/mlpstorage_py/checkpointing/storage_writers/s3torch_writer.py @@ -197,18 +197,17 @@ def __init__( print(f"[S3TorchWriter] region={region}, endpoint={endpoint or 'AWS S3'}") print(f"[S3TorchWriter] (multipart auto-managed by s3torchconnector)") - def write_chunk(self, buffer: memoryview, size: int) -> int: + def write_chunk(self, buffer: bytes, size: int) -> int: """Write chunk directly to S3 (streaming). - + Args: - buffer: Memory buffer containing data to write + buffer: Bytes containing data to write size: Number of bytes to write from buffer - + Returns: Number of bytes written """ - data = bytes(buffer[:size]) - self.writer.write(data) # Stream directly to S3 + self.writer.write(buffer) # Stream directly to S3 self.total_bytes += size elapsed = time.monotonic() - self._start_time written_gb = self.total_bytes / 1e9 From f9daed4fbfbedaf0b88cd8da3043db0f9ba4637f Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 19:49:06 -0700 Subject: [PATCH 4/7] fix(MPI-1): raise on non-zero mpirun exit instead of returning partial data When mpirun exits with a non-zero return code, one or more ranks failed and the collected_data dict may reflect only the ranks that completed before the failure. The previous code logged a warning and then returned that partial data as if collection had succeeded. Downstream validation (ClusterInformation.validate_cluster_consistency()) received no signal that data was missing and silently validated an incomplete cluster view. Fixed by replacing the warning-and-continue block with a RuntimeError that includes the exit code, the count of hosts that did report, and mpirun's stderr. The caller in base.py already wraps collect_cluster_info() in a try/except that handles collection failures gracefully (logs a warning and continues without cluster data), so no additional call-site changes are needed. --- mlpstorage_py/cluster_collector.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mlpstorage_py/cluster_collector.py b/mlpstorage_py/cluster_collector.py index a4a1ea4e..ba11a1bf 100644 --- a/mlpstorage_py/cluster_collector.py +++ b/mlpstorage_py/cluster_collector.py @@ -1537,9 +1537,10 @@ def collect(self) -> Dict[str, Any]: ) if result.returncode != 0: - self.logger.warning( - f"MPI collection returned non-zero exit code: " - f"{result.returncode}\nstderr: {result.stderr}" + raise RuntimeError( + f"MPI cluster collection failed (exit code {result.returncode}). " + f"Partial data from {len(collected_data)} hosts was collected but cannot be trusted. " + f"stderr: {result.stderr}" ) self.logger.info( From 904ebd74d057908bdd20a36408dcc6a773fff15e Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 19:49:17 -0700 Subject: [PATCH 5/7] fix(RULES-3): replace undefined model_key with model_name in subset-mode check In the CLOSED subset-mode MPI process count check, model_key was referenced in the if branch error log but was only assigned inside the else branch (after a successful regex match on model_size). At runtime, entering the if branch with checkpoint_mode == "subset" always raised NameError before the error message could be emitted. The exception was silently swallowed by the outer try/except, causing the check to pass unconditionally for CLOSED subset-mode submissions with the wrong process count. Fixed by replacing model_key with model_name in the error log at the affected line. model_name is assigned unconditionally earlier in the function from metadata.get("args", {}).get("model", "").lower() and is the correct human-readable identifier to include in the error message. The model_key assignment in the else branch is unchanged and still used correctly in that branch. --- mlpstorage_py/submission_checker/checks/checkpointing_checks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py index ac9d515e..62f410ac 100644 --- a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py +++ b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py @@ -149,7 +149,7 @@ def closed_mpi_processes(self): if num_processes != 8: self.log.error( "CLOSED submission with model %s in subset mode requires %d processes, got %d", - model_key, + model_name, 8, num_processes ) From 035d199b84b0c278ccda426af79c641ca04a4b00 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 19:49:37 -0700 Subject: [PATCH 6/7] fix(RULES-1,RULES-2,RULES-4): correct three submission checker bugs in training_checks.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RULES-1 — 500-steps dataset minimum formula is circular; check never fires The formula derived num_steps_per_epoch from the actual file count using max(MIN_STEPS_PER_EPOCH, actual_steps), then multiplied back to get min_samples_steps. Because actual_steps >= itself, min_samples_steps was always >= the actual sample count, so the 500-steps constraint never produced a "too few files" error regardless of actual dataset size. Fixed by replacing the two-step calculation with the direct formula: min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators This matches the canonical computation in rules/utils.py. RULES-2 — Multi-host memory total wrong for mixed-memory clusters host_memory_GB is a list with one entry per host, but the checker took only the first element ([0]) and multiplied by num_hosts. For clusters where hosts have different memory sizes, this produced an incorrect total that could be larger or smaller than the true sum. Fixed by replacing the [0]-index-and-multiply pattern with: total_host_memory = sum(summary.get("host_memory_GB", [0])) This matches the canonical pattern in rules/utils.py which sums the per-host memory list. RULES-4 — AU check reads non-existent DLIO fields; every submission fails The accelerator utilization check looked up train_au_mean_percentage and train_au_meet_expectation from the DLIO summary JSON. Neither field exists in actual DLIO output (the real field is train_au_percentage, a list of per-epoch AU values). Both .get() calls always returned their defaults (0 and "" respectively), causing au_expectation != "success" to be permanently True. Every training submission was flagged as an AU failure regardless of actual utilization, making real failures and valid runs indistinguishable. Fixed by reading train_au_percentage, computing its mean, and comparing against the 90% MLPerf Storage minimum (Rules.md §3.3.2). --- .../submission_checker/checks/training_checks.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/mlpstorage_py/submission_checker/checks/training_checks.py b/mlpstorage_py/submission_checker/checks/training_checks.py index 87d2ce2d..0b5cfad0 100644 --- a/mlpstorage_py/submission_checker/checks/training_checks.py +++ b/mlpstorage_py/submission_checker/checks/training_checks.py @@ -101,21 +101,17 @@ def recalculate_dataset_size(self): # From summary num_accelerators = summary.get("num_accelerators", 1) - num_hosts = summary.get("num_hosts", 1) - host_memory_gb = summary.get("host_memory_GB", [0])[0] - + total_host_memory = sum(summary.get("host_memory_GB", [0])) + if record_length == 0: self.log.error("Record length is 0, cannot calculate dataset size") valid = False continue - + # Calculate min samples from steps per epoch - num_steps_per_epoch = max(MIN_STEPS_PER_EPOCH, - num_files_train * num_samples_per_file // (batch_size * num_accelerators)) - min_samples_steps = num_steps_per_epoch * batch_size * num_accelerators - + min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators + # Calculate min samples from host memory - total_host_memory = num_hosts * host_memory_gb min_samples_memory = (total_host_memory * HOST_MEMORY_MULTIPLIER * 1024 * 1024 * 1024 / record_length) From 56391c89168f8fb93dbc6cc590df5111e7597451 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 19:49:49 -0700 Subject: [PATCH 7/7] fix(RULES-5): guard against KeyError on parquet datasets in rules_legacy.py Direct dict access dataset_params['record_length_bytes'] raised KeyError for parquet-format datasets where this key is absent. The active code path in rules/utils.py handles parquet correctly, but the legacy verifier still routes training runs through TrainingRunRulesChecker, which called this function. Parquet workloads using the legacy path crashed with KeyError rather than validating. Fixed by replacing the direct access with .get(): record_length_bytes = dataset_params.get('record_length_bytes') When record_length_bytes is absent or zero, a warning is logged and file_size_bytes is set to 0. The subsequent min_num_files_by_bytes calculation is guarded against division by zero: min_num_files_by_bytes = (dataset_size_bytes // file_size_bytes) if file_size_bytes else 0 With min_num_files_by_bytes = 0, required_file_count falls back to the samples-based minimum, which matches the parquet behaviour in the active rules path. --- mlpstorage_py/rules_legacy.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/mlpstorage_py/rules_legacy.py b/mlpstorage_py/rules_legacy.py index c16c79d0..fa3be109 100755 --- a/mlpstorage_py/rules_legacy.py +++ b/mlpstorage_py/rules_legacy.py @@ -1729,9 +1729,13 @@ def calculate_training_data_size(args, cluster_information, dataset_params, read # Required Minimum Dataset size is 5x the total client memory dataset_size_bytes = 5 * total_mem_bytes - file_size_bytes = dataset_params['num_samples_per_file'] * dataset_params['record_length_bytes'] - - min_num_files_by_bytes = dataset_size_bytes // file_size_bytes + record_length_bytes = dataset_params.get('record_length_bytes') + if not record_length_bytes: + logger.warning('record_length_bytes missing from dataset params (parquet?); skipping byte-based dataset size check') + file_size_bytes = 0 + else: + file_size_bytes = dataset_params['num_samples_per_file'] * record_length_bytes + min_num_files_by_bytes = (dataset_size_bytes // file_size_bytes) if file_size_bytes else 0 num_samples_by_bytes = min_num_files_by_bytes * dataset_params['num_samples_per_file'] min_samples = 500 * num_processes * reader_params['batch_size'] min_num_files_by_samples = min_samples // dataset_params['num_samples_per_file']