diff --git a/Cargo.lock b/Cargo.lock index db3d6455f668..46f51827b2ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15565,6 +15565,8 @@ name = "ic-utils-thread" version = "0.9.0" dependencies = [ "crossbeam-channel", + "ic-metrics", + "prometheus", ] [[package]] diff --git a/rs/execution_environment/benches/100k_canisters.rs b/rs/execution_environment/benches/100k_canisters.rs index 7e3dfa01d6fb..bf4a87c441d0 100644 --- a/rs/execution_environment/benches/100k_canisters.rs +++ b/rs/execution_environment/benches/100k_canisters.rs @@ -9,7 +9,10 @@ const NUM_CANISTERS_PER_CREATOR_CANISTER: usize = 10_000; lazy_static::lazy_static! { static ref STATE_MACHINE: Arc> = { - let env = StateMachine::new(); + let mut env = StateMachine::new(); + // Don't wait for the Replicated State metrics thread every round. + env.flush_replicated_state_metrics = false; + let features = []; let wasm = canister_test::Project::cargo_bin_maybe_from_env("canister_creator_canister", &features); diff --git a/rs/execution_environment/src/lib.rs b/rs/execution_environment/src/lib.rs index 659e7fa6f5f5..c4e39a0dbc24 100644 --- a/rs/execution_environment/src/lib.rs +++ b/rs/execution_environment/src/lib.rs @@ -171,7 +171,6 @@ impl ExecutionServices { let scheduler = Box::new(SchedulerImpl::new( subnet_config.scheduler_config, config.embedders_config, - own_subnet_id, Arc::clone(&ingress_history_writer) as Arc<_>, Arc::clone(&execution_environment) as Arc<_>, Arc::clone(&cycles_account_manager), diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 03f70d819226..2621ffb30812 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -33,7 +33,6 @@ use ic_registry_subnet_type::SubnetType; use ic_replicated_state::SubnetSchedule; use ic_replicated_state::canister_state::NextExecution; use ic_replicated_state::canister_state::execution_state::NextScheduledMethod; -use ic_replicated_state::metrics::ReplicatedStateMetrics; use ic_replicated_state::page_map::PageAllocatorFileDescriptor; use ic_replicated_state::{ CanisterState, ExecutionTask, InputQueueType, NetworkTopology, ReplicatedState, @@ -43,7 +42,7 @@ use ic_types::ingress::{IngressState, IngressStatus}; use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage}; use ic_types::{ CanisterId, ComputeAllocation, ExecutionRound, MemoryAllocation, NumBytes, NumInstructions, - NumMessages, NumSlices, Randomness, ReplicaVersion, SubnetId, Time, + NumMessages, NumSlices, Randomness, ReplicaVersion, Time, }; use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles}; use more_asserts::{debug_assert_ge, debug_assert_le, debug_assert_lt}; @@ -141,12 +140,10 @@ impl SchedulerRoundLimits { pub(crate) struct SchedulerImpl { config: SchedulerConfig, hypervisor_config: HypervisorConfig, - own_subnet_id: SubnetId, ingress_history_writer: Arc>, exec_env: Arc, cycles_account_manager: Arc, metrics: Arc, - state_metrics: ReplicatedStateMetrics, log: ReplicaLogger, thread_pool: RefCell, rate_limiting_of_heap_delta: FlagStatus, @@ -159,7 +156,6 @@ impl SchedulerImpl { pub(crate) fn new( config: SchedulerConfig, hypervisor_config: HypervisorConfig, - own_subnet_id: SubnetId, ingress_history_writer: Arc>, exec_env: Arc, cycles_account_manager: Arc, @@ -174,12 +170,10 @@ impl SchedulerImpl { config, hypervisor_config, thread_pool: RefCell::new(scoped_threadpool::Pool::new(scheduler_cores)), - own_subnet_id, ingress_history_writer, exec_env, cycles_account_manager, metrics: Arc::new(SchedulerMetrics::new(metrics_registry)), - state_metrics: ReplicatedStateMetrics::new(metrics_registry), log, rate_limiting_of_heap_delta, rate_limiting_of_instructions, @@ -1100,17 +1094,12 @@ impl SchedulerImpl { } } - self.state_metrics.observe( - self.own_subnet_id, - state, - current_round.get().into(), - logger, - ); - self.check_invariants(state, current_round_type, current_round, logger); } /// Checks the DTS and subnet memory usage invariants at the end of the round. + // + // TODO(DSM-103): Move into ReplicatedStateMetrics. fn check_invariants( &self, state: &ReplicatedState, diff --git a/rs/execution_environment/src/scheduler/test_utilities.rs b/rs/execution_environment/src/scheduler/test_utilities.rs index 78788a0fd1b4..f2e01fbc758f 100644 --- a/rs/execution_environment/src/scheduler/test_utilities.rs +++ b/rs/execution_environment/src/scheduler/test_utilities.rs @@ -42,6 +42,7 @@ use ic_replicated_state::{ ReplicatedState, canister_state::execution_state::{self, WasmExecutionMode, WasmMetadata}, metadata_state::testing::NetworkTopologyTesting, + metrics::ReplicatedStateMetrics, num_bytes_try_from, page_map::TestPageAllocatorFileDescriptorImpl, testing::{CanisterQueuesTesting, ReplicatedStateTesting}, @@ -112,25 +113,26 @@ pub(crate) struct SchedulerTest { round_summary: Option, /// The amount of cycles that new canisters have by default. initial_canister_cycles: Cycles, - /// The id of the user that sends ingress messages. + /// The ID of the user that sends ingress messages. user_id: UserId, - /// The id of a canister that is guaranteed to be xnet. + /// The ID of a canister that is guaranteed to be XNet. xnet_canister_id: CanisterId, /// The actual scheduler. scheduler: SchedulerImpl, /// The fake Wasm executor. wasm_executor: Arc, - /// Registry Execution Settings. registry_settings: RegistryExecutionSettings, - /// Metrics Registry. metrics_registry: MetricsRegistry, - /// Chain key subnet public keys. + /// Replicated state metrics, updated by `SchedulerTest` after every round. + /// + /// These used to be updated by the scheduler, but state instrumentation was + /// moved into the State Manager. + state_metrics: ReplicatedStateMetrics, chain_key_subnet_public_keys: BTreeMap, /// Available pre-signatures. idkg_pre_signatures: BTreeMap, /// Version of the running replica, not the registry's Entry replica_version: ReplicaVersion, - /// Hypervisor config. hypervisor_config: HypervisorConfig, } @@ -169,6 +171,10 @@ impl SchedulerTest { &self.metrics_registry } + pub fn state_metrics(&self) -> &ReplicatedStateMetrics { + &self.state_metrics + } + pub fn canister_state_mut(&mut self, canister_id: CanisterId) -> &mut CanisterState { self.state_mut() .canister_state_make_mut(&canister_id) @@ -595,6 +601,18 @@ impl SchedulerTest { round_type, self.registry_settings(), ); + + // Explicitly observe the Replicated State metrics after every round. This used + // to be done by the scheduler, so there are a number of tests depending on + // various state metrics, but instrumentation was moved into the State Manager. + // So we "manually"update the metrics after every round. + self.state_metrics.observe( + state.metadata.own_subnet_id, + &state, + self.round.get().into(), + &self.scheduler.log, + ); + self.state = Some(state); self.check_invariants(); self.increment_round(); @@ -1078,7 +1096,6 @@ impl SchedulerTestBuilder { let scheduler = SchedulerImpl::new( self.scheduler_config, self.hypervisor_config.clone(), - self.own_subnet_id, execution_services.ingress_history_writer, execution_services.execution_environment, execution_services.cycles_account_manager, @@ -1089,6 +1106,8 @@ impl SchedulerTestBuilder { Arc::new(TestPageAllocatorFileDescriptorImpl::new()), ); + let state_metrics = ReplicatedStateMetrics::new(&self.metrics_registry); + SchedulerTest { state: Some(state), next_canister_id: 0, @@ -1101,6 +1120,7 @@ impl SchedulerTestBuilder { wasm_executor, registry_settings, metrics_registry: self.metrics_registry, + state_metrics, chain_key_subnet_public_keys, idkg_pre_signatures: BTreeMap::new(), replica_version: self.replica_version, diff --git a/rs/execution_environment/src/scheduler/tests/dts.rs b/rs/execution_environment/src/scheduler/tests/dts.rs index 24cf12ebb852..e02a7b43d4ad 100644 --- a/rs/execution_environment/src/scheduler/tests/dts.rs +++ b/rs/execution_environment/src/scheduler/tests/dts.rs @@ -39,8 +39,7 @@ fn dts_long_execution_completes() { ErrorCode::CanisterDidNotReply, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 9.0 @@ -124,8 +123,7 @@ fn cannot_execute_management_message_for_targeted_long_execution_canister() { test.execute_round(ExecutionRoundType::OrdinaryRound); assert_eq!(test.state().subnet_queues().input_queues_message_count(), 1); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 4.0 @@ -143,8 +141,7 @@ fn cannot_execute_management_message_for_targeted_long_execution_canister() { ErrorCode::CanisterDidNotReply, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 9.0 @@ -176,8 +173,7 @@ fn dts_long_execution_runs_out_of_instructions() { ErrorCode::CanisterInstructionLimitExceeded, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 9.0 @@ -509,7 +505,7 @@ fn dts_allow_only_one_long_install_code_execution_at_a_time() { 0.0 ); - let state_metrics = &test.scheduler().state_metrics; + let state_metrics = test.state_metrics(); // 2 rounds because the first canister was paused twice. assert_eq!( state_metrics @@ -549,7 +545,7 @@ fn dts_allow_only_one_long_install_code_execution_at_a_time() { ); // 3 slices for the first canister, 1 slice for the second. assert_eq!(metrics.round.slices.get_sample_sum(), 4.0); - let state_metrics = &test.scheduler().state_metrics; + let state_metrics = test.state_metrics(); // Same 2 rounds of paused install code as above. assert_eq!( state_metrics @@ -593,16 +589,14 @@ fn dts_resume_install_code_after_abort() { // After 1 + 9 rounds we had a paused install code. assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_install_code() .get_sample_sum(), 10.0 ); // After the checkpoint round we had an aborted install code. assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_aborted_install_code() .get_sample_sum(), 1.0 @@ -659,15 +653,13 @@ fn dts_resume_long_execution_after_abort() { ErrorCode::CanisterDidNotReply, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 10.0 ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_aborted_execution() .get_sample_sum(), 1.0 diff --git a/rs/execution_environment/src/scheduler/tests/metrics.rs b/rs/execution_environment/src/scheduler/tests/metrics.rs index cc2227ff80f7..eb264724905d 100644 --- a/rs/execution_environment/src/scheduler/tests/metrics.rs +++ b/rs/execution_environment/src/scheduler/tests/metrics.rs @@ -20,6 +20,7 @@ use ic_management_canister_types_private::{ use ic_registry_routing_table::CanisterIdRange; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::metadata_state::testing::NetworkTopologyTesting; +use ic_replicated_state::metrics::ReplicatedStateMetrics; use ic_replicated_state::testing::SystemStateTesting; use ic_test_utilities_metrics::{ HistogramStats, fetch_counter_vec, fetch_gauge, fetch_gauge_vec, fetch_histogram_stats, @@ -605,7 +606,7 @@ fn long_open_call_context_is_recorded() { test.execute_round(ExecutionRoundType::OrdinaryRound); - let state_metrics = &test.scheduler().state_metrics; + let state_metrics = test.state_metrics(); let gauge = state_metrics .old_open_call_contexts() .get_metric_with_label_values(&["1d"]) @@ -633,8 +634,8 @@ fn threshold_signature_agreements_metric_is_updated() { ]) .build(); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 1.into(), &no_op_logger(), @@ -797,8 +798,8 @@ fn threshold_signature_agreements_metric_is_updated() { test.execute_round(ExecutionRoundType::OrdinaryRound); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 2.into(), &no_op_logger(), @@ -857,8 +858,8 @@ fn consumed_cycles_ecdsa_outcalls_are_added_to_consumed_cycles_total() { let canister_id = test.create_canister(); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -894,8 +895,8 @@ fn consumed_cycles_ecdsa_outcalls_are_added_to_consumed_cycles_total() { .sign_with_ecdsa_contexts(); assert_eq!(sign_with_ecdsa_contexts.len(), 1); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -936,8 +937,8 @@ fn consumed_cycles_http_outcalls_are_added_to_consumed_cycles_total() { test.state_mut().metadata.own_subnet_features.http_requests = true; - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1002,8 +1003,8 @@ fn consumed_cycles_http_outcalls_are_added_to_consumed_cycles_total() { Some(NumBytes::from(response_size_limit)), ); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1130,8 +1131,8 @@ fn consumed_cycles_for_instructions_are_updated_from_valid_canisters() { .system_state .consume_cycles(removed_cycles); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1176,8 +1177,8 @@ fn consumed_cycles_for_resource_allocations_are_updated_from_valid_canisters() { test.advance_time(duration); test.charge_for_resource_allocations(); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1260,8 +1261,8 @@ fn consumed_cycles_are_updated_from_deleted_canisters() { ); test.execute_round(ExecutionRoundType::OrdinaryRound); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), diff --git a/rs/execution_environment/src/scheduler/tests/rate_limiting.rs b/rs/execution_environment/src/scheduler/tests/rate_limiting.rs index 67bd63e67208..eb3b7aa2873e 100644 --- a/rs/execution_environment/src/scheduler/tests/rate_limiting.rs +++ b/rs/execution_environment/src/scheduler/tests/rate_limiting.rs @@ -355,7 +355,7 @@ fn no_heap_delta_rate_limiting_for_system_subnet() { // Assert that we reached the subnet heap delta capacity (140 GiB) in 70 rounds. assert_ge!( - test.scheduler().state_metrics.current_heap_delta(), + test.state_metrics().current_heap_delta(), SUBNET_HEAP_DELTA_CAPACITY ); diff --git a/rs/execution_environment/tests/canister_logging.rs b/rs/execution_environment/tests/canister_logging.rs index 7fd9c7865ad5..88085ca5af7c 100644 --- a/rs/execution_environment/tests/canister_logging.rs +++ b/rs/execution_environment/tests/canister_logging.rs @@ -1821,8 +1821,8 @@ fn test_metric_canister_log_retention_seconds() { // Advance simulated time and append another record. env.advance_time(TIME_ADVANCE); let _ = env.execute_ingress(canister_id, "test", vec![]); - let after = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); + assert_eq!(after.count, before.count + 1); let sample = after.sum - before.sum; // The new sample should report at least the advanced wall-clock gap. diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index 50f2924b3c55..fabcb1120f69 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -1507,6 +1507,9 @@ impl BatchProcessor for BatchProcessorImpl, pub registry_client: Arc, pub state_manager: Arc, + /// Whether to flush the replicated state metrics channel after each round. + /// Defaults to `true` but can be overridden, e.g. for benchmarks. + pub flush_replicated_state_metrics: bool, consensus_time: Arc, ingress_pool: Arc>, ingress_manager: Arc, @@ -2332,6 +2336,7 @@ impl StateMachine { registry_data_provider, registry_client: registry_client.clone(), state_manager, + flush_replicated_state_metrics: true, consensus_time, ingress_pool, ingress_manager: ingress_manager.clone(), @@ -3074,6 +3079,11 @@ impl StateMachine { } assert_eq!(self.state_manager.latest_state_height(), batch_number); + // Wait until the enqueued `ReplicatedStateMetrics::observe()` call has been + // processed by the background metrics thread. + if self.flush_replicated_state_metrics { + self.state_manager.flush_metrics_channel(); + } self.check_critical_errors(); self.set_time(time_of_next_round.into()); diff --git a/rs/state_manager/src/lib.rs b/rs/state_manager/src/lib.rs index 26057ffd4c03..359b930b2bcc 100644 --- a/rs/state_manager/src/lib.rs +++ b/rs/state_manager/src/lib.rs @@ -50,10 +50,10 @@ use ic_metrics::{ use ic_protobuf::proxy::{ProtoProxy, ProxyDecodeError}; use ic_protobuf::{messaging::xnet::v1, state::v1 as pb}; use ic_registry_subnet_type::SubnetType; -use ic_replicated_state::page_map::PageAllocatorFileDescriptor; -use ic_replicated_state::{ - ReplicatedState, - page_map::{PersistenceError, StorageMetrics}, +use ic_replicated_state::ReplicatedState; +use ic_replicated_state::metrics::ReplicatedStateMetrics; +use ic_replicated_state::page_map::{ + PageAllocatorFileDescriptor, PersistenceError, StorageMetrics, }; use ic_state_layout::{ CheckpointLayout, CheckpointStatus, ReadOnly, StateLayout, error::LayoutError, @@ -69,7 +69,9 @@ use ic_types::{ state_sync::CURRENT_STATE_SYNC_VERSION, xnet::{CertifiedStreamSlice, StreamIndex, StreamSlice}, }; -use ic_utils_thread::{JoinOnDrop, deallocator_thread::DeallocatorThread}; +use ic_utils_thread::JoinOnDrop; +use ic_utils_thread::deallocator_thread::DeallocatorThread; +use ic_utils_thread::worker_thread::WorkerThread; use ic_wasm_types::ModuleLoadingStatus; use parking_lot::RwLockWriteGuard; use prometheus::{Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec}; @@ -192,6 +194,7 @@ pub struct StateManagerMetrics { latest_hash_tree_max_index: IntGauge, fast_forward_height: IntGauge, no_state_clone_count: IntCounter, + skipped_state_observations: IntCounter, } #[derive(Clone)] @@ -487,6 +490,11 @@ impl StateManagerMetrics { "Number of heights whose states were not cloned and not stored by this node.", ); + let skipped_state_observations = metrics_registry.int_counter( + "state_manager_skipped_state_observations", + "Number of `ReplicatedStateMetrics::observe` invocations skipped because the background metrics thread was still busy.", + ); + Self { state_manager_error_count, checkpoint_op_duration, @@ -514,6 +522,7 @@ impl StateManagerMetrics { latest_hash_tree_max_index, fast_forward_height, no_state_clone_count, + skipped_state_observations, } } @@ -933,6 +942,10 @@ const EXTRA_CHECKPOINTS_TO_KEEP: usize = 0; pub struct StateManagerImpl { log: ReplicaLogger, metrics: StateManagerMetrics, + replicated_state_metrics: Arc, + /// Runs expensive `ReplicatedStateMetrics::observe` calls in the background, + /// so that `commit_and_certify` does not have to do it on the critical path. + replicated_state_metrics_thread: WorkerThread, state_layout: StateLayout, /// The main metadata. Different threads will need to access this field. /// @@ -1328,6 +1341,9 @@ impl StateManagerImpl { max_certified_height_tx: watch::Sender, ) -> Self { let metrics = StateManagerMetrics::new(metrics_registry, log.clone()); + let replicated_state_metrics = Arc::new(ReplicatedStateMetrics::new(metrics_registry)); + let replicated_state_metrics_thread = + WorkerThread::new("StateMetrics", metrics.skipped_state_observations.clone()); let _timer = metrics .api_call_duration @@ -1625,6 +1641,8 @@ impl StateManagerImpl { Self { log, metrics, + replicated_state_metrics, + replicated_state_metrics_thread, state_layout, states, verifier, @@ -3530,6 +3548,16 @@ impl StateManager for StateManagerImpl { .send(req) .expect("failed to send tip request"); } + + // `ReplicatedStateMetrics::observe()` only updates Prometheus metrics, so defer + // it to a background thread to keep it off the critical path. + let replicated_state_metrics = Arc::clone(&self.replicated_state_metrics); + let own_subnet_id = self.own_subnet_id; + let log = self.log.clone(); + self.replicated_state_metrics_thread + .enqueue(Box::new(move || { + replicated_state_metrics.observe(own_subnet_id, &state, height, &log); + })); } fn report_diverged_checkpoint(&self, height: Height) { @@ -3662,13 +3690,13 @@ fn spawn_hash_thread( let delivered_hash = cert.signed.content.hash.as_ref(); assert_prev_hash_matches(delivered_hash, "delivered"); // If we do agree, write the certification to the metadata, so that consensus does - // not have to deliver it again. + // not have to deliver it again. certification_metadata.certification = *reference_certification; } // It's possible that we already computed this state before. We // validate that hashes agree to spot bugs causing non-determinism as - // early as possible. + // early as possible. let mut states = states.write(); if let Some(prev_metadata) = states.certifications_metadata.get(&height) { let prev_hash = &prev_metadata.certified_state_hash; @@ -4361,9 +4389,13 @@ pub mod testing { /// Testing only: Purges the `manifest` at `height` in `states.states_metadata`. fn purge_manifest(&mut self, height: Height) -> bool; - /// Testing only: Wait till deallocation queue is empty. + /// Testing only: Wait until deallocation queue is empty. fn flush_deallocation_channel(&self); + /// Testing only: Wait until all enqueued replicated state metrics observations + /// have been processed by the background metrics thread. + fn flush_metrics_channel(&self); + /// Testing only: Returns heights in `states.snapshots`. fn state_snapshot_heights(&self) -> Vec; @@ -4438,6 +4470,10 @@ pub mod testing { self.deallocator_thread.flush_deallocation_channel(); } + fn flush_metrics_channel(&self) { + self.replicated_state_metrics_thread.flush_channel(); + } + fn state_snapshot_heights(&self) -> Vec { let states = self.states.read(); states.snapshots.iter().map(|s| s.height).collect() diff --git a/rs/state_manager/tests/state_manager.rs b/rs/state_manager/tests/state_manager.rs index 39e16606e55a..fea15ceb2f49 100644 --- a/rs/state_manager/tests/state_manager.rs +++ b/rs/state_manager/tests/state_manager.rs @@ -52,7 +52,8 @@ use ic_test_utilities_consensus::fake::{Fake, FakeVerifier}; use ic_test_utilities_io::{make_mutable, make_readonly, write_all_at}; use ic_test_utilities_logger::with_test_replica_logger; use ic_test_utilities_metrics::{ - Labels, fetch_gauge, fetch_histogram_vec_stats, fetch_int_counter_vec, fetch_int_gauge, + HistogramStats, Labels, fetch_gauge, fetch_histogram_stats, fetch_histogram_vec_stats, + fetch_int_counter_vec, fetch_int_gauge, }; use ic_test_utilities_state::{arb_stream, arb_stream_slice, canister_ids}; use ic_test_utilities_tmpdir::tmpdir; @@ -9648,6 +9649,29 @@ fn commit_and_certify_panic_on_delivered_fake_certification() { }); } +#[test] +fn commit_and_certify_observes_replicated_state_metrics() { + state_manager_test(|metrics, sm| { + let state = sm.take_tip().1; + // Sanity check: metrics should be zero before any observations are made. + assert_matches!( + fetch_histogram_stats(metrics, "scheduler_canister_paused_execution"), + Some(HistogramStats { count: 0, sum: 0.0 }) + ); + + // Call `commit_and_certify` and wait for the background thread to complete the + // observation. + sm.commit_and_certify_at_height(state, Height::new(1), CertificationScope::Metadata, None); + sm.flush_metrics_channel(); + + // Metrics have now been updated. + assert_matches!( + fetch_histogram_stats(metrics, "scheduler_canister_paused_execution"), + Some(HistogramStats { count: 1, sum: 0.0 }) + ); + }); +} + #[test] fn certification_not_pruned() { state_manager_test(|metrics, sm| { diff --git a/rs/utils/thread/BUILD.bazel b/rs/utils/thread/BUILD.bazel index 3f287d30529b..355c66ac5574 100644 --- a/rs/utils/thread/BUILD.bazel +++ b/rs/utils/thread/BUILD.bazel @@ -8,7 +8,9 @@ rust_library( crate_name = "ic_utils_thread", version = "0.1.0", deps = [ + # Keep sorted. "@crate_index//:crossbeam-channel", + "@crate_index//:prometheus", ], ) @@ -16,6 +18,8 @@ rust_test( name = "thread_test", crate = ":thread", deps = [ - "@crate_index//:crossbeam-channel", + # Keep sorted. + "//rs/monitoring/metrics", + "@crate_index//:prometheus", ], ) diff --git a/rs/utils/thread/Cargo.toml b/rs/utils/thread/Cargo.toml index a2e8a30f36dc..759e7153bd35 100644 --- a/rs/utils/thread/Cargo.toml +++ b/rs/utils/thread/Cargo.toml @@ -10,3 +10,7 @@ documentation.workspace = true [dependencies] crossbeam-channel = { workspace = true } +prometheus = { workspace = true } + +[dev-dependencies] +ic-metrics = { path = "../../monitoring/metrics" } diff --git a/rs/utils/thread/src/lib.rs b/rs/utils/thread/src/lib.rs index 6793d460cf9c..22f7c271da8d 100644 --- a/rs/utils/thread/src/lib.rs +++ b/rs/utils/thread/src/lib.rs @@ -1,6 +1,7 @@ use std::thread; pub mod deallocator_thread; +pub mod worker_thread; /// An object that joins a thread when it's dropped. Mostly helpful to implement /// graceful shutdowns. diff --git a/rs/utils/thread/src/worker_thread.rs b/rs/utils/thread/src/worker_thread.rs new file mode 100644 index 000000000000..f46346ca51ee --- /dev/null +++ b/rs/utils/thread/src/worker_thread.rs @@ -0,0 +1,143 @@ +//! A background thread that runs workloads off the critical path. +//! +//! Backpressure: the worker has a single-slot channel plus the job currently +//! being processed (so at most two workloads pinned in memory at any time). If +//! a new workload arrives while both slots are full, the new workload is +//! dropped and a "skipped" counter is bumped. +//! +//! The struct is `Send + Sync` and shuts down cleanly on `Drop`: dropping the +//! sender closes the channel, the worker drains any enqueued workloads, its +//! `recv()` returns `Err`, and the `JoinOnDrop` handle joins the thread. + +use crate::JoinOnDrop; +use crossbeam_channel::{Sender, TrySendError, bounded}; +use prometheus::IntCounter; + +/// A workload to be executed by the worker thread. +type Workload = Box; + +enum Job { + Workload(Workload), + /// Test-only barrier: notify when the worker has drained all preceding + /// jobs; only used in tests. + Flush(Sender<()>), +} + +/// A worker thread that executes workloads in the background. +pub struct WorkerThread { + sender: Sender, + skipped: IntCounter, + _handle: JoinOnDrop<()>, +} + +impl WorkerThread { + pub fn new(name: &str, skipped: IntCounter) -> Self { + // At most one queued job; combined with the in-flight job that's the + // most state we'll keep alive. + let (sender, receiver) = bounded::(1); + + let handle = JoinOnDrop::new( + std::thread::Builder::new() + .name(name.to_string()) + .spawn(move || { + while let Ok(job) = receiver.recv() { + match job { + Job::Workload(workload) => { + workload(); + } + Job::Flush(notify) => { + // Best-effort notify; ignore if the receiver is + // already gone (e.g. test was aborted). + let _ = notify.send(()); + } + } + } + }) + .expect("failed to spawn worker thread"), + ); + Self { + sender, + skipped, + _handle: handle, + } + } + + /// Enqueues a workload. If the worker is busy and the channel is full, drops + /// the workload and increments the `skipped` counter rather than blocking the + /// caller. + pub fn enqueue(&self, workload: Workload) { + match self.sender.try_send(Job::Workload(workload)) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + self.skipped.inc(); + } + Err(TrySendError::Disconnected(_)) => { + // The worker thread exited; should only happen during shutdown. + } + } + } + + /// Test-only: blocks until all previously-enqueued workloads have been + /// processed. + #[doc(hidden)] + pub fn flush_channel(&self) { + let (notify_send, notify_recv) = bounded(1); + // Use a blocking send: this waits for the in-flight + queued job (if + // any) to drain, then enqueues the flush marker. + if self.sender.send(Job::Flush(notify_send)).is_err() { + // Worker is gone; nothing to flush. + return; + } + notify_recv + .recv() + .expect("worker thread exited while flushing"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; + + fn skipped_counter() -> IntCounter { + let registry = ic_metrics::MetricsRegistry::new(); + registry.int_counter("test_skipped", "Skipped workloads.") + } + + #[test] + fn enqueue_does_not_block_caller_under_load() { + // Spam the worker with many workloads and ensure that the caller is + // never blocked: with a single-slot channel, the surplus must be + // dropped via the `skipped` counter rather than queued. We only + // assert that the loop completes promptly and that `skipped + processed` + // accounts for all the workloads we attempted. + const N: u64 = 1_000; + + let completed = Arc::new(AtomicU64::new(0)); + let skipped = skipped_counter(); + + let worker_thread = WorkerThread::new("test_worker_thread_skip", skipped.clone()); + for _ in 0..N { + let completed = Arc::clone(&completed); + worker_thread.enqueue(Box::new(move || { + completed.fetch_add(1, Ordering::Relaxed); + })); + } + worker_thread.flush_channel(); + + // Completed vs skipped counts are non-deterministic (scheduling dependent), but + // they should both be non-zero and must sum to the number of attempts. + let completed = completed.load(Ordering::Relaxed); + assert!(completed > 0); + assert!(skipped.get() > 0); + assert_eq!(completed + skipped.get(), N); + } + + #[test] + fn flush_is_a_no_op_when_idle() { + let worker_thread = WorkerThread::new("test_worker_thread_flush_idle", skipped_counter()); + // Should return promptly even with nothing in the queue. + worker_thread.flush_channel(); + } +}