diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index 452e5bf00e..b46ada9efa 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -9,7 +9,7 @@ use { contracts::CowSettlementForwarder::CowSettlementForwarder, eth_domain_types::{self as eth, BlockNo, TxId}, ethrpc::block_stream::into_stream, - futures::{FutureExt, StreamExt, future::select_ok}, + futures::{StreamExt, stream::FuturesUnordered}, num::Saturating, thiserror::Error, tracing::Instrument, @@ -63,20 +63,51 @@ impl Mempools { submission_deadline: BlockNo, mode: &SubmissionMode, ) -> Result { - let (submission, _remaining_futures) = select_ok(self.mempools.iter().map(|mempool| { - async move { + // Errors from mempools that have already failed; will be observed once the + // overall outcome is known. + let mut shadowed_errors: Vec<(&infra::Mempool, Error)> = + Vec::with_capacity(self.mempools.len()); + + // Race all mempools; first success wins. Failures overtaken by a later success + // are recorded as `Superseded`. + let mut futures: FuturesUnordered<_> = self + .mempools + .iter() + .map(|mempool| async move { let result = self .submit(mempool, settlement, submission_deadline, mode) .instrument(tracing::info_span!("mempool", kind = mempool.to_string())) .await; - observe::mempool_executed(mempool, settlement, &result); - result + (mempool, result) + }) + .collect(); + + while let Some((mempool, result)) = futures.next().await { + match result { + Ok(submission) => { + // First success wins: record this mempool as the winner and label any + // already-failed mempools as `Superseded`. Remaining in-flight futures are + // dropped here without awaiting. (their outcomes are never logged and emit no + // metrics) + observe::mempool_succeeded(mempool, settlement, &submission); + for (shadowed_mempool, err) in &shadowed_errors { + observe::mempool_superseded(shadowed_mempool, mempool, settlement, err); + } + return Ok(submission.tx_hash); + } + Err(err) => shadowed_errors.push((mempool, err)), } - .boxed() - })) - .await?; + } - Ok(submission.tx_hash) + // All mempools failed: observe each with its real error label and return the + // last error as the overall failure. + for (mempool, err) in &shadowed_errors { + observe::mempool_failed(mempool, settlement, err); + } + let (_last_mempool, last_err) = shadowed_errors + .pop() + .expect("Mempools::try_new guarantees a non-empty mempool list"); + Err(last_err) } /// Defines if the mempools are configured in a way that guarantees that diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index 6a69180f6c..44c9485472 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -22,7 +22,7 @@ use { infra::solver, util::http, }, - eth_domain_types::{self as eth, Gas}, + eth_domain_types::{self as eth, BlockNo, Gas}, ethrpc::block_stream::BlockInfo, num::Saturating, std::{ @@ -360,28 +360,42 @@ pub fn solver_response( .observe(compute_time.as_secs_f64()); } -/// Observe the result of mempool transaction execution. -pub fn mempool_executed( +/// Observe a successful mempool transaction execution. +pub fn mempool_succeeded( mempool: &Mempool, settlement: &Settlement, - res: &Result, + submission: &SubmissionSuccess, ) { - match res { - Ok(submission) => { - tracing::info!( - txid = ?submission.tx_hash, - %mempool, - ?settlement, - "sending transaction via mempool succeeded", - ); - } - Err(mempools::Error::Disabled) => { + tracing::info!( + txid = ?submission.tx_hash, + %mempool, + ?settlement, + "sending transaction via mempool succeeded", + ); + metrics::get() + .mempool_submission + .with_label_values(&[mempool.to_string().as_str(), "Success"]) + .inc(); + let blocks_passed = submission + .included_in_block + .saturating_sub(submission.submitted_at_block); + metrics::get() + .mempool_submission_results_blocks_passed + .with_label_values(&[mempool.to_string().as_str(), "Success"]) + .inc_by(blocks_passed.0); +} + +/// Observe a failed mempool transaction execution. +pub fn mempool_failed(mempool: &Mempool, settlement: &Settlement, err: &mempools::Error) { + use mempools::Error::*; + match err { + Disabled => { tracing::debug!( %mempool, "sending transaction via mempool disabled", ); } - Err(err) => { + _ => { tracing::warn!( ?err, %mempool, @@ -390,51 +404,63 @@ pub fn mempool_executed( ); } } - let result = match res { - Ok(_) => "Success", - Err(mempools::Error::Revert { .. } | mempools::Error::SimulationRevert { .. }) => "Revert", - Err(mempools::Error::Expired { .. }) => "Expired", - Err(mempools::Error::Other(_)) => "Other", - Err(mempools::Error::Disabled) => "Disabled", + let label = match err { + Revert { .. } | SimulationRevert { .. } => "Revert", + Expired { .. } => "Expired", + Other(_) => "Other", + Disabled => "Disabled", }; metrics::get() .mempool_submission - .with_label_values(&[mempool.to_string().as_str(), result]) + .with_label_values(&[mempool.to_string().as_str(), label]) .inc(); // For some of the errors we are interested in observing the exact block numbers // passed since the first submission. - let blocks_passed = match res { - Ok(SubmissionSuccess { + let (BlockNo(start), BlockNo(end)) = match err { + Revert { submitted_at_block, - included_in_block, + reverted_at_block: end, .. - }) => Some(("Success", submitted_at_block, included_in_block)), - Err(mempools::Error::Revert { - tx_id: _, - submitted_at_block, - reverted_at_block, - }) => Some(("Revert", submitted_at_block, reverted_at_block)), - Err(mempools::Error::SimulationRevert { + } + | SimulationRevert { submitted_at_block, - reverted_at_block, - }) => Some(("Revert", submitted_at_block, reverted_at_block)), - Err(mempools::Error::Expired { - tx_id: _, + reverted_at_block: end, + } + | Expired { submitted_at_block, - submission_deadline, - }) => Some(("Expired", submitted_at_block, submission_deadline)), - Err(mempools::Error::Other(_)) => None, - Err(mempools::Error::Disabled) => None, + submission_deadline: end, + .. + } => (submitted_at_block, end), + _ => return, }; + metrics::get() + .mempool_submission_results_blocks_passed + .with_label_values(&[mempool.to_string().as_str(), label]) + .inc_by(end.saturating_sub(*start)); +} - if let Some((label, start, end)) = blocks_passed { - let blocks_passed = end.saturating_sub(*start); - metrics::get() - .mempool_submission_results_blocks_passed - .with_label_values(&[mempool.to_string().as_str(), label]) - .inc_by(blocks_passed.0); - } +/// A mempool's submission failed but another mempool succeeded for the +/// same settlement. Recorded under a `Superseded` label so the per-mempool +/// metric can be filtered when computing the overall settlement submission +/// success rate. +pub fn mempool_superseded( + mempool: &Mempool, + superseded_by: &Mempool, + settlement: &Settlement, + err: &mempools::Error, +) { + tracing::debug!( + ?err, + %mempool, + %superseded_by, + ?settlement, + "mempool submission superseded by another mempool", + ); + metrics::get() + .mempool_submission + .with_label_values(&[mempool.to_string().as_str(), "Superseded"]) + .inc(); } /// Observe that an invalid DTO was received.