From 90d483972b1ca1b3e4f80251552c4d852c030b59 Mon Sep 17 00:00:00 2001 From: findolor <16416963+findolor@users.noreply.github.com> Date: Wed, 13 May 2026 11:07:50 +0000 Subject: [PATCH] feat: add taker filters to getTrades (#2571) ## Related Issue - [RAI-529: Add taker address filters to raindex getTrades SDK](https://linear.app/makeitrain/issue/RAI-529/add-taker-address-filters-to-raindex-gettrades-sdk) ## Dependent PRs - Depends on #2570 ## Motivation Consumers need the SDK-level `getTrades` API to support filtering by taker address. The REST API work identifies the taker address as the trade event sender, which is available in the raindex local DB and subgraph trade models. For the local DB path, the taker predicate also needs to be applied before the trade reconstruction CTEs expand and hydrate rows. Filtering only after the normalized trade result forces SQLite to build a much larger intermediate result set before returning the requested page. ## Solution - Add `takers` to the existing `GetTradesFilters` SDK filter shape. - Map takers into local `FetchTradesArgs` and push the SQL predicate into the source CTEs: `take_orders.sender IN (...)` and `clear_v3_events.sender IN (...)`. - Add sender/time indexes for `take_orders` and `clear_v3_events` to support the new taker lookup path. - Add local DB `getTrades` fetch/count tracing with filter counts, parameter count, row/count results, and duration. - Add a subgraph `TradeEvent_filter` binding and map takers to `tradeEvent_: { sender_in: [...] }`. - Preserve the existing `getTrades` pagination, time, owner, order hash, orderbook, and token filter behavior. - Add focused tests for local SQL generation and subgraph filter mapping. ## Checks By submitting this for review, I confirm I have done the following: - [x] made this PR as small as possible - [x] unit-tested any new functionality - [x] linked any relevant issues or PRs - [ ] included screenshots (if this involves a front-end change) Additional validation run: - `cargo fmt --all` - `nix develop -c cargo test -p rain_orderbook_common fetch_trades` - `nix develop -c cargo test -p rain_orderbook_common -p rain_orderbook_subgraph_client` - `nix develop -c cargo test --workspace` --- .../local_db/query/create_tables/query.sql | 2 + .../src/local_db/query/fetch_trades/mod.rs | 52 +++++++++ .../src/local_db/query/fetch_trades/query.sql | 2 + .../local_db/query/fetch_trades.rs | 100 +++++++++++++++++- .../src/raindex_client/trades/get_all.rs | 26 ++++- crates/subgraph/src/types/common.rs | 10 ++ 6 files changed, 189 insertions(+), 3 deletions(-) diff --git a/crates/common/src/local_db/query/create_tables/query.sql b/crates/common/src/local_db/query/create_tables/query.sql index e4cfa4882a..03131cc33e 100644 --- a/crates/common/src/local_db/query/create_tables/query.sql +++ b/crates/common/src/local_db/query/create_tables/query.sql @@ -258,9 +258,11 @@ CREATE INDEX idx_order_ios_token ON order_ios(chain_id, orderbook_address, token CREATE INDEX idx_take_orders_owner ON take_orders(chain_id, orderbook_address, order_owner); CREATE INDEX idx_take_orders_block ON take_orders(chain_id, orderbook_address, block_number); +CREATE INDEX idx_take_orders_sender_time ON take_orders(chain_id, orderbook_address, sender, block_timestamp DESC, block_number DESC, log_index DESC); CREATE INDEX idx_clear_events_alice_bob ON clear_v3_events(chain_id, orderbook_address, alice_order_hash, bob_order_hash); CREATE INDEX idx_clear_events_block ON clear_v3_events(chain_id, orderbook_address, block_number); +CREATE INDEX idx_clear_events_sender_time ON clear_v3_events(chain_id, orderbook_address, sender, block_timestamp DESC, block_number DESC, log_index DESC); CREATE INDEX idx_clear_alice_vaults ON clear_v3_events(chain_id, orderbook_address, alice_input_vault_id, alice_output_vault_id); CREATE INDEX idx_clear_bob_vaults ON clear_v3_events(chain_id, orderbook_address, bob_input_vault_id, bob_output_vault_id); diff --git a/crates/common/src/local_db/query/fetch_trades/mod.rs b/crates/common/src/local_db/query/fetch_trades/mod.rs index 0d23d8eece..eb16983de4 100644 --- a/crates/common/src/local_db/query/fetch_trades/mod.rs +++ b/crates/common/src/local_db/query/fetch_trades/mod.rs @@ -10,11 +10,15 @@ const TAKE_ORDERS_CHAIN_IDS_CLAUSE: &str = "/*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/"; const TAKE_ORDERS_CHAIN_IDS_CLAUSE_BODY: &str = "AND t.chain_id IN ({list})"; const TAKE_ORDERS_ORDERBOOKS_CLAUSE: &str = "/*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/"; const TAKE_ORDERS_ORDERBOOKS_CLAUSE_BODY: &str = "AND t.orderbook_address IN ({list})"; +const TAKE_ORDERS_TAKERS_CLAUSE: &str = "/*TAKE_ORDERS_TAKERS_CLAUSE*/"; +const TAKE_ORDERS_TAKERS_CLAUSE_BODY: &str = "AND t.sender IN ({list})"; const CLEAR_EVENTS_CHAIN_IDS_CLAUSE: &str = "/*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/"; const CLEAR_EVENTS_CHAIN_IDS_CLAUSE_BODY: &str = "AND c.chain_id IN ({list})"; const CLEAR_EVENTS_ORDERBOOKS_CLAUSE: &str = "/*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/"; const CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY: &str = "AND c.orderbook_address IN ({list})"; +const CLEAR_EVENTS_TAKERS_CLAUSE: &str = "/*CLEAR_EVENTS_TAKERS_CLAUSE*/"; +const CLEAR_EVENTS_TAKERS_CLAUSE_BODY: &str = "AND c.sender IN ({list})"; const OWNERS_CLAUSE: &str = "/*OWNERS_CLAUSE*/"; const OWNERS_CLAUSE_BODY: &str = "AND tws.order_owner IN ({list})"; const ORDER_HASH_CLAUSE: &str = "/*ORDER_HASH_CLAUSE*/"; @@ -42,6 +46,7 @@ pub struct FetchTradesArgs { pub chain_ids: Vec, pub orderbook_addresses: Vec
, pub owners: Vec
, + pub takers: Vec
, pub order_hash: Option, pub tokens: FetchTradesTokensFilter, pub time_filter: TimeFilter, @@ -82,6 +87,20 @@ pub fn build_fetch_trades_stmt(args: &FetchTradesArgs) -> Result( exec: &E, args: FetchTradesArgs, ) -> Result, LocalDbQueryError> { + let started = Timing::now(); + let chain_ids_count = args.chain_ids.len(); + let orderbooks_count = args.orderbook_addresses.len(); + let owners_count = args.owners.len(); + let takers_count = args.takers.len(); + let input_tokens_count = args.tokens.inputs.len(); + let output_tokens_count = args.tokens.outputs.len(); + let has_order_hash = args.order_hash.is_some(); + let has_time_filter = args.time_filter.start.is_some() || args.time_filter.end.is_some(); + let page = args.pagination.page; + let page_size = args.pagination.page_size; let stmt = build_fetch_trades_stmt(&args)?; - exec.query_json(&stmt).await + match exec.query_json::>(&stmt).await { + Ok(trades) => { + tracing::info!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + page, + page_size, + params_count = stmt.params().len(), + rows = trades.len(), + duration_ms = started.elapsed_ms(), + "local DB getTrades fetch completed" + ); + Ok(trades) + } + Err(err) => { + tracing::error!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + page, + page_size, + params_count = stmt.params().len(), + duration_ms = started.elapsed_ms(), + error = %err, + "local DB getTrades fetch failed" + ); + Err(err) + } + } } pub async fn fetch_trades_count( exec: &E, args: FetchTradesArgs, ) -> Result, LocalDbQueryError> { + let started = Timing::now(); + let chain_ids_count = args.chain_ids.len(); + let orderbooks_count = args.orderbook_addresses.len(); + let owners_count = args.owners.len(); + let takers_count = args.takers.len(); + let input_tokens_count = args.tokens.inputs.len(); + let output_tokens_count = args.tokens.outputs.len(); + let has_order_hash = args.order_hash.is_some(); + let has_time_filter = args.time_filter.start.is_some() || args.time_filter.end.is_some(); let stmt = build_fetch_trades_count_stmt(&args)?; - exec.query_json(&stmt).await + match exec.query_json::>(&stmt).await { + Ok(rows) => { + tracing::info!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + params_count = stmt.params().len(), + rows = rows.len(), + trade_count = rows.first().map(|row| row.trade_count), + duration_ms = started.elapsed_ms(), + "local DB getTrades count completed" + ); + Ok(rows) + } + Err(err) => { + tracing::error!( + chain_ids_count, + orderbooks_count, + owners_count, + takers_count, + input_tokens_count, + output_tokens_count, + has_order_hash, + has_time_filter, + params_count = stmt.params().len(), + duration_ms = started.elapsed_ms(), + error = %err, + "local DB getTrades count failed" + ); + Err(err) + } + } } #[cfg(all(test, target_family = "wasm"))] diff --git a/crates/common/src/raindex_client/trades/get_all.rs b/crates/common/src/raindex_client/trades/get_all.rs index f4babbca91..31ba1e1cdc 100644 --- a/crates/common/src/raindex_client/trades/get_all.rs +++ b/crates/common/src/raindex_client/trades/get_all.rs @@ -5,7 +5,7 @@ use crate::local_db::query::fetch_trades::{ use crate::raindex_client::local_db::query::fetch_trades::{fetch_trades, fetch_trades_count}; use crate::utils::timing::Timing; use rain_orderbook_subgraph_client::types::common::{ - SgBigInt, SgBytes, SgTrade, SgTradeOrderFilter, SgTradesListQueryFilters, + SgBigInt, SgBytes, SgTrade, SgTradeEventFilter, SgTradeOrderFilter, SgTradesListQueryFilters, SgTradesTokensFilterArgs, }; use rain_orderbook_subgraph_client::MultiOrderbookSubgraphClient; @@ -29,6 +29,8 @@ impl_wasm_traits!(GetTradesTokenFilter); pub struct GetTradesFilters { #[tsify(optional, type = "Address[]")] pub owners: Vec
, + #[tsify(optional, type = "Address[]")] + pub takers: Vec
, #[tsify(optional, type = "Hex")] pub order_hash: Option, #[tsify(optional)] @@ -97,6 +99,16 @@ impl From for SgTradesListQueryFilters { }); } + if !filters.takers.is_empty() { + sg_filters.trade_event_ = Some(SgTradeEventFilter { + sender_in: filters + .takers + .into_iter() + .map(|taker| SgBytes(taker.to_string())) + .collect(), + }); + } + sg_filters } } @@ -199,6 +211,7 @@ impl RaindexClient { .as_ref() .and_then(|tokens| tokens.outputs.as_ref()) .map_or(0, Vec::len); + let takers_count = filters.takers.len(); let orderbooks_count = filters.orderbook_addresses.as_ref().map_or(0, Vec::len); let mut all_trades = Vec::new(); @@ -231,6 +244,7 @@ impl RaindexClient { chain_ids: local_ids.clone(), orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(), owners: filters.owners.clone(), + takers: filters.takers.clone(), order_hash: filters.order_hash, tokens: local_tokens.clone(), time_filter: filters.time_filter.clone().unwrap_or_default(), @@ -260,6 +274,7 @@ impl RaindexClient { chain_ids: local_ids, orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(), owners: filters.owners.clone(), + takers: filters.takers.clone(), order_hash: filters.order_hash, tokens: local_tokens, time_filter: filters.time_filter.clone().unwrap_or_default(), @@ -383,6 +398,7 @@ impl RaindexClient { local_chain_ids_count, subgraph_chain_ids_count, owners_count = filters.owners.len(), + takers_count, orderbooks_count, has_order_hash = filters.order_hash.is_some(), has_time_filter = filters.time_filter.is_some(), @@ -442,12 +458,14 @@ mod tests { #[test] fn maps_trade_filters_to_subgraph_filters() { let owner = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + let taker = address!("0xcccccccccccccccccccccccccccccccccccccccc"); let orderbook = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); let order_hash = b256!("0x0000000000000000000000000000000000000000000000000000000000abcdef"); let filters: SgTradesListQueryFilters = GetTradesFilters { owners: vec![owner], + takers: vec![taker], order_hash: Some(order_hash), orderbook_addresses: Some(vec![orderbook]), time_filter: Some(TimeFilter { @@ -464,6 +482,11 @@ mod tests { order_filter.order_hash, Some(SgBytes(order_hash.to_string())) ); + let trade_event_filter = filters.trade_event_.unwrap(); + assert_eq!( + trade_event_filter.sender_in, + vec![SgBytes(taker.to_string())] + ); assert_eq!(filters.orderbook_in, vec![format!("{orderbook:#x}")]); assert_eq!(filters.timestamp_gte, Some(SgBigInt("10".to_string()))); assert_eq!(filters.timestamp_lte, Some(SgBigInt("20".to_string()))); @@ -512,6 +535,7 @@ mod tests { assert!(filters.or.is_none()); assert!(filters.input_vault_balance_change_.is_none()); assert!(filters.output_vault_balance_change_.is_none()); + assert!(filters.trade_event_.is_none()); } fn test_sg_trade(input_token: Address, output_token: Address) -> SgTrade { diff --git a/crates/subgraph/src/types/common.rs b/crates/subgraph/src/types/common.rs index 865592c4d2..6744578a49 100644 --- a/crates/subgraph/src/types/common.rs +++ b/crates/subgraph/src/types/common.rs @@ -172,6 +172,13 @@ pub struct SgTradeVaultBalanceChangeTokenFilter { pub vault_: Option, } +#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] +#[cynic(graphql_type = "TradeEvent_filter")] +pub struct SgTradeEventFilter { + #[cynic(rename = "sender_in", skip_serializing_if = "Vec::is_empty")] + pub sender_in: Vec, +} + #[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] #[cynic(graphql_type = "Trade_filter")] pub struct SgTradesListQueryFilters { @@ -198,6 +205,9 @@ pub struct SgTradesListQueryFilters { )] #[cfg_attr(target_family = "wasm", tsify(optional))] pub output_vault_balance_change_: Option, + #[cynic(rename = "tradeEvent_", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub trade_event_: Option, #[cynic(rename = "or", skip_serializing_if = "Option::is_none")] #[cfg_attr(target_family = "wasm", tsify(optional))] pub or: Option>,