refactor: committor service #1263
# Conflicts:
#	magicblock-accounts/src/scheduled_commits_processor.rs
#	magicblock-api/src/magic_validator.rs
📝 Walkthrough
This PR fundamentally redesigns the intent execution orchestration layer. The old actor-based
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-api/src/magic_sys_adapter.rs`:
- Around line 39-63: Replace the enter() + tokio::spawn + drop(_guard) pattern
in fetch_current_commit_nonces_sync with scheduling the task directly onto the
stored runtime Handle: clone self.handle into a local handle (e.g. let handle =
self.handle.clone()), remove the _guard and enter()/drop calls, and call
handle.spawn(async move { ... }) to run the async block that calls
committor_processor.fetch_current_commit_nonces(&pubkeys,
min_context_slot).await and then sender.send(result). This keeps
committor_processor, pubkeys and sender moved into the async block and avoids
relying on a runtime-guard lifetime.
In `@magicblock-api/src/magic_validator.rs`:
- Around line 423-424: Remove the stale NOTE comment that says "set during
[Self::start]" for the struct field intent_execution_service and update the
comment to reflect that intent_execution_service is constructed earlier in
try_from_config and moved into the struct at construction time (or simply remove
the NOTE altogether); locate the field declaration for intent_execution_service
in the MagicValidator struct and delete or replace the misleading comment so the
code accurately documents initialization happening in try_from_config rather
than Self::start.
- Around line 1009-1016: The shutdown sequence in MagicValidator::stop is wrong:
canceling self.token before stopping the committor/intent pipeline can cause
IntentExecutionService::result_processor to exit on cancellation and drop
in‑flight committor results; reorder shutdown so the committor/intent pipeline
is drained/stopped before calling self.token.cancel() (e.g., drain
processor.subscribe_for_results() or call intent_execution_service.stop() and
await it fully while ensuring result_processor consumes outstanding messages),
and replace the ignored let _ = self.intent_execution_service.stop().await; with
proper error handling/logging (capture the Result from
intent_execution_service.stop().await and log failures via your logger) so
shutdown errors aren’t discarded.
In `@magicblock-committor-service/src/committor_processor.rs`:
- Line 165: The .expect(POISONED_MUTEX_MSG) call on the mutex lock must not
panic; replace it with proper error handling by matching the lock result or
using lock().map_err(|e| CommittorServiceError::PoisonedLock(format!("{}: {:?}",
POISONED_MUTEX_MSG, e)))? (or an equivalent CommittorServiceError variant) at
both lock sites referenced (the call using POISONED_MUTEX_MSG and the other at
line 240), returning the error up the call chain; alternatively, if
you truly guarantee the lock cannot be poisoned, add an explicit invariant
comment near the lock location documenting why a panic is acceptable and keep
tests demonstrating the invariant.
- Around line 160-193: The code inserts oneshot senders into
self.pending_result_listeners while building `receivers` but doesn't remove them
on error paths, leaking entries and causing permanent `RepeatingMessageError`
for those ids; fix by tracking which ids were inserted while iterating
`intent_bundles` (e.g., push inserted `intent.id`s into a local Vec) and on any
early failure (duplicate detection or after a failing `schedule_intent_bundles`
call) remove those ids from the `pending_result_listeners` map before returning
(or use a scope guard/RAII to automatically rollback); ensure this cleanup
references the same mutex-protected map (`pending_result_listeners`) and handles
the oneshot senders you inserted so future calls can reuse those ids.
- Around line 98-112: The code currently calls
pending_result_listeners.lock().expect(POISONED_MUTEX_MSG) in
execute_intent_bundles and dispatcher; replace those panicking expects with
proper PoisonError handling: in execute_intent_bundles (the function that
returns a Result) map or match the Mutex::lock() result and convert a
PoisonError into a propagated Err (or a clear domain-specific error) instead of
panicking; in dispatcher (the async task spawned with tokio::spawn) match the
lock result and, on poisoning, log an error via the existing logger and
gracefully return/stop the spawned task rather than calling expect. Use the
MutexGuard from Ok(...) for normal flow and reference the same
pending_result_listeners Arc<Mutex<...>> to locate the calls.
In `@magicblock-committor-service/src/service.rs`:
- Line 187: The call to self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG)
uses expect on a potentially poisoned mutex; replace this panic-on-poison with
proper handling by mapping the PoisonError into a controlled Err return (or
propagate a custom error) from the surrounding function, or if you can guarantee
the invariant, add a clear comment documenting why poisoning cannot occur and
convert expect into unwrap_or_else with a documented unreachable!() guarded by
that invariant; specifically update the code paths where intents_meta_map.lock()
and the other occurrence at line ~305 are used (referencing
self.intents_meta_map.lock() and POISONED_MUTEX_MSG) to either return a Result
with an appropriate error variant or to assert the invariant in a
well-documented, reviewed comment rather than calling expect.
- Around line 184-207: When inserting ScheduledBaseIntentMeta entries into
intents_meta_map for each intent in intent_bundles, collect the inserted intent
IDs and ensure you remove those same IDs if
processor.schedule_intent_bundles(...) returns an error; specifically, capture
the inserted IDs (before calling process_undelegation_requests or immediately
after), call process_undelegation_requests, then call schedule_intent_bundles
inside a match/if-let so on Err(e) you lock intents_meta_map again and remove
each collected id (or use retain) to undo the inserts, then return the error;
reference intents_meta_map, ScheduledBaseIntentMeta::new, intent_bundles,
process_undelegation_requests, and processor.schedule_intent_bundles when
locating where to add the cleanup.
In `@magicblock-committor-service/src/service/intent_client.rs`:
- Around line 88-91: The code panics via expect when fetching the MagicContext
account (get_account(&MAGIC_CONTEXT_PUBKEY)) which must be handled or documented
as an invariant; replace the expect by explicit error handling in the function
that calls get_account (e.g., a match on
self.accounts_db.get_account(&MAGIC_CONTEXT_PUBKEY)) and return a proper
error variant (e.g., Err(Self::Error::MissingMagicContext) or map to an existing
error type) when the account is absent, or if a panic is truly intended add a
clear invariant comment next to MAGIC_CONTEXT_PUBKEY and magic_context_acc
explaining why the account is guaranteed to exist in production; ensure you
reference and update the function signature to propagate the error if needed.
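The poisoned-mutex findings above all ask for the same change: turn a failed `lock()` into a returned error instead of a panic. The following is a minimal std-only sketch of that pattern. `ServiceError::PoisonedLock`, `Listeners`, `register_listener`, and `poison` are hypothetical names introduced for illustration; the real code would use whatever `CommittorServiceError` variant the maintainers choose.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical error type standing in for CommittorServiceError.
#[derive(Debug, PartialEq)]
enum ServiceError {
    PoisonedLock(String),
}

// Hypothetical stand-in for the map behind `pending_result_listeners`.
type Listeners = Arc<Mutex<HashMap<u64, String>>>;

/// Registers a listener and returns an error, rather than panicking,
/// when the mutex has been poisoned by a panic on another thread.
fn register_listener(
    listeners: &Listeners,
    id: u64,
    name: &str,
) -> Result<(), ServiceError> {
    let mut guard = listeners.lock().map_err(|e| {
        ServiceError::PoisonedLock(format!("poisoned mutex: {}", e))
    })?;
    guard.insert(id, name.to_owned());
    Ok(())
}

/// Poisons the mutex by panicking on a helper thread while the guard is held.
/// Only used to demonstrate the error path.
fn poison(listeners: &Listeners) {
    let cloned = Arc::clone(listeners);
    let _ = thread::spawn(move || {
        let _guard = cloned.lock().unwrap();
        panic!("simulated failure while holding the lock");
    })
    .join();
}
```

Callers that return a `Result` can propagate the error with `?`; a spawned task would instead log it and return, as the dispatcher finding suggests.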
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: edccc0c8-3e2d-4553-b65f-f3d735a7d6c9
⛔ Files ignored due to path filters (2)
- Cargo.lock is excluded by !**/*.lock
- test-integration/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (27)
- magicblock-account-cloner/Cargo.toml
- magicblock-account-cloner/src/account_cloner.rs
- magicblock-account-cloner/src/lib.rs
- magicblock-account-cloner/src/util.rs
- magicblock-accounts/Cargo.toml
- magicblock-accounts/src/errors.rs
- magicblock-accounts/src/lib.rs
- magicblock-accounts/src/scheduled_commits_processor.rs
- magicblock-api/src/errors.rs
- magicblock-api/src/magic_sys_adapter.rs
- magicblock-api/src/magic_validator.rs
- magicblock-api/src/tickers.rs
- magicblock-committor-service/Cargo.toml
- magicblock-committor-service/src/committor_processor.rs
- magicblock-committor-service/src/compute_budget.rs
- magicblock-committor-service/src/error.rs
- magicblock-committor-service/src/lib.rs
- magicblock-committor-service/src/pubkeys_provider.rs
- magicblock-committor-service/src/service.rs
- magicblock-committor-service/src/service/intent_client.rs
- magicblock-committor-service/src/service_ext.rs
- magicblock-committor-service/src/stubs/changeset_committor_stub.rs
- magicblock-committor-service/src/stubs/mod.rs
- magicblock-committor-service/src/transactions.rs
- magicblock-processor/tests/replay_base_slot.rs
- test-integration/test-committor-service/Cargo.toml
- test-integration/test-committor-service/tests/test_ix_commit_local.rs
💤 Files with no reviewable changes (13)
- magicblock-committor-service/src/pubkeys_provider.rs
- magicblock-account-cloner/src/lib.rs
- magicblock-committor-service/src/stubs/mod.rs
- magicblock-accounts/src/scheduled_commits_processor.rs
- magicblock-committor-service/src/stubs/changeset_committor_stub.rs
- magicblock-accounts/Cargo.toml
- magicblock-committor-service/src/service_ext.rs
- magicblock-account-cloner/src/util.rs
- magicblock-account-cloner/src/account_cloner.rs
- magicblock-account-cloner/Cargo.toml
- magicblock-accounts/src/lib.rs
- magicblock-committor-service/src/transactions.rs
- magicblock-committor-service/src/compute_budget.rs
fn fetch_current_commit_nonces_sync(
    &self,
    pubkeys: &[Pubkey],
    min_context_slot: u64,
) -> std::sync::mpsc::Receiver<TaskInfoFetcherResult<HashMap<Pubkey, u64>>>
{
    let (sender, receiver) = std::sync::mpsc::channel();
    let committor_processor = self.committor_processor.clone();
    let pubkeys = pubkeys.to_owned();

    // This is required to switch from TransactionExecutor runtime
    // blocking on it would cause a panic
    let _guard = self.handle.enter();
    tokio::spawn(async move {
        let result = committor_processor
            .fetch_current_commit_nonces(&pubkeys, min_context_slot)
            .await;
        if let Err(err) = sender.send(result) {
            error!(error = ?err, "Failed to send result back");
        }
    });
    drop(_guard);

    receiver
}
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Prefer handle.spawn over enter() + tokio::spawn + drop.
The enter()/tokio::spawn/drop(_guard) dance works, but its correctness silently depends on the guard being live exactly around the spawn call. Handle::spawn schedules directly onto the target runtime without mutating the current thread's runtime context, removing that footgun and the explanatory comment dependency.
♻️ Proposed refactor
- // This is required to switch from TransactionExecutor runtime
- // blocking on it would cause a panic
- let _guard = self.handle.enter();
- tokio::spawn(async move {
- let result = committor_processor
- .fetch_current_commit_nonces(&pubkeys, min_context_slot)
- .await;
- if let Err(err) = sender.send(result) {
- error!(error = ?err, "Failed to send result back");
- }
- });
- drop(_guard);
-
- receiver
+ // Spawn on the validator runtime handle rather than the caller's
+ // (TransactionExecutor) runtime.
+ self.handle.spawn(async move {
+ let result = committor_processor
+ .fetch_current_commit_nonces(&pubkeys, min_context_slot)
+ .await;
+ if let Err(err) = sender.send(result) {
+ error!(error = ?err, "Failed to send result back");
+ }
+ });
+
+ receiver
// NOTE: set during [Self::start]
slot_ticker: None,
committor_service,
intent_execution_service,
Stale comment now misleads.
intent_execution_service is built earlier in try_from_config (line 232) and moved in here at construction time — it is not "set during Self::start". This NOTE is leftover from a field that was previously initialized lazily; remove it to avoid confusion.
📝 Proposed fix
_metrics: (metrics_service, system_metrics_ticker),
- // NOTE: set during [Self::start]
intent_execution_service,
// Ordering is important here
// Commitor service shall be stopped last
self.token.cancel();
if let Some(ref scheduled_commits_processor) =
    self.scheduled_commits_processor
{
    let step_start = Instant::now();
    scheduled_commits_processor.stop();
    log_timing(
        "shutdown",
        "scheduled_commits_processor_stop",
        step_start,
    );
}
if let Some(ref committor_service) = self.committor_service {
    let step_start = Instant::now();
    committor_service.stop();
    log_timing("shutdown", "committor_service_stop", step_start);
}

let step_start = Instant::now();
// TODO(edwin): handle returned errs, at least log
let _ = self.intent_execution_service.stop().await;
log_timing("shutdown", "intent_execution_service_stop", step_start);
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Inspect the referenced section
sed -n '980,1045p' magicblock-api/src/magic_validator.rs | nl -ba
# Search for other occurrences of the committor/intent stop ordering comments
rg -n "commitor|committor|intent_execution_service|shutdown|stop\(\)\.await|Failed to stop" magicblock-api/src/magic_validator.rs
# Search for similar shutdown patterns in the repo
rg -n "intent_execution_service.*stop\(\)\.await|token\.cancel\(\)|committor.*stop\(\)" magicblock-api/src
Repository: magicblock-labs/magicblock-validator
Length of output: 123
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Show the referenced section with line numbers using awk
sed -n '980,1045p' magicblock-api/src/magic_validator.rs | awk '{print FNR+979 "\t" $0}'
# Search for related shutdown/order comments and stop() usage in this file
rg -n "commitor|committor|intent_execution_service|token\.cancel\(\)|shutdown|TODO\(edwin\): handle returned errs|stop\(\)\.await|Failed to stop" magicblock-api/src/magic_validator.rs
# Search for similar stop() patterns elsewhere
rg -n "stop\(\)\.await" magicblock-api/src | head -n 50
rg -n "token\.cancel\(\)" magicblock-api/src
rg -n "committor" magicblock-api/src
Repository: magicblock-labs/magicblock-validator
Length of output: 8211
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find the stop() implementation for IntentExecutionServiceImpl
rg -n "impl\s+IntentExecutionServiceImpl|struct\s+IntentExecutionServiceImpl" -S .
# Look for the stop() method definition on IntentExecutionServiceImpl
rg -n "fn\s+stop\s*\(" -S magicblock-*/src magicblock-*/ -g'*.rs' || true
rg -n "intent_execution_service.*stop|IntentExecutionService.*stop" -S . -g'*.rs' || true
# Find committor service shutdown / stop / join methods
rg -n "committor.*(stop|shutdown|cancel|join)" -S . -g'*.rs' || true
# Specifically search for "Commitor service shall be stopped last" and nearby code references
rg -n "Commitor service shall be stopped last" -S . -g'*.rs' || true
# Show the remainder of the stop() method for any committor-related logic (roughly 1004-1105 already partially)
sed -n '1000,1120p' magicblock-api/src/magic_validator.rs | cat
Repository: magicblock-labs/magicblock-validator
Length of output: 62
🏁 Script executed:
#!/bin/bash
set -u
# Where is the committor service crate?
ls -la
fd -t d -a magicblock_committor_service . || true
fd -t f -i "intent_execution_service" . || true
fd -t f -g'*.rs' magicblock_committor_service . || true
# Find the IntentExecutionServiceImpl type and its stop() method
rg -n "struct\s+IntentExecutionServiceImpl|enum\s+IntentExecutionServiceImpl|type\s+IntentExecutionServiceImpl" -S . -g'*.rs' || true
rg -n "impl\s+.*IntentExecutionServiceImpl" -S . -g'*.rs' || true
rg -n "fn\s+stop\s*\(" -S . -g'*.rs' | rg -n "IntentExecutionServiceImpl|IntentExecutionService" || true
# Find where committor processor/service is stopped/joined/cancelled
rg -n "committor.*(stop|shutdown|cancel|join)" -S . -g'*.rs' || true
rg -n "token\.cancel\(\)" -S . -g'*.rs' || true
# Ensure we still have the original comment text available
rg -n "Commitor service shall be stopped last|Commitor service" -S . -g'*.rs' || true
Repository: magicblock-labs/magicblock-validator
Length of output: 4912
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Show full shutdown/stop method body around the referenced lines
sed -n '930,1110p' magicblock-api/src/magic_validator.rs | nl -ba || sed -n '930,1110p' magicblock-api/src/magic_validator.rs | head -n 220
# Inspect the IntentExecutionServiceImpl alias definition
sed -n '980,1030p' magicblock-api/src/magic_validator.rs
# Inspect committor-service stop() implementation
sed -n '1,220p' magicblock-committor-service/src/service.rs
sed -n '220,520p' magicblock-committor-service/src/service.rs
# Find the exact stop() method in service.rs with surrounding context
rg -n "pub async fn stop\\(" magicblock-committor-service/src/service.rs
rg -n "IntentExecutionServiceError" magicblock-committor-service/src/service.rs
# Also search where the validator shutdown comment is referenced (if any)
rg -n "Commitor service shall be stopped last" -S .
Repository: magicblock-labs/magicblock-validator
Length of output: 24293
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find where transaction_execution is declared/initialized
rg -n "transaction_execution" magicblock-api/src/magic_validator.rs
# Show the struct fields around those declarations
sed -n '1,220p' magicblock-api/src/magic_validator.rs
sed -n '220,560p' magicblock-api/src/magic_validator.rs
# Show where transaction_execution is created/spawned (search nearby)
# We'll print a wider range around matches for transaction_execution
rg -n "transaction_execution" magicblock-api/src/magic_validator.rs | cut -d: -f1 | sort -n | uniq | while read -r line; do
  start=$((line-40)); end=$((line+80));
  echo "---- around $line ----"
  sed -n "${start},${end}p" magicblock-api/src/magic_validator.rs
done
Repository: magicblock-labs/magicblock-validator
Length of output: 49128
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Locate CommittorProcessor implementation
rg -n "struct\s+CommittorProcessor|impl\s+CommittorProcessor|subscribe_for_results" -S magicblock-committor-service magicblock-committor-program magicblock-api magicblock-core magicblock-processor -g'*.rs'
# Show relevant parts around subscribe_for_results / result channel behavior
rg -n "subscribe_for_results" -S magicblock-committor-service -g'*.rs'
# Find BroadcastedIntentExecutionResult and how it is sent
rg -n "BroadcastedIntentExecutionResult" -S magicblock-committor-service magicblock-api -g'*.rs'
# Search for where cancellation is used in committor processor / workers
rg -n "CancellationToken" -S magicblock-committor-service -g'*.rs'
# Additionally, inspect transaction scheduler shutdown handling to see if it drains in-flight results
rg -n "shutdown|CancellationToken|cancelled\(\)" -S magicblock-processor magicblock-core magicblock-api/src -g'*.rs' | head -n 80
Repository: magicblock-labs/magicblock-validator
Length of output: 7896
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Inspect committor processor / scheduler shutdown behavior
sed -n '1,260p' magicblock-committor-service/src/committor_processor.rs
sed -n '260,520p' magicblock-committor-service/src/committor_processor.rs
sed -n '1,220p' magicblock-committor-service/src/intent_execution_manager.rs
sed -n '1,240p' magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs
# 2) Inspect IntentExecutionService stop wiring (already partly known, but confirm token/cancel semantics)
sed -n '1,220p' magicblock-committor-service/src/service.rs
sed -n '220,420p' magicblock-committor-service/src/service.rs
# 3) Inspect InternalIntentRpcClient implementation to see what "notify_commit_sent" does
rg -n "struct\s+InternalIntentRpcClient|InternalIntentRpcClient" magicblock-api/src -g'*.rs'
rg -n "notify_commit_sent" magicblock-api/src -g'*.rs'
# Print the file(s) containing InternalIntentRpcClient + notify_commit_sent
fd -i "intent_client.rs" magicblock-api/src || true
rg -n "pub\s+struct\s+InternalIntentRpcClient|impl\s+InternalIntentRpcClient" magicblock-api/src -g'*.rs' || true
rg -n "notify_commit_sent" magicblock-api/src -g'*.rs' || true
Repository: magicblock-labs/magicblock-validator
Length of output: 34949
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find notify_commit_sent implementation
rg -n "notify_commit_sent" magicblock-api/src -g'*.rs'
# Find where BroadcastedIntentExecutionResult is sent into broadcast channel
rg -n "result_sender\.send\(|broadcast::Sender" magicblock-committor-service/src/intent_execution_manager -g'*.rs'
rg -n "result_sender" magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs -g'*.rs'
# Inspect the remainder of intent_execution_engine.rs where execute() is implemented/sends results
sed -n '240,420p' magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs
sed -n '420,620p' magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs
# Search for any explicit shutdown/cancellation token usage in the committor intent execution engine
rg -n "CancellationToken|cancelled\(\)|shutdown" magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs -g'*.rs'
Repository: magicblock-labs/magicblock-validator
Length of output: 62
🏁 Script executed:
#!/bin/bash
set -u
# Find notify_commit_sent implementation (should be in InternalIntentRpcClient)
rg -n "notify_commit_sent" magicblock-api/src -g'*.rs' || true
# Find where BroadcastedIntentExecutionResult is sent into broadcast channel
rg -n "result_sender\.send\(|result_sender\.send" magicblock-committor-service/src/intent_execution_manager -g'*.rs' || true
rg -n "BroadcastedIntentExecutionResult::new" magicblock-committor-service/src/intent_execution_manager -g'*.rs' || true
# Inspect parts of intent_execution_engine.rs that likely contain the send
sed -n '1,220p' magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs
sed -n '220,420p' magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs
sed -n '420,720p' magicblock-committor-service/src/intent_execution_manager/intent_execution_engine.rs
# Inspect InternalIntentRpcClient file(s) for send/commit behavior
fd -i "intent_client.rs" magicblock-api/src || true
rg -n "struct\s+InternalIntentRpcClient|impl\s+InternalIntentRpcClient" magicblock-api/src -g'*.rs' || true
rg -n "InternalIntentRpcClient" magicblock-api/src -g'*.rs' | head -n 50 || true
Repository: magicblock-labs/magicblock-validator
Length of output: 25787
Fix shutdown ordering and log `intent_execution_service.stop()` errors in `MagicValidator::stop`
- The shutdown comment says “Commitor service shall be stopped last”, but `MagicValidator::stop` cancels `self.token` and then stops `intent_execution_service`; `IntentExecutionService`’s `result_processor` returns immediately on `cancellation_token.cancelled()` without draining `processor.subscribe_for_results()`, so any in-flight committor results can be lost before the committor side is fully shut down. Confirm the intended invariant and adjust shutdown sequencing/draining accordingly.
- `let _ = self.intent_execution_service.stop().await;` discards the `Result`; log/handle the error instead.
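The ordering concern can be illustrated with a small std-only sketch: the consumer keeps reading until the producer side has closed the channel, and the shared cancellation flag is set only after the consumer has been joined. The validator itself uses tokio channels and a cancellation token; the thread-based `result_processor` and `shutdown_in_order` below are hypothetical stand-ins that show only the sequencing.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical consumer: processes results until the channel is closed.
/// The cancellation flag is only honoured while the channel is idle,
/// so buffered results are never dropped.
fn result_processor(results: Receiver<u64>, cancelled: Arc<AtomicBool>) -> Vec<u64> {
    let mut processed = Vec::new();
    loop {
        match results.recv_timeout(Duration::from_millis(10)) {
            Ok(result) => processed.push(result),
            // All senders are gone and the buffer is empty: fully drained.
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => {
                if cancelled.load(Ordering::SeqCst) {
                    break;
                }
            }
        }
    }
    processed
}

/// Shutdown sequence: stop the producer, drain the consumer, then cancel.
fn shutdown_in_order(in_flight: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    let cancelled = Arc::new(AtomicBool::new(false));
    let consumer = {
        let cancelled = Arc::clone(&cancelled);
        thread::spawn(move || result_processor(rx, cancelled))
    };

    for result in in_flight {
        tx.send(result).expect("consumer is still running");
    }

    // 1. Stop the producer: dropping the sender closes the channel.
    drop(tx);
    // 2. Wait for the consumer to drain everything that was buffered.
    let processed = consumer.join().expect("consumer thread panicked");
    // 3. Only now signal cancellation to the remaining components.
    cancelled.store(true, Ordering::SeqCst);
    processed
}
```

Whether the real pipeline should drain or deliberately drop in-flight results on shutdown is the invariant the comment asks the authors to confirm; the sketch assumes draining is wanted.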
📝 Proposed fix for the ignored error
let step_start = Instant::now();
- // TODO(edwin): handle returned errs, at least log
- let _ = self.intent_execution_service.stop().await;
+ if let Err(err) = self.intent_execution_service.stop().await {
+ error!(error = ?err, "Failed to stop intent execution service");
+ }
log_timing("shutdown", "intent_execution_service_stop", step_start);
// Critical section
let receivers = {
    let mut result_listeners = self
        .pending_result_listeners
        .lock()
        .expect(POISONED_MUTEX_MSG);

    intent_bundles
        .iter()
        .map(|intent| {
            let (sender, receiver) = oneshot::channel();
            match result_listeners.entry(intent.id) {
                Entry::Vacant(vacant) => {
                    vacant.insert(sender);
                    Ok(receiver)
                }
                Entry::Occupied(_) => {
                    Err(CommittorServiceError::RepeatingMessageError(
                        intent.id,
                    ))
                }
            }
        })
        .collect::<Result<Vec<_>, _>>()?
};

self.schedule_intent_bundles(intent_bundles).await?;
let results = join_all(receivers.into_iter())
    .await
    .into_iter()
    .collect::<Result<Vec<_>, RecvError>>()?;

Ok(results)
}
Senders leak in pending_result_listeners on the error paths, permanently poisoning those message_ids.
In the critical section, senders are inserted into the map as entries are found vacant. If a later entry is Occupied (duplicate), collect short-circuits with RepeatingMessageError, but the senders already inserted for earlier ids remain in the map. Likewise, if schedule_intent_bundles fails (Line 186 ?), all inserted senders stay. In both cases the dispatcher never receives a matching result, so the senders are never removed: memory leaks, and any future call reusing those ids will fail forever with RepeatingMessageError. Roll back inserted ids on every early return.
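As an alternative to rolling back by hand on each early return, the cleanup can be tied to a scope guard so that every error path, including `?`, removes the ids inserted by the current call. The sketch below is std-only and uses hypothetical names (`RollbackGuard`, `register_all`, a `String` payload, and `std::sync::mpsc` in place of the oneshot channel); it illustrates the technique and is not the code from this PR.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the `pending_result_listeners` map.
type Listeners = Arc<Mutex<HashMap<u64, Sender<String>>>>;

/// Removes the ids it tracks from the map when dropped, unless disarmed.
struct RollbackGuard {
    listeners: Listeners,
    inserted: Vec<u64>,
    armed: bool,
}

impl RollbackGuard {
    /// Called on the success path so the inserted entries are kept.
    fn disarm(mut self) {
        self.armed = false;
    }
}

impl Drop for RollbackGuard {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        // Recover the inner map even if poisoned so cleanup still runs.
        let mut map = self.listeners.lock().unwrap_or_else(|e| e.into_inner());
        for id in &self.inserted {
            map.remove(id);
        }
    }
}

/// Registers one listener per id, then runs `schedule`.
/// Any failure rolls back only the ids inserted by this call.
fn register_all(
    listeners: &Listeners,
    ids: &[u64],
    schedule: impl FnOnce() -> Result<(), String>,
) -> Result<Vec<Receiver<String>>, String> {
    let mut guard = RollbackGuard {
        listeners: Arc::clone(listeners),
        inserted: Vec::new(),
        armed: true,
    };
    let mut receivers = Vec::with_capacity(ids.len());
    {
        // The lock guard is declared after `guard`, so it is released
        // before `guard` is dropped on an early return.
        let mut map = listeners.lock().map_err(|e| e.to_string())?;
        for &id in ids {
            match map.entry(id) {
                Entry::Vacant(slot) => {
                    let (tx, rx) = mpsc::channel();
                    slot.insert(tx);
                    guard.inserted.push(id);
                    receivers.push(rx);
                }
                Entry::Occupied(_) => {
                    return Err(format!("repeating message id {}", id));
                }
            }
        }
    }
    schedule()?;
    guard.disarm();
    Ok(receivers)
}
```

Because the guard records only ids that were vacant, an entry registered by an earlier, still-pending call is left untouched when a duplicate is detected.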
🛡️ Proposed cleanup on error paths
pub async fn execute_intent_bundles(
&self,
intent_bundles: Vec<ScheduledIntentBundle>,
) -> CommittorServiceResult<Vec<BroadcastedIntentExecutionResult>> {
+ let ids: Vec<u64> = intent_bundles.iter().map(|i| i.id).collect();
// Critical section
let receivers = {
let mut result_listeners = self
.pending_result_listeners
.lock()
.expect(POISONED_MUTEX_MSG);
- intent_bundles
- .iter()
- .map(|intent| {
- let (sender, receiver) = oneshot::channel();
- match result_listeners.entry(intent.id) {
- Entry::Vacant(vacant) => {
- vacant.insert(sender);
- Ok(receiver)
- }
- Entry::Occupied(_) => {
- Err(CommittorServiceError::RepeatingMessageError(
- intent.id,
- ))
- }
- }
- })
- .collect::<Result<Vec<_>, _>>()?
+ let mut receivers = Vec::with_capacity(ids.len());
+ for (idx, &id) in ids.iter().enumerate() {
+ match result_listeners.entry(id) {
+ Entry::Vacant(vacant) => {
+ let (sender, receiver) = oneshot::channel();
+ vacant.insert(sender);
+ receivers.push(receiver);
+ }
+ Entry::Occupied(_) => {
+ // roll back senders inserted in this call
+ for prev in &ids[..idx] {
+ result_listeners.remove(prev);
+ }
+ return Err(
+ CommittorServiceError::RepeatingMessageError(id),
+ );
+ }
+ }
+ }
+ receivers
};
- self.schedule_intent_bundles(intent_bundles).await?;
+ if let Err(err) = self.schedule_intent_bundles(intent_bundles).await {
+ let mut listeners =
+ self.pending_result_listeners.lock().expect(POISONED_MUTEX_MSG);
+ for id in &ids {
+ listeners.remove(id);
+ }
+ return Err(err);
+ }
let results = join_all(receivers.into_iter())🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@magicblock-committor-service/src/committor_processor.rs` around lines 160 -
193, The code inserts oneshot senders into self.pending_result_listeners while
building `receivers` but doesn't remove them on error paths, leaking entries and
causing permanent `RepeatingMessageError` for those ids; fix by tracking which
ids were inserted while iterating `intent_bundles` (e.g., push inserted
`intent.id`s into a local Vec) and on any early failure (duplicate detection or
after a failing `schedule_intent_bundles` call) remove those ids from the
`pending_result_listeners` map before returning (or use a scope guard/RAII to
automatically rollback); ensure this cleanup references the same mutex-protected
map (`pending_result_listeners`) and handles the oneshot senders you inserted so
future calls can reuse those ids.
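The scope-guard option mentioned in the prompt above could look roughly like the following. This is a minimal sketch under stated assumptions, not code from the PR: `ListenerRollback` and `register_all` are hypothetical names, the map is reduced to `Mutex<HashMap<u64, T>>` with a generic `T` standing in for the oneshot sender, and the duplicate id is returned as a plain `u64` rather than `CommittorServiceError::RepeatingMessageError`.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical guard: on drop, removes the ids inserted by this call
/// unless `disarm` was called first.
struct ListenerRollback<'a, T> {
    map: &'a Mutex<HashMap<u64, T>>,
    inserted: Vec<u64>,
    armed: bool,
}

impl<'a, T> ListenerRollback<'a, T> {
    fn new(map: &'a Mutex<HashMap<u64, T>>) -> Self {
        Self { map, inserted: Vec::new(), armed: true }
    }

    /// Call once scheduling has succeeded; the entries then stay in the map.
    fn disarm(mut self) {
        self.armed = false;
    }
}

impl<T> Drop for ListenerRollback<'_, T> {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        // Recover the map even if the mutex is poisoned, so Drop never panics.
        let mut map = self.map.lock().unwrap_or_else(|e| e.into_inner());
        for id in &self.inserted {
            map.remove(id);
        }
    }
}

/// Registers one listener per id. On a duplicate, returns `Err(id)` and the
/// guard rolls back only the ids inserted by this call.
fn register_all<'a, T>(
    map: &'a Mutex<HashMap<u64, T>>,
    ids: &[u64],
    mut make: impl FnMut() -> T,
) -> Result<ListenerRollback<'a, T>, u64> {
    let mut guard = ListenerRollback::new(map);
    let duplicate = {
        let mut locked = map.lock().unwrap_or_else(|e| e.into_inner());
        let mut duplicate = None;
        for &id in ids {
            match locked.entry(id) {
                Entry::Vacant(vacant) => {
                    vacant.insert(make());
                    guard.inserted.push(id);
                }
                Entry::Occupied(_) => {
                    duplicate = Some(id);
                    break;
                }
            }
        }
        duplicate
    }; // the lock is released here, before the guard can be dropped

    match duplicate {
        Some(id) => Err(id), // `guard` drops at function exit and rolls back
        None => Ok(guard),
    }
}
```

A caller would keep the returned guard alive across the `schedule_intent_bundles(...).await` call and call `disarm()` only once scheduling has succeeded; any early `?` return then removes the ids this call inserted. Because `Drop` takes the same mutex, the guard must not be dropped while that lock is held.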
// Add metas for intent we schedule
let pubkeys_being_undelegated = {
    let mut intent_metas =
        self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG);
    let mut pubkeys_being_undelegated = HashSet::<Pubkey>::new();

- fn subscribe_for_results(
-     &self,
- ) -> oneshot::Receiver<broadcast::Receiver<BroadcastedIntentExecutionResult>>
- {
-     let (tx, rx) = oneshot::channel();
-     self.try_send(CommittorMessage::SubscribeForResults { respond_to: tx });
-     rx
    intent_bundles.iter().for_each(|intent| {
        intent_metas
            .insert(intent.id, ScheduledBaseIntentMeta::new(intent));
        if let Some(undelegate) = intent.get_undelegate_intent_pubkeys()
        {
            pubkeys_being_undelegated.extend(undelegate);
        }
    });

    pubkeys_being_undelegated.into_iter().collect::<Vec<_>>()
};

self.process_undelegation_requests(pubkeys_being_undelegated)
    .await;
self.processor
    .schedule_intent_bundles(intent_bundles)
    .await?;
Ok(())
Intent metas leak from intents_meta_map when scheduling fails.
Metas are inserted at Lines 190-197, but if processor.schedule_intent_bundles errors at Line 204, the ? returns without removing them. Since the matching execution result never arrives, result_processor never removes those entries, so the map grows unbounded across repeated scheduling failures. Remove the just-inserted ids before propagating the error.
🛡️ Proposed cleanup
self.process_undelegation_requests(pubkeys_being_undelegated)
.await;
- self.processor
- .schedule_intent_bundles(intent_bundles)
- .await?;
- Ok(())
+ let ids: Vec<u64> = intent_bundles.iter().map(|i| i.id).collect();
+ if let Err(err) =
+ self.processor.schedule_intent_bundles(intent_bundles).await
+ {
+ let mut metas =
+ self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG);
+ for id in &ids {
+ metas.remove(id);
+ }
+ return Err(err);
+ }
+ Ok(())
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- // Add metas for intent we schedule
- let pubkeys_being_undelegated = {
-     let mut intent_metas =
-         self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG);
-     let mut pubkeys_being_undelegated = HashSet::<Pubkey>::new();
- fn subscribe_for_results(
-     &self,
- ) -> oneshot::Receiver<broadcast::Receiver<BroadcastedIntentExecutionResult>>
- {
-     let (tx, rx) = oneshot::channel();
-     self.try_send(CommittorMessage::SubscribeForResults { respond_to: tx });
-     rx
-     intent_bundles.iter().for_each(|intent| {
-         intent_metas
-             .insert(intent.id, ScheduledBaseIntentMeta::new(intent));
-         if let Some(undelegate) = intent.get_undelegate_intent_pubkeys()
-         {
-             pubkeys_being_undelegated.extend(undelegate);
-         }
-     });
-     pubkeys_being_undelegated.into_iter().collect::<Vec<_>>()
- };
- self.process_undelegation_requests(pubkeys_being_undelegated)
-     .await;
- self.processor
-     .schedule_intent_bundles(intent_bundles)
-     .await?;
- Ok(())
+ self.process_undelegation_requests(pubkeys_being_undelegated)
+     .await;
+ let ids: Vec<u64> = intent_bundles.iter().map(|i| i.id).collect();
+ if let Err(err) =
+     self.processor.schedule_intent_bundles(intent_bundles).await
+ {
+     let mut metas =
+         self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG);
+     for id in &ids {
+         metas.remove(id);
+     }
+     return Err(err);
+ }
+ Ok(())
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@magicblock-committor-service/src/service.rs` around lines 184 - 207, When
inserting ScheduledBaseIntentMeta entries into intents_meta_map for each intent
in intent_bundles, collect the inserted intent IDs and ensure you remove those
same IDs if processor.schedule_intent_bundles(...) returns an error;
specifically, capture the inserted IDs (before calling
process_undelegation_requests or immediately after), call
process_undelegation_requests, then call schedule_intent_bundles inside a
match/if-let so on Err(e) you lock intents_meta_map again and remove each
collected id (or use retain) to undo the inserts, then return the error;
reference intents_meta_map, ScheduledBaseIntentMeta::new, intent_bundles,
process_undelegation_requests, and processor.schedule_intent_bundles when
locating where to add the cleanup.
// Add metas for intent we schedule
let pubkeys_being_undelegated = {
    let mut intent_metas =
        self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG);
.expect() on poisoned-mutex paths needs handling or a documented invariant.
Map the poisoned-lock case to an error or add an explicit invariant comment justifying that the lock is only held across non-panicking sections.
As per coding guidelines, any usage of .unwrap() or .expect() in production Rust code under {magicblock-*,programs,storage-proto}/** is a MAJOR issue and must not be treated as trivial; require proper error handling or explicit justification with invariants.
Also applies to: 305-305
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@magicblock-committor-service/src/service.rs` at line 187, The call to
self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG) uses expect on a
potentially poisoned mutex; replace this panic-on-poison with proper handling by
mapping the PoisonError into a controlled Err return (or propagate a custom
error) from the surrounding function, or if you can guarantee the invariant, add
a clear comment documenting why poisoning cannot occur and convert expect into
unwrap_or_else with a documented unreachable!() guarded by that invariant;
specifically update the code paths where intents_meta_map.lock() and the other
occurrence at line ~305 are used (referencing self.intents_meta_map.lock() and
POISONED_MUTEX_MSG) to either return a Result with an appropriate error variant
or to assert the invariant in a well-documented, reviewed comment rather than
calling expect.
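One way to map the poisoned-lock case to an error, as suggested above, is sketched here. It is an illustration only: `ServiceError::PoisonedLock`, `lock_metas`, and `insert_meta` are hypothetical names, and the meta type is simplified to `String`; the crate's real error enum and `ScheduledBaseIntentMeta` would take their place.

```rust
use std::collections::HashMap;
use std::fmt;
use std::sync::{Mutex, MutexGuard};

/// Hypothetical error type standing in for the service's own error enum.
#[derive(Debug, PartialEq)]
enum ServiceError {
    /// A thread panicked while holding the named lock.
    PoisonedLock(&'static str),
}

impl fmt::Display for ServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ServiceError::PoisonedLock(name) => {
                write!(f, "mutex `{name}` is poisoned")
            }
        }
    }
}

impl std::error::Error for ServiceError {}

/// Locks the map and converts a `PoisonError` into a regular error
/// instead of panicking through `expect`.
fn lock_metas<V>(
    map: &Mutex<HashMap<u64, V>>,
) -> Result<MutexGuard<'_, HashMap<u64, V>>, ServiceError> {
    map.lock()
        .map_err(|_| ServiceError::PoisonedLock("intents_meta_map"))
}

/// Example call site: the `?` propagates the poisoned-lock error to the caller.
fn insert_meta(
    map: &Mutex<HashMap<u64, String>>,
    id: u64,
    meta: String,
) -> Result<(), ServiceError> {
    lock_metas(map)?.insert(id, meta);
    Ok(())
}
```

If the invariant route is preferred instead, the same helper is a natural single place to document why the lock is only ever held across non-panicking sections.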
refactor(committor-service): remove actor pattern and dead code
Summary
Simplifies magicblock-committor-service by removing the Actor pattern (CommittorService/CommittorActor/CommittorMessage/BaseIntentCommittor/CommittorServiceExt) and replacing it with direct use of CommittorProcessor and a new IntentExecutionService, aligning the committor service with the same service model already used by task-scheduler and actions_callback. No business logic was changed.
Breaking Changes
Test Plan
test-integration/test-committor-service updated to use the new CommittorProcessor API directly and pass unchanged.
Summary by CodeRabbit
Bug Fixes
Refactor