Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,43 @@ async fn test_sort_with_limited_memory_and_large_record_batch() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_sort_with_limited_memory_and_oversized_record_batch() -> Result<()> {
let record_batch_size = 8192;
let pool_size = 2 * MB as usize;
let task_ctx = {
let memory_pool = Arc::new(FairSpillPool::new(pool_size));
TaskContext::default()
.with_session_config(
SessionConfig::new()
.with_batch_size(record_batch_size)
.with_sort_spill_reservation_bytes(1),
)
.with_runtime(Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(memory_pool)
.build()?,
))
};

// Each spilled run's largest batch is so big that two merge streams cannot be
// reserved at once even at the smallest read-buffer size (`2 * (2 * batch) >
// pool`), yet a single stream still fits (`2 * batch < pool`). Reducing the
// buffer size therefore cannot help, the multi-level merge has to re-spill a
// run with a smaller batch size to make progress instead of failing with
// `ResourcesExhausted`.
run_sort_test_with_limited_memory(RunTestWithLimitedMemoryArgs {
pool_size,
task_ctx: Arc::new(task_ctx),
number_of_record_batches: 100,
get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 3),
memory_behavior: Default::default(),
})
.await?;

Ok(())
}

struct RunTestWithLimitedMemoryArgs {
pool_size: usize,
task_ctx: Arc<TaskContext>,
Expand Down
Loading
Loading