diff --git a/Cargo.Bazel.json.lock b/Cargo.Bazel.json.lock index 96ba8cd0936b..68d3c4f1bde1 100644 --- a/Cargo.Bazel.json.lock +++ b/Cargo.Bazel.json.lock @@ -1,5 +1,5 @@ { - "checksum": "7f96e5c133c203870fc997d52e080f72044bd53f1530c5332ba2230232e22d68", + "checksum": "ff7f44505ebc5f13c6c3d5c53d2dffc80669e43460f22816aff45ab32ec31178", "crates": { "abnf 0.12.0": { "name": "abnf", @@ -37763,6 +37763,18 @@ ] } } + }, + { + "Binary": { + "crate_name": "ic-gateway", + "crate_root": "src/main.rs", + "srcs": { + "allow_empty": true, + "include": [ + "**/*.rs" + ] + } + } } ], "library_target_name": "ic_gateway", @@ -100978,6 +100990,7 @@ }, "binary_crates": [ "canbench 0.4.1", + "ic-gateway 0.2.0", "ic-wasm 0.9.11", "metrics-proxy 0.1.0" ], diff --git a/Cargo.lock b/Cargo.lock index 324d36b00ebe..d9763c7e79dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2739,6 +2739,8 @@ dependencies = [ "ic_consensus_system_test_utils", "ic_consensus_threshold_sig_system_test_utils", "registry-canister", + "reqwest", + "serde_cbor", "slog", "tempfile", "tokio", diff --git a/bazel/rust.MODULE.bazel b/bazel/rust.MODULE.bazel index ce659d446c2c..4dec0002247d 100644 --- a/bazel/rust.MODULE.bazel +++ b/bazel/rust.MODULE.bazel @@ -2179,6 +2179,10 @@ crate.annotation( crate = "metrics-proxy", gen_binaries = ["metrics-proxy"], ) +crate.annotation( + crate = "ic-gateway", + gen_binaries = ["ic-gateway"], +) crate.splicing_config( resolver_version = "2", ) diff --git a/ic-os/components/guestos.bzl b/ic-os/components/guestos.bzl index a074c864aaf4..bfe19faf31fc 100644 --- a/ic-os/components/guestos.bzl +++ b/ic-os/components/guestos.bzl @@ -53,6 +53,7 @@ def component_files(mode): Label("guestos/remote-attestation-server.service"): "/etc/systemd/system/remote-attestation-server.service", Label("guestos/generate-ic-config/generate-ic-config.service"): "/etc/systemd/system/generate-ic-config.service", Label("guestos/share/ic-boundary.env"): "/opt/ic/share/ic-boundary.env", + Label("guestos/share/ic-gateway.env"): "/opt/ic/share/ic-gateway.env", Label("guestos/share/nns_public_key.pem"): "/opt/ic/share/nns_public_key.pem", # init diff --git a/ic-os/components/guestos/ic-replica.service b/ic-os/components/guestos/ic-replica.service index 71cde05416f5..7657e20e4e4b 100644 --- a/ic-os/components/guestos/ic-replica.service +++ b/ic-os/components/guestos/ic-replica.service @@ -20,7 +20,7 @@ User=ic-replica Environment=RUST_BACKTRACE=1 Environment=RUST_MIN_STACK=8192000 -ExecStart=/opt/ic/bin/orchestrator --replica-binary-dir /var/lib/ic/data/images --cup-dir /var/lib/ic/data/cups --replica-config-file /run/ic-node/config/ic.json5 --enable-provisional-registration --ic-binary-directory /opt/ic/bin --orchestrator-data-directory /var/lib/ic/data/orchestrator --version-file /opt/ic/share/version.txt +ExecStart=/opt/ic/bin/orchestrator --replica-binary-dir /var/lib/ic/data/images --cup-dir /var/lib/ic/data/cups --replica-config-file /run/ic-node/config/ic.json5 --ic-boundary-env-file /opt/ic/share/ic-boundary.env --ic-gateway-env-file /opt/ic/share/ic-gateway.env --enable-provisional-registration --ic-binary-directory /opt/ic/bin --orchestrator-data-directory /var/lib/ic/data/orchestrator --version-file /opt/ic/share/version.txt LimitNOFILE=16777216 Restart=always RestartSec=10 diff --git a/ic-os/components/guestos/share/ic-gateway.env b/ic-os/components/guestos/share/ic-gateway.env new file mode 100644 index 000000000000..67aef4b25b29 --- /dev/null +++ b/ic-os/components/guestos/share/ic-gateway.env @@ -0,0 +1,5 @@ +LISTEN_PLAIN=[::]:80 +LISTEN_INSECURE_SERVE_HTTP_ONLY=true +METRICS_LISTEN=[::]:9314 +IC_URL=http://127.0.0.1:8080 +DOMAIN=gateway.icp diff --git a/ic-os/guestos/defs.bzl b/ic-os/guestos/defs.bzl index 19ee4989fdcd..880484ad4cb5 100644 --- a/ic-os/guestos/defs.bzl +++ b/ic-os/guestos/defs.bzl @@ -40,6 +40,7 @@ def image_deps(mode, malicious = False): "//publish/binaries:orchestrator": "/opt/ic/bin/orchestrator:0755", # Replica process manager, required by the IC protocol (upgrades, node addition, etc). ("//publish/malicious:replica" if malicious else "//publish/binaries:replica"): "/opt/ic/bin/replica:0755", # Main protocol binary, required by the IC protocol. Installs the malicious replica iff set only in test builds. "//publish/binaries:ic-boundary": "/opt/ic/bin/ic-boundary:0755", # API boundary node binary, required by the IC protocol. The same GuestOS is used both for the replica and API boundary nodes. + "//rs/ic_os/release:ic-gateway": "/opt/ic/bin/ic-gateway:0755", # IC-gateway binary, required by cloud engine nodes, who run it as a sidecar to the replica. "//publish/binaries:ic-consensus-pool-util": "/opt/ic/bin/ic-consensus-pool-util:0755", # May be used during recoveries to export/import consensus pool artifacts. "//publish/binaries:ic-recovery": "/opt/ic/bin/ic-recovery:0755", # Required for performing subnet recoveries on the node directly. "//publish/binaries:state-tool": "/opt/ic/bin/state-tool:0755", # May be used during recoveries for calculating the state hash and inspecting the state more generally. diff --git a/ic-os/guestos/envs/prod/BUILD.bazel b/ic-os/guestos/envs/prod/BUILD.bazel index decf8bbc02c9..d1b0c0c2ff28 100644 --- a/ic-os/guestos/envs/prod/BUILD.bazel +++ b/ic-os/guestos/envs/prod/BUILD.bazel @@ -18,19 +18,19 @@ icos_images = icos_build( file_size_check( name = "disk_img_size_check", file = icos_images.disk_image, - max_file_size = 450 * 1000 * 1000, # 419 MB on 2025-03-21 + max_file_size = 475 * 1000 * 1000, # 453 MB on 2026-06-18 ) file_size_check( name = "update_img_size_check", file = icos_images.update_image, - max_file_size = 450 * 1000 * 1000, # 416 MB on 2025-03-21 + max_file_size = 475 * 1000 * 1000, # 451 MB on 2026-06-18 ) file_size_check( name = "update_img_test_size_check", file = icos_images.update_image_test, - max_file_size = 450 * 1000 * 1000, # 417 MB on 2025-06-26 + max_file_size = 475 * 1000 * 1000, # 451 MB on 2026-06-18 ) # Export checksums & build artifacts diff --git a/ic-os/guestos/envs/recovery/BUILD.bazel b/ic-os/guestos/envs/recovery/BUILD.bazel index 00d32b16d7fc..ea84ce367abf 100644 --- a/ic-os/guestos/envs/recovery/BUILD.bazel +++ b/ic-os/guestos/envs/recovery/BUILD.bazel @@ -21,7 +21,7 @@ icos_images = icos_build( file_size_check( name = "disk_img_size_check", file = icos_images.disk_image, - max_file_size = 450 * 1000 * 1000, # 419 MB on 2025-06-26 + max_file_size = 475 * 1000 * 1000, # 453 MB on 2026-06-18 tags = [ "manual", "no-cache", @@ -31,7 +31,7 @@ file_size_check( file_size_check( name = "update_img_size_check", file = icos_images.update_image, - max_file_size = 450 * 1000 * 1000, # 417 MB on 2025-06-26 + max_file_size = 475 * 1000 * 1000, # 451 MB on 2026-06-18 tags = [ "manual", "no-cache", @@ -41,7 +41,7 @@ file_size_check( file_size_check( name = "update_img_test_size_check", file = icos_images.update_image_test, - max_file_size = 450 * 1000 * 1000, # 417 MB on 2025-06-26 + max_file_size = 475 * 1000 * 1000, # 451 MB on 2026-06-18 tags = [ "manual", "no-cache", diff --git a/rs/ic_os/release/BUILD.bazel b/rs/ic_os/release/BUILD.bazel index 27a70f3f07f0..1277aec21795 100644 --- a/rs/ic_os/release/BUILD.bazel +++ b/rs/ic_os/release/BUILD.bazel @@ -18,6 +18,7 @@ OBJECTS = { "metrics-proxy": "@crate_index//:metrics-proxy__metrics-proxy", "nss_icos": "//rs/ic_os/networking/nss_icos", "custom_metrics": "//rs/ic_os/metrics/custom_metrics:custom_metrics_bin", + "ic-gateway": "@crate_index//:ic-gateway__ic-gateway", } [release_strip_binary( diff --git a/rs/orchestrator/src/args.rs b/rs/orchestrator/src/args.rs index 4ea245933fa9..43e35f980dc5 100644 --- a/rs/orchestrator/src/args.rs +++ b/rs/orchestrator/src/args.rs @@ -27,11 +27,19 @@ pub struct OrchestratorArgs { #[clap(long)] pub(crate) replica_config_file: PathBuf, + /// The path to the IC boundary environment file + #[clap(long)] + pub(crate) ic_boundary_env_file: PathBuf, + + /// The path to the IC gateway environment file + #[clap(long)] + pub(crate) ic_gateway_env_file: PathBuf, + /// The path to the Replica binary location containing the following in case - /// of guest OS deployment: version.txt, manageboot.sh, replica, + /// of guest OS deployment: replica, ic-boundary, ic-gateway, manageboot.sh, /// install-upgrade.sh #[clap(long)] - pub(crate) ic_binary_directory: Option, + pub(crate) ic_binary_directory: PathBuf, /// If not set, the default listen addr (0.0.0.0:[`PROMETHEUS_HTTP_PORT`]) /// will be used to export metrics. diff --git a/rs/orchestrator/src/boundary_node.rs b/rs/orchestrator/src/boundary_node.rs index 895592ec5337..0394d218c517 100644 --- a/rs/orchestrator/src/boundary_node.rs +++ b/rs/orchestrator/src/boundary_node.rs @@ -1,80 +1,35 @@ use crate::{ - error::{OrchestratorError, OrchestratorResult}, - metrics::OrchestratorMetrics, - process_manager::{Process, ProcessManager, ProcessManagerImpl}, + error::OrchestratorError, + process_manager::Process, + processes::{IcBoundaryManager, IcBoundaryProcess}, registry_helper::RegistryHelper, }; -use ic_config::crypto::CryptoConfig; -use ic_logger::{ReplicaLogger, info, warn}; +use ic_logger::{ReplicaLogger, warn}; use ic_types::{NodeId, ReplicaVersion}; -use std::{ - collections::HashMap, - ffi::OsString, - path::{Path, PathBuf}, - sync::{Arc, Mutex}, -}; - -struct BoundaryNodeProcess { - version: ReplicaVersion, - binary: PathBuf, - args: Vec, - env: HashMap, -} - -impl Process for BoundaryNodeProcess { - const NAME: &'static str = "Boundary Node"; - - type Version = ReplicaVersion; - - fn get_version(&self) -> &Self::Version { - &self.version - } - - fn get_binary(&self) -> &Path { - &self.binary - } - - fn get_args(&self) -> &[OsString] { - &self.args - } - - fn get_env(&self) -> HashMap { - self.env.clone() - } -} +use std::sync::Arc; pub(crate) struct BoundaryNodeManager { registry: Arc, - _metrics: Arc, - process: Arc>>, - ic_binary_dir: PathBuf, - crypto_config: CryptoConfig, + process_manager: IcBoundaryManager, version: ReplicaVersion, - logger: ReplicaLogger, node_id: NodeId, - domain_name: Option, + logger: ReplicaLogger, } impl BoundaryNodeManager { pub(crate) fn new( registry: Arc, - metrics: Arc, + process_manager: IcBoundaryManager, version: ReplicaVersion, node_id: NodeId, - ic_binary_dir: PathBuf, - crypto_config: CryptoConfig, logger: ReplicaLogger, ) -> Self { Self { registry, - _metrics: metrics, - process: Arc::new(Mutex::new(ProcessManagerImpl::new(logger.clone()))), - ic_binary_dir, - crypto_config, + process_manager, version, logger, node_id, - domain_name: None, } } @@ -94,120 +49,37 @@ impl BoundaryNodeManager { ); // NOTE: We could also shutdown the boundary node here. However, it makes sense to continue // serving requests while the orchestrator is downloading the new image in most cases. - } else { - match self.registry.get_node_domain_name(registry_version) { - Ok(Some(domain_name)) => { - let domain_name = Some(domain_name); - - // stop ic-boundary when the domain name changes and start it again. - if domain_name != self.domain_name { - if let Err(err) = self.ensure_boundary_node_stopped() { - warn!(self.logger, "Failed to stop Boundary Node: {}", err); - } - self.domain_name = domain_name; - } - - // make sure the boundary node is running - if let Err(err) = self.ensure_boundary_node_running(&self.version) { - warn!(self.logger, "Failed to start Boundary Node: {}", err); - } - } - // BN should not be active when the node doesn't have a domain name - Ok(None) => { - warn!( - self.logger, - "There is no domain associated with the node, while this is a requirement for the API boundary node. Shutting ic-boundary down." - ); - if let Err(err) = self.ensure_boundary_node_stopped() { - warn!(self.logger, "Failed to stop Boundary Node: {}", err); - } - self.domain_name = None; - } - // Failing to read the registry - Err(err) => warn!( - self.logger, - "Failed to fetch Boundary Node domain name: {}", err - ), - } + } else if let Err(err) = self + .process_manager + .ensure_ic_boundary_running_and_restarted_on_domain_change( + self.version.clone(), + registry_version, + ) + { + warn!( + self.logger, + "Failed to ensure {} is running: {}", + IcBoundaryProcess::NAME, + err + ); } } // BN should not be active Err(OrchestratorError::ApiBoundaryNodeMissingError(_, _)) => { - if let Err(err) = self.ensure_boundary_node_stopped() { - warn!(self.logger, "Failed to stop Boundary Node: {}", err); + if let Err(err) = self.process_manager.stop() { + warn!( + self.logger, + "Failed to stop {}: {}", + IcBoundaryProcess::NAME, + err + ); } } // Failing to read the registry Err(err) => warn!( self.logger, - "Failed to fetch Boundary Node version: {}", err + "Failed to fetch API Boundary Node version: {}", err ), } } - - /// Start the current boundary node process - fn ensure_boundary_node_running(&self, version: &ReplicaVersion) -> OrchestratorResult<()> { - let mut process = self.process.lock().unwrap(); - - if process.is_running() { - return Ok(()); - } - info!(self.logger, "Starting new boundary node process"); - - let binary = self.ic_binary_dir.join("ic-boundary"); - - let domain_name = self - .domain_name - .as_ref() - .ok_or_else(|| OrchestratorError::DomainNameMissingError(self.node_id))?; - - let env = match env_file_reader::read_file("/opt/ic/share/ic-boundary.env") { - Ok(env) => env - .into_iter() - .map(|(k, v)| (OsString::from(k), OsString::from(v))) - .collect(), - Err(e) => { - return Err(OrchestratorError::IoError( - "unable to read ic-boundary environment variables".to_string(), - e, - )); - } - }; - - let args = vec![ - format!("--tls-hostname={}", domain_name).into(), - format!( - "--crypto-config={}", - serde_json::to_string(&self.crypto_config) - .map_err(OrchestratorError::SerializeCryptoConfigError)? - ) - .into(), - ]; - - process - .start(BoundaryNodeProcess { - version: version.clone(), - binary, - args, - env, - }) - .map_err(|e| { - OrchestratorError::IoError( - "Error when attempting to start new boundary node".into(), - e, - ) - }) - } - - /// Stop the current boundary node process. - fn ensure_boundary_node_stopped(&self) -> OrchestratorResult<()> { - let mut process = self.process.lock().unwrap(); - if process.is_running() { - return process.stop().map_err(|e| { - OrchestratorError::IoError("Error when attempting to stop boundary node".into(), e) - }); - } - - Ok(()) - } } diff --git a/rs/orchestrator/src/dashboard.rs b/rs/orchestrator/src/dashboard.rs index 71c3b038be73..0ec22e5b6a86 100644 --- a/rs/orchestrator/src/dashboard.rs +++ b/rs/orchestrator/src/dashboard.rs @@ -1,7 +1,7 @@ use crate::{ catch_up_package_provider::LocalCUPReader, orchestrator::SubnetAssignment, - process_manager::ProcessManager, registry_helper::RegistryHelper, - ssh_access_manager::SshAccessParameters, upgrade::ReplicaProcess, + processes::MultipleProcessesManager, registry_helper::RegistryHelper, + ssh_access_manager::SshAccessParameters, }; pub use ic_dashboard::Dashboard; use ic_logger::{ReplicaLogger, info, warn}; @@ -11,7 +11,7 @@ use ic_types::{ }; use std::{ process::Command, - sync::{Arc, Mutex, RwLock}, + sync::{Arc, RwLock}, }; const ORCHESTRATOR_DASHBOARD_PORT: u16 = 7070; @@ -24,7 +24,7 @@ pub(crate) struct OrchestratorDashboard { last_applied_firewall_version: Arc>, last_applied_ipv4_config_version: Arc>, last_poll_certified_time: Arc>, - replica_process: Arc>>, + processes_manager: Arc>, subnet_assignment: Arc>, replica_version: ReplicaVersion, hostos_version: Option, @@ -45,6 +45,7 @@ impl Dashboard for OrchestratorDashboard { last poll's certified time: {}\n\ subnet id: {}\n\ replica process id: {}\n\ + ic-gateway process id: {}\n\ replica version: {}\n\ host os version: {}\n\ scheduled upgrade: {}\n\ @@ -60,7 +61,8 @@ impl Dashboard for OrchestratorDashboard { self.registry.get_latest_version().get(), self.get_last_poll_certified_time(), self.get_subnet_id(), - self.get_pid(), + self.get_replica_pid(), + self.get_ic_gateway_pid(), self.replica_version, self.hostos_version .as_ref() @@ -90,7 +92,7 @@ impl OrchestratorDashboard { last_applied_firewall_version: Arc>, last_applied_ipv4_config_version: Arc>, last_poll_certified_time: Arc>, - replica_process: Arc>>, + processes_manager: Arc>, subnet_assignment: Arc>, replica_version: ReplicaVersion, hostos_version: Option, @@ -104,7 +106,7 @@ impl OrchestratorDashboard { last_applied_firewall_version, last_applied_ipv4_config_version, last_poll_certified_time, - replica_process, + processes_manager, subnet_assignment, replica_version, hostos_version, @@ -134,8 +136,15 @@ impl OrchestratorDashboard { ) } - fn get_pid(&self) -> String { - match self.replica_process.lock().unwrap().get_pid() { + fn get_replica_pid(&self) -> String { + match self.processes_manager.read().unwrap().get_replica_pid() { + Some(pid) => pid.to_string(), + None => "None".to_string(), + } + } + + fn get_ic_gateway_pid(&self) -> String { + match self.processes_manager.read().unwrap().get_ic_gateway_pid() { Some(pid) => pid.to_string(), None => "None".to_string(), } diff --git a/rs/orchestrator/src/error.rs b/rs/orchestrator/src/error.rs index 8359d2d18c57..39de413921ec 100644 --- a/rs/orchestrator/src/error.rs +++ b/rs/orchestrator/src/error.rs @@ -79,8 +79,8 @@ pub(crate) enum OrchestratorError { /// at the given registry version. RoleError(String, RegistryVersion), - /// The given node is missing a domain name - DomainNameMissingError(NodeId), + /// The given node is missing a domain name at the given registry version. + DomainNameMissingError(NodeId, RegistryVersion), } impl OrchestratorError { @@ -171,8 +171,11 @@ impl fmt::Display for OrchestratorError { "Failed to get the role of the node at the registry version {registry_version}: {msg}" ) } - OrchestratorError::DomainNameMissingError(node_id) => { - write!(f, "Node {node_id} does not have an associated domain name") + OrchestratorError::DomainNameMissingError(node_id, registry_version) => { + write!( + f, + "Node {node_id} does not have an associated domain name at registry version {registry_version}" + ) } } } diff --git a/rs/orchestrator/src/lib.rs b/rs/orchestrator/src/lib.rs index 035fc0772b1d..b796ff1ebd74 100644 --- a/rs/orchestrator/src/lib.rs +++ b/rs/orchestrator/src/lib.rs @@ -43,6 +43,7 @@ mod ipv4_network; mod metrics; pub mod orchestrator; mod process_manager; +mod processes; mod registration; mod registry_helper; mod signer; diff --git a/rs/orchestrator/src/metrics.rs b/rs/orchestrator/src/metrics.rs index 3da83c17dcd5..66dfccaab41d 100644 --- a/rs/orchestrator/src/metrics.rs +++ b/rs/orchestrator/src/metrics.rs @@ -19,7 +19,8 @@ pub(crate) struct OrchestratorMetrics { pub(crate) critical_error_state_removal_failed: IntCounter, pub(crate) fstrim_duration: IntGauge, pub(crate) critical_error_task_failed: IntCounterVec, - pub(crate) replica_process_start_attempts: IntCounter, + pub(crate) processes_start_attempts: IntCounterVec, + pub(crate) processes_stop_attempts: IntCounterVec, } #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, EnumIter, AsRefStr)] @@ -107,9 +108,15 @@ impl OrchestratorMetrics { grouped by the task name and the reason of the failure", &["task_name", "reason"], ), - replica_process_start_attempts: metrics_registry.int_counter( - "orchestrator_replica_process_start_attempts_total", - "Number of times the replica process was attempted to be started", + processes_start_attempts: metrics_registry.int_counter_vec( + "orchestrator_processes_start_attempts_total", + "Number of times a process was attempted to be started", + &["process_name"], + ), + processes_stop_attempts: metrics_registry.int_counter_vec( + "orchestrator_processes_stop_attempts_total", + "Number of times a process was attempted to be stopped", + &["process_name"], ), } } diff --git a/rs/orchestrator/src/orchestrator.rs b/rs/orchestrator/src/orchestrator.rs index 5d7824716203..053eb54d7056 100644 --- a/rs/orchestrator/src/orchestrator.rs +++ b/rs/orchestrator/src/orchestrator.rs @@ -7,7 +7,10 @@ use crate::{ hostos_upgrade::HostosUpgrader, ipv4_network::Ipv4Configurator, metrics::OrchestratorMetrics, - process_manager::ProcessManagerImpl, + processes::{ + IcBoundaryManager, IcBoundaryProcessConfig, IcGatewayProcessConfig, + MultipleProcessesManager, ReplicaProcessConfig, + }, registration::NodeRegistration, registry_helper::RegistryHelper, ssh_access_manager::SshAccessManager, @@ -35,8 +38,8 @@ use std::{ convert::TryFrom, future::Future, net::{Ipv4Addr, Ipv6Addr, SocketAddr}, - path::{Path, PathBuf}, - sync::{Arc, Mutex, RwLock}, + path::Path, + sync::{Arc, RwLock}, thread, time::Duration, }; @@ -240,14 +243,8 @@ impl Orchestrator { Arc::clone(&crypto) as _, ); - let replica_process = Arc::new(Mutex::new(ProcessManagerImpl::new(logger.clone()))); - let ic_binary_directory = args - .ic_binary_directory - .as_ref() - .unwrap_or(&PathBuf::from("/tmp")) - .clone(); let manageboot_runner = Box::new(ManagebootRunnerImpl::new( - ic_binary_directory.join("manageboot.sh"), + args.ic_binary_directory.join("manageboot.sh"), )); // Create a read-only CUP reader that can be shared among Dashboard and Firewall @@ -264,6 +261,24 @@ impl Orchestrator { node_id, ); + let replica_process_config = ReplicaProcessConfig { + ic_binary_dir: args.ic_binary_directory.clone(), + cup_path: local_cup_reader.get_cup_path(), + replica_config_file: args.replica_config_file.clone(), + }; + let ic_gateway_process_config = IcGatewayProcessConfig { + ic_binary_dir: args.ic_binary_directory.clone(), + ic_gateway_env_file: args.ic_gateway_env_file.clone(), + }; + + let processes_manager = Arc::new(RwLock::new(MultipleProcessesManager::new( + replica_process_config, + ic_gateway_process_config, + Arc::clone(®istry), + Arc::clone(&metrics), + logger.clone(), + ))); + if args.enable_provisional_registration { // will not return until the node is registered registration.register_node().await; @@ -281,14 +296,13 @@ impl Orchestrator { Upgrade::new( Arc::clone(®istry) as _, Arc::clone(&metrics), - Arc::clone(&replica_process) as _, + Arc::clone(&processes_manager), manageboot_runner, cup_provider, Arc::clone(&subnet_assignment), replica_version.clone(), args.replica_config_file.clone(), node_id, - ic_binary_directory.clone(), Arc::clone(®istry_replicator) as _, args.replica_binary_dir.clone(), logger.clone(), @@ -324,13 +338,22 @@ impl Orchestrator { ), }; - let boundary_node = BoundaryNodeManager::new( + let ic_boundary_process_config = IcBoundaryProcessConfig { + ic_binary_dir: args.ic_binary_directory.clone(), + ic_boundary_env_file: args.ic_boundary_env_file.clone(), + crypto_config: config.crypto.clone(), + }; + let ic_boundary_manager = IcBoundaryManager::new( + ic_boundary_process_config, Arc::clone(®istry), Arc::clone(&metrics), + logger.clone(), + ); + let boundary_node = BoundaryNodeManager::new( + Arc::clone(®istry), + ic_boundary_manager, replica_version.clone(), node_id, - ic_binary_directory.clone(), - config.crypto.clone(), logger.clone(), ); @@ -347,7 +370,7 @@ impl Orchestrator { let ipv4_configurator = Ipv4Configurator::new( Arc::clone(®istry), Arc::clone(&metrics), - ic_binary_directory, + args.ic_binary_directory, logger.clone(), ); @@ -365,7 +388,7 @@ impl Orchestrator { firewall.get_last_applied_version(), ipv4_configurator.get_last_applied_version(), registry_replicator.get_latest_certified_time(), - replica_process, + processes_manager, Arc::clone(&subnet_assignment), replica_version, hostos_version.ok(), @@ -483,10 +506,11 @@ impl Orchestrator { } info!(log, "Shut down the upgrade loop"); - if let Err(e) = upgrade.stop_replica() { - warn!(log, "Failed to stop the replica process: {e}"); + if let Err(e) = upgrade.stop_children() { + warn!(log, "Failed to stop child processes: {e}"); + } else { + info!(log, "Shut down the child processes"); } - info!(log, "Shut down the replica process"); } async fn hostos_upgrade_checks( diff --git a/rs/orchestrator/src/process_manager.rs b/rs/orchestrator/src/process_manager.rs index e0ed38088494..f5472857d34c 100644 --- a/rs/orchestrator/src/process_manager.rs +++ b/rs/orchestrator/src/process_manager.rs @@ -8,17 +8,20 @@ use std::{ ffi::OsString, fmt::Debug, io::Result, - path::Path, + os::unix::process::CommandExt, + path::PathBuf, sync::{Arc, Mutex}, }; +use crate::error::OrchestratorResult; + type PIDCell = Arc>>; -/// Captures a process that should be run by the [`ProcessManager`] +/// Captures a process that should be run by a [`ProcessRunner`] pub(crate) trait Process { /// Name of the type of process /// - /// Used only for logging purposes + /// Used for logging and metrics const NAME: &'static str; /// Version type of the process @@ -27,22 +30,34 @@ pub(crate) trait Process { /// We only impose that we can check that versions are equal and have /// a debug representation type Version: Eq + Debug; + /// Static configuration of the process, such as the path to the binary + /// and static arguments. + type Config; + /// Dynamic arguments of the process, such as the subnet ID for the replica + /// (which could change across the orchestrator's lifetime). + type Args; + + /// Build a new instance of the process with the given configuration and + /// arguments. + fn build(config: &Self::Config, args: Self::Args) -> OrchestratorResult + where + Self: Sized; /// Return the version of the [`Process`] fn get_version(&self) -> &Self::Version; /// Return the path to the binary of the [`Process`] - fn get_binary(&self) -> &Path; + fn get_binary(&self) -> PathBuf; /// Return the arguments passed to the [`Process`] - fn get_args(&self) -> &[OsString]; + fn get_args(&self) -> Vec; /// Return the env vars passed to the [`Process`] fn get_env(&self) -> HashMap; } -/// Trait for managing a single versioned [`Process`] -pub(crate) trait ProcessManager: Send { +/// Trait for running a single versioned [`Process`] +pub(crate) trait ProcessRunner: Send { /// Start the given process. fn start(&mut self, process: P) -> Result<()>; @@ -57,15 +72,15 @@ pub(crate) trait ProcessManager: Send { fn get_pid(&self) -> Option; } -/// A [`ProcessManagerImpl`] manages running a single versioned [`Process`] -pub(crate) struct ProcessManagerImpl { +/// A [`SingleProcessRunner`] manages running a single versioned [`Process`] +pub(crate) struct SingleProcessRunner { process: Option

, pid_cell: PIDCell, log: ReplicaLogger, join_handle: Option>, } -impl ProcessManagerImpl

{ +impl SingleProcessRunner

{ pub(crate) fn new(logger: ReplicaLogger) -> Self { Self { process: None, @@ -91,14 +106,17 @@ impl ProcessManagerImpl

{ /// running, a log message is printed. /// /// It is critical that we signal and terminate the whole - /// process group of which the [`Process`] should be the - /// leader. The process may spawn other - /// sub-processes under the same process group. For correctness - /// -- the processes may access state file paths and - /// handles -- it is important we signal the sub-processes - /// processes too. This is possible because we shall - /// restrict setgpid() in production -- by default disabled - /// by SELinux type enforcement. + /// process group of which the [`Process`] is the leader. The + /// process may spawn other sub-processes under the same process + /// group. For correctness -- the processes may access state file + /// paths and handles -- it is important we signal the sub-processes + /// too. + /// + /// We guarantee that the [`Process`] is its own process group leader + /// (so its PID equals its PGID, which is what the negation below + /// relies on) by setting its process group at spawn time via + /// `Command::process_group(0)` -- see `start`. We therefore do not + /// rely on the managed binary calling `setpgid` itself. /// /// We still depend on init to handle reaping of adopted children, /// as the orchestrator has no way of adopting or even knowing the @@ -125,7 +143,7 @@ impl ProcessManagerImpl

{ } } -impl ProcessManager

for ProcessManagerImpl

{ +impl ProcessRunner

for SingleProcessRunner

{ fn start(&mut self, process: P) -> Result<()> { // Do nothing if we're already running a process with the requested version if let Some(current_version) = self.process.as_ref().map(|p| p.get_version()) @@ -166,6 +184,16 @@ impl ProcessManager

for ProcessManagerImpl

{ let child = std::process::Command::new(process.get_binary()) .args(process.get_args()) .envs(process.get_env()) + // Put the child into a new process group of which it is the + // leader (PGID == PID). Any sub-processes it spawns inherit this + // group, which lets `kill()` reliably signal the whole group by + // negating the PID. We establish the group here, in the + // orchestrator, rather than relying on each managed binary to + // call `setpgid` itself. This is equivalent to `setpgid(0, 0)` + // run in the forked child before `exec`, while it is still in + // the orchestrator's SELinux domain -- which is permitted to set + // its own process group. + .process_group(0) .spawn()?; debug!(self.log, "Process started. Pid: {}", child.id()); self.set_pid(Pid::from_raw(child.id() as i32)); diff --git a/rs/orchestrator/src/processes.rs b/rs/orchestrator/src/processes.rs new file mode 100644 index 000000000000..68b375c05f50 --- /dev/null +++ b/rs/orchestrator/src/processes.rs @@ -0,0 +1,718 @@ +use crate::{ + error::{OrchestratorError, OrchestratorResult}, + metrics::OrchestratorMetrics, + process_manager::{Process, ProcessRunner, SingleProcessRunner}, + registry_helper::RegistryHelper, +}; +use ic_config::crypto::CryptoConfig; +use ic_logger::{ReplicaLogger, info}; +use ic_protobuf::registry::subnet::v1::SubnetType; +use ic_types::{RegistryVersion, ReplicaVersion, SubnetId}; +use nix::unistd::Pid; +use std::{collections::HashMap, ffi::OsString, path::PathBuf, sync::Arc}; + +// --------------------------------------------------------------------------- +// ReplicaProcess +// --------------------------------------------------------------------------- + +#[derive(Clone)] +pub(crate) struct ReplicaProcessConfig { + pub ic_binary_dir: PathBuf, + pub cup_path: PathBuf, + pub replica_config_file: PathBuf, +} + +pub(crate) struct ReplicaProcess { + ic_binary_dir: PathBuf, + replica_version: ReplicaVersion, + cup_path: PathBuf, + replica_config_file: PathBuf, + subnet_id: SubnetId, +} + +impl Process for ReplicaProcess { + const NAME: &'static str = "replica"; + type Version = ReplicaVersion; + type Config = ReplicaProcessConfig; + type Args = (ReplicaVersion, SubnetId); + + fn build( + config: &Self::Config, + (replica_version, subnet_id): Self::Args, + ) -> OrchestratorResult { + Ok(Self { + ic_binary_dir: config.ic_binary_dir.clone(), + replica_version, + cup_path: config.cup_path.clone(), + replica_config_file: config.replica_config_file.clone(), + subnet_id, + }) + } + + fn get_version(&self) -> &Self::Version { + &self.replica_version + } + fn get_binary(&self) -> PathBuf { + self.ic_binary_dir.join(Self::NAME) + } + fn get_args(&self) -> Vec { + vec![ + OsString::from("--replica-version"), + self.replica_version.to_string().into(), + OsString::from("--config-file"), + self.replica_config_file.clone().into(), + OsString::from("--catch-up-package"), + self.cup_path.clone().into(), + OsString::from("--force-subnet"), + self.subnet_id.to_string().into(), + ] + } + fn get_env(&self) -> HashMap { + HashMap::new() + } +} + +// --------------------------------------------------------------------------- +// IcBoundaryProcess +// --------------------------------------------------------------------------- + +#[derive(Clone)] +pub(crate) struct IcBoundaryProcessConfig { + pub ic_binary_dir: PathBuf, + pub ic_boundary_env_file: PathBuf, + pub crypto_config: CryptoConfig, +} + +pub(crate) struct IcBoundaryProcess { + ic_binary_dir: PathBuf, + replica_version: ReplicaVersion, + domain_name: String, + crypto_config: String, + env: HashMap, +} + +impl Process for IcBoundaryProcess { + const NAME: &'static str = "ic-boundary"; + type Version = ReplicaVersion; + type Config = IcBoundaryProcessConfig; + type Args = (ReplicaVersion, String); + + fn build( + config: &Self::Config, + (replica_version, domain_name): Self::Args, + ) -> OrchestratorResult { + let env = match env_file_reader::read_file(&config.ic_boundary_env_file) { + Ok(env) => env + .into_iter() + .map(|(k, v)| (OsString::from(k), OsString::from(v))) + .collect(), + Err(e) => { + return Err(OrchestratorError::IoError( + "unable to read ic-boundary environment variables".to_string(), + e, + )); + } + }; + let crypto_config = serde_json::to_string(&config.crypto_config) + .map_err(OrchestratorError::SerializeCryptoConfigError)?; + + Ok(Self { + ic_binary_dir: config.ic_binary_dir.clone(), + replica_version, + domain_name, + crypto_config, + env, + }) + } + + fn get_version(&self) -> &Self::Version { + &self.replica_version + } + fn get_binary(&self) -> PathBuf { + self.ic_binary_dir.join(Self::NAME) + } + fn get_args(&self) -> Vec { + vec![ + OsString::from("--tls-hostname"), + self.domain_name.clone().into(), + OsString::from("--crypto-config"), + self.crypto_config.clone().into(), + ] + } + fn get_env(&self) -> HashMap { + self.env.clone() + } +} + +// --------------------------------------------------------------------------- +// IcGatewayProcess +// --------------------------------------------------------------------------- + +#[derive(Clone)] +pub(crate) struct IcGatewayProcessConfig { + pub ic_binary_dir: PathBuf, + pub ic_gateway_env_file: PathBuf, +} + +pub(crate) struct IcGatewayProcess { + ic_binary_dir: PathBuf, + replica_version: ReplicaVersion, + env: HashMap, +} + +impl Process for IcGatewayProcess { + const NAME: &'static str = "ic-gateway"; + type Version = ReplicaVersion; + type Config = IcGatewayProcessConfig; + type Args = ReplicaVersion; + + fn build(config: &Self::Config, replica_version: Self::Args) -> OrchestratorResult { + let env = match env_file_reader::read_file(&config.ic_gateway_env_file) { + Ok(env) => env + .into_iter() + .map(|(k, v)| (OsString::from(k), OsString::from(v))) + .collect(), + Err(e) => { + return Err(OrchestratorError::IoError( + "unable to read ic-gateway environment variables".to_string(), + e, + )); + } + }; + + Ok(Self { + ic_binary_dir: config.ic_binary_dir.clone(), + replica_version, + env, + }) + } + + fn get_version(&self) -> &Self::Version { + &self.replica_version + } + fn get_binary(&self) -> PathBuf { + self.ic_binary_dir.join(Self::NAME) + } + fn get_args(&self) -> Vec { + vec![] + } + fn get_env(&self) -> HashMap { + self.env.clone() + } +} + +// --------------------------------------------------------------------------- +// ProcessManager

+// +// This struct offers common boilerplate functionality logic to ensure a process +// is running and to stop it, converting errors to [`OrchestratorError`], logging +// them, and updating metrics. +// --------------------------------------------------------------------------- + +pub(crate) struct ProcessManager { + process_runner: Box + Sync>, + process_config: P::Config, + metrics: Arc, + logger: ReplicaLogger, +} + +impl ProcessManager

{ + /// Used in tests to inject a mock ProcessRunner. + #[cfg(test)] + pub(crate) fn new_for_test( + process_runner: Box + Sync>, + process_config: P::Config, + metrics: Arc, + logger: ReplicaLogger, + ) -> Self { + Self { + process_runner, + process_config, + metrics, + logger, + } + } + + pub(crate) fn new( + process_config: P::Config, + metrics: Arc, + logger: ReplicaLogger, + ) -> Self { + let process_runner = Box::new(SingleProcessRunner::new(logger.clone())); + Self { + process_config, + process_runner, + metrics, + logger, + } + } + + pub(crate) fn ensure_running(&mut self, args: P::Args) -> OrchestratorResult<()> { + if self.process_runner.is_running() { + return Ok(()); + } + + let process = P::build(&self.process_config, args)?; + info!(self.logger, "Starting new {} process", P::NAME); + self.metrics + .processes_start_attempts + .with_label_values(&[P::NAME]) + .inc(); + self.process_runner.start(process).map_err(|e| { + OrchestratorError::IoError( + format!("Error when attempting to start {} process", P::NAME), + e, + ) + }) + } + + pub(crate) fn stop(&mut self) -> OrchestratorResult<()> { + if !self.process_runner.is_running() { + return Ok(()); + } + + info!(self.logger, "Stopping {} process", P::NAME); + self.metrics + .processes_stop_attempts + .with_label_values(&[P::NAME]) + .inc(); + self.process_runner.stop().map_err(|e| { + OrchestratorError::IoError( + format!("Error when attempting to stop the {} process", P::NAME), + e, + ) + }) + } +} + +// --------------------------------------------------------------------------- +// IcBoundaryManager +// +// Wrapper around ProcessManager which contains additional +// logic to stop and restart the process when the node's domain name changes +// in the registry. +// --------------------------------------------------------------------------- + +pub(crate) struct IcBoundaryManager { + inner: ProcessManager, + registry: Arc, + current_domain_name: Option, +} + +impl IcBoundaryManager { + pub(crate) fn new( + config: ::Config, + registry: Arc, + metrics: Arc, + logger: ReplicaLogger, + ) -> Self { + let inner = ProcessManager::new(config, metrics, logger); + Self { + inner, + registry, + current_domain_name: None, + } + } + + // Used in tests to inject a mock ProcessManager. + #[cfg(test)] + pub(crate) fn new_for_test( + inner: ProcessManager, + registry: Arc, + ) -> Self { + Self { + inner, + registry, + current_domain_name: None, + } + } + + pub(crate) fn ensure_ic_boundary_running_and_restarted_on_domain_change( + &mut self, + replica_version: ReplicaVersion, + registry_version: RegistryVersion, + ) -> OrchestratorResult<()> { + let domain_name = match self.registry.get_node_domain_name(registry_version) { + Ok(domain_name) => domain_name, + Err(err @ OrchestratorError::DomainNameMissingError(_, _)) => { + // ic-boundary should not start when the node doesn't have a domain name + self.inner.stop()?; + + // Only clear the current domain name if we successfully stopped ic-boundary, so + // that we correctly detect we should first retry to stop it in case we get a new + // domain name in a next call. + self.current_domain_name = None; + return Err(err); + } + Err(err) => return Err(err), + }; + + // stop ic-boundary when the domain name changes and start it again. + if Some(&domain_name) != self.current_domain_name.as_ref() { + self.inner.stop()?; + } + + // make sure ic-boundary is running + self.inner + .ensure_running((replica_version, domain_name.clone()))?; + + // Only update the current domain name if we performed the operations above successfully, + // so that we can retry on the next call if not. + self.current_domain_name = Some(domain_name); + Ok(()) + } + + pub(crate) fn stop(&mut self) -> OrchestratorResult<()> { + self.inner.stop() + } +} + +// --------------------------------------------------------------------------- +// MultipleProcessesManager +// +// This struct manages all processes that the upgrade loop is responsible for, +// providing a single entry point for starting and stopping them according to +// the node's configuration in the registry. +// --------------------------------------------------------------------------- + +/// Whether the orchestrator is currently allowed to actually launch +/// `ic-gateway`. CloudEngine nodes *should* run `ic-gateway`, but the launch +/// is gated off for now while the rollout is being prepared. To trigger it +/// later, flip this to `true` (and re-enable the `cloud_engine_ic_gateway_test` +/// system test by removing its `manual` tag). +const IC_GATEWAY_LAUNCH_ENABLED: bool = false; + +pub(crate) struct MultipleProcessesManager { + replica_manager: ProcessManager, + ic_gateway_manager: ProcessManager, + registry: Arc, + /// Whether this manager is allowed to actually launch `ic-gateway`. + /// Sourced from [`IC_GATEWAY_LAUNCH_ENABLED`] in production; injected by + /// tests so they can exercise both gate states. + ic_gateway_launch_enabled: bool, +} + +impl MultipleProcessesManager { + #[cfg(test)] + pub(crate) fn new_for_test( + replica_manager: ProcessManager, + ic_gateway_manager: ProcessManager, + registry: Arc, + ic_gateway_launch_enabled: bool, + ) -> Self { + Self { + replica_manager, + ic_gateway_manager, + registry, + ic_gateway_launch_enabled, + } + } + + pub(crate) fn new( + replica_process_config: ReplicaProcessConfig, + ic_gateway_process_config: IcGatewayProcessConfig, + registry: Arc, + metrics: Arc, + logger: ReplicaLogger, + ) -> Self { + let replica_manager = + ProcessManager::new(replica_process_config, metrics.clone(), logger.clone()); + let ic_gateway_manager = ProcessManager::new(ic_gateway_process_config, metrics, logger); + + Self { + replica_manager, + ic_gateway_manager, + registry, + ic_gateway_launch_enabled: IC_GATEWAY_LAUNCH_ENABLED, + } + } + + // Used in tests to assert the state of the managed processes. + #[cfg(test)] + pub(crate) fn is_replica_running(&self) -> bool { + self.replica_manager.process_runner.is_running() + } + + // Used in tests to assert the state of the managed processes. + #[cfg(test)] + pub(crate) fn is_ic_gateway_running(&self) -> bool { + self.ic_gateway_manager.process_runner.is_running() + } + + pub(crate) fn get_replica_pid(&self) -> Option { + self.replica_manager.process_runner.get_pid() + } + + pub(crate) fn get_ic_gateway_pid(&self) -> Option { + self.ic_gateway_manager.process_runner.get_pid() + } + + /// Start all processes appropriate for this node. + /// If a process fails to start, continue starting the others and return the first error. + /// + /// Always starts the replica. For cloud-engine subnet nodes it also + /// starts ic-gateway. + pub(crate) fn start_all( + &mut self, + replica_version: ReplicaVersion, + subnet_id: SubnetId, + registry_version: RegistryVersion, + ) -> OrchestratorResult<()> { + let mut result = Ok(()); + result = result.and( + self.replica_manager + .ensure_running((replica_version.clone(), subnet_id)), + ); + + // Cloud-engine nodes run ic-gateway as a sidecar, but only once the + // launch is enabled (see `IC_GATEWAY_LAUNCH_ENABLED`). Until then, + // ignore it. + if self.ic_gateway_launch_enabled { + match self.registry.get_subnet_type(subnet_id, registry_version)? { + None + | Some(SubnetType::Unspecified) + | Some(SubnetType::Application) + | Some(SubnetType::System) + | Some(SubnetType::VerifiedApplication) => { + result = result.and(self.ic_gateway_manager.stop()); + } + Some(SubnetType::CloudEngine) => { + result = result.and(self.ic_gateway_manager.ensure_running(replica_version)); + } + } + } + + result + } + + /// Stop the replica process. + pub(crate) fn stop_replica(&mut self) -> OrchestratorResult<()> { + self.replica_manager.stop() + } + + /// Stop every managed process in reverse order of startup + /// If a process fails to stop, continue stopping the others and return the first error. + pub(crate) fn stop_all(&mut self) -> OrchestratorResult<()> { + let mut result = Ok(()); + if self.ic_gateway_launch_enabled { + result = result.and(self.ic_gateway_manager.stop()); + } + result = result.and(self.replica_manager.stop()); + + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + use assert_matches::assert_matches; + use ic_logger::no_op_logger; + use ic_metrics::MetricsRegistry; + use ic_registry_client_fake::FakeRegistryClient; + use ic_registry_client_helpers::node_operator::NodeRecord; + use ic_registry_keys::make_node_record_key; + use ic_registry_proto_data_provider::ProtoRegistryDataProvider; + use ic_test_utilities_types::ids::NODE_1; + use std::{path::Path, sync::Mutex}; + use tempfile::tempdir; + + const REPLICA_VERSION: &str = "replica_version_0.1"; + + /// Counters recorded by [`RecordingRunner`], so tests can assert whether + /// (and how often) the managed process was started/stopped. + #[derive(Default)] + struct RunnerLog { + running: bool, + starts: usize, + stops: usize, + } + + /// A `ProcessRunner` fake that records start/stop calls instead of spawning. + struct RecordingRunner { + log: Arc>, + } + + impl ProcessRunner

for RecordingRunner { + fn start(&mut self, _process: P) -> std::io::Result<()> { + let mut log = self.log.lock().unwrap(); + log.running = true; + log.starts += 1; + Ok(()) + } + + fn stop(&mut self) -> std::io::Result<()> { + let mut log = self.log.lock().unwrap(); + log.running = false; + log.stops += 1; + Ok(()) + } + + fn is_running(&self) -> bool { + self.log.lock().unwrap().running + } + + fn get_pid(&self) -> Option { + self.log + .lock() + .unwrap() + .running + .then_some(Pid::from_raw(12345)) + } + } + + /// Builds a registry whose node record for `NODE_1` carries the given domain + /// at each listed registry version (`None` means "no domain"). + fn registry_with_node_domains(domains: &[(u64, Option<&str>)]) -> Arc { + let data_provider = Arc::new(ProtoRegistryDataProvider::new()); + for &(version, domain) in domains { + data_provider + .add( + &make_node_record_key(NODE_1), + RegistryVersion::from(version), + Some(NodeRecord { + domain: domain.map(str::to_string), + ..Default::default() + }), + ) + .unwrap(); + } + let registry_client = Arc::new(FakeRegistryClient::new(data_provider)); + registry_client.update_to_latest_version(); + Arc::new(RegistryHelper::new(NODE_1, registry_client, no_op_logger())) + } + + /// Builds an [`IcBoundaryManager`] backed by a [`RecordingRunner`], returning + /// the manager and a handle to the runner's log. + fn ic_boundary_manager_for_test( + registry: Arc, + dir: &Path, + ) -> (IcBoundaryManager, Arc>) { + let log = Arc::new(Mutex::new(RunnerLog::default())); + let runner = Box::new(RecordingRunner { log: log.clone() }); + let env_file = dir.join("ic-boundary.env"); + std::fs::write(&env_file, b"TEST_KEY=TEST_VALUE").unwrap(); + let config = IcBoundaryProcessConfig { + ic_binary_dir: dir.to_path_buf(), + ic_boundary_env_file: env_file, + crypto_config: CryptoConfig::default(), + }; + let inner = ProcessManager::new_for_test( + runner, + config, + Arc::new(OrchestratorMetrics::new(&MetricsRegistry::new())), + no_op_logger(), + ); + let manager = IcBoundaryManager::new_for_test(inner, registry); + (manager, log) + } + + fn ensure(manager: &mut IcBoundaryManager, registry_version: u64) -> OrchestratorResult<()> { + manager.ensure_ic_boundary_running_and_restarted_on_domain_change( + ReplicaVersion::try_from(REPLICA_VERSION).unwrap(), + RegistryVersion::from(registry_version), + ) + } + + #[test] + fn ic_boundary_not_started_when_node_has_no_domain() { + let dir = tempdir().unwrap(); + let registry = registry_with_node_domains(&[(1, None)]); + let (mut manager, log) = ic_boundary_manager_for_test(registry, dir.path()); + + assert_matches!( + ensure(&mut manager, 1), + Err(OrchestratorError::DomainNameMissingError(_, _)) + ); + + let log = log.lock().unwrap(); + assert!(!log.running); + assert_eq!(log.starts, 0); + assert_eq!(log.stops, 0); + assert_eq!(manager.current_domain_name, None); + } + + #[test] + fn ic_boundary_starts_when_node_has_domain() { + let dir = tempdir().unwrap(); + let registry = registry_with_node_domains(&[(1, Some("api1.example.com"))]); + let (mut manager, log) = ic_boundary_manager_for_test(registry, dir.path()); + + ensure(&mut manager, 1).expect("ic-boundary should have started successfully"); + + let log = log.lock().unwrap(); + assert!(log.running); + assert_eq!(log.starts, 1); + assert_eq!(log.stops, 0); + assert_eq!( + manager.current_domain_name.as_deref(), + Some("api1.example.com") + ); + } + + #[test] + fn ic_boundary_not_restarted_when_domain_unchanged() { + let dir = tempdir().unwrap(); + let registry = registry_with_node_domains(&[(1, Some("api1.example.com"))]); + let (mut manager, log) = ic_boundary_manager_for_test(registry, dir.path()); + + ensure(&mut manager, 1).expect("ic-boundary should have started successfully"); + ensure(&mut manager, 1).expect("ic-boundary should have started successfully"); + + let log = log.lock().unwrap(); + assert!(log.running); + // Started once on the first call; the second call must not restart it. + assert_eq!(log.starts, 1); + assert_eq!(log.stops, 0); + assert_eq!( + manager.current_domain_name.as_deref(), + Some("api1.example.com") + ); + } + + #[test] + fn ic_boundary_restarted_when_domain_changes() { + let dir = tempdir().unwrap(); + let registry = registry_with_node_domains(&[ + (1, Some("api1.example.com")), + (2, Some("api2.example.com")), + ]); + let (mut manager, log) = ic_boundary_manager_for_test(registry, dir.path()); + + ensure(&mut manager, 1).expect("ic-boundary should have started successfully"); + ensure(&mut manager, 2).expect("ic-boundary should have started successfully"); + + let log = log.lock().unwrap(); + assert!(log.running); + // Restart on domain change: stopped once, started twice. + assert_eq!(log.starts, 2); + assert_eq!(log.stops, 1); + assert_eq!( + manager.current_domain_name.as_deref(), + Some("api2.example.com") + ); + } + + #[test] + fn ic_boundary_stopped_when_domain_is_deleted() { + let dir = tempdir().unwrap(); + let registry = registry_with_node_domains(&[(1, Some("api1.example.com")), (2, None)]); + let (mut manager, log) = ic_boundary_manager_for_test(registry, dir.path()); + + // Running with a domain ... + ensure(&mut manager, 1).expect("ic-boundary should have started successfully"); + assert!(log.lock().unwrap().running); + + // ... then the domain is removed: ic-boundary must be stopped. + assert_matches!( + ensure(&mut manager, 2), + Err(OrchestratorError::DomainNameMissingError(_, _)) + ); + + let log = log.lock().unwrap(); + assert!(!log.running); + assert_eq!(log.starts, 1); + assert_eq!(log.stops, 1); + assert_eq!(manager.current_domain_name, None); + } +} diff --git a/rs/orchestrator/src/registry_helper.rs b/rs/orchestrator/src/registry_helper.rs index 848c4fac4362..dabc18b6ece8 100644 --- a/rs/orchestrator/src/registry_helper.rs +++ b/rs/orchestrator/src/registry_helper.rs @@ -214,6 +214,16 @@ impl RegistryHelper { .map_err(OrchestratorError::RegistryClientError) } + pub(crate) fn get_subnet_type( + &self, + subnet_id: SubnetId, + version: RegistryVersion, + ) -> OrchestratorResult> { + self.registry_client + .get_subnet_type(subnet_id, version) + .map_err(OrchestratorError::RegistryClientError) + } + /// Get the replica version of the given subnet in the given registry /// version pub(crate) fn get_replica_version( @@ -375,11 +385,11 @@ impl RegistryHelper { pub(crate) fn get_node_domain_name( &self, version: RegistryVersion, - ) -> OrchestratorResult> { + ) -> OrchestratorResult { let result = self .registry_client .get_node_record(self.node_id, version)? .and_then(|node_record| node_record.domain); - Ok(result) + result.ok_or_else(|| OrchestratorError::DomainNameMissingError(self.node_id, version)) } } diff --git a/rs/orchestrator/src/upgrade.rs b/rs/orchestrator/src/upgrade.rs index bf44008d01b7..1317fb7da090 100644 --- a/rs/orchestrator/src/upgrade.rs +++ b/rs/orchestrator/src/upgrade.rs @@ -3,7 +3,7 @@ use crate::{ error::{OrchestratorError, OrchestratorResult}, metrics::OrchestratorMetrics, orchestrator::SubnetAssignment, - process_manager::{Process, ProcessManager}, + processes::MultipleProcessesManager, registry_helper::RegistryHelper, }; use async_trait::async_trait; @@ -31,10 +31,9 @@ use ic_types::{ }, }; use std::{ - collections::{BTreeMap, HashMap}, - ffi::OsString, - path::{Path, PathBuf}, - sync::{Arc, Mutex, RwLock}, + collections::BTreeMap, + path::PathBuf, + sync::{Arc, RwLock}, time::{Duration, Instant}, }; @@ -59,34 +58,6 @@ pub(crate) enum OrchestratorControlFlow { Stop, } -pub struct ReplicaProcess { - version: ReplicaVersion, - binary: PathBuf, - args: Vec, -} - -impl Process for ReplicaProcess { - const NAME: &'static str = "Replica"; - - type Version = ReplicaVersion; - - fn get_version(&self) -> &Self::Version { - &self.version - } - - fn get_binary(&self) -> &Path { - &self.binary - } - - fn get_args(&self) -> &[OsString] { - &self.args - } - - fn get_env(&self) -> HashMap { - HashMap::new() - } -} - /// Trait for the registry replicator used by the upgrade module. #[async_trait] #[cfg_attr(test, mockall::automock)] @@ -118,13 +89,12 @@ impl RegistryReplicatorForUpgrade for RegistryReplicator { pub(crate) struct Upgrade { pub registry: Arc, pub metrics: Arc, - replica_process: Arc>>, + processes_manager: Arc>, manageboot_runner: Box, cup_provider: CatchUpPackageProvider, subnet_assignment: Arc>, replica_version: ReplicaVersion, replica_config_file: PathBuf, - pub ic_binary_dir: PathBuf, pub image_path: PathBuf, registry_replicator: Arc, init_time: Instant, @@ -141,14 +111,13 @@ impl Upgrade { pub(crate) async fn new( registry: Arc, metrics: Arc, - replica_process: Arc>>, + processes_manager: Arc>, manageboot_runner: Box, cup_provider: CatchUpPackageProvider, subnet_assignment: Arc>, replica_version: ReplicaVersion, replica_config_file: PathBuf, node_id: NodeId, - ic_binary_dir: PathBuf, registry_replicator: Arc, release_content_dir: PathBuf, logger: ReplicaLogger, @@ -160,14 +129,13 @@ impl Upgrade { let value = Self { registry, metrics, - replica_process, + processes_manager, manageboot_runner, cup_provider, subnet_assignment, node_id, replica_version, replica_config_file, - ic_binary_dir, image_path: release_content_dir.join("image.bin"), registry_replicator, init_time, @@ -207,8 +175,8 @@ impl Upgrade { /// 2. Detecting if a recovery is taking place (i.e. there is a CUP in the registry with higher /// height than any available). /// 3. Downloading and upgrading to a new replica version if necessary. - /// 4. Launching the replica process if assigned to a subnet. - /// 5. Stopping the replica process and removing the node state if leaving the subnet. + /// 4. Launching the child processes if assigned to a subnet. + /// 5. Stopping the child processes and removing the node state if leaving the subnet. pub(crate) async fn check(&mut self) -> OrchestratorResult { let latest_registry_version = self.registry.get_latest_version(); @@ -357,7 +325,7 @@ impl Upgrade { // We are no longer part of the subnet. *self.subnet_assignment.write().unwrap() = SubnetAssignment::Unassigned; - self.stop_replica()?; + self.stop_children()?; self.remove_state().await.inspect_err(|_| { self.metrics.critical_error_state_removal_failed.inc(); @@ -402,8 +370,12 @@ impl Upgrade { // If it is, we restart to pass the unsigned CUP to consensus. self.stop_replica_if_new_recovery_cup(&latest_cup, old_cup_height); - // This will start a new replica process if none is running. - self.ensure_replica_is_running(&self.replica_version, subnet_id)?; + // This will start new child processes if any of them is not running + self.ensure_children_are_running( + self.replica_version.clone(), + subnet_id, + latest_registry_version, + )?; // This will trigger an image download if one is already scheduled but we did // not arrive at the corresponding CUP yet. @@ -456,10 +428,10 @@ impl Upgrade { ) .await .map_err(OrchestratorError::FileDownloadError)?; - if let Err(e) = self.stop_replica() { - // Even though we fail to stop the replica, we should still + if let Err(e) = self.stop_children() { + // Even though we fail to stop child processes, we should still // replace the registry local store, so we simply issue a warning. - warn!(self.logger, "Failed to stop replica with error {:?}", e); + warn!(self.logger, "Failed to stop children with error {:?}", e); } let new_local_store = LocalStoreImpl::new(local_store_location); self.registry_replicator @@ -551,16 +523,6 @@ impl Upgrade { .map(|Rebooting| OrchestratorControlFlow::Stop) } - /// Stop the current replica process. - pub fn stop_replica(&self) -> OrchestratorResult<()> { - self.replica_process.lock().unwrap().stop().map_err(|e| { - OrchestratorError::IoError( - "Error when attempting to stop replica during upgrade".into(), - e, - ) - }) - } - /// Ensure that an upgrade to the given `new_replica_version` should be executed. /// Returns an error if the upgrade should be delayed or blocked, for example due to the new /// replica version being recalled. @@ -632,48 +594,30 @@ impl Upgrade { ); // Restarting the replica is enough to pass the unsigned CUP forward. // If we fail, restart the current process instead. - if let Err(e) = self.stop_replica() { + if let Err(e) = self.processes_manager.write().unwrap().stop_replica() { warn!(self.logger, "Failed to stop replica with error {:?}", e); reexec_current_process(&self.logger); } } } - /// Start the replica process if not running already - fn ensure_replica_is_running( + /// Stop all child processes, including the replica. + pub fn stop_children(&self) -> OrchestratorResult<()> { + self.processes_manager.write().unwrap().stop_all() + } + + /// Start all child processes appropriate for this node. + fn ensure_children_are_running( &self, - replica_version: &ReplicaVersion, + replica_version: ReplicaVersion, subnet_id: SubnetId, + registry_version: RegistryVersion, ) -> OrchestratorResult<()> { - if self.replica_process.lock().unwrap().is_running() { - return Ok(()); - } - info!(self.logger, "Starting new replica process"); - self.metrics.replica_process_start_attempts.inc(); - let cup_path = self.cup_provider.get_cup_path(); - let replica_binary = self.ic_binary_dir.join("replica"); - let cmd = vec![ - format!("--replica-version={}", replica_version.as_ref()).into(), - format!( - "--config-file={}", - self.replica_config_file.as_path().display() - ) - .into(), - format!("--catch-up-package={}", cup_path.as_path().display()).into(), - format!("--force-subnet={}", subnet_id).into(), - ]; - - self.replica_process - .lock() - .unwrap() - .start(ReplicaProcess { - version: replica_version.clone(), - binary: replica_binary, - args: cmd, - }) - .map_err(|e| { - OrchestratorError::IoError("Error when attempting to start new replica".into(), e) - }) + self.processes_manager.write().unwrap().start_all( + replica_version, + subnet_id, + registry_version, + ) } } @@ -1151,6 +1095,11 @@ fn report_master_public_key_changed_metric( mod tests { use crate::catch_up_package_provider::LocalCUPReader; use crate::catch_up_package_provider::tests::mock_tls_config; + use crate::process_manager::{Process, ProcessRunner}; + use crate::processes::{ + IcGatewayProcess, IcGatewayProcessConfig, ProcessManager, ReplicaProcess, + ReplicaProcessConfig, + }; use super::*; use assert_matches::assert_matches; @@ -1174,7 +1123,8 @@ mod tests { use ic_protobuf::registry::subnet::v1::{CatchUpPackageContents, InitialNiDkgTranscriptRecord}; use ic_protobuf::registry::unassigned_nodes_config::v1::UnassignedNodesConfigRecord; use ic_protobuf::registry::{ - replica_version::v1::ReplicaVersionRecord, subnet::v1::SubnetRecord, + replica_version::v1::ReplicaVersionRecord, + subnet::v1::{SubnetRecord, SubnetType}, }; use ic_protobuf::types::v1 as pb; use ic_registry_client_fake::FakeRegistryClient; @@ -1225,17 +1175,30 @@ mod tests { pub fn subnet_assignment(&self) -> SubnetAssignment { *self.subnet_assignment.read().unwrap() } + + pub fn is_replica_running(&self) -> bool { + self.processes_manager.read().unwrap().is_replica_running() + } + + pub fn is_ic_gateway_running(&self) -> bool { + self.processes_manager + .read() + .unwrap() + .is_ic_gateway_running() + } } - pub(crate) struct FakeProcessManager { + /// Fake runner that tracks running state without spawning a real process. + /// Used as a drop-in for `SingleProcessRunner

` inside process managers. + pub(crate) struct FakeProcessRunner { running: bool, } - impl FakeProcessManager { + impl FakeProcessRunner { pub(crate) fn new() -> Self { Self { running: false } } } - impl ProcessManager

for FakeProcessManager { + impl ProcessRunner

for FakeProcessRunner { fn start(&mut self, _process: P) -> std::io::Result<()> { self.running = true; Ok(()) @@ -1464,12 +1427,14 @@ mod tests { data_provider: &ProtoRegistryDataProvider, registry_version: RegistryVersion, subnet_id: SubnetId, + subnet_type: SubnetType, membership: impl AsRef<[NodeId]>, replica_version: &ReplicaVersion, recalled_replica_versions: impl AsRef<[String]>, ) { let subnet_record = SubnetRecordBuilder::new() .with_membership(membership.as_ref()) + .with_subnet_type(subnet_type.try_into().unwrap()) .with_replica_version(replica_version.as_ref()) .with_recalled_replica_version_ids(recalled_replica_versions.as_ref()) .build(); @@ -1510,6 +1475,7 @@ mod tests { ) -> Upgrade { let UpgradeTestScenario { node_id, + subnet_type, current_replica_version, has_local_cup, initial_subnet_assignment, @@ -1526,26 +1492,8 @@ mod tests { let metrics = Arc::new(OrchestratorMetrics::new(&MetricsRegistry::new())); - let ic_binary_dir = dir.join("ic_binary"); - std::fs::create_dir_all(&ic_binary_dir).unwrap(); - - let replica_process = Arc::new(Mutex::new(FakeProcessManager::new())); - // Start the replica process if the test scenario indicates so - if test_scenario.was_replica_process_started_previously() { - replica_process - .lock() - .unwrap() - .start(ReplicaProcess { - version: current_replica_version.clone(), - binary: ic_binary_dir.join("replica"), - args: vec![], - }) - .unwrap(); - } - - let manageboot_runner = Box::new(FakeManagebootRunner); - let cup_dir = dir.join("cups"); + let cup_path = cup_dir.join("cup.types.v1.CatchUpPackage.pb"); std::fs::create_dir_all(&cup_dir).unwrap(); if let Some(local_cup) = has_local_cup { let cup = make_local_cup( @@ -1554,8 +1502,7 @@ mod tests { local_cup.registry_version, ); let cup_proto = pb::CatchUpPackage::from(cup); - let cup_file = cup_dir.join("cup.types.v1.CatchUpPackage.pb"); - std::fs::write(&cup_file, cup_proto.encode_to_vec()).unwrap(); + std::fs::write(&cup_path, cup_proto.encode_to_vec()).unwrap(); } let cup_provider = CatchUpPackageProvider::new( registry.clone(), @@ -1566,9 +1513,66 @@ mod tests { node_id, ); - let subnet_assignment = Arc::new(RwLock::new(initial_subnet_assignment)); - let replica_config_file = dir.join("ic.json5"); + let ic_binary_dir = dir.join("ic_binary"); + std::fs::create_dir_all(&ic_binary_dir).unwrap(); + let ic_gateway_env_file = dir.join("ic-gateway.env"); + std::fs::write(&ic_gateway_env_file, b"TEST_KEY=TEST_VALUE").unwrap(); + + let mut replica_runner = Box::new(FakeProcessRunner::new()); + let replica_process_config = ReplicaProcessConfig { + ic_binary_dir: ic_binary_dir.clone(), + cup_path, + replica_config_file: replica_config_file.clone(), + }; + let mut ic_gateway_runner = Box::new(FakeProcessRunner::new()); + let ic_gateway_process_config = IcGatewayProcessConfig { + ic_binary_dir, + ic_gateway_env_file, + }; + // Start the child processes if the test scenario indicates so + if test_scenario.were_child_processes_started_previously() { + replica_runner + .start( + ReplicaProcess::build( + &replica_process_config, + (current_replica_version.clone(), SUBNET_1), + ) + .unwrap(), + ) + .unwrap(); + if matches!(subnet_type, SubnetType::CloudEngine) { + ic_gateway_runner + .start( + IcGatewayProcess::build( + &ic_gateway_process_config, + current_replica_version.clone(), + ) + .unwrap(), + ) + .unwrap(); + } + } + let processes_manager = Arc::new(RwLock::new(MultipleProcessesManager::new_for_test( + ProcessManager::new_for_test( + replica_runner, + replica_process_config, + Arc::clone(&metrics), + logger.clone(), + ), + ProcessManager::new_for_test( + ic_gateway_runner, + ic_gateway_process_config, + Arc::clone(&metrics), + logger.clone(), + ), + Arc::clone(®istry), + /* ic_gateway_launch_enabled */ true, + ))); + + let manageboot_runner = Box::new(FakeManagebootRunner); + + let subnet_assignment = Arc::new(RwLock::new(initial_subnet_assignment)); let mut registry_replicator = MockRegistryReplicatorForUpgrade::new(); registry_replicator @@ -1597,14 +1601,13 @@ mod tests { let mut upgrade_loop = Upgrade::new( registry, metrics, - replica_process, + processes_manager, manageboot_runner, cup_provider, subnet_assignment, - current_replica_version.clone(), + current_replica_version, replica_config_file, node_id, - ic_binary_dir, Arc::new(registry_replicator), release_content_dir, logger, @@ -1686,6 +1689,8 @@ mod tests { struct UpgradeTestScenario { // Node id of the node under test node_id: NodeId, + // Subnet type of the node under test + subnet_type: SubnetType, // Current replica version of the running orchestrator current_replica_version: ReplicaVersion, // Whether the node is assigned to a subnet (<=> presence of local CUP) @@ -1729,20 +1734,13 @@ mod tests { } // Starting with an `Assigned` subnet assignment *and* successfully persisting a local CUP - // should mean that the replica process was started by a previous iteration of the upgrade + // should mean that the child processes were started by a previous iteration of the upgrade // loop. - fn was_replica_process_started_previously(&self) -> bool { + fn were_child_processes_started_previously(&self) -> bool { matches!( self.initial_subnet_assignment, SubnetAssignment::Assigned(_) - ) - && self.has_local_cup.is_some() - // TODO(CON-1630): After mocking the process management, we can remove the condition below. - // For now, we should not start the replica if a recovery CUP exists (with higher height) - // since that would try to stop the replica process, which fails in the test - // environment. - && self.has_registry_cup.as_ref().map(|(cup, _)| cup.height) - <= self.has_local_cup.as_ref().map(|cup| cup.height) + ) && self.has_local_cup.is_some() } // Returns whether the upgrade loop should call @@ -1840,6 +1838,7 @@ mod tests { &data_provider, RegistryVersion::from(1), local_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &self.current_replica_version, &recalled_replica_versions, @@ -1855,6 +1854,7 @@ mod tests { &data_provider, upgrade.registry_version, local_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -1867,6 +1867,7 @@ mod tests { &data_provider, *leaving_registry_version, local_cup.subnet_id, + self.subnet_type, vec![other_node_id], &self.current_replica_version, &recalled_replica_versions, @@ -1880,6 +1881,7 @@ mod tests { &data_provider, *leaving_registry_version, local_cup.subnet_id, + self.subnet_type, vec![other_node_id], &self.current_replica_version, &recalled_replica_versions, @@ -1889,6 +1891,7 @@ mod tests { &data_provider, upgrade.registry_version, local_cup.subnet_id, + self.subnet_type, vec![other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -1903,6 +1906,7 @@ mod tests { &data_provider, *leaving_registry_version, local_cup.subnet_id, + self.subnet_type, vec![other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -1914,6 +1918,7 @@ mod tests { &data_provider, upgrade.registry_version, local_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -1923,6 +1928,7 @@ mod tests { &data_provider, *leaving_registry_version, local_cup.subnet_id, + self.subnet_type, vec![other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -1956,6 +1962,7 @@ mod tests { &data_provider, *registry_cup_registry_version, registry_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &self.current_replica_version, &recalled_replica_versions, @@ -1969,6 +1976,7 @@ mod tests { &data_provider, *registry_cup_registry_version, registry_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &self.current_replica_version, &recalled_replica_versions, @@ -1978,6 +1986,7 @@ mod tests { &data_provider, upgrade.registry_version, registry_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -1990,6 +1999,7 @@ mod tests { &data_provider, *registry_cup_registry_version, registry_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -2007,6 +2017,7 @@ mod tests { &data_provider, *registry_cup_registry_version, registry_cup.subnet_id, + self.subnet_type, vec![self.node_id, other_node_id], &upgrade.replica_version, &recalled_replica_versions, @@ -2384,42 +2395,63 @@ mod tests { } // Returns whether the replica process should be running after the upgrade loop. - // Additionally asserts whether the orchestrator has started a *new* replica process + // Additionally asserts whether the orchestrator has started *new* child processes fn should_replica_process_be_running(&self, logs: Vec) -> bool { - let needle_has_started_new_process = "Starting new replica process"; let logs_assert = LogEntriesAssert::assert_that(logs); - let assert_has_started_new_process = || { - logs_assert - .has_only_one_message_containing(&Level::Info, needle_has_started_new_process); + let assert_has_started = |process_name: &str| { + logs_assert.has_only_one_message_containing( + &Level::Info, + &format!("Starting new {} process", process_name), + ); }; - let assert_has_not_started_new_process = || { + let assert_has_not_started = |process_name: &str| { logs_assert.has_exactly_n_messages_containing( 0, &Level::Info, - needle_has_started_new_process, + &format!("Starting new {} process", process_name), ); }; + let assert_has_started_new_processes = || { + assert_has_started(ReplicaProcess::NAME); + if matches!(self.subnet_type, SubnetType::CloudEngine) { + assert_has_started(IcGatewayProcess::NAME); + } + }; + let assert_has_not_started_new_processes = || { + assert_has_not_started(ReplicaProcess::NAME); + assert_has_not_started(IcGatewayProcess::NAME); + }; match &self.has_local_cup { Some(local_cup) => { - // If the initial subnet assignment was already `Assigned`, then the replica - // process should have been started by the previous iteration of the upgrade - // loop and should not be started again. + // If the child processes were started by the previous iteration of the upgrade + // loop, they should not be started again. // Though, if there is a recovery CUP of a higher height than the local CUP, - // then the replica process should be started again to pick up the new CUP. - let assert_has_started_new_process_if_necessary = - || match (&self.has_registry_cup, &self.initial_subnet_assignment) { - (Some((registry_cup, _)), _) - if registry_cup.height >= local_cup.height => - { - assert_has_started_new_process(); - } - (_, SubnetAssignment::Assigned(_)) => { - assert_has_not_started_new_process(); - } - (_, SubnetAssignment::Unassigned | SubnetAssignment::Unknown) => { - assert_has_started_new_process(); - } - }; + // then the *replica* process (not all children) should be started again to pick + // up the new CUP. + let assert_has_started_new_processes_if_necessary = || match ( + &self.has_registry_cup, + self.were_child_processes_started_previously(), + &self.subnet_type, + ) { + (Some((registry_cup, _)), false, SubnetType::CloudEngine) + if registry_cup.height >= local_cup.height => + { + assert_has_started(ReplicaProcess::NAME); + assert_has_started(IcGatewayProcess::NAME); + } + (Some((registry_cup, _)), _, _) + if registry_cup.height >= local_cup.height => + { + assert_has_started(ReplicaProcess::NAME); + assert_has_not_started(IcGatewayProcess::NAME); + } + (_, true, _) => { + assert_has_not_started_new_processes(); + } + (_, false, _) => { + assert_has_started_new_processes(); + } + }; let highest_height_cup = local_cup.max_height(self.has_registry_cup.as_ref().map(|(cup, _)| cup)); @@ -2428,43 +2460,43 @@ mod tests { (None, None) => { // Not leaving, so the replica process should be started only if // necessary - assert_has_started_new_process_if_necessary(); + assert_has_started_new_processes_if_necessary(); true } (None, Some(upgrade)) if highest_height_cup.registry_version < upgrade.registry_version => { // An upgrade is scheduled but the CUP's registry version has not - // reached the upgrade registry version yet, so the replica process + // reached the upgrade registry version yet, so the child processes // should be started only if not already running - assert_has_started_new_process_if_necessary(); + assert_has_started_new_processes_if_necessary(); true } (None, Some(_upgrade)) => { // An upgrade is scheduled and the CUP's registry version has reached // the upgrade registry version. // Regardless of whether the upgrade version was recalled or not, note - // that the implementation does not stop the replica process, it either + // that the implementation does not stop the child processes, it either // returns an error (if recalled) or just issues a reboot. Thus, in this - // unit test, we will assert that the replica process is in the same + // unit test, we will assert that the child processes are in the same // state as before. - assert_has_not_started_new_process(); - self.was_replica_process_started_previously() + assert_has_not_started_new_processes(); + self.were_child_processes_started_previously() } (Some(leaving_registry_version), None) if &highest_height_cup.registry_version < leaving_registry_version => { // The node is leaving the subnet, but the CUP's registry version has - // not reached the leaving registry version yet, so the replica process + // not reached the leaving registry version yet, so the child processes // should be started only if not already running - assert_has_started_new_process_if_necessary(); + assert_has_started_new_processes_if_necessary(); true } (Some(_leaving_registry_version), None) => { // The node is leaving the subnet and the CUP's registry version has // reached the leaving registry version, so we are expected to stop the - // replica process - assert_has_not_started_new_process(); + // child processes + assert_has_not_started_new_processes(); false } (Some(leaving_registry_version), Some(upgrade)) @@ -2473,9 +2505,9 @@ mod tests { < upgrade.registry_version => { // Both leaving and upgrade are scheduled, but the CUP's registry version - // has not reached either of them yet, so the replica process should be + // has not reached either of them yet, so the child processes should be // started only if not already running - assert_has_started_new_process_if_necessary(); + assert_has_started_new_processes_if_necessary(); true } (Some(leaving_registry_version), Some(_upgrade)) @@ -2484,20 +2516,20 @@ mod tests { // An upgrade is scheduled and the CUP's registry version has reached // the upgrade registry version. // Regardless of whether the upgrade version was recalled or not, note - // that the implementation does not stop the replica process, it either + // that the implementation does not stop the child processes, it either // returns an error (if recalled) or just issues a reboot. Thus, in this - // unit test, we will assert that the replica process is in the same + // unit test, we will assert that the child processes is in the same // state as before. - assert_has_not_started_new_process(); - self.was_replica_process_started_previously() + assert_has_not_started_new_processes(); + self.were_child_processes_started_previously() } (Some(_leaving_registry_version), Some(_upgrade)) => { // Both leaving and upgrade are scheduled, and the CUP's registry // version has reached the leaving registry version. Regardless of // whether the upgrade registry version has been reached, leaving the - // subnet takes precedence, and we are expected to stop the replica - // process - assert_has_not_started_new_process(); + // subnet takes precedence, and we are expected to stop the child + // processes + assert_has_not_started_new_processes(); false } } @@ -2505,8 +2537,8 @@ mod tests { None => { match &self.has_registry_cup { None => { - // Being unassigned, the replica process should not be running - assert_has_not_started_new_process(); + // Being unassigned, the child processes should not be running + assert_has_not_started_new_processes(); false } Some((registry_cup, _)) => { @@ -2515,26 +2547,26 @@ mod tests { // But there could be an upgrade scheduled in the meantime match &self.upgrade_to { None => { - // No upgrade is scheduled, so the replica process should be + // No upgrade is scheduled, so the child processes should be // *started* - assert_has_started_new_process(); + assert_has_started_new_processes(); true } Some(upgrade) if registry_cup.registry_version < upgrade.registry_version => { // An upgrade is scheduled but the CUP's registry version has - // not reached the upgrade registry version yet, so the replica - // process should be *started* - assert_has_started_new_process(); + // not reached the upgrade registry version yet, so the child + // processes should be *started* + assert_has_started_new_processes(); true } Some(_upgrade) => { // This scenario can be interpreted as the unassigned node // having a different replica version than the subnet's - // We should upgrade before actually starting the replica - // process (or return early if the version was recalled). - assert_has_not_started_new_process(); + // We should upgrade before actually starting the child + // processes (or return early if the version was recalled). + assert_has_not_started_new_processes(); false } } @@ -2577,8 +2609,8 @@ mod tests { // TODO(CON-1631): introduce distinct enum variants to better compare errors assert!(actual_error.contains(expected_error)); } - _ => { - panic!("Upgrade loop flow result does not match expected flow"); + (actual, expected) => { + panic!("Expected flow result {expected:?}, but got {actual:?}. Logs: {logs:#?}"); } } @@ -2592,8 +2624,8 @@ mod tests { // Check presence/absence of local CUP, including its height, which // tests the recovery case where the recovery CUP would overwrite the // local CUP - let cup_file = tmp_path.join("cups").join("cup.types.v1.CatchUpPackage.pb"); - let local_cup_height = std::fs::read(cup_file) + let cup_path = tmp_path.join("cups").join("cup.types.v1.CatchUpPackage.pb"); + let local_cup_height = std::fs::read(cup_path) .map(|bytes| { CatchUpPackage::try_from(&pb::CatchUpPackage::decode(&bytes[..]).unwrap()) .unwrap() @@ -2610,9 +2642,19 @@ mod tests { // Check whether the replica process is running or not assert_eq!( - upgrade_loop.replica_process.lock().unwrap().is_running(), + upgrade_loop.is_replica_running(), test_scenario.should_replica_process_be_running(logs), ); + if matches!(test_scenario.subnet_type, SubnetType::CloudEngine) { + // For Cloud Engine subnets, ic-gateway should always be running when the replica is + assert_eq!( + upgrade_loop.is_ic_gateway_running(), + upgrade_loop.is_replica_running() + ); + } else { + // For other subnets, ic-gateway should never be running + assert!(!upgrade_loop.is_ic_gateway_running()); + } // Asserting further invariants: // - Consistent flow/subnet assignment: @@ -2648,13 +2690,13 @@ mod tests { // `Assigned` AND (EITHER we are not upgrading OR the replica was // already started beforehand) assert_eq!( - upgrade_loop.replica_process.lock().unwrap().is_running(), + upgrade_loop.is_replica_running(), matches!(new_subnet_assignment, SubnetAssignment::Assigned(_)) && (matches!( flow_result, Ok(OrchestratorControlFlow::Assigned(_)) | Ok(OrchestratorControlFlow::Leaving(_)) - ) || test_scenario.was_replica_process_started_previously()) + ) || test_scenario.were_child_processes_started_previously()) ); // - As an assigned node: if new_subnet_assignment != SubnetAssignment::Unassigned { @@ -2684,6 +2726,7 @@ mod tests { #[tokio::test] async fn test_upgrade_scenarios( #[values(NODE_1)] node_id: NodeId, + #[values(SubnetType::Application, SubnetType::CloudEngine)] subnet_type: SubnetType, #[values(ReplicaVersion::try_from("replica_version_0.1").unwrap())] current_replica_version: ReplicaVersion, #[values( None, @@ -2734,10 +2777,8 @@ mod tests { #[values( None, Some(RegistryVersion::from(5)), - Some(RegistryVersion::from(10)), Some(RegistryVersion::from(50)), - Some(RegistryVersion::from(100)), - Some(RegistryVersion::from(150)) + Some(RegistryVersion::from(100)) )] is_leaving: Option, #[values(false, true)] does_upgrade: bool, @@ -2746,10 +2787,8 @@ mod tests { RegistryVersion::from(3), RegistryVersion::from(5), RegistryVersion::from(10), - RegistryVersion::from(75), RegistryVersion::from(100), - RegistryVersion::from(150), - RegistryVersion::from(175) + RegistryVersion::from(150) )] upgrade_registry_version: RegistryVersion, #[values(false, true)] upgrade_is_recalled: bool, @@ -2764,6 +2803,7 @@ mod tests { let test_scenario = UpgradeTestScenario { node_id, + subnet_type, current_replica_version, has_local_cup, has_registry_cup, @@ -2810,6 +2850,7 @@ mod tests { async fn test_ignore_recalled_versions_if_nns() { let test_scenario = UpgradeTestScenario { node_id: NODE_1, + subnet_type: SubnetType::System, current_replica_version: ReplicaVersion::try_from("replica_version_0.1").unwrap(), has_local_cup: Some(CUPScenario { height: Height::from(100), @@ -2852,6 +2893,7 @@ mod tests { async fn test_ignore_up_to_date_replicator_after_timeout() { let test_scenario = UpgradeTestScenario { node_id: NODE_1, + subnet_type: SubnetType::Application, current_replica_version: ReplicaVersion::try_from("replica_version_0.1").unwrap(), has_local_cup: Some(CUPScenario { height: Height::from(100), diff --git a/rs/tests/consensus/cup_explorer_test.rs b/rs/tests/consensus/cup_explorer_test.rs index c2b7eb32d61c..d5ace6df819d 100644 --- a/rs/tests/consensus/cup_explorer_test.rs +++ b/rs/tests/consensus/cup_explorer_test.rs @@ -219,10 +219,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica is restarted when the orchestrator observes the recovery CUP in the registry - .update_orchestrator_metrics_to_check( - "orchestrator_replica_process_start_attempts_total", - 2, - ) + .update_orchestrator_metrics_to_check("orchestrator_processes_start_attempts_total", 2) .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/orchestrator/BUILD.bazel b/rs/tests/consensus/orchestrator/BUILD.bazel index fe5959af6959..9c2f494b5a33 100644 --- a/rs/tests/consensus/orchestrator/BUILD.bazel +++ b/rs/tests/consensus/orchestrator/BUILD.bazel @@ -119,6 +119,25 @@ system_test_nns( ], ) +system_test( + name = "cloud_engine_ic_gateway_test", + # Excluded from automatic runs while ic-gateway launching is gated off in the + # orchestrator (see IC_GATEWAY_LAUNCH_ENABLED in + # rs/orchestrator/src/processes.rs). Remove this tag when enabling the launch. + tags = ["manual"], + deps = [ + # Keep sorted. + "//rs/registry/subnet_type", + "//rs/tests/driver:ic-system-test-driver", + "//rs/types/types", + "@crate_index//:anyhow", + "@crate_index//:reqwest", + "@crate_index//:serde_cbor", + "@crate_index//:slog", + "@crate_index//:url", + ], +) + system_test_nns( name = "rotate_ecdsa_idkg_key_test", tags = [ diff --git a/rs/tests/consensus/orchestrator/Cargo.toml b/rs/tests/consensus/orchestrator/Cargo.toml index bc7b0870659c..fb10f58c9309 100644 --- a/rs/tests/consensus/orchestrator/Cargo.toml +++ b/rs/tests/consensus/orchestrator/Cargo.toml @@ -27,6 +27,8 @@ ic_consensus_system_test_node_registration_test_common = { path = "../node_regis ic_consensus_system_test_utils = { path = "../utils" } ic_consensus_threshold_sig_system_test_utils = { path = "../tecdsa/utils" } registry-canister = { path = "../../../registry/canister" } +reqwest = { workspace = true } +serde_cbor = { workspace = true } slog = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } @@ -36,6 +38,10 @@ url = { workspace = true } name = "ic-systest-cup-compatibility" path = "cup_compatibility_test.rs" +[[bin]] +name = "ic-systest-cloud-engine-ic-gateway-test" +path = "cloud_engine_ic_gateway_test.rs" + [[bin]] name = "ic-systest-node-reassignment-test" path = "node_reassignment_test.rs" diff --git a/rs/tests/consensus/orchestrator/cloud_engine_ic_gateway_test.rs b/rs/tests/consensus/orchestrator/cloud_engine_ic_gateway_test.rs new file mode 100644 index 000000000000..5a77677c1005 --- /dev/null +++ b/rs/tests/consensus/orchestrator/cloud_engine_ic_gateway_test.rs @@ -0,0 +1,181 @@ +/* tag::catalog[] +Title:: Cloud engine nodes are healthy on port 80 (served by ic-gateway). + +Goal:: +Verify that, for a cloud engine subnet, every node can be asserted healthy by +querying its public API status endpoint (`/api/v2/status`) on port 80 instead of +the replica's own port 8080. + +Background:: +Unlike regular subnets, cloud engine nodes are self-contained: in addition to the +replica, the orchestrator spawns an `ic-gateway` process next to it on the very +same node, which forwards requests from port 80 to the replica's port 8080. + +Runbook:: +0. Set up an IC with one System (NNS) subnet and one cloud engine. +1. For each node in the cloud engine subnet, query `/api/v2/status` on port 80 + and assert that the replica reports `Healthy`. + +Success:: +Every cloud engine node reports a healthy status on port 80. + +end::catalog[] */ + +use anyhow::{Result, bail}; +use ic_registry_subnet_type::SubnetType; +use ic_system_test_driver::driver::group::SystemTestGroup; +use ic_system_test_driver::driver::ic::{InternetComputer, Subnet}; +use ic_system_test_driver::driver::test_env::TestEnv; +use ic_system_test_driver::driver::test_env_api::{ + HasPublicApiUrl, HasTopologySnapshot, IcNodeContainer, IcNodeSnapshot, READY_WAIT_TIMEOUT, + RETRY_BACKOFF, +}; +use ic_system_test_driver::util::block_on; +use ic_system_test_driver::{retry_with_msg_async, systest}; +use ic_types::messages::{HttpStatusResponse, ReplicaHealthStatus}; +use slog::{Logger, info}; +use std::time::Duration; +use url::Url; + +/// Port on which `ic-gateway` exposes the public API of a cloud engine node. +/// +/// Regular replicas serve their public API on port 8080. On cloud engine nodes +/// the orchestrator additionally spawns `ic-gateway` next to the replica, and +/// that process terminates the public API on port 80 (the port opened to the +/// network by the cloud-engine firewall rules). +const IC_GATEWAY_PORT: u16 = 80; + +/// Domain name that all `ic-gateway`s are configured to serve. +const IC_GATEWAY_DOMAIN: &str = "gateway.icp"; + +/// Number of nodes in the cloud engine subnet under test. +const CLOUD_ENGINE_NODES: usize = 4; + +/// Per-request timeout when polling the status endpoint. +const STATUS_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); + +fn setup(env: TestEnv) { + InternetComputer::new() + .with_api_boundary_nodes_playnet(1) + .add_fast_single_node_subnet(SubnetType::System) + .add_subnet(Subnet::fast(SubnetType::CloudEngine, CLOUD_ENGINE_NODES)) + .setup_and_start(&env) + .expect("failed to setup IC under test"); + + env.topology_snapshot().subnets().for_each(|subnet| { + subnet + .nodes() + .for_each(|node| node.await_status_is_healthy().unwrap()) + }); +} + +fn test(env: TestEnv) { + let logger = env.logger(); + let topology = env.topology_snapshot(); + + let cloud_engine_subnet = topology + .subnets() + .find(|subnet| subnet.subnet_type() == SubnetType::CloudEngine) + .expect("the topology must contain a cloud engine subnet"); + + let nodes: Vec = cloud_engine_subnet.nodes().collect(); + assert_eq!( + nodes.len(), + CLOUD_ENGINE_NODES, + "unexpected number of cloud engine nodes" + ); + + block_on(async { + for node in &nodes { + info!( + logger, + "Asserting that cloud engine node {} is healthy on port {} (ic-gateway)", + node.node_id, + IC_GATEWAY_PORT, + ); + // The standard `await_status_is_healthy` targets the replica on port + // 8080. For cloud engine nodes we instead assert health on port 80, + // which is served by the `ic-gateway` instance the orchestrator runs + // next to the replica. + await_healthy_on_ic_gateway(node, &logger) + .await + .unwrap_or_else(|err| { + panic!( + "cloud engine node {} is not healthy on port {}: {err}", + node.node_id, IC_GATEWAY_PORT, + ) + }); + } + }); + + info!( + logger, + "All {} cloud engine nodes are healthy on port {} (ic-gateway)", + nodes.len(), + IC_GATEWAY_PORT, + ); +} + +/// Polls `/api/v2/status` of `node` on [`IC_GATEWAY_PORT`] (the port served by +/// the co-located `ic-gateway`) until the replica reports itself `Healthy`. +/// +/// This mirrors the driver's standard health check (`status_is_healthy`), but +/// retargets it from port 8080 to port 80. +async fn await_healthy_on_ic_gateway(node: &IcNodeSnapshot, logger: &Logger) -> Result<()> { + let status_url = Url::parse(&format!("http://{IC_GATEWAY_DOMAIN}/api/v2/status")) + .expect("failed to parse status URL"); + + retry_with_msg_async!( + format!( + "awaiting healthy status of node {} on port {IC_GATEWAY_PORT}", + node.node_id + ), + logger, + READY_WAIT_TIMEOUT, + RETRY_BACKOFF, + || async { + let response = reqwest::Client::builder() + .timeout(STATUS_REQUEST_TIMEOUT) + .resolve( + IC_GATEWAY_DOMAIN, + (node.get_ip_addr(), IC_GATEWAY_PORT).into(), + ) + .build() + .expect("cannot build a reqwest client") + .get(status_url.clone()) + .send() + .await?; + + let status = response.status(); + let body = response + .bytes() + .await + .expect("failed to convert a response to bytes") + .to_vec(); + if status.is_client_error() || status.is_server_error() { + bail!( + "status check failed with {status}: `{}`", + String::from_utf8_lossy(&body) + ); + } + + let cbor = serde_cbor::from_slice(&body).expect("response is not encoded as cbor"); + let status_response = serde_cbor::value::from_value::(cbor) + .expect("failed to deserialize a response to HttpStatusResponse"); + + match status_response.replica_health_status { + Some(ReplicaHealthStatus::Healthy) => Ok(()), + other => bail!("replica not healthy yet, status: {other:?}"), + } + } + ) + .await +} + +fn main() -> Result<()> { + SystemTestGroup::new() + .with_setup(setup) + .add_test(systest!(test)) + .execute_from_args()?; + Ok(()) +} diff --git a/rs/tests/consensus/orchestrator/node_reassignment_test.rs b/rs/tests/consensus/orchestrator/node_reassignment_test.rs index b13945a3f708..26c16a82ab0d 100644 --- a/rs/tests/consensus/orchestrator/node_reassignment_test.rs +++ b/rs/tests/consensus/orchestrator/node_reassignment_test.rs @@ -360,10 +360,7 @@ fn main() -> Result<()> { .add_test(systest!(test)) // Some nodes change subnets twice in which case the replica process would be started // three times. - .update_orchestrator_metrics_to_check( - "orchestrator_replica_process_start_attempts_total", - 3, - ) + .update_orchestrator_metrics_to_check("orchestrator_processes_start_attempts_total", 3) .execute_from_args()?; Ok(()) diff --git a/rs/tests/consensus/replica_determinism_test.rs b/rs/tests/consensus/replica_determinism_test.rs index 13afa101aad2..39097c02dfa1 100644 --- a/rs/tests/consensus/replica_determinism_test.rs +++ b/rs/tests/consensus/replica_determinism_test.rs @@ -176,10 +176,7 @@ fn main() -> Result<()> { // TODO(DSM-118): The replica may occasionally be started 3 times (instead of the usual 2) if // it crashes again briefly during the catch-up process after the divergence. Consider reducing // this number if the underlying issue has been resolved. - .update_orchestrator_metrics_to_check( - "orchestrator_replica_process_start_attempts_total", - 3, - ) + .update_orchestrator_metrics_to_check("orchestrator_processes_start_attempts_total", 3) // One of the nodes has a corrupted state which could cause a panic in the replica like: // thread 'MR Batch Processor' (1588) panicked at rs/state_manager/src/lib.rs:1036:17: // Unexpected sandbox state for canister ... diff --git a/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_enable_chain_keys_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_enable_chain_keys_test.rs index beb193772642..1ae1902b55fc 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_enable_chain_keys_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_enable_chain_keys_test.rs @@ -12,7 +12,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_test.rs index 601cf91af225..6674a4aa038e 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_test.rs @@ -11,7 +11,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test; CupCorruption::CorruptedIncludingInvalidNiDkgId)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") // The test corrupts the CUPs, so it's expected that the following error metric will be // non-zero. .remove_metrics_to_check("orchestrator_cup_deserialization_failed_total") diff --git a/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_with_chain_keys_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_with_chain_keys_test.rs index 7e2a5eaa3933..06264be5847a 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_with_chain_keys_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_failover_nodes_with_chain_keys_test.rs @@ -13,7 +13,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_large_no_upgrade_with_chain_keys_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_large_no_upgrade_with_chain_keys_test.rs index 3f16021bb298..aff5d13b5d12 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_large_no_upgrade_with_chain_keys_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_large_no_upgrade_with_chain_keys_test.rs @@ -14,7 +14,7 @@ fn main() -> Result<()> { .with_timeout_per_test(Duration::from_secs(50 * 60)) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_enable_chain_keys_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_enable_chain_keys_test.rs index f3fd7327abf5..ea6c785d1bd6 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_enable_chain_keys_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_enable_chain_keys_test.rs @@ -13,7 +13,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test; CupCorruption::NotCorrupted)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_local_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_local_test.rs index a6b2bbb21d48..2f0c93b1acf0 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_local_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_local_test.rs @@ -11,7 +11,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_provision_write_access_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_provision_write_access_test.rs index ddf55e86284d..1e06e1597061 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_provision_write_access_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_provision_write_access_test.rs @@ -11,7 +11,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_test.rs index f60f22b1ee8d..f58420a5e281 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_test.rs @@ -11,7 +11,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_with_chain_keys_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_with_chain_keys_test.rs index 3da5ef6fc587..3b36bb38e043 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_with_chain_keys_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_no_upgrade_with_chain_keys_test.rs @@ -18,7 +18,7 @@ fn main() -> Result<()> { // non-zero. .remove_metrics_to_check("orchestrator_cup_deserialization_failed_total") // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_enable_chain_keys_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_enable_chain_keys_test.rs index 911f483b4046..9591dd26d37c 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_enable_chain_keys_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_enable_chain_keys_test.rs @@ -12,7 +12,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_test.rs index 548910bdc5fb..97249bf7418c 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_test.rs @@ -11,7 +11,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test; CupCorruption::NotCorrupted)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_with_chain_keys_test.rs b/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_with_chain_keys_test.rs index 621aa18ff680..af83521a8e2a 100644 --- a/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_with_chain_keys_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_app_same_nodes_with_chain_keys_test.rs @@ -13,7 +13,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_recovery/sr_nns_failover_nodes_test.rs b/rs/tests/consensus/subnet_recovery/sr_nns_failover_nodes_test.rs index 8f1617c01882..2774957df6df 100644 --- a/rs/tests/consensus/subnet_recovery/sr_nns_failover_nodes_test.rs +++ b/rs/tests/consensus/subnet_recovery/sr_nns_failover_nodes_test.rs @@ -62,7 +62,7 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(test)) // The replica binary is "broken" and restarted by the orchestrator multiple times - .remove_metrics_to_check("orchestrator_replica_process_start_attempts_total") + .remove_metrics_to_check("orchestrator_processes_start_attempts_total") .execute_from_args()?; Ok(()) } diff --git a/rs/tests/consensus/subnet_splitting_test.rs b/rs/tests/consensus/subnet_splitting_test.rs index ded0db973349..737dc849c491 100644 --- a/rs/tests/consensus/subnet_splitting_test.rs +++ b/rs/tests/consensus/subnet_splitting_test.rs @@ -448,9 +448,6 @@ fn main() -> Result<()> { .with_setup(setup) .add_test(systest!(subnet_splitting_test)) // The replica is restarted when the orchestrator observes the recovery CUP in the registry - .update_orchestrator_metrics_to_check( - "orchestrator_replica_process_start_attempts_total", - 2, - ) + .update_orchestrator_metrics_to_check("orchestrator_processes_start_attempts_total", 2) .execute_from_args() } diff --git a/rs/tests/driver/src/driver/group.rs b/rs/tests/driver/src/driver/group.rs index eb5d2855c75d..a177d43f611e 100644 --- a/rs/tests/driver/src/driver/group.rs +++ b/rs/tests/driver/src/driver/group.rs @@ -647,6 +647,8 @@ impl SystemTestSubGroup { } } +// Replica metrics to check by default. Including a prefix is supported and will match on all +// metrics with that prefix. For that reason, keep the list prefix-free. fn default_replica_metrics() -> BTreeMap<&'static str, u64> { BTreeMap::from([ ("critical_errors", 0), @@ -658,12 +660,14 @@ fn default_replica_metrics() -> BTreeMap<&'static str, u64> { ]) } +// Orchestrator metrics to check by default. Including a prefix is supported and will match on all +// metrics with that prefix. For that reason, keep the list prefix-free. fn default_orchestrator_metrics() -> BTreeMap<&'static str, u64> { BTreeMap::from([ ("orchestrator_cup_deserialization_failed_total", 0), ("orchestrator_state_removal_failed_total", 0), ("orchestrator_tasks_failed_total", 0), - ("orchestrator_replica_process_start_attempts_total", 1), + ("orchestrator_processes_start_attempts_total", 1), ]) } diff --git a/rs/tests/driver/src/driver/test_env_api.rs b/rs/tests/driver/src/driver/test_env_api.rs index 7c06d9df1fa3..07e5024c3bc5 100644 --- a/rs/tests/driver/src/driver/test_env_api.rs +++ b/rs/tests/driver/src/driver/test_env_api.rs @@ -1138,10 +1138,21 @@ impl IcNodeSnapshot { self.node_id ); for (name, value) in metrics { - let max_value = metrics_to_check - .get(name.split('(').next().unwrap()) - .copied() - .unwrap_or_default(); + // Assert the metrics to check are prefix-free. This allows to specify a metric name + // prefix to check all metrics with that prefix. + let mut metrics_to_check = metrics_to_check + .iter() + .filter(|(metric_name, _)| name.starts_with(**metric_name)) + .map(|(_, max_value)| *max_value); + let max_value = metrics_to_check.next().unwrap_or_default(); + // Assert that the iterator only had one element, i.e. the metrics to check are + // prefix-free. + assert!( + metrics_to_check.count() == 0, + "The metric `{name}` is not prefix-free with respect to the other metrics to check. \ + This is not allowed. Please specify a prefix-free set of metrics to check." + ); + assert!( value[0] <= max_value, "The metric `{name}` on node {} exceeded the maximum allowed value: \