Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions magicblock-chainlink/src/chainlink/fetch_cloner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ where
&self.cloner
}

pub(crate) fn remote_account_provider(
&self,
) -> &Arc<RemoteAccountProvider<T, U>> {
&self.remote_account_provider
}

#[cfg(test)]
fn has_pending_request(&self, pubkey: &Pubkey) -> bool {
self.pending_requests.contains(pubkey)
Expand Down Expand Up @@ -568,6 +574,37 @@ where
return;
}

// A late forwarded update can arrive after an account was removed from
// the provider watch set. If a new subscription already won the race,
// is_watching is true and this update can be processed normally. If this
// update wins before acquire_subscription completes, the update is dropped;
// the new subscription path performs its own fetch and clones fresh state.
// If stale state is still present locally, cleanup is routed through the
// existing removal listener, which serializes the final is_watching check and
// eviction submission against same-pubkey subscription transitions.
let update_slot = update.account.slot();
if !self.remote_account_provider.is_watching(&pubkey) {
trace!(
pubkey = %pubkey,
update_slot,
"Dropping subscription update for account that is no longer watched"
);
if self.accounts_bank.get_account(&pubkey).is_some() {
if let Err(err) = self
.remote_account_provider
.send_removal_update(pubkey)
.await
{
warn!(
pubkey = %pubkey,
error = ?err,
"Failed to enqueue stale subscription update removal"
);
}
}
return;
}

let (resolved_account, deleg_record, delegation_actions) = self
.resolve_account_to_clone_from_forwarded_sub_with_unsubscribe(
update,
Expand Down
166 changes: 166 additions & 0 deletions magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,18 @@ fn init_fetch_cloner(
(fetch_cloner, subscription_tx)
}

async fn acquire_direct_subscription_for_update(
remote_account_provider: &Arc<
RemoteAccountProvider<ChainRpcClientMock, ChainPubsubClientMock>,
>,
pubkey: &Pubkey,
) {
remote_account_provider
.acquire_subscription(pubkey, SubscriptionReason::DirectAccount)
.await
.expect("failed to acquire direct subscription for update test");
}

async fn wait_for_pending_request(
fetch_cloner: &Arc<
FetchCloner<
Expand Down Expand Up @@ -1620,6 +1632,96 @@ async fn test_delegated_cleanup_keeps_undelegation_tracking_subscription() {
assert_not_subscribed!(remote_account_provider, &[&account_pubkey]);
}

#[tokio::test]
async fn test_subscription_update_for_unwatched_absent_account_is_dropped() {
init_logger();
let pubkey = Pubkey::new_unique();
let validator_keypair = Keypair::new();
let chain_account = Account {
lamports: 1_000_000,
data: Vec::new(),
owner: system_program::id(),
executable: false,
rent_epoch: 0,
};

let ctx =
setup([(pubkey, chain_account.clone())], 42, validator_keypair).await;

assert!(!ctx.remote_account_provider.is_watching(&pubkey));
assert!(ctx.accounts_bank.get_account(&pubkey).is_none());

ctx.subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey,
account: RemoteAccount::from_fresh_account(
chain_account,
42,
RemoteAccountUpdateSource::Subscription,
),
})
.await
.unwrap();

tokio::time::timeout(Duration::from_secs(1), async {
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
if ctx.fetch_cloner.cloner().clone_request_count() == 0 {
break;
}
}
})
Comment on lines +1666 to +1673
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test can pass before the stale update is actually processed

At Line 1669, clone_request_count() == 0 is true at baseline, so this loop can exit immediately and the test may pass without exercising the async drop path at all. Please gate on a post-send processing signal (or at least a deterministic barrier) before asserting zero clone requests.

Suggested adjustment
-    tokio::time::timeout(Duration::from_secs(1), async {
-        loop {
-            tokio::time::sleep(Duration::from_millis(10)).await;
-            if ctx.fetch_cloner.cloner().clone_request_count() == 0 {
-                break;
-            }
-        }
-    })
-    .await
-    .expect("timed out waiting for stale update to be dropped");
+    // Give the subscription task a chance to process the forwarded update.
+    tokio::time::sleep(Duration::from_millis(50)).await;
+    assert_eq!(ctx.fetch_cloner.cloner().clone_request_count(), 0);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-chainlink/src/chainlink/fetch_cloner/tests.rs` around lines 1666 -
1673, The test currently breaks out immediately because clone_request_count()
can be 0 at baseline; change the test to first observe that a clone request was
created (wait for ctx.fetch_cloner.cloner().clone_request_count() to become > 0
or use a deterministic post-send signal from the code path) then wait for it to
drop back to 0; in other words, gate the zero-check behind a confirmed prior
increment (or await a dedicated processing-complete signal/oneshot you add to
the async drop path) so the test actually exercises the async drop processing in
tests.rs where ctx.fetch_cloner.cloner() and clone_request_count() are used.

.await
.expect("timed out waiting for stale update to be dropped");

assert!(ctx.accounts_bank.get_account(&pubkey).is_none());
assert_eq!(ctx.fetch_cloner.cloner().clone_request_count(), 0);
}

#[tokio::test]
async fn test_subscription_update_for_unwatched_present_account_enqueues_removal(
) {
init_logger();
let pubkey = Pubkey::new_unique();
let validator_keypair = Keypair::new();
let owner = Pubkey::new_unique();
let bank_account = AccountSharedData::new(1_000_000, 0, &owner);

let ctx = setup(
[(pubkey, Account::from(bank_account.clone()))],
43,
validator_keypair,
)
.await;
ctx.accounts_bank.insert(pubkey, bank_account.clone());
let mut removed_rx = ctx
.remote_account_provider
.try_get_removed_account_rx()
.expect("removed account receiver should be available");

assert!(!ctx.remote_account_provider.is_watching(&pubkey));

ctx.subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey,
account: RemoteAccount::from_fresh_account(
Account::from(bank_account),
43,
RemoteAccountUpdateSource::Subscription,
),
})
.await
.unwrap();

let removed =
tokio::time::timeout(Duration::from_secs(1), removed_rx.recv())
.await
.expect("timed out waiting for removal notification");
assert_eq!(removed, Some(pubkey));
assert!(removed_rx.try_recv().is_err());
assert_eq!(ctx.fetch_cloner.cloner().clone_request_count(), 0);
}

// End-to-end variant of the test above that drives the cleanup through
// `process_subscription_update` (via the subscription channel) instead of
// invoking `cleanup_direct_subscription_for_delegated_account` directly.
Expand Down Expand Up @@ -2668,6 +2770,11 @@ async fn send_subscription_update_and_get_subscribed_programs(
};

accounts_bank.insert(account_pubkey, AccountSharedData::from(bank_account));
acquire_direct_subscription_for_update(
remote_account_provider,
&account_pubkey,
)
.await;

let pubsub_client = remote_account_provider.pubsub_client();
let initial_programs = pubsub_client.subscribed_program_ids();
Expand Down Expand Up @@ -2906,6 +3013,7 @@ async fn test_non_raw_eata_owned_account_subscription_update_stays_delegated() {
let ata_pubkey = derive_ata(&wallet_owner, &mint);

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
subscription_tx,
Expand All @@ -2928,6 +3036,11 @@ async fn test_non_raw_eata_owned_account_subscription_update_stays_delegated() {
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&account_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: account_pubkey,
Expand Down Expand Up @@ -2981,6 +3094,7 @@ async fn test_discovered_dlp_owned_account_without_delegation_record_falls_back(
};

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
subscription_tx,
..
Expand All @@ -2995,6 +3109,11 @@ async fn test_discovered_dlp_owned_account_without_delegation_record_falls_back(
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&account_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: account_pubkey,
Expand Down Expand Up @@ -3114,6 +3233,7 @@ async fn test_out_of_order_delegated_eata_subscription_update_still_projects_ata
let eata_account = create_eata_account(&wallet_owner, &mint, AMOUNT, true);

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
subscription_tx,
Expand Down Expand Up @@ -3141,6 +3261,11 @@ async fn test_out_of_order_delegated_eata_subscription_update_still_projects_ata
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&eata_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: eata_pubkey,
Expand Down Expand Up @@ -3193,6 +3318,7 @@ async fn test_out_of_order_delegated_eata_update_clones_action_dependencies() {
};

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
subscription_tx,
Expand Down Expand Up @@ -3224,6 +3350,11 @@ async fn test_out_of_order_delegated_eata_update_clones_action_dependencies() {
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&eata_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: eata_pubkey,
Expand Down Expand Up @@ -3286,6 +3417,7 @@ async fn test_subscription_update_with_delegation_actions_clones_dependencies()
};

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
subscription_tx,
Expand Down Expand Up @@ -3316,6 +3448,11 @@ async fn test_subscription_update_with_delegation_actions_clones_dependencies()
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&account_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: account_pubkey,
Expand Down Expand Up @@ -3375,6 +3512,7 @@ async fn test_delegated_eata_subscription_update_clones_raw_eata_and_projects_at
let eata_account = create_eata_account(&wallet_owner, &mint, AMOUNT, true);

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
subscription_tx,
Expand All @@ -3397,6 +3535,11 @@ async fn test_delegated_eata_subscription_update_clones_raw_eata_and_projects_at
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&eata_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: eata_pubkey,
Expand Down Expand Up @@ -3473,6 +3616,7 @@ async fn test_ata_subscription_update_projects_eata_when_chain_slot_lags() {
ata_account.data[64..72].copy_from_slice(&BASE_ATA_AMOUNT.to_le_bytes());

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
subscription_tx,
Expand Down Expand Up @@ -3502,6 +3646,11 @@ async fn test_ata_subscription_update_projects_eata_when_chain_slot_lags() {
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&ata_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: ata_pubkey,
Expand Down Expand Up @@ -3582,6 +3731,7 @@ async fn test_delegated_eata_subscription_update_clones_action_dependencies() {
};

let FetcherTestCtx {
remote_account_provider,
accounts_bank,
rpc_client,
subscription_tx,
Expand All @@ -3608,6 +3758,11 @@ async fn test_delegated_eata_subscription_update_clones_action_dependencies() {
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&eata_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: eata_pubkey,
Expand Down Expand Up @@ -4419,6 +4574,11 @@ async fn test_undelegating_projected_ata_subscription_update_stays_locked() {
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&ata_pubkey,
)
.await;
subscription_tx
.send(ForwardedSubscriptionUpdate {
pubkey: ata_pubkey,
Expand Down Expand Up @@ -5385,6 +5545,12 @@ async fn test_project_ata_skips_repeat_fetch_for_known_empty_eata() {
RemoteAccount, RemoteAccountUpdateSource,
};

acquire_direct_subscription_for_update(
&remote_account_provider,
&ata_pubkey,
)
.await;

let send_update = |slot| {
let ata_account = ata_account.clone();
let subscription_tx = subscription_tx.clone();
Expand Down
Loading
Loading