Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 48 additions & 29 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

use futures::never::Never;
use futures::{FutureExt, TryFutureExt};
use futures::TryFutureExt;
use miden_node_proto::domain::batch::BatchInputs;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
Expand Down Expand Up @@ -40,7 +39,7 @@ pub struct BatchBuilder {
/// state and are immediately available for a new batch building job.
///
/// See also: [`BatchBuilder::wait_for_available_worker`].
worker_pool: JoinSet<()>,
worker_pool: JoinSet<Result<(), BuildBatchError>>,
batch_interval: Duration,
/// The batch prover to use.
///
Expand Down Expand Up @@ -69,7 +68,7 @@ impl BatchBuilder {

// It is important that the worker pool is filled to capacity with ready workers. See
// `Self::worker_pool` and `Self::wait_for_available_worker` for more context.
let worker_pool = std::iter::repeat_n(std::future::ready(()), num_workers.get()).collect();
let worker_pool = (0..num_workers.get()).map(|_| std::future::ready(Ok(()))).collect();

Self {
batch_interval,
Expand All @@ -85,7 +84,7 @@ impl BatchBuilder {
/// A pool of batch-proving workers is spawned, which are fed new batch jobs periodically.
/// A batch is skipped if there are no available workers, or if there are no transactions
/// available to batch.
pub async fn run(mut self, mempool: SharedMempool) {
pub async fn run(mut self, mempool: SharedMempool) -> anyhow::Result<()> {
assert!(
self.failure_rate < 1.0 && self.failure_rate.is_sign_positive(),
"Failure rate must be a percentage"
Expand All @@ -99,15 +98,15 @@ impl BatchBuilder {

loop {
interval.tick().await;
self.build_batch(mempool.clone()).await;
self.build_batch(mempool.clone()).await?;
}
}

#[instrument(parent = None, target = COMPONENT, name = "batch_builder.build_batch", skip_all)]
async fn build_batch(&mut self, mempool: SharedMempool) {
async fn build_batch(&mut self, mempool: SharedMempool) -> Result<(), BuildBatchError> {
Span::current().set_attribute("workers.count", self.worker_pool.len());

self.wait_for_available_worker().await;
self.wait_for_available_worker().await?;

let job = BatchJob {
failure_rate: self.failure_rate,
Expand All @@ -118,6 +117,8 @@ impl BatchBuilder {

self.worker_pool
.spawn(async move { job.build_batch().await }.instrument(tracing::Span::current()));

Ok(())
}

/// Waits for a new batch building worker to become available.
Expand All @@ -132,13 +133,16 @@ impl BatchBuilder {
/// design was chosen instead as it removes this branching logic by "always" having the pool
/// at max capacity. Instead completed workers wait to be culled by this function.
#[instrument(target = COMPONENT, name = "batch_builder.wait_for_available_worker", skip_all)]
async fn wait_for_available_worker(&mut self) {
async fn wait_for_available_worker(&mut self) -> Result<(), BuildBatchError> {
// We must crash here because otherwise we have a batch that has been selected from the
// mempool, but which is now in limbo. This effectively corrupts the mempool.
if let Err(crash) = self.worker_pool.join_next().await.expect("worker pool is never empty")
{
tracing::error!(message=%crash, "Batch worker pool panic'd");
panic!("Batch worker pool panic: {crash}");
match self.worker_pool.join_next().await.expect("worker pool is never empty") {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(crash) => {
tracing::error!(message=%crash, "Batch worker pool panic'd");
panic!("Batch worker pool panic: {crash}");
},
}
}
}
Expand All @@ -151,7 +155,7 @@ impl BatchBuilder {
/// It is entirely self-contained and performs the full batch creation flow, from selecting the
/// batch from the [`Mempool`] up to and including submitting the results back to the [`Mempool`].
///
/// Errors are also handled internally and are not propagated up.
/// Recoverable errors are handled internally. Mempool poison is propagated as a fatal error.
struct BatchJob {
/// Simulated batch failure rate as a percentage.
///
Expand All @@ -163,37 +167,44 @@ struct BatchJob {
}

impl BatchJob {
async fn build_batch(&self) {
let Some(batch) = self.select_batch().instrument(Span::current()).await else {
async fn build_batch(&self) -> Result<(), BuildBatchError> {
let Some(batch) = self.select_batch()? else {
tracing::info!("No transactions available.");
return;
return Ok(());
};

batch.inject_telemetry();
let batch_id = batch.id();

self.get_batch_inputs(batch)
.and_then(|(txs, inputs)| Self::propose_batch(txs, inputs) )
let result = self
.get_batch_inputs(batch)
.and_then(|(txs, inputs)| Self::propose_batch(txs, inputs))
.inspect_ok(TelemetryInjectorExt::inject_telemetry)
.and_then(|proposed| self.prove_batch(proposed))

// Failure must be injected before the final pipeline stage i.e. before commit is
// called. The system cannot handle errors after it considers the process complete
// (which makes sense).
.and_then(|x| self.inject_failure(x))
.and_then(|proven_batch| async { self.commit_batch(proven_batch).await; Ok(()) })
.and_then(|proven_batch| async { self.commit_batch(proven_batch) })
// Handle errors by propagating the error to the root span and rolling back the batch.
.inspect_err(|err| Span::current().set_error(err))
.or_else(|_err| self.rollback_batch(batch_id).never_error())
// Error has been handled, this is just type manipulation to remove the result wrapper.
.unwrap_or_else(|_: Never| ())
.instrument(Span::current())
.await;

match result {
Ok(()) => Ok(()),
Err(err @ BuildBatchError::MempoolPoisoned(_)) => Err(err),
Err(_) => {
self.rollback_batch(batch_id)?;
Ok(())
},
}
}

#[instrument(target = COMPONENT, name = "batch_builder.select_batch", skip_all)]
async fn select_batch(&self) -> Option<SelectedBatch> {
self.mempool.lock().await.select_batch()
fn select_batch(&self) -> Result<Option<SelectedBatch>, BuildBatchError> {
Ok(self.mempool.lock().map_err(BuildBatchError::MempoolPoisoned)?.select_batch())
}

#[instrument(target = COMPONENT, name = "batch_builder.get_batch_inputs", skip_all, err)]
Expand Down Expand Up @@ -286,13 +297,21 @@ impl BatchJob {
}

#[instrument(target = COMPONENT, name = "batch_builder.commit_batch", skip_all)]
async fn commit_batch(&self, batch: Arc<ProvenBatch>) {
self.mempool.lock().await.commit_batch(batch);
fn commit_batch(&self, batch: Arc<ProvenBatch>) -> Result<(), BuildBatchError> {
self.mempool
.lock()
.map_err(BuildBatchError::MempoolPoisoned)?
.commit_batch(batch);
Ok(())
}

#[instrument(target = COMPONENT, name = "batch_builder.rollback_batch", skip_all)]
async fn rollback_batch(&self, batch_id: BatchId) {
self.mempool.lock().await.rollback_batch(batch_id);
fn rollback_batch(&self, batch_id: BatchId) -> Result<(), BuildBatchError> {
self.mempool
.lock()
.map_err(BuildBatchError::MempoolPoisoned)?
.rollback_batch(batch_id);
Ok(())
}
}

Expand Down
33 changes: 19 additions & 14 deletions crates/block-producer/src/block_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::ops::Deref;
use std::sync::Arc;

use anyhow::Context;
use futures::FutureExt;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::batch::{OrderedBatches, ProvenBatch};
Expand Down Expand Up @@ -82,12 +81,16 @@ impl BlockBuilder {
// Exit if a fatal error occurred.
//
// No need for error logging since this is handled inside the function.
if let err @ Err(BuildBlockError::Desync { local_chain_tip, .. }) =
self.build_block(&mempool).await
{
return err.with_context(|| {
format!("fatal error while building block {}", local_chain_tip.child())
});
match self.build_block(&mempool).await {
Err(err @ BuildBlockError::Desync { local_chain_tip, .. }) => {
return Err(err).with_context(|| {
format!("fatal error while building block {}", local_chain_tip.child())
});
},
Err(err @ BuildBlockError::MempoolPoisoned(_)) => {
return Err(err).context("fatal error while accessing mempool");
},
Err(_) | Ok(()) => {},
}
}
}
Expand All @@ -107,7 +110,8 @@ impl BlockBuilder {
async fn build_block(&self, mempool: &SharedMempool) -> Result<(), BuildBlockError> {
use futures::TryFutureExt;

let selected = Self::select_block(mempool).inspect(SelectedBlock::inject_telemetry).await;
let selected = Self::select_block(mempool)?;
selected.inject_telemetry();
let block_num = selected.block_number;

self.get_block_inputs(selected)
Expand All @@ -121,15 +125,15 @@ impl BlockBuilder {
// Handle errors by propagating the error to the root span and rolling back the block.
.inspect_err(|err| Span::current().set_error(err))
.or_else(|err| async {
self.rollback_block(mempool, block_num).await;
Self::rollback_block(mempool, block_num)?;
Err(err)
})
.await
}

#[instrument(target = COMPONENT, name = "block_builder.select_block", skip_all)]
async fn select_block(mempool: &SharedMempool) -> SelectedBlock {
mempool.lock().await.select_block()
fn select_block(mempool: &SharedMempool) -> Result<SelectedBlock, BuildBlockError> {
Ok(mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.select_block())
}

/// Fetches block inputs from the store for the [`SelectedBlock`].
Expand Down Expand Up @@ -267,14 +271,15 @@ impl BlockBuilder {
.map_err(BuildBlockError::StoreApplyBlockFailed)?;

let (header, ..) = signed_block.into_parts();
mempool.lock().await.commit_block(header);
mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.commit_block(header);

Ok(())
}

#[instrument(target = COMPONENT, name = "block_builder.rollback_block", skip_all)]
async fn rollback_block(&self, mempool: &SharedMempool, block: BlockNumber) {
mempool.lock().await.rollback_block(block);
fn rollback_block(mempool: &SharedMempool, block: BlockNumber) -> Result<(), BuildBlockError> {
mempool.lock().map_err(BuildBlockError::MempoolPoisoned)?.rollback_block(block);
Ok(())
}
}

Expand Down
11 changes: 11 additions & 0 deletions crates/block-producer/src/errors.rs
Comment thread
kkovaacs marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use miden_remote_prover_client::RemoteProverClientError;
use thiserror::Error;
use tokio::task::JoinError;

use crate::mempool::MempoolPoisonError;
use crate::validator::ValidatorError;

// Block-producer errors
Expand Down Expand Up @@ -72,6 +73,10 @@ pub enum MempoolSubmissionError {

#[error("the mempool is at capacity")]
CapacityExceeded,

#[error("mempool lock is poisoned")]
#[grpc(internal)]
MempoolPoisoned(#[source] MempoolPoisonError),
}

// Mempool submission conflicts with current state
Expand Down Expand Up @@ -123,6 +128,9 @@ pub enum BuildBatchError {

#[error("batch proof security level is too low: {0} < {1}")]
SecurityLevelTooLow(u32, u32),

#[error("mempool lock is poisoned")]
MempoolPoisoned(#[source] MempoolPoisonError),
}

// Block building errors
Expand All @@ -148,6 +156,9 @@ pub enum BuildBlockError {
#[error("block signature is invalid")]
InvalidSignature,

#[error("mempool lock is poisoned")]
MempoolPoisoned(#[source] MempoolPoisonError),

/// We sometimes randomly inject errors into the block building process to test our failure
/// responses.

Expand Down
18 changes: 12 additions & 6 deletions crates/block-producer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,16 @@
//! transactions even if the store and block producer momentarily disagree on the chain tip.
use std::collections::{HashSet, VecDeque};
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::{Arc, LockResult, Mutex, MutexGuard};

use miden_node_proto::domain::mempool::MempoolEvent;
use miden_node_utils::ErrorReport;
use miden_protocol::batch::{BatchId, ProvenBatch};
use miden_protocol::block::{BlockHeader, BlockNumber};
use miden_protocol::transaction::{TransactionHeader, TransactionId};
use subscription::SubscriptionProvider;
use tokio::sync::{Mutex, MutexGuard, mpsc};
use thiserror::Error;
use tokio::sync::mpsc;
use tracing::instrument;

use crate::block_builder::SelectedBlock;
Expand Down Expand Up @@ -90,6 +91,10 @@ mod tests;
#[derive(Clone)]
pub struct SharedMempool(Arc<Mutex<Mempool>>);

#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)]
#[error("shared mempool lock is poisoned")]
pub struct MempoolPoisonError;

#[derive(Debug, Clone, PartialEq)]
pub struct MempoolConfig {
/// The constraints each proposed block must adhere to.
Expand Down Expand Up @@ -147,13 +152,14 @@ impl Default for MempoolConfig {
// ================================================================================================

impl SharedMempool {
/// Acquires an asynchronous lock on the underlying [`Mempool`].
/// Acquires a lock on the underlying [`Mempool`].
///
/// Callers should minimise the amount of work performed while holding the lock to reduce
/// contention with other subsystems that need to access the pool.
#[instrument(target = COMPONENT, name = "mempool.lock", skip_all)]
pub async fn lock(&self) -> MutexGuard<'_, Mempool> {
self.0.lock().await
#[instrument(target = COMPONENT, name = "mempool.lock", skip_all, err)]
pub fn lock(&self) -> Result<MutexGuard<'_, Mempool>, MempoolPoisonError> {
let result: LockResult<MutexGuard<'_, Mempool>> = self.0.lock();
result.map_err(|_| MempoolPoisonError)
}
}

Expand Down
14 changes: 14 additions & 0 deletions crates/block-producer/src/mempool/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ use crate::test_utils::batch::TransactionBatchConstructor;
mod add_transaction;
mod add_user_batch;

#[test]
fn shared_mempool_lock_is_poisoned_after_panic() {
let mempool = Mempool::shared(BlockNumber::GENESIS, MempoolConfig::default());
let poisoned = mempool.clone();

let _ = std::thread::spawn(move || {
let _guard = poisoned.lock().expect("fresh mempool lock should not be poisoned");
panic!("poison shared mempool lock");
})
.join();

assert!(matches!(mempool.lock(), Err(MempoolPoisonError)));
}

impl Mempool {
/// Returns an empty [`Mempool`] and a perfect clone intended for use as the Unit Under Test and
/// the reference instance.
Expand Down
Loading
Loading