Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/op-rbuilder/src/pool/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub(super) struct PoolMetrics {
pub presim_duration: Histogram,
/// Number of updates to the tip state for the top of block simulator
pub presim_tip_state_updates: Counter,
/// Number of times the tip state was dropped because no canonical state
/// notification arrived within the staleness window (e.g. during sync).
pub presim_tip_state_evictions: Counter,
/// Number of pending txs evicted due to failing top of block simulation
pub presim_pending_evictions: Counter,
/// Number of presim tasks waiting for a concurrency permit.
Expand Down
102 changes: 89 additions & 13 deletions crates/op-rbuilder/src/pool/presim.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::{num::NonZeroUsize, sync::Arc, time::Instant};

use alloy_consensus::{BlockHeader, Header};
use alloy_evm::{Evm, InvalidTxError};
use alloy_primitives::{Address, B256, Bytes};
Expand All @@ -20,6 +18,11 @@ use revm::{
context::{TxEnv, result::ResultAndState},
context_interface::result::InvalidTransaction,
};
use std::{
num::NonZeroUsize,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -121,7 +124,7 @@ impl TopOfBlockSimulator {
};

let Some(ref tip_state) = *tip_state else {
warn!("tip state for top of block simulator not initialized yet");
debug!("no tip state available for top of block simulator, passing through");
return true;
};

Expand All @@ -131,6 +134,11 @@ impl TopOfBlockSimulator {
pub(crate) fn update_tip(&self, tip_state: TipState) {
*self.tip_state.write() = Arc::new(Some(tip_state));
}

/// Drop any held tip state, releasing the underlying read transaction.
pub(crate) fn clear_tip(&self) {
*self.tip_state.write() = Arc::new(None);
}
}

impl TipState {
Expand Down Expand Up @@ -208,13 +216,48 @@ impl TipState {
}
}

/// If no canonical state notification arrives within this window, drop the
/// held tip state so we don't pin a read transaction open for the sync duration
const STALE_TIP_TIMEOUT: Duration = Duration::from_secs(30);

pub(crate) async fn maintain_tip_state<N, St, Provider>(
simulator: Arc<TopOfBlockSimulator>,
provider: Provider,
evm_config: OpEvmConfig,
block_time_secs: u64,
metrics: Arc<PoolMetrics>,
events: St,
) where
N: NodePrimitives<Block: Block<Header = Header>>,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
Provider: StateProviderFactory
+ ChainSpecProvider<ChainSpec = OpChainSpec>
+ BlockReaderIdExt<Header = Header>
+ Clone
+ Send
+ Sync
+ 'static,
{
maintain_tip_state_with_timeout(
simulator,
provider,
evm_config,
block_time_secs,
metrics,
events,
STALE_TIP_TIMEOUT,
)
.await
}

async fn maintain_tip_state_with_timeout<N, St, Provider>(
simulator: Arc<TopOfBlockSimulator>,
provider: Provider,
evm_config: OpEvmConfig,
block_time_secs: u64,
metrics: Arc<PoolMetrics>,
mut events: St,
stale_timeout: Duration,
) where
N: NodePrimitives<Block: Block<Header = Header>>,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
Expand All @@ -226,18 +269,36 @@ pub(crate) async fn maintain_tip_state<N, St, Provider>(
+ Sync
+ 'static,
{
let mut evicted = false;
loop {
let Some(event) = events.next().await else {
break;
};

match TipState::create(&provider, evm_config.clone(), block_time_secs, event.tip()) {
Ok(tip_state) => {
simulator.update_tip(tip_state);
metrics.presim_tip_state_updates.increment(1);
match tokio::time::timeout(stale_timeout, events.next()).await {
Ok(Some(event)) => {
if evicted {
debug!("resumed receiving canon state notifications");
evicted = false;
}
match TipState::create(&provider, evm_config.clone(), block_time_secs, event.tip())
{
Ok(tip_state) => {
simulator.update_tip(tip_state);
metrics.presim_tip_state_updates.increment(1);
}
Err(e) => {
warn!(error = %e, "failed to create tip state for pre-simulation");
}
}
}
Err(e) => {
warn!(error = %e, "failed to create tip state for pre-simulation");
Ok(None) => break,
Err(_) => {
if !evicted {
simulator.clear_tip();
metrics.presim_tip_state_evictions.increment(1);
warn!(
timeout_secs = stale_timeout.as_secs(),
"no canon state notification received, dropping presim tip state"
);
evicted = true;
}
}
}
}
Expand Down Expand Up @@ -467,6 +528,21 @@ mod tests {
assert!(!simulator.simulate_tx_sync(reverting_create(&signer, 0)));
}

#[test]
fn clear_tip_drops_state_provider() {
let simulator = TopOfBlockSimulator::new_for_test();
let signer = PrivateKeySigner::random();
let provider = funded_provider(signer.address());

simulator.update_tip(tip_state_with_provider(provider));
assert!(!simulator.simulate_tx_sync(reverting_create(&signer, 0)));

// After clear: back to pass-through behavior;
// state provider no longer held
simulator.clear_tip();
assert!(simulator.simulate_tx_sync(reverting_create(&signer, 0)));
}

#[test]
fn simulations_are_isolated() {
let signer = PrivateKeySigner::random();
Expand Down
Loading