feat: [DSM-106] Observe ReplicatedState metrics in a background thread #10120

alin-at-dfinity wants to merge 6 commits into `master`.

Conversation
Only schedule active canisters, instead of all canisters. This needs to be done on every scheduler loop iteration (as opposed to once per round, followed by per-iteration filtering), to account for canisters going idle or becoming active from one iteration to the next. Scheduling priorities, however, continue to be updated only once, at the end of the round.
Based on a recent CPU profile of the `alin/DSM-103-active-canister-only-scheduling` branch (i.e. after scheduler optimizations), `ReplicatedStateMetrics::observe()` accounts for 20%-25% of the DSM round duration. This is all read-only work (scanning the `ReplicatedState` and instrumenting stats), so it can safely be done on a background thread, particularly since the `StateManager` has already created a cheap-to-clone `Arc<ReplicatedState>`.

The change moves `ReplicatedStateMetrics::observe()` off the `commit_and_certify()` critical path by introducing a generic `WorkerThread` in `ic-utils-thread` and wiring `StateManagerImpl` to defer observations through it. There is a single-slot bounded channel (one in-flight + one queued); excess work is dropped and counted via `state_manager_skipped_state_observations`.
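The single-slot bounded channel with drop-on-overload described above can be sketched with std's `sync_channel`. This is an illustrative sketch, not the actual `ic-utils-thread` API; the names `try_execute` and `skipped` are hypothetical, and the real code reports drops via a Prometheus counter rather than a plain `AtomicU64`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::sync::Arc;
use std::thread::JoinHandle;

type Workload = Box<dyn FnOnce() + Send>;

/// Background worker with a single-slot bounded queue: one workload can be
/// queued while another is in flight; anything beyond that is dropped and
/// counted instead of blocking the caller.
pub struct WorkerThread {
    sender: Option<SyncSender<Workload>>,
    pub skipped: Arc<AtomicU64>,
    handle: Option<JoinHandle<()>>,
}

impl WorkerThread {
    pub fn new() -> Self {
        // `sync_channel(1)` buffers exactly one workload in addition to
        // whatever the worker is currently executing: "one in-flight + one
        // queued".
        let (sender, receiver) = sync_channel::<Workload>(1);
        let handle = std::thread::spawn(move || {
            // Runs until all senders are dropped.
            for workload in receiver {
                workload();
            }
        });
        WorkerThread {
            sender: Some(sender),
            skipped: Arc::new(AtomicU64::new(0)),
            handle: Some(handle),
        }
    }

    /// Enqueues `workload` if the slot is free; otherwise drops it and bumps
    /// the skipped counter.
    pub fn try_execute(&self, workload: Workload) {
        let sender = self.sender.as_ref().expect("worker already shut down");
        if let Err(TrySendError::Full(_)) = sender.try_send(workload) {
            self.skipped.fetch_add(1, Ordering::Relaxed);
        }
    }
}

impl Drop for WorkerThread {
    fn drop(&mut self) {
        // Close the channel first so the worker loop terminates, then join.
        drop(self.sender.take());
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```

`try_send` never blocks, which is what keeps the enqueue call cheap on the `commit_and_certify()` critical path: under load the caller loses an observation (and counts it) rather than waiting for the metrics thread.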
…r every process_batch(), so we don't need to do it on a test-by-test basis. StateMachine tests usually only have a handful of canisters anyway, so ReplicatedState metrics are collected quickly.
…etrics thread every round.
```rust
// Wait until the enqueued `ReplicatedStateMetrics::observe()` call has been
// processed by the background metrics thread.
if self.flush_replicated_state_metrics {
    self.state_manager.flush_metrics_channel();
}
```
Not sure how much of an effect this will have on the average StateMachine test. In my benchmark runs it added about 30% overhead (and a lot of variability), but those runs actually had 100k canisters for the metrics thread to iterate over.
I could also switch the default to not wait for metrics to be collected, and only override it in tests that actually look at ReplicatedState metrics. I tried that and was pretty sure I had done it right, and then a test I had missed flaked out on me. So rather than hunt for every ReplicatedState metric name, and risk similar flakiness in the future when I will likely have forgotten about this quirk, I'd rather be safe.
Pull request overview
Moves replicated-state metrics observation off the `StateManager::commit_and_certify()` critical path by introducing a reusable background `WorkerThread`, then wiring `StateManagerImpl` to enqueue `ReplicatedStateMetrics::observe()` work asynchronously (with bounded buffering + a skipped-work counter). The PR also updates a large set of scheduler/test utilities to reflect that state instrumentation is no longer performed inside the scheduler (and includes additional subnet-schedule behavior changes).
Changes:
- Add `WorkerThread` (single-slot queue + drop-on-overload) to run expensive, read-only workloads in a background thread.
- Defer `ReplicatedStateMetrics::observe()` from `StateManagerImpl::commit_and_certify()` to the worker thread and add `state_manager_skipped_state_observations` to track drops; add test coverage that waits for background completion.
- Refactor scheduler/tests/bench harnesses to stop relying on scheduler-owned replicated-state metrics, and adjust subnet-schedule handling to drop certain idle entries.
Reviewed changes
Copilot reviewed 27 out of 28 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| rs/utils/thread/src/worker_thread.rs | Introduces the generic background worker thread abstraction with bounded queueing and skipped counter. |
| rs/utils/thread/src/lib.rs | Exposes the new worker_thread module. |
| rs/utils/thread/Cargo.toml | Adds prometheus dependency and ic-metrics dev-dependency for worker-thread tests. |
| rs/utils/thread/BUILD.bazel | Wires Bazel deps for prometheus and test-only ic-metrics. |
| rs/state_manager/tests/state_manager.rs | Adds a test ensuring commit_and_certify triggers replicated-state metrics via the background thread. |
| rs/state_manager/src/lib.rs | Creates ReplicatedStateMetrics + WorkerThread, enqueues observations from commit_and_certify, and exposes a testing-only flush hook. |
| rs/state_machine_tests/src/lib.rs | Flushes the state-manager metrics worker after each round (configurable for benchmarks). |
| rs/replicated_state/tests/replicated_state.rs | Updates split tests to match new subnet-schedule population semantics. |
| rs/replicated_state/src/replicated_state.rs | Adjusts when canisters are inserted into the subnet schedule and expands subnet-schedule GC rules. |
| rs/replicated_state/src/metadata_state/subnet_schedule.rs | Adds CanisterPriority::is_non_zero() helper used by GC logic. |
| rs/replicated_state/src/canister_state.rs | Adds CanisterState::must_be_in_schedule() predicate used to retain required schedule entries. |
| rs/messaging/src/message_routing.rs | Adds TODO related to memory-usage observation timing. |
| rs/execution_environment/tests/canister_logging.rs | Minor formatting-only change in test. |
| rs/execution_environment/src/scheduler/tests/scheduling.rs | Updates scheduler tests to match new scheduling/metrics behavior and removes some tests. |
| rs/execution_environment/src/scheduler/tests/rate_limiting.rs | Updates accessor usage for state metrics in tests. |
| rs/execution_environment/src/scheduler/tests/metrics.rs | Updates tests to use ReplicatedStateMetrics directly (no longer owned by scheduler). |
| rs/execution_environment/src/scheduler/tests/limits.rs | Adjusts expectations for rounds_scheduled under instruction limits. |
| rs/execution_environment/src/scheduler/tests/dts.rs | Updates tests to use new state-metrics access patterns. |
| rs/execution_environment/src/scheduler/test_utilities.rs | Adds state_metrics to SchedulerTest and observes replicated-state metrics explicitly after each round. |
| rs/execution_environment/src/scheduler/scheduler_metrics.rs | Removes unused metrics/gauges and tweaks metric description wording. |
| rs/execution_environment/src/scheduler/round_schedule/tests.rs | Updates tests and adds coverage for new subnet-schedule pruning / idle AP handling. |
| rs/execution_environment/src/scheduler/round_schedule.rs | Refactors iteration scheduling and subnet-schedule maintenance; adds pruning and AP burn-down logic. |
| rs/execution_environment/src/scheduler.rs | Removes scheduler-owned ReplicatedStateMetrics, refactors round scheduling creation, and adjusts compute-utilization calculation. |
| rs/execution_environment/src/lib.rs | Re-exports IterationSchedule and updates scheduler construction call sites. |
| rs/execution_environment/BUILD.bazel | Adds missing deps needed by updated benches. |
| rs/execution_environment/benches/scheduler.rs | Updates benchmark setup to new RoundSchedule/iteration API and logger requirements. |
| rs/execution_environment/benches/100k_canisters.rs | Disables per-round flush of replicated-state metrics for benchmark performance. |
| Cargo.lock | Records new dependency edges for ic-utils-thread. |
Comments suppressed due to low confidence (2)
rs/execution_environment/src/scheduler.rs:729
`compute_utilization_per_core` is described as a 0.0–1.0 fraction, but the utilization calculation now divides `instructions_executed` by `max_instructions_per_slice`. Since a core/thread can execute multiple canisters per iteration (see `IterationSchedule::partition_canisters_to_cores`), `instructions_executed` can exceed a single-slice limit, producing values > 1.0 and skewing the metric. Consider dividing by the actual per-thread instruction budget for the iteration (e.g. `round_limits_per_thread.instructions.get()` or another limit that bounds `instructions_executed`).
```rust
max_instructions_executed_per_thread =
    max_instructions_executed_per_thread.max(instructions_executed);
let divisor = self.config.max_instructions_per_slice.get();
debug_assert_ne!(divisor, 0, "prevent divide by zero panic");
if divisor > 0 {
    let value = instructions_executed.get() as f64 / divisor as f64;
    self.metrics.compute_utilization_per_core.observe(value);
}
```
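To illustrate the reviewer's point with a minimal sketch: when a thread runs several canisters, each up to a full slice, dividing by the per-slice limit overshoots the 0.0–1.0 range. The helper below divides by a hypothetical per-thread budget instead (a stand-in for something like `round_limits_per_thread.instructions.get()`; the numbers are made up for illustration):

```rust
/// Computes a utilization fraction against a per-thread instruction budget.
/// `per_thread_budget` is a hypothetical stand-in for the real per-thread
/// instruction limit, which is what actually bounds `instructions_executed`.
fn utilization(instructions_executed: u64, per_thread_budget: u64) -> f64 {
    debug_assert_ne!(per_thread_budget, 0, "prevent divide by zero panic");
    if per_thread_budget == 0 {
        return 0.0;
    }
    instructions_executed as f64 / per_thread_budget as f64
}
```

With e.g. three full 2B-instruction slices executed on one thread, the per-slice divisor yields 3.0, while a 7B per-thread budget yields a fraction below 1.0, keeping the histogram's documented range intact.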
rs/execution_environment/src/scheduler/round_schedule.rs:621
- The comment says free compute is distributed to "scheduled canisters", but the code builds `compute_allocations` from all entries in `subnet_schedule` and then iterates over that vector when distributing `free_compute`. Either update the comment to match the current behavior (subnet-schedule entries), or filter `compute_allocations` / `total_ap` / `total_ca` to scheduled canisters if that's the intended change.
```rust
// Grant all canisters in the subnet schedule their compute allocation. Collect
// scheduled canisters' compute allocations; and sum their total AP and CA.
let mut total_ap = ZERO;
let mut total_ca = ZERO;
let mut compute_allocations = Vec::with_capacity(subnet_schedule.len());
for (canister_id, canister_priority) in subnet_schedule.iter_mut() {
    let canister = canister_states.get_mut(canister_id).unwrap();
    let compute_allocation = from_ca(canister.compute_allocation());
    canister_priority.accumulated_priority += compute_allocation;
    // Treat idle canisters with positive AP as fully executed (which is technically
    // true). The goal is to gradually burn down their AP to zero, so that if they
    // get new inputs soon, they will not have instantly lost all their previous AP.
    if canister_priority.accumulated_priority > ZERO
        && !self.scheduled_canisters.contains(canister_id)
        && !self.rate_limited_canisters.contains(canister_id)
    {
        canister_priority.accumulated_priority -=
            ONE_HUNDRED_PERCENT.min(canister_priority.accumulated_priority);
    }
    // Apply an exponential decay to AP values outside the [AP_ROUNDS_MIN,
    // AP_ROUNDS_MAX] range to soft bound any runaway AP.
    const AP_MAX: AccumulatedPriority =
        AccumulatedPriority::new(AP_ROUNDS_MAX * 100 * MULTIPLIER);
    const AP_MIN: AccumulatedPriority =
        AccumulatedPriority::new(AP_ROUNDS_MIN * 100 * MULTIPLIER);
    if canister_priority.accumulated_priority > AP_MAX {
        canister_priority.accumulated_priority = AP_MAX
            + (canister_priority.accumulated_priority - AP_MAX) * AP_DECAY_PERCENT / 100;
    } else if canister_priority.accumulated_priority < AP_MIN {
        canister_priority.accumulated_priority = AP_MIN
            + (canister_priority.accumulated_priority - AP_MIN) * AP_DECAY_PERCENT / 100;
    }
    total_ap += canister_priority.accumulated_priority;
    total_ca += compute_allocation;
    compute_allocations.push((*canister_id, compute_allocation));
}
// Distribute the "free compute" (negative of total AP) to scheduled canisters.
//
// Only ever apply positive free compute. If the total AP is positive (e.g. we
// granted compute allocations after not having completed any execution this
// round), then there is simply no free compute to distribute.
```
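The soft-bounding step in the loop above can be checked in isolation. The sketch below reduces `AccumulatedPriority` to a plain `i64`; all constants (`MULTIPLIER`, `AP_ROUNDS_MAX`, `AP_DECAY_PERCENT`, etc.) are illustrative stand-ins, not the values in `round_schedule.rs`:

```rust
// Illustrative stand-ins for the constants used in the real scheduler code.
const MULTIPLIER: i64 = 1_000;
const AP_ROUNDS_MAX: i64 = 100;
const AP_ROUNDS_MIN: i64 = -100;
const AP_DECAY_PERCENT: i64 = 90;
const AP_MAX: i64 = AP_ROUNDS_MAX * 100 * MULTIPLIER;
const AP_MIN: i64 = AP_ROUNDS_MIN * 100 * MULTIPLIER;

/// Applies the exponential decay to AP values outside `[AP_MIN, AP_MAX]`:
/// only the excess over the bound is scaled by `AP_DECAY_PERCENT / 100`, so
/// repeated application shrinks a runaway AP geometrically back toward the
/// bound while in-range values pass through unchanged.
fn soft_bound(ap: i64) -> i64 {
    if ap > AP_MAX {
        AP_MAX + (ap - AP_MAX) * AP_DECAY_PERCENT / 100
    } else if ap < AP_MIN {
        AP_MIN + (ap - AP_MIN) * AP_DECAY_PERCENT / 100
    } else {
        ap
    }
}
```

Because the decay applies only to the excess, this is a soft bound: AP can still exceed `AP_MAX` transiently, but each round shaves a fixed percentage off the overshoot rather than clamping it outright.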
```rust
/// A workload to be executed by the worker thread.
type Workload = Box<dyn FnOnce() + Send>;
```