Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/common/src/local_db/query/create_tables/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,11 @@ CREATE INDEX idx_order_ios_token ON order_ios(chain_id, orderbook_address, token

CREATE INDEX idx_take_orders_owner ON take_orders(chain_id, orderbook_address, order_owner);
CREATE INDEX idx_take_orders_block ON take_orders(chain_id, orderbook_address, block_number);
CREATE INDEX idx_take_orders_sender_time ON take_orders(chain_id, orderbook_address, sender, block_timestamp DESC, block_number DESC, log_index DESC);

CREATE INDEX idx_clear_events_alice_bob ON clear_v3_events(chain_id, orderbook_address, alice_order_hash, bob_order_hash);
CREATE INDEX idx_clear_events_block ON clear_v3_events(chain_id, orderbook_address, block_number);
CREATE INDEX idx_clear_events_sender_time ON clear_v3_events(chain_id, orderbook_address, sender, block_timestamp DESC, block_number DESC, log_index DESC);
CREATE INDEX idx_clear_alice_vaults ON clear_v3_events(chain_id, orderbook_address, alice_input_vault_id, alice_output_vault_id);
CREATE INDEX idx_clear_bob_vaults ON clear_v3_events(chain_id, orderbook_address, bob_input_vault_id, bob_output_vault_id);

Expand Down
52 changes: 52 additions & 0 deletions crates/common/src/local_db/query/fetch_trades/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ const TAKE_ORDERS_CHAIN_IDS_CLAUSE: &str = "/*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/";
const TAKE_ORDERS_CHAIN_IDS_CLAUSE_BODY: &str = "AND t.chain_id IN ({list})";
const TAKE_ORDERS_ORDERBOOKS_CLAUSE: &str = "/*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/";
const TAKE_ORDERS_ORDERBOOKS_CLAUSE_BODY: &str = "AND t.orderbook_address IN ({list})";
const TAKE_ORDERS_TAKERS_CLAUSE: &str = "/*TAKE_ORDERS_TAKERS_CLAUSE*/";
const TAKE_ORDERS_TAKERS_CLAUSE_BODY: &str = "AND t.sender IN ({list})";

const CLEAR_EVENTS_CHAIN_IDS_CLAUSE: &str = "/*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/";
const CLEAR_EVENTS_CHAIN_IDS_CLAUSE_BODY: &str = "AND c.chain_id IN ({list})";
const CLEAR_EVENTS_ORDERBOOKS_CLAUSE: &str = "/*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/";
const CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY: &str = "AND c.orderbook_address IN ({list})";
const CLEAR_EVENTS_TAKERS_CLAUSE: &str = "/*CLEAR_EVENTS_TAKERS_CLAUSE*/";
const CLEAR_EVENTS_TAKERS_CLAUSE_BODY: &str = "AND c.sender IN ({list})";
const OWNERS_CLAUSE: &str = "/*OWNERS_CLAUSE*/";
const OWNERS_CLAUSE_BODY: &str = "AND tws.order_owner IN ({list})";
const ORDER_HASH_CLAUSE: &str = "/*ORDER_HASH_CLAUSE*/";
Expand Down Expand Up @@ -42,6 +46,7 @@ pub struct FetchTradesArgs {
pub chain_ids: Vec<u32>,
pub orderbook_addresses: Vec<Address>,
pub owners: Vec<Address>,
pub takers: Vec<Address>,
pub order_hash: Option<B256>,
pub tokens: FetchTradesTokensFilter,
pub time_filter: TimeFilter,
Expand Down Expand Up @@ -82,6 +87,20 @@ pub fn build_fetch_trades_stmt(args: &FetchTradesArgs) -> Result<SqlStatement, S
CLEAR_EVENTS_ORDERBOOKS_CLAUSE_BODY,
orderbooks_iter(),
)?;
let mut takers = args.takers.clone();
takers.sort();
takers.dedup();
let takers_iter = || takers.iter().cloned().map(SqlValue::from);
stmt.bind_list_clause(
TAKE_ORDERS_TAKERS_CLAUSE,
TAKE_ORDERS_TAKERS_CLAUSE_BODY,
takers_iter(),
)?;
stmt.bind_list_clause(
CLEAR_EVENTS_TAKERS_CLAUSE,
CLEAR_EVENTS_TAKERS_CLAUSE_BODY,
takers_iter(),
)?;
let mut owners = args.owners.clone();
owners.sort();
owners.dedup();
Expand Down Expand Up @@ -281,6 +300,39 @@ mod tests {
assert_eq!(stmt.params[1], SqlValue::Text(hex::encode_prefixed(output)));
}

#[test]
fn builds_with_taker_filters() {
let taker_a = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
let taker_b = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
let stmt = build_fetch_trades_stmt(&FetchTradesArgs {
takers: vec![taker_b, taker_a, taker_a],
..Default::default()
})
.unwrap();

assert!(stmt.sql.contains("t.sender IN (?1, ?2)"));
assert!(stmt.sql.contains("c.sender IN (?3, ?4)"));
assert!(!stmt.sql.contains("tws.transaction_sender IN"));
assert!(!stmt.sql.contains(TAKE_ORDERS_TAKERS_CLAUSE));
assert!(!stmt.sql.contains(CLEAR_EVENTS_TAKERS_CLAUSE));
assert_eq!(
stmt.params[0],
SqlValue::Text(hex::encode_prefixed(taker_a))
);
assert_eq!(
stmt.params[1],
SqlValue::Text(hex::encode_prefixed(taker_b))
);
assert_eq!(
stmt.params[2],
SqlValue::Text(hex::encode_prefixed(taker_a))
);
assert_eq!(
stmt.params[3],
SqlValue::Text(hex::encode_prefixed(taker_b))
);
}

#[test]
fn builds_with_same_token_as_either_side_filter() {
let token = address!("0x1111111111111111111111111111111111111111");
Expand Down
2 changes: 2 additions & 0 deletions crates/common/src/local_db/query/fetch_trades/query.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ matching_take_orders AS (
WHERE 1 = 1
/*TAKE_ORDERS_CHAIN_IDS_CLAUSE*/
/*TAKE_ORDERS_ORDERBOOKS_CLAUSE*/
/*TAKE_ORDERS_TAKERS_CLAUSE*/
),
matching_clears AS (
SELECT
Expand All @@ -43,6 +44,7 @@ matching_clears AS (
WHERE 1 = 1
/*CLEAR_EVENTS_CHAIN_IDS_CLAUSE*/
/*CLEAR_EVENTS_ORDERBOOKS_CLAUSE*/
/*CLEAR_EVENTS_TAKERS_CLAUSE*/
),
take_trades AS (
SELECT
Expand Down
100 changes: 98 additions & 2 deletions crates/common/src/raindex_client/local_db/query/fetch_trades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,117 @@ use crate::local_db::query::fetch_trades::{
build_fetch_trades_count_stmt, build_fetch_trades_stmt, FetchTradesArgs,
};
use crate::local_db::query::{LocalDbQueryError, LocalDbQueryExecutor};
use crate::utils::timing::Timing;

pub async fn fetch_trades<E: LocalDbQueryExecutor + ?Sized>(
exec: &E,
args: FetchTradesArgs,
) -> Result<Vec<LocalDbOrderTrade>, LocalDbQueryError> {
let started = Timing::now();
let chain_ids_count = args.chain_ids.len();
let orderbooks_count = args.orderbook_addresses.len();
let owners_count = args.owners.len();
let takers_count = args.takers.len();
let input_tokens_count = args.tokens.inputs.len();
let output_tokens_count = args.tokens.outputs.len();
let has_order_hash = args.order_hash.is_some();
let has_time_filter = args.time_filter.start.is_some() || args.time_filter.end.is_some();
let page = args.pagination.page;
let page_size = args.pagination.page_size;
let stmt = build_fetch_trades_stmt(&args)?;
exec.query_json(&stmt).await
match exec.query_json::<Vec<LocalDbOrderTrade>>(&stmt).await {
Ok(trades) => {
tracing::info!(
chain_ids_count,
orderbooks_count,
owners_count,
takers_count,
input_tokens_count,
output_tokens_count,
has_order_hash,
has_time_filter,
page,
page_size,
params_count = stmt.params().len(),
rows = trades.len(),
duration_ms = started.elapsed_ms(),
"local DB getTrades fetch completed"
);
Ok(trades)
}
Err(err) => {
tracing::error!(
chain_ids_count,
orderbooks_count,
owners_count,
takers_count,
input_tokens_count,
output_tokens_count,
has_order_hash,
has_time_filter,
page,
page_size,
params_count = stmt.params().len(),
duration_ms = started.elapsed_ms(),
error = %err,
"local DB getTrades fetch failed"
);
Err(err)
}
}
}

pub async fn fetch_trades_count<E: LocalDbQueryExecutor + ?Sized>(
exec: &E,
args: FetchTradesArgs,
) -> Result<Vec<LocalDbTradeCountRow>, LocalDbQueryError> {
let started = Timing::now();
let chain_ids_count = args.chain_ids.len();
let orderbooks_count = args.orderbook_addresses.len();
let owners_count = args.owners.len();
let takers_count = args.takers.len();
let input_tokens_count = args.tokens.inputs.len();
let output_tokens_count = args.tokens.outputs.len();
let has_order_hash = args.order_hash.is_some();
let has_time_filter = args.time_filter.start.is_some() || args.time_filter.end.is_some();
let stmt = build_fetch_trades_count_stmt(&args)?;
exec.query_json(&stmt).await
match exec.query_json::<Vec<LocalDbTradeCountRow>>(&stmt).await {
Ok(rows) => {
tracing::info!(
chain_ids_count,
orderbooks_count,
owners_count,
takers_count,
input_tokens_count,
output_tokens_count,
has_order_hash,
has_time_filter,
params_count = stmt.params().len(),
rows = rows.len(),
trade_count = rows.first().map(|row| row.trade_count),
duration_ms = started.elapsed_ms(),
"local DB getTrades count completed"
);
Ok(rows)
}
Err(err) => {
tracing::error!(
chain_ids_count,
orderbooks_count,
owners_count,
takers_count,
input_tokens_count,
output_tokens_count,
has_order_hash,
has_time_filter,
params_count = stmt.params().len(),
duration_ms = started.elapsed_ms(),
error = %err,
"local DB getTrades count failed"
);
Err(err)
}
}
}

#[cfg(all(test, target_family = "wasm"))]
Expand Down
26 changes: 25 additions & 1 deletion crates/common/src/raindex_client/trades/get_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::local_db::query::fetch_trades::{
use crate::raindex_client::local_db::query::fetch_trades::{fetch_trades, fetch_trades_count};
use crate::utils::timing::Timing;
use rain_orderbook_subgraph_client::types::common::{
SgBigInt, SgBytes, SgTrade, SgTradeOrderFilter, SgTradesListQueryFilters,
SgBigInt, SgBytes, SgTrade, SgTradeEventFilter, SgTradeOrderFilter, SgTradesListQueryFilters,
SgTradesTokensFilterArgs,
};
use rain_orderbook_subgraph_client::MultiOrderbookSubgraphClient;
Expand All @@ -29,6 +29,8 @@ impl_wasm_traits!(GetTradesTokenFilter);
pub struct GetTradesFilters {
#[tsify(optional, type = "Address[]")]
pub owners: Vec<Address>,
#[tsify(optional, type = "Address[]")]
pub takers: Vec<Address>,
#[tsify(optional, type = "Hex")]
pub order_hash: Option<B256>,
#[tsify(optional)]
Expand Down Expand Up @@ -97,6 +99,16 @@ impl From<GetTradesFilters> for SgTradesListQueryFilters {
});
}

if !filters.takers.is_empty() {
sg_filters.trade_event_ = Some(SgTradeEventFilter {
sender_in: filters
.takers
.into_iter()
.map(|taker| SgBytes(taker.to_string()))
.collect(),
});
}

sg_filters
}
}
Expand Down Expand Up @@ -199,6 +211,7 @@ impl RaindexClient {
.as_ref()
.and_then(|tokens| tokens.outputs.as_ref())
.map_or(0, Vec::len);
let takers_count = filters.takers.len();
let orderbooks_count = filters.orderbook_addresses.as_ref().map_or(0, Vec::len);

let mut all_trades = Vec::new();
Expand Down Expand Up @@ -231,6 +244,7 @@ impl RaindexClient {
chain_ids: local_ids.clone(),
orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(),
owners: filters.owners.clone(),
takers: filters.takers.clone(),
order_hash: filters.order_hash,
tokens: local_tokens.clone(),
time_filter: filters.time_filter.clone().unwrap_or_default(),
Expand Down Expand Up @@ -260,6 +274,7 @@ impl RaindexClient {
chain_ids: local_ids,
orderbook_addresses: filters.orderbook_addresses.clone().unwrap_or_default(),
owners: filters.owners.clone(),
takers: filters.takers.clone(),
order_hash: filters.order_hash,
tokens: local_tokens,
time_filter: filters.time_filter.clone().unwrap_or_default(),
Expand Down Expand Up @@ -383,6 +398,7 @@ impl RaindexClient {
local_chain_ids_count,
subgraph_chain_ids_count,
owners_count = filters.owners.len(),
takers_count,
orderbooks_count,
has_order_hash = filters.order_hash.is_some(),
has_time_filter = filters.time_filter.is_some(),
Expand Down Expand Up @@ -442,12 +458,14 @@ mod tests {
#[test]
fn maps_trade_filters_to_subgraph_filters() {
let owner = address!("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
let taker = address!("0xcccccccccccccccccccccccccccccccccccccccc");
let orderbook = address!("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
let order_hash =
b256!("0x0000000000000000000000000000000000000000000000000000000000abcdef");

let filters: SgTradesListQueryFilters = GetTradesFilters {
owners: vec![owner],
takers: vec![taker],
order_hash: Some(order_hash),
orderbook_addresses: Some(vec![orderbook]),
time_filter: Some(TimeFilter {
Expand All @@ -464,6 +482,11 @@ mod tests {
order_filter.order_hash,
Some(SgBytes(order_hash.to_string()))
);
let trade_event_filter = filters.trade_event_.unwrap();
assert_eq!(
trade_event_filter.sender_in,
vec![SgBytes(taker.to_string())]
);
assert_eq!(filters.orderbook_in, vec![format!("{orderbook:#x}")]);
assert_eq!(filters.timestamp_gte, Some(SgBigInt("10".to_string())));
assert_eq!(filters.timestamp_lte, Some(SgBigInt("20".to_string())));
Expand Down Expand Up @@ -512,6 +535,7 @@ mod tests {
assert!(filters.or.is_none());
assert!(filters.input_vault_balance_change_.is_none());
assert!(filters.output_vault_balance_change_.is_none());
assert!(filters.trade_event_.is_none());
}

fn test_sg_trade(input_token: Address, output_token: Address) -> SgTrade {
Expand Down
10 changes: 10 additions & 0 deletions crates/subgraph/src/types/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ pub struct SgTradeVaultBalanceChangeTokenFilter {
pub vault_: Option<SgTradeVaultTokenFilter>,
}

#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)]
#[cynic(graphql_type = "TradeEvent_filter")]
pub struct SgTradeEventFilter {
#[cynic(rename = "sender_in", skip_serializing_if = "Vec::is_empty")]
pub sender_in: Vec<SgBytes>,
}

#[derive(cynic::InputObject, Debug, Clone, Tsify, Default)]
#[cynic(graphql_type = "Trade_filter")]
pub struct SgTradesListQueryFilters {
Expand All @@ -198,6 +205,9 @@ pub struct SgTradesListQueryFilters {
)]
#[cfg_attr(target_family = "wasm", tsify(optional))]
pub output_vault_balance_change_: Option<SgTradeVaultBalanceChangeTokenFilter>,
#[cynic(rename = "tradeEvent_", skip_serializing_if = "Option::is_none")]
#[cfg_attr(target_family = "wasm", tsify(optional))]
pub trade_event_: Option<SgTradeEventFilter>,
#[cynic(rename = "or", skip_serializing_if = "Option::is_none")]
#[cfg_attr(target_family = "wasm", tsify(optional))]
pub or: Option<Vec<SgTradesListQueryFilters>>,
Expand Down
Loading