Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Clone the `gpu-recipes` repository and set a reference to the recipe folder.
git clone https://github.com/ai-hypercomputer/gpu-recipes.git
cd gpu-recipes
export REPO_ROOT=`git rev-parse --show-toplevel`
export RECIPE_ROOT=$REPO_ROOT/training/a4/deepseek_v3/megatron-bridge-pretraining-gke/32node-BF16-SEQ4096-GBS2048-NEMO25.11/recipe
export RECIPE_ROOT=$REPO_ROOT/training/a4/deepseek_v3/megatron-bridge-pretraining-gke/nemo2511/32node-BF16-SEQ4096-GBS2048/recipe
cd $RECIPE_ROOT
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Clone the `gpu-recipes` repository and set a reference to the recipe folder.
git clone https://github.com/ai-hypercomputer/gpu-recipes.git
cd gpu-recipes
export REPO_ROOT=`git rev-parse --show-toplevel`
export RECIPE_ROOT=$REPO_ROOT/training/a4/deepseek_v3/megatron-bridge-pretraining-gke/32node-BF16-SEQ4096-GBS2048-NEMO26.02/recipe
export RECIPE_ROOT=$REPO_ROOT/training/a4/deepseek_v3/megatron-bridge-pretraining-gke/nemo2602/32node-BF16-SEQ4096-GBS2048/recipe
cd $RECIPE_ROOT
```

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,3 @@
#!/usr/bin/env python3

# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import glob
import logging
import os
Expand Down Expand Up @@ -60,109 +44,6 @@
logger = logging.getLogger(__name__)


def check_training_finished(log_file_path: str) -> bool:
    """Check if training is finished.

    Args:
        log_file_path: Path to the job's stdout log file.

    Returns:
        True if the log contains any of the markers Megatron emits when a
        training run terminates normally (StopIteration, post-training
        teardown, or an explicit exit at an iteration).
    """
    # Read the file once; readlines()+join would double every newline for no benefit.
    with open(log_file_path, "r") as f:
        log = f.read()
    return "StopIteration" in log or "after training is done" in log or "exiting program at iteration" in log


def check_slurm_timeout(log_file_path: str) -> bool:
    """Check if Slurm job timed out.

    Args:
        log_file_path: Path to the job's stdout log file.

    Returns:
        True if the log contains Slurm's wall-clock cancellation message.
    """
    # Read the file once; readlines()+join would double every newline for no benefit.
    with open(log_file_path, "r") as f:
        return "DUE TO TIME LIMIT" in f.read()


def is_flaky_failure(log_file_path: str) -> bool:
    """Check if Slurm job failed due to flaky failure.

    A "flaky" failure is an infrastructure- or environment-level error
    (NCCL/network faults, GPU memory corruption, allocator crashes, hub
    connectivity, etc.) that justifies retrying the job rather than failing
    the CI run outright.

    Args:
        log_file_path: Path to the job's stdout log file.

    Returns:
        True if the log contains any known transient-failure marker.
    """
    # Known transient-failure signatures; each is matched as a plain substring.
    flaky_markers = (
        "The server socket has failed to listen on any local network address.",
        "Some NCCL operations have failed or timed out.",
        "uncorrectable ECC error encountered",
        "illegal memory access",
        "illegal instruction",
        "torch.distributed.DistNetworkError",
        "Segmentation fault",
        "found NaN in",
        "For debugging consider passing CUDA_LAUNCH_BLOCKING=1",
        "double free or corruption",
        "Call to CUDA function failed.",
        "Connection reset by peer",
        "invalid pointer",
        "malloc(): unaligned tcache chunk detected",
        "zmq.error.ZMQError: Address already in use",
        "We couldn't connect to 'https://huggingface.co'",
        "Unpack failed: incomplete input",
        "unspecified launch failure",
        "free(): corrupted unsorted chunks",
        "Segfault encountered",
        "Fatal glibc error",
        "EOFError: No data left in file",
    )

    # Read the file once; readlines()+join would double every newline for no benefit.
    with open(log_file_path, "r") as f:
        log = f.read()

    return any(marker in log for marker in flaky_markers)


def build_performance_config(args) -> Optional[Dict[str, Any]]:
    """Build performance configuration from command-line arguments.

    Args:
        args: Parsed command-line arguments

    Returns:
        Dictionary with performance configuration or None if performance is disabled
    """
    # Keep only the performance knobs the user actually supplied.
    candidates = {
        "timing_threshold": args.timing_threshold,
        "skip_first_percent_time": args.skip_first_percent_time,
    }
    config = {name: value for name, value in candidates.items() if value is not None}
    return config or None


def ensure_logs_where_written(log_file_paths: List[str]):
    """Ensure logs were written to disk.

    Raises:
        FileNotFoundError: If anything other than exactly one log file was found.
    """
    # Exactly one rank-0 log file is expected per attempt.
    if len(log_file_paths) == 1:
        return
    raise FileNotFoundError(
        f"Unexpected number of log files found: {log_file_paths}. Expected 1, got {len(log_file_paths)}"
    )


def get_job_dir_and_status_from_run(exp_name: str):
    """Get job directory and status from run.

    Looks up the NeMo-Run experiment by title and reports on its first job.

    Args:
        exp_name: Title of the NeMo-Run experiment to inspect.

    Returns:
        Tuple of (local job directory path, job status as a string).
    """
    status_by_job = run.Experiment.from_title(exp_name).status(return_dict=True)
    # Only the first (and expected only) job of the experiment is inspected.
    first_job = next(iter(status_by_job.values()))
    return first_job["local_dir"], str(first_job["status"])


def maybe_increase_n_attempts_on_flaky_failure(
    n_attempts: int,
    max_retries: int,
    is_finished_experiment: bool,
    is_long_convergence_run: bool,
    log_file_paths: List[str],
):
    """Maybe increase number of attempts.

    Args:
        n_attempts: Attempts consumed so far.
        max_retries: Maximum number of retries allowed.
        is_finished_experiment: Whether the experiment reached completion.
        is_long_convergence_run: Whether this is a long convergence run.
        log_file_paths: Log files of the run; the last one is inspected.

    Returns:
        The (possibly updated) attempt count.
    """
    # Finished experiments and long convergence runs never consume retries here.
    if is_finished_experiment or is_long_convergence_run:
        return n_attempts
    if is_flaky_failure(log_file_paths[-1]):
        return n_attempts + 1
    # Non-flaky failure: exhaust retries so the experiment is not restarted.
    return max_retries


def main(
Expand Down Expand Up @@ -336,151 +217,14 @@ def main(

logger.info("Will launch the following command with Nemo-Run: %s", " ".join(nemorun_script.to_command()))

is_finished_experiment = False # An experiment might consist of multiple training runs, due to restarts.
is_testing_passed = False # Whether the testing passed convergence and performance validation.
error_msg = None
n_attempts = 0
exp_name = (
exp_name[:37] if dgxc_cluster is not None else exp_name
) # Some k8s clusters have a limit on the length of the experiment name.
wandb_run_id = None
while n_attempts <= max_retries:
while is_finished_experiment is False:
if HAVE_WANDB:
wandb_run_id = (
(wandb_run_id or wandb.util.generate_id()) if is_long_convergence_run else wandb.util.generate_id()
)
executor.env_vars.update(
{
"WANDB_RUN_ID": wandb_run_id,
"WANDB_RESUME": "allow",
}
)
if wandb_key is not None:
executor.env_vars["WANDB_API_KEY"] = wandb_key

run.run(
nemorun_script,
executor=executor,
plugins=plugins,
dryrun=dryrun,
detach=detach,
name=exp_name,
)
if dryrun:
logger.info("dryrun requested: exiting")
return

def _copy_logs_to_gcp(job_dir_path):
import shutil
import glob

artifact_dir = os.environ.get("ARTIFACT_DIR", "/tmp/artifacts")
dest_logs_dir = os.path.join(artifact_dir, "logs")
os.makedirs(dest_logs_dir, exist_ok=True)

try:
log_files = glob.glob(f"{job_dir_path}/log-*.out") + glob.glob(f"{job_dir_path}/log-*.err")
for log_f in log_files:
shutil.copy(log_f, dest_logs_dir)
msg = f"Copied {log_f} to {dest_logs_dir}"
print(msg)
logger.info(msg)
except Exception as e:
print(f"Failed to copy logs to GCP: {e}")
logger.error(f"Failed to copy logs to GCP: {e}")


job_dir, job_status = get_job_dir_and_status_from_run(exp_name)

if job_status not in ["SUCCEEDED", "SUBMITTED", "PENDING", "RUNNING"]:
_copy_logs_to_gcp(job_dir)
raise Exception(f"Experiment failed for {exp_name} with status: {job_status}.")

if detach:
is_finished_experiment = True
is_testing_passed = True
break

log_file_paths = list(Path(f"{job_dir}").glob("log-*_0.out"))
ensure_logs_where_written(log_file_paths)

is_finished_experiment = (
check_training_finished(log_file_paths[-1]) if is_long_convergence_run else (job_status == "SUCCEEDED")
)

n_attempts = maybe_increase_n_attempts_on_flaky_failure(
n_attempts=n_attempts,
max_retries=max_retries,
is_finished_experiment=is_finished_experiment,
is_long_convergence_run=is_long_convergence_run,
log_file_paths=log_file_paths,
)

if not is_finished_experiment and n_attempts <= max_retries:
logger.error(f"Starting attempt {n_attempts + 1} of {max_retries + 1} for {exp_name}")

if not is_finished_experiment:
break

if is_finished_experiment is True and detach is False:
log_paths = sorted(
list(glob.glob(f"{get_nemorun_home()}/experiments/{exp_name}/{exp_name}_*/{exp_name}/log-*_0.out"))
)

if not is_long_convergence_run:
log_paths = [log_paths[-1]]

logger.info(f"Starting convergence check for {model_family_name}_{model_recipe_name}")
wandb_run = None
if HAVE_WANDB and wandb_key:
wandb_run = wandb.init(
project=wandb_project_name, entity=wandb_entity_name, id=wandb_run_id, resume="allow"
)

logger.info("Waiting 10 seconds for I/O to settle")
time.sleep(10)

is_testing_passed, error_msg = calc_convergence_and_performance(
model_family_name=model_family_name,
model_recipe_name=model_recipe_name,
assets_dir=os.path.join(job_dir, exp_name),
log_paths=log_paths,
loss_metric="lm loss",
timing_metric="elapsed time per iteration (ms)",
alloc_metric="alloc",
max_alloc_metric="max_alloc",
golden_values_path=golden_values_path,
convergence_config=convergence_params,
performance_config=performance_params,
memory_config=memory_params,
wandb_run=wandb_run,
)

if wandb_run:
wandb_run.finish()
wandb.teardown(exit_code=int(not is_testing_passed))

if not is_long_convergence_run:
n_attempts = max_retries
is_finished_experiment = True
if not is_testing_passed:
_copy_logs_to_gcp(job_dir)
break

if is_finished_experiment and is_testing_passed:
break

if not is_testing_passed and error_msg is not None:
raise AssertionError(error_msg)
if is_testing_passed and error_msg is not None:
logger.warning(error_msg)

if not is_finished_experiment:
_copy_logs_to_gcp(job_dir)
raise Exception("Megatron-Bridge CI test job failed")
elif is_finished_experiment and not detach:
logger.info("Megatron-Bridge CI test job completed successfully!")
run.run(
nemorun_script,
executor=executor,
plugins=plugins,
dryrun=dryrun,
detach=detach,
name=exp_name,
)


if __name__ == "__main__":
Expand Down
Loading