From 8cf063a5dfd3ce672b0deefec266fe1b2290ab6a Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 29 May 2026 17:05:00 -0400 Subject: [PATCH 1/2] feat: add scenario access_list with placeholder resolution Supersedes #581. Adds an `access_list` field to FunctionCallDefinition that accepts {placeholder} strings in `address` and `storageKeys`, resolved during the loose-to-strict conversion via the existing templater + DB map. Coexists with max_priority_fee_per_gas (#580) and alloy 2.0 (#561). --- crates/cli/src/server/static/openrpc.json | 10 +-- crates/core/src/generator/function_def.rs | 71 +++++++++++++++++++ crates/core/src/generator/templater.rs | 85 ++++++++++++++++++++++- crates/core/src/generator/trait.rs | 42 ++++++++++- crates/testfile/src/lib.rs | 69 ++++++++++++++++++ docs/creating_scenarios.md | 24 +++++++ 6 files changed, 293 insertions(+), 8 deletions(-) diff --git a/crates/cli/src/server/static/openrpc.json b/crates/cli/src/server/static/openrpc.json index d01d1e38..d0d5d67b 100644 --- a/crates/cli/src/server/static/openrpc.json +++ b/crates/cli/src/server/static/openrpc.json @@ -15,7 +15,7 @@ "BlobsCliArgs": {"file":"crates/cli/src/default_scenarios/blobs.rs","line":10}, "BuilderParams": {"file":"crates/cli/src/server/rpc_server/types.rs","line":227}, "BuiltinScenarioCli": {"file":"crates/cli/src/default_scenarios/builtin.rs","line":33}, - "BundleCallDefinition": {"file":"crates/core/src/generator/function_def.rs","line":61}, + "BundleCallDefinition": {"file":"crates/core/src/generator/function_def.rs","line":79}, "BundleTypeCli": {"file":"crates/cli/src/commands/common.rs","line":438}, "CompiledContract": {"file":"crates/core/src/generator/create_def.rs","line":8}, "ContenderSessionInfo": {"file":"crates/cli/src/server/sessions.rs","line":128}, @@ -27,9 +27,10 @@ "EthereumOpcode": {"file":"crates/cli/src/default_scenarios/eth_functions/opcodes.rs","line":10}, "EthereumPrecompile": {"file":"crates/cli/src/default_scenarios/eth_functions/precompiles.rs","line":11}, "FillBlockCliArgs": {"file":"crates/cli/src/default_scenarios/fill_block.rs","line":17}, - "FunctionCallDefinition": {"file":"crates/core/src/generator/function_def.rs","line":12}, + "FunctionCallDefinition": {"file":"crates/core/src/generator/function_def.rs","line":25}, "FundAccountsParams": {"file":"crates/cli/src/server/rpc_server/types.rs","line":273}, - "FuzzParam": {"file":"crates/core/src/generator/function_def.rs","line":176}, + "FuzzParam": {"file":"crates/core/src/generator/function_def.rs","line":200}, + "LooseAccessListItem": {"file":"crates/core/src/generator/function_def.rs","line":15}, "RevertCliArgs": {"file":"crates/cli/src/default_scenarios/revert.rs","line":8}, "ServerStatus": {"file":"crates/cli/src/server/rpc_server/types.rs","line":33}, "SessionOptions": {"file":"crates/cli/src/server/rpc_server/types.rs","line":234}, @@ -188,9 +189,10 @@ "EthereumOpcode": {"type":"string","enum":["Stop","Add","Mul","Sub","Div","Sdiv","Mod","Smod","Addmod","Mulmod","Exp","Signextend","Lt","Gt","Slt","Sgt","Eq","Iszero","And","Or","Xor","Not","Byte","Shl","Shr","Sar","Sha3","Keccak256","Address","Balance","Origin","Caller","Callvalue","Calldataload","Calldatasize","Calldatacopy","Codesize","Codecopy","Gasprice","Extcodesize","Extcodecopy","Returndatasize","Returndatacopy","Extcodehash","Blockhash","Coinbase","Timestamp","Number","Prevrandao","Gaslimit","Chainid","Selfbalance","Basefee","Pop","Mload","Mstore","Mstore8","Sload","Sstore","Msize","Gas","Log0","Log1","Log2","Log3","Log4","Create","Call","Callcode","Return","Delegatecall","Create2","Staticcall","Revert","Invalid","Selfdestruct"]}, "EthereumPrecompile": {"type":"string","enum":["HashSha256","HashRipemd160","Identity","ModExp","EcAdd","EcMul","EcPairing","Blake2f"]}, "FillBlockCliArgs": {"description":"Taken from the CLI, this is used to fill a block with transactions.","type":"object","properties":{"max_gas_per_block":{"type":"integer"}}}, - "FunctionCallDefinition": {"description":"User-facing definition of a function call to be executed.","type":"object","properties":{"to":{"description":"Address of the contract to call.","type":"string"},"from":{"description":"Address of the tx sender.","type":"string"},"from_pool":{"description":"Get a `from` address from the pool of signers specified here.","type":"string"},"signature":{"description":"Name of the function to call.","type":"string"},"args":{"description":"Parameters to pass to the function.","type":"array","items":{"type":"string"}},"value":{"description":"Value in wei to send with the tx.","type":"string"},"fuzz":{"description":"Parameters to fuzz during the test.","type":"array","items":{"$ref":"#/components/schemas/FuzzParam"}},"kind":{"description":"Optional type of the spam transaction for categorization.","type":"string"},"gas_limit":{"description":"Optional gas limit, which will skip gas estimation. This allows reverting txs to be sent.","type":"integer"},"blob_data":{"description":"Optional blob data; tx type must be set to EIP4844 by spammer","type":"string"},"authorization_address":{"description":"Optional setCode data; tx type must be set to EIP7702 by spammer","type":"string"},"for_all_accounts":{"description":"If true and `from_pool` is set, run this setup transaction for all accounts in the pool. Defaults to false (only runs for the first account).","type":"boolean"},"max_priority_fee_per_gas":{"description":"Optional EIP-1559 priority fee for this tx. Accepts raw wei (`\"10000000000\"`), hex (`\"0x2540be400\"`), or a unit string (`\"10 gwei\"`, `\"0.001 eth\"`). May also be a `{placeholder}` that resolves to one of those forms. If unset, the spammer falls back to its default (`gas_price / 10`). This field is also fuzzable via `FuzzParam::max_priority_fee_per_gas = true`.","type":"string"}},"required":["to","for_all_accounts"]}, + "FunctionCallDefinition": {"description":"User-facing definition of a function call to be executed.","type":"object","properties":{"to":{"description":"Address of the contract to call.","type":"string"},"from":{"description":"Address of the tx sender.","type":"string"},"from_pool":{"description":"Get a `from` address from the pool of signers specified here.","type":"string"},"signature":{"description":"Name of the function to call.","type":"string"},"args":{"description":"Parameters to pass to the function.","type":"array","items":{"type":"string"}},"value":{"description":"Value in wei to send with the tx.","type":"string"},"fuzz":{"description":"Parameters to fuzz during the test.","type":"array","items":{"$ref":"#/components/schemas/FuzzParam"}},"kind":{"description":"Optional type of the spam transaction for categorization.","type":"string"},"gas_limit":{"description":"Optional gas limit, which will skip gas estimation. This allows reverting txs to be sent.","type":"integer"},"blob_data":{"description":"Optional blob data; tx type must be set to EIP4844 by spammer","type":"string"},"authorization_address":{"description":"Optional setCode data; tx type must be set to EIP7702 by spammer","type":"string"},"access_list":{"description":"Optional EIP-2930 access list entries to include in the transaction. Address and storage keys may contain `{placeholder}` references that are resolved when the loose definition is converted to its strict form.","type":"array","items":{"$ref":"#/components/schemas/LooseAccessListItem"}},"for_all_accounts":{"description":"If true and `from_pool` is set, run this setup transaction for all accounts in the pool. Defaults to false (only runs for the first account).","type":"boolean"},"max_priority_fee_per_gas":{"description":"Optional EIP-1559 priority fee for this tx. Accepts raw wei (`\"10000000000\"`), hex (`\"0x2540be400\"`), or a unit string (`\"10 gwei\"`, `\"0.001 eth\"`). May also be a `{placeholder}` that resolves to one of those forms. If unset, the spammer falls back to its default (`gas_price / 10`). This field is also fuzzable via `FuzzParam::max_priority_fee_per_gas = true`.","type":"string"}},"required":["to","for_all_accounts"]}, "FundAccountsParams": {"type":"object","properties":{"sessionId":{"type":"integer"},"agentClass":{"$ref":"#/components/schemas/AgentClass"},"amount":{"type":"string"}},"required":["sessionId","amount"]}, "FuzzParam": {"type":"object","properties":{"param":{"description":"Name of the parameter to fuzz.","type":"string"},"value":{"description":"Fuzz the `value` field of the tx (ETH sent with the tx).","type":"boolean"},"max_priority_fee_per_gas":{"description":"Fuzz the `max_priority_fee_per_gas` field of the tx (EIP-1559 priority fee, in wei).","type":"boolean"},"min":{"description":"Minimum value fuzzer will use. Accepts raw wei (`\"100\"`), hex (`\"0x2540be400\"`), or unit strings (`\"10 gwei\"`, `\"0.001 eth\"`).","type":"string"},"max":{"description":"Maximum value fuzzer will use. Accepts raw wei (`\"100\"`), hex (`\"0x2540be400\"`), or unit strings (`\"10 gwei\"`, `\"0.001 eth\"`).","type":"string"}}}, + "LooseAccessListItem": {"description":"User-facing access list entry. Address and storage keys are kept as strings so they can contain `{placeholder}` references that are resolved when the loose definition is converted to its strict form.","type":"object","properties":{"address":{"description":"Address of the contract. May be a `{placeholder}`.","type":"string"},"storageKeys":{"description":"Storage keys to prewarm. Each may be a `{placeholder}`.","type":"array","items":{"type":"string"}}},"required":["address","storageKeys"]}, "RevertCliArgs": {"type":"object","properties":{"gas_use":{"description":"Amount of gas to use before reverting.","type":"integer"}},"required":["gas_use"]}, "ServerStatus": {"description":"Data returned from the `status` endpoint, containing general info about the server.","type":"object","properties":{"numSessions":{"type":"integer"}},"required":["numSessions"]}, "SessionOptions": {"type":"object","properties":{"auth":{"$ref":"#/components/schemas/AuthParams"},"builder":{"$ref":"#/components/schemas/BuilderParams"},"minBalance":{"type":"string"},"timeoutSecs":{"type":"object","properties":{"secs":{"type":"integer"},"nanos":{"type":"integer"}}},"txType":{"$ref":"#/components/schemas/TxTypeCli"},"privateKeys":{"type":"array","items":{"type":"string"}},"agents":{"$ref":"#/components/schemas/AgentParams"},"env":{"type":"object","additionalProperties":{"type":"string"}}}}, diff --git a/crates/core/src/generator/function_def.rs b/crates/core/src/generator/function_def.rs index 8b566108..76dc06bf 100644 --- a/crates/core/src/generator/function_def.rs +++ b/crates/core/src/generator/function_def.rs @@ -4,9 +4,22 @@ use alloy::{ eips::eip7702::SignedAuthorization, hex::{FromHex, ToHexExt}, primitives::{Address, Bytes, U256}, + rpc::types::AccessListItem, }; use serde::{Deserialize, Serialize}; +/// User-facing access list entry. Address and storage keys are kept as strings +/// so they can contain `{placeholder}` references that are resolved when the +/// loose definition is converted to its strict form. +#[derive(Clone, Deserialize, Debug, Serialize)] +pub struct LooseAccessListItem { + /// Address of the contract. May be a `{placeholder}`. + pub address: String, + /// Storage keys to prewarm. Each may be a `{placeholder}`. + #[serde(rename = "storageKeys")] + pub storage_keys: Vec, +} + /// User-facing definition of a function call to be executed. #[derive(Clone, Deserialize, Debug, Serialize)] pub struct FunctionCallDefinition { @@ -42,6 +55,11 @@ pub struct FunctionCallDefinition { /// Optional setCode data; tx type must be set to EIP7702 by spammer #[serde(skip_serializing_if = "Option::is_none")] pub authorization_address: Option, + /// Optional EIP-2930 access list entries to include in the transaction. + /// Address and storage keys may contain `{placeholder}` references that are + /// resolved when the loose definition is converted to its strict form. + #[serde(skip_serializing_if = "Option::is_none")] + pub access_list: Option>, /// If true and `from_pool` is set, run this setup transaction for all accounts in the pool. /// Defaults to false (only runs for the first account). #[serde(default, skip_serializing_if = "std::ops::Not::not")] @@ -77,6 +95,7 @@ impl FunctionCallDefinition { gas_limit: None, blob_data: None, authorization_address: None, + access_list: None, for_all_accounts: false, max_priority_fee_per_gas: None, } @@ -127,6 +146,10 @@ impl FunctionCallDefinition { self.authorization_address = Some(auth_addr.as_ref().to_owned()); self } + pub fn with_access_list(mut self, access_list: Vec) -> Self { + self.access_list = Some(access_list); + self + } pub fn with_for_all_accounts(mut self, for_all_accounts: bool) -> Self { self.for_all_accounts = for_all_accounts; self @@ -170,6 +193,7 @@ pub struct FunctionCallDefinitionStrict { pub max_priority_fee_per_gas: Option, // may be a placeholder, so we can't use u128 pub sidecar: Option, pub authorization: Option>, + pub access_list: Option>, } #[derive(Clone, Deserialize, Debug, Serialize)] @@ -337,4 +361,51 @@ mod tests { assert!(fuzz[0].param.is_some()); assert_eq!(fuzz[0].max_priority_fee_per_gas, Some(true)); } + + #[test] + fn access_list_parses_from_toml() { + let toml = r#" + to = "0x1234567890123456789012345678901234567890" + from_pool = "test_pool" + signature = "test()" + + [[access_list]] + address = "0x4200000000000000000000000000000000000022" + storageKeys = [ + "0x0100000000000000000000000000000000000000000000000000000000000000", + "0x0300000000000000000000000000000000000000000000000000000000000000", + ] + "#; + let def: FunctionCallDefinition = toml::from_str(toml).unwrap(); + let access_list = def.access_list.unwrap(); + + assert_eq!(access_list.len(), 1); + assert_eq!( + access_list[0].address, + "0x4200000000000000000000000000000000000022" + ); + assert_eq!(access_list[0].storage_keys.len(), 2); + } + + #[test] + fn access_list_parses_placeholders_from_toml() { + let toml = r#" + to = "0x1234567890123456789012345678901234567890" + from_pool = "test_pool" + signature = "test()" + + [[access_list]] + address = "{SpamMe5}" + storageKeys = ["{testkey1}", "{testkey2}"] + "#; + let def: FunctionCallDefinition = toml::from_str(toml).unwrap(); + let access_list = def.access_list.unwrap(); + + assert_eq!(access_list.len(), 1); + assert_eq!(access_list[0].address, "{SpamMe5}"); + assert_eq!( + access_list[0].storage_keys, + vec!["{testkey1}".to_string(), "{testkey2}".to_string()] + ); + } } diff --git a/crates/core/src/generator/templater.rs b/crates/core/src/generator/templater.rs index 85be155b..15ed2767 100644 --- a/crates/core/src/generator/templater.rs +++ b/crates/core/src/generator/templater.rs @@ -10,7 +10,7 @@ use crate::{ use alloy::{ hex::{FromHex, ToHexExt}, primitives::{Address, Bytes, FixedBytes, TxKind, U256}, - rpc::types::TransactionRequest, + rpc::types::{AccessList, TransactionRequest}, }; use std::collections::HashMap; use thiserror::Error; @@ -108,7 +108,7 @@ where /// Finds {placeholders} in `fncall` and looks them up in `db`, /// then inserts the values it finds into `placeholder_map`. - /// NOTE: only finds placeholders in `args`, `authorization_addr`, and `to` fields. + /// NOTE: scans `args`, `from`, `to`, `authorization_address`, and `access_list` fields. fn find_fncall_placeholders( &self, fncall: &FunctionCallDefinition, @@ -158,6 +158,28 @@ where scenario_label, )?; } + if let Some(access_list) = &fncall.access_list { + for item in access_list.iter() { + self.find_placeholder_values( + &item.address, + placeholder_map, + db, + rpc_url, + genesis_hash, + scenario_label, + )?; + for key in &item.storage_keys { + self.find_placeholder_values( + key, + placeholder_map, + db, + rpc_url, + genesis_hash, + scenario_label, + )?; + } + } + } Ok(()) } @@ -252,6 +274,11 @@ where }) .transpose()?; + let access_list = funcdef + .access_list + .as_ref() + .map(|items| AccessList::from(items.to_owned())); + Ok(TransactionRequest { to: Some(TxKind::Call(to)), input: alloy::rpc::types::TransactionInput::both(input.into()), @@ -261,6 +288,7 @@ where max_priority_fee_per_gas, sidecar: funcdef.sidecar.as_ref().map(|sc| sc.to_owned().into()), authorization_list: funcdef.authorization.to_owned(), + access_list, ..Default::default() }) } @@ -374,6 +402,7 @@ mod tests { max_priority_fee_per_gas: priority_fee.map(|s| s.to_owned()), sidecar: None, authorization: None, + access_list: None, } } @@ -428,4 +457,56 @@ mod tests { ); } } + + #[test] + fn template_function_call_threads_access_list_into_request() { + use alloy::primitives::B256; + use alloy::rpc::types::AccessListItem; + let templater = CurlyBraceTemplater; + let access_list_address = "0x4200000000000000000000000000000000000022"; + let storage_key = "0x0100000000000000000000000000000000000000000000000000000000000000"; + let second_storage_key = + "0x0300000000000000000000000000000000000000000000000000000000000000"; + let placeholder_map = HashMap::new(); + let funcdef = FunctionCallDefinitionStrict { + to: access_list_address.to_string(), + from: Address::ZERO, + signature: "validate()".to_string(), + args: vec![], + value: None, + fuzz: vec![], + kind: None, + gas_limit: Some(200_000), + max_priority_fee_per_gas: None, + sidecar: None, + authorization: None, + access_list: Some(vec![AccessListItem { + address: access_list_address.parse::
().unwrap(), + storage_keys: vec![ + storage_key.parse::().unwrap(), + second_storage_key.parse::().unwrap(), + ], + }]), + }; + + let tx = templater + .template_function_call(&funcdef, &placeholder_map) + .unwrap(); + let access_list = tx.access_list.as_ref().unwrap(); + + assert_eq!(access_list.len(), 1); + assert_eq!( + access_list[0].address, + access_list_address.parse::
().unwrap() + ); + assert_eq!(access_list[0].storage_keys.len(), 2); + assert_eq!( + access_list[0].storage_keys[0], + storage_key.parse::().unwrap() + ); + assert_eq!( + access_list[0].storage_keys[1], + second_storage_key.parse::().unwrap() + ); + } } diff --git a/crates/core/src/generator/trait.rs b/crates/core/src/generator/trait.rs index 683b872b..ee717c31 100644 --- a/crates/core/src/generator/trait.rs +++ b/crates/core/src/generator/trait.rs @@ -17,9 +17,9 @@ use crate::{ use alloy::{ eips::eip7702::SignedAuthorization, hex::ToHexExt, - primitives::{keccak256, Address, FixedBytes, U256}, + primitives::{keccak256, Address, FixedBytes, B256, U256}, providers::Provider, - rpc::types::Authorization, + rpc::types::{AccessListItem, Authorization}, signers::{local::PrivateKeySigner, SignerSync}, }; use async_trait::async_trait; @@ -316,6 +316,43 @@ where None }; + // Resolve {placeholders} in the optional access list, then parse each + // entry into alloy's strict `AccessListItem` type. + let access_list = if let Some(loose_list) = &funcdef.access_list { + let mut placeholder_map = HashMap::::new(); + let templater = self.get_templater(); + templater.find_fncall_placeholders( + funcdef, + self.get_db(), + &mut placeholder_map, + &self.get_rpc_url(), + self.get_genesis_hash(), + self.get_scenario_label(), + )?; + let mut items = Vec::with_capacity(loose_list.len()); + for item in loose_list.iter() { + let resolved_addr = templater.replace_placeholders(&item.address, &placeholder_map); + let address = resolved_addr + .parse::
() + .map_err(|_| GeneratorError::address_not_found(&item.address))?; + let mut storage_keys = Vec::with_capacity(item.storage_keys.len()); + for key in &item.storage_keys { + let resolved_key = templater.replace_placeholders(key, &placeholder_map); + let parsed = resolved_key + .parse::() + .map_err(|_| GeneratorError::address_not_found(key))?; + storage_keys.push(parsed); + } + items.push(AccessListItem { + address, + storage_keys, + }); + } + Some(items) + } else { + None + }; + Ok(FunctionCallDefinitionStrict { to: to_address, from: from_address, @@ -328,6 +365,7 @@ where max_priority_fee_per_gas: funcdef.max_priority_fee_per_gas.to_owned(), sidecar: funcdef.sidecar_data()?, authorization: signed_auth.map(|a| vec![a]), + access_list, }) } diff --git a/crates/testfile/src/lib.rs b/crates/testfile/src/lib.rs index b39dbb20..854d7adb 100644 --- a/crates/testfile/src/lib.rs +++ b/crates/testfile/src/lib.rs @@ -215,6 +215,75 @@ pub mod tests { } } + #[test] + fn parses_spam_tx_access_list_toml() { + let test_file = TestConfig::from_str( + r#" + [[spam]] + [spam.tx] + to = "0x4200000000000000000000000000000000000022" + from_pool = "spammers" + signature = "validate()" + gas_limit = 200000 + + [[spam.tx.access_list]] + address = "0x4200000000000000000000000000000000000022" + storageKeys = [ + "0x0100000000000000000000000000000000000000000000000000000000000000", + "0x0300000000000000000000000000000000000000000000000000000000000000", + ] + "#, + ) + .unwrap(); + let spam = test_file.spam.unwrap(); + + match &spam[0] { + SpamRequest::Tx(fncall) => { + let access_list = fncall.access_list.as_ref().unwrap(); + assert_eq!(access_list.len(), 1); + assert_eq!( + access_list[0].address, + "0x4200000000000000000000000000000000000022" + ); + assert_eq!(access_list[0].storage_keys.len(), 2); + } + SpamRequest::Bundle(_) => panic!("expected SpamRequest::Tx"), + } + } + + #[test] + fn parses_spam_tx_access_list_with_placeholders_toml() { + let test_file = TestConfig::from_str( + r#" + [[spam]] + [spam.tx] + to = "0x4200000000000000000000000000000000000022" + from_pool = "spammers" + signature = "validate()" + gas_limit = 200000 + + [[spam.tx.access_list]] + address = "{SpamMe5}" + storageKeys = ["{testkey1}", "{testkey2}"] + "#, + ) + .unwrap(); + let spam = test_file.spam.unwrap(); + + match &spam[0] { + SpamRequest::Tx(fncall) => { + let access_list = fncall.access_list.as_ref().unwrap(); + assert_eq!(access_list.len(), 1); + assert_eq!(access_list[0].address, "{SpamMe5}"); + assert_eq!( + access_list[0].storage_keys, + vec!["{testkey1}".to_string(), "{testkey2}".to_string()] + ); + } + SpamRequest::Bundle(_) => panic!("expected SpamRequest::Tx"), + } + } + fn repo_root_path() -> std::path::PathBuf { let mut dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); dir.pop(); // crates diff --git a/docs/creating_scenarios.md b/docs/creating_scenarios.md index 010d61f4..97a8528b 100644 --- a/docs/creating_scenarios.md +++ b/docs/creating_scenarios.md @@ -207,6 +207,30 @@ args = ["1350000"] gas_limit = 1350000 ``` +### access lists + +Spam transactions can include EIP-2930 access-list entries. This is useful for workloads that already know which account and storage keys need to be warm, while still sending EIP-1559 transactions by default. + +```toml +[[spam]] + +[spam.tx] +to = "0x1111111111111111111111111111111111111111" +from_pool = "bluepool" +signature = "touch(bytes32 lookupKey)" +args = [ + "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", +] +gas_limit = 200000 + +[[spam.tx.access_list]] +address = "0x1111111111111111111111111111111111111111" +storageKeys = [ + "0x0100000000000000000000000000000000000000000000000000000000000000", + "0x0300000000000000000000000000000000000000000000000000000000000000", +] +``` + ### sending bundles The `[spam.tx]` directive sends a mempool transaction using `eth_sendRawTransaction`, but Contender also supports bundles. From 3c13137cc384c35e668f5784907d2020c52d28a5 Mon Sep 17 00:00:00 2001 From: Jacob Elias Date: Fri, 29 May 2026 17:10:55 -0400 Subject: [PATCH 2/2] feat: add spam-stream subcommand for streaming tx specs (draft) Reads newline-delimited JSON FunctionCallDefinitions from stdin or a file and spams them via the existing TestScenario pipeline. Reuses agent pools, rate limiting, nonce management, and receipt tracking. See docs/stream-mode.md for the design note and scope. --- .../cli/src/commands/contender_subcommand.rs | 11 + crates/cli/src/commands/error.rs | 3 + crates/cli/src/commands/mod.rs | 1 + crates/cli/src/commands/spam_stream.rs | 544 ++++++++++++++++++ crates/cli/src/main.rs | 4 + docs/stream-mode.md | 187 ++++++ 6 files changed, 750 insertions(+) create mode 100644 crates/cli/src/commands/spam_stream.rs create mode 100644 docs/stream-mode.md diff --git a/crates/cli/src/commands/contender_subcommand.rs b/crates/cli/src/commands/contender_subcommand.rs index 92f7e25d..ec886b46 100644 --- a/crates/cli/src/commands/contender_subcommand.rs +++ b/crates/cli/src/commands/contender_subcommand.rs @@ -9,6 +9,7 @@ use crate::default_scenarios::BuiltinScenarioCli; use super::admin::AdminCommand; use super::spam::SpamCliArgs; +use super::spam_stream::SpamStreamCliArgs; use super::ReportFormat; #[derive(Debug, Subcommand)] @@ -34,6 +35,16 @@ pub enum ContenderSubcommand { builtin_scenario_config: Option, }, + #[command( + name = "spam-stream", + long_about = "Read newline-delimited JSON tx specs from stdin or a file and spam them. \ + Each line is a FunctionCallDefinition (same fields as scenario TOML `[[spam.tx]]`)." + )] + SpamStream { + #[command(flatten)] + args: Box, + }, + #[command( name = "setup", long_about = "Deploy contracts and execute one-time setup txs." diff --git a/crates/cli/src/commands/error.rs b/crates/cli/src/commands/error.rs index 9ab196f0..766febf6 100644 --- a/crates/cli/src/commands/error.rs +++ b/crates/cli/src/commands/error.rs @@ -78,6 +78,9 @@ pub enum ArgsError { #[error("failed to parse url")] UrlParse(#[from] url::ParseError), + + #[error("{0}")] + Custom(String), } #[derive(Debug, Error)] diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index 6b834134..d3e6c834 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -8,6 +8,7 @@ pub mod replay; pub mod rpc; mod setup; mod spam; +pub mod spam_stream; use clap::{Parser, ValueEnum}; use contender_core::db::DbOps; diff --git a/crates/cli/src/commands/spam_stream.rs b/crates/cli/src/commands/spam_stream.rs new file mode 100644 index 00000000..8e1c3a8d --- /dev/null +++ b/crates/cli/src/commands/spam_stream.rs @@ -0,0 +1,544 @@ +//! `spam-stream` subcommand: read newline-delimited JSON tx specs from stdin +//! or a file, and spam them through the contender pipeline. +//! +//! This is the entry point for the "stream mode" prototype. See +//! `docs/stream-mode.md` for the design note. + +use crate::{ + commands::{ + common::{HELP_HEADING_COMMON, HELP_HEADING_PAYLOAD, HELP_HEADING_RUNTIME}, + error::ArgsError, + Result, + }, + error::CliError, + util::fund_accounts, + LATENCY_HIST as HIST, PROM, +}; +use alloy::{ + consensus::TxType, + network::{AnyTxEnvelope, Ethereum, NetworkTransactionBuilder}, + primitives::{utils::format_ether, U256}, + providers::Provider, + transports::http::reqwest::Url, +}; +use clap::Args; +use contender_core::{ + generator::{ + agent_pools::AgentSpec, seeder::rand_seed::SeedGenerator, templater::Templater, + types::SpamRequest, FunctionCallDefinition, Generator, PlanConfig, RandSeed, + }, + spammer::tx_actor::{ActorContext, CacheTx}, + test_scenario::{TestScenario, TestScenarioParams}, + BundleType, +}; +use contender_sqlite::SqliteDb; +use contender_testfile::TestConfig; +use std::{ + path::PathBuf, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::mpsc, +}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; + +/// Default agent-pool name when the user does not specify one. +const DEFAULT_POOL: &str = "executors"; +/// Channel buffer between the reader task and the spam loop. +const STREAM_BUFFER: usize = 256; + +#[derive(Clone, Debug, Args)] +pub struct SpamStreamCliArgs { + /// RPC URL to send transactions to. + #[arg( + env = "RPC_URL", + short = 'r', + long, + default_value = "http://localhost:8545", + help_heading = HELP_HEADING_COMMON, + )] + pub rpc_url: Url, + + /// Funder private key. Used to fund the pool of executor accounts before spam begins. + #[arg( + env = "CONTENDER_PRIVATE_KEY", + short = 'p', + long = "priv-key", + help_heading = HELP_HEADING_COMMON, + )] + pub private_key: Option, + + /// Source of the JSON-lines stream: either `stdin` or a file path. + #[arg( + long = "from", + default_value = "stdin", + help_heading = HELP_HEADING_COMMON, + )] + pub from: String, + + /// Pool name used to source signers for each tx. Stream specs that omit + /// `from`/`from_pool` will default to this pool. + #[arg( + long = "from-pool", + default_value = DEFAULT_POOL, + help_heading = HELP_HEADING_PAYLOAD, + )] + pub from_pool: String, + + /// Number of accounts to generate in the pool. + #[arg( + long = "pool-size", + default_value_t = 10, + help_heading = HELP_HEADING_PAYLOAD, + )] + pub pool_size: usize, + + /// Target transactions per second. `0` means "send as fast as the stream + /// can be parsed". + #[arg( + long, + default_value_t = 0, + help_heading = HELP_HEADING_RUNTIME, + )] + pub tps: u64, + + /// Minimum balance to keep in each pool account, in wei. + #[arg( + long, + default_value_t = U256::from(10_000_000_000_000_000u128), + help_heading = HELP_HEADING_RUNTIME, + )] + pub min_balance: U256, + + /// Seed for deterministic pool-signer generation. + #[arg( + env = "CONTENDER_SEED", + long, + help_heading = HELP_HEADING_RUNTIME, + )] + pub seed: Option, + + /// Skip funding the executor accounts. Useful when the pool was funded + /// out-of-band or every spec carries a `from` address with an existing balance. + #[arg(long, default_value_t = false, help_heading = HELP_HEADING_RUNTIME)] + pub skip_funding: bool, +} + +/// Asynchronously reads JSON lines from `from` (stdin or file path), parses each +/// into a `FunctionCallDefinition`, and forwards it to `tx`. The task exits when +/// EOF is reached or the receiver drops. +pub fn spawn_stream_reader( + from: &str, + tx: mpsc::Sender, +) -> Result> { + let handle: tokio::task::JoinHandle<()> = if from == "stdin" { + tokio::spawn(async move { + let reader = BufReader::new(tokio::io::stdin()); + forward_lines(reader, tx).await; + }) + } else { + let path = PathBuf::from(from); + if !path.exists() { + return Err(CliError::Args(ArgsError::Custom(format!( + "stream source file not found: {}", + path.display() + )))); + } + tokio::spawn(async move { + match tokio::fs::File::open(&path).await { + Ok(f) => { + let reader = BufReader::new(f); + forward_lines(reader, tx).await; + } + Err(e) => warn!("failed to open stream source {}: {e}", path.display()), + } + }) + }; + Ok(handle) +} + +async fn forward_lines(reader: R, tx: mpsc::Sender) +where + R: tokio::io::AsyncBufRead + Unpin, +{ + let mut lines = reader.lines(); + let mut line_no: u64 = 0; + loop { + match lines.next_line().await { + Ok(Some(line)) => { + line_no += 1; + let trimmed = line.trim(); + if trimmed.is_empty() || trimmed.starts_with('#') { + continue; + } + match serde_json::from_str::(trimmed) { + Ok(spec) => { + if tx.send(spec).await.is_err() { + // receiver dropped — stop reading + return; + } + } + Err(e) => warn!("stream: skipping malformed line {line_no}: {e}"), + } + } + Ok(None) => return, // EOF + Err(e) => { + warn!("stream: read error: {e}"); + return; + } + } + } +} + +/// Build a one-step `TestConfig` that references `from_pool`, so the scenario +/// builds an agent store containing that pool with `pool_size` signers. +/// The decoy entry is never executed; we bypass `load_txs` entirely. +fn build_decoy_config(from_pool: &str) -> TestConfig { + let decoy = FunctionCallDefinition::new("0x0000000000000000000000000000000000000000") + .with_from_pool(from_pool); + TestConfig { + env: None, + create: None, + setup: None, + spam: Some(vec![SpamRequest::Tx(Box::new(decoy))]), + } +} + +/// Drive the stream loop: pull specs, build/sign/send txs, cache in the +/// tx_actor for receipt tracking. Returns when the stream channel closes. +async fn drive_stream( + scenario: &mut TestScenario, + mut rx: mpsc::Receiver, + run_id: u64, + fallback_pool: String, + tps: u64, + cancel: CancellationToken, +) -> Result +where + S: SeedGenerator + Send + Sync + Clone, + P: PlanConfig + Templater + Send + Sync + Clone, +{ + // Rate limiter: only ticks when tps > 0. + let mut ticker = if tps > 0 { + let period = Duration::from_secs_f64(1.0 / tps as f64); + let mut int = tokio::time::interval(period); + int.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + Some(int) + } else { + None + }; + + let mut sent: usize = 0; + let mut idx: usize = 0; + let placeholder_map = std::collections::HashMap::::new(); + + loop { + tokio::select! { + _ = cancel.cancelled() => { + info!("stream cancelled; flushing."); + break; + } + spec = rx.recv() => { + let Some(mut spec) = spec else { + debug!("stream EOF received"); + break; + }; + + // Apply default pool if the spec doesn't pick one. + if spec.from.is_none() && spec.from_pool.is_none() { + spec.from_pool = Some(fallback_pool.clone()); + } + + // Rate limit (only when --tps > 0). + if let Some(int) = ticker.as_mut() { + int.tick().await; + } + + match send_one(scenario, &spec, idx, &placeholder_map, run_id).await { + Ok(()) => { + sent += 1; + } + Err(e) => warn!("stream: failed to send tx (idx {idx}): {e}"), + } + idx += 1; + } + } + } + + Ok(sent) +} + +/// Build a single transaction from a stream spec, sign it, send it, and cache +/// it in the tx_actor for receipt tracking. +async fn send_one( + scenario: &mut TestScenario, + spec: &FunctionCallDefinition, + idx: usize, + placeholder_map: &std::collections::HashMap, + _run_id: u64, +) -> Result<()> +where + S: SeedGenerator + Send + Sync + Clone, + P: PlanConfig + Templater + Send + Sync + Clone, +{ + // 1. Resolve `from`/`from_pool` and access list against the scenario's + // agent store + templater. This produces a strict FunctionCallDefinition. + let strict = scenario + .make_strict_call(spec, idx) + .map_err(contender_core::Error::Generator)?; + + // 2. Render the strict definition into a TransactionRequest (encodes + // calldata, threads access_list, sets value/gas_limit). + let tx_req = scenario + .get_templater() + .template_function_call(&strict, placeholder_map) + .map_err(contender_core::Error::Templater)?; + + // 3. Fetch a gas price and assign nonce/gas-limit + sign. + let gas_price = scenario.rpc_client.get_gas_price().await?; + let (prepared, wallet) = scenario.prepare_tx_request(&tx_req, gas_price, 0).await?; + // Build & sign via the alloy Ethereum network. The op-alloy-network re-export + // can create trait-resolution ambiguity, so we fully-qualify the trait + // method and convert the error string-ly instead of relying on From. + let envelope = + >::build( + prepared, &wallet, + ) + .await + .map_err(|e| CliError::Args(ArgsError::Custom(format!("build envelope: {e}"))))?; + let tx_hash = envelope.tx_hash().to_owned(); + + // 4. Send via the same txs_client the regular spammer uses. + let any_envelope = AnyTxEnvelope::Ethereum(envelope); + let start_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let res = scenario.txs_client.send_tx_envelope(any_envelope).await; + let error = match res { + Ok(_) => { + info!("stream tx[{idx}]: {tx_hash} sent"); + None + } + Err(e) => { + let msg = e + .as_error_resp() + .map(|err| err.message.to_string()) + .unwrap_or_else(|| format!("{e}")); + warn!("stream tx[{idx}]: {tx_hash} failed: {msg}"); + Some(msg) + } + }; + + // 5. Cache in the tx_actor so its flush loop polls for the receipt. + scenario + .tx_actor() + .cache_run_tx(CacheTx { + tx_hash, + start_timestamp_ms: start_ms, + end_timestamp_ms: None, + kind: spec.kind.clone(), + error, + }) + .await?; + + Ok(()) +} + +/// Top-level entry point invoked from `main.rs`. +pub async fn spam_stream(db: &SqliteDb, args: SpamStreamCliArgs) -> Result<()> { + let seed = if let Some(s) = &args.seed { + RandSeed::seed_from_str(s) + } else { + RandSeed::new() + }; + + // Build a decoy TestConfig + agent store so the scenario sets up the + // requested pool with the requested number of signers. + let config = build_decoy_config(&args.from_pool); + let agent_spec = AgentSpec::default() + .create_accounts(0) + .setup_accounts(0) + .spam_accounts(args.pool_size); + + let funder = if let Some(key) = &args.private_key { + let s = key.trim().trim_start_matches("0x"); + Some( + alloy::signers::local::PrivateKeySigner::from_slice( + &alloy::hex::decode(s) + .map_err(|e| CliError::Args(ArgsError::Custom(format!("bad priv-key: {e}"))))?, + ) + .map_err(|e| CliError::Args(ArgsError::Custom(format!("bad priv-key: {e}"))))?, + ) + } else { + None + }; + // TestScenario needs at least one user signer for signer_map. Use a + // throwaway signer if none was provided (we only sign with pool accounts). + let user_signers = if let Some(s) = &funder { + vec![s.clone()] + } else { + vec![alloy::signers::local::PrivateKeySigner::random()] + }; + + let cancel = CancellationToken::new(); + let params = TestScenarioParams { + rpc_url: args.rpc_url.clone(), + builder_rpc_url: None, + txs_rpc_url: None, + signers: user_signers, + agent_spec, + tx_type: TxType::Eip1559, + bundle_type: BundleType::default(), + pending_tx_timeout: Duration::from_secs(60), + extra_msg_handles: None, + sync_nonces_after_batch: false, + rpc_batch_size: 0, + gas_price: None, + scenario_label: Some(format!("stream-{}", args.from_pool)), + send_raw_tx_sync: false, + flashblocks_ws_url: None, + }; + + let mut scenario: TestScenario = TestScenario::new( + config, + Arc::new(db.clone()), + seed, + params, + None, + (&PROM, &HIST).into(), + &cancel, + ) + .await?; + + // Fund the pool from the funder key if provided. + if !args.skip_funding { + if let Some(funder) = &funder { + let addrs = scenario.agent_store.all_signer_addresses(); + if !addrs.is_empty() { + info!( + "funding {} pool account(s) to {} ETH min from {}", + addrs.len(), + format_ether(args.min_balance), + funder.address() + ); + fund_accounts( + &addrs, + funder, + &scenario.rpc_client, + args.min_balance, + TxType::Legacy, + &Default::default(), + ) + .await?; + // Re-sync nonces for the freshly-funded accounts so the first + // sent tx uses the correct nonce. + scenario.sync_nonces().await?; + } + } else { + warn!("no funder key supplied; pool accounts must already be funded"); + } + } + + // Set up tx_actor context so cached txs flush to the DB. + let start_block = scenario.rpc_client.get_block_number().await?; + let run_id = 0u64; + let actor_ctx = + ActorContext::new(start_block, run_id).with_pending_tx_timeout(Duration::from_secs(60)); + scenario.tx_actor().init_ctx(actor_ctx).await?; + + // Spawn the reader and run the drive loop. + let (tx, rx) = mpsc::channel::(STREAM_BUFFER); + let _reader = spawn_stream_reader(&args.from, tx)?; + + let drive_cancel = cancel.clone(); + let sent = tokio::select! { + res = drive_stream(&mut scenario, rx, run_id, args.from_pool.clone(), args.tps, drive_cancel) => { + res? + } + _ = tokio::signal::ctrl_c() => { + info!("CTRL-C: stopping stream loop"); + cancel.cancel(); + 0 + } + }; + + info!("stream complete: {sent} tx(s) sent; draining pending receipts..."); + + tokio::select! { + _ = scenario.dump_tx_cache(run_id) => {} + _ = tokio::signal::ctrl_c() => { + info!("CTRL-C during drain; exiting"); + } + } + scenario.shutdown().await; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_minimal_spec() { + let line = + r#"{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit":21000}"#; + let spec: FunctionCallDefinition = serde_json::from_str(line).unwrap(); + assert_eq!(spec.to, "0xdeAD000000000000000000000000000000000000"); + assert_eq!(spec.value.as_deref(), Some("1")); + assert_eq!(spec.gas_limit, Some(21000)); + assert!(spec.from.is_none() && spec.from_pool.is_none()); + } + + #[test] + fn parses_spec_with_access_list_and_signature() { + let line = r#"{ + "to": "0x4200000000000000000000000000000000000022", + "signature": "validateMessage(bytes32)", + "args": ["0x0102030405060708091011121314151617181920212223242526272829303132"], + "access_list": [{ + "address": "0x4200000000000000000000000000000000000022", + "storageKeys": ["0x0100000000000000000000000000000000000000000000000000000000000000"] + }], + "gas_limit": 200000 + }"#; + let spec: FunctionCallDefinition = serde_json::from_str(line).unwrap(); + assert_eq!(spec.signature.as_deref(), Some("validateMessage(bytes32)")); + assert_eq!(spec.gas_limit, Some(200000)); + let al = spec.access_list.unwrap(); + assert_eq!(al.len(), 1); + assert_eq!(al[0].storage_keys.len(), 1); + } + + #[tokio::test] + async fn forward_lines_skips_blank_and_comments_and_emits_specs() { + let (tx, mut rx) = mpsc::channel::(8); + let input = b"\n# this is a comment\n{\"to\":\"0xdeAD000000000000000000000000000000000000\",\"value\":\"1\"}\n\n{\"to\":\"0xdeAD000000000000000000000000000000000001\",\"value\":\"2\"}\n"; + let reader = BufReader::new(&input[..]); + forward_lines(reader, tx).await; + let mut received = vec![]; + while let Ok(spec) = rx.try_recv() { + received.push(spec); + } + assert_eq!(received.len(), 2); + assert_eq!(received[0].value.as_deref(), Some("1")); + assert_eq!(received[1].value.as_deref(), Some("2")); + } + + #[tokio::test] + async fn forward_lines_skips_malformed_lines() { + let (tx, mut rx) = mpsc::channel::(8); + let input = b"not json at all\n{\"to\":\"0xdeAD000000000000000000000000000000000000\"}\n"; + let reader = BufReader::new(&input[..]); + forward_lines(reader, tx).await; + let mut received = vec![]; + while let Ok(spec) = rx.try_recv() { + received.push(spec); + } + assert_eq!(received.len(), 1); + } +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 4e0af9a5..f98b519f 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -140,6 +140,10 @@ async fn run() -> Result<(), contender_cli::Error> { commands::spam(&db, &spam_args, SpamCampaignContext::default()).await?; } + ContenderSubcommand::SpamStream { args } => { + contender_cli::commands::spam_stream::spam_stream(&db, *args).await?; + } + ContenderSubcommand::Server => { contender_cli::server::run() .await diff --git a/docs/stream-mode.md b/docs/stream-mode.md new file mode 100644 index 00000000..9186284e --- /dev/null +++ b/docs/stream-mode.md @@ -0,0 +1,187 @@ +# Stream Mode (`spam-stream`) + +> **Status:** prototype / draft. This document accompanies the +> initial implementation of `spam-stream`. Expect the CLI surface and the +> stream format to evolve based on review feedback. + +## Motivation + +Contender is a TPS spammer driven by static scenario TOML files. It cycles +through `[[spam]]` entries, fuzzes args, and sends at a configured rate. + +Some use cases need to feed *dynamically discovered* tx specs into the spam +loop. For example, an interop relayer reads message-emitted events on chain A +and needs to execute the corresponding `validateMessage(...)` call on chain B +— with the right access list, calldata, and target. Today these workflows +end up reimplementing rate limiting, signer pools, nonce management, and +receipt tracking outside contender. + +Stream mode lets contender act as the "sender" half of those workflows: any +upstream process can pipe JSON tx specs into contender and reuse the existing +agent pools, rate limiter, `tx_actor` receipt tracking, gas-price caching, +and Prometheus latency metrics. + +## CLI + +```bash +contender spam-stream \ + -r https://chain-b \ + -p $FUNDING_KEY \ + --from \ + --from-pool executors --pool-size 10 \ + --tps 5 +``` + +Key flags: + +| Flag | Default | Meaning | +|------|---------|---------| +| `-r, --rpc-url` | `http://localhost:8545` | Target RPC. | +| `-p, --priv-key` | none | Funder key (funds the pool before spam starts). | +| `--from` | `stdin` | `stdin` or a file path. | +| `--from-pool` | `executors` | Pool name. Specs that omit `from`/`from_pool` use this pool. | +| `--pool-size` | `10` | Accounts generated in the pool. | +| `--tps` | `0` | `0` = consume as fast as channel emits. | +| `--min-balance` | `0.01 ETH` (wei) | Min pool-account balance during funding. | +| `--skip-funding` | `false` | Skip pre-spam funding. | +| `--seed` | random | Deterministic pool generation. | + +## Stream format + +Newline-delimited JSON, one [`FunctionCallDefinition`](../crates/core/src/generator/function_def.rs) +per line. Same field names as scenario TOML. + +Minimal: + +```json +{"to":"0xdeAD000000000000000000000000000000000000","value":"1 wei","gas_limit":21000} +``` + +Full (interop-style): + +```json +{ + "to": "0x4200000000000000000000000000000000000022", + "signature": "validateMessage(bytes32)", + "args": ["0x0102030405060708091011121314151617181920212223242526272829303132"], + "access_list": [ + { + "address": "0x4200000000000000000000000000000000000022", + "storageKeys": ["0x0100000000000000000000000000000000000000000000000000000000000000"] + } + ], + "gas_limit": 200000 +} +``` + +Empty lines and lines beginning with `#` are ignored. Malformed JSON lines +log a warning and the loop continues. + +## Architecture + +Stream mode does **not** introduce a new spammer trait or a new tx pipeline. +It reuses the existing `TestScenario` machinery and wires a JSON-line reader +to it through an mpsc channel. + +``` +stdin/file -> reader task -> mpsc + | + v + drive_stream loop + | + v + for each spec: + scenario.make_strict_call (Generator trait, resolves from_pool + access_list) + scenario.config.template_function_call (Templater, builds TransactionRequest) + scenario.prepare_tx_request (assigns nonce, gas limit, signs key from pool) + scenario.txs_client.send_tx_envelope + scenario.tx_actor().cache_run_tx (queues for receipt polling) +``` + +Code map: + +- [`crates/cli/src/commands/spam_stream.rs`](../crates/cli/src/commands/spam_stream.rs) + is the new subcommand. All logic lives there. +- A "decoy" one-step `TestConfig` is constructed so the existing + `AgentPools::build_agent_store` produces a pool with the requested name and + size. The decoy spam step is never executed; we bypass `load_txs` entirely. +- The reader task is a small wrapper around `tokio::io::BufReader::lines()` + that forwards parsed specs over an mpsc channel and exits on EOF. +- The drive loop honors `--tps` via `tokio::time::interval`. + +## Reuse vs. new code + +| Existing piece | Reused as-is | +|----------------|--------------| +| `TestScenario` constructor (signer map, nonce sync, txs_client) | yes | +| `Generator::make_strict_call` (resolves `from_pool`, access list, EIP-7702) | yes | +| `Templater::template_function_call` (calldata encoding, access list threading) | yes | +| `TestScenario::prepare_tx_request` (nonce, gas limit, complete_tx_request) | yes | +| Pool generation via `AgentPools::build_agent_store` | yes | +| `TxActorHandle::cache_run_tx` + flush loop (DB writes, receipt polling) | yes | +| `fund_accounts` helper | yes | +| Prometheus latency histograms via Tower middleware | yes (inherited from `TestScenario`) | +| The `Spammer` trait + `TimedSpammer`/`BlockwiseSpammer` | **not** reused | + +The reason we skip `TimedSpammer` is that its `on_spam` loop drives ticks +from a pre-loaded `Vec>` returned by +`get_spam_tx_chunks`. Stream mode wants a stream-shaped tick: pull one spec, +send one tx. Bolting the channel into `TimedSpammer` would require a generic +`SpamSource` abstraction across the existing spammers. Out of scope for the +prototype; a candidate for a follow-up if the prototype lands. + +## Scope of the prototype + +In scope: + +- `--from stdin|FILE`, `--from-pool`, `--pool-size`, `--tps`, `--priv-key`, `--rpc-url`, `--seed`, `--min-balance`, `--skip-funding`. +- Same `FunctionCallDefinition` schema as scenario TOML (incl. `access_list`, + `gas_limit`, `signature`, `args`, `value`, `from`, `from_pool`, `kind`). +- Pool funding from `--priv-key` before spam begins. +- Receipt tracking + DB persistence via the existing tx_actor flush loop. +- Graceful CTRL-C and stream EOF handling (drain pending receipts before exit). + +Out of scope (future work): + +- Bundle support (`[[spam.bundle]]` analogue in the stream). +- Blob transactions (EIP-4844) and authorization transactions (EIP-7702) — + the `FunctionCallDefinition` fields are deserialized but not exercised in + prototype tests. +- Fuzzing in stream mode — fuzz happens upstream of the stream. +- Gas-bump / nonce-shift retry logic from the regular spammer's + `handle_tx_outcome`. The stream loop currently logs send errors and moves + on; the upstream is expected to resubmit if it cares. +- `--rpc-batch-size`, `--send-raw-tx-sync` integration. +- Recording the run in the `spam_runs` table — stream runs use `run_id = 0` + and rely on the tx_actor's cache only. +- A generic `SpamSource` trait so `TimedSpammer` can consume a stream too. + +## Validation + +Unit tests live alongside the implementation: + +``` +cargo test -p contender_cli spam_stream +``` + +Smoke test: + +```bash +echo '{"to":"0xdeAD000000000000000000000000000000000000","value":"1","gas_limit":21000}' | \ + contender spam-stream -r $RPC -p $FUNDING_KEY --from stdin --tps 1 +``` + +The tx should land on the target chain; the funder needs at least enough ETH +to fund the executor pool. + +## Open questions + +1. Should stream mode get its own `Spammer` impl in `contender_core` so + campaigns can reuse it? Today the prototype lives entirely in `cli/`. +2. Is the JSON spec the right shape, or should we standardize on something + like a tagged envelope (`{"v":1,"tx":{...}}`) so we can evolve it later? +3. How should errors propagate back to the upstream producer? Currently the + only feedback is the DB + logs. A structured response stream (stdout JSON + lines mirroring the input) would be valuable for reactive callers. +4. Should `--tps 0` (drain-as-fast) bound concurrency by pool size, or is + "one in flight at a time" acceptable for the relayer case?