Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions crates/driver/src/domain/mempools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
contracts::CowSettlementForwarder::CowSettlementForwarder,
eth_domain_types::{self as eth, BlockNo, TxId},
ethrpc::block_stream::into_stream,
futures::{FutureExt, StreamExt, future::select_ok},
futures::{StreamExt, stream::FuturesUnordered},
num::Saturating,
thiserror::Error,
tracing::Instrument,
Expand Down Expand Up @@ -63,20 +63,51 @@ impl Mempools {
submission_deadline: BlockNo,
mode: &SubmissionMode,
) -> Result<eth::TxId, Error> {
let (submission, _remaining_futures) = select_ok(self.mempools.iter().map(|mempool| {
async move {
// Errors from mempools that have already failed; will be observed once the
// overall outcome is known.
let mut shadowed_errors: Vec<(&infra::Mempool, Error)> =
Vec::with_capacity(self.mempools.len());

// Race all mempools; first success wins. Failures overtaken by a later success
// are recorded as `Superseded`.
let mut futures: FuturesUnordered<_> = self
.mempools
.iter()
.map(|mempool| async move {
let result = self
.submit(mempool, settlement, submission_deadline, mode)
.instrument(tracing::info_span!("mempool", kind = mempool.to_string()))
.await;
observe::mempool_executed(mempool, settlement, &result);
result
(mempool, result)
})
.collect();

while let Some((mempool, result)) = futures.next().await {
match result {
Ok(submission) => {
// First success wins: record this mempool as the winner and label any
// already-failed mempools as `Superseded`. Remaining in-flight futures are
// dropped here without awaiting. (their outcomes are never logged and emit no
// metrics)
observe::mempool_succeeded(mempool, settlement, &submission);
for (shadowed_mempool, err) in &shadowed_errors {
observe::mempool_superseded(shadowed_mempool, mempool, settlement, err);
}
return Ok(submission.tx_hash);
}
Err(err) => shadowed_errors.push((mempool, err)),
}
.boxed()
}))
.await?;
}

Ok(submission.tx_hash)
// All mempools failed: observe each with its real error label and return the
// last error as the overall failure.
for (mempool, err) in &shadowed_errors {
observe::mempool_failed(mempool, settlement, err);
}
let (_last_mempool, last_err) = shadowed_errors
.pop()
.expect("Mempools::try_new guarantees a non-empty mempool list");
Err(last_err)
}

/// Defines if the mempools are configured in a way that guarantees that
Expand Down
120 changes: 73 additions & 47 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use {
infra::solver,
util::http,
},
eth_domain_types::{self as eth, Gas},
eth_domain_types::{self as eth, BlockNo, Gas},
ethrpc::block_stream::BlockInfo,
num::Saturating,
std::{
Expand Down Expand Up @@ -360,28 +360,42 @@ pub fn solver_response(
.observe(compute_time.as_secs_f64());
}

/// Observe the result of mempool transaction execution.
pub fn mempool_executed(
/// Observe a successful mempool transaction execution.
pub fn mempool_succeeded(
mempool: &Mempool,
settlement: &Settlement,
res: &Result<SubmissionSuccess, mempools::Error>,
submission: &SubmissionSuccess,
) {
match res {
Ok(submission) => {
tracing::info!(
txid = ?submission.tx_hash,
%mempool,
?settlement,
"sending transaction via mempool succeeded",
);
}
Err(mempools::Error::Disabled) => {
tracing::info!(
txid = ?submission.tx_hash,
%mempool,
?settlement,
"sending transaction via mempool succeeded",
);
metrics::get()
.mempool_submission
.with_label_values(&[mempool.to_string().as_str(), "Success"])
.inc();
let blocks_passed = submission
.included_in_block
.saturating_sub(submission.submitted_at_block);
metrics::get()
.mempool_submission_results_blocks_passed
.with_label_values(&[mempool.to_string().as_str(), "Success"])
.inc_by(blocks_passed.0);
}

/// Observe a failed mempool transaction execution.
pub fn mempool_failed(mempool: &Mempool, settlement: &Settlement, err: &mempools::Error) {
use mempools::Error::*;
match err {
Disabled => {
tracing::debug!(
%mempool,
"sending transaction via mempool disabled",
);
}
Err(err) => {
_ => {
tracing::warn!(
?err,
%mempool,
Expand All @@ -390,51 +404,63 @@ pub fn mempool_executed(
);
}
}
let result = match res {
Ok(_) => "Success",
Err(mempools::Error::Revert { .. } | mempools::Error::SimulationRevert { .. }) => "Revert",
Err(mempools::Error::Expired { .. }) => "Expired",
Err(mempools::Error::Other(_)) => "Other",
Err(mempools::Error::Disabled) => "Disabled",
let label = match err {
Revert { .. } | SimulationRevert { .. } => "Revert",
Expired { .. } => "Expired",
Other(_) => "Other",
Disabled => "Disabled",
};
metrics::get()
.mempool_submission
.with_label_values(&[mempool.to_string().as_str(), result])
.with_label_values(&[mempool.to_string().as_str(), label])
.inc();

// For some of the errors we are interested in observing the exact block numbers
// passed since the first submission.
let blocks_passed = match res {
Ok(SubmissionSuccess {
let (BlockNo(start), BlockNo(end)) = match err {
Revert {
submitted_at_block,
included_in_block,
reverted_at_block: end,
..
}) => Some(("Success", submitted_at_block, included_in_block)),
Err(mempools::Error::Revert {
tx_id: _,
submitted_at_block,
reverted_at_block,
}) => Some(("Revert", submitted_at_block, reverted_at_block)),
Err(mempools::Error::SimulationRevert {
}
| SimulationRevert {
submitted_at_block,
reverted_at_block,
}) => Some(("Revert", submitted_at_block, reverted_at_block)),
Err(mempools::Error::Expired {
tx_id: _,
reverted_at_block: end,
}
| Expired {
submitted_at_block,
submission_deadline,
}) => Some(("Expired", submitted_at_block, submission_deadline)),
Err(mempools::Error::Other(_)) => None,
Err(mempools::Error::Disabled) => None,
submission_deadline: end,
..
} => (submitted_at_block, end),
_ => return,
};
metrics::get()
.mempool_submission_results_blocks_passed
.with_label_values(&[mempool.to_string().as_str(), label])
.inc_by(end.saturating_sub(*start));
}

if let Some((label, start, end)) = blocks_passed {
let blocks_passed = end.saturating_sub(*start);
metrics::get()
.mempool_submission_results_blocks_passed
.with_label_values(&[mempool.to_string().as_str(), label])
.inc_by(blocks_passed.0);
}
/// A mempool's submission failed but another mempool succeeded for the
/// same settlement. Recorded under a `Superseded` label so the per-mempool
/// metric can be filtered when computing the overall settlement submission
/// success rate.
pub fn mempool_superseded(
mempool: &Mempool,
superseded_by: &Mempool,
settlement: &Settlement,
err: &mempools::Error,
) {
tracing::debug!(
?err,
%mempool,
%superseded_by,
?settlement,
"mempool submission superseded by another mempool",
);
metrics::get()
.mempool_submission
.with_label_values(&[mempool.to_string().as_str(), "Superseded"])
.inc();
}

/// Observe that an invalid DTO was received.
Expand Down
Loading