diff --git a/common/src/utils/logging.rs b/common/src/utils/logging.rs index a52fe711..23c0e25a 100644 --- a/common/src/utils/logging.rs +++ b/common/src/utils/logging.rs @@ -13,6 +13,11 @@ pub fn init_logging() { .parse() .expect("assert: can parse env filter directive"), ) + .add_directive( + "h2=info" + .parse() + .expect("assert: can parse env filter directive"), + ) .add_directive( "alloy_transport=info" .parse() diff --git a/realtime/src/l2/execution_layer.rs b/realtime/src/l2/execution_layer.rs index b06b7b1e..95ea1e29 100644 --- a/realtime/src/l2/execution_layer.rs +++ b/realtime/src/l2/execution_layer.rs @@ -234,6 +234,69 @@ impl L2ExecutionLayer { } } +// Surge: L2 UserOp execution + +use crate::node::proposal_manager::bridge_handler::UserOp; + +impl L2ExecutionLayer { + /// Construct a signed L2 transaction that executes a UserOp on L2 + /// by forwarding the calldata to the submitter smart wallet. + pub async fn construct_l2_user_op_tx(&self, user_op: &UserOp) -> Result { + use alloy::signers::local::PrivateKeySigner; + use std::str::FromStr; + + debug!("Constructing L2 UserOp execution tx for submitter={}", user_op.submitter); + + let signer_address = self.l2_call_signer.get_address(); + + let nonce = self + .provider + .get_transaction_count(signer_address) + .await + .map_err(|e| anyhow::anyhow!("Failed to get nonce for L2 UserOp tx: {}", e))?; + + let typed_tx = alloy::consensus::TxEip1559 { + chain_id: self.chain_id, + nonce, + gas_limit: 3_000_000, + max_fee_per_gas: 1_000_000_000, + max_priority_fee_per_gas: 0, + to: alloy::primitives::TxKind::Call(user_op.submitter), + value: alloy::primitives::U256::ZERO, + input: user_op.calldata.clone(), + access_list: Default::default(), + }; + + let signature = match self.l2_call_signer.as_ref() { + Signer::Web3signer(web3signer, address) => { + let signature_bytes = web3signer.sign_transaction(&typed_tx, *address).await?; + Signature::try_from(signature_bytes.as_slice()) + .map_err(|e| anyhow::anyhow!("Failed to parse signature: {}", e))? + } + Signer::PrivateKey(private_key, _) => { + let signer = PrivateKeySigner::from_str(private_key.as_str())?; + AlloySigner::sign_hash(&signer, &typed_tx.signature_hash()).await? + } + }; + + let sig_tx = typed_tx.into_signed(signature); + let tx_envelope = TxEnvelope::from(sig_tx); + + debug!("L2 UserOp execution tx hash: {}", tx_envelope.tx_hash()); + + // SAFETY: `new_unchecked` is safe here because we just signed `tx_envelope` with + // `l2_call_signer` and `signer_address` is derived from the same key. + let tx = Transaction { + inner: Recovered::new_unchecked(tx_envelope, signer_address), + block_hash: None, + block_number: None, + transaction_index: None, + effective_gas_price: None, + }; + Ok(tx) + } +} + // Surge: L2 EL ops for Bridge Handler pub trait L2BridgeHandlerOps { @@ -268,7 +331,7 @@ impl L2BridgeHandlerOps for L2ExecutionLayer { let call_builder = self .bridge .processMessage(message, Bytes::new()) - .gas(1_000_000) + .gas(3_000_000) .max_fee_per_gas(1_000_000_000) .max_priority_fee_per_gas(0) .nonce(nonce) diff --git a/realtime/src/lib.rs b/realtime/src/lib.rs index 5d65fd90..bf84bced 100644 --- a/realtime/src/lib.rs +++ b/realtime/src/lib.rs @@ -32,12 +32,6 @@ pub async fn create_realtime_node( ) -> Result<(), Error> { info!("Creating RealTime node"); - if !config.disable_bridging { - return Err(anyhow::anyhow!( - "Bridging is not implemented. Exiting RealTime node creation." - )); - } - let realtime_config = RealtimeConfig::read_env_variables() .map_err(|e| anyhow::anyhow!("Failed to read RealTime configuration: {}", e))?; info!("RealTime config: {}", realtime_config); @@ -137,6 +131,12 @@ pub async fn create_realtime_node( let proof_request_bypass = realtime_config.proof_request_bypass; let raiko_client = raiko::RaikoClient::new(&realtime_config); + let l1_chain_id = { + use common::l1::traits::ELTrait; + ethereum_l1.execution_layer.common().chain_id() + }; + let l2_chain_id = taiko.l2_execution_layer().chain_id; + let node = Node::new( node_config, cancel_token.clone(), @@ -151,6 +151,8 @@ pub async fn create_realtime_node( protocol_config.basefee_sharing_pctg, preconf_only, proof_request_bypass, + l1_chain_id, + l2_chain_id, ) .await .map_err(|e| anyhow::anyhow!("Failed to create Node: {}", e))?; diff --git a/realtime/src/node/mod.rs b/realtime/src/node/mod.rs index 33c5cf28..dc7f4c98 100644 --- a/realtime/src/node/mod.rs +++ b/realtime/src/node/mod.rs @@ -56,6 +56,8 @@ impl Node { basefee_sharing_pctg: u8, preconf_only: bool, proof_request_bypass: bool, + l1_chain_id: u64, + l2_chain_id: u64, ) -> Result { let operator = Operator::new( ethereum_l1.execution_layer.clone(), @@ -85,6 +87,8 @@ impl Node { raiko_client, basefee_sharing_pctg, proof_request_bypass, + l1_chain_id, + l2_chain_id, ) .await .map_err(|e| anyhow::anyhow!("Failed to create BatchManager: {}", e))?; diff --git a/realtime/src/node/proposal_manager/async_submitter.rs b/realtime/src/node/proposal_manager/async_submitter.rs index 46b7bbbf..742cff2f 100644 --- a/realtime/src/node/proposal_manager/async_submitter.rs +++ b/realtime/src/node/proposal_manager/async_submitter.rs @@ -238,6 +238,15 @@ async fn submission_task( }, ); } + // Also track L2 direct UserOps + for id in &proposal.l2_user_op_ids { + store.set( + *id, + &UserOpStatus::ProvingBlock { + block_id: proposal.checkpoint.blockNumber.to::(), + }, + ); + } } let proof = raiko_client.get_proof(&request).await?; @@ -245,7 +254,8 @@ async fn submission_task( } // Step 2: Send L1 transaction - let user_op_ids: Vec = proposal.user_ops.iter().map(|op| op.id).collect(); + let mut user_op_ids: Vec = proposal.user_ops.iter().map(|op| op.id).collect(); + user_op_ids.extend(&proposal.l2_user_op_ids); let has_user_ops = !user_op_ids.is_empty() && status_store.is_some(); let (tx_hash_sender, tx_hash_receiver) = if has_user_ops { @@ -266,7 +276,7 @@ async fn submission_task( .send_batch_to_l1(proposal.clone(), tx_hash_sender, tx_result_sender) .await { - // Mark user ops as rejected on failure + // Mark all user ops (L1 and L2) as rejected on failure if let Some(ref store) = status_store { let reason = format!("L1 multicall failed: {}", err); for op in &proposal.user_ops { @@ -277,6 +287,14 @@ async fn submission_task( }, ); } + for id in &proposal.l2_user_op_ids { + store.set( + *id, + &UserOpStatus::Rejected { + reason: reason.clone(), + }, + ); + } } return Err(err); } @@ -338,6 +356,16 @@ async fn submission_task( } } } + + // Clean up status entries after 60s (client should have polled by then) + let cleanup_store = store.clone(); + let cleanup_ids = user_op_ids.clone(); + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + for id in &cleanup_ids { + cleanup_store.remove(*id); + } + }); }); } diff --git a/realtime/src/node/proposal_manager/batch_builder.rs b/realtime/src/node/proposal_manager/batch_builder.rs index 48550d4b..59298b35 100644 --- a/realtime/src/node/proposal_manager/batch_builder.rs +++ b/realtime/src/node/proposal_manager/batch_builder.rs @@ -93,6 +93,7 @@ impl BatchBuilder { checkpoint: Checkpoint::default(), last_finalized_block_hash, user_ops: vec![], + l2_user_op_ids: vec![], signal_slots: vec![], l1_calls: vec![], zk_proof: None, @@ -139,6 +140,15 @@ impl BatchBuilder { } } + pub fn add_l2_user_op_id(&mut self, id: u64) -> Result<(), Error> { + if let Some(current_proposal) = self.current_proposal.as_mut() { + current_proposal.l2_user_op_ids.push(id); + Ok(()) + } else { + Err(anyhow::anyhow!("No current batch for L2 user op id")) + } + } + pub fn add_signal_slot(&mut self, signal_slot: FixedBytes<32>) -> Result<&Proposal, Error> { if let Some(current_proposal) = self.current_proposal.as_mut() { current_proposal.signal_slots.push(signal_slot); diff --git a/realtime/src/node/proposal_manager/bridge_handler.rs b/realtime/src/node/proposal_manager/bridge_handler.rs index 3818dfce..b4366845 100644 --- a/realtime/src/node/proposal_manager/bridge_handler.rs +++ b/realtime/src/node/proposal_manager/bridge_handler.rs @@ -64,6 +64,8 @@ pub struct UserOp { pub id: u64, pub submitter: Address, pub calldata: Bytes, + #[serde(default, rename = "chainId")] + pub chain_id: u64, } // Data required to build the L1 call transaction initiated by an L2 contract via the bridge @@ -80,6 +82,15 @@ pub struct L2Call { pub signal_slot_on_l2: FixedBytes<32>, } +/// Result of routing a UserOp: either it targets L1 (and triggers an L2 bridge call) +/// or it targets L2 (for direct execution on L2, e.g. bridge-out). +pub enum UserOpRouting { + /// L1 UserOp that triggers a bridge deposit (L1→L2). + L1ToL2 { user_op: UserOp, l2_call: L2Call }, + /// L2 UserOp for direct execution on L2 (e.g. bridge-out L2→L1). + L2Direct { user_op: UserOp }, +} + #[derive(Clone)] struct BridgeRpcContext { tx: mpsc::Sender, @@ -92,6 +103,8 @@ pub struct BridgeHandler { taiko: Arc, rx: Receiver, status_store: UserOpStatusStore, + l1_chain_id: u64, + l2_chain_id: u64, } impl BridgeHandler { @@ -100,6 +113,8 @@ impl BridgeHandler { ethereum_l1: Arc>, taiko: Arc, cancellation_token: CancellationToken, + l1_chain_id: u64, + l2_chain_id: u64, ) -> Result { let (tx, rx) = mpsc::channel::(1024); let status_store = UserOpStatusStore::open("data/user_op_status")?; @@ -179,6 +194,8 @@ impl BridgeHandler { taiko, rx, status_store, + l1_chain_id, + l2_chain_id, }) } @@ -186,37 +203,67 @@ impl BridgeHandler { self.status_store.clone() } - pub async fn next_user_op_and_l2_call( + /// Dequeue the next UserOp and route it based on the `chainId` param. + /// + /// If `chainId` matches L1, simulates on L1 to extract bridge message (L1→L2 deposit). + /// If `chainId` matches L2, returns it for direct L2 block inclusion (bridge-out). + /// If `chainId` is 0 or missing, defaults to L1 (backwards compatible). + pub async fn next_user_op_routed( &mut self, - ) -> Result, anyhow::Error> { - if let Ok(user_op) = self.rx.try_recv() { - if let Some((message_from_l1, signal_slot_on_l2)) = self - .ethereum_l1 - .execution_layer - .find_message_and_signal_slot(user_op.clone()) - .await? - { - return Ok(Some(( - user_op, - L2Call { - message_from_l1, - signal_slot_on_l2, - }, - ))); - } + ) -> Result, anyhow::Error> { + let Ok(user_op) = self.rx.try_recv() else { + return Ok(None); + }; + if user_op.chain_id == self.l2_chain_id { + info!( + "UserOp id={} targets L2 (chainId={}), queueing for L2 execution", + user_op.id, user_op.chain_id + ); + return Ok(Some(UserOpRouting::L2Direct { user_op })); + } + + // Reject unknown chain IDs (0 is allowed as default-to-L1) + if user_op.chain_id != 0 && user_op.chain_id != self.l1_chain_id { warn!( - "UserOp id={} rejected: no L2 call found in user op", - user_op.id + "UserOp id={} has unknown chainId={}, rejecting", + user_op.id, user_op.chain_id ); self.status_store.set( user_op.id, &UserOpStatus::Rejected { - reason: "No L2 call found in user op".to_string(), + reason: format!("Unknown chainId: {}", user_op.chain_id), }, ); + return Ok(None); + } + + // L1 UserOp — simulate on L1 to extract bridge message + if let Some((message_from_l1, signal_slot_on_l2)) = self + .ethereum_l1 + .execution_layer + .find_message_and_signal_slot(user_op.clone()) + .await? + { + return Ok(Some(UserOpRouting::L1ToL2 { + user_op, + l2_call: L2Call { + message_from_l1, + signal_slot_on_l2, + }, + })); } + warn!( + "UserOp id={} targets L1 but no bridge message found", + user_op.id + ); + self.status_store.set( + user_op.id, + &UserOpStatus::Rejected { + reason: "L1 UserOp with no bridge message".to_string(), + }, + ); Ok(None) } diff --git a/realtime/src/node/proposal_manager/mod.rs b/realtime/src/node/proposal_manager/mod.rs index f85c3d6f..cb9816ad 100644 --- a/realtime/src/node/proposal_manager/mod.rs +++ b/realtime/src/node/proposal_manager/mod.rs @@ -63,6 +63,8 @@ impl BatchManager { raiko_client: RaikoClient, basefee_sharing_pctg: u8, proof_request_bypass: bool, + l1_chain_id: u64, + l2_chain_id: u64, ) -> Result { info!( "Batch builder config:\n\ @@ -85,6 +87,8 @@ impl BatchManager { ethereum_l1.clone(), taiko.clone(), cancel_token.clone(), + l1_chain_id, + l2_chain_id, ) .await?, )); @@ -263,39 +267,89 @@ impl BatchManager { self.bridge_handler.lock().await.has_pending_user_ops() } - async fn add_pending_l2_call_to_draft_block( + /// Process all pending UserOps: route each to L1 or L2 based on its chainId field. + /// + /// - L1→L2 deposits: UserOp added to proposal (for L1 multicall), processMessage tx added to L2 block + /// - L2 direct (bridge-out): UserOp execution tx added to L2 block, L2→L1 relay handled post-execution + async fn add_pending_user_ops_to_draft_block( &mut self, l2_draft_block: &mut L2BlockV2Draft, ) -> Result)>, anyhow::Error> { - if let Some((user_op_data, l2_call)) = self - .bridge_handler - .lock() - .await - .next_user_op_and_l2_call() - .await? - { - info!("Processing pending L2 call: {:?}", l2_call); + use bridge_handler::UserOpRouting; - let l2_call_bridge_tx = self - .taiko - .l2_execution_layer() - .construct_l2_call_tx(l2_call.message_from_l1) - .await?; + let (routing, status_store) = { + let mut handler = self.bridge_handler.lock().await; + let routing = handler.next_user_op_routed().await?; + (routing, handler.status_store()) + }; - info!( - "Inserting L2 call bridge transaction into tx list: {:?}", - l2_call_bridge_tx - ); + let Some(routing) = routing else { + return Ok(None); + }; - l2_draft_block - .prebuilt_tx_list - .tx_list - .push(l2_call_bridge_tx); + match routing { + UserOpRouting::L1ToL2 { user_op, l2_call } => { + info!("Processing L1→L2 deposit: UserOp id={}", user_op.id); - return Ok(Some((user_op_data, l2_call.signal_slot_on_l2))); - } + let l2_call_bridge_tx = self + .taiko + .l2_execution_layer() + .construct_l2_call_tx(l2_call.message_from_l1) + .await?; - Ok(None) + info!("Inserting processMessage tx into L2 block"); + l2_draft_block + .prebuilt_tx_list + .tx_list + .push(l2_call_bridge_tx); + + Ok(Some((user_op, l2_call.signal_slot_on_l2))) + } + UserOpRouting::L2Direct { user_op } => { + info!( + "Processing L2 UserOp (bridge-out): id={} submitter={}", + user_op.id, user_op.submitter + ); + + match self + .taiko + .l2_execution_layer() + .construct_l2_user_op_tx(&user_op) + .await + { + Ok(tx) => { + // Track L2 UserOp ID first — only insert tx if tracking succeeds, + // otherwise we'd execute on L2 but show Rejected in the status store. + if let Err(e) = self.batch_builder.add_l2_user_op_id(user_op.id) { + error!( + "Failed to track L2 UserOp id={}: {}. Dropping tx.", + user_op.id, e + ); + status_store.set( + user_op.id, + &bridge_handler::UserOpStatus::Rejected { + reason: format!("Failed to track UserOp: {}", e), + }, + ); + } else { + info!("Inserting L2 UserOp execution tx into block"); + l2_draft_block.prebuilt_tx_list.tx_list.push(tx); + } + } + Err(e) => { + error!("Failed to construct L2 UserOp tx: {}", e); + status_store.set( + user_op.id, + &bridge_handler::UserOpStatus::Rejected { + reason: format!("Failed to construct L2 tx: {}", e), + }, + ); + } + } + // No L1 UserOp or signal slot for L2-direct ops + Ok(None) + } + } } async fn add_draft_block_to_proposal( @@ -306,17 +360,18 @@ impl BatchManager { ) -> Result { let mut anchor_signal_slots: Vec> = vec![]; - debug!("Checking for pending L2 calls"); + debug!("Checking for pending UserOps (L1→L2 deposits and L2 direct)"); if let Some((user_op_data, signal_slot)) = self - .add_pending_l2_call_to_draft_block(&mut l2_draft_block) + .add_pending_user_ops_to_draft_block(&mut l2_draft_block) .await? { self.batch_builder.add_user_op(user_op_data)?; self.batch_builder.add_signal_slot(signal_slot)?; anchor_signal_slots.push(signal_slot); } else { - debug!("No pending L2 calls"); + debug!("No L1→L2 UserOps (L2 direct ops, if any, were handled inline)"); } + let payload = self.batch_builder.add_l2_draft_block(l2_draft_block)?; match self diff --git a/realtime/src/node/proposal_manager/proposal.rs b/realtime/src/node/proposal_manager/proposal.rs index b75ae8c2..230f406f 100644 --- a/realtime/src/node/proposal_manager/proposal.rs +++ b/realtime/src/node/proposal_manager/proposal.rs @@ -29,6 +29,7 @@ pub struct Proposal { // Surge POC fields (carried over) pub user_ops: Vec, + pub l2_user_op_ids: Vec, pub signal_slots: Vec>, pub l1_calls: Vec,