diff --git a/lean_client/Cargo.lock b/lean_client/Cargo.lock index da0c8d6..97e23a8 100644 --- a/lean_client/Cargo.lock +++ b/lean_client/Cargo.lock @@ -190,7 +190,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -201,7 +201,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -640,7 +640,7 @@ version = "0.3.0" source = "git+https://github.com/leanEthereum/multilinear-toolkit.git?branch=lean-vm-simple#e06cba2e214879c00c7fbc0e5b12908ddfcba588" dependencies = [ "fiat-shamir", - "itertools 0.10.5", + "itertools 0.14.0", "p3-field 0.3.0", "p3-util 0.3.0", "rand 0.9.2", @@ -1679,7 +1679,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2310,8 +2310,13 @@ dependencies = [ "anyhow", "axum", "clap", + "fork_choice", "futures", + "hex", "metrics", + "parking_lot", + "serde_json", + "ssz", "tokio", "tower-http", "tracing", @@ -2374,12 +2379,30 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", +] + [[package]] name = "hyper-util" version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ + "base64", "bytes", "futures-channel", "futures-core", @@ -2387,7 +2410,9 @@ dependencies = [ "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.1", "tokio", @@ -2701,6 +2726,16 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "iri-string" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -2820,6 +2855,8 @@ dependencies = [ "libp2p-identity 0.2.13", "metrics", "networking", + "parking_lot", + "reqwest", "ssz", "tokio", "tracing", @@ -3762,6 +3799,7 @@ dependencies = [ "tiny-keccak", "tokio", "tracing", + "typenum", "yamux 0.12.1", ] @@ -3804,7 +3842,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3868,7 +3906,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.4.0", "proc-macro2 1.0.103", "quote 1.0.42", "syn 2.0.111", @@ -5047,6 +5085,44 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", +] + [[package]] name = "resolv-conf" version = "0.7.6" @@ -5223,7 +5299,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5860,6 +5936,9 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] [[package]] name = "synstructure" @@ -5927,7 +6006,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -6093,6 +6172,16 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" @@ -6182,9 +6271,12 @@ checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "bitflags 2.10.0", "bytes", + "futures-util", "http", "http-body", + "iri-string", "pin-project-lite", + "tower", "tower-layer", "tower-service", "tracing", @@ -6606,6 +6698,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.106" @@ -6638,6 +6743,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b32828d774c412041098d182a8b38b16ea816958e07cf40eec2bc080ae137ac" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web-time" version = "1.1.0" @@ -6648,6 +6763,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "whir-p3" version = "0.1.0" diff --git a/lean_client/Cargo.toml b/lean_client/Cargo.toml index b93acde..02af6e1 100644 --- a/lean_client/Cargo.toml +++ b/lean_client/Cargo.toml @@ -312,9 +312,11 @@ http_api = { workspace = true } libp2p-identity = { workspace = true } metrics = { workspace = true } networking = { workspace = true } +parking_lot = { workspace = true } ssz = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } validator = { workspace = true } xmss = { workspace = true } +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 39e8126..0b9c6a4 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -403,6 +403,8 @@ fn process_block_internal( // Store block and state, store the plain Block (not SignedBlockWithAttestation) store.blocks.insert(block_root, block.clone()); store.states.insert(block_root, new_state.clone()); + // Also store signed block for serving BlocksByRoot requests (checkpoint sync backfill) + store.signed_blocks.insert(block_root, signed_block.clone()); let justified_updated = new_state.latest_justified.slot > store.latest_justified.slot; let finalized_updated = new_state.latest_finalized.slot > store.latest_finalized.slot; diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index a059988..1960e9e 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use anyhow::{Result, anyhow, ensure}; use containers::{ - AggregatedSignatureProof, Attestation, AttestationData, Block, Checkpoint, Config, + AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config, SignatureKey, SignedBlockWithAttestation, Slot, State, }; use metrics::set_gauge_u64; @@ -58,6 +58,10 @@ pub struct Store { /// Used to look up the exact attestation data that was signed, /// matching ream's attestation_data_by_root_provider design. pub attestation_data_by_root: HashMap, + + /// Signed blocks indexed by block root. + /// Used to serve BlocksByRoot requests to peers for checkpoint sync backfill. + pub signed_blocks: HashMap, } const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; @@ -108,6 +112,30 @@ impl Store { slot: block_target.slot, } } + + pub fn compute_block_weights(&self) -> HashMap { + let attestations = extract_attestations_from_aggregated_payloads( + &self.latest_known_aggregated_payloads, + &self.attestation_data_by_root, + ); + + let start_slot = self.latest_finalized.slot; + let mut weights: HashMap = HashMap::new(); + + for attestation_data in attestations.values() { + let mut current_root = attestation_data.head.root; + + while let Some(block) = self.blocks.get(¤t_root) { + if block.slot <= start_slot { + break; + } + *weights.entry(current_root).or_insert(0) += 1; + current_root = block.parent_root; + } + } + + weights + } } /// Initialize forkchoice store from an anchor state and block @@ -120,25 +148,43 @@ pub fn get_forkchoice_store( let block = anchor_block.message.block.clone(); let block_slot = block.slot; - // Compute block root using the header hash (canonical block root) - let block_root = block.hash_tree_root(); - - let latest_justified = if anchor_state.latest_justified.root.is_zero() { - Checkpoint { - root: block_root, - slot: block_slot, - } + // Compute block root differently for genesis vs checkpoint sync: + // - Genesis (slot 0): Use block.hash_tree_root() directly + // - Checkpoint sync (slot > 0): Use BlockHeader from state.latest_block_header + // because we have the correct body_root there but may have synthetic empty body in Block + let block_root = if block_slot.0 == 0 { + block.hash_tree_root() } else { - anchor_state.latest_justified.clone() + let block_header = BlockHeader { + slot: anchor_state.latest_block_header.slot, + proposer_index: anchor_state.latest_block_header.proposer_index, + parent_root: anchor_state.latest_block_header.parent_root, + state_root: anchor_state.hash_tree_root(), + body_root: anchor_state.latest_block_header.body_root, + }; + block_header.hash_tree_root() }; - let latest_finalized = if anchor_state.latest_finalized.root.is_zero() { - Checkpoint { - root: block_root, - slot: block_slot, - } - } else { - anchor_state.latest_finalized.clone() + // Per checkpoint sync: always use anchor block's root for checkpoints. + // The original checkpoint roots point to blocks that don't exist in our store. + // We only have the anchor block, so use its root. Keep the slot from state + // to preserve justification/finalization progress information. + let latest_justified = Checkpoint { + root: block_root, + slot: if anchor_state.latest_justified.root.is_zero() { + block_slot + } else { + anchor_state.latest_justified.slot + }, + }; + + let latest_finalized = Checkpoint { + root: block_root, + slot: if anchor_state.latest_finalized.root.is_zero() { + block_slot + } else { + anchor_state.latest_finalized.slot + }, }; // Store the original anchor_state - do NOT modify it @@ -160,6 +206,7 @@ pub fn get_forkchoice_store( latest_known_aggregated_payloads: HashMap::new(), latest_new_aggregated_payloads: HashMap::new(), attestation_data_by_root: HashMap::new(), + signed_blocks: [(block_root, anchor_block)].into(), } } diff --git a/lean_client/http_api/Cargo.toml b/lean_client/http_api/Cargo.toml index d5a2524..f0b2173 100644 --- a/lean_client/http_api/Cargo.toml +++ b/lean_client/http_api/Cargo.toml @@ -7,8 +7,13 @@ edition = { workspace = true } anyhow = { workspace = true } axum = { workspace = true } clap = { workspace = true } +fork_choice = { workspace = true } futures = { workspace = true } +hex = { workspace = true } metrics = { workspace = true } +parking_lot = { workspace = true } +serde_json = { workspace = true } +ssz = { workspace = true } tokio = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } diff --git a/lean_client/http_api/src/handlers.rs b/lean_client/http_api/src/handlers.rs new file mode 100644 index 0000000..138c9d3 --- /dev/null +++ b/lean_client/http_api/src/handlers.rs @@ -0,0 +1,96 @@ +use std::sync::Arc; + +use axum::{ + Json, + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use fork_choice::store::Store; +use parking_lot::RwLock; +use serde_json::{Value, json}; +use ssz::SszWrite; + +pub type SharedStore = Arc>; + +pub async fn health() -> impl IntoResponse { + Json(json!({ + "status": "healthy", + "service": "lean-rpc-api" + })) +} + +pub async fn states_finalized(State(store): State) -> Result { + let store = store.read(); + + let finalized_root = store.latest_finalized.root; + + let state = store + .states + .get(&finalized_root) + .ok_or(StatusCode::NOT_FOUND)?; + + let ssz_bytes = state + .to_ssz() + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + Ok(( + StatusCode::OK, + [(axum::http::header::CONTENT_TYPE, "application/octet-stream")], + ssz_bytes, + ) + .into_response()) +} + +pub async fn checkpoints_justified(State(store): State) -> impl IntoResponse { + let store = store.read(); + + Json(json!({ + "slot": store.latest_justified.slot.0, + "root": format!("0x{}", hex::encode(store.latest_justified.root.as_bytes())) + })) +} + +pub async fn fork_choice(State(store): State) -> impl IntoResponse { + let store = store.read(); + + let finalized_slot = store.latest_finalized.slot; + let weights = store.compute_block_weights(); + + let nodes: Vec = store + .blocks + .iter() + .filter(|(_, block)| block.slot >= finalized_slot) + .map(|(root, block)| { + let weight = weights.get(root).copied().unwrap_or(0); + json!({ + "root": format!("0x{}", hex::encode(root.as_bytes())), + "slot": block.slot.0, + "parent_root": format!("0x{}", hex::encode(block.parent_root.as_bytes())), + "proposer_index": block.proposer_index, + "weight": weight + }) + }) + .collect(); + + let validator_count = store + .states + .get(&store.head) + .map(|state| state.validators.len_u64()) + .unwrap_or(0); + + Json(json!({ + "nodes": nodes, + "head": format!("0x{}", hex::encode(store.head.as_bytes())), + "justified": { + "slot": store.latest_justified.slot.0, + "root": format!("0x{}", hex::encode(store.latest_justified.root.as_bytes())) + }, + "finalized": { + "slot": store.latest_finalized.slot.0, + "root": format!("0x{}", hex::encode(store.latest_finalized.root.as_bytes())) + }, + "safe_target": format!("0x{}", hex::encode(store.safe_target.as_bytes())), + "validator_count": validator_count + })) +} diff --git a/lean_client/http_api/src/lib.rs b/lean_client/http_api/src/lib.rs index e269386..4642554 100644 --- a/lean_client/http_api/src/lib.rs +++ b/lean_client/http_api/src/lib.rs @@ -1,6 +1,8 @@ mod config; +pub mod handlers; mod routing; mod server; pub use config::HttpServerConfig; +pub use handlers::SharedStore; pub use server::run_server; diff --git a/lean_client/http_api/src/routing.rs b/lean_client/http_api/src/routing.rs index 3af33b9..7a0a282 100644 --- a/lean_client/http_api/src/routing.rs +++ b/lean_client/http_api/src/routing.rs @@ -1,14 +1,35 @@ -use axum::Router; +use axum::{Router, routing::get}; use metrics::metrics_module; -use crate::config::HttpServerConfig; +use crate::{ + config::HttpServerConfig, + handlers::{self, SharedStore}, +}; -pub fn normal_routes(config: &HttpServerConfig, genesis_time: u64) -> Router { +pub fn normal_routes( + config: &HttpServerConfig, + genesis_time: u64, + store: Option, +) -> Router { let mut router = Router::new(); if config.metrics_enabled() { router = router.merge(metrics_module(config.metrics.clone(), genesis_time)); } + if let Some(store) = store { + let lean_routes = Router::new() + .route("/lean/v0/health", get(handlers::health)) + .route("/lean/v0/states/finalized", get(handlers::states_finalized)) + .route( + "/lean/v0/checkpoints/justified", + get(handlers::checkpoints_justified), + ) + .route("/lean/v0/fork_choice", get(handlers::fork_choice)) + .with_state(store); + + router = router.merge(lean_routes); + } + router } diff --git a/lean_client/http_api/src/server.rs b/lean_client/http_api/src/server.rs index 9eccdb7..c1abe87 100644 --- a/lean_client/http_api/src/server.rs +++ b/lean_client/http_api/src/server.rs @@ -4,10 +4,14 @@ use anyhow::{Context, Error as AnyhowError, Result}; use futures::{TryFutureExt as _, future::FutureExt as _}; use tracing::info; -use crate::{config::HttpServerConfig, routing::normal_routes}; - -pub async fn run_server(config: HttpServerConfig, genesis_time: u64) -> Result<()> { - let router = normal_routes(&config, genesis_time); +use crate::{config::HttpServerConfig, handlers::SharedStore, routing::normal_routes}; + +pub async fn run_server( + config: HttpServerConfig, + genesis_time: u64, + store: Option, +) -> Result<()> { + let router = normal_routes(&config, genesis_time, store); let listener = config .listener() diff --git a/lean_client/networking/Cargo.toml b/lean_client/networking/Cargo.toml index 1a53517..f6e2a47 100644 --- a/lean_client/networking/Cargo.toml +++ b/lean_client/networking/Cargo.toml @@ -25,6 +25,7 @@ sha2 = { workspace = true } snap = { workspace = true } ssz = { workspace = true } tiny-keccak = { workspace = true } +typenum = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } yamux = { workspace = true } diff --git a/lean_client/networking/src/network/behaviour.rs b/lean_client/networking/src/network/behaviour.rs index d27a3be..1ea1704 100644 --- a/lean_client/networking/src/network/behaviour.rs +++ b/lean_client/networking/src/network/behaviour.rs @@ -6,7 +6,8 @@ use crate::req_resp::ReqResp; #[derive(NetworkBehaviour)] pub struct LeanNetworkBehaviour { pub identify: identify::Behaviour, - pub req_resp: ReqResp, + pub status_req_resp: ReqResp, + pub blocks_by_root_req_resp: ReqResp, pub gossipsub: GossipsubBehaviour, pub connection_limits: connection_limits::Behaviour, } diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index 8b01bb3..e3db354 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -39,9 +39,10 @@ use crate::{ enr_ext::EnrExt, gossipsub::{self, config::GossipsubConfig, message::GossipsubMessage, topic::GossipsubKind}, network::behaviour::{LeanNetworkBehaviour, LeanNetworkBehaviourEvent}, - req_resp::{self, BLOCKS_BY_ROOT_PROTOCOL_V1, LeanRequest, ReqRespMessage, STATUS_PROTOCOL_V1}, + req_resp::{self, LeanRequest, ReqRespMessage}, types::{ ChainMessage, ChainMessageSink, ConnectionState, OutboundP2pRequest, P2pRequestSource, + SignedBlockProvider, StatusProvider, }, }; @@ -168,6 +169,10 @@ where peer_count: Arc, outbound_p2p_requests: R, chain_message_sink: S, + /// Shared block provider for serving BlocksByRoot requests + signed_block_provider: SignedBlockProvider, + /// Shared status provider for Status req/resp protocol + status_provider: StatusProvider, } impl NetworkService @@ -179,12 +184,16 @@ where network_config: Arc, outbound_p2p_requests: R, chain_message_sink: S, + signed_block_provider: SignedBlockProvider, + status_provider: StatusProvider, ) -> Result { Self::new_with_peer_count( network_config, outbound_p2p_requests, chain_message_sink, Arc::new(AtomicU64::new(0)), + signed_block_provider, + status_provider, ) .await } @@ -194,6 +203,8 @@ where outbound_p2p_requests: R, chain_message_sink: S, peer_count: Arc, + signed_block_provider: SignedBlockProvider, + status_provider: StatusProvider, ) -> Result { let local_key = Keypair::generate_secp256k1(); Self::new_with_keypair( @@ -202,6 +213,8 @@ where chain_message_sink, peer_count, local_key, + signed_block_provider, + status_provider, ) .await } @@ -212,6 +225,8 @@ where chain_message_sink: S, peer_count: Arc, local_key: Keypair, + signed_block_provider: SignedBlockProvider, + status_provider: StatusProvider, ) -> Result { let behaviour = Self::build_behaviour(&local_key, &network_config)?; @@ -262,6 +277,8 @@ where peer_count, outbound_p2p_requests, chain_message_sink, + signed_block_provider, + status_provider, }; service.listen(&multiaddr)?; @@ -333,8 +350,11 @@ where LeanNetworkBehaviourEvent::Gossipsub(event) => { self.handle_gossipsub_event(event).await } - LeanNetworkBehaviourEvent::ReqResp(event) => { - self.handle_request_response_event(event) + LeanNetworkBehaviourEvent::StatusReqResp(event) => { + self.handle_status_req_resp_event(event) + } + LeanNetworkBehaviourEvent::BlocksByRootReqResp(event) => { + self.handle_blocks_by_root_req_resp_event(event) } LeanNetworkBehaviourEvent::Identify(event) => self.handle_identify_event(event), LeanNetworkBehaviourEvent::ConnectionLimits(_) => { @@ -553,7 +573,64 @@ where None } - fn handle_request_response_event(&mut self, event: ReqRespMessage) -> Option { + fn handle_status_req_resp_event(&mut self, event: ReqRespMessage) -> Option { + use crate::req_resp::LeanResponse; + use libp2p::request_response::{Event, Message}; + + match event { + Event::Message { peer, message, .. } => match message { + Message::Response { response, .. } => match response { + LeanResponse::Status(_) => { + info!(peer = %peer, "Received Status response"); + } + _ => { + warn!(peer = %peer, "Unexpected response type on Status protocol"); + } + }, + Message::Request { + request, channel, .. + } => { + use crate::req_resp::{LeanRequest, LeanResponse}; + + let response = match request { + LeanRequest::Status(_) => { + let status = self.status_provider.read().clone(); + info!(peer = %peer, finalized_slot = status.finalized.slot.0, head_slot = status.head.slot.0, "Received Status request"); + LeanResponse::Status(status) + } + _ => { + warn!(peer = %peer, "Unexpected request type on Status protocol"); + return None; + } + }; + + if let Err(e) = self + .swarm + .behaviour_mut() + .status_req_resp + .send_response(channel, response) + { + warn!(peer = %peer, ?e, "Failed to send Status response"); + } + } + }, + Event::OutboundFailure { peer, error, .. } => { + warn!(peer = %peer, ?error, "Status outbound request failed"); + } + Event::InboundFailure { peer, error, .. } => { + warn!(peer = %peer, ?error, "Status inbound request failed"); + } + Event::ResponseSent { peer, .. } => { + trace!(peer = %peer, "Status response sent"); + } + } + None + } + + fn handle_blocks_by_root_req_resp_event( + &mut self, + event: ReqRespMessage, + ) -> Option { use crate::req_resp::LeanResponse; use libp2p::request_response::{Event, Message}; @@ -595,11 +672,11 @@ where } }); } - LeanResponse::Status(_) => { - info!(peer = %peer, "Received Status response"); - } LeanResponse::Empty => { - warn!(peer = %peer, "Received empty response"); + warn!(peer = %peer, "Received empty BlocksByRoot response"); + } + _ => { + warn!(peer = %peer, "Unexpected response type on BlocksByRoot protocol"); } } } @@ -609,36 +686,43 @@ where use crate::req_resp::{LeanRequest, LeanResponse}; let response = match request { - LeanRequest::Status(_) => { - info!(peer = %peer, "Received Status request"); - LeanResponse::Status(containers::Status::default()) - } LeanRequest::BlocksByRoot(roots) => { info!(peer = %peer, num_roots = roots.len(), "Received BlocksByRoot request"); - // TODO: Lookup blocks from our store and return them - // For now, return empty to prevent timeout - LeanResponse::BlocksByRoot(vec![]) + + // Look up blocks from our signed_blocks store + let blocks_guard = self.signed_block_provider.read(); + + let blocks: Vec<_> = roots + .iter() + .filter_map(|root| blocks_guard.get(root).cloned()) + .collect(); + info!(peer = %peer, found = blocks.len(), requested = roots.len(), "Serving BlocksByRoot response"); + LeanResponse::BlocksByRoot(blocks) + } + _ => { + warn!(peer = %peer, "Unexpected request type on BlocksByRoot protocol"); + return None; } }; if let Err(e) = self .swarm .behaviour_mut() - .req_resp + .blocks_by_root_req_resp .send_response(channel, response) { - warn!(peer = %peer, ?e, "Failed to send response"); + warn!(peer = %peer, ?e, "Failed to send BlocksByRoot response"); } } }, Event::OutboundFailure { peer, error, .. } => { - warn!(peer = %peer, ?error, "Request failed"); + warn!(peer = %peer, ?error, "BlocksByRoot outbound request failed"); } Event::InboundFailure { peer, error, .. } => { - warn!(peer = %peer, ?error, "Inbound request failed"); + warn!(peer = %peer, ?error, "BlocksByRoot inbound request failed"); } Event::ResponseSent { peer, .. } => { - trace!(peer = %peer, "Response sent"); + trace!(peer = %peer, "BlocksByRoot response sent"); } } None @@ -841,14 +925,13 @@ where } fn send_status_request(&mut self, peer_id: PeerId) { - let status = containers::Status::default(); + let status = self.status_provider.read().clone(); + info!(peer = %peer_id, finalized_slot = status.finalized.slot.0, head_slot = status.head.slot.0, "Sending Status request for handshake"); let request = LeanRequest::Status(status); - - info!(peer = %peer_id, "Sending Status request for handshake"); let _request_id = self .swarm .behaviour_mut() - .req_resp + .status_req_resp .send_request(&peer_id, request); } @@ -872,7 +955,7 @@ where let _request_id = self .swarm .behaviour_mut() - .req_resp + .blocks_by_root_req_resp .send_request(&peer_id, request); } @@ -888,10 +971,8 @@ where ) .map_err(|err| anyhow!("Failed to create gossipsub behaviour: {err:?}"))?; - let req_resp = req_resp::build(vec![ - STATUS_PROTOCOL_V1.to_string(), - BLOCKS_BY_ROOT_PROTOCOL_V1.to_string(), - ]); + let status_req_resp = req_resp::build_status(); + let blocks_by_root_req_resp = req_resp::build_blocks_by_root(); let connection_limits = connection_limits::Behaviour::new( ConnectionLimits::default() @@ -902,7 +983,8 @@ where Ok(LeanNetworkBehaviour { identify, - req_resp, + status_req_resp, + blocks_by_root_req_resp, gossipsub, connection_limits, }) diff --git a/lean_client/networking/src/req_resp.rs b/lean_client/networking/src/req_resp.rs index 52a931f..ae30aa9 100644 --- a/lean_client/networking/src/req_resp.rs +++ b/lean_client/networking/src/req_resp.rs @@ -9,13 +9,29 @@ use libp2p::request_response::{ }; use snap::read::FrameDecoder; use snap::write::FrameEncoder; -use ssz::{H256, SszReadDefault as _, SszWrite as _}; +use ssz::{H256, PersistentList, Ssz, SszReadDefault as _, SszWrite as _}; +use tracing::warn; +use typenum::U1024; pub const MAX_REQUEST_BLOCKS: usize = 1024; +pub const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024; // 10 MiB pub const STATUS_PROTOCOL_V1: &str = "/leanconsensus/req/status/1/ssz_snappy"; pub const BLOCKS_BY_ROOT_PROTOCOL_V1: &str = "/leanconsensus/req/blocks_by_root/1/ssz_snappy"; +/// Response codes for req/resp protocol messages. +pub const RESPONSE_SUCCESS: u8 = 0; +pub const RESPONSE_INVALID_REQUEST: u8 = 1; +pub const RESPONSE_SERVER_ERROR: u8 = 2; +pub const RESPONSE_RESOURCE_UNAVAILABLE: u8 = 3; + +pub type RequestedBlockRoots = PersistentList; + +#[derive(Clone, Debug, PartialEq, Eq, Ssz)] +pub struct BlocksByRootRequest { + pub roots: RequestedBlockRoots, +} + #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct LeanProtocol(pub String); @@ -42,6 +58,41 @@ pub enum LeanResponse { pub struct LeanCodec; impl LeanCodec { + /// Encode a u32 as an unsigned LEB128 varint. + fn encode_varint(value: u32) -> Vec { + let mut result = Vec::new(); + let mut v = value; + loop { + let mut byte = (v & 0x7F) as u8; + v >>= 7; + if v != 0 { + byte |= 0x80; + } + result.push(byte); + if v == 0 { + break; + } + } + result + } + + /// Decode an unsigned LEB128 varint from data. + /// Returns (value, bytes_consumed) on success. + fn decode_varint(data: &[u8]) -> io::Result<(u32, usize)> { + let mut result = 0u32; + for (i, &byte) in data.iter().enumerate().take(5) { + let value = (byte & 0x7F) as u32; + result |= value << (7 * i); + if byte & 0x80 == 0 { + return Ok((result, i + 1)); + } + } + Err(io::Error::new( + io::ErrorKind::InvalidData, + "Invalid or truncated varint", + )) + } + /// Compress data using Snappy framing format (required for req/resp protocol) fn compress(data: &[u8]) -> io::Result> { let mut encoder = FrameEncoder::new(Vec::new()); @@ -59,28 +110,82 @@ impl LeanCodec { Ok(decompressed) } + /// Encode request with varint length prefix per spec: + /// [varint: uncompressed_length][snappy_framed_payload] fn encode_request(request: &LeanRequest) -> io::Result> { let ssz_bytes = match request { LeanRequest::Status(status) => status.to_ssz().map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("SSZ encode failed: {e}")) })?, LeanRequest::BlocksByRoot(roots) => { - let mut bytes = Vec::new(); + let mut request_roots = RequestedBlockRoots::default(); for root in roots { - bytes.extend_from_slice(root.as_bytes()); + request_roots.push(*root).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("Failed to add root: {e:?}")) + })?; } - bytes + let request = BlocksByRootRequest { + roots: request_roots, + }; + request.to_ssz().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("SSZ encode failed: {e}")) + })? } }; - Self::compress(&ssz_bytes) + + if ssz_bytes.len() > MAX_PAYLOAD_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Payload too large: {} > {}", + ssz_bytes.len(), + MAX_PAYLOAD_SIZE + ), + )); + } + + let compressed = Self::compress(&ssz_bytes)?; + let mut result = Self::encode_varint(ssz_bytes.len() as u32); + result.extend(compressed); + + Ok(result) } + /// Decode request with varint length prefix per spec: + /// [varint: uncompressed_length][snappy_framed_payload] fn decode_request(protocol: &str, data: &[u8]) -> io::Result { if data.is_empty() { return Ok(LeanRequest::Status(Status::default())); } - let ssz_bytes = Self::decompress(data)?; + // Parse varint length prefix + let (declared_len, varint_size) = Self::decode_varint(data)?; + + if declared_len as usize > MAX_PAYLOAD_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Declared length too large: {} > {}", + declared_len, MAX_PAYLOAD_SIZE + ), + )); + } + + // Decompress payload after varint + let compressed = &data[varint_size..]; + let ssz_bytes = Self::decompress(compressed)?; + + // Validate length matches + if ssz_bytes.len() != declared_len as usize { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Length mismatch: declared {}, got {}", + declared_len, + ssz_bytes.len() + ), + )); + } if protocol.contains("status") { let status = Status::from_ssz_default(&ssz_bytes).map_err(|e| { @@ -91,14 +196,13 @@ impl LeanCodec { })?; Ok(LeanRequest::Status(status)) } else if protocol.contains("blocks_by_root") { - let mut roots = Vec::new(); - for chunk in ssz_bytes.chunks(32) { - if chunk.len() == 32 { - let mut root = [0u8; 32]; - root.copy_from_slice(chunk); - roots.push(H256::from(root)); - } - } + let request = BlocksByRootRequest::from_ssz_default(&ssz_bytes).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("SSZ decode BlocksByRootRequest failed: {e:?}"), + ) + })?; + let roots: Vec = request.roots.into_iter().copied().collect(); if roots.len() > MAX_REQUEST_BLOCKS { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -118,39 +222,110 @@ impl LeanCodec { } } + /// Encode a single response chunk with response code and varint length prefix per spec: + /// [response_code: 1 byte][varint: uncompressed_length][snappy_framed_payload] + fn encode_response_chunk(code: u8, ssz_bytes: &[u8]) -> io::Result> { + let compressed = Self::compress(ssz_bytes)?; + let mut result = vec![code]; + result.extend(Self::encode_varint(ssz_bytes.len() as u32)); + result.extend(compressed); + Ok(result) + } + + /// Encode response per spec. For BlocksByRoot, each block is a separate chunk: + /// [code][varint][snappy(block1)][code][varint][snappy(block2)]... fn encode_response(response: &LeanResponse) -> io::Result> { - let ssz_bytes = match response { - LeanResponse::Status(status) => status.to_ssz().map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("SSZ encode failed: {e}")) - })?, + match response { + LeanResponse::Status(status) => { + let ssz_bytes = status.to_ssz().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("SSZ encode failed: {e}")) + })?; + Self::encode_response_chunk(RESPONSE_SUCCESS, &ssz_bytes) + } LeanResponse::BlocksByRoot(blocks) => { - let mut bytes = Vec::new(); + // Each block is a separate chunk with its own response code + let mut result = Vec::new(); for block in blocks { - let block_bytes = block.to_ssz().map_err(|e| { + let ssz_bytes = block.to_ssz().map_err(|e| { io::Error::new(io::ErrorKind::Other, format!("SSZ encode failed: {e}")) })?; - bytes.extend_from_slice(&block_bytes); + let chunk = Self::encode_response_chunk(RESPONSE_SUCCESS, &ssz_bytes)?; + result.extend(chunk); } - bytes + // Empty response: no chunks written (stream just ends) + Ok(result) } - LeanResponse::Empty => Vec::new(), - }; + LeanResponse::Empty => Ok(Vec::new()), + } + } + + /// Decode a single response chunk per spec: + /// [response_code: 1 byte][varint: uncompressed_length][snappy_framed_payload] + /// Returns (code, ssz_bytes, total_bytes_consumed) + fn decode_response_chunk(data: &[u8]) -> io::Result<(u8, Vec, usize)> { + if data.is_empty() { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Empty response chunk", + )); + } + + // First byte is response code + let code = data[0]; + + // Parse varint length starting at offset 1 + let (declared_len, varint_size) = Self::decode_varint(&data[1..])?; - if ssz_bytes.is_empty() { - return Ok(Vec::new()); + if declared_len as usize > MAX_PAYLOAD_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Declared length too large: {} > {}", + declared_len, MAX_PAYLOAD_SIZE + ), + )); } - Self::compress(&ssz_bytes) + // Decompress payload after code + varint + let payload_start = 1 + varint_size; + let compressed = &data[payload_start..]; + let ssz_bytes = Self::decompress(compressed)?; + + // Validate length matches + if ssz_bytes.len() != declared_len as usize { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Length mismatch: declared {}, got {}", + declared_len, + ssz_bytes.len() + ), + )); + } + + // Calculate total bytes consumed (approximate - we consumed all remaining data) + let total_consumed = data.len(); + + Ok((code, ssz_bytes, total_consumed)) } + /// Decode response per spec. For BlocksByRoot, handle chunked format: + /// [code][varint][snappy(block1)][code][varint][snappy(block2)]... fn decode_response(protocol: &str, data: &[u8]) -> io::Result { if data.is_empty() { return Ok(LeanResponse::Empty); } - let ssz_bytes = Self::decompress(data)?; - if protocol.contains("status") { + let (code, ssz_bytes, _) = Self::decode_response_chunk(data)?; + + if code != RESPONSE_SUCCESS { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("Status request failed with code: {}", code), + )); + } + let status = Status::from_ssz_default(&ssz_bytes).map_err(|e| { io::Error::new( io::ErrorKind::Other, @@ -159,9 +334,18 @@ impl LeanCodec { })?; Ok(LeanResponse::Status(status)) } else if protocol.contains("blocks_by_root") { + let (code, ssz_bytes, _) = Self::decode_response_chunk(data)?; + + if code != RESPONSE_SUCCESS { + // Non-success codes indicate block not found or error + warn!(response_code = code, "BlocksByRoot non-success response"); + return Ok(LeanResponse::BlocksByRoot(Vec::new())); + } + if ssz_bytes.is_empty() { return Ok(LeanResponse::BlocksByRoot(Vec::new())); } + let block = SignedBlockWithAttestation::from_ssz_default(&ssz_bytes).map_err(|e| { io::Error::new( io::ErrorKind::Other, @@ -329,9 +513,12 @@ pub fn build(protocols: impl IntoIterator) -> ReqResp { RequestResponse::with_codec(LeanCodec::default(), protocols, Config::default()) } -pub fn build_default() -> ReqResp { - build(vec![ - STATUS_PROTOCOL_V1.to_string(), - BLOCKS_BY_ROOT_PROTOCOL_V1.to_string(), - ]) +/// Build a RequestResponse behavior for Status protocol only +pub fn build_status() -> ReqResp { + build(vec![STATUS_PROTOCOL_V1.to_string()]) +} + +/// Build a RequestResponse behavior for BlocksByRoot protocol only +pub fn build_blocks_by_root() -> ReqResp { + build(vec![BLOCKS_BY_ROOT_PROTOCOL_V1.to_string()]) } diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index fae2185..7af3615 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -1,9 +1,12 @@ -use std::{collections::HashMap, fmt::Display}; +use std::{collections::HashMap, fmt::Display, sync::Arc}; use anyhow::{Result, anyhow}; use async_trait::async_trait; -use containers::{SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation}; +use containers::{ + SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Status, +}; use metrics::METRICS; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use ssz::H256; use tokio::sync::mpsc; @@ -11,6 +14,14 @@ use tracing::warn; use crate::serde_utils::quoted_u64; +/// Shared block provider for serving BlocksByRoot requests. +/// Allows NetworkService to look up signed blocks for checkpoint sync backfill. +pub type SignedBlockProvider = Arc>>; + +/// Shared status provider for Status req/resp protocol. +/// Allows NetworkService to send accurate finalized/head checkpoints to peers. +pub type StatusProvider = Arc>; + /// 1-byte domain for gossip message-id isolation of valid snappy messages. /// Per leanSpec, prepended to the message hash when decompression succeeds. pub const MESSAGE_DOMAIN_VALID_SNAPPY: &[u8; 1] = &[0x01]; diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index 24a73a5..57fbbed 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -1,8 +1,9 @@ use anyhow::{Context as _, Result}; use clap::Parser; use containers::{ - Attestation, AttestationData, Block, BlockBody, BlockSignatures, BlockWithAttestation, - Checkpoint, Config, SignedBlockWithAttestation, Slot, State, Validator, + Attestation, AttestationData, Block, BlockBody, BlockHeader, BlockSignatures, + BlockWithAttestation, Checkpoint, Config, SignedBlockWithAttestation, Slot, State, Status, + Validator, }; use ethereum_types::H256; use features::Feature; @@ -16,8 +17,10 @@ use metrics::{METRICS, Metrics}; use networking::gossipsub::config::GossipsubConfig; use networking::gossipsub::topic::{compute_subnet_id, get_subscription_topics}; use networking::network::{NetworkService, NetworkServiceConfig}; -use networking::types::{ChainMessage, OutboundP2pRequest}; -use ssz::{PersistentList, SszHash}; +use networking::types::{ChainMessage, OutboundP2pRequest, SignedBlockProvider, StatusProvider}; +use parking_lot::RwLock; +use ssz::{PersistentList, SszHash, SszReadDefault as _}; +use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; @@ -40,6 +43,92 @@ fn load_node_key(path: &str) -> Result> { Ok(Keypair::from(keypair)) } +async fn download_checkpoint_state(url: &str) -> Result { + info!("Downloading checkpoint state from: {}", url); + + let client = reqwest::Client::new(); + let response = client + .get(url) + .header("Accept", "application/octet-stream") + .send() + .await + .context("Failed to send HTTP request for checkpoint state")?; + + if !response.status().is_success() { + anyhow::bail!( + "Checkpoint sync failed: HTTP {} from {}", + response.status(), + url + ); + } + + let bytes = response + .bytes() + .await + .context("Failed to read checkpoint state response body")?; + + let state = State::from_ssz_default(&bytes) + .map_err(|e| anyhow::anyhow!("Failed to decode SSZ checkpoint state: {:?}", e))?; + + info!( + "Downloaded checkpoint state at slot {} ({} bytes)", + state.latest_block_header.slot.0, + bytes.len() + ); + + Ok(state) +} + +fn verify_checkpoint_state(state: &State, genesis_state: &State) -> Result<()> { + // Verify genesis time matches + anyhow::ensure!( + state.config.genesis_time == genesis_state.config.genesis_time, + "Genesis time mismatch: checkpoint has {}, expected {}. Wrong network?", + state.config.genesis_time, + genesis_state.config.genesis_time + ); + + // Verify validator count matches + let state_validator_count = state.validators.len_u64(); + let expected_validator_count = genesis_state.validators.len_u64(); + + anyhow::ensure!( + state_validator_count == expected_validator_count, + "Validator count mismatch: checkpoint has {}, genesis expects {}. Wrong network?", + state_validator_count, + expected_validator_count + ); + + // Verify state has validators + anyhow::ensure!( + state_validator_count > 0, + "Invalid checkpoint state: no validators in registry" + ); + + // Verify each validator pubkey matches genesis + for i in 0..state_validator_count { + let state_pubkey = &state.validators.get(i).expect("validator exists").pubkey; + let genesis_pubkey = &genesis_state + .validators + .get(i) + .expect("validator exists") + .pubkey; + + anyhow::ensure!( + state_pubkey == genesis_pubkey, + "Validator pubkey mismatch at index {}: checkpoint has different validator set. Wrong network?", + i + ); + } + + info!( + "Checkpoint state verified: genesis_time={}, validators={}", + state.config.genesis_time, state_validator_count + ); + + Ok(()) +} + fn print_chain_status(store: &Store, connected_peers: u64) { let current_slot = store.time / INTERVALS_PER_SLOT; @@ -146,6 +235,9 @@ struct Args { /// When set, uses this value instead of the hardcoded default #[arg(long = "attestation-committee-count")] attestation_committee_count: Option, + + #[arg(long)] + checkpoint_sync_url: Option, } #[tokio::main] @@ -260,10 +352,110 @@ async fn main() -> Result<()> { }; let config = Config { genesis_time }; - let store = get_forkchoice_store(genesis_state.clone(), genesis_signed_block, config); - let num_validators = genesis_state.validators.len_u64(); - info!(num_validators = num_validators, "Genesis state loaded"); + let (anchor_state, anchor_block) = if let Some(ref url) = args.checkpoint_sync_url { + info!("Checkpoint sync enabled, downloading from: {}", url); + + match download_checkpoint_state(url).await { + Ok(checkpoint_state) => { + if let Err(e) = verify_checkpoint_state(&checkpoint_state, &genesis_state) { + error!("Checkpoint verification failed: {}. Refusing to start.", e); + return Err(e); + } + + // Compute state root for the checkpoint state (like zeam's genStateBlockHeader) + let checkpoint_state_root = checkpoint_state.hash_tree_root(); + + // Reconstruct block header from state's latest_block_header with correct state_root + // The state's latest_block_header already contains the correct body_root from the original block + let checkpoint_block_header = BlockHeader { + slot: checkpoint_state.latest_block_header.slot, + proposer_index: checkpoint_state.latest_block_header.proposer_index, + parent_root: checkpoint_state.latest_block_header.parent_root, + state_root: checkpoint_state_root, + body_root: checkpoint_state.latest_block_header.body_root, + }; + + // Compute block root from the BlockHeader (NOT from a synthetic Block with empty body) + let checkpoint_block_root = checkpoint_block_header.hash_tree_root(); + + // Create a Block structure for the SignedBlockWithAttestation + // Note: body is synthetic but block_root is computed correctly from header above + let checkpoint_block = Block { + slot: checkpoint_block_header.slot, + proposer_index: checkpoint_block_header.proposer_index, + parent_root: checkpoint_block_header.parent_root, + state_root: checkpoint_state_root, + body: BlockBody { + attestations: Default::default(), + }, + }; + + let checkpoint_proposer_attestation = Attestation { + validator_id: checkpoint_state.latest_block_header.proposer_index, + data: AttestationData { + slot: checkpoint_state.slot, + head: Checkpoint { + root: checkpoint_block_root, + slot: checkpoint_state.slot, + }, + target: checkpoint_state.latest_finalized.clone(), + source: checkpoint_state.latest_justified.clone(), + }, + }; + + let checkpoint_signed_block = SignedBlockWithAttestation { + message: BlockWithAttestation { + block: checkpoint_block, + proposer_attestation: checkpoint_proposer_attestation, + }, + signature: BlockSignatures { + attestation_signatures: PersistentList::default(), + proposer_signature: Signature::default(), + }, + }; + + info!( + slot = checkpoint_state.slot.0, + finalized = checkpoint_state.latest_finalized.slot.0, + justified = checkpoint_state.latest_justified.slot.0, + block_root = %format!("0x{:x}", checkpoint_block_root), + state_root = %format!("0x{:x}", checkpoint_state_root), + "Checkpoint sync successful" + ); + + (checkpoint_state, checkpoint_signed_block) + } + Err(e) => { + warn!("Checkpoint sync failed: {}. Falling back to genesis.", e); + (genesis_state.clone(), genesis_signed_block) + } + } + } else { + (genesis_state.clone(), genesis_signed_block) + }; + + // Clone anchor block for seeding the shared block provider later + let anchor_block_for_provider = anchor_block.clone(); + // Compute block root from BlockHeader (NOT from Block with potentially empty body) + // Must match the computation in get_forkchoice_store + let anchor_block_header = BlockHeader { + slot: anchor_state.latest_block_header.slot, + proposer_index: anchor_state.latest_block_header.proposer_index, + parent_root: anchor_state.latest_block_header.parent_root, + state_root: anchor_state.hash_tree_root(), + body_root: anchor_state.latest_block_header.body_root, + }; + let anchor_block_root = anchor_block_header.hash_tree_root(); + + let store = Arc::new(RwLock::new(get_forkchoice_store( + anchor_state.clone(), + anchor_block, + config, + ))); + + let num_validators = anchor_state.validators.len_u64(); + info!(num_validators = num_validators, "Anchor state loaded"); let validator_service = if let (Some(node_id), Some(registry_path)) = (&args.node_id, &args.validator_registry_path) @@ -358,6 +550,27 @@ async fn main() -> Result<()> { let peer_count = Arc::new(AtomicU64::new(0)); let peer_count_for_status = peer_count.clone(); + // Create shared block provider for BlocksByRoot requests (checkpoint sync backfill) + // Seed with anchor block so we can serve it to peers doing checkpoint sync + let mut initial_blocks = HashMap::new(); + initial_blocks.insert(anchor_block_root, anchor_block_for_provider.clone()); + + let signed_block_provider: SignedBlockProvider = Arc::new(RwLock::new(initial_blocks)); + let signed_block_provider_for_network = signed_block_provider.clone(); + + let initial_status = { + let s = store.read(); + Status::new( + s.latest_finalized.clone(), + Checkpoint { + root: s.head, + slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), + }, + ) + }; + let status_provider: StatusProvider = Arc::new(RwLock::new(initial_status)); + let status_provider_for_network = status_provider.clone(); + // LOAD NODE KEY let mut network_service = if let Some(key_path) = &args.node_key { match load_node_key(key_path) { @@ -370,6 +583,8 @@ async fn main() -> Result<()> { chain_message_sender.clone(), peer_count, keypair, + signed_block_provider_for_network, + status_provider_for_network, ) .await .expect("Failed to create network service with custom key") @@ -381,6 +596,8 @@ async fn main() -> Result<()> { outbound_p2p_receiver, chain_message_sender.clone(), peer_count, + signed_block_provider_for_network, + status_provider_for_network, ) .await .expect("Failed to create network service") @@ -392,6 +609,8 @@ async fn main() -> Result<()> { outbound_p2p_receiver, chain_message_sender.clone(), peer_count, + signed_block_provider_for_network, + status_provider_for_network, ) .await .expect("Failed to create network service") @@ -405,16 +624,16 @@ async fn main() -> Result<()> { let chain_outbound_sender = outbound_p2p_sender.clone(); + let http_store = store.clone(); task::spawn(async move { - if args.http_config.metrics_enabled() { - if let Err(err) = http_api::run_server(args.http_config, genesis_time).await { - error!("HTTP Server failed with error: {err:?}"); - } + if let Err(err) = + http_api::run_server(args.http_config, genesis_time, Some(http_store)).await + { + error!("HTTP Server failed with error: {err:?}"); } }); let chain_handle = task::spawn(async move { - // Devnet-3: 5 intervals per slot at 800ms each (4 second slots) let mut tick_interval = interval(Duration::from_millis(800)); let mut last_logged_slot = 0u64; let mut last_status_slot: Option = None; @@ -422,24 +641,24 @@ async fn main() -> Result<()> { let mut last_attestation_slot: Option = None; let peer_count = peer_count_for_status; - let mut store = store; loop { tokio::select! { _ = tick_interval.tick() => { - // Devnet-3: on_tick expects time in milliseconds let now_millis = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; - on_tick(&mut store, now_millis, false); + on_tick(&mut *store.write(), now_millis, false); - let current_slot = store.time / INTERVALS_PER_SLOT; - let current_interval = store.time % INTERVALS_PER_SLOT; + let (current_slot, current_interval) = { + let s = store.read(); + (s.time / INTERVALS_PER_SLOT, s.time % INTERVALS_PER_SLOT) + }; if last_status_slot != Some(current_slot) { let peers = peer_count.load(Ordering::Relaxed); - print_chain_status(&store, peers); + print_chain_status(&*store.read(), peers); last_status_slot = Some(current_slot); } @@ -454,7 +673,7 @@ async fn main() -> Result<()> { "Our turn to propose block!" ); - match vs.build_block_proposal(&mut store, Slot(current_slot), proposer_idx) { + match vs.build_block_proposal(&mut *store.write(), Slot(current_slot), proposer_idx) { Ok(signed_block) => { let block_root = signed_block.message.block.hash_tree_root(); info!( @@ -463,14 +682,13 @@ async fn main() -> Result<()> { "Built block, processing and gossiping" ); - // Synchronize store time with wall clock before processing own block let now_millis = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; - on_tick(&mut store, now_millis, false); + on_tick(&mut *store.write(), now_millis, false); - match on_block(&mut store, signed_block.clone()) { + match on_block(&mut *store.write(), signed_block.clone()) { Ok(()) => { info!("Own block processed successfully"); // GOSSIP TO NETWORK @@ -493,7 +711,7 @@ async fn main() -> Result<()> { 1 => { if let Some(ref vs) = validator_service { if last_attestation_slot != Some(current_slot) { - let attestations = vs.create_attestations(&store, Slot(current_slot)); + let attestations = vs.create_attestations(&*store.read(), Slot(current_slot)); for signed_att in attestations { let validator_id = signed_att.validator_id; let subnet_id = compute_subnet_id(validator_id); @@ -504,7 +722,7 @@ async fn main() -> Result<()> { "Broadcasting attestation to subnet" ); - match on_attestation(&mut store, signed_att.clone(), false) { + match on_attestation(&mut *store.write(), signed_att.clone(), false) { Ok(()) => { if let Err(e) = chain_outbound_sender.send( OutboundP2pRequest::GossipAttestation(signed_att, subnet_id) @@ -520,9 +738,8 @@ async fn main() -> Result<()> { } } 2 => { - // Interval 2: Aggregation phase (devnet-3) if let Some(ref vs) = validator_service { - if let Some(aggregations) = vs.maybe_aggregate(&store, Slot(current_slot)) { + if let Some(aggregations) = vs.maybe_aggregate(&*store.read(), Slot(current_slot)) { for aggregation in aggregations { if let Err(e) = chain_outbound_sender.send( OutboundP2pRequest::GossipAggregation(aggregation) @@ -530,19 +747,17 @@ async fn main() -> Result<()> { warn!("Failed to gossip aggregation: {}", e); } } - info!(slot = current_slot, tick = store.time, "Aggregation phase - broadcast aggregated attestations"); + info!(slot = current_slot, tick = store.read().time, "Aggregation phase - broadcast aggregated attestations"); } else { - info!(slot = current_slot, tick = store.time, "Aggregation phase - no aggregation duty or no attestations"); + info!(slot = current_slot, tick = store.read().time, "Aggregation phase - no aggregation duty or no attestations"); } } } 3 => { - // Interval 3: Safe target update (devnet-3) - info!(slot = current_slot, tick = store.time, "Computing safe target"); + info!(slot = current_slot, tick = store.read().time, "Computing safe target"); } 4 => { - // Interval 4: Accept attestations (devnet-3) - info!(slot = current_slot, tick = store.time, "Accepting new attestations"); + info!(slot = current_slot, tick = store.read().time, "Accepting new attestations"); } _ => {} } @@ -550,7 +765,7 @@ async fn main() -> Result<()> { if current_slot != last_logged_slot && current_slot % 10 == 0 { debug!("(Okay)Store time updated : slot {}, pending blocks: {}", current_slot, - store.blocks_queue.values().map(|v| v.len()).sum::() + store.read().blocks_queue.values().map(|v| v.len()).sum::() ); last_logged_slot = current_slot; } @@ -571,21 +786,33 @@ async fn main() -> Result<()> { info!( slot = block_slot.0, block_root = %format!("0x{:x}", block_root), + parent_root = %format!("0x{:x}", parent_root), "Processing block built by Validator {}", proposer ); - // Synchronize store time with wall clock before processing block let now_millis = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; - on_tick(&mut store, now_millis, false); + on_tick(&mut *store.write(), now_millis, false); - match on_block(&mut store, signed_block_with_attestation.clone()) { + match on_block(&mut *store.write(), signed_block_with_attestation.clone()) { Ok(()) => { info!("Block processed successfully"); + signed_block_provider.write().insert(block_root, signed_block_with_attestation.clone()); + + { + let s = store.read(); + let mut status = status_provider.write(); + status.finalized = s.latest_finalized.clone(); + status.head = Checkpoint { + root: s.head, + slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), + }; + } + if should_gossip { if let Err(e) = outbound_p2p_sender.send( OutboundP2pRequest::GossipBlockWithAttestation(signed_block_with_attestation) @@ -597,7 +824,12 @@ async fn main() -> Result<()> { } } Err(e) if format!("{e:?}").starts_with("Err: (Fork-choice::Handlers::OnBlock) Block queued") => { - debug!("Block queued, requesting missing parent: {}", e); + warn!( + child_slot = block_slot.0, + child_block_root = %format!("0x{:x}", block_root), + missing_parent_root = %format!("0x{:x}", parent_root), + "Block queued - parent not found, will request via BlocksByRoot" + ); // Request missing parent block from peers if !parent_root.is_zero() { @@ -605,8 +837,6 @@ async fn main() -> Result<()> { OutboundP2pRequest::RequestBlocksByRoot(vec![parent_root]) ) { warn!("Failed to request missing parent block: {}", req_err); - } else { - debug!("Requested missing parent block: 0x{:x}", parent_root); } } } @@ -631,7 +861,7 @@ async fn main() -> Result<()> { validator_id ); - match on_attestation(&mut store, signed_attestation.clone(), false) { + match on_attestation(&mut *store.write(), signed_attestation.clone(), false) { Ok(()) => { if should_gossip { let subnet_id = compute_subnet_id(validator_id); @@ -661,8 +891,7 @@ async fn main() -> Result<()> { .filter(|b| **b) .count(); - // Devnet-3: Process aggregated attestation for safe target computation - match on_aggregated_attestation(&mut store, signed_aggregated_attestation.clone()) { + match on_aggregated_attestation(&mut *store.write(), signed_aggregated_attestation.clone()) { Ok(_) => { info!( slot = agg_slot,