diff --git a/tests/integration-tests/configs/develop.yaml b/tests/integration-tests/configs/develop.yaml index 0deb563d7d..14c62df901 100644 --- a/tests/integration-tests/configs/develop.yaml +++ b/tests/integration-tests/configs/develop.yaml @@ -479,6 +479,12 @@ test-suites: instances: {{ common.INSTANCES_DEFAULT_X86 }} oss: [{{ OS_X86_5 }}] schedulers: ["slurm"] + test_slurm.py::test_expedited_requeue: + dimensions: + - regions: ["ap-east-1"] + instances: {{ common.INSTANCES_DEFAULT_X86 }} + oss: [{{ OS_X86_5 }}] + schedulers: ["slurm"] test_slurm.py::test_slurm_config_update: dimensions: - regions: [ "ap-east-1" ] diff --git a/tests/integration-tests/tests/common/scaling_common.py b/tests/integration-tests/tests/common/scaling_common.py index f46f623021..7a70a9914a 100644 --- a/tests/integration-tests/tests/common/scaling_common.py +++ b/tests/integration-tests/tests/common/scaling_common.py @@ -25,6 +25,8 @@ from utils import get_compute_nodes_instance_count SCALING_COMMON_DATADIR = pathlib.Path(__file__).parent / "scaling" +RUN_INSTANCES_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/run_instances_overrides.json" +CREATE_FLEET_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/create_fleet_overrides.json" def validate_and_get_scaling_test_config(scaling_test_config_file): @@ -364,3 +366,77 @@ def setup_ec2_launch_override_to_emulate_ice( run_as_root=True, ) # fmt: on + + +def _write_json_override(remote_command_executor, path, content): + """Write a JSON override file on the head node.""" + json_str = json.dumps(content) + remote_command_executor.run_remote_command( + f"echo '{json_str}' | sudo tee {path}", + raise_on_error=True, + ) + + +def _build_create_fleet_override(cluster_name, queue, compute_resource, instance_types, subnet_id): + """Build a create_fleet_overrides dict with LaunchTemplateSpecification, InstanceTypes and SubnetId.""" + lt_name = f"{cluster_name}-{queue}-{compute_resource}" + overrides_list = [{"InstanceType": it, "SubnetId": subnet_id} for it in 
instance_types] + return { + queue: { + compute_resource: { + "LaunchTemplateConfigs": [ + { + "LaunchTemplateSpecification": { + "LaunchTemplateName": lt_name, + "Version": "$Latest", + }, + "Overrides": overrides_list, + } + ], + } + } + } + + +def setup_create_fleet_override_to_emulate_ice( + remote_command_executor, cluster_name, queue, compute_resource, instance_types, subnet_id +): + """ + Write create_fleet_overrides.json with invalid InstanceTypes to emulate ICE. + + This targets multi-instance-type CRs that use the create_fleet API. The override includes + LaunchTemplateSpecification and SubnetId to avoid MissingParameter errors. The invalid + InstanceTypes (prefixed with "ICE-") cause create_fleet to return no instances, which is + detected as InsufficientInstanceCapacity. + + To recover, call recover_create_fleet_override_from_ice() which replaces the invalid + InstanceTypes with real ones. + """ + ice_instance_types = [f"ICE-{it}" for it in instance_types] + overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, ice_instance_types, subnet_id) + logging.info( + "Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " "for queue=%s, cr=%s", + queue, + compute_resource, + ) + _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) + + +def recover_create_fleet_override_from_ice( + remote_command_executor, cluster_name, queue, compute_resource, real_instance_types, subnet_id +): + """ + Recover from simulated ICE by changing InstanceTypes in create_fleet_overrides.json back to real ones. + + This is the "change instance type in JSON to recover" approach — the invalid "ICE-*" prefixed + InstanceTypes are replaced with real ones (with correct SubnetId), so the next create_fleet + call succeeds. 
+ """ + overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, real_instance_types, subnet_id) + logging.info( + "Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " "for queue=%s, cr=%s", + real_instance_types, + queue, + compute_resource, + ) + _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 646536d277..6a353b8e2f 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -55,7 +55,11 @@ wait_for_compute_nodes_states, wait_for_num_nodes_in_scheduler, ) -from tests.common.scaling_common import setup_ec2_launch_override_to_emulate_ice +from tests.common.scaling_common import ( + recover_create_fleet_override_from_ice, + setup_create_fleet_override_to_emulate_ice, + setup_ec2_launch_override_to_emulate_ice, +) from tests.common.schedulers_common import SlurmCommands @@ -616,6 +620,179 @@ def test_fast_capacity_failover( ) +def _submit_jobs_and_simulate_ice(common_cluster_details, jobs): + """ + Set up ICE simulation, submit jobs, and wait for ICE to be detected. + + Each entry in jobs is a dict with "label" (str) and "expedited" (bool). + Returns a list of job IDs in the same order as the input jobs. 
+ """ + rce = common_cluster_details["remote_command_executor"] + scheduler_commands = common_cluster_details["scheduler_commands"] + target_nodes = common_cluster_details["target_nodes"] + + # Set up ICE simulation + setup_create_fleet_override_to_emulate_ice( + rce, + cluster_name=common_cluster_details["cluster_name"], + queue=common_cluster_details["partition"], + compute_resource=common_cluster_details["ice_compute_res"], + instance_types=common_cluster_details["real_instance_types"], + subnet_id=common_cluster_details["subnet_id"], + ) + + # Clear logs for clean state + rce.clear_slurm_resume_log() + rce.clear_clustermgtd_log() + + # Submit all jobs + job_ids = [] + for job in jobs: + requeue_opt = "--requeue=expedite " if job["expedited"] else "" + job_type = "expedited" if job["expedited"] else "normal" + jid = scheduler_commands.submit_command_and_assert_job_accepted( + submit_command_args={ + "command": f'echo "START_TIME=$(date +%s)"; sleep 30; echo "{job["label"]} done"', + "nodes": 1, + "partition": common_cluster_details["partition"], + "host": target_nodes[0], + "other_options": f"{requeue_opt}--exclusive", + } + ) + time.sleep(20) # Add a sleep to match manual submission of job + logging.info("Submitted %s (%s) ID: %s", job["label"], job_type, jid) + job_ids.append(jid) + + # Wait for ICE to be detected + retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)( + rce, + ["/var/log/parallelcluster/clustermgtd"], + ["The following compute resources are in down state due to insufficient capacity"], + ) + + # Verify target nodes are down due to ICE + assert_compute_node_states(scheduler_commands, target_nodes, expected_states=["down#", "down~"]) + + return job_ids + + +def _recover_from_ice_and_wait_for_jobs(common_cluster_details, job_ids): + """Recover from ICE simulation, wait for nodes to reset, and wait for all jobs to complete.""" + rce = common_cluster_details["remote_command_executor"] + scheduler_commands = 
def _collect_start_epochs(remote_command_executor, job_ids):
    """Read the START_TIME epoch from each job's slurm-<id>.out file; returns epochs in job_ids order."""
    epochs = []
    for jid in job_ids:
        output = remote_command_executor.run_remote_command(f"cat ~/slurm-{jid}.out").stdout
        logging.info("Job %s output: %s", jid, output)
        match = re.search(r"START_TIME=(\d+)", output)
        # Fail with a pointed message instead of an opaque AttributeError on
        # .group(1) when the marker is missing from the job output.
        assert match is not None, f"START_TIME marker not found in output of job {jid}"
        epochs.append(int(match.group(1)))
    return epochs


@pytest.mark.usefixtures("region", "os", "instance", "scheduler")
@pytest.mark.expedited_requeue
def test_expedited_requeue(
    pcluster_config_reader,
    clusters_factory,
    scheduler_commands_factory,
    vpc_stack,
):
    """
    Test Slurm 25.11+ expedited requeue behavior with recoverable ICE simulation.

    Uses create_fleet_overrides.json to simulate ICE (invalid "ICE-" prefixed InstanceTypes),
    then recovers by changing them back to real ones.
    Verifies that expedited requeue jobs are treated as highest priority after ICE recovery.
    """
    cluster_config = pcluster_config_reader()
    cluster = clusters_factory(cluster_config)
    remote_command_executor = RemoteCommandExecutor(cluster)
    clustermgtd_conf_path = retrieve_clustermgtd_conf_path(remote_command_executor)
    scheduler_commands = scheduler_commands_factory(remote_command_executor)

    partition = "queue"
    ice_compute_res = "ice-cr"
    real_instance_types = ["t3.medium", "c5.large"]
    subnet_id = vpc_stack.get_private_subnet()

    # Set insufficient_capacity_timeout to 180s for quick reset of the iced compute resource
    _set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path)

    # Get node lists — all nodes in ice-cr are dynamic
    nodes_in_scheduler = scheduler_commands.get_compute_nodes(partition, all_nodes=True)
    _, dynamic_nodes = get_partition_nodes(nodes_in_scheduler)
    ice_dynamic_nodes = list(dynamic_nodes)  # plain copy (was a no-op comprehension)
    logging.info("ICE CR dynamic nodes: %s", ice_dynamic_nodes)
    # Pick a specific dynamic node to target
    target_node = ice_dynamic_nodes[0]
    logging.info("Target dynamic node for ICE test: %s", target_node)

    common_cluster_details = {
        "remote_command_executor": remote_command_executor,
        "cluster_name": cluster.cfn_name,
        "scheduler_commands": scheduler_commands,
        "partition": partition,
        "ice_compute_res": ice_compute_res,
        "real_instance_types": real_instance_types,
        "subnet_id": subnet_id,
        "target_nodes": [target_node],
    }

    # Submit jobs in a single ICE cycle: job1 (normal), job2 (expedited).
    # TODO: Improve the test by making Job 2 as expedited and Job 1 as normal
    # so that its clear that Job 2 goes at the top of the queue
    jobs = [
        {"label": "job1", "expedited": False},
        {"label": "job2", "expedited": True},
        # {"label": "job3", "expedited": True},
    ]
    job_ids = _submit_jobs_and_simulate_ice(common_cluster_details, jobs)
    _recover_from_ice_and_wait_for_jobs(common_cluster_details, job_ids)
    start_epochs = _collect_start_epochs(remote_command_executor, job_ids)

    # Expected ordering after ICE recovery:
    # Expedited jobs (job2, job3) run first in submission order, then normal job (job1) last
    logging.info("Start epochs: %s", dict(zip([j["label"] for j in jobs], start_epochs)))

    assert_that(start_epochs[1]).is_less_than_or_equal_to(start_epochs[0])  # job1 (normal) after job2 (expedited)
    # assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[0])  # job3 (expedited) before job1 (normal)
    # assert_that(start_epochs[1]).is_less_than_or_equal_to(start_epochs[2])  # job2 (expedited) before job3 (expedited)
    logging.info("Verified: expedited jobs (job2) ran before normal job (job1)")


# NOTE(review): this hunk also adds a new file
# tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml:
#
#   Image:
#     Os: {{ os }}
#   HeadNode:
#     InstanceType: {{ instance }}
#     Networking:
#       SubnetId: {{ public_subnet_id }}
#     Ssh:
#       KeyName: {{ key_name }}
#   Scheduling:
#     Scheduler: slurm
#     SlurmQueues:
#       - Name: queue
#         Networking:
#           SubnetIds:
#             - {{ private_subnet_id }}
#         ComputeResources:
#           - Name: ice-cr
#             Instances:
#               - InstanceType: t3.medium
#               - InstanceType: c5.large
#           - Name: normal-cr
#             InstanceType: {{ instance }}
#   SharedStorage:
#     - MountDir: /shared
#       Name: name1
#       StorageType: Ebs