feat: add immediate-mode shuffle partitioner [experimental]#3838
Closed
andygrove wants to merge 21 commits into apache:main from
Conversation
Add a `shuffle_bench` binary that benchmarks shuffle write and read performance independently from Spark, making it easy to profile with tools like `cargo flamegraph`, `perf`, or `instruments`. Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating synthetic data with configurable schema. Covers different scenarios including compression codecs, partition counts, partitioning schemes, and memory-constrained spilling.
…arquet
- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit the number of batches buffered before spilling, allowing earlier spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close the spill file FD after each spill and reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of loading all input data into memory; remove synthetic data generation
- Add --max-buffered-batches CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
Merge latest from apache/main, resolve conflicts, and strip out COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing. This branch now only adds the shuffle benchmark binary.
Add ImmediateShufflePartitioner that repartitions each incoming batch using arrow take and writes per-partition data directly to individual temp files, avoiding in-memory buffering of input batches. At shuffle_write time, per-partition files are concatenated into the final shuffle data file and index. Extract ScratchSpace and partition ID computation (hash, range, round-robin) into shared scratch module, eliminating duplication between multi_partition and immediate_partition implementations. Configurable via spark.comet.exec.shuffle.nativeMode (default/immediate).
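The shared partition-id computation can be sketched in std-only Rust. The function name and the modulo scheme below are illustrative stand-ins, not this PR's code; in particular, Comet must match Spark's Murmur3-based hash partitioning, which std's `DefaultHasher` (used here only for the sketch) does not implement:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Illustrative stand-in for the shared partition-id computation: one
/// partition id per input row. (The real implementation must match Spark's
/// Murmur3 hash; DefaultHasher is used here only to keep the sketch std-only.)
fn hash_partition_ids(keys: &[i64], num_partitions: usize) -> Vec<usize> {
    keys.iter()
        .map(|k| {
            let mut h = DefaultHasher::new();
            k.hash(&mut h);
            (h.finish() as usize) % num_partitions
        })
        .collect()
}

fn main() {
    let ids = hash_partition_ids(&[10, 20, 30, 40], 4);
    // Every row maps to a valid partition, and equal keys map together.
    assert!(ids.iter().all(|&p| p < 4));
    assert_eq!(hash_partition_ids(&[7], 4), hash_partition_ids(&[7], 4));
    println!("partition ids: {ids:?}");
}
```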
Replace per-partition take calls (P*C = 3200 for 200 partitions, 16 columns) with a single take per column to reorder the entire batch, then zero-copy RecordBatch::slice per partition. Also remove the BufReader wrapper in the shuffle_write copy loop to enable kernel zero-copy on Linux. Add --shuffle-mode flag to shuffle_bench binary.

Benchmark (10M rows, 200 partitions, hash on cols 0,3, zstd):
- default: 4.804s avg (2.08M rows/s)
- immediate: 4.185s avg (2.39M rows/s) -- 13% faster
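The take-then-slice reordering can be sketched std-only: a stable counting sort over the per-row partition ids yields one index buffer (which would be passed once per column to arrow's `take`) plus partition offsets for zero-copy slices. Names are illustrative, not the PR's code:

```rust
/// Std-only sketch of take-then-slice: reorder row indices so rows of the
/// same partition are contiguous, via a stable counting sort.
/// Returns (indices, offsets): partition p occupies indices[offsets[p]..offsets[p+1]].
fn reorder_indices(part_ids: &[usize], num_partitions: usize) -> (Vec<u32>, Vec<usize>) {
    // Count rows per partition, then prefix-sum into start offsets.
    let mut offsets = vec![0usize; num_partitions + 1];
    for &p in part_ids {
        offsets[p + 1] += 1;
    }
    for i in 0..num_partitions {
        offsets[i + 1] += offsets[i];
    }
    // Scatter each row index to its partition's region (stable within a partition).
    let mut cursor = offsets.clone();
    let mut indices = vec![0u32; part_ids.len()];
    for (row, &p) in part_ids.iter().enumerate() {
        indices[cursor[p]] = row as u32;
        cursor[p] += 1;
    }
    (indices, offsets)
}

fn main() {
    // Rows 0..4 alternate between partitions 1 and 0.
    let (indices, offsets) = reorder_indices(&[1, 0, 1, 0], 2);
    assert_eq!(indices, vec![1, 3, 0, 2]); // partition 0's rows first
    assert_eq!(offsets, vec![0, 2, 4]);
    println!("indices {indices:?}, offsets {offsets:?}");
}
```

After one `take` per column with `indices`, each partition's batch is the zero-copy slice `[offsets[p], offsets[p+1])` of the reordered batch.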
Replace per-partition temp files in ImmediateShufflePartitioner with Vec<u8> buffers holding compressed IPC blocks. Eliminates 200 temp file creates/writes/reads per task and the double-write overhead that caused 2x slowdown on clusters with concurrent tasks. Add --concurrent-tasks flag to shuffle_bench for realistic executor simulation.

Benchmark (10M rows, 200 partitions, hash on cols 0,3, zstd):
- Single task: buffered 4.78s vs immediate 3.97s (-17%)
- 8 concurrent tasks: buffered 6.75s vs immediate 5.94s (-12%)
Register MemoryReservation with DataFusion's memory pool. After each batch, track in-memory buffer growth. When try_grow fails (memory pressure), spill all partition buffers to per-partition temp files, clear the Vec<u8> buffers, and free the reservation. In shuffle_write, copy spill files first then remaining in-memory data.

Benchmark (10M rows, 200 partitions, zstd):
- No limit: 3.98s (2.51M rows/s)
- 50MB limit: 4.33s (2.31M rows/s) -- 9% spilling overhead
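The spill-on-pressure loop can be sketched std-only. `MemoryPool`, `try_grow`, and the in-memory `spilled` stand-in for per-partition spill files are all simplified assumptions, not DataFusion's actual memory-pool API:

```rust
/// Simplified stand-in for a memory reservation against a pool with a limit.
struct MemoryPool {
    limit: usize,
    used: usize,
}

impl MemoryPool {
    fn try_grow(&mut self, bytes: usize) -> Result<(), ()> {
        if self.used + bytes > self.limit {
            Err(()) // memory pressure: caller must spill
        } else {
            self.used += bytes;
            Ok(())
        }
    }
    fn free(&mut self, bytes: usize) {
        self.used -= bytes;
    }
}

/// Buffer a compressed block for `partition`; on memory pressure, spill every
/// partition buffer, clear them, and free the reservation before retrying.
fn buffer_or_spill(
    pool: &mut MemoryPool,
    buffers: &mut Vec<Vec<u8>>,
    partition: usize,
    block: &[u8],
    spilled: &mut Vec<(usize, Vec<u8>)>, // stand-in for per-partition spill files
) {
    if pool.try_grow(block.len()).is_err() {
        let freed: usize = buffers.iter().map(|b| b.len()).sum();
        for (p, buf) in buffers.iter_mut().enumerate() {
            if !buf.is_empty() {
                spilled.push((p, std::mem::take(buf)));
            }
        }
        pool.free(freed);
        // Re-reserve for the incoming block (assumed to fit after spilling).
        pool.try_grow(block.len()).expect("block larger than pool limit");
    }
    buffers[partition].extend_from_slice(block);
}

fn main() {
    let mut pool = MemoryPool { limit: 10, used: 0 };
    let mut buffers = vec![Vec::new(); 2];
    let mut spilled = Vec::new();
    buffer_or_spill(&mut pool, &mut buffers, 0, &[0u8; 6], &mut spilled);
    assert!(spilled.is_empty());
    // Second block exceeds the 10-byte limit, forcing a spill of partition 0.
    buffer_or_spill(&mut pool, &mut buffers, 1, &[0u8; 6], &mut spilled);
    assert_eq!(spilled.len(), 1);
    assert_eq!(buffers[1].len(), 6);
    assert_eq!(pool.used, 6);
}
```

At shuffle-write time, spilled data would be copied first and the remaining in-memory buffers appended after, matching the order described above.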
- Replace leaky inner_ref/inner_mut/inner_buffer_len methods on BufBatchWriter with encapsulated Cursor<Vec<u8>>-specific methods: buffered_output_size, reset_output_buffer, output_bytes
- Extract write_index_file utility, replacing duplicate code in all three partitioners
- Rename _temp_file to temp_file (RAII guard, not unused)
- Remove unnecessary comments
The take-then-slice approach (single take to reorder entire batch, then zero-copy slice per partition) caused the BatchCoalescer to copy all data again via copy_rows, resulting in 2x data movement. When combined with concat_batches (attempted optimization), it produced 66 GiB output instead of 396 MiB due to missing coalescing. Switch to per-partition take with shared UInt32Array indices (sliced per partition to avoid allocation). Each partition's small batch (~40 rows) goes through BufBatchWriter's BatchCoalescer which handles proper IPC block coalescing and compression.

Benchmark (10M rows, 200 partitions, zstd):
- buffered: 4.84s (2.07M rows/s)
- immediate: 4.38s (2.28M rows/s) -- 10% faster
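The sliced-shared-indices idea can be illustrated std-only: one index buffer grouped by partition, with each partition's `take` indices exposed as a slice of that buffer rather than a fresh allocation. Function name and shapes are illustrative, not the PR's code:

```rust
/// Std-only sketch: `indices` holds row indices grouped by partition and
/// `offsets` marks the boundaries (offsets.len() == num_partitions + 1).
/// Each partition's take-indices are a borrow, so no per-partition allocation.
fn partition_index_slices<'a>(indices: &'a [u32], offsets: &[usize]) -> Vec<&'a [u32]> {
    offsets.windows(2).map(|w| &indices[w[0]..w[1]]).collect()
}

fn main() {
    // Rows 1,3 belong to partition 0; rows 0,2 to partition 1.
    let indices: Vec<u32> = vec![1, 3, 0, 2];
    let offsets = vec![0usize, 2, 4];
    let slices = partition_index_slices(&indices, &offsets);
    assert_eq!(slices, vec![&[1u32, 3][..], &[0, 2][..]]);
    println!("per-partition index slices: {slices:?}");
}
```

Each slice would drive one `take` call, and the resulting small per-partition batch then flows through the coalescing writer as described above.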
Which issue does this PR close?
Closes #.
Rationale for this change
The current "buffered" shuffle mode collects all input batches into memory before partitioning and writing. This works well for small-to-medium datasets but can cause high memory usage and spill pressure for large shuffles. An "immediate" mode that partitions each batch as it arrives can reduce peak memory and improve throughput for many workloads.
What changes are included in this PR?
This is an experimental implementation of an immediate-mode shuffle partitioner. Key changes:
- `ImmediatePartitioner` that partitions each input batch on arrival using a single take-then-slice approach, avoiding full-batch buffering
- `spark.comet.exec.shuffle.nativeMode` config option with `buffered` (default) and `immediate` modes
- Standalone shuffle benchmark binary (`shuffle_bench`)
How are these changes tested?
Existing shuffle tests pass with both modes. The standalone shuffle benchmark binary can be used for performance comparison and profiling.
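As a usage sketch, the new mode could be selected via the config option this PR adds; only the `spark.comet.exec.shuffle.nativeMode` key and its values come from this PR, the rest of the invocation is an illustrative assumption:

```shell
# Illustrative spark-submit invocation; command-line shape is an assumption,
# only the nativeMode key/values are from this PR.
spark-submit \
  --conf spark.comet.exec.shuffle.nativeMode=immediate \
  my_job.py
```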