Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions rs/messaging/src/routing/stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,24 @@ impl StreamBuilderImpl {
}
RequestOrResponse::Response(rep) => {
// A Response: discard it.
error!(
self.log,
"{}: Discarding response, destination not found: {:?}",
CRITICAL_ERROR_RESPONSE_DESTINATION_NOT_FOUND,
rep
);
self.metrics
.critical_error_response_destination_not_found
.inc();
if rep.is_best_effort() {
// Expected when the destination subnet has been deleted.
warn!(
self.log,
"Discarding best-effort response, destination not found: {:?}",
rep
);
} else {
error!(
self.log,
"{}: Discarding response, destination not found: {:?}",
CRITICAL_ERROR_RESPONSE_DESTINATION_NOT_FOUND,
rep
);
self.metrics
.critical_error_response_destination_not_found
.inc();
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions rs/messaging/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ impl StateMachine for StateMachineImpl {

self.observe_phase_duration(PHASE_INDUCTION, &since);

// Discard streams to subnets no longer present in the network topology.
state_with_messages.discard_streams_for_deleted_subnets();

let execution_round_type = if requires_full_state_hash {
ExecutionRoundType::CheckpointRound
} else {
Expand Down
38 changes: 38 additions & 0 deletions rs/messaging/src/state_machine/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ic_registry_subnet_features::SubnetFeatures;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::{
ReplicatedState, SubnetTopology, metadata_state::testing::NetworkTopologyTesting,
testing::ReplicatedStateTesting,
};
use ic_test_utilities_execution_environment::test_registry_settings;
use ic_test_utilities_logger::with_test_replica_logger;
Expand Down Expand Up @@ -249,6 +250,43 @@ fn test_delivered_batch_interface() {
}
}

#[test]
fn state_machine_discards_stream_for_deleted_subnet() {
let provided_batch = BatchBuilder::new().batch_number(Height::new(1)).build();
let fixture = test_fixture(&provided_batch);

// Add a stream to SUBNET_2, which is not present in the fixture's network topology.
let mut initial_state = fixture.initial_state;
initial_state.modify_streams(|streams| {
streams.insert(SUBNET_2, Default::default());
});
assert!(initial_state.get_stream(&SUBNET_2).is_some());

with_test_replica_logger(|log| {
let state_machine = Box::new(StateMachineImpl::new(
fixture.scheduler,
fixture.demux,
fixture.stream_builder,
Default::default(),
log,
fixture.metrics,
));

let state = state_machine.execute_round(
initial_state,
fixture.network_topology.clone(),
provided_batch,
Default::default(),
Default::default(),
&test_registry_settings(),
Default::default(),
Default::default(),
);

assert!(state.get_stream(&SUBNET_2).is_none());
});
}

const NNS_SUBNET_ID: SubnetId = SUBNET_0;
const SUBNET_A: SubnetId = SUBNET_1;
const SUBNET_B: SubnetId = SUBNET_2;
Expand Down
19 changes: 19 additions & 0 deletions rs/replicated_state/src/replicated_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,25 @@ impl ReplicatedState {
self.metadata.streams.get(destination_subnet_id)
}

/// Discards streams to subnets no longer present in the network topology.
///
/// Called after the induction phase of each round, once the new
/// `NetworkTopology` (reflecting any registry deletions) has been applied.
/// Safe to call because by the time the registry deletion takes effect, all
/// certified stream slices from the deleted subnet have already been inducted
/// in the same round, and no new certified slices can be produced once the
/// subnet's key is removed from the registry.
pub fn discard_streams_for_deleted_subnets(&mut self) {
let mut streams = self.take_streams();
streams.retain(|subnet_id, _| {
self.metadata
.network_topology
.subnets()
.contains_key(subnet_id)
});
self.put_streams(streams);
}

/// Returns the sum of reserved compute allocations of all currently
/// available canisters.
pub fn total_compute_allocation(&self) -> u64 {
Expand Down
Loading