From 7a32ded07a7f92559feaa9c0267d0b0af1262f01 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Tue, 27 Jan 2026 09:54:43 -0500 Subject: [PATCH 01/17] Add integration test for Slurm 25.11 expedited requeue mode feature Extend test_fast_capacity_failover to validate the new --requeue=expedite option introduced in Slurm 25.11.2. This feature allows batch jobs to automatically requeue on node failure with highest priority. --- .../tests/schedulers/test_slurm.py | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 646536d277..a211157ae4 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -615,6 +615,20 @@ def test_fast_capacity_failover( expected_error_code="InvalidParameter" if "us-iso" in region else "InvalidParameterValue", ) + # Test Slurm 25.11 Expedited Requeue feature with ICE + # Jobs submitted with --requeue=expedite should automatically requeue on node failure + # and be treated as highest priority, eligible to restart immediately + _test_expedited_requeue_on_ice( + partition, + scheduler_commands, + remote_command_executor, + clustermgtd_conf_path, + ice_single_static_nodes, + ice_single_dynamic_nodes, + target_compute_resource="ice-compute-resource", + expected_error_code="InsufficientHostCapacity", + ) + @pytest.mark.usefixtures("region", "os", "instance", "scheduler") @pytest.mark.slurm_config_update @@ -2293,6 +2307,87 @@ def _test_enable_fast_capacity_failover( ) +def _test_expedited_requeue_on_ice( + partition, + scheduler_commands, + remote_command_executor, + clustermgtd_conf_path, + cr_static_nodes, + cr_dynamic_nodes, + target_compute_resource, + expected_error_code, +): + """ + Test Slurm 25.11 expedited requeue behavior when ICE occurs. + + This test verifies that jobs submitted with --requeue=expedite: + 1. 
Automatically requeue when nodes fail due to ICE + 2. Complete successfully on alternative compute resources + 3. Requeue within the expected time window (2 minutes) + + It expects to be run on a CR with at least 1 static node and 1 dynamic node. + It expects all the dynamic nodes to fail due to the overrides to RunInstances or CreateFleet. + It expects the job to succeed because Slurm will reallocate the failed nodes to a different CR. + """ + # set insufficient_capacity_timeout to 180 seconds to quicker reset compute resources + _set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path) + + # clear slurm_resume and clustermgtd logs in order to start from a clean state + remote_command_executor.clear_slurm_resume_log() + remote_command_executor.clear_clustermgtd_log() + + # Submit job with --requeue=expedite to test Slurm 25.11 expedited requeue feature + # Using `prefer` to allow requeuing the job on a different CR when ICE occurs + job_id = scheduler_commands.submit_command_and_assert_job_accepted( + submit_command_args={ + "command": "sleep 30", + "nodes": 2, + "partition": partition, + "prefer": target_compute_resource, + "other_options": "--requeue=expedite", + } + ) + + # Wait for ICE to be detected and nodes to be marked as down + retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)( + remote_command_executor, + ["/var/log/parallelcluster/clustermgtd"], + [ + "The following compute resources are in down state due to insufficient capacity", + ], + ) + + # Verify static nodes in ice compute resource are still up + assert_compute_node_states(scheduler_commands, cr_static_nodes, expected_states=["idle", "mixed", "allocated"]) + # Verify dynamic nodes in ice compute resource are down due to ICE + assert_compute_node_states(scheduler_commands, cr_dynamic_nodes, expected_states=["down#", "down~"]) + assert_compute_node_reasons(scheduler_commands, cr_dynamic_nodes, f"(Code:{expected_error_code})") + + # Verify 
expedited requeue job completes successfully within 2 minutes + # The job should be automatically requeued with highest priority and run on alternative CR + scheduler_commands.wait_job_completed(job_id) + assert_job_requeue_in_time(scheduler_commands, job_id) + + # Wait for insufficient capacity timeout to expire and nodes to reset + retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)( + remote_command_executor, + ["/var/log/parallelcluster/clustermgtd"], + [ + "Reset the following compute resources because insufficient capacity timeout expired", + ], + ) + + # Verify dynamic nodes are reset after insufficient_capacity_timeout expired + _wait_for_node_reset(scheduler_commands, static_nodes=[], dynamic_nodes=cr_dynamic_nodes) + + # Verify insufficient capacity does not trigger protected mode + assert_no_msg_in_logs( + remote_command_executor, + ["/var/log/parallelcluster/clustermgtd"], + ["Node bootstrap error"], + ) + + def _test_update_without_update_queue_params(pcluster_config_reader, cluster, remote_command_executor): """Test update without queue param change, clustermgtd and slurmctld not restart.""" updated_config_file = pcluster_config_reader(config_file="pcluster.config.update.yaml") From fc01829cdb46d3802a3fcc2d180906b5fe83e8ff Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Wed, 28 Jan 2026 14:07:04 -0500 Subject: [PATCH 02/17] Refine _test_expedited_requeue_on_ice to validate that expedited requeue jobs are treated as highest priority. 
--- .../tests/schedulers/test_slurm.py | 55 +++++++++++++++---- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index a211157ae4..55d6ff0790 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -2318,16 +2318,22 @@ def _test_expedited_requeue_on_ice( expected_error_code, ): """ - Test Slurm 25.11 expedited requeue behavior when ICE occurs. + Test Slurm newer than 25.11 expedited requeue behavior when ICE occurs. This test verifies that jobs submitted with --requeue=expedite: 1. Automatically requeue when nodes fail due to ICE - 2. Complete successfully on alternative compute resources - 3. Requeue within the expected time window (2 minutes) + 2. Are treated as highest priority after requeue (start before earlier-submitted normal jobs) + 3. Complete successfully on alternative compute resources + + Test strategy: + - Submit job1 with --requeue=expedite to ICE-triggering CR + - Submit job2 (normal job) BEFORE ICE occurs (so job2 has earlier SubmitTime) + - Wait for ICE to occur and job1 to be requeued + - Verify job1 starts before job2 (proving highest priority despite later requeue) It expects to be run on a CR with at least 1 static node and 1 dynamic node. It expects all the dynamic nodes to fail due to the overrides to RunInstances or CreateFleet. - It expects the job to succeed because Slurm will reallocate the failed nodes to a different CR. + It expects job1 to succeed because Slurm will reallocate the failed nodes to a different CR. 
""" # set insufficient_capacity_timeout to 180 seconds to quicker reset compute resources _set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path) @@ -2336,9 +2342,9 @@ def _test_expedited_requeue_on_ice( remote_command_executor.clear_slurm_resume_log() remote_command_executor.clear_clustermgtd_log() - # Submit job with --requeue=expedite to test Slurm 25.11 expedited requeue feature + # Submit job1 with --requeue=expedite to test Slurm 25.11 expedited requeue feature # Using `prefer` to allow requeuing the job on a different CR when ICE occurs - job_id = scheduler_commands.submit_command_and_assert_job_accepted( + job1_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ "command": "sleep 30", "nodes": 2, @@ -2347,6 +2353,19 @@ def _test_expedited_requeue_on_ice( "other_options": "--requeue=expedite", } ) + logging.info(f"Submitted job1 (expedited requeue) with ID: {job1_id}") + + # Submit job2 (normal job) BEFORE ICE occurs + # This ensures job2 has an earlier SubmitTime than job1's requeue time + # If expedited requeue truly provides highest priority, job1 should still start first + job2_id = scheduler_commands.submit_command_and_assert_job_accepted( + submit_command_args={ + "command": "sleep 30", + "nodes": 1, + "partition": partition, + } + ) + logging.info(f"Submitted job2 (normal job) with ID: {job2_id}") # Wait for ICE to be detected and nodes to be marked as down retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)( @@ -2363,10 +2382,26 @@ def _test_expedited_requeue_on_ice( assert_compute_node_states(scheduler_commands, cr_dynamic_nodes, expected_states=["down#", "down~"]) assert_compute_node_reasons(scheduler_commands, cr_dynamic_nodes, f"(Code:{expected_error_code})") - # Verify expedited requeue job completes successfully within 2 minutes - # The job should be automatically requeued with highest priority and run on alternative CR - 
scheduler_commands.wait_job_completed(job_id) - assert_job_requeue_in_time(scheduler_commands, job_id) + # Wait for both jobs to complete + scheduler_commands.wait_job_completed(job1_id) + scheduler_commands.wait_job_completed(job2_id) + + # Verify both jobs succeeded + scheduler_commands.assert_job_succeeded(job1_id) + scheduler_commands.assert_job_succeeded(job2_id) + + # Verify expedited requeue job (job1) started before normal job (job2) + # This proves that expedited requeue jobs are treated as highest priority + # even though job2 was submitted before job1 was requeued + job1_start_time = datetime.strptime(scheduler_commands.get_job_start_time(job1_id), "%Y-%m-%dT%H:%M:%S") + job2_start_time = datetime.strptime(scheduler_commands.get_job_start_time(job2_id), "%Y-%m-%dT%H:%M:%S") + job2_submit_time = datetime.strptime(scheduler_commands.get_job_submit_time(job2_id), "%Y-%m-%dT%H:%M:%S") + job1_eligible_time = datetime.strptime(scheduler_commands.get_job_eligible_time(job1_id), "%Y-%m-%dT%H:%M:%S") + logging.info(f"Job1 (expedited) start time: {job1_start_time}, eligible time: {job1_eligible_time}") + logging.info(f"Job2 (normal) submit time: {job2_submit_time}, start time: {job2_start_time}") + logging.info(f"Job2 was submitted before job1 was requeued: {job2_submit_time < job1_eligible_time}") + assert_that(job1_start_time).is_less_than_or_equal_to(job2_start_time) + logging.info("Verified: Expedited requeue job started before normal job (highest priority confirmed)") # Wait for insufficient capacity timeout to expire and nodes to reset retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)( From cd421c34526281b3aebe9b69f913c3f4eef31aff Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Wed, 28 Jan 2026 17:28:22 -0500 Subject: [PATCH 03/17] Improve expedited requeue test job commands for better debugging - Change job commands from simple 'sleep 30' to output hostname and timestamps, making it easier to verify job execution in output files 
- Add --prefer option to job2 targeting the same compute resource as job1 - Increase job2 node request from 1 to 2 nodes to prevent it from immediately running on another CR before job1 requeues --- tests/integration-tests/tests/schedulers/test_slurm.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 55d6ff0790..ee23d71b4f 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -2344,9 +2344,10 @@ def _test_expedited_requeue_on_ice( # Submit job1 with --requeue=expedite to test Slurm 25.11 expedited requeue feature # Using `prefer` to allow requeuing the job on a different CR when ICE occurs + # Command outputs hostname and timestamp to verify job actually ran job1_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": "sleep 30", + "command": "echo 'Job1 started on' $(hostname) 'at' $(date); sleep 30; echo 'Job1 completed at' $(date)", "nodes": 2, "partition": partition, "prefer": target_compute_resource, @@ -2358,11 +2359,14 @@ def _test_expedited_requeue_on_ice( # Submit job2 (normal job) BEFORE ICE occurs # This ensures job2 has an earlier SubmitTime than job1's requeue time # If expedited requeue truly provides highest priority, job1 should still start first + # Job2 also uses --prefer to target the same CR and requests 2 nodes + # This prevents job2 from immediately running on another CR before job1 requeues job2_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": "sleep 30", - "nodes": 1, + "command": "echo 'Job2 started on' $(hostname) 'at' $(date); sleep 30; echo 'Job2 completed at' $(date)", + "nodes": 2, "partition": partition, + "prefer": target_compute_resource, } ) logging.info(f"Submitted job2 (normal job) with ID: {job2_id}") From 
dd94842344da62d9a0d447471565dbd03573495c Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Thu, 29 Jan 2026 14:45:35 -0500 Subject: [PATCH 04/17] Fix quote escaping in expedited requeue test to avoid sbatch --wrap parsing error --- tests/integration-tests/tests/schedulers/test_slurm.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index ee23d71b4f..7310042665 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -2345,9 +2345,10 @@ def _test_expedited_requeue_on_ice( # Submit job1 with --requeue=expedite to test Slurm 25.11 expedited requeue feature # Using `prefer` to allow requeuing the job on a different CR when ICE occurs # Command outputs hostname and timestamp to verify job actually ran + # Note: Using double quotes in echo to avoid conflicts with --wrap's single quotes job1_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": "echo 'Job1 started on' $(hostname) 'at' $(date); sleep 30; echo 'Job1 completed at' $(date)", + "command": 'echo "Job1 started on" $(hostname) "at" $(date); sleep 30; echo "Job1 completed at" $(date)', "nodes": 2, "partition": partition, "prefer": target_compute_resource, @@ -2361,9 +2362,10 @@ def _test_expedited_requeue_on_ice( # If expedited requeue truly provides highest priority, job1 should still start first # Job2 also uses --prefer to target the same CR and requests 2 nodes # This prevents job2 from immediately running on another CR before job1 requeues + # Note: Using double quotes in echo to avoid conflicts with --wrap's single quotes job2_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": "echo 'Job2 started on' $(hostname) 'at' $(date); sleep 30; echo 'Job2 completed at' $(date)", + "command": 'echo "Job2 started on" 
$(hostname) "at" $(date); sleep 30; echo "Job2 completed at" $(date)', "nodes": 2, "partition": partition, "prefer": target_compute_resource, From 9bf6eb7edb0fb6d265bc1963a69200bc3fcb6c0c Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Wed, 11 Feb 2026 15:57:14 -0500 Subject: [PATCH 05/17] [Test] Decouple test_expedited_requeue from test_fast_capacity_failover and use recoverable ICE simulation Move _test_expedited_requeue_on_ice out of test_fast_capacity_failover into a standalone test_expedited_requeue with its own cluster config (multi-instance-type CR using create_fleet). Replace the unrecoverable overrides.py-based ICE simulation with create_fleet_overrides.json: write invalid 'ICE-' prefixed InstanceTypes to trigger ICE, then change them back to real ones (t3.medium, c5.xlarge) to recover. This allows verifying that after ICE recovery, the expedited requeue job starts before a normal job submitted earlier. --- tests/integration-tests/configs/develop.yaml | 6 + .../tests/common/scaling_common.py | 61 +++++ .../tests/schedulers/test_slurm.py | 257 +++++++++--------- .../pcluster.config.yaml | 26 ++ 4 files changed, 217 insertions(+), 133 deletions(-) create mode 100644 tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml diff --git a/tests/integration-tests/configs/develop.yaml b/tests/integration-tests/configs/develop.yaml index 0deb563d7d..14c62df901 100644 --- a/tests/integration-tests/configs/develop.yaml +++ b/tests/integration-tests/configs/develop.yaml @@ -479,6 +479,12 @@ test-suites: instances: {{ common.INSTANCES_DEFAULT_X86 }} oss: [{{ OS_X86_5 }}] schedulers: ["slurm"] + test_slurm.py::test_expedited_requeue: + dimensions: + - regions: ["ap-east-1"] + instances: {{ common.INSTANCES_DEFAULT_X86 }} + oss: [{{ OS_X86_5 }}] + schedulers: ["slurm"] test_slurm.py::test_slurm_config_update: dimensions: - regions: [ "ap-east-1" ] diff --git a/tests/integration-tests/tests/common/scaling_common.py 
b/tests/integration-tests/tests/common/scaling_common.py index f46f623021..abf18e26a0 100644 --- a/tests/integration-tests/tests/common/scaling_common.py +++ b/tests/integration-tests/tests/common/scaling_common.py @@ -25,6 +25,8 @@ from utils import get_compute_nodes_instance_count SCALING_COMMON_DATADIR = pathlib.Path(__file__).parent / "scaling" +RUN_INSTANCES_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/run_instances_overrides.json" +CREATE_FLEET_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/create_fleet_overrides.json" def validate_and_get_scaling_test_config(scaling_test_config_file): @@ -364,3 +366,62 @@ def setup_ec2_launch_override_to_emulate_ice( run_as_root=True, ) # fmt: on + + +def _write_json_override(remote_command_executor, path, content): + """Write a JSON override file on the head node.""" + json_str = json.dumps(content) + remote_command_executor.run_remote_command( + f"echo '{json_str}' | sudo tee {path}", + raise_on_error=True, + ) + + +def setup_create_fleet_override_to_emulate_ice(remote_command_executor, queue, compute_resource, instance_types): + """ + Write create_fleet_overrides.json with invalid InstanceTypes to emulate ICE. + + This targets multi-instance-type CRs that use the create_fleet API. The invalid InstanceTypes + (prefixed with "ICE-") cause create_fleet to return no instances, which is detected as + InsufficientInstanceCapacity by the instance manager. + + To recover, call recover_create_fleet_override_from_ice() which replaces the invalid + InstanceTypes with real ones. 
+ """ + overrides = { + queue: { + compute_resource: { + "LaunchTemplateConfigs": [ + { + "Overrides": [{"InstanceType": f"ICE-{it}"} for it in instance_types] + } + ], + } + } + } + logging.info("Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " + "for queue=%s, cr=%s", queue, compute_resource) + _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) + + +def recover_create_fleet_override_from_ice(remote_command_executor, queue, compute_resource, real_instance_types): + """ + Recover from simulated ICE by changing InstanceTypes in create_fleet_overrides.json back to real ones. + + This is the "change instance type in JSON to recover" approach — the invalid "ICE-*" prefixed + InstanceTypes are replaced with real ones, so the next create_fleet call succeeds. + """ + overrides = { + queue: { + compute_resource: { + "LaunchTemplateConfigs": [ + { + "Overrides": [{"InstanceType": it} for it in real_instance_types] + } + ], + } + } + } + logging.info("Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " + "for queue=%s, cr=%s", real_instance_types, queue, compute_resource) + _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 7310042665..4e2f80144d 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -55,7 +55,11 @@ wait_for_compute_nodes_states, wait_for_num_nodes_in_scheduler, ) -from tests.common.scaling_common import setup_ec2_launch_override_to_emulate_ice +from tests.common.scaling_common import ( + recover_create_fleet_override_from_ice, + setup_create_fleet_override_to_emulate_ice, + setup_ec2_launch_override_to_emulate_ice, +) from tests.common.schedulers_common import SlurmCommands @@ -615,18 +619,126 @@ def 
test_fast_capacity_failover( expected_error_code="InvalidParameter" if "us-iso" in region else "InvalidParameterValue", ) - # Test Slurm 25.11 Expedited Requeue feature with ICE - # Jobs submitted with --requeue=expedite should automatically requeue on node failure - # and be treated as highest priority, eligible to restart immediately - _test_expedited_requeue_on_ice( - partition, - scheduler_commands, + +@pytest.mark.usefixtures("region", "os", "instance", "scheduler") +@pytest.mark.expedited_requeue +def test_expedited_requeue( + pcluster_config_reader, + clusters_factory, + scheduler_commands_factory, +): + """ + Test Slurm 25.11+ expedited requeue behavior with recoverable ICE simulation. + + Uses JSON run_instances_overrides to simulate ICE (Placement.Tenancy=host triggers + InsufficientHostCapacity), then recovers by changing the override to a valid InstanceType. + Verifies that expedited requeue jobs are treated as highest priority after ICE recovery. + """ + cluster_config = pcluster_config_reader() + cluster = clusters_factory(cluster_config) + remote_command_executor = RemoteCommandExecutor(cluster) + clustermgtd_conf_path = retrieve_clustermgtd_conf_path(remote_command_executor) + scheduler_commands = scheduler_commands_factory(remote_command_executor) + + partition = "queue" + ice_cr = "ice-cr" + real_instance_types = ["t3.medium", "c5.xlarge"] + + # Set up ICE simulation via create_fleet_overrides.json with invalid InstanceTypes + setup_create_fleet_override_to_emulate_ice( + remote_command_executor, queue=partition, compute_resource=ice_cr, instance_types=real_instance_types + ) + + # Set insufficient_capacity_timeout to 180s for quicker reset + _set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path) + + # Get node lists — all nodes in ice-cr are dynamic + nodes_in_scheduler = scheduler_commands.get_compute_nodes(partition, all_nodes=True) + _, dynamic_nodes = get_partition_nodes(nodes_in_scheduler) + ice_dynamic_nodes 
= [n for n in dynamic_nodes] + logging.info("ICE CR dynamic nodes: %s", ice_dynamic_nodes) + # Pick a specific dynamic node to target + target_node = ice_dynamic_nodes[0] + logging.info("Target dynamic node for ICE test: %s", target_node) + + # Clear logs for clean state + remote_command_executor.clear_slurm_resume_log() + remote_command_executor.clear_clustermgtd_log() + + # Submit job1 with --requeue=expedite, targeting the specific ICE dynamic node + job1_id = scheduler_commands.submit_command_and_assert_job_accepted( + submit_command_args={ + "command": 'echo "Job1 on $(hostname) at $(date)"; sleep 30; echo "Job1 done"', + "nodes": 1, + "partition": partition, + "host": target_node, + "other_options": "--requeue=expedite", + } + ) + logging.info("Submitted job1 (expedited requeue) ID: %s", job1_id) + + # Submit job2 (normal), targeting the same ICE dynamic node + job2_id = scheduler_commands.submit_command_and_assert_job_accepted( + submit_command_args={ + "command": 'echo "Job2 on $(hostname) at $(date)"; sleep 30; echo "Job2 done"', + "nodes": 1, + "partition": partition, + "host": target_node, + } + ) + logging.info("Submitted job2 (normal) ID: %s", job2_id) + + # Wait for ICE to be detected + retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)( remote_command_executor, - clustermgtd_conf_path, - ice_single_static_nodes, - ice_single_dynamic_nodes, - target_compute_resource="ice-compute-resource", - expected_error_code="InsufficientHostCapacity", + ["/var/log/parallelcluster/clustermgtd"], + ["The following compute resources are in down state due to insufficient capacity"], + ) + + # Verify the target dynamic node is down due to ICE + assert_compute_node_states(scheduler_commands, [target_node], expected_states=["down#", "down~"]) + + # Recover from ICE: change InstanceTypes in JSON override back to real ones + recover_create_fleet_override_from_ice( + remote_command_executor, queue=partition, compute_resource=ice_cr, 
real_instance_types=real_instance_types + ) + + # Wait for insufficient_capacity_timeout to expire and nodes to reset + retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)( + remote_command_executor, + ["/var/log/parallelcluster/clustermgtd"], + ["Reset the following compute resources because insufficient capacity timeout expired"], + ) + + # Wait for the target dynamic node to be power-saved (reset) + wait_for_compute_nodes_states( + scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=300 + ) + + # Wait for both jobs to complete — they should now run on recovered nodes + scheduler_commands.wait_job_completed(job1_id, timeout=8) + scheduler_commands.wait_job_completed(job2_id, timeout=8) + + # Verify both jobs succeeded + scheduler_commands.assert_job_succeeded(job1_id) + scheduler_commands.assert_job_succeeded(job2_id) + + # Verify expedited requeue job (job1) started before normal job (job2) + job1_start = datetime.strptime(scheduler_commands.get_job_start_time(job1_id), "%Y-%m-%dT%H:%M:%S") + job2_start = datetime.strptime(scheduler_commands.get_job_start_time(job2_id), "%Y-%m-%dT%H:%M:%S") + job2_submit = datetime.strptime(scheduler_commands.get_job_submit_time(job2_id), "%Y-%m-%dT%H:%M:%S") + job1_eligible = datetime.strptime(scheduler_commands.get_job_eligible_time(job1_id), "%Y-%m-%dT%H:%M:%S") + logging.info("Job1 (expedited) start=%s, eligible=%s", job1_start, job1_eligible) + logging.info("Job2 (normal) submit=%s, start=%s", job2_submit, job2_start) + logging.info("Job2 submitted before job1 requeued: %s", job2_submit < job1_eligible) + assert_that(job1_start).is_less_than_or_equal_to(job2_start) + logging.info("Verified: expedited requeue job started before normal job (highest priority)") + + # Verify no protected mode triggered + assert_no_msg_in_logs( + remote_command_executor, + ["/var/log/parallelcluster/clustermgtd"], + ["Node bootstrap error"], ) @@ -2307,127 +2419,6 @@ def 
_test_enable_fast_capacity_failover( ) -def _test_expedited_requeue_on_ice( - partition, - scheduler_commands, - remote_command_executor, - clustermgtd_conf_path, - cr_static_nodes, - cr_dynamic_nodes, - target_compute_resource, - expected_error_code, -): - """ - Test Slurm newer than 25.11 expedited requeue behavior when ICE occurs. - - This test verifies that jobs submitted with --requeue=expedite: - 1. Automatically requeue when nodes fail due to ICE - 2. Are treated as highest priority after requeue (start before earlier-submitted normal jobs) - 3. Complete successfully on alternative compute resources - - Test strategy: - - Submit job1 with --requeue=expedite to ICE-triggering CR - - Submit job2 (normal job) BEFORE ICE occurs (so job2 has earlier SubmitTime) - - Wait for ICE to occur and job1 to be requeued - - Verify job1 starts before job2 (proving highest priority despite later requeue) - - It expects to be run on a CR with at least 1 static node and 1 dynamic node. - It expects all the dynamic nodes to fail due to the overrides to RunInstances or CreateFleet. - It expects job1 to succeed because Slurm will reallocate the failed nodes to a different CR. 
- """ - # set insufficient_capacity_timeout to 180 seconds to quicker reset compute resources - _set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path) - - # clear slurm_resume and clustermgtd logs in order to start from a clean state - remote_command_executor.clear_slurm_resume_log() - remote_command_executor.clear_clustermgtd_log() - - # Submit job1 with --requeue=expedite to test Slurm 25.11 expedited requeue feature - # Using `prefer` to allow requeuing the job on a different CR when ICE occurs - # Command outputs hostname and timestamp to verify job actually ran - # Note: Using double quotes in echo to avoid conflicts with --wrap's single quotes - job1_id = scheduler_commands.submit_command_and_assert_job_accepted( - submit_command_args={ - "command": 'echo "Job1 started on" $(hostname) "at" $(date); sleep 30; echo "Job1 completed at" $(date)', - "nodes": 2, - "partition": partition, - "prefer": target_compute_resource, - "other_options": "--requeue=expedite", - } - ) - logging.info(f"Submitted job1 (expedited requeue) with ID: {job1_id}") - - # Submit job2 (normal job) BEFORE ICE occurs - # This ensures job2 has an earlier SubmitTime than job1's requeue time - # If expedited requeue truly provides highest priority, job1 should still start first - # Job2 also uses --prefer to target the same CR and requests 2 nodes - # This prevents job2 from immediately running on another CR before job1 requeues - # Note: Using double quotes in echo to avoid conflicts with --wrap's single quotes - job2_id = scheduler_commands.submit_command_and_assert_job_accepted( - submit_command_args={ - "command": 'echo "Job2 started on" $(hostname) "at" $(date); sleep 30; echo "Job2 completed at" $(date)', - "nodes": 2, - "partition": partition, - "prefer": target_compute_resource, - } - ) - logging.info(f"Submitted job2 (normal job) with ID: {job2_id}") - - # Wait for ICE to be detected and nodes to be marked as down - retry(wait_fixed=seconds(20), 
stop_max_delay=minutes(3))(assert_lines_in_logs)( - remote_command_executor, - ["/var/log/parallelcluster/clustermgtd"], - [ - "The following compute resources are in down state due to insufficient capacity", - ], - ) - - # Verify static nodes in ice compute resource are still up - assert_compute_node_states(scheduler_commands, cr_static_nodes, expected_states=["idle", "mixed", "allocated"]) - # Verify dynamic nodes in ice compute resource are down due to ICE - assert_compute_node_states(scheduler_commands, cr_dynamic_nodes, expected_states=["down#", "down~"]) - assert_compute_node_reasons(scheduler_commands, cr_dynamic_nodes, f"(Code:{expected_error_code})") - - # Wait for both jobs to complete - scheduler_commands.wait_job_completed(job1_id) - scheduler_commands.wait_job_completed(job2_id) - - # Verify both jobs succeeded - scheduler_commands.assert_job_succeeded(job1_id) - scheduler_commands.assert_job_succeeded(job2_id) - - # Verify expedited requeue job (job1) started before normal job (job2) - # This proves that expedited requeue jobs are treated as highest priority - # even though job2 was submitted before job1 was requeued - job1_start_time = datetime.strptime(scheduler_commands.get_job_start_time(job1_id), "%Y-%m-%dT%H:%M:%S") - job2_start_time = datetime.strptime(scheduler_commands.get_job_start_time(job2_id), "%Y-%m-%dT%H:%M:%S") - job2_submit_time = datetime.strptime(scheduler_commands.get_job_submit_time(job2_id), "%Y-%m-%dT%H:%M:%S") - job1_eligible_time = datetime.strptime(scheduler_commands.get_job_eligible_time(job1_id), "%Y-%m-%dT%H:%M:%S") - logging.info(f"Job1 (expedited) start time: {job1_start_time}, eligible time: {job1_eligible_time}") - logging.info(f"Job2 (normal) submit time: {job2_submit_time}, start time: {job2_start_time}") - logging.info(f"Job2 was submitted before job1 was requeued: {job2_submit_time < job1_eligible_time}") - assert_that(job1_start_time).is_less_than_or_equal_to(job2_start_time) - logging.info("Verified: Expedited 
requeue job started before normal job (highest priority confirmed)") - - # Wait for insufficient capacity timeout to expire and nodes to reset - retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)( - remote_command_executor, - ["/var/log/parallelcluster/clustermgtd"], - [ - "Reset the following compute resources because insufficient capacity timeout expired", - ], - ) - - # Verify dynamic nodes are reset after insufficient_capacity_timeout expired - _wait_for_node_reset(scheduler_commands, static_nodes=[], dynamic_nodes=cr_dynamic_nodes) - - # Verify insufficient capacity does not trigger protected mode - assert_no_msg_in_logs( - remote_command_executor, - ["/var/log/parallelcluster/clustermgtd"], - ["Node bootstrap error"], - ) - def _test_update_without_update_queue_params(pcluster_config_reader, cluster, remote_command_executor): """Test update without queue param change, clustermgtd and slurmctld not restart.""" diff --git a/tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml b/tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml new file mode 100644 index 0000000000..2513af9596 --- /dev/null +++ b/tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml @@ -0,0 +1,26 @@ +Image: + Os: {{ os }} +HeadNode: + InstanceType: {{ instance }} + Networking: + SubnetId: {{ public_subnet_id }} + Ssh: + KeyName: {{ key_name }} +Scheduling: + Scheduler: slurm + SlurmQueues: + - Name: queue + Networking: + SubnetIds: + - {{ private_subnet_id }} + ComputeResources: + - Name: ice-cr + Instances: + - InstanceType: t3.medium + - InstanceType: c5.xlarge + - Name: normal-cr + InstanceType: {{ instance }} +SharedStorage: + - MountDir: /shared + Name: name1 + StorageType: Ebs From 094326d8fac96a62ccc98daf451a127c67633661 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Wed, 11 Feb 2026 16:20:15 -0500 Subject: [PATCH 06/17] TOREVERT: 
Work around known Slurm 25.11 expedited requeue bug --- .../tests/schedulers/test_slurm.py | 42 ++++++++++++------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 4e2f80144d..aca4a02dfd 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -666,9 +666,10 @@ def test_expedited_requeue( remote_command_executor.clear_clustermgtd_log() # Submit job1 with --requeue=expedite, targeting the specific ICE dynamic node + # Output epoch timestamp for reliable start time parsing from slurm output file job1_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": 'echo "Job1 on $(hostname) at $(date)"; sleep 30; echo "Job1 done"', + "command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job1 done"', "nodes": 1, "partition": partition, "host": target_node, @@ -680,7 +681,7 @@ # Submit job2 (normal), targeting the same ICE dynamic node job2_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": 'echo "Job2 on $(hostname) at $(date)"; sleep 30; echo "Job2 done"', + "command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job2 done"', "nodes": 1, "partition": partition, "host": target_node, @@ -715,23 +716,34 @@ - # Wait for both jobs to complete — they should now run on recovered nodes - scheduler_commands.wait_job_completed(job1_id, timeout=8) - scheduler_commands.wait_job_completed(job2_id, timeout=8) + # Wait for job1 to run and enter REQUEUE_HOLD. 
+ # Known Slurm 25.11 bug: jobs with --requeue=expedite enter REQUEUE_HOLD after successful + # completion instead of COMPLETED, because _set_job_requeue_exit_value() unconditionally + # triggers expedited requeue without checking exit_code. + # We wait for REQUEUE_HOLD directly and read start time from the slurm output file, + # since StartTime in scontrol resets to Unknown in REQUEUE_HOLD state. + # TODO: Change to wait_job_completed + assert_job_succeeded once the Slurm bug is fixed. + retry(wait_fixed=seconds(10), stop_max_delay=minutes(8))(scheduler_commands.assert_job_state)( + job1_id, "REQUEUE_HOLD" + ) + logging.info("Job1 entered REQUEUE_HOLD as expected (known Slurm 25.11 bug)") + scheduler_commands.cancel_job(job1_id) - # Verify both jobs succeeded - scheduler_commands.assert_job_succeeded(job1_id) + # Wait for job2 to complete normally + scheduler_commands.wait_job_completed(job2_id, timeout=8) scheduler_commands.assert_job_succeeded(job2_id) + # Read start times from slurm output files (epoch timestamps) + job1_output = remote_command_executor.run_remote_command(f"cat ~/slurm-{job1_id}.out").stdout + job2_output = remote_command_executor.run_remote_command(f"cat ~/slurm-{job2_id}.out").stdout + job1_start_epoch = int(re.search(r"START_TIME=(\d+)", job1_output).group(1)) + job2_start_epoch = int(re.search(r"START_TIME=(\d+)", job2_output).group(1)) + logging.info("Job1 output: %s", job1_output) + logging.info("Job2 output: %s", job2_output) + # Verify expedited requeue job (job1) started before normal job (job2) - job1_start = datetime.strptime(scheduler_commands.get_job_start_time(job1_id), "%Y-%m-%dT%H:%M:%S") - job2_start = datetime.strptime(scheduler_commands.get_job_start_time(job2_id), "%Y-%m-%dT%H:%M:%S") - job2_submit = datetime.strptime(scheduler_commands.get_job_submit_time(job2_id), "%Y-%m-%dT%H:%M:%S") - job1_eligible = datetime.strptime(scheduler_commands.get_job_eligible_time(job1_id), "%Y-%m-%dT%H:%M:%S") - logging.info("Job1 (expedited) 
start=%s, eligible=%s", job1_start, job1_eligible) - logging.info("Job2 (normal) submit=%s, start=%s", job2_submit, job2_start) - logging.info("Job2 submitted before job1 requeued: %s", job2_submit < job1_eligible) - assert_that(job1_start).is_less_than_or_equal_to(job2_start) + logging.info("Job1 start_epoch=%s, Job2 start_epoch=%s", job1_start_epoch, job2_start_epoch) + assert_that(job1_start_epoch).is_less_than_or_equal_to(job2_start_epoch) logging.info("Verified: expedited requeue job started before normal job (highest priority)") # Verify no protected mode triggered From a10e2981f874b48a55a977c5af0f74c01fff2811 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Thu, 12 Feb 2026 11:46:54 -0500 Subject: [PATCH 07/17] Use c5.large and t3.medium to align the vcpu amount --- tests/integration-tests/tests/schedulers/test_slurm.py | 2 +- .../test_slurm/test_expedited_requeue/pcluster.config.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index aca4a02dfd..5bc12153d7 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -642,7 +642,7 @@ def test_expedited_requeue( partition = "queue" ice_cr = "ice-cr" - real_instance_types = ["t3.medium", "c5.xlarge"] + real_instance_types = ["t3.medium", "c5.large"] # Set up ICE simulation via create_fleet_overrides.json with invalid InstanceTypes setup_create_fleet_override_to_emulate_ice( diff --git a/tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml b/tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml index 2513af9596..17b73b96e3 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml +++ 
b/tests/integration-tests/tests/schedulers/test_slurm/test_expedited_requeue/pcluster.config.yaml @@ -17,7 +17,7 @@ Scheduling: - Name: ice-cr Instances: - InstanceType: t3.medium - - InstanceType: c5.xlarge + - InstanceType: c5.large - Name: normal-cr InstanceType: {{ instance }} SharedStorage: From df3677bf731bc5750bb868e538a7cfc9137a96fd Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Fri, 13 Feb 2026 14:27:15 -0500 Subject: [PATCH 08/17] Fix create_fleet_overrides to include LaunchTemplateSpecification to avoid MissingParameter error --- .../tests/common/scaling_common.py | 47 ++++++++++++++----- .../tests/schedulers/test_slurm.py | 17 ++++--- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/tests/integration-tests/tests/common/scaling_common.py b/tests/integration-tests/tests/common/scaling_common.py index abf18e26a0..dafe5f1f71 100644 --- a/tests/integration-tests/tests/common/scaling_common.py +++ b/tests/integration-tests/tests/common/scaling_common.py @@ -377,51 +377,76 @@ def _write_json_override(remote_command_executor, path, content): ) -def setup_create_fleet_override_to_emulate_ice(remote_command_executor, queue, compute_resource, instance_types): +def setup_create_fleet_override_to_emulate_ice( + remote_command_executor, cluster_name, queue, compute_resource, instance_types +): """ Write create_fleet_overrides.json with invalid InstanceTypes to emulate ICE. - This targets multi-instance-type CRs that use the create_fleet API. The invalid InstanceTypes - (prefixed with "ICE-") cause create_fleet to return no instances, which is detected as - InsufficientInstanceCapacity by the instance manager. + This targets multi-instance-type CRs that use the create_fleet API. The override must include + LaunchTemplateSpecification (with the cluster's launch template name) to avoid MissingParameter + errors. 
The invalid InstanceTypes (prefixed with "ICE-") cause create_fleet to return no + instances, which is detected as InsufficientInstanceCapacity by the instance manager. To recover, call recover_create_fleet_override_from_ice() which replaces the invalid InstanceTypes with real ones. """ + lt_name = f"{cluster_name}-{queue}-{compute_resource}" overrides = { queue: { compute_resource: { "LaunchTemplateConfigs": [ { - "Overrides": [{"InstanceType": f"ICE-{it}"} for it in instance_types] + "LaunchTemplateSpecification": { + "LaunchTemplateName": lt_name, + "Version": "$Latest", + }, + "Overrides": [{"InstanceType": f"ICE-{it}"} for it in instance_types], } ], } } } - logging.info("Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " - "for queue=%s, cr=%s", queue, compute_resource) + logging.info( + "Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " "for queue=%s, cr=%s (LT=%s)", + queue, + compute_resource, + lt_name, + ) _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) -def recover_create_fleet_override_from_ice(remote_command_executor, queue, compute_resource, real_instance_types): +def recover_create_fleet_override_from_ice( + remote_command_executor, cluster_name, queue, compute_resource, real_instance_types +): """ Recover from simulated ICE by changing InstanceTypes in create_fleet_overrides.json back to real ones. This is the "change instance type in JSON to recover" approach — the invalid "ICE-*" prefixed InstanceTypes are replaced with real ones, so the next create_fleet call succeeds. 
""" + lt_name = f"{cluster_name}-{queue}-{compute_resource}" overrides = { queue: { compute_resource: { "LaunchTemplateConfigs": [ { - "Overrides": [{"InstanceType": it} for it in real_instance_types] + "LaunchTemplateSpecification": { + "LaunchTemplateName": lt_name, + "Version": "$Latest", + }, + "Overrides": [{"InstanceType": it} for it in real_instance_types], } ], } } } - logging.info("Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " - "for queue=%s, cr=%s", real_instance_types, queue, compute_resource) + logging.info( + "Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " + "for queue=%s, cr=%s (LT=%s)", + real_instance_types, + queue, + compute_resource, + lt_name, + ) _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 5bc12153d7..2fadb90bdb 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -646,7 +646,11 @@ def test_expedited_requeue( # Set up ICE simulation via create_fleet_overrides.json with invalid InstanceTypes setup_create_fleet_override_to_emulate_ice( - remote_command_executor, queue=partition, compute_resource=ice_cr, instance_types=real_instance_types + remote_command_executor, + cluster_name=cluster.cfn_name, + queue=partition, + compute_resource=ice_cr, + instance_types=real_instance_types, ) # Set insufficient_capacity_timeout to 180s for quicker reset @@ -701,7 +705,11 @@ def test_expedited_requeue( # Recover from ICE: change InstanceTypes in JSON override back to real ones recover_create_fleet_override_from_ice( - remote_command_executor, queue=partition, compute_resource=ice_cr, real_instance_types=real_instance_types + remote_command_executor, + cluster_name=cluster.cfn_name, + queue=partition, + compute_resource=ice_cr, + 
real_instance_types=real_instance_types, ) # Wait for insufficient_capacity_timeout to expire and nodes to reset @@ -712,9 +720,7 @@ def test_expedited_requeue( ) # Wait for the target dynamic node to be power-saved (reset) - wait_for_compute_nodes_states( - scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=300 - ) + wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=300) # Wait for job1 to run and enter REQUEUE_HOLD. # Known Slurm 25.11 bug: jobs with --requeue=expedite enter REQUEUE_HOLD after successful @@ -2431,7 +2437,6 @@ def _test_enable_fast_capacity_failover( ) - def _test_update_without_update_queue_params(pcluster_config_reader, cluster, remote_command_executor): """Test update without queue param change, clustermgtd and slurmctld not restart.""" updated_config_file = pcluster_config_reader(config_file="pcluster.config.update.yaml") From b39921744a25ae274b4b906968cf10ec694d7d3b Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Fri, 13 Feb 2026 15:33:47 -0500 Subject: [PATCH 09/17] Add SubnetId to create_fleet_overrides and get subnet from vpc_stack instead of fleet-config.json --- .../tests/common/scaling_common.py | 79 ++++++++----------- .../tests/schedulers/test_slurm.py | 10 ++- 2 files changed, 38 insertions(+), 51 deletions(-) diff --git a/tests/integration-tests/tests/common/scaling_common.py b/tests/integration-tests/tests/common/scaling_common.py index dafe5f1f71..f8cc7ab9c4 100644 --- a/tests/integration-tests/tests/common/scaling_common.py +++ b/tests/integration-tests/tests/common/scaling_common.py @@ -377,22 +377,11 @@ def _write_json_override(remote_command_executor, path, content): ) -def setup_create_fleet_override_to_emulate_ice( - remote_command_executor, cluster_name, queue, compute_resource, instance_types -): - """ - Write create_fleet_overrides.json with invalid InstanceTypes to emulate ICE. 
- - This targets multi-instance-type CRs that use the create_fleet API. The override must include - LaunchTemplateSpecification (with the cluster's launch template name) to avoid MissingParameter - errors. The invalid InstanceTypes (prefixed with "ICE-") cause create_fleet to return no - instances, which is detected as InsufficientInstanceCapacity by the instance manager. - - To recover, call recover_create_fleet_override_from_ice() which replaces the invalid - InstanceTypes with real ones. - """ +def _build_create_fleet_override(cluster_name, queue, compute_resource, instance_types, subnet_id): + """Build a create_fleet_overrides dict with LaunchTemplateSpecification, InstanceTypes and SubnetId.""" lt_name = f"{cluster_name}-{queue}-{compute_resource}" - overrides = { + overrides_list = [{"InstanceType": it, "SubnetId": subnet_id} for it in instance_types] + return { queue: { compute_resource: { "LaunchTemplateConfigs": [ @@ -401,52 +390,46 @@ def setup_create_fleet_override_to_emulate_ice( "LaunchTemplateName": lt_name, "Version": "$Latest", }, - "Overrides": [{"InstanceType": f"ICE-{it}"} for it in instance_types], + "Overrides": overrides_list, } ], } } } - logging.info( - "Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " "for queue=%s, cr=%s (LT=%s)", - queue, - compute_resource, - lt_name, - ) + + +def setup_create_fleet_override_to_emulate_ice( + remote_command_executor, cluster_name, queue, compute_resource, instance_types, subnet_id +): + """ + Write create_fleet_overrides.json with invalid InstanceTypes to emulate ICE. + + This targets multi-instance-type CRs that use the create_fleet API. The override includes + LaunchTemplateSpecification and SubnetId to avoid MissingParameter errors. The invalid + InstanceTypes (prefixed with "ICE-") cause create_fleet to return no instances, which is + detected as InsufficientInstanceCapacity. 
+ + To recover, call recover_create_fleet_override_from_ice() which replaces the invalid + InstanceTypes with real ones. + """ + ice_instance_types = [f"ICE-{it}" for it in instance_types] + overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, ice_instance_types, subnet_id) + logging.info("Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " + "for queue=%s, cr=%s", queue, compute_resource) _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) def recover_create_fleet_override_from_ice( - remote_command_executor, cluster_name, queue, compute_resource, real_instance_types + remote_command_executor, cluster_name, queue, compute_resource, real_instance_types, subnet_id ): """ Recover from simulated ICE by changing InstanceTypes in create_fleet_overrides.json back to real ones. This is the "change instance type in JSON to recover" approach — the invalid "ICE-*" prefixed - InstanceTypes are replaced with real ones, so the next create_fleet call succeeds. + InstanceTypes are replaced with real ones (with correct SubnetId), so the next create_fleet + call succeeds. 
""" - lt_name = f"{cluster_name}-{queue}-{compute_resource}" - overrides = { - queue: { - compute_resource: { - "LaunchTemplateConfigs": [ - { - "LaunchTemplateSpecification": { - "LaunchTemplateName": lt_name, - "Version": "$Latest", - }, - "Overrides": [{"InstanceType": it} for it in real_instance_types], - } - ], - } - } - } - logging.info( - "Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " - "for queue=%s, cr=%s (LT=%s)", - real_instance_types, - queue, - compute_resource, - lt_name, - ) + overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, real_instance_types, subnet_id) + logging.info("Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " + "for queue=%s, cr=%s", real_instance_types, queue, compute_resource) _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 2fadb90bdb..4804bf1750 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -626,12 +626,13 @@ def test_expedited_requeue( pcluster_config_reader, clusters_factory, scheduler_commands_factory, + vpc_stack, ): """ Test Slurm 25.11+ expedited requeue behavior with recoverable ICE simulation. - Uses JSON run_instances_overrides to simulate ICE (Placement.Tenancy=host triggers - InsufficientHostCapacity), then recovers by changing the override to a valid InstanceType. + Uses create_fleet_overrides.json to simulate ICE (invalid "ICE-" prefixed InstanceTypes), + then recovers by changing them back to real ones. Verifies that expedited requeue jobs are treated as highest priority after ICE recovery. 
""" cluster_config = pcluster_config_reader() @@ -643,6 +644,7 @@ def test_expedited_requeue( partition = "queue" ice_cr = "ice-cr" real_instance_types = ["t3.medium", "c5.large"] + subnet_id = vpc_stack.get_private_subnet() # Set up ICE simulation via create_fleet_overrides.json with invalid InstanceTypes setup_create_fleet_override_to_emulate_ice( @@ -651,6 +653,7 @@ def test_expedited_requeue( queue=partition, compute_resource=ice_cr, instance_types=real_instance_types, + subnet_id=subnet_id, ) # Set insufficient_capacity_timeout to 180s for quicker reset @@ -710,6 +713,7 @@ def test_expedited_requeue( queue=partition, compute_resource=ice_cr, real_instance_types=real_instance_types, + subnet_id=subnet_id, ) # Wait for insufficient_capacity_timeout to expire and nodes to reset @@ -720,7 +724,7 @@ def test_expedited_requeue( ) # Wait for the target dynamic node to be power-saved (reset) - wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=300) + wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=600) # Wait for job1 to run and enter REQUEUE_HOLD. 
# Known Slurm 25.11 bug: jobs with --requeue=expedite enter REQUEUE_HOLD after successful From 0dec864c49b2c1488f19d8fa04c6b3facb2820b0 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Fri, 13 Feb 2026 16:50:44 -0500 Subject: [PATCH 10/17] Increase the job finish timeout to 15mins --- tests/integration-tests/tests/schedulers/test_slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 4804bf1750..0ad4970597 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -733,7 +733,7 @@ def test_expedited_requeue( # We wait for REQUEUE_HOLD directly and read start time from the slurm output file, # since StartTime in scontrol resets to Unknown in REQUEUE_HOLD state. # TODO: Change to wait_job_completed + assert_job_succeeded once the Slurm bug is fixed. - retry(wait_fixed=seconds(10), stop_max_delay=minutes(8))(scheduler_commands.assert_job_state)( + retry(wait_fixed=seconds(10), stop_max_delay=minutes(15))(scheduler_commands.assert_job_state)( job1_id, "REQUEUE_HOLD" ) logging.info("Job1 entered REQUEUE_HOLD as expected (known Slurm 25.11 bug)") From eb72659166bba2ccfaf5d9d5a398c0668d48e3b4 Mon Sep 17 00:00:00 2001 From: Xuanqi He Date: Fri, 13 Feb 2026 17:39:21 -0500 Subject: [PATCH 11/17] Fix the test hung issue --- tests/integration-tests/tests/schedulers/test_slurm.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 0ad4970597..a1991f79b9 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -733,14 +733,16 @@ def test_expedited_requeue( # We wait for REQUEUE_HOLD directly and read start time from the slurm output file, # since StartTime in 
scontrol resets to Unknown in REQUEUE_HOLD state. # TODO: Change to wait_job_completed + assert_job_succeeded once the Slurm bug is fixed. - retry(wait_fixed=seconds(10), stop_max_delay=minutes(15))(scheduler_commands.assert_job_state)( - job1_id, "REQUEUE_HOLD" - ) + def _assert_job_in_requeue_hold(): + result = remote_command_executor.run_remote_command(f"scontrol show jobs -o {job1_id}") + assert_that(result.stdout).contains("JobState=REQUEUE_HOLD") + + retry(wait_fixed=seconds(10), stop_max_delay=minutes(15))(_assert_job_in_requeue_hold)() logging.info("Job1 entered REQUEUE_HOLD as expected (known Slurm 25.11 bug)") scheduler_commands.cancel_job(job1_id) # Wait for job2 to complete normally - scheduler_commands.wait_job_completed(job2_id, timeout=8) + scheduler_commands.wait_job_completed(job2_id, timeout=15) scheduler_commands.assert_job_succeeded(job2_id) # Read start times from slurm output files (epoch timestamps) From 813aa65f3237699e7bc73cf600e0a52f926634d1 Mon Sep 17 00:00:00 2001 From: Himani Anil Deshpande Date: Tue, 24 Feb 2026 13:24:31 -0500 Subject: [PATCH 12/17] Reverting the change which was because of Slurm bug fixed in 25.11.3 --- .../tests/schedulers/test_slurm.py | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index a1991f79b9..9f8176668f 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -676,7 +676,7 @@ def test_expedited_requeue( # Output epoch timestamp for reliable start time parsing from slurm output file job1_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job1 done"', + "command": 'echo "START_TIME=$(date +%s)"; sleep 180; echo "Job1 done"', "nodes": 1, "partition": partition, "host": target_node, @@ -726,7 
+726,9 @@ def test_expedited_requeue( # Wait for the target dynamic node to be power-saved (reset) wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=600) - # Wait for job1 to run and enter REQUEUE_HOLD. - # Known Slurm 25.11 bug: jobs with --requeue=expedite enter REQUEUE_HOLD after successful - # completion instead of COMPLETED, because _set_job_requeue_exit_value() unconditionally - # triggers expedited requeue without checking exit_code. - # We wait for REQUEUE_HOLD directly and read start time from the slurm output file, - # since StartTime in scontrol resets to Unknown in REQUEUE_HOLD state. - # TODO: Change to wait_job_completed + assert_job_succeeded once the Slurm bug is fixed. - def _assert_job_in_requeue_hold(): - result = remote_command_executor.run_remote_command(f"scontrol show jobs -o {job1_id}") - assert_that(result.stdout).contains("JobState=REQUEUE_HOLD") - - retry(wait_fixed=seconds(10), stop_max_delay=minutes(15))(_assert_job_in_requeue_hold)() - logging.info("Job1 entered REQUEUE_HOLD as expected (known Slurm 25.11 bug)") - scheduler_commands.cancel_job(job1_id) + # Wait for both jobs to complete — they should now run on recovered nodes + scheduler_commands.wait_job_completed(job1_id, timeout=15) + scheduler_commands.assert_job_succeeded(job1_id) # Wait for job2 to complete normally scheduler_commands.wait_job_completed(job2_id, timeout=15) From 937aaa9d3e4ecaaf7bc6bc0fceeefec1cbdc107c Mon Sep 17 00:00:00 2001 From: Himani Anil Deshpande Date: Tue, 24 Feb 2026 15:44:48 -0500 Subject: [PATCH 13/17] Adding exclusive flag since we want to identify that Expedited Job runs 1st on the node we are targetting --- tests/integration-tests/tests/schedulers/test_slurm.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 9f8176668f..631f39e664 100644 --- 
a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -676,11 +676,11 @@ def test_expedited_requeue( # Output epoch timestamp for reliable start time parsing from slurm output file job1_id = scheduler_commands.submit_command_and_assert_job_accepted( submit_command_args={ - "command": 'echo "START_TIME=$(date +%s)"; sleep 180; echo "Job1 done"', + "command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job1 done"', "nodes": 1, "partition": partition, "host": target_node, - "other_options": "--requeue=expedite", + "other_options": "--requeue=expedite --exclusive", } ) logging.info("Submitted job1 (expedited requeue) ID: %s", job1_id) @@ -692,6 +692,7 @@ def test_expedited_requeue( "nodes": 1, "partition": partition, "host": target_node, + "other_options": "--exclusive", } ) logging.info("Submitted job2 (normal) ID: %s", job2_id) From aec47aae872c761163aa334d494afc4e958e9cd8 Mon Sep 17 00:00:00 2001 From: Himani Anil Deshpande Date: Tue, 24 Feb 2026 15:48:37 -0500 Subject: [PATCH 14/17] Adding exclusive flag since we want to identify that Expedited Job runs 1st on the node we are targetting * adding a wait fix of 5 secs --- tests/integration-tests/tests/schedulers/test_slurm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 631f39e664..a3911d1f3f 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -725,7 +725,7 @@ def test_expedited_requeue( ) # Wait for the target dynamic node to be power-saved (reset) - wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], stop_max_delay_secs=600) + wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], wait_fixed_secs=5, stop_max_delay_secs=600) # Wait for both jobs to 
complete — they should now run on recovered nodes scheduler_commands.wait_job_completed(job1_id, timeout=15) From 99939fe6409d9a30764cf2780fb30bc8706d8f3a Mon Sep 17 00:00:00 2001 From: Himani Anil Deshpande Date: Wed, 25 Feb 2026 17:25:50 -0500 Subject: [PATCH 15/17] Add another job with expediated requeue flag --- .../tests/schedulers/test_slurm.py | 222 ++++++++++-------- 1 file changed, 129 insertions(+), 93 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index a3911d1f3f..28b1c50092 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -620,6 +620,106 @@ def test_fast_capacity_failover( ) +def _submit_jobs_and_simulate_ice(common_cluster_details, jobs): + """ + Set up ICE simulation, submit jobs, and wait for ICE to be detected. + + Each entry in jobs is a dict with "label" (str) and "expedited" (bool). + Returns a list of job IDs in the same order as the input jobs. 
+ """ + rce = common_cluster_details["remote_command_executor"] + scheduler_commands = common_cluster_details["scheduler_commands"] + target_nodes = common_cluster_details["target_nodes"] + + # Set up ICE simulation + setup_create_fleet_override_to_emulate_ice( + rce, + cluster_name=common_cluster_details["cluster_name"], + queue=common_cluster_details["partition"], + compute_resource=common_cluster_details["ice_compute_res"], + instance_types=common_cluster_details["real_instance_types"], + subnet_id=common_cluster_details["subnet_id"], + ) + + # Clear logs for clean state + rce.clear_slurm_resume_log() + rce.clear_clustermgtd_log() + + # Submit all jobs + job_ids = [] + for job in jobs: + requeue_opt = "--requeue=expedite " if job["expedited"] else "" + job_type = "expedited" if job["expedited"] else "normal" + jid = scheduler_commands.submit_command_and_assert_job_accepted( + submit_command_args={ + "command": f'echo "START_TIME=$(date +%s)"; sleep 30; echo "{job["label"]} done"', + "nodes": 1, + "partition": common_cluster_details["partition"], + "host": target_nodes[0], + "other_options": f"{requeue_opt}--exclusive", + } + ) + logging.info("Submitted %s (%s) ID: %s", job["label"], job_type, jid) + job_ids.append(jid) + + # Wait for ICE to be detected + retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)( + rce, + ["/var/log/parallelcluster/clustermgtd"], + ["The following compute resources are in down state due to insufficient capacity"], + ) + + # Verify target nodes are down due to ICE + assert_compute_node_states(scheduler_commands, target_nodes, expected_states=["down#", "down~"]) + + return job_ids + + +def _recover_from_ice_and_wait_for_jobs(common_cluster_details, job_ids): + """Recover from ICE simulation, wait for nodes to reset, and wait for all jobs to complete.""" + rce = common_cluster_details["remote_command_executor"] + scheduler_commands = common_cluster_details["scheduler_commands"] + target_nodes = 
common_cluster_details["target_nodes"] + + # Recover from ICE + recover_create_fleet_override_from_ice( + rce, + cluster_name=common_cluster_details["cluster_name"], + queue=common_cluster_details["partition"], + compute_resource=common_cluster_details["ice_compute_res"], + real_instance_types=common_cluster_details["real_instance_types"], + subnet_id=common_cluster_details["subnet_id"], + ) + + # Wait for insufficient_capacity_timeout to expire and nodes to reset + retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)( + rce, + ["/var/log/parallelcluster/clustermgtd"], + ["Reset the following compute resources because insufficient capacity timeout expired"], + ) + + # Wait for target nodes to be power-saved (reset) + wait_for_compute_nodes_states( + scheduler_commands, target_nodes, expected_states=["idle~"], wait_fixed_secs=5, stop_max_delay_secs=600 + ) + + # Wait for all jobs to complete + for jid in job_ids: + scheduler_commands.wait_job_completed(jid, timeout=15) + scheduler_commands.assert_job_succeeded(jid) + + +def _collect_start_epochs(remote_command_executor, job_ids): + """Read START_TIME epoch from slurm output files and return as a list in the same order.""" + epochs = [] + for jid in job_ids: + output = remote_command_executor.run_remote_command(f"cat ~/slurm-{jid}.out").stdout + logging.info("Job %s output: %s", jid, output) + epoch = int(re.search(r"START_TIME=(\d+)", output).group(1)) + epochs.append(epoch) + return epochs + + @pytest.mark.usefixtures("region", "os", "instance", "scheduler") @pytest.mark.expedited_requeue def test_expedited_requeue( @@ -642,21 +742,11 @@ def test_expedited_requeue( scheduler_commands = scheduler_commands_factory(remote_command_executor) partition = "queue" - ice_cr = "ice-cr" + ice_compute_res = "ice-cr" real_instance_types = ["t3.medium", "c5.large"] subnet_id = vpc_stack.get_private_subnet() - # Set up ICE simulation via create_fleet_overrides.json with invalid InstanceTypes - 
setup_create_fleet_override_to_emulate_ice(
-        remote_command_executor,
-        cluster_name=cluster.cfn_name,
-        queue=partition,
-        compute_resource=ice_cr,
-        instance_types=real_instance_types,
-        subnet_id=subnet_id,
-    )
-
-    # Set insufficient_capacity_timeout to 180s for quicker reset
+    # Set insufficient_capacity_timeout to 180s for quick reset of iced compute resource
     _set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path)
 
     # Get node lists — all nodes in ice-cr are dynamic
@@ -668,92 +758,38 @@ def test_expedited_requeue(
     target_node = ice_dynamic_nodes[0]
     logging.info("Target dynamic node for ICE test: %s", target_node)
 
-    # Clear logs for clean state
-    remote_command_executor.clear_slurm_resume_log()
-    remote_command_executor.clear_clustermgtd_log()
-
-    # Submit job1 with --requeue=expedite, targeting the specific ICE dynamic node
-    # Output epoch timestamp for reliable start time parsing from slurm output file
-    job1_id = scheduler_commands.submit_command_and_assert_job_accepted(
-        submit_command_args={
-            "command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job1 done"',
-            "nodes": 1,
-            "partition": partition,
-            "host": target_node,
-            "other_options": "--requeue=expedite --exclusive",
-        }
-    )
-    logging.info("Submitted job1 (expedited requeue) ID: %s", job1_id)
-
-    # Submit job2 (normal), targeting the same ICE dynamic node
-    job2_id = scheduler_commands.submit_command_and_assert_job_accepted(
-        submit_command_args={
-            "command": 'echo "START_TIME=$(date +%s)"; sleep 30; echo "Job2 done"',
-            "nodes": 1,
-            "partition": partition,
-            "host": target_node,
-            "other_options": "--exclusive",
-        }
-    )
-    logging.info("Submitted job2 (normal) ID: %s", job2_id)
-
-    # Wait for ICE to be detected
-    retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)(
-        remote_command_executor,
-        ["/var/log/parallelcluster/clustermgtd"],
-        ["The following compute resources are in down state due to insufficient capacity"],
-    )
-
-    # 
Verify the target dynamic node is down due to ICE - assert_compute_node_states(scheduler_commands, [target_node], expected_states=["down#", "down~"]) - - # Recover from ICE: change InstanceTypes in JSON override back to real ones - recover_create_fleet_override_from_ice( - remote_command_executor, - cluster_name=cluster.cfn_name, - queue=partition, - compute_resource=ice_cr, - real_instance_types=real_instance_types, - subnet_id=subnet_id, - ) - - # Wait for insufficient_capacity_timeout to expire and nodes to reset - retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)( - remote_command_executor, - ["/var/log/parallelcluster/clustermgtd"], - ["Reset the following compute resources because insufficient capacity timeout expired"], - ) - - # Wait for the target dynamic node to be power-saved (reset) - wait_for_compute_nodes_states(scheduler_commands, [target_node], expected_states=["idle~"], wait_fixed_secs=5, stop_max_delay_secs=600) + common_cluster_details = { + "remote_command_executor": remote_command_executor, + "cluster_name": cluster.cfn_name, + "scheduler_commands": scheduler_commands, + "partition": partition, + "ice_compute_res": ice_compute_res, + "real_instance_types": real_instance_types, + "subnet_id": subnet_id, + "target_nodes": [target_node], + } - # Wait for both jobs to complete — they should now run on recovered nodes - scheduler_commands.wait_job_completed(job1_id, timeout=15) - scheduler_commands.assert_job_succeeded(job1_id) + # Submit 3 jobs in a single ICE cycle: + # job1 (normal), job2 (expedited), job3 (expedited) + jobs = [ + {"label": "job1", "expedited": False}, + {"label": "job2", "expedited": True}, + {"label": "job3", "expedited": True}, + ] + job_ids = _submit_jobs_and_simulate_ice(common_cluster_details, jobs) + _recover_from_ice_and_wait_for_jobs(common_cluster_details, job_ids) + start_epochs = _collect_start_epochs(remote_command_executor, job_ids) - # Wait for job2 to complete normally - 
scheduler_commands.wait_job_completed(job2_id, timeout=15) - scheduler_commands.assert_job_succeeded(job2_id) + # Expected ordering after ICE recovery: + # Expedited jobs (job2, job3) run first in submission order, then normal job (job1) last + logging.info("Start epochs: %s", dict(zip([j["label"] for j in jobs], start_epochs))) - # Read start times from slurm output files (epoch timestamps) - job1_output = remote_command_executor.run_remote_command(f"cat ~/slurm-{job1_id}.out").stdout - job2_output = remote_command_executor.run_remote_command(f"cat ~/slurm-{job2_id}.out").stdout - job1_start_epoch = int(re.search(r"START_TIME=(\d+)", job1_output).group(1)) - job2_start_epoch = int(re.search(r"START_TIME=(\d+)", job2_output).group(1)) - logging.info("Job1 output: %s", job1_output) - logging.info("Job2 output: %s", job2_output) - # Verify expedited requeue job (job1) started before normal job (job2) - logging.info("Job1 start_epoch=%s, Job2 start_epoch=%s", job1_start_epoch, job2_start_epoch) - assert_that(job1_start_epoch).is_less_than_or_equal_to(job2_start_epoch) - logging.info("Verified: expedited requeue job started before normal job (highest priority)") + assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[1]) # job2 (expedited) before job1 (normal) + assert_that(start_epochs[3]).is_less_than_or_equal_to(start_epochs[1]) # job3 (expedited) before job1 (normal) + assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[3]) # job2 (expedited) before job3 (expedited) + logging.info("Verified: expedited jobs (job2, job3) ran before normal job (job1)") - # Verify no protected mode triggered - assert_no_msg_in_logs( - remote_command_executor, - ["/var/log/parallelcluster/clustermgtd"], - ["Node bootstrap error"], - ) @pytest.mark.usefixtures("region", "os", "instance", "scheduler") From e2533afa7ae54bcad1c553089bf297a4510a692e Mon Sep 17 00:00:00 2001 From: Himani Anil Deshpande Date: Thu, 26 Feb 2026 20:50:57 -0500 Subject: [PATCH 16/17] 
remove 3rd job --- tests/integration-tests/tests/schedulers/test_slurm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 28b1c50092..9de8ce6803 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -774,7 +774,7 @@ def test_expedited_requeue( jobs = [ {"label": "job1", "expedited": False}, {"label": "job2", "expedited": True}, - {"label": "job3", "expedited": True}, + # {"label": "job3", "expedited": True}, ] job_ids = _submit_jobs_and_simulate_ice(common_cluster_details, jobs) _recover_from_ice_and_wait_for_jobs(common_cluster_details, job_ids) @@ -786,8 +786,8 @@ def test_expedited_requeue( assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[1]) # job2 (expedited) before job1 (normal) - assert_that(start_epochs[3]).is_less_than_or_equal_to(start_epochs[1]) # job3 (expedited) before job1 (normal) - assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[3]) # job2 (expedited) before job3 (expedited) + # assert_that(start_epochs[3]).is_less_than_or_equal_to(start_epochs[1]) # job3 (expedited) before job1 (normal) + # assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[3]) # job2 (expedited) before job3 (expedited) logging.info("Verified: expedited jobs (job2, job3) ran before normal job (job1)") From 098cbea470b5120e9b04d828197ce2c651d324b0 Mon Sep 17 00:00:00 2001 From: Himani Anil Deshpande Date: Mon, 2 Mar 2026 10:25:17 -0500 Subject: [PATCH 17/17] code-linters --- .../tests/common/scaling_common.py | 15 +++++++++++---- .../tests/schedulers/test_slurm.py | 13 +++++++------ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/tests/integration-tests/tests/common/scaling_common.py b/tests/integration-tests/tests/common/scaling_common.py index f8cc7ab9c4..7a70a9914a 100644 --- 
a/tests/integration-tests/tests/common/scaling_common.py +++ b/tests/integration-tests/tests/common/scaling_common.py @@ -414,8 +414,11 @@ def setup_create_fleet_override_to_emulate_ice( """ ice_instance_types = [f"ICE-{it}" for it in instance_types] overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, ice_instance_types, subnet_id) - logging.info("Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " - "for queue=%s, cr=%s", queue, compute_resource) + logging.info( + "Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE " "for queue=%s, cr=%s", + queue, + compute_resource, + ) _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) @@ -430,6 +433,10 @@ def recover_create_fleet_override_from_ice( call succeeds. """ overrides = _build_create_fleet_override(cluster_name, queue, compute_resource, real_instance_types, subnet_id) - logging.info("Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " - "for queue=%s, cr=%s", real_instance_types, queue, compute_resource) + logging.info( + "Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json " "for queue=%s, cr=%s", + real_instance_types, + queue, + compute_resource, + ) _write_json_override(remote_command_executor, CREATE_FLEET_OVERRIDES_PATH, overrides) diff --git a/tests/integration-tests/tests/schedulers/test_slurm.py b/tests/integration-tests/tests/schedulers/test_slurm.py index 9de8ce6803..6a353b8e2f 100644 --- a/tests/integration-tests/tests/schedulers/test_slurm.py +++ b/tests/integration-tests/tests/schedulers/test_slurm.py @@ -659,6 +659,7 @@ def _submit_jobs_and_simulate_ice(common_cluster_details, jobs): "other_options": f"{requeue_opt}--exclusive", } ) + time.sleep(20) # Add a sleep to match manual submission of job logging.info("Submitted %s (%s) ID: %s", job["label"], job_type, jid) job_ids.append(jid) @@ -771,6 +772,8 @@ def 
test_expedited_requeue(
 
     # Submit 3 jobs in a single ICE cycle:
     # job1 (normal), job2 (expedited), job3 (expedited)
+    # TODO: Improve the test by making Job 2 as expedited and Job 1 as normal
+    # so that it's clear that Job 2 goes at the top of the queue
     jobs = [
         {"label": "job1", "expedited": False},
         {"label": "job2", "expedited": True},
@@ -784,12 +787,10 @@ def test_expedited_requeue(
     # Expedited jobs (job2, job3) run first in submission order, then normal job (job1) last
     logging.info("Start epochs: %s", dict(zip([j["label"] for j in jobs], start_epochs)))
 
-
-    assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[1])  # job2 (expedited) before job1 (normal)
-    # assert_that(start_epochs[3]).is_less_than_or_equal_to(start_epochs[1])  # job3 (expedited) before job1 (normal)
-    # assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[3])  # job2 (expedited) before job3 (expedited)
-    logging.info("Verified: expedited jobs (job2, job3) ran before normal job (job1)")
-
+    assert_that(start_epochs[1]).is_less_than_or_equal_to(start_epochs[0])  # job1 (normal) after job2 (expedited)
+    # assert_that(start_epochs[2]).is_less_than_or_equal_to(start_epochs[0])  # job3 (expedited) before job1 (normal)
+    # assert_that(start_epochs[1]).is_less_than_or_equal_to(start_epochs[2])  # job2 (expedited) before job3 (expedited)
+    logging.info("Verified: expedited job (job2) ran before normal job (job1)")
 
 @pytest.mark.usefixtures("region", "os", "instance", "scheduler")