diff --git a/Cargo.lock b/Cargo.lock index c23352acee..ef2b7b8f58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,6 +91,7 @@ dependencies = [ "alloy-contract", "alloy-core", "alloy-eips", + "alloy-genesis", "alloy-json-rpc", "alloy-network", "alloy-provider", @@ -136,7 +137,7 @@ dependencies = [ "k256", "once_cell", "rand 0.8.5", - "secp256k1", + "secp256k1 0.30.0", "serde", "serde_json", "serde_with", @@ -244,7 +245,9 @@ dependencies = [ "alloy-primitives", "alloy-rlp", "borsh", + "k256", "serde", + "serde_with", "thiserror 2.0.17", ] @@ -283,6 +286,21 @@ dependencies = [ "sha2", ] +[[package]] +name = "alloy-genesis" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbf9480307b09d22876efb67d30cadd9013134c21f3a17ec9f93fd7536d38024" +dependencies = [ + "alloy-eips", + "alloy-primitives", + "alloy-serde", + "alloy-trie", + "borsh", + "serde", + "serde_with", +] + [[package]] name = "alloy-json-abi" version = "1.5.7" @@ -500,6 +518,7 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-anvil", "alloy-rpc-types-debug", + "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-rpc-types-trace", "alloy-rpc-types-txpool", @@ -542,6 +561,24 @@ dependencies = [ "serde_with", ] +[[package]] +name = "alloy-rpc-types-engine" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb9b97b6e7965679ad22df297dda809b11cebc13405c1b537e5cffecc95834fa" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rlp", + "alloy-serde", + "derive_more 2.1.1", + "jsonwebtoken", + "rand 0.8.5", + "serde", + "strum", +] + [[package]] name = "alloy-rpc-types-eth" version = "1.8.3" @@ -3753,7 +3790,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.114", ] [[package]] @@ -3944,6 +3981,7 @@ dependencies = [ "num", "number", "observe", + "pod-sdk", "price-estimation", "prometheus", "prometheus-metric-storage", @@ -3971,6 +4009,7 @@ dependencies = [ "tracing", "url", "vergen", + "winner-selection", ] [[package]] @@ -4015,6 +4054,7 @@ dependencies = [ "number", "observe", "orderbook", + "pod-sdk", "price-estimation", "refunder", "reqwest 0.13.2", @@ -4702,6 +4742,9 @@ name = "hex" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +dependencies = [ + "serde", +] [[package]] name = "hex-conservative" @@ -4945,7 +4988,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -5266,6 +5309,21 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonwebtoken" +version = "9.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a87cc7a48537badeae96744432de36f4be2b4a34a05a5ef32e9dd8a1c169dde" +dependencies = [ + "base64 0.22.1", + "js-sys", + "pem", + "ring", + "serde", + "serde_json", + "simple_asn1", +] + [[package]] name = "k256" version = "0.13.4" @@ -5283,9 +5341,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.6" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" +checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" dependencies = [ "cpufeatures", ] @@ -6179,6 +6237,16 @@ dependencies = [ "hmac", ] +[[package]] +name = "pem" +version = "3.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" +dependencies = [ + "base64 0.22.1", + "serde_core", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -6273,6 +6341,71 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "pod-examples-solidity" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec6bf9dd02ec5502f0a937acfbd6d6cd69cf20b02287c45ab5e90ccce79c701f" +dependencies = [ + "alloy", + "serde", +] + +[[package]] +name = "pod-sdk" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c994bd9a27d72cdb0e52bd7ae6db6e1eeea3a1739cdc3e6012d94791fac913f9" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-json-rpc", + "alloy-network", + "alloy-primitives", + "alloy-provider", + "alloy-pubsub", + "alloy-rpc-types", + "alloy-signer", + "alloy-signer-local", + "alloy-sol-types", + "alloy-transport", + "anyhow", + "async-trait", + "hex", + "pod-examples-solidity", + "pod-types", + "serde", + "tokio", + "tracing", +] + +[[package]] +name = "pod-types" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f19b462663b0ff2db6c59187e0c3d50a52f7679b909fa25261b86ae75591f9b" +dependencies = [ + "alloy-consensus", + "alloy-network", + "alloy-primitives", + "alloy-rpc-types", + "alloy-signer", + "alloy-signer-local", + "alloy-sol-types", + "anyhow", + "base64 0.22.1", + "bytes", + "hex", + "itertools 0.14.0", + "secp256k1 0.31.1", + "serde", + "serde_with", + "thiserror 2.0.17", + "tokio", + "tracing", + "utoipa", +] + [[package]] name = "portable-atomic" version = "1.13.0" @@ -6566,7 +6699,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.114", @@ -6579,7 +6712,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.114", @@ -6649,7 +6782,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.6.1", + "socket2 0.5.10", "thiserror 2.0.17", "tokio", "tracing", @@ -6687,7 +6820,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.1", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] @@ -7394,7 +7527,19 @@ checksum = "b50c5943d326858130af85e049f2661ba3c78b26589b8ab98e65e80ae44a1252" dependencies = [ "bitcoin_hashes", "rand 0.8.5", - "secp256k1-sys", + "secp256k1-sys 0.10.1", + "serde", +] + +[[package]] +name = "secp256k1" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c3c81b43dc2d8877c216a3fccf76677ee1ebccd429566d3e67447290d0c42b2" +dependencies = [ + "bitcoin_hashes", + "rand 0.9.4", + "secp256k1-sys 0.11.0", "serde", ] @@ -7407,6 +7552,15 @@ dependencies = [ "cc", ] +[[package]] +name = "secp256k1-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcb913707158fadaf0d8702c2db0e857de66eb003ccfdda5924b5f5ac98efb38" +dependencies = [ + "cc", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -7770,6 +7924,18 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" +[[package]] +name = "simple_asn1" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" +dependencies = [ + "num-bigint", + "num-traits", + "thiserror 2.0.17", + "time", +] + [[package]] name = "simulator" version = "0.1.0" @@ -9054,6 +9220,29 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "utoipa" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fcc29c80c21c31608227e0912b2d7fddba57ad76b606890627ba8ee7964e993" +dependencies = [ + "indexmap 2.13.0", + "serde", + "serde_json", + "utoipa-gen", +] + +[[package]] +name = "utoipa-gen" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d79d08d92ab8af4c5e8a6da20c47ae3f61a0f1dabc1997cdf2d082b757ca08b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "uuid" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 7ad6cdcfab..121fac815d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ order-validation = { path = "crates/order-validation" } orderbook = { path = "crates/orderbook" } paste = "1.0" pin-project-lite = "0.2.14" +pod-sdk = "0.5.1" prettyplease = "0.2.37" price-estimation = { path = "crates/price-estimation" } proc-macro2 = "1.0.103" diff --git a/Justfile b/Justfile index a9156bda66..6d76a763b1 100644 --- a/Justfile +++ b/Justfile @@ -47,6 +47,13 @@ test-e2e filters="" *extra="": test-driver: RUST_MIN_STACK=3145728 cargo nextest run -p driver --test-threads 1 --run-ignored ignored-only +# Run pod flow tests (pod_ prefixed tests). +# +# These tests validate the pod network integration and are not run in CI by +# default. They require access to the pod network endpoint. +test-pod: + cargo nextest run -p e2e 'pod_' --test-threads 1 --failure-output final --run-ignored ignored-only + # Run clippy clippy: cargo clippy --locked --workspace --all-features --all-targets -- -D warnings diff --git a/crates/autopilot/src/domain/competition/bid.rs b/crates/autopilot/src/domain/competition/bid.rs index 4bf089ee56..fa3ab8c772 100644 --- a/crates/autopilot/src/domain/competition/bid.rs +++ b/crates/autopilot/src/domain/competition/bid.rs @@ -9,24 +9,20 @@ use { pub type Scored = state::Scored; pub type Ranked = state::Ranked; -/// A solver's auction bid, which includes solution and corresponding driver -/// data, progressing through the winner selection process. -/// -/// It uses the type-state pattern to enforce correct state -/// transitions at compile time. The state parameter tracks progression through -/// three phases: -/// -/// 1. **Unscored**: Initial state when the solution is received from the driver -/// 2. **Scored**: After computing surplus and fees for the solution -/// 3. **Ranked**: After winner selection determines if this is a winner +/// Payload carried by [`Bid`]: the solution plus its originating driver. +/// Accessible directly through the bid via [`winner_selection::Bid`]'s +/// `Deref` impl, so `bid.solution()` and `bid.driver()` keep working. #[derive(Clone)] -pub struct Bid { +pub struct BidPayload { solution: Solution, driver: Arc, - state: State, } -impl Bid { +impl BidPayload { + pub fn new(solution: Solution, driver: Arc) -> Self { + Self { solution, driver } + } + pub fn solution(&self) -> &Solution { &self.solution } @@ -36,29 +32,7 @@ impl Bid { } } -impl state::HasState for Bid { - type Next = Bid; - type State = State; - - fn with_state(self, state: NewState) -> Self::Next { - Bid { - solution: self.solution, - driver: self.driver, - state, - } - } - - fn state(&self) -> &Self::State { - &self.state - } -} - -impl Bid { - pub fn new(solution: Solution, driver: Arc) -> Self { - Self { - solution, - driver, - state: Unscored, - } - } -} +/// A solver's auction bid in the typestate pipeline `Unscored -> Scored -> +/// Ranked`. State transitions are enforced at compile time via +/// [`winner_selection::Bid`]. +pub type Bid = ::winner_selection::Bid; diff --git a/crates/autopilot/src/domain/competition/mod.rs b/crates/autopilot/src/domain/competition/mod.rs index 2900ddec47..62114081e9 100644 --- a/crates/autopilot/src/domain/competition/mod.rs +++ b/crates/autopilot/src/domain/competition/mod.rs @@ -11,7 +11,7 @@ use { mod bid; pub mod winner_selection; -pub use bid::{Bid, RankType, Ranked, Scored, Unscored}; +pub use bid::{Bid, BidPayload, RankType, Ranked, Scored, Unscored}; type SolutionId = u64; @@ -88,7 +88,7 @@ pub struct TradedOrder { Eq, Ord, )] -pub struct Score(eth::Ether); +pub struct Score(pub eth::Ether); impl Score { pub fn try_new(score: eth::Ether) -> Result { diff --git a/crates/autopilot/src/domain/competition/winner_selection.rs b/crates/autopilot/src/domain/competition/winner_selection.rs index d779bd52ee..aad8d69aa8 100644 --- a/crates/autopilot/src/domain/competition/winner_selection.rs +++ b/crates/autopilot/src/domain/competition/winner_selection.rs @@ -31,6 +31,7 @@ use { competition::{Bid, RankType, Ranked, Score, Solution, TradedOrder, Unscored}, fee, }, + ::observe::metrics, ::winner_selection::state::{HasState, RankedItem, ScoredItem, UnscoredItem}, eth_domain_types::{self as eth, Address, WrappedNativeToken}, std::collections::HashMap, @@ -59,52 +60,50 @@ impl Arbitrator { /// Runs the entire auction mechanism on the passed in solutions. #[instrument(skip_all)] pub fn arbitrate(&self, bids: Vec>, auction: &domain::Auction) -> Ranking { - let context = auction.into(); - let mut bid_by_key = HashMap::with_capacity(bids.len()); - let mut solutions = Vec::with_capacity(bids.len()); - - for bid in bids { - let key = SolutionKey::from(bid.solution()); - let solution = bid.solution().into(); - bid_by_key.insert(key, bid); - solutions.push(solution); - } - - let ws_ranking = self.0.arbitrate(solutions, &context); - - // Compute reference scores while we still have ws_ranking - let reference_scores: HashMap = self - .0 - .compute_reference_scores(&ws_ranking) + let paired = bids .into_iter() - .map(|(solver, score)| (solver, Score(eth::Ether(score)))) + .map(|bid| { + let solution: winsel::Solution = bid.solution().into(); + (bid, solution) + }) .collect(); - - let mut filtered_out = Vec::with_capacity(ws_ranking.filtered_out.len()); - for ws_solution in ws_ranking.filtered_out { - let key = SolutionKey::from(&ws_solution); - let bid = bid_by_key - .remove(&key) - .expect("every ranked solution has a matching bid"); - let score = ws_solution.score(); - filtered_out.push( - bid.with_score(Score(eth::Ether(score))) - .with_rank(RankType::FilteredOut), + let rejoined = self.0.arbitrate_paired_and_rejoin(paired, &auction.into()); + + // An orphan means two input bids shared a `SolutionKey`. Autopilot + // runs one process per chain; panicking here takes auctioning down + // until restart. Warn and bump the counter so oncall can alert. + if rejoined.orphans > 0 { + tracing::warn!( + orphans = rejoined.orphans, + "ranked solutions had no matching bid; SolutionKey collision suspected", ); + Metrics::get() + .orphan_solutions + .inc_by(rejoined.orphans as u64); } + debug_assert!(rejoined.orphans == 0, "expected no orphans"); - let mut ranked = Vec::with_capacity(ws_ranking.ranked.len()); - for ranked_solution in ws_ranking.ranked { - let key = SolutionKey::from(&ranked_solution); - let bid = bid_by_key - .remove(&key) - .expect("every ranked solution has a matching bid"); - let score = ranked_solution.score(); - ranked.push( - bid.with_score(Score(eth::Ether(score))) - .with_rank(ranked_solution.state().rank_type), - ); - } + let reference_scores = rejoined + .reference_scores + .into_iter() + .map(|(solver, score)| (solver, Score(eth::Ether(score)))) + .collect(); + let filtered_out = rejoined + .filtered_out + .into_iter() + .map(|(bid, ws_solution)| { + bid.with_score(Score(eth::Ether(ws_solution.score()))) + .with_rank(RankType::FilteredOut) + }) + .collect(); + let ranked = rejoined + .ranked + .into_iter() + .map(|(bid, ws_solution)| { + bid.with_score(Score(eth::Ether(ws_solution.score()))) + .with_rank(ws_solution.state().rank_type) + }) + .collect(); Ranking { filtered_out, @@ -114,6 +113,21 @@ impl Arbitrator { } } +#[derive(prometheus_metric_storage::MetricStorage)] +#[metric(subsystem = "winner_selection")] +struct Metrics { + /// Arbitrator-returned solutions whose `SolutionKey` had no matching + /// bid in the rejoin step. Non-zero indicates a `SolutionKey` collision + /// in the input set or an arbitrator invariant violation. + orphan_solutions: prometheus::IntCounter, +} + +impl Metrics { + fn get() -> &'static Self { + Metrics::instance(metrics::get_storage_registry()).unwrap() + } +} + impl From<&domain::Auction> for winsel::AuctionContext { fn from(auction: &domain::Auction) -> Self { Self { @@ -155,6 +169,11 @@ impl From<&Solution> for winsel::Solution { .iter() .map(|(uid, order)| to_winsel_order(*uid, order)) .collect(), + solution + .prices() + .iter() + .map(|(token, price)| (Address::from(*token), price.get().0)) + .collect(), ) } } @@ -206,30 +225,6 @@ impl From for winsel::primitives::FeePolicy { } } -#[derive(Clone, Copy, Hash, Eq, PartialEq)] -struct SolutionKey { - solver: eth::Address, - solution_id: u64, -} - -impl From<&Solution> for SolutionKey { - fn from(solution: &Solution) -> Self { - Self { - solver: solution.solver(), - solution_id: solution.id(), - } - } -} - -impl From<&winsel::Solution> for SolutionKey { - fn from(solution: &winsel::Solution) -> Self { - Self { - solver: solution.solver(), - solution_id: solution.id(), - } - } -} - pub struct Ranking { /// Solutions that were discarded because they were malformed /// in some way or deemed unfair by the selection mechanism. @@ -286,7 +281,7 @@ mod tests { Price, order::{self, AppDataHash}, }, - competition::{Bid, Solution, TradedOrder, Unscored}, + competition::{Bid, BidPayload, Solution, TradedOrder, Unscored}, }, infra::Driver, }, @@ -1220,7 +1215,7 @@ mod tests { .await .unwrap(); - Bid::new(solution, std::sync::Arc::new(driver)) + Bid::new(BidPayload::new(solution, std::sync::Arc::new(driver))) } fn amount(value: u128) -> String { diff --git a/crates/autopilot/src/domain/settlement/mod.rs b/crates/autopilot/src/domain/settlement/mod.rs index 40fd64de2f..15f71ab1ee 100644 --- a/crates/autopilot/src/domain/settlement/mod.rs +++ b/crates/autopilot/src/domain/settlement/mod.rs @@ -400,7 +400,7 @@ mod tests { .collect(), native_prices: prices.clone(), }; - let solution = ws::Solution::new(0, ws::Address::ZERO, vec![order]); + let solution = ws::Solution::new(0, ws::Address::ZERO, vec![order], Default::default()); let arbitrator = ws::Arbitrator { max_winners: 1, weth: ws::Address::ZERO, diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index d95fbd7b7f..c8f24780ba 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -643,7 +643,10 @@ impl RunLoop { .filter_map(|solution| match solution { Ok(solution) => { Metrics::solution_ok(&driver); - Some(competition::Bid::new(solution, driver.clone())) + Some(competition::Bid::new(competition::BidPayload::new( + solution, + driver.clone(), + ))) } Err(err) => { Metrics::solution_err(&driver, &err); diff --git a/crates/autopilot/src/shadow.rs b/crates/autopilot/src/shadow.rs index 7ec6833fcb..44d161d054 100644 --- a/crates/autopilot/src/shadow.rs +++ b/crates/autopilot/src/shadow.rs @@ -11,7 +11,7 @@ use { crate::{ domain::{ self, - competition::{Bid, Score, Unscored, winner_selection}, + competition::{Bid, BidPayload, Score, Unscored, winner_selection}, }, infra::{ self, @@ -259,7 +259,7 @@ impl RunLoop { solutions .into_iter() - .map(|s| Bid::new(s, Arc::clone(&driver))) + .map(|s| Bid::new(BidPayload::new(s, Arc::clone(&driver)))) .collect() } diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index 470395b1c2..1bde4e8e32 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -66,7 +66,7 @@ serde-ext = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } simulator = { workspace = true } -solvers-dto = { path = "../solvers-dto" } +solvers-dto = { workspace = true } thiserror = { workspace = true } tikv-jemallocator = { workspace = true } token-info = { workspace = true } @@ -86,10 +86,12 @@ clap = { workspace = true } contracts = { workspace = true } model = { workspace = true } observe = { workspace = true } +pod-sdk = { workspace = true } shared = { workspace = true } signature-validator = { workspace = true } solver = { workspace = true } tracing = { workspace = true } +winner-selection = { workspace = true } [dev-dependencies] alloy = { workspace = true, features = ["signer-mnemonic"] } diff --git a/crates/driver/src/domain/competition/auction.rs b/crates/driver/src/domain/competition/auction.rs index c4ddc6e498..f98ebfa68e 100644 --- a/crates/driver/src/domain/competition/auction.rs +++ b/crates/driver/src/domain/competition/auction.rs @@ -142,6 +142,10 @@ impl Tokens { pub fn iter(&self) -> impl Iterator { self.0.values() } + + pub fn iter_keys_values(&self) -> impl Iterator { + self.0.iter() + } } #[derive(Debug, Clone)] diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 864dbf39dc..2bbd5525cc 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -41,6 +41,7 @@ pub mod order; mod pre_processing; pub mod risk_detector; pub mod solution; +pub mod solver_winner_selection; pub mod sorting; use { @@ -561,6 +562,19 @@ impl Competition { lock.truncate(max_to_propose * MAX_CONCURRENT_AUCTIONS); } + // Shadow-mode submission to pod. Failures and cost must not affect + // the response we return to autopilot. `spawn` does its serialization + // synchronously from borrowed inputs, so no deep `Auction` clone + // happens here. Send the full scored vec so pod and autopilot rank + // the same set. + if let (Some(pm), Some(auction_id)) = (self.solver.pod_manager(), auction.id) { + let solveds: Vec = scored.iter().map(|(s, _)| s).cloned().collect(); + pm.spawn(auction_id, deadline, solveds, auction, self.solver.clone()); + } + + // Re-simulate the solution on every new block until the deadline ends to make + // sure we actually submit a working solution close to when the winner + // gets picked by the procotol. if let Ok(remaining) = deadline.remaining() { let _ = tokio::time::timeout( remaining, @@ -1006,7 +1020,7 @@ fn merge( /// Solution information sent to the protocol by the driver before the solution /// ranking happens. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Solved { pub id: solution::Id, pub score: eth::Ether, @@ -1015,7 +1029,7 @@ pub struct Solved { pub gas: Option, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Amounts { pub side: order::Side, /// The sell token and limit sell amount of sell token. diff --git a/crates/driver/src/domain/competition/solver_winner_selection.rs b/crates/driver/src/domain/competition/solver_winner_selection.rs new file mode 100644 index 0000000000..fed5945f41 --- /dev/null +++ b/crates/driver/src/domain/competition/solver_winner_selection.rs @@ -0,0 +1,202 @@ +pub use winner_selection::Unscored; +use { + crate::{domain::competition::order::FeePolicy, infra::api::routes::solve::dto}, + ::observe::metrics, + eth_domain_types::{self as eth, Ether, WrappedNativeToken}, + winner_selection::{ + self as winsel, + OrderUid, + state::{HasState, RankedItem, ScoredItem, UnscoredItem}, + }, +}; + +/// Score for a solution, wrapping the surplus value. +#[derive(Debug, Clone, Copy)] +pub struct Score(pub Ether); + +#[derive(Clone)] +pub struct SolverArbitrator(winsel::Arbitrator); + +/// Implements auction arbitration in 3 phases: +/// 1. filter unfair solutions +/// 2. mark winners +/// 3. compute reference scores +/// +/// The functions assume the `Arbitrator` is the only one +/// changing the ordering or the `bids`. +impl SolverArbitrator { + pub fn new(max_winners: usize, wrapped_native_token: WrappedNativeToken) -> Self { + let token: eth::TokenAddress = *wrapped_native_token; + Self(winsel::Arbitrator { + max_winners, + weth: *token, + }) + } + + /// Runs the entire auction mechanism on the passed in solutions. + pub fn arbitrate( + &self, + bids: Vec>, + auction: &crate::domain::competition::Auction, + ) -> Vec { + self.arbitrate_with_context(bids, &auction.into()) + } + + /// Same as [`Self::arbitrate`] but takes a precomputed + /// [`winsel::AuctionContext`]. Use this when the caller has already + /// built the context in the foreground, to avoid the per-call + /// conversion from `Auction`. + pub fn arbitrate_with_context( + &self, + bids: Vec>, + context: &winsel::AuctionContext, + ) -> Vec { + let paired = bids + .into_iter() + .map(|bid| { + let solution: winsel::Solution = bid.payload().into(); + (bid, solution) + }) + .collect(); + let rejoined = self.0.arbitrate_paired_and_rejoin(paired, context); + + // An orphan means two input bids shared a `SolutionKey`. Pod is + // open so untrusted input can engineer such collisions. Don't panic + // (it would kill the spawned pod task); warn and bump the counter + // so oncall can alert. + if rejoined.orphans > 0 { + tracing::warn!( + orphans = rejoined.orphans, + "ranked solutions had no matching bid; SolutionKey collision suspected", + ); + Metrics::get() + .orphan_solutions + .inc_by(rejoined.orphans as u64); + } + debug_assert!(rejoined.orphans == 0, "expected no orphans"); + + rejoined + .ranked + .into_iter() + .map(|(bid, ws_solution)| { + bid.with_score(Score(eth::Ether(ws_solution.score()))) + .with_rank(ws_solution.state().rank_type) + }) + .collect() + } +} + +#[derive(prometheus_metric_storage::MetricStorage)] +#[metric(subsystem = "winner_selection")] +struct Metrics { + /// Arbitrator-returned solutions whose `SolutionKey` had no matching + /// bid in the rejoin step. Non-zero indicates a `SolutionKey` collision + /// in the input set or an arbitrator invariant violation. + orphan_solutions: prometheus::IntCounter, +} + +impl Metrics { + fn get() -> &'static Self { + Metrics::instance(metrics::get_storage_registry()).unwrap() + } +} + +impl From<&crate::domain::competition::Auction> for winsel::AuctionContext { + fn from(auction: &crate::domain::competition::Auction) -> Self { + Self { + fee_policies: auction + .orders + .iter() + .map(|order| { + let uid = winsel::OrderUid(order.uid.0.0); + let policies = order + .protocol_fees + .iter() + .map(winsel::primitives::FeePolicy::from) + .collect(); + (uid, policies) + }) + .collect(), + surplus_capturing_jit_order_owners: auction + .surplus_capturing_jit_order_owners + .iter() + .copied() + .collect(), + native_prices: auction + .tokens + .iter_keys_values() + .map(|(token, price)| ((*token).into(), price.price.unwrap().0.0)) + .collect(), + } + } +} + +impl From<&crate::domain::competition::order::fees::FeePolicy> for winsel::primitives::FeePolicy { + fn from(policy: &crate::domain::competition::order::fees::FeePolicy) -> Self { + match policy { + FeePolicy::Surplus { + factor, + max_volume_factor, + } => Self::Surplus { + factor: *factor, + max_volume_factor: *max_volume_factor, + }, + FeePolicy::PriceImprovement { + factor, + max_volume_factor, + quote, + } => Self::PriceImprovement { + factor: *factor, + max_volume_factor: *max_volume_factor, + quote: winsel::primitives::Quote { + sell_amount: quote.sell.amount.0, + buy_amount: quote.buy.amount.0, + fee: quote.fee.amount.0, + solver: quote.solver, + }, + }, + FeePolicy::Volume { factor } => Self::Volume { factor: *factor }, + } + } +} + +impl From<&crate::infra::api::routes::solve::dto::solve_response::Solution> + for winsel::Solution +{ + fn from(solution: &crate::infra::api::routes::solve::dto::solve_response::Solution) -> Self { + Self::new( + solution.solution_id, + solution.submission_address, + solution + .orders + .iter() + .map(|(uid, order)| winsel::Order { + uid: OrderUid(*uid), + sell_token: order.sell_token, + buy_token: order.buy_token, + sell_amount: order.limit_sell, + buy_amount: order.limit_buy, + executed_sell: order.executed_sell, + executed_buy: order.executed_buy, + side: match order.side { + crate::infra::api::routes::solve::dto::solve_response::Side::Buy => { + winsel::Side::Buy + } + crate::infra::api::routes::solve::dto::solve_response::Side::Sell => { + winsel::Side::Sell + } + }, + }) + .collect(), + solution.clearing_prices.clone(), + ) + } +} + +pub type Scored = winsel::state::Scored; +pub type Ranked = winsel::state::Ranked; + +/// A solver's auction bid in the typestate pipeline `Unscored -> Scored -> +/// Ranked`. State transitions are enforced at compile time via +/// [`winsel::Bid`]. +pub type Bid = winsel::Bid; diff --git a/crates/driver/src/infra/api/mod.rs b/crates/driver/src/infra/api/mod.rs index 0ba4ad0441..20c4b59223 100644 --- a/crates/driver/src/infra/api/mod.rs +++ b/crates/driver/src/infra/api/mod.rs @@ -114,7 +114,7 @@ impl Api { let router = router.with_state(State(Arc::new(Inner { eth: self.eth.clone(), - solver: solver.clone(), + solver: Arc::new(solver.clone()), competition: domain::Competition::new( solver, self.eth.clone(), @@ -192,7 +192,7 @@ impl State { &self.0.eth } - fn solver(&self) -> &Solver { + fn solver(&self) -> &Arc { &self.0.solver } @@ -211,7 +211,7 @@ impl State { struct Inner { eth: Ethereum, - solver: Solver, + solver: Arc, competition: Arc, liquidity: liquidity::Fetcher, tokens: tokens::Fetcher, diff --git a/crates/driver/src/infra/api/routes/solve/dto/mod.rs b/crates/driver/src/infra/api/routes/solve/dto/mod.rs index 458d9c47a9..d323f97461 100644 --- a/crates/driver/src/infra/api/routes/solve/dto/mod.rs +++ b/crates/driver/src/infra/api/routes/solve/dto/mod.rs @@ -1,5 +1,5 @@ pub mod solve_request; -mod solve_response; +pub(crate) mod solve_response; pub use { solve_request::{Error as AuctionError, SolveRequest}, diff --git a/crates/driver/src/infra/api/routes/solve/dto/solve_response.rs b/crates/driver/src/infra/api/routes/solve/dto/solve_response.rs index 3db573f820..ca3c1eee39 100644 --- a/crates/driver/src/infra/api/routes/solve/dto/solve_response.rs +++ b/crates/driver/src/infra/api/routes/solve/dto/solve_response.rs @@ -4,7 +4,7 @@ use { infra::Solver, }, eth_domain_types as eth, - serde::Serialize, + serde::{Deserialize, Serialize}, serde_with::serde_as, std::collections::HashMap, }; @@ -20,10 +20,10 @@ impl SolveResponse { } #[serde_as] -#[derive(Debug, Default, Serialize)] +#[derive(Debug, Default, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct SolveResponse { - solutions: Vec, + pub solutions: Vec, } impl Solution { @@ -62,26 +62,26 @@ impl Solution { } } -type OrderId = [u8; order::UID_LEN]; +pub(crate) type OrderId = [u8; order::UID_LEN]; #[serde_as] -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Solution { /// Unique ID of the solution (per driver competition), used to identify it /// in subsequent requests (reveal, settle). - solution_id: u64, - submission_address: eth::Address, + pub solution_id: u64, + pub submission_address: eth::Address, #[serde_as(as = "serde_ext::U256")] - score: eth::U256, + pub score: eth::U256, #[serde_as(as = "HashMap")] - orders: HashMap, + pub orders: HashMap, #[serde_as(as = "HashMap<_, serde_ext::U256>")] - clearing_prices: HashMap, + pub clearing_prices: HashMap, } #[serde_as] -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct TradedOrder { pub side: Side, @@ -102,7 +102,7 @@ pub struct TradedOrder { } #[serde_as] -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum Side { Buy, diff --git a/crates/driver/src/infra/config/file/load.rs b/crates/driver/src/infra/config/file/load.rs index 6149891e2c..1c740293df 100644 --- a/crates/driver/src/infra/config/file/load.rs +++ b/crates/driver/src/infra/config/file/load.rs @@ -51,100 +51,106 @@ pub async fn load(chain: Chain, path: &Path) -> infra::Config { "The configured chain ID does not match the connected Ethereum node" ); infra::Config { - solvers: join_all(config.solvers.into_iter().map(|solver_config| async move { - let account = load_account(solver_config.account, config.chain_id).await; - solver::Config { - endpoint: solver_config.endpoint, - name: solver_config.name.into(), - slippage: solver::Slippage { - relative: big_decimal_to_big_rational(&solver_config.slippage.relative), - absolute: solver_config.slippage.absolute.map(eth::Ether), - }, - liquidity: if solver_config.skip_liquidity { - solver::Liquidity::Skip - } else { - solver::Liquidity::Fetch - }, - account, - timeouts: solver::Timeouts { - http_delay: chrono::Duration::from_std(solver_config.timeouts.http_time_buffer) - .unwrap(), - solving_share_of_deadline: solver_config - .timeouts - .solving_share_of_deadline - .try_into() + solvers: join_all(config.solvers.into_iter().map(|solver_config| { + let pod_config = config.pod.clone(); + async move { + let account = load_account(solver_config.account, config.chain_id).await; + solver::Config { + endpoint: solver_config.endpoint, + name: solver_config.name.into(), + slippage: solver::Slippage { + relative: big_decimal_to_big_rational(&solver_config.slippage.relative), + absolute: solver_config.slippage.absolute.map(eth::Ether), + }, + liquidity: if solver_config.skip_liquidity { + solver::Liquidity::Skip + } else { + solver::Liquidity::Fetch + }, + account, + timeouts: solver::Timeouts { + http_delay: chrono::Duration::from_std( + solver_config.timeouts.http_time_buffer, + ) .unwrap(), - }, - request_headers: solver_config.request_headers, - fee_handler: solver_config.fee_handler, - quote_using_limit_orders: solver_config.quote_using_limit_orders, - merge_solutions: match solver_config.merge_solutions { - true => SolutionMerging::Allowed { - max_orders_per_merged_solution: solver_config - .max_orders_per_merged_solution, + solving_share_of_deadline: solver_config + .timeouts + .solving_share_of_deadline + .try_into() + .unwrap(), }, - false => SolutionMerging::Forbidden, - }, - s3: solver_config.s3.map(Into::into), - solver_native_token: solver_config.manage_native_token.to_domain(), - quote_tx_origin: solver_config.quote_tx_origin, - response_size_limit_max_bytes: solver_config.response_size_limit_max_bytes, - bad_order_detection: BadOrderDetection { - tokens_supported: solver_config - .bad_order_detection - .token_supported - .iter() - .map(|(token, supported)| { - ( - eth::TokenAddress::from(*token), - match supported { - true => risk_detector::Quality::Supported, - false => risk_detector::Quality::Unsupported, - }, - ) - }) - .collect(), - enable_simulation_strategy: solver_config - .bad_order_detection - .enable_simulation_strategy, - enable_metrics_strategy: solver_config - .bad_order_detection - .enable_metrics_strategy, - metrics_strategy_failure_ratio: solver_config - .bad_order_detection - .metrics_strategy_failure_ratio, - metrics_strategy_required_measurements: solver_config - .bad_order_detection - .metrics_strategy_required_measurements, - metrics_strategy_log_only: solver_config - .bad_order_detection - .metrics_strategy_log_only, - metrics_strategy_order_freeze_time: solver_config - .bad_order_detection - .metrics_strategy_freeze_time, - metrics_strategy_cache_gc_interval: solver_config - .bad_order_detection - .metrics_strategy_gc_interval, - metrics_strategy_cache_max_age: solver_config - .bad_order_detection - .metrics_strategy_gc_max_age, - }, - settle_queue_size: solver_config.settle_queue_size, - flashloans_enabled: config.flashloans_enabled, - fetch_liquidity_at_block: match config.liquidity.fetch_at_block { - file::AtBlock::Latest => liquidity::AtBlock::Latest, - file::AtBlock::Finalized => liquidity::AtBlock::Finalized, - }, - haircut_bps: solver_config.haircut_bps, - submission_accounts: join_all( - solver_config - .submission_accounts - .into_iter() - .map(|acc| load_account(acc, config.chain_id)), - ) - .await, - forwarder_contract: solver_config.forwarder_contract, - max_solutions_to_propose: solver_config.max_solutions_to_propose, + request_headers: solver_config.request_headers, + fee_handler: solver_config.fee_handler, + quote_using_limit_orders: solver_config.quote_using_limit_orders, + merge_solutions: match solver_config.merge_solutions { + true => SolutionMerging::Allowed { + max_orders_per_merged_solution: solver_config + .max_orders_per_merged_solution, + }, + false => SolutionMerging::Forbidden, + }, + s3: solver_config.s3.map(Into::into), + solver_native_token: solver_config.manage_native_token.to_domain(), + quote_tx_origin: solver_config.quote_tx_origin, + response_size_limit_max_bytes: solver_config.response_size_limit_max_bytes, + bad_order_detection: BadOrderDetection { + tokens_supported: solver_config + .bad_order_detection + .token_supported + .iter() + .map(|(token, supported)| { + ( + eth::TokenAddress::from(*token), + match supported { + true => risk_detector::Quality::Supported, + false => risk_detector::Quality::Unsupported, + }, + ) + }) + .collect(), + enable_simulation_strategy: solver_config + .bad_order_detection + .enable_simulation_strategy, + enable_metrics_strategy: solver_config + .bad_order_detection + .enable_metrics_strategy, + metrics_strategy_failure_ratio: solver_config + .bad_order_detection + .metrics_strategy_failure_ratio, + metrics_strategy_required_measurements: solver_config + .bad_order_detection + .metrics_strategy_required_measurements, + metrics_strategy_log_only: solver_config + .bad_order_detection + .metrics_strategy_log_only, + metrics_strategy_order_freeze_time: solver_config + .bad_order_detection + .metrics_strategy_freeze_time, + metrics_strategy_cache_gc_interval: solver_config + .bad_order_detection + .metrics_strategy_gc_interval, + metrics_strategy_cache_max_age: solver_config + .bad_order_detection + .metrics_strategy_gc_max_age, + }, + settle_queue_size: solver_config.settle_queue_size, + flashloans_enabled: config.flashloans_enabled, + fetch_liquidity_at_block: match config.liquidity.fetch_at_block { + file::AtBlock::Latest => liquidity::AtBlock::Latest, + file::AtBlock::Finalized => liquidity::AtBlock::Finalized, + }, + haircut_bps: solver_config.haircut_bps, + submission_accounts: join_all( + solver_config + .submission_accounts + .into_iter() + .map(|acc| load_account(acc, config.chain_id)), + ) + .await, + forwarder_contract: solver_config.forwarder_contract, + max_solutions_to_propose: solver_config.max_solutions_to_propose, + pod: pod_config, + } } })) .await, diff --git a/crates/driver/src/infra/config/file/mod.rs b/crates/driver/src/infra/config/file/mod.rs index 054028be10..1793057897 100644 --- a/crates/driver/src/infra/config/file/mod.rs +++ b/crates/driver/src/infra/config/file/mod.rs @@ -56,6 +56,8 @@ struct Config { #[serde(default)] liquidity: LiquidityConfig, + pod: Option, + /// Defines order prioritization strategies that will be applied in the /// specified order. #[serde( diff --git a/crates/driver/src/infra/mod.rs b/crates/driver/src/infra/mod.rs index 929d03119d..003bb9711f 100644 --- a/crates/driver/src/infra/mod.rs +++ b/crates/driver/src/infra/mod.rs @@ -7,6 +7,7 @@ pub mod mempool; pub mod notify; pub mod observe; pub mod persistence; +pub mod pod; pub mod solver; pub mod time; pub mod tokens; diff --git a/crates/driver/src/infra/pod/config.rs b/crates/driver/src/infra/pod/config.rs new file mode 100644 index 0000000000..68c95bc249 --- /dev/null +++ b/crates/driver/src/infra/pod/config.rs @@ -0,0 +1,25 @@ +use {serde::Deserialize, serde_with::serde_as, url::Url}; + +/// Default number of winning solutions selected by the local arbitrator. +/// +/// Once pod takes real auction traffic this acts as a protocol parameter: +/// changing it has the same blast radius as a parameter hardfork. Exposed +/// in config from the first version so a future change is configurable +/// without a release. +const DEFAULT_MAX_WINNERS: usize = 10; + +#[serde_as] +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +pub struct Config { + pub endpoint: Url, + pub auction_contract_address: pod_sdk::alloy_primitives::Address, + /// Maximum number of winning solutions selected by the local arbitrator + /// when running shadow-mode arbitration over fetched pod bids. + #[serde(default = "default_max_winners")] + pub max_winners: usize, +} + +fn default_max_winners() -> usize { + DEFAULT_MAX_WINNERS +} diff --git a/crates/driver/src/infra/pod/flow.rs b/crates/driver/src/infra/pod/flow.rs new file mode 100644 index 0000000000..d48cb7b19c --- /dev/null +++ b/crates/driver/src/infra/pod/flow.rs @@ -0,0 +1,368 @@ +//! Shadow-mode pod-network bid flow. +//! +//! `PodManager` owns the auction-contract handle, the arbitrator, and the +//! `JoinSet` tracking in-flight pod tasks. `Competition::solve` only calls +//! `PodManager::spawn(...)`. Tasks are aborted when the last `PodManager` +//! clone drops. +//! +//! TODO(production-pod): the fire-and-forget submit + best-effort +//! locked-account recovery is acceptable for shadow mode but must be +//! revisited before pod runs the auction for real. Concretely: queueing +//! semantics for concurrent submissions per solver EOA, retry/backoff +//! policy, observability + alerting on locked-account streaks, and exposing +//! arbitration results for cross-validation against autopilot. + +use { + super::{config, recovery}, + crate::{ + domain::competition::{ + Auction, + Solved, + auction::Id, + solver_winner_selection::{Bid, SolverArbitrator, Unscored}, + }, + infra::{ + api::routes::solve::dto, + solver::{Account, Solver}, + }, + }, + alloy::network::EthereumWallet, + anyhow::{Context as _, Result}, + pod_sdk::{ + Provider, + auctions::client::AuctionClient, + provider::{PodProvider, PodProviderBuilder}, + }, + std::{ + future::Future, + sync::{Arc, Mutex}, + time::Duration, + }, + tokio::task::JoinSet, + tracing::{Instrument as _, instrument}, + winner_selection::{AuctionContext, state::RankedItem}, +}; + +/// Grace window past the auction deadline allowed for pod RPCs. +const POD_RPC_GRACE: Duration = Duration::from_secs(10); +/// Hard timeout for calls that happen after the auction deadline has +/// already passed (fetching bids, recovering a locked account). +const POD_POST_DEADLINE_TIMEOUT: Duration = Duration::from_secs(30); + +#[derive(Clone)] +pub struct PodManager { + inner: Arc, + /// Tracks spawned pod tasks so they are aborted when the last + /// `PodManager` clone drops. Spawned futures only hold `Arc`, + /// not `PodManager`, so there is no Arc cycle keeping the `JoinSet` + /// alive. + tasks: Arc>>, +} + +struct PodInner { + provider: PodProvider, + arbitrator: SolverArbitrator, + /// Single `AuctionClient` reused across auctions. AuctionClient just wraps + /// the alloy contract handle and carries no per-auction state, so building + /// it once at startup avoids redundant allocations for every bid round. + client: Arc, +} + +impl std::fmt::Debug for PodManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PodManager").finish_non_exhaustive() + } +} + +impl PodManager { + /// Build a `PodManager` from solver and pod config. Returns `None` if pod + /// is unconfigured for this solver. Also returns `None` (after logging) + /// if the provider cannot be constructed; pod runs in shadow mode and + /// must not block solver startup on a transient RPC failure. + pub async fn try_new( + account: &Account, + pod_config: &config::Config, + weth: eth_domain_types::WrappedNativeToken, + ) -> Option { + let provider = match build_pod_provider(account, pod_config).await { + Ok(p) => p, + Err(e) => { + tracing::error!(error = %e, "failed to initialize pod provider"); + return None; + } + }; + + let client = Arc::new(AuctionClient::new( + provider.clone(), + pod_config.auction_contract_address, + )); + let arbitrator = SolverArbitrator::new(pod_config.max_winners, weth); + + Some(Self { + inner: Arc::new(PodInner { + provider, + arbitrator, + client, + }), + tasks: Arc::new(Mutex::new(JoinSet::new())), + }) + } + + /// Fire-and-forget shadow-mode flow: submits the proposed solutions to + /// pod, waits for the auction to end, fetches all bids, and runs local + /// arbitration. Errors are logged and never surfaced to the caller. + /// + /// Bid serialization and [`AuctionContext`] construction happen + /// synchronously on the calling thread, so the spawned task only owns + /// cheap data and the [`Auction`] is never deep-cloned. The full + /// `scored` vec is sent so pod's arbitrator sees the same input set + /// as autopilot's. + pub fn spawn( + &self, + auction_id: Id, + deadline: chrono::DateTime, + scored: Vec, + auction: &Auction, + solver: Solver, + ) { + let bid_value = match scored.first() { + Some(s) => s.score.0, + None => return, + }; + let bid_data = match serde_json::to_vec(&dto::SolveResponse::new(scored, &solver)) { + Ok(b) => b, + Err(e) => { + tracing::warn!(error = %e, "failed to serialize pod bid payload"); + return; + } + }; + let context = AuctionContext::from(auction); + + let inner = self.inner.clone(); + let span = + tracing::info_span!("pod_flow", auction_id = %auction_id.0, solver = %solver.name()); + let task = async move { + if let Err(e) = inner + .submit_bid(auction_id, deadline, bid_value, bid_data, &solver) + .await + { + tracing::warn!(error = %e, "pod bid submission failed (shadow mode)"); + return; + } + + let participants = match inner.fetch_bids(auction_id, deadline).await { + Ok(p) => p, + Err(e) => { + tracing::warn!(error = %e, "pod fetch bids failed (shadow mode)"); + return; + } + }; + + inner.local_arbitration(auction_id, &context, participants); + }; + self.tasks.lock().unwrap().spawn(task.instrument(span)); + } +} + +impl PodInner { + #[instrument(name = "pod_submit_bid", skip_all, fields(auction_id = %auction_id.0))] + async fn submit_bid( + &self, + auction_id: Id, + deadline: chrono::DateTime, + bid_value: pod_sdk::U256, + bid_data: Vec, + solver: &Solver, + ) -> Result<()> { + let pod_auction_id = + pod_sdk::U256::from(u64::try_from(auction_id.0).context("auction id")?); + + tracing::info!(score = %bid_value, payload_len = bid_data.len(), "submitting bid"); + let submit = || async { + self.client + .submit_bid(pod_auction_id, deadline.into(), bid_value, bid_data.clone()) + .await + }; + + // `is_account_locked_error` matches the raw pod-sdk error string. + // `with_timeout` would wrap it via `.context()`, hiding that + // message (anyhow's `Display` shows only the top context). Handle + // the timeout inline to keep the raw error visible. + let timeout = remaining_until(deadline) + POD_RPC_GRACE; + match tokio::time::timeout(timeout, submit()).await { + Ok(Ok(_)) => { + tracing::info!("bid submitted"); + return Ok(()); + } + Ok(Err(e)) if recovery::is_account_locked_error(&e.to_string()) => { + tracing::warn!(error = %e, "locked account detected, attempting recovery"); + } + Ok(Err(e)) => return Err(e.context("submit bid")), + Err(_) => anyhow::bail!("submit bid timed out after {timeout:?}"), + } + + if !with_timeout( + "recover locked account", + POD_POST_DEADLINE_TIMEOUT, + recovery::recover_locked_account(&self.provider, solver.address()), + ) + .await? + { + anyhow::bail!("submission failed but account was not locked"); + } + + let timeout = remaining_until(deadline) + POD_RPC_GRACE; + match tokio::time::timeout(timeout, submit()).await { + Ok(Ok(_)) => { + tracing::info!("bid submitted after recovery"); + Ok(()) + } + Ok(Err(e)) => Err(e.context("submit bid after recovery")), + Err(_) => anyhow::bail!("submit bid after recovery timed out after {timeout:?}"), + } + } + + #[instrument(name = "pod_fetch_bids", skip_all, fields(auction_id = %auction_id.0))] + async fn fetch_bids( + &self, + auction_id: Id, + deadline: chrono::DateTime, + ) -> Result>> { + let pod_auction_id = + pod_sdk::U256::from(u64::try_from(auction_id.0).context("auction id")?); + + let wait_timeout = remaining_until(deadline) + POD_RPC_GRACE; + with_timeout( + "wait for auction end", + wait_timeout, + self.client.wait_for_auction_end(deadline.into()), + ) + .await?; + + let bids = with_timeout( + "fetch bids", + POD_POST_DEADLINE_TIMEOUT, + self.client.fetch_bids(pod_auction_id), + ) + .await?; + + // `bid.bidder` is the on-chain signer; `submission_address` inside + // `bid.data` is unauthenticated payload that any submitter can set + // to any value. Override with `bid.bidder` and warn on mismatch so + // impersonation attempts stay visible. Distinct bidders also keep + // `SolutionKey`s unique downstream. + let mut participants = Vec::with_capacity(bids.len()); + let mut malformed = 0; + let mut spoofed = 0; + for bid in bids { + match serde_json::from_slice::(&bid.data) { + Ok(resp) => { + for mut solution in resp.solutions { + if solution.submission_address != bid.bidder { + spoofed += 1; + tracing::warn!( + bidder = %bid.bidder, + claimed = %solution.submission_address, + "submission_address mismatch, overriding with on-chain bidder", + ); + solution.submission_address = bid.bidder; + } + participants.push(Bid::new(solution)); + } + } + Err(e) => { + malformed += 1; + tracing::warn!(error = %e, bidder = %bid.bidder, "skipping malformed bid"); + } + } + } + if malformed > 0 { + tracing::warn!(malformed, "some bids were malformed and skipped"); + } + if spoofed > 0 { + tracing::warn!(spoofed, "some bids declared a different submission_address"); + } + tracing::info!(num_participants = participants.len(), "fetched bids"); + Ok(participants) + } + + #[instrument( + name = "pod_local_arbitration", + skip_all, + fields(auction_id = %auction_id.0, num_participants = participants.len()), + )] + fn local_arbitration( + &self, + auction_id: Id, + context: &AuctionContext, + participants: Vec>, + ) { + // `auction_id` is read by the `#[instrument]` span fields above. + let _ = auction_id; + let ranked = self + .arbitrator + .arbitrate_with_context(participants, context); + + let (winners, non_winners): (Vec<_>, Vec<_>) = ranked.iter().partition(|b| b.is_winner()); + tracing::info!( + num_winners = winners.len(), + num_non_winners = non_winners.len(), + "local arbitration completed", + ); + for winner in winners { + tracing::info!( + submission_address = %winner.submission_address, + computed_score = ?winner.score(), + "winner selected", + ); + } + } +} + +/// Time remaining until `deadline`. Returns zero if already elapsed. +fn remaining_until(deadline: chrono::DateTime) -> Duration { + (deadline - chrono::Utc::now()) + .to_std() + .unwrap_or(Duration::ZERO) +} + +/// Wrap a future in a hard timeout. On elapse, returns an `anyhow::Error` +/// labelled with `label`; otherwise propagates the inner result with +/// `label` as added context. +async fn with_timeout(label: &'static str, dur: Duration, f: F) -> Result +where + F: Future>, + anyhow::Error: From, +{ + match tokio::time::timeout(dur, f).await { + Ok(r) => r.map_err(|e| anyhow::Error::from(e).context(label)), + Err(_) => Err(anyhow::anyhow!("{label} timed out after {dur:?}")), + } +} + +async fn build_pod_provider(account: &Account, pod_config: &config::Config) -> Result { + let wallet = match account { + Account::PrivateKey(s) => EthereumWallet::from(s.clone()), + Account::Kms(s) => EthereumWallet::from(s.clone()), + Account::Address(addr) => { + anyhow::bail!("address-only account ({addr:?}) cannot sign pod transactions") + } + }; + let signer_address = alloy::network::TxSigner::address(account); + let provider = PodProviderBuilder::with_recommended_settings() + .wallet(wallet) + .on_url(pod_config.endpoint.clone()) + .await?; + + // Diagnostic info for debugging pending-TX issues; don't fail provider + // creation if these RPCs hiccup. + let balance = provider.get_balance(signer_address).await.ok(); + let nonce = provider.get_transaction_count(signer_address).await.ok(); + tracing::info!( + %signer_address, + ?balance, + ?nonce, + "pod provider initialized", + ); + Ok(provider) +} diff --git a/crates/driver/src/infra/pod/mod.rs b/crates/driver/src/infra/pod/mod.rs new file mode 100644 index 0000000000..c02940f784 --- /dev/null +++ b/crates/driver/src/infra/pod/mod.rs @@ -0,0 +1,5 @@ +pub mod config; +pub mod flow; +pub mod recovery; + +pub use flow::PodManager; diff --git a/crates/driver/src/infra/pod/recovery.rs b/crates/driver/src/infra/pod/recovery.rs new file mode 100644 index 0000000000..5dc99c797a --- /dev/null +++ b/crates/driver/src/infra/pod/recovery.rs @@ -0,0 +1,108 @@ +//! Pod network account recovery for locked accounts. +//! +//! Pod accepts at most one in-flight transaction per submitter address. +//! Submitting a second transaction before the first is mined causes pod to +//! mark the account as *locked*; subsequent submissions fail until the +//! account is recovered via `pod_getRecoveryTargetTx` + the recovery +//! precompile. See: +//! +//! +//! TODO(production-pod): the fire-and-forget submit + best-effort recovery +//! shape is shadow-mode-only. Before pod runs the auction for real this must +//! be reworked: queueing semantics for concurrent submissions per solver +//! EOA, retry/backoff policy, observability + alerting on locked-account +//! streaks, and propagation of arbitration results for cross-validation +//! against the autopilot's ranking. + +use { + alloy::{primitives::address, providers::Provider, sol}, + anyhow::{Context, anyhow}, + pod_sdk::{Address, alloy_primitives::B256, provider::PodProvider}, +}; + +const RECOVERY_PRECOMPILE: Address = address!("50d0000000000000000000000000000000000003"); + +sol! { + #[sol(rpc)] + contract Recovery { + function recover(bytes32 txHash, uint64 nonce) public; + } +} + +#[derive(Debug, Clone, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct RecoveryTarget { + tx_hash: B256, + nonce: u64, +} + +/// Attempts to recover a locked pod account. Returns `Ok(true)` on successful +/// recovery, `Ok(false)` if the account is not locked, and `Err` on RPC or +/// transaction failure. +#[tracing::instrument(skip_all, fields(%account))] +pub async fn recover_locked_account( + provider: &PodProvider, + account: Address, +) -> anyhow::Result { + let Some(target) = get_recovery_target(provider, account).await? else { + return Ok(false); + }; + + tracing::info!(tx_hash = %target.tx_hash, nonce = target.nonce, "recovering"); + + let receipt = Recovery::new(RECOVERY_PRECOMPILE, provider) + .recover(target.tx_hash, target.nonce) + .send() + .await + .context("send recovery tx")? + .get_receipt() + .await + .context("recovery tx receipt")?; + + if !receipt.status() { + return Err(anyhow!("recovery tx reverted")); + } + + tracing::info!("recovered"); + Ok(true) +} + +async fn get_recovery_target( + provider: &PodProvider, + account: Address, +) -> anyhow::Result> { + let result = provider + .raw_request::<_, serde_json::Value>("pod_getRecoveryTargetTx".into(), vec![account]) + .await; + + match result { + Ok(value) => Ok(Some( + serde_json::from_value(value).context("decode target")?, + )), + Err(e) => { + let s = e.to_string(); + if s.contains("not locked") || s.contains("no recovery") { + Ok(None) + } else { + Err(anyhow!(s).context("pod_getRecoveryTargetTx")) + } + } + } +} + +/// Detects the "account is locked" condition in an error returned by +/// `AuctionClient::submit_bid`. +/// +/// Pod limits each submitter to one in-flight transaction; when a second +/// submission arrives before the first is mined, pod returns an RPC error +/// containing the substrings matched below. The matcher is brittle: pod-sdk +/// 0.5.1 returns a stringly RPC error with no typed variant, so the wire +/// format is the only thing to match on. Pinning the SDK version gives a +/// future bump a fair chance to break the test and force re-evaluation. +/// +/// On a hit we attempt recovery so the next bid in the auction's hot path +/// can proceed; failing the whole submission would lose the bid for an +/// otherwise-recoverable transient. +pub fn is_account_locked_error(error: &str) -> bool { + error.contains("Another transaction") && error.contains("is still pending") +} diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index f3cc96005a..8f054aba67 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -17,6 +17,7 @@ use { blockchain::Ethereum, config::file::FeeHandler, persistence::{Persistence, S3}, + pod::{self, PodManager}, }, util, }, @@ -100,12 +101,27 @@ pub struct ManageNativeToken { /// Solvers are controlled by the driver. Their job is to search for solutions /// to auctions. They do this in various ways, often by analyzing different AMMs /// on the Ethereum blockchain. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct Solver { client: reqwest::Client, config: Config, eth: Ethereum, persistence: Persistence, + /// Pod-network shadow-mode flow. `None` when pod is unconfigured for this + /// solver; the auction contract handle and the local arbitrator only exist + /// when pod does, so they're paired structurally. + pod_manager: Option, +} + +impl std::fmt::Debug for Solver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Solver") + .field("client", &self.client) + .field("config", &self.config) + .field("eth", &self.eth) + .field("persistence", &self.persistence) + .finish_non_exhaustive() + } } #[derive(Debug, Clone)] @@ -120,7 +136,7 @@ impl TxSigner for Account { fn address(&self) -> Address { match self { Account::PrivateKey(local_signer) => local_signer.address(), - Account::Kms(aws_signer) => aws_signer.address(), + Account::Kms(aws_signer) => TxSigner::::address(aws_signer), Account::Address(address) => *address, } } @@ -222,6 +238,8 @@ pub struct Config { /// Maximum number of solutions the driver proposes to the autopilot per /// auction. When 1 (the default), only the best-scoring solution is sent. pub max_solutions_to_propose: std::num::NonZeroUsize, + /// Pod configuration + pub pod: Option, } impl Solver { @@ -240,6 +258,14 @@ impl Solver { let persistence = Persistence::build(&config).await; + let pod_manager = match config.pod.as_ref() { + Some(pod_config) => { + PodManager::try_new(&config.account, pod_config, eth.contracts().weth_address()) + .await + } + None => None, + }; + Ok(Self { client: reqwest::ClientBuilder::new() .default_headers(headers) @@ -248,6 +274,7 @@ impl Solver { config, eth, persistence, + pod_manager, }) } @@ -332,6 +359,10 @@ impl Solver { self.config.max_solutions_to_propose.get() } + pub fn pod_manager(&self) -> Option<&PodManager> { + self.pod_manager.as_ref() + } + /// Make a POST request instructing the solver to solve an auction. /// Allocates at most `timeout` time for the solving. #[instrument(name = "solver_engine", skip_all)] diff --git a/crates/e2e/Cargo.toml b/crates/e2e/Cargo.toml index 1f5dbaa391..5fc9c3a201 100644 --- a/crates/e2e/Cargo.toml +++ b/crates/e2e/Cargo.toml @@ -49,6 +49,7 @@ model = { workspace = true, features = ["e2e"] } number = { workspace = true } observe = { workspace = true } orderbook = { workspace = true, features = ["e2e", "test-util"] } +pod-sdk = { workspace = true } price-estimation = { workspace = true } reqwest = { workspace = true, features = ["blocking", "query"] } serde = { workspace = true } diff --git a/crates/e2e/src/setup/colocation.rs b/crates/e2e/src/setup/colocation.rs index 54ccde72c2..c5ddd546fb 100644 --- a/crates/e2e/src/setup/colocation.rs +++ b/crates/e2e/src/setup/colocation.rs @@ -150,6 +150,30 @@ pub fn start_driver( ) } +pub fn start_driver_with_pod( + contracts: &Contracts, + solvers: Vec, + liquidity: LiquidityProvider, + quote_using_limit_orders: bool, +) -> JoinHandle<()> { + let pod_snippet = format!( + r#" +[pod] +endpoint = {:?} +auction-contract-address = {:?} +"#, + config::pod::POD_ENDPOINT, + config::pod::POD_AUCTION_CONTRACT, + ); + start_driver_with_config_override( + contracts, + solvers, + liquidity, + quote_using_limit_orders, + Some(&pod_snippet), + ) +} + pub fn start_driver_with_config_override( contracts: &Contracts, solvers: Vec, diff --git a/crates/e2e/src/setup/config/mod.rs b/crates/e2e/src/setup/config/mod.rs new file mode 100644 index 0000000000..d5900237c5 --- /dev/null +++ b/crates/e2e/src/setup/config/mod.rs @@ -0,0 +1 @@ +pub mod pod; diff --git a/crates/e2e/src/setup/config/pod.rs b/crates/e2e/src/setup/config/pod.rs new file mode 100644 index 0000000000..9844ed21f3 --- /dev/null +++ b/crates/e2e/src/setup/config/pod.rs @@ -0,0 +1,2 @@ +pub(crate) static POD_ENDPOINT: &str = "http://cow.pod.network:11600"; +pub(crate) static POD_AUCTION_CONTRACT: &str = "0xeDD0670497E00ded712a398563Ea938A29dD28c7"; diff --git a/crates/e2e/src/setup/mod.rs b/crates/e2e/src/setup/mod.rs index c5277e9a17..6bd2cf396b 100644 --- a/crates/e2e/src/setup/mod.rs +++ b/crates/e2e/src/setup/mod.rs @@ -1,6 +1,8 @@ pub mod colocation; +mod config; mod deploy; pub mod onchain_components; +pub mod pod; pub mod proxy; mod services; mod solver; @@ -175,6 +177,30 @@ pub async fn run_forked_test_with_extra_filters_and_block_number( run(f, extra_filters, Some((fork_url, Some(block_number)))).await } +/// Run a pod flow test. This is the same as `run_test` but signals that +/// pod config should be enabled for the driver. +/// Tests using this should be prefixed with `pod_` and marked with `#[ignore]`. +pub async fn run_pod_test(f: F) +where + F: FnOnce(Web3) -> Fut, + Fut: Future, +{ + // Verbose logging for pod flow debugging + run( + f, + [ + "pod=debug", + "driver=debug", + "driver::domain::competition=debug", + "driver::infra::solver=debug", + "autopilot=debug", + "autopilot::run=debug", + ], + None, + ) + .await +} + async fn run( f: F, filters: impl IntoIterator, diff --git a/crates/e2e/src/setup/pod.rs b/crates/e2e/src/setup/pod.rs new file mode 100644 index 0000000000..711fc75e6d --- /dev/null +++ b/crates/e2e/src/setup/pod.rs @@ -0,0 +1,73 @@ +use { + alloy::{network::EthereumWallet, primitives::Address, signers::local::PrivateKeySigner}, + pod_sdk::{auctions::client::AuctionClient, provider::PodProviderBuilder}, + url::Url, +}; + +/// Pod network configuration for tests +pub struct PodConfig { + pub endpoint: Url, + pub auction_contract: Address, +} + +impl Default for PodConfig { + fn default() -> Self { + Self { + endpoint: super::config::pod::POD_ENDPOINT.parse().unwrap(), + auction_contract: super::config::pod::POD_AUCTION_CONTRACT.parse().unwrap(), + } + } +} + +/// Client for querying pod network state in tests +pub struct PodTestClient { + client: AuctionClient, +} + +/// Information about a bid fetched from pod network +#[derive(Debug, Clone)] +pub struct PodBidInfo { + pub submission_address: Address, + pub score: pod_sdk::U256, + pub data_len: usize, +} + +impl PodTestClient { + /// Create a new pod test client with default configuration + pub async fn new() -> anyhow::Result { + Self::with_config(PodConfig::default()).await + } + + /// Create a new pod test client with custom configuration + pub async fn with_config(config: PodConfig) -> anyhow::Result { + // Create a dummy signer for read-only operations + // We only need this to satisfy the provider builder requirements + let dummy_signer = PrivateKeySigner::random(); + let wallet = EthereumWallet::from(dummy_signer); + + let provider = PodProviderBuilder::with_recommended_settings() + .wallet(wallet) + .on_url(config.endpoint) + .await?; + + let client = AuctionClient::new(provider, config.auction_contract); + Ok(Self { client }) + } + + /// Fetch all bids for a given auction ID. Should only be called after + /// the auction deadline has passed. + pub async fn fetch_bids(&self, auction_id: i64) -> anyhow::Result> { + let bids = self + .client + .fetch_bids(pod_sdk::U256::from(auction_id as u64)) + .await?; + Ok(bids + .into_iter() + .map(|bid| PodBidInfo { + submission_address: bid.bidder, + score: bid.amount, + data_len: bid.data.len(), + }) + .collect()) + } +} diff --git a/crates/e2e/src/setup/services.rs b/crates/e2e/src/setup/services.rs index 10210baf7b..caccf6a29b 100644 --- a/crates/e2e/src/setup/services.rs +++ b/crates/e2e/src/setup/services.rs @@ -236,6 +236,97 @@ impl<'a> Services<'a> { .await; } + /// Starts a basic version of the protocol with pod flow enabled. + /// Use this for pod_* prefixed tests with a single solver. + pub async fn start_protocol_with_pod(&self, solver: TestAccount) { + self.start_protocol_with_pod_solvers(vec![(solver, 0)]) // 0 = no haircut + .await; + } + + /// Starts the protocol with multiple solvers, all with pod enabled. + /// Each solver has a haircut_bps value to differentiate scores. + /// Use this for testing pod winner selection with competing solvers. + pub async fn start_protocol_with_pod_multi_solver( + &self, + solvers: Vec<(TestAccount, u32)>, // (solver_account, haircut_bps) + ) { + self.start_protocol_with_pod_solvers(solvers).await; + } + + /// Internal helper: starts protocol with pod-enabled driver for given + /// solvers. + async fn start_protocol_with_pod_solvers( + &self, + solvers: Vec<(TestAccount, u32)>, // (solver_account, haircut_bps) + ) { + use configs::autopilot::solver::Solver; + + let solver_engines: Vec = + futures::future::join_all(solvers.iter().enumerate().map( + |(i, (solver, haircut_bps))| { + let name = if i == 0 { + "test_solver".to_string() + } else { + format!("solver_{}", i + 1) + }; + colocation::start_baseline_solver_with_haircut( + name, + solver.clone(), + *self.contracts.weth.address(), + vec![], // no special base tokens needed + 2, + true, + *haircut_bps, + ) + }, + )) + .await; + + let driver_solvers: Vec = solver_engines + .iter() + .map(|e| Solver::test(&e.name, e.account.address())) + .collect(); + + colocation::start_driver_with_pod( + self.contracts, + solver_engines, + colocation::LiquidityProvider::UniswapV2, + false, + ); + + // Wait for driver to be ready before proceeding. + // The driver with pod config may take longer to start due to pod network + // connection. + Self::wait_for_driver_to_come_up().await; + + let test_quoter = ExternalSolver::new("test_quoter", "http://localhost:11088/test_solver"); + + let autopilot_config = Configuration { + drivers: driver_solvers, + order_quoting: OrderQuoting::test_with_drivers(vec![test_quoter.clone()]), + shared: SharedConfig { + gas_estimators: vec![GasEstimatorType::Driver { + url: Url::from_str("http://localhost:11088/gasprice").unwrap(), + }], + ..Default::default() + }, + ..Configuration::test_no_drivers() + }; + let orderbook_config = configs::orderbook::Configuration { + order_quoting: OrderQuoting::test_with_drivers(vec![test_quoter]), + shared: SharedConfig { + gas_estimators: vec![GasEstimatorType::Driver { + url: Url::from_str("http://localhost:11088/gasprice").unwrap(), + }], + ..Default::default() + }, + ..configs::orderbook::Configuration::test_default() + }; + + self.start_autopilot(None, autopilot_config).await; + self.start_api(orderbook_config).await; + } + pub async fn start_protocol_with_args( &self, autopilot_config: configs::autopilot::Configuration, @@ -422,6 +513,16 @@ impl<'a> Services<'a> { .expect("waiting for API timed out"); } + async fn wait_for_driver_to_come_up() { + const DRIVER_HOST: &str = "http://localhost:11088"; + let is_up = || async { reqwest::get(format!("{DRIVER_HOST}/healthz")).await.is_ok() }; + + tracing::info!("Waiting for driver to come up."); + wait_for_condition(TIMEOUT, is_up) + .await + .expect("waiting for driver timed out"); + } + async fn wait_until_autopilot_ready(&self) { let is_up = || async { let mut db = self.db.acquire().await.unwrap(); diff --git a/crates/e2e/tests/e2e/main.rs b/crates/e2e/tests/e2e/main.rs index 9711aee7e6..f4f4fbb9ba 100644 --- a/crates/e2e/tests/e2e/main.rs +++ b/crates/e2e/tests/e2e/main.rs @@ -33,6 +33,7 @@ mod partial_fill; mod partially_fillable_balance; mod partially_fillable_pool; mod place_order_with_quote; +mod pod; mod protocol_fee; mod quote_verification; mod quoting; diff --git a/crates/e2e/tests/e2e/pod.rs b/crates/e2e/tests/e2e/pod.rs new file mode 100644 index 0000000000..4c3f822b84 --- /dev/null +++ b/crates/e2e/tests/e2e/pod.rs @@ -0,0 +1,706 @@ +//! Pod-network end-to-end tests. +//! +//! Each test is `#[ignore]`d so `cargo nextest run` skips it by default. +//! Run them manually with `just test-pod`. They hit the live pod network +//! at `cow.pod.network:11600`, so they need outbound connectivity to that +//! host and should not run in CI without a dedicated environment. + +use { + bigdecimal::Zero, + e2e::setup::{pod::PodTestClient, wait_for_condition, *}, + ethrpc::alloy::CallBuilderExt, + model::{ + order::{OrderCreation, OrderKind}, + quote::{OrderQuoteRequest, OrderQuoteSide, SellAmount}, + signature::EcdsaSigningScheme, + }, + number::{nonzero::NonZeroU256, units::EthUnit}, + pod_sdk::alloy_primitives::U256, + shared::web3::Web3, +}; + +/// Basic pod test - single order, single solver. +/// Verifies the fundamental pod flow: bid submission, auction end, and winner +/// selection. +#[tokio::test] +#[ignore] +async fn pod_test_basic() { + run_pod_test(pod_basic_test).await; +} + +async fn pod_basic_test(web3: Web3) { + tracing::info!("Setting up chain state for basic pod test."); + let mut onchain = OnchainComponents::deploy(web3.clone()).await; + + let [solver] = onchain.make_solvers(10u64.eth()).await; + let [trader] = onchain.make_accounts(10u64.eth()).await; + let [token] = onchain + .deploy_tokens_with_weth_uni_v2_pools(1_000u64.eth(), 1_000u64.eth()) + .await; + + tracing::info!(?solver, "Created solver account"); + tracing::info!(?trader, "Created trader account"); + tracing::info!(token_address = ?token.address(), "Deployed test token with UniV2 pool"); + + // Approve and deposit WETH for trader + onchain + .contracts() + .weth + .approve(onchain.contracts().allowance, 3u64.eth()) + .from(trader.address()) + .send_and_watch() + .await + .unwrap(); + onchain + .contracts() + .weth + .deposit() + .from(trader.address()) + .value(3u64.eth()) + .send_and_watch() + .await + .unwrap(); + tracing::info!("Trader approved and deposited 3 ETH as WETH"); + + tracing::info!("Starting services with pod-enabled driver."); + let services = Services::new(&onchain).await; + services.start_protocol_with_pod(solver.clone()).await; + tracing::info!("Services started - driver has pod config enabled"); + + tracing::info!("Submitting quote request"); + let quote_sell_amount = 1u64.eth(); + let quote_request = OrderQuoteRequest { + from: trader.address(), + sell_token: *onchain.contracts().weth.address(), + buy_token: *token.address(), + side: OrderQuoteSide::Sell { + sell_amount: SellAmount::BeforeFee { + value: NonZeroU256::try_from(quote_sell_amount).unwrap(), + }, + }, + ..Default::default() + }; + let quote_response = services.submit_quote("e_request).await.unwrap(); + tracing::info!( + quote_id = ?quote_response.id, + buy_amount = ?quote_response.quote.buy_amount, + "Got quote response" + ); + + tracing::info!("Placing order"); + let order = OrderCreation { + quote_id: quote_response.id, + sell_token: *onchain.contracts().weth.address(), + sell_amount: quote_sell_amount, + buy_token: *token.address(), + buy_amount: quote_response.quote.buy_amount, + valid_to: model::time::now_in_epoch_seconds() + 300, + kind: OrderKind::Sell, + ..Default::default() + } + .sign( + EcdsaSigningScheme::Eip712, + &onchain.contracts().domain_separator, + &trader.signer, + ); + let order_uid = services.create_order(&order).await.unwrap(); + tracing::info!(?order_uid, "Order created successfully"); + + // Wait for order to appear in auction + tracing::info!("Waiting for order to appear in auction..."); + wait_for_condition(TIMEOUT, || async { + onchain.mint_block().await; + let auction = services.get_auction().await; + let has_order = !auction.auction.orders.is_empty(); + if has_order { + tracing::info!( + auction_id = auction.id, + num_orders = auction.auction.orders.len(), + "Order appeared in auction" + ); + } + has_order + }) + .await + .expect("Order should appear in auction"); + + // Now wait for trade to happen - this triggers the full auction flow including + // pod + tracing::info!("Waiting for trade execution (pod flow should trigger)..."); + wait_for_condition(TIMEOUT, || async { + onchain.mint_block().await; + let order = services.get_order(&order_uid).await.unwrap(); + let executed = !order.metadata.executed_buy_amount.is_zero(); + if executed { + tracing::info!( + executed_buy_amount = ?order.metadata.executed_buy_amount, + executed_sell_amount = ?order.metadata.executed_sell_amount, + "Trade executed!" + ); + } + executed + }) + .await + .expect("Trade should execute"); + + // Verify solver competition data from autopilot + let competition = services + .get_latest_solver_competition() + .await + .expect("Should have solver competition data"); + + // Verify auction had our order + assert_eq!( + competition.auction.orders.len(), + 1, + "Auction should have exactly 1 order" + ); + + // Verify a winner was selected by autopilot + let winners: Vec<_> = competition + .solutions + .iter() + .filter(|s| s.is_winner) + .collect(); + assert_eq!(winners.len(), 1, "Should have exactly 1 winner"); + + let autopilot_winner = winners[0]; + assert_eq!( + autopilot_winner.solver_address, + solver.address(), + "Autopilot winner should be our solver" + ); + assert!( + !autopilot_winner.score.is_zero(), + "Winner should have non-zero score" + ); + + // === POD NETWORK VERIFICATION === + tracing::info!( + auction_id = competition.auction_id, + "Querying pod network for bids..." + ); + + let pod_client = PodTestClient::new() + .await + .expect("Should be able to connect to pod network"); + + let pod_bids = pod_client + .fetch_bids(competition.auction_id) + .await + .expect("Should be able to fetch bids from pod network"); + + tracing::info!( + auction_id = competition.auction_id, + num_pod_bids = pod_bids.len(), + solver = %solver.address(), + "Fetched bids from pod network" + ); + + // Verify driver submitted a bid to pod network + assert!( + !pod_bids.is_empty(), + "Driver should have submitted at least 1 bid to pod network" + ); + + let solver_bid = pod_bids + .iter() + .find(|b| b.submission_address == solver.address()) + .unwrap_or_else(|| { + panic!( + "solver {} should have a bid in pod network", + solver.address() + ) + }); + + // Verify the autopilot winner matches the pod bid submitter + assert_eq!( + autopilot_winner.solver_address, solver_bid.submission_address, + "Autopilot winner should match pod bid submitter" + ); +} + +/// Multi-order pod test - tests that multiple orders in a single auction +/// are properly handled by the pod flow and winner selection logic. +#[tokio::test] +#[ignore] +async fn pod_test_multi_order() { + run_pod_test(pod_multi_order_test).await; +} + +async fn pod_multi_order_test(web3: Web3) { + tracing::info!("Setting up chain state for pod multi-order test."); + let mut onchain = OnchainComponents::deploy(web3.clone()).await; + + let [solver] = onchain.make_solvers(10u64.eth()).await; + let [trader_a, trader_b] = onchain.make_accounts(10u64.eth()).await; + // Deploy two tokens with separate pools for different trading pairs + let [token_a, token_b] = onchain + .deploy_tokens_with_weth_uni_v2_pools(1_000u64.eth(), 1_000u64.eth()) + .await; + + tracing::info!(?solver, "Created solver account"); + tracing::info!(?trader_a, "Created trader A account"); + tracing::info!(?trader_b, "Created trader B account"); + tracing::info!(token_a = ?token_a.address(), token_b = ?token_b.address(), "Deployed test tokens"); + + // Setup trader A: approve and deposit WETH + onchain + .contracts() + .weth + .approve(onchain.contracts().allowance, 5u64.eth()) + .from(trader_a.address()) + .send_and_watch() + .await + .unwrap(); + onchain + .contracts() + .weth + .deposit() + .from(trader_a.address()) + .value(5u64.eth()) + .send_and_watch() + .await + .unwrap(); + tracing::info!("Trader A approved and deposited 5 ETH as WETH"); + + // Setup trader B: approve and deposit WETH + onchain + .contracts() + .weth + .approve(onchain.contracts().allowance, 5u64.eth()) + .from(trader_b.address()) + .send_and_watch() + .await + .unwrap(); + onchain + .contracts() + .weth + .deposit() + .from(trader_b.address()) + .value(5u64.eth()) + .send_and_watch() + .await + .unwrap(); + tracing::info!("Trader B approved and deposited 5 ETH as WETH"); + + tracing::info!("Starting services with pod-enabled driver."); + let services = Services::new(&onchain).await; + services.start_protocol_with_pod(solver.clone()).await; + tracing::info!("Services started - driver has pod config enabled"); + + // Get quotes and create orders for both traders + let sell_amount_a = 1u64.eth(); + let quote_a = OrderQuoteRequest { + from: trader_a.address(), + sell_token: *onchain.contracts().weth.address(), + buy_token: *token_a.address(), + side: OrderQuoteSide::Sell { + sell_amount: SellAmount::BeforeFee { + value: NonZeroU256::try_from(sell_amount_a).unwrap(), + }, + }, + ..Default::default() + }; + let quote_response_a = services.submit_quote("e_a).await.unwrap(); + tracing::info!( + quote_id = ?quote_response_a.id, + buy_amount = ?quote_response_a.quote.buy_amount, + "Got quote for trader A (WETH -> token_a)" + ); + + let sell_amount_b = 2u64.eth(); + let quote_b = OrderQuoteRequest { + from: trader_b.address(), + sell_token: *onchain.contracts().weth.address(), + buy_token: *token_b.address(), + side: OrderQuoteSide::Sell { + sell_amount: SellAmount::BeforeFee { + value: NonZeroU256::try_from(sell_amount_b).unwrap(), + }, + }, + ..Default::default() + }; + let quote_response_b = services.submit_quote("e_b).await.unwrap(); + tracing::info!( + quote_id = ?quote_response_b.id, + buy_amount = ?quote_response_b.quote.buy_amount, + "Got quote for trader B (WETH -> token_b)" + ); + + // Place order A + tracing::info!("Placing order A"); + let order_a = OrderCreation { + quote_id: quote_response_a.id, + sell_token: *onchain.contracts().weth.address(), + sell_amount: sell_amount_a, + buy_token: *token_a.address(), + buy_amount: quote_response_a.quote.buy_amount, + valid_to: model::time::now_in_epoch_seconds() + 300, + kind: OrderKind::Sell, + ..Default::default() + } + .sign( + EcdsaSigningScheme::Eip712, + &onchain.contracts().domain_separator, + &trader_a.signer, + ); + let order_uid_a = services.create_order(&order_a).await.unwrap(); + tracing::info!(?order_uid_a, "Order A created"); + + // Place order B + tracing::info!("Placing order B"); + let order_b = OrderCreation { + quote_id: quote_response_b.id, + sell_token: *onchain.contracts().weth.address(), + sell_amount: sell_amount_b, + buy_token: *token_b.address(), + buy_amount: quote_response_b.quote.buy_amount, + valid_to: model::time::now_in_epoch_seconds() + 300, + kind: OrderKind::Sell, + ..Default::default() + } + .sign( + EcdsaSigningScheme::Eip712, + &onchain.contracts().domain_separator, + &trader_b.signer, + ); + let order_uid_b = services.create_order(&order_b).await.unwrap(); + tracing::info!(?order_uid_b, "Order B created"); + + // Wait for both orders to appear in auction + tracing::info!("Waiting for both orders to appear in auction..."); + wait_for_condition(TIMEOUT, || async { + onchain.mint_block().await; + let auction = services.get_auction().await; + let num_orders = auction.auction.orders.len(); + if num_orders == 2 { + tracing::info!( + auction_id = auction.id, + num_orders, + "Both orders appeared in auction" + ); + } + num_orders == 2 + }) + .await + .expect("Both orders should appear in auction"); + + // Wait for both trades to execute - this triggers pod flow with multiple orders + tracing::info!("Waiting for both trades to execute (pod multi-order flow)..."); + wait_for_condition(TIMEOUT, || async { + onchain.mint_block().await; + let order_a_status = services.get_order(&order_uid_a).await.unwrap(); + let order_b_status = services.get_order(&order_uid_b).await.unwrap(); + + let a_executed = !order_a_status.metadata.executed_buy_amount.is_zero(); + let b_executed = !order_b_status.metadata.executed_buy_amount.is_zero(); + + if a_executed { + tracing::info!( + order = "A", + executed_buy = ?order_a_status.metadata.executed_buy_amount, + executed_sell = ?order_a_status.metadata.executed_sell_amount, + "Order A executed" + ); + } + if b_executed { + tracing::info!( + order = "B", + executed_buy = ?order_b_status.metadata.executed_buy_amount, + executed_sell = ?order_b_status.metadata.executed_sell_amount, + "Order B executed" + ); + } + + a_executed && b_executed + }) + .await + .expect("Both trades should execute"); + + // Verify solver competition data from autopilot + let competition = services + .get_latest_solver_competition() + .await + .expect("Should have solver competition data"); + + // Verify auction had both orders + assert_eq!( + competition.auction.orders.len(), + 2, + "Auction should have exactly 2 orders" + ); + + // Verify a winner was selected by autopilot + let winners: Vec<_> = competition + .solutions + .iter() + .filter(|s| s.is_winner) + .collect(); + assert_eq!(winners.len(), 1, "Should have exactly 1 winner"); + + let autopilot_winner = winners[0]; + assert_eq!( + autopilot_winner.solver_address, + solver.address(), + "Autopilot winner should be our solver" + ); + + // Verify the winning solution contains both orders + assert_eq!( + autopilot_winner.orders.len(), + 2, + "Winning solution should contain both orders" + ); + + // === POD NETWORK VERIFICATION === + tracing::info!( + auction_id = competition.auction_id, + "Querying pod network for multi-order auction..." + ); + + let pod_client = PodTestClient::new() + .await + .expect("Should be able to connect to pod network"); + + let pod_bids = pod_client + .fetch_bids(competition.auction_id) + .await + .expect("Should be able to fetch bids from pod network"); + + assert!( + !pod_bids.is_empty(), + "Driver should have submitted bid to pod network" + ); + + pod_bids + .iter() + .find(|b| b.submission_address == solver.address()) + .expect("our solver should have bid in pod network"); +} + +/// Multi-solver pod test - tests that multiple solvers competing in the same +/// auction have their bids properly submitted to pod and winner selection +/// works. Each solver knows about different liquidity routes, leading to +/// different scores. +#[tokio::test] +#[ignore] +async fn pod_test_multi_solver() { + run_pod_test(pod_multi_solver_test).await; +} + +async fn pod_multi_solver_test(web3: Web3) { + tracing::info!("Setting up chain state for pod multi-solver test."); + let mut onchain = OnchainComponents::deploy(web3.clone()).await; + + let [solver_a, solver_b] = onchain.make_solvers(10u64.eth()).await; + let [trader] = onchain.make_accounts(10u64.eth()).await; + + // Deploy token with WETH pool + let [token] = onchain + .deploy_tokens_with_weth_uni_v2_pools(1_000u64.eth(), 1_000u64.eth()) + .await; + + tracing::info!(?solver_a, "Created solver A account (no haircut)"); + tracing::info!( + ?solver_b, + "Created solver B account (with haircut -> lower score)" + ); + tracing::info!(?trader, "Created trader account"); + tracing::info!( + token = ?token.address(), + "Deployed token with WETH pool - solvers differentiated by haircut" + ); + + // Setup trader: approve and deposit WETH + onchain + .contracts() + .weth + .approve(onchain.contracts().allowance, 5u64.eth()) + .from(trader.address()) + .send_and_watch() + .await + .unwrap(); + onchain + .contracts() + .weth + .deposit() + .from(trader.address()) + .value(5u64.eth()) + .send_and_watch() + .await + .unwrap(); + tracing::info!("Trader approved and deposited 5 ETH as WETH"); + + tracing::info!("Starting services with pod-enabled multi-solver driver."); + let services = Services::new(&onchain).await; + + // Solver A: no haircut, Solver B: 1% haircut (100 bps) for score + // differentiation + services + .start_protocol_with_pod_multi_solver(vec![(solver_a.clone(), 0), (solver_b.clone(), 100)]) + .await; + tracing::info!("Services started - solver_a vs solver_b"); + + // Get quote and create order + let sell_amount = 1u64.eth(); + let quote = OrderQuoteRequest { + from: trader.address(), + sell_token: *onchain.contracts().weth.address(), + buy_token: *token.address(), + side: OrderQuoteSide::Sell { + sell_amount: SellAmount::BeforeFee { + value: NonZeroU256::try_from(sell_amount).unwrap(), + }, + }, + ..Default::default() + }; + let quote_response = services.submit_quote("e).await.unwrap(); + tracing::info!( + quote_id = ?quote_response.id, + buy_amount = ?quote_response.quote.buy_amount, + "Got quote" + ); + + // Place order with 5% slippage tolerance to accommodate haircut differences + let min_buy = quote_response.quote.buy_amount * U256::from(95) / U256::from(100); + tracing::info!("Placing order"); + let order = OrderCreation { + quote_id: quote_response.id, + sell_token: *onchain.contracts().weth.address(), + sell_amount, + buy_token: *token.address(), + buy_amount: min_buy, + valid_to: model::time::now_in_epoch_seconds() + 300, + kind: OrderKind::Sell, + ..Default::default() + } + .sign( + EcdsaSigningScheme::Eip712, + &onchain.contracts().domain_separator, + &trader.signer, + ); + let order_uid = services.create_order(&order).await.unwrap(); + tracing::info!(?order_uid, "Order created"); + + // Wait for order to appear in auction + tracing::info!("Waiting for order to appear in auction..."); + wait_for_condition(TIMEOUT, || async { + onchain.mint_block().await; + let auction = services.get_auction().await; + let has_order = !auction.auction.orders.is_empty(); + if has_order { + tracing::info!( + auction_id = auction.id, + num_orders = auction.auction.orders.len(), + "Order appeared in auction" + ); + } + has_order + }) + .await + .expect("Order should appear in auction"); + + // Wait for trade to execute - this triggers pod flow with multiple solvers + // competing + tracing::info!("Waiting for trade execution (pod multi-solver competition)..."); + wait_for_condition(TIMEOUT, || async { + onchain.mint_block().await; + let order_status = services.get_order(&order_uid).await.unwrap(); + let executed = !order_status.metadata.executed_buy_amount.is_zero(); + if executed { + tracing::info!( + executed_buy = ?order_status.metadata.executed_buy_amount, + executed_sell = ?order_status.metadata.executed_sell_amount, + "Trade executed" + ); + } + executed + }) + .await + .expect("Trade should execute"); + + // Verify solver competition data from autopilot + let competition = services + .get_latest_solver_competition() + .await + .expect("Should have solver competition data"); + + // Verify auction had our order + assert_eq!( + competition.auction.orders.len(), + 1, + "Auction should have exactly 1 order" + ); + + // Verify we have multiple solutions from different solvers + assert!( + competition.solutions.len() >= 2, + "Should have at least 2 solutions from different solvers" + ); + + // Verify exactly one winner was selected by autopilot + let winners: Vec<_> = competition + .solutions + .iter() + .filter(|s| s.is_winner) + .collect(); + assert_eq!(winners.len(), 1, "Should have exactly 1 winner"); + + let autopilot_winner = winners[0]; + assert!( + !autopilot_winner.score.is_zero(), + "Winner should have non-zero score" + ); + + // The winner should be one of our solvers + let valid_solvers = [solver_a.address(), solver_b.address()]; + assert!( + valid_solvers.contains(&autopilot_winner.solver_address), + "Winner should be one of our solvers" + ); + + // === POD NETWORK VERIFICATION === + tracing::info!( + auction_id = competition.auction_id, + "Querying pod network for multi-solver auction..." + ); + + let pod_client = PodTestClient::new() + .await + .expect("Should be able to connect to pod network"); + + let pod_bids = pod_client + .fetch_bids(competition.auction_id) + .await + .expect("Should be able to fetch bids from pod network"); + + // Verify both solvers submitted bids to pod network + assert!( + pod_bids.len() >= 2, + "Both solvers should have submitted bids to pod network, got {}", + pod_bids.len() + ); + + assert!( + pod_bids + .iter() + .any(|b| b.submission_address == solver_a.address()), + "solver A should have submitted bid to pod network", + ); + assert!( + pod_bids + .iter() + .any(|b| b.submission_address == solver_b.address()), + "solver B should have submitted bid to pod network", + ); + + let pod_winner = pod_bids + .iter() + .max_by_key(|b| b.score) + .expect("pod_bids is non-empty per assertion above"); + assert_eq!( + autopilot_winner.solver_address, pod_winner.submission_address, + "autopilot winner should match pod network winner" + ); +} diff --git a/crates/winner-selection/Cargo.toml b/crates/winner-selection/Cargo.toml index d8e3f9f559..08b161a12e 100644 --- a/crates/winner-selection/Cargo.toml +++ b/crates/winner-selection/Cargo.toml @@ -10,3 +10,6 @@ itertools = { workspace = true } number = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } + +[lints] +workspace = true diff --git a/crates/winner-selection/src/arbitrator.rs b/crates/winner-selection/src/arbitrator.rs index 3a41651732..8fc5cd0a27 100644 --- a/crates/winner-selection/src/arbitrator.rs +++ b/crates/winner-selection/src/arbitrator.rs @@ -23,6 +23,7 @@ use { }; /// Auction arbitrator responsible for selecting winning solutions. +#[derive(Clone)] pub struct Arbitrator { /// Maximum number of winning solutions to select. pub max_winners: usize, @@ -599,6 +600,73 @@ impl Arbitrator { winners } + /// Pairs each item with its winsel solution, sorts deterministically by + /// `canonical_hash`, runs arbitration, and returns the ranking together + /// with a `SolutionKey -> T` map so callers can rejoin their domain bids + /// with the ranked output. + /// + /// Sorting before arbitration is what lets independent observers + /// (autopilot, driver, third-party verifiers) reach the same + /// tie-breaking decision on the same logical solution set. + pub fn arbitrate_paired( + &self, + items: Vec<(T, Solution)>, + context: &AuctionContext, + ) -> (Ranking, HashMap) { + let mut paired = items; + paired.sort_by_cached_key(|(_, solution)| solution.canonical_hash()); + let mut by_key = HashMap::with_capacity(paired.len()); + let mut solutions = Vec::with_capacity(paired.len()); + for (item, solution) in paired { + by_key.insert(SolutionKey::from(&solution), item); + solutions.push(solution); + } + (self.arbitrate(solutions, context), by_key) + } + + /// Runs arbitration and rejoins each ranked and filtered-out solution + /// to its original input `T`. Also computes reference scores. Output + /// solutions that cannot be matched back to an input are counted in + /// `orphans` rather than panicking. + /// + /// A non-zero `orphans` count means two inputs shared the same + /// `SolutionKey`. Callers should log and alert on this, not treat it + /// as fatal. + pub fn arbitrate_paired_and_rejoin( + &self, + items: Vec<(T, Solution)>, + context: &AuctionContext, + ) -> Rejoined { + let (ranking, mut by_key) = self.arbitrate_paired(items, context); + let reference_scores = self.compute_reference_scores(&ranking); + + let mut orphans = 0; + let mut rejoin = |s: Solution| -> Option<(T, Solution)> { + let key = SolutionKey::from(&s); + match by_key.remove(&key) { + Some(t) => Some((t, s)), + None => { + orphans += 1; + None + } + } + }; + + let filtered_out = ranking + .filtered_out + .into_iter() + .filter_map(&mut rejoin) + .collect(); + let ranked = ranking.ranked.into_iter().filter_map(&mut rejoin).collect(); + + Rejoined { + filtered_out, + ranked, + reference_scores, + orphans, + } + } + /// Compute reference scores for winning solvers. #[instrument(skip_all)] pub fn compute_reference_scores(&self, ranking: &Ranking) -> HashMap { @@ -713,11 +781,40 @@ struct PriceLimits { buy: U256, } -/// Key to uniquely identify every solution. -#[derive(Clone, Copy, PartialEq, Eq, Hash)] -struct SolutionKey { - solver: Address, - solution_id: u64, +/// Result of [`Arbitrator::arbitrate_paired_and_rejoin`]. +#[derive(Debug)] +pub struct Rejoined { + /// Solutions filtered out as unfair, paired with their original input. + pub filtered_out: Vec<(T, Solution)>, + /// Ranked solutions (winners + non-winners), paired with their original + /// input, ordered as produced by [`Arbitrator::arbitrate`]. + pub ranked: Vec<(T, Solution)>, + /// Reference scores per winning solver. + pub reference_scores: HashMap, + /// Number of arbitrator-returned solutions whose `SolutionKey` could + /// not be matched back to an input item. Non-zero indicates a + /// `SolutionKey` collision in the input set. + pub orphans: usize, +} + +/// Key to uniquely identify every solution within an auction. +/// +/// Two solutions submitted by the same solver share an `auction-scoped` id +/// (`solution_id`); two solutions from different solvers may collide on +/// `solution_id` but never on `(solver, solution_id)`. +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub struct SolutionKey { + pub solver: Address, + pub solution_id: u64, +} + +impl From<&Solution> for SolutionKey { + fn from(solution: &Solution) -> Self { + Self { + solver: solution.solver(), + solution_id: solution.id(), + } + } } /// Scores of all trades in a solution aggregated by the directional diff --git a/crates/winner-selection/src/bid.rs b/crates/winner-selection/src/bid.rs new file mode 100644 index 0000000000..65f70eaa2d --- /dev/null +++ b/crates/winner-selection/src/bid.rs @@ -0,0 +1,71 @@ +//! Generic typestate bid wrapper. +//! +//! `Bid` pairs a domain payload `P` with a winner-selection state +//! marker. `P` is exposed via `Deref` so methods on the payload +//! are callable through the bid. Used together with +//! [`crate::Arbitrator::arbitrate_paired_and_rejoin`] to share the typestate +//! and rejoin glue across domains. + +use crate::state::{HasState, Unscored}; + +pub struct Bid { + payload: P, + state: State, +} + +impl Clone for Bid { + fn clone(&self) -> Self { + Self { + payload: self.payload.clone(), + state: self.state.clone(), + } + } +} + +impl std::fmt::Debug for Bid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Bid") + .field("payload", &self.payload) + .field("state", &self.state) + .finish() + } +} + +impl Bid { + pub fn payload(&self) -> &P { + &self.payload + } +} + +impl

Bid { + pub fn new(payload: P) -> Self { + Self { + payload, + state: Unscored, + } + } +} + +impl std::ops::Deref for Bid { + type Target = P; + + fn deref(&self) -> &P { + &self.payload + } +} + +impl HasState for Bid { + type Next = Bid; + type State = State; + + fn with_state(self, state: NewState) -> Self::Next { + Bid { + payload: self.payload, + state, + } + } + + fn state(&self) -> &Self::State { + &self.state + } +} diff --git a/crates/winner-selection/src/lib.rs b/crates/winner-selection/src/lib.rs index df1c34588f..df221d24b3 100644 --- a/crates/winner-selection/src/lib.rs +++ b/crates/winner-selection/src/lib.rs @@ -7,14 +7,16 @@ pub mod arbitrator; pub mod auction; +pub mod bid; pub mod primitives; pub mod solution; pub mod state; // Re-export key types for convenience pub use { - arbitrator::{Arbitrator, Ranking}, + arbitrator::{Arbitrator, Ranking, Rejoined, SolutionKey}, auction::AuctionContext, + bid::Bid, primitives::{Address, DirectedTokenPair, OrderUid, Side, U256}, solution::{Order, RankType, Ranked, Scored, Solution, Unscored}, }; diff --git a/crates/winner-selection/src/solution.rs b/crates/winner-selection/src/solution.rs index f3f816fa1b..01caa1c665 100644 --- a/crates/winner-selection/src/solution.rs +++ b/crates/winner-selection/src/solution.rs @@ -1,7 +1,9 @@ -//! Minimal solution and order data structures. +//! Solution and order types for winner selection. //! -//! These structs contain only the data needed for winner selection, -//! making them small enough to efficiently send to/from the Pod Service. +//! `Solution` carries both the inputs read by the arbitrator and the +//! data covered by `canonical_hash`, so independent observers (autopilot, +//! driver, third-party verifiers) tie-break identically on the same +//! solution. pub use state::{RankType, Unscored}; use { @@ -9,7 +11,8 @@ use { primitives::{OrderUid, Side}, state, }, - alloy_primitives::{Address, U256}, + alloy_primitives::{Address, U256, keccak256}, + std::collections::HashMap, }; pub type Scored = state::Scored; pub type Ranked = state::Ranked; @@ -29,6 +32,10 @@ pub struct Solution { /// Orders executed in this solution. orders: Vec, + /// Clearing prices keyed by token address. Not used by arbitration; + /// included so `canonical_hash` fingerprints them. + prices: HashMap, + /// State marker (score and ranking information). state: State, } @@ -48,6 +55,51 @@ impl Solution { pub fn orders(&self) -> &[Order] { &self.orders } + + /// Get the clearing prices. + pub fn prices(&self) -> &HashMap { + &self.prices + } + + /// Deterministic 32-byte fingerprint of the solution payload. + /// + /// The hash is independent of the order in which `orders` and `prices` + /// were inserted: both are sorted by their key (order uid / token + /// address) before encoding. This is what lets the autopilot, the + /// driver, and any third-party verifier reach the same tie-breaking + /// decision when running the arbitrator over the same logical bids. + pub fn canonical_hash(&self) -> [u8; 32] { + let mut buf = Vec::new(); + buf.extend_from_slice(&self.id.to_be_bytes()); + buf.extend_from_slice(self.solver.as_slice()); + + let mut orders: Vec<&Order> = self.orders.iter().collect(); + orders.sort_by_key(|o| o.uid.0); + buf.extend_from_slice(&(orders.len() as u64).to_be_bytes()); + for order in orders { + buf.extend_from_slice(&order.uid.0); + buf.push(match order.side { + Side::Buy => 0, + Side::Sell => 1, + }); + buf.extend_from_slice(order.sell_token.as_slice()); + buf.extend_from_slice(&order.sell_amount.to_be_bytes::<32>()); + buf.extend_from_slice(order.buy_token.as_slice()); + buf.extend_from_slice(&order.buy_amount.to_be_bytes::<32>()); + buf.extend_from_slice(&order.executed_sell.to_be_bytes::<32>()); + buf.extend_from_slice(&order.executed_buy.to_be_bytes::<32>()); + } + + let mut prices: Vec<(&Address, &U256)> = self.prices.iter().collect(); + prices.sort_by_key(|(token, _)| **token); + buf.extend_from_slice(&(prices.len() as u64).to_be_bytes()); + for (token, price) in prices { + buf.extend_from_slice(token.as_slice()); + buf.extend_from_slice(&price.to_be_bytes::<32>()); + } + + keccak256(&buf).0 + } } impl state::HasState for Solution { @@ -59,6 +111,7 @@ impl state::HasState for Solution { id: self.id, solver: self.solver, orders: self.orders, + prices: self.prices, state, } } @@ -70,11 +123,17 @@ impl state::HasState for Solution { impl Solution { /// Create a new unscored solution. - pub fn new(id: u64, solver: Address, orders: Vec) -> Self { + pub fn new( + id: u64, + solver: Address, + orders: Vec, + prices: HashMap, + ) -> Self { Self { id, solver, orders, + prices, state: Unscored, } } @@ -121,3 +180,154 @@ pub struct Order { /// Determines how surplus is calculated. pub side: Side, } + +#[cfg(test)] +mod tests { + use super::*; + + fn order_uid(id: u8) -> OrderUid { + let mut uid = [0u8; 56]; + uid[0] = id; + OrderUid(uid) + } + + fn token(id: u8) -> Address { + let mut addr = [0u8; 20]; + addr[0] = id; + Address::from(addr) + } + + fn solver(id: u8) -> Address { + let mut addr = [0u8; 20]; + addr[19] = id; + Address::from(addr) + } + + fn order(sell_token: Address, buy_token: Address) -> Order { + Order { + uid: order_uid(1), + sell_token, + buy_token, + sell_amount: U256::from(1000u64), + buy_amount: U256::from(900u64), + executed_sell: U256::from(1000u64), + executed_buy: U256::from(950u64), + side: Side::Sell, + } + } + + #[test] + fn determinism_same_solution_same_hash() { + let s = solver(1); + let sell = token(0xAA); + let buy = token(0xBB); + let make = || { + Solution::new( + 42, + s, + vec![order(sell, buy)], + HashMap::from([ + (sell, U256::from(1_000_000u64)), + (buy, U256::from(2_000_000u64)), + ]), + ) + }; + assert_eq!(make().canonical_hash(), make().canonical_hash()); + } + + #[test] + fn order_independence_orders() { + let s = solver(1); + let sell_a = token(0xAA); + let buy_a = token(0xBB); + let sell_b = token(0xCC); + let buy_b = token(0xDD); + let mut a = order(sell_a, buy_a); + a.uid = order_uid(1); + let mut b = order(sell_b, buy_b); + b.uid = order_uid(2); + + let prices = HashMap::from([(sell_a, U256::from(100u64)), (buy_a, U256::from(200u64))]); + + let ab = Solution::new(1, s, vec![a.clone(), b.clone()], prices.clone()); + let ba = Solution::new(1, s, vec![b, a], prices); + assert_eq!(ab.canonical_hash(), ba.canonical_hash()); + } + + #[test] + fn order_independence_prices() { + let s = solver(1); + let sell = token(0xAA); + let buy = token(0xBB); + let third = token(0xCC); + + let prices_a = HashMap::from([ + (sell, U256::from(100u64)), + (buy, U256::from(200u64)), + (third, U256::from(300u64)), + ]); + let prices_b = HashMap::from([ + (third, U256::from(300u64)), + (buy, U256::from(200u64)), + (sell, U256::from(100u64)), + ]); + + let a = Solution::new(1, s, vec![order(sell, buy)], prices_a); + let b = Solution::new(1, s, vec![order(sell, buy)], prices_b); + assert_eq!(a.canonical_hash(), b.canonical_hash()); + } + + #[test] + fn uniqueness_different_solution_id() { + let s = solver(1); + let make = |id| Solution::new(id, s, vec![], HashMap::new()); + assert_ne!(make(1).canonical_hash(), make(2).canonical_hash()); + } + + #[test] + fn uniqueness_different_solver() { + let a = Solution::new(1, solver(1), vec![], HashMap::new()); + let b = Solution::new(1, solver(2), vec![], HashMap::new()); + assert_ne!(a.canonical_hash(), b.canonical_hash()); + } + + #[test] + fn uniqueness_different_executed_amounts() { + let s = solver(1); + let sell = token(0xAA); + let buy = token(0xBB); + + let mut a = order(sell, buy); + a.executed_buy = U256::from(950u64); + let mut b = order(sell, buy); + b.executed_buy = U256::from(960u64); + + let prices = HashMap::from([(sell, U256::from(100u64))]); + + let sa = Solution::new(1, s, vec![a], prices.clone()); + let sb = Solution::new(1, s, vec![b], prices); + assert_ne!(sa.canonical_hash(), sb.canonical_hash()); + } + + #[test] + fn uniqueness_different_prices() { + let s = solver(1); + let sell = token(0xAA); + let buy = token(0xBB); + let make = |price: u64| { + Solution::new( + 1, + s, + vec![order(sell, buy)], + HashMap::from([(sell, U256::from(price))]), + ) + }; + assert_ne!(make(100).canonical_hash(), make(200).canonical_hash()); + } + + #[test] + fn empty_orders_and_prices_do_not_panic() { + let h = Solution::new(1, solver(1), vec![], HashMap::new()).canonical_hash(); + assert_ne!(h, [0u8; 32]); + } +}