diff --git a/crates/watcher/src/cache.rs b/crates/watcher/src/cache.rs deleted file mode 100644 index d1567c3d..00000000 --- a/crates/watcher/src/cache.rs +++ /dev/null @@ -1,103 +0,0 @@ -use crate::error::CacheError; - -use super::{EthRequestError, L1WatcherResult}; - -use std::num::NonZeroUsize; - -use alloy_primitives::{TxHash, B256}; -use alloy_provider::Provider; -use alloy_rpc_types_eth::{Transaction, TransactionTrait}; -use lru::LruCache; - -/// The L1 watcher cache. -#[derive(Debug)] -pub(crate) struct Cache { - transaction_cache: TransactionCache, - // TODO: introduce block cache. -} - -impl Cache { - /// Creates a new [`Cache`] instance with the given capacity for the transaction cache. - pub(crate) fn new(transaction_cache_capacity: NonZeroUsize) -> Self { - Self { transaction_cache: TransactionCache::new(transaction_cache_capacity) } - } - - /// Gets the transaction for the given hash, fetching it from the provider if not cached. - pub(crate) async fn get_transaction_by_hash( - &mut self, - tx_hash: TxHash, - provider: &P, - ) -> L1WatcherResult { - self.transaction_cache.get_transaction_by_hash(tx_hash, provider).await - } - - /// Gets the next blob versioned hash for the given transaction hash. - /// - /// Errors if the transaction is not in the cache. This method must be called only after - /// fetching the transaction via [`Self::get_transaction_by_hash`]. - pub(crate) async fn get_transaction_next_blob_versioned_hash( - &mut self, - tx_hash: TxHash, - ) -> L1WatcherResult> { - self.transaction_cache.get_transaction_next_blob_versioned_hash(tx_hash).await - } -} - -/// A cache for transactions fetched from the provider. -#[derive(Debug)] -struct TransactionCache { - cache: LruCache, -} - -#[derive(Debug)] -struct TransactionEntry { - transaction: Transaction, - blob_versioned_hashes: Vec, - blob_versioned_hashes_cursor: usize, -} - -impl TransactionCache { - fn new(capacity: NonZeroUsize) -> Self { - Self { cache: LruCache::new(capacity) } - } - - async fn get_transaction_by_hash( - &mut self, - tx_hash: TxHash, - provider: &P, - ) -> L1WatcherResult { - if let Some(entry) = self.cache.get(&tx_hash) { - return Ok(entry.transaction.clone()); - } - - let transaction = provider - .get_transaction_by_hash(tx_hash) - .await? - .ok_or(EthRequestError::MissingTransactionHash(tx_hash))?; - self.cache.put(tx_hash, transaction.clone().into()); - Ok(transaction) - } - - async fn get_transaction_next_blob_versioned_hash( - &mut self, - tx_hash: TxHash, - ) -> L1WatcherResult> { - if let Some(entry) = self.cache.get_mut(&tx_hash) { - let blob_versioned_hash = - entry.blob_versioned_hashes.get(entry.blob_versioned_hashes_cursor).copied(); - entry.blob_versioned_hashes_cursor += 1; - Ok(blob_versioned_hash) - } else { - Err(CacheError::MissingTransactionInCacheForBlobVersionedHash(tx_hash).into()) - } - } -} - -impl From for TransactionEntry { - fn from(transaction: Transaction) -> Self { - let blob_versioned_hashes = - transaction.blob_versioned_hashes().map(|hashes| hashes.to_vec()).unwrap_or_default(); - - Self { transaction, blob_versioned_hashes, blob_versioned_hashes_cursor: 0 } - } -} diff --git a/crates/watcher/src/error.rs b/crates/watcher/src/error.rs index 64ac3101..7b1633e9 100644 --- a/crates/watcher/src/error.rs +++ b/crates/watcher/src/error.rs @@ -1,6 +1,7 @@ use crate::L1Notification; use alloy_json_rpc::RpcError; use alloy_primitives::B256; +use alloy_sol_types::Error; use alloy_transport::TransportErrorKind; use rollup_node_providers::L1ProviderError; use std::sync::Arc; @@ -61,6 +62,14 @@ pub enum FilterLogError { /// Invalid extracted notification length. #[error("expected {0} notifications, got {1}")] InvalidNotificationCount(usize, usize), + /// Failed to decode log of expected type. + #[error("failed to decode log as {log_type} with error {error}")] + DecodeLogFailed { + /// The expected log type. + log_type: &'static str, + /// The decoding error. + error: Error, + }, } /// An error that occurred when accessing data from the cache. diff --git a/crates/watcher/src/lib.rs b/crates/watcher/src/lib.rs index 8a09c3b1..22141e95 100644 --- a/crates/watcher/src/lib.rs +++ b/crates/watcher/src/lib.rs @@ -1,8 +1,5 @@ //! L1 watcher for the Scroll Rollup Node. -mod cache; -use cache::Cache; - mod error; pub use error::{EthRequestError, FilterLogError, L1WatcherError}; @@ -19,7 +16,6 @@ use alloy_provider::{Network, Provider}; use alloy_rpc_types_eth::{BlockNumberOrTag, Filter, Log, TransactionTrait}; use alloy_sol_types::SolEvent; use error::L1WatcherResult; -use itertools::Itertools; use rollup_node_primitives::{ BatchCommitData, BatchInfo, BlockInfo, BoundedVec, ConsensusUpdate, L1BlockStartupInfo, NodeConfig, @@ -27,10 +23,9 @@ use rollup_node_primitives::{ use rollup_node_providers::SystemContractProvider; use scroll_alloy_consensus::TxL1Message; use scroll_l1::abi::logs::{ - try_decode_log, CommitBatch, FinalizeBatch, QueueTransaction, RevertBatch_0, RevertBatch_1, + CommitBatch, FinalizeBatch, QueueTransaction, RevertBatch_0, RevertBatch_1, }; use std::{ - cmp::Ordering, fmt::{Debug, Display, Formatter}, num::NonZeroUsize, sync::Arc, @@ -85,8 +80,6 @@ pub struct L1Watcher { unfinalized_blocks: BoundedVec
, /// The L1 state info relevant to the rollup node. l1_state: L1State, - /// The cache for the L1 watcher. - cache: Cache, /// The latest indexed block. current_block_number: BlockNumber, /// The sender part of the channel for [`L1Notification`]. @@ -265,7 +258,6 @@ where unfinalized_blocks: BoundedVec::new(HEADER_CAPACITY), current_block_number: start_block.saturating_sub(1), l1_state, - cache: Cache::new(TRANSACTION_CACHE_CAPACITY), sender: tx, config, metrics: WatcherMetrics::default(), @@ -341,21 +333,34 @@ where // prepare notifications. let mut notifications = Vec::with_capacity(logs.len()); - for log in logs { - let sig = log.topics()[0]; + // Process logs grouped by signature. + let mut i = 0; + while i < logs.len() { + let sig = logs[i].topics()[0]; + let start = i; + + // Find the end of the group with the same signature. + while i < logs.len() && logs[i].topics()[0] == sig { + i += 1; + } - let notification = match sig { - QueueTransaction::SIGNATURE_HASH => self.handle_l1_messages(&[log]).await?, - CommitBatch::SIGNATURE_HASH => self.handle_batch_commits(&[log]).await?, - FinalizeBatch::SIGNATURE_HASH => self.handle_batch_finalization(&[log]).await?, - RevertBatch_0::SIGNATURE_HASH => self.handle_batch_reverts(&[log]).await?, + // Create a slice for the current group of logs. + let group_logs = &logs[start..i]; + + let group_notifications = match sig { + QueueTransaction::SIGNATURE_HASH => self.handle_l1_messages(group_logs).await?, + CommitBatch::SIGNATURE_HASH => self.handle_batch_commits(group_logs).await?, + FinalizeBatch::SIGNATURE_HASH => { + self.handle_batch_finalization(group_logs).await? + } + RevertBatch_0::SIGNATURE_HASH => self.handle_batch_reverts(group_logs).await?, RevertBatch_1::SIGNATURE_HASH => { - self.handle_batch_revert_ranges(&[log]).await? + self.handle_batch_revert_ranges(group_logs).await? } _ => unreachable!("log signature already filtered"), }; - notifications.extend(notification); + notifications.extend(group_notifications); } if let Some(system_contract_update) = @@ -487,32 +492,22 @@ where Ok(()) } - /// Filters the logs into L1 messages and sends them over the channel. + /// Handles L1 message events. #[tracing::instrument(skip_all)] async fn handle_l1_messages(&self, logs: &[Log]) -> L1WatcherResult> { - let mut l1_messages = logs - .iter() - .map(|l| (&l.inner, l.block_number, l.block_hash, l.block_timestamp)) - .filter_map(|(log, bn, bh, ts)| { - try_decode_log::(log) - .map(|log| (Into::::into(log.data), bn, bh, ts)) - }) - .collect::>(); - - // prepare notifications - let mut notifications = Vec::with_capacity(l1_messages.len()); - - // sort the message by index and group by block number. - l1_messages.sort_by(|(m1, _, _, _), (m2, _, _, _)| m1.queue_index.cmp(&m2.queue_index)); - let groups = l1_messages.into_iter().chunk_by(|(_, bn, bh, _)| (*bn, *bh)); - let groups: Vec<_> = - groups.into_iter().map(|(bn, group)| (bn, group.collect::>())).collect(); - - for ((bn, bh), group) in groups { - let block_number = bn.ok_or(FilterLogError::MissingBlockNumber)?; - let block_hash = bh.ok_or(FilterLogError::MissingBlockHash)?; - // fetch the timestamp if missing from the log. - let block_timestamp = if let Some(ts) = group.first().and_then(|(_, _, _, ts)| *ts) { + let mut notifications = Vec::with_capacity(logs.len()); + + for log in logs { + let l1_message: TxL1Message = QueueTransaction::decode_log(&log.inner) + .map_err(|error| FilterLogError::DecodeLogFailed { + log_type: "QueueTransaction", + error, + })? + .data + .into(); + let block_number = log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = log.block_hash.ok_or(FilterLogError::MissingBlockHash)?; + let block_timestamp = if let Some(ts) = log.block_timestamp { ts } else { self.execution_provider @@ -522,88 +517,81 @@ where .ok_or(FilterLogError::MissingBlockTimestamp)? }; - // push notifications in vector. - for (msg, _, _, _) in group { - notifications.push(L1Notification::L1Message { - message: msg, - block_info: BlockInfo { number: block_number, hash: block_hash }, - block_timestamp, - }); - } + notifications.push(L1Notification::L1Message { + message: l1_message, + block_info: BlockInfo { number: block_number, hash: block_hash }, + block_timestamp, + }); } + Ok(notifications) } - /// Handles the batch commits events. + /// Handles batch commits events. #[tracing::instrument(skip_all)] - async fn handle_batch_commits(&mut self, logs: &[Log]) -> L1WatcherResult> { - // filter commit logs and skip genesis batch (batch_index == 0). - let mut commit_logs_with_tx = logs - .iter() - .map(|l| (l, l.transaction_hash)) - .filter_map(|(log, tx_hash)| { - let tx_hash = tx_hash?; - try_decode_log::(&log.inner) - .filter(|decoded| !decoded.data.batch_index.is_zero()) - .map(|decoded| (log, decoded.data, tx_hash)) - }) - .collect::>(); - + async fn handle_batch_commits(&self, logs: &[Log]) -> L1WatcherResult> { // prepare notifications - let mut notifications = Vec::with_capacity(commit_logs_with_tx.len()); - - // sort the commits by block number then batch index, then group by tx hash. - commit_logs_with_tx.sort_by(|(log_a, data_a, _), (log_b, data_b, _)| { - log_a - .block_number - .and_then(|a| log_b.block_number.map(|b| a.cmp(&b))) - .unwrap_or(Ordering::Equal) - .then_with(|| data_a.batch_index.cmp(&data_b.batch_index)) - }); - let groups = commit_logs_with_tx.into_iter().chunk_by(|(_, _, hash)| *hash); - let groups: Vec<_> = - groups.into_iter().map(|(hash, group)| (hash, group.collect::>())).collect(); - - // iterate each group of commits - for (tx_hash, group) in groups { - // fetch the commit transaction. - let transaction = - self.cache.get_transaction_by_hash(tx_hash, &self.execution_provider).await?; - - // get the calldata. - let input = Arc::new(transaction.input().clone()); - - // iterate the logs emitted in the group - for (raw_log, decoded_log, _) in group { - let block_number = - raw_log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; - let block_hash = raw_log.block_hash.ok_or(FilterLogError::MissingBlockHash)?; - let blob_versioned_hash = - self.cache.get_transaction_next_blob_versioned_hash(tx_hash).await?; - // if the log is missing the block timestamp, we need to fetch it. - // the block timestamp is necessary in order to derive the beacon - // slot and query the blobs. - let block_timestamp = if let Some(ts) = raw_log.block_timestamp { - ts - } else { - self.execution_provider - .get_block(block_number.into()) - .await? - .map(|b| b.header.timestamp) - .ok_or(FilterLogError::MissingBlockTimestamp)? - }; + let mut notifications = Vec::with_capacity(logs.len()); + + // Process batch commits grouped by transaction hash + for logs in logs.chunk_by(|a, b| a.transaction_hash == b.transaction_hash) { + // Extract common data from the first log in the group + let block_number = logs + .first() + .and_then(|log| log.block_number) + .ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = logs + .first() + .and_then(|log| log.block_hash) + .ok_or(FilterLogError::MissingBlockHash)?; + let block_timestamp = if let Some(ts) = logs.first().and_then(|log| log.block_timestamp) + { + ts + } else { + self.execution_provider + .get_block(block_number.into()) + .await? + .map(|b| b.header.timestamp) + .ok_or(FilterLogError::MissingBlockTimestamp)? + }; + let tx_hash = logs + .first() + .and_then(|log| log.transaction_hash) + .ok_or(FilterLogError::MissingTransactionHash)?; + let tx = self + .execution_provider + .get_transaction_by_hash(tx_hash) + .await? + .ok_or(EthRequestError::MissingTransactionHash(tx_hash))?; + let tx_input = Arc::new(tx.input().clone()); + + for (idx, log) in logs.iter().enumerate() { + let commit_batch = CommitBatch::decode_log(&log.inner) + .map_err(|error| FilterLogError::DecodeLogFailed { + log_type: "CommitBatch", + error, + })? + .data; + + if commit_batch.batch_index.is_zero() { + // skip genesis batch. + continue; + } + let batch_index = - decoded_log.batch_index.uint_try_to().expect("u256 to u64 conversion error"); + commit_batch.batch_index.uint_try_to().expect("u256 to u64 conversion error"); + let blob_versioned_hash = + tx.blob_versioned_hashes().and_then(|hashes| hashes.get(idx).copied()); // push in vector. notifications.push(L1Notification::BatchCommit { block_info: BlockInfo { number: block_number, hash: block_hash }, data: BatchCommitData { - hash: decoded_log.batch_hash, + hash: commit_batch.batch_hash, index: batch_index, block_number, block_timestamp, - calldata: input.clone(), + calldata: tx_input.clone(), blob_versioned_hash, finalized_block_number: None, reverted_block_number: None, @@ -611,31 +599,34 @@ where }); } } + Ok(notifications) } /// Handles the batch revert events. #[tracing::instrument(skip_all)] async fn handle_batch_reverts(&self, logs: &[Log]) -> L1WatcherResult> { - // filter revert logs. - logs.iter() - .map(|l| (l, l.block_number, l.block_hash)) - .filter_map(|(log, bn, bh)| { - try_decode_log::(&log.inner).map(|decoded| (decoded.data, bn, bh)) - }) - .map(|(decoded_log, maybe_block_number, maybe_block_hash)| { - // process the revert log. - let block_number = maybe_block_number.ok_or(FilterLogError::MissingBlockNumber)?; - let block_hash = maybe_block_hash.ok_or(FilterLogError::MissingBlockHash)?; - let batch_index = - decoded_log.batchIndex.uint_try_to().expect("u256 to u64 conversion error"); - let batch_hash = decoded_log.batchHash; - Ok(L1Notification::BatchRevert { - batch_info: BatchInfo { index: batch_index, hash: batch_hash }, - block_info: BlockInfo { number: block_number, hash: block_hash }, - }) - }) - .collect() + let mut notifications = Vec::with_capacity(logs.len()); + + for log in logs { + let revert_batch = RevertBatch_0::decode_log(&log.inner) + .map_err(|error| FilterLogError::DecodeLogFailed { + log_type: "RevertBatch_0", + error, + })? + .data; + let block_number = log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = log.block_hash.ok_or(FilterLogError::MissingBlockHash)?; + let batch_hash = revert_batch.batchHash; + let batch_index = + revert_batch.batchIndex.uint_try_to().expect("u256 to u64 conversion error"); + notifications.push(L1Notification::BatchRevert { + batch_info: BatchInfo { index: batch_index, hash: batch_hash }, + block_info: BlockInfo { number: block_number, hash: block_hash }, + }); + } + + Ok(notifications) } /// Handle the batch revert range events. @@ -644,31 +635,33 @@ where &self, logs: &[Log], ) -> L1WatcherResult> { - // filter revert range logs. - logs.iter() - .map(|l| (l, l.block_number, l.block_hash)) - .filter_map(|(log, bn, bh)| { - try_decode_log::(&log.inner).map(|decoded| (decoded.data, bn, bh)) - }) - .map(|(decoded_log, maybe_block_number, maybe_block_hash)| { - // process the revert range log. - let block_number = maybe_block_number.ok_or(FilterLogError::MissingBlockNumber)?; - let block_hash = maybe_block_hash.ok_or(FilterLogError::MissingBlockHash)?; - let start_index = decoded_log - .startBatchIndex - .uint_try_to() - .expect("u256 to u64 conversion error"); - let end_index = decoded_log - .finishBatchIndex - .uint_try_to() - .expect("u256 to u64 conversion error"); - Ok(L1Notification::BatchRevertRange { - start: start_index, - end: end_index, - block_info: BlockInfo { number: block_number, hash: block_hash }, - }) - }) - .collect() + let mut notifications = Vec::with_capacity(logs.len()); + + for log in logs { + let revert_batch_range = RevertBatch_1::decode_log(&log.inner) + .map_err(|error| FilterLogError::DecodeLogFailed { + log_type: "RevertBatch_1", + error, + })? + .data; + let block_number = log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = log.block_hash.ok_or(FilterLogError::MissingBlockHash)?; + let start_index = revert_batch_range + .startBatchIndex + .uint_try_to() + .expect("u256 to u64 conversion error"); + let end_index = revert_batch_range + .finishBatchIndex + .uint_try_to() + .expect("u256 to u64 conversion error"); + notifications.push(L1Notification::BatchRevertRange { + start: start_index, + end: end_index, + block_info: BlockInfo { number: block_number, hash: block_hash }, + }); + } + + Ok(notifications) } /// Handles the finalize batch events. @@ -677,27 +670,33 @@ where &self, logs: &[Log], ) -> L1WatcherResult> { - // filter finalize logs and skip genesis batch (batch_index == 0). - logs.iter() - .map(|l| (l, l.block_number, l.block_hash)) - .filter_map(|(log, bn, bh)| { - try_decode_log::(&log.inner) - .filter(|decoded| !decoded.data.batch_index.is_zero()) - .map(|decoded| (decoded.data, bn, bh)) - }) - .map(|(decoded_log, maybe_block_number, maybe_block_hash)| { - // fetch the finalize transaction. - let block_number = maybe_block_number.ok_or(FilterLogError::MissingBlockNumber)?; - let block_hash = maybe_block_hash.ok_or(FilterLogError::MissingBlockHash)?; - let index = - decoded_log.batch_index.uint_try_to().expect("u256 to u64 conversion error"); - Ok(L1Notification::BatchFinalization { - hash: decoded_log.batch_hash, - index, - block_info: BlockInfo { number: block_number, hash: block_hash }, - }) - }) - .collect() + let mut notifications = Vec::with_capacity(logs.len()); + + for log in logs { + let finalize_batch = FinalizeBatch::decode_log(&log.inner) + .map_err(|error| FilterLogError::DecodeLogFailed { + log_type: "FinalizeBatch", + error, + })? + .data; + + if finalize_batch.batch_index.is_zero() { + // skip genesis batch. + continue; + } + + let block_number = log.block_number.ok_or(FilterLogError::MissingBlockNumber)?; + let block_hash = log.block_hash.ok_or(FilterLogError::MissingBlockHash)?; + let index = + finalize_batch.batch_index.uint_try_to().expect("u256 to u64 conversion error"); + notifications.push(L1Notification::BatchFinalization { + hash: finalize_batch.batch_hash, + index, + block_info: BlockInfo { number: block_number, hash: block_hash }, + }); + } + + Ok(notifications) } /// Handles the system contract update events. @@ -883,7 +882,6 @@ mod tests { execution_provider: provider, unfinalized_blocks: unfinalized_blocks.into(), l1_state: L1State { head: Default::default(), finalized: Default::default() }, - cache: Cache::new(TRANSACTION_CACHE_CAPACITY), current_block_number: 0, sender: tx, config: Arc::new(NodeConfig::mainnet()), @@ -1080,7 +1078,20 @@ mod tests { let (watcher, _) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); // build test logs. - let mut logs = (0..10).map(|_| random!(Log)).collect::>(); + let mut logs = Vec::new(); + + // Produce a random log + let mut queue_transaction = random!(Log); + let mut inner_log = random!(alloy_primitives::Log); + inner_log.data = random!(QueueTransaction).encode_log_data(); + queue_transaction.inner = inner_log; + queue_transaction.block_number = Some(random!(u64)); + queue_transaction.block_timestamp = Some(random!(u64)); + queue_transaction.block_hash = Some(random!(B256)); + queue_transaction.topics_mut()[0] = QueueTransaction::SIGNATURE_HASH; + logs.push(queue_transaction); + + // Produce another random log let mut queue_transaction = random!(Log); let mut inner_log = random!(alloy_primitives::Log); inner_log.data = random!(QueueTransaction).encode_log_data(); @@ -1088,13 +1099,17 @@ mod tests { queue_transaction.block_number = Some(random!(u64)); queue_transaction.block_timestamp = Some(random!(u64)); queue_transaction.block_hash = Some(random!(B256)); + queue_transaction.topics_mut()[0] = QueueTransaction::SIGNATURE_HASH; logs.push(queue_transaction); // When - let notification = watcher.handle_l1_messages(&logs).await?.pop().unwrap(); + let notifications = watcher.handle_l1_messages(&logs).await?; // Then - assert!(matches!(notification, L1Notification::L1Message { .. })); + assert_eq!(notifications.len(), logs.len()); + for notification in notifications { + assert!(matches!(notification, L1Notification::L1Message { .. })); + } Ok(()) } @@ -1116,11 +1131,16 @@ mod tests { effective_gas_price: None, }; - let (mut watcher, _) = + let (watcher, _) = l1_watcher(chain, vec![], vec![tx.clone()], finalized.clone(), latest.clone()); // build test logs. - let mut logs = (0..10).map(|_| random!(Log)).collect::>(); + let mut logs = Vec::new(); + let block_number = random!(u64); + let block_hash = random!(B256); + let block_timestamp = random!(u64); + + // Produce a random batch commit log. let mut batch_commit = random!(Log); let mut inner_log = random!(alloy_primitives::Log); inner_log.data = @@ -1128,16 +1148,32 @@ mod tests { .encode_log_data(); batch_commit.inner = inner_log; batch_commit.transaction_hash = Some(*tx.inner.tx_hash()); - batch_commit.block_number = Some(random!(u64)); - batch_commit.block_hash = Some(random!(B256)); - batch_commit.block_timestamp = Some(random!(u64)); + batch_commit.block_number = Some(block_number); + batch_commit.block_hash = Some(block_hash); + batch_commit.block_timestamp = Some(block_timestamp); + logs.push(batch_commit); + + // Produce another random batch commit log. + let mut batch_commit = random!(Log); + let mut inner_log = random!(alloy_primitives::Log); + inner_log.data = + CommitBatch { batch_index: U256::from(random!(u64)), batch_hash: random!(B256) } + .encode_log_data(); + batch_commit.inner = inner_log; + batch_commit.transaction_hash = Some(*tx.inner.tx_hash()); + batch_commit.block_number = Some(block_number); + batch_commit.block_hash = Some(block_hash); + batch_commit.block_timestamp = Some(block_timestamp); logs.push(batch_commit); // When - let notification = watcher.handle_batch_commits(&logs).await?.pop().unwrap(); + let notifications = watcher.handle_batch_commits(&logs).await?; // Then - assert!(matches!(notification, L1Notification::BatchCommit { .. })); + assert_eq!(notifications.len(), logs.len()); + for notification in notifications { + assert!(matches!(notification, L1Notification::BatchCommit { .. })); + } Ok(()) } @@ -1149,7 +1185,7 @@ mod tests { let (watcher, _) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); // build test logs. - let mut logs = (0..10).map(|_| random!(Log)).collect::>(); + let mut logs = Vec::new(); let mut revert_batch = random!(Log); let mut inner_log = random!(alloy_primitives::Log); inner_log.data = @@ -1176,7 +1212,7 @@ mod tests { let (watcher, _) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); // build test logs. - let mut logs = (0..10).map(|_| random!(Log)).collect::>(); + let mut logs = Vec::new(); let mut revert_batch_range = random!(Log); let mut inner_log = random!(alloy_primitives::Log); inner_log.data = RevertBatch_1 { @@ -1205,7 +1241,9 @@ mod tests { let (watcher, _) = l1_watcher(chain, vec![], vec![], finalized.clone(), latest.clone()); // build test logs. - let mut logs = (0..10).map(|_| random!(Log)).collect::>(); + let mut logs = Vec::new(); + + // Produce a random finalize commit log. let mut finalize_commit = random!(Log); let mut inner_log = random!(alloy_primitives::Log); let mut batch = random!(FinalizeBatch);