From 228d411c5df9021c2431883035548d199bffccb8 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 8 May 2026 00:47:40 +0530 Subject: [PATCH 1/2] log warning in case of lagging events in indexer service --- src/daemon/mod.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 3ccffecf959..b2991c4a5d1 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -42,6 +42,7 @@ use std::path::Path; use std::sync::Arc; use std::sync::OnceLock; use std::time::{Duration, Instant}; +use tokio::sync::broadcast::error::RecvError; use tokio::{ net::TcpListener, signal::{ @@ -614,11 +615,19 @@ fn maybe_start_indexer_service( // Continuously listen for head changes loop { - for ts in head_changes_rx.recv().await?.applies { - tracing::debug!("Indexing tipset {}", ts.key()); - let delegated_messages = - chain_store.headers_delegated_messages(ts.block_headers().iter())?; - chain_store.process_signed_messages(&delegated_messages)?; + match head_changes_rx.recv().await { + Ok(changes) => { + for ts in changes.applies { + tracing::debug!("Indexing tipset {}", ts.key()); + let delegated_messages = chain_store + .headers_delegated_messages(ts.block_headers().iter())?; + chain_store.process_signed_messages(&delegated_messages)?; + } + } + Err(RecvError::Lagged(n)) => { + warn!("indexer service lagged: skipping {n} events") + } + Err(RecvError::Closed) => break Ok(()), } } }); From 63fccd4d9d64226bde3c88a60a463a5322855368 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 8 May 2026 00:48:42 +0530 Subject: [PATCH 2/2] log state sequence error and keep adding rest of the msg to msg pool --- src/message_pool/msgpool/mod.rs | 8 +++++++- src/message_pool/msgpool/msg_pool.rs | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 98a52b2ae02..c05331124e8 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -306,7 +306,13 @@ where }; for (_, hm) in rmsgs { for (_, msg) in hm { - let sequence = mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from())?; + let sequence = match mpool_ctx.get_state_sequence(state_nonce_cache, &msg.from()) { + Ok(seq) => seq, + Err(e) => { + tracing::debug!("Failed to get the state sequence: {}", e); + continue; + } + }; if let Err(e) = add_helper( api, bls_sig_cache, diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 8d27bbd0d02..26e53ed67f8 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -617,8 +617,8 @@ where tracing::warn!("Error changing head: {e}"); } } - Err(RecvError::Lagged(e)) => { - warn!("Head change subscriber lagged: skipping {e} events"); + Err(RecvError::Lagged(n)) => { + warn!("Head change subscriber lagged: skipping {n} events"); } Err(RecvError::Closed) => { break Ok(());