From 0e1ddbe6b36de3d83564849010b049c47fd73514 Mon Sep 17 00:00:00 2001 From: m0ar Date: Fri, 21 Nov 2025 15:51:39 +0100 Subject: [PATCH] feat: allow blocking bad peers, enforce backoff on inbound syncs --- recon/src/libp2p.rs | 91 ++++++++++++- recon/src/libp2p/handler.rs | 37 +++++- recon/src/libp2p/tests.rs | 259 ++++++++++++++++++++++++++++++++++++ recon/src/metrics.rs | 39 ++++++ 4 files changed, 420 insertions(+), 6 deletions(-) diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index 7c5a23679..c24da3c12 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -24,12 +24,14 @@ use ceramic_core::{EventId, PeerKey}; use futures::{future::BoxFuture, FutureExt}; use libp2p::{ core::ConnectedPoint, - swarm::{CloseConnection, ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm}, + swarm::{ + CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, ToSwarm, + }, }; use libp2p_identity::PeerId; use std::{ cmp::min, - collections::{btree_map::Entry, BTreeMap}, + collections::{btree_map::Entry, BTreeMap, HashSet}, task::Poll, time::Duration, }; @@ -38,8 +40,10 @@ use tracing::{debug, trace, warn}; use crate::{ libp2p::handler::{FromBehaviour, FromHandler, Handler}, + metrics::{BlockedConnection, InboundSyncRejected}, Sha256a, }; +use ceramic_metrics::Recorder; /// Name of the Recon protocol for synchronizing peers pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer"; @@ -57,6 +61,9 @@ pub struct Config { /// Maximum delay between synchronization attempts. /// Defaults to 10 minutes pub per_peer_maximum_sync_delay: Duration, + /// Set of PeerIds that are permanently blocked. + /// Connections from these peers will be denied. + pub blocked_peers: HashSet, } impl Default for Config { @@ -65,6 +72,7 @@ impl Default for Config { per_peer_sync_delay: Duration::from_millis(1000), per_peer_sync_backoff: 2.0, per_peer_maximum_sync_delay: Duration::from_secs(60 * 10), + blocked_peers: HashSet::new(), } } } @@ -79,6 +87,9 @@ pub struct Behaviour { model: M, config: Config, peers: BTreeMap, + /// Tracks backoff state for peers, persisting across disconnections. + /// Maps peer ID to the time until which incoming syncs should be rejected. + backoff_registry: BTreeMap, swarm_events_sender: tokio::sync::mpsc::Sender>, swarm_events_receiver: tokio::sync::mpsc::Receiver>, next_sync: Option>, @@ -94,6 +105,7 @@ where .field("model", &self.model) .field("config", &self.config) .field("peers", &self.peers) + .field("backoff_registry", &self.backoff_registry) .field("swarm_events_sender", &self.swarm_events_sender) .field("swarm_events_receiver", &self.swarm_events_receiver) .field("next_sync", &"_") @@ -156,6 +168,7 @@ impl Behaviour { model, config, peers: BTreeMap::new(), + backoff_registry: BTreeMap::new(), swarm_events_sender: tx, swarm_events_receiver: rx, next_sync: None, @@ -169,6 +182,16 @@ impl Behaviour { handle.block_on(async move { tx.send(event).await }) }); } + + /// Calculate the reject_until timestamp for a peer + fn calculate_reject_until(peer_info: &PeerInfo) -> Option { + peer_info + .next_sync + .values() + .max() + .copied() + .filter(|t| *t > Instant::now()) + } } impl NetworkBehaviour for Behaviour @@ -188,6 +211,8 @@ where id: info.connection_id, dialer: matches!(info.endpoint, ConnectedPoint::Dialer { .. 
}), }; + + // Get or create peer info self.peers .entry(info.peer_id) .and_modify(|peer_info| peer_info.connections.push(connection_info)) @@ -311,9 +336,28 @@ where ), ); info.status = PeerStatus::Failed { stream_set }; + // Collect data before releasing borrow + let reject_until = Self::calculate_reject_until(info); + let status = info.status; + let connection_ids: Vec<_> = info.connections.iter().map(|c| c.id).collect(); + + // Update backoff registry and notify handlers + if let Some(reject_until) = reject_until { + self.backoff_registry.insert(peer_id, reject_until); + // Notify handlers so they can reject incoming syncs + for conn_id in connection_ids { + self.send_event(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::One(conn_id), + event: FromBehaviour::UpdateRejectUntil { + reject_until: Some(reject_until), + }, + }); + } + } Some(ToSwarm::GenerateEvent(Event::PeerEvent(PeerEvent { remote_peer_id: peer_id, - status: info.status, + status, }))) } else { tracing::warn!(%peer_id, ?connection_id, "peer not found in peers map when failed synchronizing? closing connectoin"); @@ -323,6 +367,13 @@ where }) } } + + // An incoming sync was rejected due to backoff - just log it, no state change needed + FromHandler::InboundRejected { stream_set } => { + debug!(%peer_id, ?stream_set, "inbound sync rejected due to backoff"); + self.peer.metrics().record(&InboundSyncRejected); + None + } }; if let Some(ev) = ev { @@ -334,6 +385,10 @@ where &mut self, cx: &mut std::task::Context<'_>, ) -> Poll>> { + // Clean up expired backoff entries + let now = Instant::now(); + self.backoff_registry.retain(|_, expires| *expires > now); + if let Poll::Ready(Some(event)) = self.swarm_events_receiver.poll_recv(cx) { trace!(?event, "swarm event"); return Poll::Ready(event); @@ -388,13 +443,26 @@ where _local_addr: &libp2p::Multiaddr, _remote_addr: &libp2p::Multiaddr, ) -> std::result::Result, libp2p::swarm::ConnectionDenied> { - debug!(%peer, ?connection_id, "handle_established_inbound_connection"); + // Check if peer is blocked + if self.config.blocked_peers.contains(&peer) { + debug!(%peer, ?connection_id, "rejecting inbound connection from blocked peer"); + self.peer.metrics().record(&BlockedConnection); + return Err(ConnectionDenied::new(format!("peer {} is blocked", peer))); + } + // Check backoff registry for this peer + let reject_inbound_until = self + .backoff_registry + .get(&peer) + .copied() + .filter(|t| *t > Instant::now()); + debug!(%peer, ?connection_id, ?reject_inbound_until, "handle_established_inbound_connection"); Ok(Handler::new( peer, connection_id, handler::State::WaitingInbound, self.peer.clone(), self.model.clone(), + reject_inbound_until, )) } @@ -405,7 +473,19 @@ where _addr: &libp2p::Multiaddr, _role_override: libp2p::core::Endpoint, ) -> std::result::Result, libp2p::swarm::ConnectionDenied> { - debug!(%peer, ?connection_id, "handle_established_outbound_connection"); + // Check if peer is blocked + if self.config.blocked_peers.contains(&peer) { + debug!(%peer, ?connection_id, "rejecting outbound connection to blocked peer"); + self.peer.metrics().record(&BlockedConnection); + return Err(ConnectionDenied::new(format!("peer {} is blocked", peer))); + } + // Check backoff registry for this peer + let reject_inbound_until = self + .backoff_registry + .get(&peer) + .copied() + .filter(|t| *t > Instant::now()); + debug!(%peer, ?connection_id, ?reject_inbound_until, "handle_established_outbound_connection"); Ok(Handler::new( peer, connection_id, @@ -415,6 +495,7 @@ where }, 
self.peer.clone(), self.model.clone(), + reject_inbound_until, )) } } diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index dfec0fa1f..32468536b 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -14,6 +14,7 @@ use libp2p::{ }, }; use libp2p_identity::PeerId; +use tokio::time::Instant; use tracing::{debug, trace}; use crate::{ @@ -29,6 +30,9 @@ pub struct Handler { model: M, state: State, behavior_events_queue: VecDeque, + /// Timestamp until which incoming syncs should be rejected. + /// If this is in the future, incoming sync attempts will be dropped. + reject_inbound_until: Option, } impl Handler @@ -42,6 +46,7 @@ where state: State, peer: P, model: M, + reject_inbound_until: Option, ) -> Self { Self { remote_peer_id: peer_id, @@ -50,6 +55,7 @@ where model, state, behavior_events_queue: VecDeque::new(), + reject_inbound_until, } } // Transition the state to a new state. @@ -134,7 +140,14 @@ impl std::fmt::Debug for State { #[derive(Debug)] pub enum FromBehaviour { - StartSync { stream_set: StreamSet }, + StartSync { + stream_set: StreamSet, + }, + /// Update the timestamp until which incoming syncs should be rejected. + /// Used to reject incoming syncs from peers we're currently backing off. + UpdateRejectUntil { + reject_until: Option, + }, } #[derive(Debug)] pub enum FromHandler { @@ -150,6 +163,10 @@ pub enum FromHandler { stream_set: StreamSet, error: anyhow::Error, }, + /// An incoming sync was rejected because we're backing off from this peer. + InboundRejected { + stream_set: StreamSet, + }, } impl ConnectionHandler for Handler @@ -239,6 +256,9 @@ where | State::Outbound(_, _) | State::Inbound(_, _) => {} }, + FromBehaviour::UpdateRejectUntil { reject_until } => { + self.reject_inbound_until = reject_until; + } } } @@ -260,6 +280,21 @@ where ) => { match self.state { State::Idle | State::WaitingInbound => { + // Check if we should reject this inbound sync due to backoff + if let Some(reject_until) = self.reject_inbound_until { + if reject_until > Instant::now() { + debug!( + %self.remote_peer_id, + ?self.connection_id, + ?stream_set, + "rejecting inbound sync due to backoff" + ); + self.behavior_events_queue + .push_front(FromHandler::InboundRejected { stream_set }); + return; + } + } + self.behavior_events_queue .push_front(FromHandler::Started { stream_set }); let stream = match stream_set { diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 27c2d810f..046879e1d 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -137,6 +137,7 @@ macro_rules! 
setup_test { per_peer_sync_delay: std::time::Duration::from_millis(10), per_peer_sync_backoff: 100.0, per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), + blocked_peers: std::collections::HashSet::new(), }; let swarm1 = Swarm::new_ephemeral(|_| { crate::libp2p::Behaviour::new(alice_peer, alice, config.clone()) @@ -353,6 +354,264 @@ fn into_peer_event(ev: crate::libp2p::Event) -> PeerEvent { } } +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn blocked_peer_connection_rejected() { + // First create Bob's swarm to get his PeerId + let bob_peer = Recon::new( + BTreeStoreErrors::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + let bob = Recon::new( + BTreeStoreErrors::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + + let bob_config = crate::libp2p::Config { + per_peer_sync_delay: std::time::Duration::from_millis(10), + per_peer_sync_backoff: 100.0, + per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), + blocked_peers: std::collections::HashSet::new(), + }; + + let mut swarm2 = + Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(bob_peer, bob, bob_config)); + + // Get Bob's peer ID and create Alice with Bob blocked + let bob_peer_id = *swarm2.local_peer_id(); + + let alice_peer = Recon::new( + BTreeStoreErrors::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + let alice = Recon::new( + BTreeStoreErrors::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + + let mut blocked_peers = std::collections::HashSet::new(); + blocked_peers.insert(bob_peer_id); + + let alice_config = crate::libp2p::Config { + per_peer_sync_delay: std::time::Duration::from_millis(10), + per_peer_sync_backoff: 100.0, + per_peer_maximum_sync_delay: std::time::Duration::from_millis(1000), + blocked_peers, + }; + + let mut swarm1 = + Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(alice_peer, alice, alice_config)); + + // Alice listens, Bob tries to connect + swarm1.listen().with_memory_addr_external().await; + + // Bob connects to Alice - the connection should be denied + let alice_addr = swarm1.external_addresses().next().unwrap().clone(); + swarm2.dial(alice_addr).unwrap(); + + // Drive the swarms - when Alice blocks Bob, the connection is established at the transport + // level but immediately closed when handle_established_inbound_connection returns ConnectionDenied. + // We verify this by checking that Bob sees ConnectionClosed right after ConnectionEstablished. + let result = tokio::time::timeout(std::time::Duration::from_millis(1000), async { + let mut connection_established = false; + loop { + tokio::select! { + _e = swarm1.next_swarm_event() => { + // Alice processes events + } + e = swarm2.next_swarm_event() => { + match e { + libp2p::swarm::SwarmEvent::ConnectionEstablished { .. } => { + connection_established = true; + } + libp2p::swarm::SwarmEvent::ConnectionClosed { peer_id, .. 
} => { + // Connection was closed - this happens when Alice blocks Bob + if connection_established && peer_id == *swarm1.local_peer_id() { + return true; + } + } + _ => {} + } + } + } + } + }) + .await; + + // Verify the connection was established then immediately closed (blocked peer behavior) + assert!( + result.is_ok() && result.unwrap(), + "Expected connection to be established then closed due to blocking" + ); +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn inbound_sync_rejected_during_backoff() { + // This test verifies that after sync failures, the backoff mechanism prevents syncs. + // We set up a scenario where: + // 1. Bob's model store fails repeatedly, causing sync failures + // 2. Due to the large backoff multiplier (100x), model syncs are eventually skipped + // 3. We observe that by the third sync cycle, model syncs are skipped entirely + // + // The backoff progression is: 10ms -> 1000ms (after second fail) + // This means the second model sync still happens (backoff not yet active), + // but by the third cycle, the backoff has taken effect and model syncs are skipped. + + let mut bob_model_store = BTreeStoreErrors::default(); + bob_model_store.set_error(Error::new_transient(anyhow::anyhow!( + "transient error to trigger backoff" + ))); + + let (mut swarm1, mut swarm2) = setup_test!( + BTreeStoreErrors::default(), + BTreeStoreErrors::default(), + BTreeStoreErrors::default(), + bob_model_store, + BTreeStoreErrors::default(), + BTreeStoreErrors::default(), + ); + + swarm1.listen().with_memory_addr_external().await; + swarm2.connect(&mut swarm1).await; + + // Drive through three sync cycles to observe backoff behavior + // Cycle 1 (events 0-3): Peer sync, Model sync (fails) + // Cycle 2 (events 4-7): Peer sync, Model sync (fails again, backoff increases) + // Cycle 3 (events 8-11): Peer sync, Peer sync (model skipped due to long backoff) + let (p1_events, p2_events): ([crate::libp2p::Event; 12], [crate::libp2p::Event; 12]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + + // Verify Bob saw model sync failures in the first two cycles + assert_eq!( + into_peer_event(p2_events[3].clone()).status, + PeerStatus::Failed { + stream_set: StreamSet::Model + }, + "Expected Bob to see first Model sync failure" + ); + assert_eq!( + into_peer_event(p2_events[7].clone()).status, + PeerStatus::Failed { + stream_set: StreamSet::Model + }, + "Expected Bob to see second Model sync failure" + ); + + // Verify that in the third cycle (events 8-11), only peer syncs occur + // because the model sync backoff (1000ms) now exceeds the peer sync delay (10ms) + let third_cycle_stream_sets: Vec<_> = p1_events[8..12] + .iter() + .map(|ev| { + let crate::libp2p::Event::PeerEvent(PeerEvent { status, .. }) = ev; + match status { + PeerStatus::Started { stream_set } + | PeerStatus::Synchronized { stream_set, .. } + | PeerStatus::Failed { stream_set } => Some(*stream_set), + _ => None, + } + }) + .collect(); + + // In the third cycle, backoff has kicked in - we should see only peer syncs + assert!( + third_cycle_stream_sets + .iter() + .all(|s| *s == Some(StreamSet::Peer)), + "Expected only Peer syncs in third cycle after backoff kicked in, got: {:?}", + third_cycle_stream_sets + ); +} + +#[test(tokio::test(flavor = "multi_thread", worker_threads = 2))] +async fn backoff_registry_expires() { + // This test verifies that backoff entries expire and allow syncs to resume. + // We configure a short backoff (10ms) and verify: + // 1. 
Initial sync fails, triggering backoff + // 2. After backoff expires, model syncs resume (they may fail again, but they're attempted) + // + // The key observation is that with a short backoff, we should see model sync attempts + // resume after the backoff period, unlike in inbound_sync_rejected_during_backoff where + // the large backoff (1000ms) prevents model syncs from occurring. + + let mut bob_model_store = BTreeStoreErrors::default(); + bob_model_store.set_error(Error::new_transient(anyhow::anyhow!( + "transient error to trigger backoff" + ))); + + // Create swarms with very short backoff times so they expire quickly + let alice = Recon::new( + BTreeStoreErrors::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + let alice_peer = Recon::new( + BTreeStoreErrors::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + let bob_peer = Recon::new( + BTreeStoreErrors::default(), + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + let bob = Recon::new( + bob_model_store, + FullInterests::default(), + Metrics::register(&mut Registry::default()), + ); + + // Use very short backoff (5ms * 2 = 10ms) that will expire before the next peer sync cycle + let config = crate::libp2p::Config { + per_peer_sync_delay: std::time::Duration::from_millis(5), + per_peer_sync_backoff: 2.0, + per_peer_maximum_sync_delay: std::time::Duration::from_millis(50), + blocked_peers: std::collections::HashSet::new(), + }; + + let mut swarm1 = + Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(alice_peer, alice, config.clone())); + let mut swarm2 = Swarm::new_ephemeral(|_| crate::libp2p::Behaviour::new(bob_peer, bob, config)); + + swarm1.listen().with_memory_addr_external().await; + swarm2.connect(&mut swarm1).await; + + // Drive through multiple sync cycles + // With short backoff (10ms), model syncs should resume after the backoff expires + // We expect to see model sync attempts (failures) interspersed with peer syncs + let (p1_events, _p2_events): ([crate::libp2p::Event; 12], [crate::libp2p::Event; 12]) = + libp2p_swarm_test::drive(&mut swarm1, &mut swarm2).await; + + // Count how many model sync events we see + let model_sync_count = p1_events + .iter() + .filter(|ev| { + let crate::libp2p::Event::PeerEvent(PeerEvent { status, .. }) = ev; + matches!( + status, + PeerStatus::Started { + stream_set: StreamSet::Model + } | PeerStatus::Synchronized { + stream_set: StreamSet::Model, + .. 
+ } | PeerStatus::Failed { + stream_set: StreamSet::Model + } + ) + }) + .count(); + + // With short backoff, we should see multiple model sync attempts (they fail but are retried) + // This proves the backoff expires and allows new sync attempts + assert!( + model_sync_count >= 2, + "Expected at least 2 model sync events (proving backoff expired and syncs resumed), got: {}", + model_sync_count + ); +} + fn assert_in_sync(id: PeerId, events: [crate::libp2p::Event; 4]) { assert_eq!( into_peer_event(events[0].clone()), diff --git a/recon/src/metrics.rs b/recon/src/metrics.rs index 81c1b7537..c5b319586 100644 --- a/recon/src/metrics.rs +++ b/recon/src/metrics.rs @@ -28,6 +28,11 @@ pub struct Metrics { protocol_pending_items: Counter, protocol_invalid_items: Family, + + /// Number of connections denied due to peer being blocked + blocked_connection_count: Counter, + /// Number of inbound syncs rejected due to backoff + inbound_sync_rejected_count: Counter, } #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] @@ -121,6 +126,20 @@ impl Metrics { sub_registry ); + register!( + blocked_connection_count, + "Number of connections denied due to peer being blocked", + Counter::default(), + sub_registry + ); + + register!( + inbound_sync_rejected_count, + "Number of inbound syncs rejected due to backoff", + Counter::default(), + sub_registry + ); + Self { protocol_message_received_count, protocol_message_sent_count, @@ -129,6 +148,8 @@ impl Metrics { protocol_run_new_keys, protocol_pending_items, protocol_invalid_items, + blocked_connection_count, + inbound_sync_rejected_count, } } } @@ -205,3 +226,21 @@ impl Recorder for Metrics { self.protocol_pending_items.inc_by(event.0); } } + +/// Event for when a connection is blocked +#[derive(Debug)] +pub struct BlockedConnection; +impl Recorder for Metrics { + fn record(&self, _event: &BlockedConnection) { + self.blocked_connection_count.inc(); + } +} + +/// Event for when an inbound sync is rejected due to backoff +#[derive(Debug)] +pub struct InboundSyncRejected; +impl Recorder for Metrics { + fn record(&self, _event: &InboundSyncRejected) { + self.inbound_sync_rejected_count.inc(); + } +}
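
Usage sketch (illustrative, not part of the diff): a minimal example of how a deployment might populate the new `blocked_peers` field in `Config`. The random `PeerId` below is a stand-in for an operator-supplied peer ID; the `Behaviour` would then be constructed with this `Config` the same way the tests above do.

    use std::collections::HashSet;
    use libp2p_identity::PeerId;

    // Stand-in for a peer id that would normally come from operator configuration.
    let bad_peer = PeerId::random();

    let mut blocked_peers = HashSet::new();
    blocked_peers.insert(bad_peer);

    // Remaining knobs keep their defaults (1s initial delay, 2.0 backoff, 10 min cap).
    let config = crate::libp2p::Config {
        blocked_peers,
        ..Default::default()
    };

    // With this config, inbound and outbound connections involving `bad_peer` are denied
    // in handle_established_{inbound,outbound}_connection, and each denial increments the
    // blocked_connection_count metric via the BlockedConnection recorder.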