Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions rust/cubestore/cubestore/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ pub trait ConfigObj: DIService {

fn query_timeout(&self) -> u64;

/// Maximum number of rows a single query plan node may materialize in memory. Zero means no
/// limit.
fn materialized_rows_limit(&self) -> usize;

fn not_used_timeout(&self) -> u64;

fn in_memory_not_used_timeout(&self) -> u64;
Expand Down Expand Up @@ -594,6 +598,7 @@ pub struct ConfigObjImpl {
pub status_bind_address: Option<String>,
pub http_bind_address: Option<String>,
pub query_timeout: u64,
pub materialized_rows_limit: usize,
/// Must be set to 2*query_timeout in prod, only for overrides in tests.
pub not_used_timeout: u64,
pub in_memory_not_used_timeout: u64,
Expand Down Expand Up @@ -771,6 +776,10 @@ impl ConfigObj for ConfigObjImpl {
self.query_timeout
}

fn materialized_rows_limit(&self) -> usize {
self.materialized_rows_limit
}

fn not_used_timeout(&self) -> u64 {
self.not_used_timeout
}
Expand Down Expand Up @@ -1239,6 +1248,8 @@ impl Config {

pub fn default() -> Config {
let query_timeout = env_parse("CUBESTORE_QUERY_TIMEOUT", 120);
let partition_split_threshold =
env_parse("CUBESTORE_PARTITION_SPLIT_THRESHOLD", 1048576 * 2);
let query_cache_stale_while_revalidate_secs: u64 =
env_parse("CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE", 0);
let query_cache_time_to_idle_secs = env_parse(
Expand Down Expand Up @@ -1288,10 +1299,7 @@ impl Config {
dump_dir: env::var("CUBESTORE_DUMP_DIR")
.ok()
.map(|v| PathBuf::from(v)),
partition_split_threshold: env_parse(
"CUBESTORE_PARTITION_SPLIT_THRESHOLD",
1048576 * 2,
),
partition_split_threshold,
partition_size_split_threshold_bytes: env_parse_size(
"CUBESTORE_PARTITION_SIZE_SPLIT_THRESHOLD",
100 * 1024 * 1024,
Expand Down Expand Up @@ -1391,6 +1399,12 @@ impl Config {
format!("0.0.0.0:{}", env_parse("CUBESTORE_HTTP_PORT", 3030)),
)),
query_timeout,
// A single plan node should not materialize more than one partition's worth of
// rows.
materialized_rows_limit: env_parse(
"CUBESTORE_MATERIALIZED_ROWS_LIMIT",
partition_split_threshold as usize,
),
not_used_timeout: 2 * query_timeout,
in_memory_not_used_timeout: 30,
import_job_timeout: env_parse("CUBESTORE_IMPORT_JOB_TIMEOUT", 600),
Expand Down Expand Up @@ -1708,6 +1722,7 @@ impl Config {
status_bind_address: None,
http_bind_address: None,
query_timeout,
materialized_rows_limit: 500_000,
not_used_timeout: 2 * query_timeout,
in_memory_not_used_timeout: 30,
import_job_timeout: 600,
Expand Down Expand Up @@ -2439,6 +2454,7 @@ impl Config {
.clone(),
i.get_service_typed().await,
i.get_service_typed().await,
i.get_service_typed().await,
)
})
.await;
Expand Down
201 changes: 201 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/materialized_rows_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
use crate::CubeError;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream,
SendableRecordBatchStream,
};
use futures::stream::Stream;
use futures::StreamExt;
use std::any::Any;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

/// Errors out when the wrapped stream produces more than `limit` rows in total across all
/// partitions. Placed at points of the plan where rows get materialized in memory.
#[derive(Debug)]
pub struct MaterializedRowsLimitExec {
pub input: Arc<dyn ExecutionPlan>,
pub limit: usize,
/// Human-readable description of the materialization point, used in the error message.
pub stage: &'static str,
/// Total across all partitions. Never reset: plans are built per query and executed once.
rows: Arc<AtomicUsize>,
}

impl MaterializedRowsLimitExec {
pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize, stage: &'static str) -> Self {
Self {
input,
limit,
stage,
rows: Arc::new(AtomicUsize::new(0)),
}
}
}

impl DisplayAs for MaterializedRowsLimitExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"MaterializedRowsLimitExec, limit: {}, stage: {}",
self.limit, self.stage
)
}
}

#[async_trait]
impl ExecutionPlan for MaterializedRowsLimitExec {
fn name(&self) -> &str {
"MaterializedRowsLimitExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
self.input.schema()
}

fn properties(&self) -> &PlanProperties {
self.input.properties()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
assert_eq!(children.len(), 1);
Ok(Arc::new(MaterializedRowsLimitExec {
input: children.into_iter().next().unwrap(),
limit: self.limit,
stage: self.stage,
rows: self.rows.clone(),
}))
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream, DataFusionError> {
if partition >= self.input.properties().partitioning.partition_count() {
return Err(DataFusionError::Internal(format!(
"MaterializedRowsLimitExec invalid partition {}",
partition
)));
}

let input = self.input.execute(partition, context)?;
Ok(Box::pin(MaterializedRowsLimitStream {
schema: self.schema(),
limit: self.limit,
stage: self.stage,
rows: self.rows.clone(),
input,
}))
}
Comment thread
claude[bot] marked this conversation as resolved.
}

struct MaterializedRowsLimitStream {
schema: SchemaRef,
limit: usize,
stage: &'static str,
rows: Arc<AtomicUsize>,
input: SendableRecordBatchStream,
}

impl Stream for MaterializedRowsLimitStream {
type Item = Result<RecordBatch, DataFusionError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => {
let total =
self.rows.fetch_add(batch.num_rows(), Ordering::Relaxed) + batch.num_rows();
if total > self.limit {
Some(Err(CubeError::user(format!(
"Query execution stage '{}' materialized at least {} rows \
which exceeds the limit of {} rows. \
Consider creating a pre-aggregation that performs this stage \
ahead of time.",
self.stage, total, self.limit
))
.into()))
Comment thread
claude[bot] marked this conversation as resolved.
} else {
Some(Ok(batch))
}
}
other => other,
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
// same number of record batches
self.input.size_hint()
}
}

impl RecordBatchStream for MaterializedRowsLimitStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::collect;
use datafusion_datasource::memory::MemorySourceConfig;

fn batches(sizes: &[usize]) -> (SchemaRef, Vec<RecordBatch>) {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
let batches = sizes
.iter()
.map(|size| {
let array = Int64Array::from((0..*size as i64).collect::<Vec<_>>());
RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap()
})
.collect();
(schema, batches)
}

async fn run_with_limit(
sizes: &[usize],
limit: usize,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let (schema, batches) = batches(sizes);
let input = MemorySourceConfig::try_new_exec(&[batches], schema, None).unwrap();
let limited = Arc::new(MaterializedRowsLimitExec::new(input, limit, "test stage"));
collect(limited, Arc::new(TaskContext::default())).await
}

#[tokio::test]
async fn passes_under_limit() {
let r = run_with_limit(&[3, 4], 7).await.unwrap();
assert_eq!(r.iter().map(|b| b.num_rows()).sum::<usize>(), 7);
}

#[tokio::test]
async fn errors_over_limit() {
let err = run_with_limit(&[3, 4], 6).await.unwrap_err();
let message = err.to_string();
assert!(message.contains("'test stage'"), "{}", message);
assert!(message.contains("at least 7 rows"), "{}", message);
assert!(message.contains("limit of 6 rows"), "{}", message);
assert!(message.contains("pre-aggregation"), "{}", message);
}
}
1 change: 1 addition & 0 deletions rust/cubestore/cubestore/src/queryplanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use datafusion_datasource::memory::MemorySourceConfig;
use datafusion_datasource::source::DataSourceExec;
pub use planning::PlanningMeta;
mod check_memory;
pub mod materialized_rows_limit;
pub mod physical_plan_flags;
pub mod pretty_printers;
mod projection_above_limit;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::queryplanner::materialized_rows_limit::MaterializedRowsLimitExec;
use crate::queryplanner::planning::WorkerExec;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::joins::{CrossJoinExec, HashJoinExec};
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{ExecutionPlan, InputOrderMode};
use std::sync::Arc;

/// Add `MaterializedRowsLimitExec` at the points of the plan where rows accumulate in memory:
/// sort and window inputs, join build sides, aggregation outputs and worker results. Streaming
/// nodes are left as is.
pub fn add_materialized_rows_limit_exec(
p: Arc<dyn ExecutionPlan>,
limit: usize,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let p_any = p.as_any();
if let Some(sort) = p_any.downcast_ref::<SortExec>() {
// Sort with a fetch keeps only top `fetch` rows in memory, so it stays under the limit on
// its own when `fetch <= limit`. Otherwise its buffer holds `min(input, fetch)` rows, so
// counting the input errors exactly when the buffer outgrows the limit.
if sort.fetch().map_or(true, |fetch| fetch > limit) {
return wrap_children(&p, &[(0, "sort input")], limit);
}
} else if p_any.is::<HashJoinExec>() {
Comment thread
claude[bot] marked this conversation as resolved.
// HashJoinExec always builds the hash table from its left input.
return wrap_children(&p, &[(0, "hash join build side")], limit);
} else if p_any.is::<CrossJoinExec>() {
return wrap_children(&p, &[(0, "cross join left side")], limit);
} else if p_any.is::<WindowAggExec>() {
return wrap_children(&p, &[(0, "window input")], limit);
} else if let Some(agg) = p_any.downcast_ref::<AggregateExec>() {
// A sorted aggregation streams groups out instead of accumulating a hash table.
if agg.input_order_mode() != &InputOrderMode::Sorted {
return Ok(wrap(p, limit, "aggregation groups"));
}
} else if p_any.is::<WorkerExec>() {
return wrap_children(&p, &[(0, "worker result")], limit);
}
Ok(p)
}

pub fn wrap(
p: Arc<dyn ExecutionPlan>,
limit: usize,
stage: &'static str,
) -> Arc<dyn ExecutionPlan> {
Arc::new(MaterializedRowsLimitExec::new(p, limit, stage))
}

fn wrap_children(
p: &Arc<dyn ExecutionPlan>,
wraps: &[(usize, &'static str)],
limit: usize,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut children: Vec<_> = p.children().into_iter().cloned().collect();
let mut changed = false;
for (i, stage) in wraps {
// The child rows may already be counted by an adjacent limit node.
if !children[*i].as_any().is::<MaterializedRowsLimitExec>() {
children[*i] = wrap(children[*i].clone(), limit, stage);
changed = true;
}
}
if changed {
p.clone().with_new_children(children)
} else {
Ok(p.clone())
}
}
Loading
Loading