From 2dfd9dae3de2de105b73dd031f7638f417f85c7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Tue, 5 May 2026 16:36:29 -0300 Subject: [PATCH 1/7] Track superseded mempool errors separately When `Mempools::execute()` runs mempools in parallel, errors from mempools whose results were discarded after another mempool succeeded were still recorded against `driver_mempool_submission`, biasing the per-mempool success ratio with timing-dependent shadowed failures. Replace `select_ok` with `FuturesUnordered` + manual loop so observation runs in the consuming context. Errors that occur before another mempool succeeds are now recorded under a new `Superseded` label via `observe::mempool_superseded`, which also records the winning mempool in the trace fields. Errors in the all-failed case keep their existing labels (Revert / Expired / Other / Disabled). Alert query update needed when deploying: sum by (network) (increase(driver_mempool_submission{cow_fi_environment="prod",result="Success"}[2h])) / sum by (network) (increase(driver_mempool_submission{cow_fi_environment="prod",result!~"Disabled|Superseded"}[2h])) < 0.6 --- crates/driver/src/domain/mempools.rs | 52 +++++++++++++++++++------- crates/driver/src/infra/observe/mod.rs | 25 ++++++++++++- 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index 452e5bf00e..1145b5f3cb 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -9,7 +9,7 @@ use { contracts::CowSettlementForwarder::CowSettlementForwarder, eth_domain_types::{self as eth, BlockNo, TxId}, ethrpc::block_stream::into_stream, - futures::{FutureExt, StreamExt, future::select_ok}, + futures::{FutureExt, StreamExt, stream::FuturesUnordered}, num::Saturating, thiserror::Error, tracing::Instrument, @@ -63,20 +63,46 @@ impl Mempools { submission_deadline: BlockNo, mode: &SubmissionMode, ) -> Result { - let (submission, _remaining_futures) = select_ok(self.mempools.iter().map(|mempool| { - async move { - let result = self - .submit(mempool, settlement, submission_deadline, mode) - .instrument(tracing::info_span!("mempool", kind = mempool.to_string())) - .await; - observe::mempool_executed(mempool, settlement, &result); - result + let mut futures: FuturesUnordered<_> = self + .mempools + .iter() + .map(|mempool| { + async move { + let result = self + .submit(mempool, settlement, submission_deadline, mode) + .instrument(tracing::info_span!("mempool", kind = mempool.to_string())) + .await; + (mempool, result) + } + .boxed() + }) + .collect(); + + let mut shadowed_errors: Vec<(&infra::Mempool, Error)> = Vec::new(); + while let Some((mempool, result)) = futures.next().await { + match result { + Ok(submission) => { + let tx_hash = submission.tx_hash; + observe::mempool_executed(mempool, settlement, Ok(&submission)); + for (shadowed_mempool, err) in &shadowed_errors { + observe::mempool_superseded(shadowed_mempool, mempool, settlement, err); + } + return Ok(tx_hash); + } + Err(err) => shadowed_errors.push((mempool, err)), } - .boxed() - })) - .await?; + } - Ok(submission.tx_hash) + // All mempools failed: observe each with its real error label and return + // the last error as the overall failure. + let (last_mempool, last_err) = shadowed_errors + .pop() + .expect("Mempools::try_new guarantees a non-empty mempool list"); + for (mempool, err) in &shadowed_errors { + observe::mempool_executed(mempool, settlement, Err(err)); + } + observe::mempool_executed(last_mempool, settlement, Err(&last_err)); + Err(last_err) } /// Defines if the mempools are configured in a way that guarantees that diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index 6a69180f6c..719961b1bc 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -364,7 +364,7 @@ pub fn solver_response( pub fn mempool_executed( mempool: &Mempool, settlement: &Settlement, - res: &Result, + res: Result<&SubmissionSuccess, &mempools::Error>, ) { match res { Ok(submission) => { @@ -437,6 +437,29 @@ pub fn mempool_executed( } } +/// Observe that a mempool's submission failed but another mempool succeeded +/// for the same settlement. Recorded under a distinct `Superseded` label so +/// that the per-mempool metric can be filtered when computing the overall +/// settlement submission success rate. +pub fn mempool_superseded( + mempool: &Mempool, + superseded_by: &Mempool, + settlement: &Settlement, + err: &mempools::Error, +) { + tracing::debug!( + ?err, + %mempool, + %superseded_by, + ?settlement, + "mempool submission superseded by another mempool", + ); + metrics::get() + .mempool_submission + .with_label_values(&[mempool.to_string().as_str(), "Superseded"]) + .inc(); +} + /// Observe that an invalid DTO was received. pub fn invalid_dto(err: &impl std::error::Error, dto: &str) { tracing::warn!(?err, ?dto, "received invalid dto"); From fe5207d34d1e8ca3313f5fbac9f8a9794de97568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Wed, 6 May 2026 09:41:32 -0300 Subject: [PATCH 2/7] Add comments to mempool race logic --- crates/driver/src/domain/mempools.rs | 4 ++++ crates/driver/src/infra/observe/mod.rs | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index 1145b5f3cb..946b489444 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -63,6 +63,8 @@ impl Mempools { submission_deadline: BlockNo, mode: &SubmissionMode, ) -> Result { + // Race all mempools; first success wins. Failures overtaken by a + // later success are recorded as `Superseded`. let mut futures: FuturesUnordered<_> = self .mempools .iter() @@ -78,6 +80,8 @@ impl Mempools { }) .collect(); + // Errors from mempools that have already failed; observed once the + // overall outcome is known. let mut shadowed_errors: Vec<(&infra::Mempool, Error)> = Vec::new(); while let Some((mempool, result)) = futures.next().await { match result { diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index 719961b1bc..0dfc4128ef 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -437,10 +437,10 @@ pub fn mempool_executed( } } -/// Observe that a mempool's submission failed but another mempool succeeded -/// for the same settlement. Recorded under a distinct `Superseded` label so -/// that the per-mempool metric can be filtered when computing the overall -/// settlement submission success rate. +/// A mempool's submission failed but another mempool succeeded for the +/// same settlement. Recorded under a `Superseded` label so the per-mempool +/// metric can be filtered when computing the overall settlement submission +/// success rate. pub fn mempool_superseded( mempool: &Mempool, superseded_by: &Mempool, From 844a1564504397ab963fdaa2cab16ee56b449179 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Wed, 6 May 2026 09:45:18 -0300 Subject: [PATCH 3/7] minor adjustments --- crates/driver/src/domain/mempools.rs | 43 +++++++++++++--------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index 946b489444..52a98f91c2 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -9,7 +9,7 @@ use { contracts::CowSettlementForwarder::CowSettlementForwarder, eth_domain_types::{self as eth, BlockNo, TxId}, ethrpc::block_stream::into_stream, - futures::{FutureExt, StreamExt, stream::FuturesUnordered}, + futures::{StreamExt, stream::FuturesUnordered}, num::Saturating, thiserror::Error, tracing::Instrument, @@ -63,49 +63,46 @@ impl Mempools { submission_deadline: BlockNo, mode: &SubmissionMode, ) -> Result { - // Race all mempools; first success wins. Failures overtaken by a - // later success are recorded as `Superseded`. + // Errors from mempools that have already failed; will be observed once the + // overall outcome is known. + let mut shadowed_errors: Vec<(&infra::Mempool, Error)> = + Vec::with_capacity(self.mempools.len()); + + // Race all mempools; first success wins. Failures overtaken by a later success + // are recorded as `Superseded`. let mut futures: FuturesUnordered<_> = self .mempools .iter() - .map(|mempool| { - async move { - let result = self - .submit(mempool, settlement, submission_deadline, mode) - .instrument(tracing::info_span!("mempool", kind = mempool.to_string())) - .await; - (mempool, result) - } - .boxed() + .map(|mempool| async move { + let result = self + .submit(mempool, settlement, submission_deadline, mode) + .instrument(tracing::info_span!("mempool", kind = mempool.to_string())) + .await; + (mempool, result) }) .collect(); - // Errors from mempools that have already failed; observed once the - // overall outcome is known. - let mut shadowed_errors: Vec<(&infra::Mempool, Error)> = Vec::new(); while let Some((mempool, result)) = futures.next().await { match result { Ok(submission) => { - let tx_hash = submission.tx_hash; observe::mempool_executed(mempool, settlement, Ok(&submission)); for (shadowed_mempool, err) in &shadowed_errors { observe::mempool_superseded(shadowed_mempool, mempool, settlement, err); } - return Ok(tx_hash); + return Ok(submission.tx_hash); } Err(err) => shadowed_errors.push((mempool, err)), } } - // All mempools failed: observe each with its real error label and return - // the last error as the overall failure. - let (last_mempool, last_err) = shadowed_errors - .pop() - .expect("Mempools::try_new guarantees a non-empty mempool list"); + // All mempools failed: observe each with its real error label and return the + // last error as the overall failure. for (mempool, err) in &shadowed_errors { observe::mempool_executed(mempool, settlement, Err(err)); } - observe::mempool_executed(last_mempool, settlement, Err(&last_err)); + let (_last_mempool, last_err) = shadowed_errors + .pop() + .expect("Mempools::try_new guarantees a non-empty mempool list"); Err(last_err) } From 00796db611c1b656cdd5fc34ad1363de65f56817 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Wed, 6 May 2026 10:13:33 -0300 Subject: [PATCH 4/7] Split mempool_executed into success and failure observers `mempool_executed` took a `Result<&SubmissionSuccess, &mempools::Error>` and re-matched the same discriminant several times to pick the log level, metric label, and block-passed labels. Replace it with two functions, `mempool_succeeded(&SubmissionSuccess)` and `mempool_failed(&mempools::Error)`, so each branch is straight-line and call sites pick the correct observer directly. Behavior and emitted metrics are unchanged. --- crates/driver/src/domain/mempools.rs | 4 +- crates/driver/src/infra/observe/mod.rs | 104 +++++++++++++------------ 2 files changed, 55 insertions(+), 53 deletions(-) diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index 52a98f91c2..f46a58ac7c 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -85,7 +85,7 @@ impl Mempools { while let Some((mempool, result)) = futures.next().await { match result { Ok(submission) => { - observe::mempool_executed(mempool, settlement, Ok(&submission)); + observe::mempool_succeeded(mempool, settlement, &submission); for (shadowed_mempool, err) in &shadowed_errors { observe::mempool_superseded(shadowed_mempool, mempool, settlement, err); } @@ -98,7 +98,7 @@ impl Mempools { // All mempools failed: observe each with its real error label and return the // last error as the overall failure. for (mempool, err) in &shadowed_errors { - observe::mempool_executed(mempool, settlement, Err(err)); + observe::mempool_failed(mempool, settlement, err); } let (_last_mempool, last_err) = shadowed_errors .pop() diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index 0dfc4128ef..733c44f71d 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -10,9 +10,7 @@ use { domain::{ Liquidity, competition::{ - self, - Solution, - Solved, + self, Solution, Solved, solution::{self, Settlement}, }, mempools::{self, SubmissionSuccess}, @@ -22,7 +20,7 @@ use { infra::solver, util::http, }, - eth_domain_types::{self as eth, Gas}, + eth_domain_types::{self as eth, BlockNo, Gas}, ethrpc::block_stream::BlockInfo, num::Saturating, std::{ @@ -360,28 +358,42 @@ pub fn solver_response( .observe(compute_time.as_secs_f64()); } -/// Observe the result of mempool transaction execution. -pub fn mempool_executed( +/// Observe a successful mempool transaction execution. +pub fn mempool_succeeded( mempool: &Mempool, settlement: &Settlement, - res: Result<&SubmissionSuccess, &mempools::Error>, + submission: &SubmissionSuccess, ) { - match res { - Ok(submission) => { - tracing::info!( - txid = ?submission.tx_hash, - %mempool, - ?settlement, - "sending transaction via mempool succeeded", - ); - } - Err(mempools::Error::Disabled) => { + tracing::info!( + txid = ?submission.tx_hash, + %mempool, + ?settlement, + "sending transaction via mempool succeeded", + ); + metrics::get() + .mempool_submission + .with_label_values(&[mempool.to_string().as_str(), "Success"]) + .inc(); + let blocks_passed = submission + .included_in_block + .saturating_sub(submission.submitted_at_block); + metrics::get() + .mempool_submission_results_blocks_passed + .with_label_values(&[mempool.to_string().as_str(), "Success"]) + .inc_by(blocks_passed.0); +} + +/// Observe a failed mempool transaction execution. +pub fn mempool_failed(mempool: &Mempool, settlement: &Settlement, err: &mempools::Error) { + use mempools::Error::*; + match err { + Disabled => { tracing::debug!( %mempool, "sending transaction via mempool disabled", ); } - Err(err) => { + _ => { tracing::warn!( ?err, %mempool, @@ -390,51 +402,41 @@ pub fn mempool_executed( ); } } - let result = match res { - Ok(_) => "Success", - Err(mempools::Error::Revert { .. } | mempools::Error::SimulationRevert { .. }) => "Revert", - Err(mempools::Error::Expired { .. }) => "Expired", - Err(mempools::Error::Other(_)) => "Other", - Err(mempools::Error::Disabled) => "Disabled", + let label = match err { + Revert { .. } | SimulationRevert { .. } => "Revert", + Expired { .. } => "Expired", + Other(_) => "Other", + Disabled => "Disabled", }; metrics::get() .mempool_submission - .with_label_values(&[mempool.to_string().as_str(), result]) + .with_label_values(&[mempool.to_string().as_str(), label]) .inc(); // For some of the errors we are interested in observing the exact block numbers // passed since the first submission. - let blocks_passed = match res { - Ok(SubmissionSuccess { + let (start, end) = match err { + Revert { submitted_at_block, - included_in_block, + reverted_at_block: end, .. - }) => Some(("Success", submitted_at_block, included_in_block)), - Err(mempools::Error::Revert { - tx_id: _, - submitted_at_block, - reverted_at_block, - }) => Some(("Revert", submitted_at_block, reverted_at_block)), - Err(mempools::Error::SimulationRevert { + } + | SimulationRevert { submitted_at_block, - reverted_at_block, - }) => Some(("Revert", submitted_at_block, reverted_at_block)), - Err(mempools::Error::Expired { - tx_id: _, + reverted_at_block: end, + } + | Expired { submitted_at_block, - submission_deadline, - }) => Some(("Expired", submitted_at_block, submission_deadline)), - Err(mempools::Error::Other(_)) => None, - Err(mempools::Error::Disabled) => None, + submission_deadline: end, + .. + } => (submitted_at_block, end), + _ => return, }; - - if let Some((label, start, end)) = blocks_passed { - let blocks_passed = end.saturating_sub(*start); - metrics::get() - .mempool_submission_results_blocks_passed - .with_label_values(&[mempool.to_string().as_str(), label]) - .inc_by(blocks_passed.0); - } + let BlockNo(blocks_passed) = end.saturating_sub(*start); + metrics::get() + .mempool_submission_results_blocks_passed + .with_label_values(&[mempool.to_string().as_str(), label]) + .inc_by(blocks_passed); } /// A mempool's submission failed but another mempool succeeded for the From d4d060d250be028233231860b4ee77c727e11c61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Wed, 6 May 2026 10:18:51 -0300 Subject: [PATCH 5/7] minor adjustments --- crates/driver/src/infra/observe/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index 733c44f71d..86be19fa2e 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -415,7 +415,7 @@ pub fn mempool_failed(mempool: &Mempool, settlement: &Settlement, err: &mempools // For some of the errors we are interested in observing the exact block numbers // passed since the first submission. - let (start, end) = match err { + let (BlockNo(start), BlockNo(end)) = match err { Revert { submitted_at_block, reverted_at_block: end, @@ -432,11 +432,10 @@ pub fn mempool_failed(mempool: &Mempool, settlement: &Settlement, err: &mempools } => (submitted_at_block, end), _ => return, }; - let BlockNo(blocks_passed) = end.saturating_sub(*start); metrics::get() .mempool_submission_results_blocks_passed .with_label_values(&[mempool.to_string().as_str(), label]) - .inc_by(blocks_passed); + .inc_by(end.saturating_sub(*start)); } /// A mempool's submission failed but another mempool succeeded for the From 0ed175323a84ade9cfade53ccd463e8a90d07935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Wed, 6 May 2026 10:22:35 -0300 Subject: [PATCH 6/7] fmt --- crates/driver/src/infra/observe/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index 86be19fa2e..44c9485472 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -10,7 +10,9 @@ use { domain::{ Liquidity, competition::{ - self, Solution, Solved, + self, + Solution, + Solved, solution::{self, Settlement}, }, mempools::{self, SubmissionSuccess}, From d9fb0cb9bd48d67c95afbc3fc1db414d3daae882 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tiago=20Guimar=C3=A3es?= Date: Wed, 6 May 2026 12:37:56 -0300 Subject: [PATCH 7/7] Document dropped futures in mempool race --- crates/driver/src/domain/mempools.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index f46a58ac7c..b46ada9efa 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -85,6 +85,10 @@ impl Mempools { while let Some((mempool, result)) = futures.next().await { match result { Ok(submission) => { + // First success wins: record this mempool as the winner and label any + // already-failed mempools as `Superseded`. Remaining in-flight futures are + // dropped here without awaiting. (their outcomes are never logged and emit no + // metrics) observe::mempool_succeeded(mempool, settlement, &submission); for (shadowed_mempool, err) in &shadowed_errors { observe::mempool_superseded(shadowed_mempool, mempool, settlement, err);