diff --git a/mlpstorage_py/benchmarks/base.py b/mlpstorage_py/benchmarks/base.py index 22feb52d..5b855e5d 100755 --- a/mlpstorage_py/benchmarks/base.py +++ b/mlpstorage_py/benchmarks/base.py @@ -45,9 +45,8 @@ def _run(self): from functools import wraps -from pyarrow.ipc import open_stream -from mlpstorage_py.config import PARAM_VALIDATION, DATETIME_STR, MLPS_DEBUG, EXEC_TYPE +from mlpstorage_py.config import PARAM_VALIDATION, DATETIME_STR, MLPS_DEBUG, EXEC_TYPE, EXIT_CODE from mlpstorage_py.debug import debug_tryer_wrapper from mlpstorage_py.interfaces import BenchmarkInterface, BenchmarkConfig, BenchmarkCommand from mlpstorage_py.mlps_logging import setup_logging, apply_logging_options @@ -928,6 +927,7 @@ def run(self) -> int: # Note: Stage progress remains visible showing elapsed time # during this phase. DLIO output flows through directly. start_time = time.time() + result = EXIT_CODE.FAILURE try: result = self._run() finally: diff --git a/mlpstorage_py/benchmarks/dlio.py b/mlpstorage_py/benchmarks/dlio.py index 137eb82e..f92ef558 100755 --- a/mlpstorage_py/benchmarks/dlio.py +++ b/mlpstorage_py/benchmarks/dlio.py @@ -214,7 +214,7 @@ def _apply_object_storage_params(self): ) def process_dlio_params(self, config_file): - params_dict = dict() if not self.args.params else {k: v for k, v in (item.split("=") for item in self.args.params)} + params_dict = dict() if not self.args.params else {k: v for k, v in (item.split("=", 1) for item in self.args.params)} yaml_params = read_config_from_file(os.path.join(self.DLIO_CONFIG_PATH, "workload", config_file)) combined_params = update_nested_dict(yaml_params, create_nested_dict(params_dict)) @@ -239,7 +239,9 @@ def execute_command(self): if hasattr(self.args, "command"): output_file_prefix += f"_{self.args.command}" - self._execute_command(cmd, output_file_prefix=output_file_prefix) + _, _, return_code = self._execute_command(cmd, output_file_prefix=output_file_prefix) + if return_code != 0: + raise RuntimeError(f'DLIO exited with return code {return_code}') @abc.abstractmethod def add_workflow_to_cmd(self, cmd) -> str: @@ -457,6 +459,7 @@ def _run(self): self.logger.error(f'Invalid command: {self.args.command}') return EXIT_CODE.INVALID_ARGUMENTS except Exception as e: + self.logger.error(f'Checkpointing benchmark failed: {e}') return EXIT_CODE.FAILURE return EXIT_CODE.SUCCESS diff --git a/mlpstorage_py/config.py b/mlpstorage_py/config.py index 2d985d78..602af9a6 100755 --- a/mlpstorage_py/config.py +++ b/mlpstorage_py/config.py @@ -147,8 +147,9 @@ class EXIT_CODE(enum.IntEnum): CONFIGURATION_ERROR = 5 FAILURE = 6 TIMEOUT = 7 - # Add more as needed - + INTERRUPTED = 8 + ERROR = 9 + def __str__(self): return f"{self.name} ({self.value})" class EXEC_TYPE(enum.Enum): diff --git a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py index ac9d515e..62f410ac 100644 --- a/mlpstorage_py/submission_checker/checks/checkpointing_checks.py +++ b/mlpstorage_py/submission_checker/checks/checkpointing_checks.py @@ -149,7 +149,7 @@ def closed_mpi_processes(self): if num_processes != 8: self.log.error( "CLOSED submission with model %s in subset mode requires %d processes, got %d", - model_key, + model_name, 8, num_processes ) diff --git a/mlpstorage_py/submission_checker/checks/training_checks.py b/mlpstorage_py/submission_checker/checks/training_checks.py index 87d2ce2d..6bfb7986 100644 --- a/mlpstorage_py/submission_checker/checks/training_checks.py +++ b/mlpstorage_py/submission_checker/checks/training_checks.py @@ -110,9 +110,7 @@ def recalculate_dataset_size(self): continue # Calculate min samples from steps per epoch - num_steps_per_epoch = max(MIN_STEPS_PER_EPOCH, - num_files_train * num_samples_per_file // (batch_size * num_accelerators)) - min_samples_steps = num_steps_per_epoch * batch_size * num_accelerators + min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators # Calculate min samples from host memory total_host_memory = num_hosts * host_memory_gb @@ -218,19 +216,23 @@ def accelerator_utilization_check(self): valid = True if self.mode != "training": return valid + MIN_AU_PERCENTAGE = 90.0 for summary, metadata, _ in self.submissions_logs.run_files: metrics = summary.get("metric", {}) - au_mean = metrics.get("train_au_mean_percentage", 0) - au_expectation = metrics.get("train_au_meet_expectation", "") - - if au_expectation != "success": + au_values = metrics.get("train_au_percentage", []) + if not au_values: + self.log.error("AU check failed: train_au_percentage missing from summary") + valid = False + continue + au_mean = sum(au_values) / len(au_values) + if au_mean < MIN_AU_PERCENTAGE: self.log.error( - "AU check failed: expected 'success', got '%s' (AU: %.2f%%)", - au_expectation, - au_mean + "AU check failed: mean AU %.2f%% is below minimum %.2f%%", + au_mean, + MIN_AU_PERCENTAGE ) valid = False - + return valid def single_host_simulated_accelerators(self): diff --git a/mlpstorage_py/submission_checker/loader.py b/mlpstorage_py/submission_checker/loader.py index 316081ee..20dcc018 100644 --- a/mlpstorage_py/submission_checker/loader.py +++ b/mlpstorage_py/submission_checker/loader.py @@ -117,6 +117,7 @@ def load(self) -> Generator[SubmissionLogs, None, None]: for timestamp in list_dir(run_path): timestamp_path = os.path.join(run_path, timestamp) summary_path = os.path.join(timestamp_path, "summary.json") + metadata_path = self.find_metadata_path(timestamp_path) metadata_file = self.load_single_log(metadata_path, "Metadata") run_file = self.load_single_log(summary_path, "Summary") run_files.append((run_file, metadata_file, timestamp)) diff --git a/mlpstorage_py/submission_checker/parsers/json_parser.py b/mlpstorage_py/submission_checker/parsers/json_parser.py index 466c208c..0e89c0fd 100644 --- a/mlpstorage_py/submission_checker/parsers/json_parser.py +++ b/mlpstorage_py/submission_checker/parsers/json_parser.py @@ -49,7 +49,7 @@ def get_keys(self): return self.keys def __contains__(self, key): - return key in self.messages + return key in self.d def __repr__(self): return f""