diff --git a/src/main.rs b/src/main.rs index 4b6af71..2845c51 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,6 +65,8 @@ enum StartupError { routes::admin::put_registry, routes::trades::get_trades_by_tx, routes::trades::get_trades_by_address, + routes::trades::get_taker_trades, + routes::trades::post_trades_batch, routes::registry::get_registry, routes::registry::get_registry_history, ), diff --git a/src/routes/trades/get_taker_trades.rs b/src/routes/trades/get_taker_trades.rs new file mode 100644 index 0000000..b2cb28a --- /dev/null +++ b/src/routes/trades/get_taker_trades.rs @@ -0,0 +1,143 @@ +use crate::auth::AuthenticatedKey; +use crate::cache::AppCache; +use crate::error::{ApiError, ApiErrorResponse}; +use crate::fairings::{GlobalRateLimit, TracingSpan}; +use crate::types::common::ValidatedAddress; +use crate::types::trades::{ + TakerTradesResponse, TradesByTxResponse, TradesPagination, TradesPaginationParams, +}; +use alloy::primitives::{Address, B256}; +use rocket::serde::json::Json; +use rocket::State; +use std::time::Duration; +use tracing::Instrument; + +const TAKER_TX_HASH_CACHE_TTL: Duration = Duration::from_secs(15); +const TAKER_TX_HASH_CACHE_CAPACITY: u64 = 1_000; + +pub(crate) type TakerTradesTxHashCache = AppCache>; + +pub(crate) fn taker_trades_tx_hash_cache() -> TakerTradesTxHashCache { + AppCache::new(TAKER_TX_HASH_CACHE_CAPACITY, TAKER_TX_HASH_CACHE_TTL) +} + +pub(crate) async fn process_get_taker_trades( + ds: &dyn super::TradesDataSource, + direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>, + trades_by_tx_cache: &super::TradesByTxCache, + taker_tx_cache: &TakerTradesTxHashCache, + sender: Address, + params: TradesPaginationParams, +) -> Result { + // Step 1: Get tx hashes (cached) + let tx_hashes = match direct_trades { + Some(fetcher) => taker_tx_cache + .get_or_try_insert(sender, || async { + fetcher.fetch_taker_tx_hashes(&sender).await + }) + .await + .map_err(ApiError::from)?, + None => { + tracing::warn!("direct trades fetcher unavailable; returning empty taker trades"); + return Ok(TakerTradesResponse { + market_orders: vec![], + pagination: TradesPagination { + page: 1, + page_size: params.page_size.unwrap_or(20), + total_trades: 0, + total_pages: 0, + has_more: false, + }, + }); + } + }; + + // Step 2: Paginate + let page = params.page.unwrap_or(1); + let page_size = params.page_size.unwrap_or(20); + let total = tx_hashes.len() as u64; + let total_pages = if page_size == 0 { + 0 + } else { + total.div_ceil(u64::from(page_size)) + }; + let offset = (u64::from(page.saturating_sub(1)) * u64::from(page_size)) as usize; + let page_hashes: Vec = if offset >= tx_hashes.len() { + vec![] + } else { + let end = std::cmp::min(offset + page_size as usize, tx_hashes.len()); + tx_hashes[offset..end].iter().map(|(h, _)| *h).collect() + }; + + // Step 3: Resolve each tx via existing cached trade-by-tx lookup + let mut market_orders = Vec::with_capacity(page_hashes.len()); + for tx_hash in page_hashes { + match super::get_cached_trades_by_tx(trades_by_tx_cache, ds, tx_hash, None).await { + Ok(tx_trades) => market_orders.push(tx_trades), + Err(e) => { + tracing::warn!(tx_hash = %tx_hash, error = %e, "failed to resolve taker tx; skipping"); + } + } + } + + Ok(TakerTradesResponse { + market_orders, + pagination: TradesPagination { + page, + page_size, + total_trades: total, + total_pages, + has_more: u64::from(page) < total_pages, + }, + }) +} + +#[utoipa::path( + get, + path = "/v1/trades/taker/{address}", + tag = "Trades", + security(("basicAuth" = [])), + params( + ("address" = String, Path, description = "Taker address"), + TradesPaginationParams, + ), + responses( + (status = 200, description = "Paginated list of market orders (taker transactions)", body = TakerTradesResponse), + (status = 400, description = "Bad request", body = ApiErrorResponse), + (status = 401, description = "Unauthorized", body = ApiErrorResponse), + (status = 429, description = "Rate limited", body = ApiErrorResponse), + (status = 500, description = "Internal server error", body = ApiErrorResponse), + ) +)] +#[get("/taker/
?")] +pub async fn get_taker_trades( + _global: GlobalRateLimit, + _key: AuthenticatedKey, + shared_raindex: &State, + trades_by_tx_cache: &State, + taker_tx_cache: &State, + direct_trades: &State>, + span: TracingSpan, + address: ValidatedAddress, + params: TradesPaginationParams, +) -> Result, ApiError> { + async move { + tracing::info!(address = ?address, params = ?params, "taker trades request received"); + let raindex = shared_raindex.read().await; + let ds = super::RaindexTradesDataSource { + client: raindex.client(), + }; + let response = process_get_taker_trades( + &ds, + direct_trades.inner().as_ref(), + trades_by_tx_cache, + taker_tx_cache, + address.0, + params, + ) + .await?; + Ok(Json(response)) + } + .instrument(span.0) + .await +} diff --git a/src/routes/trades/post_batch.rs b/src/routes/trades/post_batch.rs new file mode 100644 index 0000000..c0ad7f5 --- /dev/null +++ b/src/routes/trades/post_batch.rs @@ -0,0 +1,188 @@ +use crate::auth::AuthenticatedKey; +use crate::cache::AppCache; +use crate::error::{ApiError, ApiErrorResponse}; +use crate::fairings::{GlobalRateLimit, TracingSpan}; +use crate::types::order::OrderTradeEntry; +use crate::types::trades::{TradesBatchEntry, TradesBatchRequest, TradesBatchResponse}; +use alloy::primitives::B256; +use futures::future::join_all; +use rocket::serde::json::Json; +use rocket::State; +use std::time::{Duration, Instant}; +use tracing::Instrument; + +const TRADES_BY_ORDER_HASH_CACHE_TTL: Duration = Duration::from_secs(60); +const TRADES_BY_ORDER_HASH_CACHE_CAPACITY: u64 = 1_000; +const TRADES_BATCH_MAX_HASHES: usize = 50; + +pub(crate) type TradesByOrderHashCache = AppCache>; + +pub(crate) fn trades_by_order_hash_cache() -> TradesByOrderHashCache { + AppCache::new( + TRADES_BY_ORDER_HASH_CACHE_CAPACITY, + TRADES_BY_ORDER_HASH_CACHE_TTL, + ) +} + +async fn fetch_trades_for_hash( + ds: &dyn super::TradesDataSource, + hash: B256, +) -> Result, ApiError> { + let order = match ds.find_order_by_hash(hash).await? { + Some(o) => o, + None => return Ok(vec![]), + }; + let trades = ds.get_order_trades(&order, None, None).await?; + Ok(trades.iter().map(super::super::order::map_trade).collect()) +} + +pub(crate) async fn process_trades_batch( + ds: &dyn super::TradesDataSource, + cache: &TradesByOrderHashCache, + direct_trades: Option<&crate::direct_trades::DirectTradesFetcher>, + hashes: Vec, +) -> Result { + let total_start = Instant::now(); + + let mut cached_map: std::collections::HashMap> = + std::collections::HashMap::new(); + let mut uncached: Vec = Vec::new(); + + for &hash in &hashes { + if let Some(trades) = cache.get(&hash).await { + cached_map.insert(hash, trades); + } else { + uncached.push(hash); + } + } + + tracing::info!( + total_hashes = hashes.len(), + cached = cached_map.len(), + uncached = uncached.len(), + "batch trades cache check" + ); + + if !uncached.is_empty() { + if let Some(fetcher) = direct_trades { + // Fast path: single batch query via direct SQLite connection + match fetcher.batch_fetch(&uncached).await { + Ok(batch_result) => { + for &hash in &uncached { + let trades = batch_result.get(&hash).cloned().unwrap_or_default(); + cache.insert(hash, trades.clone()).await; + cached_map.insert(hash, trades); + } + } + Err(e) => { + tracing::warn!(error = %e, "direct batch trades failed; falling back to library"); + let results = + join_all(uncached.iter().map(|&hash| fetch_trades_for_hash(ds, hash))) + .await; + for (&hash, result) in uncached.iter().zip(results) { + match result { + Ok(trades) => { + cache.insert(hash, trades.clone()).await; + cached_map.insert(hash, trades); + } + Err(e) => { + tracing::warn!(order_hash = %hash, error = %e, "failed to fetch trades for order in batch"); + cached_map.insert(hash, vec![]); + } + } + } + } + } + } else { + // Fallback: N parallel queries via library + let results = + join_all(uncached.iter().map(|&hash| fetch_trades_for_hash(ds, hash))).await; + for (&hash, result) in uncached.iter().zip(results) { + match result { + Ok(trades) => { + cache.insert(hash, trades.clone()).await; + cached_map.insert(hash, trades); + } + Err(e) => { + tracing::warn!(order_hash = %hash, error = %e, "failed to fetch trades for order in batch"); + cached_map.insert(hash, vec![]); + } + } + } + } + } + + let entries = hashes + .iter() + .map(|hash| TradesBatchEntry { + order_hash: *hash, + trades: cached_map.remove(hash).unwrap_or_default(), + }) + .collect(); + + tracing::info!( + total_duration_ms = total_start.elapsed().as_millis(), + total_hashes = hashes.len(), + "batch trades request processed" + ); + + Ok(TradesBatchResponse { orders: entries }) +} + +#[utoipa::path( + post, + path = "/v1/trades/batch", + tag = "Trades", + security(("basicAuth" = [])), + request_body = TradesBatchRequest, + responses( + (status = 200, description = "Trades grouped by order hash", body = TradesBatchResponse), + (status = 400, description = "Bad request", body = ApiErrorResponse), + (status = 401, description = "Unauthorized", body = ApiErrorResponse), + (status = 429, description = "Rate limited", body = ApiErrorResponse), + (status = 500, description = "Internal server error", body = ApiErrorResponse), + ) +)] +#[post("/batch", data = "")] +pub async fn post_trades_batch( + _global: GlobalRateLimit, + _key: AuthenticatedKey, + shared_raindex: &State, + trades_by_order_hash_cache: &State, + direct_trades: &State>, + span: TracingSpan, + body: Json, +) -> Result, ApiError> { + async move { + tracing::info!( + hash_count = body.order_hashes.len(), + "batch trades request received" + ); + + if body.order_hashes.is_empty() { + return Ok(Json(TradesBatchResponse { orders: vec![] })); + } + + if body.order_hashes.len() > TRADES_BATCH_MAX_HASHES { + return Err(ApiError::BadRequest(format!( + "maximum {} order hashes per batch request", + TRADES_BATCH_MAX_HASHES + ))); + } + + let raindex = shared_raindex.read().await; + let ds = super::RaindexTradesDataSource { + client: raindex.client(), + }; + let response = process_trades_batch( + &ds, + trades_by_order_hash_cache, + direct_trades.inner().as_ref(), + body.into_inner().order_hashes, + ) + .await?; + Ok(Json(response)) + } + .instrument(span.0) + .await +} diff --git a/src/types/trades.rs b/src/types/trades.rs index 9fb2668..78d790a 100644 --- a/src/types/trades.rs +++ b/src/types/trades.rs @@ -123,3 +123,31 @@ pub struct TradesByTxResponse { pub trades: Vec, pub totals: TradesTotals, } + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TradesBatchRequest { + #[schema(value_type = Vec)] + pub order_hashes: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TradesBatchEntry { + #[schema(value_type = String)] + pub order_hash: alloy::primitives::B256, + pub trades: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TradesBatchResponse { + pub orders: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct TakerTradesResponse { + pub market_orders: Vec, + pub pagination: TradesPagination, +}