diff --git a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs
index d401557e966d6..a754816d5fc1a 100644
--- a/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs
+++ b/datafusion/core/tests/fuzz_cases/spilling_fuzz_in_memory_constrained_env.rs
@@ -233,6 +233,43 @@ async fn test_sort_with_limited_memory_and_large_record_batch() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_sort_with_limited_memory_and_oversized_record_batch() -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    // Each spilled run's largest batch is so big that two merge streams cannot be
+    // reserved at once even at the smallest read-buffer size (`2 * (2 * batch) >
+    // pool`), yet a single stream still fits (`2 * batch < pool`). Reducing the
+    // buffer size therefore cannot help; the multi-level merge has to re-spill a
+    // run with a smaller batch size to make progress instead of failing with
+    // `ResourcesExhausted`.
+    run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx: Arc::new(task_ctx),
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 3),
+        memory_behavior: Default::default(),
+    })
+    .await?;
+
+    Ok(())
+}
+
 struct RunTestWithLimitedMemoryArgs {
     pool_size: usize,
     task_ctx: Arc<TaskContext>,
diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
index 8985e1d8c70ee..a80594bb8490c 100644
--- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs
+++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs
@@ -27,7 +27,7 @@ use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use arrow::datatypes::SchemaRef;
-use datafusion_common::Result;
+use datafusion_common::{Result, internal_err, resources_err};
 use datafusion_execution::memory_pool::MemoryReservation;
 
 use crate::sorts::builder::try_grow_reservation_to_at_least;
@@ -182,7 +182,17 @@ impl MultiLevelMergeBuilder {
 
     async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
         loop {
-            let mut stream = self.merge_sorted_runs_within_mem_limit()?;
+            let mut stream = match self.merge_sorted_runs_within_mem_limit()? {
+                MergeStep::Stream(stream) => stream,
+                MergeStep::SplitThenRetry(index) => {
+                    // Couldn't reserve memory for the minimum of 2 streams. Re-spill the
+                    // larger of the two we're trying to merge with half its batch size so
+                    // its largest batch shrinks, lowering the per-stream reservation, then
+                    // retry. This makes the merge resilient to skewed (very wide) rows.
+                    self.split_spill_file_in_half(index).await?;
+                    continue;
+                }
+            };
 
             // TODO - add a threshold for number of files to disk even if empty and reading from disk so
             // we can avoid the memory reservation
@@ -220,36 +230,36 @@ impl MultiLevelMergeBuilder {
 
     /// This tries to create a stream that merges the most sorted streams and sorted spill files
     /// as possible within the memory limit.
-    fn merge_sorted_runs_within_mem_limit(
-        &mut self,
-    ) -> Result<SendableRecordBatchStream> {
+    fn merge_sorted_runs_within_mem_limit(&mut self) -> Result<MergeStep> {
         match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
             // No data so empty batch
-            (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
-                &self.schema,
+            (0, 0) => Ok(MergeStep::Stream(Box::pin(EmptyRecordBatchStream::new(
+                Arc::clone(&self.schema),
             )))),
 
             // Only in-memory stream, return that
-            (0, 1) => Ok(self.sorted_streams.remove(0)),
+            (0, 1) => Ok(MergeStep::Stream(self.sorted_streams.remove(0))),
 
             // Only single sorted spill file so return it
             (1, 0) => {
                 let spill_file = self.sorted_spill_files.remove(0);
 
                 // Not reserving any memory for this disk as we are not holding it in memory
-                self.spill_manager
-                    .read_spill_as_stream(spill_file.file, None)
+                Ok(MergeStep::Stream(
+                    self.spill_manager
+                        .read_spill_as_stream(spill_file.file, None)?,
+                ))
             }
 
             // Only in memory streams, so merge them all in a single pass
             (0, _) => {
                 let sorted_stream = mem::take(&mut self.sorted_streams);
-                self.create_new_merge_sort(
+                Ok(MergeStep::Stream(self.create_new_merge_sort(
                     sorted_stream,
                     // If we have no sorted spill files left, this is the last run
                     true,
                     true,
-                )
+                )?))
             }
 
             // Need to merge multiple streams
@@ -261,17 +271,33 @@ impl MultiLevelMergeBuilder {
                 // allocation.
                 let mut memory_reservation = self.reservation.take();
 
-                // Don't account for existing streams memory
-                // as we are not holding the memory for them
-                let mut sorted_streams = mem::take(&mut self.sorted_streams);
+                // Compute the minimum before taking the in-memory streams so that, if we
+                // need to re-spill and retry, `self.sorted_streams` is left untouched.
+                let minimum_number_of_required_streams =
+                    2_usize.saturating_sub(self.sorted_streams.len());
 
-                let (sorted_spill_files, buffer_size) = self
+                let (sorted_spill_files, buffer_size) = match self
                     .get_sorted_spill_files_to_merge(
                         2,
                         // we must have at least 2 streams to merge
-                        2_usize.saturating_sub(sorted_streams.len()),
+                        minimum_number_of_required_streams,
                         &mut memory_reservation,
-                    )?;
+                    )? {
+                    SpillFilesToMerge::Ready(sorted_spill_files, buffer_size) => {
+                        (sorted_spill_files, buffer_size)
+                    }
+                    // Not enough memory to seat 2 streams. Re-spill the blocking file with
+                    // smaller batches and retry. `get_sorted_spill_files_to_merge` already
+                    // freed the reservation and `self.sorted_streams` is untouched, so the
+                    // retry starts clean.
+                    SpillFilesToMerge::SplitThenRetry(index) => {
+                        return Ok(MergeStep::SplitThenRetry(index));
+                    }
+                };
+
+                // Don't account for existing streams memory
+                // as we are not holding the memory for them
+                let mut sorted_streams = mem::take(&mut self.sorted_streams);
 
                 let is_only_merging_memory_streams = sorted_spill_files.is_empty();
 
@@ -311,14 +337,14 @@ impl MultiLevelMergeBuilder {
                         "when only merging memory streams, we should not have any memory reservation and let the merge sort handle the memory"
                     );
 
-                    Ok(merge_sort_stream)
+                    Ok(MergeStep::Stream(merge_sort_stream))
                 } else {
                     // Attach the memory reservation to the stream to make sure we have enough memory
                     // throughout the merge process as we bypassed the memory pool for the merge sort stream
-                    Ok(Box::pin(StreamAttachedReservation::new(
+                    Ok(MergeStep::Stream(Box::pin(StreamAttachedReservation::new(
                         merge_sort_stream,
                         memory_reservation,
-                    )))
+                    ))))
                 }
             }
         }
@@ -370,7 +396,7 @@ impl MultiLevelMergeBuilder {
         buffer_len: usize,
         minimum_number_of_required_streams: usize,
         reservation: &mut MemoryReservation,
-    ) -> Result<(Vec<SortedSpillFile>, usize)> {
+    ) -> Result<SpillFilesToMerge> {
         assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
         let mut number_of_spills_to_read_for_current_phase = 0;
         // Track total memory needed for spill file buffers. When the
@@ -412,7 +438,24 @@ impl MultiLevelMergeBuilder {
                             );
                         }
 
-                        return Err(err);
+                        // buffer_len == 1 and we still can't seat the minimum of 2 streams.
+                        if number_of_spills_to_read_for_current_phase == 0 {
+                            // We couldn't even reserve a single stream - one record batch
+                            // is larger than the whole merge budget. That's the lone-batch
+                            // case, not the 2-stream merge skew we rescue here - surface it.
+                            return Err(err);
+                        }
+
+                        // We seated one stream (index 0) but not the second (index 1, the
+                        // batch that just failed to reserve). Those are by definition the
+                        // only two streams we are trying to merge, so re-spill the larger
+                        // of them with a smaller batch size and retry: the smaller max
+                        // batch lowers the per-stream reservation until both can be seated.
+                        let split_index = usize::from(
+                            self.sorted_spill_files[1].max_record_batch_memory
+                                > self.sorted_spill_files[0].max_record_batch_memory,
+                        );
+                        return Ok(SpillFilesToMerge::SplitThenRetry(split_index));
                     }
 
                     // We reached the maximum amount of memory we can use
@@ -427,8 +470,111 @@ impl MultiLevelMergeBuilder {
             .drain(..number_of_spills_to_read_for_current_phase)
             .collect::<Vec<_>>();
 
-        Ok((spills, buffer_len))
+        Ok(SpillFilesToMerge::Ready(spills, buffer_len))
+    }
+
+    /// Re-spill the spill file at `index` with half its batch size, putting it back
+    /// at the same position. The file is read back and re-spilled through the normal
+    /// spill API (which owns the batch layout) with every batch sliced in two; this
+    /// shrinks the largest written batch, which lowers the per-stream merge
+    /// reservation so the next attempt can seat both streams. One stream's worth of
+    /// memory is reserved for the duration and freed afterwards. Returns an error if
+    /// the largest batch did not shrink; on success `self.batch_size` is halved too.
+    async fn split_spill_file_in_half(&mut self, index: usize) -> Result<()> {
+        let target = self.sorted_spill_files.remove(index);
+        let old_max = target.max_record_batch_memory;
+
+        // Reserve enough to hold a single stream of this file while we re-spill it.
+        let reservation = self.reservation.new_empty();
+        reservation
+            .try_grow(get_reserved_bytes_for_record_batch_size(old_max, old_max))?;
+
+        let source = self
+            .spill_manager
+            .read_spill_as_stream(target.file, Some(old_max))?;
+        // Re-spill with half the batch size: slice every batch in two. The spill
+        // writer owns the batch layout; we only change how many rows per batch.
+        let mut halved: SendableRecordBatchStream =
+            Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                source.flat_map(|batch| {
+                    futures::stream::iter(match batch {
+                        Ok(batch) => split_batch_in_half(batch)
+                            .into_iter()
+                            .map(Ok)
+                            .collect::<Vec<_>>(),
+                        Err(e) => vec![Err(e)],
+                    })
+                }),
+            ));
+
+        let result = self
+            .spill_manager
+            .spill_record_batch_stream_and_return_max_batch_memory(
+                &mut halved,
+                "MultiLevelMergeBuilder split skewed spill",
+            )
+            .await?;
+
+        reservation.free();
+
+        let Some((file, new_max)) = result else {
+            return internal_err!("re-spilling a skewed spill file produced no data");
+        };
+
+        // If halving could not reduce the largest batch (e.g. a single row that is
+        // itself wider than the budget), there is nothing more we can do - surface
+        // the out-of-memory condition instead of looping forever.
+        if new_max >= old_max {
+            return resources_err!(
+                "Cannot merge sorted runs: a single record batch of {old_max} bytes \
+                 exceeds the available merge memory and cannot be split further"
+            );
+        }
+
+        // Also halve the merge output batch size so the next merge pass emits
+        // narrower batches. Otherwise the merged stream would rebuild a full-size
+        // (potentially giant) batch and, when spilled back as an intermediate run,
+        // reintroduce the exact skew we just resolved.
+        self.batch_size = (self.batch_size / 2).max(1);
+
+        self.sorted_spill_files.insert(
+            index,
+            SortedSpillFile {
+                file,
+                max_record_batch_memory: new_max,
+            },
+        );
+
+        Ok(())
+    }
+}
+
+/// Outcome of trying to reserve memory for one multi-level merge pass.
+enum SpillFilesToMerge {
+    /// Enough memory: the spill files to read this pass and the read-ahead buffer size.
+    Ready(Vec<SortedSpillFile>, usize),
+    /// Could not seat the minimum of 2 streams. Re-spill the spill file at this index
+    /// with a smaller (halved) batch size, then retry the pass.
+    SplitThenRetry(usize),
+}
+
+/// What one iteration of the multi-level merge loop should do next.
+enum MergeStep {
+    /// A merged stream is ready to be consumed (and possibly spilled back).
+    Stream(SendableRecordBatchStream),
+    /// Re-spill the spill file at this index with smaller batches, then retry the merge step.
+    SplitThenRetry(usize),
+}
+
+/// Slice `batch` into two row-halves for re-spilling; a batch of at most one row is returned as is.
+fn split_batch_in_half(batch: RecordBatch) -> Vec<RecordBatch> {
+    let num_rows = batch.num_rows();
+    if num_rows <= 1 {
+        return vec![batch];
     }
+    let mid = num_rows / 2;
+    vec![batch.slice(0, mid), batch.slice(mid, num_rows - mid)]
 }
 
 struct StreamAttachedReservation {
@@ -481,3 +627,211 @@ impl RecordBatchStream for StreamAttachedReservation {
         self.stream.schema()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::expressions::PhysicalSortExpr;
+    use arrow::array::{AsArray, Int64Array};
+    use arrow::compute::concat_batches;
+    use arrow::datatypes::{DataType, Field, Int64Type, Schema};
+    use datafusion_execution::memory_pool::{
+        GreedyMemoryPool, MemoryConsumer, MemoryPool,
+    };
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_physical_expr::expressions::Column;
+    use datafusion_physical_expr_common::metrics::{
+        ExecutionPlanMetricsSet, SpillMetrics,
+    };
+
+    fn test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]))
+    }
+
+    fn build_spill_manager(env: &Arc<RuntimeEnv>, schema: &SchemaRef) -> SpillManager {
+        SpillManager::new(
+            Arc::clone(env),
+            SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
+            Arc::clone(schema),
+        )
+    }
+
+    /// Spill `values` (which must already be sorted) as a single sorted run and
+    /// return it as a `SortedSpillFile` carrying its recorded largest-batch memory.
+    fn make_sorted_spill_file(
+        spill_manager: &SpillManager,
+        schema: &SchemaRef,
+        values: Vec<i64>,
+    ) -> SortedSpillFile {
+        let batch = RecordBatch::try_new(
+            Arc::clone(schema),
+            vec![Arc::new(Int64Array::from(values))],
+        )
+        .unwrap();
+        let batches: Vec<Result<RecordBatch>> = vec![Ok(batch)];
+        let (file, max_record_batch_memory) = spill_manager
+            .spill_record_batch_iter_and_return_max_batch_memory(
+                batches.into_iter(),
+                "test input run",
+            )
+            .unwrap()
+            .expect("spill should produce a file");
+        SortedSpillFile {
+            file,
+            max_record_batch_memory,
+        }
+    }
+
+    fn build_merge_builder(
+        spill_manager: SpillManager,
+        schema: SchemaRef,
+        sorted_spill_files: Vec<SortedSpillFile>,
+        pool: &Arc<dyn MemoryPool>,
+        batch_size: usize,
+    ) -> MultiLevelMergeBuilder {
+        let reservation = MemoryConsumer::new("test merge").register(pool);
+        let expr: LexOrdering =
+            [PhysicalSortExpr::new_default(Arc::new(Column::new("x", 0)))].into();
+        MultiLevelMergeBuilder::new(
+            spill_manager,
+            schema,
+            sorted_spill_files,
+            vec![],
+            expr,
+            BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
+            batch_size,
+            reservation,
+            None,
+            false,
+        )
+    }
+
+    /// Proves the fix: two sorted runs whose largest batches are too big to both
+    /// be seated in the merge budget at once are re-spilled (halved) until they
+    /// fit, and the merge then completes with fully sorted, complete output.
+    /// Before the fix this returned `ResourcesExhausted` instead of merging.
+    #[tokio::test]
+    async fn skewed_runs_are_respilled_so_the_merge_fits() -> Result<()> {
+        let env = Arc::new(RuntimeEnv::default());
+        let schema = test_schema();
+        let spill_manager = build_spill_manager(&env, &schema);
+
+        let n: i64 = 16384;
+        let f0 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect());
+        let f1 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect());
+        let m = f0.max_record_batch_memory.max(f1.max_record_batch_memory);
+
+        // Seating two streams needs ~4*m (2*m each), which does NOT fit, but the
+        // budget is large enough once a run is halved. The rescue keeps halving
+        // the blocking run until two streams fit (here, after one halving).
+        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(m * 7 / 2));
+
+        let builder = build_merge_builder(
+            spill_manager,
+            Arc::clone(&schema),
+            vec![f0, f1],
+            &pool,
+            8192,
+        );
+        let stream = builder.create_spillable_merge_stream();
+        let batches: Vec<RecordBatch> = stream.try_collect().await?;
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(
+            total_rows,
+            (2 * n) as usize,
+            "the merge must emit every input row"
+        );
+
+        let merged = concat_batches(&schema, &batches)?;
+        let col = merged.column(0).as_primitive::<Int64Type>();
+        for i in 1..col.len() {
+            assert!(
+                col.value(i - 1) <= col.value(i),
+                "merge output must be sorted: {} > {} at {i}",
+                col.value(i - 1),
+                col.value(i),
+            );
+        }
+
+        Ok(())
+    }
+
+    /// Tests the `new_max >= old_max` guard: a single-row run cannot be split
+    /// any smaller, so re-spilling it does not shrink the largest batch and the
+    /// rescue surfaces `ResourcesExhausted` rather than looping forever.
+    #[tokio::test]
+    async fn respilling_an_unsplittable_run_surfaces_resources_exhausted() -> Result<()> {
+        let env = Arc::new(RuntimeEnv::default());
+        let schema = test_schema();
+        let spill_manager = build_spill_manager(&env, &schema);
+
+        // A one-row run: `split_batch_in_half` returns it unchanged, so the
+        // re-spilled file's largest batch cannot drop below the original.
+        let f0 = make_sorted_spill_file(&spill_manager, &schema, vec![42]);
+
+        // Ample budget so the only possible failure is the un-splittable guard,
+        // not the single-stream reservation itself.
+        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
+        let mut builder =
+            build_merge_builder(spill_manager, schema, vec![f0], &pool, 1024);
+
+        let err = builder
+            .split_spill_file_in_half(0)
+            .await
+            .expect_err("re-spilling a one-row run cannot shrink it");
+        assert!(
+            err.to_string().contains("cannot be split further"),
+            "expected the un-splittable guard error, got: {err}"
+        );
+
+        Ok(())
+    }
+
+    /// Proves the re-spill also halves the merge output batch size: after one
+    /// re-spill the merged run is emitted in 4096-row batches (not the original
+    /// 8192), so it cannot rebuild a full-size batch and reintroduce the skew.
+    #[tokio::test]
+    async fn respill_halves_the_merge_output_batch_size() -> Result<()> {
+        let env = Arc::new(RuntimeEnv::default());
+        let schema = test_schema();
+        let spill_manager = build_spill_manager(&env, &schema);
+
+        let n: i64 = 16384;
+        let f0 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect());
+        let f1 = make_sorted_spill_file(&spill_manager, &schema, (0..n).collect());
+        let m = f0.max_record_batch_memory.max(f1.max_record_batch_memory);
+
+        // 3.5*m forces exactly one re-spill (split one run, then both fit), which
+        // halves the merge output batch size.
+        let initial_batch_size = 8192;
+        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(m * 7 / 2));
+
+        let builder = build_merge_builder(
+            spill_manager,
+            Arc::clone(&schema),
+            vec![f0, f1],
+            &pool,
+            initial_batch_size,
+        );
+        let stream = builder.create_spillable_merge_stream();
+        let batches: Vec<RecordBatch> = stream.try_collect().await?;
+
+        // All rows are still present.
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, (2 * n) as usize);
+
+        // The largest emitted batch is the halved size, not the original 8192 —
+        // without halving `self.batch_size` the merge would rebuild 8192-row batches.
+        let expected_batch_size = initial_batch_size / 2;
+        let max_batch_rows = batches.iter().map(|b| b.num_rows()).max().unwrap_or(0);
+        assert_eq!(
+            max_batch_rows, expected_batch_size,
+            "after one re-spill the merge must emit {expected_batch_size}-row \
+             batches, got a largest batch of {max_batch_rows} rows"
+        );
+
+        Ok(())
+    }
+}