diff --git a/configs/default.toml b/configs/default.toml
index bffb9cc..3e9305e 100644
--- a/configs/default.toml
+++ b/configs/default.toml
@@ -20,6 +20,8 @@ token_contract_address = "0x2000000000000000000000000000000000000001"
 recipient_distribution_factor = 20 # 1/20 of accounts receive transfers.
 max_transfer_amount = 10
 
+batch_size = 1000 # Number of transactions to generate before pushing to queue.
+
 [rate_limiting]
 initial_ratelimit = 100 # txs/s
diff --git a/crescendo/src/bin/generate_genesis_alloc.rs b/crescendo/src/bin/generate_genesis_alloc.rs
index 3d927fb..9ddf89e 100644
--- a/crescendo/src/bin/generate_genesis_alloc.rs
+++ b/crescendo/src/bin/generate_genesis_alloc.rs
@@ -18,7 +18,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     const NUM_ACCOUNTS: u32 = 50_000;
     const MNEMONIC: &str = "test test test test test test test test test test test junk";
 
-    println!("Generating {} accounts...", NUM_ACCOUNTS);
+    println!("Generating {NUM_ACCOUNTS} accounts...");
 
     let genesis_alloc: BTreeMap<String, AccountBalance> = (0..NUM_ACCOUNTS)
         .into_par_iter()
@@ -27,9 +27,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
             let signer = MnemonicBuilder::<English>::default().phrase(MNEMONIC).index(worker_id).unwrap().build().unwrap();
 
-            let address = secret_key_to_address(&signer.credential());
+            let address = secret_key_to_address(signer.credential());
 
-            (format!("{:?}", address), AccountBalance { balance: "0xD3C21BCECCEDA1000000".to_string() })
+            (format!("{address:?}"), AccountBalance { balance: "0xD3C21BCECCEDA1000000".to_string() })
         })
         .collect();
 
@@ -37,7 +37,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     let json = serde_json::to_string_pretty(&genesis_alloc)?;
     fs::write(output_path, json)?;
 
-    println!("\nSuccessfully generated {} accounts!", NUM_ACCOUNTS);
+    println!("\nSuccessfully generated {NUM_ACCOUNTS} accounts!");
     println!("Accounts saved to: {}", output_path.display());
 
     Ok(())
diff --git a/crescendo/src/config.rs b/crescendo/src/config.rs
index 1422c0f..9afd289 100644
--- a/crescendo/src/config.rs
+++ b/crescendo/src/config.rs
@@ -82,6 +82,8 @@ pub struct TxGenWorkerConfig {
     pub token_contract_address: String,
     pub recipient_distribution_factor: u32,
     pub max_transfer_amount: u64,
+
+    pub batch_size: u32,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
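Review note on the new `batch_size` key: it deserializes into `TxGenWorkerConfig` like the surrounding fields, and since the diff adds no `#[serde(default)]`, existing config files will presumably need the key for startup to succeed. A minimal sketch of that behavior (struct trimmed to just the new field; `serde` and `toml` crates assumed from the derives in config.rs):

```rust
use serde::Deserialize;

// Trimmed stand-in for TxGenWorkerConfig, keeping only the new field.
#[derive(Debug, Deserialize)]
struct TxGenWorkerConfig {
    batch_size: u32,
}

fn main() {
    let cfg: TxGenWorkerConfig = toml::from_str("batch_size = 1000").unwrap();
    assert_eq!(cfg.batch_size, 1000);

    // Without a serde default, omitting the key is a parse error, not a silent zero.
    assert!(toml::from_str::<TxGenWorkerConfig>("").is_err());
}
```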
diff --git a/crescendo/src/main.rs b/crescendo/src/main.rs
index d6e55b6..1de31c7 100644
--- a/crescendo/src/main.rs
+++ b/crescendo/src/main.rs
@@ -4,7 +4,6 @@ use std::thread;
 use std::time::Duration;
 
 use clap::Parser;
-use core_affinity;
 use mimalloc::MiMalloc;
 
 mod config;
@@ -70,7 +69,7 @@ async fn main() {
     let connections_per_network_worker =
         config::get().network_worker.total_connections / worker_counts[&WorkerType::Network];
-    println!("[*] Connections per network worker: {}", connections_per_network_worker);
+    println!("[*] Connections per network worker: {connections_per_network_worker}");
 
     // TODO: Having the assign_workers function do this would be cleaner.
     let mut tx_gen_worker_id = 0;
diff --git a/crescendo/src/tx_queue.rs b/crescendo/src/tx_queue.rs
index b37a01f..4757ebc 100644
--- a/crescendo/src/tx_queue.rs
+++ b/crescendo/src/tx_queue.rs
@@ -37,9 +37,9 @@ impl TxQueue {
 pub static TX_QUEUE: std::sync::LazyLock<TxQueue> = std::sync::LazyLock::new(TxQueue::new);
 
 impl TxQueue {
-    pub fn push_tx(&self, tx: Vec<u8>) {
-        self.total_added.fetch_add(1, Ordering::Relaxed);
-        self.queue.lock().push_back(tx);
+    pub fn push_txs(&self, txs: Vec<Vec<u8>>) {
+        self.total_added.fetch_add(txs.len() as u64, Ordering::Relaxed);
+        self.queue.lock().extend(txs);
     }
 
     pub fn queue_len(&self) -> usize {
@@ -47,15 +47,25 @@ impl TxQueue {
     }
 
     pub async fn pop_at_most(&self, max_count: usize) -> Option<Vec<Vec<u8>>> {
-        let mut queue = self.queue.lock();
-        let allowed = (0..queue.len().min(max_count)).take_while(|_| self.rate_limiter.try_wait().is_ok()).count();
+        // Assume the queue has sufficient items for now.
+        let allowed = (0..max_count).take_while(|_| self.rate_limiter.try_wait().is_ok()).count();
 
         if allowed == 0 {
             return None;
+        }
+
+        // Scope to release lock asap.
+        let drained = {
+            let mut queue = self.queue.lock();
+            let to_drain = allowed.min(queue.len());
+            if to_drain == 0 {
+                return None;
+            }
+            queue.drain(..to_drain).collect::<Vec<_>>()
         };
 
-        self.total_popped.fetch_add(allowed as u64, Ordering::Relaxed);
+        self.total_popped.fetch_add(drained.len() as u64, Ordering::Relaxed);
 
-        Some(queue.drain(..allowed).collect())
+        Some(drained)
     }
 
     pub async fn start_reporter(&self, measurement_interval: std::time::Duration) {
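The rewritten `pop_at_most` is a scoped-drain pattern: rate-limiter tokens are taken first, and the queue lock is then held only for the drain itself, not while the caller consumes the batch. A standalone sketch (assuming a `parking_lot::Mutex<VecDeque<Vec<u8>>>`, which the `unwrap`-free `lock()` calls suggest):

```rust
use std::collections::VecDeque;

use parking_lot::Mutex;

// Drain up to `allowed` items, holding the lock only inside the inner block.
fn pop_batch(queue: &Mutex<VecDeque<Vec<u8>>>, allowed: usize) -> Option<Vec<Vec<u8>>> {
    let drained = {
        let mut q = queue.lock(); // guard dropped at the end of this block
        let n = allowed.min(q.len());
        if n == 0 {
            return None;
        }
        q.drain(..n).collect::<Vec<_>>()
    };
    Some(drained) // callers iterate the batch with the lock already released
}

fn main() {
    let queue = Mutex::new(VecDeque::from(vec![vec![1u8], vec![2], vec![3]]));
    assert_eq!(pop_batch(&queue, 2).map(|b| b.len()), Some(2));
    assert_eq!(queue.lock().len(), 1);
}
```

As the in-diff comment acknowledges, tokens are now consumed before the queue length is known, so up to `max_count` tokens can be spent against a short or empty queue.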
diff --git a/crescendo/src/utils.rs b/crescendo/src/utils.rs
index 13cab93..ee472ac 100644
--- a/crescendo/src/utils.rs
+++ b/crescendo/src/utils.rs
@@ -13,7 +13,7 @@ pub fn increase_nofile_limit(min_limit: u64) -> io::Result<u64> {
     println!("[*] At startup, file descriptor limit: soft = {soft}, hard = {hard}");
 
     if hard < min_limit {
-        panic!("[!] File descriptor hard limit is too low. Please increase it to at least {}.", min_limit);
+        panic!("[!] File descriptor hard limit is too low. Please increase it to at least {min_limit}.");
     }
 
     if soft != hard {
@@ -61,7 +61,7 @@ pub fn format_ranges(nums: &[usize]) -> String {
         if start == end {
             ranges.push(start.to_string());
         } else {
-            ranges.push(format!("{}-{}", start, end));
+            ranges.push(format!("{start}-{end}"));
         }
 
         i += 1;
diff --git a/crescendo/src/workers.rs b/crescendo/src/workers.rs
index 93337be..b953748 100644
--- a/crescendo/src/workers.rs
+++ b/crescendo/src/workers.rs
@@ -57,7 +57,7 @@ pub fn assign_workers(
         if let Some(core_id) = core_ids.pop() {
             result.push((core_id, worker_type));
             *worker_counts.entry(worker_type).or_insert(0) += 1;
-            worker_cores.entry(worker_type).or_insert_with(Vec::new).push(core_id);
+            worker_cores.entry(worker_type).or_default().push(core_id);
             remaining_cores -= 1;
         }
     }
@@ -74,7 +74,7 @@ pub fn assign_workers(
         if let Some(core_id) = core_ids.pop() {
             result.push((core_id, *worker_type));
             *worker_counts.entry(*worker_type).or_insert(0) += 1;
-            worker_cores.entry(*worker_type).or_insert_with(Vec::new).push(core_id);
+            worker_cores.entry(*worker_type).or_default().push(core_id);
             remaining_cores -= 1;
         }
     }
@@ -87,12 +87,12 @@ pub fn assign_workers(
         if let Some(core_id) = core_ids.pop() {
             result.push((core_id, worker_type));
             *worker_counts.entry(worker_type).or_insert(0) += 1;
-            worker_cores.entry(worker_type).or_insert_with(Vec::new).push(core_id);
+            worker_cores.entry(worker_type).or_default().push(core_id);
         }
     }
 
-    println!("[+] Spawning {} workers:", total_starting_cores);
+    println!("[+] Spawning {total_starting_cores} workers:");
     for (worker_type, count) in worker_counts.clone() {
         if log_core_ranges {
             if let Some(cores) = worker_cores.get(&worker_type) {
                 let mut core_ids = cores.iter().map(|c| c.id).collect::<Vec<_>>();
                 core_ids.sort();
 
                 let core_str = match core_ids.as_slice() {
-                    [single] => format!("core {}", single),
+                    [single] => format!("core {single}"),
                     ids => format!("cores {}", format_ranges(ids)),
                 };
-                println!("- {:?}: {} ({})", worker_type, count, core_str);
+                println!("- {worker_type:?}: {count} ({core_str})");
             }
         } else {
-            println!("- {:?}: {}", worker_type, count);
+            println!("- {worker_type:?}: {count}");
         }
     }
diff --git a/crescendo/src/workers/network.rs b/crescendo/src/workers/network.rs
index 7860c45..409c33c 100644
--- a/crescendo/src/workers/network.rs
+++ b/crescendo/src/workers/network.rs
@@ -82,19 +82,19 @@ pub async fn network_worker(worker_id: usize) {
                             NETWORK_STATS.inc_requests_by(txs.len() - error_count);
                         }
                         Err(e) => {
-                            eprintln!("[!] Failed to read response body: {:?}", e);
+                            eprintln!("[!] Failed to read response body: {e:?}");
                             NETWORK_STATS.inc_errors_by(txs.len());
                             tokio::time::sleep(Duration::from_millis(config.error_sleep_ms)).await;
                         }
                     }
                 } else {
-                    println!("[!] Request did not have OK status: {:?}", res);
+                    println!("[!] Request did not have OK status: {res:?}");
                     NETWORK_STATS.inc_errors_by(txs.len());
                     tokio::time::sleep(Duration::from_millis(100)).await;
                 }
             }
             Err(e) => {
-                eprintln!("[!] Request failed: {:?}", e);
+                eprintln!("[!] Request failed: {e:?}");
                 NETWORK_STATS.inc_errors_by(txs.len());
                 tokio::time::sleep(Duration::from_millis(100)).await;
             }
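The churn in utils.rs, workers.rs, and network.rs is mechanical lint cleanup: inlined format arguments and `Entry::or_default()` in place of `or_insert_with(Vec::new)`. A quick sketch of the equivalences, in case the short forms are unfamiliar:

```rust
use std::collections::HashMap;

fn main() {
    let count = 3;
    // Inline format args capture identifiers from scope; output is identical,
    // and the same works for Debug and other format specs.
    assert_eq!(format!("{}", count), format!("{count}"));
    assert_eq!(format!("{:?}", count), format!("{count:?}"));

    // or_default() is shorthand for or_insert_with(V::default); for Vec that
    // is exactly or_insert_with(Vec::new).
    let mut map: HashMap<&str, Vec<u32>> = HashMap::new();
    map.entry("a").or_default().push(1);
    assert_eq!(map["a"], vec![1]);
}
```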
diff --git a/crescendo/src/workers/tx_gen.rs b/crescendo/src/workers/tx_gen.rs
index a6a40b8..c1fa9ff 100644
--- a/crescendo/src/workers/tx_gen.rs
+++ b/crescendo/src/workers/tx_gen.rs
@@ -44,11 +44,12 @@ sol! {
 }
 
 pub fn tx_gen_worker(_worker_id: u32) {
+    let config = &config::get().tx_gen_worker;
+
     let mut rng = rand::rng();
+    let mut tx_batch = Vec::with_capacity(config.batch_size as usize);
 
     loop {
-        let config = &config::get().tx_gen_worker;
-
         let account_index = rng.random_range(0..config.num_accounts); // Account we'll be sending from.
 
         // Get and increment nonce atomically.
@@ -83,7 +84,12 @@ pub fn tx_gen_worker(_worker_id: u32) {
             },
         );
 
-        TX_QUEUE.push_tx(tx);
+        tx_batch.push(tx);
+
+        // Once we've accumulated batch_size transactions, drain them all to the queue.
+        if tx_batch.len() >= config.batch_size as usize {
+            TX_QUEUE.push_txs(std::mem::take(&mut tx_batch));
+        }
     }
 }
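One subtlety in the batching loop worth flagging: `std::mem::take` swaps in `Vec::default()`, whose capacity is zero, so the `Vec::with_capacity(config.batch_size)` reservation only benefits the first batch; later batches regrow as they fill. A sketch of the behavior, with a `mem::replace` variant that keeps the reservation (illustrative only, not part of this diff):

```rust
fn main() {
    let mut tx_batch: Vec<Vec<u8>> = Vec::with_capacity(1000);
    tx_batch.push(vec![0u8; 32]);

    // take() moves the allocation out and leaves Vec::default() behind.
    let taken = std::mem::take(&mut tx_batch);
    assert!(taken.capacity() >= 1000);
    assert_eq!(tx_batch.capacity(), 0); // the next batch reallocates as it grows

    // Alternative that re-reserves up front each time.
    let _next = std::mem::replace(&mut tx_batch, Vec::with_capacity(1000));
    assert!(tx_batch.capacity() >= 1000);
}
```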