feat: implement pubsub connection pooling #931
Conversation
Amp-Thread-ID: https://ampcode.com/threads/T-019c330f-5ffe-720e-a1d5-c702b78de081 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c330f-e8f6-72c0-9e0e-93bbc1f1b9c2 Co-authored-by: Amp <amp@ampcode.com>
…scriptions Amp-Thread-ID: https://ampcode.com/threads/T-019c3702-6f33-74eb-89d9-fad10aac35ed Co-authored-by: Amp <amp@ampcode.com>
📝 Walkthrough
Replaces the single-connection PubSub abstraction with a new PubSubConnectionPool …
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In `@magicblock-chainlink/src/remote_account_provider/pubsub_common.rs`:
- Around line 26-36: The current brittle provider detection uses
pubsub_url.to_lowercase().contains("helius"); change it to parse the URL host
and base the detection on the hostname only (e.g., use
url::Url::parse(pubsub_url.as_str()) and check url.host_str().map(|h|
h.contains("helius")) ), then set per_stream_subscription_limit to
Some(HELIUS_PER_STREAM_SUBSCRIPTION_LIMIT) only when the host matches; keep
existing symbols pubsub_url, per_stream_subscription_limit, and
HELIUS_PER_STREAM_SUBSCRIPTION_LIMIT and ensure the parsing fallback preserves
the previous None behavior if parsing fails.
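A minimal sketch of that hostname-based check, assuming the url crate is available; the free-function shape and helper name are illustrative, and HELIUS_PER_STREAM_SUBSCRIPTION_LIMIT stands in for the existing constant (100 per the PR description):

```rust
use url::Url;

// Matches the 100-subscription limit mentioned in the PR description.
const HELIUS_PER_STREAM_SUBSCRIPTION_LIMIT: usize = 100;

// Detect a Helius endpoint by hostname instead of substring-matching the whole
// URL, falling back to None (no limit) when the URL cannot be parsed.
fn per_stream_subscription_limit(pubsub_url: &str) -> Option<usize> {
    let is_helius = Url::parse(pubsub_url)
        .ok()
        .and_then(|url| url.host_str().map(|host| host.contains("helius")))
        .unwrap_or(false);
    is_helius.then_some(HELIUS_PER_STREAM_SUBSCRIPTION_LIMIT)
}
```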
In `@magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs`:
- Around line 134-160: There’s a TOCTOU where two concurrent
find_or_create_connection calls can both miss capacity and each create a
connection; to avoid over-provisioning, after creating new_connection but before
pushing it, re-acquire the same lock and re-check for an available slot using
pick_connection (or otherwise reserve a slot): if pick_connection returns Some,
increment that slot’s sub_count and discard/close the newly-created connection
and return the existing one; otherwise push the new PooledConnection into
connections as you do now. Reference: find_or_create_connection,
pick_connection, PooledConnection, sub_count, connections. (A sketch of this re-check appears after this list.)
- Around line 128-130: The clear_connections method currently drops entries in
self.connections without unsubscribing active streams; add a doc comment on pub
fn clear_connections(&self) stating it does not perform graceful unsubscribe and
must only be called after subscriptions have been cancelled (e.g., by the
actor's try_reconnect), and mention the precondition that callers are
responsible for closing/ cancelling streams to avoid abruptly terminating active
subscriptions; reference clear_connections and try_reconnect in the comment so
maintainers know the required call order.
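For the find_or_create_connection race noted above, here is a simplified, self-contained sketch of the re-check-under-lock idea; Pool, PooledConnection, and install_or_reuse are stand-ins, and a tokio Mutex over a Vec is used purely for illustration (the PR's pool stores connections in scc::Queue):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use tokio::sync::Mutex;

struct PooledConnection<C> {
    connection: Arc<C>,
    sub_count: Arc<AtomicUsize>,
}

struct Pool<C> {
    connections: Mutex<Vec<PooledConnection<C>>>,
    per_connection_sub_limit: usize,
}

impl<C> Pool<C> {
    fn pick_connection<'a>(
        &self,
        conns: &'a [PooledConnection<C>],
    ) -> Option<&'a PooledConnection<C>> {
        conns
            .iter()
            .find(|c| c.sub_count.load(Ordering::SeqCst) < self.per_connection_sub_limit)
    }

    /// After the (slow) connection setup, re-check under the lock whether another
    /// caller already added capacity; if so, reuse that slot and drop the freshly
    /// created connection instead of over-provisioning the pool.
    async fn install_or_reuse(&self, new_connection: C) -> (Arc<C>, Arc<AtomicUsize>) {
        let mut conns = self.connections.lock().await;
        if let Some(existing) = self.pick_connection(&conns) {
            existing.sub_count.fetch_add(1, Ordering::SeqCst);
            // `new_connection` is dropped here without ever being pooled.
            return (
                Arc::clone(&existing.connection),
                Arc::clone(&existing.sub_count),
            );
        }
        let sub_count = Arc::new(AtomicUsize::new(1));
        let connection = Arc::new(new_connection);
        conns.push(PooledConnection {
            connection: Arc::clone(&connection),
            sub_count: Arc::clone(&sub_count),
        });
        (connection, sub_count)
    }
}
```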
In `@magicblock-chainlink/src/remote_account_provider/pubsub_connection.rs`:
- Around line 123-131: The current reconnect() handling silently returns Ok(())
when self.reconnect_guard.try_lock() fails, which misleads callers; change the
behavior so callers can observe the true outcome by either (A) returning a
distinct status/result when the lock is contended (e.g., an enum
ReconnectResult::InProgress) from the reconnect() method instead of Ok(()), or
(B) block on the mutex (use lock().await) so the caller shares the real
reconnect result; update the reconnect_guard usage and the callers of
reconnect() to handle the new result (refer to reconnect(), reconnect_guard,
try_lock, and RECONNECT_ATTEMPT_DELAY) and ensure the sleep path still delays
but returns an appropriate status/error rather than Ok(()). (A sketch of option (A) follows this list.)
- Around line 121-151: The reconnect() implementation on PubsubConnection (the
async fn reconnect in pubsub_connection.rs) is currently dead code because
PubSubConnectionPool never invokes reconnect() and instead uses
clear_connections() and T::new(); either mark the method as intentionally unused
with #[allow(dead_code)] or add a short doc comment explaining its
reserved/future use so linters and readers understand why it exists, or remove
the method to shrink the trait surface if you confirm no external consumers need
it; reference the reconnect method, PubSubConnectionPool, clear_connections, and
T::new() when making the change.
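For the reconnect() contention issue above, a minimal sketch of option (A); the ReconnectError placeholder, the delay value, and the struct layout are assumptions rather than the crate's actual definitions:

```rust
use std::time::Duration;
use tokio::sync::Mutex;

// Illustrative value; the crate's RECONNECT_ATTEMPT_DELAY may differ.
const RECONNECT_ATTEMPT_DELAY: Duration = Duration::from_millis(500);

/// Lets callers distinguish a completed attempt from one that was skipped
/// because another task already holds the reconnect guard.
pub enum ReconnectResult {
    Reconnected,
    InProgress,
}

// Placeholder standing in for the crate's real error type.
#[derive(Debug)]
pub struct ReconnectError;

pub struct Connection {
    reconnect_guard: Mutex<()>,
}

impl Connection {
    pub async fn reconnect(&self) -> Result<ReconnectResult, ReconnectError> {
        // Report contention instead of silently returning Ok(()).
        let Ok(_guard) = self.reconnect_guard.try_lock() else {
            tokio::time::sleep(RECONNECT_ATTEMPT_DELAY).await;
            return Ok(ReconnectResult::InProgress);
        };
        // ... perform the actual reconnect while holding the guard ...
        Ok(ReconnectResult::Reconnected)
    }
}
```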
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs`:
- Around line 77-138: account_subscribe and program_subscribe duplicate the same
find-or-create, error mapping, rollback, and wrap_unsub logic; introduce a
private helper (e.g., subscribe_with_pool) that calls
self.find_or_create_connection().await, maps errors to
PubsubClientError::SubscribeFailed, accepts an async closure (passed the chosen
connection Arc) to invoke either connection.account_subscribe or
connection.program_subscribe, and on success wraps the returned raw_unsub via
self.wrap_unsub(raw_unsub, sub_count) and on failure decrements sub_count
(sub_count.fetch_sub(1, Ordering::SeqCst)) before returning the error; then
refactor account_subscribe and program_subscribe to call this helper with the
appropriate subscribe closure. (A rough shape for this helper appears after this list.)
- Around line 150-159: The current non-atomic pattern in the Phase 1 block
(calling pick_connection(&guard) to get pooled_conn, then reading/subsequently
calling sub_count.fetch_add(1, Ordering::SeqCst)) can let concurrent callers
exceed per_connection_sub_limit; change the increment to an atomic CAS loop
(e.g., use sub_count.fetch_update or compare_exchange in a loop) that only
increments when the observed value is strictly less than
per_connection_sub_limit and otherwise treats the slot as full, so
pick_connection + atomic increment becomes a single atomic check-and-increment;
ensure you still return Ok((sub_count_handle,
Arc::clone(&pooled_conn.connection))) only after the CAS succeeds and fall back
to the existing creation/lookup paths when the CAS fails due to the limit.
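A minimal sketch of the atomic check-and-increment; the free-function shape and the name try_claim_slot are illustrative:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Only claims a slot when the observed count is strictly below the limit, so
/// concurrent subscribers cannot push a pooled connection past
/// per_connection_sub_limit.
fn try_claim_slot(sub_count: &AtomicUsize, per_connection_sub_limit: usize) -> bool {
    sub_count
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
            (current < per_connection_sub_limit).then_some(current + 1)
        })
        .is_ok()
}
```

pick_connection would then call try_claim_slot on each candidate, treat a false return as "slot full", and fall back to the existing creation path.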
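And for the account_subscribe / program_subscribe duplication noted in the first comment of this group, a rough shape for the shared helper; ClaimedSlot, SubscribeError, and the simplified signature are hypothetical, and the real helper would additionally call find_or_create_connection, map errors to PubsubClientError::SubscribeFailed, and wrap the returned unsubscribe via wrap_unsub:

```rust
use std::future::Future;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// Placeholder for the pubsub client's error type.
type SubscribeError = Box<dyn std::error::Error + Send + Sync>;

// What find_or_create_connection hands back: the chosen connection plus the
// slot counter that was already incremented for this subscription.
struct ClaimedSlot<C> {
    connection: Arc<C>,
    sub_count: Arc<AtomicUsize>,
}

/// Shared path for account_subscribe / program_subscribe: run the provided
/// subscribe future against the claimed connection and roll back the slot
/// count if it fails.
async fn subscribe_with_pool<C, F, Fut, S>(
    slot: ClaimedSlot<C>,
    subscribe: F,
) -> Result<S, SubscribeError>
where
    F: FnOnce(Arc<C>) -> Fut,
    Fut: Future<Output = Result<S, SubscribeError>>,
{
    match subscribe(Arc::clone(&slot.connection)).await {
        Ok(stream) => Ok(stream),
        Err(err) => {
            // Undo the reservation taken when the slot was claimed.
            slot.sub_count.fetch_sub(1, Ordering::SeqCst);
            Err(err)
        }
    }
}
```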
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs`:
- Around line 151-189: The pool can grow unbounded and the trace calls
connections.len() which is O(n); modify find_or_create_connection to enforce a
configurable max (e.g., add a max_connections field on PubSubConnectionPool and
check it under the new_connection_guard before creating a connection, returning
a clear RemoteAccountProvider error when exhausted), and avoid calling
scc::Queue::len() in trace by maintaining a cheap counter (e.g., an AtomicUsize
connection_count incremented when pushing a PooledConnection and decremented on
drop) and log that counter instead of self.connections.len(); update references
to new_connection_guard, connections, find_or_create_connection, and the trace
call accordingly.
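A minimal sketch of the bounded-pool bookkeeping described above; PoolLimits and its method names are illustrative, the capacity check would run while holding new_connection_guard, and find_or_create_connection would surface a RemoteAccountProvider error when has_capacity() returns false:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// A configurable ceiling plus a cheap counter so the trace log no longer
/// needs to call scc::Queue::len().
struct PoolLimits {
    max_connections: usize,
    connection_count: AtomicUsize,
}

impl PoolLimits {
    /// Called while holding new_connection_guard, so the check and the later
    /// increment cannot race with another creator.
    fn has_capacity(&self) -> bool {
        self.connection_count.load(Ordering::SeqCst) < self.max_connections
    }

    /// Called when a PooledConnection is pushed into the pool; returns the new
    /// count so the trace call can log it directly.
    fn note_created(&self) -> usize {
        self.connection_count.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Called when a PooledConnection is dropped (e.g. by clear_connections).
    fn note_dropped(&self) {
        self.connection_count.fetch_sub(1, Ordering::SeqCst);
    }
}
```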
Amp-Thread-ID: https://ampcode.com/threads/T-019c3830-1c06-7406-831c-fe2af2fd8314 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c3838-288d-732e-bcf0-bafe984b350b Co-authored-by: Amp <amp@ampcode.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs`:
- Around line 148-174: The reconnect() method currently calls conn.reconnect()
right after creating a connection via T::new(self.url.clone()), which is
redundant; remove the conn.reconnect().await? call and instead use the freshly
created conn directly to construct the PooledConnection (PooledConnection {
connection: Arc::new(conn), sub_count: Arc::new(AtomicUsize::new(0)) }) and push
that into self.connections; ensure error handling still returns
PubsubClientError::ConnectionClosed on T::new failure and keep the rest of
reconnect() logic unchanged.
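A simplified, self-contained sketch of the suggested reconnect() body; the stand-in trait, the error type, and the Vec used in place of scc::Queue are assumptions, not the PR's actual definitions:

```rust
use std::sync::{atomic::AtomicUsize, Arc};

// Placeholder mirroring the PubsubClientError::ConnectionClosed variant the
// comment refers to; the real type comes from the pubsub client crate.
#[derive(Debug)]
enum PubsubClientError {
    ConnectionClosed(String),
}

// Minimal stand-in for the pool's connection trait.
trait PubsubConnectionT: Sized {
    async fn new(url: String) -> Result<Self, String>;
}

struct PooledConnection<T> {
    connection: Arc<T>,
    sub_count: Arc<AtomicUsize>,
}

struct PubSubConnectionPool<T> {
    url: String,
    connections: Vec<PooledConnection<T>>, // the real pool uses scc::Queue
}

impl<T: PubsubConnectionT> PubSubConnectionPool<T> {
    async fn reconnect(&mut self) -> Result<(), PubsubClientError> {
        // T::new already yields a live connection, so no extra conn.reconnect()
        // call is needed before pooling it.
        let conn = T::new(self.url.clone())
            .await
            .map_err(PubsubClientError::ConnectionClosed)?;
        self.connections.push(PooledConnection {
            connection: Arc::new(conn),
            sub_count: Arc::new(AtomicUsize::new(0)),
        });
        Ok(())
    }
}
```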
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (1)
150-168: ⚠️ Potential issue | 🟠 Major
.expect() on mutex locks in new production code violates coding guidelines. Lines 156 and 161 use .expect(), which should be treated as a major issue per repository guidelines. While this pattern exists elsewhere in the file, new code should move toward proper error handling.
Additionally, this method holds both the subscriptions and program_subs mutex locks simultaneously (the MutexGuard temporaries live until .collect() completes). Although there is no deadlock today, since abort_and_signal_connection_issue acquires them sequentially, this dual-lock pattern is fragile. Consider draining each map separately to match the pattern used in drain_subscriptions.
Proposed fix: drain maps sequentially and handle poisoned locks

```diff
 fn unsubscribe_all(
     subscriptions: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
     program_subs: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
 ) {
-    let subs = subscriptions
-        .lock()
-        .expect("subscriptions lock poisoned")
-        .drain()
-        .chain(
-            program_subs
-                .lock()
-                .expect("program subs lock poisoned")
-                .drain(),
-        )
-        .collect::<Vec<_>>();
-    for (_, sub) in subs {
-        sub.cancellation_token.cancel();
+    fn drain_and_cancel(
+        map: Arc<Mutex<HashMap<Pubkey, AccountSubscription>>>,
+        label: &str,
+    ) {
+        let Ok(mut lock) = map.lock() else {
+            error!("{label} lock poisoned during unsubscribe_all");
+            return;
+        };
+        for (_, sub) in lock.drain() {
+            sub.cancellation_token.cancel();
+        }
     }
+    drain_and_cancel(subscriptions, "subscriptions");
+    drain_and_cancel(program_subs, "program_subs");
 }
```

As per coding guidelines, `{magicblock-*,programs,storage-proto}/**`: Treat any usage of .unwrap() or .expect() in production Rust code as a MAJOR issue.
🤖 Fix all issues with AI agents
In `@magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs`:
- Around line 776-777: In try_reconnect, the test-only unsubscribe() call is
missing the 2-second timeout used elsewhere and can hang on dead sockets; wrap
the unsubscribe().await invocation in the same
tokio::time::timeout(Duration::from_secs(2), unsubscribe()).await call (as done
at the other call-sites around lines 569 and 728) and ignore or log the timeout
result since unsubscribe resolves to (), ensuring the reconnect flow cannot
block indefinitely.
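A small sketch of the bounded call, assuming the log crate macros used elsewhere in the repo; the helper name and generic shape are illustrative:

```rust
use std::time::Duration;

/// Bound the test-only unsubscribe call the same way as the other call-sites,
/// so a dead socket cannot stall the reconnect flow. The future is passed in
/// here only to keep the sketch generic; in the actor it is the unsubscribe
/// future obtained from the subscription.
async fn unsubscribe_with_timeout<F>(unsubscribe: F)
where
    F: std::future::Future<Output = ()>,
{
    // unsubscribe resolves to (), so the timeout result only indicates whether
    // it finished in time; log and move on either way.
    if tokio::time::timeout(Duration::from_secs(2), unsubscribe)
        .await
        .is_err()
    {
        log::warn!("unsubscribe timed out during try_reconnect");
    }
}
```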
Amp-Thread-ID: https://ampcode.com/threads/T-019c419c-5b98-774b-b9d6-40733b964154 Co-authored-by: Amp <amp@ampcode.com>
* master: chore: add subscription activation and per-program metrics (#929)
bmuddha left a comment
Overall solid, with some minor nitpicks that I feel need to be addressed before merging.
Summary
Implement WebSocket connection pooling for PubSub subscriptions to improve resource efficiency
and connection management. Introduces a generic connection pool abstraction that efficiently
manages multiple concurrent subscriptions across pooled connections.
For Helius pubsub endpoints, the per-stream subscription limit is capped at 100.
Details
Connection Pooling Architecture
A new PubSubConnectionPool manages multiple WebSocket connections, distributing subscriptions across them based on per-stream limits. This prevents connection saturation and enables horizontal scaling of subscriptions.
Abstraction Layer
Extracted pubsub connection logic into a trait-based design (PubsubConnection), allowing the ChainPubsubActor to work with generic connection implementations. This improves testability and allows for different connection strategies.
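As a rough illustration of that trait-based design; the method names echo this description, but the signatures and struct layout below are assumptions, not the PR's actual code:

```rust
use std::sync::Arc;

// Hypothetical shape of the abstraction: the actor depends only on the trait,
// so tests can supply a mock connection while production uses the pooled one.
trait PubsubConnection: Send + Sync {
    async fn account_subscribe(&self, pubkey: &str) -> Result<(), String>;
    async fn program_subscribe(&self, program_id: &str) -> Result<(), String>;
}

// The actor is generic over whichever connection implementation it is given.
struct ChainPubsubActor<T: PubsubConnection> {
    connection: Arc<T>,
}
```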
Integration
- ChainPubsubActor now uses the pooled connection interface instead of direct client access
- ChainPubsubClient: pooling responsibilities removed
Testing
Added comprehensive test suite covering:
Summary by CodeRabbit
New Features
Improvements
Tests
Removals