From e170216695b8a6027a96c1528d1308257749dcb7 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 2 Jul 2026 18:46:09 +0800 Subject: [PATCH] fix: add cancellation or timeout to more free tokio::spawn tasks --- src/chain_sync/chain_follower.rs | 44 ++++++++++++++++++++++---------- src/daemon/mod.rs | 13 +++++++++- src/libp2p/service.rs | 29 +++++++++++++-------- src/rpc/methods/chain.rs | 28 ++++++++++++++------ 4 files changed, 81 insertions(+), 33 deletions(-) diff --git a/src/chain_sync/chain_follower.rs b/src/chain_sync/chain_follower.rs index 52a71471c1e5..055427eb1d18 100644 --- a/src/chain_sync/chain_follower.rs +++ b/src/chain_sync/chain_follower.rs @@ -41,6 +41,7 @@ use libp2p::PeerId; use parking_lot::Mutex; use std::time::{Duration, Instant}; use tokio::{sync::Notify, task::JoinSet}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; pub struct ChainFollower { @@ -211,6 +212,8 @@ async fn chain_follower( let seen_block_cache = SeenBlockCache::default(); let mut set = JoinSet::new(); + let cancellation_token = CancellationToken::new(); + let _cancellation_token_drop_guard = cancellation_token.drop_guard_ref(); // Increment metrics, update peer information, and forward tipsets to the state machine. set.spawn({ @@ -222,6 +225,7 @@ async fn chain_follower( let genesis = genesis.shallow_clone(); let bad_block_cache = bad_block_cache.shallow_clone(); let seen_block_cache = seen_block_cache.shallow_clone(); + let cancellation_token = cancellation_token.clone(); async move { while let Ok(event) = network_rx.recv_async().await { inc_gossipsub_event_metrics(&event); @@ -231,6 +235,7 @@ async fn chain_follower( &network, state_manager.chain_store().shallow_clone(), &genesis, + cancellation_token.clone(), ); let Ok(tipset) = (match event { @@ -312,6 +317,7 @@ async fn chain_follower( let state_changed = state_changed.shallow_clone(); let tasks = tasks.shallow_clone(); let bad_block_cache = bad_block_cache.shallow_clone(); + let cancellation_token = cancellation_token.clone(); async move { const FORK_CLEANUP_INTERVAL: Duration = Duration::from_mins(1); let mut last_fork_cleanup = Instant::now(); @@ -350,14 +356,19 @@ async fn chain_follower( let tasks = tasks.shallow_clone(); let state_machine = state_machine.shallow_clone(); let state_changed = state_changed.shallow_clone(); + let cancellation_token = cancellation_token.clone(); async move { - if let Some(event) = action.await { - state_machine.lock().update(event); - state_changed.notify_one(); - } - let mut tasks = tasks.lock(); - tasks.remove(&task); - tasks.shrink_to_fit(); + cancellation_token + .run_until_cancelled(async move { + if let Some(event) = action.await { + state_machine.lock().update(event); + state_changed.notify_one(); + } + let mut tasks = tasks.lock(); + tasks.remove(&task); + tasks.shrink_to_fit(); + }) + .await } }); } @@ -454,17 +465,24 @@ fn update_peer_info( network: &SyncNetworkContext, chain_store: ChainStore, genesis: &Tipset, + cancellation_token: CancellationToken, ) { match event { NetworkEvent::PeerConnected(peer_id) => { + let peer_id = *peer_id; let genesis_cid = *genesis.block_headers().first().cid(); + let network = network.shallow_clone(); // Spawn and immediately move on to the next event - tokio::task::spawn(handle_peer_connected_event( - network.shallow_clone(), - chain_store, - *peer_id, - genesis_cid, - )); + tokio::task::spawn(async move { + cancellation_token + .run_until_cancelled(handle_peer_connected_event( + network, + chain_store, + peer_id, + genesis_cid, + )) + .await + }); } NetworkEvent::PeerDisconnected(peer_id) => { handle_peer_disconnected_event(network, *peer_id); diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 273ebb4175f2..a374f0ec8507 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -52,6 +52,7 @@ use tokio::{ sync::mpsc, task::JoinSet, }; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; pub static GLOBAL_SNAPSHOT_GC: OnceLock> = OnceLock::new(); @@ -370,6 +371,8 @@ fn maybe_prefill_rpc_caches( let state_manager = chain_follower.state_manager.shallow_clone(); let mut validated_tipset_rx = chain_follower.subscribe_validated_tipset(); services.spawn(async move { + let cancellation_token = CancellationToken::new(); + let _cancellation_token_drop_guard = cancellation_token.drop_guard_ref(); loop { match validated_tipset_rx.recv().await { Ok(_) if !sync_status.load().is_synced() => { @@ -378,7 +381,15 @@ fn maybe_prefill_rpc_caches( } Ok(tsk) => { let state_manager = state_manager.shallow_clone(); - tokio::spawn(prefill_rpc_caches_for_tipset(state_manager, tsk)); + let cancellation_token = cancellation_token.clone(); + tokio::spawn(async move { + cancellation_token + .run_until_cancelled(prefill_rpc_caches_for_tipset( + state_manager, + tsk, + )) + .await + }); } Err(RecvError::Lagged(n)) => { warn!("validated tipset broadcast lagged: skipped {n} tipsets") diff --git a/src/libp2p/service.rs b/src/libp2p/service.rs index cd8810fd182a..f04a8f25dcf6 100644 --- a/src/libp2p/service.rs +++ b/src/libp2p/service.rs @@ -793,6 +793,7 @@ async fn handle_chain_exchange_event( ChainExchangeResponse, )>, ) { + const CHAIN_EXCHANGE_RESPONSE_TIMEOUT: Duration = Duration::from_mins(5); match ce_event { request_response::Event::Message { peer, message, .. } => match message { request_response::Message::Request { @@ -827,17 +828,23 @@ async fn handle_chain_exchange_event( .await; let db = db.shallow_clone(); - tokio::task::spawn(async move { - let _per_peer_permit = per_peer_permit; - let _global_permit = global_permit; - if let Err(e) = cx_response_tx.send(( - request_id, - channel, - make_chain_exchange_response(&db, &request), - )) { - debug!("Failed to send ChainExchangeResponse: {e:?}"); - } - }); + tokio::task::spawn(tokio::time::timeout( + CHAIN_EXCHANGE_RESPONSE_TIMEOUT, + async move { + let _per_peer_permit = per_peer_permit; + let _global_permit = global_permit; + if let Err(e) = cx_response_tx + .send_async(( + request_id, + channel, + make_chain_exchange_response(&db, &request), + )) + .await + { + debug!("Failed to send ChainExchangeResponse: {e:?}"); + } + }, + )); } request_response::Message::Response { request_id, diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 7b849238bd8a..4dd9c714c74c 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -1340,14 +1340,26 @@ pub(crate) fn chain_notify( tokio::spawn(async move { // Skip first message let _ = head_changes_rx.recv().await; - while let Ok(changes) = head_changes_rx.recv().await { - let api_changes = changes - .into_change_vec() - .into_iter() - .map(From::from) - .collect(); - if sender.send(api_changes).is_err() { - break; + loop { + match head_changes_rx.recv().await { + Ok(changes) => { + let api_changes = changes + .into_change_vec() + .into_iter() + .map(From::from) + .collect(); + if sender.send(api_changes).is_err() { + tracing::info!("chain notify subscribers are all closed"); + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + tracing::info!("head changes channel closed"); + break; + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!("head changes channel lagged by {n} messages"); + } } } });