diff --git a/.github/workflows/lint_clippy.yml b/.github/workflows/lint_clippy.yml index 6070de95..44a66910 100644 --- a/.github/workflows/lint_clippy.yml +++ b/.github/workflows/lint_clippy.yml @@ -48,8 +48,14 @@ jobs: override: true components: clippy - - name: check clippy errors + - name: check clippy errors (with "--features stub_supervisor_api_client") uses: actions-rs/cargo@v1 with: command: clippy - args: --all-features --all-targets --workspace -- -D warnings + args: --features stub_supervisor_api_client --all-targets -- -D warnings + + - name: check clippy errors (with "--features score_supervisor_api_client") + uses: actions-rs/cargo@v1 + with: + command: clippy + args: --features score_supervisor_api_client --no-default-features --all-targets -- -D warnings diff --git a/.vscode/settings.json b/.vscode/settings.json index 30718b6f..4761cb3f 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -9,6 +9,9 @@ "rust-analyzer.cargo.cfgs": [ "!miri" ], + "rust-analyzer.cargo.features": [ + "stub_supervisor_api_client" + ], "rust-analyzer.check.command": "clippy", "rust-analyzer.rustfmt.overrideCommand": [ "${workspaceFolder}/.vscode/rustfmt.sh" diff --git a/Cargo.lock b/Cargo.lock index d2d342a1..b4e0ba5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. 
version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "anstream" version = "0.6.21" @@ -52,11 +61,27 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "cc" +version = "1.2.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b26a0954ae34af09b50f0de26458fa95369a0d478d8236d3f93082b219bd29" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + [[package]] name = "clap" -version = "4.5.52" +version = "4.5.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa8120877db0e5c011242f96806ce3c94e0737ab8108532a76a3300a01db2ab8" +checksum = "6899ea499e3fb9305a65d5ebf6e3d2248c5fab291f300ad0a704fbe142eae31a" dependencies = [ "clap_builder", "clap_derive", @@ -64,9 +89,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.52" +version = "4.5.57" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02576b399397b659c26064fbc92a75fede9d18ffd5f80ca1cd74ddab167016e1" +checksum = "7b12c8b680195a62a8364d16b8447b01b6c2c8f9aaf68bee653be34d4245e238" dependencies = [ "anstream", "anstyle", @@ -76,9 +101,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.49" +version = "4.5.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0b5487afeab2deb2ff4e03a807ad1a03ac532ff5a2cee5d86884440c7f7671" +checksum = "a92793da1a46a5f2a02a6f4c46c6496b28c43638adea8306fcb0caa1634f24e5" dependencies = [ "heck", "proc-macro2", @@ -88,9 +113,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.6" +version = "0.7.7" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" +checksum = "c3e64b0cc0439b12df2fa678eae89a1c56a529fd067a9115f7827f1fffd22b32" [[package]] name = "colorchoice" @@ -103,11 +128,43 @@ name = "containers" version = "0.1.0" source = "git+https://github.com/eclipse-score/baselibs_rust.git?tag=v0.0.4#d36362e03664f65117145d6fc90e38505d54a900" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "generator" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" +dependencies = [ + "cc", + "cfg-if", + "libc", + "log", + "rustversion", + "windows-link", + "windows-result", +] + [[package]] name = "health_monitoring_lib" version = "0.0.1" dependencies = [ "containers", + "loom", "monitor_rs", "score_log", "score_testing_macros", @@ -126,6 +183,18 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itoa" +version = "1.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.177" @@ -139,6 +208,42 @@ dependencies 
= [ "libc", ] +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", + "serde", + "serde_json", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + [[package]] name = "monitor_rs" version = "0.0.1" @@ -146,12 +251,33 @@ dependencies = [ "libc", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + [[package]] name = "once_cell_polyfill" version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + [[package]] name = "proc-macro2" version = "1.0.103" @@ -170,6 +296,23 @@ 
dependencies = [ "proc-macro2", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" + [[package]] name = "rust_supervised_app" version = "0.0.1" @@ -184,6 +327,18 @@ dependencies = [ "stdout_logger", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "score_log" version = "0.0.1" @@ -219,6 +374,64 @@ dependencies = [ "syn", ] +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook" version = "0.3.18" @@ -231,13 +444,20 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.4.6" +version = "1.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" dependencies = [ + "errno", "libc", ] +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + [[package]] name = "stdout_logger" version = "0.0.1" @@ -263,6 +483,64 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-ident" version = "1.0.22" @@ -275,12 +553,27 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -289,3 +582,9 @@ checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ "windows-link", ] + +[[package]] +name = "zmij" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"4de98dfa5d5b7fef4ee834d0073d560c9ca7b6c46a71d058c48db7960f8cfaf7" diff --git a/Cargo.toml b/Cargo.toml index 4c19b6ec..0665dc50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "src/health_monitoring_lib", "examples/rust_supervised_app", ] +default-members = ["src/health_monitoring_lib"] [workspace.package] edition = "2021" @@ -18,7 +19,6 @@ libc = "0.2.177" clap = { version = "4.5.49", features = ["derive"] } signal-hook = "0.3.18" -monitor_rs = { path = "src/launch_manager_daemon/health_monitor_lib/rust_bindings" } # Temporary API health_monitoring_lib = { path = "src/health_monitoring_lib" } score_log = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.0.4" } score_testing_macros = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = "v0.0.4" } @@ -28,3 +28,6 @@ containers = { git = "https://github.com/eclipse-score/baselibs_rust.git", tag = [workspace.lints.clippy] std_instead_of_core = "warn" alloc_instead_of_core = "warn" + +[workspace.lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } diff --git a/src/health_monitoring_lib/BUILD b/src/health_monitoring_lib/BUILD index 926b588d..b1e2e4eb 100644 --- a/src/health_monitoring_lib/BUILD +++ b/src/health_monitoring_lib/BUILD @@ -28,6 +28,7 @@ PROC_MACRO_DEPS = [ CC_SOURCES = [ "cpp/common.cpp", "cpp/deadline_monitor.cpp", + "cpp/heartbeat_monitor.cpp", "cpp/ffi_helpers.h", "cpp/health_monitor.cpp", ] @@ -35,6 +36,7 @@ CC_SOURCES = [ CC_HDRS = [ "cpp/include/score/hm/common.h", "cpp/include/score/hm/deadline/deadline_monitor.h", + "cpp/include/score/hm/heartbeat/heartbeat_monitor.h", "cpp/include/score/hm/health_monitor.h", ] @@ -42,6 +44,7 @@ CC_HDRS = [ rust_library( name = "health_monitoring_lib", srcs = glob(["rust/**/*.rs"]), + crate_features = ["score_supervisor_api_client"], proc_macro_deps = PROC_MACRO_DEPS, visibility = ["//visibility:public"], deps = COMMON_DEPS, @@ -64,6 +67,7 @@ cc_library( rust_static_library( name = 
"health_monitoring_lib_ffi", srcs = glob(["rust/**/*.rs"]), + crate_features = ["score_supervisor_api_client"], crate_name = "health_monitoring_lib", proc_macro_deps = [ "@score_baselibs_rust//src/testing_macros:score_testing_macros", @@ -97,6 +101,7 @@ cc_library( rust_test( name = "tests", crate = ":health_monitoring_lib", + crate_features = ["stub_supervisor_api_client"], rustc_flags = [ "-C", "link-arg=-lm", diff --git a/src/health_monitoring_lib/Cargo.toml b/src/health_monitoring_lib/Cargo.toml index 71b38d46..924f7fae 100644 --- a/src/health_monitoring_lib/Cargo.toml +++ b/src/health_monitoring_lib/Cargo.toml @@ -7,7 +7,6 @@ edition.workspace = true authors.workspace = true license-file.workspace = true - [lib] path = "rust/lib.rs" @@ -18,11 +17,15 @@ workspace = true score_log.workspace = true score_testing_macros.workspace = true containers.workspace = true -monitor_rs.workspace = true +monitor_rs = { path = "../launch_manager_daemon/health_monitor_lib/rust_bindings", optional = true } # Temporary API [dev-dependencies] stdout_logger.workspace = true +[target.'cfg(loom)'.dependencies] +loom = { version = "0.7", features = ["checkpoint"] } + [features] -default = [] +default = ["stub_supervisor_api_client"] stub_supervisor_api_client = [] +score_supervisor_api_client = ["monitor_rs"] diff --git a/src/health_monitoring_lib/cpp/ffi_helpers.h b/src/health_monitoring_lib/cpp/ffi_helpers.h index 35dd9188..59f57182 100644 --- a/src/health_monitoring_lib/cpp/ffi_helpers.h +++ b/src/health_monitoring_lib/cpp/ffi_helpers.h @@ -13,6 +13,7 @@ #ifndef SCORE_HM_FFI_HELPERS_HPP #define SCORE_HM_FFI_HELPERS_HPP +#include #include namespace score::hm::ffi diff --git a/src/health_monitoring_lib/cpp/health_monitor.cpp b/src/health_monitoring_lib/cpp/health_monitor.cpp index 75f79b7e..158888f8 100644 --- a/src/health_monitoring_lib/cpp/health_monitor.cpp +++ b/src/health_monitoring_lib/cpp/health_monitor.cpp @@ -25,8 +25,12 @@ internal::FFIHandle 
health_monitor_builder_build(internal::FFIHandle health_moni void health_monitor_builder_add_deadline_monitor(internal::FFIHandle handle, const IdentTag* tag, internal::FFIHandle monitor_handle); +void health_monitor_builder_add_heartbeat_monitor(internal::FFIHandle hmon_builder_handle, + const IdentTag* monitor_tag, + internal::FFIHandle monitor_builder_handle); internal::FFIHandle health_monitor_get_deadline_monitor(internal::FFIHandle health_monitor_handle, const IdentTag* tag); +internal::FFIHandle health_monitor_get_heartbeat_monitor(internal::FFIHandle hmon_handle, const IdentTag* monitor_tag); void health_monitor_start(internal::FFIHandle health_monitor_handle); void health_monitor_destroy(internal::FFIHandle handler); } @@ -53,6 +57,18 @@ HealthMonitorBuilder HealthMonitorBuilder::add_deadline_monitor(const IdentTag& return std::move(*this); } +HealthMonitorBuilder HealthMonitorBuilder::add_heartbeat_monitor(const IdentTag& monitor_tag, + heartbeat::HeartbeatMonitorBuilder&& monitor) && +{ + auto monitor_handle{monitor.drop_by_rust()}; + SCORE_LANGUAGE_FUTURECPP_PRECONDITION(monitor_handle.has_value()); + SCORE_LANGUAGE_FUTURECPP_PRECONDITION(health_monitor_builder_handle_.as_rust_handle().has_value()); + + health_monitor_builder_add_heartbeat_monitor( + health_monitor_builder_handle_.as_rust_handle().value(), &monitor_tag, monitor_handle.value()); + return std::move(*this); +} + HealthMonitorBuilder HealthMonitorBuilder::with_internal_processing_cycle(std::chrono::milliseconds cycle_duration) && { internal_processing_cycle_duration_ = cycle_duration; @@ -99,6 +115,18 @@ score::cpp::expected HealthMonitor::get_deadli return score::cpp::unexpected(Error::NotFound); } + +score::cpp::expected HealthMonitor::get_heartbeat_monitor( + const IdentTag& monitor_tag) +{ + auto maybe_monitor{health_monitor_get_heartbeat_monitor(health_monitor_, &monitor_tag)}; + if (maybe_monitor == nullptr) + { + return score::cpp::unexpected(Error::NotFound); + } + return 
score::cpp::expected{heartbeat::HeartbeatMonitor{maybe_monitor}}; +} + void HealthMonitor::start() { health_monitor_start(health_monitor_); diff --git a/src/health_monitoring_lib/cpp/heartbeat_monitor.cpp b/src/health_monitoring_lib/cpp/heartbeat_monitor.cpp new file mode 100644 index 00000000..1199c485 --- /dev/null +++ b/src/health_monitoring_lib/cpp/heartbeat_monitor.cpp @@ -0,0 +1,46 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +#include "score/hm/heartbeat/heartbeat_monitor.h" + +extern "C" { +using namespace score::hm; +using namespace score::hm::internal; +using namespace score::hm::heartbeat; + +internal::FFIHandle heartbeat_monitor_builder_create(uint32_t range_min_ms, uint32_t range_max_ms); +void heartbeat_monitor_builder_destroy(internal::FFIHandle monitor_builder_handle); +void heartbeat_monitor_destroy(FFIHandle monitor_handle); +void heartbeat_monitor_heartbeat(FFIHandle monitor_handle); +} + +namespace score::hm::heartbeat +{ +HeartbeatMonitorBuilder::HeartbeatMonitorBuilder(const TimeRange& range) + : monitor_builder_handle_{heartbeat_monitor_builder_create(range.min_ms(), range.max_ms()), + &heartbeat_monitor_builder_destroy} +{ +} + +HeartbeatMonitor::HeartbeatMonitor(FFIHandle monitor_handle) + : monitor_handle_{monitor_handle, &heartbeat_monitor_destroy} +{ +} + +void HeartbeatMonitor::heartbeat() +{ + auto monitor_handle{monitor_handle_.as_rust_handle()}; + 
SCORE_LANGUAGE_FUTURECPP_PRECONDITION(monitor_handle.has_value()); + heartbeat_monitor_heartbeat(monitor_handle.value()); +} + +} // namespace score::hm::heartbeat diff --git a/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h b/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h index bf7dc1d7..13de6edd 100644 --- a/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h +++ b/src/health_monitoring_lib/cpp/include/score/hm/health_monitor.h @@ -15,6 +15,7 @@ #include #include +#include namespace score::hm { @@ -40,6 +41,9 @@ class HealthMonitorBuilder final /// Adds a deadline monitor to the builder to construct DeadlineMonitor instances during HealthMonitor build. HealthMonitorBuilder add_deadline_monitor(const IdentTag& tag, deadline::DeadlineMonitorBuilder&& monitor) &&; + /// Adds a heartbeat monitor for a specific identifier tag. + HealthMonitorBuilder add_heartbeat_monitor(const IdentTag& tag, heartbeat::HeartbeatMonitorBuilder&& monitor) &&; + /// Sets the cycle duration for supervisor API notifications. /// This duration determines how often the health monitor notifies the supervisor that the system is alive. 
HealthMonitorBuilder with_supervisor_api_cycle(std::chrono::milliseconds cycle_duration) &&; @@ -70,6 +74,7 @@ class HealthMonitor final ~HealthMonitor(); score::cpp::expected get_deadline_monitor(const IdentTag& tag); + score::cpp::expected get_heartbeat_monitor(const IdentTag& monitor_tag); void start(); diff --git a/src/health_monitoring_lib/cpp/include/score/hm/heartbeat/heartbeat_monitor.h b/src/health_monitoring_lib/cpp/include/score/hm/heartbeat/heartbeat_monitor.h new file mode 100644 index 00000000..fbaa52e5 --- /dev/null +++ b/src/health_monitoring_lib/cpp/include/score/hm/heartbeat/heartbeat_monitor.h @@ -0,0 +1,84 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +#ifndef SCORE_HM_HEARTBEAT_HEARTBEAT_MONITOR_H +#define SCORE_HM_HEARTBEAT_HEARTBEAT_MONITOR_H + +#include +#include + +namespace score::hm +{ +// Forward declaration +class HealthMonitor; +class HealthMonitorBuilder; +} // namespace score::hm + +namespace score::hm::heartbeat +{ +// Forward declaration +class HeartbeatMonitor; + +/// Builder for `HeartbeatMonitor`. +class HeartbeatMonitorBuilder final : public internal::RustDroppable +{ + public: + /// Create a new `HeartbeatMonitorBuilder`. + /// + /// - `range` - time range between heartbeats. 
+ HeartbeatMonitorBuilder(const TimeRange& range); + + HeartbeatMonitorBuilder(const HeartbeatMonitorBuilder&) = delete; + HeartbeatMonitorBuilder& operator=(const HeartbeatMonitorBuilder&) = delete; + + HeartbeatMonitorBuilder(HeartbeatMonitorBuilder&&) = default; + HeartbeatMonitorBuilder& operator=(HeartbeatMonitorBuilder&&) = delete; + + protected: + std::optional __drop_by_rust_impl() + { + return monitor_builder_handle_.drop_by_rust(); + } + + private: + internal::DroppableFFIHandle monitor_builder_handle_; + + // Allow to hide drop_by_rust implementation + friend class internal::RustDroppable; + + // Allow HealthMonitorBuilder to access drop_by_rust implementation + friend class ::score::hm::HealthMonitorBuilder; +}; + +class HeartbeatMonitor final +{ + public: + // Delete copy, allow move + HeartbeatMonitor(const HeartbeatMonitor&) = delete; + HeartbeatMonitor& operator=(const HeartbeatMonitor&) = delete; + + HeartbeatMonitor(HeartbeatMonitor&& other) noexcept = default; + HeartbeatMonitor& operator=(HeartbeatMonitor&& other) noexcept = default; + + void heartbeat(); + + private: + explicit HeartbeatMonitor(internal::FFIHandle monitor_handle); + + // Only `HealthMonitor` is allowed to create `HeartbeatMonitor` instances. 
+ friend class score::hm::HealthMonitor; + internal::DroppableFFIHandle monitor_handle_; +}; + +} // namespace score::hm::heartbeat + +#endif // SCORE_HM_HEARTBEAT_HEARTBEAT_MONITOR_H diff --git a/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp b/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp index 32d76085..1d105ac6 100644 --- a/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp +++ b/src/health_monitoring_lib/cpp/tests/health_monitor_test.cpp @@ -14,10 +14,9 @@ #include "score/hm/common.h" #include #include -#include +#include using namespace score::hm; -using ::testing::_; class HealthMonitorTest : public ::testing::Test { @@ -26,31 +25,52 @@ class HealthMonitorTest : public ::testing::Test // For first review round, only single test case to show up API TEST_F(HealthMonitorTest, TestName) { - auto builder_mon = deadline::DeadlineMonitorBuilder() - .add_deadline(IdentTag("deadline_1"), - TimeRange(std::chrono::milliseconds(100), std::chrono::milliseconds(200))) - .add_deadline(IdentTag("deadline_2"), - TimeRange(std::chrono::milliseconds(100), std::chrono::milliseconds(200))); + // Setup deadline monitor construction. + const IdentTag deadline_monitor_tag{"deadline_monitor"}; + auto deadline_monitor_builder = + deadline::DeadlineMonitorBuilder() + .add_deadline(IdentTag("deadline_1"), + TimeRange(std::chrono::milliseconds(100), std::chrono::milliseconds(200))) + .add_deadline(IdentTag("deadline_2"), + TimeRange(std::chrono::milliseconds(100), std::chrono::milliseconds(200))); - IdentTag ident("monitor"); + // Setup heartbeat monitor construction. 
+ const IdentTag heartbeat_monitor_tag{"heartbeat_monitor"}; + const TimeRange heartbeat_range{std::chrono::milliseconds{100}, std::chrono::milliseconds{200}}; + auto heartbeat_monitor_builder = heartbeat::HeartbeatMonitorBuilder(heartbeat_range); auto hm = HealthMonitorBuilder() - .add_deadline_monitor(ident, std::move(builder_mon)) + .add_deadline_monitor(deadline_monitor_tag, std::move(deadline_monitor_builder)) + .add_heartbeat_monitor(heartbeat_monitor_tag, std::move(heartbeat_monitor_builder)) .with_internal_processing_cycle(std::chrono::milliseconds(50)) .with_supervisor_api_cycle(std::chrono::milliseconds(50)) .build(); - auto deadline_monitor_res = hm.get_deadline_monitor(ident); + // Obtain deadline monitor from HMON. + auto deadline_monitor_res = hm.get_deadline_monitor(deadline_monitor_tag); EXPECT_TRUE(deadline_monitor_res.has_value()); { - // Try again to get the same monitor - auto deadline_monitor_res = hm.get_deadline_monitor(ident); + // Try again to get the same monitor. + auto deadline_monitor_res = hm.get_deadline_monitor(deadline_monitor_tag); EXPECT_FALSE(deadline_monitor_res.has_value()); } auto deadline_mon = std::move(*deadline_monitor_res); + // Obtain heartbeat monitor from HMON. + auto heartbeat_monitor_res{hm.get_heartbeat_monitor(heartbeat_monitor_tag)}; + EXPECT_TRUE(heartbeat_monitor_res.has_value()); + + { + // Try again to get the same monitor. + auto heartbeat_monitor_res{hm.get_heartbeat_monitor(heartbeat_monitor_tag)}; + EXPECT_FALSE(heartbeat_monitor_res.has_value()); + } + + auto heartbeat_monitor{std::move(*heartbeat_monitor_res)}; + + // Start HMON. 
hm.start(); auto deadline_res = deadline_mon.get_deadline(IdentTag("deadline_1")); diff --git a/src/health_monitoring_lib/rust/common.rs b/src/health_monitoring_lib/rust/common.rs index 0f8f29bd..bce6d841 100644 --- a/src/health_monitoring_lib/rust/common.rs +++ b/src/health_monitoring_lib/rust/common.rs @@ -15,6 +15,8 @@ use core::fmt::Debug; use core::hash::Hash; use core::time::Duration; use std::sync::Arc; +use std::time::Instant; + /// Unique identifier for deadlines. #[derive(Clone, Copy, Eq)] #[repr(C)] @@ -90,6 +92,7 @@ impl From<&str> for IdentTag { } } +/// Range of accepted time. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct TimeRange { pub min: Duration, @@ -103,16 +106,41 @@ impl TimeRange { } } +/// Get offset between HMON and monitor starting time points as [`u32`]. +pub(crate) fn hmon_time_offset(hmon_starting_point: Instant, monitor_starting_point: Instant) -> u32 { + let result = hmon_starting_point.checked_duration_since(monitor_starting_point); + let duration_since = result.expect("HMON starting point is earlier than monitor starting point"); + duration_to_u32(duration_since) +} + +/// Get duration as [`u32`]. +pub(crate) fn duration_to_u32(duration: Duration) -> u32 { + let millis = duration.as_millis(); + u32::try_from(millis).expect("Monitor running for too long") +} + +/// Heartbeat monitor error subgroup. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, crate::log::ScoreDebug)] +pub(crate) enum HeartbeatMonitorEvaluationError { + /// Multiple heartbeats were observed in the same epoch. + MultipleHeartbeats, +} + /// Errors that can occur during monitor evaluation. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, crate::log::ScoreDebug)] pub(crate) enum MonitorEvaluationError { TooEarly, TooLate, + HeartbeatSpecific(HeartbeatMonitorEvaluationError), } /// Trait for evaluating monitors and reporting errors to be used by HealthMonitor. 
pub(crate) trait MonitorEvaluator { - fn evaluate(&self, on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError)); + /// Run monitor evaluation. + /// + /// - `hmon_starting_point` - starting point of all monitors. + /// - `on_error` - error handling, containing tag of failing object and error code. + fn evaluate(&self, hmon_starting_point: Instant, on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError)); } /// Handle to a monitor evaluator, allowing for dynamic dispatch. @@ -127,8 +155,8 @@ impl MonitorEvalHandle { } impl MonitorEvaluator for MonitorEvalHandle { - fn evaluate(&self, on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError)) { - self.inner.evaluate(on_error) + fn evaluate(&self, hmon_starting_point: Instant, on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError)) { + self.inner.evaluate(hmon_starting_point, on_error) } } @@ -172,3 +200,51 @@ pub(crate) mod ffi { } } } + +#[cfg(test)] +mod tests { + use crate::common::{duration_to_u32, hmon_time_offset}; + use core::time::Duration; + use std::time::Instant; + + #[test] + fn test_hmon_time_offset_valid() { + let monitor_starting_point = Instant::now(); + let hmon_starting_point = Instant::now(); + let offset = hmon_time_offset(hmon_starting_point, monitor_starting_point); + // Allow small offset. 
+ assert!(offset < 10); + } + + #[test] + #[should_panic(expected = "HMON starting point is earlier than monitor starting point")] + fn test_hmon_time_offset_wrong_order() { + let hmon_starting_point = Instant::now(); + let monitor_starting_point = Instant::now(); + let _offset = hmon_time_offset(hmon_starting_point, monitor_starting_point); + } + + #[test] + #[should_panic(expected = "Monitor running for too long")] + fn test_hmon_time_offset_diff_too_large() { + const HUNDRED_DAYS_AS_SECS: u64 = 100 * 24 * 60 * 60; + let monitor_starting_point = Instant::now(); + let hmon_starting_point = Instant::now() + .checked_add(Duration::from_secs(HUNDRED_DAYS_AS_SECS)) + .unwrap(); + let _offset = hmon_time_offset(hmon_starting_point, monitor_starting_point); + } + + #[test] + fn test_duration_to_u32_valid() { + let result = duration_to_u32(Duration::from_millis(1234)); + assert_eq!(result, 1234); + } + + #[test] + #[should_panic(expected = "Monitor running for too long")] + fn test_duration_to_u32_too_large() { + const HUNDRED_DAYS_AS_SECS: u64 = 100 * 24 * 60 * 60; + let _result = duration_to_u32(Duration::from_secs(HUNDRED_DAYS_AS_SECS)); + } +} diff --git a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs index 5565de0f..27f74ada 100644 --- a/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs +++ b/src/health_monitoring_lib/rust/deadline/deadline_monitor.rs @@ -11,7 +11,9 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* use super::common::DeadlineTemplate; -use crate::common::{IdentTag, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator, TimeRange}; +use crate::common::{ + duration_to_u32, hmon_time_offset, IdentTag, MonitorEvalHandle, MonitorEvaluationError, MonitorEvaluator, TimeRange, +}; use crate::{ deadline::{ common::StateIndex, @@ -96,7 +98,7 @@ impl DeadlineMonitor { inner: 
Arc::new(DeadlineMonitorInner { deadlines, active_deadlines: active_deadlines.into(), - start_time: Instant::now(), + monitor_starting_point: Instant::now(), }), } } @@ -176,7 +178,7 @@ impl Deadline { /// Caller must ensure that deadline is not used until it's stopped. /// After this call You shall assure there's only a single owner of the `Deadline` instance and it does not call start before stopping. pub(super) unsafe fn start_internal(&mut self) -> Result<(), DeadlineError> { - let now = self.monitor.now(); + let now = duration_to_u32(self.monitor.monitor_starting_point.elapsed()); let max_time = now + self.range.max.as_millis() as u32; let mut is_broken = false; @@ -201,7 +203,7 @@ impl Deadline { } pub(super) fn stop_internal(&mut self) { - let now = self.monitor.now(); + let now = duration_to_u32(self.monitor.monitor_starting_point.elapsed()); let max = self.range.max.as_millis() as u32; let min = self.range.min.as_millis() as u32; @@ -243,6 +245,7 @@ impl Deadline { (Some(MonitorEvaluationError::TooLate), val) => { error!("Deadline {:?} stopped too late by {} ms", self.tag, val); }, + (Some(MonitorEvaluationError::HeartbeatSpecific(_)), _) => unreachable!(), (None, _) => {}, } } @@ -267,8 +270,9 @@ impl Drop for Deadline { } struct DeadlineMonitorInner { - // Start time of the monitor creation to calculate relative timestamps - start_time: Instant, + /// Monitor starting point. + /// Offset is calculated during evaluation in relation to provided health monitor starting point. + monitor_starting_point: Instant, // Templates for deadlines registered in the monitor to create `Deadline` instances. 
deadlines: HashMap, @@ -280,8 +284,8 @@ struct DeadlineMonitorInner { } impl MonitorEvaluator for DeadlineMonitorInner { - fn evaluate(&self, on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError)) { - self.evaluate(on_error); + fn evaluate(&self, hmon_starting_point: Instant, on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError)) { + self.evaluate(hmon_starting_point, on_error); } } @@ -294,14 +298,7 @@ impl DeadlineMonitorInner { } } - fn now(&self) -> u32 { - let duration = self.start_time.elapsed(); - // As u32 can hold up to ~49 days in milliseconds, this should be sufficient for our use case - // We still have a room up to 60bits timestamp if needed in future - u32::try_from(duration.as_millis()).expect("Monitor running for too long") - } - - fn evaluate(&self, mut on_failed: impl FnMut(&IdentTag, MonitorEvaluationError)) { + fn evaluate(&self, hmon_starting_point: Instant, mut on_failed: impl FnMut(&IdentTag, MonitorEvaluationError)) { for (tag, deadline) in self.active_deadlines.iter() { let snapshot = deadline.snapshot(); if snapshot.is_underrun() { @@ -316,7 +313,9 @@ impl DeadlineMonitorInner { "Deadline snapshot cannot be both running and stopped" ); - let now = self.now(); + // Get current timestamp, with offset to HMON time. 
+ let offset = hmon_time_offset(hmon_starting_point, self.monitor_starting_point); + let now = offset + duration_to_u32(hmon_starting_point.elapsed()); let expected = snapshot.timestamp_ms(); if now > expected { // Deadline missed, report @@ -395,12 +394,13 @@ mod tests { let monitor = create_monitor_with_deadlines(); let mut deadline = monitor.get_deadline(&IdentTag::from("deadline_long")).unwrap(); let handle = deadline.start().unwrap(); + let hmon_starting_point = Instant::now(); std::thread::sleep(core::time::Duration::from_millis(1001)); // Sleep to simulate work within the deadline range drop(handle); // stop the deadline - monitor.inner.evaluate(|tag, deadline_failure| { + monitor.inner.evaluate(hmon_starting_point, |tag, deadline_failure| { panic!( "Deadline {:?} should not have failed or underrun({:?})", tag, deadline_failure @@ -413,10 +413,11 @@ mod tests { let monitor = create_monitor_with_deadlines(); let mut deadline = monitor.get_deadline(&IdentTag::from("deadline_long")).unwrap(); let handle = deadline.start().unwrap(); + let hmon_starting_point = Instant::now(); drop(handle); // stop the deadline - monitor.inner.evaluate(|tag, deadline_failure| { + monitor.inner.evaluate(hmon_starting_point, |tag, deadline_failure| { assert_eq!( deadline_failure, MonitorEvaluationError::TooEarly, @@ -431,10 +432,11 @@ mod tests { let monitor = create_monitor_with_deadlines(); let mut deadline = monitor.get_deadline(&IdentTag::from("deadline_long")).unwrap(); let handle = deadline.start().unwrap(); + let hmon_starting_point = Instant::now(); // So deadline stop happens after evaluate, still it should be reported as failed - monitor.inner.evaluate(|tag, deadline_failure| { + monitor.inner.evaluate(hmon_starting_point, |tag, deadline_failure| { assert_eq!( deadline_failure, MonitorEvaluationError::TooEarly, @@ -452,6 +454,7 @@ mod tests { let monitor = create_monitor_with_deadlines(); let mut deadline = monitor.get_deadline(&IdentTag::from("deadline_long")).unwrap(); 
let handle = deadline.start().unwrap(); + let hmon_starting_point = Instant::now(); // So deadline failed, then we start it again so it shall be already expired and also evaluation shall work drop(handle); // stop the deadline @@ -460,7 +463,7 @@ mod tests { let handle = deadline.start(); assert_eq!(handle.err(), Some(DeadlineError::DeadlineAlreadyFailed)); - monitor.inner.evaluate(|tag, deadline_failure| { + monitor.inner.evaluate(hmon_starting_point, |tag, deadline_failure| { assert_eq!( deadline_failure, MonitorEvaluationError::TooEarly, @@ -476,10 +479,11 @@ mod tests { let monitor = create_monitor_with_deadlines(); let mut deadline = monitor.get_deadline(&IdentTag::from("deadline_fast")).unwrap(); let handle = deadline.start().unwrap(); + let hmon_starting_point = Instant::now(); drop(handle); // stop the deadline - monitor.inner.evaluate(|tag, deadline_failure| { + monitor.inner.evaluate(hmon_starting_point, |tag, deadline_failure| { assert_eq!( deadline_failure, MonitorEvaluationError::TooLate, @@ -493,6 +497,7 @@ mod tests { #[test] fn monitor_with_multiple_running_deadlines() { let monitor = create_monitor_with_multiple_running_deadlines(); + let hmon_starting_point = Instant::now(); let mut deadline = monitor.get_deadline(&IdentTag::from("deadline_fast1")).unwrap(); let _handle1 = deadline.start().unwrap(); @@ -507,7 +512,7 @@ mod tests { let mut cnt = 0; - monitor.inner.evaluate(|tag, deadline_failure| { + monitor.inner.evaluate(hmon_starting_point, |tag, deadline_failure| { cnt += 1; assert_eq!( deadline_failure, diff --git a/src/health_monitoring_lib/rust/ffi.rs b/src/health_monitoring_lib/rust/ffi.rs index 8c0cc52c..b638b9f4 100644 --- a/src/health_monitoring_lib/rust/ffi.rs +++ b/src/health_monitoring_lib/rust/ffi.rs @@ -51,6 +51,35 @@ extern "C" fn health_monitor_builder_add_deadline_monitor(handle: FFIHandle, tag health_monitor_builder.add_deadline_monitor_internal(&tag, *monitor); } +#[no_mangle] +extern "C" fn 
health_monitor_builder_add_heartbeat_monitor( + hmon_builder_handle: FFIHandle, + monitor_tag: *const IdentTag, + monitor_builder_handle: FFIHandle, +) { + assert!(!hmon_builder_handle.is_null()); + assert!(!monitor_tag.is_null()); + assert!(!monitor_builder_handle.is_null()); + + // SAFETY: + // Validity of the pointer is ensured. + // `IdentTag` type must be compatible between C++ and Rust. + let monitor_tag = unsafe { *monitor_tag }; + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that pointer was created with a call to `heartbeat_monitor_builder_create`. + let monitor_builder = unsafe { Box::from_raw(monitor_builder_handle as *mut heartbeat::HeartbeatMonitorBuilder) }; + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that pointer was created with a call to `health_monitor_builder_create`. + let mut health_monitor_builder = + FFIBorrowed::new(unsafe { Box::from_raw(hmon_builder_handle as *mut HealthMonitorBuilder) }); + + health_monitor_builder.add_heartbeat_monitor_internal(&monitor_tag, *monitor_builder); +} + #[no_mangle] extern "C" fn health_monitor_builder_build( handle: FFIHandle, @@ -93,6 +122,28 @@ extern "C" fn health_monitor_get_deadline_monitor(handle: FFIHandle, tag: *const } } +#[no_mangle] +extern "C" fn health_monitor_get_heartbeat_monitor(hmon_handle: FFIHandle, monitor_tag: *const IdentTag) -> FFIHandle { + assert!(!hmon_handle.is_null()); + assert!(!monitor_tag.is_null()); + + // SAFETY: + // Validity of the pointer is ensured. + // `IdentTag` type must be compatible between C++ and Rust. + let monitor_tag = unsafe { *monitor_tag }; + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that pointer was created with a call to `health_monitor_builder_build`. 
+ let mut health_monitor = FFIBorrowed::new(unsafe { Box::from_raw(hmon_handle as *mut HealthMonitor) }); + + if let Some(heartbeat_monitor) = health_monitor.get_heartbeat_monitor(&monitor_tag) { + Box::into_raw(Box::new(heartbeat_monitor)) as FFIHandle + } else { + core::ptr::null_mut() + } +} + #[no_mangle] extern "C" fn health_monitor_start(handle: FFIHandle) { assert!(!handle.is_null()); diff --git a/src/health_monitoring_lib/rust/heartbeat/ffi.rs b/src/health_monitoring_lib/rust/heartbeat/ffi.rs new file mode 100644 index 00000000..9069f5a1 --- /dev/null +++ b/src/health_monitoring_lib/rust/heartbeat/ffi.rs @@ -0,0 +1,63 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::common::ffi::FFIHandle; +use crate::heartbeat::{HeartbeatMonitor, HeartbeatMonitorBuilder}; +use crate::TimeRange; +use core::time::Duration; + +#[no_mangle] +pub extern "C" fn heartbeat_monitor_builder_create(range_min_ms: u32, range_max_ms: u32) -> FFIHandle { + let range_min = Duration::from_millis(range_min_ms as u64); + let range_max = Duration::from_millis(range_max_ms as u64); + let range = TimeRange::new(range_min, range_max); + let builder = HeartbeatMonitorBuilder::new(range); + let handle = Box::into_raw(Box::new(builder)); + handle as FFIHandle +} + +#[no_mangle] +pub extern "C" fn heartbeat_monitor_builder_destroy(monitor_builder_handle: FFIHandle) { + assert!(!monitor_builder_handle.is_null()); + + // SAFETY: + // Validity of the pointer is ensured. 
+ // It is assumed that pointer was created by a call to `heartbeat_monitor_builder_create`. + unsafe { + let _ = Box::from_raw(monitor_builder_handle as *mut HeartbeatMonitorBuilder); + } +} + +#[no_mangle] +pub extern "C" fn heartbeat_monitor_destroy(monitor_handle: FFIHandle) { + assert!(!monitor_handle.is_null()); + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that the pointer was created by a call to `health_monitor_get_heartbeat_monitor`. + unsafe { + let _ = Box::from_raw(monitor_handle as *mut HeartbeatMonitor); + } +} + +#[no_mangle] +pub extern "C" fn heartbeat_monitor_heartbeat(monitor_handle: FFIHandle) { + assert!(!monitor_handle.is_null()); + + // SAFETY: + // Validity of the pointer is ensured. + // It is assumed that the pointer was created by a call to `health_monitor_get_heartbeat_monitor`. + let monitor = unsafe { Box::from_raw(monitor_handle as *mut HeartbeatMonitor) }; + + monitor.heartbeat(); +} diff --git a/src/health_monitoring_lib/rust/heartbeat/heartbeat_monitor.rs b/src/health_monitoring_lib/rust/heartbeat/heartbeat_monitor.rs new file mode 100644 index 00000000..eb4f3fdd --- /dev/null +++ b/src/health_monitoring_lib/rust/heartbeat/heartbeat_monitor.rs @@ -0,0 +1,670 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. 
+// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::common::{ + duration_to_u32, hmon_time_offset, HeartbeatMonitorEvaluationError, IdentTag, MonitorEvalHandle, + MonitorEvaluationError, MonitorEvaluator, TimeRange, +}; +use crate::heartbeat::heartbeat_state::{HeartbeatState, HeartbeatStateSnapshot}; +use crate::log::warn; +use crate::protected_memory::ProtectedMemoryAllocator; +use core::time::Duration; +use score_log::ScoreDebug; +use std::sync::Arc; +use std::time::Instant; + +/// Builder for [`HeartbeatMonitor`]. +#[derive(Debug)] +pub struct HeartbeatMonitorBuilder { + /// Time range between heartbeats. + range: TimeRange, +} + +impl HeartbeatMonitorBuilder { + /// Create a new [`HeartbeatMonitorBuilder`]. + /// + /// - `range` - time range between heartbeats. + pub fn new(range: TimeRange) -> Self { + Self { range } + } + + /// Build the [`HeartbeatMonitor`]. + /// + /// - `tag` - tag of this monitor. + /// - `internal_processing_cycle` - health monitor processing cycle. + /// - `_allocator` - protected memory allocator. + /// + /// # Panics + /// + /// Internal processing cycle must be shorter than doubled minimum time range. + pub(crate) fn build( + self, + tag: IdentTag, + internal_processing_cycle: Duration, + _allocator: &ProtectedMemoryAllocator, + ) -> HeartbeatMonitor { + assert!(self.range.min * 2 > internal_processing_cycle); + HeartbeatMonitor::new(tag, self.range) + } +} + +/// Heartbeat monitor. +pub struct HeartbeatMonitor { + inner: Arc, +} + +impl HeartbeatMonitor { + /// Create a new [`HeartbeatMonitor`]. + fn new(tag: IdentTag, range: TimeRange) -> Self { + Self { + inner: Arc::new(HeartbeatMonitorInner::new(tag, range)), + } + } + + /// Provide a heartbeat. 
+ pub fn heartbeat(&self) { + self.inner.heartbeat() + } + + /// Get eval handle. + pub(crate) fn get_eval_handle(&self) -> MonitorEvalHandle { + MonitorEvalHandle::new(Arc::clone(&self.inner)) + } +} + +/// Time range using [`u32`]. +#[derive(ScoreDebug)] +struct InternalRange { + min: u32, + max: u32, +} + +impl InternalRange { + /// Create range using provided values. + fn new(min: u32, max: u32) -> Self { + assert!(min <= max, "provided min is greater than provided max"); + Self { min, max } + } + + /// Create range with values offset by timestamp. + fn offset(&self, timestamp: u32) -> Self { + Self::new(self.min + timestamp, self.max + timestamp) + } +} + +impl From for InternalRange { + fn from(value: TimeRange) -> Self { + let min = duration_to_u32(value.min); + let max = duration_to_u32(value.max); + Self::new(min, max) + } +} + +struct HeartbeatMonitorInner { + /// Tag of this monitor. + tag: IdentTag, + /// Time range between heartbeats. + range: InternalRange, + /// Monitor starting point. + /// Offset is calculated during evaluation in relation to provided health monitor starting point. + monitor_starting_point: Instant, + /// Current heartbeat state. + /// Contains data in relation to [`Self::monitor_starting_point`]. + heartbeat_state: HeartbeatState, +} + +impl MonitorEvaluator for HeartbeatMonitorInner { + fn evaluate(&self, hmon_starting_point: Instant, on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError)) { + // Get current timestamp, with offset to HMON time. + let offset = hmon_time_offset(hmon_starting_point, self.monitor_starting_point); + let now = offset + duration_to_u32(hmon_starting_point.elapsed()); + + // Load current monitor state. + let snapshot = self.heartbeat_state.snapshot(); + + // Get and recalculate snapshot timestamps. + // IMPORTANT: first heartbeat is obtained when HMON time is unknown. + // It is necessary to: + // - use offset as cycle starting point. + // - get heartbeat snapshot in relation to zero point. 
+ let (start_timestamp, heartbeat_timestamp) = if snapshot.post_init() { + let start_timestamp = snapshot.start_timestamp(); + let heartbeat_timestamp = start_timestamp + snapshot.heartbeat_timestamp_offset(); + (start_timestamp, heartbeat_timestamp) + } else { + let start_timestamp = offset; + let heartbeat_timestamp = snapshot.heartbeat_timestamp_offset(); + (start_timestamp, heartbeat_timestamp) + }; + + // Get allowed time range as absolute values. + let range = self.range.offset(start_timestamp); + + // Check current counter state. + let counter = snapshot.counter(); + // Disallow multiple heartbeats in same heartbeat cycle. + if counter > 1 { + warn!("Multiple heartbeats detected"); + on_error( + &self.tag, + MonitorEvaluationError::HeartbeatSpecific(HeartbeatMonitorEvaluationError::MultipleHeartbeats), + ); + return; + } + // Handle no heartbeats. + else if counter == 0 { + // Disallow no heartbeats when already out of time range. + // Stop execution if still in range. + if now > range.max { + let offset = now - range.max; + warn!("No heartbeat detected, observed after range: {}", offset); + on_error(&self.tag, MonitorEvaluationError::TooLate); + } + // Either way - execution is stopped here. + return; + } + + // Check current heartbeat state. + // Heartbeat before allowed range. + if heartbeat_timestamp < range.min { + let offset = range.min - heartbeat_timestamp; + warn!("Heartbeat occurred too early, offset to range: {}", offset); + on_error(&self.tag, MonitorEvaluationError::TooEarly); + } + // Heartbeat after allowed range. + else if heartbeat_timestamp > range.max { + let offset = heartbeat_timestamp - range.max; + warn!("Heartbeat occurred too late, offset to range: {}", offset); + on_error(&self.tag, MonitorEvaluationError::TooLate); + } + // Heartbeat in allowed state. + else { + // Update heartbeat monitor state with a current heartbeat as a beginning of a new cycle. 
+ let _ = self + .heartbeat_state + .update(|_| Some(HeartbeatStateSnapshot::new(heartbeat_timestamp))); + } + } +} + +impl HeartbeatMonitorInner { + fn new(tag: IdentTag, range: TimeRange) -> Self { + let monitor_starting_point = Instant::now(); + let heartbeat_state_snapshot = HeartbeatStateSnapshot::default(); + let heartbeat_state = HeartbeatState::new(heartbeat_state_snapshot); + Self { + tag, + range: InternalRange::from(range), + monitor_starting_point, + heartbeat_state, + } + } + + /// Provide a heartbeat. + fn heartbeat(&self) { + // Get current timestamp. + let now = duration_to_u32(self.monitor_starting_point.elapsed()); + + // Set heartbeat timestamp and update counter. + let _ = self.heartbeat_state.update(|mut state| { + let start_ts = state.start_timestamp(); + state.set_heartbeat_timestamp_offset(now - start_ts); + state.increment_counter(); + Some(state) + }); + } +} + +#[cfg(test)] +mod test_common { + use crate::TimeRange; + use core::time::Duration; + use std::thread::sleep; + use std::time::Instant; + + pub(super) const TAG: &str = "heartbeat_monitor"; + + pub(super) fn sleep_until(target: Duration, start: Instant) { + let elapsed = start.elapsed(); + let diff = target.saturating_sub(elapsed); + sleep(diff) + } + + pub(super) fn range_from_ms(min: u64, max: u64) -> TimeRange { + TimeRange::new(Duration::from_millis(min), Duration::from_millis(max)) + } +} + +#[score_testing_macros::test_mod_with_log] +#[cfg(all(test, not(loom)))] +mod tests { + use crate::common::{HeartbeatMonitorEvaluationError, MonitorEvaluationError, MonitorEvaluator}; + use crate::heartbeat::heartbeat_monitor::test_common::{range_from_ms, sleep_until, TAG}; + use crate::heartbeat::{HeartbeatMonitor, HeartbeatMonitorBuilder}; + use crate::protected_memory::ProtectedMemoryAllocator; + use crate::{IdentTag, TimeRange}; + use core::sync::atomic::{AtomicBool, Ordering}; + use core::time::Duration; + use std::sync::Arc; + use std::thread::{sleep, spawn}; + use 
std::time::Instant; + + fn create_monitor_single_cycle(range: TimeRange) -> HeartbeatMonitor { + let tag = IdentTag::from(TAG); + let internal_processing_cycle = Duration::from_millis(1); + let allocator = ProtectedMemoryAllocator {}; + HeartbeatMonitorBuilder::new(range).build(tag, internal_processing_cycle, &allocator) + } + + #[test] + fn test_no_beat_evaluate_early() { + let range = range_from_ms(80, 120); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // No beat happened, no error is expected. + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + panic!("error happened, tag: {tag:?}, error: {error:?}") + }); + } + + #[test] + fn test_no_beat_evaluate_in_range() { + let range = range_from_ms(80, 120); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // Wait until middle of range. + sleep_until(Duration::from_millis(100), hmon_starting_point); + + // No beat happened, no error is expected. + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + panic!("error happened, tag: {tag:?}, error: {error:?}") + }); + } + #[test] + fn test_no_beat_evaluate_late() { + let range = range_from_ms(80, 120); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // Wait until late. + sleep_until(Duration::from_millis(150), hmon_starting_point); + + // No beat happened, too late error is expected. + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert_eq!(error, MonitorEvaluationError::TooLate); + }); + } + + fn beat_eval_test( + beat_time: Duration, + eval_time: Duration, + on_error: &mut dyn FnMut(&IdentTag, MonitorEvaluationError), + ) { + let range = range_from_ms(80, 120); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // Wait and beat. 
+ sleep_until(beat_time, hmon_starting_point); + monitor.heartbeat(); + + // Wait and evaluate. + sleep_until(eval_time, hmon_starting_point); + monitor.inner.evaluate(hmon_starting_point, on_error); + } + + fn beat_early_test(eval_time: Duration) { + beat_eval_test( + Duration::from_millis(25), + eval_time, + &mut |tag: &IdentTag, error: MonitorEvaluationError| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert_eq!(error, MonitorEvaluationError::TooEarly); + }, + ); + } + + #[test] + fn test_beat_early_evaluate_early() { + beat_early_test(Duration::from_millis(50)); + } + + #[test] + fn test_beat_early_evaluate_in_range() { + beat_early_test(Duration::from_millis(100)); + } + + #[test] + fn test_beat_early_evaluate_late() { + beat_early_test(Duration::from_millis(150)); + } + + fn beat_in_range_test(eval_time: Duration) { + beat_eval_test(Duration::from_millis(90), eval_time, &mut |tag, error| { + panic!("error happened, tag: {tag:?}, error: {error:?}") + }); + } + + #[test] + fn test_beat_in_range_evaluate_in_range() { + beat_in_range_test(Duration::from_millis(100)); + } + + #[test] + fn test_beat_in_range_evaluate_late() { + beat_in_range_test(Duration::from_millis(150)); + } + + #[test] + fn test_beat_late_evaluate_late() { + beat_eval_test( + Duration::from_millis(150), + Duration::from_millis(200), + &mut |tag: &IdentTag, error: MonitorEvaluationError| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert_eq!(error, MonitorEvaluationError::TooLate); + }, + ) + } + + fn multiple_beats_eval_test(beat_time: Duration, eval_time: Duration) { + let range = range_from_ms(80, 120); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // Wait and beat. + sleep_until(beat_time, hmon_starting_point); + const NUM_BEATS: usize = 10; + for _ in 0..NUM_BEATS { + monitor.heartbeat(); + } + + // Wait and evaluate. 
+ sleep_until(eval_time, hmon_starting_point); + monitor.inner.evaluate( + hmon_starting_point, + &mut |tag: &IdentTag, error: MonitorEvaluationError| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert_eq!( + error, + MonitorEvaluationError::HeartbeatSpecific(HeartbeatMonitorEvaluationError::MultipleHeartbeats) + ); + }, + ); + } + + #[test] + fn test_multiple_beats_early_evaluate_early() { + multiple_beats_eval_test(Duration::from_millis(25), Duration::from_millis(50)) + } + + #[test] + fn test_multiple_beats_early_evaluate_in_range() { + multiple_beats_eval_test(Duration::from_millis(25), Duration::from_millis(100)) + } + + #[test] + fn test_multiple_beats_early_evaluate_late() { + multiple_beats_eval_test(Duration::from_millis(25), Duration::from_millis(150)) + } + + #[test] + fn test_multiple_beats_in_range_evaluate_in_range() { + multiple_beats_eval_test(Duration::from_millis(90), Duration::from_millis(100)) + } + + #[test] + fn test_multiple_beats_in_range_evaluate_late() { + multiple_beats_eval_test(Duration::from_millis(90), Duration::from_millis(150)) + } + + #[test] + fn test_multiple_beats_late_evaluate_late() { + multiple_beats_eval_test(Duration::from_millis(150), Duration::from_millis(200)) + } + + fn create_monitor_multiple_cycles(cycle: Duration) -> Arc { + let range = range_from_ms(80, 120); + let tag = IdentTag::from(TAG); + let allocator = ProtectedMemoryAllocator {}; + Arc::new(HeartbeatMonitorBuilder::new(range).build(tag, cycle, &allocator)) + } + + #[test] + fn test_cycle_early() { + let cycle = Duration::from_millis(20); + let monitor = create_monitor_multiple_cycles(cycle); + let hmon_starting_point = Instant::now(); + + // Run heartbeat thread. 
+ let monitor_clone = monitor.clone(); + let heartbeat_finished = Arc::new(AtomicBool::new(false)); + let heartbeat_finished_clone = heartbeat_finished.clone(); + let heartbeat_thread = spawn(move || { + const NUM_BEATS: u32 = 3; + const BEAT_INTERVAL: Duration = Duration::from_millis(100); + for i in 1..NUM_BEATS { + sleep_until(i * BEAT_INTERVAL, hmon_starting_point); + monitor_clone.heartbeat(); + } + + // Perform a last heartbeat in shorter interval. + sleep_until( + NUM_BEATS * BEAT_INTERVAL - Duration::from_millis(40), + hmon_starting_point, + ); + monitor_clone.heartbeat(); + + heartbeat_finished_clone.store(true, Ordering::Release); + }); + + // Run evaluation thread. + while !heartbeat_finished.load(Ordering::Acquire) { + sleep(cycle); + // Too early error is expected. + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert_eq!(error, MonitorEvaluationError::TooEarly); + }); + } + + heartbeat_thread.join().unwrap(); + } + + #[test] + fn test_cycle_in_range() { + let cycle = Duration::from_millis(20); + let monitor = create_monitor_multiple_cycles(cycle); + let hmon_starting_point = Instant::now(); + + // Run heartbeat thread. + let monitor_clone = monitor.clone(); + let heartbeat_finished = Arc::new(AtomicBool::new(false)); + let heartbeat_finished_clone = heartbeat_finished.clone(); + let heartbeat_thread = spawn(move || { + const NUM_BEATS: u32 = 3; + const BEAT_INTERVAL: Duration = Duration::from_millis(100); + for i in 1..=NUM_BEATS { + sleep_until(i * BEAT_INTERVAL, hmon_starting_point); + monitor_clone.heartbeat(); + } + heartbeat_finished_clone.store(true, Ordering::Release); + }); + + // Run evaluation thread. + while !heartbeat_finished.load(Ordering::Acquire) { + sleep(cycle); + // No error is expected. 
+ monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + panic!("error happened, tag: {tag:?}, error: {error:?}") + }); + } + + heartbeat_thread.join().unwrap(); + } + + #[test] + fn test_cycle_late() { + let cycle = Duration::from_millis(20); + let monitor = create_monitor_multiple_cycles(cycle); + let hmon_starting_point = Instant::now(); + + // Run heartbeat thread. + let monitor_clone = monitor.clone(); + let heartbeat_finished = Arc::new(AtomicBool::new(false)); + let heartbeat_finished_clone = heartbeat_finished.clone(); + let heartbeat_thread = spawn(move || { + const NUM_BEATS: u32 = 3; + const BEAT_INTERVAL: Duration = Duration::from_millis(100); + for i in 1..NUM_BEATS { + sleep_until(i * BEAT_INTERVAL, hmon_starting_point); + monitor_clone.heartbeat(); + } + + // Perform a last heartbeat in shorter interval. + sleep_until( + NUM_BEATS * BEAT_INTERVAL + Duration::from_millis(40), + hmon_starting_point, + ); + monitor_clone.heartbeat(); + + heartbeat_finished_clone.store(true, Ordering::Release); + }); + + // Run evaluation thread. + while !heartbeat_finished.load(Ordering::Acquire) { + sleep(cycle); + // No heartbeat or too late error is expected. + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert!(error == MonitorEvaluationError::TooLate); + }); + } + + heartbeat_thread.join().unwrap(); + } + + #[test] + fn test_timestamp_offset() { + let range = range_from_ms(80, 120); + let monitor = create_monitor_single_cycle(range); + + // Move away monitor creation and HMON starting point. + sleep(Duration::from_millis(300)); + let hmon_starting_point = Instant::now(); + + // Wait and beat. + sleep_until(Duration::from_millis(90), hmon_starting_point); + monitor.heartbeat(); + + // Wait and evaluate. 
+ sleep_until(Duration::from_millis(100), hmon_starting_point); + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + panic!("error happened, tag: {tag:?}, error: {error:?}") + }); + } +} + +#[cfg(all(test, loom))] +mod loom_tests { + use crate::common::{MonitorEvaluationError, MonitorEvaluator}; + use crate::heartbeat::heartbeat_monitor::test_common::{range_from_ms, sleep_until, TAG}; + use crate::heartbeat::{HeartbeatMonitor, HeartbeatMonitorBuilder}; + use crate::protected_memory::ProtectedMemoryAllocator; + use crate::{IdentTag, TimeRange}; + use core::time::Duration; + use loom::thread::spawn; + use std::sync::Arc; + use std::time::Instant; + + fn create_monitor_single_cycle(range: TimeRange) -> Arc { + let tag = IdentTag::from(TAG); + let internal_processing_cycle = Duration::from_millis(1); + let allocator = ProtectedMemoryAllocator {}; + Arc::new(HeartbeatMonitorBuilder::new(range).build(tag, internal_processing_cycle, &allocator)) + } + + #[test] + fn test_heartbeat_evaluate_too_early() { + loom::model(|| { + let range = range_from_ms(30, 70); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // Perform heartbeat in a separate thread. + let monitor_clone = monitor.clone(); + let heartbeat_thread = spawn(move || monitor_clone.heartbeat()); + + // Evaluate. + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert_eq!(error, MonitorEvaluationError::TooEarly); + }); + + heartbeat_thread.join().unwrap(); + }); + } + + #[test] + fn test_heartbeat_evaluate_in_range() { + loom::model(|| { + let range = range_from_ms(30, 70); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // Wait until in range. + sleep_until(Duration::from_millis(50), hmon_starting_point); + + // Perform heartbeat in a separate thread. 
+ let monitor_clone = monitor.clone(); + let heartbeat_thread = spawn(move || monitor_clone.heartbeat()); + + // Evaluate. + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + panic!("error happened, tag: {tag:?}, error: {error:?}"); + }); + + heartbeat_thread.join().unwrap(); + }); + } + + #[test] + fn test_heartbeat_evaluate_too_late() { + loom::model(|| { + let range = range_from_ms(30, 70); + let monitor = create_monitor_single_cycle(range); + let hmon_starting_point = Instant::now(); + + // Wait until too late. + sleep_until(Duration::from_millis(100), hmon_starting_point); + + // Perform heartbeat in a separate thread. + let monitor_clone = monitor.clone(); + let heartbeat_thread = spawn(move || monitor_clone.heartbeat()); + + // Evaluate. + let mut error_detected = false; + monitor.inner.evaluate(hmon_starting_point, &mut |tag, error| { + assert_eq!(*tag, IdentTag::from(TAG)); + assert_eq!(error, MonitorEvaluationError::TooLate); + error_detected = true; + }); + + heartbeat_thread.join().unwrap(); + assert!(error_detected); + }); + } +} diff --git a/src/health_monitoring_lib/rust/heartbeat/heartbeat_state.rs b/src/health_monitoring_lib/rust/heartbeat/heartbeat_state.rs new file mode 100644 index 00000000..4f8d8d96 --- /dev/null +++ b/src/health_monitoring_lib/rust/heartbeat/heartbeat_state.rs @@ -0,0 +1,321 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. 
+// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use core::cmp::min; + +#[cfg(not(loom))] +use core::sync::atomic::{AtomicU64, Ordering}; +#[cfg(loom)] +use loom::sync::atomic::{AtomicU64, Ordering}; + +/// Snapshot of a heartbeat state. +/// Data layout: +/// - cycle start timestamp: 32 bits +/// - heartbeat timestamp offset: 29 bits +/// - heartbeat counter: 2 bits +/// - post-init flag: 1 bit +#[derive(Clone, Copy, Default)] +pub struct HeartbeatStateSnapshot(u64); + +const START_MASK: u64 = 0xFFFFFFFF_00000000; +const START_OFFSET: u32 = u32::BITS; +const BEAT_MASK: u64 = 0x00000000_FFFFFFF8; +const BEAT_OFFSET: u32 = 3; +const COUNT_MASK: u64 = 0b0110; +const COUNT_OFFSET: u32 = 1; +const POST_INIT_MASK: u64 = 0b0001; + +impl HeartbeatStateSnapshot { + /// Create a new snapshot with known starting point. + /// `post_init` flag is implicitly set to 1. + pub fn new(start_timestamp: u32) -> Self { + let mut snapshot = Self::default(); + snapshot.set_start_timestamp(start_timestamp); + snapshot.set_post_init(true); + snapshot + } + + /// Return underlying data. + pub fn as_u64(&self) -> u64 { + self.0 + } + + /// Cycle start timestamp. + pub fn start_timestamp(&self) -> u32 { + ((self.0 & START_MASK) >> START_OFFSET) as u32 + } + + /// Set cycle start timestamp. + pub fn set_start_timestamp(&mut self, value: u32) { + self.0 = ((value as u64) << START_OFFSET) | (self.0 & !START_MASK); + } + + /// Heartbeat timestamp offset. + pub fn heartbeat_timestamp_offset(&self) -> u32 { + ((self.0 & BEAT_MASK) >> BEAT_OFFSET) as u32 + } + + /// Set heartbeat timestamp offset. + /// Value is 29-bit, must not exceed 0x1FFFFFFF; panics otherwise. 
+ pub fn set_heartbeat_timestamp_offset(&mut self, value: u32) { + assert!(value < 1 << 29, "provided heartbeat offset is out of range"); + self.0 = ((value as u64) << BEAT_OFFSET) | (self.0 & !BEAT_MASK); + } + + /// Heartbeat counter. + pub fn counter(&self) -> u8 { + ((self.0 & COUNT_MASK) >> COUNT_OFFSET) as u8 + } + + /// Increment heartbeat counter. + /// Value is 2-bit, larger values are saturated to max value (3). + pub fn increment_counter(&mut self) { + let value = min(self.counter() + 1, 3); + self.0 = ((value as u64) << COUNT_OFFSET) | (self.0 & !COUNT_MASK); + } + + /// Post-init state. + /// This should be `false` only before first cycle is concluded. + pub fn post_init(&self) -> bool { + let value = self.0 & POST_INIT_MASK; + value != 0 + } + + /// Set post-init state. + pub fn set_post_init(&mut self, value: bool) { + self.0 = (value as u64) | (self.0 & !POST_INIT_MASK); + } +} + +impl From for HeartbeatStateSnapshot { + fn from(value: u64) -> Self { + Self(value) + } +} + +/// Atomic representation of [`HeartbeatStateSnapshot`]. +pub struct HeartbeatState(AtomicU64); + +impl HeartbeatState { + /// Create a new [`HeartbeatState`] using provided [`HeartbeatStateSnapshot`]. + pub fn new(snapshot: HeartbeatStateSnapshot) -> Self { + Self(AtomicU64::new(snapshot.as_u64())) + } + + /// Return a snapshot of the current heartbeat state. + pub fn snapshot(&self) -> HeartbeatStateSnapshot { + HeartbeatStateSnapshot::from(self.0.load(Ordering::Relaxed)) + } + + /// Update the heartbeat state using the provided closure. + /// Closure receives the current state and should return an [`Option`] containing a new state. + /// If [`None`] is returned then the state was not updated. 
+ pub fn update Option>( + &self, + mut f: F, + ) -> Result { + // `fetch_update` yields the previous value in both the `Ok` (updated) and `Err` (not updated) cases. + self.0 + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |prev| { + let snapshot = HeartbeatStateSnapshot::from(prev); + f(snapshot).map(|new_snapshot| new_snapshot.as_u64()) + }) + .map(HeartbeatStateSnapshot::from) + .map_err(HeartbeatStateSnapshot::from) + } +} + +#[cfg(all(test, not(loom)))] +mod tests { + use crate::heartbeat::heartbeat_state::{HeartbeatState, HeartbeatStateSnapshot}; + use core::cmp::min; + use core::sync::atomic::Ordering; + + #[test] + fn test_snapshot_new_zero() { + let state = HeartbeatStateSnapshot::new(0); + + assert_eq!(state.as_u64(), 0x01); + assert_eq!(state.start_timestamp(), 0); + assert_eq!(state.heartbeat_timestamp_offset(), 0); + assert_eq!(state.counter(), 0); + assert!(state.post_init()); + } + + #[test] + fn test_snapshot_new_valid() { + let state = HeartbeatStateSnapshot::new(0xDEADBEEF); + + assert_eq!(state.as_u64(), (0xDEADBEEF << u32::BITS) + 0x01); + assert_eq!(state.start_timestamp(), 0xDEADBEEF); + assert_eq!(state.heartbeat_timestamp_offset(), 0); + assert_eq!(state.counter(), 0); + assert!(state.post_init()); + } + + #[test] + fn test_snapshot_new_max() { + let state = HeartbeatStateSnapshot::new(u32::MAX); + + assert_eq!(state.as_u64(), ((u32::MAX as u64) << u32::BITS) + 0x01); + assert_eq!(state.start_timestamp(), u32::MAX); + assert_eq!(state.heartbeat_timestamp_offset(), 0); + assert_eq!(state.counter(), 0); + assert!(state.post_init()); + } + + #[test] + fn test_snapshot_from_u64_zero() { + let state = HeartbeatStateSnapshot::from(0); + + assert_eq!(state.as_u64(), 0); + assert_eq!(state.start_timestamp(), 0); + assert_eq!(state.heartbeat_timestamp_offset(), 0); + assert_eq!(state.counter(), 0); + assert!(!state.post_init()); + } + + #[test] + fn test_snapshot_from_u64_valid() { + let state = HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF); + + assert_eq!(state.as_u64(), 0xDEADBEEF_DEADBEEF); + 
assert_eq!(state.start_timestamp(), 0xDEADBEEF); + assert_eq!(state.heartbeat_timestamp_offset(), 0xDEADBEEF >> 3); + assert_eq!(state.counter(), 3); + assert!(state.post_init()); + } + + #[test] + fn test_snapshot_from_u64_max() { + let state = HeartbeatStateSnapshot::from(u64::MAX); + + assert_eq!(state.as_u64(), u64::MAX); + assert_eq!(state.start_timestamp(), u32::MAX); + assert_eq!(state.heartbeat_timestamp_offset(), u32::MAX >> 3); + assert_eq!(state.counter(), 3); + assert!(state.post_init()); + } + + #[test] + fn test_snapshot_default() { + let state = HeartbeatStateSnapshot::default(); + + assert_eq!(state.as_u64(), 0); + assert_eq!(state.start_timestamp(), 0); + assert_eq!(state.heartbeat_timestamp_offset(), 0); + assert_eq!(state.counter(), 0); + assert!(!state.post_init()); + } + + #[test] + fn test_snapshot_set_start_timestamp() { + let mut state = HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF); + state.set_start_timestamp(0xCAFEBAAD); + + assert_eq!(state.start_timestamp(), 0xCAFEBAAD); + + // Check other parameters unchanged. + assert_eq!(state.heartbeat_timestamp_offset(), 0xDEADBEEF >> 3); + assert_eq!(state.counter(), 3); + assert!(state.post_init()); + } + + #[test] + fn test_snapshot_set_heartbeat_timestamp_valid() { + let mut state = HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF); + state.set_heartbeat_timestamp_offset(0x1CAFEBAD); + + assert_eq!(state.heartbeat_timestamp_offset(), 0x1CAFEBAD); + + // Check other parameters unchanged. 
+ assert_eq!(state.start_timestamp(), 0xDEADBEEF); + assert_eq!(state.counter(), 3); + assert!(state.post_init()); + } + + #[test] + #[should_panic(expected = "provided heartbeat offset is out of range")] + fn test_snapshot_set_heartbeat_timestamp_out_of_range() { + let mut state = HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF); + state.set_heartbeat_timestamp_offset(0x20000000); + } + + #[test] + fn test_snapshot_counter_increment() { + let mut state = HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEE9); + + // Max value is 3, check if saturates. + for i in 1..=4 { + state.increment_counter(); + assert_eq!(state.counter(), min(i, 3)); + } + + // Check other parameters unchanged. + assert_eq!(state.start_timestamp(), 0xDEADBEEF); + assert_eq!(state.heartbeat_timestamp_offset(), 0xDEADBEEF >> 3); + assert!(state.post_init()); + } + + #[test] + fn test_snapshot_set_post_init() { + let mut state = HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF); + + state.set_post_init(false); + assert!(!state.post_init()); + state.set_post_init(true); + assert!(state.post_init()); + + // Check other parameters unchanged. + assert_eq!(state.start_timestamp(), 0xDEADBEEF); + assert_eq!(state.heartbeat_timestamp_offset(), 0xDEADBEEF >> 3); + assert_eq!(state.counter(), 3); + } + + #[test] + fn test_state_new() { + let state = HeartbeatState::new(HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF)); + assert_eq!(state.0.load(Ordering::Relaxed), 0xDEADBEEF_DEADBEEF); + } + + #[test] + fn test_state_snapshot() { + let state = HeartbeatState::new(HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF)); + assert_eq!(state.snapshot().as_u64(), 0xDEADBEEF_DEADBEEF); + } + + #[test] + fn test_state_update_some() { + let state = HeartbeatState::new(HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF)); + let _ = state.update(|prev_snapshot| { + // Make sure state is as expected. 
+ assert_eq!(prev_snapshot.as_u64(), 0xDEADBEEF_DEADBEEF); + assert_eq!(prev_snapshot.start_timestamp(), 0xDEADBEEF); + assert_eq!(prev_snapshot.heartbeat_timestamp_offset(), 0xDEADBEEF >> 3); + assert_eq!(prev_snapshot.counter(), 3); + assert!(prev_snapshot.post_init()); + + Some(HeartbeatStateSnapshot::from(0)) + }); + + assert_eq!(state.snapshot().as_u64(), 0); + } + + #[test] + fn test_state_update_none() { + let state = HeartbeatState::new(HeartbeatStateSnapshot::from(0xDEADBEEF_DEADBEEF)); + let _ = state.update(|_| None); + + assert_eq!(state.snapshot().as_u64(), 0xDEADBEEF_DEADBEEF); + } +} diff --git a/src/health_monitoring_lib/rust/heartbeat/mod.rs b/src/health_monitoring_lib/rust/heartbeat/mod.rs new file mode 100644 index 00000000..8f1c2b30 --- /dev/null +++ b/src/health_monitoring_lib/rust/heartbeat/mod.rs @@ -0,0 +1,20 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. 
+// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +mod heartbeat_monitor; +mod heartbeat_state; + +pub use heartbeat_monitor::{HeartbeatMonitor, HeartbeatMonitorBuilder}; + +// FFI bindings +pub(super) mod ffi; diff --git a/src/health_monitoring_lib/rust/lib.rs b/src/health_monitoring_lib/rust/lib.rs index 3d91682c..86c066cc 100644 --- a/src/health_monitoring_lib/rust/lib.rs +++ b/src/health_monitoring_lib/rust/lib.rs @@ -11,30 +11,38 @@ // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use std::collections::HashMap; - mod common; mod ffi; mod log; mod protected_memory; +mod supervisor_api_client; mod worker; pub mod deadline; +pub mod heartbeat; + +use crate::supervisor_api_client::SupervisorAPIClientImpl; pub use common::{IdentTag, TimeRange}; +use core::time::Duration; +use std::collections::HashMap; +/// Builder for [`HealthMonitor`]. #[derive(Default)] pub struct HealthMonitorBuilder { - deadlines: HashMap, - supervisor_api_cycle: core::time::Duration, - internal_processing_cycle: core::time::Duration, + deadline_monitor_builders: HashMap, + heartbeat_monitor_builders: HashMap, + supervisor_api_cycle: Duration, + internal_processing_cycle: Duration, } impl HealthMonitorBuilder { + /// Create a new [`HealthMonitorBuilder`]. 
pub fn new() -> Self { Self { - deadlines: HashMap::new(), - supervisor_api_cycle: core::time::Duration::from_millis(500), - internal_processing_cycle: core::time::Duration::from_millis(100), + deadline_monitor_builders: HashMap::new(), + heartbeat_monitor_builders: HashMap::new(), + supervisor_api_cycle: Duration::from_millis(500), + internal_processing_cycle: Duration::from_millis(100), } } @@ -49,16 +57,27 @@ impl HealthMonitorBuilder { self } + /// Adds a heartbeat monitor for a specific identifier tag. + /// # Arguments + /// * `tag` - The unique identifier for the heartbeat monitor. + /// * `monitor` - The builder for the heartbeat monitor. + /// # Note + /// If a monitor with the same tag already exists, it will be overwritten. + pub fn add_heartbeat_monitor(mut self, tag: &IdentTag, monitor: heartbeat::HeartbeatMonitorBuilder) -> Self { + self.add_heartbeat_monitor_internal(tag, monitor); + self + } + /// Sets the cycle duration for supervisor API notifications. /// This duration determines how often the health monitor notifies the supervisor that the system is alive. - pub fn with_supervisor_api_cycle(mut self, cycle_duration: core::time::Duration) -> Self { + pub fn with_supervisor_api_cycle(mut self, cycle_duration: Duration) -> Self { self.with_supervisor_api_cycle_internal(cycle_duration); self } /// Sets the internal processing cycle duration. /// This duration determines how often the health monitor checks deadlines. 
- pub fn with_internal_processing_cycle(mut self, cycle_duration: core::time::Duration) -> Self { + pub fn with_internal_processing_cycle(mut self, cycle_duration: Duration) -> Self { self.with_internal_processing_cycle_internal(cycle_duration); self } @@ -73,12 +92,29 @@ impl HealthMonitorBuilder { ); let allocator = protected_memory::ProtectedMemoryAllocator {}; - let mut monitors = HashMap::new(); - for (tag, builder) in self.deadlines { - monitors.insert(tag, Some(DeadlineMonitorState::Available(builder.build(&allocator)))); + + // Create deadline monitors. + let mut deadline_monitors = HashMap::new(); + for (tag, builder) in self.deadline_monitor_builders { + deadline_monitors.insert(tag, Some(MonitorState::Available(builder.build(&allocator)))); + } + + // Create heartbeat monitors. + let mut heartbeat_monitors = HashMap::new(); + for (tag, builder) in self.heartbeat_monitor_builders { + heartbeat_monitors.insert( + tag, + Some(MonitorState::Available(builder.build( + tag, + self.internal_processing_cycle, + &allocator, + ))), + ); } + HealthMonitor { - deadline_monitors: monitors, + deadline_monitors, + heartbeat_monitors, worker: worker::UniqueThreadRunner::new(self.internal_processing_cycle), supervisor_api_cycle: self.supervisor_api_cycle, } @@ -87,27 +123,37 @@ impl HealthMonitorBuilder { // Used by FFI and config parsing code which prefer not to move builder instance pub(crate) fn add_deadline_monitor_internal(&mut self, tag: &IdentTag, monitor: deadline::DeadlineMonitorBuilder) { - self.deadlines.insert(*tag, monitor); + self.deadline_monitor_builders.insert(*tag, monitor); } - pub(crate) fn with_supervisor_api_cycle_internal(&mut self, cycle_duration: core::time::Duration) { + pub(crate) fn add_heartbeat_monitor_internal( + &mut self, + tag: &IdentTag, + monitor: heartbeat::HeartbeatMonitorBuilder, + ) { + self.heartbeat_monitor_builders.insert(*tag, monitor); + } + + pub(crate) fn with_supervisor_api_cycle_internal(&mut self, cycle_duration: 
Duration) { self.supervisor_api_cycle = cycle_duration; } - pub(crate) fn with_internal_processing_cycle_internal(&mut self, cycle_duration: core::time::Duration) { + pub(crate) fn with_internal_processing_cycle_internal(&mut self, cycle_duration: Duration) { self.internal_processing_cycle = cycle_duration; } } -enum DeadlineMonitorState { - Available(deadline::DeadlineMonitor), +/// Monitor ownership state in the [`HealthMonitor`]. +enum MonitorState { + Available(Monitor), Taken(common::MonitorEvalHandle), } pub struct HealthMonitor { - deadline_monitors: HashMap>, + deadline_monitors: HashMap>>, + heartbeat_monitors: HashMap>>, worker: worker::UniqueThreadRunner, - supervisor_api_cycle: core::time::Duration, + supervisor_api_cycle: Duration, } impl HealthMonitor { @@ -122,13 +168,30 @@ impl HealthMonitor { let monitor = self.deadline_monitors.get_mut(tag)?; match monitor.take() { - Some(DeadlineMonitorState::Available(deadline_monitor)) => { - monitor.replace(DeadlineMonitorState::Taken(deadline_monitor.get_eval_handle())); + Some(MonitorState::Available(deadline_monitor)) => { + monitor.replace(MonitorState::Taken(deadline_monitor.get_eval_handle())); Some(deadline_monitor) }, - Some(DeadlineMonitorState::Taken(v)) => { - monitor.replace(DeadlineMonitorState::Taken(v)); // Insert back + Some(MonitorState::Taken(v)) => { + monitor.replace(MonitorState::Taken(v)); // Insert back + None + }, + None => None, + } + } + + pub fn get_heartbeat_monitor(&mut self, tag: &IdentTag) -> Option { + let monitor = self.heartbeat_monitors.get_mut(tag)?; + + match monitor.take() { + Some(MonitorState::Available(heartbeat_monitor)) => { + monitor.replace(MonitorState::Taken(heartbeat_monitor.get_eval_handle())); + + Some(heartbeat_monitor) + }, + Some(MonitorState::Taken(v)) => { + monitor.replace(MonitorState::Taken(v)); // Insert back None }, None => None, @@ -145,19 +208,25 @@ impl HealthMonitor { /// /// Panics if no monitors have been added. 
pub fn start(&mut self) { + // Number of all monitors. + let num_monitors = self.deadline_monitors.len() + self.heartbeat_monitors.len(); + assert!( - !self.deadline_monitors.is_empty(), - "No deadline monitors have been added. HealthMonitor cannot start without any monitors." + num_monitors > 0, + "No monitors have been added. HealthMonitor cannot start without any monitors." ); - let mut monitors = containers::fixed_capacity::FixedCapacityVec::new(self.deadline_monitors.len()); + // Eval handles to all monitors. + let mut monitors = containers::fixed_capacity::FixedCapacityVec::new(num_monitors); + + // Start deadline monitors. for (tag, monitor) in self.deadline_monitors.iter_mut() { match monitor.take() { - Some(DeadlineMonitorState::Taken(handle)) => { + Some(MonitorState::Taken(handle)) => { monitors.push(handle).expect("Failed to push monitor handle"); // Should not fail since we preallocated enough capacity }, - Some(DeadlineMonitorState::Available(_)) => { + Some(MonitorState::Available(_)) => { panic!( "All monitors must be taken before starting HealthMonitor but {:?} is not taken.", tag @@ -172,16 +241,30 @@ impl HealthMonitor { } } - let monitoring_logic = worker::MonitoringLogic::new( - monitors, - self.supervisor_api_cycle, - // Currently only `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. - // The later is meant to be used for testing purposes. - #[cfg(not(any(test, feature = "stub_supervisor_api_client")))] - worker::ScoreSupervisorAPIClient::new(), - #[cfg(any(test, feature = "stub_supervisor_api_client"))] - worker::StubSupervisorAPIClient {}, - ); + // Start heartbeat monitors. 
+ for (tag, monitor) in self.heartbeat_monitors.iter_mut() { + match monitor.take() { + Some(MonitorState::Taken(handle)) => { + monitors.push(handle).expect("Failed to push monitor handle"); + // Should not fail since we preallocated enough capacity + }, + Some(MonitorState::Available(_)) => { + panic!( + "All monitors must be taken before starting HealthMonitor but {:?} is not taken.", + tag + ); + }, + None => { + panic!( + "Invalid monitor ({:?}) state encountered while starting HealthMonitor.", + tag + ); + }, + } + } + + let monitoring_logic = + worker::MonitoringLogic::new(monitors, self.supervisor_api_cycle, SupervisorAPIClientImpl::new()); self.worker.start(monitoring_logic) } @@ -195,7 +278,7 @@ mod tests { use super::*; #[test] - #[should_panic(expected = "No deadline monitors have been added. HealthMonitor cannot start without any monitors.")] + #[should_panic(expected = "No monitors have been added. HealthMonitor cannot start without any monitors.")] fn hm_with_no_monitors_shall_panic_on_start() { let health_monitor_builder = super::HealthMonitorBuilder::new(); health_monitor_builder.build().start(); @@ -205,7 +288,7 @@ mod tests { #[should_panic(expected = "supervisor API cycle must be multiple of internal processing cycle")] fn hm_with_wrong_cycle_fails_to_build() { super::HealthMonitorBuilder::new() - .with_supervisor_api_cycle(core::time::Duration::from_millis(50)) + .with_supervisor_api_cycle(Duration::from_millis(50)) .build(); } diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs new file mode 100644 index 00000000..095e3d01 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/mod.rs @@ -0,0 +1,38 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding 
 copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +//! Module for selecting [`SupervisorAPIClient`] implementation. +//! Currently `ScoreSupervisorAPIClient` and `StubSupervisorAPIClient` are supported. +//! The latter is meant for testing purposes. + +/// An abstraction over the API used to notify the supervisor about process liveness. +pub trait SupervisorAPIClient { + fn notify_alive(&self); +} + +// Exactly one client feature must be enabled: reject builds that enable both or neither. +#[cfg(any( + all(feature = "score_supervisor_api_client", feature = "stub_supervisor_api_client"), + not(any(feature = "score_supervisor_api_client", feature = "stub_supervisor_api_client")) +))] +compile_error!("Either 'score_supervisor_api_client' or 'stub_supervisor_api_client' must be enabled!"); + +#[cfg(feature = "score_supervisor_api_client")] +mod score_supervisor_api_client; +#[cfg(feature = "stub_supervisor_api_client")] +mod stub_supervisor_api_client; + +#[cfg(feature = "score_supervisor_api_client")] +pub use score_supervisor_api_client::ScoreSupervisorAPIClient as SupervisorAPIClientImpl; +#[cfg(feature = "stub_supervisor_api_client")] +pub use stub_supervisor_api_client::StubSupervisorAPIClient as SupervisorAPIClientImpl; diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs new file mode 100644 index 00000000..73872372 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/score_supervisor_api_client.rs @@ -0,0 +1,40 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for 
additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::log::debug; +use crate::supervisor_api_client::SupervisorAPIClient; +use crate::worker::Checks; + +#[allow(dead_code)] +pub struct ScoreSupervisorAPIClient { + supervisor_link: monitor_rs::Monitor, +} + +unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution + +#[allow(dead_code)] +impl ScoreSupervisorAPIClient { + pub fn new() -> Self { + let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); + debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); + // This is only temporary usage so unwrap is fine here. + let supervisor_link = monitor_rs::Monitor::::new(&value).expect("Failed to create supervisor_link"); + Self { supervisor_link } + } +} + +impl SupervisorAPIClient for ScoreSupervisorAPIClient { + fn notify_alive(&self) { + self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); + } +} diff --git a/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs new file mode 100644 index 00000000..a948c819 --- /dev/null +++ b/src/health_monitoring_lib/rust/supervisor_api_client/stub_supervisor_api_client.rs @@ -0,0 +1,32 @@ +// ******************************************************************************* +// Copyright (c) 2026 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. 
+// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// +// +// SPDX-License-Identifier: Apache-2.0 +// ******************************************************************************* + +use crate::log::warn; +use crate::supervisor_api_client::SupervisorAPIClient; + +/// A stub implementation of the SupervisorAPIClient that logs alive notifications. +#[allow(dead_code)] +pub struct StubSupervisorAPIClient; + +impl StubSupervisorAPIClient { + pub fn new() -> Self { + Self + } +} + +#[allow(dead_code)] +impl SupervisorAPIClient for StubSupervisorAPIClient { + fn notify_alive(&self) { + warn!("StubSupervisorAPIClient: notify_alive called"); + } +} diff --git a/src/health_monitoring_lib/rust/worker.rs b/src/health_monitoring_lib/rust/worker.rs index bed7091c..f193b486 100644 --- a/src/health_monitoring_lib/rust/worker.rs +++ b/src/health_monitoring_lib/rust/worker.rs @@ -10,18 +10,11 @@ // // SPDX-License-Identifier: Apache-2.0 // ******************************************************************************* -use crate::{common::MonitorEvaluator, log::debug}; +use crate::common::{MonitorEvalHandle, MonitorEvaluator}; +use crate::log::{info, warn}; +use crate::supervisor_api_client::SupervisorAPIClient; use containers::fixed_capacity::FixedCapacityVec; - -use crate::{ - common::MonitorEvalHandle, - log::{info, warn}, -}; - -/// An abstraction over the API used to notify the supervisor about process liveness. 
-pub(super) trait SupervisorAPIClient { - fn notify_alive(&self); -} +use std::time::Instant; pub(super) struct MonitoringLogic { monitors: FixedCapacityVec, @@ -49,12 +42,13 @@ impl MonitoringLogic { } } - fn run(&mut self) -> bool { + fn run(&mut self, hmon_starting_point: Instant) -> bool { let mut has_any_error = false; for monitor in self.monitors.iter() { - monitor.evaluate(&mut |tag, error| { + monitor.evaluate(hmon_starting_point, &mut |tag, error| { has_any_error = true; + // TODO: either monitors should be separated or their type should be mentioned. warn!("Monitor with tag {:?} reported error: {:?}.", tag, error); }); } @@ -99,6 +93,7 @@ impl UniqueThreadRunner { std::thread::spawn(move || { info!("Monitoring thread started."); + let hmon_starting_point = Instant::now(); let mut next_sleep_time = interval; // TODO Add some checks and log if cyclicly here is not met. @@ -107,7 +102,7 @@ impl UniqueThreadRunner { let now = std::time::Instant::now(); - if !monitoring_logic.run() { + if !monitoring_logic.run(hmon_starting_point) { info!("Monitoring logic failed, stopping thread."); break; } @@ -134,20 +129,9 @@ impl Drop for UniqueThreadRunner { } } -/// A stub implementation of the SupervisorAPIClient that logs alive notifications. 
-#[allow(dead_code)] -pub(super) struct StubSupervisorAPIClient; - -#[allow(dead_code)] -impl SupervisorAPIClient for StubSupervisorAPIClient { - fn notify_alive(&self) { - warn!("StubSupervisorAPIClient: notify_alive called"); - } -} - #[allow(dead_code)] #[derive(Copy, Clone)] -enum Checks { +pub(crate) enum Checks { WorkerCheckpoint, } @@ -159,33 +143,9 @@ impl From for u32 { } } -#[allow(dead_code)] -pub(super) struct ScoreSupervisorAPIClient { - supervisor_link: monitor_rs::Monitor, -} - -unsafe impl Send for ScoreSupervisorAPIClient {} // Just assuming it's safe to send across threads, this is a temporary solution - -#[allow(dead_code)] -impl ScoreSupervisorAPIClient { - pub fn new() -> Self { - let value = std::env::var("IDENTIFIER").expect("IDENTIFIER env not set"); - debug!("ScoreSupervisorAPIClient: Creating with IDENTIFIER={}", value); - // This is only temporary usage so unwrap is fine here. - let supervisor_link = monitor_rs::Monitor::::new(&value).expect("Failed to create supervisor_link"); - Self { supervisor_link } - } -} -impl SupervisorAPIClient for ScoreSupervisorAPIClient { - fn notify_alive(&self) { - self.supervisor_link.report_checkpoint(Checks::WorkerCheckpoint); - } -} - #[score_testing_macros::test_mod_with_log] #[cfg(test)] mod tests { - use crate::{ deadline::{DeadlineMonitor, DeadlineMonitorBuilder}, protected_memory::ProtectedMemoryAllocator, @@ -238,6 +198,7 @@ mod tests { fn monitoring_logic_report_error_when_deadline_failed() { let deadline_monitor = create_monitor_with_deadlines(); let alive_mock = MockSupervisorAPIClient::new(); + let hmon_starting_point = Instant::now(); let mut logic = MonitoringLogic::new( { @@ -254,7 +215,7 @@ mod tests { drop(handle); - assert!(!logic.run()); + assert!(!logic.run(hmon_starting_point)); assert_eq!(alive_mock.get_notify_count(), 0); } @@ -262,6 +223,7 @@ mod tests { fn monitoring_logic_report_alive_on_each_call_when_no_error() { let deadline_monitor = create_monitor_with_deadlines(); let 
alive_mock = MockSupervisorAPIClient::new(); + let hmon_starting_point = Instant::now(); let mut logic = MonitoringLogic::new( { @@ -276,11 +238,11 @@ mod tests { let mut deadline = deadline_monitor.get_deadline(&IdentTag::from("deadline_long")).unwrap(); let _handle = deadline.start().unwrap(); - assert!(logic.run()); - assert!(logic.run()); - assert!(logic.run()); - assert!(logic.run()); - assert!(logic.run()); + assert!(logic.run(hmon_starting_point)); + assert!(logic.run(hmon_starting_point)); + assert!(logic.run(hmon_starting_point)); + assert!(logic.run(hmon_starting_point)); + assert!(logic.run(hmon_starting_point)); assert_eq!(alive_mock.get_notify_count(), 5); } @@ -289,6 +251,7 @@ mod tests { fn monitoring_logic_report_alive_respect_cycle() { let deadline_monitor = create_monitor_with_deadlines(); let alive_mock = MockSupervisorAPIClient::new(); + let hmon_starting_point = Instant::now(); let mut logic = MonitoringLogic::new( { @@ -304,19 +267,19 @@ mod tests { let _handle = deadline.start().unwrap(); std::thread::sleep(core::time::Duration::from_millis(30)); - assert!(logic.run()); + assert!(logic.run(hmon_starting_point)); std::thread::sleep(core::time::Duration::from_millis(30)); - assert!(logic.run()); + assert!(logic.run(hmon_starting_point)); std::thread::sleep(core::time::Duration::from_millis(30)); - assert!(logic.run()); + assert!(logic.run(hmon_starting_point)); std::thread::sleep(core::time::Duration::from_millis(30)); - assert!(logic.run()); + assert!(logic.run(hmon_starting_point)); std::thread::sleep(core::time::Duration::from_millis(30)); - assert!(logic.run()); + assert!(logic.run(hmon_starting_point)); assert_eq!(alive_mock.get_notify_count(), 5); }