From 1b82157e7d2a314efa508cf3d2059550e983864e Mon Sep 17 00:00:00 2001
From: findolor <16416963+findolor@users.noreply.github.com>
Date: Wed, 13 May 2026 11:07:50 +0000
Subject: [PATCH] feat: add token filters to getTrades (#2570)
## Related Issue
- [RAI-522: Add get v1/trades token tokenAddress](https://linear.app/makeitrain/issue/RAI-522/add-get-v1tradestokentokenaddress)
## Motivation
Consumers need the SDK-level `getTrades` API to support token filtering, matching the filter shape already available on `getOrders`. The implementation should work across local DB and subgraph sources without changing the existing owner-specific or transaction-specific trade APIs.
## Solution
- Add `RaindexClient.getTrades` in `trades/get_all.rs`, keeping `trades/mod.rs` limited to module wiring and exports.
- Add general local DB `fetch_trades` and `fetch_trades_count` query wrappers with owner, order hash, orderbook, time, pagination, and input/output token filters.
- Add a separate subgraph `trades_list` query and filter types instead of repurposing owner or transaction trade queries.
- Add multi-subgraph `trades_list` and `trades_count` helpers, with all-subgraph-failure behavior aligned with existing list APIs.
- Preserve the existing `getTradesForOwner` and `getTradesByTx` paths.
- Add focused tests for local token SQL generation, count query generation, and subgraph filter mapping.
## Checks
By submitting this for review, I'm confirming I've done the following:
- [x] made this PR as small as possible
- [x] unit-tested any new functionality
- [x] linked any relevant issues or PRs
- [ ] included screenshots (if this involves a front-end change)
Additional validation run:
- `cargo fmt --all`
- `git diff --check`
- `nix develop -c cargo check -p rain_orderbook_common`
- `nix develop -c cargo test -p rain_orderbook_subgraph_client test_trades_by_transaction_no_subgraphs`
- `nix develop -c cargo test -p rain_orderbook_subgraph_client test_trades_by_transaction_with_orderbook_filter`
Note: compiling `rain_orderbook_common` tests is currently blocked by missing Foundry artifact JSON files under `lib/rain.interpreter/out/...`; this appears unrelated to this PR.
## Summary by CodeRabbit
* **New Features**
* Comprehensive trades query API with owners, order-hash, time-range filters
* Directional and combined token filtering with deterministic matching semantics
* Pagination and trade count queries; aggregated results from local DB and subgraph sources
* WASM-exported client method to fetch merged, paginated trades
* Cross-platform timing utility for elapsed-time measurement
* **Tests**
* Unit and integration tests covering ordering, token filters, pagination, counts, and multi-subgraph merging
[](https://app.coderabbit.ai/change-stack/rainlanguage/raindex/pull/2570)
---
.../src/local_db/query/fetch_trades/mod.rs | 383 +++++++++++
.../src/local_db/query/fetch_trades/query.sql | 380 +++++++++++
crates/common/src/local_db/query/mod.rs | 1 +
.../local_db/query/fetch_trades.rs | 57 ++
.../src/raindex_client/local_db/query/mod.rs | 1 +
.../src/raindex_client/trades/get_all.rs | 641 ++++++++++++++++++
.../common/src/raindex_client/trades/mod.rs | 3 +
crates/common/src/utils/mod.rs | 1 +
crates/common/src/utils/timing.rs | 37 +
crates/subgraph/src/multi_orderbook_client.rs | 583 +++++++++++++++-
crates/subgraph/src/orderbook_client/mod.rs | 2 +-
.../src/orderbook_client/order_trade.rs | 75 ++
crates/subgraph/src/types/common.rs | 74 ++
crates/subgraph/src/types/order_trade.rs | 14 +
14 files changed, 2249 insertions(+), 3 deletions(-)
create mode 100644 crates/common/src/local_db/query/fetch_trades/mod.rs
create mode 100644 crates/common/src/local_db/query/fetch_trades/query.sql
create mode 100644 crates/common/src/raindex_client/local_db/query/fetch_trades.rs
create mode 100644 crates/common/src/raindex_client/trades/get_all.rs
create mode 100644 crates/common/src/utils/timing.rs
diff --git a/crates/common/src/local_db/query/fetch_trades/mod.rs b/crates/common/src/local_db/query/fetch_trades/mod.rs
new file mode 100644
index 0000000000..0d23d8eece
--- /dev/null
+++ b/crates/common/src/local_db/query/fetch_trades/mod.rs
@@ -0,0 +1,383 @@
+use crate::local_db::query::fetch_order_trades_count::LocalDbTradeCountRow;
+use crate::local_db::query::{SqlBuildError, SqlStatement, SqlValue};
+use crate::raindex_client::{PaginationParams, TimeFilter};
+use alloy::primitives::{Address, B256};
+use std::convert::TryFrom;
+
+const QUERY_TEMPLATE: &str = include_str!("query.sql");
+
+const TAKE_ORDERS_CHAIN_IDS_CLAUSE: &str = "/*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/";
+const TAKE_ORDERS_CHAIN_IDS_CLAUSE_BODY: &str = "AND t.chain_id IN ({list})";
+const TAKE_ORDERS_ORDERBOOKS_CLAUSE: &str = "/*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/";
+const TAKE_ORDERS_ORDERBOOKS_CLAUSE_BODY: &str = "AND t.orderbook_address IN ({list})";
+
+const CLEAR_EVENTS_CHAIN_IDS_CLAUSE: &str = "/*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/";
+const CLEAR_EVENTS_CHAIN_IDS_CLAUSE_BODY: &str = "AND c.chain_id IN ({list})";
+const CLEAR_EVENTS_ORDERBOOKS_CLAUSE: &str = "/*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/";
+const CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY: &str = "AND c.orderbook_address IN ({list})";
+const OWNERS_CLAUSE: &str = "/*OWNERS_CLAUSE*/";
+const OWNERS_CLAUSE_BODY: &str = "AND tws.order_owner IN ({list})";
+const ORDER_HASH_CLAUSE: &str = "/*ORDER_HASH_CLAUSE*/";
+const ORDER_HASH_CLAUSE_BODY: &str = "AND tws.order_hash = {param}";
+const START_TS_CLAUSE: &str = "/*START_TS_CLAUSE*/";
+const START_TS_BODY: &str = "AND tws.block_timestamp >= {param}";
+const END_TS_CLAUSE: &str = "/*END_TS_CLAUSE*/";
+const END_TS_BODY: &str = "AND tws.block_timestamp <= {param}";
+const PAGINATION_CLAUSE: &str = "/*PAGINATION_CLAUSE*/";
+const INPUT_TOKENS_CLAUSE: &str = "/*INPUT_TOKENS_CLAUSE*/";
+const INPUT_TOKENS_CLAUSE_BODY: &str = "AND tws.input_token IN ({list})";
+const OUTPUT_TOKENS_CLAUSE: &str = "/*OUTPUT_TOKENS_CLAUSE*/";
+const OUTPUT_TOKENS_CLAUSE_BODY: &str = "AND tws.output_token IN ({list})";
+const COMBINED_TOKENS_CLAUSE_BODY: &str =
+ "AND (tws.input_token IN ({input_list}) OR tws.output_token IN ({output_list}))";
+
+#[derive(Debug, Clone, Default)]
+pub struct FetchTradesTokensFilter {
+ pub inputs: Vec
,
+ pub outputs: Vec,
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct FetchTradesArgs {
+ pub chain_ids: Vec,
+ pub orderbook_addresses: Vec,
+ pub owners: Vec,
+ pub order_hash: Option,
+ pub tokens: FetchTradesTokensFilter,
+ pub time_filter: TimeFilter,
+ pub pagination: PaginationParams,
+}
+
+pub fn build_fetch_trades_stmt(args: &FetchTradesArgs) -> Result {
+ let mut stmt = SqlStatement::new(QUERY_TEMPLATE);
+
+ let mut chain_ids = args.chain_ids.clone();
+ chain_ids.sort_unstable();
+ chain_ids.dedup();
+
+ let mut orderbooks = args.orderbook_addresses.clone();
+ orderbooks.sort();
+ orderbooks.dedup();
+
+ let chain_ids_iter = || chain_ids.iter().cloned().map(SqlValue::from);
+ let orderbooks_iter = || orderbooks.iter().cloned().map(SqlValue::from);
+
+ stmt.bind_list_clause(
+ TAKE_ORDERS_CHAIN_IDS_CLAUSE,
+ TAKE_ORDERS_CHAIN_IDS_CLAUSE_BODY,
+ chain_ids_iter(),
+ )?;
+ stmt.bind_list_clause(
+ CLEAR_EVENTS_CHAIN_IDS_CLAUSE,
+ CLEAR_EVENTS_CHAIN_IDS_CLAUSE_BODY,
+ chain_ids_iter(),
+ )?;
+ stmt.bind_list_clause(
+ TAKE_ORDERS_ORDERBOOKS_CLAUSE,
+ TAKE_ORDERS_ORDERBOOKS_CLAUSE_BODY,
+ orderbooks_iter(),
+ )?;
+ stmt.bind_list_clause(
+ CLEAR_EVENTS_ORDERBOOKS_CLAUSE,
+ CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY,
+ orderbooks_iter(),
+ )?;
+ let mut owners = args.owners.clone();
+ owners.sort();
+ owners.dedup();
+ stmt.bind_list_clause(
+ OWNERS_CLAUSE,
+ OWNERS_CLAUSE_BODY,
+ owners.into_iter().map(SqlValue::from),
+ )?;
+ stmt.bind_param_clause(
+ ORDER_HASH_CLAUSE,
+ ORDER_HASH_CLAUSE_BODY,
+ args.order_hash.map(SqlValue::from),
+ )?;
+
+ if let (Some(start), Some(end)) = (args.time_filter.start, args.time_filter.end) {
+ if start > end {
+ return Err(SqlBuildError::new("start_timestamp > end_timestamp"));
+ }
+ }
+ let start_param = args
+ .time_filter
+ .start
+ .map(|v| {
+ i64::try_from(v).map(SqlValue::I64).map_err(|e| {
+ SqlBuildError::new(format!(
+ "start_timestamp out of range for i64: {} ({})",
+ v, e
+ ))
+ })
+ })
+ .transpose()?;
+ stmt.bind_param_clause(START_TS_CLAUSE, START_TS_BODY, start_param)?;
+
+ let end_param = args
+ .time_filter
+ .end
+ .map(|v| {
+ i64::try_from(v).map(SqlValue::I64).map_err(|e| {
+ SqlBuildError::new(format!("end_timestamp out of range for i64: {} ({})", v, e))
+ })
+ })
+ .transpose()?;
+ stmt.bind_param_clause(END_TS_CLAUSE, END_TS_BODY, end_param)?;
+ bind_token_filters(&mut stmt, &args.tokens)?;
+ if let Some(page) = args.pagination.page {
+ let page_size = args.pagination.page_size.unwrap_or(100);
+ let offset = (page.saturating_sub(1) as u64) * (page_size as u64);
+ let limit_placeholder = format!("?{}", stmt.params.len() + 1);
+ let offset_placeholder = format!("?{}", stmt.params.len() + 2);
+ let pagination = format!("LIMIT {} OFFSET {}", limit_placeholder, offset_placeholder);
+ stmt.sql = stmt.sql.replace(PAGINATION_CLAUSE, &pagination);
+ stmt.push(SqlValue::U64(page_size as u64));
+ stmt.push(SqlValue::U64(offset));
+ } else {
+ stmt.sql = stmt.sql.replace(PAGINATION_CLAUSE, "");
+ }
+ Ok(stmt)
+}
+
+pub fn build_fetch_trades_count_stmt(
+ args: &FetchTradesArgs,
+) -> Result {
+ let mut args = args.clone();
+ args.pagination = PaginationParams::default();
+ let stmt = build_fetch_trades_stmt(&args)?;
+ let inner_sql = stmt.sql.trim().trim_end_matches(';').trim();
+ Ok(SqlStatement {
+ sql: format!(
+ "SELECT COUNT(*) AS trade_count FROM ({}) AS filtered_trades",
+ inner_sql
+ ),
+ params: stmt.params,
+ })
+}
+
+pub fn extract_trades_count(rows: &[LocalDbTradeCountRow]) -> u64 {
+ rows.first().map(|row| row.trade_count).unwrap_or(0)
+}
+
+fn bind_token_filters(
+ stmt: &mut SqlStatement,
+ tokens: &FetchTradesTokensFilter,
+) -> Result<(), SqlBuildError> {
+ let mut input_tokens = tokens.inputs.clone();
+ input_tokens.sort();
+ input_tokens.dedup();
+
+ let mut output_tokens = tokens.outputs.clone();
+ output_tokens.sort();
+ output_tokens.dedup();
+
+ let has_inputs = !input_tokens.is_empty();
+ let has_outputs = !output_tokens.is_empty();
+
+ if has_inputs && has_outputs && input_tokens == output_tokens {
+ let input_placeholders: Vec = input_tokens
+ .iter()
+ .enumerate()
+ .map(|(i, _)| format!("?{}", stmt.params.len() + i + 1))
+ .collect();
+ let input_list = input_placeholders.join(", ");
+ for token in &input_tokens {
+ stmt.push(SqlValue::from(*token));
+ }
+
+ let output_placeholders: Vec = output_tokens
+ .iter()
+ .enumerate()
+ .map(|(i, _)| format!("?{}", stmt.params.len() + i + 1))
+ .collect();
+ let output_list = output_placeholders.join(", ");
+ for token in &output_tokens {
+ stmt.push(SqlValue::from(*token));
+ }
+
+ let clause = COMBINED_TOKENS_CLAUSE_BODY
+ .replace("{input_list}", &input_list)
+ .replace("{output_list}", &output_list);
+ stmt.replace(INPUT_TOKENS_CLAUSE, &clause)?;
+ stmt.replace(OUTPUT_TOKENS_CLAUSE, "")?;
+ } else {
+ stmt.bind_list_clause(
+ INPUT_TOKENS_CLAUSE,
+ INPUT_TOKENS_CLAUSE_BODY,
+ input_tokens.into_iter().map(SqlValue::from),
+ )?;
+ stmt.bind_list_clause(
+ OUTPUT_TOKENS_CLAUSE,
+ OUTPUT_TOKENS_CLAUSE_BODY,
+ output_tokens.into_iter().map(SqlValue::from),
+ )?;
+ }
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use alloy::{hex, primitives::address};
+
+ #[test]
+ fn builds_with_chain_ids() {
+ let stmt = build_fetch_trades_stmt(&FetchTradesArgs {
+ chain_ids: vec![137, 1, 137],
+ orderbook_addresses: vec![],
+ ..Default::default()
+ })
+ .unwrap();
+ assert_eq!(stmt.params.len(), 4);
+ assert_eq!(stmt.params[0], SqlValue::U64(1));
+ assert_eq!(stmt.params[1], SqlValue::U64(137));
+ assert_eq!(stmt.params[2], SqlValue::U64(1));
+ assert_eq!(stmt.params[3], SqlValue::U64(137));
+ assert!(stmt.sql.contains("t.chain_id IN (?1, ?2)"));
+ assert!(stmt.sql.contains("c.chain_id IN (?3, ?4)"));
+ assert!(!stmt.sql.contains(TAKE_ORDERS_CHAIN_IDS_CLAUSE));
+ assert!(!stmt.sql.contains(CLEAR_EVENTS_CHAIN_IDS_CLAUSE));
+ }
+
+ #[test]
+ fn builds_with_orderbook_address_filters() {
+ let ob = address!("0x2f209e5b67a33b8fe96e28f24628df6da301c8eb");
+ let stmt = build_fetch_trades_stmt(&FetchTradesArgs {
+ chain_ids: vec![137],
+ orderbook_addresses: vec![ob],
+ ..Default::default()
+ })
+ .unwrap();
+ assert_eq!(stmt.params.len(), 4);
+ assert_eq!(stmt.params[0], SqlValue::U64(137));
+ assert_eq!(stmt.params[1], SqlValue::U64(137));
+ assert_eq!(stmt.params[2], SqlValue::Text(hex::encode_prefixed(ob)));
+ assert_eq!(stmt.params[3], SqlValue::Text(hex::encode_prefixed(ob)));
+ assert!(stmt.sql.contains("t.orderbook_address IN (?3)"));
+ assert!(stmt.sql.contains("c.orderbook_address IN (?4)"));
+ assert!(!stmt.sql.contains(TAKE_ORDERS_ORDERBOOKS_CLAUSE));
+ assert!(!stmt.sql.contains(CLEAR_EVENTS_ORDERBOOKS_CLAUSE));
+ }
+
+ #[test]
+ fn builds_with_directional_token_filters() {
+ let input = address!("0x1111111111111111111111111111111111111111");
+ let output = address!("0x2222222222222222222222222222222222222222");
+ let stmt = build_fetch_trades_stmt(&FetchTradesArgs {
+ tokens: FetchTradesTokensFilter {
+ inputs: vec![input],
+ outputs: vec![output],
+ },
+ ..Default::default()
+ })
+ .unwrap();
+
+ assert!(stmt.sql.contains("tws.input_token IN (?1)"));
+ assert!(stmt.sql.contains("tws.output_token IN (?2)"));
+ assert_eq!(stmt.params[0], SqlValue::Text(hex::encode_prefixed(input)));
+ assert_eq!(stmt.params[1], SqlValue::Text(hex::encode_prefixed(output)));
+ }
+
+ #[test]
+ fn builds_with_same_token_as_either_side_filter() {
+ let token = address!("0x1111111111111111111111111111111111111111");
+ let stmt = build_fetch_trades_stmt(&FetchTradesArgs {
+ tokens: FetchTradesTokensFilter {
+ inputs: vec![token],
+ outputs: vec![token],
+ },
+ ..Default::default()
+ })
+ .unwrap();
+
+ assert!(stmt
+ .sql
+ .contains("tws.input_token IN (?1) OR tws.output_token IN (?2)"));
+ assert!(!stmt.sql.contains(INPUT_TOKENS_CLAUSE));
+ assert!(!stmt.sql.contains(OUTPUT_TOKENS_CLAUSE));
+ assert_eq!(stmt.params[0], SqlValue::Text(hex::encode_prefixed(token)));
+ assert_eq!(stmt.params[1], SqlValue::Text(hex::encode_prefixed(token)));
+ }
+
+ #[test]
+ fn builds_with_pagination() {
+ let stmt = build_fetch_trades_stmt(&FetchTradesArgs {
+ pagination: PaginationParams {
+ page: Some(2),
+ page_size: Some(50),
+ },
+ ..Default::default()
+ })
+ .unwrap();
+
+ assert!(stmt.sql.contains("LIMIT ?1 OFFSET ?2"));
+ assert!(!stmt.sql.contains(PAGINATION_CLAUSE));
+ assert_eq!(stmt.params, vec![SqlValue::U64(50), SqlValue::U64(50)]);
+ }
+
+ #[test]
+ fn disambiguates_clear_sides_in_trade_id_and_sort_order() {
+ let stmt = build_fetch_trades_stmt(&FetchTradesArgs::default()).unwrap();
+
+ assert!(stmt.sql.contains("'alice' AS trade_side"));
+ assert!(stmt.sql.contains("'bob' AS trade_side"));
+ assert!(stmt.sql.contains("WHEN 'alice' THEN '01'"));
+ assert!(stmt.sql.contains("WHEN 'bob' THEN '02'"));
+ assert!(stmt.sql.contains("tws.trade_kind, tws.trade_side\n"));
+ }
+
+ #[test]
+ fn builds_count_query_without_pagination() {
+ let stmt = build_fetch_trades_count_stmt(&FetchTradesArgs {
+ pagination: PaginationParams {
+ page: Some(2),
+ page_size: Some(50),
+ },
+ ..Default::default()
+ })
+ .unwrap();
+
+ assert!(stmt
+ .sql
+ .starts_with("SELECT COUNT(*) AS trade_count FROM ("));
+ assert!(!stmt.sql.contains("LIMIT"));
+ assert!(!stmt.sql.contains(PAGINATION_CLAUSE));
+ }
+
+ #[cfg(not(target_family = "wasm"))]
+ #[test]
+ fn builds_token_filtered_count_query_without_inner_trailing_semicolon() {
+ let token = address!("0x1111111111111111111111111111111111111111");
+ let stmt = build_fetch_trades_count_stmt(&FetchTradesArgs {
+ tokens: FetchTradesTokensFilter {
+ inputs: vec![token],
+ outputs: vec![token],
+ },
+ ..Default::default()
+ })
+ .unwrap();
+
+ assert!(!stmt.sql.contains(";\n) AS filtered_trades"));
+ assert!(!stmt.sql.contains(";) AS filtered_trades"));
+ assert!(stmt
+ .sql
+ .contains("tws.input_token IN (?1) OR tws.output_token IN (?2)"));
+
+ let conn = rusqlite::Connection::open_in_memory().unwrap();
+ crate::local_db::functions::register_all(&conn).unwrap();
+ conn.execute_batch(crate::local_db::query::create_tables::create_tables_sql())
+ .unwrap();
+ conn.prepare(&stmt.sql).unwrap();
+ }
+
+ #[test]
+ fn extract_trades_count_works() {
+ let rows = vec![LocalDbTradeCountRow { trade_count: 42 }];
+ assert_eq!(extract_trades_count(&rows), 42);
+ let empty: Vec = vec![];
+ assert_eq!(extract_trades_count(&empty), 0);
+ }
+}
diff --git a/crates/common/src/local_db/query/fetch_trades/query.sql b/crates/common/src/local_db/query/fetch_trades/query.sql
new file mode 100644
index 0000000000..2a14cd2f8d
--- /dev/null
+++ b/crates/common/src/local_db/query/fetch_trades/query.sql
@@ -0,0 +1,380 @@
+WITH
+matching_take_orders AS (
+ SELECT
+ 'take' AS trade_kind,
+ t.chain_id,
+ t.orderbook_address,
+ t.order_owner,
+ t.order_nonce,
+ t.transaction_hash,
+ t.log_index,
+ t.block_number,
+ t.block_timestamp,
+ t.sender AS transaction_sender,
+ t.input_io_index,
+ t.output_io_index,
+ t.taker_output AS input_delta,
+ FLOAT_NEGATE(t.taker_input) AS output_delta
+ FROM take_orders t
+ WHERE 1 = 1
+ /*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/
+ /*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/
+),
+matching_clears AS (
+ SELECT
+ c.chain_id,
+ c.orderbook_address,
+ c.transaction_hash,
+ c.log_index,
+ c.block_number,
+ c.block_timestamp,
+ c.sender,
+ c.alice_order_hash,
+ c.bob_order_hash,
+ c.alice_input_io_index,
+ c.alice_output_io_index,
+ c.alice_input_vault_id,
+ c.alice_output_vault_id,
+ c.bob_input_io_index,
+ c.bob_output_io_index,
+ c.bob_input_vault_id,
+ c.bob_output_vault_id
+ FROM clear_v3_events c
+ WHERE 1 = 1
+ /*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/
+ /*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/
+),
+take_trades AS (
+ SELECT
+ mt.trade_kind,
+ 'take' AS trade_side,
+ mt.chain_id,
+ mt.orderbook_address,
+ oe.order_hash,
+ mt.order_owner,
+ mt.order_nonce,
+ mt.transaction_hash,
+ mt.log_index,
+ mt.block_number,
+ mt.block_timestamp,
+ mt.transaction_sender,
+ io_in.vault_id AS input_vault_id,
+ io_in.token AS input_token,
+ mt.input_delta,
+ io_out.vault_id AS output_vault_id,
+ io_out.token AS output_token,
+ mt.output_delta
+ FROM matching_take_orders mt
+ JOIN order_events oe
+ ON oe.chain_id = mt.chain_id
+ AND oe.orderbook_address = mt.orderbook_address
+ AND oe.order_owner = mt.order_owner
+ AND oe.order_nonce = mt.order_nonce
+ AND oe.event_type = 'AddOrderV3'
+ AND (
+ oe.block_number < mt.block_number
+ OR (oe.block_number = mt.block_number AND oe.log_index <= mt.log_index)
+ )
+ AND NOT EXISTS (
+ SELECT 1
+ FROM order_events newer
+ WHERE newer.chain_id = oe.chain_id
+ AND newer.orderbook_address = oe.orderbook_address
+ AND newer.order_owner = oe.order_owner
+ AND newer.order_nonce = oe.order_nonce
+ AND newer.event_type = 'AddOrderV3'
+ AND (
+ newer.block_number < mt.block_number
+ OR (newer.block_number = mt.block_number AND newer.log_index <= mt.log_index)
+ )
+ AND (
+ newer.block_number > oe.block_number
+ OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)
+ )
+ )
+ JOIN order_ios io_in
+ ON io_in.chain_id = oe.chain_id
+ AND io_in.orderbook_address = oe.orderbook_address
+ AND io_in.transaction_hash = oe.transaction_hash
+ AND io_in.log_index = oe.log_index
+ AND io_in.io_index = mt.input_io_index
+ AND io_in.io_type = 'input'
+ JOIN order_ios io_out
+ ON io_out.chain_id = oe.chain_id
+ AND io_out.orderbook_address = oe.orderbook_address
+ AND io_out.transaction_hash = oe.transaction_hash
+ AND io_out.log_index = oe.log_index
+ AND io_out.io_index = mt.output_io_index
+ AND io_out.io_type = 'output'
+),
+clear_alice AS (
+ SELECT DISTINCT
+ 'clear' AS trade_kind,
+ 'alice' AS trade_side,
+ mc.chain_id,
+ mc.orderbook_address,
+ oe.order_hash,
+ oe.order_owner,
+ oe.order_nonce,
+ mc.transaction_hash,
+ mc.log_index,
+ mc.block_number,
+ mc.block_timestamp,
+ mc.sender AS transaction_sender,
+ mc.alice_input_vault_id AS input_vault_id,
+ io_in.token AS input_token,
+ a.alice_input AS input_delta,
+ mc.alice_output_vault_id AS output_vault_id,
+ io_out.token AS output_token,
+ FLOAT_NEGATE(a.alice_output) AS output_delta
+ FROM matching_clears mc
+ JOIN order_events oe
+ ON oe.chain_id = mc.chain_id
+ AND oe.orderbook_address = mc.orderbook_address
+ AND oe.order_hash = mc.alice_order_hash
+ AND oe.event_type = 'AddOrderV3'
+ AND (
+ oe.block_number < mc.block_number
+ OR (oe.block_number = mc.block_number AND oe.log_index <= mc.log_index)
+ )
+ AND NOT EXISTS (
+ SELECT 1
+ FROM order_events newer
+ WHERE newer.chain_id = oe.chain_id
+ AND newer.orderbook_address = oe.orderbook_address
+ AND newer.order_hash = oe.order_hash
+ AND newer.event_type = 'AddOrderV3'
+ AND (
+ newer.block_number < mc.block_number
+ OR (newer.block_number = mc.block_number AND newer.log_index <= mc.log_index)
+ )
+ AND (
+ newer.block_number > oe.block_number
+ OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)
+ )
+ )
+ JOIN after_clear_v2_events a
+ ON a.chain_id = mc.chain_id
+ AND a.orderbook_address = mc.orderbook_address
+ AND a.transaction_hash = mc.transaction_hash
+ AND a.log_index = (
+ SELECT MIN(ac.log_index)
+ FROM after_clear_v2_events ac
+ WHERE ac.chain_id = mc.chain_id
+ AND ac.orderbook_address = mc.orderbook_address
+ AND ac.transaction_hash = mc.transaction_hash
+ AND ac.log_index > mc.log_index
+ )
+ JOIN order_ios io_in
+ ON io_in.chain_id = oe.chain_id
+ AND io_in.orderbook_address = oe.orderbook_address
+ AND io_in.transaction_hash = oe.transaction_hash
+ AND io_in.log_index = oe.log_index
+ AND io_in.io_index = mc.alice_input_io_index
+ AND io_in.io_type = 'input'
+ JOIN order_ios io_out
+ ON io_out.chain_id = oe.chain_id
+ AND io_out.orderbook_address = oe.orderbook_address
+ AND io_out.transaction_hash = oe.transaction_hash
+ AND io_out.log_index = oe.log_index
+ AND io_out.io_index = mc.alice_output_io_index
+ AND io_out.io_type = 'output'
+),
+clear_bob AS (
+ SELECT DISTINCT
+ 'clear' AS trade_kind,
+ 'bob' AS trade_side,
+ mc.chain_id,
+ mc.orderbook_address,
+ oe.order_hash,
+ oe.order_owner,
+ oe.order_nonce,
+ mc.transaction_hash,
+ mc.log_index,
+ mc.block_number,
+ mc.block_timestamp,
+ mc.sender AS transaction_sender,
+ mc.bob_input_vault_id AS input_vault_id,
+ io_in.token AS input_token,
+ a.bob_input AS input_delta,
+ mc.bob_output_vault_id AS output_vault_id,
+ io_out.token AS output_token,
+ FLOAT_NEGATE(a.bob_output) AS output_delta
+ FROM matching_clears mc
+ JOIN order_events oe
+ ON oe.chain_id = mc.chain_id
+ AND oe.orderbook_address = mc.orderbook_address
+ AND oe.order_hash = mc.bob_order_hash
+ AND oe.event_type = 'AddOrderV3'
+ AND (
+ oe.block_number < mc.block_number
+ OR (oe.block_number = mc.block_number AND oe.log_index <= mc.log_index)
+ )
+ AND NOT EXISTS (
+ SELECT 1
+ FROM order_events newer
+ WHERE newer.chain_id = oe.chain_id
+ AND newer.orderbook_address = oe.orderbook_address
+ AND newer.order_hash = oe.order_hash
+ AND newer.event_type = 'AddOrderV3'
+ AND (
+ newer.block_number < mc.block_number
+ OR (newer.block_number = mc.block_number AND newer.log_index <= mc.log_index)
+ )
+ AND (
+ newer.block_number > oe.block_number
+ OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index)
+ )
+ )
+ JOIN after_clear_v2_events a
+ ON a.chain_id = mc.chain_id
+ AND a.orderbook_address = mc.orderbook_address
+ AND a.transaction_hash = mc.transaction_hash
+ AND a.log_index = (
+ SELECT MIN(ac.log_index)
+ FROM after_clear_v2_events ac
+ WHERE ac.chain_id = mc.chain_id
+ AND ac.orderbook_address = mc.orderbook_address
+ AND ac.transaction_hash = mc.transaction_hash
+ AND ac.log_index > mc.log_index
+ )
+ JOIN order_ios io_in
+ ON io_in.chain_id = oe.chain_id
+ AND io_in.orderbook_address = oe.orderbook_address
+ AND io_in.transaction_hash = oe.transaction_hash
+ AND io_in.log_index = oe.log_index
+ AND io_in.io_index = mc.bob_input_io_index
+ AND io_in.io_type = 'input'
+ JOIN order_ios io_out
+ ON io_out.chain_id = oe.chain_id
+ AND io_out.orderbook_address = oe.orderbook_address
+ AND io_out.transaction_hash = oe.transaction_hash
+ AND io_out.log_index = oe.log_index
+ AND io_out.io_index = mc.bob_output_io_index
+ AND io_out.io_type = 'output'
+),
+clear_trades AS (
+ SELECT * FROM clear_alice
+ UNION ALL
+ SELECT * FROM clear_bob
+),
+unioned_trades AS (
+ SELECT * FROM take_trades
+ UNION ALL
+ SELECT * FROM clear_trades
+),
+trade_rows AS (
+ SELECT
+ ut.trade_kind,
+ ut.trade_side,
+ ut.chain_id,
+ ut.orderbook_address,
+ ut.order_hash,
+ ut.order_owner,
+ ut.order_nonce,
+ ut.transaction_hash,
+ ut.log_index,
+ ut.block_number,
+ ut.block_timestamp,
+ ut.transaction_sender,
+ ut.input_vault_id,
+ ut.input_token,
+ ut.input_delta,
+ ut.output_vault_id,
+ ut.output_token,
+ ut.output_delta
+ FROM unioned_trades ut
+),
+trade_with_snapshots AS (
+ SELECT
+ tr.*,
+ mvb_in.balance AS input_base_balance,
+ mvb_in.last_block AS input_base_block,
+ mvb_in.last_log_index AS input_base_log_index,
+ mvb_out.balance AS output_base_balance,
+ mvb_out.last_block AS output_base_block,
+ mvb_out.last_log_index AS output_base_log_index
+ FROM trade_rows tr
+ LEFT JOIN running_vault_balances mvb_in
+ ON mvb_in.chain_id = tr.chain_id
+ AND mvb_in.orderbook_address = tr.orderbook_address
+ AND mvb_in.owner = tr.order_owner
+ AND mvb_in.token = tr.input_token
+ AND mvb_in.vault_id = tr.input_vault_id
+ LEFT JOIN running_vault_balances mvb_out
+ ON mvb_out.chain_id = tr.chain_id
+ AND mvb_out.orderbook_address = tr.orderbook_address
+ AND mvb_out.owner = tr.order_owner
+ AND mvb_out.token = tr.output_token
+ AND mvb_out.vault_id = tr.output_vault_id
+)
+SELECT
+ tws.chain_id,
+ tws.trade_kind,
+ tws.orderbook_address AS orderbook,
+ tws.order_hash,
+ tws.order_owner,
+ tws.order_nonce,
+ tws.transaction_hash,
+ tws.log_index,
+ tws.block_number,
+ tws.block_timestamp,
+ tws.transaction_sender,
+ tws.input_vault_id,
+ tws.input_token,
+ tok_in.name AS input_token_name,
+ tok_in.symbol AS input_token_symbol,
+ tok_in.decimals AS input_token_decimals,
+ tws.input_delta,
+ vbc_input.running_balance AS input_running_balance,
+ tws.output_vault_id,
+ tws.output_token,
+ tok_out.name AS output_token_name,
+ tok_out.symbol AS output_token_symbol,
+ tok_out.decimals AS output_token_decimals,
+ tws.output_delta,
+ vbc_output.running_balance AS output_running_balance,
+ (
+ '0x' ||
+ lower(replace(tws.transaction_hash, '0x', '')) ||
+ printf('%016x', tws.log_index) ||
+ CASE tws.trade_side
+ WHEN 'alice' THEN '01'
+ WHEN 'bob' THEN '02'
+ ELSE ''
+ END
+ ) AS trade_id
+FROM trade_with_snapshots tws
+LEFT JOIN vault_balance_changes vbc_input
+ ON vbc_input.chain_id = tws.chain_id
+ AND vbc_input.orderbook_address = tws.orderbook_address
+ AND vbc_input.owner = tws.order_owner
+ AND vbc_input.token = tws.input_token
+ AND vbc_input.vault_id = tws.input_vault_id
+ AND vbc_input.block_number = tws.block_number
+ AND vbc_input.log_index = tws.log_index
+LEFT JOIN vault_balance_changes vbc_output
+ ON vbc_output.chain_id = tws.chain_id
+ AND vbc_output.orderbook_address = tws.orderbook_address
+ AND vbc_output.owner = tws.order_owner
+ AND vbc_output.token = tws.output_token
+ AND vbc_output.vault_id = tws.output_vault_id
+ AND vbc_output.block_number = tws.block_number
+ AND vbc_output.log_index = tws.log_index
+LEFT JOIN erc20_tokens tok_in
+ ON tok_in.chain_id = tws.chain_id
+ AND tok_in.orderbook_address = tws.orderbook_address
+ AND tok_in.token_address = tws.input_token
+LEFT JOIN erc20_tokens tok_out
+ ON tok_out.chain_id = tws.chain_id
+ AND tok_out.orderbook_address = tws.orderbook_address
+ AND tok_out.token_address = tws.output_token
+WHERE 1 = 1
+/*OWNERS_CLAUSE*/
+/*ORDER_HASH_CLAUSE*/
+/*START_TS_CLAUSE*/
+/*END_TS_CLAUSE*/
+/*INPUT_TOKENS_CLAUSE*/
+/*OUTPUT_TOKENS_CLAUSE*/
+ORDER BY tws.block_timestamp DESC, tws.block_number DESC, tws.log_index DESC, tws.trade_kind, tws.trade_side
+/*PAGINATION_CLAUSE*/;
diff --git a/crates/common/src/local_db/query/mod.rs b/crates/common/src/local_db/query/mod.rs
index f510f14e55..a6d51de9f5 100644
--- a/crates/common/src/local_db/query/mod.rs
+++ b/crates/common/src/local_db/query/mod.rs
@@ -19,6 +19,7 @@ pub mod fetch_owner_trades_count;
pub mod fetch_store_addresses;
pub mod fetch_tables;
pub mod fetch_target_watermark;
+pub mod fetch_trades;
pub mod fetch_trades_by_tx;
pub mod fetch_transaction_by_hash;
pub mod fetch_vault_balance_changes;
diff --git a/crates/common/src/raindex_client/local_db/query/fetch_trades.rs b/crates/common/src/raindex_client/local_db/query/fetch_trades.rs
new file mode 100644
index 0000000000..8fb2b89adc
--- /dev/null
+++ b/crates/common/src/raindex_client/local_db/query/fetch_trades.rs
@@ -0,0 +1,57 @@
+use crate::local_db::query::fetch_order_trades::LocalDbOrderTrade;
+use crate::local_db::query::fetch_order_trades_count::LocalDbTradeCountRow;
+use crate::local_db::query::fetch_trades::{
+ build_fetch_trades_count_stmt, build_fetch_trades_stmt, FetchTradesArgs,
+};
+use crate::local_db::query::{LocalDbQueryError, LocalDbQueryExecutor};
+
+pub async fn fetch_trades(
+ exec: &E,
+ args: FetchTradesArgs,
+) -> Result, LocalDbQueryError> {
+ let stmt = build_fetch_trades_stmt(&args)?;
+ exec.query_json(&stmt).await
+}
+
+pub async fn fetch_trades_count(
+ exec: &E,
+ args: FetchTradesArgs,
+) -> Result, LocalDbQueryError> {
+ let stmt = build_fetch_trades_count_stmt(&args)?;
+ exec.query_json(&stmt).await
+}
+
+#[cfg(all(test, target_family = "wasm"))]
+mod wasm_tests {
+ use super::*;
+ use crate::raindex_client::local_db::executor::tests::create_sql_capturing_callback;
+ use crate::raindex_client::local_db::executor::JsCallbackExecutor;
+ use std::cell::RefCell;
+ use std::rc::Rc;
+ use wasm_bindgen_test::*;
+ use wasm_bindgen_utils::prelude::*;
+
+ #[wasm_bindgen_test]
+ async fn wrapper_uses_builder_sql_exactly() {
+ let args = FetchTradesArgs {
+ chain_ids: vec![137, 42161],
+ orderbook_addresses: vec![],
+ ..Default::default()
+ };
+
+ let expected_stmt = build_fetch_trades_stmt(&args).unwrap();
+
+ let store = Rc::new(RefCell::new((
+ String::new(),
+ wasm_bindgen::JsValue::UNDEFINED,
+ )));
+ let callback = create_sql_capturing_callback("[]", store.clone());
+ let exec = JsCallbackExecutor::from_ref(&callback);
+
+ let res = fetch_trades(&exec, args).await;
+ assert!(res.is_ok());
+
+ let captured = store.borrow().clone();
+ assert_eq!(captured.0, expected_stmt.sql);
+ }
+}
diff --git a/crates/common/src/raindex_client/local_db/query/mod.rs b/crates/common/src/raindex_client/local_db/query/mod.rs
index 386654ab32..99659a8310 100644
--- a/crates/common/src/raindex_client/local_db/query/mod.rs
+++ b/crates/common/src/raindex_client/local_db/query/mod.rs
@@ -12,6 +12,7 @@ pub mod fetch_owner_trades;
pub mod fetch_owner_trades_count;
pub mod fetch_store_addresses;
pub mod fetch_tables;
+pub mod fetch_trades;
pub mod fetch_trades_by_tx;
pub mod fetch_transaction_by_hash;
pub mod fetch_vault_balance_changes;
diff --git a/crates/common/src/raindex_client/trades/get_all.rs b/crates/common/src/raindex_client/trades/get_all.rs
new file mode 100644
index 0000000000..f4babbca91
--- /dev/null
+++ b/crates/common/src/raindex_client/trades/get_all.rs
@@ -0,0 +1,641 @@
+use super::*;
+use crate::local_db::query::fetch_trades::{
+ extract_trades_count, FetchTradesArgs, FetchTradesTokensFilter,
+};
+use crate::raindex_client::local_db::query::fetch_trades::{fetch_trades, fetch_trades_count};
+use crate::utils::timing::Timing;
+use rain_orderbook_subgraph_client::types::common::{
+ SgBigInt, SgBytes, SgTrade, SgTradeOrderFilter, SgTradesListQueryFilters,
+ SgTradesTokensFilterArgs,
+};
+use rain_orderbook_subgraph_client::MultiOrderbookSubgraphClient;
+use tsify::Tsify;
+use wasm_bindgen_utils::impl_wasm_traits;
+
+const DEFAULT_PAGE_SIZE: u16 = 100;
+
+#[derive(Serialize, Deserialize, Debug, Clone, Tsify, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct GetTradesTokenFilter {
+ #[cfg_attr(target_family = "wasm", tsify(optional, type = "Address[]"))]
+ pub inputs: Option>,
+ #[cfg_attr(target_family = "wasm", tsify(optional, type = "Address[]"))]
+ pub outputs: Option>,
+}
+impl_wasm_traits!(GetTradesTokenFilter);
+
+#[derive(Serialize, Deserialize, Debug, Clone, Tsify, Default)]
+#[serde(rename_all = "camelCase", default)]
+pub struct GetTradesFilters {
+ #[tsify(optional, type = "Address[]")]
+ pub owners: Vec,
+ #[tsify(optional, type = "Hex")]
+ pub order_hash: Option,
+ #[tsify(optional)]
+ pub tokens: Option,
+ #[tsify(optional, type = "Address[]")]
+ pub orderbook_addresses: Option>,
+ #[tsify(optional)]
+ pub time_filter: Option,
+}
+impl_wasm_traits!(GetTradesFilters);
+
+impl From for Option {
+ fn from(filter: GetTradesTokenFilter) -> Self {
+ let inputs: Vec = filter
+ .inputs
+ .unwrap_or_default()
+ .into_iter()
+ .map(|token| token.to_string().to_lowercase())
+ .collect();
+ let outputs: Vec = filter
+ .outputs
+ .unwrap_or_default()
+ .into_iter()
+ .map(|token| token.to_string().to_lowercase())
+ .collect();
+
+ if inputs.is_empty() && outputs.is_empty() {
+ None
+ } else {
+ Some(SgTradesTokensFilterArgs { inputs, outputs })
+ }
+ }
+}
+
+impl From for SgTradesListQueryFilters {
+ fn from(filters: GetTradesFilters) -> Self {
+ let time_filter = filters.time_filter.unwrap_or_default();
+ let mut sg_filters = SgTradesListQueryFilters {
+ timestamp_gte: Some(
+ time_filter
+ .start
+ .map_or(SgBigInt("0".to_string()), |v| SgBigInt(v.to_string())),
+ ),
+ timestamp_lte: Some(
+ time_filter
+ .end
+ .map_or(SgBigInt(u64::MAX.to_string()), |v| SgBigInt(v.to_string())),
+ ),
+ orderbook_in: filters
+ .orderbook_addresses
+ .unwrap_or_default()
+ .into_iter()
+ .map(|address| address.to_string().to_lowercase())
+ .collect(),
+ ..Default::default()
+ };
+
+ if !filters.owners.is_empty() || filters.order_hash.is_some() {
+ sg_filters.order_ = Some(SgTradeOrderFilter {
+ owner_in: filters
+ .owners
+ .into_iter()
+ .map(|owner| SgBytes(owner.to_string()))
+ .collect(),
+ order_hash: filters.order_hash.map(|hash| SgBytes(hash.to_string())),
+ });
+ }
+
+ sg_filters
+ }
+}
+
+fn local_trades_pagination(
+ has_subgraph_sources: bool,
+ page_number: u16,
+ page_size: u16,
+) -> (PaginationParams, bool) {
+ if has_subgraph_sources {
+ (PaginationParams::default(), false)
+ } else {
+ (
+ PaginationParams {
+ page: Some(page_number),
+ page_size: Some(page_size),
+ },
+ true,
+ )
+ }
+}
+
+fn normalize_trade_tokens(mut tokens: SgTradesTokensFilterArgs) -> SgTradesTokensFilterArgs {
+ tokens.inputs.sort_unstable();
+ tokens.inputs.dedup();
+ tokens.outputs.sort_unstable();
+ tokens.outputs.dedup();
+ tokens
+}
+
+fn sg_trade_matches_token_filter(trade: &SgTrade, tokens: &SgTradesTokensFilterArgs) -> bool {
+ let has_inputs = !tokens.inputs.is_empty();
+ let has_outputs = !tokens.outputs.is_empty();
+ let input_token = trade
+ .input_vault_balance_change
+ .vault
+ .token
+ .address
+ .0
+ .to_lowercase();
+ let output_token = trade
+ .output_vault_balance_change
+ .vault
+ .token
+ .address
+ .0
+ .to_lowercase();
+
+ if has_inputs && has_outputs && tokens.inputs == tokens.outputs {
+ tokens.inputs.contains(&input_token) || tokens.outputs.contains(&output_token)
+ } else {
+ (!has_inputs || tokens.inputs.contains(&input_token))
+ && (!has_outputs || tokens.outputs.contains(&output_token))
+ }
+}
+
+#[wasm_export]
+impl RaindexClient {
+ #[wasm_export(
+ js_name = "getTrades",
+ return_description = "Trades list result with total count and per-pair summary",
+ unchecked_return_type = "RaindexTradesListResult",
+ preserve_js_class
+ )]
+ pub async fn get_trades(
+ &self,
+ #[wasm_export(
+ js_name = "chainIds",
+ param_description = "Specific blockchain networks to query (optional, queries all networks if not specified)"
+ )]
+ chain_ids: Option,
+ #[wasm_export(
+ param_description = "Filtering criteria including owners, order hash, token addresses, orderbooks, and time range"
+ )]
+ filters: Option,
+ #[wasm_export(param_description = "Page number for pagination (optional, defaults to 1)")]
+ page: Option,
+ #[wasm_export(
+ js_name = "pageSize",
+ param_description = "Number of items per page (optional, defaults to 100)"
+ )]
+ page_size: Option,
+ ) -> Result {
+ let route_started = Timing::now();
+ let filters = filters.unwrap_or_default();
+ let page_number = page.unwrap_or(1).max(1);
+ let page_size = page_size.unwrap_or(DEFAULT_PAGE_SIZE).max(1);
+ let ids = chain_ids.map(|ChainIds(ids)| ids);
+ let requested_chain_ids_count = ids.as_ref().map_or(0, Vec::len);
+ let (local_db, local_ids, sg_ids) = self.classify_chains(ids)?;
+ let local_chain_ids_count = local_ids.len();
+ let subgraph_chain_ids_count = sg_ids.len();
+ let input_tokens_count = filters
+ .tokens
+ .as_ref()
+ .and_then(|tokens| tokens.inputs.as_ref())
+ .map_or(0, Vec::len);
+ let output_tokens_count = filters
+ .tokens
+ .as_ref()
+ .and_then(|tokens| tokens.outputs.as_ref())
+ .map_or(0, Vec::len);
+ let orderbooks_count = filters.orderbook_addresses.as_ref().map_or(0, Vec::len);
+
+ let mut all_trades = Vec::new();
+ let mut total_count: u64 = 0;
+ let mut local_fetch_ms = None;
+ let mut local_count_ms = None;
+ let mut local_rows = 0usize;
+ let mut local_total_count = 0u64;
+ let mut local_query_paginated = false;
+ let mut subgraph_fetch_ms = None;
+ let mut subgraph_rows_before_token_filter = 0usize;
+ let mut subgraph_rows_after_token_filter = 0usize;
+
+ if let Some(db) = local_db {
+ let (local_pagination, is_local_query_paginated) =
+ local_trades_pagination(!sg_ids.is_empty(), page_number, page_size);
+ local_query_paginated = is_local_query_paginated;
+ let local_tokens = filters
+ .tokens
+ .clone()
+ .map(|tokens| FetchTradesTokensFilter {
+ inputs: tokens.inputs.unwrap_or_default(),
+ outputs: tokens.outputs.unwrap_or_default(),
+ })
+ .unwrap_or_default();
+ let local_fetch_started = Timing::now();
+ let local_trades = match fetch_trades(
+ &db,
+ FetchTradesArgs {
+ chain_ids: local_ids.clone(),
+ orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(),
+ owners: filters.owners.clone(),
+ order_hash: filters.order_hash,
+ tokens: local_tokens.clone(),
+ time_filter: filters.time_filter.clone().unwrap_or_default(),
+ pagination: local_pagination,
+ },
+ )
+ .await
+ {
+ Ok(trades) => trades,
+ Err(err) => {
+ tracing::error!(
+ elapsed_ms = local_fetch_started.elapsed_ms(),
+ local_chain_ids_count,
+ error = %err,
+ "getTrades local DB fetch failed"
+ );
+ return Err(err.into());
+ }
+ };
+ local_fetch_ms = Some(local_fetch_started.elapsed_ms());
+ local_rows = local_trades.len();
+
+ let local_count_started = Timing::now();
+ let count_rows = match fetch_trades_count(
+ &db,
+ FetchTradesArgs {
+ chain_ids: local_ids,
+ orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(),
+ owners: filters.owners.clone(),
+ order_hash: filters.order_hash,
+ tokens: local_tokens,
+ time_filter: filters.time_filter.clone().unwrap_or_default(),
+ pagination: PaginationParams::default(),
+ },
+ )
+ .await
+ {
+ Ok(count_rows) => count_rows,
+ Err(err) => {
+ tracing::error!(
+ elapsed_ms = local_count_started.elapsed_ms(),
+ local_chain_ids_count,
+ error = %err,
+ "getTrades local DB count failed"
+ );
+ return Err(err.into());
+ }
+ };
+ local_count_ms = Some(local_count_started.elapsed_ms());
+ local_total_count = extract_trades_count(&count_rows);
+ total_count += local_total_count;
+ all_trades.extend(
+ local_trades
+ .into_iter()
+ .map(RaindexTrade::try_from_local_db_trade)
+ .collect::, _>>()?,
+ );
+ tracing::debug!(
+ local_chain_ids_count,
+ rows = local_rows,
+ total_count = local_total_count,
+ fetch_ms = local_fetch_ms,
+ count_ms = local_count_ms,
+ "getTrades local DB completed"
+ );
+ }
+
+ if !sg_ids.is_empty() {
+ let multi_subgraph_args = self.get_multi_subgraph_args(Some(sg_ids))?;
+ let sg_filters: SgTradesListQueryFilters = filters.clone().into();
+ let sg_token_filter = filters
+ .tokens
+ .clone()
+ .and_then(Option::::from)
+ .map(normalize_trade_tokens);
+ let name_to_chain_id: std::collections::HashMap<&str, u32> = multi_subgraph_args
+ .iter()
+ .flat_map(|(chain_id, args)| args.iter().map(|arg| (arg.name.as_str(), *chain_id)))
+ .collect();
+ let client = MultiOrderbookSubgraphClient::new(
+ multi_subgraph_args.values().flatten().cloned().collect(),
+ );
+ let subgraph_fetch_started = Timing::now();
+ let sg_trades = match client.trades_list_all(sg_filters).await {
+ Ok(trades) => trades,
+ Err(err) => {
+ tracing::error!(
+ elapsed_ms = subgraph_fetch_started.elapsed_ms(),
+ subgraph_chain_ids_count,
+ error = %err,
+ "getTrades subgraph fetch failed"
+ );
+ return Err(err.into());
+ }
+ };
+ subgraph_fetch_ms = Some(subgraph_fetch_started.elapsed_ms());
+ subgraph_rows_before_token_filter = sg_trades.len();
+ let sg_trades: Vec<_> = if let Some(tokens) = sg_token_filter {
+ let filtered_trades: Vec<_> = sg_trades
+ .into_iter()
+ .filter(|trade| sg_trade_matches_token_filter(&trade.trade, &tokens))
+ .collect();
+ total_count += filtered_trades.len() as u64;
+ subgraph_rows_after_token_filter = filtered_trades.len();
+ filtered_trades
+ } else {
+ total_count += sg_trades.len() as u64;
+ subgraph_rows_after_token_filter = sg_trades.len();
+ sg_trades
+ };
+ for trade_with_name in sg_trades {
+ let chain_id = name_to_chain_id
+ .get(trade_with_name.subgraph_name.as_str())
+ .copied()
+ .ok_or(RaindexError::SubgraphNotFound(
+ trade_with_name.subgraph_name.clone(),
+ trade_with_name.trade.id.0.clone(),
+ ))?;
+ all_trades.push(RaindexTrade::try_from_sg_trade(
+ chain_id,
+ trade_with_name.trade,
+ )?);
+ }
+ tracing::debug!(
+ subgraph_chain_ids_count,
+ rows_before_token_filter = subgraph_rows_before_token_filter,
+ rows_after_token_filter = subgraph_rows_after_token_filter,
+ fetch_ms = subgraph_fetch_ms,
+ "getTrades subgraph completed"
+ );
+ }
+
+ let merge_started = Timing::now();
+ all_trades.sort_by(|a, b| b.timestamp.cmp(&a.timestamp).then_with(|| a.id.cmp(&b.id)));
+ let offset = if local_query_paginated && subgraph_chain_ids_count == 0 {
+ 0
+ } else {
+ ((page_number - 1) as usize) * (page_size as usize)
+ };
+ let limit = page_size as usize;
+ let merged_rows_before_pagination = all_trades.len();
+ let trades: Vec<_> = all_trades.into_iter().skip(offset).take(limit).collect();
+ let returned_rows = trades.len();
+ let summary = RaindexPairSummary::from_trades(&trades)?;
+ let merge_sort_page_ms = merge_started.elapsed_ms();
+ tracing::debug!(
+ page = page_number,
+ page_size,
+ requested_chain_ids_count,
+ local_chain_ids_count,
+ subgraph_chain_ids_count,
+ owners_count = filters.owners.len(),
+ orderbooks_count,
+ has_order_hash = filters.order_hash.is_some(),
+ has_time_filter = filters.time_filter.is_some(),
+ input_tokens_count,
+ output_tokens_count,
+ local_fetch_ms,
+ local_count_ms,
+ local_query_paginated,
+ local_rows,
+ local_total_count,
+ subgraph_fetch_ms,
+ subgraph_rows_before_token_filter,
+ subgraph_rows_after_token_filter,
+ merged_rows_before_pagination,
+ returned_rows,
+ total_count,
+ merge_sort_page_ms,
+ total_ms = route_started.elapsed_ms(),
+ "getTrades completed"
+ );
+ Ok(RaindexTradesListResult {
+ trades,
+ total_count,
+ summary: Some(summary),
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use alloy::primitives::{address, b256};
+ use rain_orderbook_subgraph_client::types::common::{
+ SgErc20, SgOrderbook, SgTradeEvent, SgTradeEventTypename, SgTradeRef,
+ SgTradeStructPartialOrder, SgTradeVaultBalanceChange, SgTransaction,
+ SgVaultBalanceChangeVault,
+ };
+
+ #[test]
+ fn local_trades_pagination_is_applied_for_local_only_queries() {
+ let (pagination, is_paginated) = local_trades_pagination(false, 2, 25);
+
+ assert!(is_paginated);
+ assert_eq!(pagination.page, Some(2));
+ assert_eq!(pagination.page_size, Some(25));
+ }
+
+ #[test]
+ fn local_trades_pagination_is_deferred_for_mixed_source_queries() {
+ let (pagination, is_paginated) = local_trades_pagination(true, 2, 25);
+
+ assert!(!is_paginated);
+ assert_eq!(pagination.page, None);
+ assert_eq!(pagination.page_size, None);
+ }
+
+ #[test]
+ fn maps_trade_filters_to_subgraph_filters() {
+ let owner = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
+ let orderbook = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
+ let order_hash =
+ b256!("0x0000000000000000000000000000000000000000000000000000000000abcdef");
+
+ let filters: SgTradesListQueryFilters = GetTradesFilters {
+ owners: vec![owner],
+ order_hash: Some(order_hash),
+ orderbook_addresses: Some(vec![orderbook]),
+ time_filter: Some(TimeFilter {
+ start: Some(10),
+ end: Some(20),
+ }),
+ ..Default::default()
+ }
+ .into();
+
+ let order_filter = filters.order_.unwrap();
+ assert_eq!(order_filter.owner_in, vec![SgBytes(owner.to_string())]);
+ assert_eq!(
+ order_filter.order_hash,
+ Some(SgBytes(order_hash.to_string()))
+ );
+ assert_eq!(filters.orderbook_in, vec![format!("{orderbook:#x}")]);
+ assert_eq!(filters.timestamp_gte, Some(SgBigInt("10".to_string())));
+ assert_eq!(filters.timestamp_lte, Some(SgBigInt("20".to_string())));
+ }
+
+ #[test]
+ fn deserializes_partial_filters_without_owners() {
+ let json = serde_json::json!({
+ "tokens": {
+ "inputs": ["0x1111111111111111111111111111111111111111"]
+ }
+ });
+
+ let filters: GetTradesFilters = serde_json::from_value(json).unwrap();
+
+ assert!(filters.owners.is_empty());
+ assert!(filters.tokens.is_some());
+ }
+
+ #[test]
+ fn maps_token_filters_to_subgraph_candidate_filter_without_unsupported_child_nesting() {
+ let owner = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
+ let orderbook = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
+ let token_a = address!("0x1111111111111111111111111111111111111111");
+ let token_b = address!("0x2222222222222222222222222222222222222222");
+ let filters: SgTradesListQueryFilters = GetTradesFilters {
+ owners: vec![owner],
+ orderbook_addresses: Some(vec![orderbook]),
+ time_filter: Some(TimeFilter {
+ start: Some(10),
+ end: Some(20),
+ }),
+ tokens: Some(GetTradesTokenFilter {
+ inputs: Some(vec![token_b, token_a, token_a]),
+ outputs: Some(vec![token_a, token_b]),
+ }),
+ ..Default::default()
+ }
+ .into();
+
+ let order_filter = filters.order_.as_ref().unwrap();
+ assert_eq!(order_filter.owner_in, vec![SgBytes(owner.to_string())]);
+ assert_eq!(filters.timestamp_gte, Some(SgBigInt("10".to_string())));
+ assert_eq!(filters.timestamp_lte, Some(SgBigInt("20".to_string())));
+ assert_eq!(filters.orderbook_in, vec![format!("{orderbook:#x}")]);
+ assert!(filters.or.is_none());
+ assert!(filters.input_vault_balance_change_.is_none());
+ assert!(filters.output_vault_balance_change_.is_none());
+ }
+
+ fn test_sg_trade(input_token: Address, output_token: Address) -> SgTrade {
+ fn erc20(token: Address) -> SgErc20 {
+ SgErc20 {
+ id: SgBytes(token.to_string()),
+ address: SgBytes(token.to_string()),
+ name: Some("Token".to_string()),
+ symbol: Some("TKN".to_string()),
+ decimals: Some(SgBigInt("18".to_string())),
+ }
+ }
+
+ fn balance_change(token: Address) -> SgTradeVaultBalanceChange {
+ SgTradeVaultBalanceChange {
+ id: SgBytes(format!("{}-change", token)),
+ __typename: "TradeVaultBalanceChange".to_string(),
+ amount: SgBytes("0x01".to_string()),
+ new_vault_balance: SgBytes("0x01".to_string()),
+ old_vault_balance: SgBytes("0x00".to_string()),
+ vault: SgVaultBalanceChangeVault {
+ id: SgBytes(format!("{}-vault", token)),
+ vault_id: SgBytes("0x01".to_string()),
+ token: erc20(token),
+ },
+ timestamp: SgBigInt("10".to_string()),
+ transaction: SgTransaction {
+ id: SgBytes(
+ "0x0000000000000000000000000000000000000000000000000000000000000001"
+ .to_string(),
+ ),
+ from: SgBytes("0x0000000000000000000000000000000000000000".to_string()),
+ block_number: SgBigInt("1".to_string()),
+ timestamp: SgBigInt("10".to_string()),
+ },
+ orderbook: SgOrderbook {
+ id: SgBytes("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()),
+ },
+ trade: SgTradeRef {
+ trade_event: SgTradeEventTypename {
+ __typename: "TakeOrder".to_string(),
+ },
+ },
+ }
+ }
+
+ SgTrade {
+ id: SgBytes(
+ "0x0000000000000000000000000000000000000000000000000000000000000001".to_string(),
+ ),
+ trade_event: SgTradeEvent {
+ transaction: SgTransaction {
+ id: SgBytes(
+ "0x0000000000000000000000000000000000000000000000000000000000000001"
+ .to_string(),
+ ),
+ from: SgBytes("0x0000000000000000000000000000000000000000".to_string()),
+ block_number: SgBigInt("1".to_string()),
+ timestamp: SgBigInt("10".to_string()),
+ },
+ sender: SgBytes("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
+ },
+ output_vault_balance_change: balance_change(output_token),
+ order: SgTradeStructPartialOrder {
+ id: SgBytes("0xorder".to_string()),
+ order_hash: SgBytes(
+ "0x0000000000000000000000000000000000000000000000000000000000000001"
+ .to_string(),
+ ),
+ owner: SgBytes("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
+ },
+ input_vault_balance_change: balance_change(input_token),
+ timestamp: SgBigInt("10".to_string()),
+ orderbook: SgOrderbook {
+ id: SgBytes("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()),
+ },
+ }
+ }
+
+ #[test]
+ fn same_token_post_filter_matches_either_input_or_output_token() {
+ let token_a = address!("0x1111111111111111111111111111111111111111");
+ let token_b = address!("0x2222222222222222222222222222222222222222");
+ let token_c = address!("0x3333333333333333333333333333333333333333");
+ let tokens = normalize_trade_tokens(SgTradesTokensFilterArgs {
+ inputs: vec![token_a.to_string(), token_b.to_string()],
+ outputs: vec![token_b.to_string(), token_a.to_string()],
+ });
+
+ assert!(sg_trade_matches_token_filter(
+ &test_sg_trade(token_a, token_c),
+ &tokens
+ ));
+ assert!(sg_trade_matches_token_filter(
+ &test_sg_trade(token_c, token_b),
+ &tokens
+ ));
+ assert!(!sg_trade_matches_token_filter(
+ &test_sg_trade(token_c, token_c),
+ &tokens
+ ));
+ }
+
+ #[test]
+ fn directional_post_filter_requires_matching_input_and_output_tokens() {
+ let input = address!("0x1111111111111111111111111111111111111111");
+ let output = address!("0x2222222222222222222222222222222222222222");
+ let other = address!("0x3333333333333333333333333333333333333333");
+ let tokens = normalize_trade_tokens(SgTradesTokensFilterArgs {
+ inputs: vec![input.to_string()],
+ outputs: vec![output.to_string()],
+ });
+
+ assert!(sg_trade_matches_token_filter(
+ &test_sg_trade(input, output),
+ &tokens
+ ));
+ assert!(!sg_trade_matches_token_filter(
+ &test_sg_trade(input, other),
+ &tokens
+ ));
+ assert!(!sg_trade_matches_token_filter(
+ &test_sg_trade(other, output),
+ &tokens
+ ));
+ }
+}
diff --git a/crates/common/src/raindex_client/trades/mod.rs b/crates/common/src/raindex_client/trades/mod.rs
index ebba11a740..f1f060affa 100644
--- a/crates/common/src/raindex_client/trades/mod.rs
+++ b/crates/common/src/raindex_client/trades/mod.rs
@@ -1,6 +1,9 @@
+mod get_all;
mod get_by_owner;
mod get_by_tx;
+pub use get_all::{GetTradesFilters, GetTradesTokenFilter};
+
use super::local_db::orders::LocalDbOrders;
use super::orders::{OrdersDataSource, SubgraphOrders};
use super::ClientRef;
diff --git a/crates/common/src/utils/mod.rs b/crates/common/src/utils/mod.rs
index 77db7faa28..1dd3ad223b 100644
--- a/crates/common/src/utils/mod.rs
+++ b/crates/common/src/utils/mod.rs
@@ -2,3 +2,4 @@ pub mod amount_formatter;
pub mod float;
pub mod serde;
pub mod timestamp;
+pub mod timing;
diff --git a/crates/common/src/utils/timing.rs b/crates/common/src/utils/timing.rs
new file mode 100644
index 0000000000..d566757155
--- /dev/null
+++ b/crates/common/src/utils/timing.rs
@@ -0,0 +1,37 @@
+#[cfg(target_family = "wasm")]
+use wasm_bindgen_utils::prelude::js_sys::Date;
+
+pub struct Timing {
+ #[cfg(not(target_family = "wasm"))]
+ started_at: std::time::Instant,
+ #[cfg(target_family = "wasm")]
+ started_at_ms: f64,
+}
+
+impl Timing {
+ pub fn now() -> Self {
+ Self {
+ #[cfg(not(target_family = "wasm"))]
+ started_at: std::time::Instant::now(),
+ #[cfg(target_family = "wasm")]
+ started_at_ms: Date::now(),
+ }
+ }
+
+ pub fn elapsed_ms(&self) -> u64 {
+ #[cfg(not(target_family = "wasm"))]
+ {
+ self.started_at.elapsed().as_millis() as u64
+ }
+
+ #[cfg(target_family = "wasm")]
+ {
+ let elapsed = Date::now() - self.started_at_ms;
+ if elapsed.is_finite() && elapsed > 0.0 {
+ elapsed as u64
+ } else {
+ 0
+ }
+ }
+ }
+}
diff --git a/crates/subgraph/src/multi_orderbook_client.rs b/crates/subgraph/src/multi_orderbook_client.rs
index 3c62667ef7..d406ed6023 100644
--- a/crates/subgraph/src/multi_orderbook_client.rs
+++ b/crates/subgraph/src/multi_orderbook_client.rs
@@ -1,7 +1,8 @@
use crate::{
types::common::{
SgErc20WithSubgraphName, SgOrderWithSubgraphName, SgOrdersListFilterArgs,
- SgTradeWithSubgraphName, SgVaultWithSubgraphName, SgVaultsListFilterArgs,
+ SgTradeWithSubgraphName, SgTradesListQueryFilters, SgVaultWithSubgraphName,
+ SgVaultsListFilterArgs,
},
OrderbookSubgraphClient, OrderbookSubgraphClientError, SgPaginationArgs,
};
@@ -21,6 +22,17 @@ impl_wasm_traits!(MultiSubgraphArgs);
pub struct MultiOrderbookSubgraphClient {
subgraphs: Vec,
}
+
+fn sort_trades(trades: &mut [SgTradeWithSubgraphName]) {
+ trades.sort_by(|a, b| {
+ let a_timestamp = a.trade.timestamp.0.parse::().unwrap_or(0);
+ let b_timestamp = b.trade.timestamp.0.parse::().unwrap_or(0);
+ b_timestamp
+ .cmp(&a_timestamp)
+ .then_with(|| a.trade.id.0.cmp(&b.trade.id.0))
+ });
+}
+
impl MultiOrderbookSubgraphClient {
pub fn new(subgraphs: Vec) -> Self {
Self { subgraphs }
@@ -237,6 +249,171 @@ impl MultiOrderbookSubgraphClient {
.collect()
}
+ /// Fetches the general filtered trades list across every configured subgraph.
+ ///
+ /// This backs the SDK-level `RaindexClient.getTrades` API. Order-specific trade
+ /// history still uses `OrderbookSubgraphClient::order_trades_list`.
+ pub async fn trades_list(
+ &self,
+ filters: SgTradesListQueryFilters,
+ pagination_args: SgPaginationArgs,
+ ) -> Result, OrderbookSubgraphClientError> {
+ let futures = self.subgraphs.iter().map(|subgraph| {
+ let url = subgraph.url.clone();
+ let subgraph_name = subgraph.name.clone();
+ let filters = filters.clone();
+ let pagination_args = pagination_args.clone();
+ async move {
+ let client = self.get_orderbook_subgraph_client(url.clone());
+ let result = client
+ .trades_list(filters, pagination_args)
+ .await
+ .map(|trades| {
+ trades
+ .into_iter()
+ .map(|trade| SgTradeWithSubgraphName {
+ trade,
+ subgraph_name: subgraph_name.clone(),
+ })
+ .collect::>()
+ });
+ (subgraph_name, url, result)
+ }
+ });
+
+ let results = join_all(futures).await;
+ let mut all_trades = Vec::new();
+ let mut last_error = None;
+ let mut any_success = false;
+ for (subgraph_name, url, result) in results {
+ match result {
+ Ok(items) => {
+ any_success = true;
+ all_trades.extend(items);
+ }
+ Err(e) => {
+ tracing::warn!(
+ subgraph = %subgraph_name,
+ url = %url,
+ error = %e,
+ "failed to fetch trades from subgraph"
+ );
+ last_error = Some(e);
+ }
+ }
+ }
+ if !any_success {
+ if let Some(e) = last_error {
+ return Err(e);
+ }
+ }
+
+ sort_trades(&mut all_trades);
+ Ok(all_trades)
+ }
+
+ /// Fetches all filtered trades across every configured subgraph.
+ ///
+ /// This is used when callers need to merge and paginate across multiple data
+ /// sources after fetching. Order-specific trade history still uses
+ /// `OrderbookSubgraphClient::order_trades_list`.
+ pub async fn trades_list_all(
+ &self,
+ filters: SgTradesListQueryFilters,
+ ) -> Result, OrderbookSubgraphClientError> {
+ let futures = self.subgraphs.iter().map(|subgraph| {
+ let url = subgraph.url.clone();
+ let subgraph_name = subgraph.name.clone();
+ let filters = filters.clone();
+ async move {
+ let client = self.get_orderbook_subgraph_client(url.clone());
+ let result = client.trades_list_all(filters).await.map(|trades| {
+ trades
+ .into_iter()
+ .map(|trade| SgTradeWithSubgraphName {
+ trade,
+ subgraph_name: subgraph_name.clone(),
+ })
+ .collect::>()
+ });
+ (subgraph_name, url, result)
+ }
+ });
+
+ let results = join_all(futures).await;
+ let mut all_trades = Vec::new();
+ let mut last_error = None;
+ let mut any_success = false;
+ for (subgraph_name, url, result) in results {
+ match result {
+ Ok(items) => {
+ any_success = true;
+ all_trades.extend(items);
+ }
+ Err(e) => {
+ tracing::warn!(
+ subgraph = %subgraph_name,
+ url = %url,
+ error = %e,
+ "failed to fetch all trades from subgraph"
+ );
+ last_error = Some(e);
+ }
+ }
+ }
+ if !any_success {
+ if let Some(e) = last_error {
+ return Err(e);
+ }
+ }
+
+ sort_trades(&mut all_trades);
+ Ok(all_trades)
+ }
+
+ pub async fn trades_count(
+ &self,
+ filters: SgTradesListQueryFilters,
+ ) -> Result {
+ let futures = self.subgraphs.iter().map(|subgraph| {
+ let url = subgraph.url.clone();
+ let subgraph_name = subgraph.name.clone();
+ let filters = filters.clone();
+ async move {
+ let client = self.get_orderbook_subgraph_client(url.clone());
+ (subgraph_name, url, client.trades_count(filters).await)
+ }
+ });
+
+ let results = join_all(futures).await;
+ let mut total: u32 = 0;
+ let mut last_error = None;
+ let mut any_success = false;
+ for (subgraph_name, url, result) in results {
+ match result {
+ Ok(count) => {
+ any_success = true;
+ total += count;
+ }
+ Err(e) => {
+ tracing::warn!(
+ subgraph = %subgraph_name,
+ url = %url,
+ error = %e,
+ "failed to count trades from subgraph"
+ );
+ last_error = Some(e);
+ }
+ }
+ }
+ if !any_success {
+ if let Some(e) = last_error {
+ return Err(e);
+ }
+ }
+ Ok(total)
+ }
+
pub async fn tokens_list(
&self,
) -> Result, OrderbookSubgraphClientError> {
@@ -280,10 +457,12 @@ impl MultiOrderbookSubgraphClient {
mod tests {
use super::*;
use crate::cynic_client::CynicClientError;
+ use crate::orderbook_client::ALL_PAGES_QUERY_PAGE_SIZE;
use crate::types::common::{
SgBigInt, SgBytes, SgErc20, SgOrder, SgOrderbook, SgOrdersListFilterArgs, SgTrade,
SgTradeEvent, SgTradeEventTypename, SgTradeRef, SgTradeStructPartialOrder,
- SgTradeVaultBalanceChange, SgTransaction, SgVault, SgVaultBalanceChangeVault,
+ SgTradeVaultBalanceChange, SgTradesListQueryFilters, SgTransaction, SgVault,
+ SgVaultBalanceChangeVault,
};
use crate::utils::float::*;
use httpmock::prelude::*;
@@ -941,6 +1120,406 @@ mod tests {
}
}
+ fn sample_sg_trade(id: &str, timestamp: &str) -> SgTrade {
+ SgTrade {
+ id: SgBytes(id.to_string()),
+ timestamp: SgBigInt(timestamp.to_string()),
+ ..default_sg_trade()
+ }
+ }
+
+ fn default_trade_filters() -> SgTradesListQueryFilters {
+ SgTradesListQueryFilters::default()
+ }
+
+ #[tokio::test]
+ async fn test_trades_list_no_subgraphs() {
+ let client = MultiOrderbookSubgraphClient::new(vec![]);
+ let trades = client
+ .trades_list(
+ default_trade_filters(),
+ SgPaginationArgs {
+ page: 1,
+ page_size: 10,
+ },
+ )
+ .await
+ .unwrap();
+ assert!(trades.is_empty());
+ }
+
+ #[tokio::test]
+ async fn test_trades_list_multiple_subgraphs_merge() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ let trade_s1 = sample_sg_trade("0xtrade_old", "100");
+ let trade_s2 = sample_sg_trade("0xtrade_new", "200");
+ server1.mock(|when, then| {
+ when.method(POST)
+ .path("/")
+ .body_contains("\"first\":10")
+ .body_contains("\"skip\":0");
+ then.status(200)
+ .json_body(json!({"data": {"trades": [trade_s1]}}));
+ });
+ server2.mock(|when, then| {
+ when.method(POST)
+ .path("/")
+ .body_contains("\"first\":10")
+ .body_contains("\"skip\":0");
+ then.status(200)
+ .json_body(json!({"data": {"trades": [trade_s2]}}));
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two".to_string(),
+ },
+ ]);
+
+ let trades = client
+ .trades_list(
+ default_trade_filters(),
+ SgPaginationArgs {
+ page: 1,
+ page_size: 10,
+ },
+ )
+ .await
+ .unwrap();
+ assert_eq!(trades.len(), 2);
+ let names: std::collections::HashSet<_> = trades
+ .iter()
+ .map(|trade| trade.subgraph_name.as_str())
+ .collect();
+ assert!(names.contains("sg_one"));
+ assert!(names.contains("sg_two"));
+ assert_eq!(trades[0].trade.id.0, "0xtrade_new");
+ assert_eq!(trades[1].trade.id.0, "0xtrade_old");
+ }
+
+ #[tokio::test]
+ async fn test_trades_list_one_subgraph_errors_others_succeed() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ let trade_s1 = default_sg_trade();
+ server1.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(200)
+ .json_body(json!({"data": {"trades": [trade_s1]}}));
+ });
+ server2.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two_err".to_string(),
+ },
+ ]);
+
+ let trades = client
+ .trades_list(
+ default_trade_filters(),
+ SgPaginationArgs {
+ page: 1,
+ page_size: 10,
+ },
+ )
+ .await
+ .unwrap();
+ assert_eq!(trades.len(), 1);
+ assert_eq!(trades[0].subgraph_name, "sg_one");
+ }
+
+ #[tokio::test]
+ async fn test_trades_list_every_subgraph_errors() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ server1.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+ server2.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one_err".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two_err".to_string(),
+ },
+ ]);
+
+ let result = client
+ .trades_list(
+ default_trade_filters(),
+ SgPaginationArgs {
+ page: 1,
+ page_size: 10,
+ },
+ )
+ .await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_trades_list_all_multiple_subgraphs_merge() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ let trade_s1 = sample_sg_trade("0xtrade_old", "100");
+ let trade_s2 = sample_sg_trade("0xtrade_new", "200");
+ server1.mock(|when, then| {
+ when.method(POST)
+ .path("/")
+ .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE))
+ .body_contains("\"skip\":0");
+ then.status(200)
+ .json_body(json!({"data": {"trades": [trade_s1]}}));
+ });
+ server2.mock(|when, then| {
+ when.method(POST)
+ .path("/")
+ .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE))
+ .body_contains("\"skip\":0");
+ then.status(200)
+ .json_body(json!({"data": {"trades": [trade_s2]}}));
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two".to_string(),
+ },
+ ]);
+
+ let trades = client
+ .trades_list_all(default_trade_filters())
+ .await
+ .unwrap();
+ assert_eq!(trades.len(), 2);
+ let names: std::collections::HashSet<_> = trades
+ .iter()
+ .map(|trade| trade.subgraph_name.as_str())
+ .collect();
+ assert!(names.contains("sg_one"));
+ assert!(names.contains("sg_two"));
+ assert_eq!(trades[0].trade.id.0, "0xtrade_new");
+ assert_eq!(trades[1].trade.id.0, "0xtrade_old");
+ }
+
+ #[tokio::test]
+ async fn test_trades_list_all_one_subgraph_errors_others_succeed() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ let trade_s1 = default_sg_trade();
+ server1.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(200)
+ .json_body(json!({"data": {"trades": [trade_s1]}}));
+ });
+ server2.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two_err".to_string(),
+ },
+ ]);
+
+ let trades = client
+ .trades_list_all(default_trade_filters())
+ .await
+ .unwrap();
+ assert_eq!(trades.len(), 1);
+ assert_eq!(trades[0].subgraph_name, "sg_one");
+ }
+
+ #[tokio::test]
+ async fn test_trades_list_all_every_subgraph_errors() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ server1.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+ server2.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one_err".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two_err".to_string(),
+ },
+ ]);
+
+ let result = client.trades_list_all(default_trade_filters()).await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_trades_count_no_subgraphs() {
+ let client = MultiOrderbookSubgraphClient::new(vec![]);
+ let count = client.trades_count(default_trade_filters()).await.unwrap();
+ assert_eq!(count, 0);
+ }
+
+ #[tokio::test]
+ async fn test_trades_count_multiple_subgraphs_sum() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ let trades_s1 = vec![default_sg_trade(), default_sg_trade()];
+ let trades_s2 = vec![default_sg_trade(), default_sg_trade(), default_sg_trade()];
+ server1.mock(|when, then| {
+ when.method(POST)
+ .path("/")
+ .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE))
+ .body_contains("\"skip\":0");
+ then.status(200)
+ .json_body(json!({"data": {"trades": trades_s1}}));
+ });
+ server2.mock(|when, then| {
+ when.method(POST)
+ .path("/")
+ .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE))
+ .body_contains("\"skip\":0");
+ then.status(200)
+ .json_body(json!({"data": {"trades": trades_s2}}));
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two".to_string(),
+ },
+ ]);
+
+ let count = client.trades_count(default_trade_filters()).await.unwrap();
+ assert_eq!(count, 5);
+ }
+
+ #[tokio::test]
+ async fn test_trades_count_one_subgraph_errors_others_succeed() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ let trades_s1 = vec![default_sg_trade(), default_sg_trade()];
+ server1.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(200)
+ .json_body(json!({"data": {"trades": trades_s1}}));
+ });
+ server2.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two_err".to_string(),
+ },
+ ]);
+
+ let count = client.trades_count(default_trade_filters()).await.unwrap();
+ assert_eq!(count, 2);
+ }
+
+ #[tokio::test]
+ async fn test_trades_count_all_subgraphs_error() {
+ let server1 = MockServer::start_async().await;
+ let sg1_url = Url::parse(&server1.url("")).unwrap();
+ let server2 = MockServer::start_async().await;
+ let sg2_url = Url::parse(&server2.url("")).unwrap();
+
+ server1.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+ server2.mock(|when, then| {
+ when.method(POST).path("/");
+ then.status(500);
+ });
+
+ let client = MultiOrderbookSubgraphClient::new(vec![
+ MultiSubgraphArgs {
+ url: sg1_url,
+ name: "sg_one_err".to_string(),
+ },
+ MultiSubgraphArgs {
+ url: sg2_url,
+ name: "sg_two_err".to_string(),
+ },
+ ]);
+
+ let result = client.trades_count(default_trade_filters()).await;
+ assert!(result.is_err());
+ }
+
#[tokio::test]
async fn test_trades_by_transaction_no_subgraphs() {
let client = MultiOrderbookSubgraphClient::new(vec![]);
diff --git a/crates/subgraph/src/orderbook_client/mod.rs b/crates/subgraph/src/orderbook_client/mod.rs
index 2a73cadd06..f93fb516a0 100644
--- a/crates/subgraph/src/orderbook_client/mod.rs
+++ b/crates/subgraph/src/orderbook_client/mod.rs
@@ -7,7 +7,7 @@ use crate::types::order::{
SgOrderDetailByHashQueryVariables, SgOrderDetailByIdQuery, SgOrderIdList, SgOrdersListQuery,
};
use crate::types::order_trade::{
- SgOrderTradeDetailQuery, SgOrderTradesListQuery, SgOwnerTradesListQuery,
+ SgOrderTradeDetailQuery, SgOrderTradesListQuery, SgOwnerTradesListQuery, SgTradesListQuery,
SgTransactionTradesQuery,
};
use crate::types::remove_order::{
diff --git a/crates/subgraph/src/orderbook_client/order_trade.rs b/crates/subgraph/src/orderbook_client/order_trade.rs
index 9e1ea41fa9..3adc44c7b0 100644
--- a/crates/subgraph/src/orderbook_client/order_trade.rs
+++ b/crates/subgraph/src/orderbook_client/order_trade.rs
@@ -116,6 +116,81 @@ impl OrderbookSubgraphClient {
Ok(data.trades)
}
+ /// Fetch a single page of trades matching the provided filters.
+ ///
+ /// The filters are passed directly to the subgraph `trades` query and
+ /// pagination is converted with `parse_pagination_args`. Results are ordered
+ /// by timestamp descending by the GraphQL query. Subgraph query errors are
+ /// returned as `OrderbookSubgraphClientError`.
+ pub async fn trades_list(
+ &self,
+ filters: SgTradesListQueryFilters,
+ pagination_args: SgPaginationArgs,
+ ) -> Result, OrderbookSubgraphClientError> {
+ let pagination_variables = Self::parse_pagination_args(pagination_args);
+ let data = self
+ .query::(SgTradesListQueryVariables {
+ first: pagination_variables.first,
+ skip: pagination_variables.skip,
+ filters: Some(filters),
+ })
+ .await?;
+
+ Ok(data.trades)
+ }
+
+ async fn fetch_all_trades_pages(
+ &self,
+ filters: SgTradesListQueryFilters,
+ ) -> Result, OrderbookSubgraphClientError> {
+ let mut all_pages_merged = vec![];
+ let mut page: u16 = 1;
+
+ loop {
+ let page_data = self
+ .trades_list(
+ filters.clone(),
+ SgPaginationArgs {
+ page,
+ page_size: ALL_PAGES_QUERY_PAGE_SIZE,
+ },
+ )
+ .await?;
+ let batch_len = page_data.len();
+ all_pages_merged.extend(page_data);
+ if batch_len < ALL_PAGES_QUERY_PAGE_SIZE as usize {
+ break;
+ }
+ page += 1;
+ }
+
+ Ok(all_pages_merged)
+ }
+
+ /// Fetch all trades matching the provided filters across every page.
+ ///
+ /// This repeatedly calls `trades_list` using the standard all-pages query page
+ /// size until a partial page is returned. Any subgraph query error encountered
+ /// while fetching a page is propagated.
+ pub async fn trades_list_all(
+ &self,
+ filters: SgTradesListQueryFilters,
+ ) -> Result, OrderbookSubgraphClientError> {
+ self.fetch_all_trades_pages(filters).await
+ }
+
+ /// Count all trades matching the provided filters.
+ ///
+ /// This walks every page with `fetch_all_trades_pages` and returns the number
+ /// of matching trades. Any subgraph query error encountered while fetching
+ /// pages is propagated.
+ pub async fn trades_count(
+ &self,
+ filters: SgTradesListQueryFilters,
+ ) -> Result {
+ Ok(self.fetch_all_trades_pages(filters).await?.len() as u32)
+ }
+
pub async fn trades_by_owner_all(
&self,
owner: String,
diff --git a/crates/subgraph/src/types/common.rs b/crates/subgraph/src/types/common.rs
index ae4fbd5c67..865592c4d2 100644
--- a/crates/subgraph/src/types/common.rs
+++ b/crates/subgraph/src/types/common.rs
@@ -10,6 +10,14 @@ pub struct SgOrdersTokensFilterArgs {
}
impl_wasm_traits!(SgOrdersTokensFilterArgs);
+#[derive(Debug, Clone, Serialize, Deserialize, Tsify, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct SgTradesTokensFilterArgs {
+ pub inputs: Vec,
+ pub outputs: Vec,
+}
+impl_wasm_traits!(SgTradesTokensFilterArgs);
+
#[derive(cynic::QueryVariables, Debug, Clone, Tsify)]
pub struct SgIdQueryVariables<'a> {
#[cfg_attr(target_family = "wasm", tsify(type = "string"))]
@@ -129,6 +137,72 @@ pub struct SgOwnerTradesQueryVariables {
pub orderbook_in: Option>,
}
+#[derive(cynic::QueryVariables, Debug, Clone, Tsify)]
+pub struct SgTradesListQueryVariables {
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub first: Option,
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub skip: Option,
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub filters: Option,
+}
+
+#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)]
+#[cynic(graphql_type = "Order_filter")]
+pub struct SgTradeOrderFilter {
+ #[cynic(rename = "owner_in", skip_serializing_if = "Vec::is_empty")]
+ pub owner_in: Vec,
+ #[cynic(rename = "orderHash", skip_serializing_if = "Option::is_none")]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub order_hash: Option,
+}
+
+#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)]
+#[cynic(graphql_type = "Vault_filter")]
+pub struct SgTradeVaultTokenFilter {
+ #[cynic(rename = "token_in", skip_serializing_if = "Vec::is_empty")]
+ pub token_in: Vec,
+}
+
+#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)]
+#[cynic(graphql_type = "TradeVaultBalanceChange_filter")]
+pub struct SgTradeVaultBalanceChangeTokenFilter {
+ #[cynic(rename = "vault_", skip_serializing_if = "Option::is_none")]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub vault_: Option,
+}
+
+#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)]
+#[cynic(graphql_type = "Trade_filter")]
+pub struct SgTradesListQueryFilters {
+ #[cynic(rename = "order_", skip_serializing_if = "Option::is_none")]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub order_: Option,
+ #[cynic(rename = "timestamp_gte", skip_serializing_if = "Option::is_none")]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub timestamp_gte: Option,
+ #[cynic(rename = "timestamp_lte", skip_serializing_if = "Option::is_none")]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub timestamp_lte: Option,
+ #[cynic(rename = "orderbook_in", skip_serializing_if = "Vec::is_empty")]
+ pub orderbook_in: Vec,
+ #[cynic(
+ rename = "inputVaultBalanceChange_",
+ skip_serializing_if = "Option::is_none"
+ )]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub input_vault_balance_change_: Option,
+ #[cynic(
+ rename = "outputVaultBalanceChange_",
+ skip_serializing_if = "Option::is_none"
+ )]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub output_vault_balance_change_: Option,
+ #[cynic(rename = "or", skip_serializing_if = "Option::is_none")]
+ #[cfg_attr(target_family = "wasm", tsify(optional))]
+ pub or: Option>,
+}
+
#[derive(cynic::QueryFragment, Debug, Serialize, Clone, Tsify)]
#[cynic(graphql_type = "Orderbook")]
pub struct SgOrderbook {
diff --git a/crates/subgraph/src/types/order_trade.rs b/crates/subgraph/src/types/order_trade.rs
index c5674c896a..691fbc8de5 100644
--- a/crates/subgraph/src/types/order_trade.rs
+++ b/crates/subgraph/src/types/order_trade.rs
@@ -53,6 +53,20 @@ pub struct SgOwnerTradesListQuery {
pub trades: Vec,
}
+#[derive(cynic::QueryFragment, Debug, Clone, Serialize)]
+#[cynic(graphql_type = "Query", variables = "SgTradesListQueryVariables")]
+#[cfg_attr(target_family = "wasm", derive(Tsify))]
+pub struct SgTradesListQuery {
+ #[arguments(
+ skip: $skip,
+ first: $first,
+ orderBy: "timestamp",
+ orderDirection: "desc",
+ where: $filters
+ )]
+ pub trades: Vec,
+}
+
#[derive(cynic::QueryFragment, Debug, Clone, Serialize)]
#[cynic(
graphql_type = "Query",