Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/tests/e2e-tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async fn test_send_transaction() {
operations::manager::DEFAULT_L2_NETWORK_URL,
amount,
to_address,
true,
)
.await
.expect("Failed to transfer tokens");
Expand Down Expand Up @@ -395,6 +396,7 @@ async fn test_eth_transaction_rpc(#[case] test_name: &str) {
operations::manager::DEFAULT_L2_NETWORK_URL,
U256::from(1_000_000_000u64), // 1 Gwei
operations::manager::DEFAULT_L2_NEW_ACC1_ADDRESS,
true,
)
.await
.expect("Failed to send transaction");
Expand Down
190 changes: 174 additions & 16 deletions crates/tests/flashblocks-tests/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ use alloy_primitives::{hex, keccak256, Address, U256};
use alloy_sol_types::{sol, SolCall};
use eyre::Result;
use futures_util::StreamExt;
use scopeguard::defer;
use serde_json::json;
use serde_json::{json, Value};
use std::{
collections::HashSet,
collections::{HashMap, HashSet},
str::FromStr,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -49,6 +48,7 @@ async fn fb_smoke_test() {
operations::DEFAULT_L2_NETWORK_URL_FB,
U256::from(operations::GWEI),
test_address,
true,
)
.await
.expect("Failed to send tx");
Expand Down Expand Up @@ -211,6 +211,7 @@ async fn fb_benchmark_native_tx_confirmation() {
operations::DEFAULT_L2_NETWORK_URL,
U256::from(operations::GWEI),
test_address,
true,
)
.await
.unwrap();
Expand Down Expand Up @@ -953,17 +954,13 @@ async fn fb_subscription_test() -> Result<()> {
println!("Connected: {:?}", response.status());
let (_, mut read) = ws_stream.split();

// Guarantee cleanup on scope exit
defer! {
println!("Closing WebSocket connection");
};

let mut remaining: HashSet<String> = HashSet::new();
for i in 0..num_txs {
let tx_hash = operations::native_balance_transfer(
operations::DEFAULT_L2_NETWORK_URL_FB,
U256::from(operations::GWEI),
test_address,
true,
)
.await?;
println!("Sent tx {}: {}", i + 1, tx_hash);
Expand Down Expand Up @@ -1056,8 +1053,6 @@ async fn fb_subscription_test() -> Result<()> {
#[ignore = "Requires flashblocks WebSocket server with flashblocks subscription support"]
#[tokio::test]
async fn fb_eth_subscribe_test() -> Result<()> {
use serde_json::Value;

let ws_url = operations::manager::DEFAULT_WEBSOCKET_URL;
let sender_address = operations::DEFAULT_RICH_ADDRESS;
let test_address = operations::DEFAULT_L2_NEW_ACC1_ADDRESS;
Expand All @@ -1066,7 +1061,6 @@ async fn fb_eth_subscribe_test() -> Result<()> {
let ws_client = operations::websocket::EthWebSocketClient::connect(ws_url).await?;
println!("Connected successfully");

// Subscribe to flashblocks with specific parameters
let subscription_params = json!({
"headerInfo": true,
"subTxFilter": {
Expand All @@ -1080,11 +1074,6 @@ async fn fb_eth_subscribe_test() -> Result<()> {
ws_client.subscribe("flashblocks", Some(subscription_params)).await?;
println!("Subscription created successfully");

// Guarantee cleanup on scope exit
defer! {
println!("Unsubscribing and closing WebSocket connection");
};

let num_txs = 3;

let mut remaining: HashSet<String> = HashSet::new();
Expand All @@ -1093,6 +1082,7 @@ async fn fb_eth_subscribe_test() -> Result<()> {
operations::DEFAULT_L2_NETWORK_URL_FB,
U256::from(operations::GWEI),
test_address,
true,
)
.await?;
println!("Sent tx {}: {}", i + 1, tx_hash);
Expand Down Expand Up @@ -1181,3 +1171,171 @@ async fn fb_eth_subscribe_test() -> Result<()> {

Ok(())
}

#[ignore = "Requires flashblocks WebSocket server with flashblocks subscription support"]
#[tokio::test]
async fn fb_benchmark_new_heads_subscription_test() -> Result<()> {
let ws_url = operations::manager::DEFAULT_WEBSOCKET_URL;

println!("Connecting to flashblocks WebSocket at {ws_url}...");
let ws_client = operations::websocket::EthWebSocketClient::connect(ws_url).await?;
println!("Connected successfully");

let subscription_params = json!({
"headerInfo": true
});

let mut flashblocks_subscription: jsonrpsee::core::client::Subscription<Value> =
ws_client.subscribe("flashblocks", Some(subscription_params)).await?;
println!("Flashblocks subscription created successfully");

let mut eth_subscription: jsonrpsee::core::client::Subscription<Value> =
ws_client.subscribe("newHeads", None).await?;
println!("Eth newHeads subscription created successfully");

let mut total_sub_time_diff = Duration::ZERO;
let mut heights = HashMap::<u64, Instant>::new();
let mut count = 0;

println!("Starting benchmark: comparing flashblocks vs eth newHeads subscriptions...");

while count < ITERATIONS {
tokio::select! {
// Flashblocks subscription message
Some(result) = flashblocks_subscription.next() => {
match result {
Ok(notification) => {
if let Some(header) = notification.get("header")
&& let Some(number_hex) = header.get("number").and_then(|n| n.as_str())
&& let Ok(height) = u64::from_str_radix(number_hex.trim_start_matches("0x"), 16)
{
heights.insert(height, Instant::now());
println!("Flashblocks received block #{height}");
}
}
Err(e) => {
eprintln!("Flashblocks subscription error: {}", e);
break;
}
}
}
// Eth newHeads subscription message
Some(result) = eth_subscription.next() => {
match result {
Ok(notification) => {
if let Some(number_hex) = notification.get("number").and_then(|n| n.as_str())
&& let Ok(height) = u64::from_str_radix(number_hex.trim_start_matches("0x"), 16)
{
if let Some(flashblocks_time) = heights.get(&height) {
let time_diff = flashblocks_time.elapsed();
count += 1;

if count == 1 {
continue;
}

total_sub_time_diff += time_diff;
println!(
"Flashblocks newHeads sub is faster than ETH newHeads sub by: {time_diff:?} (block #{height})"
);
} else {
println!("Eth newHeads received block #{height} (not in flashblocks yet)");
}
}
}
Err(e) => {
eprintln!("Eth subscription error: {e}");
break;
}
}
}
}
}

let avg_time_diff = total_sub_time_diff / (ITERATIONS - 1) as u32;
println!("\n=== Benchmark Results ===");
println!("Avg Flashblocks newHeads sub is faster than ETH newHeads sub by: {avg_time_diff:?}");

Ok(())
}

#[ignore = "Requires flashblocks WebSocket server with flashblocks subscription support"]
#[tokio::test]
async fn fb_benchmark_new_transactions_subscription_test() -> Result<()> {
let ws_url = operations::manager::DEFAULT_WEBSOCKET_URL;
let sender_address = operations::DEFAULT_RICH_ADDRESS;
let test_address = operations::DEFAULT_L2_NEW_ACC1_ADDRESS;

println!("Connecting to flashblocks WebSocket at {ws_url}...");
let ws_client = operations::websocket::EthWebSocketClient::connect(ws_url).await?;
println!("Connected successfully");

let subscription_params = json!({
"headerInfo": true,
"subTxFilter": {
"txInfo": true,
"txReceipt": true,
"subscribeAddresses": [sender_address, test_address]
}
});

let mut flashblocks_subscription: jsonrpsee::core::client::Subscription<Value> =
ws_client.subscribe("flashblocks", Some(subscription_params)).await?;
println!("Flashblocks subscription created successfully");

let mut total_flashblocks_duration = Duration::ZERO;

for i in 0..ITERATIONS {
println!("\n--- Iteration {i} ---");

let tx_hash = operations::native_balance_transfer(
operations::DEFAULT_L2_NETWORK_URL_FB,
U256::from(operations::GWEI),
test_address,
false,
)
.await?;

println!("Sent transaction: {tx_hash}");
let start_time = Instant::now();

// Wait for transaction to appear in flashblocks subscription
let sub_duration = tokio::time::timeout(TX_CONFIRMATION_TIMEOUT, async {
loop {
match flashblocks_subscription.next().await {
Some(Ok(notification)) => {
if operations::contains_tx_hash(&notification, &tx_hash) {
return Ok(start_time.elapsed());
}
}
Some(Err(e)) => {
eprintln!("Flashblocks subscription error: {e}");
return Err(eyre::eyre!("Subscription error: {e}"));
}
None => {
eprintln!("Flashblocks subscription ended unexpectedly");
return Err(eyre::eyre!("Subscription ended"));
}
}
}
})
.await
.map_err(|_| {
eyre::eyre!("Timeout waiting for transaction in flashblocks subscription")
})??;

println!("Flashblocks newTx sub duration: {sub_duration:?}");

if i == 0 {
continue;
}

total_flashblocks_duration += sub_duration;
}

let avg_duration = total_flashblocks_duration / (ITERATIONS - 1) as u32;
println!("\n=== Benchmark Results ===");
println!("Avg Flashblocks newTx sub duration: {avg_duration:?}");

Ok(())
}
72 changes: 47 additions & 25 deletions crates/tests/operations/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn native_balance_transfer(
endpoint_url: &str,
amount: U256,
to_address: &str,
wait_for_confirmation: bool,
) -> Result<String> {
let signer = PrivateKeySigner::from_str(DEFAULT_RICH_PRIVATE_KEY.trim_start_matches("0x"))?;
let wallet = EthereumWallet::from(signer.clone());
Expand All @@ -63,11 +64,15 @@ pub async fn native_balance_transfer(
let pending_tx = provider.send_transaction(tx).await?;

let tx_hash = *pending_tx.tx_hash();
println!("Tx sent: {tx_hash:#x}, waiting for tx receipt confirmation.");

// Wait for the transaction to be mined
wait_for_tx_mined(endpoint_url, &format!("{tx_hash:#x}")).await?;
println!("Transaction {tx_hash:#x} confirmed successfully");
if wait_for_confirmation {
println!("Tx sent: {tx_hash:#x}, waiting for tx receipt confirmation.");
wait_for_tx_mined(endpoint_url, &format!("{tx_hash:#x}")).await?;
println!("Transaction {tx_hash:#x} confirmed successfully");
} else {
println!("Tx sent: {tx_hash:#x}");
}

Ok(format!("{tx_hash:#x}"))
}

Expand All @@ -78,7 +83,8 @@ pub async fn fund_address_and_wait_for_balance(
to_address: &str,
funding_amount: U256,
) -> Result<()> {
let funding_tx_hash = native_balance_transfer(endpoint_url, funding_amount, to_address).await?;
let funding_tx_hash =
native_balance_transfer(endpoint_url, funding_amount, to_address, true).await?;

let receipt = eth_get_transaction_receipt(client, &funding_tx_hash).await?;
let funding_block_num = receipt["blockNumber"]
Expand Down Expand Up @@ -372,16 +378,15 @@ pub async fn wait_for_tx_mined(endpoint_url: &str, tx_hash: &str) -> Result<serd
let client = create_test_client(endpoint_url);
tokio::time::timeout(DEFAULT_TIMEOUT_TX_TO_BE_MINED, async {
loop {
if let Ok(receipt) = eth_get_transaction_receipt(&client, tx_hash).await {
if !receipt.is_null() {
let status = receipt["status"]
.as_str()
.ok_or(eyre!("tx receipt missing status field"))?;
if status == "0x1" {
return Ok(receipt);
} else {
return Err(eyre!("tx execution failed with status: {}", status));
}
if let Ok(receipt) = eth_get_transaction_receipt(&client, tx_hash).await
&& !receipt.is_null()
{
let status =
receipt["status"].as_str().ok_or(eyre!("tx receipt missing status field"))?;
if status == "0x1" {
return Ok(receipt);
} else {
return Err(eyre!("tx execution failed with status: {}", status));
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
Expand Down Expand Up @@ -476,16 +481,16 @@ pub fn get_refund_counter_from_trace(trace_result: &serde_json::Value, opcode: &
// Iterate through each log entry
for entry in struct_logs {
// Check if this log has the matching opcode
if let Some(op) = entry["op"].as_str() {
if op == opcode {
// Extract the refund value
if let Some(refund) = entry["refund"].as_u64() {
refund_counter = refund;
break;
} else if let Some(refund) = entry["refund"].as_f64() {
refund_counter = refund as u64;
break;
}
if let Some(op) = entry["op"].as_str()
&& op == opcode
{
// Extract the refund value
if let Some(refund) = entry["refund"].as_u64() {
refund_counter = refund;
break;
} else if let Some(refund) = entry["refund"].as_f64() {
refund_counter = refund as u64;
break;
}
}
}
Expand Down Expand Up @@ -592,3 +597,20 @@ pub fn validate_internal_transaction(

Ok(())
}

/// Check if a flashblocks notification contains a specific transaction hash
pub fn contains_tx_hash(notification: &serde_json::Value, tx_hash: &str) -> bool {
let Some(transactions) = notification.get("transactions").and_then(|t| t.as_array()) else {
return false;
};

for tx in transactions {
if let Some(tx_hash_str) = tx.get("txHash").and_then(|h| h.as_str())
&& tx_hash_str == tx_hash
{
return true;
}
}

false
}