diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 3ccffecf959..b2991c4a5d1 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -42,6 +42,7 @@ use std::path::Path; use std::sync::Arc; use std::sync::OnceLock; use std::time::{Duration, Instant}; +use tokio::sync::broadcast::error::RecvError; use tokio::{ net::TcpListener, signal::{ @@ -614,11 +615,19 @@ fn maybe_start_indexer_service( // Continuously listen for head changes loop { - for ts in head_changes_rx.recv().await?.applies { - tracing::debug!("Indexing tipset {}", ts.key()); - let delegated_messages = - chain_store.headers_delegated_messages(ts.block_headers().iter())?; - chain_store.process_signed_messages(&delegated_messages)?; + match head_changes_rx.recv().await { + Ok(changes) => { + for ts in changes.applies { + tracing::debug!("Indexing tipset {}", ts.key()); + let delegated_messages = chain_store + .headers_delegated_messages(ts.block_headers().iter())?; + chain_store.process_signed_messages(&delegated_messages)?; + } + } + Err(RecvError::Lagged(n)) => { + warn!("indexer service lagged: skipping {n} events") + } + Err(RecvError::Closed) => break Ok(()), } } }); diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 98a52b2ae02..c05331124e8 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -306,7 +306,13 @@ where }; for (_, hm) in rmsgs { for (_, msg) in hm { - let sequence = mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from())?; + let sequence = match mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from()) { + Ok(seq) => seq, + Err(e) => { + tracing::debug!("Failed to get the state sequence: {}", e); + continue; + } + }; if let Err(e) = add_helper( api, bls_sig_cache, diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 8d27bbd0d02..26e53ed67f8 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -617,8 +617,8 @@ where tracing::warn!("Error changing head: {e}"); } } - Err(RecvError::Lagged(e)) => { - warn!("Head change subscriber lagged: skipping {e} events"); + Err(RecvError::Lagged(n)) => { + warn!("Head change subscriber lagged: skipping {n} events"); } Err(RecvError::Closed) => { break Ok(());