From e3286fbbdbd465652a743667a51bf060fef77ad4 Mon Sep 17 00:00:00 2001 From: Curtis Anderson Date: Tue, 26 May 2026 16:25:28 -0700 Subject: [PATCH] fix: resolve 11 high/medium severity bugs found in broad codebase scan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes span core framework stability, submission validation correctness, and benchmark execution reliability. All changes are targeted one-line or small-block fixes with no refactoring. ## CORE-1: Ctrl+C raises AttributeError instead of exiting cleanly config.py — Added missing EXIT_CODE.INTERRUPTED (8) and EXIT_CODE.ERROR (9) enum members. Without these, the SIGINT/SIGTERM signal handler called sys.exit(EXIT_CODE.INTERRUPTED) and crashed with AttributeError before the process could exit. ## CORE-2: Run entries validated against datagen metadata (stale path) submission_checker/loader.py — Added metadata_path re-computation inside the inner run-timestamp loop. Previously metadata_path was set only in the outer datagen loop, so every run entry was loaded with the datagen's metadata file. Checks like closed_submission_parameters and verify_datasize_usage were auditing datagen invocation args instead of run invocation args. ## CORE-3: --params values containing '=' crash before benchmark starts benchmarks/dlio.py — Changed item.split("=") to item.split("=", 1) in process_dlio_params(). Without maxsplit=1, any param value containing '=' (base64 credentials, S3 URIs, endpoint strings) produced more than 2 parts and raised ValueError: too many values to unpack before the benchmark started. ## CORE-4: CheckpointingBenchmark._run() swallows exceptions silently benchmarks/dlio.py — Added self.logger.error() call in the except block of CheckpointingBenchmark._run(). Previously the caught exception 'e' was discarded with no log output, making all checkpointing failures produce a silent EXIT_CODE.FAILURE. Now matches the logging pattern in TrainingBenchmark._run(). ## CORE-5: UnboundLocalError masks original exception from _run() benchmarks/base.py — Initialized result = EXIT_CODE.FAILURE before the try block in Benchmark.run(). If _run() raised an exception, the finally block completed cleanup correctly but then 'return result' hit UnboundLocalError (result was never assigned), replacing the original diagnostic exception with a secondary one. Also added EXIT_CODE to the config import in base.py. ## CORE-6: JSONParser.__contains__ raises AttributeError on any 'in' test submission_checker/parsers/json_parser.py — Changed self.messages to self.d in __contains__. The attribute self.messages does not exist; the parsed JSON dict is stored in self.d. Any 'key in parser' test raised AttributeError. ## CORE-12: Unused pyarrow import makes pyarrow a hard benchmark dependency benchmarks/base.py — Removed unused 'from pyarrow.ipc import open_stream'. The symbol open_stream was never referenced in the file. The top-level import forced pyarrow to be present at import time for all benchmarks, failing with ImportError if absent. pyarrow remains in pyproject.toml as it is needed by DLIO and parquet handling elsewhere. ## CORE-13: DLIO exit code discarded; failed runs report EXIT_CODE.SUCCESS benchmarks/dlio.py — execute_command() now captures the return code from _execute_command() and raises RuntimeError on non-zero. Previously the return value was discarded entirely, so a DLIO crash or assertion failure left TrainingBenchmark._run() returning SUCCESS and proceeding to validate nonexistent or incomplete results. ## RULES-1: 500-steps dataset minimum formula is circular; check never fires submission_checker/checks/training_checks.py — Replaced the circular num_steps_per_epoch intermediate with the direct formula: min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators The old code derived num_steps_per_epoch from the actual file count using max(..., actual_steps), then multiplied back. Because actual_steps >= itself, min_samples_steps was always >= actual samples, so the constraint could never produce a "too few files" error. The direct formula matches rules/utils.py. ## RULES-3: NameError in subset-mode process count check; check silently passes submission_checker/checks/checkpointing_checks.py — Replaced undefined model_key with model_name in the subset-mode error log. model_key was only assigned inside the else branch (after a regex match), but was referenced in the if branch. The NameError was silently swallowed, causing CLOSED subset-mode submissions with the wrong process count to pass validation unchallenged. ## RULES-4: AU check reads nonexistent DLIO fields; every submission fails submission_checker/checks/training_checks.py — Replaced lookups for train_au_mean_percentage and train_au_meet_expectation (neither exists in DLIO output) with the actual field train_au_percentage (a list of per-epoch AU values). The check now computes the mean of that list and compares it against the 90% minimum required by the MLPerf Storage rules (Rules.md §3.3.2). Previously both .get() calls always returned their defaults (0 and ""), making au_expectation != "success" always True and flagging every submission as an AU failure regardless of actual utilization. --- mlpstorage_py/benchmarks/base.py | 4 ++-- mlpstorage_py/benchmarks/dlio.py | 7 ++++-- mlpstorage_py/config.py | 5 ++-- .../checks/checkpointing_checks.py | 2 +- .../checks/training_checks.py | 24 ++++++++++--------- mlpstorage_py/submission_checker/loader.py | 1 + .../submission_checker/parsers/json_parser.py | 2 +- 7 files changed, 26 insertions(+), 19 deletions(-) diff --git a/mlpstorage_py/benchmarks/base.py b/mlpstorage_py/benchmarks/base.py index 22feb52d..5b855e5d 100755 --- a/mlpstorage_py/benchmarks/base.py +++ b/mlpstorage_py/benchmarks/base.py @@ -45,9 +45,8 @@ def _run(self): from functools import wraps -from pyarrow.ipc import open_stream -from mlpstorage_py.config import PARAM_VALIDATION, DATETIME_STR, MLPS_DEBUG, EXEC_TYPE +from mlpstorage_py.config import PARAM_VALIDATION, DATETIME_STR, MLPS_DEBUG, EXEC_TYPE, EXIT_CODE from mlpstorage_py.debug import debug_tryer_wrapper from mlpstorage_py.interfaces import BenchmarkInterface, BenchmarkConfig, BenchmarkCommand from mlpstorage_py.mlps_logging import setup_logging, apply_logging_options @@ -928,6 +927,7 @@ def run(self) -> int: # Note: Stage progress remains visible showing elapsed time # during this phase. DLIO output flows through directly. start_time = time.time() + result = EXIT_CODE.FAILURE try: result = self._run() finally: diff --git a/mlpstorage_py/benchmarks/dlio.py b/mlpstorage_py/benchmarks/dlio.py index 137eb82e..f92ef558 100755 --- a/mlpstorage_py/benchmarks/dlio.py +++ b/mlpstorage_py/benchmarks/dlio.py @@ -214,7 +214,7 @@ def _apply_object_storage_params(self): ) def process_dlio_params(self, config_file): - params_dict = dict() if not self.args.params else {k: v for k, v in (item.split("=") for item in self.args.params)} + params_dict = dict() if not self.args.params else {k: v for k, v in (item.split("=", 1) for item in self.args.params)} yaml_params = read_config_from_file(os.path.join(self.DLIO_CONFIG_PATH, "workload", config_file)) combined_params = update_nested_dict(yaml_params, create_nested_dict(params_dict)) @@ -239,7 +239,9 @@ def execute_command(self): if hasattr(self.args, "command"): output_file_prefix += f"_{self.args.command}" - self._execute_command(cmd, output_file_prefix=output_file_prefix) + _, _, return_code = self._execute_command(cmd, output_file_prefix=output_file_prefix) + if return_code != 0: + raise RuntimeError(f'DLIO exited with return code {return_code}') @abc.abstractmethod def add_workflow_to_cmd(self, cmd) -> str: @@ -457,6 +459,7 @@ def _run(self): self.logger.error(f'Invalid command: {self.args.command}') return EXIT_CODE.INVALID_ARGUMENTS except Exception as e: + self.logger.error(f'Checkpointing benchmark failed: {e}') return EXIT_CODE.FAILURE return EXIT_CODE.SUCCESS diff --git a/mlpstorage_py/config.py b/mlpstorage_py/config.py index 2d985d78..602af9a6 100755 --- a/mlpstorage_py/config.py +++ b/mlpstorage_py/config.py @@ -147,8 +147,9 @@ class EXIT_CODE(enum.IntEnum): CONFIGURATION_ERROR = 5 FAILURE = 6 TIMEOUT = 7 - # Add more as needed - + INTERRUPTED = 8 + ERROR = 9 + def __str__(self): return f"{self.name} ({self.value})" class EXEC_TYPE(enum.Enum): diff --git a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py index ac9d515e..62f410ac 100644 --- a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py +++ b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py @@ -149,7 +149,7 @@ def closed_mpi_processes(self): if num_processes != 8: self.log.error( "CLOSED submission with model %s in subset mode requires %d processes, got %d", - model_key, + model_name, 8, num_processes ) diff --git a/mlpstorage_py/submission_checker/checks/training_checks.py b/mlpstorage_py/submission_checker/checks/training_checks.py index 87d2ce2d..6bfb7986 100644 --- a/mlpstorage_py/submission_checker/checks/training_checks.py +++ b/mlpstorage_py/submission_checker/checks/training_checks.py @@ -110,9 +110,7 @@ def recalculate_dataset_size(self): continue # Calculate min samples from steps per epoch - num_steps_per_epoch = max(MIN_STEPS_PER_EPOCH, - num_files_train * num_samples_per_file // (batch_size * num_accelerators)) - min_samples_steps = num_steps_per_epoch * batch_size * num_accelerators + min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators # Calculate min samples from host memory total_host_memory = num_hosts * host_memory_gb @@ -218,19 +216,23 @@ def accelerator_utilization_check(self): valid = True if self.mode != "training": return valid + MIN_AU_PERCENTAGE = 90.0 for summary, metadata, _ in self.submissions_logs.run_files: metrics = summary.get("metric", {}) - au_mean = metrics.get("train_au_mean_percentage", 0) - au_expectation = metrics.get("train_au_meet_expectation", "") - - if au_expectation != "success": + au_values = metrics.get("train_au_percentage", []) + if not au_values: + self.log.error("AU check failed: train_au_percentage missing from summary") + valid = False + continue + au_mean = sum(au_values) / len(au_values) + if au_mean < MIN_AU_PERCENTAGE: self.log.error( - "AU check failed: expected 'success', got '%s' (AU: %.2f%%)", - au_expectation, - au_mean + "AU check failed: mean AU %.2f%% is below minimum %.2f%%", + au_mean, + MIN_AU_PERCENTAGE ) valid = False - + return valid def single_host_simulated_accelerators(self): diff --git a/mlpstorage_py/submission_checker/loader.py b/mlpstorage_py/submission_checker/loader.py index 316081ee..20dcc018 100644 --- a/mlpstorage_py/submission_checker/loader.py +++ b/mlpstorage_py/submission_checker/loader.py @@ -117,6 +117,7 @@ def load(self) -> Generator[SubmissionLogs, None, None]: for timestamp in list_dir(run_path): timestamp_path = os.path.join(run_path, timestamp) summary_path = os.path.join(timestamp_path, "summary.json") + metadata_path = self.find_metadata_path(timestamp_path) metadata_file = self.load_single_log(metadata_path, "Metadata") run_file = self.load_single_log(summary_path, "Summary") run_files.append((run_file, metadata_file, timestamp)) diff --git a/mlpstorage_py/submission_checker/parsers/json_parser.py b/mlpstorage_py/submission_checker/parsers/json_parser.py index 466c208c..0e89c0fd 100644 --- a/mlpstorage_py/submission_checker/parsers/json_parser.py +++ b/mlpstorage_py/submission_checker/parsers/json_parser.py @@ -49,7 +49,7 @@ def get_keys(self): return self.keys def __contains__(self, key): - return key in self.messages + return key in self.d def __repr__(self): return f""