From ab5454324e205bb8b72710f1e3564d93b5f255f3 Mon Sep 17 00:00:00 2001 From: KOVACS Krisztian Date: Wed, 20 May 2026 11:54:41 +0200 Subject: [PATCH 1/5] refactor(mempool): make mempool lock poisonable Currently, if a mempool operation panics, it's possible to leave the mempool in a corrupted state. This PR changes mempool locking to be poisonable, meaning that after a panic with the mempool lock held the corrupted mempool cannot be accessed. (For example, if add transaction panics, then subsequent block and batch building should not proceed.) Changed behavior: - `SharedMempool` now uses `std::sync::Mutex`. - `SharedMempool::lock()` returns `Result<MutexGuard<'_, Mempool>, MempoolPoisonError>`. - RPC submit/subscription paths map poison to internal errors. - Batch/block builders propagate poison as fatal. - Mempool-only helper methods are synchronous; `commit_block` remains async for store I/O. Closes #2016 --- .../block-producer/src/batch_builder/mod.rs | 72 +++++++++++-------- .../block-producer/src/block_builder/mod.rs | 40 +++++++---- crates/block-producer/src/errors.rs | 11 +++ crates/block-producer/src/mempool/mod.rs | 18 +++-- crates/block-producer/src/mempool/tests.rs | 14 ++++ crates/block-producer/src/server/mod.rs | 32 ++++++--- 6 files changed, 130 insertions(+), 57 deletions(-) diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index c93fadf4ca..4000af0663 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -3,8 +3,7 @@ use std::ops::Deref; use std::sync::Arc; use std::time::Duration; -use futures::never::Never; -use futures::{FutureExt, TryFutureExt}; +use futures::TryFutureExt; use miden_node_proto::domain::batch::BatchInputs; use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; @@ -40,7 +39,7 @@ pub struct BatchBuilder { /// state and are immediately available for a new batch building job. 
/// /// See also: [`BatchBuilder::wait_for_available_worker`]. - worker_pool: JoinSet<()>, + worker_pool: JoinSet>, batch_interval: Duration, /// The batch prover to use. /// @@ -69,7 +68,7 @@ impl BatchBuilder { // It is important that the worker pool is filled to capacity with ready workers. See // `Self::worker_pool` and `Self::wait_for_available_worker` for more context. - let worker_pool = std::iter::repeat_n(std::future::ready(()), num_workers.get()).collect(); + let worker_pool = (0..num_workers.get()).map(|_| std::future::ready(Ok(()))).collect(); Self { batch_interval, @@ -85,7 +84,7 @@ impl BatchBuilder { /// A pool of batch-proving workers is spawned, which are fed new batch jobs periodically. /// A batch is skipped if there are no available workers, or if there are no transactions /// available to batch. - pub async fn run(mut self, mempool: SharedMempool) { + pub async fn run(mut self, mempool: SharedMempool) -> anyhow::Result<()> { assert!( self.failure_rate < 1.0 && self.failure_rate.is_sign_positive(), "Failure rate must be a percentage" @@ -99,15 +98,15 @@ impl BatchBuilder { loop { interval.tick().await; - self.build_batch(mempool.clone()).await; + self.build_batch(mempool.clone()).await?; } } #[instrument(parent = None, target = COMPONENT, name = "batch_builder.build_batch", skip_all)] - async fn build_batch(&mut self, mempool: SharedMempool) { + async fn build_batch(&mut self, mempool: SharedMempool) -> Result<(), BuildBatchError> { Span::current().set_attribute("workers.count", self.worker_pool.len()); - self.wait_for_available_worker().await; + self.wait_for_available_worker().await?; let job = BatchJob { failure_rate: self.failure_rate, @@ -118,6 +117,8 @@ impl BatchBuilder { self.worker_pool .spawn(async move { job.build_batch().await }.instrument(tracing::Span::current())); + + Ok(()) } /// Waits for a new batch building worker to become available. 
@@ -132,13 +133,16 @@ impl BatchBuilder { /// design was chosen instead as it removes this branching logic by "always" having the pool /// at max capacity. Instead completed workers wait to be culled by this function. #[instrument(target = COMPONENT, name = "batch_builder.wait_for_available_worker", skip_all)] - async fn wait_for_available_worker(&mut self) { + async fn wait_for_available_worker(&mut self) -> Result<(), BuildBatchError> { // We must crash here because otherwise we have a batch that has been selected from the // mempool, but which is now in limbo. This effectively corrupts the mempool. - if let Err(crash) = self.worker_pool.join_next().await.expect("worker pool is never empty") - { - tracing::error!(message=%crash, "Batch worker pool panic'd"); - panic!("Batch worker pool panic: {crash}"); + match self.worker_pool.join_next().await.expect("worker pool is never empty") { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(crash) => { + tracing::error!(message=%crash, "Batch worker pool panic'd"); + panic!("Batch worker pool panic: {crash}"); + }, } } } @@ -151,7 +155,7 @@ impl BatchBuilder { /// It is entirely self-contained and performs the full batch creation flow, from selecting the /// batch from the [`Mempool`] up to and including submitting the results back to the [`Mempool`]. /// -/// Errors are also handled internally and are not propagated up. +/// Recoverable errors are handled internally. Mempool poison is propagated as a fatal error. struct BatchJob { /// Simulated block failure rate as a percentage. /// @@ -163,17 +167,18 @@ struct BatchJob { } impl BatchJob { - async fn build_batch(&self) { - let Some(batch) = self.select_batch().instrument(Span::current()).await else { + async fn build_batch(&self) -> Result<(), BuildBatchError> { + let Some(batch) = self.select_batch()? 
else { tracing::info!("No transactions available."); - return; + return Ok(()); }; batch.inject_telemetry(); let batch_id = batch.id(); - self.get_batch_inputs(batch) - .and_then(|(txs, inputs)| Self::propose_batch(txs, inputs) ) + let result = self + .get_batch_inputs(batch) + .and_then(|(txs, inputs)| Self::propose_batch(txs, inputs)) .inspect_ok(TelemetryInjectorExt::inject_telemetry) .and_then(|proposed| self.prove_batch(proposed)) @@ -181,19 +186,26 @@ impl BatchJob { // called. The system cannot handle errors after it considers the process complete // (which makes sense). .and_then(|x| self.inject_failure(x)) - .and_then(|proven_batch| async { self.commit_batch(proven_batch).await; Ok(()) }) + .and_then(|proven_batch| async { self.commit_batch(proven_batch) }) // Handle errors by propagating the error to the root span and rolling back the batch. .inspect_err(|err| Span::current().set_error(err)) - .or_else(|_err| self.rollback_batch(batch_id).never_error()) - // Error has been handled, this is just type manipulation to remove the result wrapper. 
- .unwrap_or_else(|_: Never| ()) .instrument(Span::current()) .await; + + match result { + Ok(()) => Ok(()), + Err(err @ BuildBatchError::MempoolPoisoned(_)) => Err(err), + Err(err) => { + self.rollback_batch(batch_id)?; + Span::current().set_error(&err); + Ok(()) + }, + } } #[instrument(target = COMPONENT, name = "batch_builder.select_batch", skip_all)] - async fn select_batch(&self) -> Option { - self.mempool.lock().await.select_batch() + fn select_batch(&self) -> Result, BuildBatchError> { + Ok(self.mempool.lock()?.select_batch()) } #[instrument(target = COMPONENT, name = "batch_builder.get_batch_inputs", skip_all, err)] @@ -286,13 +298,15 @@ impl BatchJob { } #[instrument(target = COMPONENT, name = "batch_builder.commit_batch", skip_all)] - async fn commit_batch(&self, batch: Arc) { - self.mempool.lock().await.commit_batch(batch); + fn commit_batch(&self, batch: Arc) -> Result<(), BuildBatchError> { + self.mempool.lock()?.commit_batch(batch); + Ok(()) } #[instrument(target = COMPONENT, name = "batch_builder.rollback_batch", skip_all)] - async fn rollback_batch(&self, batch_id: BatchId) { - self.mempool.lock().await.rollback_batch(batch_id); + fn rollback_batch(&self, batch_id: BatchId) -> Result<(), BuildBatchError> { + self.mempool.lock()?.rollback_batch(batch_id); + Ok(()) } } diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 08ebd38f5d..94574519a0 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -2,7 +2,6 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::Context; -use futures::FutureExt; use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::batch::{OrderedBatches, ProvenBatch}; @@ -82,12 +81,19 @@ impl BlockBuilder { // Exit if a fatal error occurred. // // No need for error logging since this is handled inside the function. 
- if let err @ Err(BuildBlockError::Desync { local_chain_tip, .. }) = - self.build_block(&mempool).await - { - return err.with_context(|| { - format!("fatal error while building block {}", local_chain_tip.child()) - }); + match self.build_block(&mempool).await { + Err(err @ BuildBlockError::Desync { local_chain_tip, .. }) => { + return Err(err).with_context(|| { + format!("fatal error while building block {}", local_chain_tip.child()) + }); + }, + Err(err @ BuildBlockError::MempoolPoisoned(_)) => { + return Err(err).context("fatal error while accessing mempool"); + }, + Err(err) => { + Span::current().set_error(&err); + }, + Ok(()) => {}, } } } @@ -107,7 +113,8 @@ impl BlockBuilder { async fn build_block(&self, mempool: &SharedMempool) -> Result<(), BuildBlockError> { use futures::TryFutureExt; - let selected = Self::select_block(mempool).inspect(SelectedBlock::inject_telemetry).await; + let selected = Self::select_block(mempool)?; + selected.inject_telemetry(); let block_num = selected.block_number; self.get_block_inputs(selected) @@ -121,15 +128,15 @@ impl BlockBuilder { // Handle errors by propagating the error to the root span and rolling back the block. .inspect_err(|err| Span::current().set_error(err)) .or_else(|err| async { - self.rollback_block(mempool, block_num).await; + self.rollback_block(mempool, block_num)?; Err(err) }) .await } #[instrument(target = COMPONENT, name = "block_builder.select_block", skip_all)] - async fn select_block(mempool: &SharedMempool) -> SelectedBlock { - mempool.lock().await.select_block() + fn select_block(mempool: &SharedMempool) -> Result { + Ok(mempool.lock()?.select_block()) } /// Fetches block inputs from the store for the [`SelectedBlock`]. @@ -267,14 +274,19 @@ impl BlockBuilder { .map_err(BuildBlockError::StoreApplyBlockFailed)?; let (header, ..) 
= signed_block.into_parts(); - mempool.lock().await.commit_block(header); + mempool.lock()?.commit_block(header); Ok(()) } #[instrument(target = COMPONENT, name = "block_builder.rollback_block", skip_all)] - async fn rollback_block(&self, mempool: &SharedMempool, block: BlockNumber) { - mempool.lock().await.rollback_block(block); + fn rollback_block( + &self, + mempool: &SharedMempool, + block: BlockNumber, + ) -> Result<(), BuildBlockError> { + mempool.lock()?.rollback_block(block); + Ok(()) } } diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index d32550d9c1..ed8437d77a 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -12,6 +12,7 @@ use miden_remote_prover_client::RemoteProverClientError; use thiserror::Error; use tokio::task::JoinError; +use crate::mempool::MempoolPoisonError; use crate::validator::ValidatorError; // Block-producer errors @@ -72,6 +73,10 @@ pub enum MempoolSubmissionError { #[error("the mempool is at capacity")] CapacityExceeded, + + #[error("mempool lock is poisoned")] + #[grpc(internal)] + MempoolPoisoned(#[source] MempoolPoisonError), } // Mempool submission conflicts with current state @@ -123,6 +128,9 @@ pub enum BuildBatchError { #[error("batch proof security level is too low: {0} < {1}")] SecurityLevelTooLow(u32, u32), + + #[error("mempool lock is poisoned")] + MempoolPoisoned(#[from] MempoolPoisonError), } // Block building errors @@ -148,6 +156,9 @@ pub enum BuildBlockError { #[error("block signature is invalid")] InvalidSignature, + #[error("mempool lock is poisoned")] + MempoolPoisoned(#[from] MempoolPoisonError), + /// We sometimes randomly inject errors into the batch building process to test our failure /// responses. 
diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 9395a41114..9ec79c417b 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -52,7 +52,7 @@ //! transactions even if the store and block producer momentarily disagree on the chain tip. use std::collections::{HashSet, VecDeque}; use std::num::NonZeroUsize; -use std::sync::Arc; +use std::sync::{Arc, LockResult, Mutex, MutexGuard}; use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_utils::ErrorReport; @@ -60,7 +60,8 @@ use miden_protocol::batch::{BatchId, ProvenBatch}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::transaction::{TransactionHeader, TransactionId}; use subscription::SubscriptionProvider; -use tokio::sync::{Mutex, MutexGuard, mpsc}; +use thiserror::Error; +use tokio::sync::mpsc; use tracing::instrument; use crate::block_builder::SelectedBlock; @@ -90,6 +91,10 @@ mod tests; #[derive(Clone)] pub struct SharedMempool(Arc>); +#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)] +#[error("shared mempool lock is poisoned")] +pub struct MempoolPoisonError; + #[derive(Debug, Clone, PartialEq)] pub struct MempoolConfig { /// The constraints each proposed block must adhere to. @@ -147,13 +152,14 @@ impl Default for MempoolConfig { // ================================================================================================ impl SharedMempool { - /// Acquires an asynchronous lock on the underlying [`Mempool`]. + /// Acquires a lock on the underlying [`Mempool`]. /// /// Callers should minimise the amount of work performed while holding the lock to reduce /// contention with other subsystems that need to access the pool. 
- #[instrument(target = COMPONENT, name = "mempool.lock", skip_all)] - pub async fn lock(&self) -> MutexGuard<'_, Mempool> { - self.0.lock().await + #[instrument(target = COMPONENT, name = "mempool.lock", skip_all, err)] + pub fn lock(&self) -> Result, MempoolPoisonError> { + let result: LockResult> = self.0.lock(); + result.map_err(|_| MempoolPoisonError) } } diff --git a/crates/block-producer/src/mempool/tests.rs b/crates/block-producer/src/mempool/tests.rs index e1b38b0108..4a835d0d4c 100644 --- a/crates/block-producer/src/mempool/tests.rs +++ b/crates/block-producer/src/mempool/tests.rs @@ -13,6 +13,20 @@ use crate::test_utils::batch::TransactionBatchConstructor; mod add_transaction; mod add_user_batch; +#[test] +fn shared_mempool_lock_is_poisoned_after_panic() { + let mempool = Mempool::shared(BlockNumber::GENESIS, MempoolConfig::default()); + let poisoned = mempool.clone(); + + let _ = std::thread::spawn(move || { + let _guard = poisoned.lock().expect("fresh mempool lock should not be poisoned"); + panic!("poison shared mempool lock"); + }) + .join(); + + assert!(matches!(mempool.lock(), Err(MempoolPoisonError))); +} + impl Mempool { /// Returns an empty [`Mempool`] and a perfect clone intended for use as the Unit Under Test and /// the reference instance. 
diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 853875b780..b073cbe2f9 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -157,10 +157,7 @@ impl BlockProducer { let batch_builder_id = tasks .spawn({ let mempool = mempool.clone(); - async { - batch_builder.run(mempool).await; - Ok(()) - } + async { batch_builder.run(mempool).await } }) .id(); let block_builder_id = tasks @@ -272,7 +269,10 @@ impl BlockProducerRpcServer { interval.tick().await; let (chain_tip, unbatched_transactions, proposed_batches, proven_batches) = { - let mempool = mempool.lock().await; + let Ok(mempool) = mempool.lock() else { + tracing::error!("mempool lock poisoned, stopping mempool stats updater"); + return; + }; ( mempool.chain_tip(), mempool.unbatched_transactions_count() as u64, @@ -336,7 +336,13 @@ impl BlockProducerRpcServer { .map(Arc::new) .map_err(MempoolSubmissionError::StateConflict)?; - self.mempool.lock().await.lock().await.add_transaction(tx).map(Into::into) + let shared_mempool = self.mempool.lock().await; + let result = shared_mempool + .lock() + .map_err(MempoolSubmissionError::MempoolPoisoned)? + .add_transaction(tx) + .map(Into::into); + result } #[instrument( @@ -374,7 +380,13 @@ impl BlockProducerRpcServer { txs.push(tx); } - self.mempool.lock().await.lock().await.add_user_batch(&txs).map(Into::into) + let shared_mempool = self.mempool.lock().await; + let result = shared_mempool + .lock() + .map_err(MempoolSubmissionError::MempoolPoisoned)? + .add_user_batch(&txs) + .map(Into::into); + result } } @@ -422,7 +434,11 @@ impl api_server::Api for BlockProducerRpcServer { &self, _request: tonic::Request<()>, ) -> Result, tonic::Status> { - let subscription = self.mempool.lock().await.lock().await.subscribe(); + let shared_mempool = self.mempool.lock().await; + let subscription = shared_mempool + .lock() + .map_err(|err| tonic::Status::internal(err.to_string()))? 
+ .subscribe(); let subscription = ReceiverStream::new(subscription); Ok(tonic::Response::new(MempoolEventSubscription { inner: subscription })) From f6d993806158d7011d1cbdfb4cec00858f912478 Mon Sep 17 00:00:00 2001 From: KOVACS Krisztian Date: Wed, 20 May 2026 15:35:26 +0200 Subject: [PATCH 2/5] chore: fix clippy --- crates/block-producer/src/block_builder/mod.rs | 8 ++------ crates/block-producer/src/server/mod.rs | 4 ++++ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 94574519a0..18aeae0ff4 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -128,7 +128,7 @@ impl BlockBuilder { // Handle errors by propagating the error to the root span and rolling back the block. .inspect_err(|err| Span::current().set_error(err)) .or_else(|err| async { - self.rollback_block(mempool, block_num)?; + Self::rollback_block(mempool, block_num)?; Err(err) }) .await @@ -280,11 +280,7 @@ impl BlockBuilder { } #[instrument(target = COMPONENT, name = "block_builder.rollback_block", skip_all)] - fn rollback_block( - &self, - mempool: &SharedMempool, - block: BlockNumber, - ) -> Result<(), BuildBlockError> { + fn rollback_block(mempool: &SharedMempool, block: BlockNumber) -> Result<(), BuildBlockError> { mempool.lock()?.rollback_block(block); Ok(()) } diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index b073cbe2f9..0e9876d59d 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -301,6 +301,7 @@ impl BlockProducerRpcServer { skip_all, err )] + #[expect(clippy::let_and_return)] async fn submit_proven_tx( &self, request: proto::transaction::ProvenTransaction, @@ -337,6 +338,7 @@ impl BlockProducerRpcServer { .map_err(MempoolSubmissionError::StateConflict)?; let shared_mempool = self.mempool.lock().await; + // We need the 
let binding here to avoid E0597 `shared_mempool` does not live long enough let result = shared_mempool .lock() .map_err(MempoolSubmissionError::MempoolPoisoned)? @@ -351,6 +353,7 @@ impl BlockProducerRpcServer { skip_all, err )] + #[expect(clippy::let_and_return)] async fn submit_proven_tx_batch( &self, request: proto::transaction::TransactionBatch, @@ -381,6 +384,7 @@ impl BlockProducerRpcServer { } let shared_mempool = self.mempool.lock().await; + // We need the let binding here to avoid E0597 `shared_mempool` does not live long enough let result = shared_mempool .lock() .map_err(MempoolSubmissionError::MempoolPoisoned)? From e5568cf4eace807b39804c9e77998a157810c504 Mon Sep 17 00:00:00 2001 From: KOVACS Krisztian Date: Thu, 21 May 2026 09:43:14 +0200 Subject: [PATCH 3/5] fix: review comments --- crates/block-producer/src/batch_builder/mod.rs | 3 +-- crates/block-producer/src/block_builder/mod.rs | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index 4000af0663..cf6ff5b908 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -195,9 +195,8 @@ impl BatchJob { match result { Ok(()) => Ok(()), Err(err @ BuildBatchError::MempoolPoisoned(_)) => Err(err), - Err(err) => { + Err(_) => { self.rollback_batch(batch_id)?; - Span::current().set_error(&err); Ok(()) }, } diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 18aeae0ff4..629f90a7df 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -90,9 +90,7 @@ impl BlockBuilder { Err(err @ BuildBlockError::MempoolPoisoned(_)) => { return Err(err).context("fatal error while accessing mempool"); }, - Err(err) => { - Span::current().set_error(&err); - }, + Err(_) => {}, Ok(()) => {}, } } From 
6835f493ada642ab50d62f869b8074f640c45a9d Mon Sep 17 00:00:00 2001 From: KOVACS Krisztian Date: Thu, 21 May 2026 09:59:19 +0200 Subject: [PATCH 4/5] chore: fix clippy --- crates/block-producer/src/block_builder/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 629f90a7df..904ac010ed 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -90,8 +90,7 @@ impl BlockBuilder { Err(err @ BuildBlockError::MempoolPoisoned(_)) => { return Err(err).context("fatal error while accessing mempool"); }, - Err(_) => {}, - Ok(()) => {}, + Err(_) | Ok(()) => {}, } } } From 7f43ddebdddc1c3de033e66392e60da91d8a1bce Mon Sep 17 00:00:00 2001 From: KOVACS Krisztian Date: Thu, 21 May 2026 10:55:24 +0200 Subject: [PATCH 5/5] fix: review comments --- crates/block-producer/src/batch_builder/mod.rs | 12 +++++++++--- crates/block-producer/src/block_builder/mod.rs | 6 +++--- crates/block-producer/src/errors.rs | 4 ++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index cf6ff5b908..a18024cfdb 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -204,7 +204,7 @@ impl BatchJob { #[instrument(target = COMPONENT, name = "batch_builder.select_batch", skip_all)] fn select_batch(&self) -> Result, BuildBatchError> { - Ok(self.mempool.lock()?.select_batch()) + Ok(self.mempool.lock().map_err(BuildBatchError::MempoolPoisoned)?.select_batch()) } #[instrument(target = COMPONENT, name = "batch_builder.get_batch_inputs", skip_all, err)] @@ -298,13 +298,19 @@ impl BatchJob { #[instrument(target = COMPONENT, name = "batch_builder.commit_batch", skip_all)] fn commit_batch(&self, batch: Arc) -> Result<(), BuildBatchError> { - 
self.mempool.lock()?.commit_batch(batch); + self.mempool + .lock() + .map_err(BuildBatchError::MempoolPoisoned)? + .commit_batch(batch); Ok(()) } #[instrument(target = COMPONENT, name = "batch_builder.rollback_batch", skip_all)] fn rollback_batch(&self, batch_id: BatchId) -> Result<(), BuildBatchError> { - self.mempool.lock()?.rollback_batch(batch_id); + self.mempool + .lock() + .map_err(BuildBatchError::MempoolPoisoned)? + .rollback_batch(batch_id); Ok(()) } } diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 904ac010ed..00611ebcd2 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -133,7 +133,7 @@ impl BlockBuilder { #[instrument(target = COMPONENT, name = "block_builder.select_block", skip_all)] fn select_block(mempool: &SharedMempool) -> Result { - Ok(mempool.lock()?.select_block()) + Ok(mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.select_block()) } /// Fetches block inputs from the store for the [`SelectedBlock`]. @@ -271,14 +271,14 @@ impl BlockBuilder { .map_err(BuildBlockError::StoreApplyBlockFailed)?; let (header, ..) 
= signed_block.into_parts(); - mempool.lock()?.commit_block(header); + mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(header); Ok(()) } #[instrument(target = COMPONENT, name = "block_builder.rollback_block", skip_all)] fn rollback_block(mempool: &SharedMempool, block: BlockNumber) -> Result<(), BuildBlockError> { - mempool.lock()?.rollback_block(block); + mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.rollback_block(block); Ok(()) } } diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index ed8437d77a..862a1a2ce9 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -130,7 +130,7 @@ pub enum BuildBatchError { SecurityLevelTooLow(u32, u32), #[error("mempool lock is poisoned")] - MempoolPoisoned(#[from] MempoolPoisonError), + MempoolPoisoned(#[source] MempoolPoisonError), } // Block building errors @@ -157,7 +157,7 @@ pub enum BuildBlockError { InvalidSignature, #[error("mempool lock is poisoned")] - MempoolPoisoned(#[from] MempoolPoisonError), + MempoolPoisoned(#[source] MempoolPoisonError), /// We sometimes randomly inject errors into the batch building process to test our failure /// responses.