Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions crates/op-rbuilder/src/builder/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,15 @@ impl PayloadJobCancellation {
}

/// Future that resolves when cancelled (any reason).
pub(crate) fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> {
pub(crate) fn wait_for_cancellation(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> {
self.token.cancelled()
}

/// Returns true if this job has been cancelled for any reason.
pub(crate) fn is_cancelled(&self) -> bool {
self.token.is_cancelled()
}

/// Returns the underlying token.
/// Passed to blocking tasks and the scheduler.
pub(crate) fn token(&self) -> CancellationToken {
Expand Down Expand Up @@ -145,24 +150,28 @@ mod tests {
let cancel = PayloadJobCancellation::new();
cancel.cancel_deadline();
assert!(cancel.token().is_cancelled());
assert!(cancel.is_cancelled());
assert!(!cancel.is_new_fcu());
assert!(!cancel.is_resolved());
assert_eq!(cancel.reason(), Some(CancellationReason::Deadline));
}

#[tokio::test]
async fn test_awaitable() {
async fn test_wait_for_cancellation() {
let cancel = PayloadJobCancellation::new();
let token = cancel.token();
let cancel_to_fire = cancel.clone();

tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
cancel.cancel_resolved();
cancel_to_fire.cancel_resolved();
});

timeout(Duration::from_millis(100), token.cancelled())
timeout(Duration::from_millis(100), cancel.wait_for_cancellation())
.await
.expect("token should fire when resolved fires");

assert!(cancel.is_cancelled());
assert_eq!(cancel.reason(), Some(CancellationReason::Resolved));
}

#[tokio::test]
Expand All @@ -178,6 +187,7 @@ mod tests {
#[tokio::test]
async fn test_reason_none_when_not_cancelled() {
let cancel = PayloadJobCancellation::new();
assert!(!cancel.is_cancelled());
assert_eq!(cancel.reason(), None);
}

Expand Down
199 changes: 135 additions & 64 deletions crates/op-rbuilder/src/builder/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
BuilderConfig,
best_txs::{FlashblockPoolTxCursor, FlashblockTxTracker},
builder_tx::{BuilderTransactions, reserve_builder_tx_budget},
cancellation::FlashblockJobCancellation,
cancellation::{FlashblockJobCancellation, PayloadJobCancellation},
context::{OpPayloadBuilderCtx, OpPayloadJobCtx},
generator::{BuildArguments, PayloadBuilder},
timing::{FlashblockScheduler, compute_slot_offset_ms},
Expand Down Expand Up @@ -39,7 +39,11 @@ use reth_revm::{
use reth_tasks::Runtime;
use reth_transaction_pool::TransactionPool;
use revm::Database;
use std::{ops::Deref, sync::Arc, time::Instant};
use std::{
ops::Deref,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{mpsc, watch};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, metadata::Level, span, warn};
Expand Down Expand Up @@ -94,7 +98,7 @@ struct FallbackBuildOutput<Cache, Transition> {

struct FlashblockBuildOutput<Cache, Transition> {
ctx: OpPayloadJobCtx,
build_result: eyre::Result<Option<(FlashblocksState, OpBuiltPayload)>>,
build_result: eyre::Result<Option<BuiltFlashblockOutput>>,
cache: Cache,
transition: Transition,
tx_tracker: FlashblockTxTracker,
Expand All @@ -103,6 +107,13 @@ struct FlashblockBuildOutput<Cache, Transition> {
state_root_calc: StateRootCalculator,
}

struct BuiltFlashblockOutput {
next_flashblock_state: FlashblocksState,
new_payload: OpBuiltPayload,
fb_payload: OpFlashblockPayload,
build_duration: Duration,
}

impl FlashblocksState {
fn new(target_flashblock_count: u64) -> Self {
Self {
Expand Down Expand Up @@ -489,17 +500,13 @@ where
fb_state = returned_fb_state;
state_root_calc = returned_state_root_calc;

self.built_fb_payload_tx
.try_send(payload.clone())
.map_err(PayloadBuilderError::other)?;
if let Err(e) = self.built_payload_tx.try_send(payload.clone()) {
warn!(
target: "payload_builder",
error = %e,
"Failed to send updated payload"
);
if payload_cancel.is_cancelled() {
Self::record_cancellation_reason(&self.builder_ctx.metrics, &payload_cancel, &span);
return Ok(());
}
best_payload_tx.send_replace(Some(payload));

best_payload_tx.send_replace(Some(payload.clone()));
self.notify_built_payload(payload);

info!(
target: "payload_builder",
Expand Down Expand Up @@ -652,7 +659,7 @@ where
let new_fb_cancel = tokio::select! {
// ensures cancellation is checked before trigger.
biased;
_ = payload_cancel.cancelled() => {
_ = payload_cancel.wait_for_cancellation() => {
Self::record_cancellation_reason(&self.builder_ctx.metrics, &payload_cancel, &span);
self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span);
return Ok(());
Expand Down Expand Up @@ -697,7 +704,7 @@ where
// is dropped (the thread finishes but the oneshot result is discarded).
let build_output = tokio::select! {
biased;
_ = payload_cancel.cancelled() => {
_ = payload_cancel.wait_for_cancellation() => {
if payload_cancel.is_resolved() {
// Suppressed flashblock: we received getResolve during flashblock building
self.builder_ctx.metrics.flashblock_publish_suppressed_total.increment(1);
Expand Down Expand Up @@ -797,8 +804,28 @@ where
}

let next_flashblock_state = match build_result {
Ok(Some((next_flashblock_state, new_payload))) => {
best_payload_tx.send_replace(Some(new_payload));
Ok(Some(built_flashblock)) => {
let Some(next_flashblock_state) = self
.publish_flashblock_payload(
&ctx,
&best_payload_tx,
&fb_state,
&payload_cancel,
&span,
built_flashblock,
)
.map_err(|e| PayloadBuilderError::Other(e.into()))?
else {
self.record_flashblocks_metrics(
&ctx,
&fb_state,
&info,
fb_state.target_flashblock_count(),
&span,
);
return Ok(());
};

next_flashblock_state
}
Ok(None) => {
Expand Down Expand Up @@ -900,6 +927,87 @@ where
})
}

fn notify_built_payload(&self, payload: OpBuiltPayload) {
if let Err(e) = self.built_fb_payload_tx.try_send(payload.clone()) {
warn!(
target: "payload_builder",
error = %e,
"Failed to send built flashblock payload to handler"
);
}

if let Err(e) = self.built_payload_tx.try_send(payload) {
warn!(
target: "payload_builder",
error = %e,
"Failed to send updated payload"
);
}
}

fn publish_flashblock_payload(
&self,
ctx: &OpPayloadJobCtx,
best_payload_tx: &watch::Sender<Option<OpBuiltPayload>>,
fb_state: &FlashblocksState,
payload_cancel: &PayloadJobCancellation,
span: &tracing::Span,
built_flashblock: BuiltFlashblockOutput,
) -> eyre::Result<Option<FlashblocksState>> {
let BuiltFlashblockOutput {
next_flashblock_state,
new_payload,
fb_payload,
build_duration,
} = built_flashblock;

if payload_cancel.is_cancelled() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we checking for cancellation just before we call this publish function? So this seems unnecessary here

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we check is_resolved but is_cancelled() can also be true if deadline / error is reached. but yes I believe it can be refactored better

if payload_cancel.is_resolved() {
ctx.metrics.flashblock_publish_suppressed_total.increment(1);
}
Self::record_cancellation_reason(&self.builder_ctx.metrics, payload_cancel, span);
return Ok(None);
}

// After this point, all side effects are synchronous. If cancellation wins the race after
// this check, still publish the local payload so getPayload can include this flashblock.
let flashblock_byte_size = self
.ws_pub
.publish(&fb_payload)
.wrap_err("failed to publish flashblock via websocket")?;
let flashblock_tx_count = fb_payload.raw_transactions().len();

best_payload_tx.send_replace(Some(new_payload.clone()));
self.notify_built_payload(new_payload);

let slot_offset_ms =
compute_slot_offset_ms(ctx.attributes().timestamp(), self.config.block_time);
record_flashblock_publish_timing(fb_state.flashblock_index(), slot_offset_ms);

if self.config.enable_tx_tracking_debug_logs {
debug!(
target: "tx_trace",
payload_id = %ctx.payload_id(),
block_number = ctx.block_number(),
flashblock_index = fb_state.flashblock_index(),
byte_size = flashblock_byte_size,
total_txs = flashblock_tx_count,
slot_offset_ms,
stage = "fb_published"
);
}

ctx.metrics.flashblock_build_duration.record(build_duration);
ctx.metrics
.flashblock_byte_size_histogram
.record(flashblock_byte_size as f64);
ctx.metrics
.flashblock_num_tx_histogram
.record(flashblock_tx_count as f64);

Ok(Some(next_flashblock_state))
}

#[expect(clippy::too_many_arguments)]
fn build_next_flashblock<
'a,
Expand All @@ -915,7 +1023,7 @@ where
best_txs: &mut NextFlashblockPoolTxCursor<'a, Pool>,
block_cancel: &CancellationToken,
state_root_calc: &mut StateRootCalculator,
) -> eyre::Result<Option<(FlashblocksState, OpBuiltPayload)>> {
) -> eyre::Result<Option<BuiltFlashblockOutput>> {
let flashblock_index = fb_state.flashblock_index();
let mut target_gas_for_batch = fb_state.target_gas_for_batch();
let mut target_da_for_batch = fb_state.target_da_for_batch();
Expand Down Expand Up @@ -1056,67 +1164,30 @@ where
fb_payload.index = flashblock_index;
fb_payload.base = None;

// Block canceled (new FCU, getPayload resolved, or deadline).
// Don't publish to ensures every published flashblock is a subset of the resolved payload.
// Block canceled (new FCU, getPayload resolved, or deadline). The async outer
// loop owns publishing and re-checks cancellation before every side effect.
if block_cancel.is_cancelled() {
return Ok(None);
}
let flashblock_byte_size = self
.ws_pub
.publish(&fb_payload)
.wrap_err("failed to publish flashblock via websocket")?;

// Record slot-relative publish timing (ms since slot start)
let slot_offset_ms =
compute_slot_offset_ms(ctx.attributes().timestamp(), self.config.block_time);
record_flashblock_publish_timing(flashblock_index, slot_offset_ms);

if self.config.enable_tx_tracking_debug_logs {
debug!(
target: "tx_trace",
payload_id = %ctx.payload_id(),
block_number = ctx.block_number(),
flashblock_index,
byte_size = flashblock_byte_size,
total_txs = info.executed_transactions.len(),
slot_offset_ms,
stage = "fb_published"
);
}
self.built_fb_payload_tx
.try_send(new_payload.clone())
.wrap_err("failed to send built payload to handler")?;
if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) {
warn!(
target: "payload_builder",
error = %e,
"Failed to send updated payload"
);
}
// Record flashblock build duration
ctx.metrics
.flashblock_build_duration
.record(flashblock_build_start_time.elapsed());
ctx.metrics
.flashblock_byte_size_histogram
.record(flashblock_byte_size as f64);
ctx.metrics
.flashblock_num_tx_histogram
.record(info.executed_transactions.len() as f64);

// Advance batch budgets for the next flashblock.
let next_flashblock_state =
fb_state.next_after_seal(target_da_for_batch, target_da_footprint_for_batch);

Ok(Some((next_flashblock_state, new_payload)))
Ok(Some(BuiltFlashblockOutput {
next_flashblock_state,
new_payload,
fb_payload,
build_duration: flashblock_build_start_time.elapsed(),
}))
}
}
}

/// Records cancellation reason for observability.
fn record_cancellation_reason(
metrics: &OpRBuilderMetrics,
cancellation: &super::cancellation::PayloadJobCancellation,
cancellation: &PayloadJobCancellation,
span: &tracing::Span,
) {
let reason_str = match cancellation.reason() {
Expand Down
2 changes: 1 addition & 1 deletion crates/op-rbuilder/src/builder/timing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl FlashblockScheduler {
return;
}
}
_ = block_cancel.cancelled() => {
_ = block_cancel.wait_for_cancellation() => {
warn!(
target: "payload_builder",
id = %payload_id,
Expand Down
Loading