feat: Add support for IPC stream format shuffle files (native shuffle) [experimental] #3837

Draft
andygrove wants to merge 24 commits into apache:main from andygrove:ipc-stream-shuffle-writer

Conversation


@andygrove andygrove commented Mar 29, 2026

Which issue does this PR close?

Closes #2928

Includes standalone shuffle benchmark binary from #3752

Rationale for this change

Experimenting

Single-partition writes are ~2x faster with the IPC stream format; multi-partition performance (200 partitions) is comparable (~1-2% slower, with ~5% larger output because IPC body compression is applied per buffer rather than over the whole stream)

What changes are included in this PR?

  • Add IpcStreamWriter as an alternative to ShuffleBlockWriter that writes standard Arrow IPC streams with built-in body compression (LZ4/ZSTD), writing the schema once per partition instead of once per batch
  • Add spark.comet.exec.shuffle.format config (block or ipc_stream) to toggle between the existing custom block format and the new IPC stream format
  • Wire the format choice end-to-end: CometConf → protobuf → planner → ShuffleWriterExec → partitioners → spill writers → reader
  • Update the shuffle reader (CometShuffleBlockIterator + ShuffleScanExec) to auto-detect and decode both formats, streaming IPC batches lazily via Arrow's StreamReader instead of materializing all batches at once
  • For spilling, keep the IpcStreamWriter open across multiple spill calls so each partition's spill file contains a single IPC stream (no concatenation issues), then raw-copy it to the output file
  • Add --format flag to the standalone shuffle benchmark binary
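The reader-side auto-detection mentioned above (block buffers begin with a 4-byte codec tag, IPC stream buffers with the Arrow continuation marker) can be sketched roughly as follows; the names `ShuffleFormat` and `detect_format` are illustrative, not the PR's actual API:

```rust
/// Illustrative sketch of format auto-detection; not the PR's code.
#[derive(Debug, PartialEq)]
enum ShuffleFormat {
    Block,
    IpcStream,
}

/// Arrow IPC streams begin with the continuation marker 0xFFFFFFFF;
/// the custom block format begins with a 4-byte codec tag.
fn detect_format(buf: &[u8]) -> Option<ShuffleFormat> {
    let prefix: [u8; 4] = buf.get(..4)?.try_into().ok()?;
    if prefix == [0xFF, 0xFF, 0xFF, 0xFF] {
        Some(ShuffleFormat::IpcStream)
    } else if matches!(&prefix, b"SNAP" | b"LZ4_" | b"ZSTD" | b"NONE") {
        Some(ShuffleFormat::Block)
    } else {
        None
    }
}

fn main() {
    assert_eq!(detect_format(&[0xFF; 8]), Some(ShuffleFormat::IpcStream));
    assert_eq!(detect_format(b"LZ4_data"), Some(ShuffleFormat::Block));
    println!("format detection ok");
}
```

Because the two formats have disjoint 4-byte prefixes, the reader never needs an out-of-band flag to decide how to decode a buffer.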

How are these changes tested?

Add a `shuffle_bench` binary that benchmarks shuffle write and read
performance independently from Spark, making it easy to profile with
tools like `cargo flamegraph`, `perf`, or `instruments`.

Supports reading Parquet files (e.g. TPC-H/TPC-DS) or generating
synthetic data with configurable schema. Covers different scenarios
including compression codecs, partition counts, partitioning schemes,
and memory-constrained spilling.

- Add `spark.comet.exec.shuffle.maxBufferedBatches` config to limit
  the number of batches buffered before spilling, allowing earlier
  spilling to reduce peak memory usage on executors
- Fix too-many-open-files: close spill file FD after each spill and
  reopen in append mode, rather than holding one FD open per partition
- Refactor shuffle_bench to stream directly from Parquet instead of
  loading all input data into memory; remove synthetic data generation
- Add --max-buffered-batches CLI arg to shuffle_bench
- Add shuffle benchmark documentation to README
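The too-many-open-files fix boils down to holding a spill file descriptor only for the duration of each write, reopening in append mode next time. A minimal std-only sketch (the name `append_spill` is mine, not the PR's):

```rust
// Illustrative sketch of "close after each spill, reopen in append mode".
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;

/// Append one spill's bytes, holding the file descriptor only while
/// writing instead of keeping one FD open per partition.
fn append_spill(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let mut f = OpenOptions::new().create(true).append(true).open(path)?;
    f.write_all(bytes)?;
    Ok(()) // `f` is dropped here, closing the descriptor
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("spill_demo.bin");
    let _ = std::fs::remove_file(&path);
    append_spill(&path, b"first")?;
    append_spill(&path, b"second")?; // reopened in append mode
    assert_eq!(std::fs::read(&path)?, b"firstsecond");
    std::fs::remove_file(&path)?;
    println!("spill append ok");
    Ok(())
}
```

The tradeoff is an extra open/close syscall per spill in exchange for an FD count bounded by concurrent spills rather than by partition count.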
Merge latest from apache/main, resolve conflicts, and strip out the
COMET_SHUFFLE_MAX_BUFFERED_BATCHES config and all related plumbing.
This branch now only adds the shuffle benchmark binary.

Spawns N parallel shuffle tasks to simulate executor parallelism.
Each task reads the same input and writes to its own output files.
Extracts core shuffle logic into a shared async helper to avoid
code duplication between the single and concurrent paths.

Add IpcStreamWriter that writes standard Arrow IPC streams with
built-in body compression (LZ4_FRAME/ZSTD), where the schema is
written once per stream instead of once per batch.

Add spark.comet.exec.shuffle.format config ("block" or "ipc_stream")
to toggle between the custom block format and the new IPC stream
format. Wire the setting through protobuf, planner, and both
single-partition and multi-partition shuffle writers including spill
support.

Benchmark results (200 partitions, LZ4, 10M rows TPC-H lineitem):
- Block:      2.64M rows/s, 609 MiB
- IPC stream: 2.60M rows/s, 634 MiB
- Single partition: IPC stream is 2x faster

Previously each spill() call for IPC stream format wrote a complete
IPC stream (schema + batches + EOS) to the spill file. Multiple
spills produced concatenated streams, and readers would stop at the
first EOS marker, silently dropping subsequent data.

Fix by keeping the IpcStreamWriter alive on PartitionWriter across
spill calls, finishing it only in shuffle_write via finish_spill().

Also:
- Remove try_clone() fd waste in SinglePartitionShufflePartitioner
- Hoist spill file copy above format match (shared by both formats)
- Remove extra blank line
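The shape of the fix, one stream per spill file that is terminated exactly once, can be mocked up without Arrow at all. `SpillStream`, `HDR`, and `EOS` below are stand-ins for the real IpcStreamWriter, the IPC schema message, and the end-of-stream marker:

```rust
// Minimal stdlib sketch: the stream writer stays open across spill()
// calls and only finish_spill() writes the end-of-stream marker, so the
// spill file contains exactly one stream. Names are illustrative.
struct SpillStream {
    out: Vec<u8>,
    started: bool,
}

impl SpillStream {
    fn new() -> Self {
        Self { out: Vec::new(), started: false }
    }

    /// Each spill appends to the same open stream; the header (the
    /// schema message in the real IPC case) is written only once.
    fn spill(&mut self, payload: &[u8]) {
        if !self.started {
            self.out.extend_from_slice(b"HDR");
            self.started = true;
        }
        self.out.extend_from_slice(payload);
    }

    /// Called once from shuffle_write: terminate the single stream.
    fn finish_spill(mut self) -> Vec<u8> {
        self.out.extend_from_slice(b"EOS");
        self.out
    }
}

fn main() {
    let mut s = SpillStream::new();
    s.spill(b"a");
    s.spill(b"b"); // no second header, no intermediate EOS
    assert_eq!(s.finish_spill(), b"HDRabEOS");
    println!("single-stream spill ok");
}
```

With the buggy behavior, two spills would have produced `HDRaEOSHDRbEOS`, and a reader stopping at the first `EOS` would silently drop `b`.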
IPC stream format data is length-prefixed (8-byte LE length followed
by the Arrow IPC stream bytes). The reader auto-detects the format:
block format buffers start with a codec prefix (SNAP/LZ4_/ZSTD/NONE),
IPC stream buffers start with Arrow continuation (0xFFFFFFFF).

Changes:
- IpcStreamWriter: add try_new_length_prefixed/finish_length_prefixed
  for writer-side framing
- CometShuffleBlockIterator: accept isIpcStream flag, read 8-byte
  length header (no field count) for IPC stream format
- ShuffleScanExec: auto-detect format from buffer, decode IPC streams
  with read_ipc_stream(), buffer multiple batches from one stream
- Update contributor guide with format documentation
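The writer-side framing described above is just an 8-byte little-endian length followed by the raw stream bytes. A std-only sketch of the roundtrip (function names are illustrative, not the PR's API):

```rust
// Sketch of length-prefixed framing: 8-byte LE length, then the Arrow
// IPC stream bytes. Not the PR's code; names are illustrative.
use std::io::{Cursor, Read, Write};

fn write_length_prefixed<W: Write>(w: &mut W, stream: &[u8]) -> std::io::Result<()> {
    w.write_all(&(stream.len() as u64).to_le_bytes())?;
    w.write_all(stream)
}

fn read_length_prefixed<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
    let mut len = [0u8; 8];
    r.read_exact(&mut len)?;
    let mut buf = vec![0u8; u64::from_le_bytes(len) as usize];
    r.read_exact(&mut buf)?;
    Ok(buf)
}

fn main() -> std::io::Result<()> {
    // Two back-to-back length-prefixed streams in one buffer, as in the
    // multiple-streams roundtrip scenario mentioned above.
    let mut buf = Vec::new();
    write_length_prefixed(&mut buf, &[0xFF, 0xFF, 0xFF, 0xFF, 1, 2])?;
    write_length_prefixed(&mut buf, &[0xFF, 0xFF, 0xFF, 0xFF, 3])?;
    let mut cur = Cursor::new(buf);
    assert_eq!(read_length_prefixed(&mut cur)?, [0xFF, 0xFF, 0xFF, 0xFF, 1, 2]);
    assert_eq!(read_length_prefixed(&mut cur)?, [0xFF, 0xFF, 0xFF, 0xFF, 3]);
    println!("length-prefixed roundtrip ok");
    Ok(())
}
```

The explicit length lets the reader slice out one stream without scanning for the end-of-stream marker, which is what makes back-to-back streams in one file unambiguous.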
Replace Vec<RecordBatch> buffering with a persistent StreamReader
that yields one batch per get_next_batch() call. The IPC stream
bytes are copied into owned memory and a StreamReader is created
on first access; subsequent calls iterate the reader without
another JNI round-trip or bulk materialization.
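Minus the Arrow specifics, the lazy-initialization pattern looks roughly like this; `Batch` and `LazyStream` are stand-ins, with a plain iterator standing in for Arrow's `StreamReader`:

```rust
// Stdlib sketch of the lazy decoding pattern: the reader is created on
// first access and then iterated batch-by-batch, instead of
// materializing all batches up front. `Batch` stands in for RecordBatch.
type Batch = Vec<i64>;

struct LazyStream {
    raw: Vec<Batch>,                           // stands in for owned IPC bytes
    reader: Option<std::vec::IntoIter<Batch>>, // created on first access
}

impl LazyStream {
    fn new(raw: Vec<Batch>) -> Self {
        Self { raw, reader: None }
    }

    /// Yields one batch per call, like get_next_batch() in the PR.
    fn get_next_batch(&mut self) -> Option<Batch> {
        if self.reader.is_none() {
            // In the real code this is where a StreamReader is built
            // over the owned IPC bytes, once, on first access.
            self.reader = Some(std::mem::take(&mut self.raw).into_iter());
        }
        self.reader.as_mut().unwrap().next()
    }
}

fn main() {
    let mut s = LazyStream::new(vec![vec![1, 2], vec![3]]);
    assert_eq!(s.get_next_batch(), Some(vec![1, 2]));
    assert_eq!(s.get_next_batch(), Some(vec![3]));
    assert_eq!(s.get_next_batch(), None);
    println!("lazy stream ok");
}
```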
The columnar shuffle read path (NativeBatchDecoderIterator) was not
updated for IPC stream format, causing NegativeArraySizeException
when the IPC stream continuation token (0xFFFFFFFF) was interpreted
as field count = -1.

Fix by:
- Adding numColumns parameter to NativeBatchDecoderIterator, passed
  from the dependency schema
- For IPC stream format, skip the 8-byte field count header read and
  use schema-derived column count instead
- Auto-detect format in native decodeShuffleBlock JNI function

@andygrove andygrove marked this pull request as draft March 29, 2026 17:32

dep.schema is None for native shuffle, causing numColumns=-1. Fall
back to dep.outputAttributes.length which is always set. Also run
prettier on native_shuffle.md.

Cover the three bugs found during integration:
- Empty IPC stream (no batches): must be EOF, not error
- Length-prefixed roundtrip and multiple back-to-back streams
- IPC stream shuffle write with many partitions (some empty)
- IPC stream shuffle write with spilling

New tests: 26 total (up from 18).

- Add with_skip_validation(true) to StreamReader for IPC stream shuffle
  blocks, matching the block format behavior. Data was just written by
  the same process so validation is unnecessary overhead.

- Fix IPC stream spill path returning 0 bytes written, which broke
  memory accounting. Now tracks actual bytes via stream position.
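The accounting fix amounts to measuring the stream position on either side of the write; a std-only sketch (the name `write_tracked` is illustrative):

```rust
// Sketch of the spill accounting fix: compute bytes written from the
// stream position before and after the write, instead of returning 0.
use std::io::{Cursor, Seek, Write};

fn write_tracked<W: Write + Seek>(w: &mut W, data: &[u8]) -> std::io::Result<u64> {
    let start = w.stream_position()?;
    w.write_all(data)?;
    Ok(w.stream_position()? - start)
}

fn main() -> std::io::Result<()> {
    let mut cur = Cursor::new(Vec::new());
    let n = write_tracked(&mut cur, b"hello")?;
    assert_eq!(n, 5); // actual bytes, so memory accounting stays correct
    println!("tracked {} bytes", n);
    Ok(())
}
```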
Successfully merging this pull request may close these issues.

Optimize native shuffle to write schema once per output partition instead of once per batch
