Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion rs/execution_environment/benches/100k_canisters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ const NUM_CANISTERS_PER_CREATOR_CANISTER: usize = 10_000;

lazy_static::lazy_static! {
static ref STATE_MACHINE: Arc<Mutex<StateMachine>> = {
let env = StateMachine::new();
let mut env = StateMachine::new();
// Don't wait for the Replicated State metrics thread every round.
env.flush_replicated_state_metrics = false;

let features = [];
let wasm =
canister_test::Project::cargo_bin_maybe_from_env("canister_creator_canister", &features);
Expand Down
1 change: 0 additions & 1 deletion rs/execution_environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ impl ExecutionServices {
let scheduler = Box::new(SchedulerImpl::new(
subnet_config.scheduler_config,
config.embedders_config,
own_subnet_id,
Arc::clone(&ingress_history_writer) as Arc<_>,
Arc::clone(&execution_environment) as Arc<_>,
Arc::clone(&cycles_account_manager),
Expand Down
17 changes: 3 additions & 14 deletions rs/execution_environment/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::SubnetSchedule;
use ic_replicated_state::canister_state::NextExecution;
use ic_replicated_state::canister_state::execution_state::NextScheduledMethod;
use ic_replicated_state::metrics::ReplicatedStateMetrics;
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use ic_replicated_state::{
CanisterState, ExecutionTask, InputQueueType, NetworkTopology, ReplicatedState,
Expand All @@ -43,7 +42,7 @@ use ic_types::ingress::{IngressState, IngressStatus};
use ic_types::messages::{Ingress, MessageId, NO_DEADLINE, Response, SubnetMessage};
use ic_types::{
CanisterId, ComputeAllocation, ExecutionRound, MemoryAllocation, NumBytes, NumInstructions,
NumMessages, NumSlices, Randomness, ReplicaVersion, SubnetId, Time,
NumMessages, NumSlices, Randomness, ReplicaVersion, Time,
};
use ic_types_cycles::{CanisterCyclesCostSchedule, Cycles};
use more_asserts::{debug_assert_ge, debug_assert_le, debug_assert_lt};
Expand Down Expand Up @@ -141,12 +140,10 @@ impl SchedulerRoundLimits {
pub(crate) struct SchedulerImpl {
config: SchedulerConfig,
hypervisor_config: HypervisorConfig,
own_subnet_id: SubnetId,
ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState>>,
exec_env: Arc<ExecutionEnvironment>,
cycles_account_manager: Arc<CyclesAccountManager>,
metrics: Arc<SchedulerMetrics>,
state_metrics: ReplicatedStateMetrics,
log: ReplicaLogger,
thread_pool: RefCell<scoped_threadpool::Pool>,
rate_limiting_of_heap_delta: FlagStatus,
Expand All @@ -159,7 +156,6 @@ impl SchedulerImpl {
pub(crate) fn new(
config: SchedulerConfig,
hypervisor_config: HypervisorConfig,
own_subnet_id: SubnetId,
ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState>>,
exec_env: Arc<ExecutionEnvironment>,
cycles_account_manager: Arc<CyclesAccountManager>,
Expand All @@ -174,12 +170,10 @@ impl SchedulerImpl {
config,
hypervisor_config,
thread_pool: RefCell::new(scoped_threadpool::Pool::new(scheduler_cores)),
own_subnet_id,
ingress_history_writer,
exec_env,
cycles_account_manager,
metrics: Arc::new(SchedulerMetrics::new(metrics_registry)),
state_metrics: ReplicatedStateMetrics::new(metrics_registry),
log,
rate_limiting_of_heap_delta,
rate_limiting_of_instructions,
Expand Down Expand Up @@ -1100,17 +1094,12 @@ impl SchedulerImpl {
}
}

self.state_metrics.observe(
self.own_subnet_id,
state,
current_round.get().into(),
logger,
);

self.check_invariants(state, current_round_type, current_round, logger);
}

/// Checks the DTS and subnet memory usage invariants at the end of the round.
//
// TODO(DSM-103): Move into ReplicatedStateMetrics.
fn check_invariants(
&self,
state: &ReplicatedState,
Expand Down
34 changes: 27 additions & 7 deletions rs/execution_environment/src/scheduler/test_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use ic_replicated_state::{
ReplicatedState,
canister_state::execution_state::{self, WasmExecutionMode, WasmMetadata},
metadata_state::testing::NetworkTopologyTesting,
metrics::ReplicatedStateMetrics,
num_bytes_try_from,
page_map::TestPageAllocatorFileDescriptorImpl,
testing::{CanisterQueuesTesting, ReplicatedStateTesting},
Expand Down Expand Up @@ -112,25 +113,26 @@ pub(crate) struct SchedulerTest {
round_summary: Option<ExecutionRoundSummary>,
/// The amount of cycles that new canisters have by default.
initial_canister_cycles: Cycles,
/// The id of the user that sends ingress messages.
/// The ID of the user that sends ingress messages.
user_id: UserId,
/// The id of a canister that is guaranteed to be xnet.
/// The ID of a canister that is guaranteed to be XNet.
xnet_canister_id: CanisterId,
/// The actual scheduler.
scheduler: SchedulerImpl,
/// The fake Wasm executor.
wasm_executor: Arc<TestWasmExecutor>,
/// Registry Execution Settings.
registry_settings: RegistryExecutionSettings,
/// Metrics Registry.
metrics_registry: MetricsRegistry,
/// Chain key subnet public keys.
/// Replicated state metrics, updated by `SchedulerTest` after every round.
///
/// These used to be updated by the scheduler, but state instrumentation was
/// moved into the State Manager.
state_metrics: ReplicatedStateMetrics,
chain_key_subnet_public_keys: BTreeMap<MasterPublicKeyId, MasterPublicKey>,
/// Available pre-signatures.
idkg_pre_signatures: BTreeMap<IDkgMasterPublicKeyId, AvailablePreSignatures>,
/// Version of the running replica, not the registry's Entry
replica_version: ReplicaVersion,
/// Hypervisor config.
hypervisor_config: HypervisorConfig,
}

Expand Down Expand Up @@ -169,6 +171,10 @@ impl SchedulerTest {
&self.metrics_registry
}

/// Returns a shared reference to the `ReplicatedStateMetrics` held by this
/// test fixture.
///
/// Tests read these metrics through this accessor rather than through the
/// scheduler.
pub fn state_metrics(&self) -> &ReplicatedStateMetrics {
&self.state_metrics
}

pub fn canister_state_mut(&mut self, canister_id: CanisterId) -> &mut CanisterState {
self.state_mut()
.canister_state_make_mut(&canister_id)
Expand Down Expand Up @@ -595,6 +601,18 @@ impl SchedulerTest {
round_type,
self.registry_settings(),
);

// Explicitly observe the Replicated State metrics after every round. This used
// to be done by the scheduler, so there are a number of tests depending on
// various state metrics, but instrumentation was moved into the State Manager.
// So we "manually" update the metrics after every round.
self.state_metrics.observe(
state.metadata.own_subnet_id,
&state,
self.round.get().into(),
&self.scheduler.log,
);

self.state = Some(state);
self.check_invariants();
self.increment_round();
Expand Down Expand Up @@ -1078,7 +1096,6 @@ impl SchedulerTestBuilder {
let scheduler = SchedulerImpl::new(
self.scheduler_config,
self.hypervisor_config.clone(),
self.own_subnet_id,
execution_services.ingress_history_writer,
execution_services.execution_environment,
execution_services.cycles_account_manager,
Expand All @@ -1089,6 +1106,8 @@ impl SchedulerTestBuilder {
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
);

let state_metrics = ReplicatedStateMetrics::new(&self.metrics_registry);

SchedulerTest {
state: Some(state),
next_canister_id: 0,
Expand All @@ -1101,6 +1120,7 @@ impl SchedulerTestBuilder {
wasm_executor,
registry_settings,
metrics_registry: self.metrics_registry,
state_metrics,
chain_key_subnet_public_keys,
idkg_pre_signatures: BTreeMap::new(),
replica_version: self.replica_version,
Expand Down
28 changes: 10 additions & 18 deletions rs/execution_environment/src/scheduler/tests/dts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ fn dts_long_execution_completes() {
ErrorCode::CanisterDidNotReply,
);
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_paused_execution()
.get_sample_sum(),
9.0
Expand Down Expand Up @@ -124,8 +123,7 @@ fn cannot_execute_management_message_for_targeted_long_execution_canister() {
test.execute_round(ExecutionRoundType::OrdinaryRound);
assert_eq!(test.state().subnet_queues().input_queues_message_count(), 1);
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_paused_execution()
.get_sample_sum(),
4.0
Expand All @@ -143,8 +141,7 @@ fn cannot_execute_management_message_for_targeted_long_execution_canister() {
ErrorCode::CanisterDidNotReply,
);
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_paused_execution()
.get_sample_sum(),
9.0
Expand Down Expand Up @@ -176,8 +173,7 @@ fn dts_long_execution_runs_out_of_instructions() {
ErrorCode::CanisterInstructionLimitExceeded,
);
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_paused_execution()
.get_sample_sum(),
9.0
Expand Down Expand Up @@ -509,7 +505,7 @@ fn dts_allow_only_one_long_install_code_execution_at_a_time() {
0.0
);

let state_metrics = &test.scheduler().state_metrics;
let state_metrics = test.state_metrics();
// 2 rounds because the first canister was paused twice.
assert_eq!(
state_metrics
Expand Down Expand Up @@ -549,7 +545,7 @@ fn dts_allow_only_one_long_install_code_execution_at_a_time() {
);
// 3 slices for the first canister, 1 slice for the second.
assert_eq!(metrics.round.slices.get_sample_sum(), 4.0);
let state_metrics = &test.scheduler().state_metrics;
let state_metrics = test.state_metrics();
// Same 2 rounds of paused install code as above.
assert_eq!(
state_metrics
Expand Down Expand Up @@ -593,16 +589,14 @@ fn dts_resume_install_code_after_abort() {

// After 1 + 9 rounds we had a paused install code.
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_paused_install_code()
.get_sample_sum(),
10.0
);
// After the checkpoint round we had an aborted install code.
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_aborted_install_code()
.get_sample_sum(),
1.0
Expand Down Expand Up @@ -659,15 +653,13 @@ fn dts_resume_long_execution_after_abort() {
ErrorCode::CanisterDidNotReply,
);
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_paused_execution()
.get_sample_sum(),
10.0
);
assert_eq!(
test.scheduler()
.state_metrics
test.state_metrics()
.canister_aborted_execution()
.get_sample_sum(),
1.0
Expand Down
Loading
Loading