diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index e807bae87..2e5d42e21 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -6,7 +6,7 @@ use std::{ }, }; -use futures_util::stream::FuturesUnordered; +use futures_util::{future::join_all, stream::FuturesUnordered}; use magicblock_core::logger::{log_trace_debug, log_trace_warn}; use magicblock_metrics::metrics::{ inc_account_subscription_account_updates_count, @@ -46,6 +46,19 @@ use crate::remote_account_provider::{ // Log every 10 secs (given chain slot time is 400ms) const CLOCK_LOG_SLOT_FREQ: u64 = 25; +#[cfg(not(test))] +const SUBSCRIPTION_COMPLETION_TIMEOUT: Duration = Duration::from_secs(5); + +fn subscription_completion_timeout() -> Duration { + #[cfg(test)] + { + Duration::from_millis(100) + } + #[cfg(not(test))] + { + SUBSCRIPTION_COMPLETION_TIMEOUT + } +} // ----------------- // ChainPubsubActor @@ -152,27 +165,97 @@ impl ChainPubsubActor { shutdown_token: CancellationToken, ) { info!(client_id = client_id, "Shutting down pubsub actor"); - Self::unsubscribe_all(subscriptions, program_subs); + let result = Self::drain_and_wait_for_listener_completion( + client_id, + subscriptions, + program_subs, + ) + .await; + if let Err(err) = result { + warn!(error = ?err, client_id, "Timed out while shutting down pubsub subscriptions"); + } shutdown_token.cancel(); } - fn unsubscribe_all( + // Reconnect must not drop pooled PubsubClient instances until all + // listener tasks have stopped polling and dropped their subscription streams. + // This helper cancels one listener and waits for its completion signal, which + // listener tasks emit only after dropping update_stream and running best-effort + // unsubscribe cleanup. + async fn cancel_and_wait_for_stream_drop( + client_id: &str, + kind: &'static str, + pubkey: Pubkey, + sub: AccountSubscription, + ) -> RemoteAccountProviderResult<()> { + let timeout = subscription_completion_timeout(); + sub.cancellation_token.cancel(); + match tokio::time::timeout(timeout, sub.completion_token.cancelled()) + .await + { + Ok(()) => Ok(()), + Err(_) => { + warn!( + %pubkey, + kind, + timeout_ms = timeout.as_millis() as u64, + client_id, + "Timed out waiting for subscription listener to finish" + ); + Err(RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!( + "Timed out waiting for {kind} subscription listener {pubkey} to finish" + ), + )) + } + } + } + + // Reconnect-only safety gate: callers that may drop or replace pooled + // PubsubClient instances must drain subscription maps and wait here before + // calling PubSubConnectionPool::reconnect(). Explicit unsubscribe may cancel + // listeners without waiting because it does not drop pooled clients and leaves + // the map entry visible until listener cleanup completes. + async fn drain_and_wait_for_listener_completion( + client_id: &str, subscriptions: Arc>>, program_subs: Arc>>, - ) { - let subs = subscriptions + ) -> RemoteAccountProviderResult<()> { + let account_subs = subscriptions .lock() .expect("subscriptions lock poisoned") .drain() - .chain( - program_subs - .lock() - .expect("program subs lock poisoned") - .drain(), - ) .collect::>(); - for (_, sub) in subs { - sub.cancellation_token.cancel(); + let program_subs = program_subs + .lock() + .expect("program subs lock poisoned") + .drain() + .collect::>(); + + let cancellation_waits = account_subs + .into_iter() + .map(|(pubkey, sub)| { + Self::cancel_and_wait_for_stream_drop( + client_id, "account", pubkey, sub, + ) + }) + .chain(program_subs.into_iter().map(|(pubkey, sub)| { + Self::cancel_and_wait_for_stream_drop( + client_id, "program", pubkey, sub, + ) + })) + .collect::>(); + + let mut first_error = None; + for result in join_all(cancellation_waits).await { + if let Err(err) = result { + first_error.get_or_insert(err); + } + } + + match first_error { + Some(err) => Err(err), + None => Ok(()), } } @@ -333,12 +416,13 @@ impl ChainPubsubActor { send_ok(response, client_id); return; } - if let Some(AccountSubscription { cancellation_token }) = - subscriptions - .lock() - .expect("subcriptions lock poisoned") - .get(&pubkey) - { + let cancellation_token = subscriptions + .lock() + .expect("subcriptions lock poisoned") + .get(&pubkey) + .map(|sub| sub.cancellation_token.clone()); + + if let Some(cancellation_token) = cancellation_token { cancellation_token.cancel(); send_ok(response, client_id); } else { @@ -449,6 +533,7 @@ impl ChainPubsubActor { trace!("Adding subscription"); let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); // Insert into subscriptions HashMap immediately to prevent race condition // with unsubscribe operations @@ -462,6 +547,7 @@ impl ChainPubsubActor { pubkey, AccountSubscription { cancellation_token: cancellation_token.clone(), + completion_token: completion_token.clone(), }, ); } @@ -507,6 +593,10 @@ impl ChainPubsubActor { is_connected.clone(), &format!("Failed to subscribe to account {pubkey} after {initial_tries} retries") ); + subs.lock() + .expect("subscriptions lock poisoned") + .remove(&pubkey); + completion_token.cancel(); // RPC failed - inform the requester let _ = sub_response.send(Err(err.into())); return; @@ -579,6 +669,8 @@ impl ChainPubsubActor { } } + drop(update_stream); + // Clean up subscription with timeout to prevent hanging on dead sockets if tokio::time::timeout(Duration::from_secs(2), unsubscribe()) .await @@ -590,6 +682,7 @@ impl ChainPubsubActor { subs.lock() .expect("subscriptions lock poisoned") .remove(&pubkey); + completion_token.cancel(); }); } #[allow(clippy::too_many_arguments)] @@ -621,6 +714,7 @@ impl ChainPubsubActor { trace!("Adding program subscription"); let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); { let mut program_subs_lock = program_subs @@ -630,6 +724,7 @@ impl ChainPubsubActor { program_pubkey, AccountSubscription { cancellation_token: cancellation_token.clone(), + completion_token: completion_token.clone(), }, ); } @@ -667,6 +762,11 @@ impl ChainPubsubActor { is_connected.clone(), &format!("Failed to subscribe to program {program_pubkey}"), ); + program_subs + .lock() + .expect("program_subs lock poisoned") + .remove(&program_pubkey); + completion_token.cancel(); // RPC failed - inform the requester let _ = sub_response.send(Err(err.into())); return; @@ -774,6 +874,8 @@ impl ChainPubsubActor { } } + drop(update_stream); + // Clean up subscription with timeout to prevent hanging on dead sockets if tokio::time::timeout(Duration::from_secs(2), unsubscribe()) .await @@ -786,6 +888,7 @@ impl ChainPubsubActor { .lock() .expect("program_subs lock poisoned") .remove(&program_pubkey); + completion_token.cancel(); }); } @@ -798,8 +901,13 @@ impl ChainPubsubActor { client_id: &str, is_connected: Arc, ) -> RemoteAccountProviderResult<()> { - // 1. Ensure we cleaned all existing subscriptions - Self::unsubscribe_all(subs, program_subs); + // 1. Drain subscriptions and wait until borrowed pubsub streams are dropped. + Self::drain_and_wait_for_listener_completion( + client_id, + subs, + program_subs, + ) + .await?; // 2. Try to reconnect the pubsub connection pubsub_connection.reconnect().await?; @@ -868,7 +976,12 @@ impl ChainPubsubActor { std::mem::take(&mut *subs_lock) }; let drained_len = drained_subs.len(); - for (_, AccountSubscription { cancellation_token }) in drained_subs + for ( + _, + AccountSubscription { + cancellation_token, .. + }, + ) in drained_subs { cancellation_token.cancel(); } @@ -890,3 +1003,174 @@ impl ChainPubsubActor { }); } } + +#[cfg(test)] +mod tests { + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + }; + + use tokio::time::{sleep, Duration, Instant}; + + use super::*; + + #[tokio::test] + async fn drain_and_wait_for_listener_completion_waits_for_account_and_program_completion( + ) { + let subscriptions = Arc::new(Mutex::new(HashMap::new())); + let program_subs = Arc::new(Mutex::new(HashMap::new())); + let account_pubkey = Pubkey::new_unique(); + let program_pubkey = Pubkey::new_unique(); + + let account_cancellation_token = CancellationToken::new(); + let account_completion_token = CancellationToken::new(); + subscriptions + .lock() + .expect("subscriptions lock poisoned") + .insert( + account_pubkey, + AccountSubscription { + cancellation_token: account_cancellation_token.clone(), + completion_token: account_completion_token.clone(), + }, + ); + + let program_cancellation_token = CancellationToken::new(); + let program_completion_token = CancellationToken::new(); + program_subs + .lock() + .expect("program subs lock poisoned") + .insert( + program_pubkey, + AccountSubscription { + cancellation_token: program_cancellation_token.clone(), + completion_token: program_completion_token.clone(), + }, + ); + + let account_task_cancellation_token = + account_cancellation_token.clone(); + let account_task_completion_token = account_completion_token.clone(); + tokio::spawn(async move { + account_task_cancellation_token.cancelled().await; + sleep(Duration::from_millis(50)).await; + account_task_completion_token.cancel(); + }); + + let program_task_cancellation_token = + program_cancellation_token.clone(); + let program_task_completion_token = program_completion_token.clone(); + tokio::spawn(async move { + program_task_cancellation_token.cancelled().await; + sleep(Duration::from_millis(50)).await; + program_task_completion_token.cancel(); + }); + + let started = Instant::now(); + ChainPubsubActor::drain_and_wait_for_listener_completion( + "test_client", + subscriptions.clone(), + program_subs.clone(), + ) + .await + .unwrap(); + + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .is_empty()); + assert!(program_subs + .lock() + .expect("program subs lock poisoned") + .is_empty()); + assert!(account_cancellation_token.is_cancelled()); + assert!(program_cancellation_token.is_cancelled()); + assert!(account_completion_token.is_cancelled()); + assert!(program_completion_token.is_cancelled()); + assert!(started.elapsed() >= Duration::from_millis(50)); + } + + #[tokio::test] + async fn drain_and_wait_for_listener_completion_returns_error_when_completion_times_out( + ) { + let subscriptions = Arc::new(Mutex::new(HashMap::new())); + let program_subs = Arc::new(Mutex::new(HashMap::new())); + let account_pubkey = Pubkey::new_unique(); + let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); + + subscriptions + .lock() + .expect("subscriptions lock poisoned") + .insert( + account_pubkey, + AccountSubscription { + cancellation_token: cancellation_token.clone(), + completion_token, + }, + ); + + let result = ChainPubsubActor::drain_and_wait_for_listener_completion( + "test_client", + subscriptions.clone(), + program_subs, + ) + .await; + + assert!(result.is_err()); + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .is_empty()); + assert!(cancellation_token.is_cancelled()); + } + + #[tokio::test] + async fn explicit_unsubscribe_style_cancellation_leaves_entry_for_reconnect_drain( + ) { + let subscriptions = Arc::new(Mutex::new(HashMap::new())); + let program_subs = Arc::new(Mutex::new(HashMap::new())); + let pubkey = Pubkey::new_unique(); + let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); + + subscriptions + .lock() + .expect("subscriptions lock poisoned") + .insert( + pubkey, + AccountSubscription { + cancellation_token: cancellation_token.clone(), + completion_token: completion_token.clone(), + }, + ); + + let cancellation_token_to_cancel = subscriptions + .lock() + .expect("subscriptions lock poisoned") + .get(&pubkey) + .map(|sub| sub.cancellation_token.clone()); + cancellation_token_to_cancel.unwrap().cancel(); + + assert!(cancellation_token.is_cancelled()); + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .contains_key(&pubkey)); + + completion_token.cancel(); + ChainPubsubActor::drain_and_wait_for_listener_completion( + "test_client", + subscriptions.clone(), + program_subs, + ) + .await + .unwrap(); + + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .is_empty()); + } +} diff --git a/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs b/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs index 405694cef..9363a9050 100644 --- a/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs +++ b/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs @@ -133,6 +133,7 @@ impl fmt::Display for SubscriptionUpdate { pub struct AccountSubscription { pub cancellation_token: CancellationToken, + pub completion_token: CancellationToken, } #[derive(Debug)] diff --git a/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs b/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs index 89fc2b213..21bf7ba30 100644 --- a/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs +++ b/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs @@ -163,9 +163,13 @@ impl PubSubConnectionPool { } } - /// Reconnects the pool: clears state and tries to reconnect the - /// first connection to ensure that the provider is working - /// NOTE: assumes that all existing subscriptions have been dropped. + /// Reconnects the pool by dropping pooled connections and creating a fresh one. + /// + /// Caller precondition: every account/program listener task that may hold a + /// stream created from an existing pooled client must have completed before + /// this method is called. Reconnect callers must enforce this by draining + /// subscription maps and waiting for listener completion; explicit unsubscribe + /// alone is not proof that the listener has finished. pub async fn reconnect(&self) -> PubsubClientResult<()> { while self.connections.pop().is_some() {} // We cannot reconnect an existing connection due to the lockless queue