Skip to content

Commit 00ab174

Browse files
committed
[Observability] Clustermgtd to emit heartbeat event into its events log.
1 parent 0fff56d commit 00ab174

5 files changed

Lines changed: 114 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This file is used to list changes made in each version of the aws-parallelcluste
88

99
**CHANGES**
1010
- Direct users to slurm_resume log to see EC2 error codes if no instances are launched.
11+
- Emit clustermgtd heartbeat as a structured event to CloudWatch Logs for metric filter-based monitoring.
1112

1213
3.14.1
1314
------

src/slurm_plugin/cluster_event_publisher.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@
104104
"message": "Count of EC2 instances backing the cluster compute fleet",
105105
"event_type": "cluster-instance-count",
106106
}
107+
# Descriptor for the clustermgtd heartbeat event; its keys are unpacked as keyword
# arguments into `publish_event` by `ClusterEventPublisher.publish_heartbeat_event`.
CLUSTERMGTD_HEARTBEAT = {"message": "Clustermgtd heartbeat", "event_type": "clustermgtd-heartbeat"}
107111

108112

109113
class ClusterEventPublisher:
@@ -474,6 +478,37 @@ def publish_compute_node_events(self, compute_nodes: List[SlurmNode], cluster_in
474478
event_supplier=({"detail": self._describe_node(node)} for node in compute_nodes),
475479
)
476480

481+
# Example event generated by this function:
482+
# {
483+
# "datetime": "2023-04-03T18:10:13.089+00:00",
484+
# "version": 0,
485+
# "scheduler": "slurm",
486+
# "cluster-name": "integ-tests-9hbu2op3liukbqqz-develop",
487+
# "node-role": "HeadNode",
488+
# "component": "clustermgtd",
489+
# "level": "INFO",
490+
# "instance-id": "i-04886f8b56e5967ee",
491+
# "event-type": "clustermgtd-heartbeat",
492+
# "message": "Clustermgtd heartbeat",
493+
# "detail": {
494+
# "heartbeat-timestamp": "2023-04-03T18:10:13.089+00:00"
495+
# }
496+
# }
497+
@log_exception(logger, "publish_heartbeat_event", catch_exception=Exception, raise_on_error=False)
def publish_heartbeat_event(self, heartbeat_timestamp: datetime):
    """
    Publish a `clustermgtd-heartbeat` event at INFO level.

    :param heartbeat_timestamp: time of the heartbeat being reported; it is rendered in ISO-8601 form
        with millisecond precision. A falsy value (e.g. None) is published as a null `heartbeat-timestamp`.
    """
    # NOTE(review): the decorator is configured with raise_on_error=False, so failures here are
    # presumably logged rather than propagated to the caller - confirm against `log_exception`.
    event_time = ClusterEventPublisher.timestamp()

    if heartbeat_timestamp:
        heartbeat_text = heartbeat_timestamp.isoformat(timespec="milliseconds")
    else:
        heartbeat_text = None

    self.publish_event(
        logging.INFO,
        **CLUSTERMGTD_HEARTBEAT,
        timestamp=event_time,
        detail={"heartbeat-timestamp": heartbeat_text},
    )
511+
477512
# Slurm Resume Events
478513
# Note: This uses the same schema as `publish_unhealthy_static_node_events` from clustermgtd
479514
# Example event generated by this function:

src/slurm_plugin/clustermgtd.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,10 +577,13 @@ def manage_cluster(self):
577577
def _write_timestamp_to_file(self):
578578
"""Write timestamp into shared file so compute nodes can determine if head node is online."""
579579
# Make clustermgtd heartbeat readable to all users
580+
heartbeat_time = datetime.now(tz=timezone.utc)
580581
with open(os.open(self._config.heartbeat_file_path, os.O_WRONLY | os.O_CREAT, 0o644), "w") as timestamp_file:
581582
# Note: heartbeat must be written with datetime.strftime to convert localized datetime into str
582583
# datetime.strptime will not work with str(datetime)
583-
timestamp_file.write(datetime.now(tz=timezone.utc).strftime(TIMESTAMP_FORMAT))
584+
timestamp_file.write(heartbeat_time.strftime(TIMESTAMP_FORMAT))
585+
# Publish heartbeat event to events log
586+
self._event_publisher.publish_heartbeat_event(heartbeat_time)
584587

585588
@staticmethod
586589
@retry(stop_max_attempt_number=2, wait_fixed=1000)

tests/slurm_plugin/test_cluster_event_publisher.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2958,3 +2958,31 @@ def test_publish_compute_node_events(compute_nodes, expected_details, level_filt
29582958
assert_that(received_events).is_length(len(expected_details))
29592959
for received_event, expected_detail in zip(received_events, expected_details):
29602960
assert_that(received_event).is_equal_to(expected_detail)
2961+
2962+
2963+
@pytest.mark.parametrize(
    "heartbeat_timestamp, expected_detail",
    [
        pytest.param(
            datetime(2023, 4, 3, 18, 10, 13, 89000, tzinfo=timezone.utc),
            {"heartbeat-timestamp": "2023-04-03T18:10:13.089+00:00"},
            id="valid timestamp",
        ),
        pytest.param(None, {"heartbeat-timestamp": None}, id="No timestamp"),
    ],
)
def test_publish_heartbeat_event(heartbeat_timestamp: datetime, expected_detail: dict):
    """Check the event produced by `publish_heartbeat_event` for a real timestamp and for None."""
    captured_events = []
    publisher = ClusterEventPublisher(event_handler(captured_events, level_filter=["INFO"]))

    publisher.publish_heartbeat_event(heartbeat_timestamp)

    # Exactly one event is expected, reported under the heartbeat event type
    assert_that(captured_events).is_length(1)
    assert_that(captured_events[0]).is_equal_to({"clustermgtd-heartbeat": expected_detail})

tests/slurm_plugin/test_clustermgtd.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4112,3 +4112,49 @@ def test_find_active_nodes(partitions_name_map, expected_nodelist):
41124112
"""Unit test for the `ClusterManager._find_active_nodes()` method."""
41134113
result_nodelist = ClusterManager._find_active_nodes(partitions_name_map)
41144114
assert_that(result_nodelist).is_equal_to(expected_nodelist)
4115+
4116+
4117+
@pytest.mark.parametrize(
    "write_heartbeat_file_succeeds, expect_heartbeat_event_published",
    [
        pytest.param(True, True, id="When heartbeat file is written, heartbeat event is written"),
        pytest.param(False, False, id="When heartbeat file is not written, heartbeat event is not written"),
    ],
)
@pytest.mark.usefixtures(
    "initialize_instance_manager_mock", "initialize_executor_mock", "initialize_console_logger_mock"
)
def test_heartbeat_reporting(write_heartbeat_file_succeeds, expect_heartbeat_event_published, mocker, tmp_path):
    """
    Test that heartbeat event is published only when file write succeeds.

    On success, the timestamp handed to the event publisher must format to exactly the text stored in
    the heartbeat file. On failure (`os.open` raising OSError) the error must surface to the caller and
    no heartbeat event may be published.
    """
    from slurm_plugin.common import TIMESTAMP_FORMAT

    # Build the path once, outside any branch, so every assertion below can rely on it
    # regardless of how the two parameters are combined.
    heartbeat_file = tmp_path / "heartbeat"
    mock_sync_config = SimpleNamespace(
        heartbeat_file_path=str(heartbeat_file),
        insufficient_capacity_timeout=600,
        cluster_name="test-cluster",
        head_node_instance_id="i-instance-id",
        region="us-east-2",
        boto3_config=None,
        fleet_config={},
    )

    cluster_manager = ClusterManager(mock_sync_config)
    mock_event_publisher = mocker.patch.object(cluster_manager, "_event_publisher")

    if write_heartbeat_file_succeeds:
        cluster_manager._write_timestamp_to_file()
        assert_that(heartbeat_file.exists()).is_true()
    else:
        # Simulate the heartbeat file being impossible to open
        mocker.patch("os.open", side_effect=OSError("Mocked write failure"))
        with pytest.raises(OSError):
            cluster_manager._write_timestamp_to_file()

    if expect_heartbeat_event_published:
        mock_event_publisher.publish_heartbeat_event.assert_called_once()
        event_timestamp = mock_event_publisher.publish_heartbeat_event.call_args[0][0]
        assert_that(event_timestamp).is_instance_of(datetime)
        # The file and the event must carry the same instant
        assert_that(event_timestamp.strftime(TIMESTAMP_FORMAT)).is_equal_to(heartbeat_file.read_text())
    else:
        mock_event_publisher.publish_heartbeat_event.assert_not_called()

0 commit comments

Comments
 (0)