diff --git a/crates/common/src/local_db/query/fetch_trades_by_tx/mod.rs b/crates/common/src/local_db/query/fetch_trades_by_tx/mod.rs new file mode 100644 index 0000000000..73eddcb5b2 --- /dev/null +++ b/crates/common/src/local_db/query/fetch_trades_by_tx/mod.rs @@ -0,0 +1,42 @@ +use crate::local_db::{ + query::{SqlBuildError, SqlStatement, SqlValue}, + OrderbookIdentifier, +}; +use alloy::primitives::B256; + +const QUERY_TEMPLATE: &str = include_str!("query.sql"); + +pub fn build_fetch_trades_by_tx_stmt( + ob_id: &OrderbookIdentifier, + tx_hash: B256, +) -> Result { + let mut stmt = SqlStatement::new(QUERY_TEMPLATE); + stmt.push(SqlValue::from(ob_id.chain_id)); + stmt.push(SqlValue::from(ob_id.orderbook_address)); + stmt.push(SqlValue::from(tx_hash)); + Ok(stmt) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy::{ + hex, + primitives::{b256, Address}, + }; + + #[test] + fn builds_with_chain_id_and_tx_hash() { + let tx_hash = b256!("0x00000000000000000000000000000000000000000000000000000000deadface"); + let stmt = + build_fetch_trades_by_tx_stmt(&OrderbookIdentifier::new(137, Address::ZERO), tx_hash) + .unwrap(); + assert_eq!(stmt.params.len(), 3); + assert_eq!(stmt.params[0], SqlValue::U64(137)); + assert_eq!(stmt.params[1], SqlValue::Text(Address::ZERO.to_string())); + assert_eq!( + stmt.params[2], + SqlValue::Text(hex::encode_prefixed(tx_hash)) + ); + } +} diff --git a/crates/common/src/local_db/query/fetch_trades_by_tx/query.sql b/crates/common/src/local_db/query/fetch_trades_by_tx/query.sql new file mode 100644 index 0000000000..54fbed1072 --- /dev/null +++ b/crates/common/src/local_db/query/fetch_trades_by_tx/query.sql @@ -0,0 +1,335 @@ +WITH +params AS ( + SELECT + ?1 AS chain_id, + ?2 AS orderbook_address, + ?3 AS transaction_hash +), +take_trades AS ( + SELECT + 'take' AS trade_kind, + t.chain_id, + t.orderbook_address, + oe.order_hash, + t.order_owner, + t.order_nonce, + t.transaction_hash, + t.log_index, + t.block_number, + t.block_timestamp, + t.sender AS transaction_sender, + io_in.vault_id AS input_vault_id, + io_in.token AS input_token, + t.taker_output AS input_delta, + io_out.vault_id AS output_vault_id, + io_out.token AS output_token, + FLOAT_NEGATE(t.taker_input) AS output_delta + FROM take_orders t + JOIN params p + ON t.chain_id = p.chain_id + AND t.orderbook_address = p.orderbook_address + AND t.transaction_hash = p.transaction_hash + JOIN order_events oe + ON oe.chain_id = t.chain_id + AND oe.orderbook_address = t.orderbook_address + AND oe.order_owner = t.order_owner + AND oe.order_nonce = t.order_nonce + AND oe.event_type = 'AddOrderV3' + AND ( + oe.block_number < t.block_number + OR (oe.block_number = t.block_number AND oe.log_index <= t.log_index) + ) + AND NOT EXISTS ( + SELECT 1 + FROM order_events newer + WHERE newer.chain_id = oe.chain_id + AND newer.orderbook_address = oe.orderbook_address + AND newer.order_owner = oe.order_owner + AND newer.order_nonce = oe.order_nonce + AND newer.event_type = 'AddOrderV3' + AND ( + newer.block_number < t.block_number + OR (newer.block_number = t.block_number AND newer.log_index <= t.log_index) + ) + AND ( + newer.block_number > oe.block_number + OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index) + ) + ) + JOIN order_ios io_in + ON io_in.chain_id = oe.chain_id + AND io_in.orderbook_address = oe.orderbook_address + AND io_in.transaction_hash = oe.transaction_hash + AND io_in.log_index = oe.log_index + AND io_in.io_index = t.input_io_index + AND io_in.io_type = 'input' + JOIN order_ios io_out + ON io_out.chain_id = oe.chain_id + AND io_out.orderbook_address = oe.orderbook_address + AND io_out.transaction_hash = oe.transaction_hash + AND io_out.log_index = oe.log_index + AND io_out.io_index = t.output_io_index + AND io_out.io_type = 'output' +), +clear_alice AS ( + SELECT DISTINCT + 'clear' AS trade_kind, + c.chain_id, + c.orderbook_address, + oe.order_hash, + oe.order_owner, + oe.order_nonce, + c.transaction_hash, + c.log_index, + c.block_number, + c.block_timestamp, + c.sender AS transaction_sender, + c.alice_input_vault_id AS input_vault_id, + io_in.token AS input_token, + a.alice_input AS input_delta, + c.alice_output_vault_id AS output_vault_id, + io_out.token AS output_token, + FLOAT_NEGATE(a.alice_output) AS output_delta + FROM clear_v3_events c + JOIN params p + ON c.chain_id = p.chain_id + AND c.orderbook_address = p.orderbook_address + AND c.transaction_hash = p.transaction_hash + JOIN order_events oe + ON oe.chain_id = c.chain_id + AND oe.orderbook_address = c.orderbook_address + AND oe.order_hash = c.alice_order_hash + AND oe.event_type = 'AddOrderV3' + AND ( + oe.block_number < c.block_number + OR (oe.block_number = c.block_number AND oe.log_index <= c.log_index) + ) + AND NOT EXISTS ( + SELECT 1 + FROM order_events newer + WHERE newer.chain_id = oe.chain_id + AND newer.orderbook_address = oe.orderbook_address + AND newer.order_hash = oe.order_hash + AND newer.event_type = 'AddOrderV3' + AND ( + newer.block_number < c.block_number + OR (newer.block_number = c.block_number AND newer.log_index <= c.log_index) + ) + AND ( + newer.block_number > oe.block_number + OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index) + ) + ) + JOIN after_clear_v2_events a + ON a.chain_id = c.chain_id + AND a.orderbook_address = c.orderbook_address + AND a.transaction_hash = c.transaction_hash + AND a.log_index = ( + SELECT MIN(ac.log_index) + FROM after_clear_v2_events ac + WHERE ac.chain_id = c.chain_id + AND ac.orderbook_address = c.orderbook_address + AND ac.transaction_hash = c.transaction_hash + AND ac.log_index > c.log_index + ) + JOIN order_ios io_in + ON io_in.chain_id = oe.chain_id + AND io_in.orderbook_address = oe.orderbook_address + AND io_in.transaction_hash = oe.transaction_hash + AND io_in.log_index = oe.log_index + AND io_in.io_index = c.alice_input_io_index + AND io_in.io_type = 'input' + JOIN order_ios io_out + ON io_out.chain_id = oe.chain_id + AND io_out.orderbook_address = oe.orderbook_address + AND io_out.transaction_hash = oe.transaction_hash + AND io_out.log_index = oe.log_index + AND io_out.io_index = c.alice_output_io_index + AND io_out.io_type = 'output' +), +clear_bob AS ( + SELECT DISTINCT + 'clear' AS trade_kind, + c.chain_id, + c.orderbook_address, + oe.order_hash, + oe.order_owner, + oe.order_nonce, + c.transaction_hash, + c.log_index, + c.block_number, + c.block_timestamp, + c.sender AS transaction_sender, + c.bob_input_vault_id AS input_vault_id, + io_in.token AS input_token, + a.bob_input AS input_delta, + c.bob_output_vault_id AS output_vault_id, + io_out.token AS output_token, + FLOAT_NEGATE(a.bob_output) AS output_delta + FROM clear_v3_events c + JOIN params p + ON c.chain_id = p.chain_id + AND c.orderbook_address = p.orderbook_address + AND c.transaction_hash = p.transaction_hash + JOIN order_events oe + ON oe.chain_id = c.chain_id + AND oe.orderbook_address = c.orderbook_address + AND oe.order_hash = c.bob_order_hash + AND oe.event_type = 'AddOrderV3' + AND ( + oe.block_number < c.block_number + OR (oe.block_number = c.block_number AND oe.log_index <= c.log_index) + ) + AND NOT EXISTS ( + SELECT 1 + FROM order_events newer + WHERE newer.chain_id = oe.chain_id + AND newer.orderbook_address = oe.orderbook_address + AND newer.order_hash = oe.order_hash + AND newer.event_type = 'AddOrderV3' + AND ( + newer.block_number < c.block_number + OR (newer.block_number = c.block_number AND newer.log_index <= c.log_index) + ) + AND ( + newer.block_number > oe.block_number + OR (newer.block_number = oe.block_number AND newer.log_index > oe.log_index) + ) + ) + JOIN after_clear_v2_events a + ON a.chain_id = c.chain_id + AND a.orderbook_address = c.orderbook_address + AND a.transaction_hash = c.transaction_hash + AND a.log_index = ( + SELECT MIN(ac.log_index) + FROM after_clear_v2_events ac + WHERE ac.chain_id = c.chain_id + AND ac.orderbook_address = c.orderbook_address + AND ac.transaction_hash = c.transaction_hash + AND ac.log_index > c.log_index + ) + JOIN order_ios io_in + ON io_in.chain_id = oe.chain_id + AND io_in.orderbook_address = oe.orderbook_address + AND io_in.transaction_hash = oe.transaction_hash + AND io_in.log_index = oe.log_index + AND io_in.io_index = c.bob_input_io_index + AND io_in.io_type = 'input' + JOIN order_ios io_out + ON io_out.chain_id = oe.chain_id + AND io_out.orderbook_address = oe.orderbook_address + AND io_out.transaction_hash = oe.transaction_hash + AND io_out.log_index = oe.log_index + AND io_out.io_index = c.bob_output_io_index + AND io_out.io_type = 'output' +), +clear_trades AS ( + SELECT * FROM clear_alice + UNION ALL + SELECT * FROM clear_bob +), +unioned_trades AS ( + SELECT * FROM take_trades + UNION ALL + SELECT * FROM clear_trades +), +trade_rows AS ( + SELECT + ut.trade_kind, + ut.chain_id, + ut.orderbook_address, + ut.order_hash, + ut.order_owner, + ut.order_nonce, + ut.transaction_hash, + ut.log_index, + ut.block_number, + ut.block_timestamp, + ut.transaction_sender, + ut.input_vault_id, + ut.input_token, + ut.input_delta, + ut.output_vault_id, + ut.output_token, + ut.output_delta + FROM unioned_trades ut +), +trade_with_snapshots AS ( + SELECT + tr.*, + mvb_in.balance AS input_base_balance, + mvb_in.last_block AS input_base_block, + mvb_in.last_log_index AS input_base_log_index, + mvb_out.balance AS output_base_balance, + mvb_out.last_block AS output_base_block, + mvb_out.last_log_index AS output_base_log_index + FROM trade_rows tr + LEFT JOIN running_vault_balances mvb_in + ON mvb_in.chain_id = tr.chain_id + AND mvb_in.orderbook_address = tr.orderbook_address + AND mvb_in.owner = tr.order_owner + AND mvb_in.token = tr.input_token + AND mvb_in.vault_id = tr.input_vault_id + LEFT JOIN running_vault_balances mvb_out + ON mvb_out.chain_id = tr.chain_id + AND mvb_out.orderbook_address = tr.orderbook_address + AND mvb_out.owner = tr.order_owner + AND mvb_out.token = tr.output_token + AND mvb_out.vault_id = tr.output_vault_id +) +SELECT + tws.trade_kind, + tws.orderbook_address AS orderbook, + tws.order_hash, + tws.order_owner, + tws.order_nonce, + tws.transaction_hash, + tws.log_index, + tws.block_number, + tws.block_timestamp, + tws.transaction_sender, + tws.input_vault_id, + tws.input_token, + tok_in.name AS input_token_name, + tok_in.symbol AS input_token_symbol, + tok_in.decimals AS input_token_decimals, + tws.input_delta, + vbc_input.running_balance AS input_running_balance, + tws.output_vault_id, + tws.output_token, + tok_out.name AS output_token_name, + tok_out.symbol AS output_token_symbol, + tok_out.decimals AS output_token_decimals, + tws.output_delta, + vbc_output.running_balance AS output_running_balance, + ( + '0x' || + lower(replace(tws.transaction_hash, '0x', '')) || + printf('%016x', tws.log_index) + ) AS trade_id +FROM trade_with_snapshots tws +LEFT JOIN vault_balance_changes vbc_input + ON vbc_input.chain_id = tws.chain_id + AND vbc_input.orderbook_address = tws.orderbook_address + AND vbc_input.owner = tws.order_owner + AND vbc_input.token = tws.input_token + AND vbc_input.vault_id = tws.input_vault_id + AND vbc_input.block_number = tws.block_number + AND vbc_input.log_index = tws.log_index +LEFT JOIN vault_balance_changes vbc_output + ON vbc_output.chain_id = tws.chain_id + AND vbc_output.orderbook_address = tws.orderbook_address + AND vbc_output.owner = tws.order_owner + AND vbc_output.token = tws.output_token + AND vbc_output.vault_id = tws.output_vault_id + AND vbc_output.block_number = tws.block_number + AND vbc_output.log_index = tws.log_index +LEFT JOIN erc20_tokens tok_in + ON tok_in.chain_id = tws.chain_id + AND tok_in.orderbook_address = tws.orderbook_address + AND tok_in.token_address = tws.input_token +LEFT JOIN erc20_tokens tok_out + ON tok_out.chain_id = tws.chain_id + AND tok_out.orderbook_address = tws.orderbook_address + AND tok_out.token_address = tws.output_token +ORDER BY tws.block_timestamp DESC, tws.block_number DESC, tws.log_index DESC, tws.trade_kind; diff --git a/crates/common/src/local_db/query/mod.rs b/crates/common/src/local_db/query/mod.rs index 1ddeb6cf69..0ab9ebbce3 100644 --- a/crates/common/src/local_db/query/mod.rs +++ b/crates/common/src/local_db/query/mod.rs @@ -14,6 +14,7 @@ pub mod fetch_orders; pub mod fetch_store_addresses; pub mod fetch_tables; pub mod fetch_target_watermark; +pub mod fetch_trades_by_tx; pub mod fetch_transaction_by_hash; pub mod fetch_vault_balance_changes; pub mod fetch_vaults; diff --git a/crates/common/src/raindex_client/local_db/mod.rs b/crates/common/src/raindex_client/local_db/mod.rs index c18929415c..931ae5b14a 100644 --- a/crates/common/src/raindex_client/local_db/mod.rs +++ b/crates/common/src/raindex_client/local_db/mod.rs @@ -16,6 +16,7 @@ pub mod executor; pub mod orders; pub mod pipeline; pub mod query; +pub mod trades; pub mod transactions; pub mod vaults; diff --git a/crates/common/src/raindex_client/local_db/query/fetch_trades_by_tx.rs b/crates/common/src/raindex_client/local_db/query/fetch_trades_by_tx.rs new file mode 100644 index 0000000000..d5af31046e --- /dev/null +++ b/crates/common/src/raindex_client/local_db/query/fetch_trades_by_tx.rs @@ -0,0 +1,57 @@ +use crate::local_db::query::fetch_order_trades::LocalDbOrderTrade; +use crate::local_db::query::fetch_trades_by_tx::build_fetch_trades_by_tx_stmt; +use crate::local_db::query::{LocalDbQueryError, LocalDbQueryExecutor}; +use crate::local_db::OrderbookIdentifier; +use alloy::primitives::B256; + +pub async fn fetch_trades_by_tx( + exec: &E, + ob_id: &OrderbookIdentifier, + tx_hash: B256, +) -> Result, LocalDbQueryError> { + let stmt = build_fetch_trades_by_tx_stmt(ob_id, tx_hash)?; + exec.query_json(&stmt).await +} + +#[cfg(all(test, target_family = "wasm"))] +mod wasm_tests { + use super::*; + use crate::raindex_client::local_db::executor::tests::create_sql_capturing_callback; + use crate::raindex_client::local_db::executor::JsCallbackExecutor; + use alloy::primitives::{b256, Address}; + use std::cell::RefCell; + use std::rc::Rc; + use wasm_bindgen_test::*; + use wasm_bindgen_utils::prelude::*; + + #[wasm_bindgen_test] + async fn wrapper_uses_builder_sql_exactly() { + let chain_id = 111; + let orderbook = Address::from([0x77; 20]); + let tx_hash = b256!("0x000000000000000000000000000000000000000000000000000000000000abcd"); + + let expected_stmt = build_fetch_trades_by_tx_stmt( + &OrderbookIdentifier::new(chain_id, orderbook), + tx_hash.clone(), + ) + .unwrap(); + + let store = Rc::new(RefCell::new(( + String::new(), + wasm_bindgen::JsValue::UNDEFINED, + ))); + let callback = create_sql_capturing_callback("[]", store.clone()); + let exec = JsCallbackExecutor::from_ref(&callback); + + let res = super::fetch_trades_by_tx( + &exec, + &OrderbookIdentifier::new(chain_id, orderbook), + tx_hash, + ) + .await; + assert!(res.is_ok()); + + let captured = store.borrow().clone(); + assert_eq!(captured.0, expected_stmt.sql); + } +} diff --git a/crates/common/src/raindex_client/local_db/query/mod.rs b/crates/common/src/raindex_client/local_db/query/mod.rs index d8a7921360..3a18a261d2 100644 --- a/crates/common/src/raindex_client/local_db/query/mod.rs +++ b/crates/common/src/raindex_client/local_db/query/mod.rs @@ -9,6 +9,7 @@ pub mod fetch_order_vaults_volume; pub mod fetch_orders; pub mod fetch_store_addresses; pub mod fetch_tables; +pub mod fetch_trades_by_tx; pub mod fetch_transaction_by_hash; pub mod fetch_vault_balance_changes; pub mod fetch_vaults; diff --git a/crates/common/src/raindex_client/local_db/trades.rs b/crates/common/src/raindex_client/local_db/trades.rs new file mode 100644 index 0000000000..9592a869c8 --- /dev/null +++ b/crates/common/src/raindex_client/local_db/trades.rs @@ -0,0 +1,155 @@ +use super::super::trades::RaindexTrade; +use super::query::fetch_trades_by_tx::fetch_trades_by_tx; +use super::{LocalDb, RaindexError}; +use crate::local_db::OrderbookIdentifier; +use alloy::primitives::B256; + +pub struct LocalDbTrades<'a> { + pub(crate) db: &'a LocalDb, +} + +impl<'a> LocalDbTrades<'a> { + pub(crate) fn new(db: &'a LocalDb) -> Self { + Self { db } + } + + pub async fn get_by_tx_hash( + &self, + ob_id: &OrderbookIdentifier, + tx_hash: B256, + ) -> Result, RaindexError> { + let local_trades = fetch_trades_by_tx(self.db, ob_id, tx_hash).await?; + local_trades + .into_iter() + .map(|trade| RaindexTrade::try_from_local_db_trade(ob_id.chain_id, trade)) + .collect() + } +} + +#[cfg(test)] +mod tests { + #[cfg(target_family = "wasm")] + use super::*; + + #[cfg(target_family = "wasm")] + mod wasm_tests { + use super::*; + use crate::raindex_client::local_db::executor::JsCallbackExecutor; + use crate::raindex_client::local_db::LocalDb; + use alloy::primitives::{address, b256}; + use serde_json::json; + use wasm_bindgen_test::wasm_bindgen_test; + use wasm_bindgen_utils::prelude::*; + + fn create_mock_callback(response_json: &str) -> js_sys::Function { + let json_str = response_json.to_string(); + let result = WasmEncodedResult::Success:: { + value: json_str, + error: None, + }; + let payload = js_sys::JSON::stringify(&serde_wasm_bindgen::to_value(&result).unwrap()) + .unwrap() + .as_string() + .unwrap(); + + let closure = + Closure::wrap(Box::new(move |_sql: String, _params: JsValue| -> JsValue { + js_sys::JSON::parse(&payload).unwrap() + }) + as Box JsValue>); + + closure.into_js_value().dyn_into().unwrap() + } + + #[wasm_bindgen_test] + async fn test_get_by_tx_hash_returns_trades_when_found() { + let tx_hash = + b256!("0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"); + let orderbook = address!("0x2222222222222222222222222222222222222222"); + let order_hash = + b256!("0x1111111111111111111111111111111111111111111111111111111111111111"); + let input_token = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + let output_token = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + let sender = address!("0x3333333333333333333333333333333333333333"); + let owner = address!("0x4444444444444444444444444444444444444444"); + + let trade_json = json!([{ + "trade_kind": "take", + "orderbook": orderbook.to_string(), + "order_hash": order_hash.to_string(), + "order_owner": owner.to_string(), + "order_nonce": "1", + "transaction_hash": tx_hash.to_string(), + "log_index": 5, + "block_number": 12345, + "block_timestamp": 1700000000u64, + "transaction_sender": sender.to_string(), + "input_vault_id": "0x01", + "input_token": input_token.to_string(), + "input_token_name": "Token A", + "input_token_symbol": "TKNA", + "input_token_decimals": 18, + "input_delta": "0x0000000000000000000000000000000000000000000000000000000000000001", + "input_running_balance": "0x0000000000000000000000000000000000000000000000000000000000000003", + "output_vault_id": "0x02", + "output_token": output_token.to_string(), + "output_token_name": "Token B", + "output_token_symbol": "TKNB", + "output_token_decimals": 6, + "output_delta": "0x00000000fffffffffffffffffffffffffffffffffffffffffffffffffffffffe", + "output_running_balance": "0x0000000000000000000000000000000000000000000000000000000000000001", + "trade_id": format!( + "0x{}{:016x}", + tx_hash.to_string().trim_start_matches("0x"), + 5u64 + ) + }]); + + let callback = create_mock_callback(&trade_json.to_string()); + let exec = JsCallbackExecutor::from_ref(&callback); + let local_db = LocalDb::new(exec); + + let trades = LocalDbTrades::new(&local_db); + let ob_id = OrderbookIdentifier::new(42161, orderbook); + + let result = trades.get_by_tx_hash(&ob_id, tx_hash).await; + + assert!(result.is_ok()); + let trades = result.unwrap(); + assert_eq!(trades.len(), 1); + + let trade = &trades[0]; + assert_eq!(trade.transaction().id(), tx_hash.to_string()); + assert_eq!(trade.orderbook(), orderbook.to_string()); + assert_eq!( + trade + .timestamp() + .unwrap() + .to_string(10) + .unwrap() + .as_string() + .unwrap(), + "1700000000" + ); + } + + #[wasm_bindgen_test] + async fn test_get_by_tx_hash_returns_empty_when_not_found() { + let tx_hash = + b256!("0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"); + let orderbook = address!("0x2222222222222222222222222222222222222222"); + + let callback = create_mock_callback("[]"); + let exec = JsCallbackExecutor::from_ref(&callback); + let local_db = LocalDb::new(exec); + + let trades = LocalDbTrades::new(&local_db); + let ob_id = OrderbookIdentifier::new(42161, orderbook); + + let result = trades.get_by_tx_hash(&ob_id, tx_hash).await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_empty()); + } + } +} diff --git a/crates/common/src/raindex_client/mod.rs b/crates/common/src/raindex_client/mod.rs index a95e1a0dbd..1d6841c9c5 100644 --- a/crates/common/src/raindex_client/mod.rs +++ b/crates/common/src/raindex_client/mod.rs @@ -257,6 +257,8 @@ pub enum RaindexError { SubgraphNotConfigured(String), #[error("Transaction {tx_hash:#x} was not indexed after {attempts} attempts")] TransactionIndexingTimeout { tx_hash: B256, attempts: usize }, + #[error("Trades for transaction {tx_hash:#x} were not indexed after {attempts} attempts")] + TradesIndexingTimeout { tx_hash: B256, attempts: usize }, #[error(transparent)] YamlError(#[from] YamlError), #[error(transparent)] @@ -395,6 +397,11 @@ impl RaindexError { "Timeout waiting for transaction {tx_hash:#x} to be indexed after {attempts} attempts." ) } + RaindexError::TradesIndexingTimeout { tx_hash, attempts } => { + format!( + "Timeout waiting for trades of transaction {tx_hash:#x} to be indexed after {attempts} attempts." + ) + } RaindexError::YamlError(err) => format!( "YAML configuration parsing failed: {}. Check file syntax and structure.", err diff --git a/crates/common/src/raindex_client/trades/get_by_tx.rs b/crates/common/src/raindex_client/trades/get_by_tx.rs new file mode 100644 index 0000000000..53379ac382 --- /dev/null +++ b/crates/common/src/raindex_client/trades/get_by_tx.rs @@ -0,0 +1,432 @@ +use super::RaindexTrade; +use super::*; +use crate::local_db::is_chain_supported_local_db; +use crate::local_db::OrderbookIdentifier; +use crate::raindex_client::local_db::trades::LocalDbTrades; +use alloy::primitives::{Address, B256}; +#[cfg(target_family = "wasm")] +use gloo_timers::future::TimeoutFuture; +use rain_orderbook_subgraph_client::types::Id; +use rain_orderbook_subgraph_client::OrderbookSubgraphClientError; +use std::str::FromStr; +#[cfg(not(target_family = "wasm"))] +use std::time::Duration; +#[cfg(not(target_family = "wasm"))] +use tokio::time::sleep; + +const DEFAULT_TRADES_TX_POLL_ATTEMPTS: usize = 10; +const DEFAULT_TRADES_TX_POLL_INTERVAL_MS: u64 = 1_000; + +#[cfg(target_family = "wasm")] +async fn sleep_ms(ms: u64) { + let delay = ms.min(u32::MAX as u64) as u32; + TimeoutFuture::new(delay).await; +} + +#[cfg(not(target_family = "wasm"))] +async fn sleep_ms(ms: u64) { + sleep(Duration::from_millis(ms)).await; +} + +#[wasm_export] +impl RaindexClient { + #[wasm_export( + js_name = "getTradesForTransaction", + return_description = "Array of trades in the transaction", + unchecked_return_type = "RaindexTrade[]", + preserve_js_class + )] + pub async fn get_trades_for_transaction_wasm_binding( + &self, + #[wasm_export(js_name = "chainId", param_description = "Chain ID for the network")] + chain_id: u32, + #[wasm_export( + js_name = "orderbookAddress", + param_description = "Orderbook contract address", + unchecked_param_type = "Address" + )] + orderbook_address: String, + #[wasm_export( + js_name = "txHash", + param_description = "Transaction hash", + unchecked_param_type = "Hex" + )] + tx_hash: String, + #[wasm_export( + js_name = "maxAttempts", + param_description = "Optional maximum polling attempts before timing out" + )] + max_attempts: Option, + #[wasm_export( + js_name = "intervalMs", + param_description = "Optional polling interval in milliseconds" + )] + interval_ms: Option, + ) -> Result, RaindexError> { + let orderbook_address = Address::from_str(&orderbook_address)?; + let tx_hash = B256::from_str(&tx_hash)?; + self.get_trades_for_transaction( + chain_id, + orderbook_address, + tx_hash, + max_attempts.map(|v| v as usize), + interval_ms.map(|v| v as u64), + ) + .await + } +} +impl RaindexClient { + pub async fn get_trades_for_transaction( + &self, + chain_id: u32, + orderbook_address: Address, + tx_hash: B256, + max_attempts: Option, + interval_ms: Option, + ) -> Result, RaindexError> { + let attempts = max_attempts + .unwrap_or(DEFAULT_TRADES_TX_POLL_ATTEMPTS) + .max(1); + let interval_ms = interval_ms.unwrap_or(DEFAULT_TRADES_TX_POLL_INTERVAL_MS); + let ob_id = OrderbookIdentifier::new(chain_id, orderbook_address); + + if is_chain_supported_local_db(chain_id) { + if let Some(local_db) = self.local_db() { + let local_source = LocalDbTrades::new(&local_db); + for attempt in 1..=attempts { + let trades = local_source.get_by_tx_hash(&ob_id, tx_hash).await?; + if !trades.is_empty() { + return Ok(trades); + } + if attempt < attempts { + sleep_ms(interval_ms).await; + } + } + } + } + + let client = self.get_orderbook_client(orderbook_address)?; + for attempt in 1..=attempts { + match client + .transaction_trades(Id::new(tx_hash.to_string())) + .await + { + Ok(sg_trades) => { + return sg_trades + .into_iter() + .map(|t| RaindexTrade::try_from_sg_trade(chain_id, t)) + .collect(); + } + Err(OrderbookSubgraphClientError::Empty) => { + if attempt < attempts { + sleep_ms(interval_ms).await; + continue; + } + } + Err(e) => return Err(e.into()), + } + } + + Err(RaindexError::TradesIndexingTimeout { tx_hash, attempts }) + } +} + +#[cfg(test)] +mod tests { + #[cfg(not(target_family = "wasm"))] + mod non_wasm { + use super::super::super::*; + use crate::raindex_client::tests::{get_test_yaml, CHAIN_ID_1_ORDERBOOK_ADDRESS}; + use alloy::primitives::{b256, Address, Bytes, U256}; + use httpmock::MockServer; + use rain_orderbook_subgraph_client::utils::float::*; + use serde_json::{json, Value}; + use std::str::FromStr; + + fn sample_trades_response() -> Value { + json!({ + "data": { + "trades": [ + { + "id": "0xabc1", + "tradeEvent": { + "transaction": { + "id": "0x0000000000000000000000000000000000000000000000000000000000000456", + "from": "0x0000000000000000000000000000000000000001", + "blockNumber": "100", + "timestamp": "1700000000" + }, + "sender": "0x0000000000000000000000000000000000000002" + }, + "outputVaultBalanceChange": { + "id": "0xout1", + "__typename": "TradeVaultBalanceChange", + "amount": NEG2, + "newVaultBalance": F0, + "oldVaultBalance": F0, + "vault": { + "id": "0xv1", + "vaultId": "0x01", + "token": { + "id": "0x12e605bc104e93b45e1ad99f9e555f659051c2bb", + "address": "0x12e605bc104e93b45e1ad99f9e555f659051c2bb", + "name": "Staked FLR", + "symbol": "sFLR", + "decimals": "18" + } + }, + "timestamp": "1700000000", + "transaction": { + "id": "0x0000000000000000000000000000000000000000000000000000000000000456", + "from": "0x0000000000000000000000000000000000000001", + "blockNumber": "100", + "timestamp": "1700000000" + }, + "orderbook": { "id": "0x1234567890123456789012345678901234567890" }, + "trade": { "tradeEvent": { "__typename": "TakeOrder" } } + }, + "order": { + "id": "0x0000000000000000000000000000000000000001", + "orderHash": "0x00000000000000000000000000000000000000000000000000000000000abc01" + }, + "inputVaultBalanceChange": { + "id": "0xin1", + "__typename": "TradeVaultBalanceChange", + "amount": F1, + "newVaultBalance": F0, + "oldVaultBalance": F0, + "vault": { + "id": "0xv2", + "vaultId": "0x02", + "token": { + "id": "0x1d80c49bbbcd1c0911346656b529df9e5c2f783d", + "address": "0x1d80c49bbbcd1c0911346656b529df9e5c2f783d", + "name": "Wrapped Flare", + "symbol": "WFLR", + "decimals": "18" + } + }, + "timestamp": "1700000000", + "transaction": { + "id": "0x0000000000000000000000000000000000000000000000000000000000000456", + "from": "0x0000000000000000000000000000000000000001", + "blockNumber": "100", + "timestamp": "1700000000" + }, + "orderbook": { "id": "0x1234567890123456789012345678901234567890" }, + "trade": { "tradeEvent": { "__typename": "TakeOrder" } } + }, + "timestamp": "1700000000", + "orderbook": { "id": "0x1234567890123456789012345678901234567890" } + } + ] + } + }) + } + + fn empty_trades_response() -> Value { + json!({ + "data": { + "trades": [] + } + }) + } + + #[tokio::test] + async fn test_get_trades_for_transaction_found() { + let sg_server = MockServer::start_async().await; + sg_server.mock(|when, then| { + when.path("/sg"); + then.status(200).json_body_obj(&sample_trades_response()); + }); + + let raindex_client = RaindexClient::new( + vec![get_test_yaml( + &sg_server.url("/sg"), + "http://localhost:3000", + "http://localhost:3000", + "http://localhost:3000", + )], + None, + ) + .unwrap(); + + let trades = raindex_client + .get_trades_for_transaction( + 1, + Address::from_str(CHAIN_ID_1_ORDERBOOK_ADDRESS).unwrap(), + b256!("0x0000000000000000000000000000000000000000000000000000000000000456"), + None, + None, + ) + .await + .unwrap(); + + assert_eq!(trades.len(), 1); + let trade = &trades[0]; + assert_eq!( + trade.transaction().id(), + b256!("0x0000000000000000000000000000000000000000000000000000000000000456") + ); + assert_eq!( + trade.order_hash(), + Bytes::from_str( + "0x00000000000000000000000000000000000000000000000000000000000abc01" + ) + .unwrap() + ); + assert_eq!( + trade.orderbook(), + Address::from_str(CHAIN_ID_1_ORDERBOOK_ADDRESS).unwrap() + ); + assert_eq!(trade.timestamp(), U256::from(1700000000u64)); + assert_eq!( + trade.output_vault_balance_change().token().symbol(), + Some("sFLR".to_string()) + ); + assert_eq!( + trade.input_vault_balance_change().token().symbol(), + Some("WFLR".to_string()) + ); + } + + #[tokio::test] + async fn test_get_trades_for_transaction_empty() { + let sg_server = MockServer::start_async().await; + sg_server.mock(|when, then| { + when.path("/sg"); + then.status(200).json_body_obj(&empty_trades_response()); + }); + + let raindex_client = RaindexClient::new( + vec![get_test_yaml( + &sg_server.url("/sg"), + "http://localhost:3000", + "http://localhost:3000", + "http://localhost:3000", + )], + None, + ) + .unwrap(); + + let result = raindex_client + .get_trades_for_transaction( + 1, + Address::from_str(CHAIN_ID_1_ORDERBOOK_ADDRESS).unwrap(), + b256!("0x0000000000000000000000000000000000000000000000000000000000000789"), + Some(1), + Some(10), + ) + .await + .unwrap_err(); + + assert!(matches!( + result, + RaindexError::TradesIndexingTimeout { attempts: 1, .. } + )); + } + + #[tokio::test] + async fn test_get_trades_for_transaction_network_error() { + let sg_server = MockServer::start_async().await; + sg_server.mock(|when, then| { + when.path("/sg"); + then.status(500); + }); + + let raindex_client = RaindexClient::new( + vec![get_test_yaml( + &sg_server.url("/sg"), + "http://localhost:3000", + "http://localhost:3000", + "http://localhost:3000", + )], + None, + ) + .unwrap(); + + let result = raindex_client + .get_trades_for_transaction( + 1, + Address::from_str(CHAIN_ID_1_ORDERBOOK_ADDRESS).unwrap(), + b256!("0x0000000000000000000000000000000000000000000000000000000000000999"), + None, + None, + ) + .await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_get_trades_for_transaction_polling_success() { + let sg_server = MockServer::start_async().await; + sg_server.mock(|when, then| { + when.path("/sg"); + then.status(200).json_body_obj(&sample_trades_response()); + }); + + let raindex_client = RaindexClient::new( + vec![get_test_yaml( + &sg_server.url("/sg"), + "http://localhost:3000", + "http://localhost:3000", + "http://localhost:3000", + )], + None, + ) + .unwrap(); + + let trades = raindex_client + .get_trades_for_transaction( + 1, + Address::from_str(CHAIN_ID_1_ORDERBOOK_ADDRESS).unwrap(), + b256!("0x0000000000000000000000000000000000000000000000000000000000000456"), + Some(5), + Some(10), + ) + .await + .unwrap(); + + assert_eq!(trades.len(), 1); + } + + #[tokio::test] + async fn test_get_trades_for_transaction_timeout() { + let sg_server = MockServer::start_async().await; + sg_server.mock(|when, then| { + when.path("/sg"); + then.status(200).json_body_obj(&empty_trades_response()); + }); + + let raindex_client = RaindexClient::new( + vec![get_test_yaml( + &sg_server.url("/sg"), + "http://localhost:3000", + "http://localhost:3000", + "http://localhost:3000", + )], + None, + ) + .unwrap(); + + let err = raindex_client + .get_trades_for_transaction( + 1, + Address::from_str(CHAIN_ID_1_ORDERBOOK_ADDRESS).unwrap(), + b256!("0x0000000000000000000000000000000000000000000000000000000000000456"), + Some(3), + Some(10), + ) + .await + .unwrap_err(); + + match err { + RaindexError::TradesIndexingTimeout { attempts, .. } => { + assert_eq!(attempts, 3); + } + other => panic!("expected TradesIndexingTimeout, got {other:?}"), + } + } + } +} diff --git a/crates/common/src/raindex_client/trades.rs b/crates/common/src/raindex_client/trades/mod.rs similarity index 99% rename from crates/common/src/raindex_client/trades.rs rename to crates/common/src/raindex_client/trades/mod.rs index fd3c56b147..8b08a767a6 100644 --- a/crates/common/src/raindex_client/trades.rs +++ b/crates/common/src/raindex_client/trades/mod.rs @@ -1,3 +1,5 @@ +mod get_by_tx; + use super::local_db::orders::LocalDbOrders; use super::orders::{OrdersDataSource, SubgraphOrders}; use super::*; diff --git a/crates/js_api/src/gui/mod.rs b/crates/js_api/src/gui/mod.rs index 403feca239..60f9114544 100644 --- a/crates/js_api/src/gui/mod.rs +++ b/crates/js_api/src/gui/mod.rs @@ -36,7 +36,7 @@ use wasm_bindgen_utils::{impl_wasm_traits, prelude::*, wasm_export}; mod deposits; mod field_values; -mod order_operations; +pub mod order_operations; mod select_tokens; mod state_management; mod validation; diff --git a/crates/js_api/src/gui/order_operations.rs b/crates/js_api/src/gui/order_operations.rs index 3f92008fa3..851c796b66 100644 --- a/crates/js_api/src/gui/order_operations.rs +++ b/crates/js_api/src/gui/order_operations.rs @@ -97,14 +97,14 @@ impl_wasm_traits!(ExternalCall); #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Tsify)] #[serde(rename_all = "camelCase")] pub struct DeploymentTransactionArgs { - approvals: Vec, + pub approvals: Vec, #[tsify(type = "string")] - deployment_calldata: Bytes, + pub deployment_calldata: Bytes, #[tsify(type = "string")] - orderbook_address: Address, - chain_id: u32, + pub orderbook_address: Address, + pub chain_id: u32, #[tsify(type = "ExternalCall | undefined")] - emit_meta_call: Option, + pub emit_meta_call: Option, } impl_wasm_traits!(DeploymentTransactionArgs); diff --git a/crates/subgraph/src/orderbook_client/mod.rs b/crates/subgraph/src/orderbook_client/mod.rs index c26a72993c..e096058945 100644 --- a/crates/subgraph/src/orderbook_client/mod.rs +++ b/crates/subgraph/src/orderbook_client/mod.rs @@ -6,7 +6,10 @@ use crate::types::order::{ SgBatchOrderDetailQuery, SgBatchOrderDetailQueryVariables, SgOrderDetailByHashQuery, SgOrderDetailByHashQueryVariables, SgOrderDetailByIdQuery, SgOrderIdList, SgOrdersListQuery, }; -use crate::types::order_trade::{SgOrderTradeDetailQuery, SgOrderTradesListQuery}; +use crate::types::order_trade::{ + SgOrderTradeDetailQuery, SgOrderTradesListQuery, SgTransactionTradesQuery, + TransactionTradesVariables, +}; use crate::types::remove_order::{ SgTransactionRemoveOrdersQuery, TransactionRemoveOrdersVariables, }; diff --git a/crates/subgraph/src/orderbook_client/transaction.rs b/crates/subgraph/src/orderbook_client/transaction.rs index 34dbcf549e..ee3845a3d8 100644 --- a/crates/subgraph/src/orderbook_client/transaction.rs +++ b/crates/subgraph/src/orderbook_client/transaction.rs @@ -34,6 +34,25 @@ impl OrderbookSubgraphClient { Ok(data.add_orders) } + pub async fn transaction_trades( + &self, + id: Id, + ) -> Result, OrderbookSubgraphClientError> { + let data = self + .query::( + TransactionTradesVariables { + id: id.inner().to_string(), + }, + ) + .await?; + + if data.trades.is_empty() { + return Err(OrderbookSubgraphClientError::Empty); + } + + Ok(data.trades) + } + /// Fetch all remove orders for a given transaction pub async fn transaction_remove_orders( &self, @@ -60,7 +79,9 @@ mod tests { use super::*; use crate::types::common::{ SgAddOrderWithOrder, SgBigInt, SgBytes, SgErc20, SgOrder, SgOrderbook, - SgRemoveOrderWithOrder, SgTransaction, SgVault, + SgRemoveOrderWithOrder, SgTrade, SgTradeEvent, SgTradeEventTypename, SgTradeRef, + SgTradeStructPartialOrder, SgTradeVaultBalanceChange, SgTransaction, SgVault, + SgVaultBalanceChangeVault, }; use crate::utils::float::*; use cynic::Id; @@ -366,4 +387,125 @@ mod tests { Err(OrderbookSubgraphClientError::CynicClientError(_)) )); } + + fn default_sg_trade(tx_id_str: &str, trade_id_str: &str) -> SgTrade { + let tx = default_sg_transaction(tx_id_str); + SgTrade { + id: SgBytes(trade_id_str.to_string()), + trade_event: SgTradeEvent { + transaction: tx.clone(), + sender: SgBytes("0xsender_default".to_string()), + }, + output_vault_balance_change: SgTradeVaultBalanceChange { + id: SgBytes(format!("{}_out_vbc", trade_id_str)), + __typename: "TradeVaultBalanceChange".to_string(), + amount: SgBytes(F0.as_hex()), + new_vault_balance: SgBytes(F0.as_hex()), + old_vault_balance: SgBytes(F0.as_hex()), + vault: SgVaultBalanceChangeVault { + id: SgBytes("0xvault_out".to_string()), + vault_id: SgBytes("0x01".to_string()), + token: default_sg_erc20("out"), + }, + timestamp: SgBigInt("1600000000".to_string()), + transaction: tx.clone(), + orderbook: SgOrderbook { + id: SgBytes("0xorderbook_default".to_string()), + }, + trade: SgTradeRef { + trade_event: SgTradeEventTypename { + __typename: "TakeOrder".to_string(), + }, + }, + }, + order: SgTradeStructPartialOrder { + id: SgBytes("0xorder_default".to_string()), + order_hash: SgBytes("0xorderhash_default".to_string()), + }, + input_vault_balance_change: SgTradeVaultBalanceChange { + id: SgBytes(format!("{}_in_vbc", trade_id_str)), + __typename: "TradeVaultBalanceChange".to_string(), + amount: SgBytes(F0.as_hex()), + new_vault_balance: SgBytes(F0.as_hex()), + old_vault_balance: SgBytes(F0.as_hex()), + vault: SgVaultBalanceChangeVault { + id: SgBytes("0xvault_in".to_string()), + vault_id: SgBytes("0x02".to_string()), + token: default_sg_erc20("in"), + }, + timestamp: SgBigInt("1600000000".to_string()), + transaction: tx.clone(), + orderbook: SgOrderbook { + id: SgBytes("0xorderbook_default".to_string()), + }, + trade: SgTradeRef { + trade_event: SgTradeEventTypename { + __typename: "TakeOrder".to_string(), + }, + }, + }, + timestamp: SgBigInt("1600000000".to_string()), + orderbook: SgOrderbook { + id: SgBytes("0xorderbook_default".to_string()), + }, + } + } + + #[tokio::test] + async fn test_transaction_trades_found() { + let sg_server = MockServer::start_async().await; + let client = setup_client(&sg_server); + let tx_id_str = "0xtx_trades_1"; + let tx_id = Id::new(tx_id_str); + let expected_trades = vec![ + default_sg_trade(tx_id_str, "0xtrade1"), + default_sg_trade(tx_id_str, "0xtrade2"), + ]; + + sg_server.mock(|when, then| { + when.method(POST).path("/"); + then.status(200) + .json_body(json!({"data": {"trades": expected_trades}})); + }); + + let result = client.transaction_trades(tx_id).await; + assert!(result.is_ok(), "Result was: {:?}", result); + let trades = result.unwrap(); + assert_eq!(trades.len(), 2); + assert_eq!(trades[0].id.0, "0xtrade1"); + assert_eq!(trades[1].id.0, "0xtrade2"); + } + + #[tokio::test] + async fn test_transaction_trades_empty_result_returns_error() { + let sg_server = MockServer::start_async().await; + let client = setup_client(&sg_server); + let tx_id = Id::new("0xtx_trades_empty"); + + sg_server.mock(|when, then| { + when.method(POST).path("/"); + then.status(200).json_body(json!({"data": {"trades": []}})); + }); + + let result = client.transaction_trades(tx_id).await; + assert!(matches!(result, Err(OrderbookSubgraphClientError::Empty))); + } + + #[tokio::test] + async fn test_transaction_trades_network_error() { + let sg_server = MockServer::start_async().await; + let client = setup_client(&sg_server); + let tx_id = Id::new("0xtx_trades_network_err"); + + sg_server.mock(|when, then| { + when.method(POST).path("/"); + then.status(500); + }); + + let result = client.transaction_trades(tx_id).await; + assert!(matches!( + result, + Err(OrderbookSubgraphClientError::CynicClientError(_)) + )); + } } diff --git a/crates/subgraph/src/types/order_trade.rs b/crates/subgraph/src/types/order_trade.rs index bb29d45e97..10dea08544 100644 --- a/crates/subgraph/src/types/order_trade.rs +++ b/crates/subgraph/src/types/order_trade.rs @@ -33,3 +33,17 @@ pub struct SgOrderTradeDetailQuery { #[cfg_attr(target_family = "wasm", tsify(optional))] pub trade: Option, } + +#[derive(cynic::QueryVariables, Debug)] +pub struct TransactionTradesVariables { + pub id: String, +} + +#[derive(cynic::QueryFragment, Debug, Serialize)] +#[cynic(graphql_type = "Query", variables = "TransactionTradesVariables")] +#[cfg_attr(target_family = "wasm", derive(Tsify))] +#[serde(rename_all = "camelCase")] +pub struct SgTransactionTradesQuery { + #[arguments(where: { tradeEvent_: { transaction: $id } })] + pub trades: Vec, +}