diff --git a/Cargo.lock b/Cargo.lock index 01cdca2..2e9f76f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1443,17 +1443,6 @@ dependencies = [ "serde", ] -[[package]] -name = "async-lock" -version = "3.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" -dependencies = [ - "event-listener", - "event-listener-strategy", - "pin-project-lite", -] - [[package]] name = "async-recursion" version = "1.1.1" @@ -3088,16 +3077,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "event-listener-strategy" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" -dependencies = [ - "event-listener", - "pin-project-lite", -] - [[package]] name = "eyre" version = "0.6.12" @@ -5334,26 +5313,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "moka" -version = "0.12.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" -dependencies = [ - "async-lock", - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "equivalent", - "event-listener", - "futures-util", - "parking_lot", - "portable-atomic", - "smallvec", - "tagptr", - "uuid 1.17.0", -] - [[package]] name = "multer" version = "3.1.0" @@ -8962,7 +8921,6 @@ dependencies = [ "base64 0.22.1", "clap", "futures", - "moka", "rain-math-float", "rain_orderbook_app_settings", "rain_orderbook_bindings", @@ -8972,7 +8930,6 @@ dependencies = [ "reqwest 0.13.2", "rocket", "rocket_cors", - "rusqlite", "serde", "serde_json", "sqlx", @@ -9325,12 +9282,6 @@ dependencies = [ "libc", ] -[[package]] -name = "tagptr" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" - [[package]] name = "tap" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 7b50c84..267ac21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,8 +32,6 @@ rain_orderbook_app_settings = { path = "lib/rain.orderbook/crates/settings", def rain_orderbook_bindings = { path = "lib/rain.orderbook/crates/bindings", default-features = false } rain-math-float = { path = "lib/rain.orderbook/lib/rain.interpreter/lib/rain.interpreter.interface/lib/rain.math.float/crates/float" } wasm-bindgen = "=0.2.100" -moka = { version = "0.12", features = ["future"] } -rusqlite = { version = "0.32" } [dev-dependencies] tracing-test = "0.2" diff --git a/keys.nix b/keys.nix index 0f8821d..538238b 100644 --- a/keys.nix +++ b/keys.nix @@ -3,18 +3,16 @@ rec { st0x-op = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIPZ56nOYbGDd0ZfbqxeY7AbvaQGQrHnlC80ccpRGpCoj"; host = - "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIN1JMILASAjU2qxDdKpdwprx+GllpRWDneNk7dazY3uY"; + "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIK9JhlVsHGlSS3c+RGKFSwXyuFpvUTbnOny9e2AdBQ6G"; ci = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIPTd2zKSwHgWegi290EiK5nYp1Wp4+x2fDYqFxbd0WLN"; arda = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIAyTREGZCOzMsl7N9dp1saN/t7DCs7YesusVUKApMJ78"; sid = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIPl3/6RlR6Rvz0ZRyZukzFtt4zUYNz5OVuTsajJl7V3n"; - alastair = - "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJArH3PA+bFIon0JkCVQGs9aWr45lnVjiiTLLO9BPItn"; }; roles = with keys; { - infra = [ st0x-op ci sid alastair ]; - ssh = [ st0x-op ci arda sid alastair ]; + infra = [ st0x-op ci sid ]; + ssh = [ st0x-op ci arda sid ]; }; } diff --git a/os.nix b/os.nix index 0f41a3e..76d0686 100644 --- a/os.nix +++ b/os.nix @@ -99,42 +99,11 @@ in { enable = true; recommendedTlsSettings = true; recommendedProxySettings = true; - recommendedOptimisation = true; - recommendedGzipSettings = true; - - # Rate-limit zone: 10 req/s per IP, burst 20 - appendHttpConfig = '' - limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s; - ''; - virtualHosts."api.st0x.io" = { enableACME = true; forceSSL = true; - - extraConfig = '' - # Security headers - add_header X-Content-Type-Options "nosniff" always; - add_header X-Frame-Options "DENY" always; - add_header Referrer-Policy "strict-origin-when-cross-origin" always; - - # Limit request body size (API payloads are small) - client_max_body_size 1m; - ''; - - # Block common exploit scanners (PHP, Docker, ThinkPHP, etc.) - locations."~ \\.(php|asp|aspx|jsp|cgi)$" = { - return = "444"; - }; - locations."~ ^/(containers|_ignition|vendor|public/index)" = { - return = "444"; - }; - locations."/" = { proxyPass = "http://127.0.0.1:8000"; - extraConfig = '' - limit_req zone=api burst=20 nodelay; - limit_req_status 429; - ''; }; }; }; @@ -157,7 +126,7 @@ in { }; fileSystems."/mnt/data" = { - device = "/dev/disk/by-id/scsi-0DO_Volume_st0x-rest-api-data-v2"; + device = "/dev/disk/by-id/scsi-0DO_Volume_st0x-rest-api-data"; fsType = "ext4"; }; @@ -166,7 +135,6 @@ in { experimental-features = [ "nix-command" "flakes" ]; auto-optimise-store = true; download-buffer-size = 268435456; - sandbox = false; }; gc = { diff --git a/src/cache.rs b/src/cache.rs deleted file mode 100644 index 5567c54..0000000 --- a/src/cache.rs +++ /dev/null @@ -1,132 +0,0 @@ -use moka::future::Cache; -use std::future::Future; -use std::sync::Arc; -use std::time::Duration; - -pub(crate) struct AppCache(Cache) -where - K: std::hash::Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static; - -impl AppCache -where - K: std::hash::Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, -{ - pub(crate) fn new(max_capacity: u64, ttl: Duration) -> Self { - Self( - Cache::builder() - .max_capacity(max_capacity) - .time_to_live(ttl) - .build(), - ) - } - - pub(crate) async fn get(&self, key: &K) -> Option { - self.0.get(key).await - } - - pub(crate) async fn insert(&self, key: K, value: V) { - self.0.insert(key, value).await - } - - pub(crate) async fn get_or_try_insert(&self, key: K, fetch: F) -> Result> - where - F: FnOnce() -> Fut, - Fut: Future>, - E: Send + Sync + 'static, - { - self.0.try_get_with(key, fetch()).await - } - - pub(crate) fn invalidate_all(&self) { - self.0.invalidate_all(); - } -} - -trait Invalidatable: Send + Sync { - fn invalidate_all(&self); -} - -impl Invalidatable for Cache -where - K: std::hash::Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, -{ - fn invalidate_all(&self) { - Cache::invalidate_all(self); - } -} - -pub(crate) struct CacheGroup { - caches: Vec>, -} - -impl CacheGroup { - pub(crate) fn new() -> Self { - Self { caches: Vec::new() } - } - - pub(crate) fn register(&mut self, cache: &AppCache) - where - K: std::hash::Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, - { - self.caches.push(Arc::new(cache.0.clone())); - } - - pub(crate) fn invalidate_all(&self) { - for cache in &self.caches { - cache.invalidate_all(); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[rocket::async_test] - async fn test_app_cache_insert_and_get() { - let cache: AppCache<&str, u32> = AppCache::new(10, Duration::from_secs(60)); - cache.insert("key", 42).await; - assert_eq!(cache.get(&"key").await, Some(42)); - } - - #[rocket::async_test] - async fn test_app_cache_invalidate_all_clears_entries() { - let cache: AppCache<&str, u32> = AppCache::new(10, Duration::from_secs(60)); - cache.insert("a", 1).await; - cache.insert("b", 2).await; - cache.invalidate_all(); - tokio::task::yield_now().await; - assert!(cache.get(&"a").await.is_none()); - assert!(cache.get(&"b").await.is_none()); - } - - #[rocket::async_test] - async fn test_get_or_try_insert_uses_single_flight() { - let cache: AppCache<&str, u32> = AppCache::new(10, Duration::from_secs(60)); - let result: Result> = - cache.get_or_try_insert("key", || async { Ok(42) }).await; - assert_eq!(result.unwrap(), 42); - assert_eq!(cache.get(&"key").await, Some(42)); - } - - #[rocket::async_test] - async fn test_cache_group_invalidate_all_clears_registered_caches() { - let cache_a: AppCache<&str, u32> = AppCache::new(10, Duration::from_secs(60)); - let cache_b: AppCache = AppCache::new(10, Duration::from_secs(60)); - cache_a.insert("x", 10).await; - cache_b.insert(1, "hello".into()).await; - - let mut group = CacheGroup::new(); - group.register(&cache_a); - group.register(&cache_b); - group.invalidate_all(); - - tokio::task::yield_now().await; - assert!(cache_a.get(&"x").await.is_none()); - assert!(cache_b.get(&1).await.is_none()); - } -} diff --git a/src/direct_trades.rs b/src/direct_trades.rs deleted file mode 100644 index e980f42..0000000 --- a/src/direct_trades.rs +++ /dev/null @@ -1,691 +0,0 @@ -/// Direct SQLite trade fetcher -/// -/// Bypasses the rain.orderbook library's per-query connection model by -/// maintaining a single shared connection. Runs a batch SQL query for -/// multiple order hashes in one call instead of N individual queries -/// that each open their own connection. -use crate::error::ApiError; -use crate::types::order::OrderTradeEntry; -use alloy::primitives::{Address, B256}; -use rain_math_float::Float; -use rusqlite::Connection; -use std::collections::HashMap; -use std::path::Path; -use std::str::FromStr; -use std::sync::{Arc, Mutex}; -use std::time::Instant; -use tokio::task::spawn_blocking; - -/// Holds a shared SQLite connection to the raindex local database. -pub(crate) struct DirectTradesFetcher { - conn: Arc>, - chain_id: i64, - orderbook_address: String, -} - -impl DirectTradesFetcher { - pub(crate) fn new( - db_path: &Path, - chain_id: u32, - orderbook_address: Address, - ) -> Result { - let conn = - Connection::open(db_path).map_err(|e| format!("failed to open raindex db: {e}"))?; - - conn.pragma_update(None, "journal_mode", "wal") - .map_err(|e| format!("failed to set WAL: {e}"))?; - conn.busy_timeout(std::time::Duration::from_secs(5)) - .map_err(|e| format!("failed to set busy_timeout: {e}"))?; - - // Create indexes that the upstream library is missing. These speed up - // the join between take_orders and order_add_events (which uses - // owner+nonce), and the vault_balance_changes lookup by block+log. - let indexes = [ - "CREATE INDEX IF NOT EXISTS idx_take_orders_owner_nonce \ - ON take_orders (chain_id, orderbook_address, order_owner, order_nonce)", - "CREATE INDEX IF NOT EXISTS idx_vbc_block_log \ - ON vault_balance_changes (chain_id, orderbook_address, owner, token, vault_id, block_number, log_index)", - "CREATE INDEX IF NOT EXISTS idx_take_orders_sender \ - ON take_orders (chain_id, orderbook_address, sender)", - ]; - for sql in &indexes { - if let Err(e) = conn.execute_batch(sql) { - tracing::warn!(error = %e, sql, "failed to create performance index (non-fatal)"); - } - } - - Ok(Self { - conn: Arc::new(Mutex::new(conn)), - chain_id: chain_id as i64, - orderbook_address: format!("{:#x}", orderbook_address), - }) - } - - /// Fetch trades for multiple order hashes in a single batch query. - pub(crate) async fn batch_fetch( - &self, - hashes: &[B256], - ) -> Result>, ApiError> { - if hashes.is_empty() { - return Ok(HashMap::new()); - } - - let conn = Arc::clone(&self.conn); - let chain_id = self.chain_id; - let ob_addr = self.orderbook_address.clone(); - let hash_strings: Vec = hashes.iter().map(|h| format!("{:#x}", h)).collect(); - - spawn_blocking(move || { - let start = Instant::now(); - let conn = conn.lock().map_err(|e| { - tracing::error!(error = %e, "failed to lock direct trades connection"); - ApiError::Internal("trade query failed".into()) - })?; - - let placeholders: Vec = (0..hash_strings.len()) - .map(|i| format!("?{}", i + 3)) - .collect(); - let in_clause = placeholders.join(", "); - let query = build_batch_query(&in_clause); - - let mut stmt = conn.prepare(&query).map_err(|e| { - tracing::error!(error = %e, "failed to prepare batch trades query"); - ApiError::Internal("trade query failed".into()) - })?; - - // Bind: ?1 = chain_id, ?2 = orderbook_address, ?3..N = order hashes - let mut params: Vec> = - Vec::with_capacity(hash_strings.len() + 2); - params.push(Box::new(chain_id)); - params.push(Box::new(ob_addr)); - for h in &hash_strings { - params.push(Box::new(h.clone())); - } - let param_refs: Vec<&dyn rusqlite::types::ToSql> = - params.iter().map(|p| p.as_ref()).collect(); - - let rows = stmt - .query_map(param_refs.as_slice(), |row| { - Ok(RawTradeRow { - order_hash: row.get(0)?, - transaction_hash: row.get(1)?, - block_timestamp: row.get(2)?, - transaction_sender: row.get(3)?, - input_delta: row.get(4)?, - output_delta_raw: row.get(5)?, - trade_id: row.get(6)?, - }) - }) - .map_err(|e| { - tracing::error!(error = %e, "batch trades query failed"); - ApiError::Internal("trade query failed".into()) - })?; - - let mut result: HashMap> = HashMap::new(); - let mut row_count = 0u32; - - for row_result in rows { - let raw = row_result.map_err(|e| { - tracing::error!(error = %e, "failed to read trade row"); - ApiError::Internal("trade query failed".into()) - })?; - - row_count += 1; - - match convert_raw_trade(&raw) { - Ok((hash, entry)) => { - result.entry(hash).or_default().push(entry); - } - Err(e) => { - tracing::warn!( - error = %e, - order_hash = %raw.order_hash, - "skipping malformed trade row" - ); - } - } - } - - tracing::info!( - hash_count = hash_strings.len(), - trade_rows = row_count, - duration_ms = start.elapsed().as_millis() as u64, - "direct batch trades query completed" - ); - - Ok(result) - }) - .await - .map_err(|e| { - tracing::error!(error = %e, "batch trades blocking task failed"); - ApiError::Internal("trade query failed".into()) - })? - } - - /// Fetch unique transaction hashes where `sender` was the taker. - /// Returns (tx_hash, timestamp) sorted by timestamp descending. - pub(crate) async fn fetch_taker_tx_hashes( - &self, - sender: &Address, - ) -> Result, ApiError> { - let conn = Arc::clone(&self.conn); - let chain_id = self.chain_id; - let ob_addr = self.orderbook_address.clone(); - let sender_hex = format!("{:#x}", sender); - - spawn_blocking(move || { - let start = Instant::now(); - let conn = conn.lock().map_err(|e| { - tracing::error!(error = %e, "failed to lock direct trades connection"); - ApiError::Internal("taker trades query failed".into()) - })?; - - let mut stmt = conn - .prepare( - "SELECT DISTINCT transaction_hash, MAX(block_timestamp) as ts \ - FROM take_orders \ - WHERE sender = ?1 AND chain_id = ?2 AND orderbook_address = ?3 \ - GROUP BY transaction_hash \ - ORDER BY ts DESC", - ) - .map_err(|e| { - tracing::error!(error = %e, "failed to prepare taker tx query"); - ApiError::Internal("taker trades query failed".into()) - })?; - - let rows = stmt - .query_map(rusqlite::params![sender_hex, chain_id, ob_addr], |row| { - let tx_hash: String = row.get(0)?; - let timestamp: i64 = row.get(1)?; - Ok((tx_hash, timestamp)) - }) - .map_err(|e| { - tracing::error!(error = %e, "taker tx query failed"); - ApiError::Internal("taker trades query failed".into()) - })?; - - let mut results = Vec::new(); - for row_result in rows { - let (hash_str, ts) = row_result.map_err(|e| { - tracing::error!(error = %e, "failed to read taker tx row"); - ApiError::Internal("taker trades query failed".into()) - })?; - let hash = B256::from_str(&hash_str).map_err(|e| { - tracing::error!(error = %e, hash = %hash_str, "invalid tx hash in taker query"); - ApiError::Internal("taker trades query failed".into()) - })?; - results.push((hash, ts as u64)); - } - - tracing::info!( - sender = %sender_hex, - tx_count = results.len(), - duration_ms = start.elapsed().as_millis() as u64, - "fetched taker tx hashes" - ); - - Ok(results) - }) - .await - .map_err(|e| { - tracing::error!(error = %e, "taker tx hashes blocking task failed"); - ApiError::Internal("taker trades query failed".into()) - })? - } - - /// Fetch trades associated with a specific transaction hash. - /// Returns trades grouped by order hash — same shape as `batch_fetch`. - pub(crate) async fn fetch_by_tx_hash( - &self, - tx_hash: &B256, - ) -> Result>, ApiError> { - let conn = Arc::clone(&self.conn); - let chain_id = self.chain_id; - let ob_addr = self.orderbook_address.clone(); - let tx_hex = format!("{:#x}", tx_hash); - - spawn_blocking(move || { - let start = Instant::now(); - let conn = conn.lock().map_err(|e| { - tracing::error!(error = %e, "failed to lock direct trades connection"); - ApiError::Internal("trade query failed".into()) - })?; - - let query = build_tx_hash_query(); - let mut stmt = conn.prepare(&query).map_err(|e| { - tracing::error!(error = %e, "failed to prepare tx hash trades query"); - ApiError::Internal("trade query failed".into()) - })?; - - let rows = stmt - .query_map(rusqlite::params![chain_id, ob_addr, tx_hex], |row| { - Ok(RawTradeRow { - order_hash: row.get(0)?, - transaction_hash: row.get(1)?, - block_timestamp: row.get(2)?, - transaction_sender: row.get(3)?, - input_delta: row.get(4)?, - output_delta_raw: row.get(5)?, - trade_id: row.get(6)?, - }) - }) - .map_err(|e| { - tracing::error!(error = %e, "tx hash trades query failed"); - ApiError::Internal("trade query failed".into()) - })?; - - let mut result: HashMap> = HashMap::new(); - let mut row_count = 0u32; - - for row_result in rows { - let raw = row_result.map_err(|e| { - tracing::error!(error = %e, "failed to read trade row"); - ApiError::Internal("trade query failed".into()) - })?; - - row_count += 1; - - match convert_raw_trade(&raw) { - Ok((hash, entry)) => { - result.entry(hash).or_default().push(entry); - } - Err(e) => { - tracing::warn!( - error = %e, - order_hash = %raw.order_hash, - "skipping malformed trade row" - ); - } - } - } - - tracing::info!( - tx_hash = %tx_hex, - trade_rows = row_count, - duration_ms = start.elapsed().as_millis() as u64, - "direct tx hash trades query completed" - ); - - Ok(result) - }) - .await - .map_err(|e| { - tracing::error!(error = %e, "tx hash trades blocking task failed"); - ApiError::Internal("trade query failed".into()) - })? - } -} - -struct RawTradeRow { - order_hash: String, - transaction_hash: String, - block_timestamp: i64, - transaction_sender: String, - input_delta: String, - output_delta_raw: String, - trade_id: String, -} - -fn convert_raw_trade(raw: &RawTradeRow) -> Result<(B256, OrderTradeEntry), ApiError> { - let order_hash = B256::from_str(&raw.order_hash) - .map_err(|e| ApiError::Internal(format!("invalid order hash: {e}")))?; - - let tx_hash = B256::from_str(&raw.transaction_hash) - .map_err(|e| ApiError::Internal(format!("invalid tx hash: {e}")))?; - - let sender = Address::from_str(&raw.transaction_sender) - .map_err(|e| ApiError::Internal(format!("invalid sender address: {e}")))?; - - let input_amount = format_float_hex(&raw.input_delta)?; - let output_amount = negate_and_format_float_hex(&raw.output_delta_raw)?; - - let entry = OrderTradeEntry { - id: raw.trade_id.clone(), - tx_hash, - input_amount, - output_amount, - timestamp: raw.block_timestamp as u64, - sender, - }; - - Ok((order_hash, entry)) -} - -fn format_float_hex(hex: &str) -> Result { - let float = Float::from_hex(hex).map_err(|e| { - tracing::error!(error = %e, hex, "failed to parse float hex"); - ApiError::Internal("float conversion failed".into()) - })?; - float.format().map_err(|e| { - tracing::error!(error = %e, "failed to format float"); - ApiError::Internal("float formatting failed".into()) - }) -} - -/// Negate a Float hex value and format it — replicates the SQL FLOAT_NEGATE -/// function in Rust so we don't need to register custom SQLite functions. -fn negate_and_format_float_hex(hex: &str) -> Result { - let neg_one = Float::parse("-1".to_string()).map_err(|e| { - tracing::error!(error = %e, "failed to create neg-one float"); - ApiError::Internal("float conversion failed".into()) - })?; - let float = Float::from_hex(hex).map_err(|e| { - tracing::error!(error = %e, hex, "failed to parse float hex"); - ApiError::Internal("float conversion failed".into()) - })?; - let negated = (neg_one * float).map_err(|e| { - tracing::error!(error = %e, "failed to negate float"); - ApiError::Internal("float conversion failed".into()) - })?; - negated.format().map_err(|e| { - tracing::error!(error = %e, "failed to format negated float"); - ApiError::Internal("float formatting failed".into()) - }) -} - -/// Build a batch trade query with a dynamic IN-clause. This is a simplified -/// version of rain.orderbook's `fetch_order_trades/query.sql` that: -/// - Accepts multiple order hashes at once (via IN-clause) -/// - Drops vault balance snapshot lookups (not needed for the API response) -/// - Skips FLOAT_NEGATE (handled in Rust after fetching) -fn build_batch_query(in_clause: &str) -> String { - format!( - r#" -WITH -order_add_events AS ( - SELECT - oe.chain_id, oe.orderbook_address, oe.transaction_hash, oe.log_index, - oe.block_number, oe.block_timestamp, oe.order_owner, oe.order_nonce, oe.order_hash - FROM order_events oe - WHERE oe.chain_id = ?1 - AND oe.orderbook_address = ?2 - AND oe.order_hash IN ({in_clause}) - AND oe.event_type = 'AddOrderV3' -), -take_trades AS ( - SELECT - oe.order_hash, - t.transaction_hash, - t.log_index, - t.block_timestamp, - t.sender AS transaction_sender, - t.taker_output AS input_delta, - t.taker_input AS output_delta_raw - FROM take_orders t - JOIN order_add_events oe - ON oe.chain_id = t.chain_id - AND oe.orderbook_address = t.orderbook_address - AND oe.order_owner = t.order_owner - AND oe.order_nonce = t.order_nonce - AND (oe.block_number < t.block_number - OR (oe.block_number = t.block_number AND oe.log_index <= t.log_index)) - AND NOT EXISTS ( - SELECT 1 FROM order_add_events newer - WHERE newer.chain_id = oe.chain_id - AND newer.orderbook_address = oe.orderbook_address - AND newer.order_owner = oe.order_owner - AND newer.order_nonce = oe.order_nonce - AND (newer.block_number < t.block_number - OR (newer.block_number = t.block_number AND newer.log_index <= t.log_index)) - AND (newer.block_number > oe.block_number - OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)) - ) - WHERE t.chain_id = ?1 - AND t.orderbook_address = ?2 -), -clear_alice AS ( - SELECT DISTINCT - oe.order_hash, - c.transaction_hash, - c.log_index, - c.block_timestamp, - c.sender AS transaction_sender, - a.alice_input AS input_delta, - a.alice_output AS output_delta_raw - FROM clear_v3_events c - JOIN order_add_events oe - ON oe.chain_id = c.chain_id - AND oe.orderbook_address = c.orderbook_address - AND oe.order_hash = c.alice_order_hash - AND (oe.block_number < c.block_number - OR (oe.block_number = c.block_number AND oe.log_index <= c.log_index)) - AND NOT EXISTS ( - SELECT 1 FROM order_add_events newer - WHERE newer.chain_id = oe.chain_id - AND newer.orderbook_address = oe.orderbook_address - AND newer.order_hash = oe.order_hash - AND (newer.block_number < c.block_number - OR (newer.block_number = c.block_number AND newer.log_index <= c.log_index)) - AND (newer.block_number > oe.block_number - OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)) - ) - JOIN after_clear_v2_events a - ON a.chain_id = c.chain_id - AND a.orderbook_address = c.orderbook_address - AND a.transaction_hash = c.transaction_hash - AND a.log_index = ( - SELECT MIN(ac.log_index) - FROM after_clear_v2_events ac - WHERE ac.chain_id = c.chain_id - AND ac.orderbook_address = c.orderbook_address - AND ac.transaction_hash = c.transaction_hash - AND ac.log_index > c.log_index - ) - WHERE c.chain_id = ?1 - AND c.orderbook_address = ?2 - AND c.alice_order_hash IN ({in_clause}) -), -clear_bob AS ( - SELECT DISTINCT - oe.order_hash, - c.transaction_hash, - c.log_index, - c.block_timestamp, - c.sender AS transaction_sender, - a.bob_input AS input_delta, - a.bob_output AS output_delta_raw - FROM clear_v3_events c - JOIN order_add_events oe - ON oe.chain_id = c.chain_id - AND oe.orderbook_address = c.orderbook_address - AND oe.order_hash = c.bob_order_hash - AND (oe.block_number < c.block_number - OR (oe.block_number = c.block_number AND oe.log_index <= c.log_index)) - AND NOT EXISTS ( - SELECT 1 FROM order_add_events newer - WHERE newer.chain_id = oe.chain_id - AND newer.orderbook_address = oe.orderbook_address - AND newer.order_hash = oe.order_hash - AND (newer.block_number < c.block_number - OR (newer.block_number = c.block_number AND newer.log_index <= c.log_index)) - AND (newer.block_number > oe.block_number - OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)) - ) - JOIN after_clear_v2_events a - ON a.chain_id = c.chain_id - AND a.orderbook_address = c.orderbook_address - AND a.transaction_hash = c.transaction_hash - AND a.log_index = ( - SELECT MIN(ac.log_index) - FROM after_clear_v2_events ac - WHERE ac.chain_id = c.chain_id - AND ac.orderbook_address = c.orderbook_address - AND ac.transaction_hash = c.transaction_hash - AND ac.log_index > c.log_index - ) - WHERE c.chain_id = ?1 - AND c.orderbook_address = ?2 - AND c.bob_order_hash IN ({in_clause}) -) -SELECT - order_hash, - transaction_hash, - block_timestamp, - transaction_sender, - input_delta, - output_delta_raw, - ('0x' || lower(replace(transaction_hash, '0x', '')) || printf('%016x', log_index)) AS trade_id -FROM ( - SELECT * FROM take_trades - UNION ALL - SELECT * FROM clear_alice - UNION ALL - SELECT * FROM clear_bob -) -ORDER BY order_hash, block_timestamp DESC, log_index DESC -"#, - in_clause = in_clause - ) -} - -/// Build a query that finds all trades in a given transaction. -/// Filters by `transaction_hash` on the take_orders / clear tables -/// and joins back to order_events to get the order_hash. -/// ?1 = chain_id, ?2 = orderbook_address, ?3 = transaction_hash -fn build_tx_hash_query() -> String { - r#" -WITH -take_trades AS ( - SELECT - oe.order_hash, - t.transaction_hash, - t.log_index, - t.block_timestamp, - t.sender AS transaction_sender, - t.taker_output AS input_delta, - t.taker_input AS output_delta_raw - FROM take_orders t - JOIN order_events oe - ON oe.chain_id = t.chain_id - AND oe.orderbook_address = t.orderbook_address - AND oe.order_owner = t.order_owner - AND oe.order_nonce = t.order_nonce - AND oe.event_type = 'AddOrderV3' - AND (oe.block_number < t.block_number - OR (oe.block_number = t.block_number AND oe.log_index <= t.log_index)) - AND NOT EXISTS ( - SELECT 1 FROM order_events newer - WHERE newer.chain_id = oe.chain_id - AND newer.orderbook_address = oe.orderbook_address - AND newer.order_owner = oe.order_owner - AND newer.order_nonce = oe.order_nonce - AND newer.event_type = 'AddOrderV3' - AND (newer.block_number < t.block_number - OR (newer.block_number = t.block_number AND newer.log_index <= t.log_index)) - AND (newer.block_number > oe.block_number - OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)) - ) - WHERE t.chain_id = ?1 - AND t.orderbook_address = ?2 - AND t.transaction_hash = ?3 -), -clear_alice AS ( - SELECT DISTINCT - oe.order_hash, - c.transaction_hash, - c.log_index, - c.block_timestamp, - c.sender AS transaction_sender, - a.alice_input AS input_delta, - a.alice_output AS output_delta_raw - FROM clear_v3_events c - JOIN order_events oe - ON oe.chain_id = c.chain_id - AND oe.orderbook_address = c.orderbook_address - AND oe.order_hash = c.alice_order_hash - AND oe.event_type = 'AddOrderV3' - AND (oe.block_number < c.block_number - OR (oe.block_number = c.block_number AND oe.log_index <= c.log_index)) - AND NOT EXISTS ( - SELECT 1 FROM order_events newer - WHERE newer.chain_id = oe.chain_id - AND newer.orderbook_address = oe.orderbook_address - AND newer.order_hash = oe.order_hash - AND newer.event_type = 'AddOrderV3' - AND (newer.block_number < c.block_number - OR (newer.block_number = c.block_number AND newer.log_index <= c.log_index)) - AND (newer.block_number > oe.block_number - OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)) - ) - JOIN after_clear_v2_events a - ON a.chain_id = c.chain_id - AND a.orderbook_address = c.orderbook_address - AND a.transaction_hash = c.transaction_hash - AND a.log_index = ( - SELECT MIN(ac.log_index) - FROM after_clear_v2_events ac - WHERE ac.chain_id = c.chain_id - AND ac.orderbook_address = c.orderbook_address - AND ac.transaction_hash = c.transaction_hash - AND ac.log_index > c.log_index - ) - WHERE c.chain_id = ?1 - AND c.orderbook_address = ?2 - AND c.transaction_hash = ?3 -), -clear_bob AS ( - SELECT DISTINCT - oe.order_hash, - c.transaction_hash, - c.log_index, - c.block_timestamp, - c.sender AS transaction_sender, - a.bob_input AS input_delta, - a.bob_output AS output_delta_raw - FROM clear_v3_events c - JOIN order_events oe - ON oe.chain_id = c.chain_id - AND oe.orderbook_address = c.orderbook_address - AND oe.order_hash = c.bob_order_hash - AND oe.event_type = 'AddOrderV3' - AND (oe.block_number < c.block_number - OR (oe.block_number = c.block_number AND oe.log_index <= c.log_index)) - AND NOT EXISTS ( - SELECT 1 FROM order_events newer - WHERE newer.chain_id = oe.chain_id - AND newer.orderbook_address = oe.orderbook_address - AND newer.order_hash = oe.order_hash - AND newer.event_type = 'AddOrderV3' - AND (newer.block_number < c.block_number - OR (newer.block_number = c.block_number AND newer.log_index <= c.log_index)) - AND (newer.block_number > oe.block_number - OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)) - ) - JOIN after_clear_v2_events a - ON a.chain_id = c.chain_id - AND a.orderbook_address = c.orderbook_address - AND a.transaction_hash = c.transaction_hash - AND a.log_index = ( - SELECT MIN(ac.log_index) - FROM after_clear_v2_events ac - WHERE ac.chain_id = c.chain_id - AND ac.orderbook_address = c.orderbook_address - AND ac.transaction_hash = c.transaction_hash - AND ac.log_index > c.log_index - ) - WHERE c.chain_id = ?1 - AND c.orderbook_address = ?2 - AND c.bob_order_hash IN ( - SELECT DISTINCT bob_order_hash FROM clear_v3_events - WHERE chain_id = ?1 AND orderbook_address = ?2 AND transaction_hash = ?3 - ) -) -SELECT - order_hash, - transaction_hash, - block_timestamp, - transaction_sender, - input_delta, - output_delta_raw, - ('0x' || lower(replace(transaction_hash, '0x', '')) || printf('%016x', log_index)) AS trade_id -FROM ( - SELECT * FROM take_trades - UNION ALL - SELECT * FROM clear_alice - UNION ALL - SELECT * FROM clear_bob -) -ORDER BY order_hash, block_timestamp DESC, log_index DESC -"# - .to_string() -} diff --git a/src/error.rs b/src/error.rs index 57724ed..72b222b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,8 +35,6 @@ pub enum ApiError { Internal(String), #[error("Rate limited: {0}")] RateLimited(String), - #[error("Not yet indexed: {0}")] - NotYetIndexed(String), } impl<'r> Responder<'r, 'static> for ApiError { @@ -48,7 +46,6 @@ impl<'r> Responder<'r, 'static> for ApiError { ApiError::NotFound(msg) => (Status::NotFound, "NOT_FOUND", msg.clone()), ApiError::Internal(msg) => (Status::InternalServerError, "INTERNAL_ERROR", msg.clone()), ApiError::RateLimited(msg) => (Status::TooManyRequests, "RATE_LIMITED", msg.clone()), - ApiError::NotYetIndexed(msg) => (Status::Accepted, "NOT_YET_INDEXED", msg.clone()), }; let span = request_span_for(req); span.in_scope(|| { @@ -94,12 +91,6 @@ impl<'r> Responder<'r, 'static> for ApiError { } } -impl From> for ApiError { - fn from(arc: std::sync::Arc) -> Self { - (*arc).clone() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/main.rs b/src/main.rs index 082822e..4b6af71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,12 +2,10 @@ extern crate rocket; mod auth; -mod cache; mod catchers; mod cli; mod config; mod db; -mod direct_trades; mod error; mod fairings; mod raindex; @@ -67,8 +65,6 @@ enum StartupError { routes::admin::put_registry, routes::trades::get_trades_by_tx, routes::trades::get_trades_by_address, - routes::trades::get_taker_trades, - routes::trades::post_trades_batch, routes::registry::get_registry, routes::registry::get_registry_history, ), @@ -122,7 +118,6 @@ pub(crate) fn rocket( rate_limiter: fairings::RateLimiter, raindex_config: raindex::SharedRaindexProvider, docs_dir: String, - direct_trades_fetcher: Option, ) -> Result, StartupError> { let cors = configure_cors()?; @@ -130,24 +125,10 @@ pub(crate) fn rocket( let options = Options::Index | Options::NormalizeDirs; - let trades_by_address_cache = routes::trades::trades_by_address_cache(); - let trades_by_tx_cache = routes::trades::trades_by_tx_cache(); - let trades_by_order_hash_cache = routes::trades::trades_by_order_hash_cache(); - let taker_trades_tx_hash_cache = routes::trades::taker_trades_tx_hash_cache(); - let orders_by_token_cache = routes::orders::orders_by_token_cache(); - let orders_by_owner_cache = routes::orders::orders_by_owner_cache(); - Ok(rocket::custom(figment) .manage(pool) .manage(rate_limiter) .manage(raindex_config) - .manage(trades_by_address_cache) - .manage(trades_by_tx_cache) - .manage(trades_by_order_hash_cache) - .manage(taker_trades_tx_hash_cache) - .manage(orders_by_token_cache) - .manage(orders_by_owner_cache) - .manage(direct_trades_fetcher) .mount("/", routes::health::routes()) .mount("/v1/tokens", routes::tokens::routes()) .mount("/v1/swap", routes::swap::routes()) @@ -272,56 +253,6 @@ async fn main() { } }; - // Create direct trades fetcher for fast batch trade lookups. - // Bypasses the library's per-query connection model. - let direct_trades_fetcher = match raindex_config.db_path() { - Some(db_path) if db_path.exists() => { - match raindex_config.client().get_all_orderbooks() { - Ok(orderbooks) => { - if let Some(ob) = orderbooks.values().next() { - match direct_trades::DirectTradesFetcher::new( - &db_path, - ob.network.chain_id, - ob.address, - ) { - Ok(fetcher) => { - tracing::info!( - chain_id = ob.network.chain_id, - orderbook = %ob.address, - "direct trades fetcher initialized" - ); - Some(fetcher) - } - Err(e) => { - tracing::warn!( - error = %e, - "failed to create direct trades fetcher; using fallback" - ); - None - } - } - } else { - tracing::warn!( - "no orderbooks configured; direct trades fetcher disabled" - ); - None - } - } - Err(e) => { - tracing::warn!( - error = %e, - "failed to get orderbooks; direct trades fetcher disabled" - ); - None - } - } - } - _ => { - tracing::info!("no local db path; direct trades fetcher disabled"); - None - } - }; - let shared_raindex = tokio::sync::RwLock::new(raindex_config); let rate_limiter = fairings::RateLimiter::new(cfg.rate_limit_global_rpm, cfg.rate_limit_per_key_rpm); @@ -333,13 +264,7 @@ async fn main() { } tracing::info!(docs_dir = %cfg.docs_dir, "serving documentation at /docs"); - let rocket = match rocket( - pool, - rate_limiter, - shared_raindex, - cfg.docs_dir, - direct_trades_fetcher, - ) { + let rocket = match rocket(pool, rate_limiter, shared_raindex, cfg.docs_dir) { Ok(r) => r, Err(e) => { tracing::error!(error = %e, "failed to build Rocket instance"); diff --git a/src/raindex/config.rs b/src/raindex/config.rs index d78b677..f23fc9b 100644 --- a/src/raindex/config.rs +++ b/src/raindex/config.rs @@ -10,37 +10,6 @@ pub(crate) struct RaindexProvider { db_path: Option, } -/// Neutralizes the `metaboards` section in YAML settings so the library's -/// `fetch_orders_dotrain_sources()` skips network requests to the Goldsky -/// metaboard subgraph. That function fetches `DotrainSourceV1` metadata per -/// order (~5s for 20 orders). Our API never uses `DotrainSourceV1`, so -/// replacing the metaboard keys with non-matching names causes each order's -/// `fetch_dotrain_source()` to return `Ok(())` immediately. -fn neutralize_metaboards(yaml: &str) -> String { - let mut result = String::with_capacity(yaml.len() + 64); - let mut in_metaboards = false; - - for line in yaml.lines() { - if !in_metaboards && line.starts_with("metaboards:") { - in_metaboards = true; - result.push_str("metaboards:\n _disabled: https://localhost\n"); - continue; - } - - if in_metaboards { - if line.is_empty() || line.starts_with(' ') || line.starts_with('\t') { - continue; - } - in_metaboards = false; - } - - result.push_str(line); - result.push('\n'); - } - - result -} - impl RaindexProvider { pub(crate) async fn load( registry_url: &str, @@ -68,10 +37,8 @@ impl RaindexProvider { .await .map_err(|e| RaindexProviderError::RegistryLoad(e.to_string()))?; - // Build the client with metaboard lookups disabled to avoid ~5s - // of network calls in fetch_orders_dotrain_sources(). - let settings = neutralize_metaboards(®istry.settings()); - let client = RaindexClient::new(vec![settings], None, db.clone()) + let client = registry + .get_raindex_client(db.clone()) .await .map_err(|e| RaindexProviderError::ClientInit(e.to_string()))?; @@ -173,51 +140,6 @@ mod tests { assert!(!config.registry_url().is_empty()); } - #[test] - fn test_neutralize_metaboards_replaces_entries() { - let yaml = "\ -version: 4 -networks: - base: - chain-id: 8453 -metaboards: - base: https://api.goldsky.com/metaboard - ethereum: https://api.goldsky.com/metaboard-eth -orderbooks: - base: - address: 0xabc -"; - let result = neutralize_metaboards(yaml); - assert!(result.contains("metaboards:\n _disabled: https://localhost\n")); - assert!(!result.contains("api.goldsky.com")); - assert!(result.contains("orderbooks:")); - assert!(result.contains("networks:")); - } - - #[test] - fn test_neutralize_metaboards_no_section() { - let yaml = "\ -version: 4 -networks: - base: - chain-id: 8453 -"; - let result = neutralize_metaboards(yaml); - assert_eq!(result.trim(), yaml.trim()); - assert!(!result.contains("metaboards")); - } - - #[test] - fn test_neutralize_metaboards_at_end_of_file() { - let yaml = "\ -version: 4 -metaboards: - base: https://api.goldsky.com/metaboard"; - let result = neutralize_metaboards(yaml); - assert!(result.contains("metaboards:\n _disabled: https://localhost\n")); - assert!(!result.contains("api.goldsky.com")); - } - #[test] fn test_error_maps_to_api_error() { let err = RaindexProviderError::RegistryLoad("test".into()); diff --git a/src/routes/order/get_order.rs b/src/routes/order/get_order.rs index 63b61f3..6bcd392 100644 --- a/src/routes/order/get_order.rs +++ b/src/routes/order/get_order.rs @@ -121,7 +121,7 @@ fn build_order_detail( }) } -pub(crate) fn map_trade(trade: &RaindexTrade) -> OrderTradeEntry { +fn map_trade(trade: &RaindexTrade) -> OrderTradeEntry { let timestamp: u64 = trade.timestamp().try_into().unwrap_or(0); let tx = trade.transaction(); OrderTradeEntry { diff --git a/src/routes/orders/get_by_owner.rs b/src/routes/orders/get_by_owner.rs index d5c3a2c..129232a 100644 --- a/src/routes/orders/get_by_owner.rs +++ b/src/routes/orders/get_by_owner.rs @@ -1,9 +1,8 @@ use super::{ - build_order_summary, build_pagination, OrdersListDataSource, RaindexOrdersListDataSource, + build_orders_list_response, OrdersListDataSource, RaindexOrdersListDataSource, DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE, }; use crate::auth::AuthenticatedKey; -use crate::cache::AppCache; use crate::error::{ApiError, ApiErrorResponse}; use crate::fairings::{GlobalRateLimit, TracingSpan}; use crate::types::common::ValidatedAddress; @@ -12,18 +11,8 @@ use alloy::primitives::Address; use rain_orderbook_common::raindex_client::orders::GetOrdersFilters; use rocket::serde::json::Json; use rocket::State; -use std::time::{Duration, Instant}; use tracing::Instrument; -const ORDERS_BY_OWNER_CACHE_TTL: Duration = Duration::from_secs(15); -const ORDERS_BY_OWNER_CACHE_CAPACITY: u64 = 1_000; - -pub(crate) type OrdersByOwnerCache = AppCache<(Address, u16, u16), OrdersListResponse>; - -pub(crate) fn orders_by_owner_cache() -> OrdersByOwnerCache { - AppCache::new(ORDERS_BY_OWNER_CACHE_CAPACITY, ORDERS_BY_OWNER_CACHE_TTL) -} - pub(crate) async fn process_get_orders_by_owner( ds: &dyn OrdersListDataSource, address: Address, @@ -36,73 +25,27 @@ pub(crate) async fn process_get_orders_by_owner( ..Default::default() }; - let total_start = Instant::now(); let page_num = page.unwrap_or(1); let effective_page_size = page_size .unwrap_or(DEFAULT_PAGE_SIZE as u16) .min(MAX_PAGE_SIZE); - - let orders_stage_start = Instant::now(); let (orders, total_count) = ds .get_orders_list(filters, Some(page_num), Some(effective_page_size)) .await?; - let orders_stage_duration_ms = orders_stage_start.elapsed().as_millis(); - // Only quote orders with non-zero output balance - let mut quotable_indices: Vec = Vec::new(); - let mut quotable_orders: Vec = - Vec::new(); - for (i, order) in orders.iter().enumerate() { - let has_balance = crate::routes::resolve_io_vaults(order) - .map(|(_, output)| { - output - .formatted_balance() - .parse::() - .is_ok_and(|b| b > 0.0) - }) - .unwrap_or(false); - if has_balance { - quotable_indices.push(i); - quotable_orders.push(order.clone()); - } - } - - let quotes_stage_start = Instant::now(); tracing::info!( - total_orders = orders.len(), - quotable_orders = quotable_orders.len(), - skipped_zero_balance = orders.len() - quotable_orders.len(), + quoted_orders = orders.len(), "fetching batched quotes for orders by owner" ); - let quote_results = ds.get_order_quotes_batch("able_orders).await; - let quotes_stage_duration_ms = quotes_stage_start.elapsed().as_millis(); - - // Map quote results back to original order positions - let mut io_ratios: Vec = vec!["-".into(); orders.len()]; - for (qi, &original_idx) in quotable_indices.iter().enumerate() { - io_ratios[original_idx] = super::quote_result_to_io_ratio(&orders[original_idx], quote_results.get(qi).cloned().unwrap_or_else(|| Err(ApiError::Internal("missing quote".into())))); - } - - let mut summaries = Vec::with_capacity(orders.len()); - for (order, io_ratio) in orders.iter().zip(io_ratios.iter()) { - summaries.push(build_order_summary(order, io_ratio)?); - } - - let pagination = build_pagination(total_count, page_num.into(), effective_page_size.into()); - tracing::info!( - page = page_num, - page_size = effective_page_size, - returned_orders = summaries.len(), - total_orders = total_count, - orders_stage_duration_ms, - quotes_stage_duration_ms, - total_duration_ms = total_start.elapsed().as_millis(), - "orders by owner request processed" - ); - Ok(OrdersListResponse { - orders: summaries, - pagination, - }) + let quote_results = ds.get_order_quotes_batch(&orders).await; + + build_orders_list_response( + &orders, + total_count, + page_num.into(), + effective_page_size.into(), + quote_results, + ) } #[utoipa::path( @@ -128,7 +71,6 @@ pub async fn get_orders_by_address( _global: GlobalRateLimit, _key: AuthenticatedKey, shared_raindex: &State, - orders_cache: &State, span: TracingSpan, address: ValidatedAddress, params: OrdersPaginationParams, @@ -136,24 +78,13 @@ pub async fn get_orders_by_address( async move { tracing::info!(address = ?address, params = ?params, "request received"); let addr = address.0; - let page = params.page.unwrap_or(1); - let page_size = params - .page_size - .unwrap_or(DEFAULT_PAGE_SIZE as u16) - .min(MAX_PAGE_SIZE); - let cache_key = (addr, page, page_size); - - let response = orders_cache - .get_or_try_insert(cache_key, || async { - let raindex = shared_raindex.read().await; - let ds = RaindexOrdersListDataSource { - client: raindex.client(), - }; - process_get_orders_by_owner(&ds, addr, Some(page), Some(page_size)).await - }) - .await - .map_err(ApiError::from)?; - + let page = params.page; + let page_size = params.page_size; + let raindex = shared_raindex.read().await; + let ds = RaindexOrdersListDataSource { + client: raindex.client(), + }; + let response = process_get_orders_by_owner(&ds, addr, page, page_size).await?; Ok(Json(response)) } .instrument(span.0) @@ -260,8 +191,7 @@ mod tests { assert_eq!(result.orders.len(), 1); assert_eq!(result.orders[0].input_token.symbol, "wtMSTR"); assert_eq!(result.orders[0].output_token.symbol, "wtMSTR"); - // Zero-balance orders are not quoted; io_ratio defaults to "-" - assert_eq!(result.orders[0].io_ratio, "-"); + assert_eq!(result.orders[0].io_ratio, "200.0"); } #[rocket::async_test] diff --git a/src/routes/orders/get_by_token.rs b/src/routes/orders/get_by_token.rs index addf858..68dcee0 100644 --- a/src/routes/orders/get_by_token.rs +++ b/src/routes/orders/get_by_token.rs @@ -1,9 +1,8 @@ use super::{ - build_order_summary, build_pagination, OrdersListDataSource, RaindexOrdersListDataSource, + build_orders_list_response, OrdersListDataSource, RaindexOrdersListDataSource, DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE, }; use crate::auth::AuthenticatedKey; -use crate::cache::AppCache; use crate::error::{ApiError, ApiErrorResponse}; use crate::fairings::{GlobalRateLimit, TracingSpan}; use crate::types::common::ValidatedAddress; @@ -13,19 +12,8 @@ use rain_orderbook_common::raindex_client::orders::GetOrdersFilters; use rain_orderbook_common::raindex_client::orders::GetOrdersTokenFilter; use rocket::serde::json::Json; use rocket::State; -use std::time::{Duration, Instant}; use tracing::Instrument; -const ORDERS_CACHE_TTL: Duration = Duration::from_secs(15); -const ORDERS_CACHE_CAPACITY: u64 = 1_000; - -pub(crate) type OrdersByTokenCache = - AppCache<(Address, Option, u16, u16), OrdersListResponse>; - -pub(crate) fn orders_by_token_cache() -> OrdersByTokenCache { - AppCache::new(ORDERS_CACHE_CAPACITY, ORDERS_CACHE_TTL) -} - pub(crate) async fn process_get_orders_by_token( ds: &dyn OrdersListDataSource, address: Address, @@ -54,73 +42,27 @@ pub(crate) async fn process_get_orders_by_token( ..Default::default() }; - let total_start = Instant::now(); let page_num = page.unwrap_or(1); let effective_page_size = page_size .unwrap_or(DEFAULT_PAGE_SIZE as u16) .min(MAX_PAGE_SIZE); - - let orders_stage_start = Instant::now(); let (orders, total_count) = ds .get_orders_list(filters, Some(page_num), Some(effective_page_size)) .await?; - let orders_stage_duration_ms = orders_stage_start.elapsed().as_millis(); - // Separate orders with non-zero output balance (worth quoting) from empty ones - let mut quotable_indices: Vec = Vec::new(); - let mut quotable_orders: Vec = - Vec::new(); - for (i, order) in orders.iter().enumerate() { - let has_balance = crate::routes::resolve_io_vaults(order) - .map(|(_, output)| { - output - .formatted_balance() - .parse::() - .is_ok_and(|b| b > 0.0) - }) - .unwrap_or(false); - if has_balance { - quotable_indices.push(i); - quotable_orders.push(order.clone()); - } - } - - let quotes_stage_start = Instant::now(); tracing::info!( - total_orders = orders.len(), - quotable_orders = quotable_orders.len(), - skipped_zero_balance = orders.len() - quotable_orders.len(), + quoted_orders = orders.len(), "fetching batched quotes for orders by token" ); - let quote_results = ds.get_order_quotes_batch("able_orders).await; - let quotes_stage_duration_ms = quotes_stage_start.elapsed().as_millis(); - - // Map quote results back to original order positions - let mut io_ratios: Vec = vec!["-".into(); orders.len()]; - for (qi, &original_idx) in quotable_indices.iter().enumerate() { - io_ratios[original_idx] = super::quote_result_to_io_ratio(&orders[original_idx], quote_results.get(qi).cloned().unwrap_or_else(|| Err(ApiError::Internal("missing quote".into())))); - } - - let mut summaries = Vec::with_capacity(orders.len()); - for (order, io_ratio) in orders.iter().zip(io_ratios.iter()) { - summaries.push(build_order_summary(order, io_ratio)?); - } - - let pagination = build_pagination(total_count, page_num.into(), effective_page_size.into()); - tracing::info!( - page = page_num, - page_size = effective_page_size, - returned_orders = summaries.len(), - total_orders = total_count, - orders_stage_duration_ms, - quotes_stage_duration_ms, - total_duration_ms = total_start.elapsed().as_millis(), - "orders by token request processed" - ); - Ok(OrdersListResponse { - orders: summaries, - pagination, - }) + let quote_results = ds.get_order_quotes_batch(&orders).await; + + build_orders_list_response( + &orders, + total_count, + page_num.into(), + effective_page_size.into(), + quote_results, + ) } #[utoipa::path( @@ -146,7 +88,6 @@ pub async fn get_orders_by_token( _global: GlobalRateLimit, _key: AuthenticatedKey, shared_raindex: &State, - orders_cache: &State, span: TracingSpan, address: ValidatedAddress, params: OrdersByTokenParams, @@ -155,24 +96,13 @@ pub async fn get_orders_by_token( tracing::info!(address = ?address, params = ?params, "request received"); let addr = address.0; let side = params.side; - let page = params.page.unwrap_or(1); - let page_size = params - .page_size - .unwrap_or(DEFAULT_PAGE_SIZE as u16) - .min(MAX_PAGE_SIZE); - let cache_key = (addr, side, page, page_size); - - let response = orders_cache - .get_or_try_insert(cache_key, || async { - let raindex = shared_raindex.read().await; - let ds = RaindexOrdersListDataSource { - client: raindex.client(), - }; - process_get_orders_by_token(&ds, addr, side, Some(page), Some(page_size)).await - }) - .await - .map_err(ApiError::from)?; - + let page = params.page; + let page_size = params.page_size; + let raindex = shared_raindex.read().await; + let ds = RaindexOrdersListDataSource { + client: raindex.client(), + }; + let response = process_get_orders_by_token(&ds, addr, side, page, page_size).await?; Ok(Json(response)) } .instrument(span.0) diff --git a/src/routes/orders/mod.rs b/src/routes/orders/mod.rs index 33eb380..760d8d4 100644 --- a/src/routes/orders/mod.rs +++ b/src/routes/orders/mod.rs @@ -207,10 +207,7 @@ impl<'a> OrdersListDataSource for RaindexOrdersListDataSource<'a> { .first() .map(RaindexOrder::chain_id) .unwrap_or_default(); - // Use small chunk size (4) to avoid exceeding public RPC eth_call gas - // limits, which would trigger expensive probe-and-split retries in the - // quote library. - fetch_order_quotes_batch(orders, None, Some(4)) + fetch_order_quotes_batch(orders, None, None) .await .map_err(|error| { tracing::error!( @@ -322,9 +319,6 @@ pub use get_by_owner::*; pub use get_by_token::*; pub use get_by_tx::*; -pub(crate) use get_by_owner::{orders_by_owner_cache, OrdersByOwnerCache}; -pub(crate) use get_by_token::{orders_by_token_cache, OrdersByTokenCache}; - pub fn routes() -> Vec { rocket::routes![ get_by_tx::get_orders_by_tx, diff --git a/src/routes/trades.rs b/src/routes/trades.rs index 12282de..3312030 100644 --- a/src/routes/trades.rs +++ b/src/routes/trades.rs @@ -1,772 +1,12 @@ use crate::auth::AuthenticatedKey; -use crate::cache::AppCache; use crate::error::{ApiError, ApiErrorResponse}; use crate::fairings::{GlobalRateLimit, TracingSpan}; -use crate::types::common::{TokenRef, ValidatedAddress, ValidatedFixedBytes}; -use crate::types::order::OrderTradeEntry; -use crate::types::trades::{ - TakerTradesResponse, TradeByAddress, TradeByTxEntry, TradeRequest, TradeResult, - TradesBatchEntry, TradesBatchRequest, TradesBatchResponse, TradesByAddressResponse, - TradesByTxResponse, TradesPagination, TradesPaginationParams, TradesTotals, -}; -use alloy::primitives::{Address, FixedBytes, B256}; -use async_trait::async_trait; -use futures::future::join_all; -use rain_math_float::Float; -use rain_orderbook_common::local_db::OrderbookIdentifier; -use rain_orderbook_common::raindex_client::orders::{GetOrdersFilters, RaindexOrder}; -use rain_orderbook_common::raindex_client::trades::RaindexTrade; -use rain_orderbook_common::raindex_client::{RaindexClient, RaindexError}; +use crate::types::common::{ValidatedAddress, ValidatedFixedBytes}; +use crate::types::trades::{TradesByAddressResponse, TradesByTxResponse, TradesPaginationParams}; use rocket::serde::json::Json; use rocket::{Route, State}; -use std::cmp::Reverse; -use std::ops::{Add, Div, Sub}; -use std::str::FromStr; -use std::time::Duration; -use std::time::Instant; use tracing::Instrument; -const ORDERS_SCAN_PAGE_SIZE: u16 = 50; -const FAST_INDEX_CHECK_ATTEMPTS: usize = 1; -const FAST_INDEX_CHECK_INTERVAL_MS: u64 = 0; -const TRADES_BY_ADDRESS_CACHE_TTL: Duration = Duration::from_secs(10); -const TRADES_BY_TX_CACHE_TTL: Duration = Duration::from_secs(300); -const TRADES_BY_ORDER_HASH_CACHE_TTL: Duration = Duration::from_secs(60); -const TRADES_CACHE_CAPACITY: u64 = 1_000; -const TRADES_BATCH_MAX_HASHES: usize = 50; - -type TradesByAddressCache = - AppCache<(Address, u32, u32, Option, Option), TradesByAddressResponse>; -type TradesByTxCache = AppCache; -type TradesByOrderHashCache = AppCache>; -type TakerTradesTxHashCache = AppCache>; - -const TAKER_TX_HASH_CACHE_TTL: Duration = Duration::from_secs(15); - -pub(crate) fn trades_by_address_cache() -> TradesByAddressCache { - AppCache::new(TRADES_CACHE_CAPACITY, TRADES_BY_ADDRESS_CACHE_TTL) -} - -pub(crate) fn trades_by_tx_cache() -> TradesByTxCache { - AppCache::new(TRADES_CACHE_CAPACITY, TRADES_BY_TX_CACHE_TTL) -} - -pub(crate) fn trades_by_order_hash_cache() -> TradesByOrderHashCache { - AppCache::new(TRADES_CACHE_CAPACITY, TRADES_BY_ORDER_HASH_CACHE_TTL) -} - -pub(crate) fn taker_trades_tx_hash_cache() -> TakerTradesTxHashCache { - AppCache::new(TRADES_CACHE_CAPACITY, TAKER_TX_HASH_CACHE_TTL) -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum TxIndexState { - Indexed, - NotYetIndexed, -} - -struct TradeWithOwner { - owner: Address, - trade: RaindexTrade, -} - -#[async_trait] -trait TradesDataSource: Send + Sync { - async fn get_orders( - &self, - filters: GetOrdersFilters, - page: Option, - page_size: Option, - ) -> Result<(Vec, u32), ApiError>; - - async fn get_order_trades( - &self, - order: &RaindexOrder, - start_time: Option, - end_time: Option, - ) -> Result, ApiError>; - - async fn get_trades_by_tx( - &self, - tx_hash: B256, - known_order_hashes: Option>, - ) -> Result, ApiError> { - let orders = match known_order_hashes { - Some(hashes) if !hashes.is_empty() => { - // Targeted lookup: find only the specific orders we know about - let mut found = Vec::with_capacity(hashes.len()); - for hash in hashes { - if let Some(order) = self.find_order_by_hash(hash).await? { - found.push(order); - } - } - found - } - _ => { - // Fallback: fetch all active orders (expensive) - fetch_all_orders( - self, - GetOrdersFilters { - active: Some(true), - ..Default::default() - }, - ) - .await? - } - }; - let trades_with_owner = load_trades_with_owners(self, &orders, None, None).await?; - - Ok(trades_with_owner - .into_iter() - .filter(|trade_with_owner| trade_with_owner.trade.transaction().id() == tx_hash) - .collect()) - } - - async fn find_order_by_hash(&self, hash: B256) -> Result, ApiError>; - - async fn check_tx_index_state(&self, tx_hash: B256) -> Result; -} - -struct RaindexTradesDataSource<'a> { - client: &'a RaindexClient, -} - -#[async_trait] -impl TradesDataSource for RaindexTradesDataSource<'_> { - async fn get_orders( - &self, - filters: GetOrdersFilters, - page: Option, - page_size: Option, - ) -> Result<(Vec, u32), ApiError> { - let result = self - .client - .get_orders(None, Some(filters), page, page_size) - .await - .map_err(|e| { - tracing::error!(error = %e, "failed to query orders"); - ApiError::Internal("failed to query orders".into()) - })?; - Ok((result.orders().to_vec(), result.total_count())) - } - - async fn get_order_trades( - &self, - order: &RaindexOrder, - start_time: Option, - end_time: Option, - ) -> Result, ApiError> { - order - .get_trades_list(start_time, end_time, None) - .await - .map_err(|e| { - tracing::error!(error = %e, order_hash = ?order.order_hash(), "failed to query order trades"); - ApiError::Internal("failed to query order trades".into()) - }) - } - - async fn find_order_by_hash(&self, hash: B256) -> Result, ApiError> { - let orderbooks = self.client.get_all_orderbooks().map_err(|e| { - tracing::error!(error = %e, "failed to get orderbooks"); - ApiError::Internal("failed to get orderbooks".into()) - })?; - - for orderbook in orderbooks.values() { - let ob_id = OrderbookIdentifier::new(orderbook.network.chain_id, orderbook.address); - match self.client.get_order_by_hash(&ob_id, hash).await { - Ok(order) => return Ok(Some(order)), - Err(RaindexError::OrderNotFound(..)) => continue, - Err(e) => { - tracing::warn!( - order_hash = %hash, - error = %e, - "error looking up order by hash" - ); - continue; - } - } - } - Ok(None) - } - - async fn check_tx_index_state(&self, tx_hash: B256) -> Result { - let orderbooks = self.client.get_all_orderbooks().map_err(|e| { - tracing::error!(error = %e, "failed to get orderbooks"); - ApiError::Internal("failed to get orderbooks".into()) - })?; - - let mut saw_timeout = false; - - for orderbook in orderbooks.values() { - match self - .client - .get_transaction( - orderbook.network.chain_id, - orderbook.address, - tx_hash, - Some(FAST_INDEX_CHECK_ATTEMPTS), - Some(FAST_INDEX_CHECK_INTERVAL_MS), - ) - .await - { - Ok(_) => return Ok(TxIndexState::Indexed), - Err(RaindexError::TransactionIndexingTimeout { .. }) => { - saw_timeout = true; - } - Err(err) => { - tracing::error!( - error = %err, - tx_hash = %tx_hash, - chain_id = orderbook.network.chain_id, - orderbook = %orderbook.address, - "failed to query transaction status" - ); - return Err(ApiError::Internal("failed to query transaction".into())); - } - } - } - - if saw_timeout { - return Ok(TxIndexState::NotYetIndexed); - } - Ok(TxIndexState::Indexed) - } -} - -fn to_u64(value: alloy::primitives::U256, field: &'static str) -> Result { - value.try_into().map_err(|_| { - tracing::error!(field, "value does not fit into u64"); - ApiError::Internal(format!("{field} overflow")) - }) -} - -fn parse_trade_order_hash(order_hash: alloy::primitives::Bytes) -> Result { - let hash = order_hash.to_string(); - B256::from_str(&hash).map_err(|e| { - tracing::error!(error = %e, order_hash = %hash, "invalid trade order hash"); - ApiError::Internal("invalid trade order hash".into()) - }) -} - -fn maybe_parse_trade_order_hash(order_hash: alloy::primitives::Bytes) -> Option> { - FixedBytes::<32>::from_str(&order_hash.to_string()).ok() -} - -fn format_float(value: Float, context: &'static str) -> Result { - value.format().map_err(|e| { - tracing::error!(error = %e, context, "float formatting failed"); - ApiError::Internal(format!("{context} calculation failed")) - }) -} - -fn positive_output(output_amount: Float) -> Result { - Float::zero() - .map_err(|e| { - tracing::error!(error = %e, "float zero construction failed"); - ApiError::Internal("io ratio calculation failed".into()) - })? - .sub(output_amount) - .map_err(|e| { - tracing::error!(error = %e, "failed to negate output amount"); - ApiError::Internal("io ratio calculation failed".into()) - }) -} - -fn compute_io_ratio(input_amount: Float, output_amount: Float) -> Result { - let positive_output = positive_output(output_amount)?; - let zero = Float::zero().map_err(|e| { - tracing::error!(error = %e, "float zero construction failed"); - ApiError::Internal("io ratio calculation failed".into()) - })?; - if positive_output.eq(zero).unwrap_or(true) { - return Ok("0".into()); - } - let ratio = input_amount.div(positive_output).map_err(|e| { - tracing::error!(error = %e, "failed to compute io ratio"); - ApiError::Internal("io ratio calculation failed".into()) - })?; - format_float(ratio, "io ratio") -} - -async fn fetch_all_orders( - ds: &T, - filters: GetOrdersFilters, -) -> Result, ApiError> { - let mut all_orders = Vec::new(); - let mut page: u16 = 1; - - loop { - let (orders, _total_count) = ds - .get_orders(filters.clone(), Some(page), Some(ORDERS_SCAN_PAGE_SIZE)) - .await?; - let batch_len = orders.len(); - all_orders.extend(orders); - - if batch_len < ORDERS_SCAN_PAGE_SIZE as usize { - break; - } - page = page.saturating_add(1); - if page == u16::MAX { - break; - } - } - - Ok(all_orders) -} - -async fn load_trades_with_owners( - ds: &T, - orders: &[RaindexOrder], - start_time: Option, - end_time: Option, -) -> Result, ApiError> { - let trade_results = join_all( - orders - .iter() - .map(|order| ds.get_order_trades(order, start_time, end_time)), - ) - .await; - - let mut all_trades = Vec::new(); - for (order, trades_result) in orders.iter().zip(trade_results) { - let owner = order.owner(); - for trade in trades_result? { - all_trades.push(TradeWithOwner { owner, trade }); - } - } - - Ok(all_trades) -} - -async fn process_get_trades_by_tx( - ds: &dyn TradesDataSource, - tx_hash: B256, - known_order_hashes: Option>, -) -> Result { - let lookup_started = Instant::now(); - let matching_trades = ds.get_trades_by_tx(tx_hash, known_order_hashes).await?; - let lookup_duration_ms = lookup_started.elapsed().as_millis() as u64; - - if matching_trades.is_empty() { - let index_check_started = Instant::now(); - match ds.check_tx_index_state(tx_hash).await? { - TxIndexState::NotYetIndexed => { - tracing::info!( - tx_hash = %tx_hash, - tx_lookup_duration_ms = lookup_duration_ms, - index_check_duration_ms = index_check_started.elapsed().as_millis() as u64, - "transaction trades lookup found no indexed results yet" - ); - return Err(ApiError::NotYetIndexed(format!( - "transaction {tx_hash:#x} not yet indexed" - ))); - } - TxIndexState::Indexed => { - tracing::info!( - tx_hash = %tx_hash, - tx_lookup_duration_ms = lookup_duration_ms, - index_check_duration_ms = index_check_started.elapsed().as_millis() as u64, - "transaction trades lookup found no matching trades" - ); - return Err(ApiError::NotFound( - "transaction has no associated trades".into(), - )); - } - } - } - - let first_tx = matching_trades[0].trade.transaction(); - let mut total_input = Float::zero().map_err(|e| { - tracing::error!(error = %e, "float zero construction failed"); - ApiError::Internal("trade totals calculation failed".into()) - })?; - let mut total_output = Float::zero().map_err(|e| { - tracing::error!(error = %e, "float zero construction failed"); - ApiError::Internal("trade totals calculation failed".into()) - })?; - - let mut entries = Vec::with_capacity(matching_trades.len()); - for trade_with_owner in matching_trades { - let trade = trade_with_owner.trade; - let input_change = trade.input_vault_balance_change(); - let output_change = trade.output_vault_balance_change(); - let io_ratio = compute_io_ratio(input_change.amount(), output_change.amount())?; - let order_hash = parse_trade_order_hash(trade.order_hash())?; - - total_input = total_input.add(input_change.amount()).map_err(|e| { - tracing::error!(error = %e, "failed to sum total input"); - ApiError::Internal("trade totals calculation failed".into()) - })?; - total_output = total_output - .add(positive_output(output_change.amount())?) - .map_err(|e| { - tracing::error!(error = %e, "failed to sum total output"); - ApiError::Internal("trade totals calculation failed".into()) - })?; - - entries.push(TradeByTxEntry { - order_hash, - order_owner: trade_with_owner.owner, - request: TradeRequest { - input_token: input_change.token().address(), - output_token: output_change.token().address(), - maximum_input: input_change.formatted_amount(), - maximum_io_ratio: io_ratio.clone(), - }, - result: TradeResult { - input_amount: input_change.formatted_amount(), - output_amount: output_change.formatted_amount(), - actual_io_ratio: io_ratio, - }, - }); - } - - let zero = Float::zero().map_err(|e| { - tracing::error!(error = %e, "float zero construction failed"); - ApiError::Internal("trade totals calculation failed".into()) - })?; - let average_io_ratio = if total_output.eq(zero).unwrap_or(true) { - zero - } else { - total_input.div(total_output).map_err(|e| { - tracing::error!(error = %e, "failed to compute average io ratio"); - ApiError::Internal("trade totals calculation failed".into()) - })? - }; - - tracing::info!( - tx_hash = %tx_hash, - trade_count = entries.len(), - tx_lookup_duration_ms = lookup_duration_ms, - "resolved trades by tx" - ); - - Ok(TradesByTxResponse { - tx_hash, - block_number: to_u64(first_tx.block_number(), "block number")?, - timestamp: to_u64(first_tx.timestamp(), "timestamp")?, - sender: first_tx.from(), - trades: entries, - totals: TradesTotals { - total_input_amount: format_float(total_input, "trade totals")?, - total_output_amount: format_float(total_output, "trade totals")?, - average_io_ratio: format_float(average_io_ratio, "trade totals")?, - }, - }) -} - -async fn process_get_trades_by_address( - ds: &dyn TradesDataSource, - direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>, - owner: Address, - params: TradesPaginationParams, -) -> Result { - let start = Instant::now(); - - let all_orders = fetch_all_orders( - ds, - GetOrdersFilters { - owners: vec![owner], - active: Some(true), - ..Default::default() - }, - ) - .await?; - - let orders_duration_ms = start.elapsed().as_millis() as u64; - tracing::info!( - owner = %owner, - order_count = all_orders.len(), - orders_duration_ms, - "fetched orders for trades-by-address" - ); - - let trades = if let Some(fetcher) = direct_trades { - // Fast path: batch SQLite query via DirectTradesFetcher - let order_hashes: Vec = all_orders.iter().map(|o| o.order_hash()).collect(); - - // Build order_hash → token info lookup - let mut token_map: std::collections::HashMap = - std::collections::HashMap::new(); - for order in &all_orders { - if let Ok((input_vault, output_vault)) = super::resolve_io_vaults(order) { - let input_token_info = input_vault.token(); - let output_token_info = output_vault.token(); - token_map.insert( - order.order_hash(), - ( - TokenRef { - address: input_token_info.address(), - symbol: input_token_info.symbol().unwrap_or_default(), - decimals: input_token_info.decimals(), - }, - TokenRef { - address: output_token_info.address(), - symbol: output_token_info.symbol().unwrap_or_default(), - decimals: output_token_info.decimals(), - }, - ), - ); - } - } - - let batch_start = Instant::now(); - match fetcher.batch_fetch(&order_hashes).await { - Ok(batch_result) => { - let batch_duration_ms = batch_start.elapsed().as_millis() as u64; - tracing::info!( - owner = %owner, - order_count = order_hashes.len(), - batch_duration_ms, - "direct batch trades completed for trades-by-address" - ); - - let mut trades = Vec::new(); - for (order_hash, entries) in &batch_result { - let (input_token, output_token) = - token_map.get(order_hash).cloned().unwrap_or_else(|| { - ( - TokenRef { - address: Address::ZERO, - symbol: String::new(), - decimals: 0, - }, - TokenRef { - address: Address::ZERO, - symbol: String::new(), - decimals: 0, - }, - ) - }); - - for entry in entries { - // Apply time filters if specified - if let Some(start_time) = params.start_time { - if entry.timestamp < start_time { - continue; - } - } - if let Some(end_time) = params.end_time { - if entry.timestamp > end_time { - continue; - } - } - - trades.push(TradeByAddress { - tx_hash: entry.tx_hash, - input_amount: entry.input_amount.clone(), - output_amount: entry.output_amount.clone(), - input_token: input_token.clone(), - output_token: output_token.clone(), - order_hash: Some(*order_hash), - timestamp: entry.timestamp, - block_number: 0, // not available from DirectTradesFetcher - }); - } - } - trades - } - Err(e) => { - tracing::warn!( - error = %e, - owner = %owner, - "direct batch trades failed for trades-by-address; falling back to library" - ); - // Fallback to slow path - build_trades_from_library(ds, &all_orders, ¶ms).await? - } - } - } else { - // Slow path: N individual queries via library - build_trades_from_library(ds, &all_orders, ¶ms).await? - }; - - let mut trades = trades; - trades.sort_by_key(|t| (Reverse(t.timestamp), Reverse(t.block_number))); - - let page = params.page.unwrap_or(1); - let page_size = params.page_size.unwrap_or(20); - let total_trades = trades.len() as u64; - let total_pages = if page_size == 0 { - 0 - } else { - total_trades.div_ceil(u64::from(page_size)) - }; - - let offset = (u64::from(page.saturating_sub(1)) * u64::from(page_size)) as usize; - let paginated = if offset >= trades.len() { - Vec::new() - } else { - let end = std::cmp::min(offset + page_size as usize, trades.len()); - trades[offset..end].to_vec() - }; - - tracing::info!( - owner = %owner, - page, - page_size, - total_trades, - returned_trades = paginated.len(), - total_duration_ms = start.elapsed().as_millis() as u64, - "resolved trades by address" - ); - - Ok(TradesByAddressResponse { - trades: paginated, - pagination: TradesPagination { - page, - page_size, - total_trades, - total_pages, - has_more: u64::from(page) < total_pages, - }, - }) -} - -async fn build_trades_from_library( - ds: &dyn TradesDataSource, - orders: &[RaindexOrder], - params: &TradesPaginationParams, -) -> Result, ApiError> { - let trades_with_owner = - load_trades_with_owners(ds, orders, params.start_time, params.end_time).await?; - - let mut trades = Vec::with_capacity(trades_with_owner.len()); - for trade_with_owner in trades_with_owner { - let trade = trade_with_owner.trade; - let input_change = trade.input_vault_balance_change(); - let output_change = trade.output_vault_balance_change(); - let input_token = input_change.token(); - let output_token = output_change.token(); - trades.push(TradeByAddress { - tx_hash: trade.transaction().id(), - input_amount: input_change.formatted_amount(), - output_amount: output_change.formatted_amount(), - input_token: TokenRef { - address: input_token.address(), - symbol: input_token.symbol().unwrap_or_default(), - decimals: input_token.decimals(), - }, - output_token: TokenRef { - address: output_token.address(), - symbol: output_token.symbol().unwrap_or_default(), - decimals: output_token.decimals(), - }, - order_hash: maybe_parse_trade_order_hash(trade.order_hash()), - timestamp: to_u64(trade.timestamp(), "timestamp")?, - block_number: to_u64(trade.transaction().block_number(), "block number")?, - }); - } - Ok(trades) -} - -async fn get_cached_trades_by_tx( - cache: &TradesByTxCache, - ds: &dyn TradesDataSource, - tx_hash: B256, - known_order_hashes: Option>, -) -> Result { - cache - .get_or_try_insert(tx_hash, || async move { - process_get_trades_by_tx(ds, tx_hash, known_order_hashes).await - }) - .await - .map_err(ApiError::from) -} - -async fn get_cached_trades_by_address( - cache: &TradesByAddressCache, - ds: &dyn TradesDataSource, - direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>, - owner: Address, - params: TradesPaginationParams, -) -> Result { - let cache_key = ( - owner, - params.page.unwrap_or(1), - params.page_size.unwrap_or(20), - params.start_time, - params.end_time, - ); - cache - .get_or_try_insert(cache_key, || async move { - process_get_trades_by_address(ds, direct_trades, owner, params).await - }) - .await - .map_err(ApiError::from) -} - -async fn process_get_taker_trades( - ds: &dyn TradesDataSource, - direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>, - trades_by_tx_cache: &TradesByTxCache, - taker_tx_cache: &TakerTradesTxHashCache, - sender: Address, - params: TradesPaginationParams, -) -> Result { - // Step 1: Get tx hashes (cached) - let tx_hashes = match direct_trades { - Some(fetcher) => taker_tx_cache - .get_or_try_insert(sender, || async { - fetcher.fetch_taker_tx_hashes(&sender).await - }) - .await - .map_err(ApiError::from)?, - None => { - tracing::warn!("direct trades fetcher unavailable; returning empty taker trades"); - return Ok(TakerTradesResponse { - market_orders: vec![], - pagination: TradesPagination { - page: 1, - page_size: params.page_size.unwrap_or(20), - total_trades: 0, - total_pages: 0, - has_more: false, - }, - }); - } - }; - - // Step 2: Paginate - let page = params.page.unwrap_or(1); - let page_size = params.page_size.unwrap_or(20); - let total = tx_hashes.len() as u64; - let total_pages = if page_size == 0 { - 0 - } else { - total.div_ceil(u64::from(page_size)) - }; - let offset = (u64::from(page.saturating_sub(1)) * u64::from(page_size)) as usize; - let page_hashes: Vec = if offset >= tx_hashes.len() { - vec![] - } else { - let end = std::cmp::min(offset + page_size as usize, tx_hashes.len()); - tx_hashes[offset..end].iter().map(|(h, _)| *h).collect() - }; - - // Step 3: Resolve each tx via existing cached trade-by-tx lookup - let mut market_orders = Vec::with_capacity(page_hashes.len()); - for tx_hash in page_hashes { - match get_cached_trades_by_tx(trades_by_tx_cache, ds, tx_hash, None).await { - Ok(tx_trades) => market_orders.push(tx_trades), - Err(e) => { - tracing::warn!(tx_hash = %tx_hash, error = %e, "failed to resolve taker tx; skipping"); - } - } - } - - Ok(TakerTradesResponse { - market_orders, - pagination: TradesPagination { - page, - page_size, - total_trades: total, - total_pages, - has_more: u64::from(page) < total_pages, - }, - }) -} - #[utoipa::path( get, path = "/v1/trades/tx/{tx_hash}", @@ -789,67 +29,13 @@ pub async fn get_trades_by_tx( _global: GlobalRateLimit, _key: AuthenticatedKey, shared_raindex: &State, - trades_by_tx_cache: &State, - direct_trades: &State>, span: TracingSpan, tx_hash: ValidatedFixedBytes, ) -> Result, ApiError> { async move { tracing::info!(tx_hash = ?tx_hash, "request received"); - - // Use DirectTradesFetcher to find which order hashes have trades in this tx, - // so we don't need to fetch ALL orders from the subgraph. - let raindex = shared_raindex.read().await; - let ds = RaindexTradesDataSource { - client: raindex.client(), - }; - - let known_order_hashes = if let Some(fetcher) = direct_trades.inner().as_ref() { - match fetcher.fetch_by_tx_hash(&tx_hash.0).await { - Ok(trades_map) if !trades_map.is_empty() => { - let hashes: Vec = trades_map.keys().copied().collect(); - tracing::info!( - tx_hash = ?tx_hash, - order_count = hashes.len(), - "direct trades fetcher found order hashes for tx" - ); - Some(hashes) - } - Ok(_) => { - // DirectTradesFetcher found no trades — skip expensive full scan - // and go directly to the index state check - tracing::info!(tx_hash = ?tx_hash, "direct trades fetcher found no trades for tx"); - match ds.check_tx_index_state(tx_hash.0).await? { - TxIndexState::NotYetIndexed => { - return Err(ApiError::NotYetIndexed(format!( - "transaction {:#x} not yet indexed", - tx_hash.0 - ))); - } - TxIndexState::Indexed => { - return Err(ApiError::NotFound( - "transaction has no associated trades".into(), - )); - } - } - } - Err(e) => { - tracing::warn!( - tx_hash = ?tx_hash, - error = %e, - "direct trades fetcher failed, falling back to full scan" - ); - None - } - } - } else { - None - }; - - let response = - get_cached_trades_by_tx(trades_by_tx_cache, &ds, tx_hash.0, known_order_hashes) - .await?; - Ok(Json(response)) + let _raindex = shared_raindex.read().await; + todo!() } .instrument(span.0) .await @@ -877,589 +63,19 @@ pub async fn get_trades_by_address( _global: GlobalRateLimit, _key: AuthenticatedKey, shared_raindex: &State, - trades_by_address_cache: &State, - direct_trades: &State>, span: TracingSpan, address: ValidatedAddress, params: TradesPaginationParams, ) -> Result, ApiError> { async move { tracing::info!(address = ?address, params = ?params, "request received"); - let raindex = shared_raindex.read().await; - let ds = RaindexTradesDataSource { - client: raindex.client(), - }; - let response = get_cached_trades_by_address( - trades_by_address_cache, - &ds, - direct_trades.inner().as_ref(), - address.0, - params, - ) - .await?; - Ok(Json(response)) - } - .instrument(span.0) - .await -} - -#[utoipa::path( - get, - path = "/v1/trades/taker/{address}", - tag = "Trades", - security(("basicAuth" = [])), - params( - ("address" = String, Path, description = "Taker address"), - TradesPaginationParams, - ), - responses( - (status = 200, description = "Paginated list of market orders (taker transactions)", body = TakerTradesResponse), - (status = 400, description = "Bad request", body = ApiErrorResponse), - (status = 401, description = "Unauthorized", body = ApiErrorResponse), - (status = 429, description = "Rate limited", body = ApiErrorResponse), - (status = 500, description = "Internal server error", body = ApiErrorResponse), - ) -)] -#[get("/taker/
?")] -pub async fn get_taker_trades( - _global: GlobalRateLimit, - _key: AuthenticatedKey, - shared_raindex: &State, - trades_by_tx_cache: &State, - taker_tx_cache: &State, - direct_trades: &State>, - span: TracingSpan, - address: ValidatedAddress, - params: TradesPaginationParams, -) -> Result, ApiError> { - async move { - tracing::info!(address = ?address, params = ?params, "taker trades request received"); - let raindex = shared_raindex.read().await; - let ds = RaindexTradesDataSource { - client: raindex.client(), - }; - let response = process_get_taker_trades( - &ds, - direct_trades.inner().as_ref(), - trades_by_tx_cache, - taker_tx_cache, - address.0, - params, - ) - .await?; - Ok(Json(response)) - } - .instrument(span.0) - .await -} - -async fn fetch_trades_for_hash( - ds: &dyn TradesDataSource, - hash: B256, -) -> Result, ApiError> { - let order = match ds.find_order_by_hash(hash).await? { - Some(o) => o, - None => return Ok(vec![]), - }; - let trades = ds.get_order_trades(&order, None, None).await?; - Ok(trades.iter().map(super::order::map_trade).collect()) -} - -async fn process_trades_batch( - ds: &dyn TradesDataSource, - cache: &TradesByOrderHashCache, - direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>, - hashes: Vec, -) -> Result { - let total_start = Instant::now(); - - let mut cached_map: std::collections::HashMap> = - std::collections::HashMap::new(); - let mut uncached: Vec = Vec::new(); - - for &hash in &hashes { - if let Some(trades) = cache.get(&hash).await { - cached_map.insert(hash, trades); - } else { - uncached.push(hash); - } - } - - tracing::info!( - total_hashes = hashes.len(), - cached = cached_map.len(), - uncached = uncached.len(), - "batch trades cache check" - ); - - if !uncached.is_empty() { - if let Some(fetcher) = direct_trades { - // Fast path: single batch query via direct SQLite connection - match fetcher.batch_fetch(&uncached).await { - Ok(batch_result) => { - for &hash in &uncached { - let trades = batch_result.get(&hash).cloned().unwrap_or_default(); - cache.insert(hash, trades.clone()).await; - cached_map.insert(hash, trades); - } - } - Err(e) => { - tracing::warn!(error = %e, "direct batch trades failed; falling back to library"); - let results = - join_all(uncached.iter().map(|&hash| fetch_trades_for_hash(ds, hash))) - .await; - for (&hash, result) in uncached.iter().zip(results) { - match result { - Ok(trades) => { - cache.insert(hash, trades.clone()).await; - cached_map.insert(hash, trades); - } - Err(e) => { - tracing::warn!(order_hash = %hash, error = %e, "failed to fetch trades for order in batch"); - cached_map.insert(hash, vec![]); - } - } - } - } - } - } else { - // Fallback: N parallel queries via library - let results = - join_all(uncached.iter().map(|&hash| fetch_trades_for_hash(ds, hash))).await; - for (&hash, result) in uncached.iter().zip(results) { - match result { - Ok(trades) => { - cache.insert(hash, trades.clone()).await; - cached_map.insert(hash, trades); - } - Err(e) => { - tracing::warn!(order_hash = %hash, error = %e, "failed to fetch trades for order in batch"); - cached_map.insert(hash, vec![]); - } - } - } - } - } - - let entries = hashes - .iter() - .map(|hash| TradesBatchEntry { - order_hash: *hash, - trades: cached_map.remove(hash).unwrap_or_default(), - }) - .collect(); - - tracing::info!( - total_duration_ms = total_start.elapsed().as_millis(), - total_hashes = hashes.len(), - "batch trades request processed" - ); - - Ok(TradesBatchResponse { orders: entries }) -} - -#[utoipa::path( - post, - path = "/v1/trades/batch", - tag = "Trades", - security(("basicAuth" = [])), - request_body = TradesBatchRequest, - responses( - (status = 200, description = "Trades grouped by order hash", body = TradesBatchResponse), - (status = 400, description = "Bad request", body = ApiErrorResponse), - (status = 401, description = "Unauthorized", body = ApiErrorResponse), - (status = 429, description = "Rate limited", body = ApiErrorResponse), - (status = 500, description = "Internal server error", body = ApiErrorResponse), - ) -)] -#[post("/batch", data = "")] -pub async fn post_trades_batch( - _global: GlobalRateLimit, - _key: AuthenticatedKey, - shared_raindex: &State, - trades_by_order_hash_cache: &State, - direct_trades: &State>, - span: TracingSpan, - body: Json, -) -> Result, ApiError> { - async move { - tracing::info!( - hash_count = body.order_hashes.len(), - "batch trades request received" - ); - - if body.order_hashes.is_empty() { - return Ok(Json(TradesBatchResponse { orders: vec![] })); - } - - if body.order_hashes.len() > TRADES_BATCH_MAX_HASHES { - return Err(ApiError::BadRequest(format!( - "maximum {} order hashes per batch request", - TRADES_BATCH_MAX_HASHES - ))); - } - - let raindex = shared_raindex.read().await; - let ds = RaindexTradesDataSource { - client: raindex.client(), - }; - let response = process_trades_batch( - &ds, - trades_by_order_hash_cache, - direct_trades.inner().as_ref(), - body.into_inner().order_hashes, - ) - .await?; - Ok(Json(response)) + let _raindex = shared_raindex.read().await; + todo!() } .instrument(span.0) .await } pub fn routes() -> Vec { - rocket::routes![ - get_trades_by_tx, - get_taker_trades, - get_trades_by_address, - post_trades_batch - ] -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::routes::order::test_fixtures::{mock_order, mock_trade}; - use crate::test_helpers::{basic_auth_header, seed_api_key, TestClientBuilder}; - use rocket::http::{Header, Status}; - - struct MockTradesDataSource { - orders_result: Result<(Vec, u32), ApiError>, - trades_result: Result, ApiError>, - tx_index_state: Result, - } - - #[async_trait] - impl TradesDataSource for MockTradesDataSource { - async fn get_orders( - &self, - _filters: GetOrdersFilters, - _page: Option, - _page_size: Option, - ) -> Result<(Vec, u32), ApiError> { - self.orders_result.clone() - } - - async fn get_order_trades( - &self, - _order: &RaindexOrder, - _start_time: Option, - _end_time: Option, - ) -> Result, ApiError> { - self.trades_result.clone() - } - - async fn find_order_by_hash(&self, _hash: B256) -> Result, ApiError> { - match &self.orders_result { - Ok((orders, _)) => Ok(orders.first().cloned()), - Err(_) => Err(ApiError::Internal("failed to find order".into())), - } - } - - async fn check_tx_index_state(&self, _tx_hash: B256) -> Result { - self.tx_index_state.clone() - } - } - - fn tx_hash() -> B256 { - "0x0000000000000000000000000000000000000000000000000000000000000088" - .parse() - .unwrap() - } - - #[rocket::async_test] - async fn test_process_get_trades_by_tx_success() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![mock_order()], 1)), - trades_result: Ok(vec![mock_trade()]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - - let response = process_get_trades_by_tx(&ds, tx_hash(), None) - .await - .unwrap(); - - assert_eq!(response.trades.len(), 1); - assert_eq!(response.block_number, 100); - assert_eq!(response.timestamp, 1700001000); - assert_eq!( - response.sender.to_string(), - "0x0000000000000000000000000000000000000002" - ); - } - - #[rocket::async_test] - async fn test_process_get_trades_by_tx_not_yet_indexed() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![mock_order()], 1)), - trades_result: Ok(vec![]), - tx_index_state: Ok(TxIndexState::NotYetIndexed), - }; - - let result = process_get_trades_by_tx(&ds, tx_hash(), None).await; - assert!(matches!(result, Err(ApiError::NotYetIndexed(_)))); - } - - #[rocket::async_test] - async fn test_process_get_trades_by_address_success() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![mock_order()], 1)), - trades_result: Ok(vec![mock_trade()]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - - let response = process_get_trades_by_address( - &ds, - None, - "0x0000000000000000000000000000000000000001" - .parse() - .unwrap(), - TradesPaginationParams { - page: Some(1), - page_size: Some(20), - start_time: None, - end_time: None, - }, - ) - .await - .unwrap(); - - assert_eq!(response.trades.len(), 1); - assert_eq!(response.pagination.total_trades, 1); - assert_eq!(response.pagination.total_pages, 1); - assert!(!response.pagination.has_more); - } - - #[rocket::async_test] - async fn test_get_cached_trades_by_tx_reuses_cached_response() { - let cache = trades_by_tx_cache(); - let ds = MockTradesDataSource { - orders_result: Ok((vec![mock_order()], 1)), - trades_result: Ok(vec![mock_trade()]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - - let first = get_cached_trades_by_tx(&cache, &ds, tx_hash(), None) - .await - .unwrap(); - let second = get_cached_trades_by_tx(&cache, &ds, tx_hash(), None) - .await - .unwrap(); - - assert_eq!(first.trades.len(), 1); - assert_eq!(second.trades.len(), 1); - assert_eq!(cache.get(&tx_hash()).await.unwrap().trades.len(), 1); - } - - #[rocket::async_test] - async fn test_get_cached_trades_by_address_reuses_cached_response() { - let cache = trades_by_address_cache(); - let ds = MockTradesDataSource { - orders_result: Ok((vec![mock_order()], 1)), - trades_result: Ok(vec![mock_trade()]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - let owner: Address = "0x0000000000000000000000000000000000000001" - .parse() - .unwrap(); - let params = TradesPaginationParams { - page: Some(1), - page_size: Some(20), - start_time: None, - end_time: None, - }; - - let first = get_cached_trades_by_address(&cache, &ds, None, owner, params.clone()) - .await - .unwrap(); - let second = get_cached_trades_by_address(&cache, &ds, None, owner, params.clone()) - .await - .unwrap(); - - assert_eq!(first.trades.len(), 1); - assert_eq!(second.trades.len(), 1); - assert_eq!( - cache - .get(&(owner, 1, 20, None, None)) - .await - .unwrap() - .trades - .len(), - 1 - ); - } - - #[rocket::async_test] - async fn test_get_trades_by_tx_401_without_auth() { - let client = TestClientBuilder::new().build().await; - let response = client - .get("/v1/trades/tx/0x0000000000000000000000000000000000000000000000000000000000000088") - .dispatch() - .await; - assert_eq!(response.status(), Status::Unauthorized); - } - - #[rocket::async_test] - async fn test_get_trades_by_address_401_without_auth() { - let client = TestClientBuilder::new().build().await; - let response = client - .get("/v1/trades/0x0000000000000000000000000000000000000001") - .dispatch() - .await; - assert_eq!(response.status(), Status::Unauthorized); - } - - #[rocket::async_test] - async fn test_process_trades_batch_success() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![mock_order()], 1)), - trades_result: Ok(vec![mock_trade()]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - let cache = trades_by_order_hash_cache(); - let hash: B256 = "0x000000000000000000000000000000000000000000000000000000000000abcd" - .parse() - .unwrap(); - - let response = process_trades_batch(&ds, &cache, None, vec![hash]) - .await - .unwrap(); - - assert_eq!(response.orders.len(), 1); - assert_eq!(response.orders[0].order_hash, hash); - assert_eq!(response.orders[0].trades.len(), 1); - } - - #[rocket::async_test] - async fn test_process_trades_batch_caches_results() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![mock_order()], 1)), - trades_result: Ok(vec![mock_trade()]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - let cache = trades_by_order_hash_cache(); - let hash: B256 = "0x000000000000000000000000000000000000000000000000000000000000abcd" - .parse() - .unwrap(); - - let _ = process_trades_batch(&ds, &cache, None, vec![hash]) - .await - .unwrap(); - let cached = cache.get(&hash).await; - assert!(cached.is_some()); - assert_eq!(cached.unwrap().len(), 1); - } - - #[rocket::async_test] - async fn test_process_trades_batch_empty_hashes() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![], 0)), - trades_result: Ok(vec![]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - let cache = trades_by_order_hash_cache(); - - let response = process_trades_batch(&ds, &cache, None, vec![]) - .await - .unwrap(); - assert!(response.orders.is_empty()); - } - - #[rocket::async_test] - async fn test_process_trades_batch_order_not_found_returns_empty_trades() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![], 0)), - trades_result: Ok(vec![]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - let cache = trades_by_order_hash_cache(); - let hash: B256 = "0x0000000000000000000000000000000000000000000000000000000000001234" - .parse() - .unwrap(); - - let response = process_trades_batch(&ds, &cache, None, vec![hash]) - .await - .unwrap(); - - assert_eq!(response.orders.len(), 1); - assert!(response.orders[0].trades.is_empty()); - } - - #[rocket::async_test] - async fn test_post_trades_batch_401_without_auth() { - let client = TestClientBuilder::new().build().await; - let response = client - .post("/v1/trades/batch") - .body(r#"{"orderHashes":[]}"#) - .dispatch() - .await; - assert_eq!(response.status(), Status::Unauthorized); - } - - #[rocket::async_test] - async fn test_get_trades_by_address_invalid_address_returns_422() { - let client = TestClientBuilder::new().build().await; - let (key_id, secret) = seed_api_key(&client).await; - let header = basic_auth_header(&key_id, &secret); - let response = client - .get("/v1/trades/not-an-address") - .header(Header::new("Authorization", header)) - .dispatch() - .await; - assert_eq!(response.status(), Status::UnprocessableEntity); - } - - #[rocket::async_test] - async fn test_process_get_taker_trades_without_direct_fetcher_returns_empty() { - let ds = MockTradesDataSource { - orders_result: Ok((vec![], 0)), - trades_result: Ok(vec![]), - tx_index_state: Ok(TxIndexState::Indexed), - }; - let tx_cache = trades_by_tx_cache(); - let taker_cache = taker_trades_tx_hash_cache(); - let sender: Address = "0x0000000000000000000000000000000000000001" - .parse() - .unwrap(); - - let result = process_get_taker_trades( - &ds, - None, // no direct fetcher - &tx_cache, - &taker_cache, - sender, - TradesPaginationParams { - page: Some(1), - page_size: Some(20), - start_time: None, - end_time: None, - }, - ) - .await - .unwrap(); - - assert!(result.market_orders.is_empty()); - assert_eq!(result.pagination.total_trades, 0); - assert_eq!(result.pagination.page, 1); - assert!(!result.pagination.has_more); - } - - #[rocket::async_test] - async fn test_get_taker_trades_401_without_auth() { - let client = TestClientBuilder::new().build().await; - let response = client - .get("/v1/trades/taker/0x0000000000000000000000000000000000000001") - .dispatch() - .await; - assert_eq!(response.status(), Status::Unauthorized); - } + rocket::routes![get_trades_by_tx, get_trades_by_address] } diff --git a/src/test_helpers.rs b/src/test_helpers.rs index 0c80d77..15fcc26 100644 --- a/src/test_helpers.rs +++ b/src/test_helpers.rs @@ -57,7 +57,7 @@ impl TestClientBuilder { let shared_raindex = tokio::sync::RwLock::new(raindex_config); let docs_dir = std::env::temp_dir().to_string_lossy().into_owned(); - let rocket = crate::rocket(pool, self.rate_limiter, shared_raindex, docs_dir, None) + let rocket = crate::rocket(pool, self.rate_limiter, shared_raindex, docs_dir) .expect("valid rocket instance"); Client::tracked(rocket).await.expect("valid client") diff --git a/src/types/orders.rs b/src/types/orders.rs index 6d9936f..63151c7 100644 --- a/src/types/orders.rs +++ b/src/types/orders.rs @@ -16,9 +16,7 @@ pub struct OrdersPaginationParams { pub page_size: Option, } -#[derive( - Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, FromFormField, ToSchema, -)] +#[derive(Debug, Clone, Serialize, Deserialize, FromFormField, ToSchema)] #[serde(rename_all = "camelCase")] pub enum OrderSide { Input, diff --git a/src/types/trades.rs b/src/types/trades.rs index 78d790a..9fb2668 100644 --- a/src/types/trades.rs +++ b/src/types/trades.rs @@ -123,31 +123,3 @@ pub struct TradesByTxResponse { pub trades: Vec, pub totals: TradesTotals, } - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TradesBatchRequest { - #[schema(value_type = Vec)] - pub order_hashes: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TradesBatchEntry { - #[schema(value_type = String)] - pub order_hash: alloy::primitives::B256, - pub trades: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TradesBatchResponse { - pub orders: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] -#[serde(rename_all = "camelCase")] -pub struct TakerTradesResponse { - pub market_orders: Vec, - pub pagination: TradesPagination, -}