diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index bfe90181ff..b15b6a3339 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -430,6 +430,18 @@ object CometConf extends ShimCometConf { "The maximum number of columns to hash for round robin partitioning must be non-negative.") .createWithDefault(0) + val COMET_EXEC_SHUFFLE_FORMAT: ConfigEntry[String] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.format") + .category(CATEGORY_SHUFFLE) + .doc( + "The format used for writing shuffle data. 'block' uses a custom block format with " + + "per-batch headers and supports all compression codecs (lz4, zstd, snappy). " + + "'ipc_stream' uses standard Arrow IPC stream format with the schema written once " + + "and supports lz4 and zstd compression (snappy is not supported).") + .stringConf + .checkValues(Set("block", "ipc_stream")) + .createWithDefault("block") + val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec") .category(CATEGORY_SHUFFLE) diff --git a/docs/source/contributor-guide/native_shuffle.md b/docs/source/contributor-guide/native_shuffle.md index 18e80a90c8..93e63e6235 100644 --- a/docs/source/contributor-guide/native_shuffle.md +++ b/docs/source/contributor-guide/native_shuffle.md @@ -88,7 +88,8 @@ Native shuffle (`CometExchange`) is selected when all of the following condition │ ▼ ┌───────────────────────────────────┐ -│ ShuffleBlockWriter │ +│ ShuffleBlockWriter (block format) │ +│ IpcStreamWriter (stream format) │ │ (Arrow IPC + compression) │ └───────────────────────────────────┘ │ @@ -215,6 +216,64 @@ The `MultiPartitionShuffleRepartitioner` manages: - `SpillFile`: Temporary file for spilled data - Memory tracking via `MemoryConsumer` trait +## Shuffle Format + +Native shuffle supports two data formats, configured via 
`spark.comet.exec.shuffle.format`: + +### Block Format (default) + +Each batch is written as a self-contained block: + +``` +[8 bytes: block length] [8 bytes: field count] [4 bytes: codec] [compressed Arrow IPC stream] +``` + +- The Arrow IPC stream inside each block contains the schema, one batch, and an EOS marker. +- Compression wraps the entire IPC stream (schema + batch). +- Supports all codecs: lz4, zstd, snappy. +- Reader parses the length-prefixed blocks sequentially. + +This format is implemented by `ShuffleBlockWriter` in `native/shuffle/src/writers/shuffle_block_writer.rs`. + +### IPC Stream Format + +Each partition's data is written as a standard Arrow IPC stream: + +``` +[schema message] [batch message 1] [batch message 2] ... [EOS marker] +``` + +- The schema is written once per partition (not per batch), reducing overhead. +- Uses Arrow's built-in IPC body compression (per-buffer compression within each message). +- Supports lz4 and zstd codecs. Snappy is not supported (not part of the Arrow IPC spec). +- Standard format readable by any Arrow-compatible tool. +- Small batches are coalesced before writing to reduce per-message IPC overhead. + +This format is implemented by `IpcStreamWriter` in `native/shuffle/src/writers/ipc_stream_writer.rs`. + +### Spill Behavior + +Both formats use the same spill strategy: when memory pressure triggers a spill, partitioned +data is written to per-partition temporary files. During the final `shuffle_write`: + +- **Block format**: Spill file bytes are raw-copied to the output, then remaining in-memory + batches are written as additional blocks. +- **IPC stream format**: The `IpcStreamWriter` is kept open across spill calls so that all + spilled data for a partition forms a single IPC stream. The stream is finished and raw-copied + to the output, then remaining in-memory batches are written as a second IPC stream. 
+ +### Performance Comparison + +Benchmark results (200 hash partitions, LZ4, TPC-H SF100 lineitem): + +| Metric | Block | IPC Stream | +| ---------- | ----------- | ----------- | +| Throughput | 2.40M row/s | 2.37M row/s | +| Output | 61 MiB | 64 MiB | + +For single-partition writes, IPC stream is ~2x faster since the schema is written only once +instead of per batch. + ## Compression Native shuffle supports multiple compression codecs configured via @@ -230,13 +289,17 @@ Native shuffle supports multiple compression codecs configured via The compression codec is applied uniformly to all partitions. Each partition's data is independently compressed, allowing parallel decompression during reads. +Note: The `snappy` codec is only available with block format. IPC stream format supports +`lz4` and `zstd` only. + ## Configuration | Config | Default | Description | | ------------------------------------------------- | ------- | ---------------------------------------- | | `spark.comet.exec.shuffle.enabled` | `true` | Enable Comet shuffle | | `spark.comet.exec.shuffle.mode` | `auto` | Shuffle mode: `native`, `jvm`, or `auto` | -| `spark.comet.exec.shuffle.compression.codec` | `zstd` | Compression codec | +| `spark.comet.exec.shuffle.format` | `block` | Data format: `block` or `ipc_stream` | +| `spark.comet.exec.shuffle.compression.codec` | `lz4` | Compression codec | | `spark.comet.exec.shuffle.compression.zstd.level` | `1` | Zstd compression level | | `spark.comet.shuffle.write.buffer.size` | `1MB` | Write buffer size | | `spark.comet.columnar.shuffle.batch.size` | `8192` | Target rows per batch | diff --git a/native/Cargo.lock b/native/Cargo.lock index 598b18d58c..7e16f06fdf 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -96,11 +96,55 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "1.0.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" -version = "1.0.13" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] [[package]] name = "anyhow" @@ -135,9 +179,9 @@ dependencies = [ [[package]] name = "arc-swap" -version = "1.8.2" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9f3647c145568cec02c42054e07bdf9a5a698e15b466fb2341bfc393cd24aa5" +checksum = "a07d1f37ff60921c83bdfc7407723bdefe89b44b98a9b772f225c8f9d67141a6" dependencies = [ "rustversion", ] @@ -289,6 +333,7 @@ dependencies = [ "arrow-select", "flatbuffers", "lz4_flex 0.12.1", + "zstd", ] [[package]] @@ -600,9 +645,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.16.1" +version = "1.16.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bffc006df10ac2a68c83692d734a465f8ee6c5b384d8545a636f81d858f4bf" +checksum = "a054912289d18629dc78375ba2c3726a3afe3ff71b4edba9dedfca0e3446d1fc" dependencies = [ "aws-lc-sys", "zeroize", @@ -610,9 +655,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.38.0" +version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4321e568ed89bb5a7d291a7f37997c2c0df89809d7b6d12062c81ddb54aa782e" +checksum = "1fa7e52a4c5c547c741610a2c6f123f3881e409b714cd27e6798ef020c514f0a" dependencies = [ "cc", "cmake", @@ -647,9 +692,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.96.0" +version = "1.97.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f64a6eded248c6b453966e915d32aeddb48ea63ad17932682774eb026fbef5b1" +checksum = "9aadc669e184501caaa6beafb28c6267fc1baef0810fb58f9b205485ca3f2567" dependencies = [ "aws-credential-types", "aws-runtime", @@ -671,9 +716,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.98.0" +version = "1.99.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db96d720d3c622fcbe08bae1c4b04a72ce6257d8b0584cb5418da00ae20a344f" +checksum = "1342a7db8f358d3de0aed2007a0b54e875458e39848d54cc1d46700b2bfcb0a8" dependencies = [ "aws-credential-types", "aws-runtime", @@ -695,9 +740,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.100.0" +version = "1.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fafbdda43b93f57f699c5dfe8328db590b967b8a820a13ccdd6687355dfcc7ca" +checksum = "ab41ad64e4051ecabeea802d6a17845a91e83287e1dd249e6963ea1ba78c428a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -868,9 +913,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.6" +version = "1.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"d2b1117b3b2bbe166d11199b540ceed0d0f7676e36e7b962b5a437a9971eac75" +checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" dependencies = [ "base64-simd", "bytes", @@ -1100,9 +1145,9 @@ dependencies = [ [[package]] name = "bon" -version = "3.9.0" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d13a61f2963b88eef9c1be03df65d42f6996dfeac1054870d950fcf66686f83" +checksum = "f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" dependencies = [ "bon-macros", "rustversion", @@ -1110,9 +1155,9 @@ dependencies = [ [[package]] name = "bon-macros" -version = "3.9.0" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d314cc62af2b6b0c65780555abb4d02a03dd3b799cd42419044f0c38d99738c0" +checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" dependencies = [ "darling 0.23.0", "ident_case", @@ -1326,38 +1371,59 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] name = "clap_builder" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] name = "clap_lex" -version 
= "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "cmake" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "combine" version = "4.6.7" @@ -1617,16 +1683,6 @@ dependencies = [ "darling_macro 0.20.11", ] -[[package]] -name = "darling" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" -dependencies = [ - "darling_core 0.21.3", - "darling_macro 0.21.3", -] - [[package]] name = "darling" version = "0.23.0" @@ -1651,20 +1707,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "darling_core" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim", - "syn 2.0.117", -] - [[package]] name = "darling_core" version = "0.23.0" @@ -1689,17 +1731,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "darling_macro" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" -dependencies = [ - "darling_core 0.21.3", - "quote", - "syn 2.0.117", -] - [[package]] name = "darling_macro" version = 
"0.23.0" @@ -1953,6 +1984,7 @@ dependencies = [ "arrow", "async-trait", "bytes", + "clap", "crc32fast", "criterion", "datafusion", @@ -1964,6 +1996,7 @@ dependencies = [ "jni", "log", "lz4_flex 0.13.0", + "parquet", "simd-adler32", "snap", "tempfile", @@ -2675,9 +2708,9 @@ dependencies = [ [[package]] name = "dissimilar" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8975ffdaa0ef3661bfe02dbdcc06c9f829dfafe6a3c474de366a8d5e44276921" +checksum = "aeda16ab4059c5fd2a83f2b9c9e9c981327b18aa8e3b313f7e6563799d4f093e" [[package]] name = "dlv-list" @@ -3612,9 +3645,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "iri-string" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +checksum = "d8e7418f59cc01c88316161279a7f665217ae316b388e58a0d10e29f54f1e5eb" dependencies = [ "memchr", "serde", @@ -3631,6 +3664,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.13.0" @@ -3651,9 +3690,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "java-locator" @@ -3715,7 +3754,7 @@ dependencies = [ "cfg-if", "combine", "java-locator", - "jni-sys", + "jni-sys 0.3.1", "libloading 0.7.4", "log", "thiserror 1.0.69", @@ -3725,9 +3764,31 @@ dependencies = [ [[package]] name = "jni-sys" -version = "0.3.0" +version = "0.3.1" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" +checksum = "41a652e1f9b6e0275df1f15b32661cf0d4b78d4d87ddec5e0c3c20f097433258" +dependencies = [ + "jni-sys 0.4.1", +] + +[[package]] +name = "jni-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6377a88cb3910bee9b0fa88d4f42e1d2da8e79915598f65fb0c7ee14c878af2" +dependencies = [ + "jni-sys-macros", +] + +[[package]] +name = "jni-sys-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38c0b942f458fe50cdac086d2f946512305e5631e720728f2a61aabcd47a6264" +dependencies = [ + "quote", + "syn 2.0.117", +] [[package]] name = "jobserver" @@ -4067,9 +4128,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", "wasi", @@ -4084,9 +4145,9 @@ checksum = "dce6dd36094cac388f119d2e9dc82dc730ef91c32a6222170d630e5414b956e6" [[package]] name = "moka" -version = "0.12.14" +version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85f8024e1c8e71c778968af91d43700ce1d11b219d127d79fb2934153b82b42b" +checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" dependencies = [ "async-lock", "crossbeam-channel", @@ -4187,9 +4248,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" [[package]] name = "num-format" @@ -4311,6 +4372,12 @@ version = "1.21.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" @@ -4715,9 +4782,9 @@ checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" [[package]] name = "portable-atomic-util" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5" +checksum = "091397be61a01d4be58e7841595bd4bfedb15f1cd54977d79b8271e94ed799a3" dependencies = [ "portable-atomic", ] @@ -5595,9 +5662,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.17.0" +version = "3.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9" +checksum = "dd5414fad8e6907dbdd5bc441a50ae8d6e26151a03b1de04d89a5576de61d01f" dependencies = [ "base64", "chrono", @@ -5614,11 +5681,11 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.17.0" +version = "3.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0" +checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65" dependencies = [ - "darling 0.21.3", + "darling 0.23.0", "proc-macro2", "quote", "syn 2.0.117", @@ -5836,9 +5903,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "symbolic-common" -version = "12.17.2" +version = "12.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751a2823d606b5d0a7616499e4130a516ebd01a44f39811be2b9600936509c23" +checksum = 
"52ca086c1eb5c7ee74b151ba83c6487d5d33f8c08ad991b86f3f58f6629e68d5" dependencies = [ "debugid", "memmap2", @@ -5848,9 +5915,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "12.17.2" +version = "12.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b237cfbe320601dd24b4ac817a5b68bb28f5508e33f08d42be0682cadc8ac9" +checksum = "baa911a28a62823aaf2cc2e074212492a3ee69d0d926cc8f5b12b4a108ff5c0c" dependencies = [ "cpp_demangle", "rustc-demangle", @@ -6072,9 +6139,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" dependencies = [ "tinyvec_macros", ] @@ -6300,9 +6367,9 @@ checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-segmentation" -version = "1.12.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" [[package]] name = "unicode-width" @@ -6361,6 +6428,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.23.0" @@ -7027,18 +7100,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.42" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" 
+checksum = "efbb2a062be311f2ba113ce66f697a4dc589f85e78a4aea276200804cea0ed87" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.42" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" +checksum = "0e8bc7269b54418e7aeeef514aa68f8690b8c0489a06b0136e5f57c4c5ccab89" dependencies = [ "proc-macro2", "quote", diff --git a/native/Cargo.toml b/native/Cargo.toml index c626743be1..3fb087e443 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -34,7 +34,7 @@ edition = "2021" rust-version = "1.88" [workspace.dependencies] -arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] } +arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz", "ipc_compression"] } async-trait = { version = "0.1" } bytes = { version = "1.11.1" } parquet = { version = "57.3.0", default-features = false, features = ["experimental"] } diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index e0a395ebbf..b643a9d3a4 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -893,8 +893,23 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_decodeShuffleBlock( let raw_pointer = env.get_direct_buffer_address(&byte_buffer)?; let length = length as usize; let slice: &[u8] = unsafe { std::slice::from_raw_parts(raw_pointer, length) }; - let batch = read_ipc_compressed(slice)?; - prepare_output(&mut env, array_addrs, schema_addrs, batch, false) + + // Auto-detect format: IPC stream starts with Arrow continuation 0xFFFFFFFF + let is_ipc_stream = length >= 4 && slice[0..4] == [0xFF, 0xFF, 0xFF, 0xFF]; + if is_ipc_stream { + use arrow::ipc::reader::StreamReader; + let mut reader = StreamReader::try_new(slice, None)?; + match reader.next() { + Some(Ok(batch)) => { + prepare_output(&mut env, array_addrs, schema_addrs, batch, false) + } + 
Some(Err(e)) => Err(e.into()), + None => Ok(-1), // empty stream = EOF + } + } else { + let batch = read_ipc_compressed(slice)?; + prepare_output(&mut env, array_addrs, schema_addrs, batch, false) + } }) }) } diff --git a/native/core/src/execution/operators/shuffle_scan.rs b/native/core/src/execution/operators/shuffle_scan.rs index a1ad52310c..c6ffee5251 100644 --- a/native/core/src/execution/operators/shuffle_scan.rs +++ b/native/core/src/execution/operators/shuffle_scan.rs @@ -24,6 +24,7 @@ use crate::{ }; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::ipc::reader::StreamReader; use datafusion::common::{arrow_datafusion_err, Result as DataFusionResult}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::metrics::{ @@ -45,6 +46,9 @@ use std::{ use super::scan::InputBatch; +/// A StreamReader over owned bytes, used for streaming IPC batches one at a time. +type IpcBatchReader = StreamReader>>; + /// ShuffleScanExec reads compressed shuffle blocks from JVM via JNI and decodes them natively. /// Unlike ScanExec which receives Arrow arrays via FFI, ShuffleScanExec receives raw compressed /// bytes from CometShuffleBlockIterator and decodes them using read_ipc_compressed(). @@ -60,6 +64,9 @@ pub struct ShuffleScanExec { pub schema: SchemaRef, /// The current input batch, populated by get_next_batch() before poll_next(). pub batch: Arc>>, + /// Active IPC stream reader for streaming batches one at a time. + /// Wrapped in Arc so ShuffleScanExec can derive Clone. + ipc_stream_reader: Arc>>, /// Cache of plan properties. cache: PlanProperties, /// Metrics collector. 
@@ -94,6 +101,7 @@ impl ShuffleScanExec { input_source, data_types, batch: Arc::new(Mutex::new(None)), + ipc_stream_reader: Arc::new(Mutex::new(None)), cache, metrics: metrics_set, baseline_metrics, @@ -111,33 +119,75 @@ impl ShuffleScanExec { /// because JNI calls cannot happen from within poll_next on tokio threads. pub fn get_next_batch(&mut self) -> Result<(), CometError> { if self.input_source.is_none() { - // Unit test mode - no JNI calls needed. return Ok(()); } let mut timer = self.baseline_metrics.elapsed_compute().timer(); let mut current_batch = self.batch.try_lock().unwrap(); if current_batch.is_none() { - let next_batch = Self::get_next( - self.exec_context_id, - self.input_source.as_ref().unwrap().as_obj(), - &self.data_types, - &self.decode_time, - )?; - *current_batch = Some(next_batch); + // Try to read the next batch from an active IPC stream reader + let batch_from_reader = { + let mut reader_guard = self.ipc_stream_reader.try_lock().unwrap(); + if let Some(reader) = reader_guard.as_mut() { + match reader.next() { + Some(Ok(batch)) => Some(batch), + _ => { + // Stream exhausted or error — drop the reader + *reader_guard = None; + None + } + } + } else { + None + } + }; + + if let Some(batch) = batch_from_reader { + *current_batch = Some(Self::batch_to_input(&batch, &self.data_types)); + } else { + let next_batch = Self::get_next( + self.exec_context_id, + self.input_source.as_ref().unwrap().as_obj(), + &self.data_types, + &self.decode_time, + &self.ipc_stream_reader, + )?; + *current_batch = Some(next_batch); + } } timer.stop(); - Ok(()) } - /// Invokes JNI calls to get the next compressed shuffle block and decode it. 
+ fn batch_to_input(batch: &arrow::array::RecordBatch, data_types: &[DataType]) -> InputBatch { + let num_rows = batch.num_rows(); + let columns: Vec = batch + .columns() + .iter() + .map(|col| unpack_dictionary(col)) + .collect(); + + debug_assert_eq!( + columns.len(), + data_types.len(), + "Shuffle block column count mismatch: got {} but expected {}", + columns.len(), + data_types.len() + ); + + InputBatch::new(columns, Some(num_rows)) + } + + /// Invokes JNI calls to get the next shuffle block and decode it. + /// For IPC stream format, creates a `StreamReader` that yields batches + /// one at a time (stored in `ipc_stream_reader` for subsequent calls). fn get_next( exec_context_id: i64, iter: &JObject, data_types: &[DataType], decode_time: &Time, + ipc_stream_reader: &Arc>>, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { return Ok(InputBatch::EOF); @@ -151,7 +201,6 @@ impl ShuffleScanExec { let mut env = JVMClasses::get_env()?; - // has_next() reads the next block and returns its length, or -1 if EOF let block_length: i32 = unsafe { jni_call!(&mut env, comet_shuffle_block_iterator(iter).has_next() -> i32)? @@ -161,7 +210,6 @@ impl ShuffleScanExec { return Ok(InputBatch::EOF); } - // Get the DirectByteBuffer containing the compressed shuffle block let buffer: JObject = unsafe { jni_call!(&mut env, comet_shuffle_block_iterator(iter).get_buffer() -> JObject)? @@ -172,32 +220,38 @@ impl ShuffleScanExec { let length = block_length as usize; let slice: &[u8] = unsafe { std::slice::from_raw_parts(raw_pointer, length) }; - // Decode the compressed IPC data + // Detect format: block starts with codec prefix, IPC stream starts with 0xFFFFFFFF let mut timer = decode_time.timer(); - let batch = read_ipc_compressed(slice)?; + let is_ipc_stream = length >= 4 && slice[0..4] == [0xFF, 0xFF, 0xFF, 0xFF]; + + let batch = if is_ipc_stream { + // Copy bytes into owned memory and create a StreamReader that + // yields batches one at a time (no bulk materialization). 
+ let owned = slice.to_vec(); + let cursor = std::io::Cursor::new(owned); + let mut reader = + unsafe { StreamReader::try_new(cursor, None)?.with_skip_validation(true) }; + let first = match reader.next() { + Some(Ok(batch)) => batch, + Some(Err(e)) => { + timer.stop(); + return Err(e.into()); + } + None => { + timer.stop(); + return Ok(InputBatch::EOF); + } + }; + // Store the reader so subsequent calls can pull more batches + // without another JNI round-trip. + *ipc_stream_reader.try_lock().unwrap() = Some(reader); + first + } else { + read_ipc_compressed(slice)? + }; timer.stop(); - let num_rows = batch.num_rows(); - - // Extract column arrays, unpacking any dictionary-encoded columns. - // Native shuffle may dictionary-encode string/binary columns for efficiency, - // but downstream DataFusion operators expect the value types declared in the - // schema (e.g. Utf8, not Dictionary). - let columns: Vec = batch - .columns() - .iter() - .map(|col| unpack_dictionary(col)) - .collect(); - - debug_assert_eq!( - columns.len(), - data_types.len(), - "Shuffle block column count mismatch: got {} but expected {}", - columns.len(), - data_types.len() - ); - - Ok(InputBatch::new(columns, Some(num_rows))) + Ok(Self::batch_to_input(&batch, data_types)) } } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 5af31fcc22..b5fe84cb4e 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -73,7 +73,7 @@ use datafusion_comet_spark_expr::{ use iceberg::expr::Bind; use crate::execution::operators::ExecutionError::GeneralError; -use crate::execution::shuffle::{CometPartitioning, CompressionCodec}; +use crate::execution::shuffle::{CometPartitioning, CompressionCodec, ShuffleFormat}; use crate::execution::spark_plan::SparkPlan; use crate::parquet::parquet_support::prepare_object_store_with_configs; use datafusion::common::scalar::ScalarStructBuilder; @@ -116,7 +116,8 @@ use datafusion_comet_proto::{ 
spark_operator::{ self, lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct, upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, - CompressionCodec as SparkCompressionCodec, JoinType, Operator, WindowFrameType, + CompressionCodec as SparkCompressionCodec, JoinType, Operator, + ShuffleWriterFormat as SparkShuffleWriterFormat, WindowFrameType, }, spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; @@ -1351,11 +1352,17 @@ impl PhysicalPlanner { ))), }?; + let format = match writer.format.try_into() { + Ok(SparkShuffleWriterFormat::IpcStream) => ShuffleFormat::IpcStream, + _ => ShuffleFormat::Block, + }; + let write_buffer_size = writer.write_buffer_size as usize; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, codec, + format, writer.output_data_file.clone(), writer.output_index_file.clone(), writer.tracing_enabled, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 344b9f0f21..1dc14b82d4 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -284,6 +284,11 @@ enum CompressionCodec { Snappy = 3; } +enum ShuffleWriterFormat { + BLOCK = 0; + IPC_STREAM = 1; +} + message ShuffleWriter { spark.spark_partitioning.Partitioning partitioning = 1; string output_data_file = 3; @@ -294,6 +299,8 @@ message ShuffleWriter { // Size of the write buffer in bytes used when writing shuffle data to disk. // Larger values may improve write performance but use more memory. int32 write_buffer_size = 8; + // The shuffle data format to use (Block or IpcStream). 
+ ShuffleWriterFormat format = 9; } message ParquetWriter { diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml index 94ed5f30a5..a5982c05fa 100644 --- a/native/shuffle/Cargo.toml +++ b/native/shuffle/Cargo.toml @@ -32,6 +32,7 @@ publish = false arrow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +clap = { version = "4", features = ["derive"], optional = true } crc32fast = "1.3.2" datafusion = { workspace = true } datafusion-comet-common = { workspace = true } @@ -42,6 +43,8 @@ itertools = "0.14.0" jni = "0.21" log = "0.4" lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] } +# parquet is only used by the shuffle_bench binary (shuffle-bench feature) +parquet = { workspace = true, optional = true } simd-adler32 = "0.3.9" snap = "1.1" tokio = { version = "1", features = ["rt-multi-thread"] } @@ -53,10 +56,18 @@ datafusion = { workspace = true, features = ["parquet_encryption", "sql"] } itertools = "0.14.0" tempfile = "3.26.0" +[features] +shuffle-bench = ["clap", "parquet"] + [lib] name = "datafusion_comet_shuffle" path = "src/lib.rs" +[[bin]] +name = "shuffle_bench" +path = "src/bin/shuffle_bench.rs" +required-features = ["shuffle-bench"] + [[bench]] name = "shuffle_writer" harness = false diff --git a/native/shuffle/README.md b/native/shuffle/README.md index 8fba6b0323..74b8dbe656 100644 --- a/native/shuffle/README.md +++ b/native/shuffle/README.md @@ -23,3 +23,46 @@ This crate provides the shuffle writer and reader implementation for Apache Data of the [Apache DataFusion Comet] subproject. [Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/ + +## Shuffle Benchmark Tool + +A standalone benchmark binary (`shuffle_bench`) is included for profiling shuffle write and read +performance outside of Spark. It streams input data directly from Parquet files. 
+ +### Basic usage + +```sh +cargo run --release --features shuffle-bench --bin shuffle_bench -- \ + --input /data/tpch-sf100/lineitem/ \ + --partitions 200 \ + --codec zstd --zstd-level 1 \ + --hash-columns 0,3 +``` + +### Options + +| Option | Default | Description | +| ------------------------ | -------------------------- | ------------------------------------------------------------ | +| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files | +| `--partitions` | `200` | Number of output shuffle partitions | +| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | +| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. `0,3`) | +| `--codec` | `zstd` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | +| `--zstd-level` | `1` | Zstd compression level (1–22) | +| `--batch-size` | `8192` | Batch size for reading Parquet data | +| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | +| `--max-buffered-batches` | `0` | Max batches to buffer before spilling (0 = memory-pool-only) | +| `--write-buffer-size` | `1048576` | Write buffer size in bytes | +| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) | +| `--iterations` | `1` | Number of timed iterations | +| `--warmup` | `0` | Number of warmup iterations before timing | +| `--read-back` | `false` | Also benchmark reading back the shuffle output | +| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files | + +### Profiling with flamegraph + +```sh +cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \ + --input /data/tpch-sf100/lineitem/ \ + --partitions 200 --codec zstd --zstd-level 1 +``` diff --git a/native/shuffle/benches/shuffle_writer.rs b/native/shuffle/benches/shuffle_writer.rs index 27abd919fa..612a7084bc 100644 --- a/native/shuffle/benches/shuffle_writer.rs +++ b/native/shuffle/benches/shuffle_writer.rs @@ -149,6 +149,7 @@ 
fn create_shuffle_writer_exec( ))), partitioning, compression_codec, + datafusion_comet_shuffle::ShuffleFormat::Block, "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), false, diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs new file mode 100644 index 0000000000..d198ce2fd6 --- /dev/null +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -0,0 +1,657 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Standalone shuffle benchmark tool for profiling Comet shuffle write and read +//! outside of Spark. Streams input directly from Parquet files. +//! +//! # Usage +//! +//! ```sh +//! cargo run --release --features shuffle-bench --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 \ +//! --codec zstd --zstd-level 1 \ +//! --hash-columns 0,3 \ +//! --read-back +//! ``` +//! +//! Profile with flamegraph: +//! ```sh +//! cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 --codec zstd --zstd-level 1 +//! 
``` + +use arrow::datatypes::{DataType, SchemaRef}; +use clap::Parser; +use datafusion::execution::config::SessionConfig; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::common::collect; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_comet_shuffle::{ + read_ipc_compressed, CometPartitioning, CompressionCodec, ShuffleWriterExec, +}; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +#[derive(Parser, Debug)] +#[command( + name = "shuffle_bench", + about = "Standalone benchmark for Comet shuffle write and read performance" +)] +struct Args { + /// Path to input Parquet file or directory of Parquet files + #[arg(long)] + input: PathBuf, + + /// Batch size for reading Parquet data + #[arg(long, default_value_t = 8192)] + batch_size: usize, + + /// Number of output shuffle partitions + #[arg(long, default_value_t = 200)] + partitions: usize, + + /// Partitioning scheme: hash, single, round-robin + #[arg(long, default_value = "hash")] + partitioning: String, + + /// Column indices to hash on (comma-separated, e.g. 
"0,3") + #[arg(long, default_value = "0")] + hash_columns: String, + + /// Compression codec: none, lz4, zstd, snappy + #[arg(long, default_value = "zstd")] + codec: String, + + /// Zstd compression level (1-22) + #[arg(long, default_value_t = 1)] + zstd_level: i32, + + /// Memory limit in bytes (triggers spilling when exceeded) + #[arg(long)] + memory_limit: Option, + + /// Also benchmark reading back the shuffle output + #[arg(long, default_value_t = false)] + read_back: bool, + + /// Number of iterations to run + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Number of warmup iterations before timing + #[arg(long, default_value_t = 0)] + warmup: usize, + + /// Output directory for shuffle data/index files + #[arg(long, default_value = "/tmp/comet_shuffle_bench")] + output_dir: PathBuf, + + /// Write buffer size in bytes + #[arg(long, default_value_t = 1048576)] + write_buffer_size: usize, + + /// Limit rows processed per iteration (0 = no limit) + #[arg(long, default_value_t = 0)] + limit: usize, + + /// Number of concurrent shuffle tasks to simulate executor parallelism. + /// Each task reads the same input and writes to its own output files. 
+ #[arg(long, default_value_t = 1)] + concurrent_tasks: usize, + + /// Shuffle format: block or ipc_stream + #[arg(long, default_value = "block")] + format: String, +} + +fn main() { + let args = Args::parse(); + + // Create output directory + fs::create_dir_all(&args.output_dir).expect("Failed to create output directory"); + let data_file = args.output_dir.join("data.out"); + let index_file = args.output_dir.join("index.out"); + + let (schema, total_rows) = read_parquet_metadata(&args.input, args.limit); + + let codec = parse_codec(&args.codec, args.zstd_level); + let format = parse_format(&args.format); + let hash_col_indices = parse_hash_columns(&args.hash_columns); + + println!("=== Shuffle Benchmark ==="); + println!("Input: {}", args.input.display()); + println!( + "Schema: {} columns ({})", + schema.fields().len(), + describe_schema(&schema) + ); + println!("Total rows: {}", format_number(total_rows as usize)); + println!("Batch size: {}", format_number(args.batch_size)); + println!("Partitioning: {}", args.partitioning); + println!("Partitions: {}", args.partitions); + println!("Codec: {:?}", codec); + println!("Format: {:?}", format); + println!("Hash columns: {:?}", hash_col_indices); + if let Some(mem_limit) = args.memory_limit { + println!("Memory limit: {}", format_bytes(mem_limit)); + } + if args.concurrent_tasks > 1 { + println!("Concurrent: {} tasks", args.concurrent_tasks); + } + println!( + "Iterations: {} (warmup: {})", + args.iterations, args.warmup + ); + println!(); + + let total_iters = args.warmup + args.iterations; + let mut write_times = Vec::with_capacity(args.iterations); + let mut read_times = Vec::with_capacity(args.iterations); + let mut data_file_sizes = Vec::with_capacity(args.iterations); + + for i in 0..total_iters { + let is_warmup = i < args.warmup; + let label = if is_warmup { + format!("warmup {}/{}", i + 1, args.warmup) + } else { + format!("iter {}/{}", i - args.warmup + 1, args.iterations) + }; + + let write_elapsed = if 
args.concurrent_tasks > 1 { + run_concurrent_shuffle_writes( + &args.input, + &schema, + &codec, + &format, + &hash_col_indices, + &args, + ) + } else { + run_shuffle_write( + &args.input, + &schema, + &codec, + &format, + &hash_col_indices, + &args, + data_file.to_str().unwrap(), + index_file.to_str().unwrap(), + ) + }; + let data_size = fs::metadata(&data_file).map(|m| m.len()).unwrap_or(0); + + if !is_warmup { + write_times.push(write_elapsed); + data_file_sizes.push(data_size); + } + + print!(" [{label}] write: {:.3}s", write_elapsed); + if args.concurrent_tasks <= 1 { + print!(" output: {}", format_bytes(data_size as usize)); + } + + if args.read_back && args.concurrent_tasks <= 1 { + let read_elapsed = run_shuffle_read( + data_file.to_str().unwrap(), + index_file.to_str().unwrap(), + args.partitions, + ); + if !is_warmup { + read_times.push(read_elapsed); + } + print!(" read: {:.3}s", read_elapsed); + } + println!(); + } + + if args.iterations > 0 { + println!(); + println!("=== Results ==="); + + let avg_write = write_times.iter().sum::<f64>() / write_times.len() as f64; + let write_throughput_rows = (total_rows as f64 * args.concurrent_tasks as f64) / avg_write; + + println!("Write:"); + println!(" avg time: {:.3}s", avg_write); + if write_times.len() > 1 { + let min = write_times.iter().cloned().fold(f64::INFINITY, f64::min); + let max = write_times + .iter() + .cloned() + .fold(f64::NEG_INFINITY, f64::max); + println!(" min/max: {:.3}s / {:.3}s", min, max); + } + println!( + " throughput: {} rows/s (total across {} tasks)", + format_number(write_throughput_rows as usize), + args.concurrent_tasks + ); + if args.concurrent_tasks <= 1 { + let avg_data_size = data_file_sizes.iter().sum::<u64>() / data_file_sizes.len() as u64; + println!( + " output size: {}", + format_bytes(avg_data_size as usize) + ); + } + + if !read_times.is_empty() { + let avg_data_size = data_file_sizes.iter().sum::<u64>() / data_file_sizes.len() as u64; + let avg_read = read_times.iter().sum::<f64>() / 
read_times.len() as f64; + let read_throughput_bytes = avg_data_size as f64 / avg_read; + + println!("Read:"); + println!(" avg time: {:.3}s", avg_read); + if read_times.len() > 1 { + let min = read_times.iter().cloned().fold(f64::INFINITY, f64::min); + let max = read_times.iter().cloned().fold(f64::NEG_INFINITY, f64::max); + println!(" min/max: {:.3}s / {:.3}s", min, max); + } + println!( + " throughput: {}/s (from compressed)", + format_bytes(read_throughput_bytes as usize) + ); + } + } + + let _ = fs::remove_file(&data_file); + let _ = fs::remove_file(&index_file); +} + +/// Read schema and total row count from Parquet metadata without loading any data. +fn read_parquet_metadata(path: &PathBuf, limit: usize) -> (SchemaRef, u64) { + let paths = collect_parquet_paths(path); + let mut schema = None; + let mut total_rows = 0u64; + + for file_path in &paths { + let file = fs::File::open(file_path) + .unwrap_or_else(|e| panic!("Failed to open {}: {}", file_path.display(), e)); + let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap_or_else(|e| { + panic!( + "Failed to read Parquet metadata from {}: {}", + file_path.display(), + e + ) + }); + if schema.is_none() { + schema = Some(Arc::clone(builder.schema())); + } + total_rows += builder.metadata().file_metadata().num_rows() as u64; + if limit > 0 && total_rows >= limit as u64 { + total_rows = total_rows.min(limit as u64); + break; + } + } + + (schema.expect("No parquet files found"), total_rows) +} + +fn collect_parquet_paths(path: &PathBuf) -> Vec<PathBuf> { + if path.is_dir() { + let mut files: Vec<PathBuf> = fs::read_dir(path) + .unwrap_or_else(|e| panic!("Failed to read directory {}: {}", path.display(), e)) + .filter_map(|entry| { + let p = entry.ok()?.path(); + if p.extension().and_then(|e| e.to_str()) == Some("parquet") { + Some(p) + } else { + None + } + }) + .collect(); + files.sort(); + if files.is_empty() { + panic!("No .parquet files found in {}", path.display()); + } + files + } else { + vec![path.clone()] + 
} +} + +fn run_shuffle_write( + input_path: &PathBuf, + schema: &SchemaRef, + codec: &CompressionCodec, + format: &datafusion_comet_shuffle::ShuffleFormat, + hash_col_indices: &[usize], + args: &Args, + data_file: &str, + index_file: &str, +) -> f64 { + let partitioning = build_partitioning( + &args.partitioning, + args.partitions, + hash_col_indices, + schema, + ); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let start = Instant::now(); + execute_shuffle_write( + input_path.to_str().unwrap(), + codec.clone(), + format.clone(), + partitioning, + args.batch_size, + args.memory_limit, + args.write_buffer_size, + args.limit, + data_file.to_string(), + index_file.to_string(), + ) + .await + .unwrap(); + start.elapsed().as_secs_f64() + }) +} + +/// Core async shuffle write logic shared by single and concurrent paths. +async fn execute_shuffle_write( + input_path: &str, + codec: CompressionCodec, + format: datafusion_comet_shuffle::ShuffleFormat, + partitioning: CometPartitioning, + batch_size: usize, + memory_limit: Option<usize>, + write_buffer_size: usize, + limit: usize, + data_file: String, + index_file: String, +) -> datafusion::common::Result<()> { + let config = SessionConfig::new().with_batch_size(batch_size); + let mut runtime_builder = RuntimeEnvBuilder::new(); + if let Some(mem_limit) = memory_limit { + runtime_builder = runtime_builder.with_memory_limit(mem_limit, 1.0); + } + let runtime_env = Arc::new(runtime_builder.build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + + let mut df = ctx + .read_parquet(input_path, ParquetReadOptions::default()) + .await + .expect("Failed to create Parquet scan"); + if limit > 0 { + df = df.limit(0, Some(limit)).unwrap(); + } + + let parquet_plan = df + .create_physical_plan() + .await + .expect("Failed to create physical plan"); + + let input: Arc<dyn ExecutionPlan> = if parquet_plan + .properties() + .output_partitioning() + .partition_count() + > 1 + { + 
Arc::new(CoalescePartitionsExec::new(parquet_plan)) + } else { + parquet_plan + }; + + let exec = ShuffleWriterExec::try_new( + input, + partitioning, + codec, + format, + data_file, + index_file, + false, + write_buffer_size, + ) + .expect("Failed to create ShuffleWriterExec"); + + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx).unwrap(); + collect(stream).await.unwrap(); + Ok(()) +} + +/// Run N concurrent shuffle writes to simulate executor parallelism. +/// Returns wall-clock time for all tasks to complete. +fn run_concurrent_shuffle_writes( + input_path: &PathBuf, + schema: &SchemaRef, + codec: &CompressionCodec, + format: &datafusion_comet_shuffle::ShuffleFormat, + hash_col_indices: &[usize], + args: &Args, +) -> f64 { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let start = Instant::now(); + + let mut handles = Vec::with_capacity(args.concurrent_tasks); + for task_id in 0..args.concurrent_tasks { + let task_dir = args.output_dir.join(format!("task_{task_id}")); + fs::create_dir_all(&task_dir).expect("Failed to create task output directory"); + let data_file = task_dir.join("data.out").to_str().unwrap().to_string(); + let index_file = task_dir.join("index.out").to_str().unwrap().to_string(); + + let input_str = input_path.to_str().unwrap().to_string(); + let codec = codec.clone(); + let format = format.clone(); + let partitioning = build_partitioning( + &args.partitioning, + args.partitions, + hash_col_indices, + schema, + ); + let batch_size = args.batch_size; + let memory_limit = args.memory_limit; + let write_buffer_size = args.write_buffer_size; + let limit = args.limit; + + handles.push(tokio::spawn(async move { + execute_shuffle_write( + &input_str, + codec, + format, + partitioning, + batch_size, + memory_limit, + write_buffer_size, + limit, + data_file, + index_file, + ) + .await + .unwrap(); + })); + } + + for handle in handles { + handle.await.expect("Task panicked"); + } + + for task_id in 
0..args.concurrent_tasks { + let task_dir = args.output_dir.join(format!("task_{task_id}")); + let _ = fs::remove_dir_all(&task_dir); + } + + start.elapsed().as_secs_f64() + }) +} + +fn run_shuffle_read(data_file: &str, index_file: &str, num_partitions: usize) -> f64 { + let start = Instant::now(); + + let index_bytes = fs::read(index_file).expect("Failed to read index file"); + let num_offsets = index_bytes.len() / 8; + let offsets: Vec = (0..num_offsets) + .map(|i| { + let bytes: [u8; 8] = index_bytes[i * 8..(i + 1) * 8].try_into().unwrap(); + i64::from_le_bytes(bytes) + }) + .collect(); + + let data_bytes = fs::read(data_file).expect("Failed to read data file"); + + let mut total_rows = 0usize; + let mut total_batches = 0usize; + + for p in 0..num_partitions.min(offsets.len().saturating_sub(1)) { + let start_offset = offsets[p] as usize; + let end_offset = offsets[p + 1] as usize; + + if start_offset >= end_offset { + continue; + } + + let mut offset = start_offset; + while offset < end_offset { + let ipc_length = + u64::from_le_bytes(data_bytes[offset..offset + 8].try_into().unwrap()) as usize; + let block_data = &data_bytes[offset + 16..offset + 8 + ipc_length]; + let batch = read_ipc_compressed(block_data).expect("Failed to decode shuffle block"); + total_rows += batch.num_rows(); + total_batches += 1; + offset += 8 + ipc_length; + } + } + + let elapsed = start.elapsed().as_secs_f64(); + eprintln!( + " read back {} rows in {} batches from {} partitions", + format_number(total_rows), + total_batches, + num_partitions + ); + elapsed +} + +fn build_partitioning( + scheme: &str, + num_partitions: usize, + hash_col_indices: &[usize], + schema: &SchemaRef, +) -> CometPartitioning { + match scheme { + "single" => CometPartitioning::SinglePartition, + "round-robin" => CometPartitioning::RoundRobin(num_partitions, 0), + "hash" => { + let exprs: Vec> = hash_col_indices + .iter() + .map(|&idx| { + let field = schema.field(idx); + Arc::new(Column::new(field.name(), idx)) 
+ as Arc + }) + .collect(); + CometPartitioning::Hash(exprs, num_partitions) + } + other => { + eprintln!("Unknown partitioning scheme: {other}. Using hash."); + build_partitioning("hash", num_partitions, hash_col_indices, schema) + } + } +} + +fn parse_format(format: &str) -> datafusion_comet_shuffle::ShuffleFormat { + match format.to_lowercase().as_str() { + "ipc_stream" | "ipc-stream" | "stream" => { + datafusion_comet_shuffle::ShuffleFormat::IpcStream + } + _ => datafusion_comet_shuffle::ShuffleFormat::Block, + } +} + +fn parse_codec(codec: &str, zstd_level: i32) -> CompressionCodec { + match codec.to_lowercase().as_str() { + "none" => CompressionCodec::None, + "lz4" => CompressionCodec::Lz4Frame, + "zstd" => CompressionCodec::Zstd(zstd_level), + "snappy" => CompressionCodec::Snappy, + other => { + eprintln!("Unknown codec: {other}. Using zstd."); + CompressionCodec::Zstd(zstd_level) + } + } +} + +fn parse_hash_columns(s: &str) -> Vec { + s.split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.trim().parse::().expect("Invalid column index")) + .collect() +} + +fn describe_schema(schema: &arrow::datatypes::Schema) -> String { + let mut counts = std::collections::HashMap::new(); + for field in schema.fields() { + let type_name = match field.data_type() { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => "int", + DataType::Float16 | DataType::Float32 | DataType::Float64 => "float", + DataType::Utf8 | DataType::LargeUtf8 => "string", + DataType::Boolean => "bool", + DataType::Date32 | DataType::Date64 => "date", + DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => "decimal", + DataType::Timestamp(_, _) => "timestamp", + DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => "binary", + _ => "other", + }; + *counts.entry(type_name).or_insert(0) += 1; + } + let mut parts: Vec = counts + .into_iter() + .map(|(k, v)| 
format!("{}x{}", v, k)) + .collect(); + parts.sort(); + parts.join(", ") +} + +fn format_number(n: usize) -> String { + let s = n.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); + } + result.push(c); + } + result.chars().rev().collect() +} + +fn format_bytes(bytes: usize) -> String { + if bytes >= 1024 * 1024 * 1024 { + format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)) + } else if bytes >= 1024 * 1024 { + format!("{:.2} MiB", bytes as f64 / (1024.0 * 1024.0)) + } else if bytes >= 1024 { + format!("{:.2} KiB", bytes as f64 / 1024.0) + } else { + format!("{bytes} B") + } +} diff --git a/native/shuffle/src/ipc.rs b/native/shuffle/src/ipc.rs index 81ee41332a..8e3bfc3ede 100644 --- a/native/shuffle/src/ipc.rs +++ b/native/shuffle/src/ipc.rs @@ -20,6 +20,19 @@ use arrow::ipc::reader::StreamReader; use datafusion::common::DataFusionError; use datafusion::error::Result; +/// Reads all record batches from a standard Arrow IPC stream. +/// +/// This is the counterpart to [`crate::writers::IpcStreamWriter`]. The input +/// should be a complete Arrow IPC stream (schema + batch messages + EOS marker), +/// optionally with Arrow IPC body compression (LZ4_FRAME or ZSTD). 
+pub fn read_ipc_stream(bytes: &[u8]) -> Result<Vec<RecordBatch>> { + let reader = StreamReader::try_new(bytes, None)?; + reader + .into_iter() + .map(|r| r.map_err(|e| e.into())) + .collect() +} + pub fn read_ipc_compressed(bytes: &[u8]) -> Result<RecordBatch> { match &bytes[0..4] { b"SNAP" => { diff --git a/native/shuffle/src/lib.rs b/native/shuffle/src/lib.rs index f29588f2e1..12978ce37b 100644 --- a/native/shuffle/src/lib.rs +++ b/native/shuffle/src/lib.rs @@ -24,6 +24,17 @@ pub mod spark_unsafe; pub(crate) mod writers; pub use comet_partitioning::CometPartitioning; -pub use ipc::read_ipc_compressed; +pub use ipc::{read_ipc_compressed, read_ipc_stream}; pub use shuffle_writer::ShuffleWriterExec; -pub use writers::{CompressionCodec, ShuffleBlockWriter}; +pub use writers::{CompressionCodec, IpcStreamWriter, ShuffleBlockWriter}; + +/// The format used for writing shuffle data. +#[derive(Debug, Clone)] +pub enum ShuffleFormat { + /// Custom block format: each batch is a self-contained block with a + /// length-prefix + field-count + codec header followed by compressed Arrow IPC. + Block, + /// Standard Arrow IPC stream format: schema written once, batches as IPC + /// messages, with optional Arrow IPC body compression (LZ4_FRAME or ZSTD). 
+ IpcStream, +} diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 655bee3511..378267c052 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -21,8 +21,12 @@ use crate::partitioners::partitioned_batch_iterator::{ }; use crate::partitioners::ShufflePartitioner; use crate::writers::{BufBatchWriter, PartitionWriter}; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use crate::{ + comet_partitioning, CometPartitioning, CompressionCodec, IpcStreamWriter, ShuffleBlockWriter, + ShuffleFormat, +}; use arrow::array::{ArrayRef, RecordBatch}; +use arrow::compute::kernels::coalesce::BatchCoalescer; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::common::DataFusionError; @@ -108,6 +112,7 @@ impl ScratchSpace { pub(crate) struct MultiPartitionShuffleRepartitioner { output_data_file: String, output_index_file: String, + schema: SchemaRef, buffered_batches: Vec, partition_indices: Vec>, partition_writers: Vec, @@ -122,6 +127,8 @@ pub(crate) struct MultiPartitionShuffleRepartitioner { batch_size: usize, /// Reservation for repartitioning reservation: MemoryReservation, + codec: CompressionCodec, + format: ShuffleFormat, tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, @@ -139,6 +146,7 @@ impl MultiPartitionShuffleRepartitioner { runtime: Arc, batch_size: usize, codec: CompressionCodec, + format: ShuffleFormat, tracing_enabled: bool, write_buffer_size: usize, ) -> datafusion::common::Result { @@ -168,7 +176,7 @@ impl MultiPartitionShuffleRepartitioner { let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; let partition_writers = (0..num_output_partitions) - .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) + .map(|_| 
PartitionWriter::try_new(shuffle_block_writer.clone(), format.clone())) .collect::>>()?; let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) @@ -178,6 +186,7 @@ impl MultiPartitionShuffleRepartitioner { Ok(Self { output_data_file, output_index_file, + schema, buffered_batches: vec![], partition_indices: vec![vec![]; num_output_partitions], partition_writers, @@ -188,6 +197,8 @@ impl MultiPartitionShuffleRepartitioner { scratch, batch_size, reservation, + codec, + format, tracing_enabled, write_buffer_size, }) @@ -434,7 +445,7 @@ impl MultiPartitionShuffleRepartitioner { Ok(()) } - fn shuffle_write_partition( + fn shuffle_write_partition_block( partition_iter: &mut PartitionedBatchIterator, shuffle_block_writer: &mut ShuffleBlockWriter, output_data: &mut BufWriter, @@ -513,6 +524,8 @@ impl MultiPartitionShuffleRepartitioner { &self.metrics, self.write_buffer_size, self.batch_size, + &self.codec, + self.schema.as_ref(), )?; } @@ -579,8 +592,12 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { for i in 0..num_output_partitions { offsets[i] = output_data.stream_position()?; - // if we wrote a spill file for this partition then copy the - // contents into the shuffle file + let mut partition_iter = partitioned_batches.produce(i); + + // Finish any open IPC spill stream before copying + self.partition_writers[i].finish_spill()?; + + // Copy spill file contents into the output if let Some(spill_path) = self.partition_writers[i].path() { let mut spill_file = BufReader::new(File::open(spill_path)?); let mut write_timer = self.metrics.write_time.timer(); @@ -588,17 +605,40 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { write_timer.stop(); } - // Write in memory batches to output data file - let mut partition_iter = partitioned_batches.produce(i); - Self::shuffle_write_partition( - &mut partition_iter, - &mut self.shuffle_block_writer, - &mut output_data, - &self.metrics.encode_time, - 
&self.metrics.write_time, - self.write_buffer_size, - self.batch_size, - )?; + match &self.format { + ShuffleFormat::Block => { + Self::shuffle_write_partition_block( + &mut partition_iter, + &mut self.shuffle_block_writer, + &mut output_data, + &self.metrics.encode_time, + &self.metrics.write_time, + self.write_buffer_size, + self.batch_size, + )?; + } + ShuffleFormat::IpcStream => { + let start_pos = output_data.stream_position()?; + let mut ipc_writer = IpcStreamWriter::try_new_length_prefixed( + &mut output_data, + self.schema.as_ref(), + self.codec.clone(), + )?; + let mut coalescer = + BatchCoalescer::new(Arc::clone(&self.schema), self.batch_size); + for batch in &mut partition_iter { + coalescer.push_batch(batch?)?; + while let Some(b) = coalescer.next_completed_batch() { + ipc_writer.write_batch(&b, &self.metrics.encode_time)?; + } + } + coalescer.finish_buffered_batch()?; + while let Some(b) = coalescer.next_completed_batch() { + ipc_writer.write_batch(&b, &self.metrics.encode_time)?; + } + ipc_writer.finish_length_prefixed(start_pos)?; + } + } } let mut write_timer = self.metrics.write_time.timer(); diff --git a/native/shuffle/src/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs index 5801ef613b..9402781975 100644 --- a/native/shuffle/src/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -18,30 +18,40 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::ShufflePartitioner; use crate::writers::BufBatchWriter; -use crate::{CompressionCodec, ShuffleBlockWriter}; +use crate::{CompressionCodec, IpcStreamWriter, ShuffleBlockWriter, ShuffleFormat}; use arrow::array::RecordBatch; +use arrow::compute::kernels::coalesce::BatchCoalescer; use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; use std::fs::{File, OpenOptions}; use std::io::{BufWriter, Write}; +use std::path::PathBuf; use tokio::time::Instant; +/// Output strategy for writing 
shuffle data in either block or IPC stream format. +enum OutputWriter { + Block(BufBatchWriter), + /// The writer is wrapped in Option so it can be taken for finish(). + IpcStream { + writer: Option>>, + coalescer: Option, + batch_size: usize, + }, +} + /// A partitioner that writes all shuffle data to a single file and a single index file pub(crate) struct SinglePartitionShufflePartitioner { - // output_data_file: File, - output_data_writer: BufBatchWriter, + output: OutputWriter, + output_data_path: PathBuf, output_index_path: String, - /// Batches that are smaller than the batch size and to be concatenated buffered_batches: Vec, - /// Number of rows in the concatenating batches num_buffered_rows: usize, - /// Metrics for the repartitioner metrics: ShufflePartitionerMetrics, - /// The configured batch size batch_size: usize, } impl SinglePartitionShufflePartitioner { + #[allow(clippy::too_many_arguments)] pub(crate) fn try_new( output_data_path: String, output_index_path: String, @@ -49,25 +59,41 @@ impl SinglePartitionShufflePartitioner { metrics: ShufflePartitionerMetrics, batch_size: usize, codec: CompressionCodec, + format: ShuffleFormat, write_buffer_size: usize, ) -> datafusion::common::Result { - let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; - let output_data_file = OpenOptions::new() .write(true) .create(true) .truncate(true) - .open(output_data_path)?; + .open(&output_data_path)?; - let output_data_writer = BufBatchWriter::new( - shuffle_block_writer, - output_data_file, - write_buffer_size, - batch_size, - ); + let output = match format { + ShuffleFormat::Block => { + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; + OutputWriter::Block(BufBatchWriter::new( + shuffle_block_writer, + output_data_file, + write_buffer_size, + batch_size, + )) + } + ShuffleFormat::IpcStream => { + let buf_writer = BufWriter::with_capacity(write_buffer_size, output_data_file); + let writer 
= + IpcStreamWriter::try_new_length_prefixed(buf_writer, schema.as_ref(), codec)?; + OutputWriter::IpcStream { + writer: Some(writer), + coalescer: None, + batch_size, + } + } + }; Ok(Self { - output_data_writer, + output, + output_data_path: PathBuf::from(output_data_path), output_index_path, buffered_batches: vec![], num_buffered_rows: 0, @@ -76,14 +102,11 @@ impl SinglePartitionShufflePartitioner { }) } - /// Add a batch to the buffer of the partitioner, these buffered batches will be concatenated - /// and written to the output data file when the number of rows in the buffer reaches the batch size. fn add_buffered_batch(&mut self, batch: RecordBatch) { self.num_buffered_rows += batch.num_rows(); self.buffered_batches.push(batch); } - /// Consumes buffered batches and return a concatenated batch if successful fn concat_buffered_batches(&mut self) -> datafusion::common::Result> { if self.buffered_batches.is_empty() { Ok(None) @@ -106,6 +129,67 @@ impl SinglePartitionShufflePartitioner { } } } + + fn write_batch(&mut self, batch: &RecordBatch) -> datafusion::common::Result<()> { + match &mut self.output { + OutputWriter::Block(writer) => { + writer.write(batch, &self.metrics.encode_time, &self.metrics.write_time)?; + Ok(()) + } + OutputWriter::IpcStream { + writer, + coalescer, + batch_size, + } => { + let w = writer.as_mut().ok_or_else(|| { + DataFusionError::Internal("IPC stream writer already finished".to_string()) + })?; + let coal = coalescer + .get_or_insert_with(|| BatchCoalescer::new(batch.schema(), *batch_size)); + coal.push_batch(batch.clone())?; + + while let Some(b) = coal.next_completed_batch() { + w.write_batch(&b, &self.metrics.encode_time)?; + } + Ok(()) + } + } + } + + fn flush_output(&mut self) -> datafusion::common::Result<()> { + match &mut self.output { + OutputWriter::Block(writer) => { + writer.flush(&self.metrics.encode_time, &self.metrics.write_time) + } + OutputWriter::IpcStream { + writer, coalescer, .. 
+ } => { + let w = writer.as_mut().ok_or_else(|| { + DataFusionError::Internal("IPC stream writer already finished".to_string()) + })?; + if let Some(coal) = coalescer { + coal.finish_buffered_batch()?; + while let Some(b) = coal.next_completed_batch() { + w.write_batch(&b, &self.metrics.encode_time)?; + } + } + Ok(()) + } + } + } + + fn finish_ipc_stream(&mut self) -> datafusion::common::Result<()> { + if let OutputWriter::IpcStream { writer, .. } = &mut self.output { + if let Some(w) = writer.take() { + // start_pos is 0 for single partition (length prefix is at file start) + let buf_writer = w.finish_length_prefixed(0)?; + buf_writer + .into_inner() + .map_err(|e| DataFusionError::Execution(format!("flush error: {e}")))?; + } + } + Ok(()) + } } #[async_trait::async_trait] @@ -121,24 +205,13 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { if num_rows >= self.batch_size || num_rows + self.num_buffered_rows > self.batch_size { let concatenated_batch = self.concat_buffered_batches()?; - // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; + self.write_batch(&batch)?; } if num_rows >= self.batch_size { - // Write the new batch - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; + self.write_batch(&batch)?; } else { - // Add the new batch to the buffer self.add_buffered_batch(batch); } } else { @@ -158,18 +231,13 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { let start_time = Instant::now(); let concatenated_batch = self.concat_buffered_batches()?; - // Write the concatenated buffered batch if let Some(batch) = concatenated_batch { - self.output_data_writer.write( - &batch, - &self.metrics.encode_time, - &self.metrics.write_time, - )?; + self.write_batch(&batch)?; } - self.output_data_writer - .flush(&self.metrics.encode_time, 
&self.metrics.write_time)?; + self.flush_output()?; + self.finish_ipc_stream()?; - // Write index file. It should only contain 2 entries: 0 and the total number of bytes written + let data_file_length = std::fs::metadata(&self.output_data_path)?.len(); let index_file = OpenOptions::new() .write(true) .create(true) @@ -177,7 +245,6 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { .open(self.output_index_path.clone()) .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; let mut index_buf_writer = BufWriter::new(index_file); - let data_file_length = self.output_data_writer.writer_stream_position()?; for offset in [0, data_file_length] { index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?; } diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index e649aaac69..554a97ff2d 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -21,7 +21,7 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, }; -use crate::{CometPartitioning, CompressionCodec}; +use crate::{CometPartitioning, CompressionCodec, ShuffleFormat}; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; @@ -65,6 +65,8 @@ pub struct ShuffleWriterExec { cache: PlanProperties, /// The compression codec to use when compressing shuffle blocks codec: CompressionCodec, + /// The shuffle data format (Block or IpcStream) + format: ShuffleFormat, tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, @@ -77,6 +79,7 @@ impl ShuffleWriterExec { input: Arc, partitioning: CometPartitioning, codec: CompressionCodec, + format: ShuffleFormat, output_data_file: String, output_index_file: String, tracing_enabled: bool, @@ -97,6 +100,7 @@ impl ShuffleWriterExec { 
output_index_file, cache, codec, + format, tracing_enabled, write_buffer_size, }) @@ -159,6 +163,7 @@ impl ExecutionPlan for ShuffleWriterExec { Arc::clone(&children[0]), self.partitioning.clone(), self.codec.clone(), + self.format.clone(), self.output_data_file.clone(), self.output_index_file.clone(), self.tracing_enabled, @@ -188,6 +193,7 @@ impl ExecutionPlan for ShuffleWriterExec { metrics, context, self.codec.clone(), + self.format.clone(), self.tracing_enabled, self.write_buffer_size, ) @@ -208,6 +214,7 @@ async fn external_shuffle( metrics: ShufflePartitionerMetrics, context: Arc, codec: CompressionCodec, + format: ShuffleFormat, tracing_enabled: bool, write_buffer_size: usize, ) -> Result { @@ -223,6 +230,7 @@ async fn external_shuffle( metrics, context.session_config().batch_size(), codec, + format, write_buffer_size, )?) } @@ -236,6 +244,7 @@ async fn external_shuffle( context.runtime_env(), context.session_config().batch_size(), codec, + format, tracing_enabled, write_buffer_size, )?), @@ -341,6 +350,47 @@ mod test { shuffle_write_test(10000, 100, 200, Some(10 * 1024 * 1024)); } + // --- IPC stream format tests --- + // These mirror the block format tests above to ensure IPC stream format + // produces valid output for the same scenarios. + + #[test] + #[cfg_attr(miri, ignore)] + fn test_ipc_stream_single_partition() { + shuffle_write_test_with_format(1000, 100, 1, None, ShuffleFormat::IpcStream); + shuffle_write_test_with_format(10000, 10, 1, None, ShuffleFormat::IpcStream); + } + + #[test] + #[cfg_attr(miri, ignore)] + fn test_ipc_stream_multi_partition() { + shuffle_write_test_with_format(1000, 10, 16, None, ShuffleFormat::IpcStream); + } + + /// Regression: many partitions means some receive zero rows, producing + /// empty IPC streams (schema + EOS, no batches). The reader must treat + /// these as EOF rather than erroring. 
+ #[test] + #[cfg_attr(miri, ignore)] + fn test_ipc_stream_many_partitions_some_empty() { + // 200 partitions with only 1000 rows — many partitions will be empty + shuffle_write_test_with_format(1000, 1, 200, None, ShuffleFormat::IpcStream); + } + + /// Regression: spilling with IPC stream format must produce valid + /// length-prefixed streams that can be raw-copied to the output file. + #[test] + #[cfg_attr(miri, ignore)] + fn test_ipc_stream_with_spilling() { + shuffle_write_test_with_format( + 10000, + 100, + 200, + Some(10 * 1024 * 1024), + ShuffleFormat::IpcStream, + ); + } + #[tokio::test] async fn shuffle_partitioner_memory() { let batch = create_batch(900); @@ -360,6 +410,7 @@ mod test { runtime_env, 1024, CompressionCodec::Lz4Frame, + ShuffleFormat::Block, false, 1024 * 1024, // write_buffer_size: 1MB default ) @@ -402,6 +453,22 @@ mod test { num_batches: usize, num_partitions: usize, memory_limit: Option, + ) { + shuffle_write_test_with_format( + batch_size, + num_batches, + num_partitions, + memory_limit, + ShuffleFormat::Block, + ); + } + + fn shuffle_write_test_with_format( + batch_size: usize, + num_batches: usize, + num_partitions: usize, + memory_limit: Option, + format: ShuffleFormat, ) { let batch = create_batch(batch_size); @@ -462,6 +529,7 @@ mod test { ))), partitioning, CompressionCodec::Zstd(1), + format.clone(), "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), false, @@ -521,6 +589,7 @@ mod test { ))), CometPartitioning::RoundRobin(num_partitions, 0), CompressionCodec::Zstd(1), + ShuffleFormat::Block, data_file.clone(), index_file.clone(), false, diff --git a/native/shuffle/src/writers/buf_batch_writer.rs b/native/shuffle/src/writers/buf_batch_writer.rs index cfddb46539..9b8d17c315 100644 --- a/native/shuffle/src/writers/buf_batch_writer.rs +++ b/native/shuffle/src/writers/buf_batch_writer.rs @@ -134,9 +134,3 @@ impl, W: Write> BufBatchWriter { Ok(()) } } - -impl, W: Write + Seek> BufBatchWriter { - pub(crate) fn 
writer_stream_position(&mut self) -> datafusion::common::Result { - self.writer.stream_position().map_err(Into::into) - } -} diff --git a/native/shuffle/src/writers/ipc_stream_writer.rs b/native/shuffle/src/writers/ipc_stream_writer.rs new file mode 100644 index 0000000000..9a57a0261d --- /dev/null +++ b/native/shuffle/src/writers/ipc_stream_writer.rs @@ -0,0 +1,412 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::RecordBatch; +use arrow::datatypes::Schema; +use arrow::ipc::writer::{IpcWriteOptions, StreamWriter}; +use arrow::ipc::CompressionType; +use datafusion::common::DataFusionError; +use datafusion::error::Result; +use datafusion::physical_plan::metrics::Time; +use std::io::{Seek, SeekFrom, Write}; + +use super::CompressionCodec; + +/// Maps a [`CompressionCodec`] to Arrow IPC [`IpcWriteOptions`]. +/// +/// Arrow IPC body compression supports LZ4_FRAME and ZSTD. Snappy is not +/// part of the Arrow IPC specification and will return an error. 
+fn ipc_write_options(codec: &CompressionCodec) -> Result { + match codec { + CompressionCodec::None => Ok(IpcWriteOptions::default()), + CompressionCodec::Lz4Frame => IpcWriteOptions::default() + .try_with_compression(Some(CompressionType::LZ4_FRAME)) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)), + CompressionCodec::Zstd(_) => IpcWriteOptions::default() + .try_with_compression(Some(CompressionType::ZSTD)) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)), + CompressionCodec::Snappy => Err(DataFusionError::NotImplemented( + "Snappy compression is not supported for IPC stream format. \ + Use LZ4 or ZSTD instead." + .to_string(), + )), + } +} + +/// Writes record batches as a standard Arrow IPC stream. +/// +/// Unlike [`super::ShuffleBlockWriter`] which writes each batch as a self-contained +/// block with a custom header (length prefix + field count + codec), this writer +/// produces a standard Arrow IPC stream where the schema is written once and +/// each batch is an IPC record batch message within the stream. +/// +/// Benefits over the block-based format: +/// - Schema is written once per stream instead of once per batch +/// - Standard Arrow IPC format, readable by any Arrow-compatible tool +/// - Uses Arrow's built-in IPC body compression (LZ4_FRAME or ZSTD) +/// +/// The writer is stateful: it must be created, used to write batches, then +/// finished. The schema is written on creation and the end-of-stream marker +/// is written on [`finish`](Self::finish). +/// +/// # Example +/// +/// ```ignore +/// let mut writer = IpcStreamWriter::try_new(file, &schema, CompressionCodec::Lz4Frame)?; +/// writer.write_batch(&batch1, &ipc_time)?; +/// writer.write_batch(&batch2, &ipc_time)?; +/// let file = writer.finish()?; +/// ``` +pub struct IpcStreamWriter { + writer: StreamWriter, +} + +impl IpcStreamWriter { + /// Creates a new IPC stream writer. + /// + /// Writes the IPC stream header (schema message) to the output immediately. 
+ pub fn try_new(output: W, schema: &Schema, codec: CompressionCodec) -> Result { + let options = ipc_write_options(&codec)?; + let writer = StreamWriter::try_new_with_options(output, schema, options)?; + Ok(Self { writer }) + } + + /// Writes a record batch as an IPC message within the stream. + /// + /// Empty batches (0 rows) are skipped. + pub fn write_batch(&mut self, batch: &RecordBatch, ipc_time: &Time) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + let mut timer = ipc_time.timer(); + self.writer.write(batch)?; + timer.stop(); + Ok(()) + } + + /// Writes the end-of-stream marker and returns the underlying writer. + pub fn finish(mut self) -> Result { + self.writer.finish()?; + self.writer.into_inner().map_err(Into::into) + } +} + +impl IpcStreamWriter { + /// Returns the current stream position of the underlying writer. + pub fn stream_position(&mut self) -> std::io::Result { + self.writer.get_mut().stream_position() + } + + /// Creates a new IPC stream writer with space reserved for an 8-byte length + /// prefix. Call [`finish_length_prefixed`](Self::finish_length_prefixed) + /// instead of `finish` to fill in the prefix. + pub fn try_new_length_prefixed( + mut output: W, + schema: &Schema, + codec: CompressionCodec, + ) -> Result { + // Reserve 8 bytes for the length prefix (filled in on finish) + output.write_all(&[0u8; 8])?; + Self::try_new(output, schema, codec) + } + + /// Finishes the IPC stream and fills in the 8-byte length prefix that was + /// reserved by [`try_new_length_prefixed`](Self::try_new_length_prefixed). + /// + /// The length prefix covers the IPC stream data only (not itself). 
+ pub fn finish_length_prefixed(self, start_pos: u64) -> Result { + let mut output = self.finish()?; + let end_pos = output.stream_position()?; + let ipc_length = end_pos - start_pos - 8; + output.seek(SeekFrom::Start(start_pos))?; + output.write_all(&ipc_length.to_le_bytes())?; + output.seek(SeekFrom::Start(end_pos))?; + Ok(output) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int32Array, StringArray}; + use arrow::datatypes::{DataType, Field, Schema}; + use arrow::ipc::reader::StreamReader; + use std::io::Cursor; + use std::sync::Arc; + + fn test_schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, true), + ]) + } + + fn test_batch(schema: &Schema, n: i32) -> RecordBatch { + RecordBatch::try_new( + Arc::new(schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![n, n + 1, n + 2])), + Arc::new(StringArray::from(vec![Some("hello"), None, Some("world")])), + ], + ) + .unwrap() + } + + fn roundtrip(codec: CompressionCodec, num_batches: usize) { + let schema = test_schema(); + let ipc_time = Time::default(); + + let mut buf = Vec::new(); + { + let cursor = Cursor::new(&mut buf); + let mut writer = IpcStreamWriter::try_new(cursor, &schema, codec).unwrap(); + for i in 0..num_batches { + let batch = test_batch(&schema, (i * 10) as i32); + writer.write_batch(&batch, &ipc_time).unwrap(); + } + writer.finish().unwrap(); + } + + // Read back + let cursor = Cursor::new(&buf); + let reader = StreamReader::try_new(cursor, None).unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches.len(), num_batches); + + for (i, batch) in batches.iter().enumerate() { + assert_eq!(batch.num_rows(), 3); + let col_a = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let base = (i * 10) as i32; + assert_eq!(col_a.value(0), base); + assert_eq!(col_a.value(1), base + 1); + assert_eq!(col_a.value(2), base + 2); + } + } + + #[test] + fn 
test_roundtrip_no_compression() { + roundtrip(CompressionCodec::None, 3); + } + + #[test] + fn test_roundtrip_lz4() { + roundtrip(CompressionCodec::Lz4Frame, 3); + } + + #[test] + fn test_roundtrip_zstd() { + roundtrip(CompressionCodec::Zstd(1), 3); + } + + #[test] + fn test_empty_batch_skipped() { + let schema = test_schema(); + let ipc_time = Time::default(); + + let mut buf = Vec::new(); + { + let cursor = Cursor::new(&mut buf); + let mut writer = + IpcStreamWriter::try_new(cursor, &schema, CompressionCodec::None).unwrap(); + + // Write a real batch, an empty batch, then another real batch + writer + .write_batch(&test_batch(&schema, 0), &ipc_time) + .unwrap(); + let empty = RecordBatch::new_empty(Arc::new(schema.clone())); + writer.write_batch(&empty, &ipc_time).unwrap(); + writer + .write_batch(&test_batch(&schema, 10), &ipc_time) + .unwrap(); + writer.finish().unwrap(); + } + + let cursor = Cursor::new(&buf); + let reader = StreamReader::try_new(cursor, None).unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches.len(), 2); // empty batch was skipped + } + + #[test] + fn test_snappy_unsupported() { + let schema = test_schema(); + let buf = Vec::new(); + let cursor = Cursor::new(buf); + let result = IpcStreamWriter::try_new(cursor, &schema, CompressionCodec::Snappy); + let err = match result { + Err(e) => e.to_string(), + Ok(_) => panic!("expected error for Snappy"), + }; + assert!(err.contains("Snappy compression is not supported")); + } + + #[test] + fn test_single_batch() { + roundtrip(CompressionCodec::None, 1); + } + + /// Regression test: an IPC stream with zero batches (schema + EOS only) + /// must be readable without error. This happens when a partition receives + /// no rows from a map task. Previously this caused "Empty IPC stream in + /// shuffle block" errors in the reader. 
+ #[test] + fn test_empty_stream_no_batches() { + let schema = test_schema(); + + // Write a stream with zero batches + let mut buf = Vec::new(); + { + let cursor = Cursor::new(&mut buf); + let writer = IpcStreamWriter::try_new(cursor, &schema, CompressionCodec::None).unwrap(); + // Finish immediately without writing any batches + writer.finish().unwrap(); + } + + assert!(!buf.is_empty(), "Stream should contain schema + EOS"); + + // Read back — should yield zero batches, not error + let cursor = Cursor::new(&buf); + let reader = StreamReader::try_new(cursor, None).unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches.len(), 0); + } + + /// Regression test: length-prefixed IPC streams must roundtrip correctly. + /// The length prefix is needed so the Java reader can frame IPC stream + /// data without parsing Arrow IPC message headers. + #[test] + fn test_length_prefixed_roundtrip() { + let schema = test_schema(); + let ipc_time = Time::default(); + + let mut buf = Vec::new(); + let mut cursor = Cursor::new(&mut buf); + + // Write a length-prefixed stream + let start_pos = cursor.stream_position().unwrap(); + let mut writer = + IpcStreamWriter::try_new_length_prefixed(&mut cursor, &schema, CompressionCodec::None) + .unwrap(); + writer + .write_batch(&test_batch(&schema, 0), &ipc_time) + .unwrap(); + writer + .write_batch(&test_batch(&schema, 10), &ipc_time) + .unwrap(); + writer.finish_length_prefixed(start_pos).unwrap(); + + // Verify: first 8 bytes are length prefix, remaining is valid IPC stream + let length = u64::from_le_bytes(buf[0..8].try_into().unwrap()); + assert_eq!(length as usize, buf.len() - 8); + + // The IPC stream data after the prefix should be readable + let ipc_data = &buf[8..]; + let reader = StreamReader::try_new(ipc_data, None).unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].num_rows(), 3); + assert_eq!(batches[1].num_rows(), 3); + 
} + + /// Regression test: length-prefixed empty stream (no batches). + /// The reader must handle this as EOF rather than erroring. + #[test] + fn test_length_prefixed_empty_stream() { + let schema = test_schema(); + + let mut buf = Vec::new(); + let mut cursor = Cursor::new(&mut buf); + + let start_pos = cursor.stream_position().unwrap(); + let writer = + IpcStreamWriter::try_new_length_prefixed(&mut cursor, &schema, CompressionCodec::None) + .unwrap(); + writer.finish_length_prefixed(start_pos).unwrap(); + + // Length prefix should point to valid (empty) IPC stream + let length = u64::from_le_bytes(buf[0..8].try_into().unwrap()); + assert_eq!(length as usize, buf.len() - 8); + + let ipc_data = &buf[8..]; + let reader = StreamReader::try_new(ipc_data, None).unwrap(); + let batches: Vec = reader.map(|r| r.unwrap()).collect(); + assert_eq!(batches.len(), 0); + } + + /// Tests that multiple length-prefixed IPC streams can be written + /// back-to-back and read independently. This is how the multi-partition + /// output file is structured (spill stream + remaining-batches stream). 
+ #[test] + fn test_multiple_length_prefixed_streams() { + let schema = test_schema(); + let ipc_time = Time::default(); + + let mut buf = Vec::new(); + let mut cursor = Cursor::new(&mut buf); + + // Write two length-prefixed streams back to back + for base in [0, 100] { + let start_pos = cursor.stream_position().unwrap(); + let mut writer = IpcStreamWriter::try_new_length_prefixed( + &mut cursor, + &schema, + CompressionCodec::None, + ) + .unwrap(); + writer + .write_batch(&test_batch(&schema, base), &ipc_time) + .unwrap(); + writer.finish_length_prefixed(start_pos).unwrap(); + } + + // Read them back: parse length prefix, read IPC stream, repeat + let mut offset = 0; + let mut all_batches = Vec::new(); + while offset < buf.len() { + let length = u64::from_le_bytes(buf[offset..offset + 8].try_into().unwrap()) as usize; + let ipc_data = &buf[offset + 8..offset + 8 + length]; + let reader = StreamReader::try_new(ipc_data, None).unwrap(); + for batch in reader { + all_batches.push(batch.unwrap()); + } + offset += 8 + length; + } + assert_eq!(all_batches.len(), 2); + assert_eq!( + all_batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + 0 + ); + assert_eq!( + all_batches[1] + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + 100 + ); + } +} diff --git a/native/shuffle/src/writers/mod.rs b/native/shuffle/src/writers/mod.rs index 75caf9f3a3..a7491fb791 100644 --- a/native/shuffle/src/writers/mod.rs +++ b/native/shuffle/src/writers/mod.rs @@ -17,10 +17,12 @@ mod buf_batch_writer; mod checksum; +mod ipc_stream_writer; mod shuffle_block_writer; mod spill; pub(crate) use buf_batch_writer::BufBatchWriter; pub(crate) use checksum::Checksum; +pub use ipc_stream_writer::IpcStreamWriter; pub use shuffle_block_writer::{CompressionCodec, ShuffleBlockWriter}; pub(crate) use spill::PartitionWriter; diff --git a/native/shuffle/src/writers/spill.rs b/native/shuffle/src/writers/spill.rs index c16caddbf9..872172a548 100644 --- 
a/native/shuffle/src/writers/spill.rs +++ b/native/shuffle/src/writers/spill.rs @@ -15,38 +15,45 @@ // specific language governing permissions and limitations // under the License. -use super::ShuffleBlockWriter; +use super::{IpcStreamWriter, ShuffleBlockWriter}; use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::PartitionedBatchIterator; use crate::writers::buf_batch_writer::BufBatchWriter; +use crate::ShuffleFormat; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv; use std::fs::{File, OpenOptions}; - -/// A temporary disk file for spilling a partition's intermediate shuffle data. -struct SpillFile { - temp_file: RefCountedTempFile, - file: File, -} +use std::io::{BufWriter, Seek}; /// Manages encoding and optional disk spilling for a single shuffle partition. +/// +/// For block format, each `spill()` call appends self-contained blocks to the +/// spill file. For IPC stream format, an `IpcStreamWriter` is kept open across +/// spill calls so that all spilled data forms a single IPC stream (one schema +/// header, many batch messages, one EOS marker written at finish time). pub(crate) struct PartitionWriter { - /// Spill file for intermediate shuffle output for this partition. Each spill event - /// will append to this file and the contents will be copied to the shuffle file at - /// the end of processing. - spill_file: Option, - /// Writer that performs encoding and compression + spill_file: Option, shuffle_block_writer: ShuffleBlockWriter, + /// Persistent IPC stream writer for the spill file, kept open across + /// multiple `spill()` calls. + ipc_spill_writer: Option>>, + /// Start position of the current IPC stream in the spill file (for length prefix). 
+ ipc_spill_start_pos: u64, + format: ShuffleFormat, } impl PartitionWriter { pub(crate) fn try_new( shuffle_block_writer: ShuffleBlockWriter, + format: ShuffleFormat, ) -> datafusion::common::Result { Ok(Self { spill_file: None, shuffle_block_writer, + ipc_spill_writer: None, + ipc_spill_start_pos: 0, + format, }) } @@ -55,26 +62,16 @@ impl PartitionWriter { runtime: &RuntimeEnv, ) -> datafusion::common::Result<()> { if self.spill_file.is_none() { - // Spill file is not yet created, create it - let spill_file = runtime - .disk_manager - .create_tmp_file("shuffle writer spill")?; - let spill_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(spill_file.path()) - .map_err(|e| { - DataFusionError::Execution(format!("Error occurred while spilling {e}")) - })?; - self.spill_file = Some(SpillFile { - temp_file: spill_file, - file: spill_data, - }); + self.spill_file = Some( + runtime + .disk_manager + .create_tmp_file("shuffle writer spill")?, + ); } Ok(()) } + #[allow(clippy::too_many_arguments)] pub(crate) fn spill( &mut self, iter: &mut PartitionedBatchIterator, @@ -82,41 +79,90 @@ impl PartitionWriter { metrics: &ShufflePartitionerMetrics, write_buffer_size: usize, batch_size: usize, + codec: &crate::CompressionCodec, + schema: &arrow::datatypes::Schema, ) -> datafusion::common::Result { - if let Some(batch) = iter.next() { - self.ensure_spill_file_created(runtime)?; + let Some(batch) = iter.next() else { + return Ok(0); + }; + + self.ensure_spill_file_created(runtime)?; + let spill_path = self.spill_file.as_ref().unwrap().path().to_owned(); - let total_bytes_written = { + match &self.format { + ShuffleFormat::Block => { + let mut file = OpenOptions::new() + .append(true) + .open(&spill_path) + .map_err(|e| { + DataFusionError::Execution(format!("Error occurred while spilling {e}")) + })?; let mut buf_batch_writer = BufBatchWriter::new( &mut self.shuffle_block_writer, - &mut self.spill_file.as_mut().unwrap().file, + &mut file, 
write_buffer_size, batch_size, ); let mut bytes_written = buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?; for batch in iter { - let batch = batch?; bytes_written += buf_batch_writer.write( - &batch, + &batch?, &metrics.encode_time, &metrics.write_time, )?; } buf_batch_writer.flush(&metrics.encode_time, &metrics.write_time)?; - bytes_written - }; + Ok(bytes_written) + } + ShuffleFormat::IpcStream => { + // Lazily open the IPC stream writer on first spill. It stays + // open so subsequent spills append batches to the same stream. + // Uses length prefix so the spill file can be raw-copied into + // the output and the reader can frame it. + if self.ipc_spill_writer.is_none() { + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&spill_path) + .map_err(|e| { + DataFusionError::Execution(format!("Error occurred while spilling {e}")) + })?; + let mut buf_writer = BufWriter::with_capacity(write_buffer_size, file); + self.ipc_spill_start_pos = buf_writer.stream_position()?; + self.ipc_spill_writer = Some(IpcStreamWriter::try_new_length_prefixed( + buf_writer, + schema, + codec.clone(), + )?); + } + let ipc_writer = self.ipc_spill_writer.as_mut().unwrap(); + let pos_before = ipc_writer.stream_position()?; + ipc_writer.write_batch(&batch?, &metrics.encode_time)?; + for batch in iter { + ipc_writer.write_batch(&batch?, &metrics.encode_time)?; + } + let pos_after = ipc_writer.stream_position()?; + Ok((pos_after - pos_before) as usize) + } + } + } - Ok(total_bytes_written) - } else { - Ok(0) + /// Finish the IPC spill stream writer if one is open. Must be called + /// before raw-copying the spill file to the output. 
+ pub(crate) fn finish_spill(&mut self) -> datafusion::common::Result<()> { + if let Some(writer) = self.ipc_spill_writer.take() { + let buf_writer = writer.finish_length_prefixed(self.ipc_spill_start_pos)?; + buf_writer + .into_inner() + .map_err(|e| DataFusionError::Execution(format!("flush error: {e}")))?; } + Ok(()) } pub(crate) fn path(&self) -> Option<&std::path::Path> { - self.spill_file - .as_ref() - .map(|spill_file| spill_file.temp_file.path()) + self.spill_file.as_ref().map(|f| f.path()) } #[cfg(test)] diff --git a/spark/src/main/java/org/apache/comet/CometShuffleBlockIterator.java b/spark/src/main/java/org/apache/comet/CometShuffleBlockIterator.java index 9f72b20f51..71007d9c16 100644 --- a/spark/src/main/java/org/apache/comet/CometShuffleBlockIterator.java +++ b/spark/src/main/java/org/apache/comet/CometShuffleBlockIterator.java @@ -43,35 +43,50 @@ public class CometShuffleBlockIterator implements Closeable { private static final int INITIAL_BUFFER_SIZE = 128 * 1024; + /** Block format header: 8-byte length + 8-byte field count. */ + private static final int BLOCK_HEADER_SIZE = 16; + + /** IPC stream format header: 8-byte length only. 
*/ + private static final int IPC_STREAM_HEADER_SIZE = 8; + private final ReadableByteChannel channel; private final InputStream inputStream; - private final ByteBuffer headerBuf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN); + private final ByteBuffer headerBuf = + ByteBuffer.allocate(BLOCK_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); private ByteBuffer dataBuf = ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE); private boolean closed = false; private int currentBlockLength = 0; + private final boolean isIpcStream; public CometShuffleBlockIterator(InputStream in) { + this(in, false); + } + + public CometShuffleBlockIterator(InputStream in, boolean isIpcStream) { this.inputStream = in; this.channel = Channels.newChannel(in); + this.isIpcStream = isIpcStream; } /** - * Reads the next block header and loads the compressed body into the internal buffer. Called by - * native code via JNI. + * Reads the next block header and loads the body into the internal buffer. Called by native code + * via JNI. + * + *

Block format header: 8-byte compressedLength (includes field count but not itself) + 8-byte
+   * fieldCount (discarded). Body is: 4-byte codec prefix + compressed IPC data.
    *
-   * <p>Header format: 8-byte compressedLength (includes field count but not itself) + 8-byte
-   * fieldCount (discarded, schema comes from protobuf).
+   * <p>
IPC stream format header: 8-byte length. Body is: raw Arrow IPC stream data. * - * @return the compressed body length in bytes (codec prefix + compressed IPC), or -1 if EOF + * @return the body length in bytes, or -1 if EOF */ public int hasNext() throws IOException { if (closed) { return -1; } - // Read 16-byte header: clear() resets position=0, limit=capacity, - // preparing the buffer for channel.read() to fill it + int headerSize = isIpcStream ? IPC_STREAM_HEADER_SIZE : BLOCK_HEADER_SIZE; headerBuf.clear(); + headerBuf.limit(headerSize); while (headerBuf.hasRemaining()) { int bytesRead = channel.read(headerBuf); if (bytesRead < 0) { @@ -83,19 +98,25 @@ public int hasNext() throws IOException { } } headerBuf.flip(); - long compressedLength = headerBuf.getLong(); - // Field count discarded - schema determined by ShuffleScan protobuf fields - headerBuf.getLong(); + long length = headerBuf.getLong(); + + long bytesToRead; + if (isIpcStream) { + // IPC stream: length is the IPC stream data size + bytesToRead = length; + } else { + // Block format: length includes the 8-byte field count we already read + headerBuf.getLong(); // discard field count + bytesToRead = length - 8; + } - // Subtract 8 because compressedLength includes the 8-byte field count we already read - long bytesToRead = compressedLength - 8; if (bytesToRead > Integer.MAX_VALUE) { throw new IllegalStateException( "Native shuffle block size of " + bytesToRead + " exceeds maximum of " + Integer.MAX_VALUE - + ". Try reducing spark.comet.columnar.shuffle.batch.size."); + + ". Try reducing shuffle batch size."); } currentBlockLength = (int) bytesToRead; @@ -113,8 +134,6 @@ public int hasNext() throws IOException { throw new EOFException("Data corrupt: unexpected EOF while reading compressed batch"); } } - // Note: native side uses get_direct_buffer_address (base pointer) + currentBlockLength, - // not the buffer's position/limit. No flip needed. 
return currentBlockLength; } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala index 14e656f038..9709f56440 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometBlockStoreShuffleReader.scala @@ -109,12 +109,17 @@ class CometBlockStoreShuffleReader[K, C]( if (currentReadIterator != null) { currentReadIterator.close() } + val numCols = dep.schema + .map(_.fields.length) + .orElse(Some(dep.outputAttributes.length).filter(_ > 0)) + .getOrElse(-1) currentReadIterator = NativeBatchDecoderIterator( blockIdAndStream._2, dep.decodeTime, nativeLib, nativeUtil, - tracingEnabled) + tracingEnabled, + numCols) currentReadIterator }) .map(b => (0, b)) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..92d388d75b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.comet.CometConf import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, QueryPlanSerde} -import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator} +import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator, ShuffleWriterFormat} import org.apache.comet.serde.QueryPlanSerde.serializeDataType /** @@ -193,6 +193,12 @@ class CometNativeShuffleWriter[K, V]( shuffleWriterBuilder.setWriteBufferSize( 
CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().max(Int.MaxValue).toInt) + val format = CometConf.COMET_EXEC_SHUFFLE_FORMAT.get() match { + case "ipc_stream" => ShuffleWriterFormat.IPC_STREAM + case _ => ShuffleWriterFormat.BLOCK + } + shuffleWriterBuilder.setFormat(format) + outputPartitioning match { case p if isSinglePartitioning(p) => val partitioning = PartitioningOuterClass.SinglePartition.newBuilder() diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffledRowRDD.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffledRowRDD.scala index 7604910b06..729fb2128b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffledRowRDD.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffledRowRDD.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsRe import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.comet.CometShuffleBlockIterator +import org.apache.comet.{CometConf, CometShuffleBlockIterator} /** * Different from [[org.apache.spark.sql.execution.ShuffledRowRDD]], this RDD is specialized for @@ -156,7 +156,8 @@ class CometShuffledBatchRDD( split: Partition, context: TaskContext): CometShuffleBlockIterator = { val reader = createReader(split, context) - new CometShuffleBlockIterator(reader.readAsRawStream()) + val isIpcStream = CometConf.COMET_EXEC_SHUFFLE_FORMAT.get() == "ipc_stream" + new CometShuffleBlockIterator(reader.readAsRawStream(), isIpcStream) } override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala index f96c8f16dd..b6595c5bac 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala @@ -26,7 +26,7 @@ import java.nio.channels.{Channels, ReadableByteChannel} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.vectorized.ColumnarBatch -import org.apache.comet.Native +import org.apache.comet.{CometConf, Native} import org.apache.comet.vector.NativeUtil /** @@ -39,13 +39,14 @@ case class NativeBatchDecoderIterator( decodeTime: SQLMetric, nativeLib: Native, nativeUtil: NativeUtil, - tracingEnabled: Boolean) + tracingEnabled: Boolean, + numColumns: Int = -1) extends Iterator[ColumnarBatch] { private var isClosed = false private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN) private var currentBatch: ColumnarBatch = null - private var batch = fetchNext() + private val isIpcStream = CometConf.COMET_EXEC_SHUFFLE_FORMAT.get() == "ipc_stream" import NativeBatchDecoderIterator._ @@ -55,6 +56,8 @@ case class NativeBatchDecoderIterator( null } + private var batch = fetchNext() + def hasNext(): Boolean = { if (channel == null || isClosed) { return false @@ -94,7 +97,7 @@ case class NativeBatchDecoderIterator( return None } - // read compressed batch size from header + // read length from header try { longBuf.clear() while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {} @@ -104,8 +107,6 @@ case class NativeBatchDecoderIterator( return None } - // If we reach the end of the stream, we are done, or if we read partial length - // then the stream is corrupted. 
if (longBuf.hasRemaining) { if (longBuf.position() == 0) { close() @@ -114,31 +115,30 @@ case class NativeBatchDecoderIterator( throw new EOFException("Data corrupt: unexpected EOF while reading compressed ipc lengths") } - // get compressed length (including headers) longBuf.flip() - val compressedLength = longBuf.getLong + val length = longBuf.getLong - // read field count from header - longBuf.clear() - while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {} - if (longBuf.hasRemaining) { - throw new EOFException("Data corrupt: unexpected EOF while reading field count") + val (fieldCount, bytesToRead) = if (isIpcStream) { + // IPC stream: length is the data size, field count from schema + (numColumns, length) + } else { + // Block format: read 8-byte field count header + longBuf.clear() + while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {} + if (longBuf.hasRemaining) { + throw new EOFException("Data corrupt: unexpected EOF while reading field count") + } + longBuf.flip() + (longBuf.getLong.toInt, length - 8) } - longBuf.flip() - val fieldCount = longBuf.getLong.toInt - // read body - val bytesToRead = compressedLength - 8 if (bytesToRead > Integer.MAX_VALUE) { - // very unlikely that shuffle block will reach 2GB throw new IllegalStateException( s"Native shuffle block size of $bytesToRead exceeds " + s"maximum of ${Integer.MAX_VALUE}. Try reducing shuffle batch size.") } var dataBuf = threadLocalDataBuf.get() if (dataBuf.capacity() < bytesToRead) { - // it is unlikely that we would overflow here since it would - // require a 1GB compressed shuffle block but we check anyway val newCapacity = (bytesToRead * 2L).min(Integer.MAX_VALUE).toInt dataBuf = ByteBuffer.allocateDirect(newCapacity) threadLocalDataBuf.set(dataBuf)