2 changes: 2 additions & 0 deletions crates/chain-orchestrator/src/event.rs
@@ -77,6 +77,8 @@ pub enum ChainOrchestratorEvent {
         /// The L2 safe block info.
         l2_safe_block_info: Option<BlockInfo>,
     },
+    /// The chain has been unwound to the specified L1 block number.
+    UnwoundToL1Block(u64),
     /// The chain orchestrator has synced to the L1 head.
     L1Synced,
     /// An L2 block has been committed returning the [`L2BlockInfoWithL1Messages`] and an
2 changes: 2 additions & 0 deletions crates/chain-orchestrator/src/handle/command.rs
@@ -27,6 +27,8 @@ pub enum ChainOrchestratorCommand<N: FullNetwork<Primitives = ScrollNetworkPrimi
     DisableAutomaticSequencing(oneshot::Sender<bool>),
     /// Send a database query to the rollup manager.
     DatabaseQuery(DatabaseQuery),
+    /// Revert the rollup node state to the specified L1 block number.
+    RevertToL1Block((u64, oneshot::Sender<bool>)),
     /// Enable gossiping of blocks to peers.
     #[cfg(feature = "test-utils")]
     SetGossip((bool, oneshot::Sender<()>)),
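The `(u64, oneshot::Sender<bool>)` payload follows the same request/acknowledge shape as the surrounding commands. A minimal, self-contained sketch of that round trip (the `Command` enum here is a reduced stand-in for illustration, not the real type):

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in for `ChainOrchestratorCommand`, reduced to the new variant, to show
// the request/acknowledge round trip in isolation.
enum Command {
    RevertToL1Block((u64, oneshot::Sender<bool>)),
}

async fn request_revert(commands: &mpsc::UnboundedSender<Command>, block_number: u64) -> bool {
    let (tx, rx) = oneshot::channel();
    // Hand the ack sender to the processing side alongside the target block.
    commands
        .send(Command::RevertToL1Block((block_number, tx)))
        .expect("command receiver dropped");
    // Resolves once the receiving side processes the revert and sends `true`.
    rx.await.unwrap_or(false)
}
```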
10 changes: 10 additions & 0 deletions crates/chain-orchestrator/src/handle/mod.rs
@@ -103,6 +103,16 @@ impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> ChainOrchestratorHand
         rx.await
     }
 
+    /// Revert the rollup node state to the specified L1 block number.
+    pub async fn revert_to_l1_block(
+        &self,
+        block_number: u64,
+    ) -> Result<bool, oneshot::error::RecvError> {
+        let (tx, rx) = oneshot::channel();
+        self.send_command(ChainOrchestratorCommand::RevertToL1Block((block_number, tx)));
+        rx.await
+    }
+
     /// Sends a command to the rollup manager to enable or disable gossiping of blocks to peers.
     #[cfg(feature = "test-utils")]
     pub async fn set_gossip(&self, enabled: bool) -> Result<(), oneshot::error::RecvError> {
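A sketch of how a caller might drive the new method once it holds a handle to a running orchestrator (the `unwind_to` wrapper is illustrative, not part of this PR):

```rust
use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use rollup_node_chain_orchestrator::ChainOrchestratorHandle;

// Illustrative wrapper: issue the revert and surface a failed acknowledgement
// as an error instead of a silent `false`.
async fn unwind_to<N>(handle: &ChainOrchestratorHandle<N>, l1_block: u64) -> eyre::Result<()>
where
    N: FullNetwork<Primitives = ScrollNetworkPrimitives>,
{
    let acknowledged = handle.revert_to_l1_block(l1_block).await?;
    eyre::ensure!(acknowledged, "orchestrator did not acknowledge the revert");
    Ok(())
}
```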
31 changes: 24 additions & 7 deletions crates/chain-orchestrator/src/lib.rs
@@ -20,7 +20,7 @@ use rollup_node_primitives::{
 use rollup_node_providers::L1MessageProvider;
 use rollup_node_sequencer::{Sequencer, SequencerEvent};
 use rollup_node_signer::{SignatureAsBytes, SignerEvent, SignerHandle};
-use rollup_node_watcher::L1Notification;
+use rollup_node_watcher::{L1Notification, L1WatcherHandle};
 use scroll_alloy_consensus::{ScrollTxEnvelope, TxL1Message};
 use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_alloy_network::Scroll;
@@ -35,7 +35,7 @@ use scroll_network::{
     BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent,
 };
 use std::{collections::VecDeque, sync::Arc, time::Instant, vec};
-use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver};
+use tokio::sync::mpsc::{self, UnboundedReceiver};
 
 mod config;
 pub use config::ChainOrchestratorConfig;
@@ -115,8 +115,8 @@ pub struct ChainOrchestrator<
     database: Arc<Database>,
     /// The current sync state of the [`ChainOrchestrator`].
     sync_state: SyncState,
-    /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`].
-    l1_notification_rx: Receiver<Arc<L1Notification>>,
+    /// A handle for the [`rollup_node_watcher::L1Watcher`].
+    l1_watcher: L1WatcherHandle,
     /// The network manager that manages the scroll p2p network.
     network: ScrollNetwork<N>,
     /// The consensus algorithm used by the rollup node.
@@ -150,7 +150,7 @@ impl<
         config: ChainOrchestratorConfig<ChainSpec>,
         block_client: Arc<FullBlockClient<<N as BlockDownloaderProvider>::Client>>,
         l2_provider: L2P,
-        l1_notification_rx: Receiver<Arc<L1Notification>>,
+        l1_watcher: L1WatcherHandle,
         network: ScrollNetwork<N>,
         consensus: Box<dyn Consensus + 'static>,
         engine: Engine<EC>,
@@ -167,7 +167,7 @@ impl<
             database,
             config,
             sync_state: SyncState::default(),
-            l1_notification_rx,
+            l1_watcher,
             network,
             consensus,
             engine,
@@ -224,7 +224,7 @@ impl<
                    let res = self.handle_network_event(event).await;
                    self.handle_outcome(res);
                }
-                Some(notification) = self.l1_notification_rx.recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
+                Some(notification) = self.l1_watcher.l1_notification_receiver().recv(), if self.sync_state.l2().is_synced() && self.derivation_pipeline.is_empty() => {
                    let res = self.handle_l1_notification(notification).await;
                    self.handle_outcome(res);
                }
@@ -401,6 +401,23 @@ impl<
                    let _ = sender.send(l1_message);
                }
            },
+            ChainOrchestratorCommand::RevertToL1Block((block_number, tx)) => {
+                self.sync_state.l1_mut().set_syncing();
+                let unwind_result = self.database.unwind(block_number).await?;
+
+                // Check if the unwind impacts the fcs safe head.
+                if let Some(block_info) = unwind_result.l2_safe_block_info {
+                    // If the safe head was unwound and is above or equal to the finalized head,
+                    // update the fcs.
+                    if block_info.number != self.engine.fcs().safe_block_info().number &&
+                        block_info.number >= self.engine.fcs().finalized_block_info().number
+                    {
+                        self.engine.update_fcs(None, Some(block_info), None).await?;
+                    }
+                }
+                self.notify(ChainOrchestratorEvent::UnwoundToL1Block(block_number));
+                let _ = tx.send(true);
+            }
            #[cfg(feature = "test-utils")]
            ChainOrchestratorCommand::SetGossip((enabled, tx)) => {
                self.network.handle().set_gossip(enabled).await;
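The guard before `update_fcs` encodes a small invariant: the forkchoice safe head only moves when the unwind actually changed it, and never below the finalized head. The rule in isolation, with plain block numbers standing in for `BlockInfo` (a sketch, not the production code path):

```rust
/// Returns true when the unwound safe head should be pushed to the engine.
/// `unwound_safe` is the safe head reported by the database unwind.
fn should_update_safe_head(unwound_safe: u64, current_safe: u64, finalized: u64) -> bool {
    // Skip the no-op case, and never move the safe head below the finalized
    // head, which would leave the forkchoice state inconsistent.
    unwound_safe != current_safe && unwound_safe >= finalized
}

#[test]
fn safe_head_update_rule() {
    assert!(should_update_safe_head(90, 100, 80)); // unwound between finalized and safe
    assert!(!should_update_safe_head(100, 100, 80)); // unchanged: nothing to do
    assert!(!should_update_safe_head(70, 100, 80)); // below finalized: skip
}
```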
6 changes: 3 additions & 3 deletions crates/node/src/add_ons/handle.rs
@@ -1,11 +1,11 @@
+#[cfg(feature = "test-utils")]
+use crate::test_utils::l1_helpers::L1WatcherMock;
 use reth_network_api::FullNetwork;
 use reth_node_api::FullNodeComponents;
 use reth_node_builder::rpc::{RpcHandle, RpcHandleProvider};
 use reth_rpc_eth_api::EthApiTypes;
 use reth_scroll_node::ScrollNetworkPrimitives;
 use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
-#[cfg(feature = "test-utils")]
-use {rollup_node_watcher::L1Notification, std::sync::Arc, tokio::sync::mpsc::Sender};
 
 /// A handle for scroll addons, which includes handles for the rollup manager and RPC server.
 #[derive(Debug, Clone)]
@@ -19,7 +19,7 @@ pub struct ScrollAddOnsHandle<
     pub rpc_handle: RpcHandle<Node, EthApi>,
     /// An optional channel used to send `L1Watcher` notifications to the `RollupNodeManager`.
     #[cfg(feature = "test-utils")]
-    pub l1_watcher_tx: Option<Sender<Arc<L1Notification>>>,
+    pub l1_watcher_tx: Option<L1WatcherMock>,
 }
 
 impl<
12 changes: 5 additions & 7 deletions crates/node/src/add_ons/rollup.rs
@@ -1,4 +1,4 @@
-use crate::args::ScrollRollupNodeConfig;
+use crate::{args::ScrollRollupNodeConfig, test_utils::l1_helpers::L1WatcherMock};
 
 use reth_chainspec::NamedChain;
 use reth_network::NetworkProtocols;
@@ -9,11 +9,9 @@ use reth_rpc_eth_api::EthApiTypes;
 use reth_scroll_chainspec::{ChainConfig, ScrollChainConfig, ScrollChainSpec};
 use reth_scroll_node::ScrollNetworkPrimitives;
 use rollup_node_chain_orchestrator::ChainOrchestratorHandle;
-use rollup_node_watcher::L1Notification;
 use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_wire::ScrollWireEvent;
-use std::sync::Arc;
-use tokio::sync::mpsc::{Sender, UnboundedReceiver};
+use tokio::sync::mpsc::UnboundedReceiver;
 
 /// Implementing the trait allows the type to return whether it is configured for dev chain.
 #[auto_impl::auto_impl(Arc)]
@@ -55,13 +53,13 @@ impl RollupManagerAddOn {
         self,
         ctx: AddOnsContext<'_, N>,
         rpc: RpcHandle<N, EthApi>,
-    ) -> eyre::Result<(ChainOrchestratorHandle<N::Network>, Option<Sender<Arc<L1Notification>>>)>
+    ) -> eyre::Result<(ChainOrchestratorHandle<N::Network>, Option<L1WatcherMock>)>
     where
         <<N as FullNodeTypes>::Types as NodeTypes>::ChainSpec:
             ChainConfig<Config = ScrollChainConfig> + ScrollHardforks + IsDevChain,
        N::Network: NetworkProtocols + FullNetwork<Primitives = ScrollNetworkPrimitives>,
    {
-        let (chain_orchestrator, handle, l1_notification_tx) = self
+        let (chain_orchestrator, handle, l1_watcher_mock) = self
            .config
            .build((&ctx).into(), self.scroll_wire_event, rpc.rpc_server_handles)
            .await?;
@@ -70,6 +68,6 @@ impl RollupManagerAddOn {
            .spawn_critical_with_shutdown_signal("rollup_node_manager", |shutdown| {
                chain_orchestrator.run_until_shutdown(shutdown)
            });
-        Ok((handle, l1_notification_tx))
+        Ok((handle, l1_watcher_mock))
    }
 }
26 changes: 26 additions & 0 deletions crates/node/src/add_ons/rpc.rs
@@ -115,6 +115,10 @@ pub trait RollupNodeAdminApi {
     /// Disables automatic sequencing in the rollup node.
     #[method(name = "disableAutomaticSequencing")]
     async fn disable_automatic_sequencing(&self) -> RpcResult<bool>;
+
+    /// Reverts the rollup node state to a specified L1 block number.
+    #[method(name = "revertToL1Block")]
+    async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool>;
 }
 
 #[async_trait]
@@ -220,6 +224,24 @@ where
             )
         })
     }
+
+    async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool> {
+        let handle = self.rollup_manager_handle().await.map_err(|e| {
+            ErrorObjectOwned::owned(
+                error::INTERNAL_ERROR_CODE,
+                format!("Failed to get rollup manager handle: {}", e),
+                None::<()>,
+            )
+        })?;
+
+        handle.revert_to_l1_block(block_number).await.map_err(|e| {
+            ErrorObjectOwned::owned(
+                error::INTERNAL_ERROR_CODE,
+                format!("Failed to revert to L1 block {}: {}", block_number, e),
+                None::<()>,
+            )
+        })
+    }
 }
 
 // Implement RollupNodeApiServer for Arc<RollupNodeRpcExt<N>> to allow shared ownership
@@ -257,4 +279,8 @@ where
     async fn disable_automatic_sequencing(&self) -> RpcResult<bool> {
         (**self).disable_automatic_sequencing().await
     }
+
+    async fn revert_to_l1_block(&self, block_number: u64) -> RpcResult<bool> {
+        (**self).revert_to_l1_block(block_number).await
+    }
 }
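For reference, a sketch of calling the new endpoint from a `jsonrpsee` HTTP client. The `#[rpc(...)]` namespace on `RollupNodeAdminApi` is not visible in this diff, so the `rollupNodeAdmin_` prefix and the port are assumptions to verify against the trait definition:

```rust
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let client = HttpClientBuilder::default().build("http://127.0.0.1:8545")?;
    // Assumed method string: namespace prefix plus the `revertToL1Block` name
    // registered by the `#[method(...)]` attribute above.
    let reverted: bool =
        client.request("rollupNodeAdmin_revertToL1Block", rpc_params![42u64]).await?;
    println!("revert acknowledged: {reverted}");
    Ok(())
}
```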
76 changes: 42 additions & 34 deletions crates/node/src/args.rs
@@ -6,6 +6,7 @@ use crate::{
 use scroll_migration::MigratorTrait;
 use std::{fs, path::PathBuf, sync::Arc};
 
+use super::test_utils::l1_helpers::L1WatcherMock;
 use alloy_chains::NamedChain;
 use alloy_primitives::{hex, Address, U128};
 use alloy_provider::{layers::CacheLayer, Provider, ProviderBuilder};
@@ -38,7 +39,7 @@ use rollup_node_providers::{
 use rollup_node_sequencer::{
     L1MessageInclusionMode, PayloadBuildingConfig, Sequencer, SequencerConfig,
 };
-use rollup_node_watcher::{L1Notification, L1Watcher};
+use rollup_node_watcher::{L1Watcher, L1WatcherHandle};
 use scroll_alloy_hardforks::ScrollHardforks;
 use scroll_alloy_network::Scroll;
 use scroll_alloy_provider::{ScrollAuthApiEngineClient, ScrollEngineApi};
@@ -51,7 +52,7 @@ use scroll_engine::{Engine, ForkchoiceState};
 use scroll_migration::traits::ScrollMigrator;
 use scroll_network::ScrollNetworkManager;
 use scroll_wire::ScrollWireEvent;
-use tokio::sync::mpsc::{Sender, UnboundedReceiver};
+use tokio::sync::mpsc::UnboundedReceiver;
 
 /// A struct that represents the arguments for the rollup node.
 #[derive(Debug, Clone, clap::Args)]
@@ -166,7 +167,7 @@ impl ScrollRollupNodeConfig {
             impl ScrollEngineApi,
         >,
         ChainOrchestratorHandle<N>,
-        Option<Sender<Arc<L1Notification>>>,
+        Option<L1WatcherMock>,
     )>
     where
         N: FullNetwork<Primitives = ScrollNetworkPrimitives> + NetworkProtocols,
@@ -350,35 +351,42 @@ impl ScrollRollupNodeConfig {
        };
        let consensus = self.consensus_args.consensus(authorized_signer)?;
 
-        let (l1_notification_tx, l1_notification_rx): (Option<Sender<Arc<L1Notification>>>, _) =
-            if let Some(provider) = l1_provider.filter(|_| !self.test) {
-                tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
-                (
-                    None,
-                    Some(
-                        L1Watcher::spawn(
-                            provider,
-                            l1_block_startup_info,
-                            node_config,
-                            self.l1_provider_args.logs_query_block_range,
-                        )
-                        .await,
-                    ),
-                )
-            } else {
-                // Create a channel for L1 notifications that we can use to inject L1 messages for
-                // testing
-                #[cfg(feature = "test-utils")]
-                {
-                    let (tx, rx) = tokio::sync::mpsc::channel(1000);
-                    (Some(tx), Some(rx))
-                }
-
-                #[cfg(not(feature = "test-utils"))]
-                {
-                    (None, None)
-                }
-            };
+        let (l1_watcher_mock, l1_watcher_handle) = if let Some(provider) =
+            l1_provider.filter(|_| !self.test)
+        {
+            tracing::info!(target: "scroll::node::args", ?l1_block_startup_info, "Starting L1 watcher");
+            (
+                None,
+                Some(
+                    L1Watcher::spawn(
+                        provider,
+                        l1_block_startup_info,
+                        node_config,
+                        self.l1_provider_args.logs_query_block_range,
+                    )
+                    .await,
+                ),
+            )
+        } else {
+            // Create a channel for L1 notifications that we can use to inject L1 messages for
+            // testing
+            #[cfg(feature = "test-utils")]
+            {
+                let (notification_tx, notification_rx) = tokio::sync::mpsc::channel(1000);
+                let (command_tx, command_rx) = tokio::sync::mpsc::unbounded_channel();
+                let handle = L1WatcherHandle::new(command_tx, notification_rx);
+                let watcher_mock = L1WatcherMock {
+                    command_rx: Arc::new(tokio::sync::Mutex::new(command_rx)),
+                    notification_tx,
+                };
+                (Some(watcher_mock), Some(handle))
+            }
+
+            #[cfg(not(feature = "test-utils"))]
+            {
+                (None, None)
+            }
+        };
 
         // Construct the l1 provider.
         let l1_messages_provider = db.clone();
@@ -457,7 +465,7 @@ impl ScrollRollupNodeConfig {
             config,
             Arc::new(block_client),
             l2_provider,
-            l1_notification_rx.expect("L1 notification receiver should be set"),
+            l1_watcher_handle.expect("L1 watcher handle should be set"),
             scroll_network_handle.into_scroll_network().await,
             consensus,
             engine,
@@ -467,7 +475,7 @@
        )
        .await?;
 
-        Ok((chain_orchestrator, handle, l1_notification_tx))
+        Ok((chain_orchestrator, handle, l1_watcher_mock))
    }
 }
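With the mock now carrying both halves of the watcher interface, a test can inject notifications into the orchestrator and, if needed, drain the commands it sends back. A rough sketch of the injection side (no `L1Notification` variant is assumed; the caller builds it):

```rust
use std::sync::Arc;

use rollup_node_watcher::L1Notification;

use crate::test_utils::l1_helpers::L1WatcherMock;

/// Push a pre-built notification into the node through the mock's sender.
async fn inject_notification(
    mock: &L1WatcherMock,
    notification: Arc<L1Notification>,
) -> eyre::Result<()> {
    // The bounded channel (capacity 1000 above) applies backpressure while the
    // orchestrator is gated on L2 sync; `send` waits for a free slot.
    mock.notification_tx
        .send(notification)
        .await
        .map_err(|_| eyre::eyre!("orchestrator dropped its notification receiver"))?;
    Ok(())
}
```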
9 changes: 9 additions & 0 deletions crates/node/src/test_utils/event_utils.rs
@@ -130,6 +130,15 @@ impl<'a> EventWaiter<'a> {
         Ok(())
     }
 
+    /// Wait for chain unwound event on all specified nodes.
+    pub async fn revert_to_l1_block(self) -> eyre::Result<()> {
+        self.wait_for_event_on_all(|e| {
+            matches!(e, ChainOrchestratorEvent::UnwoundToL1Block(_)).then_some(())
+        })
+        .await?;
+        Ok(())
+    }
+
     /// Wait for block consolidated event on all specified nodes.
     pub async fn block_consolidated(self, target_block: u64) -> eyre::Result<()> {
         self.wait_for_event_on_all(|e| {
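Put together with the RPC and handle changes above, the waiter lets an integration test assert the unwind end to end. A sketch, assuming the harness already provides the handle and an `EventWaiter` over the nodes under test:

```rust
// Illustrative test flow: trigger the revert, then block until every node
// under the waiter reports `ChainOrchestratorEvent::UnwoundToL1Block`.
async fn revert_and_wait<N>(
    handle: &ChainOrchestratorHandle<N>,
    waiter: EventWaiter<'_>,
    l1_block: u64,
) -> eyre::Result<()>
where
    N: FullNetwork<Primitives = ScrollNetworkPrimitives>,
{
    handle.revert_to_l1_block(l1_block).await?;
    waiter.revert_to_l1_block().await?;
    Ok(())
}
```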