From ffda1c38c4a0ffb357d29cf4b9337740e8a51516 Mon Sep 17 00:00:00 2001 From: Martin Raszyk Date: Fri, 8 May 2026 15:37:34 +0000 Subject: [PATCH 1/2] chore: prune streams to deleted subnets --- rs/messaging/src/state_machine.rs | 3 ++ rs/messaging/src/state_machine/tests.rs | 38 +++++++++++++++++++++ rs/replicated_state/src/replicated_state.rs | 19 +++++++++++ 3 files changed, 60 insertions(+) diff --git a/rs/messaging/src/state_machine.rs b/rs/messaging/src/state_machine.rs index 880acf043eab..2a59809eb098 100644 --- a/rs/messaging/src/state_machine.rs +++ b/rs/messaging/src/state_machine.rs @@ -234,6 +234,9 @@ impl StateMachine for StateMachineImpl { self.observe_phase_duration(PHASE_INDUCTION, &since); + // Discard streams to subnets no longer present in the network topology. + state_with_messages.discard_streams_for_deleted_subnets(); + let execution_round_type = if requires_full_state_hash { ExecutionRoundType::CheckpointRound } else { diff --git a/rs/messaging/src/state_machine/tests.rs b/rs/messaging/src/state_machine/tests.rs index 7738d7325c01..d96b0db65ba2 100644 --- a/rs/messaging/src/state_machine/tests.rs +++ b/rs/messaging/src/state_machine/tests.rs @@ -10,6 +10,7 @@ use ic_registry_subnet_features::SubnetFeatures; use ic_registry_subnet_type::SubnetType; use ic_replicated_state::{ ReplicatedState, SubnetTopology, metadata_state::testing::NetworkTopologyTesting, + testing::ReplicatedStateTesting, }; use ic_test_utilities_execution_environment::test_registry_settings; use ic_test_utilities_logger::with_test_replica_logger; @@ -249,6 +250,43 @@ fn test_delivered_batch_interface() { } } +#[test] +fn state_machine_discards_stream_for_deleted_subnet() { + let provided_batch = BatchBuilder::new().batch_number(Height::new(1)).build(); + let fixture = test_fixture(&provided_batch); + + // Add a stream to SUBNET_2, which is not present in the fixture's network topology. + let mut initial_state = fixture.initial_state; + initial_state.modify_streams(|streams| { + streams.insert(SUBNET_2, Default::default()); + }); + assert!(initial_state.get_stream(&SUBNET_2).is_some()); + + with_test_replica_logger(|log| { + let state_machine = Box::new(StateMachineImpl::new( + fixture.scheduler, + fixture.demux, + fixture.stream_builder, + Default::default(), + log, + fixture.metrics, + )); + + let state = state_machine.execute_round( + initial_state, + fixture.network_topology.clone(), + provided_batch, + Default::default(), + Default::default(), + &test_registry_settings(), + Default::default(), + Default::default(), + ); + + assert!(state.get_stream(&SUBNET_2).is_none()); + }); +} + const NNS_SUBNET_ID: SubnetId = SUBNET_0; const SUBNET_A: SubnetId = SUBNET_1; const SUBNET_B: SubnetId = SUBNET_2; diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs index d94c2a04c1b9..56d29d324334 100644 --- a/rs/replicated_state/src/replicated_state.rs +++ b/rs/replicated_state/src/replicated_state.rs @@ -855,6 +855,25 @@ impl ReplicatedState { self.metadata.streams.get(destination_subnet_id) } + /// Discards streams to subnets no longer present in the network topology. + /// + /// Called after the induction phase of each round, once the new + /// `NetworkTopology` (reflecting any registry deletions) has been applied. + /// Safe to call because by the time the registry deletion takes effect, all + /// certified stream slices from the deleted subnet have already been inducted + /// in the same round, and no new certified slices can be produced once the + /// subnet's key is removed from the registry. + pub fn discard_streams_for_deleted_subnets(&mut self) { + let mut streams = self.take_streams(); + streams.retain(|subnet_id, _| { + self.metadata + .network_topology + .subnets() + .contains_key(subnet_id) + }); + self.put_streams(streams); + } + /// Returns the sum of reserved compute allocations of all currently /// available canisters. pub fn total_compute_allocation(&self) -> u64 { From b9234458acc446a7ea547dd04880c9144b242924 Mon Sep 17 00:00:00 2001 From: Martin Raszyk Date: Fri, 8 May 2026 15:50:21 +0000 Subject: [PATCH 2/2] do not raise critical error for discarded best-effort responses --- rs/messaging/src/routing/stream_builder.rs | 27 ++++++++++++++-------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/rs/messaging/src/routing/stream_builder.rs b/rs/messaging/src/routing/stream_builder.rs index 48c2dfa02d32..3d78b2822c0f 100644 --- a/rs/messaging/src/routing/stream_builder.rs +++ b/rs/messaging/src/routing/stream_builder.rs @@ -575,15 +575,24 @@ impl StreamBuilderImpl { } RequestOrResponse::Response(rep) => { // A Response: discard it. - error!( - self.log, - "{}: Discarding response, destination not found: {:?}", - CRITICAL_ERROR_RESPONSE_DESTINATION_NOT_FOUND, - rep - ); - self.metrics - .critical_error_response_destination_not_found - .inc(); + if rep.is_best_effort() { + // Expected when the destination subnet has been deleted. + warn!( + self.log, + "Discarding best-effort response, destination not found: {:?}", + rep + ); + } else { + error!( + self.log, + "{}: Discarding response, destination not found: {:?}", + CRITICAL_ERROR_RESPONSE_DESTINATION_NOT_FOUND, + rep + ); + self.metrics + .critical_error_response_destination_not_found + .inc(); + } } } }