From 58fdb92b8b14f0e178934ddf222bc2f6c9065caa Mon Sep 17 00:00:00 2001 From: avalonche Date: Wed, 20 May 2026 23:55:12 -0700 Subject: [PATCH] fix: don't publish flashblock after payload cancellation --- .../op-rbuilder/src/builder/cancellation.rs | 20 +- crates/op-rbuilder/src/builder/payload.rs | 199 ++++++++++++------ crates/op-rbuilder/src/builder/timing.rs | 2 +- 3 files changed, 151 insertions(+), 70 deletions(-) diff --git a/crates/op-rbuilder/src/builder/cancellation.rs b/crates/op-rbuilder/src/builder/cancellation.rs index 99c0f12b..5fbad091 100644 --- a/crates/op-rbuilder/src/builder/cancellation.rs +++ b/crates/op-rbuilder/src/builder/cancellation.rs @@ -68,10 +68,15 @@ impl PayloadJobCancellation { } /// Future that resolves when cancelled (any reason). - pub(crate) fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> { + pub(crate) fn wait_for_cancellation(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> { self.token.cancelled() } + /// Returns true if this job has been cancelled for any reason. + pub(crate) fn is_cancelled(&self) -> bool { + self.token.is_cancelled() + } + /// Returns the underlying token. /// Passed to blocking tasks and the scheduler. pub(crate) fn token(&self) -> CancellationToken { @@ -145,24 +150,28 @@ mod tests { let cancel = PayloadJobCancellation::new(); cancel.cancel_deadline(); assert!(cancel.token().is_cancelled()); + assert!(cancel.is_cancelled()); assert!(!cancel.is_new_fcu()); assert!(!cancel.is_resolved()); assert_eq!(cancel.reason(), Some(CancellationReason::Deadline)); } #[tokio::test] - async fn test_awaitable() { + async fn test_wait_for_cancellation() { let cancel = PayloadJobCancellation::new(); - let token = cancel.token(); + let cancel_to_fire = cancel.clone(); tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(10)).await; - cancel.cancel_resolved(); + cancel_to_fire.cancel_resolved(); }); - timeout(Duration::from_millis(100), token.cancelled()) + timeout(Duration::from_millis(100), cancel.wait_for_cancellation()) .await .expect("token should fire when resolved fires"); + + assert!(cancel.is_cancelled()); + assert_eq!(cancel.reason(), Some(CancellationReason::Resolved)); } #[tokio::test] @@ -178,6 +187,7 @@ mod tests { #[tokio::test] async fn test_reason_none_when_not_cancelled() { let cancel = PayloadJobCancellation::new(); + assert!(!cancel.is_cancelled()); assert_eq!(cancel.reason(), None); } diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index 35bd3174..f2c5cad9 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -4,7 +4,7 @@ use crate::{ BuilderConfig, best_txs::{FlashblockPoolTxCursor, FlashblockTxTracker}, builder_tx::{BuilderTransactions, reserve_builder_tx_budget}, - cancellation::FlashblockJobCancellation, + cancellation::{FlashblockJobCancellation, PayloadJobCancellation}, context::{OpPayloadBuilderCtx, OpPayloadJobCtx}, generator::{BuildArguments, PayloadBuilder}, timing::{FlashblockScheduler, compute_slot_offset_ms}, @@ -39,7 +39,11 @@ use reth_revm::{ use reth_tasks::Runtime; use reth_transaction_pool::TransactionPool; use revm::Database; -use std::{ops::Deref, sync::Arc, time::Instant}; +use std::{ + ops::Deref, + sync::Arc, + time::{Duration, Instant}, +}; use tokio::sync::{mpsc, watch}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, info_span, metadata::Level, span, warn}; @@ -94,7 +98,7 @@ struct FallbackBuildOutput { struct FlashblockBuildOutput { ctx: OpPayloadJobCtx, - build_result: eyre::Result>, + build_result: eyre::Result>, cache: Cache, transition: Transition, tx_tracker: FlashblockTxTracker, @@ -103,6 +107,13 @@ struct FlashblockBuildOutput { state_root_calc: StateRootCalculator, } +struct BuiltFlashblockOutput { + next_flashblock_state: FlashblocksState, + new_payload: OpBuiltPayload, + fb_payload: OpFlashblockPayload, + build_duration: Duration, +} + impl FlashblocksState { fn new(target_flashblock_count: u64) -> Self { Self { @@ -489,17 +500,13 @@ where fb_state = returned_fb_state; state_root_calc = returned_state_root_calc; - self.built_fb_payload_tx - .try_send(payload.clone()) - .map_err(PayloadBuilderError::other)?; - if let Err(e) = self.built_payload_tx.try_send(payload.clone()) { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); + if payload_cancel.is_cancelled() { + Self::record_cancellation_reason(&self.builder_ctx.metrics, &payload_cancel, &span); + return Ok(()); } - best_payload_tx.send_replace(Some(payload)); + + best_payload_tx.send_replace(Some(payload.clone())); + self.notify_built_payload(payload); info!( target: "payload_builder", @@ -652,7 +659,7 @@ where let new_fb_cancel = tokio::select! { // ensures cancellation is checked before trigger. biased; - _ = payload_cancel.cancelled() => { + _ = payload_cancel.wait_for_cancellation() => { Self::record_cancellation_reason(&self.builder_ctx.metrics, &payload_cancel, &span); self.record_flashblocks_metrics(&ctx, &fb_state, &info, target_flashblocks, &span); return Ok(()); @@ -697,7 +704,7 @@ where // is dropped (the thread finishes but the oneshot result is discarded). let build_output = tokio::select! { biased; - _ = payload_cancel.cancelled() => { + _ = payload_cancel.wait_for_cancellation() => { if payload_cancel.is_resolved() { // Suppressed flashblock: we received getResolve during flashblock building self.builder_ctx.metrics.flashblock_publish_suppressed_total.increment(1); @@ -797,8 +804,28 @@ where } let next_flashblock_state = match build_result { - Ok(Some((next_flashblock_state, new_payload))) => { - best_payload_tx.send_replace(Some(new_payload)); + Ok(Some(built_flashblock)) => { + let Some(next_flashblock_state) = self + .publish_flashblock_payload( + &ctx, + &best_payload_tx, + &fb_state, + &payload_cancel, + &span, + built_flashblock, + ) + .map_err(|e| PayloadBuilderError::Other(e.into()))? + else { + self.record_flashblocks_metrics( + &ctx, + &fb_state, + &info, + fb_state.target_flashblock_count(), + &span, + ); + return Ok(()); + }; + next_flashblock_state } Ok(None) => { @@ -900,6 +927,87 @@ where }) } + fn notify_built_payload(&self, payload: OpBuiltPayload) { + if let Err(e) = self.built_fb_payload_tx.try_send(payload.clone()) { + warn!( + target: "payload_builder", + error = %e, + "Failed to send built flashblock payload to handler" + ); + } + + if let Err(e) = self.built_payload_tx.try_send(payload) { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } + } + + fn publish_flashblock_payload( + &self, + ctx: &OpPayloadJobCtx, + best_payload_tx: &watch::Sender>, + fb_state: &FlashblocksState, + payload_cancel: &PayloadJobCancellation, + span: &tracing::Span, + built_flashblock: BuiltFlashblockOutput, + ) -> eyre::Result> { + let BuiltFlashblockOutput { + next_flashblock_state, + new_payload, + fb_payload, + build_duration, + } = built_flashblock; + + if payload_cancel.is_cancelled() { + if payload_cancel.is_resolved() { + ctx.metrics.flashblock_publish_suppressed_total.increment(1); + } + Self::record_cancellation_reason(&self.builder_ctx.metrics, payload_cancel, span); + return Ok(None); + } + + // After this point, all side effects are synchronous. If cancellation wins the race after + // this check, still publish the local payload so getPayload can include this flashblock. + let flashblock_byte_size = self + .ws_pub + .publish(&fb_payload) + .wrap_err("failed to publish flashblock via websocket")?; + let flashblock_tx_count = fb_payload.raw_transactions().len(); + + best_payload_tx.send_replace(Some(new_payload.clone())); + self.notify_built_payload(new_payload); + + let slot_offset_ms = + compute_slot_offset_ms(ctx.attributes().timestamp(), self.config.block_time); + record_flashblock_publish_timing(fb_state.flashblock_index(), slot_offset_ms); + + if self.config.enable_tx_tracking_debug_logs { + debug!( + target: "tx_trace", + payload_id = %ctx.payload_id(), + block_number = ctx.block_number(), + flashblock_index = fb_state.flashblock_index(), + byte_size = flashblock_byte_size, + total_txs = flashblock_tx_count, + slot_offset_ms, + stage = "fb_published" + ); + } + + ctx.metrics.flashblock_build_duration.record(build_duration); + ctx.metrics + .flashblock_byte_size_histogram + .record(flashblock_byte_size as f64); + ctx.metrics + .flashblock_num_tx_histogram + .record(flashblock_tx_count as f64); + + Ok(Some(next_flashblock_state)) + } + #[expect(clippy::too_many_arguments)] fn build_next_flashblock< 'a, @@ -915,7 +1023,7 @@ where best_txs: &mut NextFlashblockPoolTxCursor<'a, Pool>, block_cancel: &CancellationToken, state_root_calc: &mut StateRootCalculator, - ) -> eyre::Result> { + ) -> eyre::Result> { let flashblock_index = fb_state.flashblock_index(); let mut target_gas_for_batch = fb_state.target_gas_for_batch(); let mut target_da_for_batch = fb_state.target_da_for_batch(); @@ -1056,59 +1164,22 @@ where fb_payload.index = flashblock_index; fb_payload.base = None; - // Block canceled (new FCU, getPayload resolved, or deadline). - // Don't publish to ensures every published flashblock is a subset of the resolved payload. + // Block canceled (new FCU, getPayload resolved, or deadline). The async outer + // loop owns publishing and re-checks cancellation before every side effect. if block_cancel.is_cancelled() { return Ok(None); } - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .wrap_err("failed to publish flashblock via websocket")?; - - // Record slot-relative publish timing (ms since slot start) - let slot_offset_ms = - compute_slot_offset_ms(ctx.attributes().timestamp(), self.config.block_time); - record_flashblock_publish_timing(flashblock_index, slot_offset_ms); - - if self.config.enable_tx_tracking_debug_logs { - debug!( - target: "tx_trace", - payload_id = %ctx.payload_id(), - block_number = ctx.block_number(), - flashblock_index, - byte_size = flashblock_byte_size, - total_txs = info.executed_transactions.len(), - slot_offset_ms, - stage = "fb_published" - ); - } - self.built_fb_payload_tx - .try_send(new_payload.clone()) - .wrap_err("failed to send built payload to handler")?; - if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } - // Record flashblock build duration - ctx.metrics - .flashblock_build_duration - .record(flashblock_build_start_time.elapsed()); - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - ctx.metrics - .flashblock_num_tx_histogram - .record(info.executed_transactions.len() as f64); // Advance batch budgets for the next flashblock. let next_flashblock_state = fb_state.next_after_seal(target_da_for_batch, target_da_footprint_for_batch); - Ok(Some((next_flashblock_state, new_payload))) + Ok(Some(BuiltFlashblockOutput { + next_flashblock_state, + new_payload, + fb_payload, + build_duration: flashblock_build_start_time.elapsed(), + })) } } } @@ -1116,7 +1187,7 @@ where /// Records cancellation reason for observability. fn record_cancellation_reason( metrics: &OpRBuilderMetrics, - cancellation: &super::cancellation::PayloadJobCancellation, + cancellation: &PayloadJobCancellation, span: &tracing::Span, ) { let reason_str = match cancellation.reason() { diff --git a/crates/op-rbuilder/src/builder/timing.rs b/crates/op-rbuilder/src/builder/timing.rs index 03380657..851b6830 100644 --- a/crates/op-rbuilder/src/builder/timing.rs +++ b/crates/op-rbuilder/src/builder/timing.rs @@ -104,7 +104,7 @@ impl FlashblockScheduler { return; } } - _ = block_cancel.cancelled() => { + _ = block_cancel.wait_for_cancellation() => { warn!( target: "payload_builder", id = %payload_id,