From 9dc1bee63a130dcf585d0a46c217d6790152e46b Mon Sep 17 00:00:00 2001 From: Helena Greebe Date: Sun, 22 Feb 2026 15:21:01 -0500 Subject: [PATCH 1/5] Add timeout for drained nodes --- src/slurm_plugin/clustermgtd.py | 46 +++++++++++++++++++++++++++++---- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/src/slurm_plugin/clustermgtd.py b/src/slurm_plugin/clustermgtd.py index c612dea1..398001d6 100644 --- a/src/slurm_plugin/clustermgtd.py +++ b/src/slurm_plugin/clustermgtd.py @@ -151,6 +151,7 @@ class ClustermgtdConfig: "terminate_down_nodes": True, "orphaned_instance_timeout": 300, "ec2_instance_missing_max_count": 0, + "hold_drain_nodes_timeout": 30, # Health check configs "disable_ec2_health_check": False, "disable_scheduled_event_health_check": False, @@ -293,6 +294,9 @@ def _get_terminate_config(self, config): self.terminate_drain_nodes = config.getboolean( "clustermgtd", "terminate_drain_nodes", fallback=self.DEFAULTS.get("terminate_drain_nodes") ) + self.hold_drain_nodes_timeout = config.getint( + "clustermgtd", "hold_drain_nodes_timeout", fallback=self.DEFAULTS.get("hold_drain_nodes_timeout") + ) self.terminate_down_nodes = config.getboolean( "clustermgtd", "terminate_down_nodes", fallback=self.DEFAULTS.get("terminate_down_nodes") ) @@ -388,6 +392,7 @@ def __init__(self, config): This state is required because we need to ignore static nodes that might have long bootstrap time """ self._insufficient_capacity_compute_resources = {} + self._held_compute_resources = {} self._static_nodes_in_replacement = set() self._partitions_protected_failure_count_map = {} self._nodes_without_backing_instance_count_map = {} @@ -783,6 +788,10 @@ def _find_unhealthy_slurm_nodes(self, slurm_nodes): # do not consider as unhealthy the nodes reserved for capacity blocks continue + # Track when the node was first found unhealthy + if node.name not in self._held_compute_resources: + self._held_compute_resources[node.name] = self._current_time + all_unhealthy_nodes.append(node) if isinstance(node, StaticNode): @@ -798,6 +807,14 @@ def _find_unhealthy_slurm_nodes(self, slurm_nodes): self._config.ec2_instance_missing_max_count, self._nodes_without_backing_instance_count_map, ) + + # Clean up nodes that are no longer unhealthy from _held_compute_resources + unhealthy_node_names = {node.name for node in all_unhealthy_nodes} + self._held_compute_resources = { + name: timestamp for name, timestamp in self._held_compute_resources.items() + if name in unhealthy_node_names + } + return ( unhealthy_dynamic_nodes, unhealthy_static_nodes, @@ -822,18 +839,28 @@ def _handle_unhealthy_dynamic_nodes(self, unhealthy_dynamic_nodes): """ Maintain any unhealthy dynamic node. - Terminate instances backing dynamic nodes. + Terminate instances backing dynamic nodes only after hold_drain_nodes_timeout. Setting node to down will let slurm requeue jobs allocated to node. Setting node to power_down will terminate backing instance and reset dynamic node for future use. """ - instances_to_terminate = [node.instance.id for node in unhealthy_dynamic_nodes if node.instance] + # Filter to only nodes that have exceeded the hold timeout + nodes_to_terminate = [] + for node in unhealthy_dynamic_nodes: + if node.name not in self._held_compute_resources: + nodes_to_terminate += node + else: + if time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): + nodes_to_terminate += node + self._held_compute_resources.pop(node.name, None) + + instances_to_terminate = [node.instance.id for node in nodes_to_terminate if node.instance] if instances_to_terminate: log.info("Terminating instances that are backing unhealthy dynamic nodes") self._instance_manager.delete_instances( instances_to_terminate, terminate_batch_size=self._config.terminate_max_batch_size ) log.info("Setting unhealthy dynamic nodes to down and power_down.") - set_nodes_power_down([node.name for node in unhealthy_dynamic_nodes], reason="Scheduler health check failed") + set_nodes_power_down([node.name for node in nodes_to_terminate], reason="Scheduler health check failed") @log_exception(log, "maintaining powering down nodes", raise_on_error=False) def _handle_powering_down_nodes(self, slurm_nodes): @@ -880,7 +907,16 @@ def _handle_unhealthy_static_nodes(self, unhealthy_static_nodes): except Exception as e: log.error("Encountered exception when retrieving console output from unhealthy static nodes: %s", e) - node_list = [node.name for node in unhealthy_static_nodes] + nodes_to_terminate = [] + for node in unhealthy_static_nodes: + if node.name not in self._held_compute_resources: + nodes_to_terminate += node + else: + if time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): + nodes_to_terminate += node + self._held_compute_resources.pop(node.name, None) + + node_list = [node.name for node in nodes_to_terminate] # Set nodes into down state so jobs can be requeued immediately try: log.info("Setting unhealthy static nodes to DOWN") @@ -888,7 +924,7 @@ def _handle_unhealthy_static_nodes(self, unhealthy_static_nodes): except Exception as e: log.error("Encountered exception when setting unhealthy static nodes into down state: %s", e) - instances_to_terminate = [node.instance.id for node in unhealthy_static_nodes if node.instance] + instances_to_terminate = [node.instance.id for node in nodes_to_terminate if node.instance] if instances_to_terminate: log.info("Terminating instances backing unhealthy static nodes") From dfcd0ddb3f6ec1162b0ab01139f7ab45659402bd Mon Sep 17 00:00:00 2001 From: Helena Greebe Date: Sun, 22 Feb 2026 15:27:14 -0500 Subject: [PATCH 2/5] Add logging --- src/slurm_plugin/clustermgtd.py | 34 +++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/src/slurm_plugin/clustermgtd.py b/src/slurm_plugin/clustermgtd.py index 398001d6..6d90b85d 100644 --- a/src/slurm_plugin/clustermgtd.py +++ b/src/slurm_plugin/clustermgtd.py @@ -847,11 +847,18 @@ def _handle_unhealthy_dynamic_nodes(self, unhealthy_dynamic_nodes): nodes_to_terminate = [] for node in unhealthy_dynamic_nodes: if node.name not in self._held_compute_resources: - nodes_to_terminate += node - else: - if time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): - nodes_to_terminate += node - self._held_compute_resources.pop(node.name, None) + nodes_to_terminate.append(node) + elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): + nodes_to_terminate.append(node) + self._held_compute_resources.pop(node.name, None) + + nodes_being_held = set(node.name for node in unhealthy_dynamic_nodes) - set(node.name for node in nodes_to_terminate) + if nodes_being_held: + log.info( + "Holding termination for unhealthy dynamic nodes (timeout: %ss): %s", + self._config.hold_drain_nodes_timeout, + print_with_count(nodes_being_held), + ) instances_to_terminate = [node.instance.id for node in nodes_to_terminate if node.instance] if instances_to_terminate: @@ -910,11 +917,18 @@ def _handle_unhealthy_static_nodes(self, unhealthy_static_nodes): nodes_to_terminate = [] for node in unhealthy_static_nodes: if node.name not in self._held_compute_resources: - nodes_to_terminate += node - else: - if time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): - nodes_to_terminate += node - self._held_compute_resources.pop(node.name, None) + nodes_to_terminate.append(node) + elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): + nodes_to_terminate.append(node) + self._held_compute_resources.pop(node.name, None) + + nodes_being_held = set(node.name for node in unhealthy_static_nodes) - set(node.name for node in nodes_to_terminate) + if nodes_being_held: + log.info( + "Holding termination for unhealthy static nodes (timeout: %ss): %s", + self._config.hold_drain_nodes_timeout, + print_with_count(nodes_being_held), + ) node_list = [node.name for node in nodes_to_terminate] # Set nodes into down state so jobs can be requeued immediately From cba7a37b288a9a7292dd6e0094ebd2b9a1fe67d6 Mon Sep 17 00:00:00 2001 From: Helena Greebe Date: Sun, 22 Feb 2026 15:57:20 -0500 Subject: [PATCH 3/5] Make timeout in minutes --- src/slurm_plugin/clustermgtd.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/slurm_plugin/clustermgtd.py b/src/slurm_plugin/clustermgtd.py index 6d90b85d..d7759829 100644 --- a/src/slurm_plugin/clustermgtd.py +++ b/src/slurm_plugin/clustermgtd.py @@ -151,7 +151,7 @@ class ClustermgtdConfig: "terminate_down_nodes": True, "orphaned_instance_timeout": 300, "ec2_instance_missing_max_count": 0, - "hold_drain_nodes_timeout": 30, + "hold_drain_nodes_timeout": 5, # Health check configs "disable_ec2_health_check": False, "disable_scheduled_event_health_check": False, @@ -848,14 +848,14 @@ def _handle_unhealthy_dynamic_nodes(self, unhealthy_dynamic_nodes): for node in unhealthy_dynamic_nodes: if node.name not in self._held_compute_resources: nodes_to_terminate.append(node) - elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): + elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout * 60): nodes_to_terminate.append(node) self._held_compute_resources.pop(node.name, None) nodes_being_held = set(node.name for node in unhealthy_dynamic_nodes) - set(node.name for node in nodes_to_terminate) if nodes_being_held: log.info( - "Holding termination for unhealthy dynamic nodes (timeout: %ss): %s", + "Holding termination for unhealthy dynamic nodes (timeout: %sm): %s", self._config.hold_drain_nodes_timeout, print_with_count(nodes_being_held), ) @@ -918,14 +918,14 @@ def _handle_unhealthy_static_nodes(self, unhealthy_static_nodes): for node in unhealthy_static_nodes: if node.name not in self._held_compute_resources: nodes_to_terminate.append(node) - elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout): + elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout * 60): nodes_to_terminate.append(node) self._held_compute_resources.pop(node.name, None) nodes_being_held = set(node.name for node in unhealthy_static_nodes) - set(node.name for node in nodes_to_terminate) if nodes_being_held: log.info( - "Holding termination for unhealthy static nodes (timeout: %ss): %s", + "Holding termination for unhealthy static nodes (timeout: %sm): %s", self._config.hold_drain_nodes_timeout, print_with_count(nodes_being_held), ) From ca9a359f80e71a5ced48ebe04c8368e08bc5d71f Mon Sep 17 00:00:00 2001 From: Helena Greebe Date: Mon, 23 Feb 2026 07:57:11 -0500 Subject: [PATCH 4/5] Log time left --- src/slurm_plugin/clustermgtd.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/src/slurm_plugin/clustermgtd.py b/src/slurm_plugin/clustermgtd.py index d7759829..397ae220 100644 --- a/src/slurm_plugin/clustermgtd.py +++ b/src/slurm_plugin/clustermgtd.py @@ -151,7 +151,7 @@ class ClustermgtdConfig: "terminate_down_nodes": True, "orphaned_instance_timeout": 300, "ec2_instance_missing_max_count": 0, - "hold_drain_nodes_timeout": 5, + "hold_drain_nodes_timeout": 30, # Health check configs "disable_ec2_health_check": False, "disable_scheduled_event_health_check": False, @@ -843,21 +843,26 @@ def _handle_unhealthy_dynamic_nodes(self, unhealthy_dynamic_nodes): Setting node to down will let slurm requeue jobs allocated to node. Setting node to power_down will terminate backing instance and reset dynamic node for future use. """ - # Filter to only nodes that have exceeded the hold timeout + # Filter to only nodes that have exceeded the hold timeout (config is in minutes) + timeout_seconds = self._config.hold_drain_nodes_timeout * 60 nodes_to_terminate = [] + nodes_being_held = [] for node in unhealthy_dynamic_nodes: if node.name not in self._held_compute_resources: nodes_to_terminate.append(node) - elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout * 60): + elif time_is_up(self._held_compute_resources[node.name], self._current_time, timeout_seconds): nodes_to_terminate.append(node) self._held_compute_resources.pop(node.name, None) + else: + elapsed = (self._current_time - self._held_compute_resources[node.name]).total_seconds() + remaining = int(timeout_seconds - elapsed) + nodes_being_held.append(f"{node.name}({remaining}s left)") - nodes_being_held = set(node.name for node in unhealthy_dynamic_nodes) - set(node.name for node in nodes_to_terminate) if nodes_being_held: log.info( "Holding termination for unhealthy dynamic nodes (timeout: %sm): %s", self._config.hold_drain_nodes_timeout, - print_with_count(nodes_being_held), + nodes_being_held, ) instances_to_terminate = [node.instance.id for node in nodes_to_terminate if node.instance] @@ -914,22 +919,31 @@ def _handle_unhealthy_static_nodes(self, unhealthy_static_nodes): except Exception as e: log.error("Encountered exception when retrieving console output from unhealthy static nodes: %s", e) + # Config is in minutes, convert to seconds + timeout_seconds = self._config.hold_drain_nodes_timeout * 60 nodes_to_terminate = [] + nodes_being_held = [] for node in unhealthy_static_nodes: if node.name not in self._held_compute_resources: nodes_to_terminate.append(node) - elif time_is_up(self._held_compute_resources[node.name], self._current_time, self._config.hold_drain_nodes_timeout * 60): + elif time_is_up(self._held_compute_resources[node.name], self._current_time, timeout_seconds): nodes_to_terminate.append(node) self._held_compute_resources.pop(node.name, None) + else: + elapsed = (self._current_time - self._held_compute_resources[node.name]).total_seconds() + remaining = int(timeout_seconds - elapsed) + nodes_being_held.append(f"{node.name}({remaining}s left)") - nodes_being_held = set(node.name for node in unhealthy_static_nodes) - set(node.name for node in nodes_to_terminate) if nodes_being_held: log.info( "Holding termination for unhealthy static nodes (timeout: %sm): %s", self._config.hold_drain_nodes_timeout, - print_with_count(nodes_being_held), + nodes_being_held, ) + if not nodes_to_terminate: + return + node_list = [node.name for node in nodes_to_terminate] # Set nodes into down state so jobs can be requeued immediately try: From f9cf1bb35fb9229f96669a5597cc41dc40ec3cc1 Mon Sep 17 00:00:00 2001 From: Helena Greebe Date: Mon, 23 Feb 2026 08:06:38 -0500 Subject: [PATCH 5/5] Only hold for specific reason --- src/slurm_plugin/clustermgtd.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/slurm_plugin/clustermgtd.py b/src/slurm_plugin/clustermgtd.py index 397ae220..d4a698f3 100644 --- a/src/slurm_plugin/clustermgtd.py +++ b/src/slurm_plugin/clustermgtd.py @@ -152,6 +152,7 @@ class ClustermgtdConfig: "orphaned_instance_timeout": 300, "ec2_instance_missing_max_count": 0, "hold_drain_nodes_timeout": 30, + "hold_drain_nodes_reasons": ["Prolog error"], # Health check configs "disable_ec2_health_check": False, "disable_scheduled_event_health_check": False, @@ -297,6 +298,14 @@ def _get_terminate_config(self, config): self.hold_drain_nodes_timeout = config.getint( "clustermgtd", "hold_drain_nodes_timeout", fallback=self.DEFAULTS.get("hold_drain_nodes_timeout") ) + # Parse comma-separated list of reasons + hold_drain_nodes_reasons_str = config.get( + "clustermgtd", "hold_drain_nodes_reasons", fallback=None + ) + if hold_drain_nodes_reasons_str: + self.hold_drain_nodes_reasons = [r.strip() for r in hold_drain_nodes_reasons_str.split(",")] + else: + self.hold_drain_nodes_reasons = self.DEFAULTS.get("hold_drain_nodes_reasons") self.terminate_down_nodes = config.getboolean( "clustermgtd", "terminate_down_nodes", fallback=self.DEFAULTS.get("terminate_down_nodes") ) @@ -788,9 +797,10 @@ def _find_unhealthy_slurm_nodes(self, slurm_nodes): # do not consider as unhealthy the nodes reserved for capacity blocks continue - # Track when the node was first found unhealthy + # Track when the node was first found unhealthy, only if drain reason matches configured reasons if node.name not in self._held_compute_resources: - self._held_compute_resources[node.name] = self._current_time + if node.reason and any(reason in node.reason for reason in self._config.hold_drain_nodes_reasons): + self._held_compute_resources[node.name] = self._current_time all_unhealthy_nodes.append(node)