Skip to content

perf: eagerly flush shuffle partitions once batch_size rows accumulated [experimental]#3839

Closed
andygrove wants to merge 13 commits intoapache:mainfrom
andygrove:eager-flush
Closed

perf: eagerly flush shuffle partitions once batch_size rows accumulated [experimental]#3839
andygrove wants to merge 13 commits intoapache:mainfrom
andygrove:eager-flush

Conversation

@andygrove
Copy link
Copy Markdown
Member

Which issue does this PR close?

N/A - experimental performance optimization

Rationale for this change

The current multi-partition shuffle partitioner buffers all input batches in memory until shuffle_write() is called at the end. This requires large offheap memory allocations, especially at higher scale factors.

This PR modifies the multi-partition partitioner to eagerly flush a partition's accumulated row indices into compressed IPC output (via the existing spill infrastructure) as soon as that partition reaches batch_size rows. This bounds the index memory per partition while maintaining the same output format and performance as the current approach.

What changes are included in this PR?

  • In buffer_partitioned_batch_may_spill, after accumulating row indices, check if any partition has reached batch_size rows and flush it immediately via flush_partition()
  • New flush_partition() method that creates a PartitionedBatchIterator for a single partition, spills the produced batches to disk, and clears the indices
  • Made PartitionedBatchIterator::new visible to sibling modules

How are these changes tested?

Benchmarked on TPC-H SF1000 (Q1):

Full TPC-H SF1000 run in progress.

Add a `shuffle_bench` binary that benchmarks shuffle write and read
performance independently from Spark, making it easy to profile with
tools like `cargo flamegraph`, `perf`, or `instruments`.

Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating
synthetic data with configurable schema. Covers different scenarios
including compression codecs, partition counts, partitioning schemes,
and memory-constrained spilling.
…arquet

- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit
  the number of batches buffered before spilling, allowing earlier
  spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close spill file FD after each spill and
  reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of
  loading all input data into memory; remove synthetic data generation
- Add --max-buffered-batches CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
Merge latest from apache/main, resolve conflicts, and strip out
COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing.
This branch now only adds the shuffle benchmark binary.
Spawns N parallel shuffle tasks to simulate executor parallelism.
Each task reads the same input and writes to its own output files.
Extracts core shuffle logic into shared async helper to avoid
code duplication between single and concurrent paths.
@andygrove andygrove closed this Mar 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant