Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions mlpstorage_py/benchmarks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ def _run(self):

from functools import wraps

from pyarrow.ipc import open_stream

from mlpstorage_py.config import PARAM_VALIDATION, DATETIME_STR, MLPS_DEBUG, EXEC_TYPE
from mlpstorage_py.config import PARAM_VALIDATION, DATETIME_STR, MLPS_DEBUG, EXEC_TYPE, EXIT_CODE
from mlpstorage_py.debug import debug_tryer_wrapper
from mlpstorage_py.interfaces import BenchmarkInterface, BenchmarkConfig, BenchmarkCommand
from mlpstorage_py.mlps_logging import setup_logging, apply_logging_options
Expand Down Expand Up @@ -928,6 +927,7 @@ def run(self) -> int:
# Note: Stage progress remains visible showing elapsed time
# during this phase. DLIO output flows through directly.
start_time = time.time()
result = EXIT_CODE.FAILURE
try:
result = self._run()
finally:
Expand Down
7 changes: 5 additions & 2 deletions mlpstorage_py/benchmarks/dlio.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def _apply_object_storage_params(self):
)

def process_dlio_params(self, config_file):
params_dict = dict() if not self.args.params else {k: v for k, v in (item.split("=") for item in self.args.params)}
params_dict = dict() if not self.args.params else {k: v for k, v in (item.split("=", 1) for item in self.args.params)}
yaml_params = read_config_from_file(os.path.join(self.DLIO_CONFIG_PATH, "workload", config_file))
combined_params = update_nested_dict(yaml_params, create_nested_dict(params_dict))

Expand All @@ -239,7 +239,9 @@ def execute_command(self):
if hasattr(self.args, "command"):
output_file_prefix += f"_{self.args.command}"

self._execute_command(cmd, output_file_prefix=output_file_prefix)
_, _, return_code = self._execute_command(cmd, output_file_prefix=output_file_prefix)
if return_code != 0:
raise RuntimeError(f'DLIO exited with return code {return_code}')

@abc.abstractmethod
def add_workflow_to_cmd(self, cmd) -> str:
Expand Down Expand Up @@ -457,6 +459,7 @@ def _run(self):
self.logger.error(f'Invalid command: {self.args.command}')
return EXIT_CODE.INVALID_ARGUMENTS
except Exception as e:
self.logger.error(f'Checkpointing benchmark failed: {e}')
return EXIT_CODE.FAILURE
return EXIT_CODE.SUCCESS

Expand Down
5 changes: 3 additions & 2 deletions mlpstorage_py/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ class EXIT_CODE(enum.IntEnum):
CONFIGURATION_ERROR = 5
FAILURE = 6
TIMEOUT = 7
# Add more as needed

INTERRUPTED = 8
ERROR = 9

def __str__(self):
return f"{self.name} ({self.value})"
class EXEC_TYPE(enum.Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def closed_mpi_processes(self):
if num_processes != 8:
self.log.error(
"CLOSED submission with model %s in subset mode requires %d processes, got %d",
model_key,
model_name,
8,
num_processes
)
Expand Down
24 changes: 13 additions & 11 deletions mlpstorage_py/submission_checker/checks/training_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ def recalculate_dataset_size(self):
continue

# Calculate min samples from steps per epoch
num_steps_per_epoch = max(MIN_STEPS_PER_EPOCH,
num_files_train * num_samples_per_file // (batch_size * num_accelerators))
min_samples_steps = num_steps_per_epoch * batch_size * num_accelerators
min_samples_steps = MIN_STEPS_PER_EPOCH * batch_size * num_accelerators

# Calculate min samples from host memory
total_host_memory = num_hosts * host_memory_gb
Expand Down Expand Up @@ -218,19 +216,23 @@ def accelerator_utilization_check(self):
valid = True
if self.mode != "training":
return valid
MIN_AU_PERCENTAGE = 90.0
for summary, metadata, _ in self.submissions_logs.run_files:
metrics = summary.get("metric", {})
au_mean = metrics.get("train_au_mean_percentage", 0)
au_expectation = metrics.get("train_au_meet_expectation", "")

if au_expectation != "success":
au_values = metrics.get("train_au_percentage", [])
if not au_values:
self.log.error("AU check failed: train_au_percentage missing from summary")
valid = False
continue
au_mean = sum(au_values) / len(au_values)
if au_mean < MIN_AU_PERCENTAGE:
self.log.error(
"AU check failed: expected 'success', got '%s' (AU: %.2f%%)",
au_expectation,
au_mean
"AU check failed: mean AU %.2f%% is below minimum %.2f%%",
au_mean,
MIN_AU_PERCENTAGE
)
valid = False

return valid

def single_host_simulated_accelerators(self):
Expand Down
1 change: 1 addition & 0 deletions mlpstorage_py/submission_checker/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def load(self) -> Generator[SubmissionLogs, None, None]:
for timestamp in list_dir(run_path):
timestamp_path = os.path.join(run_path, timestamp)
summary_path = os.path.join(timestamp_path, "summary.json")
metadata_path = self.find_metadata_path(timestamp_path)
metadata_file = self.load_single_log(metadata_path, "Metadata")
run_file = self.load_single_log(summary_path, "Summary")
run_files.append((run_file, metadata_file, timestamp))
Expand Down
2 changes: 1 addition & 1 deletion mlpstorage_py/submission_checker/parsers/json_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def get_keys(self):
return self.keys

def __contains__(self, key):
return key in self.messages
return key in self.d

def __repr__(self):
return f"<SummaryParser path={self.path!r} keys={len(self.keys)}>"
Loading