fix: improve memory accounting in shuffle write [WIP] #3824
Draft
andygrove wants to merge 12 commits into apache:main from
Add a `shuffle_bench` binary that benchmarks shuffle write and read performance independently from Spark, making it easy to profile with tools like `cargo flamegraph`, `perf`, or `instruments`. Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating synthetic data with configurable schema. Covers different scenarios including compression codecs, partition counts, partitioning schemes, and memory-constrained spilling.
…arquet
- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit the number of batches buffered before spilling, allowing earlier spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close spill file FD after each spill and reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of loading all input data into memory; remove synthetic data generation
- Add --max-buffered-batches CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
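The close-and-reopen approach to spill files described in this commit can be sketched roughly as follows. This is a minimal illustration that uses only the Rust standard library. `PartitionSpill` and its fields are hypothetical names, not the types used in the actual shuffle writer.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical per-partition spill state. Only the path is kept between
/// spills, so no file descriptor stays open while batches are buffered.
struct PartitionSpill {
    path: PathBuf,
    bytes_written: u64,
}

impl PartitionSpill {
    fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            bytes_written: 0,
        }
    }

    /// Open the spill file in append mode (creating it on first use),
    /// write the data, and let the handle drop so the descriptor is closed.
    fn spill(&mut self, data: &[u8]) -> io::Result<()> {
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;
        file.write_all(data)?;
        file.flush()?;
        self.bytes_written += data.len() as u64;
        Ok(())
        // `file` goes out of scope here, which closes the descriptor.
    }
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join(format!("spill_sketch_{}.bin", std::process::id()));
    let _ = std::fs::remove_file(&path);

    let mut spill = PartitionSpill::new(&path);
    spill.spill(b"first")?;
    spill.spill(b"second")?;

    // Both spills landed in the same file, in order.
    assert_eq!(std::fs::read(&path)?, b"firstsecond");
    println!("spilled {} bytes", spill.bytes_written);

    std::fs::remove_file(&path)?;
    Ok(())
}
```

With this shape, the number of open descriptors is bounded by the number of spills in progress rather than by the partition count, at the cost of one extra `open` call per spill.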
Merge latest from apache/main, resolve conflicts, and strip out COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing. This branch now only adds the shuffle benchmark binary.
The shuffle writer's memory reservation only tracks buffered RecordBatches and partition indices, but significant untracked allocations occur in the write path: BufBatchWriter buffers, BatchCoalescer state, Arrow IPC serialization, compression encoder state, and interleave_record_batch output. This caused actual RSS to be 1.5-1.8x the configured memory limit.
Apply a 2x multiplier to the reservation growth to trigger spills earlier, keeping total process memory closer to the configured limit.
Closes apache#3821
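As a rough illustration of why scaling the reservation growth triggers earlier spills, here is a simplified model. It does not use the real memory pool API: `Reservation`, `ShuffleWriterSketch`, and `run` are hypothetical stand-ins, and the only idea taken from the change itself is the multiplier applied to the tracked growth.

```rust
/// Hypothetical stand-in for a memory pool reservation with a hard limit.
struct Reservation {
    limit: usize,
    reserved: usize,
}

impl Reservation {
    /// Grow the reservation if it still fits under the limit.
    fn try_grow(&mut self, bytes: usize) -> bool {
        let wanted = self.reserved.saturating_add(bytes);
        if wanted > self.limit {
            false
        } else {
            self.reserved = wanted;
            true
        }
    }

    fn free(&mut self) {
        self.reserved = 0;
    }
}

/// Simplified writer: tracks buffered bytes and spills when the
/// (scaled) reservation no longer fits.
struct ShuffleWriterSketch {
    reservation: Reservation,
    multiplier: usize, // assumed factor covering untracked allocations
    buffered_bytes: usize,
    peak_buffered_bytes: usize,
    spill_count: usize,
}

impl ShuffleWriterSketch {
    fn new(limit: usize, multiplier: usize) -> Self {
        Self {
            reservation: Reservation { limit, reserved: 0 },
            multiplier,
            buffered_bytes: 0,
            peak_buffered_bytes: 0,
            spill_count: 0,
        }
    }

    /// Buffer a batch whose tracked growth is `mem_growth` bytes, reserving
    /// `mem_growth * multiplier` and spilling if that does not fit.
    fn buffer_batch_may_spill(&mut self, mem_growth: usize) {
        self.buffered_bytes += mem_growth;
        self.peak_buffered_bytes = self.peak_buffered_bytes.max(self.buffered_bytes);
        let scaled = mem_growth.saturating_mul(self.multiplier);
        if !self.reservation.try_grow(scaled) {
            self.spill();
        }
    }

    /// Spilling writes out everything buffered and releases the reservation.
    fn spill(&mut self) {
        self.buffered_bytes = 0;
        self.reservation.free();
        self.spill_count += 1;
    }
}

/// Feed `batches` equal-sized batches and return (spill count, peak buffered bytes).
fn run(limit: usize, multiplier: usize, batches: usize, batch_bytes: usize) -> (usize, usize) {
    let mut writer = ShuffleWriterSketch::new(limit, multiplier);
    for _ in 0..batches {
        writer.buffer_batch_may_spill(batch_bytes);
    }
    (writer.spill_count, writer.peak_buffered_bytes)
}

fn main() {
    for multiplier in [1, 2] {
        let (spills, peak) = run(100, multiplier, 12, 10);
        println!("multiplier={multiplier}: spills={spills}, peak buffered bytes={peak}");
    }
}
```

In this toy model the 2x multiplier roughly halves the peak of tracked buffered data and increases the number of spills, which leaves headroom under the limit for allocations the reservation never sees.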
I plan to run benchmarks at scale next week and experiment with different amounts of off-heap memory. Leaving this as a draft until I can do this.
Which issue does this PR close?
Closes #3821.
Rationale for this change
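The shuffle writer's memory reservation understates how much memory is actually used. It tracks only buffered RecordBatches and partition indices, while allocations in the write path (BufBatchWriter buffers, BatchCoalescer state, Arrow IPC serialization, compression encoder state, and interleave_record_batch output) are untracked.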
These untracked allocations cause actual RSS to significantly exceed the configured memory pool limit, because the pool doesn't know about them when making spill decisions.
What changes are included in this PR?
Apply a 2x multiplier to `mem_growth` when reserving memory in `buffer_partitioned_batch_may_spill`. This triggers spills earlier, keeping total process memory closer to the configured limit.
How are these changes tested?
Benchmarked with `shuffle_bench` on TPC-H SF100 lineitem (100M rows, 16 columns, hash partitioning with 200 partitions, lz4 compression) at memory limits of 2 GB, 4 GB, 8 GB, and 16 GB.
Before
After (2x multiplier)
Peak RSS is reduced by 26-40% across all configurations while throughput is equal or better. The 4 GB case now stays within the configured limit (0.94x). All existing tests pass.