driver: parallelize unsupported order detection#4347
driver: parallelize unsupported order detection#4347metalurgical wants to merge 19 commits intocowprotocol:mainfrom
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the competition logic to parallelize unsupported order detection by updating the risk detector API to return a set of UIDs instead of modifying the auction in place. A critical logic error was identified in the new unsupported_order_uids method where orders flagged as unsupported by metrics are filtered out of the processing loop but never added to the removal set, effectively bypassing the intended filtering logic.
|
@jmg-duarte This is minimal, easily reviewable and I think in line with what is expected for resolving the linked issue. Just finishing off what I started. |
16e3345 to
dcfb3b8
Compare
a1b49e8 to
e5919f9
Compare
Make unsupported order detection read-only and execute it in parallel with sorting and data fetching. Apply filtering after update_orders to preserve existing ordering.
e5919f9 to
8ae2e9b
Compare
|
@AryanGodara Thank you for the comments. It should be ready to be reviewed again. Excuse the force push, had to rebase a branch in to ensure it was working and then rebase it out again. |
There was a problem hiding this comment.
Code Review
This pull request refactors the risk detection logic by introducing a SellQualityDetector trait to decouple the simulation detector and improve testability. It updates the Competition and Quote domains to use the refactored filter_unsupported_orders_in_auction and unsupported_order_uids methods, enabling in-place filtering of unsupported orders. Additionally, comprehensive unit tests were added for the Detector logic. I have no feedback to provide as the review comments provided did not meet the high severity threshold required by the style guide.
AryanGodara
left a comment
There was a problem hiding this comment.
Description nit: "Run unsupported detection in parallel in solve()" fit the original version, but not entirely accurate imo after the prev commits
The real wins here are the read-only unsupported_order_uids API, so quote.rs can reuse detection without faking an Auction, and the SellQualityDetector trait that enables the new tests. Worth surfacing those in the description instead.
otherwise lgtm
Yes, this is true, with the latest commits it is effectively concurrent with liquidity fetching. Edit: I have updated this to use |
jmg-duarte
left a comment
There was a problem hiding this comment.
I think the diff is going in a much better direction now, the core logic changes are much simpler and the separation you did made me think; good job
Please review all the comments before starting coding, there's one in specific I'd appreciate we discuss as it might be possible to separate out of the PR; and make this an incremental change 🚀
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn without_unsupported_orders_filters_unsupported_orders_and_flashloans() { |
There was a problem hiding this comment.
can't we just break this big test into multiple separate ones? im ok with repeated setup, the combinations here get a bit messy
you could do each case separate like "metrics bad" "token bad", etc
also the flashloan test is only useful for orders that would actually work if the flashloan was supported, otherwise you cant verify the order wasnt separated for other reason
There was a problem hiding this comment.
Yes that can be done.
Though I'd then wrap the setup code into a helper so it isn't duplicated everywhere.
The flashloan is still useful here to show it gets filtered out, but yes I do agree it needs additional comments to this effect.
There was a problem hiding this comment.
Setup code has been wrapped into helpers.
Have implemented singular tests in addition to the mixed case test.
A mixed case test is still useful to have here, have added comments to aid readability.
| // Check if uid is unsupported in metrics | ||
| if matches!( | ||
| self.metrics | ||
| .as_ref() | ||
| .map(|m| m.get_quality(&order.uid, now)), | ||
| Some(Quality::Unsupported) | ||
| ) { | ||
| return Some(order.uid); | ||
| } | ||
| let sell = self.get_token_quality(order.sell.token, now); | ||
| let buy = self.get_token_quality(order.buy.token, now); | ||
| match (sell, buy) { | ||
| // both tokens supported => keep order | ||
| (Quality::Supported, Quality::Supported) => Some(order), | ||
| (Quality::Supported, Quality::Supported) => None, | ||
| // at least 1 token unsupported => drop order | ||
| (Quality::Unsupported, _) | (_, Quality::Unsupported) => { | ||
| removed_uids.push(order.uid); | ||
| None | ||
| } | ||
| (Quality::Unsupported, _) | (_, Quality::Unsupported) => Some(order.uid), | ||
| // sell token quality is unknown => keep order if token is supported | ||
| (Quality::Unknown, _) => { | ||
| let Some(detector) = &self.simulation_detector else { | ||
| // we can't determine quality => assume order is good | ||
| return Some(order); | ||
| return None; | ||
| }; | ||
| let check_tokens_fut = async move { | ||
| let quality = detector.determine_sell_token_quality(&order, now).await; | ||
| (order, quality) | ||
| let quality = detector.determine_sell_token_quality(order, now).await; | ||
| (order.uid, quality) | ||
| }; | ||
| token_quality_checks.push(check_tokens_fut); | ||
| None | ||
| } | ||
| // buy token quality is unknown => keep order (because we can't | ||
| // determine quality and assume it's good) | ||
| (_, Quality::Unknown) => Some(order), | ||
| (_, Quality::Unknown) => None, | ||
| } | ||
| }) |
There was a problem hiding this comment.
the way you separated the unsupported_order_uids from the retain made the diff much simpler, but now we iterate twice over the orders (one to collect the orders to be removed, another to remove them), if we extract this logic to analyze a single order, won't we be able to perform just the retain while analyzing each order?
comparing to the previous code, that approach change alone (of taking auction orders instead of the full auction to filter just the orders) seems to make it easier to
- follow the code
- make filtering clearer
- unlock some optimizations like avoiding multiple passes over the array
- make testing easier too since you can check single orders
let me know your thoughts before handling this comment
There was a problem hiding this comment.
iterate twice over the orders (one to collect the orders to be removed, another to remove them),
if we extract this logic to analyze a single order, won't we be able to perform just the retain while analyzing each order?
Yes, could just to do it in filter_unsupported_orders_in_auction_orders(&self, orders: &mut Vec<Order>) without having to go through unsupported_order_uids(&self, orders: &[Order]).
I don't think this will reduce clarity by doing so?
The added tests would be unaffected as well.
Edit: It will very much look like the original code then though. Loop over the orders once, building a replacement set and then doing *orders = replacement. The filter().filter_map() can then also just be a normal for loop which may be clearer
There was a problem hiding this comment.
Have used a for loop here, it is slightly less idiomatic but of the same complexity/performance and easier to read.
It also now only does one pass, flashloans inclusive.
There was a problem hiding this comment.
it is slightly less idiomatic
Readability > idiomatic
There was a problem hiding this comment.
I'd appreciate you stopped closing these out when its supposed to be open for discussion.
There was a reason I stated:
Please review all the comments before starting coding, there's one in specific I'd appreciate we discuss as it might be possible to separate out of the PR; and make this an incremental change 🚀
Otherwise, we keep chasing a moving goal. I'm trying to discuss an idea to ease the coding burden and you keep writing code that I need to keep reviewing.
And let me remember you that all this still needs to be measured.
There was a problem hiding this comment.
Understood, please continue.
There was a problem hiding this comment.
Reviewing your code before the for loop with a std::mem::take (which is great footgun as it's a borrowed vector; before it took ownership over the auction) made me notice that we can probably split this PR further to make everything easier for everyone
- Make filter_unsupported_orders_in_auction take
&mut Vec<Order>instead of the whole auction + add the tests — no fancy async, just simple changes to create assurance that what comes next will work - Then we could make the
filter_mapinsidefilter_unsupported_orders_in_auctiona standalone function that returns an OrderQuality enum; using that + whether or not there's a detector we can split the collection of futures into one that doesn't even create a futures unordered and another that does, making the code cleaner
I drafted this
Apply on top of 6045c8b
diff --git a/crates/driver/src/domain/competition/risk_detector/mod.rs b/crates/driver/src/domain/competition/risk_detector/mod.rs
index ca33e0d28..5bf54f709 100644
--- a/crates/driver/src/domain/competition/risk_detector/mod.rs
+++ b/crates/driver/src/domain/competition/risk_detector/mod.rs
@@ -18,12 +18,11 @@
use {
crate::domain::competition::{
- Order,
- order::Uid,
- risk_detector::bad_tokens::simulation::SellQualityDetector,
+ Order, order::Uid, risk_detector::bad_tokens::simulation::SellQualityDetector,
},
eth_domain_types as eth,
- futures::{StreamExt, stream::FuturesUnordered},
+ futures::{FutureExt, StreamExt, stream::FuturesUnordered},
+ itertools::{Either, Itertools},
std::{
collections::{HashMap, HashSet},
fmt,
@@ -64,6 +63,12 @@ pub struct Detector {
metrics: Option<bad_orders::metrics::Detector>,
}
+enum OrderQuality {
+ Bad,
+ Good,
+ Unknown,
+}
+
impl Detector {
/// Hardcodes tokens as (un)supported based on the provided config. This has
/// the highest priority when looking up a token's quality.
@@ -114,57 +119,103 @@ impl Detector {
}
}
- /// Returns a set of orders uids for orders that are found to be
- /// unsupported.
- pub async fn unsupported_order_uids(&self, orders: &[Order]) -> HashSet<Uid> {
- let now = Instant::now();
- let mut token_quality_checks = FuturesUnordered::new();
+ fn qualify_order(&self, order: &Order, now: Instant) -> OrderQuality {
+ // Check if uid is unsupported in metrics
+ if matches!(
+ self.metrics
+ .as_ref()
+ .map(|m| m.get_quality(&order.uid, now)),
+ Some(Quality::Unsupported)
+ ) {
+ return OrderQuality::Bad;
+ }
+
+ let sell = self.get_token_quality(order.sell.token, now);
+ let buy = self.get_token_quality(order.buy.token, now);
+ match (sell, buy) {
+ // both tokens supported => keep order
+ (Quality::Supported, Quality::Supported) => OrderQuality::Good,
+ // at least 1 token unsupported => drop order
+ (Quality::Unsupported, _) | (_, Quality::Unsupported) => OrderQuality::Bad,
+ // sell token quality is unknown => keep order if token is supported
+ (Quality::Unknown, _) if self.simulation_detector.is_some() => {
+ // we can't determine quality => assume order is good
+ OrderQuality::Good
+ }
+ (Quality::Unknown, _) => OrderQuality::Unknown,
+ // buy token quality is unknown => keep order (because we can't
+ // determine quality and assume it's good)
+ (_, Quality::Unknown) => OrderQuality::Good,
+ }
+ }
- let mut removed_uids: HashSet<Uid> = orders
+ fn without_sim_detector(&self, orders: &[Order]) -> HashSet<Uid> {
+ let now = Instant::now();
+ orders
.iter()
- .filter_map(|order| {
- // Check if uid is unsupported in metrics
- if matches!(
- self.metrics
- .as_ref()
- .map(|m| m.get_quality(&order.uid, now)),
- Some(Quality::Unsupported)
- ) {
- return Some(order.uid);
- }
- let sell = self.get_token_quality(order.sell.token, now);
- let buy = self.get_token_quality(order.buy.token, now);
- match (sell, buy) {
- // both tokens supported => keep order
- (Quality::Supported, Quality::Supported) => None,
- // at least 1 token unsupported => drop order
- (Quality::Unsupported, _) | (_, Quality::Unsupported) => Some(order.uid),
- // sell token quality is unknown => keep order if token is supported
- (Quality::Unknown, _) => {
- let Some(detector) = &self.simulation_detector else {
- // we can't determine quality => assume order is good
- return None;
- };
- let check_tokens_fut = async move {
- let quality = detector.determine_sell_token_quality(order, now).await;
- (order.uid, quality)
- };
- token_quality_checks.push(check_tokens_fut);
- None
- }
- // buy token quality is unknown => keep order (because we can't
- // determine quality and assume it's good)
- (_, Quality::Unknown) => None,
- }
+ .filter(|order| {
+ let order_quality = self.qualify_order(order, now);
+ // Without a detector we assume the unknown quality orders are good
+ matches!(order_quality, OrderQuality::Good | OrderQuality::Unknown)
})
- .collect();
+ .map(|order| order.uid)
+ .collect()
+ }
+
+ async fn using_simulation_detector(
+ &self,
+ orders: &[Order],
+ detector: &Box<dyn SellQualityDetector>,
+ ) -> HashSet<Uid> {
+ let now = Instant::now();
+
+ let (mut removed_uids, mut token_quality_checks): (HashSet<Uid>, FuturesUnordered<_>) =
+ orders
+ .iter()
+ .filter_map(|order| {
+ let order_quality = self.qualify_order(order, now);
+ match order_quality {
+ OrderQuality::Good => None,
+ OrderQuality::Bad => Some((order, true)),
+ OrderQuality::Unknown => Some((order, false)),
+ }
+ })
+ .partition_map(|(order, quality_known)| {
+ if quality_known {
+ Either::Left(order.uid)
+ } else {
+ Either::Right(
+ async move {
+ let quality =
+ detector.determine_sell_token_quality(order, now).await;
+ (order.uid, quality)
+ }
+ .boxed(),
+ )
+ }
+ });
while let Some((uid, quality)) = token_quality_checks.next().await {
- if quality != Quality::Supported {
+ if !matches!(quality, Quality::Supported) {
removed_uids.insert(uid);
}
}
+ removed_uids
+ }
+
+ /// Returns a set of orders uids for orders that are found to be
+ /// unsupported.
+ pub async fn unsupported_order_uids(&self, orders: &[Order]) -> HashSet<Uid> {
+ let removed_uids = match &self.simulation_detector {
+ Some(detector) => {
+ let removed_uids = self.using_simulation_detector(orders, detector).await;
+ detector.evict_outdated_entries();
+ removed_uids
+ }
+ None => self.without_sim_detector(orders),
+ };
+
if !removed_uids.is_empty() {
tracing::debug!(
orders = ?removed_uids,
@@ -172,10 +223,6 @@ impl Detector {
);
}
- if let Some(detector) = &self.simulation_detector {
- detector.evict_outdated_entries();
- }
-
removed_uids
}
@@ -222,15 +269,8 @@ mod tests {
domain::competition::{
Order,
order::{
- BuyTokenBalance,
- Kind,
- Partial,
- SellTokenBalance,
- Side,
- Signature,
- Uid,
- app_data::AppData,
- signature,
+ BuyTokenBalance, Kind, Partial, SellTokenBalance, Side, Signature, Uid,
+ app_data::AppData, signature,
},
risk_detector::bad_tokens::simulation::MockSellQualityDetector,
},
@@ -238,12 +278,7 @@ mod tests {
util,
},
app_data::{
- AppDataHash,
- Flashloan,
- ProtocolAppData,
- Root,
- ValidatedAppData,
- hash_full_app_data,
+ AppDataHash, Flashloan, ProtocolAppData, Root, ValidatedAppData, hash_full_app_data,
},
eth_domain_types::{Asset, TokenAmount, U256},
std::{sync::Arc, time::Duration},
There was a problem hiding this comment.
Will take a look
There was a problem hiding this comment.
Interesting, I think I understand the way you reasoned this and have refactored the code to match it.
I also replaced the std::mem::take with .drain(..).
Let me know what you think of it now.
@jmg-duarte Appreciate the comments. Happy to discuss it.. |
| tokio::spawn( | ||
| async move { | ||
| risk_detector | ||
| .without_unsupported_orders(&mut auction.orders, flashloans_enabled) |
There was a problem hiding this comment.
Before this PR, the quote path only ran token quality filtering. Flashloan filtering only happened in solve(). Now, without_unsupported_orders does both, so a solver with flashloans_enabled = false will start returning UnsupportedToken at quote time for flashloan orders. That's probably the right behavior (fail fast instead of silently quoting something the solver can't settle), but it's a semantic change that isn't called out in the PR description and isn't covered by an explicit test.
There was a problem hiding this comment.
The change to quote.rs is mentioned in the description, perhaps it was not explicit enough?
Logically the solver config should be used here, unless there is a specific reason not to? Could you confirm the exact behavior that is expected here?
It does not appear that a test can be written for this currently.
| solver::Liquidity::Fetch => tasks.liquidity.await, | ||
| solver::Liquidity::Skip => Arc::new(Vec::new()), | ||
| let (auction, liquidity) = tokio::join!( | ||
| tokio::spawn( |
There was a problem hiding this comment.
tokio::spawn returns detached JoinHandles. tokio::join! awaits them, but if the outer solve() future is dropped (e.g. client disconnect, overload shed), the spawned tasks keep running to completion. The previous inline tokio::join!(async { … }, future) would have been cancelled with the parent.
There was a problem hiding this comment.
I see your point. Though this behavior is not new here, it already exists with run_blocking_with_timer.
I would like to suggest that you have a look at #4379. The same concept there can be used for this as well, just couple it with a scopeguard. This is also probably better implemented there as well, else it is duplicating efforts.
There was a problem hiding this comment.
Wouldn't this be solveable by simply using a cancellationtoken + dropguard?
The PR you're pointing to is huge and the scope/steps need to be discussed
There was a problem hiding this comment.
Yes that is it.
The PR I am pointing to already has the cancellation token threaded through. It appears large but much of it is tests and plumbing. Did subsequently open an issue for it, #4380. So it just needs the drop guard added to it (and an patched in here) then the above behavior mentioned here will be handled as well.
Happy to discuss scope and steps further.
squadgazzz
left a comment
There was a problem hiding this comment.
Two notes on the c06963a refactor.
Description
Allow unsupported order detection be executed in parallel and optimize unsupported order detection.
Changes
without_unsupported_ordersto risk detector.without_unsupported_ordersto do all unsupported order filtering (including flashloans) in one pass.DetectorApitrait to decouple the simulation detector for tests.tokio::spawninsolve(), allowing parallel execution.quote.rsto usewithout_unsupported_orders.How to test
cargo test -p driver
Related Issues
Fixes #3516, follow up PR to #4309 and taking into account #4329