diff --git a/crates/common/src/local_db/query/create_tables/query.sql b/crates/common/src/local_db/query/create_tables/query.sql index e4cfa4882a..03131cc33e 100644 --- a/crates/common/src/local_db/query/create_tables/query.sql +++ b/crates/common/src/local_db/query/create_tables/query.sql @@ -258,9 +258,11 @@ CREATE INDEX idx_order_ios_token ON order_ios(chain_id, orderbook_address, token CREATE INDEX idx_take_orders_owner ON take_orders(chain_id, orderbook_address, order_owner); CREATE INDEX idx_take_orders_block ON take_orders(chain_id, orderbook_address, block_number); +CREATE INDEX idx_take_orders_sender_time ON take_orders(chain_id, orderbook_address, sender, block_timestamp DESC, block_number DESC, log_index DESC); CREATE INDEX idx_clear_events_alice_bob ON clear_v3_events(chain_id, orderbook_address, alice_order_hash, bob_order_hash); CREATE INDEX idx_clear_events_block ON clear_v3_events(chain_id, orderbook_address, block_number); +CREATE INDEX idx_clear_events_sender_time ON clear_v3_events(chain_id, orderbook_address, sender, block_timestamp DESC, block_number DESC, log_index DESC); CREATE INDEX idx_clear_alice_vaults ON clear_v3_events(chain_id, orderbook_address, alice_input_vault_id, alice_output_vault_id); CREATE INDEX idx_clear_bob_vaults ON clear_v3_events(chain_id, orderbook_address, bob_input_vault_id, bob_output_vault_id); diff --git a/crates/common/src/local_db/query/fetch_trades/mod.rs b/crates/common/src/local_db/query/fetch_trades/mod.rs index 0d23d8eece..eb16983de4 100644 --- a/crates/common/src/local_db/query/fetch_trades/mod.rs +++ b/crates/common/src/local_db/query/fetch_trades/mod.rs @@ -10,11 +10,15 @@ const TAKE_ORDERS_CHAIN_IDS_CLAUSE: &str = "/*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/"; const TAKE_ORDERS_CHAIN_IDS_CLAUSE_BODY: &str = "AND t.chain_id IN ({list})"; const TAKE_ORDERS_ORDERBOOKS_CLAUSE: &str = "/*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/"; const TAKE_ORDERS_ORDERBOOKS_CLAUSE_BODY: &str = "AND t.orderbook_address IN ({list})"; +const TAKE_ORDERS_TAKERS_CLAUSE: &str = "/*TAKE_ORDERS_TAKERS_CLAUSE*/"; +const TAKE_ORDERS_TAKERS_CLAUSE_BODY: &str = "AND t.sender IN ({list})"; const CLEAR_EVENTS_CHAIN_IDS_CLAUSE: &str = "/*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/"; const CLEAR_EVENTS_CHAIN_IDS_CLAUSE_BODY: &str = "AND c.chain_id IN ({list})"; const CLEAR_EVENTS_ORDERBOOKS_CLAUSE: &str = "/*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/"; const CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY: &str = "AND c.orderbook_address IN ({list})"; +const CLEAR_EVENTS_TAKERS_CLAUSE: &str = "/*CLEAR_EVENTS_TAKERS_CLAUSE*/"; +const CLEAR_EVENTS_TAKERS_CLAUSE_BODY: &str = "AND c.sender IN ({list})"; const OWNERS_CLAUSE: &str = "/*OWNERS_CLAUSE*/"; const OWNERS_CLAUSE_BODY: &str = "AND tws.order_owner IN ({list})"; const ORDER_HASH_CLAUSE: &str = "/*ORDER_HASH_CLAUSE*/"; @@ -42,6 +46,7 @@ pub struct FetchTradesArgs { pub chain_ids: Vec, pub orderbook_addresses: Vec
, pub owners: Vec
, + pub takers: Vec
, pub order_hash: Option, pub tokens: FetchTradesTokensFilter, pub time_filter: TimeFilter, @@ -82,6 +87,20 @@ pub fn build_fetch_trades_stmt(args: &FetchTradesArgs) -> Result( exec: &E, args: FetchTradesArgs, ) -> Result, LocalDbQueryError> { + let started = Timing::now(); + let chain_ids_count = args.chain_ids.len(); + let orderbooks_count = args.orderbook_addresses.len(); + let owners_count = args.owners.len(); + let takers_count = args.takers.len(); + let input_tokens_count = args.tokens.inputs.len(); + let output_tokens_count = args.tokens.outputs.len(); + let has_order_hash = args.order_hash.is_some(); + let has_time_filter = args.time_filter.start.is_some() || args.time_filter.end.is_some(); + let page = args.pagination.page; + let page_size = args.pagination.page_size; let stmt = build_fetch_trades_stmt(&args)?; - exec.query_json(&stmt).await + match exec.query_json::>(&stmt).await { + Ok(trades) => { + tracing::info!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + page, + page_size, + params_count = stmt.params().len(), + rows = trades.len(), + duration_ms = started.elapsed_ms(), + "local DB getTrades fetch completed" + ); + Ok(trades) + } + Err(err) => { + tracing::error!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + page, + page_size, + params_count = stmt.params().len(), + duration_ms = started.elapsed_ms(), + error = %err, + "local DB getTrades fetch failed" + ); + Err(err) + } + } } pub async fn fetch_trades_count( exec: &E, args: FetchTradesArgs, ) -> Result, LocalDbQueryError> { + let started = Timing::now(); + let chain_ids_count = args.chain_ids.len(); + let orderbooks_count = args.orderbook_addresses.len(); + let owners_count = args.owners.len(); + let takers_count = args.takers.len(); + let input_tokens_count = args.tokens.inputs.len(); + let output_tokens_count = args.tokens.outputs.len(); + let has_order_hash = args.order_hash.is_some(); + let has_time_filter = args.time_filter.start.is_some() || args.time_filter.end.is_some(); let stmt = build_fetch_trades_count_stmt(&args)?; - exec.query_json(&stmt).await + match exec.query_json::>(&stmt).await { + Ok(rows) => { + tracing::info!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + params_count = stmt.params().len(), + rows = rows.len(), + trade_count = rows.first().map(|row| row.trade_count), + duration_ms = started.elapsed_ms(), + "local DB getTrades count completed" + ); + Ok(rows) + } + Err(err) => { + tracing::error!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + params_count = stmt.params().len(), + duration_ms = started.elapsed_ms(), + error = %err, + "local DB getTrades count failed" + ); + Err(err) + } + } } #[cfg(all(test, target_family = "wasm"))] diff --git a/crates/common/src/raindex_client/trades/get_all.rs b/crates/common/src/raindex_client/trades/get_all.rs index f4babbca91..31ba1e1cdc 100644 --- a/crates/common/src/raindex_client/trades/get_all.rs +++ b/crates/common/src/raindex_client/trades/get_all.rs @@ -5,7 +5,7 @@ use crate::local_db::query::fetch_trades::{ use crate::raindex_client::local_db::query::fetch_trades::{fetch_trades, fetch_trades_count}; use crate::utils::timing::Timing; use rain_orderbook_subgraph_client::types::common::{ - SgBigInt, SgBytes, SgTrade, SgTradeOrderFilter, SgTradesListQueryFilters, + SgBigInt, SgBytes, SgTrade, SgTradeEventFilter, SgTradeOrderFilter, SgTradesListQueryFilters, SgTradesTokensFilterArgs, }; use rain_orderbook_subgraph_client::MultiOrderbookSubgraphClient; @@ -29,6 +29,8 @@ impl_wasm_traits!(GetTradesTokenFilter); pub struct GetTradesFilters { #[tsify(optional, type = "Address[]")] pub owners: Vec
, + #[tsify(optional, type = "Address[]")] + pub takers: Vec
, #[tsify(optional, type = "Hex")] pub order_hash: Option, #[tsify(optional)] @@ -97,6 +99,16 @@ impl From for SgTradesListQueryFilters { }); } + if !filters.takers.is_empty() { + sg_filters.trade_event_ = Some(SgTradeEventFilter { + sender_in: filters + .takers + .into_iter() + .map(|taker| SgBytes(taker.to_string())) + .collect(), + }); + } + sg_filters } } @@ -199,6 +211,7 @@ impl RaindexClient { .as_ref() .and_then(|tokens| tokens.outputs.as_ref()) .map_or(0, Vec::len); + let takers_count = filters.takers.len(); let orderbooks_count = filters.orderbook_addresses.as_ref().map_or(0, Vec::len); let mut all_trades = Vec::new(); @@ -231,6 +244,7 @@ impl RaindexClient { chain_ids: local_ids.clone(), orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(), owners: filters.owners.clone(), + takers: filters.takers.clone(), order_hash: filters.order_hash, tokens: local_tokens.clone(), time_filter: filters.time_filter.clone().unwrap_or_default(), @@ -260,6 +274,7 @@ impl RaindexClient { chain_ids: local_ids, orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(), owners: filters.owners.clone(), + takers: filters.takers.clone(), order_hash: filters.order_hash, tokens: local_tokens, time_filter: filters.time_filter.clone().unwrap_or_default(), @@ -383,6 +398,7 @@ impl RaindexClient { local_chain_ids_count, subgraph_chain_ids_count, owners_count = filters.owners.len(), + takers_count, orderbooks_count, has_order_hash = filters.order_hash.is_some(), has_time_filter = filters.time_filter.is_some(), @@ -442,12 +458,14 @@ mod tests { #[test] fn maps_trade_filters_to_subgraph_filters() { let owner = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + let taker = address!("0xcccccccccccccccccccccccccccccccccccccccc"); let orderbook = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); let order_hash = b256!("0x0000000000000000000000000000000000000000000000000000000000abcdef"); let filters: SgTradesListQueryFilters = GetTradesFilters { owners: vec![owner], + takers: vec![taker], order_hash: Some(order_hash), orderbook_addresses: Some(vec![orderbook]), time_filter: Some(TimeFilter { @@ -464,6 +482,11 @@ mod tests { order_filter.order_hash, Some(SgBytes(order_hash.to_string())) ); + let trade_event_filter = filters.trade_event_.unwrap(); + assert_eq!( + trade_event_filter.sender_in, + vec![SgBytes(taker.to_string())] + ); assert_eq!(filters.orderbook_in, vec![format!("{orderbook:#x}")]); assert_eq!(filters.timestamp_gte, Some(SgBigInt("10".to_string()))); assert_eq!(filters.timestamp_lte, Some(SgBigInt("20".to_string()))); @@ -512,6 +535,7 @@ mod tests { assert!(filters.or.is_none()); assert!(filters.input_vault_balance_change_.is_none()); assert!(filters.output_vault_balance_change_.is_none()); + assert!(filters.trade_event_.is_none()); } fn test_sg_trade(input_token: Address, output_token: Address) -> SgTrade { diff --git a/crates/subgraph/src/types/common.rs b/crates/subgraph/src/types/common.rs index 865592c4d2..6744578a49 100644 --- a/crates/subgraph/src/types/common.rs +++ b/crates/subgraph/src/types/common.rs @@ -172,6 +172,13 @@ pub struct SgTradeVaultBalanceChangeTokenFilter { pub vault_: Option, } +#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] +#[cynic(graphql_type = "TradeEvent_filter")] +pub struct SgTradeEventFilter { + #[cynic(rename = "sender_in", skip_serializing_if = "Vec::is_empty")] + pub sender_in: Vec, +} + #[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] #[cynic(graphql_type = "Trade_filter")] pub struct SgTradesListQueryFilters { @@ -198,6 +205,9 @@ pub struct SgTradesListQueryFilters { )] #[cfg_attr(target_family = "wasm", tsify(optional))] pub output_vault_balance_change_: Option, + #[cynic(rename = "tradeEvent_", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub trade_event_: Option, #[cynic(rename = "or", skip_serializing_if = "Option::is_none")] #[cfg_attr(target_family = "wasm", tsify(optional))] pub or: Option>,