24 commits
ae0741b
feat: add standalone shuffle benchmark binary for profiling
andygrove Mar 21, 2026
9b5b305
feat: add --limit option to shuffle benchmark (default 1M rows)
andygrove Mar 21, 2026
e1ab490
perf: apply limit during parquet read to avoid scanning all files
andygrove Mar 21, 2026
b7682f4
feat: move shuffle_bench binary into shuffle crate
andygrove Mar 23, 2026
ca36cbd
chore: add comment explaining parquet/rand deps in shuffle crate
andygrove Mar 23, 2026
7225afd
Merge remote-tracking branch 'apache/main' into shuffle-bench-binary
andygrove Mar 26, 2026
6e8bed2
perf: add max_buffered_batches config and stream shuffle bench from p…
andygrove Mar 26, 2026
16ce30f
merge apache/main, remove max_buffered_batches changes
andygrove Mar 27, 2026
2ef57e7
cargo fmt
andygrove Mar 27, 2026
9136e10
prettier
andygrove Mar 27, 2026
7e16819
machete
andygrove Mar 27, 2026
22fe804
feat: add --concurrent-tasks flag to shuffle benchmark
andygrove Mar 28, 2026
a82b369
feat: add IPC stream shuffle writer as alternative to block format
andygrove Mar 29, 2026
9aeaea7
fix: keep IPC spill stream open across spill calls
andygrove Mar 29, 2026
31b0861
feat: update shuffle reader to handle IPC stream format
andygrove Mar 29, 2026
7819d1a
refactor: stream IPC batches lazily instead of materializing all
andygrove Mar 29, 2026
aa911fd
style: apply formatting
andygrove Mar 29, 2026
01e081a
chore: remove benchmark scripts
andygrove Mar 29, 2026
e39fd1f
fix: update NativeBatchDecoderIterator for IPC stream format
andygrove Mar 29, 2026
1c2d189
fix: get column count from outputAttributes for native shuffle
andygrove Mar 29, 2026
16fd5a6
fix: return EOF for empty IPC streams in decodeShuffleBlock
andygrove Mar 29, 2026
4057759
test: add IPC stream format regression tests
andygrove Mar 29, 2026
9f89f61
perf: skip validation on IPC stream reader, fix spill bytes accounting
andygrove Mar 29, 2026
d6dc301
style: apply formatting
andygrove Mar 29, 2026
12 changes: 12 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -430,6 +430,18 @@ object CometConf extends ShimCometConf {
"The maximum number of columns to hash for round robin partitioning must be non-negative.")
.createWithDefault(0)

val COMET_EXEC_SHUFFLE_FORMAT: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.format")
.category(CATEGORY_SHUFFLE)
.doc(
"The format used for writing shuffle data. 'block' uses a custom block format with " +
"per-batch headers and supports all compression codecs (lz4, zstd, snappy). " +
"'ipc_stream' uses standard Arrow IPC stream format with the schema written once " +
"and supports lz4 and zstd compression (snappy is not supported).")
.stringConf
.checkValues(Set("block", "ipc_stream"))
.createWithDefault("block")

val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
.category(CATEGORY_SHUFFLE)
67 changes: 65 additions & 2 deletions docs/source/contributor-guide/native_shuffle.md
@@ -88,7 +88,8 @@ Native shuffle (`CometExchange`) is selected when all of the following condition
┌───────────────────────────────────┐
│ ShuffleBlockWriter (block format) │
│ IpcStreamWriter (stream format) │
│ (Arrow IPC + compression) │
└───────────────────────────────────┘
@@ -215,6 +216,64 @@ The `MultiPartitionShuffleRepartitioner` manages:
- `SpillFile`: Temporary file for spilled data
- Memory tracking via `MemoryConsumer` trait

## Shuffle Format

Native shuffle supports two data formats, configured via `spark.comet.exec.shuffle.format`:

### Block Format (default)

Each batch is written as a self-contained block:

```
[8 bytes: block length] [8 bytes: field count] [4 bytes: codec] [compressed Arrow IPC stream]
```

- The Arrow IPC stream inside each block contains the schema, one batch, and an EOS marker.
- Compression wraps the entire IPC stream (schema + batch).
- Supports all codecs: lz4, zstd, snappy.
- Reader parses the length-prefixed blocks sequentially.

This format is implemented by `ShuffleBlockWriter` in `native/shuffle/src/writers/shuffle_block_writer.rs`.

### IPC Stream Format

Each partition's data is written as a standard Arrow IPC stream:

```
[schema message] [batch message 1] [batch message 2] ... [EOS marker]
```

- The schema is written once per partition (not per batch), reducing overhead.
- Uses Arrow's built-in IPC body compression (per-buffer compression within each message).
- Supports lz4 and zstd codecs. Snappy is not supported (not part of the Arrow IPC spec).
- Standard format readable by any Arrow-compatible tool.
- Small batches are coalesced before writing to reduce per-message IPC overhead.

This format is implemented by `IpcStreamWriter` in `native/shuffle/src/writers/ipc_stream_writer.rs`.

### Spill Behavior

Both formats use the same spill strategy: when memory pressure triggers a spill, partitioned
data is written to per-partition temporary files. During the final `shuffle_write`:

- **Block format**: Spill file bytes are raw-copied to the output, then remaining in-memory
batches are written as additional blocks.
- **IPC stream format**: The `IpcStreamWriter` is kept open across spill calls so that all
spilled data for a partition forms a single IPC stream. The stream is finished and raw-copied
to the output, then remaining in-memory batches are written as a second IPC stream.

### Performance Comparison

Benchmark results (200 hash partitions, LZ4, TPC-H SF100 lineitem):

| Metric     | Block        | IPC Stream   |
| ---------- | ------------ | ------------ |
| Throughput | 2.40M rows/s | 2.37M rows/s |
| Output     | 61 MiB       | 64 MiB       |

For single-partition writes, IPC stream is ~2x faster since the schema is written only once
instead of per batch.

## Compression

Native shuffle supports multiple compression codecs configured via
@@ -230,13 +289,17 @@ Native shuffle supports multiple compression codecs configured via
The compression codec is applied uniformly to all partitions. Each partition's data is
independently compressed, allowing parallel decompression during reads.

Note: The `snappy` codec is only available with block format. IPC stream format supports
`lz4` and `zstd` only.

## Configuration

| Config | Default | Description |
| ------------------------------------------------- | ------- | ---------------------------------------- |
| `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle |
| `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` |
| `spark.comet.exec.shuffle.format` | `block` | Data format: `block` or `ipc_stream` |
| `spark.comet.exec.shuffle.compression.codec` | `lz4` | Compression codec |
| `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level |
| `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size |
| `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch |