From 8873425621cf071a252f9152b85344b109d001c8 Mon Sep 17 00:00:00 2001 From: bomanaps Date: Thu, 12 Mar 2026 21:47:11 +0100 Subject: [PATCH 1/2] add metrics and fix checkpoint sync backfill bugs --- lean_client/containers/src/state.rs | 8 + lean_client/fork_choice/src/handlers.rs | 24 ++- lean_client/fork_choice/src/store.rs | 46 +++++- lean_client/metrics/src/metrics.rs | 122 +++++++++++++-- .../networking/src/gossipsub/tests/config.rs | 4 +- .../networking/src/gossipsub/tests/topic.rs | 50 +----- lean_client/networking/src/gossipsub/topic.rs | 27 +--- lean_client/networking/src/network/service.rs | 143 +++++++++++++++++- lean_client/src/main.rs | 63 +++++++- lean_client/validator/src/lib.rs | 20 ++- lean_client/xmss/src/aggregated_signature.rs | 2 +- lean_client/xmss/src/signature.rs | 13 +- 12 files changed, 420 insertions(+), 102 deletions(-) diff --git a/lean_client/containers/src/state.rs b/lean_client/containers/src/state.rs index 6d77370..412b346 100644 --- a/lean_client/containers/src/state.rs +++ b/lean_client/containers/src/state.rs @@ -544,6 +544,14 @@ impl State { let old_finalized_slot = finalized_slot; latest_finalized = source; finalized_slot = latest_finalized.slot; + + // Record successful finalization + METRICS.get().map(|metrics| { + metrics + .lean_finalizations_total + .with_label_values(&["success"]) + .inc(); + }); let delta = finalized_slot.0.checked_sub(old_finalized_slot.0); if let Some(delta) = delta diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 1a1a9a5..86f7994 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -25,6 +25,12 @@ pub fn on_tick(store: &mut Store, time_millis: u64, has_proposal: bool) { // Advance by one interval with appropriate signaling tick_interval(store, should_signal_proposal); } + + // Record current slot metric + let current_slot = store.time / INTERVALS_PER_SLOT; + METRICS.get().map(|metrics| { + 
metrics.lean_current_slot.set(current_slot as i64); + }); } /// 1. The blocks voted for must exist in our store. @@ -129,6 +135,11 @@ pub fn on_gossip_attestation( .gossip_signatures .insert(sig_key, signed_attestation.signature); + // Update gossip signatures gauge + METRICS.get().map(|metrics| { + metrics.lean_gossip_signatures.set(store.gossip_signatures.len() as i64); + }); + // Store attestation data indexed by hash for aggregation lookup store .attestation_data_by_root @@ -274,7 +285,11 @@ pub fn on_aggregated_attestation( metrics .lean_attestations_valid_total .with_label_values(&["aggregation"]) - .inc() + .inc(); + // Update gauge for new aggregated payloads count + metrics + .lean_latest_new_aggregated_payloads + .set(store.latest_new_aggregated_payloads.len() as i64); }); Ok(()) @@ -484,6 +499,13 @@ fn process_block_internal( } } + // Update gauge for known aggregated payloads count + METRICS.get().map(|metrics| { + metrics + .lean_latest_known_aggregated_payloads + .set(store.latest_known_aggregated_payloads.len() as i64); + }); + // Process each aggregated attestation's validators for fork choice // Signature verification is done in verify_signatures() before on_block() // Per Devnet-2, we process attestation data directly (not SignedAttestation) diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index 1960e9e..2939586 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -5,7 +5,7 @@ use containers::{ AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config, SignatureKey, SignedBlockWithAttestation, Slot, State, }; -use metrics::set_gauge_u64; +use metrics::{set_gauge_u64, METRICS}; use ssz::{H256, SszHash}; use xmss::Signature; @@ -295,6 +295,8 @@ pub fn get_latest_justified(states: &HashMap) -> Option<&Checkpoint } pub fn update_head(store: &mut Store) { + let old_head = store.head; + // Compute new head using LMD-GHOST from latest 
justified root let new_head = get_fork_choice_head( store, @@ -304,6 +306,48 @@ pub fn update_head(store: &mut Store) { ); store.head = new_head; + // Detect reorg if head changed and new head's parent is not old head + if new_head != old_head && !old_head.is_zero() { + if let Some(new_head_block) = store.blocks.get(&new_head) { + if new_head_block.parent_root != old_head { + let mut depth = 0u64; + let mut current = old_head; + + + while !current.is_zero() && depth < 100 { + if let Some(block) = store.blocks.get(¤t) { + // Check if new head descends from this block + let mut check = new_head; + while !check.is_zero() { + if check == current { + // Found common ancestor + break; + } + if let Some(b) = store.blocks.get(&check) { + check = b.parent_root; + } else { + break; + } + } + if check == current { + break; + } + depth += 1; + current = block.parent_root; + } else { + break; + } + } + + // Record reorg metrics + METRICS.get().map(|metrics| { + metrics.lean_fork_choice_reorgs_total.inc(); + metrics.lean_fork_choice_reorg_depth.observe(depth as f64); + }); + } + } + } + set_gauge_u64( |m| &m.lean_head_slot, || { diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index ea0f75f..d9bce01 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -17,6 +17,15 @@ pub struct Metrics { lean_node_start_time_seconds: IntGauge, // PQ Signature metrics + /// Total number of individual attestation signatures + pub lean_pq_sig_attestation_signatures_total: IntCounter, + + /// Total number of valid individual attestation signatures + pub lean_pq_sig_attestation_signatures_valid_total: IntCounter, + + /// Total number of invalid individual attestation signatures + pub lean_pq_sig_attestation_signatures_invalid_total: IntCounter, + /// Time taken to sign an attestation pub lean_pq_sig_attestation_signing_time_seconds: Histogram, @@ -30,7 +39,7 @@ pub struct Metrics { pub 
lean_pq_sig_attestations_in_aggregated_signatures_total: IntCounter, /// Time taken to build an aggregated attestation signature - pub lean_pq_sig_attestation_signatures_building_time_seconds: Histogram, + pub lean_pq_sig_aggregated_signatures_building_time_seconds: Histogram, /// Time taken to verify an aggregated attestation signature pub lean_pq_sig_aggregated_signatures_verification_time_seconds: Histogram, @@ -64,10 +73,10 @@ pub struct Metrics { pub lean_attestation_validation_time_seconds: Histogram, /// Total number of fork choice reorgs - lean_fork_choice_reorgs_total: IntCounter, + pub lean_fork_choice_reorgs_total: IntCounter, /// Depth of fork choice reorgs (in blocks) - lean_fork_choice_reorg_depth: Histogram, + pub lean_fork_choice_reorg_depth: Histogram, // State Transition Metrics /// Latest justified slot @@ -77,7 +86,7 @@ pub struct Metrics { pub lean_latest_finalized_slot: IntGauge, /// Total number of finalization attempts - lean_finalizations_total: IntCounterVec, + pub lean_finalizations_total: IntCounterVec, /// Time to process state transition pub lean_state_transition_time_seconds: Histogram, @@ -110,6 +119,27 @@ pub struct Metrics { /// Total number of peer disconnection events lean_peer_disconnection_events_total: IntCounterVec, + + /// Number of gossip signatures in fork-choice store + pub lean_gossip_signatures: IntGauge, + + /// Number of new aggregated payload items + pub lean_latest_new_aggregated_payloads: IntGauge, + + /// Number of known aggregated payload items + pub lean_latest_known_aggregated_payloads: IntGauge, + + /// Time taken to aggregate committee signatures + pub lean_committee_signatures_aggregation_time_seconds: Histogram, + + /// Validator's is_aggregator status (1=true, 0=false) + pub lean_is_aggregator: IntGauge, + + /// Node's attestation committee subnet + pub lean_attestation_committee_subnet: IntGauge, + + /// Number of attestation committees (ATTESTATION_COMMITTEE_COUNT) + pub lean_attestation_committee_count: 
IntGauge, } impl Metrics { @@ -125,6 +155,18 @@ impl Metrics { )?, // PQ Signature metrics + lean_pq_sig_attestation_signatures_total: IntCounter::new( + "lean_pq_sig_attestation_signatures_total", + "Total number of individual attestation signatures", + )?, + lean_pq_sig_attestation_signatures_valid_total: IntCounter::new( + "lean_pq_sig_attestation_signatures_valid_total", + "Total number of valid individual attestation signatures", + )?, + lean_pq_sig_attestation_signatures_invalid_total: IntCounter::new( + "lean_pq_sig_attestation_signatures_invalid_total", + "Total number of invalid individual attestation signatures", + )?, lean_pq_sig_attestation_signing_time_seconds: Histogram::with_opts(histogram_opts!( "lean_pq_sig_attestation_signing_time_seconds", "Time taken to sign an attestation", @@ -145,23 +187,23 @@ impl Metrics { "lean_pq_sig_attestations_in_aggregated_signatures_total", "Total number of attestations included into aggregated signatures", )?, - lean_pq_sig_attestation_signatures_building_time_seconds: Histogram::with_opts( + lean_pq_sig_aggregated_signatures_building_time_seconds: Histogram::with_opts( histogram_opts!( - "lean_pq_sig_attestation_signatures_building_time_seconds", - "Time taken to verify an aggregated attestation signature", - vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0] + "lean_pq_sig_aggregated_signatures_building_time_seconds", + "Time taken to build an aggregated attestation signature", + vec![0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0, 4.0] ), )?, lean_pq_sig_aggregated_signatures_verification_time_seconds: Histogram::with_opts( histogram_opts!( "lean_pq_sig_aggregated_signatures_verification_time_seconds", "Time taken to verify an aggregated attestation signature", - vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0] + vec![0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0, 4.0] ), )?, lean_pq_sig_aggregated_signatures_valid_total: IntCounter::new( "lean_pq_sig_aggregated_signatures_valid_total", - "On validate aggregated signature", + "Total number 
of valid aggregated signatures", )?, lean_pq_sig_aggregated_signatures_invalid_total: IntCounter::new( "lean_pq_sig_aggregated_signatures_invalid_total", @@ -178,7 +220,7 @@ impl Metrics { lean_fork_choice_block_processing_time_seconds: Histogram::with_opts(histogram_opts!( "lean_fork_choice_block_processing_time_seconds", "Time taken to process block", - vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0] + vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0, 1.25, 1.5, 2.0, 4.0] ))?, lean_attestations_valid_total: IntCounterVec::new( opts!( @@ -285,6 +327,40 @@ impl Metrics { ), &["direction", "reason"], )?, + + lean_gossip_signatures: IntGauge::new( + "lean_gossip_signatures", + "Number of gossip signatures in fork-choice store", + )?, + lean_latest_new_aggregated_payloads: IntGauge::new( + "lean_latest_new_aggregated_payloads", + "Number of new aggregated payload items", + )?, + lean_latest_known_aggregated_payloads: IntGauge::new( + "lean_latest_known_aggregated_payloads", + "Number of known aggregated payload items", + )?, + lean_committee_signatures_aggregation_time_seconds: Histogram::with_opts( + histogram_opts!( + "lean_committee_signatures_aggregation_time_seconds", + "Time taken to aggregate committee signatures", + vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0] + ), + )?, + + lean_is_aggregator: IntGauge::new( + "lean_is_aggregator", + "Validator's is_aggregator status (1=true, 0=false)", + )?, + + lean_attestation_committee_subnet: IntGauge::new( + "lean_attestation_committee_subnet", + "Node's attestation committee subnet", + )?, + lean_attestation_committee_count: IntGauge::new( + "lean_attestation_committee_count", + "Number of attestation committees (ATTESTATION_COMMITTEE_COUNT)", + )?, }) } @@ -293,6 +369,15 @@ impl Metrics { default_registry.register(Box::new(self.lean_node_info.clone()))?; default_registry.register(Box::new(self.lean_node_start_time_seconds.clone()))?; + default_registry.register(Box::new( + 
self.lean_pq_sig_attestation_signatures_total.clone(), + ))?; + default_registry.register(Box::new( + self.lean_pq_sig_attestation_signatures_valid_total.clone(), + ))?; + default_registry.register(Box::new( + self.lean_pq_sig_attestation_signatures_invalid_total.clone(), + ))?; default_registry.register(Box::new( self.lean_pq_sig_attestation_signing_time_seconds.clone(), ))?; @@ -308,7 +393,7 @@ impl Metrics { .clone(), ))?; default_registry.register(Box::new( - self.lean_pq_sig_attestation_signatures_building_time_seconds + self.lean_pq_sig_aggregated_signatures_building_time_seconds .clone(), ))?; default_registry.register(Box::new( @@ -362,6 +447,19 @@ impl Metrics { default_registry.register(Box::new(self.lean_peer_connection_events_total.clone()))?; default_registry.register(Box::new(self.lean_peer_disconnection_events_total.clone()))?; + // Additional Fork-Choice Metrics + default_registry.register(Box::new(self.lean_gossip_signatures.clone()))?; + default_registry.register(Box::new(self.lean_latest_new_aggregated_payloads.clone()))?; + default_registry.register(Box::new(self.lean_latest_known_aggregated_payloads.clone()))?; + default_registry.register(Box::new( + self.lean_committee_signatures_aggregation_time_seconds.clone(), + ))?; + + default_registry.register(Box::new(self.lean_is_aggregator.clone()))?; + + default_registry.register(Box::new(self.lean_attestation_committee_subnet.clone()))?; + default_registry.register(Box::new(self.lean_attestation_committee_count.clone()))?; + Ok(()) } diff --git a/lean_client/networking/src/gossipsub/tests/config.rs b/lean_client/networking/src/gossipsub/tests/config.rs index 5390d69..aa028ee 100644 --- a/lean_client/networking/src/gossipsub/tests/config.rs +++ b/lean_client/networking/src/gossipsub/tests/config.rs @@ -1,5 +1,5 @@ use crate::gossipsub::config::GossipsubConfig; -use crate::gossipsub::topic::{ATTESTATION_SUBNET_COUNT, GossipsubKind, get_topics}; +use crate::gossipsub::topic::{ATTESTATION_SUBNET_COUNT, 
GossipsubKind, get_subscription_topics}; #[test] fn test_default_parameters() { @@ -39,7 +39,7 @@ fn test_default_parameters() { #[test] fn test_set_topics() { let mut config = GossipsubConfig::new(); - let topics = get_topics("genesis".to_string()); + let topics = get_subscription_topics("genesis".to_string()); config.set_topics(topics.clone()); diff --git a/lean_client/networking/src/gossipsub/tests/topic.rs b/lean_client/networking/src/gossipsub/tests/topic.rs index ae49a4f..3da8a51 100644 --- a/lean_client/networking/src/gossipsub/tests/topic.rs +++ b/lean_client/networking/src/gossipsub/tests/topic.rs @@ -1,7 +1,7 @@ use crate::gossipsub::topic::{ AGGREGATION_TOPIC, ATTESTATION_SUBNET_COUNT, ATTESTATION_SUBNET_PREFIX, BLOCK_TOPIC, GossipsubKind, GossipsubTopic, SSZ_SNAPPY_ENCODING_POSTFIX, TOPIC_PREFIX, - get_subscription_topics, get_topics, + get_subscription_topics, }; use libp2p::gossipsub::TopicHash; @@ -94,28 +94,6 @@ fn test_topic_encoding_decoding_roundtrip() { assert_eq!(original.kind, decoded.kind); } -#[test] -fn test_get_topics_all_same_fork() { - let topics = get_topics("myfork".to_string()); - - // Block + Aggregation + ATTESTATION_SUBNET_COUNT subnets (no legacy Attestation) - let expected_count = 2 + ATTESTATION_SUBNET_COUNT as usize; - assert_eq!(topics.len(), expected_count); - - let kinds: Vec<_> = topics.iter().map(|t| t.kind.clone()).collect(); - assert!(kinds.contains(&GossipsubKind::Block)); - assert!(kinds.contains(&GossipsubKind::Aggregation)); - - // Check subnet topics - for subnet_id in 0..ATTESTATION_SUBNET_COUNT { - assert!(kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); - } - - // All should have the same fork - for topic in &topics { - assert_eq!(topic.fork, "myfork"); - } -} #[test] fn test_gossipsub_kind_display() { @@ -254,11 +232,9 @@ fn test_topic_hash_conversion() { } #[test] -fn test_get_subscription_topics_aggregator() { - // Aggregators subscribe to all topics including attestation subnets - let topics = 
get_subscription_topics("myfork".to_string(), true); +fn test_get_subscription_topics() { + let topics = get_subscription_topics("myfork".to_string()); - // Block + Aggregation + ATTESTATION_SUBNET_COUNT subnets let expected_count = 2 + ATTESTATION_SUBNET_COUNT as usize; assert_eq!(topics.len(), expected_count); @@ -266,27 +242,11 @@ fn test_get_subscription_topics_aggregator() { assert!(kinds.contains(&GossipsubKind::Block)); assert!(kinds.contains(&GossipsubKind::Aggregation)); - // Aggregators should have subnet topics for subnet_id in 0..ATTESTATION_SUBNET_COUNT { assert!(kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); } -} - -#[test] -fn test_get_subscription_topics_non_aggregator() { - // Non-aggregators only subscribe to Block and Aggregation - // They do NOT subscribe to attestation subnet topics (they publish to them but don't subscribe) - let topics = get_subscription_topics("myfork".to_string(), false); - - // Block + Aggregation only (no subnet topics) - assert_eq!(topics.len(), 2); - let kinds: Vec<_> = topics.iter().map(|t| t.kind.clone()).collect(); - assert!(kinds.contains(&GossipsubKind::Block)); - assert!(kinds.contains(&GossipsubKind::Aggregation)); - - // Non-aggregators should NOT have subnet topics - for subnet_id in 0..ATTESTATION_SUBNET_COUNT { - assert!(!kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); + for topic in &topics { + assert_eq!(topic.fork, "myfork"); } } diff --git a/lean_client/networking/src/gossipsub/topic.rs b/lean_client/networking/src/gossipsub/topic.rs index ef4d080..2188e30 100644 --- a/lean_client/networking/src/gossipsub/topic.rs +++ b/lean_client/networking/src/gossipsub/topic.rs @@ -44,42 +44,29 @@ impl GossipsubKind { } } -/// Get all topics (for testing or full subscription) -pub fn get_topics(fork: String) -> Vec { - get_subscription_topics(fork, true) -} - -/// Get topics for subscription based on node role (devnet-3) -/// - All nodes subscribe to Block and Aggregation topics -/// - 
Only aggregators subscribe to AttestationSubnet topics to collect attestations -/// - Non-aggregators publish to subnet topics but don't subscribe -pub fn get_subscription_topics(fork: String, is_aggregator: bool) -> Vec { +pub fn get_subscription_topics(fork: String) -> Vec { let mut topics = vec![ GossipsubTopic { fork: fork.clone(), kind: GossipsubKind::Block, }, - // Aggregation topic - all nodes subscribe to receive aggregated attestations GossipsubTopic { fork: fork.clone(), kind: GossipsubKind::Aggregation, }, ]; - // Only aggregators subscribe to attestation subnet topics (devnet-3) - // Non-aggregators publish to these topics but don't subscribe - if is_aggregator { - for subnet_id in 0..ATTESTATION_SUBNET_COUNT { - topics.push(GossipsubTopic { - fork: fork.clone(), - kind: GossipsubKind::AttestationSubnet(subnet_id), - }); - } + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + topics.push(GossipsubTopic { + fork: fork.clone(), + kind: GossipsubKind::AttestationSubnet(subnet_id), + }); } topics } + impl GossipsubTopic { pub fn decode(topic: &TopicHash) -> Result { let topic_parts = Self::split_topic(topic)?; diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index e3db354..9a466f9 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -20,6 +20,7 @@ use libp2p::{ gossipsub::{Event, IdentTopic, MessageAuthenticity}, identify, multiaddr::Protocol, + request_response::OutboundRequestId, swarm::{Config, ConnectionError, Swarm, SwarmEvent}, }; use libp2p_identity::{Keypair, PeerId}; @@ -46,6 +47,15 @@ use crate::{ }, }; +const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 3; +const MAX_BLOCK_FETCH_DEPTH: u32 = 512; + +struct PendingBlocksRequest { + roots: Vec, + retries: u8, + depth: u32, +} + #[derive(Debug, Clone)] pub struct NetworkServiceConfig { pub gossipsub_config: GossipsubConfig, @@ -173,6 +183,10 @@ where signed_block_provider: SignedBlockProvider, 
/// Shared status provider for Status req/resp protocol status_provider: StatusProvider, + /// Pending BlocksByRoot requests for retry on empty response + pending_blocks_by_root: HashMap, + /// Depth tracking per block root for limiting backward chain walking + pending_block_depths: HashMap, } impl NetworkService @@ -279,6 +293,8 @@ where chain_message_sink, signed_block_provider, status_provider, + pending_blocks_by_root: HashMap::new(), + pending_block_depths: HashMap::new(), }; service.listen(&multiaddr)?; @@ -636,15 +652,31 @@ where match event { Event::Message { peer, message, .. } => match message { - Message::Response { response, .. } => { + Message::Response { + response, + request_id, + } => { + let pending = self.pending_blocks_by_root.remove(&request_id); + let request_depth = pending.as_ref().map(|p| p.depth).unwrap_or(0); + match response { LeanResponse::BlocksByRoot(blocks) => { info!( peer = %peer, num_blocks = blocks.len(), + depth = request_depth, "Received BlocksByRoot response" ); + // Track depth for potential parent block requests + // Each block's parent will be requested at depth + 1 + for block in &blocks { + let parent_root = block.message.block.parent_root; + if !parent_root.is_zero() { + self.pending_block_depths.insert(parent_root, request_depth + 1); + } + } + // Feed received blocks back into chain processing let chain_sink = self.chain_message_sink.clone(); tokio::spawn(async move { @@ -673,7 +705,11 @@ where }); } LeanResponse::Empty => { - warn!(peer = %peer, "Received empty BlocksByRoot response"); + if let Some(req) = pending { + self.retry_blocks_by_root_request(peer, req); + } else { + warn!(peer = %peer, "Received empty BlocksByRoot response (no pending request)"); + } } _ => { warn!(peer = %peer, "Unexpected response type on BlocksByRoot protocol"); @@ -715,8 +751,16 @@ where } } }, - Event::OutboundFailure { peer, error, .. } => { + Event::OutboundFailure { + peer, + error, + request_id, + .. 
+ } => { warn!(peer = %peer, ?error, "BlocksByRoot outbound request failed"); + if let Some(req) = self.pending_blocks_by_root.remove(&request_id) { + self.retry_blocks_by_root_request(peer, req); + } } Event::InboundFailure { peer, error, .. } => { warn!(peer = %peer, ?error, "BlocksByRoot inbound request failed"); @@ -728,6 +772,42 @@ where None } + fn retry_blocks_by_root_request(&mut self, failed_peer: PeerId, req: PendingBlocksRequest) { + if req.retries >= MAX_BLOCKS_BY_ROOT_RETRIES { + warn!( + retries = req.retries, + num_roots = req.roots.len(), + depth = req.depth, + "BlocksByRoot max retries exceeded, giving up" + ); + return; + } + + let connected_peers: Vec = self + .peer_table + .lock() + .iter() + .filter(|(id, state)| **state == ConnectionState::Connected && **id != failed_peer) + .map(|(id, _)| *id) + .collect(); + + if let Some(peer_id) = connected_peers.choose(&mut rand::rng()).cloned() { + info!( + peer = %peer_id, + retries = req.retries + 1, + depth = req.depth, + num_roots = req.roots.len(), + "Retrying BlocksByRoot request with different peer" + ); + self.send_blocks_by_root_request_internal(peer_id, req.roots, req.retries + 1, req.depth); + } else { + warn!( + num_roots = req.roots.len(), + "No other connected peers to retry BlocksByRoot request" + ); + } + } + fn handle_identify_event(&mut self, event: identify::Event) -> Option { match event { identify::Event::Received { @@ -876,13 +956,40 @@ where } } OutboundP2pRequest::RequestBlocksByRoot(roots) => { + // Look up and validate depth for each root + // Depth is set when we receive a block and track its parent + // For initial gossip-triggered requests, depth will be 0 (not found) + let mut roots_to_request = Vec::new(); + for root in roots { + let depth = self.pending_block_depths.remove(&root).unwrap_or(0); + if depth >= MAX_BLOCK_FETCH_DEPTH { + warn!( + root = %root, + depth = depth, + max_depth = MAX_BLOCK_FETCH_DEPTH, + "Skipping block request: exceeded max fetch depth" + ); + } 
else { + roots_to_request.push((root, depth)); + } + } + + if roots_to_request.is_empty() { + return; + } + if let Some(peer_id) = self.get_random_connected_peer() { + // Use max depth among requested roots for the batch + let depth = roots_to_request.iter().map(|(_, d)| *d).max().unwrap_or(0); + let roots: Vec = roots_to_request.into_iter().map(|(r, _)| r).collect(); + info!( peer = %peer_id, num_blocks = roots.len(), + depth = depth, "Requesting missing blocks from peer" ); - self.send_blocks_by_root_request(peer_id, roots); + self.send_blocks_by_root_request_with_depth(peer_id, roots, depth); } else { warn!("Cannot request blocks: no connected peers"); } @@ -936,6 +1043,25 @@ where } pub fn send_blocks_by_root_request(&mut self, peer_id: PeerId, roots: Vec) { + self.send_blocks_by_root_request_with_depth(peer_id, roots, 0); + } + + pub fn send_blocks_by_root_request_with_depth( + &mut self, + peer_id: PeerId, + roots: Vec, + depth: u32, + ) { + self.send_blocks_by_root_request_internal(peer_id, roots, 0, depth); + } + + fn send_blocks_by_root_request_internal( + &mut self, + peer_id: PeerId, + roots: Vec, + retries: u8, + depth: u32, + ) { if roots.is_empty() { return; } @@ -950,13 +1076,18 @@ where return; } + // Depth is tracked in PendingBlocksRequest for retries + // No need to store in pending_block_depths here - it's set when blocks are received let request = LeanRequest::BlocksByRoot(roots.clone()); - info!(peer = %peer_id, num_roots = roots.len(), "Sending BlocksByRoot request"); - let _request_id = self + info!(peer = %peer_id, num_roots = roots.len(), retries, depth, "Sending BlocksByRoot request"); + let request_id = self .swarm .behaviour_mut() .blocks_by_root_req_resp .send_request(&peer_id, request); + + self.pending_blocks_by_root + .insert(request_id, PendingBlocksRequest { roots, retries, depth }); } fn build_behaviour( diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index 57fbbed..fd774a6 100644 --- a/lean_client/src/main.rs +++ 
b/lean_client/src/main.rs @@ -276,6 +276,15 @@ async fn main() -> Result<()> { .transpose() .context("failed to set metrics on start")?; + // Record aggregator and network metrics on startup + METRICS.get().map(|metrics| { + metrics + .lean_is_aggregator + .set(if args.is_aggregator { 1 } else { 0 }); + // ATTESTATION_SUBNET_COUNT is 1 in current implementation + metrics.lean_attestation_committee_count.set(1); + }); + let (outbound_p2p_sender, outbound_p2p_receiver) = mpsc::unbounded_channel::(); let (chain_message_sender, mut chain_message_receiver) = @@ -529,10 +538,20 @@ async fn main() -> Result<()> { None }; + // Record validator subnet metric if validator is configured + if let Some(ref service) = validator_service { + if let Some(&first_validator_id) = service.config.validator_indices.first() { + let subnet_id = compute_subnet_id(first_validator_id); + METRICS.get().map(|metrics| { + metrics + .lean_attestation_committee_subnet + .set(subnet_id as i64); + }); + } + } + let fork = "devnet0".to_string(); - // Devnet-3: Non-aggregators only subscribe to Block, Attestation, Aggregation - // Aggregators also subscribe to AttestationSubnet topics to collect attestations - let gossipsub_topics = get_subscription_topics(fork, args.is_aggregator); + let gossipsub_topics = get_subscription_topics(fork); let mut gossipsub_config = GossipsubConfig::new(); gossipsub_config.set_topics(gossipsub_topics); @@ -797,6 +816,39 @@ async fn main() -> Result<()> { .as_millis() as u64; on_tick(&mut *store.write(), now_millis, false); + // Proactive parent check: verify parent exists BEFORE calling on_block. + // This avoids unnecessary state transition attempts and is more efficient + // than catching the "Block queued" error after the fact. 
+ let parent_exists = { + let s = store.read(); + parent_root.is_zero() || s.states.contains_key(&parent_root) + }; + + if !parent_exists { + // Queue the block and request parent without calling on_block + { + let mut s = store.write(); + s.blocks_queue + .entry(parent_root) + .or_insert_with(Vec::new) + .push(signed_block_with_attestation.clone()); + } + + warn!( + child_slot = block_slot.0, + child_block_root = %format!("0x{:x}", block_root), + missing_parent_root = %format!("0x{:x}", parent_root), + "Block queued (proactive) - parent not found, requesting via BlocksByRoot" + ); + + if let Err(req_err) = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(vec![parent_root]) + ) { + warn!("Failed to request missing parent block: {}", req_err); + } + continue; + } + match on_block(&mut *store.write(), signed_block_with_attestation.clone()) { Ok(()) => { info!("Block processed successfully"); @@ -824,14 +876,15 @@ async fn main() -> Result<()> { } } Err(e) if format!("{e:?}").starts_with("Err: (Fork-choice::Handlers::OnBlock) Block queued") => { + // This path should be rare now due to proactive check, + // but handle it for edge cases (e.g., parent pruned between check and call) warn!( child_slot = block_slot.0, child_block_root = %format!("0x{:x}", block_root), missing_parent_root = %format!("0x{:x}", parent_root), - "Block queued - parent not found, will request via BlocksByRoot" + "Block queued (fallback) - parent not found, requesting via BlocksByRoot" ); - // Request missing parent block from peers if !parent_root.is_zero() { if let Err(req_err) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(vec![parent_root]) diff --git a/lean_client/validator/src/lib.rs b/lean_client/validator/src/lib.rs index c5022e3..b562734 100644 --- a/lean_client/validator/src/lib.rs +++ b/lean_client/validator/src/lib.rs @@ -237,7 +237,9 @@ impl ValidatorService { let participants = AggregationBits::from_validator_indices(&validator_ids); // Create 
the aggregated signature proof - // Uses attestation_data.slot as epoch (matches ream's approach) + let timer = METRICS + .get() + .map(|m| m.lean_committee_signatures_aggregation_time_seconds.start_timer()); let proof = match AggregatedSignatureProof::aggregate( participants, public_keys, @@ -245,8 +247,12 @@ impl ValidatorService { data_root, attestation_data.slot.0 as u32, ) { - Ok(p) => p, + Ok(p) => { + stop_and_record(timer); + p + } Err(e) => { + stop_and_discard(timer); warn!(error = %e, "Failed to create aggregated signature proof"); continue; } @@ -383,12 +389,6 @@ impl ValidatorService { }; let signature = if let Some(ref key_manager) = self.key_manager { - let _timer = METRICS.get().map(|metrics| { - metrics - .lean_pq_sig_attestation_signing_time_seconds - .start_timer() - }); - // Sign with XMSS let message = attestation.hash_tree_root(); let epoch = slot.0 as u32; @@ -400,6 +400,10 @@ impl ValidatorService { }); match key_manager.sign(idx, epoch, message) { Ok(sig) => { + // Record successful attestation signature + METRICS.get().map(|metrics| { + metrics.lean_pq_sig_attestation_signatures_total.inc(); + }); info!( slot = slot.0, validator = idx, diff --git a/lean_client/xmss/src/aggregated_signature.rs b/lean_client/xmss/src/aggregated_signature.rs index be587d0..9fd08c1 100644 --- a/lean_client/xmss/src/aggregated_signature.rs +++ b/lean_client/xmss/src/aggregated_signature.rs @@ -64,7 +64,7 @@ impl AggregatedSignature { let timer = METRICS.get().map(|metrics| { metrics - .lean_pq_sig_attestation_signatures_building_time_seconds + .lean_pq_sig_aggregated_signatures_building_time_seconds .start_timer() }); diff --git a/lean_client/xmss/src/signature.rs b/lean_client/xmss/src/signature.rs index ab0085f..427c1ae 100644 --- a/lean_client/xmss/src/signature.rs +++ b/lean_client/xmss/src/signature.rs @@ -8,6 +8,7 @@ use anyhow::{Error, anyhow, Result}; use eth_ssz::DecodeError; use leansig::{serialization::Serializable, signature::SignatureScheme}; use 
leansig::signature::generalized_xmss::instantiations_poseidon_top_level::lifetime_2_to_the_32::hashing_optimized::SIGTopLevelTargetSumLifetime32Dim64Base8; +use metrics::METRICS; use serde::de; use serde::{Deserialize, Serialize}; use ssz::{ByteVector, H256, Ssz}; @@ -42,7 +43,17 @@ impl Signature { &self.as_lean(), ); - is_valid.then_some(()).ok_or(anyhow!("invalid signature")) + if is_valid { + METRICS.get().map(|metrics| { + metrics.lean_pq_sig_attestation_signatures_valid_total.inc(); + }); + Ok(()) + } else { + METRICS.get().map(|metrics| { + metrics.lean_pq_sig_attestation_signatures_invalid_total.inc(); + }); + Err(anyhow!("invalid signature")) + } } pub(crate) fn from_lean(signature: LeanSigSignature) -> Self { From 206eaca1044f1ca8e3af387e5786a7ead37088d4 Mon Sep 17 00:00:00 2001 From: bomanaps Date: Fri, 13 Mar 2026 10:42:22 +0100 Subject: [PATCH 2/2] address review comment --- lean_client/fork_choice/src/handlers.rs | 6 -- .../networking/src/gossipsub/tests/config.rs | 3 +- .../networking/src/gossipsub/tests/topic.rs | 55 ++++++++++++++++++- lean_client/networking/src/gossipsub/topic.rs | 35 +++++++++++- lean_client/src/main.rs | 27 +++++---- 5 files changed, 104 insertions(+), 22 deletions(-) diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 86f7994..b1586cb 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -25,12 +25,6 @@ pub fn on_tick(store: &mut Store, time_millis: u64, has_proposal: bool) { // Advance by one interval with appropriate signaling tick_interval(store, should_signal_proposal); } - - // Record current slot metric - let current_slot = store.time / INTERVALS_PER_SLOT; - METRICS.get().map(|metrics| { - metrics.lean_current_slot.set(current_slot as i64); - }); } /// 1. The blocks voted for must exist in our store. 
diff --git a/lean_client/networking/src/gossipsub/tests/config.rs b/lean_client/networking/src/gossipsub/tests/config.rs index aa028ee..a0fde9a 100644 --- a/lean_client/networking/src/gossipsub/tests/config.rs +++ b/lean_client/networking/src/gossipsub/tests/config.rs @@ -39,7 +39,8 @@ fn test_default_parameters() { #[test] fn test_set_topics() { let mut config = GossipsubConfig::new(); - let topics = get_subscription_topics("genesis".to_string()); + // Use aggregator mode to get all subnets for this test + let topics = get_subscription_topics("genesis".to_string(), Some(0), true); config.set_topics(topics.clone()); diff --git a/lean_client/networking/src/gossipsub/tests/topic.rs b/lean_client/networking/src/gossipsub/tests/topic.rs index 3da8a51..cf97d0c 100644 --- a/lean_client/networking/src/gossipsub/tests/topic.rs +++ b/lean_client/networking/src/gossipsub/tests/topic.rs @@ -1,7 +1,7 @@ use crate::gossipsub::topic::{ AGGREGATION_TOPIC, ATTESTATION_SUBNET_COUNT, ATTESTATION_SUBNET_PREFIX, BLOCK_TOPIC, GossipsubKind, GossipsubTopic, SSZ_SNAPPY_ENCODING_POSTFIX, TOPIC_PREFIX, - get_subscription_topics, + compute_subnet_id, get_subscription_topics, }; use libp2p::gossipsub::TopicHash; @@ -232,9 +232,11 @@ fn test_topic_hash_conversion() { } #[test] -fn test_get_subscription_topics() { - let topics = get_subscription_topics("myfork".to_string()); +fn test_get_subscription_topics_aggregator() { + // Aggregator should subscribe to ALL attestation subnets + let topics = get_subscription_topics("myfork".to_string(), Some(0), true); + // Block + Aggregation + all attestation subnets let expected_count = 2 + ATTESTATION_SUBNET_COUNT as usize; assert_eq!(topics.len(), expected_count); @@ -242,6 +244,53 @@ fn test_get_subscription_topics() { assert!(kinds.contains(&GossipsubKind::Block)); assert!(kinds.contains(&GossipsubKind::Aggregation)); + // All attestation subnets should be present + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + 
assert!(kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); + } + + for topic in &topics { + assert_eq!(topic.fork, "myfork"); + } +} + +#[test] +fn test_get_subscription_topics_non_aggregator_validator() { + // Non-aggregator validator should subscribe to only their own subnet + let validator_id = 5u64; + let topics = get_subscription_topics("myfork".to_string(), Some(validator_id), false); + + // Block + Aggregation + only one attestation subnet + let expected_count = 3; + assert_eq!(topics.len(), expected_count); + + let kinds: Vec<_> = topics.iter().map(|t| t.kind.clone()).collect(); + assert!(kinds.contains(&GossipsubKind::Block)); + assert!(kinds.contains(&GossipsubKind::Aggregation)); + + // Only the validator's own subnet should be present + let expected_subnet = compute_subnet_id(validator_id); + assert!(kinds.contains(&GossipsubKind::AttestationSubnet(expected_subnet))); + + for topic in &topics { + assert_eq!(topic.fork, "myfork"); + } +} + +#[test] +fn test_get_subscription_topics_non_validator() { + // Non-validator node (no validator_id) should subscribe to ALL subnets for general sync + let topics = get_subscription_topics("myfork".to_string(), None, false); + + // Block + Aggregation + all attestation subnets + let expected_count = 2 + ATTESTATION_SUBNET_COUNT as usize; + assert_eq!(topics.len(), expected_count); + + let kinds: Vec<_> = topics.iter().map(|t| t.kind.clone()).collect(); + assert!(kinds.contains(&GossipsubKind::Block)); + assert!(kinds.contains(&GossipsubKind::Aggregation)); + + // All attestation subnets should be present + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { assert!(kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); } diff --git a/lean_client/networking/src/gossipsub/topic.rs b/lean_client/networking/src/gossipsub/topic.rs index 2188e30..57446f8 100644 --- a/lean_client/networking/src/gossipsub/topic.rs +++ b/lean_client/networking/src/gossipsub/topic.rs @@ -44,7 +44,20 @@ impl GossipsubKind { } } -pub 
fn get_subscription_topics(fork: String) -> Vec<GossipsubTopic> { +/// Get gossipsub topics for subscription based on validator role. +/// +/// Topic subscription rules: +/// - Block and Aggregation topics: Always subscribed +/// - Attestation subnet topics: +/// - If `is_aggregator` is true: Subscribe to ALL attestation subnets (needed for aggregation) +/// - If `is_aggregator` is false and `validator_id` is Some: Subscribe only to the validator's +/// own subnet (validator_id % ATTESTATION_SUBNET_COUNT) for publishing attestations +/// - If `validator_id` is None: Subscribe to all subnets (non-validator node for general sync) +pub fn get_subscription_topics( + fork: String, + validator_id: Option<u64>, + is_aggregator: bool, +) -> Vec<GossipsubTopic> { let mut topics = vec![ GossipsubTopic { fork: fork.clone(), @@ -56,11 +69,29 @@ pub fn get_subscription_topics(fork: String) -> Vec<GossipsubTopic> { }, ]; - for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + if is_aggregator { + // Aggregators subscribe to ALL attestation subnets to collect attestations for aggregation + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + topics.push(GossipsubTopic { + fork: fork.clone(), + kind: GossipsubKind::AttestationSubnet(subnet_id), + }); + } + } else if let Some(vid) = validator_id { + // Non-aggregator validators subscribe only to their own subnet for publishing attestations + let subnet_id = compute_subnet_id(vid); topics.push(GossipsubTopic { fork: fork.clone(), kind: GossipsubKind::AttestationSubnet(subnet_id), }); + } else { + // Non-validator nodes subscribe to all subnets for general network participation + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + topics.push(GossipsubTopic { + fork: fork.clone(), + kind: GossipsubKind::AttestationSubnet(subnet_id), + }); + } } topics diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index fd774a6..9d39c84 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -538,20 +538,27 @@ async fn main() -> Result<()> { None }; + // Extract first validator ID for
 subnet subscription and metrics + let first_validator_id: Option<u64> = validator_service + .as_ref() + .and_then(|service| service.config.validator_indices.first().copied()); + // Record validator subnet metric if validator is configured - if let Some(ref service) = validator_service { - if let Some(&first_validator_id) = service.config.validator_indices.first() { - let subnet_id = compute_subnet_id(first_validator_id); - METRICS.get().map(|metrics| { - metrics - .lean_attestation_committee_subnet - .set(subnet_id as i64); - }); - } + if let Some(validator_id) = first_validator_id { + let subnet_id = compute_subnet_id(validator_id); + METRICS.get().map(|metrics| { + metrics + .lean_attestation_committee_subnet + .set(subnet_id as i64); + }); } let fork = "devnet0".to_string(); - let gossipsub_topics = get_subscription_topics(fork); + // Subscribe to topics based on validator role: + // - Aggregators: all attestation subnets + // - Non-aggregator validators: only their own subnet + // - Non-validators: all subnets for general sync + let gossipsub_topics = get_subscription_topics(fork, first_validator_id, args.is_aggregator); let mut gossipsub_config = GossipsubConfig::new(); gossipsub_config.set_topics(gossipsub_topics);