Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions crates/op-rbuilder/src/builder/hooks/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::builder::hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta};
use reth_optimism_node::OpBuiltPayload;
use tokio::sync::mpsc;
use tracing::warn;

/// Forwards each sealed candidate over a named mpsc channel.
#[derive(Debug)]
pub(crate) struct ChannelHook {
name: &'static str,
sender: mpsc::Sender<OpBuiltPayload>,
}

impl ChannelHook {
pub(crate) fn new(name: &'static str, sender: mpsc::Sender<OpBuiltPayload>) -> Self {
Self { name, sender }
}
}

impl PostSealHook for ChannelHook {
fn on_sealed(&self, candidate: &SealedCandidate, _slot: &SlotMeta) {
if let Err(e) = self.sender.try_send(candidate.payload.clone()) {
warn!(
target: "payload_builder",
channel = self.name,
error = %e,
flashblock_index = candidate.fb_payload.index,
"Failed to forward sealed payload over channel"
);
}
}
}
29 changes: 29 additions & 0 deletions crates/op-rbuilder/src/builder/hooks/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::{
builder::hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta},
metrics::OpRBuilderMetrics,
};
use std::sync::Arc;

/// Records per-flashblock metrics that aren't tied to publication:
/// build duration and transaction-count histogram.
#[derive(Debug)]
pub(crate) struct MetricsHook {
metrics: Arc<OpRBuilderMetrics>,
}

impl MetricsHook {
pub(crate) fn new(metrics: Arc<OpRBuilderMetrics>) -> Self {
Self { metrics }
}
}

impl PostSealHook for MetricsHook {
fn on_sealed(&self, candidate: &SealedCandidate, _slot: &SlotMeta) {
if let Some(duration) = candidate.build_duration {
self.metrics.flashblock_build_duration.record(duration);
}
self.metrics
.flashblock_num_tx_histogram
.record(candidate.payload.block().body().transactions.len() as f64);
}
}
30 changes: 30 additions & 0 deletions crates/op-rbuilder/src/builder/hooks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//! Lifecycle hooks dispatched from the building loop.
//!
//! [`PostSealHook`] fires after each sealed candidate (fallback or
//! flashblock) and isolates downstream side-effects — WS publication, p2p
//! broadcast, engine propagation, metrics — from the building loop itself.
//! Concrete implementations live alongside this module.

mod channel;
mod metrics;
mod post_seal;
mod ws;

pub(super) use channel::ChannelHook;
pub(super) use metrics::MetricsHook;
pub(super) use post_seal::{PostSealHook, SealedCandidate, SlotMeta};
pub(super) use ws::WsHook;

/// Dispatch a sealed candidate to every hook in `hooks`.
///
/// Hook impls are expected to be cheap; we run them sequentially in the
/// caller's context (sync, including from within `spawn_blocking`).
pub(super) fn dispatch_post_seal(
hooks: &[Box<dyn PostSealHook>],
candidate: &SealedCandidate,
slot: &SlotMeta,
) {
for h in hooks {
h.on_sealed(candidate, slot);
}
}
38 changes: 38 additions & 0 deletions crates/op-rbuilder/src/builder/hooks/post_seal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use core::time::Duration;
use op_alloy_rpc_types_engine::OpFlashblockPayload;
use reth_optimism_node::OpBuiltPayload;
use reth_payload_builder::PayloadId;

/// A flashblock candidate that has been sealed and is ready for publication
/// and downstream propagation.
///
/// `payload` is the full built payload for engine/p2p delivery; `fb_payload`
/// is the slim, serialisable view streamed to flashblocks subscribers.
/// `build_duration` is the wall-clock time spent building this flashblock,
/// or `None` for the fallback candidate (no incremental build step).
#[derive(Debug, Clone)]
pub(crate) struct SealedCandidate {
pub payload: OpBuiltPayload,
pub fb_payload: OpFlashblockPayload,
pub build_duration: Option<Duration>,
}

/// Slot-level metadata for a given building slot.
#[derive(Debug, Clone)]
pub(crate) struct SlotMeta {
pub payload_id: PayloadId,
/// True when the FCU specified `no_tx_pool`.
pub no_tx_pool: bool,
/// Slot start timestamp from the payload attributes.
pub slot_timestamp_secs: u64,
pub block_time: Duration,
}

/// Hook invoked after a flashblock or fallback candidate has been sealed.
///
/// Implementations should be cheap and non-blocking: dispatch happens on the
/// builder's hot path. Errors are intentionally swallowed at the dispatch site;
/// hooks that want to surface failures should do so via metrics or logs.
pub(crate) trait PostSealHook: Send + Sync + std::fmt::Debug {
fn on_sealed(&self, candidate: &SealedCandidate, slot: &SlotMeta);
}
79 changes: 79 additions & 0 deletions crates/op-rbuilder/src/builder/hooks/ws.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use crate::{
builder::{
hooks::post_seal::{PostSealHook, SealedCandidate, SlotMeta},
timing::compute_slot_offset_ms,
wspub::WebSocketPublisher,
},
metrics::{OpRBuilderMetrics, record_flashblock_publish_timing},
};
use std::sync::Arc;
use tracing::{debug, warn};

/// Publishes the flashblock payload to WebSocket subscribers, record metrics.
///
/// Suppressed when `SlotMeta::no_tx_pool` is true.
pub(crate) struct WsHook {
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<OpRBuilderMetrics>,
enable_tx_tracking_debug_logs: bool,
}

impl std::fmt::Debug for WsHook {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WsHook").finish_non_exhaustive()
}
}

impl WsHook {
pub(crate) fn new(
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<OpRBuilderMetrics>,
enable_tx_tracking_debug_logs: bool,
) -> Self {
Self {
ws_pub,
metrics,
enable_tx_tracking_debug_logs,
}
}
}

impl PostSealHook for WsHook {
fn on_sealed(&self, candidate: &SealedCandidate, slot: &SlotMeta) {
if slot.no_tx_pool {
return;
}

let byte_size = match self.ws_pub.publish(&candidate.fb_payload) {
Ok(size) => size,
Err(e) => {
warn!(
target: "payload_builder",
error = %e,
flashblock_index = candidate.fb_payload.index,
"Failed to publish flashblock via websocket"
);
return;
}
};

let slot_offset_ms = compute_slot_offset_ms(slot.slot_timestamp_secs, slot.block_time);
record_flashblock_publish_timing(candidate.fb_payload.index, slot_offset_ms);
self.metrics
.flashblock_byte_size_histogram
.record(byte_size as f64);

if self.enable_tx_tracking_debug_logs {
debug!(
target: "tx_trace",
payload_id = %slot.payload_id,
block_number = candidate.payload.block().header().number,
flashblock_index = candidate.fb_payload.index,
byte_size,
total_txs = candidate.payload.block().body().transactions.len(),
slot_offset_ms,
stage = "fb_published"
);
}
}
}
1 change: 1 addition & 0 deletions crates/op-rbuilder/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod config;
mod context;
mod flashblocks_builder_tx;
mod generator;
mod hooks;
mod p2p;
mod payload;
mod payload_handler;
Expand Down
Loading
Loading