From cc5ae4e69c701e01d05b56bbc148e46035c2b4a3 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 2 Jun 2026 19:51:02 +0200 Subject: [PATCH 1/3] feat(cubestore): error when a query plan node materializes too many rows Add MaterializedRowsLimitExec that counts rows at the points of the plan where they accumulate in memory: sort and window inputs, join build sides, aggregation outputs, worker results and the final query result. When a single node materializes more than the limit, the query fails with a suggestion to create a pre-aggregation for that stage. Configured via CUBESTORE_MATERIALIZED_ROWS_LIMIT (default 500000, 0 disables the check). --- rust/cubestore/cubestore/src/config/mod.rs | 12 ++ .../queryplanner/materialized_rows_limit.rs | 196 ++++++++++++++++++ .../cubestore/src/queryplanner/mod.rs | 1 + .../optimizations/materialized_rows_limit.rs | 70 +++++++ .../src/queryplanner/optimizations/mod.rs | 31 +++ .../src/queryplanner/pretty_printers.rs | 10 +- .../src/queryplanner/query_executor.rs | 5 + rust/cubestore/cubestore/src/sql/mod.rs | 50 +++++ 8 files changed, 373 insertions(+), 2 deletions(-) create mode 100644 rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs create mode 100644 rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 8c3c78492af64..e8a7e59aeeb06 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -408,6 +408,10 @@ pub trait ConfigObj: DIService { fn query_timeout(&self) -> u64; + /// Maximum number of rows a single query plan node may materialize in memory. Zero means no + /// limit. + fn materialized_rows_limit(&self) -> usize; + fn not_used_timeout(&self) -> u64; fn in_memory_not_used_timeout(&self) -> u64; @@ -594,6 +598,7 @@ pub struct ConfigObjImpl { pub status_bind_address: Option, pub http_bind_address: Option, pub query_timeout: u64, + pub materialized_rows_limit: usize, /// Must be set to 2*query_timeout in prod, only for overrides in tests. pub not_used_timeout: u64, pub in_memory_not_used_timeout: u64, @@ -771,6 +776,10 @@ impl ConfigObj for ConfigObjImpl { self.query_timeout } + fn materialized_rows_limit(&self) -> usize { + self.materialized_rows_limit + } + fn not_used_timeout(&self) -> u64 { self.not_used_timeout } @@ -1391,6 +1400,7 @@ impl Config { format!("0.0.0.0:{}", env_parse("CUBESTORE_HTTP_PORT", 3030)), )), query_timeout, + materialized_rows_limit: env_parse("CUBESTORE_MATERIALIZED_ROWS_LIMIT", 500_000), not_used_timeout: 2 * query_timeout, in_memory_not_used_timeout: 30, import_job_timeout: env_parse("CUBESTORE_IMPORT_JOB_TIMEOUT", 600), @@ -1708,6 +1718,7 @@ impl Config { status_bind_address: None, http_bind_address: None, query_timeout, + materialized_rows_limit: 500_000, not_used_timeout: 2 * query_timeout, in_memory_not_used_timeout: 30, import_job_timeout: 600, @@ -2439,6 +2450,7 @@ impl Config { .clone(), i.get_service_typed().await, i.get_service_typed().await, + i.get_service_typed().await, ) }) .await; diff --git a/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs new file mode 100644 index 0000000000000..620e38c2976f4 --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs @@ -0,0 +1,196 @@ +use crate::CubeError; +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::error::DataFusionError; +use datafusion::execution::TaskContext; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, + SendableRecordBatchStream, +}; +use futures::stream::Stream; +use futures::StreamExt; +use std::any::Any; +use std::fmt::Formatter; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Errors out when the wrapped stream produces more than `limit` rows in total across all +/// partitions. Placed at points of the plan where rows get materialized in memory. +#[derive(Debug)] +pub struct MaterializedRowsLimitExec { + pub input: Arc, + pub limit: usize, + /// Human-readable description of the materialization point, used in the error message. + pub stage: &'static str, + rows: Arc, +} + +impl MaterializedRowsLimitExec { + pub fn new(input: Arc, limit: usize, stage: &'static str) -> Self { + Self { + input, + limit, + stage, + rows: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl DisplayAs for MaterializedRowsLimitExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "MaterializedRowsLimitExec, limit: {}, stage: {}", + self.limit, self.stage + ) + } +} + +#[async_trait] +impl ExecutionPlan for MaterializedRowsLimitExec { + fn name(&self) -> &str { + "MaterializedRowsLimitExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + fn properties(&self) -> &PlanProperties { + self.input.properties() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result, DataFusionError> { + assert_eq!(children.len(), 1); + Ok(Arc::new(MaterializedRowsLimitExec { + input: children.into_iter().next().unwrap(), + limit: self.limit, + stage: self.stage, + rows: self.rows.clone(), + })) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if partition >= self.input.properties().partitioning.partition_count() { + return Err(DataFusionError::Internal(format!( + "MaterializedRowsLimitExec invalid partition {}", + partition + ))); + } + + let input = self.input.execute(partition, context)?; + Ok(Box::pin(MaterializedRowsLimitStream { + schema: self.schema(), + limit: self.limit, + stage: self.stage, + rows: self.rows.clone(), + input, + })) + } +} + +struct MaterializedRowsLimitStream { + schema: SchemaRef, + limit: usize, + stage: &'static str, + rows: Arc, + input: SendableRecordBatchStream, +} + +impl Stream for MaterializedRowsLimitStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.input.poll_next_unpin(cx).map(|x| match x { + Some(Ok(batch)) => { + let total = + self.rows.fetch_add(batch.num_rows(), Ordering::Relaxed) + batch.num_rows(); + if total > self.limit { + Some(Err(CubeError::user(format!( + "Query execution stage '{}' materialized more than {} rows. \ + Consider creating a pre-aggregation that performs this stage ahead of time.", + self.stage, self.limit + )) + .into())) + } else { + Some(Ok(batch)) + } + } + other => other, + }) + } + + fn size_hint(&self) -> (usize, Option) { + // same number of record batches + self.input.size_hint() + } +} + +impl RecordBatchStream for MaterializedRowsLimitStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion::arrow::array::Int64Array; + use datafusion::arrow::datatypes::{DataType, Field, Schema}; + use datafusion::physical_plan::collect; + use datafusion_datasource::memory::MemorySourceConfig; + + fn batches(sizes: &[usize]) -> (SchemaRef, Vec) { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batches = sizes + .iter() + .map(|size| { + let array = Int64Array::from((0..*size as i64).collect::>()); + RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap() + }) + .collect(); + (schema, batches) + } + + async fn run_with_limit( + sizes: &[usize], + limit: usize, + ) -> Result, DataFusionError> { + let (schema, batches) = batches(sizes); + let input = MemorySourceConfig::try_new_exec(&[batches], schema, None).unwrap(); + let limited = Arc::new(MaterializedRowsLimitExec::new(input, limit, "test stage")); + collect(limited, Arc::new(TaskContext::default())).await + } + + #[tokio::test] + async fn passes_under_limit() { + let r = run_with_limit(&[3, 4], 7).await.unwrap(); + assert_eq!(r.iter().map(|b| b.num_rows()).sum::(), 7); + } + + #[tokio::test] + async fn errors_over_limit() { + let err = run_with_limit(&[3, 4], 6).await.unwrap_err(); + let message = err.to_string(); + assert!(message.contains("'test stage'"), "{}", message); + assert!(message.contains("pre-aggregation"), "{}", message); + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index f2a41ac7d3b7c..841ab873ce59f 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -10,6 +10,7 @@ use datafusion_datasource::memory::MemorySourceConfig; use datafusion_datasource::source::DataSourceExec; pub use planning::PlanningMeta; mod check_memory; +pub mod materialized_rows_limit; pub mod physical_plan_flags; pub mod pretty_printers; mod projection_above_limit; diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs new file mode 100644 index 0000000000000..7de08eb22842c --- /dev/null +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs @@ -0,0 +1,70 @@ +use crate::queryplanner::materialized_rows_limit::MaterializedRowsLimitExec; +use crate::queryplanner::planning::WorkerExec; +use datafusion::error::DataFusionError; +use datafusion::physical_plan::aggregates::AggregateExec; +use datafusion::physical_plan::joins::{CrossJoinExec, HashJoinExec}; +use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::windows::WindowAggExec; +use datafusion::physical_plan::{ExecutionPlan, InputOrderMode}; +use std::sync::Arc; + +/// Add `MaterializedRowsLimitExec` at the points of the plan where rows accumulate in memory: +/// sort and window inputs, join build sides, aggregation outputs and worker results. Streaming +/// nodes are left as is. +pub fn add_materialized_rows_limit_exec( + p: Arc, + limit: usize, +) -> Result, DataFusionError> { + let p_any = p.as_any(); + if let Some(sort) = p_any.downcast_ref::() { + // Sort with a fetch keeps only top `fetch` rows in memory, so it stays under the limit on + // its own when `fetch <= limit`. Otherwise its buffer holds `min(input, fetch)` rows, so + // counting the input errors exactly when the buffer outgrows the limit. + if sort.fetch().map_or(true, |fetch| fetch > limit) { + return wrap_children(&p, &[(0, "sort input")], limit); + } + } else if p_any.is::() { + return wrap_children(&p, &[(0, "hash join build side")], limit); + } else if p_any.is::() { + return wrap_children(&p, &[(0, "cross join left side")], limit); + } else if p_any.is::() { + return wrap_children(&p, &[(0, "window input")], limit); + } else if let Some(agg) = p_any.downcast_ref::() { + // A sorted aggregation streams groups out instead of accumulating a hash table. + if agg.input_order_mode() != &InputOrderMode::Sorted { + return Ok(wrap(p, limit, "aggregation groups")); + } + } else if p_any.is::() { + return wrap_children(&p, &[(0, "worker result")], limit); + } + Ok(p) +} + +pub fn wrap( + p: Arc, + limit: usize, + stage: &'static str, +) -> Arc { + Arc::new(MaterializedRowsLimitExec::new(p, limit, stage)) +} + +fn wrap_children( + p: &Arc, + wraps: &[(usize, &'static str)], + limit: usize, +) -> Result, DataFusionError> { + let mut children: Vec<_> = p.children().into_iter().cloned().collect(); + let mut changed = false; + for (i, stage) in wraps { + // The child rows may already be counted by an adjacent limit node. + if !children[*i].as_any().is::() { + children[*i] = wrap(children[*i].clone(), limit, stage); + changed = true; + } + } + if changed { + p.clone().with_new_children(children) + } else { + Ok(p.clone()) + } +} diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs index e4ee5eb698b3c..cb463bc6ad006 100644 --- a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs @@ -2,12 +2,14 @@ mod check_memory; mod distributed_partial_aggregate; mod inline_aggregate_rewriter; pub mod is_not_distinct_from_join_keys; +mod materialized_rows_limit; pub mod rewrite_plan; pub mod rolling_optimizer; mod trace_data_loaded; use super::serialized_plan::PreSerializedPlan; use crate::cluster::{Cluster, WorkerPlanningParams}; +use crate::queryplanner::materialized_rows_limit::MaterializedRowsLimitExec; use crate::queryplanner::optimizations::distributed_partial_aggregate::{ add_limit_to_workers, ensure_partition_merge, push_aggregate_to_workers, replace_suboptimal_merge_sorts, @@ -29,6 +31,7 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; use distributed_partial_aggregate::ensure_partition_merge_with_acceptable_parent; +use materialized_rows_limit::add_materialized_rows_limit_exec; use rewrite_plan::rewrite_physical_plan; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -42,6 +45,8 @@ pub struct CubeQueryPlanner { serialized_plan: Arc, memory_handler: Arc, data_loaded_size: Option>, + /// Zero means no limit. + materialized_rows_limit: usize, } impl CubeQueryPlanner { @@ -49,6 +54,7 @@ impl CubeQueryPlanner { cluster: Arc, serialized_plan: Arc, memory_handler: Arc, + materialized_rows_limit: usize, ) -> CubeQueryPlanner { CubeQueryPlanner { cluster: Some(cluster), @@ -56,6 +62,7 @@ impl CubeQueryPlanner { serialized_plan, memory_handler, data_loaded_size: None, + materialized_rows_limit, } } @@ -64,6 +71,7 @@ impl CubeQueryPlanner { worker_planning_params: WorkerPlanningParams, memory_handler: Arc, data_loaded_size: Option>, + materialized_rows_limit: usize, ) -> CubeQueryPlanner { CubeQueryPlanner { serialized_plan, @@ -71,6 +79,7 @@ impl CubeQueryPlanner { worker_partition_count: Some(worker_planning_params), memory_handler, data_loaded_size, + materialized_rows_limit, } } } @@ -103,6 +112,8 @@ impl QueryPlanner for CubeQueryPlanner { self.memory_handler.clone(), self.data_loaded_size.clone(), ctx_state.config().options(), + self.materialized_rows_limit, + self.cluster.is_some(), ); result } @@ -157,6 +168,8 @@ fn finalize_physical_plan( memory_handler: Arc, data_loaded_size: Option>, config: &ConfigOptions, + materialized_rows_limit: usize, + is_router: bool, ) -> Result, DataFusionError> { let p = rewrite_physical_plan(p, &mut |p| add_check_memory_exec(p, memory_handler.clone()))?; log::trace!( @@ -182,5 +195,23 @@ fn finalize_physical_plan( "Rewrote physical plan by replace_suboptimal_merge_sorts:\n{}", pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta()) ); + let p = if materialized_rows_limit > 0 { + let p = rewrite_physical_plan(p, &mut |p| { + add_materialized_rows_limit_exec(p, materialized_rows_limit) + })?; + // The router collects the final query result in memory. + let p = if is_router && !p.as_any().is::() { + materialized_rows_limit::wrap(p, materialized_rows_limit, "query result") + } else { + p + }; + log::trace!( + "Rewrote physical plan by add_materialized_rows_limit_exec:\n{}", + pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta()) + ); + p + } else { + p + }; Ok(p) } diff --git a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs index 25006bc3aeedb..9e481058265f5 100644 --- a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs +++ b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs @@ -29,6 +29,7 @@ use std::sync::Arc; use crate::queryplanner::check_memory::CheckMemoryExec; use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec; use crate::queryplanner::inline_aggregate::{InlineAggregateExec, InlineAggregateMode}; +use crate::queryplanner::materialized_rows_limit::MaterializedRowsLimitExec; use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec; use crate::queryplanner::panic::{PanicWorkerExec, PanicWorkerNode}; use crate::queryplanner::planning::{ClusterSendNode, Snapshot, WorkerExec}; @@ -63,6 +64,7 @@ pub struct PPOptions { // Applies only to physical plan. pub show_output_hints: bool, pub show_check_memory_nodes: bool, + pub show_materialized_rows_limit_nodes: bool, pub show_partitions: bool, pub show_metrics: bool, pub traverse_past_clustersend: bool, @@ -78,6 +80,7 @@ impl PPOptions { show_schema: true, show_output_hints: true, show_check_memory_nodes: true, + show_materialized_rows_limit_nodes: true, show_partitions: true, show_metrics: false, // yeah. Is useful only after plan is evaluated, so defaults to false. traverse_past_clustersend: false, @@ -97,6 +100,7 @@ impl PPOptions { show_schema: false, show_output_hints: false, show_check_memory_nodes: false, + show_materialized_rows_limit_nodes: false, show_partitions: false, show_metrics: false, } @@ -526,8 +530,10 @@ fn pp_append_sort_by(out: &mut String, ordering: &LexOrdering) { } fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, out: &mut String) { - if p.as_any().is::() && !o.show_check_memory_nodes { - //We don't show CheckMemoryExec in plan by default + if (p.as_any().is::() && !o.show_check_memory_nodes) + || (p.as_any().is::() && !o.show_materialized_rows_limit_nodes) + { + //We don't show CheckMemoryExec and MaterializedRowsLimitExec in plan by default if let Some(child) = p.children().first() { pp_phys_plan_indented(child.as_ref(), indent, o, out) } diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 05a5e88445c99..67f54430b66af 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -145,6 +145,7 @@ pub struct QueryExecutorImpl { metadata_cache_factory: Arc, parquet_metadata_cache: Arc, memory_handler: Arc, + config_obj: Arc, } crate::di_service!(QueryExecutorImpl, [QueryExecutor]); @@ -401,11 +402,13 @@ impl QueryExecutorImpl { metadata_cache_factory: Arc, parquet_metadata_cache: Arc, memory_handler: Arc, + config_obj: Arc, ) -> Arc { Arc::new(QueryExecutorImpl { metadata_cache_factory, parquet_metadata_cache, memory_handler, + config_obj, }) } @@ -419,6 +422,7 @@ impl QueryExecutorImpl { cluster, serialized_plan, self.memory_handler.clone(), + self.config_obj.materialized_rows_limit(), )) } @@ -434,6 +438,7 @@ impl QueryExecutorImpl { worker_planning_params, self.memory_handler.clone(), data_loaded_size.clone(), + self.config_obj.materialized_rows_limit(), )) } diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 2ab0f11032b9b..bfa747fa25968 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -2744,6 +2744,56 @@ mod tests { Ok(()) } + #[tokio::test] + async fn materialized_rows_limit() -> Result<(), CubeError> { + Config::test("materialized_rows_limit") + .update_config(|mut c| { + c.materialized_rows_limit = 4; + c + }) + .start_test(async move |services| { + let service = services.sql_service; + + let _ = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; + let _ = service + .exec_query("CREATE TABLE foo.values (id int)") + .await? + .collect() + .await?; + let _ = service + .exec_query("INSERT INTO foo.values (id) VALUES (1), (2), (3), (4), (5), (6)") + .await? + .collect() + .await?; + + let result = service + .exec_query("SELECT id FROM foo.values WHERE id <= 3") + .await? + .collect() + .await?; + assert_eq!(result.get_rows().len(), 3); + + let res = async { + service + .exec_query("SELECT id FROM foo.values") + .await? + .collect() + .await + } + .await; + let err = res.expect_err("query must exceed the materialized rows limit"); + assert!(err.to_string().contains("pre-aggregation"), "{}", err); + + Ok::<(), CubeError>(()) + }) + .await; + Ok(()) + } + #[tokio::test] async fn int96_read() -> Result<(), CubeError> { // Copy pre-DF store. From ed1529cdbc6d3a7edbd0ebd3a749ecaf05279996 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 2 Jun 2026 20:13:47 +0200 Subject: [PATCH 2/3] feat(cubestore): report row counts in materialized rows limit error Include the observed row count, the limit and the env var name in the error message. Cover stage labels with e2e and plan-shape tests, pin the hash join build-side and counter lifetime invariants in comments. --- .../queryplanner/materialized_rows_limit.rs | 17 ++++- .../optimizations/materialized_rows_limit.rs | 1 + .../src/queryplanner/pretty_printers.rs | 2 + rust/cubestore/cubestore/src/sql/mod.rs | 68 ++++++++++++++++--- 4 files changed, 76 insertions(+), 12 deletions(-) diff --git a/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs index 620e38c2976f4..8f351817a81eb 100644 --- a/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs +++ b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs @@ -25,6 +25,7 @@ pub struct MaterializedRowsLimitExec { pub limit: usize, /// Human-readable description of the materialization point, used in the error message. pub stage: &'static str, + /// Total across all partitions. Never reset: plans are built per query and executed once. rows: Arc, } @@ -125,9 +126,12 @@ impl Stream for MaterializedRowsLimitStream { self.rows.fetch_add(batch.num_rows(), Ordering::Relaxed) + batch.num_rows(); if total > self.limit { Some(Err(CubeError::user(format!( - "Query execution stage '{}' materialized more than {} rows. \ - Consider creating a pre-aggregation that performs this stage ahead of time.", - self.stage, self.limit + "Query execution stage '{}' materialized at least {} rows \ + which exceeds the limit of {} rows. \ + Consider creating a pre-aggregation that performs this stage \ + ahead of time, or adjust the CUBESTORE_MATERIALIZED_ROWS_LIMIT \ + environment variable.", + self.stage, total, self.limit )) .into())) } else { @@ -191,6 +195,13 @@ mod tests { let err = run_with_limit(&[3, 4], 6).await.unwrap_err(); let message = err.to_string(); assert!(message.contains("'test stage'"), "{}", message); + assert!(message.contains("at least 7 rows"), "{}", message); + assert!(message.contains("limit of 6 rows"), "{}", message); assert!(message.contains("pre-aggregation"), "{}", message); + assert!( + message.contains("CUBESTORE_MATERIALIZED_ROWS_LIMIT"), + "{}", + message + ); } } diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs index 7de08eb22842c..16ae19c0c3500 100644 --- a/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/materialized_rows_limit.rs @@ -24,6 +24,7 @@ pub fn add_materialized_rows_limit_exec( return wrap_children(&p, &[(0, "sort input")], limit); } } else if p_any.is::() { + // HashJoinExec always builds the hash table from its left input. return wrap_children(&p, &[(0, "hash join build side")], limit); } else if p_any.is::() { return wrap_children(&p, &[(0, "cross join left side")], limit); diff --git a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs index 9e481058265f5..83f6c427b41a0 100644 --- a/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs +++ b/rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs @@ -780,6 +780,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou *out += "MemoryExec (ERROR: deprecated)"; } else if let Some(r) = a.downcast_ref::() { *out += &format!("Repartition, partitioning: {}", r.partitioning()); + } else if let Some(m) = a.downcast_ref::() { + *out += &format!("MaterializedRowsLimit, stage: {}", m.stage); } else { let to_string = format!("{:?}", p); *out += &to_string.split(" ").next().unwrap_or(&to_string); diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index bfa747fa25968..bdf6456b46b34 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -2777,16 +2777,66 @@ mod tests { .await?; assert_eq!(result.get_rows().len(), 3); - let res = async { - service - .exec_query("SELECT id FROM foo.values") - .await? - .collect() - .await - } + let expect_limit_error = async |query: &str, stage: &str| { + let res = async { service.exec_query(query).await?.collect().await }.await; + let err = res.expect_err("query must exceed the materialized rows limit"); + let message = err.to_string(); + assert!(message.contains(&format!("'{}'", stage)), "{}", message); + assert!(message.contains("limit of 4 rows"), "{}", message); + assert!(message.contains("pre-aggregation"), "{}", message); + }; + + expect_limit_error("SELECT id FROM foo.values", "worker result").await; + expect_limit_error( + "SELECT id % 100, count(*) FROM foo.values GROUP BY 1", + "aggregation groups", + ) .await; - let err = res.expect_err("query must exceed the materialized rows limit"); - assert!(err.to_string().contains("pre-aggregation"), "{}", err); + + Ok::<(), CubeError>(()) + }) + .await; + Ok(()) + } + + #[tokio::test] + async fn materialized_rows_limit_plan() -> Result<(), CubeError> { + Config::test("materialized_rows_limit_plan") + .start_test(async move |services| { + let service = services.sql_service; + + let _ = service + .exec_query("CREATE SCHEMA foo") + .await? + .collect() + .await?; + let _ = service + .exec_query("CREATE TABLE foo.values (id int)") + .await? + .collect() + .await?; + let _ = service + .exec_query("INSERT INTO foo.values (id) VALUES (1), (2), (3)") + .await? + .collect() + .await?; + + let mut opts = PPOptions::default(); + opts.show_materialized_rows_limit_nodes = true; + + let plans = service + .plan_query("SELECT id % 100, count(*) FROM foo.values GROUP BY 1") + .await?; + let worker = pp_phys_plan_ext(plans.worker.as_ref(), &opts); + assert!(worker.contains("stage: aggregation groups"), "{}", worker); + assert!(worker.contains("stage: worker result"), "{}", worker); + + let plans = service + .plan_query("SELECT id FROM foo.values ORDER BY id % 100") + .await?; + let router = pp_phys_plan_ext(plans.router.as_ref(), &opts); + assert!(router.contains("stage: sort input"), "{}", router); + assert!(router.contains("stage: query result"), "{}", router); Ok::<(), CubeError>(()) }) From b50385e1a2bb2079cd7a834a9236f53492913f42 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Wed, 3 Jun 2026 09:59:41 +0200 Subject: [PATCH 3/3] feat(cubestore): default materialized rows limit to partition split threshold A single plan node should not materialize more than one partition's worth of rows, so CUBESTORE_MATERIALIZED_ROWS_LIMIT now defaults to the configured CUBESTORE_PARTITION_SPLIT_THRESHOLD (2M rows by default) instead of a flat 500k. Drop the env var mention from the error message. --- rust/cubestore/cubestore/src/config/mod.rs | 14 +++++++++----- .../src/queryplanner/materialized_rows_limit.rs | 8 +------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index e8a7e59aeeb06..11de9d84fd595 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -1248,6 +1248,8 @@ impl Config { pub fn default() -> Config { let query_timeout = env_parse("CUBESTORE_QUERY_TIMEOUT", 120); + let partition_split_threshold = + env_parse("CUBESTORE_PARTITION_SPLIT_THRESHOLD", 1048576 * 2); let query_cache_stale_while_revalidate_secs: u64 = env_parse("CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE", 0); let query_cache_time_to_idle_secs = env_parse( @@ -1297,10 +1299,7 @@ impl Config { dump_dir: env::var("CUBESTORE_DUMP_DIR") .ok() .map(|v| PathBuf::from(v)), - partition_split_threshold: env_parse( - "CUBESTORE_PARTITION_SPLIT_THRESHOLD", - 1048576 * 2, - ), + partition_split_threshold, partition_size_split_threshold_bytes: env_parse_size( "CUBESTORE_PARTITION_SIZE_SPLIT_THRESHOLD", 100 * 1024 * 1024, @@ -1400,7 +1399,12 @@ impl Config { format!("0.0.0.0:{}", env_parse("CUBESTORE_HTTP_PORT", 3030)), )), query_timeout, - materialized_rows_limit: env_parse("CUBESTORE_MATERIALIZED_ROWS_LIMIT", 500_000), + // A single plan node should not materialize more than one partition's worth of + // rows. + materialized_rows_limit: env_parse( + "CUBESTORE_MATERIALIZED_ROWS_LIMIT", + partition_split_threshold as usize, + ), not_used_timeout: 2 * query_timeout, in_memory_not_used_timeout: 30, import_job_timeout: env_parse("CUBESTORE_IMPORT_JOB_TIMEOUT", 600), diff --git a/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs index 8f351817a81eb..e6e22d56374d2 100644 --- a/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs +++ b/rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs @@ -129,8 +129,7 @@ impl Stream for MaterializedRowsLimitStream { "Query execution stage '{}' materialized at least {} rows \ which exceeds the limit of {} rows. \ Consider creating a pre-aggregation that performs this stage \ - ahead of time, or adjust the CUBESTORE_MATERIALIZED_ROWS_LIMIT \ - environment variable.", + ahead of time.", self.stage, total, self.limit )) .into())) @@ -198,10 +197,5 @@ mod tests { assert!(message.contains("at least 7 rows"), "{}", message); assert!(message.contains("limit of 6 rows"), "{}", message); assert!(message.contains("pre-aggregation"), "{}", message); - assert!( - message.contains("CUBESTORE_MATERIALIZED_ROWS_LIMIT"), - "{}", - message - ); } }