diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 8c3c78492af64..11de9d84fd595 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -408,6 +408,10 @@ pub trait ConfigObj: DIService { fn query_timeout(&self) -> u64; + /// Maximum number of rows a single query plan node may materialize in memory. Zero means no + /// limit. + fn materialized_rows_limit(&self) -> usize; + fn not_used_timeout(&self) -> u64; fn in_memory_not_used_timeout(&self) -> u64; @@ -594,6 +598,7 @@ pub struct ConfigObjImpl { pub status_bind_address: Option, pub http_bind_address: Option, pub query_timeout: u64, + pub materialized_rows_limit: usize, /// Must be set to 2*query_timeout in prod, only for overrides in tests. pub not_used_timeout: u64, pub in_memory_not_used_timeout: u64, @@ -771,6 +776,10 @@ impl ConfigObj for ConfigObjImpl { self.query_timeout } + fn materialized_rows_limit(&self) -> usize { + self.materialized_rows_limit + } + fn not_used_timeout(&self) -> u64 { self.not_used_timeout } @@ -1239,6 +1248,8 @@ impl Config { pub fn default() -> Config { let query_timeout = env_parse("CUBESTORE_QUERY_TIMEOUT", 120); + let partition_split_threshold = + env_parse("CUBESTORE_PARTITION_SPLIT_THRESHOLD", 1048576 * 2); let query_cache_stale_while_revalidate_secs: u64 = env_parse("CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE", 0); let query_cache_time_to_idle_secs = env_parse( @@ -1288,10 +1299,7 @@ impl Config { dump_dir: env::var("CUBESTORE_DUMP_DIR") .ok() .map(|v| PathBuf::from(v)), - partition_split_threshold: env_parse( - "CUBESTORE_PARTITION_SPLIT_THRESHOLD", - 1048576 * 2, - ), + partition_split_threshold, partition_size_split_threshold_bytes: env_parse_size( "CUBESTORE_PARTITION_SIZE_SPLIT_THRESHOLD", 100 * 1024 * 1024, @@ -1391,6 +1399,12 @@ impl Config { format!("0.0.0.0:{}", env_parse("CUBESTORE_HTTP_PORT", 3030)), )), query_timeout, + // A single plan node should not materialize more than one partition's worth of + // rows. + materialized_rows_limit: env_parse( + "CUBESTORE_MATERIALIZED_ROWS_LIMIT", + partition_split_threshold as usize, + ), not_used_timeout: 2 * query_timeout, in_memory_not_used_timeout: 30, import_job_timeout: env_parse("CUBESTORE_IMPORT_JOB_TIMEOUT", 600), @@ -1708,6 +1722,7 @@ impl Config { status_bind_address: None, http_bind_address: None, query_timeout, + materialized_rows_limit: 500_000, not_used_timeout: 2 * query_timeout, in_memory_not_used_timeout: 30, import_job_timeout: 600, @@ -2439,6 +2454,7 @@ impl Config { .clone(), i.get_service_typed().await, i.get_service_typed().await, + i.get_service_typed().await, ) }) .await; diff --git a/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs new file mode 100644 index 0000000000000..e6e22d56374d2 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs @@ -0,0 +1,201 @@ +use crate::CubeError; +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::DataFusionError; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, +}; +use futures::stream::Stream; +use futures::StreamExt; +use std::any::Any; +use std::fmt::Formatter; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Errors out when the wrapped stream produces more than `limit` rows in total across all +/// partitions. Placed at points of the plan where rows get materialized in memory. +#[derive(Debug)] +pub struct MaterializedRowsLimitExec { + pub input: Arc, + pub limit: usize, + /// Human-readable description of the materialization point, used in the error message. + pub stage: &'static str, + /// Total across all partitions. Never reset: plans are built per query and executed once. + rows: Arc, +} + +impl MaterializedRowsLimitExec { + pub fn new(input: Arc, limit: usize, stage: &'static str) -> Self { + Self { + input, + limit, + stage, + rows: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl DisplayAs for MaterializedRowsLimitExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "MaterializedRowsLimitExec, limit: {}, stage: {}", + self.limit, self.stage + ) + } +} + +#[async_trait] +impl ExecutionPlan for MaterializedRowsLimitExec { + fn name(&self) -> &str { + "MaterializedRowsLimitExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn properties(&self) -> &PlanProperties { + self.input.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result, DataFusionError> { + assert_eq!(children.len(), 1); + Ok(Arc::new(MaterializedRowsLimitExec { + input: children.into_iter().next().unwrap(), + limit: self.limit, + stage: self.stage, + rows: self.rows.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if partition >= self.input.properties().partitioning.partition_count() { + return Err(DataFusionError::Internal(format!( + "MaterializedRowsLimitExec invalid partition {}", + partition + ))); + } + + let input = self.input.execute(partition, context)?; + Ok(Box::pin(MaterializedRowsLimitStream { + schema: self.schema(), + limit: self.limit, + stage: self.stage, + rows: self.rows.clone(), + input, + })) + } +} + +struct MaterializedRowsLimitStream { + schema: SchemaRef, + limit: usize, + stage: &'static str, + rows: Arc, + input: SendableRecordBatchStream, +} + +impl Stream for MaterializedRowsLimitStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => { + let total = + self.rows.fetch_add(batch.num_rows(), Ordering::Relaxed) + batch.num_rows(); + if total > self.limit { + Some(Err(CubeError::user(format!( + "Query execution stage '{}' materialized at least {} rows \ + which exceeds the limit of {} rows. \ + Consider creating a pre-aggregation that performs this stage \ + ahead of time.", + self.stage, total, self.limit + )) + .into())) + } else { + Some(Ok(batch)) + } + } + other => other, + }) + } + + fn size_hint(&self) -> (usize, Option) { + // same number of record batches + self.input.size_hint() + } +} + +impl RecordBatchStream for MaterializedRowsLimitStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::collect; + use datafusion_datasource::memory::MemorySourceConfig; + + fn batches(sizes: &[usize]) -> (SchemaRef, Vec) { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batches = sizes + .iter() + .map(|size| { + let array = Int64Array::from((0..*size as i64).collect::>()); + RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap() + }) + .collect(); + (schema, batches) + } + + async fn run_with_limit( + sizes: &[usize], + limit: usize, + ) -> Result, DataFusionError> { + let (schema, batches) = batches(sizes); + let input = MemorySourceConfig::try_new_exec(&[batches], schema, None).unwrap(); + let limited = Arc::new(MaterializedRowsLimitExec::new(input, limit, "test stage")); + collect(limited, Arc::new(TaskContext::default())).await + } + + #[tokio::test] + async fn passes_under_limit() { + let r = run_with_limit(&[3, 4], 7).await.unwrap(); + assert_eq!(r.iter().map(|b| b.num_rows()).sum::(), 7); + } + + #[tokio::test] + async fn errors_over_limit() { + let err = run_with_limit(&[3, 4], 6).await.unwrap_err(); + let message = err.to_string(); + assert!(message.contains("'test stage'"), "{}", message); + assert!(message.contains("at least 7 rows"), "{}", message); + assert!(message.contains("limit of 6 rows"), "{}", message); + assert!(message.contains("pre-aggregation"), "{}", message); + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index f2a41ac7d3b7c..841ab873ce59f 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -10,6 +10,7 @@ use datafusion_datasource::memory::MemorySourceConfig; use datafusion_datasource::source::DataSourceExec; pub use planning::PlanningMeta; mod check_memory; +pub mod materialized_rows_limit; pub mod physical_plan_flags; pub mod pretty_printers; mod projection_above_limit; diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs new file mode 100644 index 0000000000000..16ae19c0c3500 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs @@ -0,0 +1,71 @@ +use crate::queryplanner::materialized_rows_limit::MaterializedRowsLimitExec; +use crate::queryplanner::planning::WorkerExec; +use datafusion::error::DataFusionError; +use datafusion::physical_plan::aggregates::AggregateExec; +use datafusion::physical_plan::joins::{CrossJoinExec, HashJoinExec}; +use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::windows::WindowAggExec; +use datafusion::physical_plan::{ExecutionPlan, InputOrderMode}; +use std::sync::Arc; + +/// Add `MaterializedRowsLimitExec` at the points of the plan where rows accumulate in memory: +/// sort and window inputs, join build sides, aggregation outputs and worker results. Streaming +/// nodes are left as is. +pub fn add_materialized_rows_limit_exec( + p: Arc, + limit: usize, +) -> Result, DataFusionError> { + let p_any = p.as_any(); + if let Some(sort) = p_any.downcast_ref::() { + // Sort with a fetch keeps only top `fetch` rows in memory, so it stays under the limit on + // its own when `fetch <= limit`. Otherwise its buffer holds `min(input, fetch)` rows, so + // counting the input errors exactly when the buffer outgrows the limit. + if sort.fetch().map_or(true, |fetch| fetch > limit) { + return wrap_children(&p, &[(0, "sort input")], limit); + } + } else if p_any.is::() { + // HashJoinExec always builds the hash table from its left input. + return wrap_children(&p, &[(0, "hash join build side")], limit); + } else if p_any.is::() { + return wrap_children(&p, &[(0, "cross join left side")], limit); + } else if p_any.is::() { + return wrap_children(&p, &[(0, "window input")], limit); + } else if let Some(agg) = p_any.downcast_ref::() { + // A sorted aggregation streams groups out instead of accumulating a hash table. + if agg.input_order_mode() != &InputOrderMode::Sorted { + return Ok(wrap(p, limit, "aggregation groups")); + } + } else if p_any.is::() { + return wrap_children(&p, &[(0, "worker result")], limit); + } + Ok(p) +} + +pub fn wrap( + p: Arc, + limit: usize, + stage: &'static str, +) -> Arc { + Arc::new(MaterializedRowsLimitExec::new(p, limit, stage)) +} + +fn wrap_children( + p: &Arc, + wraps: &[(usize, &'static str)], + limit: usize, +) -> Result, DataFusionError> { + let mut children: Vec<_> = p.children().into_iter().cloned().collect(); + let mut changed = false; + for (i, stage) in wraps { + // The child rows may already be counted by an adjacent limit node. + if !children[*i].as_any().is::() { + children[*i] = wrap(children[*i].clone(), limit, stage); + changed = true; + } + } + if changed { + p.clone().with_new_children(children) + } else { + Ok(p.clone()) + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs index e4ee5eb698b3c..cb463bc6ad006 100644 --- a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs @@ -2,12 +2,14 @@ mod check_memory; mod distributed_partial_aggregate; mod inline_aggregate_rewriter; pub mod is_not_distinct_from_join_keys; +mod materialized_rows_limit; pub mod rewrite_plan; pub mod rolling_optimizer; mod trace_data_loaded; use super::serialized_plan::PreSerializedPlan; use crate::cluster::{Cluster, WorkerPlanningParams}; +use crate::queryplanner::materialized_rows_limit::MaterializedRowsLimitExec; use crate::queryplanner::optimizations::distributed_partial_aggregate::{ add_limit_to_workers, ensure_partition_merge, push_aggregate_to_workers, replace_suboptimal_merge_sorts, @@ -29,6 +31,7 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use distributed_partial_aggregate::ensure_partition_merge_with_acceptable_parent; +use materialized_rows_limit::add_materialized_rows_limit_exec; use rewrite_plan::rewrite_physical_plan; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -42,6 +45,8 @@ pub struct CubeQueryPlanner { serialized_plan: Arc, memory_handler: Arc, data_loaded_size: Option>, + /// Zero means no limit. + materialized_rows_limit: usize, } impl CubeQueryPlanner { @@ -49,6 +54,7 @@ impl CubeQueryPlanner { cluster: Arc, serialized_plan: Arc, memory_handler: Arc, + materialized_rows_limit: usize, ) -> CubeQueryPlanner { CubeQueryPlanner { cluster: Some(cluster), @@ -56,6 +62,7 @@ impl CubeQueryPlanner { serialized_plan, memory_handler, data_loaded_size: None, + materialized_rows_limit, } } @@ -64,6 +71,7 @@ impl CubeQueryPlanner { worker_planning_params: WorkerPlanningParams, memory_handler: Arc, data_loaded_size: Option>, + materialized_rows_limit: usize, ) -> CubeQueryPlanner { CubeQueryPlanner { serialized_plan, @@ -71,6 +79,7 @@ impl CubeQueryPlanner { worker_partition_count: Some(worker_planning_params), memory_handler, data_loaded_size, + materialized_rows_limit, } } } @@ -103,6 +112,8 @@ impl QueryPlanner for CubeQueryPlanner { self.memory_handler.clone(), self.data_loaded_size.clone(), ctx_state.config().options(), + self.materialized_rows_limit, + self.cluster.is_some(), ); result } @@ -157,6 +168,8 @@ fn finalize_physical_plan( memory_handler: Arc, data_loaded_size: Option>, config: &ConfigOptions, + materialized_rows_limit: usize, + is_router: bool, ) -> Result, DataFusionError> { let p = rewrite_physical_plan(p, &mut |p| add_check_memory_exec(p, memory_handler.clone()))?; log::trace!( @@ -182,5 +195,23 @@ fn finalize_physical_plan( "Rewrote physical plan by replace_suboptimal_merge_sorts:\n{}", pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta()) ); + let p = if materialized_rows_limit > 0 { + let p = rewrite_physical_plan(p, &mut |p| { + add_materialized_rows_limit_exec(p, materialized_rows_limit) + })?; + // The router collects the final query result in memory. + let p = if is_router && !p.as_any().is::() { + materialized_rows_limit::wrap(p, materialized_rows_limit, "query result") + } else { + p + }; + log::trace!( + "Rewrote physical plan by add_materialized_rows_limit_exec:\n{}", + pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta()) + ); + p + } else { + p + }; Ok(p) } diff --git a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs index 25006bc3aeedb..83f6c427b41a0 100644 --- a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs +++ b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs @@ -29,6 +29,7 @@ use std::sync::Arc; use crate::queryplanner::check_memory::CheckMemoryExec; use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec; use crate::queryplanner::inline_aggregate::{InlineAggregateExec, InlineAggregateMode}; +use crate::queryplanner::materialized_rows_limit::MaterializedRowsLimitExec; use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec; use crate::queryplanner::panic::{PanicWorkerExec, PanicWorkerNode}; use crate::queryplanner::planning::{ClusterSendNode, Snapshot, WorkerExec}; @@ -63,6 +64,7 @@ pub struct PPOptions { // Applies only to physical plan. pub show_output_hints: bool, pub show_check_memory_nodes: bool, + pub show_materialized_rows_limit_nodes: bool, pub show_partitions: bool, pub show_metrics: bool, pub traverse_past_clustersend: bool, @@ -78,6 +80,7 @@ impl PPOptions { show_schema: true, show_output_hints: true, show_check_memory_nodes: true, + show_materialized_rows_limit_nodes: true, show_partitions: true, show_metrics: false, // yeah. Is useful only after plan is evaluated, so defaults to false. traverse_past_clustersend: false, @@ -97,6 +100,7 @@ impl PPOptions { show_schema: false, show_output_hints: false, show_check_memory_nodes: false, + show_materialized_rows_limit_nodes: false, show_partitions: false, show_metrics: false, } @@ -526,8 +530,10 @@ fn pp_append_sort_by(out: &mut String, ordering: &LexOrdering) { } fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, out: &mut String) { - if p.as_any().is::() && !o.show_check_memory_nodes { - //We don't show CheckMemoryExec in plan by default + if (p.as_any().is::() && !o.show_check_memory_nodes) + || (p.as_any().is::() && !o.show_materialized_rows_limit_nodes) + { + //We don't show CheckMemoryExec and MaterializedRowsLimitExec in plan by default if let Some(child) = p.children().first() { pp_phys_plan_indented(child.as_ref(), indent, o, out) } @@ -774,6 +780,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou *out += "MemoryExec (ERROR: deprecated)"; } else if let Some(r) = a.downcast_ref::() { *out += &format!("Repartition, partitioning: {}", r.partitioning()); + } else if let Some(m) = a.downcast_ref::() { + *out += &format!("MaterializedRowsLimit, stage: {}", m.stage); } else { let to_string = format!("{:?}", p); *out += &to_string.split(" ").next().unwrap_or(&to_string); diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 05a5e88445c99..67f54430b66af 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -145,6 +145,7 @@ pub struct QueryExecutorImpl { metadata_cache_factory: Arc, parquet_metadata_cache: Arc, memory_handler: Arc, + config_obj: Arc, } crate::di_service!(QueryExecutorImpl, [QueryExecutor]); @@ -401,11 +402,13 @@ impl QueryExecutorImpl { metadata_cache_factory: Arc, parquet_metadata_cache: Arc, memory_handler: Arc, + config_obj: Arc, ) -> Arc { Arc::new(QueryExecutorImpl { metadata_cache_factory, parquet_metadata_cache, memory_handler, + config_obj, }) } @@ -419,6 +422,7 @@ impl QueryExecutorImpl { cluster, serialized_plan, self.memory_handler.clone(), + self.config_obj.materialized_rows_limit(), )) } @@ -434,6 +438,7 @@ impl QueryExecutorImpl { worker_planning_params, self.memory_handler.clone(), data_loaded_size.clone(), + self.config_obj.materialized_rows_limit(), )) } diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 2ab0f11032b9b..bdf6456b46b34 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -2744,6 +2744,106 @@ mod tests { Ok(()) } + #[tokio::test] + async fn materialized_rows_limit() -> Result<(), CubeError> { + Config::test("materialized_rows_limit") + .update_config(|mut c| { + c.materialized_rows_limit = 4; + c + }) + .start_test(async move |services| { + let service = services.sql_service; + + let _ = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; + let _ = service + .exec_query("CREATE TABLE foo.values (id int)") + .await? + .collect() + .await?; + let _ = service + .exec_query("INSERT INTO foo.values (id) VALUES (1), (2), (3), (4), (5), (6)") + .await? + .collect() + .await?; + + let result = service + .exec_query("SELECT id FROM foo.values WHERE id <= 3") + .await? + .collect() + .await?; + assert_eq!(result.get_rows().len(), 3); + + let expect_limit_error = async |query: &str, stage: &str| { + let res = async { service.exec_query(query).await?.collect().await }.await; + let err = res.expect_err("query must exceed the materialized rows limit"); + let message = err.to_string(); + assert!(message.contains(&format!("'{}'", stage)), "{}", message); + assert!(message.contains("limit of 4 rows"), "{}", message); + assert!(message.contains("pre-aggregation"), "{}", message); + }; + + expect_limit_error("SELECT id FROM foo.values", "worker result").await; + expect_limit_error( + "SELECT id % 100, count(*) FROM foo.values GROUP BY 1", + "aggregation groups", + ) + .await; + + Ok::<(), CubeError>(()) + }) + .await; + Ok(()) + } + + #[tokio::test] + async fn materialized_rows_limit_plan() -> Result<(), CubeError> { + Config::test("materialized_rows_limit_plan") + .start_test(async move |services| { + let service = services.sql_service; + + let _ = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; + let _ = service + .exec_query("CREATE TABLE foo.values (id int)") + .await? + .collect() + .await?; + let _ = service + .exec_query("INSERT INTO foo.values (id) VALUES (1), (2), (3)") + .await? + .collect() + .await?; + + let mut opts = PPOptions::default(); + opts.show_materialized_rows_limit_nodes = true; + + let plans = service + .plan_query("SELECT id % 100, count(*) FROM foo.values GROUP BY 1") + .await?; + let worker = pp_phys_plan_ext(plans.worker.as_ref(), &opts); + assert!(worker.contains("stage: aggregation groups"), "{}", worker); + assert!(worker.contains("stage: worker result"), "{}", worker); + + let plans = service + .plan_query("SELECT id FROM foo.values ORDER BY id % 100") + .await?; + let router = pp_phys_plan_ext(plans.router.as_ref(), &opts); + assert!(router.contains("stage: sort input"), "{}", router); + assert!(router.contains("stage: query result"), "{}", router); + + Ok::<(), CubeError>(()) + }) + .await; + Ok(()) + } + #[tokio::test] async fn int96_read() -> Result<(), CubeError> { // Copy pre-DF store.