Skip to content

Commit aac0199

Browse files
mskrzypkowsCopilot
andauthored
Proposal bulding in separate thread (#898)
* Extended poposal transaction monitor with transaction building. Building the blob sidecar (EIP-7584) takes more than 0.5 seconds. Moving it to a separate thread keeps the main pre-confirmation heartbeat running at consistent intervals. Proposal builder passed as parameter for transaction monitor. * Updated dep * review, fixed wrong handling of error from submitted trasactions * Corrected handling not submitted proposal which need to be submitted again * comment * Update shasta/src/node/proposal_manager/batch_builder.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update shasta/src/l1/proposal_tx_builder.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Build failed error instead of estimation error * Renamed batch to proposal for shasta (#901) --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent a3360dd commit aac0199

16 files changed

Lines changed: 356 additions & 222 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/src/l1/transaction_error.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
#[derive(Debug)]
1+
#[derive(Debug, Clone)]
22
pub enum TransactionError {
3+
BuildFailed,
34
EstimationFailed,
45
EstimationTooEarly,
56
TransactionReverted,

common/src/l2/engine.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ impl L2Engine {
8080
}
8181
}
8282

83+
/// Batch (pacaya) is equivalent to the proposal (shasta).
8384
pub async fn get_pending_l2_tx_list(
8485
&self,
8586
base_fee: u64,

common/src/shared/transaction_monitor.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,20 @@ use alloy::{
1313
};
1414
use alloy_json_rpc::RpcError;
1515
use anyhow::Error;
16+
use std::future::Future;
1617
use std::{sync::Arc, time::Duration};
1718
use tokio::sync::Mutex;
1819
use tokio::sync::mpsc::Sender;
1920
use tokio::task::JoinHandle;
2021
use tracing::{debug, error, info, warn};
2122

23+
/// Trait for types that can asynchronously build a `TransactionRequest`.
24+
/// Implement this on protocol-specific builders (e.g. `ProposalTxBuilder`)
25+
/// to pass them into `monitor_new_transaction_with_builder`.
26+
pub trait TransactionRequestBuilder: Send + 'static {
27+
fn build(self) -> impl Future<Output = Result<TransactionRequest, TransactionError>> + Send;
28+
}
29+
2230
// Transaction status enum
2331
#[derive(Debug, Clone, PartialEq)]
2432
pub enum TxStatus {
@@ -118,6 +126,36 @@ impl TransactionMonitor {
118126
Ok(())
119127
}
120128

129+
/// Monitor a transaction built by a deferred builder.
130+
/// The builder future is awaited inside the spawned task, so this method returns immediately.
131+
/// If the builder fails, the error is sent via the error notification channel.
132+
pub async fn monitor_new_transaction_with_builder(
133+
&self,
134+
tx_builder: impl TransactionRequestBuilder,
135+
nonce: u64,
136+
) -> Result<(), Error> {
137+
let mut guard = self.join_handle.lock().await;
138+
if let Some(join_handle) = guard.as_ref()
139+
&& !join_handle.is_finished()
140+
{
141+
return Err(Error::msg(
142+
"Cannot monitor new transaction, previous transaction is in progress",
143+
));
144+
}
145+
146+
let monitor_thread = TransactionMonitorThread::new(
147+
self.provider.clone(),
148+
self.config.clone(),
149+
nonce,
150+
self.error_notification_channel.clone(),
151+
self.metrics.clone(),
152+
self.chain_id,
153+
);
154+
let join_handle = monitor_thread.spawn_monitoring_task_with_builder(tx_builder);
155+
*guard = Some(join_handle);
156+
Ok(())
157+
}
158+
121159
pub async fn is_transaction_in_progress(&self) -> Result<bool, Error> {
122160
let guard = self.join_handle.lock().await;
123161
if let Some(join_handle) = guard.as_ref() {
@@ -152,6 +190,23 @@ impl TransactionMonitorThread {
152190
})
153191
}
154192

193+
pub fn spawn_monitoring_task_with_builder(
194+
mut self,
195+
tx_builder: impl TransactionRequestBuilder,
196+
) -> JoinHandle<()> {
197+
tokio::spawn(async move {
198+
match tx_builder.build().await {
199+
Ok(tx) => {
200+
self.monitor_transaction(tx).await;
201+
}
202+
Err(err) => {
203+
error!("Transaction builder failed: {}", err);
204+
self.send_error_signal(err).await;
205+
}
206+
}
207+
})
208+
}
209+
155210
async fn monitor_transaction(&mut self, mut tx: TransactionRequest) {
156211
tx.set_nonce(self.nonce);
157212
if !matches!(tx.buildable_type(), Some(TxType::Eip1559 | TxType::Eip4844)) {

pacaya/src/node/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,10 @@ impl Node {
746746
warn!("Propose batch transaction executed too late.");
747747
return Ok(());
748748
}
749+
TransactionError::BuildFailed => {
750+
self.cancel_token.cancel_on_critical_error();
751+
return Err(anyhow::anyhow!("Transaction build failed, exiting"));
752+
}
749753
}
750754

751755
Ok(())

permissionless/src/node/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,10 @@ impl Node {
308308
warn!("Proposal transaction executed too late.");
309309
Ok(())
310310
}
311+
TransactionError::BuildFailed => {
312+
self.cancel_token.cancel_on_critical_error();
313+
Err(anyhow::anyhow!("Transaction build failed, exiting"))
314+
}
311315
}
312316
}
313317
}

shasta/src/l1/execution_layer.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl PreconfOperator for ExecutionLayer {
180180
}
181181

182182
impl ExecutionLayer {
183-
pub async fn send_batch_to_l1(
183+
pub async fn send_proposal_to_l1(
184184
&self,
185185
l2_blocks: Vec<L2BlockV2>,
186186
num_forced_inclusion: u16,
@@ -191,28 +191,27 @@ impl ExecutionLayer {
191191
num_forced_inclusion,
192192
);
193193

194-
// Build propose transaction
195-
let builder = ProposalTxBuilder::new(self.provider.clone(), self.extra_gas_percentage);
196-
let tx = builder
197-
.build_propose_tx(
198-
l2_blocks,
199-
self.common().preconfer_address(),
200-
self.contract_addresses.shasta_inbox,
201-
num_forced_inclusion,
202-
)
203-
.await
204-
.map_err(|e| Error::msg(format!("build_propose_tx failed: {e}")))?;
205-
206194
let pending_nonce = self.get_preconfer_nonce_pending().await.map_err(|e| {
207195
Error::msg(format!(
208-
"get_preconfer_nonce_pending (send_batch_to_l1) failed: {e}"
196+
"get_preconfer_nonce_pending (send_proposal_to_l1) failed: {e}"
209197
))
210198
})?;
211-
// Spawn a monitor for this transaction
199+
200+
// Build the transaction asynchronously inside the monitor's spawned task.
201+
// This moves the ~650ms KZG sidecar computation off the hot path.
202+
let tx_builder = ProposalTxBuilder::new(
203+
self.provider.clone(),
204+
self.extra_gas_percentage,
205+
l2_blocks,
206+
self.common().preconfer_address(),
207+
self.contract_addresses.shasta_inbox,
208+
num_forced_inclusion,
209+
);
210+
212211
self.transaction_monitor
213-
.monitor_new_transaction(tx, pending_nonce)
212+
.monitor_new_transaction_with_builder(tx_builder, pending_nonce)
214213
.await
215-
.map_err(|e| Error::msg(format!("Sending batch to L1 failed: {e}")))?;
214+
.map_err(|e| Error::msg(format!("Sending proposal to L1 failed: {e}")))?;
216215

217216
Ok(())
218217
}
Lines changed: 82 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use alloy::{
22
consensus::SidecarBuilder,
3+
eips::eip7594::BlobTransactionSidecarEip7594,
34
network::{TransactionBuilder, TransactionBuilder7594},
45
primitives::{
56
Address, Bytes,
@@ -12,36 +13,91 @@ use alloy_json_rpc::RpcError;
1213
use anyhow::{Context, Error};
1314
use common::l1::{fees_per_gas::FeesPerGas, tools, transaction_error::TransactionError};
1415
use common::shared::l2_block_v2::L2BlockV2;
16+
use common::shared::transaction_monitor::TransactionRequestBuilder;
1517
use taiko_bindings::inbox::{IInbox::ProposeInput, Inbox, LibBlobs::BlobReference};
1618
use taiko_protocol::shasta::{
1719
BlobCoder,
1820
manifest::{BlockManifest, DerivationSourceManifest},
1921
};
20-
use tracing::warn;
22+
use tracing::{info, warn};
23+
24+
/// Build the EIP-7594 blob sidecar from L2 blocks. This is a CPU-intensive operation
25+
/// (KZG commitment + cell proof computation).
26+
fn build_sidecar_from_l2_blocks(
27+
l2_blocks: &[L2BlockV2],
28+
) -> Result<BlobTransactionSidecarEip7594, Error> {
29+
let start = std::time::Instant::now();
30+
31+
let mut block_manifests = Vec::with_capacity(l2_blocks.len());
32+
for l2_block in l2_blocks {
33+
block_manifests.push(BlockManifest {
34+
timestamp: l2_block.timestamp_sec,
35+
coinbase: l2_block.coinbase,
36+
anchor_block_number: l2_block.anchor_block_number,
37+
gas_limit: l2_block.gas_limit_without_anchor,
38+
transactions: l2_block
39+
.prebuilt_tx_list
40+
.tx_list
41+
.iter()
42+
.map(|tx| tx.clone().into())
43+
.collect(),
44+
});
45+
}
46+
47+
let manifest = DerivationSourceManifest {
48+
blocks: block_manifests,
49+
};
50+
51+
let manifest_data = manifest
52+
.encode_and_compress()
53+
.map_err(|e| Error::msg(format!("Can't encode and compress manifest: {e}")))?;
54+
55+
let sidecar_builder: SidecarBuilder<BlobCoder> = SidecarBuilder::from_slice(&manifest_data);
56+
let sidecar = sidecar_builder
57+
.build_7594()
58+
.map_err(|e| Error::msg(format!("sidecar builder build_7594 failed: {e}")))?;
59+
60+
info!(
61+
"⏱️ build_sidecar_from_l2_blocks ({} blocks, {} bytes compressed) took {:?}",
62+
l2_blocks.len(),
63+
manifest_data.len(),
64+
start.elapsed()
65+
);
66+
67+
Ok(sidecar)
68+
}
2169

2270
pub struct ProposalTxBuilder {
2371
provider: DynProvider,
2472
extra_gas_percentage: u64,
73+
l2_blocks: Vec<L2BlockV2>,
74+
from: Address,
75+
to: Address,
76+
num_forced_inclusion: u16,
2577
}
2678

2779
impl ProposalTxBuilder {
28-
pub fn new(provider: DynProvider, extra_gas_percentage: u64) -> Self {
80+
pub fn new(
81+
provider: DynProvider,
82+
extra_gas_percentage: u64,
83+
l2_blocks: Vec<L2BlockV2>,
84+
from: Address,
85+
to: Address,
86+
num_forced_inclusion: u16,
87+
) -> Self {
2988
Self {
3089
provider,
3190
extra_gas_percentage,
91+
l2_blocks,
92+
from,
93+
to,
94+
num_forced_inclusion,
3295
}
3396
}
3497

35-
#[allow(clippy::too_many_arguments)]
36-
pub async fn build_propose_tx(
37-
&self,
38-
l2_blocks: Vec<L2BlockV2>,
39-
from: Address,
40-
to: Address,
41-
num_forced_inclusion: u16,
42-
) -> Result<TransactionRequest, Error> {
98+
async fn build_propose_tx(&self) -> Result<TransactionRequest, Error> {
4399
let tx_blob = self
44-
.build_propose_blob(l2_blocks, from, to, num_forced_inclusion)
100+
.build_propose_blob()
45101
.await
46102
.map_err(|e| Error::msg(format!("build_propose_blob failed: {e}")))?;
47103
let tx_blob_gas = match self.provider.estimate_gas(tx_blob.clone()).await {
@@ -80,44 +136,8 @@ impl ProposalTxBuilder {
80136
Ok(tx_blob)
81137
}
82138

83-
#[allow(clippy::too_many_arguments)]
84-
pub async fn build_propose_blob(
85-
&self,
86-
l2_blocks: Vec<L2BlockV2>,
87-
from: Address,
88-
to: Address,
89-
num_forced_inclusion: u16,
90-
) -> Result<TransactionRequest, Error> {
91-
let mut block_manifests = <Vec<BlockManifest>>::with_capacity(l2_blocks.len());
92-
for l2_block in &l2_blocks {
93-
// Build the block manifests.
94-
block_manifests.push(BlockManifest {
95-
timestamp: l2_block.timestamp_sec,
96-
coinbase: l2_block.coinbase,
97-
anchor_block_number: l2_block.anchor_block_number,
98-
gas_limit: l2_block.gas_limit_without_anchor,
99-
transactions: l2_block
100-
.prebuilt_tx_list
101-
.tx_list
102-
.iter()
103-
.map(|tx| tx.clone().into())
104-
.collect(),
105-
});
106-
}
107-
108-
// Build the proposal manifest.
109-
let manifest = DerivationSourceManifest {
110-
blocks: block_manifests,
111-
};
112-
113-
let manifest_data = manifest
114-
.encode_and_compress()
115-
.map_err(|e| Error::msg(format!("Can't encode and compress manifest: {e}")))?;
116-
117-
let sidecar_builder: SidecarBuilder<BlobCoder> = SidecarBuilder::from_slice(&manifest_data);
118-
let sidecar = sidecar_builder
119-
.build_7594()
120-
.map_err(|e| Error::msg(format!("sidecar builder build_7594 failed: {e}")))?;
139+
async fn build_propose_blob(&self) -> Result<TransactionRequest, Error> {
140+
let sidecar = build_sidecar_from_l2_blocks(&self.l2_blocks)?;
121141

122142
// Build the propose input.
123143
let input = ProposeInput {
@@ -131,19 +151,19 @@ impl ProposalTxBuilder {
131151
.context("blobs len try_into")?,
132152
offset: U24::ZERO,
133153
},
134-
numForcedInclusions: num_forced_inclusion,
154+
numForcedInclusions: self.num_forced_inclusion,
135155
};
136156

137-
let inbox = Inbox::new(to, self.provider.clone());
157+
let inbox = Inbox::new(self.to, self.provider.clone());
138158
let encoded_proposal_input = inbox
139159
.encodeProposeInput(input)
140160
.call()
141161
.await
142162
.map_err(|e| Error::msg(format!("inbox encodeProposeInput failed: {e}")))?;
143163

144164
let tx = TransactionRequest::default()
145-
.with_from(from)
146-
.with_to(to)
165+
.with_from(self.from)
166+
.with_to(self.to)
147167
.with_blob_sidecar(sidecar)
148168
.with_call(&Inbox::proposeCall {
149169
_lookahead: Bytes::new(),
@@ -153,3 +173,12 @@ impl ProposalTxBuilder {
153173
Ok(tx)
154174
}
155175
}
176+
177+
impl TransactionRequestBuilder for ProposalTxBuilder {
178+
async fn build(self) -> Result<TransactionRequest, TransactionError> {
179+
self.build_propose_tx().await.map_err(|e| {
180+
e.downcast::<TransactionError>()
181+
.unwrap_or(TransactionError::BuildFailed)
182+
})
183+
}
184+
}

0 commit comments

Comments
 (0)