diff --git a/crates/common/src/local_db/query/fetch_trades/mod.rs b/crates/common/src/local_db/query/fetch_trades/mod.rs new file mode 100644 index 0000000000..0d23d8eece --- /dev/null +++ b/crates/common/src/local_db/query/fetch_trades/mod.rs @@ -0,0 +1,383 @@ +use crate::local_db::query::fetch_order_trades_count::LocalDbTradeCountRow; +use crate::local_db::query::{SqlBuildError, SqlStatement, SqlValue}; +use crate::raindex_client::{PaginationParams, TimeFilter}; +use alloy::primitives::{Address, B256}; +use std::convert::TryFrom; + +const QUERY_TEMPLATE: &str = include_str!("query.sql"); + +const TAKE_ORDERS_CHAIN_IDS_CLAUSE: &str = "/*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/"; +const TAKE_ORDERS_CHAIN_IDS_CLAUSE_BODY: &str = "AND t.chain_id IN ({list})"; +const TAKE_ORDERS_ORDERBOOKS_CLAUSE: &str = "/*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/"; +const TAKE_ORDERS_ORDERBOOKS_CLAUSE_BODY: &str = "AND t.orderbook_address IN ({list})"; + +const CLEAR_EVENTS_CHAIN_IDS_CLAUSE: &str = "/*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/"; +const CLEAR_EVENTS_CHAIN_IDS_CLAUSE_BODY: &str = "AND c.chain_id IN ({list})"; +const CLEAR_EVENTS_ORDERBOOKS_CLAUSE: &str = "/*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/"; +const CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY: &str = "AND c.orderbook_address IN ({list})"; +const OWNERS_CLAUSE: &str = "/*OWNERS_CLAUSE*/"; +const OWNERS_CLAUSE_BODY: &str = "AND tws.order_owner IN ({list})"; +const ORDER_HASH_CLAUSE: &str = "/*ORDER_HASH_CLAUSE*/"; +const ORDER_HASH_CLAUSE_BODY: &str = "AND tws.order_hash = {param}"; +const START_TS_CLAUSE: &str = "/*START_TS_CLAUSE*/"; +const START_TS_BODY: &str = "AND tws.block_timestamp >= {param}"; +const END_TS_CLAUSE: &str = "/*END_TS_CLAUSE*/"; +const END_TS_BODY: &str = "AND tws.block_timestamp <= {param}"; +const PAGINATION_CLAUSE: &str = "/*PAGINATION_CLAUSE*/"; +const INPUT_TOKENS_CLAUSE: &str = "/*INPUT_TOKENS_CLAUSE*/"; +const INPUT_TOKENS_CLAUSE_BODY: &str = "AND tws.input_token IN ({list})"; +const OUTPUT_TOKENS_CLAUSE: &str = "/*OUTPUT_TOKENS_CLAUSE*/"; +const OUTPUT_TOKENS_CLAUSE_BODY: &str = "AND tws.output_token IN ({list})"; +const COMBINED_TOKENS_CLAUSE_BODY: &str = + "AND (tws.input_token IN ({input_list}) OR tws.output_token IN ({output_list}))"; + +#[derive(Debug, Clone, Default)] +pub struct FetchTradesTokensFilter { + pub inputs: Vec
, + pub outputs: Vec
, +} + +#[derive(Debug, Clone, Default)] +pub struct FetchTradesArgs { + pub chain_ids: Vec, + pub orderbook_addresses: Vec
, + pub owners: Vec
, + pub order_hash: Option, + pub tokens: FetchTradesTokensFilter, + pub time_filter: TimeFilter, + pub pagination: PaginationParams, +} + +pub fn build_fetch_trades_stmt(args: &FetchTradesArgs) -> Result { + let mut stmt = SqlStatement::new(QUERY_TEMPLATE); + + let mut chain_ids = args.chain_ids.clone(); + chain_ids.sort_unstable(); + chain_ids.dedup(); + + let mut orderbooks = args.orderbook_addresses.clone(); + orderbooks.sort(); + orderbooks.dedup(); + + let chain_ids_iter = || chain_ids.iter().cloned().map(SqlValue::from); + let orderbooks_iter = || orderbooks.iter().cloned().map(SqlValue::from); + + stmt.bind_list_clause( + TAKE_ORDERS_CHAIN_IDS_CLAUSE, + TAKE_ORDERS_CHAIN_IDS_CLAUSE_BODY, + chain_ids_iter(), + )?; + stmt.bind_list_clause( + CLEAR_EVENTS_CHAIN_IDS_CLAUSE, + CLEAR_EVENTS_CHAIN_IDS_CLAUSE_BODY, + chain_ids_iter(), + )?; + stmt.bind_list_clause( + TAKE_ORDERS_ORDERBOOKS_CLAUSE, + TAKE_ORDERS_ORDERBOOKS_CLAUSE_BODY, + orderbooks_iter(), + )?; + stmt.bind_list_clause( + CLEAR_EVENTS_ORDERBOOKS_CLAUSE, + CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY, + orderbooks_iter(), + )?; + let mut owners = args.owners.clone(); + owners.sort(); + owners.dedup(); + stmt.bind_list_clause( + OWNERS_CLAUSE, + OWNERS_CLAUSE_BODY, + owners.into_iter().map(SqlValue::from), + )?; + stmt.bind_param_clause( + ORDER_HASH_CLAUSE, + ORDER_HASH_CLAUSE_BODY, + args.order_hash.map(SqlValue::from), + )?; + + if let (Some(start), Some(end)) = (args.time_filter.start, args.time_filter.end) { + if start > end { + return Err(SqlBuildError::new("start_timestamp > end_timestamp")); + } + } + let start_param = args + .time_filter + .start + .map(|v| { + i64::try_from(v).map(SqlValue::I64).map_err(|e| { + SqlBuildError::new(format!( + "start_timestamp out of range for i64: {} ({})", + v, e + )) + }) + }) + .transpose()?; + stmt.bind_param_clause(START_TS_CLAUSE, START_TS_BODY, start_param)?; + + let end_param = args + .time_filter + .end + .map(|v| { + i64::try_from(v).map(SqlValue::I64).map_err(|e| { + SqlBuildError::new(format!("end_timestamp out of range for i64: {} ({})", v, e)) + }) + }) + .transpose()?; + stmt.bind_param_clause(END_TS_CLAUSE, END_TS_BODY, end_param)?; + bind_token_filters(&mut stmt, &args.tokens)?; + if let Some(page) = args.pagination.page { + let page_size = args.pagination.page_size.unwrap_or(100); + let offset = (page.saturating_sub(1) as u64) * (page_size as u64); + let limit_placeholder = format!("?{}", stmt.params.len() + 1); + let offset_placeholder = format!("?{}", stmt.params.len() + 2); + let pagination = format!("LIMIT {} OFFSET {}", limit_placeholder, offset_placeholder); + stmt.sql = stmt.sql.replace(PAGINATION_CLAUSE, &pagination); + stmt.push(SqlValue::U64(page_size as u64)); + stmt.push(SqlValue::U64(offset)); + } else { + stmt.sql = stmt.sql.replace(PAGINATION_CLAUSE, ""); + } + Ok(stmt) +} + +pub fn build_fetch_trades_count_stmt( + args: &FetchTradesArgs, +) -> Result { + let mut args = args.clone(); + args.pagination = PaginationParams::default(); + let stmt = build_fetch_trades_stmt(&args)?; + let inner_sql = stmt.sql.trim().trim_end_matches(';').trim(); + Ok(SqlStatement { + sql: format!( + "SELECT COUNT(*) AS trade_count FROM ({}) AS filtered_trades", + inner_sql + ), + params: stmt.params, + }) +} + +pub fn extract_trades_count(rows: &[LocalDbTradeCountRow]) -> u64 { + rows.first().map(|row| row.trade_count).unwrap_or(0) +} + +fn bind_token_filters( + stmt: &mut SqlStatement, + tokens: &FetchTradesTokensFilter, +) -> Result<(), SqlBuildError> { + let mut input_tokens = tokens.inputs.clone(); + input_tokens.sort(); + input_tokens.dedup(); + + let mut output_tokens = tokens.outputs.clone(); + output_tokens.sort(); + output_tokens.dedup(); + + let has_inputs = !input_tokens.is_empty(); + let has_outputs = !output_tokens.is_empty(); + + if has_inputs && has_outputs && input_tokens == output_tokens { + let input_placeholders: Vec = input_tokens + .iter() + .enumerate() + .map(|(i, _)| format!("?{}", stmt.params.len() + i + 1)) + .collect(); + let input_list = input_placeholders.join(", "); + for token in &input_tokens { + stmt.push(SqlValue::from(*token)); + } + + let output_placeholders: Vec = output_tokens + .iter() + .enumerate() + .map(|(i, _)| format!("?{}", stmt.params.len() + i + 1)) + .collect(); + let output_list = output_placeholders.join(", "); + for token in &output_tokens { + stmt.push(SqlValue::from(*token)); + } + + let clause = COMBINED_TOKENS_CLAUSE_BODY + .replace("{input_list}", &input_list) + .replace("{output_list}", &output_list); + stmt.replace(INPUT_TOKENS_CLAUSE, &clause)?; + stmt.replace(OUTPUT_TOKENS_CLAUSE, "")?; + } else { + stmt.bind_list_clause( + INPUT_TOKENS_CLAUSE, + INPUT_TOKENS_CLAUSE_BODY, + input_tokens.into_iter().map(SqlValue::from), + )?; + stmt.bind_list_clause( + OUTPUT_TOKENS_CLAUSE, + OUTPUT_TOKENS_CLAUSE_BODY, + output_tokens.into_iter().map(SqlValue::from), + )?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::{hex, primitives::address}; + + #[test] + fn builds_with_chain_ids() { + let stmt = build_fetch_trades_stmt(&FetchTradesArgs { + chain_ids: vec![137, 1, 137], + orderbook_addresses: vec![], + ..Default::default() + }) + .unwrap(); + assert_eq!(stmt.params.len(), 4); + assert_eq!(stmt.params[0], SqlValue::U64(1)); + assert_eq!(stmt.params[1], SqlValue::U64(137)); + assert_eq!(stmt.params[2], SqlValue::U64(1)); + assert_eq!(stmt.params[3], SqlValue::U64(137)); + assert!(stmt.sql.contains("t.chain_id IN (?1, ?2)")); + assert!(stmt.sql.contains("c.chain_id IN (?3, ?4)")); + assert!(!stmt.sql.contains(TAKE_ORDERS_CHAIN_IDS_CLAUSE)); + assert!(!stmt.sql.contains(CLEAR_EVENTS_CHAIN_IDS_CLAUSE)); + } + + #[test] + fn builds_with_orderbook_address_filters() { + let ob = address!("0x2f209e5b67a33b8fe96e28f24628df6da301c8eb"); + let stmt = build_fetch_trades_stmt(&FetchTradesArgs { + chain_ids: vec![137], + orderbook_addresses: vec![ob], + ..Default::default() + }) + .unwrap(); + assert_eq!(stmt.params.len(), 4); + assert_eq!(stmt.params[0], SqlValue::U64(137)); + assert_eq!(stmt.params[1], SqlValue::U64(137)); + assert_eq!(stmt.params[2], SqlValue::Text(hex::encode_prefixed(ob))); + assert_eq!(stmt.params[3], SqlValue::Text(hex::encode_prefixed(ob))); + assert!(stmt.sql.contains("t.orderbook_address IN (?3)")); + assert!(stmt.sql.contains("c.orderbook_address IN (?4)")); + assert!(!stmt.sql.contains(TAKE_ORDERS_ORDERBOOKS_CLAUSE)); + assert!(!stmt.sql.contains(CLEAR_EVENTS_ORDERBOOKS_CLAUSE)); + } + + #[test] + fn builds_with_directional_token_filters() { + let input = address!("0x1111111111111111111111111111111111111111"); + let output = address!("0x2222222222222222222222222222222222222222"); + let stmt = build_fetch_trades_stmt(&FetchTradesArgs { + tokens: FetchTradesTokensFilter { + inputs: vec![input], + outputs: vec![output], + }, + ..Default::default() + }) + .unwrap(); + + assert!(stmt.sql.contains("tws.input_token IN (?1)")); + assert!(stmt.sql.contains("tws.output_token IN (?2)")); + assert_eq!(stmt.params[0], SqlValue::Text(hex::encode_prefixed(input))); + assert_eq!(stmt.params[1], SqlValue::Text(hex::encode_prefixed(output))); + } + + #[test] + fn builds_with_same_token_as_either_side_filter() { + let token = address!("0x1111111111111111111111111111111111111111"); + let stmt = build_fetch_trades_stmt(&FetchTradesArgs { + tokens: FetchTradesTokensFilter { + inputs: vec![token], + outputs: vec![token], + }, + ..Default::default() + }) + .unwrap(); + + assert!(stmt + .sql + .contains("tws.input_token IN (?1) OR tws.output_token IN (?2)")); + assert!(!stmt.sql.contains(INPUT_TOKENS_CLAUSE)); + assert!(!stmt.sql.contains(OUTPUT_TOKENS_CLAUSE)); + assert_eq!(stmt.params[0], SqlValue::Text(hex::encode_prefixed(token))); + assert_eq!(stmt.params[1], SqlValue::Text(hex::encode_prefixed(token))); + } + + #[test] + fn builds_with_pagination() { + let stmt = build_fetch_trades_stmt(&FetchTradesArgs { + pagination: PaginationParams { + page: Some(2), + page_size: Some(50), + }, + ..Default::default() + }) + .unwrap(); + + assert!(stmt.sql.contains("LIMIT ?1 OFFSET ?2")); + assert!(!stmt.sql.contains(PAGINATION_CLAUSE)); + assert_eq!(stmt.params, vec![SqlValue::U64(50), SqlValue::U64(50)]); + } + + #[test] + fn disambiguates_clear_sides_in_trade_id_and_sort_order() { + let stmt = build_fetch_trades_stmt(&FetchTradesArgs::default()).unwrap(); + + assert!(stmt.sql.contains("'alice' AS trade_side")); + assert!(stmt.sql.contains("'bob' AS trade_side")); + assert!(stmt.sql.contains("WHEN 'alice' THEN '01'")); + assert!(stmt.sql.contains("WHEN 'bob' THEN '02'")); + assert!(stmt.sql.contains("tws.trade_kind, tws.trade_side\n")); + } + + #[test] + fn builds_count_query_without_pagination() { + let stmt = build_fetch_trades_count_stmt(&FetchTradesArgs { + pagination: PaginationParams { + page: Some(2), + page_size: Some(50), + }, + ..Default::default() + }) + .unwrap(); + + assert!(stmt + .sql + .starts_with("SELECT COUNT(*) AS trade_count FROM (")); + assert!(!stmt.sql.contains("LIMIT")); + assert!(!stmt.sql.contains(PAGINATION_CLAUSE)); + } + + #[cfg(not(target_family = "wasm"))] + #[test] + fn builds_token_filtered_count_query_without_inner_trailing_semicolon() { + let token = address!("0x1111111111111111111111111111111111111111"); + let stmt = build_fetch_trades_count_stmt(&FetchTradesArgs { + tokens: FetchTradesTokensFilter { + inputs: vec![token], + outputs: vec![token], + }, + ..Default::default() + }) + .unwrap(); + + assert!(!stmt.sql.contains(";\n) AS filtered_trades")); + assert!(!stmt.sql.contains(";) AS filtered_trades")); + assert!(stmt + .sql + .contains("tws.input_token IN (?1) OR tws.output_token IN (?2)")); + + let conn = rusqlite::Connection::open_in_memory().unwrap(); + crate::local_db::functions::register_all(&conn).unwrap(); + conn.execute_batch(crate::local_db::query::create_tables::create_tables_sql()) + .unwrap(); + conn.prepare(&stmt.sql).unwrap(); + } + + #[test] + fn extract_trades_count_works() { + let rows = vec![LocalDbTradeCountRow { trade_count: 42 }]; + assert_eq!(extract_trades_count(&rows), 42); + let empty: Vec = vec![]; + assert_eq!(extract_trades_count(&empty), 0); + } +} diff --git a/crates/common/src/local_db/query/fetch_trades/query.sql b/crates/common/src/local_db/query/fetch_trades/query.sql new file mode 100644 index 0000000000..2a14cd2f8d --- /dev/null +++ b/crates/common/src/local_db/query/fetch_trades/query.sql @@ -0,0 +1,380 @@ +WITH +matching_take_orders AS ( + SELECT + 'take' AS trade_kind, + t.chain_id, + t.orderbook_address, + t.order_owner, + t.order_nonce, + t.transaction_hash, + t.log_index, + t.block_number, + t.block_timestamp, + t.sender AS transaction_sender, + t.input_io_index, + t.output_io_index, + t.taker_output AS input_delta, + FLOAT_NEGATE(t.taker_input) AS output_delta + FROM take_orders t + WHERE 1 = 1 + /*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/ + /*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/ +), +matching_clears AS ( + SELECT + c.chain_id, + c.orderbook_address, + c.transaction_hash, + c.log_index, + c.block_number, + c.block_timestamp, + c.sender, + c.alice_order_hash, + c.bob_order_hash, + c.alice_input_io_index, + c.alice_output_io_index, + c.alice_input_vault_id, + c.alice_output_vault_id, + c.bob_input_io_index, + c.bob_output_io_index, + c.bob_input_vault_id, + c.bob_output_vault_id + FROM clear_v3_events c + WHERE 1 = 1 + /*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/ + /*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/ +), +take_trades AS ( + SELECT + mt.trade_kind, + 'take' AS trade_side, + mt.chain_id, + mt.orderbook_address, + oe.order_hash, + mt.order_owner, + mt.order_nonce, + mt.transaction_hash, + mt.log_index, + mt.block_number, + mt.block_timestamp, + mt.transaction_sender, + io_in.vault_id AS input_vault_id, + io_in.token AS input_token, + mt.input_delta, + io_out.vault_id AS output_vault_id, + io_out.token AS output_token, + mt.output_delta + FROM matching_take_orders mt + JOIN order_events oe + ON oe.chain_id = mt.chain_id + AND oe.orderbook_address = mt.orderbook_address + AND oe.order_owner = mt.order_owner + AND oe.order_nonce = mt.order_nonce + AND oe.event_type = 'AddOrderV3' + AND ( + oe.block_number < mt.block_number + OR (oe.block_number = mt.block_number AND oe.log_index <= mt.log_index) + ) + AND NOT EXISTS ( + SELECT 1 + FROM order_events newer + WHERE newer.chain_id = oe.chain_id + AND newer.orderbook_address = oe.orderbook_address + AND newer.order_owner = oe.order_owner + AND newer.order_nonce = oe.order_nonce + AND newer.event_type = 'AddOrderV3' + AND ( + newer.block_number < mt.block_number + OR (newer.block_number = mt.block_number AND newer.log_index <= mt.log_index) + ) + AND ( + newer.block_number > oe.block_number + OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index) + ) + ) + JOIN order_ios io_in + ON io_in.chain_id = oe.chain_id + AND io_in.orderbook_address = oe.orderbook_address + AND io_in.transaction_hash = oe.transaction_hash + AND io_in.log_index = oe.log_index + AND io_in.io_index = mt.input_io_index + AND io_in.io_type = 'input' + JOIN order_ios io_out + ON io_out.chain_id = oe.chain_id + AND io_out.orderbook_address = oe.orderbook_address + AND io_out.transaction_hash = oe.transaction_hash + AND io_out.log_index = oe.log_index + AND io_out.io_index = mt.output_io_index + AND io_out.io_type = 'output' +), +clear_alice AS ( + SELECT DISTINCT + 'clear' AS trade_kind, + 'alice' AS trade_side, + mc.chain_id, + mc.orderbook_address, + oe.order_hash, + oe.order_owner, + oe.order_nonce, + mc.transaction_hash, + mc.log_index, + mc.block_number, + mc.block_timestamp, + mc.sender AS transaction_sender, + mc.alice_input_vault_id AS input_vault_id, + io_in.token AS input_token, + a.alice_input AS input_delta, + mc.alice_output_vault_id AS output_vault_id, + io_out.token AS output_token, + FLOAT_NEGATE(a.alice_output) AS output_delta + FROM matching_clears mc + JOIN order_events oe + ON oe.chain_id = mc.chain_id + AND oe.orderbook_address = mc.orderbook_address + AND oe.order_hash = mc.alice_order_hash + AND oe.event_type = 'AddOrderV3' + AND ( + oe.block_number < mc.block_number + OR (oe.block_number = mc.block_number AND oe.log_index <= mc.log_index) + ) + AND NOT EXISTS ( + SELECT 1 + FROM order_events newer + WHERE newer.chain_id = oe.chain_id + AND newer.orderbook_address = oe.orderbook_address + AND newer.order_hash = oe.order_hash + AND newer.event_type = 'AddOrderV3' + AND ( + newer.block_number < mc.block_number + OR (newer.block_number = mc.block_number AND newer.log_index <= mc.log_index) + ) + AND ( + newer.block_number > oe.block_number + OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index) + ) + ) + JOIN after_clear_v2_events a + ON a.chain_id = mc.chain_id + AND a.orderbook_address = mc.orderbook_address + AND a.transaction_hash = mc.transaction_hash + AND a.log_index = ( + SELECT MIN(ac.log_index) + FROM after_clear_v2_events ac + WHERE ac.chain_id = mc.chain_id + AND ac.orderbook_address = mc.orderbook_address + AND ac.transaction_hash = mc.transaction_hash + AND ac.log_index > mc.log_index + ) + JOIN order_ios io_in + ON io_in.chain_id = oe.chain_id + AND io_in.orderbook_address = oe.orderbook_address + AND io_in.transaction_hash = oe.transaction_hash + AND io_in.log_index = oe.log_index + AND io_in.io_index = mc.alice_input_io_index + AND io_in.io_type = 'input' + JOIN order_ios io_out + ON io_out.chain_id = oe.chain_id + AND io_out.orderbook_address = oe.orderbook_address + AND io_out.transaction_hash = oe.transaction_hash + AND io_out.log_index = oe.log_index + AND io_out.io_index = mc.alice_output_io_index + AND io_out.io_type = 'output' +), +clear_bob AS ( + SELECT DISTINCT + 'clear' AS trade_kind, + 'bob' AS trade_side, + mc.chain_id, + mc.orderbook_address, + oe.order_hash, + oe.order_owner, + oe.order_nonce, + mc.transaction_hash, + mc.log_index, + mc.block_number, + mc.block_timestamp, + mc.sender AS transaction_sender, + mc.bob_input_vault_id AS input_vault_id, + io_in.token AS input_token, + a.bob_input AS input_delta, + mc.bob_output_vault_id AS output_vault_id, + io_out.token AS output_token, + FLOAT_NEGATE(a.bob_output) AS output_delta + FROM matching_clears mc + JOIN order_events oe + ON oe.chain_id = mc.chain_id + AND oe.orderbook_address = mc.orderbook_address + AND oe.order_hash = mc.bob_order_hash + AND oe.event_type = 'AddOrderV3' + AND ( + oe.block_number < mc.block_number + OR (oe.block_number = mc.block_number AND oe.log_index <= mc.log_index) + ) + AND NOT EXISTS ( + SELECT 1 + FROM order_events newer + WHERE newer.chain_id = oe.chain_id + AND newer.orderbook_address = oe.orderbook_address + AND newer.order_hash = oe.order_hash + AND newer.event_type = 'AddOrderV3' + AND ( + newer.block_number < mc.block_number + OR (newer.block_number = mc.block_number AND newer.log_index <= mc.log_index) + ) + AND ( + newer.block_number > oe.block_number + OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index) + ) + ) + JOIN after_clear_v2_events a + ON a.chain_id = mc.chain_id + AND a.orderbook_address = mc.orderbook_address + AND a.transaction_hash = mc.transaction_hash + AND a.log_index = ( + SELECT MIN(ac.log_index) + FROM after_clear_v2_events ac + WHERE ac.chain_id = mc.chain_id + AND ac.orderbook_address = mc.orderbook_address + AND ac.transaction_hash = mc.transaction_hash + AND ac.log_index > mc.log_index + ) + JOIN order_ios io_in + ON io_in.chain_id = oe.chain_id + AND io_in.orderbook_address = oe.orderbook_address + AND io_in.transaction_hash = oe.transaction_hash + AND io_in.log_index = oe.log_index + AND io_in.io_index = mc.bob_input_io_index + AND io_in.io_type = 'input' + JOIN order_ios io_out + ON io_out.chain_id = oe.chain_id + AND io_out.orderbook_address = oe.orderbook_address + AND io_out.transaction_hash = oe.transaction_hash + AND io_out.log_index = oe.log_index + AND io_out.io_index = mc.bob_output_io_index + AND io_out.io_type = 'output' +), +clear_trades AS ( + SELECT * FROM clear_alice + UNION ALL + SELECT * FROM clear_bob +), +unioned_trades AS ( + SELECT * FROM take_trades + UNION ALL + SELECT * FROM clear_trades +), +trade_rows AS ( + SELECT + ut.trade_kind, + ut.trade_side, + ut.chain_id, + ut.orderbook_address, + ut.order_hash, + ut.order_owner, + ut.order_nonce, + ut.transaction_hash, + ut.log_index, + ut.block_number, + ut.block_timestamp, + ut.transaction_sender, + ut.input_vault_id, + ut.input_token, + ut.input_delta, + ut.output_vault_id, + ut.output_token, + ut.output_delta + FROM unioned_trades ut +), +trade_with_snapshots AS ( + SELECT + tr.*, + mvb_in.balance AS input_base_balance, + mvb_in.last_block AS input_base_block, + mvb_in.last_log_index AS input_base_log_index, + mvb_out.balance AS output_base_balance, + mvb_out.last_block AS output_base_block, + mvb_out.last_log_index AS output_base_log_index + FROM trade_rows tr + LEFT JOIN running_vault_balances mvb_in + ON mvb_in.chain_id = tr.chain_id + AND mvb_in.orderbook_address = tr.orderbook_address + AND mvb_in.owner = tr.order_owner + AND mvb_in.token = tr.input_token + AND mvb_in.vault_id = tr.input_vault_id + LEFT JOIN running_vault_balances mvb_out + ON mvb_out.chain_id = tr.chain_id + AND mvb_out.orderbook_address = tr.orderbook_address + AND mvb_out.owner = tr.order_owner + AND mvb_out.token = tr.output_token + AND mvb_out.vault_id = tr.output_vault_id +) +SELECT + tws.chain_id, + tws.trade_kind, + tws.orderbook_address AS orderbook, + tws.order_hash, + tws.order_owner, + tws.order_nonce, + tws.transaction_hash, + tws.log_index, + tws.block_number, + tws.block_timestamp, + tws.transaction_sender, + tws.input_vault_id, + tws.input_token, + tok_in.name AS input_token_name, + tok_in.symbol AS input_token_symbol, + tok_in.decimals AS input_token_decimals, + tws.input_delta, + vbc_input.running_balance AS input_running_balance, + tws.output_vault_id, + tws.output_token, + tok_out.name AS output_token_name, + tok_out.symbol AS output_token_symbol, + tok_out.decimals AS output_token_decimals, + tws.output_delta, + vbc_output.running_balance AS output_running_balance, + ( + '0x' || + lower(replace(tws.transaction_hash, '0x', '')) || + printf('%016x', tws.log_index) || + CASE tws.trade_side + WHEN 'alice' THEN '01' + WHEN 'bob' THEN '02' + ELSE '' + END + ) AS trade_id +FROM trade_with_snapshots tws +LEFT JOIN vault_balance_changes vbc_input + ON vbc_input.chain_id = tws.chain_id + AND vbc_input.orderbook_address = tws.orderbook_address + AND vbc_input.owner = tws.order_owner + AND vbc_input.token = tws.input_token + AND vbc_input.vault_id = tws.input_vault_id + AND vbc_input.block_number = tws.block_number + AND vbc_input.log_index = tws.log_index +LEFT JOIN vault_balance_changes vbc_output + ON vbc_output.chain_id = tws.chain_id + AND vbc_output.orderbook_address = tws.orderbook_address + AND vbc_output.owner = tws.order_owner + AND vbc_output.token = tws.output_token + AND vbc_output.vault_id = tws.output_vault_id + AND vbc_output.block_number = tws.block_number + AND vbc_output.log_index = tws.log_index +LEFT JOIN erc20_tokens tok_in + ON tok_in.chain_id = tws.chain_id + AND tok_in.orderbook_address = tws.orderbook_address + AND tok_in.token_address = tws.input_token +LEFT JOIN erc20_tokens tok_out + ON tok_out.chain_id = tws.chain_id + AND tok_out.orderbook_address = tws.orderbook_address + AND tok_out.token_address = tws.output_token +WHERE 1 = 1 +/*OWNERS_CLAUSE*/ +/*ORDER_HASH_CLAUSE*/ +/*START_TS_CLAUSE*/ +/*END_TS_CLAUSE*/ +/*INPUT_TOKENS_CLAUSE*/ +/*OUTPUT_TOKENS_CLAUSE*/ +ORDER BY tws.block_timestamp DESC, tws.block_number DESC, tws.log_index DESC, tws.trade_kind, tws.trade_side +/*PAGINATION_CLAUSE*/; diff --git a/crates/common/src/local_db/query/mod.rs b/crates/common/src/local_db/query/mod.rs index f510f14e55..a6d51de9f5 100644 --- a/crates/common/src/local_db/query/mod.rs +++ b/crates/common/src/local_db/query/mod.rs @@ -19,6 +19,7 @@ pub mod fetch_owner_trades_count; pub mod fetch_store_addresses; pub mod fetch_tables; pub mod fetch_target_watermark; +pub mod fetch_trades; pub mod fetch_trades_by_tx; pub mod fetch_transaction_by_hash; pub mod fetch_vault_balance_changes; diff --git a/crates/common/src/raindex_client/local_db/query/fetch_trades.rs b/crates/common/src/raindex_client/local_db/query/fetch_trades.rs new file mode 100644 index 0000000000..8fb2b89adc --- /dev/null +++ b/crates/common/src/raindex_client/local_db/query/fetch_trades.rs @@ -0,0 +1,57 @@ +use crate::local_db::query::fetch_order_trades::LocalDbOrderTrade; +use crate::local_db::query::fetch_order_trades_count::LocalDbTradeCountRow; +use crate::local_db::query::fetch_trades::{ + build_fetch_trades_count_stmt, build_fetch_trades_stmt, FetchTradesArgs, +}; +use crate::local_db::query::{LocalDbQueryError, LocalDbQueryExecutor}; + +pub async fn fetch_trades( + exec: &E, + args: FetchTradesArgs, +) -> Result, LocalDbQueryError> { + let stmt = build_fetch_trades_stmt(&args)?; + exec.query_json(&stmt).await +} + +pub async fn fetch_trades_count( + exec: &E, + args: FetchTradesArgs, +) -> Result, LocalDbQueryError> { + let stmt = build_fetch_trades_count_stmt(&args)?; + exec.query_json(&stmt).await +} + +#[cfg(all(test, target_family = "wasm"))] +mod wasm_tests { + use super::*; + use crate::raindex_client::local_db::executor::tests::create_sql_capturing_callback; + use crate::raindex_client::local_db::executor::JsCallbackExecutor; + use std::cell::RefCell; + use std::rc::Rc; + use wasm_bindgen_test::*; + use wasm_bindgen_utils::prelude::*; + + #[wasm_bindgen_test] + async fn wrapper_uses_builder_sql_exactly() { + let args = FetchTradesArgs { + chain_ids: vec![137, 42161], + orderbook_addresses: vec![], + ..Default::default() + }; + + let expected_stmt = build_fetch_trades_stmt(&args).unwrap(); + + let store = Rc::new(RefCell::new(( + String::new(), + wasm_bindgen::JsValue::UNDEFINED, + ))); + let callback = create_sql_capturing_callback("[]", store.clone()); + let exec = JsCallbackExecutor::from_ref(&callback); + + let res = fetch_trades(&exec, args).await; + assert!(res.is_ok()); + + let captured = store.borrow().clone(); + assert_eq!(captured.0, expected_stmt.sql); + } +} diff --git a/crates/common/src/raindex_client/local_db/query/mod.rs b/crates/common/src/raindex_client/local_db/query/mod.rs index 386654ab32..99659a8310 100644 --- a/crates/common/src/raindex_client/local_db/query/mod.rs +++ b/crates/common/src/raindex_client/local_db/query/mod.rs @@ -12,6 +12,7 @@ pub mod fetch_owner_trades; pub mod fetch_owner_trades_count; pub mod fetch_store_addresses; pub mod fetch_tables; +pub mod fetch_trades; pub mod fetch_trades_by_tx; pub mod fetch_transaction_by_hash; pub mod fetch_vault_balance_changes; diff --git a/crates/common/src/raindex_client/trades/get_all.rs b/crates/common/src/raindex_client/trades/get_all.rs new file mode 100644 index 0000000000..f4babbca91 --- /dev/null +++ b/crates/common/src/raindex_client/trades/get_all.rs @@ -0,0 +1,641 @@ +use super::*; +use crate::local_db::query::fetch_trades::{ + extract_trades_count, FetchTradesArgs, FetchTradesTokensFilter, +}; +use crate::raindex_client::local_db::query::fetch_trades::{fetch_trades, fetch_trades_count}; +use crate::utils::timing::Timing; +use rain_orderbook_subgraph_client::types::common::{ + SgBigInt, SgBytes, SgTrade, SgTradeOrderFilter, SgTradesListQueryFilters, + SgTradesTokensFilterArgs, +}; +use rain_orderbook_subgraph_client::MultiOrderbookSubgraphClient; +use tsify::Tsify; +use wasm_bindgen_utils::impl_wasm_traits; + +const DEFAULT_PAGE_SIZE: u16 = 100; + +#[derive(Serialize, Deserialize, Debug, Clone, Tsify, Default)] +#[serde(rename_all = "camelCase")] +pub struct GetTradesTokenFilter { + #[cfg_attr(target_family = "wasm", tsify(optional, type = "Address[]"))] + pub inputs: Option>, + #[cfg_attr(target_family = "wasm", tsify(optional, type = "Address[]"))] + pub outputs: Option>, +} +impl_wasm_traits!(GetTradesTokenFilter); + +#[derive(Serialize, Deserialize, Debug, Clone, Tsify, Default)] +#[serde(rename_all = "camelCase", default)] +pub struct GetTradesFilters { + #[tsify(optional, type = "Address[]")] + pub owners: Vec
, + #[tsify(optional, type = "Hex")] + pub order_hash: Option, + #[tsify(optional)] + pub tokens: Option, + #[tsify(optional, type = "Address[]")] + pub orderbook_addresses: Option>, + #[tsify(optional)] + pub time_filter: Option, +} +impl_wasm_traits!(GetTradesFilters); + +impl From for Option { + fn from(filter: GetTradesTokenFilter) -> Self { + let inputs: Vec = filter + .inputs + .unwrap_or_default() + .into_iter() + .map(|token| token.to_string().to_lowercase()) + .collect(); + let outputs: Vec = filter + .outputs + .unwrap_or_default() + .into_iter() + .map(|token| token.to_string().to_lowercase()) + .collect(); + + if inputs.is_empty() && outputs.is_empty() { + None + } else { + Some(SgTradesTokensFilterArgs { inputs, outputs }) + } + } +} + +impl From for SgTradesListQueryFilters { + fn from(filters: GetTradesFilters) -> Self { + let time_filter = filters.time_filter.unwrap_or_default(); + let mut sg_filters = SgTradesListQueryFilters { + timestamp_gte: Some( + time_filter + .start + .map_or(SgBigInt("0".to_string()), |v| SgBigInt(v.to_string())), + ), + timestamp_lte: Some( + time_filter + .end + .map_or(SgBigInt(u64::MAX.to_string()), |v| SgBigInt(v.to_string())), + ), + orderbook_in: filters + .orderbook_addresses + .unwrap_or_default() + .into_iter() + .map(|address| address.to_string().to_lowercase()) + .collect(), + ..Default::default() + }; + + if !filters.owners.is_empty() || filters.order_hash.is_some() { + sg_filters.order_ = Some(SgTradeOrderFilter { + owner_in: filters + .owners + .into_iter() + .map(|owner| SgBytes(owner.to_string())) + .collect(), + order_hash: filters.order_hash.map(|hash| SgBytes(hash.to_string())), + }); + } + + sg_filters + } +} + +fn local_trades_pagination( + has_subgraph_sources: bool, + page_number: u16, + page_size: u16, +) -> (PaginationParams, bool) { + if has_subgraph_sources { + (PaginationParams::default(), false) + } else { + ( + PaginationParams { + page: Some(page_number), + page_size: Some(page_size), + }, + true, + ) + } +} + +fn normalize_trade_tokens(mut tokens: SgTradesTokensFilterArgs) -> SgTradesTokensFilterArgs { + tokens.inputs.sort_unstable(); + tokens.inputs.dedup(); + tokens.outputs.sort_unstable(); + tokens.outputs.dedup(); + tokens +} + +fn sg_trade_matches_token_filter(trade: &SgTrade, tokens: &SgTradesTokensFilterArgs) -> bool { + let has_inputs = !tokens.inputs.is_empty(); + let has_outputs = !tokens.outputs.is_empty(); + let input_token = trade + .input_vault_balance_change + .vault + .token + .address + .0 + .to_lowercase(); + let output_token = trade + .output_vault_balance_change + .vault + .token + .address + .0 + .to_lowercase(); + + if has_inputs && has_outputs && tokens.inputs == tokens.outputs { + tokens.inputs.contains(&input_token) || tokens.outputs.contains(&output_token) + } else { + (!has_inputs || tokens.inputs.contains(&input_token)) + && (!has_outputs || tokens.outputs.contains(&output_token)) + } +} + +#[wasm_export] +impl RaindexClient { + #[wasm_export( + js_name = "getTrades", + return_description = "Trades list result with total count and per-pair summary", + unchecked_return_type = "RaindexTradesListResult", + preserve_js_class + )] + pub async fn get_trades( + &self, + #[wasm_export( + js_name = "chainIds", + param_description = "Specific blockchain networks to query (optional, queries all networks if not specified)" + )] + chain_ids: Option, + #[wasm_export( + param_description = "Filtering criteria including owners, order hash, token addresses, orderbooks, and time range" + )] + filters: Option, + #[wasm_export(param_description = "Page number for pagination (optional, defaults to 1)")] + page: Option, + #[wasm_export( + js_name = "pageSize", + param_description = "Number of items per page (optional, defaults to 100)" + )] + page_size: Option, + ) -> Result { + let route_started = Timing::now(); + let filters = filters.unwrap_or_default(); + let page_number = page.unwrap_or(1).max(1); + let page_size = page_size.unwrap_or(DEFAULT_PAGE_SIZE).max(1); + let ids = chain_ids.map(|ChainIds(ids)| ids); + let requested_chain_ids_count = ids.as_ref().map_or(0, Vec::len); + let (local_db, local_ids, sg_ids) = self.classify_chains(ids)?; + let local_chain_ids_count = local_ids.len(); + let subgraph_chain_ids_count = sg_ids.len(); + let input_tokens_count = filters + .tokens + .as_ref() + .and_then(|tokens| tokens.inputs.as_ref()) + .map_or(0, Vec::len); + let output_tokens_count = filters + .tokens + .as_ref() + .and_then(|tokens| tokens.outputs.as_ref()) + .map_or(0, Vec::len); + let orderbooks_count = filters.orderbook_addresses.as_ref().map_or(0, Vec::len); + + let mut all_trades = Vec::new(); + let mut total_count: u64 = 0; + let mut local_fetch_ms = None; + let mut local_count_ms = None; + let mut local_rows = 0usize; + let mut local_total_count = 0u64; + let mut local_query_paginated = false; + let mut subgraph_fetch_ms = None; + let mut subgraph_rows_before_token_filter = 0usize; + let mut subgraph_rows_after_token_filter = 0usize; + + if let Some(db) = local_db { + let (local_pagination, is_local_query_paginated) = + local_trades_pagination(!sg_ids.is_empty(), page_number, page_size); + local_query_paginated = is_local_query_paginated; + let local_tokens = filters + .tokens + .clone() + .map(|tokens| FetchTradesTokensFilter { + inputs: tokens.inputs.unwrap_or_default(), + outputs: tokens.outputs.unwrap_or_default(), + }) + .unwrap_or_default(); + let local_fetch_started = Timing::now(); + let local_trades = match fetch_trades( + &db, + FetchTradesArgs { + chain_ids: local_ids.clone(), + orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(), + owners: filters.owners.clone(), + order_hash: filters.order_hash, + tokens: local_tokens.clone(), + time_filter: filters.time_filter.clone().unwrap_or_default(), + pagination: local_pagination, + }, + ) + .await + { + Ok(trades) => trades, + Err(err) => { + tracing::error!( + elapsed_ms = local_fetch_started.elapsed_ms(), + local_chain_ids_count, + error = %err, + "getTrades local DB fetch failed" + ); + return Err(err.into()); + } + }; + local_fetch_ms = Some(local_fetch_started.elapsed_ms()); + local_rows = local_trades.len(); + + let local_count_started = Timing::now(); + let count_rows = match fetch_trades_count( + &db, + FetchTradesArgs { + chain_ids: local_ids, + orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(), + owners: filters.owners.clone(), + order_hash: filters.order_hash, + tokens: local_tokens, + time_filter: filters.time_filter.clone().unwrap_or_default(), + pagination: PaginationParams::default(), + }, + ) + .await + { + Ok(count_rows) => count_rows, + Err(err) => { + tracing::error!( + elapsed_ms = local_count_started.elapsed_ms(), + local_chain_ids_count, + error = %err, + "getTrades local DB count failed" + ); + return Err(err.into()); + } + }; + local_count_ms = Some(local_count_started.elapsed_ms()); + local_total_count = extract_trades_count(&count_rows); + total_count += local_total_count; + all_trades.extend( + local_trades + .into_iter() + .map(RaindexTrade::try_from_local_db_trade) + .collect::, _>>()?, + ); + tracing::debug!( + local_chain_ids_count, + rows = local_rows, + total_count = local_total_count, + fetch_ms = local_fetch_ms, + count_ms = local_count_ms, + "getTrades local DB completed" + ); + } + + if !sg_ids.is_empty() { + let multi_subgraph_args = self.get_multi_subgraph_args(Some(sg_ids))?; + let sg_filters: SgTradesListQueryFilters = filters.clone().into(); + let sg_token_filter = filters + .tokens + .clone() + .and_then(Option::::from) + .map(normalize_trade_tokens); + let name_to_chain_id: std::collections::HashMap<&str, u32> = multi_subgraph_args + .iter() + .flat_map(|(chain_id, args)| args.iter().map(|arg| (arg.name.as_str(), *chain_id))) + .collect(); + let client = MultiOrderbookSubgraphClient::new( + multi_subgraph_args.values().flatten().cloned().collect(), + ); + let subgraph_fetch_started = Timing::now(); + let sg_trades = match client.trades_list_all(sg_filters).await { + Ok(trades) => trades, + Err(err) => { + tracing::error!( + elapsed_ms = subgraph_fetch_started.elapsed_ms(), + subgraph_chain_ids_count, + error = %err, + "getTrades subgraph fetch failed" + ); + return Err(err.into()); + } + }; + subgraph_fetch_ms = Some(subgraph_fetch_started.elapsed_ms()); + subgraph_rows_before_token_filter = sg_trades.len(); + let sg_trades: Vec<_> = if let Some(tokens) = sg_token_filter { + let filtered_trades: Vec<_> = sg_trades + .into_iter() + .filter(|trade| sg_trade_matches_token_filter(&trade.trade, &tokens)) + .collect(); + total_count += filtered_trades.len() as u64; + subgraph_rows_after_token_filter = filtered_trades.len(); + filtered_trades + } else { + total_count += sg_trades.len() as u64; + subgraph_rows_after_token_filter = sg_trades.len(); + sg_trades + }; + for trade_with_name in sg_trades { + let chain_id = name_to_chain_id + .get(trade_with_name.subgraph_name.as_str()) + .copied() + .ok_or(RaindexError::SubgraphNotFound( + trade_with_name.subgraph_name.clone(), + trade_with_name.trade.id.0.clone(), + ))?; + all_trades.push(RaindexTrade::try_from_sg_trade( + chain_id, + trade_with_name.trade, + )?); + } + tracing::debug!( + subgraph_chain_ids_count, + rows_before_token_filter = subgraph_rows_before_token_filter, + rows_after_token_filter = subgraph_rows_after_token_filter, + fetch_ms = subgraph_fetch_ms, + "getTrades subgraph completed" + ); + } + + let merge_started = Timing::now(); + all_trades.sort_by(|a, b| b.timestamp.cmp(&a.timestamp).then_with(|| a.id.cmp(&b.id))); + let offset = if local_query_paginated && subgraph_chain_ids_count == 0 { + 0 + } else { + ((page_number - 1) as usize) * (page_size as usize) + }; + let limit = page_size as usize; + let merged_rows_before_pagination = all_trades.len(); + let trades: Vec<_> = all_trades.into_iter().skip(offset).take(limit).collect(); + let returned_rows = trades.len(); + let summary = RaindexPairSummary::from_trades(&trades)?; + let merge_sort_page_ms = merge_started.elapsed_ms(); + tracing::debug!( + page = page_number, + page_size, + requested_chain_ids_count, + local_chain_ids_count, + subgraph_chain_ids_count, + owners_count = filters.owners.len(), + orderbooks_count, + has_order_hash = filters.order_hash.is_some(), + has_time_filter = filters.time_filter.is_some(), + input_tokens_count, + output_tokens_count, + local_fetch_ms, + local_count_ms, + local_query_paginated, + local_rows, + local_total_count, + subgraph_fetch_ms, + subgraph_rows_before_token_filter, + subgraph_rows_after_token_filter, + merged_rows_before_pagination, + returned_rows, + total_count, + merge_sort_page_ms, + total_ms = route_started.elapsed_ms(), + "getTrades completed" + ); + Ok(RaindexTradesListResult { + trades, + total_count, + summary: Some(summary), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::primitives::{address, b256}; + use rain_orderbook_subgraph_client::types::common::{ + SgErc20, SgOrderbook, SgTradeEvent, SgTradeEventTypename, SgTradeRef, + SgTradeStructPartialOrder, SgTradeVaultBalanceChange, SgTransaction, + SgVaultBalanceChangeVault, + }; + + #[test] + fn local_trades_pagination_is_applied_for_local_only_queries() { + let (pagination, is_paginated) = local_trades_pagination(false, 2, 25); + + assert!(is_paginated); + assert_eq!(pagination.page, Some(2)); + assert_eq!(pagination.page_size, Some(25)); + } + + #[test] + fn local_trades_pagination_is_deferred_for_mixed_source_queries() { + let (pagination, is_paginated) = local_trades_pagination(true, 2, 25); + + assert!(!is_paginated); + assert_eq!(pagination.page, None); + assert_eq!(pagination.page_size, None); + } + + #[test] + fn maps_trade_filters_to_subgraph_filters() { + let owner = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + let orderbook = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + let order_hash = + b256!("0x0000000000000000000000000000000000000000000000000000000000abcdef"); + + let filters: SgTradesListQueryFilters = GetTradesFilters { + owners: vec![owner], + order_hash: Some(order_hash), + orderbook_addresses: Some(vec![orderbook]), + time_filter: Some(TimeFilter { + start: Some(10), + end: Some(20), + }), + ..Default::default() + } + .into(); + + let order_filter = filters.order_.unwrap(); + assert_eq!(order_filter.owner_in, vec![SgBytes(owner.to_string())]); + assert_eq!( + order_filter.order_hash, + Some(SgBytes(order_hash.to_string())) + ); + assert_eq!(filters.orderbook_in, vec![format!("{orderbook:#x}")]); + assert_eq!(filters.timestamp_gte, Some(SgBigInt("10".to_string()))); + assert_eq!(filters.timestamp_lte, Some(SgBigInt("20".to_string()))); + } + + #[test] + fn deserializes_partial_filters_without_owners() { + let json = serde_json::json!({ + "tokens": { + "inputs": ["0x1111111111111111111111111111111111111111"] + } + }); + + let filters: GetTradesFilters = serde_json::from_value(json).unwrap(); + + assert!(filters.owners.is_empty()); + assert!(filters.tokens.is_some()); + } + + #[test] + fn maps_token_filters_to_subgraph_candidate_filter_without_unsupported_child_nesting() { + let owner = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + let orderbook = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + let token_a = address!("0x1111111111111111111111111111111111111111"); + let token_b = address!("0x2222222222222222222222222222222222222222"); + let filters: SgTradesListQueryFilters = GetTradesFilters { + owners: vec![owner], + orderbook_addresses: Some(vec![orderbook]), + time_filter: Some(TimeFilter { + start: Some(10), + end: Some(20), + }), + tokens: Some(GetTradesTokenFilter { + inputs: Some(vec![token_b, token_a, token_a]), + outputs: Some(vec![token_a, token_b]), + }), + ..Default::default() + } + .into(); + + let order_filter = filters.order_.as_ref().unwrap(); + assert_eq!(order_filter.owner_in, vec![SgBytes(owner.to_string())]); + assert_eq!(filters.timestamp_gte, Some(SgBigInt("10".to_string()))); + assert_eq!(filters.timestamp_lte, Some(SgBigInt("20".to_string()))); + assert_eq!(filters.orderbook_in, vec![format!("{orderbook:#x}")]); + assert!(filters.or.is_none()); + assert!(filters.input_vault_balance_change_.is_none()); + assert!(filters.output_vault_balance_change_.is_none()); + } + + fn test_sg_trade(input_token: Address, output_token: Address) -> SgTrade { + fn erc20(token: Address) -> SgErc20 { + SgErc20 { + id: SgBytes(token.to_string()), + address: SgBytes(token.to_string()), + name: Some("Token".to_string()), + symbol: Some("TKN".to_string()), + decimals: Some(SgBigInt("18".to_string())), + } + } + + fn balance_change(token: Address) -> SgTradeVaultBalanceChange { + SgTradeVaultBalanceChange { + id: SgBytes(format!("{}-change", token)), + __typename: "TradeVaultBalanceChange".to_string(), + amount: SgBytes("0x01".to_string()), + new_vault_balance: SgBytes("0x01".to_string()), + old_vault_balance: SgBytes("0x00".to_string()), + vault: SgVaultBalanceChangeVault { + id: SgBytes(format!("{}-vault", token)), + vault_id: SgBytes("0x01".to_string()), + token: erc20(token), + }, + timestamp: SgBigInt("10".to_string()), + transaction: SgTransaction { + id: SgBytes( + "0x0000000000000000000000000000000000000000000000000000000000000001" + .to_string(), + ), + from: SgBytes("0x0000000000000000000000000000000000000000".to_string()), + block_number: SgBigInt("1".to_string()), + timestamp: SgBigInt("10".to_string()), + }, + orderbook: SgOrderbook { + id: SgBytes("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()), + }, + trade: SgTradeRef { + trade_event: SgTradeEventTypename { + __typename: "TakeOrder".to_string(), + }, + }, + } + } + + SgTrade { + id: SgBytes( + "0x0000000000000000000000000000000000000000000000000000000000000001".to_string(), + ), + trade_event: SgTradeEvent { + transaction: SgTransaction { + id: SgBytes( + "0x0000000000000000000000000000000000000000000000000000000000000001" + .to_string(), + ), + from: SgBytes("0x0000000000000000000000000000000000000000".to_string()), + block_number: SgBigInt("1".to_string()), + timestamp: SgBigInt("10".to_string()), + }, + sender: SgBytes("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()), + }, + output_vault_balance_change: balance_change(output_token), + order: SgTradeStructPartialOrder { + id: SgBytes("0xorder".to_string()), + order_hash: SgBytes( + "0x0000000000000000000000000000000000000000000000000000000000000001" + .to_string(), + ), + owner: SgBytes("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()), + }, + input_vault_balance_change: balance_change(input_token), + timestamp: SgBigInt("10".to_string()), + orderbook: SgOrderbook { + id: SgBytes("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()), + }, + } + } + + #[test] + fn same_token_post_filter_matches_either_input_or_output_token() { + let token_a = address!("0x1111111111111111111111111111111111111111"); + let token_b = address!("0x2222222222222222222222222222222222222222"); + let token_c = address!("0x3333333333333333333333333333333333333333"); + let tokens = normalize_trade_tokens(SgTradesTokensFilterArgs { + inputs: vec![token_a.to_string(), token_b.to_string()], + outputs: vec![token_b.to_string(), token_a.to_string()], + }); + + assert!(sg_trade_matches_token_filter( + &test_sg_trade(token_a, token_c), + &tokens + )); + assert!(sg_trade_matches_token_filter( + &test_sg_trade(token_c, token_b), + &tokens + )); + assert!(!sg_trade_matches_token_filter( + &test_sg_trade(token_c, token_c), + &tokens + )); + } + + #[test] + fn directional_post_filter_requires_matching_input_and_output_tokens() { + let input = address!("0x1111111111111111111111111111111111111111"); + let output = address!("0x2222222222222222222222222222222222222222"); + let other = address!("0x3333333333333333333333333333333333333333"); + let tokens = normalize_trade_tokens(SgTradesTokensFilterArgs { + inputs: vec![input.to_string()], + outputs: vec![output.to_string()], + }); + + assert!(sg_trade_matches_token_filter( + &test_sg_trade(input, output), + &tokens + )); + assert!(!sg_trade_matches_token_filter( + &test_sg_trade(input, other), + &tokens + )); + assert!(!sg_trade_matches_token_filter( + &test_sg_trade(other, output), + &tokens + )); + } +} diff --git a/crates/common/src/raindex_client/trades/mod.rs b/crates/common/src/raindex_client/trades/mod.rs index ebba11a740..f1f060affa 100644 --- a/crates/common/src/raindex_client/trades/mod.rs +++ b/crates/common/src/raindex_client/trades/mod.rs @@ -1,6 +1,9 @@ +mod get_all; mod get_by_owner; mod get_by_tx; +pub use get_all::{GetTradesFilters, GetTradesTokenFilter}; + use super::local_db::orders::LocalDbOrders; use super::orders::{OrdersDataSource, SubgraphOrders}; use super::ClientRef; diff --git a/crates/common/src/utils/mod.rs b/crates/common/src/utils/mod.rs index 77db7faa28..1dd3ad223b 100644 --- a/crates/common/src/utils/mod.rs +++ b/crates/common/src/utils/mod.rs @@ -2,3 +2,4 @@ pub mod amount_formatter; pub mod float; pub mod serde; pub mod timestamp; +pub mod timing; diff --git a/crates/common/src/utils/timing.rs b/crates/common/src/utils/timing.rs new file mode 100644 index 0000000000..d566757155 --- /dev/null +++ b/crates/common/src/utils/timing.rs @@ -0,0 +1,37 @@ +#[cfg(target_family = "wasm")] +use wasm_bindgen_utils::prelude::js_sys::Date; + +pub struct Timing { + #[cfg(not(target_family = "wasm"))] + started_at: std::time::Instant, + #[cfg(target_family = "wasm")] + started_at_ms: f64, +} + +impl Timing { + pub fn now() -> Self { + Self { + #[cfg(not(target_family = "wasm"))] + started_at: std::time::Instant::now(), + #[cfg(target_family = "wasm")] + started_at_ms: Date::now(), + } + } + + pub fn elapsed_ms(&self) -> u64 { + #[cfg(not(target_family = "wasm"))] + { + self.started_at.elapsed().as_millis() as u64 + } + + #[cfg(target_family = "wasm")] + { + let elapsed = Date::now() - self.started_at_ms; + if elapsed.is_finite() && elapsed > 0.0 { + elapsed as u64 + } else { + 0 + } + } + } +} diff --git a/crates/subgraph/src/multi_orderbook_client.rs b/crates/subgraph/src/multi_orderbook_client.rs index 3c62667ef7..d406ed6023 100644 --- a/crates/subgraph/src/multi_orderbook_client.rs +++ b/crates/subgraph/src/multi_orderbook_client.rs @@ -1,7 +1,8 @@ use crate::{ types::common::{ SgErc20WithSubgraphName, SgOrderWithSubgraphName, SgOrdersListFilterArgs, - SgTradeWithSubgraphName, SgVaultWithSubgraphName, SgVaultsListFilterArgs, + SgTradeWithSubgraphName, SgTradesListQueryFilters, SgVaultWithSubgraphName, + SgVaultsListFilterArgs, }, OrderbookSubgraphClient, OrderbookSubgraphClientError, SgPaginationArgs, }; @@ -21,6 +22,17 @@ impl_wasm_traits!(MultiSubgraphArgs); pub struct MultiOrderbookSubgraphClient { subgraphs: Vec, } + +fn sort_trades(trades: &mut [SgTradeWithSubgraphName]) { + trades.sort_by(|a, b| { + let a_timestamp = a.trade.timestamp.0.parse::().unwrap_or(0); + let b_timestamp = b.trade.timestamp.0.parse::().unwrap_or(0); + b_timestamp + .cmp(&a_timestamp) + .then_with(|| a.trade.id.0.cmp(&b.trade.id.0)) + }); +} + impl MultiOrderbookSubgraphClient { pub fn new(subgraphs: Vec) -> Self { Self { subgraphs } @@ -237,6 +249,171 @@ impl MultiOrderbookSubgraphClient { .collect() } + /// Fetches the general filtered trades list across every configured subgraph. + /// + /// This backs the SDK-level `RaindexClient.getTrades` API. Order-specific trade + /// history still uses `OrderbookSubgraphClient::order_trades_list`. + pub async fn trades_list( + &self, + filters: SgTradesListQueryFilters, + pagination_args: SgPaginationArgs, + ) -> Result, OrderbookSubgraphClientError> { + let futures = self.subgraphs.iter().map(|subgraph| { + let url = subgraph.url.clone(); + let subgraph_name = subgraph.name.clone(); + let filters = filters.clone(); + let pagination_args = pagination_args.clone(); + async move { + let client = self.get_orderbook_subgraph_client(url.clone()); + let result = client + .trades_list(filters, pagination_args) + .await + .map(|trades| { + trades + .into_iter() + .map(|trade| SgTradeWithSubgraphName { + trade, + subgraph_name: subgraph_name.clone(), + }) + .collect::>() + }); + (subgraph_name, url, result) + } + }); + + let results = join_all(futures).await; + let mut all_trades = Vec::new(); + let mut last_error = None; + let mut any_success = false; + for (subgraph_name, url, result) in results { + match result { + Ok(items) => { + any_success = true; + all_trades.extend(items); + } + Err(e) => { + tracing::warn!( + subgraph = %subgraph_name, + url = %url, + error = %e, + "failed to fetch trades from subgraph" + ); + last_error = Some(e); + } + } + } + if !any_success { + if let Some(e) = last_error { + return Err(e); + } + } + + sort_trades(&mut all_trades); + Ok(all_trades) + } + + /// Fetches all filtered trades across every configured subgraph. + /// + /// This is used when callers need to merge and paginate across multiple data + /// sources after fetching. Order-specific trade history still uses + /// `OrderbookSubgraphClient::order_trades_list`. + pub async fn trades_list_all( + &self, + filters: SgTradesListQueryFilters, + ) -> Result, OrderbookSubgraphClientError> { + let futures = self.subgraphs.iter().map(|subgraph| { + let url = subgraph.url.clone(); + let subgraph_name = subgraph.name.clone(); + let filters = filters.clone(); + async move { + let client = self.get_orderbook_subgraph_client(url.clone()); + let result = client.trades_list_all(filters).await.map(|trades| { + trades + .into_iter() + .map(|trade| SgTradeWithSubgraphName { + trade, + subgraph_name: subgraph_name.clone(), + }) + .collect::>() + }); + (subgraph_name, url, result) + } + }); + + let results = join_all(futures).await; + let mut all_trades = Vec::new(); + let mut last_error = None; + let mut any_success = false; + for (subgraph_name, url, result) in results { + match result { + Ok(items) => { + any_success = true; + all_trades.extend(items); + } + Err(e) => { + tracing::warn!( + subgraph = %subgraph_name, + url = %url, + error = %e, + "failed to fetch all trades from subgraph" + ); + last_error = Some(e); + } + } + } + if !any_success { + if let Some(e) = last_error { + return Err(e); + } + } + + sort_trades(&mut all_trades); + Ok(all_trades) + } + + pub async fn trades_count( + &self, + filters: SgTradesListQueryFilters, + ) -> Result { + let futures = self.subgraphs.iter().map(|subgraph| { + let url = subgraph.url.clone(); + let subgraph_name = subgraph.name.clone(); + let filters = filters.clone(); + async move { + let client = self.get_orderbook_subgraph_client(url.clone()); + (subgraph_name, url, client.trades_count(filters).await) + } + }); + + let results = join_all(futures).await; + let mut total: u32 = 0; + let mut last_error = None; + let mut any_success = false; + for (subgraph_name, url, result) in results { + match result { + Ok(count) => { + any_success = true; + total += count; + } + Err(e) => { + tracing::warn!( + subgraph = %subgraph_name, + url = %url, + error = %e, + "failed to count trades from subgraph" + ); + last_error = Some(e); + } + } + } + if !any_success { + if let Some(e) = last_error { + return Err(e); + } + } + Ok(total) + } + pub async fn tokens_list( &self, ) -> Result, OrderbookSubgraphClientError> { @@ -280,10 +457,12 @@ impl MultiOrderbookSubgraphClient { mod tests { use super::*; use crate::cynic_client::CynicClientError; + use crate::orderbook_client::ALL_PAGES_QUERY_PAGE_SIZE; use crate::types::common::{ SgBigInt, SgBytes, SgErc20, SgOrder, SgOrderbook, SgOrdersListFilterArgs, SgTrade, SgTradeEvent, SgTradeEventTypename, SgTradeRef, SgTradeStructPartialOrder, - SgTradeVaultBalanceChange, SgTransaction, SgVault, SgVaultBalanceChangeVault, + SgTradeVaultBalanceChange, SgTradesListQueryFilters, SgTransaction, SgVault, + SgVaultBalanceChangeVault, }; use crate::utils::float::*; use httpmock::prelude::*; @@ -941,6 +1120,406 @@ mod tests { } } + fn sample_sg_trade(id: &str, timestamp: &str) -> SgTrade { + SgTrade { + id: SgBytes(id.to_string()), + timestamp: SgBigInt(timestamp.to_string()), + ..default_sg_trade() + } + } + + fn default_trade_filters() -> SgTradesListQueryFilters { + SgTradesListQueryFilters::default() + } + + #[tokio::test] + async fn test_trades_list_no_subgraphs() { + let client = MultiOrderbookSubgraphClient::new(vec![]); + let trades = client + .trades_list( + default_trade_filters(), + SgPaginationArgs { + page: 1, + page_size: 10, + }, + ) + .await + .unwrap(); + assert!(trades.is_empty()); + } + + #[tokio::test] + async fn test_trades_list_multiple_subgraphs_merge() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + let trade_s1 = sample_sg_trade("0xtrade_old", "100"); + let trade_s2 = sample_sg_trade("0xtrade_new", "200"); + server1.mock(|when, then| { + when.method(POST) + .path("/") + .body_contains("\"first\":10") + .body_contains("\"skip\":0"); + then.status(200) + .json_body(json!({"data": {"trades": [trade_s1]}})); + }); + server2.mock(|when, then| { + when.method(POST) + .path("/") + .body_contains("\"first\":10") + .body_contains("\"skip\":0"); + then.status(200) + .json_body(json!({"data": {"trades": [trade_s2]}})); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two".to_string(), + }, + ]); + + let trades = client + .trades_list( + default_trade_filters(), + SgPaginationArgs { + page: 1, + page_size: 10, + }, + ) + .await + .unwrap(); + assert_eq!(trades.len(), 2); + let names: std::collections::HashSet<_> = trades + .iter() + .map(|trade| trade.subgraph_name.as_str()) + .collect(); + assert!(names.contains("sg_one")); + assert!(names.contains("sg_two")); + assert_eq!(trades[0].trade.id.0, "0xtrade_new"); + assert_eq!(trades[1].trade.id.0, "0xtrade_old"); + } + + #[tokio::test] + async fn test_trades_list_one_subgraph_errors_others_succeed() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + let trade_s1 = default_sg_trade(); + server1.mock(|when, then| { + when.method(POST).path("/"); + then.status(200) + .json_body(json!({"data": {"trades": [trade_s1]}})); + }); + server2.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two_err".to_string(), + }, + ]); + + let trades = client + .trades_list( + default_trade_filters(), + SgPaginationArgs { + page: 1, + page_size: 10, + }, + ) + .await + .unwrap(); + assert_eq!(trades.len(), 1); + assert_eq!(trades[0].subgraph_name, "sg_one"); + } + + #[tokio::test] + async fn test_trades_list_every_subgraph_errors() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + server1.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + server2.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one_err".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two_err".to_string(), + }, + ]); + + let result = client + .trades_list( + default_trade_filters(), + SgPaginationArgs { + page: 1, + page_size: 10, + }, + ) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_trades_list_all_multiple_subgraphs_merge() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + let trade_s1 = sample_sg_trade("0xtrade_old", "100"); + let trade_s2 = sample_sg_trade("0xtrade_new", "200"); + server1.mock(|when, then| { + when.method(POST) + .path("/") + .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE)) + .body_contains("\"skip\":0"); + then.status(200) + .json_body(json!({"data": {"trades": [trade_s1]}})); + }); + server2.mock(|when, then| { + when.method(POST) + .path("/") + .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE)) + .body_contains("\"skip\":0"); + then.status(200) + .json_body(json!({"data": {"trades": [trade_s2]}})); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two".to_string(), + }, + ]); + + let trades = client + .trades_list_all(default_trade_filters()) + .await + .unwrap(); + assert_eq!(trades.len(), 2); + let names: std::collections::HashSet<_> = trades + .iter() + .map(|trade| trade.subgraph_name.as_str()) + .collect(); + assert!(names.contains("sg_one")); + assert!(names.contains("sg_two")); + assert_eq!(trades[0].trade.id.0, "0xtrade_new"); + assert_eq!(trades[1].trade.id.0, "0xtrade_old"); + } + + #[tokio::test] + async fn test_trades_list_all_one_subgraph_errors_others_succeed() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + let trade_s1 = default_sg_trade(); + server1.mock(|when, then| { + when.method(POST).path("/"); + then.status(200) + .json_body(json!({"data": {"trades": [trade_s1]}})); + }); + server2.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two_err".to_string(), + }, + ]); + + let trades = client + .trades_list_all(default_trade_filters()) + .await + .unwrap(); + assert_eq!(trades.len(), 1); + assert_eq!(trades[0].subgraph_name, "sg_one"); + } + + #[tokio::test] + async fn test_trades_list_all_every_subgraph_errors() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + server1.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + server2.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one_err".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two_err".to_string(), + }, + ]); + + let result = client.trades_list_all(default_trade_filters()).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_trades_count_no_subgraphs() { + let client = MultiOrderbookSubgraphClient::new(vec![]); + let count = client.trades_count(default_trade_filters()).await.unwrap(); + assert_eq!(count, 0); + } + + #[tokio::test] + async fn test_trades_count_multiple_subgraphs_sum() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + let trades_s1 = vec![default_sg_trade(), default_sg_trade()]; + let trades_s2 = vec![default_sg_trade(), default_sg_trade(), default_sg_trade()]; + server1.mock(|when, then| { + when.method(POST) + .path("/") + .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE)) + .body_contains("\"skip\":0"); + then.status(200) + .json_body(json!({"data": {"trades": trades_s1}})); + }); + server2.mock(|when, then| { + when.method(POST) + .path("/") + .body_contains(format!("\"first\":{}", ALL_PAGES_QUERY_PAGE_SIZE)) + .body_contains("\"skip\":0"); + then.status(200) + .json_body(json!({"data": {"trades": trades_s2}})); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two".to_string(), + }, + ]); + + let count = client.trades_count(default_trade_filters()).await.unwrap(); + assert_eq!(count, 5); + } + + #[tokio::test] + async fn test_trades_count_one_subgraph_errors_others_succeed() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + let trades_s1 = vec![default_sg_trade(), default_sg_trade()]; + server1.mock(|when, then| { + when.method(POST).path("/"); + then.status(200) + .json_body(json!({"data": {"trades": trades_s1}})); + }); + server2.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two_err".to_string(), + }, + ]); + + let count = client.trades_count(default_trade_filters()).await.unwrap(); + assert_eq!(count, 2); + } + + #[tokio::test] + async fn test_trades_count_all_subgraphs_error() { + let server1 = MockServer::start_async().await; + let sg1_url = Url::parse(&server1.url("")).unwrap(); + let server2 = MockServer::start_async().await; + let sg2_url = Url::parse(&server2.url("")).unwrap(); + + server1.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + server2.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + + let client = MultiOrderbookSubgraphClient::new(vec![ + MultiSubgraphArgs { + url: sg1_url, + name: "sg_one_err".to_string(), + }, + MultiSubgraphArgs { + url: sg2_url, + name: "sg_two_err".to_string(), + }, + ]); + + let result = client.trades_count(default_trade_filters()).await; + assert!(result.is_err()); + } + #[tokio::test] async fn test_trades_by_transaction_no_subgraphs() { let client = MultiOrderbookSubgraphClient::new(vec![]); diff --git a/crates/subgraph/src/orderbook_client/mod.rs b/crates/subgraph/src/orderbook_client/mod.rs index 2a73cadd06..f93fb516a0 100644 --- a/crates/subgraph/src/orderbook_client/mod.rs +++ b/crates/subgraph/src/orderbook_client/mod.rs @@ -7,7 +7,7 @@ use crate::types::order::{ SgOrderDetailByHashQueryVariables, SgOrderDetailByIdQuery, SgOrderIdList, SgOrdersListQuery, }; use crate::types::order_trade::{ - SgOrderTradeDetailQuery, SgOrderTradesListQuery, SgOwnerTradesListQuery, + SgOrderTradeDetailQuery, SgOrderTradesListQuery, SgOwnerTradesListQuery, SgTradesListQuery, SgTransactionTradesQuery, }; use crate::types::remove_order::{ diff --git a/crates/subgraph/src/orderbook_client/order_trade.rs b/crates/subgraph/src/orderbook_client/order_trade.rs index 9e1ea41fa9..3adc44c7b0 100644 --- a/crates/subgraph/src/orderbook_client/order_trade.rs +++ b/crates/subgraph/src/orderbook_client/order_trade.rs @@ -116,6 +116,81 @@ impl OrderbookSubgraphClient { Ok(data.trades) } + /// Fetch a single page of trades matching the provided filters. + /// + /// The filters are passed directly to the subgraph `trades` query and + /// pagination is converted with `parse_pagination_args`. Results are ordered + /// by timestamp descending by the GraphQL query. Subgraph query errors are + /// returned as `OrderbookSubgraphClientError`. + pub async fn trades_list( + &self, + filters: SgTradesListQueryFilters, + pagination_args: SgPaginationArgs, + ) -> Result, OrderbookSubgraphClientError> { + let pagination_variables = Self::parse_pagination_args(pagination_args); + let data = self + .query::(SgTradesListQueryVariables { + first: pagination_variables.first, + skip: pagination_variables.skip, + filters: Some(filters), + }) + .await?; + + Ok(data.trades) + } + + async fn fetch_all_trades_pages( + &self, + filters: SgTradesListQueryFilters, + ) -> Result, OrderbookSubgraphClientError> { + let mut all_pages_merged = vec![]; + let mut page: u16 = 1; + + loop { + let page_data = self + .trades_list( + filters.clone(), + SgPaginationArgs { + page, + page_size: ALL_PAGES_QUERY_PAGE_SIZE, + }, + ) + .await?; + let batch_len = page_data.len(); + all_pages_merged.extend(page_data); + if batch_len < ALL_PAGES_QUERY_PAGE_SIZE as usize { + break; + } + page += 1; + } + + Ok(all_pages_merged) + } + + /// Fetch all trades matching the provided filters across every page. + /// + /// This repeatedly calls `trades_list` using the standard all-pages query page + /// size until a partial page is returned. Any subgraph query error encountered + /// while fetching a page is propagated. + pub async fn trades_list_all( + &self, + filters: SgTradesListQueryFilters, + ) -> Result, OrderbookSubgraphClientError> { + self.fetch_all_trades_pages(filters).await + } + + /// Count all trades matching the provided filters. + /// + /// This walks every page with `fetch_all_trades_pages` and returns the number + /// of matching trades. Any subgraph query error encountered while fetching + /// pages is propagated. + pub async fn trades_count( + &self, + filters: SgTradesListQueryFilters, + ) -> Result { + Ok(self.fetch_all_trades_pages(filters).await?.len() as u32) + } + pub async fn trades_by_owner_all( &self, owner: String, diff --git a/crates/subgraph/src/types/common.rs b/crates/subgraph/src/types/common.rs index ae4fbd5c67..865592c4d2 100644 --- a/crates/subgraph/src/types/common.rs +++ b/crates/subgraph/src/types/common.rs @@ -10,6 +10,14 @@ pub struct SgOrdersTokensFilterArgs { } impl_wasm_traits!(SgOrdersTokensFilterArgs); +#[derive(Debug, Clone, Serialize, Deserialize, Tsify, Default)] +#[serde(rename_all = "camelCase")] +pub struct SgTradesTokensFilterArgs { + pub inputs: Vec, + pub outputs: Vec, +} +impl_wasm_traits!(SgTradesTokensFilterArgs); + #[derive(cynic::QueryVariables, Debug, Clone, Tsify)] pub struct SgIdQueryVariables<'a> { #[cfg_attr(target_family = "wasm", tsify(type = "string"))] @@ -129,6 +137,72 @@ pub struct SgOwnerTradesQueryVariables { pub orderbook_in: Option>, } +#[derive(cynic::QueryVariables, Debug, Clone, Tsify)] +pub struct SgTradesListQueryVariables { + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub first: Option, + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub skip: Option, + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub filters: Option, +} + +#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] +#[cynic(graphql_type = "Order_filter")] +pub struct SgTradeOrderFilter { + #[cynic(rename = "owner_in", skip_serializing_if = "Vec::is_empty")] + pub owner_in: Vec, + #[cynic(rename = "orderHash", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub order_hash: Option, +} + +#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] +#[cynic(graphql_type = "Vault_filter")] +pub struct SgTradeVaultTokenFilter { + #[cynic(rename = "token_in", skip_serializing_if = "Vec::is_empty")] + pub token_in: Vec, +} + +#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] +#[cynic(graphql_type = "TradeVaultBalanceChange_filter")] +pub struct SgTradeVaultBalanceChangeTokenFilter { + #[cynic(rename = "vault_", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub vault_: Option, +} + +#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)] +#[cynic(graphql_type = "Trade_filter")] +pub struct SgTradesListQueryFilters { + #[cynic(rename = "order_", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub order_: Option, + #[cynic(rename = "timestamp_gte", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub timestamp_gte: Option, + #[cynic(rename = "timestamp_lte", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub timestamp_lte: Option, + #[cynic(rename = "orderbook_in", skip_serializing_if = "Vec::is_empty")] + pub orderbook_in: Vec, + #[cynic( + rename = "inputVaultBalanceChange_", + skip_serializing_if = "Option::is_none" + )] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub input_vault_balance_change_: Option, + #[cynic( + rename = "outputVaultBalanceChange_", + skip_serializing_if = "Option::is_none" + )] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub output_vault_balance_change_: Option, + #[cynic(rename = "or", skip_serializing_if = "Option::is_none")] + #[cfg_attr(target_family = "wasm", tsify(optional))] + pub or: Option>, +} + #[derive(cynic::QueryFragment, Debug, Serialize, Clone, Tsify)] #[cynic(graphql_type = "Orderbook")] pub struct SgOrderbook { diff --git a/crates/subgraph/src/types/order_trade.rs b/crates/subgraph/src/types/order_trade.rs index c5674c896a..691fbc8de5 100644 --- a/crates/subgraph/src/types/order_trade.rs +++ b/crates/subgraph/src/types/order_trade.rs @@ -53,6 +53,20 @@ pub struct SgOwnerTradesListQuery { pub trades: Vec, } +#[derive(cynic::QueryFragment, Debug, Clone, Serialize)] +#[cynic(graphql_type = "Query", variables = "SgTradesListQueryVariables")] +#[cfg_attr(target_family = "wasm", derive(Tsify))] +pub struct SgTradesListQuery { + #[arguments( + skip: $skip, + first: $first, + orderBy: "timestamp", + orderDirection: "desc", + where: $filters + )] + pub trades: Vec, +} + #[derive(cynic::QueryFragment, Debug, Clone, Serialize)] #[cynic( graphql_type = "Query",