From eac1e593220e52eff9edda026b790807520f4dcf Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 29 May 2026 16:15:49 +0800 Subject: [PATCH 1/4] fix(chainlink): drop stale unwatched subscription updates --- .../src/chainlink/fetch_cloner/mod.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index 157929eff..612aa633c 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -568,6 +568,21 @@ where return; } + // A late forwarded update can arrive after an account was removed from + // the provider watch set. If a new subscription already won the race, + // is_watching is true and this update can be processed normally. If this + // update wins before acquire_subscription completes, the update is dropped; + // the new subscription path performs its own fetch and clones fresh state. + let update_slot = update.account.slot(); + if !self.remote_account_provider.is_watching(&pubkey) { + trace!( + pubkey = %pubkey, + update_slot, + "Dropping subscription update for account that is no longer watched" + ); + return; + } + let (resolved_account, deleg_record, delegation_actions) = self .resolve_account_to_clone_from_forwarded_sub_with_unsubscribe( update, From d3759cefc55be9f4a835acbd0e0c98840bc2a813 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 29 May 2026 16:20:10 +0800 Subject: [PATCH 2/4] fix(chainlink): serialize stale account eviction --- .../src/chainlink/fetch_cloner/mod.rs | 22 +++++ magicblock-chainlink/src/chainlink/mod.rs | 85 +++++++++++++++++-- .../src/remote_account_provider/mod.rs | 22 ++++- 3 files changed, 119 insertions(+), 10 deletions(-) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs index 612aa633c..2ee3bd5ae 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs @@ -258,6 +258,12 @@ where &self.cloner } + pub(crate) fn remote_account_provider( + &self, + ) -> &Arc> { + &self.remote_account_provider + } + #[cfg(test)] fn has_pending_request(&self, pubkey: &Pubkey) -> bool { self.pending_requests.contains(pubkey) @@ -573,6 +579,9 @@ where // is_watching is true and this update can be processed normally. If this // update wins before acquire_subscription completes, the update is dropped; // the new subscription path performs its own fetch and clones fresh state. + // If stale state is still present locally, cleanup is routed through the + // existing removal listener, which serializes the final is_watching check and + // eviction submission against same-pubkey subscription transitions. let update_slot = update.account.slot(); if !self.remote_account_provider.is_watching(&pubkey) { trace!( @@ -580,6 +589,19 @@ where update_slot, "Dropping subscription update for account that is no longer watched" ); + if self.accounts_bank.get_account(&pubkey).is_some() { + if let Err(err) = self + .remote_account_provider + .send_removal_update(pubkey) + .await + { + warn!( + pubkey = %pubkey, + error = ?err, + "Failed to enqueue stale subscription update removal" + ); + } + } return; } diff --git a/magicblock-chainlink/src/chainlink/mod.rs b/magicblock-chainlink/src/chainlink/mod.rs index 8d8f12975..7d23564b3 100644 --- a/magicblock-chainlink/src/chainlink/mod.rs +++ b/magicblock-chainlink/src/chainlink/mod.rs @@ -289,6 +289,7 @@ impl Some(Self::subscribe_account_removals( accounts_bank, cloner, + fetch_cloner.remote_account_provider(), removed_accounts_rx, )) } else { @@ -383,10 +384,12 @@ impl fn subscribe_account_removals( accounts_bank: &Arc, cloner: &Arc, + remote_account_provider: &Arc>, mut removed_accounts_rx: mpsc::Receiver, ) -> task::JoinHandle<()> { let accounts_bank = accounts_bank.clone(); let cloner = cloner.clone(); + let remote_account_provider = remote_account_provider.clone(); task::spawn(async move { while let Some(pubkey) = removed_accounts_rx.recv().await { @@ -420,15 +423,33 @@ impl continue; } - trace!( - pubkey = %pubkey, - "Submitting eviction transaction" - ); - if let Err(err) = cloner.evict_account(pubkey).await { - warn!( + // Removal notifications can race with a new acquire_subscription for the same + // pubkey. The provider helper holds the same per-pubkey subscription lock used + // by acquire/release while it re-checks is_watching and submits eviction. This + // prevents an EvictAccount transaction from being submitted after a fresh + // subscription has made the account watched again, without blocking unrelated + // pubkeys on the defensive-eviction slow path. + let cloner = cloner.clone(); + let evicted = remote_account_provider + .evict_unwatched_with_subscription_lock(&pubkey, || async move { + trace!( + pubkey = %pubkey, + "Submitting eviction transaction for unwatched account" + ); + if let Err(err) = cloner.evict_account(pubkey).await { + warn!( + pubkey = %pubkey, + error = ?err, + "Failed to submit eviction transaction" + ); + } + }) + .await; + + if !evicted { + trace!( pubkey = %pubkey, - error = ?err, - "Failed to submit eviction transaction" + "Skipping removal notification because account is watched again" ); } } @@ -705,6 +726,48 @@ mod tests { ClonerStub, >; + async fn test_remote_account_provider() -> Arc< + crate::remote_account_provider::RemoteAccountProvider< + ChainRpcClientMock, + ChainPubsubClientMock, + >, + > { + use crate::{ + remote_account_provider::{ + chain_slot::ChainSlot, RemoteAccountProvider, + }, + testing::{ + rpc_client_mock::ChainRpcClientMockBuilder, + utils::create_test_lru_cache, + }, + }; + use std::sync::atomic::AtomicU64; + + let rpc_client = ChainRpcClientMockBuilder::new() + .slot(1) + .clock_sysvar_for_slot(1) + .build(); + let (updates_sender, updates_receiver) = mpsc::channel(1_000); + let pubsub_client = + ChainPubsubClientMock::new(updates_sender, updates_receiver); + let (forward_tx, _forward_rx) = mpsc::channel(1_000); + let (subscribed_accounts, config) = create_test_lru_cache(1000); + let chain_slot = Arc::::default(); + + Arc::new( + RemoteAccountProvider::new( + rpc_client, + pubsub_client, + forward_tx, + &config, + subscribed_accounts, + ChainSlot::new(chain_slot), + ) + .await + .expect("test remote account provider should be constructed"), + ) + } + fn disabled_chainlink( ) -> (Arc, TestReplicationModeAwareChainlink) { let accounts_bank = Arc::new(AccountsBankStub::default()); @@ -811,6 +874,7 @@ mod tests { let accounts_bank = Arc::new(AccountsBankStub::default()); let cloner = Arc::new(ClonerStub::new(accounts_bank.clone())); let (removed_tx, removed_rx) = mpsc::channel(8); + let remote_account_provider = test_remote_account_provider().await; let delegated_pubkey = Pubkey::new_unique(); let undelegating_pubkey = Pubkey::new_unique(); @@ -836,7 +900,10 @@ mod tests { AccountsBankStub, ClonerStub, >::subscribe_account_removals( - &accounts_bank, &cloner, removed_rx + &accounts_bank, + &cloner, + &remote_account_provider, + removed_rx, ); removed_tx.send(delegated_pubkey).await.unwrap(); diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 656317466..76af2b95e 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -1567,7 +1567,7 @@ impl RemoteAccountProvider { Ok(()) } - async fn send_removal_update( + pub(crate) async fn send_removal_update( &self, evicted: Pubkey, ) -> RemoteAccountProviderResult<()> { @@ -1584,6 +1584,26 @@ impl RemoteAccountProvider { self.lrucache_subscribed_accounts.contains(pubkey) } + pub(crate) async fn evict_unwatched_with_subscription_lock( + &self, + pubkey: &Pubkey, + evict: F, + ) -> bool + where + F: FnOnce() -> Fut, + Fut: std::future::Future, + { + let subscription_key_lock = self.subscription_key_lock(pubkey).await; + let _subscription_guard = subscription_key_lock.lock().await; + + if self.is_watching(pubkey) { + return false; + } + + evict().await; + true + } + /// Check if an account is currently pending (being fetched) pub fn is_pending(&self, pubkey: &Pubkey) -> bool { let fetching = self.fetching_accounts.lock().unwrap(); From 841ee808cc37a16e7b9a7e39b86e1e0b50f73087 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 29 May 2026 16:30:48 +0800 Subject: [PATCH 3/4] test: cover stale subscription updates --- .../src/chainlink/fetch_cloner/tests.rs | 166 ++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs index 95a1462a5..f8fe0df73 100644 --- a/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs +++ b/magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs @@ -267,6 +267,18 @@ fn init_fetch_cloner( (fetch_cloner, subscription_tx) } +async fn acquire_direct_subscription_for_update( + remote_account_provider: &Arc< + RemoteAccountProvider, + >, + pubkey: &Pubkey, +) { + remote_account_provider + .acquire_subscription(pubkey, SubscriptionReason::DirectAccount) + .await + .expect("failed to acquire direct subscription for update test"); +} + async fn wait_for_pending_request( fetch_cloner: &Arc< FetchCloner< @@ -1620,6 +1632,96 @@ async fn test_delegated_cleanup_keeps_undelegation_tracking_subscription() { assert_not_subscribed!(remote_account_provider, &[&account_pubkey]); } +#[tokio::test] +async fn test_subscription_update_for_unwatched_absent_account_is_dropped() { + init_logger(); + let pubkey = Pubkey::new_unique(); + let validator_keypair = Keypair::new(); + let chain_account = Account { + lamports: 1_000_000, + data: Vec::new(), + owner: system_program::id(), + executable: false, + rent_epoch: 0, + }; + + let ctx = + setup([(pubkey, chain_account.clone())], 42, validator_keypair).await; + + assert!(!ctx.remote_account_provider.is_watching(&pubkey)); + assert!(ctx.accounts_bank.get_account(&pubkey).is_none()); + + ctx.subscription_tx + .send(ForwardedSubscriptionUpdate { + pubkey, + account: RemoteAccount::from_fresh_account( + chain_account, + 42, + RemoteAccountUpdateSource::Subscription, + ), + }) + .await + .unwrap(); + + tokio::time::timeout(Duration::from_secs(1), async { + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + if ctx.fetch_cloner.cloner().clone_request_count() == 0 { + break; + } + } + }) + .await + .expect("timed out waiting for stale update to be dropped"); + + assert!(ctx.accounts_bank.get_account(&pubkey).is_none()); + assert_eq!(ctx.fetch_cloner.cloner().clone_request_count(), 0); +} + +#[tokio::test] +async fn test_subscription_update_for_unwatched_present_account_enqueues_removal( +) { + init_logger(); + let pubkey = Pubkey::new_unique(); + let validator_keypair = Keypair::new(); + let owner = Pubkey::new_unique(); + let bank_account = AccountSharedData::new(1_000_000, 0, &owner); + + let ctx = setup( + [(pubkey, Account::from(bank_account.clone()))], + 43, + validator_keypair, + ) + .await; + ctx.accounts_bank.insert(pubkey, bank_account.clone()); + let mut removed_rx = ctx + .remote_account_provider + .try_get_removed_account_rx() + .expect("removed account receiver should be available"); + + assert!(!ctx.remote_account_provider.is_watching(&pubkey)); + + ctx.subscription_tx + .send(ForwardedSubscriptionUpdate { + pubkey, + account: RemoteAccount::from_fresh_account( + Account::from(bank_account), + 43, + RemoteAccountUpdateSource::Subscription, + ), + }) + .await + .unwrap(); + + let removed = + tokio::time::timeout(Duration::from_secs(1), removed_rx.recv()) + .await + .expect("timed out waiting for removal notification"); + assert_eq!(removed, Some(pubkey)); + assert!(removed_rx.try_recv().is_err()); + assert_eq!(ctx.fetch_cloner.cloner().clone_request_count(), 0); +} + // End-to-end variant of the test above that drives the cleanup through // `process_subscription_update` (via the subscription channel) instead of // invoking `cleanup_direct_subscription_for_delegated_account` directly. @@ -2668,6 +2770,11 @@ async fn send_subscription_update_and_get_subscribed_programs( }; accounts_bank.insert(account_pubkey, AccountSharedData::from(bank_account)); + acquire_direct_subscription_for_update( + remote_account_provider, + &account_pubkey, + ) + .await; let pubsub_client = remote_account_provider.pubsub_client(); let initial_programs = pubsub_client.subscribed_program_ids(); @@ -2906,6 +3013,7 @@ async fn test_non_raw_eata_owned_account_subscription_update_stays_delegated() { let ata_pubkey = derive_ata(&wallet_owner, &mint); let FetcherTestCtx { + remote_account_provider, accounts_bank, rpc_client, subscription_tx, @@ -2928,6 +3036,11 @@ async fn test_non_raw_eata_owned_account_subscription_update_stays_delegated() { RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &account_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: account_pubkey, @@ -2981,6 +3094,7 @@ async fn test_discovered_dlp_owned_account_without_delegation_record_falls_back( }; let FetcherTestCtx { + remote_account_provider, accounts_bank, subscription_tx, .. @@ -2995,6 +3109,11 @@ async fn test_discovered_dlp_owned_account_without_delegation_record_falls_back( RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &account_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: account_pubkey, @@ -3114,6 +3233,7 @@ async fn test_out_of_order_delegated_eata_subscription_update_still_projects_ata let eata_account = create_eata_account(&wallet_owner, &mint, AMOUNT, true); let FetcherTestCtx { + remote_account_provider, accounts_bank, rpc_client, subscription_tx, @@ -3141,6 +3261,11 @@ async fn test_out_of_order_delegated_eata_subscription_update_still_projects_ata RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &eata_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: eata_pubkey, @@ -3193,6 +3318,7 @@ async fn test_out_of_order_delegated_eata_update_clones_action_dependencies() { }; let FetcherTestCtx { + remote_account_provider, accounts_bank, rpc_client, subscription_tx, @@ -3224,6 +3350,11 @@ async fn test_out_of_order_delegated_eata_update_clones_action_dependencies() { RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &eata_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: eata_pubkey, @@ -3286,6 +3417,7 @@ async fn test_subscription_update_with_delegation_actions_clones_dependencies() }; let FetcherTestCtx { + remote_account_provider, accounts_bank, rpc_client, subscription_tx, @@ -3316,6 +3448,11 @@ async fn test_subscription_update_with_delegation_actions_clones_dependencies() RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &account_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: account_pubkey, @@ -3375,6 +3512,7 @@ async fn test_delegated_eata_subscription_update_clones_raw_eata_and_projects_at let eata_account = create_eata_account(&wallet_owner, &mint, AMOUNT, true); let FetcherTestCtx { + remote_account_provider, accounts_bank, rpc_client, subscription_tx, @@ -3397,6 +3535,11 @@ async fn test_delegated_eata_subscription_update_clones_raw_eata_and_projects_at RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &eata_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: eata_pubkey, @@ -3473,6 +3616,7 @@ async fn test_ata_subscription_update_projects_eata_when_chain_slot_lags() { ata_account.data[64..72].copy_from_slice(&BASE_ATA_AMOUNT.to_le_bytes()); let FetcherTestCtx { + remote_account_provider, accounts_bank, rpc_client, subscription_tx, @@ -3502,6 +3646,11 @@ async fn test_ata_subscription_update_projects_eata_when_chain_slot_lags() { RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &ata_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: ata_pubkey, @@ -3582,6 +3731,7 @@ async fn test_delegated_eata_subscription_update_clones_action_dependencies() { }; let FetcherTestCtx { + remote_account_provider, accounts_bank, rpc_client, subscription_tx, @@ -3608,6 +3758,11 @@ async fn test_delegated_eata_subscription_update_clones_action_dependencies() { RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &eata_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: eata_pubkey, @@ -4419,6 +4574,11 @@ async fn test_undelegating_projected_ata_subscription_update_stays_locked() { RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &ata_pubkey, + ) + .await; subscription_tx .send(ForwardedSubscriptionUpdate { pubkey: ata_pubkey, @@ -5385,6 +5545,12 @@ async fn test_project_ata_skips_repeat_fetch_for_known_empty_eata() { RemoteAccount, RemoteAccountUpdateSource, }; + acquire_direct_subscription_for_update( + &remote_account_provider, + &ata_pubkey, + ) + .await; + let send_update = |slot| { let ata_account = ata_account.clone(); let subscription_tx = subscription_tx.clone(); From 7e8966193f81d6b9544d16ae0fa64830026f4c06 Mon Sep 17 00:00:00 2001 From: Thorsten Lorenz Date: Fri, 29 May 2026 16:34:30 +0800 Subject: [PATCH 4/4] test: cover removal listener subscription races --- magicblock-chainlink/src/chainlink/mod.rs | 131 +++++++++++++++++++++- 1 file changed, 129 insertions(+), 2 deletions(-) diff --git a/magicblock-chainlink/src/chainlink/mod.rs b/magicblock-chainlink/src/chainlink/mod.rs index 7d23564b3..5388406ec 100644 --- a/magicblock-chainlink/src/chainlink/mod.rs +++ b/magicblock-chainlink/src/chainlink/mod.rs @@ -712,7 +712,10 @@ mod tests { }; use crate::{ accounts_bank::mock::AccountsBankStub, - remote_account_provider::chain_pubsub_client::mock::ChainPubsubClientMock, + remote_account_provider::{ + chain_pubsub_client::mock::ChainPubsubClientMock, + SubscriptionReason, + }, testing::{ cloner_stub::ClonerStub, init_logger, rpc_client_mock::ChainRpcClientMock, @@ -732,6 +735,8 @@ mod tests { ChainPubsubClientMock, >, > { + use std::sync::atomic::AtomicU64; + use crate::{ remote_account_provider::{ chain_slot::ChainSlot, RemoteAccountProvider, @@ -741,7 +746,6 @@ mod tests { utils::create_test_lru_cache, }, }; - use std::sync::atomic::AtomicU64; let rpc_client = ChainRpcClientMockBuilder::new() .slot(1) @@ -928,4 +932,127 @@ mod tests { drop(removed_tx); handle.await.unwrap(); } + + #[tokio::test] + async fn test_subscribe_account_removals_skips_evict_when_account_is_watched_again( + ) { + init_logger(); + + let accounts_bank = Arc::new(AccountsBankStub::default()); + let cloner = Arc::new(ClonerStub::new(accounts_bank.clone())); + let (removed_tx, removed_rx) = mpsc::channel(8); + let remote_account_provider = test_remote_account_provider().await; + + let pubkey = Pubkey::new_unique(); + let owner = Pubkey::new_unique(); + let account = AccountSharedData::new(1_000_000, 0, &owner); + accounts_bank.insert(pubkey, account); + + let handle = InnerChainlink::< + ChainRpcClientMock, + ChainPubsubClientMock, + AccountsBankStub, + ClonerStub, + >::subscribe_account_removals( + &accounts_bank, + &cloner, + &remote_account_provider, + removed_rx, + ); + + remote_account_provider + .acquire_subscription(&pubkey, SubscriptionReason::DirectAccount) + .await + .expect("subscription acquisition should succeed"); + assert!(remote_account_provider.is_watching(&pubkey)); + + removed_tx.send(pubkey).await.unwrap(); + + tokio::time::timeout(Duration::from_secs(1), async { + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + assert!(accounts_bank.get_account(&pubkey).is_some()); + if remote_account_provider.is_watching(&pubkey) { + break; + } + } + }) + .await + .expect("watched account should not be evicted"); + + assert!(remote_account_provider.is_watching(&pubkey)); + assert!(accounts_bank.get_account(&pubkey).is_some()); + + drop(removed_tx); + handle.await.unwrap(); + assert!(accounts_bank.get_account(&pubkey).is_some()); + } + + #[tokio::test] + async fn test_defensive_eviction_blocks_same_pubkey_subscription_until_eviction_finishes( + ) { + init_logger(); + + let remote_account_provider = test_remote_account_provider().await; + let pubkey = Pubkey::new_unique(); + assert!(!remote_account_provider.is_watching(&pubkey)); + + let eviction_started = Arc::new(tokio::sync::Notify::new()); + let release_eviction = Arc::new(tokio::sync::Notify::new()); + + let eviction_provider = remote_account_provider.clone(); + let eviction_pubkey = pubkey; + let eviction_started_for_task = eviction_started.clone(); + let release_eviction_for_task = release_eviction.clone(); + let eviction_task = tokio::spawn(async move { + eviction_provider + .evict_unwatched_with_subscription_lock( + &eviction_pubkey, + || async move { + eviction_started_for_task.notify_one(); + release_eviction_for_task.notified().await; + }, + ) + .await + }); + + eviction_started.notified().await; + + let (result_tx, mut result_rx) = tokio::sync::oneshot::channel(); + let subscribe_provider = remote_account_provider.clone(); + let subscribe_pubkey = pubkey; + let subscribe_task = tokio::spawn(async move { + let result = subscribe_provider + .acquire_subscription( + &subscribe_pubkey, + SubscriptionReason::DirectAccount, + ) + .await; + let _ = result_tx.send(result); + }); + + assert!( + tokio::time::timeout( + Duration::from_millis(50), + &mut result_rx, + ) + .await + .is_err(), + "same-pubkey subscribe must wait while defensive eviction holds the per-pubkey subscription lock" + ); + + release_eviction.notify_one(); + assert!(eviction_task.await.unwrap()); + let subscribe_result = tokio::time::timeout( + Duration::from_secs(1), + &mut result_rx, + ) + .await + .expect("subscription should complete after eviction releases the lock") + .expect("subscription task should send its result"); + subscribe_task.await.unwrap(); + + assert!(subscribe_result.is_ok()); + assert!(remote_account_provider.is_watching(&pubkey)); + } }