Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use libp2p::PeerId;
use parking_lot::Mutex;
use std::time::{Duration, Instant};
use tokio::{sync::Notify, task::JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};

pub struct ChainFollower {
Expand Down Expand Up @@ -211,6 +212,8 @@ async fn chain_follower(
let seen_block_cache = SeenBlockCache::default();

let mut set = JoinSet::new();
let cancellation_token = CancellationToken::new();
let _cancellation_token_drop_guard = cancellation_token.drop_guard_ref();

// Increment metrics, update peer information, and forward tipsets to the state machine.
set.spawn({
Expand All @@ -222,6 +225,7 @@ async fn chain_follower(
let genesis = genesis.shallow_clone();
let bad_block_cache = bad_block_cache.shallow_clone();
let seen_block_cache = seen_block_cache.shallow_clone();
let cancellation_token = cancellation_token.clone();
async move {
while let Ok(event) = network_rx.recv_async().await {
inc_gossipsub_event_metrics(&event);
Expand All @@ -231,6 +235,7 @@ async fn chain_follower(
&network,
state_manager.chain_store().shallow_clone(),
&genesis,
cancellation_token.clone(),
);

let Ok(tipset) = (match event {
Expand Down Expand Up @@ -312,6 +317,7 @@ async fn chain_follower(
let state_changed = state_changed.shallow_clone();
let tasks = tasks.shallow_clone();
let bad_block_cache = bad_block_cache.shallow_clone();
let cancellation_token = cancellation_token.clone();
async move {
const FORK_CLEANUP_INTERVAL: Duration = Duration::from_mins(1);
let mut last_fork_cleanup = Instant::now();
Expand Down Expand Up @@ -350,14 +356,19 @@ async fn chain_follower(
let tasks = tasks.shallow_clone();
let state_machine = state_machine.shallow_clone();
let state_changed = state_changed.shallow_clone();
let cancellation_token = cancellation_token.clone();
async move {
if let Some(event) = action.await {
state_machine.lock().update(event);
state_changed.notify_one();
}
let mut tasks = tasks.lock();
tasks.remove(&task);
tasks.shrink_to_fit();
cancellation_token
.run_until_cancelled(async move {
if let Some(event) = action.await {
state_machine.lock().update(event);
state_changed.notify_one();
}
let mut tasks = tasks.lock();
tasks.remove(&task);
tasks.shrink_to_fit();
})
.await
}
});
}
Expand Down Expand Up @@ -454,17 +465,24 @@ fn update_peer_info(
network: &SyncNetworkContext,
chain_store: ChainStore,
genesis: &Tipset,
cancellation_token: CancellationToken,
) {
match event {
NetworkEvent::PeerConnected(peer_id) => {
let peer_id = *peer_id;
let genesis_cid = *genesis.block_headers().first().cid();
let network = network.shallow_clone();
// Spawn and immediately move on to the next event
tokio::task::spawn(handle_peer_connected_event(
network.shallow_clone(),
chain_store,
*peer_id,
genesis_cid,
));
tokio::task::spawn(async move {
cancellation_token
.run_until_cancelled(handle_peer_connected_event(
network,
chain_store,
peer_id,
genesis_cid,
))
.await
});
}
NetworkEvent::PeerDisconnected(peer_id) => {
handle_peer_disconnected_event(network, *peer_id);
Expand Down
13 changes: 12 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use tokio::{
sync::mpsc,
task::JoinSet,
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

pub static GLOBAL_SNAPSHOT_GC: OnceLock<Arc<SnapshotGarbageCollector>> = OnceLock::new();
Expand Down Expand Up @@ -370,6 +371,8 @@ fn maybe_prefill_rpc_caches(
let state_manager = chain_follower.state_manager.shallow_clone();
let mut validated_tipset_rx = chain_follower.subscribe_validated_tipset();
services.spawn(async move {
let cancellation_token = CancellationToken::new();
let _cancellation_token_drop_guard = cancellation_token.drop_guard_ref();
loop {
match validated_tipset_rx.recv().await {
Ok(_) if !sync_status.load().is_synced() => {
Expand All @@ -378,7 +381,15 @@ fn maybe_prefill_rpc_caches(
}
Ok(tsk) => {
let state_manager = state_manager.shallow_clone();
tokio::spawn(prefill_rpc_caches_for_tipset(state_manager, tsk));
let cancellation_token = cancellation_token.clone();
tokio::spawn(async move {
cancellation_token
.run_until_cancelled(prefill_rpc_caches_for_tipset(
state_manager,
tsk,
))
.await
});
}
Err(RecvError::Lagged(n)) => {
warn!("validated tipset broadcast lagged: skipped {n} tipsets")
Expand Down
29 changes: 18 additions & 11 deletions src/libp2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ async fn handle_chain_exchange_event(
ChainExchangeResponse,
)>,
) {
const CHAIN_EXCHANGE_RESPONSE_TIMEOUT: Duration = Duration::from_mins(5);
match ce_event {
request_response::Event::Message { peer, message, .. } => match message {
request_response::Message::Request {
Expand Down Expand Up @@ -827,17 +828,23 @@ async fn handle_chain_exchange_event(
.await;

let db = db.shallow_clone();
tokio::task::spawn(async move {
let _per_peer_permit = per_peer_permit;
let _global_permit = global_permit;
if let Err(e) = cx_response_tx.send((
request_id,
channel,
make_chain_exchange_response(&db, &request),
)) {
debug!("Failed to send ChainExchangeResponse: {e:?}");
}
});
tokio::task::spawn(tokio::time::timeout(
CHAIN_EXCHANGE_RESPONSE_TIMEOUT,
async move {
let _per_peer_permit = per_peer_permit;
let _global_permit = global_permit;
if let Err(e) = cx_response_tx
.send_async((
request_id,
channel,
make_chain_exchange_response(&db, &request),
))
.await
{
debug!("Failed to send ChainExchangeResponse: {e:?}");
}
},
));
}
request_response::Message::Response {
request_id,
Expand Down
28 changes: 20 additions & 8 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1340,14 +1340,26 @@ pub(crate) fn chain_notify(
tokio::spawn(async move {
// Skip first message
let _ = head_changes_rx.recv().await;
while let Ok(changes) = head_changes_rx.recv().await {
let api_changes = changes
.into_change_vec()
.into_iter()
.map(From::from)
.collect();
if sender.send(api_changes).is_err() {
break;
loop {
match head_changes_rx.recv().await {
Ok(changes) => {
let api_changes = changes
.into_change_vec()
.into_iter()
.map(From::from)
.collect();
if sender.send(api_changes).is_err() {
tracing::info!("chain notify subscribers are all closed");
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("head changes channel closed");
break;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("head changes channel lagged by {n} messages");
}
}
}
});
Expand Down
Loading