-
Notifications
You must be signed in to change notification settings - Fork 166
driver: parallelize unsupported order detection #4347
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cc93ca1
889f437
ae060d2
c5cf81d
8ae2e9b
144212f
8375c44
13d0be9
5ad8f6b
b56b2b9
6045c8b
ede7489
efc6d90
e01df58
9dac353
5707a99
c06963a
60e766f
f41f0d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,7 +33,7 @@ use { | |
| time::Instant, | ||
| }, | ||
| tokio::{sync::mpsc, task}, | ||
| tracing::{Instrument, instrument}, | ||
| tracing::Instrument, | ||
| }; | ||
|
|
||
| pub mod auction; | ||
|
|
@@ -339,28 +339,51 @@ impl Competition { | |
| Self::sort_orders(auction, solver_address, order_sorting_strategies) | ||
| }); | ||
|
|
||
| // We can sort the orders and fetch auction data in parallel | ||
| // We can sort the orders and fetch auction data in parallel. | ||
| let (auction, balances, app_data) = | ||
| tokio::join!(sort_orders_future, tasks.balances, tasks.app_data); | ||
|
|
||
| let auction = Self::run_blocking_with_timer("update_orders", move || { | ||
| let mut auction = Self::run_blocking_with_timer("update_orders", move || { | ||
| // Same as before with sort_orders, we use spawn_blocking() because a lot of CPU | ||
| // bound computations are happening and we want to avoid blocking | ||
| // the runtime. | ||
| Self::update_orders(auction, balances, app_data, cow_amm_orders) | ||
| }) | ||
| .await; | ||
|
|
||
| let risk_detector = self.risk_detector.clone(); | ||
| let flashloans_enabled = self.solver.config().flashloans_enabled; | ||
| let liquidity_mode = self.solver.liquidity(); | ||
|
|
||
| // We can run bad token filtering and liquidity fetching in parallel | ||
| let (liquidity, auction) = tokio::join!( | ||
| async { | ||
| match self.solver.liquidity() { | ||
| solver::Liquidity::Fetch => tasks.liquidity.await, | ||
| solver::Liquidity::Skip => Arc::new(Vec::new()), | ||
| let (auction, liquidity) = tokio::join!( | ||
| tokio::spawn( | ||
| async move { | ||
| risk_detector | ||
| .without_unsupported_orders(&mut auction.orders, flashloans_enabled) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before this PR, the quote path only ran token quality filtering. Flashloan filtering only happened in solve(). Now,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The change to Logically the solver config should be used here, unless there is a specific reason not to? Could you confirm the exact behavior that is expected here? It does not appear that a test can be written for this currently. |
||
| .await; | ||
| auction | ||
| } | ||
| }, | ||
| self.without_unsupported_orders(auction) | ||
| .in_current_span(), | ||
| ), | ||
| tokio::spawn( | ||
| async move { | ||
| match liquidity_mode { | ||
| solver::Liquidity::Fetch => tasks.liquidity.await, | ||
| solver::Liquidity::Skip => Arc::new(Vec::new()), | ||
| } | ||
| } | ||
| .in_current_span(), | ||
| ), | ||
| ); | ||
| let auction = auction.map_err(|err| { | ||
| tracing::error!(?err, "order filtering task failed"); | ||
| Error::InternalError(err.to_string()) | ||
| })?; | ||
| let liquidity = liquidity.map_err(|err| { | ||
| tracing::error!(?err, "liquidity fetch task failed"); | ||
| Error::InternalError(err.to_string()) | ||
| })?; | ||
|
|
||
| let elapsed = start.elapsed(); | ||
| metrics::get() | ||
|
|
@@ -951,16 +974,6 @@ impl Competition { | |
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[instrument(skip_all)] | ||
| async fn without_unsupported_orders(&self, mut auction: Auction) -> Auction { | ||
| if !self.solver.config().flashloans_enabled { | ||
| auction.orders.retain(|o| o.app_data.flashloan().is_none()); | ||
| } | ||
| self.risk_detector | ||
| .filter_unsupported_orders_in_auction(auction) | ||
| .await | ||
| } | ||
| } | ||
|
|
||
| const MAX_SOLUTIONS_TO_MERGE: usize = 10; | ||
|
|
@@ -1077,4 +1090,6 @@ pub enum Error { | |
| NoValidOrdersFound, | ||
| #[error("could not parse the request")] | ||
| MalformedRequest, | ||
| #[error("internal error: {0}")] | ||
| InternalError(String), | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio::spawnreturns detachedJoinHandles.tokio::join!awaits them, but if the outersolve()future is dropped (e.g. client disconnect, overload shed), the spawned tasks keep running to completion. The previous inline tokio::join!(async { … }, future) would have been cancelled with the parent.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point. Though this behavior is not new here, it already exists with
run_blocking_with_timer.I would like to suggest that you have a look at #4379. The same concept there can be used for this as well, just couple it with a scopeguard. This is also probably better implemented there as well, else it is duplicating efforts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this be solveable by simply using a cancellationtoken + dropguard?
The PR you're pointing to is huge and the scope/steps need to be discussed
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that is it.
The PR I am pointing to already has the cancellation token threaded through. It appears large but much of it is tests and plumbing. Did subsequently open an issue for it, #4380. So it just needs the drop guard added to it (and an patched in here) then the above behavior mentioned here will be handled as well.
Happy to discuss scope and steps further.