diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index c93fadf4c..a18024cfd 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -3,8 +3,7 @@ use std::ops::Deref; use std::sync::Arc; use std::time::Duration; -use futures::never::Never; -use futures::{FutureExt, TryFutureExt}; +use futures::TryFutureExt; use miden_node_proto::domain::batch::BatchInputs; use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; @@ -40,7 +39,7 @@ pub struct BatchBuilder { /// state and are immediately available for a new batch building job. /// /// See also: [`BatchBuilder::wait_for_available_worker`]. - worker_pool: JoinSet<()>, + worker_pool: JoinSet>, batch_interval: Duration, /// The batch prover to use. /// @@ -69,7 +68,7 @@ impl BatchBuilder { // It is important that the worker pool is filled to capacity with ready workers. See // `Self::worker_pool` and `Self::wait_for_available_worker` for more context. - let worker_pool = std::iter::repeat_n(std::future::ready(()), num_workers.get()).collect(); + let worker_pool = (0..num_workers.get()).map(|_| std::future::ready(Ok(()))).collect(); Self { batch_interval, @@ -85,7 +84,7 @@ impl BatchBuilder { /// A pool of batch-proving workers is spawned, which are fed new batch jobs periodically. /// A batch is skipped if there are no available workers, or if there are no transactions /// available to batch. - pub async fn run(mut self, mempool: SharedMempool) { + pub async fn run(mut self, mempool: SharedMempool) -> anyhow::Result<()> { assert!( self.failure_rate < 1.0 && self.failure_rate.is_sign_positive(), "Failure rate must be a percentage" @@ -99,15 +98,15 @@ impl BatchBuilder { loop { interval.tick().await; - self.build_batch(mempool.clone()).await; + self.build_batch(mempool.clone()).await?; } } #[instrument(parent = None, target = COMPONENT, name = "batch_builder.build_batch", skip_all)] - async fn build_batch(&mut self, mempool: SharedMempool) { + async fn build_batch(&mut self, mempool: SharedMempool) -> Result<(), BuildBatchError> { Span::current().set_attribute("workers.count", self.worker_pool.len()); - self.wait_for_available_worker().await; + self.wait_for_available_worker().await?; let job = BatchJob { failure_rate: self.failure_rate, @@ -118,6 +117,8 @@ impl BatchBuilder { self.worker_pool .spawn(async move { job.build_batch().await }.instrument(tracing::Span::current())); + + Ok(()) } /// Waits for a new batch building worker to become available. @@ -132,13 +133,16 @@ impl BatchBuilder { /// design was chosen instead as it removes this branching logic by "always" having the pool /// at max capacity. Instead completed workers wait to be culled by this function. #[instrument(target = COMPONENT, name = "batch_builder.wait_for_available_worker", skip_all)] - async fn wait_for_available_worker(&mut self) { + async fn wait_for_available_worker(&mut self) -> Result<(), BuildBatchError> { // We must crash here because otherwise we have a batch that has been selected from the // mempool, but which is now in limbo. This effectively corrupts the mempool. - if let Err(crash) = self.worker_pool.join_next().await.expect("worker pool is never empty") - { - tracing::error!(message=%crash, "Batch worker pool panic'd"); - panic!("Batch worker pool panic: {crash}"); + match self.worker_pool.join_next().await.expect("worker pool is never empty") { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(crash) => { + tracing::error!(message=%crash, "Batch worker pool panic'd"); + panic!("Batch worker pool panic: {crash}"); + }, } } } @@ -151,7 +155,7 @@ impl BatchBuilder { /// It is entirely self-contained and performs the full batch creation flow, from selecting the /// batch from the [`Mempool`] up to and including submitting the results back to the [`Mempool`]. /// -/// Errors are also handled internally and are not propagated up. +/// Recoverable errors are handled internally. Mempool poison is propagated as a fatal error. struct BatchJob { /// Simulated block failure rate as a percentage. /// @@ -163,17 +167,18 @@ struct BatchJob { } impl BatchJob { - async fn build_batch(&self) { - let Some(batch) = self.select_batch().instrument(Span::current()).await else { + async fn build_batch(&self) -> Result<(), BuildBatchError> { + let Some(batch) = self.select_batch()? else { tracing::info!("No transactions available."); - return; + return Ok(()); }; batch.inject_telemetry(); let batch_id = batch.id(); - self.get_batch_inputs(batch) - .and_then(|(txs, inputs)| Self::propose_batch(txs, inputs) ) + let result = self + .get_batch_inputs(batch) + .and_then(|(txs, inputs)| Self::propose_batch(txs, inputs)) .inspect_ok(TelemetryInjectorExt::inject_telemetry) .and_then(|proposed| self.prove_batch(proposed)) @@ -181,19 +186,25 @@ impl BatchJob { // called. The system cannot handle errors after it considers the process complete // (which makes sense). .and_then(|x| self.inject_failure(x)) - .and_then(|proven_batch| async { self.commit_batch(proven_batch).await; Ok(()) }) + .and_then(|proven_batch| async { self.commit_batch(proven_batch) }) // Handle errors by propagating the error to the root span and rolling back the batch. .inspect_err(|err| Span::current().set_error(err)) - .or_else(|_err| self.rollback_batch(batch_id).never_error()) - // Error has been handled, this is just type manipulation to remove the result wrapper. - .unwrap_or_else(|_: Never| ()) .instrument(Span::current()) .await; + + match result { + Ok(()) => Ok(()), + Err(err @ BuildBatchError::MempoolPoisoned(_)) => Err(err), + Err(_) => { + self.rollback_batch(batch_id)?; + Ok(()) + }, + } } #[instrument(target = COMPONENT, name = "batch_builder.select_batch", skip_all)] - async fn select_batch(&self) -> Option { - self.mempool.lock().await.select_batch() + fn select_batch(&self) -> Result, BuildBatchError> { + Ok(self.mempool.lock().map_err(BuildBatchError::MempoolPoisoned)?.select_batch()) } #[instrument(target = COMPONENT, name = "batch_builder.get_batch_inputs", skip_all, err)] @@ -286,13 +297,21 @@ impl BatchJob { } #[instrument(target = COMPONENT, name = "batch_builder.commit_batch", skip_all)] - async fn commit_batch(&self, batch: Arc) { - self.mempool.lock().await.commit_batch(batch); + fn commit_batch(&self, batch: Arc) -> Result<(), BuildBatchError> { + self.mempool + .lock() + .map_err(BuildBatchError::MempoolPoisoned)? + .commit_batch(batch); + Ok(()) } #[instrument(target = COMPONENT, name = "batch_builder.rollback_batch", skip_all)] - async fn rollback_batch(&self, batch_id: BatchId) { - self.mempool.lock().await.rollback_batch(batch_id); + fn rollback_batch(&self, batch_id: BatchId) -> Result<(), BuildBatchError> { + self.mempool + .lock() + .map_err(BuildBatchError::MempoolPoisoned)? + .rollback_batch(batch_id); + Ok(()) } } diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index 08ebd38f5..00611ebcd 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -2,7 +2,6 @@ use std::ops::Deref; use std::sync::Arc; use anyhow::Context; -use futures::FutureExt; use miden_node_utils::spawn::spawn_blocking_in_current_span; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::batch::{OrderedBatches, ProvenBatch}; @@ -82,12 +81,16 @@ impl BlockBuilder { // Exit if a fatal error occurred. // // No need for error logging since this is handled inside the function. - if let err @ Err(BuildBlockError::Desync { local_chain_tip, .. }) = - self.build_block(&mempool).await - { - return err.with_context(|| { - format!("fatal error while building block {}", local_chain_tip.child()) - }); + match self.build_block(&mempool).await { + Err(err @ BuildBlockError::Desync { local_chain_tip, .. }) => { + return Err(err).with_context(|| { + format!("fatal error while building block {}", local_chain_tip.child()) + }); + }, + Err(err @ BuildBlockError::MempoolPoisoned(_)) => { + return Err(err).context("fatal error while accessing mempool"); + }, + Err(_) | Ok(()) => {}, } } } @@ -107,7 +110,8 @@ impl BlockBuilder { async fn build_block(&self, mempool: &SharedMempool) -> Result<(), BuildBlockError> { use futures::TryFutureExt; - let selected = Self::select_block(mempool).inspect(SelectedBlock::inject_telemetry).await; + let selected = Self::select_block(mempool)?; + selected.inject_telemetry(); let block_num = selected.block_number; self.get_block_inputs(selected) @@ -121,15 +125,15 @@ impl BlockBuilder { // Handle errors by propagating the error to the root span and rolling back the block. .inspect_err(|err| Span::current().set_error(err)) .or_else(|err| async { - self.rollback_block(mempool, block_num).await; + Self::rollback_block(mempool, block_num)?; Err(err) }) .await } #[instrument(target = COMPONENT, name = "block_builder.select_block", skip_all)] - async fn select_block(mempool: &SharedMempool) -> SelectedBlock { - mempool.lock().await.select_block() + fn select_block(mempool: &SharedMempool) -> Result { + Ok(mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.select_block()) } /// Fetches block inputs from the store for the [`SelectedBlock`]. @@ -267,14 +271,15 @@ impl BlockBuilder { .map_err(BuildBlockError::StoreApplyBlockFailed)?; let (header, ..) = signed_block.into_parts(); - mempool.lock().await.commit_block(header); + mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(header); Ok(()) } #[instrument(target = COMPONENT, name = "block_builder.rollback_block", skip_all)] - async fn rollback_block(&self, mempool: &SharedMempool, block: BlockNumber) { - mempool.lock().await.rollback_block(block); + fn rollback_block(mempool: &SharedMempool, block: BlockNumber) -> Result<(), BuildBlockError> { + mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.rollback_block(block); + Ok(()) } } diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index d32550d9c..862a1a2ce 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -12,6 +12,7 @@ use miden_remote_prover_client::RemoteProverClientError; use thiserror::Error; use tokio::task::JoinError; +use crate::mempool::MempoolPoisonError; use crate::validator::ValidatorError; // Block-producer errors @@ -72,6 +73,10 @@ pub enum MempoolSubmissionError { #[error("the mempool is at capacity")] CapacityExceeded, + + #[error("mempool lock is poisoned")] + #[grpc(internal)] + MempoolPoisoned(#[source] MempoolPoisonError), } // Mempool submission conflicts with current state @@ -123,6 +128,9 @@ pub enum BuildBatchError { #[error("batch proof security level is too low: {0} < {1}")] SecurityLevelTooLow(u32, u32), + + #[error("mempool lock is poisoned")] + MempoolPoisoned(#[source] MempoolPoisonError), } // Block building errors @@ -148,6 +156,9 @@ pub enum BuildBlockError { #[error("block signature is invalid")] InvalidSignature, + #[error("mempool lock is poisoned")] + MempoolPoisoned(#[source] MempoolPoisonError), + /// We sometimes randomly inject errors into the batch building process to test our failure /// responses. diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs index 9395a4111..9ec79c417 100644 --- a/crates/block-producer/src/mempool/mod.rs +++ b/crates/block-producer/src/mempool/mod.rs @@ -52,7 +52,7 @@ //! transactions even if the store and block producer momentarily disagree on the chain tip. use std::collections::{HashSet, VecDeque}; use std::num::NonZeroUsize; -use std::sync::Arc; +use std::sync::{Arc, LockResult, Mutex, MutexGuard}; use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_utils::ErrorReport; @@ -60,7 +60,8 @@ use miden_protocol::batch::{BatchId, ProvenBatch}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::transaction::{TransactionHeader, TransactionId}; use subscription::SubscriptionProvider; -use tokio::sync::{Mutex, MutexGuard, mpsc}; +use thiserror::Error; +use tokio::sync::mpsc; use tracing::instrument; use crate::block_builder::SelectedBlock; @@ -90,6 +91,10 @@ mod tests; #[derive(Clone)] pub struct SharedMempool(Arc>); +#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)] +#[error("shared mempool lock is poisoned")] +pub struct MempoolPoisonError; + #[derive(Debug, Clone, PartialEq)] pub struct MempoolConfig { /// The constraints each proposed block must adhere to. @@ -147,13 +152,14 @@ impl Default for MempoolConfig { // ================================================================================================ impl SharedMempool { - /// Acquires an asynchronous lock on the underlying [`Mempool`]. + /// Acquires a lock on the underlying [`Mempool`]. /// /// Callers should minimise the amount of work performed while holding the lock to reduce /// contention with other subsystems that need to access the pool. - #[instrument(target = COMPONENT, name = "mempool.lock", skip_all)] - pub async fn lock(&self) -> MutexGuard<'_, Mempool> { - self.0.lock().await + #[instrument(target = COMPONENT, name = "mempool.lock", skip_all, err)] + pub fn lock(&self) -> Result, MempoolPoisonError> { + let result: LockResult> = self.0.lock(); + result.map_err(|_| MempoolPoisonError) } } diff --git a/crates/block-producer/src/mempool/tests.rs b/crates/block-producer/src/mempool/tests.rs index e1b38b010..4a835d0d4 100644 --- a/crates/block-producer/src/mempool/tests.rs +++ b/crates/block-producer/src/mempool/tests.rs @@ -13,6 +13,20 @@ use crate::test_utils::batch::TransactionBatchConstructor; mod add_transaction; mod add_user_batch; +#[test] +fn shared_mempool_lock_is_poisoned_after_panic() { + let mempool = Mempool::shared(BlockNumber::GENESIS, MempoolConfig::default()); + let poisoned = mempool.clone(); + + let _ = std::thread::spawn(move || { + let _guard = poisoned.lock().expect("fresh mempool lock should not be poisoned"); + panic!("poison shared mempool lock"); + }) + .join(); + + assert!(matches!(mempool.lock(), Err(MempoolPoisonError))); +} + impl Mempool { /// Returns an empty [`Mempool`] and a perfect clone intended for use as the Unit Under Test and /// the reference instance. diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 853875b78..0e9876d59 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -157,10 +157,7 @@ impl BlockProducer { let batch_builder_id = tasks .spawn({ let mempool = mempool.clone(); - async { - batch_builder.run(mempool).await; - Ok(()) - } + async { batch_builder.run(mempool).await } }) .id(); let block_builder_id = tasks @@ -272,7 +269,10 @@ impl BlockProducerRpcServer { interval.tick().await; let (chain_tip, unbatched_transactions, proposed_batches, proven_batches) = { - let mempool = mempool.lock().await; + let Ok(mempool) = mempool.lock() else { + tracing::error!("mempool lock poisoned, stopping mempool stats updater"); + return; + }; ( mempool.chain_tip(), mempool.unbatched_transactions_count() as u64, @@ -301,6 +301,7 @@ impl BlockProducerRpcServer { skip_all, err )] + #[expect(clippy::let_and_return)] async fn submit_proven_tx( &self, request: proto::transaction::ProvenTransaction, @@ -336,7 +337,14 @@ impl BlockProducerRpcServer { .map(Arc::new) .map_err(MempoolSubmissionError::StateConflict)?; - self.mempool.lock().await.lock().await.add_transaction(tx).map(Into::into) + let shared_mempool = self.mempool.lock().await; + // We need the let binding here to avoid E0597 `shared_mempool` does not live long enough + let result = shared_mempool + .lock() + .map_err(MempoolSubmissionError::MempoolPoisoned)? + .add_transaction(tx) + .map(Into::into); + result } #[instrument( @@ -345,6 +353,7 @@ impl BlockProducerRpcServer { skip_all, err )] + #[expect(clippy::let_and_return)] async fn submit_proven_tx_batch( &self, request: proto::transaction::TransactionBatch, @@ -374,7 +383,14 @@ impl BlockProducerRpcServer { txs.push(tx); } - self.mempool.lock().await.lock().await.add_user_batch(&txs).map(Into::into) + let shared_mempool = self.mempool.lock().await; + // We need the let binding here to avoid E0597 `shared_mempool` does not live long enough + let result = shared_mempool + .lock() + .map_err(MempoolSubmissionError::MempoolPoisoned)? + .add_user_batch(&txs) + .map(Into::into); + result } } @@ -422,7 +438,11 @@ impl api_server::Api for BlockProducerRpcServer { &self, _request: tonic::Request<()>, ) -> Result, tonic::Status> { - let subscription = self.mempool.lock().await.lock().await.subscribe(); + let shared_mempool = self.mempool.lock().await; + let subscription = shared_mempool + .lock() + .map_err(|err| tonic::Status::internal(err.to_string()))? + .subscribe(); let subscription = ReceiverStream::new(subscription); Ok(tonic::Response::new(MempoolEventSubscription { inner: subscription }))