fix: Adding lock to init relayer instances #622
Conversation
**Walkthrough**

This PR introduces distributed locking coordination for relayer initialization across multiple instances. It adds per-relayer staleness checks, implements locking-based initialization in persistent-storage mode, and falls back to direct initialization in in-memory mode. Redis sync metadata utilities track initialization timestamps to prevent redundant syncs, and a new repository API exposes storage connection details.
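The persistent-vs-in-memory fallback described above amounts to a dispatch on the repository's connection info. A minimal sketch of that shape (names are illustrative, not the PR's actual API):

```rust
// Sketch only: the real code dispatches on the repository's connection_info().
#[derive(Debug, PartialEq)]
enum InitMode {
    Locked, // persistent storage: coordinate via Redis locks
    Direct, // in-memory storage: no cross-instance coordination needed
}

// `connection_info` stands in for the new repository API, returning
// Some((client, prefix)) in persistent mode and None in in-memory mode.
fn choose_init_mode(connection_info: Option<(&str, &str)>) -> InitMode {
    match connection_info {
        Some(_) => InitMode::Locked,
        None => InitMode::Direct,
    }
}

fn main() {
    assert_eq!(choose_init_mode(Some(("redis://localhost", "oz"))), InitMode::Locked);
    assert_eq!(choose_init_mode(None), InitMode::Direct);
    println!("ok");
}
```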
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    participant Init as RelayerInitialization
    participant Repo as RelayerRepository
    participant Redis as Redis (Distributed Lock & Metadata)
    participant Service as RelayerService

    rect rgba(100, 150, 200, 0.5)
        note over Init,Service: Persistent Storage Mode (with Locking)
        Init->>Repo: connection_info()
        Repo-->>Init: Some((client, prefix))
        loop For each relayer
            Init->>Redis: is_relayer_recently_synced(prefix, relayer_id)
            Redis-->>Init: bool (recent sync check)
            alt Not Recently Synced
                Init->>Redis: Acquire per-relayer lock (TTL-based)
                alt Lock Acquired
                    Redis-->>Init: Lock acquired
                    Init->>Service: initialize(relayer_id)
                    Service-->>Init: Result
                    Init->>Redis: set_relayer_last_sync(prefix, relayer_id)
                    Init->>Redis: Release lock
                else Lock Held by Other Instance
                    Redis-->>Init: Lock contention
                    Init->>Init: Skip (SkippedLockHeld)
                end
            else Recently Synced
                Init->>Init: Skip (SkippedRecentSync)
            end
        end
    end

    rect rgba(150, 100, 200, 0.5)
        note over Init,Service: In-Memory Mode (no Locking)
        Init->>Repo: connection_info()
        Repo-->>Init: None
        loop For each relayer
            Init->>Service: initialize(relayer_id)
            Service-->>Init: Result
        end
    end
```
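The "recently synced" decision in the diagram reduces to a timestamp comparison against a staleness threshold. A minimal sketch, assuming the sync metadata stores an epoch-seconds timestamp per relayer (function and parameter names are illustrative, not the PR's actual API):

```rust
// Sketch: a relayer is "recently synced" when a sync timestamp exists and
// is younger than the staleness threshold (e.g. INIT_STALENESS_THRESHOLD_SECS).
fn is_recently_synced(
    last_sync_epoch_secs: Option<u64>,
    now_epoch_secs: u64,
    threshold_secs: u64,
) -> bool {
    match last_sync_epoch_secs {
        Some(ts) => now_epoch_secs.saturating_sub(ts) < threshold_secs,
        None => false, // never synced: always stale
    }
}

fn main() {
    // Synced 10s ago with a 60s threshold: fresh, skip re-initialization.
    assert!(is_recently_synced(Some(1_000), 1_010, 60));
    // Never synced: stale, proceed to lock acquisition.
    assert!(!is_recently_synced(None, 1_010, 60));
    // Synced 100s ago with a 60s threshold: stale.
    assert!(!is_recently_synced(Some(1_000), 1_100, 60));
    println!("ok");
}
```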
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)
**Codecov Report** ❌ Patch coverage is
Additional details and impacted files
Flags with carried forward coverage won't be shown.

```
@@            Coverage Diff             @@
##             main     #622      +/-   ##
==========================================
- Coverage   90.97%   90.27%   -0.71%
==========================================
  Files         288      289       +1
  Lines      118548   120108    +1560
==========================================
+ Hits       107852   108428     +576
- Misses      10696    11680     +984
==========================================
```
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/bootstrap/config_processor.rs`:
- Around lines 529-537: The current `wait_for_config_processing_complete` call ignores `poll_until`'s timeout result and always returns `Ok(())`, allowing startup to continue without config. Update `wait_for_config_processing_complete` to propagate `poll_until` errors (or convert a timeout into an `Err`) instead of unconditionally returning `Ok`, so the process fails fast when `is_redis_populated` polling times out. Locate the `poll_until` invocation in `wait_for_config_processing_complete` and return its result (or map a timeout into a meaningful error) so the caller cannot proceed with empty repositories.
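One way to implement the suggested fix is to map `poll_until`'s `Ok(false)` timeout into a hard error before returning. A minimal sketch (simplified and synchronous, with a string error type standing in for the project's error enum):

```rust
// Sketch: fail fast instead of swallowing the timeout.
fn ensure_config_populated(poll_result: Result<bool, String>) -> Result<(), String> {
    match poll_result {
        Ok(true) => Ok(()), // condition met: safe to proceed
        Ok(false) => Err("timed out waiting for config processing".to_string()),
        Err(e) => Err(e), // propagate poll errors unchanged
    }
}

fn main() {
    assert!(ensure_config_populated(Ok(true)).is_ok());
    assert!(ensure_config_populated(Ok(false)).is_err());
    assert!(ensure_config_populated(Err("boom".into())).is_err());
    println!("ok");
}
```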
In `@src/utils/time.rs`:
- Around lines 24-28: The docstring for `poll_until` is incorrect: it claims the function can return `Err` from the check closure, but `poll_until` logs errors from the check closure and continues polling, only returning `Ok(true)` when the condition is met or `Ok(false)` on timeout. Update the documentation to remove the `Err(_)` return variant, explicitly state that errors from the `check` closure are logged and ignored (they do not stop polling), and clearly document the actual return values (`Ok(true)` when the condition is met, `Ok(false)` on timeout).
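The corrected contract can be illustrated with a simplified synchronous version of `poll_until` (the real function is async; this sketch only demonstrates the documented return-value semantics):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Polls `check` until it returns Ok(true) or `max_wait` elapses.
/// Errors from `check` are logged and ignored; polling continues.
/// Returns true when the condition was met, false on timeout.
fn poll_until<F>(mut check: F, max_wait: Duration, interval: Duration) -> bool
where
    F: FnMut() -> Result<bool, String>,
{
    let deadline = Instant::now() + max_wait;
    loop {
        match check() {
            Ok(true) => return true,
            Ok(false) => {}
            Err(e) => eprintln!("poll check failed (ignored): {e}"),
        }
        if Instant::now() >= deadline {
            return false;
        }
        sleep(interval);
    }
}

fn main() {
    // Condition becomes true on the second call.
    let mut calls = 0;
    let met = poll_until(
        || {
            calls += 1;
            Ok(calls >= 2)
        },
        Duration::from_millis(100),
        Duration::from_millis(1),
    );
    assert!(met);

    // A check that always errors never stops polling early; it times out.
    let timed_out = !poll_until(
        || Err("transient".to_string()),
        Duration::from_millis(10),
        Duration::from_millis(1),
    );
    assert!(timed_out);
    println!("ok");
}
```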
🧹 Nitpick comments (3)
src/bootstrap/initialize_relayers.rs (3)
169-196: Consider lock expiration during long-running initialization. If initialization takes longer than `BOOTSTRAP_LOCK_TTL_SECS`, the lock could expire while initialization is still in progress, allowing another instance to start initializing concurrently. This is a trade-off: a longer TTL risks lock starvation if the holder crashes, while a shorter TTL risks concurrent initialization. The current approach with graceful degradation (proceeding on lock errors) provides resilience, but you may want to document this behavior or consider implementing lock renewal for very large deployments.
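The trade-off can be seen in a toy in-memory model of a TTL-based lock (the PR uses a Redis lock; names and the manual clock here are purely illustrative):

```rust
use std::collections::HashMap;

/// Toy model of a TTL-based lock keyed per relayer.
struct TtlLocks {
    expiries: HashMap<String, u64>, // key -> expiry time (epoch seconds)
}

impl TtlLocks {
    fn new() -> Self {
        Self { expiries: HashMap::new() }
    }

    /// Acquire succeeds if the key is free or the previous holder's TTL expired.
    fn try_acquire(&mut self, key: &str, now: u64, ttl: u64) -> bool {
        match self.expiries.get(key) {
            Some(&expiry) if expiry > now => false, // still held by another instance
            _ => {
                self.expiries.insert(key.to_string(), now + ttl);
                true
            }
        }
    }
}

fn main() {
    let mut locks = TtlLocks::new();
    // Instance A acquires at t=0 with a 30s TTL.
    assert!(locks.try_acquire("relayer-1", 0, 30));
    // Instance B is blocked while the TTL is live.
    assert!(!locks.try_acquire("relayer-1", 10, 30));
    // If A's initialization outlives the TTL, B acquires at t=40 even though
    // A may still be running: the concurrent-initialization risk noted above.
    assert!(locks.try_acquire("relayer-1", 40, 30));
    println!("ok");
}
```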
257-267: Consider adding concurrency limits for large deployments. Using `join_all` runs all relayer initializations concurrently without bounds. For deployments with many relayers, this could overwhelm Redis connections or external services. Consider using `futures::stream::iter(...).buffer_unordered(n)` to limit concurrent initializations:

♻️ Optional refactor with bounded concurrency

```diff
+use futures::stream::{self, StreamExt};
+
+const MAX_CONCURRENT_INITS: usize = 10;
+
 async fn run_initialization_batch<...>(...) -> Result<()> {
-    let futures = relayers.iter().map(|relayer| {
+    let results: Vec<_> = stream::iter(relayers.iter().map(|relayer| {
         let app_state = app_state.clone();
         let relayer_id = relayer.id.clone();
         async move {
             let result = initialize_relayer(relayer_id.clone(), app_state).await;
             (relayer_id, result)
         }
-    });
-
-    let results = futures::future::join_all(futures).await;
+    }))
+    .buffer_unordered(MAX_CONCURRENT_INITS)
+    .collect()
+    .await;
```
672-683: Consider using an environment variable for the test Redis URL. The Redis URL is hardcoded to `127.0.0.1:6379`. For CI/CD environments or developers using different Redis configurations, consider using an environment variable with a fallback:

♻️ Suggested improvement

```diff
 async fn create_test_redis_pool() -> Option<Arc<Pool>> {
-    let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:6379");
+    let redis_url = std::env::var("TEST_REDIS_URL")
+        .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
+    let cfg = deadpool_redis::Config::from_url(&redis_url);
```
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/bootstrap/config_processor.rs`:
- Around lines 507-545: The current `wait_for_config_processing_complete` uses `is_redis_populated` to decide completion, but that check ignores API keys, so minimal configs can time out. Update the completion check in `wait_for_config_processing_complete` (or extend `is_redis_populated`) to also query the `ApiKeyRepositoryTrait` from the provided app_state to detect whether any API keys exist (e.g., call a count/list method on the API key repo) and treat the presence of at least one API key as populated/complete. Keep the existing polling and timeout behaviour, but return completed=true when API keys are present to avoid false timeouts for empty relayer/signer/network/plugin sets.
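The suggested check could reduce to something like the following (hypothetical signature; the counts stand in for the actual repository queries):

```rust
// Sketch: treat storage as populated when any config entity exists,
// including API keys, so minimal configs don't falsely time out.
fn is_populated(relayers: usize, signers: usize, networks: usize, plugins: usize, api_keys: usize) -> bool {
    relayers > 0 || signers > 0 || networks > 0 || plugins > 0 || api_keys > 0
}

fn main() {
    // A config containing only an API key still counts as populated.
    assert!(is_populated(0, 0, 0, 0, 1));
    // Nothing processed yet: keep polling.
    assert!(!is_populated(0, 0, 0, 0, 0));
    println!("ok");
}
```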
🧹 Nitpick comments (3)
src/bootstrap/initialize_relayers.rs (3)
222-239: Consider adding debug logging when the wait completes successfully. The function discards the boolean result from `poll_until`, which means successful completion and timeout both proceed silently at this level. While `poll_until` logs warnings on timeout, adding a debug log on success would improve observability.

♻️ Suggested improvement

```diff
-    poll_until(
+    let completed = poll_until(
         || is_global_init_recently_completed(&conn, &prefix, INIT_STALENESS_THRESHOLD_SECS),
         max_wait,
         poll_interval,
         "initialization",
     )
     .await?;
+
+    if completed {
+        debug!("Another instance completed initialization, proceeding");
+    }
+
     Ok(())
```
257-267: Consider bounded concurrency for large relayer counts. Using `join_all` spawns all initialization tasks concurrently without limit. For deployments with many relayers, this could overwhelm connection pools or external services. Consider using `buffer_unordered` with a reasonable limit if large-scale deployments are expected.

♻️ Suggested approach (if needed in future)

```rust
use futures::stream::{self, StreamExt};

const MAX_CONCURRENT_INIT: usize = 10;

let results: Vec<_> = stream::iter(futures)
    .buffer_unordered(MAX_CONCURRENT_INIT)
    .collect()
    .await;
```
672-683: Consider using an environment variable for the Redis URL. The hardcoded `redis://127.0.0.1:6379` works for local testing but could be made configurable via an environment variable for flexibility in different test environments.

♻️ Suggested improvement

```diff
 async fn create_test_redis_pool() -> Option<Arc<Pool>> {
-    let cfg = deadpool_redis::Config::from_url("redis://127.0.0.1:6379");
+    let url = std::env::var("TEST_REDIS_URL")
+        .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
+    let cfg = deadpool_redis::Config::from_url(&url);
```
tirumerla
left a comment
lgtm, thanks. Added one comment.
zeljkoX
left a comment
Just tested it locally with 2 instances. Looks good.
Sai made a good comment. Let's see how to address it.
Hey @NicoMolinaOZ, I have merged the latest main and added changes to reuse the new env var. The same approach is used at other places where locks are used.
Summary
Testing Process
Checklist
Note
If you are using Relayer in your stack, consider adding your team or organization to our list of Relayer Users in the Wild!
Summary by CodeRabbit
New Features
Tests