From 1cb4ae08f6db1d6a239a39e0a68e418006a842bd Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Fri, 15 May 2026 22:24:14 +0800 Subject: [PATCH 1/2] refactor: extract post-seal side-effects into PostSealHook trait Move WS publication, p2p forwarding, engine propagation, and per-flashblock metrics out of the building loop and behind a PostSealHook trait. Hooks are constructed once in OpPayloadBuilder::new() and dispatched after each sealed candidate (fallback + every flashblock). build_next_flashblock no longer performs side-effects: it returns a FlashblockBuildResult struct and the async loop handles dispatch. The side-effect call sites in the fallback path and the FB iteration are collapsed to a single dispatch_post_seal call each. p2p and engine forwarders are unified as ChannelHook(name, sender) since they only differ in the failure log label. --- .../op-rbuilder/src/builder/hooks/channel.rs | 34 ++++ .../op-rbuilder/src/builder/hooks/metrics.rs | 29 +++ crates/op-rbuilder/src/builder/hooks/mod.rs | 30 +++ .../src/builder/hooks/post_seal.rs | 46 +++++ crates/op-rbuilder/src/builder/hooks/ws.rs | 73 +++++++ crates/op-rbuilder/src/builder/mod.rs | 1 + crates/op-rbuilder/src/builder/payload.rs | 179 ++++++++---------- 7 files changed, 292 insertions(+), 100 deletions(-) create mode 100644 crates/op-rbuilder/src/builder/hooks/channel.rs create mode 100644 crates/op-rbuilder/src/builder/hooks/metrics.rs create mode 100644 crates/op-rbuilder/src/builder/hooks/mod.rs create mode 100644 crates/op-rbuilder/src/builder/hooks/post_seal.rs create mode 100644 crates/op-rbuilder/src/builder/hooks/ws.rs diff --git a/crates/op-rbuilder/src/builder/hooks/channel.rs b/crates/op-rbuilder/src/builder/hooks/channel.rs new file mode 100644 index 00000000..2bbe9b7a --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/channel.rs @@ -0,0 +1,34 @@ +use crate::builder::hooks::post_seal::{PostSealHook, SealedCandidate, SealedCtx}; +use reth_optimism_node::OpBuiltPayload; +use tokio::sync::mpsc; +use tracing::warn; + +/// Forwards each sealed candidate over a named mpsc channel. +#[derive(Debug)] +pub(in crate::builder) struct ChannelHook { + name: &'static str, + sender: mpsc::Sender, +} + +impl ChannelHook { + pub(in crate::builder) fn new( + name: &'static str, + sender: mpsc::Sender, + ) -> Self { + Self { name, sender } + } +} + +impl PostSealHook for ChannelHook { + fn on_sealed(&self, candidate: &SealedCandidate, ctx: &SealedCtx) { + if let Err(e) = self.sender.try_send(candidate.payload.clone()) { + warn!( + target: "payload_builder", + channel = self.name, + error = %e, + flashblock_index = ctx.flashblock_index, + "Failed to forward sealed payload over channel" + ); + } + } +} diff --git a/crates/op-rbuilder/src/builder/hooks/metrics.rs b/crates/op-rbuilder/src/builder/hooks/metrics.rs new file mode 100644 index 00000000..a0bed972 --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/metrics.rs @@ -0,0 +1,29 @@ +use crate::{ + builder::hooks::post_seal::{PostSealHook, SealedCandidate, SealedCtx}, + metrics::OpRBuilderMetrics, +}; +use std::sync::Arc; + +/// Records per-flashblock metrics that aren't tied to publication: +/// build duration and transaction-count histogram. +#[derive(Debug)] +pub(in crate::builder) struct MetricsHook { + metrics: Arc, +} + +impl MetricsHook { + pub(in crate::builder) fn new(metrics: Arc) -> Self { + Self { metrics } + } +} + +impl PostSealHook for MetricsHook { + fn on_sealed(&self, _candidate: &SealedCandidate, ctx: &SealedCtx) { + if let Some(duration) = ctx.flashblock_build_duration { + self.metrics.flashblock_build_duration.record(duration); + } + self.metrics + .flashblock_num_tx_histogram + .record(ctx.executed_tx_count as f64); + } +} diff --git a/crates/op-rbuilder/src/builder/hooks/mod.rs b/crates/op-rbuilder/src/builder/hooks/mod.rs new file mode 100644 index 00000000..765d727d --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/mod.rs @@ -0,0 +1,30 @@ +//! Lifecycle hooks dispatched from the building loop. +//! +//! [`PostSealHook`] fires after each sealed candidate (fallback or +//! flashblock) and isolates downstream side-effects — WS publication, p2p +//! broadcast, engine propagation, metrics — from the building loop itself. +//! Concrete implementations live alongside this module. + +mod channel; +mod metrics; +mod post_seal; +mod ws; + +pub(super) use channel::ChannelHook; +pub(super) use metrics::MetricsHook; +pub(super) use post_seal::{PostSealHook, SealedCandidate, SealedCtx}; +pub(super) use ws::WsHook; + +/// Dispatch a sealed candidate to every hook in `hooks`. +/// +/// Hook impls are expected to be cheap; we run them sequentially in the +/// caller's context (sync, including from within `spawn_blocking`). +pub(super) fn dispatch_post_seal( + hooks: &[Box], + candidate: &SealedCandidate, + ctx: &SealedCtx, +) { + for h in hooks { + h.on_sealed(candidate, ctx); + } +} diff --git a/crates/op-rbuilder/src/builder/hooks/post_seal.rs b/crates/op-rbuilder/src/builder/hooks/post_seal.rs new file mode 100644 index 00000000..a136491b --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/post_seal.rs @@ -0,0 +1,46 @@ +use core::time::Duration; +use op_alloy_rpc_types_engine::OpFlashblockPayload; +use reth_optimism_node::OpBuiltPayload; +use reth_payload_builder::PayloadId; + +/// A flashblock candidate that has been sealed and is ready for publication +/// and downstream propagation. +/// +/// `payload` is the full built payload for engine/p2p delivery; `fb_payload` +/// is the slim, serialisable view streamed to flashblocks subscribers. +#[derive(Debug, Clone)] +pub(in crate::builder) struct SealedCandidate { + pub payload: OpBuiltPayload, + pub fb_payload: OpFlashblockPayload, +} + +/// Context describing the slot a sealed candidate belongs to. +/// +/// The fields are intentionally limited to data downstream hooks need. +#[derive(Debug, Clone)] +pub(in crate::builder) struct SealedCtx { + pub payload_id: PayloadId, + pub block_number: u64, + pub flashblock_index: u64, + /// True when the FCU specified `no_tx_pool`. + pub no_tx_pool: bool, + pub executed_tx_count: usize, + /// Slot start timestamp from the payload attributes. + pub slot_timestamp_secs: u64, + pub block_time: Duration, + /// Wall-clock time spent building this flashblock. + /// `None` for the fallback candidate. + pub flashblock_build_duration: Option, + pub enable_tx_tracking_debug_logs: bool, +} + +/// Hook invoked after a flashblock or fallback candidate has been sealed. +/// +/// Implementations should be cheap and non-blocking: dispatch happens on the +/// builder's hot path. Errors are intentionally swallowed at the dispatch site; +/// hooks that want to surface failures should do so via metrics or logs. +pub(in crate::builder) trait PostSealHook: + Send + Sync + std::fmt::Debug +{ + fn on_sealed(&self, candidate: &SealedCandidate, ctx: &SealedCtx); +} diff --git a/crates/op-rbuilder/src/builder/hooks/ws.rs b/crates/op-rbuilder/src/builder/hooks/ws.rs new file mode 100644 index 00000000..868d055c --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/ws.rs @@ -0,0 +1,73 @@ +use crate::{ + builder::{ + hooks::post_seal::{PostSealHook, SealedCandidate, SealedCtx}, + timing::compute_slot_offset_ms, + wspub::WebSocketPublisher, + }, + metrics::{OpRBuilderMetrics, record_flashblock_publish_timing}, +}; +use std::sync::Arc; +use tracing::{debug, warn}; + +/// Publishes the flashblock payload to WebSocket subscribers, record metrics. +/// +/// Suppressed when `SealedCtx::no_tx_pool` is true +pub(in crate::builder) struct WsHook { + ws_pub: Arc, + metrics: Arc, +} + +impl std::fmt::Debug for WsHook { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WsHook").finish_non_exhaustive() + } +} + +impl WsHook { + pub(in crate::builder) fn new( + ws_pub: Arc, + metrics: Arc, + ) -> Self { + Self { ws_pub, metrics } + } +} + +impl PostSealHook for WsHook { + fn on_sealed(&self, candidate: &SealedCandidate, ctx: &SealedCtx) { + if ctx.no_tx_pool { + return; + } + + let byte_size = match self.ws_pub.publish(&candidate.fb_payload) { + Ok(size) => size, + Err(e) => { + warn!( + target: "payload_builder", + error = %e, + flashblock_index = ctx.flashblock_index, + "Failed to publish flashblock via websocket" + ); + return; + } + }; + + let slot_offset_ms = compute_slot_offset_ms(ctx.slot_timestamp_secs, ctx.block_time); + record_flashblock_publish_timing(candidate.fb_payload.index, slot_offset_ms); + self.metrics + .flashblock_byte_size_histogram + .record(byte_size as f64); + + if ctx.enable_tx_tracking_debug_logs { + debug!( + target: "tx_trace", + payload_id = %ctx.payload_id, + block_number = ctx.block_number, + flashblock_index = candidate.fb_payload.index, + byte_size, + total_txs = ctx.executed_tx_count, + slot_offset_ms, + stage = "fb_published" + ); + } + } +} diff --git a/crates/op-rbuilder/src/builder/mod.rs b/crates/op-rbuilder/src/builder/mod.rs index dfffe3c0..d4fcd386 100644 --- a/crates/op-rbuilder/src/builder/mod.rs +++ b/crates/op-rbuilder/src/builder/mod.rs @@ -17,6 +17,7 @@ mod config; mod context; mod flashblocks_builder_tx; mod generator; +mod hooks; mod p2p; mod payload; mod payload_handler; diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index f6d4cf7c..f1154855 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -7,12 +7,16 @@ use crate::{ cancellation::FlashblockJobCancellation, context::{OpPayloadBuilderCtx, OpPayloadJobCtx}, generator::{BuildArguments, PayloadBuilder}, - timing::{FlashblockScheduler, compute_slot_offset_ms}, + hooks::{ + ChannelHook, MetricsHook, PostSealHook, SealedCandidate, SealedCtx, WsHook, + dispatch_post_seal, + }, + timing::FlashblockScheduler, }, evm::OpBlockEvmFactory, hardforks::ActiveHardforks, limiter::AddressLimiter, - metrics::{OpRBuilderMetrics, record_flashblock_publish_timing}, + metrics::OpRBuilderMetrics, primitives::reth::ExecutionInfo, runtime_ext::RuntimeExt, tokio_metrics::FlashblocksTaskMetrics, @@ -94,7 +98,7 @@ struct FallbackBuildOutput { struct FlashblockBuildOutput { ctx: OpPayloadJobCtx, - build_result: eyre::Result>, + build_result: eyre::Result>, cache: Cache, transition: Transition, tx_tracker: FlashblockTxTracker, @@ -103,6 +107,15 @@ struct FlashblockBuildOutput { state_root_calc: StateRootCalculator, } +/// Output of a successful `build_next_flashblock`. +struct FlashblockBuildResult { + next_fb_state: FlashblocksState, + new_payload: OpBuiltPayload, + fb_payload: OpFlashblockPayload, + build_duration: core::time::Duration, + executed_tx_count: usize, +} + impl FlashblocksState { fn new(target_flashblock_count: u64) -> Self { Self { @@ -248,15 +261,8 @@ pub(super) struct OpPayloadBuilderInner { pool: Pool, /// Node client client: Client, - /// Sender for sending built flashblock payloads to [`PayloadHandler`], - /// which broadcasts outgoing flashblock payloads via p2p. - built_fb_payload_tx: mpsc::Sender, - /// Sender for sending built full block payloads to [`PayloadHandler`], - /// which updates the engine tree state. - built_payload_tx: mpsc::Sender, - /// WebSocket publisher for broadcasting flashblocks - /// to all connected subscribers. - ws_pub: WebSocketPublisher, + /// Hooks dispatched after each sealed candidate (fallback or flashblock). + post_seal_hooks: Vec>, /// System configuration for the builder config: BuilderConfig, /// The end of builder transaction type @@ -310,7 +316,7 @@ where da_config: config.da_config.clone(), gas_limit_config: config.gas_limit_config.clone(), chain_spec: client.chain_spec(), - metrics, + metrics: Arc::clone(&metrics), max_gas_per_txn: config.max_gas_per_txn, max_uncompressed_block_size: config.max_uncompressed_block_size, address_limiter, @@ -320,14 +326,21 @@ where disable_state_root: config.flashblocks_config.disable_state_root, enable_incremental_state_root: config.flashblocks_config.enable_incremental_state_root, }); + + let ws_pub = Arc::new(ws_pub); + let post_seal_hooks: Vec> = vec![ + Box::new(WsHook::new(Arc::clone(&ws_pub), Arc::clone(&metrics))), + Box::new(ChannelHook::new("p2p", built_fb_payload_tx)), + Box::new(ChannelHook::new("engine", built_payload_tx)), + Box::new(MetricsHook::new(Arc::clone(&metrics))), + ]; + Self { inner: Arc::new(OpPayloadBuilderInner { builder_ctx, pool, client, - built_fb_payload_tx, - built_payload_tx, - ws_pub, + post_seal_hooks, config, builder_tx, task_metrics, @@ -494,16 +507,22 @@ where fb_state = returned_fb_state; state_root_calc = returned_state_root_calc; - self.built_fb_payload_tx - .try_send(payload.clone()) - .map_err(PayloadBuilderError::other)?; - if let Err(e) = self.built_payload_tx.try_send(payload.clone()) { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } + let candidate = SealedCandidate { + payload: payload.clone(), + fb_payload: fb_payload.clone(), + }; + let sealed_ctx = SealedCtx { + payload_id: ctx.payload_id(), + block_number: ctx.block_number(), + flashblock_index: fb_payload.index, + no_tx_pool: ctx.attributes().no_tx_pool, + executed_tx_count: info.executed_transactions.len(), + slot_timestamp_secs: config.attributes.timestamp(), + block_time: self.config.block_time, + flashblock_build_duration: None, + enable_tx_tracking_debug_logs: self.config.enable_tx_tracking_debug_logs, + }; + dispatch_post_seal(&self.post_seal_hooks, &candidate, &sealed_ctx); best_payload_tx.send_replace(Some(payload)); info!( @@ -512,34 +531,6 @@ where "Fallback block built" ); - // not emitting flashblock if no_tx_pool in FCU, it's just syncing - if !ctx.attributes().no_tx_pool { - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .map_err(PayloadBuilderError::other)?; - - let slot_offset_ms = - compute_slot_offset_ms(config.attributes.timestamp(), self.config.block_time); - record_flashblock_publish_timing(fb_payload.index, slot_offset_ms); - - if self.config.enable_tx_tracking_debug_logs { - debug!( - target: "tx_trace", - payload_id = %ctx.payload_id(), - block_number = ctx.block_number(), - flashblock_index = fb_payload.index, - byte_size = flashblock_byte_size, - total_txs = info.executed_transactions.len(), - slot_offset_ms, - stage = "fb_published" - ); - } - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - } - if ctx.attributes().no_tx_pool { info!( target: "payload_builder", @@ -802,9 +793,33 @@ where } let next_flashblock_state = match build_result { - Ok(Some((next_flashblock_state, new_payload))) => { + Ok(Some(result)) => { + let FlashblockBuildResult { + next_fb_state, + new_payload, + fb_payload: built_fb_payload, + build_duration, + executed_tx_count, + } = result; + + let candidate = SealedCandidate { + payload: new_payload.clone(), + fb_payload: built_fb_payload, + }; + let sealed_ctx = SealedCtx { + payload_id: ctx.payload_id(), + block_number: ctx.block_number(), + flashblock_index: candidate.fb_payload.index, + no_tx_pool: ctx.attributes().no_tx_pool, + executed_tx_count, + slot_timestamp_secs: ctx.attributes().timestamp(), + block_time: self.config.block_time, + flashblock_build_duration: Some(build_duration), + enable_tx_tracking_debug_logs: self.config.enable_tx_tracking_debug_logs, + }; + dispatch_post_seal(&self.post_seal_hooks, &candidate, &sealed_ctx); best_payload_tx.send_replace(Some(new_payload)); - next_flashblock_state + next_fb_state } Ok(None) => { Self::record_cancellation_reason( @@ -920,7 +935,7 @@ where best_txs: &mut NextFlashblockPoolTxCursor<'a, Pool>, block_cancel: &CancellationToken, state_root_calc: &mut StateRootCalculator, - ) -> eyre::Result> { + ) -> eyre::Result> { let flashblock_index = fb_state.flashblock_index(); let mut target_gas_for_batch = fb_state.target_gas_for_batch(); let mut target_da_for_batch = fb_state.target_da_for_batch(); @@ -1066,54 +1081,18 @@ where if block_cancel.is_cancelled() { return Ok(None); } - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .wrap_err("failed to publish flashblock via websocket")?; - - // Record slot-relative publish timing (ms since slot start) - let slot_offset_ms = - compute_slot_offset_ms(ctx.attributes().timestamp(), self.config.block_time); - record_flashblock_publish_timing(flashblock_index, slot_offset_ms); - - if self.config.enable_tx_tracking_debug_logs { - debug!( - target: "tx_trace", - payload_id = %ctx.payload_id(), - block_number = ctx.block_number(), - flashblock_index, - byte_size = flashblock_byte_size, - total_txs = info.executed_transactions.len(), - slot_offset_ms, - stage = "fb_published" - ); - } - self.built_fb_payload_tx - .try_send(new_payload.clone()) - .wrap_err("failed to send built payload to handler")?; - if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } - // Record flashblock build duration - ctx.metrics - .flashblock_build_duration - .record(flashblock_build_start_time.elapsed()); - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - ctx.metrics - .flashblock_num_tx_histogram - .record(info.executed_transactions.len() as f64); // Advance batch budgets for the next flashblock. let next_flashblock_state = fb_state.next_after_seal(target_da_for_batch, target_da_footprint_for_batch); - Ok(Some((next_flashblock_state, new_payload))) + Ok(Some(FlashblockBuildResult { + next_fb_state: next_flashblock_state, + new_payload, + fb_payload, + build_duration: flashblock_build_start_time.elapsed(), + executed_tx_count: info.executed_transactions.len(), + })) } } } From 849ba742f9bbbeebcd1df9c1b6751fb9f7036ff7 Mon Sep 17 00:00:00 2001 From: julio4 <30329843+julio4@users.noreply.github.com> Date: Thu, 21 May 2026 18:20:03 +0800 Subject: [PATCH 2/2] remove SealedCtx in favor of SlotMeta+Payload per hook derivation --- .../op-rbuilder/src/builder/hooks/channel.rs | 13 +++----- .../op-rbuilder/src/builder/hooks/metrics.rs | 12 +++---- crates/op-rbuilder/src/builder/hooks/mod.rs | 6 ++-- .../src/builder/hooks/post_seal.rs | 24 +++++--------- crates/op-rbuilder/src/builder/hooks/ws.rs | 32 +++++++++++-------- crates/op-rbuilder/src/builder/payload.rs | 31 +++++++----------- 6 files changed, 53 insertions(+), 65 deletions(-) diff --git a/crates/op-rbuilder/src/builder/hooks/channel.rs b/crates/op-rbuilder/src/builder/hooks/channel.rs index 2bbe9b7a..0f450b6d 100644 --- a/crates/op-rbuilder/src/builder/hooks/channel.rs +++ b/crates/op-rbuilder/src/builder/hooks/channel.rs @@ -1,32 +1,29 @@ -use crate::builder::hooks::post_seal::{PostSealHook, SealedCandidate, SealedCtx}; +use crate::builder::hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta}; use reth_optimism_node::OpBuiltPayload; use tokio::sync::mpsc; use tracing::warn; /// Forwards each sealed candidate over a named mpsc channel. #[derive(Debug)] -pub(in crate::builder) struct ChannelHook { +pub(crate) struct ChannelHook { name: &'static str, sender: mpsc::Sender, } impl ChannelHook { - pub(in crate::builder) fn new( - name: &'static str, - sender: mpsc::Sender, - ) -> Self { + pub(crate) fn new(name: &'static str, sender: mpsc::Sender) -> Self { Self { name, sender } } } impl PostSealHook for ChannelHook { - fn on_sealed(&self, candidate: &SealedCandidate, ctx: &SealedCtx) { + fn on_sealed(&self, candidate: &SealedCandidate, _slot: &SlotMeta) { if let Err(e) = self.sender.try_send(candidate.payload.clone()) { warn!( target: "payload_builder", channel = self.name, error = %e, - flashblock_index = ctx.flashblock_index, + flashblock_index = candidate.fb_payload.index, "Failed to forward sealed payload over channel" ); } diff --git a/crates/op-rbuilder/src/builder/hooks/metrics.rs b/crates/op-rbuilder/src/builder/hooks/metrics.rs index a0bed972..5c67e8cd 100644 --- a/crates/op-rbuilder/src/builder/hooks/metrics.rs +++ b/crates/op-rbuilder/src/builder/hooks/metrics.rs @@ -1,5 +1,5 @@ use crate::{ - builder::hooks::post_seal::{PostSealHook, SealedCandidate, SealedCtx}, + builder::hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta}, metrics::OpRBuilderMetrics, }; use std::sync::Arc; @@ -7,23 +7,23 @@ use std::sync::Arc; /// Records per-flashblock metrics that aren't tied to publication: /// build duration and transaction-count histogram. #[derive(Debug)] -pub(in crate::builder) struct MetricsHook { +pub(crate) struct MetricsHook { metrics: Arc, } impl MetricsHook { - pub(in crate::builder) fn new(metrics: Arc) -> Self { + pub(crate) fn new(metrics: Arc) -> Self { Self { metrics } } } impl PostSealHook for MetricsHook { - fn on_sealed(&self, _candidate: &SealedCandidate, ctx: &SealedCtx) { - if let Some(duration) = ctx.flashblock_build_duration { + fn on_sealed(&self, candidate: &SealedCandidate, _slot: &SlotMeta) { + if let Some(duration) = candidate.build_duration { self.metrics.flashblock_build_duration.record(duration); } self.metrics .flashblock_num_tx_histogram - .record(ctx.executed_tx_count as f64); + .record(candidate.payload.block().body().transactions.len() as f64); } } diff --git a/crates/op-rbuilder/src/builder/hooks/mod.rs b/crates/op-rbuilder/src/builder/hooks/mod.rs index 765d727d..aa05fae4 100644 --- a/crates/op-rbuilder/src/builder/hooks/mod.rs +++ b/crates/op-rbuilder/src/builder/hooks/mod.rs @@ -12,7 +12,7 @@ mod ws; pub(super) use channel::ChannelHook; pub(super) use metrics::MetricsHook; -pub(super) use post_seal::{PostSealHook, SealedCandidate, SealedCtx}; +pub(super) use post_seal::{PostSealHook, SealedCandidate, SlotMeta}; pub(super) use ws::WsHook; /// Dispatch a sealed candidate to every hook in `hooks`. @@ -22,9 +22,9 @@ pub(super) use ws::WsHook; pub(super) fn dispatch_post_seal( hooks: &[Box], candidate: &SealedCandidate, - ctx: &SealedCtx, + slot: &SlotMeta, ) { for h in hooks { - h.on_sealed(candidate, ctx); + h.on_sealed(candidate, slot); } } diff --git a/crates/op-rbuilder/src/builder/hooks/post_seal.rs b/crates/op-rbuilder/src/builder/hooks/post_seal.rs index a136491b..562b93db 100644 --- a/crates/op-rbuilder/src/builder/hooks/post_seal.rs +++ b/crates/op-rbuilder/src/builder/hooks/post_seal.rs @@ -8,30 +8,24 @@ use reth_payload_builder::PayloadId; /// /// `payload` is the full built payload for engine/p2p delivery; `fb_payload` /// is the slim, serialisable view streamed to flashblocks subscribers. +/// `build_duration` is the wall-clock time spent building this flashblock, +/// or `None` for the fallback candidate (no incremental build step). #[derive(Debug, Clone)] -pub(in crate::builder) struct SealedCandidate { +pub(crate) struct SealedCandidate { pub payload: OpBuiltPayload, pub fb_payload: OpFlashblockPayload, + pub build_duration: Option, } -/// Context describing the slot a sealed candidate belongs to. -/// -/// The fields are intentionally limited to data downstream hooks need. +/// Slot-level metadata for a given building slot. #[derive(Debug, Clone)] -pub(in crate::builder) struct SealedCtx { +pub(crate) struct SlotMeta { pub payload_id: PayloadId, - pub block_number: u64, - pub flashblock_index: u64, /// True when the FCU specified `no_tx_pool`. pub no_tx_pool: bool, - pub executed_tx_count: usize, /// Slot start timestamp from the payload attributes. pub slot_timestamp_secs: u64, pub block_time: Duration, - /// Wall-clock time spent building this flashblock. - /// `None` for the fallback candidate. - pub flashblock_build_duration: Option, - pub enable_tx_tracking_debug_logs: bool, } /// Hook invoked after a flashblock or fallback candidate has been sealed. @@ -39,8 +33,6 @@ pub(in crate::builder) struct SealedCtx { /// Implementations should be cheap and non-blocking: dispatch happens on the /// builder's hot path. Errors are intentionally swallowed at the dispatch site; /// hooks that want to surface failures should do so via metrics or logs. -pub(in crate::builder) trait PostSealHook: - Send + Sync + std::fmt::Debug -{ - fn on_sealed(&self, candidate: &SealedCandidate, ctx: &SealedCtx); +pub(crate) trait PostSealHook: Send + Sync + std::fmt::Debug { + fn on_sealed(&self, candidate: &SealedCandidate, slot: &SlotMeta); } diff --git a/crates/op-rbuilder/src/builder/hooks/ws.rs b/crates/op-rbuilder/src/builder/hooks/ws.rs index 868d055c..5b33ccfc 100644 --- a/crates/op-rbuilder/src/builder/hooks/ws.rs +++ b/crates/op-rbuilder/src/builder/hooks/ws.rs @@ -1,6 +1,6 @@ use crate::{ builder::{ - hooks::post_seal::{PostSealHook, SealedCandidate, SealedCtx}, + hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta}, timing::compute_slot_offset_ms, wspub::WebSocketPublisher, }, @@ -11,10 +11,11 @@ use tracing::{debug, warn}; /// Publishes the flashblock payload to WebSocket subscribers, record metrics. /// -/// Suppressed when `SealedCtx::no_tx_pool` is true -pub(in crate::builder) struct WsHook { +/// Suppressed when `SlotMeta::no_tx_pool` is true. +pub(crate) struct WsHook { ws_pub: Arc, metrics: Arc, + enable_tx_tracking_debug_logs: bool, } impl std::fmt::Debug for WsHook { @@ -24,17 +25,22 @@ impl std::fmt::Debug for WsHook { } impl WsHook { - pub(in crate::builder) fn new( + pub(crate) fn new( ws_pub: Arc, metrics: Arc, + enable_tx_tracking_debug_logs: bool, ) -> Self { - Self { ws_pub, metrics } + Self { + ws_pub, + metrics, + enable_tx_tracking_debug_logs, + } } } impl PostSealHook for WsHook { - fn on_sealed(&self, candidate: &SealedCandidate, ctx: &SealedCtx) { - if ctx.no_tx_pool { + fn on_sealed(&self, candidate: &SealedCandidate, slot: &SlotMeta) { + if slot.no_tx_pool { return; } @@ -44,27 +50,27 @@ impl PostSealHook for WsHook { warn!( target: "payload_builder", error = %e, - flashblock_index = ctx.flashblock_index, + flashblock_index = candidate.fb_payload.index, "Failed to publish flashblock via websocket" ); return; } }; - let slot_offset_ms = compute_slot_offset_ms(ctx.slot_timestamp_secs, ctx.block_time); + let slot_offset_ms = compute_slot_offset_ms(slot.slot_timestamp_secs, slot.block_time); record_flashblock_publish_timing(candidate.fb_payload.index, slot_offset_ms); self.metrics .flashblock_byte_size_histogram .record(byte_size as f64); - if ctx.enable_tx_tracking_debug_logs { + if self.enable_tx_tracking_debug_logs { debug!( target: "tx_trace", - payload_id = %ctx.payload_id, - block_number = ctx.block_number, + payload_id = %slot.payload_id, + block_number = candidate.payload.block().header().number, flashblock_index = candidate.fb_payload.index, byte_size, - total_txs = ctx.executed_tx_count, + total_txs = candidate.payload.block().body().transactions.len(), slot_offset_ms, stage = "fb_published" ); diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index f1154855..8f4dc261 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -8,7 +8,7 @@ use crate::{ context::{OpPayloadBuilderCtx, OpPayloadJobCtx}, generator::{BuildArguments, PayloadBuilder}, hooks::{ - ChannelHook, MetricsHook, PostSealHook, SealedCandidate, SealedCtx, WsHook, + ChannelHook, MetricsHook, PostSealHook, SealedCandidate, SlotMeta, WsHook, dispatch_post_seal, }, timing::FlashblockScheduler, @@ -113,7 +113,6 @@ struct FlashblockBuildResult { new_payload: OpBuiltPayload, fb_payload: OpFlashblockPayload, build_duration: core::time::Duration, - executed_tx_count: usize, } impl FlashblocksState { @@ -329,7 +328,11 @@ where let ws_pub = Arc::new(ws_pub); let post_seal_hooks: Vec> = vec![ - Box::new(WsHook::new(Arc::clone(&ws_pub), Arc::clone(&metrics))), + Box::new(WsHook::new( + Arc::clone(&ws_pub), + Arc::clone(&metrics), + config.enable_tx_tracking_debug_logs, + )), Box::new(ChannelHook::new("p2p", built_fb_payload_tx)), Box::new(ChannelHook::new("engine", built_payload_tx)), Box::new(MetricsHook::new(Arc::clone(&metrics))), @@ -510,19 +513,15 @@ where let candidate = SealedCandidate { payload: payload.clone(), fb_payload: fb_payload.clone(), + build_duration: None, }; - let sealed_ctx = SealedCtx { + let slot = SlotMeta { payload_id: ctx.payload_id(), - block_number: ctx.block_number(), - flashblock_index: fb_payload.index, no_tx_pool: ctx.attributes().no_tx_pool, - executed_tx_count: info.executed_transactions.len(), slot_timestamp_secs: config.attributes.timestamp(), block_time: self.config.block_time, - flashblock_build_duration: None, - enable_tx_tracking_debug_logs: self.config.enable_tx_tracking_debug_logs, }; - dispatch_post_seal(&self.post_seal_hooks, &candidate, &sealed_ctx); + dispatch_post_seal(&self.post_seal_hooks, &candidate, &slot); best_payload_tx.send_replace(Some(payload)); info!( @@ -799,25 +798,20 @@ where new_payload, fb_payload: built_fb_payload, build_duration, - executed_tx_count, } = result; let candidate = SealedCandidate { payload: new_payload.clone(), fb_payload: built_fb_payload, + build_duration: Some(build_duration), }; - let sealed_ctx = SealedCtx { + let slot = SlotMeta { payload_id: ctx.payload_id(), - block_number: ctx.block_number(), - flashblock_index: candidate.fb_payload.index, no_tx_pool: ctx.attributes().no_tx_pool, - executed_tx_count, slot_timestamp_secs: ctx.attributes().timestamp(), block_time: self.config.block_time, - flashblock_build_duration: Some(build_duration), - enable_tx_tracking_debug_logs: self.config.enable_tx_tracking_debug_logs, }; - dispatch_post_seal(&self.post_seal_hooks, &candidate, &sealed_ctx); + dispatch_post_seal(&self.post_seal_hooks, &candidate, &slot); best_payload_tx.send_replace(Some(new_payload)); next_fb_state } @@ -1091,7 +1085,6 @@ where new_payload, fb_payload, build_duration: flashblock_build_start_time.elapsed(), - executed_tx_count: info.executed_transactions.len(), })) } }