Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions tests/integration-tests/configs/develop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,12 @@ test-suites:
instances: {{ common.INSTANCES_DEFAULT_X86 }}
oss: [{{ OS_X86_5 }}]
schedulers: ["slurm"]
test_slurm.py::test_expedited_requeue:
dimensions:
- regions: ["ap-east-1"]
instances: {{ common.INSTANCES_DEFAULT_X86 }}
oss: [{{ OS_X86_5 }}]
schedulers: ["slurm"]
test_slurm.py::test_slurm_config_update:
dimensions:
- regions: [ "ap-east-1" ]
Expand Down
76 changes: 76 additions & 0 deletions tests/integration-tests/tests/common/scaling_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from utils import get_compute_nodes_instance_count

SCALING_COMMON_DATADIR = pathlib.Path(__file__).parent / "scaling"
RUN_INSTANCES_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/run_instances_overrides.json"
CREATE_FLEET_OVERRIDES_PATH = "/opt/slurm/etc/pcluster/create_fleet_overrides.json"


def validate_and_get_scaling_test_config(scaling_test_config_file):
Expand Down Expand Up @@ -364,3 +366,77 @@ def setup_ec2_launch_override_to_emulate_ice(
run_as_root=True,
)
# fmt: on


def _write_json_override(remote_command_executor, path, content):
"""Write a JSON override file on the head node."""
json_str = json.dumps(content)
remote_command_executor.run_remote_command(
f"echo '{json_str}' | sudo tee {path}",
raise_on_error=True,
)


def _build_create_fleet_override(cluster_name, queue, compute_resource, instance_types, subnet_id):
"""Build a create_fleet_overrides dict with LaunchTemplateSpecification, InstanceTypes and SubnetId."""
lt_name = f"{cluster_name}-{queue}-{compute_resource}"
overrides_list = [{"InstanceType": it, "SubnetId": subnet_id} for it in instance_types]
return {
queue: {
compute_resource: {
"LaunchTemplateConfigs": [
{
"LaunchTemplateSpecification": {
"LaunchTemplateName": lt_name,
"Version": "$Latest",
},
"Overrides": overrides_list,
}
],
}
}
}


def setup_create_fleet_override_to_emulate_ice(
    remote_command_executor, cluster_name, queue, compute_resource, instance_types, subnet_id
):
    """
    Emulate InsufficientInstanceCapacity by writing invalid types to create_fleet_overrides.json.

    Targets multi-instance-type compute resources that launch via the create_fleet
    API. Each instance type is prefixed with "ICE-" so create_fleet returns no
    instances, which clustermgtd detects as InsufficientInstanceCapacity. The
    override keeps LaunchTemplateSpecification and SubnetId populated to avoid
    MissingParameter errors.

    Undo with recover_create_fleet_override_from_ice(), which rewrites the file
    with the real instance types.
    """
    logging.info(
        "Writing create_fleet_overrides.json with invalid InstanceTypes to emulate ICE for queue=%s, cr=%s",
        queue,
        compute_resource,
    )
    bogus_instance_types = [f"ICE-{instance_type}" for instance_type in instance_types]
    _write_json_override(
        remote_command_executor,
        CREATE_FLEET_OVERRIDES_PATH,
        _build_create_fleet_override(cluster_name, queue, compute_resource, bogus_instance_types, subnet_id),
    )


def recover_create_fleet_override_from_ice(
    remote_command_executor, cluster_name, queue, compute_resource, real_instance_types, subnet_id
):
    """
    Undo the simulated ICE by restoring real InstanceTypes in create_fleet_overrides.json.

    This is the "change instance type in JSON to recover" approach: the invalid
    "ICE-*" prefixed InstanceTypes written by setup_create_fleet_override_to_emulate_ice()
    are replaced with real ones (same SubnetId), so the next create_fleet call succeeds.
    """
    logging.info(
        "Recovering from ICE: writing real InstanceTypes=%s in create_fleet_overrides.json for queue=%s, cr=%s",
        real_instance_types,
        queue,
        compute_resource,
    )
    _write_json_override(
        remote_command_executor,
        CREATE_FLEET_OVERRIDES_PATH,
        _build_create_fleet_override(cluster_name, queue, compute_resource, real_instance_types, subnet_id),
    )
179 changes: 178 additions & 1 deletion tests/integration-tests/tests/schedulers/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@
wait_for_compute_nodes_states,
wait_for_num_nodes_in_scheduler,
)
from tests.common.scaling_common import setup_ec2_launch_override_to_emulate_ice
from tests.common.scaling_common import (
recover_create_fleet_override_from_ice,
setup_create_fleet_override_to_emulate_ice,
setup_ec2_launch_override_to_emulate_ice,
)
from tests.common.schedulers_common import SlurmCommands


Expand Down Expand Up @@ -616,6 +620,179 @@ def test_fast_capacity_failover(
)


def _submit_jobs_and_simulate_ice(common_cluster_details, jobs):
    """
    Install the ICE simulation, submit jobs, and wait for ICE to be detected.

    Each entry in *jobs* is a dict with "label" (str) and "expedited" (bool).
    Returns the submitted job IDs, in the same order as *jobs*.
    """
    executor = common_cluster_details["remote_command_executor"]
    slurm = common_cluster_details["scheduler_commands"]
    nodes_under_test = common_cluster_details["target_nodes"]

    # Install the create_fleet override that makes capacity requests fail (simulated ICE)
    setup_create_fleet_override_to_emulate_ice(
        executor,
        cluster_name=common_cluster_details["cluster_name"],
        queue=common_cluster_details["partition"],
        compute_resource=common_cluster_details["ice_compute_res"],
        instance_types=common_cluster_details["real_instance_types"],
        subnet_id=common_cluster_details["subnet_id"],
    )

    # Start from clean daemon logs so later log assertions only match this run
    executor.clear_slurm_resume_log()
    executor.clear_clustermgtd_log()

    # Submit every job, pinned to the first target node and exclusive
    submitted_ids = []
    for job in jobs:
        is_expedited = job["expedited"]
        options = "--requeue=expedite --exclusive" if is_expedited else "--exclusive"
        job_id = slurm.submit_command_and_assert_job_accepted(
            submit_command_args={
                "command": f'echo "START_TIME=$(date +%s)"; sleep 30; echo "{job["label"]} done"',
                "nodes": 1,
                "partition": common_cluster_details["partition"],
                "host": nodes_under_test[0],
                "other_options": options,
            }
        )
        time.sleep(20)  # Add a sleep to match manual submission of job
        logging.info(
            "Submitted %s (%s) ID: %s", job["label"], "expedited" if is_expedited else "normal", job_id
        )
        submitted_ids.append(job_id)

    # Wait until clustermgtd reports the compute resources down for insufficient capacity
    retry(wait_fixed=seconds(20), stop_max_delay=minutes(3))(assert_lines_in_logs)(
        executor,
        ["/var/log/parallelcluster/clustermgtd"],
        ["The following compute resources are in down state due to insufficient capacity"],
    )

    # The targeted nodes must now be down because of the simulated ICE
    assert_compute_node_states(slurm, nodes_under_test, expected_states=["down#", "down~"])

    return submitted_ids


def _recover_from_ice_and_wait_for_jobs(common_cluster_details, job_ids):
    """Undo the ICE simulation, wait for node reset, then wait for every job to finish."""
    executor = common_cluster_details["remote_command_executor"]
    slurm = common_cluster_details["scheduler_commands"]
    nodes_under_test = common_cluster_details["target_nodes"]

    # Restore real instance types so the next create_fleet call succeeds
    recover_create_fleet_override_from_ice(
        executor,
        cluster_name=common_cluster_details["cluster_name"],
        queue=common_cluster_details["partition"],
        compute_resource=common_cluster_details["ice_compute_res"],
        real_instance_types=common_cluster_details["real_instance_types"],
        subnet_id=common_cluster_details["subnet_id"],
    )

    # clustermgtd resets the compute resource once insufficient_capacity_timeout expires
    retry(wait_fixed=seconds(20), stop_max_delay=minutes(4))(assert_lines_in_logs)(
        executor,
        ["/var/log/parallelcluster/clustermgtd"],
        ["Reset the following compute resources because insufficient capacity timeout expired"],
    )

    # Target nodes must return to power-saved state before jobs can be rescheduled
    wait_for_compute_nodes_states(
        slurm, nodes_under_test, expected_states=["idle~"], wait_fixed_secs=5, stop_max_delay_secs=600
    )

    # Every submitted job must eventually complete successfully
    for job_id in job_ids:
        slurm.wait_job_completed(job_id, timeout=15)
        slurm.assert_job_succeeded(job_id)


def _collect_start_epochs(remote_command_executor, job_ids):
"""Read START_TIME epoch from slurm output files and return as a list in the same order."""
epochs = []
for jid in job_ids:
output = remote_command_executor.run_remote_command(f"cat ~/slurm-{jid}.out").stdout
logging.info("Job %s output: %s", jid, output)
epoch = int(re.search(r"START_TIME=(\d+)", output).group(1))
epochs.append(epoch)
return epochs


@pytest.mark.usefixtures("region", "os", "instance", "scheduler")
@pytest.mark.expedited_requeue
def test_expedited_requeue(
    pcluster_config_reader,
    clusters_factory,
    scheduler_commands_factory,
    vpc_stack,
):
    """
    Test Slurm 25.11+ expedited requeue behavior with recoverable ICE simulation.

    Uses create_fleet_overrides.json to simulate ICE (invalid "ICE-" prefixed InstanceTypes),
    then recovers by changing them back to real ones.
    Verifies that an expedited-requeue job is treated as highest priority after ICE
    recovery: it starts no later than a normal job submitted before it.
    """
    cluster_config = pcluster_config_reader()
    cluster = clusters_factory(cluster_config)
    remote_command_executor = RemoteCommandExecutor(cluster)
    clustermgtd_conf_path = retrieve_clustermgtd_conf_path(remote_command_executor)
    scheduler_commands = scheduler_commands_factory(remote_command_executor)

    partition = "queue"
    ice_compute_res = "ice-cr"
    real_instance_types = ["t3.medium", "c5.large"]
    subnet_id = vpc_stack.get_private_subnet()

    # Set insufficient_capacity_timeout to 180s so the iced compute resource resets quickly
    _set_insufficient_capacity_timeout(remote_command_executor, 180, clustermgtd_conf_path)

    # Get node lists — all nodes in ice-cr are dynamic
    nodes_in_scheduler = scheduler_commands.get_compute_nodes(partition, all_nodes=True)
    _, dynamic_nodes = get_partition_nodes(nodes_in_scheduler)
    logging.info("ICE CR dynamic nodes: %s", dynamic_nodes)
    # Pick a specific dynamic node to target
    target_node = dynamic_nodes[0]
    logging.info("Target dynamic node for ICE test: %s", target_node)

    common_cluster_details = {
        "remote_command_executor": remote_command_executor,
        "cluster_name": cluster.cfn_name,
        "scheduler_commands": scheduler_commands,
        "partition": partition,
        "ice_compute_res": ice_compute_res,
        "real_instance_types": real_instance_types,
        "subnet_id": subnet_id,
        "target_nodes": [target_node],
    }

    # Submit 2 jobs in a single ICE cycle: job1 (normal) first, then job2 (expedited).
    # TODO(review): extend with a third expedited job (job3) to also verify that two
    # expedited jobs keep their submission order relative to each other after requeue.
    jobs = [
        {"label": "job1", "expedited": False},
        {"label": "job2", "expedited": True},
    ]
    job_ids = _submit_jobs_and_simulate_ice(common_cluster_details, jobs)
    _recover_from_ice_and_wait_for_jobs(common_cluster_details, job_ids)
    start_epochs = _collect_start_epochs(remote_command_executor, job_ids)

    # Expected ordering after ICE recovery: the expedited job (job2) starts no later
    # than the normal job (job1), even though job1 was submitted first.
    logging.info("Start epochs: %s", dict(zip([j["label"] for j in jobs], start_epochs)))
    assert_that(start_epochs[1]).is_less_than_or_equal_to(start_epochs[0])
    logging.info("Verified: expedited job (job2) ran before normal job (job1)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.



@pytest.mark.usefixtures("region", "os", "instance", "scheduler")
@pytest.mark.slurm_config_update
def test_slurm_config_update(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# ParallelCluster config for the expedited-requeue integration test.
# "ice-cr" is a multi-instance-type compute resource (uses the create_fleet API),
# which is what the test's create_fleet_overrides.json ICE simulation targets.
Image:
  Os: {{ os }}
HeadNode:
  InstanceType: {{ instance }}
  Networking:
    SubnetId: {{ public_subnet_id }}
  Ssh:
    KeyName: {{ key_name }}
Scheduling:
  Scheduler: slurm
  SlurmQueues:
    - Name: queue
      Networking:
        SubnetIds:
          - {{ private_subnet_id }}
      ComputeResources:
        # Multi-type CR targeted by the ICE simulation (create_fleet path)
        - Name: ice-cr
          Instances:
            - InstanceType: t3.medium
            - InstanceType: c5.large
        # Single-type CR left untouched by the override
        - Name: normal-cr
          InstanceType: {{ instance }}
SharedStorage:
  - MountDir: /shared
    Name: name1
    StorageType: Ebs
Loading