From 2fcfa5641b4b3080b67021871dfa7001c0069ec8 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Tue, 5 May 2026 12:18:56 +0000 Subject: [PATCH 1/5] feat: [DSM-103] Active-canister-only scheduling Only schedule active canisters instead of all canisters. This needs to be done on every scheduler loop iteration (as opposed to once per round followed by filtering per iteration), to account for canisters going idle / becoming active from one iteration to the next. But scheduling priorities continue to be updated only once, at the end of the round. --- rs/execution_environment/BUILD.bazel | 2 + rs/execution_environment/benches/scheduler.rs | 44 +- rs/execution_environment/src/lib.rs | 4 +- rs/execution_environment/src/scheduler.rs | 45 +- .../src/scheduler/round_schedule.rs | 407 +++++++----------- .../src/scheduler/round_schedule/tests.rs | 332 +++++++------- .../src/scheduler/scheduler_metrics.rs | 13 +- .../src/scheduler/tests/limits.rs | 2 +- .../src/scheduler/tests/metrics.rs | 1 - .../src/scheduler/tests/scheduling.rs | 199 +-------- rs/replicated_state/src/canister_state.rs | 14 + .../src/metadata_state/subnet_schedule.rs | 8 + rs/replicated_state/src/replicated_state.rs | 27 +- rs/replicated_state/tests/replicated_state.rs | 7 +- 14 files changed, 462 insertions(+), 643 deletions(-) diff --git a/rs/execution_environment/BUILD.bazel b/rs/execution_environment/BUILD.bazel index 01b0612b66a6..7642d9ce80bf 100644 --- a/rs/execution_environment/BUILD.bazel +++ b/rs/execution_environment/BUILD.bazel @@ -391,9 +391,11 @@ rust_ic_bench( # Keep sorted. ":execution_environment", "//rs/config", + "//rs/monitoring/logger", "//rs/monitoring/metrics", "//rs/registry/subnet_type", "//rs/replicated_state", + "//rs/test_utilities/types", "//rs/types/base_types", "//rs/types/cycles", "//rs/types/types", diff --git a/rs/execution_environment/benches/scheduler.rs b/rs/execution_environment/benches/scheduler.rs index e35b98ed767b..4042723a735d 100644 --- a/rs/execution_environment/benches/scheduler.rs +++ b/rs/execution_environment/benches/scheduler.rs @@ -2,10 +2,16 @@ use criterion::Criterion; use ic_base_types::NumSeconds; use ic_config::flag_status::FlagStatus; use ic_execution_environment::{RoundSchedule, SchedulerMetrics}; +use ic_logger::new_replica_logger_from_config; use ic_metrics::MetricsRegistry; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::canister_state::canister_snapshots::CanisterSnapshots; -use ic_replicated_state::{CanisterState, ReplicatedState, SchedulerState, SystemState}; +use ic_replicated_state::canister_state::system_state::PausedExecutionId; +use ic_replicated_state::{ + CanisterState, ExecutionTask, InputQueueType, ReplicatedState, SchedulerState, SystemState, +}; +use ic_test_utilities_types::messages::RequestBuilder; +use ic_types::messages::{CanisterMessageOrTask, CanisterTask}; use ic_types::{ExecutionRound, NumBytes, NumInstructions}; use ic_types_cycles::Cycles; use ic_types_test_utils::ids::{canister_test_id, subnet_test_id, user_test_id}; @@ -15,8 +21,6 @@ use std::sync::Arc; fn main() { // 100k canisters, 5k active, 1k executed every round. let mut canisters = BTreeMap::new(); - let mut ordered_new_execution_canister_ids = Vec::new(); - let mut ordered_long_execution_canister_ids = Vec::new(); let mut executed_canisters = BTreeSet::new(); for i in 0..100_000 { let canister_id = canister_test_id(i); @@ -28,15 +32,29 @@ fn main() { NumSeconds::from(100_000), ); let canister_snapshots = CanisterSnapshots::default(); - let canister_state = + let mut canister_state = CanisterState::new(system_state, None, scheduler_state, canister_snapshots); // 5k active canisters. if i < 5_000 { // Every 10th canister has a long execution, the rest have new inputs. if i % 10 == 0 { - ordered_long_execution_canister_ids.push(canister_id); + canister_state + .system_state + .task_queue + .enqueue(ExecutionTask::PausedExecution { + id: PausedExecutionId(0), + input: CanisterMessageOrTask::Task(CanisterTask::Heartbeat), + }); } else { - ordered_new_execution_canister_ids.push(canister_id); + let mut available_memory = i64::MAX; + canister_state + .push_input( + RequestBuilder::new().receiver(canister_id).build().into(), + &mut available_memory, + SubnetType::Application, + InputQueueType::RemoteSubnet, + ) + .unwrap(); } } // First 1k canisters complete an execution every round. @@ -53,44 +71,42 @@ fn main() { let rate_limiting_of_heap_delta = FlagStatus::Enabled; let install_code_rate_limit = NumInstructions::from(1_000_000); let rate_limiting_of_instructions = FlagStatus::Enabled; - let long_execution_cores = 1; let mut round_schedule = RoundSchedule::new( scheduler_cores, heap_delta_rate_limit, rate_limiting_of_heap_delta, install_code_rate_limit, rate_limiting_of_instructions, - long_execution_cores, - ordered_new_execution_canister_ids, - ordered_long_execution_canister_ids, ); let metrics_registry = MetricsRegistry::new(); let metrics = SchedulerMetrics::new(&metrics_registry); + let (log, _async_guard) = new_replica_logger_from_config(&Default::default()); let mut criterion = Criterion::default(); let mut group = criterion.benchmark_group("RoundSchedule"); + let current_round = ExecutionRound::from(13); group.bench_function("iteration", |bench| { bench.iter(|| { - round_schedule.start_iteration(&mut state, true); + round_schedule.start_iteration(&mut state, true, &metrics, &log); round_schedule.end_iteration( &mut state, &executed_canisters, &executed_canisters, &BTreeSet::new(), - ExecutionRound::from(1), + current_round, ); }); }); // Populate the subnet schedule, even if the iteration benchmark is not run. - round_schedule.start_iteration(&mut state, true); + round_schedule.start_iteration(&mut state, true, &metrics, &log); round_schedule.end_iteration( &mut state, &executed_canisters, &executed_canisters, &BTreeSet::new(), - ExecutionRound::from(1), + current_round, ); group.bench_function("finish_round", |bench| { diff --git a/rs/execution_environment/src/lib.rs b/rs/execution_environment/src/lib.rs index 7dfe56725e20..659e7fa6f5f5 100644 --- a/rs/execution_environment/src/lib.rs +++ b/rs/execution_environment/src/lib.rs @@ -52,7 +52,9 @@ pub use metrics::IngressFilterMetrics; pub use query_handler::{DataCertificateWithDelegationMetadata, InternalHttpQueryHandler}; use query_handler::{HttpQueryHandler, QueryScheduler}; use scheduler::SchedulerImpl; -pub use scheduler::{RoundSchedule, SchedulerMetrics, abort_all_paused_executions}; +pub use scheduler::{ + IterationSchedule, RoundSchedule, SchedulerMetrics, abort_all_paused_executions, +}; use std::{path::Path, sync::Arc}; use tokio::sync::mpsc::Sender; diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 1cc8aeda6c08..b14c8563c6a3 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -1,5 +1,5 @@ -pub use self::round_schedule::RoundSchedule; use self::round_schedule::*; +pub use self::round_schedule::{IterationSchedule, RoundSchedule}; pub use self::scheduler_metrics::SchedulerMetrics; use self::scheduler_metrics::*; use self::threshold_signatures::*; @@ -422,6 +422,7 @@ impl SchedulerImpl { canister_ingress_latencies: &mut CanisterIngressQueueLatencies, scheduler_round_limits: &mut SchedulerRoundLimits, root_measurement_scope: &MeasurementScope<'a>, + round_log: &ReplicaLogger, ) -> ReplicatedState { let cost_schedule = state.get_own_cost_schedule(); let measurement_scope = @@ -486,10 +487,12 @@ impl SchedulerImpl { // Scheduling. let scheduling_timer = self.metrics.round_inner_iteration_scheduling.start_timer(); - round_schedule.charge_idle_canisters(state.canisters_and_schedule_mut().0); - - // Obtain the active canisters for this iteration. - let iteration_schedule = round_schedule.start_iteration(&mut state, is_first_iteration); + let iteration_schedule = round_schedule.start_iteration( + &mut state, + is_first_iteration, + &self.metrics, + round_log, + ); if iteration_schedule.is_empty() { break state; } @@ -593,6 +596,7 @@ impl SchedulerImpl { .metrics .round_inner_heartbeat_overhead_duration .start_timer(); + // Remove all remaining `Heartbeat` and `GlobalTimer` tasks // because they will be added again in the next round. for canister_id in &heartbeat_and_timer_canisters { @@ -723,7 +727,7 @@ impl SchedulerImpl { max_instructions_executed_per_thread = max_instructions_executed_per_thread.max(instructions_executed); - let divisor = round_limits_per_thread.instructions.get(); + let divisor = self.config.max_instructions_per_slice.get(); debug_assert_ne!(divisor, 0, "prevent divide by zero panic"); if divisor > 0 { let value = instructions_executed.get() as f64 / divisor as f64; @@ -1065,6 +1069,9 @@ impl SchedulerImpl { /// /// NOTE: This is also called by `checkpoint_round_with_no_execution()`, so it /// must be safe to call even when no execution has taken place. + // + // TODO(DSM-103): Consider only aborting / checking DTS invariants for actually + // scheduled canisters. fn finish_round( &self, state: &mut ReplicatedState, @@ -1433,25 +1440,16 @@ impl Scheduler for SchedulerImpl { scheduler_round_limits.update_subnet_round_limits(&subnet_round_limits); } - // Scheduling. - let mut round_schedule = { - let _timer = self.metrics.round_scheduling_duration.start_timer(); - - RoundSchedule::apply_scheduling_strategy( - &mut state, - self.config.scheduler_cores, - self.config.heap_delta_rate_limit, - self.rate_limiting_of_heap_delta, - self.config.install_code_rate_limit, - self.rate_limiting_of_instructions, - current_round, - self.config.accumulated_priority_reset_interval, - &self.metrics, - &round_log, - ) - }; + // TODO(DSM-103): Consider routing messages from subnet output queues to local canisters. // Inner round. + let mut round_schedule = RoundSchedule::new( + self.config.scheduler_cores, + self.config.heap_delta_rate_limit, + self.rate_limiting_of_heap_delta, + self.config.install_code_rate_limit, + self.rate_limiting_of_instructions, + ); let mut state = self.inner_round( state, current_round, @@ -1463,6 +1461,7 @@ impl Scheduler for SchedulerImpl { &mut canister_ingress_latencies, &mut scheduler_round_limits, &root_measurement_scope, + &round_log, ); // Update [`SignWithThresholdContext`]s by assigning randomness and matching pre-signatures. diff --git a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index 042f3d6202b9..8b827e8b4760 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -9,7 +9,6 @@ use ic_logger::{ReplicaLogger, error}; use ic_replicated_state::canister_state::NextExecution; use ic_replicated_state::{CanisterPriority, CanisterState, ReplicatedState}; use ic_types::{AccumulatedPriority, ComputeAllocation, ExecutionRound, NumInstructions}; -use ic_utils::iter::left_outer_join; use num_traits::SaturatingSub; use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; @@ -199,10 +198,6 @@ impl IterationSchedule { pub fn is_empty(&self) -> bool { self.schedule.is_empty() } - - pub fn iter(&self) -> impl Iterator { - self.schedule.iter() - } } /// Round-level schedule and accounting: builds the iteration schedule each iteration, @@ -212,13 +207,6 @@ pub struct RoundSchedule { /// Immutable configuration for this round. config: Config, - /// Number of cores dedicated for long executions. - long_execution_cores: usize, - /// Ordered Canister IDs with new executions. - ordered_new_execution_canister_ids: Vec, - /// Ordered Canister IDs with long executions. - ordered_long_execution_canister_ids: Vec, - /// Canisters that were scheduled. scheduled_canisters: BTreeSet, /// Canisters that had a long execution at the start of this round. @@ -242,9 +230,6 @@ impl RoundSchedule { rate_limiting_of_heap_delta: FlagStatus, install_code_rate_limit: NumInstructions, rate_limiting_of_instructions: FlagStatus, - long_execution_cores: usize, - ordered_new_execution_canister_ids: Vec, - ordered_long_execution_canister_ids: Vec, ) -> Self { let config = Config { scheduler_cores, @@ -263,10 +248,6 @@ impl RoundSchedule { }; Self { config, - long_execution_cores: long_execution_cores - .min(ordered_long_execution_canister_ids.len()), - ordered_new_execution_canister_ids, - ordered_long_execution_canister_ids, scheduled_canisters: BTreeSet::new(), executed_canisters: BTreeSet::new(), long_execution_canisters: BTreeSet::new(), @@ -276,31 +257,6 @@ impl RoundSchedule { } } - /// Marks idle canisters in front of the schedule as fully executed. - pub fn charge_idle_canisters( - &mut self, - canisters: &mut BTreeMap>, - ) { - for canister_id in self.ordered_new_execution_canister_ids.iter() { - let canister = canisters.get(canister_id); - if let Some(canister) = canister { - let next_execution = canister.next_execution(); - match next_execution { - NextExecution::None => { - self.fully_executed_canisters.insert(canister.canister_id()); - } - // Skip install code canisters. - NextExecution::ContinueInstallCode => {} - - NextExecution::StartNew | NextExecution::ContinueLong => { - // Stop searching after the first non-idle canister. - break; - } - } - } - } - } - /// Computes and returns an iteration schedule covering active canisters only. /// /// Updates round accumulators (scheduled, rate limited, long execution @@ -309,11 +265,23 @@ impl RoundSchedule { &mut self, state: &mut ReplicatedState, is_first_iteration: bool, + metrics: &SchedulerMetrics, + logger: &ReplicaLogger, ) -> IterationSchedule { let (canister_states, subnet_schedule) = state.canisters_and_schedule_mut(); + // Sum of all scheduled canisters' compute allocations. + // This corresponds to |a| in Scheduler Analysis. + let mut total_compute_allocation = ZERO; + let mut long_executions_count = 0; + // Sum of all long execution canisters' compute allocations. + let mut long_executions_compute_allocation = ZERO; + // Collect all active canisters and their next executions. - let canister_next_executions: BTreeMap<_, _> = canister_states + // + // Unfortunately, not all active canisters are in the subnet schedule, so we + // must iterate over all canister states to find them. + let mut schedule: Vec = canister_states .iter() .filter_map(|(canister_id, canister)| { // Record and filter out rate limited canisters. @@ -331,11 +299,7 @@ impl RoundSchedule { return None; } - let next_execution = canister.next_execution(); - match next_execution { - // Filter out canisters with no messages or with paused installations. - NextExecution::None | NextExecution::ContinueInstallCode => None, - + let canister_round_state = match canister.next_execution() { NextExecution::StartNew => { // Don't schedule canisters that completed a long execution this round. We need // canisters to move between the long execution and new execution pools, so the @@ -343,56 +307,102 @@ impl RoundSchedule { if self.long_execution_canisters.contains(canister_id) { return None; } - self.scheduled_canisters.insert(*canister_id); - Some((canister_id, next_execution)) + CanisterRoundState::new(canister, subnet_schedule.get_mut(*canister_id)) } NextExecution::ContinueLong => { if is_first_iteration { self.long_execution_canisters.insert(*canister_id); } - self.scheduled_canisters.insert(*canister_id); - Some((canister_id, next_execution)) + let priority = subnet_schedule.get_mut(*canister_id); + let rs = CanisterRoundState::new(canister, priority); + long_executions_count += 1; + long_executions_compute_allocation += rs.compute_allocation; + rs } - } + + NextExecution::None => { + // (Only) drop from the schedule idle canisters with 0-100 AP. + // + // Idle canisters with negative AP are kept in the schedule until they reach 0 + // AP, to prevent them from jumping from the back of the schedule to the middle + // just by being idle for a round. Similarly, idle canisters with positive AP + // burn it down as if they had executed full rounds until they (almost) reach 0. + if is_first_iteration && !canister.must_be_in_schedule() { + let canister_priority = subnet_schedule.get(canister_id); + if canister_priority.accumulated_priority >= ZERO + && canister_priority.accumulated_priority <= ONE_HUNDRED_PERCENT + { + subnet_schedule.remove(canister_id); + } + } + return None; + } + + NextExecution::ContinueInstallCode => return None, + }; + + total_compute_allocation += canister_round_state.compute_allocation; + self.scheduled_canisters.insert(*canister_id); + + Some(canister_round_state) }) .collect(); + schedule.sort(); - let mut schedule: Vec = self - .ordered_long_execution_canister_ids - .iter() - .filter( - |canister_id| match canister_next_executions.get(canister_id) { - Some(NextExecution::ContinueLong) => true, - - // We expect long execution, but there is none, - // so the long execution was finished in the - // previous inner round. - // - // We should avoid scheduling this canister to: - // 1. Avoid the canister to bypass the logic in - // `apply_scheduling_strategy()`. - // 2. Charge canister for resources at the end - // of the round. - Some(NextExecution::StartNew) => false, - - None - | Some(NextExecution::None) // Idle canister. - | Some(NextExecution::ContinueInstallCode) // Subnet message. - => false, - }, + let compute_capacity = self.compute_capacity(); + let long_execution_cores = if long_executions_count == schedule.len() { + // Only long executions. + std::cmp::min(long_executions_count, self.config.scheduler_cores) + } else { + // Mix of long and short executions. + // + // Compute the number of long execution cores by dividing long executions' + // compute allocation plus free compute share by `100%` and rounding up (so that + // both long and new executions get enough cores to cover their respective + // cumulative compute allocations). + let free_compute = compute_capacity - total_compute_allocation; + let long_executions_compute = long_executions_compute_allocation + + (free_compute * long_executions_count as i64 / schedule.len() as i64); + std::cmp::min( + long_executions_count, + ((long_executions_compute + ONE_HUNDRED_PERCENT - AccumulatedPriority::new(1)) + / ONE_HUNDRED_PERCENT) as usize, ) - .cloned() - .collect(); - let long_executions_count = schedule.len(); - let long_execution_cores = self.long_execution_cores.min(long_executions_count); + }; - schedule.extend( - self.ordered_new_execution_canister_ids - .iter() - .filter(|canister_id| canister_next_executions.contains_key(canister_id)), + // There is at least `1%` of free capacity to distribute across canisters. + // This is guaranteed by `validate_compute_allocation()`. + debug_assert_or_critical_error!( + total_compute_allocation + ONE_PERCENT <= compute_capacity, + metrics.scheduler_compute_allocation_invariant_broken, + logger, + "{}: Total compute allocation {}% must be less than compute capacity {}%", + SCHEDULER_COMPUTE_ALLOCATION_INVARIANT_BROKEN, + total_compute_allocation, + compute_capacity + ); + // If there are long executions, `long_execution_cores` must be non-zero. + debug_assert_or_critical_error!( + long_executions_count == 0 || long_execution_cores > 0, + metrics.scheduler_cores_invariant_broken, + logger, + "{}: Number of long execution cores {} must be more than 0", + SCHEDULER_CORES_INVARIANT_BROKEN, + long_execution_cores, + ); + // Can't have more long execution cores than scheduler cores. + debug_assert_or_critical_error!( + long_execution_cores <= self.config.scheduler_cores, + metrics.scheduler_cores_invariant_broken, + logger, + "{}: Number of long execution cores {} must be <= scheduler cores {}", + SCHEDULER_CORES_INVARIANT_BROKEN, + long_execution_cores, + self.config.scheduler_cores ); + let schedule: Vec = schedule.into_iter().map(|rs| rs.canister_id).collect(); if is_first_iteration { // First iteration: mark the first canister on each core as fully executed. let mut observe_scheduled_as_first = |canister: &CanisterId| { @@ -514,11 +524,17 @@ impl RoundSchedule { .insert(*canister_id); } - // Add all canisters to the subnet schedule; and charge any immediate or - // deferred (on long execution completion) execution rounds. - for canister in canister_states.values() { + // Remove any deleted canisters from the subnet schedule. Beyond this point it + // is safe to assume that the subnet schedule only refers to existing canisters. + subnet_schedule.retain(|canister_id, _| canister_states.contains_key(canister_id)); + + // Add all canisters that we (tried to) schedule this round to the subnet + // schedule; and charge any immediate or deferred (on long execution completion) + // full execution rounds. + for canister_id in &self.scheduled_canisters { + let canister = canister_states.get_mut(canister_id).unwrap(); // Add the canister to the subnet schedule, if not already there. - let canister_priority = subnet_schedule.get_mut(canister.canister_id()); + let canister_priority = subnet_schedule.get_mut(*canister_id); // Charge for the first round of every long execution immediately, to properly // account for newly started long executions (scheduled as new executions). @@ -531,14 +547,28 @@ impl RoundSchedule { // On message completion (or short execution), charge for the remaining rounds. if canister_priority.executed_rounds > 0 && (!canister.has_long_execution() - || self - .canisters_with_completed_messages - .contains(&canister.canister_id())) + || self.canisters_with_completed_messages.contains(canister_id)) { canister_priority.accumulated_priority -= ONE_HUNDRED_PERCENT * (canister_priority.executed_rounds - 1).max(1); - canister_priority.executed_rounds = 0; + if canister.has_long_execution() + && self.long_execution_canisters.contains(canister_id) + { + // Safety net: canister had a long execution at the start of this round; + // completed it; and started another long execution; this branch is currently + // never taken, because we never reschedule canisters that completed a long + // execution this round; but if we ever change that behavior, skip charging for + // the first round of the new long execution but do charge for the second round. + canister_priority.executed_rounds = 1; + } else { + canister_priority.executed_rounds = 0; + } } + + Arc::make_mut(canister) + .system_state + .canister_metrics_mut() + .observe_round_scheduled(); } self.grant_heap_delta_and_install_code_credits(state, metrics); @@ -549,15 +579,22 @@ impl RoundSchedule { let mut total_ap = ZERO; let mut total_ca = ZERO; let mut compute_allocations = Vec::with_capacity(subnet_schedule.len()); - for (_, canister, canister_priority) in - left_outer_join(canister_states.iter_mut(), subnet_schedule.iter_mut()) - { - // Safe to unwrap, we called SubnetSchedule::get_mut() above for all canisters. - let canister_priority = canister_priority.unwrap(); - + for (canister_id, canister_priority) in subnet_schedule.iter_mut() { + let canister = canister_states.get_mut(canister_id).unwrap(); let compute_allocation = from_ca(canister.compute_allocation()); canister_priority.accumulated_priority += compute_allocation; + // Treat idle canisters with positive AP as fully executed (which is technically + // true). The goal is to gradually burn down their AP to zero, so that if they + // get new inputs soon, they will not have instantly lost all their previous AP. + if canister_priority.accumulated_priority > ZERO + && !self.scheduled_canisters.contains(canister_id) + && !self.rate_limited_canisters.contains(canister_id) + { + canister_priority.accumulated_priority -= + ONE_HUNDRED_PERCENT.min(canister_priority.accumulated_priority); + } + // Apply an exponential decay to AP values outside the [AP_ROUNDS_MIN, // AP_ROUNDS_MAX] range to soft bound any runaway AP. const AP_MAX: AccumulatedPriority = @@ -574,10 +611,10 @@ impl RoundSchedule { total_ap += canister_priority.accumulated_priority; total_ca += compute_allocation; - compute_allocations.push((canister.canister_id(), compute_allocation)); + compute_allocations.push((*canister_id, compute_allocation)); } - // Distribute the "free compute" (negative of total AP) to all canisters. + // Distribute the "free compute" (negative of total AP) to scheduled canisters. // // Only ever apply positive free compute. If the total AP is positive (e.g. we // granted compute allocations after not having completed any execution this @@ -628,6 +665,10 @@ impl RoundSchedule { .set((accumulated_priority_deviation / subnet_schedule.len().max(1) as f64).sqrt()); self.observe_round_metrics(state, current_round, metrics); + + // NOTE: Some active canisters may not be in the subnet schedule at this point, + // because we may have bailed out of inner_round() due to reaching some limit, + // either before we scheduled anything or after inducting subnet-local messages. } /// Deducts the heap delta and install code rate limits from the canisters' @@ -637,8 +678,12 @@ impl RoundSchedule { state: &mut ReplicatedState, metrics: &SchedulerMetrics, ) { - let (canister_states, _) = state.canisters_and_schedule_mut(); - for canister in canister_states.values_mut() { + let (canister_states, subnet_schedule) = state.canisters_and_schedule_mut(); + for (canister_id, _) in subnet_schedule.iter() { + let Some(canister) = canister_states.get_mut(canister_id) else { + continue; + }; + let heap_delta_debit = canister.scheduler_state.heap_delta_debit.get(); metrics .canister_heap_delta_debits @@ -711,164 +756,14 @@ impl RoundSchedule { /// Returns scheduler compute capacity in accumulated priority. /// /// For the DTS scheduler, it's `(number of cores - 1) * 100%` - fn compute_capacity(scheduler_cores: usize) -> AccumulatedPriority { - ONE_HUNDRED_PERCENT * (scheduler_cores as i64 - 1) + fn compute_capacity(&self) -> AccumulatedPriority { + ONE_HUNDRED_PERCENT * (self.config.scheduler_cores as i64 - 1) } /// Canisters that were scheduled this round. pub fn scheduled_canisters(&self) -> &BTreeSet { &self.scheduled_canisters } - - /// Orders the canisters and updates their accumulated priorities according to - /// the strategy described in RUN-58. - /// - /// A shorter description of the scheduling strategy is available in the note - /// section about [Scheduler and AccumulatedPriority] in types/src/lib.rs - pub(super) fn apply_scheduling_strategy( - state: &mut ReplicatedState, - scheduler_cores: usize, - heap_delta_rate_limit: NumBytes, - rate_limiting_of_heap_delta: FlagStatus, - install_code_rate_limit: NumInstructions, - rate_limiting_of_instructions: FlagStatus, - current_round: ExecutionRound, - accumulated_priority_reset_interval: ExecutionRound, - metrics: &SchedulerMetrics, - logger: &ReplicaLogger, - ) -> RoundSchedule { - let number_of_canisters = state.canister_states().len(); - - // Total allocatable compute capacity. - // As one scheduler core is reserved to guarantee long executions progress, - // compute capacity is `(scheduler_cores - 1) * 100` - let compute_capacity = Self::compute_capacity(scheduler_cores); - - // Sum of all scheduled canisters' compute allocations. - // This corresponds to |a| in Scheduler Analysis. - let mut total_compute_allocation = ZERO; - // Sum of all long execution canisters' compute allocations. - let mut long_executions_compute_allocation = ZERO; - let mut long_executions_count = 0; - - // This corresponds to the vector p in the Scheduler Analysis document. - let mut round_states = Vec::with_capacity(number_of_canisters); - - // Reset the accumulated priorities periodically. - // We want to reset the scheduler regularly to safely support changes in the set - // of canisters and their compute allocations. - let is_reset_round = current_round - .get() - .is_multiple_of(accumulated_priority_reset_interval.get()); - let (canister_states, subnet_schedule) = state.canisters_and_schedule_mut(); - if is_reset_round { - for &canister_id in canister_states.keys() { - let canister_priority = subnet_schedule.get_mut(canister_id); - canister_priority.accumulated_priority = Default::default(); - } - } - - // Collect the priority of the canisters for this round. - let mut accumulated_priority_invariant = ZERO; - for (_, canister, canister_priority) in - left_outer_join(canister_states.iter_mut(), subnet_schedule.iter()) - { - let canister_priority = canister_priority.unwrap_or(&CanisterPriority::DEFAULT); - let compute_allocation = from_ca(canister.compute_allocation()); - let accumulated_priority = canister_priority.accumulated_priority; - round_states.push(CanisterRoundState::new(canister, canister_priority)); - - total_compute_allocation += compute_allocation; - accumulated_priority_invariant += accumulated_priority; - if canister_priority.long_execution_start_round.is_some() { - long_executions_compute_allocation += compute_allocation; - long_executions_count += 1; - } - if canister.has_input() { - let canister = Arc::make_mut(canister); - canister - .system_state - .canister_metrics_mut() - .observe_round_scheduled(); - } - } - round_states.sort(); - - // Assert there is at least `1%` of free capacity to distribute across canisters. - // It's guaranteed by `validate_compute_allocation()` - debug_assert_or_critical_error!( - total_compute_allocation + ONE_PERCENT <= compute_capacity, - metrics.scheduler_compute_allocation_invariant_broken, - logger, - "{}: Total compute allocation {}% must be less than compute capacity {}%", - SCHEDULER_COMPUTE_ALLOCATION_INVARIANT_BROKEN, - total_compute_allocation, - compute_capacity - ); - // Observe accumulated priority metrics - metrics - .scheduler_accumulated_priority_invariant - .set(accumulated_priority_invariant.get()); - - let long_execution_cores = if long_executions_count == canister_states.len() { - // Only long executions. - std::cmp::min(long_executions_count, scheduler_cores) - } else { - // Mix of long and short executions. - // - // Compute the number of long execution cores by dividing long executions' - // compute allocation plus free compute share by `100%` and rounding up (so that - // both long and new executions get enough cores to cover their respective - // cumulative compute allocations). - let free_compute = compute_capacity - total_compute_allocation; - let long_executions_compute = long_executions_compute_allocation - + (free_compute * long_executions_count as i64 / canister_states.len() as i64); - std::cmp::min( - long_executions_count, - ((long_executions_compute + ONE_HUNDRED_PERCENT - AccumulatedPriority::new(1)) - / ONE_HUNDRED_PERCENT) as usize, - ) - }; - - // If there are long executions, `long_execution_cores` must be non-zero. - debug_assert_or_critical_error!( - long_executions_count == 0 || long_execution_cores > 0, - metrics.scheduler_cores_invariant_broken, - logger, - "{}: Number of long execution cores {} must be more than 0", - SCHEDULER_CORES_INVARIANT_BROKEN, - long_execution_cores, - ); - // Can't have more long execution cores than scheduler cores. - debug_assert_or_critical_error!( - long_execution_cores <= scheduler_cores, - metrics.scheduler_cores_invariant_broken, - logger, - "{}: Number of long execution cores {} must be <= scheduler cores {}", - SCHEDULER_CORES_INVARIANT_BROKEN, - long_execution_cores, - scheduler_cores - ); - - RoundSchedule::new( - scheduler_cores, - heap_delta_rate_limit, - rate_limiting_of_heap_delta, - install_code_rate_limit, - rate_limiting_of_instructions, - long_execution_cores, - round_states - .iter() - .skip(long_executions_count) - .map(|rs| rs.canister_id) - .collect(), - round_states - .iter() - .take(long_executions_count) - .map(|rs| rs.canister_id) - .collect(), - ) - } } /// Returns true if the canister exports the heartbeat method. diff --git a/rs/execution_environment/src/scheduler/round_schedule/tests.rs b/rs/execution_environment/src/scheduler/round_schedule/tests.rs index 5a8955718a62..8ffe161043cd 100644 --- a/rs/execution_environment/src/scheduler/round_schedule/tests.rs +++ b/rs/execution_environment/src/scheduler/round_schedule/tests.rs @@ -47,19 +47,21 @@ impl RoundScheduleFixture { /// Creates a new fixture with a sensible `RoundSchedule` default (4 cores, /// large heap delta rate limit), and an empty `ReplicatedState`. fn new() -> Self { - let mut state = ReplicatedState::new(subnet_test_id(1), SubnetType::Application); - let current_round = ExecutionRound::new(1); - let metrics = SchedulerMetrics::new(&MetricsRegistry::new()); - let logger = ic_logger::replica_logger::test_logger(Some(slog::Level::Info)); - let round_schedule = - RoundScheduleBuilder::new().build(&mut state, current_round, &metrics, &logger); + Self::with_round_schedule(RoundScheduleBuilder::new().build()) + } + + /// Creates a new fixture with the given `RoundSchedule` and an empty + /// `ReplicatedState`. + fn with_round_schedule(round_schedule: RoundSchedule) -> Self { + let registry = MetricsRegistry::new(); + let metrics = SchedulerMetrics::new(®istry); Self { round_schedule, - state, - current_round, + state: ReplicatedState::new(subnet_test_id(1), SubnetType::Application), + current_round: ExecutionRound::new(1), next_canister_id: 0, metrics, - logger, + logger: ic_logger::replica_logger::test_logger(Some(slog::Level::Info)), } } @@ -92,41 +94,16 @@ impl RoundScheduleFixture { canister_id } - /// Creates a new `RoundSchedule` around the current `state`. - fn start_round( - &mut self, - current_round: ExecutionRound, - round_schedule_builder: RoundScheduleBuilder, - ) { - self.current_round = current_round; - self.round_schedule = round_schedule_builder.build( - &mut self.state, - current_round, - &self.metrics, - &self.logger, - ); - } - - /// Creates a new `RoundSchedule` around the current `state` and calls - /// `RoundSchedule::start_iteration` on it. + /// Calls `RoundSchedule::start_iteration`, mutating canister priorities and + /// returning the iteration schedule. fn start_iteration(&mut self, is_first_iteration: bool) -> IterationSchedule { - self.round_schedule = RoundScheduleBuilder::new().build( + let iteration = self.round_schedule.start_iteration( &mut self.state, - self.current_round, + is_first_iteration, &self.metrics, &self.logger, ); - self.start_iteration_only(is_first_iteration) - } - - /// Calls `RoundSchedule::start_iteration`, mutating canister priorities and - /// returning the iteration schedule. - fn start_iteration_only(&mut self, is_first_iteration: bool) -> IterationSchedule { - let iteration = self - .round_schedule - .start_iteration(&mut self.state, is_first_iteration); - // `IterationSchedule` sanity checks. assert_eq!( iteration.scheduler_cores, @@ -415,24 +392,13 @@ impl RoundScheduleBuilder { self } - fn build( - self, - state: &mut ReplicatedState, - current_round: ExecutionRound, - metrics: &SchedulerMetrics, - logger: &ReplicaLogger, - ) -> RoundSchedule { - RoundSchedule::apply_scheduling_strategy( - state, + fn build(self) -> RoundSchedule { + RoundSchedule::new( self.cores, self.heap_delta_rate_limit, FlagStatus::Enabled, self.install_code_rate_limit, FlagStatus::Enabled, - current_round, - ExecutionRound::new(u64::MAX / 2), - metrics, - logger, ) } } @@ -622,17 +588,14 @@ fn start_iteration_long_executions_first_cores() { /// `fully_executed_canisters`. #[test] fn start_iteration_first_iteration_fully_executed() { - let mut fixture = RoundScheduleFixture::new(); + let round_schedule = RoundScheduleBuilder::new().with_cores(2).build(); + let mut fixture = RoundScheduleFixture::with_round_schedule(round_schedule); let first_long = fixture.canister_with_long_execution(); let first_new = fixture.canister_with_input(); let second_long = fixture.canister_with_long_execution(); let second_new = fixture.canister_with_input(); - fixture.start_round( - ExecutionRound::new(1), - RoundScheduleBuilder::new().with_cores(2), - ); - fixture.start_iteration_only(true); + fixture.start_iteration(true); assert!(fixture.fully_executed_canisters().contains(&first_long)); assert!(fixture.fully_executed_canisters().contains(&first_new)); @@ -698,9 +661,7 @@ fn start_iteration_later_iteration_exclude_completed_long() { fixture.remove_long_execution(canister_a); fixture.push_input(canister_a); - let iter2 = fixture - .round_schedule - .start_iteration(&mut fixture.state, false); + let iter2 = fixture.start_iteration(false); assert!( !iter2.schedule.contains(&canister_a), @@ -712,17 +673,16 @@ fn start_iteration_later_iteration_exclude_completed_long() { #[test] fn start_iteration_with_heap_delta_rate_limit() { let limit = ic_base_types::NumBytes::new(1000); - let mut fixture = RoundScheduleFixture::new(); + let mut fixture = RoundScheduleFixture::with_round_schedule( + RoundScheduleBuilder::new() + .with_heap_delta_rate_limit(limit) + .build(), + ); let canister_id = fixture.canister_with_input(); fixture.add_heap_delta_debit(canister_id, limit); for is_first_iteration in [true, false] { - fixture.start_round( - ExecutionRound::new(1), - RoundScheduleBuilder::new().with_heap_delta_rate_limit(limit), - ); - - let iteration = fixture.start_iteration_only(is_first_iteration); + let iteration = fixture.start_iteration(is_first_iteration); // Canister is rate-limited so not in the iteration schedule. assert!(iteration.is_empty()); @@ -734,17 +694,16 @@ fn start_iteration_with_heap_delta_rate_limit() { #[test] fn start_iteration_with_install_code_rate_limit() { let limit = NumInstructions::new(1000); - let mut fixture = RoundScheduleFixture::new(); + let round_schedule = RoundScheduleBuilder::new() + .with_install_code_rate_limit(limit) + .build(); + let mut fixture = RoundScheduleFixture::with_round_schedule(round_schedule); + let canister_id = fixture.canister_with_input(); fixture.add_install_code_debit(canister_id, limit); for is_first_iteration in [true, false] { - fixture.start_round( - ExecutionRound::new(1), - RoundScheduleBuilder::new().with_install_code_rate_limit(limit), - ); - - let iteration = fixture.start_iteration_only(is_first_iteration); + let iteration = fixture.start_iteration(is_first_iteration); // Canister is rate-limited so not in the iteration schedule. assert!(iteration.is_empty()); @@ -758,18 +717,22 @@ fn start_iteration_with_install_code_rate_limit() { /// a full round). Later iterations in the same round must not charge it again. #[test] fn start_iteration_first_iteration_charges_rate_limited_canisters() { - let mut fixture = RoundScheduleFixture::new(); + let heap_delta_limit = ic_base_types::NumBytes::new(1000); + let install_code_limit = NumInstructions::new(2000); + let round_schedule = RoundScheduleBuilder::new() + .with_heap_delta_rate_limit(heap_delta_limit) + .with_install_code_rate_limit(install_code_limit) + .build(); + let mut fixture = RoundScheduleFixture::with_round_schedule(round_schedule); // A heap delta rate-limited canister with a non-trivial starting AP, so we can // verify the exact 100% decrement. - let heap_delta_limit = ic_base_types::NumBytes::new(1000); let heap_delta = fixture.canister_with_input(); fixture.add_heap_delta_debit(heap_delta, heap_delta_limit); *fixture.canister_priority_mut(heap_delta) = priority(40); // An install code rate-limited canister with a non-trivial starting AP, so we // can verify the exact 100% decrement. - let install_code_limit = NumInstructions::new(2000); let install_code = fixture.canister_with_input(); fixture.add_install_code_debit(install_code, install_code_limit); *fixture.canister_priority_mut(install_code) = priority(50); @@ -778,15 +741,8 @@ fn start_iteration_first_iteration_charges_rate_limited_canisters() { let control = fixture.canister_with_input(); *fixture.canister_priority_mut(control) = priority(60); - fixture.start_round( - ExecutionRound::new(1), - RoundScheduleBuilder::new() - .with_heap_delta_rate_limit(heap_delta_limit) - .with_install_code_rate_limit(install_code_limit), - ); - for is_first_iteration in [true, false] { - fixture.start_iteration_only(is_first_iteration); + fixture.start_iteration(is_first_iteration); assert!(fixture.rate_limited_canisters().contains(&heap_delta)); assert!(fixture.rate_limited_canisters().contains(&install_code)); @@ -810,21 +766,44 @@ fn start_iteration_first_iteration_charges_rate_limited_canisters() { } } +/// start_iteration drops idle canisters (no inputs or heap delta / install +/// code debits) with 0-100 accumulated priority from the subnet schedule. +#[test] +fn start_iteration_idle_between_0_and_100_dropped_from_schedule() { + let mut fixture = RoundScheduleFixture::new(); + + // Simulate a bunch of idle canisters with varying AP. + let ap_minus_1 = fixture.canister(); + *fixture.canister_priority_mut(ap_minus_1) = priority(-1); + let ap_0 = fixture.canister(); + *fixture.canister_priority_mut(ap_0) = priority(0); + let ap_99 = fixture.canister(); + *fixture.canister_priority_mut(ap_99) = priority(99); + let ap_101 = fixture.canister(); + *fixture.canister_priority_mut(ap_101) = priority(101); + + fixture.start_iteration(true); + + // `start_iteration()` drops idle canisters with 0-100 AP. + assert!(!fixture.has_canister_priority(&ap_0)); + assert!(!fixture.has_canister_priority(&ap_99)); + // But retains canisters with AP < 0 and AP > 100. + assert_eq!(fixture.canister_priority(&ap_minus_1), &priority(-1)); + assert_eq!(fixture.canister_priority(&ap_101), &priority(101)); +} + #[test] #[should_panic] fn start_iteration_scheduler_compute_allocation_invariant_broken() { - let mut fixture = RoundScheduleFixture::new(); + let round_schedule = RoundScheduleBuilder::new().with_cores(2).build(); + let mut fixture = RoundScheduleFixture::with_round_schedule(round_schedule); let canister_id = fixture.canister_with_input(); fixture .canister_state(&canister_id) .system_state .compute_allocation = ComputeAllocation::try_from(100).unwrap(); - fixture.start_round( - ExecutionRound::new(1), - RoundScheduleBuilder::new().with_cores(2), - ); - let iteration = fixture.start_iteration_only(true); + let iteration = fixture.start_iteration(true); // Without debug_assertions, the canister would be scheduled normally. assert_eq!(iteration.schedule.as_slice(), &[canister_id]); @@ -881,8 +860,7 @@ fn end_iteration_sets_long_execution_start_round() { let mut fixture = RoundScheduleFixture::new(); let canister_id = fixture.canister_with_input(); - fixture.start_round(ExecutionRound::new(1), RoundScheduleBuilder::new()); - fixture.start_iteration_only(true); + fixture.start_iteration(true); // Replace the input with a long execution. fixture.pop_input(canister_id); @@ -1034,7 +1012,8 @@ fn advance_long_execution_preserves_long_execution_start_round() { /// finish_round). #[test] fn finish_round_fully_executed_get_executed_rounds_bumped() { - let mut fixture = RoundScheduleFixture::new(); + let round_schedule = RoundScheduleBuilder::new().with_cores(2).build(); + let mut fixture = RoundScheduleFixture::with_round_schedule(round_schedule); let long = fixture.canister_with_long_execution(); let new = fixture.canister_with_input(); @@ -1045,10 +1024,8 @@ fn finish_round_fully_executed_get_executed_rounds_bumped() { const NEW_AP: AccumulatedPriority = AccumulatedPriority::new(110 * MULTIPLIER); fixture.canister_priority_mut(new).accumulated_priority = NEW_AP; - let current_round = ExecutionRound::new(1); - fixture.start_round(current_round, RoundScheduleBuilder::new().with_cores(2)); - - fixture.start_iteration_only(true); + fixture.current_round = ExecutionRound::new(1); + fixture.start_iteration(true); fixture.end_iteration(&btreeset! {long, new}, &btreeset! {new}, &btreeset! {}); assert_eq!(fixture.fully_executed_canisters(), &btreeset! {long, new}); @@ -1058,7 +1035,10 @@ fn finish_round_fully_executed_get_executed_rounds_bumped() { let long_priority = fixture.canister_priority(&long); assert_eq!(long_priority.accumulated_priority, LONG_AP); assert_eq!(long_priority.executed_rounds, 2); - assert_eq!(long_priority.last_full_execution_round, current_round); + assert_eq!( + long_priority.last_full_execution_round, + fixture.current_round + ); // New execution canister was charged 100 AP. let new_priority = fixture.canister_priority(&new); @@ -1067,14 +1047,18 @@ fn finish_round_fully_executed_get_executed_rounds_bumped() { NEW_AP - ONE_HUNDRED_PERCENT ); assert_eq!(new_priority.executed_rounds, 0); - assert_eq!(new_priority.last_full_execution_round, current_round); + assert_eq!( + new_priority.last_full_execution_round, + fixture.current_round + ); } /// finish_round grants scheduled canisters their compute allocation and /// calls observe_round_scheduled() on metrics. #[test] fn finish_round_scheduled_get_compute_allocation_and_metrics() { - let mut fixture = RoundScheduleFixture::new(); + let round_schedule = RoundScheduleBuilder::new().with_cores(2).build(); + let mut fixture = RoundScheduleFixture::with_round_schedule(round_schedule); // Add three canisters so the one we check is not in the first two (not fully // executed), so it does not get executed_rounds bumped, which would reduce its // accumulated priority. @@ -1090,10 +1074,7 @@ fn finish_round_scheduled_get_compute_allocation_and_metrics() { let canister_b = canister_with_compute_allocation(20); let canister_c = canister_with_compute_allocation(10); - let current_round = ExecutionRound::new(1); - fixture.start_round(current_round, RoundScheduleBuilder::new().with_cores(2)); - - fixture.start_iteration_only(true); + fixture.start_iteration(true); let all = btreeset! {canister_a, canister_b, canister_c}; fixture.end_iteration(&all, &all, &btreeset! {}); assert_eq!( @@ -1157,7 +1138,7 @@ fn finish_round_charge_first_slice_of_new_long_execution() { .canister_priority_mut(canister_id) .accumulated_priority = INITIAL_AP; - fixture.start_iteration_only(true); + fixture.start_iteration(true); // Consume the input, replacing it with a long execution. fixture.pop_input(canister_id); @@ -1209,7 +1190,7 @@ fn finish_round_in_flight_long_execution_no_charge() { const INITIAL_AP: AccumulatedPriority = AccumulatedPriority::new(50 * MULTIPLIER); priority.accumulated_priority = INITIAL_AP; - fixture.start_iteration_only(true); + fixture.start_iteration(true); fixture.end_iteration(&btreeset! {canister_id}, &btreeset! {}, &btreeset! {}); fixture.finish_round(); @@ -1258,6 +1239,39 @@ fn finish_round_charges_for_executed_rounds() { ); } +/// finish_round burns down to zero the positive AP of idle canisters (no inputs +/// or heap delta / install code debits). +#[test] +fn finish_round_burns_down_idle_canister_accumulated_priority() { + let mut fixture = RoundScheduleFixture::new(); + fixture.start_iteration(true); + fixture.end_iteration(&btreeset! {}, &btreeset! {}, &btreeset! {}); + + // A bunch of idle canisters with varying AP. + let ap_minus_1 = fixture.canister(); + *fixture.canister_priority_mut(ap_minus_1) = priority(-1); + let ap_0 = fixture.canister(); + *fixture.canister_priority_mut(ap_0) = priority(0); + let ap_99 = fixture.canister(); + *fixture.canister_priority_mut(ap_99) = priority(99); + let ap_101 = fixture.canister(); + *fixture.canister_priority_mut(ap_101) = priority(101); + + fixture.finish_round(); + + // Nothing changes for canisters with AP <= 0. + assert_eq!(fixture.canister_priority(&ap_minus_1), &priority(-1)); + assert!( + fixture.has_canister_priority(&ap_0) && fixture.canister_priority(&ap_0) == &priority(0) + ); + // Canisters with 0 < AP < 100 have their AP burned down to 0. + assert!( + fixture.has_canister_priority(&ap_99) && fixture.canister_priority(&ap_99) == &priority(0) + ); + // Canisters with AP >= 100 have 100 AP burned. + assert_eq!(fixture.canister_priority(&ap_101), &priority(1)); +} + /// finish_round applies an exponential decay to AP values outside the /// `[AP_ROUNDS_MIN, AP_ROUNDS_MAX]` window, in both directions; values inside /// the window are left untouched. @@ -1292,19 +1306,27 @@ fn finish_round_exponential_decay() { let mut fixture = RoundScheduleFixture::new(); // A canister below AP_MIN. - let below_min_canister = fixture.canister(); + let below_min_canister = fixture.canister_with_input(); *fixture.canister_priority_mut(below_min_canister) = priority(LOW_AP_PERCENT); // 4 canisters above AP_MAX. With only one, the post-decay sum would be negative // (HIGH_DECAYED_PERCENT < |LOW_DECAYED_PERCENT|) and the free-compute // distribution would adjust the AP of `below_min_canister`. let above_max_canisters: Vec<_> = (0..4) .map(|_| { - let id = fixture.canister(); + let id = fixture.canister_with_input(); *fixture.canister_priority_mut(id) = priority(HIGH_AP_PERCENT); id }) .collect(); + // Record all canisters as scheduled, so they don't get treated as idle (and + // have 100 AP burned down). + fixture.round_schedule.scheduled_canisters = above_max_canisters.iter().cloned().collect(); + fixture + .round_schedule + .scheduled_canisters + .insert(below_min_canister); + fixture.finish_round(); for canister_id in &above_max_canisters { @@ -1340,6 +1362,9 @@ fn check_finish_round_free_compute_grants(executed_rounds: i64, expected_aps: [i fixture.set_compute_allocation(id, COMPUTE_ALLOCATIONS[i]); // Add the canister to the subnet schedule with default (0) priority. fixture.canister_priority_mut(id); + // Mark the canister as scheduled, so it doesn't get treated as idle (and has + // 100 AP burned down). + fixture.round_schedule.scheduled_canisters.insert(id); id }); @@ -1454,22 +1479,25 @@ fn finish_round_grant_heap_delta_and_install_code_credits() { .get_sample_sum(), 200.0 ); + + // The next round / iteration drops `canister_b` from the schedule. + fixture.start_iteration(true); + assert!(fixture.has_canister_priority(&canister_a)); + assert!(!fixture.has_canister_priority(&canister_b)); } /// After `finish_round`, a canister with a pending heartbeat task receives /// the same accumulated priority as a canister with a pending input. #[test] fn finish_round_heartbeat_treated_same_as_input() { - let mut fixture = RoundScheduleFixture::new(); - let current_round = ExecutionRound::new(1); + let round_schedule = RoundScheduleBuilder::new().with_cores(2).build(); + let mut fixture = RoundScheduleFixture::with_round_schedule(round_schedule); let canister_a = fixture.canister_with_input(); let canister_b = fixture.canister_with_input(); fixture.set_heartbeat_export(canister_b); - fixture.start_round(current_round, RoundScheduleBuilder::new().with_cores(2)); - - fixture.start_iteration_only(true); + fixture.start_iteration(true); fixture.pop_input(canister_a); fixture.pop_input(canister_b); fixture.end_iteration( @@ -1767,6 +1795,10 @@ prop_compose! { /// /// Only does one iteration per round. And one message or slice per canister. /// But this is sufficient to approximate the full range of scheduler behavior. +/// +/// `debug_canister_idx` is a mechanism that enables verbose debugging output +/// for a specific canister and for the scheduler state as a whole every round. +/// Set to `Some(canister_idx)` to enable debug output. fn run_multi_round_simulation( scheduler_cores: usize, num_rounds: usize, @@ -1827,16 +1859,14 @@ fn run_multi_round_simulation( } // --- Start round --- - let current_round = ExecutionRound::new(round as u64); - fixture.start_round( - current_round, - RoundScheduleBuilder::new() - .with_cores(scheduler_cores) - .with_heap_delta_rate_limit(HEAP_DELTA_RATE_LIMIT), - ); + fixture.round_schedule = RoundScheduleBuilder::new() + .with_cores(scheduler_cores) + .with_heap_delta_rate_limit(HEAP_DELTA_RATE_LIMIT) + .build(); + fixture.current_round = ExecutionRound::new(round as u64); // --- start_iteration --- - let iteration = fixture.start_iteration_only(true); + let iteration = fixture.start_iteration(true); // --- Simulate core assignment and execution --- let core_schedules = fixture.partition_to_cores(&iteration); @@ -1959,6 +1989,24 @@ fn run_multi_round_simulation( } } + if debug_canister_idx.is_some() { + println!( + "executed rounds: {:?}", + sims.iter().map(|s| s.full_rounds).collect::>() + ); + println!( + "canister priorities: {:?}", + sims.iter() + .map(|sim| fixture + .state + .canister_priority(&sim.canister_id) + .accumulated_priority + .get() + / MULTIPLIER) + .collect::>() + ); + } + (fixture.state, sims) } @@ -1969,23 +2017,8 @@ fn assert_multi_round_invariants( scheduler_cores: usize, num_rounds: usize, total_compute_allocation: usize, - expected_efficiency_percent: usize, + expected_long_execution_efficiency_percent: usize, ) -> Result<(), TestCaseError> { - println!( - "executed rounds: {:?}", - sims.iter().map(|s| s.full_rounds).collect::>() - ); - println!( - "canister priorities: {:?}", - sims.iter() - .map(|sim| state - .canister_priority(&sim.canister_id) - .accumulated_priority - .get() - / MULTIPLIER) - .collect::>() - ); - // The sum of accumulated priorities should be zero (or slightly positive, in // case e.g. we just distributed compute allocation after not having executed // anything this round). @@ -2039,7 +2072,11 @@ fn assert_multi_round_invariants( .archetype .expected_full_rounds(num_rounds, free_compute_per_canister); - expected_rounds = expected_rounds * expected_efficiency_percent / 100; + if sim.archetype.has_long_execution { + expected_rounds = + expected_rounds * expected_long_execution_efficiency_percent / 100; + } + prop_assert!( executed_rounds + credit_rounds + 1 >= expected_rounds, "canister {i}: executed_rounds {executed_rounds} + credit_rounds {credit_rounds} < expected_rounds {expected_rounds}" @@ -2067,18 +2104,19 @@ fn multi_round_priority_invariants( let (state, sims) = run_multi_round_simulation(scheduler_cores, num_rounds, &archetype_configs, None); - // Expect 80%+ efficient scheduling with a mix of long and short executions. + // Expect 90%+ efficient scheduling of long executions. // - // This is due to idle canisters consuming free compute on rounds where not all - // cores are busy, reducing free compute available to active canisters. - let expected_efficiency_percent = 80; + // Because long executions are prioritized for throughput (in-progress ones get + // prioritized over new ones); and due to AP exponential decay; long executions + // are scheduled less efficiently. + let expected_long_execution_efficiency_percent = 90; assert_multi_round_invariants( &state, &sims, scheduler_cores, num_rounds, total_compute_allocation, - expected_efficiency_percent, + expected_long_execution_efficiency_percent, )?; } @@ -2114,14 +2152,14 @@ fn multi_round_all_active_short_executions( let (state, sims) = run_multi_round_simulation(scheduler_cores, num_rounds, &archetype_configs, None); - // Expect 100% efficient scheduling with short executions only. - let expected_efficiency_percent = 100; + // No-op, we already expect 100% efficient scheduling of short executions. + let expected_long_execution_efficiency_percent = 100; assert_multi_round_invariants( &state, &sims, scheduler_cores, num_rounds, total_compute_allocation as usize, - expected_efficiency_percent, + expected_long_execution_efficiency_percent, )?; } diff --git a/rs/execution_environment/src/scheduler/scheduler_metrics.rs b/rs/execution_environment/src/scheduler/scheduler_metrics.rs index 0f506302752e..bcd0095fa24a 100644 --- a/rs/execution_environment/src/scheduler/scheduler_metrics.rs +++ b/rs/execution_environment/src/scheduler/scheduler_metrics.rs @@ -8,9 +8,7 @@ use ic_replicated_state::metrics::{ }; use ic_types::ingress::{IngressState, IngressStatus}; use ic_types::{PrincipalId, Time}; -use prometheus::{ - Gauge, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge, -}; +use prometheus::{Gauge, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec}; use std::collections::BTreeMap; pub(crate) const CANISTER_INVARIANT_BROKEN: &str = "scheduler_canister_invariant_broken"; @@ -42,7 +40,6 @@ pub struct SchedulerMetrics { pub(super) round_postponed_raw_rand_queue: ScopedMetrics, pub(super) round_inner_subnet_queue: ScopedMetrics, pub(super) round_advance_long_install_code: ScopedMetrics, - pub(super) round_scheduling_duration: Histogram, pub(super) round_update_signature_request_contexts_duration: Histogram, pub(super) round_inner: ScopedMetrics, pub(super) round_inner_heartbeat_overhead_duration: Histogram, @@ -65,7 +62,6 @@ pub struct SchedulerMetrics { pub(super) subnet_memory_usage_invariant: IntCounter, pub(super) scheduler_compute_allocation_invariant_broken: IntCounter, pub(super) scheduler_cores_invariant_broken: IntCounter, - pub(super) scheduler_accumulated_priority_invariant: IntGauge, pub(super) scheduler_accumulated_priority_deviation: Gauge, pub(super) inducted_messages: IntCounterVec, pub(super) delivered_pre_signatures: HistogramVec, @@ -91,7 +87,7 @@ impl SchedulerMetrics { ), compute_utilization_per_core: metrics_registry.histogram( "scheduler_compute_utilization_per_core", - "The Internet Computer's compute utilization as a percent per cpu core.", + "The Internet Computer's compute utilization per CPU core as a 0.0-1.0 fraction.", linear_buckets(0.0, 0.05, 21), ), msg_execution_duration: duration_histogram( @@ -220,7 +216,6 @@ impl SchedulerMetrics { slices: round_phase_slices_histogram("long install", metrics_registry), messages: round_phase_messages_histogram("long install", metrics_registry), }, - round_scheduling_duration: round_phase_duration_histogram("scheduling", metrics_registry), round_update_signature_request_contexts_duration: round_phase_duration_histogram("threshold sign", metrics_registry), // `inner_round()` processing. round_inner: ScopedMetrics { @@ -316,10 +311,6 @@ impl SchedulerMetrics { subnet_memory_usage_invariant: metrics_registry.error_counter(SUBNET_MEMORY_USAGE_INVARIANT_BROKEN), scheduler_compute_allocation_invariant_broken: metrics_registry.error_counter(SCHEDULER_COMPUTE_ALLOCATION_INVARIANT_BROKEN), scheduler_cores_invariant_broken: metrics_registry.error_counter(SCHEDULER_CORES_INVARIANT_BROKEN), - scheduler_accumulated_priority_invariant: metrics_registry.int_gauge( - "scheduler_accumulated_priority_invariant", - "The sum of all accumulated priorities on the subnet." - ), scheduler_accumulated_priority_deviation: metrics_registry.gauge( "scheduler_accumulated_priority_deviation", "The standard deviation of accumulated priorities on the subnet." diff --git a/rs/execution_environment/src/scheduler/tests/limits.rs b/rs/execution_environment/src/scheduler/tests/limits.rs index 4a5d11207ba9..d8f7bce5b10f 100644 --- a/rs/execution_environment/src/scheduler/tests/limits.rs +++ b/rs/execution_environment/src/scheduler/tests/limits.rs @@ -271,7 +271,7 @@ fn dont_execute_any_canisters_if_not_enough_instructions_in_round() { let system_state = &canister_state.system_state; assert_eq!(system_state.queues().ingress_queue_size(), 1); assert!(!test.was_fully_executed(canister_state.canister_id())); - assert_eq!(system_state.canister_metrics().rounds_scheduled(), 1); + assert_eq!(system_state.canister_metrics().rounds_scheduled(), 0); assert_eq!(system_state.canister_metrics().executed(), 0); assert_eq!( system_state diff --git a/rs/execution_environment/src/scheduler/tests/metrics.rs b/rs/execution_environment/src/scheduler/tests/metrics.rs index c5dfcfcac750..cc2227ff80f7 100644 --- a/rs/execution_environment/src/scheduler/tests/metrics.rs +++ b/rs/execution_environment/src/scheduler/tests/metrics.rs @@ -188,7 +188,6 @@ fn can_record_metrics_for_a_round() { assert_eq!(metrics.canister_age.get_sample_sum() as i64, 0); assert_eq!(metrics.round_preparation_duration.get_sample_count(), 1); assert_eq!(metrics.round_preparation_ingress.get_sample_count(), 1); - assert_eq!(metrics.round_scheduling_duration.get_sample_count(), 1); assert_eq!(metrics.round_finalization_scheduling.get_sample_count(), 1); assert_eq!( metrics.round_inner_iteration_scheduling.get_sample_count(), diff --git a/rs/execution_environment/src/scheduler/tests/scheduling.rs b/rs/execution_environment/src/scheduler/tests/scheduling.rs index e871b3507bbd..ef765a3762e7 100644 --- a/rs/execution_environment/src/scheduler/tests/scheduling.rs +++ b/rs/execution_environment/src/scheduler/tests/scheduling.rs @@ -4,11 +4,9 @@ use super::test_utilities::{ SchedulerTest, SchedulerTestBuilder, ingress, instructions, on_response, other_side, }; use super::*; -use ic_config::subnet_config::{SchedulerConfig, SubnetConfig}; -use ic_registry_subnet_type::SubnetType; +use ic_config::subnet_config::SchedulerConfig; use ic_replicated_state::testing::CanisterQueuesTesting; use ic_types::ComputeAllocation; -use ic_types::ingress::IngressStatus; use ic_types::methods::SystemMethod; use ic_types_cycles::Cycles; use more_asserts::{assert_ge, assert_gt, assert_le}; @@ -180,10 +178,10 @@ fn execute_idle_and_canisters_with_messages() { test.execute_round(ExecutionRoundType::OrdinaryRound); - // We update `last_full_execution_round` for the canister without any + // We do not update `last_full_execution_round` for the canister without any // input messages. - assert!(test.was_fully_executed(idle)); - // But not its counts of rounds scheduled or executed. + assert!(!test.was_fully_executed(idle)); + // Nor its counts of rounds scheduled or executed. let idle = test.canister_state(idle); assert_eq!(idle.system_state.canister_metrics().rounds_scheduled(), 0); assert_eq!(idle.system_state.canister_metrics().executed(), 0); @@ -732,7 +730,7 @@ fn scheduler_respects_compute_allocation( let ( mut test, scheduler_cores, - _messages_per_canister, + messages_per_canister, _instructions_per_round, _instructions_per_message, ) = test; @@ -745,10 +743,10 @@ fn scheduler_respects_compute_allocation( // to be executed by a thread. let mut scheduled_first_counters = HashMap::::new(); - // Because we may be left with as little free compute capacity as 1, run for + // Because we may be left with as little free compute capacity as 100, run for // enough rounds that every canister gets a chance to be scheduled at least once - // for free, i.e. `100 * number_of_canisters` rounds. - let number_of_rounds = 100 * number_of_canisters; + // for free, i.e. `number_of_canisters` rounds. + let number_of_rounds = number_of_canisters; let canister_ids: Vec<_> = test.state().canister_states().keys().cloned().collect(); @@ -781,7 +779,7 @@ fn scheduler_respects_compute_allocation( }; prop_assert!( - *count >= expected_count, + *count >= std::cmp::min(expected_count, messages_per_canister), "Canister {} (allocation {}) should have been scheduled \ {} out of {} rounds, was scheduled only {} rounds instead.", canister_id, @@ -793,106 +791,6 @@ fn scheduler_respects_compute_allocation( } } -#[test] -fn scheduler_resets_accumulated_priorities() { - /// Create `scheduler_cores * 2` canisters with 2 messages each and execute 2 rounds. - /// Return number of executed second ingress messages. - fn executed_messages_after_two_rounds(scheduler_cores: usize, reset_interval: u64) -> usize { - /// Count the number of executed ingress messages. - fn executed_messages(test: &SchedulerTest, ingress_ids: &[MessageId]) -> usize { - ingress_ids - .iter() - .filter_map(|ingress_id| match test.ingress_status(ingress_id) { - IngressStatus::Known { - // There is no response, so messages are in the failed state - state: IngressState::Failed(_), - .. - } => Some(()), - _ => None, - }) - .count() - } - - // There must twice more canisters than the scheduler cores - let num_canisters = scheduler_cores * 2; - - let subnet_config = SubnetConfig::new(SubnetType::Application); - let mut test = SchedulerTestBuilder::new() - .with_scheduler_config(SchedulerConfig { - scheduler_cores, - // Increase the overhead to execute just one message per round per core - instruction_overhead_per_execution: subnet_config - .scheduler_config - .max_instructions_per_round, - // Reset accumulated priority every second round - accumulated_priority_reset_interval: reset_interval.into(), - ..subnet_config.scheduler_config - }) - .build(); - - // Create canisters with 2 messages each - let mut canister_ids = Vec::with_capacity(num_canisters); - let mut first_ingress_ids = Vec::with_capacity(num_canisters); - let mut second_ingress_ids = Vec::with_capacity(num_canisters); - for _ in 0..num_canisters { - let canister_id = test.create_canister(); - canister_ids.push(canister_id); - first_ingress_ids.push(test.send_ingress(canister_id, ingress(5))); - second_ingress_ids.push(test.send_ingress(canister_id, ingress(5))); - } - - // Execute the first round. Only first `scheduler_cores` messages - // must be executed (marked as `E`): - // Canister ID: 0 1 2 3 (scheduler_cores * 2) - // 1st message states: E E . . - // 2nd message states: . . . . - test.execute_round(ExecutionRoundType::OrdinaryRound); - // After the first round, only the first `scheduler_cores` messages will be executed - assert_eq!( - scheduler_cores, - executed_messages(&test, &first_ingress_ids) - ); - assert_eq!(0, executed_messages(&test, &second_ingress_ids)); - - // Execute the second round - test.execute_round(ExecutionRoundType::OrdinaryRound); - // Return number of executed second ingress messages - executed_messages(&test, &second_ingress_ids) - } - - // Note: the DTS scheduler requires at least 2 scheduler cores - let scheduler_cores = 2; - - // When there is no reset round, canisters with the same compute allocation - // get scheduled fairly, one by one: - // - // 1. After the first round, some two canisters will be executed. - // 2. After the second round, the other two canisters will be executed. - // - // After two rounds, all canisters will be executed once (marked as `E`): - // - // Canister ID: 0 1 2 3 (scheduler_cores * 2) - // 1st message states: E E E E - // 2nd message states: . . . . <-- num_executed_second_messages == 0 - let num_executed_second_messages = executed_messages_after_two_rounds(scheduler_cores, 100); - assert_eq!(0, num_executed_second_messages); - - // When the accumulated priorities get reset every round, accumulated priority - // becomes irrelevant. Scheduler will be trying to execute every round - // the same two canisters: - // - // 1. After the first round, some two canister will be executed. - // 2. After the second round, the same two canisters will be executed. - // - // After two rounds, two canisters will be executed twice (marked as `E`): - // - // Canister ID: 0 1 2 3 (scheduler_cores * 2) - // 1st message states: E E . . - // 2nd message states: E E . . <-- num_executed_second_messages == scheduler_cores - let num_executed_second_messages = executed_messages_after_two_rounds(scheduler_cores, 1); - assert_eq!(scheduler_cores, num_executed_second_messages); -} - #[test] fn inner_round_first_execution_is_not_a_full_execution() { let scheduler_cores = 2; @@ -1103,67 +1001,6 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: prop_assert_eq!(total_accumulated_priority, 0); } -#[test] -fn charge_idle_canisters_for_full_execution_round() { - let scheduler_cores = 2; - let num_rounds = 100; - let slice = 20; - let mut test = SchedulerTestBuilder::new() - .with_scheduler_config(SchedulerConfig { - scheduler_cores, - max_instructions_per_round: slice.into(), - max_instructions_per_message: slice.into(), - max_instructions_per_slice: slice.into(), - max_instructions_per_install_code_slice: slice.into(), - ..zero_instruction_overhead_config() - }) - .build(); - - // Bump up the round number. - test.execute_round(ExecutionRoundType::OrdinaryRound); - - // Create many idle canisters. - for _ in 0..scheduler_cores * 2 { - test.create_canister(); - } - - // Create many busy canisters. - for _ in 0..scheduler_cores * 2 { - let canister_id = test.create_canister(); - for _ in 0..num_rounds { - test.send_ingress(canister_id, ingress(slice)); - } - } - - for round in 0..num_rounds { - test.execute_round(ExecutionRoundType::OrdinaryRound); - - for canister in test.state().canisters_iter() { - // Assert that we punished all idle canisters, not just top `scheduler_cores`. - if round == 0 && !canister.has_input() { - assert_ne!(test.last_round(), 0.into()); - assert_eq!( - test.state() - .canister_priority(&canister.canister_id()) - .last_full_execution_round, - test.last_round() - ); - } - } - let mut total_accumulated_priority = 0; - for (_, canister_priority) in test.state().metadata.subnet_schedule.iter() { - // Assert there is no divergency in accumulated priorities. - let priority = canister_priority.accumulated_priority; - assert_le!(priority.get(), 100 * MULTIPLIER); - assert_ge!(priority.get(), -100 * MULTIPLIER); - - total_accumulated_priority += canister_priority.accumulated_priority.get(); - } - // The accumulated priority invariant should be respected. - assert_eq!(total_accumulated_priority, 0); - } -} - /// Canisters with inputs but without enough cycles to execute them do get /// categorized as "fully executed" when their inputs are consumed, even though /// they didn't actually execute any message. @@ -1245,9 +1082,9 @@ fn frozen_canisters_are_fully_executed() { } /// Canisters with heartbeats or timers but without enough cycles to execute them -/// do not get executed, but are charged as idle. +/// do not get scheduled. #[test] -fn frozen_canisters_with_heartbeats_or_timers_are_charged_as_idle() { +fn frozen_canisters_with_heartbeats_or_timers_are_not_scheduled() { let scheduler_cores = 2; let canisters_per_core = 2; let slice = 100; @@ -1308,11 +1145,15 @@ fn frozen_canisters_with_heartbeats_or_timers_are_charged_as_idle() { 0 ); - // But all canisters were marked as fully executed, because they were idle. - for (i, canister) in canisters.iter().enumerate() { - assert!( - test.was_fully_executed(*canister), - "Canister {i} should have been charged as idle", + // Or scheduled. + assert_eq!(test.state().metadata.subnet_schedule.len(), 0); + for canister in canisters.iter() { + assert_eq!( + test.canister_state(*canister) + .system_state + .canister_metrics() + .rounds_scheduled(), + 0 ); } } diff --git a/rs/replicated_state/src/canister_state.rs b/rs/replicated_state/src/canister_state.rs index 2ab2be225df1..0769c481b5b3 100644 --- a/rs/replicated_state/src/canister_state.rs +++ b/rs/replicated_state/src/canister_state.rs @@ -275,6 +275,20 @@ impl CanisterState { self.system_state.queues().has_output() } + /// Returns true if the canister must be present in the `SubnetSchedule`, + /// regardless of new inputs or accumulated priority. + /// + /// This is different from "should be scheduled in an iteration", which also + /// considers whether the canister has inputs / tasks or non-zero AP / priority + /// credit. It is strictly about ensuring that canisters are retained in the + /// subnet schedule for as long as they have long-running executions or heap + /// delta or install code debits. + pub fn must_be_in_schedule(&self) -> bool { + self.scheduler_state.heap_delta_debit.get() > 0 + || self.scheduler_state.install_code_debit.get() > 0 + || self.has_long_execution_or_install_code() + } + /// See `SystemState::push_output_request` for documentation. pub fn push_output_request( &mut self, diff --git a/rs/replicated_state/src/metadata_state/subnet_schedule.rs b/rs/replicated_state/src/metadata_state/subnet_schedule.rs index b1d6060223cc..6c6b433d4fd4 100644 --- a/rs/replicated_state/src/metadata_state/subnet_schedule.rs +++ b/rs/replicated_state/src/metadata_state/subnet_schedule.rs @@ -42,6 +42,14 @@ impl CanisterPriority { long_execution_start_round: None, last_full_execution_round: ExecutionRound::new(0), }; + + /// Returns true if the canister has non-zero accumulated priority or is in a + /// long execution. + pub fn is_non_zero(&self) -> bool { + self.accumulated_priority.get() != 0 + || self.executed_rounds != 0 + || self.long_execution_start_round.is_some() + } } impl Default for CanisterPriority { diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs index d94c2a04c1b9..ea82e6c99fc3 100644 --- a/rs/replicated_state/src/replicated_state.rs +++ b/rs/replicated_state/src/replicated_state.rs @@ -607,11 +607,15 @@ impl ReplicatedState { /// cleaned up. pub fn put_canister_state>>(&mut self, canister_state: CS) { let canister_state = canister_state.into(); - // Also insert a scheduling priority for the canister. This is a temporary - // measure to ensure that every canister has an explicit priority. - self.metadata - .subnet_schedule - .get_mut(canister_state.canister_id()); + + // Add the canister to the subnet schedule if it has install code or heap delta + // debits or is in a long-running execution. + if canister_state.must_be_in_schedule() { + self.metadata + .subnet_schedule + .get_mut(canister_state.canister_id()); + } + self.canister_states .insert(canister_state.canister_id(), canister_state); } @@ -737,12 +741,19 @@ impl ReplicatedState { .collect() } - /// Prunes all canister priorities for which a corresponding canister state no - /// longer exists. + /// Prunes the canister priorities of deleted canisters; and those that have + /// all-zero accumulated priority, priority credit, heap delta and install code + /// debits, and do not have a long-running execution. pub fn garbage_collect_subnet_schedule(&mut self) { self.metadata .subnet_schedule - .retain(|canister_id, _| self.canister_states.contains_key(canister_id)); + .retain(|canister_id, priority| { + self.canister_states + .get(canister_id) + .is_some_and(|canister| { + priority.is_non_zero() || canister.must_be_in_schedule() + }) + }); } pub fn system_metadata(&self) -> &SystemMetadata { diff --git a/rs/replicated_state/tests/replicated_state.rs b/rs/replicated_state/tests/replicated_state.rs index 75400b49c91b..d81c6a96ec77 100644 --- a/rs/replicated_state/tests/replicated_state.rs +++ b/rs/replicated_state/tests/replicated_state.rs @@ -1246,8 +1246,9 @@ fn split() { expected.put_canister_state(canister_state); // And the split marker should be reset. expected.metadata.split_from = None; - // The canister priority for `CANISTER_1` is gone, as it was not persisted. - expected.metadata.subnet_schedule.remove(&CANISTER_1); + // The canister priority for `CANISTER_2` is replaced with the default, as it + // was not persisted. + expected.metadata.subnet_schedule.get_mut(CANISTER_2); // Everything else should be the same as in phase 1. assert_eq!(expected, state_b); } @@ -1352,6 +1353,8 @@ fn online_split() { CanisterCyclesCostSchedule::Normal, ), }); + // Canister must be in the subnet schedule. + fixture.state.canister_priority_mut(canister_id); }; add_aborted_install_code_task(CANISTER_1); add_aborted_install_code_task(CANISTER_2); From 989e14d46bab6ded2c3b64be262f43fd0224e066 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Thu, 7 May 2026 13:03:14 +0000 Subject: [PATCH 2/5] feat: [DSM-106] Observe `ReplicatedState` metrics in a background thread MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Based on a recent CPU profile of the `alin/DSM-103-active-canister-only-scheduling` branch (i.e. after scheduler optimizations), `ReplicatedStateMetrics::observe()` accounts for 20%-25% of the DSM round duration. This is all read-only work (scanning the `ReplicatedState` and instrumenting stats) so it can be safely done on a background thread, particularly after the `StateManager` has already created a cheap to clone `Arc`. The change moves `ReplicatedStateMetrics::observe()` off the `commit_and_certify()` critical path by introducing a generic `WorkerThread` in `ic-utils-thread` and wiring `StateManagerImpl` to defer observations through it. There's a single‑slot bounded channel (one in‑flight + one queued); excess work is dropped and counted via `state_manager_skipped_state_observations`. --- Cargo.lock | 2 + rs/execution_environment/src/lib.rs | 1 - rs/execution_environment/src/scheduler.rs | 17 +-- .../src/scheduler/test_utilities.rs | 34 ++++- .../src/scheduler/tests/dts.rs | 28 ++-- .../src/scheduler/tests/metrics.rs | 39 ++--- .../src/scheduler/tests/rate_limiting.rs | 2 +- .../tests/canister_logging.rs | 8 +- rs/messaging/src/message_routing.rs | 3 + rs/state_machine_tests/src/lib.rs | 7 + rs/state_manager/src/lib.rs | 48 +++++- rs/state_manager/tests/state_manager.rs | 26 +++- rs/utils/thread/BUILD.bazel | 7 +- rs/utils/thread/Cargo.toml | 4 + rs/utils/thread/src/lib.rs | 1 + rs/utils/thread/src/worker_thread.rs | 143 ++++++++++++++++++ 16 files changed, 301 insertions(+), 69 deletions(-) create mode 100644 rs/utils/thread/src/worker_thread.rs diff --git a/Cargo.lock b/Cargo.lock index 200a5d9c26d5..d3d918c15a24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15555,6 +15555,8 @@ name = "ic-utils-thread" version = "0.9.0" dependencies = [ "crossbeam-channel", + "ic-metrics", + "prometheus", ] [[package]] diff --git a/rs/execution_environment/src/lib.rs b/rs/execution_environment/src/lib.rs index 659e7fa6f5f5..c4e39a0dbc24 100644 --- a/rs/execution_environment/src/lib.rs +++ b/rs/execution_environment/src/lib.rs @@ -171,7 +171,6 @@ impl ExecutionServices { let scheduler = Box::new(SchedulerImpl::new( subnet_config.scheduler_config, config.embedders_config, - own_subnet_id, Arc::clone(&ingress_history_writer) as Arc<_>, Arc::clone(&execution_environment) as Arc<_>, Arc::clone(&cycles_account_manager), diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index b14c8563c6a3..9e985f08aa64 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -33,7 +33,6 @@ use ic_registry_subnet_type::SubnetType; use ic_replicated_state::SubnetSchedule; use ic_replicated_state::canister_state::NextExecution; use ic_replicated_state::canister_state::execution_state::NextScheduledMethod; -use ic_replicated_state::metrics::ReplicatedStateMetrics; use ic_replicated_state::page_map::PageAllocatorFileDescriptor; use ic_replicated_state::{ CanisterState, ExecutionTask, InputQueueType, NetworkTopology, ReplicatedState, @@ -43,7 +42,7 @@ use ic_types::ingress::{IngressState, IngressStatus}; use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage}; use ic_types::{ CanisterId, ComputeAllocation, ExecutionRound, MemoryAllocation, NumBytes, NumInstructions, - NumMessages, NumSlices, Randomness, ReplicaVersion, SubnetId, Time, + NumMessages, NumSlices, Randomness, ReplicaVersion, Time, }; use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles}; use more_asserts::{debug_assert_ge, debug_assert_le, debug_assert_lt}; @@ -141,12 +140,10 @@ impl SchedulerRoundLimits { pub(crate) struct SchedulerImpl { config: SchedulerConfig, hypervisor_config: HypervisorConfig, - own_subnet_id: SubnetId, ingress_history_writer: Arc>, exec_env: Arc, cycles_account_manager: Arc, metrics: Arc, - state_metrics: ReplicatedStateMetrics, log: ReplicaLogger, thread_pool: RefCell, rate_limiting_of_heap_delta: FlagStatus, @@ -159,7 +156,6 @@ impl SchedulerImpl { pub(crate) fn new( config: SchedulerConfig, hypervisor_config: HypervisorConfig, - own_subnet_id: SubnetId, ingress_history_writer: Arc>, exec_env: Arc, cycles_account_manager: Arc, @@ -174,12 +170,10 @@ impl SchedulerImpl { config, hypervisor_config, thread_pool: RefCell::new(scoped_threadpool::Pool::new(scheduler_cores)), - own_subnet_id, ingress_history_writer, exec_env, cycles_account_manager, metrics: Arc::new(SchedulerMetrics::new(metrics_registry)), - state_metrics: ReplicatedStateMetrics::new(metrics_registry), log, rate_limiting_of_heap_delta, rate_limiting_of_instructions, @@ -1096,17 +1090,12 @@ impl SchedulerImpl { } } - self.state_metrics.observe( - self.own_subnet_id, - state, - current_round.get().into(), - logger, - ); - self.check_invariants(state, current_round_type, current_round, logger); } /// Checks the DTS and subnet memory usage invariants at the end of the round. + // + // TODO(DSM-103): Move into ReplicatedStateMetrics. fn check_invariants( &self, state: &ReplicatedState, diff --git a/rs/execution_environment/src/scheduler/test_utilities.rs b/rs/execution_environment/src/scheduler/test_utilities.rs index 78788a0fd1b4..f2e01fbc758f 100644 --- a/rs/execution_environment/src/scheduler/test_utilities.rs +++ b/rs/execution_environment/src/scheduler/test_utilities.rs @@ -42,6 +42,7 @@ use ic_replicated_state::{ ReplicatedState, canister_state::execution_state::{self, WasmExecutionMode, WasmMetadata}, metadata_state::testing::NetworkTopologyTesting, + metrics::ReplicatedStateMetrics, num_bytes_try_from, page_map::TestPageAllocatorFileDescriptorImpl, testing::{CanisterQueuesTesting, ReplicatedStateTesting}, @@ -112,25 +113,26 @@ pub(crate) struct SchedulerTest { round_summary: Option, /// The amount of cycles that new canisters have by default. initial_canister_cycles: Cycles, - /// The id of the user that sends ingress messages. + /// The ID of the user that sends ingress messages. user_id: UserId, - /// The id of a canister that is guaranteed to be xnet. + /// The ID of a canister that is guaranteed to be XNet. xnet_canister_id: CanisterId, /// The actual scheduler. scheduler: SchedulerImpl, /// The fake Wasm executor. wasm_executor: Arc, - /// Registry Execution Settings. registry_settings: RegistryExecutionSettings, - /// Metrics Registry. metrics_registry: MetricsRegistry, - /// Chain key subnet public keys. + /// Replicated state metrics, updated by `SchedulerTest` after every round. + /// + /// These used to be updated by the scheduler, but state instrumentation was + /// moved into the State Manager. + state_metrics: ReplicatedStateMetrics, chain_key_subnet_public_keys: BTreeMap, /// Available pre-signatures. idkg_pre_signatures: BTreeMap, /// Version of the running replica, not the registry's Entry replica_version: ReplicaVersion, - /// Hypervisor config. hypervisor_config: HypervisorConfig, } @@ -169,6 +171,10 @@ impl SchedulerTest { &self.metrics_registry } + pub fn state_metrics(&self) -> &ReplicatedStateMetrics { + &self.state_metrics + } + pub fn canister_state_mut(&mut self, canister_id: CanisterId) -> &mut CanisterState { self.state_mut() .canister_state_make_mut(&canister_id) @@ -595,6 +601,18 @@ impl SchedulerTest { round_type, self.registry_settings(), ); + + // Explicitly observe the Replicated State metrics after every round. This used + // to be done by the scheduler, so there are a number of tests depending on + // various state metrics, but instrumentation was moved into the State Manager. + // So we "manually"update the metrics after every round. + self.state_metrics.observe( + state.metadata.own_subnet_id, + &state, + self.round.get().into(), + &self.scheduler.log, + ); + self.state = Some(state); self.check_invariants(); self.increment_round(); @@ -1078,7 +1096,6 @@ impl SchedulerTestBuilder { let scheduler = SchedulerImpl::new( self.scheduler_config, self.hypervisor_config.clone(), - self.own_subnet_id, execution_services.ingress_history_writer, execution_services.execution_environment, execution_services.cycles_account_manager, @@ -1089,6 +1106,8 @@ impl SchedulerTestBuilder { Arc::new(TestPageAllocatorFileDescriptorImpl::new()), ); + let state_metrics = ReplicatedStateMetrics::new(&self.metrics_registry); + SchedulerTest { state: Some(state), next_canister_id: 0, @@ -1101,6 +1120,7 @@ impl SchedulerTestBuilder { wasm_executor, registry_settings, metrics_registry: self.metrics_registry, + state_metrics, chain_key_subnet_public_keys, idkg_pre_signatures: BTreeMap::new(), replica_version: self.replica_version, diff --git a/rs/execution_environment/src/scheduler/tests/dts.rs b/rs/execution_environment/src/scheduler/tests/dts.rs index 24cf12ebb852..e02a7b43d4ad 100644 --- a/rs/execution_environment/src/scheduler/tests/dts.rs +++ b/rs/execution_environment/src/scheduler/tests/dts.rs @@ -39,8 +39,7 @@ fn dts_long_execution_completes() { ErrorCode::CanisterDidNotReply, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 9.0 @@ -124,8 +123,7 @@ fn cannot_execute_management_message_for_targeted_long_execution_canister() { test.execute_round(ExecutionRoundType::OrdinaryRound); assert_eq!(test.state().subnet_queues().input_queues_message_count(), 1); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 4.0 @@ -143,8 +141,7 @@ fn cannot_execute_management_message_for_targeted_long_execution_canister() { ErrorCode::CanisterDidNotReply, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 9.0 @@ -176,8 +173,7 @@ fn dts_long_execution_runs_out_of_instructions() { ErrorCode::CanisterInstructionLimitExceeded, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 9.0 @@ -509,7 +505,7 @@ fn dts_allow_only_one_long_install_code_execution_at_a_time() { 0.0 ); - let state_metrics = &test.scheduler().state_metrics; + let state_metrics = test.state_metrics(); // 2 rounds because the first canister was paused twice. assert_eq!( state_metrics @@ -549,7 +545,7 @@ fn dts_allow_only_one_long_install_code_execution_at_a_time() { ); // 3 slices for the first canister, 1 slice for the second. assert_eq!(metrics.round.slices.get_sample_sum(), 4.0); - let state_metrics = &test.scheduler().state_metrics; + let state_metrics = test.state_metrics(); // Same 2 rounds of paused install code as above. assert_eq!( state_metrics @@ -593,16 +589,14 @@ fn dts_resume_install_code_after_abort() { // After 1 + 9 rounds we had a paused install code. assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_install_code() .get_sample_sum(), 10.0 ); // After the checkpoint round we had an aborted install code. assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_aborted_install_code() .get_sample_sum(), 1.0 @@ -659,15 +653,13 @@ fn dts_resume_long_execution_after_abort() { ErrorCode::CanisterDidNotReply, ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_paused_execution() .get_sample_sum(), 10.0 ); assert_eq!( - test.scheduler() - .state_metrics + test.state_metrics() .canister_aborted_execution() .get_sample_sum(), 1.0 diff --git a/rs/execution_environment/src/scheduler/tests/metrics.rs b/rs/execution_environment/src/scheduler/tests/metrics.rs index cc2227ff80f7..eb264724905d 100644 --- a/rs/execution_environment/src/scheduler/tests/metrics.rs +++ b/rs/execution_environment/src/scheduler/tests/metrics.rs @@ -20,6 +20,7 @@ use ic_management_canister_types_private::{ use ic_registry_routing_table::CanisterIdRange; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::metadata_state::testing::NetworkTopologyTesting; +use ic_replicated_state::metrics::ReplicatedStateMetrics; use ic_replicated_state::testing::SystemStateTesting; use ic_test_utilities_metrics::{ HistogramStats, fetch_counter_vec, fetch_gauge, fetch_gauge_vec, fetch_histogram_stats, @@ -605,7 +606,7 @@ fn long_open_call_context_is_recorded() { test.execute_round(ExecutionRoundType::OrdinaryRound); - let state_metrics = &test.scheduler().state_metrics; + let state_metrics = test.state_metrics(); let gauge = state_metrics .old_open_call_contexts() .get_metric_with_label_values(&["1d"]) @@ -633,8 +634,8 @@ fn threshold_signature_agreements_metric_is_updated() { ]) .build(); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 1.into(), &no_op_logger(), @@ -797,8 +798,8 @@ fn threshold_signature_agreements_metric_is_updated() { test.execute_round(ExecutionRoundType::OrdinaryRound); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 2.into(), &no_op_logger(), @@ -857,8 +858,8 @@ fn consumed_cycles_ecdsa_outcalls_are_added_to_consumed_cycles_total() { let canister_id = test.create_canister(); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -894,8 +895,8 @@ fn consumed_cycles_ecdsa_outcalls_are_added_to_consumed_cycles_total() { .sign_with_ecdsa_contexts(); assert_eq!(sign_with_ecdsa_contexts.len(), 1); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -936,8 +937,8 @@ fn consumed_cycles_http_outcalls_are_added_to_consumed_cycles_total() { test.state_mut().metadata.own_subnet_features.http_requests = true; - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1002,8 +1003,8 @@ fn consumed_cycles_http_outcalls_are_added_to_consumed_cycles_total() { Some(NumBytes::from(response_size_limit)), ); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1130,8 +1131,8 @@ fn consumed_cycles_for_instructions_are_updated_from_valid_canisters() { .system_state .consume_cycles(removed_cycles); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1176,8 +1177,8 @@ fn consumed_cycles_for_resource_allocations_are_updated_from_valid_canisters() { test.advance_time(duration); test.charge_for_resource_allocations(); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), @@ -1260,8 +1261,8 @@ fn consumed_cycles_are_updated_from_deleted_canisters() { ); test.execute_round(ExecutionRoundType::OrdinaryRound); - test.scheduler().state_metrics.observe( - test.scheduler().own_subnet_id, + test.state_metrics().observe( + test.state().metadata.own_subnet_id, test.state(), 0.into(), &no_op_logger(), diff --git a/rs/execution_environment/src/scheduler/tests/rate_limiting.rs b/rs/execution_environment/src/scheduler/tests/rate_limiting.rs index 8b155c016f5c..ba20426eeaed 100644 --- a/rs/execution_environment/src/scheduler/tests/rate_limiting.rs +++ b/rs/execution_environment/src/scheduler/tests/rate_limiting.rs @@ -355,7 +355,7 @@ fn no_heap_delta_rate_limiting_for_system_subnet() { // Assert that we reached the subnet heap delta capacity (140 GiB) in 70 rounds. assert_ge!( - test.scheduler().state_metrics.current_heap_delta(), + test.state_metrics().current_heap_delta(), SUBNET_HEAP_DELTA_CAPACITY ); diff --git a/rs/execution_environment/tests/canister_logging.rs b/rs/execution_environment/tests/canister_logging.rs index 7fd9c7865ad5..93ebe4642c06 100644 --- a/rs/execution_environment/tests/canister_logging.rs +++ b/rs/execution_environment/tests/canister_logging.rs @@ -1718,11 +1718,13 @@ fn test_metric_canister_log_memory_usage_bytes_from_canister_log() { ); // Nothing logged yet — sum is zero. + env.flush_replicated_state_metrics(); let stats = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_eq!(stats.sum, 0.0); // One debug_print — sum reflects the payload plus small record metadata. let _ = env.execute_ingress(canister_id, "test", vec![]); + env.flush_replicated_state_metrics(); let stats = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_le!(METRIC_PAYLOAD_SIZE as f64, stats.sum); assert_le!( @@ -1751,6 +1753,7 @@ fn test_metric_canister_log_memory_usage_bytes_from_log_memory_store() { ); // Every per-round observation equals the default allocated store size. + env.flush_replicated_state_metrics(); let before = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_eq!( before.sum as u64 / before.count, @@ -1760,6 +1763,7 @@ fn test_metric_canister_log_memory_usage_bytes_from_log_memory_store() { // After a debug_print: allocation unchanged; new observations carry the // same value, so the per-observation average remains the default. let _ = env.execute_ingress(canister_id, "test", vec![]); + env.flush_replicated_state_metrics(); let after = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_gt!(after.count, before.count); assert_eq!( @@ -1816,13 +1820,15 @@ fn test_metric_canister_log_retention_seconds() { // Seed the buffer with a first record so retention is non-zero on the // second observation. let _ = env.execute_ingress(canister_id, "test", vec![]); + env.flush_replicated_state_metrics(); let before = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); // Advance simulated time and append another record. env.advance_time(TIME_ADVANCE); let _ = env.execute_ingress(canister_id, "test", vec![]); - + env.flush_replicated_state_metrics(); let after = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); + assert_eq!(after.count, before.count + 1); let sample = after.sum - before.sum; // The new sample should report at least the advanced wall-clock gap. diff --git a/rs/messaging/src/message_routing.rs b/rs/messaging/src/message_routing.rs index 50f2924b3c55..fabcb1120f69 100644 --- a/rs/messaging/src/message_routing.rs +++ b/rs/messaging/src/message_routing.rs @@ -1507,6 +1507,9 @@ impl BatchProcessor for BatchProcessorImpl, + /// Runs expensive `ReplicatedStateMetrics::observe` calls in the background, + /// so that `commit_and_certify` does not have to do it on the critical path. + replicated_state_metrics_thread: WorkerThread, state_layout: StateLayout, /// The main metadata. Different threads will need to access this field. /// @@ -1328,6 +1341,9 @@ impl StateManagerImpl { malicious_flags: MaliciousFlags, ) -> Self { let metrics = StateManagerMetrics::new(metrics_registry, log.clone()); + let replicated_state_metrics = Arc::new(ReplicatedStateMetrics::new(metrics_registry)); + let replicated_state_metrics_thread = + WorkerThread::new("StateMetrics", metrics.skipped_state_observations.clone()); let _timer = metrics .api_call_duration @@ -1624,6 +1640,8 @@ impl StateManagerImpl { Self { log, metrics, + replicated_state_metrics, + replicated_state_metrics_thread, state_layout, states, verifier, @@ -3545,6 +3563,16 @@ impl StateManager for StateManagerImpl { .send(req) .expect("failed to send tip request"); } + + // `ReplicatedStateMetrics::observe()` only updates Prometheus metrics, so defer + // it to a background thread to keep it off the critical path. + let replicated_state_metrics = Arc::clone(&self.replicated_state_metrics); + let own_subnet_id = self.own_subnet_id; + let log = self.log.clone(); + self.replicated_state_metrics_thread + .enqueue(Box::new(move || { + replicated_state_metrics.observe(own_subnet_id, &state, height, &log); + })); } fn report_diverged_checkpoint(&self, height: Height) { @@ -4203,9 +4231,13 @@ pub mod testing { /// Testing only: Purges the `manifest` at `height` in `states.states_metadata`. fn purge_manifest(&mut self, height: Height) -> bool; - /// Testing only: Wait till deallocation queue is empty. + /// Testing only: Wait until deallocation queue is empty. fn flush_deallocation_channel(&self); + /// Testing only: Wait until all enqueued replicated state metrics observations + /// have been processed by the background metrics thread. + fn flush_metrics_channel(&self); + /// Testing only: Returns heights in `states.snapshots`. fn state_snapshot_heights(&self) -> Vec; @@ -4277,6 +4309,10 @@ pub mod testing { self.deallocator_thread.flush_deallocation_channel(); } + fn flush_metrics_channel(&self) { + self.replicated_state_metrics_thread.flush_channel(); + } + fn state_snapshot_heights(&self) -> Vec { let states = self.states.read(); states.snapshots.iter().map(|s| s.height).collect() diff --git a/rs/state_manager/tests/state_manager.rs b/rs/state_manager/tests/state_manager.rs index 14f6a095b3e2..0625879c7a43 100644 --- a/rs/state_manager/tests/state_manager.rs +++ b/rs/state_manager/tests/state_manager.rs @@ -52,7 +52,8 @@ use ic_test_utilities_consensus::fake::{Fake, FakeVerifier}; use ic_test_utilities_io::{make_mutable, make_readonly, write_all_at}; use ic_test_utilities_logger::with_test_replica_logger; use ic_test_utilities_metrics::{ - Labels, fetch_gauge, fetch_histogram_vec_stats, fetch_int_counter_vec, fetch_int_gauge, + HistogramStats, Labels, fetch_gauge, fetch_histogram_stats, fetch_histogram_vec_stats, + fetch_int_counter_vec, fetch_int_gauge, }; use ic_test_utilities_state::{arb_stream, arb_stream_slice, canister_ids}; use ic_test_utilities_tmpdir::tmpdir; @@ -9527,3 +9528,26 @@ fn commit_and_certify_panic_on_delivered_fake_certification() { assert_eq!(no_state_clone_count(metrics), 0); }); } + +#[test] +fn commit_and_certify_observes_replicated_state_metrics() { + state_manager_test(|metrics, sm| { + let state = sm.take_tip().1; + // Sanity check: metrics should be zero before any observations are made. + assert_matches!( + fetch_histogram_stats(metrics, "scheduler_canister_paused_execution"), + Some(HistogramStats { count: 0, sum: 0.0 }) + ); + + // Call `commit_and_certify` and wait for the background thread to complete the + // observation. + sm.commit_and_certify_at_height(state, Height::new(1), CertificationScope::Metadata, None); + sm.flush_metrics_channel(); + + // Metrics have now been updated. + assert_matches!( + fetch_histogram_stats(metrics, "scheduler_canister_paused_execution"), + Some(HistogramStats { count: 1, sum: 0.0 }) + ); + }); +} diff --git a/rs/utils/thread/BUILD.bazel b/rs/utils/thread/BUILD.bazel index 80f459e6e5a9..8c65d0d0e79c 100644 --- a/rs/utils/thread/BUILD.bazel +++ b/rs/utils/thread/BUILD.bazel @@ -5,6 +5,11 @@ package(default_visibility = ["//visibility:public"]) DEPENDENCIES = [ # Keep sorted. "@crate_index//:crossbeam-channel", + "@crate_index//:prometheus", +] + +TEST_DEPENDENCIES = [ + "//rs/monitoring/metrics", ] rust_library( @@ -18,5 +23,5 @@ rust_library( rust_test( name = "thread_test", crate = ":thread", - deps = DEPENDENCIES, + deps = DEPENDENCIES + TEST_DEPENDENCIES, ) diff --git a/rs/utils/thread/Cargo.toml b/rs/utils/thread/Cargo.toml index a2e8a30f36dc..759e7153bd35 100644 --- a/rs/utils/thread/Cargo.toml +++ b/rs/utils/thread/Cargo.toml @@ -10,3 +10,7 @@ documentation.workspace = true [dependencies] crossbeam-channel = { workspace = true } +prometheus = { workspace = true } + +[dev-dependencies] +ic-metrics = { path = "../../monitoring/metrics" } diff --git a/rs/utils/thread/src/lib.rs b/rs/utils/thread/src/lib.rs index 6793d460cf9c..22f7c271da8d 100644 --- a/rs/utils/thread/src/lib.rs +++ b/rs/utils/thread/src/lib.rs @@ -1,6 +1,7 @@ use std::thread; pub mod deallocator_thread; +pub mod worker_thread; /// An object that joins a thread when it's dropped. Mostly helpful to implement /// graceful shutdowns. diff --git a/rs/utils/thread/src/worker_thread.rs b/rs/utils/thread/src/worker_thread.rs new file mode 100644 index 000000000000..f46346ca51ee --- /dev/null +++ b/rs/utils/thread/src/worker_thread.rs @@ -0,0 +1,143 @@ +//! A background thread that runs workloads off the critical path. +//! +//! Backpressure: the worker has a single-slot channel plus the job currently +//! being processed (so at most two workloads pinned in memory at any time). If +//! a new workload arrives while both slots are full, the new workload is +//! dropped and a "skipped" counter is bumped. +//! +//! The struct is `Send + Sync` and shuts down cleanly on `Drop`: dropping the +//! sender closes the channel, the worker drains any enqueued workloads, its +//! `recv()` returns `Err`, and the `JoinOnDrop` handle joins the thread. + +use crate::JoinOnDrop; +use crossbeam_channel::{Sender, TrySendError, bounded}; +use prometheus::IntCounter; + +/// A workload to be executed by the worker thread. +type Workload = Box; + +enum Job { + Workload(Workload), + /// Test-only barrier: notify when the worker has drained all preceding + /// jobs; only used in tests. + Flush(Sender<()>), +} + +/// A worker thread that executes workloads in the background. +pub struct WorkerThread { + sender: Sender, + skipped: IntCounter, + _handle: JoinOnDrop<()>, +} + +impl WorkerThread { + pub fn new(name: &str, skipped: IntCounter) -> Self { + // At most one queued job; combined with the in-flight job that's the + // most state we'll keep alive. + let (sender, receiver) = bounded::(1); + + let handle = JoinOnDrop::new( + std::thread::Builder::new() + .name(name.to_string()) + .spawn(move || { + while let Ok(job) = receiver.recv() { + match job { + Job::Workload(workload) => { + workload(); + } + Job::Flush(notify) => { + // Best-effort notify; ignore if the receiver is + // already gone (e.g. test was aborted). + let _ = notify.send(()); + } + } + } + }) + .expect("failed to spawn worker thread"), + ); + Self { + sender, + skipped, + _handle: handle, + } + } + + /// Enqueues a workload. If the worker is busy and the channel is full, drops + /// the workload and increments the `skipped` counter rather than blocking the + /// caller. + pub fn enqueue(&self, workload: Workload) { + match self.sender.try_send(Job::Workload(workload)) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + self.skipped.inc(); + } + Err(TrySendError::Disconnected(_)) => { + // The worker thread exited; should only happen during shutdown. + } + } + } + + /// Test-only: blocks until all previously-enqueued workloads have been + /// processed. + #[doc(hidden)] + pub fn flush_channel(&self) { + let (notify_send, notify_recv) = bounded(1); + // Use a blocking send: this waits for the in-flight + queued job (if + // any) to drain, then enqueues the flush marker. + if self.sender.send(Job::Flush(notify_send)).is_err() { + // Worker is gone; nothing to flush. + return; + } + notify_recv + .recv() + .expect("worker thread exited while flushing"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; + + fn skipped_counter() -> IntCounter { + let registry = ic_metrics::MetricsRegistry::new(); + registry.int_counter("test_skipped", "Skipped workloads.") + } + + #[test] + fn enqueue_does_not_block_caller_under_load() { + // Spam the worker with many workloads and ensure that the caller is + // never blocked: with a single-slot channel, the surplus must be + // dropped via the `skipped` counter rather than queued. We only + // assert that the loop completes promptly and that `skipped + processed` + // accounts for all the workloads we attempted. + const N: u64 = 1_000; + + let completed = Arc::new(AtomicU64::new(0)); + let skipped = skipped_counter(); + + let worker_thread = WorkerThread::new("test_worker_thread_skip", skipped.clone()); + for _ in 0..N { + let completed = Arc::clone(&completed); + worker_thread.enqueue(Box::new(move || { + completed.fetch_add(1, Ordering::Relaxed); + })); + } + worker_thread.flush_channel(); + + // Completed vs skipped counts are non-deterministic (scheduling dependent), but + // they should both be non-zero and must sum to the number of attempts. + let completed = completed.load(Ordering::Relaxed); + assert!(completed > 0); + assert!(skipped.get() > 0); + assert_eq!(completed + skipped.get(), N); + } + + #[test] + fn flush_is_a_no_op_when_idle() { + let worker_thread = WorkerThread::new("test_worker_thread_flush_idle", skipped_counter()); + // Should return promptly even with nothing in the queue. + worker_thread.flush_channel(); + } +} From 890c65ef4630321eed980f158e908398189e9b61 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Thu, 7 May 2026 14:16:10 +0000 Subject: [PATCH 3/5] Have the StateMachine call StateManager::flush_metrics_channel() after every process_batch(), so we don't need to do it on a test-by-test basis. StateMachine tests usually only have a handful of canisters anyway, so ReplicatedState metrics are collected quickly. --- rs/execution_environment/tests/canister_logging.rs | 6 ------ rs/state_machine_tests/src/lib.rs | 9 +++------ 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/rs/execution_environment/tests/canister_logging.rs b/rs/execution_environment/tests/canister_logging.rs index 93ebe4642c06..88085ca5af7c 100644 --- a/rs/execution_environment/tests/canister_logging.rs +++ b/rs/execution_environment/tests/canister_logging.rs @@ -1718,13 +1718,11 @@ fn test_metric_canister_log_memory_usage_bytes_from_canister_log() { ); // Nothing logged yet — sum is zero. - env.flush_replicated_state_metrics(); let stats = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_eq!(stats.sum, 0.0); // One debug_print — sum reflects the payload plus small record metadata. let _ = env.execute_ingress(canister_id, "test", vec![]); - env.flush_replicated_state_metrics(); let stats = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_le!(METRIC_PAYLOAD_SIZE as f64, stats.sum); assert_le!( @@ -1753,7 +1751,6 @@ fn test_metric_canister_log_memory_usage_bytes_from_log_memory_store() { ); // Every per-round observation equals the default allocated store size. - env.flush_replicated_state_metrics(); let before = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_eq!( before.sum as u64 / before.count, @@ -1763,7 +1760,6 @@ fn test_metric_canister_log_memory_usage_bytes_from_log_memory_store() { // After a debug_print: allocation unchanged; new observations carry the // same value, so the per-observation average remains the default. let _ = env.execute_ingress(canister_id, "test", vec![]); - env.flush_replicated_state_metrics(); let after = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_gt!(after.count, before.count); assert_eq!( @@ -1820,13 +1816,11 @@ fn test_metric_canister_log_retention_seconds() { // Seed the buffer with a first record so retention is non-zero on the // second observation. let _ = env.execute_ingress(canister_id, "test", vec![]); - env.flush_replicated_state_metrics(); let before = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); // Advance simulated time and append another record. env.advance_time(TIME_ADVANCE); let _ = env.execute_ingress(canister_id, "test", vec![]); - env.flush_replicated_state_metrics(); let after = fetch_histogram_stats(env.metrics_registry(), METRIC).unwrap(); assert_eq!(after.count, before.count + 1); diff --git a/rs/state_machine_tests/src/lib.rs b/rs/state_machine_tests/src/lib.rs index 7f62d396210d..7f7671836431 100644 --- a/rs/state_machine_tests/src/lib.rs +++ b/rs/state_machine_tests/src/lib.rs @@ -2567,12 +2567,6 @@ impl StateMachine { ) } - /// Wait until all enqueued `ReplicatedStateMetrics::observe()` calls have been - /// processed by the background metrics thread. - pub fn flush_replicated_state_metrics(&self) { - self.state_manager.flush_metrics_channel(); - } - /// Generates a Xnet payload to a remote subnet. pub fn generate_xnet_payload( &self, @@ -3085,6 +3079,9 @@ impl StateMachine { } assert_eq!(self.state_manager.latest_state_height(), batch_number); + // Wait until the enqueued `ReplicatedStateMetrics::observe()` call has been + // processed by the background metrics thread. + self.state_manager.flush_metrics_channel(); self.check_critical_errors(); self.set_time(time_of_next_round.into()); From b6a6d081bc00f7ace6104a1100aed8462e814e7e Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 8 May 2026 09:43:05 +0000 Subject: [PATCH 4/5] Have the 100k_canisters benchmark not wait for the Replicated State metrics thread every round. --- rs/execution_environment/benches/100k_canisters.rs | 3 ++- rs/state_machine_tests/src/lib.rs | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/rs/execution_environment/benches/100k_canisters.rs b/rs/execution_environment/benches/100k_canisters.rs index 7e3dfa01d6fb..75d140af29c1 100644 --- a/rs/execution_environment/benches/100k_canisters.rs +++ b/rs/execution_environment/benches/100k_canisters.rs @@ -9,7 +9,8 @@ const NUM_CANISTERS_PER_CREATOR_CANISTER: usize = 10_000; lazy_static::lazy_static! { static ref STATE_MACHINE: Arc> = { - let env = StateMachine::new(); + let mut env = StateMachine::new(); + env.flush_replicated_state_metrics = false; let features = []; let wasm = canister_test::Project::cargo_bin_maybe_from_env("canister_creator_canister", &features); diff --git a/rs/state_machine_tests/src/lib.rs b/rs/state_machine_tests/src/lib.rs index 7f7671836431..6c0acd983d9d 100644 --- a/rs/state_machine_tests/src/lib.rs +++ b/rs/state_machine_tests/src/lib.rs @@ -1146,6 +1146,9 @@ pub struct StateMachine { registry_data_provider: Arc, pub registry_client: Arc, pub state_manager: Arc, + /// Whether to flush the replicated state metrics channel after each round. + /// Defaults to `true` but can be overridden, e.g. for benchmarks. + pub flush_replicated_state_metrics: bool, consensus_time: Arc, ingress_pool: Arc>, ingress_manager: Arc, @@ -2337,6 +2340,7 @@ impl StateMachine { registry_data_provider, registry_client: registry_client.clone(), state_manager, + flush_replicated_state_metrics: true, consensus_time, ingress_pool, ingress_manager: ingress_manager.clone(), @@ -3081,7 +3085,9 @@ impl StateMachine { // Wait until the enqueued `ReplicatedStateMetrics::observe()` call has been // processed by the background metrics thread. - self.state_manager.flush_metrics_channel(); + if self.flush_replicated_state_metrics { + self.state_manager.flush_metrics_channel(); + } self.check_critical_errors(); self.set_time(time_of_next_round.into()); From eb7a06a2a341e14d71f6c50b2b9862b430e916a5 Mon Sep 17 00:00:00 2001 From: Alin Sinpalean Date: Fri, 8 May 2026 09:43:38 +0000 Subject: [PATCH 5/5] Comment. --- rs/execution_environment/benches/100k_canisters.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rs/execution_environment/benches/100k_canisters.rs b/rs/execution_environment/benches/100k_canisters.rs index 75d140af29c1..bf4a87c441d0 100644 --- a/rs/execution_environment/benches/100k_canisters.rs +++ b/rs/execution_environment/benches/100k_canisters.rs @@ -10,7 +10,9 @@ const NUM_CANISTERS_PER_CREATOR_CANISTER: usize = 10_000; lazy_static::lazy_static! { static ref STATE_MACHINE: Arc> = { let mut env = StateMachine::new(); + // Don't wait for the Replicated State metrics thread every round. env.flush_replicated_state_metrics = false; + let features = []; let wasm = canister_test::Project::cargo_bin_maybe_from_env("canister_creator_canister", &features);