From 9621f8d58113d6ec8dbc7a9d1617c3de118a6a6d Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 27 May 2026 15:58:38 +0800 Subject: [PATCH 1/7] fix: signal pubsub listener completion --- .../chain_pubsub_actor.rs | 37 ++++++++++++++++--- .../remote_account_provider/pubsub_common.rs | 1 + 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index e807bae87..44fca8bba 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -333,11 +333,12 @@ impl ChainPubsubActor { send_ok(response, client_id); return; } - if let Some(AccountSubscription { cancellation_token }) = - subscriptions - .lock() - .expect("subcriptions lock poisoned") - .get(&pubkey) + if let Some(AccountSubscription { + cancellation_token, .. + }) = subscriptions + .lock() + .expect("subcriptions lock poisoned") + .get(&pubkey) { cancellation_token.cancel(); send_ok(response, client_id); @@ -449,6 +450,7 @@ impl ChainPubsubActor { trace!("Adding subscription"); let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); // Insert into subscriptions HashMap immediately to prevent race condition // with unsubscribe operations @@ -462,6 +464,7 @@ impl ChainPubsubActor { pubkey, AccountSubscription { cancellation_token: cancellation_token.clone(), + completion_token: completion_token.clone(), }, ); } @@ -507,6 +510,10 @@ impl ChainPubsubActor { is_connected.clone(), &format!("Failed to subscribe to account {pubkey} after {initial_tries} retries") ); + subs.lock() + .expect("subscriptions lock poisoned") + .remove(&pubkey); + completion_token.cancel(); // RPC failed - inform the requester let _ = sub_response.send(Err(err.into())); return; @@ -579,6 +586,8 @@ impl ChainPubsubActor { } } + drop(update_stream); + // Clean up subscription with timeout to prevent hanging on dead sockets if tokio::time::timeout(Duration::from_secs(2), unsubscribe()) .await @@ -590,6 +599,7 @@ impl ChainPubsubActor { subs.lock() .expect("subscriptions lock poisoned") .remove(&pubkey); + completion_token.cancel(); }); } #[allow(clippy::too_many_arguments)] @@ -621,6 +631,7 @@ impl ChainPubsubActor { trace!("Adding program subscription"); let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); { let mut program_subs_lock = program_subs @@ -630,6 +641,7 @@ impl ChainPubsubActor { program_pubkey, AccountSubscription { cancellation_token: cancellation_token.clone(), + completion_token: completion_token.clone(), }, ); } @@ -667,6 +679,11 @@ impl ChainPubsubActor { is_connected.clone(), &format!("Failed to subscribe to program {program_pubkey}"), ); + program_subs + .lock() + .expect("program_subs lock poisoned") + .remove(&program_pubkey); + completion_token.cancel(); // RPC failed - inform the requester let _ = sub_response.send(Err(err.into())); return; @@ -774,6 +791,8 @@ impl ChainPubsubActor { } } + drop(update_stream); + // Clean up subscription with timeout to prevent hanging on dead sockets if tokio::time::timeout(Duration::from_secs(2), unsubscribe()) .await @@ -786,6 +805,7 @@ impl ChainPubsubActor { .lock() .expect("program_subs lock poisoned") .remove(&program_pubkey); + completion_token.cancel(); }); } @@ -868,7 +888,12 @@ impl ChainPubsubActor { std::mem::take(&mut *subs_lock) }; let drained_len = drained_subs.len(); - for (_, AccountSubscription { cancellation_token }) in drained_subs + for ( + _, + AccountSubscription { + cancellation_token, .. + }, + ) in drained_subs { cancellation_token.cancel(); } diff --git a/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs b/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs index 405694cef..9363a9050 100644 --- a/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs +++ b/magicblock-chainlink/src/remote_account_provider/pubsub_common.rs @@ -133,6 +133,7 @@ impl fmt::Display for SubscriptionUpdate { pub struct AccountSubscription { pub cancellation_token: CancellationToken, + pub completion_token: CancellationToken, } #[derive(Debug)] From 678e8b7010a38442c141c454572231fa0c0a709b Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 27 May 2026 16:02:38 +0800 Subject: [PATCH 2/7] fix: await pubsub unsubscribe completion --- .../chain_pubsub_actor.rs | 104 ++++++++++++++---- .../pubsub_connection_pool.rs | 9 +- 2 files changed, 90 insertions(+), 23 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index 44fca8bba..f90cbc09b 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -46,6 +46,7 @@ use crate::remote_account_provider::{ // Log every 10 secs (given chain slot time is 400ms) const CLOCK_LOG_SLOT_FREQ: u64 = 25; +const SUBSCRIPTION_COMPLETION_TIMEOUT: Duration = Duration::from_secs(5); // ----------------- // ChainPubsubActor @@ -152,27 +153,84 @@ impl ChainPubsubActor { shutdown_token: CancellationToken, ) { info!(client_id = client_id, "Shutting down pubsub actor"); - Self::unsubscribe_all(subscriptions, program_subs); + let result = + Self::unsubscribe_all(client_id, subscriptions, program_subs).await; + if let Err(err) = result { + warn!(error = ?err, client_id, "Timed out while shutting down pubsub subscriptions"); + } shutdown_token.cancel(); } - fn unsubscribe_all( + async fn cancel_and_wait_subscription( + client_id: &str, + kind: &'static str, + pubkey: Pubkey, + sub: AccountSubscription, + ) -> RemoteAccountProviderResult<()> { + sub.cancellation_token.cancel(); + match tokio::time::timeout( + SUBSCRIPTION_COMPLETION_TIMEOUT, + sub.completion_token.cancelled(), + ) + .await + { + Ok(()) => Ok(()), + Err(_) => { + warn!( + %pubkey, + kind, + timeout_ms = SUBSCRIPTION_COMPLETION_TIMEOUT.as_millis() as u64, + client_id, + "Timed out waiting for subscription listener to finish" + ); + Err(RemoteAccountProviderError::AccountSubscriptionsTaskFailed( + format!( + "Timed out waiting for {kind} subscription listener {pubkey} to finish" + ), + )) + } + } + } + + async fn unsubscribe_all( + client_id: &str, subscriptions: Arc>>, program_subs: Arc>>, - ) { - let subs = subscriptions + ) -> RemoteAccountProviderResult<()> { + let account_subs = subscriptions .lock() .expect("subscriptions lock poisoned") .drain() - .chain( - program_subs - .lock() - .expect("program subs lock poisoned") - .drain(), - ) .collect::>(); - for (_, sub) in subs { - sub.cancellation_token.cancel(); + let program_subs = program_subs + .lock() + .expect("program subs lock poisoned") + .drain() + .collect::>(); + + let mut first_error = None; + for (pubkey, sub) in account_subs { + if let Err(err) = Self::cancel_and_wait_subscription( + client_id, "account", pubkey, sub, + ) + .await + { + first_error.get_or_insert(err); + } + } + for (pubkey, sub) in program_subs { + if let Err(err) = Self::cancel_and_wait_subscription( + client_id, "program", pubkey, sub, + ) + .await + { + first_error.get_or_insert(err); + } + } + + match first_error { + Some(err) => Err(err), + None => Ok(()), } } @@ -333,15 +391,21 @@ impl ChainPubsubActor { send_ok(response, client_id); return; } - if let Some(AccountSubscription { - cancellation_token, .. - }) = subscriptions + let sub = subscriptions .lock() .expect("subcriptions lock poisoned") - .get(&pubkey) - { - cancellation_token.cancel(); - send_ok(response, client_id); + .remove(&pubkey); + if let Some(sub) = sub { + match Self::cancel_and_wait_subscription( + client_id, "account", pubkey, sub, + ) + .await + { + Ok(()) => send_ok(response, client_id), + Err(err) => { + let _ = response.send(Err(err)); + } + } } else { let _ = response .send(Err(RemoteAccountProviderError::AccountSubscriptionDoesNotExist( @@ -819,7 +883,7 @@ impl ChainPubsubActor { is_connected: Arc, ) -> RemoteAccountProviderResult<()> { // 1. Ensure we cleaned all existing subscriptions - Self::unsubscribe_all(subs, program_subs); + Self::unsubscribe_all(client_id, subs, program_subs).await?; // 2. Try to reconnect the pubsub connection pubsub_connection.reconnect().await?; diff --git a/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs b/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs index 89fc2b213..044676ddf 100644 --- a/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs +++ b/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs @@ -163,9 +163,12 @@ impl PubSubConnectionPool { } } - /// Reconnects the pool: clears state and tries to reconnect the - /// first connection to ensure that the provider is working - /// NOTE: assumes that all existing subscriptions have been dropped. + /// Reconnects the pool by dropping pooled connections and creating a fresh one. + /// + /// Caller precondition: every account/program listener task that may hold a + /// stream created from an existing pooled client must have completed before + /// this method is called. If listener completion cannot be proven, callers + /// must not call this method. pub async fn reconnect(&self) -> PubsubClientResult<()> { while self.connections.pop().is_some() {} // We cannot reconnect an existing connection due to the lockless queue From 00620dec85116bcb8c299f20faaa9ed40c51784f Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 27 May 2026 16:08:36 +0800 Subject: [PATCH 3/7] test: cover pubsub unsubscribe drain --- .../chain_pubsub_actor.rs | 146 +++++++++++++++++- 1 file changed, 140 insertions(+), 6 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index f90cbc09b..57655ec9f 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -46,8 +46,20 @@ use crate::remote_account_provider::{ // Log every 10 secs (given chain slot time is 400ms) const CLOCK_LOG_SLOT_FREQ: u64 = 25; +#[cfg(not(test))] const SUBSCRIPTION_COMPLETION_TIMEOUT: Duration = Duration::from_secs(5); +fn subscription_completion_timeout() -> Duration { + #[cfg(test)] + { + Duration::from_millis(100) + } + #[cfg(not(test))] + { + SUBSCRIPTION_COMPLETION_TIMEOUT + } +} + // ----------------- // ChainPubsubActor // ----------------- @@ -161,25 +173,26 @@ impl ChainPubsubActor { shutdown_token.cancel(); } + // Reconnect must not drop pooled PubsubClient instances until all listener + // tasks have stopped polling, explicitly dropped their subscription streams, + // and run or timed out unsubscribe cleanup. async fn cancel_and_wait_subscription( client_id: &str, kind: &'static str, pubkey: Pubkey, sub: AccountSubscription, ) -> RemoteAccountProviderResult<()> { + let timeout = subscription_completion_timeout(); sub.cancellation_token.cancel(); - match tokio::time::timeout( - SUBSCRIPTION_COMPLETION_TIMEOUT, - sub.completion_token.cancelled(), - ) - .await + match tokio::time::timeout(timeout, sub.completion_token.cancelled()) + .await { Ok(()) => Ok(()), Err(_) => { warn!( %pubkey, kind, - timeout_ms = SUBSCRIPTION_COMPLETION_TIMEOUT.as_millis() as u64, + timeout_ms = timeout.as_millis() as u64, client_id, "Timed out waiting for subscription listener to finish" ); @@ -979,3 +992,124 @@ impl ChainPubsubActor { }); } } + +#[cfg(test)] +mod tests { + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + }; + + use tokio::time::{sleep, Duration, Instant}; + + use super::*; + + #[tokio::test] + async fn unsubscribe_all_waits_for_account_and_program_completion() { + let subscriptions = Arc::new(Mutex::new(HashMap::new())); + let program_subs = Arc::new(Mutex::new(HashMap::new())); + let account_pubkey = Pubkey::new_unique(); + let program_pubkey = Pubkey::new_unique(); + + let account_cancellation_token = CancellationToken::new(); + let account_completion_token = CancellationToken::new(); + subscriptions + .lock() + .expect("subscriptions lock poisoned") + .insert( + account_pubkey, + AccountSubscription { + cancellation_token: account_cancellation_token.clone(), + completion_token: account_completion_token.clone(), + }, + ); + + let program_cancellation_token = CancellationToken::new(); + let program_completion_token = CancellationToken::new(); + program_subs + .lock() + .expect("program subs lock poisoned") + .insert( + program_pubkey, + AccountSubscription { + cancellation_token: program_cancellation_token.clone(), + completion_token: program_completion_token.clone(), + }, + ); + + let account_task_cancellation_token = + account_cancellation_token.clone(); + let account_task_completion_token = account_completion_token.clone(); + tokio::spawn(async move { + account_task_cancellation_token.cancelled().await; + sleep(Duration::from_millis(50)).await; + account_task_completion_token.cancel(); + }); + + let program_task_cancellation_token = + program_cancellation_token.clone(); + let program_task_completion_token = program_completion_token.clone(); + tokio::spawn(async move { + program_task_cancellation_token.cancelled().await; + sleep(Duration::from_millis(50)).await; + program_task_completion_token.cancel(); + }); + + let started = Instant::now(); + ChainPubsubActor::unsubscribe_all( + "test_client", + subscriptions.clone(), + program_subs.clone(), + ) + .await + .unwrap(); + + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .is_empty()); + assert!(program_subs + .lock() + .expect("program subs lock poisoned") + .is_empty()); + assert!(account_cancellation_token.is_cancelled()); + assert!(program_cancellation_token.is_cancelled()); + assert!(account_completion_token.is_cancelled()); + assert!(program_completion_token.is_cancelled()); + assert!(started.elapsed() >= Duration::from_millis(50)); + } + + #[tokio::test] + async fn unsubscribe_all_returns_error_when_completion_times_out() { + let subscriptions = Arc::new(Mutex::new(HashMap::new())); + let program_subs = Arc::new(Mutex::new(HashMap::new())); + let account_pubkey = Pubkey::new_unique(); + let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); + + subscriptions + .lock() + .expect("subscriptions lock poisoned") + .insert( + account_pubkey, + AccountSubscription { + cancellation_token: cancellation_token.clone(), + completion_token, + }, + ); + + let result = ChainPubsubActor::unsubscribe_all( + "test_client", + subscriptions.clone(), + program_subs, + ) + .await; + + assert!(result.is_err()); + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .is_empty()); + assert!(cancellation_token.is_cancelled()); + } +} From 295f65d1dd7c2e50e94506ee27785336f73844fb Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 27 May 2026 17:06:14 +0800 Subject: [PATCH 4/7] refactor: rename pubsub reconnect drain helpers --- .../chain_pubsub_actor.rs | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index 57655ec9f..50f68d675 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -165,18 +165,24 @@ impl ChainPubsubActor { shutdown_token: CancellationToken, ) { info!(client_id = client_id, "Shutting down pubsub actor"); - let result = - Self::unsubscribe_all(client_id, subscriptions, program_subs).await; + let result = Self::drain_and_wait_for_listener_completion( + client_id, + subscriptions, + program_subs, + ) + .await; if let Err(err) = result { warn!(error = ?err, client_id, "Timed out while shutting down pubsub subscriptions"); } shutdown_token.cancel(); } - // Reconnect must not drop pooled PubsubClient instances until all listener - // tasks have stopped polling, explicitly dropped their subscription streams, - // and run or timed out unsubscribe cleanup. - async fn cancel_and_wait_subscription( + // Reconnect must not drop pooled PubsubClient instances until all + // listener tasks have stopped polling and dropped their subscription streams. + // This helper cancels one listener and waits for its completion signal, which + // listener tasks emit only after dropping update_stream and running best-effort + // unsubscribe cleanup. + async fn cancel_and_wait_for_stream_drop( client_id: &str, kind: &'static str, pubkey: Pubkey, @@ -205,7 +211,7 @@ impl ChainPubsubActor { } } - async fn unsubscribe_all( + async fn drain_and_wait_for_listener_completion( client_id: &str, subscriptions: Arc>>, program_subs: Arc>>, @@ -223,7 +229,7 @@ impl ChainPubsubActor { let mut first_error = None; for (pubkey, sub) in account_subs { - if let Err(err) = Self::cancel_and_wait_subscription( + if let Err(err) = Self::cancel_and_wait_for_stream_drop( client_id, "account", pubkey, sub, ) .await @@ -232,7 +238,7 @@ impl ChainPubsubActor { } } for (pubkey, sub) in program_subs { - if let Err(err) = Self::cancel_and_wait_subscription( + if let Err(err) = Self::cancel_and_wait_for_stream_drop( client_id, "program", pubkey, sub, ) .await @@ -409,7 +415,7 @@ impl ChainPubsubActor { .expect("subcriptions lock poisoned") .remove(&pubkey); if let Some(sub) = sub { - match Self::cancel_and_wait_subscription( + match Self::cancel_and_wait_for_stream_drop( client_id, "account", pubkey, sub, ) .await @@ -895,8 +901,13 @@ impl ChainPubsubActor { client_id: &str, is_connected: Arc, ) -> RemoteAccountProviderResult<()> { - // 1. Ensure we cleaned all existing subscriptions - Self::unsubscribe_all(client_id, subs, program_subs).await?; + // 1. Drain subscriptions and wait until borrowed pubsub streams are dropped. + Self::drain_and_wait_for_listener_completion( + client_id, + subs, + program_subs, + ) + .await?; // 2. Try to reconnect the pubsub connection pubsub_connection.reconnect().await?; @@ -1005,7 +1016,8 @@ mod tests { use super::*; #[tokio::test] - async fn unsubscribe_all_waits_for_account_and_program_completion() { + async fn drain_and_wait_for_listener_completion_waits_for_account_and_program_completion( + ) { let subscriptions = Arc::new(Mutex::new(HashMap::new())); let program_subs = Arc::new(Mutex::new(HashMap::new())); let account_pubkey = Pubkey::new_unique(); @@ -1056,7 +1068,7 @@ mod tests { }); let started = Instant::now(); - ChainPubsubActor::unsubscribe_all( + ChainPubsubActor::drain_and_wait_for_listener_completion( "test_client", subscriptions.clone(), program_subs.clone(), @@ -1080,7 +1092,8 @@ mod tests { } #[tokio::test] - async fn unsubscribe_all_returns_error_when_completion_times_out() { + async fn drain_and_wait_for_listener_completion_returns_error_when_completion_times_out( + ) { let subscriptions = Arc::new(Mutex::new(HashMap::new())); let program_subs = Arc::new(Mutex::new(HashMap::new())); let account_pubkey = Pubkey::new_unique(); @@ -1098,7 +1111,7 @@ mod tests { }, ); - let result = ChainPubsubActor::unsubscribe_all( + let result = ChainPubsubActor::drain_and_wait_for_listener_completion( "test_client", subscriptions.clone(), program_subs, From 0570196ac817aca1cc1b0b6edbe48bf7db40fe32 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 27 May 2026 17:08:50 +0800 Subject: [PATCH 5/7] fix: make account unsubscribe cancellation fast --- .../chain_pubsub_actor.rs | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index 50f68d675..245124faa 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -410,21 +410,15 @@ impl ChainPubsubActor { send_ok(response, client_id); return; } - let sub = subscriptions + let cancellation_token = subscriptions .lock() .expect("subcriptions lock poisoned") - .remove(&pubkey); - if let Some(sub) = sub { - match Self::cancel_and_wait_for_stream_drop( - client_id, "account", pubkey, sub, - ) - .await - { - Ok(()) => send_ok(response, client_id), - Err(err) => { - let _ = response.send(Err(err)); - } - } + .get(&pubkey) + .map(|sub| sub.cancellation_token.clone()); + + if let Some(cancellation_token) = cancellation_token { + cancellation_token.cancel(); + send_ok(response, client_id); } else { let _ = response .send(Err(RemoteAccountProviderError::AccountSubscriptionDoesNotExist( From 56c7c104f2a7d9616d70c36a4abd6ba3e4ef64b5 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Wed, 27 May 2026 17:11:25 +0800 Subject: [PATCH 6/7] test: document pubsub reconnect drain contract --- .../chain_pubsub_actor.rs | 53 +++++++++++++++++++ .../pubsub_connection_pool.rs | 5 +- 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index 245124faa..315db3a85 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -211,6 +211,11 @@ impl ChainPubsubActor { } } + // Reconnect-only safety gate: callers that may drop or replace pooled + // PubsubClient instances must drain subscription maps and wait here before + // calling PubSubConnectionPool::reconnect(). Explicit unsubscribe may cancel + // listeners without waiting because it does not drop pooled clients and leaves + // the map entry visible until listener cleanup completes. async fn drain_and_wait_for_listener_completion( client_id: &str, subscriptions: Arc>>, @@ -1119,4 +1124,52 @@ mod tests { .is_empty()); assert!(cancellation_token.is_cancelled()); } + + #[tokio::test] + async fn explicit_unsubscribe_style_cancellation_leaves_entry_for_reconnect_drain( + ) { + let subscriptions = Arc::new(Mutex::new(HashMap::new())); + let program_subs = Arc::new(Mutex::new(HashMap::new())); + let pubkey = Pubkey::new_unique(); + let cancellation_token = CancellationToken::new(); + let completion_token = CancellationToken::new(); + + subscriptions + .lock() + .expect("subscriptions lock poisoned") + .insert( + pubkey, + AccountSubscription { + cancellation_token: cancellation_token.clone(), + completion_token: completion_token.clone(), + }, + ); + + let cancellation_token_to_cancel = subscriptions + .lock() + .expect("subscriptions lock poisoned") + .get(&pubkey) + .map(|sub| sub.cancellation_token.clone()); + cancellation_token_to_cancel.unwrap().cancel(); + + assert!(cancellation_token.is_cancelled()); + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .contains_key(&pubkey)); + + completion_token.cancel(); + ChainPubsubActor::drain_and_wait_for_listener_completion( + "test_client", + subscriptions.clone(), + program_subs, + ) + .await + .unwrap(); + + assert!(subscriptions + .lock() + .expect("subscriptions lock poisoned") + .is_empty()); + } } diff --git a/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs b/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs index 044676ddf..21bf7ba30 100644 --- a/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs +++ b/magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs @@ -167,8 +167,9 @@ impl PubSubConnectionPool { /// /// Caller precondition: every account/program listener task that may hold a /// stream created from an existing pooled client must have completed before - /// this method is called. If listener completion cannot be proven, callers - /// must not call this method. + /// this method is called. Reconnect callers must enforce this by draining + /// subscription maps and waiting for listener completion; explicit unsubscribe + /// alone is not proof that the listener has finished. pub async fn reconnect(&self) -> PubsubClientResult<()> { while self.connections.pop().is_some() {} // We cannot reconnect an existing connection due to the lockless queue From b0720a32d0ed30d9e5d1349661b6ed1bba92e7f2 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Thu, 28 May 2026 15:51:54 +0800 Subject: [PATCH 7/7] perf: parallelize reconnect waits --- .../chain_pubsub_actor.rs | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs index 315db3a85..2e5d42e21 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs @@ -6,7 +6,7 @@ use std::{ }, }; -use futures_util::stream::FuturesUnordered; +use futures_util::{future::join_all, stream::FuturesUnordered}; use magicblock_core::logger::{log_trace_debug, log_trace_warn}; use magicblock_metrics::metrics::{ inc_account_subscription_account_updates_count, @@ -232,22 +232,23 @@ impl ChainPubsubActor { .drain() .collect::>(); + let cancellation_waits = account_subs + .into_iter() + .map(|(pubkey, sub)| { + Self::cancel_and_wait_for_stream_drop( + client_id, "account", pubkey, sub, + ) + }) + .chain(program_subs.into_iter().map(|(pubkey, sub)| { + Self::cancel_and_wait_for_stream_drop( + client_id, "program", pubkey, sub, + ) + })) + .collect::>(); + let mut first_error = None; - for (pubkey, sub) in account_subs { - if let Err(err) = Self::cancel_and_wait_for_stream_drop( - client_id, "account", pubkey, sub, - ) - .await - { - first_error.get_or_insert(err); - } - } - for (pubkey, sub) in program_subs { - if let Err(err) = Self::cancel_and_wait_for_stream_drop( - client_id, "program", pubkey, sub, - ) - .await - { + for result in join_all(cancellation_waits).await { + if let Err(err) = result { first_error.get_or_insert(err); } }