From 15c1556e33946d5ed00f8994b58fc9be5e7820d1 Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 05:10:49 +0300 Subject: [PATCH 01/11] =?UTF-8?q?feat(realtime):=20support=20L2=20UserOps?= =?UTF-8?q?=20for=20bridge-out=20(L2=E2=86=92L1=20withdrawals)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the ability for users to submit UserOps that execute on L2, enabling bridge-out functionality. The catalyst now processes both L1→L2 deposits and L2→L1 withdrawals in the same block. Changes: - New `surge_sendL2UserOp` RPC method for submitting L2-targeted UserOps - L2 UserOp execution transactions are constructed and included in L2 blocks - After block execution, existing `find_l1_call()` detects the resulting bridge MessageSent events and relays them to L1 via processMessage - Block building handles mixed deposit + withdrawal transactions - Remove `disable_bridging` gate that prevented bridge handler startup Co-Authored-By: Claude Opus 4.6 (1M context) --- ...2026-03-29-auto-process-bridge-deposits.md | 768 ++++++++++++++++++ realtime/src/l1/deposit_watcher.rs | 214 +++++ realtime/src/l2/execution_layer.rs | 61 ++ realtime/src/lib.rs | 6 - .../node/proposal_manager/bridge_handler.rs | 41 + realtime/src/node/proposal_manager/mod.rs | 67 +- 6 files changed, 1148 insertions(+), 9 deletions(-) create mode 100644 docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md create mode 100644 realtime/src/l1/deposit_watcher.rs diff --git a/docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md b/docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md new file mode 100644 index 00000000..64b5f499 --- /dev/null +++ b/docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md @@ -0,0 +1,768 @@ +# Auto-Process L1 Bridge Deposits Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or 
superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** The Catalyst sequencer should automatically detect L1 bridge `MessageSent` events and include `processMessage` transactions in L2 blocks, without requiring users to submit UserOps. + +**Architecture:** A new `DepositWatcher` polls L1 for `MessageSent` events on the bridge contract. Discovered deposits are queued as `L2Call` structs into the existing `BridgeHandler`, which already knows how to construct `processMessage` L2 transactions and inject them into block building. The `disable_bridging` gate is removed so the bridge handler starts. + +**Tech Stack:** Rust, Alloy (Ethereum RPC), Tokio (async), existing Catalyst crate structure + +--- + +## File Map + +| File | Action | Responsibility | +|------|--------|---------------| +| `realtime/src/l1/deposit_watcher.rs` | Create | Poll L1 bridge for MessageSent+SignalSent events, queue L2Calls | +| `realtime/src/l1/mod.rs` | Modify | Add `pub mod deposit_watcher;` | +| `realtime/src/node/proposal_manager/bridge_handler.rs` | Modify | Add deposit queue intake alongside UserOp queue | +| `realtime/src/node/proposal_manager/mod.rs` | Modify | Drain deposit queue in block building | +| `realtime/src/lib.rs` | Modify | Remove `disable_bridging` gate, start watcher | + +--- + +### Task 1: Create the L1 Deposit Watcher + +**Files:** +- Create: `realtime/src/l1/deposit_watcher.rs` +- Modify: `realtime/src/l1/mod.rs` + +- [ ] **Step 1: Create `deposit_watcher.rs`** + +This module polls L1 for `MessageSent` events on the bridge contract and co-located `SignalSent` events on the signal service. It filters for messages targeting our L2 chain ID and sends discovered `(Message, signal_slot)` pairs through a channel. 
+ +```rust +// realtime/src/l1/deposit_watcher.rs + +use crate::shared_abi::bindings::{ + Bridge::MessageSent, + IBridge::Message, + SignalService::SignalSent, +}; +use alloy::{ + primitives::{Address, FixedBytes}, + providers::{DynProvider, Provider}, + rpc::types::Filter, + sol_types::SolEvent, +}; +use anyhow::Result; +use common::utils::cancellation_token::CancellationToken; +use std::sync::Arc; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +use crate::node::proposal_manager::bridge_handler::L2Call; + +/// Polls L1 for bridge deposit events and queues them for L2 processing. +pub struct DepositWatcher { + provider: DynProvider, + bridge_address: Address, + signal_service_address: Address, + l2_chain_id: u64, + tx: mpsc::Sender, + cancel_token: CancellationToken, +} + +impl DepositWatcher { + pub fn new( + provider: DynProvider, + bridge_address: Address, + signal_service_address: Address, + l2_chain_id: u64, + tx: mpsc::Sender, + cancel_token: CancellationToken, + ) -> Self { + Self { + provider, + bridge_address, + signal_service_address, + l2_chain_id, + tx, + cancel_token, + } + } + + /// Start polling in a background task. Returns the join handle. 
+ pub fn start(self) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = self.run().await { + error!("DepositWatcher exited with error: {}", e); + } + }) + } + + async fn run(self) -> Result<()> { + // Start from the latest block + let mut from_block = self + .provider + .get_block_number() + .await + .map_err(|e| anyhow::anyhow!("Failed to get block number: {}", e))?; + + info!( + "DepositWatcher started: bridge={}, signal_service={}, l2_chain_id={}, from_block={}", + self.bridge_address, self.signal_service_address, self.l2_chain_id, from_block + ); + + loop { + if self.cancel_token.is_cancelled() { + info!("DepositWatcher shutting down"); + return Ok(()); + } + + tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; + + let latest_block = match self.provider.get_block_number().await { + Ok(n) => n, + Err(e) => { + warn!("DepositWatcher: failed to get block number: {}", e); + continue; + } + }; + + if latest_block < from_block { + continue; + } + + match self.scan_range(from_block, latest_block).await { + Ok(count) => { + if count > 0 { + info!( + "DepositWatcher: found {} deposits in blocks {}..{}", + count, from_block, latest_block + ); + } + from_block = latest_block + 1; + } + Err(e) => { + warn!( + "DepositWatcher: error scanning blocks {}..{}: {}", + from_block, latest_block, e + ); + // Retry same range next iteration + } + } + } + } + + async fn scan_range(&self, from_block: u64, to_block: u64) -> Result { + // Query MessageSent events from the bridge + let bridge_filter = Filter::new() + .address(self.bridge_address) + .event_signature(MessageSent::SIGNATURE_HASH) + .from_block(from_block) + .to_block(to_block); + + let bridge_logs = self + .provider + .get_logs(&bridge_filter) + .await + .map_err(|e| anyhow::anyhow!("Failed to get MessageSent logs: {}", e))?; + + if bridge_logs.is_empty() { + return Ok(0); + } + + // Query SignalSent events from the signal service in the same range + let signal_filter = Filter::new() 
+ .address(self.signal_service_address) + .event_signature(SignalSent::SIGNATURE_HASH) + .from_block(from_block) + .to_block(to_block); + + let signal_logs = self + .provider + .get_logs(&signal_filter) + .await + .map_err(|e| anyhow::anyhow!("Failed to get SignalSent logs: {}", e))?; + + // Index signal slots by block number + tx index for matching + let mut signal_by_tx: std::collections::HashMap<(u64, u64), FixedBytes<32>> = + std::collections::HashMap::new(); + + for log in &signal_logs { + if let (Some(block_number), Some(tx_index)) = + (log.block_number, log.transaction_index) + { + let log_data = alloy::primitives::LogData::new_unchecked( + log.topics().to_vec(), + log.data().data.clone(), + ); + if let Ok(decoded) = SignalSent::decode_log_data(&log_data) { + signal_by_tx.insert((block_number, tx_index), decoded.slot); + } + } + } + + let mut count = 0; + + for log in &bridge_logs { + let log_data = alloy::primitives::LogData::new_unchecked( + log.topics().to_vec(), + log.data().data.clone(), + ); + + let decoded = match MessageSent::decode_log_data(&log_data) { + Ok(d) => d, + Err(e) => { + warn!("Failed to decode MessageSent: {}", e); + continue; + } + }; + + // Only process messages targeting our L2 + if decoded.message.destChainId != self.l2_chain_id { + debug!( + "Skipping message with destChainId={} (want {})", + decoded.message.destChainId, self.l2_chain_id + ); + continue; + } + + // Find matching signal slot from the same transaction + let signal_slot = if let (Some(block_number), Some(tx_index)) = + (log.block_number, log.transaction_index) + { + signal_by_tx.get(&(block_number, tx_index)).copied() + } else { + None + }; + + let Some(signal_slot) = signal_slot else { + warn!( + "No matching SignalSent for MessageSent in block={:?} tx={:?}", + log.block_number, log.transaction_index + ); + continue; + }; + + let l2_call = L2Call { + message_from_l1: decoded.message, + signal_slot_on_l2: signal_slot, + }; + + if let Err(e) = 
self.tx.send(l2_call).await { + error!("Failed to queue deposit L2Call: {}", e); + } else { + count += 1; + } + } + + Ok(count) + } +} +``` + +- [ ] **Step 2: Register the module in `l1/mod.rs`** + +Add the new module to `realtime/src/l1/mod.rs`: + +```rust +pub mod bindings; +pub mod config; +pub mod deposit_watcher; +pub mod execution_layer; +pub mod proposal_tx_builder; +pub mod protocol_config; +``` + +- [ ] **Step 3: Verify it compiles** + +Run from `/tmp/catalyst`: +```bash +cargo check -p realtime 2>&1 | tail -20 +``` + +Expected: may have unused warnings but no errors (the watcher isn't wired in yet). + +- [ ] **Step 4: Commit** + +```bash +git add realtime/src/l1/deposit_watcher.rs realtime/src/l1/mod.rs +git commit -m "feat(realtime): add L1 deposit watcher for bridge MessageSent events" +``` + +--- + +### Task 2: Add deposit queue to BridgeHandler + +**Files:** +- Modify: `realtime/src/node/proposal_manager/bridge_handler.rs` + +The `BridgeHandler` currently only receives bridge data from UserOps via the RPC channel. We add a second channel for direct L1 deposits discovered by the watcher. 
+ +- [ ] **Step 1: Add deposit receiver field and constructor parameter** + +In `realtime/src/node/proposal_manager/bridge_handler.rs`, add the deposit channel: + +Add a new field to `BridgeHandler`: + +```rust +pub struct BridgeHandler { + ethereum_l1: Arc>, + taiko: Arc, + rx: Receiver, + deposit_rx: Receiver, + status_store: UserOpStatusStore, +} +``` + +Update `BridgeHandler::new()` to accept and store the receiver: + +```rust + pub async fn new( + addr: SocketAddr, + ethereum_l1: Arc>, + taiko: Arc, + cancellation_token: CancellationToken, + deposit_rx: Receiver, + ) -> Result { +``` + +And in the return value: + +```rust + Ok(Self { + ethereum_l1, + taiko, + rx, + deposit_rx, + status_store, + }) +``` + +- [ ] **Step 2: Add `next_deposit_l2_call()` method** + +Add a method that drains the deposit queue, returning the next L2Call from direct deposits: + +```rust + pub fn next_deposit_l2_call(&mut self) -> Option { + self.deposit_rx.try_recv().ok() + } + + pub fn has_pending_deposits(&self) -> bool { + !self.deposit_rx.is_empty() + } +``` + +- [ ] **Step 3: Verify it compiles** + +```bash +cargo check -p realtime 2>&1 | tail -20 +``` + +Expected: errors in `mod.rs` where `BridgeHandler::new()` is called without the new parameter. We fix that in Task 3. + +- [ ] **Step 4: Commit** + +```bash +git add realtime/src/node/proposal_manager/bridge_handler.rs +git commit -m "feat(realtime): add deposit receiver channel to BridgeHandler" +``` + +--- + +### Task 3: Wire deposit watcher into node startup and block building + +**Files:** +- Modify: `realtime/src/node/proposal_manager/mod.rs` +- Modify: `realtime/src/lib.rs` + +- [ ] **Step 1: Create deposit channel and pass to BridgeHandler in `mod.rs`** + +In `realtime/src/node/proposal_manager/mod.rs`, update `BatchManager::new()` to create the deposit channel and return the sender: + +Add `use tokio::sync::mpsc;` at the top (already imported for other uses). 
+ +Change the return type and body of `BatchManager::new()`: + +```rust + pub async fn new( + l1_height_lag: u64, + config: BatchBuilderConfig, + ethereum_l1: Arc>, + taiko: Arc, + metrics: Arc, + cancel_token: CancellationToken, + last_finalized_block_hash: B256, + raiko_client: RaikoClient, + basefee_sharing_pctg: u8, + proof_request_bypass: bool, + ) -> Result<(Self, mpsc::Sender), Error> { + // ... existing code ... + + let (deposit_tx, deposit_rx) = mpsc::channel::(256); + + let bridge_addr: SocketAddr = "0.0.0.0:4545".parse()?; + let bridge_handler = Arc::new(Mutex::new( + BridgeHandler::new( + bridge_addr, + ethereum_l1.clone(), + taiko.clone(), + cancel_token.clone(), + deposit_rx, + ) + .await?, + )); + + // ... rest unchanged ... + + Ok((Self { + batch_builder: BatchBuilder::new( + config, + ethereum_l1.slot_clock.clone(), + metrics.clone(), + ), + async_submitter, + bridge_handler, + ethereum_l1, + taiko, + l1_height_lag, + metrics, + cancel_token, + last_finalized_block_hash, + }, deposit_tx)) + } +``` + +- [ ] **Step 2: Add deposit consumption to block building** + +In the same file, update `add_pending_l2_call_to_draft_block()` to also check deposits when no UserOp is pending: + +```rust + async fn add_pending_l2_call_to_draft_block( + &mut self, + l2_draft_block: &mut L2BlockV2Draft, + ) -> Result, FixedBytes<32>)>, anyhow::Error> { + // First, try UserOp-triggered L2 calls (existing behavior) + if let Some((user_op_data, l2_call)) = self + .bridge_handler + .lock() + .await + .next_user_op_and_l2_call() + .await? 
+ { + info!("Processing pending L2 call from UserOp: {:?}", l2_call); + + let l2_call_bridge_tx = self + .taiko + .l2_execution_layer() + .construct_l2_call_tx(l2_call.message_from_l1) + .await?; + + info!( + "Inserting L2 call bridge transaction into tx list: {:?}", + l2_call_bridge_tx + ); + + l2_draft_block + .prebuilt_tx_list + .tx_list + .push(l2_call_bridge_tx); + + return Ok(Some((Some(user_op_data), l2_call.signal_slot_on_l2))); + } + + // Then, try direct deposit L2 calls from the watcher + if let Some(l2_call) = self.bridge_handler.lock().await.next_deposit_l2_call() { + info!( + "Processing pending L2 call from direct deposit: destOwner={}, value={}", + l2_call.message_from_l1.destOwner, l2_call.message_from_l1.value + ); + + let l2_call_bridge_tx = self + .taiko + .l2_execution_layer() + .construct_l2_call_tx(l2_call.message_from_l1) + .await?; + + l2_draft_block + .prebuilt_tx_list + .tx_list + .push(l2_call_bridge_tx); + + return Ok(Some((None, l2_call.signal_slot_on_l2))); + } + + Ok(None) + } +``` + +Update `add_draft_block_to_proposal()` to handle the `Option`: + +```rust + async fn add_draft_block_to_proposal( + &mut self, + mut l2_draft_block: L2BlockV2Draft, + l2_slot_context: &L2SlotContext, + operation_type: OperationType, + ) -> Result { + let mut anchor_signal_slots: Vec> = vec![]; + + debug!("Checking for pending L2 calls"); + if let Some((maybe_user_op, signal_slot)) = self + .add_pending_l2_call_to_draft_block(&mut l2_draft_block) + .await? + { + if let Some(user_op_data) = maybe_user_op { + self.batch_builder.add_user_op(user_op_data)?; + } + self.batch_builder.add_signal_slot(signal_slot)?; + anchor_signal_slots.push(signal_slot); + } else { + debug!("No pending L2 calls"); + } + // ... rest unchanged ... 
+ } +``` + +Also update `has_pending_user_ops()` to include deposits: + +```rust + pub async fn has_pending_user_ops(&self) -> bool { + let handler = self.bridge_handler.lock().await; + handler.has_pending_user_ops() || handler.has_pending_deposits() + } +``` + +- [ ] **Step 3: Update `lib.rs` — remove gate, start watcher, fix `BatchManager::new()` call** + +In `realtime/src/lib.rs`: + +Remove the `disable_bridging` gate (lines 35-39): + +```rust + // DELETE these lines: + // if !config.disable_bridging { + // return Err(anyhow::anyhow!( + // "Bridging is not implemented. Exiting RealTime node creation." + // )); + // } +``` + +Update the `BatchManager::new()` call site in `Node::new()`. Since `BatchManager::new()` now returns a tuple, update `node/mod.rs` `Node::new()`: + +In `realtime/src/node/mod.rs`, change: + +```rust + let proposal_manager = BatchManager::new( + // ... args ... + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to create BatchManager: {}", e))?; +``` + +To: + +```rust + let (proposal_manager, deposit_tx) = BatchManager::new( + // ... args ... + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to create BatchManager: {}", e))?; +``` + +Then return `deposit_tx` from `Node::new()` by adding it to the struct or passing it back. The simplest approach: store it in `Node` temporarily and start the watcher in `entrypoint()`. + +Add `deposit_tx` field to `Node`: + +```rust +pub struct Node { + // ... existing fields ... + deposit_tx: Option>, +} +``` + +Set it in `Node::new()`: + +```rust + Ok(Self { + // ... existing fields ... + deposit_tx: Some(deposit_tx), + }) +``` + +Then in `lib.rs`, after creating the node but before calling `entrypoint()`, start the watcher. Actually, it's cleaner to start it inside `entrypoint()`. Update `entrypoint()` in `node/mod.rs`: + +```rust + pub async fn entrypoint(mut self) -> Result<(), Error> { + info!("Starting RealTime node"); + + if let Err(err) = self.warmup().await { + error!("Failed to warm up node: {}. 
Shutting down.", err); + self.cancel_token.cancel_on_critical_error(); + return Err(anyhow::anyhow!(err)); + } + + info!("Node warmup successful"); + + // Start the L1 deposit watcher if we have the channel + if let Some(deposit_tx) = self.deposit_tx.take() { + let l1_provider = self.ethereum_l1.execution_layer.common().provider().clone(); + let bridge_address = self.ethereum_l1.execution_layer.contract_addresses().bridge; + let signal_service = self.ethereum_l1.execution_layer.protocol_config().signal_service; + let l2_chain_id = self.taiko.l2_execution_layer().chain_id; + + let watcher = crate::l1::deposit_watcher::DepositWatcher::new( + l1_provider, + bridge_address, + signal_service, + l2_chain_id, + deposit_tx, + self.cancel_token.clone(), + ); + watcher.start(); + info!("L1 deposit watcher started"); + } + + tokio::spawn(async move { + self.preconfirmation_loop().await; + }); + + Ok(()) + } +``` + +- [ ] **Step 4: Expose needed fields from ExecutionLayer** + +In `realtime/src/l1/execution_layer.rs`, add accessor methods: + +```rust +impl ExecutionLayer { + pub fn contract_addresses(&self) -> &ContractAddresses { + &self.contract_addresses + } + + pub fn protocol_config_ref(&self) -> &ProtocolConfig { + // We need to store the protocol config. Add a field or fetch it. + // Simplest: store it during construction. + } +} +``` + +Actually, the protocol config is fetched after `ExecutionLayer` is created (in `lib.rs:66`). The simplest approach: pass the signal service address through to `Node`. Add it as a parameter to `Node::new()`: + +In `lib.rs`, after fetching `protocol_config`: + +```rust + let signal_service_address = protocol_config.signal_service; +``` + +Pass it to `Node::new()` and store it. Then use it in `entrypoint()`. + +Alternatively, expose `contract_addresses` from ExecutionLayer (it's already a field, just needs a pub getter) and store the protocol config's signal_service in it or in `Node`. 
+ +The cleanest approach: add `signal_service` to `ContractAddresses`: + +In `realtime/src/l1/config.rs`: + +```rust +#[derive(Clone)] +pub struct ContractAddresses { + pub realtime_inbox: Address, + pub proposer_multicall: Address, + pub bridge: Address, + pub signal_service: Address, +} +``` + +Set it in `ExecutionLayer::new()` after fetching the config: + +```rust + let contract_addresses = ContractAddresses { + realtime_inbox: specific_config.realtime_inbox, + proposer_multicall: specific_config.proposer_multicall, + bridge: specific_config.bridge, + signal_service: config.signalService, + }; +``` + +And add the accessor: + +```rust +impl ExecutionLayer { + pub fn contract_addresses(&self) -> &ContractAddresses { + &self.contract_addresses + } +} +``` + +Then in `Node::entrypoint()`: + +```rust + let bridge_address = self.ethereum_l1.execution_layer.contract_addresses().bridge; + let signal_service = self.ethereum_l1.execution_layer.contract_addresses().signal_service; +``` + +For the L1 provider, `ExecutionLayerCommon` has a provider. Add a public accessor: + +In `realtime/src/l1/execution_layer.rs`: + +```rust +impl ExecutionLayer { + pub fn provider(&self) -> &DynProvider { + &self.provider + } +} +``` + +- [ ] **Step 5: Add needed imports to `node/mod.rs`** + +```rust +use crate::node::proposal_manager::bridge_handler; +use tokio::sync::mpsc; +``` + +- [ ] **Step 6: Verify it compiles** + +```bash +cargo check -p realtime 2>&1 | tail -30 +``` + +Expected: PASS (no errors). 
+ +- [ ] **Step 7: Commit** + +```bash +git add -A +git commit -m "feat(realtime): wire deposit watcher into block building pipeline + +- Remove disable_bridging gate +- Start DepositWatcher in Node::entrypoint() +- BridgeHandler consumes deposits from both UserOp RPC and direct L1 events +- Direct deposits don't require a UserOp, just signal slot + message" +``` + +--- + +### Task 4: Verify end-to-end (manual) + +- [ ] **Step 1: Build the full project** + +```bash +cargo build -p realtime 2>&1 | tail -20 +``` + +Expected: successful build. + +- [ ] **Step 2: Run clippy** + +```bash +cargo clippy -p realtime -- -D warnings 2>&1 | tail -30 +``` + +Fix any warnings. + +- [ ] **Step 3: Run existing tests** + +```bash +cargo test -p realtime 2>&1 | tail -20 +``` + +- [ ] **Step 4: Commit any fixes** + +```bash +git add -A +git commit -m "fix(realtime): address clippy warnings in deposit watcher" +``` diff --git a/realtime/src/l1/deposit_watcher.rs b/realtime/src/l1/deposit_watcher.rs new file mode 100644 index 00000000..9412fedc --- /dev/null +++ b/realtime/src/l1/deposit_watcher.rs @@ -0,0 +1,214 @@ +use crate::shared_abi::bindings::{ + Bridge::MessageSent, + SignalService::SignalSent, +}; +use alloy::{ + primitives::{Address, FixedBytes}, + providers::{DynProvider, Provider}, + rpc::types::Filter, + sol_types::SolEvent, +}; +use anyhow::Result; +use common::utils::cancellation_token::CancellationToken; +use tokio::sync::mpsc; +use tracing::{debug, error, info, warn}; + +use crate::node::proposal_manager::bridge_handler::L2Call; + +/// Polls L1 for bridge deposit events and queues them for L2 processing. 
+pub struct DepositWatcher { + provider: DynProvider, + bridge_address: Address, + signal_service_address: Address, + l2_chain_id: u64, + tx: mpsc::Sender, + cancel_token: CancellationToken, +} + +impl DepositWatcher { + pub fn new( + provider: DynProvider, + bridge_address: Address, + signal_service_address: Address, + l2_chain_id: u64, + tx: mpsc::Sender, + cancel_token: CancellationToken, + ) -> Self { + Self { + provider, + bridge_address, + signal_service_address, + l2_chain_id, + tx, + cancel_token, + } + } + + /// Start polling in a background task. Returns the join handle. + pub fn start(self) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = self.run().await { + error!("DepositWatcher exited with error: {}", e); + } + }) + } + + async fn run(self) -> Result<()> { + let mut from_block = self + .provider + .get_block_number() + .await + .map_err(|e| anyhow::anyhow!("Failed to get block number: {}", e))?; + + info!( + "DepositWatcher started: bridge={}, signal_service={}, l2_chain_id={}, from_block={}", + self.bridge_address, self.signal_service_address, self.l2_chain_id, from_block + ); + + loop { + if self.cancel_token.is_cancelled() { + info!("DepositWatcher shutting down"); + return Ok(()); + } + + tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; + + let latest_block = match self.provider.get_block_number().await { + Ok(n) => n, + Err(e) => { + warn!("DepositWatcher: failed to get block number: {}", e); + continue; + } + }; + + if latest_block < from_block { + continue; + } + + match self.scan_range(from_block, latest_block).await { + Ok(count) => { + if count > 0 { + info!( + "DepositWatcher: found {} deposits in blocks {}..{}", + count, from_block, latest_block + ); + } + from_block = latest_block + 1; + } + Err(e) => { + warn!( + "DepositWatcher: error scanning blocks {}..{}: {}", + from_block, latest_block, e + ); + // Retry same range next iteration + } + } + } + } + + async fn scan_range(&self, from_block: 
u64, to_block: u64) -> Result { + // Query MessageSent events from the bridge + let bridge_filter = Filter::new() + .address(self.bridge_address) + .event_signature(MessageSent::SIGNATURE_HASH) + .from_block(from_block) + .to_block(to_block); + + let bridge_logs = self + .provider + .get_logs(&bridge_filter) + .await + .map_err(|e| anyhow::anyhow!("Failed to get MessageSent logs: {}", e))?; + + if bridge_logs.is_empty() { + return Ok(0); + } + + // Query SignalSent events from the signal service in the same range + let signal_filter = Filter::new() + .address(self.signal_service_address) + .event_signature(SignalSent::SIGNATURE_HASH) + .from_block(from_block) + .to_block(to_block); + + let signal_logs = self + .provider + .get_logs(&signal_filter) + .await + .map_err(|e| anyhow::anyhow!("Failed to get SignalSent logs: {}", e))?; + + // Index signal slots by block number + tx index for matching + let mut signal_by_tx: std::collections::HashMap<(u64, u64), FixedBytes<32>> = + std::collections::HashMap::new(); + + for log in &signal_logs { + if let (Some(block_number), Some(tx_index)) = + (log.block_number, log.transaction_index) + { + let log_data = alloy::primitives::LogData::new_unchecked( + log.topics().to_vec(), + log.data().data.clone(), + ); + if let Ok(decoded) = SignalSent::decode_log_data(&log_data) { + signal_by_tx.insert((block_number, tx_index), decoded.slot); + } + } + } + + let mut count = 0; + + for log in &bridge_logs { + let log_data = alloy::primitives::LogData::new_unchecked( + log.topics().to_vec(), + log.data().data.clone(), + ); + + let decoded = match MessageSent::decode_log_data(&log_data) { + Ok(d) => d, + Err(e) => { + warn!("Failed to decode MessageSent: {}", e); + continue; + } + }; + + // Only process messages targeting our L2 + if decoded.message.destChainId != self.l2_chain_id { + debug!( + "Skipping message with destChainId={} (want {})", + decoded.message.destChainId, self.l2_chain_id + ); + continue; + } + + // Find matching signal 
slot from the same transaction + let signal_slot = if let (Some(block_number), Some(tx_index)) = + (log.block_number, log.transaction_index) + { + signal_by_tx.get(&(block_number, tx_index)).copied() + } else { + None + }; + + let Some(signal_slot) = signal_slot else { + warn!( + "No matching SignalSent for MessageSent in block={:?} tx={:?}", + log.block_number, log.transaction_index + ); + continue; + }; + + let l2_call = L2Call { + message_from_l1: decoded.message, + signal_slot_on_l2: signal_slot, + }; + + if let Err(e) = self.tx.send(l2_call).await { + error!("Failed to queue deposit L2Call: {}", e); + } else { + count += 1; + } + } + + Ok(count) + } +} diff --git a/realtime/src/l2/execution_layer.rs b/realtime/src/l2/execution_layer.rs index b06b7b1e..d2745a05 100644 --- a/realtime/src/l2/execution_layer.rs +++ b/realtime/src/l2/execution_layer.rs @@ -234,6 +234,67 @@ impl L2ExecutionLayer { } } +// Surge: L2 UserOp execution + +use crate::node::proposal_manager::bridge_handler::UserOp; + +impl L2ExecutionLayer { + /// Construct a signed L2 transaction that executes a UserOp on L2 + /// by forwarding the calldata to the submitter smart wallet. 
+ pub async fn construct_l2_user_op_tx(&self, user_op: &UserOp) -> Result { + use alloy::signers::local::PrivateKeySigner; + use std::str::FromStr; + + debug!("Constructing L2 UserOp execution tx for submitter={}", user_op.submitter); + + let signer_address = self.l2_call_signer.get_address(); + + let nonce = self + .provider + .get_transaction_count(signer_address) + .await + .map_err(|e| anyhow::anyhow!("Failed to get nonce for L2 UserOp tx: {}", e))?; + + let typed_tx = alloy::consensus::TxEip1559 { + chain_id: self.chain_id, + nonce, + gas_limit: 1_000_000, + max_fee_per_gas: 1_000_000_000, + max_priority_fee_per_gas: 0, + to: alloy::primitives::TxKind::Call(user_op.submitter), + value: alloy::primitives::U256::ZERO, + input: user_op.calldata.clone(), + access_list: Default::default(), + }; + + let signature = match self.l2_call_signer.as_ref() { + Signer::Web3signer(web3signer, address) => { + let signature_bytes = web3signer.sign_transaction(&typed_tx, *address).await?; + Signature::try_from(signature_bytes.as_slice()) + .map_err(|e| anyhow::anyhow!("Failed to parse signature: {}", e))? + } + Signer::PrivateKey(private_key, _) => { + let signer = PrivateKeySigner::from_str(private_key.as_str())?; + AlloySigner::sign_hash(&signer, &typed_tx.signature_hash()).await? 
+ } + }; + + let sig_tx = typed_tx.into_signed(signature); + let tx_envelope = TxEnvelope::from(sig_tx); + + debug!("L2 UserOp execution tx hash: {}", tx_envelope.tx_hash()); + + let tx = Transaction { + inner: Recovered::new_unchecked(tx_envelope, signer_address), + block_hash: None, + block_number: None, + transaction_index: None, + effective_gas_price: None, + }; + Ok(tx) + } +} + // Surge: L2 EL ops for Bridge Handler pub trait L2BridgeHandlerOps { diff --git a/realtime/src/lib.rs b/realtime/src/lib.rs index 5d65fd90..deb73161 100644 --- a/realtime/src/lib.rs +++ b/realtime/src/lib.rs @@ -32,12 +32,6 @@ pub async fn create_realtime_node( ) -> Result<(), Error> { info!("Creating RealTime node"); - if !config.disable_bridging { - return Err(anyhow::anyhow!( - "Bridging is not implemented. Exiting RealTime node creation." - )); - } - let realtime_config = RealtimeConfig::read_env_variables() .map_err(|e| anyhow::anyhow!("Failed to read RealTime configuration: {}", e))?; info!("RealTime config: {}", realtime_config); diff --git a/realtime/src/node/proposal_manager/bridge_handler.rs b/realtime/src/node/proposal_manager/bridge_handler.rs index 3818dfce..37095415 100644 --- a/realtime/src/node/proposal_manager/bridge_handler.rs +++ b/realtime/src/node/proposal_manager/bridge_handler.rs @@ -83,6 +83,7 @@ pub struct L2Call { #[derive(Clone)] struct BridgeRpcContext { tx: mpsc::Sender, + l2_tx: mpsc::Sender, status_store: UserOpStatusStore, next_id: Arc, } @@ -91,6 +92,7 @@ pub struct BridgeHandler { ethereum_l1: Arc>, taiko: Arc, rx: Receiver, + l2_rx: Receiver, status_store: UserOpStatusStore, } @@ -102,10 +104,12 @@ impl BridgeHandler { cancellation_token: CancellationToken, ) -> Result { let (tx, rx) = mpsc::channel::(1024); + let (l2_tx, l2_rx) = mpsc::channel::(1024); let status_store = UserOpStatusStore::open("data/user_op_status")?; let rpc_context = BridgeRpcContext { tx, + l2_tx, status_store: status_store.clone(), next_id: Arc::new(AtomicU64::new(1)), }; @@ 
-165,6 +169,33 @@ impl BridgeHandler { } })?; + module.register_async_method("surge_sendL2UserOp", |params, ctx, _| async move { + let mut user_op: UserOp = params.parse()?; + let id = ctx.next_id.fetch_add(1, Ordering::Relaxed); + user_op.id = id; + + info!( + "Received L2 UserOp: id={}, submitter={:?}, calldata_len={}", + id, + user_op.submitter, + user_op.calldata.len() + ); + + ctx.status_store.set(id, &UserOpStatus::Pending); + + ctx.l2_tx.send(user_op).await.map_err(|e| { + error!("Failed to send L2 UserOp to queue: {}", e); + ctx.status_store.remove(id); + jsonrpsee::types::ErrorObjectOwned::owned( + -32000, + "Failed to queue L2 user operation", + Some(format!("{}", e)), + ) + })?; + + Ok::(id) + })?; + info!("Bridge handler RPC server starting on {}", addr); let handle = server.start(module); @@ -178,6 +209,7 @@ impl BridgeHandler { ethereum_l1, taiko, rx, + l2_rx, status_store, }) } @@ -246,4 +278,13 @@ impl BridgeHandler { pub fn has_pending_user_ops(&self) -> bool { !self.rx.is_empty() } + + /// Dequeue the next L2 UserOp (for bridge-out / L2 execution). + pub fn next_l2_user_op(&mut self) -> Option { + self.l2_rx.try_recv().ok() + } + + pub fn has_pending_l2_user_ops(&self) -> bool { + !self.l2_rx.is_empty() + } } diff --git a/realtime/src/node/proposal_manager/mod.rs b/realtime/src/node/proposal_manager/mod.rs index f85c3d6f..5945145d 100644 --- a/realtime/src/node/proposal_manager/mod.rs +++ b/realtime/src/node/proposal_manager/mod.rs @@ -260,7 +260,8 @@ impl BatchManager { } pub async fn has_pending_user_ops(&self) -> bool { - self.bridge_handler.lock().await.has_pending_user_ops() + let handler = self.bridge_handler.lock().await; + handler.has_pending_user_ops() || handler.has_pending_l2_user_ops() } async fn add_pending_l2_call_to_draft_block( @@ -298,6 +299,59 @@ impl BatchManager { Ok(None) } + /// Check for pending L2 UserOps (bridge-out) and insert their execution + /// transactions into the draft block's tx list. 
+ async fn add_pending_l2_user_ops_to_draft_block( + &mut self, + l2_draft_block: &mut L2BlockV2Draft, + ) -> Result<(), anyhow::Error> { + // Collect all pending L2 UserOps while holding the lock, then release it + let (l2_user_ops, status_store) = { + let mut handler = self.bridge_handler.lock().await; + let mut ops = vec![]; + while let Some(op) = handler.next_l2_user_op() { + ops.push(op); + } + (ops, handler.status_store()) + }; + + for l2_user_op in l2_user_ops { + info!( + "Processing L2 UserOp id={} submitter={}", + l2_user_op.id, l2_user_op.submitter + ); + + match self + .taiko + .l2_execution_layer() + .construct_l2_user_op_tx(&l2_user_op) + .await + { + Ok(tx) => { + info!("Inserting L2 UserOp execution tx into block tx list"); + l2_draft_block.prebuilt_tx_list.tx_list.push(tx); + status_store.set( + l2_user_op.id, + &bridge_handler::UserOpStatus::Processing { + tx_hash: FixedBytes::default(), + }, + ); + } + Err(e) => { + error!("Failed to construct L2 UserOp tx: {}", e); + status_store.set( + l2_user_op.id, + &bridge_handler::UserOpStatus::Rejected { + reason: format!("Failed to construct L2 tx: {}", e), + }, + ); + } + } + } + + Ok(()) + } + async fn add_draft_block_to_proposal( &mut self, mut l2_draft_block: L2BlockV2Draft, @@ -306,7 +360,8 @@ impl BatchManager { ) -> Result { let mut anchor_signal_slots: Vec> = vec![]; - debug!("Checking for pending L2 calls"); + // 1. L1→L2 deposits: check for pending L1 UserOps that trigger bridge messages + debug!("Checking for pending L2 calls (L1→L2 deposits)"); if let Some((user_op_data, signal_slot)) = self .add_pending_l2_call_to_draft_block(&mut l2_draft_block) .await? @@ -315,8 +370,14 @@ impl BatchManager { self.batch_builder.add_signal_slot(signal_slot)?; anchor_signal_slots.push(signal_slot); } else { - debug!("No pending L2 calls"); + debug!("No pending L1→L2 deposits"); } + + // 2. 
L2→L1 withdrawals: check for pending L2 UserOps (bridge-out) + debug!("Checking for pending L2 UserOps (L2→L1 withdrawals)"); + self.add_pending_l2_user_ops_to_draft_block(&mut l2_draft_block) + .await?; + let payload = self.batch_builder.add_l2_draft_block(l2_draft_block)?; match self From ef7cc22f587ecd4a4ac20745b5764a6a260e8371 Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 05:10:55 +0300 Subject: [PATCH 02/11] chore: remove unused deposit watcher and plan doc Co-Authored-By: Claude Opus 4.6 (1M context) --- ...2026-03-29-auto-process-bridge-deposits.md | 768 ------------------ realtime/src/l1/deposit_watcher.rs | 214 ----- 2 files changed, 982 deletions(-) delete mode 100644 docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md delete mode 100644 realtime/src/l1/deposit_watcher.rs diff --git a/docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md b/docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md deleted file mode 100644 index 64b5f499..00000000 --- a/docs/superpowers/plans/2026-03-29-auto-process-bridge-deposits.md +++ /dev/null @@ -1,768 +0,0 @@ -# Auto-Process L1 Bridge Deposits Implementation Plan - -> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. - -**Goal:** The Catalyst sequencer should automatically detect L1 bridge `MessageSent` events and include `processMessage` transactions in L2 blocks, without requiring users to submit UserOps. - -**Architecture:** A new `DepositWatcher` polls L1 for `MessageSent` events on the bridge contract. Discovered deposits are queued as `L2Call` structs into the existing `BridgeHandler`, which already knows how to construct `processMessage` L2 transactions and inject them into block building. The `disable_bridging` gate is removed so the bridge handler starts. 
- -**Tech Stack:** Rust, Alloy (Ethereum RPC), Tokio (async), existing Catalyst crate structure - ---- - -## File Map - -| File | Action | Responsibility | -|------|--------|---------------| -| `realtime/src/l1/deposit_watcher.rs` | Create | Poll L1 bridge for MessageSent+SignalSent events, queue L2Calls | -| `realtime/src/l1/mod.rs` | Modify | Add `pub mod deposit_watcher;` | -| `realtime/src/node/proposal_manager/bridge_handler.rs` | Modify | Add deposit queue intake alongside UserOp queue | -| `realtime/src/node/proposal_manager/mod.rs` | Modify | Drain deposit queue in block building | -| `realtime/src/lib.rs` | Modify | Remove `disable_bridging` gate, start watcher | - ---- - -### Task 1: Create the L1 Deposit Watcher - -**Files:** -- Create: `realtime/src/l1/deposit_watcher.rs` -- Modify: `realtime/src/l1/mod.rs` - -- [ ] **Step 1: Create `deposit_watcher.rs`** - -This module polls L1 for `MessageSent` events on the bridge contract and co-located `SignalSent` events on the signal service. It filters for messages targeting our L2 chain ID and sends discovered `(Message, signal_slot)` pairs through a channel. - -```rust -// realtime/src/l1/deposit_watcher.rs - -use crate::shared_abi::bindings::{ - Bridge::MessageSent, - IBridge::Message, - SignalService::SignalSent, -}; -use alloy::{ - primitives::{Address, FixedBytes}, - providers::{DynProvider, Provider}, - rpc::types::Filter, - sol_types::SolEvent, -}; -use anyhow::Result; -use common::utils::cancellation_token::CancellationToken; -use std::sync::Arc; -use tokio::sync::mpsc; -use tracing::{debug, error, info, warn}; - -use crate::node::proposal_manager::bridge_handler::L2Call; - -/// Polls L1 for bridge deposit events and queues them for L2 processing. 
-pub struct DepositWatcher { - provider: DynProvider, - bridge_address: Address, - signal_service_address: Address, - l2_chain_id: u64, - tx: mpsc::Sender, - cancel_token: CancellationToken, -} - -impl DepositWatcher { - pub fn new( - provider: DynProvider, - bridge_address: Address, - signal_service_address: Address, - l2_chain_id: u64, - tx: mpsc::Sender, - cancel_token: CancellationToken, - ) -> Self { - Self { - provider, - bridge_address, - signal_service_address, - l2_chain_id, - tx, - cancel_token, - } - } - - /// Start polling in a background task. Returns the join handle. - pub fn start(self) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - if let Err(e) = self.run().await { - error!("DepositWatcher exited with error: {}", e); - } - }) - } - - async fn run(self) -> Result<()> { - // Start from the latest block - let mut from_block = self - .provider - .get_block_number() - .await - .map_err(|e| anyhow::anyhow!("Failed to get block number: {}", e))?; - - info!( - "DepositWatcher started: bridge={}, signal_service={}, l2_chain_id={}, from_block={}", - self.bridge_address, self.signal_service_address, self.l2_chain_id, from_block - ); - - loop { - if self.cancel_token.is_cancelled() { - info!("DepositWatcher shutting down"); - return Ok(()); - } - - tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; - - let latest_block = match self.provider.get_block_number().await { - Ok(n) => n, - Err(e) => { - warn!("DepositWatcher: failed to get block number: {}", e); - continue; - } - }; - - if latest_block < from_block { - continue; - } - - match self.scan_range(from_block, latest_block).await { - Ok(count) => { - if count > 0 { - info!( - "DepositWatcher: found {} deposits in blocks {}..{}", - count, from_block, latest_block - ); - } - from_block = latest_block + 1; - } - Err(e) => { - warn!( - "DepositWatcher: error scanning blocks {}..{}: {}", - from_block, latest_block, e - ); - // Retry same range next iteration - } - } - } - } - - async 
fn scan_range(&self, from_block: u64, to_block: u64) -> Result { - // Query MessageSent events from the bridge - let bridge_filter = Filter::new() - .address(self.bridge_address) - .event_signature(MessageSent::SIGNATURE_HASH) - .from_block(from_block) - .to_block(to_block); - - let bridge_logs = self - .provider - .get_logs(&bridge_filter) - .await - .map_err(|e| anyhow::anyhow!("Failed to get MessageSent logs: {}", e))?; - - if bridge_logs.is_empty() { - return Ok(0); - } - - // Query SignalSent events from the signal service in the same range - let signal_filter = Filter::new() - .address(self.signal_service_address) - .event_signature(SignalSent::SIGNATURE_HASH) - .from_block(from_block) - .to_block(to_block); - - let signal_logs = self - .provider - .get_logs(&signal_filter) - .await - .map_err(|e| anyhow::anyhow!("Failed to get SignalSent logs: {}", e))?; - - // Index signal slots by block number + tx index for matching - let mut signal_by_tx: std::collections::HashMap<(u64, u64), FixedBytes<32>> = - std::collections::HashMap::new(); - - for log in &signal_logs { - if let (Some(block_number), Some(tx_index)) = - (log.block_number, log.transaction_index) - { - let log_data = alloy::primitives::LogData::new_unchecked( - log.topics().to_vec(), - log.data().data.clone(), - ); - if let Ok(decoded) = SignalSent::decode_log_data(&log_data) { - signal_by_tx.insert((block_number, tx_index), decoded.slot); - } - } - } - - let mut count = 0; - - for log in &bridge_logs { - let log_data = alloy::primitives::LogData::new_unchecked( - log.topics().to_vec(), - log.data().data.clone(), - ); - - let decoded = match MessageSent::decode_log_data(&log_data) { - Ok(d) => d, - Err(e) => { - warn!("Failed to decode MessageSent: {}", e); - continue; - } - }; - - // Only process messages targeting our L2 - if decoded.message.destChainId != self.l2_chain_id { - debug!( - "Skipping message with destChainId={} (want {})", - decoded.message.destChainId, self.l2_chain_id - ); - continue; 
- } - - // Find matching signal slot from the same transaction - let signal_slot = if let (Some(block_number), Some(tx_index)) = - (log.block_number, log.transaction_index) - { - signal_by_tx.get(&(block_number, tx_index)).copied() - } else { - None - }; - - let Some(signal_slot) = signal_slot else { - warn!( - "No matching SignalSent for MessageSent in block={:?} tx={:?}", - log.block_number, log.transaction_index - ); - continue; - }; - - let l2_call = L2Call { - message_from_l1: decoded.message, - signal_slot_on_l2: signal_slot, - }; - - if let Err(e) = self.tx.send(l2_call).await { - error!("Failed to queue deposit L2Call: {}", e); - } else { - count += 1; - } - } - - Ok(count) - } -} -``` - -- [ ] **Step 2: Register the module in `l1/mod.rs`** - -Add the new module to `realtime/src/l1/mod.rs`: - -```rust -pub mod bindings; -pub mod config; -pub mod deposit_watcher; -pub mod execution_layer; -pub mod proposal_tx_builder; -pub mod protocol_config; -``` - -- [ ] **Step 3: Verify it compiles** - -Run from `/tmp/catalyst`: -```bash -cargo check -p realtime 2>&1 | tail -20 -``` - -Expected: may have unused warnings but no errors (the watcher isn't wired in yet). - -- [ ] **Step 4: Commit** - -```bash -git add realtime/src/l1/deposit_watcher.rs realtime/src/l1/mod.rs -git commit -m "feat(realtime): add L1 deposit watcher for bridge MessageSent events" -``` - ---- - -### Task 2: Add deposit queue to BridgeHandler - -**Files:** -- Modify: `realtime/src/node/proposal_manager/bridge_handler.rs` - -The `BridgeHandler` currently only receives bridge data from UserOps via the RPC channel. We add a second channel for direct L1 deposits discovered by the watcher. 
- -- [ ] **Step 1: Add deposit receiver field and constructor parameter** - -In `realtime/src/node/proposal_manager/bridge_handler.rs`, add the deposit channel: - -Add a new field to `BridgeHandler`: - -```rust -pub struct BridgeHandler { - ethereum_l1: Arc>, - taiko: Arc, - rx: Receiver, - deposit_rx: Receiver, - status_store: UserOpStatusStore, -} -``` - -Update `BridgeHandler::new()` to accept and store the receiver: - -```rust - pub async fn new( - addr: SocketAddr, - ethereum_l1: Arc>, - taiko: Arc, - cancellation_token: CancellationToken, - deposit_rx: Receiver, - ) -> Result { -``` - -And in the return value: - -```rust - Ok(Self { - ethereum_l1, - taiko, - rx, - deposit_rx, - status_store, - }) -``` - -- [ ] **Step 2: Add `next_deposit_l2_call()` method** - -Add a method that drains the deposit queue, returning the next L2Call from direct deposits: - -```rust - pub fn next_deposit_l2_call(&mut self) -> Option { - self.deposit_rx.try_recv().ok() - } - - pub fn has_pending_deposits(&self) -> bool { - !self.deposit_rx.is_empty() - } -``` - -- [ ] **Step 3: Verify it compiles** - -```bash -cargo check -p realtime 2>&1 | tail -20 -``` - -Expected: errors in `mod.rs` where `BridgeHandler::new()` is called without the new parameter. We fix that in Task 3. - -- [ ] **Step 4: Commit** - -```bash -git add realtime/src/node/proposal_manager/bridge_handler.rs -git commit -m "feat(realtime): add deposit receiver channel to BridgeHandler" -``` - ---- - -### Task 3: Wire deposit watcher into node startup and block building - -**Files:** -- Modify: `realtime/src/node/proposal_manager/mod.rs` -- Modify: `realtime/src/lib.rs` - -- [ ] **Step 1: Create deposit channel and pass to BridgeHandler in `mod.rs`** - -In `realtime/src/node/proposal_manager/mod.rs`, update `BatchManager::new()` to create the deposit channel and return the sender: - -Add `use tokio::sync::mpsc;` at the top (already imported for other uses). 
- -Change the return type and body of `BatchManager::new()`: - -```rust - pub async fn new( - l1_height_lag: u64, - config: BatchBuilderConfig, - ethereum_l1: Arc>, - taiko: Arc, - metrics: Arc, - cancel_token: CancellationToken, - last_finalized_block_hash: B256, - raiko_client: RaikoClient, - basefee_sharing_pctg: u8, - proof_request_bypass: bool, - ) -> Result<(Self, mpsc::Sender), Error> { - // ... existing code ... - - let (deposit_tx, deposit_rx) = mpsc::channel::(256); - - let bridge_addr: SocketAddr = "0.0.0.0:4545".parse()?; - let bridge_handler = Arc::new(Mutex::new( - BridgeHandler::new( - bridge_addr, - ethereum_l1.clone(), - taiko.clone(), - cancel_token.clone(), - deposit_rx, - ) - .await?, - )); - - // ... rest unchanged ... - - Ok((Self { - batch_builder: BatchBuilder::new( - config, - ethereum_l1.slot_clock.clone(), - metrics.clone(), - ), - async_submitter, - bridge_handler, - ethereum_l1, - taiko, - l1_height_lag, - metrics, - cancel_token, - last_finalized_block_hash, - }, deposit_tx)) - } -``` - -- [ ] **Step 2: Add deposit consumption to block building** - -In the same file, update `add_pending_l2_call_to_draft_block()` to also check deposits when no UserOp is pending: - -```rust - async fn add_pending_l2_call_to_draft_block( - &mut self, - l2_draft_block: &mut L2BlockV2Draft, - ) -> Result, FixedBytes<32>)>, anyhow::Error> { - // First, try UserOp-triggered L2 calls (existing behavior) - if let Some((user_op_data, l2_call)) = self - .bridge_handler - .lock() - .await - .next_user_op_and_l2_call() - .await? 
- { - info!("Processing pending L2 call from UserOp: {:?}", l2_call); - - let l2_call_bridge_tx = self - .taiko - .l2_execution_layer() - .construct_l2_call_tx(l2_call.message_from_l1) - .await?; - - info!( - "Inserting L2 call bridge transaction into tx list: {:?}", - l2_call_bridge_tx - ); - - l2_draft_block - .prebuilt_tx_list - .tx_list - .push(l2_call_bridge_tx); - - return Ok(Some((Some(user_op_data), l2_call.signal_slot_on_l2))); - } - - // Then, try direct deposit L2 calls from the watcher - if let Some(l2_call) = self.bridge_handler.lock().await.next_deposit_l2_call() { - info!( - "Processing pending L2 call from direct deposit: destOwner={}, value={}", - l2_call.message_from_l1.destOwner, l2_call.message_from_l1.value - ); - - let l2_call_bridge_tx = self - .taiko - .l2_execution_layer() - .construct_l2_call_tx(l2_call.message_from_l1) - .await?; - - l2_draft_block - .prebuilt_tx_list - .tx_list - .push(l2_call_bridge_tx); - - return Ok(Some((None, l2_call.signal_slot_on_l2))); - } - - Ok(None) - } -``` - -Update `add_draft_block_to_proposal()` to handle the `Option`: - -```rust - async fn add_draft_block_to_proposal( - &mut self, - mut l2_draft_block: L2BlockV2Draft, - l2_slot_context: &L2SlotContext, - operation_type: OperationType, - ) -> Result { - let mut anchor_signal_slots: Vec> = vec![]; - - debug!("Checking for pending L2 calls"); - if let Some((maybe_user_op, signal_slot)) = self - .add_pending_l2_call_to_draft_block(&mut l2_draft_block) - .await? - { - if let Some(user_op_data) = maybe_user_op { - self.batch_builder.add_user_op(user_op_data)?; - } - self.batch_builder.add_signal_slot(signal_slot)?; - anchor_signal_slots.push(signal_slot); - } else { - debug!("No pending L2 calls"); - } - // ... rest unchanged ... 
- } -``` - -Also update `has_pending_user_ops()` to include deposits: - -```rust - pub async fn has_pending_user_ops(&self) -> bool { - let handler = self.bridge_handler.lock().await; - handler.has_pending_user_ops() || handler.has_pending_deposits() - } -``` - -- [ ] **Step 3: Update `lib.rs` — remove gate, start watcher, fix `BatchManager::new()` call** - -In `realtime/src/lib.rs`: - -Remove the `disable_bridging` gate (lines 35-39): - -```rust - // DELETE these lines: - // if !config.disable_bridging { - // return Err(anyhow::anyhow!( - // "Bridging is not implemented. Exiting RealTime node creation." - // )); - // } -``` - -Update the `BatchManager::new()` call site in `Node::new()`. Since `BatchManager::new()` now returns a tuple, update `node/mod.rs` `Node::new()`: - -In `realtime/src/node/mod.rs`, change: - -```rust - let proposal_manager = BatchManager::new( - // ... args ... - ) - .await - .map_err(|e| anyhow::anyhow!("Failed to create BatchManager: {}", e))?; -``` - -To: - -```rust - let (proposal_manager, deposit_tx) = BatchManager::new( - // ... args ... - ) - .await - .map_err(|e| anyhow::anyhow!("Failed to create BatchManager: {}", e))?; -``` - -Then return `deposit_tx` from `Node::new()` by adding it to the struct or passing it back. The simplest approach: store it in `Node` temporarily and start the watcher in `entrypoint()`. - -Add `deposit_tx` field to `Node`: - -```rust -pub struct Node { - // ... existing fields ... - deposit_tx: Option>, -} -``` - -Set it in `Node::new()`: - -```rust - Ok(Self { - // ... existing fields ... - deposit_tx: Some(deposit_tx), - }) -``` - -Then in `lib.rs`, after creating the node but before calling `entrypoint()`, start the watcher. Actually, it's cleaner to start it inside `entrypoint()`. Update `entrypoint()` in `node/mod.rs`: - -```rust - pub async fn entrypoint(mut self) -> Result<(), Error> { - info!("Starting RealTime node"); - - if let Err(err) = self.warmup().await { - error!("Failed to warm up node: {}. 
Shutting down.", err); - self.cancel_token.cancel_on_critical_error(); - return Err(anyhow::anyhow!(err)); - } - - info!("Node warmup successful"); - - // Start the L1 deposit watcher if we have the channel - if let Some(deposit_tx) = self.deposit_tx.take() { - let l1_provider = self.ethereum_l1.execution_layer.common().provider().clone(); - let bridge_address = self.ethereum_l1.execution_layer.contract_addresses().bridge; - let signal_service = self.ethereum_l1.execution_layer.protocol_config().signal_service; - let l2_chain_id = self.taiko.l2_execution_layer().chain_id; - - let watcher = crate::l1::deposit_watcher::DepositWatcher::new( - l1_provider, - bridge_address, - signal_service, - l2_chain_id, - deposit_tx, - self.cancel_token.clone(), - ); - watcher.start(); - info!("L1 deposit watcher started"); - } - - tokio::spawn(async move { - self.preconfirmation_loop().await; - }); - - Ok(()) - } -``` - -- [ ] **Step 4: Expose needed fields from ExecutionLayer** - -In `realtime/src/l1/execution_layer.rs`, add accessor methods: - -```rust -impl ExecutionLayer { - pub fn contract_addresses(&self) -> &ContractAddresses { - &self.contract_addresses - } - - pub fn protocol_config_ref(&self) -> &ProtocolConfig { - // We need to store the protocol config. Add a field or fetch it. - // Simplest: store it during construction. - } -} -``` - -Actually, the protocol config is fetched after `ExecutionLayer` is created (in `lib.rs:66`). The simplest approach: pass the signal service address through to `Node`. Add it as a parameter to `Node::new()`: - -In `lib.rs`, after fetching `protocol_config`: - -```rust - let signal_service_address = protocol_config.signal_service; -``` - -Pass it to `Node::new()` and store it. Then use it in `entrypoint()`. - -Alternatively, expose `contract_addresses` from ExecutionLayer (it's already a field, just needs a pub getter) and store the protocol config's signal_service in it or in `Node`. 
- -The cleanest approach: add `signal_service` to `ContractAddresses`: - -In `realtime/src/l1/config.rs`: - -```rust -#[derive(Clone)] -pub struct ContractAddresses { - pub realtime_inbox: Address, - pub proposer_multicall: Address, - pub bridge: Address, - pub signal_service: Address, -} -``` - -Set it in `ExecutionLayer::new()` after fetching the config: - -```rust - let contract_addresses = ContractAddresses { - realtime_inbox: specific_config.realtime_inbox, - proposer_multicall: specific_config.proposer_multicall, - bridge: specific_config.bridge, - signal_service: config.signalService, - }; -``` - -And add the accessor: - -```rust -impl ExecutionLayer { - pub fn contract_addresses(&self) -> &ContractAddresses { - &self.contract_addresses - } -} -``` - -Then in `Node::entrypoint()`: - -```rust - let bridge_address = self.ethereum_l1.execution_layer.contract_addresses().bridge; - let signal_service = self.ethereum_l1.execution_layer.contract_addresses().signal_service; -``` - -For the L1 provider, `ExecutionLayerCommon` has a provider. Add a public accessor: - -In `realtime/src/l1/execution_layer.rs`: - -```rust -impl ExecutionLayer { - pub fn provider(&self) -> &DynProvider { - &self.provider - } -} -``` - -- [ ] **Step 5: Add needed imports to `node/mod.rs`** - -```rust -use crate::node::proposal_manager::bridge_handler; -use tokio::sync::mpsc; -``` - -- [ ] **Step 6: Verify it compiles** - -```bash -cargo check -p realtime 2>&1 | tail -30 -``` - -Expected: PASS (no errors). 
- -- [ ] **Step 7: Commit** - -```bash -git add -A -git commit -m "feat(realtime): wire deposit watcher into block building pipeline - -- Remove disable_bridging gate -- Start DepositWatcher in Node::entrypoint() -- BridgeHandler consumes deposits from both UserOp RPC and direct L1 events -- Direct deposits don't require a UserOp, just signal slot + message" -``` - ---- - -### Task 4: Verify end-to-end (manual) - -- [ ] **Step 1: Build the full project** - -```bash -cargo build -p realtime 2>&1 | tail -20 -``` - -Expected: successful build. - -- [ ] **Step 2: Run clippy** - -```bash -cargo clippy -p realtime -- -D warnings 2>&1 | tail -30 -``` - -Fix any warnings. - -- [ ] **Step 3: Run existing tests** - -```bash -cargo test -p realtime 2>&1 | tail -20 -``` - -- [ ] **Step 4: Commit any fixes** - -```bash -git add -A -git commit -m "fix(realtime): address clippy warnings in deposit watcher" -``` diff --git a/realtime/src/l1/deposit_watcher.rs b/realtime/src/l1/deposit_watcher.rs deleted file mode 100644 index 9412fedc..00000000 --- a/realtime/src/l1/deposit_watcher.rs +++ /dev/null @@ -1,214 +0,0 @@ -use crate::shared_abi::bindings::{ - Bridge::MessageSent, - SignalService::SignalSent, -}; -use alloy::{ - primitives::{Address, FixedBytes}, - providers::{DynProvider, Provider}, - rpc::types::Filter, - sol_types::SolEvent, -}; -use anyhow::Result; -use common::utils::cancellation_token::CancellationToken; -use tokio::sync::mpsc; -use tracing::{debug, error, info, warn}; - -use crate::node::proposal_manager::bridge_handler::L2Call; - -/// Polls L1 for bridge deposit events and queues them for L2 processing. 
-pub struct DepositWatcher { - provider: DynProvider, - bridge_address: Address, - signal_service_address: Address, - l2_chain_id: u64, - tx: mpsc::Sender, - cancel_token: CancellationToken, -} - -impl DepositWatcher { - pub fn new( - provider: DynProvider, - bridge_address: Address, - signal_service_address: Address, - l2_chain_id: u64, - tx: mpsc::Sender, - cancel_token: CancellationToken, - ) -> Self { - Self { - provider, - bridge_address, - signal_service_address, - l2_chain_id, - tx, - cancel_token, - } - } - - /// Start polling in a background task. Returns the join handle. - pub fn start(self) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - if let Err(e) = self.run().await { - error!("DepositWatcher exited with error: {}", e); - } - }) - } - - async fn run(self) -> Result<()> { - let mut from_block = self - .provider - .get_block_number() - .await - .map_err(|e| anyhow::anyhow!("Failed to get block number: {}", e))?; - - info!( - "DepositWatcher started: bridge={}, signal_service={}, l2_chain_id={}, from_block={}", - self.bridge_address, self.signal_service_address, self.l2_chain_id, from_block - ); - - loop { - if self.cancel_token.is_cancelled() { - info!("DepositWatcher shutting down"); - return Ok(()); - } - - tokio::time::sleep(tokio::time::Duration::from_secs(6)).await; - - let latest_block = match self.provider.get_block_number().await { - Ok(n) => n, - Err(e) => { - warn!("DepositWatcher: failed to get block number: {}", e); - continue; - } - }; - - if latest_block < from_block { - continue; - } - - match self.scan_range(from_block, latest_block).await { - Ok(count) => { - if count > 0 { - info!( - "DepositWatcher: found {} deposits in blocks {}..{}", - count, from_block, latest_block - ); - } - from_block = latest_block + 1; - } - Err(e) => { - warn!( - "DepositWatcher: error scanning blocks {}..{}: {}", - from_block, latest_block, e - ); - // Retry same range next iteration - } - } - } - } - - async fn scan_range(&self, from_block: 
u64, to_block: u64) -> Result { - // Query MessageSent events from the bridge - let bridge_filter = Filter::new() - .address(self.bridge_address) - .event_signature(MessageSent::SIGNATURE_HASH) - .from_block(from_block) - .to_block(to_block); - - let bridge_logs = self - .provider - .get_logs(&bridge_filter) - .await - .map_err(|e| anyhow::anyhow!("Failed to get MessageSent logs: {}", e))?; - - if bridge_logs.is_empty() { - return Ok(0); - } - - // Query SignalSent events from the signal service in the same range - let signal_filter = Filter::new() - .address(self.signal_service_address) - .event_signature(SignalSent::SIGNATURE_HASH) - .from_block(from_block) - .to_block(to_block); - - let signal_logs = self - .provider - .get_logs(&signal_filter) - .await - .map_err(|e| anyhow::anyhow!("Failed to get SignalSent logs: {}", e))?; - - // Index signal slots by block number + tx index for matching - let mut signal_by_tx: std::collections::HashMap<(u64, u64), FixedBytes<32>> = - std::collections::HashMap::new(); - - for log in &signal_logs { - if let (Some(block_number), Some(tx_index)) = - (log.block_number, log.transaction_index) - { - let log_data = alloy::primitives::LogData::new_unchecked( - log.topics().to_vec(), - log.data().data.clone(), - ); - if let Ok(decoded) = SignalSent::decode_log_data(&log_data) { - signal_by_tx.insert((block_number, tx_index), decoded.slot); - } - } - } - - let mut count = 0; - - for log in &bridge_logs { - let log_data = alloy::primitives::LogData::new_unchecked( - log.topics().to_vec(), - log.data().data.clone(), - ); - - let decoded = match MessageSent::decode_log_data(&log_data) { - Ok(d) => d, - Err(e) => { - warn!("Failed to decode MessageSent: {}", e); - continue; - } - }; - - // Only process messages targeting our L2 - if decoded.message.destChainId != self.l2_chain_id { - debug!( - "Skipping message with destChainId={} (want {})", - decoded.message.destChainId, self.l2_chain_id - ); - continue; - } - - // Find matching signal 
slot from the same transaction - let signal_slot = if let (Some(block_number), Some(tx_index)) = - (log.block_number, log.transaction_index) - { - signal_by_tx.get(&(block_number, tx_index)).copied() - } else { - None - }; - - let Some(signal_slot) = signal_slot else { - warn!( - "No matching SignalSent for MessageSent in block={:?} tx={:?}", - log.block_number, log.transaction_index - ); - continue; - }; - - let l2_call = L2Call { - message_from_l1: decoded.message, - signal_slot_on_l2: signal_slot, - }; - - if let Err(e) = self.tx.send(l2_call).await { - error!("Failed to queue deposit L2Call: {}", e); - } else { - count += 1; - } - } - - Ok(count) - } -} From 63eecee4b2949f7fee683c299e5ddb15af3530d1 Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 05:38:31 +0300 Subject: [PATCH 03/11] refactor(realtime): single surge_sendUserOp RPC with auto chain detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of separate RPC methods for L1 and L2 UserOps, the single surge_sendUserOp endpoint now auto-detects the target chain by parsing the EIP-712 signature in the executeBatch calldata. The UserOpsSubmitter's EIP-712 domain includes chainId, so the signature is only valid for one chain. We compute the EIP-712 digest for both L1 and L2 chain IDs, ecrecover each, and route accordingly: - L1 signature → L1→L2 deposit flow (simulate on L1, processMessage on L2) - L2 signature → L2 direct execution (UserOp tx in L2 block, L2→L1 relay via find_l1_call) Both types can coexist in the same block. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- realtime/src/lib.rs | 8 + realtime/src/node/mod.rs | 4 + .../node/proposal_manager/bridge_handler.rs | 251 +++++++++++++----- realtime/src/node/proposal_manager/mod.rs | 151 +++++------ 4 files changed, 262 insertions(+), 152 deletions(-) diff --git a/realtime/src/lib.rs b/realtime/src/lib.rs index deb73161..bf84bced 100644 --- a/realtime/src/lib.rs +++ b/realtime/src/lib.rs @@ -131,6 +131,12 @@ pub async fn create_realtime_node( let proof_request_bypass = realtime_config.proof_request_bypass; let raiko_client = raiko::RaikoClient::new(&realtime_config); + let l1_chain_id = { + use common::l1::traits::ELTrait; + ethereum_l1.execution_layer.common().chain_id() + }; + let l2_chain_id = taiko.l2_execution_layer().chain_id; + let node = Node::new( node_config, cancel_token.clone(), @@ -145,6 +151,8 @@ pub async fn create_realtime_node( protocol_config.basefee_sharing_pctg, preconf_only, proof_request_bypass, + l1_chain_id, + l2_chain_id, ) .await .map_err(|e| anyhow::anyhow!("Failed to create Node: {}", e))?; diff --git a/realtime/src/node/mod.rs b/realtime/src/node/mod.rs index 33c5cf28..dc7f4c98 100644 --- a/realtime/src/node/mod.rs +++ b/realtime/src/node/mod.rs @@ -56,6 +56,8 @@ impl Node { basefee_sharing_pctg: u8, preconf_only: bool, proof_request_bypass: bool, + l1_chain_id: u64, + l2_chain_id: u64, ) -> Result { let operator = Operator::new( ethereum_l1.execution_layer.clone(), @@ -85,6 +87,8 @@ impl Node { raiko_client, basefee_sharing_pctg, proof_request_bypass, + l1_chain_id, + l2_chain_id, ) .await .map_err(|e| anyhow::anyhow!("Failed to create BatchManager: {}", e))?; diff --git a/realtime/src/node/proposal_manager/bridge_handler.rs b/realtime/src/node/proposal_manager/bridge_handler.rs index 37095415..b1e953a6 100644 --- a/realtime/src/node/proposal_manager/bridge_handler.rs +++ b/realtime/src/node/proposal_manager/bridge_handler.rs @@ -80,10 +80,121 @@ pub struct L2Call { pub signal_slot_on_l2: 
FixedBytes<32>, } +/// Result of routing a UserOp: either it targets L1 (and triggers an L2 bridge call) +/// or it targets L2 (for direct execution on L2, e.g. bridge-out). +pub enum UserOpRouting { + /// L1 UserOp that triggers a bridge deposit (L1→L2). + L1ToL2 { user_op: UserOp, l2_call: L2Call }, + /// L2 UserOp for direct execution on L2 (e.g. bridge-out L2→L1). + L2Direct { user_op: UserOp }, +} + +/// Determine the target chain of a UserOp by checking the EIP-712 signature. +/// +/// The UserOpsSubmitter uses EIP-712 with domain `(name="UserOpsSubmitter", version="1", +/// chainId, verifyingContract=submitter)`. We decode the `executeBatch(ops[], signature)` +/// calldata, compute the EIP-712 digest for both L1 and L2 chain IDs, and ecrecover. +/// The chain ID that produces a valid recovery (non-zero address) is the target chain. +fn detect_target_chain(user_op: &UserOp, l1_chain_id: u64, l2_chain_id: u64) -> Option { + use alloy::sol; + use alloy::sol_types::{SolCall, SolValue}; + + // ABI definition matching UserOpsSubmitter.executeBatch + sol! 
{ + struct UserOpSol { + address target; + uint256 value; + bytes data; + } + + function executeBatch(UserOpSol[] calldata _ops, bytes calldata _signature) external; + } + + // Decode the calldata + let decoded = executeBatchCall::abi_decode(&user_op.calldata).ok()?; + let ops = &decoded._ops; + let signature = &decoded._signature; + + if signature.len() != 65 { + warn!("UserOp id={}: signature length {} != 65", user_op.id, signature.len()); + return None; + } + + // EIP-712 type hashes + let userop_typehash = alloy::primitives::keccak256( + b"UserOp(address target,uint256 value,bytes data)", + ); + let executebatch_typehash = alloy::primitives::keccak256( + b"ExecuteBatch(UserOp[] ops)UserOp(address target,uint256 value,bytes data)", + ); + + // Hash each op: keccak256(abi.encode(typehash, target, value, keccak256(data))) + let mut op_hashes = Vec::with_capacity(ops.len()); + for op in ops { + let data_hash = alloy::primitives::keccak256(&op.data); + let encoded = (userop_typehash, op.target, op.value, data_hash).abi_encode(); + op_hashes.push(alloy::primitives::keccak256(&encoded)); + } + + // keccak256(abi.encodePacked(opHashes)) + let mut packed = Vec::with_capacity(op_hashes.len() * 32); + for h in &op_hashes { + packed.extend_from_slice(h.as_slice()); + } + let ops_array_hash = alloy::primitives::keccak256(&packed); + + // struct hash = keccak256(abi.encode(EXECUTEBATCH_TYPEHASH, ops_array_hash)) + let struct_hash = alloy::primitives::keccak256( + &(executebatch_typehash, ops_array_hash).abi_encode(), + ); + + // Parse the 65-byte signature + let sig = alloy::signers::Signature::try_from(signature.as_ref()).ok()?; + + // Try both chain IDs + for chain_id in [l1_chain_id, l2_chain_id] { + let domain_separator = compute_domain_separator(chain_id, user_op.submitter); + + // EIP-712 digest: keccak256("\x19\x01" || domainSeparator || structHash) + let mut digest_input = Vec::with_capacity(2 + 32 + 32); + digest_input.extend_from_slice(&[0x19, 0x01]); + 
digest_input.extend_from_slice(domain_separator.as_slice()); + digest_input.extend_from_slice(struct_hash.as_slice()); + let digest = alloy::primitives::keccak256(&digest_input); + + if let Ok(recovered) = sig.recover_address_from_prehash(&digest) { + if recovered != Address::ZERO { + info!( + "UserOp id={}: signature valid for chain_id={} (recovered={})", + user_op.id, chain_id, recovered + ); + return Some(chain_id); + } + } + } + + warn!("UserOp id={}: could not determine target chain", user_op.id); + None +} + +/// Compute EIP-712 domain separator for UserOpsSubmitter(name="UserOpsSubmitter", version="1") +fn compute_domain_separator(chain_id: u64, verifying_contract: Address) -> B256 { + use alloy::sol_types::SolValue; + + let type_hash = alloy::primitives::keccak256( + b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)", + ); + let name_hash = alloy::primitives::keccak256(b"UserOpsSubmitter"); + let version_hash = alloy::primitives::keccak256(b"1"); + + alloy::primitives::keccak256( + &(type_hash, name_hash, version_hash, alloy::primitives::U256::from(chain_id), verifying_contract).abi_encode(), + ) +} + #[derive(Clone)] struct BridgeRpcContext { tx: mpsc::Sender, - l2_tx: mpsc::Sender, status_store: UserOpStatusStore, next_id: Arc, } @@ -92,8 +203,9 @@ pub struct BridgeHandler { ethereum_l1: Arc>, taiko: Arc, rx: Receiver, - l2_rx: Receiver, status_store: UserOpStatusStore, + l1_chain_id: u64, + l2_chain_id: u64, } impl BridgeHandler { @@ -102,14 +214,14 @@ impl BridgeHandler { ethereum_l1: Arc>, taiko: Arc, cancellation_token: CancellationToken, + l1_chain_id: u64, + l2_chain_id: u64, ) -> Result { let (tx, rx) = mpsc::channel::(1024); - let (l2_tx, l2_rx) = mpsc::channel::(1024); let status_store = UserOpStatusStore::open("data/user_op_status")?; let rpc_context = BridgeRpcContext { tx, - l2_tx, status_store: status_store.clone(), next_id: Arc::new(AtomicU64::new(1)), }; @@ -169,33 +281,6 @@ impl BridgeHandler { } })?; - 
module.register_async_method("surge_sendL2UserOp", |params, ctx, _| async move { - let mut user_op: UserOp = params.parse()?; - let id = ctx.next_id.fetch_add(1, Ordering::Relaxed); - user_op.id = id; - - info!( - "Received L2 UserOp: id={}, submitter={:?}, calldata_len={}", - id, - user_op.submitter, - user_op.calldata.len() - ); - - ctx.status_store.set(id, &UserOpStatus::Pending); - - ctx.l2_tx.send(user_op).await.map_err(|e| { - error!("Failed to send L2 UserOp to queue: {}", e); - ctx.status_store.remove(id); - jsonrpsee::types::ErrorObjectOwned::owned( - -32000, - "Failed to queue L2 user operation", - Some(format!("{}", e)), - ) - })?; - - Ok::(id) - })?; - info!("Bridge handler RPC server starting on {}", addr); let handle = server.start(module); @@ -209,8 +294,9 @@ impl BridgeHandler { ethereum_l1, taiko, rx, - l2_rx, status_store, + l1_chain_id, + l2_chain_id, }) } @@ -218,38 +304,74 @@ impl BridgeHandler { self.status_store.clone() } - pub async fn next_user_op_and_l2_call( + /// Dequeue the next UserOp and route it to the correct chain. + /// + /// Parses the EIP-712 signature in the `executeBatch` calldata to determine + /// which chain the UserOp targets. If signed for L1, simulates on L1 to + /// extract the bridge message. If signed for L2, returns it for direct + /// L2 block inclusion. + pub async fn next_user_op_routed( &mut self, - ) -> Result, anyhow::Error> { - if let Ok(user_op) = self.rx.try_recv() { - if let Some((message_from_l1, signal_slot_on_l2)) = self - .ethereum_l1 - .execution_layer - .find_message_and_signal_slot(user_op.clone()) - .await? 
- { - return Ok(Some(( - user_op, - L2Call { - message_from_l1, - signal_slot_on_l2, + ) -> Result, anyhow::Error> { + let Ok(user_op) = self.rx.try_recv() else { + return Ok(None); + }; + + let target_chain = detect_target_chain(&user_op, self.l1_chain_id, self.l2_chain_id); + + match target_chain { + Some(chain_id) if chain_id == self.l1_chain_id => { + // L1 UserOp — simulate on L1 to extract bridge message + if let Some((message_from_l1, signal_slot_on_l2)) = self + .ethereum_l1 + .execution_layer + .find_message_and_signal_slot(user_op.clone()) + .await? + { + return Ok(Some(UserOpRouting::L1ToL2 { + user_op, + l2_call: L2Call { + message_from_l1, + signal_slot_on_l2, + }, + })); + } + + // L1 simulation found no bridge message — still an L1 UserOp (non-bridge) + warn!( + "UserOp id={} targets L1 but no bridge message found, treating as L1 UserOp without bridge", + user_op.id + ); + self.status_store.set( + user_op.id, + &UserOpStatus::Rejected { + reason: "L1 UserOp with no bridge message".to_string(), }, - ))); + ); + Ok(None) + } + Some(chain_id) if chain_id == self.l2_chain_id => { + // L2 UserOp — execute directly on L2 + info!( + "UserOp id={} targets L2 (chain_id={}), queueing for L2 execution", + user_op.id, chain_id + ); + Ok(Some(UserOpRouting::L2Direct { user_op })) + } + _ => { + warn!( + "UserOp id={} rejected: could not determine target chain from signature", + user_op.id + ); + self.status_store.set( + user_op.id, + &UserOpStatus::Rejected { + reason: "Could not determine target chain from signature".to_string(), + }, + ); + Ok(None) } - - warn!( - "UserOp id={} rejected: no L2 call found in user op", - user_op.id - ); - self.status_store.set( - user_op.id, - &UserOpStatus::Rejected { - reason: "No L2 call found in user op".to_string(), - }, - ); } - - Ok(None) } pub async fn find_l1_call( @@ -278,13 +400,4 @@ impl BridgeHandler { pub fn has_pending_user_ops(&self) -> bool { !self.rx.is_empty() } - - /// Dequeue the next L2 UserOp (for 
bridge-out / L2 execution). - pub fn next_l2_user_op(&mut self) -> Option { - self.l2_rx.try_recv().ok() - } - - pub fn has_pending_l2_user_ops(&self) -> bool { - !self.l2_rx.is_empty() - } } diff --git a/realtime/src/node/proposal_manager/mod.rs b/realtime/src/node/proposal_manager/mod.rs index 5945145d..5695ac78 100644 --- a/realtime/src/node/proposal_manager/mod.rs +++ b/realtime/src/node/proposal_manager/mod.rs @@ -63,6 +63,8 @@ impl BatchManager { raiko_client: RaikoClient, basefee_sharing_pctg: u8, proof_request_bypass: bool, + l1_chain_id: u64, + l2_chain_id: u64, ) -> Result { info!( "Batch builder config:\n\ @@ -85,6 +87,8 @@ impl BatchManager { ethereum_l1.clone(), taiko.clone(), cancel_token.clone(), + l1_chain_id, + l2_chain_id, ) .await?, )); @@ -260,96 +264,83 @@ impl BatchManager { } pub async fn has_pending_user_ops(&self) -> bool { - let handler = self.bridge_handler.lock().await; - handler.has_pending_user_ops() || handler.has_pending_l2_user_ops() + self.bridge_handler.lock().await.has_pending_user_ops() } - async fn add_pending_l2_call_to_draft_block( + /// Process all pending UserOps: route each to L1 or L2 based on its EIP-712 signature. + /// + /// - L1→L2 deposits: UserOp added to proposal (for L1 multicall), processMessage tx added to L2 block + /// - L2 direct (bridge-out): UserOp execution tx added to L2 block, L2→L1 relay handled post-execution + async fn add_pending_user_ops_to_draft_block( &mut self, l2_draft_block: &mut L2BlockV2Draft, ) -> Result)>, anyhow::Error> { - if let Some((user_op_data, l2_call)) = self - .bridge_handler - .lock() - .await - .next_user_op_and_l2_call() - .await? 
- { - info!("Processing pending L2 call: {:?}", l2_call); + use bridge_handler::UserOpRouting; - let l2_call_bridge_tx = self - .taiko - .l2_execution_layer() - .construct_l2_call_tx(l2_call.message_from_l1) - .await?; + let (routing, status_store) = { + let mut handler = self.bridge_handler.lock().await; + let routing = handler.next_user_op_routed().await?; + (routing, handler.status_store()) + }; - info!( - "Inserting L2 call bridge transaction into tx list: {:?}", - l2_call_bridge_tx - ); + let Some(routing) = routing else { + return Ok(None); + }; - l2_draft_block - .prebuilt_tx_list - .tx_list - .push(l2_call_bridge_tx); + match routing { + UserOpRouting::L1ToL2 { user_op, l2_call } => { + info!("Processing L1→L2 deposit: UserOp id={}", user_op.id); - return Ok(Some((user_op_data, l2_call.signal_slot_on_l2))); - } + let l2_call_bridge_tx = self + .taiko + .l2_execution_layer() + .construct_l2_call_tx(l2_call.message_from_l1) + .await?; - Ok(None) - } + info!("Inserting processMessage tx into L2 block"); + l2_draft_block + .prebuilt_tx_list + .tx_list + .push(l2_call_bridge_tx); - /// Check for pending L2 UserOps (bridge-out) and insert their execution - /// transactions into the draft block's tx list. 
- async fn add_pending_l2_user_ops_to_draft_block( - &mut self, - l2_draft_block: &mut L2BlockV2Draft, - ) -> Result<(), anyhow::Error> { - // Collect all pending L2 UserOps while holding the lock, then release it - let (l2_user_ops, status_store) = { - let mut handler = self.bridge_handler.lock().await; - let mut ops = vec![]; - while let Some(op) = handler.next_l2_user_op() { - ops.push(op); + Ok(Some((user_op, l2_call.signal_slot_on_l2))) } - (ops, handler.status_store()) - }; - - for l2_user_op in l2_user_ops { - info!( - "Processing L2 UserOp id={} submitter={}", - l2_user_op.id, l2_user_op.submitter - ); + UserOpRouting::L2Direct { user_op } => { + info!( + "Processing L2 UserOp (bridge-out): id={} submitter={}", + user_op.id, user_op.submitter + ); - match self - .taiko - .l2_execution_layer() - .construct_l2_user_op_tx(&l2_user_op) - .await - { - Ok(tx) => { - info!("Inserting L2 UserOp execution tx into block tx list"); - l2_draft_block.prebuilt_tx_list.tx_list.push(tx); - status_store.set( - l2_user_op.id, - &bridge_handler::UserOpStatus::Processing { - tx_hash: FixedBytes::default(), - }, - ); - } - Err(e) => { - error!("Failed to construct L2 UserOp tx: {}", e); - status_store.set( - l2_user_op.id, - &bridge_handler::UserOpStatus::Rejected { - reason: format!("Failed to construct L2 tx: {}", e), - }, - ); + match self + .taiko + .l2_execution_layer() + .construct_l2_user_op_tx(&user_op) + .await + { + Ok(tx) => { + info!("Inserting L2 UserOp execution tx into block"); + l2_draft_block.prebuilt_tx_list.tx_list.push(tx); + status_store.set( + user_op.id, + &bridge_handler::UserOpStatus::Processing { + tx_hash: FixedBytes::default(), + }, + ); + } + Err(e) => { + error!("Failed to construct L2 UserOp tx: {}", e); + status_store.set( + user_op.id, + &bridge_handler::UserOpStatus::Rejected { + reason: format!("Failed to construct L2 tx: {}", e), + }, + ); + } } + // No L1 UserOp or signal slot for L2-direct ops + Ok(None) } } - - Ok(()) } async fn 
add_draft_block_to_proposal( @@ -360,24 +351,18 @@ impl BatchManager { ) -> Result { let mut anchor_signal_slots: Vec> = vec![]; - // 1. L1→L2 deposits: check for pending L1 UserOps that trigger bridge messages - debug!("Checking for pending L2 calls (L1→L2 deposits)"); + debug!("Checking for pending UserOps (L1→L2 deposits and L2 direct)"); if let Some((user_op_data, signal_slot)) = self - .add_pending_l2_call_to_draft_block(&mut l2_draft_block) + .add_pending_user_ops_to_draft_block(&mut l2_draft_block) .await? { self.batch_builder.add_user_op(user_op_data)?; self.batch_builder.add_signal_slot(signal_slot)?; anchor_signal_slots.push(signal_slot); } else { - debug!("No pending L1→L2 deposits"); + debug!("No pending UserOps"); } - // 2. L2→L1 withdrawals: check for pending L2 UserOps (bridge-out) - debug!("Checking for pending L2 UserOps (L2→L1 withdrawals)"); - self.add_pending_l2_user_ops_to_draft_block(&mut l2_draft_block) - .await?; - let payload = self.batch_builder.add_l2_draft_block(l2_draft_block)?; match self From fab9a04f4557ecd26647749f9b9dafe2a190af6d Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 07:29:14 +0300 Subject: [PATCH 04/11] refactor(realtime): use explicit chainId param instead of signature detection The surge_sendUserOp RPC now accepts an optional chainId field in the UserOp params. If chainId matches L2, the UserOp is executed directly on L2. Otherwise defaults to L1 (backwards compatible). Removes the EIP-712 signature parsing logic which was unreliable (ecrecover always returns a non-zero address). 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../node/proposal_manager/bridge_handler.rs | 208 ++++-------------- 1 file changed, 46 insertions(+), 162 deletions(-) diff --git a/realtime/src/node/proposal_manager/bridge_handler.rs b/realtime/src/node/proposal_manager/bridge_handler.rs index b1e953a6..1f3c0224 100644 --- a/realtime/src/node/proposal_manager/bridge_handler.rs +++ b/realtime/src/node/proposal_manager/bridge_handler.rs @@ -64,6 +64,8 @@ pub struct UserOp { pub id: u64, pub submitter: Address, pub calldata: Bytes, + #[serde(default, rename = "chainId")] + pub chain_id: u64, } // Data required to build the L1 call transaction initiated by an L2 contract via the bridge @@ -89,109 +91,6 @@ pub enum UserOpRouting { L2Direct { user_op: UserOp }, } -/// Determine the target chain of a UserOp by checking the EIP-712 signature. -/// -/// The UserOpsSubmitter uses EIP-712 with domain `(name="UserOpsSubmitter", version="1", -/// chainId, verifyingContract=submitter)`. We decode the `executeBatch(ops[], signature)` -/// calldata, compute the EIP-712 digest for both L1 and L2 chain IDs, and ecrecover. -/// The chain ID that produces a valid recovery (non-zero address) is the target chain. -fn detect_target_chain(user_op: &UserOp, l1_chain_id: u64, l2_chain_id: u64) -> Option { - use alloy::sol; - use alloy::sol_types::{SolCall, SolValue}; - - // ABI definition matching UserOpsSubmitter.executeBatch - sol! 
{ - struct UserOpSol { - address target; - uint256 value; - bytes data; - } - - function executeBatch(UserOpSol[] calldata _ops, bytes calldata _signature) external; - } - - // Decode the calldata - let decoded = executeBatchCall::abi_decode(&user_op.calldata).ok()?; - let ops = &decoded._ops; - let signature = &decoded._signature; - - if signature.len() != 65 { - warn!("UserOp id={}: signature length {} != 65", user_op.id, signature.len()); - return None; - } - - // EIP-712 type hashes - let userop_typehash = alloy::primitives::keccak256( - b"UserOp(address target,uint256 value,bytes data)", - ); - let executebatch_typehash = alloy::primitives::keccak256( - b"ExecuteBatch(UserOp[] ops)UserOp(address target,uint256 value,bytes data)", - ); - - // Hash each op: keccak256(abi.encode(typehash, target, value, keccak256(data))) - let mut op_hashes = Vec::with_capacity(ops.len()); - for op in ops { - let data_hash = alloy::primitives::keccak256(&op.data); - let encoded = (userop_typehash, op.target, op.value, data_hash).abi_encode(); - op_hashes.push(alloy::primitives::keccak256(&encoded)); - } - - // keccak256(abi.encodePacked(opHashes)) - let mut packed = Vec::with_capacity(op_hashes.len() * 32); - for h in &op_hashes { - packed.extend_from_slice(h.as_slice()); - } - let ops_array_hash = alloy::primitives::keccak256(&packed); - - // struct hash = keccak256(abi.encode(EXECUTEBATCH_TYPEHASH, ops_array_hash)) - let struct_hash = alloy::primitives::keccak256( - &(executebatch_typehash, ops_array_hash).abi_encode(), - ); - - // Parse the 65-byte signature - let sig = alloy::signers::Signature::try_from(signature.as_ref()).ok()?; - - // Try both chain IDs - for chain_id in [l1_chain_id, l2_chain_id] { - let domain_separator = compute_domain_separator(chain_id, user_op.submitter); - - // EIP-712 digest: keccak256("\x19\x01" || domainSeparator || structHash) - let mut digest_input = Vec::with_capacity(2 + 32 + 32); - digest_input.extend_from_slice(&[0x19, 0x01]); - 
digest_input.extend_from_slice(domain_separator.as_slice()); - digest_input.extend_from_slice(struct_hash.as_slice()); - let digest = alloy::primitives::keccak256(&digest_input); - - if let Ok(recovered) = sig.recover_address_from_prehash(&digest) { - if recovered != Address::ZERO { - info!( - "UserOp id={}: signature valid for chain_id={} (recovered={})", - user_op.id, chain_id, recovered - ); - return Some(chain_id); - } - } - } - - warn!("UserOp id={}: could not determine target chain", user_op.id); - None -} - -/// Compute EIP-712 domain separator for UserOpsSubmitter(name="UserOpsSubmitter", version="1") -fn compute_domain_separator(chain_id: u64, verifying_contract: Address) -> B256 { - use alloy::sol_types::SolValue; - - let type_hash = alloy::primitives::keccak256( - b"EIP712Domain(string name,string version,uint256 chainId,address verifyingContract)", - ); - let name_hash = alloy::primitives::keccak256(b"UserOpsSubmitter"); - let version_hash = alloy::primitives::keccak256(b"1"); - - alloy::primitives::keccak256( - &(type_hash, name_hash, version_hash, alloy::primitives::U256::from(chain_id), verifying_contract).abi_encode(), - ) -} - #[derive(Clone)] struct BridgeRpcContext { tx: mpsc::Sender, @@ -304,12 +203,11 @@ impl BridgeHandler { self.status_store.clone() } - /// Dequeue the next UserOp and route it to the correct chain. + /// Dequeue the next UserOp and route it based on the `chainId` param. /// - /// Parses the EIP-712 signature in the `executeBatch` calldata to determine - /// which chain the UserOp targets. If signed for L1, simulates on L1 to - /// extract the bridge message. If signed for L2, returns it for direct - /// L2 block inclusion. + /// If `chainId` matches L1, simulates on L1 to extract bridge message (L1→L2 deposit). + /// If `chainId` matches L2, returns it for direct L2 block inclusion (bridge-out). + /// If `chainId` is 0 or missing, defaults to L1 (backwards compatible). 
pub async fn next_user_op_routed( &mut self, ) -> Result, anyhow::Error> { @@ -317,61 +215,47 @@ impl BridgeHandler { return Ok(None); }; - let target_chain = detect_target_chain(&user_op, self.l1_chain_id, self.l2_chain_id); - - match target_chain { - Some(chain_id) if chain_id == self.l1_chain_id => { - // L1 UserOp — simulate on L1 to extract bridge message - if let Some((message_from_l1, signal_slot_on_l2)) = self - .ethereum_l1 - .execution_layer - .find_message_and_signal_slot(user_op.clone()) - .await? - { - return Ok(Some(UserOpRouting::L1ToL2 { - user_op, - l2_call: L2Call { - message_from_l1, - signal_slot_on_l2, - }, - })); - } - - // L1 simulation found no bridge message — still an L1 UserOp (non-bridge) - warn!( - "UserOp id={} targets L1 but no bridge message found, treating as L1 UserOp without bridge", - user_op.id - ); - self.status_store.set( - user_op.id, - &UserOpStatus::Rejected { - reason: "L1 UserOp with no bridge message".to_string(), - }, - ); - Ok(None) - } - Some(chain_id) if chain_id == self.l2_chain_id => { - // L2 UserOp — execute directly on L2 - info!( - "UserOp id={} targets L2 (chain_id={}), queueing for L2 execution", - user_op.id, chain_id - ); - Ok(Some(UserOpRouting::L2Direct { user_op })) - } - _ => { - warn!( - "UserOp id={} rejected: could not determine target chain from signature", - user_op.id - ); - self.status_store.set( - user_op.id, - &UserOpStatus::Rejected { - reason: "Could not determine target chain from signature".to_string(), - }, - ); - Ok(None) - } + let target_chain = if user_op.chain_id == self.l2_chain_id { + self.l2_chain_id + } else { + self.l1_chain_id // default to L1 (includes chain_id == 0) + }; + + if target_chain == self.l2_chain_id { + info!( + "UserOp id={} targets L2 (chainId={}), queueing for L2 execution", + user_op.id, user_op.chain_id + ); + return Ok(Some(UserOpRouting::L2Direct { user_op })); + } + + // L1 UserOp — simulate on L1 to extract bridge message + if let Some((message_from_l1, 
signal_slot_on_l2)) = self + .ethereum_l1 + .execution_layer + .find_message_and_signal_slot(user_op.clone()) + .await? + { + return Ok(Some(UserOpRouting::L1ToL2 { + user_op, + l2_call: L2Call { + message_from_l1, + signal_slot_on_l2, + }, + })); } + + warn!( + "UserOp id={} targets L1 but no bridge message found", + user_op.id + ); + self.status_store.set( + user_op.id, + &UserOpStatus::Rejected { + reason: "L1 UserOp with no bridge message".to_string(), + }, + ); + Ok(None) } pub async fn find_l1_call( From 58422a0ff5f28a072846e1e1bc4beb89cb5cf60e Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 08:36:27 +0300 Subject: [PATCH 05/11] fix(realtime): increase L2 tx gas limit to 3M MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit processMessage and L2 UserOp transactions need more gas for operations that deploy contracts (e.g. CREATE2 smart wallet deployment via bridge relay). 1M gas was insufficient — the bridge's post-call gas check was failing. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- realtime/src/l2/execution_layer.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/realtime/src/l2/execution_layer.rs b/realtime/src/l2/execution_layer.rs index d2745a05..69e9ef12 100644 --- a/realtime/src/l2/execution_layer.rs +++ b/realtime/src/l2/execution_layer.rs @@ -103,7 +103,7 @@ impl L2ExecutionLayer { let call_builder = self .anchor .anchorV4WithSignalSlots(anchor_block_params.0, anchor_block_params.1) - .gas(1_000_000) + .gas(3_000_000) .max_fee_per_gas(u128::from(l2_slot_info.base_fee())) .max_priority_fee_per_gas(0) .nonce(nonce) @@ -258,7 +258,7 @@ impl L2ExecutionLayer { let typed_tx = alloy::consensus::TxEip1559 { chain_id: self.chain_id, nonce, - gas_limit: 1_000_000, + gas_limit: 3_000_000, max_fee_per_gas: 1_000_000_000, max_priority_fee_per_gas: 0, to: alloy::primitives::TxKind::Call(user_op.submitter), @@ -329,7 +329,7 @@ impl L2BridgeHandlerOps for L2ExecutionLayer { let call_builder = self .bridge .processMessage(message, Bytes::new()) - .gas(1_000_000) + .gas(3_000_000) .max_fee_per_gas(1_000_000_000) .max_priority_fee_per_gas(0) .nonce(nonce) From 003f79fcde6783f9e7bc0216560eb9c3a9531f86 Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 08:40:02 +0300 Subject: [PATCH 06/11] fix(realtime): only increase gas for bridge/UserOp txs, not anchor The anchor tx has a required gas limit enforced by the L2 engine. Revert anchor to 1M, keep processMessage and UserOp txs at 3M. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- realtime/src/l2/execution_layer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/realtime/src/l2/execution_layer.rs b/realtime/src/l2/execution_layer.rs index 69e9ef12..21def702 100644 --- a/realtime/src/l2/execution_layer.rs +++ b/realtime/src/l2/execution_layer.rs @@ -103,7 +103,7 @@ impl L2ExecutionLayer { let call_builder = self .anchor .anchorV4WithSignalSlots(anchor_block_params.0, anchor_block_params.1) - .gas(3_000_000) + .gas(1_000_000) .max_fee_per_gas(u128::from(l2_slot_info.base_fee())) .max_priority_fee_per_gas(0) .nonce(nonce) From e184e694acd90429f93de418c8740a701ff4627e Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 12:49:46 +0300 Subject: [PATCH 07/11] fix(realtime): track L2 UserOp status through proving and submission L2 direct UserOps now get ProvingBlock and Executed/Rejected status updates, same as L1 UserOps. Added l2_user_op_ids to Proposal struct and included them in the async submitter's status tracking. Also adds cleanup: status entries are removed from sled after 60s to prevent unbounded disk growth. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../node/proposal_manager/async_submitter.rs | 22 ++++++++++++++++++- .../node/proposal_manager/batch_builder.rs | 10 +++++++++ realtime/src/node/proposal_manager/mod.rs | 2 ++ .../src/node/proposal_manager/proposal.rs | 1 + 4 files changed, 34 insertions(+), 1 deletion(-) diff --git a/realtime/src/node/proposal_manager/async_submitter.rs b/realtime/src/node/proposal_manager/async_submitter.rs index 46b7bbbf..fea88f34 100644 --- a/realtime/src/node/proposal_manager/async_submitter.rs +++ b/realtime/src/node/proposal_manager/async_submitter.rs @@ -238,6 +238,15 @@ async fn submission_task( }, ); } + // Also track L2 direct UserOps + for id in &proposal.l2_user_op_ids { + store.set( + *id, + &UserOpStatus::ProvingBlock { + block_id: proposal.checkpoint.blockNumber.to::(), + }, + ); + } } let proof = raiko_client.get_proof(&request).await?; @@ -245,7 +254,8 @@ async fn submission_task( } // Step 2: Send L1 transaction - let user_op_ids: Vec = proposal.user_ops.iter().map(|op| op.id).collect(); + let mut user_op_ids: Vec = proposal.user_ops.iter().map(|op| op.id).collect(); + user_op_ids.extend(&proposal.l2_user_op_ids); let has_user_ops = !user_op_ids.is_empty() && status_store.is_some(); let (tx_hash_sender, tx_hash_receiver) = if has_user_ops { @@ -338,6 +348,16 @@ async fn submission_task( } } } + + // Clean up status entries after 60s (client should have polled by then) + let cleanup_store = store.clone(); + let cleanup_ids = user_op_ids.clone(); + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + for id in &cleanup_ids { + cleanup_store.remove(*id); + } + }); }); } diff --git a/realtime/src/node/proposal_manager/batch_builder.rs b/realtime/src/node/proposal_manager/batch_builder.rs index 48550d4b..59298b35 100644 --- a/realtime/src/node/proposal_manager/batch_builder.rs +++ b/realtime/src/node/proposal_manager/batch_builder.rs @@ -93,6 +93,7 @@ impl BatchBuilder { 
checkpoint: Checkpoint::default(), last_finalized_block_hash, user_ops: vec![], + l2_user_op_ids: vec![], signal_slots: vec![], l1_calls: vec![], zk_proof: None, @@ -139,6 +140,15 @@ impl BatchBuilder { } } + pub fn add_l2_user_op_id(&mut self, id: u64) -> Result<(), Error> { + if let Some(current_proposal) = self.current_proposal.as_mut() { + current_proposal.l2_user_op_ids.push(id); + Ok(()) + } else { + Err(anyhow::anyhow!("No current batch for L2 user op id")) + } + } + pub fn add_signal_slot(&mut self, signal_slot: FixedBytes<32>) -> Result<&Proposal, Error> { if let Some(current_proposal) = self.current_proposal.as_mut() { current_proposal.signal_slots.push(signal_slot); diff --git a/realtime/src/node/proposal_manager/mod.rs b/realtime/src/node/proposal_manager/mod.rs index 5695ac78..e5b928f4 100644 --- a/realtime/src/node/proposal_manager/mod.rs +++ b/realtime/src/node/proposal_manager/mod.rs @@ -320,6 +320,8 @@ impl BatchManager { Ok(tx) => { info!("Inserting L2 UserOp execution tx into block"); l2_draft_block.prebuilt_tx_list.tx_list.push(tx); + // Track L2 UserOp ID for status updates after submission + let _ = self.batch_builder.add_l2_user_op_id(user_op.id); status_store.set( user_op.id, &bridge_handler::UserOpStatus::Processing { diff --git a/realtime/src/node/proposal_manager/proposal.rs b/realtime/src/node/proposal_manager/proposal.rs index b75ae8c2..230f406f 100644 --- a/realtime/src/node/proposal_manager/proposal.rs +++ b/realtime/src/node/proposal_manager/proposal.rs @@ -29,6 +29,7 @@ pub struct Proposal { // Surge POC fields (carried over) pub user_ops: Vec, + pub l2_user_op_ids: Vec, pub signal_slots: Vec>, pub l1_calls: Vec, From f13ed37821deb0214dfe47acd0c094668a8d6d62 Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 13:46:09 +0300 Subject: [PATCH 08/11] fix: address code review feedback on L2 UserOps PR - Fix misleading Processing(zero_hash) status: remove premature status update for L2Direct UserOps; status remains 
Pending until async_submitter sets ProvingBlock - Propagate add_l2_user_op_id error instead of silently ignoring with let _, which could leave status entries orphaned forever - Remove redundant target_chain variable, simplify to direct chain_id comparison - Fix doc comment: routing is based on chainId field, not EIP-712 signature - Add SAFETY comment for Recovered::new_unchecked explaining why it's correct Co-Authored-By: Claude Opus 4.6 (1M context) --- realtime/src/l2/execution_layer.rs | 2 ++ .../node/proposal_manager/bridge_handler.rs | 9 ++------ realtime/src/node/proposal_manager/mod.rs | 21 ++++++++++++------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/realtime/src/l2/execution_layer.rs b/realtime/src/l2/execution_layer.rs index 21def702..95ea1e29 100644 --- a/realtime/src/l2/execution_layer.rs +++ b/realtime/src/l2/execution_layer.rs @@ -284,6 +284,8 @@ impl L2ExecutionLayer { debug!("L2 UserOp execution tx hash: {}", tx_envelope.tx_hash()); + // SAFETY: `new_unchecked` is safe here because we just signed `tx_envelope` with + // `l2_call_signer` and `signer_address` is derived from the same key. 
let tx = Transaction { inner: Recovered::new_unchecked(tx_envelope, signer_address), block_hash: None, diff --git a/realtime/src/node/proposal_manager/bridge_handler.rs b/realtime/src/node/proposal_manager/bridge_handler.rs index 1f3c0224..8c3a7eff 100644 --- a/realtime/src/node/proposal_manager/bridge_handler.rs +++ b/realtime/src/node/proposal_manager/bridge_handler.rs @@ -215,13 +215,8 @@ impl BridgeHandler { return Ok(None); }; - let target_chain = if user_op.chain_id == self.l2_chain_id { - self.l2_chain_id - } else { - self.l1_chain_id // default to L1 (includes chain_id == 0) - }; - - if target_chain == self.l2_chain_id { + // Default to L1 when chain_id is 0 or matches L1 + if user_op.chain_id == self.l2_chain_id { info!( "UserOp id={} targets L2 (chainId={}), queueing for L2 execution", user_op.id, user_op.chain_id diff --git a/realtime/src/node/proposal_manager/mod.rs b/realtime/src/node/proposal_manager/mod.rs index e5b928f4..11957516 100644 --- a/realtime/src/node/proposal_manager/mod.rs +++ b/realtime/src/node/proposal_manager/mod.rs @@ -267,7 +267,7 @@ impl BatchManager { self.bridge_handler.lock().await.has_pending_user_ops() } - /// Process all pending UserOps: route each to L1 or L2 based on its EIP-712 signature. + /// Process all pending UserOps: route each to L1 or L2 based on its chainId field. 
/// /// - L1→L2 deposits: UserOp added to proposal (for L1 multicall), processMessage tx added to L2 block /// - L2 direct (bridge-out): UserOp execution tx added to L2 block, L2→L1 relay handled post-execution @@ -321,13 +321,18 @@ impl BatchManager { info!("Inserting L2 UserOp execution tx into block"); l2_draft_block.prebuilt_tx_list.tx_list.push(tx); // Track L2 UserOp ID for status updates after submission - let _ = self.batch_builder.add_l2_user_op_id(user_op.id); - status_store.set( - user_op.id, - &bridge_handler::UserOpStatus::Processing { - tx_hash: FixedBytes::default(), - }, - ); + if let Err(e) = self.batch_builder.add_l2_user_op_id(user_op.id) { + error!( + "Failed to track L2 UserOp id={}: {}. Status will not be updated.", + user_op.id, e + ); + status_store.set( + user_op.id, + &bridge_handler::UserOpStatus::Rejected { + reason: format!("Failed to track UserOp: {}", e), + }, + ); + } } Err(e) => { error!("Failed to construct L2 UserOp tx: {}", e); From e67e4db81baf1259480055b3b787beb0f0df16b3 Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 13:56:40 +0300 Subject: [PATCH 09/11] fix: address second round of review feedback - Mark L2 UserOp IDs as Rejected on L1 multicall failure (were stuck at ProvingBlock) - Track L2 UserOp ID before inserting tx into block (prevents executed-but-Rejected state) - Reject UserOps with unknown chainId instead of silently treating as L1 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/node/proposal_manager/async_submitter.rs | 10 +++++++++- .../src/node/proposal_manager/bridge_handler.rs | 16 +++++++++++++++- realtime/src/node/proposal_manager/mod.rs | 10 ++++++---- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/realtime/src/node/proposal_manager/async_submitter.rs b/realtime/src/node/proposal_manager/async_submitter.rs index fea88f34..742cff2f 100644 --- a/realtime/src/node/proposal_manager/async_submitter.rs +++ b/realtime/src/node/proposal_manager/async_submitter.rs @@ 
-276,7 +276,7 @@ async fn submission_task( .send_batch_to_l1(proposal.clone(), tx_hash_sender, tx_result_sender) .await { - // Mark user ops as rejected on failure + // Mark all user ops (L1 and L2) as rejected on failure if let Some(ref store) = status_store { let reason = format!("L1 multicall failed: {}", err); for op in &proposal.user_ops { @@ -287,6 +287,14 @@ async fn submission_task( }, ); } + for id in &proposal.l2_user_op_ids { + store.set( + *id, + &UserOpStatus::Rejected { + reason: reason.clone(), + }, + ); + } } return Err(err); } diff --git a/realtime/src/node/proposal_manager/bridge_handler.rs b/realtime/src/node/proposal_manager/bridge_handler.rs index 8c3a7eff..b4366845 100644 --- a/realtime/src/node/proposal_manager/bridge_handler.rs +++ b/realtime/src/node/proposal_manager/bridge_handler.rs @@ -215,7 +215,6 @@ impl BridgeHandler { return Ok(None); }; - // Default to L1 when chain_id is 0 or matches L1 if user_op.chain_id == self.l2_chain_id { info!( "UserOp id={} targets L2 (chainId={}), queueing for L2 execution", @@ -224,6 +223,21 @@ impl BridgeHandler { return Ok(Some(UserOpRouting::L2Direct { user_op })); } + // Reject unknown chain IDs (0 is allowed as default-to-L1) + if user_op.chain_id != 0 && user_op.chain_id != self.l1_chain_id { + warn!( + "UserOp id={} has unknown chainId={}, rejecting", + user_op.id, user_op.chain_id + ); + self.status_store.set( + user_op.id, + &UserOpStatus::Rejected { + reason: format!("Unknown chainId: {}", user_op.chain_id), + }, + ); + return Ok(None); + } + // L1 UserOp — simulate on L1 to extract bridge message if let Some((message_from_l1, signal_slot_on_l2)) = self .ethereum_l1 diff --git a/realtime/src/node/proposal_manager/mod.rs b/realtime/src/node/proposal_manager/mod.rs index 11957516..fefcc204 100644 --- a/realtime/src/node/proposal_manager/mod.rs +++ b/realtime/src/node/proposal_manager/mod.rs @@ -318,12 +318,11 @@ impl BatchManager { .await { Ok(tx) => { - info!("Inserting L2 UserOp execution tx 
into block"); - l2_draft_block.prebuilt_tx_list.tx_list.push(tx); - // Track L2 UserOp ID for status updates after submission + // Track L2 UserOp ID first — only insert tx if tracking succeeds, + // otherwise we'd execute on L2 but show Rejected in the status store. if let Err(e) = self.batch_builder.add_l2_user_op_id(user_op.id) { error!( - "Failed to track L2 UserOp id={}: {}. Status will not be updated.", + "Failed to track L2 UserOp id={}: {}. Dropping tx.", user_op.id, e ); status_store.set( @@ -332,6 +331,9 @@ impl BatchManager { reason: format!("Failed to track UserOp: {}", e), }, ); + } else { + info!("Inserting L2 UserOp execution tx into block"); + l2_draft_block.prebuilt_tx_list.tx_list.push(tx); } } Err(e) => { From 1d94dba888b1fa1f3504175436a4c53e7f35285a Mon Sep 17 00:00:00 2001 From: smartprogrammer93 Date: Sun, 29 Mar 2026 14:05:49 +0300 Subject: [PATCH 10/11] fix: clarify debug log when L2Direct UserOp is handled The "No pending UserOps" log was misleading when an L2Direct op was processed, since add_pending_user_ops_to_draft_block returns None for both "nothing queued" and "L2Direct handled". Updated to distinguish. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- realtime/src/node/proposal_manager/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/realtime/src/node/proposal_manager/mod.rs b/realtime/src/node/proposal_manager/mod.rs index fefcc204..cb9816ad 100644 --- a/realtime/src/node/proposal_manager/mod.rs +++ b/realtime/src/node/proposal_manager/mod.rs @@ -369,7 +369,7 @@ impl BatchManager { self.batch_builder.add_signal_slot(signal_slot)?; anchor_signal_slots.push(signal_slot); } else { - debug!("No pending UserOps"); + debug!("No L1→L2 UserOps (L2 direct ops, if any, were handled inline)"); } let payload = self.batch_builder.add_l2_draft_block(l2_draft_block)?; From 6460d3e888b617b1456bf23c4aff651de7564442 Mon Sep 17 00:00:00 2001 From: AnshuJalan Date: Sun, 29 Mar 2026 18:25:13 +0530 Subject: [PATCH 11/11] chore(logging): silence noisy h2 debug logs --- common/src/utils/logging.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/common/src/utils/logging.rs b/common/src/utils/logging.rs index a52fe711..23c0e25a 100644 --- a/common/src/utils/logging.rs +++ b/common/src/utils/logging.rs @@ -13,6 +13,11 @@ pub fn init_logging() { .parse() .expect("assert: can parse env filter directive"), ) + .add_directive( + "h2=info" + .parse() + .expect("assert: can parse env filter directive"), + ) .add_directive( "alloy_transport=info" .parse()