From e008127998fd1b5ba4c4a8c0678f1781280d727a Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 23 Dec 2025 18:37:55 +0000 Subject: [PATCH 1/5] add revert to l1 block rpc --- crates/chain-orchestrator/src/event.rs | 2 + .../chain-orchestrator/src/handle/command.rs | 2 + crates/chain-orchestrator/src/handle/mod.rs | 10 ++ crates/chain-orchestrator/src/lib.rs | 33 ++++- crates/database/db/src/operations.rs | 1 + crates/node/src/add_ons/handle.rs | 6 +- crates/node/src/add_ons/rollup.rs | 12 +- crates/node/src/add_ons/rpc.rs | 26 ++++ crates/node/src/args.rs | 76 +++++----- crates/node/src/test_utils/event_utils.rs | 9 ++ crates/node/src/test_utils/fixture.rs | 16 +-- crates/node/src/test_utils/l1_helpers.rs | 16 ++- crates/node/tests/e2e.rs | 131 ++++++++++++++++-- crates/node/tests/sync.rs | 26 ++-- crates/sequencer/tests/e2e.rs | 4 +- crates/watcher/src/handle/command.rs | 15 ++ crates/watcher/src/handle/mod.rs | 46 ++++++ crates/watcher/src/lib.rs | 70 +++++++--- crates/watcher/tests/indexing.rs | 2 +- crates/watcher/tests/logs.rs | 3 +- crates/watcher/tests/reorg.rs | 30 ++-- 21 files changed, 417 insertions(+), 119 deletions(-) create mode 100644 crates/watcher/src/handle/command.rs create mode 100644 crates/watcher/src/handle/mod.rs diff --git a/crates/chain-orchestrator/src/event.rs b/crates/chain-orchestrator/src/event.rs index e1a487db..027dc118 100644 --- a/crates/chain-orchestrator/src/event.rs +++ b/crates/chain-orchestrator/src/event.rs @@ -77,6 +77,8 @@ pub enum ChainOrchestratorEvent { /// The L2 safe block info. l2_safe_block_info: Option, }, + /// The chain has been unwound to the specified L1 block number. + UnwoundToL1Block(u64), /// The chain orchestrator has synced to the L1 head. L1Synced, /// An L2 block has been committed returning the [`L2BlockInfoWithL1Messages`] and an diff --git a/crates/chain-orchestrator/src/handle/command.rs b/crates/chain-orchestrator/src/handle/command.rs index 03ed97ac..ce2d8cb9 100644 --- a/crates/chain-orchestrator/src/handle/command.rs +++ b/crates/chain-orchestrator/src/handle/command.rs @@ -27,6 +27,8 @@ pub enum ChainOrchestratorCommand), /// Send a database query to the rollup manager. DatabaseQuery(DatabaseQuery), + /// Revert the rollup node state to the specified L1 block number. + RevertToL1Block((u64, oneshot::Sender)), /// Enable gossiping of blocks to peers. #[cfg(feature = "test-utils")] SetGossip((bool, oneshot::Sender<()>)), diff --git a/crates/chain-orchestrator/src/handle/mod.rs b/crates/chain-orchestrator/src/handle/mod.rs index b62ee195..ea8477a9 100644 --- a/crates/chain-orchestrator/src/handle/mod.rs +++ b/crates/chain-orchestrator/src/handle/mod.rs @@ -103,6 +103,16 @@ impl> ChainOrchestratorHand rx.await } + /// Revert the rollup node state to the specified L1 block number. + pub async fn revert_to_l1_block( + &self, + block_number: u64, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.send_command(ChainOrchestratorCommand::RevertToL1Block((block_number, tx))); + rx.await + } + /// Sends a command to the rollup manager to enable or disable gossiping of blocks to peers. #[cfg(feature = "test-utils")] pub async fn set_gossip(&self, enabled: bool) -> Result<(), oneshot::error::RecvError> { diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index be1a13f4..912181db 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -20,7 +20,7 @@ use rollup_node_primitives::{ use rollup_node_providers::L1MessageProvider; use rollup_node_sequencer::{Sequencer, SequencerEvent}; use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle}; -use rollup_node_watcher::L1Notification; +use rollup_node_watcher::{L1Notification, L1WatcherHandle}; use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message}; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; @@ -35,7 +35,7 @@ use scroll_network::{ BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent, }; use std::{collections::VecDeque, sync::Arc, time::Instant, vec}; -use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver}; +use tokio::sync::mpsc::{self, UnboundedReceiver}; mod config; pub use config::ChainOrchestratorConfig; @@ -115,8 +115,8 @@ pub struct ChainOrchestrator< database: Arc, /// The current sync state of the [`ChainOrchestrator`]. sync_state: SyncState, - /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`]. - l1_notification_rx: Receiver>, + /// A handle for the [`rollup_node_watcher::L1Watcher`]. + l1_watcher: L1WatcherHandle, /// The network manager that manages the scroll p2p network. network: ScrollNetwork, /// The consensus algorithm used by the rollup node. @@ -150,7 +150,7 @@ impl< config: ChainOrchestratorConfig, block_client: Arc::Client>>, l2_provider: L2P, - l1_notification_rx: Receiver>, + l1_watcher: L1WatcherHandle, network: ScrollNetwork, consensus: Box, engine: Engine, @@ -167,7 +167,7 @@ impl< database, config, sync_state: SyncState::default(), - l1_notification_rx, + l1_watcher, network, consensus, engine, @@ -224,7 +224,7 @@ impl< let res = self.handle_network_event(event).await; self.handle_outcome(res); } - Some(notification) = self.l1_notification_rx.recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => { + Some(notification) = self.l1_watcher.l1_notification_receiver().recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => { let res = self.handle_l1_notification(notification).await; self.handle_outcome(res); } @@ -401,6 +401,25 @@ impl< let _ = sender.send(l1_message); } }, + ChainOrchestratorCommand::RevertToL1Block((block_number, tx)) => { + self.sync_state.l1_mut().set_syncing(); + let unwind_result = self.database.unwind(block_number).await?; + + println!("Unwind result: {:?}", unwind_result); + + // Check if the unwind impacts the fcs safe head. + if let Some(block_info) = unwind_result.l2_safe_block_info { + // If the safe head was unwound and is above or equal to the finalized head, + // update the fcs. + if block_info.number != self.engine.fcs().safe_block_info().number && + block_info.number >= self.engine.fcs().finalized_block_info().number + { + self.engine.update_fcs(None, Some(block_info), None).await?; + } + } + self.notify(ChainOrchestratorEvent::UnwoundToL1Block(block_number)); + let _ = tx.send(true); + } #[cfg(feature = "test-utils")] ChainOrchestratorCommand::SetGossip((enabled, tx)) => { self.network.handle().set_gossip(enabled).await; diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index 0d6cfa21..f28887bd 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -852,6 +852,7 @@ impl DatabaseWriteOperations for T { // delete batch commits, l1 messages and batch finalization effects greater than the // provided l1 block number let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?; + println!("Deleted {} batches", batches_removed); let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?; self.delete_batch_finalization_gt_block_number(l1_block_number).await?; let batch_reverts_removed: u64 = diff --git a/crates/node/src/add_ons/handle.rs b/crates/node/src/add_ons/handle.rs index a293e6b5..6baa7018 100644 --- a/crates/node/src/add_ons/handle.rs +++ b/crates/node/src/add_ons/handle.rs @@ -1,11 +1,11 @@ +#[cfg(feature = "test-utils")] +use crate::test_utils::l1_helpers::L1WatcherMock; use reth_network_api::FullNetwork; use reth_node_api::FullNodeComponents; use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider}; use reth_rpc_eth_api::EthApiTypes; use reth_scroll_node::ScrollNetworkPrimitives; use rollup_node_chain_orchestrator::ChainOrchestratorHandle; -#[cfg(feature = "test-utils")] -use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender}; /// A handle for scroll addons, which includes handles for the rollup manager and RPC server. #[derive(Debug, Clone)] @@ -19,7 +19,7 @@ pub struct ScrollAddOnsHandle< pub rpc_handle: RpcHandle, /// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`. #[cfg(feature = "test-utils")] - pub l1_watcher_tx: Option>>, + pub l1_watcher_tx: Option, } impl< diff --git a/crates/node/src/add_ons/rollup.rs b/crates/node/src/add_ons/rollup.rs index 93e88461..b6a37989 100644 --- a/crates/node/src/add_ons/rollup.rs +++ b/crates/node/src/add_ons/rollup.rs @@ -1,4 +1,4 @@ -use crate::args::ScrollRollupNodeConfig; +use crate::{args::ScrollRollupNodeConfig, test_utils::l1_helpers::L1WatcherMock}; use reth_chainspec::NamedChain; use reth_network::NetworkProtocols; @@ -9,11 +9,9 @@ use reth_rpc_eth_api::EthApiTypes; use reth_scroll_chainspec::{ChainConfig, ScrollChainConfig, ScrollChainSpec}; use reth_scroll_node::ScrollNetworkPrimitives; use rollup_node_chain_orchestrator::ChainOrchestratorHandle; -use rollup_node_watcher::L1Notification; use scroll_alloy_hardforks::ScrollHardforks; use scroll_wire::ScrollWireEvent; -use std::sync::Arc; -use tokio::sync::mpsc::{Sender, UnboundedReceiver}; +use tokio::sync::mpsc::UnboundedReceiver; /// Implementing the trait allows the type to return whether it is configured for dev chain. #[auto_impl::auto_impl(Arc)] @@ -55,13 +53,13 @@ impl RollupManagerAddOn { self, ctx: AddOnsContext<'_, N>, rpc: RpcHandle, - ) -> eyre::Result<(ChainOrchestratorHandle, Option>>)> + ) -> eyre::Result<(ChainOrchestratorHandle, Option)> where <::Types as NodeTypes>::ChainSpec: ChainConfig + ScrollHardforks + IsDevChain, N::Network: NetworkProtocols + FullNetwork, { - let (chain_orchestrator, handle, l1_notification_tx) = self + let (chain_orchestrator, handle, l1_watcher_mock) = self .config .build((&ctx).into(), self.scroll_wire_event, rpc.rpc_server_handles) .await?; @@ -70,6 +68,6 @@ impl RollupManagerAddOn { .spawn_critical_with_shutdown_signal("rollup_node_manager", |shutdown| { chain_orchestrator.run_until_shutdown(shutdown) }); - Ok((handle, l1_notification_tx)) + Ok((handle, l1_watcher_mock)) } } diff --git a/crates/node/src/add_ons/rpc.rs b/crates/node/src/add_ons/rpc.rs index 4d0c8226..1340ba89 100644 --- a/crates/node/src/add_ons/rpc.rs +++ b/crates/node/src/add_ons/rpc.rs @@ -115,6 +115,10 @@ pub trait RollupNodeAdminApi { /// Disables automatic sequencing in the rollup node. #[method(name = "disableAutomaticSequencing")] async fn disable_automatic_sequencing(&self) -> RpcResult; + + /// Reverts the rollup node state to a specified L1 block number. + #[method(name = "revertToL1Block")] + async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult; } #[async_trait] @@ -220,6 +224,24 @@ where ) }) } + + async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult { + let handle = self.rollup_manager_handle().await.map_err(|e| { + ErrorObjectOwned::owned( + error::INTERNAL_ERROR_CODE, + format!("Failed to get rollup manager handle: {}", e), + None::<()>, + ) + })?; + + handle.revert_to_l1_block(block_number).await.map_err(|e| { + ErrorObjectOwned::owned( + error::INTERNAL_ERROR_CODE, + format!("Failed to revert to L1 block {}: {}", block_number, e), + None::<()>, + ) + }) + } } // Implement RollupNodeApiServer for Arc> to allow shared ownership @@ -257,4 +279,8 @@ where async fn disable_automatic_sequencing(&self) -> RpcResult { (**self).disable_automatic_sequencing().await } + + async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult { + (**self).revert_to_l1_block(block_number).await + } } diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index a5f2c71d..a5536ff6 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -6,6 +6,7 @@ use crate::{ use scroll_migration::MigratorTrait; use std::{fs, path::PathBuf, sync::Arc}; +use super::test_utils::l1_helpers::L1WatcherMock; use alloy_chains::NamedChain; use alloy_primitives::{hex, Address, U128}; use alloy_provider::{layers::CacheLayer, Provider, ProviderBuilder}; @@ -38,7 +39,7 @@ use rollup_node_providers::{ use rollup_node_sequencer::{ L1MessageInclusionMode, PayloadBuildingConfig, Sequencer, SequencerConfig, }; -use rollup_node_watcher::{L1Notification, L1Watcher}; +use rollup_node_watcher::{L1Watcher, L1WatcherHandle}; use scroll_alloy_hardforks::ScrollHardforks; use scroll_alloy_network::Scroll; use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi}; @@ -51,7 +52,7 @@ use scroll_engine::{Engine, ForkchoiceState}; use scroll_migration::traits::ScrollMigrator; use scroll_network::ScrollNetworkManager; use scroll_wire::ScrollWireEvent; -use tokio::sync::mpsc::{Sender, UnboundedReceiver}; +use tokio::sync::mpsc::UnboundedReceiver; /// A struct that represents the arguments for the rollup node. #[derive(Debug, Clone, clap::Args)] @@ -166,7 +167,7 @@ impl ScrollRollupNodeConfig { impl ScrollEngineApi, >, ChainOrchestratorHandle, - Option>>, + Option, )> where N: FullNetwork + NetworkProtocols, @@ -350,35 +351,42 @@ impl ScrollRollupNodeConfig { }; let consensus = self.consensus_args.consensus(authorized_signer)?; - let (l1_notification_tx, l1_notification_rx): (Option>>, _) = - if let Some(provider) = l1_provider.filter(|_| !self.test) { - tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher"); - ( - None, - Some( - L1Watcher::spawn( - provider, - l1_block_startup_info, - node_config, - self.l1_provider_args.logs_query_block_range, - ) - .await, - ), - ) - } else { - // Create a channel for L1 notifications that we can use to inject L1 messages for - // testing - #[cfg(feature = "test-utils")] - { - let (tx, rx) = tokio::sync::mpsc::channel(1000); - (Some(tx), Some(rx)) - } - - #[cfg(not(feature = "test-utils"))] - { - (None, None) - } - }; + let (l1_watcher_mock, l1_watcher_handle) = if let Some(provider) = + l1_provider.filter(|_| !self.test) + { + tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher"); + ( + None, + Some( + L1Watcher::spawn( + provider, + l1_block_startup_info, + node_config, + self.l1_provider_args.logs_query_block_range, + ) + .await, + ), + ) + } else { + // Create a channel for L1 notifications that we can use to inject L1 messages for + // testing + #[cfg(feature = "test-utils")] + { + let (notification_tx, notification_rx) = tokio::sync::mpsc::channel(1000); + let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = L1WatcherHandle::new(command_tx, notification_rx); + let watcher_mock = L1WatcherMock { + command_rx: Arc::new(tokio::sync::Mutex::new(command_rx)), + notification_tx, + }; + (Some(watcher_mock), Some(handle)) + } + + #[cfg(not(feature = "test-utils"))] + { + (None, None) + } + }; // Construct the l1 provider. let l1_messages_provider = db.clone(); @@ -457,7 +465,7 @@ impl ScrollRollupNodeConfig { config, Arc::new(block_client), l2_provider, - l1_notification_rx.expect("L1 notification receiver should be set"), + l1_watcher_handle.expect("L1 notification receiver should be set"), scroll_network_handle.into_scroll_network().await, consensus, engine, @@ -467,7 +475,7 @@ impl ScrollRollupNodeConfig { ) .await?; - Ok((chain_orchestrator, handle, l1_notification_tx)) + Ok((chain_orchestrator, handle, l1_watcher_mock)) } } diff --git a/crates/node/src/test_utils/event_utils.rs b/crates/node/src/test_utils/event_utils.rs index 9d7b7da1..2a08ad24 100644 --- a/crates/node/src/test_utils/event_utils.rs +++ b/crates/node/src/test_utils/event_utils.rs @@ -130,6 +130,15 @@ impl<'a> EventWaiter<'a> { Ok(()) } + /// Wait for chain unwound event on all specified nodes. + pub async fn revert_to_l1_block(self) -> eyre::Result<()> { + self.wait_for_event_on_all(|e| { + matches!(e, ChainOrchestratorEvent::UnwoundToL1Block(_)).then_some(()) + }) + .await?; + Ok(()) + } + /// Wait for block consolidated event on all specified nodes. pub async fn block_consolidated(self, target_block: u64) -> eyre::Result<()> { self.wait_for_event_on_all(|e| { diff --git a/crates/node/src/test_utils/fixture.rs b/crates/node/src/test_utils/fixture.rs index 5616aca4..b0946258 100644 --- a/crates/node/src/test_utils/fixture.rs +++ b/crates/node/src/test_utils/fixture.rs @@ -4,9 +4,10 @@ use super::{ block_builder::BlockBuilder, l1_helpers::L1Helper, setup_engine, tx_helpers::TxHelper, }; use crate::{ - BlobProviderArgs, ChainOrchestratorArgs, ConsensusAlgorithm, ConsensusArgs, EngineDriverArgs, - L1ProviderArgs, RollupNodeDatabaseArgs, RollupNodeGasPriceOracleArgs, RollupNodeNetworkArgs, - RpcArgs, ScrollRollupNode, ScrollRollupNodeConfig, SequencerArgs, SignerArgs, + test_utils::l1_helpers::L1WatcherMock, BlobProviderArgs, ChainOrchestratorArgs, + ConsensusAlgorithm, ConsensusArgs, EngineDriverArgs, L1ProviderArgs, RollupNodeDatabaseArgs, + RollupNodeGasPriceOracleArgs, RollupNodeNetworkArgs, RpcArgs, ScrollRollupNode, + ScrollRollupNodeConfig, SequencerArgs, SignerArgs, }; use alloy_eips::BlockNumberOrTag; @@ -27,7 +28,6 @@ use reth_tokio_util::EventStream; use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle}; use rollup_node_primitives::BlockInfo; use rollup_node_sequencer::L1MessageInclusionMode; -use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::ScrollPooledTransaction; use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi}; use scroll_alloy_rpc_types::Transaction; @@ -37,7 +37,7 @@ use std::{ path::PathBuf, sync::Arc, }; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::Mutex; /// Main test fixture providing a high-level interface for testing rollup nodes. #[derive(Debug)] @@ -76,7 +76,7 @@ pub struct NodeHandle { /// Engine instance for this node. pub engine: Engine>, /// L1 watcher notification channel. - pub l1_watcher_tx: Option>>, + pub l1_watcher_tx: Option, /// Chain orchestrator listener. pub chain_orchestrator_rx: EventStream, /// Chain orchestrator handle. @@ -436,7 +436,7 @@ impl TestFixtureBuilder { .await?; let mut node_handles = Vec::with_capacity(nodes.len()); - for (index, node) in nodes.into_iter().enumerate() { + for (index, mut node) in nodes.into_iter().enumerate() { let genesis_hash = node.inner.chain_spec().genesis_hash(); // Create engine for the node @@ -451,7 +451,7 @@ impl TestFixtureBuilder { let engine = Engine::new(Arc::new(engine_client), fcs); // Get handles if available - let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.clone(); + let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.take(); let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); let chain_orchestrator_rx = node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; diff --git a/crates/node/src/test_utils/l1_helpers.rs b/crates/node/src/test_utils/l1_helpers.rs index d6c858cf..65e62c9e 100644 --- a/crates/node/src/test_utils/l1_helpers.rs +++ b/crates/node/src/test_utils/l1_helpers.rs @@ -5,8 +5,18 @@ use std::{fmt::Debug, str::FromStr, sync::Arc}; use alloy_primitives::{Address, Bytes, B256, U256}; use rollup_node_primitives::{BatchCommitData, BlockInfo, ConsensusUpdate}; -use rollup_node_watcher::L1Notification; +use rollup_node_watcher::{L1Notification, L1WatcherCommand}; use scroll_alloy_consensus::TxL1Message; +use tokio::sync::{mpsc, Mutex}; + +/// Mock for the L1 Watcher. +#[derive(Clone, Debug)] +pub struct L1WatcherMock { + /// Receiver for L1 watcher commands. + pub command_rx: Arc>>, + /// Sender for L1 notifications. + pub notification_tx: mpsc::Sender>, +} /// Helper for managing L1 interactions in tests. #[derive(Debug)] @@ -117,7 +127,7 @@ impl<'a> L1Helper<'a> { for node in nodes { if let Some(tx) = &node.l1_watcher_tx { - tx.send(notification.clone()).await?; + tx.notification_tx.send(notification.clone()).await?; } } @@ -223,7 +233,7 @@ impl<'a> L1MessageBuilder<'a> { for node in nodes { if let Some(tx) = &node.l1_watcher_tx { - tx.send(notification.clone()).await?; + tx.notification_tx.send(notification.clone()).await?; } } diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index fd50270f..303f93b6 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -24,7 +24,7 @@ use rollup_node::{ }, RollupNodeAdminApiClient, RollupNodeContext, }; -use rollup_node_chain_orchestrator::ChainOrchestratorEvent; +use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, SyncMode}; use rollup_node_primitives::{sig_encode_hash, BatchCommitData, BlockInfo}; use rollup_node_watcher::L1Notification; use scroll_db::{test_utils::setup_test_db, L1MessageKey}; @@ -324,7 +324,7 @@ async fn can_forward_tx_to_sequencer() -> eyre::Result<()> { // Send a notification to set the L1 to synced let sequencer_l1_watcher_tx = sequencer_node[0].inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + sequencer_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); sequencer_events.next().await; sequencer_events.next().await; @@ -466,7 +466,7 @@ async fn can_bridge_blocks() -> eyre::Result<()> { let bridge_node_l1_watcher_tx = bridge_node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); // Send a notification to set the L1 to synced - bridge_node_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + bridge_node_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); // Instantiate the scroll NetworkManager. let network_config = NetworkConfigBuilder::::with_rng_secret_key() @@ -605,8 +605,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() let mut rnm_events = handle.get_event_listener().await?; // Extract the L1 notification sender - let l1_notification_tx: tokio::sync::mpsc::Sender> = - l1_notification_tx.unwrap(); + let l1_notification_tx = l1_notification_tx.unwrap(); // Load test batches let block_0_info = BlockInfo { number: 18318207, hash: B256::random() }; @@ -636,12 +635,14 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() // Send the first batch commit to the rollup node manager and finalize it. l1_notification_tx + .notification_tx .send(Arc::new(L1Notification::BatchCommit { block_info: block_0_info, data: batch_0_data.clone(), })) .await?; l1_notification_tx + .notification_tx .send(Arc::new(L1Notification::BatchFinalization { hash: batch_0_data.hash, index: batch_0_data.index, @@ -650,7 +651,10 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() .await?; // Lets finalize the first batch - l1_notification_tx.send(Arc::new(L1Notification::Finalized(block_0_info.number))).await?; + l1_notification_tx + .notification_tx + .send(Arc::new(L1Notification::Finalized(block_0_info.number))) + .await?; // Lets iterate over all blocks expected to be derived from the first batch commit. let consolidation_outcome = loop { @@ -664,12 +668,14 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() // Now we send the second batch commit and finalize it. l1_notification_tx + .notification_tx .send(Arc::new(L1Notification::BatchCommit { block_info: block_1_info, data: batch_1_data.clone(), })) .await?; l1_notification_tx + .notification_tx .send(Arc::new(L1Notification::BatchFinalization { hash: batch_1_data.hash, index: batch_1_data.index, @@ -678,7 +684,10 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() .await?; // Lets finalize the second batch. - l1_notification_tx.send(Arc::new(L1Notification::Finalized(block_1_info.number))).await?; + l1_notification_tx + .notification_tx + .send(Arc::new(L1Notification::Finalized(block_1_info.number))) + .await?; // The second batch commit contains 42 blocks (5-57), lets iterate until the rnm has // consolidated up to block 40. @@ -760,7 +769,10 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() // Send the second batch again to mimic the watcher behaviour. let block_1_info = BlockInfo { number: 18318215, hash: B256::random() }; - l1_notification_tx.send(Arc::new(L1Notification::Finalized(block_1_info.number))).await?; + l1_notification_tx + .notification_tx + .send(Arc::new(L1Notification::Finalized(block_1_info.number))) + .await?; // Lets fetch the first consolidated block event - this should be the first block of the batch. let l2_block = loop { @@ -857,7 +869,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - .await?; let (_signal, shutdown) = shutdown_signal(); let mut rnm = Box::pin(rnm.run_until_shutdown(shutdown)); - let l1_watcher_tx: tokio::sync::mpsc::Sender> = l1_watcher_tx.unwrap(); + let l1_watcher_tx = l1_watcher_tx.unwrap(); // Poll the rnm until we get an event stream listener. let mut rnm_events_fut = pin!(handle.get_event_listener()); @@ -872,7 +884,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - }; // Poll the rnm until we receive the consolidate event - l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?; + l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await?; loop { let _ = rnm.poll_unpin(&mut Context::from_waker(noop_waker_ref())); if let Poll::Ready(Some(ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ })) = @@ -945,6 +957,105 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - Ok(()) } +#[tokio::test] +async fn can_revert_to_l1_block() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + // Create a follower test fixture using SCROLL_MAINNET chain spec + let mut fixture = TestFixture::builder() + .followers(1) + .with_chain_spec(SCROLL_MAINNET.clone()) + .with_memory_db() + .build() + .await?; + + // Load test batches + let batch_0_block_info = BlockInfo { number: 18318207, hash: B256::random() }; + let raw_calldata_0 = read_to_bytes("./tests/testdata/batch_0_calldata.bin")?; + let batch_0_hash = b256!("5AAEB6101A47FC16866E80D77FFE090B6A7B3CF7D988BE981646AB6AEDFA2C42"); + + let batch_1_block_info = BlockInfo { number: 18318215, hash: B256::random() }; + let raw_calldata_1 = read_to_bytes("./tests/testdata/batch_1_calldata.bin")?; + let batch_1_hash = b256!("AA8181F04F8E305328A6117FA6BC13FA2093A3C4C990C5281DF95A1CB85CA18F"); + + // Send a Synced notification to the chain orchestrator + fixture.l1().sync().await?; + + // Send the first batch + let _ = fixture + .l1() + .commit_batch() + .at_block(batch_0_block_info) + .hash(batch_0_hash) + .index(1) + .calldata(raw_calldata_0) + .send() + .await?; + + // Wait for the batch consolidated event + fixture.expect_event().batch_consolidated().await?; + + // Send the second batch + let _ = fixture + .l1() + .commit_batch() + .at_block(batch_1_block_info) + .hash(batch_1_hash) + .index(2) + .calldata(raw_calldata_1.clone()) + .send() + .await?; + + // Wait for the second batch to be consolidated + fixture.expect_event().batch_consolidated().await?; + + // Get the node status + let status = fixture.get_status(0).await?; + + // Validate the safe block number is 57 + assert_eq!(status.l2.fcs.safe_block_info().number, 57); + + // Now send a revert to L1 block 18318210 + fixture.follower(0).rollup_manager_handle.revert_to_l1_block(18318210).await?; + + // Wait for the chain to be unwound + fixture.expect_event().revert_to_l1_block().await?; + + // Get the node status + let status = fixture.get_status(0).await?; + + // Assert that the safe block number is now 4 + assert_eq!(status.l2.fcs.safe_block_info().number, 4); + + // Assert that the L1 status is now syncing + assert_eq!(status.l1.status, SyncMode::Syncing); + + // Now send a Synced notification to the chain orchestrator to resume eager processing + fixture.l1().sync().await?; + + // Send the second batch again + let _ = fixture + .l1() + .commit_batch() + .at_block(batch_1_block_info) + .hash(batch_1_hash) + .index(2) + .calldata(raw_calldata_1) + .send() + .await?; + + // Wait for the second batch to be consolidated + fixture.expect_event().batch_consolidated().await?; + + // Get the node status + let status = fixture.get_status(0).await?; + + // Validate the safe block number is 57 + assert_eq!(status.l2.fcs.safe_block_info().number, 57); + + Ok(()) +} + #[tokio::test] async fn consolidates_committed_batches_after_chain_consolidation() -> eyre::Result<()> { reth_tracing::init_test_tracing(); diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index 6e7b5003..b7dc1b66 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -569,8 +569,8 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { sequencer.connect(&mut follower).await; // set both the sequencer and follower L1 watchers to synced - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); - follower_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + sequencer_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + follower_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); // Initially the sequencer should build 100 blocks with 1 message in each and the follower // should follow them @@ -589,16 +589,16 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { block_timestamp: i * 10, }); let new_block = Arc::new(L1Notification::NewBlock(block_info)); - sequencer_l1_watcher_tx.send(l1_message.clone()).await.unwrap(); - sequencer_l1_watcher_tx.send(new_block.clone()).await.unwrap(); + sequencer_l1_watcher_tx.notification_tx.send(l1_message.clone()).await.unwrap(); + sequencer_l1_watcher_tx.notification_tx.send(new_block.clone()).await.unwrap(); wait_n_events( &mut sequencer_events, |e| matches!(e, ChainOrchestratorEvent::NewL1Block(_)), 1, ) .await; - follower_l1_watcher_tx.send(l1_message).await.unwrap(); - follower_l1_watcher_tx.send(new_block).await.unwrap(); + follower_l1_watcher_tx.notification_tx.send(l1_message).await.unwrap(); + follower_l1_watcher_tx.notification_tx.send(new_block).await.unwrap(); wait_n_events( &mut follower_events, |e| matches!(e, ChainOrchestratorEvent::NewL1Block(_)), @@ -622,7 +622,11 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { } // send a reorg notification to the sequencer - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(50))).await.unwrap(); + sequencer_l1_watcher_tx + .notification_tx + .send(Arc::new(L1Notification::Reorg(50))) + .await + .unwrap(); wait_n_events( &mut sequencer_events, |e| { @@ -660,8 +664,8 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { }); let new_block = Arc::new(L1Notification::NewBlock(block_info)); l1_notifications.extend([l1_message.clone(), new_block.clone()]); - sequencer_l1_watcher_tx.send(l1_message.clone()).await.unwrap(); - sequencer_l1_watcher_tx.send(new_block.clone()).await.unwrap(); + sequencer_l1_watcher_tx.notification_tx.send(l1_message.clone()).await.unwrap(); + sequencer_l1_watcher_tx.notification_tx.send(new_block.clone()).await.unwrap(); wait_n_events( &mut sequencer_events, |e| matches!(e, ChainOrchestratorEvent::NewL1Block(_)), @@ -694,9 +698,9 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { .await; // Now update the follower node with the new L1 data - follower_l1_watcher_tx.send(Arc::new(L1Notification::Reorg(50))).await.unwrap(); + follower_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Reorg(50))).await.unwrap(); for notification in l1_notifications { - follower_l1_watcher_tx.send(notification).await.unwrap(); + follower_l1_watcher_tx.notification_tx.send(notification).await.unwrap(); } wait_n_events(&mut follower_events, |e| matches!(e, ChainOrchestratorEvent::NewL1Block(_)), 20) .await; diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs index db644973..b98e5e64 100644 --- a/crates/sequencer/tests/e2e.rs +++ b/crates/sequencer/tests/e2e.rs @@ -519,7 +519,7 @@ async fn can_sequence_blocks_with_private_key_file() -> eyre::Result<()> { let sequencer_l1_watcher_tx = nodes[0].inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); // Send a notification to set the L1 to synced - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?; + sequencer_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await?; // skip the L1 synced event and consolidated events sequencer_events.next().await; @@ -619,7 +619,7 @@ async fn can_sequence_blocks_with_hex_key_file_without_prefix() -> eyre::Result< let sequencer_l1_watcher_tx = nodes[0].inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); // Send a notification to set the L1 to synced - sequencer_l1_watcher_tx.send(Arc::new(L1Notification::Synced)).await?; + sequencer_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await?; // skip the L1 synced event and consolidated events sequencer_events.next().await; diff --git a/crates/watcher/src/handle/command.rs b/crates/watcher/src/handle/command.rs new file mode 100644 index 00000000..29b9139d --- /dev/null +++ b/crates/watcher/src/handle/command.rs @@ -0,0 +1,15 @@ +use crate::L1Notification; +use std::sync::Arc; +use tokio::sync::mpsc; + +/// Commands that can be sent to the L1 Watcher. +#[derive(Debug, Clone)] +pub enum L1WatcherCommand { + /// Reset the watcher to a specific L1 block number. + ResetToBlock { + /// The L1 block number to reset to. + block: u64, + /// New sender to replace the current notification channel. + tx: mpsc::Sender>, + }, +} diff --git a/crates/watcher/src/handle/mod.rs b/crates/watcher/src/handle/mod.rs new file mode 100644 index 00000000..6b6669e1 --- /dev/null +++ b/crates/watcher/src/handle/mod.rs @@ -0,0 +1,46 @@ +use crate::L1Notification; +use std::sync::Arc; +use tokio::sync::{mpsc, mpsc::UnboundedSender}; + +mod command; +pub use command::L1WatcherCommand; + +/// Handle to interact with the L1 Watcher. +#[derive(Debug)] +pub struct L1WatcherHandle { + to_watcher_tx: UnboundedSender, + l1_notification_rx: mpsc::Receiver>, +} + +impl L1WatcherHandle { + /// Create a new handle with the given command sender. + pub const fn new( + to_watcher_tx: UnboundedSender, + l1_notification_rx: mpsc::Receiver>, + ) -> Self { + Self { to_watcher_tx, l1_notification_rx } + } + + /// Get a mutable reference to the L1 notification receiver. + pub const fn l1_notification_receiver(&mut self) -> &mut mpsc::Receiver> { + &mut self.l1_notification_rx + } + + /// Send a command to the watcher. + fn send_command(&self, command: L1WatcherCommand) { + if let Err(err) = self.to_watcher_tx.send(command) { + tracing::error!(target: "scroll::watcher", ?err, "Failed to send command to L1 watcher"); + } + } + + /// Reset the L1 Watcher to a specific block number with a fresh notification channel. + pub fn revert_to_l1_block(&self, block: u64) -> mpsc::Receiver> { + // Create a fresh notification channel with the same capacity as the original channel + let capacity = self.l1_notification_rx.max_capacity(); + let (tx, rx) = mpsc::channel(capacity); + + self.send_command(L1WatcherCommand::ResetToBlock { block, tx }); + + rx + } +} diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index 22141e95..c7c814c4 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -3,6 +3,9 @@ mod error; pub use error::{EthRequestError, FilterLogError, L1WatcherError}; +mod handle; +pub use handle::{L1WatcherCommand, L1WatcherHandle}; + mod metrics; pub use metrics::WatcherMetrics; @@ -82,6 +85,8 @@ pub struct L1Watcher { l1_state: L1State, /// The latest indexed block. current_block_number: BlockNumber, + /// The command receiver for the L1 watcher. + command_rx: mpsc::UnboundedReceiver, /// The sender part of the channel for [`L1Notification`]. sender: mpsc::Sender>, /// The rollup node configuration. @@ -201,10 +206,12 @@ where l1_block_startup_info: L1BlockStartupInfo, config: Arc, log_query_block_range: u64, - ) -> mpsc::Receiver> { + ) -> L1WatcherHandle { tracing::trace!(target: "scroll::watcher", ?l1_block_startup_info, ?config, "spawning L1 watcher"); - let (tx, rx) = mpsc::channel(log_query_block_range as usize); + let (notification_tx, notification_rx) = mpsc::channel(log_query_block_range as usize); + let (command_tx, command_rx) = mpsc::unbounded_channel(); + let handle = L1WatcherHandle::new(command_tx, notification_rx); let fetch_block_info = async |tag: BlockNumberOrTag| { let block = loop { @@ -258,7 +265,8 @@ where unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY), current_block_number: start_block.saturating_sub(1), l1_state, - sender: tx, + command_rx, + sender: notification_tx, config, metrics: WatcherMetrics::default(), is_synced: false, @@ -283,12 +291,19 @@ where tokio::spawn(watcher.run()); - rx + handle } /// Main execution loop for the [`L1Watcher`]. pub async fn run(mut self) { loop { + // Process any pending commands. + while let Ok(command) = self.command_rx.try_recv() { + if let Err(err) = self.handle_command(command) { + tracing::error!(target: "scroll::watcher", ?err, "failed to handle L1 watcher command"); + } + } + // step the watcher. if let Err(L1WatcherError::SendError(_)) = self .step() @@ -315,6 +330,24 @@ where } } + /// Handle a command sent to the L1 watcher. + fn handle_command(&mut self, command: L1WatcherCommand) -> L1WatcherResult<()> { + match command { + L1WatcherCommand::ResetToBlock { block, tx } => { + tracing::info!(target: "scroll::watcher", ?block, "resetting L1 watcher to block"); + + // reset the state. + self.current_block_number = block; + self.unfinalized_blocks.clear(); + self.is_synced = false; + + // replace the notification sender. + self.sender = tx; + } + } + Ok(()) + } + /// A step of work for the [`L1Watcher`]. pub async fn step(&mut self) -> L1WatcherResult<()> { // handle the finalized block. @@ -863,7 +896,7 @@ mod tests { transactions: Vec, finalized: Header, latest: Header, - ) -> (L1Watcher, mpsc::Receiver>) { + ) -> (L1Watcher, L1WatcherHandle) { let provider_blocks = provider_blocks.into_iter().map(|h| Block { header: h, ..Default::default() }); let finalized = Block { header: finalized, ..Default::default() }; @@ -876,20 +909,23 @@ mod tests { vec![latest], ); - let (tx, rx) = mpsc::channel(LOG_QUERY_BLOCK_RANGE as usize); + let (notification_tx, notification_rx) = mpsc::channel(LOG_QUERY_BLOCK_RANGE as usize); + let (command_tx, command_rx) = mpsc::unbounded_channel(); + let handle = L1WatcherHandle::new(command_tx, notification_rx); ( L1Watcher { execution_provider: provider, unfinalized_blocks: unfinalized_blocks.into(), l1_state: L1State { head: Default::default(), finalized: Default::default() }, current_block_number: 0, - sender: tx, + command_rx, + sender: notification_tx, config: Arc::new(NodeConfig::mainnet()), metrics: WatcherMetrics::default(), is_synced: false, log_query_block_range: LOG_QUERY_BLOCK_RANGE, }, - rx, + handle, ) } @@ -947,7 +983,7 @@ mod tests { async fn test_should_handle_finalized_with_empty_state() -> eyre::Result<()> { // Given let (finalized, latest, _) = chain(2); - let (mut watcher, _rx) = l1_watcher(vec![], vec![], vec![], finalized.clone(), latest); + let (mut watcher, _handle) = l1_watcher(vec![], vec![], vec![], finalized.clone(), latest); // When watcher.handle_finalized_block(&finalized).await?; @@ -963,7 +999,7 @@ mod tests { // Given let (_, latest, chain) = chain(10); let finalized = chain[5].clone(); - let (mut watcher, _rx) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); + let (mut watcher, _handle) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); // When watcher.handle_finalized_block(&finalized).await?; @@ -979,7 +1015,7 @@ mod tests { // Given let (_, latest, chain) = chain(10); let finalized = latest.clone(); - let (mut watcher, _rx) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); + let (mut watcher, _handle) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest); // When watcher.handle_finalized_block(&finalized).await?; @@ -1011,7 +1047,7 @@ mod tests { // Given let (finalized, latest, chain) = chain(10); let unfinalized_chain = chain[..9].to_vec(); - let (mut watcher, _rx) = + let (mut watcher, _handle) = l1_watcher(unfinalized_chain, vec![], vec![], finalized.clone(), latest.clone()); assert_eq!(watcher.unfinalized_blocks.len(), 9); @@ -1031,7 +1067,7 @@ mod tests { // Given let (finalized, latest, chain) = chain(10); let unfinalized_chain = chain[..5].to_vec(); - let (mut watcher, mut receiver) = + let (mut watcher, mut handle) = l1_watcher(unfinalized_chain, chain, vec![], finalized.clone(), latest.clone()); // When @@ -1040,7 +1076,7 @@ mod tests { // Then assert_eq!(watcher.unfinalized_blocks.len(), 10); assert_eq!(watcher.unfinalized_blocks.pop().unwrap(), latest); - let notification = receiver.recv().await.unwrap(); + let notification = handle.l1_notification_receiver().recv().await.unwrap(); assert!(matches!(*notification, L1Notification::NewBlock(_))); Ok(()) @@ -1052,7 +1088,7 @@ mod tests { let (finalized, _, chain) = chain(10); let reorged = chain_from(&chain[5], 10); let latest = reorged[9].clone(); - let (mut watcher, mut receiver) = + let (mut watcher, mut handle) = l1_watcher(chain.clone(), reorged, vec![], finalized.clone(), latest.clone()); // When @@ -1063,9 +1099,9 @@ mod tests { assert_eq!(watcher.unfinalized_blocks.pop().unwrap(), latest); assert_eq!(watcher.current_block_number, chain[5].number); - let notification = receiver.recv().await.unwrap(); + let notification = handle.l1_notification_receiver().recv().await.unwrap(); assert!(matches!(*notification, L1Notification::Reorg(_))); - let notification = receiver.recv().await.unwrap(); + let notification = handle.l1_notification_receiver().recv().await.unwrap(); assert!(matches!(*notification, L1Notification::NewBlock(_))); Ok(()) diff --git a/crates/watcher/tests/indexing.rs b/crates/watcher/tests/indexing.rs index c1fdc040..7c1c48a1 100644 --- a/crates/watcher/tests/indexing.rs +++ b/crates/watcher/tests/indexing.rs @@ -72,7 +72,7 @@ async fn test_should_not_index_latest_block_multiple_times() -> eyre::Result<()> loop { select! { - notification = l1_watcher.recv() => { + notification = l1_watcher.l1_notification_receiver().recv() => { let notification = notification.map(|notif| (*notif).clone()); if let Some(L1Notification::L1Message { block_info, .. }) = notification { assert_ne!(prev_block_info, block_info, "indexed same block twice {block_info}"); diff --git a/crates/watcher/tests/logs.rs b/crates/watcher/tests/logs.rs index 0e689e4d..745b1634 100644 --- a/crates/watcher/tests/logs.rs +++ b/crates/watcher/tests/logs.rs @@ -73,7 +73,8 @@ async fn test_should_not_miss_logs_on_reorg() -> eyre::Result<()> { .await; let mut received_logs = Vec::new(); loop { - let notification = l1_watcher.recv().await.map(|notif| (*notif).clone()); + let notification = + l1_watcher.l1_notification_receiver().recv().await.map(|notif| (*notif).clone()); if let Some(L1Notification::L1Message { block_timestamp, message, .. }) = notification { received_logs.push(message); if block_timestamp == last_log.block_timestamp.unwrap() { diff --git a/crates/watcher/tests/reorg.rs b/crates/watcher/tests/reorg.rs index 6c81b9c8..48ace7e4 100644 --- a/crates/watcher/tests/reorg.rs +++ b/crates/watcher/tests/reorg.rs @@ -81,8 +81,8 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { .await; // skip the first two events - l1_watcher.recv().await.unwrap(); - l1_watcher.recv().await.unwrap(); + l1_watcher.l1_notification_receiver().recv().await.unwrap(); + l1_watcher.l1_notification_receiver().recv().await.unwrap(); let mut latest_number = latest_blocks.first().unwrap().header.number; let mut finalized_number = finalized_blocks.first().unwrap().header.number; @@ -90,10 +90,10 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) { // check finalized first. if finalized_number < finalized.header.number { - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); } assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); } @@ -102,23 +102,23 @@ async fn test_should_detect_reorg() -> eyre::Result<()> { continue; } - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); } // skip the `L1Notification::Synced` notifications if matches!(notification.as_ref(), L1Notification::Synced) { - notification = l1_watcher.recv().await.unwrap(); + notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); } // check latest for reorg or new block. if latest_number > latest.header.number { // reorg assert!(matches!(notification.as_ref(), L1Notification::Reorg(_))); - let notification = l1_watcher.recv().await.unwrap(); + let notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); } else { assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); @@ -188,8 +188,8 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { .await; // skip the first two events - l1_watcher.recv().await.unwrap(); - l1_watcher.recv().await.unwrap(); + l1_watcher.l1_notification_receiver().recv().await.unwrap(); + l1_watcher.l1_notification_receiver().recv().await.unwrap(); let mut latest_number = latest_blocks.first().unwrap().header.number; let mut finalized_number = finalized_blocks.first().unwrap().header.number; @@ -197,10 +197,10 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { for (latest, finalized) in latest_blocks[1..].iter().zip(finalized_blocks[1..].iter()) { // check finalized first. if finalized_number < finalized.header.number { - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); } assert_eq!(notification.as_ref(), &L1Notification::Finalized(finalized.header.number)); } @@ -209,16 +209,16 @@ async fn test_should_fetch_gap_in_unfinalized_blocks() -> eyre::Result<()> { continue; } - let mut notification = l1_watcher.recv().await.unwrap(); + let mut notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); // skip the `L1Notification::Processed` notifications if matches!(notification.as_ref(), L1Notification::Processed(_)) { - notification = l1_watcher.recv().await.unwrap(); + notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); } // skip the `L1Notification::Synced` notifications if matches!(notification.as_ref(), L1Notification::Synced) { - notification = l1_watcher.recv().await.unwrap(); + notification = l1_watcher.l1_notification_receiver().recv().await.unwrap(); } assert_eq!(notification.as_ref(), &L1Notification::NewBlock((&latest.header).into())); From 662ed176885a7fffd453ef0e98d18f355c820d93 Mon Sep 17 00:00:00 2001 From: frisitano Date: Tue, 23 Dec 2025 18:56:15 +0000 Subject: [PATCH 2/5] fix tests --- crates/chain-orchestrator/src/lib.rs | 2 -- crates/database/db/src/operations.rs | 1 - crates/node/src/test_utils/l1_helpers.rs | 16 +--------------- crates/node/tests/e2e.rs | 7 +++---- 4 files changed, 4 insertions(+), 22 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 912181db..8d055617 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -405,8 +405,6 @@ impl< self.sync_state.l1_mut().set_syncing(); let unwind_result = self.database.unwind(block_number).await?; - println!("Unwind result: {:?}", unwind_result); - // Check if the unwind impacts the fcs safe head. if let Some(block_info) = unwind_result.l2_safe_block_info { // If the safe head was unwound and is above or equal to the finalized head, diff --git a/crates/database/db/src/operations.rs b/crates/database/db/src/operations.rs index f28887bd..0d6cfa21 100644 --- a/crates/database/db/src/operations.rs +++ b/crates/database/db/src/operations.rs @@ -852,7 +852,6 @@ impl DatabaseWriteOperations for T { // delete batch commits, l1 messages and batch finalization effects greater than the // provided l1 block number let batches_removed = self.delete_batches_gt_block_number(l1_block_number).await?; - println!("Deleted {} batches", batches_removed); let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?; self.delete_batch_finalization_gt_block_number(l1_block_number).await?; let batch_reverts_removed: u64 = diff --git a/crates/node/src/test_utils/l1_helpers.rs b/crates/node/src/test_utils/l1_helpers.rs index 65e62c9e..46d2d143 100644 --- a/crates/node/src/test_utils/l1_helpers.rs +++ b/crates/node/src/test_utils/l1_helpers.rs @@ -248,7 +248,6 @@ pub struct BatchCommitBuilder<'a> { block_info: BlockInfo, hash: B256, index: u64, - block_number: u64, block_timestamp: u64, calldata: Option, calldata_path: Option, @@ -262,7 +261,6 @@ impl<'a> BatchCommitBuilder<'a> { block_info: BlockInfo { number: 0, hash: B256::random() }, hash: B256::random(), index: 0, - block_number: 0, block_timestamp: 0, calldata: None, calldata_path: None, @@ -276,12 +274,6 @@ impl<'a> BatchCommitBuilder<'a> { self } - /// Set the L1 block number for this batch commit. - pub const fn at_block_number(mut self, block_number: u64) -> Self { - self.block_info.number = block_number; - self - } - /// Set the batch hash. pub const fn hash(mut self, hash: B256) -> Self { self.hash = hash; @@ -294,12 +286,6 @@ impl<'a> BatchCommitBuilder<'a> { self } - /// Set the batch block number. - pub const fn block_number(mut self, block_number: u64) -> Self { - self.block_number = block_number; - self - } - /// Set the batch block timestamp. pub const fn block_timestamp(mut self, timestamp: u64) -> Self { self.block_timestamp = timestamp; @@ -337,7 +323,7 @@ impl<'a> BatchCommitBuilder<'a> { let batch_data = BatchCommitData { hash: self.hash, index: self.index, - block_number: self.block_number, + block_number: self.block_info.number, block_timestamp: self.block_timestamp, calldata: Arc::new(raw_calldata), blob_versioned_hash: self.blob_versioned_hash, diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 303f93b6..6d8439cf 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -765,8 +765,6 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() // Request an event stream from the rollup node manager. let mut rnm_events = handle.get_event_listener().await?; - println!("im here"); - // Send the second batch again to mimic the watcher behaviour. let block_1_info = BlockInfo { number: 18318215, hash: B256::random() }; l1_notification_tx @@ -1052,6 +1050,7 @@ async fn can_revert_to_l1_block() -> eyre::Result<()> { // Validate the safe block number is 57 assert_eq!(status.l2.fcs.safe_block_info().number, 57); + assert_eq!(status.l1.status, SyncMode::Synced); Ok(()) } @@ -1185,7 +1184,7 @@ async fn can_handle_batch_revert_with_reorg() -> eyre::Result<()> { .at_block(batch_0_block_info) .hash(batch_0_hash) .index(1) - .block_number(18318207) + .at_block(batch_0_block_info) .block_timestamp(1696935971) .calldata(raw_calldata_0) .send() @@ -1201,7 +1200,7 @@ async fn can_handle_batch_revert_with_reorg() -> eyre::Result<()> { .at_block(batch_1_block_info) .hash(batch_1_hash) .index(2) - .block_number(18318215) + .at_block(batch_1_block_info) .block_timestamp(1696936000) .calldata(raw_calldata_1) .send() From ab3acb79d8fc2bbbae0d6adea4a67f03fb4f62be Mon Sep 17 00:00:00 2001 From: frisitano Date: Wed, 24 Dec 2025 14:09:15 +0000 Subject: [PATCH 3/5] fix features --- crates/chain-orchestrator/src/handle/mod.rs | 21 ++++- crates/node/src/add_ons/handle.rs | 5 -- crates/node/src/add_ons/mod.rs | 9 +-- crates/node/src/add_ons/rollup.rs | 8 +- crates/node/src/args.rs | 86 +++++++++++---------- crates/node/src/test_utils/fixture.rs | 14 +--- crates/node/src/test_utils/l1_helpers.rs | 16 +--- crates/node/tests/e2e.rs | 30 ++++--- crates/node/tests/sync.rs | 6 +- crates/sequencer/tests/e2e.rs | 6 +- crates/watcher/src/test_utils/mod.rs | 16 +++- 11 files changed, 121 insertions(+), 96 deletions(-) diff --git a/crates/chain-orchestrator/src/handle/mod.rs b/crates/chain-orchestrator/src/handle/mod.rs index ea8477a9..76737777 100644 --- a/crates/chain-orchestrator/src/handle/mod.rs +++ b/crates/chain-orchestrator/src/handle/mod.rs @@ -22,13 +22,32 @@ use metrics::ChainOrchestratorHandleMetrics; pub struct ChainOrchestratorHandle> { /// The channel used to send commands to the rollup manager. to_manager_tx: mpsc::UnboundedSender>, + /// The metrics for the handle. handle_metrics: ChainOrchestratorHandleMetrics, + /// Mock for the L1 Watcher used in tests. + #[cfg(feature = "test-utils")] + pub l1_watcher_mock: Option, } impl> ChainOrchestratorHandle { /// Create a new rollup manager handle. pub fn new(to_manager_tx: mpsc::UnboundedSender>) -> Self { - Self { to_manager_tx, handle_metrics: ChainOrchestratorHandleMetrics::default() } + Self { + to_manager_tx, + handle_metrics: ChainOrchestratorHandleMetrics::default(), + #[cfg(feature = "test-utils")] + l1_watcher_mock: None, + } + } + + /// Sets the L1 watcher mock for the handle. + #[cfg(feature = "test-utils")] + pub fn with_l1_watcher_mock( + mut self, + l1_watcher_mock: Option, + ) -> Self { + self.l1_watcher_mock = l1_watcher_mock; + self } /// Sends a command to the rollup manager. diff --git a/crates/node/src/add_ons/handle.rs b/crates/node/src/add_ons/handle.rs index 6baa7018..a762df35 100644 --- a/crates/node/src/add_ons/handle.rs +++ b/crates/node/src/add_ons/handle.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "test-utils")] -use crate::test_utils::l1_helpers::L1WatcherMock; use reth_network_api::FullNetwork; use reth_node_api::FullNodeComponents; use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider}; @@ -17,9 +15,6 @@ pub struct ScrollAddOnsHandle< pub rollup_manager_handle: ChainOrchestratorHandle, /// The handle used to send commands to the RPC server. pub rpc_handle: RpcHandle, - /// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`. - #[cfg(feature = "test-utils")] - pub l1_watcher_tx: Option, } impl< diff --git a/crates/node/src/add_ons/mod.rs b/crates/node/src/add_ons/mod.rs index 12535fe0..509e1536 100644 --- a/crates/node/src/add_ons/mod.rs +++ b/crates/node/src/add_ons/mod.rs @@ -152,7 +152,7 @@ where }); let rpc_handle = rpc_add_ons.launch_add_ons_with(ctx.clone(), |_| Ok(())).await?; - let (rollup_manager_handle, l1_watcher_tx) = + let rollup_manager_handle = rollup_node_manager_addon.launch(ctx.clone(), rpc_handle.clone()).await?; // Only send handle if RPC is enabled @@ -161,12 +161,7 @@ where .map_err(|_| eyre::eyre!("failed to send rollup manager handle"))?; } - Ok(ScrollAddOnsHandle { - rollup_manager_handle, - rpc_handle, - #[cfg(feature = "test-utils")] - l1_watcher_tx, - }) + Ok(ScrollAddOnsHandle { rollup_manager_handle, rpc_handle }) } } diff --git a/crates/node/src/add_ons/rollup.rs b/crates/node/src/add_ons/rollup.rs index b6a37989..5c76d6ff 100644 --- a/crates/node/src/add_ons/rollup.rs +++ b/crates/node/src/add_ons/rollup.rs @@ -1,4 +1,4 @@ -use crate::{args::ScrollRollupNodeConfig, test_utils::l1_helpers::L1WatcherMock}; +use crate::args::ScrollRollupNodeConfig; use reth_chainspec::NamedChain; use reth_network::NetworkProtocols; @@ -53,13 +53,13 @@ impl RollupManagerAddOn { self, ctx: AddOnsContext<'_, N>, rpc: RpcHandle, - ) -> eyre::Result<(ChainOrchestratorHandle, Option)> + ) -> eyre::Result> where <::Types as NodeTypes>::ChainSpec: ChainConfig + ScrollHardforks + IsDevChain, N::Network: NetworkProtocols + FullNetwork, { - let (chain_orchestrator, handle, l1_watcher_mock) = self + let (chain_orchestrator, handle) = self .config .build((&ctx).into(), self.scroll_wire_event, rpc.rpc_server_handles) .await?; @@ -68,6 +68,6 @@ impl RollupManagerAddOn { .spawn_critical_with_shutdown_signal("rollup_node_manager", |shutdown| { chain_orchestrator.run_until_shutdown(shutdown) }); - Ok((handle, l1_watcher_mock)) + Ok(handle) } } diff --git a/crates/node/src/args.rs b/crates/node/src/args.rs index a5536ff6..4ceceb5a 100644 --- a/crates/node/src/args.rs +++ b/crates/node/src/args.rs @@ -6,7 +6,6 @@ use crate::{ use scroll_migration::MigratorTrait; use std::{fs, path::PathBuf, sync::Arc}; -use super::test_utils::l1_helpers::L1WatcherMock; use alloy_chains::NamedChain; use alloy_primitives::{hex, Address, U128}; use alloy_provider::{layers::CacheLayer, Provider, ProviderBuilder}; @@ -167,7 +166,6 @@ impl ScrollRollupNodeConfig { impl ScrollEngineApi, >, ChainOrchestratorHandle, - Option, )> where N: FullNetwork + NetworkProtocols, @@ -351,42 +349,49 @@ impl ScrollRollupNodeConfig { }; let consensus = self.consensus_args.consensus(authorized_signer)?; - let (l1_watcher_mock, l1_watcher_handle) = if let Some(provider) = - l1_provider.filter(|_| !self.test) - { - tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher"); - ( - None, - Some( - L1Watcher::spawn( - provider, - l1_block_startup_info, - node_config, - self.l1_provider_args.logs_query_block_range, - ) - .await, - ), - ) - } else { - // Create a channel for L1 notifications that we can use to inject L1 messages for - // testing - #[cfg(feature = "test-utils")] - { - let (notification_tx, notification_rx) = tokio::sync::mpsc::channel(1000); - let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel(); - let handle = L1WatcherHandle::new(command_tx, notification_rx); - let watcher_mock = L1WatcherMock { - command_rx: Arc::new(tokio::sync::Mutex::new(command_rx)), - notification_tx, - }; - (Some(watcher_mock), Some(handle)) - } - - #[cfg(not(feature = "test-utils"))] - { - (None, None) - } - }; + // Define some types to support definitions of return type of following function in no_std. + #[cfg(feature = "test-utils")] + type L1WatcherMockOpt = Option; + + #[cfg(not(feature = "test-utils"))] + type L1WatcherMockOpt = Option; + + let (_l1_watcher_mock, l1_watcher_handle): (L1WatcherMockOpt, Option) = + if let Some(provider) = l1_provider.filter(|_| !self.test) { + tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher"); + ( + None, + Some( + L1Watcher::spawn( + provider, + l1_block_startup_info, + node_config, + self.l1_provider_args.logs_query_block_range, + ) + .await, + ), + ) + } else { + // Create a channel for L1 notifications that we can use to inject L1 messages for + // testing + #[cfg(feature = "test-utils")] + { + let (notification_tx, notification_rx) = tokio::sync::mpsc::channel(1000); + let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = + rollup_node_watcher::L1WatcherHandle::new(command_tx, notification_rx); + let watcher_mock = rollup_node_watcher::test_utils::L1WatcherMock { + command_rx: Arc::new(tokio::sync::Mutex::new(command_rx)), + notification_tx, + }; + (Some(watcher_mock), Some(handle)) + } + + #[cfg(not(feature = "test-utils"))] + { + (None, None) + } + }; // Construct the l1 provider. let l1_messages_provider = db.clone(); @@ -475,7 +480,10 @@ impl ScrollRollupNodeConfig { ) .await?; - Ok((chain_orchestrator, handle, l1_watcher_mock)) + #[cfg(feature = "test-utils")] + let handle = handle.with_l1_watcher_mock(_l1_watcher_mock); + + Ok((chain_orchestrator, handle)) } } diff --git a/crates/node/src/test_utils/fixture.rs b/crates/node/src/test_utils/fixture.rs index b0946258..137875bf 100644 --- a/crates/node/src/test_utils/fixture.rs +++ b/crates/node/src/test_utils/fixture.rs @@ -4,10 +4,9 @@ use super::{ block_builder::BlockBuilder, l1_helpers::L1Helper, setup_engine, tx_helpers::TxHelper, }; use crate::{ - test_utils::l1_helpers::L1WatcherMock, BlobProviderArgs, ChainOrchestratorArgs, - ConsensusAlgorithm, ConsensusArgs, EngineDriverArgs, L1ProviderArgs, RollupNodeDatabaseArgs, - RollupNodeGasPriceOracleArgs, RollupNodeNetworkArgs, RpcArgs, ScrollRollupNode, - ScrollRollupNodeConfig, SequencerArgs, SignerArgs, + BlobProviderArgs, ChainOrchestratorArgs, ConsensusAlgorithm, ConsensusArgs, EngineDriverArgs, + L1ProviderArgs, RollupNodeDatabaseArgs, RollupNodeGasPriceOracleArgs, RollupNodeNetworkArgs, + RpcArgs, ScrollRollupNode, ScrollRollupNodeConfig, SequencerArgs, SignerArgs, }; use alloy_eips::BlockNumberOrTag; @@ -75,8 +74,6 @@ pub struct NodeHandle { pub node: NodeHelperType, /// Engine instance for this node. pub engine: Engine>, - /// L1 watcher notification channel. - pub l1_watcher_tx: Option, /// Chain orchestrator listener. pub chain_orchestrator_rx: EventStream, /// Chain orchestrator handle. @@ -102,7 +99,6 @@ impl Debug for NodeHandle { f.debug_struct("NodeHandle") .field("node", &"NodeHelper") .field("engine", &"Box") - .field("l1_watcher_tx", &self.l1_watcher_tx) .field("rollup_manager_handle", &self.rollup_manager_handle) .finish() } @@ -436,7 +432,7 @@ impl TestFixtureBuilder { .await?; let mut node_handles = Vec::with_capacity(nodes.len()); - for (index, mut node) in nodes.into_iter().enumerate() { + for (index, node) in nodes.into_iter().enumerate() { let genesis_hash = node.inner.chain_spec().genesis_hash(); // Create engine for the node @@ -451,7 +447,6 @@ impl TestFixtureBuilder { let engine = Engine::new(Arc::new(engine_client), fcs); // Get handles if available - let l1_watcher_tx = node.inner.add_ons_handle.l1_watcher_tx.take(); let rollup_manager_handle = node.inner.add_ons_handle.rollup_manager_handle.clone(); let chain_orchestrator_rx = node.inner.add_ons_handle.rollup_manager_handle.get_event_listener().await?; @@ -460,7 +455,6 @@ impl TestFixtureBuilder { node, engine, chain_orchestrator_rx, - l1_watcher_tx, rollup_manager_handle, typ: if config.sequencer_args.sequencer_enabled && index == 0 { NodeType::Sequencer diff --git a/crates/node/src/test_utils/l1_helpers.rs b/crates/node/src/test_utils/l1_helpers.rs index 46d2d143..5e5b0d9d 100644 --- a/crates/node/src/test_utils/l1_helpers.rs +++ b/crates/node/src/test_utils/l1_helpers.rs @@ -5,18 +5,8 @@ use std::{fmt::Debug, str::FromStr, sync::Arc}; use alloy_primitives::{Address, Bytes, B256, U256}; use rollup_node_primitives::{BatchCommitData, BlockInfo, ConsensusUpdate}; -use rollup_node_watcher::{L1Notification, L1WatcherCommand}; +use rollup_node_watcher::L1Notification; use scroll_alloy_consensus::TxL1Message; -use tokio::sync::{mpsc, Mutex}; - -/// Mock for the L1 Watcher. -#[derive(Clone, Debug)] -pub struct L1WatcherMock { - /// Receiver for L1 watcher commands. - pub command_rx: Arc>>, - /// Sender for L1 notifications. - pub notification_tx: mpsc::Sender>, -} /// Helper for managing L1 interactions in tests. #[derive(Debug)] @@ -126,7 +116,7 @@ impl<'a> L1Helper<'a> { }; for node in nodes { - if let Some(tx) = &node.l1_watcher_tx { + if let Some(tx) = &node.rollup_manager_handle.l1_watcher_mock { tx.notification_tx.send(notification.clone()).await?; } } @@ -232,7 +222,7 @@ impl<'a> L1MessageBuilder<'a> { }; for node in nodes { - if let Some(tx) = &node.l1_watcher_tx { + if let Some(tx) = &node.rollup_manager_handle.l1_watcher_mock { tx.notification_tx.send(notification.clone()).await?; } } diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 6d8439cf..37b75ef4 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -322,9 +322,14 @@ async fn can_forward_tx_to_sequencer() -> eyre::Result<()> { .unwrap(); // Send a notification to set the L1 to synced - let sequencer_l1_watcher_tx = - sequencer_node[0].inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); - sequencer_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); + let sequencer_l1_watcher_mock = sequencer_node[0] + .inner + .add_ons_handle + .rollup_manager_handle + .l1_watcher_mock + .clone() + .unwrap(); + sequencer_l1_watcher_mock.notification_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); sequencer_events.next().await; sequencer_events.next().await; @@ -463,7 +468,8 @@ async fn can_bridge_blocks() -> eyre::Result<()> { .await?; let mut bridge_node = nodes.pop().unwrap(); let bridge_peer_id = bridge_node.network.record().id; - let bridge_node_l1_watcher_tx = bridge_node.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + let bridge_node_l1_watcher_tx = + bridge_node.inner.add_ons_handle.rollup_manager_handle.l1_watcher_mock.clone().unwrap(); // Send a notification to set the L1 to synced bridge_node_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await.unwrap(); @@ -573,7 +579,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() config.hydrate(node.inner.config.clone()).await?; let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true)); - let (chain_orchestrator, handle, l1_notification_tx) = config + let (chain_orchestrator, handle) = config .clone() .build( RollupNodeContext::new( @@ -605,7 +611,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() let mut rnm_events = handle.get_event_listener().await?; // Extract the L1 notification sender - let l1_notification_tx = l1_notification_tx.unwrap(); + let l1_notification_tx = handle.l1_watcher_mock.unwrap(); // Load test batches let block_0_info = BlockInfo { number: 18318207, hash: B256::random() }; @@ -733,7 +739,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() // Start the RNM again. let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true)); - let (chain_orchestrator, handle, l1_notification_tx) = config + let (chain_orchestrator, handle) = config .clone() .build( RollupNodeContext::new( @@ -747,7 +753,7 @@ async fn shutdown_consolidates_most_recent_batch_on_startup() -> eyre::Result<() node.inner.add_ons_handle.rpc_handle.rpc_server_handles.clone(), ) .await?; - let l1_notification_tx = l1_notification_tx.unwrap(); + let l1_notification_tx = handle.l1_watcher_mock.clone().unwrap(); // Spawn a task that constantly polls the rnm to make progress. let (_signal, shutdown) = shutdown_signal(); @@ -851,7 +857,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - config.hydrate(node.inner.config.clone()).await?; let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true)); - let (rnm, handle, l1_watcher_tx) = config + let (rnm, handle) = config .clone() .build( RollupNodeContext::new( @@ -867,7 +873,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - .await?; let (_signal, shutdown) = shutdown_signal(); let mut rnm = Box::pin(rnm.run_until_shutdown(shutdown)); - let l1_watcher_tx = l1_watcher_tx.unwrap(); + let l1_watcher_mock = handle.l1_watcher_mock.clone().unwrap(); // Poll the rnm until we get an event stream listener. let mut rnm_events_fut = pin!(handle.get_event_listener()); @@ -882,7 +888,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - }; // Poll the rnm until we receive the consolidate event - l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await?; + l1_watcher_mock.notification_tx.send(Arc::new(L1Notification::Synced)).await?; loop { let _ = rnm.poll_unpin(&mut Context::from_waker(noop_waker_ref())); if let Poll::Ready(Some(ChainOrchestratorEvent::ChainConsolidated { from: _, to: _ })) = @@ -925,7 +931,7 @@ async fn graceful_shutdown_sets_fcs_to_latest_signed_block_in_db_on_start_up() - // Start the RNM again. let (_, events) = ScrollWireProtocolHandler::new(ScrollWireConfig::new(true)); - let (rnm, handle, _) = config + let (rnm, handle) = config .clone() .build( RollupNodeContext::new( diff --git a/crates/node/tests/sync.rs b/crates/node/tests/sync.rs index b7dc1b66..ee6f687d 100644 --- a/crates/node/tests/sync.rs +++ b/crates/node/tests/sync.rs @@ -557,13 +557,15 @@ async fn test_chain_orchestrator_l1_reorg() -> eyre::Result<()> { let mut sequencer = nodes.pop().unwrap(); let sequencer_handle = sequencer.inner.rollup_manager_handle.clone(); let mut sequencer_events = sequencer_handle.get_event_listener().await?; - let sequencer_l1_watcher_tx = sequencer.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + let sequencer_l1_watcher_tx = + sequencer.inner.add_ons_handle.rollup_manager_handle.l1_watcher_mock.clone().unwrap(); let (mut nodes, _tasks, _) = setup_engine(node_config.clone(), 1, chain_spec.clone(), false, false).await.unwrap(); let mut follower = nodes.pop().unwrap(); let mut follower_events = follower.inner.rollup_manager_handle.get_event_listener().await?; - let follower_l1_watcher_tx = follower.inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + let follower_l1_watcher_tx = + follower.inner.add_ons_handle.rollup_manager_handle.l1_watcher_mock.clone().unwrap(); // Connect the nodes together. sequencer.connect(&mut follower).await; diff --git a/crates/sequencer/tests/e2e.rs b/crates/sequencer/tests/e2e.rs index b98e5e64..1451293e 100644 --- a/crates/sequencer/tests/e2e.rs +++ b/crates/sequencer/tests/e2e.rs @@ -516,7 +516,8 @@ async fn can_sequence_blocks_with_private_key_file() -> eyre::Result<()> { let sequencer_rnm_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); let mut sequencer_events = sequencer_rnm_handle.get_event_listener().await?; - let sequencer_l1_watcher_tx = nodes[0].inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + let sequencer_l1_watcher_tx = + nodes[0].inner.add_ons_handle.rollup_manager_handle.l1_watcher_mock.clone().unwrap(); // Send a notification to set the L1 to synced sequencer_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await?; @@ -616,7 +617,8 @@ async fn can_sequence_blocks_with_hex_key_file_without_prefix() -> eyre::Result< let sequencer_rnm_handle = nodes[0].inner.add_ons_handle.rollup_manager_handle.clone(); let mut sequencer_events = sequencer_rnm_handle.get_event_listener().await?; - let sequencer_l1_watcher_tx = nodes[0].inner.add_ons_handle.l1_watcher_tx.clone().unwrap(); + let sequencer_l1_watcher_tx = + nodes[0].inner.add_ons_handle.rollup_manager_handle.l1_watcher_mock.clone().unwrap(); // Send a notification to set the L1 to synced sequencer_l1_watcher_tx.notification_tx.send(Arc::new(L1Notification::Synced)).await?; diff --git a/crates/watcher/src/test_utils/mod.rs b/crates/watcher/src/test_utils/mod.rs index d95b54e4..468c4908 100644 --- a/crates/watcher/src/test_utils/mod.rs +++ b/crates/watcher/src/test_utils/mod.rs @@ -1,5 +1,7 @@ -use crate::{random, Header}; +use crate::{random, Header, L1Notification, L1WatcherCommand}; use arbitrary::Arbitrary; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; /// Test utils for arbitrary. pub mod arbitrary; @@ -7,6 +9,18 @@ pub mod arbitrary; /// Test utils for provider. pub mod provider; +/// Mock for the L1 Watcher. +/// +/// This allows tests to simulate L1 watcher behavior by sending commands and receiving +/// notifications. +#[derive(Clone, Debug)] +pub struct L1WatcherMock { + /// Receiver for L1 watcher commands. + pub command_rx: Arc>>, + /// Sender for L1 notifications. + pub notification_tx: mpsc::Sender>, +} + /// Returns a chain of random headers of size `len`. pub fn chain(len: usize) -> (Header, Header, Vec
) { assert!(len >= 2, "chain should have a minimal length of two"); From 5060dabd4189b4d2f545afb5241a5fcd7f19a075 Mon Sep 17 00:00:00 2001 From: frisitano Date: Wed, 24 Dec 2025 15:02:42 +0000 Subject: [PATCH 4/5] fixes --- crates/chain-orchestrator/src/lib.rs | 4 ++++ crates/node/tests/e2e.rs | 10 ++++++++++ crates/watcher/src/handle/mod.rs | 6 ++++-- crates/watcher/src/lib.rs | 1 - crates/watcher/src/test_utils/mod.rs | 16 ++++++++++++++++ 5 files changed, 34 insertions(+), 3 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 8d055617..98f78082 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -415,6 +415,10 @@ impl< self.engine.update_fcs(None, Some(block_info), None).await?; } } + + // Revert the L1 watcher to the specified block. + self.l1_watcher.revert_to_l1_block(block_number); + self.notify(ChainOrchestratorEvent::UnwoundToL1Block(block_number)); let _ = tx.send(true); } diff --git a/crates/node/tests/e2e.rs b/crates/node/tests/e2e.rs index 37b75ef4..9132598a 100644 --- a/crates/node/tests/e2e.rs +++ b/crates/node/tests/e2e.rs @@ -1025,6 +1025,16 @@ async fn can_revert_to_l1_block() -> eyre::Result<()> { // Wait for the chain to be unwound fixture.expect_event().revert_to_l1_block().await?; + // Now have the L1 watcher mock handle the command to rewind the L1 head. + fixture + .follower(0) + .rollup_manager_handle + .l1_watcher_mock + .as_mut() + .unwrap() + .handle_command() + .await; + // Get the node status let status = fixture.get_status(0).await?; diff --git a/crates/watcher/src/handle/mod.rs b/crates/watcher/src/handle/mod.rs index 6b6669e1..9e0045be 100644 --- a/crates/watcher/src/handle/mod.rs +++ b/crates/watcher/src/handle/mod.rs @@ -34,13 +34,15 @@ impl L1WatcherHandle { } /// Reset the L1 Watcher to a specific block number with a fresh notification channel. - pub fn revert_to_l1_block(&self, block: u64) -> mpsc::Receiver> { + pub fn revert_to_l1_block(&mut self, block: u64) { // Create a fresh notification channel with the same capacity as the original channel let capacity = self.l1_notification_rx.max_capacity(); let (tx, rx) = mpsc::channel(capacity); + // Send the reset command to the watcher self.send_command(L1WatcherCommand::ResetToBlock { block, tx }); - rx + // Replace the old receiver with the new one + self.l1_notification_rx = rx; } } diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index c7c814c4..717f6ad0 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -338,7 +338,6 @@ where // reset the state. self.current_block_number = block; - self.unfinalized_blocks.clear(); self.is_synced = false; // replace the notification sender. diff --git a/crates/watcher/src/test_utils/mod.rs b/crates/watcher/src/test_utils/mod.rs index 468c4908..752b393a 100644 --- a/crates/watcher/src/test_utils/mod.rs +++ b/crates/watcher/src/test_utils/mod.rs @@ -21,6 +21,22 @@ pub struct L1WatcherMock { pub notification_tx: mpsc::Sender>, } +impl L1WatcherMock { + /// Handle commands sent to the L1 watcher mock. + pub async fn handle_command(&mut self) { + let mut commands = self.command_rx.lock().await; + if let Some(command) = commands.recv().await { + match command { + L1WatcherCommand::ResetToBlock { block, tx } => { + // For testing purposes, we can just log the reset action. + tracing::info!(target: "scroll::watcher::test_utils", "L1 Watcher Mock resetting to block {}", block); + self.notification_tx = tx; + } + } + } + } +} + /// Returns a chain of random headers of size `len`. pub fn chain(len: usize) -> (Header, Header, Vec
) { assert!(len >= 2, "chain should have a minimal length of two"); From d15b5337db50a9570eaa93a3ca62b0600c163a4e Mon Sep 17 00:00:00 2001 From: frisitano Date: Wed, 24 Dec 2025 15:09:04 +0000 Subject: [PATCH 5/5] update fcu logic --- crates/chain-orchestrator/src/lib.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 98f78082..1ecbfd6b 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -407,12 +407,15 @@ impl< // Check if the unwind impacts the fcs safe head. if let Some(block_info) = unwind_result.l2_safe_block_info { - // If the safe head was unwound and is above or equal to the finalized head, - // update the fcs. - if block_info.number != self.engine.fcs().safe_block_info().number && - block_info.number >= self.engine.fcs().finalized_block_info().number - { + // If the new safe head is above the current finalized head, update the fcs safe + // head to the new safe head. + if block_info.number >= self.engine.fcs().finalized_block_info().number { self.engine.update_fcs(None, Some(block_info), None).await?; + } else { + // Otherwise, update the fcs safe head to the finalized head. + self.engine + .update_fcs(None, Some(*self.engine.fcs().finalized_block_info()), None) + .await?; } }