diff --git a/crates/op-rbuilder/src/builder/hooks/channel.rs b/crates/op-rbuilder/src/builder/hooks/channel.rs new file mode 100644 index 00000000..0f450b6d --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/channel.rs @@ -0,0 +1,31 @@ +use crate::builder::hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta}; +use reth_optimism_node::OpBuiltPayload; +use tokio::sync::mpsc; +use tracing::warn; + +/// Forwards each sealed candidate over a named mpsc channel. +#[derive(Debug)] +pub(crate) struct ChannelHook { + name: &'static str, + sender: mpsc::Sender, +} + +impl ChannelHook { + pub(crate) fn new(name: &'static str, sender: mpsc::Sender) -> Self { + Self { name, sender } + } +} + +impl PostSealHook for ChannelHook { + fn on_sealed(&self, candidate: &SealedCandidate, _slot: &SlotMeta) { + if let Err(e) = self.sender.try_send(candidate.payload.clone()) { + warn!( + target: "payload_builder", + channel = self.name, + error = %e, + flashblock_index = candidate.fb_payload.index, + "Failed to forward sealed payload over channel" + ); + } + } +} diff --git a/crates/op-rbuilder/src/builder/hooks/metrics.rs b/crates/op-rbuilder/src/builder/hooks/metrics.rs new file mode 100644 index 00000000..5c67e8cd --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/metrics.rs @@ -0,0 +1,29 @@ +use crate::{ + builder::hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta}, + metrics::OpRBuilderMetrics, +}; +use std::sync::Arc; + +/// Records per-flashblock metrics that aren't tied to publication: +/// build duration and transaction-count histogram. +#[derive(Debug)] +pub(crate) struct MetricsHook { + metrics: Arc, +} + +impl MetricsHook { + pub(crate) fn new(metrics: Arc) -> Self { + Self { metrics } + } +} + +impl PostSealHook for MetricsHook { + fn on_sealed(&self, candidate: &SealedCandidate, _slot: &SlotMeta) { + if let Some(duration) = candidate.build_duration { + self.metrics.flashblock_build_duration.record(duration); + } + self.metrics + .flashblock_num_tx_histogram + .record(candidate.payload.block().body().transactions.len() as f64); + } +} diff --git a/crates/op-rbuilder/src/builder/hooks/mod.rs b/crates/op-rbuilder/src/builder/hooks/mod.rs new file mode 100644 index 00000000..aa05fae4 --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/mod.rs @@ -0,0 +1,30 @@ +//! Lifecycle hooks dispatched from the building loop. +//! +//! [`PostSealHook`] fires after each sealed candidate (fallback or +//! flashblock) and isolates downstream side-effects — WS publication, p2p +//! broadcast, engine propagation, metrics — from the building loop itself. +//! Concrete implementations live alongside this module. + +mod channel; +mod metrics; +mod post_seal; +mod ws; + +pub(super) use channel::ChannelHook; +pub(super) use metrics::MetricsHook; +pub(super) use post_seal::{PostSealHook, SealedCandidate, SlotMeta}; +pub(super) use ws::WsHook; + +/// Dispatch a sealed candidate to every hook in `hooks`. +/// +/// Hook impls are expected to be cheap; we run them sequentially in the +/// caller's context (sync, including from within `spawn_blocking`). +pub(super) fn dispatch_post_seal( + hooks: &[Box], + candidate: &SealedCandidate, + slot: &SlotMeta, +) { + for h in hooks { + h.on_sealed(candidate, slot); + } +} diff --git a/crates/op-rbuilder/src/builder/hooks/post_seal.rs b/crates/op-rbuilder/src/builder/hooks/post_seal.rs new file mode 100644 index 00000000..562b93db --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/post_seal.rs @@ -0,0 +1,38 @@ +use core::time::Duration; +use op_alloy_rpc_types_engine::OpFlashblockPayload; +use reth_optimism_node::OpBuiltPayload; +use reth_payload_builder::PayloadId; + +/// A flashblock candidate that has been sealed and is ready for publication +/// and downstream propagation. +/// +/// `payload` is the full built payload for engine/p2p delivery; `fb_payload` +/// is the slim, serialisable view streamed to flashblocks subscribers. +/// `build_duration` is the wall-clock time spent building this flashblock, +/// or `None` for the fallback candidate (no incremental build step). +#[derive(Debug, Clone)] +pub(crate) struct SealedCandidate { + pub payload: OpBuiltPayload, + pub fb_payload: OpFlashblockPayload, + pub build_duration: Option, +} + +/// Slot-level metadata for a given building slot. +#[derive(Debug, Clone)] +pub(crate) struct SlotMeta { + pub payload_id: PayloadId, + /// True when the FCU specified `no_tx_pool`. + pub no_tx_pool: bool, + /// Slot start timestamp from the payload attributes. + pub slot_timestamp_secs: u64, + pub block_time: Duration, +} + +/// Hook invoked after a flashblock or fallback candidate has been sealed. +/// +/// Implementations should be cheap and non-blocking: dispatch happens on the +/// builder's hot path. Errors are intentionally swallowed at the dispatch site; +/// hooks that want to surface failures should do so via metrics or logs. +pub(crate) trait PostSealHook: Send + Sync + std::fmt::Debug { + fn on_sealed(&self, candidate: &SealedCandidate, slot: &SlotMeta); +} diff --git a/crates/op-rbuilder/src/builder/hooks/ws.rs b/crates/op-rbuilder/src/builder/hooks/ws.rs new file mode 100644 index 00000000..5b33ccfc --- /dev/null +++ b/crates/op-rbuilder/src/builder/hooks/ws.rs @@ -0,0 +1,79 @@ +use crate::{ + builder::{ + hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta}, + timing::compute_slot_offset_ms, + wspub::WebSocketPublisher, + }, + metrics::{OpRBuilderMetrics, record_flashblock_publish_timing}, +}; +use std::sync::Arc; +use tracing::{debug, warn}; + +/// Publishes the flashblock payload to WebSocket subscribers, record metrics. +/// +/// Suppressed when `SlotMeta::no_tx_pool` is true. +pub(crate) struct WsHook { + ws_pub: Arc, + metrics: Arc, + enable_tx_tracking_debug_logs: bool, +} + +impl std::fmt::Debug for WsHook { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WsHook").finish_non_exhaustive() + } +} + +impl WsHook { + pub(crate) fn new( + ws_pub: Arc, + metrics: Arc, + enable_tx_tracking_debug_logs: bool, + ) -> Self { + Self { + ws_pub, + metrics, + enable_tx_tracking_debug_logs, + } + } +} + +impl PostSealHook for WsHook { + fn on_sealed(&self, candidate: &SealedCandidate, slot: &SlotMeta) { + if slot.no_tx_pool { + return; + } + + let byte_size = match self.ws_pub.publish(&candidate.fb_payload) { + Ok(size) => size, + Err(e) => { + warn!( + target: "payload_builder", + error = %e, + flashblock_index = candidate.fb_payload.index, + "Failed to publish flashblock via websocket" + ); + return; + } + }; + + let slot_offset_ms = compute_slot_offset_ms(slot.slot_timestamp_secs, slot.block_time); + record_flashblock_publish_timing(candidate.fb_payload.index, slot_offset_ms); + self.metrics + .flashblock_byte_size_histogram + .record(byte_size as f64); + + if self.enable_tx_tracking_debug_logs { + debug!( + target: "tx_trace", + payload_id = %slot.payload_id, + block_number = candidate.payload.block().header().number, + flashblock_index = candidate.fb_payload.index, + byte_size, + total_txs = candidate.payload.block().body().transactions.len(), + slot_offset_ms, + stage = "fb_published" + ); + } + } +} diff --git a/crates/op-rbuilder/src/builder/mod.rs b/crates/op-rbuilder/src/builder/mod.rs index dfffe3c0..d4fcd386 100644 --- a/crates/op-rbuilder/src/builder/mod.rs +++ b/crates/op-rbuilder/src/builder/mod.rs @@ -17,6 +17,7 @@ mod config; mod context; mod flashblocks_builder_tx; mod generator; +mod hooks; mod p2p; mod payload; mod payload_handler; diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index f6d4cf7c..8f4dc261 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -7,12 +7,16 @@ use crate::{ cancellation::FlashblockJobCancellation, context::{OpPayloadBuilderCtx, OpPayloadJobCtx}, generator::{BuildArguments, PayloadBuilder}, - timing::{FlashblockScheduler, compute_slot_offset_ms}, + hooks::{ + ChannelHook, MetricsHook, PostSealHook, SealedCandidate, SlotMeta, WsHook, + dispatch_post_seal, + }, + timing::FlashblockScheduler, }, evm::OpBlockEvmFactory, hardforks::ActiveHardforks, limiter::AddressLimiter, - metrics::{OpRBuilderMetrics, record_flashblock_publish_timing}, + metrics::OpRBuilderMetrics, primitives::reth::ExecutionInfo, runtime_ext::RuntimeExt, tokio_metrics::FlashblocksTaskMetrics, @@ -94,7 +98,7 @@ struct FallbackBuildOutput { struct FlashblockBuildOutput { ctx: OpPayloadJobCtx, - build_result: eyre::Result>, + build_result: eyre::Result>, cache: Cache, transition: Transition, tx_tracker: FlashblockTxTracker, @@ -103,6 +107,14 @@ struct FlashblockBuildOutput { state_root_calc: StateRootCalculator, } +/// Output of a successful `build_next_flashblock`. +struct FlashblockBuildResult { + next_fb_state: FlashblocksState, + new_payload: OpBuiltPayload, + fb_payload: OpFlashblockPayload, + build_duration: core::time::Duration, +} + impl FlashblocksState { fn new(target_flashblock_count: u64) -> Self { Self { @@ -248,15 +260,8 @@ pub(super) struct OpPayloadBuilderInner { pool: Pool, /// Node client client: Client, - /// Sender for sending built flashblock payloads to [`PayloadHandler`], - /// which broadcasts outgoing flashblock payloads via p2p. - built_fb_payload_tx: mpsc::Sender, - /// Sender for sending built full block payloads to [`PayloadHandler`], - /// which updates the engine tree state. - built_payload_tx: mpsc::Sender, - /// WebSocket publisher for broadcasting flashblocks - /// to all connected subscribers. - ws_pub: WebSocketPublisher, + /// Hooks dispatched after each sealed candidate (fallback or flashblock). + post_seal_hooks: Vec>, /// System configuration for the builder config: BuilderConfig, /// The end of builder transaction type @@ -310,7 +315,7 @@ where da_config: config.da_config.clone(), gas_limit_config: config.gas_limit_config.clone(), chain_spec: client.chain_spec(), - metrics, + metrics: Arc::clone(&metrics), max_gas_per_txn: config.max_gas_per_txn, max_uncompressed_block_size: config.max_uncompressed_block_size, address_limiter, @@ -320,14 +325,25 @@ where disable_state_root: config.flashblocks_config.disable_state_root, enable_incremental_state_root: config.flashblocks_config.enable_incremental_state_root, }); + + let ws_pub = Arc::new(ws_pub); + let post_seal_hooks: Vec> = vec![ + Box::new(WsHook::new( + Arc::clone(&ws_pub), + Arc::clone(&metrics), + config.enable_tx_tracking_debug_logs, + )), + Box::new(ChannelHook::new("p2p", built_fb_payload_tx)), + Box::new(ChannelHook::new("engine", built_payload_tx)), + Box::new(MetricsHook::new(Arc::clone(&metrics))), + ]; + Self { inner: Arc::new(OpPayloadBuilderInner { builder_ctx, pool, client, - built_fb_payload_tx, - built_payload_tx, - ws_pub, + post_seal_hooks, config, builder_tx, task_metrics, @@ -494,16 +510,18 @@ where fb_state = returned_fb_state; state_root_calc = returned_state_root_calc; - self.built_fb_payload_tx - .try_send(payload.clone()) - .map_err(PayloadBuilderError::other)?; - if let Err(e) = self.built_payload_tx.try_send(payload.clone()) { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } + let candidate = SealedCandidate { + payload: payload.clone(), + fb_payload: fb_payload.clone(), + build_duration: None, + }; + let slot = SlotMeta { + payload_id: ctx.payload_id(), + no_tx_pool: ctx.attributes().no_tx_pool, + slot_timestamp_secs: config.attributes.timestamp(), + block_time: self.config.block_time, + }; + dispatch_post_seal(&self.post_seal_hooks, &candidate, &slot); best_payload_tx.send_replace(Some(payload)); info!( @@ -512,34 +530,6 @@ where "Fallback block built" ); - // not emitting flashblock if no_tx_pool in FCU, it's just syncing - if !ctx.attributes().no_tx_pool { - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .map_err(PayloadBuilderError::other)?; - - let slot_offset_ms = - compute_slot_offset_ms(config.attributes.timestamp(), self.config.block_time); - record_flashblock_publish_timing(fb_payload.index, slot_offset_ms); - - if self.config.enable_tx_tracking_debug_logs { - debug!( - target: "tx_trace", - payload_id = %ctx.payload_id(), - block_number = ctx.block_number(), - flashblock_index = fb_payload.index, - byte_size = flashblock_byte_size, - total_txs = info.executed_transactions.len(), - slot_offset_ms, - stage = "fb_published" - ); - } - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - } - if ctx.attributes().no_tx_pool { info!( target: "payload_builder", @@ -802,9 +792,28 @@ where } let next_flashblock_state = match build_result { - Ok(Some((next_flashblock_state, new_payload))) => { + Ok(Some(result)) => { + let FlashblockBuildResult { + next_fb_state, + new_payload, + fb_payload: built_fb_payload, + build_duration, + } = result; + + let candidate = SealedCandidate { + payload: new_payload.clone(), + fb_payload: built_fb_payload, + build_duration: Some(build_duration), + }; + let slot = SlotMeta { + payload_id: ctx.payload_id(), + no_tx_pool: ctx.attributes().no_tx_pool, + slot_timestamp_secs: ctx.attributes().timestamp(), + block_time: self.config.block_time, + }; + dispatch_post_seal(&self.post_seal_hooks, &candidate, &slot); best_payload_tx.send_replace(Some(new_payload)); - next_flashblock_state + next_fb_state } Ok(None) => { Self::record_cancellation_reason( @@ -920,7 +929,7 @@ where best_txs: &mut NextFlashblockPoolTxCursor<'a, Pool>, block_cancel: &CancellationToken, state_root_calc: &mut StateRootCalculator, - ) -> eyre::Result> { + ) -> eyre::Result> { let flashblock_index = fb_state.flashblock_index(); let mut target_gas_for_batch = fb_state.target_gas_for_batch(); let mut target_da_for_batch = fb_state.target_da_for_batch(); @@ -1066,54 +1075,17 @@ where if block_cancel.is_cancelled() { return Ok(None); } - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .wrap_err("failed to publish flashblock via websocket")?; - - // Record slot-relative publish timing (ms since slot start) - let slot_offset_ms = - compute_slot_offset_ms(ctx.attributes().timestamp(), self.config.block_time); - record_flashblock_publish_timing(flashblock_index, slot_offset_ms); - - if self.config.enable_tx_tracking_debug_logs { - debug!( - target: "tx_trace", - payload_id = %ctx.payload_id(), - block_number = ctx.block_number(), - flashblock_index, - byte_size = flashblock_byte_size, - total_txs = info.executed_transactions.len(), - slot_offset_ms, - stage = "fb_published" - ); - } - self.built_fb_payload_tx - .try_send(new_payload.clone()) - .wrap_err("failed to send built payload to handler")?; - if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } - // Record flashblock build duration - ctx.metrics - .flashblock_build_duration - .record(flashblock_build_start_time.elapsed()); - ctx.metrics - .flashblock_byte_size_histogram - .record(flashblock_byte_size as f64); - ctx.metrics - .flashblock_num_tx_histogram - .record(info.executed_transactions.len() as f64); // Advance batch budgets for the next flashblock. let next_flashblock_state = fb_state.next_after_seal(target_da_for_batch, target_da_footprint_for_batch); - Ok(Some((next_flashblock_state, new_payload))) + Ok(Some(FlashblockBuildResult { + next_fb_state: next_flashblock_state, + new_payload, + fb_payload, + build_duration: flashblock_build_start_time.elapsed(), + })) } } }